Как реализовать асинхронные задачи

Вопрос:

Как асинхронно обрабатывать тяжелые бизнес-задачи, избегая длительной блокировки основной бизнес-логики? Например, если мне нужно отправить электронные письма 1000 пользователям, этот процесс может занять много времени и заблокировать выполнение на несколько секунд, что повлияет на последующие запросы. Как можно передать такие тяжелые задачи другим процессам для асинхронной обработки?

Ответ:

Можно заранее создать несколько рабочих процессов для обработки тяжелых бизнес-задач на локальной машине или на других серверах, или даже в серверных кластерах. Количество рабочих процессов можно увеличить, например, до 10 раз больше, чем количество процессоров, а вызывающая сторона может использовать AsyncTcpConnection для асинхронной отправки данных этим рабочим процессам, получая результаты обработки асинхронно.

Сервер рабочих процессов

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// task worker, использующий текстовый протокол
$task_worker = new Worker('Text://0.0.0.0:12345');
// количество рабочих процессов может быть увеличено по необходимости
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
     // предположим, что пришли json данные
     $task_data = json_decode($task_data, true);
     // обработка соответствующей логики задачи в зависимости от task_data... получаем результат, здесь опущено....
     $task_result = ......
     // отправка результата
     $connection->send(json_encode($task_result));
};
Worker::runAll();

Вызов в Workerman

use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// websocket служба
$worker = new Worker('websocket://0.0.0.0:8080');

$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
    // установить асинхронное соединение с удаленным сервисом задач, IP - это IP удаленного сервиса задач; для локальной машины это 127.0.0.1, для кластера - это IP LVS
    $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
    // данные задачи и параметры
    $task_data = array(
        'function' => 'send_mail',
        'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
    );
    // отправка данных
    $task_connection->send(json_encode($task_data));
    // асинхронное получение результата
    $task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result) use ($ws_connection)
    {
         // результат
         var_dump($task_result);
         // после получения результата не забудьте закрыть асинхронное соединение
         $task_connection->close();
         // уведомить соответствующий websocket клиент о завершении задачи
         $ws_connection->send('task complete');
    };
    // выполнение асинхронного соединения
    $task_connection->connect();
};

Worker::runAll();

Таким образом, тяжелые задачи передаются на выполнение процессам на локальной машине или на других серверах, и после завершения задачи результаты будут получены асинхронно, что предотвратит блокировку бизнес-процесса.