workerman/redis-queue

Redis ভিত্তিক মেসেজ কিউ, মেসেজের দেরিতে প্রক্রিয়া সমর্থন করে।

প্রকল্প ঠিকানা:

https://github.com/walkor/redis-queue

ইনস্টল:

composer require workerman/redis-queue

উদাহরণ

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // শুধুমাত্র সুবিধার জন্য
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // শুধুমাত্র সুবিধার জন্য
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // খরচ ব্যর্থ হলে উদ্দীপ্ত কলব্যাক(ঐচ্ছিক)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "কিউ " . $package['queue'] . " খরচ ব্যর্থ\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // কিউতে সময়সূচি অনুযায়ী বার্তা পাঠানো
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

ইনস্ট্যান্স তৈরি করা

  • $address এরূপ redis://ip:6379 রূপে হতে হবে, অবশ্যই redis দিয়ে শুরু করতে হবে।

  • $options নিম্নলিখিত বিকল্প অন্তর্ভুক্ত করে:

    • auth: প্রমাণীকরণ তথ্য, ডিফল্ট ''
    • db: db, ডিফল্ট 0
    • max_attempts: খরচ ব্যর্থ হলে পুনরায় চেষ্টা সংখ্যা, ডিফল্ট 5
    • retry_seconds: পুনরায় চেষ্টা করার সময়ের অন্তর, সেকেন্ডে। ডিফল্ট 5

খরচ ব্যর্থ হওয়া মানে ব্যবসা Exception অথবা Error ত্রুটি ছোঁড়ে। খরচ ব্যর্থ হলে বার্তা দেরিতে কিউতে পুনরায় চেষ্টা করার জন্য রাখা হয়, পুনরায় চেষ্টা সংখ্যা max_attempts দ্বারা নিয়ন্ত্রণ করা হয়, পুনরায় চেষ্টা সময়সীমা retry_seconds এবং max_attempts উভয় দ্বারা নিয়ন্ত্রণ করা হয়। উদাহরণস্বরূপ, যদি max_attempts 5 হয় এবং retry_seconds 10 হয়, তাহলে প্রথমবারের পুনরায় চেষ্টা সময়সীমা 1*10 সেকেন্ড হবে, দ্বিতীয়বারের পুনরায় চেষ্টা সময়সীমা 2*10 সেকেন্ড হবে, তৃতীয়বারের পুনরায় চেষ্টা সময়সীমা 3*10 সেকেন্ড হবে, এভাবে পরবর্তী 5 বার চেষ্টা করার জন্য। যদি max_attempts নির্ধারিত পুনরায় চেষ্টা সংখ্যা ছাড়িয়ে যায়, তাহলে বার্তাটি failed কিউতে {redis-queue}-failed (1.0.5 সংস্করণ পূর্বে redis-queue-failed) রাখা হবে।


send(String $queue, Mixed $data, [int $dely=0])

কিউতে একটি বার্তা পাঠান

  • $queue কিউয়ের নাম, String ধরনের
  • $data প্রকাশিত নির্দিষ্ট বার্তা, এটি গুচ্ছ বা স্ট্রিং হতে পারে, Mixed ধরনের
  • $dely দেরিতে খরচ করার সময়, সেকেন্ডে, ডিফল্ট 0, Int ধরনের

subscribe(mixed $queue, callable $callback)

একটি বা একাধিক কিউতে সাবস্ক্রিপশন নিবন্ধন করুন

  • $queue কিউয়ের নাম, এটি স্ট্রিং বা একাধিক কিউয়ের নামের গুচ্ছ হতে পারে
  • $callback কলব্যাক ফাংশন, নিচের রূপে function (Mixed $data) যেখানে $data হলো send($queue, $data) এর $data

unsubscribe(mixed $queue)

সাবস্ক্রিপশন বাতিল করুন

  • $queue কিউয়ের নাম বা একাধিক কিউয়ের নামের গুচ্ছ

onConsumeFailure(callable $callback)

খরচ ব্যর্থ হলে উদ্দীপ্ত হয়

  • $callback - function (\Throwable $exception, array $package), $package কিউয়ের অভ্যন্তরীণ তথ্য কাঠামো, এতে data queue attempts max_attempts সহ তথ্য থাকে

অভ্যন্তরীণ তথ্য কাঠামো $package এর মান পরিবর্তন সমর্থন করে, আপনাকে শুধুমাত্র পরিবর্তিত $package রিটার্ন করতে হবে। উদাহরণস্বরূপ, যখন কোনো বার্তা একটি নির্দিষ্ট ত্রুটি সৃষ্টি করে এবং পুনরায় চেষ্টা না করতে চাচ্ছেন, কোডের মতো

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "ব্যর্থ কিউয়ের নাম:" . $package['queue'] . "\n";
    // যখন কিউতে একটি নির্দিষ্ট মারাত্মক ত্রুটি ঘটে
    if ($exception->getMessage() === 'Some Fatal error') {
        // এই বার্তার সর্বাধিক পুনরায় চেষ্টা সংখ্যা 0 এ সেট করা
        $package['max_attempts'] = 0;
    }
    // পরিবর্তিত `$package` তথ্য কাঠামো ফেরত দিন
    return $package;
});

অ-ওয়ার্কারম্যান পরিবেশের জন্য কিউতে বার্তা পাঠানো

কখনও কখনও কিছু প্রকল্প apache অথবা php-fpm পরিবেশে চলে, যা workerman/redis-queue প্রকল্প ব্যবহার করতে পারে না, তখন নিচের ফাংশনটি বার্তা পাঠানোর জন্য ব্যবহার করতে পারেন

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; //1.0.5 সংস্করণ পূর্বে redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed';//1.0.5 সংস্করণ পূর্বে redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

এখানে প্যারামিটার $redis হলো Redis ইনস্ট্যান্স। উদাহরণস্বরূপ, Redis এক্সটেনশনের ব্যবহার সম্ভবতে নিচের মতো:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);