|
|
"""安全状态封装类。
|
|
|
|
|
|
提供对 SafetyStatus 的封装,方便 checker 模块使用。
|
|
|
参考 Apollo.md 中 exporter 示例代码中的 SafetyStatusWrapper 类。
|
|
|
|
|
|
用法:
|
|
|
wrapper = SafetyStatusWrapper("CPU Usage")
|
|
|
wrapper.SetTags({"resource"})
|
|
|
wrapper.Summary(SafetyLevel.WARN, "value 75%, threshold 70%")
|
|
|
exporter.Report(wrapper)
|
|
|
"""
|
|
|
|
|
|
import time
|
|
|
import logging
|
|
|
from typing import Dict, List, Optional, Any
|
|
|
from models import SafetyStatus, SafetyLevel
|
|
|
|
|
|
logger = logging.getLogger("SafetyStatusWrapper")
|
|
|
|
|
|
|
|
|
class SafetyStatusWrapper:
|
|
|
"""安全状态封装类,用于构建和报告安全检查结果。
|
|
|
|
|
|
提供链式调用接口,简化安全状态的构建过程。
|
|
|
支持:
|
|
|
- 设置检查项名称
|
|
|
- 设置标签
|
|
|
- 设置原始值
|
|
|
- 设置详细信息
|
|
|
- 设置过滤条件
|
|
|
- 设置超时和持续时间条件
|
|
|
- 生成 SafetyStatus 对象
|
|
|
"""
|
|
|
|
|
|
def __init__(self, name: str, source: str = ""):
|
|
|
self._name = name
|
|
|
self._source = source or name
|
|
|
self._level = SafetyLevel.OK
|
|
|
self._message = ""
|
|
|
self._tags: List[str] = []
|
|
|
self._details: Dict[str, str] = {}
|
|
|
self._raw_value: Optional[Any] = None
|
|
|
self._timestamp = time.time()
|
|
|
|
|
|
# 过滤条件
|
|
|
self._filter_required = 0
|
|
|
self._filter_total = 0
|
|
|
|
|
|
# 超时条件(秒)
|
|
|
self._timeout_seconds = 0.0
|
|
|
|
|
|
# 持续时间条件(秒)
|
|
|
self._duration_seconds = 0.0
|
|
|
|
|
|
# 内部状态跟踪
|
|
|
self._history: List[Dict] = []
|
|
|
self._anomaly_start_time: Optional[float] = None
|
|
|
|
|
|
@property
|
|
|
def name(self) -> str:
|
|
|
return self._name
|
|
|
|
|
|
@property
|
|
|
def source(self) -> str:
|
|
|
return self._source
|
|
|
|
|
|
@property
|
|
|
def level(self) -> SafetyLevel:
|
|
|
return self._level
|
|
|
|
|
|
@property
|
|
|
def message(self) -> str:
|
|
|
return self._message
|
|
|
|
|
|
def SetName(self, name: str) -> "SafetyStatusWrapper":
|
|
|
"""设置检查项名称。"""
|
|
|
self._name = name
|
|
|
return self
|
|
|
|
|
|
def SetSource(self, source: str) -> "SafetyStatusWrapper":
|
|
|
"""设置来源标识。"""
|
|
|
self._source = source
|
|
|
return self
|
|
|
|
|
|
def SetTags(self, tags: List[str]) -> "SafetyStatusWrapper":
|
|
|
"""设置标签列表。"""
|
|
|
self._tags = list(tags)
|
|
|
return self
|
|
|
|
|
|
def AddTag(self, tag: str) -> "SafetyStatusWrapper":
|
|
|
"""添加单个标签。"""
|
|
|
if tag not in self._tags:
|
|
|
self._tags.append(tag)
|
|
|
return self
|
|
|
|
|
|
def SetRawValue(self, value: Any) -> "SafetyStatusWrapper":
|
|
|
"""设置原始值。"""
|
|
|
self._raw_value = value
|
|
|
return self
|
|
|
|
|
|
def SetDetail(self, key: str, value: str) -> "SafetyStatusWrapper":
|
|
|
"""设置单个详情键值对。"""
|
|
|
self._details[key] = str(value)
|
|
|
return self
|
|
|
|
|
|
def SetDetails(self, details: Dict[str, str]) -> "SafetyStatusWrapper":
|
|
|
"""设置所有详情。"""
|
|
|
self._details = dict(details)
|
|
|
return self
|
|
|
|
|
|
def SetDefaultFilter(self, required: int, total: int) -> "SafetyStatusWrapper":
|
|
|
"""设置默认过滤条件。
|
|
|
|
|
|
在 total 个周期内,有 required 个周期异常才判定为异常。
|
|
|
例如: required=3, total=5 表示 5 个周期内 3 次异常才算异常。
|
|
|
"""
|
|
|
self._filter_required = required
|
|
|
self._filter_total = total
|
|
|
return self
|
|
|
|
|
|
def SetDefaultTimeout(self, timeout: float) -> "SafetyStatusWrapper":
|
|
|
"""设置默认超时条件(秒)。
|
|
|
|
|
|
异常消息经过该时间后仍未更新,则恢复为正常状态。
|
|
|
"""
|
|
|
self._timeout_seconds = timeout
|
|
|
return self
|
|
|
|
|
|
def SetDefaultDuration(self, duration: float) -> "SafetyStatusWrapper":
|
|
|
"""设置默认持续时间条件(秒)。
|
|
|
|
|
|
异常需要持续超过该时间才判定为有效异常。
|
|
|
"""
|
|
|
self._duration_seconds = duration
|
|
|
return self
|
|
|
|
|
|
def Summary(self, level: SafetyLevel, message: str) -> "SafetyStatusWrapper":
|
|
|
"""设置安全状态摘要。
|
|
|
|
|
|
Args:
|
|
|
level: 安全级别 (OK/WARN/ERROR/FATAL)
|
|
|
message: 状态描述信息
|
|
|
"""
|
|
|
self._level = level
|
|
|
self._message = message
|
|
|
self._timestamp = time.time()
|
|
|
|
|
|
# 记录到历史
|
|
|
self._history.append({
|
|
|
"level": level,
|
|
|
"message": message,
|
|
|
"timestamp": self._timestamp,
|
|
|
})
|
|
|
|
|
|
# 保持历史记录在合理范围
|
|
|
max_history = max(self._filter_total, 10) * 2
|
|
|
if len(self._history) > max_history:
|
|
|
self._history = self._history[-max_history:]
|
|
|
|
|
|
return self
|
|
|
|
|
|
def Ok(self, message: str = "OK") -> "SafetyStatusWrapper":
|
|
|
"""快捷方法:设置为正常状态。"""
|
|
|
return self.Summary(SafetyLevel.OK, message)
|
|
|
|
|
|
def Warn(self, message: str) -> "SafetyStatusWrapper":
|
|
|
"""快捷方法:设置为警告状态。"""
|
|
|
return self.Summary(SafetyLevel.WARN, message)
|
|
|
|
|
|
def Error(self, message: str) -> "SafetyStatusWrapper":
|
|
|
"""快捷方法:设置为错误状态。"""
|
|
|
return self.Summary(SafetyLevel.ERROR, message)
|
|
|
|
|
|
def Fatal(self, message: str) -> "SafetyStatusWrapper":
|
|
|
"""快捷方法:设置为致命状态。"""
|
|
|
return self.Summary(SafetyLevel.FATAL, message)
|
|
|
|
|
|
def IsOk(self) -> bool:
|
|
|
"""判断当前是否为正常状态。"""
|
|
|
return self._level == SafetyLevel.OK
|
|
|
|
|
|
def IsCritical(self) -> bool:
|
|
|
"""判断当前是否为严重异常(ERROR 或 FATAL)。"""
|
|
|
return self._level in (SafetyLevel.ERROR, SafetyLevel.FATAL)
|
|
|
|
|
|
def ShouldReport(self) -> bool:
|
|
|
"""根据过滤条件判断是否应该上报此状态。
|
|
|
|
|
|
如果设置了过滤条件,需要满足异常次数要求才上报。
|
|
|
"""
|
|
|
if self._filter_required <= 0 or self._filter_total <= 0:
|
|
|
return True
|
|
|
|
|
|
# 检查最近 total 个周期中异常的次数
|
|
|
recent = self._history[-self._filter_total:]
|
|
|
anomaly_count = sum(1 for h in recent if h["level"] != SafetyLevel.OK)
|
|
|
|
|
|
return anomaly_count >= self._filter_required
|
|
|
|
|
|
def CheckDuration(self) -> bool:
|
|
|
"""检查异常持续时间是否满足条件。
|
|
|
|
|
|
如果设置了持续时间条件,异常需要持续超过该时间才判定为有效。
|
|
|
"""
|
|
|
if self._duration_seconds <= 0:
|
|
|
return True
|
|
|
|
|
|
if self._level == SafetyLevel.OK:
|
|
|
self._anomaly_start_time = None
|
|
|
return False
|
|
|
|
|
|
now = time.time()
|
|
|
if self._anomaly_start_time is None:
|
|
|
self._anomaly_start_time = now
|
|
|
|
|
|
return (now - self._anomaly_start_time) >= self._duration_seconds
|
|
|
|
|
|
def CheckTimeout(self) -> bool:
|
|
|
"""检查异常是否已超时。
|
|
|
|
|
|
如果设置了超时条件,异常消息经过该时间后仍未更新,
|
|
|
则恢复为正常状态。
|
|
|
"""
|
|
|
if self._timeout_seconds <= 0:
|
|
|
return False
|
|
|
|
|
|
if self._level == SafetyLevel.OK:
|
|
|
return False
|
|
|
|
|
|
elapsed = time.time() - self._timestamp
|
|
|
return elapsed > self._timeout_seconds
|
|
|
|
|
|
def ToSafetyStatus(self) -> SafetyStatus:
|
|
|
"""转换为 SafetyStatus 对象。"""
|
|
|
return SafetyStatus(
|
|
|
name=self._name,
|
|
|
source=self._source,
|
|
|
level=self._level,
|
|
|
message=self._message,
|
|
|
timestamp=self._timestamp,
|
|
|
raw_value=self._raw_value,
|
|
|
details=dict(self._details),
|
|
|
tags=list(self._tags),
|
|
|
)
|
|
|
|
|
|
def Reset(self) -> "SafetyStatusWrapper":
|
|
|
"""重置状态。"""
|
|
|
self._level = SafetyLevel.OK
|
|
|
self._message = ""
|
|
|
self._raw_value = None
|
|
|
self._details = {}
|
|
|
self._timestamp = time.time()
|
|
|
self._anomaly_start_time = None
|
|
|
return self
|
|
|
|
|
|
def __repr__(self) -> str:
|
|
|
return (f"SafetyStatusWrapper(name={self._name!r}, "
|
|
|
f"level={self._level.value}, message={self._message!r})")
|