You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

248 lines
7.4 KiB

from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime
_SAFETY_LEVEL_ORDER = {"OK": 0, "WARN": 1, "ERROR": 2, "FATAL": 3}
class SafetyLevel(Enum):
OK = "OK"
WARN = "WARN"
ERROR = "ERROR"
FATAL = "FATAL"
def __lt__(self, other):
if self.__class__ is other.__class__:
return _SAFETY_LEVEL_ORDER[self.value] < _SAFETY_LEVEL_ORDER[other.value]
return NotImplemented
def __gt__(self, other):
if self.__class__ is other.__class__:
return _SAFETY_LEVEL_ORDER[self.value] > _SAFETY_LEVEL_ORDER[other.value]
return NotImplemented
def __le__(self, other):
if self.__class__ is other.__class__:
return _SAFETY_LEVEL_ORDER[self.value] <= _SAFETY_LEVEL_ORDER[other.value]
return NotImplemented
def __ge__(self, other):
if self.__class__ is other.__class__:
return _SAFETY_LEVEL_ORDER[self.value] >= _SAFETY_LEVEL_ORDER[other.value]
return NotImplemented
@classmethod
def from_str(cls, level_str: str) -> "SafetyLevel":
return cls[level_str]
class DegradationMode(Enum):
    """Degradation stage of the driving system, from normal to emergency stop.

    ``auto()`` assigns consecutive integers in declaration order, so
    ``L0_NORMAL`` is 1 and ``L3_EMERGENCY_STOP`` is 4.
    """

    L0_NORMAL = auto()
    L1_LIMITED = auto()
    L2_SOFT_STOP = auto()
    L3_EMERGENCY_STOP = auto()

    @property
    def description(self) -> str:
        """Return the display label for this mode, or ``""`` if unmapped."""
        label_by_mode = {
            DegradationMode.L0_NORMAL: "正常自动驾驶",
            DegradationMode.L1_LIMITED: "限制模式:降速运行",
            DegradationMode.L2_SOFT_STOP: "缓刹模式:靠边停车",
            DegradationMode.L3_EMERGENCY_STOP: "急刹模式:紧急停车",
        }
        try:
            return label_by_mode[self]
        except KeyError:
            return ""

    @property
    def max_speed_kph(self) -> float:
        """Return the speed cap associated with this mode."""
        cap_by_mode = {
            DegradationMode.L0_NORMAL: 60.0,
            DegradationMode.L1_LIMITED: 30.0,
            DegradationMode.L2_SOFT_STOP: 5.0,
            DegradationMode.L3_EMERGENCY_STOP: 0.0,
        }
        # NOTE(review): a mode missing from the table falls back to the
        # highest cap (60.0). Every current member is listed, so this is
        # unreachable today, but confirm that a fail-open default is intended
        # before adding new modes.
        try:
            return cap_by_mode[self]
        except KeyError:
            return 60.0

    @property
    def allow_lane_change(self) -> bool:
        """Return True only for ``L0_NORMAL``."""
        return self is DegradationMode.L0_NORMAL
class DecisionStrategy(Enum):
    """Kind of decision strategy.

    Values come from ``auto()`` and are therefore 1, 2, 3 in declaration
    order.
    """

    SCORE_BASED = auto()
    VOTING = auto()
    PRIORITY_BASED = auto()
