修改任务取消逻辑,添加删除任务接口 #42

Merged
hnu202326010204 merged 7 commits from lianghao_branch into develop 2 weeks ago

@ -15,7 +15,7 @@
- `500 Internal Server Error`:服务器内部错误。
- **JWT 身份错误**:使用 `@jwt_required` 的接口在缺少或失效 Token 时会由 Flask-JWT-Extended 返回标准 401 响应;使用 `@int_jwt_required` 的接口若无法将身份标识转换为整数,则返回 `{"error": "无效的用户身份标识"}`401
- **任务类型代码**`perturbation`(加噪)、`finetune`(微调)、`heatmap`(热力图)、`evaluate`(评估)。
- **任务状态代码**:需与 `task_status` 表保持一致(如 `waiting`、`processing`、`completed`、`failed` 等)。
- **任务状态代码**:需与 `task_status` 表保持一致(如 `waiting`、`processing`、`completed`、`failed`、`cancelled` 等)。
---
@ -967,7 +967,7 @@
**成功响应**`{"message": "用户信息更新成功", "user": {...}}`
**错误响应**
- `401`:管理员 Token 无效。
- `403 {"error": "需要管理员权限"}`(预期;当前实现因装饰器缺陷多返回 500
- `403 {"error": "需要管理员权限"}`(预期;当前实现多因装饰器缺陷直接返回 500
- `404 {"error": "用户不存在"}`
- `400 {"error": "用户名已存在"}`
- `400 {"error": "邮箱已被使用"}`
@ -1027,7 +1027,7 @@
"purpose": "register"
}
```
> `purpose` 可选值:`register`(默认)、`change_email` 等。
> `purpose` 可选值:`register`(默认)、`change_email` `forgot_password`等。
**成功响应** `200 OK`
```json
@ -1093,6 +1093,100 @@
- `400 {"error": "该用户名已被使用"}`
- `500 {"error": "用户名修改失败: ..."}`
### POST `/api/auth/forgot-password`
**功能**:通过邮箱验证码重置密码。
**认证**:否
**请求体**
```json
{
"email": "user@example.com",
"code": "123456",
"new_password": "NewP@ssw0rd"
}
```
**成功响应** `200 OK`
```json
{"message": "密码重置成功"}
```
**错误响应**
- `400 {"error": "邮箱、验证码和新密码不能为空"}`
- `400 {"error": "验证码无效或已过期"}`
- `404 {"error": "用户不存在"}`
- `500 {"error": "密码重置失败: ..."}`
---
### POST `/api/auth/code`
**功能**:发送邮箱验证码(注册、修改邮箱、忘记密码等场景)。
**认证**:否
**请求体**
```json
{
"email": "user@example.com",
"purpose": "forgot" // 可选: register/change_email/forgot_password
}
```
**成功响应** `200 OK`
```json
{"message": "验证码已发送"}
```
**错误响应**
- `400 {"error": "邮箱不能为空"}`
- `400 {"error": "邮箱格式不正确"}`
- `500 {"error": "发送验证码失败: ..."}`
---
### POST `/api/task/<task_id>/restart`
**功能**:重启已取消或失败的任务,重新入队。
**认证**:是
**成功响应** `200 OK`
```json
{"message": "任务已重启", "job_id": "pert_123"}
```
**错误响应**
- `400 {"error": "仅取消或失败的任务可重启"}`
- `404 {"error": "任务不存在或无权限"}`
- `500 {"error": "重启任务失败: ..."}`
---
### DELETE `/api/task/<task_id>`
**功能**:删除已取消或失败的任务,级联删除所有相关数据。
**认证**:是
**成功响应** `200 OK`
```json
{"message": "任务已删除"}
```
**错误响应**
- `400 {"error": "仅取消或失败的任务可删除"}`
- `404 {"error": "任务不存在或无权限"}`
- `500 {"error": "删除任务失败: ..."}`
---
| HTTP 状态码 | 说明 |
| ----------- | -------------- |
| 200 | 请求成功 |
| 400 | 请求参数错误 |
| 403 | 无权限访问 |
| 404 | 资源不存在 |
| 500 | 服务器内部错误 |
---
**任务状态代码说明**
| 状态代码 | 说明 |
| ----------- | ------------ |
| waiting | 待处理 |
| processing | 进行中 |
| completed | 已完成 |
| failed | 失败 |
| cancelled | 已取消 |
---
---
## Image 模块补充
@ -1904,3 +1998,11 @@ Authorization: Bearer <token>
- [POST /api/task/finetune/from-upload](#post-apitaskfinetunefrom-upload):新增 `custom_prompt` 参数。
- [GET /api/task/finetune/<task_id>/coords](#get-apitaskfinetunetask_idcoords)完善3D可视化坐标数据接口文档新增详细的请求响应格式说明和错误处理。
- [GET /api/task/<task_id>/logs](#get-apitasktask_idlogs):完善任务日志接口文档,新增详细的功能说明、响应格式、错误处理和使用场景。
### 2026-01-03 统一重启/删除任务与忘记密码功能更新
- [POST /api/auth/forgot-password](#post-apiauthforgot-password):新增“忘记密码”接口,支持通过邮箱验证码重置密码。
- [POST /api/auth/code](#post-apiauthcode):新增“发送验证码”接口,支持注册、修改邮箱、忘记密码等场景。
- [POST /api/task/<task_id>/restart](#post-apitasktask_idrestart):新增“统一重启任务”接口,支持对 cancelled/failed 状态的任务重新入队。
- [DELETE /api/task/<task_id>](#delete-apitasktask_id):新增“删除任务”接口,支持对 cancelled/completed/failed 状态的任务彻底删除。
- 任务状态说明、相关接口文档已补充 `cancelled` 状态。

@ -153,6 +153,42 @@ def login():
except Exception as e:
return jsonify({'error': f'登录失败: {str(e)}'}), 500
@auth_bp.route('/forgot-password', methods=['POST'])
def forgot_password():
"""
忘记密码校验邮箱验证码后重置密码
参数email, code, new_password
"""
try:
data = request.get_json()
email = data.get('email')
code = data.get('code')
new_password = data.get('new_password')
if not email or not code or not new_password:
return jsonify({'error': '邮箱、验证码和新密码不能为空'}), 400
# 校验邮箱格式
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
return jsonify({'error': '邮箱格式不正确'}), 400
user = User.query.filter_by(email=email).first()
if not user:
return jsonify({'error': '用户不存在'}), 404
verification_service = VerificationService()
if not verification_service.verify_code(email, code, purpose='forgot_password'):
return jsonify({'error': '验证码无效或已过期'}), 400
# 设置新密码
user.set_password(new_password)
db.session.commit()
return jsonify({'message': '密码重置成功'}), 200
except Exception as e:
db.session.rollback()
return jsonify({'error': f'密码重置失败: {str(e)}'}), 500
@auth_bp.route('/change-password', methods=['POST'])
@int_jwt_required
def change_password(current_user_id):

@ -106,24 +106,54 @@ def cancel_task(task_id, current_user_id):
return jsonify({'message': '任务已取消'}), 200
return TaskService.json_error('取消任务失败', 500)
@task_bp.route('/<int:task_id>/restart', methods=['POST'])
@int_jwt_required
def restart_task(task_id, current_user_id):
task = Task.query.get(task_id)
if not TaskService.ensure_task_owner(task, current_user_id):
return TaskService.json_error('任务不存在或无权限', 404)
# 只允许cancelled/failed状态重启
status_code = task.task_status.task_status_code if task and task.task_status else None
if status_code not in ("cancelled", "failed"):
return TaskService.json_error('仅取消或失败的任务可重启', 400)
if not TaskService.restart_task(task_id):
return TaskService.json_error('重启任务失败', 500)
# 自动启动任务(按类型分发)
type_code = TaskService.get_task_type_code(task)
if type_code == 'perturbation':
job_id = TaskService.start_perturbation_task(task_id)
elif type_code == 'finetune':
job_id = TaskService.start_finetune_task(task_id)
elif type_code == 'heatmap':
job_id = TaskService.start_heatmap_task(task_id)
elif type_code == 'evaluate':
job_id = TaskService.start_evaluate_task(task_id)
else:
job_id = None
return jsonify({'message': '任务已重启', 'job_id': job_id}), 200
@task_bp.route('/<int:task_id>', methods=['DELETE'])
@int_jwt_required
def delete_task(task_id, current_user_id):
task = Task.query.get(task_id)
if not TaskService.ensure_task_owner(task, current_user_id):
return TaskService.json_error('任务不存在或无权限', 404)
status_code = task.task_status.task_status_code if task and task.task_status else None
if status_code not in ("cancelled", "failed"):
return TaskService.json_error('仅取消或失败的任务可删除', 400)
success, err = TaskService.delete_task(task_id, user_id=current_user_id)
if not success:
return TaskService.json_error(f'删除任务失败: {err}', 500)
return jsonify({'message': '任务已删除'}), 200
@task_bp.route('/quota', methods=['GET'])
@int_jwt_required
def get_task_quota(current_user_id):
user = TaskService.get_user(current_user_id)
if not user:
quota = TaskService.get_user_task_quota(current_user_id)
if quota is None:
return TaskService.json_error('用户不存在', 404)
role = user.role
max_tasks = role.max_concurrent_tasks if role and role.max_concurrent_tasks is not None else 0
current_count = Task.query.filter_by(user_id=current_user_id).count()
remaining = max(max_tasks - current_count, 0)
return jsonify({
'max_tasks': max_tasks,
'current_tasks': current_count,
'remaining_tasks': remaining
}), 200
return jsonify(quota), 200
# ==================== 加噪任务 ====================
@ -223,6 +253,11 @@ def create_perturbation_task(current_user_id):
except Exception:
return TaskService.json_error('非法的 flow_id 参数')
# 检查配额
quota = TaskService.get_user_task_quota(current_user_id)
if quota and quota['remaining_tasks'] <= 0:
return TaskService.json_error('任务配额已满,请等待现有任务完成', 403)
try:
waiting_status = TaskService.ensure_status('waiting')
perturb_type = TaskService.require_task_type('perturbation')
@ -372,6 +407,11 @@ def create_heatmap_task(current_user_id):
if image_code != 'perturbed':
return TaskService.json_error(f'仅支持加噪图生成热力图,当前图片类型为: {perturbed_image.image_type.image_name}', 400)
# 检查配额
quota = TaskService.get_user_task_quota(current_user_id)
if quota and quota['remaining_tasks'] <= 0:
return TaskService.json_error('任务配额已满,请等待现有任务完成', 403)
try:
heatmap_type = TaskService.require_task_type('heatmap')
waiting_status = TaskService.ensure_status('waiting')
@ -487,6 +527,11 @@ def create_finetune_from_perturbation(current_user_id):
if data_type_id and not DataType.query.get(data_type_id):
return TaskService.json_error('数据集类型不存在')
# 检查配额
quota = TaskService.get_user_task_quota(current_user_id)
if quota and quota['remaining_tasks'] <= 0:
return TaskService.json_error('任务配额已满,请等待现有任务完成', 403)
try:
waiting_status = TaskService.ensure_status('waiting')
finetune_type = TaskService.require_task_type('finetune')
@ -588,6 +633,11 @@ def create_finetune_from_upload(current_user_id):
except Exception:
return TaskService.json_error('非法的 flow_id 参数')
# 检查配额
quota = TaskService.get_user_task_quota(current_user_id)
if quota and quota['remaining_tasks'] <= 0:
return TaskService.json_error('任务配额已满,请等待现有任务完成', 403)
try:
waiting_status = TaskService.ensure_status('waiting')
finetune_type = TaskService.require_task_type('finetune')
@ -715,6 +765,11 @@ def create_evaluate_task(current_user_id):
if not finetune_task.finetune:
return TaskService.json_error('微调任务未配置详情', 400)
# 检查配额
quota = TaskService.get_user_task_quota(current_user_id)
if quota and quota['remaining_tasks'] <= 0:
return TaskService.json_error('任务配额已满,请等待现有任务完成', 403)
try:
evaluate_type = TaskService.require_task_type('evaluate')
waiting_status = TaskService.ensure_status('waiting')

@ -47,7 +47,19 @@ class User(db.Model):
tasks = db.relationship('Task', backref='user', lazy='dynamic', cascade='all, delete-orphan')
def set_password(self, password):
"""设置密码"""
"""设置密码,包含复杂度校验"""
import re
# 密码复杂度要求:长度>=8包含大小写字母、数字和特殊字符
if len(password) < 8:
raise ValueError("密码长度不能少于8位")
if not re.search(r'[A-Z]', password):
raise ValueError("密码需包含大写字母")
if not re.search(r'[a-z]', password):
raise ValueError("密码需包含小写字母")
if not re.search(r'\d', password):
raise ValueError("密码需包含数字")
if not re.search(r'[^A-Za-z0-9]', password):
raise ValueError("密码需包含特殊字符")
self.password_hash = generate_password_hash(password)
def check_password(self, password):

@ -20,6 +20,7 @@ import logging
from datetime import datetime
from typing import Optional
from flask import jsonify
from app import db
from redis import Redis
from rq.job import Job
from app.services.storage import PathManager
@ -86,6 +87,62 @@ def _get_task_handler(task_type: str):
class TaskService:
@staticmethod
def delete_task(task_id, user_id=None):
"""
删除任务仅允许cancelled/failed状态支持可选用户校验
"""
try:
task_repo = _get_task_repo()
task = task_repo.get_by_id(task_id)
if not task:
return False, '任务不存在'
status_code = task.task_status.task_status_code if task.task_status else None
if status_code not in ("cancelled", "failed"):
return False, '仅取消或失败的任务可删除'
if user_id is not None and not task_repo.is_owner(task, user_id):
return False, '无权限删除该任务'
# 删除子表数据
if hasattr(task, 'perturbation') and task.perturbation:
db.session.delete(task.perturbation)
if hasattr(task, 'finetune') and task.finetune:
db.session.delete(task.finetune)
if hasattr(task, 'heatmap') and task.heatmap:
db.session.delete(task.heatmap)
if hasattr(task, 'evaluation') and task.evaluation:
db.session.delete(task.evaluation)
# 删除主任务
db.session.delete(task)
db.session.commit()
return True, None
except Exception as e:
db.session.rollback()
logger.error(f"Error deleting task: {e}")
return False, str(e)
@staticmethod
def restart_task(task_id):
"""
重启任务仅允许cancelled/failed状态重启后设为waiting
"""
try:
task_repo = _get_task_repo()
task = task_repo.get_by_id(task_id)
if not task:
return False
status_code = task.task_status.task_status_code if task.task_status else None
if status_code not in ("cancelled", "failed"):
# 只有取消/失败的任务允许重启
return False
if task_repo.update_status(task, 'waiting'):
task.started_at = None
task.finished_at = None
task.error_message = None
return task_repo.save()
return False
except Exception as e:
logger.error(f"Error restarting task: {e}")
return False
"""任务处理服务"""
# ==================== 路径代理方法(委托给 PathManager====================
@ -271,6 +328,39 @@ class TaskService:
"""获取用户(委托给 UserRepository"""
return _get_user_repo().get_by_id(user_id)
@staticmethod
def get_user_task_quota(user_id):
"""
获取用户任务配额信息
Args:
user_id: 用户ID
Returns:
dict: {
'max_tasks': int,
'current_tasks': int,
'remaining_tasks': int
}
"""
user = TaskService.get_user(user_id)
if not user:
return None
role = user.role
max_tasks = role.max_concurrent_tasks if role and role.max_concurrent_tasks is not None else 0
# 统计正在运行或排队的任务数 (Waiting + Processing)
current_count = _get_task_repo().count_pending_tasks(user_id)
remaining = max(max_tasks - current_count, 0)
return {
'max_tasks': max_tasks,
'current_tasks': current_count,
'remaining_tasks': remaining
}
# ==================== Redis/RQ 连接管理 ====================
@staticmethod
@ -350,37 +440,37 @@ class TaskService:
@staticmethod
def cancel_task(task_id):
"""
取消任务通用取消适用于所有类型任务
Args:
task_id: 任务ID
Returns:
是否成功取消
取消任务仅允许waiting/processing状态取消后设为cancelled
"""
try:
task_repo = _get_task_repo()
task = task_repo.get_by_id(task_id)
if not task:
return False
status_code = task.task_status.task_status_code if task.task_status else None
if status_code not in ("waiting", "processing"):
# 只有待处理/进行中任务允许取消
return False
# 获取任务类型代码
type_code = task_repo.get_type_code(task)
# 尝试从队列中删除任务
# 尝试从队列中删除任务或终止正在运行的任务
try:
redis_conn = TaskService._get_redis_connection()
job_id = TaskService._get_job_id_for_task(task_id, type_code)
job = Job.fetch(job_id, connection=redis_conn)
if job.get_status() == 'started':
from rq.command import send_stop_job_command
send_stop_job_command(redis_conn, job_id)
logger.info(f"Sent stop command for running job {job_id}")
job.cancel()
job.delete()
except Exception as e:
logger.warning(f"Could not cancel RQ job: {e}")
# 使用 Repository 更新状态
if task_repo.update_status(task, 'failed'):
logger.warning(f"Could not cancel/stop RQ job: {e}")
# 更新为cancelled
if task_repo.update_status(task, 'cancelled'):
task.finished_at = datetime.utcnow()
return task_repo.save()
return False
except Exception as e:
logger.error(f"Error cancelling task: {e}")
return False

@ -29,6 +29,7 @@ def init_database():
task_statuses = [
{'task_status_code': 'waiting', 'task_status_name': '待处理', 'description': '任务已创建,等待处理'},
{'task_status_code': 'processing', 'task_status_name': '进行中', 'description': '任务正在处理中'},
{'task_status_code': 'cancelled', 'task_status_name': '已取消', 'description': '任务已被取消'},
{'task_status_code': 'completed', 'task_status_name': '已完成', 'description':'任务已成功完成'},
{'task_status_code': 'failed', 'task_status_name': '失败', 'description': '任务处理失败'}
]
@ -128,9 +129,9 @@ def init_database():
# 创建默认测试用户(三种角色各一个)
test_users = [
{'username': 'admin_test', 'email': 'admin@test.com', 'password': 'admin123', 'role_id': 1},
{'username': 'vip_test', 'email': 'vip@test.com', 'password': 'vip123', 'role_id': 2},
{'username': 'normal_test', 'email': 'normal@test.com', 'password': 'normal123', 'role_id': 3}
{'username': 'admin_test', 'email': 'admin@test.com', 'password': 'Admin123__', 'role_id': 1},
{'username': 'vip_test', 'email': 'vip@test.com', 'password': 'Vip123__', 'role_id': 2},
{'username': 'normal_test', 'email': 'normal@test.com', 'password': 'Normal123__', 'role_id': 3}
]
for user_data in test_users:

Loading…
Cancel
Save