workerman/redis-queue
Cola de mensajes basada en Redis, con soporte para el procesamiento de mensajes con retraso.
Dirección del proyecto:
https://github.com/walkor/redis-queue
Instalación:
composer require workerman/redis-queue
Ejemplo
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Suscribirse
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Suscribirse
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Enviar mensajes a la cola en intervalos regulares
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
__construct (string $address, [array $options])
Crear una instancia
-
$address
similar aredis://ip:6379
, debe comenzar con redis. -
$options
incluye las siguientes opciones:auth
: información de autenticación, por defecto ''db
: base de datos, por defecto 0max_attempts
: número de intentos de reintentos después de un consumo fallido, por defecto 5retry_seconds
: intervalo de tiempo entre reintentos, en segundos. Por defecto 5
El consumo fallido se refiere a cuando el negocio arroja una excepción
Exception
oError
. Después de un consumo fallido, el mensaje se coloca en la cola de retraso para volver a intentarlo, el número de intentos se controla conmax_attempts
, y el intervalo de reintentos se controla a través deretry_seconds
ymax_attempts
. Por ejemplo, simax_attempts
es 5 yretry_seconds
es 10, el intervalo de tiempo para el primer reintentoes1*10
segundos, para el segundo es2*10
segundos, para el tercero es3*10
segundos, y así sucesivamente hasta 5 intentos de reintentos. Si se supera el número de intentos configurado enmax_attempts
, el mensaje se coloca en la cola de fallidos con la clave{redis-queue}-failed
(antes de la versión 1.0.5 eraredis-queue-failed
)
send(String $queue, Mixed $data, [int $dely=0])
Enviar un mensaje a la cola
$queue
nombre de la cola, tipoString
$data
mensaje específico a publicar, puede ser un array o una cadena, tipoMixed
$dely
tiempo de retraso en el consumo, en segundos, por defecto 0, tipoInt
subscribe(mixed $queue, callable $callback)
Suscribirse a una o varias colas
$queue
nombre de la cola, puede ser una cadena o un array que contiene varios nombres de cola$callback
función de devolución de llamada, con el formatofunction (Mixed $data)
, donde$data
es el mismo que$data
ensend($queue, $data)
.
unsubscribe(mixed $queue)
Cancelar la suscripción
$queue
nombre de la cola o un array que contiene varios nombres de cola
Enviar mensajes a la cola en un entorno no workerman
A veces, algunos proyectos se ejecutan en entornos como apache o php-fpm, donde no se puede utilizar el proyecto workerman/redis-queue. En ese caso, se puede utilizar la siguiente función para enviarlos:
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //antes de la versión 1.0.5 era redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//antes de la versión 1.0.5 era redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Donde el parámetro $redis
es una instancia de redis. Por ejemplo, el uso de la extensión redis es similar al siguiente ejemplo:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);