Pool 连接池
หลาย coroutine ใช้การเชื่อมต่อเดียวกันอาจทำให้ข้อมูลผิดพลาด ดังนั้นจึงจำเป็นต้องใช้การเชื่อมต่อพูลเพื่อจัดการแหล่งข้อมูลเชื่อมต่อเช่น ฐานข้อมูล, redis เป็นต้น
提示
คุณสมบัตินี้ต้องการ workerman>=5.1.0
注意
- สนับสนุน Swoole/Swow/Fiber/Select/Event แบบอัตโนมัติ
- เมื่อใช้ Fiber/Select/Event หากใช้ PDO redis หรือส่วนขยายแบบบล็อก อาจลดลงไปเป็นการเชื่อมต่อพูลที่มีการเชื่อมต่อเพียงหนึ่งเดียวเท่านั้น
Redis连接池
<?php
use Workerman\Connection\TcpConnection;
use Workerman\Coroutine\Pool;
use Workerman\Events\Swoole;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
class RedisPool
{
private Pool $pool;
public function __construct($host, $port, $max_connections = 10)
{
$this->pool = new Pool($max_connections);
// ตั้งค่าฟังก์ชันสร้างการเชื่อมต่อในพูล
$this->pool->setConnectionCreator(function () use ($host, $port) {
$redis = new \Redis();
$redis->connect($host, $port);
return $redis;
});
// ตั้งค่าฟังก์ชันทำลายการเชื่อมต่อในพูล
$this->pool->setConnectionCloser(function ($redis) {
$redis->close();
});
// ตั้งค่าฟังก์ชันตรวจสอบการเต้นของหัวใจ
$this->pool->setHeartbeatChecker(function ($redis) {
$redis->ping();
});
}
// รับการเชื่อมต่อ
public function get(): \Redis
{
return $this->pool->get();
}
// คืนการเชื่อมต่อ
public function put($redis): void
{
$this->pool->put($redis);
}
}
// Http Server
$worker = new Worker('http://0.0.0.0:8001');
$worker->eventLoop = Swoole::class; // หรือ Swow::class หรือ Fiber::class
$worker->onMessage = function (TcpConnection $connection, Request $request) {
static $pool;
if (!$pool) {
$pool = new RedisPool('127.0.0.1', 6379, 10);
}
$redis = $pool->get();
$redis->set('key', 'hello');
$value = $redis->get('key');
$pool->put($redis);
$connection->send($value);
};
Worker::runAll();
MySQL连接池(支持自动获取和归还连接)
<?php
use Workerman\Connection\TcpConnection;
use Workerman\Coroutine\Context;
use Workerman\Coroutine;
use Workerman\Coroutine\Pool;
use Workerman\Events\Swoole;
use Workerman\Protocols\Http\Request;
use Workerman\Worker;
require_once __DIR__ . '/vendor/autoload.php';
class Db
{
private static ?Pool $pool = null;
public static function __callStatic($name, $arguments)
{
if (self::$pool === null) {
self::initializePool();
}
// รับการเชื่อมต่อจากบริบทของ coroutine เพื่อรับประกันว่าใช้การเชื่อมต่อเดียวกัน
$pdo = Context::get('pdo');
if (!$pdo) {
// รับการเชื่อมต่อจากพูลการเชื่อมต่อ
$pdo = self::$pool->get();
Context::set('pdo', $pdo);
// เมื่อ coroutine เสร็จสิ้นจะคืนการเชื่อมต่อโดยอัตโนมัติ
Coroutine::defer(function () use ($pdo) {
self::$pool->put($pdo);
});
}
return call_user_func_array([$pdo, $name], $arguments);
}
private static function initializePool(): void
{
self::$pool = new Pool(10);
self::$pool->setConnectionCreator(function () {
return new \PDO('mysql:host=127.0.0.1;dbname=your_database', 'your_username', 'your_password');
});
self::$pool->setConnectionCloser(function ($pdo) {
$pdo = null;
});
self::$pool->setHeartbeatChecker(function ($pdo) {
$pdo->query('SELECT 1');
});
}
}
// Http Server
$worker = new Worker('http://0.0.0.0:8001');
$worker->eventLoop = Swoole::class; // หรือ Swow::class หรือ Fiber::class
$worker->onMessage = function (TcpConnection $connection, Request $request) {
$value = Db::query('SELECT NOW() as now')->fetchAll();
$connection->send(json_encode($value));
};
Worker::runAll();
接口说明
interface PoolInterface
{
/**
* 构造函数
* @param int $max_connections 最大连接数,默认1
* @param array $config = [
* 'min_connections' => 1, // จำนวนการเชื่อมต่อต่ำสุด เริ่มต้นที่ 1
* 'idle_timeout' => 60, // เวลาออกจากสถานะว่าง (วินาที) เริ่มต้นที่ 60 วินาที การเชื่อมต่อจะถูกทำลายหลังจาก 60 วินาที
* 'heartbeat_interval => 50, // เวลาตรวจสอบการเต้นของหัวใจ (วินาที) เริ่มต้นที่ 50 วินาที จะมีการตรวจสอบการเชื่อมต่อทุก ๆ 50 วินาที
* 'wait_timeout' => 10, // เวลาในการรอรับการเชื่อมต่อ (วินาที) เริ่มต้นที่ 10 วินาที หากล่าช้ากว่า 10 วินาทีจะเกิดข้อผิดพลาด
* ]
*/
public function __construct(int $max_connections = 1, array $config = []);
/**
* รับการเชื่อมต่อหนึ่งรายการ
*/
public function get(): mixed;
/**
* คืนการเชื่อมต่อ
*/
public function put(object $connection): void;
/**
* สร้างการเชื่อมต่อ
*/
public function createConnection(): object;
/**
* ปิดการเชื่อมต่อ และลบจากพูลการเชื่อมต่อ
*/
public function closeConnection(object $connection): void;
/**
* รับจำนวนการเชื่อมต่อในพูลในปัจจุบัน (รวมการเชื่อมต่อที่ถูกใช้งานและไม่ถูกใช้งาน)
*/
public function getConnectionCount(): int;
/**
* ปิดการเชื่อมต่อในพูล (ไม่รวมการเชื่อมต่อที่ถูกใช้งาน)
*/
public function closeConnections(): void;
/**
* ตั้งค่าฟังก์ชันสร้างการเชื่อมต่อในพูล
*/
public function setConnectionCreator(callable $connectionCreateHandler): self
/**
* ตั้งค่าฟังก์ชันทำลายการเชื่อมต่อในพูล
*/
public function setConnectionCloser(callable $connectionDestroyHandler): self
/**
* ตั้งค่าฟังก์ชันตรวจสอบการเต้นของหัวใจ
*/
public function setHeartbeatChecker(callable $connectionHeartbeatHandler): self
}