কিভাবে অ্যাসিঙ্ক্রোনাস টাস্ক বাস্তবায়ন করবেন

প্রশ্ন:

কিভাবে ভারী ব্যবসাগুলি অ্যাসিঙ্ক্রোনাসভাবে প্রক্রিয়া করতে হয়, প্রধান ব্যবসা দীর্ঘ সময়ের জন্য ব্লক হওয়া এড়ানোর জন্য। উদাহরণস্বরূপ, আমাকে 1000 ব্যবহারকারীকে ইমেল পাঠাতে হবে, এই প্রক্রিয়াটি খুব ধীর হতে পারে, কিছু সেকেন্ড ব্লক হতে পারে, এই সময়ে প্রধান প্রবাহ ব্লক হওয়ার কারণে পরবর্তী অনুরোধগুলিতে প্রভাব ফেলবে, কিভাবে এই ধরনের ভারী টাস্ক অন্য প্রসেসকে অ্যাসিঙ্ক্রোনাস প্রক্রিয়া করতে দেওয়া যায়।

উত্তর:

স্থানীয়ভাবে বা অন্য সার্ভার বা এমনকি সার্ভার ক্লাস্টারে কিছু টাস্ক প্রসেস আগে থেকেই প্রতিষ্ঠিত করা যেতে পারে ভারী ব্যবসাগুলি প্রক্রিয়া করতে, টাস্ক প্রসেসের সংখ্যা প্রয়োজন অনুযায়ী কিছুটা বেশি খোলা যেতে পারে, যেমন CPU এর 10 গুণ, তারপর কলার AsyncTcpConnection ব্যবহার করে এই টাস্ক প্রসেসগুলিতে ডেটা অ্যাসিঙ্ক্রোনাসভাবে পাঠাবে প্রক্রিয়া করতে এবং অ্যাসিঙ্ক্রোনাসভাবে প্রক্রিয়া ফলাফল পাওয়া যাবে।

টাস্ক প্রসেস সার্ভার

use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// টাস্ক কর্মী, Text প্রোটোকল ব্যবহার করছে
$task_worker = new Worker('Text://0.0.0.0:12345');
// টাস্ক প্রসেসের সংখ্যা প্রয়োজন অনুযায়ী কিছুটা বেশি খোলা যেতে পারে
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
     // ধরুন এটি json ডেটা পাঠানো হয়েছে
     $task_data = json_decode($task_data, true);
     // task_data অনুযায়ী সংশ্লিষ্ট টাস্ক লজিক প্রক্রিয়া করুন.... ফলাফল পেতে, এখানে বাদ দেওয়া.....
     $task_result = ......
     // ফলাফল পাঠান
     $connection->send(json_encode($task_result));
};
Worker::runAll();

workerman এ কল করা হচ্ছে

use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';

// websocket সার্ভার
$worker = new Worker('websocket://0.0.0.0:8080');

$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
    // দূরবর্তী টাস্ক পরিষেবার সাথে অ্যাসিঙ্ক্রোনাস সংযোগ স্থাপন করুন, IP দূরবর্তী টাস্ক পরিষেবার IP হবে, যদি এটি স্থানীয় হয় তবে 127.0.0.1, যদি ক্লাস্টার হয় তবে LVS এর IP
    $task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
    // টাস্ক এবং প্যারামিটার ডেটা
    $task_data = array(
        'function' => 'send_mail',
        'args'       => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
    );
    // ডেটা পাঠান
    $task_connection->send(json_encode($task_data));
    // অ্যাসিঙ্ক্রোনাসভাবে ফলাফল পান
    $task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
    {
         // ফলাফল
         var_dump($task_result);
         // ফলাফল পাওয়ার পরে অ্যাসিঙ্ক্রোনাস সংযোগ বন্ধ করতে মনে রাখবেন
         $task_connection->close();
         // সংশ্লিষ্ট websocket ক্লায়েন্টকে কাজ সম্পন্ন হয়েছে জানিয়ে দিন
         $ws_connection->send('task complete');
    };
    // অ্যাসিঙ্ক্রোনাস সংযোগ কার্যকর করুন
    $task_connection->connect();
};

Worker::runAll();

এইভাবে, ভারী টাস্কগুলি স্থানীয় বা অন্যান্য সার্ভারের প্রসেসের জন্য প্রক্রিয়া করার জন্য রাখা হয়েছে, টাস্ক সম্পন্ন হলে ফলাফল অ্যাসিঙ্ক্রোনাসভাবে গৃহীত হবে, ব্যবসায়িক প্রক্রিয়া ব্লক হবে না।