You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

78 lines
2.9 KiB

from typing import Callable, List, Dict
from models import SafetyStatus, SafetyLevel
import threading
import time
import logging
# Module-level logger used by SafetyExporter; it is registered under the fixed
# name "Exporter" (not the module's __name__), so logging configuration must
# target that name.
logger = logging.getLogger("Exporter")
class SafetyExporter:
    """Deliver safety statuses to subscriber callbacks through an N-of-M filter.

    A status handed to :meth:`report` is broadcast immediately when its level
    is ``SafetyLevel.FATAL`` or when the caller bypasses the filter.  Every
    other status is appended to a per-name sliding window and is only
    broadcast while at least ``required_cycles`` of the last ``total_cycles``
    entries for that name are not ``SafetyLevel.OK`` (defaults: 3 of 5).

    Subscribers may be plain callables or callables returning an awaitable.

    NOTE(review): ``report_interval``, ``_pending_reports`` and ``_running``
    are stored but nothing in this class reads them; periodic reporting
    presumably lives elsewhere or is not implemented yet -- confirm.
    """

    def __init__(self, node_id: str, report_interval_ms: int = 1000):
        """Create an exporter with no subscribers and the default 3-of-5 filter.

        Args:
            node_id: Identifier stored on the instance as ``node_id``.
            report_interval_ms: Interval in milliseconds; stored in seconds
                as ``report_interval``.
        """
        self.node_id = node_id
        self.report_interval = report_interval_ms / 1000.0
        self._subscribers: List[Callable] = []
        self._pending_reports: List[SafetyStatus] = []
        self._running = False
        self._default_filter = {"required_cycles": 3, "total_cycles": 5}
        self._status_history: Dict[str, List[SafetyStatus]] = {}
        # Guards _status_history and _default_filter.  It is a plain
        # (non-reentrant) Lock, so subscriber callbacks must never be invoked
        # while it is held.
        self._lock = threading.Lock()
        # Strong references to tasks scheduled for async subscribers; the
        # event loop itself only keeps weak references to tasks.
        self._background_tasks: set = set()

    @staticmethod
    def _callback_name(callback) -> str:
        """Return a printable name for *callback*, even if it lacks ``__name__``.

        ``functools.partial`` objects and callable instances have no
        ``__name__``; fall back to ``repr`` so logging can never raise.
        """
        return getattr(callback, "__name__", None) or repr(callback)

    def subscribe(self, callback: Callable):
        """Register *callback* to be called with every broadcast status.

        Args:
            callback: Callable taking one status argument.  It may return an
                awaitable, which is then scheduled or run to completion by
                the broadcast.
        """
        self._subscribers.append(callback)
        logger.info("Subscriber registered: %s", self._callback_name(callback))

    def set_default_filter(self, required: int, total: int):
        """Replace the N-of-M filter applied to non-fatal reports.

        Args:
            required: Minimum number of non-OK entries in the window needed
                for a report to be broadcast.
            total: Window length, i.e. how many recent statuses are kept per
                status name.
        """
        if required > max(total, 0):
            # The window never holds more than `total` entries, so this
            # threshold is unreachable.  Accept it for compatibility, but do
            # not let it pass silently.
            logger.warning(
                "Filter %s/%s can never trip; filtered reports will be suppressed",
                required,
                total,
            )
        with self._lock:
            self._default_filter = {"required_cycles": required, "total_cycles": total}

    def report(self, status: SafetyStatus, bypass_filter: bool = False):
        """Submit *status*, broadcasting it if it passes the filter.

        FATAL statuses and calls with ``bypass_filter=True`` are broadcast
        unchanged and are not recorded in the history.  Any other status is
        added to the sliding window for ``status.name``; when the window
        holds at least the required number of non-OK entries, a new status
        with a ``[Filtered]``-prefixed message and hit counters in
        ``details`` is broadcast, otherwise the report is only logged at
        debug level.

        NOTE(review): the broadcast status carries the level of the current
        report and a fresh ``details`` dict, so an OK report arriving while
        the window still holds enough anomalies goes out with level OK, and
        any details on the incoming status are not forwarded -- confirm both
        are intended.

        Args:
            status: The status to report.
            bypass_filter: Broadcast immediately, skipping the filter.
        """
        if bypass_filter or status.level == SafetyLevel.FATAL:
            self._broadcast(status)
            return

        with self._lock:
            # Read the filter once so both values come from the same setting.
            active_filter = self._default_filter
            window = active_filter["total_cycles"]
            required = active_filter["required_cycles"]

            history = self._status_history.setdefault(status.name, [])
            history.append(status)
            # Trim to the window in one step so that a window shrunk via
            # set_default_filter() takes effect on the very next report.
            overflow = len(history) - window
            if overflow > 0:
                del history[:overflow]

            anomaly_count = sum(1 for s in history if s.level != SafetyLevel.OK)
            window_fill = len(history)

        # Everything below runs without the lock: subscribers may be slow or
        # may call back into report(), which would deadlock on the
        # non-reentrant lock.
        if anomaly_count < required:
            logger.debug("Filtering: %s (%s/%s)", status.name, anomaly_count, required)
            return

        upgraded = SafetyStatus(
            name=status.name,
            source=status.source,
            level=status.level,
            message=f"[Filtered] {status.message} ({anomaly_count}/{required})",
            details={"filter_hit": anomaly_count, "filter_total": window_fill},
        )
        self._broadcast(upgraded)

    def _broadcast(self, status: SafetyStatus):
        """Call every subscriber with *status*, isolating their failures.

        An exception raised by one subscriber is logged and does not stop
        delivery to the others.
        """
        # Iterate over a snapshot so a callback that subscribes during the
        # broadcast cannot change the sequence being iterated.
        for cb in tuple(self._subscribers):
            cb_name = self._callback_name(cb)
            try:
                result = cb(status)
                # Async subscribers hand back an awaitable instead of running.
                if hasattr(result, "__await__"):
                    self._run_awaitable(cb_name, result)
            except Exception as e:
                logger.error("Broadcast error to %s: %s", cb_name, e)

    def _run_awaitable(self, cb_name: str, awaitable) -> None:
        """Drive the awaitable returned by an async subscriber.

        If an event loop is already running in the calling thread the
        awaitable is scheduled on it and this method returns at once; its
        outcome is logged when it finishes.  Otherwise the awaitable is run
        to completion on the thread's current event loop, creating and
        installing a new loop when there is none or it has been closed.

        Raises:
            Exception: Whatever the awaitable raises when it is run to
                completion synchronously.
        """
        import asyncio

        try:
            asyncio.get_running_loop()
        except RuntimeError:
            pass
        else:
            task = asyncio.ensure_future(awaitable)
            # Hold a reference until completion; otherwise the task can be
            # garbage-collected mid-flight and its exception is never seen.
            self._background_tasks.add(task)
            task.add_done_callback(lambda done: self._finish_task(cb_name, done))
            return

        loop = None
        try:
            # May emit a DeprecationWarning on newer Pythons when no loop is
            # set; kept so an already-installed loop continues to be reused.
            loop = asyncio.get_event_loop()
        except RuntimeError:
            pass
        if loop is None or loop.is_closed():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        loop.run_until_complete(awaitable)

    def _finish_task(self, cb_name: str, task) -> None:
        """Drop a finished background task and log the error it raised, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error("Broadcast error to %s: %s", cb_name, error)

    def start(self):
        """Set the running flag to True."""
        self._running = True

    def stop(self):
        """Set the running flag to False."""
        self._running = False