workerman/redis-queue

Сообщения в очереди на основе Redis, поддерживающие отложенную обработку сообщений.

Адрес проекта:

https://github.com/walkor/redis-queue

Установка:

composer require workerman/redis-queue

Пример

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Подписка
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Подписка
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Обработчик при сбое потребления (по желанию)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "Очередь " . $package['queue'] . " не удалось обработать\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // Регулярная отправка сообщений в очередь
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

Создает экземпляр

  • $address аналогично redis://ip:6379, должно начинаться с redis.

  • $options включает в себя следующие параметры:

    • auth: информация для аутентификации, по умолчанию ''
    • db: база данных, по умолчанию 0
    • max_attempts: количество попыток повторного потребления после сбоя, по умолчанию 5
    • retry_seconds: интервал повторной попытки, в секундах. По умолчанию 5

Подъем ошибки при потреблении означает, что бизнес-логика выбрасывает исключение Exception или Error. При сбое потребления сообщение помещается в очередь задержки в ожидании повторной попытки, количество повторных попыток контролируется max_attempts, а интервал повторной попытки управляется retry_seconds и max_attempts совместно. Например, если max_attempts равно 5, а retry_seconds равно 10, первый интервал повторной попытки будет 1*10 секунд, второй интервал повторной попытки — 2*10 секунд, третий интервал повторной попытки — 3*10 секунд и так далее до 5 попыток. Если превышено значение, установленное в max_attempts, то сообщение помещается в очередь с ключом {redis-queue}-failed (до версии 1.0.5 — redis-queue-failed).


send(String $queue, Mixed $data, [int $dely=0])

Отправляет сообщение в очередь

  • $queue имя очереди, тип String
  • $data конкретное сообщение, которое отправляется, может быть массивом или строкой, тип Mixed
  • $dely время задержки обработки, в секундах, по умолчанию 0, тип Int

subscribe(mixed $queue, callable $callback)

Подписывается на одну или несколько очередей

  • $queue имя очереди, может быть строкой или массивом с несколькими именами очередей
  • $callback колбэк-функция, формат function (Mixed $data), где $data соответствует $data из send($queue, $data).

unsubscribe(mixed $queue)

Отменяет подписку

  • $queue имя очереди или массив с несколькими именами очередей

onConsumeFailure(callable $callback)

Срабатывает при сбое потребления

  • $callback - function (\Throwable $exception, array $package), $package представляет собой внутреннюю структуру данных очереди, включающую data, queue, attempts, max_attempts и другую информацию.

Поддерживается изменение значений внутренней структуры данных $package, просто верните измененный $package. Например, когда какое-то сообщение вызывает определенную ошибку и вы не хотите, чтобы оно повторно пыталось, код будет выглядеть так:

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "Имя неудачной очереди:" . $package['queue'] . "\n";
    // Если в очереди произошла какая-то фатальная ошибка
    if ($exception->getMessage() === 'Some Fatal error') {
        // Установить максимальное количество попыток повторного потребления для этого сообщения в 0
        $package['max_attempts'] = 0;
    }
    // Вернуть измененную структуру данных `$package`
    return $package;
});

Отправка сообщений в очередь в среде, не использующей workerman

Иногда некоторые проекты работают в среде Apache или PHP-FPM и не могут использовать workerman/redis-queue; вы можете использовать следующую функцию для отправки сообщений:

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; // до версии 1.0.5 было redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed'; // до версии 1.0.5 было redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Здесь параметр $redis является экземпляром Redis. Например, использование расширения Redis может выглядеть следующим образом:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);