You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
54 lines
1.7 KiB
54 lines
1.7 KiB
from models import AnalyzerDecision, AggregatedStatus, SafetyLevel
|
|
from typing import Dict, List
|
|
import logging
|
|
|
|
logger = logging.getLogger("Aggregator")
|
|
|
|
|
|
class ModuleAggregator:
|
|
MODULE_MAPPING = {
|
|
"ChannelFreq": "communication",
|
|
"ControlDeviation": "control",
|
|
"Collision": "communication",
|
|
"FileExist": "system",
|
|
"Latency": "communication",
|
|
"Localization": "communication",
|
|
"Process": "system",
|
|
"Resource": "resource",
|
|
"Battery": "vehicle",
|
|
"SystemHealth": "system",
|
|
"GnssSignal": "communication",
|
|
"TrafficLight": "perception",
|
|
}
|
|
|
|
def __init__(self):
|
|
self._module_decisions: Dict[str, List[AnalyzerDecision]] = {}
|
|
|
|
def add_decision(self, decision: AnalyzerDecision):
|
|
module = "unknown"
|
|
for prefix, mod in self.MODULE_MAPPING.items():
|
|
if decision.source.startswith(prefix):
|
|
module = mod
|
|
break
|
|
|
|
self._module_decisions.setdefault(module, []).append(decision)
|
|
logger.debug(f"Aggregated: {decision.source} -> module:{module}")
|
|
|
|
def clear(self):
|
|
self._module_decisions.clear()
|
|
|
|
def get_aggregated_status(self) -> List[AggregatedStatus]:
|
|
results = []
|
|
for module, decisions in self._module_decisions.items():
|
|
if not decisions:
|
|
continue
|
|
max_level = max(d.level for d in decisions)
|
|
summary = f"{module}: {len(decisions)} checks, worst: {max_level.value}"
|
|
results.append(AggregatedStatus(
|
|
module=module,
|
|
overall_level=max_level,
|
|
decisions=decisions.copy(),
|
|
summary=summary
|
|
))
|
|
return results
|