|
|
#!/usr/bin/env python3
|
|
|
"""Apollo Safety Manager — monitors cyber channels, nodes, and system health."""
|
|
|
import os
|
|
|
import sys
|
|
|
import signal
|
|
|
import time
|
|
|
import threading
|
|
|
import logging
|
|
|
|
|
|
# ── Bootstrap: locate and configure cyber module ──────────────────────────
|
|
|
|
|
|
def _find_cyber():
    """Make ``cyber.python.cyber_py3`` importable, bootstrapping paths if needed.

    If the import already works, nothing is changed. Otherwise the function:

    1. calls ``_unshadow_cyber()`` to move a shadowing site-packages
       ``cyber/__init__.py`` out of the way;
    2. searches ``$APOLLO_HOME`` (when set, tried first) and the built-in
       install locations for a directory containing ``python/cyber``;
    3. exports ``APOLLO_DISTRIBUTION_HOME``, prepends the
       ``3rd-fastdds-wrap`` (or, failing that, ``3rd-fastrtps``) library
       directory to ``LD_LIBRARY_PATH`` when one exists, and puts the
       distribution's Python directories at the front of ``sys.path``;
    4. drops every ``cyber`` module cached by the failed import so that the
       next import is resolved against the updated ``sys.path``.

    When no candidate contains ``python/cyber`` a warning is printed to
    stderr and the environment is left untouched.
    """
    import importlib

    try:
        from cyber.python.cyber_py3 import cyber  # noqa: F401
    except ImportError:
        # ModuleNotFoundError is a subclass of ImportError, so one clause
        # covers both.
        pass
    else:
        return

    _unshadow_cyber()

    candidates = [
        "/opt/apollo/neo",
        "/home/apollo/application-core/.aem/envroot/opt/apollo/neo",
    ]
    apollo_home = os.environ.get("APOLLO_HOME", "")
    if apollo_home:
        candidates.insert(0, apollo_home)

    dist_home = None
    fastrtps_lib = None
    for root in candidates:
        if not os.path.isdir(os.path.join(root, "python", "cyber")):
            continue
        dist_home = root
        # Prefer the fastdds wrapper directory, fall back to fastrtps.
        for package in ("3rd-fastdds-wrap", "3rd-fastrtps"):
            lib_dir = os.path.join(root, "packages", package, "lib")
            if os.path.isdir(lib_dir):
                fastrtps_lib = lib_dir
                break
        break

    if dist_home is None:
        print("[WARN] Could not locate cyber module. Ensure you are running "
              "inside an Apollo Docker container.", file=sys.stderr)
        return

    os.environ["APOLLO_DISTRIBUTION_HOME"] = dist_home

    if fastrtps_lib:
        # NOTE(review): the dynamic loader normally reads LD_LIBRARY_PATH at
        # process start, so this presumably only helps child processes —
        # confirm whether the current process relies on it.
        existing = os.environ.get("LD_LIBRARY_PATH", "")
        os.environ["LD_LIBRARY_PATH"] = fastrtps_lib + (":" + existing if existing else "")

    parent = os.path.join(dist_home, "python")
    if parent not in sys.path:
        sys.path.insert(0, parent)

    internal = os.path.join(dist_home, "lib", "cyber", "python", "internal")
    if os.path.isdir(internal) and internal not in sys.path:
        sys.path.insert(0, internal)

    # The failed import above may have left a partially imported ``cyber``
    # package in sys.modules; while it is cached, later imports would keep
    # resolving submodules through its old __path__ and ignore sys.path.
    for mod_name in list(sys.modules):
        if mod_name == "cyber" or mod_name.startswith("cyber."):
            del sys.modules[mod_name]
    importlib.invalidate_caches()
