You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

529 lines
23 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import math
import time
import logging
from checkers.base import BaseChecker
from exporter import SafetyExporter
from models import SafetyStatus, SafetyLevel
logger = logging.getLogger("Checker.Control")
# protobuf 消息类型映射
_CONTROL_MSG_TYPE = "apollo::control::ControlCommand"
_CHASSIS_MSG_TYPE = "apollo::canbus::Chassis"
def _resolve_msg_class(msg_type: str):
"""动态加载 protobuf 类。"""
type_map = {
"apollo::control::ControlCommand": {
"module": "modules.common_msgs.control_msgs.control_cmd_pb2",
"class": "ControlCommand",
},
"apollo::canbus::Chassis": {
"module": "modules.common_msgs.chassis_msgs.chassis_pb2",
"class": "Chassis",
},
}
info = type_map.get(msg_type)
if info is None:
return None
try:
import importlib
mod = importlib.import_module(info["module"])
return getattr(mod, info["class"], None)
except Exception:
return None
class ControlDeviationChecker(BaseChecker):
"""检测控制指令与实际车体反馈的偏差。
对比 /apollo/control 发出的控制命令与 /apollo/canbus/chassis 的实际反馈,
检查速度、加速度、转向等关键参数的偏差是否超限。
注意:与 ChannelFreqChecker 共享同一 cyber_node无法重复订阅同一通道。
因此直接从 ChannelFreqChecker 获取已收到的 raw data。
"""
def __init__(self, config: dict, exporter: SafetyExporter,
cyber_node=None, channel_freq_checker=None):
super().__init__("ControlDeviationChecker", config, exporter)
self.cyber_node = cyber_node
self._freq_checker = channel_freq_checker # 共享 ChannelFreqChecker 的数据
self._control_raw = None
self._control_time = 0
self._chassis_raw = None
self._chassis_time = 0
self._control_class = _resolve_msg_class("apollo::control::ControlCommand")
self._chassis_class = _resolve_msg_class("apollo::canbus::Chassis")
self._control_reader = None
self._chassis_reader = None
# 阈值配置
threshold_cfg = config.get("threshold", {})
self._speed_threshold = threshold_cfg.get("speed_kph", 5.0)
self._accel_threshold = threshold_cfg.get("accel_mps2", 1.0)
self._steer_threshold = threshold_cfg.get("steer_pct", 10.0)
self._lat_error_threshold = threshold_cfg.get("lateral_error_m", 0.5)
self._heading_error_threshold = threshold_cfg.get("heading_error_deg", 8.0)
self._station_error_threshold = threshold_cfg.get("station_error_m", 1.5)
self._speed_error_threshold = threshold_cfg.get("speed_error_kph", 3.0)
self._data_timeout = config.get("data_timeout", 1.0)
def start_subscribing(self):
if self._freq_checker is not None:
logger.info("Using shared data from ChannelFreqChecker for control/chassis")
return
if self.cyber_node is None:
logger.warning("No cyber node provided, control monitoring disabled")
return
# 回退独立订阅channel 未被其他 checker 占用时)
try:
self._control_reader = self.cyber_node.create_rawdata_reader(
"/apollo/control", self._control_callback)
if self._control_reader is None:
logger.warning("Failed to create reader for /apollo/control (channel already subscribed by another checker)")
else:
logger.info("Subscribed to /apollo/control for control deviation")
except Exception as e:
logger.error(f"Failed to subscribe to /apollo/control: {e}")
try:
self._chassis_reader = self.cyber_node.create_rawdata_reader(
"/apollo/canbus/chassis", self._chassis_callback)
if self._chassis_reader is None:
logger.warning("Failed to create reader for /apollo/canbus/chassis (channel already subscribed by another checker)")
else:
logger.info("Subscribed to /apollo/canbus/chassis for control deviation")
except Exception as e:
logger.error(f"Failed to subscribe to /apollo/canbus/chassis: {e}")
def stop_subscribing(self):
self._control_reader = None
self._chassis_reader = None
def _control_callback(self, raw_data):
self._control_raw = raw_data
self._control_time = time.time()
def _chassis_callback(self, raw_data):
self._chassis_raw = raw_data
self._chassis_time = time.time()
def _parse_control(self):
"""解析 control command protobuf提取关键字段到报告中。"""
if self._control_raw is None:
return None
# 优先使用 protobuf 类解析
if self._control_class is not None:
try:
msg = self._control_class()
msg.ParseFromString(self._control_raw)
return self._extract_control_from_proto(msg)
except Exception as e:
logger.warning(f"Failed to parse control via protobuf: {e}")
# 回退:使用 cyber ChannelUtils 解析(参考 cyber_channel echo
return self._parse_control_via_cyber()
def _extract_control_from_proto(self, msg):
"""从 protobuf 消息对象提取 control 字段。
ControlCommand proto 主要字段control_cmd.proto
- speed (field 9, m/s)
- acceleration (field 10, m/s²)
- steering_target (field 7, %)
- throttle (field 3, %)
- brake (field 4, %)
- gear_location (field 20, GearPosition enum)
- debug (field 22, Debug) → lateral_error, heading_error, station_error
"""
result = {
"speed": self._get_float(msg, "speed", 0),
"accel": self._get_float(msg, "acceleration", 0),
"steer": self._get_float(msg, "steering_target", 0),
"gear": self._get_str(msg, "gear_location", ""),
"throttle": self._get_float(msg, "throttle", 0),
"brake": self._get_float(msg, "brake", 0),
}
# header 嵌套字段
header = getattr(msg, "header", None)
if header is not None:
result["seq_num"] = int(getattr(header, "sequence_num", 0))
result["timestamp_sec"] = float(getattr(header, "timestamp_sec", 0))
status = getattr(header, "status", None)
if status is not None:
err_code = getattr(status, "error_code", None)
if err_code is not None:
# protobuf enum 返回 int0 = OK
if isinstance(err_code, int):
result["error_code"] = "OK" if err_code == 0 else str(err_code)
else:
result["error_code"] = getattr(err_code, "name", str(err_code))
err_msg = getattr(status, "msg", None)
if err_msg:
result["error_msg"] = str(err_msg)
engage = getattr(header, "engage_advice", None)
if engage is not None:
advice = getattr(engage, "advice", None)
if advice is not None:
result["engage_advice"] = getattr(advice, "name", str(advice))
reason = getattr(engage, "reason", None)
if reason:
result["engage_reason"] = str(reason)
# Debug 嵌套消息 — 轨迹跟踪精度指标
debug = getattr(msg, "debug", None)
if debug is not None:
lon = getattr(debug, "simple_lon_debug", None)
if lon is not None:
result["station_error"] = self._get_float(lon, "station_error", 0)
result["speed_error"] = self._get_float(lon, "speed_error", 0)
lat = getattr(debug, "simple_lat_debug", None)
if lat is not None:
result["lateral_error"] = self._get_float(lat, "lateral_error", 0)
result["heading_error"] = self._get_float(lat, "heading_error", 0)
return result
def _parse_control_via_cyber(self):
"""使用 cyber.ChannelUtils 解析 control 原始数据。
参考 cyber_channel echo 的实现:
msgtype = cyber.ChannelUtils.get_msgtype(channel_name, 0).decode('utf-8')
print(cyber.ChannelUtils.get_debugstring_rawmsgdata(msgtype, raw_data).decode('utf-8'))
ControlCommand proto debug string 字段名:
speed: ... (m/s)
acceleration: ... (m/s²)
steering_target: ... (%)
throttle: ... (%)
brake: ... (%)
"""
try:
from cyber.python.cyber_py3 import cyber
raw_type = cyber.ChannelUtils.get_msgtype("/apollo/control", sleep_s=0)
msg_type = raw_type.decode("utf-8") if isinstance(raw_type, bytes) else str(raw_type)
debug_str = cyber.ChannelUtils.get_debugstring_rawmsgdata(msg_type, self._control_raw)
text = debug_str.decode("utf-8") if isinstance(debug_str, bytes) else str(debug_str)
result = {
"speed": 0, "accel": 0, "steer": 0, "gear": "",
"throttle": 0, "brake": 0,
}
for line in text.split("\n"):
line = line.strip()
if line.startswith("speed:"):
try:
result["speed"] = float(line.split(":")[1].strip())
except (ValueError, IndexError):
pass
elif line.startswith("acceleration:"):
try:
result["accel"] = float(line.split(":")[1].strip())
except (ValueError, IndexError):
pass
elif line.startswith("steering_target:"):
try:
result["steer"] = float(line.split(":")[1].strip())
except (ValueError, IndexError):
pass
elif line.startswith("throttle:"):
try:
result["throttle"] = float(line.split(":")[1].strip())
except (ValueError, IndexError):
pass
elif line.startswith("brake:"):
try:
result["brake"] = float(line.split(":")[1].strip())
except (ValueError, IndexError):
pass
return result
except Exception as e:
logger.debug(f"Failed to parse control via cyber: {e}")
return None
def _parse_chassis(self):
"""解析 chassis protobuf。
Chassis proto 主要字段chassis.proto
- speed_mps (field 3, m/s)
- acceleration (field 30, m/s²)
- steering_percentage (field 8, %)
- gear_location (field 20, GearPosition enum)
- driving_mode (field 19, DrivingMode enum)
"""
if self._chassis_raw is None:
return None
# 优先使用 protobuf 类解析
if self._chassis_class is not None:
try:
msg = self._chassis_class()
msg.ParseFromString(self._chassis_raw)
return {
"speed": self._get_float(msg, "speed_mps", 0),
"accel": self._get_float(msg, "acceleration", 0),
"steer": self._get_float(msg, "steering_percentage", 0),
"gear": self._get_str(msg, "gear_location", ""),
"driving_mode": self._get_str(msg, "driving_mode", ""),
}
except Exception as e:
logger.debug(f"Failed to parse chassis via protobuf: {e}")
# 回退:使用 cyber ChannelUtils 解析
return self._parse_chassis_via_cyber()
def _parse_chassis_via_cyber(self):
"""使用 cyber.ChannelUtils 解析 chassis 原始数据。"""
try:
from cyber.python.cyber_py3 import cyber
raw_type = cyber.ChannelUtils.get_msgtype("/apollo/canbus/chassis", sleep_s=0)
msg_type = raw_type.decode("utf-8") if isinstance(raw_type, bytes) else str(raw_type)
debug_str = cyber.ChannelUtils.get_debugstring_rawmsgdata(msg_type, self._chassis_raw)
text = debug_str.decode("utf-8") if isinstance(debug_str, bytes) else str(debug_str)
result = {
"speed": 0, "accel": 0, "steer": 0,
"gear": "", "driving_mode": "",
}
for line in text.split("\n"):
line = line.strip()
if line.startswith("speed_mps:"):
try:
result["speed"] = float(line.split(":")[1].strip())
except (ValueError, IndexError):
pass
elif line.startswith("steering_percentage:"):
try:
result["steer"] = float(line.split(":")[1].strip())
except (ValueError, IndexError):
pass
elif line.startswith("driving_mode:"):
result["driving_mode"] = line.split(":")[1].strip()
elif line.startswith("gear_location:"):
result["gear"] = line.split(":")[1].strip()
return result
except Exception as e:
logger.debug(f"Failed to parse chassis via cyber: {e}")
return None
def _get_float(self, msg, *names, default=0):
for name in names:
val = getattr(msg, name, None)
if val is not None:
try:
return float(val)
except (TypeError, ValueError):
pass
return default
def _get_str(self, msg, *names, default=""):
for name in names:
val = getattr(msg, name, None)
if val is not None:
if hasattr(val, "name"):
return val.name
return str(val)
return default
def run_once(self) -> SafetyStatus:
# 复用 ChannelFreqChecker 已收到的 raw data同一节点无法重复订阅同一通道
if self._freq_checker is not None:
self._control_raw = self._freq_checker._latest_data.get("/apollo/control")
self._control_time = self._freq_checker._latest_time.get("/apollo/control", 0)
self._chassis_raw = self._freq_checker._latest_data.get("/apollo/canbus/chassis")
self._chassis_time = self._freq_checker._latest_time.get("/apollo/canbus/chassis", 0)
if self._control_raw is None or self._chassis_raw is None:
return SafetyStatus(
name="ControlDeviationCheck", source=self.name,
level=SafetyLevel.ERROR,
message="No control or chassis data available",
)
ctrl_elapsed = time.time() - self._control_time
chassis_elapsed = time.time() - self._chassis_time
if ctrl_elapsed > self._data_timeout:
return SafetyStatus(
name="ControlDeviationCheck", source=self.name,
level=SafetyLevel.ERROR,
message=f"Control data timeout ({ctrl_elapsed:.1f}s)",
)
if chassis_elapsed > self._data_timeout:
return SafetyStatus(
name="ControlDeviationCheck", source=self.name,
level=SafetyLevel.ERROR,
message=f"Chassis data timeout ({chassis_elapsed:.1f}s)",
)
control = self._parse_control()
chassis = self._parse_chassis()
# 如果解析失败,但原始数据已到达,报告数据到达状态
if control is None or chassis is None:
details_raw = {
"control_bytes": len(self._control_raw) if self._control_raw else 0,
"chassis_bytes": len(self._chassis_raw) if self._chassis_raw else 0,
}
return SafetyStatus(
name="ControlDeviationCheck", source=self.name,
level=SafetyLevel.WARN,
message=f"Data received but parse failed (control={'OK' if control else 'FAIL'} chassis={'OK' if chassis else 'FAIL'})",
details=details_raw,
tags=["control"],
)
# 检查控制模块内部错误
error_code = control.get("error_code", "")
error_msg = control.get("error_msg", "")
engage_advice = control.get("engage_advice", "")
# 检查驾驶模式
chassis_mode = chassis.get("driving_mode", "")
if chassis_mode in ("EXCEPTION", "MANUAL"):
return SafetyStatus(
name="ControlDeviationCheck", source=self.name,
level=SafetyLevel.ERROR,
message=f"Chassis in {chassis_mode} mode",
details={"driving_mode": chassis_mode},
tags=["control"],
)
details = {
"speed": control["speed"],
"chassis_speed": chassis["speed"],
"steer": control["steer"],
"gear": control["gear"],
"throttle": control["throttle"],
"brake": control["brake"],
}
# Debug 跟踪精度指标
lateral_error = control.get("lateral_error")
heading_error = control.get("heading_error")
station_error = control.get("station_error")
speed_error = control.get("speed_error")
if lateral_error is not None:
details["lateral_error_m"] = round(lateral_error, 3)
if heading_error is not None:
details["heading_error_deg"] = round(math.degrees(heading_error), 1)
if station_error is not None:
details["station_error_m"] = round(station_error, 3)
if speed_error is not None:
details["speed_error_mps"] = round(speed_error, 3)
if error_code:
details["error_code"] = error_code
if error_msg:
details["error_msg"] = error_msg
if engage_advice:
details["engage_advice"] = engage_advice
# 上报控制模块计算错误(通常由上游模块问题导致)
if error_code and error_code != "OK":
logger.warning(f"Control module error: {error_code}{error_msg}")
return SafetyStatus(
name="ControlDeviationCheck", source=self.name,
level=SafetyLevel.ERROR,
message=f"Control {error_code}: {error_msg}",
details=details,
tags=["control"],
)
if engage_advice and "ENGAGE" not in engage_advice:
reason = control.get("engage_reason", "")
logger.info(f"Control engage_advice={engage_advice} {reason}")
# 计算偏差(所有值单位已在 proto 中统一)
# speed: ControlCommand.speed (m/s) ↔ Chassis.speed_mps (m/s)
# acceleration: ControlCommand.acceleration (m/s²) ↔ Chassis.acceleration (m/s²)
# steer: ControlCommand.steering_target (%) ↔ Chassis.steering_percentage (%)
issues = []
ctrl_speed = control["speed"] # m/s
chas_speed = chassis["speed"] # m/s
# 当 ControlCommand.speed 为 0 但车辆明显在行驶时跳过偏差检查
# 该字段在本版本 Apollo 中可能未被 populatedebug 用途),
# 此时 throttle/brake 才是实际控制输出。
skip_speed_check = (ctrl_speed == 0.0 and chas_speed > 1.0)
if not skip_speed_check:
speed_diff_kph = abs(ctrl_speed - chas_speed) * 3.6
if speed_diff_kph > self._speed_threshold:
level = SafetyLevel.ERROR if speed_diff_kph > self._speed_threshold * 2 else SafetyLevel.WARN
issues.append((level, f"Speed deviation: {speed_diff_kph:.1f} kph "
f"(cmd={ctrl_speed:.1f} m/s, actual={chas_speed:.1f} m/s)"))
else:
speed_diff_kph = 0.0
ctrl_accel = control["accel"] # m/s²
chas_accel = chassis["accel"] # m/s²
accel_diff = abs(ctrl_accel - chas_accel)
if accel_diff > self._accel_threshold:
level = SafetyLevel.ERROR if accel_diff > self._accel_threshold * 2 else SafetyLevel.WARN
issues.append((level, f"Accel deviation: {accel_diff:.1f} m/s2"))
ctrl_steer = control["steer"]
chas_steer = chassis["steer"]
steer_diff = abs(ctrl_steer - chas_steer)
if steer_diff > self._steer_threshold:
level = SafetyLevel.ERROR if steer_diff > self._steer_threshold * 2 else SafetyLevel.WARN
issues.append((level, f"Steer deviation: {steer_diff:.1f}%"))
# 轨迹跟踪偏差
if lateral_error is not None and lateral_error > self._lat_error_threshold:
level = (SafetyLevel.ERROR
if lateral_error > self._lat_error_threshold * 2
else SafetyLevel.WARN)
issues.append((level, f"Lateral error: {lateral_error:.3f}m"))
if heading_error is not None and math.degrees(heading_error) > self._heading_error_threshold:
h_deg = math.degrees(heading_error)
level = (SafetyLevel.ERROR
if h_deg > self._heading_error_threshold * 2
else SafetyLevel.WARN)
issues.append((level, f"Heading error: {h_deg:.1f}°"))
if station_error is not None and abs(station_error) > self._station_error_threshold:
se = abs(station_error)
level = (SafetyLevel.ERROR
if se > self._station_error_threshold * 2
else SafetyLevel.WARN)
issues.append((level, f"Station error: {station_error:.3f}m"))
if speed_error is not None and abs(speed_error) * 3.6 > self._speed_error_threshold:
se_kph = abs(speed_error) * 3.6
level = (SafetyLevel.ERROR
if se_kph > self._speed_error_threshold * 2
else SafetyLevel.WARN)
issues.append((level, f"Speed tracking error: {se_kph:.1f}kph"))
if issues:
worst = max(issues, key=lambda x: x[0])
msgs = [m for _, m in issues]
logger.warning(f"Control deviation: {'; '.join(msgs)}")
return SafetyStatus(
name="ControlDeviationCheck", source=self.name,
level=worst[0],
message=f"Control deviation: {'; '.join(msgs)}",
details=details,
tags=["control"],
)
logger.info(f"Control deviation OK (speed_diff={speed_diff_kph:.1f}kph, "
f"accel_diff={accel_diff:.1f}, steer_diff={steer_diff:.1f})")
return SafetyStatus(
name="ControlDeviationCheck", source=self.name,
level=SafetyLevel.OK,
message=f"Control deviation OK (speed_diff={speed_diff_kph:.1f}kph)",
details=details,
tags=["control"],
)