You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

334 lines
11 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""进程状态统计模块。
负责采集和统计进程状态信息,包括:
- mainboard 启动的 cyber 组件DAG
- mainboard 进程组信息
- 用户自定义程序状态
参考 Apollo.md 中 statistics 模块的描述。
"""
import os
import re
import time
import subprocess
import threading
import logging
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, field
from datetime import datetime
logger = logging.getLogger("Statistics.Process")
@dataclass
class ProcessInfo:
    """Information about a single process."""

    pid: int = 0
    name: str = ""
    cmdline: str = ""
    cpu_percent: float = 0.0
    memory_mb: float = 0.0
    status: str = ""
    create_time: float = 0.0
    uptime_seconds: float = 0.0

    def to_dict(self) -> dict:
        """Return a plain-dict view of this process.

        The command line is cut to its first 200 characters, CPU and memory
        figures are rounded to one decimal place and the uptime to zero
        decimal places. ``create_time`` is not part of the result.
        """
        result = {}
        result["pid"] = self.pid
        result["name"] = self.name
        # Keep only the head of the command line so the payload stays small.
        result["cmdline"] = self.cmdline[:200]
        result["cpu_percent"] = round(self.cpu_percent, 1)
        result["memory_mb"] = round(self.memory_mb, 1)
        result["status"] = self.status
        result["uptime_seconds"] = round(self.uptime_seconds, 0)
        return result
@dataclass
class DagInfo:
    """Information about a DAG file."""

    dag_path: str = ""
    process_group: str = ""
    loaded: bool = False
    pid: int = 0

    def to_dict(self) -> dict:
        """Return all four fields as a plain dict, in declaration order."""
        result = {}
        result["dag_path"] = self.dag_path
        result["process_group"] = self.process_group
        result["loaded"] = self.loaded
        result["pid"] = self.pid
        return result
@dataclass
class ProcessGroupInfo:
    """Information about a process group."""

    group_name: str = ""
    pids: List[int] = field(default_factory=list)
    dag_count: int = 0
    dags: List[str] = field(default_factory=list)
    running: bool = False

    def to_dict(self) -> dict:
        """Return all fields as a plain dict, in declaration order.

        The ``pids`` and ``dags`` values are the instance's own list objects,
        not copies.
        """
        exported = ("group_name", "pids", "dag_count", "dags", "running")
        return {key: getattr(self, key) for key in exported}
@dataclass
class ProcessSnapshot:
    """Snapshot of the process state."""

    mainboard_dags: List[DagInfo] = field(default_factory=list)
    mainboard_groups: List[ProcessGroupInfo] = field(default_factory=list)
    general_processes: List[ProcessInfo] = field(default_factory=list)
    total_mainboard_count: int = 0
    total_general_count: int = 0
    timestamp: float = 0.0

    def to_dict(self) -> dict:
        """Return the snapshot as a plain dict.

        Every entry of the three lists is converted through its own
        ``to_dict()``; the counters and the timestamp are copied as-is.
        """
        dag_dicts = []
        for dag in self.mainboard_dags:
            dag_dicts.append(dag.to_dict())

        group_dicts = []
        for group in self.mainboard_groups:
            group_dicts.append(group.to_dict())

        process_dicts = []
        for process in self.general_processes:
            process_dicts.append(process.to_dict())

        result = {}
        result["mainboard_dags"] = dag_dicts
        result["mainboard_groups"] = group_dicts
        result["general_processes"] = process_dicts
        result["total_mainboard_count"] = self.total_mainboard_count
        result["total_general_count"] = self.total_general_count
        result["timestamp"] = self.timestamp
        return result
class ProcessStatistic:
    """Collector for process status statistics.

    Periodically samples the processes running on the system, with a focus on:
    - mainboard processes and the DAG files they load
    - mainboard process groups (the group given by the ``-p`` argument)
    - user-defined programs (matched by regular expressions)

    Recognised ``config`` keys:
    - ``general_process_regex``: one pattern or a list of patterns matched
      against process command lines (defaults to the class-level list)
    - ``collect_interval``: pause between two collections, default 5.0
    - ``history_size``: number of snapshots kept, default 60
    """

    # Process name of mainboard.
    _MAINBOARD_PROCESS_NAME = "mainboard"
    # Default regular expressions used to match user-defined processes.
    _DEFAULT_GENERAL_PROCESS_PATTERNS = [
        r".*/apollo/scripts/.*",
        r"cyber_recorder record .*",
    ]
    # Attributes requested from psutil for every scanned process.
    _PROCESS_ATTRS = ['pid', 'name', 'cmdline', 'cpu_percent',
                      'memory_info', 'status', 'create_time']
    # "-d <file>.dag" / "-p <group>"; the flag must start an argument so that
    # tokens which merely end in "-d" or "-p" are not mistaken for it.
    _DAG_ARG_RE = re.compile(r'(?:^|\s)-d\s+(\S+\.dag)')
    _GROUP_ARG_RE = re.compile(r'(?:^|\s)-p\s+(\S+)')

    def __init__(self, config: Optional[dict] = None):
        """Read the configuration and pre-compile the process patterns.

        Args:
            config: Optional settings dict (see the class docstring). A
                missing or ``None`` ``general_process_regex`` selects the
                default patterns; a single string is treated as one pattern.
                Patterns that fail to compile are logged and skipped.
        """
        self._config = config or {}
        patterns = self._config.get("general_process_regex")
        if patterns is None:
            patterns = self._DEFAULT_GENERAL_PROCESS_PATTERNS
        elif isinstance(patterns, str):
            # A bare string would otherwise be iterated character by character.
            patterns = [patterns]
        # Copy so the class-level default list is never shared with instances.
        self._general_patterns = list(patterns)
        self._collect_interval = self._config.get("collect_interval", 5.0)
        self._history_size = self._config.get("history_size", 60)
        self._snapshots: List[ProcessSnapshot] = []
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._stop_event = threading.Event()
        self._lock = threading.Lock()
        # Compile the patterns once up front.
        self._compiled_patterns = []
        for pattern in self._general_patterns:
            try:
                self._compiled_patterns.append(re.compile(pattern))
            except re.error as e:
                logger.warning("Invalid process pattern '%s': %s", pattern, e)

    def start(self):
        """Start the background collector thread; no-op if already running."""
        if self._running:
            return
        self._running = True
        # Every worker gets its own stop event. If an earlier stop() timed out
        # while that worker was still collecting, its event stays set and it
        # exits on its own instead of running alongside the new worker.
        stop_event = threading.Event()
        self._stop_event = stop_event
        self._thread = threading.Thread(
            target=self._collect_loop, args=(stop_event,), daemon=True)
        self._thread.start()
        logger.info("Process statistic collector started")

    def stop(self):
        """Signal the collector thread to stop and wait up to 5 s for it."""
        self._running = False
        # Wakes the worker immediately if it is pausing between collections.
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        logger.info("Process statistic collector stopped")

    def collect_once(self) -> "ProcessSnapshot":
        """Run one collection pass, append it to the history and return it."""
        snapshot = ProcessSnapshot(timestamp=time.time())

        # Collect mainboard processes.
        mainboard_processes = self._find_mainboard_processes()
        snapshot.total_mainboard_count = len(mainboard_processes)

        # Derive DAGs and process groups from the command lines.
        dags = []
        groups = {}
        for proc in mainboard_processes:
            proc_dags = self._parse_dags_from_cmdline(proc.cmdline)
            if not proc_dags:
                continue
            group_name = proc_dags[0].process_group
            group = groups.get(group_name)
            if group is None:
                group = ProcessGroupInfo(
                    group_name=group_name,
                    pids=[],
                    dags=[],
                    running=True,
                )
                groups[group_name] = group
            group.pids.append(proc.pid)
            for dag_info in proc_dags:
                # The parser only sees the command line, so the PID is
                # filled in here.
                dag_info.pid = proc.pid
                dags.append(dag_info)
                group.dags.append(dag_info.dag_path)
                group.dag_count += 1
        snapshot.mainboard_dags = dags
        snapshot.mainboard_groups = list(groups.values())

        # Collect user-defined processes.
        general = self._find_general_processes()
        snapshot.general_processes = general
        snapshot.total_general_count = len(general)

        # Update the bounded history.
        with self._lock:
            self._snapshots.append(snapshot)
            overflow = len(self._snapshots) - self._history_size
            if overflow > 0:
                del self._snapshots[:overflow]
        return snapshot

    def get_latest(self) -> Optional["ProcessSnapshot"]:
        """Return the most recent snapshot, or None if none was collected."""
        with self._lock:
            return self._snapshots[-1] if self._snapshots else None

    def get_history(self, count: int = 10) -> List["ProcessSnapshot"]:
        """Return up to ``count`` most recent snapshots, oldest first.

        A ``count`` of zero or less yields an empty list.
        """
        # Guard needed because ``lst[-0:]`` is the whole list, not nothing.
        if count <= 0:
            return []
        with self._lock:
            return self._snapshots[-count:]

    def is_dag_running(self, dag_name: str) -> bool:
        """Check whether a DAG is running according to the latest snapshot.

        A DAG counts as running when ``dag_name`` is a substring of the path
        of a loaded DAG.
        """
        latest = self.get_latest()
        if latest is None:
            return False
        return any(dag.loaded for dag in latest.mainboard_dags
                   if dag_name in dag.dag_path)

    def is_group_running(self, group_name: str) -> bool:
        """Check whether the named process group is running (exact match)."""
        latest = self.get_latest()
        if latest is None:
            return False
        return any(group.running for group in latest.mainboard_groups
                   if group.group_name == group_name)

    def is_process_running(self, pattern: str) -> bool:
        """Check whether a process matching ``pattern`` is in the latest snapshot.

        ``pattern`` is searched with ``re.search`` in the command lines of the
        user-defined processes and in the DAG paths of the mainboard
        processes. An invalid pattern makes ``re.search`` raise ``re.error``.
        """
        latest = self.get_latest()
        if latest is None:
            return False
        if any(re.search(pattern, proc.cmdline)
               for proc in latest.general_processes):
            return True
        # Also check the mainboard processes (through their DAG paths).
        return any(re.search(pattern, dag.dag_path)
                   for dag in latest.mainboard_dags)

    def _collect_loop(self, stop_event: Optional[threading.Event] = None):
        """Collect repeatedly until the stop event is set.

        Args:
            stop_event: Event ending this loop; defaults to the instance's
                current stop event.
        """
        if stop_event is None:
            stop_event = self._stop_event
        while not stop_event.is_set():
            try:
                self.collect_once()
            except Exception as e:
                # Keep the collector alive; the traceback goes to the log.
                logger.exception("Process collect error: %s", e)
            # Unlike time.sleep, this returns as soon as stop() is called.
            stop_event.wait(self._collect_interval)

    @staticmethod
    def _join_cmdline(info: dict) -> str:
        """Return the space-joined command line from a psutil info dict."""
        cmdline_list = info.get('cmdline')
        return ' '.join(cmdline_list) if cmdline_list else ""

    @staticmethod
    def _build_process_info(info: dict, cmdline: str) -> "ProcessInfo":
        """Build a ProcessInfo from a psutil info dict and its command line."""
        pinfo = ProcessInfo()
        pinfo.pid = info['pid']
        # Values may be None when psutil could not read an attribute, so
        # normalise with ``or`` rather than relying on a .get() default.
        pinfo.name = info.get('name') or ""
        pinfo.cmdline = cmdline
        pinfo.cpu_percent = info.get('cpu_percent') or 0.0
        mem_info = info.get('memory_info')
        if mem_info:
            pinfo.memory_mb = mem_info.rss / (1024 * 1024)
        pinfo.status = info.get('status') or ""
        create_time = info.get('create_time')
        if create_time:
            pinfo.create_time = create_time
            pinfo.uptime_seconds = time.time() - create_time
        return pinfo

    def _find_mainboard_processes(self) -> List["ProcessInfo"]:
        """Find all processes whose name equals the mainboard process name."""
        import psutil
        processes = []
        for proc in psutil.process_iter(self._PROCESS_ATTRS):
            try:
                info = proc.info
                if info.get('name') != self._MAINBOARD_PROCESS_NAME:
                    continue
                processes.append(
                    self._build_process_info(info, self._join_cmdline(info)))
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
        return processes

    def _find_general_processes(self) -> List["ProcessInfo"]:
        """Find processes whose command line matches a user-defined pattern."""
        import psutil
        processes = []
        for proc in psutil.process_iter(self._PROCESS_ATTRS):
            try:
                info = proc.info
                cmdline = self._join_cmdline(info)
                if not cmdline:
                    continue
                if any(pattern.search(cmdline)
                       for pattern in self._compiled_patterns):
                    processes.append(self._build_process_info(info, cmdline))
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
        return processes

    @classmethod
    def _extract_dag_args(cls, cmdline: str) -> tuple:
        """Extract ``(dag_paths, process_group)`` from a mainboard command line.

        ``dag_paths`` lists every ``-d <file>.dag`` argument in order;
        ``process_group`` is the first ``-p`` value, or "" if there is none.
        """
        if not cmdline:
            return [], ""
        dag_paths = cls._DAG_ARG_RE.findall(cmdline)
        group_match = cls._GROUP_ARG_RE.search(cmdline)
        process_group = group_match.group(1) if group_match else ""
        return dag_paths, process_group

    def _parse_dags_from_cmdline(self, cmdline: str) -> List["DagInfo"]:
        """Return one DagInfo per ``-d`` DAG on the command line.

        The ``pid`` field is left at its default; the caller sets it.
        """
        dag_paths, process_group = self._extract_dag_args(cmdline)
        return [
            DagInfo(dag_path=path, process_group=process_group, loaded=True)
            for path in dag_paths
        ]

    def _parse_dag_from_cmdline(self, cmdline: str) -> Optional["DagInfo"]:
        """Parse the first DAG and the process group from a command line.

        Returns None when the command line is empty or has no ``-d`` DAG.
        The ``pid`` field is left at its default; the caller sets it.
        """
        dags = self._parse_dags_from_cmdline(cmdline)
        return dags[0] if dags else None