You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
78 lines
2.9 KiB
78 lines
2.9 KiB
from typing import Callable, List, Dict
|
|
from models import SafetyStatus, SafetyLevel
|
|
import threading
|
|
import time
|
|
import logging
|
|
|
|
logger = logging.getLogger("Exporter")
|
|
|
|
|
|
class SafetyExporter:
|
|
def __init__(self, node_id: str, report_interval_ms: int = 1000):
|
|
self.node_id = node_id
|
|
self.report_interval = report_interval_ms / 1000.0
|
|
self._subscribers: List[Callable] = []
|
|
self._pending_reports: List[SafetyStatus] = []
|
|
self._running = False
|
|
self._default_filter = {"required_cycles": 3, "total_cycles": 5}
|
|
self._status_history: Dict[str, List[SafetyStatus]] = {}
|
|
self._lock = threading.Lock()
|
|
|
|
def subscribe(self, callback: Callable):
|
|
self._subscribers.append(callback)
|
|
logger.info(f"Subscriber registered: {callback.__name__}")
|
|
|
|
def set_default_filter(self, required: int, total: int):
|
|
self._default_filter = {"required_cycles": required, "total_cycles": total}
|
|
|
|
def report(self, status: SafetyStatus, bypass_filter: bool = False):
|
|
if bypass_filter or status.level == SafetyLevel.FATAL:
|
|
self._broadcast(status)
|
|
return
|
|
|
|
with self._lock:
|
|
history = self._status_history.setdefault(status.name, [])
|
|
history.append(status)
|
|
max_len = self._default_filter["total_cycles"]
|
|
if len(history) > max_len:
|
|
history.pop(0)
|
|
|
|
anomaly_count = sum(1 for s in history if s.level != SafetyLevel.OK)
|
|
required = self._default_filter["required_cycles"]
|
|
|
|
if anomaly_count >= required:
|
|
upgraded = SafetyStatus(
|
|
name=status.name, source=status.source,
|
|
level=status.level,
|
|
message=f"[Filtered] {status.message} ({anomaly_count}/{required})",
|
|
details={"filter_hit": anomaly_count, "filter_total": len(history)},
|
|
)
|
|
self._broadcast(upgraded)
|
|
else:
|
|
logger.debug(f"Filtering: {status.name} ({anomaly_count}/{required})")
|
|
|
|
def _broadcast(self, status: SafetyStatus):
|
|
for cb in self._subscribers:
|
|
try:
|
|
result = cb(status)
|
|
# Handle coroutine results (if subscriber is async)
|
|
if result is not None and hasattr(result, '__await__'):
|
|
import asyncio
|
|
try:
|
|
loop = asyncio.get_event_loop()
|
|
except RuntimeError:
|
|
loop = asyncio.new_event_loop()
|
|
asyncio.set_event_loop(loop)
|
|
if loop.is_running():
|
|
asyncio.ensure_future(result)
|
|
else:
|
|
loop.run_until_complete(result)
|
|
except Exception as e:
|
|
logger.error(f"Broadcast error to {cb.__name__}: {e}")
|
|
|
|
def start(self):
|
|
self._running = True
|
|
|
|
def stop(self):
|
|
self._running = False
|