workerman/redis-queue

A Redis-based message queue that supports delayed message processing.

Project Address:

https://github.com/walkor/redis-queue

Installation:

composer require workerman/redis-queue

Example

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Subscribe
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Subscribe
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Callback triggered on consumption failure (optional)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "Queue " . $package['queue'] . " consumption failed\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // Send a message to the user-1 queue every second
    Timer::add(1, function () use ($client) {
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

Creates an instance

  • $address Redis address in the form redis://ip:6379; it must start with redis://.

  • $options Includes the following options:

    • auth: Authentication information, default ''
    • db: Database, default 0
    • max_attempts: Number of retries after consumption failure, default 5
    • retry_seconds: Retry interval in seconds, default 5
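As a configuration sketch, the options above can be passed as the second constructor argument. The password, database number, and retry values here are illustrative placeholders, not recommended settings:

```php
<?php
use Workerman\Worker;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379', [
        'auth'          => 'your-password', // placeholder; omit if Redis has no password
        'db'            => 1,               // use database 1 instead of the default 0
        'max_attempts'  => 3,               // retry a failed message up to 3 times
        'retry_seconds' => 10,              // base retry interval in seconds
    ]);
    $client->subscribe('user-1', function ($data) {
        var_export($data);
    });
};

Worker::runAll();
```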

A consumption failure means that the business logic threw an Exception or Error. After a failure, the message is placed in a delayed queue to await a retry. max_attempts controls the number of retries, and the retry interval grows with each attempt: the n-th retry waits n * retry_seconds seconds. For example, if max_attempts is 5 and retry_seconds is 10, the first retry interval is 1*10 seconds, the second is 2*10 seconds, the third is 3*10 seconds, and so on until the 5th retry. Once the retry count exceeds max_attempts, the message is placed in the failure queue with the key {redis-queue}-failed (before version 1.0.5 it was redis-queue-failed).
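The retry intervals described above can be worked out with a small helper. retry_schedule is a hypothetical function written for this illustration and is not part of workerman/redis-queue; it assumes the n-th retry waits n * retry_seconds, as in the example:

```php
<?php
// Hypothetical helper (not part of the library): returns the wait, in seconds,
// before each retry, assuming the n-th retry waits n * retry_seconds.
function retry_schedule(int $maxAttempts, int $retrySeconds): array
{
    $delays = [];
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        $delays[] = $attempt * $retrySeconds;
    }
    return $delays;
}

// max_attempts = 5, retry_seconds = 10
echo implode(', ', retry_schedule(5, 10)), "\n"; // prints "10, 20, 30, 40, 50"
```

Under these settings a message that keeps failing would spend 150 seconds in total waiting for retries before it is moved to the failure queue.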


send(String $queue, Mixed $data, [int $delay=0])

Sends a message to the queue

  • $queue Queue name, String type
  • $data The specific message to be published, can be an array or a string, Mixed type
  • $delay Delay before the message is consumed, in seconds, default 0, Int type

subscribe(mixed $queue, callable $callback)

Subscribes to one or more queues

  • $queue Queue name, can be a string or an array containing multiple queue names
  • $callback Callback function, in the format function (Mixed $data), where $data is the data passed to send($queue, $data).

unsubscribe(mixed $queue)

Unsubscribes

  • $queue Queue name or an array containing multiple queue names
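A minimal sketch combining send, subscribe, and unsubscribe as documented above. The queue names, the 10-second delay, and the 60-second timer are arbitrary values chosen for illustration:

```php
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');

    // One callback for two queues: queue names are passed as an array
    $client->subscribe(['user-1', 'user-2'], function ($data) {
        var_export($data);
        echo "\n";
    });

    // No delay: the message becomes available to subscribers right away
    $client->send('user-1', ['some', 'data']);
    // Third argument is the delay in seconds before the message is consumed
    $client->send('user-2', 'delayed message', 10);

    // One-shot timer: stop consuming user-1 after 60 seconds
    Timer::add(60, function () use ($client) {
        $client->unsubscribe('user-1');
    }, [], false);
};

Worker::runAll();
```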

onConsumeFailure(callable $callback)

Triggered on consumption failure

  • $callback Callback function, in the format function (\Throwable $exception, array $package), where $package is the queue's internal data structure, containing fields such as data, queue, attempts, and max_attempts.

To change the values in $package, return the modified $package from the callback. For example, to skip retries when a message fails with a specific error:

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "Failed queue name:" . $package['queue'] . "\n";
    // When a certain fatal error occurs in the queue
    if ($exception->getMessage() === 'Some Fatal error') {
        // Set the maximum retry attempts for this message to 0
        $package['max_attempts'] = 0;
    }
    // Return the modified `$package` data structure
    return $package;
});
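A variation on the callback above, sketched under the assumption that failures of a particular exception type should never be retried. The handler is a plain closure, so it can be exercised without a running queue. The sample $package is a hand-built stand-in; a real package is created by the queue and contains more fields:

```php
<?php
// Failure handler: returns the (possibly modified) package.
$onFailure = function (\Throwable $exception, array $package) {
    // Assumption for this sketch: invalid input will not succeed on retry.
    if ($exception instanceof \InvalidArgumentException) {
        $package['max_attempts'] = 0;
    }
    return $package;
};

// Hand-built sample package for a dry run
$package = [
    'queue'        => 'user-1',
    'data'         => ['some', 'data'],
    'attempts'     => 1,
    'max_attempts' => 5,
];

$noRetry = $onFailure(new \InvalidArgumentException('bad payload'), $package);
$retry   = $onFailure(new \RuntimeException('temporary outage'), $package);

echo $noRetry['max_attempts'], "\n"; // prints 0
echo $retry['max_attempts'], "\n";   // prints 5

// With a real client: $client->onConsumeFailure($onFailure);
```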

Sending Messages to the Queue in Non-Workerman Environments

Some projects run under Apache or PHP-FPM, where the workerman/redis-queue client cannot be used. In that case, messages can be sent with a function like the following:

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; // before version 1.0.5 it was redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed'; // before version 1.0.5 it was redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

The $redis parameter is a Redis instance. For example, with the Redis extension the usage looks like this:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data = ['some', 'data'];
redis_queue_send($redis, $queue, $data);
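To see which keys the function writes without a Redis server, a stub can stand in for the connection. FakeRedis is a hypothetical in-memory class written for this illustration; it only records lPush and zAdd calls and is not a substitute for a real client. redis_queue_send is repeated from above so that the block runs on its own:

```php
<?php
// Hypothetical in-memory stand-in for a Redis client; it only records calls.
class FakeRedis
{
    public $lists = [];
    public $sortedSets = [];

    public function lPush($key, $value)
    {
        if (!isset($this->lists[$key])) {
            $this->lists[$key] = [];
        }
        array_unshift($this->lists[$key], $value); // lPush prepends
        return count($this->lists[$key]);
    }

    public function zAdd($key, $score, $member)
    {
        $this->sortedSets[$key][$member] = $score;
        return 1;
    }
}

// Same function as above, repeated so this block is self-contained
function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';
    $queue_delay = '{redis-queue}-delayed';

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

$redis = new FakeRedis();
redis_queue_send($redis, 'user-1', ['some', 'data']);     // immediate
redis_queue_send($redis, 'user-1', ['some', 'data'], 30); // delayed by 30 seconds

echo implode(', ', array_keys($redis->lists)), "\n";      // prints "{redis-queue}-waitinguser-1"
echo implode(', ', array_keys($redis->sortedSets)), "\n"; // prints "{redis-queue}-delayed"
```

Immediate messages go to a per-queue list whose key is the waiting prefix followed by the queue name, while delayed messages for every queue share one sorted set, scored by the time at which they become due.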