|
|
import math
|
|
|
import time
|
|
|
import logging
|
|
|
from checkers.base import BaseChecker
|
|
|
from exporter import SafetyExporter
|
|
|
from models import SafetyStatus, SafetyLevel
|
|
|
|
|
|
logger = logging.getLogger("Checker.Control")
|
|
|
|
|
|
|
|
|
# protobuf 消息类型映射
|
|
|
_CONTROL_MSG_TYPE = "apollo::control::ControlCommand"
|
|
|
_CHASSIS_MSG_TYPE = "apollo::canbus::Chassis"
|
|
|
|
|
|
|
|
|
def _resolve_msg_class(msg_type: str):
|
|
|
"""动态加载 protobuf 类。"""
|
|
|
type_map = {
|
|
|
"apollo::control::ControlCommand": {
|
|
|
"module": "modules.common_msgs.control_msgs.control_cmd_pb2",
|
|
|
"class": "ControlCommand",
|
|
|
},
|
|
|
"apollo::canbus::Chassis": {
|
|
|
"module": "modules.common_msgs.chassis_msgs.chassis_pb2",
|
|
|
"class": "Chassis",
|
|
|
},
|
|
|
}
|
|
|
info = type_map.get(msg_type)
|
|
|
if info is None:
|
|
|
return None
|
|
|
try:
|
|
|
import importlib
|
|
|
mod = importlib.import_module(info["module"])
|
|
|
return getattr(mod, info["class"], None)
|
|
|
except Exception:
|
|
|
return None
|
|
|
|
|
|
|
|
|
class ControlDeviationChecker(BaseChecker):
|
|
|
"""检测控制指令与实际车体反馈的偏差。
|
|
|
|
|
|
对比 /apollo/control 发出的控制命令与 /apollo/canbus/chassis 的实际反馈,
|
|
|
检查速度、加速度、转向等关键参数的偏差是否超限。
|
|
|
|
|
|
注意:与 ChannelFreqChecker 共享同一 cyber_node,无法重复订阅同一通道。
|
|
|
因此直接从 ChannelFreqChecker 获取已收到的 raw data。
|
|
|
"""
|
|
|
|
|
|
def __init__(self, config: dict, exporter: SafetyExporter,
|
|
|
cyber_node=None, channel_freq_checker=None):
|
|
|
super().__init__("ControlDeviationChecker", config, exporter)
|
|
|
self.cyber_node = cyber_node
|
|
|
self._freq_checker = channel_freq_checker # 共享 ChannelFreqChecker 的数据
|
|
|
self._control_raw = None
|
|
|
self._control_time = 0
|
|
|
self._chassis_raw = None
|
|
|
self._chassis_time = 0
|
|
|
self._control_class = _resolve_msg_class("apollo::control::ControlCommand")
|
|
|
self._chassis_class = _resolve_msg_class("apollo::canbus::Chassis")
|
|
|
self._control_reader = None
|
|
|
self._chassis_reader = None
|
|
|
|
|
|
# 阈值配置
|
|
|
threshold_cfg = config.get("threshold", {})
|
|
|
self._speed_threshold = threshold_cfg.get("speed_kph", 5.0)
|
|
|
self._accel_threshold = threshold_cfg.get("accel_mps2", 1.0)
|
|
|
self._steer_threshold = threshold_cfg.get("steer_pct", 10.0)
|
|
|
self._lat_error_threshold = threshold_cfg.get("lateral_error_m", 0.5)
|
|
|
self._heading_error_threshold = threshold_cfg.get("heading_error_deg", 8.0)
|
|
|
self._station_error_threshold = threshold_cfg.get("station_error_m", 1.5)
|
|
|
self._speed_error_threshold = threshold_cfg.get("speed_error_kph", 3.0)
|
|
|
self._data_timeout = config.get("data_timeout", 1.0)
|
|
|
|
|
|
def start_subscribing(self):
|
|
|
if self._freq_checker is not None:
|
|
|
logger.info("Using shared data from ChannelFreqChecker for control/chassis")
|
|
|
return
|
|
|
if self.cyber_node is None:
|
|
|
logger.warning("No cyber node provided, control monitoring disabled")
|
|
|
return
|
|
|
# 回退:独立订阅(channel 未被其他 checker 占用时)
|
|
|
try:
|
|
|
self._control_reader = self.cyber_node.create_rawdata_reader(
|
|
|
"/apollo/control", self._control_callback)
|
|
|
if self._control_reader is None:
|
|
|
logger.warning("Failed to create reader for /apollo/control (channel already subscribed by another checker)")
|
|
|
else:
|
|
|
logger.info("Subscribed to /apollo/control for control deviation")
|
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to subscribe to /apollo/control: {e}")
|
|
|
try:
|
|
|
self._chassis_reader = self.cyber_node.create_rawdata_reader(
|
|
|
"/apollo/canbus/chassis", self._chassis_callback)
|
|
|
if self._chassis_reader is None:
|
|
|
logger.warning("Failed to create reader for /apollo/canbus/chassis (channel already subscribed by another checker)")
|
|
|
else:
|
|
|
logger.info("Subscribed to /apollo/canbus/chassis for control deviation")
|
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to subscribe to /apollo/canbus/chassis: {e}")
|
|
|
|
|
|
def stop_subscribing(self):
|
|
|
self._control_reader = None
|
|
|
self._chassis_reader = None
|
|
|
|
|
|
def _control_callback(self, raw_data):
|
|
|
self._control_raw = raw_data
|
|
|
self._control_time = time.time()
|
|
|
|
|
|
def _chassis_callback(self, raw_data):
|
|
|
self._chassis_raw = raw_data
|
|
|
self._chassis_time = time.time()
|
|
|
|
|
|
def _parse_control(self):
|
|
|
"""解析 control command protobuf,提取关键字段到报告中。"""
|
|
|
if self._control_raw is None:
|
|
|
return None
|
|
|
|
|
|
# 优先使用 protobuf 类解析
|
|
|
if self._control_class is not None:
|
|
|
try:
|
|
|
msg = self._control_class()
|
|
|
msg.ParseFromString(self._control_raw)
|
|
|
return self._extract_control_from_proto(msg)
|
|
|
except Exception as e:
|
|
|
logger.warning(f"Failed to parse control via protobuf: {e}")
|
|
|
|
|
|
# 回退:使用 cyber ChannelUtils 解析(参考 cyber_channel echo)
|
|
|
return self._parse_control_via_cyber()
|
|
|
|
|
|
def _extract_control_from_proto(self, msg):
|
|
|
"""从 protobuf 消息对象提取 control 字段。
|
|
|
|
|
|
ControlCommand proto 主要字段(control_cmd.proto):
|
|
|
- speed (field 9, m/s)
|
|
|
- acceleration (field 10, m/s²)
|
|
|
- steering_target (field 7, %)
|
|
|
- throttle (field 3, %)
|
|
|
- brake (field 4, %)
|
|
|
- gear_location (field 20, GearPosition enum)
|
|
|
- debug (field 22, Debug) → lateral_error, heading_error, station_error
|
|
|
"""
|
|
|
result = {
|
|
|
"speed": self._get_float(msg, "speed", 0),
|
|
|
"accel": self._get_float(msg, "acceleration", 0),
|
|
|
"steer": self._get_float(msg, "steering_target", 0),
|
|
|
"gear": self._get_str(msg, "gear_location", ""),
|
|
|
"throttle": self._get_float(msg, "throttle", 0),
|
|
|
"brake": self._get_float(msg, "brake", 0),
|
|
|
}
|
|
|
|
|
|
# header 嵌套字段
|
|
|
header = getattr(msg, "header", None)
|
|
|
if header is not None:
|
|
|
result["seq_num"] = int(getattr(header, "sequence_num", 0))
|
|
|
result["timestamp_sec"] = float(getattr(header, "timestamp_sec", 0))
|
|
|
|
|
|
status = getattr(header, "status", None)
|
|
|
if status is not None:
|
|
|
err_code = getattr(status, "error_code", None)
|
|
|
if err_code is not None:
|
|
|
# protobuf enum 返回 int,0 = OK
|
|
|
if isinstance(err_code, int):
|
|
|
result["error_code"] = "OK" if err_code == 0 else str(err_code)
|
|
|
else:
|
|
|
result["error_code"] = getattr(err_code, "name", str(err_code))
|
|
|
err_msg = getattr(status, "msg", None)
|
|
|
if err_msg:
|
|
|
result["error_msg"] = str(err_msg)
|
|
|
|
|
|
engage = getattr(header, "engage_advice", None)
|
|
|
if engage is not None:
|
|
|
advice = getattr(engage, "advice", None)
|
|
|
if advice is not None:
|
|
|
result["engage_advice"] = getattr(advice, "name", str(advice))
|
|
|
reason = getattr(engage, "reason", None)
|
|
|
if reason:
|
|
|
result["engage_reason"] = str(reason)
|
|
|
|
|
|
# Debug 嵌套消息 — 轨迹跟踪精度指标
|
|
|
debug = getattr(msg, "debug", None)
|
|
|
if debug is not None:
|
|
|
lon = getattr(debug, "simple_lon_debug", None)
|
|
|
if lon is not None:
|
|
|
result["station_error"] = self._get_float(lon, "station_error", 0)
|
|
|
result["speed_error"] = self._get_float(lon, "speed_error", 0)
|
|
|
lat = getattr(debug, "simple_lat_debug", None)
|
|
|
if lat is not None:
|
|
|
result["lateral_error"] = self._get_float(lat, "lateral_error", 0)
|
|
|
result["heading_error"] = self._get_float(lat, "heading_error", 0)
|
|
|
|
|
|
return result
|
|
|
|
|
|
def _parse_control_via_cyber(self):
|
|
|
"""使用 cyber.ChannelUtils 解析 control 原始数据。
|
|
|
|
|
|
参考 cyber_channel echo 的实现:
|
|
|
msgtype = cyber.ChannelUtils.get_msgtype(channel_name, 0).decode('utf-8')
|
|
|
print(cyber.ChannelUtils.get_debugstring_rawmsgdata(msgtype, raw_data).decode('utf-8'))
|
|
|
|
|
|
ControlCommand proto debug string 字段名:
|
|
|
speed: ... (m/s)
|
|
|
acceleration: ... (m/s²)
|
|
|
steering_target: ... (%)
|
|
|
throttle: ... (%)
|
|
|
brake: ... (%)
|
|
|
"""
|
|
|
try:
|
|
|
from cyber.python.cyber_py3 import cyber
|
|
|
raw_type = cyber.ChannelUtils.get_msgtype("/apollo/control", sleep_s=0)
|
|
|
msg_type = raw_type.decode("utf-8") if isinstance(raw_type, bytes) else str(raw_type)
|
|
|
debug_str = cyber.ChannelUtils.get_debugstring_rawmsgdata(msg_type, self._control_raw)
|
|
|
text = debug_str.decode("utf-8") if isinstance(debug_str, bytes) else str(debug_str)
|
|
|
|
|
|
result = {
|
|
|
"speed": 0, "accel": 0, "steer": 0, "gear": "",
|
|
|
"throttle": 0, "brake": 0,
|
|
|
}
|
|
|
|
|
|
for line in text.split("\n"):
|
|
|
line = line.strip()
|
|
|
if line.startswith("speed:"):
|
|
|
try:
|
|
|
result["speed"] = float(line.split(":")[1].strip())
|
|
|
except (ValueError, IndexError):
|
|
|
pass
|
|
|
elif line.startswith("acceleration:"):
|
|
|
try:
|
|
|
result["accel"] = float(line.split(":")[1].strip())
|
|
|
except (ValueError, IndexError):
|
|
|
pass
|
|
|
elif line.startswith("steering_target:"):
|
|
|
try:
|
|
|
result["steer"] = float(line.split(":")[1].strip())
|
|
|
except (ValueError, IndexError):
|
|
|
pass
|
|
|
elif line.startswith("throttle:"):
|
|
|
try:
|
|
|
result["throttle"] = float(line.split(":")[1].strip())
|
|
|
except (ValueError, IndexError):
|
|
|
pass
|
|
|
elif line.startswith("brake:"):
|
|
|
try:
|
|
|
result["brake"] = float(line.split(":")[1].strip())
|
|
|
except (ValueError, IndexError):
|
|
|
pass
|
|
|
|
|
|
return result
|
|
|
except Exception as e:
|
|
|
logger.debug(f"Failed to parse control via cyber: {e}")
|
|
|
return None
|
|
|
|
|
|
def _parse_chassis(self):
|
|
|
"""解析 chassis protobuf。
|
|
|
|
|
|
Chassis proto 主要字段(chassis.proto):
|
|
|
- speed_mps (field 3, m/s)
|
|
|
- acceleration (field 30, m/s²)
|
|
|
- steering_percentage (field 8, %)
|
|
|
- gear_location (field 20, GearPosition enum)
|
|
|
- driving_mode (field 19, DrivingMode enum)
|
|
|
"""
|
|
|
if self._chassis_raw is None:
|
|
|
return None
|
|
|
|
|
|
# 优先使用 protobuf 类解析
|
|
|
if self._chassis_class is not None:
|
|
|
try:
|
|
|
msg = self._chassis_class()
|
|
|
msg.ParseFromString(self._chassis_raw)
|
|
|
return {
|
|
|
"speed": self._get_float(msg, "speed_mps", 0),
|
|
|
"accel": self._get_float(msg, "acceleration", 0),
|
|
|
"steer": self._get_float(msg, "steering_percentage", 0),
|
|
|
"gear": self._get_str(msg, "gear_location", ""),
|
|
|
"driving_mode": self._get_str(msg, "driving_mode", ""),
|
|
|
}
|
|
|
except Exception as e:
|
|
|
logger.debug(f"Failed to parse chassis via protobuf: {e}")
|
|
|
|
|
|
# 回退:使用 cyber ChannelUtils 解析
|
|
|
return self._parse_chassis_via_cyber()
|
|
|
|
|
|
def _parse_chassis_via_cyber(self):
|
|
|
"""使用 cyber.ChannelUtils 解析 chassis 原始数据。"""
|
|
|
try:
|
|
|
from cyber.python.cyber_py3 import cyber
|
|
|
raw_type = cyber.ChannelUtils.get_msgtype("/apollo/canbus/chassis", sleep_s=0)
|
|
|
msg_type = raw_type.decode("utf-8") if isinstance(raw_type, bytes) else str(raw_type)
|
|
|
debug_str = cyber.ChannelUtils.get_debugstring_rawmsgdata(msg_type, self._chassis_raw)
|
|
|
text = debug_str.decode("utf-8") if isinstance(debug_str, bytes) else str(debug_str)
|
|
|
|
|
|
result = {
|
|
|
"speed": 0, "accel": 0, "steer": 0,
|
|
|
"gear": "", "driving_mode": "",
|
|
|
}
|
|
|
|
|
|
for line in text.split("\n"):
|
|
|
line = line.strip()
|
|
|
if line.startswith("speed_mps:"):
|
|
|
try:
|
|
|
result["speed"] = float(line.split(":")[1].strip())
|
|
|
except (ValueError, IndexError):
|
|
|
pass
|
|
|
elif line.startswith("steering_percentage:"):
|
|
|
try:
|
|
|
result["steer"] = float(line.split(":")[1].strip())
|
|
|
except (ValueError, IndexError):
|
|
|
pass
|
|
|
elif line.startswith("driving_mode:"):
|
|
|
result["driving_mode"] = line.split(":")[1].strip()
|
|
|
elif line.startswith("gear_location:"):
|
|
|
result["gear"] = line.split(":")[1].strip()
|
|
|
|
|
|
return result
|
|
|
except Exception as e:
|
|
|
logger.debug(f"Failed to parse chassis via cyber: {e}")
|
|
|
return None
|
|
|
|
|
|
def _get_float(self, msg, *names, default=0):
|
|
|
for name in names:
|
|
|
val = getattr(msg, name, None)
|
|
|
if val is not None:
|
|
|
try:
|
|
|
return float(val)
|
|
|
except (TypeError, ValueError):
|
|
|
pass
|
|
|
return default
|
|
|
|
|
|
def _get_str(self, msg, *names, default=""):
|
|
|
for name in names:
|
|
|
val = getattr(msg, name, None)
|
|
|
if val is not None:
|
|
|
if hasattr(val, "name"):
|
|
|
return val.name
|
|
|
return str(val)
|
|
|
return default
|
|
|
|
|
|
def run_once(self) -> SafetyStatus:
|
|
|
# 复用 ChannelFreqChecker 已收到的 raw data(同一节点无法重复订阅同一通道)
|
|
|
if self._freq_checker is not None:
|
|
|
self._control_raw = self._freq_checker._latest_data.get("/apollo/control")
|
|
|
self._control_time = self._freq_checker._latest_time.get("/apollo/control", 0)
|
|
|
self._chassis_raw = self._freq_checker._latest_data.get("/apollo/canbus/chassis")
|
|
|
self._chassis_time = self._freq_checker._latest_time.get("/apollo/canbus/chassis", 0)
|
|
|
|
|
|
if self._control_raw is None or self._chassis_raw is None:
|
|
|
return SafetyStatus(
|
|
|
name="ControlDeviationCheck", source=self.name,
|
|
|
level=SafetyLevel.ERROR,
|
|
|
message="No control or chassis data available",
|
|
|
)
|
|
|
|
|
|
ctrl_elapsed = time.time() - self._control_time
|
|
|
chassis_elapsed = time.time() - self._chassis_time
|
|
|
|
|
|
if ctrl_elapsed > self._data_timeout:
|
|
|
return SafetyStatus(
|
|
|
name="ControlDeviationCheck", source=self.name,
|
|
|
level=SafetyLevel.ERROR,
|
|
|
message=f"Control data timeout ({ctrl_elapsed:.1f}s)",
|
|
|
)
|
|
|
if chassis_elapsed > self._data_timeout:
|
|
|
return SafetyStatus(
|
|
|
name="ControlDeviationCheck", source=self.name,
|
|
|
level=SafetyLevel.ERROR,
|
|
|
message=f"Chassis data timeout ({chassis_elapsed:.1f}s)",
|
|
|
)
|
|
|
|
|
|
control = self._parse_control()
|
|
|
chassis = self._parse_chassis()
|
|
|
|
|
|
# 如果解析失败,但原始数据已到达,报告数据到达状态
|
|
|
if control is None or chassis is None:
|
|
|
details_raw = {
|
|
|
"control_bytes": len(self._control_raw) if self._control_raw else 0,
|
|
|
"chassis_bytes": len(self._chassis_raw) if self._chassis_raw else 0,
|
|
|
}
|
|
|
return SafetyStatus(
|
|
|
name="ControlDeviationCheck", source=self.name,
|
|
|
level=SafetyLevel.WARN,
|
|
|
message=f"Data received but parse failed (control={'OK' if control else 'FAIL'} chassis={'OK' if chassis else 'FAIL'})",
|
|
|
details=details_raw,
|
|
|
tags=["control"],
|
|
|
)
|
|
|
|
|
|
# 检查控制模块内部错误
|
|
|
error_code = control.get("error_code", "")
|
|
|
error_msg = control.get("error_msg", "")
|
|
|
engage_advice = control.get("engage_advice", "")
|
|
|
|
|
|
# 检查驾驶模式
|
|
|
chassis_mode = chassis.get("driving_mode", "")
|
|
|
if chassis_mode in ("EXCEPTION", "MANUAL"):
|
|
|
return SafetyStatus(
|
|
|
name="ControlDeviationCheck", source=self.name,
|
|
|
level=SafetyLevel.ERROR,
|
|
|
message=f"Chassis in {chassis_mode} mode",
|
|
|
details={"driving_mode": chassis_mode},
|
|
|
tags=["control"],
|
|
|
)
|
|
|
|
|
|
details = {
|
|
|
"speed": control["speed"],
|
|
|
"chassis_speed": chassis["speed"],
|
|
|
"steer": control["steer"],
|
|
|
"gear": control["gear"],
|
|
|
"throttle": control["throttle"],
|
|
|
"brake": control["brake"],
|
|
|
}
|
|
|
|
|
|
# Debug 跟踪精度指标
|
|
|
lateral_error = control.get("lateral_error")
|
|
|
heading_error = control.get("heading_error")
|
|
|
station_error = control.get("station_error")
|
|
|
speed_error = control.get("speed_error")
|
|
|
if lateral_error is not None:
|
|
|
details["lateral_error_m"] = round(lateral_error, 3)
|
|
|
if heading_error is not None:
|
|
|
details["heading_error_deg"] = round(math.degrees(heading_error), 1)
|
|
|
if station_error is not None:
|
|
|
details["station_error_m"] = round(station_error, 3)
|
|
|
if speed_error is not None:
|
|
|
details["speed_error_mps"] = round(speed_error, 3)
|
|
|
|
|
|
if error_code:
|
|
|
details["error_code"] = error_code
|
|
|
if error_msg:
|
|
|
details["error_msg"] = error_msg
|
|
|
if engage_advice:
|
|
|
details["engage_advice"] = engage_advice
|
|
|
|
|
|
# 上报控制模块计算错误(通常由上游模块问题导致)
|
|
|
if error_code and error_code != "OK":
|
|
|
logger.warning(f"Control module error: {error_code} — {error_msg}")
|
|
|
return SafetyStatus(
|
|
|
name="ControlDeviationCheck", source=self.name,
|
|
|
level=SafetyLevel.ERROR,
|
|
|
message=f"Control {error_code}: {error_msg}",
|
|
|
details=details,
|
|
|
tags=["control"],
|
|
|
)
|
|
|
|
|
|
if engage_advice and "ENGAGE" not in engage_advice:
|
|
|
reason = control.get("engage_reason", "")
|
|
|
logger.info(f"Control engage_advice={engage_advice} {reason}")
|
|
|
|
|
|
# 计算偏差(所有值单位已在 proto 中统一)
|
|
|
# speed: ControlCommand.speed (m/s) ↔ Chassis.speed_mps (m/s)
|
|
|
# acceleration: ControlCommand.acceleration (m/s²) ↔ Chassis.acceleration (m/s²)
|
|
|
# steer: ControlCommand.steering_target (%) ↔ Chassis.steering_percentage (%)
|
|
|
issues = []
|
|
|
ctrl_speed = control["speed"] # m/s
|
|
|
chas_speed = chassis["speed"] # m/s
|
|
|
|
|
|
# 当 ControlCommand.speed 为 0 但车辆明显在行驶时跳过偏差检查
|
|
|
# 该字段在本版本 Apollo 中可能未被 populate(debug 用途),
|
|
|
# 此时 throttle/brake 才是实际控制输出。
|
|
|
skip_speed_check = (ctrl_speed == 0.0 and chas_speed > 1.0)
|
|
|
|
|
|
if not skip_speed_check:
|
|
|
speed_diff_kph = abs(ctrl_speed - chas_speed) * 3.6
|
|
|
if speed_diff_kph > self._speed_threshold:
|
|
|
level = SafetyLevel.ERROR if speed_diff_kph > self._speed_threshold * 2 else SafetyLevel.WARN
|
|
|
issues.append((level, f"Speed deviation: {speed_diff_kph:.1f} kph "
|
|
|
f"(cmd={ctrl_speed:.1f} m/s, actual={chas_speed:.1f} m/s)"))
|
|
|
else:
|
|
|
speed_diff_kph = 0.0
|
|
|
|
|
|
ctrl_accel = control["accel"] # m/s²
|
|
|
chas_accel = chassis["accel"] # m/s²
|
|
|
accel_diff = abs(ctrl_accel - chas_accel)
|
|
|
if accel_diff > self._accel_threshold:
|
|
|
level = SafetyLevel.ERROR if accel_diff > self._accel_threshold * 2 else SafetyLevel.WARN
|
|
|
issues.append((level, f"Accel deviation: {accel_diff:.1f} m/s2"))
|
|
|
|
|
|
ctrl_steer = control["steer"]
|
|
|
chas_steer = chassis["steer"]
|
|
|
steer_diff = abs(ctrl_steer - chas_steer)
|
|
|
if steer_diff > self._steer_threshold:
|
|
|
level = SafetyLevel.ERROR if steer_diff > self._steer_threshold * 2 else SafetyLevel.WARN
|
|
|
issues.append((level, f"Steer deviation: {steer_diff:.1f}%"))
|
|
|
|
|
|
# 轨迹跟踪偏差
|
|
|
if lateral_error is not None and lateral_error > self._lat_error_threshold:
|
|
|
level = (SafetyLevel.ERROR
|
|
|
if lateral_error > self._lat_error_threshold * 2
|
|
|
else SafetyLevel.WARN)
|
|
|
issues.append((level, f"Lateral error: {lateral_error:.3f}m"))
|
|
|
if heading_error is not None and math.degrees(heading_error) > self._heading_error_threshold:
|
|
|
h_deg = math.degrees(heading_error)
|
|
|
level = (SafetyLevel.ERROR
|
|
|
if h_deg > self._heading_error_threshold * 2
|
|
|
else SafetyLevel.WARN)
|
|
|
issues.append((level, f"Heading error: {h_deg:.1f}°"))
|
|
|
if station_error is not None and abs(station_error) > self._station_error_threshold:
|
|
|
se = abs(station_error)
|
|
|
level = (SafetyLevel.ERROR
|
|
|
if se > self._station_error_threshold * 2
|
|
|
else SafetyLevel.WARN)
|
|
|
issues.append((level, f"Station error: {station_error:.3f}m"))
|
|
|
if speed_error is not None and abs(speed_error) * 3.6 > self._speed_error_threshold:
|
|
|
se_kph = abs(speed_error) * 3.6
|
|
|
level = (SafetyLevel.ERROR
|
|
|
if se_kph > self._speed_error_threshold * 2
|
|
|
else SafetyLevel.WARN)
|
|
|
issues.append((level, f"Speed tracking error: {se_kph:.1f}kph"))
|
|
|
|
|
|
if issues:
|
|
|
worst = max(issues, key=lambda x: x[0])
|
|
|
msgs = [m for _, m in issues]
|
|
|
logger.warning(f"Control deviation: {'; '.join(msgs)}")
|
|
|
return SafetyStatus(
|
|
|
name="ControlDeviationCheck", source=self.name,
|
|
|
level=worst[0],
|
|
|
message=f"Control deviation: {'; '.join(msgs)}",
|
|
|
details=details,
|
|
|
tags=["control"],
|
|
|
)
|
|
|
|
|
|
logger.info(f"Control deviation OK (speed_diff={speed_diff_kph:.1f}kph, "
|
|
|
f"accel_diff={accel_diff:.1f}, steer_diff={steer_diff:.1f})")
|
|
|
return SafetyStatus(
|
|
|
name="ControlDeviationCheck", source=self.name,
|
|
|
level=SafetyLevel.OK,
|
|
|
message=f"Control deviation OK (speed_diff={speed_diff_kph:.1f}kph)",
|
|
|
details=details,
|
|
|
tags=["control"],
|
|
|
)
|