"""Data models for a safety-monitoring pipeline.

Defines the severity levels, degradation modes and decision strategies used
by the pipeline, plus the dataclass records exchanged between its stages
(individual statuses, analyzer decisions, aggregated status, commands,
control actions, mode transitions and an overall summary).
"""

from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime

# Severity rank of each SafetyLevel value; a larger number is more severe.
_SAFETY_LEVEL_ORDER = {"OK": 0, "WARN": 1, "ERROR": 2, "FATAL": 3}


def _now_timestamp() -> float:
    """Return the current time as a POSIX timestamp (seconds, float)."""
    return datetime.now().timestamp()


class SafetyLevel(Enum):
    """Severity of a safety finding, ordered OK < WARN < ERROR < FATAL.

    The rich comparison operators compare members by their rank in
    ``_SAFETY_LEVEL_ORDER``; comparing against any other type yields
    ``NotImplemented`` (and therefore a ``TypeError`` from Python).
    """

    OK = "OK"
    WARN = "WARN"
    ERROR = "ERROR"
    FATAL = "FATAL"

    def __lt__(self, other):
        if self.__class__ is other.__class__:
            return _SAFETY_LEVEL_ORDER[self.value] < _SAFETY_LEVEL_ORDER[other.value]
        return NotImplemented

    def __gt__(self, other):
        if self.__class__ is other.__class__:
            return _SAFETY_LEVEL_ORDER[self.value] > _SAFETY_LEVEL_ORDER[other.value]
        return NotImplemented

    def __le__(self, other):
        if self.__class__ is other.__class__:
            return _SAFETY_LEVEL_ORDER[self.value] <= _SAFETY_LEVEL_ORDER[other.value]
        return NotImplemented

    def __ge__(self, other):
        if self.__class__ is other.__class__:
            return _SAFETY_LEVEL_ORDER[self.value] >= _SAFETY_LEVEL_ORDER[other.value]
        return NotImplemented

    @classmethod
    def from_str(cls, level_str: str) -> "SafetyLevel":
        """Return the member whose *name* is ``level_str``.

        The lookup is exact (case-sensitive).

        Raises:
            KeyError: if ``level_str`` is not a member name.
        """
        return cls[level_str]


class DegradationMode(Enum):
    """Degradation modes, declared from least to most restrictive.

    Members use ``auto()``, so their integer values increase in declaration
    order (L0_NORMAL is the smallest, L3_EMERGENCY_STOP the largest).
    """

    L0_NORMAL = auto()
    L1_LIMITED = auto()
    L2_SOFT_STOP = auto()
    L3_EMERGENCY_STOP = auto()

    @property
    def description(self) -> str:
        """Human-readable (Chinese) description of this mode."""
        descriptions = {
            # "Normal autonomous driving"
            DegradationMode.L0_NORMAL: "正常自动驾驶",
            # "Limited mode: run at reduced speed"
            DegradationMode.L1_LIMITED: "限制模式:降速运行",
            # "Soft-brake mode: pull over and stop"
            DegradationMode.L2_SOFT_STOP: "缓刹模式:靠边停车",
            # "Hard-brake mode: emergency stop"
            DegradationMode.L3_EMERGENCY_STOP: "急刹模式:紧急停车",
        }
        return descriptions.get(self, "")

    @property
    def max_speed_kph(self) -> float:
        """Speed cap associated with this mode, in km/h."""
        speeds = {
            DegradationMode.L0_NORMAL: 60.0,
            DegradationMode.L1_LIMITED: 30.0,
            DegradationMode.L2_SOFT_STOP: 5.0,
            DegradationMode.L3_EMERGENCY_STOP: 0.0,
        }
        return speeds.get(self, 60.0)

    @property
    def allow_lane_change(self) -> bool:
        """Whether lane changes are permitted; true only for L0_NORMAL."""
        return self is DegradationMode.L0_NORMAL


class DecisionStrategy(Enum):
    """Decision strategy types."""

    SCORE_BASED = auto()
    VOTING = auto()
    PRIORITY_BASED = auto()


@dataclass
class SafetyStatus:
    """A single safety status report with its severity level and metadata."""

    name: str
    source: str
    level: SafetyLevel
    message: str
    # Defaults to the creation time as a POSIX timestamp.
    timestamp: float = field(default_factory=_now_timestamp)
    raw_value: Optional[Any] = None
    details: Dict[str, str] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a plain-dict snapshot of this status.

        ``raw_value`` is not included. ``tags`` and ``details`` are shallow
        copies so that mutating the snapshot cannot alter this record.
        """
        return {
            "name": self.name,
            "source": self.source,
            "level": self.level.value,
            "message": self.message,
            "timestamp": self.timestamp,
            "tags": list(self.tags),
            "details": dict(self.details),
        }

    def is_critical(self) -> bool:
        """Return True when the level is ERROR or FATAL."""
        return self.level in (SafetyLevel.ERROR, SafetyLevel.FATAL)

    def is_ok(self) -> bool:
        """Return True when the level is OK."""
        return self.level == SafetyLevel.OK


@dataclass
class AnalyzerDecision:
    """The verdict produced by one analyzer."""

    source: str
    level: SafetyLevel
    message: str
    filter_info: Dict[str, int] = field(default_factory=dict)
    duration_ms: int = 0

    def is_critical(self) -> bool:
        """Return True when the level is ERROR or FATAL."""
        return self.level in (SafetyLevel.ERROR, SafetyLevel.FATAL)


@dataclass
class AggregatedStatus:
    """A module's analyzer decisions together with an overall level."""

    module: str
    overall_level: SafetyLevel
    decisions: List[AnalyzerDecision] = field(default_factory=list)
    summary: str = ""

    def get_max_level(self) -> SafetyLevel:
        """Return the most severe level among ``decisions``.

        Returns ``SafetyLevel.OK`` when there are no decisions. This is
        computed from ``decisions`` and is independent of ``overall_level``.
        """
        # SafetyLevel defines its own ordering, so max() picks the most
        # severe level directly.
        return max((d.level for d in self.decisions), default=SafetyLevel.OK)

    def get_critical_count(self) -> int:
        """Return how many decisions are critical (ERROR or FATAL)."""
        return sum(1 for d in self.decisions if d.is_critical())

    def to_dict(self) -> dict:
        """Return a plain-dict summary (counts only, not the decisions)."""
        return {
            "module": self.module,
            "overall_level": self.overall_level.value,
            "decision_count": len(self.decisions),
            "critical_count": self.get_critical_count(),
            "summary": self.summary,
        }


