|
|
"""数据记录模块。
|
|
|
|
|
|
负责将安全检查结果和系统状态数据持久化到磁盘。

支持以下功能:

- 安全状态事件记录
- 控制指令变更记录
- 系统模式变更记录
- 数据落盘(缓冲后由后台线程定时追加写入按日期归档的 JSONL 文件,超过大小上限时轮转)
- 历史数据查询
- 过期数据自动清理
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import json
|
|
|
import time
|
|
|
import threading
|
|
|
import logging
|
|
|
from typing import Dict, List, Optional, Any
|
|
|
from pathlib import Path
|
|
|
from datetime import datetime, timedelta
|
|
|
from dataclasses import dataclass, field, asdict
|
|
|
|
|
|
# Module-wide logger, registered under the fixed name "Recorder".
logger = logging.getLogger(name="Recorder")
|
|
|
|
|
|
|
|
|
@dataclass
class SafetyEvent:
    """One safety event; ``DataRecorder`` stores each as a JSON line under ``events/``."""

    # Identifier; DataRecorder.record_event() generates one when left empty.
    event_id: str = ""
    # Event category; DataRecorder.query_events() can filter on exact matches.
    event_type: str = ""
    # Severity label; DataRecorder.query_events() can filter on exact matches.
    level: str = ""
    # Origin of the event (free-form string).
    source: str = ""
    # Human-readable description (free-form string).
    message: str = ""
    # Extra payload; the recorder serialises non-JSON values with str().
    details: Dict[str, Any] = field(default_factory=dict)
    # Epoch seconds; DataRecorder.record_event() uses time.time() when 0.
    timestamp: float = 0.0
    # Free-form labels.
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a new ``dict`` in declaration order (nested containers copied)."""
        return asdict(self, dict_factory=dict)
|
|
|
|
|
|
|
|
|
@dataclass
class ModeChangeEvent:
    """One system-mode transition; ``DataRecorder`` stores each under ``mode_changes/``."""

    # Mode before the transition.
    from_mode: str = ""
    # Mode after the transition.
    to_mode: str = ""
    # What initiated the change (free-form string).
    trigger_source: str = ""
    # Explanation for the change (free-form string).
    reason: str = ""
    # Epoch seconds; DataRecorder.record_mode_change() uses time.time() when 0.
    timestamp: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a new ``dict`` in declaration order."""
        return asdict(self, dict_factory=dict)
|
|
|
|
|
|
|
|
|
@dataclass
class ControlCommandEvent:
    """One control-command change; ``DataRecorder`` stores each under ``commands/``."""

    # Command identifier (not auto-generated by the recorder).
    command_id: str = ""
    # Kind of action requested.
    action_type: str = ""
    # Named numeric parameters of the command.
    parameters: Dict[str, float] = field(default_factory=dict)
    # Reasons that led to the command.
    reasons: List[str] = field(default_factory=list)
    # Severity label.
    level: str = ""
    # Epoch seconds; DataRecorder.record_command() uses time.time() when 0.
    timestamp: float = 0.0

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a new ``dict`` in declaration order (nested containers copied)."""
        return asdict(self, dict_factory=dict)
