You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

259 lines
7.9 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""安全状态封装类。
提供对 SafetyStatus 的封装,方便 checker 模块使用。
参考 Apollo.md 中 exporter 示例代码中的 SafetyStatusWrapper 类。
用法:
wrapper = SafetyStatusWrapper("CPU Usage")
wrapper.SetTags({"resource"})
wrapper.Summary(SafetyLevel.WARN, "value 75%, threshold 70%")
exporter.Report(wrapper)
"""
import time
import logging
from typing import Dict, List, Optional, Any
from models import SafetyStatus, SafetyLevel
logger = logging.getLogger("SafetyStatusWrapper")
class SafetyStatusWrapper:
"""安全状态封装类,用于构建和报告安全检查结果。
提供链式调用接口,简化安全状态的构建过程。
支持:
- 设置检查项名称
- 设置标签
- 设置原始值
- 设置详细信息
- 设置过滤条件
- 设置超时和持续时间条件
- 生成 SafetyStatus 对象
"""
def __init__(self, name: str, source: str = ""):
self._name = name
self._source = source or name
self._level = SafetyLevel.OK
self._message = ""
self._tags: List[str] = []
self._details: Dict[str, str] = {}
self._raw_value: Optional[Any] = None
self._timestamp = time.time()
# 过滤条件
self._filter_required = 0
self._filter_total = 0
# 超时条件(秒)
self._timeout_seconds = 0.0
# 持续时间条件(秒)
self._duration_seconds = 0.0
# 内部状态跟踪
self._history: List[Dict] = []
self._anomaly_start_time: Optional[float] = None
@property
def name(self) -> str:
return self._name
@property
def source(self) -> str:
return self._source
@property
def level(self) -> SafetyLevel:
return self._level
@property
def message(self) -> str:
return self._message
def SetName(self, name: str) -> "SafetyStatusWrapper":
"""设置检查项名称。"""
self._name = name
return self
def SetSource(self, source: str) -> "SafetyStatusWrapper":
"""设置来源标识。"""
self._source = source
return self
def SetTags(self, tags: List[str]) -> "SafetyStatusWrapper":
"""设置标签列表。"""
self._tags = list(tags)
return self
def AddTag(self, tag: str) -> "SafetyStatusWrapper":
"""添加单个标签。"""
if tag not in self._tags:
self._tags.append(tag)
return self
def SetRawValue(self, value: Any) -> "SafetyStatusWrapper":
"""设置原始值。"""
self._raw_value = value
return self
def SetDetail(self, key: str, value: str) -> "SafetyStatusWrapper":
"""设置单个详情键值对。"""
self._details[key] = str(value)
return self
def SetDetails(self, details: Dict[str, str]) -> "SafetyStatusWrapper":
"""设置所有详情。"""
self._details = dict(details)
return self
def SetDefaultFilter(self, required: int, total: int) -> "SafetyStatusWrapper":
"""设置默认过滤条件。
在 total 个周期内,有 required 个周期异常才判定为异常。
例如: required=3, total=5 表示 5 个周期内 3 次异常才算异常。
"""
self._filter_required = required
self._filter_total = total
return self
def SetDefaultTimeout(self, timeout: float) -> "SafetyStatusWrapper":
"""设置默认超时条件(秒)。
异常消息经过该时间后仍未更新,则恢复为正常状态。
"""
self._timeout_seconds = timeout
return self
def SetDefaultDuration(self, duration: float) -> "SafetyStatusWrapper":
"""设置默认持续时间条件(秒)。
异常需要持续超过该时间才判定为有效异常。
"""
self._duration_seconds = duration
return self
def Summary(self, level: SafetyLevel, message: str) -> "SafetyStatusWrapper":
"""设置安全状态摘要。
Args:
level: 安全级别 (OK/WARN/ERROR/FATAL)
message: 状态描述信息
"""
self._level = level
self._message = message
self._timestamp = time.time()
# 记录到历史
self._history.append({
"level": level,
"message": message,
"timestamp": self._timestamp,
})
# 保持历史记录在合理范围
max_history = max(self._filter_total, 10) * 2
if len(self._history) > max_history:
self._history = self._history[-max_history:]
return self
def Ok(self, message: str = "OK") -> "SafetyStatusWrapper":
"""快捷方法:设置为正常状态。"""
return self.Summary(SafetyLevel.OK, message)
def Warn(self, message: str) -> "SafetyStatusWrapper":
"""快捷方法:设置为警告状态。"""
return self.Summary(SafetyLevel.WARN, message)
def Error(self, message: str) -> "SafetyStatusWrapper":
"""快捷方法:设置为错误状态。"""
return self.Summary(SafetyLevel.ERROR, message)
def Fatal(self, message: str) -> "SafetyStatusWrapper":
"""快捷方法:设置为致命状态。"""
return self.Summary(SafetyLevel.FATAL, message)
def IsOk(self) -> bool:
"""判断当前是否为正常状态。"""
return self._level == SafetyLevel.OK
def IsCritical(self) -> bool:
"""判断当前是否为严重异常ERROR 或 FATAL"""
return self._level in (SafetyLevel.ERROR, SafetyLevel.FATAL)
def ShouldReport(self) -> bool:
"""根据过滤条件判断是否应该上报此状态。
如果设置了过滤条件,需要满足异常次数要求才上报。
"""
if self._filter_required <= 0 or self._filter_total <= 0:
return True
# 检查最近 total 个周期中异常的次数
recent = self._history[-self._filter_total:]
anomaly_count = sum(1 for h in recent if h["level"] != SafetyLevel.OK)
return anomaly_count >= self._filter_required
def CheckDuration(self) -> bool:
"""检查异常持续时间是否满足条件。
如果设置了持续时间条件,异常需要持续超过该时间才判定为有效。
"""
if self._duration_seconds <= 0:
return True
if self._level == SafetyLevel.OK:
self._anomaly_start_time = None
return False
now = time.time()
if self._anomaly_start_time is None:
self._anomaly_start_time = now
return (now - self._anomaly_start_time) >= self._duration_seconds
def CheckTimeout(self) -> bool:
"""检查异常是否已超时。
如果设置了超时条件,异常消息经过该时间后仍未更新,
则恢复为正常状态。
"""
if self._timeout_seconds <= 0:
return False
if self._level == SafetyLevel.OK:
return False
elapsed = time.time() - self._timestamp
return elapsed > self._timeout_seconds
def ToSafetyStatus(self) -> SafetyStatus:
"""转换为 SafetyStatus 对象。"""
return SafetyStatus(
name=self._name,
source=self._source,
level=self._level,
message=self._message,
timestamp=self._timestamp,
raw_value=self._raw_value,
details=dict(self._details),
tags=list(self._tags),
)
def Reset(self) -> "SafetyStatusWrapper":
"""重置状态。"""
self._level = SafetyLevel.OK
self._message = ""
self._raw_value = None
self._details = {}
self._timestamp = time.time()
self._anomaly_start_time = None
return self
def __repr__(self) -> str:
return (f"SafetyStatusWrapper(name={self._name!r}, "
f"level={self._level.value}, message={self._message!r})")