слушать
void Worker::listen(void)
Используется для запуска прослушивания после создания экземпляра Worker.
Этот метод в основном используется для динамического создания нового экземпляра Worker после запуска рабочего процесса, что позволяет одному процессу прослушивать несколько портов и поддерживать несколько протоколов. Следует отметить, что использование этого метода приводит только к добавлению прослушивания в текущем процессе, но не создает новый процесс динамически и не вызывает метод onWorkerStart.
Например, после запуска http Worker можно создать экземпляр websocket Worker, чтобы этот процесс мог обслуживать запросы как по протоколу http, так и по протоколу websocket. Поскольку websocket Worker и http Worker находятся в одном процессе, они могут обмениваться общими переменными в памяти и использовать общие соединения с сокетами. Это позволяет принимать запросы по протоколу http, после чего взаимодействовать с клиентом через websocket для передачи данных.
Примечание:
Если версия PHP <=7.0, то нельзя создавать Worker, слушающие один и тот же порт в нескольких дочерних процессах. Например, если процесс A создал Worker, слушающий порт 2016, то процесс B не сможет создать Worker, слушающий порт 2016, иначе возникнет ошибка Address already in use
. Например, следующий код не сможет быть выполнен.
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
// 4 процесса
$worker->count = 4;
// После запуска каждого процесса создаем Worker для прослушивания текущего порта в этом процессе
$worker->onWorkerStart = function($worker)
{
/**
* При запуске 4 процессов каждый будет создавать Worker для порта 2016
* При вызове worker->listen() возникнет ошибка Address already in use
* Если worker->count=1, то ошибка не возникнет
*/
$inner_worker = new Worker('http://0.0.0.0:2016');
$inner_worker->onMessage = 'on_message';
// Начинаем прослушивание. Здесь возникнет ошибка Address already in use
$inner_worker->listen();
};
$worker->onMessage = 'on_message';
function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}
// Запуск worker
Worker::runAll();
Если ваша версия PHP >=7.0, можно установить Worker->reusePort=true, чтобы создавать Worker, слушающие один и тот же порт в нескольких дочерних процессах. Пример ниже:
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('text://0.0.0.0:2015');
// 4 процесса
$worker->count = 4;
// После запуска каждого процесса создаем Worker для прослушивания текущего порта в этом процессе
$worker->onWorkerStart = function($worker)
{
$inner_worker = new Worker('http://0.0.0.0:2016');
// Установка переиспользования порта, что позволяет создавать прослушивание одного и того же порта в разных процессах (требуется PHP >=7.0)
$inner_worker->reusePort = true;
$inner_worker->onMessage = 'on_message';
// Начинаем прослушивание. В этом случае прослушивание будет выполнено без ошибок
$inner_worker->listen();
};
$worker->onMessage = 'on_message';
function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}
// Запуск worker
Worker::runAll();
Пример: немедленная отправка сообщений от сервера PHP клиенту
Принцип:
- Создать WebSocket Worker для поддержания длительного соединения с клиентом.
- Внутри WebSocket Worker создать text Worker.
- WebSocket Worker и text Worker находятся в одном процессе, что облегчает обмен клиентскими соединениями.
- Некоторая отдельная PHP-система на сервере общается с text Worker через протокол text.
- text Worker управляет соединением WebSocket для отправки данных.
Код и шаги:
push.php
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// Инициализация контейнера для рабочих процессов, прослушивающих порт 1234
$worker = new Worker('websocket://0.0.0.0:1234');
/*
* Обратите внимание, что количество процессов должно быть установлено равным 1
*/
$worker->count = 1;
// После запуска рабочего процесса создаем text Worker для установки внутреннего порта общения
$worker->onWorkerStart = function($worker)
{
// Открытие внутреннего порта для удобства отправки данных внутренними системами, формат протокола Text: текст+символ конца строки
$inner_text_worker = new Worker('text://0.0.0.0:5678');
$inner_text_worker->onMessage = function(TcpConnection $connection, $buffer)
{
// Массив данных содержит uid, указывающий, кому отправлять данные
$data = json_decode($buffer, true);
$uid = $data['uid'];
// Через workerman отправляем данные на страницу с uid
$ret = sendMessageByUid($uid, $buffer);
// Возвращаем результат отправки
$connection->send($ret ? 'ok' : 'fail');
};
// ## Начинаем прослушивание ##
$inner_text_worker->listen();
};
// Добавляем свойство, чтобы сохранить соответствие uid и соединений
$worker->uidConnections = array();
// Когда клиент отправляет сообщение
$worker->onMessage = function(TcpConnection $connection, $data)
{
global $worker;
// Проверяем, прошла ли аутентификация текущего клиента, т.е. установлен ли uid
if(!isset($connection->uid))
{
// Если аутентификация не прошла, то первый пакет используем в качестве uid (для демонстрации, без проведения реальной аутентификации)
$connection->uid = $data;
/* Сохраняем соответствие uid и соединений, что позволяет легко найти соединение по uid и отправлять сообщения каждому отдельному uid */
$worker->uidConnections[$connection->uid] = $connection;
return;
}
};
// Когда у клиента закрыто соединение
$worker->onClose = function(TcpConnection $connection)
{
global $worker;
if(isset($connection->uid))
{
// При разрыве соединения удаляем соответствие
unset($worker->uidConnections[$connection->uid]);
}
};
// Отправка данных всем аутентифицированным пользователям
function broadcast($message)
{
global $worker;
foreach($worker->uidConnections as $connection)
{
$connection->send($message);
}
}
// Отправка данных определенному uid
function sendMessageByUid($uid, $message)
{
global $worker;
if(isset($worker->uidConnections[$uid]))
{
$connection = $worker->uidConnections[$uid];
$connection->send($message);
return true;
}
return false;
}
// Запуск всех рабочих процессов
Worker::runAll();
Запуск сервера
php push.php start -d
JS-код для приема полученного PUSH
var ws = new WebSocket('ws://127.0.0.1:1234');
ws.onopen = function(){
var uid = 'uid1';
ws.send(uid);
};
ws.onmessage = function(e){
alert(e.data);
};
Код для отправки сообщений с сервера
// Устанавливаем соединение с внутренним портом для PUSH-уведомлений
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// Данные для отправки содержат поле uid, указывающее, кому отправлять
$data = array('uid'=>'uid1', 'percent'=>'88%');
// Отправляем данные, обратите внимание, что порт 5678 использует протокол Text, который требует символ конца строки в конце данных
fwrite($client, json_encode($data)."\n");
// Получаем результат отправки
echo fread($client, 8192);