workerman/redis-queue

Dựa trên Redis, hàng đợi tin nhắn hỗ trợ xử lý tin nhắn trễ.

Địa chỉ dự án:

https://github.com/walkor/redis-queue

Cài đặt:

composer require workerman/redis-queue

Ví dụ

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Đăng ký
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Đăng ký
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Callback được kích hoạt khi tiêu thụ thất bại (tùy chọn)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "Hàng đợi " . $package['queue'] . " tiêu thụ thất bại\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // Gửi tin nhắn đến hàng đợi theo định kỳ
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

Tạo một thể hiện

  • $address tương tự như redis://ip:6379, phải bắt đầu bằng redis.

  • $options bao gồm các tùy chọn sau:

    • auth: Thông tin xác thực, mặc định là ''
    • db: db, mặc định là 0
    • max_attempts: Số lần thử lại sau khi tiêu thụ thất bại, mặc định là 5
    • retry_seconds: Thời gian tạm dừng giữa các lần thử lại, tính bằng giây. Mặc định là 5

Tiêu thụ thất bại có nghĩa là ngữ cảnh kinh doanh ném ra ngoại lệ Exception hoặc Error. Sau khi tiêu thụ thất bại, tin nhắn sẽ được đặt vào hàng đợi trễ để chờ thử lại, số lần thử lại được kiểm soát bởi max_attempts, và khoảng thời gian thử lại được điều chỉnh bởi retry_secondsmax_attempts. Ví dụ: nếu max_attempts là 5 và retry_seconds là 10, khoảng thời gian thử lại lần thứ nhất là 1*10 giây, lần thứ hai là 2*10 giây, lần thứ ba là 3*10 giây, và cứ như vậy cho đến khi thử lại 5 lần. Nếu vượt quá số lần thử lại được thiết lập max_attempts, tin nhắn sẽ được đưa vào hàng đợi thất bại với key là {redis-queue}-failed (trước phiên bản 1.0.5 là redis-queue-failed).


send(String $queue, Mixed $data, [int $dely=0])

Gửi một tin nhắn đến hàng đợi

  • $queue Tên hàng đợi, kiểu String
  • $data Tin nhắn cụ thể đã phát hành, có thể là mảng hoặc chuỗi, kiểu Mixed
  • $dely Thời gian tiêu thụ trễ, tính bằng giây, mặc định là 0, kiểu Int

subscribe(mixed $queue, callable $callback)

Đăng ký một hàng đợi hoặc nhiều hàng đợi

  • $queue Tên hàng đợi, có thể là chuỗi hoặc mảng chứa nhiều tên hàng đợi
  • $callback Hàm callback, định dạng là function (Mixed $data) , trong đó $datasend($queue, $data)中的$data.

unsubscribe(mixed $queue)

Hủy đăng ký

  • $queue Tên hàng đợi hoặc mảng chứa nhiều tên hàng đợi

onConsumeFailure(callable $callback)

Kích hoạt khi tiêu thụ thất bại

  • $callback - function (\Throwable $exception, array $package), $package là cấu trúc dữ liệu bên trong của hàng đợi, chứa thông tin về data, queue, attempts, max_attempts, v.v.

Hỗ trợ thay đổi giá trị của cấu trúc dữ liệu bên trong $package, chỉ cần return $package đã được thay đổi. Ví dụ khi một tin nhắn xảy ra lỗi nào đó và không muốn thử lại, code sẽ tương tự như sau

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "Tên hàng đợi thất bại:" . $package['queue'] . "\n";
    // Khi hàng đợi gặp một lỗi nghiêm trọng nào đó
    if ($exception->getMessage() === 'Some Fatal error') {
        // Đặt số lần thử lại tối đa cho tin nhắn này thành 0
        $package['max_attempts'] = 0;
    }
    // Trả về cấu trúc dữ liệu `$package` đã được sửa đổi
    return $package;
});

Gửi tin nhắn đến hàng đợi trong môi trường không phải workerman

Đôi khi một số dự án chạy trên môi trường apache hoặc php-fpm, không thể sử dụng project workerman/redis-queue, có thể tham khảo hàm dưới đây để thực hiện việc gửi

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; // Trước phiên bản 1.0.5 là redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed'; // Trước phiên bản 1.0.5 là redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Trong đó, tham số $redis là thể hiện redis. Ví dụ sử dụng redis extension tương tự như sau:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);