|
|
"""
|
|
|
灵枢智能任务平台 —— Flask 后端服务
|
|
|
数据库: MySQL 8.0 (lingshu)
|
|
|
"""
|
|
|
from flask import Flask, request, jsonify, send_from_directory, g
|
|
|
from flask_cors import CORS
|
|
|
import db
|
|
|
import os
|
|
|
import jwt
|
|
|
import bcrypt
|
|
|
import datetime
|
|
|
import threading
|
|
|
from functools import wraps
|
|
|
import openclaw
|
|
|
|
|
|
app = Flask(__name__, static_folder=os.path.join(os.path.dirname(__file__), '..'))
|
|
|
app.config['JSON_AS_ASCII'] = False
|
|
|
CORS(app)
|
|
|
|
|
|
JWT_SECRET = os.environ.get('LINGSHU_JWT_SECRET', 'lingshu-secret-key-change-in-prod')
|
|
|
JWT_EXPIRE_HOURS = 24 * 7 # 7天
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# JWT 认证装饰器
|
|
|
# ─────────────────────────────────────────────
|
|
|
def require_auth(f):
|
|
|
@wraps(f)
|
|
|
def decorated(*args, **kwargs):
|
|
|
auth_header = request.headers.get('Authorization', '')
|
|
|
if not auth_header.startswith('Bearer '):
|
|
|
return jsonify({'error': '未登录,请先登录'}), 401
|
|
|
token = auth_header[7:]
|
|
|
try:
|
|
|
payload = jwt.decode(token, JWT_SECRET, algorithms=['HS256'])
|
|
|
g.current_user = payload
|
|
|
except jwt.ExpiredSignatureError:
|
|
|
return jsonify({'error': 'Token 已过期,请重新登录'}), 401
|
|
|
except jwt.InvalidTokenError:
|
|
|
return jsonify({'error': 'Token 无效'}), 401
|
|
|
return f(*args, **kwargs)
|
|
|
return decorated
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# 静态文件(托管前端 HTML)
|
|
|
# ─────────────────────────────────────────────
|
|
|
@app.route('/')
|
|
|
def index():
|
|
|
return send_from_directory(app.static_folder, '灵枢智能任务平台.html')
|
|
|
|
|
|
@app.route('/logo.png')
|
|
|
def logo():
|
|
|
return send_from_directory(app.static_folder, 'logo.png')
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# 认证接口
|
|
|
# ─────────────────────────────────────────────
|
|
|
@app.route('/api/auth/login', methods=['POST'])
|
|
|
def login():
|
|
|
data = request.get_json() or {}
|
|
|
username = data.get('username', '').strip()
|
|
|
password = data.get('password', '')
|
|
|
if not username or not password:
|
|
|
return jsonify({'error': '用户名和密码不能为空'}), 400
|
|
|
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT id, username, display_name, password_hash, avatar_color FROM users WHERE username=%s", (username,))
|
|
|
user = cur.fetchone()
|
|
|
cur.close(); conn.close()
|
|
|
|
|
|
if not user:
|
|
|
return jsonify({'error': '用户名或密码错误'}), 401
|
|
|
|
|
|
# 兼容 bcrypt hash 和明文(初始数据用明文,建议后续全部改为 bcrypt)
|
|
|
ph = user['password_hash']
|
|
|
ok = False
|
|
|
if ph and ph.startswith('$2'):
|
|
|
try:
|
|
|
ok = bcrypt.checkpw(password.encode(), ph.encode())
|
|
|
except Exception:
|
|
|
ok = False
|
|
|
else:
|
|
|
ok = (password == ph)
|
|
|
|
|
|
if not ok:
|
|
|
return jsonify({'error': '用户名或密码错误'}), 401
|
|
|
|
|
|
payload = {
|
|
|
'id': user['id'],
|
|
|
'username': user['username'],
|
|
|
'name': user['display_name'],
|
|
|
'color': user['avatar_color'] or '#3b82f6',
|
|
|
'exp': datetime.datetime.utcnow() + datetime.timedelta(hours=JWT_EXPIRE_HOURS)
|
|
|
}
|
|
|
token = jwt.encode(payload, JWT_SECRET, algorithm='HS256')
|
|
|
return jsonify({
|
|
|
'token': token,
|
|
|
'user': {
|
|
|
'id': user['id'],
|
|
|
'name': user['display_name'],
|
|
|
'username': user['username'],
|
|
|
'color': user['avatar_color'] or '#3b82f6',
|
|
|
'avatar': (user['display_name'] or '?')[0]
|
|
|
}
|
|
|
})
|
|
|
|
|
|
|
|
|
@app.route('/api/auth/me')
|
|
|
@require_auth
|
|
|
def auth_me():
|
|
|
u = g.current_user
|
|
|
return jsonify({
|
|
|
'id': u['id'],
|
|
|
'name': u['name'],
|
|
|
'username': u['username'],
|
|
|
'color': u['color'],
|
|
|
'avatar': (u['name'] or '?')[0]
|
|
|
})
|
|
|
|
|
|
|
|
|
@app.route('/api/auth/register', methods=['POST'])
|
|
|
def register():
|
|
|
"""注册新用户(管理员可用,生产环境可关闭此接口)"""
|
|
|
data = request.get_json() or {}
|
|
|
username = data.get('username', '').strip()
|
|
|
password = data.get('password', '')
|
|
|
display_name = data.get('display_name', username).strip()
|
|
|
if not username or not password:
|
|
|
return jsonify({'error': '用户名和密码不能为空'}), 400
|
|
|
if len(password) < 4:
|
|
|
return jsonify({'error': '密码至少4位'}), 400
|
|
|
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT id FROM users WHERE username=%s", (username,))
|
|
|
if cur.fetchone():
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'error': '用户名已存在'}), 409
|
|
|
|
|
|
hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt()).decode()
|
|
|
color = data.get('color', '#3b82f6')
|
|
|
cur2 = conn.cursor()
|
|
|
cur2.execute(
|
|
|
"INSERT INTO users (username, display_name, password_hash, avatar_color) VALUES (%s, %s, %s, %s)",
|
|
|
(username, display_name, hashed, color)
|
|
|
)
|
|
|
conn.commit()
|
|
|
new_id = cur2.lastrowid
|
|
|
cur.close(); cur2.close(); conn.close()
|
|
|
|
|
|
# 将新用户加入默认团队(id=1)
|
|
|
try:
|
|
|
conn2 = db.get_conn()
|
|
|
c2 = conn2.cursor()
|
|
|
c2.execute("INSERT IGNORE INTO team_members (team_id, user_id, role) VALUES (1, %s, 'viewer')", (new_id,))
|
|
|
conn2.commit()
|
|
|
c2.close(); conn2.close()
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
return jsonify({'id': new_id, 'message': '注册成功'}), 201
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# 工作台 KPI
|
|
|
# ─────────────────────────────────────────────
|
|
|
@app.route('/api/dashboard/kpi')
|
|
|
@require_auth
|
|
|
def dashboard_kpi():
|
|
|
owner_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
|
|
|
cur.execute("SELECT COUNT(*) AS total, SUM(status='running') AS running FROM tasks WHERE owner_id=%s", (owner_id,))
|
|
|
row = cur.fetchone()
|
|
|
|
|
|
cur.execute("""
|
|
|
SELECT COUNT(*) AS monthly
|
|
|
FROM task_executions e
|
|
|
JOIN tasks t ON e.task_id = t.id
|
|
|
WHERE t.owner_id=%s
|
|
|
AND MONTH(e.started_at)=MONTH(CURDATE())
|
|
|
AND YEAR(e.started_at)=YEAR(CURDATE())
|
|
|
""", (owner_id,))
|
|
|
monthly = cur.fetchone()['monthly']
|
|
|
|
|
|
cur.execute("SELECT COUNT(*) AS alerts FROM alert_notifications WHERE target_user_id=%s AND is_read=0", (owner_id,))
|
|
|
alerts = cur.fetchone()['alerts']
|
|
|
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({
|
|
|
'running': int(row['running'] or 0),
|
|
|
'total': int(row['total'] or 0),
|
|
|
'monthly_triggers': monthly,
|
|
|
'alerts': alerts
|
|
|
})
|
|
|
|
|
|
|
|
|
@app.route('/api/dashboard/exec_trend')
|
|
|
@require_auth
|
|
|
def exec_trend():
|
|
|
days = int(request.args.get('days', 7))
|
|
|
owner_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT DATE(e.started_at) AS day,
|
|
|
SUM(e.status='success') AS success,
|
|
|
SUM(e.status='failed' OR e.status='timeout') AS fail
|
|
|
FROM task_executions e
|
|
|
JOIN tasks t ON e.task_id = t.id
|
|
|
WHERE t.owner_id=%s AND e.started_at >= DATE_SUB(CURDATE(), INTERVAL %s DAY)
|
|
|
GROUP BY DATE(e.started_at)
|
|
|
ORDER BY day ASC
|
|
|
""", (owner_id, days))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify([{'day': str(r['day']), 'success': int(r['success']), 'fail': int(r['fail'])} for r in rows])
|
|
|
|
|
|
|
|
|
@app.route('/api/dashboard/agent_usage')
|
|
|
@require_auth
|
|
|
def agent_usage():
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT name, usage_count FROM agents WHERE is_public=1 ORDER BY usage_count DESC LIMIT 6")
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify([{'name': r['name'], 'value': r['usage_count']} for r in rows])
|
|
|
|
|
|
|
|
|
@app.route('/api/dashboard/status_dist')
|
|
|
@require_auth
|
|
|
def status_dist():
|
|
|
owner_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT status, COUNT(*) AS cnt FROM tasks WHERE owner_id=%s GROUP BY status", (owner_id,))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
label_map = {'running': '运行中', 'stopped': '已停止', 'error': '错误', 'draft': '草稿'}
|
|
|
return jsonify([{'name': label_map.get(r['status'], r['status']), 'value': r['cnt']} for r in rows])
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# 任务中心
|
|
|
# ─────────────────────────────────────────────
|
|
|
@app.route('/api/tasks')
|
|
|
@require_auth
|
|
|
def list_tasks():
|
|
|
import json as _json
|
|
|
owner_id = g.current_user['id']
|
|
|
keyword = request.args.get('keyword', '').strip()
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
if keyword:
|
|
|
cur.execute("""
|
|
|
SELECT t.id, t.name, t.cron_expression, t.status,
|
|
|
a.name AS agent, t.trigger_type, t.workflow_json
|
|
|
FROM tasks t LEFT JOIN agents a ON t.agent_id=a.id
|
|
|
WHERE t.owner_id=%s AND t.name LIKE %s
|
|
|
ORDER BY t.updated_at DESC
|
|
|
""", (owner_id, f'%{keyword}%'))
|
|
|
else:
|
|
|
cur.execute("""
|
|
|
SELECT t.id, t.name, t.cron_expression, t.status,
|
|
|
a.name AS agent, t.trigger_type, t.workflow_json
|
|
|
FROM tasks t LEFT JOIN agents a ON t.agent_id=a.id
|
|
|
WHERE t.owner_id=%s
|
|
|
ORDER BY t.updated_at DESC
|
|
|
""", (owner_id,))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
result = []
|
|
|
for r in rows:
|
|
|
item = dict(r)
|
|
|
wj = item.pop('workflow_json', None)
|
|
|
try:
|
|
|
extra = _json.loads(wj) if wj else {}
|
|
|
except Exception:
|
|
|
extra = {}
|
|
|
item['description'] = extra.get('description', '')
|
|
|
item['category'] = extra.get('category', '')
|
|
|
item['skill_name'] = extra.get('skill_name', '')
|
|
|
item['timeout'] = extra.get('timeout', 300)
|
|
|
item['retry_count'] = extra.get('retry_count', 3)
|
|
|
result.append(item)
|
|
|
return jsonify(result)
|
|
|
|
|
|
|
|
|
@app.route('/api/tasks/<int:task_id>', methods=['GET'])
|
|
|
@require_auth
|
|
|
def get_task(task_id):
|
|
|
import json as _json
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT t.*, a.name AS agent_name
|
|
|
FROM tasks t LEFT JOIN agents a ON t.agent_id=a.id
|
|
|
WHERE t.id=%s
|
|
|
""", (task_id,))
|
|
|
row = cur.fetchone()
|
|
|
cur.close(); conn.close()
|
|
|
if not row:
|
|
|
return jsonify({'error': 'not found'}), 404
|
|
|
wj = row.pop('workflow_json', None)
|
|
|
try:
|
|
|
extra = _json.loads(wj) if wj else {}
|
|
|
except Exception:
|
|
|
extra = {}
|
|
|
row['description'] = extra.get('description', '')
|
|
|
row['category'] = extra.get('category', '')
|
|
|
row['skill_name'] = extra.get('skill_name', '')
|
|
|
row['skill_params'] = extra.get('skill_params', {})
|
|
|
row['timeout'] = extra.get('timeout', 300)
|
|
|
row['retry_count'] = extra.get('retry_count', 3)
|
|
|
return jsonify(db.serialize_row(row))
|
|
|
|
|
|
|
|
|
@app.route('/api/tasks', methods=['POST'])
|
|
|
@require_auth
|
|
|
def create_task():
|
|
|
import json as _json
|
|
|
data = request.get_json() or {}
|
|
|
owner_id = g.current_user['id']
|
|
|
name = data.get('name', '').strip()
|
|
|
if not name:
|
|
|
return jsonify({'error': '任务名称不能为空'}), 400
|
|
|
|
|
|
# 触发方式:有 cron_expression 且不为空则为 cron,否则 manual
|
|
|
cron_expr = data.get('cron_expression', '').strip()
|
|
|
trigger_type = 'cron' if cron_expr else 'manual'
|
|
|
|
|
|
# 扩展字段(先用 task_config 的 JSON 字段存储,待数据库结构扩展后迁移)
|
|
|
extra = {
|
|
|
'description': data.get('description', ''),
|
|
|
'category': data.get('category', ''),
|
|
|
'skill_name': data.get('skill_name', ''),
|
|
|
'skill_params': data.get('skill_params', {}),
|
|
|
'datasource_id': data.get('datasource_id'),
|
|
|
'timeout': data.get('timeout', 300),
|
|
|
'retry_count': data.get('retry_count', 3),
|
|
|
}
|
|
|
extra.update(data.get('task_config', {}))
|
|
|
|
|
|
# 获取用户所在团队
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT team_id FROM team_members WHERE user_id=%s LIMIT 1", (owner_id,))
|
|
|
tm = cur.fetchone()
|
|
|
team_id = tm['team_id'] if tm else None
|
|
|
|
|
|
cur2 = conn.cursor()
|
|
|
status = data.get('status', 'stopped')
|
|
|
if status not in ('stopped', 'draft'):
|
|
|
status = 'stopped'
|
|
|
cur2.execute("""
|
|
|
INSERT INTO tasks (owner_id, team_id, name, cron_expression, agent_id, trigger_type, status, workflow_json)
|
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
|
|
""", (
|
|
|
owner_id,
|
|
|
team_id,
|
|
|
name,
|
|
|
cron_expr or None,
|
|
|
data.get('agent_id') or None,
|
|
|
trigger_type,
|
|
|
status,
|
|
|
_json.dumps(extra, ensure_ascii=False)
|
|
|
))
|
|
|
conn.commit()
|
|
|
new_id = cur2.lastrowid
|
|
|
cur.close(); cur2.close(); conn.close()
|
|
|
|
|
|
_audit_log(owner_id, team_id, f"创建了任务「{name}」", 'task', new_id)
|
|
|
return jsonify({'id': new_id, 'message': '任务创建成功'}), 201
|
|
|
|
|
|
|
|
|
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
|
|
|
@require_auth
|
|
|
def update_task(task_id):
|
|
|
import json as _json
|
|
|
data = request.get_json() or {}
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
# 先读取现有 workflow_json,用于合并扩展字段
|
|
|
cur.execute("SELECT workflow_json FROM tasks WHERE id=%s", (task_id,))
|
|
|
row = cur.fetchone()
|
|
|
try:
|
|
|
existing_extra = _json.loads(row['workflow_json']) if row and row['workflow_json'] else {}
|
|
|
except Exception:
|
|
|
existing_extra = {}
|
|
|
|
|
|
# 更新扩展字段到 workflow_json
|
|
|
for key in ['description', 'category', 'skill_name', 'skill_params', 'datasource_id', 'timeout', 'retry_count']:
|
|
|
if key in data:
|
|
|
existing_extra[key] = data[key]
|
|
|
|
|
|
cur2 = conn.cursor()
|
|
|
fields, vals = [], []
|
|
|
for key in ['name', 'cron_expression', 'agent_id', 'trigger_type']:
|
|
|
if key in data:
|
|
|
fields.append(f"{key}=%s")
|
|
|
vals.append(data[key])
|
|
|
fields.append("workflow_json=%s")
|
|
|
vals.append(_json.dumps(existing_extra, ensure_ascii=False))
|
|
|
vals.append(task_id)
|
|
|
cur2.execute(f"UPDATE tasks SET {','.join(fields)}, updated_at=NOW() WHERE id=%s", vals)
|
|
|
conn.commit()
|
|
|
cur.close(); cur2.close(); conn.close()
|
|
|
return jsonify({'message': '更新成功'})
|
|
|
|
|
|
|
|
|
@app.route('/api/tasks/<int:task_id>/copy', methods=['POST'])
|
|
|
@require_auth
|
|
|
def copy_task(task_id):
|
|
|
import json as _json
|
|
|
owner_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT * FROM tasks WHERE id=%s AND owner_id=%s", (task_id, owner_id))
|
|
|
task = cur.fetchone()
|
|
|
if not task:
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'error': '任务不存在'}), 404
|
|
|
cur2 = conn.cursor()
|
|
|
cur2.execute("""
|
|
|
INSERT INTO tasks (owner_id, team_id, name, cron_expression, agent_id, trigger_type, status, workflow_json)
|
|
|
VALUES (%s, %s, %s, %s, %s, %s, 'stopped', %s)
|
|
|
""", (
|
|
|
owner_id,
|
|
|
task['team_id'],
|
|
|
task['name'] + '-副本',
|
|
|
task['cron_expression'],
|
|
|
task['agent_id'],
|
|
|
task['trigger_type'],
|
|
|
task['workflow_json']
|
|
|
))
|
|
|
conn.commit()
|
|
|
new_id = cur2.lastrowid
|
|
|
cur.close(); cur2.close(); conn.close()
|
|
|
_audit_log(owner_id, task['team_id'], f"复制了任务「{task['name']}」", 'task', new_id)
|
|
|
return jsonify({'id': new_id, 'message': '复制成功'}), 201
|
|
|
|
|
|
|
|
|
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
|
|
|
@require_auth
|
|
|
def delete_task(task_id):
|
|
|
owner_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT id, name, team_id FROM tasks WHERE id=%s AND owner_id=%s", (task_id, owner_id))
|
|
|
task = cur.fetchone()
|
|
|
if not task:
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'error': '任务不存在或无权删除'}), 404
|
|
|
# 先停止(如果运行中),再删除执行记录和任务
|
|
|
cur2 = conn.cursor()
|
|
|
cur2.execute("DELETE FROM task_logs WHERE execution_id IN (SELECT id FROM task_executions WHERE task_id=%s)", (task_id,))
|
|
|
cur2.execute("DELETE FROM task_executions WHERE task_id=%s", (task_id,))
|
|
|
cur2.execute("DELETE FROM tasks WHERE id=%s", (task_id,))
|
|
|
conn.commit()
|
|
|
cur.close(); cur2.close(); conn.close()
|
|
|
_audit_log(owner_id, task.get('team_id'), f"删除了任务「{task['name']}」", 'task', task_id)
|
|
|
return jsonify({'message': '删除成功'})
|
|
|
|
|
|
|
|
|
@app.route('/api/tasks/<int:task_id>/status', methods=['PUT'])
|
|
|
@require_auth
|
|
|
def toggle_task_status(task_id):
|
|
|
import json as _json
|
|
|
data = request.get_json()
|
|
|
new_status = data.get('status')
|
|
|
if new_status not in ('running', 'stopped'):
|
|
|
return jsonify({'error': 'invalid status'}), 400
|
|
|
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT id, name, status, workflow_json FROM tasks WHERE id=%s", (task_id,))
|
|
|
task = cur.fetchone()
|
|
|
cur.close(); conn.close()
|
|
|
if not task:
|
|
|
return jsonify({'error': '任务不存在'}), 404
|
|
|
|
|
|
result_msg = f'状态已切换为 {new_status}'
|
|
|
openclaw_msg = ''
|
|
|
|
|
|
if new_status == 'running':
|
|
|
# 向 OpenClaw 注册调度(异步,不阻塞接口响应)
|
|
|
def _register():
|
|
|
res = openclaw.register_task(task_id)
|
|
|
if not res['ok']:
|
|
|
# 注册失败时将任务改回 stopped 并写告警
|
|
|
try:
|
|
|
c = db.get_conn()
|
|
|
cu = c.cursor()
|
|
|
cu.execute("UPDATE tasks SET status='stopped' WHERE id=%s", (task_id,))
|
|
|
cu.execute("""
|
|
|
INSERT INTO alert_notifications (target_user_id, task_id, alert_type, title, content)
|
|
|
SELECT owner_id, %s, 'task_fail', '任务启动失败', %s FROM tasks WHERE id=%s
|
|
|
""", (task_id, res['message'], task_id))
|
|
|
c.commit(); cu.close(); c.close()
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
threading.Thread(target=_register, daemon=True).start()
|
|
|
openclaw_msg = '正在向 OpenClaw 注册调度...'
|
|
|
|
|
|
else: # stopped
|
|
|
# 通知 OpenClaw 移除调度
|
|
|
res = openclaw.unregister_task(task_id)
|
|
|
openclaw_msg = res['message']
|
|
|
|
|
|
conn2 = db.get_conn()
|
|
|
cur2 = conn2.cursor()
|
|
|
cur2.execute("UPDATE tasks SET status=%s WHERE id=%s", (new_status, task_id))
|
|
|
conn2.commit()
|
|
|
cur2.close(); conn2.close()
|
|
|
|
|
|
return jsonify({'message': result_msg, 'openclaw': openclaw_msg})
|
|
|
|
|
|
|
|
|
@app.route('/api/tasks/<int:task_id>/execute', methods=['POST'])
|
|
|
@require_auth
|
|
|
def manual_execute(task_id):
|
|
|
trigger_user_id = g.current_user['id']
|
|
|
|
|
|
# 1. 创建执行记录
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("""
|
|
|
INSERT INTO task_executions (task_id, trigger_type, trigger_user_id, status, started_at)
|
|
|
VALUES (%s, 'manual', %s, 'running', NOW())
|
|
|
""", (task_id, trigger_user_id))
|
|
|
conn.commit()
|
|
|
exec_id = cur.lastrowid
|
|
|
cur.close(); conn.close()
|
|
|
|
|
|
# 2. 异步调用 OpenClaw(不阻塞前端响应)
|
|
|
def _run():
|
|
|
res = openclaw.trigger_once(task_id, exec_id)
|
|
|
if not res['ok']:
|
|
|
try:
|
|
|
c = db.get_conn()
|
|
|
cu = c.cursor()
|
|
|
cu.execute("""
|
|
|
UPDATE task_executions SET status='failed', ended_at=NOW(), error_message=%s
|
|
|
WHERE id=%s
|
|
|
""", (res['message'], exec_id))
|
|
|
c.commit(); cu.close(); c.close()
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
threading.Thread(target=_run, daemon=True).start()
|
|
|
|
|
|
return jsonify({'execution_id': exec_id, 'message': '已触发手动执行,正在调用 OpenClaw...'})
|
|
|
|
|
|
|
|
|
@app.route('/api/tasks/<int:task_id>/executions')
|
|
|
@require_auth
|
|
|
def task_executions(task_id):
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT id, trigger_type, status, started_at, ended_at, duration_ms,
|
|
|
retry_attempt, error_message, cpu_usage_pct, memory_usage_pct
|
|
|
FROM task_executions WHERE task_id=%s ORDER BY started_at DESC LIMIT 20
|
|
|
""", (task_id,))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(db.serialize_rows(rows))
|
|
|
|
|
|
|
|
|
@app.route('/api/tasks/<int:task_id>/logs')
|
|
|
@require_auth
|
|
|
def task_logs(task_id):
|
|
|
level = request.args.get('level', 'all')
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT id FROM task_executions WHERE task_id=%s ORDER BY started_at DESC LIMIT 1", (task_id,))
|
|
|
exec_row = cur.fetchone()
|
|
|
if not exec_row:
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify([])
|
|
|
exec_id = exec_row['id']
|
|
|
if level == 'all':
|
|
|
cur.execute("SELECT log_level, log_time, message, node_name FROM task_logs WHERE execution_id=%s ORDER BY log_time ASC", (exec_id,))
|
|
|
else:
|
|
|
cur.execute("SELECT log_level, log_time, message, node_name FROM task_logs WHERE execution_id=%s AND log_level=%s ORDER BY log_time ASC", (exec_id, level))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(db.serialize_rows(rows))
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# 应用空间 —— 智能体广场(公共,不需个人认证也可浏览,但统一加认证)
|
|
|
# ─────────────────────────────────────────────
|
|
|
@app.route('/api/agents')
|
|
|
@require_auth
|
|
|
def list_agents():
|
|
|
keyword = request.args.get('keyword', '').strip()
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
if keyword:
|
|
|
cur.execute("""
|
|
|
SELECT id, agent_code, name, provider, category, description, usage_count, tags
|
|
|
FROM agents WHERE is_public=1 AND status='active'
|
|
|
AND (name LIKE %s OR description LIKE %s)
|
|
|
ORDER BY usage_count DESC
|
|
|
""", (f'%{keyword}%', f'%{keyword}%'))
|
|
|
else:
|
|
|
cur.execute("""
|
|
|
SELECT id, agent_code, name, provider, category, description, usage_count, tags
|
|
|
FROM agents WHERE is_public=1 AND status='active'
|
|
|
ORDER BY usage_count DESC
|
|
|
""")
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(rows)
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# 应用空间 —— 任务模板广场(公共共享)
|
|
|
# ─────────────────────────────────────────────
|
|
|
@app.route('/api/task_templates')
|
|
|
@require_auth
|
|
|
def list_templates():
|
|
|
keyword = request.args.get('keyword', '').strip()
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
if keyword:
|
|
|
cur.execute("""
|
|
|
SELECT tt.id, tt.title, tt.description, tt.category, tt.tags,
|
|
|
tt.usage_count, tt.contributor_alias, u.display_name AS contributor_name
|
|
|
FROM task_templates tt JOIN users u ON tt.contributor_id=u.id
|
|
|
WHERE tt.is_active=1 AND (tt.title LIKE %s OR tt.description LIKE %s)
|
|
|
ORDER BY tt.usage_count DESC
|
|
|
""", (f'%{keyword}%', f'%{keyword}%'))
|
|
|
else:
|
|
|
cur.execute("""
|
|
|
SELECT tt.id, tt.title, tt.description, tt.category, tt.tags,
|
|
|
tt.usage_count, tt.contributor_alias, u.display_name AS contributor_name
|
|
|
FROM task_templates tt JOIN users u ON tt.contributor_id=u.id
|
|
|
WHERE tt.is_active=1 ORDER BY tt.usage_count DESC
|
|
|
""")
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(rows)
|
|
|
|
|
|
|
|
|
@app.route('/api/task_templates', methods=['POST'])
|
|
|
@require_auth
|
|
|
def publish_template():
|
|
|
data = request.get_json()
|
|
|
owner_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("""
|
|
|
INSERT INTO task_templates (title, description, category, contributor_id, contributor_alias, task_config)
|
|
|
VALUES (%s, %s, %s, %s, %s, %s)
|
|
|
""", (
|
|
|
data['title'],
|
|
|
data.get('description', ''),
|
|
|
data.get('category', '情报处理'),
|
|
|
owner_id,
|
|
|
data.get('contributor_alias', g.current_user['name']),
|
|
|
db.to_json(data.get('task_config', {}))
|
|
|
))
|
|
|
conn.commit()
|
|
|
new_id = cur.lastrowid
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'id': new_id, 'message': '模板发布成功'}), 201
|
|
|
|
|
|
|
|
|
@app.route('/api/task_templates/<int:tpl_id>/import', methods=['POST'])
|
|
|
@require_auth
|
|
|
def import_template(tpl_id):
|
|
|
owner_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT * FROM task_templates WHERE id=%s", (tpl_id,))
|
|
|
tpl = cur.fetchone()
|
|
|
if not tpl:
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'error': 'template not found'}), 404
|
|
|
|
|
|
config = tpl.get('task_config') or {}
|
|
|
cur2 = conn.cursor()
|
|
|
cur2.execute("""
|
|
|
INSERT INTO tasks (owner_id, name, cron_expression, agent_id, trigger_type, status, source_template_id)
|
|
|
VALUES (%s, %s, %s, NULL, 'cron', 'stopped', %s)
|
|
|
""", (
|
|
|
owner_id,
|
|
|
tpl['title'] + ' (导入)',
|
|
|
config.get('cron_expression', '未配置'),
|
|
|
tpl_id
|
|
|
))
|
|
|
cur2.execute("UPDATE task_templates SET usage_count=usage_count+1 WHERE id=%s", (tpl_id,))
|
|
|
conn.commit()
|
|
|
new_task_id = cur2.lastrowid
|
|
|
cur.close(); cur2.close(); conn.close()
|
|
|
return jsonify({'task_id': new_task_id, 'message': '导入成功'})
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# 团队协作空间
|
|
|
# ─────────────────────────────────────────────
|
|
|
@app.route('/api/teams/<int:team_id>/members')
|
|
|
@require_auth
|
|
|
def team_members(team_id):
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT u.id, u.display_name AS name, u.avatar_color AS color,
|
|
|
tm.role, tm.is_online AS online, tm.joined_at
|
|
|
FROM team_members tm JOIN users u ON tm.user_id=u.id
|
|
|
WHERE tm.team_id=%s ORDER BY tm.joined_at ASC
|
|
|
""", (team_id,))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(db.serialize_rows(rows))
|
|
|
|
|
|
|
|
|
@app.route('/api/teams/<int:team_id>/members/<int:user_id>/role', methods=['PUT'])
|
|
|
@require_auth
|
|
|
def change_member_role(team_id, user_id):
|
|
|
data = request.get_json()
|
|
|
role = data.get('role')
|
|
|
if role not in ('admin', 'editor', 'viewer'):
|
|
|
return jsonify({'error': 'invalid role'}), 400
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("UPDATE team_members SET role=%s WHERE team_id=%s AND user_id=%s", (role, team_id, user_id))
|
|
|
conn.commit()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'message': '角色已更新'})
|
|
|
|
|
|
|
|
|
@app.route('/api/teams/<int:team_id>/members/<int:user_id>', methods=['DELETE'])
|
|
|
@require_auth
|
|
|
def remove_member(team_id, user_id):
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("DELETE FROM team_members WHERE team_id=%s AND user_id=%s", (team_id, user_id))
|
|
|
conn.commit()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'message': '成员已移除'})
|
|
|
|
|
|
|
|
|
@app.route('/api/teams/<int:team_id>/members', methods=['POST'])
|
|
|
@require_auth
|
|
|
def invite_member(team_id):
|
|
|
data = request.get_json()
|
|
|
name = data.get('name', '').strip()
|
|
|
if not name:
|
|
|
return jsonify({'error': 'name required'}), 400
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT id FROM users WHERE display_name=%s", (name,))
|
|
|
user = cur.fetchone()
|
|
|
if not user:
|
|
|
import random, string
|
|
|
uname = 'user_' + ''.join(random.choices(string.digits, k=6))
|
|
|
cur2 = conn.cursor()
|
|
|
cur2.execute("""
|
|
|
INSERT INTO users (username, display_name, password_hash, avatar_color)
|
|
|
VALUES (%s, %s, 'placeholder', %s)
|
|
|
""", (uname, name, data.get('color', '#3b82f6')))
|
|
|
conn.commit()
|
|
|
user_id = cur2.lastrowid
|
|
|
cur2.close()
|
|
|
else:
|
|
|
user_id = user['id']
|
|
|
cur3 = conn.cursor()
|
|
|
try:
|
|
|
cur3.execute("INSERT INTO team_members (team_id, user_id, role) VALUES (%s, %s, 'viewer')", (team_id, user_id))
|
|
|
conn.commit()
|
|
|
except Exception:
|
|
|
pass
|
|
|
cur3.close(); cur.close(); conn.close()
|
|
|
return jsonify({'user_id': user_id, 'message': f'{name} 已加入团队'})
|
|
|
|
|
|
|
|
|
@app.route('/api/teams/<int:team_id>/shared_resources')
|
|
|
@require_auth
|
|
|
def shared_resources(team_id):
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT sr.id, sr.resource_type, sr.resource_id, sr.resource_name,
|
|
|
sr.usage_count, u.display_name AS creator, sr.shared_at
|
|
|
FROM team_shared_resources sr JOIN users u ON sr.shared_by=u.id
|
|
|
WHERE sr.team_id=%s AND sr.is_active=1 ORDER BY sr.shared_at DESC
|
|
|
""", (team_id,))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(db.serialize_rows(rows))
|
|
|
|
|
|
|
|
|
@app.route('/api/teams/<int:team_id>/shared_resources/<int:res_id>', methods=['DELETE'])
|
|
|
@require_auth
|
|
|
def unshare_resource(team_id, res_id):
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("UPDATE team_shared_resources SET is_active=0 WHERE id=%s AND team_id=%s", (res_id, team_id))
|
|
|
conn.commit()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'message': '已取消共享'})
|
|
|
|
|
|
|
|
|
@app.route('/api/teams/<int:team_id>/audit_logs')
|
|
|
@require_auth
|
|
|
def audit_logs(team_id):
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT al.id, u.display_name AS user, al.action, al.resource_type, al.created_at
|
|
|
FROM team_audit_logs al JOIN users u ON al.user_id=u.id
|
|
|
WHERE al.team_id=%s ORDER BY al.created_at DESC LIMIT 30
|
|
|
""", (team_id,))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(db.serialize_rows(rows))
|
|
|
|
|
|
|
|
|
@app.route('/api/teams/<int:team_id>/comments')
|
|
|
@require_auth
|
|
|
def team_comments(team_id):
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT c.id, u.display_name AS user, u.avatar_color AS color,
|
|
|
c.content AS text, c.created_at AS time, c.parent_id
|
|
|
FROM team_comments c JOIN users u ON c.user_id=u.id
|
|
|
WHERE c.team_id=%s AND c.is_deleted=0 ORDER BY c.created_at ASC
|
|
|
""", (team_id,))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(db.serialize_rows(rows))
|
|
|
|
|
|
|
|
|
@app.route('/api/teams/<int:team_id>/comments', methods=['POST'])
|
|
|
@require_auth
|
|
|
def post_comment(team_id):
|
|
|
data = request.get_json()
|
|
|
user_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("""
|
|
|
INSERT INTO team_comments (team_id, user_id, content, parent_id)
|
|
|
VALUES (%s, %s, %s, %s)
|
|
|
""", (team_id, user_id, data['content'], data.get('parent_id')))
|
|
|
conn.commit()
|
|
|
new_id = cur.lastrowid
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'id': new_id, 'message': '留言成功'})
|
|
|
|
|
|
|
|
|
@app.route('/api/teams/<int:team_id>/info')
|
|
|
@require_auth
|
|
|
def team_info(team_id):
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT id, team_code, team_name, description FROM teams WHERE id=%s", (team_id,))
|
|
|
row = cur.fetchone()
|
|
|
cur.close(); conn.close()
|
|
|
if not row:
|
|
|
return jsonify({'error': 'not found'}), 404
|
|
|
return jsonify(row)
|
|
|
|
|
|
|
|
|
# 获取当前用户所属团队
|
|
|
@app.route('/api/my/team')
|
|
|
@require_auth
|
|
|
def my_team():
|
|
|
user_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT t.id, t.team_code, t.team_name
|
|
|
FROM team_members tm JOIN teams t ON tm.team_id=t.id
|
|
|
WHERE tm.user_id=%s ORDER BY tm.joined_at ASC LIMIT 1
|
|
|
""", (user_id,))
|
|
|
row = cur.fetchone()
|
|
|
cur.close(); conn.close()
|
|
|
if not row:
|
|
|
return jsonify({'id': 1, 'team_code': 'DEFAULT', 'team_name': '默认团队'})
|
|
|
return jsonify(row)
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# 核心配置中心
|
|
|
# ─────────────────────────────────────────────
|
|
|
@app.route('/api/datasources')
|
|
|
@require_auth
|
|
|
def list_datasources():
|
|
|
owner_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT id, name, ds_type, host, database_name, username, connection_status, last_tested_at
|
|
|
FROM datasources WHERE owner_id=%s ORDER BY created_at DESC
|
|
|
""", (owner_id,))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(db.serialize_rows(rows))
|
|
|
|
|
|
|
|
|
@app.route('/api/datasources/<int:ds_id>', methods=['PUT'])
|
|
|
@require_auth
|
|
|
def update_datasource(ds_id):
|
|
|
data = request.get_json()
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
password = data.get('password', '')
|
|
|
enc = db.simple_encrypt(password) if password else None
|
|
|
fields, vals = [], []
|
|
|
if 'host' in data: fields.append('host=%s'); vals.append(data['host'])
|
|
|
if 'username' in data: fields.append('username=%s'); vals.append(data['username'])
|
|
|
if enc: fields.append('credential_enc=%s'); vals.append(enc)
|
|
|
if fields:
|
|
|
vals.append(ds_id)
|
|
|
cur.execute(f"UPDATE datasources SET {','.join(fields)} WHERE id=%s", vals)
|
|
|
conn.commit()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'message': '凭证已保存'})
|
|
|
|
|
|
|
|
|
@app.route('/api/datasources', methods=['POST'])
|
|
|
@require_auth
|
|
|
def create_datasource():
|
|
|
data = request.get_json()
|
|
|
owner_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
enc = db.simple_encrypt(data.get('password', ''))
|
|
|
cur.execute("""
|
|
|
INSERT INTO datasources (owner_id, name, ds_type, host, database_name, username, credential_enc, connection_status)
|
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, 'unknown')
|
|
|
""", (
|
|
|
owner_id,
|
|
|
data['name'], data.get('ds_type', 'mysql'),
|
|
|
data.get('host', ''), data.get('database_name', ''),
|
|
|
data.get('username', ''), enc
|
|
|
))
|
|
|
conn.commit()
|
|
|
new_id = cur.lastrowid
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'id': new_id, 'message': '数据源已创建'}), 201
|
|
|
|
|
|
|
|
|
@app.route('/api/ai_engines')
|
|
|
@require_auth
|
|
|
def list_ai_engines():
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("SELECT id, engine_type, name, endpoint, description, status, last_checked_at FROM ai_engines")
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(db.serialize_rows(rows))
|
|
|
|
|
|
|
|
|
@app.route('/api/ai_engines/<int:eng_id>/restart', methods=['POST'])
|
|
|
@require_auth
|
|
|
def restart_engine(eng_id):
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("UPDATE ai_engines SET status='ready', last_checked_at=NOW() WHERE id=%s", (eng_id,))
|
|
|
conn.commit()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'message': '重启指令已发送'})
|
|
|
|
|
|
|
|
|
@app.route('/api/alerts')
|
|
|
@require_auth
|
|
|
def list_alerts():
|
|
|
user_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
cur.execute("""
|
|
|
SELECT id, alert_type, title, content, is_read, created_at
|
|
|
FROM alert_notifications WHERE target_user_id=%s
|
|
|
ORDER BY created_at DESC LIMIT 20
|
|
|
""", (user_id,))
|
|
|
rows = cur.fetchall()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify(db.serialize_rows(rows))
|
|
|
|
|
|
|
|
|
@app.route('/api/alerts/read_all', methods=['POST'])
|
|
|
@require_auth
|
|
|
def mark_alerts_read():
|
|
|
user_id = g.current_user['id']
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("UPDATE alert_notifications SET is_read=1 WHERE target_user_id=%s", (user_id,))
|
|
|
conn.commit()
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'message': '全部已读'})
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# OpenClaw 回调接口(无需登录验证,由 OpenClaw 直接调用)
|
|
|
# ─────────────────────────────────────────────
|
|
|
@app.route('/api/openclaw/callback/<int:task_id>', methods=['POST'])
|
|
|
def openclaw_callback(task_id):
|
|
|
"""
|
|
|
OpenClaw 执行完成后回调此接口,写入执行结果和日志。
|
|
|
请求体示例:
|
|
|
{
|
|
|
"execution_id": 123, # 灵枢侧执行记录ID(可选,优先用)
|
|
|
"job_id": "lingshu_task_1_exec_123",
|
|
|
"status": "success", # success / failed / timeout
|
|
|
"duration_ms": 1500,
|
|
|
"cpu_usage_pct": 35.2,
|
|
|
"memory_usage_pct": 28.1,
|
|
|
"error_message": "",
|
|
|
"logs": [
|
|
|
{"level": "info", "time": "2026-01-01 12:00:00.000", "message": "xxx", "node": "step1"},
|
|
|
...
|
|
|
]
|
|
|
}
|
|
|
"""
|
|
|
import json as _json
|
|
|
data = request.get_json(silent=True) or {}
|
|
|
|
|
|
status = data.get('status', 'success')
|
|
|
duration_ms = data.get('duration_ms')
|
|
|
cpu = data.get('cpu_usage_pct')
|
|
|
mem = data.get('memory_usage_pct')
|
|
|
error_msg = data.get('error_message', '')
|
|
|
logs = data.get('logs', [])
|
|
|
exec_id_from_cb = data.get('execution_id')
|
|
|
|
|
|
# 找到对应的执行记录
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor(dictionary=True)
|
|
|
|
|
|
if exec_id_from_cb:
|
|
|
exec_id = exec_id_from_cb
|
|
|
else:
|
|
|
# 取该任务最近一条 running 的执行记录
|
|
|
cur.execute("""
|
|
|
SELECT id FROM task_executions
|
|
|
WHERE task_id=%s AND status='running'
|
|
|
ORDER BY started_at DESC LIMIT 1
|
|
|
""", (task_id,))
|
|
|
row = cur.fetchone()
|
|
|
exec_id = row['id'] if row else None
|
|
|
|
|
|
if not exec_id:
|
|
|
cur.close(); conn.close()
|
|
|
return jsonify({'ok': False, 'message': '未找到对应执行记录'}), 404
|
|
|
|
|
|
# 更新执行记录
|
|
|
cur2 = conn.cursor()
|
|
|
cur2.execute("""
|
|
|
UPDATE task_executions
|
|
|
SET status=%s, ended_at=NOW(), duration_ms=%s,
|
|
|
cpu_usage_pct=%s, memory_usage_pct=%s, error_message=%s
|
|
|
WHERE id=%s
|
|
|
""", (status, duration_ms, cpu, mem, error_msg or None, exec_id))
|
|
|
|
|
|
# 更新任务最后运行时间
|
|
|
cur2.execute("UPDATE tasks SET last_run_at=NOW() WHERE id=%s", (task_id,))
|
|
|
|
|
|
# 写入日志
|
|
|
now_str = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
|
|
|
for log in logs:
|
|
|
cur2.execute("""
|
|
|
INSERT INTO task_logs (execution_id, task_id, log_level, log_time, message, node_name)
|
|
|
VALUES (%s, %s, %s, %s, %s, %s)
|
|
|
""", (
|
|
|
exec_id, task_id,
|
|
|
log.get('level', 'info'),
|
|
|
log.get('time', now_str),
|
|
|
log.get('message', ''),
|
|
|
log.get('node')
|
|
|
))
|
|
|
|
|
|
# 失败时写告警通知
|
|
|
if status in ('failed', 'timeout'):
|
|
|
cur2.execute("""
|
|
|
INSERT INTO alert_notifications (target_user_id, task_id, alert_type, title, content)
|
|
|
SELECT owner_id, %s, 'task_fail', '任务执行失败',
|
|
|
CONCAT('任务执行', IF(%s='timeout','超时','失败'), IFNULL(CONCAT(':', %s), ''))
|
|
|
FROM tasks WHERE id=%s
|
|
|
""", (task_id, status, error_msg or None, task_id))
|
|
|
|
|
|
conn.commit()
|
|
|
cur.close(); cur2.close(); conn.close()
|
|
|
|
|
|
return jsonify({'ok': True, 'message': '回调已处理'})
|
|
|
|
|
|
|
|
|
@app.route('/api/openclaw/status', methods=['GET'])
|
|
|
def openclaw_status():
|
|
|
"""检查 OpenClaw 连接状态:通过 WebSocket 握手探测"""
|
|
|
try:
|
|
|
result = openclaw._acp_call('cron.status', {})
|
|
|
return jsonify({'connected': True, 'message': 'OpenClaw 在线',
|
|
|
'base_url': openclaw.OPENCLAW_BASE})
|
|
|
except Exception as e:
|
|
|
err = str(e)
|
|
|
connected = 'not reachable' not in err.lower() and 'ConnectionRefusedError' not in err
|
|
|
return jsonify({'connected': False, 'message': err,
|
|
|
'base_url': openclaw.OPENCLAW_BASE})
|
|
|
|
|
|
|
|
|
# ─────────────────────────────────────────────
|
|
|
# 工具函数
|
|
|
# ─────────────────────────────────────────────
|
|
|
def _audit_log(user_id, team_id, action, res_type=None, res_id=None):
|
|
|
if not team_id:
|
|
|
return
|
|
|
try:
|
|
|
conn = db.get_conn()
|
|
|
cur = conn.cursor()
|
|
|
cur.execute("""
|
|
|
INSERT INTO team_audit_logs (team_id, user_id, action, resource_type, resource_id)
|
|
|
VALUES (%s, %s, %s, %s, %s)
|
|
|
""", (team_id, user_id, action, res_type, res_id))
|
|
|
conn.commit()
|
|
|
cur.close(); conn.close()
|
|
|
except Exception:
|
|
|
pass
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)
|