You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
95 lines
3.6 KiB
95 lines
3.6 KiB
from typing import Dict, List, Optional
|
|
from models import SafetyStatus, AnalyzerDecision, SafetyLevel
|
|
from config_manager import ConfigManager
|
|
import re
|
|
import time
|
|
import logging
|
|
|
|
# Module-level logger shared by the classes below; records are emitted under
# the fixed logger name "Analyzer".
logger = logging.getLogger("Analyzer")
|
|
|
|
|
|
class RuleMatcher:
    """Pick the first configured rule whose matcher accepts a status.

    Each entry of ``rules_config`` is a mapping with a ``"matcher"`` and a
    ``"rule"`` key.  A matcher may carry any of the criteria below; they are
    alternatives, so a single hit is enough to select the rule:

    * ``name`` -- ``status.name`` equals one of the given names.
    * ``contains`` -- one of the given strings is a substring of
      ``status.name``.
    * ``regex`` -- one of the given patterns matches at the start of
      ``status.name`` (``re.match`` semantics, not ``re.search``).
    * ``tags_contains_one`` -- ``status.tags`` shares at least one element
      with the given tags.

    Every criterion accepts either a single value or a collection of values.
    A matcher without any of these keys never matches.
    """

    def __init__(self, rules_config: List[Dict]):
        """Store the ``matcher``/``rule`` pair of every configured entry.

        Args:
            rules_config: Rule entries in priority order.  ``None`` is treated
                as an empty list, and a missing or ``None`` ``matcher`` as an
                empty matcher.
        """
        self.rules = []
        for rule_cfg in rules_config or []:
            self.rules.append({
                "matcher": rule_cfg.get("matcher") or {},
                "rule": rule_cfg.get("rule", {}),
            })

    @staticmethod
    def _as_list(value) -> list:
        """Return ``value`` as a list, wrapping scalars and dropping ``None``."""
        if value is None:
            return []
        if isinstance(value, (list, tuple, set, frozenset)):
            return list(value)
        # A bare string must stay one item; iterating it would compare
        # individual characters instead of the configured value.
        return [value]

    def _accepts(self, matcher: Dict, status: "SafetyStatus") -> bool:
        """Tell whether any criterion present in ``matcher`` holds for ``status``."""
        name = status.name

        if "name" in matcher and name in self._as_list(matcher["name"]):
            return True

        if "contains" in matcher and any(
            fragment in name for fragment in self._as_list(matcher["contains"])
        ):
            return True

        if "regex" in matcher and any(
            re.match(pattern, name) for pattern in self._as_list(matcher["regex"])
        ):
            return True

        if "tags_contains_one" in matcher:
            wanted = set(self._as_list(matcher["tags_contains_one"]))
            # status.tags is only read when a tag criterion is configured.
            if wanted & set(status.tags or ()):
                return True

        return False

    def match(self, status: "SafetyStatus") -> Optional[Dict]:
        """Return the ``rule`` of the first entry accepting ``status``.

        Entries are tried in configuration order.

        Args:
            status: Object exposing ``name`` and, when a tag criterion is
                configured, ``tags``.

        Returns:
            The matching entry's ``rule`` value, or ``None`` when no entry
            matches.
        """
        for item in self.rules:
            if self._accepts(item["matcher"], status):
                return item["rule"]
        return None
|
|
|
|
|
|
class Analyzer:
    """Filter incoming safety statuses and turn persistent ones into decisions.

    For every status name a short history of recent reports is kept.  A
    decision is produced only when enough entries in that history are
    non-OK (the "filter") and, if the matching rule sets a ``duration``,
    the current run of non-OK reports has lasted at least that long.
    """

    def __init__(self, config_manager: ConfigManager):
        """Build the rule matcher from the ``analyzer_rules``/``rules`` config.

        Args:
            config_manager: Configuration source; queried once here via
                ``get("analyzer_rules", "rules", [])``.
        """
        self.config = config_manager
        rules_cfg = config_manager.get("analyzer_rules", "rules", [])
        # Guard against a config value that is present but empty (None).
        self.matcher = RuleMatcher(rules_cfg or [])
        # Per status name: recent reports, oldest first.
        self._history: Dict[str, List[Dict]] = {}
        # Filter applied when the matching rule does not define its own.
        self.default_filter = {"required_cycles": 3, "total_cycles": 5}

    @staticmethod
    def _current_anomaly_start(history: List[Dict]):
        """Return the timestamp that opens the trailing run of non-OK entries.

        Walks the history from newest to oldest and stops at the first OK
        entry.  Returns ``None`` when the newest entry is OK or the history
        is empty.
        """
        anomaly_start = None
        for entry in reversed(history):
            if entry["level"] == SafetyLevel.OK:
                break
            anomaly_start = entry["timestamp"]
        return anomaly_start

    def on_status(self, status: SafetyStatus) -> Optional[AnalyzerDecision]:
        """Record ``status`` and decide whether it should be reported.

        Args:
            status: Report exposing ``name``, ``level``, ``timestamp`` and
                ``message`` (plus whatever the rule matcher reads).

        Returns:
            An ``AnalyzerDecision`` when both the duration and the filter
            conditions are met, otherwise ``None``.
        """
        rule = self.matcher.match(status) or {}

        # A missing, None or empty rule filter falls back to the analyzer
        # default; a partial filter inherits the missing key from it too.
        filter_cfg = rule.get("filter") or self.default_filter
        required = filter_cfg.get(
            "required_cycles", self.default_filter.get("required_cycles", 3)
        )
        total = filter_cfg.get(
            "total_cycles", self.default_filter.get("total_cycles", 5)
        )

        history = self._history.setdefault(status.name, [])
        history.append({
            "level": status.level,
            "timestamp": status.timestamp,
            "message": status.message,
        })
        # Trim to the window size in one go, so a window that shrank between
        # calls does not leave stale entries behind.
        overflow = len(history) - total
        if overflow > 0:
            del history[:overflow]

        # NOTE(review): elapsed time is measured against time.time(), so
        # status timestamps are presumably epoch seconds -- confirm with the
        # producer of SafetyStatus.
        now = time.time()

        duration_cfg = rule.get("duration") or 0
        if duration_cfg > 0:
            anomaly_start = self._current_anomaly_start(history)
            # Compare against None explicitly: a timestamp of 0 is falsy but
            # still a valid start of an anomaly.
            if anomaly_start is not None and (now - anomaly_start) < duration_cfg:
                logger.debug("Duration not met: %s", status.name)
                return None

        anomaly_count = sum(1 for h in history if h["level"] != SafetyLevel.OK)
        if anomaly_count < required:
            logger.debug(
                "Filter not met: %s (%s/%s)", status.name, anomaly_count, required
            )
            return None

        final_level = status.level
        # Escalate a warning once the whole window is filled with anomalies.
        if (
            rule.get("upgrade_on_repeat")
            and anomaly_count >= total
            and final_level == SafetyLevel.WARN
        ):
            final_level = SafetyLevel.ERROR
            logger.info("Upgraded: %s WARN->ERROR", status.name)

        decision = AnalyzerDecision(
            source=status.name,
            level=final_level,
            message=status.message,
            filter_info={"hit": anomaly_count, "total": len(history)},
            # Age of the oldest entry still in the window; the history can be
            # empty when the window size is zero or negative.
            duration_ms=int((now - history[0]["timestamp"]) * 1000) if history else 0,
        )

        logger.info("Analyzed: %s -> %s", decision.source, decision.level.value)
        return decision
|