"""
OpenClaw scheduling-engine interface module.

Two transport protocols are supported (selected through the
OPENCLAW_PROTOCOL environment variable):

1. websocket (default): JSON-RPC 2.0 over WebSocket
       ws://127.0.0.1:18789/acp  or  ws://127.0.0.1:18789/
   Every call opens a short-lived connection, sends one request frame,
   reads one response frame and closes.

2. http: JSON-RPC 2.0 over HTTP POST
       POST http://127.0.0.1:18789/acp
       Content-Type: application/json

Supported methods:
    jobs.register.run_once  - run once immediately
    jobs.register.cron      - register a cron schedule
    jobs.cancel             - cancel / remove a schedule
    jobs.get                - query job status
    skills.run              - invoke a Skill directly (bypasses the scheduler)
"""
import contextlib
import datetime
import json
import os
import random
import time
import uuid

import requests

import db

# OpenClaw service address (read from the environment, defaults to local 18789).
OPENCLAW_BASE = os.environ.get('OPENCLAW_URL', 'http://127.0.0.1:18789')
OPENCLAW_TIMEOUT = int(os.environ.get('OPENCLAW_TIMEOUT', '10'))

# Transport protocol: "websocket" (default) or "http".
# Configure with OPENCLAW_PROTOCOL=http or OPENCLAW_PROTOCOL=websocket.
OPENCLAW_PROTOCOL = os.environ.get('OPENCLAW_PROTOCOL', 'websocket').lower()

# WebSocket endpoint path (differs between OpenClaw versions, so it can be
# overridden through the environment).
OPENCLAW_WS_PATH = os.environ.get('OPENCLAW_WS_PATH', '/acp')

# Lingshu callback base URL (OpenClaw POSTs here once an execution finishes).
LINGSHU_CALLBACK_BASE = os.environ.get('LINGSHU_CALLBACK_URL', 'http://localhost:5000')


def _ws_url():
    """Derive the WebSocket URL from OPENCLAW_BASE and OPENCLAW_WS_PATH.

    http://host:port  -> ws://host:port
    https://host:port -> wss://host:port
    A base that already uses ws:// or wss:// is kept as is.
    """
    base = OPENCLAW_BASE.rstrip('/')
    path = OPENCLAW_WS_PATH
    # A path configured without its leading slash would otherwise be glued
    # straight onto the port number.
    if path and not path.startswith('/'):
        path = '/' + path
    if base.startswith(('ws://', 'wss://')):
        return base + path
    if base.startswith('https://'):
        return 'wss://' + base[len('https://'):] + path
    # Strip the scheme only as a prefix, never in the middle of the address.
    if base.startswith('http://'):
        base = base[len('http://'):]
    return 'ws://' + base + path


# rstrip avoids a double slash when OPENCLAW_URL ends with "/".
ACP_ENDPOINT = f"{OPENCLAW_BASE.rstrip('/')}/acp"


def _headers():
    """Return the HTTP request headers, adding a bearer token when
    OPENCLAW_TOKEN is set."""
    token = os.environ.get('OPENCLAW_TOKEN', '')
    h = {'Content-Type': 'application/json'}
    if token:
        h['Authorization'] = f'Bearer {token}'
    return h


def _unwrap_result(body) -> dict:
    """Extract ``result`` from a decoded JSON-RPC 2.0 response.

    Raises:
        RuntimeError: the response carries a non-empty ``error`` member.
    """
    err = body.get('error')
    # Truthiness (not key presence) so that an explicit `"error": null`
    # is treated as success.
    if err:
        raise RuntimeError(f"ACP error {err.get('code')}: {err.get('message')}")
    result = body.get('result')
    # `"result": null` must not leak None to callers that call `.get` on it.
    return {} if result is None else result


def _acp_call_http(method: str, params: dict) -> dict:
    """Perform one JSON-RPC 2.0 call over HTTP POST and return its result.

    Raises:
        requests.exceptions.RequestException: transport or HTTP status error.
        RuntimeError: the response carries a JSON-RPC error.
    """
    req_id = str(uuid.uuid4())
    payload = {'jsonrpc': '2.0', 'id': req_id, 'method': method, 'params': params}
    resp = requests.post(
        ACP_ENDPOINT, json=payload, headers=_headers(), timeout=OPENCLAW_TIMEOUT
    )
    resp.raise_for_status()
    return _unwrap_result(resp.json())


