|
|
import logging
|
|
|
import time
|
|
|
from checkers.base import BaseChecker
|
|
|
from exporter import SafetyExporter
|
|
|
from models import SafetyStatus, SafetyLevel
|
|
|
|
|
|
logger = logging.getLogger("Checker.Localization")
|
|
|
|
|
|
# localization status protobuf 映射
|
|
|
_STATUS_CANDIDATES = [
|
|
|
("modules.common_msgs.localization_msgs.localization_pb2", "LocalizationStatus"),
|
|
|
]
|
|
|
|
|
|
|
|
|
def _resolve_status_class():
|
|
|
for module, cls_name in _STATUS_CANDIDATES:
|
|
|
try:
|
|
|
import importlib
|
|
|
mod = importlib.import_module(module)
|
|
|
cls = getattr(mod, cls_name, None)
|
|
|
if cls:
|
|
|
return cls
|
|
|
except Exception:
|
|
|
continue
|
|
|
return None
|
|
|
|
|
|
|
|
|
class LocalizationChecker(BaseChecker):
    """Check whether the localization status reports a problem.

    Reads the fused localization status of the
    ``/apollo/localization/msf_status`` channel and judges localization
    quality from its MeasureState value
    (OK=0, WARNNING=1, ERROR=2, CRITICAL_ERROR=3, FATAL_ERROR=4).

    This class does not create a channel reader itself: ``run_once`` copies
    the latest raw payload and its receive time from the
    ``channel_freq_checker`` object handed to the constructor.
    """

    # Channel whose raw payload is parsed as a LocalizationStatus message.
    _CHANNEL = "/apollo/localization/msf_status"

    # MeasureState enum value -> safety level
    _MEASURE_STATE_MAP = {
        0: SafetyLevel.OK,
        1: SafetyLevel.WARN,
        2: SafetyLevel.ERROR,
        3: SafetyLevel.FATAL,
        4: SafetyLevel.FATAL,
    }

    # MeasureState enum value -> name used in messages and details
    _MEASURE_STATE_NAMES = {
        0: "OK",
        1: "WARNING",
        2: "ERROR",
        3: "CRITICAL_ERROR",
        4: "FATAL_ERROR",
    }

    def __init__(self, config: dict, exporter: SafetyExporter, cyber_node=None,
                 channel_freq_checker=None):
        """Set up the checker.

        Args:
            config: Checker configuration. ``status_timeout`` (default 2.0)
                is the maximum age in seconds of the latest status;
                ``fatal_threshold`` (default 3) is the lowest status value
                that is reported as FATAL.
            exporter: Exporter forwarded to ``BaseChecker``.
            cyber_node: Optional node object; only checked for presence in
                ``start_subscribing``.
            channel_freq_checker: Optional object whose ``_latest_data`` and
                ``_latest_time`` mappings are read in ``run_once``.
        """
        super().__init__("LocalizationChecker", config, exporter)
        self.cyber_node = cyber_node
        # Reuse the data already received by ChannelFreqChecker.
        self._freq_checker = channel_freq_checker
        self._latest_raw = None
        self._latest_time = 0
        self._status_timeout = config.get("status_timeout", 2.0)
        self._fatal_threshold = config.get("fatal_threshold", 3)
        self._msg_class = _resolve_status_class()
        if self._msg_class is None:
            # Without the message class every payload is unparseable; say so
            # once up front instead of failing silently on each cycle.
            logger.warning(
                "LocalizationStatus protobuf class unavailable, "
                "localization status cannot be parsed"
            )

    def start_subscribing(self):
        """Log which data source is in use; no reader is created here."""
        if self._freq_checker is not None:
            logger.info("Using shared data from ChannelFreqChecker for msf_status")
            return
        if self.cyber_node is None:
            logger.warning("No cyber node provided, localization monitoring disabled")
            return
        logger.warning("No freq_checker available, localization data may not be received")

    def stop_subscribing(self):
        """Nothing to release: this checker owns no subscription."""
        pass

    @staticmethod
    def _has_fusion_status(msg) -> bool:
        """Return False only when ``msg`` positively reports the field unset.

        If the message has no ``HasField`` method, or ``HasField`` raises
        ``ValueError`` (field presence not tracked for this field), the
        answer is True so the caller falls back to reading the value.
        """
        has_field = getattr(msg, "HasField", None)
        if has_field is None:
            return True
        try:
            return bool(has_field("fusion_status"))
        except ValueError:
            return True

    def _parse_status_value(self) -> int:
        """Parse LocalizationStatus and return the fusion_status enum value.

        Returns:
            The numeric ``fusion_status`` value, or -1 when there is no
            payload, no message class, the payload cannot be parsed, or the
            parsed message does not carry ``fusion_status``.
        """
        if self._latest_raw is None or self._msg_class is None:
            return -1
        try:
            msg = self._msg_class()
            msg.ParseFromString(self._latest_raw)
            # An unset field reads back as the default 0 (OK). Treat that as
            # a failure so an empty or foreign payload is not reported as a
            # healthy localization.
            if not self._has_fusion_status(msg):
                logger.debug("LocalizationStatus carries no fusion_status")
                return -1
            fs = getattr(msg, "fusion_status", None)
            if fs is not None:
                if hasattr(fs, "value"):
                    return fs.value
                return int(fs)
        except Exception as e:
            # Best effort: a bad payload must not crash the checker loop.
            logger.debug("Failed to parse LocalizationStatus: %s", e)
        return -1

    def run_once(self) -> SafetyStatus:
        """Evaluate the latest localization status once.

        Returns:
            A ``SafetyStatus`` named ``LocalizationCheck``. Its level is
            ERROR when no data was received, the data is older than
            ``status_timeout``, or it cannot be parsed; FATAL when the status
            value is at or above ``fatal_threshold``; the mapped level for
            other values of 1 or more; OK otherwise.
        """
        # Reuse the raw data already received by ChannelFreqChecker.
        if self._freq_checker is not None:
            self._latest_raw = self._freq_checker._latest_data.get(self._CHANNEL)
            self._latest_time = self._freq_checker._latest_time.get(self._CHANNEL, 0)

        if self._latest_raw is None:
            return SafetyStatus(
                name="LocalizationCheck", source=self.name, level=SafetyLevel.ERROR,
                message="No localization status data received",
                details={"status": "never_received"},
            )

        elapsed = time.time() - self._latest_time
        if elapsed > self._status_timeout:
            return SafetyStatus(
                name="LocalizationCheck", source=self.name, level=SafetyLevel.ERROR,
                message=f"Localization status timeout ({elapsed:.1f}s > {self._status_timeout}s)",
                details={"status": "timeout"},
            )

        status_val = self._parse_status_value()
        if status_val < 0:
            # Tell a missing decoder apart from an unparseable payload.
            reason = "decoder_unavailable" if self._msg_class is None else "parse_failed"
            return SafetyStatus(
                name="LocalizationCheck", source=self.name, level=SafetyLevel.ERROR,
                message="Failed to parse localization status",
                details={"status": reason},
            )

        state_name = self._MEASURE_STATE_NAMES.get(status_val, f"UNKNOWN({status_val})")
        if status_val >= self._fatal_threshold:
            # The configured threshold takes precedence over the level map.
            level = SafetyLevel.FATAL
            msg = f"Localization FATAL ({state_name})"
            logger.warning("Localization FATAL: %s", state_name)
        elif status_val >= 1:
            level = self._MEASURE_STATE_MAP.get(status_val, SafetyLevel.ERROR)
            msg = f"Localization {state_name}"
            logger.warning("Localization error: %s", state_name)
        else:
            level = SafetyLevel.OK
            msg = "Localization OK"

        return SafetyStatus(
            name="LocalizationCheck", source=self.name, level=level,
            message=msg,
            details={"status_value": status_val, "state": state_name,
                     "threshold": self._fatal_threshold},
            tags=["localization"],
        )
|