You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

433 lines
16 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""Apollo Safety Manager — monitors cyber channels, nodes, and system health."""
import os
import sys
import signal
import time
import threading
import logging
# ── Bootstrap: locate and configure cyber module ──────────────────────────
def _find_cyber():
    """Make ``cyber.python.cyber_py3`` importable, adjusting paths if needed.

    If the module already imports, nothing is changed. Otherwise:

    1. ``_unshadow_cyber()`` is called to neutralise a site-packages ``cyber``
       package that may shadow Apollo's.
    2. An Apollo distribution root is searched for, in this order:
       ``$APOLLO_HOME`` (if set), ``/opt/apollo/neo``, then the
       application-core AEM envroot. The first candidate that contains a
       ``python/cyber`` directory is used.
    3. For that root:

       * ``APOLLO_DISTRIBUTION_HOME`` is set to it.
       * Its Fast-DDS (``3rd-fastdds-wrap``) or, failing that, Fast-RTPS
         ``lib`` directory, if present, is prepended to ``LD_LIBRARY_PATH``
         (only when not already listed).
       * ``<root>/python`` and ``<root>/lib/cyber/python/internal`` (if they
         exist) are inserted at the front of ``sys.path``.

    If no root is found, a warning is printed to stderr and nothing else is
    changed.
    """
    try:
        from cyber.python.cyber_py3 import cyber  # noqa: F401
        return
    except (ImportError, ModuleNotFoundError):
        pass

    # The import failed: first remove a possibly shadowing site-packages
    # `cyber` package, then look for an Apollo distribution.
    _unshadow_cyber()

    candidates = [
        "/opt/apollo/neo",
        "/home/apollo/application-core/.aem/envroot/opt/apollo/neo",
    ]
    apollo_home = os.environ.get("APOLLO_HOME", "")
    if apollo_home:
        # An explicit APOLLO_HOME takes precedence over the default locations.
        candidates.insert(0, apollo_home)

    dist_home = None
    fastrtps_lib = None
    for root in candidates:
        if not os.path.isdir(os.path.join(root, "python", "cyber")):
            continue
        # Prefer the Fast-DDS wrapper libraries; fall back to legacy Fast-RTPS.
        lib_dir = os.path.join(root, "packages", "3rd-fastdds-wrap", "lib")
        if not os.path.isdir(lib_dir):
            lib_dir = os.path.join(root, "packages", "3rd-fastrtps", "lib")
        dist_home = root
        fastrtps_lib = lib_dir if os.path.isdir(lib_dir) else None
        break

    if dist_home is None:
        print("[WARN] Could not locate cyber module. Ensure you are running "
              "inside an Apollo Docker container.", file=sys.stderr)
        return

    os.environ["APOLLO_DISTRIBUTION_HOME"] = dist_home

    if fastrtps_lib:
        # NOTE(review): the glibc dynamic loader reads LD_LIBRARY_PATH once at
        # process start, so this mainly affects child processes. Confirm the
        # cyber extension can find the Fast-DDS libraries in *this* process
        # (e.g. via RPATH) -- TODO verify.
        existing = os.environ.get("LD_LIBRARY_PATH", "")
        # Prepend only when not already listed, so an environment that
        # already points there does not accumulate duplicate entries.
        if fastrtps_lib not in existing.split(":"):
            os.environ["LD_LIBRARY_PATH"] = (
                fastrtps_lib + (":" + existing if existing else "")
            )

    parent = os.path.join(dist_home, "python")
    if parent not in sys.path:
        sys.path.insert(0, parent)

    internal = os.path.join(dist_home, "lib", "cyber", "python", "internal")
    if os.path.isdir(internal) and internal not in sys.path:
        sys.path.insert(0, internal)
def _unshadow_cyber():
    """Neutralise a site-packages ``cyber`` package that shadows Apollo's.

    Scans ``sys.path`` for the first entry containing ``site-packages`` that
    has a ``cyber/__init__.py``. That file is renamed to
    ``__init__.py.cyber_bak`` so the directory becomes a namespace-package
    portion instead of a regular package. Only the first match is handled.

    WARNING: this renames a file inside the interpreter's site-packages. The
    change persists after this process exits.

    After a successful rename, any ``cyber`` / ``cyber.*`` modules already
    cached in ``sys.modules`` are dropped and the import system's finder
    caches are invalidated. Otherwise a stale regular package (e.g. one left
    behind by a failed ``import cyber.python...`` attempt) would keep shadowing
    Apollo's module for the rest of this process.

    A failed rename is reported on stderr rather than ignored silently.
    """
    import importlib

    for entry in list(sys.path):
        if "site-packages" not in entry:
            continue
        init_py = os.path.join(entry, "cyber", "__init__.py")
        if not os.path.isfile(init_py):
            continue
        try:
            os.rename(init_py, init_py + ".cyber_bak")
        except OSError as err:
            # Best effort: warn and carry on; the cyber import that follows
            # will surface the real failure if the shadowing persists.
            print(f"[WARN] Could not rename {init_py}: {err}", file=sys.stderr)
        else:
            stale = [name for name in sys.modules
                     if name == "cyber" or name.startswith("cyber.")]
            for name in stale:
                del sys.modules[name]
            # Finders cache directory listings; make them see the rename.
            importlib.invalidate_caches()
        break
# Must run before the `cyber` import below: it may rename a shadowing
# site-packages package and update sys.path / environment variables so that
# `cyber.python.cyber_py3` can be found.
_find_cyber()
# ── Imports ──────────────────────────────────────────────────────────────
from cyber.python.cyber_py3 import cyber
from config_manager import ConfigManager
from exporter import SafetyExporter
from checkers.channel_freq import ChannelFreqChecker
from checkers.resource import ResourceChecker
from checkers.process import ProcessChecker
from checkers.chassis import ChassisChecker
from checkers.system_health import SystemHealthChecker
from checkers.file_exist import FileExistChecker
from checkers.localization import LocalizationChecker
from checkers.latency import LatencyChecker
from checkers.control import ControlDeviationChecker
from checkers.collision import CollisionChecker
from checkers.gnss import GnssSignalChecker
from checkers.traffic_light import TrafficLightChecker
from analyzer import Analyzer
from aggregator import ModuleAggregator
from decider import SafetyDecider
from guardian import SafetyGuardian
from reporter import ReportGenerator
from recorder import DataRecorder, SafetyEvent
from safety_status_wrapper import SafetyStatusWrapper
from statistics.resource_statistic import ResourceStatistic
from statistics.process_statistic import ProcessStatistic
from statistics.latency_statistic import LatencyStatistic
from models import SafetyLevel, SafetyStatus, DegradationMode, SafetyCommand
# Module logger, shared by every function in this script.
logger = logging.getLogger(name="SafetyManager")
def _force_exit_after_timeout(timeout: int = 10):
    """Hard-terminate the process if it is still running after a grace period.

    Intended for a background daemon thread started by the shutdown signal
    handler. Blocks for ``timeout`` seconds, logs a warning, then exits with
    status 1 via ``os._exit``.

    ``os._exit`` ends the process immediately, without running ``finally``
    blocks or ``atexit`` handlers. That is the point: it gets the process out
    even if normal shutdown has stalled.
    """
    grace_seconds = timeout
    time.sleep(grace_seconds)
    logger.warning(f"Process did not exit within {timeout}s, forcing exit.")
    exit_status = 1
    os._exit(exit_status)
def _init_statistics(config: ConfigManager) -> dict:
    """Create and start the resource, process and latency statistics collectors.

    Each collector is constructed with its own config section
    (``resource_statistics``, ``process_statistics``, ``latency_statistics``).
    It falls back to an empty dict when the section is missing or falsy, and
    is started immediately, in that order.

    Returns:
        ``{"resource": ..., "process": ..., "latency": ...}``, mapping each
        key to its started collector.

    Raises:
        Whatever a collector's constructor or ``start()`` raises. Collectors
        that were already started are stopped (best effort) before the
        exception propagates, so a partial failure does not leave them running.
    """
    specs = (
        ("resource", ResourceStatistic, "resource_statistics", "Resource"),
        ("process", ProcessStatistic, "process_statistics", "Process"),
        ("latency", LatencyStatistic, "latency_statistics", "Latency"),
    )
    stats = {}
    try:
        for key, stat_cls, section, label in specs:
            collector = stat_cls(config.get(section, None) or {})
            collector.start()
            stats[key] = collector
            logger.info(f"{label} statistics collector started")
    except BaseException:
        # Don't leak collectors started before the failure.
        for started_key, started in stats.items():
            try:
                started.stop()
            except Exception as stop_err:
                logger.warning(
                    f"Failed to stop {started_key} statistics after startup "
                    f"error: {stop_err}")
        raise
    return stats
def _init_checkers(config: ConfigManager, exporter: SafetyExporter,
                   cyber_node) -> list:
    """Construct every safety checker and start the subscription-based ones.

    ``ChannelFreqChecker`` is built first because several other checkers
    receive it as ``channel_freq_checker``. Each checker gets
    ``config.get(<its section>)`` plus the shared exporter. Some also get
    the cyber node and/or the frequency checker.

    Args:
        config: Configuration source, queried once per checker section.
        exporter: Exporter passed to every checker.
        cyber_node: Cyber node passed to the checkers that take one.

    Returns:
        All 12 checkers, in construction order. Eight of them have had
        ``start_subscribing()`` called.
    """
    freq_checker = ChannelFreqChecker(
        config=config.get("channel_freq"),
        exporter=exporter,
        cyber_node=cyber_node,
    )

    # Keyword groups shared by the constructors below.
    exporter_only = {"exporter": exporter}
    with_freq = {"exporter": exporter, "channel_freq_checker": freq_checker}
    with_node_and_freq = dict(with_freq, cyber_node=cyber_node)

    resource_checker = ResourceChecker(
        config=config.get("resource"), **exporter_only)
    process_checker = ProcessChecker(
        config=config.get("process"), **exporter_only)
    chassis_checker = ChassisChecker(
        config=config.get("chassis"), **with_freq)
    system_health_checker = SystemHealthChecker(
        config=config.get("system_health"), **exporter_only)
    file_exist_checker = FileExistChecker(
        config=config.get("file_exist"), **exporter_only)
    localization_checker = LocalizationChecker(
        config=config.get("localization"), **with_node_and_freq)
    latency_checker = LatencyChecker(
        config=config.get("latency"), **with_node_and_freq)
    control_checker = ControlDeviationChecker(
        config=config.get("control"), **with_freq)
    collision_checker = CollisionChecker(
        config=config.get("collision"), **with_node_and_freq)
    gnss_checker = GnssSignalChecker(
        config=config.get("gnss"), **with_node_and_freq)
    traffic_light_checker = TrafficLightChecker(
        config=config.get("traffic_light"), **with_node_and_freq)

    checkers = [
        freq_checker, resource_checker, process_checker,
        chassis_checker, system_health_checker, file_exist_checker,
        localization_checker, latency_checker, control_checker,
        collision_checker, gnss_checker, traffic_light_checker,
    ]

    # Begin subscriptions, in this fixed order.
    subscribers = (
        freq_checker, localization_checker, latency_checker,
        control_checker, collision_checker, chassis_checker,
        gnss_checker, traffic_light_checker,
    )
    for subscriber in subscribers:
        subscriber.start_subscribing()

    logger.info(f"All checkers initialized: {len(checkers)} checkers")
    return checkers
def _stop_checkers(checkers: list):
    """Stop the subscriptions of all checkers, best effort.

    Calls ``stop_subscribing()`` on every checker that defines it. A failure
    in one checker is logged as a warning and does not prevent the remaining
    checkers from being stopped.

    Args:
        checkers: Checker objects; those without ``stop_subscribing`` are
            skipped.
    """
    for checker in checkers:
        try:
            if hasattr(checker, "stop_subscribing"):
                checker.stop_subscribing()
        except Exception as e:
            # Resolve the label defensively: an AttributeError raised inside
            # this handler would escape it and skip the remaining checkers.
            label = getattr(checker, "name", type(checker).__name__)
            logger.warning(f"Error stopping {label}: {e}")
    logger.info("All checkers stopped")
def _best_effort(step: str, func, *args, **kwargs):
    """Run one shutdown step; log and swallow any exception it raises.

    Shutdown must continue past a failing step so that later steps (stopping
    the recorder, resetting the guardian, ``cyber.shutdown()``) still run.

    Returns:
        Whatever ``func`` returns, or ``None`` if it raised.
    """
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logger.error(f"Shutdown step '{step}' failed: {e}", exc_info=True)
        return None


def main():
    """Run the Safety Manager until cyber shuts down.

    Startup:
      * initialise cyber and load configuration from ``configs``;
      * start the statistics collectors;
      * build the safety pipeline (exporter statuses -> analyzer ->
        aggregator -> decider -> guardian) and the data recorder;
      * create the checkers and spin the cyber node in a daemon thread;
      * install SIGINT/SIGTERM handlers.

    Each loop cycle runs every checker once, then decides and executes a
    command from the aggregated statuses and records events. It also
    updates the report, purges old records every 100 cycles, and snapshots
    statistics every 2 cycles. Each cycle ends with a 0.5 s sleep.

    Shutdown, which also runs if startup fails part-way: every cleanup step
    is attempted even if an earlier one raises. The guardian is returned to
    ``L0_NORMAL``, and ``cyber.shutdown()`` is called last.
    """
    cyber.init("safety_manager")
    config = ConfigManager("configs")

    # Declared up front so `finally` can tell which components were actually
    # created if startup fails part-way through.
    statistics = {}
    recorder = None
    reporter = None
    guardian = None
    checkers = []

    cycle_count = 0
    try:
        # ── Statistics collectors ──────────────────────────────────────
        statistics = _init_statistics(config)

        # ── Core safety pipeline ───────────────────────────────────────
        exporter = SafetyExporter("safety_manager_node")
        analyzer = Analyzer(config)
        aggregator = ModuleAggregator()
        decider = SafetyDecider(config)
        guardian = SafetyGuardian()

        # ── Data recorder ──────────────────────────────────────────────
        recorder = DataRecorder(record_dir="records", config={
            "max_file_size_mb": 50,
            "retention_days": 7,
            "flush_interval": 5.0,
        })
        recorder.start()

        def on_status(status):
            # Feed each exported status to the analyzer; queue any resulting
            # decision for aggregation in the main loop.
            decision = analyzer.on_status(status)
            if decision:
                aggregator.add_decision(decision)

        exporter.subscribe(on_status)

        # ── Checkers ───────────────────────────────────────────────────
        node_name = f"safety_manager_node_{int(time.time())}"
        cyber_node = cyber.Node(node_name)
        logger.info(f"Cyber node created: {node_name}")
        checkers = _init_checkers(config, exporter, cyber_node)

        # Hand the cyber node to the latency statistics module so it can
        # subscribe to latency data.
        latency_stat = statistics.get("latency")
        if latency_stat and hasattr(latency_stat, "_cyber_node"):
            latency_stat._cyber_node = cyber_node

        spin_thread = threading.Thread(target=cyber_node.spin, daemon=True)
        spin_thread.start()
        logger.info("Safety Manager STARTED (cyber mode)")

        def shutdown_handler(signum, frame):
            logger.info("Received signal, shutting down...")
            # Watchdog: force-terminate if the process is still alive 10 s
            # from now.
            threading.Thread(target=_force_exit_after_timeout,
                             args=(10,), daemon=True).start()
            # Makes cyber.ok() return False so the main loop ends.
            cyber.shutdown()

        signal.signal(signal.SIGINT, shutdown_handler)
        signal.signal(signal.SIGTERM, shutdown_handler)

        exporter.start()
        reporter = ReportGenerator(report_dir="reports")
        logger.info(f"Report directory: {reporter.report_dir}")
        reporter.write()

        # ── Main loop ──────────────────────────────────────────────────
        while cyber.ok():
            cycle_count += 1

            # 1) Run every checker once and publish its status.
            for checker in checkers:
                try:
                    status = checker.run_once()
                    if status:
                        exporter.report(status)
                        reporter.update(status.name, {
                            "level": status.level.value,
                            "message": status.message,
                            "timestamp": status.timestamp,
                            "details": status.details,
                            "raw_value": status.raw_value,
                            "tags": status.tags,
                        })
                except Exception as e:
                    # A crashing checker is itself reported as an ERROR
                    # status that bypasses the exporter's filter.
                    error_status = SafetyStatus(
                        name=f"{checker.name}_INTERNAL_ERROR",
                        source=checker.name,
                        level=SafetyLevel.ERROR,
                        message=f"Checker exception: {str(e)}",
                    )
                    exporter.report(error_status, bypass_filter=True)
                    logger.error(f"Checker {checker.name} error: {e}")

            # 2) Aggregate pending decisions, decide, and let the guardian act.
            action = None
            try:
                aggregated = aggregator.get_aggregated_status()
                if aggregated:
                    command = decider.decide(aggregated)
                    action = guardian.execute(command)
                    status_str = ", ".join(
                        f"{s.module}:{s.overall_level.value}" for s in aggregated
                    )
                    logger.info(f"Status: [{status_str}] | Action: {action.action_type}")
                    aggregator.clear()
                    # Record the issued command as an event.
                    recorder.record_event(SafetyEvent(
                        event_type="safety_command",
                        level=command.level.value,
                        source="Decider",
                        message=command.to_log(),
                        details={"command_id": command.command_id,
                                 "target_mode": command.target_mode.name,
                                 "reasons": command.reasons},
                    ))
            except Exception as e:
                logger.error(f"Decision loop error: {e}", exc_info=True)

            if action:
                mode_name = guardian.current_mode.name
                reporter.update_global("current_mode", mode_name)
                reporter.update_global("last_action", {
                    "action_type": action.action_type,
                    "parameters": action.parameters,
                    "request_takeover": action.request_takeover,
                })
                # Record takeover requests.
                if action.request_takeover:
                    recorder.record_event(SafetyEvent(
                        event_type="takeover_request",
                        level="WARN",
                        source="Guardian",
                        message=f"Takeover requested: {action.log_message}",
                    ))

            # 3) Publish extended Guardian state and flush the report.
            reporter.update_global("guardian_stats", guardian.get_stats())
            transitions = guardian.get_transitions(5)
            if transitions:
                reporter.update_global("mode_transitions",
                                       [t.to_dict() for t in transitions])
            reporter.write()

            # 4) Periodically purge old recorded data.
            if cycle_count % 100 == 0:
                recorder.cleanup_old_data()

            # 5) Every other cycle, collect statistics snapshots for the report.
            if cycle_count % 2 == 0:
                stats_data = {}
                for name in ("resource", "process", "latency"):
                    try:
                        stat = statistics[name]
                        if hasattr(stat, 'collect_once'):
                            stat.collect_once()
                        latest = stat.get_latest()
                        if latest is not None and hasattr(latest, 'to_dict'):
                            stats_data[name] = latest.to_dict()
                    except Exception as e:
                        logger.debug(f"{name} stat collect error: {e}")
                if stats_data:
                    reporter.update_global("statistics", stats_data)

            time.sleep(0.5)
    except KeyboardInterrupt:
        logger.info("Received KeyboardInterrupt")
    finally:
        logger.info("Shutting down...")
        # Each step is isolated so one failure cannot skip the rest.
        _best_effort("stop checkers", _stop_checkers, checkers)
        for stat_name in ("resource", "process", "latency"):
            stat = statistics.get(stat_name)
            if stat is not None:
                _best_effort(f"stop {stat_name} statistics", stat.stop)
        if recorder is not None:
            _best_effort("stop recorder", recorder.stop)
        if reporter is not None:
            _best_effort("stop reporter", reporter.stop)
        if guardian is not None:
            _best_effort(
                "return guardian to L0_NORMAL",
                lambda: guardian.execute(SafetyCommand(
                    command_id="shutdown", level=SafetyLevel.OK,
                    target_mode=DegradationMode.L0_NORMAL,
                    reasons=["System shutdown"],
                )),
            )
        logger.info("Safety Manager STOPPED")
        _best_effort("cyber shutdown", cyber.shutdown)
if __name__ == "__main__":
    # Script entry point: INFO-level logging with a fixed-width,
    # pipe-separated layout, then run the manager.
    logging.basicConfig(
        format="%(asctime)s | %(levelname)-8s | %(name)-20s | %(message)s",
        level=logging.INFO,
    )
    main()