You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

141 lines
5.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import os
import time
import logging
from checkers.base import BaseChecker
from exporter import SafetyExporter
from models import SafetyStatus, SafetyLevel
from cyber.python.cyber_py3 import cyber
logger = logging.getLogger("Checker.Process")

# Module name -> that module's output channel. If the channel appears in the
# Cyber topology, the module is treated as running.
# NOTE(review): a channel may be listed because of readers (e.g. downstream
# modules) and not only because of the module's own writer — confirm against
# cyber ChannelUtils semantics.
_MODULE_CHANNEL_MAP = dict(
    planning="/apollo/planning",
    control="/apollo/control",
    perception="/apollo/perception/obstacles",
)
# Module name -> Cyber node names that belong to that module. The presence
# of any one of these nodes is enough to treat the module as running.
_MODULE_NODE_MAP = dict(
    perception=[
        # message adapter
        "MsgAdapterComponent",
        # lidar / point-cloud nodes
        "LidarDetection",
        "LidarTracking",
        "LidarDetectionFilter",
        "PointCloudPreprocess",
        "PointCloudMapROI",
        "PointCloudGroundDetect",
        # fusion
        "MultiSensorFusion",
        # camera nodes
        "CameraDetectionSingleStageComponent",
        "CameraDetectionMultiStageComponent",
        "CameraDetectionBevComponent",
        "CameraLocationEstimationComponent",
        "CameraLocationRefinementComponent",
        "CameraTrackingComponent",
        # traffic-light nodes
        "TrafficLightDetectionComponent",
        "TrafficLightTrackingComponent",
        "TrafficLightRecognitionComponent",
        "TrafficLightRegionProposalComponent",
        # radar / lane nodes
        "RadarDetectionComponent",
        "LaneDetectionComponent",
    ],
)
class ProcessChecker(BaseChecker):
    """Report which configured Apollo modules are not running.

    A module counts as running when any of these signals is found, checked
    in order and stopping at the first hit:

    1. its output channel (``_MODULE_CHANNEL_MAP``) is in the cached Cyber
       channel list;
    2. a Cyber node named exactly like the module exists;
    3. one of the module's component node names (``_MODULE_NODE_MAP``)
       exists;
    4. another process's ``/proc/<pid>/cmdline`` contains the module name.

    Config keys read by this checker:
        processes: list of dicts with ``name`` and an optional ``level``
            (a ``SafetyLevel`` member name, default ``"ERROR"``).
        refresh_interval: minimum seconds between Cyber topology queries
            (default 10.0).
    """

    def __init__(self, config: dict, exporter: SafetyExporter):
        super().__init__("ProcessChecker", config, exporter)
        # time.monotonic() value of the last successful topology refresh.
        self._last_cyber_refresh = 0.0
        # Set after the first successful query; only that first query has
        # to wait for DDS topology discovery.
        self._topology_discovered = False
        self._cached_nodes = set()
        self._cached_channels = set()
        self._refresh_interval = config.get("refresh_interval", 10.0)

    def _refresh_cyber_state(self):
        """Refresh the cached Cyber node and channel names.

        Cyber is re-queried only when the node cache is empty or older than
        ``refresh_interval`` seconds. This avoids blocking the main loop on
        every run. On failure both caches are cleared, so stale topology
        cannot keep a dead module looking alive; the ``/proc`` fallback in
        ``_is_module_active`` still applies.
        """
        # Monotonic clock: a wall-clock step (e.g. NTP) must neither stall
        # nor force refreshes.
        now = time.monotonic()
        if (self._cached_nodes
                and now - self._last_cyber_refresh < self._refresh_interval):
            return
        # The first discovery sleeps so DDS topology discovery can complete.
        sleep_s = 0 if self._topology_discovered else 2
        try:
            nodes = set(cyber.NodeUtils.get_nodes(sleep_s))
            channels = set(cyber.ChannelUtils.get_channels(sleep_s))
        except Exception as e:
            logger.warning("Failed to refresh cyber state: %s", e)
            self._cached_nodes = set()
            self._cached_channels = set()
            return
        # Update both caches together so they describe the same snapshot.
        self._cached_nodes = nodes
        self._cached_channels = channels
        self._last_cyber_refresh = now
        self._topology_discovered = True
        logger.debug("Discovered %d nodes, %d channels",
                     len(nodes), len(channels))

    def _is_active_by_cmdline(self, proc_name: str) -> bool:
        """Return True if another process's cmdline contains *proc_name*.

        Scans ``/proc/<pid>/cmdline`` for every numeric ``/proc`` entry.
        Entries with an empty cmdline are skipped. This process is skipped
        too, since its own arguments (e.g. a config path) could mention a
        module name. An empty *proc_name* never matches, because the empty
        string is a substring of everything. If ``/proc`` cannot be listed,
        the result is False.
        """
        if not proc_name:
            return False
        own_pid = str(os.getpid())
        try:
            pid_dirs = os.listdir("/proc")
        except OSError as e:
            logger.debug("Failed to scan /proc: %s", e)
            return False
        for pid_dir in pid_dirs:
            if not pid_dir.isdigit() or pid_dir == own_pid:
                continue
            try:
                with open(f"/proc/{pid_dir}/cmdline", "rb") as f:
                    raw = f.read()
            except OSError:
                # The process may have exited after listdir(), or access
                # was denied; either way it cannot be inspected.
                continue
            if not raw:
                continue
            # NOTE(review): plain substring match — e.g. "control" would also
            # match any unrelated cmdline containing that word.
            if proc_name in raw.decode("utf-8", errors="ignore"):
                return True
        return False

    def _is_module_active(self, proc_name: str) -> bool:
        """Return True if any liveness signal is found for *proc_name*."""
        # 1) Output channel present in the topology.
        # NOTE(review): a channel may be listed because it has readers, not
        # only because this module writes it — confirm cyber semantics.
        channel = _MODULE_CHANNEL_MAP.get(proc_name)
        if channel and channel in self._cached_channels:
            return True
        # 2) A Cyber node with exactly the module's name.
        if proc_name in self._cached_nodes:
            return True
        # 3) Any of the module's known component node names.
        alt_names = _MODULE_NODE_MAP.get(proc_name, ())
        if any(alt in self._cached_nodes for alt in alt_names):
            return True
        # 4) Fall back to scanning process command lines.
        return self._is_active_by_cmdline(proc_name)

    @staticmethod
    def _parse_level(level_str, proc_name: str):
        """Map a configured level name to a ``SafetyLevel`` member.

        Tries the name as given, then upper-cased. An unknown name is logged
        and mapped to ``SafetyLevel.ERROR``, so one bad config entry does not
        abort the whole check with ``KeyError``.
        """
        name = str(level_str)
        for candidate in (name, name.upper()):
            try:
                return SafetyLevel[candidate]
            except KeyError:
                pass
        logger.warning("Unknown level %r for process %s; using ERROR",
                       level_str, proc_name)
        return SafetyLevel.ERROR

    def run_once(self) -> SafetyStatus:
        """Check all configured processes and return an aggregated status.

        Returns OK when no processes are configured or every named process
        is active. Otherwise the status level is the worst (max) configured
        level among the missing processes. The message lists those
        processes, and ``details["missing"]`` holds their names.
        """
        processes = self.config.get("processes", [])
        if not processes:
            return SafetyStatus("ProcessCheck", self.name, SafetyLevel.OK,
                                "No processes configured")

        # Re-queries Cyber only when the cache is empty or expired.
        self._refresh_cyber_state()

        checked = 0
        missing = []  # (SafetyLevel, module name) pairs
        for proc_cfg in processes:
            proc_name = proc_cfg.get("name", "")
            if not proc_name:
                # An empty name would substring-match every cmdline and so
                # always look "running"; flag the entry instead.
                logger.warning("Skipping process entry without a name: %r",
                               proc_cfg)
                continue
            checked += 1
            level = self._parse_level(proc_cfg.get("level", "ERROR"),
                                      proc_name)
            if not self._is_module_active(proc_name):
                missing.append((level, proc_name))

        if missing:
            worst_level = max(lvl for lvl, _ in missing)
            msgs = [f"{name} not running" for _, name in missing]
            logger.warning("Process: %s", "; ".join(msgs))
            return SafetyStatus(
                name="ProcessCheck", source=self.name, level=worst_level,
                message="; ".join(msgs),
                details={"missing": [name for _, name in missing]},
            )

        logger.info("ProcessCheck OK: all %d processes running", checked)
        return SafetyStatus(
            name="ProcessCheck", source=self.name, level=SafetyLevel.OK,
            message=f"All {checked} processes running",
        )