"""
Lingshu Intelligent Task Platform -- Flask backend service.

Database: MySQL 8.0 (lingshu)
"""
import datetime
import hmac
import os
import threading
from contextlib import closing
from functools import wraps

import bcrypt
import jwt
from flask import Flask, request, jsonify, send_from_directory, g
from flask_cors import CORS

import db
import openclaw

app = Flask(__name__, static_folder=os.path.join(os.path.dirname(__file__), '..'))
app.config['JSON_AS_ASCII'] = False
CORS(app)

# NOTE(review): the fallback secret is visible in source; deployments must set
# LINGSHU_JWT_SECRET, otherwise anyone can forge tokens.
JWT_SECRET = os.environ.get('LINGSHU_JWT_SECRET', 'lingshu-secret-key-change-in-prod')
JWT_EXPIRE_HOURS = 24 * 7  # 7 days


# ─────────────────────────────────────────────
# JWT authentication decorator
# ─────────────────────────────────────────────
def require_auth(f):
    """Reject the request with 401 unless it carries a valid ``Bearer`` JWT.

    On success the decoded token payload is stored in ``g.current_user``
    before the wrapped view is called.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get('Authorization', '')
        if not auth_header.startswith('Bearer '):
            return jsonify({'error': '未登录,请先登录'}), 401
        token = auth_header[7:]  # strip the "Bearer " prefix
        try:
            g.current_user = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            return jsonify({'error': 'Token 已过期,请重新登录'}), 401
        except jwt.InvalidTokenError:
            return jsonify({'error': 'Token 无效'}), 401
        return f(*args, **kwargs)
    return decorated


# ─────────────────────────────────────────────
# Static files (serves the front-end HTML)
# ─────────────────────────────────────────────
@app.route('/')
def index():
    """Serve the single-page front end."""
    return send_from_directory(app.static_folder, '灵枢智能任务平台.html')


@app.route('/logo.png')
def logo():
    """Serve the platform logo."""
    return send_from_directory(app.static_folder, 'logo.png')


# ─────────────────────────────────────────────
# Authentication endpoints
# ─────────────────────────────────────────────
def _json_body():
    """Return the JSON request body as a dict ({} when absent or not an object)."""
    body = request.get_json()
    return body if isinstance(body, dict) else {}


def _password_matches(password, stored_hash):
    """Check ``password`` against a stored bcrypt hash or legacy plaintext value.

    Seed data stores plaintext passwords; those are compared in constant time.
    Malformed bcrypt hashes count as a mismatch instead of raising.
    """
    if not stored_hash:
        return False
    if stored_hash.startswith('$2'):
        try:
            return bcrypt.checkpw(password.encode(), stored_hash.encode())
        except (ValueError, TypeError):
            return False
    # Legacy plaintext fallback (should be migrated to bcrypt).
    return hmac.compare_digest(password.encode(), stored_hash.encode())


@app.route('/api/auth/login', methods=['POST'])
def login():
    """Authenticate by username/password and return a signed JWT plus profile.

    Returns 400 when credentials are missing and 401 when they do not match.
    """
    data = _json_body()
    username = str(data.get('username') or '').strip()
    password = data.get('password') or ''
    if not username or not password or not isinstance(password, str):
        return jsonify({'error': '用户名和密码不能为空'}), 400

    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute(
            "SELECT id, username, display_name, password_hash, avatar_color "
            "FROM users WHERE username=%s",
            (username,),
        )
        user = cur.fetchone()

    # Same message for unknown user and wrong password, so usernames cannot be probed.
    if not user or not _password_matches(password, user['password_hash']):
        return jsonify({'error': '用户名或密码错误'}), 401

    color = user['avatar_color'] or '#3b82f6'
    expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(hours=JWT_EXPIRE_HOURS)
    payload = {
        'id': user['id'],
        'username': user['username'],
        'name': user['display_name'],
        'color': color,
        'exp': expires_at,
    }
    token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')
    return jsonify({
        'token': token,
        'user': {
            'id': user['id'],
            'name': user['display_name'],
            'username': user['username'],
            'color': color,
            'avatar': (user['display_name'] or '?')[0],
        }
    })


@app.route('/api/auth/me')
@require_auth
def auth_me():
    """Return the profile embedded in the caller's token."""
    u = g.current_user
    return jsonify({
        'id': u['id'],
        'name': u['name'],
        'username': u['username'],
        'color': u['color'],
        'avatar': (u['name'] or '?')[0],
    })


@app.route('/api/auth/register', methods=['POST'])
def register():
    """Register a new user (admin use; may be disabled in production).

    The user is stored with a bcrypt hash and then added, best effort, to the
    default team (id=1) as a viewer. Returns 409 if the username is taken.
    """
    data = _json_body()
    username = str(data.get('username') or '').strip()
    password = data.get('password') or ''
    # Fall back to the username when no usable display name is supplied.
    display_name = str(data.get('display_name') or username).strip() or username
    if not username or not password or not isinstance(password, str):
        return jsonify({'error': '用户名和密码不能为空'}), 400
    if len(password) < 4:
        return jsonify({'error': '密码至少4位'}), 400
    color = data.get('color', '#3b82f6')

    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute("SELECT id FROM users WHERE username=%s", (username,))
        if cur.fetchone():
            return jsonify({'error': '用户名已存在'}), 409

        hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
        cur.execute(
            "INSERT INTO users (username, display_name, password_hash, avatar_color) "
            "VALUES (%s, %s, %s, %s)",
            (username, display_name, hashed, color),
        )
        new_id = cur.lastrowid
        conn.commit()

        # Add the new user to the default team (id=1). Failure here must not
        # undo the registration, but it is logged rather than silently dropped.
        try:
            cur.execute(
                "INSERT IGNORE INTO team_members (team_id, user_id, role) VALUES (1, %s, 'viewer')",
                (new_id,),
            )
            conn.commit()
        except Exception:
            app.logger.exception("could not add user %s to the default team", new_id)

    return jsonify({'id': new_id, 'message': '注册成功'}), 201


