Come gestire i compiti asincroni

Domanda:

Come gestire in modo asincrono carichi di lavoro pesanti, evitando che il processo principale venga bloccato per lungo tempo. Ad esempio, se devo inviare email a 1000 utenti, questo processo è molto lento e potrebbe bloccare il flusso principale per alcuni secondi, influenzando le richieste successive. Come posso delegare compiti pesanti ad altri processi in modo asincrono?

Risposta:

È possibile creare in anticipo alcuni processi di lavoro per gestire carichi pesanti sulla macchina locale o su altri server, anche su un cluster di server. Puoi aumentare il numero di processi di lavoro, ad esempio fino a 10 volte il numero di CPU, e il chiamante può utilizzare AsyncTcpConnection per inviare dati in modo asincrono a questi processi di lavoro, ricevendo in modo asincrono i risultati del trattamento.

Server dei processi di lavoro

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// processi di lavoro per i compiti, utilizza il protocollo Text
$task_worker = new Worker('Text://0.0.0.0:12345');
// il numero di processi di lavoro può essere aumentato secondo necessità
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
     // Si suppone che i dati inviati siano in json
     $task_data = json_decode($task_data, true);
     // Gestire la logica del compito in base a task_data .... ottenere il risultato, qui omesso ....
     $task_result = ......
     // Inviare il risultato
     $connection->send(json_encode($task_result));
};
Worker::runAll();

Chiamata in Workerman

use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// servizio websocket
$worker = new Worker('websocket://0.0.0.0:8080');

$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
    // Stabilire una connessione asincrona con il servizio task remoto, l'ip è quello del servizio task remoto, se è locale è 127.0.0.1, se è un cluster è l'ip di lvs
    $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
    // dati del compito e parametri
    $task_data = array(
        'function' => 'send_mail',
        'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
    );
    // Inviare i dati
    $task_connection->send(json_encode($task_data));
    // Ottenere i risultati in modo asincrono
    $task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result) use($ws_connection)
    {
         // Risultato
         var_dump($task_result);
         // Ricordati di chiudere la connessione asincrona dopo aver ottenuto il risultato
         $task_connection->close();
         // Informare il client websocket corrispondente che il compito è stato completato
         $ws_connection->send('task complete');
    };
    // Eseguire la connessione asincrona
    $task_connection->connect();
};

Worker::runAll();

In questo modo, i compiti pesanti vengono delegati ai processi locali o ad altri server e, una volta completati, i risultati vengono ricevuti in modo asincrono, evitando che il processo aziendale si blocchi.