workerman/redis-queue
Redis 기반 메시지 큐로, 메시지 지연 처리를 지원합니다.
프로젝트 주소:
https://github.com/walkor/redis-queue
설치:
composer require workerman/redis-queue
예제
<?php
use Workerman\Worker;
use Workerman\Timer;
use Workerman\RedisQueue\Client;
require_once __DIR__ . '/vendor/autoload.php';
$worker = new Worker();
$worker->onWorkerStart = function () {
$client = new Client('redis://127.0.0.1:6379');
// 구독
$client->subscribe('user-1', function($data){
echo "user-1\n";
var_export($data);
});
// 구독
$client->subscribe('user-2', function($data){
echo "user-2\n";
var_export($data);
});
// 소비 실패 시 호출되는 콜백 (선택 사항)
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "큐 " . $package['queue'] . " 소비 실패\n";
echo $exception->getMessage(), "\n";
var_export($package);
});
// 주기적으로 큐에 메시지 전송
Timer::add(1, function()use($client){
$client->send('user-1', ['some', 'data']);
});
};
Worker::runAll();
API
Client::__construct()Client::send()Client::subscribe()Client::unsubscribe()Client::onConsumeFailure()
__construct (string $address, [array $options])
인스턴스 생성
-
$addressredis://ip:6379형식이며, redis로 시작해야 합니다. -
$options옵션은 다음과 같습니다:auth: 인증 정보, 기본값은 ''db: db, 기본값은 0max_attempts: 소비 실패 시 재시도 횟수, 기본값은 5retry_seconds: 재시도 간격, 단위는 초. 기본값은 5
소비 실패는 비즈니스가
Exception또는Error를 발생시키는 것을 의미합니다. 소비 실패 후 메시지는 지연 큐에 놓여 재시도를 기다리며, 재시도 횟수는max_attempts에 의해 제어됩니다. 재시도 간격은retry_seconds와max_attempts에 의해 함께 제어됩니다. 예를 들어max_attempts가 5이고,retry_seconds가 10일 경우, 첫 번째 재시도 간격은1*10초, 두 번째 재시도 간격은2*10초, 세 번째 재시도 간격은3*10초, 이렇게 계속하여 5번까지 재시도합니다. 만약max_attempts로 설정된 재시도 횟수를 초과하면, 메시지는 키{redis-queue}-failed에 (1.0.5 버전 이전에는redis-queue-failed) 실패 큐에 저장됩니다.
send(String $queue, Mixed $data, [int $dely=0])
큐에 메시지를 전송
$queue큐의 이름,String타입$data발행되는 구체적인 메시지, 배열 또는 문자열일 수 있으며,Mixed타입$dely지연 소비 시간, 단위는 초, 기본값은 0,Int타입
subscribe(mixed $queue, callable $callback)
하나 또는 여러 개의 큐를 구독
$queue큐의 이름, 문자열 또는 여러 개의 큐 이름을 포함하는 배열일 수 있습니다.$callback콜백 함수, 형식은function (Mixed $data)이며, 여기서$data는send($queue, $data)의$data입니다.
unsubscribe(mixed $queue)
구독 취소
$queue큐의 이름 또는 여러 개의 큐 이름을 포함하는 배열
onConsumeFailure(callable $callback)
소비 실패 시 호출
$callback-function (\Throwable $exception, array $package),$package는 큐 내부 데이터 구조로data,queue,attempts,max_attempts정보가 포함됩니다.
내부 데이터 구조 $package의 값을 변경하는 것도 지원하며, 변경된 $package를 return 하면 됩니다. 예를 들어 특정 메시지가 어떤 오류를 발생시킬 때 재시도를 원하지 않는 경우, 코드는 다음과 유사합니다.
$client->onConsumeFailure(function (\Throwable $exception, $package) {
echo "실패한 큐 이름:" . $package['queue'] . "\n";
// 큐에서 특정 치명적인 오류 발생 시
if ($exception->getMessage() === 'Some Fatal error') {
// 이 메시지의 최대 재시도 횟수를 0으로 설정
$package['max_attempts'] = 0;
}
// 수정된 `$package` 데이터 구조 반환
return $package;
});
비워크맨 환경에서 큐에 메시지 보내기
어떤 프로젝트는 apache 또는 php-fpm 환경에서 실행되며, workerman/redis-queue 프로젝트를 사용할 수 없습니다. 다음 함수를 참고하여 메시지를 보내는 것을 구현할 수 있습니다.
function redis_queue_send($redis, $queue, $data, $delay = 0) {
$queue_waiting = '{redis-queue}-waiting'; // 1.0.5 버전 이전에는 redis-queue-waiting
$queue_delay = '{redis-queue}-delayed'; // 1.0.5 버전 이전에는 redis-queue-delayed
$now = time();
$package_str = json_encode([
'id' => rand(),
'time' => $now,
'delay' => $delay,
'attempts' => 0,
'queue' => $queue,
'data' => $data
]);
if ($delay) {
return $redis->zAdd($queue_delay, $now + $delay, $package_str);
}
return $redis->lPush($queue_waiting.$queue, $package_str);
}
여기서, 매개변수 $redis는 redis 인스턴스를 의미합니다. 예를 들어 redis 확장 사용법은 다음과 유사합니다:
$redis = new Redis;
$redis->connect('127.0.0.1', 6379);
$queue = 'user-1';
$data= ['some', 'data'];
redis_queue_send($redis, $queue, $data);