""" Verify模块单元测试 本模块为verify模块提供全面的单元测试覆盖,包括: - CBMCRunner测试 - CommandBuilder测试 - ResultParser测试 - HarnessGenerator测试 - 异步功能测试 - 错误处理测试 """ import pytest import asyncio import tempfile import json import xml.etree.ElementTree as ET from pathlib import Path from unittest.mock import Mock, patch, AsyncMock, MagicMock from typing import Dict, List, Any, Optional import subprocess from src.verify.cbmc_runner import CBMCRunner from src.verify.command_builder import CommandBuilder from src.verify.result_parser import ResultParser from src.verify.harness_gen import HarnessGenerator from src.verify.verification_types import ( VerificationResult, VerificationStatus, VerificationType, CBMCConfiguration, VerificationSession, VerificationGoal, CBMCMemoryModel, CBMCOutputFormat, CounterExample, CounterExampleState, CBMCStats ) from src.verify.exceptions import ( VerificationError, CBMCTimeoutError, CBMCInstallationError, ConcurrencyError, ResourceError, CBMCConfigurationError, CommandBuildError, ResultParseError, HarnessError ) from src.parser.metadata import FunctionInfo, ParameterInfo, SourceLocation from src.utils.config import ConfigManager class TestCommandBuilder: """CommandBuilder类测试""" def setup_method(self): """测试方法设置""" self.config = CBMCConfiguration( cbmc_path="cbmc", working_directory="/tmp", memory_model=CBMCMemoryModel.LITTLE_ENDIAN, output_format=CBMCOutputFormat.TEXT, default_timeout=300, max_concurrent_jobs=4 ) self.builder = CommandBuilder(self.config) def test_init_with_config(self): """测试使用配置初始化""" assert self.builder.config == self.config assert self.builder.config.cbmc_path == "cbmc" def test_init_without_config(self): """测试无配置初始化""" with patch('src.verify.command_builder.get_config') as mock_get_config: mock_config = Mock() mock_config.get.return_value = {"cbmc": {"path": "test_cbmc"}} builder = CommandBuilder() assert builder.config.cbmc_path == "test_cbmc" def test_build_basic_command(self): """测试构建基本命令""" with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f: harness_file = f.name try: command = self.builder.build_command(harness_file) expected = ["cbmc", harness_file] assert command == expected finally: Path(harness_file).unlink() def test_build_command_with_source_files(self): """测试构建包含源文件的命令""" with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f: harness_file = f.name source_files = ["main.c", "utils.c"] try: command = self.builder.build_command(harness_file, source_files=source_files) expected = ["cbmc", harness_file, "main.c", "utils.c"] assert command == expected finally: Path(harness_file).unlink() def test_build_command_with_verification_types(self): """测试构建包含验证类型的命令""" with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f: harness_file = f.name verification_types = [VerificationType.MEMORY_SAFETY, VerificationType.OVERFLOW_DETECTION] try: command = self.builder.build_command(harness_file, verification_types=verification_types) assert "--bounds-check" in command assert "--pointer-check" in command assert "--overflow-check" in command finally: Path(harness_file).unlink() def test_build_command_with_additional_options(self): """测试构建包含额外选项的命令""" with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f: harness_file = f.name additional_options = { "unwind": "10", "depth": "20", "json_output": True } try: command = self.builder.build_command(harness_file, additional_options=additional_options) assert "--unwind" in command assert "10" in command assert "--depth" in command assert "20" in command finally: Path(harness_file).unlink() def test_build_command_with_json_output(self): """测试JSON输出格式命令构建""" self.config.output_format = CBMCOutputFormat.JSON builder = CommandBuilder(self.config) with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f: harness_file = f.name try: command = builder.build_command(harness_file) assert "--json-ui" in command finally: Path(harness_file).unlink() def test_build_command_with_xml_output(self): """测试XML输出格式命令构建""" self.config.output_format = CBMCOutputFormat.XML builder = CommandBuilder(self.config) with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f: harness_file = f.name try: command = builder.build_command(harness_file) assert "--xml-ui" in command finally: Path(harness_file).unlink() def test_build_command_error_invalid_harness(self): """测试无效harness文件路径错误""" with pytest.raises(CommandBuildError): self.builder.build_command("/nonexistent/file.c") def test_build_command_memory_model_options(self): """测试内存模型选项""" self.config.memory_model = CBMCMemoryModel.LITTLE_ENDIAN builder = CommandBuilder(self.config) with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f: harness_file = f.name try: command = builder.build_command(harness_file) assert "--little-endian" in command finally: Path(harness_file).unlink() class TestResultParser: """ResultParser类测试""" def setup_method(self): """测试方法设置""" self.parser = ResultParser() def test_init(self): """测试初始化""" assert hasattr(self.parser, 'patterns') assert 'version' in self.parser.patterns assert 'verification_result' in self.parser.patterns def test_parse_simple_success_result(self): """测试解析简单成功结果""" output = """ CBMC version 5.72.0 Parsing test.c Generating GOTO Programs Removing function pointers Generic Property Instrumentation Starting Bounded Model Checking Verification result: SUCCESSFUL Number of verified properties: 5 Number of failed properties: 0 """ result = self.parser.parse_result(output) assert result.status == VerificationStatus.SUCCESSFUL assert result.statistics.verified_properties == 5 assert result.statistics.failed_properties == 0 def test_parse_failure_result_with_counterexample(self): """测试解析失败结果及反例""" output = """ CBMC version 5.72.0 Parsing test.c Verification result: FAILURE Counterexample: State 1 file test.c line 10 function main x = 10 State 2 file test.c line 11 function main y = 20 FAILURE: array_bounds_violation Number of verified properties: 5 Number of failed properties: 1 """ result = self.parser.parse_result(output) assert result.status == VerificationStatus.FAILED assert len(result.counterexamples) == 1 assert result.counterexamples[0].failure_type == "array_bounds_violation" def test_parse_timeout_result(self): """测试解析超时结果""" output = """ CBMC version 5.72.0 Parsing test.c TIMEOUT """ result = self.parser.parse_result(output) assert result.status == VerificationStatus.TIMEOUT def test_parse_json_output(self): """测试解析JSON输出""" json_data = { "cbmcVersion": "5.72.0", "result": { "status": "successful", "properties": [ {"name": "property1", "status": "SUCCESS"}, {"name": "property2", "status": "SUCCESS"} ] }, "statistics": { "verifiedProperties": 2, "failedProperties": 0 } } output = json.dumps(json_data) result = self.parser.parse_result(output, output_format=CBMCOutputFormat.JSON) assert result.status == VerificationStatus.SUCCESSFUL assert result.statistics.verified_properties == 2 def test_parse_xml_output(self): """测试解析XML输出""" xml_data = """ 5.72.0 3 0 """ result = self.parser.parse_result(xml_data, output_format=CBMCOutputFormat.XML) assert result.status == VerificationStatus.SUCCESSFUL assert result.statistics.verified_properties == 3 def test_parse_error_output(self): """测试解析错误输出""" output = """ ERROR: Cannot open file 'nonexistent.c' """ result = self.parser.parse_result(output) assert result.status == VerificationStatus.ERROR assert "Cannot open file" in result.error_message def test_extract_counterexample_states(self): """测试提取反例状态""" counterexample_text = """ Counterexample: State 1 file test.c line 10 function main x = 10 State 2 file test.c line 11 function main y = 20 State 3 file test.c line 12 function main array[5] = 100 """ states = self.parser._extract_counterexample_states(counterexample_text) assert len(states) == 3 assert states[0].state_number == 1 assert states[0].file_path == "test.c" assert states[0].line_number == 10 assert states[0].function_name == "main" assert "x = 10" in states[0].assignments def test_parse_property_status(self): """测试解析属性状态""" property_line = "[SUCCESS] array_bounds: array_bounds_check" property_info = self.parser._parse_property_status(property_line) assert property_info['status'] == 'SUCCESS' assert property_info['name'] == 'array_bounds' assert property_info['description'] == 'array_bounds_check' def test_parse_statistics(self): """测试解析统计信息""" stats_text = """ Number of verified properties: 10 Number of failed properties: 2 Verification time: 5.23s """ stats = self.parser._parse_statistics(stats_text) assert stats['verified_properties'] == 10 assert stats['failed_properties'] == 2 assert stats['verification_time'] == '5.23s' def test_parse_empty_output(self): """测试解析空输出""" with pytest.raises(ResultParseError): self.parser.parse_result("") def test_parse_malformed_json(self): """测试解析格式错误的JSON""" malformed_json = '{"invalid": json}' with pytest.raises(ResultParseError): self.parser.parse_result(malformed_json, output_format=CBMCOutputFormat.JSON) def test_parse_malformed_xml(self): """测试解析格式错误的XML""" malformed_xml = '' with pytest.raises(ResultParseError): self.parser.parse_result(malformed_xml, output_format=CBMCOutputFormat.XML) class TestHarnessGenerator: """HarnessGenerator类测试""" def setup_method(self): """测试方法设置""" self.config = CBMCConfiguration( working_directory="/tmp", harness_template_dir="/templates" ) with patch('src.verify.harness_gen.get_config') as mock_get_config: mock_get_config.return_value = Mock() self.generator = HarnessGenerator(self.config) def test_init(self): """测试初始化""" assert self.generator.config == self.config assert hasattr(self.generator, 'output_dirs') def test_create_output_directories(self): """测试创建输出目录""" with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) assert generator.output_dirs['harnesses'].exists() assert generator.output_dirs['results'].exists() assert generator.output_dirs['logs'].exists() def test_generate_basic_harness(self): """测试生成基本harness""" function_info = FunctionInfo( name="test_function", return_type="int", parameters=[ ParameterInfo(name="a", type="int", is_pointer=False), ParameterInfo(name="b", type="int", is_pointer=False) ], source_location=SourceLocation(file="test.c", line=10, column=1), complexity_score=0.5 ) with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) harness_path, harness_metadata = generator.generate_harness(function_info) assert harness_metadata.function_name == "test_function" assert Path(harness_path).exists() assert harness_metadata.verification_goals def test_generate_memory_safety_harness(self): """测试生成内存安全harness""" function_info = FunctionInfo( name="memory_function", return_type="void", parameters=[ ParameterInfo(name="buffer", type="char*", is_pointer=True), ParameterInfo(name="size", type="size_t", is_pointer=False) ], source_location=SourceLocation(file="memory.c", line=5, column=1), complexity_score=0.8 ) with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) harness_path, harness_metadata = generator.generate_harness( function_info, verification_types=[VerificationType.MEMORY_SAFETY] ) assert VerificationType.MEMORY_SAFETY in harness_metadata.verification_goals harness_content = Path(harness_path).read_text() assert "bounds-check" in harness_content or "pointer-check" in harness_content def test_generate_overflow_detection_harness(self): """测试生成溢出检测harness""" function_info = FunctionInfo( name="overflow_function", return_type="int", parameters=[ ParameterInfo(name="a", type="int", is_pointer=False), ParameterInfo(name="b", type="int", is_pointer=False) ], source_location=SourceLocation(file="overflow.c", line=8, column=1), complexity_score=0.6 ) with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) harness_path, harness_metadata = generator.generate_harness( function_info, verification_types=[VerificationType.OVERFLOW_DETECTION] ) assert VerificationType.OVERFLOW_DETECTION in harness_metadata.verification_goals harness_content = Path(harness_path).read_text() assert "overflow-check" in harness_content def test_generate_pointer_validity_harness(self): """测试生成指针有效性harness""" function_info = FunctionInfo( name="pointer_function", return_type="void", parameters=[ ParameterInfo(name="ptr", type="int*", is_pointer=True), ParameterInfo(name="count", type="int", is_pointer=False) ], source_location=SourceLocation(file="pointer.c", line=12, column=1), complexity_score=0.7 ) with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) harness_path, harness_metadata = generator.generate_harness( function_info, verification_types=[VerificationType.POINTER_VALIDITY] ) assert VerificationType.POINTER_VALIDITY in harness_metadata.verification_goals harness_content = Path(harness_path).read_text() assert "pointer-check" in harness_content def test_generate_concurrency_harness(self): """测试生成并发性harness""" function_info = FunctionInfo( name="concurrent_function", return_type="void", parameters=[ ParameterInfo(name="shared_var", type="int*", is_pointer=True), ParameterInfo(name="mutex", type="pthread_mutex_t*", is_pointer=True) ], source_location=SourceLocation(file="concurrent.c", line=15, column=1), complexity_score=0.9 ) with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) harness_path, harness_metadata = generator.generate_harness( function_info, verification_types=[VerificationType.CONCURRENCY] ) assert VerificationType.CONCURRENCY in harness_metadata.verification_goals def test_generate_harness_with_custom_options(self): """测试生成自定义选项harness""" function_info = FunctionInfo( name="custom_function", return_type="int", parameters=[], source_location=SourceLocation(file="custom.c", line=1, column=1), complexity_score=0.4 ) custom_options = { "unwind": "5", "depth": "10", "additional_includes": ["custom.h"] } with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) harness_path, harness_metadata = generator.generate_harness( function_info, custom_options=custom_options ) harness_content = Path(harness_path).read_text() # 验证自定义选项被包含 assert "custom.h" in harness_content def test_generate_harness_error_invalid_function(self): """测试无效函数错误""" function_info = FunctionInfo( name="", return_type="", parameters=[], source_location=SourceLocation(file="", line=0, column=0), complexity_score=0.0 ) with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) with pytest.raises(HarnessError): generator.generate_harness(function_info) def test_validate_harness_syntax(self): """测试验证harness语法""" function_info = FunctionInfo( name="validate_function", return_type="int", parameters=[], source_location=SourceLocation(file="validate.c", line=1, column=1), complexity_score=0.3 ) with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) harness_path, harness_metadata = generator.generate_harness(function_info) is_valid = generator.validate_harness_syntax(harness_path) assert is_valid def test_setup_verification_environment(self): """测试设置验证环境""" with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) source_files = ["main.c", "utils.c"] env_info = generator.setup_verification_environment(source_files) assert env_info.source_files == source_files assert env_info.environment_ready class TestCBMCRunner: """CBMCRunner类测试""" def setup_method(self): """测试方法设置""" self.config = CBMCConfiguration( cbmc_path="cbmc", working_directory="/tmp", default_timeout=30, max_concurrent_jobs=2 ) self.runner = CBMCRunner(self.config) def test_init(self): """测试初始化""" assert self.runner.config == self.config assert self.runner.active_sessions == {} @pytest.mark.asyncio async def test_run_verification_success(self): """测试成功运行验证""" function_metadata = {"name": "test_function"} source_file = "test.c" specification = "void test_function() { }" with patch.object(self.runner, '_execute_cbmc_command') as mock_execute: mock_execute.return_value = "Verification result: SUCCESSFUL" result = await self.runner.run_verification( function_metadata, source_file, specification ) assert result.status == VerificationStatus.SUCCESSFUL mock_execute.assert_called_once() @pytest.mark.asyncio async def test_run_verification_timeout(self): """测试验证超时""" function_metadata = {"name": "test_function"} source_file = "test.c" specification = "void test_function() { }" with patch.object(self.runner, '_execute_cbmc_command') as mock_execute: mock_execute.side_effect = asyncio.TimeoutError() with pytest.raises(CBMCTimeoutError): await self.runner.run_verification( function_metadata, source_file, specification, timeout=1 ) @pytest.mark.asyncio async def test_run_verification_cbmc_error(self): """测试CBMC错误""" function_metadata = {"name": "test_function"} source_file = "test.c" specification = "void test_function() { }" with patch.object(self.runner, '_execute_cbmc_command') as mock_execute: mock_execute.side_effect = subprocess.CalledProcessError(1, "cbmc") with pytest.raises(VerificationError): await self.runner.run_verification(function_metadata, source_file, specification) @pytest.mark.asyncio async def test_run_verification_concurrent_limit(self): """测试并发限制""" function_metadata = {"name": "test_function"} source_file = "test.c" specification = "void test_function() { }" # 填满活跃会话 for i in range(self.config.max_concurrent_jobs): self.runner.active_sessions[f"session_{i}"] = Mock() with pytest.raises(ConcurrencyError): await self.runner.run_verification(function_metadata, source_file, specification) @pytest.mark.asyncio async def test_stream_verification_results(self): """测试流式验证结果""" function_metadata = {"name": "test_function"} source_file = "test.c" specification = "void test_function() { }" async def mock_stream(): outputs = ["Parsing...", "Generating...", "Verification result: SUCCESSFUL"] for output in outputs: yield output await asyncio.sleep(0.01) with patch.object(self.runner, '_stream_cbmc_output') as mock_stream: mock_stream.return_value = mock_stream() results = [] async for result in self.runner.stream_verification( function_metadata, source_file, specification ): results.append(result) assert len(results) == 3 assert "SUCCESSFUL" in results[-1] def test_create_verification_session(self): """测试创建验证会话""" function_metadata = {"name": "test_function"} source_files = ["test.c"] verification_types = [VerificationType.MEMORY_SAFETY] session = self.runner._create_verification_session( function_metadata, source_files, verification_types ) assert session.function_metadata == function_metadata assert session.source_files == source_files assert session.verification_types == verification_types assert session.session_id is not None assert session.created_at is not None @pytest.mark.asyncio async def test_execute_cbmc_command_success(self): """测试执行CBMC命令成功""" command = ["cbmc", "test.c"] with patch('asyncio.create_subprocess_exec') as mock_create: mock_process = Mock() mock_process.returncode = 0 mock_process.stdout.read.return_value = b"Verification result: SUCCESSFUL" mock_process.stderr.read.return_value = b"" mock_create.return_value.__aenter__.return_value = mock_process result = await self.runner._execute_cbmc_verification(command, "test_function", "test.c", "test.c", time.time()) assert "SUCCESSFUL" in result @pytest.mark.asyncio async def test_execute_cbmc_command_failure(self): """测试执行CBMC命令失败""" command = ["cbmc", "test.c"] with patch('asyncio.create_subprocess_exec') as mock_create: mock_process = Mock() mock_process.returncode = 1 mock_process.stdout.read.return_value = b"ERROR: Test error" mock_process.stderr.read.return_value = b"Error details" mock_create.return_value.__aenter__.return_value = mock_process with pytest.raises(subprocess.CalledProcessError): await self.runner._execute_cbmc_verification(command, "test_function", "test.c", "test.c", time.time()) def test_check_cbmc_availability(self): """测试检查CBMC可用性""" with patch('subprocess.run') as mock_run: mock_run.return_value.returncode = 0 mock_run.return_value.stdout = b"CBMC version 5.72.0" result = self.runner._check_cbmc_availability() assert result is True def test_check_cbmc_unavailable(self): """测试CBMC不可用""" with patch('subprocess.run') as mock_run: mock_run.side_effect = FileNotFoundError() with pytest.raises(CBMCInstallationError): self.runner._check_cbmc_availability() def test_cleanup_session(self): """测试清理会话""" session_id = "test_session" self.runner.active_sessions[session_id] = Mock() self.runner._cleanup_session(session_id) assert session_id not in self.runner.active_sessions def test_get_active_sessions(self): """测试获取活跃会话""" session1 = Mock() session2 = Mock() self.runner.active_sessions["session1"] = session1 self.runner.active_sessions["session2"] = session2 sessions = self.runner.get_active_sessions() assert len(sessions) == 2 assert session1 in sessions assert session2 in sessions def test_get_session_statistics(self): """测试获取会话统计""" session1 = Mock() session1.verification_results = [Mock(status=VerificationStatus.SUCCESSFUL)] session2 = Mock() session2.verification_results = [Mock(status=VerificationStatus.FAILED)] self.runner.active_sessions["session1"] = session1 self.runner.active_sessions["session2"] = session2 stats = self.runner.get_session_statistics() assert stats['total_sessions'] == 2 assert stats['successful_sessions'] == 1 assert stats['failed_sessions'] == 1 class TestVerifyModuleIntegration: """Verify模块集成测试""" def setup_method(self): """测试方法设置""" self.config = CBMCConfiguration( cbmc_path="cbmc", working_directory="/tmp", default_timeout=30, max_concurrent_jobs=2 ) @pytest.mark.asyncio async def test_complete_verification_workflow(self): """测试完整验证工作流""" # 创建模拟的函数元数据 function_metadata = { "name": "test_function", "parameters": [ {"name": "a", "type": "int"}, {"name": "b", "type": "int"} ], "return_type": "int" } # 创建CommandBuilder builder = CommandBuilder(self.config) # 创建HarnessGenerator with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) # 创建FunctionInfo function_info = FunctionInfo( name="test_function", return_type="int", parameters=[ ParameterInfo(name="a", type="int", is_pointer=False), ParameterInfo(name="b", type="int", is_pointer=False) ], source_location=SourceLocation(file="test.c", line=1, column=1), complexity_score=0.5 ) # 生成harness harness_path, harness_metadata = generator.generate_harness(function_info) assert Path(harness_path).exists() # 构建CBMC命令 command = builder.build_command( harness_path, verification_types=[VerificationType.MEMORY_SAFETY] ) assert command[0] == "cbmc" # 创建CBMCRunner并模拟运行 runner = CBMCRunner(self.config) with patch.object(runner, '_execute_cbmc_command') as mock_execute: mock_execute.return_value = "Verification result: SUCCESSFUL" result = await runner.run_verification( function_metadata, "test.c", harness_metadata.harness_file.read_text() ) assert result.status == VerificationStatus.SUCCESSFUL def test_error_handling_integration(self): """测试错误处理集成""" builder = CommandBuilder(self.config) # 测试命令构建错误 with pytest.raises(CommandBuildError): builder.build_command("/nonexistent/file.c") # 测试结果解析错误 parser = ResultParser() with pytest.raises(ResultParseError): parser.parse_result("") # 测试harness生成错误 with tempfile.TemporaryDirectory() as temp_dir: self.config.working_directory = temp_dir generator = HarnessGenerator(self.config) invalid_function = FunctionInfo( name="", return_type="", parameters=[], source_location=SourceLocation(file="", line=0, column=0), complexity_score=0.0 ) with pytest.raises(HarnessError): generator.generate_harness(invalid_function) if __name__ == "__main__": pytest.main([__file__, "-v"])