workerman/redis-queue

Redisをベースとしたメッセージキューで、メッセージの遅延処理をサポートしています。

プロジェクトのアドレス:

https://github.com/walkor/redis-queue

インストール:

composer require workerman/redis-queue

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
   // 購読
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
   // 購読
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // キューに定期的にメッセージを送信
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

インスタンスを作成します。

  • $address redis://ip:6379のような形式で、redisで始まる必要があります。

  • $options 以下のオプションを含みます:

    • auth: 認証情報、デフォルトは''
    • db: db、デフォルトは0
    • max_attempts: 消費失敗後の再試行回数、デフォルトは5
    • retry_seconds: 再試行間隔、秒単位。デフォルトは5

消費失敗とは、ビジネスが例外ExceptionまたはErrorをスローしたことを指します。消費失敗後、メッセージは再試行するための遅延キューに配置され、再試行回数はmax_attemptsで制御され、再試行間隔はretry_secondsmax_attemptsによって共同で制御されます。 たとえば、max_attemptsが5でretry_secondsが10の場合、1回目の再試行間隔は1*10秒、2回目の再試行間隔は2*10秒、3回目の再試行間隔は3*10秒、以降も同様に5回まで続きます。max_attemptsで設定された再試行回数を超えると、メッセージは{redis-queue}-failed(バージョン1.0.5以前はredis-queue-failed)という失敗キューに配置されます。


send(String $queue, Mixed $data, [int $dely=0])

キューにメッセージを送信します。

  • $queue キュー名、Stringタイプ
  • $data 送信される具体的なメッセージ、配列または文字列であり、Mixedタイプ
  • $dely 遅延消費時間、デフォルトは0、Intタイプ

subscribe(mixed $queue, callable $callback)

キューまたは複数のキューを購読します。

  • $queue キュー名、文字列または複数のキュー名を含む配列が可能です
  • $callback コールバック関数、形式は function (Mixed $data)で、$datasend($queue, $data)$dataに対応します。

unsubscribe(mixed $queue)

購読をキャンセルします。

  • $queue キュー名または複数のキュー名を含む配列

workerman環境以外でキューにメッセージを送信する

いくつかのプロジェクトは、apacheやphp-fpm環境で実行されており、workerman/redis-queueプロジェクトを使用できない場合があります。次の関数を参考にして実装できます。

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting';//バージョン1.0.5以前はredis-queue-waiting
    $queue_delay = '{redis-queue}-delayed';//バージョン1.0.5以前はredis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

ここで、パラメータ$redisはredisのインスタンスです。たとえば、redisの拡張機能の使用方法は以下のようになります:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);