@dataclass
class SafetyStatus:
    """One named safety reading together with its severity level.

    ``timestamp`` defaults to the POSIX timestamp of ``datetime.now()`` taken
    when the instance is created. ``details`` and ``tags`` default to fresh
    empty containers per instance.
    """

    name: str
    source: str
    level: SafetyLevel
    message: str
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
    # Not included in ``to_dict()`` output.
    raw_value: Optional[Any] = None
    details: Dict[str, str] = field(default_factory=dict)
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return a plain-dict view with ``level`` reduced to its value string.

        ``tags`` and ``details`` are the instance's own containers, not
        copies.
        """
        return dict(
            name=self.name,
            source=self.source,
            level=self.level.value,
            message=self.message,
            timestamp=self.timestamp,
            tags=self.tags,
            details=self.details,
        )

    def is_critical(self) -> bool:
        """Return True when the level is ERROR or FATAL."""
        critical_levels = (SafetyLevel.ERROR, SafetyLevel.FATAL)
        return self.level in critical_levels

    def is_ok(self) -> bool:
        """Return True when the level is OK."""
        return self.level == SafetyLevel.OK
@dataclass
class AnalyzerDecision:
    """Severity verdict produced by one source, with a message.

    NOTE(review): the meaning of ``filter_info`` keys and what
    ``duration_ms`` measures are not visible in this file; confirm with the
    producer of these objects.
    """

    source: str
    level: SafetyLevel
    message: str
    filter_info: Dict[str, int] = field(default_factory=dict)
    duration_ms: int = 0

    def is_critical(self) -> bool:
        """Return True when the level is ERROR or FATAL."""
        critical_levels = (SafetyLevel.ERROR, SafetyLevel.FATAL)
        return self.level in critical_levels
@dataclass
class AggregatedStatus:
    """Decisions collected for one module plus a stored overall level.

    ``overall_level`` is whatever the caller supplied; this class does not
    reconcile it with the value computed by ``get_max_level()``.
    """

    module: str
    overall_level: SafetyLevel
    decisions: List[AnalyzerDecision] = field(default_factory=list)
    summary: str = ""

    def get_max_level(self) -> SafetyLevel:
        """Return the most severe level among ``decisions``.

        Falls back to ``SafetyLevel.OK`` when there are no decisions or none
        of them is WARN, ERROR or FATAL.
        """
        seen = [decision.level for decision in self.decisions]
        # Probe from most to least severe; the first hit wins.
        for candidate in (SafetyLevel.FATAL, SafetyLevel.ERROR, SafetyLevel.WARN):
            if candidate in seen:
                return candidate
        return SafetyLevel.OK

    def get_critical_count(self) -> int:
        """Return how many decisions report themselves as critical."""
        critical = [decision for decision in self.decisions if decision.is_critical()]
        return len(critical)

    def to_dict(self) -> dict:
        """Return a summary dict with counts instead of the decisions."""
        return dict(
            module=self.module,
            overall_level=self.overall_level.value,
            decision_count=len(self.decisions),
            critical_count=self.get_critical_count(),
            summary=self.summary,
        )
@dataclass
class SafetyCommand:
    """Command that pairs a safety level with a target degradation mode.

    ``timestamp`` defaults to the POSIX timestamp of ``datetime.now()`` taken
    when the instance is created; ``strategy`` defaults to
    ``DecisionStrategy.SCORE_BASED``.
    """

    command_id: str
    level: SafetyLevel
    target_mode: DegradationMode
    reasons: List[str] = field(default_factory=list)
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
    strategy: DecisionStrategy = DecisionStrategy.SCORE_BASED

    def to_log(self) -> str:
        """Return a one-line, human-readable rendering of the command.

        Format:
        ``[CMD#<id>] <LEVEL> -> <MODE> | Strategy: <NAME> | Reasons: a, b``
        """
        # The level and the target mode are separated by " -> "; without a
        # separator they ran together (e.g. "ERRORL2_SOFT_STOP").
        joined_reasons = ", ".join(self.reasons)
        return (
            f"[CMD#{self.command_id}] "
            f"{self.level.value} -> {self.target_mode.name} "
            f"| Strategy: {self.strategy.name} "
            f"| Reasons: {joined_reasons}"
        )

    def to_dict(self) -> dict:
        """Return a plain-dict view with enums reduced to value/name strings.

        ``reasons`` is the instance's own list, not a copy.
        """
        return {
            "command_id": self.command_id,
            "level": self.level.value,
            "target_mode": self.target_mode.name,
            "reasons": self.reasons,
            "strategy": self.strategy.name,
            "timestamp": self.timestamp,
        }
@dataclass
class ControlAction:
    """An action described by a type string and numeric parameters.

    ``parameters`` defaults to a fresh empty dict per instance.
    """

    action_type: str
    parameters: Dict[str, float] = field(default_factory=dict)
    request_takeover: bool = False
    log_message: str = ""

    def to_dict(self) -> dict:
        """Return the four fields as a dict, in declaration order.

        ``parameters`` is the instance's own dict, not a copy.
        """
        exported = ("action_type", "parameters", "request_takeover", "log_message")
        return {attr: getattr(self, attr) for attr in exported}
@dataclass
class ModeTransition:
    """Record of a system mode transition.

    ``timestamp`` defaults to the POSIX timestamp of ``datetime.now()`` taken
    when the record is created.
    """

    from_mode: DegradationMode
    to_mode: DegradationMode
    trigger_level: SafetyLevel
    trigger_source: str = ""
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
    # Not included in ``to_dict()`` output.
    command_id: str = ""

    def is_escalation(self) -> bool:
        """Return True when the target mode is more severe than the source.

        Severity is judged by the modes' enum values: a larger value counts
        as more severe.
        """
        return self.from_mode.value < self.to_mode.value

    def is_deescalation(self) -> bool:
        """Return True when the target mode is less severe (recovery)."""
        return self.from_mode.value > self.to_mode.value

    def to_dict(self) -> dict:
        """Return a plain-dict view with enums reduced to name/value strings."""
        return dict(
            from_mode=self.from_mode.name,
            to_mode=self.to_mode.name,
            trigger_level=self.trigger_level.value,
            trigger_source=self.trigger_source,
            is_escalation=self.is_escalation(),
            timestamp=self.timestamp,
        )
@dataclass
class SafetySummary:
    """Roll-up of the safety state: current mode, per-level counts, risks.

    ``last_update`` defaults to the POSIX timestamp of ``datetime.now()``
    taken when the instance is created.
    """

    current_mode: DegradationMode = DegradationMode.L0_NORMAL
    total_checkers: int = 0
    ok_count: int = 0
    warn_count: int = 0
    error_count: int = 0
    fatal_count: int = 0
    active_risks: List[str] = field(default_factory=list)
    last_update: float = field(default_factory=lambda: datetime.now().timestamp())

    @property
    def health_percent(self) -> float:
        """Return the share of OK checkers as a percentage.

        Reports 100.0 when ``total_checkers`` is zero, so no division is
        attempted.
        """
        checker_total = self.total_checkers
        if checker_total == 0:
            return 100.0
        ok_ratio = self.ok_count / checker_total
        return ok_ratio * 100

    def to_dict(self) -> dict:
        """Return a plain-dict view; ``health_percent`` is rounded to 1 place.

        ``active_risks`` is the instance's own list, not a copy.
        """
        return dict(
            current_mode=self.current_mode.name,
            total_checkers=self.total_checkers,
            ok_count=self.ok_count,
            warn_count=self.warn_count,
            error_count=self.error_count,
            fatal_count=self.fatal_count,
            health_percent=round(self.health_percent, 1),
            active_risks=self.active_risks,
            last_update=self.last_update,
        )