workerman/redis-queue

Une file d'attente de messages basée sur Redis, prenant en charge le traitement des messages avec un délai.

Adresse du projet :

https://github.com/walkor/redis-queue

Installation :

composer require workerman/redis-queue

Exemples

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Abonnement
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Abonnement
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Callback déclenché en cas d'échec de la consommation (optionnel)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "Échec de la consommation de la file d'attente " . $package['queue'] . "\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // Envoi de messages à la file d'attente à intervalles réguliers
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

Créer une instance

  • $address De la forme redis://ip:6379, doit commencer par redis.

  • $options Inclut les options suivantes :

    • auth: Informations d'authentification, par défaut ''
    • db: db, par défaut 0
    • max_attempts: Nombre de tentatives de réessai en cas d'échec de la consommation, par défaut 5
    • retry_seconds: Intervalle de temps de réessai, en secondes. Par défaut 5

Un échec de consommation signifie qu'une exception Exception ou Error est levée par l'application. Après un échec, le message sera placé dans une file d'attente de délai d'attente pour réessayer, le nombre de tentatives est contrôlé par max_attempts, et l'intervalle de réessai est contrôlé par retry_seconds et max_attempts. Par exemple, si max_attempts est 5 et retry_seconds est 10, l'intervalle de la première tentative sera de 1*10 secondes, le deuxième intervalle sera de 2*10 secondes, le troisième de 3*10 secondes, et ainsi de suite jusqu'à 5 tentatives. Si le nombre de tentatives dépasse max_attempts, le message sera placé dans une file d'attente d'échec avec la clé {redis-queue}-failed (avant la version 1.0.5, c'était redis-queue-failed).


send(String $queue, Mixed $data, [int $dely=0])

Envoyer un message à la file d'attente

  • $queue Nom de la file d'attente, de type String
  • $data Message spécifique à publier, pouvant être un tableau ou une chaîne, de type Mixed
  • $dely Temps de consommation retardée, en secondes, par défaut 0, de type Int

subscribe(mixed $queue, callable $callback)

S'abonner à une ou plusieurs files d'attente

  • $queue Nom de la file d'attente, pouvant être une chaîne ou un tableau contenant plusieurs noms de files d'attente
  • $callback Fonction de rappel, format function (Mixed $data), où $data est le $data de send($queue, $data).

unsubscribe(mixed $queue)

Se désabonner

  • $queue Nom de la file d'attente ou tableau contenant plusieurs noms de files d'attente

onConsumeFailure(callable $callback)

Déclenché en cas d'échec de la consommation

  • $callback - function (\Throwable $exception, array $package), $package est la structure de données interne de la file d'attente, contenant des informations telles que data, queue, attempts, max_attempts, etc.

Il est possible de modifier les valeurs de la structure de données interne $package, il suffit de retourner le $package modifié. Par exemple, si un message échoue pour une certaine raison et qu'il ne devrait pas être réessayé, le code serait similaire à

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "Nom de la file d'attente échouée:" . $package['queue'] . "\n";
    // Lorsque la file d'attente rencontre une certaine erreur fatale
    if ($exception->getMessage() === 'Some Fatal error') {
        // Mettre le nombre maximal de réessais pour ce message à 0
        $package['max_attempts'] = 0;
    }
    // Retourner la structure de données `$package` modifiée
    return $package;
});

Envoyer des messages à la file d'attente dans un environnement non-workerman

Parfois, certains projets s'exécutent dans un environnement apache ou php-fpm et ne peuvent pas utiliser le projet workerman/redis-queue. Vous pouvez vous référer à la fonction suivante pour implémenter l'envoi

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; // avant la version 1.0.5, c'était redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed'; // avant la version 1.0.5, c'était redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Dans ce cas, le paramètre $redis est une instance de redis. Par exemple, l'utilisation avec l'extension redis ressemble à ceci :

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);