|
|
|
|
|
|
|
|
|
def _unshadow_cyber():
    """Rename a site-packages ``cyber/__init__.py`` so it stops shadowing Apollo's.

    Walks a snapshot of ``sys.path``. For the first entry whose path contains
    ``site-packages`` and that holds ``cyber/__init__.py``, the file is
    renamed to ``__init__.py.cyber_bak`` (so the directory becomes a
    namespace package) and the search stops.

    The function never raises: a failed rename is reported on stderr and
    otherwise ignored, because the bootstrap is best-effort.
    """
    for entry in list(sys.path):
        if "site-packages" not in entry:
            continue
        init_py = os.path.join(entry, "cyber", "__init__.py")
        if not os.path.isfile(init_py):
            continue
        try:
            os.rename(init_py, init_py + ".cyber_bak")
        except OSError as exc:
            # Typically a read-only environment; surface it so a later
            # import failure can be traced back to the shadowing package.
            print(f"[WARN] Could not rename {init_py}: {exc}", file=sys.stderr)
        # Only the first matching site-packages entry is handled.
        break
|
|
|
|
|
|
|
|
|
# Run the bootstrap at import time: the `cyber` import that follows depends
# on the sys.path / environment changes made here.
_find_cyber()
|
|
|
|
|
|
# ── Imports ──────────────────────────────────────────────────────────────
|
|
|
|
|
|
from cyber.python.cyber_py3 import cyber
|
|
|
from config_manager import ConfigManager
|
|
|
from exporter import SafetyExporter
|
|
|
from checkers.channel_freq import ChannelFreqChecker
|
|
|
from checkers.resource import ResourceChecker
|
|
|
from checkers.process import ProcessChecker
|
|
|
from checkers.chassis import ChassisChecker
|
|
|
from checkers.system_health import SystemHealthChecker
|
|
|
from checkers.file_exist import FileExistChecker
|
|
|
from checkers.localization import LocalizationChecker
|
|
|
from checkers.latency import LatencyChecker
|
|
|
from checkers.control import ControlDeviationChecker
|
|
|
from checkers.collision import CollisionChecker
|
|
|
from checkers.gnss import GnssSignalChecker
|
|
|
from checkers.traffic_light import TrafficLightChecker
|
|
|
from analyzer import Analyzer
|
|
|
from aggregator import ModuleAggregator
|
|
|
from decider import SafetyDecider
|
|
|
from guardian import SafetyGuardian
|
|
|
from reporter import ReportGenerator
|
|
|
from recorder import DataRecorder, SafetyEvent
|
|
|
from safety_status_wrapper import SafetyStatusWrapper
|
|
|
from statistics.resource_statistic import ResourceStatistic
|
|
|
from statistics.process_statistic import ProcessStatistic
|
|
|
from statistics.latency_statistic import LatencyStatistic
|
|
|
from models import SafetyLevel, SafetyStatus, DegradationMode, SafetyCommand
|
|
|
|
|
|
# Module-wide logger, registered under the name "SafetyManager".
logger = logging.getLogger("SafetyManager")
|
|
|
|
|
|
|
|
|
def _force_exit_after_timeout(timeout: int = 10):
    """Forced-exit timer: terminate the process once ``timeout`` seconds pass.

    Blocks the calling thread for ``timeout`` seconds; if the process is
    still running afterwards, logs a warning and ends it with exit code 1.

    Args:
        timeout: Grace period in seconds before the forced exit.
    """
    time.sleep(timeout)
    message = f"Process did not exit within {timeout}s, forcing exit."
    logger.warning(message)
    # os._exit ends the process immediately, without normal interpreter
    # cleanup, so a hung shutdown cannot block it.
    os._exit(1)
