Asenkron Görevlerin Nasıl Gerçekleştirileceği

Soru:

Ağır iş yüklerini asenkron olarak nasıl işleyebilirim, ana iş akışının uzun süreli bloke olmasını nasıl önleyebilirim? Örneğin, 1000 kullanıcıya e-posta göndermek istiyorum, bu işlem oldukça yavaş, saniyelerce sürebilir ve bu süreçte ana akışın bloke olması, sonraki istekleri etkileyebilir. Bu tür ağır görevleri başka süreçlere asenkron bir şekilde nasıl devredebilirim?

Cevap:

Ağır iş yüklerini işlemek için yerel makinede veya başka bir sunucuda hatta sunucu kümesinde bazı görev süreçleri önceden oluşturabilirsiniz. Görev süreçlerinin sayısını ihtiyaca göre artırabilirsiniz, örneğin, CPU'nun 10 katı kadar. Ardından, çağrıcının AsyncTcpConnection kullanarak verileri bu görev süreçlerine asenkron olarak göndermesini sağlayabilirsiniz ve asenkron olarak işlem sonucunu alabilirsiniz.

Görev süreci sunucusu

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// görev işçisi, Text protokolünü kullanıyor
$task_worker = new Worker('Text://0.0.0.0:12345');
// görev süreçlerinin sayısını ihtiyaçlara göre artırabilirsiniz
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
     // varsayalım ki gelen veri json formatında
     $task_data = json_decode($task_data, true);
     // task_data'ya göre ilgili görev mantığını işleyin... sonucu alın, burada atlanmıştır....
     $task_result = ......
     // sonucu gönderin
     $connection->send(json_encode($task_result));
};
Worker::runAll();

Workerman'da çağrıldığında

use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// websocket servisi
$worker = new Worker('websocket://0.0.0.0:8080');

$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
    // Uzaktaki görev hizmetiyle asenkron bağlantı kurun, ip uzak görev hizmetinin ip'sidir, eğer yerel makineyse 127.0.0.1, eğer küme ise lvs'nin ip'sidir
    $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
    // Görev ve parametre verileri
    $task_data = array(
        'function' => 'send_mail',
        'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
    );
    // Verileri gönderin
    $task_connection->send(json_encode($task_data));
    // Asenkron sonucu alın
    $task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
    {
         // Sonuç
         var_dump($task_result);
         // Sonucunu aldıktan sonra, asenkron bağlantıyı kapatmayı unutmayın
         $task_connection->close();
         // İlgili websocket istemcisine görevi tamamladığını bildirin
         $ws_connection->send('task complete');
    };
    // Asenkron bağlantıyı gerçekleştirin
    $task_connection->connect();
};

Worker::runAll();

Böylece, ağır görevler, yerel makinedeki veya başka bir sunucudaki süreçlere devredilmiş olur, görev tamamlandığında asenkron olarak sonuç alınacak ve iş akışı bloke olmayacaktır.