You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
ErrorDetecting/backend/app/routers/faults.py

207 lines
6.7 KiB

from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, delete, update
from ..db import get_db
from ..models.hadoop_logs import HadoopLog
from ..models.clusters import Cluster
from ..deps.auth import get_current_user
from pydantic import BaseModel
from datetime import datetime
import json
from ..config import now_bj
from ..config import BJ_TZ
# Shared router for the fault endpoints defined below.
router = APIRouter()
def _get_username(u) -> str:
return getattr(u, "username", None) or (u.get("username") if isinstance(u, dict) else None)
def _now():
    # Current timestamp via the project helper (presumably Beijing-local,
    # given the now_bj name and the BJ_TZ usage elsewhere in this module).
    return now_bj()
def _map_level(level: str) -> str:
lv = (level or "").lower()
if lv in ("critical", "fatal"):
return "FATAL"
if lv == "high":
return "ERROR"
if lv == "medium":
return "WARN"
return "INFO"
class FaultCreate(BaseModel):
    """Request body for POST /faults."""

    id: str | None = None       # client-supplied id; ignored by create_fault (DB assigns log_id)
    type: str                   # fault category; stored in the row's JSON metadata
    level: str                  # severity label supplied by the client
    status: str                 # lifecycle status (e.g. "active"); stored in metadata
    title: str                  # human-readable summary; stored in metadata
    cluster: str | None = None  # cluster name or UUID (create_fault resolves UUIDs to names)
    node: str | None = None     # node host; create_fault defaults it to "unknown"
    created: str | None = None  # ISO-8601 timestamp; create_fault falls back to "now" if unparseable
class FaultUpdate(BaseModel):
    """Request body for PUT /faults/{fid}; only non-None fields are applied."""

    status: str | None = None  # new status, if changing
    title: str | None = None   # new title, if changing
@router.get("/faults")
async def list_faults(
user=Depends(get_current_user),
db: AsyncSession = Depends(get_db),
cluster: str | None = Query(None),
node: str | None = Query(None),
time_from: str | None = Query(None),
page: int = Query(1, ge=1),
size: int = Query(10, ge=1, le=100),
):
try:
stmt = select(HadoopLog).where(HadoopLog.title == "fault")
count_stmt = select(func.count(HadoopLog.log_id)).where(HadoopLog.title == "fault")
if cluster:
stmt = stmt.where(HadoopLog.cluster_name == cluster)
count_stmt = count_stmt.where(HadoopLog.cluster_name == cluster)
if node:
stmt = stmt.where(HadoopLog.node_host == node)
count_stmt = count_stmt.where(HadoopLog.node_host == node)
if time_from:
try:
tf = datetime.fromisoformat(time_from.replace("Z", "+00:00"))
if tf.tzinfo is None:
tf = tf.replace(tzinfo=BJ_TZ)
else:
tf = tf.astimezone(BJ_TZ)
stmt = stmt.where(HadoopLog.log_time >= tf)
count_stmt = count_stmt.where(HadoopLog.log_time >= tf)
except Exception:
pass
stmt = stmt.order_by(HadoopLog.log_time.desc()).offset((page - 1) * size).limit(size)
rows = (await db.execute(stmt)).scalars().all()
total = (await db.execute(count_stmt)).scalar() or 0
items = []
for r in rows:
meta = {}
try:
if r.info:
meta = json.loads(r.info)
except Exception:
pass
items.append({
"id": str(r.log_id),
"type": meta.get("type", "unknown"),
"level": r.title,
"status": meta.get("status", "active"),
"title": meta.get("title", r.title),
"cluster": r.cluster_name,
"node": r.node_host,
"created": r.log_time.isoformat() if r.log_time else None
})
return {"items": items, "total": int(total)}
except HTTPException:
raise
except Exception as e:
print(f"Error listing faults: {e}")
raise HTTPException(status_code=500, detail="server_error")
@router.post("/faults")
async def create_fault(req: FaultCreate, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
try:
uname = _get_username(user)
if uname not in {"admin", "ops"}:
raise HTTPException(status_code=403, detail="not_allowed")
# 确定集群名称
cluster_name = req.cluster or "unknown"
if req.cluster and "-" in req.cluster: # 可能是 UUID
res = await db.execute(select(Cluster.name).where(Cluster.uuid == req.cluster).limit(1))
name = res.scalars().first()
if name:
cluster_name = name
ts = _now()
if req.created:
try:
dt = datetime.fromisoformat(req.created.replace("Z", "+00:00"))
if dt.tzinfo is None:
ts = dt.replace(tzinfo=BJ_TZ)
else:
ts = dt.astimezone(BJ_TZ)
except Exception:
pass
meta = {"type": req.type, "status": req.status, "title": req.title, "cluster": req.cluster, "node": req.node}
log = HadoopLog(
cluster_name=cluster_name,
node_host=req.node or "unknown",
title="fault",
info=json.dumps(meta, ensure_ascii=False),
log_time=ts
)
db.add(log)
await db.commit()
return {"ok": True, "id": log.log_id}
except HTTPException:
raise
except Exception as e:
print(f"Error creating fault: {e}")
raise HTTPException(status_code=500, detail="server_error")
@router.put("/faults/{fid}")
async def update_fault(fid: int, req: FaultUpdate, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
try:
uname = _get_username(user)
if uname not in {"admin", "ops"}:
raise HTTPException(status_code=403, detail="not_allowed")
res = await db.execute(select(HadoopLog).where(HadoopLog.log_id == fid, HadoopLog.title == "fault").limit(1))
row = res.scalars().first()
if not row:
raise HTTPException(status_code=404, detail="not_found")
meta = {}
try:
if row.info:
meta = json.loads(row.info)
except Exception:
pass
if req.status is not None:
meta["status"] = req.status
if req.title is not None:
meta["title"] = req.title
row.info = json.dumps(meta, ensure_ascii=False)
await db.commit()
return {"ok": True}
except HTTPException:
raise
except Exception as e:
print(f"Error updating fault: {e}")
raise HTTPException(status_code=500, detail="server_error")
@router.delete("/faults/{fid}")
async def delete_fault(fid: int, user=Depends(get_current_user), db: AsyncSession = Depends(get_db)):
try:
uname = _get_username(user)
if uname not in {"admin", "ops"}:
raise HTTPException(status_code=403, detail="not_allowed")
await db.execute(delete(HadoopLog).where(HadoopLog.log_id == fid, HadoopLog.title == "fault"))
await db.commit()
return {"ok": True}
except HTTPException:
raise
except Exception as e:
print(f"Error deleting fault: {e}")
raise HTTPException(status_code=500, detail="server_error")