# ─────────────────────────────────────────────
# Dashboard KPIs
# ─────────────────────────────────────────────
@app.route('/api/dashboard/kpi')
@require_auth
def dashboard_kpi():
    """Return the caller's task totals, this month's executions and unread alerts."""
    owner_id = g.current_user['id']
    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute(
            "SELECT COUNT(*) AS total, SUM(status='running') AS running "
            "FROM tasks WHERE owner_id=%s",
            (owner_id,),
        )
        task_row = cur.fetchone()
        cur.execute("""
            SELECT COUNT(*) AS monthly
            FROM task_executions e
            JOIN tasks t ON e.task_id = t.id
            WHERE t.owner_id=%s
              AND MONTH(e.started_at)=MONTH(CURDATE())
              AND YEAR(e.started_at)=YEAR(CURDATE())
        """, (owner_id,))
        monthly = cur.fetchone()['monthly']
        cur.execute(
            "SELECT COUNT(*) AS alerts FROM alert_notifications "
            "WHERE target_user_id=%s AND is_read=0",
            (owner_id,),
        )
        alerts = cur.fetchone()['alerts']
    return jsonify({
        # SUM() is NULL when the user has no tasks, hence the "or 0".
        'running': int(task_row['running'] or 0),
        'total': int(task_row['total'] or 0),
        'monthly_triggers': monthly,
        'alerts': alerts,
    })


@app.route('/api/dashboard/exec_trend')
@require_auth
def exec_trend():
    """Return per-day success/failure counts for the caller's tasks.

    Query parameter ``days`` (default 7) selects the look-back window; a
    non-numeric value falls back to the default and negatives are treated as 0.
    """
    days = max(request.args.get('days', default=7, type=int), 0)
    owner_id = g.current_user['id']
    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute("""
            SELECT DATE(e.started_at) AS day,
                   SUM(e.status='success') AS success,
                   SUM(e.status='failed' OR e.status='timeout') AS fail
            FROM task_executions e
            JOIN tasks t ON e.task_id = t.id
            WHERE t.owner_id=%s
              AND e.started_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
            GROUP BY DATE(e.started_at)
            ORDER BY day ASC
        """, (owner_id, days))
        rows = cur.fetchall()
    return jsonify([
        {'day': str(r['day']), 'success': int(r['success'] or 0), 'fail': int(r['fail'] or 0)}
        for r in rows
    ])
from contextlib import closing  # section-local import so this block is self-contained


def _request_data():
    """Return the JSON request body as a dict ({} when absent or not an object)."""
    body = request.get_json()
    return body if isinstance(body, dict) else {}


def _query_all(sql, params=None):
    """Run a read query and return every row as a dict; the connection is always closed."""
    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute(sql, params)
        return cur.fetchall()


def _query_one(sql, params=None):
    """Run a read query expected to yield at most one row; return it as a dict or None."""
    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute(sql, params)
        return cur.fetchone()


def _run_write(sql, params=None):
    """Execute one write statement, commit, and return the cursor's ``lastrowid``."""
    with closing(db.get_conn()) as conn, closing(conn.cursor()) as cur:
        cur.execute(sql, params)
        new_id = cur.lastrowid
        conn.commit()
        return new_id


def _task_extra(raw):
    """Decode a task's ``workflow_json`` value into a dict ({} when missing or malformed)."""
    import json as _json
    if not raw:
        return {}
    try:
        extra = _json.loads(raw)
    except (TypeError, ValueError):
        return {}
    return extra if isinstance(extra, dict) else {}


@app.route('/api/dashboard/agent_usage')
@require_auth
def agent_usage():
    """Return the six most-used public agents as ``{name, value}`` chart points."""
    rows = _query_all(
        "SELECT name, usage_count FROM agents WHERE is_public=1 ORDER BY usage_count DESC LIMIT 6"
    )
    return jsonify([{'name': r['name'], 'value': r['usage_count']} for r in rows])


@app.route('/api/dashboard/status_dist')
@require_auth
def status_dist():
    """Return the caller's task count per status, with display labels."""
    rows = _query_all(
        "SELECT status, COUNT(*) AS cnt FROM tasks WHERE owner_id=%s GROUP BY status",
        (g.current_user['id'],),
    )
    label_map = {'running': '运行中', 'stopped': '已停止', 'error': '错误', 'draft': '草稿'}
    return jsonify([{'name': label_map.get(r['status'], r['status']), 'value': r['cnt']} for r in rows])


