workerman/redis-queue
Dựa trên Redis, hàng đợi tin nhắn hỗ trợ xử lý tin nhắn trễ.
Địa chỉ dự án:
https://github.com/walkor/redis-queue
Cài đặt:
composer require workerman/redis-queue
Ví dụ
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Đăng ký
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Đăng ký
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Callback được kích hoạt khi tiêu thụ thất bại (tùy chọn)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Hàng đợi " . $package['queue'] . " tiêu thụ thất bại\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// Gửi tin nhắn đến hàng đợi theo định kỳ
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
Tạo một thể hiện
-
$addresstương tự nhưredis://ip:6379, phải bắt đầu bằng redis. -
$optionsbao gồm các tùy chọn sau:auth: Thông tin xác thực, mặc định là ''db: db, mặc định là 0max_attempts: Số lần thử lại sau khi tiêu thụ thất bại, mặc định là 5retry_seconds: Thời gian tạm dừng giữa các lần thử lại, tính bằng giây. Mặc định là 5
Tiêu thụ thất bại có nghĩa là ngữ cảnh kinh doanh ném ra ngoại lệ
ExceptionhoặcError. Sau khi tiêu thụ thất bại, tin nhắn sẽ được đặt vào hàng đợi trễ để chờ thử lại, số lần thử lại được kiểm soát bởimax_attempts, và khoảng thời gian thử lại được điều chỉnh bởiretry_secondsvàmax_attempts. Ví dụ: nếumax_attemptslà 5 vàretry_secondslà 10, khoảng thời gian thử lại lần thứ nhất là1*10giây, lần thứ hai là2*10giây, lần thứ ba là3*10giây, và cứ như vậy cho đến khi thử lại 5 lần. Nếu vượt quá số lần thử lại được thiết lậpmax_attempts, tin nhắn sẽ được đưa vào hàng đợi thất bại với key là{redis-queue}-failed(trước phiên bản 1.0.5 làredis-queue-failed).
send(String $queue, Mixed $data, [int $dely=0])
Gửi một tin nhắn đến hàng đợi
$queueTên hàng đợi, kiểuString$dataTin nhắn cụ thể đã phát hành, có thể là mảng hoặc chuỗi, kiểuMixed$delyThời gian tiêu thụ trễ, tính bằng giây, mặc định là 0, kiểuInt
subscribe(mixed $queue, callable $callback)
Đăng ký một hàng đợi hoặc nhiều hàng đợi
$queueTên hàng đợi, có thể là chuỗi hoặc mảng chứa nhiều tên hàng đợi$callbackHàm callback, định dạng làfunction (Mixed $data), trong đó$datalàsend($queue, $data)中的$data.
unsubscribe(mixed $queue)
Hủy đăng ký
$queueTên hàng đợi hoặc mảng chứa nhiều tên hàng đợi
onConsumeFailure(callable $callback)
Kích hoạt khi tiêu thụ thất bại
$callback-function (\Throwable $exception, array $package),$packagelà cấu trúc dữ liệu bên trong của hàng đợi, chứa thông tin vềdata,queue,attempts,max_attempts, v.v.
Hỗ trợ thay đổi giá trị của cấu trúc dữ liệu bên trong $package, chỉ cần return $package đã được thay đổi. Ví dụ khi một tin nhắn xảy ra lỗi nào đó và không muốn thử lại, code sẽ tương tự như sau
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Tên hàng đợi thất bại:" . $package['queue'] . "\n";
// Khi hàng đợi gặp một lỗi nghiêm trọng nào đó
if ($exception->getMessage() === 'Some Fatal error') {
// Đặt số lần thử lại tối đa cho tin nhắn này thành 0
$package['max_attempts'] = 0;
}
// Trả về cấu trúc dữ liệu `$package` đã được sửa đổi
return $package;
});
Gửi tin nhắn đến hàng đợi trong môi trường không phải workerman
Đôi khi một số dự án chạy trên môi trường apache hoặc php-fpm, không thể sử dụng project workerman/redis-queue, có thể tham khảo hàm dưới đây để thực hiện việc gửi
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // Trước phiên bản 1.0.5 là redis-queue-waiting
$queue_delay = '{redis-queue}-delayed'; // Trước phiên bản 1.0.5 là redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Trong đó, tham số $redis là thể hiện redis. Ví dụ sử dụng redis extension tương tự như sau:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);