workerman/redis-queue

Eine auf Redis basierende Nachrichtenwarteschlange, die die Verarbeitung von Nachrichten mit Verzögerung unterstützt.

Projektadresse:

https://github.com/walkor/redis-queue

Installation:

composer require workerman/redis-queue

Beispiel

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Abonnieren
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Abonnieren
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Callback, das bei Verbrauchsfehlern ausgelöst wird (optional)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "Warteschlange " . $package['queue'] . " Verbrauchsfehler\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // Regelmäßig Nachrichten in die Warteschlange senden
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

Instanz erstellen

  • $address ähnlich wie redis://ip:6379, muss mit redis beginnen.

  • $options umfasst die folgenden Optionen:

    • auth: Authentifizierungsinformationen, standardmäßig ''
    • db: db, standardmäßig 0
    • max_attempts: Anzahl der Wiederholungsversuche bei Verbrauchsfehlern, standardmäßig 5
    • retry_seconds: Zeitintervall für Wiederholungen, in Sekunden. Standardwert 5

Ein Verbrauchsfehler bedeutet, dass eine Ausnahme Exception oder Error im Geschäftsprozess ausgelöst wird. Bei einem Verbrauchsfehler wird die Nachricht in die verzögerte Warteschlange gelegt und wartet auf eine Wiederholung. Die Anzahl der Wiederholungen wird durch max_attempts gesteuert, das Wiederholungsintervall wird durch retry_seconds und max_attempts gemeinsam gesteuert. Zum Beispiel, wenn max_attempts 5 und retry_seconds 10 beträgt, beträgt das Zeitintervall für den 1. Wiederholungsversuch 1*10 Sekunden, für den 2. Wiederholungsversuch 2*10 Sekunden, für den 3. Wiederholungsversuch 3*10 Sekunden und so weiter, bis 5 Wiederholungen erreicht sind. Wenn die festgelegte Anzahl der Wiederholungen von max_attempts überschritten wird, wird die Nachricht in die Fehlerwarteschlange mit dem Schlüssel {redis-queue}-failed (vor Version 1.0.5 war es redis-queue-failed) gelegt.


send(String $queue, Mixed $data, [int $dely=0])

Eine Nachricht an die Warteschlange senden

  • $queue Warteschlangenname, String-Typ
  • $data Die spezifische Nachricht, die veröffentlicht wird, kann ein Array oder ein String sein, Mixed-Typ
  • $dely Verzögerte Verbrauchszeit, in Sekunden, standardmäßig 0, Int-Typ

subscribe(mixed $queue, callable $callback)

Ein oder mehrere Warteschlangen abonnieren

  • $queue Warteschlangenname, kann ein String oder ein Array mit mehreren Warteschlangenamen sein
  • $callback Callback-Funktion, Format function (Mixed $data), wobei $data die Nachricht ist, die in send($queue, $data) gesendet wurde.

unsubscribe(mixed $queue)

Abonnieren aufheben

  • $queue Warteschlangenname oder ein Array mit mehreren Warteschlangenamen

onConsumeFailure(callable $callback)

Wird bei Verbrauchsfehlern ausgelöst

  • $callback - function (\Throwable $exception, array $package), wobei $package die interne Datenstruktur der Warteschlange ist und Informationen wie data, queue, attempts, max_attempts usw. enthält.

Es ist möglich, die Werte der internen Datenstruktur $package zu ändern; Sie müssen nur das geänderte $package zurückgeben. Wenn bei einer bestimmten Nachricht ein Fehler auftritt und Sie keine weiteren Versuche wünschen, könnte der Code wie folgt aussehen:

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "Fehlerhafte Warteschlange:" . $package['queue'] . "\n";
    // Wenn in der Warteschlange ein schwerwiegender Fehler auftritt
    if ($exception->getMessage() === 'Some Fatal error') {
        // Setzen Sie die maximale Wiederholungsanzahl für diese Nachricht auf 0
        $package['max_attempts'] = 0;
    }
    // Geben Sie die modifizierte `$package` Datenstruktur zurück
    return $package;
});

Senden von Nachrichten an die Warteschlange in einer Nicht-Workerman-Umgebung

Manchmal laufen bestimmte Projekte in einer Apache- oder PHP-FPM-Umgebung, die das workerman/redis-queue-Projekt nicht verwenden kann. Sie können die folgende Funktion als Referenz zum Senden verwenden:

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; // Vor Version 1.0.5 war es redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed'; // Vor Version 1.0.5 war es redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Dabei ist der Parameter $redis eine Redis-Instanz. Beispielsweise könnte die Verwendung der Redis-Erweiterung wie folgt aussehen:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);