Como implementar tarefas assíncronas
Pergunta:
Como lidar de forma assíncrona com tarefas pesadas para evitar que a operação principal seja bloqueada por longos períodos de tempo. Por exemplo, se eu precisar enviar e-mails para 1000 usuários, esse processo é lento e pode bloquear o servidor por alguns segundos. Durante esse período, o bloqueio da operação principal pode afetar as solicitações subsequentes. Como posso enviar tarefas pesadas para serem processadas de forma assíncrona por outros processos?
Resposta:
Você pode estabelecer préviamente em um único servidor, em outro servidor ou até mesmo em um cluster de servidores, alguns processos de tarefas para lidar com tarefas pesadas. O número de processos de tarefas pode ser aumentado, por exemplo, 10 vezes a quantidade de CPUs. Em seguida, o chamador pode usar a AsyncTcpConnection para enviar os dados de forma assíncrona para esses processos de tarefa para serem processados assincronamente e obter os resultados de forma assíncrona.
Servidor de processos de tarefas:
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// Trabalhador de tarefas, usando o protocolo Text
$task_worker = new Worker('Text://0.0.0.0:12345');
// O número de processos de tarefas pode ser aumentado conforme necessário
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// Suponha que os dados recebidos sejam do tipo JSON
$task_data = json_decode($task_data, true);
// Processar os dados da tarefa correspondente a task_data.... Obter o resultado, vamos omitir isso...
$task_result = ......
// Enviar o resultado
$connection->send(json_encode($task_result));
};
Worker::runAll();
Chamada no workerman:
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// Serviço websocket
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// Estabelece uma conexão assíncrona com o serviço de tarefas remoto, o IP é o do serviço de tarefas remoto, se for o mesmo servidor, é 127.0.0.1, se for um cluster é o IP do LVS
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// Dados da tarefa e argumentos
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// Envio de dados
$task_connection->send(json_encode($task_data));
// Recebimento assíncrono do resultado
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
{
// Resultado
var_dump($task_result);
// Após receber o resultado, lembre-se de fechar a conexão assíncrona
$task_connection->close();
// Notificar o cliente do websocket que a tarefa foi concluída
$ws_connection->send('tarefa concluída');
};
// Executa a conexão assíncrona
$task_connection->connect();
};
Worker::runAll();
Dessa forma, as tarefas pesadas são processadas por processos no mesmo servidor ou em outros servidores, e quando a tarefa é concluída, o resultado é recebido de forma assíncrona, garantindo que o processo de negócios não seja bloqueado.