You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

95 lines
3.6 KiB

from typing import Dict, List, Optional
from models import SafetyStatus, AnalyzerDecision, SafetyLevel
from config_manager import ConfigManager
import re
import time
import logging
logger = logging.getLogger("Analyzer")
class RuleMatcher:
def __init__(self, rules_config: List[Dict]):
self.rules = []
for rule_cfg in rules_config:
matcher = rule_cfg.get("matcher", {})
rule = rule_cfg.get("rule", {})
self.rules.append({"matcher": matcher, "rule": rule})
def match(self, status: SafetyStatus) -> Optional[Dict]:
for item in self.rules:
matcher = item["matcher"]
if "name" in matcher:
names = matcher["name"] if isinstance(matcher["name"], list) else [matcher["name"]]
if status.name in names:
return item["rule"]
if "contains" in matcher:
if any(c in status.name for c in matcher["contains"]):
return item["rule"]
if "regex" in matcher:
if any(re.match(p, status.name) for p in matcher["regex"]):
return item["rule"]
if "tags_contains_one" in matcher:
if set(status.tags) & set(matcher["tags_contains_one"]):
return item["rule"]
return None
class Analyzer:
def __init__(self, config_manager: ConfigManager):
self.config = config_manager
rules_cfg = config_manager.get("analyzer_rules", "rules", [])
self.matcher = RuleMatcher(rules_cfg)
self._history: Dict[str, List[Dict]] = {}
self.default_filter = {"required_cycles": 3, "total_cycles": 5}
def on_status(self, status: SafetyStatus) -> Optional[AnalyzerDecision]:
rule = self.matcher.match(status) or {}
filter_cfg = rule.get("filter", self.default_filter)
required = filter_cfg.get("required_cycles", 3)
total = filter_cfg.get("total_cycles", 5)
history = self._history.setdefault(status.name, [])
history.append({
"level": status.level,
"timestamp": status.timestamp,
"message": status.message,
})
if len(history) > total:
history.pop(0)
duration_cfg = rule.get("duration", 0)
if duration_cfg > 0:
anomaly_start = None
for h in reversed(history):
if h["level"] != SafetyLevel.OK:
anomaly_start = h["timestamp"]
else:
break
if anomaly_start and (time.time() - anomaly_start) < duration_cfg:
logger.debug(f"Duration not met: {status.name}")
return None
anomaly_count = sum(1 for h in history if h["level"] != SafetyLevel.OK)
if anomaly_count < required:
logger.debug(f"Filter not met: {status.name} ({anomaly_count}/{required})")
return None
final_level = status.level
if rule.get("upgrade_on_repeat") and anomaly_count >= total:
if final_level == SafetyLevel.WARN:
final_level = SafetyLevel.ERROR
logger.info(f"Upgraded: {status.name} WARN->ERROR")
decision = AnalyzerDecision(
source=status.name,
level=final_level,
message=status.message,
filter_info={"hit": anomaly_count, "total": len(history)},
duration_ms=int((time.time() - history[0]["timestamp"]) * 1000) if history else 0,
)
logger.info(f"Analyzed: {decision.source} -> {decision.level.value}")
return decision