Как реализовать асинхронные задачи
Вопрос:
Как асинхронно обрабатывать тяжелые бизнес-задачи, избегая длительной блокировки основной бизнес-логики? Например, если мне нужно отправить электронные письма 1000 пользователям, этот процесс может занять много времени и заблокировать выполнение на несколько секунд, что повлияет на последующие запросы. Как можно передать такие тяжелые задачи другим процессам для асинхронной обработки?
Ответ:
Можно заранее создать несколько рабочих процессов для обработки тяжелых бизнес-задач на локальной машине или на других серверах, или даже в серверных кластерах. Количество рабочих процессов можно увеличить, например, до 10 раз больше, чем количество процессоров, а вызывающая сторона может использовать AsyncTcpConnection для асинхронной отправки данных этим рабочим процессам, получая результаты обработки асинхронно.
Сервер рабочих процессов
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// task worker, использующий текстовый протокол
$task_worker = new Worker('Text://0.0.0.0:12345');
// количество рабочих процессов может быть увеличено по необходимости
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// предположим, что пришли json данные
$task_data = json_decode($task_data, true);
// обработка соответствующей логики задачи в зависимости от task_data... получаем результат, здесь опущено....
$task_result = ......
// отправка результата
$connection->send(json_encode($task_result));
};
Worker::runAll();
Вызов в Workerman
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// websocket служба
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// установить асинхронное соединение с удаленным сервисом задач, IP - это IP удаленного сервиса задач; для локальной машины это 127.0.0.1, для кластера - это IP LVS
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// данные задачи и параметры
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// отправка данных
$task_connection->send(json_encode($task_data));
// асинхронное получение результата
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result) use ($ws_connection)
{
// результат
var_dump($task_result);
// после получения результата не забудьте закрыть асинхронное соединение
$task_connection->close();
// уведомить соответствующий websocket клиент о завершении задачи
$ws_connection->send('task complete');
};
// выполнение асинхронного соединения
$task_connection->connect();
};
Worker::runAll();
Таким образом, тяжелые задачи передаются на выполнение процессам на локальной машине или на других серверах, и после завершения задачи результаты будут получены асинхронно, что предотвратит блокировку бизнес-процесса.