from typing import List, Tuple, Optional import sqlite3 class Db_Operation: def __init__(self, db_name: str = "rainbow_chain.db", table_name: str = "rainbow_table"): """ 初始化数据库连接和表结构,并进行必要的优化配置。 :param db_name: 数据库文件名 :param table_name: 表名 """ self.db_name = db_name self.table_name = table_name self.conn = sqlite3.connect(self.db_name, timeout=30) # 设置较高的超时时间以应对高并发 self.cursor = self.conn.cursor() if not self._initialize_table(): self.close() return if not self._configure_database(): self.close() return if not self._create_indexes(): self.close() return def _initialize_table(self) -> bool: """ 创建表结构,如果表不存在。 :return: 如果成功返回 True,否则返回 False """ try: create_table_query = f""" CREATE TABLE IF NOT EXISTS {self.table_name} ( id INTEGER PRIMARY KEY AUTOINCREMENT, start_str TEXT NOT NULL, end_str TEXT NOT NULL ) """ self.cursor.execute(create_table_query) self.conn.commit() return True except sqlite3.Error: return False def _create_indexes(self) -> bool: """ 为 end_str 列创建索引,以加速查询。 :return: 如果成功返回 True,否则返回 False """ try: create_index_query = f"CREATE INDEX IF NOT EXISTS idx_{self.table_name}_end_str ON {self.table_name} " \ f"(end_str) " self.cursor.execute(create_index_query) self.conn.commit() return True except sqlite3.Error: return False def _configure_database(self) -> bool: """ 配置SQLite参数以优化性能。 :return: 如果成功返回 True,否则返回 False """ try: # 启用WAL模式,提高并发性能 self.cursor.execute("PRAGMA journal_mode=WAL;") # 增加缓存大小,以减少磁盘I/O self.cursor.execute("PRAGMA cache_size = 10000;") # 根据需要调整 # 关闭同步以提高写入性能(注意:存在数据丢失风险) self.cursor.execute("PRAGMA synchronous = NORMAL;") # 启用内存中的临时存储,以提升性能 self.cursor.execute("PRAGMA temp_store = MEMORY;") # 提高SQLITE的锁定超时,从默认的5秒延长到30秒 self.cursor.execute("PRAGMA busy_timeout = 30000;") self.conn.commit() return True except sqlite3.Error: return False def write_to_database(self, data: List[Tuple[str, str]]) -> bool: """ 批量写入数据到数据库。使用事务来提升性能。 :param data: 包含 (start_str, end_str) 元组的列表 :return: 如果成功返回 True,否则返回 False """ if not data: return True # 如果没有数据需要写入,直接返回 True try: self.cursor.execute("BEGIN TRANSACTION;") insert_query = f"INSERT INTO {self.table_name} (start_str, end_str) VALUES (?, ?)" self.cursor.executemany(insert_query, data) self.conn.commit() return True except sqlite3.Error: self.conn.rollback() return False def check_string_in_end_str(self, target_string: str) -> Optional[Tuple[int, str, str]]: """ 查询 end_str 列中是否存在目标字符串。 :param target_string: 要查询的字符串 :return: 如果有匹配记录,返回一条数据的元组 (id, start_str, end_str);否则返回 None """ try: query = f"SELECT id, start_str, end_str FROM {self.table_name} WHERE end_str = ? LIMIT 1" self.cursor.execute(query, (target_string,)) result = self.cursor.fetchone() return result except sqlite3.Error: return None def close(self): """ 关闭数据库连接。 """ try: self.cursor.close() self.conn.close() except sqlite3.Error: pass def __enter__(self): """ 支持 with 语句,通过返回自身来管理资源。 """ return self def __exit__(self, exc_type, exc_val, exc_tb): """ 支持 with 语句,确保关闭连接。 """ self.close()