workerman/redis-queue
Coda di messaggi basata su Redis, supporta il trattamento ritardato dei messaggi.
Indirizzo del progetto:
https://github.com/walkor/redis-queue
Installazione:
composer require workerman/redis-queue
Esempio
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Iscrizione
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Iscrizione
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Callback in caso di errore di consumo (opzionale)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Coda " . $package['queue'] . " consumo fallito\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// Inviare messaggi alla coda a intervalli regolari
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
Crea un'istanza
-
$addresssimile aredis://ip:6379, deve iniziare con redis. -
$optionsinclude le seguenti opzioni:auth: informazioni di autenticazione, predefinito ''db: db, predefinito 0max_attempts: numero di tentativi di riprovare in caso di errore di consumo, predefinito 5retry_seconds: intervallo di tempo per il ripristino, in secondi. Predefinito 5
L'errore di consumo si riferisce a un'eccezione
ExceptionoErrorsollevata dal business. Dopo un errore di consumo, il messaggio sarà inserito nella coda di ritardo in attesa di riprovare, il numero di tentativi è controllato damax_attempts, l'intervallo di ripristino è controllato congiuntamente daretry_secondsemax_attempts. Ad esempio, semax_attemptsè 5 eretry_secondsè 10, il primo intervallo di ripristino sarà1*10secondi, il secondo intervallo di ripristino sarà2*10secondi, il terzo intervallo di ripristino sarà3*10secondi, e così via fino a 5 tentativi. Se supera il numero di tentativi impostato damax_attempts, il messaggio sarà inserito nella coda di fallimento con chiave{redis-queue}-failed(prima della versione 1.0.5 eraredis-queue-failed)
send(String $queue, Mixed $data, [int $dely=0])
Invia un messaggio alla coda
$queuenome della coda, di tipoString$datamessaggio specifico da pubblicare, può essere un array o una stringa, di tipoMixed$delytempo di consumo ritardato, in secondi, predefinito 0, di tipoInt
subscribe(mixed $queue, callable $callback)
Iscriviti a una coda o a più code
$queuenome della coda, può essere una stringa o un array contenente più nomi di code$callbackfunzione di callback, formatofunction (Mixed $data), dove$dataè il$datadisend($queue, $data).
unsubscribe(mixed $queue)
Annulla l'iscrizione
$queuenome della coda o array contenente più nomi di code
onConsumeFailure(callable $callback)
Attivato in caso di errore di consumo
$callback-function (\Throwable $exception, array $package),$packageè la struttura dati interna della coda, contiene informazioni comedata,queue,attempts,max_attemptsecc.
Supporta la modifica dei valori della struttura dati interna $package, è sufficiente restituire il $package modificato. Ad esempio, quando un messaggio genera un certo errore e non si desidera riprovare, il codice è simile a
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Nome della coda che ha fallito:" . $package['queue'] . "\n";
// Quando si verifica un certo errore fatale nella coda
if ($exception->getMessage() === 'Some Fatal error') {
// Imposta il numero massimo di tentativi per questo messaggio a 0
$package['max_attempts'] = 0;
}
// Restituisce la struttura dati `$package` modificata
return $package;
});
Inviare messaggi alla coda in un ambiente non workerman
A volte alcuni progetti vengono eseguiti in un ambiente apache o php-fpm, dove non è possibile utilizzare il progetto workerman/redis-queue, si può fare riferimento alla seguente funzione per realizzare l'invio
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //prima della versione 1.0.5 era redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//prima della versione 1.0.5 era redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Dove il parametro $redis è un'istanza di redis. Ad esempio, l'uso dell'estensione redis è simile al seguente:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);