workerman/redis-queue
Une file d'attente de messages basée sur Redis, prenant en charge le traitement des messages avec un délai.
Adresse du projet :
https://github.com/walkor/redis-queue
Installation :
composer require workerman/redis-queue
Exemples
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Abonnement
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Abonnement
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Callback déclenché en cas d'échec de la consommation (optionnel)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Échec de la consommation de la file d'attente " . $package['queue'] . "\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// Envoi de messages à la file d'attente à intervalles réguliers
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
Créer une instance
-
$addressDe la formeredis://ip:6379, doit commencer par redis. -
$optionsInclut les options suivantes :auth: Informations d'authentification, par défaut ''db: db, par défaut 0max_attempts: Nombre de tentatives de réessai en cas d'échec de la consommation, par défaut 5retry_seconds: Intervalle de temps de réessai, en secondes. Par défaut 5
Un échec de consommation signifie qu'une exception
ExceptionouErrorest levée par l'application. Après un échec, le message sera placé dans une file d'attente de délai d'attente pour réessayer, le nombre de tentatives est contrôlé parmax_attempts, et l'intervalle de réessai est contrôlé parretry_secondsetmax_attempts. Par exemple, simax_attemptsest 5 etretry_secondsest 10, l'intervalle de la première tentative sera de1*10secondes, le deuxième intervalle sera de2*10secondes, le troisième de3*10secondes, et ainsi de suite jusqu'à 5 tentatives. Si le nombre de tentatives dépassemax_attempts, le message sera placé dans une file d'attente d'échec avec la clé{redis-queue}-failed(avant la version 1.0.5, c'étaitredis-queue-failed).
send(String $queue, Mixed $data, [int $dely=0])
Envoyer un message à la file d'attente
$queueNom de la file d'attente, de typeString$dataMessage spécifique à publier, pouvant être un tableau ou une chaîne, de typeMixed$delyTemps de consommation retardée, en secondes, par défaut 0, de typeInt
subscribe(mixed $queue, callable $callback)
S'abonner à une ou plusieurs files d'attente
$queueNom de la file d'attente, pouvant être une chaîne ou un tableau contenant plusieurs noms de files d'attente$callbackFonction de rappel, formatfunction (Mixed $data), où$dataest le$datadesend($queue, $data).
unsubscribe(mixed $queue)
Se désabonner
$queueNom de la file d'attente ou tableau contenant plusieurs noms de files d'attente
onConsumeFailure(callable $callback)
Déclenché en cas d'échec de la consommation
$callback-function (\Throwable $exception, array $package),$packageest la structure de données interne de la file d'attente, contenant des informations telles quedata,queue,attempts,max_attempts, etc.
Il est possible de modifier les valeurs de la structure de données interne $package, il suffit de retourner le $package modifié. Par exemple, si un message échoue pour une certaine raison et qu'il ne devrait pas être réessayé, le code serait similaire à
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Nom de la file d'attente échouée:" . $package['queue'] . "\n";
// Lorsque la file d'attente rencontre une certaine erreur fatale
if ($exception->getMessage() === 'Some Fatal error') {
// Mettre le nombre maximal de réessais pour ce message à 0
$package['max_attempts'] = 0;
}
// Retourner la structure de données `$package` modifiée
return $package;
});
Envoyer des messages à la file d'attente dans un environnement non-workerman
Parfois, certains projets s'exécutent dans un environnement apache ou php-fpm et ne peuvent pas utiliser le projet workerman/redis-queue. Vous pouvez vous référer à la fonction suivante pour implémenter l'envoi
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // avant la version 1.0.5, c'était redis-queue-waiting
$queue_delay = '{redis-queue}-delayed'; // avant la version 1.0.5, c'était redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Dans ce cas, le paramètre $redis est une instance de redis. Par exemple, l'utilisation avec l'extension redis ressemble à ceci :
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);