You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

88 lines
2.7 KiB

from dataclasses import dataclass, field
from enum import Enum
from typing import Optional, List
from datetime import datetime
_LEVEL_ORDER = {"OK": 0, "WARN": 1, "ERROR": 2, "FATAL": 3}
class RiskLevel(Enum):
    """Risk levels that can be compared with ``<``, ``>``, ``<=`` and ``>=``.

    Ordering is taken from the module-level ``_LEVEL_ORDER`` table, looked up
    by each member's string value. Every comparison returns
    ``NotImplemented`` when the other operand is not of the same class.
    """

    OK = "OK"
    WARN = "WARN"
    ERROR = "ERROR"
    FATAL = "FATAL"

    def __lt__(self, other):
        """Return True when this level ranks strictly below *other*."""
        if self.__class__ is not other.__class__:
            return NotImplemented
        mine, theirs = _LEVEL_ORDER[self.value], _LEVEL_ORDER[other.value]
        return mine < theirs

    def __gt__(self, other):
        """Return True when this level ranks strictly above *other*."""
        if self.__class__ is not other.__class__:
            return NotImplemented
        mine, theirs = _LEVEL_ORDER[self.value], _LEVEL_ORDER[other.value]
        return mine > theirs

    def __le__(self, other):
        """Return True when this level ranks at or below *other*."""
        if self.__class__ is not other.__class__:
            return NotImplemented
        mine, theirs = _LEVEL_ORDER[self.value], _LEVEL_ORDER[other.value]
        return mine <= theirs

    def __ge__(self, other):
        """Return True when this level ranks at or above *other*."""
        if self.__class__ is not other.__class__:
            return NotImplemented
        mine, theirs = _LEVEL_ORDER[self.value], _LEVEL_ORDER[other.value]
        return mine >= theirs

    @classmethod
    def from_str(cls, level_str: str) -> "RiskLevel":
        """Return the member whose name is *level_str* (e.g. ``"WARN"``).

        Raises:
            KeyError: if *level_str* is not the name of a member.
        """
        return cls[level_str]
@dataclass
class DetectedObject:
    """One detected obstacle together with its measured state."""

    obstacle_id: int
    obstacle_type: str
    speed_mps: float  # measured speed, in m/s
    speed_kph: float  # measured speed, in km/h
    position_x: float
    position_y: float
    distance: float  # distance from ego, in metres
    tracking_time: float
    level: RiskLevel = RiskLevel.OK  # speed rating assigned to this object

    def to_dict(self) -> dict:
        """Return a plain-dict summary of this object.

        Speed (km/h), position and distance are rounded to one decimal place
        and the level is emitted as its string value. ``speed_mps`` and
        ``tracking_time`` are not part of the output.
        """
        payload: dict = {"id": self.obstacle_id, "type": self.obstacle_type}
        payload["speed_kph"] = round(self.speed_kph, 1)
        payload["speed_level"] = self.level.value
        payload["position"] = {
            "x": round(self.position_x, 1),
            "y": round(self.position_y, 1),
        }
        payload["distance_m"] = round(self.distance, 1)
        return payload
@dataclass
class SpeedAlert:
    """Alert generated when a detected object exceeds speed thresholds."""

    level: RiskLevel
    # Serialized under the "fastest" key by to_dict() when set.
    primary_object: Optional[DetectedObject] = None
    objects: List[DetectedObject] = field(default_factory=list)
    total_objects: int = 0
    speeding_count: int = 0
    message: str = ""
    # POSIX timestamp (seconds) captured when the alert instance is created.
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())

    def to_dict(self) -> dict:
        """Return a plain-dict representation of this alert.

        The result always contains ``level`` (as its string value),
        ``total_objects``, ``speeding_count``, ``message`` and ``timestamp``.
        ``fastest`` is added when ``primary_object`` is set, and ``objects``
        is added only when the object list is non-empty.
        """
        d = {
            "level": self.level.value,
            "total_objects": self.total_objects,
            "speeding_count": self.speeding_count,
            "message": self.message,
            "timestamp": self.timestamp,
        }
        # Check presence explicitly rather than truthiness, so an object that
        # happens to evaluate as falsy is still serialized.
        if self.primary_object is not None:
            d["fastest"] = self.primary_object.to_dict()
        # An empty list is deliberately omitted from the payload.
        if self.objects:
            d["objects"] = [o.to_dict() for o in self.objects]
        return d