listen

void Worker::listen(void)

Worker 인스턴스를 생성한 후에 리슨을 실행하기 위해 사용됩니다.

이 방법은 Worker 프로세스가 시작된 후 동적으로 새로운 Worker 인스턴스를 생성하는 데 주로 사용되며, 하나의 프로세스가 여러 포트를 리슨할 수 있고 다양한 프로토콜을 지원합니다. 이 방법을 사용할 때 주의할 점은 현재 프로세스에 리슨을 추가하는 것일 뿐, 새로운 프로세스를 동적으로 생성하지 않으며 onWorkerStart 메서드를 트리거하지 않게 됩니다.

예를 들어, http Worker가 시작된 후 websocket Worker를 인스턴스화하면, 이 프로세스는 http 프로토콜을 통해 접근할 수 있을 뿐만 아니라 websocket 프로토콜을 통해서도 접근할 수 있습니다. websocket Worker와 http Worker가 같은 프로세스에 있기 때문에, 그들은 공통 메모리 변수를 접근하고 모든 소켓 연결을 공유할 수 있습니다. http 요청을 수신한 후 websocket 클라이언트를 사용하여 클라이언트에게 데이터를 푸시하는 유사한 효과를 얻을 수 있습니다.

주의:

PHP 버전이 <= 7.0 인 경우, 동일한 포트의 Worker를 여러 자식 프로세스에서 인스턴스화하는 것을 지원하지 않습니다. 예를 들어, A 프로세스가 2016 포트를 리슨하는 Worker를 생성하면, B 프로세스는 2016 포트를 리슨하는 Worker를 생성할 수 없으며, 그렇지 않으면 Address already in use 오류가 발생합니다. 아래의 코드는 실행할 수 없는 코드입니다.

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
// 4개의 프로세스
$worker->count = 4;
// 각 프로세스가 시작된 후 현재 프로세스에 새로운 Worker를 추가하여 리슨
$worker->onWorkerStart = function($worker)
{
    /**
     * 4개의 프로세스가 시작될 때마다 2016 포트의 Worker를 생성합니다.
     * worker->listen()을 실행할 때 Address already in use 오류가 발생합니다.
     * 만약 worker->count=1 이라면 오류가 발생하지 않습니다.
     */
    $inner_worker = new Worker('http://0.0.0.0:2016');
    $inner_worker->onMessage = 'on_message';
    // 리슨 실행. 여기서 Address already in use 오류가 발생합니다.
    $inner_worker->listen();
};

$worker->onMessage = 'on_message';

function on_message(TcpConnection $connection, $data)
{
    $connection->send("hello\n");
}

// 워커 실행
Worker::runAll();

PHP 버전이 >= 7.0인 경우, Worker->reusePort=true를 설정하여 여러 자식 프로세스가 동일한 포트의 Worker를 생성할 수 있습니다. 아래 예제를 참조하십시오:

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('text://0.0.0.0:2015');
// 4개의 프로세스
$worker->count = 4;
// 각 프로세스가 시작된 후 현재 프로세스에 새로운 Worker를 추가하여 리슨
$worker->onWorkerStart = function($worker)
{
    $inner_worker = new Worker('http://0.0.0.0:2016');
    // 포트 재사용 설정, 동일한 포트를 리슨하는 Worker 생성 가능 (PHP >= 7.0 필요)
    $inner_worker->reusePort = true;
    $inner_worker->onMessage = 'on_message';
    // 리슨 실행. 정상적으로 리슨하며 오류가 발생하지 않습니다.
    $inner_worker->listen();
};

$worker->onMessage = 'on_message';

function on_message(TcpConnection $connection, $data)
{
    $connection->send("hello\n");
}

// 모든 워커 실행
Worker::runAll();

예제 php 백엔드 실시간으로 메시지를 클라이언트에게 푸시

원리:

  1. 클라이언트의 장기 연결을 유지하기 위한 websocket Worker를 설정합니다.
  2. websocket Worker 내부에 text Worker를 생성합니다.
  3. websocket Worker와 text Worker는 동일한 프로세스 내에서, 클라이언트 연결을 공유하기 쉽게 만들어집니다.
  4. 특정 독립 php 백엔드 시스템이 text 프로토콜을 통해 text Worker와 통신합니다.
  5. text Worker가 websocket 연결을 통해 데이터를 푸시합니다.

