Cómo implementar tareas asincrónicas

Pregunta:

¿Cómo procesar tareas pesadas de forma asincrónica y evitar que el proceso principal se bloquee durante mucho tiempo? Por ejemplo, quiero enviar correos electrónicos a 1000 usuarios, lo que puede ser un proceso lento y puede bloquear durante varios segundos. Durante este tiempo, debido al bloqueo del flujo principal, se verán afectadas las solicitudes posteriores. ¿Cómo puedo delegar este tipo de tareas pesadas a otros procesos para que se manejen de forma asincrónica?

Respuesta:

Se pueden establecer previamente algunos procesos de tarea en la máquina local o en otros servidores, incluso en un clúster de servidores, para manejar las cargas pesadas. El número de procesos de tarea se puede aumentar, por ejemplo, hasta 10 veces el número de CPU. Luego, el llamador puede utilizar AsyncTcpConnection para enviar los datos de forma asincrónica a estos procesos de tarea para su procesamiento, obteniendo los resultados de forma asincrónica.

Servidor de procesos de tarea

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// worker de tarea, usando el protocolo Text
$task_worker = new Worker('Text://0.0.0.0:12345');
// Se puede abrir más procesos de tarea según sea necesario
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
     // Supongamos que se recibe datos en formato json
     $task_data = json_decode($task_data, true);
     // Procesar la lógica de tarea correspondiente según task_data.... obtener resultado, omitido aquí....
     $task_result = ......
     // Enviar resultado
     $connection->send(json_encode($task_result));
};
Worker::runAll();

Llamada en Workerman

use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// servicio websocket
$worker = new Worker('websocket://0.0.0.0:8080');

$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
    // Establecer una conexión asincrónica con el servicio de tarea remoto; la ip es para el servicio de tarea remoto, si es local será 127.0.0.1, y si es un clúster será la ip de lvs
    $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
    // Datos y parámetros de la tarea
    $task_data = array(
        'function' => 'send_mail',
        'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
    );
    // Enviar datos
    $task_connection->send(json_encode($task_data));
    // Obtener el resultado de forma asincrónica
    $task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result) use($ws_connection)
    {
         // Resultado
         var_dump($task_result);
         // Recuerda cerrar la conexión asincrónica después de obtener el resultado
         $task_connection->close();
         // Notificar al cliente websocket correspondiente que la tarea se ha completado
         $ws_connection->send('task complete');
    };
    // Ejecutar la conexión asincrónica
    $task_connection->connect();
};

Worker::runAll();

De esta manera, las tareas pesadas se delegan a procesos en la máquina local o en otros servidores. Una vez que la tarea se completa, se recibe el resultado de manera asincrónica, lo que evita que el proceso de negocio se bloquee.