workerman/redis-queue
A Redis-based message queue that supports delayed message processing.
Project Address:
https://github.com/walkor/redis-queue
Installation:
composer require workerman/redis-queue
Example
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Subscribe
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Subscribe
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Callback triggered on consumption failure (optional)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Queue " . $package['queue'] . " consumption failed\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// Send messages to the queue at intervals
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
Creates an instance
-
$addressSimilar toredis://ip:6379, must start with redis. -
$optionsIncludes the following options:auth: Authentication information, default ''db: Database, default 0max_attempts: Number of retries after consumption failure, default 5retry_seconds: Retry interval in seconds, default 5
Consumption failure refers to throwing an
ExceptionorErrorby the business logic. After a consumption failure, the message will be placed in a delayed queue waiting for a retry, and the number of retries is controlled bymax_attempts, while the retry interval is jointly controlled byretry_secondsandmax_attempts. For example, ifmax_attemptsis 5 andretry_secondsis 10, the first retry interval will be1*10seconds, the second retry interval will be2*10seconds, the third retry interval will be3*10seconds, and so on until the 5th retry. If the retry count exceeds themax_attemptsset retry count, the message will be placed in a failure queue with the key{redis-queue}-failed(before version 1.0.5 it wasredis-queue-failed)
send(String $queue, Mixed $data, [int $dely=0])
Sends a message to the queue
$queueQueue name,Stringtype$dataThe specific message to be published, can be an array or a string,Mixedtype$delyDelay consumption time, in seconds, default 0,Inttype
subscribe(mixed $queue, callable $callback)
Subscribes to one or more queues
$queueQueue name, can be a string or an array containing multiple queue names$callbackCallback function, in the format offunction (Mixed $data), where$datais thedatainsend($queue, $data).
unsubscribe(mixed $queue)
Unsubscribes
$queueQueue name or an array containing multiple queue names
onConsumeFailure(callable $callback)
Triggered on consumption failure
$callback-function (\Throwable $exception, array $package), where$packageis the internal data structure of the queue, containing information such asdata,queue,attempts,max_attempts, etc.
You can modify the internal data structure value of $package by simply returning the modified $package. For example, when a certain message encounters a specific error and you do not want to retry, the code would look like this:
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Failed queue name:" . $package['queue'] . "\n";
// When a certain fatal error occurs in the queue
if ($exception->getMessage() === 'Some Fatal error') {
// Set the maximum retry attempts for this message to 0
$package['max_attempts'] = 0;
}
// Return the modified `$package` data structure
return $package;
});
Sending Messages to the Queue in Non-Workerman Environments
Sometimes, some projects run in an Apache or PHP-FPM environment where the workerman/redis-queue project cannot be used. You can refer to the following function implementation for sending messages:
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // before version 1.0.5 it was redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';// before version 1.0.5 it was redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Where the parameter $redis is the redis instance. For example, the usage of the Redis extension is similar to the following:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);