workerman/redis-queue
Eine auf Redis basierende Nachrichtenwarteschlange, die die Verarbeitung von Nachrichten mit Verzögerung unterstützt.
Projektadresse:
https://github.com/walkor/redis-queue
Installation:
composer require workerman/redis-queue
Beispiel
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Abonnieren
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Abonnieren
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Callback, das bei Verbrauchsfehlern ausgelöst wird (optional)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Warteschlange " . $package['queue'] . " Verbrauchsfehler\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// Regelmäßig Nachrichten in die Warteschlange senden
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
Instanz erstellen
-
$addressähnlich wieredis://ip:6379, muss mit redis beginnen. -
$optionsumfasst die folgenden Optionen:auth: Authentifizierungsinformationen, standardmäßig ''db: db, standardmäßig 0max_attempts: Anzahl der Wiederholungsversuche bei Verbrauchsfehlern, standardmäßig 5retry_seconds: Zeitintervall für Wiederholungen, in Sekunden. Standardwert 5
Ein Verbrauchsfehler bedeutet, dass eine Ausnahme
ExceptionoderErrorim Geschäftsprozess ausgelöst wird. Bei einem Verbrauchsfehler wird die Nachricht in die verzögerte Warteschlange gelegt und wartet auf eine Wiederholung. Die Anzahl der Wiederholungen wird durchmax_attemptsgesteuert, das Wiederholungsintervall wird durchretry_secondsundmax_attemptsgemeinsam gesteuert. Zum Beispiel, wennmax_attempts5 undretry_seconds10 beträgt, beträgt das Zeitintervall für den 1. Wiederholungsversuch1*10Sekunden, für den 2. Wiederholungsversuch2*10Sekunden, für den 3. Wiederholungsversuch3*10Sekunden und so weiter, bis 5 Wiederholungen erreicht sind. Wenn die festgelegte Anzahl der Wiederholungen vonmax_attemptsüberschritten wird, wird die Nachricht in die Fehlerwarteschlange mit dem Schlüssel{redis-queue}-failed(vor Version 1.0.5 war esredis-queue-failed) gelegt.
send(String $queue, Mixed $data, [int $dely=0])
Eine Nachricht an die Warteschlange senden
$queueWarteschlangenname,String-Typ$dataDie spezifische Nachricht, die veröffentlicht wird, kann ein Array oder ein String sein,Mixed-Typ$delyVerzögerte Verbrauchszeit, in Sekunden, standardmäßig 0,Int-Typ
subscribe(mixed $queue, callable $callback)
Ein oder mehrere Warteschlangen abonnieren
$queueWarteschlangenname, kann ein String oder ein Array mit mehreren Warteschlangenamen sein$callbackCallback-Funktion, Formatfunction (Mixed $data), wobei$datadie Nachricht ist, die insend($queue, $data)gesendet wurde.
unsubscribe(mixed $queue)
Abonnieren aufheben
$queueWarteschlangenname oder ein Array mit mehreren Warteschlangenamen
onConsumeFailure(callable $callback)
Wird bei Verbrauchsfehlern ausgelöst
$callback-function (\Throwable $exception, array $package), wobei$packagedie interne Datenstruktur der Warteschlange ist und Informationen wiedata,queue,attempts,max_attemptsusw. enthält.
Es ist möglich, die Werte der internen Datenstruktur $package zu ändern; Sie müssen nur das geänderte $package zurückgeben. Wenn bei einer bestimmten Nachricht ein Fehler auftritt und Sie keine weiteren Versuche wünschen, könnte der Code wie folgt aussehen:
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Fehlerhafte Warteschlange:" . $package['queue'] . "\n";
// Wenn in der Warteschlange ein schwerwiegender Fehler auftritt
if ($exception->getMessage() === 'Some Fatal error') {
// Setzen Sie die maximale Wiederholungsanzahl für diese Nachricht auf 0
$package['max_attempts'] = 0;
}
// Geben Sie die modifizierte `$package` Datenstruktur zurück
return $package;
});
Senden von Nachrichten an die Warteschlange in einer Nicht-Workerman-Umgebung
Manchmal laufen bestimmte Projekte in einer Apache- oder PHP-FPM-Umgebung, die das workerman/redis-queue-Projekt nicht verwenden kann. Sie können die folgende Funktion als Referenz zum Senden verwenden:
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // Vor Version 1.0.5 war es redis-queue-waiting
$queue_delay = '{redis-queue}-delayed'; // Vor Version 1.0.5 war es redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Dabei ist der Parameter $redis eine Redis-Instanz. Beispielsweise könnte die Verwendung der Redis-Erweiterung wie folgt aussehen:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);