""" 数据库连接与工具函数 """ import mysql.connector import json import datetime import base64 import os # ─── 数据库配置(优先读取环境变量,兼容本地直接运行) ─── DB_CONFIG = { 'host': os.environ.get('DB_HOST', 'localhost'), 'port': int(os.environ.get('DB_PORT', 3306)), 'user': os.environ.get('DB_USER', 'root'), 'password': os.environ.get('DB_PASSWORD', '123456'), 'database': os.environ.get('DB_NAME', 'lingshu'), 'charset': 'utf8mb4', 'use_unicode': True, 'autocommit': False, } def get_conn(): return mysql.connector.connect(**DB_CONFIG) def to_json(obj): """将 Python dict/list 转为 JSON 字符串(已是字符串则直接返回)""" if obj is None: return None if isinstance(obj, (dict, list)): return json.dumps(obj, ensure_ascii=False) return obj def serialize_rows(rows): """将查询结果中的 datetime / Decimal 等类型序列化为 JSON 可用格式""" result = [] for row in rows: new_row = {} for k, v in row.items(): if isinstance(v, (datetime.datetime, datetime.date)): new_row[k] = v.isoformat() elif hasattr(v, '__float__'): new_row[k] = float(v) elif isinstance(v, bytes): new_row[k] = v.decode('utf-8', errors='replace') else: new_row[k] = v result.append(new_row) return result def serialize_row(row): """序列化单行""" return serialize_rows([row])[0] if row else None # ─── 极简"加密"(演示用,生产环境请用真正的 AES-256) ─── _SECRET = b'lingshu_secret_key_demo_32bytes!' def simple_encrypt(plaintext: str) -> str: """Base64 编码(仅演示,生产请替换为 cryptography.fernet)""" return base64.b64encode(plaintext.encode()).decode() def simple_decrypt(ciphertext: str) -> str: return base64.b64decode(ciphertext.encode()).decode()