@ -23,18 +23,12 @@ from ..config import now_bj
def _now() -> datetime:
    """Return the current time in Beijing time (UTC+8).

    Thin wrapper around the project-wide ``now_bj`` helper so every
    timestamp written by this module (audit logs, cluster status
    updates) uses a single, consistent clock.

    Returns:
        datetime: the current Beijing-local time as produced by ``now_bj()``.
    """
    # Fixed: the block previously carried two contradictory docstrings
    # (one claiming Beijing time, one claiming UTC). now_bj() returns
    # Beijing time, so the UTC wording was wrong and has been removed.
    return now_bj()
async def _find_accessible_node ( db : AsyncSession , user_name : str , hostname : str ) - > Optional [ Node ] :
"""
根据用户名和主机名查找用户有权访问的节点 。
1. 通过用户名查询用户 ID 。
2. 查询该用户关联的所有集群 ID 。
3. 在关联集群中查找匹配主机名的节点 。
"""
""" 校验用户对节点的访问权限,并返回节点对象。 """
uid_res = await db . execute ( text ( " SELECT id FROM users WHERE username=:un LIMIT 1 " ) , { " un " : user_name } )
uid_row = uid_res . first ( )
if not uid_row :
@ -48,7 +42,6 @@ async def _find_accessible_node(db: AsyncSession, user_name: str, hostname: str)
async def _user_has_cluster_access ( db : AsyncSession , user_name : str , cluster_id : int ) - > bool :
""" 校验用户是否有权访问指定的集群。 """
uid_res = await db . execute ( text ( " SELECT id FROM users WHERE username=:un LIMIT 1 " ) , { " un " : user_name } )
uid_row = uid_res . first ( )
if not uid_row :
@ -61,13 +54,7 @@ async def _user_has_cluster_access(db: AsyncSession, user_name: str, cluster_id:
async def _write_exec_log ( db : AsyncSession , exec_id : str , command_type : str , status : str , start : datetime , end : Optional [ datetime ] , exit_code : Optional [ int ] , operator : str , stdout : Optional [ str ] = None , stderr : Optional [ str ] = None ) :
"""
将执行结果写入审计日志 。
1. 获取操作者的用户 ID 。
2. 获取操作者关联的第一个集群作为日志归属 。
3. 创建并保存 HadoopExecLog 记录 。
"""
""" 写入执行审计日志。 """
# 查找 from_user_id 和 cluster_name
uid_res = await db . execute ( text ( " SELECT id FROM users WHERE username=:un LIMIT 1 " ) , { " un " : operator } )
uid_row = uid_res . first ( )
@ -96,34 +83,21 @@ async def _write_exec_log(db: AsyncSession, exec_id: str, command_type: str, sta
async def tool_read_log ( db : AsyncSession , user_name : str , node : str , path : str , lines : int = 200 , pattern : Optional [ str ] = None , ssh_user : Optional [ str ] = None , timeout : int = 20 ) - > Dict [ str , Any ] :
"""
核心工具 : 读取远程节点的日志文件 。
参数 :
- node : 主机名
- path : 日志文件绝对路径
- lines : 读取末尾行数 , 默认 200
- pattern : 可选的正则过滤模式
"""
""" 工具:读取远端日志并可选筛选。 """
n = await _find_accessible_node ( db , user_name , node )
if not n :
return { " error " : " node_not_found " }
if not getattr ( n , " ssh_password " , None ) :
return { " error " : " ssh_password_not_configured " }
# 转义路径和构造 tail 命令
path_q = shlex . quote ( path )
cmd = f " tail -n { lines } { path_q } "
if pattern :
pat_q = shlex . quote ( pattern )
cmd = f " { cmd } | grep -E { pat_q } "
start = _now ( )
# 使用 bash -lc 确保加载环境变量
bash_cmd = f " bash -lc { shlex . quote ( cmd ) } "
def _run ( ) :
""" 在线程池中执行阻塞的 SSH 调用。 """
client = ssh_manager . get_connection (
str ( getattr ( n , " hostname " , node ) ) ,
ip = str ( getattr ( n , " ip_address " , " " ) ) ,
@ -132,26 +106,17 @@ async def tool_read_log(db: AsyncSession, user_name: str, node: str, path: str,
)
return client . execute_command_with_timeout_and_status ( bash_cmd , timeout = timeout )
# 异步运行阻塞的 SSH 命令
code , out , err = await asyncio . to_thread ( _run )
end = _now ( )
exec_id = f " tool_ { start . timestamp ( ) : .0f } "
# 异步写入审计日志
await _write_exec_log ( db , exec_id , " read_log " , ( " success " if code == 0 else " failed " ) , start , end , code , user_name , out , err )
return { " execId " : exec_id , " exitCode " : code , " stdout " : out , " stderr " : err }
async def _fetch_page_text ( client : httpx . AsyncClient , url : str ) - > str :
"""
抓取并提取网页正文文本 。
1. 过滤非 HTTP 链接 。
2. 发起异步请求并设置 User - Agent 。
3. 使用 BeautifulSoup 解析 HTML , 移除无关标签 ( 脚本 、 样式 、 导航等 ) 。
4. 返回清洗后的前 2000 个字符 。
"""
""" Fetch and extract text content from a URL. """
try :
# Skip if not a valid http url
if not url . startswith ( " http " ) :
return " "
@ -161,11 +126,11 @@ async def _fetch_page_text(client: httpx.AsyncClient, url: str) -> str:
resp = await client . get ( url , headers = headers , follow_redirects = True )
if resp . status_code == 200 :
soup = BeautifulSoup ( resp . text , " html.parser " )
# 移除干扰元素
# Remove scripts and styles
for script in soup ( [ " script " , " style " , " nav " , " footer " , " header " ] ) :
script . decompose ( )
text = soup . get_text ( separator = " \n " , strip = True )
# 限制文本长度以防上下文溢出
# Limit text length
return text [ : 2000 ]
except Exception :
pass
@ -173,13 +138,7 @@ async def _fetch_page_text(client: httpx.AsyncClient, url: str) -> str:
async def tool_web_search ( query : str , max_results : int = 5 ) - > Dict [ str , Any ] :
"""
核心工具 : 通过百度进行联网搜索 。
1. 构造搜索请求 。
2. 解析搜索结果页面 , 提取标题 、 链接和摘要 。
3. 并发抓取排名前 2 的网页正文内容 。
"""
""" 工具: 联网搜索( Baidu) 并读取网页内容。 """
try :
results = [ ]
headers = {
@ -192,12 +151,12 @@ async def tool_web_search(query: str, max_results: int = 5) -> Dict[str, Any]:
url = " https://www.baidu.com/s "
params = { " wd " : query }
# 使用同步 requests 获取搜索结果(通常更稳定)
# Use sync requests for search page (stable)
resp = requests . get ( url , params = params , headers = headers , timeout = 10 , verify = False )
if resp . status_code == 200 :
soup = BeautifulSoup ( resp . text , " html.parser " )
# 提取百度搜索结果项
# Baidu results are usually in div with class c-container
for item in soup . select ( " div.c-container, div.result.c-container " ) [ : max_results ] :
title_elem = item . select_one ( " h3 " )
if not title_elem :
@ -206,20 +165,21 @@ async def tool_web_search(query: str, max_results: int = 5) -> Dict[str, Any]:
link_elem = item . select_one ( " a " )
href = link_elem . get ( " href " ) if link_elem else " "
# 提取摘要并移除标题重复内容
# Abstract/Snippet
snippet = item . get_text ( strip = True ) . replace ( title , " " ) [ : 200 ]
results . append ( {
" title " : title ,
" href " : href ,
" body " : snippet ,
" full_content " : " " # 待填充正文
" full_content " : " " # Placeholder
} )
# 并发获取前 2 条结果的全文
# Fetch full content for top 2 results
if results :
async with httpx . AsyncClient ( timeout = 10 , verify = False ) as client :
tasks = [ ]
# Only fetch top 2 to avoid long wait
for r in results [ : 2 ] :
tasks . append ( _fetch_page_text ( client , r [ " href " ] ) )
@ -228,9 +188,10 @@ async def tool_web_search(query: str, max_results: int = 5) -> Dict[str, Any]:
for i , content in enumerate ( contents ) :
if content :
results [ i ] [ " full_content " ] = content
# Append note to body to indicate full content is available
results [ i ] [ " body " ] + = " \n [Full content fetched] "
# 添加当前系统时间,协助 LLM 处理“现在”相关的查询
# Add current system time to help with "now" queries
current_time = datetime . now ( ) . strftime ( " % Y- % m- %d % H: % M: % S % A " )
return { " query " : query , " current_time " : current_time , " results " : results }
except Exception as e :
@ -238,15 +199,7 @@ async def tool_web_search(query: str, max_results: int = 5) -> Dict[str, Any]:
async def tool_start_cluster ( db : AsyncSession , user_name : str , cluster_uuid : str ) - > Dict [ str , Any ] :
"""
核心工具 : 启动指定的 Hadoop 集群 。
1. 校验集群存在性 。
2. 获取 SSH 登录凭据 。
3. 在 NameNode 执行 start - dfs . sh 。
4. 在 ResourceManager 执行 start - yarn . sh 。
5. 更新数据库中的集群健康状态 。
"""
""" 工具:启动 Hadoop 集群。 """
# 1. 权限与用户
uid_res = await db . execute ( text ( " SELECT id FROM users WHERE username=:un LIMIT 1 " ) , { " un " : user_name } )
uid_row = uid_res . first ( )
@ -266,7 +219,7 @@ async def tool_start_cluster(db: AsyncSession, user_name: str, cluster_uuid: str
start_time = _now ( )
logs = [ ]
# 4. 在 NameNode 执行 启动脚本
# 4. 在 NameNode 执行 start-dfs.sh
if cluster . namenode_ip and cluster . namenode_psw :
try :
def run_nn_start ( ) :
@ -277,7 +230,7 @@ async def tool_start_cluster(db: AsyncSession, user_name: str, cluster_uuid: str
except Exception as e :
logs . append ( f " NameNode ( { cluster . namenode_ip } ) start failed: { str ( e ) } " )
# 5. 在 ResourceManager 执行 启动脚本
# 5. 在 ResourceManager 执行 start-yarn.sh
if cluster . rm_ip and cluster . rm_psw :
try :
def run_rm_start ( ) :
@ -290,7 +243,7 @@ async def tool_start_cluster(db: AsyncSession, user_name: str, cluster_uuid: str
end_time = _now ( )
# 6. 更新集群状态
# 6. 更新集群状态 (改进:检查是否有失败日志)
has_failed = any ( " failed " in log . lower ( ) for log in logs )
if not has_failed :
cluster . health_status = " healthy "
@ -300,7 +253,7 @@ async def tool_start_cluster(db: AsyncSession, user_name: str, cluster_uuid: str
cluster . updated_at = end_time
await db . flush ( )
# 7. 记录 审计 日志
# 7. 记录 日志
full_desc = " | " . join ( logs )
exec_row = HadoopExecLog (
from_user_id = user_id ,
@ -316,7 +269,7 @@ async def tool_start_cluster(db: AsyncSession, user_name: str, cluster_uuid: str
async def tool_stop_cluster ( db : AsyncSession , user_name : str , cluster_uuid : str ) - > Dict [ str , Any ] :
""" 核心 工具:停止指定的 Hadoop 集群(反向操作启动逻辑) 。"""
""" 工具:停止 Hadoop 集群。"""
uid_res = await db . execute ( text ( " SELECT id FROM users WHERE username=:un LIMIT 1 " ) , { " un " : user_name } )
uid_row = uid_res . first ( )
user_id = uid_row [ 0 ] if uid_row else 1
@ -333,7 +286,6 @@ async def tool_stop_cluster(db: AsyncSession, user_name: str, cluster_uuid: str)
start_time = _now ( )
logs = [ ]
# 停止 HDFS
if cluster . namenode_ip and cluster . namenode_psw :
try :
def run_nn_stop ( ) :
@ -344,7 +296,6 @@ async def tool_stop_cluster(db: AsyncSession, user_name: str, cluster_uuid: str)
except Exception as e :
logs . append ( f " NameNode ( { cluster . namenode_ip } ) stop failed: { str ( e ) } " )
# 停止 YARN
if cluster . rm_ip and cluster . rm_psw :
try :
def run_rm_stop ( ) :
@ -383,13 +334,7 @@ async def tool_read_cluster_log(
node_hostname : Optional [ str ] = None ,
lines : int = 100 ,
) - > Dict [ str , Any ] :
"""
核心工具 : 智能读取集群中特定组件的日志 。
1. 根据集群 UUID 查找集群 。
2. 确定目标 IP ( NameNode 、 ResourceManager 或指定的主机 ) 。
3. 通过 SSH 尝试在标准日志目录下寻找并 tail 日志文件 。
"""
""" 读取集群中特定服务类型的日志。 """
import uuid as uuidlib
try :
uuidlib . UUID ( cluster_uuid )
@ -409,7 +354,6 @@ async def tool_read_cluster_log(
ssh_user : Optional [ str ] = None
ssh_password : Optional [ str ] = None
# 根据 log_type 自动确定目标节点
if log_type . lower ( ) == " namenode " :
target_ip = str ( cluster . namenode_ip ) if cluster . namenode_ip else None
ssh_password = cluster . namenode_psw
@ -431,7 +375,6 @@ async def tool_read_cluster_log(
if node_obj and node_obj . ssh_user :
ssh_user = node_obj . ssh_user
# 如果通过 hostname 指定了节点
if not target_ip and target_hostname :
node = await _find_accessible_node ( db , user_name , target_hostname )
if not node :
@ -446,14 +389,10 @@ async def tool_read_cluster_log(
target_hostname = target_ip
def _tail_via_ssh ( ) - > Dict [ str , Any ] :
""" SSH 远程读取日志逻辑。 """
ip = str ( target_ip )
hn = str ( target_hostname )
# 初始化日志目录发现机制
log_reader . find_working_log_dir ( hn , ip )
ssh_client = ssh_manager . get_connection ( hn , ip = ip , username = ssh_user , password = ssh_password )
# 尝试匹配已知的日志路径
paths = log_reader . get_log_file_paths ( hn , log_type . lower ( ) )
for p in paths :
p_q = shlex . quote ( p )
@ -464,8 +403,6 @@ async def tool_read_cluster_log(
if err2 :
continue
return { " status " : " success " , " node " : hn , " log_type " : log_type , " path " : p , " content " : out2 }
# 兜底方案:在日志目录下列出文件并模糊匹配组件名
base_dir = log_reader . _node_log_dir . get ( hn , log_reader . log_dir )
base_q = shlex . quote ( base_dir )
out , err = ssh_client . execute_command ( f " ls -1 { base_q } 2>/dev/null " )
@ -476,7 +413,6 @@ async def tool_read_cluster_log(
lf = f . lower ( )
if not f :
continue
# 匹配包含组件名且以常见日志后缀结尾的文件
if log_type . lower ( ) in lf and hn . lower ( ) in lf and ( lf . endswith ( " .log " ) or lf . endswith ( " .out " ) or lf . endswith ( " .out.1 " ) ) :
full = f " { base_dir } / { f } "
full_q = shlex . quote ( full )
@ -488,7 +424,6 @@ async def tool_read_cluster_log(
return await asyncio . to_thread ( _tail_via_ssh )
# 预定义的 Hadoop 故障规则库
_FAULT_RULES : List [ Dict [ str , Any ] ] = [
{
" id " : " hdfs_safemode " ,
@ -557,7 +492,6 @@ _FAULT_RULES: List[Dict[str, Any]] = [
def _detect_faults_from_log_text ( text : str , max_examples_per_rule : int = 3 ) - > List [ Dict [ str , Any ] ] :
""" 根据规则库从日志文本中识别故障。 """
lines = ( text or " " ) . splitlines ( )
hits : List [ Dict [ str , Any ] ] = [ ]
for rule in _FAULT_RULES :
@ -567,7 +501,6 @@ def _detect_faults_from_log_text(text: str, max_examples_per_rule: int = 3) -> L
for idx , line in enumerate ( lines ) :
if not line :
continue
# 如果某行匹配任意正则
if any ( rgx . search ( line ) for rgx in compiled ) :
examples . append ( { " lineNo " : idx + 1 , " line " : line [ : 500 ] } )
if len ( examples ) > = max_examples_per_rule :
@ -594,13 +527,6 @@ async def tool_detect_cluster_faults(
node_hostname : Optional [ str ] = None ,
lines : int = 200 ,
) - > Dict [ str , Any ] :
"""
核心工具 : 自动检测集群组件的常见故障 。
1. 依次读取指定组件 ( 默认 NameNode , ResourceManager ) 的日志 。
2. 使用规则库匹配日志内容 。
3. 返回识别到的故障列表 , 按严重程度排序 。
"""
import uuid as uuidlib
try :
@ -618,7 +544,6 @@ async def tool_detect_cluster_faults(
faults : List [ Dict [ str , Any ] ] = [ ]
for comp in comps :
# 复用 read_cluster_log 读取内容
r = await tool_read_cluster_log (
db = db ,
user_name = user_name ,
@ -631,7 +556,6 @@ async def tool_detect_cluster_faults(
if r . get ( " status " ) != " success " :
continue
content = r . get ( " content " ) or " "
# 执行规则检测
comp_faults = _detect_faults_from_log_text ( content )
for f in comp_faults :
f2 = dict ( f )
@ -640,7 +564,6 @@ async def tool_detect_cluster_faults(
f2 [ " path " ] = r . get ( " path " )
faults . append ( f2 )
# 按严重程度排序 (high > medium > low)
severity_order = { " high " : 0 , " medium " : 1 , " low " : 2 }
faults . sort ( key = lambda x : ( severity_order . get ( ( x . get ( " severity " ) or " " ) . lower ( ) , 9 ) , x . get ( " id " ) or " " ) )
@ -653,7 +576,6 @@ async def tool_detect_cluster_faults(
}
# 预定义的运维命令白名单及其默认执行目标
_OPS_COMMANDS : Dict [ str , Dict [ str , Any ] ] = {
" jps " : { " cmd " : " jps -lm " , " target " : " all_nodes " } ,
" hadoop_version " : { " cmd " : " hadoop version " , " target " : " namenode " } ,
@ -678,14 +600,6 @@ async def tool_run_cluster_command(
timeout : int = 30 ,
limit_nodes : int = 20 ,
) - > Dict [ str , Any ] :
"""
核心工具 : 在集群节点上安全执行白名单内的运维命令 。
参数 :
- command_key : 命令标识 ( 如 jps , df_h )
- target : 目标 ( namenode , resourcemanager , node , all_nodes )
- node_hostname : 当 target = node 时指定的主机名
"""
import uuid as uuidlib
try :
@ -713,7 +627,6 @@ async def tool_run_cluster_command(
bash_cmd = f " bash -lc { shlex . quote ( cmd ) } "
async def _exec_on_node ( hostname : str , ip : str , ssh_user : Optional [ str ] , ssh_password : Optional [ str ] ) - > Dict [ str , Any ] :
""" 执行 SSH 命令的闭包。 """
def _run ( ) :
client = ssh_manager . get_connection ( hostname , ip = ip , username = ssh_user , password = ssh_password )
exit_code , out , err = client . execute_command_with_timeout_and_status ( bash_cmd , timeout = timeout )
@ -730,7 +643,6 @@ async def tool_run_cluster_command(
results : List [ Dict [ str , Any ] ] = [ ]
# 分发逻辑
if tgt == " namenode " :
if not cluster . namenode_ip or not cluster . namenode_psw :
return { " status " : " error " , " message " : " namenode_not_configured " }
@ -760,7 +672,6 @@ async def tool_run_cluster_command(
results . append ( await _exec_on_node ( node . hostname , str ( node . ip_address ) , node . ssh_user or " hadoop " , node . ssh_password ) )
elif tgt == " all_nodes " :
# 批量在所有节点上执行(带限制)
nodes_stmt = select ( Node ) . where ( Node . cluster_id == cluster . id ) . limit ( limit_nodes )
nodes = ( await db . execute ( nodes_stmt ) ) . scalars ( ) . all ( )
for n in nodes :
@ -772,7 +683,6 @@ async def tool_run_cluster_command(
else :
return { " status " : " error " , " message " : " invalid_target " }
# 记录执行日志
start = _now ( )
exec_id = f " tool_ { start . timestamp ( ) : .0f } "
await _write_exec_log ( db , exec_id , " run_cluster_command " , " success " , start , _now ( ) , 0 , user_name )
@ -788,7 +698,7 @@ async def tool_run_cluster_command(
def openai_tools_schema ( ) - > List [ Dict [ str , Any ] ] :
""" 返回 OpenAI 兼容的工具定义( Function Calling) ,供 LLM 调用 。"""
""" 返回 OpenAI 兼容的工具定义( Function Calling) 。"""
return [
{
" type " : " function " ,