workerman/redis-queue

Uma fila de mensagens baseada em Redis, que suporta o processamento de mensagens com atraso.

Endereço do projeto:

https://github.com/walkor/redis-queue

Instalação:

composer require workerman/redis-queue

Exemplo

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Assinar
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Assinar
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Callback para falha na consumação (opcional)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "Fila " . $package['queue'] . " falhou na consumação\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // Enviar mensagens para a fila periodicamente
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

Cria uma instância

  • $address Similar a redis://ip:6379, deve começar com redis.

  • $options Inclui as seguintes opções:

    • auth: Informação de autenticação, padrão ''
    • db: db, padrão 0
    • max_attempts: Número de tentativas de reconsumo após falha, padrão 5
    • retry_seconds: Intervalo de tempo para reconsumo, em segundos. Padrão 5

A falha na consumação refere-se a uma exceção Exception ou Error lançada pela lógica de negócio. Após a falha, a mensagem será colocada na fila de atraso aguardando reconsumo, o número de tentativas é controlado por max_attempts, e o intervalo de reconsumo é controlado por retry_seconds e max_attempts juntos. Por exemplo, se max_attempts for 5 e retry_seconds for 10, o intervalo do primeiro reconsumo será 1*10 segundos, o segundo reconsumo será 2*10 segundos, o terceiro será 3*10 segundos, e assim por diante até 5 tentativas. Se o número de reconsumos exceder o configurado em max_attempts, a mensagem será colocada na fila de falhas com a chave {redis-queue}-failed (anteriormente redis-queue-failed antes da versão 1.0.5).


send(String $queue, Mixed $data, [int $dely=0])

Enviar uma mensagem para a fila

  • $queue Nome da fila, tipo String
  • $data Mensagem específica a ser publicada, pode ser um array ou string, tipo Mixed
  • $dely Tempo de atraso para consumação, em segundos, padrão 0, tipo Int

subscribe(mixed $queue, callable $callback)

Assinar uma fila ou várias filas

  • $queue Nome da fila, pode ser uma string ou um array contendo múltiplos nomes de filas
  • $callback Função de callback, no formato function (Mixed $data), onde $data é o mesmo data utilizado em send($queue, $data).

unsubscribe(mixed $queue)

Cancelar inscrição

  • $queue Nome da fila ou um array contendo múltiplos nomes de filas

onConsumeFailure(callable $callback)

Acionado em caso de falha na consumação

  • $callback - function (\Throwable $exception, array $package), onde $package é a estrutura de dados interna da fila, contendo informações como data, queue, attempts, max_attempts, entre outras.

A estrutura de dados interna $package pode ser modificada, basta retornar a versão alterada de $package. Por exemplo, se um erro ocorrer enquanto processamos alguma mensagem e não quisermos que ela seja reprocessada, o código seria semelhante a:

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "Nome da fila com falha: " . $package['queue'] . "\n";
    // Quando ocorre um erro fatal na fila
    if ($exception->getMessage() === 'Some Fatal error') {
        // Configurar o número máximo de tentativas de reconsumo para 0
        $package['max_attempts'] = 0;
    }
    // Retornar a estrutura de dados `$package` modificada
    return $package;
});

Enviar mensagens para a fila em um ambiente que não é Workerman

Às vezes, alguns projetos são executados em ambientes como apache ou php-fpm, e não podem usar o projeto workerman/redis-queue. Abaixo, segue uma função de exemplo para enviar mensagens.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; //Anteriormente, na versão 1.0.5, era redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed';//Anteriormente, na versão 1.0.5, era redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Onde, o parâmetro $redis é uma instância do redis. Por exemplo, o uso da extensão redis seria semelhante ao seguinte:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);