Pool Conexión Pool

Varios corrutinas compartiendo la misma conexión pueden causar confusión de datos, por lo que es necesario usar un pool de conexiones para gestionar recursos de conexión como bases de datos, redis, etc.

Nota
Esta característica requiere workerman>=5.1.0

Atención

  • Soporte automático para Swoole/Swow/Fiber/Select/Event en el nivel inferior.
  • Al usar Fiber/Select/Event, si se utilizan extensiones bloqueantes como PDO redis, se degradará automáticamente a un pool de conexiones con solo una conexión.

Pool de Conexiones Redis

<?php
use Workerman\Connection\TcpConnection;
use Workerman\Coroutine\Pool;
use Workerman\Events\Swoole;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

class RedisPool
{

    private Pool $pool;

    public function __construct($host, $port, $max_connections = 10)
    {
        $this->pool = new Pool($max_connections);
        // Establecer el método de creación de conexiones del pool
        $this->pool->setConnectionCreator(function () use ($host, $port) {
            $redis = new \Redis();
            $redis->connect($host, $port);
            return $redis;
        });
        // Establecer el método de destrucción de conexiones del pool
        $this->pool->setConnectionCloser(function ($redis) {
            $redis->close();
        });
        // Establecer método de comprobación del heartbeat
        $this->pool->setHeartbeatChecker(function ($redis) {
            $redis->ping();
        });
    }

    // Obtener conexión
    public function get(): \Redis
    {
        return $this->pool->get();
    }

    // Devolver conexión
    public function put($redis): void
    {
        $this->pool->put($redis);
    }
}

// Servidor Http
$worker = new Worker('http://0.0.0.0:8001');
$worker->eventLoop = Swoole::class; // O Swow::class o Fiber::class
$worker->onMessage = function (TcpConnection $connection, Request $request) {
    static $pool;
    if (!$pool) {
        $pool = new RedisPool('127.0.0.1', 6379, 10);
    }
    $redis = $pool->get();
    $redis->set('key', 'hello');
    $value = $redis->get('key');
    $pool->put($redis);
    $connection->send($value);
};

Worker::runAll();

Pool de Conexiones MySQL (soporta obtención y devolución de conexiones automáticamente)

<?php
use Workerman\Connection\TcpConnection;
use Workerman\Coroutine\Context;
use Workerman\Coroutine;
use Workerman\Coroutine\Pool;
use Workerman\Events\Swoole;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';

class Db
{
    private static ?Pool $pool = null;

    public static function __callStatic($name, $arguments)
    {
        if (self::$pool === null) {
            self::initializePool();
        }
        // Obtener conexión del contexto de corrutina, asegurando que la misma corrutina use la misma conexión
        $pdo = Context::get('pdo');
        if (!$pdo) {
            // Obtener conexión del pool
            $pdo = self::$pool->get();
            Context::set('pdo', $pdo);
            // Al finalizar la corrutina, devolver conexión automáticamente
            Coroutine::defer(function () use ($pdo) {
                self::$pool->put($pdo);
            });
        }
        return call_user_func_array([$pdo, $name], $arguments);
    }

    private static function initializePool(): void
    {
        self::$pool = new Pool(10);
        self::$pool->setConnectionCreator(function () {
            return new \PDO('mysql:host=127.0.0.1;dbname=your_database', 'your_username', 'your_password');
        });
        self::$pool->setConnectionCloser(function ($pdo) {
            $pdo = null;
        });
        self::$pool->setHeartbeatChecker(function ($pdo) {
            $pdo->query('SELECT 1');
        });
    }

}

// Servidor Http
$worker = new Worker('http://0.0.0.0:8001');
$worker->eventLoop = Swoole::class; // O Swow::class o Fiber::class
$worker->onMessage = function (TcpConnection $connection, Request $request) {
    $value = Db::query('SELECT NOW() as now')->fetchAll();
    $connection->send(json_encode($value));
};

Worker::runAll();

Descripción de la Interfaz

interface PoolInterface
{

    /**
     * Constructor
     * @param int $max_connections Número máximo de conexiones, por defecto 1
     * @param array $config = [
     *    'min_connections' => 1, // Número mínimo de conexiones, por defecto 1
     *    'idle_timeout' => 60, // Tiempo de espera de conexión inactiva (segundos), por defecto 60 segundos, después de 60 segundos la conexión se destruya y se elimina del pool
     *    'heartbeat_interval => 50, // Intervalo de comprobación de heartbeat (segundos), por defecto 50 segundos, cada 50 segundos se comprobará si la conexión está activa
     *    'wait_timeout' => 10, // Tiempo de espera para obtener conexión (segundos), por defecto 10 segundos, si no se obtiene conexión después de 10 segundos se lanzará una excepción
     * ] 
     */
    public function __construct(int $max_connections = 1, array $config = []);

    /**
     * Obtener una conexión
     */
    public function get(): mixed;

    /**
     * Devolver una conexión
     */
    public function put(object $connection): void;

    /**
     * Crear una conexión
     */
    public function createConnection(): object;

    /**
     * Cerrar conexión y eliminarla del pool
     */
    public function closeConnection(object $connection): void;

    /**
     * Obtener el número actual de conexiones en el pool (incluyendo conexiones en uso y no utilizadas)
     */
    public function getConnectionCount(): int;

    /**
     * Cerrar conexiones en el pool (sin incluir conexiones en uso)
     */
    public function closeConnections(): void;

    /**
     * Establecer el método para crear conexiones en el pool
     */
    public function setConnectionCreator(callable $connectionCreateHandler): self;

    /**
     * Establecer el método para destruir conexiones en el pool
     */
    public function setConnectionCloser(callable $connectionDestroyHandler): self;

    /**
     * Establecer el método para comprobar el heartbeat
     */
    public function setHeartbeatChecker(callable $connectionHeartbeatHandler): self;

}