|
|
import os
|
|
|
import time
|
|
|
import logging
|
|
|
from checkers.base import BaseChecker
|
|
|
from exporter import SafetyExporter
|
|
|
from models import SafetyStatus, SafetyLevel
|
|
|
|
|
|
from cyber.python.cyber_py3 import cyber
|
|
|
|
|
|
logger = logging.getLogger("Checker.Process")
|
|
|
|
|
|
# 模块名 → 输出 channel(如果有 writer 说明模块在运行)
|
|
|
_MODULE_CHANNEL_MAP = {
|
|
|
"planning": "/apollo/planning",
|
|
|
"control": "/apollo/control",
|
|
|
"perception": "/apollo/perception/obstacles",
|
|
|
}
|
|
|
|
|
|
# 模块名 → 可能的 cyber node 名列表
|
|
|
_MODULE_NODE_MAP = {
|
|
|
"perception": [
|
|
|
"MsgAdapterComponent",
|
|
|
"LidarDetection", "LidarTracking", "LidarDetectionFilter",
|
|
|
"PointCloudPreprocess", "PointCloudMapROI", "PointCloudGroundDetect",
|
|
|
"MultiSensorFusion",
|
|
|
"CameraDetectionSingleStageComponent",
|
|
|
"CameraDetectionMultiStageComponent",
|
|
|
"CameraDetectionBevComponent",
|
|
|
"CameraLocationEstimationComponent",
|
|
|
"CameraLocationRefinementComponent",
|
|
|
"CameraTrackingComponent",
|
|
|
"TrafficLightDetectionComponent",
|
|
|
"TrafficLightTrackingComponent",
|
|
|
"TrafficLightRecognitionComponent",
|
|
|
"TrafficLightRegionProposalComponent",
|
|
|
"RadarDetectionComponent",
|
|
|
"LaneDetectionComponent",
|
|
|
],
|
|
|
}
|
|
|
|
|
|
|
|
|
class ProcessChecker(BaseChecker):
    """Report configured modules that do not appear to be running.

    A module counts as active as soon as one of these signals is found, in
    order: its output channel is in the cached cyber channel list, its own
    name is in the cached cyber node list, one of its alternative node names
    is in that list, or its name occurs in the command line of some process
    listed under ``/proc``.
    """

    # Level name used when a process entry has no (or an unusable) "level".
    _DEFAULT_LEVEL_NAME = "ERROR"

    def __init__(self, config: dict, exporter: SafetyExporter):
        """Set up the checker with an empty cyber topology cache.

        Args:
            config: Checker configuration. This class reads
                ``refresh_interval`` (seconds between topology refreshes,
                default 10.0) and ``processes`` (a list of dicts with a
                ``name`` and an optional ``level``).
            exporter: Forwarded unchanged to ``BaseChecker.__init__``.
        """
        super().__init__("ProcessChecker", config, exporter)
        # 0 doubles as the "never refreshed successfully" marker.
        self._last_cyber_refresh = 0
        self._cached_nodes = set()
        self._cached_channels = set()
        self._refresh_interval = config.get("refresh_interval", 10.0)

    def _refresh_cyber_state(self):
        """Refresh the cached cyber node/channel sets when they are stale.

        The cache is reused while it is younger than ``refresh_interval`` and
        at least one node is known, so the main loop is not blocked by a
        discovery call on every cycle. On failure the previous cache is kept
        untouched and the error is logged at debug level.
        """
        now = time.time()
        is_fresh = now - self._last_cyber_refresh < self._refresh_interval
        if is_fresh and self._cached_nodes:
            return

        # Until the first successful refresh, ask cyber to wait 2 s so DDS
        # topology discovery has time to complete.
        # NOTE(review): a refresh that keeps failing repeats this wait on
        # every cycle, because the timestamp is only set on success.
        sleep_s = 2 if self._last_cyber_refresh == 0 else 0

        try:
            nodes = set(cyber.NodeUtils.get_nodes(sleep_s))
            channels = set(cyber.ChannelUtils.get_channels(sleep_s))
        except Exception as e:  # best effort: the cyber bindings may raise anything
            logger.debug("Failed to refresh cyber state: %s", e)
            return

        # Publish both sets together so a failure in the second query can
        # never leave fresh nodes paired with stale channels.
        self._cached_nodes = nodes
        self._cached_channels = channels
        self._last_cyber_refresh = now
        logger.debug("Discovered %d nodes, %d channels",
                     len(nodes), len(channels))

    def _is_active_by_cmdline(self, proc_name: str) -> bool:
        """Return True if some process's ``/proc/<pid>/cmdline`` contains ``proc_name``.

        This is a plain substring match over the raw command line, so any
        process that merely mentions the name also matches. An empty name
        never matches: without this guard ``"" in text`` would be true for
        every process and a nameless config entry would always look alive.
        """
        if not proc_name:
            return False

        try:
            for pid_dir in os.listdir("/proc"):
                if not pid_dir.isdigit():
                    continue
                try:
                    with open(f"/proc/{pid_dir}/cmdline", "rb") as f:
                        raw = f.read()
                except OSError:
                    # The process may have exited or be unreadable; skip it.
                    continue
                if not raw:
                    continue
                if proc_name in raw.decode("utf-8", errors="ignore"):
                    return True
        except Exception as e:  # best effort: never let the scan break the check
            logger.debug("Failed to scan /proc: %s", e)
        return False

    def _is_module_active(self, proc_name: str) -> bool:
        """Combine all available signals to decide whether a module is active.

        Uses the cached topology, so ``_refresh_cyber_state`` should have been
        called beforehand.
        """
        # 1) The module's output channel is listed -> taken as proof that it runs.
        channel = _MODULE_CHANNEL_MAP.get(proc_name)
        if channel and channel in self._cached_channels:
            return True

        # 2) A cyber node carries exactly the module name.
        if proc_name in self._cached_nodes:
            return True

        # 3) One of the module's known alternative node names is present.
        alt_names = _MODULE_NODE_MAP.get(proc_name, [])
        if any(alt in self._cached_nodes for alt in alt_names):
            return True

        # 4) Fall back to scanning process command lines under /proc.
        return self._is_active_by_cmdline(proc_name)

    def _resolve_level(self, proc_cfg: dict) -> SafetyLevel:
        """Return the ``SafetyLevel`` configured for one process entry.

        A missing level uses the default. An unknown or unusable level name is
        logged and replaced by the default, instead of aborting the whole
        check with ``KeyError``/``TypeError``.
        """
        level_str = proc_cfg.get("level", self._DEFAULT_LEVEL_NAME)
        try:
            return SafetyLevel[level_str]
        except (KeyError, TypeError):
            logger.warning(
                "Process %r: unknown level %r, falling back to %s",
                proc_cfg.get("name", ""), level_str, self._DEFAULT_LEVEL_NAME,
            )
            return SafetyLevel[self._DEFAULT_LEVEL_NAME]

    def run_once(self) -> SafetyStatus:
        """Check every configured process once and summarise the result.

        Returns:
            An OK status when no processes are configured or all are active.
            Otherwise a status whose level is the highest level among the
            missing processes, whose message lists them, and whose
            ``details["missing"]`` holds their names. An entry without a
            ``name`` can never be confirmed as running and is reported as
            missing.
        """
        # NOTE(review): self.config and self.name are presumably set by
        # BaseChecker.__init__ - confirm against the base class.
        processes = self.config.get("processes", [])
        if not processes:
            return SafetyStatus("ProcessCheck", self.name, SafetyLevel.OK,
                                "No processes configured")

        self._refresh_cyber_state()

        missing = []
        for proc_cfg in processes:
            proc_name = proc_cfg.get("name", "")
            level = self._resolve_level(proc_cfg)
            if not self._is_module_active(proc_name):
                missing.append((level, proc_name))

        if missing:
            # Relies on SafetyLevel members being orderable, with a larger
            # value meaning more severe - TODO confirm in models.
            worst_level = max(level for level, _ in missing)
            missing_names = [name for _, name in missing]
            message = "; ".join(f"{name} not running" for name in missing_names)
            logger.warning("Process: %s", message)
            return SafetyStatus(
                name="ProcessCheck", source=self.name, level=worst_level,
                message=message,
                details={"missing": missing_names},
            )

        logger.info("ProcessCheck OK: all %d processes running", len(processes))
        return SafetyStatus(
            name="ProcessCheck", source=self.name, level=SafetyLevel.OK,
            message=f"All {len(processes)} processes running",
        )
|