@dataclass
class SafetyCommand:
    """A command requesting a target degradation mode, with its reasons."""

    command_id: str
    level: SafetyLevel
    target_mode: DegradationMode
    reasons: List[str] = field(default_factory=list)
    # Defaults to the creation time as a POSIX timestamp.
    timestamp: float = field(default_factory=_now_timestamp)
    strategy: DecisionStrategy = DecisionStrategy.SCORE_BASED

    def to_log(self) -> str:
        """Return a one-line log representation of this command."""
        return (
            f"[CMD#{self.command_id}] {self.level.value}→{self.target_mode.name} "
            f"| Strategy: {self.strategy.name} "
            f"| Reasons: {', '.join(self.reasons)}"
        )

    def to_dict(self) -> dict:
        """Return a plain-dict snapshot of this command.

        ``reasons`` is a copy so that mutating the snapshot cannot alter
        this record.
        """
        return {
            "command_id": self.command_id,
            "level": self.level.value,
            "target_mode": self.target_mode.name,
            "reasons": list(self.reasons),
            "strategy": self.strategy.name,
            "timestamp": self.timestamp,
        }


@dataclass
class ControlAction:
    """A control action with its parameters and optional takeover request."""

    action_type: str
    parameters: Dict[str, float] = field(default_factory=dict)
    request_takeover: bool = False
    log_message: str = ""

    def to_dict(self) -> dict:
        """Return a plain-dict snapshot of this action.

        ``parameters`` is a copy so that mutating the snapshot cannot alter
        this record.
        """
        return {
            "action_type": self.action_type,
            "parameters": dict(self.parameters),
            "request_takeover": self.request_takeover,
            "log_message": self.log_message,
        }


@dataclass
class ModeTransition:
    """Record of a system mode transition."""

    from_mode: DegradationMode
    to_mode: DegradationMode
    trigger_level: SafetyLevel
    trigger_source: str = ""
    # Defaults to the creation time as a POSIX timestamp.
    timestamp: float = field(default_factory=_now_timestamp)
    command_id: str = ""

    def is_escalation(self) -> bool:
        """Return True when the target mode is more severe than the source."""
        return self.to_mode.value > self.from_mode.value

    def is_deescalation(self) -> bool:
        """Return True when the target mode is less severe (recovering)."""
        return self.to_mode.value < self.from_mode.value

    def to_dict(self) -> dict:
        """Return a plain-dict snapshot of this transition."""
        # NOTE(review): command_id is not serialised here; confirm whether
        # that omission is intentional before adding it.
        return {
            "from_mode": self.from_mode.name,
            "to_mode": self.to_mode.name,
            "trigger_level": self.trigger_level.value,
            "trigger_source": self.trigger_source,
            "is_escalation": self.is_escalation(),
            "timestamp": self.timestamp,
        }


@dataclass
class SafetySummary:
    """Safety status summary."""

    current_mode: DegradationMode = DegradationMode.L0_NORMAL
    total_checkers: int = 0
    ok_count: int = 0
    warn_count: int = 0
    error_count: int = 0
    fatal_count: int = 0
    active_risks: List[str] = field(default_factory=list)
    # Defaults to the creation time as a POSIX timestamp.
    last_update: float = field(default_factory=_now_timestamp)

    @property
    def health_percent(self) -> float:
        """Health percentage: share of checkers reporting OK.

        Returns 100.0 when ``total_checkers`` is zero.
        """
        if self.total_checkers == 0:
            return 100.0
        return (self.ok_count / self.total_checkers) * 100

    def to_dict(self) -> dict:
        """Return a plain-dict snapshot of this summary.

        ``health_percent`` is rounded to one decimal place. ``active_risks``
        is a copy so that mutating the snapshot cannot alter this record.
        """
        return {
            "current_mode": self.current_mode.name,
            "total_checkers": self.total_checkers,
            "ok_count": self.ok_count,
            "warn_count": self.warn_count,
            "error_count": self.error_count,
            "fatal_count": self.fatal_count,
            "health_percent": round(self.health_percent, 1),
            "active_risks": list(self.active_risks),
            "last_update": self.last_update,
        }