workerman/redis-queue

Coda di messaggi basata su Redis, supporta il trattamento ritardato dei messaggi.

Indirizzo del progetto:

https://github.com/walkor/redis-queue

Installazione:

composer require workerman/redis-queue

Esempio

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Iscrizione
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Iscrizione
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Callback in caso di errore di consumo (opzionale)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "Coda " . $package['queue'] . " consumo fallito\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // Inviare messaggi alla coda a intervalli regolari
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

Crea un'istanza

  • $address simile a redis://ip:6379, deve iniziare con redis.

  • $options include le seguenti opzioni:

    • auth: informazioni di autenticazione, predefinito ''
    • db: db, predefinito 0
    • max_attempts: numero di tentativi di riprovare in caso di errore di consumo, predefinito 5
    • retry_seconds: intervallo di tempo per il ripristino, in secondi. Predefinito 5

L'errore di consumo si riferisce a un'eccezione Exception o Error sollevata dal business. Dopo un errore di consumo, il messaggio sarà inserito nella coda di ritardo in attesa di riprovare, il numero di tentativi è controllato da max_attempts, l'intervallo di ripristino è controllato congiuntamente da retry_seconds e max_attempts. Ad esempio, se max_attempts è 5 e retry_seconds è 10, il primo intervallo di ripristino sarà 1*10 secondi, il secondo intervallo di ripristino sarà 2*10 secondi, il terzo intervallo di ripristino sarà 3*10 secondi, e così via fino a 5 tentativi. Se supera il numero di tentativi impostato da max_attempts, il messaggio sarà inserito nella coda di fallimento con chiave {redis-queue}-failed (prima della versione 1.0.5 era redis-queue-failed)


send(String $queue, Mixed $data, [int $dely=0])

Invia un messaggio alla coda

  • $queue nome della coda, di tipo String
  • $data messaggio specifico da pubblicare, può essere un array o una stringa, di tipo Mixed
  • $dely tempo di consumo ritardato, in secondi, predefinito 0, di tipo Int

subscribe(mixed $queue, callable $callback)

Iscriviti a una coda o a più code

  • $queue nome della coda, può essere una stringa o un array contenente più nomi di code
  • $callback funzione di callback, formato function (Mixed $data), dove $data è il $data di send($queue, $data).

unsubscribe(mixed $queue)

Annulla l'iscrizione

  • $queue nome della coda o array contenente più nomi di code

onConsumeFailure(callable $callback)

Attivato in caso di errore di consumo

  • $callback - function (\Throwable $exception, array $package), $package è la struttura dati interna della coda, contiene informazioni come data, queue, attempts, max_attempts ecc.

Supporta la modifica dei valori della struttura dati interna $package, è sufficiente restituire il $package modificato. Ad esempio, quando un messaggio genera un certo errore e non si desidera riprovare, il codice è simile a

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "Nome della coda che ha fallito:" . $package['queue'] . "\n";
    // Quando si verifica un certo errore fatale nella coda
    if ($exception->getMessage() === 'Some Fatal error') {
        // Imposta il numero massimo di tentativi per questo messaggio a 0
        $package['max_attempts'] = 0;
    }
    // Restituisce la struttura dati `$package` modificata
    return $package;
});

Inviare messaggi alla coda in un ambiente non workerman

A volte alcuni progetti vengono eseguiti in un ambiente apache o php-fpm, dove non è possibile utilizzare il progetto workerman/redis-queue, si può fare riferimento alla seguente funzione per realizzare l'invio

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; //prima della versione 1.0.5 era redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed';//prima della versione 1.0.5 era redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Dove il parametro $redis è un'istanza di redis. Ad esempio, l'uso dell'estensione redis è simile al seguente:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);