workerman/redis-queue
Cola de mensajes basada en Redis que soporta el procesamiento de mensajes con retraso.
Dirección del proyecto:
https://github.com/walkor/redis-queue
Instalación:
composer require workerman/redis-queue
Ejemplo
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Suscribirse
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Suscribirse
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Callback que se activa cuando falla el consumo (opcional)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Fallo en el consumo de la cola " . $package['queue'] . "\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// Enviar mensajes a la cola de forma periódica
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
Crear instancia
-
$addressSimilar aredis://ip:6379, debe comenzar con redis. -
$optionsIncluye las siguientes opciones:auth: Información de autenticación, por defecto ''db: db, por defecto 0max_attempts: Número de reintentos después de un fallo en el consumo, por defecto 5retry_seconds: Intervalo de tiempo para reintentar, en segundos. Por defecto 5
Un fallo en el consumo se refiere a una excepción
ExceptionoErrorlanzada por el negocio. Después de un fallo en el consumo, el mensaje se coloca en la cola de retraso esperando ser reintentado, el número de reintentos está controlado pormax_attempts, y el intervalo de reintentos es controlado conjuntamente porretry_secondsymax_attempts. Por ejemplo, simax_attemptses 5 yretry_secondses 10, el primer intervalo de reintento es1*10segundos, el segundo intervalo de reintento es2*10segundos, el tercer intervalo de reintento es3*10segundos, y así sucesivamente hasta un máximo de 5 reintentos. Si se excede el número de reintentos configurados enmax_attempts, el mensaje se coloca en la cola de fallos con la clave{redis-queue}-failed(anteriormenteredis-queue-faileden versiones anteriores a 1.0.5).
send(String $queue, Mixed $data, [int $dely=0])
Enviar un mensaje a la cola
$queueNombre de la cola, tipoString$dataMensaje específico a publicar, puede ser un arreglo o una cadena, tipoMixed$delyTiempo de retraso para el consumo, en segundos, por defecto 0, tipoInt
subscribe(mixed $queue, callable $callback)
Suscribirse a una o varias colas
$queueNombre de la cola, puede ser una cadena o un arreglo que contenga varios nombres de cola$callbackFunción de callback, en el formatofunction (Mixed $data), donde$dataes el$datadesend($queue, $data).
unsubscribe(mixed $queue)
Anular suscripción
$queueNombre de la cola o un arreglo que contenga varios nombres de cola
onConsumeFailure(callable $callback)
Activado en caso de fallo en el consumo
$callback-function (\Throwable $exception, array $package),$packagees la estructura de datos interna de la cola, que contiene información sobredata,queue,attempts,max_attempts, etc.
Se permite modificar los valores de la estructura de datos interna $package, simplemente hay que retornar el $package modificado. Por ejemplo, si no se desea intentar de nuevo un mensaje que ha producido un error, el código sería similar a
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Nombre de la cola fallida:" . $package['queue'] . "\n";
// Cuando ocurre un error fatal en la cola
if ($exception->getMessage() === 'Some Fatal error') {
// Establecer el número máximo de reintentos para este mensaje en 0
$package['max_attempts'] = 0;
}
// Retornar la estructura de datos `$package` modificada
return $package;
});
Enviar mensajes a la cola en un entorno que no sea workerman
A veces, algunos proyectos se ejecutan en un entorno apache o php-fpm y no pueden utilizar el proyecto workerman/redis-queue. Se puede consultar la siguiente función para enviar mensajes
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // Anteriormente `redis-queue-waiting` en versiones antes de 1.0.5
$queue_delay = '{redis-queue}-delayed'; // Anteriormente `redis-queue-delayed` en versiones antes de 1.0.5
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Donde el parámetro $redis es una instancia de redis. Por ejemplo, el uso de la extensión redis se vería similar a lo siguiente:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);