workerman/redis-queue
Redisに基づいたメッセージキューで、メッセージの遅延処理をサポートしています。
プロジェクトアドレス:
https://github.com/walkor/redis-queue
インストール:
composer require workerman/redis-queue
例
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// 購読
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// 購読
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// 消費失敗時にトリガーされるコールバック(オプション)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "キュー " . $package['queue'] . " の消費に失敗しました\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// 定期的にキューにメッセージを送信
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
インスタンスを作成する
-
$addressredis://ip:6379の形式で、必ずredisで始まる必要があります。 -
$options以下のオプションを含む:auth: 認証情報、デフォルトは ''db: データベース、デフォルトは 0max_attempts: 消費失敗後の再試行回数、デフォルトは5retry_seconds: 再試行の時間間隔、単位は秒。デフォルトは5
消費失敗は、ビジネスで例外
ExceptionもしくはErrorが発生したことを指します。消費失敗後、メッセージは遅延キューに放置され、再試行されます。再試行回数はmax_attemptsにより制御され、再試行間隔はretry_secondsとmax_attemptsによって共同で制御されます。例えば、max_attemptsが5、retry_secondsが10の場合、第1回の再試行間隔は1*10秒、第2回は2*10秒、第3回は3*10秒と続きます。5回の再試行に達するまで続きます。max_attemptsを超えると、メッセージはキー{redis-queue}-failedに放置されます(1.0.5バージョン以前はredis-queue-failed)。
send(String $queue, Mixed $data, [int $dely=0])
キューにメッセージを送信する
$queueキュー名、String型$data配信される具体的なメッセージ、配列または文字列、Mixed型$dely消費の遅延時間、単位は秒、デフォルトは0、Int型
subscribe(mixed $queue, callable $callback)
1つ以上のキューを購読する
$queueキュー名、文字列または複数のキュー名を含む配列$callbackコールバック関数、形式はfunction (Mixed $data)で、$dataはsend($queue, $data)の中の$dataです。
unsubscribe(mixed $queue)
購読をキャンセルする
$queueキュー名または複数のキュー名を含む配列
onConsumeFailure(callable $callback)
消費失敗時にトリガーされる
$callback-function (\Throwable $exception, array $package)、$packageはキュー内部のデータ構造で、dataqueueattemptsmax_attemptsなどの情報を含みます
内部データ構造の $package の値を変更することもサポートされており、変更後の $package を返すだけです。例えば、特定のメッセージでエラーが発生した場合、再試行を希望しない場合、以下のようにコードを書くことができます。
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "失敗したキュー名:" . $package['queue'] . "\n";
// キューで重大なエラーが発生した場合
if ($exception->getMessage() === 'Some Fatal error') {
// このメッセージの最大再試行回数を0に設定する
$package['max_attempts'] = 0;
}
// 修正された `$package` データ構造を返す
return $package;
});
非workerman環境でキューにメッセージを送信する
場合によっては、いくつかのプロジェクトがapacheまたはphp-fpm環境で実行され、workerman/redis-queueプロジェクトを使用できない場合、以下の関数を参照してメッセージを送信することができます。
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //1.0.5バージョン以前は redis-queue-waiting
$queue_delay = '{redis-queue}-delayed'; //1.0.5バージョン以前は redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
ここで、引数 $redis はredisインスタンスです。例えば、redis拡張の使用法は以下のようになります:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);