workerman/redis-queue
نظام قائمة انتظار الرسائل المستندة إلى Redis، يدعم معالجة الرسائل المتأخرة.
عنوان المشروع:
https://github.com/walkor/redis-queue
التثبيت:
composer require workerman/redis-queue
مثال
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// الاشتراك
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// الاشتراك
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// الاستجابة عند فشل الاستهلاك (اختياري)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "فشل استهلاك قائمة الانتظار " . $package['queue'] . "\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// إرسال الرسائل إلى القائمة بشكل دوري
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
إنشاء مثيل
-
$addressيشبهredis://ip:6379، يجب أن يبدأ بـ redis. -
$optionsتشمل الخيارات التالية:auth: معلومات المصادقة، الافتراضي ''db: قاعدة البيانات، الافتراضي 0max_attempts: عدد المحاولات لإعادة الاستهلاك بعد الفشل، الافتراضي 5retry_seconds: فترة إعادة المحاولة، بوحدات الثواني. الافتراضي 5
الفشل في الاستهلاك يعني أن العمل كان يرمي استثناءً
ExceptionأوError. بعد الفشل في الاستهلاك، ستوضع الرسالة في قائمة الانتظار المتأخرة في انتظار إعادة المحاولة، وعدد المحاولات يتحدد بواسطةmax_attempts، وفترة إعادة المحاولة يتم التحكم بها بشكل مشترك بواسطةretry_secondsوmax_attempts. على سبيل المثال، إذا كانتmax_attemptsتساوي 5 وretry_secondsتساوي 10، ستكون فترة إعادة المحاولة الأولى1*10ثانية، وفترة إعادة المحاولة الثانية2*10ثانية، وفترة إعادة المحاولة الثالثة3*10ثانية، وهكذا حتى يتم إعادة المحاولة 5 مرات. إذا تجاوزت عدد المحاولات المسموح به المحدد فيmax_attempts، فسيتم وضع الرسالة في قائمة الفشل التي تحمل المفتاح{redis-queue}-failed(قبل الإصدار 1.0.5 كانتredis-queue-failed).
send(String $queue, Mixed $data, [int $dely=0])
إرسال رسالة إلى قائمة الانتظار
$queueاسم قائمة الانتظار، نوعString$dataالرسالة المحددة التي سيتم نشرها، يمكن أن تكون مصفوفة أو سلسلة، نوعMixed$delyزمن تأخير الاستهلاك، بوحدات الثواني، الافتراضي 0، نوعInt
subscribe(mixed $queue, callable $callback)
الاشتراك في قائمة انتظار واحدة أو أكثر
$queueاسم قائمة الانتظار، يمكن أن تكون سلسلة أو مصفوفة تحتوي على أسماء متعددة لقوائم الانتظار$callbackدالة رد نداء، بصيغةfunction (Mixed $data)، حيث$dataهو نفس$dataمنsend($queue, $data).
unsubscribe(mixed $queue)
إلغاء الاشتراك
$queueاسم قائمة الانتظار أو مصفوفة تحتوي على أسماء متعددة لقوائم الانتظار
onConsumeFailure(callable $callback)
يتم تنشيطه عند فشل الاستهلاك
$callback-function (\Throwable $exception, array $package)،$packageهو الهيكل الداخلي للبيانات في قائمة الانتظار، ويحتوي على معلوماتdataوqueueوattemptsوmax_attemptsوغيرها
يدعم تعديل القيم في الهيكل الداخلي للبيانات $package، كل ما عليك هو إرجاع $package المعدلة. على سبيل المثال، عندما تحدث خطأ ما في رسالة معينة ولا ترغب في إعادة المحاولة مرة أخرى، سيكون الكود كالتالي:
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "اسم القائمة الفاشلة:" . $package['queue'] . "\n";
// عند حدوث خطأ قاتل في القائمة
if ($exception->getMessage() === 'Some Fatal error') {
// تعيين العدد الأقصى للمحاولات لهذه الرسالة إلى 0
$package['max_attempts'] = 0;
}
// إرجاع الهيكل المعدل للبيانات `$package`
return $package;
});
إرسال الرسائل إلى قائمة الانتظار في بيئة غير workerman
في بعض الأحيان، تعمل بعض المشاريع في بيئة apache أو php-fpm، ولا يمكن استخدام مشروع workerman/redis-queue، يمكنك مشاهدة الوظيفة التالية لإرسال الرسائل:
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // قبل الإصدار 1.0.5 كانت redis-queue-waiting
$queue_delay = '{redis-queue}-delayed'; // قبل الإصدار 1.0.5 كانت redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
حيث أن المعامل $redis هو مثيل Redis. على سبيل المثال، استخدام ملحق Redis سيكون مشابهًا لما يلي:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);