|
|
|
|
|
|
|
|
|
class DataRecorder:
    """Safety data recorder.

    Persists key events from the safety checks to disk as JSON Lines files,
    archived per day, and removes files older than the retention window.

    Storage layout::

        {record_dir}/
        ├── events/
        │   ├── 2026-01-15.jsonl
        │   └── 2026-01-16.jsonl
        ├── mode_changes/
        │   ├── 2026-01-15.jsonl
        │   └── 2026-01-16.jsonl
        ├── commands/
        │   ├── 2026-01-15.jsonl
        │   └── 2026-01-16.jsonl
        └── summary/
            └── latest.json

    Every record is written to the day file named after the local date of its
    own ``timestamp``. A day file that grows beyond ``max_file_size_mb`` is
    renamed to ``YYYY-MM-DD.<epoch_seconds>.jsonl`` and a new day file is
    started. The query and cleanup methods treat rotated chunks as part of
    that day.

    The ``record_*`` methods only append to in-memory buffers under a lock.
    A background thread (see :meth:`start`) flushes the buffers every
    ``flush_interval`` seconds, and :meth:`stop` performs a final flush.
    Records are therefore not visible to the query methods until they have
    been flushed.
    """

    # Sub-directories that hold per-day JSONL record files.
    _RECORD_SUBDIRS = ("events", "mode_changes", "commands")
    # strftime/strptime format of the day part of record file names.
    _DATE_FORMAT = "%Y-%m-%d"

    def __init__(self, record_dir: str = "records", config: Optional[dict] = None):
        """Create the recorder and its directory tree (the flush thread is not started).

        Args:
            record_dir: Storage directory. A relative path is resolved against
                the directory containing this module; an absolute path is used as-is.
            config: Optional settings: ``max_file_size_mb`` (default 50),
                ``retention_days`` (default 7) and ``flush_interval`` in
                seconds (default 5.0).
        """
        base = Path(__file__).resolve().parent
        self._record_dir = base / record_dir
        self._config = config or {}
        self._max_file_size_mb = self._config.get("max_file_size_mb", 50)
        self._retention_days = self._config.get("retention_days", 7)
        self._flush_interval = self._config.get("flush_interval", 5.0)

        # In-memory buffers drained by _flush_all(); guarded by self._lock.
        self._event_buffer: List[SafetyEvent] = []
        self._mode_buffer: List[ModeChangeEvent] = []
        self._command_buffer: List[ControlCommandEvent] = []

        # Counters; mutated under self._lock.
        self._stats = {
            "total_events": 0,
            "total_mode_changes": 0,
            "total_commands": 0,
            "start_time": time.time(),
            "last_flush_time": 0,
        }

        # State used to keep generated event ids distinct within one millisecond
        # (guarded by self._lock).
        self._last_event_ms = -1
        self._event_id_seq = 0

        self._running = False
        self._flush_thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        # Serialises whole flushes so the background thread and stop() never
        # interleave appends to the same file or reorder records.
        self._flush_lock = threading.Lock()
        # Set by stop() to wake the flush thread immediately instead of making
        # stop() wait out the remainder of flush_interval.
        self._stop_event = threading.Event()

        self._ensure_dirs()

    def _ensure_dirs(self):
        """Create the record sub-directories and ``summary/`` if missing."""
        for subdir in (*self._RECORD_SUBDIRS, "summary"):
            (self._record_dir / subdir).mkdir(parents=True, exist_ok=True)

    def start(self):
        """Start the background flush thread; does nothing if already running."""
        if self._running:
            logger.warning("Data recorder already running")
            return
        self._running = True
        # Each thread gets its own stop event, so a previous thread that is
        # still shutting down can never be revived by a restart.
        self._stop_event = threading.Event()
        self._flush_thread = threading.Thread(
            target=self._flush_loop, args=(self._stop_event,), daemon=True
        )
        self._flush_thread.start()
        logger.info(f"Data recorder started, dir={self._record_dir}")

    def stop(self):
        """Stop the flush thread, flush remaining buffers and write ``summary/latest.json``."""
        self._running = False
        self._stop_event.set()
        if self._flush_thread:
            self._flush_thread.join(timeout=5)
        self._flush_all()
        self._write_summary()
        logger.info("Data recorder stopped")

    def record_event(self, event: SafetyEvent):
        """Buffer a safety event for the next flush.

        A zero ``timestamp`` is replaced with ``time.time()``. An empty
        ``event_id`` is replaced with ``evt_<epoch_ms>``. If consecutive
        generated ids fall in the same millisecond, a ``_<n>`` suffix is
        appended so they stay distinct.
        """
        if not event.timestamp:
            event.timestamp = time.time()

        with self._lock:
            if not event.event_id:
                event.event_id = self._next_event_id(event.timestamp)
            self._event_buffer.append(event)
            self._stats["total_events"] += 1

        logger.debug(f"Recorded event: {event.event_type} [{event.level}]")

    def _next_event_id(self, timestamp: float) -> str:
        """Build a default event id from *timestamp*; the caller must hold ``self._lock``."""
        ms = int(timestamp * 1000)
        if ms == self._last_event_ms:
            self._event_id_seq += 1
            return f"evt_{ms}_{self._event_id_seq}"
        self._last_event_ms = ms
        self._event_id_seq = 0
        return f"evt_{ms}"

    def record_mode_change(self, event: ModeChangeEvent):
        """Buffer a system-mode change; a zero ``timestamp`` is replaced with ``time.time()``."""
        if not event.timestamp:
            event.timestamp = time.time()

        with self._lock:
            self._mode_buffer.append(event)
            self._stats["total_mode_changes"] += 1

        logger.info(f"Recorded mode change: {event.from_mode} -> {event.to_mode}")

    def record_command(self, event: ControlCommandEvent):
        """Buffer a control command; a zero ``timestamp`` is replaced with ``time.time()``."""
        if not event.timestamp:
            event.timestamp = time.time()

        with self._lock:
            self._command_buffer.append(event)
            self._stats["total_commands"] += 1

        logger.debug(f"Recorded command: {event.action_type}")

    def query_events(self, start_time: float = 0, end_time: float = 0,
                     event_type: Optional[str] = None, level: Optional[str] = None,
                     limit: int = 100) -> List[SafetyEvent]:
        """Query flushed safety events, oldest first.

        Every day file (including rotated chunks) from the date of
        ``start_time`` through the date of ``end_time`` is scanned. Unparseable
        lines are skipped.

        Args:
            start_time: Inclusive lower bound in epoch seconds. 0 means no
                lower bound, and the scan starts at today's files.
            end_time: Inclusive upper bound in epoch seconds. 0 means no upper
                bound, and the scan ends at today's files.
            event_type: If given, keep only events with this exact ``event_type``.
            level: If given, keep only events with this exact ``level``.
            limit: Maximum number of events returned; values <= 0 return ``[]``.

        Returns:
            Up to ``limit`` matching events in file order.
        """
        events: List[SafetyEvent] = []
        if limit <= 0:
            return events

        now = time.time()
        first_day = datetime.fromtimestamp(start_time or now).date()
        last_day = datetime.fromtimestamp(end_time or now).date()

        for day, filepath in self._list_record_files("events"):
            if day < first_day or day > last_day:
                continue
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    for line in f:
                        line = line.strip()
                        if not line:
                            continue
                        try:
                            event = SafetyEvent(**json.loads(line))
                            # Comparisons stay inside the try: a malformed record
                            # (e.g. a string timestamp) raises TypeError and is skipped.
                            matched = (
                                (not start_time or event.timestamp >= start_time)
                                and (not end_time or event.timestamp <= end_time)
                                and (not event_type or event.event_type == event_type)
                                and (not level or event.level == level)
                            )
                        except (json.JSONDecodeError, TypeError):
                            continue
                        if matched:
                            events.append(event)
                            if len(events) >= limit:
                                return events
            except Exception as e:
                logger.error(f"Failed to query events: {e}")

        return events

    def query_mode_changes(self, limit: int = 50) -> List[ModeChangeEvent]:
        """Return the most recent flushed mode changes, newest first.

        Day files (including rotated chunks) are walked from newest to oldest
        until ``limit`` records are collected, so results do not reset at
        midnight. Unparseable lines are skipped; ``limit <= 0`` returns ``[]``.
        """
        changes: List[ModeChangeEvent] = []
        if limit <= 0:
            return changes

        for _day, filepath in reversed(self._list_record_files("mode_changes")):
            try:
                with open(filepath, "r", encoding="utf-8") as f:
                    lines = f.readlines()
            except Exception as e:
                logger.error(f"Failed to query mode changes: {e}")
                continue

            for line in reversed(lines):
                line = line.strip()
                if not line:
                    continue
                try:
                    changes.append(ModeChangeEvent(**json.loads(line)))
                except (json.JSONDecodeError, TypeError):
                    continue
                if len(changes) >= limit:
                    return changes

        return changes

    def get_stats(self) -> dict:
        """Return a snapshot of the counters plus ``uptime_seconds`` and ``buffer_sizes``."""
        with self._lock:
            stats = dict(self._stats)
            buffer_sizes = {
                "events": len(self._event_buffer),
                "mode_changes": len(self._mode_buffer),
                "commands": len(self._command_buffer),
            }
        stats["uptime_seconds"] = int(time.time() - stats["start_time"])
        stats["buffer_sizes"] = buffer_sizes
        return stats

    def cleanup_old_data(self):
        """Delete record files whose day is older than ``retention_days`` before now.

        Only files named ``YYYY-MM-DD.jsonl`` or ``YYYY-MM-DD.<epoch>.jsonl``
        are considered. Any other ``*.jsonl`` file in the record directories
        is left alone.
        """
        cutoff_day = (datetime.now() - timedelta(days=self._retention_days)).date()

        for subdir in self._RECORD_SUBDIRS:
            for day, filepath in self._list_record_files(subdir):
                if day >= cutoff_day:
                    break  # files are sorted oldest-first; the rest are newer
                try:
                    filepath.unlink()
                    logger.info(f"Cleaned up old data: {filepath}")
                except OSError as e:
                    logger.error(f"Failed to delete {filepath}: {e}")

    @classmethod
    def _record_file_key(cls, name: str) -> Optional[tuple]:
        """Return a chronological sort key for a record file name, or None if it is not one.

        ``YYYY-MM-DD.<epoch>.jsonl`` (rotated chunk) -> ``(day, 0, epoch)``.
        ``YYYY-MM-DD.jsonl`` (live file) -> ``(day, 1, 0)``, which sorts after
        every rotated chunk of the same day.
        """
        parts = name.split(".")
        if len(parts) not in (2, 3) or parts[-1] != "jsonl":
            return None
        try:
            day = datetime.strptime(parts[0], cls._DATE_FORMAT).date()
            if len(parts) == 2:
                return (day, 1, 0)
            return (day, 0, int(parts[1]))
        except ValueError:
            return None

    def _list_record_files(self, subdir: str) -> list:
        """Return ``(day, path)`` for every recognised record file in *subdir*, oldest first."""
        dir_path = self._record_dir / subdir
        if not dir_path.is_dir():
            return []
        keyed = []
        for path in dir_path.glob("*.jsonl"):
            key = self._record_file_key(path.name)
            if key is not None:
                keyed.append((key, path))
        keyed.sort(key=lambda item: item[0])
        return [(key[0], path) for key, path in keyed]

    def _flush_loop(self, stop_event: Optional[threading.Event] = None):
        """Background loop: flush, then wait ``flush_interval`` or until *stop_event* is set."""
        if stop_event is None:
            stop_event = self._stop_event
        while not stop_event.is_set():
            try:
                self._flush_all()
            except Exception as e:
                logger.error(f"Flush error: {e}")
            stop_event.wait(self._flush_interval)

    def _flush_all(self):
        """Drain all buffers and append their contents to disk (write errors are logged)."""
        with self._flush_lock:
            with self._lock:
                # Swap in fresh lists so recording is blocked only for the swap.
                events, self._event_buffer = self._event_buffer, []
                mode_changes, self._mode_buffer = self._mode_buffer, []
                commands, self._command_buffer = self._command_buffer, []

            if events:
                self._write_events(events)
            if mode_changes:
                self._write_mode_changes(mode_changes)
            if commands:
                self._write_commands(commands)

            with self._lock:
                self._stats["last_flush_time"] = time.time()

    @classmethod
    def _day_str(cls, timestamp: float) -> str:
        """Return the local ``YYYY-MM-DD`` of *timestamp*, or today's date if it is unset/invalid."""
        if timestamp:
            try:
                return datetime.fromtimestamp(timestamp).strftime(cls._DATE_FORMAT)
            except (OverflowError, OSError, ValueError, TypeError):
                pass
        return datetime.now().strftime(cls._DATE_FORMAT)

    def _append_records(self, subdir: str, records: List[Any], label: str):
        """Append *records* as JSON lines to the day file matching each record's timestamp.

        Grouping by the record's own timestamp, rather than the flush time,
        keeps records flushed just after midnight, or carrying caller-supplied
        timestamps, in the file that the query methods look in. A write
        failure is logged and that day's batch is dropped (best effort). Each
        file written is checked for rotation afterwards.
        """
        by_day: Dict[str, List[Any]] = {}
        for record in records:
            by_day.setdefault(self._day_str(record.timestamp), []).append(record)

        for day_str, day_records in by_day.items():
            filepath = self._record_dir / subdir / f"{day_str}.jsonl"
            try:
                with open(filepath, "a", encoding="utf-8") as f:
                    f.writelines(
                        json.dumps(r.to_dict(), ensure_ascii=False, default=str) + "\n"
                        for r in day_records
                    )
            except Exception as e:
                logger.error(f"Failed to write {label}: {e}")
                continue
            self._rotate_file(filepath)

    def _write_events(self, events: List[SafetyEvent]):
        """Append safety events to ``events/``."""
        self._append_records("events", events, "events")

    def _write_mode_changes(self, changes: List[ModeChangeEvent]):
        """Append mode changes to ``mode_changes/``."""
        self._append_records("mode_changes", changes, "mode changes")

    def _write_commands(self, commands: List[ControlCommandEvent]):
        """Append control commands to ``commands/``."""
        self._append_records("commands", commands, "commands")

    def _write_summary(self):
        """Write ``summary/latest.json`` atomically (temp file + ``os.replace``)."""
        filepath = self._record_dir / "summary" / "latest.json"
        tmp_path = filepath.with_name(filepath.name + ".tmp")
        try:
            summary = {
                "timestamp": datetime.now().isoformat(),
                "stats": self.get_stats(),
                "recorder_version": "1.0",
            }
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(summary, f, ensure_ascii=False, indent=2, default=str)
            # Readers never observe a half-written summary.
            os.replace(tmp_path, filepath)
        except Exception as e:
            logger.error(f"Failed to write summary: {e}")

    def _rotate_file(self, filepath: Path):
        """Rename *filepath* to ``<day>.<epoch>.jsonl`` once it exceeds ``max_file_size_mb``."""
        max_bytes = self._max_file_size_mb * 1024 * 1024
        try:
            if filepath.stat().st_size <= max_bytes:
                return
            stamp = int(time.time())
            rotated = filepath.with_name(f"{filepath.stem}.{stamp}.jsonl")
            # Never overwrite a chunk already rotated within the same second.
            while rotated.exists():
                stamp += 1
                rotated = filepath.with_name(f"{filepath.stem}.{stamp}.jsonl")
            filepath.rename(rotated)
            logger.info(f"Rotated {filepath} -> {rotated}")
        except FileNotFoundError:
            return
        except OSError as e:
            logger.error(f"Failed to rotate {filepath}: {e}")

    def _rotate_if_needed(self):
        """Rotate any of today's day files that exceed ``max_file_size_mb``."""
        date_str = datetime.now().strftime(self._DATE_FORMAT)
        for subdir in self._RECORD_SUBDIRS:
            self._rotate_file(self._record_dir / subdir / f"{date_str}.jsonl")
|