workerman/redis-queue

Redisに基づいたメッセージキューで、メッセージの遅延処理をサポートしています。

プロジェクトアドレス:

https://github.com/walkor/redis-queue

インストール:

composer require workerman/redis-queue

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // 購読
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // 購読
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // 消費失敗時にトリガーされるコールバック(オプション)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "キュー " . $package['queue'] . " の消費に失敗しました\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // 定期的にキューにメッセージを送信
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

インスタンスを作成する

  • $address redis://ip:6379 の形式で、必ずredisで始まる必要があります。

  • $options 以下のオプションを含む:

    • auth: 認証情報、デフォルトは ''
    • db: データベース、デフォルトは 0
    • max_attempts: 消費失敗後の再試行回数、デフォルトは5
    • retry_seconds: 再試行の時間間隔、単位は秒。デフォルトは5

消費失敗は、ビジネスで例外 Exception もしくは Error が発生したことを指します。消費失敗後、メッセージは遅延キューに放置され、再試行されます。再試行回数は max_attempts により制御され、再試行間隔は retry_secondsmax_attempts によって共同で制御されます。例えば、max_attempts が5、retry_seconds が10の場合、第1回の再試行間隔は 1*10 秒、第2回は 2*10 秒、第3回は 3*10 秒と続きます。5回の再試行に達するまで続きます。max_attempts を超えると、メッセージはキー {redis-queue}-failed に放置されます(1.0.5バージョン以前は redis-queue-failed )。


send(String $queue, Mixed $data, [int $dely=0])

キューにメッセージを送信する

  • $queue キュー名、String
  • $data 配信される具体的なメッセージ、配列または文字列、Mixed
  • $dely 消費の遅延時間、単位は秒、デフォルトは0、Int

subscribe(mixed $queue, callable $callback)

1つ以上のキューを購読する

  • $queue キュー名、文字列または複数のキュー名を含む配列
  • $callback コールバック関数、形式は function (Mixed $data) で、$datasend($queue, $data) の中の$data です。

unsubscribe(mixed $queue)

購読をキャンセルする

  • $queue キュー名または複数のキュー名を含む配列

onConsumeFailure(callable $callback)

消費失敗時にトリガーされる

  • $callback - function (\Throwable $exception, array $package)$package はキュー内部のデータ構造で、data queue attempts max_attempts などの情報を含みます

内部データ構造の $package の値を変更することもサポートされており、変更後の $package を返すだけです。例えば、特定のメッセージでエラーが発生した場合、再試行を希望しない場合、以下のようにコードを書くことができます。

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "失敗したキュー名:" . $package['queue'] . "\n";
    // キューで重大なエラーが発生した場合
    if ($exception->getMessage() === 'Some Fatal error') {
        // このメッセージの最大再試行回数を0に設定する
        $package['max_attempts'] = 0;
    }
    // 修正された `$package` データ構造を返す
    return $package;
});

非workerman環境でキューにメッセージを送信する

場合によっては、いくつかのプロジェクトがapacheまたはphp-fpm環境で実行され、workerman/redis-queueプロジェクトを使用できない場合、以下の関数を参照してメッセージを送信することができます。

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; //1.0.5バージョン以前は redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed'; //1.0.5バージョン以前は redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

ここで、引数 $redis はredisインスタンスです。例えば、redis拡張の使用法は以下のようになります:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);