listen
void Worker::listen(void)
Workerをインスタンス化した後にリスニングを実行するためのメソッドです。
このメソッドは主に、Workerプロセスの起動後に新しいWorkerインスタンスを動的に作成するために使用され、同じプロセスで複数のポートをリスニングし、さまざまなプロトコルをサポートします。ただし、この方法でリスニングを追加することは、現在のプロセスでのみ行われ、動的に新しいプロセスを作成したり、onWorkerStartメソッドをトリガーしたりすることはありません。
例えば、http Workerが起動した後にwebsocket Workerをインスタンス化すると、このプロセスはhttpプロトコルとwebsocketプロトコルの両方にアクセスできます。websocket Workerとhttp Workerが同じプロセス内にあるため、共通のメモリ変数にアクセスし、すべてのソケット接続を共有できます。これにより、httpリクエストを受信し、websocketクライアントを操作してデータをクライアントにプッシュするような効果を実現できます。
注意:
PHPバージョンが<=7.0の場合、同じポートのWorkerを複数の子プロセスでインスタンス化することはサポートされていません。たとえば、プロセスAが2016ポートをリスニングするWorkerを作成した場合、プロセスBは再び2016ポートをリスニングするWorkerを作成できず、そうするとAddress already in useエラーが発生します。以下のコードは実行できません。
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
// 4プロセス
$worker->count = 4;
// 各プロセス起動後に現在のプロセスに新しいWorkerをリスニングさせる
$worker->onWorkerStart = function($worker)
{
/**
* 4つのプロセスが起動する際に2016ポートのWorkerを作成
* worker->listen()を実行すると、Address already in useエラーが表示される
* worker->count=1の場合はエラーが表示されない
*/
$inner_worker = new Worker('http://0.0.0.0:2016');
$inner_worker->onMessage = 'on_message';
// リスニングを実行します。ここでAddress already in useエラーが表示されます
$inner_worker->listen();
};
$worker->onMessage = 'on_message';
function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}
// workerを実行
Worker::runAll();
PHPバージョンが>=7.0の場合、Worker->reusePort=trueを設定することで、複数の子プロセスが同じポートのWorkerを作成できるようになります。以下の例を参照してください:
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('text://0.0.0.0:2015');
// 4プロセス
$worker->count = 4;
// 各プロセス起動後に現在のプロセスに新しいWorkerをリスニングさせる
$worker->onWorkerStart = function($worker)
{
$inner_worker = new Worker('http://0.0.0.0:2016');
// ポート再利用を設定し、同じポートのWorkerをリスニングできるようにします(PHP>=7.0が必要です)
$inner_worker->reusePort = true;
$inner_worker->onMessage = 'on_message';
// リスニングを実行します。正常にリスニングし、エラーは表示されません
$inner_worker->listen();
};
$worker->onMessage = 'on_message';
function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}
// workerを実行
Worker::runAll();
例:phpバックエンドがクライアントに即時にメッセージをプッシュする
原理:
-
クライアントの長期接続を維持するためにwebsocket Workerを作成します。
-
websocket Workerの内部にtext Workerを作成します。
-
websocket Workerとtext Workerは同じプロセス内にあり、クライアント接続を簡単に共有できます。
-
特定の独立したphpバックエンドシステムがtextプロトコルを使用してtext Workerと通信します。
-
text Workerがwebsocket接続を操作してデータをプッシュします。
コードと手順
push.php
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// 1234ポートをリスニングするworkerコンテナを初期化
$worker = new Worker('websocket://0.0.0.0:1234');
/*
* プロセス数は1に設定する必要があることに注意してください
*/
$worker->count = 1;
// workerプロセス起動後に内部通信ポートを開くためにtext Workerを作成します
$worker->onWorkerStart = function($worker)
{
// 内部システムがデータをプッシュしやすくするために内部ポートを開く、Textプロトコル形式はテキスト+改行
$inner_text_worker = new Worker('text://0.0.0.0:5678');
$inner_text_worker->onMessage = function(TcpConnection $connection, $buffer)
{
// $data配列形式、uidを含み、どのuidのページにデータをプッシュするかを示します
$data = json_decode($buffer, true);
$uid = $data['uid'];
// workermanを通じてuidのページにデータをプッシュします
$ret = sendMessageByUid($uid, $buffer);
// プッシュ結果を返します
$connection->send($ret ? 'ok' : 'fail');
};
// ## リスニングを実行 ##
$inner_text_worker->listen();
};
// uidからconnectionへのマッピングを保存するための新しいプロパティを追加
$worker->uidConnections = array();
// クライアントからメッセージが送信されたときに実行されるコールバック関数
$worker->onMessage = function(TcpConnection $connection, $data)
{
global $worker;
// 現在のクライアントがすでに認証されているかどうか、すなわちuidが設定されているかを確認します
if(!isset($connection->uid))
{
// 認証されていない場合、最初のパケットをuidとみなします(これはデモの便宜上、実際の認証は行っていません)
$connection->uid = $data;
/* uidからconnectionへのマッピングを保存します。これによりuidを通じてconnectionを簡単に検索し、
* 特定のuidに対してデータをプッシュできます
*/
$worker->uidConnections[$connection->uid] = $connection;
return;
}
};
// クライアントの接続が切断されたとき
$worker->onClose = function(TcpConnection $connection)
{
global $worker;
if(isset($connection->uid))
{
// 接続が切断されたときにマッピングを削除します
unset($worker->uidConnections[$connection->uid]);
}
};
// 認証されたすべてのユーザーにデータをプッシュします
function broadcast($message)
{
global $worker;
foreach($worker->uidConnections as $connection)
{
$connection->send($message);
}
}
// uidに対してデータをプッシュします
function sendMessageByUid($uid, $message)
{
global $worker;
if(isset($worker->uidConnections[$uid]))
{
$connection = $worker->uidConnections[$uid];
$connection->send($message);
return true;
}
return false;
}
// すべてのworkerを実行する
Worker::runAll();
バックエンドサービスを起動します
php push.php start -d
フロントエンドが受信したプッシュメッセージのjsコード
var ws = new WebSocket('ws://127.0.0.1:1234');
ws.onopen = function(){
var uid = 'uid1';
ws.send(uid);
};
ws.onmessage = function(e){
alert(e.data);
};
バックエンドがメッセージをプッシュするコード
// 内部プッシュポートにソケット接続を確立します
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// プッシュされるデータ、uidフィールドを含み、このuidに対してプッシュされることを示します
$data = array('uid'=>'uid1', 'percent'=>'88%');
// データを送信します。5678ポートはTextプロトコルのポートであり、Textプロトコルではデータの末尾に改行を追加する必要があります
fwrite($client, json_encode($data)."\n");
// プッシュ結果を読み取ります
echo fread($client, 8192);