Pool Connexion

Plusieurs coroutines partageant la même connexion peuvent entraîner une confusion des données, il est donc nécessaire d'utiliser un pool de connexions pour gérer les ressources de connexion à la base de données, redis, etc.

Astuce
Cette fonctionnalité nécessite workerman>=5.1.0

Remarques

  • Le support des drivers Swoole/Swow/Fiber/Select/Event est automatique au niveau inférieur
  • Lors de l'utilisation des drivers Fiber/Select/Event, si PDO redis ou d'autres extensions bloquantes sont utilisées, cela se dégradera automatiquement en un pool de connexions avec une seule connexion

Pool de connexions Redis

<?php
use Workerman\Connection\TcpConnection;
use Workerman\Coroutine\Pool;
use Workerman\Events\Swoole;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

class RedisPool
{

    private Pool $pool;

    public function __construct($host, $port, $max_connections = 10)
    {
        $this->pool = new Pool($max_connections);
        // Définir la méthode de création de connexion du pool
        $this->pool->setConnectionCreator(function () use ($host, $port) {
            $redis = new \Redis();
            $redis->connect($host, $port);
            return $redis;
        });
        // Définir la méthode de destruction de connexion du pool
        $this->pool->setConnectionCloser(function ($redis) {
            $redis->close();
        });
        // Définir la méthode de vérification de l'état
        $this->pool->setHeartbeatChecker(function ($redis) {
            $redis->ping();
        });
    }

    // Obtenir une connexion
    public function get(): \Redis
    {
        return $this->pool->get();
    }

    // Rendre la connexion
    public function put($redis): void
    {
        $this->pool->put($redis);
    }
}

// Serveur Http
$worker = new Worker('http://0.0.0.0:8001');
$worker->eventLoop = Swoole::class; // Ou Swow::class ou Fiber::class
$worker->onMessage = function (TcpConnection $connection, Request $request) {
    static $pool;
    if (!$pool) {
        $pool = new RedisPool('127.0.0.1', 6379, 10);
    }
    $redis = $pool->get();
    $redis->set('key', 'hello');
    $value = $redis->get('key');
    $pool->put($redis);
    $connection->send($value);
};

Worker::runAll();

Pool de connexions MySQL (supporte l'obtention et la restitution automatiques des connexions)

<?php
use Workerman\Connection\TcpConnection;
use Workerman\Coroutine\Context;
use Workerman\Coroutine;
use Workerman\Coroutine\Pool;
use Workerman\Events\Swoole;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

class Db
{
    private static ?Pool $pool = null;

    public static function __callStatic($name, $arguments)
    {
        if (self::$pool === null) {
            self::initializePool();
        }
        // Obtenir la connexion depuis le contexte de coroutine, garantissant qu'une même coroutine utilise la même connexion
        $pdo = Context::get('pdo');
        if (!$pdo) {
            // Obtenir une connexion depuis le pool de connexions
            $pdo = self::$pool->get();
            Context::set('pdo', $pdo);
            // Lorsque la coroutine se termine, la connexion est automatiquement restituée
            Coroutine::defer(function () use ($pdo) {
                self::$pool->put($pdo);
            });
        }
        return call_user_func_array([$pdo, $name], $arguments);
    }

    private static function initializePool(): void
    {
        self::$pool = new Pool(10);
        self::$pool->setConnectionCreator(function () {
            return new \PDO('mysql:host=127.0.0.1;dbname=your_database', 'your_username', 'your_password');
        });
        self::$pool->setConnectionCloser(function ($pdo) {
            $pdo = null;
        });
        self::$pool->setHeartbeatChecker(function ($pdo) {
            $pdo->query('SELECT 1');
        });
    }

}

// Serveur Http
$worker = new Worker('http://0.0.0.0:8001');
$worker->eventLoop = Swoole::class; // Ou Swow::class ou Fiber::class
$worker->onMessage = function (TcpConnection $connection, Request $request) {
    $value = Db::query('SELECT NOW() as now')->fetchAll();
    $connection->send(json_encode($value));
};

Worker::runAll();

Description de l'interface

interface PoolInterface
{

    /**
     * Constructeur
     * @param int $max_connections Nombre maximum de connexions, par défaut 1
     * @param array $config = [
     *    'min_connections' => 1, // Nombre minimum de connexions, par défaut 1
     *    'idle_timeout' => 60, // Temps d'attente inactif d'une connexion (secondes), par défaut 60 secondes, après 60 secondes, la connexion sera détruite et retirée du pool
     *    'heartbeat_interval' => 50, // Intervalle de vérification de l'état (secondes), par défaut 50 secondes, vérifiera chaque 50 secondes si la connexion est normale
     *    'wait_timeout' => 10, // Temps d'attente pour obtenir une connexion (secondes), par défaut 10 secondes, après 10 secondes, si l'obtention de la connexion échoue, une exception sera levée
     * ] 
     */
    public function __construct(int $max_connections = 1, array $config = []);

    /**
     * Obtenir une connexion
     */
    public function get(): mixed;

    /**
     * Rendre une connexion
     */
    public function put(object $connection): void;

    /**
     * Créer une connexion
     */
    public function createConnection(): object;

    /**
     * Fermer la connexion et la retirer du pool
     */
    public function closeConnection(object $connection): void;

    /**
     * Obtenir le nombre courant de connexions dans le pool (y compris les connexions en cours d'utilisation et celles non utilisées)
     */
    public function getConnectionCount(): int;

    /**
     * Fermer les connexions dans le pool (non compris les connexions actuellement utilisées)
     */
    public function closeConnections(): void;

    /**
     * Définir la méthode de création de connexion du pool
     */
    public function setConnectionCreator(callable $connectionCreateHandler): self;

    /**
     * Définir la méthode de destruction de connexion du pool
     */
    public function setConnectionCloser(callable $connectionDestroyHandler): self;

    /**
     * Définir la méthode de vérification de l'état
     */
    public function setHeartbeatChecker(callable $connectionHeartbeatHandler): self;

}