workerman/redis-queue
Eine auf Redis basierende Nachrichtenwarteschlange mit Unterstützung für die Verarbeitung verzögerter Nachrichten.
Projektadresse:
https://github.com/walkor/redis-queue
Installation:
composer require workerman/redis-queue
Beispiel
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Abonnement
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Abonnement
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Zeitgesteuertes Senden von Nachrichten an die Warteschlange
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
__construct (string $address, [array $options])
Instanz erstellen
-
$address
ähnlich wieredis://ip:6379
, muss mit redis beginnen. -
$options
beinhaltet folgende Optionen:auth
: Authentifizierungsinformationen, standardmäßig ''db
: Datenbank, standardmäßig 0max_attempts
: Anzahl der Wiederholungsversuche nach fehlgeschlagener Verarbeitung, standardmäßig 5retry_seconds
: Wiederholungsintervall in Sekunden, standardmäßig 5
Ein fehlgeschlagener Verarbeitungsversuch liegt vor, wenn die Anwendung eine Ausnahme
Exception
oder einenError
wirft. Nach einem fehlgeschlagenen Verarbeitungsversuch wird die Nachricht in die verzögerte Warteschlange verschoben und wartet auf einen erneuten Versuch. Die Anzahl der Versuche wird durchmax_attempts
gesteuert, während das Intervall für die Wiederholung vonretry_seconds
undmax_attempts
gemeinsam gesteuert wird. Zum Beispiel, bei einer Einstellung vonmax_attempts
auf 5 undretry_seconds
auf 10, beträgt das Intervall für den ersten Wiederholungsversuch1*10
Sekunden, für den zweiten Wiederholungsversuch2*10
Sekunden, und so weiter, bis zum fünften Wiederholungsversuch. Wenn die Anzahl der Wiederholungsversuche die inmax_attempts
festgelegte Anzahl überschreitet, wird die Nachricht in die fehlgeschlagene Warteschlange mit dem Key{redis-queue}-failed
(vor Version 1.0.5{redis-queue-failed}
) verschoben.
send(String $queue, Mixed $data, [int $dely=0])
Sendet eine Nachricht an die Warteschlange
$queue
Warteschlangenname, vom TypString
$data
Die zu veröffentlichende Nachricht, kann ein Array oder ein String sein, vom TypMixed
$dely
Verzögerungszeit für die Verarbeitung, standardmäßig 0, vom TypInt
subscribe(mixed $queue, callable $callback)
Abonniert eine oder mehrere Warteschlangen
$queue
Warteschlangenname, kann ein String oder ein Array, das mehrere Warteschlangennamen enthält, sein.$callback
Rückruffunktion im Formatfunction (Mixed $data)
, wobei$data
die$data
aussend($queue, $data)
ist.
unsubscribe(mixed $queue)
Beendet das Abonnement
$queue
Warteschlangenname oder ein Array, das mehrere Warteschlangennamen enthält
Senden von Nachrichten an die Warteschlange in einer nicht-Workerman-Umgebung
Manchmal laufen einige Projekte in einer Apache- oder PHP-FPM-Umgebung und können das Workerman/Redis-Queue-Projekt nicht verwenden. In solchen Fällen kann die nachfolgende Funktion verwendet werden, um Nachrichten zu senden.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // Vor Version 1.0.5 war es redis-queue-waiting
$queue_delay = '{redis-queue}-delayed'; // Vor Version 1.0.5 war es redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Dabei steht der Parameter $redis
für die Redis-Instanz. Beispielhafte Verwendung mit der Redis-Erweiterung:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);