listen

void Worker::listen(void)

Usado para executar a escuta após a instância do Worker ser criada.

Este método é principalmente utilizado para criar dinamicamente novas instâncias de Worker após o processo do Worker ter sido iniciado, permitindo que o mesmo processo escute várias portas e suporte vários protocolos. É importante notar que usar este método apenas adiciona a escuta no processo atual, não cria novos processos dinamicamente e não aciona o método onWorkerStart.

Por exemplo, após um Worker HTTP ser iniciado, uma instância de um Worker websocket pode ser criada, permitindo que este processo possa ser acessado tanto via protocolo HTTP quanto via protocolo websocket. Como o Worker websocket e o Worker HTTP estão no mesmo processo, eles podem acessar variáveis de memória compartilhadas e compartilhar todas as conexões de socket. Isso permite que requisições HTTP sejam recebidas e, em seguida, que o cliente websocket seja manipulado para completar a funcionalidade de envio de dados para o cliente.

Atenção:

Se a versão do PHP <= 7.0, não será possível instanciar Workers na mesma porta em múltiplos subprocessos. Por exemplo, se o processo A criar um Worker escutando a porta 2016, o processo B não poderá criar outro Worker escutando a mesma porta 2016, caso contrário, ocorrerá um erro Address already in use. O código abaixo é inexecutável.

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
// 4 processos
$worker->count = 4;
// Após cada processo ser iniciado, adiciona um novo Worker para escutar
$worker->onWorkerStart = function($worker)
{
    /**
     * Quando os 4 processos começam, todos criam Workers na porta 2016
     * Ao executar worker->listen() apresentará erro Address already in use
     * Se worker->count=1, não haverá erro
     */
    $inner_worker = new Worker('http://0.0.0.0:2016');
    $inner_worker->onMessage = 'on_message';
    // Executa a escuta. Aqui apresentará erro Address already in use
    $inner_worker->listen();
};

$worker->onMessage = 'on_message';

function on_message(TcpConnection $connection, $data)
{
    $connection->send("hello\n");
}

// Rodar o worker
Worker::runAll();

Se a sua versão do PHP >= 7.0, você pode definir Worker->reusePort=true, permitindo que múltiplos subprocessos criem Workers na mesma porta. Veja o exemplo abaixo:

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker('text://0.0.0.0:2015');
// 4 processos
$worker->count = 4;
// Após cada processo ser iniciado, adiciona um novo Worker para escutar
$worker->onWorkerStart = function($worker)
{
    $inner_worker = new Worker('http://0.0.0.0:2016');
    // Define reutilização de porta, permite a criação de Workers escutando na mesma porta (necessário PHP>=7.0)
    $inner_worker->reusePort = true;
    $inner_worker->onMessage = 'on_message';
    // Executa a escuta. A escuta funciona normalmente e não apresenta erros
    $inner_worker->listen();
};

$worker->onMessage = 'on_message';

function on_message(TcpConnection $connection, $data)
{
    $connection->send("hello\n");
}

// Rodar o worker
Worker::runAll();

Exemplo de envio de mensagens em tempo real de um backend PHP para o cliente

Princípio:

  1. Estabelecer um Worker websocket, para manter a conexão longa com o cliente.

  2. O Worker websocket cria internamente um Worker text.

  3. O Worker websocket e o Worker text estão no mesmo processo, permitindo compartilhamento fácil da conexão do cliente.

  4. Um sistema backend PHP independente se comunica com o Worker text via protocolo text.

  5. O Worker text manipula a conexão websocket para completar o envio de dados.

Código e Passos

push.php

<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// Inicializa um container Worker, escutando na porta 1234
$worker = new Worker('websocket://0.0.0.0:1234');

/*
 * Atenção, o número de processos deve ser definido como 1
 */
$worker->count = 1;
// Após o processo do worker iniciar, cria um Worker text para abrir uma porta de comunicação interna
$worker->onWorkerStart = function($worker)
{
    // Abre uma porta interna, facilitando que sistemas internos enviem dados, formato do protocolo Text texto + caractere de nova linha
    $inner_text_worker = new Worker('text://0.0.0.0:5678');
    $inner_text_worker->onMessage = function(TcpConnection $connection, $buffer)
    {
        // Formato do array $data, contém uid, que indica para qual uid enviar dados
        $data = json_decode($buffer, true);
        $uid = $data['uid'];
        // Através do workerman, envia dados para a página do uid
        $ret = sendMessageByUid($uid, $buffer);
        // Retorna o resultado do envio
        $connection->send($ret ? 'ok' : 'fail');
    };
    // ## Executa a escuta ##
    $inner_text_worker->listen();
};
// Nova propriedade para manter o mapeamento de uid para conexão
$worker->uidConnections = array();
// Função de callback executada quando um cliente envia uma mensagem
$worker->onMessage = function(TcpConnection $connection, $data)
{
    global $worker;
    // Verifica se o cliente atual já foi autenticado, ou seja, se o uid foi definido
    if(!isset($connection->uid))
    {
       // Se não autenticado, considera o primeiro pacote como uid (aqui para facilitar a demonstração, não foi feita uma verdadeira autenticação)
       $connection->uid = $data;
       /* Salva o mapeamento de uid para conexão, permitindo fácil busca de conexão pelo uid,
        * habilitando o envio de dados específicos a um uid
        */
       $worker->uidConnections[$connection->uid] = $connection;
       return;
    }
};

// Quando um cliente se desconecta
$worker->onClose = function(TcpConnection $connection)
{
    global $worker;
    if(isset($connection->uid))
    {
        // Remove o mapeamento quando a conexão é desconectada
        unset($worker->uidConnections[$connection->uid]);
    }
};

// Enviar dados para todos os usuários autenticados
function broadcast($message)
{
   global $worker;
   foreach($worker->uidConnections as $connection)
   {
        $connection->send($message);
   }
}

// Enviar dados para o uid específico
function sendMessageByUid($uid, $message)
{
    global $worker;
    if(isset($worker->uidConnections[$uid]))
    {
        $connection = $worker->uidConnections[$uid];
        $connection->send($message);
        return true;
    }
    return false;
}

// Rodar todos os workers
Worker::runAll();

Iniciar o serviço backend
php push.php start -d

Código JavaScript do frontend para receber as mensagens

var ws = new WebSocket('ws://127.0.0.1:1234');
ws.onopen = function(){
    var uid = 'uid1';
    ws.send(uid);
};
ws.onmessage = function(e){
    alert(e.data);
};

Código do backend para enviar mensagens

// Estabelecer conexão socket com a porta de envio interna
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// Dados a serem enviados, contendo o campo uid, indicando para qual uid está enviando
$data = array('uid'=>'uid1', 'percent'=>'88%');
// Enviar dados, atenção, a porta 5678 é para o protocolo Text, que requer caractere de nova linha no final dos dados
fwrite($client, json_encode($data)."\n");
// Ler resultado do envio
echo fread($client, 8192);