|
|
|
|
|
|
|
|
|
def _init_statistics(config: ConfigManager) -> dict:
    """Create and start the statistics collectors.

    Each collector is built from its own config section (an empty dict when
    the section is missing or falsy) and started right after construction,
    in the order resource, process, latency.

    Args:
        config: Configuration source queried via ``config.get(section, None)``.

    Returns:
        Dict mapping ``"resource"``, ``"process"`` and ``"latency"`` to the
        corresponding started collector.
    """
    plan = (
        ("resource", ResourceStatistic, "resource_statistics",
         "Resource statistics collector started"),
        ("process", ProcessStatistic, "process_statistics",
         "Process statistics collector started"),
        ("latency", LatencyStatistic, "latency_statistics",
         "Latency statistics collector started"),
    )

    stats = {}
    for key, collector_cls, section, started_msg in plan:
        collector = collector_cls(config.get(section, None) or {})
        collector.start()
        stats[key] = collector
        logger.info(started_msg)
    return stats
|
|
|
|
|
|
|
|
|
def _init_checkers(config: ConfigManager, exporter: SafetyExporter,
                   cyber_node) -> list:
    """Build every checker and start the subscriptions of those that subscribe.

    The channel-frequency checker is created first because several other
    checkers receive it as ``channel_freq_checker``.

    Args:
        config: Configuration source; each checker gets ``config.get(<key>)``.
        exporter: Exporter handed to every checker.
        cyber_node: Cyber node handed to the checkers that take one.

    Returns:
        List of all twelve checker instances.
    """
    freq = ChannelFreqChecker(
        config=config.get("channel_freq"),
        exporter=exporter,
        cyber_node=cyber_node,
    )

    # Keyword bundles shared by groups of constructors.
    plain = {"exporter": exporter}
    with_freq = {"exporter": exporter, "channel_freq_checker": freq}
    with_node = {
        "exporter": exporter,
        "cyber_node": cyber_node,
        "channel_freq_checker": freq,
    }

    resource = ResourceChecker(config=config.get("resource"), **plain)
    process = ProcessChecker(config=config.get("process"), **plain)
    chassis = ChassisChecker(config=config.get("chassis"), **with_freq)
    system_health = SystemHealthChecker(
        config=config.get("system_health"), **plain)
    file_exist = FileExistChecker(config=config.get("file_exist"), **plain)
    localization = LocalizationChecker(
        config=config.get("localization"), **with_node)
    latency = LatencyChecker(config=config.get("latency"), **with_node)
    control = ControlDeviationChecker(
        config=config.get("control"), **with_freq)
    collision = CollisionChecker(config=config.get("collision"), **with_node)
    gnss = GnssSignalChecker(config=config.get("gnss"), **with_node)
    traffic_light = TrafficLightChecker(
        config=config.get("traffic_light"), **with_node)

    checkers = [
        freq, resource, process,
        chassis, system_health, file_exist,
        localization, latency, control,
        collision, gnss, traffic_light,
    ]

    # Start subscribing (only these checkers are subscribed, in this order).
    subscribers = (
        freq, localization, latency, control,
        collision, chassis, gnss, traffic_light,
    )
    for subscriber in subscribers:
        subscriber.start_subscribing()

    logger.info(f"All checkers initialized: {len(checkers)} checkers")
    return checkers
|
|
|
|
|
|
|
|
|
def _stop_checkers(checkers: list):
    """Stop the subscriptions of every checker without raising.

    Checkers that have no ``stop_subscribing`` method are skipped. An
    exception raised by one checker is logged at debug level and does not
    prevent the remaining checkers from being stopped.

    Args:
        checkers: Checker instances to stop.
    """
    for checker in checkers:
        try:
            if hasattr(checker, "stop_subscribing"):
                checker.stop_subscribing()
        except Exception as e:
            # Fall back to the class name: reading a missing ``name`` here
            # would raise inside the handler and abort the whole loop.
            label = getattr(checker, "name", type(checker).__name__)
            logger.debug(f"Error stopping {label}: {e}")
    logger.info("All checkers stopped")
