|
|
"""
|
|
|
OpenClaw scheduling-engine interface module.

Two transport protocols are supported (selected with the OPENCLAW_PROTOCOL
environment variable):

  1. websocket (default): JSON-RPC 2.0 over WebSocket
       ws://127.0.0.1:18789/acp  or  ws://127.0.0.1:18789/
       Each call opens a short-lived connection, sends one request frame,
       reads one response frame, then closes.

  2. http: JSON-RPC 2.0 over HTTP POST
       POST http://127.0.0.1:18789/acp
       Content-Type: application/json

Supported methods:
  jobs.register.run_once  - run once immediately
  jobs.register.cron      - register a cron schedule
  jobs.cancel             - cancel / remove a schedule
  jobs.get                - query job status
  skills.run              - invoke a Skill directly (bypassing the scheduler)
|
|
|
"""
|
|
|
import contextlib
import datetime
import json
import os
import uuid

import requests

import db
|
|
|
|
|
|
# OpenClaw service base URL (read from OPENCLAW_URL; defaults to local port 18789).
OPENCLAW_BASE = os.getenv('OPENCLAW_URL', 'http://127.0.0.1:18789')
# Timeout handed to requests.post / websocket.create_connection (OPENCLAW_TIMEOUT, default 10).
OPENCLAW_TIMEOUT = int(os.getenv('OPENCLAW_TIMEOUT', '10'))

# Transport protocol: "websocket" (default) or "http".
# Set it through the environment: OPENCLAW_PROTOCOL=http or OPENCLAW_PROTOCOL=websocket.
OPENCLAW_PROTOCOL = os.getenv('OPENCLAW_PROTOCOL', 'websocket').lower()

# WebSocket endpoint path (some OpenClaw versions use a different path;
# override it with OPENCLAW_WS_PATH).
OPENCLAW_WS_PATH = os.getenv('OPENCLAW_WS_PATH', '/acp')

# Lingshu callback base URL (OpenClaw POSTs here once an execution finishes).
LINGSHU_CALLBACK_BASE = os.getenv('LINGSHU_CALLBACK_URL', 'http://localhost:5000')
|
|
|
|
|
|
# Derive the WebSocket address from OPENCLAW_BASE:
#   http://host:port  -> ws://host:port
#   https://host:port -> wss://host:port
def _ws_url():
    """Return the WebSocket URL built from OPENCLAW_BASE and OPENCLAW_WS_PATH.

    Trailing slashes on the base are dropped. The scheme is mapped
    http -> ws and https -> wss (case-insensitively); a base that already
    uses another scheme such as ws:// or wss:// keeps it, and a base without
    any scheme is treated as plain ws. A non-empty OPENCLAW_WS_PATH that
    lacks a leading "/" gets one, so host and path are never fused.

    Returns:
        str: the full WebSocket URL, e.g. "ws://127.0.0.1:18789/acp".
    """
    base = OPENCLAW_BASE.rstrip('/')
    scheme, separator, remainder = base.partition('://')
    if separator:
        # Only the scheme prefix is rewritten; the rest of the URL is untouched.
        scheme = {'http': 'ws', 'https': 'wss'}.get(scheme.lower(), scheme)
    else:
        # No scheme at all (e.g. "127.0.0.1:18789"): default to plain ws.
        scheme, remainder = 'ws', base

    path = OPENCLAW_WS_PATH
    if path and not path.startswith('/'):
        path = '/' + path
    return f'{scheme}://{remainder}{path}'
|
|
|
|
|
|
# HTTP JSON-RPC endpoint. Trailing slashes on the base URL are dropped so the
# path never becomes "//acp" when OPENCLAW_URL is configured with a final "/".
ACP_ENDPOINT = f"{OPENCLAW_BASE.rstrip('/')}/acp"
|
|
|
|
|
|
|
|
|
def _headers():
    """Build the HTTP headers for an ACP request.

    Always sets a JSON Content-Type. When the OPENCLAW_TOKEN environment
    variable is non-empty, a Bearer Authorization header is added as well.

    Returns:
        dict: header name -> header value.
    """
    headers = {'Content-Type': 'application/json'}
    bearer = os.environ.get('OPENCLAW_TOKEN', '')
    if not bearer:
        return headers
    headers['Authorization'] = f'Bearer {bearer}'
    return headers
