Example 1
(Requires Workerman version >= 3.3.0)
A multi-process group push system based on Worker
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// Channel server: the hub through which Channel clients exchange events (listens on port 2206)
$channel_server = new Channel\Server('0.0.0.0', 2206);
// WebSocket worker that accepts client connections on port 1234
$worker = new Worker('websocket://0.0.0.0:1234');
// Run 8 worker processes; a client connection belongs to exactly one of them
$worker->count = 8;
// Global mapping array from groups to connections
// Shape: array(group_id => array(connection_id => connection)).
// Every worker process has its own copy, holding only the connections it owns.
$group_con_map = array();
// Runs when a worker process starts: connect it to the Channel server and
// subscribe it to group push events.
$worker->onWorkerStart = function(){
    // Channel client connects to Channel server
    Channel\Client::connect('127.0.0.1', 2206);

    // Delivers one published group message to the group members held by this process
    $deliver_to_group = function($event_data){
        global $group_con_map;
        $target_group = $event_data['group_id'];
        $payload = $event_data['message'];
        // Debug output: the groups currently known to this process
        var_dump(array_keys($group_con_map));
        // This process has no member of the target group: nothing to send
        if (!isset($group_con_map[$target_group])) {
            return;
        }
        foreach ($group_con_map[$target_group] as $member) {
            $member->send($payload);
        }
    };

    // Listen for global group message sending events
    Channel\Client::on('send_to_group', $deliver_to_group);
};
// Handles one JSON command received from a client connection.
//   Join group message:    {"cmd":"add_group", "group_id":"123"}
//   Send message to group: {"cmd":"send_to_group", "group_id":"123", "message":"this is a message"}
// The payload is supplied by the client, so anything that does not match one of
// these shapes is ignored instead of being used unchecked.
$worker->onMessage = function(TcpConnection $con, $data){
    global $group_con_map;

    $data = json_decode($data, true);
    var_dump($data);

    // json_decode() yields null for invalid JSON and a scalar for JSON scalars;
    // only an object carrying both cmd and group_id is a usable command
    if (!is_array($data) || !isset($data['cmd'], $data['group_id'])) {
        return;
    }

    $cmd = $data['cmd'];
    $group_id = $data['group_id'];

    // group_id is used as an array key below, so it has to be a string or an integer
    if (!is_string($group_id) && !is_int($group_id)) {
        return;
    }

    switch ($cmd) {
        // Connect to join group
        case "add_group":
            // Add connection to the corresponding group array
            $group_con_map[$group_id][$con->id] = $con;
            // Record which groups this connection has joined for cleanup in onclose
            $con->group_id = isset($con->group_id) ? $con->group_id : array();
            $con->group_id[$group_id] = $group_id;
            break;
        // Broadcast message to group
        case "send_to_group":
            // The message is forwarded to send() on every member connection in every
            // process, so only scalar values are accepted and normalized to a string
            if (!isset($data['message']) || !is_scalar($data['message'])) {
                return;
            }
            // Channel\Client broadcasts group sending message event to all processes on all servers
            Channel\Client::publish('send_to_group', array(
                'group_id' => $group_id,
                'message'  => (string)$data['message'],
            ));
            break;
    }
};
// This is very important: when a connection closes, drop it from the global
// group data, otherwise the map keeps growing and leaks memory.
$worker->onClose = function(TcpConnection $con){
    global $group_con_map;

    // The connection never joined a group: nothing to clean up
    if (!isset($con->group_id)) {
        return;
    }

    // Walk every group the connection joined and remove its entry from group_con_map
    foreach ($con->group_id as $joined_group) {
        unset($group_con_map[$joined_group][$con->id]);
        // Drop the group itself once its last member is gone
        if (empty($group_con_map[$joined_group])) {
            unset($group_con_map[$joined_group]);
        }
    }
};
// Start every worker defined above (the Channel server and the WebSocket worker)
Worker::runAll();
Testing (Assuming everything runs on localhost 127.0.0.1)
1. Run the server
```
php start.php start
Workerman[start.php] start in DEBUG mode
----------------------- WORKERMAN -----------------------------
Workerman version:3.4.2          PHP version:7.1.3
------------------------ WORKERS -------------------------------
user          worker         listen                     processes status
liliang       ChannelServer  frame://0.0.0.0:2206       1         [OK]
liliang       none           websocket://0.0.0.0:1234   8         [OK]
----------------------------------------------------------------
Press Ctrl-C to quit. Start success.
```
2. Client connects to the server
Open Chrome browser, press F12 to open the debug console, and enter in the Console tab (or place the following code in an HTML page to run with JS)
```javascript
// Assuming the server IP is 127.0.0.1, please change to the actual server IP during testing
ws = new WebSocket('ws://127.0.0.1:1234');
// Print every message pushed by the server
ws.onmessage = function(event) {
    console.log(event.data);
};
// Once connected, join group 123 and then send a message to that group
ws.onopen = function() {
    var commands = [
        '{"cmd":"add_group", "group_id":"123"}',
        '{"cmd":"send_to_group", "group_id":"123", "message":"this is a message"}'
    ];
    commands.forEach(function(command) {
        ws.send(command);
    });
};