workerman/redis-queue

Redis पर आधारित संदेश कतार, संदेश विलंबित प्रसंस्करण का समर्थन करता है।

परियोजना का पता:

https://github.com/walkor/redis-queue

स्थापना:

composer require workerman/redis-queue

उदाहरण

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // सदस्यता लेना
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // सदस्यता लेना
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // उपभोक्ता विफलता पर कॉल बैक (वैकल्पिक)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "कतार " . $package['queue'] . " उपभोक्ता विफलता\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // नियमित रूप से कतार में संदेश भेजना
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

उदाहरण बनाना

  • $address redis://ip:6379 के समान, इसे redis से शुरू होना चाहिए।

  • $options निम्नलिखित विकल्पों को शामिल करता है:

    • auth: प्रमाणीकरण जानकारी, डिफ़ॉल्ट ''
    • db: db, डिफ़ॉल्ट 0
    • max_attempts: उपभोक्ता विफलता के बाद पुनर्प्रयत्न次数, डिफ़ॉल्ट 5
    • retry_seconds: पुनर्प्रयत्न का समय अंतराल, सेकंड में। डिफ़ॉल्ट 5

उपभोक्ता विफलता का मतलब है कि व्यावसायिक अपवाद Exception या Error फेंका गया। उपभोक्ता विफलता के बाद संदेश विलंबित कतार में पुनः प्रयास के लिए रखा जाएगा, पुनः प्रयास次数 max_attempts द्वारा नियंत्रित किया जाएगा, और पुनर्प्रयत्न का अंतराल retry_seconds और max_attempts द्वारा संयुक्त रूप से नियंत्रित किया जाएगा। जैसे कि max_attempts 5 है, retry_seconds 10 है, पहले पुनर्प्रयत्न का अंतराल 1*10 सेकंड होगा, दूसरे पुनर्प्रयत्न का समय अंतराल 2*10 सेकंड होगा, तीसरे पुनर्प्रयत्न का समय अंतराल 3*10 सेकंड होगा, इस प्रकार जब तक 5 बार पुनर्प्रयत्न नहीं किया जाता। यदि max_attempts सेट किए गए पुनर्प्रयत्न次数 से अधिक हो जाता है, तो संदेश को कुंजी {redis-queue}-failed (1.0.5 संस्करण से पहले redis-queue-failed) के विफलता कतार में रखा जाता है।


send(String $queue, Mixed $data, [int $dely=0])

कतार में एक संदेश भेजें

  • $queue कतार का नाम, String प्रकार
  • $data प्रकाशित विशेष संदेश हो सकता है, जो या तो एक संग्रह या स्ट्रिंग हो, Mixed प्रकार
  • $dely विलंबित उपभोक्ता समय, सेकंड में, डिफ़ॉल्ट 0, Int प्रकार

subscribe(mixed $queue, callable $callback)

एक या अधिक कतारों की सदस्यता लें

  • $queue कतार का नाम, यह स्ट्रिंग या कई कतार नामों के संग्रह हो सकता है
  • $callback कॉल बैक फ़ंक्शन, स्वरूप function (Mixed $data) है, जहाँ $data send($queue, $data) में $data है।

unsubscribe(mixed $queue)

सदस्यता समाप्त करें

  • $queue कतार का नाम या कई कतार नामों के संग्रह

onConsumeFailure(callable $callback)

उपभोक्ता विफलता पर ट्रिगर होता है

  • $callback - function (\Throwable $exception, array $package), $package कतार का आंतरिक डेटा संरचना है, जिसमें data queue attempts max_attempts जैसी जानकारी शामिल है।

आंतरिक डेटा संरचना $package के मानों को बदलने का समर्थन करता है, बस बदले हुए $package को return करना होगा। उदाहरण के लिए, जब किसी संदेश में कोई विशेष त्रुटि होती है और पुनः प्रयास नहीं करना चाहता है, तो कोड इस तरह होगा

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "विफलता कतार का नाम:" . $package['queue'] . "\n";
    // जब कतार में कोई गंभीर त्रुटि होती है
    if ($exception->getMessage() === 'Some Fatal error') {
        // इस संदेश के लिए अधिकतम पुनर्प्रयत्न次数 को 0 पर सेट करें
        $package['max_attempts'] = 0;
    }
    // संशोधित `$package` डेटा संरचना वापस करें
    return $package;
});

गैर-workerman वातावरण में कतार में संदेश भेजना

कभी-कभी, कुछ प्रोजेक्ट्स apache या php-fpm वातावरण में चलते हैं, जिससे workerman/redis-queue प्रोजेक्ट का उपयोग नहीं कर सकते हैं, नीचे दिए गए फ़ंक्शन को संदर्भित करके भेजा जा सकता है

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; //1.0.5 संस्करण से पहले redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed';//1.0.5 संस्करण से पहले redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

यहाँ, पैरामीटर $redis redis उदाहरण है। उदाहरण के लिए redis विस्तारित उपयोग इस प्रकार होगा:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);