后端final

zoujiaxuan_branch
echo 3 months ago
parent 908fe6a7f3
commit 10c1bb3c1c

@ -28,6 +28,7 @@ if not _db_url:
_db_url = "postgresql+asyncpg://postgres:password@localhost:5432/hadoop_fault_db"
DATABASE_URL = _db_url
SYNC_DATABASE_URL = _db_url.replace("postgresql+asyncpg://", "postgresql://")
# JWT Configuration
JWT_SECRET = os.getenv("JWT_SECRET", "dev-secret")

@ -87,7 +87,10 @@ class MetricsCollector:
return cpu_pct, mem_pct
async def _save_metrics(self, node_id: int, hostname: str, cluster_id: int, cpu: float, mem: float):
async with SessionLocal() as session:
# 这里的 SessionLocal 绑定的 engine 可能在主线程 loop 中初始化
# 在 asyncio.run() 开启的新 loop 中使用它会报 Loop 冲突
from .db import engine
async with AsyncSession(engine) as session:
now = datetime.datetime.now(BJ_TZ)
await session.execute(text("UPDATE nodes SET cpu_usage=:cpu, memory_usage=:mem, last_heartbeat=:hb WHERE id=:nid"), {"cpu": cpu, "mem": mem, "hb": now, "nid": node_id})
await session.commit()

