Wie man asynchrone Aufgaben umsetzt

Frage:

Wie kann man schwere Geschäftsprozesse asynchron bearbeiten, um zu vermeiden, dass das Hauptgeschäft lange blockiert wird? Zum Beispiel, wenn ich 1000 Benutzern E-Mails senden möchte, ist dieser Prozess sehr langsam und könnte mehrere Sekunden blockieren. Während dieser Zeit wird der Hauptprozess blockiert, was sich auf nachfolgende Anforderungen auswirkt. Wie kann man solche schweren Aufgaben an andere Prozesse zur asynchronen Verarbeitung übertragen?

Antwort:

Man kann auf dem eigenen Rechner oder auf anderen Servern, sogar in einem Servercluster, einige Aufgabenprozesse einrichten, um schwere Geschäftsprozesse zu bearbeiten. Die Anzahl der Aufgabenprozesse kann höher sein, zum Beispiel das 10-fache der CPU-Anzahl. Der Aufrufer nutzt AsyncTcpConnection, um Daten asynchron an diese Aufgabenprozesse zu senden und erhält asynchron die Verarbeitungsergebnisse.

Aufgabenprozess-Server

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// task worker, Verwendung des Textprotokolls
$task_worker = new Worker('Text://0.0.0.0:12345');
// Anzahl der Auftragsprozesse kann nach Bedarf erhöht werden
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
     // Angenommen, es handelt sich um JSON-Daten
     $task_data = json_decode($task_data, true);
     // Behandle die entsprechende Aufgabenlogik basierend auf task_data.... Ergebnis erhalten, hier weggelassen....
     $task_result = ......
     // Ergebnis senden
     $connection->send(json_encode($task_result));
};
Worker::runAll();

Aufruf in Workerman

use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// websocket Dienst
$worker = new Worker('websocket://0.0.0.0:8080');

$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
    // Stelle eine asynchrone Verbindung zum entfernten Task-Service her, die IP ist die IP des entfernten Task-Service, wenn es sich um den lokalen Rechner handelt, ist es 127.0.0.1, wenn es sich um ein Cluster handelt, dann die LVS-IP
    $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
    // Auftrags- und Parameterdaten
    $task_data = array(
        'function' => 'send_mail',
        'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
    );
    // Daten senden
    $task_connection->send(json_encode($task_data));
    // Asynchron Ergebnis erhalten
    $task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result) use ($ws_connection)
    {
         // Ergebnis
         var_dump($task_result);
         // Vergiss nicht, die asynchrone Verbindung zu schließen, nachdem das Ergebnis erhalten wurde
         $task_connection->close();
         // Benachrichtige den entsprechenden Websocket-Client, dass die Aufgabe abgeschlossen ist
         $ws_connection->send('task complete');
    };
    // Führe die asynchrone Verbindung aus
    $task_connection->connect();
};

Worker::runAll();

Durch diese Methode werden die schweren Aufgaben an Prozesse auf dem eigenen Rechner oder auf anderen Servern übergeben. Nach Abschluss der Aufgaben wird das Ergebnis asynchron empfangen, und der Geschäftsprozess wird nicht blockiert.