workerman/redis-queue
Redis tabanlı mesaj kuyruğu, mesaj gecikmeli işleme desteği sunar.
Proje Adresi:
https://github.com/walkor/redis-queue
Kurulum:
composer require workerman/redis-queue
Örnek
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// Abone ol
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// Abone ol
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// Tüketim hatası durumunda tetiklenecek geri çağırma (isteğe bağlı)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Kuyruk " . $package['queue'] . " tüketimi başarısız oldu\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// Kuyruğa mesaj göndermek için zamanlayıcı
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
Örnek oluştur
-
$addressredis://ip:6379benzeri olmalı, redis ile başlamalıdır. -
$optionsaşağıdaki seçenekleri içerir:auth: Kimlik doğrulama bilgisi, varsayılan ''db: db, varsayılan 0max_attempts: Tüketim hatası sonrası yeniden deneme sayısı, varsayılan 5retry_seconds: Yeniden deneme süre aralığı, saniye cinsinden. Varsayılan 5
Tüketim hatası, iş süreçlerinin
ExceptionveyaErroratması anlamına gelir. Tüketim hatası sonrası mesaj gecikmeli kuyruğa alınır ve yeniden deneme için bekler, yeniden deneme sayısımax_attemptsile kontrol edilir, yeniden deneme aralığı iseretry_secondsvemax_attemptsile birlikte kontrol edilir. Örneğin,max_attempts5 veretry_seconds10 ise, 1. yeniden deneme aralığı1*10saniye, 2. yeniden deneme aralığı2*10saniye, 3. yeniden deneme aralığı3*10saniye şeklinde devam eder. Eğermax_attemptssınırını aşarsa, mesaj{redis-queue}-failed(1.0.5 sürümünden önceredis-queue-failed) isimli başarısız kuyrukta yer alacaktır.
send(String $queue, Mixed $data, [int $dely=0])
Kuyruğa bir mesaj gönder
$queueKuyruk adı,Stringtüründe$dataYayınlanan spesifik mesaj, dizi veya string olabilir,Mixedtüründe$delyGecikmeli tüketim süresi, saniye cinsinden, varsayılan 0,Inttüründe
subscribe(mixed $queue, callable $callback)
Bir veya birden fazla kuyruğa abone ol
$queueKuyruk adı, string veya birden fazla kuyruk adı içeren bir dizi olabilir$callbackGeri çağırma fonksiyonu, formatıfunction (Mixed $data)şeklindedir, burada$datasend($queue, $data)içindeki$datadır.
unsubscribe(mixed $queue)
Aboneliği iptal et
$queueKuyruk adı veya birden fazla kuyruk adı içeren bir dizi
onConsumeFailure(callable $callback)
Tüketim hatası durumunda tetiklenir
$callback-function (\Throwable $exception, array $package),$packagekuyruk içindeki veri yapısını temsil eder,data,queue,attempts,max_attemptsgibi bilgileri içerir.
İç ver yapıdaki $package değerini değiştirmek mümkündür, sadece değişiklik sonrası $package i return etmek yeterlidir. Örneğin, belirli bir mesajda bir hata oluştuğunda yeniden denemek istemiyorsanız, kodlar şu şekilde olabilir:
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "Başarısız kuyruk adı:" . $package['queue'] . "\n";
// Kuyrukta bir tür kritik hata oluştuğunda
if ($exception->getMessage() === 'Bazı Kritik hata') {
// Bu mesajın maksimum yeniden deneme sayısını 0 olarak ayarlayın
$package['max_attempts'] = 0;
}
// Değiştirilen `$package` veri yapısını return edin
return $package;
});
Workerman ortamı dışında kuyruğa mesaj gönderme
Bazen bazı projeler apache veya php-fpm ortamında çalıştığından workerman/redis-queue projesini kullanamazlar, aşağıdaki fonksiyonu kullanarak mesaj göndermeyi gerçekleştirebilirsiniz.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; //1.0.5 sürümünden önce redis-queue-waiting
$queue_delay = '{redis-queue}-delayed';//1.0.5 sürümünden önce redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
Burada $redis parametresi Redis örneğidir. Örneğin, Redis uzantısının kullanımı şu şekildedir:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);