ErrorDetecting/backend/app/routers/ops.py

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, text
from pydantic import BaseModel, Field
from datetime import datetime, timezone
import shlex
import uuid as uuidlib
import asyncio
import logging
from ..db import get_db
from ..deps.auth import get_current_user, PermissionChecker
from ..models.nodes import Node
from ..models.clusters import Cluster
from ..models.sys_exec_logs import SysExecLog
from ..models.hadoop_exec_logs import HadoopExecLog
from ..services.runner import run_remote_command
from ..ssh_utils import SSHClient
from ..config import now_bj
logger = logging.getLogger(__name__)
router = APIRouter()
def _now() -> datetime:
"""返回当前 UTC 时间。"""
return now_bj()
def _get_username(u) -> str:
"""提取用户名。"""
return getattr(u, "username", None) or (u.get("username") if isinstance(u, dict) else None) or "system"
def _require_ops(u):
"""校验用户是否具有运维权限。"""
name = _get_username(u)
if name not in {"admin", "ops"}:
raise HTTPException(status_code=403, detail="not_allowed")
async def _find_accessible_node(db: AsyncSession, user_name: str, hostname: str) -> Node | None:
"""在用户可访问的集群中查找指定主机名的节点。"""
uid_res = await db.execute(text("SELECT id FROM users WHERE username=:un LIMIT 1"), {"un": user_name})
uid_row = uid_res.first()
if not uid_row:
return None
ids_res = await db.execute(text("SELECT cluster_id FROM user_cluster_mapping WHERE user_id=:uid"), {"uid": uid_row[0]})
cluster_ids = [r[0] for r in ids_res.all()]
if not cluster_ids:
return None
res = await db.execute(select(Node).where(Node.hostname == hostname, Node.cluster_id.in_(cluster_ids)).limit(1))
return res.scalars().first()
def _gen_exec_id() -> str:
"""生成执行记录ID。"""
return uuidlib.uuid4().hex[:32]
class ReadLogReq(BaseModel):
    node: str = Field(..., description="Target node hostname")
    path: str = Field(..., description="Log file path")
    lines: int = Field(200, ge=1, le=5000, description="Number of lines to read")
    pattern: str | None = Field(None, description="Optional filter regex")
    sshUser: str | None = Field(None, description="SSH username (optional)")
    timeout: int = Field(20, ge=1, le=120, description="Command timeout in seconds")
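
    # Example payload (hypothetical values):
    #   {"node": "dn01", "path": "/var/log/hadoop/hadoop.log",
    #    "lines": 200, "pattern": "ERROR", "timeout": 20}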
async def _write_exec_log(db: AsyncSession, description: str, user_id: int):
    """Write a system operation audit log entry."""
row = SysExecLog(
user_id=user_id,
description=description,
operation_time=_now()
)
db.add(row)
await db.flush()
await db.commit()
@router.post("/ops/read-log")
async def read_log(req: ReadLogReq, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
"""读取远端日志文件内容,支持可选筛选。"""
try:
_require_ops(user)
uname = _get_username(user)
        # user_id is assumed to come from the user object (or a lookup); fall back to 1
        user_id = getattr(user, "id", 1)
node = await _find_accessible_node(db, uname, req.node)
if not node:
raise HTTPException(status_code=404, detail="node_not_found")
path_q = shlex.quote(req.path)
cmd = f"tail -n {req.lines} {path_q}"
if req.pattern:
pat_q = shlex.quote(req.pattern)
cmd = f"{cmd} | grep -E {pat_q}"
start = _now()
code, out, err = await run_remote_command(str(getattr(node, "ip_address", "")), req.sshUser or "", cmd, timeout=req.timeout)
desc = f"Read log: {req.path} on {req.node} (Exit: {code})"
        await _write_exec_log(db, desc, user_id)
if code != 0:
raise HTTPException(status_code=500, detail="exec_failed")
        lines = out.splitlines()
return {"exitCode": code, "lines": lines}
except HTTPException:
raise
    except Exception:
        logger.exception("Error reading remote log")
        raise HTTPException(status_code=500, detail="server_error")
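
# Example call (hypothetical host and values; assumes the router is mounted at the app root):
#   curl -X POST http://localhost:8000/ops/read-log \
#     -H 'Content-Type: application/json' \
#     -d '{"node": "dn01", "path": "/var/log/hadoop/hadoop.log", "lines": 100}'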
async def _write_hadoop_exec_log(db: AsyncSession, user_id: int, cluster_name: str, description: str, start_time: datetime, end_time: datetime):
"""写入 Hadoop 执行审计日志。"""
row = HadoopExecLog(
from_user_id=user_id,
cluster_name=cluster_name,
description=description,
start_time=start_time,
end_time=end_time
)
db.add(row)
await db.flush()
await db.commit()
@router.post("/ops/clusters/{cluster_uuid}/start")
async def start_cluster(
cluster_uuid: str,
user=Depends(PermissionChecker(["cluster:start"])),
db: AsyncSession = Depends(get_db)
):
"""启动集群:在 NameNode 执行 hsfsstart在 ResourceManager 执行 yarnstart。"""
try:
        # Validate the UUID format
try:
uuidlib.UUID(cluster_uuid)
except ValueError:
raise HTTPException(status_code=400, detail="invalid_uuid_format")
        user_id = getattr(user, "id", 1)
        # 1. Look up the cluster
res = await db.execute(select(Cluster).where(Cluster.uuid == cluster_uuid).limit(1))
cluster = res.scalars().first()
if not cluster:
raise HTTPException(status_code=404, detail="cluster_not_found")
        # 2. Determine the SSH user (taken from an associated node, defaulting to "hadoop")
node_res = await db.execute(select(Node).where(Node.cluster_id == cluster.id).limit(1))
node = node_res.scalars().first()
ssh_user = node.ssh_user if node and node.ssh_user else "hadoop"
start_time = _now()
logs = []
        # 3. Run start-dfs.sh on the NameNode
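        # NOTE: assumes start-dfs.sh is on the SSH user's login-shell PATH
        # (e.g. $HADOOP_HOME/sbin exported in the remote profile).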
if cluster.namenode_ip and cluster.namenode_psw:
try:
def run_nn_start():
with SSHClient(str(cluster.namenode_ip), ssh_user, cluster.namenode_psw) as client:
return client.execute_command("start-dfs.sh")
out, err = await asyncio.to_thread(run_nn_start)
logs.append(f"NameNode ({cluster.namenode_ip}) start: {out} {err}")
except Exception as e:
logs.append(f"NameNode ({cluster.namenode_ip}) start failed: {str(e)}")
        # 4. Run start-yarn.sh on the ResourceManager
if cluster.rm_ip and cluster.rm_psw:
try:
def run_rm_start():
with SSHClient(str(cluster.rm_ip), ssh_user, cluster.rm_psw) as client:
return client.execute_command("start-yarn.sh")
out, err = await asyncio.to_thread(run_rm_start)
logs.append(f"ResourceManager ({cluster.rm_ip}) start: {out} {err}")
except Exception as e:
logs.append(f"ResourceManager ({cluster.rm_ip}) start failed: {str(e)}")
end_time = _now()
        # 5. Update the cluster status: healthy only if no step logged a failure
has_failed = any("failed" in log.lower() for log in logs)
if not has_failed:
cluster.health_status = "healthy"
else:
cluster.health_status = "error"
cluster.updated_at = end_time
await db.flush()
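        # The status change is committed together with the audit row below
        # (_write_hadoop_exec_log issues the commit).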
        # 6. Write the audit log
full_desc = " | ".join(logs)
await _write_hadoop_exec_log(db, user_id, cluster.name, f"Start Cluster: {full_desc}", start_time, end_time)
return {"status": "success", "logs": logs}
except HTTPException:
raise
    except Exception:
        logger.exception("Error starting cluster")
raise HTTPException(status_code=500, detail="server_error")
@router.post("/ops/clusters/{cluster_uuid}/stop")
async def stop_cluster(
cluster_uuid: str,
user=Depends(PermissionChecker(["cluster:stop"])),
db: AsyncSession = Depends(get_db)
):
"""停止集群:在 NameNode 执行 hsfsstop在 ResourceManager 执行 yarnstop。"""
try:
        # Validate the UUID format
try:
uuidlib.UUID(cluster_uuid)
except ValueError:
raise HTTPException(status_code=400, detail="invalid_uuid_format")
        user_id = getattr(user, "id", 1)
        # 1. Look up the cluster
res = await db.execute(select(Cluster).where(Cluster.uuid == cluster_uuid).limit(1))
cluster = res.scalars().first()
if not cluster:
raise HTTPException(status_code=404, detail="cluster_not_found")
        # 2. Determine the SSH user
node_res = await db.execute(select(Node).where(Node.cluster_id == cluster.id).limit(1))
node = node_res.scalars().first()
ssh_user = node.ssh_user if node and node.ssh_user else "hadoop"
start_time = _now()
logs = []
        # 3. Run stop-dfs.sh on the NameNode
if cluster.namenode_ip and cluster.namenode_psw:
try:
def run_nn_stop():
with SSHClient(str(cluster.namenode_ip), ssh_user, cluster.namenode_psw) as client:
return client.execute_command("stop-dfs.sh")
out, err = await asyncio.to_thread(run_nn_stop)
logs.append(f"NameNode ({cluster.namenode_ip}) stop: {out} {err}")
except Exception as e:
logs.append(f"NameNode ({cluster.namenode_ip}) stop failed: {str(e)}")
        # 4. Run stop-yarn.sh on the ResourceManager
if cluster.rm_ip and cluster.rm_psw:
try:
def run_rm_stop():
with SSHClient(str(cluster.rm_ip), ssh_user, cluster.rm_psw) as client:
return client.execute_command("stop-yarn.sh")
out, err = await asyncio.to_thread(run_rm_stop)
logs.append(f"ResourceManager ({cluster.rm_ip}) stop: {out} {err}")
except Exception as e:
logs.append(f"ResourceManager ({cluster.rm_ip}) stop failed: {str(e)}")
end_time = _now()
        # 5. Update the cluster status
cluster.health_status = "unknown"
cluster.updated_at = end_time
await db.flush()
        # 6. Write the audit log
full_desc = " | ".join(logs)
await _write_hadoop_exec_log(db, user_id, cluster.name, f"Stop Cluster: {full_desc}", start_time, end_time)
return {"status": "success", "logs": logs}
except HTTPException:
raise
    except Exception:
        logger.exception("Error stopping cluster")
raise HTTPException(status_code=500, detail="server_error")
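
# Wiring sketch (hypothetical module path; adjust to the actual app factory):
#   from fastapi import FastAPI
#   from app.routers import ops
#   app = FastAPI()
#   app.include_router(ops.router, prefix="/api")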