@ -3,6 +3,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, text
from ..db import get_db
from ..deps.auth import get_current_user
from ..metrics_collector import metrics_collector
from ..models.nodes import Node
from ..models.clusters import Cluster
from datetime import datetime, timezone
@ -29,6 +30,113 @@ async def _ensure_access(db: AsyncSession, username: str, cluster_uuid: str) ->
return cid
@router.post("/metrics/collectors/start-by-cluster/{cluster_uuid}")
async def start_collectors_by_cluster(
cluster_uuid: str,
interval: int = Query(5, ge=1, le=3600),
user=Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
try:
name = _get_username(user)
cid = await _ensure_access(db, name, cluster_uuid)
if not cid:
raise HTTPException(status_code=403, detail="not_allowed")
res = await db.execute(select(Node.id, Node.hostname, Node.ip_address).where(Node.cluster_id == cid))
rows = res.all()
nodes = [(int(nid), str(hn), str(ip), int(cid)) for nid, hn, ip in rows]
started_count, started_nodes = metrics_collector.start_for_nodes(nodes, interval=interval)
return {
"started": int(started_count),
"nodes": started_nodes,
"interval": int(metrics_collector.collection_interval),
}
except HTTPException:
raise
except Exception:
raise HTTPException(status_code=500, detail="server_error")
@router.get("/metrics/collectors/status")
async def get_collectors_status(
cluster: str | None = Query(None),
user=Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
"""查询指标采集器的状态"""
try:
name = _get_username(user)
# 即使校验失败或发生错误,也返回一个 200 结构的友好响应,而不是让接口崩掉
try:
status = metrics_collector.get_collectors_status()
errors = metrics_collector.get_errors()
interval = int(metrics_collector.collection_interval)
# 如果提供了集群 UUID进行过滤
if cluster:
# 获取该集群下的节点列表
cid = await _ensure_access(db, name, cluster)
if cid:
res = await db.execute(select(Node.hostname).where(Node.cluster_id == cid))
cluster_nodes = set(str(hn) for (hn,) in res.all())
status = {k: v for k, v in status.items() if k in cluster_nodes}
errors = {k: v for k, v in errors.items() if k in cluster_nodes}
else:
# 权限不足时,返回空结果而非报错
status = {}
errors = {}
return {
"is_running": any(status.values()) if status else False,
"active_collectors_count": int(sum(1 for v in status.values() if v)),
"interval": interval,
"collectors": status,
"errors": errors
}
except Exception as inner_e:
return {
"is_running": False,
"active_collectors_count": 0,
"interval": 5,
"collectors": {},
"errors": {"system": str(inner_e)}
}
except Exception as e:
# 顶层异常捕获
return {
"is_running": False,
"active_collectors_count": 0,
"interval": 5,
"collectors": {},
"errors": {"fatal": str(e)}
}
@router.post("/metrics/collectors/stop-by-cluster/{cluster_uuid}")
async def stop_collectors_by_cluster(
cluster_uuid: str,
user=Depends(get_current_user),
db: AsyncSession = Depends(get_db),
):
try:
name = _get_username(user)
cid = await _ensure_access(db, name, cluster_uuid)
if not cid:
raise HTTPException(status_code=403, detail="not_allowed")
res = await db.execute(select(Node.hostname).where(Node.cluster_id == cid))
hostnames = [str(hn) for (hn,) in res.all()]
stopped = []
for hn in hostnames:
if hn in metrics_collector.collectors:
metrics_collector.stop(hn)
stopped.append(hn)
return {"stopped": int(len(stopped)), "nodes": stopped}
except HTTPException:
raise
except Exception:
raise HTTPException(status_code=500, detail="server_error")
@router.get("/metrics/cpu_trend")
async def cpu_trend(cluster: str = Query(...), user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
"""获取指定集群的 CPU 使用率趋势数据。"""

@ -0,0 +1,96 @@
# Metrics 采集器前端联调指南
## 1. 概述
本文档旨在指导前端开发人员如何调用新增的 Metrics 采集器接口,实现对集群节点 CPU、内存等指标的实时持续采集。该功能通过后台线程运行每隔固定周期默认 5 秒)自动更新数据库中的节点状态。
## 2. 接口说明
所有接口均需在 Header 中携带有效的 JWT Token 进行认证。
### 2.1 启动集群采集
**接口地址**: `POST /api/v1/metrics/collectors/start-by-cluster/{cluster_uuid}`
**功能描述**: 启动指定集群下所有节点的后台采集线程。如果采集已在运行,此操作将重启采集并应用新的 `interval`
**Query 参数**:
- `interval` (int, 可选): 采集周期,单位为秒。默认为 `5`
**请求示例**:
`POST /api/v1/metrics/collectors/start-by-cluster/550e8400-e29b-41d4-a716-446655440000?interval=10`
**响应示例**:
```json
{
"ok": true,
"message": "Metrics collection started for cluster 550e8400-e29b-41d4-a716-446655440000 with interval 10s"
}
```
---
### 2.2 获取采集器状态
**接口地址**: `GET /api/v1/metrics/collectors/status`
**功能描述**: 查询当前后台采集器的运行状态,包括活跃的采集线程数、周期以及最近的错误信息。
**Query 参数**:
- `cluster` (string, 可选): 指定集群 UUID 过滤状态。
**请求示例**:
`GET /api/v1/metrics/collectors/status?cluster=550e8400-e29b-41d4-a716-446655440000`
**响应示例**:
```json
{
"is_running": true,
"active_collectors_count": 3,
"interval": 5,
"collectors": {
"node-01": "running",
"node-02": "running"
},
"errors": {
"node-03": "SSH Timeout"
}
}
```
---
### 2.3 停止集群采集
**接口地址**: `POST /api/v1/metrics/collectors/stop-by-cluster/{cluster_uuid}`
**功能描述**: 停止指定集群下所有节点的后台采集线程。
**请求示例**:
`POST /api/v1/metrics/collectors/stop-by-cluster/550e8400-e29b-41d4-a716-446655440000`
**响应示例**:
```json
{
"ok": true,
"message": "Metrics collection stopping for cluster 550e8400-e29b-41d4-a716-446655440000"
}
```
## 3. 前端集成逻辑建议
### 3.1 页面加载时同步状态
当用户进入“集群监控”或“节点列表”页面时,应先调用 `GET /api/v1/metrics/collectors/status` 接口。
- 如果 `is_running``false`,界面可以显示“启动监控”按钮。
- 如果为 `true`,界面显示“监控中”状态,并可以根据 `interval` 开启前端定时刷新(调用原有的节点数据接口获取最新值)。
### 3.2 启动监控
点击“启动监控”按钮后,调用 `start-by-cluster` 接口。成功后,前端应每隔一段时间(建议大于等于 `interval`)重新获取节点列表数据,以展示最新的 CPU 和内存使用率。
### 3.3 错误处理
如果在状态接口中发现 `errors` 字段有内容,前端应在对应的节点行或监控卡片上显示错误图标及详情(如 SSH 连接失败等)。
## 4. 注意事项
1. **权限控制**: 只有拥有该集群访问权限的用户才能操作其采集器。
2. **性能影响**: 采集器运行在后台线程,过多频繁的 SSH 轮询可能对目标节点产生轻微负载,建议 `interval` 不要小于 `2` 秒。
3. **数据一致性**: 采集器更新的是 `nodes` 表。前端展示数据时,请调用 `GET /api/v1/nodes` 相关接口获取最新字段。
---
**版本**: v1.0
**最后更新**: 2026-01-07
Loading…
Cancel
Save