def _acp_call_ws(method: str, params: dict) -> dict:
    """Perform one JSON-RPC 2.0 call over a short-lived WebSocket
    (one request frame, one response frame).

    Raises:
        RuntimeError: websocket-client is not installed, or the response
            carries a JSON-RPC error.
        requests.exceptions.ConnectionError: the connection could not be
            established.
    """
    try:
        import websocket  # provided by the websocket-client package
    except ImportError as exc:
        raise RuntimeError(
            "缺少 websocket-client 库,请在容器/环境中执行: pip install websocket-client"
        ) from exc

    req_id = str(uuid.uuid4())
    payload = json.dumps({'jsonrpc': '2.0', 'id': req_id, 'method': method, 'params': params})
    token = os.environ.get('OPENCLAW_TOKEN', '')
    headers = [f'Authorization: Bearer {token}'] if token else []
    ws_addr = _ws_url()

    try:
        ws = websocket.create_connection(
            ws_addr, timeout=OPENCLAW_TIMEOUT, header=headers
        )
    except (OSError, websocket.WebSocketAddressException) as exc:
        # Callers implement their "OpenClaw unreachable" fallback by catching
        # requests.exceptions.ConnectionError. Re-raise connect-phase failures
        # as that type so the fallback also works in WebSocket mode. Handshake
        # rejections (bad status) are deliberately NOT translated: the server
        # is reachable in that case.
        raise requests.exceptions.ConnectionError(
            f'WebSocket connect to {ws_addr} failed: {exc}'
        ) from exc

    try:
        ws.send(payload)
        raw = ws.recv()
    finally:
        ws.close()
    return _unwrap_result(json.loads(raw))


def _acp_call(method: str, params: dict) -> dict:
    """Single entry point: dispatch to HTTP when OPENCLAW_PROTOCOL is
    'http', otherwise to WebSocket."""
    if OPENCLAW_PROTOCOL == 'http':
        return _acp_call_http(method, params)
    return _acp_call_ws(method, params)


@contextlib.contextmanager
def _db_cursor(dictionary=False):
    """Yield ``(conn, cursor)`` from ``db.get_conn()`` and always close both,
    even when the body raises."""
    conn = db.get_conn()
    try:
        cur = conn.cursor(dictionary=True) if dictionary else conn.cursor()
        try:
            yield conn, cur
        finally:
            cur.close()
    finally:
        conn.close()


def _parse_workflow_json(raw) -> dict:
    """Decode a task's ``workflow_json`` column into a dict.

    Empty, undecodable or non-object values yield ``{}`` so callers can
    always use ``.get``.
    """
    if not raw:
        return {}
    if isinstance(raw, dict):
        # Copy so that callers mutating the result never touch the row.
        return dict(raw)
    try:
        parsed = json.loads(raw)
    except (TypeError, ValueError):
        return {}
    return parsed if isinstance(parsed, dict) else {}


