비동기 작업을 구현하는 방법

질문:

주요 비즈니스가 오랜 시간 차단되는 것을 피하기 위해 어떻게 무거운 비즈니스를 비동기로 처리할 수 있을까요? 예를 들어 1000명의 사용자에게 이메일을 보내야 하는데, 이 과정이 느려서 몇 초 동안 차단될 수 있습니다. 이 과정에서 주요 프로세스가 차단되면 후속 요청에 영향을 미치는데, 이렇게 무거운 작업을 다른 프로세스에 비동기로 처리하도록 맡기는 방법은 무엇인가요?

답변:

로컬 기기나 다른 서버, 심지어 서버 클러스터에 미리 몇 개의 작업 프로세스를 생성하여 무거운 비즈니스를 처리할 수 있습니다. 작업 프로세스의 수는 CPU의 10배 정도로 늘릴 수 있으며, 호출자는 AsyncTcpConnection을 사용하여 이러한 작업 프로세스에 데이터를 비동기로 전송하고 비동기로 처리된 결과를 얻을 수 있습니다.

작업 프로세스 서버

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:12345');
// task进程数可以根据需要多开一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
     // 假设发来的是json数据
     $task_data = json_decode($task_data, true);
     // 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....
     $task_result = ......
     // 发送结果
     $connection->send(json_encode($task_result));
};
Worker::runAll();

Workerman에서 호출하기

use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// websocket服务
$worker = new Worker('websocket://0.0.0.0:8080');

$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
    // 与远程task服务建立异步连接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
    $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
    // 任务及参数数据
    $task_data = array(
        'function' => 'send_mail',
        'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
    );
    // 发送数据
    $task_connection->send(json_encode($task_data));
    // 异步获得结果
    $task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
    {
         // 结果
         var_dump($task_result);
         // 获得结果后记得关闭异步连接
         $task_connection->close();
         // 通知对应的websocket客户端任务完成
         $ws_connection->send('task complete');
    };
    // 执行异步连接
    $task_connection->connect();
};

Worker::runAll();

이렇게 하면 무거운 작업이 로컬 또는 다른 서버의 프로세스에 맡겨지며, 작업이 완료된 후 비동기로 결과를 받을 수 있으므로 비즈니스 프로세스가 차단되지 않습니다.