You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

140 lines
5.0 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import logging
import time
from checkers.base import BaseChecker
from exporter import SafetyExporter
from models import SafetyStatus, SafetyLevel
logger = logging.getLogger("Checker.Localization")
# localization status protobuf 映射
_STATUS_CANDIDATES = [
("modules.common_msgs.localization_msgs.localization_pb2", "LocalizationStatus"),
]
def _resolve_status_class():
for module, cls_name in _STATUS_CANDIDATES:
try:
import importlib
mod = importlib.import_module(module)
cls = getattr(mod, cls_name, None)
if cls:
return cls
except Exception:
continue
return None
class LocalizationChecker(BaseChecker):
"""检查定位状态是否存在异常。
读取 /apollo/localization/msf_status channel 的融合定位状态,
根据 MeasureState (OK=0, WARNNING=1, ERROR=2, CRITICAL_ERROR=3, FATAL_ERROR=4)
判断定位质量。
"""
# MeasureState 枚举值 → 安全等级
_MEASURE_STATE_MAP = {
0: SafetyLevel.OK,
1: SafetyLevel.WARN,
2: SafetyLevel.ERROR,
3: SafetyLevel.FATAL,
4: SafetyLevel.FATAL,
}
_MEASURE_STATE_NAMES = {
0: "OK", 1: "WARNING", 2: "ERROR",
3: "CRITICAL_ERROR", 4: "FATAL_ERROR",
}
def __init__(self, config: dict, exporter: SafetyExporter, cyber_node=None,
channel_freq_checker=None):
super().__init__("LocalizationChecker", config, exporter)
self.cyber_node = cyber_node
self._freq_checker = channel_freq_checker # 复用 ChannelFreqChecker 的数据
self._latest_raw = None
self._latest_time = 0
self._status_timeout = config.get("status_timeout", 2.0)
self._fatal_threshold = config.get("fatal_threshold", 3)
self._msg_class = _resolve_status_class()
def start_subscribing(self):
if self._freq_checker is not None:
logger.info("Using shared data from ChannelFreqChecker for msf_status")
return
if self.cyber_node is None:
logger.warning("No cyber node provided, localization monitoring disabled")
return
logger.warning("No freq_checker available, localization data may not be received")
def stop_subscribing(self):
pass
def _parse_status_value(self) -> int:
"""解析 LocalizationStatus返回 fusion_status 的枚举数值。"""
if self._latest_raw is None:
return -1
if self._msg_class is None:
return -1
try:
msg = self._msg_class()
msg.ParseFromString(self._latest_raw)
fs = getattr(msg, "fusion_status", None)
if fs is not None:
if hasattr(fs, "value"):
return fs.value
return int(fs)
except Exception as e:
logger.debug(f"Failed to parse LocalizationStatus: {e}")
return -1
def run_once(self) -> SafetyStatus:
# 复用 ChannelFreqChecker 已收到的 raw data
if self._freq_checker is not None:
self._latest_raw = self._freq_checker._latest_data.get(
"/apollo/localization/msf_status")
self._latest_time = self._freq_checker._latest_time.get(
"/apollo/localization/msf_status", 0)
if self._latest_raw is None:
return SafetyStatus(
name="LocalizationCheck", source=self.name, level=SafetyLevel.ERROR,
message="No localization status data received",
details={"status": "never_received"},
)
elapsed = time.time() - self._latest_time
if elapsed > self._status_timeout:
return SafetyStatus(
name="LocalizationCheck", source=self.name, level=SafetyLevel.ERROR,
message=f"Localization status timeout ({elapsed:.1f}s > {self._status_timeout}s)",
)
status_val = self._parse_status_value()
if status_val < 0:
return SafetyStatus(
name="LocalizationCheck", source=self.name, level=SafetyLevel.ERROR,
message="Failed to parse localization status",
)
state_name = self._MEASURE_STATE_NAMES.get(status_val, f"UNKNOWN({status_val})")
if status_val >= self._fatal_threshold:
level = SafetyLevel.FATAL
msg = f"Localization FATAL ({state_name})"
logger.warning(f"Localization FATAL: {state_name}")
elif status_val >= 1:
level = self._MEASURE_STATE_MAP.get(status_val, SafetyLevel.ERROR)
msg = f"Localization {state_name}"
logger.warning(f"Localization error: {state_name}")
else:
level = SafetyLevel.OK
msg = "Localization OK"
return SafetyStatus(
name="LocalizationCheck", source=self.name, level=level,
message=msg,
details={"status_value": status_val, "state": state_name,
"threshold": self._fatal_threshold},
tags=["localization"],
)