|
|
|
|
|
|
|
|
|
def _run_checkers_once(checkers, exporter, reporter):
    """Run each checker once and forward its status to exporter and report."""
    for checker in checkers:
        try:
            status = checker.run_once()
            if status:
                exporter.report(status)
                reporter.update(status.name, {
                    "level": status.level.value,
                    "message": status.message,
                    "timestamp": status.timestamp,
                    "details": status.details,
                    "raw_value": status.raw_value,
                    "tags": status.tags,
                })
        except Exception as e:
            # A failing checker is itself reported as an ERROR status so
            # the failure is visible downstream, not only in the log.
            error_status = SafetyStatus(
                name=f"{checker.name}_INTERNAL_ERROR",
                source=checker.name,
                level=SafetyLevel.ERROR,
                message=f"Checker exception: {str(e)}",
            )
            exporter.report(error_status, bypass_filter=True)
            logger.error(f"Checker {checker.name} error: {e}")


def _run_decision_cycle(aggregator, decider, guardian, recorder):
    """Decide on the aggregated status, execute the command and record it.

    Returns the guardian action, or ``None`` when nothing was aggregated or
    an error occurred before an action was produced. Errors are logged and
    never propagated.
    """
    action = None
    try:
        aggregated = aggregator.get_aggregated_status()
        if aggregated:
            command = decider.decide(aggregated)
            action = guardian.execute(command)
            status_str = ", ".join(
                f"{s.module}:{s.overall_level.value}" for s in aggregated
            )
            logger.info(f"Status: [{status_str}] | Action: {action.action_type}")
            aggregator.clear()

            # Record the command event.
            recorder.record_event(SafetyEvent(
                event_type="safety_command",
                level=command.level.value,
                source="Decider",
                message=command.to_log(),
                details={"command_id": command.command_id,
                         "target_mode": command.target_mode.name,
                         "reasons": command.reasons},
            ))
    except Exception as e:
        logger.error(f"Decision loop error: {e}", exc_info=True)
    return action


def _publish_cycle_report(action, guardian, reporter, recorder):
    """Push the cycle's action and guardian state into the report and write it."""
    if action:
        mode_name = guardian.current_mode.name
        reporter.update_global("current_mode", mode_name)
        reporter.update_global("last_action", {
            "action_type": action.action_type,
            "parameters": action.parameters,
            "request_takeover": action.request_takeover,
        })

        # Record takeover requests as events.
        if action.request_takeover:
            recorder.record_event(SafetyEvent(
                event_type="takeover_request",
                level="WARN",
                source="Guardian",
                message=f"Takeover requested: {action.log_message}",
            ))

    # Export extended guardian state.
    reporter.update_global("guardian_stats", guardian.get_stats())
    transitions = guardian.get_transitions(5)
    if transitions:
        reporter.update_global("mode_transitions",
                               [t.to_dict() for t in transitions])

    reporter.write()


def _collect_statistics(collectors, reporter):
    """Gather the latest snapshot of every statistics collector into the report."""
    stats_data = {}
    for name in ("resource", "process", "latency"):
        try:
            stat = collectors[name]
            if hasattr(stat, 'collect_once'):
                stat.collect_once()
            latest = stat.get_latest()
            if latest is not None and hasattr(latest, 'to_dict'):
                stats_data[name] = latest.to_dict()
        except Exception as e:
            logger.debug(f"{name} stat collect error: {e}")

    if stats_data:
        reporter.update_global("statistics", stats_data)


def _shutdown(checkers, collectors, recorder, reporter, guardian):
    """Run every shutdown step, isolating failures so later steps still run."""

    def run_step(label, step):
        # One failing component must not prevent the remaining cleanup,
        # in particular the final cyber.shutdown().
        try:
            step()
        except Exception as e:
            logger.error(f"Shutdown step '{label}' failed: {e}", exc_info=True)

    logger.info("Shutting down...")
    run_step("stop checkers", lambda: _stop_checkers(checkers))
    for name in ("resource", "process", "latency"):
        run_step(f"stop {name} statistics",
                 lambda name=name: collectors[name].stop())
    run_step("stop recorder", lambda: recorder.stop())
    run_step("stop reporter", lambda: reporter.stop())
    run_step("guardian shutdown command", lambda: guardian.execute(SafetyCommand(
        command_id="shutdown", level=SafetyLevel.OK,
        target_mode=DegradationMode.L0_NORMAL,
        reasons=["System shutdown"],
    )))
    logger.info("Safety Manager STOPPED")
    run_step("cyber shutdown", lambda: cyber.shutdown())


