Como implementar tarefas assíncronas
Pergunta:
Como lidar com cargas de trabalho pesadas de forma assíncrona, evitando que o fluxo principal seja bloqueado por longos períodos? Por exemplo, eu preciso enviar e-mails para 1000 usuários, e esse processo é muito lento, podendo causar um bloqueio de vários segundos. Durante esse tempo, o fluxo principal pode ser afetado, impactando as solicitações subsequentes. Como delegar tais tarefas pesadas a outros processos para serem tratadas de forma assíncrona?
Resposta:
Pode-se pré-estabelecer algumas instâncias de processos de tarefa em sua máquina, em outros servidores ou até mesmo em um cluster de servidores, para lidar com cargas de trabalho pesadas. O número de processos de tarefa pode ser aumentado, por exemplo, em até 10 vezes o número de CPUs. O chamador pode utilizar AsyncTcpConnection para enviar dados de forma assíncrona para esses processos de tarefa e, subsequentemente, receber os resultados de forma assíncrona.
Servidor do processo de tarefa
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// worker de tarefa, utilizando protocolo Text
$task_worker = new Worker('Text://0.0.0.0:12345');
// o número de processos de tarefa pode ser aumentado conforme necessário
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// supondo que os dados enviados sejam em formato json
$task_data = json_decode($task_data, true);
// processa a lógica da tarefa com base em task_data.... obtem resultados, omitido aqui....
$task_result = ......
// envia o resultado
$connection->send(json_encode($task_result));
};
Worker::runAll();
Chamada no Workerman
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// serviço websocket
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// estabelece uma conexão assíncrona com o serviço de tarefa remoto, o ip é o ip do serviço de tarefa remoto, se for local, é 127.0.0.1, se for em cluster, é o ip do lvs
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// dados da tarefa e parâmetros
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// envia os dados
$task_connection->send(json_encode($task_data));
// obtém resultados de forma assíncrona
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
{
// resultado
var_dump($task_result);
// lembre-se de fechar a conexão assíncrona após obter o resultado
$task_connection->close();
// notifica o cliente websocket correspondente que a tarefa foi concluída
$ws_connection->send('tarefa completa');
};
// executa a conexão assíncrona
$task_connection->connect();
};
Worker::runAll();
Dessa forma, as tarefas pesadas são delegadas a processos na máquina local ou em outros servidores, e quando a tarefa é concluída, os resultados são recebidos de forma assíncrona, evitando o bloqueio do processo de negócios.