|
|
from fastapi import APIRouter, Depends, HTTPException
|
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
from sqlalchemy import select, text
|
|
|
from pydantic import BaseModel, Field
|
|
|
from datetime import datetime, timezone
|
|
|
import shlex
|
|
|
import uuid as uuidlib
|
|
|
import asyncio
|
|
|
|
|
|
from ..db import get_db
|
|
|
from ..deps.auth import get_current_user, PermissionChecker
|
|
|
from ..models.nodes import Node
|
|
|
from ..models.clusters import Cluster
|
|
|
from ..models.sys_exec_logs import SysExecLog
|
|
|
from ..models.hadoop_exec_logs import HadoopExecLog
|
|
|
from ..services.runner import run_remote_command
|
|
|
from ..ssh_utils import SSHClient
|
|
|
from ..config import now_bj
|
|
|
|
|
|
|
|
|
# Router on which the ops endpoints declared in this module are registered.
router = APIRouter()
|
|
|
|
|
|
|
|
|
def _now() -> datetime:
    """Return the current timestamp as produced by ``now_bj()``.

    NOTE(review): this was previously documented as UTC, but the value comes
    from ``..config.now_bj`` — presumably Beijing time; confirm against its
    definition before relying on the timezone.
    """
    return now_bj()
|
|
|
|
|
|
|
|
|
def _get_username(u) -> str:
|
|
|
"""提取用户名。"""
|
|
|
return getattr(u, "username", None) or (u.get("username") if isinstance(u, dict) else None) or "system"
|
|
|
|
|
|
|
|
|
def _require_ops(u):
    """Ensure the caller is allowed to perform ops actions.

    Access is granted only when the resolved username is ``admin`` or ``ops``.

    Raises:
        HTTPException: 403 ``not_allowed`` for any other username.
    """
    permitted_names = ("admin", "ops")
    if _get_username(u) in permitted_names:
        return
    raise HTTPException(status_code=403, detail="not_allowed")
|
|
|
|
|
|
|
|
|
async def _find_accessible_node(db: AsyncSession, user_name: str, hostname: str) -> Node | None:
    """Look up a node by hostname among the clusters mapped to a user.

    Args:
        db: Async session used for all three queries.
        user_name: Username resolved to a row in ``users``.
        hostname: Hostname the node must match exactly.

    Returns:
        The first matching ``Node``, or ``None`` when the user does not exist,
        has no rows in ``user_cluster_mapping``, or no node matches.
    """
    # Step 1: resolve the username to its numeric id.
    user_stmt = text("SELECT id FROM users WHERE username=:un LIMIT 1")
    user_row = (await db.execute(user_stmt, {"un": user_name})).first()
    if not user_row:
        return None

    # Step 2: collect every cluster id mapped to that user.
    mapping_stmt = text("SELECT cluster_id FROM user_cluster_mapping WHERE user_id=:uid")
    mapping_rows = (await db.execute(mapping_stmt, {"uid": user_row[0]})).all()
    cluster_ids = [row[0] for row in mapping_rows]
    if not cluster_ids:
        return None

    # Step 3: restrict the hostname lookup to those clusters.
    node_stmt = (
        select(Node)
        .where(Node.hostname == hostname, Node.cluster_id.in_(cluster_ids))
        .limit(1)
    )
    node_result = await db.execute(node_stmt)
    return node_result.scalars().first()
