config = Config::getInstance(); // 从配置获取连接池大小 $this->maxConnections = $this->config->get('database.max_connections', 10); $this->minConnections = $this->config->get('database.min_connections', 3); // 初始化连接池 $this->initConnectionPool(); } /** * 获取数据库连接实例 * @return Database 数据库连接实例 */ public static function getInstance() { if (self::$instance === null) { self::$instance = new self(); } return self::$instance; } /** * 初始化连接池 */ private function initConnectionPool() { // 初始化最小数量的连接 for ($i = 0; $i < $this->minConnections; $i++) { $this->connections[] = $this->createConnection(); } } /** * 创建新的数据库连接 * @return PDO 数据库连接 * @throws Exception 连接失败时抛出异常 */ private function createConnection() { $dbConfig = $this->config->get('database'); $host = $dbConfig['host']; $port = $dbConfig['port']; $database = $dbConfig['database']; $username = $dbConfig['username']; $password = $dbConfig['password']; $charset = $dbConfig['charset']; try { // 先连接到服务器,不指定数据库 $dsn = "mysql:host={$host};port={$port};charset={$charset}"; $options = [ PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION, PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC, PDO::ATTR_EMULATE_PREPARES => false, PDO::ATTR_PERSISTENT => true, // 使用持久连接 ]; $pdo = new PDO($dsn, $username, $password, $options); // 尝试创建数据库(如果不存在) $pdo->exec("CREATE DATABASE IF NOT EXISTS $database CHARACTER SET $charset COLLATE {$dbConfig['collation']}"); // 选择数据库 $pdo->exec("USE $database"); return $pdo; } catch (PDOException $e) { throw new Exception("Database connection failed: " . $e->getMessage()); } } /** * 从连接池获取可用连接 * @return PDO 可用的数据库连接 */ private function getConnection() { // 检查是否有可用连接 if (count($this->connections) > 0) { $pdo = array_pop($this->connections); // 验证连接是否有效 if (!$this->isValidConnection($pdo)) { $pdo = $this->createConnection(); } $this->inUse[] = $pdo; return $pdo; } // 如果没有可用连接,检查是否可以创建新连接 if (count($this->inUse) < $this->maxConnections) { $pdo = $this->createConnection(); $this->inUse[] = $pdo; return $pdo; } // 连接池已满,等待可用连接(简单实现,实际应该使用更高效的机制) usleep(1000); // 等待1毫秒 return $this->getConnection(); } /** * 将连接归还到连接池 * @param PDO $pdo 数据库连接 */ private function releaseConnection($pdo) { // 从正在使用的列表中移除 $key = array_search($pdo, $this->inUse); if ($key !== false) { unset($this->inUse[$key]); // 如果连接有效,归还到连接池 if ($this->isValidConnection($pdo)) { $this->connections[] = $pdo; } } } /** * 验证连接是否有效 * @param PDO $pdo 数据库连接 * @return bool 是否有效 */ private function isValidConnection($pdo) { try { $pdo->query("SELECT 1"); return true; } catch (PDOException $e) { return false; } } /** * 执行查询并返回所有结果 * @param string $sql SQL查询语句 * @param array $params 查询参数 * @return array 查询结果 */ public function fetchAll($sql, $params = []) { $pdo = $this->getConnection(); try { $stmt = $pdo->prepare($sql); $stmt->execute($params); return $stmt->fetchAll(); } finally { $this->releaseConnection($pdo); } } /** * 执行查询并返回第一条结果 * @param string $sql SQL查询语句 * @param array $params 查询参数 * @return array 查询结果 */ public function fetchOne($sql, $params = []) { $pdo = $this->getConnection(); try { $stmt = $pdo->prepare($sql); $stmt->execute($params); return $stmt->fetch(); } finally { $this->releaseConnection($pdo); } } /** * 执行查询 * @param string $sql SQL查询语句 * @param array $params 查询参数 * @return PDOStatement PDO语句对象 */ public function query($sql, $params = []) { $pdo = $this->getConnection(); try { $stmt = $pdo->prepare($sql); $stmt->execute($params); return $stmt; } finally { $this->releaseConnection($pdo); } } /** * 执行插入操作 * @param string $sql SQL插入语句 * @param array $params 查询参数 * @return int 插入的ID */ public function insert($sql, $params = []) { $pdo = $this->getConnection(); try { $stmt = $pdo->prepare($sql); $stmt->execute($params); return $pdo->lastInsertId(); } finally { $this->releaseConnection($pdo); } } /** * 执行更新操作 * @param string $sql SQL更新语句 * @param array $params 查询参数 * @return int 影响的行数 */ public function update($sql, $params = []) { $pdo = $this->getConnection(); try { $stmt = $pdo->prepare($sql); $stmt->execute($params); return $stmt->rowCount(); } finally { $this->releaseConnection($pdo); } } /** * 执行删除操作 * @param string $sql SQL删除语句 * @param array $params 查询参数 * @return int 影响的行数 */ public function delete($sql, $params = []) { $pdo = $this->getConnection(); try { $stmt = $pdo->prepare($sql); $stmt->execute($params); return $stmt->rowCount(); } finally { $this->releaseConnection($pdo); } } /** * 执行SQL语句 * @param string $sql SQL语句 * @param array $params 查询参数 * @return int 影响的行数 */ public function execute($sql, $params = []) { $pdo = $this->getConnection(); try { $stmt = $pdo->prepare($sql); $stmt->execute($params); return $stmt->rowCount(); } finally { $this->releaseConnection($pdo); } } /** * 开始事务 * @return PDO 用于事务的连接 */ public function beginTransaction() { $pdo = $this->getConnection(); $pdo->beginTransaction(); // 事务期间连接不归还,由commit或rollback处理 return $pdo; } /** * 提交事务 * @param PDO $pdo 事务连接 */ public function commit($pdo = null) { if ($pdo) { $pdo->commit(); $this->releaseConnection($pdo); } } /** * 回滚事务 * @param PDO $pdo 事务连接 */ public function rollback($pdo = null) { if ($pdo) { $pdo->rollBack(); $this->releaseConnection($pdo); } } /** * 验证数据库连接 * @return bool 是否连接成功 */ public function isConnected() { $pdo = $this->getConnection(); try { $pdo->query("SELECT 1"); return true; } catch (PDOException $e) { return false; } finally { $this->releaseConnection($pdo); } } /** * 获取连接池状态 * @return array 连接池状态 */ public function getPoolStatus() { return [ 'total_connections' => count($this->connections) + count($this->inUse), 'available_connections' => count($this->connections), 'in_use_connections' => count($this->inUse), 'max_connections' => $this->maxConnections ]; } }