|
|
"""
|
|
|
OpenClaw 调度引擎接口模块
|
|
|
支持两种传输协议(通过环境变量 OPENCLAW_PROTOCOL 切换):
|
|
|
|
|
|
1. websocket(默认):JSON-RPC 2.0 over WebSocket
|
|
|
ws://127.0.0.1:18789/acp 或 ws://127.0.0.1:18789/
|
|
|
每次调用建立短连接,发送请求帧,读取响应帧后关闭。
|
|
|
|
|
|
2. http:JSON-RPC 2.0 over HTTP POST
|
|
|
POST http://127.0.0.1:18789/acp
|
|
|
Content-Type: application/json
|
|
|
|
|
|
支持的 method:
|
|
|
jobs.register.run_once - 立即执行一次
|
|
|
jobs.register.cron - 注册定时调度
|
|
|
jobs.cancel - 取消/移除调度
|
|
|
jobs.get - 查询任务状态
|
|
|
skills.run - 直接调用 Skill(不走调度器)
|
|
|
"""
|
|
|
import os
|
|
|
import uuid
|
|
|
import json
|
|
|
import requests
|
|
|
import datetime
|
|
|
import db
|
|
|
|
|
|
# OpenClaw 服务地址(从环境变量读取,默认本地 18789)
|
|
|
OPENCLAW_BASE = os.environ.get('OPENCLAW_URL', 'http://127.0.0.1:18789')
|
|
|
OPENCLAW_TIMEOUT = int(os.environ.get('OPENCLAW_TIMEOUT', '10'))
|
|
|
|
|
|
# 传输协议:websocket(默认) 或 http
|
|
|
# 设置方式:环境变量 OPENCLAW_PROTOCOL=http 或 OPENCLAW_PROTOCOL=websocket
|
|
|
OPENCLAW_PROTOCOL = os.environ.get('OPENCLAW_PROTOCOL', 'websocket').lower()
|
|
|
|
|
|
# WebSocket 端点路径(部分 OpenClaw 版本路径不同,可通过环境变量覆盖)
|
|
|
OPENCLAW_WS_PATH = os.environ.get('OPENCLAW_WS_PATH', '/acp')
|
|
|
|
|
|
# 灵枢回调地址(OpenClaw 执行完成后 POST 到这里)
|
|
|
LINGSHU_CALLBACK_BASE = os.environ.get('LINGSHU_CALLBACK_URL', 'http://localhost:5000')
|
|
|
|
|
|
# 根据 OPENCLAW_BASE 自动推导 WebSocket 地址
|
|
|
# http://host:port → ws://host:port
|
|
|
# https://host:port → wss://host:port
|
|
|
def _ws_url():
|
|
|
base = OPENCLAW_BASE.rstrip('/')
|
|
|
if base.startswith('https://'):
|
|
|
return 'wss://' + base[8:] + OPENCLAW_WS_PATH
|
|
|
return 'ws://' + base.replace('http://', '') + OPENCLAW_WS_PATH
|
|
|
|
|
|
ACP_ENDPOINT = f'{OPENCLAW_BASE}/acp'
|
|
|
|
|
|
|
|
|
def _headers():
|
|
|
token = os.environ.get('OPENCLAW_TOKEN', '')
|
|
|
h = {'Content-Type': 'application/json'}
|
|
|
if token:
|
|
|
h['Authorization'] = f'Bearer {token}'
|
|
|
return h
|
|
|
|
|
|
|
|
|
def _acp_call_http(method: str, params: dict) -> dict:
|
|
|
"""JSON-RPC 2.0 over HTTP POST"""
|
|
|
req_id = str(uuid.uuid4())
|
|
|
payload = {'jsonrpc': '2.0', 'id': req_id, 'method': method, 'params': params}
|
|
|
resp = requests.post(
|
|
|
ACP_ENDPOINT,
|
|
|
json=payload,
|
|
|
headers=_headers(),
|
|
|
timeout=OPENCLAW_TIMEOUT
|
|
|
)
|
|
|
resp.raise_for_status()
|
|
|
body = resp.json()
|
|
|
if 'error' in body:
|
|
|
err = body['error']
|
|
|
raise RuntimeError(f"ACP error {err.get('code')}: {err.get('message')}")
|
|
|
return body.get('result', {})
|
|
|
|
|
|
|
|
|
def _ws_parse(raw) -> dict:
|
|
|
"""解析 WebSocket 收到的帧,兼容文本帧和二进制帧"""
|
|
|
if isinstance(raw, bytes):
|
|
|
raw = raw.decode('utf-8', errors='replace')
|
|
|
return json.loads(raw)
|
|
|
|
|
|
|
|
|
def _build_device_identity():
|
|
|
"""
|
|
|
从 OpenClaw identity 目录读取设备身份(device.json + device-auth.json)。
|
|
|
目录优先级:
|
|
|
1. 环境变量 OPENCLAW_IDENTITY_DIR(start_local.sh / start.sh 自动注入)
|
|
|
2. 默认路径 ~/.openclaw/identity
|
|
|
返回 None 如果不可用(回退到 gateway token 认证)。
|
|
|
"""
|
|
|
identity_dir = os.environ.get('OPENCLAW_IDENTITY_DIR', '').strip()
|
|
|
if not identity_dir:
|
|
|
identity_dir = os.path.join(os.path.expanduser('~'), '.openclaw', 'identity')
|
|
|
device_json_path = os.path.join(identity_dir, 'device.json')
|
|
|
device_auth_path = os.path.join(identity_dir, 'device-auth.json')
|
|
|
try:
|
|
|
with open(device_json_path) as f:
|
|
|
device_json = json.load(f)
|
|
|
with open(device_auth_path) as f:
|
|
|
device_auth = json.load(f)
|
|
|
return {
|
|
|
'device_id': device_json['deviceId'],
|
|
|
'public_key_pem': device_json.get('publicKeyPem', ''),
|
|
|
'private_key_pem': device_json.get('privateKeyPem', ''),
|
|
|
'device_token': device_auth['tokens']['operator']['token'],
|
|
|
'public_key_raw': None,
|
|
|
}
|
|
|
except Exception:
|
|
|
return None
|
|
|
|
|
|
|
|
|
def _read_operator_token() -> str:
|
|
|
"""
|
|
|
读取 OpenClaw operator token,优先级:
|
|
|
1. 环境变量 OPENCLAW_OPERATOR_TOKEN(由 start.sh 启动时自动注入)
|
|
|
2. OPENCLAW_IDENTITY_DIR 或默认路径下的 device-auth.json
|
|
|
"""
|
|
|
env_token = os.environ.get('OPENCLAW_OPERATOR_TOKEN', '').strip()
|
|
|
if env_token:
|
|
|
return env_token
|
|
|
|
|
|
identity_dir = os.environ.get('OPENCLAW_IDENTITY_DIR', '').strip()
|
|
|
if not identity_dir:
|
|
|
identity_dir = os.path.join(os.path.expanduser('~'), '.openclaw', 'identity')
|
|
|
device_auth_path = os.path.join(identity_dir, 'device-auth.json')
|
|
|
try:
|
|
|
with open(device_auth_path) as f:
|
|
|
data = json.load(f)
|
|
|
return data['tokens']['operator']['token']
|
|
|
except Exception:
|
|
|
return ''
|
|
|
|
|
|
|
|
|
def _build_device_signature(device_identity, nonce: str, scopes: list,
|
|
|
client_id: str, client_mode: str, role: str) -> dict:
|
|
|
"""
|
|
|
构建 device 签名字段(v3 格式),返回 connect 请求中的 device 对象。
|
|
|
payload = v3|deviceId|clientId|clientMode|role|scopes_csv|signedAtMs|operatorToken|nonce|platform|deviceFamily
|
|
|
auth.deviceToken 和 payload 里的 token 都使用 operator token,两者必须一致。
|
|
|
"""
|
|
|
try:
|
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
|
|
|
from cryptography.hazmat.primitives.serialization import (
|
|
|
load_pem_private_key, Encoding, PublicFormat
|
|
|
)
|
|
|
import base64, time
|
|
|
except ImportError:
|
|
|
return None
|
|
|
|
|
|
signed_at = int(time.time() * 1000)
|
|
|
device_token = device_identity['device_token'] # operator token
|
|
|
device_id = device_identity['device_id']
|
|
|
scopes_csv = ','.join(scopes)
|
|
|
platform = 'linux'
|
|
|
device_family = ''
|
|
|
|
|
|
payload = '|'.join([
|
|
|
'v3', device_id, client_id, client_mode, role,
|
|
|
scopes_csv, str(signed_at), device_token, nonce, platform, device_family
|
|
|
])
|
|
|
|
|
|
private_key = load_pem_private_key(
|
|
|
device_identity['private_key_pem'].encode(), password=None
|
|
|
)
|
|
|
sig_bytes = private_key.sign(payload.encode('utf-8'))
|
|
|
signature = base64.urlsafe_b64encode(sig_bytes).rstrip(b'=').decode()
|
|
|
|
|
|
# 提取裸公钥(base64url,无 padding)
|
|
|
pub_raw = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
|
|
|
public_key_raw = base64.urlsafe_b64encode(pub_raw).rstrip(b'=').decode()
|
|
|
|
|
|
return {
|
|
|
'id': device_id,
|
|
|
'publicKey': public_key_raw,
|
|
|
'signature': signature,
|
|
|
'signedAt': signed_at,
|
|
|
'nonce': nonce,
|
|
|
}
|
|
|
|
|
|
|
|
|
def _acp_call_ws(method: str, params: dict) -> dict:
|
|
|
"""
|
|
|
OpenClaw ACP 协议 over WebSocket,完整握手流程:
|
|
|
1. 连接 ws://host:port/
|
|
|
2. 收 connect.challenge(含 nonce)
|
|
|
3. 发 connect 请求(device 签名认证,获取 operator.admin scope)
|
|
|
4. 收 res.ok=true(hello-ok)
|
|
|
5. 发业务 RPC(type=req)
|
|
|
6. 收业务响应(type=res)
|
|
|
"""
|
|
|
try:
|
|
|
import websocket
|
|
|
except ImportError as e:
|
|
|
raise RuntimeError(f"缺少依赖库: {e},请执行: pip install websocket-client")
|
|
|
|
|
|
ws_addr = _ws_url()
|
|
|
|
|
|
ws = websocket.WebSocket(timeout=OPENCLAW_TIMEOUT)
|
|
|
# suppress_origin=True:不发送 Origin 头,避免 OpenClaw 把连接识别为 Control UI 并拒绝非本机来源
|
|
|
ws.connect(ws_addr, suppress_origin=True)
|
|
|
|
|
|
try:
|
|
|
# ── 步骤1:收 connect.challenge ──────────────────
|
|
|
raw = ws.recv()
|
|
|
challenge = _ws_parse(raw)
|
|
|
if challenge.get('type') != 'event' or challenge.get('event') != 'connect.challenge':
|
|
|
raise RuntimeError(f'预期 connect.challenge,实际收到: {challenge}')
|
|
|
nonce = challenge.get('payload', {}).get('nonce', '')
|
|
|
|
|
|
# ── 步骤2:构建认证参数 ────────────────────────────
|
|
|
client_id = 'cli'
|
|
|
client_mode = 'backend'
|
|
|
role = 'operator'
|
|
|
|
|
|
# 认证策略:device 签名(operator token),与你本机完全一致
|
|
|
# device-auth.json 里的 operator token 同时用于 auth.deviceToken 和签名 payload
|
|
|
full_scopes = ['operator.read', 'operator.write', 'operator.admin',
|
|
|
'operator.approvals', 'operator.pairing']
|
|
|
|
|
|
device_identity = _build_device_identity()
|
|
|
device_obj = None
|
|
|
if device_identity:
|
|
|
device_obj = _build_device_signature(
|
|
|
device_identity, nonce, full_scopes, client_id, client_mode, role
|
|
|
)
|
|
|
|
|
|
if device_obj:
|
|
|
auth_obj = {'deviceToken': device_identity['device_token']}
|
|
|
scopes = full_scopes
|
|
|
else:
|
|
|
auth_obj = {}
|
|
|
scopes = full_scopes
|
|
|
|
|
|
# ── 步骤3:发 connect 请求 ─────────────────────────
|
|
|
connect_id = str(uuid.uuid4())
|
|
|
connect_params = {
|
|
|
'minProtocol': 3,
|
|
|
'maxProtocol': 3,
|
|
|
'client': {
|
|
|
'id': client_id,
|
|
|
'version': '2026.3.24',
|
|
|
'platform': 'linux',
|
|
|
'mode': client_mode,
|
|
|
'instanceId': str(uuid.uuid4())
|
|
|
},
|
|
|
'role': role,
|
|
|
'scopes': scopes,
|
|
|
'caps': [],
|
|
|
'auth': auth_obj,
|
|
|
'userAgent': 'lingshu/1.0.0',
|
|
|
'locale': 'zh-CN'
|
|
|
}
|
|
|
if device_obj:
|
|
|
connect_params['device'] = device_obj
|
|
|
|
|
|
connect_req = {
|
|
|
'type': 'req',
|
|
|
'id': connect_id,
|
|
|
'method': 'connect',
|
|
|
'params': connect_params
|
|
|
}
|
|
|
ws.send(json.dumps(connect_req))
|
|
|
|
|
|
# ── 步骤3:等待 connected 确认(过滤心跳)────────
|
|
|
for _ in range(5):
|
|
|
raw = ws.recv()
|
|
|
frame = _ws_parse(raw)
|
|
|
if frame.get('type') == 'ping':
|
|
|
ws.send(json.dumps({'type': 'pong'}))
|
|
|
continue
|
|
|
if frame.get('type') == 'res' and frame.get('ok'):
|
|
|
break
|
|
|
raise RuntimeError(f'认证失败: {frame}')
|
|
|
else:
|
|
|
raise RuntimeError('未收到认证成功响应(connected)')
|
|
|
|
|
|
# ── 步骤4:发业务 RPC ─────────────────────────────
|
|
|
req_id = str(uuid.uuid4())
|
|
|
payload = json.dumps({
|
|
|
'type': 'req',
|
|
|
'id': req_id,
|
|
|
'method': method,
|
|
|
'params': params
|
|
|
})
|
|
|
ws.send(payload)
|
|
|
|
|
|
# ── 步骤5:读取业务响应(过滤心跳/通知)──────────
|
|
|
for _ in range(20):
|
|
|
raw = ws.recv()
|
|
|
frame = _ws_parse(raw)
|
|
|
if frame.get('type') == 'ping':
|
|
|
ws.send(json.dumps({'type': 'pong'}))
|
|
|
continue
|
|
|
if frame.get('id') != req_id:
|
|
|
continue
|
|
|
if not frame.get('ok'):
|
|
|
err = frame.get('error', {})
|
|
|
raise RuntimeError(f"ACP error {err.get('code')}: {err.get('message')}")
|
|
|
return frame.get('result', {})
|
|
|
|
|
|
raise RuntimeError('未在规定帧数内收到业务响应')
|
|
|
|
|
|
finally:
|
|
|
ws.close()
|
|
|
|
|
|
|
|
|
def _acp_call(method: str, params: dict) -> dict:
|
|
|
"""
|
|
|
统一入口:根据 OPENCLAW_PROTOCOL 选择 WebSocket 或 HTTP。
|
|
|
"""
|
|
|
if OPENCLAW_PROTOCOL == 'http':
|
|
|
return _acp_call_http(method, params)
|
|
|
return _acp_call_ws(method, params)
|
|
|
|
|
|
|
|
|
def _log_to_db(execution_id, task_id, level, message, node_name=None):
|
|
|
"""写一条执行日志到数据库"""
|
|
|
try:
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("""
|
|
|
INSERT INTO task_logs (execution_id, task_id, log_level, log_time, message, node_name)
|
|
|
VALUES (%s, %s, %s, %s, %s, %s)
|
|
|
""", (execution_id, task_id, level,
|
|
|
datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3],
|
|
|
message, node_name))
|
|
|
conn.commit()
|
|
|
cur.close(); conn.close()
|
|
|
except Exception as e:
|
|
|
print(f'[warn] log_to_db failed: {e}')
|
|
|
|
|
|
|
|
|
def _build_cron_payload(task_name: str, skill_name: str, skill_params: dict) -> dict:
|
|
|
"""
|
|
|
构建 OpenClaw cron job payload(sessionTarget=main 要求 systemEvent 类型)。
|
|
|
text 字段包含任务描述,供 agent 识别。
|
|
|
"""
|
|
|
params_str = json.dumps(skill_params, ensure_ascii=False) if skill_params else '{}'
|
|
|
text = (
|
|
|
f'执行任务「{task_name}」'
|
|
|
f' Skill={skill_name}'
|
|
|
f' 参数={params_str}'
|
|
|
)
|
|
|
return {
|
|
|
'kind': 'systemEvent',
|
|
|
'text': text,
|
|
|
}
|
|
|
|
|
|
|
|
|
def register_task(task_id: int) -> dict:
|
|
|
"""
|
|
|
启动任务:从数据库读取任务配置,向 OpenClaw 注册 Cron 调度(cron.add)。
|
|
|
返回 {'ok': True/False, 'message': str, 'job_id': str}
|
|
|
"""
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT t.*, a.agent_code, a.api_endpoint AS agent_endpoint
|
|
|
FROM tasks t LEFT JOIN agents a ON t.agent_id = a.id
|
|
|
WHERE t.id = %s
|
|
|
""", (task_id,))
|
|
|
task = cur.fetchone()
|
|
|
cur.close(); conn.close()
|
|
|
|
|
|
if not task:
|
|
|
return {'ok': False, 'message': f'任务 {task_id} 不存在'}
|
|
|
|
|
|
try:
|
|
|
extra = json.loads(task['workflow_json']) if task['workflow_json'] else {}
|
|
|
except Exception:
|
|
|
extra = {}
|
|
|
|
|
|
skill_name = extra.get('skill_name', task['name'])
|
|
|
skill_params = extra.get('skill_params', {})
|
|
|
cron_expr = task.get('cron_expression') or ''
|
|
|
|
|
|
job_name = f'lingshu_task_{task_id}'
|
|
|
payload = _build_cron_payload(task['name'], skill_name, skill_params)
|
|
|
|
|
|
if cron_expr and task.get('trigger_type') == 'cron':
|
|
|
schedule = {'kind': 'cron', 'expr': cron_expr}
|
|
|
else:
|
|
|
# 无 cron 表达式:立即执行一次(5秒后)
|
|
|
import time
|
|
|
at_iso = datetime.datetime.utcfromtimestamp(time.time() + 5).strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
|
schedule = {'kind': 'at', 'at': at_iso}
|
|
|
|
|
|
cron_params = {
|
|
|
'name': job_name,
|
|
|
'enabled': True,
|
|
|
'schedule': schedule,
|
|
|
'sessionTarget': 'main',
|
|
|
'payload': payload,
|
|
|
}
|
|
|
|
|
|
try:
|
|
|
result = _acp_call('cron.add', cron_params)
|
|
|
job_id = result.get('id', job_name)
|
|
|
_save_job_id(task_id, job_id)
|
|
|
return {'ok': True, 'message': f'已注册到 OpenClaw,job_id={job_id}', 'job_id': job_id}
|
|
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
|
msg = f'OpenClaw 服务不可达({OPENCLAW_BASE}),任务已在灵枢侧标记运行'
|
|
|
print(f'[warn] {msg}')
|
|
|
return {'ok': True, 'message': msg, 'job_id': None, 'degraded': True}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'ok': False, 'message': f'注册失败: {str(e)}'}
|
|
|
|
|
|
|
|
|
def unregister_task(task_id: int) -> dict:
|
|
|
"""
|
|
|
停止任务:通知 OpenClaw 移除调度(cron.remove)。
|
|
|
"""
|
|
|
# 尝试从数据库取已存的 job_id
|
|
|
try:
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
|
|
|
row = cur.fetchone()
|
|
|
cur.close(); conn.close()
|
|
|
extra = json.loads(row['workflow_json']) if row and row['workflow_json'] else {}
|
|
|
job_id = extra.get('openclaw_job_id', f'lingshu_task_{task_id}')
|
|
|
except Exception:
|
|
|
job_id = f'lingshu_task_{task_id}'
|
|
|
|
|
|
try:
|
|
|
_acp_call('cron.remove', {'id': job_id})
|
|
|
return {'ok': True, 'message': f'已从 OpenClaw 移除调度,job_id={job_id}'}
|
|
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
|
return {'ok': True, 'message': 'OpenClaw 不可达,已在灵枢侧标记为停止', 'degraded': True}
|
|
|
|
|
|
except RuntimeError as e:
|
|
|
if 'not found' in str(e).lower() or 'NOT_FOUND' in str(e):
|
|
|
return {'ok': True, 'message': '任务在 OpenClaw 中已不存在(可能已停止)'}
|
|
|
return {'ok': False, 'message': f'停止失败: {str(e)}'}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {'ok': False, 'message': f'停止失败: {str(e)}'}
|
|
|
|
|
|
|
|
|
def trigger_once(task_id: int, execution_id: int) -> dict:
|
|
|
"""
|
|
|
手动触发一次:通过 cron.add(一次性调度)让 OpenClaw agent 立即执行任务。
|
|
|
"""
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT * FROM tasks WHERE id=%s", (task_id,))
|
|
|
task = cur.fetchone()
|
|
|
cur.close(); conn.close()
|
|
|
|
|
|
if not task:
|
|
|
return {'ok': False, 'message': f'任务 {task_id} 不存在'}
|
|
|
|
|
|
try:
|
|
|
extra = json.loads(task['workflow_json']) if task['workflow_json'] else {}
|
|
|
except Exception:
|
|
|
extra = {}
|
|
|
|
|
|
skill_name = extra.get('skill_name', task['name'])
|
|
|
skill_params = extra.get('skill_params', {})
|
|
|
|
|
|
if not skill_name and not task['name']:
|
|
|
_simulate_execution(task_id, execution_id, task['name'])
|
|
|
return {'ok': True, 'message': '模拟执行完成(未配置 Skill)'}
|
|
|
|
|
|
_log_to_db(execution_id, task_id, 'info',
|
|
|
f'[灵枢] 向 OpenClaw 发送执行请求,Skill={skill_name},method=cron.add(once)')
|
|
|
|
|
|
import time as _time
|
|
|
job_name = f'lingshu_task_{task_id}_exec_{execution_id}'
|
|
|
payload = _build_cron_payload(task['name'], skill_name, skill_params)
|
|
|
at_iso = datetime.datetime.utcfromtimestamp(_time.time() + 2).strftime('%Y-%m-%dT%H:%M:%SZ')
|
|
|
|
|
|
try:
|
|
|
result = _acp_call('cron.add', {
|
|
|
'name': job_name,
|
|
|
'enabled': True,
|
|
|
'schedule': {'kind': 'at', 'at': at_iso},
|
|
|
'sessionTarget': 'main',
|
|
|
'payload': payload,
|
|
|
})
|
|
|
returned_job_id = result.get('id', job_name)
|
|
|
_log_to_db(execution_id, task_id, 'info',
|
|
|
f'[OpenClaw] 已接收,jobId={returned_job_id},异步执行中')
|
|
|
return {'ok': True, 'message': f'OpenClaw 已接收,jobId={returned_job_id},异步执行中'}
|
|
|
|
|
|
except requests.exceptions.ConnectionError:
|
|
|
_log_to_db(execution_id, task_id, 'warn', '[降级] OpenClaw 不可达,切换为本地模拟执行')
|
|
|
_simulate_execution(task_id, execution_id, task['name'], skill_name, skill_params)
|
|
|
return {'ok': True, 'message': '降级执行(OpenClaw 不可达,本地模拟)'}
|
|
|
|
|
|
except Exception as e:
|
|
|
_log_to_db(execution_id, task_id, 'error', f'[错误] 触发失败: {str(e)}')
|
|
|
return {'ok': False, 'message': str(e)}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _simulate_execution(task_id, execution_id, task_name, skill_name=None, skill_params=None):
|
|
|
"""
|
|
|
OpenClaw 不可达时的降级模拟执行,写入日志并更新执行记录。
|
|
|
"""
|
|
|
import time, random
|
|
|
start = datetime.datetime.now()
|
|
|
|
|
|
_log_to_db(execution_id, task_id, 'info', f'[灵枢] 任务「{task_name}」开始执行')
|
|
|
if skill_name:
|
|
|
_log_to_db(execution_id, task_id, 'info', f'[Skill] 调用: {skill_name}')
|
|
|
_log_to_db(execution_id, task_id, 'info', f'[Skill] 参数: {json.dumps(skill_params or {}, ensure_ascii=False)}')
|
|
|
time.sleep(0.3)
|
|
|
_log_to_db(execution_id, task_id, 'success', f'[灵枢] 任务执行完成(模拟)')
|
|
|
|
|
|
duration = int((datetime.datetime.now() - start).total_seconds() * 1000)
|
|
|
cpu = round(random.uniform(10, 60), 1)
|
|
|
mem = round(random.uniform(20, 50), 1)
|
|
|
|
|
|
try:
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("""
|
|
|
UPDATE task_executions
|
|
|
SET status='success', ended_at=NOW(), duration_ms=%s,
|
|
|
cpu_usage_pct=%s, memory_usage_pct=%s
|
|
|
WHERE id=%s
|
|
|
""", (duration, cpu, mem, execution_id))
|
|
|
cur.execute("UPDATE tasks SET last_run_at=NOW() WHERE id=%s", (task_id,))
|
|
|
conn.commit()
|
|
|
cur.close(); conn.close()
|
|
|
except Exception as e:
|
|
|
print(f'[warn] simulate update failed: {e}')
|
|
|
|
|
|
|
|
|
def _save_job_id(task_id, job_id):
|
|
|
"""将 OpenClaw job_id 存入 workflow_json"""
|
|
|
try:
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
|
|
|
row = cur.fetchone()
|
|
|
extra = {}
|
|
|
if row and row['workflow_json']:
|
|
|
try:
|
|
|
extra = json.loads(row['workflow_json'])
|
|
|
except Exception:
|
|
|
pass
|
|
|
extra['openclaw_job_id'] = job_id
|
|
|
cur2 = conn.cursor()
|
|
|
cur2.execute("UPDATE tasks SET workflow_json=%s WHERE id=%s",
|
|
|
(json.dumps(extra, ensure_ascii=False), task_id))
|
|
|
conn.commit()
|
|
|
cur.close(); cur2.close(); conn.close()
|
|
|
except Exception as e:
|
|
|
print(f'[warn] save_job_id failed: {e}')
|