workerman/redis-queue

Cola de mensajes basada en Redis que soporta el procesamiento de mensajes con retraso.

Dirección del proyecto:

https://github.com/walkor/redis-queue

Instalación:

composer require workerman/redis-queue

Ejemplo

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Suscribirse
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Suscribirse
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Callback que se activa cuando falla el consumo (opcional)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "Fallo en el consumo de la cola " . $package['queue'] . "\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // Enviar mensajes a la cola de forma periódica
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

Crear instancia

  • $address Similar a redis://ip:6379, debe comenzar con redis.

  • $options Incluye las siguientes opciones:

    • auth: Información de autenticación, por defecto ''
    • db: db, por defecto 0
    • max_attempts: Número de reintentos después de un fallo en el consumo, por defecto 5
    • retry_seconds: Intervalo de tiempo para reintentar, en segundos. Por defecto 5

Un fallo en el consumo se refiere a una excepción Exception o Error lanzada por el negocio. Después de un fallo en el consumo, el mensaje se coloca en la cola de retraso esperando ser reintentado, el número de reintentos está controlado por max_attempts, y el intervalo de reintentos es controlado conjuntamente por retry_seconds y max_attempts. Por ejemplo, si max_attempts es 5 y retry_seconds es 10, el primer intervalo de reintento es 1*10 segundos, el segundo intervalo de reintento es 2*10 segundos, el tercer intervalo de reintento es 3*10 segundos, y así sucesivamente hasta un máximo de 5 reintentos. Si se excede el número de reintentos configurados en max_attempts, el mensaje se coloca en la cola de fallos con la clave {redis-queue}-failed (anteriormente redis-queue-failed en versiones anteriores a 1.0.5).


send(String $queue, Mixed $data, [int $dely=0])

Enviar un mensaje a la cola

  • $queue Nombre de la cola, tipo String
  • $data Mensaje específico a publicar, puede ser un arreglo o una cadena, tipo Mixed
  • $dely Tiempo de retraso para el consumo, en segundos, por defecto 0, tipo Int

subscribe(mixed $queue, callable $callback)

Suscribirse a una o varias colas

  • $queue Nombre de la cola, puede ser una cadena o un arreglo que contenga varios nombres de cola
  • $callback Función de callback, en el formato function (Mixed $data), donde $data es el $data de send($queue, $data).

unsubscribe(mixed $queue)

Anular suscripción

  • $queue Nombre de la cola o un arreglo que contenga varios nombres de cola

onConsumeFailure(callable $callback)

Activado en caso de fallo en el consumo

  • $callback - function (\Throwable $exception, array $package), $package es la estructura de datos interna de la cola, que contiene información sobre data, queue, attempts, max_attempts, etc.

Se permite modificar los valores de la estructura de datos interna $package, simplemente hay que retornar el $package modificado. Por ejemplo, si no se desea intentar de nuevo un mensaje que ha producido un error, el código sería similar a

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "Nombre de la cola fallida:" . $package['queue'] . "\n";
    // Cuando ocurre un error fatal en la cola
    if ($exception->getMessage() === 'Some Fatal error') {
        // Establecer el número máximo de reintentos para este mensaje en 0
        $package['max_attempts'] = 0;
    }
    // Retornar la estructura de datos `$package` modificada
    return $package;
});

Enviar mensajes a la cola en un entorno que no sea workerman

A veces, algunos proyectos se ejecutan en un entorno apache o php-fpm y no pueden utilizar el proyecto workerman/redis-queue. Se puede consultar la siguiente función para enviar mensajes

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; // Anteriormente `redis-queue-waiting` en versiones antes de 1.0.5
    $queue_delay = '{redis-queue}-delayed'; // Anteriormente `redis-queue-delayed` en versiones antes de 1.0.5

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Donde el parámetro $redis es una instancia de redis. Por ejemplo, el uso de la extensión redis se vería similar a lo siguiente:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);