You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

368 lines
13 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

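"""Data recording module.

Persists safety-check results and system state data to disk.
Supported features:
- Safety state event recording
- Control command change recording
- System mode change recording
- Data persistence (cyber_recorder recording management)
- Historical data query and playback
- Automatic data cleanup
"""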
import os
import json
import time
import threading
import logging
from typing import Dict, List, Optional, Any
from pathlib import Path
from datetime import datetime, timedelta
from dataclasses import dataclass, field, asdict
# Module-level logger used by every class in this file; it is registered
# under the fixed name "Recorder" rather than the module's __name__.
logger = logging.getLogger("Recorder")
@dataclass
class SafetyEvent:
    """One safety event record.

    Every field has a default, so an instance can be created empty and
    filled in afterwards. Field order is part of the positional
    constructor signature.
    """

    # Identifier of the event; empty until one is assigned.
    event_id: str = ""
    # Free-form category label for the event.
    event_type: str = ""
    # Level label attached to the event (presumably a severity; confirm
    # the allowed values against the producers).
    level: str = ""
    # Name of the component that reported the event.
    source: str = ""
    # Human-readable description.
    message: str = ""
    # Arbitrary structured payload; each instance gets its own dict.
    details: Dict[str, Any] = field(default_factory=dict)
    # Event time as a float; 0.0 means "not set" (presumably Unix epoch
    # seconds; confirm against the caller).
    timestamp: float = 0.0
    # Optional labels; each instance gets its own list.
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the event as a plain dict (nested containers are copied)."""
        as_mapping = asdict(self)
        return as_mapping
@dataclass
class ModeChangeEvent:
    """One system-mode transition record.

    Every field has a default; field order is part of the positional
    constructor signature.
    """

    # Mode the system was in before the transition.
    from_mode: str = ""
    # Mode the system switched to.
    to_mode: str = ""
    # Name of whatever triggered the transition.
    trigger_source: str = ""
    # Free-form explanation of the transition.
    reason: str = ""
    # Transition time as a float; 0.0 means "not set" (presumably Unix
    # epoch seconds; confirm against the caller).
    timestamp: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict keyed by field name."""
        as_mapping = asdict(self)
        return as_mapping
@dataclass
class ControlCommandEvent:
    """One control-command record.

    Every field has a default; field order is part of the positional
    constructor signature.
    """

    # Identifier of the command.
    command_id: str = ""
    # Label naming the kind of action.
    action_type: str = ""
    # Numeric parameters of the command; each instance gets its own dict.
    parameters: Dict[str, float] = field(default_factory=dict)
    # Free-form reasons attached to the command; each instance gets its
    # own list.
    reasons: List[str] = field(default_factory=list)
    # Level label attached to the command (presumably a severity; confirm
    # the allowed values against the producers).
    level: str = ""
    # Command time as a float; 0.0 means "not set" (presumably Unix epoch
    # seconds; confirm against the caller).
    timestamp: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict (nested containers are copied)."""
        as_mapping = asdict(self)
        return as_mapping
class DataRecorder:
    """Buffered recorder that persists safety data as date-named JSONL files.

    Records handed to ``record_event`` / ``record_mode_change`` /
    ``record_command`` are kept in in-memory buffers and appended to disk
    by ``_flush_all``, which runs periodically on the background thread
    started by ``start`` and once more from ``stop``.

    On-disk layout::

        {record_dir}/
        ├── events/
        │   ├── 2026-01-15.jsonl
        │   └── 2026-01-16.jsonl
        ├── mode_changes/
        │   ├── 2026-01-15.jsonl
        │   └── 2026-01-16.jsonl
        ├── commands/
        │   ├── 2026-01-15.jsonl
        │   └── 2026-01-16.jsonl
        └── summary/
            └── latest.json

    A daily file that grows beyond ``max_file_size_mb`` is renamed to
    ``YYYY-MM-DD.<unix-seconds>.jsonl``; queries and cleanup recognise
    both name forms.

    Recognised ``config`` keys (all optional):

    * ``max_file_size_mb`` -- rotation threshold, default 50.
    * ``retention_days`` -- age after which files are deleted by
      ``cleanup_old_data``, default 7.
    * ``flush_interval`` -- seconds between background flushes, default 5.0.

    NOTE: a batch whose write fails is logged and dropped, not retried.
    """

    # Format of the date prefix used in data file names.
    _DATE_FORMAT = "%Y-%m-%d"
    # Sub-directories that hold date-named JSONL data files.
    _DATA_SUBDIRS = ("events", "mode_changes", "commands")

    def __init__(self, record_dir: str = "records", config: Optional[dict] = None):
        """Set up buffers and statistics and create the directory tree.

        Args:
            record_dir: Storage root. A relative path is resolved against
                the directory containing this source file; an absolute
                path is used as given.
            config: Optional settings mapping (see the class docstring).
        """
        base = Path(__file__).resolve().parent
        self._record_dir = base / record_dir
        self._config = config or {}
        self._max_file_size_mb = self._config.get("max_file_size_mb", 50)
        self._retention_days = self._config.get("retention_days", 7)
        self._flush_interval = self._config.get("flush_interval", 5.0)
        # In-memory buffers, drained by _flush_all.
        self._event_buffer: List["SafetyEvent"] = []
        self._mode_buffer: List["ModeChangeEvent"] = []
        self._command_buffer: List["ControlCommandEvent"] = []
        # Running counters reported by get_stats.
        self._stats = {
            "total_events": 0,
            "total_mode_changes": 0,
            "total_commands": 0,
            "start_time": time.time(),
            "last_flush_time": 0,
        }
        self._running = False
        self._flush_thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        # Set by stop() so the flush loop wakes up immediately instead of
        # sleeping out the rest of its interval.
        self._stop_event = threading.Event()
        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create the storage sub-directories if they do not exist yet."""
        for subdir in (*self._DATA_SUBDIRS, "summary"):
            (self._record_dir / subdir).mkdir(parents=True, exist_ok=True)

    def start(self):
        """Start the background flush thread (no-op if already running)."""
        already_running = (
            self._running
            and self._flush_thread is not None
            and self._flush_thread.is_alive()
        )
        if already_running:
            # A second start() must not spawn a second flush thread.
            return
        self._stop_event.clear()
        self._running = True
        self._flush_thread = threading.Thread(target=self._flush_loop, daemon=True)
        self._flush_thread.start()
        logger.info(f"Data recorder started, dir={self._record_dir}")

    def stop(self):
        """Stop the flush thread, then flush buffers and write the summary."""
        self._running = False
        # Wake the flush loop so join() normally returns right away.
        self._stop_event.set()
        if self._flush_thread:
            self._flush_thread.join(timeout=5)
        self._flush_all()
        self._write_summary()
        logger.info("Data recorder stopped")

    def record_event(self, event: "SafetyEvent"):
        """Buffer a safety event.

        A falsy ``timestamp`` is replaced with the current time and an
        empty ``event_id`` with ``evt_<timestamp in milliseconds>``.
        """
        if not event.timestamp:
            event.timestamp = time.time()
        if not event.event_id:
            event.event_id = f"evt_{int(event.timestamp * 1000)}"
        with self._lock:
            self._event_buffer.append(event)
            self._stats["total_events"] += 1
        logger.debug(f"Recorded event: {event.event_type} [{event.level}]")

    def record_mode_change(self, event: "ModeChangeEvent"):
        """Buffer a mode change; a falsy ``timestamp`` becomes the current time."""
        if not event.timestamp:
            event.timestamp = time.time()
        with self._lock:
            self._mode_buffer.append(event)
            self._stats["total_mode_changes"] += 1
        logger.info(f"Recorded mode change: {event.from_mode} -> {event.to_mode}")

    def record_command(self, event: "ControlCommandEvent"):
        """Buffer a control command; a falsy ``timestamp`` becomes the current time."""
        if not event.timestamp:
            event.timestamp = time.time()
        with self._lock:
            self._command_buffer.append(event)
            self._stats["total_commands"] += 1
        logger.debug(f"Recorded command: {event.action_type}")

    def query_events(self, start_time: float = 0, end_time: float = 0,
                     event_type: str = None, level: str = None,
                     limit: int = 100) -> List["SafetyEvent"]:
        """Return persisted safety events that match the given filters.

        Only data already flushed to disk is visible. Every file whose
        date lies between the day of ``start_time`` and the day of
        ``end_time`` is scanned in chronological order, including files
        renamed by rotation.

        Args:
            start_time: Lower timestamp bound (inclusive); 0 disables the
                bound and starts the scan at today's files.
            end_time: Upper timestamp bound (inclusive); 0 disables the
                bound and scans through today's files.
            event_type: If given, only events with this exact type.
            level: If given, only events with this exact level.
            limit: Maximum number of events; values <= 0 return ``[]``.

        Returns:
            Matching events in file order, at most ``limit`` of them.
        """
        events: List["SafetyEvent"] = []
        if limit <= 0:
            return events

        fmt = self._DATE_FORMAT
        first_day = datetime.fromtimestamp(start_time or time.time()).strftime(fmt)
        if end_time:
            try:
                last_day = datetime.fromtimestamp(end_time).strftime(fmt)
            except (OverflowError, OSError, ValueError):
                # Out-of-range upper bound: treat it as "no upper bound".
                last_day = "9999-12-31"
        else:
            last_day = datetime.now().strftime(fmt)
        # Always scan at least the first day, as the single-file lookup did.
        last_day = max(first_day, last_day)

        for filepath in self._dated_files("events", first_day, last_day):
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            event = SafetyEvent(**json.loads(line))
                            matched = self._event_matches(
                                event, start_time, end_time, event_type, level)
                        except (json.JSONDecodeError, TypeError):
                            # Malformed or incompatible line: skip it.
                            continue
                        if not matched:
                            continue
                        events.append(event)
                        if len(events) >= limit:
                            return events
            except (OSError, UnicodeError) as e:
                logger.error(f"Failed to query events: {e}")
        return events

    @staticmethod
    def _event_matches(event, start_time, end_time, event_type, level) -> bool:
        """Return True if ``event`` passes every filter that is set."""
        if start_time and event.timestamp < start_time:
            return False
        if end_time and event.timestamp > end_time:
            return False
        if event_type and event.event_type != event_type:
            return False
        if level and event.level != level:
            return False
        return True

    def query_mode_changes(self, limit: int = 50) -> List["ModeChangeEvent"]:
        """Return today's persisted mode changes, most recent first.

        Today's rotated files are included. Only data already flushed to
        disk is visible. Values of ``limit`` <= 0 return ``[]``.
        """
        changes: List["ModeChangeEvent"] = []
        if limit <= 0:
            return changes
        today = datetime.now().strftime(self._DATE_FORMAT)
        # Newest file first, and newest line first within each file.
        for filepath in reversed(self._dated_files("mode_changes", today, today)):
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    lines = f.readlines()
            except (OSError, UnicodeError) as e:
                logger.error(f"Failed to query mode changes: {e}")
                continue
            for line in reversed(lines):
                line = line.strip()
                if not line:
                    continue
                try:
                    changes.append(ModeChangeEvent(**json.loads(line)))
                except (json.JSONDecodeError, TypeError):
                    continue
                if len(changes) >= limit:
                    return changes
        return changes

    def get_stats(self) -> dict:
        """Return a snapshot of the counters, uptime and buffer sizes."""
        # Take the lock so counters and buffer lengths are read together.
        with self._lock:
            stats = dict(self._stats)
            stats["uptime_seconds"] = int(time.time() - stats["start_time"])
            stats["buffer_sizes"] = {
                "events": len(self._event_buffer),
                "mode_changes": len(self._mode_buffer),
                "commands": len(self._command_buffer),
            }
        return stats

    def cleanup_old_data(self):
        """Delete data files dated more than ``retention_days`` days ago.

        Only ``*.jsonl`` files whose name starts with a valid
        ``YYYY-MM-DD`` date are considered; any other file is left alone.
        """
        cutoff = datetime.now() - timedelta(days=self._retention_days)
        cutoff_str = cutoff.strftime(self._DATE_FORMAT)
        for subdir in self._DATA_SUBDIRS:
            dir_path = self._record_dir / subdir
            if not dir_path.exists():
                continue
            for filepath in dir_path.glob("*.jsonl"):
                day = self._file_day(filepath)
                if day is None or day >= cutoff_str:
                    continue
                try:
                    filepath.unlink()
                    logger.info(f"Cleaned up old data: {filepath}")
                except OSError as e:
                    logger.error(f"Failed to delete {filepath}: {e}")

    @classmethod
    def _file_day(cls, filepath: Path) -> Optional[str]:
        """Return the ``YYYY-MM-DD`` prefix of a data file name, or None."""
        prefix = filepath.name[:10]
        try:
            parsed = datetime.strptime(prefix, cls._DATE_FORMAT)
        except ValueError:
            return None
        # Reject non-canonical spellings so string comparison stays valid.
        return prefix if parsed.strftime(cls._DATE_FORMAT) == prefix else None

    def _dated_files(self, subdir: str, first_day: str, last_day: str) -> List[Path]:
        """List data files in ``subdir`` dated within [first_day, last_day].

        The result is sorted by file name. For one day this puts rotated
        files (``<day>.<digits>.jsonl``) before the active ``<day>.jsonl``,
        because a digit sorts before the letter ``j``.
        """
        dir_path = self._record_dir / subdir
        if not dir_path.exists():
            return []
        selected = []
        for filepath in dir_path.glob("*.jsonl"):
            day = self._file_day(filepath)
            if day is not None and first_day <= day <= last_day:
                selected.append(filepath)
        return sorted(selected, key=lambda p: p.name)

    def _flush_loop(self):
        """Background loop: flush, then wait one interval or until stopped."""
        while self._running:
            try:
                self._flush_all()
            except Exception as e:
                # Keep the thread alive; a failed flush must not end recording.
                logger.error(f"Flush error: {e}")
            if self._stop_event.wait(self._flush_interval):
                break

    def _flush_all(self):
        """Drain all buffers to disk, then rotate oversized files."""
        # Swap the buffers out under the lock; the slow file I/O happens
        # outside it so record_* callers are not blocked.
        with self._lock:
            events, self._event_buffer = self._event_buffer, []
            mode_changes, self._mode_buffer = self._mode_buffer, []
            commands, self._command_buffer = self._command_buffer, []
        if events:
            self._write_events(events)
        if mode_changes:
            self._write_mode_changes(mode_changes)
        if commands:
            self._write_commands(commands)
        with self._lock:
            self._stats["last_flush_time"] = time.time()
        self._rotate_if_needed()

    def _append_jsonl(self, subdir: str, records: list, label: str):
        """Append ``records`` as JSON lines to today's file in ``subdir``.

        The file is named after the date of the flush, not after the
        record timestamps. Failures are logged and the batch is dropped.
        """
        date_str = datetime.now().strftime(self._DATE_FORMAT)
        filepath = self._record_dir / subdir / f"{date_str}.jsonl"
        try:
            with open(filepath, "a", encoding="utf-8") as f:
                f.writelines(
                    json.dumps(record.to_dict(), ensure_ascii=False, default=str) + "\n"
                    for record in records
                )
        except Exception as e:
            logger.error(f"Failed to write {label}: {e}")

    def _write_events(self, events: List["SafetyEvent"]):
        """Append safety events to today's ``events`` file."""
        self._append_jsonl("events", events, "events")

    def _write_mode_changes(self, changes: List["ModeChangeEvent"]):
        """Append mode changes to today's ``mode_changes`` file."""
        self._append_jsonl("mode_changes", changes, "mode changes")

    def _write_commands(self, commands: List["ControlCommandEvent"]):
        """Append control commands to today's ``commands`` file."""
        self._append_jsonl("commands", commands, "commands")

    def _write_summary(self):
        """Overwrite ``summary/latest.json`` with the current statistics."""
        filepath = self._record_dir / "summary" / "latest.json"
        try:
            summary = {
                "timestamp": datetime.now().isoformat(),
                "stats": self.get_stats(),
                "recorder_version": "1.0",
            }
            with open(filepath, "w", encoding="utf-8") as f:
                json.dump(summary, f, ensure_ascii=False, indent=2, default=str)
        except Exception as e:
            logger.error(f"Failed to write summary: {e}")

    def _rotate_if_needed(self):
        """Rename today's data files that exceed ``max_file_size_mb``.

        The rotated name is ``YYYY-MM-DD.<unix-seconds>.jsonl``; the next
        flush then starts a fresh ``YYYY-MM-DD.jsonl``.
        """
        max_bytes = self._max_file_size_mb * 1024 * 1024
        date_str = datetime.now().strftime(self._DATE_FORMAT)
        for subdir in self._DATA_SUBDIRS:
            filepath = self._record_dir / subdir / f"{date_str}.jsonl"
            if not filepath.exists():
                continue
            try:
                if filepath.stat().st_size > max_bytes:
                    rotated = filepath.with_suffix(f".{int(time.time())}.jsonl")
                    filepath.rename(rotated)
                    logger.info(f"Rotated {filepath} -> {rotated}")
            except OSError as e:
                # Rotation is best-effort, but a failure should be visible.
                logger.warning(f"Failed to rotate {filepath}: {e}")