listen

void Worker::listen(void)

Используется для выполнения прослушивания после инстанцирования Worker.

Этот метод предназначен для динамического создания новых экземпляров Worker после запуска процесса Worker, что позволяет одному и тому же процессу слушать несколько портов и поддерживать различные протоколы. Необходимо обратить внимание, что данный метод просто добавляет прослушивание в текущий процесс и не создает новых процессов, а также не вызывает метод onWorkerStart.

Например, если http Worker запускается после инстанцирования websocket Worker, тогда этот процесс сможет работать как с http, так и с websocket протоколами. Поскольку websocket Worker и http Worker находятся в одном процессе, они могут обращаться к общим переменным в памяти, делиться всеми сокет-соединениями. Это позволяет принимать http запросы и затем взаимодействовать с websocket клиентом для выполнения отправки данных клиенту.

Важно:

Если версия PHP <= 7.0, то не поддерживается инстанцирование Worker с одинаковым портом в нескольких дочерних процессах. Например, если процесс A создал Worker для прослушивания порта 2016, то процесс B не сможет создать Worker для того же порта 2016, иначе будет выдана ошибка Address already in use. Например, приведенный ниже код не сможет выполниться.

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
// 4 процесса
$worker->count = 4;
// После старта каждого процесса добавляется новый Worker для прослушивания в текущем процессе
$worker->onWorkerStart = function($worker)
{
    /**
     * При запуске 4-х процессов создается Worker для порта 2016
     * При вызове worker->listen() возникнет ошибка Address already in use
     * Если worker->count=1, ошибка не возникнет
     */
    $inner_worker = new Worker('http://0.0.0.0:2016');
    $inner_worker->onMessage = 'on_message';
    // Запуск прослушивания. Здесь возникнет ошибка Address already in use
    $inner_worker->listen();
};

$worker->onMessage = 'on_message';

function on_message(TcpConnection $connection, $data)
{
    $connection->send("hello\n");
}

// Запустить worker
Worker::runAll();

Если ваша версия PHP >= 7.0, вы можете установить Worker->reusePort=true, что позволяет нескольким дочерним процессам создавать Worker с одинаковым портом. См. пример ниже:

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('text://0.0.0.0:2015');
// 4 процесса
$worker->count = 4;
// После старта каждого процесса добавляется новый Worker для прослушивания в текущем процессе
$worker->onWorkerStart = function($worker)
{
    $inner_worker = new Worker('http://0.0.0.0:2016');
    // Включение повторного использования порта позволяет создать Worker для одинаковых портов (требуется PHP >= 7.0)
    $inner_worker->reusePort = true;
    $inner_worker->onMessage = 'on_message';
    // Запуск прослушивания. Нормальное прослушивание не вызовет ошибки
    $inner_worker->listen();
};

$worker->onMessage = 'on_message';

function on_message(TcpConnection $connection, $data)
{
    $connection->send("hello\n");
}

// Запустить worker
Worker::runAll();

Пример: php-бэкэнд для мгновенной отправки сообщений клиенту

Принцип:

  1. Создание websocket Worker для поддержания долгого соединения с клиентом

  2. Внутри websocket Worker создается text Worker

  3. websocket Worker и text Worker находятся в одном процессе, что позволяет удобно делиться соединениями с клиентами

  4. Некоторая независимая php-система общается с text Worker по текстовому протоколу

  5. text Worker управляет websocket соединением для завершения отправки данных

Код и шаги

push.php

<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// Инициализация контейнера worker для прослушивания порта 1234
$worker = new Worker('websocket://0.0.0.0:1234');

/*
 * Обратите внимание, что число процессов должно быть 1
 */
$worker->count = 1;
// После запуска worker процесса создается text Worker для открытия внутреннего порта связи
$worker->onWorkerStart = function($worker)
{
    // Открытие внутреннего порта, удобного для внутренней системы для отправки данных, формат текста + перевод строки
    $inner_text_worker = new Worker('text://0.0.0.0:5678');
    $inner_text_worker->onMessage = function(TcpConnection $connection, $buffer)
    {
        // Формат массива $data, в котором есть uid, указывающий, на какую uid страницу отправить данные
        $data = json_decode($buffer, true);
        $uid = $data['uid'];
        // Отправка данных по uid через workerman
        $ret = sendMessageByUid($uid, $buffer);
        // Возврат результата отправки
        $connection->send($ret ? 'ok' : 'fail');
    };
    // ## Запуск прослушивания ##
    $inner_text_worker->listen();
};
// Новый атрибут для сохранения сопоставления uid к соединению
$worker->uidConnections = array();
// Функция обратного вызова, выполняемая при получении сообщения от клиента
$worker->onMessage = function(TcpConnection $connection, $data)
{
    global $worker;
    // Проверка, прошел ли текущий клиент аутентификацию, то есть установлен ли uid
    if(!isset($connection->uid))
    {
       // Если не аутентифицирован, то первый пакет принимается как uid (для наглядности проверки настоящей аутентификации в этом примере не реализована)
       $connection->uid = $data;
       /* Сохранение сопоставления uid к соединению, чтобы можно было легко найти соединение по uid,
        * чтобы реализовать отправку данных конкретному uid
        */
       $worker->uidConnections[$connection->uid] = $connection;
       return;
    }
};

// При отключении клиента
$worker->onClose = function(TcpConnection $connection)
{
    global $worker;
    if(isset($connection->uid))
    {
        // Удаление сопоставления при отключении
        unset($worker->uidConnections[$connection->uid]);
    }
};

// Отправка данных всем аутентифицированным пользователям
function broadcast($message)
{
   global $worker;
   foreach($worker->uidConnections as $connection)
   {
        $connection->send($message);
   }
}

// Отправка данных по uid
function sendMessageByUid($uid, $message)
{
    global $worker;
    if(isset($worker->uidConnections[$uid]))
    {
        $connection = $worker->uidConnections[$uid];
        $connection->send($message);
        return true;
    }
    return false;
}

// Запустить все worker
Worker::runAll();

Запуск бэкэнд-сервиса
php push.php start -d

JavaScript код для приема сообщений на клиенте

var ws = new WebSocket('ws://127.0.0.1:1234');
ws.onopen = function(){
    var uid = 'uid1';
    ws.send(uid);
};
ws.onmessage = function(e){
    alert(e.data);
};

Код для отправки сообщения с сервера

// Установление сокет-соединения с внутренним портом отправки
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// Данные для отправки, содержащие поле uid, указывающее на этот uid
$data = array('uid'=>'uid1', 'percent'=>'88%');
// Отправка данных, обратите внимание, что порт 5678 - это порт текстового протокола, текстовый протокол требует добавления перевода строки в конце данных
fwrite($client, json_encode($data)."\n");
// Чтение результата отправки
echo fread($client, 8192);