You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
88 lines
2.7 KiB
88 lines
2.7 KiB
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from typing import Optional, List
|
|
from datetime import datetime
|
|
|
|
|
|
_LEVEL_ORDER = {"OK": 0, "WARN": 1, "ERROR": 2, "FATAL": 3}
|
|
|
|
|
|
class RiskLevel(Enum):
|
|
OK = "OK"
|
|
WARN = "WARN"
|
|
ERROR = "ERROR"
|
|
FATAL = "FATAL"
|
|
|
|
def __lt__(self, other):
|
|
if self.__class__ is other.__class__:
|
|
return _LEVEL_ORDER[self.value] < _LEVEL_ORDER[other.value]
|
|
return NotImplemented
|
|
|
|
def __gt__(self, other):
|
|
if self.__class__ is other.__class__:
|
|
return _LEVEL_ORDER[self.value] > _LEVEL_ORDER[other.value]
|
|
return NotImplemented
|
|
|
|
def __le__(self, other):
|
|
if self.__class__ is other.__class__:
|
|
return _LEVEL_ORDER[self.value] <= _LEVEL_ORDER[other.value]
|
|
return NotImplemented
|
|
|
|
def __ge__(self, other):
|
|
if self.__class__ is other.__class__:
|
|
return _LEVEL_ORDER[self.value] >= _LEVEL_ORDER[other.value]
|
|
return NotImplemented
|
|
|
|
@classmethod
|
|
def from_str(cls, level_str: str) -> "RiskLevel":
|
|
return cls[level_str]
|
|
|
|
|
|
@dataclass
|
|
class DetectedObject:
|
|
obstacle_id: int
|
|
obstacle_type: str
|
|
speed_mps: float # measured speed in m/s
|
|
speed_kph: float # measured speed in km/h
|
|
position_x: float
|
|
position_y: float
|
|
distance: float # distance from ego (m)
|
|
tracking_time: float
|
|
level: RiskLevel = RiskLevel.OK # speed rating for this object
|
|
|
|
def to_dict(self) -> dict:
|
|
return {
|
|
"id": self.obstacle_id,
|
|
"type": self.obstacle_type,
|
|
"speed_kph": round(self.speed_kph, 1),
|
|
"speed_level": self.level.value,
|
|
"position": {"x": round(self.position_x, 1), "y": round(self.position_y, 1)},
|
|
"distance_m": round(self.distance, 1),
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class SpeedAlert:
|
|
"""Alert generated when a detected object exceeds speed thresholds."""
|
|
level: RiskLevel
|
|
primary_object: Optional[DetectedObject] = None
|
|
objects: List[DetectedObject] = field(default_factory=list)
|
|
total_objects: int = 0
|
|
speeding_count: int = 0
|
|
message: str = ""
|
|
timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
|
|
|
|
def to_dict(self) -> dict:
|
|
d = {
|
|
"level": self.level.value,
|
|
"total_objects": self.total_objects,
|
|
"speeding_count": self.speeding_count,
|
|
"message": self.message,
|
|
"timestamp": self.timestamp,
|
|
}
|
|
if self.primary_object:
|
|
d["fastest"] = self.primary_object.to_dict()
|
|
if self.objects:
|
|
d["objects"] = [o.to_dict() for o in self.objects]
|
|
return d
|