코드 및 단계

push.php

<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// 1234 포트를 리슨하는 워커 컨테이너 초기화
$worker = new Worker('websocket://0.0.0.0:1234');

/*
 * 여기서 프로세스 수는 반드시 1로 설정해야 합니다.
 */
$worker->count = 1;
// worker 프로세스가 시작된 후 내부 통신 포트를 열기 위해 text Worker를 생성합니다.
$worker->onWorkerStart = function($worker)
{
    // 내부 시스템에서 데이터 푸시를 용이하게 하기 위해 내부 포트를 엽니다. Text 프로토콜 형식: 텍스트 + 개행 문자
    $inner_text_worker = new Worker('text://0.0.0.0:5678');
    $inner_text_worker->onMessage = function(TcpConnection $connection, $buffer)
    {
        // $data 배열 형식, uid가 포함되어 해당 uid 페이지로 푸시할 데이터를 나타냅니다.
        $data = json_decode($buffer, true);
        $uid = $data['uid'];
        // workerman을 통해 uid 페이지로 푸시합니다.
        $ret = sendMessageByUid($uid, $buffer);
        // 푸시 결과 반환
        $connection->send($ret ? 'ok' : 'fail');
    };
    // ## 리슨 실행 ##
    $inner_text_worker->listen();
};
// connection과 uid 매핑을 저장하기 위한 새로운 속성 추가
$worker->uidConnections = array();
// 클라이언트로부터 메시지가 수신될 때 실행되는 콜백 함수
$worker->onMessage = function(TcpConnection $connection, $data)
{
    global $worker;
    // 현재 클라이언트가 인증되었는지, 즉 uid가 설정되었는지 확인합니다.
    if(!isset($connection->uid))
    {
       // 인증되지 않은 경우 첫 번째 패킷을 uid로 간주합니다. (여기서는 편의를 위해 실제 인증을 수행하지 않았습니다.)
       $connection->uid = $data;
       /* uid와 connection의 매핑을 저장하여, uid를 통해 connection을 쉽게 찾을 수 있게 합니다.
        * 특정 uid에 대한 데이터 푸시 구현
        */
       $worker->uidConnections[$connection->uid] = $connection;
       return;
    }
};

// 클라이언트 연결이 끊어졌을 때
$worker->onClose = function(TcpConnection $connection)
{
    global $worker;
    if(isset($connection->uid))
    {
        // 연결이 끊어질 때 매핑을 삭제합니다.
        unset($worker->uidConnections[$connection->uid]);
    }
};

// 인증된 모든 사용자에게 데이터 푸시
function broadcast($message)
{
   global $worker;
   foreach($worker->uidConnections as $connection)
   {
        $connection->send($message);
   }
}

// uid에 대한 데이터 푸시
function sendMessageByUid($uid, $message)
{
    global $worker;
    if(isset($worker->uidConnections[$uid]))
    {
        $connection = $worker->uidConnections[$uid];
        $connection->send($message);
        return true;
    }
    return false;
}

// 모든 워커 실행
Worker::runAll();

백엔드 서비스 시작
php push.php start -d

프론트엔드에서 푸시를 수신하는 js 코드

var ws = new WebSocket('ws://127.0.0.1:1234');
ws.onopen = function(){
    var uid = 'uid1';
    ws.send(uid);
};
ws.onmessage = function(e){
    alert(e.data);
};

백엔드에서 메시지를 푸시하는 코드

// 내부 푸시 포트에 대한 소켓 연결 생성
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// 푸시할 데이터, 해당 uid에게 푸시할 것임을 표시하는 uid 필드가 포함됨
$data = array('uid'=>'uid1', 'percent'=>'88%');
// 데이터 전송, 5678 포트는 Text 프로토콜을 사용하는 포트이며, Text 프로토콜은 데이터 끝에 개행 문자를 추가해야 합니다.
fwrite($client, json_encode($data)."\n");
// 푸시 결과 읽기
echo fread($client, 8192);