You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1133 lines
41 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
灵枢智能任务平台 —— Flask 后端服务
数据库: MySQL 8.0 (lingshu)
"""
from flask import Flask, request, jsonify, send_from_directory, g
from flask_cors import CORS
import db
import os
import jwt
import bcrypt
import datetime
import threading
from functools import wraps
import openclaw
app = Flask(__name__, static_folder=os.path.join(os.path.dirname(__file__), '..'))
app.config['JSON_AS_ASCII'] = False
CORS(app)
JWT_SECRET = os.environ.get('LINGSHU_JWT_SECRET', 'lingshu-secret-key-change-in-prod')
JWT_EXPIRE_HOURS = 24 * 7 # 7天
# ─────────────────────────────────────────────
# JWT 认证装饰器
# ─────────────────────────────────────────────
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth_header = request.headers.get('Authorization', '')
if not auth_header.startswith('Bearer '):
return jsonify({'error': '未登录,请先登录'}), 401
token = auth_header[7:]
try:
payload = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
g.current_user = payload
except jwt.ExpiredSignatureError:
return jsonify({'error': 'Token 已过期,请重新登录'}), 401
except jwt.InvalidTokenError:
return jsonify({'error': 'Token 无效'}), 401
return f(*args, **kwargs)
return decorated
# ─────────────────────────────────────────────
# 静态文件(托管前端 HTML
# ─────────────────────────────────────────────
@app.route('/')
def index():
return send_from_directory(app.static_folder, '灵枢智能任务平台.html')
@app.route('/logo.png')
def logo():
return send_from_directory(app.static_folder, 'logo.png')
# ─────────────────────────────────────────────
# 认证接口
# ─────────────────────────────────────────────
@app.route('/api/auth/login', methods=['POST'])
def login():
data = request.get_json() or {}
username = data.get('username', '').strip()
password = data.get('password', '')
if not username or not password:
return jsonify({'error': '用户名和密码不能为空'}), 400
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT id, username, display_name, password_hash, avatar_color FROM users WHERE username=%s", (username,))
user = cur.fetchone()
cur.close(); conn.close()
if not user:
return jsonify({'error': '用户名或密码错误'}), 401
# 兼容 bcrypt hash 和明文(初始数据用明文,建议后续全部改为 bcrypt
ph = user['password_hash']
ok = False
if ph and ph.startswith('$2'):
try:
ok = bcrypt.checkpw(password.encode(), ph.encode())
except Exception:
ok = False
else:
ok = (password == ph)
if not ok:
return jsonify({'error': '用户名或密码错误'}), 401
payload = {
'id': user['id'],
'username': user['username'],
'name': user['display_name'],
'color': user['avatar_color'] or '#3b82f6',
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=JWT_EXPIRE_HOURS)
}
token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')
return jsonify({
'token': token,
'user': {
'id': user['id'],
'name': user['display_name'],
'username': user['username'],
'color': user['avatar_color'] or '#3b82f6',
'avatar': (user['display_name'] or '?')[0]
}
})
@app.route('/api/auth/me')
@require_auth
def auth_me():
u = g.current_user
return jsonify({
'id': u['id'],
'name': u['name'],
'username': u['username'],
'color': u['color'],
'avatar': (u['name'] or '?')[0]
})
@app.route('/api/auth/register', methods=['POST'])
def register():
"""注册新用户(管理员可用,生产环境可关闭此接口)"""
data = request.get_json() or {}
username = data.get('username', '').strip()
password = data.get('password', '')
display_name = data.get('display_name', username).strip()
if not username or not password:
return jsonify({'error': '用户名和密码不能为空'}), 400
if len(password) < 4:
return jsonify({'error': '密码至少4位'}), 400
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT id FROM users WHERE username=%s", (username,))
if cur.fetchone():
cur.close(); conn.close()
return jsonify({'error': '用户名已存在'}), 409
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
color = data.get('color', '#3b82f6')
cur2 = conn.cursor()
cur2.execute(
"INSERT INTO users (username, display_name, password_hash, avatar_color) VALUES (%s, %s, %s, %s)",
(username, display_name, hashed, color)
)
conn.commit()
new_id = cur2.lastrowid
cur.close(); cur2.close(); conn.close()
# 将新用户加入默认团队id=1
try:
conn2 = db.get_conn()
c2 = conn2.cursor()
c2.execute("INSERT IGNORE INTO team_members (team_id, user_id, role) VALUES (1, %s, 'viewer')", (new_id,))
conn2.commit()
c2.close(); conn2.close()
except Exception:
pass
return jsonify({'id': new_id, 'message': '注册成功'}), 201
# ─────────────────────────────────────────────
# 工作台 KPI
# ─────────────────────────────────────────────
@app.route('/api/dashboard/kpi')
@require_auth
def dashboard_kpi():
owner_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT COUNT(*) AS total, SUM(status='running') AS running FROM tasks WHERE owner_id=%s", (owner_id,))
row = cur.fetchone()
cur.execute("""
SELECT COUNT(*) AS monthly
FROM task_executions e
JOIN tasks t ON e.task_id = t.id
WHERE t.owner_id=%s
AND MONTH(e.started_at)=MONTH(CURDATE())
AND YEAR(e.started_at)=YEAR(CURDATE())
""", (owner_id,))
monthly = cur.fetchone()['monthly']
cur.execute("SELECT COUNT(*) AS alerts FROM alert_notifications WHERE target_user_id=%s AND is_read=0", (owner_id,))
alerts = cur.fetchone()['alerts']
cur.close(); conn.close()
return jsonify({
'running': int(row['running'] or 0),
'total': int(row['total'] or 0),
'monthly_triggers': monthly,
'alerts': alerts
})
@app.route('/api/dashboard/exec_trend')
@require_auth
def exec_trend():
days = int(request.args.get('days', 7))
owner_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT DATE(e.started_at) AS day,
SUM(e.status='success') AS success,
SUM(e.status='failed' OR e.status='timeout') AS fail
FROM task_executions e
JOIN tasks t ON e.task_id = t.id
WHERE t.owner_id=%s AND e.started_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
GROUP BY DATE(e.started_at)
ORDER BY day ASC
""", (owner_id, days))
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify([{'day': str(r['day']), 'success': int(r['success']), 'fail': int(r['fail'])} for r in rows])
@app.route('/api/dashboard/agent_usage')
@require_auth
def agent_usage():
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT name, usage_count FROM agents WHERE is_public=1 ORDER BY usage_count DESC LIMIT 6")
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify([{'name': r['name'], 'value': r['usage_count']} for r in rows])
@app.route('/api/dashboard/status_dist')
@require_auth
def status_dist():
owner_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT status, COUNT(*) AS cnt FROM tasks WHERE owner_id=%s GROUP BY status", (owner_id,))
rows = cur.fetchall()
cur.close(); conn.close()
label_map = {'running': '运行中', 'stopped': '已停止', 'error': '错误', 'draft': '草稿'}
return jsonify([{'name': label_map.get(r['status'], r['status']), 'value': r['cnt']} for r in rows])
# ─────────────────────────────────────────────
# 任务中心
# ─────────────────────────────────────────────
@app.route('/api/tasks')
@require_auth
def list_tasks():
import json as _json
owner_id = g.current_user['id']
keyword = request.args.get('keyword', '').strip()
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
if keyword:
cur.execute("""
SELECT t.id, t.name, t.cron_expression, t.status,
a.name AS agent, t.trigger_type, t.workflow_json
FROM tasks t LEFT JOIN agents a ON t.agent_id=a.id
WHERE t.owner_id=%s AND t.name LIKE %s
ORDER BY t.updated_at DESC
""", (owner_id, f'%{keyword}%'))
else:
cur.execute("""
SELECT t.id, t.name, t.cron_expression, t.status,
a.name AS agent, t.trigger_type, t.workflow_json
FROM tasks t LEFT JOIN agents a ON t.agent_id=a.id
WHERE t.owner_id=%s
ORDER BY t.updated_at DESC
""", (owner_id,))
rows = cur.fetchall()
cur.close(); conn.close()
result = []
for r in rows:
item = dict(r)
wj = item.pop('workflow_json', None)
try:
extra = _json.loads(wj) if wj else {}
except Exception:
extra = {}
item['description'] = extra.get('description', '')
item['category'] = extra.get('category', '')
item['skill_name'] = extra.get('skill_name', '')
item['timeout'] = extra.get('timeout', 300)
item['retry_count'] = extra.get('retry_count', 3)
result.append(item)
return jsonify(result)
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
@require_auth
def get_task(task_id):
import json as _json
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT t.*, a.name AS agent_name
FROM tasks t LEFT JOIN agents a ON t.agent_id=a.id
WHERE t.id=%s
""", (task_id,))
row = cur.fetchone()
cur.close(); conn.close()
if not row:
return jsonify({'error': 'not found'}), 404
wj = row.pop('workflow_json', None)
try:
extra = _json.loads(wj) if wj else {}
except Exception:
extra = {}
row['description'] = extra.get('description', '')
row['category'] = extra.get('category', '')
row['skill_name'] = extra.get('skill_name', '')
row['skill_params'] = extra.get('skill_params', {})
row['timeout'] = extra.get('timeout', 300)
row['retry_count'] = extra.get('retry_count', 3)
return jsonify(db.serialize_row(row))
@app.route('/api/tasks', methods=['POST'])
@require_auth
def create_task():
import json as _json
data = request.get_json() or {}
owner_id = g.current_user['id']
name = data.get('name', '').strip()
if not name:
return jsonify({'error': '任务名称不能为空'}), 400
# 触发方式:有 cron_expression 且不为空则为 cron否则 manual
cron_expr = data.get('cron_expression', '').strip()
trigger_type = 'cron' if cron_expr else 'manual'
# 扩展字段(先用 task_config 的 JSON 字段存储,待数据库结构扩展后迁移)
extra = {
'description': data.get('description', ''),
'category': data.get('category', ''),
'skill_name': data.get('skill_name', ''),
'skill_params': data.get('skill_params', {}),
'datasource_id': data.get('datasource_id'),
'timeout': data.get('timeout', 300),
'retry_count': data.get('retry_count', 3),
}
extra.update(data.get('task_config', {}))
# 获取用户所在团队
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT team_id FROM team_members WHERE user_id=%s LIMIT 1", (owner_id,))
tm = cur.fetchone()
team_id = tm['team_id'] if tm else None
cur2 = conn.cursor()
status = data.get('status', 'stopped')
if status not in ('stopped', 'draft'):
status = 'stopped'
cur2.execute("""
INSERT INTO tasks (owner_id, team_id, name, cron_expression, agent_id, trigger_type, status, workflow_json)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
""", (
owner_id,
team_id,
name,
cron_expr or None,
data.get('agent_id') or None,
trigger_type,
status,
_json.dumps(extra, ensure_ascii=False)
))
conn.commit()
new_id = cur2.lastrowid
cur.close(); cur2.close(); conn.close()
_audit_log(owner_id, team_id, f"创建了任务「{name}", 'task', new_id)
return jsonify({'id': new_id, 'message': '任务创建成功'}), 201
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
@require_auth
def update_task(task_id):
import json as _json
data = request.get_json() or {}
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
# 先读取现有 workflow_json用于合并扩展字段
cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
row = cur.fetchone()
try:
existing_extra = _json.loads(row['workflow_json']) if row and row['workflow_json'] else {}
except Exception:
existing_extra = {}
# 更新扩展字段到 workflow_json
for key in ['description', 'category', 'skill_name', 'skill_params', 'datasource_id', 'timeout', 'retry_count']:
if key in data:
existing_extra[key] = data[key]
cur2 = conn.cursor()
fields, vals = [], []
for key in ['name', 'cron_expression', 'agent_id', 'trigger_type']:
if key in data:
fields.append(f"{key}=%s")
vals.append(data[key])
fields.append("workflow_json=%s")
vals.append(_json.dumps(existing_extra, ensure_ascii=False))
vals.append(task_id)
cur2.execute(f"UPDATE tasks SET {','.join(fields)}, updated_at=NOW() WHERE id=%s", vals)
conn.commit()
cur.close(); cur2.close(); conn.close()
return jsonify({'message': '更新成功'})
@app.route('/api/tasks/<int:task_id>/copy', methods=['POST'])
@require_auth
def copy_task(task_id):
import json as _json
owner_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT * FROM tasks WHERE id=%s AND owner_id=%s", (task_id, owner_id))
task = cur.fetchone()
if not task:
cur.close(); conn.close()
return jsonify({'error': '任务不存在'}), 404
cur2 = conn.cursor()
cur2.execute("""
INSERT INTO tasks (owner_id, team_id, name, cron_expression, agent_id, trigger_type, status, workflow_json)
VALUES (%s, %s, %s, %s, %s, %s, 'stopped', %s)
""", (
owner_id,
task['team_id'],
task['name'] + '-副本',
task['cron_expression'],
task['agent_id'],
task['trigger_type'],
task['workflow_json']
))
conn.commit()
new_id = cur2.lastrowid
cur.close(); cur2.close(); conn.close()
_audit_log(owner_id, task['team_id'], f"复制了任务「{task['name']}", 'task', new_id)
return jsonify({'id': new_id, 'message': '复制成功'}), 201
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
@require_auth
def delete_task(task_id):
owner_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT id, name, team_id FROM tasks WHERE id=%s AND owner_id=%s", (task_id, owner_id))
task = cur.fetchone()
if not task:
cur.close(); conn.close()
return jsonify({'error': '任务不存在或无权删除'}), 404
# 先停止(如果运行中),再删除执行记录和任务
cur2 = conn.cursor()
cur2.execute("DELETE FROM task_logs WHERE execution_id IN (SELECT id FROM task_executions WHERE task_id=%s)", (task_id,))
cur2.execute("DELETE FROM task_executions WHERE task_id=%s", (task_id,))
cur2.execute("DELETE FROM tasks WHERE id=%s", (task_id,))
conn.commit()
cur.close(); cur2.close(); conn.close()
_audit_log(owner_id, task.get('team_id'), f"删除了任务「{task['name']}", 'task', task_id)
return jsonify({'message': '删除成功'})
@app.route('/api/tasks/<int:task_id>/status', methods=['PUT'])
@require_auth
def toggle_task_status(task_id):
import json as _json
data = request.get_json()
new_status = data.get('status')
if new_status not in ('running', 'stopped'):
return jsonify({'error': 'invalid status'}), 400
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT id, name, status, workflow_json FROM tasks WHERE id=%s", (task_id,))
task = cur.fetchone()
cur.close(); conn.close()
if not task:
return jsonify({'error': '任务不存在'}), 404
result_msg = f'状态已切换为 {new_status}'
openclaw_msg = ''
if new_status == 'running':
# 向 OpenClaw 注册调度(异步,不阻塞接口响应)
def _register():
res = openclaw.register_task(task_id)
if not res['ok']:
# 注册失败时将任务改回 stopped 并写告警
try:
c = db.get_conn()
cu = c.cursor()
cu.execute("UPDATE tasks SET status='stopped' WHERE id=%s", (task_id,))
cu.execute("""
INSERT INTO alert_notifications (target_user_id, task_id, alert_type, title, content)
SELECT owner_id, %s, 'task_fail', '任务启动失败', %s FROM tasks WHERE id=%s
""", (task_id, res['message'], task_id))
c.commit(); cu.close(); c.close()
except Exception:
pass
threading.Thread(target=_register, daemon=True).start()
openclaw_msg = '正在向 OpenClaw 注册调度...'
else: # stopped
# 通知 OpenClaw 移除调度
res = openclaw.unregister_task(task_id)
openclaw_msg = res['message']
conn2 = db.get_conn()
cur2 = conn2.cursor()
cur2.execute("UPDATE tasks SET status=%s WHERE id=%s", (new_status, task_id))
conn2.commit()
cur2.close(); conn2.close()
return jsonify({'message': result_msg, 'openclaw': openclaw_msg})
@app.route('/api/tasks/<int:task_id>/execute', methods=['POST'])
@require_auth
def manual_execute(task_id):
trigger_user_id = g.current_user['id']
# 1. 创建执行记录
conn = db.get_conn()
cur = conn.cursor()
cur.execute("""
INSERT INTO task_executions (task_id, trigger_type, trigger_user_id, status, started_at)
VALUES (%s, 'manual', %s, 'running', NOW())
""", (task_id, trigger_user_id))
conn.commit()
exec_id = cur.lastrowid
cur.close(); conn.close()
# 2. 异步调用 OpenClaw不阻塞前端响应
def _run():
res = openclaw.trigger_once(task_id, exec_id)
if not res['ok']:
try:
c = db.get_conn()
cu = c.cursor()
cu.execute("""
UPDATE task_executions SET status='failed', ended_at=NOW(), error_message=%s
WHERE id=%s
""", (res['message'], exec_id))
c.commit(); cu.close(); c.close()
except Exception:
pass
threading.Thread(target=_run, daemon=True).start()
return jsonify({'execution_id': exec_id, 'message': '已触发手动执行,正在调用 OpenClaw...'})
@app.route('/api/tasks/<int:task_id>/executions')
@require_auth
def task_executions(task_id):
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT id, trigger_type, status, started_at, ended_at, duration_ms,
retry_attempt, error_message, cpu_usage_pct, memory_usage_pct
FROM task_executions WHERE task_id=%s ORDER BY started_at DESC LIMIT 20
""", (task_id,))
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(db.serialize_rows(rows))
@app.route('/api/tasks/<int:task_id>/logs')
@require_auth
def task_logs(task_id):
level = request.args.get('level', 'all')
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT id FROM task_executions WHERE task_id=%s ORDER BY started_at DESC LIMIT 1", (task_id,))
exec_row = cur.fetchone()
if not exec_row:
cur.close(); conn.close()
return jsonify([])
exec_id = exec_row['id']
if level == 'all':
cur.execute("SELECT log_level, log_time, message, node_name FROM task_logs WHERE execution_id=%s ORDER BY log_time ASC", (exec_id,))
else:
cur.execute("SELECT log_level, log_time, message, node_name FROM task_logs WHERE execution_id=%s AND log_level=%s ORDER BY log_time ASC", (exec_id, level))
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(db.serialize_rows(rows))
# ─────────────────────────────────────────────
# 应用空间 —— 智能体广场(公共,不需个人认证也可浏览,但统一加认证)
# ─────────────────────────────────────────────
@app.route('/api/agents')
@require_auth
def list_agents():
keyword = request.args.get('keyword', '').strip()
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
if keyword:
cur.execute("""
SELECT id, agent_code, name, provider, category, description, usage_count, tags
FROM agents WHERE is_public=1 AND status='active'
AND (name LIKE %s OR description LIKE %s)
ORDER BY usage_count DESC
""", (f'%{keyword}%', f'%{keyword}%'))
else:
cur.execute("""
SELECT id, agent_code, name, provider, category, description, usage_count, tags
FROM agents WHERE is_public=1 AND status='active'
ORDER BY usage_count DESC
""")
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(rows)
# ─────────────────────────────────────────────
# 应用空间 —— 任务模板广场(公共共享)
# ─────────────────────────────────────────────
@app.route('/api/task_templates')
@require_auth
def list_templates():
keyword = request.args.get('keyword', '').strip()
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
if keyword:
cur.execute("""
SELECT tt.id, tt.title, tt.description, tt.category, tt.tags,
tt.usage_count, tt.contributor_alias, u.display_name AS contributor_name
FROM task_templates tt JOIN users u ON tt.contributor_id=u.id
WHERE tt.is_active=1 AND (tt.title LIKE %s OR tt.description LIKE %s)
ORDER BY tt.usage_count DESC
""", (f'%{keyword}%', f'%{keyword}%'))
else:
cur.execute("""
SELECT tt.id, tt.title, tt.description, tt.category, tt.tags,
tt.usage_count, tt.contributor_alias, u.display_name AS contributor_name
FROM task_templates tt JOIN users u ON tt.contributor_id=u.id
WHERE tt.is_active=1 ORDER BY tt.usage_count DESC
""")
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(rows)
@app.route('/api/task_templates', methods=['POST'])
@require_auth
def publish_template():
data = request.get_json()
owner_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor()
cur.execute("""
INSERT INTO task_templates (title, description, category, contributor_id, contributor_alias, task_config)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
data['title'],
data.get('description', ''),
data.get('category', '情报处理'),
owner_id,
data.get('contributor_alias', g.current_user['name']),
db.to_json(data.get('task_config', {}))
))
conn.commit()
new_id = cur.lastrowid
cur.close(); conn.close()
return jsonify({'id': new_id, 'message': '模板发布成功'}), 201
@app.route('/api/task_templates/<int:tpl_id>/import', methods=['POST'])
@require_auth
def import_template(tpl_id):
owner_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT * FROM task_templates WHERE id=%s", (tpl_id,))
tpl = cur.fetchone()
if not tpl:
cur.close(); conn.close()
return jsonify({'error': 'template not found'}), 404
config = tpl.get('task_config') or {}
cur2 = conn.cursor()
cur2.execute("""
INSERT INTO tasks (owner_id, name, cron_expression, agent_id, trigger_type, status, source_template_id)
VALUES (%s, %s, %s, NULL, 'cron', 'stopped', %s)
""", (
owner_id,
tpl['title'] + ' (导入)',
config.get('cron_expression', '未配置'),
tpl_id
))
cur2.execute("UPDATE task_templates SET usage_count=usage_count+1 WHERE id=%s", (tpl_id,))
conn.commit()
new_task_id = cur2.lastrowid
cur.close(); cur2.close(); conn.close()
return jsonify({'task_id': new_task_id, 'message': '导入成功'})
# ─────────────────────────────────────────────
# 团队协作空间
# ─────────────────────────────────────────────
@app.route('/api/teams/<int:team_id>/members')
@require_auth
def team_members(team_id):
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT u.id, u.display_name AS name, u.avatar_color AS color,
tm.role, tm.is_online AS online, tm.joined_at
FROM team_members tm JOIN users u ON tm.user_id=u.id
WHERE tm.team_id=%s ORDER BY tm.joined_at ASC
""", (team_id,))
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(db.serialize_rows(rows))
@app.route('/api/teams/<int:team_id>/members/<int:user_id>/role', methods=['PUT'])
@require_auth
def change_member_role(team_id, user_id):
data = request.get_json()
role = data.get('role')
if role not in ('admin', 'editor', 'viewer'):
return jsonify({'error': 'invalid role'}), 400
conn = db.get_conn()
cur = conn.cursor()
cur.execute("UPDATE team_members SET role=%s WHERE team_id=%s AND user_id=%s", (role, team_id, user_id))
conn.commit()
cur.close(); conn.close()
return jsonify({'message': '角色已更新'})
@app.route('/api/teams/<int:team_id>/members/<int:user_id>', methods=['DELETE'])
@require_auth
def remove_member(team_id, user_id):
conn = db.get_conn()
cur = conn.cursor()
cur.execute("DELETE FROM team_members WHERE team_id=%s AND user_id=%s", (team_id, user_id))
conn.commit()
cur.close(); conn.close()
return jsonify({'message': '成员已移除'})
@app.route('/api/teams/<int:team_id>/members', methods=['POST'])
@require_auth
def invite_member(team_id):
data = request.get_json()
name = data.get('name', '').strip()
if not name:
return jsonify({'error': 'name required'}), 400
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT id FROM users WHERE display_name=%s", (name,))
user = cur.fetchone()
if not user:
import random, string
uname = 'user_' + ''.join(random.choices(string.digits, k=6))
cur2 = conn.cursor()
cur2.execute("""
INSERT INTO users (username, display_name, password_hash, avatar_color)
VALUES (%s, %s, 'placeholder', %s)
""", (uname, name, data.get('color', '#3b82f6')))
conn.commit()
user_id = cur2.lastrowid
cur2.close()
else:
user_id = user['id']
cur3 = conn.cursor()
try:
cur3.execute("INSERT INTO team_members (team_id, user_id, role) VALUES (%s, %s, 'viewer')", (team_id, user_id))
conn.commit()
except Exception:
pass
cur3.close(); cur.close(); conn.close()
return jsonify({'user_id': user_id, 'message': f'{name} 已加入团队'})
@app.route('/api/teams/<int:team_id>/shared_resources')
@require_auth
def shared_resources(team_id):
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT sr.id, sr.resource_type, sr.resource_id, sr.resource_name,
sr.usage_count, u.display_name AS creator, sr.shared_at
FROM team_shared_resources sr JOIN users u ON sr.shared_by=u.id
WHERE sr.team_id=%s AND sr.is_active=1 ORDER BY sr.shared_at DESC
""", (team_id,))
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(db.serialize_rows(rows))
@app.route('/api/teams/<int:team_id>/shared_resources/<int:res_id>', methods=['DELETE'])
@require_auth
def unshare_resource(team_id, res_id):
conn = db.get_conn()
cur = conn.cursor()
cur.execute("UPDATE team_shared_resources SET is_active=0 WHERE id=%s AND team_id=%s", (res_id, team_id))
conn.commit()
cur.close(); conn.close()
return jsonify({'message': '已取消共享'})
@app.route('/api/teams/<int:team_id>/audit_logs')
@require_auth
def audit_logs(team_id):
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT al.id, u.display_name AS user, al.action, al.resource_type, al.created_at
FROM team_audit_logs al JOIN users u ON al.user_id=u.id
WHERE al.team_id=%s ORDER BY al.created_at DESC LIMIT 30
""", (team_id,))
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(db.serialize_rows(rows))
@app.route('/api/teams/<int:team_id>/comments')
@require_auth
def team_comments(team_id):
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT c.id, u.display_name AS user, u.avatar_color AS color,
c.content AS text, c.created_at AS time, c.parent_id
FROM team_comments c JOIN users u ON c.user_id=u.id
WHERE c.team_id=%s AND c.is_deleted=0 ORDER BY c.created_at ASC
""", (team_id,))
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(db.serialize_rows(rows))
@app.route('/api/teams/<int:team_id>/comments', methods=['POST'])
@require_auth
def post_comment(team_id):
data = request.get_json()
user_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor()
cur.execute("""
INSERT INTO team_comments (team_id, user_id, content, parent_id)
VALUES (%s, %s, %s, %s)
""", (team_id, user_id, data['content'], data.get('parent_id')))
conn.commit()
new_id = cur.lastrowid
cur.close(); conn.close()
return jsonify({'id': new_id, 'message': '留言成功'})
@app.route('/api/teams/<int:team_id>/info')
@require_auth
def team_info(team_id):
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT id, team_code, team_name, description FROM teams WHERE id=%s", (team_id,))
row = cur.fetchone()
cur.close(); conn.close()
if not row:
return jsonify({'error': 'not found'}), 404
return jsonify(row)
# 获取当前用户所属团队
@app.route('/api/my/team')
@require_auth
def my_team():
user_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT t.id, t.team_code, t.team_name
FROM team_members tm JOIN teams t ON tm.team_id=t.id
WHERE tm.user_id=%s ORDER BY tm.joined_at ASC LIMIT 1
""", (user_id,))
row = cur.fetchone()
cur.close(); conn.close()
if not row:
return jsonify({'id': 1, 'team_code': 'DEFAULT', 'team_name': '默认团队'})
return jsonify(row)
# ─────────────────────────────────────────────
# 核心配置中心
# ─────────────────────────────────────────────
@app.route('/api/datasources')
@require_auth
def list_datasources():
owner_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT id, name, ds_type, host, database_name, username, connection_status, last_tested_at
FROM datasources WHERE owner_id=%s ORDER BY created_at DESC
""", (owner_id,))
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(db.serialize_rows(rows))
@app.route('/api/datasources/<int:ds_id>', methods=['PUT'])
@require_auth
def update_datasource(ds_id):
data = request.get_json()
conn = db.get_conn()
cur = conn.cursor()
password = data.get('password', '')
enc = db.simple_encrypt(password) if password else None
fields, vals = [], []
if 'host' in data: fields.append('host=%s'); vals.append(data['host'])
if 'username' in data: fields.append('username=%s'); vals.append(data['username'])
if enc: fields.append('credential_enc=%s'); vals.append(enc)
if fields:
vals.append(ds_id)
cur.execute(f"UPDATE datasources SET {','.join(fields)} WHERE id=%s", vals)
conn.commit()
cur.close(); conn.close()
return jsonify({'message': '凭证已保存'})
@app.route('/api/datasources', methods=['POST'])
@require_auth
def create_datasource():
data = request.get_json()
owner_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor()
enc = db.simple_encrypt(data.get('password', ''))
cur.execute("""
INSERT INTO datasources (owner_id, name, ds_type, host, database_name, username, credential_enc, connection_status)
VALUES (%s, %s, %s, %s, %s, %s, %s, 'unknown')
""", (
owner_id,
data['name'], data.get('ds_type', 'mysql'),
data.get('host', ''), data.get('database_name', ''),
data.get('username', ''), enc
))
conn.commit()
new_id = cur.lastrowid
cur.close(); conn.close()
return jsonify({'id': new_id, 'message': '数据源已创建'}), 201
@app.route('/api/ai_engines')
@require_auth
def list_ai_engines():
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("SELECT id, engine_type, name, endpoint, description, status, last_checked_at FROM ai_engines")
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(db.serialize_rows(rows))
@app.route('/api/ai_engines/<int:eng_id>/restart', methods=['POST'])
@require_auth
def restart_engine(eng_id):
conn = db.get_conn()
cur = conn.cursor()
cur.execute("UPDATE ai_engines SET status='ready', last_checked_at=NOW() WHERE id=%s", (eng_id,))
conn.commit()
cur.close(); conn.close()
return jsonify({'message': '重启指令已发送'})
@app.route('/api/alerts')
@require_auth
def list_alerts():
user_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
cur.execute("""
SELECT id, alert_type, title, content, is_read, created_at
FROM alert_notifications WHERE target_user_id=%s
ORDER BY created_at DESC LIMIT 20
""", (user_id,))
rows = cur.fetchall()
cur.close(); conn.close()
return jsonify(db.serialize_rows(rows))
@app.route('/api/alerts/read_all', methods=['POST'])
@require_auth
def mark_alerts_read():
user_id = g.current_user['id']
conn = db.get_conn()
cur = conn.cursor()
cur.execute("UPDATE alert_notifications SET is_read=1 WHERE target_user_id=%s", (user_id,))
conn.commit()
cur.close(); conn.close()
return jsonify({'message': '全部已读'})
# ─────────────────────────────────────────────
# OpenClaw 回调接口(无需登录验证,由 OpenClaw 直接调用)
# ─────────────────────────────────────────────
@app.route('/api/openclaw/callback/<int:task_id>', methods=['POST'])
def openclaw_callback(task_id):
"""
OpenClaw 执行完成后回调此接口,写入执行结果和日志。
请求体示例:
{
"execution_id": 123, # 灵枢侧执行记录ID可选优先用
"job_id": "lingshu_task_1_exec_123",
"status": "success", # success / failed / timeout
"duration_ms": 1500,
"cpu_usage_pct": 35.2,
"memory_usage_pct": 28.1,
"error_message": "",
"logs": [
{"level": "info", "time": "2026-01-01 12:00:00.000", "message": "xxx", "node": "step1"},
...
]
}
"""
import json as _json
data = request.get_json(silent=True) or {}
status = data.get('status', 'success')
duration_ms = data.get('duration_ms')
cpu = data.get('cpu_usage_pct')
mem = data.get('memory_usage_pct')
error_msg = data.get('error_message', '')
logs = data.get('logs', [])
exec_id_from_cb = data.get('execution_id')
# 找到对应的执行记录
conn = db.get_conn()
cur = conn.cursor(dictionary=True)
if exec_id_from_cb:
exec_id = exec_id_from_cb
else:
# 取该任务最近一条 running 的执行记录
cur.execute("""
SELECT id FROM task_executions
WHERE task_id=%s AND status='running'
ORDER BY started_at DESC LIMIT 1
""", (task_id,))
row = cur.fetchone()
exec_id = row['id'] if row else None
if not exec_id:
cur.close(); conn.close()
return jsonify({'ok': False, 'message': '未找到对应执行记录'}), 404
# 更新执行记录
cur2 = conn.cursor()
cur2.execute("""
UPDATE task_executions
SET status=%s, ended_at=NOW(), duration_ms=%s,
cpu_usage_pct=%s, memory_usage_pct=%s, error_message=%s
WHERE id=%s
""", (status, duration_ms, cpu, mem, error_msg or None, exec_id))
# 更新任务最后运行时间
cur2.execute("UPDATE tasks SET last_run_at=NOW() WHERE id=%s", (task_id,))
# 写入日志
now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
for log in logs:
cur2.execute("""
INSERT INTO task_logs (execution_id, task_id, log_level, log_time, message, node_name)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
exec_id, task_id,
log.get('level', 'info'),
log.get('time', now_str),
log.get('message', ''),
log.get('node')
))
# 失败时写告警通知
if status in ('failed', 'timeout'):
cur2.execute("""
INSERT INTO alert_notifications (target_user_id, task_id, alert_type, title, content)
SELECT owner_id, %s, 'task_fail', '任务执行失败',
CONCAT('任务执行', IF(%s='timeout','超时','失败'), IFNULL(CONCAT('', %s), ''))
FROM tasks WHERE id=%s
""", (task_id, status, error_msg or None, task_id))
conn.commit()
cur.close(); cur2.close(); conn.close()
return jsonify({'ok': True, 'message': '回调已处理'})
@app.route('/api/openclaw/status', methods=['GET'])
def openclaw_status():
"""检查 OpenClaw 连接状态:通过 WebSocket 握手探测"""
try:
result = openclaw._acp_call('cron.status', {})
return jsonify({'connected': True, 'message': 'OpenClaw 在线',
'base_url': openclaw.OPENCLAW_BASE})
except Exception as e:
err = str(e)
connected = 'not reachable' not in err.lower() and 'ConnectionRefusedError' not in err
return jsonify({'connected': False, 'message': err,
'base_url': openclaw.OPENCLAW_BASE})
# ─────────────────────────────────────────────
# 工具函数
# ─────────────────────────────────────────────
def _audit_log(user_id, team_id, action, res_type=None, res_id=None):
if not team_id:
return
try:
conn = db.get_conn()
cur = conn.cursor()
cur.execute("""
INSERT INTO team_audit_logs (team_id, user_id, action, resource_type, resource_id)
VALUES (%s, %s, %s, %s, %s)
""", (team_id, user_id, action, res_type, res_id))
conn.commit()
cur.close(); conn.close()
except Exception:
pass
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)