You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

411 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
OpenClaw 调度引擎接口模块
支持两种传输协议(通过环境变量 OPENCLAW_PROTOCOL 切换):
1. websocket默认JSON-RPC 2.0 over WebSocket
ws://127.0.0.1:18789/acp 或 ws://127.0.0.1:18789/
每次调用建立短连接,发送请求帧,读取响应帧后关闭。
2. httpJSON-RPC 2.0 over HTTP POST
POST http://127.0.0.1:18789/acp
Content-Type: application/json
支持的 method
jobs.register.run_once - 立即执行一次
jobs.register.cron - 注册定时调度
jobs.cancel - 取消/移除调度
jobs.get - 查询任务状态
skills.run - 直接调用 Skill不走调度器
"""
import os
import uuid
import json
import requests
import datetime
import db
# OpenClaw 服务地址(从环境变量读取,默认本地 18789
OPENCLAW_BASE = os.environ.get('OPENCLAW_URL', 'http://127.0.0.1:18789')
OPENCLAW_TIMEOUT = int(os.environ.get('OPENCLAW_TIMEOUT', '10'))
# 传输协议websocket默认 或 http
# 设置方式:环境变量 OPENCLAW_PROTOCOL=http 或 OPENCLAW_PROTOCOL=websocket
OPENCLAW_PROTOCOL = os.environ.get('OPENCLAW_PROTOCOL', 'websocket').lower()
# WebSocket 端点路径(部分 OpenClaw 版本路径不同,可通过环境变量覆盖)
OPENCLAW_WS_PATH = os.environ.get('OPENCLAW_WS_PATH', '/acp')
# 灵枢回调地址OpenClaw 执行完成后 POST 到这里)
LINGSHU_CALLBACK_BASE = os.environ.get('LINGSHU_CALLBACK_URL', 'http://localhost:5000')
# 根据 OPENCLAW_BASE 自动推导 WebSocket 地址
# http://host:port → ws://host:port
# https://host:port → wss://host:port
def _ws_url():
base = OPENCLAW_BASE.rstrip('/')
if base.startswith('https://'):
return 'wss://' + base[8:] + OPENCLAW_WS_PATH
return 'ws://' + base.replace('http://', '') + OPENCLAW_WS_PATH
ACP_ENDPOINT = f'{OPENCLAW_BASE}/acp'
def _headers():
token = os.environ.get('OPENCLAW_TOKEN', '')
h = {'Content-Type': 'application/json'}
if token:
h['Authorization'] = f'Bearer {token}'
return h
def _acp_call_http(method: str, params: dict) -> dict:
"""JSON-RPC 2.0 over HTTP POST"""
req_id = str(uuid.uuid4())
payload = {'jsonrpc': '2.0', 'id': req_id, 'method': method, 'params': params}
resp = requests.post(
ACP_ENDPOINT,
json=payload,
headers=_headers(),
timeout=OPENCLAW_TIMEOUT
)
resp.raise_for_status()
body = resp.json()
if 'error' in body:
err = body['error']
raise RuntimeError(f"ACP error {err.get('code')}: {err.get('message')}")
return body.get('result', {})
def _acp_call_ws(method: str, params: dict) -> dict:
"""JSON-RPC 2.0 over WebSocket短连接模式一请求一响应"""
try:
import websocket # websocket-client
except ImportError:
raise RuntimeError(
"缺少 websocket-client 库,请在容器/环境中执行: pip install websocket-client"
)
req_id = str(uuid.uuid4())
payload = json.dumps({'jsonrpc': '2.0', 'id': req_id, 'method': method, 'params': params})
token = os.environ.get('OPENCLAW_TOKEN', '')
headers = [f'Authorization: Bearer {token}'] if token else []
ws_addr = _ws_url()
ws = websocket.create_connection(
ws_addr,
timeout=OPENCLAW_TIMEOUT,
header=headers
)
try:
ws.send(payload)
raw = ws.recv()
finally:
ws.close()
body = json.loads(raw)
if 'error' in body:
err = body['error']
raise RuntimeError(f"ACP error {err.get('code')}: {err.get('message')}")
return body.get('result', {})
def _acp_call(method: str, params: dict) -> dict:
"""
统一入口:根据 OPENCLAW_PROTOCOL 选择 WebSocket 或 HTTP。
"""
if OPENCLAW_PROTOCOL == 'http':
return _acp_call_http(method, params)
return _acp_call_ws(method, params)
def _log_to_db(execution_id, task_id, level, message, node_name=None):
"""写一条执行日志到数据库"""
try:
conn = db.get_conn()
cur = conn.cursor()
cur.execute("""
INSERT INTO task_logs (execution_id, task_id, log_level, log_time, message, node_name)
VALUES (%s, %s, %s, %s, %s, %s)
""", (execution_id, task_id, level,
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
message, node_name))
conn.commit()
cur.close(); conn.close()
except Exception as e:
print(f'[warn] log_to_db failed: {e}')
def _build_executor(skill_name, skill_params, timeout, callback_url, execution_id):
"""
构建 OpenClaw executor 配置。
优先使用 skill executor如果 skill_name 包含路径分隔符则用 shell。
"""
if '/' in skill_name or skill_name.endswith('.sh') or skill_name.endswith('.py'):
# 当作 shell 命令
return {
'type': 'shell',
'command': [skill_name] + [json.dumps(skill_params)] if skill_params else [skill_name],
'timeout': timeout
}
else:
# OpenClaw skill executor
return {
'type': 'skill',
'skill': skill_name,
'params': skill_params or {},
'timeout': timeout,
'callbackUrl': callback_url,
'callbackParams': {'execution_id': execution_id}
}
def register_task(task_id: int) -> dict:
"""
启动任务:从数据库读取任务配置,向 OpenClaw 注册 Cron 调度。
返回 {'ok': True/False, 'message': str, 'job_id': str}
"""
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT t.*, a.agent_code, a.api_endpoint AS agent_endpoint
FROM tasks t LEFT JOIN agents a ON t.agent_id = a.id
WHERE t.id = %s
""", (task_id,))
task = cur.fetchone()
cur.close(); conn.close()
if not task:
return {'ok': False, 'message': f'任务 {task_id} 不存在'}
# 解析 workflow_json 中的扩展字段
try:
extra = json.loads(task['workflow_json']) if task['workflow_json'] else {}
except Exception:
extra = {}
skill_name = extra.get('skill_name', '')
skill_params = extra.get('skill_params', {})
timeout = extra.get('timeout', 300)
retry_count = extra.get('retry_count', 3)
cron_expr = task.get('cron_expression') or ''
if not skill_name:
return {'ok': False, 'message': '未配置 Skillworkflow_json.skill_name无法启动'}
callback_url = f"{LINGSHU_CALLBACK_BASE}/api/openclaw/callback/{task_id}"
job_name = f'lingshu_task_{task_id}'
executor = _build_executor(skill_name, skill_params, timeout, callback_url, None)
# 有 Cron 表达式:注册定时调度;否则只做 run_once
if cron_expr and task.get('trigger_type') == 'cron':
method = 'jobs.register.cron'
params = {
'job': {
'name': job_name,
'cron': cron_expr,
'executor': executor,
'retry': retry_count,
'taskName': task['name'],
}
}
else:
method = 'jobs.register.run_once'
params = {
'job': {
'name': job_name,
'executor': executor,
'delaySeconds': 0,
'retry': retry_count,
'taskName': task['name'],
}
}
try:
result = _acp_call(method, params)
job_id = result.get('jobId', job_name)
_save_job_id(task_id, job_id)
return {'ok': True, 'message': f'已注册到 OpenClawjob_id={job_id}', 'job_id': job_id}
except requests.exceptions.ConnectionError:
msg = f'OpenClaw 服务不可达({OPENCLAW_BASE}),任务已在灵枢侧标记运行,将在 OpenClaw 恢复后自动接管'
print(f'[warn] {msg}')
return {'ok': True, 'message': msg, 'job_id': None, 'degraded': True}
except Exception as e:
return {'ok': False, 'message': f'注册失败: {str(e)}'}
def unregister_task(task_id: int) -> dict:
"""
停止任务:通知 OpenClaw 移除调度jobs.cancel
"""
job_name = f'lingshu_task_{task_id}'
# 尝试从数据库取已存的 job_id
try:
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
row = cur.fetchone()
cur.close(); conn.close()
extra = json.loads(row['workflow_json']) if row and row['workflow_json'] else {}
job_id = extra.get('openclaw_job_id', job_name)
except Exception:
job_id = job_name
try:
_acp_call('jobs.cancel', {'jobId': job_id})
return {'ok': True, 'message': f'已从 OpenClaw 移除调度job_id={job_id}'}
except requests.exceptions.ConnectionError:
return {'ok': True, 'message': 'OpenClaw 不可达,已在灵枢侧标记为停止', 'degraded': True}
except RuntimeError as e:
# ACP error 404 => job 不存在,视为成功
if '404' in str(e) or 'not found' in str(e).lower():
return {'ok': True, 'message': '任务在 OpenClaw 中已不存在(可能已停止)'}
return {'ok': False, 'message': f'停止失败: {str(e)}'}
except Exception as e:
return {'ok': False, 'message': f'停止失败: {str(e)}'}
def trigger_once(task_id: int, execution_id: int) -> dict:
"""
手动触发一次:立即让 OpenClaw 执行 Skilljobs.register.run_once
"""
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT * FROM tasks WHERE id=%s", (task_id,))
task = cur.fetchone()
cur.close(); conn.close()
if not task:
return {'ok': False, 'message': f'任务 {task_id} 不存在'}
try:
extra = json.loads(task['workflow_json']) if task['workflow_json'] else {}
except Exception:
extra = {}
skill_name = extra.get('skill_name', '')
skill_params = extra.get('skill_params', {})
timeout = extra.get('timeout', 300)
if not skill_name:
# 没有 Skill直接模拟执行演示用
_simulate_execution(task_id, execution_id, task['name'])
return {'ok': True, 'message': '模拟执行完成(未配置 Skill'}
callback_url = f"{LINGSHU_CALLBACK_BASE}/api/openclaw/callback/{task_id}"
job_name = f'lingshu_task_{task_id}_exec_{execution_id}'
executor = _build_executor(skill_name, skill_params, timeout, callback_url, execution_id)
_log_to_db(execution_id, task_id, 'info',
f'[灵枢] 向 OpenClaw 发送执行请求Skill={skill_name}method=jobs.register.run_once')
try:
result = _acp_call('jobs.register.run_once', {
'job': {
'name': job_name,
'executor': executor,
'delaySeconds': 0,
'taskName': task['name'],
}
})
returned_job_id = result.get('jobId', job_name)
_log_to_db(execution_id, task_id, 'info',
f'[OpenClaw] 已接收jobId={returned_job_id},异步执行中')
return {'ok': True, 'message': f'OpenClaw 已接收jobId={returned_job_id},异步执行中'}
except requests.exceptions.ConnectionError:
_log_to_db(execution_id, task_id, 'warn', '[降级] OpenClaw 不可达,切换为本地模拟执行')
_simulate_execution(task_id, execution_id, task['name'], skill_name, skill_params)
return {'ok': True, 'message': '降级执行OpenClaw 不可达,本地模拟)'}
except Exception as e:
_log_to_db(execution_id, task_id, 'error', f'[错误] 触发失败: {str(e)}')
return {'ok': False, 'message': str(e)}
def query_job(task_id: int) -> dict:
"""查询 OpenClaw 中任务的当前状态jobs.get"""
try:
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
row = cur.fetchone()
cur.close(); conn.close()
extra = json.loads(row['workflow_json']) if row and row['workflow_json'] else {}
job_id = extra.get('openclaw_job_id', f'lingshu_task_{task_id}')
except Exception:
job_id = f'lingshu_task_{task_id}'
try:
result = _acp_call('jobs.get', {'jobId': job_id})
return {'ok': True, 'job': result}
except requests.exceptions.ConnectionError:
return {'ok': False, 'message': 'OpenClaw 不可达'}
except Exception as e:
return {'ok': False, 'message': str(e)}
def _simulate_execution(task_id, execution_id, task_name, skill_name=None, skill_params=None):
"""
OpenClaw 不可达时的降级模拟执行,写入日志并更新执行记录。
"""
import time, random
start = datetime.datetime.now()
_log_to_db(execution_id, task_id, 'info', f'[灵枢] 任务「{task_name}」开始执行')
if skill_name:
_log_to_db(execution_id, task_id, 'info', f'[Skill] 调用: {skill_name}')
_log_to_db(execution_id, task_id, 'info', f'[Skill] 参数: {json.dumps(skill_params or {}, ensure_ascii=False)}')
time.sleep(0.3)
_log_to_db(execution_id, task_id, 'success', f'[灵枢] 任务执行完成(模拟)')
duration = int((datetime.datetime.now() - start).total_seconds() * 1000)
cpu = round(random.uniform(10, 60), 1)
mem = round(random.uniform(20, 50), 1)
try:
conn = db.get_conn()
cur = conn.cursor()
cur.execute("""
UPDATE task_executions
SET status='success', ended_at=NOW(), duration_ms=%s,
cpu_usage_pct=%s, memory_usage_pct=%s
WHERE id=%s
""", (duration, cpu, mem, execution_id))
cur.execute("UPDATE tasks SET last_run_at=NOW() WHERE id=%s", (task_id,))
conn.commit()
cur.close(); conn.close()
except Exception as e:
print(f'[warn] simulate update failed: {e}')
def _save_job_id(task_id, job_id):
"""将 OpenClaw job_id 存入 workflow_json"""
try:
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
row = cur.fetchone()
extra = {}
if row and row['workflow_json']:
try:
extra = json.loads(row['workflow_json'])
except Exception:
pass
extra['openclaw_job_id'] = job_id
cur2 = conn.cursor()
cur2.execute("UPDATE tasks SET workflow_json=%s WHERE id=%s",
(json.dumps(extra, ensure_ascii=False), task_id))
conn.commit()
cur.close(); cur2.close(); conn.close()
except Exception as e:
print(f'[warn] save_job_id failed: {e}')