Cách thực hiện nhiệm vụ bất đồng bộ
Hỏi:
Làm thế nào để xử lý công việc nặng một cách bất đồng bộ để tránh việc chính bị chặn lâu dài? Ví dụ, tôi muốn gửi email cho 1000 người dùng, quá trình này rất chậm, có thể mất vài giây, trong suốt quá trình này do quy trình chính bị chặn, điều đó sẽ ảnh hưởng đến các yêu cầu tiếp theo, làm thế nào để giao những nhiệm vụ nặng nề như vậy cho các tiến trình khác xử lý một cách bất đồng bộ.
Đáp:
Bạn có thể thiết lập một số tiến trình nhiệm vụ trên máy này hoặc trên các máy chủ khác thậm chí là cụm máy chủ để xử lý các công việc nặng. Số lượng tiến trình nhiệm vụ có thể mở nhiều hơn, chẳng hạn như gấp 10 lần số CPU, sau đó bên gọi sử dụng AsyncTcpConnection để gửi dữ liệu một cách bất đồng bộ đến các tiến trình nhiệm vụ này để xử lý và nhận được kết quả một cách bất đồng bộ.
Máy chủ tiến trình nhiệm vụ
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// task worker, sử dụng giao thức Text
$task_worker = new Worker('Text://0.0.0.0:12345');
// số lượng tiến trình nhiệm vụ có thể mở rộng theo nhu cầu
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// giả sử dữ liệu gửi đến là json
$task_data = json_decode($task_data, true);
// xử lý logic nhiệm vụ tương ứng dựa trên task_data.... nhận được kết quả, ở đây bỏ qua....
$task_result = ......
// gửi kết quả
$connection->send(json_encode($task_result));
};
Worker::runAll();
Gọi trong Workerman
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// dịch vụ websocket
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// thiết lập kết nối bất đồng bộ với dịch vụ nhiệm vụ từ xa, ip là ip của dịch vụ nhiệm vụ từ xa, nếu là máy này thì là 127.0.0.1, nếu là cụm thì là ip của lvs
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// dữ liệu nhiệm vụ và tham số
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// gửi dữ liệu
$task_connection->send(json_encode($task_data));
// nhận kết quả một cách bất đồng bộ
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
{
// kết quả
var_dump($task_result);
// sau khi nhận được kết quả nhớ đóng kết nối bất đồng bộ
$task_connection->close();
// thông báo cho client websocket tương ứng rằng nhiệm vụ đã hoàn thành
$ws_connection->send('task complete');
};
// thực hiện kết nối bất đồng bộ
$task_connection->connect();
};
Worker::runAll();
Bằng cách này, các công việc nặng nề được giao cho các tiến trình trên máy này hoặc các máy chủ khác để thực hiện, và sau khi nhiệm vụ hoàn thành, bạn sẽ nhận được kết quả một cách bất đồng bộ, quy trình kinh doanh sẽ không bị chặn.