# ─────────────────────────────────────────────
# Task center
# ─────────────────────────────────────────────
@app.route('/api/tasks')
@require_auth
def list_tasks():
    """List the caller's tasks, optionally filtered by ``keyword`` in the name."""
    keyword = request.args.get('keyword', '').strip()
    sql = """
        SELECT t.id, t.name, t.cron_expression, t.status, a.name AS agent,
               t.trigger_type, t.workflow_json
        FROM tasks t
        LEFT JOIN agents a ON t.agent_id=a.id
        WHERE t.owner_id=%s
    """
    params = [g.current_user['id']]
    if keyword:
        sql += " AND t.name LIKE %s"
        params.append(f'%{keyword}%')
    sql += " ORDER BY t.updated_at DESC"

    result = []
    for row in _query_all(sql, params):
        item = dict(row)
        # Extended fields live inside workflow_json; flatten the ones the list view needs.
        extra = _task_extra(item.pop('workflow_json', None))
        item['description'] = extra.get('description', '')
        item['category'] = extra.get('category', '')
        item['skill_name'] = extra.get('skill_name', '')
        item['timeout'] = extra.get('timeout', 300)
        item['retry_count'] = extra.get('retry_count', 3)
        result.append(item)
    return jsonify(result)


@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_auth
def get_task(task_id):
    """Return one task with its extended fields flattened; 404 if it does not exist."""
    # NOTE(review): not restricted to the caller's own tasks, unlike list/copy/delete — confirm intent.
    row = _query_one("""
        SELECT t.*, a.name AS agent_name
        FROM tasks t
        LEFT JOIN agents a ON t.agent_id=a.id
        WHERE t.id=%s
    """, (task_id,))
    if not row:
        return jsonify({'error': 'not found'}), 404
    extra = _task_extra(row.pop('workflow_json', None))
    row['description'] = extra.get('description', '')
    row['category'] = extra.get('category', '')
    row['skill_name'] = extra.get('skill_name', '')
    row['skill_params'] = extra.get('skill_params', {})
    row['timeout'] = extra.get('timeout', 300)
    row['retry_count'] = extra.get('retry_count', 3)
    return jsonify(db.serialize_row(row))


