Come gestire i compiti asincroni
Domanda:
Come gestire in modo asincrono carichi di lavoro pesanti, evitando che il processo principale venga bloccato per lungo tempo. Ad esempio, se devo inviare email a 1000 utenti, questo processo è molto lento e potrebbe bloccare il flusso principale per alcuni secondi, influenzando le richieste successive. Come posso delegare compiti pesanti ad altri processi in modo asincrono?
Risposta:
È possibile creare in anticipo alcuni processi di lavoro per gestire carichi pesanti sulla macchina locale o su altri server, anche su un cluster di server. Puoi aumentare il numero di processi di lavoro, ad esempio fino a 10 volte il numero di CPU, e il chiamante può utilizzare AsyncTcpConnection per inviare dati in modo asincrono a questi processi di lavoro, ricevendo in modo asincrono i risultati del trattamento.
Server dei processi di lavoro
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// processi di lavoro per i compiti, utilizza il protocollo Text
$task_worker = new Worker('Text://0.0.0.0:12345');
// il numero di processi di lavoro può essere aumentato secondo necessità
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// Si suppone che i dati inviati siano in json
$task_data = json_decode($task_data, true);
// Gestire la logica del compito in base a task_data .... ottenere il risultato, qui omesso ....
$task_result = ......
// Inviare il risultato
$connection->send(json_encode($task_result));
};
Worker::runAll();
Chiamata in Workerman
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// servizio websocket
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// Stabilire una connessione asincrona con il servizio task remoto, l'ip è quello del servizio task remoto, se è locale è 127.0.0.1, se è un cluster è l'ip di lvs
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// dati del compito e parametri
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// Inviare i dati
$task_connection->send(json_encode($task_data));
// Ottenere i risultati in modo asincrono
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result) use($ws_connection)
{
// Risultato
var_dump($task_result);
// Ricordati di chiudere la connessione asincrona dopo aver ottenuto il risultato
$task_connection->close();
// Informare il client websocket corrispondente che il compito è stato completato
$ws_connection->send('task complete');
};
// Eseguire la connessione asincrona
$task_connection->connect();
};
Worker::runAll();
In questo modo, i compiti pesanti vengono delegati ai processi locali o ad altri server e, una volta completati, i risultati vengono ricevuti in modo asincrono, evitando che il processo aziendale si blocchi.