workerman/redis-queue

Redis tabanlı mesaj kuyruğu, mesaj gecikmeli işleme desteği sunar.

Proje Adresi:

https://github.com/walkor/redis-queue

Kurulum:

composer require workerman/redis-queue

Örnek

<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';

$worker = new Worker();
$worker->onWorkerStart = function () {
    $client = new Client('redis://127.0.0.1:6379');
    // Abone ol
    $client->subscribe('user-1', function($data){
        echo "user-1\n";
        var_export($data);
    });
    // Abone ol
    $client->subscribe('user-2', function($data){
        echo "user-2\n";
        var_export($data);
    });
    // Tüketim hatası durumunda tetiklenecek geri çağırma (isteğe bağlı)
    $client->onConsumeFailure(function (\Throwable $exception, $package) {
        echo "Kuyruk " . $package['queue'] . " tüketimi başarısız oldu\n";
        echo $exception->getMessage(), "\n";
        var_export($package);
    });
    // Kuyruğa mesaj göndermek için zamanlayıcı
    Timer::add(1, function()use($client){
        $client->send('user-1', ['some', 'data']);
    });
};

Worker::runAll();

API


__construct (string $address, [array $options])

Örnek oluştur

  • $address redis://ip:6379 benzeri olmalı, redis ile başlamalıdır.

  • $options aşağıdaki seçenekleri içerir:

    • auth: Kimlik doğrulama bilgisi, varsayılan ''
    • db: db, varsayılan 0
    • max_attempts: Tüketim hatası sonrası yeniden deneme sayısı, varsayılan 5
    • retry_seconds: Yeniden deneme süre aralığı, saniye cinsinden. Varsayılan 5

Tüketim hatası, iş süreçlerinin Exception veya Error atması anlamına gelir. Tüketim hatası sonrası mesaj gecikmeli kuyruğa alınır ve yeniden deneme için bekler, yeniden deneme sayısı max_attempts ile kontrol edilir, yeniden deneme aralığı ise retry_seconds ve max_attempts ile birlikte kontrol edilir. Örneğin, max_attempts 5 ve retry_seconds 10 ise, 1. yeniden deneme aralığı 1*10 saniye, 2. yeniden deneme aralığı 2*10 saniye, 3. yeniden deneme aralığı 3*10 saniye şeklinde devam eder. Eğer max_attempts sınırını aşarsa, mesaj {redis-queue}-failed (1.0.5 sürümünden önce redis-queue-failed) isimli başarısız kuyrukta yer alacaktır.


send(String $queue, Mixed $data, [int $dely=0])

Kuyruğa bir mesaj gönder

  • $queue Kuyruk adı, String türünde
  • $data Yayınlanan spesifik mesaj, dizi veya string olabilir, Mixed türünde
  • $dely Gecikmeli tüketim süresi, saniye cinsinden, varsayılan 0, Int türünde

subscribe(mixed $queue, callable $callback)

Bir veya birden fazla kuyruğa abone ol

  • $queue Kuyruk adı, string veya birden fazla kuyruk adı içeren bir dizi olabilir
  • $callback Geri çağırma fonksiyonu, formatı function (Mixed $data) şeklindedir, burada $data send($queue, $data) içindeki $datadır.

unsubscribe(mixed $queue)

Aboneliği iptal et

  • $queue Kuyruk adı veya birden fazla kuyruk adı içeren bir dizi

onConsumeFailure(callable $callback)

Tüketim hatası durumunda tetiklenir

  • $callback - function (\Throwable $exception, array $package), $package kuyruk içindeki veri yapısını temsil eder, data, queue, attempts, max_attempts gibi bilgileri içerir.

İç ver yapıdaki $package değerini değiştirmek mümkündür, sadece değişiklik sonrası $package i return etmek yeterlidir. Örneğin, belirli bir mesajda bir hata oluştuğunda yeniden denemek istemiyorsanız, kodlar şu şekilde olabilir:

$client->onConsumeFailure(function (\Throwable $exception, $package) {
    echo "Başarısız kuyruk adı:" . $package['queue'] . "\n";
    // Kuyrukta bir tür kritik hata oluştuğunda
    if ($exception->getMessage() === 'Bazı Kritik hata') {
        // Bu mesajın maksimum yeniden deneme sayısını 0 olarak ayarlayın
        $package['max_attempts'] = 0;
    }
    // Değiştirilen `$package` veri yapısını return edin
    return $package;
});

Workerman ortamı dışında kuyruğa mesaj gönderme

Bazen bazı projeler apache veya php-fpm ortamında çalıştığından workerman/redis-queue projesini kullanamazlar, aşağıdaki fonksiyonu kullanarak mesaj göndermeyi gerçekleştirebilirsiniz.

function redis_queue_send($redis, $queue, $data, $delay = 0) {
    $queue_waiting = '{redis-queue}-waiting'; //1.0.5 sürümünden önce redis-queue-waiting
    $queue_delay = '{redis-queue}-delayed';//1.0.5 sürümünden önce redis-queue-delayed

    $now = time();
    $package_str = json_encode([
        'id'       => rand(),
        'time'     => $now,
        'delay'    => $delay,
        'attempts' => 0,
        'queue'    => $queue,
        'data'     => $data
    ]);
    if ($delay) {
        return $redis->zAdd($queue_delay, $now + $delay, $package_str);
    }
    return $redis->lPush($queue_waiting.$queue, $package_str);
}

Burada $redis parametresi Redis örneğidir. Örneğin, Redis uzantısının kullanımı şu şekildedir:

$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);