workerman/redis-queue

基于Redis的消息队列,支持消息延迟处理。

项目地址:

https://github.com/walkor/redis-queue

安装:

composer require workerman/redis-queue

示例

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // 订阅
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // 订阅
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // 消费失败触发的回调(可选)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "队列 " . $package['queue'] . " 消费失败\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // 定时向队列发送消息
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

สร้างตัวอย่าง

  • $address คล้ายกับ redis://ip:6379 ซึ่งต้องขึ้นต้นด้วย redis.

  • $options รวมถึงตัวเลือกต่อไปนี้:

    • auth: ข้อมูลการตรวจสอบสิทธิ์, เริ่มต้นคือ ''
    • db: db, เริ่มต้นคือ 0
    • max_attempts: จำนวนครั้งในการลองใหม่เมื่อการบริโภคล้มเหลว, เริ่มต้นคือ 5
    • retry_seconds: ระยะเวลาในการลองใหม่, หน่วยเป็นวินาที เริ่มต้นคือ 5

การบริโภคล้มเหลวหมายถึงการที่ธุรกิจสร้างข้อยกเว้น Exception หรือ Error. เมื่อการบริโภคล้มเหลว ข้อความจะถูกส่งไปยังคิวการล่าช้าเพื่อรอการลองใหม่ โดยจำนวนครั้งในการลองใหม่ถูกควบคุมโดย max_attempts และระยะระหว่างการลองใหม่ถูกควบคุมร่วมกันโดย retry_seconds และ max_attempts. เช่น max_attempts เป็น 5 และ retry_seconds เป็น 10 ระยะเวลาการลองใหม่ครั้งที่ 1 จะเป็น 1*10 วินาที, ครั้งที่ 2 จะเป็น 2*10 วินาที, ครั้งที่ 3 จะเป็น 3*10 วินาที, อีกเช่นนี้ไปจนกว่าจะมีการลองใหม่ 5 ครั้ง. หากเกิน max_attempts ที่ตั้งไว้ สำหรับจำนวนครั้งในการลองใหม่ ข้อความจะถูกจัดเก็บในคิวที่ล้มเหลวซึ่งชื่อว่า {redis-queue}-failed (ก่อนเวอร์ชั่น 1.0.5 จะเป็น redis-queue-failed)


send(String $queue, Mixed $data, [int $dely=0])

ส่งข้อความไปยังคิวหนึ่งรายการ

  • $queue ชื่อคิว, ประเภท String
  • $data ข้อความที่เผยแพร่, สามารถเป็นอาเรย์หรือสตริง, ประเภท Mixed
  • $dely ระยะเวลาในการบริโภคล่าช้า, หน่วยเป็นวินาที, เริ่มต้นคือ 0, ประเภท Int

subscribe(mixed $queue, callable $callback)

สมัครรับข้อมูลจากคิวหนึ่งหรือหลายคิว

  • $queue ชื่อคิว, สามารถเป็นสตริงหรืออาเรย์ที่ประกอบด้วยชื่อคิวหลายชื่อ
  • $callback ฟังก์ชันการเรียกกลับ, รูปแบบคือ function (Mixed $data) ซึ่งในนั้น $data คือ $data ในการเรียก send($queue, $data).

unsubscribe(mixed $queue)

ยกเลิกการสมัครรับข้อมูล

  • $queue ชื่อคิวหรืออาเรย์ที่ประกอบด้วยชื่อคิวหลายชื่อ

onConsumeFailure(callable $callback)

ถูกเรียกใช้เมื่อการบริโภคล้มเหลว

  • $callback - function (\Throwable $exception, array $package), $package คือโครงสร้างข้อมูลภายในของคิวซึ่งประกอบด้วยข้อมูล data, queue, attempts, max_attempts เป็นต้น

สามารถเปลี่ยนค่าในโครงสร้างข้อมูลภายใน $package ได้ เพียงแค่ทำการส่งค่าที่เปลี่ยนแปลงกลับไป เช่นเมื่อข้อความเกิดข้อผิดพลาดบางอย่างและไม่ต้องการลองใหม่อีกครั้ง โค้ดจะมีลักษณะเช่นนี้

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "ชื่อคิวที่ล้มเหลว:" . $package['queue'] . "\n";
    // เมื่อคิวเกิดข้อผิดพลาดร้ายแรงบางอย่าง
    if ($exception->getMessage() === 'Some Fatal error') {
        // ตั้งค่าการลองใหม่สูงสุดสำหรับข้อความนี้เป็น 0
        $package['max_attempts'] = 0;
    }
    // ส่งกลับโครงสร้างข้อมูล `$package` ที่แก้ไขแล้ว
    return $package;
});

在非workerman环境向队列发送消息

บางครั้งโปรเจ็กต์บางโปรเจ็กต์ทำงานในสภาพแวดล้อมของ apache หรือ php-fpm ซึ่งไม่สามารถใช้ workerman/redis-queue ได้ คุณสามารถดูฟังก์ชันด้านล่างเพื่อส่งข้อความ

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; // ก่อนเวอร์ชั่น 1.0.5 จะเป็น redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed';// ก่อนเวอร์ชั่น 1.0.5 จะเป็น redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

ในที่นี้, พารามิเตอร์ $redis คืออินสแตนซ์ redis เช่นเดียวกับการใช้ redis extension มีลักษณะเหมือนเช่นนี้:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);