|
|
|
|
|
|
|
|
|
def _acp_call_http(method: str, params: dict) -> dict:
    """Send one JSON-RPC 2.0 request over HTTP POST and return its result.

    Args:
        method: JSON-RPC method name.
        params: JSON-serialisable parameters for the method.

    Returns:
        The response's ``result`` member, or ``{}`` when it is missing or null.

    Raises:
        requests.exceptions.RequestException: propagated from ``requests.post``
            or ``raise_for_status`` (connection failure, timeout, HTTP error).
        RuntimeError: the response body is not a JSON object, or it carries a
            non-null ``error`` member.
    """
    request_id = str(uuid.uuid4())
    payload = {'jsonrpc': '2.0', 'id': request_id, 'method': method, 'params': params}
    resp = requests.post(
        ACP_ENDPOINT,
        json=payload,
        headers=_headers(),
        timeout=OPENCLAW_TIMEOUT,
    )
    resp.raise_for_status()
    body = resp.json()
    if not isinstance(body, dict):
        raise RuntimeError(f'ACP error: unexpected response body {body!r}')

    # A null "error" is treated as "no error" instead of crashing on None.get().
    err = body.get('error')
    if err is not None:
        if isinstance(err, dict):
            code, message = err.get('code'), err.get('message')
        else:
            # Non-object error payload: surface it verbatim as the message.
            code, message = None, err
        raise RuntimeError(f"ACP error {code}: {message}")

    # Callers use result.get(...), so a null result is normalised to {}.
    result = body.get('result')
    return {} if result is None else result
|
|
|
|
|
|
|
|
|
def _acp_call_ws(method: str, params: dict) -> dict:
    """Send one JSON-RPC 2.0 request over a short-lived WebSocket connection.

    A connection is opened to ``_ws_url()``, the request is sent, exactly one
    frame is read back, and the connection is closed.

    Args:
        method: JSON-RPC method name.
        params: JSON-serialisable parameters for the method.

    Returns:
        The response's ``result`` member, or ``{}`` when it is missing or null.

    Raises:
        RuntimeError: websocket-client is not installed, the response is not a
            JSON object, or it carries a non-null ``error`` member.
        requests.exceptions.ConnectionError: the WebSocket connection could not
            be established. The callers in this module catch exactly this type
            to enter their "OpenClaw unreachable" fallback, so connect failures
            are translated to it to make both transports behave alike.
    """
    try:
        import websocket  # websocket-client
    except ImportError as exc:
        raise RuntimeError(
            "缺少 websocket-client 库,请在容器/环境中执行: pip install websocket-client"
        ) from exc

    request_id = str(uuid.uuid4())
    payload = json.dumps({'jsonrpc': '2.0', 'id': request_id, 'method': method, 'params': params})

    token = os.environ.get('OPENCLAW_TOKEN', '')
    headers = [f'Authorization: Bearer {token}'] if token else []

    ws_addr = _ws_url()
    # Socket-level failures (refused, unreachable, connect timeout) are OSError.
    # NOTE(review): name-resolution failures are assumed to surface as
    # websocket.WebSocketAddressException; getattr keeps older library
    # versions without that class working.
    unreachable_errors = (OSError, getattr(websocket, 'WebSocketAddressException', OSError))
    try:
        ws = websocket.create_connection(
            ws_addr,
            timeout=OPENCLAW_TIMEOUT,
            header=headers,
        )
    except unreachable_errors as exc:
        raise requests.exceptions.ConnectionError(
            f'OpenClaw WebSocket unreachable ({ws_addr}): {exc}'
        ) from exc

    try:
        ws.send(payload)
        raw = ws.recv()
    finally:
        ws.close()

    body = json.loads(raw)
    if not isinstance(body, dict):
        raise RuntimeError(f'ACP error: unexpected response body {body!r}')

    # A null "error" is treated as "no error" instead of crashing on None.get().
    err = body.get('error')
    if err is not None:
        if isinstance(err, dict):
            code, message = err.get('code'), err.get('message')
        else:
            # Non-object error payload: surface it verbatim as the message.
            code, message = None, err
        raise RuntimeError(f"ACP error {code}: {message}")

    # Callers use result.get(...), so a null result is normalised to {}.
    result = body.get('result')
    return {} if result is None else result