|
|
|
|
|
|
|
|
|
def _gen_exec_id() -> str:
|
|
|
"""生成执行记录ID。"""
|
|
|
return uuidlib.uuid4().hex[:32]
|
|
|
|
|
|
|
|
|
class ReadLogReq(BaseModel):
    # Request body accepted by the POST /ops/read-log endpoint.

    # Hostname of the target node (required).
    node: str = Field(..., description="目标节点主机名")

    # Path of the log file to read on the node (required).
    path: str = Field(..., description="日志文件路径")

    # Number of trailing lines to read; defaults to 200, validated to 1..5000.
    lines: int = Field(200, ge=1, le=5000, description="读取行数")

    # Optional regular expression used to filter the lines.
    pattern: str | None = Field(None, description="可选过滤正则")

    # Optional SSH user name.
    sshUser: str | None = Field(None, description="SSH 用户名(可选)")

    # Command timeout; defaults to 20, validated to 1..120
    # (unit is not stated here — presumably seconds; confirm in run_remote_command).
    timeout: int = Field(20, ge=1, le=120, description="命令超时时间")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _write_exec_log(db: AsyncSession, operation_id: str, description: str, user_id: int):
    """Persist one system operation log entry and commit it.

    Args:
        db: Async session the row is added to, flushed and committed on.
        operation_id: Accepted for interface compatibility; not stored by this function.
        description: Free-text description of the operation.
        user_id: Id of the user the entry is attributed to.
    """
    entry_fields = {
        "user_id": user_id,
        "description": description,
        "operation_time": _now(),
    }
    db.add(SysExecLog(**entry_fields))
    await db.flush()
    await db.commit()
|
|
|
|
|
|
|
|
|
@router.post("/ops/read-log")
async def read_log(req: ReadLogReq, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
    """Read the tail of a remote log file, optionally filtered by a regex.

    Runs ``tail -n <lines>`` on the requested node (piped through ``grep -E``
    when ``req.pattern`` is set), writes an audit entry via
    ``_write_exec_log`` and returns the output split into lines.

    Args:
        req: Node hostname, file path, line count, optional pattern, optional
            SSH user and command timeout.
        user: Authenticated caller; must pass ``_require_ops``.
        db: Async session used for the node lookup and the audit entry.

    Returns:
        ``{"exitCode": <int>, "lines": [<str>, ...]}``. A filtered read with no
        matching lines returns an empty ``lines`` list together with grep's
        exit code ``1``.

    Raises:
        HTTPException: 403 ``not_allowed`` when the caller is not an ops user,
            404 ``node_not_found`` when the node is not accessible to the
            caller, 500 ``exec_failed`` when the remote command fails, and
            500 ``server_error`` for any other unexpected error.
    """
    try:
        _require_ops(user)
        uname = _get_username(user)
        # Falls back to user id 1 when the auth object exposes no ``id`` attribute.
        user_id = getattr(user, "id", 1)

        node = await _find_accessible_node(db, uname, req.node)
        if not node:
            raise HTTPException(status_code=404, detail="node_not_found")

        # ``--`` and ``-e`` keep a path or pattern that starts with ``-`` from
        # being interpreted as a command-line option by tail / grep.
        cmd = f"tail -n {req.lines} -- {shlex.quote(req.path)}"
        if req.pattern:
            cmd = f"{cmd} | grep -E -e {shlex.quote(req.pattern)}"

        code, out, err = await run_remote_command(
            str(getattr(node, "ip_address", "")),
            req.sshUser or "",
            cmd,
            timeout=req.timeout,
        )

        desc = f"Read log: {req.path} on {req.node} (Exit: {code})"
        await _write_exec_log(db, None, desc, user_id)

        # grep exits with status 1 when no line matched, which is a valid
        # (empty) result rather than a failure. Requiring an empty stderr
        # keeps real failures (e.g. tail unable to open the file) on the
        # error path, since the pipeline reports grep's status either way.
        no_match = bool(req.pattern) and code == 1 and not (err or "").strip()
        if code != 0 and not no_match:
            raise HTTPException(status_code=500, detail="exec_failed")

        return {"exitCode": code, "lines": (out or "").splitlines()}
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail="server_error") from exc
|
|
|
|
|
|
|
|
|
async def _write_hadoop_exec_log(db: AsyncSession, user_id: int, cluster_name: str, description: str, start_time: datetime, end_time: datetime):
    """Persist one Hadoop execution audit entry and commit it.

    Args:
        db: Async session the row is added to, flushed and committed on.
        user_id: Id of the user who triggered the operation (stored as ``from_user_id``).
        cluster_name: Name of the cluster the operation targeted.
        description: Free-text description of what was executed.
        start_time: When the operation began.
        end_time: When the operation finished.
    """
    audit_fields = {
        "from_user_id": user_id,
        "cluster_name": cluster_name,
        "description": description,
        "start_time": start_time,
        "end_time": end_time,
    }
    db.add(HadoopExecLog(**audit_fields))
    await db.flush()
    await db.commit()
