""" OpenClaw 调度引擎接口模块 支持两种传输协议(通过环境变量 OPENCLAW_PROTOCOL 切换): 1. websocket(默认):JSON-RPC 2.0 over WebSocket ws://127.0.0.1:18789/acp 或 ws://127.0.0.1:18789/ 每次调用建立短连接,发送请求帧,读取响应帧后关闭。 2. http:JSON-RPC 2.0 over HTTP POST POST http://127.0.0.1:18789/acp Content-Type: application/json 支持的 method: jobs.register.run_once - 立即执行一次 jobs.register.cron - 注册定时调度 jobs.cancel - 取消/移除调度 jobs.get - 查询任务状态 skills.run - 直接调用 Skill(不走调度器) """ import os import uuid import json import requests import datetime import db # OpenClaw 服务地址(从环境变量读取,默认本地 18789) OPENCLAW_BASE = os.environ.get('OPENCLAW_URL', 'http://127.0.0.1:18789') OPENCLAW_TIMEOUT = int(os.environ.get('OPENCLAW_TIMEOUT', '10')) # 传输协议:websocket(默认) 或 http # 设置方式:环境变量 OPENCLAW_PROTOCOL=http 或 OPENCLAW_PROTOCOL=websocket OPENCLAW_PROTOCOL = os.environ.get('OPENCLAW_PROTOCOL', 'websocket').lower() # WebSocket 端点路径(部分 OpenClaw 版本路径不同,可通过环境变量覆盖) OPENCLAW_WS_PATH = os.environ.get('OPENCLAW_WS_PATH', '/acp') # 灵枢回调地址(OpenClaw 执行完成后 POST 到这里) LINGSHU_CALLBACK_BASE = os.environ.get('LINGSHU_CALLBACK_URL', 'http://localhost:5000') # 根据 OPENCLAW_BASE 自动推导 WebSocket 地址 # http://host:port → ws://host:port # https://host:port → wss://host:port def _ws_url(): base = OPENCLAW_BASE.rstrip('/') if base.startswith('https://'): return 'wss://' + base[8:] + OPENCLAW_WS_PATH return 'ws://' + base.replace('http://', '') + OPENCLAW_WS_PATH ACP_ENDPOINT = f'{OPENCLAW_BASE}/acp' def _headers(): token = os.environ.get('OPENCLAW_TOKEN', '') h = {'Content-Type': 'application/json'} if token: h['Authorization'] = f'Bearer {token}' return h def _acp_call_http(method: str, params: dict) -> dict: """JSON-RPC 2.0 over HTTP POST""" req_id = str(uuid.uuid4()) payload = {'jsonrpc': '2.0', 'id': req_id, 'method': method, 'params': params} resp = requests.post( ACP_ENDPOINT, json=payload, headers=_headers(), timeout=OPENCLAW_TIMEOUT ) resp.raise_for_status() body = resp.json() if 'error' in body: err = body['error'] raise RuntimeError(f"ACP error {err.get('code')}: {err.get('message')}") return body.get('result', {}) def _ws_parse(raw) -> dict: """解析 WebSocket 收到的帧,兼容文本帧和二进制帧""" if isinstance(raw, bytes): raw = raw.decode('utf-8', errors='replace') return json.loads(raw) def _build_device_identity(): """ 从 OpenClaw identity 目录读取设备身份(device.json + device-auth.json)。 目录优先级: 1. 环境变量 OPENCLAW_IDENTITY_DIR(start_local.sh / start.sh 自动注入) 2. 默认路径 ~/.openclaw/identity 返回 None 如果不可用(回退到 gateway token 认证)。 """ identity_dir = os.environ.get('OPENCLAW_IDENTITY_DIR', '').strip() if not identity_dir: identity_dir = os.path.join(os.path.expanduser('~'), '.openclaw', 'identity') device_json_path = os.path.join(identity_dir, 'device.json') device_auth_path = os.path.join(identity_dir, 'device-auth.json') try: with open(device_json_path) as f: device_json = json.load(f) with open(device_auth_path) as f: device_auth = json.load(f) return { 'device_id': device_json['deviceId'], 'public_key_pem': device_json.get('publicKeyPem', ''), 'private_key_pem': device_json.get('privateKeyPem', ''), 'device_token': device_auth['tokens']['operator']['token'], 'public_key_raw': None, } except Exception: return None def _read_operator_token() -> str: """ 读取 OpenClaw operator token,优先级: 1. 环境变量 OPENCLAW_OPERATOR_TOKEN(由 start.sh 启动时自动注入) 2. OPENCLAW_IDENTITY_DIR 或默认路径下的 device-auth.json """ env_token = os.environ.get('OPENCLAW_OPERATOR_TOKEN', '').strip() if env_token: return env_token identity_dir = os.environ.get('OPENCLAW_IDENTITY_DIR', '').strip() if not identity_dir: identity_dir = os.path.join(os.path.expanduser('~'), '.openclaw', 'identity') device_auth_path = os.path.join(identity_dir, 'device-auth.json') try: with open(device_auth_path) as f: data = json.load(f) return data['tokens']['operator']['token'] except Exception: return '' def _build_device_signature(device_identity, nonce: str, scopes: list, client_id: str, client_mode: str, role: str) -> dict: """ 构建 device 签名字段(v3 格式),返回 connect 请求中的 device 对象。 payload = v3|deviceId|clientId|clientMode|role|scopes_csv|signedAtMs|operatorToken|nonce|platform|deviceFamily auth.deviceToken 和 payload 里的 token 都使用 operator token,两者必须一致。 """ try: from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.primitives.serialization import ( load_pem_private_key, Encoding, PublicFormat ) import base64, time except ImportError: return None signed_at = int(time.time() * 1000) device_token = device_identity['device_token'] # operator token device_id = device_identity['device_id'] scopes_csv = ','.join(scopes) platform = 'linux' device_family = '' payload = '|'.join([ 'v3', device_id, client_id, client_mode, role, scopes_csv, str(signed_at), device_token, nonce, platform, device_family ]) private_key = load_pem_private_key( device_identity['private_key_pem'].encode(), password=None ) sig_bytes = private_key.sign(payload.encode('utf-8')) signature = base64.urlsafe_b64encode(sig_bytes).rstrip(b'=').decode() # 提取裸公钥(base64url,无 padding) pub_raw = private_key.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) public_key_raw = base64.urlsafe_b64encode(pub_raw).rstrip(b'=').decode() return { 'id': device_id, 'publicKey': public_key_raw, 'signature': signature, 'signedAt': signed_at, 'nonce': nonce, } def _acp_call_ws(method: str, params: dict) -> dict: """ OpenClaw ACP 协议 over WebSocket,完整握手流程: 1. 连接 ws://host:port/ 2. 收 connect.challenge(含 nonce) 3. 发 connect 请求(device 签名认证,获取 operator.admin scope) 4. 收 res.ok=true(hello-ok) 5. 发业务 RPC(type=req) 6. 收业务响应(type=res) """ try: import websocket except ImportError as e: raise RuntimeError(f"缺少依赖库: {e},请执行: pip install websocket-client") ws_addr = _ws_url() ws = websocket.WebSocket(timeout=OPENCLAW_TIMEOUT) # suppress_origin=True:不发送 Origin 头,避免 OpenClaw 把连接识别为 Control UI 并拒绝非本机来源 ws.connect(ws_addr, suppress_origin=True) try: # ── 步骤1:收 connect.challenge ────────────────── raw = ws.recv() challenge = _ws_parse(raw) if challenge.get('type') != 'event' or challenge.get('event') != 'connect.challenge': raise RuntimeError(f'预期 connect.challenge,实际收到: {challenge}') nonce = challenge.get('payload', {}).get('nonce', '') # ── 步骤2:构建认证参数 ──────────────────────────── client_id = 'cli' client_mode = 'backend' role = 'operator' # 认证策略:device 签名(operator token),与你本机完全一致 # device-auth.json 里的 operator token 同时用于 auth.deviceToken 和签名 payload full_scopes = ['operator.read', 'operator.write', 'operator.admin', 'operator.approvals', 'operator.pairing'] device_identity = _build_device_identity() device_obj = None if device_identity: device_obj = _build_device_signature( device_identity, nonce, full_scopes, client_id, client_mode, role ) if device_obj: auth_obj = {'deviceToken': device_identity['device_token']} scopes = full_scopes else: auth_obj = {} scopes = full_scopes # ── 步骤3:发 connect 请求 ───────────────────────── connect_id = str(uuid.uuid4()) connect_params = { 'minProtocol': 3, 'maxProtocol': 3, 'client': { 'id': client_id, 'version': '2026.3.24', 'platform': 'linux', 'mode': client_mode, 'instanceId': str(uuid.uuid4()) }, 'role': role, 'scopes': scopes, 'caps': [], 'auth': auth_obj, 'userAgent': 'lingshu/1.0.0', 'locale': 'zh-CN' } if device_obj: connect_params['device'] = device_obj connect_req = { 'type': 'req', 'id': connect_id, 'method': 'connect', 'params': connect_params } ws.send(json.dumps(connect_req)) # ── 步骤3:等待 connected 确认(过滤心跳)──────── for _ in range(5): raw = ws.recv() frame = _ws_parse(raw) if frame.get('type') == 'ping': ws.send(json.dumps({'type': 'pong'})) continue if frame.get('type') == 'res' and frame.get('ok'): break raise RuntimeError(f'认证失败: {frame}') else: raise RuntimeError('未收到认证成功响应(connected)') # ── 步骤4:发业务 RPC ───────────────────────────── req_id = str(uuid.uuid4()) payload = json.dumps({ 'type': 'req', 'id': req_id, 'method': method, 'params': params }) ws.send(payload) # ── 步骤5:读取业务响应(过滤心跳/通知)────────── for _ in range(20): raw = ws.recv() frame = _ws_parse(raw) if frame.get('type') == 'ping': ws.send(json.dumps({'type': 'pong'})) continue if frame.get('id') != req_id: continue if not frame.get('ok'): err = frame.get('error', {}) raise RuntimeError(f"ACP error {err.get('code')}: {err.get('message')}") return frame.get('result', {}) raise RuntimeError('未在规定帧数内收到业务响应') finally: ws.close() def _acp_call(method: str, params: dict) -> dict: """ 统一入口:根据 OPENCLAW_PROTOCOL 选择 WebSocket 或 HTTP。 """ if OPENCLAW_PROTOCOL == 'http': return _acp_call_http(method, params) return _acp_call_ws(method, params) def _log_to_db(execution_id, task_id, level, message, node_name=None): """写一条执行日志到数据库""" try: conn = db.get_conn() cur = conn.cursor() cur.execute(""" INSERT INTO task_logs (execution_id, task_id, log_level, log_time, message, node_name) VALUES (%s, %s, %s, %s, %s, %s) """, (execution_id, task_id, level, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3], message, node_name)) conn.commit() cur.close(); conn.close() except Exception as e: print(f'[warn] log_to_db failed: {e}') def _build_cron_payload(task_name: str, skill_name: str, skill_params: dict) -> dict: """ 构建 OpenClaw cron job payload(sessionTarget=main 要求 systemEvent 类型)。 text 字段包含任务描述,供 agent 识别。 """ params_str = json.dumps(skill_params, ensure_ascii=False) if skill_params else '{}' text = ( f'执行任务「{task_name}」' f' Skill={skill_name}' f' 参数={params_str}' ) return { 'kind': 'systemEvent', 'text': text, } def register_task(task_id: int) -> dict: """ 启动任务:从数据库读取任务配置,向 OpenClaw 注册 Cron 调度(cron.add)。 返回 {'ok': True/False, 'message': str, 'job_id': str} """ conn = db.get_conn() cur = conn.cursor(dictionary=True) cur.execute(""" SELECT t.*, a.agent_code, a.api_endpoint AS agent_endpoint FROM tasks t LEFT JOIN agents a ON t.agent_id = a.id WHERE t.id = %s """, (task_id,)) task = cur.fetchone() cur.close(); conn.close() if not task: return {'ok': False, 'message': f'任务 {task_id} 不存在'} try: extra = json.loads(task['workflow_json']) if task['workflow_json'] else {} except Exception: extra = {} skill_name = extra.get('skill_name', task['name']) skill_params = extra.get('skill_params', {}) cron_expr = task.get('cron_expression') or '' job_name = f'lingshu_task_{task_id}' payload = _build_cron_payload(task['name'], skill_name, skill_params) if cron_expr and task.get('trigger_type') == 'cron': schedule = {'kind': 'cron', 'expr': cron_expr} else: # 无 cron 表达式:立即执行一次(5秒后) import time at_iso = datetime.datetime.utcfromtimestamp(time.time() + 5).strftime('%Y-%m-%dT%H:%M:%SZ') schedule = {'kind': 'at', 'at': at_iso} cron_params = { 'name': job_name, 'enabled': True, 'schedule': schedule, 'sessionTarget': 'main', 'payload': payload, } try: result = _acp_call('cron.add', cron_params) job_id = result.get('id', job_name) _save_job_id(task_id, job_id) return {'ok': True, 'message': f'已注册到 OpenClaw,job_id={job_id}', 'job_id': job_id} except requests.exceptions.ConnectionError: msg = f'OpenClaw 服务不可达({OPENCLAW_BASE}),任务已在灵枢侧标记运行' print(f'[warn] {msg}') return {'ok': True, 'message': msg, 'job_id': None, 'degraded': True} except Exception as e: return {'ok': False, 'message': f'注册失败: {str(e)}'} def unregister_task(task_id: int) -> dict: """ 停止任务:通知 OpenClaw 移除调度(cron.remove)。 """ # 尝试从数据库取已存的 job_id try: conn = db.get_conn() cur = conn.cursor(dictionary=True) cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,)) row = cur.fetchone() cur.close(); conn.close() extra = json.loads(row['workflow_json']) if row and row['workflow_json'] else {} job_id = extra.get('openclaw_job_id', f'lingshu_task_{task_id}') except Exception: job_id = f'lingshu_task_{task_id}' try: _acp_call('cron.remove', {'id': job_id}) return {'ok': True, 'message': f'已从 OpenClaw 移除调度,job_id={job_id}'} except requests.exceptions.ConnectionError: return {'ok': True, 'message': 'OpenClaw 不可达,已在灵枢侧标记为停止', 'degraded': True} except RuntimeError as e: if 'not found' in str(e).lower() or 'NOT_FOUND' in str(e): return {'ok': True, 'message': '任务在 OpenClaw 中已不存在(可能已停止)'} return {'ok': False, 'message': f'停止失败: {str(e)}'} except Exception as e: return {'ok': False, 'message': f'停止失败: {str(e)}'} def trigger_once(task_id: int, execution_id: int) -> dict: """ 手动触发一次:通过 cron.add(一次性调度)让 OpenClaw agent 立即执行任务。 """ conn = db.get_conn() cur = conn.cursor(dictionary=True) cur.execute("SELECT * FROM tasks WHERE id=%s", (task_id,)) task = cur.fetchone() cur.close(); conn.close() if not task: return {'ok': False, 'message': f'任务 {task_id} 不存在'} try: extra = json.loads(task['workflow_json']) if task['workflow_json'] else {} except Exception: extra = {} skill_name = extra.get('skill_name', task['name']) skill_params = extra.get('skill_params', {}) if not skill_name and not task['name']: _simulate_execution(task_id, execution_id, task['name']) return {'ok': True, 'message': '模拟执行完成(未配置 Skill)'} _log_to_db(execution_id, task_id, 'info', f'[灵枢] 向 OpenClaw 发送执行请求,Skill={skill_name},method=cron.add(once)') import time as _time job_name = f'lingshu_task_{task_id}_exec_{execution_id}' payload = _build_cron_payload(task['name'], skill_name, skill_params) at_iso = datetime.datetime.utcfromtimestamp(_time.time() + 2).strftime('%Y-%m-%dT%H:%M:%SZ') try: result = _acp_call('cron.add', { 'name': job_name, 'enabled': True, 'schedule': {'kind': 'at', 'at': at_iso}, 'sessionTarget': 'main', 'payload': payload, }) returned_job_id = result.get('id', job_name) _log_to_db(execution_id, task_id, 'info', f'[OpenClaw] 已接收,jobId={returned_job_id},异步执行中') return {'ok': True, 'message': f'OpenClaw 已接收,jobId={returned_job_id},异步执行中'} except requests.exceptions.ConnectionError: _log_to_db(execution_id, task_id, 'warn', '[降级] OpenClaw 不可达,切换为本地模拟执行') _simulate_execution(task_id, execution_id, task['name'], skill_name, skill_params) return {'ok': True, 'message': '降级执行(OpenClaw 不可达,本地模拟)'} except Exception as e: _log_to_db(execution_id, task_id, 'error', f'[错误] 触发失败: {str(e)}') return {'ok': False, 'message': str(e)} def _simulate_execution(task_id, execution_id, task_name, skill_name=None, skill_params=None): """ OpenClaw 不可达时的降级模拟执行,写入日志并更新执行记录。 """ import time, random start = datetime.datetime.now() _log_to_db(execution_id, task_id, 'info', f'[灵枢] 任务「{task_name}」开始执行') if skill_name: _log_to_db(execution_id, task_id, 'info', f'[Skill] 调用: {skill_name}') _log_to_db(execution_id, task_id, 'info', f'[Skill] 参数: {json.dumps(skill_params or {}, ensure_ascii=False)}') time.sleep(0.3) _log_to_db(execution_id, task_id, 'success', f'[灵枢] 任务执行完成(模拟)') duration = int((datetime.datetime.now() - start).total_seconds() * 1000) cpu = round(random.uniform(10, 60), 1) mem = round(random.uniform(20, 50), 1) try: conn = db.get_conn() cur = conn.cursor() cur.execute(""" UPDATE task_executions SET status='success', ended_at=NOW(), duration_ms=%s, cpu_usage_pct=%s, memory_usage_pct=%s WHERE id=%s """, (duration, cpu, mem, execution_id)) cur.execute("UPDATE tasks SET last_run_at=NOW() WHERE id=%s", (task_id,)) conn.commit() cur.close(); conn.close() except Exception as e: print(f'[warn] simulate update failed: {e}') def _save_job_id(task_id, job_id): """将 OpenClaw job_id 存入 workflow_json""" try: conn = db.get_conn() cur = conn.cursor(dictionary=True) cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,)) row = cur.fetchone() extra = {} if row and row['workflow_json']: try: extra = json.loads(row['workflow_json']) except Exception: pass extra['openclaw_job_id'] = job_id cur2 = conn.cursor() cur2.execute("UPDATE tasks SET workflow_json=%s WHERE id=%s", (json.dumps(extra, ensure_ascii=False), task_id)) conn.commit() cur.close(); cur2.close(); conn.close() except Exception as e: print(f'[warn] save_job_id failed: {e}')