workerman/redis-queue
基于Redis的消息队列,支持消息延迟处理。
项目地址:
https://github.com/walkor/redis-queue
安装:
composer require workerman/redis-queue
示例
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// 订阅
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// 订阅
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// 消费失败触发的回调(可选)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "队列 " . $package['queue'] . " 消费失败\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// 定时向队列发送消息
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
สร้างตัวอย่าง
-
$addressคล้ายกับredis://ip:6379ซึ่งต้องขึ้นต้นด้วย redis. -
$optionsรวมถึงตัวเลือกต่อไปนี้:auth: ข้อมูลการตรวจสอบสิทธิ์, เริ่มต้นคือ ''db: db, เริ่มต้นคือ 0max_attempts: จำนวนครั้งในการลองใหม่เมื่อการบริโภคล้มเหลว, เริ่มต้นคือ 5retry_seconds: ระยะเวลาในการลองใหม่, หน่วยเป็นวินาที เริ่มต้นคือ 5
การบริโภคล้มเหลวหมายถึงการที่ธุรกิจสร้างข้อยกเว้น
ExceptionหรือError. เมื่อการบริโภคล้มเหลว ข้อความจะถูกส่งไปยังคิวการล่าช้าเพื่อรอการลองใหม่ โดยจำนวนครั้งในการลองใหม่ถูกควบคุมโดยmax_attemptsและระยะระหว่างการลองใหม่ถูกควบคุมร่วมกันโดยretry_secondsและmax_attempts. เช่นmax_attemptsเป็น 5 และretry_secondsเป็น 10 ระยะเวลาการลองใหม่ครั้งที่ 1 จะเป็น1*10วินาที, ครั้งที่ 2 จะเป็น2*10วินาที, ครั้งที่ 3 จะเป็น3*10วินาที, อีกเช่นนี้ไปจนกว่าจะมีการลองใหม่ 5 ครั้ง. หากเกินmax_attemptsที่ตั้งไว้ สำหรับจำนวนครั้งในการลองใหม่ ข้อความจะถูกจัดเก็บในคิวที่ล้มเหลวซึ่งชื่อว่า{redis-queue}-failed(ก่อนเวอร์ชั่น 1.0.5 จะเป็นredis-queue-failed)
send(String $queue, Mixed $data, [int $dely=0])
ส่งข้อความไปยังคิวหนึ่งรายการ
$queueชื่อคิว, ประเภทString$dataข้อความที่เผยแพร่, สามารถเป็นอาเรย์หรือสตริง, ประเภทMixed$delyระยะเวลาในการบริโภคล่าช้า, หน่วยเป็นวินาที, เริ่มต้นคือ 0, ประเภทInt
subscribe(mixed $queue, callable $callback)
สมัครรับข้อมูลจากคิวหนึ่งหรือหลายคิว
$queueชื่อคิว, สามารถเป็นสตริงหรืออาเรย์ที่ประกอบด้วยชื่อคิวหลายชื่อ$callbackฟังก์ชันการเรียกกลับ, รูปแบบคือfunction (Mixed $data)ซึ่งในนั้น$dataคือ$dataในการเรียกsend($queue, $data).
unsubscribe(mixed $queue)
ยกเลิกการสมัครรับข้อมูล
$queueชื่อคิวหรืออาเรย์ที่ประกอบด้วยชื่อคิวหลายชื่อ
onConsumeFailure(callable $callback)
ถูกเรียกใช้เมื่อการบริโภคล้มเหลว
$callback-function (\Throwable $exception, array $package),$packageคือโครงสร้างข้อมูลภายในของคิวซึ่งประกอบด้วยข้อมูลdata,queue,attempts,max_attemptsเป็นต้น
สามารถเปลี่ยนค่าในโครงสร้างข้อมูลภายใน $package ได้ เพียงแค่ทำการส่งค่าที่เปลี่ยนแปลงกลับไป เช่นเมื่อข้อความเกิดข้อผิดพลาดบางอย่างและไม่ต้องการลองใหม่อีกครั้ง โค้ดจะมีลักษณะเช่นนี้
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "ชื่อคิวที่ล้มเหลว:" . $package['queue'] . "\n";
// เมื่อคิวเกิดข้อผิดพลาดร้ายแรงบางอย่าง
if ($exception->getMessage() === 'Some Fatal error') {
// ตั้งค่าการลองใหม่สูงสุดสำหรับข้อความนี้เป็น 0
$package['max_attempts'] = 0;
}
// ส่งกลับโครงสร้างข้อมูล `$package` ที่แก้ไขแล้ว
return $package;
});
在非workerman环境向队列发送消息
บางครั้งโปรเจ็กต์บางโปรเจ็กต์ทำงานในสภาพแวดล้อมของ apache หรือ php-fpm ซึ่งไม่สามารถใช้ workerman/redis-queue ได้ คุณสามารถดูฟังก์ชันด้านล่างเพื่อส่งข้อความ
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // ก่อนเวอร์ชั่น 1.0.5 จะเป็น redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';// ก่อนเวอร์ชั่น 1.0.5 จะเป็น redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
ในที่นี้, พารามิเตอร์ $redis คืออินสแตนซ์ redis เช่นเดียวกับการใช้ redis extension มีลักษณะเหมือนเช่นนี้:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);