|
|
|
|
|
|
|
|
|
@router.post("/ops/clusters/{cluster_uuid}/start")
async def start_cluster(
    cluster_uuid: str,
    user=Depends(PermissionChecker(["cluster:start"])),
    db: AsyncSession = Depends(get_db)
):
    """Start a cluster: run ``start-dfs.sh`` on the NameNode and ``start-yarn.sh`` on the ResourceManager.

    Each step is attempted only when both the host IP and its password are set
    on the cluster record. Per-step exceptions are recorded in the returned
    logs rather than aborting the request. Afterwards the cluster's
    ``health_status`` and ``updated_at`` are updated and an audit entry is
    written via ``_write_hadoop_exec_log``.

    Args:
        cluster_uuid: UUID string identifying the cluster.
        user: Caller authorised for ``cluster:start``.
        db: Async session used for lookups and updates.

    Returns:
        ``{"status": "success", "logs": [<str>, ...]}`` with one log line per
        attempted step.

    Raises:
        HTTPException: 400 ``invalid_uuid_format`` for a malformed UUID,
            404 ``cluster_not_found`` when no cluster matches, and
            500 ``server_error`` for any other unexpected error.
    """
    try:
        # Validate the UUID format before touching the database.
        try:
            uuidlib.UUID(cluster_uuid)
        except ValueError:
            raise HTTPException(status_code=400, detail="invalid_uuid_format")

        # Falls back to user id 1 when the auth object exposes no ``id`` attribute.
        user_id = getattr(user, "id", 1)

        # 1. Look up the cluster.
        res = await db.execute(select(Cluster).where(Cluster.uuid == cluster_uuid).limit(1))
        cluster = res.scalars().first()
        if not cluster:
            raise HTTPException(status_code=404, detail="cluster_not_found")

        # 2. Take the SSH user from one of the cluster's nodes, defaulting to "hadoop".
        node_res = await db.execute(select(Node).where(Node.cluster_id == cluster.id).limit(1))
        node = node_res.scalars().first()
        ssh_user = node.ssh_user if node and node.ssh_user else "hadoop"

        start_time = _now()
        logs = []

        # 3./4. Run the start script on each role's host. Connection details
        # are read here and handed to the worker as plain values, so the
        # thread never touches the ORM object.
        steps = (
            ("NameNode", cluster.namenode_ip, cluster.namenode_psw, "start-dfs.sh"),
            ("ResourceManager", cluster.rm_ip, cluster.rm_psw, "start-yarn.sh"),
        )
        for role, ip, password, script in steps:
            if not (ip and password):
                continue

            # Defaults bind the current loop values at definition time.
            def run_script(host=str(ip), secret=password, command=script):
                with SSHClient(host, ssh_user, secret) as client:
                    return client.execute_command(command)

            try:
                out, err = await asyncio.to_thread(run_script)
                logs.append(f"{role} ({ip}) start: {out} {err}")
            except Exception as e:
                logs.append(f"{role} ({ip}) start failed: {e}")

        end_time = _now()

        # 5. Update the cluster status from the collected logs.
        # NOTE(review): failure is inferred from the word "failed" appearing in
        # any log line, which also matches remote command output; and when no
        # step ran at all the cluster is still marked "healthy". Confirm both
        # are intended.
        has_failed = any("failed" in log.lower() for log in logs)
        cluster.health_status = "error" if has_failed else "healthy"
        cluster.updated_at = end_time
        await db.flush()

        # 6. Write the audit entry.
        full_desc = " | ".join(logs)
        await _write_hadoop_exec_log(db, user_id, cluster.name, f"Start Cluster: {full_desc}", start_time, end_time)

        return {"status": "success", "logs": logs}
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        print(f"Error starting cluster: {e}")
        raise HTTPException(status_code=500, detail="server_error")
