ErrorDetecting/backend/app/services/ops_tools.py

import shlex
import asyncio
from typing import Any, Dict, List, Optional, Tuple
from datetime import datetime, timezone
import json
import re
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, text
import requests
from bs4 import BeautifulSoup
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from ..models.nodes import Node
from ..models.clusters import Cluster
from ..models.hadoop_exec_logs import HadoopExecLog
from ..ssh_utils import SSHClient, ssh_manager
from ..log_reader import log_reader
from ..config import now_bj
def _now() -> datetime:
"""返回当前 UTC 时间。"""
return now_bj()
async def _find_accessible_node(db: AsyncSession, user_name: str, hostname: str) -> Optional[Node]:
"""校验用户对节点的访问权限,并返回节点对象。"""
uid_res = await db.execute(text("SELECT id FROM users WHERE username=:un LIMIT 1"), {"un": user_name})
uid_row = uid_res.first()
if not uid_row:
return None
ids_res = await db.execute(text("SELECT cluster_id FROM user_cluster_mapping WHERE user_id=:uid"), {"uid": uid_row[0]})
cluster_ids = [r[0] for r in ids_res.all()]
if not cluster_ids:
return None
res = await db.execute(select(Node).where(Node.hostname == hostname, Node.cluster_id.in_(cluster_ids)).limit(1))
return res.scalars().first()
async def _user_has_cluster_access(db: AsyncSession, user_name: str, cluster_id: int) -> bool:
uid_res = await db.execute(text("SELECT id FROM users WHERE username=:un LIMIT 1"), {"un": user_name})
uid_row = uid_res.first()
if not uid_row:
return False
ok_res = await db.execute(
text("SELECT 1 FROM user_cluster_mapping WHERE user_id=:uid AND cluster_id=:cid LIMIT 1"),
{"uid": uid_row[0], "cid": cluster_id},
)
return ok_res.first() is not None
async def _write_exec_log(db: AsyncSession, exec_id: str, command_type: str, status: str, start: datetime, end: Optional[datetime], exit_code: Optional[int], operator: str, stdout: Optional[str] = None, stderr: Optional[str] = None):
"""写入执行审计日志。"""
# 查找 from_user_id 和 cluster_name
uid_res = await db.execute(text("SELECT id FROM users WHERE username=:un LIMIT 1"), {"un": operator})
uid_row = uid_res.first()
from_user_id = uid_row[0] if uid_row else 1
    # Resolve the cluster name (simplified: take the first cluster associated with the user)
cluster_res = await db.execute(text("""
SELECT c.name
FROM clusters c
JOIN user_cluster_mapping m ON c.id = m.cluster_id
WHERE m.user_id = :uid LIMIT 1
"""), {"uid": from_user_id})
cluster_row = cluster_res.first()
cluster_name = cluster_row[0] if cluster_row else "default_cluster"
row = HadoopExecLog(
from_user_id=from_user_id,
cluster_name=cluster_name,
description=f"[{command_type}] {exec_id}",
start_time=start,
end_time=end
)
db.add(row)
await db.flush()
await db.commit()
async def tool_read_log(db: AsyncSession, user_name: str, node: str, path: str, lines: int = 200, pattern: Optional[str] = None, ssh_user: Optional[str] = None, timeout: int = 20) -> Dict[str, Any]:
"""工具:读取远端日志并可选筛选。"""
n = await _find_accessible_node(db, user_name, node)
if not n:
return {"error": "node_not_found"}
if not getattr(n, "ssh_password", None):
return {"error": "ssh_password_not_configured"}
path_q = shlex.quote(path)
cmd = f"tail -n {lines} {path_q}"
if pattern:
pat_q = shlex.quote(pattern)
cmd = f"{cmd} | grep -E {pat_q}"
start = _now()
bash_cmd = f"bash -lc {shlex.quote(cmd)}"
def _run():
client = ssh_manager.get_connection(
str(getattr(n, "hostname", node)),
ip=str(getattr(n, "ip_address", "")),
username=(ssh_user or getattr(n, "ssh_user", None) or "hadoop"),
password=str(getattr(n, "ssh_password", "")),
)
return client.execute_command_with_timeout_and_status(bash_cmd, timeout=timeout)
code, out, err = await asyncio.to_thread(_run)
end = _now()
exec_id = f"tool_{start.timestamp():.0f}"
await _write_exec_log(db, exec_id, "read_log", ("success" if code == 0 else "failed"), start, end, code, user_name, out, err)
return {"execId": exec_id, "exitCode": code, "stdout": out, "stderr": err}
async def _fetch_page_text(client: httpx.AsyncClient, url: str) -> str:
"""Fetch and extract text content from a URL."""
try:
# Skip if not a valid http url
if not url.startswith("http"):
return ""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
}
resp = await client.get(url, headers=headers, follow_redirects=True)
if resp.status_code == 200:
soup = BeautifulSoup(resp.text, "html.parser")
# Remove scripts and styles
for script in soup(["script", "style", "nav", "footer", "header"]):
script.decompose()
text = soup.get_text(separator="\n", strip=True)
# Limit text length
return text[:2000]
except Exception:
pass
return ""
async def tool_web_search(query: str, max_results: int = 5) -> Dict[str, Any]:
"""工具联网搜索Baidu并读取网页内容。"""
try:
results = []
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1"
}
url = "https://www.baidu.com/s"
params = {"wd": query}
        # The search page is fetched with the synchronous requests library for
        # stability; run it in a worker thread so the event loop is not blocked.
        resp = await asyncio.to_thread(
            requests.get, url, params=params, headers=headers, timeout=10, verify=False
        )
if resp.status_code == 200:
soup = BeautifulSoup(resp.text, "html.parser")
# Baidu results are usually in div with class c-container
for item in soup.select("div.c-container, div.result.c-container")[:max_results]:
title_elem = item.select_one("h3")
if not title_elem:
continue
title = title_elem.get_text(strip=True)
link_elem = item.select_one("a")
href = link_elem.get("href") if link_elem else ""
# Abstract/Snippet
snippet = item.get_text(strip=True).replace(title, "")[:200]
results.append({
"title": title,
"href": href,
"body": snippet,
"full_content": "" # Placeholder
})
# Fetch full content for top 2 results
if results:
async with httpx.AsyncClient(timeout=10, verify=False) as client:
tasks = []
# Only fetch top 2 to avoid long wait
for r in results[:2]:
tasks.append(_fetch_page_text(client, r["href"]))
contents = await asyncio.gather(*tasks)
for i, content in enumerate(contents):
if content:
results[i]["full_content"] = content
# Append note to body to indicate full content is available
results[i]["body"] += "\n[Full content fetched]"
# Add current system time to help with "now" queries
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S %A")
return {"query": query, "current_time": current_time, "results": results}
except Exception as e:
return {"error": str(e)}
async def tool_start_cluster(db: AsyncSession, user_name: str, cluster_uuid: str) -> Dict[str, Any]:
"""工具:启动 Hadoop 集群。"""
# 1. 权限与用户
uid_res = await db.execute(text("SELECT id FROM users WHERE username=:un LIMIT 1"), {"un": user_name})
uid_row = uid_res.first()
user_id = uid_row[0] if uid_row else 1
    # 2. Look up the cluster
res = await db.execute(select(Cluster).where(Cluster.uuid == cluster_uuid).limit(1))
cluster = res.scalars().first()
if not cluster:
return {"error": "cluster_not_found"}
    # 3. Determine the SSH user (taken from an associated node, defaulting to "hadoop")
node_res = await db.execute(select(Node).where(Node.cluster_id == cluster.id).limit(1))
node = node_res.scalars().first()
ssh_user = node.ssh_user if node and node.ssh_user else "hadoop"
start_time = _now()
logs = []
    # 4. Run start-dfs.sh on the NameNode
if cluster.namenode_ip and cluster.namenode_psw:
try:
def run_nn_start():
with SSHClient(str(cluster.namenode_ip), ssh_user, cluster.namenode_psw) as client:
return client.execute_command("start-dfs.sh")
out, err = await asyncio.to_thread(run_nn_start)
logs.append(f"NameNode ({cluster.namenode_ip}) start: {out} {err}")
except Exception as e:
logs.append(f"NameNode ({cluster.namenode_ip}) start failed: {str(e)}")
    # 5. Run start-yarn.sh on the ResourceManager
if cluster.rm_ip and cluster.rm_psw:
try:
def run_rm_start():
with SSHClient(str(cluster.rm_ip), ssh_user, cluster.rm_psw) as client:
return client.execute_command("start-yarn.sh")
out, err = await asyncio.to_thread(run_rm_start)
logs.append(f"ResourceManager ({cluster.rm_ip}) start: {out} {err}")
except Exception as e:
logs.append(f"ResourceManager ({cluster.rm_ip}) start failed: {str(e)}")
end_time = _now()
    # 6. Update cluster health (improved: check whether any step logged a failure)
has_failed = any("failed" in log.lower() for log in logs)
if not has_failed:
cluster.health_status = "healthy"
else:
cluster.health_status = "error"
cluster.updated_at = end_time
await db.flush()
    # 7. Record the audit log
full_desc = " | ".join(logs)
exec_row = HadoopExecLog(
from_user_id=user_id,
cluster_name=cluster.name,
description=f"AI Tool Start Cluster: {full_desc}",
start_time=start_time,
end_time=end_time
)
db.add(exec_row)
await db.commit()
return {"status": "success", "logs": logs}
async def tool_stop_cluster(db: AsyncSession, user_name: str, cluster_uuid: str) -> Dict[str, Any]:
"""工具:停止 Hadoop 集群。"""
uid_res = await db.execute(text("SELECT id FROM users WHERE username=:un LIMIT 1"), {"un": user_name})
uid_row = uid_res.first()
user_id = uid_row[0] if uid_row else 1
res = await db.execute(select(Cluster).where(Cluster.uuid == cluster_uuid).limit(1))
cluster = res.scalars().first()
if not cluster:
return {"error": "cluster_not_found"}
node_res = await db.execute(select(Node).where(Node.cluster_id == cluster.id).limit(1))
node = node_res.scalars().first()
ssh_user = node.ssh_user if node and node.ssh_user else "hadoop"
start_time = _now()
logs = []
if cluster.namenode_ip and cluster.namenode_psw:
try:
def run_nn_stop():
with SSHClient(str(cluster.namenode_ip), ssh_user, cluster.namenode_psw) as client:
return client.execute_command("stop-dfs.sh")
out, err = await asyncio.to_thread(run_nn_stop)
logs.append(f"NameNode ({cluster.namenode_ip}) stop: {out} {err}")
except Exception as e:
logs.append(f"NameNode ({cluster.namenode_ip}) stop failed: {str(e)}")
if cluster.rm_ip and cluster.rm_psw:
try:
def run_rm_stop():
with SSHClient(str(cluster.rm_ip), ssh_user, cluster.rm_psw) as client:
return client.execute_command("stop-yarn.sh")
out, err = await asyncio.to_thread(run_rm_stop)
logs.append(f"ResourceManager ({cluster.rm_ip}) stop: {out} {err}")
except Exception as e:
logs.append(f"ResourceManager ({cluster.rm_ip}) stop failed: {str(e)}")
end_time = _now()
cluster.health_status = "unknown"
cluster.updated_at = end_time
await db.flush()
full_desc = " | ".join(logs)
exec_row = HadoopExecLog(
from_user_id=user_id,
cluster_name=cluster.name,
description=f"AI Tool Stop Cluster: {full_desc}",
start_time=start_time,
end_time=end_time
)
db.add(exec_row)
await db.commit()
return {"status": "success", "logs": logs}
async def tool_read_cluster_log(
db: AsyncSession,
user_name: str,
cluster_uuid: str,
log_type: str,
node_hostname: Optional[str] = None,
lines: int = 100,
) -> Dict[str, Any]:
"""读取集群中特定服务类型的日志。"""
import uuid as uuidlib
try:
uuidlib.UUID(cluster_uuid)
except ValueError:
return {"status": "error", "message": "invalid_uuid_format"}
stmt = select(Cluster).where(Cluster.uuid == cluster_uuid)
result = await db.execute(stmt)
cluster = result.scalar_one_or_none()
if not cluster:
return {"status": "error", "message": "cluster_not_found"}
if not await _user_has_cluster_access(db, user_name, int(cluster.id)):
return {"status": "error", "message": "cluster_forbidden"}
target_ip: Optional[str] = None
target_hostname: Optional[str] = node_hostname
ssh_user: Optional[str] = None
ssh_password: Optional[str] = None
if log_type.lower() == "namenode":
target_ip = str(cluster.namenode_ip) if cluster.namenode_ip else None
ssh_password = cluster.namenode_psw
if not target_hostname:
node_stmt = select(Node).where(Node.ip_address == cluster.namenode_ip)
node_res = await db.execute(node_stmt)
node_obj = node_res.scalar_one_or_none()
target_hostname = node_obj.hostname if node_obj else "namenode"
if node_obj and node_obj.ssh_user:
ssh_user = node_obj.ssh_user
elif log_type.lower() == "resourcemanager":
target_ip = str(cluster.rm_ip) if cluster.rm_ip else None
ssh_password = cluster.rm_psw
if not target_hostname:
node_stmt = select(Node).where(Node.ip_address == cluster.rm_ip)
node_res = await db.execute(node_stmt)
node_obj = node_res.scalar_one_or_none()
target_hostname = node_obj.hostname if node_obj else "resourcemanager"
if node_obj and node_obj.ssh_user:
ssh_user = node_obj.ssh_user
if not target_ip and target_hostname:
node = await _find_accessible_node(db, user_name, target_hostname)
if not node:
return {"status": "error", "message": "node_not_found"}
target_ip = str(node.ip_address)
ssh_user = node.ssh_user or ssh_user
ssh_password = node.ssh_password or ssh_password
if not target_ip:
return {"status": "error", "message": f"could_not_determine_node_for_{log_type}"}
if not target_hostname:
target_hostname = target_ip
def _tail_via_ssh() -> Dict[str, Any]:
ip = str(target_ip)
hn = str(target_hostname)
log_reader.find_working_log_dir(hn, ip)
        # Fall back to the module-wide default SSH user, mirroring the other tools.
        ssh_client = ssh_manager.get_connection(hn, ip=ip, username=ssh_user or "hadoop", password=ssh_password)
paths = log_reader.get_log_file_paths(hn, log_type.lower())
for p in paths:
p_q = shlex.quote(p)
out, err = ssh_client.execute_command(f"ls -la {p_q} 2>/dev/null")
if err or not out.strip():
continue
out2, err2 = ssh_client.execute_command(f"tail -n {int(lines)} {p_q} 2>/dev/null")
if err2:
continue
return {"status": "success", "node": hn, "log_type": log_type, "path": p, "content": out2}
base_dir = log_reader._node_log_dir.get(hn, log_reader.log_dir)
base_q = shlex.quote(base_dir)
out, err = ssh_client.execute_command(f"ls -1 {base_q} 2>/dev/null")
if err or not out.strip():
return {"status": "error", "message": "log_dir_not_found", "node": hn}
for fn in out.splitlines():
f = (fn or "").strip()
lf = f.lower()
if not f:
continue
if log_type.lower() in lf and hn.lower() in lf and (lf.endswith(".log") or lf.endswith(".out") or lf.endswith(".out.1")):
full = f"{base_dir}/{f}"
full_q = shlex.quote(full)
out2, err2 = ssh_client.execute_command(f"tail -n {int(lines)} {full_q} 2>/dev/null")
if not err2:
return {"status": "success", "node": hn, "log_type": log_type, "path": full, "content": out2}
return {"status": "error", "message": "log_file_not_found", "node": hn}
return await asyncio.to_thread(_tail_via_ssh)
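# Usage sketch (illustrative): tail the NameNode log of a cluster, assuming `db` is an
# AsyncSession and the UUID belongs to a cluster the user can access.
#
#     r = await tool_read_cluster_log(db, "alice", cluster_uuid, "namenode", lines=50)
#     if r["status"] == "success":
#         print(r["path"], r["content"][-500:])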
_FAULT_RULES: List[Dict[str, Any]] = [
    {
        "id": "hdfs_safemode",
        "severity": "high",
        "title": "NameNode is in SafeMode",
        "patterns": [r"SafeModeException", r"NameNode is in safe mode", r"Safe mode is ON"],
        "advice": "Check that all DataNodes have registered and that disks and the network are healthy; if necessary, run hdfs dfsadmin -safemode leave.",
    },
    {
        "id": "hdfs_standby",
        "severity": "high",
        "title": "Request hit a Standby NameNode",
        "patterns": [r"StandbyException", r"Operation category READ is not supported in state standby"],
        "advice": "Verify the client's fs.defaultFS/HA configuration and confirm the active/standby failover state is correct.",
    },
    {
        "id": "rpc_connection_refused",
        "severity": "high",
        "title": "RPC connection refused or target service not running",
        "patterns": [r"java\.net\.ConnectException:\s*Connection refused", r"Call to .* failed on local exception", r"Connection refused"],
        "advice": "Confirm the corresponding daemon is alive, the port is listening, and iptables/security groups allow the traffic.",
    },
    {
        "id": "dns_or_route",
        "severity": "high",
        "title": "DNS resolution failure or network unreachable",
        "patterns": [r"UnknownHostException", r"No route to host", r"Network is unreachable", r"Connection timed out"],
        "advice": "Check DNS resolution, /etc/hosts, consistent hostname configuration, and network connectivity.",
    },
    {
        "id": "disk_no_space",
        "severity": "high",
        "title": "Insufficient disk space",
        "patterns": [r"No space left on device", r"DiskOutOfSpaceException", r"ENOSPC"],
        "advice": "Free up disk space and check log/temp directory growth; verify remaining space in the DataNode storage directories.",
    },
    {
        "id": "permission_denied",
        "severity": "medium",
        "title": "Insufficient permissions or HDFS ACL/permission problem",
        "patterns": [r"Permission denied", r"AccessControlException"],
        "advice": "Check user/group mapping and HDFS permissions and ACLs; verify the permissions and umask of the relevant directories.",
    },
    {
        "id": "kerberos_auth",
        "severity": "high",
        "title": "Kerberos authentication failure",
        "patterns": [r"GSSException", r"Failed to find any Kerberos tgt", r"Client cannot authenticate via:\s*\[TOKEN, KERBEROS\]"],
        "advice": "Check the KDC, keytab, principal, and time synchronization; confirm the client has run kinit and the ticket has not expired.",
    },
    {
        "id": "oom",
        "severity": "high",
        "title": "Java out-of-memory error",
        "patterns": [r"OutOfMemoryError", r"Java heap space", r"GC overhead limit exceeded"],
        "advice": "Check the service's JVM settings (-Xmx/-Xms) and container/node memory; use GC logs to locate leaks or usage spikes.",
    },
    {
        "id": "jvm_exit_killed",
        "severity": "medium",
        "title": "Process exited abnormally or was killed",
        "patterns": [r"ExitCodeException exitCode=143", r"Killed by signal", r"Container killed"],
        "advice": "Check whether the process was killed by the resource manager or the system OOM killer; review YARN queue and node resources.",
    },
]
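# The rule table is data-driven, so new detections are additions here rather than code
# changes. A hypothetical entry (not registered above) for ZooKeeper session loss:
#
#     {
#         "id": "zk_session_expired",
#         "severity": "medium",
#         "title": "ZooKeeper session expired",
#         "patterns": [r"SessionExpiredException", r"session.*expired"],
#         "advice": "Check ZooKeeper quorum health, network latency, and client session timeouts.",
#     }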
def _detect_faults_from_log_text(log_text: str, max_examples_per_rule: int = 3) -> List[Dict[str, Any]]:
    """Scan log text against _FAULT_RULES and return structured fault hits."""
    # The parameter is named log_text to avoid shadowing sqlalchemy's text() import.
    lines = (log_text or "").splitlines()
hits: List[Dict[str, Any]] = []
for rule in _FAULT_RULES:
patterns = rule.get("patterns") or []
compiled = [re.compile(p, re.IGNORECASE) for p in patterns]
examples: List[Dict[str, Any]] = []
for idx, line in enumerate(lines):
if not line:
continue
if any(rgx.search(line) for rgx in compiled):
examples.append({"lineNo": idx + 1, "line": line[:500]})
if len(examples) >= max_examples_per_rule:
break
if examples:
hits.append(
{
"id": rule.get("id"),
"severity": rule.get("severity"),
"title": rule.get("title"),
"advice": rule.get("advice"),
"examples": examples,
"matchCountApprox": len(examples),
}
)
return hits
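# A minimal, self-contained sanity check for the detector; the log lines are synthetic
# and only illustrate the matching behaviour.
def _example_detect_faults() -> None:
    sample = "\n".join([
        "2024-01-01 00:00:00,000 ERROR ipc.Client: java.net.ConnectException: Connection refused",
        "2024-01-01 00:00:01,000 INFO hdfs.StateChange: Safe mode is ON",
    ])
    # Expected, in rule order: hdfs_safemode (line 2), then rpc_connection_refused (line 1).
    for hit in _detect_faults_from_log_text(sample):
        print(hit["id"], hit["severity"], [e["lineNo"] for e in hit["examples"]])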
async def tool_detect_cluster_faults(
db: AsyncSession,
user_name: str,
cluster_uuid: str,
components: Optional[List[str]] = None,
node_hostname: Optional[str] = None,
lines: int = 200,
) -> Dict[str, Any]:
    """Tool: read component logs for a cluster and detect common faults."""
    import uuid as uuidlib
try:
uuidlib.UUID(cluster_uuid)
except ValueError:
return {"status": "error", "message": "invalid_uuid_format"}
comps = components or ["namenode", "resourcemanager"]
comps = [c for c in comps if isinstance(c, str) and c.strip()]
comps = [c.strip().lower() for c in comps]
if not comps:
return {"status": "error", "message": "no_components"}
reads: List[Dict[str, Any]] = []
faults: List[Dict[str, Any]] = []
for comp in comps:
r = await tool_read_cluster_log(
db=db,
user_name=user_name,
cluster_uuid=cluster_uuid,
log_type=comp,
node_hostname=node_hostname,
lines=lines,
)
reads.append({k: r.get(k) for k in ("status", "node", "log_type", "path", "message")})
if r.get("status") != "success":
continue
content = r.get("content") or ""
comp_faults = _detect_faults_from_log_text(content)
for f in comp_faults:
f2 = dict(f)
f2["component"] = comp
f2["node"] = r.get("node")
f2["path"] = r.get("path")
faults.append(f2)
severity_order = {"high": 0, "medium": 1, "low": 2}
faults.sort(key=lambda x: (severity_order.get((x.get("severity") or "").lower(), 9), x.get("id") or ""))
return {
"status": "success",
"cluster_uuid": cluster_uuid,
"components": comps,
"reads": reads,
"faults": faults[:20],
}
_OPS_COMMANDS: Dict[str, Dict[str, Any]] = {
"jps": {"cmd": "jps -lm", "target": "all_nodes"},
"hadoop_version": {"cmd": "hadoop version", "target": "namenode"},
"hdfs_report": {"cmd": "hdfs dfsadmin -report", "target": "namenode"},
"hdfs_safemode_get": {"cmd": "hdfs dfsadmin -safemode get", "target": "namenode"},
"hdfs_ls_root": {"cmd": "hdfs dfs -ls / | head -n 200", "target": "namenode"},
"yarn_node_list": {"cmd": "yarn node -list 2>/dev/null || yarn node -list -all", "target": "resourcemanager"},
"yarn_application_list": {"cmd": "yarn application -list 2>/dev/null || yarn application -list -appStates RUNNING,ACCEPTED,SUBMITTED", "target": "resourcemanager"},
"df_h": {"cmd": "df -h", "target": "all_nodes"},
"free_h": {"cmd": "free -h", "target": "all_nodes"},
"uptime": {"cmd": "uptime", "target": "all_nodes"},
}
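# The whitelist keeps callers (including the LLM) from running arbitrary shell commands;
# extending it is a one-line change. A hypothetical entry (not currently registered):
#
#     "hdfs_du_root": {"cmd": "hdfs dfs -du -h / | head -n 50", "target": "namenode"},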
async def tool_run_cluster_command(
db: AsyncSession,
user_name: str,
cluster_uuid: str,
command_key: str,
target: Optional[str] = None,
node_hostname: Optional[str] = None,
timeout: int = 30,
limit_nodes: int = 20,
) -> Dict[str, Any]:
    """Tool: run a whitelisted ops command on the cluster's target node(s)."""
    import uuid as uuidlib
try:
uuidlib.UUID(cluster_uuid)
except ValueError:
return {"status": "error", "message": "invalid_uuid_format"}
spec = _OPS_COMMANDS.get((command_key or "").strip())
if not spec:
return {"status": "error", "message": "unsupported_command_key"}
stmt = select(Cluster).where(Cluster.uuid == cluster_uuid)
result = await db.execute(stmt)
cluster = result.scalar_one_or_none()
if not cluster:
return {"status": "error", "message": "cluster_not_found"}
if not await _user_has_cluster_access(db, user_name, int(cluster.id)):
return {"status": "error", "message": "cluster_forbidden"}
tgt = (target or spec.get("target") or "namenode").strip().lower()
cmd = str(spec.get("cmd") or "").strip()
if not cmd:
return {"status": "error", "message": "empty_command"}
bash_cmd = f"bash -lc {shlex.quote(cmd)}"
async def _exec_on_node(hostname: str, ip: str, ssh_user: Optional[str], ssh_password: Optional[str]) -> Dict[str, Any]:
def _run():
client = ssh_manager.get_connection(hostname, ip=ip, username=ssh_user, password=ssh_password)
exit_code, out, err = client.execute_command_with_timeout_and_status(bash_cmd, timeout=timeout)
return exit_code, out, err
exit_code, out, err = await asyncio.to_thread(_run)
return {
"node": hostname,
"ip": ip,
"exitCode": int(exit_code),
"stdout": out,
"stderr": err,
}
    # Capture the start time before any command runs so the audit log covers the full duration.
    start = _now()
    results: List[Dict[str, Any]] = []
if tgt == "namenode":
if not cluster.namenode_ip or not cluster.namenode_psw:
return {"status": "error", "message": "namenode_not_configured"}
ip = str(cluster.namenode_ip)
node_stmt = select(Node).where(Node.ip_address == cluster.namenode_ip).limit(1)
node_obj = (await db.execute(node_stmt)).scalars().first()
hostname = node_obj.hostname if node_obj else "namenode"
ssh_user = (node_obj.ssh_user if node_obj and node_obj.ssh_user else "hadoop")
results.append(await _exec_on_node(hostname, ip, ssh_user, cluster.namenode_psw))
elif tgt == "resourcemanager":
if not cluster.rm_ip or not cluster.rm_psw:
return {"status": "error", "message": "resourcemanager_not_configured"}
ip = str(cluster.rm_ip)
node_stmt = select(Node).where(Node.ip_address == cluster.rm_ip).limit(1)
node_obj = (await db.execute(node_stmt)).scalars().first()
hostname = node_obj.hostname if node_obj else "resourcemanager"
ssh_user = (node_obj.ssh_user if node_obj and node_obj.ssh_user else "hadoop")
results.append(await _exec_on_node(hostname, ip, ssh_user, cluster.rm_psw))
elif tgt == "node":
if not node_hostname:
return {"status": "error", "message": "node_hostname_required"}
node = await _find_accessible_node(db, user_name, node_hostname)
if not node:
return {"status": "error", "message": "node_not_found"}
results.append(await _exec_on_node(node.hostname, str(node.ip_address), node.ssh_user or "hadoop", node.ssh_password))
elif tgt == "all_nodes":
nodes_stmt = select(Node).where(Node.cluster_id == cluster.id).limit(limit_nodes)
nodes = (await db.execute(nodes_stmt)).scalars().all()
for n in nodes:
n2 = await _find_accessible_node(db, user_name, n.hostname)
if not n2:
continue
results.append(await _exec_on_node(n2.hostname, str(n2.ip_address), n2.ssh_user or "hadoop", n2.ssh_password))
else:
return {"status": "error", "message": "invalid_target"}
    end = _now()
    exec_id = f"tool_{start.timestamp():.0f}"
    await _write_exec_log(db, exec_id, "run_cluster_command", "success", start, end, 0, user_name)
return {
"status": "success",
"cluster_uuid": cluster_uuid,
"command_key": command_key,
"target": tgt,
"executed": cmd,
"results": results,
}
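# Usage sketch (illustrative): run the whitelisted "jps" command on every node of a
# cluster; `db` is assumed to be an AsyncSession and the UUID accessible to the user.
#
#     res = await tool_run_cluster_command(db, "alice", cluster_uuid, "jps", target="all_nodes")
#     for r in res["results"]:
#         print(r["node"], r["exitCode"])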
def openai_tools_schema() -> List[Dict[str, Any]]:
    """Return OpenAI-compatible tool definitions (function calling)."""
return [
{
"type": "function",
"function": {
"name": "read_log",
"description": "读取指定节点的日志文件并可按正则筛选",
"parameters": {
"type": "object",
"properties": {
"node": {"type": "string"},
"path": {"type": "string"},
"lines": {"type": "integer", "default": 200},
"pattern": {"type": "string"},
"sshUser": {"type": "string"},
},
"required": ["node", "path"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "联网搜索互联网公开信息,当遇到未知错误码、技术名词或需要外部资料时使用",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索关键词"},
"max_results": {"type": "integer", "default": 5},
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "start_cluster",
"description": "启动指定的 Hadoop 集群",
"parameters": {
"type": "object",
"properties": {
"cluster_uuid": {"type": "string", "description": "集群的 UUID"},
},
"required": ["cluster_uuid"],
},
},
},
{
"type": "function",
"function": {
"name": "stop_cluster",
"description": "停止指定的 Hadoop 集群",
"parameters": {
"type": "object",
"properties": {
"cluster_uuid": {"type": "string", "description": "集群的 UUID"},
},
"required": ["cluster_uuid"],
},
},
},
{
"type": "function",
"function": {
"name": "read_cluster_log",
"description": "读取集群中特定组件的日志(如 namenode, datanode, resourcemanager",
"parameters": {
"type": "object",
"properties": {
"cluster_uuid": {"type": "string", "description": "集群的 UUID"},
"log_type": {
"type": "string",
"description": "组件类型,例如 namenode, datanode, resourcemanager, nodemanager, historyserver"
},
"node_hostname": {"type": "string", "description": "可选:指定节点的主机名。如果是 datanode 等非唯一组件,建议提供。"},
"lines": {"type": "integer", "default": 100, "description": "读取的行数"},
},
"required": ["cluster_uuid", "log_type"],
},
},
},
{
"type": "function",
"function": {
"name": "detect_cluster_faults",
"description": "基于集群组件日志识别常见故障并输出结构化结果",
"parameters": {
"type": "object",
"properties": {
"cluster_uuid": {"type": "string", "description": "集群的 UUID"},
"components": {"type": "array", "items": {"type": "string"}, "description": "要分析的组件列表,例如 [namenode, resourcemanager, datanode]"},
"node_hostname": {"type": "string", "description": "可选:指定节点主机名(适用于 datanode 等多实例组件)"},
"lines": {"type": "integer", "default": 200, "description": "每个组件读取的行数"},
},
"required": ["cluster_uuid"],
},
},
},
{
"type": "function",
"function": {
"name": "run_cluster_command",
"description": "在集群节点上执行常用运维命令(白名单)并返回结果",
"parameters": {
"type": "object",
"properties": {
"cluster_uuid": {"type": "string", "description": "集群的 UUID"},
"command_key": {"type": "string", "description": "命令标识,例如 jps, hdfs_report, yarn_node_list, df_h"},
"target": {"type": "string", "description": "执行目标namenode/resourcemanager/node/all_nodes不传则按命令默认目标"},
"node_hostname": {"type": "string", "description": "target=node 时必填"},
"timeout": {"type": "integer", "default": 30},
"limit_nodes": {"type": "integer", "default": 20, "description": "target=all_nodes 时最多执行的节点数"},
},
"required": ["cluster_uuid", "command_key"],
},
},
},
]
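if __name__ == "__main__":
    # Ad-hoc check when the module is run directly (outside the app): print the tool
    # schema so it can be eyeballed or diffed against the model configuration.
    print(json.dumps(openai_tools_schema(), ensure_ascii=False, indent=2))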