You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cbmc/codedetect/tests/integration/test_complete_pipeline.py

1074 lines
41 KiB

"""
完整管道集成测试
本模块为整个验证管道提供端到端的集成测试,包括:
- 文件上传和解析
- LLM规范生成
- 规范突变
- CBMC验证
- 工作流编排
- 结果处理和统计
"""
import pytest
import asyncio
import tempfile
import json
import os
from pathlib import Path
from unittest.mock import Mock, patch, AsyncMock, MagicMock
from typing import Dict, List, Any, Optional
import time
from src.ui.workflow import WorkflowManager, WorkflowConfig, WorkflowMode
from src.ui.job_manager import JobManager, JobStatus, JobType
from src.ui.api import init_api
from src.ui.websocket import WebSocketHandler
from src.ui.web_app import create_app
from src.utils.config import get_config, ConfigManager
# 导入测试固件
from tests.fixtures.cbmc_outputs import cbmc_fixtures
from tests.fixtures.llm_responses import llm_fixtures
from tests.fixtures.pipeline_mocks import PipelineMockFactory, MockScenario, MockFunctionMetadata
class TestCompletePipelineIntegration:
"""完整管道集成测试"""
def setup_method(self):
"""测试方法设置"""
# 创建测试配置
self.config = get_config()
self.temp_dir = tempfile.mkdtemp()
self.config.working_directory = self.temp_dir
# 初始化各个组件(使用模拟对象)
self.job_manager = Mock(spec=JobManager)
self.workflow_manager = Mock(spec=WorkflowManager)
self.websocket_handler = None # 暂时不使用WebSocket
# 创建测试应用
self.app = create_app(self.config)
self.app.config['TESTING'] = True
self.app.config['SECRET_KEY'] = 'test-secret'
# 初始化API
init_api(
config=self.config,
job_manager=self.job_manager,
workflow_manager=self.workflow_manager,
websocket_handler=self.websocket_handler
)
# 初始化测试计数器
self.job_counter = 0
self.workflow_counter = 0
# 初始化模拟工厂
self.mock_factory = PipelineMockFactory()
def teardown_method(self):
"""测试方法清理"""
# 清理临时目录
import shutil
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
@patch('src.parser.CodeParser')
@patch('src.spec.LLMGenerator')
@patch('src.mutate.MutationEngine')
@patch('src.verify.CBMCRunner')
@patch('src.ui.job_manager.JobManager')
@patch('src.ui.workflow.WorkflowManager')
def test_basic_arithmetic_pipeline(self, mock_workflow, mock_job_manager, mock_cbmc, mock_mutate, mock_llm, mock_parser):
"""测试基础算术函数的完整管道"""
# 创建完整的模拟环境
self._setup_comprehensive_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc, mock_job_manager, mock_workflow)
# 创建测试文件
test_code = """
int add(int a, int b) {
return a + b;
}
int multiply(int x, int y) {
return x * y;
}
"""
test_file = os.path.join(self.temp_dir, "test_arithmetic.c")
with open(test_file, 'w') as f:
f.write(test_code)
# 设置模拟返回值
mock_parser.return_value.parse_file = AsyncMock(return_value={
'functions': [
{'name': 'add', 'return_type': 'int', 'parameters': [{'name': 'a', 'type': 'int'}, {'name': 'b', 'type': 'int'}]},
{'name': 'multiply', 'return_type': 'int', 'parameters': [{'name': 'x', 'type': 'int'}, {'name': 'y', 'type': 'int'}]}
]
})
mock_llm.return_value.generate_specification = AsyncMock(return_value={
'specification': 'void add_spec() { __CPROVER_assert(add(1, 2) == 3); }',
'confidence': 0.95
})
mock_mutate.return_value.generate_mutations = AsyncMock(return_value=[
{'mutation': 'void add_spec() { __CPROVER_assert(add(1, 2) == 4); }', 'type': 'predicate_mutation'}
])
mock_cbmc.return_value.run_verification = AsyncMock(return_value={
'status': 'successful',
'exit_code': 0,
'stdout': 'VERIFICATION SUCCESSFUL',
'verification_results': {'verified_properties': 3, 'failed_properties': 0}
})
try:
# 1. 测试文件上传
with self.app.test_client() as client:
with open(test_file, 'rb') as upload_file:
response = client.post('/api/upload', data={'file': upload_file})
assert response.status_code == 200
upload_data = response.get_json()
assert upload_data['success'] is True
job_id = upload_data['data']['job_id']
# 等待上传作业完成
self._wait_for_job_completion(job_id)
# 2. 测试代码解析
parse_data = {
'file_path': test_file,
'options': {
'include_functions': True,
'include_variables': True,
'complexity_analysis': True
}
}
with self.app.test_client() as client:
response = client.post('/api/parse', json=parse_data)
assert response.status_code == 200
parse_result = response.get_json()
assert parse_result['success'] is True
parse_job_id = parse_result['data']['job_id']
# 等待解析完成
self._wait_for_job_completion(parse_job_id)
# 3. 测试规范生成
function_metadata = [
{
'name': 'add',
'return_type': 'int',
'parameters': [
{'name': 'a', 'type': 'int'},
{'name': 'b', 'type': 'int'}
],
'complexity_score': 0.3
},
{
'name': 'multiply',
'return_type': 'int',
'parameters': [
{'name': 'x', 'type': 'int'},
{'name': 'y', 'type': 'int'}
],
'complexity_score': 0.3
}
]
spec_data = {
'functions': function_metadata,
'options': {
'verification_types': ['memory_safety', 'overflow_detection'],
'include_comments': True,
'style_guide': 'cbmc'
}
}
with self.app.test_client() as client:
response = client.post('/api/generate', json=spec_data)
assert response.status_code == 200
spec_result = response.get_json()
assert spec_result['success'] is True
spec_job_id = spec_result['data']['job_id']
# 等待规范生成完成
self._wait_for_job_completion(spec_job_id)
# 4. 测试完整工作流
workflow_data = {
'source_files': [test_file],
'target_functions': ['add', 'multiply'],
'workflow_config': {
'mode': 'standard',
'enable_parsing': True,
'enable_generation': True,
'enable_mutation': True,
'enable_verification': True
}
}
with self.app.test_client() as client:
response = client.post('/api/workflow', json=workflow_data)
assert response.status_code == 200
workflow_result = response.get_json()
assert workflow_result['success'] is True
workflow_id = workflow_result['workflow_id']
# 等待工作流完成
self._wait_for_workflow_completion(workflow_id)
# 5. 验证最终结果
with self.app.test_client() as client:
response = client.get(f'/api/workflow/{workflow_id}')
assert response.status_code == 200
final_result = response.get_json()
assert final_result['success'] is True
assert final_result['workflow']['status'] == 'completed'
# 验证统计数据
stats = final_result['workflow']['statistics']
assert 'total_functions' in stats
assert stats['total_functions'] == 2
assert 'generated_specs' in stats
assert stats['generated_specs'] == 2
finally:
if os.path.exists(test_file):
os.unlink(test_file)
def test_memory_safety_pipeline(self):
"""测试内存安全函数的完整管道"""
with patch('src.parser.CodeParser') as mock_parser, \
patch('src.spec.LLMGenerator') as mock_llm, \
patch('src.mutate.MutationEngine') as mock_mutate, \
patch('src.verify.CBMCRunner') as mock_cbmc, \
patch('src.ui.job_manager.JobManager') as mock_job_manager, \
patch('src.ui.workflow.WorkflowManager') as mock_workflow:
# 设置内存安全特定的模拟
self._setup_memory_safety_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)
test_code = """
#include <stdlib.h>
void safe_copy(char *dest, const char *src, size_t n) {
if (dest == NULL || src == NULL) {
return;
}
for (size_t i = 0; i < n && src[i] != '\\0'; i++) {
dest[i] = src[i];
}
}
int* create_array(int size) {
if (size <= 0) {
return NULL;
}
int *arr = (int*)malloc(size * sizeof(int));
if (arr == NULL) {
return NULL;
}
for (int i = 0; i < size; i++) {
arr[i] = 0;
}
return arr;
}
"""
test_file = os.path.join(self.temp_dir, "test_memory_safety.c")
with open(test_file, 'w') as f:
f.write(test_code)
try:
# 启动内存安全专用工作流
workflow_data = {
'source_files': [test_file],
'target_functions': ['safe_copy', 'create_array'],
'workflow_config': {
'mode': 'memory_safety_focused',
'enable_parsing': True,
'enable_generation': True,
'enable_mutation': True,
'enable_verification': True,
'verification_types': ['memory_safety', 'pointer_validity']
}
}
with self.app.test_client() as client:
response = client.post('/api/workflow', json=workflow_data)
assert response.status_code == 200
workflow_result = response.get_json()
workflow_id = workflow_result['workflow_id']
# 等待工作流完成
self._wait_for_workflow_completion(workflow_id)
# 验证结果
response = client.get(f'/api/workflow/{workflow_id}')
assert response.status_code == 200
final_result = response.get_json()
assert final_result['success'] is True
# 验证内存安全相关的统计数据
stats = final_result['workflow']['statistics']
assert 'total_functions' in stats
assert stats['total_functions'] == 2
finally:
if os.path.exists(test_file):
os.unlink(test_file)
def test_concurrent_pipeline_execution(self):
"""测试并发管道执行"""
with patch('src.parser.CodeParser') as mock_parser, \
patch('src.spec.LLMGenerator') as mock_llm, \
patch('src.mutate.MutationEngine') as mock_mutate, \
patch('src.verify.CBMCRunner') as mock_cbmc, \
patch('src.ui.job_manager.JobManager') as mock_job_manager, \
patch('src.ui.workflow.WorkflowManager') as mock_workflow:
# 设置并发执行模拟
self._setup_concurrent_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)
# 创建多个测试文件
test_files = []
for i in range(3):
test_code = f"""
int concurrent_function_{i}(int x) {{
return x * {i};
}}
"""
test_file = os.path.join(self.temp_dir, f"concurrent_test_{i}.c")
with open(test_file, 'w') as f:
f.write(test_code)
test_files.append(test_file)
try:
# 同时启动多个工作流
workflow_ids = []
with self.app.test_client() as client:
for i, test_file in enumerate(test_files):
workflow_data = {
'source_files': [test_file],
'target_functions': [f'concurrent_function_{i}'],
'workflow_config': {
'mode': 'standard',
'enable_parsing': True,
'enable_generation': True,
'enable_mutation': True,
'enable_verification': True
}
}
response = client.post('/api/workflow', json=workflow_data)
assert response.status_code == 200
workflow_result = response.get_json()
workflow_ids.append(workflow_result['workflow_id'])
# 等待所有工作流完成
for workflow_id in workflow_ids:
self._wait_for_workflow_completion(workflow_id)
# 验证所有工作流都成功完成
for workflow_id in workflow_ids:
response = client.get(f'/api/workflow/{workflow_id}')
assert response.status_code == 200
final_result = response.get_json()
assert final_result['success'] is True
assert final_result['workflow']['status'] == 'completed'
finally:
for test_file in test_files:
if os.path.exists(test_file):
os.unlink(test_file)
def test_error_handling_pipeline(self):
"""测试错误处理管道"""
with patch('src.parser.CodeParser') as mock_parser, \
patch('src.spec.LLMGenerator') as mock_llm, \
patch('src.mutate.MutationEngine') as mock_mutate, \
patch('src.verify.CBMCRunner') as mock_cbmc:
# 设置错误处理模拟
self._setup_error_handling_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)
# 测试语法错误的代码
invalid_code = """
int invalid_function( {
// 语法错误:缺少参数和闭合括号
return x;
}
"""
test_file = os.path.join(self.temp_dir, "invalid_code.c")
with open(test_file, 'w') as f:
f.write(invalid_code)
try:
with self.app.test_client() as client:
# 测试文件上传(应该成功)
with open(test_file, 'rb') as upload_file:
response = client.post('/api/upload', data={'file': upload_file})
assert response.status_code == 200
upload_data = response.get_json()
assert upload_data['success'] is True
# 测试解析(应该失败)
parse_data = {
'file_path': test_file,
'options': {'include_functions': True}
}
response = client.post('/api/parse', json=parse_data)
# 解析可能会失败,但应该返回适当的错误响应
assert response.status_code in [200, 400, 500]
if response.status_code == 200:
parse_result = response.get_json()
if parse_result['success']:
parse_job_id = parse_result['data']['job_id']
self._wait_for_job_completion(parse_job_id)
finally:
if os.path.exists(test_file):
os.unlink(test_file)
def test_pipeline_performance_metrics(self):
"""测试管道性能指标"""
with patch('src.parser.CodeParser') as mock_parser, \
patch('src.spec.LLMGenerator') as mock_llm, \
patch('src.mutate.MutationEngine') as mock_mutate, \
patch('src.verify.CBMCRunner') as mock_cbmc, \
patch('src.ui.job_manager.JobManager') as mock_job_manager, \
patch('src.ui.workflow.WorkflowManager') as mock_workflow:
# 设置性能测试模拟(快速响应)
self._setup_performance_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)
test_code = """
int performance_test(int a, int b, int c) {
int result = a + b * c;
if (result > 100) {
result = result / 2;
}
return result;
}
"""
test_file = os.path.join(self.temp_dir, "performance_test.c")
with open(test_file, 'w') as f:
f.write(test_code)
try:
start_time = time.time()
with self.app.test_client() as client:
# 完整工作流
workflow_data = {
'source_files': [test_file],
'target_functions': ['performance_test'],
'workflow_config': {
'mode': 'standard',
'enable_parsing': True,
'enable_generation': True,
'enable_mutation': True,
'enable_verification': True
}
}
response = client.post('/api/workflow', json=workflow_data)
assert response.status_code == 200
workflow_result = response.get_json()
workflow_id = workflow_result['workflow_id']
# 等待完成
self._wait_for_workflow_completion(workflow_id)
end_time = time.time()
execution_time = end_time - start_time
# 验证执行时间在合理范围内
assert execution_time < 60.0 # 应该在60秒内完成
# 获取详细的性能统计
response = client.get(f'/api/workflow/{workflow_id}')
assert response.status_code == 200
final_result = response.get_json()
# 验证性能指标存在
workflow_data = final_result['workflow']
assert 'completed_at' in workflow_data
assert 'created_at' in workflow_data
finally:
if os.path.exists(test_file):
os.unlink(test_file)
def test_pipeline_with_large_file(self):
"""测试大文件处理管道"""
with patch('src.parser.CodeParser') as mock_parser, \
patch('src.spec.LLMGenerator') as mock_llm, \
patch('src.mutate.MutationEngine') as mock_mutate, \
patch('src.verify.CBMCRunner') as mock_cbmc:
# 设置大文件处理模拟
self._setup_large_file_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)
# 创建较大的测试文件
large_code = []
for i in range(100):
large_code.append(f"""
int large_function_{i}(int x, int y) {{
/* Complex computation {i} */
int result = 0;
for (int j = 0; j < {i % 10 + 1}; j++) {{
result += x * y + j;
}}
return result;
}}
""")
test_file = os.path.join(self.temp_dir, "large_test.c")
with open(test_file, 'w') as f:
f.write('\n'.join(large_code))
try:
with self.app.test_client() as client:
# 上传大文件
with open(test_file, 'rb') as upload_file:
response = client.post('/api/upload', data={'file': upload_file})
assert response.status_code == 200
upload_data = response.get_json()
assert upload_data['success'] is True
# 解析大文件
parse_data = {
'file_path': test_file,
'options': {
'include_functions': True,
'complexity_analysis': True
}
}
response = client.post('/api/parse', json=parse_data)
assert response.status_code == 200
parse_result = response.get_json()
assert parse_result['success'] is True
parse_job_id = parse_result['data']['job_id']
# 等待解析完成
self._wait_for_job_completion(parse_job_id)
# 获取解析结果
response = client.get(f'/api/jobs/{parse_job_id}')
assert response.status_code == 200
job_result = response.get_json()
assert job_result['success'] is True
# 验证解析了正确数量的函数
job_data = job_result['job']
if 'result' in job_data and 'functions' in job_data['result']:
functions = job_data['result']['functions']
assert len(functions) == 100
finally:
if os.path.exists(test_file):
os.unlink(test_file)
def test_pipeline_resume_and_recovery(self):
"""测试管道恢复和恢复机制"""
with patch('src.parser.CodeParser') as mock_parser, \
patch('src.spec.LLMGenerator') as mock_llm, \
patch('src.mutate.MutationEngine') as mock_mutate, \
patch('src.verify.CBMCRunner') as mock_cbmc, \
patch('src.ui.job_manager.JobManager') as mock_job_manager, \
patch('src.ui.workflow.WorkflowManager') as mock_workflow:
# 设置恢复机制模拟
self._setup_recovery_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc, mock_job_manager, mock_workflow)
test_code = """
int recovery_test(int x) {
return x * 2;
}
"""
test_file = os.path.join(self.temp_dir, "recovery_test.c")
with open(test_file, 'w') as f:
f.write(test_code)
try:
with self.app.test_client() as client:
# 启动工作流
workflow_data = {
'source_files': [test_file],
'target_functions': ['recovery_test'],
'workflow_config': {
'mode': 'standard',
'enable_parsing': True,
'enable_generation': True,
'enable_mutation': True,
'enable_verification': True
}
}
response = client.post('/api/workflow', json=workflow_data)
assert response.status_code == 200
workflow_result = response.get_json()
workflow_id = workflow_result['workflow_id']
# 等待工作流完成
self._wait_for_workflow_completion(workflow_id)
# 验证工作流状态可以被查询
response = client.get(f'/api/workflow/{workflow_id}')
assert response.status_code == 200
status_result = response.get_json()
assert status_result['success'] is True
# 验证作业历史
response = client.get('/api/jobs')
assert response.status_code == 200
jobs_result = response.get_json()
assert jobs_result['success'] is True
assert len(jobs_result['jobs']) > 0
finally:
if os.path.exists(test_file):
os.unlink(test_file)
def test_pipeline_configuration_variations(self):
"""测试不同配置的管道变体"""
with patch('src.parser.CodeParser') as mock_parser, \
patch('src.spec.LLMGenerator') as mock_llm, \
patch('src.mutate.MutationEngine') as mock_mutate, \
patch('src.verify.CBMCRunner') as mock_cbmc:
# 设置配置变体模拟
self._setup_configuration_mocks(mock_parser, mock_llm, mock_mutate, mock_cbmc)
test_code = """
int config_test(int a, int b) {
return a + b;
}
"""
test_file = os.path.join(self.temp_dir, "config_test.c")
with open(test_file, 'w') as f:
f.write(test_code)
try:
configurations = [
{
'name': 'minimal',
'config': {
'mode': 'minimal',
'enable_parsing': True,
'enable_generation': True,
'enable_mutation': False,
'enable_verification': False
}
},
{
'name': 'generation_only',
'config': {
'mode': 'generation_only',
'enable_parsing': True,
'enable_generation': True,
'enable_mutation': False,
'enable_verification': True
}
},
{
'name': 'comprehensive',
'config': {
'mode': 'comprehensive',
'enable_parsing': True,
'enable_generation': True,
'enable_mutation': True,
'enable_verification': True
}
}
]
with self.app.test_client() as client:
for config_info in configurations:
workflow_data = {
'source_files': [test_file],
'target_functions': ['config_test'],
'workflow_config': config_info['config']
}
response = client.post('/api/workflow', json=workflow_data)
assert response.status_code == 200
workflow_result = response.get_json()
workflow_id = workflow_result['workflow_id']
# 等待完成
self._wait_for_workflow_completion(workflow_id)
# 验证结果
response = client.get(f'/api/workflow/{workflow_id}')
assert response.status_code == 200
final_result = response.get_json()
assert final_result['success'] is True
assert final_result['workflow']['status'] == 'completed'
finally:
if os.path.exists(test_file):
os.unlink(test_file)
def _wait_for_job_completion(self, job_id: str, timeout: int = 30):
"""等待作业完成"""
import time
start_time = time.time()
with self.app.test_client() as client:
while time.time() - start_time < timeout:
response = client.get(f'/api/jobs/{job_id}')
assert response.status_code == 200
job_data = response.get_json()
if job_data['success']:
job = job_data['job']
if job['status'] in ['completed', 'failed', 'cancelled']:
return
time.sleep(1)
pytest.fail(f"Job {job_id} did not complete within {timeout} seconds")
def _wait_for_workflow_completion(self, workflow_id: str, timeout: int = 60):
"""等待工作流完成"""
import time
start_time = time.time()
with self.app.test_client() as client:
while time.time() - start_time < timeout:
response = client.get(f'/api/workflow/{workflow_id}')
assert response.status_code == 200
workflow_data = response.get_json()
if workflow_data['success']:
workflow = workflow_data['workflow']
if workflow['status'] in ['completed', 'failed', 'cancelled']:
return
time.sleep(2)
pytest.fail(f"Workflow {workflow_id} did not complete within {timeout} seconds")
def _setup_comprehensive_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc, mock_job_manager, mock_workflow):
"""设置完整的模拟环境"""
# 使用模拟工厂创建完整的模拟环境
mocks = self.mock_factory.create_comprehensive_mocks(MockScenario.SUCCESS)
# 配置各个模拟对象
mock_parser.return_value = mocks['parser'].return_value
mock_llm.return_value = mocks['llm_generator'].return_value
mock_mutate.return_value = mocks['mutation_engine'].return_value
mock_cbmc.return_value = mocks['cbmc_runner'].return_value
mock_job_manager.return_value = mocks['job_manager'].return_value
mock_workflow.return_value = mocks['workflow_manager'].return_value
def _setup_memory_safety_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
"""设置内存安全特定的模拟"""
mocks = self.mock_factory.create_comprehensive_mocks(MockScenario.MEMORY_SAFETY)
mock_parser.return_value = mocks['parser'].return_value
mock_llm.return_value = mocks['llm_generator'].return_value
mock_mutate.return_value = mocks['mutation_engine'].return_value
mock_cbmc.return_value = mocks['cbmc_runner'].return_value
def _setup_concurrent_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
"""设置并发执行模拟"""
mocks = self.mock_factory.create_comprehensive_mocks(MockScenario.CONCURRENT)
mock_parser.return_value = mocks['parser'].return_value
mock_llm.return_value = mocks['llm_generator'].return_value
mock_mutate.return_value = mocks['mutation_engine'].return_value
mock_cbmc.return_value = mocks['cbmc_runner'].return_value
def _setup_error_handling_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
"""设置错误处理模拟"""
mocks = self.mock_factory.create_comprehensive_mocks(MockScenario.ERROR)
mock_parser.return_value = mocks['parser'].return_value
mock_llm.return_value = mocks['llm_generator'].return_value
mock_mutate.return_value = mocks['mutation_engine'].return_value
mock_cbmc.return_value = mocks['cbmc_runner'].return_value
def _setup_performance_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
"""设置性能测试模拟(快速响应)"""
mocks = self.mock_factory.create_comprehensive_mocks(MockScenario.PERFORMANCE)
mock_parser.return_value = mocks['parser'].return_value
mock_llm.return_value = mocks['llm_generator'].return_value
mock_mutate.return_value = mocks['mutation_engine'].return_value
mock_cbmc.return_value = mocks['cbmc_runner'].return_value
def _setup_large_file_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
"""设置大文件处理模拟"""
mocks = self.mock_factory.create_comprehensive_mocks(MockScenario.LARGE_FILE)
mock_parser.return_value = mocks['parser'].return_value
mock_llm.return_value = mocks['llm_generator'].return_value
mock_mutate.return_value = mocks['mutation_engine'].return_value
mock_cbmc.return_value = mocks['cbmc_runner'].return_value
def _setup_recovery_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc, mock_job_manager, mock_workflow):
"""设置恢复机制模拟"""
mocks = self.mock_factory.create_comprehensive_mocks(MockScenario.SUCCESS)
mock_parser.return_value = mocks['parser'].return_value
mock_llm.return_value = mocks['llm_generator'].return_value
mock_mutate.return_value = mocks['mutation_engine'].return_value
mock_cbmc.return_value = mocks['cbmc_runner'].return_value
mock_job_manager.return_value = mocks['job_manager'].return_value
mock_workflow.return_value = mocks['workflow_manager'].return_value
def _setup_configuration_mocks(self, mock_parser, mock_llm, mock_mutate, mock_cbmc):
"""设置配置变体模拟"""
mocks = self.mock_factory.create_comprehensive_mocks(MockScenario.SUCCESS)
mock_parser.return_value = mocks['parser'].return_value
mock_llm.return_value = mocks['llm_generator'].return_value
mock_mutate.return_value = mocks['mutation_engine'].return_value
mock_cbmc.return_value = mocks['cbmc_runner'].return_value
class TestPipelineModuleIntegration:
"""管道模块集成测试"""
def setup_method(self):
"""测试方法设置"""
self.config = get_config()
self.temp_dir = tempfile.mkdtemp()
self.config.working_directory = self.temp_dir
def teardown_method(self):
"""测试方法清理"""
import shutil
if os.path.exists(self.temp_dir):
shutil.rmtree(self.temp_dir)
def test_parser_llm_generator_integration(self):
"""测试解析器和LLM生成器集成"""
from src.parser import CodeParser
from src.spec import SpecGenerator
# 创建测试代码
test_code = """
int integration_test(int x, int y) {
return x + y;
}
"""
# 使用解析器
parser = CodeParser(self.config)
parse_result = parser.parse_code(test_code)
assert parse_result.success
assert len(parse_result.functions) > 0
# 使用LLM生成器
generator = SpecGenerator(self.config)
function_metadata = [
{
'name': 'integration_test',
'return_type': 'int',
'parameters': [
{'name': 'x', 'type': 'int'},
{'name': 'y', 'type': 'int'}
],
'complexity_score': 0.3
}
]
# 模拟LLM响应
with patch.object(generator, 'generate_specification') as mock_generate:
mock_generate.return_value = {
'specification': 'void integration_test(int x, int y) { __CPROVER_assume(x >= 0 && y >= 0); }',
'confidence': 0.8,
'explanation': 'Generated specification for integration test'
}
result = generator.generate_specification(function_metadata[0])
assert result['specification'] is not None
def test_llm_generator_mutator_integration(self):
"""测试LLM生成器和突变器集成"""
from src.spec import SpecGenerator
from src.mutate import MutationEngine
specification = 'void test_function(int x) { __CPROVER_assume(x > 0); }'
function_metadata = {
'name': 'test_function',
'parameters': [{'name': 'x', 'type': 'int'}]
}
# 使用突变器
config = Mock()
config.max_candidates_per_function = 10
config.max_selected_candidates = 5
config.quality_threshold = 0.6
mutator = MutationEngine(config)
# 模拟突变
with patch.object(mutator, 'execute_mutation') as mock_mutate:
mock_result = Mock()
mock_result.selected_candidates = [
Mock(
mutated_spec='void test_function(int x) { __CPROVER_assume(x >= 0); }',
confidence=0.8
)
]
mock_mutate.return_value = mock_result
result = mutator.execute_mutation(specification, function_metadata)
assert len(result.selected_candidates) > 0
def test_mutator_verifier_integration(self):
"""测试突变器和验证器集成"""
from src.mutate import MutationEngine
from src.verify import CBMCRunner
mutated_spec = 'void test_function(int x) { __CPROVER_assume(x >= 0); }'
function_metadata = {"name": "test_function"}
# 使用验证器
verifier = CBMCRunner()
# 模拟验证
with patch.object(verifier, 'run_verification') as mock_verify:
mock_result = Mock()
mock_result.status = Mock()
mock_result.status.value = 'successful'
mock_verify.return_value = mock_result
# 异步测试
async def test_async():
result = await verifier.run_verification(function_metadata, "test.c", mutated_spec)
assert result.status.value == 'successful'
asyncio.run(test_async())
def test_full_module_chain_integration(self):
"""测试完整模块链集成"""
test_code = """
int chain_test(int a, int b) {
if (a > 0 && b > 0) {
return a + b;
}
return 0;
}
"""
# 1. 解析
from src.parser import CodeParser
parser = CodeParser(self.config)
parse_result = parser.parse_code(test_code)
assert parse_result.success
# 2. 生成规范
from src.spec import SpecGenerator
generator = SpecGenerator(self.config)
with patch.object(generator, 'generate_specification') as mock_generate:
mock_generate.return_value = {
'specification': 'void chain_test(int a, int b) { __CPROVER_assume(a > 0 && b > 0); }',
'confidence': 0.8,
'explanation': 'Generated specification'
}
spec_result = generator.generate_specification(parse_result.functions[0])
assert spec_result['specification'] is not None
# 3. 突变
from src.mutate import MutationEngine
config = Mock()
config.max_candidates_per_function = 5
config.max_selected_candidates = 3
config.quality_threshold = 0.7
mutator = MutationEngine(config)
with patch.object(mutator, 'execute_mutation') as mock_mutate:
mock_result = Mock()
mock_result.selected_candidates = [
Mock(
mutated_spec='void chain_test(int a, int b) { __CPROVER_assume(a >= 0 && b >= 0); }',
confidence=0.8
)
]
mock_mutate.return_value = mock_result
mutation_result = mutator.execute_mutation(
spec_result['specification'],
{'name': 'chain_test', 'parameters': parse_result.functions[0].parameters}
)
assert len(mutation_result.selected_candidates) > 0
# 4. 验证
from src.verify import CBMCRunner
verifier = CBMCRunner()
with patch.object(verifier, 'run_verification') as mock_verify:
mock_result = Mock()
mock_result.status = Mock()
mock_result.status.value = 'successful'
mock_verify.return_value = mock_result
async def test_verification():
result = await verifier.run_verification(
{'name': 'chain_test'},
"test.c",
mutation_result.selected_candidates[0].mutated_spec
)
assert result.status.value == 'successful'
asyncio.run(test_verification())
def test_error_propagation_through_chain(self):
"""测试错误在链中的传播"""
# 测试解析错误传播
from src.parser import CodeParser
parser = CodeParser(self.config)
invalid_code = "int invalid( {"
parse_result = parser.parse_code(invalid_code)
assert not parse_result.success
assert parse_result.error is not None
# 测试生成错误传播
from src.spec import SpecGenerator
generator = SpecGenerator(self.config)
with patch.object(generator, 'generate_specification') as mock_generate:
mock_generate.side_effect = Exception("LLM service unavailable")
with pytest.raises(Exception):
generator.generate_specification(Mock())
# 测试突变错误传播
from src.mutate import MutationEngine
config = Mock()
mutator = MutationEngine(config)
with patch.object(mutator, 'execute_mutation') as mock_mutate:
mock_mutate.side_effect = Exception("Mutation service error")
with pytest.raises(Exception):
mutator.execute_mutation("void test() { }", {"name": "test"})
# 测试验证错误传播
from src.verify import CBMCRunner
verifier = CBMCRunner()
with patch.object(verifier, 'run_verification') as mock_verify:
mock_verify.side_effect = Exception("CBMC not available")
async def test_verification_error():
with pytest.raises(Exception):
await verifier.run_verification({"name": "test"}, "test.c", "void test() { }")
asyncio.run(test_verification_error())
if __name__ == "__main__":
pytest.main([__file__, "-v"])