You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

263 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import logging
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

import yaml
logger = logging.getLogger("ConfigManager")
class ConfigManager:
    """Load YAML configuration sections and serve them by section name.

    Every section maps to a fixed file name inside the config directory.
    A section whose file does not exist falls back to the built-in
    defaults from :meth:`_get_default_config`.  Sections can be patched at
    runtime with :meth:`hot_update`, which notifies the callbacks added
    through :meth:`register_hot_update`.
    """

    DEFAULT_CONFIG_DIR = "configs"

    # Section name -> YAML file name (resolved relative to ``config_dir``).
    _CONFIG_FILES = {
        "channel_freq": "channel_freq_checker_conf.yaml",
        "process": "process_checker_conf.yaml",
        "resource": "resource_checker_conf.yaml",
        "analyzer_rules": "rule_matcher_conf.yaml",
        "guardian": "guardian_conf.yaml",
        "chassis": "chassis_checker_conf.yaml",
        "system_health": "system_health_checker_conf.yaml",
        "file_exist": "file_exist_checker_conf.yaml",
        "localization": "localization_checker_conf.yaml",
        "latency": "latency_checker_conf.yaml",
        "control": "control_checker_conf.yaml",
        "collision": "collision_checker_conf.yaml",
        "gnss": "gnss_checker_conf.yaml",
        "traffic_light": "traffic_light_checker_conf.yaml",
        "resource_statistics": "resource_statistic_conf.yaml",
        "process_statistics": "process_statistic_conf.yaml",
        "latency_statistics": "latency_statistic_conf.yaml",
    }

    def __init__(self, config_dir: Optional[str] = None):
        """Resolve the config directory and load every known section.

        Args:
            config_dir: Directory holding the YAML files, joined onto the
                directory of this source file.  ``None`` or an empty
                string selects ``DEFAULT_CONFIG_DIR``.
        """
        base = Path(__file__).resolve().parent
        self.config_dir = base / (config_dir or self.DEFAULT_CONFIG_DIR)
        self._configs: Dict[str, Dict] = {}
        self._callbacks: Dict[str, List[Callable]] = {}
        self._load_all_configs()

    def _load_all_configs(self) -> None:
        """Populate ``self._configs`` for every entry of ``_CONFIG_FILES``.

        A file that exists is parsed with :meth:`_load_yaml`; errors raised
        while reading or parsing it propagate to the caller.  A file that
        does not exist is replaced by the section's built-in defaults.
        """
        for key, filename in self._CONFIG_FILES.items():
            path = self.config_dir / filename
            if path.exists():
                self._configs[key] = self._load_yaml(path)
                logger.info("Loaded config: %s", filename)
            else:
                logger.warning("Config not found: %s, using defaults", path)
                self._configs[key] = self._get_default_config(key)

    def _load_yaml(self, path: Path) -> Dict:
        """Parse the YAML file at *path* and return its content.

        An empty document (or any falsy root value) yields ``{}``.
        NOTE(review): a document whose root is a list or scalar is returned
        as-is, so the result is not guaranteed to be a dict.
        """
        with open(path, "r", encoding="utf-8") as f:
            return yaml.safe_load(f) or {}

    def _get_default_config(self, key: str) -> Dict:
        """Return the built-in default content for section *key*.

        The table is rebuilt on every call, so each caller receives its own
        objects and later in-place updates cannot leak into the defaults.
        Sections without an entry yield an empty dict.
        """
        defaults = {
            "channel_freq": {
                "items": [
                    {
                        "name": "/apollo/canbus/chassis",
                        "level": "ERROR",
                        "expect_freq": 100,
                        "lower_limit": 50,
                        "upper_limit": 150,
                        "tags": ["chassis", "critical"],
                    },
                    {
                        "name": "/apollo/control",
                        "level": "FATAL",
                        "expect_freq": 100,
                        "lower_limit": 80,
                        "upper_limit": 120,
                        "tags": ["control", "critical"],
                    },
                    {
                        "name": "/apollo/localization/pose",
                        "level": "ERROR",
                        "expect_freq": 100,
                        "lower_limit": 80,
                        "upper_limit": 120,
                        "tags": ["localization", "critical"],
                    },
                    {
                        "name": "/apollo/perception/obstacles",
                        "level": "ERROR",
                        "expect_freq": 10,
                        "lower_limit": 6,
                        "upper_limit": 15,
                        "tags": ["perception", "critical"],
                    },
                    {
                        "name": "/apollo/perception/traffic_light",
                        "level": "ERROR",
                        "expect_freq": 10,
                        "lower_limit": 6,
                        "upper_limit": 15,
                        "tags": ["traffic_light"],
                    },
                    {
                        "name": "/apollo/planning",
                        "level": "ERROR",
                        "expect_freq": 10,
                        "lower_limit": 8,
                        "upper_limit": 12,
                        "tags": ["planning", "critical"],
                    },
                    {
                        "name": "/apollo/prediction",
                        "level": "ERROR",
                        "expect_freq": 10,
                        "lower_limit": 8,
                        "upper_limit": 12,
                        "tags": ["prediction"],
                    },
                ]
            },
            "process": {
                "processes": [
                    {"name": "perception", "level": "ERROR"},
                    {"name": "planning", "level": "ERROR"},
                    {"name": "control", "level": "FATAL"},
                ]
            },
            "resource": {
                "resource": {
                    "cpu": {"usage_warning": 80, "usage_error": 90},
                    "memory": {"available_warning": 2000, "available_error": 1000},
                }
            },
            "guardian": {
                "error_threshold": 25,
                "fatal_threshold": 50,
                "hysteresis_window": 3,
            },
            "chassis": {
                "data_timeout": 1.0,
                "warn_modes": [],
            },
            "system_health": {},
            "file_exist": {
                "items": [
                    {
                        "name": "novatel_localization_extrinsics.yaml",
                        "path": "./modules/localization/msf/params/novatel_localization_extrinsics.yaml",
                        "level": "FATAL",
                    },
                    {
                        "name": "static_transform_conf.pb.txt",
                        "path": "./modules/transform/conf/static_transform_conf.pb.txt",
                        "level": "FATAL",
                    },
                    {
                        "name": "sensor_meta.pb.txt",
                        "path": "./modules/perception/data/conf/sensor_meta.pb.txt",
                        "level": "ERROR",
                    },
                ]
            },
            "localization": {
                "fatal_threshold": 3,
                "status_timeout": 2.0,
            },
            "latency": {
                "items": [
                    {
                        "name": "chassis",
                        "channel": "/apollo/canbus/chassis",
                        "level": "WARN",
                        "max_delay_ms": 500,
                    },
                    {
                        "name": "control",
                        "channel": "/apollo/control",
                        "level": "WARN",
                        "max_delay_ms": 500,
                    },
                    {
                        "name": "localization",
                        "channel": "/apollo/localization/pose",
                        "level": "WARN",
                        "max_delay_ms": 500,
                    },
                    {
                        "name": "perception obstacle",
                        "channel": "/apollo/perception/obstacles",
                        "level": "WARN",
                        "max_delay_ms": 1000,
                    },
                    {
                        "name": "traffic light",
                        "channel": "/apollo/perception/traffic_light",
                        "level": "WARN",
                        "max_delay_ms": 1000,
                    },
                    {
                        "name": "planning",
                        "channel": "/apollo/planning",
                        "level": "WARN",
                        "max_delay_ms": 1000,
                    },
                    {
                        "name": "prediction",
                        "channel": "/apollo/prediction",
                        "level": "WARN",
                        "max_delay_ms": 1000,
                    },
                ]
            },
            "control": {
                "threshold": {"speed_kph": 5.0, "accel_mps2": 1.0, "steer_pct": 10.0},
                "data_timeout": 1.0,
            },
            "collision": {
                "warn_sec": 5.0,
                "error_sec": 4.0,
                "fatal_sec": 3.0,
                "type": "NORMAL",
                "warn_strategy_conf": {
                    "collision_check_angle": 90.0,
                    "collision_check_distance": 100.0,
                    "adc_extended": {"front": 0.2, "lateral": 0.2, "back": 0.2},
                    "speed_threshold_mps": 0.01,
                    "steering_threshold_percentage": 50,
                },
            },
            "gnss": {
                "gnss": {
                    "min_satellites": 6,
                    "max_diff_age": 5.0,
                    "max_pdop": 5.0,
                    "max_hdop": 3.0,
                    "max_vdop": 4.0,
                    "require_fixed_solution": True,
                    "data_timeout": 3.0,
                    "imu_expect_freq": 100,
                    "imu_freq_lower_limit": 60,
                    "max_consecutive_lost": 5,
                },
            },
            "traffic_light": {
                "traffic_light": {
                    "data_timeout": 2.0,
                    "min_confidence": 0.5,
                    "max_red_duration": 300,
                    "max_no_data_duration": 10.0,
                    "blink_yellow_as_red": True,
                    "max_consecutive_lost": 10,
                    "freq_window": 50,
                },
            },
            "resource_statistics": {
                "disk_patterns": ["sd[a-z][0-9]*", "nvme[\\w]*", "ssd[\\w]*"],
                "collect_interval": 1.0,
                "history_size": 300,
            },
            "process_statistics": {
                "general_process_regex": [".*/apollo/scripts/.*", "cyber_recorder record .*"],
                "collect_interval": 5.0,
                "history_size": 60,
            },
            "latency_statistics": {
                "compensator_topic": "/apollo/sensor/lidar16/compensator/PointCloud2",
                "collect_interval": 1.0,
                "history_size": 300,
            },
        }
        return defaults.get(key, {})

    def get(self, section: str, key: Optional[str] = None, default: Any = None) -> Any:
        """Return a whole section, or one top-level entry of it.

        Args:
            section: Section name; an unknown section behaves like ``{}``.
            key: Top-level key inside the section.  ``None`` (or any
                non-string value) requests the whole section.
            default: Returned when *key* is absent, when the section is not
                a mapping, or, for whole-section lookups, when the section
                is empty or unknown.

        Returns:
            The stored section object itself (not a copy) for whole-section
            lookups, otherwise the value stored under *key* or *default*.
        """
        config = self._configs.get(section, {})
        if isinstance(key, str):
            # A YAML root that is a list/scalar has no ``.get``; treat every
            # key as absent instead of raising AttributeError.
            if not isinstance(config, dict):
                return default
            return config.get(key, default)
        if isinstance(key, dict):
            # Compatibility: in get("section", {}) the {} lands in `key` by
            # mistake; the caller actually wants the whole section, with the
            # dict as the fallback value.
            return config if config else key
        # Whole-section lookup: honour an explicit default the same way the
        # positional compatibility form above does.
        if default is not None and not config:
            return default
        return config

    def register_hot_update(self, section: str, callback: Callable[[Dict, Dict], Any]) -> None:
        """Add *callback* to the list notified when *section* is hot-updated.

        The callback is invoked as ``callback(old_config, new_config)``.
        The section name is not validated here.
        """
        self._callbacks.setdefault(section, []).append(callback)

    def hot_update(self, section: str, new_config: Dict) -> None:
        """Merge *new_config* into *section* in place and notify callbacks.

        The merge is ``dict.update``: top-level keys are replaced, nested
        dicts are not merged.  Each callback receives a shallow copy of the
        section taken before the merge and the live, updated section.  An
        unknown section is logged and ignored.
        NOTE(review): assumes the stored section is a dict; a section
        loaded from a non-mapping YAML root would fail on ``update``.
        """
        if section not in self._configs:
            logger.error("Unknown config section: %s", section)
            return
        current = self._configs[section]
        old_config = current.copy()
        current.update(new_config)
        logger.info("Hot updated config: %s", section)
        # Iterate over a snapshot so a callback that registers another
        # callback for this section does not change the list mid-iteration.
        for cb in list(self._callbacks.get(section, ())):
            try:
                cb(old_config, current)
            except Exception as e:
                # One failing subscriber must not stop the others; keep the
                # traceback in the log so the failure can be diagnosed.
                logger.exception("Callback error: %s", e)