You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
248 lines
7.4 KiB
248 lines
7.4 KiB
from dataclasses import dataclass, field
|
|
from enum import Enum, auto
|
|
from typing import Dict, List, Optional, Any, Tuple
|
|
from datetime import datetime
|
|
|
|
|
|
_SAFETY_LEVEL_ORDER = {"OK": 0, "WARN": 1, "ERROR": 2, "FATAL": 3}
|
|
|
|
|
|
class SafetyLevel(Enum):
|
|
OK = "OK"
|
|
WARN = "WARN"
|
|
ERROR = "ERROR"
|
|
FATAL = "FATAL"
|
|
|
|
def __lt__(self, other):
|
|
if self.__class__ is other.__class__:
|
|
return _SAFETY_LEVEL_ORDER[self.value] < _SAFETY_LEVEL_ORDER[other.value]
|
|
return NotImplemented
|
|
|
|
def __gt__(self, other):
|
|
if self.__class__ is other.__class__:
|
|
return _SAFETY_LEVEL_ORDER[self.value] > _SAFETY_LEVEL_ORDER[other.value]
|
|
return NotImplemented
|
|
|
|
def __le__(self, other):
|
|
if self.__class__ is other.__class__:
|
|
return _SAFETY_LEVEL_ORDER[self.value] <= _SAFETY_LEVEL_ORDER[other.value]
|
|
return NotImplemented
|
|
|
|
def __ge__(self, other):
|
|
if self.__class__ is other.__class__:
|
|
return _SAFETY_LEVEL_ORDER[self.value] >= _SAFETY_LEVEL_ORDER[other.value]
|
|
return NotImplemented
|
|
|
|
@classmethod
|
|
def from_str(cls, level_str: str) -> "SafetyLevel":
|
|
return cls[level_str]
|
|
|
|
|
|
class DegradationMode(Enum):
|
|
L0_NORMAL = auto()
|
|
L1_LIMITED = auto()
|
|
L2_SOFT_STOP = auto()
|
|
L3_EMERGENCY_STOP = auto()
|
|
|
|
@property
|
|
def description(self) -> str:
|
|
descriptions = {
|
|
DegradationMode.L0_NORMAL: "正常自动驾驶",
|
|
DegradationMode.L1_LIMITED: "限制模式:降速运行",
|
|
DegradationMode.L2_SOFT_STOP: "缓刹模式:靠边停车",
|
|
DegradationMode.L3_EMERGENCY_STOP: "急刹模式:紧急停车",
|
|
}
|
|
return descriptions.get(self, "")
|
|
|
|
@property
|
|
def max_speed_kph(self) -> float:
|
|
speeds = {
|
|
DegradationMode.L0_NORMAL: 60.0,
|
|
DegradationMode.L1_LIMITED: 30.0,
|
|
DegradationMode.L2_SOFT_STOP: 5.0,
|
|
DegradationMode.L3_EMERGENCY_STOP: 0.0,
|
|
}
|
|
return speeds.get(self, 60.0)
|
|
|
|
@property
|
|
def allow_lane_change(self) -> bool:
|
|
return self in (DegradationMode.L0_NORMAL,)
|
|
|
|
|
|
class DecisionStrategy(Enum):
|
|
"""决策策略类型。"""
|
|
SCORE_BASED = auto()
|
|
VOTING = auto()
|
|
PRIORITY_BASED = auto()
|
|
|
|
|
|
@dataclass
|
|
class SafetyStatus:
|
|
name: str
|
|
source: str
|
|
level: SafetyLevel
|
|
message: str
|
|
timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
|
|
raw_value: Optional[Any] = None
|
|
details: Dict[str, str] = field(default_factory=dict)
|
|
tags: List[str] = field(default_factory=list)
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"name": self.name, "source": self.source,
|
|
"level": self.level.value, "message": self.message,
|
|
"timestamp": self.timestamp, "tags": self.tags,
|
|
"details": self.details,
|
|
}
|
|
|
|
def is_critical(self) -> bool:
|
|
return self.level in (SafetyLevel.ERROR, SafetyLevel.FATAL)
|
|
|
|
def is_ok(self) -> bool:
|
|
return self.level == SafetyLevel.OK
|
|
|
|
|
|
@dataclass
|
|
class AnalyzerDecision:
|
|
source: str
|
|
level: SafetyLevel
|
|
message: str
|
|
filter_info: Dict[str, int] = field(default_factory=dict)
|
|
duration_ms: int = 0
|
|
|
|
def is_critical(self) -> bool:
|
|
return self.level in (SafetyLevel.ERROR, SafetyLevel.FATAL)
|
|
|
|
|
|
@dataclass
|
|
class AggregatedStatus:
|
|
module: str
|
|
overall_level: SafetyLevel
|
|
decisions: List[AnalyzerDecision] = field(default_factory=list)
|
|
summary: str = ""
|
|
|
|
def get_max_level(self) -> SafetyLevel:
|
|
levels = [d.level for d in self.decisions]
|
|
if SafetyLevel.FATAL in levels:
|
|
return SafetyLevel.FATAL
|
|
if SafetyLevel.ERROR in levels:
|
|
return SafetyLevel.ERROR
|
|
if SafetyLevel.WARN in levels:
|
|
return SafetyLevel.WARN
|
|
return SafetyLevel.OK
|
|
|
|
def get_critical_count(self) -> int:
|
|
return sum(1 for d in self.decisions if d.is_critical())
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"module": self.module,
|
|
"overall_level": self.overall_level.value,
|
|
"decision_count": len(self.decisions),
|
|
"critical_count": self.get_critical_count(),
|
|
"summary": self.summary,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class SafetyCommand:
|
|
command_id: str
|
|
level: SafetyLevel
|
|
target_mode: DegradationMode
|
|
reasons: List[str] = field(default_factory=list)
|
|
timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
|
|
strategy: DecisionStrategy = DecisionStrategy.SCORE_BASED
|
|
|
|
def to_log(self) -> str:
|
|
return (f"[CMD#{self.command_id}] {self.level.value}→{self.target_mode.name} "
|
|
f"| Strategy: {self.strategy.name} "
|
|
f"| Reasons: {', '.join(self.reasons)}")
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"command_id": self.command_id,
|
|
"level": self.level.value,
|
|
"target_mode": self.target_mode.name,
|
|
"reasons": self.reasons,
|
|
"strategy": self.strategy.name,
|
|
"timestamp": self.timestamp,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class ControlAction:
|
|
action_type: str
|
|
parameters: Dict[str, float] = field(default_factory=dict)
|
|
request_takeover: bool = False
|
|
log_message: str = ""
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"action_type": self.action_type,
|
|
"parameters": self.parameters,
|
|
"request_takeover": self.request_takeover,
|
|
"log_message": self.log_message,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class ModeTransition:
|
|
"""系统模式转换记录。"""
|
|
from_mode: DegradationMode
|
|
to_mode: DegradationMode
|
|
trigger_level: SafetyLevel
|
|
trigger_source: str = ""
|
|
timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
|
|
command_id: str = ""
|
|
|
|
def is_escalation(self) -> bool:
|
|
"""判断是否为模式升级(更严重)。"""
|
|
return self.to_mode.value > self.from_mode.value
|
|
|
|
def is_deescalation(self) -> bool:
|
|
"""判断是否为模式降级(恢复正常)。"""
|
|
return self.to_mode.value < self.from_mode.value
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"from_mode": self.from_mode.name,
|
|
"to_mode": self.to_mode.name,
|
|
"trigger_level": self.trigger_level.value,
|
|
"trigger_source": self.trigger_source,
|
|
"is_escalation": self.is_escalation(),
|
|
"timestamp": self.timestamp,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class SafetySummary:
|
|
"""安全状态汇总。"""
|
|
current_mode: DegradationMode = DegradationMode.L0_NORMAL
|
|
total_checkers: int = 0
|
|
ok_count: int = 0
|
|
warn_count: int = 0
|
|
error_count: int = 0
|
|
fatal_count: int = 0
|
|
active_risks: List[str] = field(default_factory=list)
|
|
last_update: float = field(default_factory=lambda: datetime.now().timestamp())
|
|
|
|
@property
|
|
def health_percent(self) -> float:
|
|
"""健康度百分比。"""
|
|
if self.total_checkers == 0:
|
|
return 100.0
|
|
return (self.ok_count / self.total_checkers) * 100
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"current_mode": self.current_mode.name,
|
|
"total_checkers": self.total_checkers,
|
|
"ok_count": self.ok_count,
|
|
"warn_count": self.warn_count,
|
|
"error_count": self.error_count,
|
|
"fatal_count": self.fatal_count,
|
|
"health_percent": round(self.health_percent, 1),
|
|
"active_risks": self.active_risks,
|
|
"last_update": self.last_update,
|
|
}
|