workerman/redis-queue
File d'attente de messages basée sur Redis avec prise en charge du traitement différé des messages.
Adresse du projet :
https://github.com/walkor/redis-queue
Installation :
composer require workerman/redis-queue
Exemple
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Abonnement
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Abonnement
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Envoi périodique de messages à la file d'attente
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
__construct (string $address, [array $options])
Crée une instance
-
$address
de typeredis://ip:6379
, doit commencer par redis. -
$options
comprend les options suivantes :auth
: informations d'authentification, par défaut ''db
: base de données, par défaut 0max_attempts
: nombre de tentatives de réessai après un échec de consommation, par défaut 5retry_seconds
: intervalle de temps entre les réessais, en secondes. Par défaut 5
Un échec de consommation se produit lorsqu'une exception
Exception
ou uneError
est levée dans le traitement du message. Après un échec de consommation, le message est placé dans une file d'attente de traitement différé pour une nouvelle tentative. Le nombre de tentatives de réessai est contrôlé parmax_attempts
, et l'intervalle de réessai est contrôlé parretry_seconds
etmax_attempts
. Par exemple, simax_attempts
est 5 etretry_seconds
est 10, le temps d'attente pour la première tentative de réessai est1*10
secondes, pour la deuxième tentative c'est2*10
secondes, pour la troisième c'est3*10
secondes, et ainsi de suite jusqu'à la cinquième tentative. Si le nombre de tentatives de réessai dépasse la valeur demax_attempts
, le message est placé dans la file d'attente des échecs avec la clé{redis-queue}-failed
(avant la version 1.0.5, elle était{redis-queue-failed}
).
send(String $queue, Mixed $data, [int $dely=0])
Envoie un message à la file d'attente
$queue
: nom de la file d'attente, de typeString
$data
: message spécifique à publier, peut être un tableau ou une chaîne de caractères, de typeMixed
$dely
: délai de consommation différée, en secondes, par défaut 0, de typeInt
subscribe(mixed $queue, callable $callback)
Abonne à une file d'attente ou à plusieurs files d'attente
$queue
: nom de la file d'attente, peut être une chaîne de caractères ou un tableau contenant plusieurs noms de files d'attente$callback
: fonction de rappel, au formatfunction (Mixed $data)
, où$data
représente$data
dusend($queue, $data)
.
unsubscribe(mixed $queue)
Annule l'abonnement
$queue
: nom de la file d'attente ou un tableau contenant plusieurs noms de files d'attente
Envoi de messages à la file d'attente dans un environnement non Workerman
Parfois, certains projets s'exécutent dans un environnement apache ou php-fpm et ne peuvent pas utiliser le projet workerman/redis-queue. Vous pouvez vous référer à la fonction suivante pour mettre en œuvre l'envoi :
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //avant la version 1.0.5, c'était redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//avant la version 1.0.5, c'était redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Ici, le paramètre $redis
est une instance de Redis. Par exemple, l'utilisation de l'extension Redis est similaire à ce qui suit :
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);