workerman/redis-queue
Message queue based on Redis that supports delayed message processing.
Project Address:
https://github.com/walkor/redis-queue
Installation:
composer require workerman/redis-queue
Example
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Subscribe
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Subscribe
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Send messages to the queue at regular intervals
Timer::add(1, function() use ($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
__construct (string $address, [array $options])
Create an instance
-
$address
similar toredis://ip:6379
, must start with redis. -
$options
includes the following options:auth
: authentication information, default is ''db
: database, default is 0max_attempts
: number of retry attempts after consumption failure, default is 5retry_seconds
: retry interval in seconds, default is 5
Consumption failure means that the business throws an exception
Exception
orError
. After consumption failure, the message will be placed in the delay queue for retry. The number of retries is controlled bymax_attempts
, and the retry interval is jointly controlled byretry_seconds
andmax_attempts
. For example, ifmax_attempts
is 5 andretry_seconds
is 10, the interval for the first retry is1*10
seconds, the interval for the second retry is2*10
seconds, and so on, until the fifth retry. If the number of retries exceeds themax_attempts
, the message is placed in the failed queue with the key{redis-queue}-failed
(before version 1.0.5, it wasredis-queue-failed
).
send(String $queue, Mixed $data, [int $dely=0])
Send a message to the queue
$queue
queue name, typeString
$data
specific message being published, can be an array or a string, typeMixed
$dely
delay in consumption time, default is 0, typeInt
subscribe(mixed $queue, callable $callback)
Subscribe to a single queue or multiple queues
$queue
queue name, can be a string or an array containing multiple queue names$callback
callback function in the formatfunction (Mixed $data)
, where$data
is the same as$data
insend($queue, $data)
.
unsubscribe(mixed $queue)
Unsubscribe from a single queue or multiple queues
$queue
queue name or an array containing multiple queue names
Sending messages to the queue in a non-workerman environment
Sometimes, some projects run in an Apache or PHP-FPM environment and cannot use the workerman/redis-queue project. You can refer to the following function to implement sending:
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //Before version 1.0.5, it was redis-queue-waiting
$queue_delay = '{redis-queue}-delayed'; //Before version 1.0.5, it was redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Where the parameter $redis
is the redis instance. For example, the usage of the redis extension is similar to the following:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);