You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

561 lines
20 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
OpenClaw 调度引擎接口模块
支持两种传输协议(通过环境变量 OPENCLAW_PROTOCOL 切换):
1. websocket默认JSON-RPC 2.0 over WebSocket
ws://127.0.0.1:18789/acp 或 ws://127.0.0.1:18789/
每次调用建立短连接,发送请求帧,读取响应帧后关闭。
2. httpJSON-RPC 2.0 over HTTP POST
POST http://127.0.0.1:18789/acp
Content-Type: application/json
支持的 method
jobs.register.run_once - 立即执行一次
jobs.register.cron - 注册定时调度
jobs.cancel - 取消/移除调度
jobs.get - 查询任务状态
skills.run - 直接调用 Skill不走调度器
"""
import os
import uuid
import json
import requests
import datetime
import db
# OpenClaw 服务地址(从环境变量读取,默认本地 18789
OPENCLAW_BASE = os.environ.get('OPENCLAW_URL', 'http://127.0.0.1:18789')
OPENCLAW_TIMEOUT = int(os.environ.get('OPENCLAW_TIMEOUT', '10'))
# 传输协议websocket默认 或 http
# 设置方式:环境变量 OPENCLAW_PROTOCOL=http 或 OPENCLAW_PROTOCOL=websocket
OPENCLAW_PROTOCOL = os.environ.get('OPENCLAW_PROTOCOL', 'websocket').lower()
# WebSocket 端点路径(部分 OpenClaw 版本路径不同,可通过环境变量覆盖)
OPENCLAW_WS_PATH = os.environ.get('OPENCLAW_WS_PATH', '/acp')
# 灵枢回调地址OpenClaw 执行完成后 POST 到这里)
LINGSHU_CALLBACK_BASE = os.environ.get('LINGSHU_CALLBACK_URL', 'http://localhost:5000')
# 根据 OPENCLAW_BASE 自动推导 WebSocket 地址
# http://host:port → ws://host:port
# https://host:port → wss://host:port
def _ws_url():
base = OPENCLAW_BASE.rstrip('/')
if base.startswith('https://'):
return 'wss://' + base[8:] + OPENCLAW_WS_PATH
return 'ws://' + base.replace('http://', '') + OPENCLAW_WS_PATH
ACP_ENDPOINT = f'{OPENCLAW_BASE}/acp'
def _headers():
token = os.environ.get('OPENCLAW_TOKEN', '')
h = {'Content-Type': 'application/json'}
if token:
h['Authorization'] = f'Bearer {token}'
return h
def _acp_call_http(method: str, params: dict) -> dict:
"""JSON-RPC 2.0 over HTTP POST"""
req_id = str(uuid.uuid4())
payload = {'jsonrpc': '2.0', 'id': req_id, 'method': method, 'params': params}
resp = requests.post(
ACP_ENDPOINT,
json=payload,
headers=_headers(),
timeout=OPENCLAW_TIMEOUT
)
resp.raise_for_status()
body = resp.json()
if 'error' in body:
err = body['error']
raise RuntimeError(f"ACP error {err.get('code')}: {err.get('message')}")
return body.get('result', {})
def _ws_parse(raw) -> dict:
"""解析 WebSocket 收到的帧,兼容文本帧和二进制帧"""
if isinstance(raw, bytes):
raw = raw.decode('utf-8', errors='replace')
return json.loads(raw)
def _build_device_identity():
"""
从 OpenClaw identity 目录读取设备身份device.json + device-auth.json
目录优先级:
1. 环境变量 OPENCLAW_IDENTITY_DIRstart_local.sh / start.sh 自动注入)
2. 默认路径 ~/.openclaw/identity
返回 None 如果不可用(回退到 gateway token 认证)。
"""
identity_dir = os.environ.get('OPENCLAW_IDENTITY_DIR', '').strip()
if not identity_dir:
identity_dir = os.path.join(os.path.expanduser('~'), '.openclaw', 'identity')
device_json_path = os.path.join(identity_dir, 'device.json')
device_auth_path = os.path.join(identity_dir, 'device-auth.json')
try:
with open(device_json_path) as f:
device_json = json.load(f)
with open(device_auth_path) as f:
device_auth = json.load(f)
return {
'device_id': device_json['deviceId'],
'public_key_pem': device_json.get('publicKeyPem', ''),
'private_key_pem': device_json.get('privateKeyPem', ''),
'device_token': device_auth['tokens']['operator']['token'],
'public_key_raw': None,
}
except Exception:
return None
def _read_operator_token() -> str:
"""
读取 OpenClaw operator token优先级
1. 环境变量 OPENCLAW_OPERATOR_TOKEN由 start.sh 启动时自动注入)
2. OPENCLAW_IDENTITY_DIR 或默认路径下的 device-auth.json
"""
env_token = os.environ.get('OPENCLAW_OPERATOR_TOKEN', '').strip()
if env_token:
return env_token
identity_dir = os.environ.get('OPENCLAW_IDENTITY_DIR', '').strip()
if not identity_dir:
identity_dir = os.path.join(os.path.expanduser('~'), '.openclaw', 'identity')
device_auth_path = os.path.join(identity_dir, 'device-auth.json')
try:
with open(device_auth_path) as f:
data = json.load(f)
return data['tokens']['operator']['token']
except Exception:
return ''
def _build_device_signature(device_identity, nonce: str, scopes: list,
client_id: str, client_mode: str, role: str) -> dict:
"""
构建 device 签名字段v3 格式),返回 connect 请求中的 device 对象。
payload = v3|deviceId|clientId|clientMode|role|scopes_csv|signedAtMs|operatorToken|nonce|platform|deviceFamily
auth.deviceToken 和 payload 里的 token 都使用 operator token两者必须一致。
"""
try:
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import (
load_pem_private_key, Encoding, PublicFormat
)
import base64, time
except ImportError:
return None
signed_at = int(time.time() * 1000)
device_token = device_identity['device_token'] # operator token
device_id = device_identity['device_id']
scopes_csv = ','.join(scopes)
platform = 'linux'
device_family = ''
payload = '|'.join([
'v3', device_id, client_id, client_mode, role,
scopes_csv, str(signed_at), device_token, nonce, platform, device_family
])
private_key = load_pem_private_key(
device_identity['private_key_pem'].encode(), password=None
)
sig_bytes = private_key.sign(payload.encode('utf-8'))
signature = base64.urlsafe_b64encode(sig_bytes).rstrip(b'=').decode()
# 提取裸公钥base64url无 padding
pub_raw = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
public_key_raw = base64.urlsafe_b64encode(pub_raw).rstrip(b'=').decode()
return {
'id': device_id,
'publicKey': public_key_raw,
'signature': signature,
'signedAt': signed_at,
'nonce': nonce,
}
def _acp_call_ws(method: str, params: dict) -> dict:
"""
OpenClaw ACP 协议 over WebSocket完整握手流程
1. 连接 ws://host:port/
2. 收 connect.challenge含 nonce
3. 发 connect 请求device 签名认证,获取 operator.admin scope
4. 收 res.ok=truehello-ok
5. 发业务 RPCtype=req
6. 收业务响应type=res
"""
try:
import websocket
except ImportError as e:
raise RuntimeError(f"缺少依赖库: {e},请执行: pip install websocket-client")
ws_addr = _ws_url()
ws = websocket.WebSocket(timeout=OPENCLAW_TIMEOUT)
# suppress_origin=True不发送 Origin 头,避免 OpenClaw 把连接识别为 Control UI 并拒绝非本机来源
ws.connect(ws_addr, suppress_origin=True)
try:
# ── 步骤1收 connect.challenge ──────────────────
raw = ws.recv()
challenge = _ws_parse(raw)
if challenge.get('type') != 'event' or challenge.get('event') != 'connect.challenge':
raise RuntimeError(f'预期 connect.challenge实际收到: {challenge}')
nonce = challenge.get('payload', {}).get('nonce', '')
# ── 步骤2构建认证参数 ────────────────────────────
client_id = 'cli'
client_mode = 'backend'
role = 'operator'
# 认证策略device 签名operator token与你本机完全一致
# device-auth.json 里的 operator token 同时用于 auth.deviceToken 和签名 payload
full_scopes = ['operator.read', 'operator.write', 'operator.admin',
'operator.approvals', 'operator.pairing']
device_identity = _build_device_identity()
device_obj = None
if device_identity:
device_obj = _build_device_signature(
device_identity, nonce, full_scopes, client_id, client_mode, role
)
if device_obj:
auth_obj = {'deviceToken': device_identity['device_token']}
scopes = full_scopes
else:
auth_obj = {}
scopes = full_scopes
# ── 步骤3发 connect 请求 ─────────────────────────
connect_id = str(uuid.uuid4())
connect_params = {
'minProtocol': 3,
'maxProtocol': 3,
'client': {
'id': client_id,
'version': '2026.3.24',
'platform': 'linux',
'mode': client_mode,
'instanceId': str(uuid.uuid4())
},
'role': role,
'scopes': scopes,
'caps': [],
'auth': auth_obj,
'userAgent': 'lingshu/1.0.0',
'locale': 'zh-CN'
}
if device_obj:
connect_params['device'] = device_obj
connect_req = {
'type': 'req',
'id': connect_id,
'method': 'connect',
'params': connect_params
}
ws.send(json.dumps(connect_req))
# ── 步骤3等待 connected 确认(过滤心跳)────────
for _ in range(5):
raw = ws.recv()
frame = _ws_parse(raw)
if frame.get('type') == 'ping':
ws.send(json.dumps({'type': 'pong'}))
continue
if frame.get('type') == 'res' and frame.get('ok'):
break
raise RuntimeError(f'认证失败: {frame}')
else:
raise RuntimeError('未收到认证成功响应connected')
# ── 步骤4发业务 RPC ─────────────────────────────
req_id = str(uuid.uuid4())
payload = json.dumps({
'type': 'req',
'id': req_id,
'method': method,
'params': params
})
ws.send(payload)
# ── 步骤5读取业务响应过滤心跳/通知)──────────
for _ in range(20):
raw = ws.recv()
frame = _ws_parse(raw)
if frame.get('type') == 'ping':
ws.send(json.dumps({'type': 'pong'}))
continue
if frame.get('id') != req_id:
continue
if not frame.get('ok'):
err = frame.get('error', {})
raise RuntimeError(f"ACP error {err.get('code')}: {err.get('message')}")
return frame.get('result', {})
raise RuntimeError('未在规定帧数内收到业务响应')
finally:
ws.close()
def _acp_call(method: str, params: dict) -> dict:
"""
统一入口:根据 OPENCLAW_PROTOCOL 选择 WebSocket 或 HTTP。
"""
if OPENCLAW_PROTOCOL == 'http':
return _acp_call_http(method, params)
return _acp_call_ws(method, params)
def _log_to_db(execution_id, task_id, level, message, node_name=None):
"""写一条执行日志到数据库"""
try:
conn = db.get_conn()
cur = conn.cursor()
cur.execute("""
INSERT INTO task_logs (execution_id, task_id, log_level, log_time, message, node_name)
VALUES (%s, %s, %s, %s, %s, %s)
""", (execution_id, task_id, level,
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
message, node_name))
conn.commit()
cur.close(); conn.close()
except Exception as e:
print(f'[warn] log_to_db failed: {e}')
def _build_cron_payload(task_name: str, skill_name: str, skill_params: dict) -> dict:
"""
构建 OpenClaw cron job payloadsessionTarget=main 要求 systemEvent 类型)。
text 字段包含任务描述,供 agent 识别。
"""
params_str = json.dumps(skill_params, ensure_ascii=False) if skill_params else '{}'
text = (
f'执行任务「{task_name}'
f' Skill={skill_name}'
f' 参数={params_str}'
)
return {
'kind': 'systemEvent',
'text': text,
}
def register_task(task_id: int) -> dict:
"""
启动任务:从数据库读取任务配置,向 OpenClaw 注册 Cron 调度cron.add
返回 {'ok': True/False, 'message': str, 'job_id': str}
"""
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT t.*, a.agent_code, a.api_endpoint AS agent_endpoint
FROM tasks t LEFT JOIN agents a ON t.agent_id = a.id
WHERE t.id = %s
""", (task_id,))
task = cur.fetchone()
cur.close(); conn.close()
if not task:
return {'ok': False, 'message': f'任务 {task_id} 不存在'}
try:
extra = json.loads(task['workflow_json']) if task['workflow_json'] else {}
except Exception:
extra = {}
skill_name = extra.get('skill_name', task['name'])
skill_params = extra.get('skill_params', {})
cron_expr = task.get('cron_expression') or ''
job_name = f'lingshu_task_{task_id}'
payload = _build_cron_payload(task['name'], skill_name, skill_params)
if cron_expr and task.get('trigger_type') == 'cron':
schedule = {'kind': 'cron', 'expr': cron_expr}
else:
# 无 cron 表达式立即执行一次5秒后
import time
at_iso = datetime.datetime.utcfromtimestamp(time.time() + 5).strftime('%Y-%m-%dT%H:%M:%SZ')
schedule = {'kind': 'at', 'at': at_iso}
cron_params = {
'name': job_name,
'enabled': True,
'schedule': schedule,
'sessionTarget': 'main',
'payload': payload,
}
try:
result = _acp_call('cron.add', cron_params)
job_id = result.get('id', job_name)
_save_job_id(task_id, job_id)
return {'ok': True, 'message': f'已注册到 OpenClawjob_id={job_id}', 'job_id': job_id}
except requests.exceptions.ConnectionError:
msg = f'OpenClaw 服务不可达({OPENCLAW_BASE}),任务已在灵枢侧标记运行'
print(f'[warn] {msg}')
return {'ok': True, 'message': msg, 'job_id': None, 'degraded': True}
except Exception as e:
return {'ok': False, 'message': f'注册失败: {str(e)}'}
def unregister_task(task_id: int) -> dict:
"""
停止任务:通知 OpenClaw 移除调度cron.remove
"""
# 尝试从数据库取已存的 job_id
try:
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
row = cur.fetchone()
cur.close(); conn.close()
extra = json.loads(row['workflow_json']) if row and row['workflow_json'] else {}
job_id = extra.get('openclaw_job_id', f'lingshu_task_{task_id}')
except Exception:
job_id = f'lingshu_task_{task_id}'
try:
_acp_call('cron.remove', {'id': job_id})
return {'ok': True, 'message': f'已从 OpenClaw 移除调度job_id={job_id}'}
except requests.exceptions.ConnectionError:
return {'ok': True, 'message': 'OpenClaw 不可达,已在灵枢侧标记为停止', 'degraded': True}
except RuntimeError as e:
if 'not found' in str(e).lower() or 'NOT_FOUND' in str(e):
return {'ok': True, 'message': '任务在 OpenClaw 中已不存在(可能已停止)'}
return {'ok': False, 'message': f'停止失败: {str(e)}'}
except Exception as e:
return {'ok': False, 'message': f'停止失败: {str(e)}'}
def trigger_once(task_id: int, execution_id: int) -> dict:
"""
手动触发一次:通过 cron.add一次性调度让 OpenClaw agent 立即执行任务。
"""
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT * FROM tasks WHERE id=%s", (task_id,))
task = cur.fetchone()
cur.close(); conn.close()
if not task:
return {'ok': False, 'message': f'任务 {task_id} 不存在'}
try:
extra = json.loads(task['workflow_json']) if task['workflow_json'] else {}
except Exception:
extra = {}
skill_name = extra.get('skill_name', task['name'])
skill_params = extra.get('skill_params', {})
if not skill_name and not task['name']:
_simulate_execution(task_id, execution_id, task['name'])
return {'ok': True, 'message': '模拟执行完成(未配置 Skill'}
_log_to_db(execution_id, task_id, 'info',
f'[灵枢] 向 OpenClaw 发送执行请求Skill={skill_name}method=cron.add(once)')
import time as _time
job_name = f'lingshu_task_{task_id}_exec_{execution_id}'
payload = _build_cron_payload(task['name'], skill_name, skill_params)
at_iso = datetime.datetime.utcfromtimestamp(_time.time() + 2).strftime('%Y-%m-%dT%H:%M:%SZ')
try:
result = _acp_call('cron.add', {
'name': job_name,
'enabled': True,
'schedule': {'kind': 'at', 'at': at_iso},
'sessionTarget': 'main',
'payload': payload,
})
returned_job_id = result.get('id', job_name)
_log_to_db(execution_id, task_id, 'info',
f'[OpenClaw] 已接收jobId={returned_job_id},异步执行中')
return {'ok': True, 'message': f'OpenClaw 已接收jobId={returned_job_id},异步执行中'}
except requests.exceptions.ConnectionError:
_log_to_db(execution_id, task_id, 'warn', '[降级] OpenClaw 不可达,切换为本地模拟执行')
_simulate_execution(task_id, execution_id, task['name'], skill_name, skill_params)
return {'ok': True, 'message': '降级执行OpenClaw 不可达,本地模拟)'}
except Exception as e:
_log_to_db(execution_id, task_id, 'error', f'[错误] 触发失败: {str(e)}')
return {'ok': False, 'message': str(e)}
def _simulate_execution(task_id, execution_id, task_name, skill_name=None, skill_params=None):
"""
OpenClaw 不可达时的降级模拟执行,写入日志并更新执行记录。
"""
import time, random
start = datetime.datetime.now()
_log_to_db(execution_id, task_id, 'info', f'[灵枢] 任务「{task_name}」开始执行')
if skill_name:
_log_to_db(execution_id, task_id, 'info', f'[Skill] 调用: {skill_name}')
_log_to_db(execution_id, task_id, 'info', f'[Skill] 参数: {json.dumps(skill_params or {}, ensure_ascii=False)}')
time.sleep(0.3)
_log_to_db(execution_id, task_id, 'success', f'[灵枢] 任务执行完成(模拟)')
duration = int((datetime.datetime.now() - start).total_seconds() * 1000)
cpu = round(random.uniform(10, 60), 1)
mem = round(random.uniform(20, 50), 1)
try:
conn = db.get_conn()
cur = conn.cursor()
cur.execute("""
UPDATE task_executions
SET status='success', ended_at=NOW(), duration_ms=%s,
cpu_usage_pct=%s, memory_usage_pct=%s
WHERE id=%s
""", (duration, cpu, mem, execution_id))
cur.execute("UPDATE tasks SET last_run_at=NOW() WHERE id=%s", (task_id,))
conn.commit()
cur.close(); conn.close()
except Exception as e:
print(f'[warn] simulate update failed: {e}')
def _save_job_id(task_id, job_id):
"""将 OpenClaw job_id 存入 workflow_json"""
try:
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
row = cur.fetchone()
extra = {}
if row and row['workflow_json']:
try:
extra = json.loads(row['workflow_json'])
except Exception:
pass
extra['openclaw_job_id'] = job_id
cur2 = conn.cursor()
cur2.execute("UPDATE tasks SET workflow_json=%s WHERE id=%s",
(json.dumps(extra, ensure_ascii=False), task_id))
conn.commit()
cur.close(); cur2.close(); conn.close()
except Exception as e:
print(f'[warn] save_job_id failed: {e}')