Comment réaliser des tâches asynchrones
Question :
Comment traiter de manière asynchrone des tâches lourdes, afin d'éviter que le processus principal ne soit bloqué pendant une longue période ? Par exemple, je dois envoyer des e-mails à 1000 utilisateurs, ce processus peut être long et risquerait de bloquer pendant plusieurs secondes. Pendant ce temps, le processus principal est bloqué, ce qui peut affecter les requêtes suivantes. Comment déléguer de telles tâches lourdes à d'autres processus pour qu'elles soient traitées de manière asynchrone ?
Réponse :
Il est possible d'établir à l'avance quelques processus de tâches sur la machine locale ou sur d'autres serveurs, voire sur un cluster de serveurs, pour traiter ces tâches lourdes. Le nombre de processus de tâches peut être largement supérieur à celui des CPU, par exemple dix fois plus, puis l'appelant peut utiliser AsyncTcpConnection pour envoyer des données de manière asynchrone à ces processus de tâches pour qu'ils les traitent et retourner les résultats de manière asynchrone.
Serveur des processus de tâches
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// worker de tâche, utilisant le protocole Text
$task_worker = new Worker('Text://0.0.0.0:12345');
// Le nombre de processus de tâches peut être augmenté selon les besoins
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// Supposons que les données envoyées sont au format JSON
$task_data = json_decode($task_data, true);
// Traiter la logique de la tâche correspondante en fonction de task_data.... Obtenir le résultat, ici omis....
$task_result = ......
// Envoyer le résultat
$connection->send(json_encode($task_result));
};
Worker::runAll();
Appel dans Workerman
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// service websocket
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// Établir une connexion asynchrone avec le service de tâches distant, l'IP étant celle du service de tâches distant, si c'est la machine locale, c'est 127.0.0.1, et si c'est un cluster, c'est l'IP de lvs
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// Tâche et données des paramètres
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// Envoyer les données
$task_connection->send(json_encode($task_data));
// Obtenir le résultat de manière asynchrone
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
{
// Résultat
var_dump($task_result);
// N'oubliez pas de fermer la connexion asynchrone après avoir obtenu le résultat
$task_connection->close();
// Informer le client websocket correspondant que la tâche est terminée
$ws_connection->send('tâche complète');
};
// Exécuter la connexion asynchrone
$task_connection->connect();
};
Worker::runAll();
Ainsi, les tâches lourdes sont déléguées à des processus fonctionnant sur la machine locale ou d'autres serveurs, et les résultats seront reçus de manière asynchrone une fois la tâche terminée, ce qui évite que le processus métier ne soit bloqué.