workerman/redis-queue

نظام قائمة انتظار الرسائل المستندة إلى Redis، يدعم معالجة الرسائل المتأخرة.

عنوان المشروع:

https://github.com/walkor/redis-queue

التثبيت:

composer require workerman/redis-queue

مثال

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // الاشتراك
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // الاشتراك
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // الاستجابة عند فشل الاستهلاك (اختياري)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "فشل استهلاك قائمة الانتظار " . $package['queue'] . "\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // إرسال الرسائل إلى القائمة بشكل دوري
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

إنشاء مثيل

  • $address يشبه redis://ip:6379، يجب أن يبدأ بـ redis.

  • $options تشمل الخيارات التالية:

    • auth: معلومات المصادقة، الافتراضي ''
    • db: قاعدة البيانات، الافتراضي 0
    • max_attempts: عدد المحاولات لإعادة الاستهلاك بعد الفشل، الافتراضي 5
    • retry_seconds: فترة إعادة المحاولة، بوحدات الثواني. الافتراضي 5

الفشل في الاستهلاك يعني أن العمل كان يرمي استثناءً Exception أو Error. بعد الفشل في الاستهلاك، ستوضع الرسالة في قائمة الانتظار المتأخرة في انتظار إعادة المحاولة، وعدد المحاولات يتحدد بواسطة max_attempts، وفترة إعادة المحاولة يتم التحكم بها بشكل مشترك بواسطة retry_seconds و max_attempts. على سبيل المثال، إذا كانت max_attempts تساوي 5 و retry_seconds تساوي 10، ستكون فترة إعادة المحاولة الأولى 1*10 ثانية، وفترة إعادة المحاولة الثانية 2*10 ثانية، وفترة إعادة المحاولة الثالثة 3*10 ثانية، وهكذا حتى يتم إعادة المحاولة 5 مرات. إذا تجاوزت عدد المحاولات المسموح به المحدد في max_attempts، فسيتم وضع الرسالة في قائمة الفشل التي تحمل المفتاح {redis-queue}-failed (قبل الإصدار 1.0.5 كانت redis-queue-failed).


send(String $queue, Mixed $data, [int $dely=0])

إرسال رسالة إلى قائمة الانتظار

  • $queue اسم قائمة الانتظار، نوع String
  • $data الرسالة المحددة التي سيتم نشرها، يمكن أن تكون مصفوفة أو سلسلة، نوع Mixed
  • $dely زمن تأخير الاستهلاك، بوحدات الثواني، الافتراضي 0، نوع Int

subscribe(mixed $queue, callable $callback)

الاشتراك في قائمة انتظار واحدة أو أكثر

  • $queue اسم قائمة الانتظار، يمكن أن تكون سلسلة أو مصفوفة تحتوي على أسماء متعددة لقوائم الانتظار
  • $callback دالة رد نداء، بصيغة function (Mixed $data)، حيث $data هو نفس $data من send($queue, $data).

unsubscribe(mixed $queue)

إلغاء الاشتراك

  • $queue اسم قائمة الانتظار أو مصفوفة تحتوي على أسماء متعددة لقوائم الانتظار

onConsumeFailure(callable $callback)

يتم تنشيطه عند فشل الاستهلاك

  • $callback - function (\Throwable $exception, array $package)، $package هو الهيكل الداخلي للبيانات في قائمة الانتظار، ويحتوي على معلومات data و queue و attempts و max_attempts وغيرها

يدعم تعديل القيم في الهيكل الداخلي للبيانات $package، كل ما عليك هو إرجاع $package المعدلة. على سبيل المثال، عندما تحدث خطأ ما في رسالة معينة ولا ترغب في إعادة المحاولة مرة أخرى، سيكون الكود كالتالي:

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "اسم القائمة الفاشلة:" . $package['queue'] . "\n";
    // عند حدوث خطأ قاتل في القائمة
    if ($exception->getMessage() === 'Some Fatal error') {
        // تعيين العدد الأقصى للمحاولات لهذه الرسالة إلى 0
        $package['max_attempts'] = 0;
    }
    // إرجاع الهيكل المعدل للبيانات `$package`
    return $package;
});

إرسال الرسائل إلى قائمة الانتظار في بيئة غير workerman

في بعض الأحيان، تعمل بعض المشاريع في بيئة apache أو php-fpm، ولا يمكن استخدام مشروع workerman/redis-queue، يمكنك مشاهدة الوظيفة التالية لإرسال الرسائل:

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; // قبل الإصدار 1.0.5 كانت redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed'; // قبل الإصدار 1.0.5 كانت redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

حيث أن المعامل $redis هو مثيل Redis. على سبيل المثال، استخدام ملحق Redis سيكون مشابهًا لما يلي:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);