@app.route('/api/tasks', methods=['POST'])
@require_auth
def create_task():
    """Create a task owned by the caller and record an audit entry."""
    import json as _json
    data = _request_data()
    owner_id = g.current_user['id']
    name = str(data.get('name') or '').strip()
    if not name:
        return jsonify({'error': '任务名称不能为空'}), 400

    # Trigger type: "cron" when a non-empty cron_expression is given, otherwise "manual".
    cron_expr = str(data.get('cron_expression') or '').strip()
    trigger_type = 'cron' if cron_expr else 'manual'

    # Extended fields are stored in the workflow_json column until the schema is extended.
    extra = {
        'description': data.get('description', ''),
        'category': data.get('category', ''),
        'skill_name': data.get('skill_name', ''),
        'skill_params': data.get('skill_params', {}),
        'datasource_id': data.get('datasource_id'),
        'timeout': data.get('timeout', 300),
        'retry_count': data.get('retry_count', 3),
    }
    task_config = data.get('task_config')
    if isinstance(task_config, dict):
        extra.update(task_config)

    status = data.get('status', 'stopped')
    if status not in ('stopped', 'draft'):
        status = 'stopped'

    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        # Look up the team the user belongs to.
        cur.execute("SELECT team_id FROM team_members WHERE user_id=%s LIMIT 1", (owner_id,))
        membership = cur.fetchone()
        team_id = membership['team_id'] if membership else None
        cur.execute("""
            INSERT INTO tasks
              (owner_id, team_id, name, cron_expression, agent_id, trigger_type, status, workflow_json)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            owner_id, team_id, name,
            cron_expr or None,
            data.get('agent_id') or None,
            trigger_type, status,
            _json.dumps(extra, ensure_ascii=False),
        ))
        new_id = cur.lastrowid
        conn.commit()

    _audit_log(owner_id, team_id, f"创建了任务「{name}」", 'task', new_id)
    return jsonify({'id': new_id, 'message': '任务创建成功'}), 201


@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_auth
def update_task(task_id):
    """Update a task owned by the caller; returns 404 when no such task exists.

    Core columns are updated directly; extended fields are merged into the
    existing ``workflow_json`` document.
    """
    import json as _json
    data = _request_data()
    owner_id = g.current_user['id']
    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        # Read the current workflow_json first so extended fields can be merged.
        cur.execute(
            "SELECT workflow_json FROM tasks WHERE id=%s AND owner_id=%s",
            (task_id, owner_id),
        )
        row = cur.fetchone()
        if not row:
            return jsonify({'error': '任务不存在或无权修改'}), 404

        extra = _task_extra(row['workflow_json'])
        for key in ('description', 'category', 'skill_name', 'skill_params',
                    'datasource_id', 'timeout', 'retry_count'):
            if key in data:
                extra[key] = data[key]

        # Column names come from this fixed whitelist, never from the request.
        assignments, values = [], []
        for column in ('name', 'cron_expression', 'agent_id', 'trigger_type'):
            if column in data:
                assignments.append(f"{column}=%s")
                values.append(data[column])
        assignments.append("workflow_json=%s")
        values.append(_json.dumps(extra, ensure_ascii=False))
        values.append(task_id)
        cur.execute(
            f"UPDATE tasks SET {','.join(assignments)}, updated_at=NOW() WHERE id=%s",
            values,
        )
        conn.commit()
    return jsonify({'message': '更新成功'})


@app.route('/api/tasks/<int:task_id>/copy', methods=['POST'])
@require_auth
def copy_task(task_id):
    """Duplicate one of the caller's tasks as a new stopped task."""
    owner_id = g.current_user['id']
    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute("SELECT * FROM tasks WHERE id=%s AND owner_id=%s", (task_id, owner_id))
        task = cur.fetchone()
        if not task:
            return jsonify({'error': '任务不存在'}), 404
        cur.execute("""
            INSERT INTO tasks
              (owner_id, team_id, name, cron_expression, agent_id, trigger_type, status, workflow_json)
            VALUES (%s, %s, %s, %s, %s, %s, 'stopped', %s)
        """, (
            owner_id, task['team_id'],
            task['name'] + '-副本',
            task['cron_expression'],
            task['agent_id'],
            task['trigger_type'],
            task['workflow_json'],
        ))
        new_id = cur.lastrowid
        conn.commit()
    _audit_log(owner_id, task['team_id'], f"复制了任务「{task['name']}」", 'task', new_id)
    return jsonify({'id': new_id, 'message': '复制成功'}), 201


@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_auth
def delete_task(task_id):
    """Delete one of the caller's tasks together with its executions and logs."""
    owner_id = g.current_user['id']
    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute(
            "SELECT id, name, team_id FROM tasks WHERE id=%s AND owner_id=%s",
            (task_id, owner_id),
        )
        task = cur.fetchone()
        if not task:
            return jsonify({'error': '任务不存在或无权删除'}), 404
        # NOTE(review): a running task is not unregistered from OpenClaw before deletion.
        # Children are removed first: logs, then executions, then the task itself.
        cur.execute(
            "DELETE FROM task_logs WHERE execution_id IN "
            "(SELECT id FROM task_executions WHERE task_id=%s)",
            (task_id,),
        )
        cur.execute("DELETE FROM task_executions WHERE task_id=%s", (task_id,))
        cur.execute("DELETE FROM tasks WHERE id=%s", (task_id,))
        conn.commit()
    _audit_log(owner_id, task.get('team_id'), f"删除了任务「{task['name']}」", 'task', task_id)
    return jsonify({'message': '删除成功'})


@app.route('/api/tasks/<int:task_id>/status', methods=['PUT'])
@require_auth
def toggle_task_status(task_id):
    """Switch one of the caller's tasks to ``running`` or ``stopped``.

    Starting registers the schedule with OpenClaw in a background thread; if
    that fails the task is set back to ``stopped`` and an alert is written.
    Stopping unregisters the schedule synchronously.
    """
    new_status = _request_data().get('status')
    if new_status not in ('running', 'stopped'):
        return jsonify({'error': 'invalid status'}), 400

    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute(
            "SELECT id FROM tasks WHERE id=%s AND owner_id=%s",
            (task_id, g.current_user['id']),
        )
        if not cur.fetchone():
            return jsonify({'error': '任务不存在'}), 404
        # Persist the new status BEFORE talking to OpenClaw, so that a fast
        # registration failure (which resets to 'stopped') is not overwritten.
        cur.execute("UPDATE tasks SET status=%s WHERE id=%s", (new_status, task_id))
        conn.commit()

    if new_status == 'running':
        def _register():
            """Register with OpenClaw; roll the task back to stopped and alert on failure."""
            res = openclaw.register_task(task_id)
            if res['ok']:
                return
            try:
                with closing(db.get_conn()) as c, closing(c.cursor()) as cu:
                    cu.execute("UPDATE tasks SET status='stopped' WHERE id=%s", (task_id,))
                    cu.execute("""
                        INSERT INTO alert_notifications
                          (target_user_id, task_id, alert_type, title, content)
                        SELECT owner_id, %s, 'task_fail', '任务启动失败', %s
                        FROM tasks WHERE id=%s
                    """, (task_id, res['message'], task_id))
                    c.commit()
            except Exception:
                app.logger.exception("could not record failed registration of task %s", task_id)

        # Asynchronous so the HTTP response is not blocked by OpenClaw.
        threading.Thread(target=_register, daemon=True).start()
        openclaw_msg = '正在向 OpenClaw 注册调度...'
    else:
        # Stopped: ask OpenClaw to remove the schedule.
        openclaw_msg = openclaw.unregister_task(task_id)['message']

    return jsonify({'message': f'状态已切换为 {new_status}', 'openclaw': openclaw_msg})


@app.route('/api/tasks/<int:task_id>/execute', methods=['POST'])
@require_auth
def manual_execute(task_id):
    """Record a manual execution and trigger it on OpenClaw in the background."""
    trigger_user_id = g.current_user['id']

    # 1. Create the execution record (only for tasks that exist).
    # NOTE(review): any authenticated user may trigger any task — confirm intent.
    with closing(db.get_conn()) as conn, closing(conn.cursor()) as cur:
        cur.execute("SELECT id FROM tasks WHERE id=%s", (task_id,))
        if not cur.fetchone():
            return jsonify({'error': '任务不存在'}), 404
        cur.execute("""
            INSERT INTO task_executions
              (task_id, trigger_type, trigger_user_id, status, started_at)
            VALUES (%s, 'manual', %s, 'running', NOW())
        """, (task_id, trigger_user_id))
        exec_id = cur.lastrowid
        conn.commit()

    # 2. Call OpenClaw asynchronously so the front end is not blocked.
    def _run():
        """Trigger the run; mark the execution failed when OpenClaw rejects it."""
        res = openclaw.trigger_once(task_id, exec_id)
        if res['ok']:
            return
        try:
            with closing(db.get_conn()) as c, closing(c.cursor()) as cu:
                cu.execute("""
                    UPDATE task_executions
                    SET status='failed', ended_at=NOW(), error_message=%s
                    WHERE id=%s
                """, (res['message'], exec_id))
                c.commit()
        except Exception:
            app.logger.exception("could not mark execution %s as failed", exec_id)

    threading.Thread(target=_run, daemon=True).start()
    return jsonify({'execution_id': exec_id, 'message': '已触发手动执行,正在调用 OpenClaw...'})


@app.route('/api/tasks/<int:task_id>/executions')
@require_auth
def task_executions(task_id):
    """Return the 20 most recent executions of a task."""
    # NOTE(review): not restricted to the caller's own tasks — confirm intent.
    rows = _query_all("""
        SELECT id, trigger_type, status, started_at, ended_at,
               duration_ms, retry_attempt, error_message,
               cpu_usage_pct, memory_usage_pct
        FROM task_executions
        WHERE task_id=%s
        ORDER BY started_at DESC
        LIMIT 20
    """, (task_id,))
    return jsonify(db.serialize_rows(rows))


@app.route('/api/tasks/<int:task_id>/logs')
@require_auth
def task_logs(task_id):
    """Return the logs of a task's latest execution, optionally filtered by ``level``."""
    # NOTE(review): not restricted to the caller's own tasks — confirm intent.
    level = request.args.get('level', 'all')
    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute(
            "SELECT id FROM task_executions WHERE task_id=%s ORDER BY started_at DESC LIMIT 1",
            (task_id,),
        )
        latest = cur.fetchone()
        if not latest:
            return jsonify([])
        sql = ("SELECT log_level, log_time, message, node_name "
               "FROM task_logs WHERE execution_id=%s")
        params = [latest['id']]
        if level != 'all':
            sql += " AND log_level=%s"
            params.append(level)
        sql += " ORDER BY log_time ASC"
        cur.execute(sql, params)
        rows = cur.fetchall()
    return jsonify(db.serialize_rows(rows))


# ─────────────────────────────────────────────
# App space -- agent marketplace (public content, but still behind login)
# ─────────────────────────────────────────────
@app.route('/api/agents')
@require_auth
def list_agents():
    """List active public agents, optionally filtered by ``keyword``."""
    keyword = request.args.get('keyword', '').strip()
    sql = """
        SELECT id, agent_code, name, provider, category, description, usage_count, tags
        FROM agents
        WHERE is_public=1 AND status='active'
    """
    params = []
    if keyword:
        sql += " AND (name LIKE %s OR description LIKE %s)"
        params = [f'%{keyword}%', f'%{keyword}%']
    sql += " ORDER BY usage_count DESC"
    return jsonify(_query_all(sql, params or None))


# ─────────────────────────────────────────────
# App space -- task template marketplace (publicly shared)
# ─────────────────────────────────────────────
@app.route('/api/task_templates')
@require_auth
def list_templates():
    """List active task templates, optionally filtered by ``keyword``."""
    keyword = request.args.get('keyword', '').strip()
    sql = """
        SELECT tt.id, tt.title, tt.description, tt.category, tt.tags,
               tt.usage_count, tt.contributor_alias,
               u.display_name AS contributor_name
        FROM task_templates tt
        JOIN users u ON tt.contributor_id=u.id
        WHERE tt.is_active=1
    """
    params = []
    if keyword:
        sql += " AND (tt.title LIKE %s OR tt.description LIKE %s)"
        params = [f'%{keyword}%', f'%{keyword}%']
    sql += " ORDER BY tt.usage_count DESC"
    return jsonify(_query_all(sql, params or None))


@app.route('/api/task_templates', methods=['POST'])
@require_auth
def publish_template():
    """Publish a task template contributed by the caller; ``title`` is required."""
    data = _request_data()
    title = str(data.get('title') or '').strip()
    if not title:
        return jsonify({'error': '模板标题不能为空'}), 400
    new_id = _run_write("""
        INSERT INTO task_templates
          (title, description, category, contributor_id, contributor_alias, task_config)
        VALUES (%s, %s, %s, %s, %s, %s)
    """, (
        title,
        data.get('description', ''),
        data.get('category', '情报处理'),
        g.current_user['id'],
        data.get('contributor_alias', g.current_user['name']),
        db.to_json(data.get('task_config', {})),
    ))
    return jsonify({'id': new_id, 'message': '模板发布成功'}), 201


@app.route('/api/task_templates/<int:tpl_id>/import', methods=['POST'])
@require_auth
def import_template(tpl_id):
    """Create a stopped task for the caller from a template and bump its usage count."""
    import json as _json
    owner_id = g.current_user['id']
    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        cur.execute("SELECT * FROM task_templates WHERE id=%s", (tpl_id,))
        tpl = cur.fetchone()
        if not tpl:
            return jsonify({'error': 'template not found'}), 404

        # task_config is written via db.to_json, so it may come back as JSON text.
        config = tpl.get('task_config') or {}
        if isinstance(config, (str, bytes, bytearray)):
            try:
                config = _json.loads(config)
            except ValueError:
                config = {}
        if not isinstance(config, dict):
            config = {}

        cur.execute("""
            INSERT INTO tasks
              (owner_id, name, cron_expression, agent_id, trigger_type, status, source_template_id)
            VALUES (%s, %s, %s, NULL, 'cron', 'stopped', %s)
        """, (
            owner_id,
            tpl['title'] + ' (导入)',
            config.get('cron_expression', '未配置'),
            tpl_id,
        ))
        # Capture the id now: the UPDATE below resets the cursor's lastrowid.
        new_task_id = cur.lastrowid
        cur.execute("UPDATE task_templates SET usage_count=usage_count+1 WHERE id=%s", (tpl_id,))
        conn.commit()
    return jsonify({'task_id': new_task_id, 'message': '导入成功'})


# ─────────────────────────────────────────────
# Team collaboration space
# NOTE(review): these endpoints check login only, not team membership or role.
# ─────────────────────────────────────────────
@app.route('/api/teams/<int:team_id>/members')
@require_auth
def team_members(team_id):
    """List a team's members in join order."""
    rows = _query_all("""
        SELECT u.id, u.display_name AS name, u.avatar_color AS color,
               tm.role, tm.is_online AS online, tm.joined_at
        FROM team_members tm
        JOIN users u ON tm.user_id=u.id
        WHERE tm.team_id=%s
        ORDER BY tm.joined_at ASC
    """, (team_id,))
    return jsonify(db.serialize_rows(rows))


@app.route('/api/teams/<int:team_id>/members/<int:user_id>/role', methods=['PUT'])
@require_auth
def change_member_role(team_id, user_id):
    """Set a member's role to ``admin``, ``editor`` or ``viewer``."""
    role = _request_data().get('role')
    if role not in ('admin', 'editor', 'viewer'):
        return jsonify({'error': 'invalid role'}), 400
    _run_write(
        "UPDATE team_members SET role=%s WHERE team_id=%s AND user_id=%s",
        (role, team_id, user_id),
    )
    return jsonify({'message': '角色已更新'})


@app.route('/api/teams/<int:team_id>/members/<int:user_id>', methods=['DELETE'])
@require_auth
def remove_member(team_id, user_id):
    """Remove a user from a team."""
    _run_write(
        "DELETE FROM team_members WHERE team_id=%s AND user_id=%s",
        (team_id, user_id),
    )
    return jsonify({'message': '成员已移除'})


@app.route('/api/teams/<int:team_id>/members', methods=['POST'])
@require_auth
def invite_member(team_id):
    """Add a user (looked up by display name) to a team as a viewer.

    When no user has that display name, a stub account is created. The stub
    gets an unusable random bcrypt hash: a fixed literal would be accepted as
    a real password by the plaintext fallback in the login endpoint.
    """
    import secrets
    import string
    data = _request_data()
    name = str(data.get('name') or '').strip()
    if not name:
        return jsonify({'error': 'name required'}), 400

    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        # LIMIT 1: display names are not guaranteed unique, and unread rows
        # would block the next statement on this connection.
        cur.execute("SELECT id FROM users WHERE display_name=%s LIMIT 1", (name,))
        user = cur.fetchone()
        if user:
            user_id = user['id']
        else:
            uname = 'user_' + ''.join(secrets.choice(string.digits) for _ in range(6))
            unusable_hash = bcrypt.hashpw(
                secrets.token_urlsafe(32).encode(), bcrypt.gensalt()
            ).decode()
            cur.execute("""
                INSERT INTO users (username, display_name, password_hash, avatar_color)
                VALUES (%s, %s, %s, %s)
            """, (uname, name, unusable_hash, data.get('color', '#3b82f6')))
            user_id = cur.lastrowid
            conn.commit()

        # INSERT IGNORE keeps the call idempotent when the user is already a member.
        cur.execute(
            "INSERT IGNORE INTO team_members (team_id, user_id, role) VALUES (%s, %s, 'viewer')",
            (team_id, user_id),
        )
        conn.commit()
    return jsonify({'user_id': user_id, 'message': f'{name} 已加入团队'})


@app.route('/api/teams/<int:team_id>/shared_resources')
@require_auth
def shared_resources(team_id):
    """List a team's active shared resources, newest first."""
    rows = _query_all("""
        SELECT sr.id, sr.resource_type, sr.resource_id, sr.resource_name,
               sr.usage_count, u.display_name AS creator, sr.shared_at
        FROM team_shared_resources sr
        JOIN users u ON sr.shared_by=u.id
        WHERE sr.team_id=%s AND sr.is_active=1
        ORDER BY sr.shared_at DESC
    """, (team_id,))
    return jsonify(db.serialize_rows(rows))


@app.route('/api/teams/<int:team_id>/shared_resources/<int:res_id>', methods=['DELETE'])
@require_auth
def unshare_resource(team_id, res_id):
    """Soft-delete a shared resource by clearing its ``is_active`` flag."""
    _run_write(
        "UPDATE team_shared_resources SET is_active=0 WHERE id=%s AND team_id=%s",
        (res_id, team_id),
    )
    return jsonify({'message': '已取消共享'})


@app.route('/api/teams/<int:team_id>/audit_logs')
@require_auth
def audit_logs(team_id):
    """Return a team's 30 most recent audit log entries."""
    rows = _query_all("""
        SELECT al.id, u.display_name AS user, al.action, al.resource_type, al.created_at
        FROM team_audit_logs al
        JOIN users u ON al.user_id=u.id
        WHERE al.team_id=%s
        ORDER BY al.created_at DESC
        LIMIT 30
    """, (team_id,))
    return jsonify(db.serialize_rows(rows))


@app.route('/api/teams/<int:team_id>/comments')
@require_auth
def team_comments(team_id):
    """Return a team's non-deleted comments, oldest first."""
    rows = _query_all("""
        SELECT c.id, u.display_name AS user, u.avatar_color AS color,
               c.content AS text, c.created_at AS time, c.parent_id
        FROM team_comments c
        JOIN users u ON c.user_id=u.id
        WHERE c.team_id=%s AND c.is_deleted=0
        ORDER BY c.created_at ASC
    """, (team_id,))
    return jsonify(db.serialize_rows(rows))


@app.route('/api/teams/<int:team_id>/comments', methods=['POST'])
@require_auth
def post_comment(team_id):
    """Post a comment (optionally a reply via ``parent_id``) as the caller."""
    data = _request_data()
    content = data.get('content')
    if not isinstance(content, str) or not content.strip():
        return jsonify({'error': '留言内容不能为空'}), 400
    new_id = _run_write("""
        INSERT INTO team_comments (team_id, user_id, content, parent_id)
        VALUES (%s, %s, %s, %s)
    """, (team_id, g.current_user['id'], content, data.get('parent_id')))
    return jsonify({'id': new_id, 'message': '留言成功'})


@app.route('/api/teams/<int:team_id>/info')
@require_auth
def team_info(team_id):
    """Return a team's basic information; 404 if it does not exist."""
    row = _query_one(
        "SELECT id, team_code, team_name, description FROM teams WHERE id=%s",
        (team_id,),
    )
    if not row:
        return jsonify({'error': 'not found'}), 404
    return jsonify(row)


# Team the current user belongs to
@app.route('/api/my/team')
@require_auth
def my_team():
    """Return the first team the caller joined, or a default placeholder team."""
    row = _query_one("""
        SELECT t.id, t.team_code, t.team_name
        FROM team_members tm
        JOIN teams t ON tm.team_id=t.id
        WHERE tm.user_id=%s
        ORDER BY tm.joined_at ASC
        LIMIT 1
    """, (g.current_user['id'],))
    if not row:
        return jsonify({'id': 1, 'team_code': 'DEFAULT', 'team_name': '默认团队'})
    return jsonify(row)


# ─────────────────────────────────────────────
# Core configuration center
# ─────────────────────────────────────────────
@app.route('/api/datasources')
@require_auth
def list_datasources():
    """List the caller's data sources (credentials are never returned)."""
    rows = _query_all("""
        SELECT id, name, ds_type, host, database_name, username,
               connection_status, last_tested_at
        FROM datasources
        WHERE owner_id=%s
        ORDER BY created_at DESC
    """, (g.current_user['id'],))
    return jsonify(db.serialize_rows(rows))


@app.route('/api/datasources/<int:ds_id>', methods=['PUT'])
@require_auth
def update_datasource(ds_id):
    """Update host/username/password of one of the caller's data sources.

    Returns 404 when the data source does not exist or belongs to someone else.
    """
    data = _request_data()
    password = data.get('password', '')
    enc = db.simple_encrypt(password) if password else None

    assignments, values = [], []
    if 'host' in data:
        assignments.append('host=%s')
        values.append(data['host'])
    if 'username' in data:
        assignments.append('username=%s')
        values.append(data['username'])
    if enc:
        assignments.append('credential_enc=%s')
        values.append(enc)

    with closing(db.get_conn()) as conn, closing(conn.cursor()) as cur:
        cur.execute(
            "SELECT id FROM datasources WHERE id=%s AND owner_id=%s",
            (ds_id, g.current_user['id']),
        )
        if not cur.fetchone():
            return jsonify({'error': '数据源不存在'}), 404
        if assignments:
            values.append(ds_id)
            cur.execute(f"UPDATE datasources SET {','.join(assignments)} WHERE id=%s", values)
            conn.commit()
    return jsonify({'message': '凭证已保存'})


@app.route('/api/datasources', methods=['POST'])
@require_auth
def create_datasource():
    """Create a data source owned by the caller; ``name`` is required."""
    data = _request_data()
    name = str(data.get('name') or '').strip()
    if not name:
        return jsonify({'error': '数据源名称不能为空'}), 400
    new_id = _run_write("""
        INSERT INTO datasources
          (owner_id, name, ds_type, host, database_name, username, credential_enc, connection_status)
        VALUES (%s, %s, %s, %s, %s, %s, %s, 'unknown')
    """, (
        g.current_user['id'], name,
        data.get('ds_type', 'mysql'),
        data.get('host', ''),
        data.get('database_name', ''),
        data.get('username', ''),
        db.simple_encrypt(data.get('password', '')),
    ))
    return jsonify({'id': new_id, 'message': '数据源已创建'}), 201


@app.route('/api/ai_engines')
@require_auth
def list_ai_engines():
    """List all configured AI engines."""
    rows = _query_all(
        "SELECT id, engine_type, name, endpoint, description, status, last_checked_at FROM ai_engines"
    )
    return jsonify(db.serialize_rows(rows))


@app.route('/api/ai_engines/<int:eng_id>/restart', methods=['POST'])
@require_auth
def restart_engine(eng_id):
    """Mark an engine as ``ready`` and refresh its ``last_checked_at`` timestamp."""
    _run_write(
        "UPDATE ai_engines SET status='ready', last_checked_at=NOW() WHERE id=%s",
        (eng_id,),
    )
    return jsonify({'message': '重启指令已发送'})


@app.route('/api/alerts')
@require_auth
def list_alerts():
    """Return the caller's 20 most recent alert notifications."""
    rows = _query_all("""
        SELECT id, alert_type, title, content, is_read, created_at
        FROM alert_notifications
        WHERE target_user_id=%s
        ORDER BY created_at DESC
        LIMIT 20
    """, (g.current_user['id'],))
    return jsonify(db.serialize_rows(rows))


@app.route('/api/alerts/read_all', methods=['POST'])
@require_auth
def mark_alerts_read():
    """Mark all of the caller's alerts as read."""
    _run_write(
        "UPDATE alert_notifications SET is_read=1 WHERE target_user_id=%s",
        (g.current_user['id'],),
    )
    return jsonify({'message': '全部已读'})


# ─────────────────────────────────────────────
# OpenClaw callback endpoints (no login required; called directly by OpenClaw)
# ─────────────────────────────────────────────
from contextlib import closing  # section-local import so this block is self-contained


@app.route('/api/openclaw/callback/<int:task_id>', methods=['POST'])
def openclaw_callback(task_id):
    """Record the result and logs of a finished execution, as reported by OpenClaw.

    Example request body::

        {
          "execution_id": 123,          # Lingshu-side execution id (optional, preferred)
          "job_id": "lingshu_task_1_exec_123",
          "status": "success",          # success / failed / timeout
          "duration_ms": 1500,
          "cpu_usage_pct": 35.2,
          "memory_usage_pct": 28.1,
          "error_message": "",
          "logs": [
            {"level": "info", "time": "2026-01-01 12:00:00.000", "message": "xxx", "node": "step1"},
            ...
          ]
        }

    Without ``execution_id`` the task's most recent ``running`` execution is
    used. Returns 400 for an unknown status and 404 when no matching
    execution exists for this task.

    NOTE(review): this endpoint is unauthenticated by design; consider a
    shared secret so that only OpenClaw can post results.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}
    status = data.get('status', 'success')
    if status not in ('success', 'failed', 'timeout'):
        return jsonify({'ok': False, 'message': 'invalid status'}), 400
    duration_ms = data.get('duration_ms')
    cpu = data.get('cpu_usage_pct')
    mem = data.get('memory_usage_pct')
    error_msg = data.get('error_message', '')
    logs = data.get('logs')
    if not isinstance(logs, list):
        logs = []
    exec_id_from_cb = data.get('execution_id')

    with closing(db.get_conn()) as conn, closing(conn.cursor(dictionary=True)) as cur:
        # Locate the execution record; a supplied id must belong to this task.
        if exec_id_from_cb:
            cur.execute(
                "SELECT id FROM task_executions WHERE id=%s AND task_id=%s",
                (exec_id_from_cb, task_id),
            )
        else:
            # Fall back to the task's most recent execution that is still running.
            cur.execute("""
                SELECT id FROM task_executions
                WHERE task_id=%s AND status='running'
                ORDER BY started_at DESC LIMIT 1
            """, (task_id,))
        row = cur.fetchone()
        if not row:
            return jsonify({'ok': False, 'message': '未找到对应执行记录'}), 404
        exec_id = row['id']

        # Update the execution record.
        cur.execute("""
            UPDATE task_executions
            SET status=%s, ended_at=NOW(), duration_ms=%s,
                cpu_usage_pct=%s, memory_usage_pct=%s, error_message=%s
            WHERE id=%s
        """, (status, duration_ms, cpu, mem, error_msg or None, exec_id))

        # Update the task's last-run time.
        cur.execute("UPDATE tasks SET last_run_at=NOW() WHERE id=%s", (task_id,))

        # Store the logs; entries without a timestamp get the current time (ms precision).
        now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
        log_rows = [
            (
                exec_id, task_id,
                entry.get('level', 'info'),
                entry.get('time', now_str),
                entry.get('message', ''),
                entry.get('node'),
            )
            for entry in logs
            if isinstance(entry, dict)
        ]
        if log_rows:
            cur.executemany("""
                INSERT INTO task_logs (execution_id, task_id, log_level, log_time, message, node_name)
                VALUES (%s, %s, %s, %s, %s, %s)
            """, log_rows)

        # On failure, notify the task owner.
        if status in ('failed', 'timeout'):
            cur.execute("""
                INSERT INTO alert_notifications
                  (target_user_id, task_id, alert_type, title, content)
                SELECT owner_id, %s, 'task_fail', '任务执行失败',
                       CONCAT('任务执行', IF(%s='timeout','超时','失败'),
                              IFNULL(CONCAT(':', %s), ''))
                FROM tasks WHERE id=%s
            """, (task_id, status, error_msg or None, task_id))

        conn.commit()
    return jsonify({'ok': True, 'message': '回调已处理'})


@app.route('/api/openclaw/status', methods=['GET'])
def openclaw_status():
    """Report whether OpenClaw answers a ``cron.status`` call.

    Any exception from the call is reported as ``connected: False`` with the
    exception text as the message.
    """
    try:
        openclaw._acp_call('cron.status', {})
    except Exception as e:
        return jsonify({'connected': False, 'message': str(e), 'base_url': openclaw.OPENCLAW_BASE})
    return jsonify({'connected': True, 'message': 'OpenClaw 在线', 'base_url': openclaw.OPENCLAW_BASE})


# ─────────────────────────────────────────────
# Utility functions
# ─────────────────────────────────────────────
def _audit_log(user_id, team_id, action, res_type=None, res_id=None):
    """Write a team audit log entry, best effort.

    Nothing is written when ``team_id`` is falsy. Database errors are logged
    and swallowed so that auditing never fails the calling request.
    """
    if not team_id:
        return
    try:
        with closing(db.get_conn()) as conn, closing(conn.cursor()) as cur:
            cur.execute("""
                INSERT INTO team_audit_logs (team_id, user_id, action, resource_type, resource_id)
                VALUES (%s, %s, %s, %s, %s)
            """, (team_id, user_id, action, res_type, res_id))
            conn.commit()
    except Exception:
        app.logger.exception("could not write audit log entry for team %s", team_id)


if __name__ == '__main__':
    # The Werkzeug debugger allows arbitrary code execution, so it must not be
    # enabled by default on a server bound to all interfaces; opt in explicitly.
    debug_enabled = os.environ.get('LINGSHU_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled, use_reloader=False)