workerman/redis-queue
Uma fila de mensagens baseada em Redis, que suporta o processamento de mensagens com atraso.
Endereço do projeto:
https://github.com/walkor/redis-queue
Instalação:
composer require workerman/redis-queue
Exemplo
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Assinar
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Assinar
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Callback para falha na consumação (opcional)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Fila " . $package['queue'] . " falhou na consumação\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// Enviar mensagens para a fila periodicamente
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
Cria uma instância
-
$addressSimilar aredis://ip:6379, deve começar com redis. -
$optionsInclui as seguintes opções:auth: Informação de autenticação, padrão ''db: db, padrão 0max_attempts: Número de tentativas de reconsumo após falha, padrão 5retry_seconds: Intervalo de tempo para reconsumo, em segundos. Padrão 5
A falha na consumação refere-se a uma exceção
ExceptionouErrorlançada pela lógica de negócio. Após a falha, a mensagem será colocada na fila de atraso aguardando reconsumo, o número de tentativas é controlado pormax_attempts, e o intervalo de reconsumo é controlado porretry_secondsemax_attemptsjuntos. Por exemplo, semax_attemptsfor 5 eretry_secondsfor 10, o intervalo do primeiro reconsumo será1*10segundos, o segundo reconsumo será2*10segundos, o terceiro será3*10segundos, e assim por diante até 5 tentativas. Se o número de reconsumos exceder o configurado emmax_attempts, a mensagem será colocada na fila de falhas com a chave{redis-queue}-failed(anteriormenteredis-queue-failedantes da versão 1.0.5).
send(String $queue, Mixed $data, [int $dely=0])
Enviar uma mensagem para a fila
$queueNome da fila, tipoString$dataMensagem específica a ser publicada, pode ser um array ou string, tipoMixed$delyTempo de atraso para consumação, em segundos, padrão 0, tipoInt
subscribe(mixed $queue, callable $callback)
Assinar uma fila ou várias filas
$queueNome da fila, pode ser uma string ou um array contendo múltiplos nomes de filas$callbackFunção de callback, no formatofunction (Mixed $data), onde$dataé o mesmodatautilizado emsend($queue, $data).
unsubscribe(mixed $queue)
Cancelar inscrição
$queueNome da fila ou um array contendo múltiplos nomes de filas
onConsumeFailure(callable $callback)
Acionado em caso de falha na consumação
$callback-function (\Throwable $exception, array $package), onde$packageé a estrutura de dados interna da fila, contendo informações comodata,queue,attempts,max_attempts, entre outras.
A estrutura de dados interna $package pode ser modificada, basta retornar a versão alterada de $package. Por exemplo, se um erro ocorrer enquanto processamos alguma mensagem e não quisermos que ela seja reprocessada, o código seria semelhante a:
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Nome da fila com falha: " . $package['queue'] . "\n";
// Quando ocorre um erro fatal na fila
if ($exception->getMessage() === 'Some Fatal error') {
// Configurar o número máximo de tentativas de reconsumo para 0
$package['max_attempts'] = 0;
}
// Retornar a estrutura de dados `$package` modificada
return $package;
});
Enviar mensagens para a fila em um ambiente que não é Workerman
Às vezes, alguns projetos são executados em ambientes como apache ou php-fpm, e não podem usar o projeto workerman/redis-queue. Abaixo, segue uma função de exemplo para enviar mensagens.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //Anteriormente, na versão 1.0.5, era redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//Anteriormente, na versão 1.0.5, era redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Onde, o parâmetro $redis é uma instância do redis. Por exemplo, o uso da extensão redis seria semelhante ao seguinte:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);