Pool kết nối

Nhiều coroutine chia sẻ cùng một kết nối có thể dẫn đến dữ liệu bị rối loạn, vì vậy cần sử dụng pool kết nối để quản lý tài nguyên kết nối với cơ sở dữ liệu, redis, v.v.

Mẹo
Tính năng này yêu cầu workerman>=5.1.0

Lưu ý

  • Tầng dưới tự động hỗ trợ Swoole/Swow/Fiber/Select/Event
  • Khi sử dụng Fiber/Select/Event, nếu sử dụng PDO, redis, v.v. là mở rộng chặn, thì tự động suy giảm thành pool kết nối chỉ có một kết nối

Pool kết nối Redis

<?php
use Workerman\Connection\TcpConnection;
use Workerman\Coroutine\Pool;
use Workerman\Events\Swoole;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

class RedisPool
{

    private Pool $pool;

    public function __construct($host, $port, $max_connections = 10)
    {
        $this->pool = new Pool($max_connections);
        // Thiết lập phương thức tạo kết nối cho pool kết nối
        $this->pool->setConnectionCreator(function () use ($host, $port) {
            $redis = new \Redis();
            $redis->connect($host, $port);
            return $redis;
        });
        // Thiết lập phương thức tiêu huỷ kết nối cho pool kết nối
        $this->pool->setConnectionCloser(function ($redis) {
            $redis->close();
        });
        // Thiết lập phương thức kiểm tra heartbeat
        $this->pool->setHeartbeatChecker(function ($redis) {
            $redis->ping();
        });
    }

    // Lấy kết nối
    public function get(): \Redis
    {
        return $this->pool->get();
    }

    // Trả kết nối
    public function put($redis): void
    {
        $this->pool->put($redis);
    }
}

// Http Server
$worker = new Worker('http://0.0.0.0:8001');
$worker->eventLoop = Swoole::class; // Hoặc Swow::class hoặc Fiber::class
$worker->onMessage = function (TcpConnection $connection, Request $request) {
    static $pool;
    if (!$pool) {
        $pool = new RedisPool('127.0.0.1', 6379, 10);
    }
    $redis = $pool->get();
    $redis->set('key', 'hello');
    $value = $redis->get('key');
    $pool->put($redis);
    $connection->send($value);
};

Worker::runAll();

Pool kết nối MySQL (hỗ trợ tự động lấy và trả kết nối)

<?php
use Workerman\Connection\TcpConnection;
use Workerman\Coroutine\Context;
use Workerman\Coroutine;
use Workerman\Coroutine\Pool;
use Workerman\Events\Swoole;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

class Db
{
    private static ?Pool $pool = null;

    public static function __callStatic($name, $arguments)
    {
        if (self::$pool === null) {
            self::initializePool();
        }
        // Lấy kết nối từ ngữ cảnh coroutine, đảm bảo cùng một coroutine sử dụng cùng một kết nối
        $pdo = Context::get('pdo');
        if (!$pdo) {
            // Lấy kết nối từ pool kết nối
            $pdo = self::$pool->get();
            Context::set('pdo', $pdo);
            // Khi coroutine kết thúc, tự động trả kết nối
            Coroutine::defer(function () use ($pdo) {
                self::$pool->put($pdo);
            });
        }
        return call_user_func_array([$pdo, $name], $arguments);
    }

    private static function initializePool(): void
    {
        self::$pool = new Pool(10);
        self::$pool->setConnectionCreator(function () {
            return new \PDO('mysql:host=127.0.0.1;dbname=your_database', 'your_username', 'your_password');
        });
        self::$pool->setConnectionCloser(function ($pdo) {
            $pdo = null;
        });
        self::$pool->setHeartbeatChecker(function ($pdo) {
            $pdo->query('SELECT 1');
        });
    }

}

// Http Server
$worker = new Worker('http://0.0.0.0:8001');
$worker->eventLoop = Swoole::class; // Hoặc Swow::class hoặc Fiber::class
$worker->onMessage = function (TcpConnection $connection, Request $request) {
    $value = Db::query('SELECT NOW() as now')->fetchAll();
    $connection->send(json_encode($value));
};

Worker::runAll();

Ghi chú API

interface PoolInterface
{

    /**
     * Hàm khởi tạo
     * @param int $max_connections Số kết nối tối đa, mặc định là 1
     * @param array $config = [
     *    'min_connections' => 1, // Số kết nối tối thiểu, mặc định là 1
     *    'idle_timeout' => 60, // Thời gian chờ kết nối không hoạt động (giây), mặc định là 60 giây, sau 60 giây kết nối sẽ bị hủy bỏ và loại bỏ khỏi pool kết nối
     *    'heartbeat_interval' => 50, // Thời gian kiểm tra heartbeat (giây), mặc định là 50 giây, cứ mỗi 50 giây sẽ kiểm tra một lần xem kết nối có bình thường không
     *    'wait_timeout' => 10, // Thời gian chờ lấy kết nối (giây), mặc định là 10 giây, nếu quá 10 giây không lấy được kết nối sẽ ném ra ngoại lệ
     * ] 
     */
    public function __construct(int $max_connections = 1, array $config = []);

    /**
     * Lấy một kết nối
     */
    public function get(): mixed;

    /**
     * Trả lại một kết nối
     */
    public function put(object $connection): void;

    /**
     * Tạo một kết nối
     */
    public function createConnection(): object;

    /**
     * Đóng kết nối, và loại bỏ khỏi pool kết nối
     */
    public function closeConnection(object $connection): void;

    /**
     * Lấy số lượng kết nối hiện tại trong pool kết nối (bao gồm kết nối đang sử dụng và chưa sử dụng)
     */
    public function getConnectionCount(): int;

    /**
     * Đóng các kết nối trong pool kết nối (không bao gồm kết nối đang sử dụng)
     */
    public function closeConnections(): void;

    /**
     * Thiết lập phương thức tạo kết nối cho pool kết nối
     */
    public function setConnectionCreator(callable $connectionCreateHandler): self;

    /**
     * Thiết lập phương thức tiêu hủy kết nối cho pool kết nối
     */
    public function setConnectionCloser(callable $connectionDestroyHandler): self;

    /**
     * Thiết lập phương thức kiểm tra heartbeat
     */
    public function setHeartbeatChecker(callable $connectionHeartbeatHandler): self;

}