def _log_to_db(execution_id, task_id, level, message, node_name=None):
    """Insert one execution-log row into task_logs (best effort: failures
    are printed, never raised)."""
    try:
        # Millisecond precision: drop the last three microsecond digits.
        stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        with _db_cursor() as (conn, cur):
            cur.execute("""
                INSERT INTO task_logs (execution_id, task_id, log_level, log_time, message, node_name)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, (execution_id, task_id, level, stamp, message, node_name))
            conn.commit()
    except Exception as e:
        print(f'[warn] log_to_db failed: {e}')


def _build_executor(skill_name, skill_params, timeout, callback_url, execution_id):
    """Build the OpenClaw executor configuration.

    A skill executor is used by default. When ``skill_name`` contains a
    path separator or ends with ``.sh`` / ``.py`` it is treated as a shell
    command instead (the callback arguments are not used in that case).
    """
    if '/' in skill_name or skill_name.endswith(('.sh', '.py')):
        # Shell command: parameters, when present, are passed as a single
        # JSON-encoded argument.
        if skill_params:
            command = [skill_name, json.dumps(skill_params)]
        else:
            command = [skill_name]
        return {
            'type': 'shell',
            'command': command,
            'timeout': timeout
        }
    # OpenClaw skill executor.
    return {
        'type': 'skill',
        'skill': skill_name,
        'params': skill_params or {},
        'timeout': timeout,
        'callbackUrl': callback_url,
        'callbackParams': {'execution_id': execution_id}
    }


def register_task(task_id: int) -> dict:
    """Start a task: load its configuration from the database and register
    it with OpenClaw.

    A cron schedule is registered when the task has a cron expression and
    trigger_type 'cron'; otherwise a run_once job is registered.

    Returns:
        {'ok': bool, 'message': str, 'job_id': str}; when OpenClaw is
        unreachable the result is ok=True with 'degraded': True and
        job_id None.
    """
    with _db_cursor(dictionary=True) as (_conn, cur):
        cur.execute("""
            SELECT t.*, a.agent_code, a.api_endpoint AS agent_endpoint
            FROM tasks t
            LEFT JOIN agents a ON t.agent_id = a.id
            WHERE t.id = %s
        """, (task_id,))
        task = cur.fetchone()

    if not task:
        return {'ok': False, 'message': f'任务 {task_id} 不存在'}

    # Extension fields live inside workflow_json.
    extra = _parse_workflow_json(task['workflow_json'])
    skill_name = extra.get('skill_name', '')
    skill_params = extra.get('skill_params', {})
    timeout = extra.get('timeout', 300)
    retry_count = extra.get('retry_count', 3)
    cron_expr = task.get('cron_expression') or ''

    if not skill_name:
        return {'ok': False, 'message': '未配置 Skill(workflow_json.skill_name),无法启动'}

    callback_url = f"{LINGSHU_CALLBACK_BASE}/api/openclaw/callback/{task_id}"
    job_name = f'lingshu_task_{task_id}'
    executor = _build_executor(skill_name, skill_params, timeout, callback_url, None)

    if cron_expr and task.get('trigger_type') == 'cron':
        method = 'jobs.register.cron'
        params = {
            'job': {
                'name': job_name,
                'cron': cron_expr,
                'executor': executor,
                'retry': retry_count,
                'taskName': task['name'],
            }
        }
    else:
        method = 'jobs.register.run_once'
        params = {
            'job': {
                'name': job_name,
                'executor': executor,
                'delaySeconds': 0,
                'retry': retry_count,
                'taskName': task['name'],
            }
        }

    try:
        result = _acp_call(method, params)
        job_id = result.get('jobId', job_name)
        _save_job_id(task_id, job_id)
        return {'ok': True, 'message': f'已注册到 OpenClaw,job_id={job_id}', 'job_id': job_id}
    except requests.exceptions.ConnectionError:
        msg = f'OpenClaw 服务不可达({OPENCLAW_BASE}),任务已在灵枢侧标记运行,将在 OpenClaw 恢复后自动接管'
        print(f'[warn] {msg}')
        return {'ok': True, 'message': msg, 'job_id': None, 'degraded': True}
    except Exception as e:
        return {'ok': False, 'message': f'注册失败: {str(e)}'}


def unregister_task(task_id: int) -> dict:
    """Stop a task: ask OpenClaw to remove its schedule (jobs.cancel).

    Returns:
        {'ok': bool, 'message': str}, plus 'degraded': True when OpenClaw
        is unreachable.
    """
    job_name = f'lingshu_task_{task_id}'

    # Prefer the job id stored at registration time; fall back to the
    # conventional job name when it cannot be read.
    try:
        with _db_cursor(dictionary=True) as (_conn, cur):
            cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
            row = cur.fetchone()
        extra = _parse_workflow_json(row['workflow_json']) if row else {}
        job_id = extra.get('openclaw_job_id', job_name)
    except Exception:
        job_id = job_name

    try:
        _acp_call('jobs.cancel', {'jobId': job_id})
        return {'ok': True, 'message': f'已从 OpenClaw 移除调度,job_id={job_id}'}
    except requests.exceptions.ConnectionError:
        return {'ok': True, 'message': 'OpenClaw 不可达,已在灵枢侧标记为停止', 'degraded': True}
    except RuntimeError as e:
        # An ACP "404" / "not found" error means the job no longer exists,
        # which is treated as a successful stop.
        if '404' in str(e) or 'not found' in str(e).lower():
            return {'ok': True, 'message': '任务在 OpenClaw 中已不存在(可能已停止)'}
        return {'ok': False, 'message': f'停止失败: {str(e)}'}
    except Exception as e:
        return {'ok': False, 'message': f'停止失败: {str(e)}'}


def trigger_once(task_id: int, execution_id: int) -> dict:
    """Manually trigger one execution through jobs.register.run_once.

    Falls back to a local simulated execution when the task has no Skill
    configured or when OpenClaw is unreachable.

    Returns:
        {'ok': bool, 'message': str}
    """
    with _db_cursor(dictionary=True) as (_conn, cur):
        cur.execute("SELECT * FROM tasks WHERE id=%s", (task_id,))
        task = cur.fetchone()

    if not task:
        return {'ok': False, 'message': f'任务 {task_id} 不存在'}

    extra = _parse_workflow_json(task['workflow_json'])
    skill_name = extra.get('skill_name', '')
    skill_params = extra.get('skill_params', {})
    timeout = extra.get('timeout', 300)

    if not skill_name:
        # No Skill configured: simulate the run (demo behaviour).
        _simulate_execution(task_id, execution_id, task['name'])
        return {'ok': True, 'message': '模拟执行完成(未配置 Skill)'}

    callback_url = f"{LINGSHU_CALLBACK_BASE}/api/openclaw/callback/{task_id}"
    job_name = f'lingshu_task_{task_id}_exec_{execution_id}'
    executor = _build_executor(skill_name, skill_params, timeout, callback_url, execution_id)

    _log_to_db(execution_id, task_id, 'info',
               f'[灵枢] 向 OpenClaw 发送执行请求,Skill={skill_name},method=jobs.register.run_once')

    try:
        result = _acp_call('jobs.register.run_once', {
            'job': {
                'name': job_name,
                'executor': executor,
                'delaySeconds': 0,
                'taskName': task['name'],
            }
        })
        returned_job_id = result.get('jobId', job_name)
        _log_to_db(execution_id, task_id, 'info',
                   f'[OpenClaw] 已接收,jobId={returned_job_id},异步执行中')
        return {'ok': True, 'message': f'OpenClaw 已接收,jobId={returned_job_id},异步执行中'}
    except requests.exceptions.ConnectionError:
        _log_to_db(execution_id, task_id, 'warn', '[降级] OpenClaw 不可达,切换为本地模拟执行')
        _simulate_execution(task_id, execution_id, task['name'], skill_name, skill_params)
        return {'ok': True, 'message': '降级执行(OpenClaw 不可达,本地模拟)'}
    except Exception as e:
        _log_to_db(execution_id, task_id, 'error', f'[错误] 触发失败: {str(e)}')
        return {'ok': False, 'message': str(e)}


def query_job(task_id: int) -> dict:
    """Query the task's current state in OpenClaw (jobs.get).

    Returns:
        {'ok': True, 'job': <result>} or {'ok': False, 'message': str}
    """
    default_job_id = f'lingshu_task_{task_id}'
    try:
        with _db_cursor(dictionary=True) as (_conn, cur):
            cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
            row = cur.fetchone()
        extra = _parse_workflow_json(row['workflow_json']) if row else {}
        job_id = extra.get('openclaw_job_id', default_job_id)
    except Exception:
        job_id = default_job_id

    try:
        result = _acp_call('jobs.get', {'jobId': job_id})
        return {'ok': True, 'job': result}
    except requests.exceptions.ConnectionError:
        return {'ok': False, 'message': 'OpenClaw 不可达'}
    except Exception as e:
        return {'ok': False, 'message': str(e)}


def _simulate_execution(task_id, execution_id, task_name, skill_name=None, skill_params=None):
    """Simulated fallback execution: write log rows and mark the execution
    record as successful.

    The CPU and memory figures stored are random placeholders, not
    measurements.
    """
    start = datetime.datetime.now()
    _log_to_db(execution_id, task_id, 'info', f'[灵枢] 任务「{task_name}」开始执行')
    if skill_name:
        _log_to_db(execution_id, task_id, 'info', f'[Skill] 调用: {skill_name}')
        _log_to_db(execution_id, task_id, 'info',
                   f'[Skill] 参数: {json.dumps(skill_params or {}, ensure_ascii=False)}')
    time.sleep(0.3)
    _log_to_db(execution_id, task_id, 'success', '[灵枢] 任务执行完成(模拟)')

    duration = int((datetime.datetime.now() - start).total_seconds() * 1000)
    cpu = round(random.uniform(10, 60), 1)
    mem = round(random.uniform(20, 50), 1)
    try:
        with _db_cursor() as (conn, cur):
            cur.execute("""
                UPDATE task_executions
                SET status='success', ended_at=NOW(), duration_ms=%s,
                    cpu_usage_pct=%s, memory_usage_pct=%s
                WHERE id=%s
            """, (duration, cpu, mem, execution_id))
            cur.execute("UPDATE tasks SET last_run_at=NOW() WHERE id=%s", (task_id,))
            conn.commit()
    except Exception as e:
        print(f'[warn] simulate update failed: {e}')


def _save_job_id(task_id, job_id):
    """Store the OpenClaw job id under ``openclaw_job_id`` in the task's
    workflow_json (best effort: failures are printed, never raised)."""
    try:
        with _db_cursor(dictionary=True) as (conn, cur):
            cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
            row = cur.fetchone()
            extra = _parse_workflow_json(row['workflow_json']) if row else {}
            extra['openclaw_job_id'] = job_id
            cur.execute("UPDATE tasks SET workflow_json=%s WHERE id=%s",
                        (json.dumps(extra, ensure_ascii=False), task_id))
            conn.commit()
    except Exception as e:
        print(f'[warn] save_job_id failed: {e}')