def main():
    """Run the Safety Manager until cyber shuts down.

    Initializes cyber, the statistics collectors, the core safety modules
    (exporter, analyzer, aggregator, decider, guardian), the data recorder
    and all checkers, then loops every 0.5 s while ``cyber.ok()``:

    * run every checker and report its status;
    * decide on the aggregated status and execute the resulting command;
    * write the report;
    * every 100 cycles clean up old recorder data, every 2 cycles collect
      statistics snapshots.

    SIGINT/SIGTERM trigger ``cyber.shutdown()`` and arm a 10 s forced-exit
    timer. On exit every component is stopped; a failure in one shutdown
    step is logged and does not skip the remaining steps.
    """
    cyber.init("safety_manager")

    config = ConfigManager("configs")

    # -- Statistics modules --
    statistics = _init_statistics(config)

    # -- Core safety modules --
    exporter = SafetyExporter("safety_manager_node")
    analyzer = Analyzer(config)
    aggregator = ModuleAggregator()
    decider = SafetyDecider(config)
    guardian = SafetyGuardian()

    # -- Data recorder --
    recorder = DataRecorder(record_dir="records", config={
        "max_file_size_mb": 50,
        "retention_days": 7,
        "flush_interval": 5.0,
    })
    recorder.start()

    def on_status(status):
        decision = analyzer.on_status(status)
        if decision:
            aggregator.add_decision(decision)

    exporter.subscribe(on_status)

    # -- Checkers --
    node_name = f"safety_manager_node_{int(time.time())}"
    cyber_node = cyber.Node(node_name)
    logger.info(f"Cyber node created: {node_name}")
    checkers = _init_checkers(config, exporter, cyber_node)

    # Hand the cyber node to the latency statistics module, which was
    # created before the node existed.
    latency_stat = statistics.get("latency")
    if latency_stat and hasattr(latency_stat, "_cyber_node"):
        latency_stat._cyber_node = cyber_node

    spin_thread = threading.Thread(target=cyber_node.spin, daemon=True)
    spin_thread.start()

    logger.info("Safety Manager STARTED (cyber mode)")

    def shutdown_handler(signum, frame):
        logger.info("Received signal, shutting down...")
        # Arm the forced-exit timer: terminate if not exited within 10 s.
        threading.Thread(target=_force_exit_after_timeout,
                         args=(10,), daemon=True).start()
        cyber.shutdown()

    signal.signal(signal.SIGINT, shutdown_handler)
    signal.signal(signal.SIGTERM, shutdown_handler)

    exporter.start()

    reporter = ReportGenerator(report_dir="reports")
    logger.info(f"Report directory: {reporter.report_dir}")
    reporter.write()

    # -- Main loop --
    cycle_count = 0
    try:
        while cyber.ok():
            cycle_count += 1

            _run_checkers_once(checkers, exporter, reporter)
            action = _run_decision_cycle(aggregator, decider, guardian, recorder)
            _publish_cycle_report(action, guardian, reporter, recorder)

            # Periodically clean up old recorded data.
            if cycle_count % 100 == 0:
                recorder.cleanup_old_data()

            # Collect statistics snapshots; they are written out by the
            # next cycle's report write.
            if cycle_count % 2 == 0:
                _collect_statistics(statistics, reporter)

            time.sleep(0.5)
    except KeyboardInterrupt:
        logger.info("Received KeyboardInterrupt")
    finally:
        _shutdown(checkers, statistics, recorder, reporter, guardian)
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Configure root logging once for the whole process, then hand over to main().
    log_format = "%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s"
    logging.basicConfig(format=log_format, level=logging.INFO)
    main()
|