listen
void Worker::listen(void)
Được sử dụng để thực hiện lắng nghe sau khi khởi tạo Worker.
Phương thức này chủ yếu được sử dụng để tạo ra các instance Worker mới một cách động sau khi quá trình Worker khởi động, cho phép một quá trình lắng nghe nhiều cổng khác nhau và hỗ trợ nhiều giao thức. Cần lưu ý rằng việc sử dụng phương thức này chỉ thêm lắng nghe trong quá trình hiện tại và không tạo ra các quá trình mới một cách động, cũng không kích hoạt phương thức onWorkerStart.
Ví dụ, một Worker http khởi động rồi tạo ra một Worker websocket, thì quá trình này có thể truy cập cả hai giao thức http và websocket. Do Worker websocket và Worker http nằm trong cùng một quá trình, nên chúng có thể truy cập các biến bộ nhớ chung và chia sẻ tất cả các kết nối socket. Điều này cho phép nhận các yêu cầu http và sau đó thao tác với các client websocket để thực hiện việc đẩy dữ liệu đến client.
Chú ý:
Nếu phiên bản PHP <= 7.0, thì không hỗ trợ việc tạo các Worker với cùng một cổng trong nhiều quá trình con. Ví dụ, nếu quá trình A tạo một Worker lắng nghe cổng 2016, thì quá trình B không thể tạo thêm một Worker lắng nghe cổng 2016 nữa, nếu không sẽ báo lỗi Address already in use. Ví dụ dưới đây là không thể chạy được.
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
// 4 quá trình
$worker->count = 4;
// Mỗi quá trình khởi động sau đó thêm một Worker lắng nghe
$worker->onWorkerStart = function($worker)
{
/**
* Khi 4 quá trình khởi động đều tạo ra Worker lắng nghe cổng 2016
* Khi thực hiện đến worker->listen() sẽ báo lỗi Address already in use
* Nếu worker->count=1 thì sẽ không có lỗi
*/
$inner_worker = new Worker('http://0.0.0.0:2016');
$inner_worker->onMessage = 'on_message';
// Thực hiện lắng nghe. Ở đây sẽ báo lỗi Address already in use
$inner_worker->listen();
};
$worker->onMessage = 'on_message';
function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}
// Chạy worker
Worker::runAll();
Nếu phiên bản PHP của bạn >= 7.0, bạn có thể thiết lập Worker->reusePort=true, điều này cho phép nhiều quá trình con tạo ra các Worker với cùng một cổng. Xem ví dụ dưới đây:
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker('text://0.0.0.0:2015');
// 4 quá trình
$worker->count = 4;
// Mỗi quá trình khởi động sau đó thêm một Worker lắng nghe
$worker->onWorkerStart = function($worker)
{
$inner_worker = new Worker('http://0.0.0.0:2016');
// Thiết lập tái sử dụng cổng, có thể tạo ra Worker lắng nghe cùng một cổng (yêu cầu PHP >= 7.0)
$inner_worker->reusePort = true;
$inner_worker->onMessage = 'on_message';
// Thực hiện lắng nghe. Lắng nghe bình thường sẽ không báo lỗi
$inner_worker->listen();
};
$worker->onMessage = 'on_message';
function on_message(TcpConnection $connection, $data)
{
$connection->send("hello\n");
}
// Chạy worker
Worker::runAll();
Ví dụ php backend gửi tin nhắn kịp thời đến client
Nguyên lý:
-
Xây dựng một Worker websocket, dùng để duy trì kết nối dài hạn với client.
-
Worker websocket bên trong tạo ra một Worker text.
-
Worker websocket và Worker text nằm trong cùng một quá trình, có thể chia sẻ kết nối client một cách tiện lợi.
-
Một hệ thống backend PHP độc lập thông qua giao thức text để giao tiếp với Worker text.
-
Worker text thao tác với kết nối websocket để hoàn thành việc đẩy dữ liệu.
Mã và các bước
push.php
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// Khởi tạo một container worker, lắng nghe cổng 1234
$worker = new Worker('websocket://0.0.0.0:1234');
/*
* Lưu ý rằng số lượng quá trình phải được thiết lập là 1
*/
$worker->count = 1;
// Khi quá trình worker khởi động thì tạo ra một Worker text để mở một cổng giao tiếp nội bộ
$worker->onWorkerStart = function($worker)
{
// Khởi động một cổng nội bộ, tiện lợi cho hệ thống nội bộ đẩy dữ liệu, định dạng giao thức Text là văn bản + ký tự xuống dòng
$inner_text_worker = new Worker('text://0.0.0.0:5678');
$inner_text_worker->onMessage = function(TcpConnection $connection, $buffer)
{
// Định dạng mảng $data, trong đó có uid, biểu thị dữ liệu được đẩy đến trang của uid đó
$data = json_decode($buffer, true);
$uid = $data['uid'];
// Thông qua workerman, đẩy dữ liệu đến trang của uid
$ret = sendMessageByUid($uid, $buffer);
// Trả về kết quả đẩy
$connection->send($ret ? 'ok' : 'fail');
};
// ## Thực hiện lắng nghe ##
$inner_text_worker->listen();
};
// Thêm một thuộc tính mới để lưu giữ ánh xạ uid đến connection
$worker->uidConnections = array();
// Hàm callback thực thi khi có tin nhắn từ client
$worker->onMessage = function(TcpConnection $connection, $data)
{
global $worker;
// Kiểm tra xem client hiện tại đã xác thực chưa, tức là đã có uid hay chưa
if(!isset($connection->uid))
{
// Nếu chưa xác thực thì xem gói tin đầu tiên là uid (để tiện cho việc trình diễn, không thực hiện xác thực thực sự)
$connection->uid = $data;
/* Lưu uid vào ánh xạ connection, như vậy có thể dễ dàng tìm kiếm connection qua uid,
* thực hiện gửi dữ liệu cho uid cụ thể
*/
$worker->uidConnections[$connection->uid] = $connection;
return;
}
};
// Khi client ngắt kết nối
$worker->onClose = function(TcpConnection $connection)
{
global $worker;
if(isset($connection->uid))
{
// Khi ngắt kết nối sẽ xóa ánh xạ
unset($worker->uidConnections[$connection->uid]);
}
};
// Đẩy dữ liệu đến tất cả người dùng đã xác thực
function broadcast($message)
{
global $worker;
foreach($worker->uidConnections as $connection)
{
$connection->send($message);
}
}
// Đẩy dữ liệu theo uid
function sendMessageByUid($uid, $message)
{
global $worker;
if(isset($worker->uidConnections[$uid]))
{
$connection = $worker->uidConnections[$uid];
$connection->send($message);
return true;
}
return false;
}
// Chạy tất cả các worker
Worker::runAll();
Khởi động dịch vụ backend
php push.php start -d
Mã js phía frontend nhận đẩy
var ws = new WebSocket('ws://127.0.0.1:1234');
ws.onopen = function(){
var uid = 'uid1';
ws.send(uid);
};
ws.onmessage = function(e){
alert(e.data);
};
Mã máy chủ đẩy tin nhắn
// Tạo kết nối socket đến cổng đẩy nội bộ
$client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
// Dữ liệu để đẩy, chứa trường uid, có nghĩa là đẩy đến uid này
$data = array('uid'=>'uid1', 'percent'=>'88%');
// Gửi dữ liệu, lưu ý cổng 5678 là cổng giao thức Text, giao thức Text cần thêm ký tự xuống dòng ở cuối dữ liệu
fwrite($client, json_encode($data)."\n");
// Đọc kết quả đẩy
echo fread($client, 8192);