workerman/redis-queue
Сообщения в очереди на основе Redis, поддерживающие отложенную обработку сообщений.
Адрес проекта:
https://github.com/walkor/redis-queue
Установка:
composer require workerman/redis-queue
Пример
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Подписка
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Подписка
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Обработчик при сбое потребления (по желанию)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Очередь " . $package['queue'] . " не удалось обработать\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// Регулярная отправка сообщений в очередь
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
Создает экземпляр
-
$addressаналогичноredis://ip:6379, должно начинаться с redis. -
$optionsвключает в себя следующие параметры:auth: информация для аутентификации, по умолчанию ''db: база данных, по умолчанию 0max_attempts: количество попыток повторного потребления после сбоя, по умолчанию 5retry_seconds: интервал повторной попытки, в секундах. По умолчанию 5
Подъем ошибки при потреблении означает, что бизнес-логика выбрасывает исключение
ExceptionилиError. При сбое потребления сообщение помещается в очередь задержки в ожидании повторной попытки, количество повторных попыток контролируетсяmax_attempts, а интервал повторной попытки управляетсяretry_secondsиmax_attemptsсовместно. Например, еслиmax_attemptsравно 5, аretry_secondsравно 10, первый интервал повторной попытки будет1*10секунд, второй интервал повторной попытки —2*10 секунд, третий интервал повторной попытки —3*10 секунди так далее до 5 попыток. Если превышено значение, установленное вmax_attempts, то сообщение помещается в очередь с ключом{redis-queue}-failed(до версии 1.0.5 —redis-queue-failed).
send(String $queue, Mixed $data, [int $dely=0])
Отправляет сообщение в очередь
$queueимя очереди, типString$dataконкретное сообщение, которое отправляется, может быть массивом или строкой, типMixed$delyвремя задержки обработки, в секундах, по умолчанию 0, типInt
subscribe(mixed $queue, callable $callback)
Подписывается на одну или несколько очередей
$queueимя очереди, может быть строкой или массивом с несколькими именами очередей$callbackколбэк-функция, форматfunction (Mixed $data), где$dataсоответствует$dataизsend($queue, $data).
unsubscribe(mixed $queue)
Отменяет подписку
$queueимя очереди или массив с несколькими именами очередей
onConsumeFailure(callable $callback)
Срабатывает при сбое потребления
$callback-function (\Throwable $exception, array $package),$packageпредставляет собой внутреннюю структуру данных очереди, включающуюdata,queue,attempts,max_attemptsи другую информацию.
Поддерживается изменение значений внутренней структуры данных $package, просто верните измененный $package. Например, когда какое-то сообщение вызывает определенную ошибку и вы не хотите, чтобы оно повторно пыталось, код будет выглядеть так:
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Имя неудачной очереди:" . $package['queue'] . "\n";
// Если в очереди произошла какая-то фатальная ошибка
if ($exception->getMessage() === 'Some Fatal error') {
// Установить максимальное количество попыток повторного потребления для этого сообщения в 0
$package['max_attempts'] = 0;
}
// Вернуть измененную структуру данных `$package`
return $package;
});
Отправка сообщений в очередь в среде, не использующей workerman
Иногда некоторые проекты работают в среде Apache или PHP-FPM и не могут использовать workerman/redis-queue; вы можете использовать следующую функцию для отправки сообщений:
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // до версии 1.0.5 было redis-queue-waiting
$queue_delay = '{redis-queue}-delayed'; // до версии 1.0.5 было redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Здесь параметр $redis является экземпляром Redis. Например, использование расширения Redis может выглядеть следующим образом:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);