日志采集接口修改前保存

zoujiaxuan_branch
echo 3 months ago
parent 13d8b53674
commit a4a92c6f70

@ -3,7 +3,8 @@ from typing import Any, Dict, List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from ..services.llm import LLMClient
from ..services.ops_tools import openai_tools_schema, tool_read_log, tool_start_cluster, tool_stop_cluster
from ..services.ops_tools import openai_tools_schema, tool_read_log, tool_start_cluster, tool_stop_cluster, tool_read_cluster_log, tool_detect_cluster_faults, tool_run_cluster_command
import json
async def run_diagnose_and_repair(db: AsyncSession, operator: str, context: Dict[str, Any], auto: bool = True, max_steps: int = 3, model: Optional[str] = None) -> Dict[str, Any]:
@ -44,10 +45,48 @@ async def run_diagnose_and_repair(db: AsyncSession, operator: str, context: Dict
for tc in tool_calls:
fn = (tc.get("function") or {})
name = fn.get("name")
args = fn.get("arguments") or {}
raw_args = fn.get("arguments") or {}
if isinstance(raw_args, str):
try:
args = json.loads(raw_args)
except Exception:
args = {}
elif isinstance(raw_args, dict):
args = raw_args
else:
args = {}
result: Dict[str, Any]
if name == "read_log":
result = await tool_read_log(db, operator, args.get("node"), args.get("path"), int(args.get("lines", 200)), args.get("pattern"), args.get("sshUser"))
elif name == "read_cluster_log":
result = await tool_read_cluster_log(
db=db,
user_name=operator,
cluster_uuid=args.get("cluster_uuid"),
log_type=args.get("log_type"),
node_hostname=args.get("node_hostname"),
lines=int(args.get("lines", 100)),
)
elif name == "detect_cluster_faults":
result = await tool_detect_cluster_faults(
db=db,
user_name=operator,
cluster_uuid=args.get("cluster_uuid"),
components=args.get("components"),
node_hostname=args.get("node_hostname"),
lines=int(args.get("lines", 200)),
)
elif name == "run_cluster_command":
result = await tool_run_cluster_command(
db=db,
user_name=operator,
cluster_uuid=args.get("cluster_uuid"),
command_key=args.get("command_key"),
target=args.get("target"),
node_hostname=args.get("node_hostname"),
timeout=int(args.get("timeout", 30)),
limit_nodes=int(args.get("limit_nodes", 20)),
)
elif name == "start_cluster":
result = await tool_start_cluster(db, operator, args.get("cluster_uuid"))
elif name == "stop_cluster":
@ -57,4 +96,3 @@ async def run_diagnose_and_repair(db: AsyncSession, operator: str, context: Dict
actions.append({"name": name, "args": args, "result": result})
messages.append({"role": "tool", "content": str(result), "name": name})
return {"rootCause": root_cause, "actions": actions, "residualRisk": residual_risk}

@ -2,9 +2,18 @@ import os
import json
from dotenv import load_dotenv
from typing import Dict, Tuple
from datetime import datetime
from zoneinfo import ZoneInfo
load_dotenv()
# Timezone Configuration
APP_TIMEZONE = os.getenv("APP_TIMEZONE", "Asia/Shanghai")
BJ_TZ = ZoneInfo(APP_TIMEZONE)
def now_bj() -> datetime:
    """Return the current timezone-aware time in the configured app timezone (BJ_TZ)."""
    return datetime.now(tz=BJ_TZ)
# Database Configuration
_db_url = os.getenv("DATABASE_URL")
if not _db_url:

@ -1,7 +1,12 @@
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from .config import DATABASE_URL
from .config import DATABASE_URL, APP_TIMEZONE
engine = create_async_engine(DATABASE_URL, echo=False, pool_pre_ping=True)
engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_pre_ping=True,
connect_args={"server_settings": {"timezone": APP_TIMEZONE}},
)
SessionLocal = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
async def get_db() -> AsyncSession:

@ -3,13 +3,14 @@ import time
import uuid
import datetime
from typing import Dict, List, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker, AsyncEngine
from .log_reader import log_reader
from .ssh_utils import ssh_manager
from .db import SessionLocal
from .models.hadoop_logs import HadoopLog
from sqlalchemy import text
import asyncio
from .config import BJ_TZ, DATABASE_URL, APP_TIMEZONE
class LogCollector:
"""Real-time log collector for Hadoop cluster"""
@ -19,15 +20,19 @@ class LogCollector:
self.is_running: bool = False
self.collection_interval: int = 5 # 默认采集间隔,单位:秒
self._loops: Dict[str, asyncio.AbstractEventLoop] = {}
self._engines: Dict[str, AsyncEngine] = {}
self._session_locals: Dict[str, async_sessionmaker[AsyncSession]] = {}
self._intervals: Dict[str, int] = {}
self._cluster_name_cache: Dict[str, str] = {}
self._targets: Dict[str, str] = {}
self._line_counts: Dict[str, int] = {}
self.max_bytes_per_pull: int = 256 * 1024
def start_collection(self, node_name: str, log_type: str, ip: Optional[str] = None, interval: Optional[int] = None) -> bool:
"""Start real-time log collection for a specific node and log type"""
if interval:
self.collection_interval = interval
collector_id = f"{node_name}_{log_type}"
if interval is not None:
self._intervals[collector_id] = max(1, int(interval))
if collector_id in self.collectors and self.collectors[collector_id].is_alive():
print(f"Collector {collector_id} is already running")
@ -56,6 +61,7 @@ class LogCollector:
# Threads are daemon, so they will exit when main process exits
# We just remove it from our tracking
del self.collectors[collector_id]
self._intervals.pop(collector_id, None)
print(f"Stopped collector {collector_id}")
else:
print(f"Collector {collector_id} is not running")
@ -80,10 +86,10 @@ class LogCollector:
if timestamp_end > 0:
timestamp_str = line[1:timestamp_end]
try:
timestamp = datetime.datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S,%f").replace(tzinfo=datetime.timezone.utc)
timestamp = datetime.datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S,%f").replace(tzinfo=BJ_TZ)
except ValueError:
# If parsing fails, use current time
timestamp = datetime.datetime.now(datetime.timezone.utc)
timestamp = datetime.datetime.now(BJ_TZ)
# Extract log level
log_levels = ["ERROR", "WARN", "INFO", "DEBUG", "TRACE"]
@ -93,7 +99,7 @@ class LogCollector:
break
return {
"timestamp": timestamp or datetime.datetime.now(datetime.timezone.utc),
"timestamp": timestamp or datetime.datetime.now(BJ_TZ),
"log_level": log_level,
"message": message,
"host": node_name,
@ -101,19 +107,24 @@ class LogCollector:
"raw_log": line
}
async def _save_log_to_db(self, log_data: Dict):
async def _save_log_to_db(self, log_data: Dict, collector_id: str | None = None):
"""Save log data to database"""
try:
async with SessionLocal() as session:
session_local = self._session_locals.get(collector_id) if collector_id else None
async with (session_local() if session_local else SessionLocal()) as session:
# 获取集群名称
cluster_res = await session.execute(text("""
SELECT c.name
FROM clusters c
JOIN nodes n ON c.id = n.cluster_id
WHERE n.hostname = :hn LIMIT 1
"""), {"hn": log_data["host"]})
cluster_row = cluster_res.first()
cluster_name = cluster_row[0] if cluster_row else "default_cluster"
host = log_data["host"]
cluster_name = self._cluster_name_cache.get(host)
if not cluster_name:
cluster_res = await session.execute(text("""
SELECT c.name
FROM clusters c
JOIN nodes n ON c.id = n.cluster_id
WHERE n.hostname = :hn LIMIT 1
"""), {"hn": host})
cluster_row = cluster_res.first()
cluster_name = cluster_row[0] if cluster_row else "default_cluster"
self._cluster_name_cache[host] = cluster_name
# Create HadoopLog instance
hadoop_log = HadoopLog(
@ -130,27 +141,34 @@ class LogCollector:
except Exception as e:
print(f"Error saving log to database: {e}")
async def _save_logs_to_db_batch(self, logs: List[Dict]):
async def _save_logs_to_db_batch(self, logs: List[Dict], collector_id: str | None = None):
"""Save a batch of logs to database in one transaction"""
try:
async with SessionLocal() as session:
for log_data in logs:
session_local = self._session_locals.get(collector_id) if collector_id else None
async with (session_local() if session_local else SessionLocal()) as session:
host = logs[0]["host"] if logs else None
cluster_name = self._cluster_name_cache.get(host) if host else None
if host and not cluster_name:
cluster_res = await session.execute(text("""
SELECT c.name
FROM clusters c
JOIN nodes n ON c.id = n.cluster_id
SELECT c.name
FROM clusters c
JOIN nodes n ON c.id = n.cluster_id
WHERE n.hostname = :hn LIMIT 1
"""), {"hn": log_data["host"]})
"""), {"hn": host})
cluster_row = cluster_res.first()
cluster_name = cluster_row[0] if cluster_row else "default_cluster"
hadoop_log = HadoopLog(
self._cluster_name_cache[host] = cluster_name
objs: list[HadoopLog] = []
for log_data in logs:
objs.append(HadoopLog(
log_time=log_data["timestamp"],
node_host=log_data["host"],
title=log_data["service"],
info=log_data["message"],
cluster_name=cluster_name
)
session.add(hadoop_log)
cluster_name=cluster_name or "default_cluster",
))
session.add_all(objs)
await session.commit()
except Exception as e:
print(f"Error batch saving logs: {e}")
@ -163,16 +181,26 @@ class LogCollector:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
self._loops[collector_id] = loop
engine = create_async_engine(
DATABASE_URL,
echo=False,
pool_pre_ping=True,
connect_args={"server_settings": {"timezone": APP_TIMEZONE}},
pool_size=1,
max_overflow=0,
)
self._engines[collector_id] = engine
self._session_locals[collector_id] = async_sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)
last_file_size = 0
last_line_count = self._line_counts.get(collector_id, 0)
last_remote_size = 0
retry_count = 0
max_retries = 3
while collector_id in self.collectors:
try:
# Wait for next collection interval
time.sleep(self.collection_interval)
interval = self._intervals.get(collector_id, self.collection_interval)
time.sleep(interval)
# Resolve target file once and reuse
target = self._targets.get(collector_id)
@ -207,40 +235,41 @@ class LogCollector:
retry_count += 1
continue
# Read current log content
ssh_client = ssh_manager.get_connection(node_name, ip=ip)
current_log_content = ""
out2, err2 = ssh_client.execute_command(f"cat {target} 2>/dev/null")
if not err2:
current_log_content = out2
current_file_size = len(current_log_content)
# Check if log file has new content
if current_file_size > last_file_size:
# Extract new content
new_content = current_log_content[last_file_size:]
# Save new content to database
self._save_log_chunk(node_name, log_type, new_content)
# Update last file size
last_file_size = current_file_size
print(f"Collected {len(new_content.splitlines())} new lines from {node_name}_{log_type}")
size_out, size_err = ssh_client.execute_command(f"stat -c %s {target} 2>/dev/null")
if size_err:
retry_count += 1
continue
try:
remote_size = int((size_out or "").strip())
except Exception:
retry_count += 1
continue
if remote_size < last_remote_size:
last_remote_size = 0
if remote_size > last_remote_size:
delta = remote_size - last_remote_size
if delta > self.max_bytes_per_pull:
start_pos = remote_size - self.max_bytes_per_pull + 1
last_remote_size = remote_size - self.max_bytes_per_pull
else:
start_pos = last_remote_size + 1
out2, err2 = ssh_client.execute_command(f"tail -c +{start_pos} {target} 2>/dev/null")
if err2:
out2, err2 = ssh_client.execute_command(f"dd if={target} bs=1 skip={max(0, start_pos - 1)} 2>/dev/null")
if not err2 and out2 and out2.strip():
self._save_log_chunk(node_name, log_type, out2)
print(f"Collected new logs from {node_name}_{log_type} bytes={len(out2)}")
last_remote_size = remote_size
# Reset retry count on successful collection
retry_count = 0
# Also track by line count to cover same-length encodings
lines = current_log_content.splitlines()
if len(lines) > last_line_count:
tail = "\n".join(lines[last_line_count:])
if tail.strip():
self._save_log_chunk(node_name, log_type, tail)
print(f"Collected {len(lines) - last_line_count} new lines (by count) from {node_name}_{log_type}")
last_line_count = len(lines)
self._line_counts[collector_id] = last_line_count
except Exception as e:
print(f"Error collecting logs from {node_name}_{log_type}: {e}")
retry_count += 1
@ -254,6 +283,10 @@ class LogCollector:
try:
loop = self._loops.pop(collector_id, None)
engine = self._engines.pop(collector_id, None)
self._session_locals.pop(collector_id, None)
if engine and loop:
loop.run_until_complete(engine.dispose())
if loop and loop.is_running():
loop.stop()
if loop:
@ -277,7 +310,7 @@ class LogCollector:
collector_id = f"{node_name}_{log_type}"
loop = self._loops.get(collector_id)
if loop:
loop.run_until_complete(self._save_logs_to_db_batch(log_batch))
loop.run_until_complete(self._save_logs_to_db_batch(log_batch, collector_id=collector_id))
else:
asyncio.run(self._save_logs_to_db_batch(log_batch))
@ -291,6 +324,8 @@ class LogCollector:
def set_collection_interval(self, interval: int):
"""Set the collection interval"""
self.collection_interval = max(1, interval) # Ensure interval is at least 1 second
for k in list(self._intervals.keys()):
self._intervals[k] = self.collection_interval
print(f"Set collection interval to {self.collection_interval} seconds")
def set_log_dir(self, log_dir: str):

@ -10,6 +10,7 @@ from .db import SessionLocal
from .models.nodes import Node
from .models.clusters import Cluster
import asyncio
from .config import BJ_TZ
class MetricsCollector:
def __init__(self):
@ -88,7 +89,7 @@ class MetricsCollector:
async def _save_metrics(self, node_id: int, hostname: str, cluster_id: int, cpu: float, mem: float):
async with SessionLocal() as session:
now = datetime.datetime.now(datetime.timezone.utc)
now = datetime.datetime.now(BJ_TZ)
await session.execute(text("UPDATE nodes SET cpu_usage=:cpu, memory_usage=:mem, last_heartbeat=:hb WHERE id=:nid"), {"cpu": cpu, "mem": mem, "hb": now, "nid": node_id})
await session.commit()
rows = await session.execute(select(func.avg(Node.cpu_usage), func.avg(Node.memory_usage)).where(Node.cluster_id == cluster_id))

@ -1,6 +1,7 @@
from sqlalchemy import Column, Integer, String, Text, DateTime, ForeignKey, Boolean
from sqlalchemy.orm import relationship
from datetime import datetime, timezone
from datetime import datetime
from ..config import BJ_TZ
from . import Base
class ChatSession(Base):
@ -9,8 +10,8 @@ class ChatSession(Base):
id = Column(String, primary_key=True, index=True) # UUID
user_id = Column(Integer, nullable=True, index=True) # Can be linked to a user
title = Column(String, nullable=True)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc), onupdate=lambda: datetime.now(timezone.utc))
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(BJ_TZ))
updated_at = Column(DateTime(timezone=True), default=lambda: datetime.now(BJ_TZ), onupdate=lambda: datetime.now(BJ_TZ))
messages = relationship("ChatMessage", back_populates="session", cascade="all, delete-orphan", lazy="selectin")
@ -21,7 +22,7 @@ class ChatMessage(Base):
session_id = Column(String, ForeignKey("chat_sessions.id"), nullable=False)
role = Column(String, nullable=False) # system, user, assistant, tool
content = Column(Text, nullable=False)
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(BJ_TZ))
# Optional: store tool calls or extra metadata if needed
# For now, we store JSON in content if it's complex, or just text.

@ -13,7 +13,7 @@ from ..models.hadoop_logs import HadoopLog
from ..models.chat import ChatSession, ChatMessage
from ..agents.diagnosis_agent import run_diagnose_and_repair
from ..services.llm import LLMClient
from ..services.ops_tools import openai_tools_schema, tool_web_search
from ..services.ops_tools import openai_tools_schema, tool_web_search, tool_start_cluster, tool_stop_cluster, tool_read_log, tool_read_cluster_log, tool_detect_cluster_faults, tool_run_cluster_command
router = APIRouter()
@ -91,11 +91,10 @@ async def ai_chat(req: ChatReq, user=Depends(get_current_user), db: AsyncSession
db.add(session)
system_prompt = (
"You are a helpful Hadoop diagnostic assistant. "
"Please provide clear, structured, and well-formatted responses using Markdown. "
"Use headers (##, ###) for sections, bullet points for lists, and code blocks for logs or commands. "
"Ensure proper line breaks between paragraphs and sections to enhance readability. "
"If you are providing a diagnosis, use a 'Diagnosis' and 'Recommendation' structure."
"你是 Hadoop 运维诊断助手。输出中文,优先给出根因、影响范围、证据与建议。"
"当用户询问“故障/异常/报错/不可用/打不开/任务失败”等问题时,优先调用 detect_cluster_faults"
"必要时再用 read_cluster_log 补充读取对应组件日志。"
"当用户询问进程/端口/资源/版本等日常运维信息时,优先调用 run_cluster_command例如 jps/df/free/hdfs_report/yarn_node_list"
)
if req.context:
if req.context.get("agent"):
@ -117,14 +116,14 @@ async def ai_chat(req: ChatReq, user=Depends(get_current_user), db: AsyncSession
llm = LLMClient()
target_model = req.context.get("model") if req.context else None
web_search_enabled = bool(req.context and req.context.get("webSearch"))
chat_tools = None
if web_search_enabled:
tools = openai_tools_schema()
chat_tools = [t for t in tools if t["function"]["name"] == "web_search"]
if req.stream and not web_search_enabled:
return await handle_streaming_chat(llm, messages, internal_id, db, tools=None, model=target_model)
# 默认加载所有可用运维工具
chat_tools = openai_tools_schema()
if req.stream:
# 流式暂不支持工具调用后的二次生成(为了简化),如果检测到可能需要工具,先走非流式
# 或者这里可以根据需求调整,目前先保持非流式处理工具逻辑
pass
resp = await llm.chat(messages, tools=chat_tools, stream=False, model=target_model)
choices = resp.get("choices") or []
@ -145,8 +144,44 @@ async def ai_chat(req: ChatReq, user=Depends(get_current_user), db: AsyncSession
args = {}
tool_result = {"error": "unknown_tool"}
uname = _get_username(user)
if name == "web_search":
tool_result = await tool_web_search(args.get("query"), args.get("max_results", 5))
elif name == "start_cluster":
tool_result = await tool_start_cluster(db, uname, args.get("cluster_uuid"))
elif name == "stop_cluster":
tool_result = await tool_stop_cluster(db, uname, args.get("cluster_uuid"))
elif name == "read_log":
tool_result = await tool_read_log(db, uname, args.get("node"), args.get("path"), int(args.get("lines", 200)), args.get("pattern"), args.get("sshUser"))
elif name == "read_cluster_log":
tool_result = await tool_read_cluster_log(
db,
uname,
args.get("cluster_uuid"),
args.get("log_type"),
args.get("node_hostname"),
int(args.get("lines", 100))
)
elif name == "detect_cluster_faults":
tool_result = await tool_detect_cluster_faults(
db,
uname,
args.get("cluster_uuid"),
args.get("components"),
args.get("node_hostname"),
int(args.get("lines", 200)),
)
elif name == "run_cluster_command":
tool_result = await tool_run_cluster_command(
db,
uname,
args.get("cluster_uuid"),
args.get("command_key"),
args.get("target"),
args.get("node_hostname"),
int(args.get("timeout", 30)),
int(args.get("limit_nodes", 20)),
)
messages.append({
"role": "tool",

@ -9,6 +9,7 @@ from ..config import JWT_SECRET, JWT_EXPIRE_MINUTES
import jwt
from datetime import datetime, timedelta, timezone
import re
from ..config import now_bj
router = APIRouter()
@ -88,7 +89,7 @@ async def _get_role_permissions(db: AsyncSession, role_keys: list[str]) -> list[
async def login(req: LoginRequest, db: AsyncSession = Depends(get_db)):
demo = {"admin": "admin123", "ops": "ops123", "obs": "obs123"}
if req.username in demo and req.password == demo[req.username]:
exp = datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRE_MINUTES)
exp = now_bj() + timedelta(minutes=JWT_EXPIRE_MINUTES)
token = jwt.encode({"sub": req.username, "exp": exp}, JWT_SECRET, algorithm="HS256")
# 为 demo 账号获取角色和权限
@ -127,7 +128,7 @@ async def login(req: LoginRequest, db: AsyncSession = Depends(get_db)):
roles = await _get_user_roles(db, user.id)
permissions = await _get_role_permissions(db, roles)
exp = datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRE_MINUTES)
exp = now_bj() + timedelta(minutes=JWT_EXPIRE_MINUTES)
token = jwt.encode({"sub": user.username, "exp": exp}, JWT_SECRET, algorithm="HS256")
return {
"ok": True,
@ -185,8 +186,8 @@ async def register(req: RegisterRequest, db: AsyncSession = Depends(get_db)):
full_name=req.fullName,
is_active=True,
last_login=None,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
created_at=now_bj(),
updated_at=now_bj(),
)
db.add(user)
await db.flush()
@ -194,7 +195,7 @@ async def register(req: RegisterRequest, db: AsyncSession = Depends(get_db)):
await _map_user_role(db, req.username, "observer")
permissions = await _get_role_permissions(db, ["observer"])
exp = datetime.now(timezone.utc) + timedelta(minutes=JWT_EXPIRE_MINUTES)
exp = now_bj() + timedelta(minutes=JWT_EXPIRE_MINUTES)
token = jwt.encode({"sub": user.username, "exp": exp}, JWT_SECRET, algorithm="HS256")
return {
"ok": True,

@ -9,6 +9,7 @@ from ..services.ssh_probe import check_ssh_connectivity, get_hdfs_cluster_id
from pydantic import BaseModel
from datetime import datetime, timezone
import uuid as uuidlib
from ..config import now_bj
router = APIRouter()
@ -156,8 +157,8 @@ async def create_cluster(
rm_psw=req.rm_psw,
description=req.description,
config_info={},
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
created_at=now_bj(),
updated_at=now_bj(),
)
db.add(c)
await db.flush() # 获取 c.id
@ -173,8 +174,8 @@ async def create_cluster(
ssh_user=n_req.ssh_user,
ssh_password=n_req.ssh_password,
status="unknown",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
created_at=now_bj(),
updated_at=now_bj(),
)
db.add(node)

@ -6,8 +6,10 @@ from ..models.hadoop_logs import HadoopLog
from ..models.clusters import Cluster
from ..deps.auth import get_current_user
from pydantic import BaseModel
from datetime import datetime, timezone
from datetime import datetime
import json
from ..config import now_bj
from ..config import BJ_TZ
router = APIRouter()
@ -17,7 +19,7 @@ def _get_username(u) -> str:
def _now():
return datetime.now(timezone.utc)
return now_bj()
def _map_level(level: str) -> str:
@ -70,6 +72,10 @@ async def list_faults(
if time_from:
try:
tf = datetime.fromisoformat(time_from.replace("Z", "+00:00"))
if tf.tzinfo is None:
tf = tf.replace(tzinfo=BJ_TZ)
else:
tf = tf.astimezone(BJ_TZ)
stmt = stmt.where(HadoopLog.log_time >= tf)
count_stmt = count_stmt.where(HadoopLog.log_time >= tf)
except Exception:
@ -124,7 +130,11 @@ async def create_fault(req: FaultCreate, user=Depends(get_current_user), db: Asy
ts = _now()
if req.created:
try:
ts = datetime.fromisoformat(req.created.replace("Z", "+00:00"))
dt = datetime.fromisoformat(req.created.replace("Z", "+00:00"))
if dt.tzinfo is None:
ts = dt.replace(tzinfo=BJ_TZ)
else:
ts = dt.astimezone(BJ_TZ)
except Exception:
pass

@ -7,6 +7,8 @@ from ..models.users import User
from ..deps.auth import get_current_user
from pydantic import BaseModel
from datetime import datetime, timezone
from ..config import now_bj
from ..config import BJ_TZ
router = APIRouter()
@ -26,16 +28,17 @@ class ExecLogUpdate(BaseModel):
def _now() -> datetime:
return datetime.now(timezone.utc)
return now_bj()
def _parse_time(s: str | None) -> datetime | None:
if not s:
return None
try:
if s.endswith("Z"):
s = s[:-1] + "+00:00"
return datetime.fromisoformat(s)
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
if dt.tzinfo is None:
return dt.replace(tzinfo=BJ_TZ)
return dt.astimezone(BJ_TZ)
except Exception:
return None

@ -15,6 +15,9 @@ import time
from ..models.node_metrics import NodeMetric
from ..models.cluster_metrics import ClusterMetric
from datetime import timedelta
from ..config import now_bj
from ..config import BJ_TZ
from zoneinfo import ZoneInfo
from ..schemas import (
LogRequest,
LogResponse,
@ -64,9 +67,10 @@ def _parse_time(s: str | None) -> datetime | None:
if not s:
return None
try:
if s.endswith("Z"):
s = s[:-1] + "+00:00"
return datetime.fromisoformat(s)
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
if dt.tzinfo is None:
return dt.replace(tzinfo=BJ_TZ)
return dt.astimezone(BJ_TZ)
except Exception:
return None
@ -410,7 +414,7 @@ async def sync_metrics(cluster_uuid: str, user=Depends(get_current_user), db: As
cid, cname = row
nodes_res = await db.execute(select(Node.id, Node.hostname, Node.ip_address).where(Node.cluster_id == cid))
rows = nodes_res.all()
now = datetime.now(timezone.utc)
now = now_bj()
details = []
for nid, hn, ip in rows:
ssh_client = ssh_manager.get_connection(hn, ip=str(ip))

@ -7,6 +7,7 @@ from ..models.nodes import Node
from ..models.clusters import Cluster
from pydantic import BaseModel
from datetime import datetime, timezone
from ..config import now_bj
router = APIRouter()
@ -32,7 +33,7 @@ def _fmt_percent(v: float | None) -> str:
def _fmt_updated(ts: datetime | None) -> str:
if not ts:
return "-"
now = datetime.now(timezone.utc)
now = now_bj()
diff = int((now - ts).total_seconds())
if diff < 60:
return "刚刚"
@ -117,4 +118,3 @@ async def node_detail(name: str, user=Depends(get_current_user), db: AsyncSessio
raise HTTPException(status_code=500, detail="server_error")

@ -15,6 +15,7 @@ from ..models.sys_exec_logs import SysExecLog
from ..models.hadoop_exec_logs import HadoopExecLog
from ..services.runner import run_remote_command
from ..ssh_utils import SSHClient
from ..config import now_bj
router = APIRouter()
@ -22,7 +23,7 @@ router = APIRouter()
def _now() -> datetime:
"""返回当前 UTC 时间。"""
return datetime.now(timezone.utc)
return now_bj()
def _get_username(u) -> str:
@ -138,6 +139,12 @@ async def start_cluster(
):
"""启动集群:在 NameNode 执行 hsfsstart在 ResourceManager 执行 yarnstart。"""
try:
# UUID 格式校验
try:
uuidlib.UUID(cluster_uuid)
except ValueError:
raise HTTPException(status_code=400, detail="invalid_uuid_format")
uname = _get_username(user)
user_id = getattr(user, "id", 1)
@ -179,8 +186,14 @@ async def start_cluster(
end_time = _now()
# 5. 更新集群状态
cluster.health_status = "healthy"
# 5. 更新集群状态 (仅当所有尝试都未抛出异常时)
# 改进:检查是否有失败日志
has_failed = any("failed" in log.lower() for log in logs)
if not has_failed:
cluster.health_status = "healthy"
else:
cluster.health_status = "error"
cluster.updated_at = end_time
await db.flush()
@ -204,6 +217,12 @@ async def stop_cluster(
):
"""停止集群:在 NameNode 执行 hsfsstop在 ResourceManager 执行 yarnstop。"""
try:
# UUID 格式校验
try:
uuidlib.UUID(cluster_uuid)
except ValueError:
raise HTTPException(status_code=400, detail="invalid_uuid_format")
uname = _get_username(user)
user_id = getattr(user, "id", 1)
@ -266,4 +285,3 @@ async def stop_cluster(

@ -8,6 +8,7 @@ from ..deps.auth import get_current_user
from passlib.hash import bcrypt
from datetime import datetime, timezone
import re
from ..config import now_bj
router = APIRouter()
@ -140,7 +141,7 @@ async def create_user(req: CreateUserRequest, user=Depends(get_current_user), db
temp_password = "TempPass#123"
password_hash = bcrypt.hash(temp_password)
now = datetime.now(timezone.utc)
now = now_bj()
user_obj = User(
username=req.username,
email=req.email,

@ -0,0 +1,51 @@
from __future__ import annotations
from ..config import SSH_TIMEOUT
from ..ssh_utils import SSHClient
def collect_cluster_uuid(host: str, user: str, password: str, timeout: int | None = None) -> tuple[str | None, str | None, str | None]:
    """Probe an HDFS NameNode over SSH and extract its clusterID.

    Runs `hdfs getconf -confKey dfs.namenode.name.dir` on the remote host,
    takes the first configured name directory, and parses the `clusterID`
    entry out of `<name_dir>/current/VERSION`.

    Args:
        host: NameNode host to SSH into.
        user: SSH username (empty string tolerated).
        password: SSH password (empty string tolerated).
        timeout: Per-command timeout in seconds; defaults to SSH_TIMEOUT.
            Fixed: the original `timeout or SSH_TIMEOUT` silently treated an
            explicit 0 as "unset"; an explicit None check is used instead.

    Returns:
        A ``(cluster_id, error_stage, error_detail)`` triple. On success the
        last two elements are None. On failure ``cluster_id`` is None and
        ``error_stage`` is one of "probe_name_dirs", "read_version",
        "parse_cluster_id" or "connect_or_exec".
    """
    import shlex  # local import: quote remote paths that may contain spaces/metachars

    # Hoist the effective timeout; respect an explicit timeout=0.
    eff_timeout = SSH_TIMEOUT if timeout is None else timeout
    cli = None
    try:
        cli = SSHClient(str(host), user or "", password or "")
        out, err = cli.execute_command_with_timeout(
            "hdfs getconf -confKey dfs.namenode.name.dir",
            eff_timeout,
        )
        if not out or not out.strip():
            return None, "probe_name_dirs", (err or "empty_output")
        # dfs.namenode.name.dir may be a comma-separated list; use the first entry.
        name_dir = out.strip().split(",")[0].removeprefix("file://")
        version_path = f"{name_dir.rstrip('/')}/current/VERSION"
        version_out, version_err = cli.execute_command_with_timeout(
            f"cat {shlex.quote(version_path)}",
            eff_timeout,
        )
        if not version_out or not version_out.strip():
            return None, "read_version", (version_err or "empty_output")
        cluster_id = None
        for line in version_out.splitlines():
            # VERSION is key=value per line; match the exact "clusterID" key.
            parts = line.strip().split("=", 1)
            if len(parts) == 2 and parts[0].strip() == "clusterID":
                cluster_id = parts[1].strip()
                break
        if not cluster_id:
            return None, "parse_cluster_id", version_out.strip()
        # Keep original behavior: strip the conventional "CID-" prefix if present.
        return cluster_id.removeprefix("CID-"), None, None
    except Exception as e:
        # Any SSH/exec failure is reported rather than raised; callers branch on stage.
        return None, "connect_or_exec", str(e)
    finally:
        try:
            if cli:
                cli.close()
        except Exception:
            pass

@ -3,6 +3,7 @@ import asyncio
from typing import Any, Dict, List, Optional, Tuple
from datetime import datetime, timezone
import json
import re
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
@ -16,13 +17,15 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from ..models.nodes import Node
from ..models.clusters import Cluster
from ..models.hadoop_exec_logs import HadoopExecLog
from ..ssh_utils import SSHClient
from ..ssh_utils import SSHClient, ssh_manager
from .runner import run_remote_command
from ..log_reader import log_reader
from ..config import now_bj
def _now() -> datetime:
"""返回当前 UTC 时间。"""
return datetime.now(timezone.utc)
return now_bj()
async def _find_accessible_node(db: AsyncSession, user_name: str, hostname: str) -> Optional[Node]:
@ -39,6 +42,18 @@ async def _find_accessible_node(db: AsyncSession, user_name: str, hostname: str)
return res.scalars().first()
async def _user_has_cluster_access(db: AsyncSession, user_name: str, cluster_id: int) -> bool:
uid_res = await db.execute(text("SELECT id FROM users WHERE username=:un LIMIT 1"), {"un": user_name})
uid_row = uid_res.first()
if not uid_row:
return False
ok_res = await db.execute(
text("SELECT 1 FROM user_cluster_mapping WHERE user_id=:uid AND cluster_id=:cid LIMIT 1"),
{"uid": uid_row[0], "cid": cluster_id},
)
return ok_res.first() is not None
async def _write_exec_log(db: AsyncSession, exec_id: str, command_type: str, status: str, start: datetime, end: Optional[datetime], exit_code: Optional[int], operator: str, stdout: Optional[str] = None, stderr: Optional[str] = None):
"""写入执行审计日志。"""
# 查找 from_user_id 和 cluster_name
@ -216,8 +231,13 @@ async def tool_start_cluster(db: AsyncSession, user_name: str, cluster_uuid: str
end_time = _now()
# 6. 更新集群状态
cluster.health_status = "healthy"
# 6. 更新集群状态 (改进:检查是否有失败日志)
has_failed = any("failed" in log.lower() for log in logs)
if not has_failed:
cluster.health_status = "healthy"
else:
cluster.health_status = "error"
cluster.updated_at = end_time
await db.flush()
@ -294,6 +314,377 @@ async def tool_stop_cluster(db: AsyncSession, user_name: str, cluster_uuid: str)
return {"status": "success", "logs": logs}
async def tool_read_cluster_log(
db: AsyncSession,
user_name: str,
cluster_uuid: str,
log_type: str,
node_hostname: Optional[str] = None,
lines: int = 100,
) -> Dict[str, Any]:
"""读取集群中特定服务类型的日志。"""
import uuid as uuidlib
try:
uuidlib.UUID(cluster_uuid)
except ValueError:
return {"status": "error", "message": "invalid_uuid_format"}
stmt = select(Cluster).where(Cluster.uuid == cluster_uuid)
result = await db.execute(stmt)
cluster = result.scalar_one_or_none()
if not cluster:
return {"status": "error", "message": "cluster_not_found"}
if not await _user_has_cluster_access(db, user_name, int(cluster.id)):
return {"status": "error", "message": "cluster_forbidden"}
target_ip: Optional[str] = None
target_hostname: Optional[str] = node_hostname
ssh_user: Optional[str] = None
ssh_password: Optional[str] = None
if log_type.lower() == "namenode":
target_ip = str(cluster.namenode_ip) if cluster.namenode_ip else None
ssh_password = cluster.namenode_psw
if not target_hostname:
node_stmt = select(Node).where(Node.ip_address == cluster.namenode_ip)
node_res = await db.execute(node_stmt)
node_obj = node_res.scalar_one_or_none()
target_hostname = node_obj.hostname if node_obj else "namenode"
if node_obj and node_obj.ssh_user:
ssh_user = node_obj.ssh_user
elif log_type.lower() == "resourcemanager":
target_ip = str(cluster.rm_ip) if cluster.rm_ip else None
ssh_password = cluster.rm_psw
if not target_hostname:
node_stmt = select(Node).where(Node.ip_address == cluster.rm_ip)
node_res = await db.execute(node_stmt)
node_obj = node_res.scalar_one_or_none()
target_hostname = node_obj.hostname if node_obj else "resourcemanager"
if node_obj and node_obj.ssh_user:
ssh_user = node_obj.ssh_user
if not target_ip and target_hostname:
node = await _find_accessible_node(db, user_name, target_hostname)
if not node:
return {"status": "error", "message": "node_not_found"}
target_ip = str(node.ip_address)
ssh_user = node.ssh_user or ssh_user
ssh_password = node.ssh_password or ssh_password
if not target_ip:
return {"status": "error", "message": f"could_not_determine_node_for_{log_type}"}
if not target_hostname:
target_hostname = target_ip
def _tail_via_ssh() -> Dict[str, Any]:
ip = str(target_ip)
hn = str(target_hostname)
log_reader.find_working_log_dir(hn, ip)
ssh_client = ssh_manager.get_connection(hn, ip=ip, username=ssh_user, password=ssh_password)
paths = log_reader.get_log_file_paths(hn, log_type.lower())
for p in paths:
p_q = shlex.quote(p)
out, err = ssh_client.execute_command(f"ls -la {p_q} 2>/dev/null")
if err or not out.strip():
continue
out2, err2 = ssh_client.execute_command(f"tail -n {int(lines)} {p_q} 2>/dev/null")
if err2:
continue
return {"status": "success", "node": hn, "log_type": log_type, "path": p, "content": out2}
base_dir = log_reader._node_log_dir.get(hn, log_reader.log_dir)
base_q = shlex.quote(base_dir)
out, err = ssh_client.execute_command(f"ls -1 {base_q} 2>/dev/null")
if err or not out.strip():
return {"status": "error", "message": "log_dir_not_found", "node": hn}
for fn in out.splitlines():
f = (fn or "").strip()
lf = f.lower()
if not f:
continue
if log_type.lower() in lf and hn.lower() in lf and (lf.endswith(".log") or lf.endswith(".out") or lf.endswith(".out.1")):
full = f"{base_dir}/{f}"
full_q = shlex.quote(full)
out2, err2 = ssh_client.execute_command(f"tail -n {int(lines)} {full_q} 2>/dev/null")
if not err2:
return {"status": "success", "node": hn, "log_type": log_type, "path": full, "content": out2}
return {"status": "error", "message": "log_file_not_found", "node": hn}
return await asyncio.to_thread(_tail_via_ssh)
_FAULT_RULES: List[Dict[str, Any]] = [
{
"id": "hdfs_safemode",
"severity": "high",
"title": "NameNode 处于 SafeMode",
"patterns": [r"SafeModeException", r"NameNode is in safe mode", r"Safe mode is ON"],
"advice": "检查 DataNode 是否全部注册、磁盘与网络是否正常;必要时执行 hdfs dfsadmin -safemode leave。",
},
{
"id": "hdfs_standby",
"severity": "high",
"title": "访问到 Standby NameNode",
"patterns": [r"StandbyException", r"Operation category READ is not supported in state standby"],
"advice": "确认客户端的 fs.defaultFS/HA 配置;确认 active/standby 切换状态是否正确。",
},
{
"id": "rpc_connection_refused",
"severity": "high",
"title": "RPC 连接被拒绝或目标服务未启动",
"patterns": [r"java\.net\.ConnectException:\s*Connection refused", r"Call to .* failed on local exception", r"Connection refused"],
"advice": "确认对应守护进程是否存活、端口是否监听、iptables/安全组是否放通。",
},
{
"id": "dns_or_route",
"severity": "high",
"title": "DNS/网络不可达",
"patterns": [r"UnknownHostException", r"No route to host", r"Network is unreachable", r"Connection timed out"],
"advice": "检查 DNS 解析、/etc/hosts、一致的主机名配置与网络连通性。",
},
{
"id": "disk_no_space",
"severity": "high",
"title": "磁盘空间不足",
"patterns": [r"No space left on device", r"DiskOutOfSpaceException", r"ENOSPC"],
"advice": "清理磁盘、检查日志/临时目录增长;确认 DataNode 存储目录剩余空间。",
},
{
"id": "permission_denied",
"severity": "medium",
"title": "权限不足或 HDFS ACL/权限问题",
"patterns": [r"Permission denied", r"AccessControlException"],
"advice": "检查用户/组映射、HDFS 权限与 ACL确认相关目录权限与 umask。",
},
{
"id": "kerberos_auth",
"severity": "high",
"title": "Kerberos 认证失败",
"patterns": [r"GSSException", r"Failed to find any Kerberos tgt", r"Client cannot authenticate via:\s*\[TOKEN, KERBEROS\]"],
"advice": "检查 KDC、keytab、principal、时间同步确认客户端已 kinit 且票据未过期。",
},
{
"id": "oom",
"severity": "high",
"title": "Java 内存溢出",
"patterns": [r"OutOfMemoryError", r"Java heap space", r"GC overhead limit exceeded"],
"advice": "检查相关服务 JVM 参数(-Xmx/-Xms、容器/节点内存;结合 GC 日志定位内存泄漏或峰值。",
},
{
"id": "jvm_exit_killed",
"severity": "medium",
"title": "进程异常退出或被杀",
"patterns": [r"ExitCodeException exitCode=143", r"Killed by signal", r"Container killed"],
"advice": "检查是否被资源管理器/系统 OOM killer 杀死;核对 YARN 队列资源与节点资源。",
},
]
def _detect_faults_from_log_text(text: str, max_examples_per_rule: int = 3) -> List[Dict[str, Any]]:
lines = (text or "").splitlines()
hits: List[Dict[str, Any]] = []
for rule in _FAULT_RULES:
patterns = rule.get("patterns") or []
compiled = [re.compile(p, re.IGNORECASE) for p in patterns]
examples: List[Dict[str, Any]] = []
for idx, line in enumerate(lines):
if not line:
continue
if any(rgx.search(line) for rgx in compiled):
examples.append({"lineNo": idx + 1, "line": line[:500]})
if len(examples) >= max_examples_per_rule:
break
if examples:
hits.append(
{
"id": rule.get("id"),
"severity": rule.get("severity"),
"title": rule.get("title"),
"advice": rule.get("advice"),
"examples": examples,
"matchCountApprox": len(examples),
}
)
return hits
async def tool_detect_cluster_faults(
    db: AsyncSession,
    user_name: str,
    cluster_uuid: str,
    components: Optional[List[str]] = None,
    node_hostname: Optional[str] = None,
    lines: int = 200,
) -> Dict[str, Any]:
    """Read the logs of selected cluster components and report known faults.

    Reads each component's log tail via ``tool_read_cluster_log``, runs the
    fault-signature detector over the content, and returns per-component read
    summaries plus a severity-sorted, capped list of detected faults.
    """
    import uuid as uuidlib
    try:
        uuidlib.UUID(cluster_uuid)
    except ValueError:
        return {"status": "error", "message": "invalid_uuid_format"}
    # Default to the two master components; drop non-string/blank entries
    # and normalize the rest to lowercase.
    requested = components or ["namenode", "resourcemanager"]
    normalized = [c.strip().lower() for c in requested if isinstance(c, str) and c.strip()]
    if not normalized:
        return {"status": "error", "message": "no_components"}
    summary_keys = ("status", "node", "log_type", "path", "message")
    read_summaries: List[Dict[str, Any]] = []
    detected: List[Dict[str, Any]] = []
    for component in normalized:
        read_result = await tool_read_cluster_log(
            db=db,
            user_name=user_name,
            cluster_uuid=cluster_uuid,
            log_type=component,
            node_hostname=node_hostname,
            lines=lines,
        )
        # Keep only the metadata of each read (no raw log content).
        read_summaries.append({key: read_result.get(key) for key in summary_keys})
        if read_result.get("status") != "success":
            continue
        for fault in _detect_faults_from_log_text(read_result.get("content") or ""):
            detected.append(
                {
                    **fault,
                    "component": component,
                    "node": read_result.get("node"),
                    "path": read_result.get("path"),
                }
            )
    # Sort high -> medium -> low (unknown severities last), then by rule id.
    rank = {"high": 0, "medium": 1, "low": 2}
    detected.sort(key=lambda f: (rank.get((f.get("severity") or "").lower(), 9), f.get("id") or ""))
    return {
        "status": "success",
        "cluster_uuid": cluster_uuid,
        "components": normalized,
        "reads": read_summaries,
        "faults": detected[:20],
    }
# Whitelist of ops commands that tool_run_cluster_command may execute.
# "cmd" is the exact shell command line; "target" is the default execution
# target (namenode / resourcemanager / all_nodes) used when the caller
# does not specify one. Only keys present here can be run.
_OPS_COMMANDS: Dict[str, Dict[str, Any]] = {
    "jps": {"cmd": "jps -lm", "target": "all_nodes"},
    "hadoop_version": {"cmd": "hadoop version", "target": "namenode"},
    "hdfs_report": {"cmd": "hdfs dfsadmin -report", "target": "namenode"},
    "hdfs_safemode_get": {"cmd": "hdfs dfsadmin -safemode get", "target": "namenode"},
    "hdfs_ls_root": {"cmd": "hdfs dfs -ls / | head -n 200", "target": "namenode"},
    "yarn_node_list": {"cmd": "yarn node -list 2>/dev/null || yarn node -list -all", "target": "resourcemanager"},
    "yarn_application_list": {"cmd": "yarn application -list 2>/dev/null || yarn application -list -appStates RUNNING,ACCEPTED,SUBMITTED", "target": "resourcemanager"},
    "df_h": {"cmd": "df -h", "target": "all_nodes"},
    "free_h": {"cmd": "free -h", "target": "all_nodes"},
    "uptime": {"cmd": "uptime", "target": "all_nodes"},
}
async def tool_run_cluster_command(
    db: AsyncSession,
    user_name: str,
    cluster_uuid: str,
    command_key: str,
    target: Optional[str] = None,
    node_hostname: Optional[str] = None,
    timeout: int = 30,
    limit_nodes: int = 20,
) -> Dict[str, Any]:
    """Execute a whitelisted ops command on cluster node(s) over SSH.

    The command is looked up in ``_OPS_COMMANDS`` by ``command_key``; the
    execution target is ``target`` or the command's default. Returns a dict
    with per-node results (exit code, stdout, stderr) or a structured error.
    """
    import uuid as uuidlib
    try:
        uuidlib.UUID(cluster_uuid)
    except ValueError:
        return {"status": "error", "message": "invalid_uuid_format"}
    command_spec = _OPS_COMMANDS.get((command_key or "").strip())
    if not command_spec:
        return {"status": "error", "message": "unsupported_command_key"}
    cluster = (
        await db.execute(select(Cluster).where(Cluster.uuid == cluster_uuid))
    ).scalar_one_or_none()
    if not cluster:
        return {"status": "error", "message": "cluster_not_found"}
    if not await _user_has_cluster_access(db, user_name, int(cluster.id)):
        return {"status": "error", "message": "cluster_forbidden"}
    resolved_target = (target or command_spec.get("target") or "namenode").strip().lower()
    raw_cmd = str(command_spec.get("cmd") or "").strip()
    if not raw_cmd:
        return {"status": "error", "message": "empty_command"}
    # Wrap in a login shell so PATH / env for hadoop binaries is loaded.
    wrapped_cmd = f"bash -lc {shlex.quote(raw_cmd)}"

    async def _exec_on_node(hostname: str, ip: str, ssh_user: Optional[str], ssh_password: Optional[str]) -> Dict[str, Any]:
        # SSH I/O is blocking; push it onto a worker thread.
        def _blocking_run():
            conn = ssh_manager.get_connection(hostname, ip=ip, username=ssh_user, password=ssh_password)
            return conn.execute_command_with_timeout_and_status(wrapped_cmd, timeout=timeout)

        code, out_text, err_text = await asyncio.to_thread(_blocking_run)
        return {
            "node": hostname,
            "ip": ip,
            "exitCode": int(code),
            "stdout": out_text,
            "stderr": err_text,
        }

    async def _exec_master(ip_value, password, fallback_hostname: str) -> Dict[str, Any]:
        # Resolve the node row for a master IP to get hostname / ssh user;
        # fall back to a role name and the "hadoop" user when absent.
        row = (
            await db.execute(select(Node).where(Node.ip_address == ip_value).limit(1))
        ).scalars().first()
        hostname = row.hostname if row else fallback_hostname
        user = row.ssh_user if row and row.ssh_user else "hadoop"
        return await _exec_on_node(hostname, str(ip_value), user, password)

    results: List[Dict[str, Any]] = []
    if resolved_target == "namenode":
        if not cluster.namenode_ip or not cluster.namenode_psw:
            return {"status": "error", "message": "namenode_not_configured"}
        results.append(await _exec_master(cluster.namenode_ip, cluster.namenode_psw, "namenode"))
    elif resolved_target == "resourcemanager":
        if not cluster.rm_ip or not cluster.rm_psw:
            return {"status": "error", "message": "resourcemanager_not_configured"}
        results.append(await _exec_master(cluster.rm_ip, cluster.rm_psw, "resourcemanager"))
    elif resolved_target == "node":
        if not node_hostname:
            return {"status": "error", "message": "node_hostname_required"}
        node = await _find_accessible_node(db, user_name, node_hostname)
        if not node:
            return {"status": "error", "message": "node_not_found"}
        results.append(await _exec_on_node(node.hostname, str(node.ip_address), node.ssh_user or "hadoop", node.ssh_password))
    elif resolved_target == "all_nodes":
        cluster_nodes = (
            await db.execute(select(Node).where(Node.cluster_id == cluster.id).limit(limit_nodes))
        ).scalars().all()
        for candidate in cluster_nodes:
            # Skip nodes the caller has no access to instead of failing.
            accessible = await _find_accessible_node(db, user_name, candidate.hostname)
            if not accessible:
                continue
            results.append(await _exec_on_node(accessible.hostname, str(accessible.ip_address), accessible.ssh_user or "hadoop", accessible.ssh_password))
    else:
        return {"status": "error", "message": "invalid_target"}
    started_at = _now()
    await _write_exec_log(db, f"tool_{started_at.timestamp():.0f}", "run_cluster_command", "success", started_at, _now(), 0, user_name)
    return {
        "status": "success",
        "cluster_uuid": cluster_uuid,
        "command_key": command_key,
        "target": resolved_target,
        "executed": raw_cmd,
        "results": results,
    }
def openai_tools_schema() -> List[Dict[str, Any]]:
"""返回 OpenAI 兼容的工具定义Function Calling"""
return [
@ -358,4 +749,60 @@ def openai_tools_schema() -> List[Dict[str, Any]]:
},
},
},
]
{
"type": "function",
"function": {
"name": "read_cluster_log",
"description": "读取集群中特定组件的日志(如 namenode, datanode, resourcemanager",
"parameters": {
"type": "object",
"properties": {
"cluster_uuid": {"type": "string", "description": "集群的 UUID"},
"log_type": {
"type": "string",
"description": "组件类型,例如 namenode, datanode, resourcemanager, nodemanager, historyserver"
},
"node_hostname": {"type": "string", "description": "可选:指定节点的主机名。如果是 datanode 等非唯一组件,建议提供。"},
"lines": {"type": "integer", "default": 100, "description": "读取的行数"},
},
"required": ["cluster_uuid", "log_type"],
},
},
},
{
"type": "function",
"function": {
"name": "detect_cluster_faults",
"description": "基于集群组件日志识别常见故障并输出结构化结果",
"parameters": {
"type": "object",
"properties": {
"cluster_uuid": {"type": "string", "description": "集群的 UUID"},
"components": {"type": "array", "items": {"type": "string"}, "description": "要分析的组件列表,例如 [namenode, resourcemanager, datanode]"},
"node_hostname": {"type": "string", "description": "可选:指定节点主机名(适用于 datanode 等多实例组件)"},
"lines": {"type": "integer", "default": 200, "description": "每个组件读取的行数"},
},
"required": ["cluster_uuid"],
},
},
},
{
"type": "function",
"function": {
"name": "run_cluster_command",
"description": "在集群节点上执行常用运维命令(白名单)并返回结果",
"parameters": {
"type": "object",
"properties": {
"cluster_uuid": {"type": "string", "description": "集群的 UUID"},
"command_key": {"type": "string", "description": "命令标识,例如 jps, hdfs_report, yarn_node_list, df_h"},
"target": {"type": "string", "description": "执行目标namenode/resourcemanager/node/all_nodes不传则按命令默认目标"},
"node_hostname": {"type": "string", "description": "target=node 时必填"},
"timeout": {"type": "integer", "default": 30},
"limit_nodes": {"type": "integer", "default": 20, "description": "target=all_nodes 时最多执行的节点数"},
},
"required": ["cluster_uuid", "command_key"],
},
},
},
]

@ -65,6 +65,12 @@ class SSHClient:
stdin, stdout, stderr = self.client.exec_command(command)
return stdout.read().decode(), stderr.read().decode()
def execute_command_with_status(self, command: str) -> tuple:
    """Run *command* remotely and return ``(exit_code, stdout, stderr)``.

    stdout/stderr are fully read and decoded to ``str``. Unlike
    ``execute_command``, the remote exit status is included.
    """
    self._ensure_connected()
    stdin, stdout, stderr = self.client.exec_command(command)
    # recv_exit_status blocks until the remote command has finished.
    exit_code = stdout.channel.recv_exit_status()
    return exit_code, stdout.read().decode(), stderr.read().decode()
def execute_command_with_timeout(self, command: str, timeout: int = 30) -> tuple:
"""Execute command with timeout"""
@ -72,6 +78,12 @@ class SSHClient:
stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
return stdout.read().decode(), stderr.read().decode()
def execute_command_with_timeout_and_status(self, command: str, timeout: int = 30) -> tuple:
    """Run *command* with a channel timeout; return ``(exit_code, stdout, stderr)``.

    stdout/stderr are fully read and decoded to ``str``.
    NOTE(review): the timeout is passed to ``exec_command`` (channel I/O
    timeout); whether ``recv_exit_status`` can still block past it depends
    on the SSH client implementation — confirm against its docs.
    """
    self._ensure_connected()
    stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout)
    exit_code = stdout.channel.recv_exit_status()
    return exit_code, stdout.read().decode(), stderr.read().decode()
def read_file(self, file_path: str) -> str:
"""Read file content from remote server"""

@ -0,0 +1,72 @@
# Hadoop 集群启动与停止接口前端联调指南
本文档提供了 Hadoop 集群启动与停止相关 API 的详细说明,用于指导前端开发人员进行接口对接。
## 1. 接口基本信息
| 功能 | 请求方法 | 接口路径 | 权限要求 |
| :--- | :--- | :--- | :--- |
| **启动集群** | `POST` | `/api/v1/ops/clusters/{cluster_uuid}/start` | `cluster:start` |
| **停止集群** | `POST` | `/api/v1/ops/clusters/{cluster_uuid}/stop` | `cluster:stop` |
- **Base URL**: `http://<server-ip>:<port>`
- **Content-Type**: `application/json`
- **认证方式**: 需要在 Header 中携带有效 JWT Token`Authorization: Bearer <your_token>`
## 2. 请求参数 (Path Parameters)
| 参数名 | 类型 | 必选 | 说明 |
| :--- | :--- | :--- | :--- |
| `cluster_uuid` | `string` | 是 | 集群的唯一标识符(UUID),可从 `/api/v1/clusters` 接口获取。 |
## 3. 响应结构
### 3.1 成功响应 (200 OK)
接口执行时间较长(涉及远程 SSH 指令),建议前端超时时间设置为 **60s**
```json
{
"status": "success",
"logs": [
"NameNode (192.168.1.10) start: Starting namenodes on [localhost]\nlocalhost: starting namenode...",
"ResourceManager (192.168.1.11) start: Starting resourcemanager..."
]
}
```
**字段说明:**
- `status`: 固定为 `"success"`
- `logs`: 字符串数组包含各关键组件NameNode, ResourceManager执行脚本后的标准输出与错误信息。
### 3.2 错误响应
- **401 Unauthorized**: 未提供 Token 或 Token 已失效。
- **403 Forbidden**: 权限不足(仅 `admin``ops` 角色可操作)。
- **404 Not Found**: 集群 UUID 不存在。
- **400 Bad Request**: 请求参数错误。
- `{"detail": "invalid_uuid_format"}`: 传入的 UUID 格式不正确(例如前端误传了 `[object Object]`)。
- **500 Internal Server Error**: 后端连接 SSH 超时或内部逻辑错误。
## 4. 前端联调建议
1. **Loading 状态**: 由于是长耗时操作UI 必须提供明确的加载提示,并禁用操作按钮以防重发。
2. **日志展示**: 建议将返回的 `logs` 数组内容渲染在侧边栏或弹窗的日志终端组件中。
3. **超时处理**: 请务必在 Axios 或 Fetch 配置中显式设置 `timeout: 60000`
4. **状态刷新**: 操作成功后,建议前端重新触发一次集群状态列表查询,以获取最新的 `health_status`
## 5. 代码参考 (JavaScript/Axios)
```javascript
import axios from 'axios';
const clusterApi = {
async controlCluster(uuid, action) {
// action: 'start' 或 'stop'
const response = await axios.post(`/api/v1/ops/clusters/${uuid}/${action}`, {}, {
timeout: 60000
});
return response.data;
}
};
```

@ -1,9 +1,11 @@
import httpx
import asyncio
import json
import os
import pytest
async def test_register():
url = "http://localhost:8000/api/v1/user/register"
async def _run_register_checks(base_url: str):
url = f"{base_url.rstrip('/')}/api/v1/user/register"
# 1. 测试字段缺失 (422)
print("\n1. Testing missing field...")
@ -45,5 +47,12 @@ async def test_register():
print(f"Status: {r.status_code}")
print(f"Response: {r.text}")
def test_register_fix_e2e():
    """End-to-end registration checks against a live backend.

    Skipped unless the E2E_BASE_URL environment variable points at a
    running service; drives the async check routine via asyncio.run.
    """
    base_url = os.getenv("E2E_BASE_URL", "").strip()
    if not base_url:
        pytest.skip("需要设置 E2E_BASE_URL 并启动后端服务")
    asyncio.run(_run_register_checks(base_url))
if __name__ == "__main__":
asyncio.run(test_register())
url = os.getenv("E2E_BASE_URL", "http://localhost:8000").strip()
asyncio.run(_run_register_checks(url))

Loading…
Cancel
Save