You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1074 lines
41 KiB
1074 lines
41 KiB
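"""
Complete pipeline integration tests.

This module provides end-to-end integration tests for the whole verification
pipeline, covering:
- File upload and parsing
- LLM specification generation
- Specification mutation
- CBMC verification
- Workflow orchestration
- Result processing and statistics
"""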
|
|
|
|
import pytest
|
|
import asyncio
|
|
import tempfile
|
|
import json
|
|
import os
|
|
from pathlib import Path
|
|
from unittest.mock import Mock, patch, AsyncMock, MagicMock
|
|
from typing import Dict, List, Any, Optional
|
|
import time
|
|
|
|
from src.ui.workflow import WorkflowManager, WorkflowConfig, WorkflowMode
|
|
from src.ui.job_manager import JobManager, JobStatus, JobType
|
|
from src.ui.api import init_api
|
|
from src.ui.websocket import WebSocketHandler
|
|
from src.ui.web_app import create_app
|
|
from src.utils.config import get_config, ConfigManager
|
|
|
|
# 导入测试固件
|
|
from tests.fixtures.cbmc_outputs import cbmc_fixtures
|
|
from tests.fixtures.llm_responses import llm_fixtures
|
|
from tests.fixtures.pipeline_mocks import PipelineMockFactory, MockScenario, MockFunctionMetadata
|
|
|
|
|
|
class TestCompletePipelineIntegration:
    """End-to-end integration tests for the complete pipeline.

    Every test talks to the application through ``self.app.test_client()``
    and works inside a fresh temporary directory created in ``setup_method``
    and removed in ``teardown_method``.
    """

    # Status values that end the polling loops of the ``_wait_for_*`` helpers.
    _TERMINAL_STATUSES = ('completed', 'failed', 'cancelled')

    def setup_method(self):
        """Build the per-test configuration, mocked managers and test app."""
        # Test configuration pointing at an isolated working directory.
        # NOTE(review): get_config() may return a shared object, in which case
        # working_directory leaks between tests - confirm.
        self.config = get_config()
        self.temp_dir = tempfile.mkdtemp()
        self.config.working_directory = self.temp_dir

        # Components handed to the API are mock objects.
        self.job_manager = Mock(spec=JobManager)
        self.workflow_manager = Mock(spec=WorkflowManager)
        self.websocket_handler = None  # WebSocket is not used for now

        # Application under test.
        self.app = create_app(self.config)
        self.app.config['TESTING'] = True
        self.app.config['SECRET_KEY'] = 'test-secret'

        # Wire the API to the mocked components.
        init_api(
            config=self.config,
            job_manager=self.job_manager,
            workflow_manager=self.workflow_manager,
            websocket_handler=self.websocket_handler,
        )

        # Test counters.
        self.job_counter = 0
        self.workflow_counter = 0

        # Factory that produces scenario-specific mocks.
        self.mock_factory = PipelineMockFactory()

    def teardown_method(self):
        """Remove the temporary working directory and everything in it."""
        import shutil

        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)

    # ------------------------------------------------------------------
    # Request helpers shared by the tests
    # ------------------------------------------------------------------

    def _write_source(self, filename, code):
        """Write *code* to *filename* in the temp dir and return the path.

        The file is written as UTF-8 explicitly because some fixtures contain
        non-ASCII text, which the platform default encoding may not support.
        No per-test cleanup is needed: ``teardown_method`` removes the whole
        directory.
        """
        path = os.path.join(self.temp_dir, filename)
        with open(path, 'w', encoding='utf-8') as handle:
            handle.write(code)
        return path

    @staticmethod
    def _workflow_request(source_file, functions, mode='standard', **overrides):
        """Return a ``/api/workflow`` payload with all stages enabled.

        Keyword *overrides* replace or extend entries of ``workflow_config``.
        """
        workflow_config = {
            'mode': mode,
            'enable_parsing': True,
            'enable_generation': True,
            'enable_mutation': True,
            'enable_verification': True,
        }
        workflow_config.update(overrides)
        return {
            'source_files': [source_file],
            'target_functions': list(functions),
            'workflow_config': workflow_config,
        }

    @staticmethod
    def _upload_file(client, path):
        """Upload *path* via ``/api/upload``, assert success, return the JSON body."""
        with open(path, 'rb') as upload_file:
            response = client.post('/api/upload', data={'file': upload_file})
        assert response.status_code == 200
        upload_data = response.get_json()
        assert upload_data['success'] is True
        return upload_data

    @staticmethod
    def _submit_job(client, url, payload):
        """POST *payload* as JSON to *url*, assert success, return the job id."""
        response = client.post(url, json=payload)
        assert response.status_code == 200
        body = response.get_json()
        assert body['success'] is True
        return body['data']['job_id']

    @staticmethod
    def _start_workflow(client, payload):
        """POST *payload* to ``/api/workflow``, assert HTTP 200, return the JSON body."""
        response = client.post('/api/workflow', json=payload)
        assert response.status_code == 200
        return response.get_json()

    @staticmethod
    def _fetch_workflow(client, workflow_id):
        """GET ``/api/workflow/<id>``, assert HTTP 200, return the JSON body."""
        response = client.get(f'/api/workflow/{workflow_id}')
        assert response.status_code == 200
        return response.get_json()

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    @patch('src.parser.CodeParser')
    @patch('src.spec.LLMGenerator')
    @patch('src.mutate.MutationEngine')
    @patch('src.verify.CBMCRunner')
    @patch('src.ui.job_manager.JobManager')
    @patch('src.ui.workflow.WorkflowManager')
    def test_basic_arithmetic_pipeline(self, mock_workflow, mock_job_manager, mock_cbmc, mock_mutate, mock_llm, mock_parser):
        """Run upload, parse, generate and a full workflow for two arithmetic functions."""
        # Patch decorators apply bottom-up, hence the reversed parameter order.
        self._setup_comprehensive_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc, mock_job_manager, mock_workflow)

        test_file = self._write_source("test_arithmetic.c", """
int add(int a, int b) {
    return a + b;
}

int multiply(int x, int y) {
    return x * y;
}
""")

        # Canned results for every pipeline stage.
        mock_parser.return_value.parse_file = AsyncMock(return_value={
            'functions': [
                {'name': 'add', 'return_type': 'int', 'parameters': [{'name': 'a', 'type': 'int'}, {'name': 'b', 'type': 'int'}]},
                {'name': 'multiply', 'return_type': 'int', 'parameters': [{'name': 'x', 'type': 'int'}, {'name': 'y', 'type': 'int'}]}
            ]
        })

        mock_llm.return_value.generate_specification = AsyncMock(return_value={
            'specification': 'void add_spec() { __CPROVER_assert(add(1, 2) == 3); }',
            'confidence': 0.95
        })

        mock_mutate.return_value.generate_mutations = AsyncMock(return_value=[
            {'mutation': 'void add_spec() { __CPROVER_assert(add(1, 2) == 4); }', 'type': 'predicate_mutation'}
        ])

        mock_cbmc.return_value.run_verification = AsyncMock(return_value={
            'status': 'successful',
            'exit_code': 0,
            'stdout': 'VERIFICATION SUCCESSFUL',
            'verification_results': {'verified_properties': 3, 'failed_properties': 0}
        })

        # 1. File upload (each step uses its own client).
        with self.app.test_client() as client:
            upload_data = self._upload_file(client, test_file)
        self._wait_for_job_completion(upload_data['data']['job_id'])

        # 2. Code parsing.
        parse_data = {
            'file_path': test_file,
            'options': {
                'include_functions': True,
                'include_variables': True,
                'complexity_analysis': True
            }
        }
        with self.app.test_client() as client:
            parse_job_id = self._submit_job(client, '/api/parse', parse_data)
        self._wait_for_job_completion(parse_job_id)

        # 3. Specification generation.
        function_metadata = [
            {
                'name': 'add',
                'return_type': 'int',
                'parameters': [
                    {'name': 'a', 'type': 'int'},
                    {'name': 'b', 'type': 'int'}
                ],
                'complexity_score': 0.3
            },
            {
                'name': 'multiply',
                'return_type': 'int',
                'parameters': [
                    {'name': 'x', 'type': 'int'},
                    {'name': 'y', 'type': 'int'}
                ],
                'complexity_score': 0.3
            }
        ]
        spec_data = {
            'functions': function_metadata,
            'options': {
                'verification_types': ['memory_safety', 'overflow_detection'],
                'include_comments': True,
                'style_guide': 'cbmc'
            }
        }
        with self.app.test_client() as client:
            spec_job_id = self._submit_job(client, '/api/generate', spec_data)
        self._wait_for_job_completion(spec_job_id)

        # 4. Complete workflow.
        workflow_data = self._workflow_request(test_file, ['add', 'multiply'])
        with self.app.test_client() as client:
            workflow_result = self._start_workflow(client, workflow_data)
            assert workflow_result['success'] is True
            workflow_id = workflow_result['workflow_id']
        self._wait_for_workflow_completion(workflow_id)

        # 5. Final result and statistics.
        with self.app.test_client() as client:
            final_result = self._fetch_workflow(client, workflow_id)
        assert final_result['success'] is True
        assert final_result['workflow']['status'] == 'completed'

        stats = final_result['workflow']['statistics']
        assert 'total_functions' in stats
        assert stats['total_functions'] == 2
        assert 'generated_specs' in stats
        assert stats['generated_specs'] == 2

    def test_memory_safety_pipeline(self):
        """Run a memory-safety focused workflow over pointer-handling C functions."""
        with patch('src.parser.CodeParser') as mock_parser, \
                patch('src.spec.LLMGenerator') as mock_llm, \
                patch('src.mutate.MutationEngine') as mock_mutate, \
                patch('src.verify.CBMCRunner') as mock_cbmc, \
                patch('src.ui.job_manager.JobManager'), \
                patch('src.ui.workflow.WorkflowManager'):

            self._setup_memory_safety_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)

            test_file = self._write_source("test_memory_safety.c", """
#include <stdlib.h>

void safe_copy(char *dest, const char *src, size_t n) {
    if (dest == NULL || src == NULL) {
        return;
    }

    for (size_t i = 0; i < n && src[i] != '\\0'; i++) {
        dest[i] = src[i];
    }
}

int* create_array(int size) {
    if (size <= 0) {
        return NULL;
    }

    int *arr = (int*)malloc(size * sizeof(int));
    if (arr == NULL) {
        return NULL;
    }

    for (int i = 0; i < size; i++) {
        arr[i] = 0;
    }

    return arr;
}
""")

            workflow_data = self._workflow_request(
                test_file,
                ['safe_copy', 'create_array'],
                mode='memory_safety_focused',
                verification_types=['memory_safety', 'pointer_validity'],
            )

            with self.app.test_client() as client:
                workflow_id = self._start_workflow(client, workflow_data)['workflow_id']
                self._wait_for_workflow_completion(workflow_id)

                final_result = self._fetch_workflow(client, workflow_id)
                assert final_result['success'] is True

                # Memory-safety related statistics.
                stats = final_result['workflow']['statistics']
                assert 'total_functions' in stats
                assert stats['total_functions'] == 2

    def test_concurrent_pipeline_execution(self):
        """Start three workflows back to back and check that all of them complete."""
        with patch('src.parser.CodeParser') as mock_parser, \
                patch('src.spec.LLMGenerator') as mock_llm, \
                patch('src.mutate.MutationEngine') as mock_mutate, \
                patch('src.verify.CBMCRunner') as mock_cbmc, \
                patch('src.ui.job_manager.JobManager'), \
                patch('src.ui.workflow.WorkflowManager'):

            self._setup_concurrent_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)

            # One source file per workflow.
            test_files = [
                self._write_source(f"concurrent_test_{i}.c", f"""
int concurrent_function_{i}(int x) {{
    return x * {i};
}}
""")
                for i in range(3)
            ]

            with self.app.test_client() as client:
                # Submit every workflow before waiting on any of them.
                workflow_ids = []
                for i, test_file in enumerate(test_files):
                    workflow_data = self._workflow_request(test_file, [f'concurrent_function_{i}'])
                    workflow_result = self._start_workflow(client, workflow_data)
                    workflow_ids.append(workflow_result['workflow_id'])

                for workflow_id in workflow_ids:
                    self._wait_for_workflow_completion(workflow_id)

                for workflow_id in workflow_ids:
                    final_result = self._fetch_workflow(client, workflow_id)
                    assert final_result['success'] is True
                    assert final_result['workflow']['status'] == 'completed'

    def test_error_handling_pipeline(self):
        """Upload syntactically invalid C code and check the parse endpoint responds sanely."""
        with patch('src.parser.CodeParser') as mock_parser, \
                patch('src.spec.LLMGenerator') as mock_llm, \
                patch('src.mutate.MutationEngine') as mock_mutate, \
                patch('src.verify.CBMCRunner') as mock_cbmc:

            self._setup_error_handling_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)

            # Code with a syntax error (the C comment is part of the fixture).
            invalid_code = """
int invalid_function( {
    // 语法错误:缺少参数和闭合括号
    return x;
}
"""
            test_file = self._write_source("invalid_code.c", invalid_code)

            with self.app.test_client() as client:
                # The upload itself is expected to succeed.
                self._upload_file(client, test_file)

                parse_data = {
                    'file_path': test_file,
                    'options': {'include_functions': True}
                }
                response = client.post('/api/parse', json=parse_data)
                # Parsing may fail, but must answer with a proper response.
                assert response.status_code in [200, 400, 500]

                if response.status_code == 200:
                    parse_result = response.get_json()
                    if parse_result['success']:
                        parse_job_id = parse_result['data']['job_id']
                        self._wait_for_job_completion(parse_job_id)

    def test_pipeline_performance_metrics(self):
        """Check that a standard workflow finishes within 60 seconds and reports timestamps."""
        with patch('src.parser.CodeParser') as mock_parser, \
                patch('src.spec.LLMGenerator') as mock_llm, \
                patch('src.mutate.MutationEngine') as mock_mutate, \
                patch('src.verify.CBMCRunner') as mock_cbmc, \
                patch('src.ui.job_manager.JobManager'), \
                patch('src.ui.workflow.WorkflowManager'):

            self._setup_performance_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)

            test_file = self._write_source("performance_test.c", """
int performance_test(int a, int b, int c) {
    int result = a + b * c;
    if (result > 100) {
        result = result / 2;
    }
    return result;
}
""")

            # Monotonic clock: elapsed time must not be affected by clock changes.
            started = time.monotonic()

            with self.app.test_client() as client:
                workflow_data = self._workflow_request(test_file, ['performance_test'])
                workflow_id = self._start_workflow(client, workflow_data)['workflow_id']
                self._wait_for_workflow_completion(workflow_id)

                execution_time = time.monotonic() - started
                assert execution_time < 60.0  # should finish within 60 seconds

                # Timing fields must be present on the workflow record.
                final_result = self._fetch_workflow(client, workflow_id)
                workflow_info = final_result['workflow']
                assert 'completed_at' in workflow_info
                assert 'created_at' in workflow_info

    def test_pipeline_with_large_file(self):
        """Upload and parse a generated source file containing 100 functions."""
        with patch('src.parser.CodeParser') as mock_parser, \
                patch('src.spec.LLMGenerator') as mock_llm, \
                patch('src.mutate.MutationEngine') as mock_mutate, \
                patch('src.verify.CBMCRunner') as mock_cbmc:

            self._setup_large_file_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)

            large_code = [
                f"""
int large_function_{i}(int x, int y) {{
    /* Complex computation {i} */
    int result = 0;
    for (int j = 0; j < {i % 10 + 1}; j++) {{
        result += x * y + j;
    }}
    return result;
}}
"""
                for i in range(100)
            ]
            test_file = self._write_source("large_test.c", '\n'.join(large_code))

            with self.app.test_client() as client:
                self._upload_file(client, test_file)

                parse_data = {
                    'file_path': test_file,
                    'options': {
                        'include_functions': True,
                        'complexity_analysis': True
                    }
                }
                parse_job_id = self._submit_job(client, '/api/parse', parse_data)
                self._wait_for_job_completion(parse_job_id)

                response = client.get(f'/api/jobs/{parse_job_id}')
                assert response.status_code == 200
                job_result = response.get_json()
                assert job_result['success'] is True

                # The function count is only checked when the job exposes it.
                job_data = job_result['job']
                if 'result' in job_data and 'functions' in job_data['result']:
                    functions = job_data['result']['functions']
                    assert len(functions) == 100

    def test_pipeline_resume_and_recovery(self):
        """Check that workflow status and job history can be queried after a run."""
        with patch('src.parser.CodeParser') as mock_parser, \
                patch('src.spec.LLMGenerator') as mock_llm, \
                patch('src.mutate.MutationEngine') as mock_mutate, \
                patch('src.verify.CBMCRunner') as mock_cbmc, \
                patch('src.ui.job_manager.JobManager') as mock_job_manager, \
                patch('src.ui.workflow.WorkflowManager') as mock_workflow:

            self._setup_recovery_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc, mock_job_manager, mock_workflow)

            test_file = self._write_source("recovery_test.c", """
int recovery_test(int x) {
    return x * 2;
}
""")

            with self.app.test_client() as client:
                workflow_data = self._workflow_request(test_file, ['recovery_test'])
                workflow_id = self._start_workflow(client, workflow_data)['workflow_id']
                self._wait_for_workflow_completion(workflow_id)

                # The workflow state must be queryable.
                status_result = self._fetch_workflow(client, workflow_id)
                assert status_result['success'] is True

                # The job history must not be empty.
                response = client.get('/api/jobs')
                assert response.status_code == 200
                jobs_result = response.get_json()
                assert jobs_result['success'] is True
                assert len(jobs_result['jobs']) > 0

    def test_pipeline_configuration_variations(self):
        """Run the same source through minimal, generation-only and comprehensive configs."""
        with patch('src.parser.CodeParser') as mock_parser, \
                patch('src.spec.LLMGenerator') as mock_llm, \
                patch('src.mutate.MutationEngine') as mock_mutate, \
                patch('src.verify.CBMCRunner') as mock_cbmc:

            self._setup_configuration_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)

            test_file = self._write_source("config_test.c", """
int config_test(int a, int b) {
    return a + b;
}
""")

            # (mode, stage overrides); stages not listed stay enabled.
            variants = [
                ('minimal', {'enable_mutation': False, 'enable_verification': False}),
                ('generation_only', {'enable_mutation': False}),
                ('comprehensive', {}),
            ]

            with self.app.test_client() as client:
                for mode, overrides in variants:
                    workflow_data = self._workflow_request(test_file, ['config_test'], mode=mode, **overrides)
                    workflow_id = self._start_workflow(client, workflow_data)['workflow_id']
                    self._wait_for_workflow_completion(workflow_id)

                    final_result = self._fetch_workflow(client, workflow_id)
                    assert final_result['success'] is True
                    assert final_result['workflow']['status'] == 'completed'

    # ------------------------------------------------------------------
    # Polling helpers
    # ------------------------------------------------------------------

    def _poll_until_terminal(self, url, result_key, description, timeout, poll_interval):
        """Poll *url* until ``body[result_key]['status']`` is terminal.

        Fails the test via ``pytest.fail`` when *timeout* seconds pass first.
        A monotonic clock is used so system clock adjustments cannot shorten
        or extend the wait.
        """
        deadline = time.monotonic() + timeout
        with self.app.test_client() as client:
            while time.monotonic() < deadline:
                response = client.get(url)
                assert response.status_code == 200
                body = response.get_json()
                if body['success'] and body[result_key]['status'] in self._TERMINAL_STATUSES:
                    return
                time.sleep(poll_interval)

        pytest.fail(f"{description} did not complete within {timeout} seconds")

    def _wait_for_job_completion(self, job_id: str, timeout: int = 30, poll_interval: float = 1):
        """Block until job *job_id* is completed, failed or cancelled.

        Args:
            job_id: Identifier returned by a job-creating endpoint.
            timeout: Maximum number of seconds to wait.
            poll_interval: Seconds to sleep between status requests.
        """
        self._poll_until_terminal(f'/api/jobs/{job_id}', 'job', f"Job {job_id}", timeout, poll_interval)

    def _wait_for_workflow_completion(self, workflow_id: str, timeout: int = 60, poll_interval: float = 2):
        """Block until workflow *workflow_id* is completed, failed or cancelled.

        Args:
            workflow_id: Identifier returned by ``/api/workflow``.
            timeout: Maximum number of seconds to wait.
            poll_interval: Seconds to sleep between status requests.
        """
        self._poll_until_terminal(
            f'/api/workflow/{workflow_id}', 'workflow', f"Workflow {workflow_id}", timeout, poll_interval
        )

    # ------------------------------------------------------------------
    # Mock wiring
    # ------------------------------------------------------------------

    def _install_mocks(self, scenario, mock_parser, mock_llm, mock_mutate, mock_cbmc,
                       mock_job_manager=None, mock_workflow=None):
        """Point each patched class at the factory-built instance for *scenario*.

        The manager mocks are optional because only some tests rewire them.
        """
        targets = {
            'parser': mock_parser,
            'llm_generator': mock_llm,
            'mutation_engine': mock_mutate,
            'cbmc_runner': mock_cbmc,
        }
        if mock_job_manager is not None:
            targets['job_manager'] = mock_job_manager
        if mock_workflow is not None:
            targets['workflow_manager'] = mock_workflow

        mocks = self.mock_factory.create_comprehensive_mocks(scenario)
        for key, patched in targets.items():
            patched.return_value = mocks[key].return_value

    def _setup_comprehensive_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc, mock_job_manager, mock_workflow):
        """Wire all six patched classes to the SUCCESS scenario."""
        self._install_mocks(MockScenario.SUCCESS, mock_parser, mock_llm, mock_mutate, mock_cbmc,
                            mock_job_manager, mock_workflow)

    def _setup_memory_safety_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
        """Wire the four pipeline classes to the MEMORY_SAFETY scenario."""
        self._install_mocks(MockScenario.MEMORY_SAFETY, mock_parser, mock_llm, mock_mutate, mock_cbmc)

    def _setup_concurrent_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
        """Wire the four pipeline classes to the CONCURRENT scenario."""
        self._install_mocks(MockScenario.CONCURRENT, mock_parser, mock_llm, mock_mutate, mock_cbmc)

    def _setup_error_handling_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
        """Wire the four pipeline classes to the ERROR scenario."""
        self._install_mocks(MockScenario.ERROR, mock_parser, mock_llm, mock_mutate, mock_cbmc)

    def _setup_performance_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
        """Wire the four pipeline classes to the PERFORMANCE scenario."""
        self._install_mocks(MockScenario.PERFORMANCE, mock_parser, mock_llm, mock_mutate, mock_cbmc)

    def _setup_large_file_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
        """Wire the four pipeline classes to the LARGE_FILE scenario."""
        self._install_mocks(MockScenario.LARGE_FILE, mock_parser, mock_llm, mock_mutate, mock_cbmc)

    def _setup_recovery_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc, mock_job_manager, mock_workflow):
        """Wire all six patched classes to the SUCCESS scenario for the recovery test."""
        self._install_mocks(MockScenario.SUCCESS, mock_parser, mock_llm, mock_mutate, mock_cbmc,
                            mock_job_manager, mock_workflow)

    def _setup_configuration_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
        """Wire the four pipeline classes to the SUCCESS scenario for the config test."""
        self._install_mocks(MockScenario.SUCCESS, mock_parser, mock_llm, mock_mutate, mock_cbmc)
|
|
|
|
|
|
class TestPipelineModuleIntegration:
    """Integration tests for adjacent pipeline modules.

    The parser is exercised for real; the generator, mutation engine and CBMC
    runner are instantiated for real but have their entry points patched.
    """

    def setup_method(self):
        """Point the configuration at a fresh temporary working directory."""
        self.config = get_config()
        self.temp_dir = tempfile.mkdtemp()
        self.config.working_directory = self.temp_dir

    def teardown_method(self):
        """Remove the temporary working directory."""
        import shutil

        if os.path.exists(self.temp_dir):
            shutil.rmtree(self.temp_dir)

    def test_parser_llm_generator_integration(self):
        """Parse a function, then feed its metadata to a patched spec generator."""
        from src.parser import CodeParser
        from src.spec import SpecGenerator

        test_code = """
int integration_test(int x, int y) {
    return x + y;
}
"""

        # Real parser run.
        parser = CodeParser(self.config)
        parse_result = parser.parse_code(test_code)

        assert parse_result.success
        assert len(parse_result.functions) > 0

        generator = SpecGenerator(self.config)
        function_metadata = [
            {
                'name': 'integration_test',
                'return_type': 'int',
                'parameters': [
                    {'name': 'x', 'type': 'int'},
                    {'name': 'y', 'type': 'int'}
                ],
                'complexity_score': 0.3
            }
        ]

        # Canned LLM response instead of a real model call.
        with patch.object(generator, 'generate_specification') as mock_generate:
            mock_generate.return_value = {
                'specification': 'void integration_test(int x, int y) { __CPROVER_assume(x >= 0 && y >= 0); }',
                'confidence': 0.8,
                'explanation': 'Generated specification for integration test'
            }

            result = generator.generate_specification(function_metadata[0])
            assert result['specification'] is not None

    def test_llm_generator_mutator_integration(self):
        """Hand a specification to a patched mutation engine and check candidates come back."""
        from src.spec import SpecGenerator
        from src.mutate import MutationEngine

        specification = 'void test_function(int x) { __CPROVER_assume(x > 0); }'
        function_metadata = {
            'name': 'test_function',
            'parameters': [{'name': 'x', 'type': 'int'}]
        }

        # Mutation engine configured through a plain mock object.
        config = Mock()
        config.max_candidates_per_function = 10
        config.max_selected_candidates = 5
        config.quality_threshold = 0.6

        mutator = MutationEngine(config)

        with patch.object(mutator, 'execute_mutation') as mock_mutate:
            mock_result = Mock()
            mock_result.selected_candidates = [
                Mock(
                    mutated_spec='void test_function(int x) { __CPROVER_assume(x >= 0); }',
                    confidence=0.8
                )
            ]
            mock_mutate.return_value = mock_result

            result = mutator.execute_mutation(specification, function_metadata)
            assert len(result.selected_candidates) > 0

    def test_mutator_verifier_integration(self):
        """Pass a mutated specification to a patched CBMC runner and await the result."""
        from src.mutate import MutationEngine
        from src.verify import CBMCRunner

        mutated_spec = 'void test_function(int x) { __CPROVER_assume(x >= 0); }'
        function_metadata = {"name": "test_function"}

        verifier = CBMCRunner()

        # AsyncMock is requested explicitly: the call below is awaited, and the
        # default mock type is only awaitable when the patched attribute is a
        # coroutine function.
        with patch.object(verifier, 'run_verification', new_callable=AsyncMock) as mock_verify:
            mock_result = Mock()
            mock_result.status = Mock()
            mock_result.status.value = 'successful'
            mock_verify.return_value = mock_result

            async def _verify():
                result = await verifier.run_verification(function_metadata, "test.c", mutated_spec)
                assert result.status.value == 'successful'

            asyncio.run(_verify())

    def test_full_module_chain_integration(self):
        """Chain parse -> generate -> mutate -> verify, with the last three stages patched."""
        test_code = """
int chain_test(int a, int b) {
    if (a > 0 && b > 0) {
        return a + b;
    }
    return 0;
}
"""

        # 1. Parse
        from src.parser import CodeParser
        parser = CodeParser(self.config)
        parse_result = parser.parse_code(test_code)
        assert parse_result.success

        # 2. Generate the specification
        from src.spec import SpecGenerator
        generator = SpecGenerator(self.config)

        with patch.object(generator, 'generate_specification') as mock_generate:
            mock_generate.return_value = {
                'specification': 'void chain_test(int a, int b) { __CPROVER_assume(a > 0 && b > 0); }',
                'confidence': 0.8,
                'explanation': 'Generated specification'
            }

            spec_result = generator.generate_specification(parse_result.functions[0])
            assert spec_result['specification'] is not None

        # 3. Mutate
        from src.mutate import MutationEngine
        config = Mock()
        config.max_candidates_per_function = 5
        config.max_selected_candidates = 3
        config.quality_threshold = 0.7

        mutator = MutationEngine(config)

        with patch.object(mutator, 'execute_mutation') as mock_mutate:
            mock_result = Mock()
            mock_result.selected_candidates = [
                Mock(
                    mutated_spec='void chain_test(int a, int b) { __CPROVER_assume(a >= 0 && b >= 0); }',
                    confidence=0.8
                )
            ]
            mock_mutate.return_value = mock_result

            mutation_result = mutator.execute_mutation(
                spec_result['specification'],
                {'name': 'chain_test', 'parameters': parse_result.functions[0].parameters}
            )
            assert len(mutation_result.selected_candidates) > 0

        # 4. Verify
        from src.verify import CBMCRunner
        verifier = CBMCRunner()

        # Explicit AsyncMock so the awaited call works regardless of how the
        # real method is declared.
        with patch.object(verifier, 'run_verification', new_callable=AsyncMock) as mock_verify:
            mock_result = Mock()
            mock_result.status = Mock()
            mock_result.status.value = 'successful'
            mock_verify.return_value = mock_result

            async def _verify():
                result = await verifier.run_verification(
                    {'name': 'chain_test'},
                    "test.c",
                    mutation_result.selected_candidates[0].mutated_spec
                )
                assert result.status.value == 'successful'

            asyncio.run(_verify())

    def test_error_propagation_through_chain(self):
        """Check that failures in each stage surface to the caller."""
        # Parse errors are reported on the result object.
        from src.parser import CodeParser
        parser = CodeParser(self.config)

        invalid_code = "int invalid( {"
        parse_result = parser.parse_code(invalid_code)
        assert not parse_result.success
        assert parse_result.error is not None

        # Generation errors propagate as exceptions. ``match`` pins the
        # injected message so an unrelated error cannot satisfy the check.
        from src.spec import SpecGenerator
        generator = SpecGenerator(self.config)

        with patch.object(generator, 'generate_specification') as mock_generate:
            mock_generate.side_effect = Exception("LLM service unavailable")

            with pytest.raises(Exception, match="LLM service unavailable"):
                generator.generate_specification(Mock())

        # Mutation errors propagate as exceptions.
        from src.mutate import MutationEngine
        config = Mock()
        mutator = MutationEngine(config)

        with patch.object(mutator, 'execute_mutation') as mock_mutate:
            mock_mutate.side_effect = Exception("Mutation service error")

            with pytest.raises(Exception, match="Mutation service error"):
                mutator.execute_mutation("void test() { }", {"name": "test"})

        # Verification errors propagate out of the awaited call.
        from src.verify import CBMCRunner
        verifier = CBMCRunner()

        with patch.object(verifier, 'run_verification', new_callable=AsyncMock) as mock_verify:
            mock_verify.side_effect = Exception("CBMC not available")

            async def _verify_error():
                with pytest.raises(Exception, match="CBMC not available"):
                    await verifier.run_verification({"name": "test"}, "test.c", "void test() { }")

            asyncio.run(_verify_error())
|
|
|
|
|
|
if __name__ == "__main__":
    # Propagate pytest's exit code so that running this file directly
    # reports test failures to the calling shell or CI job.
    raise SystemExit(pytest.main([__file__, "-v"]))