|
|
"""
|
|
|
数据库连接与工具函数
|
|
|
"""
|
|
|
import mysql.connector
|
|
|
import json
|
|
|
import datetime
|
|
|
import base64
|
|
|
import os
|
|
|
|
|
|
# ─── 数据库配置(优先读取环境变量,兼容本地直接运行) ───
|
|
|
DB_CONFIG = {
|
|
|
'host': os.environ.get('DB_HOST', 'localhost'),
|
|
|
'port': int(os.environ.get('DB_PORT', 3306)),
|
|
|
'user': os.environ.get('DB_USER', 'root'),
|
|
|
'password': os.environ.get('DB_PASSWORD', '123456'),
|
|
|
'database': os.environ.get('DB_NAME', 'lingshu'),
|
|
|
'charset': 'utf8mb4',
|
|
|
'use_unicode': True,
|
|
|
'autocommit': False,
|
|
|
}
|
|
|
|
|
|
|
|
|
def get_conn():
|
|
|
return mysql.connector.connect(**DB_CONFIG)
|
|
|
|
|
|
|
|
|
def to_json(obj):
|
|
|
"""将 Python dict/list 转为 JSON 字符串(已是字符串则直接返回)"""
|
|
|
if obj is None:
|
|
|
return None
|
|
|
if isinstance(obj, (dict, list)):
|
|
|
return json.dumps(obj, ensure_ascii=False)
|
|
|
return obj
|
|
|
|
|
|
|
|
|
def serialize_rows(rows):
|
|
|
"""将查询结果中的 datetime / Decimal 等类型序列化为 JSON 可用格式"""
|
|
|
result = []
|
|
|
for row in rows:
|
|
|
new_row = {}
|
|
|
for k, v in row.items():
|
|
|
if isinstance(v, (datetime.datetime, datetime.date)):
|
|
|
new_row[k] = v.isoformat()
|
|
|
elif hasattr(v, '__float__'):
|
|
|
new_row[k] = float(v)
|
|
|
elif isinstance(v, bytes):
|
|
|
new_row[k] = v.decode('utf-8', errors='replace')
|
|
|
else:
|
|
|
new_row[k] = v
|
|
|
result.append(new_row)
|
|
|
return result
|
|
|
|
|
|
|
|
|
def serialize_row(row):
|
|
|
"""序列化单行"""
|
|
|
return serialize_rows([row])[0] if row else None
|
|
|
|
|
|
|
|
|
# ─── 极简"加密"(演示用,生产环境请用真正的 AES-256) ───
|
|
|
_SECRET = b'lingshu_secret_key_demo_32bytes!'
|
|
|
|
|
|
|
|
|
def simple_encrypt(plaintext: str) -> str:
|
|
|
"""Base64 编码(仅演示,生产请替换为 cryptography.fernet)"""
|
|
|
return base64.b64encode(plaintext.encode()).decode()
|
|
|
|
|
|
|
|
|
def simple_decrypt(ciphertext: str) -> str:
|
|
|
return base64.b64decode(ciphertext.encode()).decode()
|