How to Implement Asynchronous Tasks
Question:
How can heavy business tasks be handled asynchronously so that they do not block the main process for a long time? For example, sending emails to 1000 users is slow and may block for several seconds. While it runs, the main flow is blocked and subsequent requests are affected. How can such heavy tasks be handed over to other processes for asynchronous processing?
Answer:
You can start a set of task processes in advance on the local machine, on other servers, or even on a server cluster to handle heavy business operations. It is fine to run more task processes than strictly needed, for example 10 times the number of CPU cores. The caller then uses AsyncTcpConnection to send the task data to these processes asynchronously and receives the processing results asynchronously as well.
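As a rough illustration of the sizing rule of thumb above, the sketch below derives a process count from the number of CPU cores. Both `detect_cpu_cores()` and `task_worker_count()` are hypothetical helpers, not part of Workerman, and the core detection assumes a Linux-style `/proc/cpuinfo` with a fallback value elsewhere.

```php
<?php
// Hypothetical helper: best-effort CPU core detection, with a fallback value.
function detect_cpu_cores(int $fallback = 4): int
{
    // /proc/cpuinfo is Linux-specific; on other systems the fallback is used.
    if (is_readable('/proc/cpuinfo')) {
        $cores = preg_match_all('/^processor\s*:/m', (string) file_get_contents('/proc/cpuinfo'));
        if ($cores > 0) {
            return $cores;
        }
    }
    return $fallback;
}

// Hypothetical helper: apply the "multiple of the CPU core count" rule of thumb.
function task_worker_count(int $cores, int $multiplier = 10): int
{
    return max(1, $cores) * max(1, $multiplier);
}

// In the task server this could be used as:
// $task_worker->count = task_worker_count(detect_cpu_cores());
```

The multiplier is deliberately generous because task processes spend most of their time waiting on I/O (mail servers, databases), not on the CPU.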
Task Process Server
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// Task worker, using the Text protocol
$task_worker = new Worker('Text://0.0.0.0:12345');
// The number of task processes can be increased as needed
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
    // Assume the data received is JSON
    $task_data = json_decode($task_data, true);
    // Handle the corresponding task logic based on $task_data and get the result (omitted here)
    $task_result = ......
    // Send the result
    $connection->send(json_encode($task_result));
};

Worker::runAll();
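The task logic is omitted in the listing above. One possible way to structure it, shown here only as a minimal sketch, is to look up the task's `function` field in an explicit whitelist of handlers so that a caller cannot trigger arbitrary code. The `dispatch_task()` function, the `ok`/`result`/`error` reply shape, and the placeholder `send_mail` handler are all assumptions made for illustration.

```php
<?php
// Hypothetical dispatcher: decodes one task, runs the matching whitelisted
// handler, and returns a JSON-encoded reply string.
function dispatch_task(string $raw, array $handlers): string
{
    $task = json_decode($raw, true);
    if (!is_array($task) || !isset($task['function']) || !is_string($task['function'])) {
        return json_encode(['ok' => false, 'error' => 'invalid task']);
    }
    $name = $task['function'];
    if (!isset($handlers[$name])) {
        return json_encode(['ok' => false, 'error' => 'unknown function']);
    }
    $args = isset($task['args']) && is_array($task['args']) ? $task['args'] : [];
    try {
        // Each handler receives the 'args' array and returns a JSON-serializable value.
        return json_encode(['ok' => true, 'result' => $handlers[$name]($args)]);
    } catch (Throwable $e) {
        // Report failures to the caller instead of letting the process die.
        return json_encode(['ok' => false, 'error' => $e->getMessage()]);
    }
}

// Placeholder handler: a real send_mail would call a mail library here.
$handlers = [
    'send_mail' => function (array $args): array {
        return ['queued_to' => $args['to'] ?? null];
    },
];
```

Inside the `onMessage` callback, the raw message could then be answered with `$connection->send(dispatch_task($task_data, $handlers));`, provided `$handlers` is passed into the closure with `use`.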
Calling from Workerman
<?php
use Workerman\Worker;
use Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// WebSocket service
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
    // Create an asynchronous connection to the remote task service, using that service's IP.
    // For a local service use 127.0.0.1; for a cluster use the LVS IP.
    $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
    // Task and parameter data
    $task_data = array(
        'function' => 'send_mail',
        'args'     => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
    );
    // Send the data (it is buffered until the connection is established)
    $task_connection->send(json_encode($task_data));
    // Obtain the result asynchronously
    $task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result) use($ws_connection)
    {
        // Result
        var_dump($task_result);
        // Don't forget to close the asynchronous connection after obtaining the result
        $task_connection->close();
        // Notify the corresponding WebSocket client that the task is complete
        $ws_connection->send('task complete');
    };
    // Start the asynchronous connection
    $task_connection->connect();
};

Worker::runAll();
In this way, heavy tasks are handed off to processes on the local machine or on other servers, and the results arrive asynchronously once the tasks complete, so the business process is not blocked.
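For the 1000-recipient case from the question, one option is to split the recipient list into batches and hand each batch to a task process as its own task. The sketch below only builds the payloads. `build_mail_tasks()` is a hypothetical helper, and passing an array in `to` is an assumption that the task-side handler would need to support. Because the Text protocol delimits messages with a newline, it helps that `json_encode` escapes any newlines in the mail contents.

```php
<?php
// Hypothetical helper: split recipients into batches and build one JSON task
// payload per batch, reusing the 'function'/'args' shape from the caller example.
function build_mail_tasks(array $recipients, string $from, string $contents, int $batch_size = 100): array
{
    $tasks = [];
    foreach (array_chunk($recipients, max(1, $batch_size)) as $batch) {
        $tasks[] = json_encode([
            'function' => 'send_mail',
            // Assumption: the task-side handler accepts a list of addresses in 'to'.
            'args'     => ['from' => $from, 'to' => $batch, 'contents' => $contents],
        ]);
    }
    return $tasks;
}
```

Each payload could then be sent over its own AsyncTcpConnection in the same way as the single task in the caller example, so that several task processes work on the batches in parallel.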