|
|
|
|
|
|
|
|
|
def _acp_call(method: str, params: dict) -> dict:
    """Single entry point for ACP calls.

    Dispatches to the HTTP transport when OPENCLAW_PROTOCOL equals 'http';
    every other value goes through the WebSocket transport.
    """
    transport = _acp_call_http if OPENCLAW_PROTOCOL == 'http' else _acp_call_ws
    return transport(method, params)
|
|
|
|
|
|
|
|
|
def _log_to_db(execution_id, task_id, level, message, node_name=None):
    """Insert one execution-log row into ``task_logs`` (best effort).

    Any failure is reported with a ``[warn]`` print and otherwise ignored, so
    logging problems never interrupt the caller. The cursor and connection
    are closed even when the INSERT or commit raises.

    Args:
        execution_id: value stored in ``task_logs.execution_id``.
        task_id: value stored in ``task_logs.task_id``.
        level: value stored in ``task_logs.log_level``.
        message: log text.
        node_name: optional value for ``task_logs.node_name``.
    """
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    log_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
    try:
        with contextlib.closing(db.get_conn()) as conn, \
                contextlib.closing(conn.cursor()) as cur:
            cur.execute("""
                INSERT INTO task_logs (execution_id, task_id, log_level, log_time, message, node_name)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (execution_id, task_id, level, log_time, message, node_name))
            conn.commit()
    except Exception as e:
        print(f'[warn] log_to_db failed: {e}')
|
|
|
|
|
|
|
|
|
def _build_executor(skill_name, skill_params, timeout, callback_url, execution_id):
    """Build the OpenClaw executor configuration for a job.

    A ``skill`` executor is the default. When ``skill_name`` contains a "/"
    or ends in ".sh" / ".py" it is treated as a shell command instead; in
    that case the params (if truthy) are appended as one JSON-encoded
    argument and no callback fields are included.

    Returns:
        dict: the executor description.
    """
    looks_like_script = '/' in skill_name or skill_name.endswith(('.sh', '.py'))
    if not looks_like_script:
        # OpenClaw skill executor
        return {
            'type': 'skill',
            'skill': skill_name,
            'params': skill_params or {},
            'timeout': timeout,
            'callbackUrl': callback_url,
            'callbackParams': {'execution_id': execution_id},
        }

    # Treated as a shell command.
    command = [skill_name]
    if skill_params:
        command.append(json.dumps(skill_params))
    return {
        'type': 'shell',
        'command': command,
        'timeout': timeout,
    }
|
|
|
|
|
|
|
|
|
def register_task(task_id: int) -> dict:
    """Start a task: load its configuration and register it with OpenClaw.

    The task row is read from the database. ``workflow_json`` supplies
    ``skill_name``, ``skill_params``, ``timeout`` (default 300) and
    ``retry_count`` (default 3). A task with a cron expression and
    ``trigger_type == 'cron'`` is registered via ``jobs.register.cron``;
    anything else is submitted via ``jobs.register.run_once``.

    Returns:
        dict: ``{'ok': bool, 'message': str}`` plus ``'job_id'`` on success.
        When OpenClaw is unreachable the result is
        ``{'ok': True, 'job_id': None, 'degraded': True, ...}``.
    """
    # closing() releases cursor and connection even when the query raises.
    with contextlib.closing(db.get_conn()) as conn, \
            contextlib.closing(conn.cursor(dictionary=True)) as cur:
        cur.execute("""
            SELECT t.*, a.agent_code, a.api_endpoint AS agent_endpoint
            FROM tasks t LEFT JOIN agents a ON t.agent_id = a.id
            WHERE t.id = %s
        """, (task_id,))
        task = cur.fetchone()

    if not task:
        return {'ok': False, 'message': f'任务 {task_id} 不存在'}

    # Parse the extension fields stored in workflow_json. Unparseable content
    # is deliberately treated as "no extra configuration".
    try:
        extra = json.loads(task['workflow_json']) if task['workflow_json'] else {}
    except Exception:
        extra = {}
    if not isinstance(extra, dict):
        # Valid JSON that is not an object (list, number, ...) has no fields to read.
        extra = {}

    skill_name = extra.get('skill_name', '')
    skill_params = extra.get('skill_params', {})
    timeout = extra.get('timeout', 300)
    retry_count = extra.get('retry_count', 3)
    cron_expr = task.get('cron_expression') or ''

    if not skill_name:
        return {'ok': False, 'message': '未配置 Skill(workflow_json.skill_name),无法启动'}

    callback_url = f"{LINGSHU_CALLBACK_BASE}/api/openclaw/callback/{task_id}"
    job_name = f'lingshu_task_{task_id}'

    # No execution exists yet at registration time, hence execution_id=None.
    executor = _build_executor(skill_name, skill_params, timeout, callback_url, None)

    job = {
        'name': job_name,
        'executor': executor,
        'retry': retry_count,
        'taskName': task['name'],
    }
    # With a cron expression: register a schedule; otherwise run once right away.
    if cron_expr and task.get('trigger_type') == 'cron':
        method = 'jobs.register.cron'
        job['cron'] = cron_expr
    else:
        method = 'jobs.register.run_once'
        job['delaySeconds'] = 0
    params = {'job': job}

    try:
        result = _acp_call(method, params)
        # Fall back to the deterministic job name when OpenClaw returns no jobId.
        job_id = result.get('jobId', job_name)
        _save_job_id(task_id, job_id)
        return {'ok': True, 'message': f'已注册到 OpenClaw,job_id={job_id}', 'job_id': job_id}

    except requests.exceptions.ConnectionError:
        msg = f'OpenClaw 服务不可达({OPENCLAW_BASE}),任务已在灵枢侧标记运行,将在 OpenClaw 恢复后自动接管'
        print(f'[warn] {msg}')
        return {'ok': True, 'message': msg, 'job_id': None, 'degraded': True}

    except Exception as e:
        return {'ok': False, 'message': f'注册失败: {str(e)}'}
|
|
|
|
|
|
|
|
|
def unregister_task(task_id: int) -> dict:
    """Stop a task: ask OpenClaw to remove its schedule (``jobs.cancel``).

    The job id is taken from ``workflow_json.openclaw_job_id`` when it can be
    read; otherwise the deterministic name ``lingshu_task_<task_id>`` is used.

    Returns:
        dict: ``{'ok': bool, 'message': str}``; ``'degraded': True`` is added
        when OpenClaw is unreachable.
    """
    job_name = f'lingshu_task_{task_id}'

    # Try to read the stored job_id; any failure falls back to the default
    # name. closing() releases cursor and connection even when the query raises.
    try:
        with contextlib.closing(db.get_conn()) as conn, \
                contextlib.closing(conn.cursor(dictionary=True)) as cur:
            cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
            row = cur.fetchone()
        extra = json.loads(row['workflow_json']) if row and row['workflow_json'] else {}
        job_id = extra.get('openclaw_job_id', job_name)
    except Exception:
        job_id = job_name

    try:
        _acp_call('jobs.cancel', {'jobId': job_id})
        return {'ok': True, 'message': f'已从 OpenClaw 移除调度,job_id={job_id}'}

    except requests.exceptions.ConnectionError:
        return {'ok': True, 'message': 'OpenClaw 不可达,已在灵枢侧标记为停止', 'degraded': True}

    except RuntimeError as e:
        # ACP error 404 => the job does not exist, which counts as success.
        # NOTE(review): this is a substring match on the whole error text, so a
        # message that merely contains "404" also matches — confirm the error
        # codes OpenClaw actually returns.
        if '404' in str(e) or 'not found' in str(e).lower():
            return {'ok': True, 'message': '任务在 OpenClaw 中已不存在(可能已停止)'}
        return {'ok': False, 'message': f'停止失败: {str(e)}'}

    except Exception as e:
        return {'ok': False, 'message': f'停止失败: {str(e)}'}
|
|
|
|
|
|
|
|
|
def trigger_once(task_id: int, execution_id: int) -> dict:
    """Manually trigger one run of a task through ``jobs.register.run_once``.

    When the task has no ``skill_name`` configured, or OpenClaw is
    unreachable, the run is simulated locally via ``_simulate_execution``.
    Progress and errors are written to the execution log with ``_log_to_db``.

    Args:
        task_id: id of the row in ``tasks``.
        execution_id: execution id used for log rows and passed to the executor.

    Returns:
        dict: ``{'ok': bool, 'message': str}``.
    """
    # closing() releases cursor and connection even when the query raises.
    with contextlib.closing(db.get_conn()) as conn, \
            contextlib.closing(conn.cursor(dictionary=True)) as cur:
        cur.execute("SELECT * FROM tasks WHERE id=%s", (task_id,))
        task = cur.fetchone()

    if not task:
        return {'ok': False, 'message': f'任务 {task_id} 不存在'}

    # Unparseable workflow_json is deliberately treated as "no extra configuration".
    try:
        extra = json.loads(task['workflow_json']) if task['workflow_json'] else {}
    except Exception:
        extra = {}
    if not isinstance(extra, dict):
        # Valid JSON that is not an object (list, number, ...) has no fields to read.
        extra = {}

    skill_name = extra.get('skill_name', '')
    skill_params = extra.get('skill_params', {})
    timeout = extra.get('timeout', 300)

    if not skill_name:
        # No Skill configured: simulate the execution directly (demo use).
        _simulate_execution(task_id, execution_id, task['name'])
        return {'ok': True, 'message': '模拟执行完成(未配置 Skill)'}

    callback_url = f"{LINGSHU_CALLBACK_BASE}/api/openclaw/callback/{task_id}"
    job_name = f'lingshu_task_{task_id}_exec_{execution_id}'

    executor = _build_executor(skill_name, skill_params, timeout, callback_url, execution_id)

    _log_to_db(execution_id, task_id, 'info',
               f'[灵枢] 向 OpenClaw 发送执行请求,Skill={skill_name},method=jobs.register.run_once')

    try:
        result = _acp_call('jobs.register.run_once', {
            'job': {
                'name': job_name,
                'executor': executor,
                'delaySeconds': 0,
                'taskName': task['name'],
            }
        })
        # Fall back to the local job name when OpenClaw returns no jobId.
        returned_job_id = result.get('jobId', job_name)
        _log_to_db(execution_id, task_id, 'info',
                   f'[OpenClaw] 已接收,jobId={returned_job_id},异步执行中')
        return {'ok': True, 'message': f'OpenClaw 已接收,jobId={returned_job_id},异步执行中'}

    except requests.exceptions.ConnectionError:
        _log_to_db(execution_id, task_id, 'warn', '[降级] OpenClaw 不可达,切换为本地模拟执行')
        _simulate_execution(task_id, execution_id, task['name'], skill_name, skill_params)
        return {'ok': True, 'message': '降级执行(OpenClaw 不可达,本地模拟)'}

    except Exception as e:
        _log_to_db(execution_id, task_id, 'error', f'[错误] 触发失败: {str(e)}')
        return {'ok': False, 'message': str(e)}
|
|
|
|
|
|
|
|
|
def query_job(task_id: int) -> dict:
    """Query the current state of a task's job in OpenClaw (``jobs.get``).

    The job id is taken from ``workflow_json.openclaw_job_id`` when it can be
    read; otherwise ``lingshu_task_<task_id>`` is used.

    Returns:
        dict: ``{'ok': True, 'job': <result>}`` on success, otherwise
        ``{'ok': False, 'message': str}``.
    """
    default_job_id = f'lingshu_task_{task_id}'
    # Any failure while reading the stored id falls back to the default name.
    # closing() releases cursor and connection even when the query raises.
    try:
        with contextlib.closing(db.get_conn()) as conn, \
                contextlib.closing(conn.cursor(dictionary=True)) as cur:
            cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
            row = cur.fetchone()
        extra = json.loads(row['workflow_json']) if row and row['workflow_json'] else {}
        job_id = extra.get('openclaw_job_id', default_job_id)
    except Exception:
        job_id = default_job_id

    try:
        result = _acp_call('jobs.get', {'jobId': job_id})
        return {'ok': True, 'job': result}
    except requests.exceptions.ConnectionError:
        return {'ok': False, 'message': 'OpenClaw 不可达'}
    except Exception as e:
        return {'ok': False, 'message': str(e)}
|
|
|
|
|
|
|
|
|
def _simulate_execution(task_id, execution_id, task_name, skill_name=None, skill_params=None):
    """Simulate a run locally when OpenClaw cannot execute it.

    Writes log rows through ``_log_to_db``, sleeps briefly, then marks the
    ``task_executions`` row as successful with the measured duration and
    random CPU / memory figures, and updates ``tasks.last_run_at``. A failure
    of the database update is reported with a ``[warn]`` print only.

    Args:
        task_id: id of the row in ``tasks``.
        execution_id: id of the row in ``task_executions``.
        task_name: task name shown in the log.
        skill_name: optional Skill name; when given, the Skill and its params
            are logged too.
        skill_params: optional Skill parameters (logged as JSON).
    """
    import random
    import time

    start = datetime.datetime.now()

    _log_to_db(execution_id, task_id, 'info', f'[灵枢] 任务「{task_name}」开始执行')
    if skill_name:
        _log_to_db(execution_id, task_id, 'info', f'[Skill] 调用: {skill_name}')
        _log_to_db(execution_id, task_id, 'info',
                   f'[Skill] 参数: {json.dumps(skill_params or {}, ensure_ascii=False)}')
    time.sleep(0.3)
    _log_to_db(execution_id, task_id, 'success', '[灵枢] 任务执行完成(模拟)')

    duration = int((datetime.datetime.now() - start).total_seconds() * 1000)
    # Placeholder resource figures: this run is simulated, nothing was measured.
    cpu = round(random.uniform(10, 60), 1)
    mem = round(random.uniform(20, 50), 1)

    try:
        # closing() releases cursor and connection even when an UPDATE raises.
        with contextlib.closing(db.get_conn()) as conn, \
                contextlib.closing(conn.cursor()) as cur:
            cur.execute("""
                UPDATE task_executions
                SET status='success', ended_at=NOW(), duration_ms=%s,
                    cpu_usage_pct=%s, memory_usage_pct=%s
                WHERE id=%s
            """, (duration, cpu, mem, execution_id))
            cur.execute("UPDATE tasks SET last_run_at=NOW() WHERE id=%s", (task_id,))
            conn.commit()
    except Exception as e:
        print(f'[warn] simulate update failed: {e}')
|
|
|
|
|
|
|
|
|
def _save_job_id(task_id, job_id):
    """Store the OpenClaw job id in the task's ``workflow_json`` (best effort).

    The current ``workflow_json`` is read, ``openclaw_job_id`` is set on it
    and the JSON is written back. Content that cannot be parsed, or that is
    not a JSON object, is replaced by a fresh object. Any database failure is
    reported with a ``[warn]`` print and otherwise ignored.

    Args:
        task_id: id of the row in ``tasks``.
        job_id: job identifier to store under ``openclaw_job_id``.
    """
    try:
        # closing() releases cursors and connection even when a statement raises.
        with contextlib.closing(db.get_conn()) as conn:
            with contextlib.closing(conn.cursor(dictionary=True)) as cur:
                cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
                row = cur.fetchone()

            extra = {}
            if row and row['workflow_json']:
                try:
                    extra = json.loads(row['workflow_json'])
                except Exception:
                    # Unparseable JSON: start over from an empty object.
                    extra = {}
            if not isinstance(extra, dict):
                # A non-object value cannot carry the key; treat it like invalid JSON.
                extra = {}
            extra['openclaw_job_id'] = job_id

            with contextlib.closing(conn.cursor()) as write_cur:
                write_cur.execute("UPDATE tasks SET workflow_json=%s WHERE id=%s",
                                  (json.dumps(extra, ensure_ascii=False), task_id))
                conn.commit()
    except Exception as e:
        print(f'[warn] save_job_id failed: {e}')
|