listen
void Worker::listen(void)
Используется для выполнения прослушивания после инстанцирования Worker.
Этот метод предназначен для динамического создания новых экземпляров Worker после запуска процесса Worker, что позволяет одному и тому же процессу слушать несколько портов и поддерживать различные протоколы. Необходимо обратить внимание, что данный метод просто добавляет прослушивание в текущий процесс и не создает новых процессов, а также не вызывает метод onWorkerStart.
Например, если http Worker запускается после инстанцирования websocket Worker, тогда этот процесс сможет работать как с http, так и с websocket протоколами. Поскольку websocket Worker и http Worker находятся в одном процессе, они могут обращаться к общим переменным в памяти, делиться всеми сокет-соединениями. Это позволяет принимать http запросы и затем взаимодействовать с websocket клиентом для выполнения отправки данных клиенту.
Важно:
Если версия PHP <= 7.0, то не поддерживается инстанцирование Worker с одинаковым портом в нескольких дочерних процессах. Например, если процесс A создал Worker для прослушивания порта 2016, то процесс B не сможет создать Worker для того же порта 2016, иначе будет выдана ошибка Address already in use. Например, приведенный ниже код не сможет выполниться.
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
// 4 процесса
$worker->count = 4;
// После старта каждого процесса добавляется новый Worker для прослушивания в текущем процессе
$worker->onWorkerStart = function($worker)
{
/**
* При запуске 4-х процессов создается Worker для порта 2016
* При вызове worker->listen() возникнет ошибка Address already in use
* Если worker->count=1, ошибка не возникнет
*/
$inner_worker = new Worker('http://0.0.0.0:2016');
$inner_worker->onMessage = 'on_message';
// Запуск прослушивания. Здесь возникнет ошибка Address already in use
$inner_worker->listen();
};
$worker->onMessage = 'on_message';
function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}
// Запустить worker
Worker::runAll();
Если ваша версия PHP >= 7.0, вы можете установить Worker->reusePort=true, что позволяет нескольким дочерним процессам создавать Worker с одинаковым портом. См. пример ниже:
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('text://0.0.0.0:2015');
// 4 процесса
$worker->count = 4;
// После старта каждого процесса добавляется новый Worker для прослушивания в текущем процессе
$worker->onWorkerStart = function($worker)
{
$inner_worker = new Worker('http://0.0.0.0:2016');
// Включение повторного использования порта позволяет создать Worker для одинаковых портов (требуется PHP >= 7.0)
$inner_worker->reusePort = true;
$inner_worker->onMessage = 'on_message';
// Запуск прослушивания. Нормальное прослушивание не вызовет ошибки
$inner_worker->listen();
};
$worker->onMessage = 'on_message';
function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}
// Запустить worker
Worker::runAll();
Пример: php-бэкэнд для мгновенной отправки сообщений клиенту
Принцип:
-
Создание websocket Worker для поддержания долгого соединения с клиентом
-
Внутри websocket Worker создается text Worker
-
websocket Worker и text Worker находятся в одном процессе, что позволяет удобно делиться соединениями с клиентами
-
Некоторая независимая php-система общается с text Worker по текстовому протоколу
-
text Worker управляет websocket соединением для завершения отправки данных
Код и шаги
push.php
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// Инициализация контейнера worker для прослушивания порта 1234
$worker = new Worker('websocket://0.0.0.0:1234');
/*
* Обратите внимание, что число процессов должно быть 1
*/
$worker->count = 1;
// После запуска worker процесса создается text Worker для открытия внутреннего порта связи
$worker->onWorkerStart = function($worker)
{
// Открытие внутреннего порта, удобного для внутренней системы для отправки данных, формат текста + перевод строки
$inner_text_worker = new Worker('text://0.0.0.0:5678');
$inner_text_worker->onMessage = function(TcpConnection $connection, $buffer)
{
// Формат массива $data, в котором есть uid, указывающий, на какую uid страницу отправить данные
$data = json_decode($buffer, true);
$uid = $data['uid'];
// Отправка данных по uid через workerman
$ret = sendMessageByUid($uid, $buffer);
// Возврат результата отправки
$connection->send($ret ? 'ok' : 'fail');
};
// ## Запуск прослушивания ##
$inner_text_worker->listen();
};
// Новый атрибут для сохранения сопоставления uid к соединению
$worker->uidConnections = array();
// Функция обратного вызова, выполняемая при получении сообщения от клиента
$worker->onMessage = function(TcpConnection $connection, $data)
{
global $worker;
// Проверка, прошел ли текущий клиент аутентификацию, то есть установлен ли uid
if(!isset($connection->uid))
{
// Если не аутентифицирован, то первый пакет принимается как uid (для наглядности проверки настоящей аутентификации в этом примере не реализована)
$connection->uid = $data;
/* Сохранение сопоставления uid к соединению, чтобы можно было легко найти соединение по uid,
* чтобы реализовать отправку данных конкретному uid
*/
$worker->uidConnections[$connection->uid] = $connection;
return;
}
};
// При отключении клиента
$worker->onClose = function(TcpConnection $connection)
{
global $worker;
if(isset($connection->uid))
{
// Удаление сопоставления при отключении
unset($worker->uidConnections[$connection->uid]);
}
};
// Отправка данных всем аутентифицированным пользователям
function broadcast($message)
{
global $worker;
foreach($worker->uidConnections as $connection)
{
$connection->send($message);
}
}
// Отправка данных по uid
function sendMessageByUid($uid, $message)
{
global $worker;
if(isset($worker->uidConnections[$uid]))
{
$connection = $worker->uidConnections[$uid];
$connection->send($message);
return true;
}
return false;
}
// Запустить все worker
Worker::runAll();
Запуск бэкэнд-сервиса
php push.php start -d
JavaScript код для приема сообщений на клиенте
var ws = new WebSocket('ws://127.0.0.1:1234');
ws.onopen = function(){
var uid = 'uid1';
ws.send(uid);
};
ws.onmessage = function(e){
alert(e.data);
};
Код для отправки сообщения с сервера
// Установление сокет-соединения с внутренним портом отправки
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// Данные для отправки, содержащие поле uid, указывающее на этот uid
$data = array('uid'=>'uid1', 'percent'=>'88%');
// Отправка данных, обратите внимание, что порт 5678 - это порт текстового протокола, текстовый протокол требует добавления перевода строки в конце данных
fwrite($client, json_encode($data)."\n");
// Чтение результата отправки
echo fread($client, 8192);