workerman/redis-queue
Очередь сообщений, основанная на Redis, поддерживающая отложенную обработку сообщений.
Ссылка на проект:
https://github.com/walkor/redis-queue
Установка:
composer require workerman/redis-queue
Пример
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Подписаться
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Подписаться
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Регулярно отправлять сообщения в очередь
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
__construct (string $address, [array $options])
Создание экземпляра
-
$address
похоже наredis://ip:6379
, должно начинаться с redis. -
$options
включает следующие опции:auth
: информация для аутентификации, по умолчанию ''db
: база данных, по умолчанию 0max_attempts
: количество попыток повторной обработки при ошибке, по умолчанию 5retry_seconds
: интервал повторной обработки, в секундах, по умолчанию 5
Ошибка обработки означает возникновение исключения
Exception
илиError
в процессе обработки. После ошибки обработки сообщение будет помещено в отложенную очередь и ожидать повторной обработки. Количество попыток повторной обработки управляется параметромmax_attempts
, а интервал повторной обработки управляется параметрамиretry_seconds
иmax_attempts
. Например, еслиmax_attempts
равно 5, аretry_seconds
равно 10, то интервал повторной обработки для первой попытки составит1*10
секунд, для второй попытки -2*10
секунд, для третьей попытки -3*10
секунд и так далее до пятой попытки. Если количество попыток превысит установленное значениеmax_attempts
, сообщение будет помещено в очередь необработанных сообщений с ключом{redis-queue}-failed
(до версии 1.0.5 -redis-queue-failed
).
send(String $queue, Mixed $data, [int $dely=0])
Отправка сообщения в очередь
$queue
название очереди, типString
$data
конкретное отправляемое сообщение, может быть массивом или строкой, типMixed
$dely
время задержки обработки, в секундах, по умолчанию 0, типInt
subscribe(mixed $queue, callable $callback)
Подписка на одну или несколько очередей
$queue
название очереди, может быть строкой или массивом, содержащим несколько названий очередей$callback
функция обратного вызова в форматеfunction (Mixed $data)
, где$data
- это$data
изsend($queue, $data)
.
unsubscribe(mixed $queue)
Отмена подписки
$queue
название очереди или массив, содержащий несколько названий очередей
Отправка сообщений в очередь вне среды workerman
Иногда некоторые проекты работают в среде Apache или PHP-FPM и не могут использовать проект workerman/redis-queue. В этом случае можно использовать следующую функцию для отправки сообщений:
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //до версии 1.0.5 - redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//до версии 1.0.5 - redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Где параметр $redis
- это экземпляр Redis. Например, использование расширения Redis может выглядеть как показано ниже:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);