|
|
"""进程状态统计模块。
|
|
|
|
|
|
负责采集和统计进程状态信息,包括:
|
|
|
- mainboard 启动的 cyber 组件(DAG)
|
|
|
- mainboard 进程组信息
|
|
|
- 用户自定义程序状态
|
|
|
|
|
|
参考 Apollo.md 中 statistics 模块的描述。
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import re
|
|
|
import time
|
|
|
import subprocess
|
|
|
import threading
|
|
|
import logging
|
|
|
from typing import Dict, List, Optional, Set
|
|
|
from dataclasses import dataclass, field
|
|
|
from datetime import datetime
|
|
|
|
|
|
# Module-level logger; every record from this module is emitted under the
# "Statistics.Process" logger name.
logger = logging.getLogger("Statistics.Process")
|
|
|
|
|
|
|
|
|
@dataclass
class ProcessInfo:
    """Details recorded for a single operating-system process."""

    # Process id.
    pid: int = 0
    # Executable / process name.
    name: str = ""
    # Full command line as one string.
    cmdline: str = ""
    # CPU usage figure for the process.
    cpu_percent: float = 0.0
    # Memory usage figure for the process (megabytes, per the field name).
    memory_mb: float = 0.0
    # Process status string.
    status: str = ""
    # Process creation time; not included in ``to_dict`` output.
    create_time: float = 0.0
    # Time the process has been alive, in seconds.
    uptime_seconds: float = 0.0

    def to_dict(self) -> dict:
        """Return a plain-dict view of this record.

        The command line is cut to its first 200 characters, CPU and memory
        figures are rounded to one decimal place and the uptime is rounded
        to a whole number (still a float).  ``create_time`` is left out.
        """
        payload = {"pid": self.pid, "name": self.name}
        payload["cmdline"] = self.cmdline[:200]
        payload["cpu_percent"] = round(self.cpu_percent, 1)
        payload["memory_mb"] = round(self.memory_mb, 1)
        payload["status"] = self.status
        payload["uptime_seconds"] = round(self.uptime_seconds, 0)
        return payload
|
|
|
|
|
|
|
|
|
@dataclass
class DagInfo:
    """Description of one DAG file referenced by a mainboard process."""

    # Path of the DAG file.
    dag_path: str = ""
    # Name of the process group the DAG belongs to.
    process_group: str = ""
    # Whether the DAG is considered loaded.
    loaded: bool = False
    # Id of the process associated with the DAG.
    pid: int = 0

    def to_dict(self) -> dict:
        """Return all four fields as a plain dict, in declaration order."""
        exported = ("dag_path", "process_group", "loaded", "pid")
        return {attr: getattr(self, attr) for attr in exported}
|
|
|
|
|
|
|
|
|
@dataclass
class ProcessGroupInfo:
    """Aggregate information about one mainboard process group."""

    # Name of the group.
    group_name: str = ""
    # Ids of the processes in the group.
    pids: List[int] = field(default_factory=list)
    # Number of DAGs counted for the group.
    dag_count: int = 0
    # Paths of the DAG files attributed to the group.
    dags: List[str] = field(default_factory=list)
    # Whether the group is considered running.
    running: bool = False

    def to_dict(self) -> dict:
        """Return the fields as a plain dict.

        The ``pids`` and ``dags`` values are the instance's own list objects,
        not copies.
        """
        return dict(
            group_name=self.group_name,
            pids=self.pids,
            dag_count=self.dag_count,
            dags=self.dags,
            running=self.running,
        )
|
|
|
|
|
|
|
|
|
@dataclass
class ProcessSnapshot:
    """Result of one collection pass over the running processes."""

    # DAG entries found for mainboard processes.
    mainboard_dags: List[DagInfo] = field(default_factory=list)
    # Process groups derived from the mainboard processes.
    mainboard_groups: List[ProcessGroupInfo] = field(default_factory=list)
    # Processes matched as user-defined ("general") programs.
    general_processes: List[ProcessInfo] = field(default_factory=list)
    # Number of mainboard processes counted.
    total_mainboard_count: int = 0
    # Number of general processes counted.
    total_general_count: int = 0
    # Time associated with the snapshot.
    timestamp: float = 0.0

    def to_dict(self) -> dict:
        """Return the snapshot as a plain dict.

        Every element of the three list fields is converted through its own
        ``to_dict`` method; the counters and timestamp are copied as-is.
        """

        def serialize(items):
            # Convert each contained record through its own to_dict().
            return [item.to_dict() for item in items]

        result = {}
        result["mainboard_dags"] = serialize(self.mainboard_dags)
        result["mainboard_groups"] = serialize(self.mainboard_groups)
        result["general_processes"] = serialize(self.general_processes)
        result["total_mainboard_count"] = self.total_mainboard_count
        result["total_general_count"] = self.total_general_count
        result["timestamp"] = self.timestamp
        return result
|
|
|
|
|
|
|
|
|
class ProcessStatistic:
    """Periodic collector of process status.

    Each collection pass scans the running processes and records:

    - mainboard processes and the DAG files given on their command line
    - mainboard process groups (the value of the ``-p`` argument)
    - user-defined programs whose command line matches a configured regex

    Recognised ``config`` keys (all optional):

    - ``general_process_regex``: one regex string or a list of regex strings
      matched against process command lines (defaults to
      ``_DEFAULT_GENERAL_PROCESS_PATTERNS``)
    - ``collect_interval``: seconds between background collections
      (default 5.0)
    - ``history_size``: maximum number of snapshots kept (default 60)
    """

    # Process name of mainboard.
    _MAINBOARD_PROCESS_NAME = "mainboard"

    # Default regexes used to recognise user-defined processes.
    _DEFAULT_GENERAL_PROCESS_PATTERNS = [
        r".*/apollo/scripts/.*",
        r"cyber_recorder record .*",
    ]

    # psutil attributes fetched for every process during a scan.
    _PROCESS_ATTRS = ['pid', 'name', 'cmdline', 'cpu_percent',
                      'memory_info', 'status', 'create_time']

    # ``-d <file>.dag`` / ``-p <group>`` arguments.  The look-behind makes the
    # flag match only at the start of a token, never inside another option.
    _DAG_ARG_RE = re.compile(r'(?<!\S)-d\s+(\S+\.dag)')
    _GROUP_ARG_RE = re.compile(r'(?<!\S)-p\s+(\S+)')

    def __init__(self, config: dict = None):
        """Create a collector.

        Args:
            config: Optional settings dict; see the class docstring for the
                recognised keys.  ``None`` means "use all defaults".
        """
        self._config = config or {}

        patterns = self._config.get("general_process_regex")
        if patterns is None:
            patterns = self._DEFAULT_GENERAL_PROCESS_PATTERNS
        elif isinstance(patterns, str):
            # A bare string would otherwise be iterated character by character.
            patterns = [patterns]
        # Copy so the class-level default list is never shared or mutated.
        self._general_patterns = list(patterns)
        self._collect_interval = self._config.get("collect_interval", 5.0)
        self._history_size = self._config.get("history_size", 60)

        self._snapshots: List["ProcessSnapshot"] = []
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        # Set by stop() so the collect loop wakes up instead of sleeping out
        # the rest of its interval.
        self._stop_event = threading.Event()

        # Compile the regexes once; invalid entries are logged and skipped.
        self._compiled_patterns = []
        for pattern in self._general_patterns:
            try:
                self._compiled_patterns.append(re.compile(pattern))
            except (re.error, TypeError) as e:
                logger.warning("Invalid process pattern '%s': %s", pattern, e)

    def start(self):
        """Start the background collection thread (no-op if already running)."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._collect_loop, daemon=True)
        self._thread.start()
        logger.info("Process statistic collector started")

    def stop(self):
        """Ask the background thread to stop and wait up to 5 seconds for it."""
        self._running = False
        self._stop_event.set()
        if self._thread:
            self._thread.join(timeout=5)
        logger.info("Process statistic collector stopped")

    def collect_once(self) -> "ProcessSnapshot":
        """Run one collection pass, store the snapshot in history and return it."""
        snapshot = ProcessSnapshot(timestamp=time.time())

        # Collect mainboard process information.
        mainboard_processes = self._find_mainboard_processes()
        snapshot.total_mainboard_count = len(mainboard_processes)

        # Parse DAGs and process groups from each mainboard command line.
        dags = []
        groups = {}
        for proc in mainboard_processes:
            proc_dags = self._parse_dags_from_cmdline(proc.cmdline)
            if not proc_dags:
                continue
            group_name = proc_dags[0].process_group
            group = groups.get(group_name)
            if group is None:
                group = ProcessGroupInfo(
                    group_name=group_name,
                    pids=[],
                    dags=[],
                    running=True,
                )
                groups[group_name] = group
            group.pids.append(proc.pid)
            for dag_info in proc_dags:
                # The parser does not know the pid; fill it in here.
                dag_info.pid = proc.pid
                dags.append(dag_info)
                group.dags.append(dag_info.dag_path)
                group.dag_count += 1

        snapshot.mainboard_dags = dags
        snapshot.mainboard_groups = list(groups.values())

        # Collect user-defined processes.
        general = self._find_general_processes()
        snapshot.general_processes = general
        snapshot.total_general_count = len(general)

        # Update the bounded history.
        with self._lock:
            self._snapshots.append(snapshot)
            if len(self._snapshots) > self._history_size:
                self._snapshots.pop(0)

        return snapshot

    def get_latest(self) -> Optional["ProcessSnapshot"]:
        """Return the most recent snapshot, or ``None`` if none was collected."""
        with self._lock:
            return self._snapshots[-1] if self._snapshots else None

    def get_history(self, count: int = 10) -> List["ProcessSnapshot"]:
        """Return up to ``count`` most recent snapshots, oldest first.

        A ``count`` of zero or less yields an empty list.
        """
        # Guard explicitly: ``[-0:]`` would otherwise return the whole history.
        if count <= 0:
            return []
        with self._lock:
            return self._snapshots[-count:]

    def is_dag_running(self, dag_name: str) -> bool:
        """Return True if the latest snapshot has a loaded DAG whose path
        contains ``dag_name``."""
        latest = self.get_latest()
        if latest is None:
            return False
        return any(
            dag.loaded for dag in latest.mainboard_dags
            if dag_name in dag.dag_path
        )

    def is_group_running(self, group_name: str) -> bool:
        """Return True if the latest snapshot has a running group with
        exactly this name."""
        latest = self.get_latest()
        if latest is None:
            return False
        return any(
            group.running for group in latest.mainboard_groups
            if group.group_name == group_name
        )

    def is_process_running(self, pattern: str) -> bool:
        """Return True if ``pattern`` (a regex) matches a process in the
        latest snapshot.

        The pattern is searched in the command line of every general process
        and in the DAG path of every mainboard DAG entry.
        """
        latest = self.get_latest()
        if latest is None:
            return False
        if any(re.search(pattern, proc.cmdline)
               for proc in latest.general_processes):
            return True
        # Also check the mainboard DAG entries.
        return any(re.search(pattern, dag.dag_path)
                   for dag in latest.mainboard_dags)

    def _collect_loop(self):
        """Background thread body: collect, then wait one interval, until stopped."""
        while self._running:
            try:
                self.collect_once()
            except Exception as e:
                # Keep the collector alive; a single failed pass is only logged.
                logger.error("Process collect error: %s", e)
            # Event.wait returns True as soon as stop() sets the event, so
            # shutdown is not delayed by the collection interval.
            if self._stop_event.wait(self._collect_interval):
                break

    @staticmethod
    def _build_process_info(info: dict) -> "ProcessInfo":
        """Build a ProcessInfo from a psutil ``proc.info`` dict."""
        pinfo = ProcessInfo()
        pinfo.pid = info['pid']
        # Values may be None when psutil could not read them.
        pinfo.name = info.get('name') or ""
        cmdline_list = info.get('cmdline')
        pinfo.cmdline = ' '.join(cmdline_list) if cmdline_list else ""
        pinfo.cpu_percent = info.get('cpu_percent') or 0
        mem_info = info.get('memory_info')
        if mem_info:
            pinfo.memory_mb = mem_info.rss / (1024 * 1024)
        pinfo.status = info.get('status') or ""
        create_time = info.get('create_time')
        if create_time:
            pinfo.create_time = create_time
            pinfo.uptime_seconds = time.time() - create_time
        return pinfo

    def _find_mainboard_processes(self) -> List["ProcessInfo"]:
        """Return a ProcessInfo for every process named ``mainboard``."""
        import psutil

        processes = []
        for proc in psutil.process_iter(self._PROCESS_ATTRS):
            try:
                info = proc.info
                if info.get('name') != self._MAINBOARD_PROCESS_NAME:
                    continue
                processes.append(self._build_process_info(info))
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
        return processes

    def _find_general_processes(self) -> List["ProcessInfo"]:
        """Return a ProcessInfo for every process whose command line matches
        one of the configured regexes."""
        import psutil

        processes = []
        for proc in psutil.process_iter(self._PROCESS_ATTRS):
            try:
                info = proc.info
                cmdline_list = info.get('cmdline')
                cmdline = ' '.join(cmdline_list) if cmdline_list else ""
                if not cmdline:
                    continue
                if any(p.search(cmdline) for p in self._compiled_patterns):
                    processes.append(self._build_process_info(info))
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                continue
        return processes

    def _parse_dags_from_cmdline(self, cmdline: str) -> List["DagInfo"]:
        """Parse every ``-d <file>.dag`` argument of a mainboard command line.

        Returns one DagInfo per DAG argument, each carrying the ``-p`` process
        group (empty string when absent) and ``loaded=True``.  ``pid`` is left
        at its default; the caller fills it in.
        """
        if not cmdline:
            return []
        group_match = self._GROUP_ARG_RE.search(cmdline)
        group_name = group_match.group(1) if group_match else ""
        return [
            DagInfo(dag_path=match.group(1), process_group=group_name, loaded=True)
            for match in self._DAG_ARG_RE.finditer(cmdline)
        ]

    def _parse_dag_from_cmdline(self, cmdline: str) -> Optional["DagInfo"]:
        """Parse the first DAG and the process group from a mainboard command line.

        Returns ``None`` when the command line is empty or has no
        ``-d <file>.dag`` argument.  ``pid`` is not set here.
        """
        dags = self._parse_dags_from_cmdline(cmdline)
        return dags[0] if dags else None
|