|
|
"""
|
|
|
UI API模块单元测试
|
|
|
|
|
|
本模块为UI API模块提供全面的单元测试覆盖,包括:
|
|
|
- REST API端点测试
|
|
|
- WebSocket事件处理测试
|
|
|
- 工作流编排测试
|
|
|
- 作业管理测试
|
|
|
- 文件上传测试
|
|
|
- 错误处理测试
|
|
|
- 认证授权测试
|
|
|
"""
|
|
|
|
|
|
import pytest
|
|
|
import asyncio
|
|
|
import json
|
|
|
import tempfile
|
|
|
import os
|
|
|
from pathlib import Path
|
|
|
from unittest.mock import Mock, patch, AsyncMock, MagicMock
|
|
|
from flask import Flask
|
|
|
from flask_socketio import SocketIO
|
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
|
|
from src.ui.api import (
|
|
|
FileUploadAPI, CodeParseAPI, SpecGenerationAPI, MutationAPI, VerificationAPI,
|
|
|
WorkflowAPI, JobListAPI, JobDetailAPI, WorkflowListAPI, SystemStatusAPI
|
|
|
)
|
|
|
from src.ui.websocket import WebSocketHandler, WebSocketManager, EventType
|
|
|
from src.ui.workflow import WorkflowManager, WorkflowConfig, WorkflowMode
|
|
|
from src.ui.job_manager import JobManager, JobStatus, JobType
|
|
|
from src.ui.exceptions import (
|
|
|
ValidationError, AuthenticationError, AuthorizationError,
|
|
|
ResourceNotFoundError, WorkflowError, JobError, TimeoutError,
|
|
|
ConcurrencyError, ConfigurationError, ExternalServiceError, APIError
|
|
|
)
|
|
|
from src.utils.config import ConfigManager
|
|
|
|
|
|
|
|
|
class TestFileUploadAPI:
|
|
|
"""FileUploadAPI类测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
self.app.config['SECRET_KEY'] = 'test-secret'
|
|
|
self.app.config['UPLOAD_FOLDER'] = tempfile.mkdtemp()
|
|
|
self.app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB
|
|
|
self.app.config['ALLOWED_EXTENSIONS'] = {'.c', '.cpp', '.h', '.hpp'}
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
self.api = FileUploadAPI()
|
|
|
|
|
|
def test_post_with_valid_file(self):
|
|
|
"""测试上传有效文件"""
|
|
|
# 创建临时文件
|
|
|
with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f:
|
|
|
f.write(b'int test() { return 0; }')
|
|
|
f.flush()
|
|
|
|
|
|
# 模拟文件上传
|
|
|
with open(f.name, 'rb') as upload_file:
|
|
|
from werkzeug.datastructures import FileStorage
|
|
|
file_storage = FileStorage(
|
|
|
stream=upload_file,
|
|
|
filename='test.c',
|
|
|
content_type='text/plain'
|
|
|
)
|
|
|
|
|
|
with self.app.test_request_context(method='POST', data={'file': file_storage}):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
data = response.get_json()
|
|
|
assert data['success'] is True
|
|
|
assert 'job_id' in data['data']
|
|
|
|
|
|
os.unlink(f.name)
|
|
|
|
|
|
def test_post_with_pasted_code(self):
|
|
|
"""测试粘贴代码上传"""
|
|
|
code_content = 'int pasted_function() { return 42; }'
|
|
|
|
|
|
with self.app.test_request_context(
|
|
|
method='POST',
|
|
|
data={'code_content': code_content}
|
|
|
):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
data = response.get_json()
|
|
|
assert data['success'] is True
|
|
|
assert 'job_id' in data['data']
|
|
|
|
|
|
def test_post_no_file_or_code(self):
|
|
|
"""测试无文件或代码上传"""
|
|
|
with self.app.test_request_context(method='POST'):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
data = response.get_json()
|
|
|
assert data['success'] is False
|
|
|
assert 'error' in data
|
|
|
|
|
|
def test_post_invalid_file_extension(self):
|
|
|
"""测试无效文件扩展名"""
|
|
|
with tempfile.NamedTemporaryFile(suffix='.txt', delete=False) as f:
|
|
|
f.write(b'invalid content')
|
|
|
f.flush()
|
|
|
|
|
|
with open(f.name, 'rb') as upload_file:
|
|
|
from werkzeug.datastructures import FileStorage
|
|
|
file_storage = FileStorage(
|
|
|
stream=upload_file,
|
|
|
filename='test.txt',
|
|
|
content_type='text/plain'
|
|
|
)
|
|
|
|
|
|
with self.app.test_request_context(method='POST', data={'file': file_storage}):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
data = response.get_json()
|
|
|
assert data['success'] is False
|
|
|
|
|
|
os.unlink(f.name)
|
|
|
|
|
|
def test_post_file_too_large(self):
|
|
|
"""测试文件过大"""
|
|
|
# 创建一个超过限制的大文件
|
|
|
large_content = b'0' * (17 * 1024 * 1024) # 17MB
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f:
|
|
|
f.write(large_content)
|
|
|
f.flush()
|
|
|
|
|
|
try:
|
|
|
with open(f.name, 'rb') as upload_file:
|
|
|
from werkzeug.datastructures import FileStorage
|
|
|
file_storage = FileStorage(
|
|
|
stream=upload_file,
|
|
|
filename='large.c',
|
|
|
content_type='text/plain'
|
|
|
)
|
|
|
|
|
|
with self.app.test_request_context(method='POST', data={'file': file_storage}):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 413
|
|
|
finally:
|
|
|
os.unlink(f.name)
|
|
|
|
|
|
|
|
|
class TestCodeParseAPI:
|
|
|
"""CodeParseAPI类测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
with self.app.test_request_context():
|
|
|
self.api = CodeParseAPI()
|
|
|
|
|
|
def test_post_with_valid_data(self):
|
|
|
"""测试使用有效数据解析"""
|
|
|
data = {
|
|
|
'file_path': '/tmp/test.c',
|
|
|
'options': {
|
|
|
'include_functions': True,
|
|
|
'include_variables': True,
|
|
|
'complexity_analysis': True
|
|
|
}
|
|
|
}
|
|
|
|
|
|
with patch('src.ui.api.JobManager') as mock_job_manager:
|
|
|
mock_job = Mock()
|
|
|
mock_job.job_id = 'test-job-id'
|
|
|
mock_job_manager.return_value.create_job.return_value = mock_job
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert 'job_id' in response_data['data']
|
|
|
|
|
|
def test_post_missing_file_path(self):
|
|
|
"""测试缺少文件路径"""
|
|
|
data = {'options': {'include_functions': True}}
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
assert 'file_path' in response_data['error']
|
|
|
|
|
|
def test_post_invalid_options(self):
|
|
|
"""测试无效选项"""
|
|
|
data = {
|
|
|
'file_path': '/tmp/test.c',
|
|
|
'options': 'invalid_options' # 应该是字典
|
|
|
}
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
|
|
|
|
|
|
class TestSpecGenerationAPI:
|
|
|
"""SpecGenerationAPI类测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
with self.app.test_request_context():
|
|
|
self.api = SpecGenerationAPI()
|
|
|
|
|
|
def test_post_with_valid_metadata(self):
|
|
|
"""测试使用有效元数据生成规范"""
|
|
|
data = {
|
|
|
'functions': [
|
|
|
{
|
|
|
'name': 'test_function',
|
|
|
'return_type': 'int',
|
|
|
'parameters': [
|
|
|
{'name': 'x', 'type': 'int'},
|
|
|
{'name': 'y', 'type': 'int'}
|
|
|
],
|
|
|
'complexity_score': 0.5
|
|
|
}
|
|
|
],
|
|
|
'options': {
|
|
|
'verification_types': ['memory_safety', 'overflow_detection'],
|
|
|
'include_comments': True,
|
|
|
'style_guide': 'cbmc'
|
|
|
}
|
|
|
}
|
|
|
|
|
|
with patch('src.ui.api.JobManager') as mock_job_manager:
|
|
|
mock_job = Mock()
|
|
|
mock_job.job_id = 'spec-job-id'
|
|
|
mock_job_manager.return_value.create_job.return_value = mock_job
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert 'job_id' in response_data['data']
|
|
|
|
|
|
def test_post_empty_functions_list(self):
|
|
|
"""测试空函数列表"""
|
|
|
data = {
|
|
|
'functions': [],
|
|
|
'options': {'verification_types': ['memory_safety']}
|
|
|
}
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
assert 'functions' in response_data['error']
|
|
|
|
|
|
def test_post_invalid_function_metadata(self):
|
|
|
"""测试无效函数元数据"""
|
|
|
data = {
|
|
|
'functions': [
|
|
|
{
|
|
|
'name': '', # 空名称
|
|
|
'return_type': 'int'
|
|
|
# 缺少必要字段
|
|
|
}
|
|
|
]
|
|
|
}
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
|
|
|
|
|
|
class TestMutationAPI:
|
|
|
"""MutationAPI类测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
with self.app.test_request_context():
|
|
|
self.api = MutationAPI()
|
|
|
|
|
|
def test_post_with_valid_specification(self):
|
|
|
"""测试使用有效规范进行突变"""
|
|
|
data = {
|
|
|
'specification': 'void test(int x) { __CPROVER_assume(x > 0); }',
|
|
|
'function_name': 'test',
|
|
|
'options': {
|
|
|
'mutation_types': ['predicate', 'boundary'],
|
|
|
'max_mutations': 10,
|
|
|
'quality_threshold': 0.7
|
|
|
}
|
|
|
}
|
|
|
|
|
|
with patch('src.ui.api.JobManager') as mock_job_manager:
|
|
|
mock_job = Mock()
|
|
|
mock_job.job_id = 'mutation-job-id'
|
|
|
mock_job_manager.return_value.create_job.return_value = mock_job
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert 'job_id' in response_data['data']
|
|
|
|
|
|
def test_post_missing_specification(self):
|
|
|
"""测试缺少规范"""
|
|
|
data = {
|
|
|
'function_name': 'test',
|
|
|
'options': {'mutation_types': ['predicate']}
|
|
|
}
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
assert 'specification' in response_data['error']
|
|
|
|
|
|
def test_post_invalid_specification_syntax(self):
|
|
|
"""测试无效规范语法"""
|
|
|
data = {
|
|
|
'specification': 'invalid syntax here {',
|
|
|
'function_name': 'test'
|
|
|
}
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
|
|
|
|
|
|
class TestVerificationAPI:
|
|
|
"""VerificationAPI类测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
with self.app.test_request_context():
|
|
|
self.api = VerificationAPI()
|
|
|
|
|
|
def test_post_with_valid_specification(self):
|
|
|
"""测试使用有效规范进行验证"""
|
|
|
data = {
|
|
|
'specification': 'void test(int x) { __CPROVER_assert(x > 0, "positive"); }',
|
|
|
'function_name': 'test',
|
|
|
'options': {
|
|
|
'verification_types': ['memory_safety'],
|
|
|
'timeout': 300,
|
|
|
'depth': 20
|
|
|
}
|
|
|
}
|
|
|
|
|
|
with patch('src.ui.api.JobManager') as mock_job_manager:
|
|
|
mock_job = Mock()
|
|
|
mock_job.job_id = 'verification-job-id'
|
|
|
mock_job_manager.return_value.create_job.return_value = mock_job
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert 'job_id' in response_data['data']
|
|
|
|
|
|
def test_post_missing_specification(self):
|
|
|
"""测试缺少规范"""
|
|
|
data = {
|
|
|
'function_name': 'test',
|
|
|
'options': {'verification_types': ['memory_safety']}
|
|
|
}
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
assert 'specification' in response_data['error']
|
|
|
|
|
|
def test_post_invalid_verification_options(self):
|
|
|
"""测试无效验证选项"""
|
|
|
data = {
|
|
|
'specification': 'void test(int x) { }',
|
|
|
'function_name': 'test',
|
|
|
'options': {
|
|
|
'timeout': -1 # 无效超时
|
|
|
}
|
|
|
}
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
|
|
|
|
|
|
class TestWorkflowAPI:
|
|
|
"""WorkflowAPI类测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
with self.app.test_request_context():
|
|
|
self.api = WorkflowAPI()
|
|
|
|
|
|
def test_post_start_workflow(self):
|
|
|
"""测试启动工作流"""
|
|
|
data = {
|
|
|
'source_files': ['/tmp/test.c'],
|
|
|
'target_functions': ['test_function'],
|
|
|
'workflow_config': {
|
|
|
'mode': 'standard',
|
|
|
'enable_parsing': True,
|
|
|
'enable_generation': True,
|
|
|
'enable_mutation': True,
|
|
|
'enable_verification': True
|
|
|
}
|
|
|
}
|
|
|
|
|
|
with patch('src.ui.api._workflow_manager') as mock_workflow_manager:
|
|
|
mock_workflow_manager.start_workflow.return_value = 'workflow-123'
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert 'workflow_id' in response_data
|
|
|
assert response_data['workflow_id'] == 'workflow-123'
|
|
|
|
|
|
def test_post_missing_source_files(self):
|
|
|
"""测试缺少源文件"""
|
|
|
data = {
|
|
|
'target_functions': ['test_function'],
|
|
|
'workflow_config': {'mode': 'standard'}
|
|
|
}
|
|
|
|
|
|
with self.app.test_request_context(json=data):
|
|
|
response = self.api.post()
|
|
|
|
|
|
assert response.status_code == 400
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
assert 'source_files' in response_data['error']
|
|
|
|
|
|
def test_get_workflow_status(self):
|
|
|
"""测试获取工作流状态"""
|
|
|
workflow_id = 'workflow-123'
|
|
|
|
|
|
with patch('src.ui.api._workflow_manager') as mock_workflow_manager:
|
|
|
mock_status = {
|
|
|
'workflow_id': workflow_id,
|
|
|
'status': 'running',
|
|
|
'progress': 50.0,
|
|
|
'current_step': 'mutation'
|
|
|
}
|
|
|
mock_workflow_manager.get_workflow_status.return_value = mock_status
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
response = self.api.get(workflow_id)
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert response_data['workflow_id'] == workflow_id
|
|
|
assert response_data['status'] == 'running'
|
|
|
|
|
|
def test_delete_workflow(self):
|
|
|
"""测试删除工作流"""
|
|
|
workflow_id = 'workflow-123'
|
|
|
|
|
|
with patch('src.ui.api._workflow_manager') as mock_workflow_manager:
|
|
|
mock_workflow_manager.cancel_workflow.return_value = True
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
response = self.api.delete(workflow_id)
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert response_data['message'] == 'Workflow cancelled successfully'
|
|
|
|
|
|
|
|
|
class TestJobListAPI:
|
|
|
"""JobListAPI类测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
with self.app.test_request_context():
|
|
|
self.api = JobListAPI()
|
|
|
|
|
|
def test_get_job_list(self):
|
|
|
"""测试获取作业列表"""
|
|
|
with patch('src.ui.api._job_manager') as mock_job_manager:
|
|
|
mock_jobs = [
|
|
|
Mock(job_id='job-1', job_type=JobType.CODE_PARSING, status=JobStatus.COMPLETED),
|
|
|
Mock(job_id='job-2', job_type=JobType.SPEC_GENERATION, status=JobStatus.RUNNING)
|
|
|
]
|
|
|
mock_job_manager.get_jobs.return_value = mock_jobs
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
response = self.api.get()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert len(response_data['jobs']) == 2
|
|
|
|
|
|
def test_get_job_list_with_filters(self):
|
|
|
"""测试带过滤器的作业列表"""
|
|
|
with patch('src.ui.api._job_manager') as mock_job_manager:
|
|
|
mock_jobs = [Mock(job_id='job-1', job_type=JobType.CODE_PARSING, status=JobStatus.COMPLETED)]
|
|
|
mock_job_manager.get_jobs.return_value = mock_jobs
|
|
|
|
|
|
with self.app.test_request_context(query_string='status=completed&type=code_parsing'):
|
|
|
response = self.api.get()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert len(response_data['jobs']) == 1
|
|
|
|
|
|
|
|
|
class TestJobDetailAPI:
|
|
|
"""JobDetailAPI类测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
with self.app.test_request_context():
|
|
|
self.api = JobDetailAPI()
|
|
|
|
|
|
def test_get_job_details(self):
|
|
|
"""测试获取作业详情"""
|
|
|
job_id = 'job-123'
|
|
|
|
|
|
with patch('src.ui.api._job_manager') as mock_job_manager:
|
|
|
mock_job = Mock(
|
|
|
job_id=job_id,
|
|
|
job_type=JobType.CODE_PARSING,
|
|
|
status=JobStatus.COMPLETED,
|
|
|
progress=Mock(percentage=100.0, message='Completed'),
|
|
|
created_at='2023-01-01T00:00:00',
|
|
|
completed_at='2023-01-01T00:05:00'
|
|
|
)
|
|
|
mock_job_manager.get_job.return_value = mock_job
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
response = self.api.get(job_id)
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert response_data['job']['job_id'] == job_id
|
|
|
assert response_data['job']['status'] == 'completed'
|
|
|
|
|
|
def test_get_nonexistent_job(self):
|
|
|
"""测试获取不存在的作业"""
|
|
|
job_id = 'nonexistent-job'
|
|
|
|
|
|
with patch('src.ui.api._job_manager') as mock_job_manager:
|
|
|
mock_job_manager.get_job.return_value = None
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
response = self.api.get(job_id)
|
|
|
|
|
|
assert response.status_code == 404
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is False
|
|
|
assert 'not found' in response_data['error'].lower()
|
|
|
|
|
|
def test_delete_job(self):
|
|
|
"""测试删除作业"""
|
|
|
job_id = 'job-123'
|
|
|
|
|
|
with patch('src.ui.api._job_manager') as mock_job_manager:
|
|
|
mock_job_manager.cancel_job.return_value = True
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
response = self.api.delete(job_id)
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
|
|
|
|
|
|
class TestSystemStatusAPI:
|
|
|
"""SystemStatusAPI类测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
with self.app.test_request_context():
|
|
|
self.api = SystemStatusAPI()
|
|
|
|
|
|
def test_get_system_status(self):
|
|
|
"""测试获取系统状态"""
|
|
|
with patch('src.ui.api._job_manager') as mock_job_manager:
|
|
|
mock_job_manager.get_statistics.return_value = {
|
|
|
'total_jobs': 10,
|
|
|
'active_jobs': 2,
|
|
|
'completed_jobs': 8
|
|
|
}
|
|
|
|
|
|
with patch('src.ui.api._workflow_manager') as mock_workflow_manager:
|
|
|
mock_workflow_manager.get_statistics.return_value = {
|
|
|
'total_workflows': 5,
|
|
|
'active_workflows': 1
|
|
|
}
|
|
|
|
|
|
with self.app.test_request_context():
|
|
|
response = self.api.get()
|
|
|
|
|
|
assert response.status_code == 200
|
|
|
response_data = response.get_json()
|
|
|
assert response_data['success'] is True
|
|
|
assert 'status' in response_data
|
|
|
assert 'version' in response_data
|
|
|
assert 'statistics' in response_data
|
|
|
|
|
|
|
|
|
class TestWebSocketIntegration:
|
|
|
"""WebSocket集成测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
self.socketio = SocketIO(self.app)
|
|
|
self.websocket_handler = WebSocketHandler(self.socketio, Mock())
|
|
|
|
|
|
def test_websocket_event_broadcast(self):
|
|
|
"""测试WebSocket事件广播"""
|
|
|
with patch.object(self.websocket_handler, 'send_job_progress') as mock_send:
|
|
|
self.websocket_handler.send_job_progress('job-123', {
|
|
|
'percentage': 50.0,
|
|
|
'message': 'Processing...'
|
|
|
})
|
|
|
|
|
|
mock_send.assert_called_once()
|
|
|
|
|
|
def test_websocket_workflow_update(self):
|
|
|
"""测试WebSocket工作流更新"""
|
|
|
with patch.object(self.websocket_handler, 'send_workflow_update') as mock_send:
|
|
|
self.websocket_handler.send_workflow_update('session-123', {
|
|
|
'workflow_id': 'workflow-123',
|
|
|
'status': 'completed'
|
|
|
})
|
|
|
|
|
|
mock_send.assert_called_once()
|
|
|
|
|
|
def test_websocket_error_notification(self):
|
|
|
"""测试WebSocket错误通知"""
|
|
|
with patch.object(self.websocket_handler, 'send_error_notification') as mock_send:
|
|
|
self.websocket_handler.send_error_notification(
|
|
|
'Test error message',
|
|
|
'ValidationError',
|
|
|
{'field': 'test_field'}
|
|
|
)
|
|
|
|
|
|
mock_send.assert_called_once()
|
|
|
|
|
|
|
|
|
class TestAPIErrorHandling:
|
|
|
"""API错误处理测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
with self.app.test_request_context():
|
|
|
pass
|
|
|
|
|
|
def test_validation_error_response(self):
|
|
|
"""测试验证错误响应"""
|
|
|
error = ValidationError("Invalid input", field="test_field")
|
|
|
|
|
|
response = {
|
|
|
"success": False,
|
|
|
"error": str(error),
|
|
|
"error_type": "ValidationError",
|
|
|
"field": "test_field"
|
|
|
}
|
|
|
|
|
|
assert response["success"] is False
|
|
|
assert "Invalid input" in response["error"]
|
|
|
assert response["error_type"] == "ValidationError"
|
|
|
|
|
|
def test_authentication_error_response(self):
|
|
|
"""测试认证错误响应"""
|
|
|
error = AuthenticationError("Authentication required")
|
|
|
|
|
|
response = {
|
|
|
"success": False,
|
|
|
"error": str(error),
|
|
|
"error_type": "AuthenticationError"
|
|
|
}
|
|
|
|
|
|
assert response["success"] is False
|
|
|
assert "Authentication required" in response["error"]
|
|
|
|
|
|
def test_resource_not_found_error_response(self):
|
|
|
"""测试资源未找到错误响应"""
|
|
|
error = ResourceNotFoundError("Resource not found", resource_id="123")
|
|
|
|
|
|
response = {
|
|
|
"success": False,
|
|
|
"error": str(error),
|
|
|
"error_type": "ResourceNotFoundError",
|
|
|
"resource_id": "123"
|
|
|
}
|
|
|
|
|
|
assert response["success"] is False
|
|
|
assert "Resource not found" in response["error"]
|
|
|
|
|
|
def test_job_error_response(self):
|
|
|
"""测试作业错误响应"""
|
|
|
error = JobError("Job failed", job_id="job-123")
|
|
|
|
|
|
response = {
|
|
|
"success": False,
|
|
|
"error": str(error),
|
|
|
"error_type": "JobError",
|
|
|
"job_id": "job-123"
|
|
|
}
|
|
|
|
|
|
assert response["success"] is False
|
|
|
assert "Job failed" in response["error"]
|
|
|
|
|
|
def test_workflow_error_response(self):
|
|
|
"""测试工作流错误响应"""
|
|
|
error = WorkflowError("Workflow error", workflow_step="mutation")
|
|
|
|
|
|
response = {
|
|
|
"success": False,
|
|
|
"error": str(error),
|
|
|
"error_type": "WorkflowError",
|
|
|
"workflow_step": "mutation"
|
|
|
}
|
|
|
|
|
|
assert response["success"] is False
|
|
|
assert "Workflow error" in response["error"]
|
|
|
|
|
|
def test_timeout_error_response(self):
|
|
|
"""测试超时错误响应"""
|
|
|
error = TimeoutError("Operation timed out", operation="verification", timeout_seconds=300)
|
|
|
|
|
|
response = {
|
|
|
"success": False,
|
|
|
"error": str(error),
|
|
|
"error_type": "TimeoutError",
|
|
|
"operation": "verification",
|
|
|
"timeout_seconds": 300
|
|
|
}
|
|
|
|
|
|
assert response["success"] is False
|
|
|
assert "Operation timed out" in response["error"]
|
|
|
|
|
|
def test_concurrency_error_response(self):
|
|
|
"""测试并发错误响应"""
|
|
|
error = ConcurrencyError("Too many concurrent operations")
|
|
|
|
|
|
response = {
|
|
|
"success": False,
|
|
|
"error": str(error),
|
|
|
"error_type": "ConcurrencyError"
|
|
|
}
|
|
|
|
|
|
assert response["success"] is False
|
|
|
assert "Too many concurrent operations" in response["error"]
|
|
|
|
|
|
def test_configuration_error_response(self):
|
|
|
"""测试配置错误响应"""
|
|
|
error = ConfigurationError("Invalid configuration")
|
|
|
|
|
|
response = {
|
|
|
"success": False,
|
|
|
"error": str(error),
|
|
|
"error_type": "ConfigurationError"
|
|
|
}
|
|
|
|
|
|
assert response["success"] is False
|
|
|
assert "Invalid configuration" in response["error"]
|
|
|
|
|
|
def test_external_service_error_response(self):
|
|
|
"""测试外部服务错误响应"""
|
|
|
error = ExternalServiceError("Service unavailable", service="llm_provider")
|
|
|
|
|
|
response = {
|
|
|
"success": False,
|
|
|
"error": str(error),
|
|
|
"error_type": "ExternalServiceError",
|
|
|
"service": "llm_provider"
|
|
|
}
|
|
|
|
|
|
assert response["success"] is False
|
|
|
assert "Service unavailable" in response["error"]
|
|
|
|
|
|
|
|
|
class TestAPIIntegration:
|
|
|
"""API集成测试"""
|
|
|
|
|
|
def setup_method(self):
|
|
|
"""测试方法设置"""
|
|
|
self.app = Flask(__name__)
|
|
|
self.app.config['TESTING'] = True
|
|
|
self.app.config['SECRET_KEY'] = 'test-secret'
|
|
|
|
|
|
# 模拟依赖项
|
|
|
self.mock_job_manager = Mock()
|
|
|
self.mock_workflow_manager = Mock()
|
|
|
self.mock_websocket_handler = Mock()
|
|
|
|
|
|
# 设置应用上下文
|
|
|
with self.app.app_context():
|
|
|
from src.ui.api import init_api
|
|
|
init_api(
|
|
|
config=Mock(),
|
|
|
job_manager=self.mock_job_manager,
|
|
|
workflow_manager=self.mock_workflow_manager,
|
|
|
websocket_handler=self.mock_websocket_handler
|
|
|
)
|
|
|
|
|
|
def test_complete_workflow_integration(self):
|
|
|
"""测试完整工作流集成"""
|
|
|
# 设置模拟响应
|
|
|
self.mock_workflow_manager.start_workflow.return_value = 'workflow-123'
|
|
|
self.mock_job_manager.create_job.return_value = Mock(job_id='job-123')
|
|
|
|
|
|
with self.app.test_client() as client:
|
|
|
# 测试文件上传
|
|
|
with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f:
|
|
|
f.write(b'int test() { return 0; }')
|
|
|
f.flush()
|
|
|
|
|
|
try:
|
|
|
with open(f.name, 'rb') as upload_file:
|
|
|
response = client.post('/api/upload', data={'file': upload_file})
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
# 测试工作流启动
|
|
|
workflow_data = {
|
|
|
'source_files': [f.name],
|
|
|
'target_functions': ['test'],
|
|
|
'workflow_config': {'mode': 'standard'}
|
|
|
}
|
|
|
response = client.post('/api/workflow', json=workflow_data)
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
# 测试作业列表
|
|
|
response = client.get('/api/jobs')
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
# 测试系统状态
|
|
|
response = client.get('/api/status')
|
|
|
assert response.status_code == 200
|
|
|
|
|
|
finally:
|
|
|
os.unlink(f.name)
|
|
|
|
|
|
def test_error_propagation(self):
|
|
|
"""测试错误传播"""
|
|
|
# 设置模拟错误
|
|
|
self.mock_job_manager.create_job.side_effect = JobError("Database error")
|
|
|
|
|
|
with self.app.test_client() as client:
|
|
|
with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f:
|
|
|
f.write(b'int test() { return 0; }')
|
|
|
f.flush()
|
|
|
|
|
|
try:
|
|
|
with open(f.name, 'rb') as upload_file:
|
|
|
response = client.post('/api/upload', data={'file': upload_file})
|
|
|
assert response.status_code == 500
|
|
|
data = response.get_json()
|
|
|
assert data['success'] is False
|
|
|
assert 'Database error' in data['error']
|
|
|
|
|
|
finally:
|
|
|
os.unlink(f.name)
|
|
|
|
|
|
def test_request_validation(self):
|
|
|
"""测试请求验证"""
|
|
|
with self.app.test_client() as client:
|
|
|
# 测试无效JSON
|
|
|
response = client.post('/api/workflow', data='invalid json', content_type='application/json')
|
|
|
assert response.status_code == 400
|
|
|
|
|
|
# 测试缺少必需字段
|
|
|
response = client.post('/api/workflow', json={})
|
|
|
assert response.status_code == 400
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
pytest.main([__file__, "-v"]) |