workerman/redis-queue
基於Redis的消息隊列,支持消息延遲處理。
專案地址:
https://github.com/walkor/redis-queue
安裝:
composer require workerman/redis-queue
示例
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// 訂閱
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// 訂閱
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// 消費失敗觸發的回調(可選)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "隊列 " . $package['queue'] . " 消費失敗\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// 定時向隊列發送消息
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
創建實例
-
$address類似redis://ip:6379,必須以redis開頭. -
$options包括以下選項:auth: 鑑權資訊,預設 ''db: db,預設 0max_attempts: 消費失敗後重試次數,預設5retry_seconds: 重試時間間隔,單位秒。預設5
消費失敗是指業務拋出異常
Exception或者Error。消費失敗後消息會放到延遲隊列等待重試,重試次數由max_attempts控制,重試間隔由retry_seconds和max_attempts共同控制。比如max_attempts為5,retry_seconds為10,第1次重試間隔為1*10秒,第2次重試時間間隔為2*10秒,第3次重試時間間隔為3*10秒,以此類推直到重試5次。如果超過了max_attempts設置的重試次數,則消息放入key為{redis-queue}-failed(1.0.5版本之前為redis-queue-failed)的失敗隊列。
send(String $queue, Mixed $data, [int $dely=0])
向隊列發送一條消息
$queue隊列名,String類型$data發佈的具體消息,可以是數組或者字串,Mixed類型$dely延遲消費時間,單位秒,預設0,Int類型
subscribe(mixed $queue, callable $callback)
訂閱一個隊列或者多個隊列
$queue隊列名,可以是字串或者包含多個隊列名的數組$callback回調函數,格式為function (Mixed $data),其中$data就是send($queue, $data)中的$data.
unsubscribe(mixed $queue)
取消訂閱
$queue隊列名或者包含多個隊列名的數組
onConsumeFailure(callable $callback)
消費失敗時觸發
$callback-function (\Throwable $exception, array $package),$package是隊列內部數據結構,包含了dataqueueattemptsmax_attempts等資訊
支持更改內部數據結構$package的值,只需要將更改後的$package return 即可。例如當某個消息發生某種錯誤時不希望再次重試,代碼類似
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "失敗的隊列名:" . $package['queue'] . "\n";
// 當隊列發生某種致命錯誤時
if ($exception->getMessage() === 'Some Fatal error') {
// 將此消息的最大重試次數設置為0
$package['max_attempts'] = 0;
}
// 返回修改後的`$package`數據結構
return $package;
});
在非workerman環境向隊列發送消息
有時候一些專案運行在apache或者php-fpm環境,無法使用workerman/redis-queue專案,可以參考如下函數實現發送
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //1.0.5版本之前為redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//1.0.5版本之前為redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
其中,參數$redis為redis實例。例如redis擴展用法類似如下:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);