비동기 작업을 구현하는 방법
질문:
주요 비즈니스가 오랜 시간 차단되는 것을 피하기 위해 어떻게 무거운 비즈니스를 비동기로 처리할 수 있을까요? 예를 들어 1000명의 사용자에게 이메일을 보내야 하는데, 이 과정이 느려서 몇 초 동안 차단될 수 있습니다. 이 과정에서 주요 프로세스가 차단되면 후속 요청에 영향을 미치는데, 이렇게 무거운 작업을 다른 프로세스에 비동기로 처리하도록 맡기는 방법은 무엇인가요?
답변:
로컬 기기나 다른 서버, 심지어 서버 클러스터에 미리 몇 개의 작업 프로세스를 생성하여 무거운 비즈니스를 처리할 수 있습니다. 작업 프로세스의 수는 CPU의 10배 정도로 늘릴 수 있으며, 호출자는 AsyncTcpConnection을 사용하여 이러한 작업 프로세스에 데이터를 비동기로 전송하고 비동기로 처리된 결과를 얻을 수 있습니다.
작업 프로세스 서버
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:12345');
// task进程数可以根据需要多开一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// 假设发来的是json数据
$task_data = json_decode($task_data, true);
// 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....
$task_result = ......
// 发送结果
$connection->send(json_encode($task_result));
};
Worker::runAll();
Workerman에서 호출하기
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// websocket服务
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// 与远程task服务建立异步连接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// 任务及参数数据
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// 发送数据
$task_connection->send(json_encode($task_data));
// 异步获得结果
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
{
// 结果
var_dump($task_result);
// 获得结果后记得关闭异步连接
$task_connection->close();
// 通知对应的websocket客户端任务完成
$ws_connection->send('task complete');
};
// 执行异步连接
$task_connection->connect();
};
Worker::runAll();
이렇게 하면 무거운 작업이 로컬 또는 다른 서버의 프로세스에 맡겨지며, 작업이 완료된 후 비동기로 결과를 받을 수 있으므로 비즈니스 프로세스가 차단되지 않습니다.