|
|
|
|
|
|
|
|
|
@router.post("/ops/clusters/{cluster_uuid}/stop")
async def stop_cluster(
    cluster_uuid: str,
    user=Depends(PermissionChecker(["cluster:stop"])),
    db: AsyncSession = Depends(get_db)
):
    """Stop a cluster: run ``stop-dfs.sh`` on the NameNode and ``stop-yarn.sh`` on the ResourceManager.

    Each step is attempted only when both the host IP and its password are set
    on the cluster record. Per-step exceptions are recorded in the returned
    logs rather than aborting the request. Afterwards the cluster's
    ``health_status`` is set to ``"unknown"``, ``updated_at`` is refreshed and
    an audit entry is written via ``_write_hadoop_exec_log``.

    Args:
        cluster_uuid: UUID string identifying the cluster.
        user: Caller authorised for ``cluster:stop``.
        db: Async session used for lookups and updates.

    Returns:
        ``{"status": "success", "logs": [<str>, ...]}`` with one log line per
        attempted step.

    Raises:
        HTTPException: 400 ``invalid_uuid_format`` for a malformed UUID,
            404 ``cluster_not_found`` when no cluster matches, and
            500 ``server_error`` for any other unexpected error.
    """
    try:
        # Validate the UUID format before touching the database.
        try:
            uuidlib.UUID(cluster_uuid)
        except ValueError:
            raise HTTPException(status_code=400, detail="invalid_uuid_format")

        # Falls back to user id 1 when the auth object exposes no ``id`` attribute.
        user_id = getattr(user, "id", 1)

        # 1. Look up the cluster.
        res = await db.execute(select(Cluster).where(Cluster.uuid == cluster_uuid).limit(1))
        cluster = res.scalars().first()
        if not cluster:
            raise HTTPException(status_code=404, detail="cluster_not_found")

        # 2. Take the SSH user from one of the cluster's nodes, defaulting to "hadoop".
        node_res = await db.execute(select(Node).where(Node.cluster_id == cluster.id).limit(1))
        node = node_res.scalars().first()
        ssh_user = node.ssh_user if node and node.ssh_user else "hadoop"

        start_time = _now()
        logs = []

        # 3./4. Run the stop script on each role's host. Connection details
        # are read here and handed to the worker as plain values, so the
        # thread never touches the ORM object.
        steps = (
            ("NameNode", cluster.namenode_ip, cluster.namenode_psw, "stop-dfs.sh"),
            ("ResourceManager", cluster.rm_ip, cluster.rm_psw, "stop-yarn.sh"),
        )
        for role, ip, password, script in steps:
            if not (ip and password):
                continue

            # Defaults bind the current loop values at definition time.
            def run_script(host=str(ip), secret=password, command=script):
                with SSHClient(host, ssh_user, secret) as client:
                    return client.execute_command(command)

            try:
                out, err = await asyncio.to_thread(run_script)
                logs.append(f"{role} ({ip}) stop: {out} {err}")
            except Exception as e:
                logs.append(f"{role} ({ip}) stop failed: {e}")

        end_time = _now()

        # 5. Update the cluster status; after a stop it is recorded as "unknown".
        cluster.health_status = "unknown"
        cluster.updated_at = end_time
        await db.flush()

        # 6. Write the audit entry.
        full_desc = " | ".join(logs)
        await _write_hadoop_exec_log(db, user_id, cluster.name, f"Stop Cluster: {full_desc}", start_time, end_time)

        return {"status": "success", "logs": logs}
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        print(f"Error stopping cluster: {e}")
        raise HTTPException(status_code=500, detail="server_error")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|