"""
Unit tests for the verify module.

This module provides comprehensive unit-test coverage for the verify module,
including:

- CBMCRunner tests
- CommandBuilder tests
- ResultParser tests
- HarnessGenerator tests
- Asynchronous functionality tests
- Error-handling tests
"""
|
|
|
|
|
|
import asyncio
import json
import subprocess
import tempfile
import time
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest.mock import AsyncMock, MagicMock, Mock, patch

import pytest

from src.parser.metadata import FunctionInfo, ParameterInfo, SourceLocation
from src.utils.config import ConfigManager
from src.verify.cbmc_runner import CBMCRunner
from src.verify.command_builder import CommandBuilder
from src.verify.exceptions import (
    CBMCConfigurationError,
    CBMCInstallationError,
    CBMCTimeoutError,
    CommandBuildError,
    ConcurrencyError,
    HarnessError,
    ResourceError,
    ResultParseError,
    VerificationError,
)
from src.verify.harness_gen import HarnessGenerator
from src.verify.result_parser import ResultParser
from src.verify.verification_types import (
    CBMCConfiguration,
    CBMCMemoryModel,
    CBMCOutputFormat,
    CBMCStats,
    CounterExample,
    CounterExampleState,
    VerificationGoal,
    VerificationResult,
    VerificationSession,
    VerificationStatus,
    VerificationType,
)
|
|
|
|
|
|
|
|
|
class TestCommandBuilder:
    """Tests for the ``CommandBuilder`` class."""

    def setup_method(self):
        """Create a fresh configuration and builder before each test."""
        self.config = CBMCConfiguration(
            cbmc_path="cbmc",
            working_directory="/tmp",
            memory_model=CBMCMemoryModel.LITTLE_ENDIAN,
            output_format=CBMCOutputFormat.TEXT,
            default_timeout=300,
            max_concurrent_jobs=4,
        )
        self.builder = CommandBuilder(self.config)

    @staticmethod
    def _create_harness_file():
        """Create an empty ``.c`` file on disk and return its path.

        The file is created with ``delete=False``; the caller is responsible
        for removing it once the test is done.
        """
        with tempfile.NamedTemporaryFile(suffix='.c', delete=False) as f:
            return f.name

    def test_init_with_config(self):
        """The builder keeps the configuration it was constructed with."""
        assert self.builder.config == self.config
        assert self.builder.config.cbmc_path == "cbmc"

    def test_init_without_config(self):
        """Without an explicit configuration the builder falls back to ``get_config``."""
        with patch('src.verify.command_builder.get_config') as mock_get_config:
            mock_config = Mock()
            mock_config.get.return_value = {"cbmc": {"path": "test_cbmc"}}
            # Hand the prepared object to the patched factory; without this
            # the builder would only ever see an unconfigured MagicMock.
            mock_get_config.return_value = mock_config
            builder = CommandBuilder()
            assert builder.config.cbmc_path == "test_cbmc"

    def test_build_basic_command(self):
        """Building with only a harness file yields the binary plus the harness path."""
        # NOTE(review): this expects exactly ["cbmc", harness_file], while
        # test_build_command_memory_model_options expects "--little-endian"
        # for the same default configuration -- confirm which is intended.
        harness_file = self._create_harness_file()
        try:
            command = self.builder.build_command(harness_file)
            expected = ["cbmc", harness_file]
            assert command == expected
        finally:
            Path(harness_file).unlink()

    def test_build_command_with_source_files(self):
        """Extra source files are appended after the harness file."""
        harness_file = self._create_harness_file()
        source_files = ["main.c", "utils.c"]
        try:
            command = self.builder.build_command(harness_file, source_files=source_files)
            expected = ["cbmc", harness_file, "main.c", "utils.c"]
            assert command == expected
        finally:
            Path(harness_file).unlink()

    def test_build_command_with_verification_types(self):
        """Verification types are expected to map to the matching CBMC check flags."""
        harness_file = self._create_harness_file()
        verification_types = [VerificationType.MEMORY_SAFETY, VerificationType.OVERFLOW_DETECTION]
        try:
            command = self.builder.build_command(harness_file, verification_types=verification_types)
            assert "--bounds-check" in command
            assert "--pointer-check" in command
            assert "--overflow-check" in command
        finally:
            Path(harness_file).unlink()

    def test_build_command_with_additional_options(self):
        """Additional options show up as ``--name value`` pairs in the command."""
        harness_file = self._create_harness_file()
        additional_options = {
            "unwind": "10",
            "depth": "20",
            # NOTE(review): "json_output" is passed but nothing asserts on it.
            "json_output": True,
        }
        try:
            command = self.builder.build_command(harness_file, additional_options=additional_options)
            assert "--unwind" in command
            assert "10" in command
            assert "--depth" in command
            assert "20" in command
        finally:
            Path(harness_file).unlink()

    def test_build_command_with_json_output(self):
        """A JSON output format is expected to add ``--json-ui``."""
        self.config.output_format = CBMCOutputFormat.JSON
        builder = CommandBuilder(self.config)
        harness_file = self._create_harness_file()
        try:
            command = builder.build_command(harness_file)
            assert "--json-ui" in command
        finally:
            Path(harness_file).unlink()

    def test_build_command_with_xml_output(self):
        """An XML output format is expected to add ``--xml-ui``."""
        self.config.output_format = CBMCOutputFormat.XML
        builder = CommandBuilder(self.config)
        harness_file = self._create_harness_file()
        try:
            command = builder.build_command(harness_file)
            assert "--xml-ui" in command
        finally:
            Path(harness_file).unlink()

    def test_build_command_error_invalid_harness(self):
        """A non-existent harness path is expected to raise ``CommandBuildError``."""
        with pytest.raises(CommandBuildError):
            self.builder.build_command("/nonexistent/file.c")

    def test_build_command_memory_model_options(self):
        """A little-endian memory model is expected to add ``--little-endian``."""
        self.config.memory_model = CBMCMemoryModel.LITTLE_ENDIAN
        builder = CommandBuilder(self.config)
        harness_file = self._create_harness_file()
        try:
            command = builder.build_command(harness_file)
            assert "--little-endian" in command
        finally:
            Path(harness_file).unlink()
|
|
|
|
|
|
|
|
|
class TestResultParser:
    """Tests for the ``ResultParser`` class."""

    def setup_method(self):
        """Create a fresh parser before each test."""
        self.parser = ResultParser()

    @staticmethod
    def _text_output(*lines):
        """Join fixture lines into one text blob with a leading and trailing newline.

        Building the fixture from an explicit list keeps exactly the intended
        lines in the text handed to the parser (no stray separator lines and
        no accidental indentation).
        """
        return "\n" + "\n".join(lines) + "\n"

    def test_init(self):
        """The parser exposes a ``patterns`` mapping with the expected keys."""
        assert hasattr(self.parser, 'patterns')
        assert 'version' in self.parser.patterns
        assert 'verification_result' in self.parser.patterns

    def test_parse_simple_success_result(self):
        """A successful run is parsed into status and property counts."""
        output = self._text_output(
            "CBMC version 5.72.0",
            "Parsing test.c",
            "Generating GOTO Programs",
            "Removing function pointers",
            "Generic Property Instrumentation",
            "Starting Bounded Model Checking",
            "Verification result: SUCCESSFUL",
            "Number of verified properties: 5",
            "Number of failed properties: 0",
        )

        result = self.parser.parse_result(output)
        assert result.status == VerificationStatus.SUCCESSFUL
        assert result.statistics.verified_properties == 5
        assert result.statistics.failed_properties == 0

    def test_parse_failure_result_with_counterexample(self):
        """A failing run yields FAILED status and one counterexample."""
        output = self._text_output(
            "CBMC version 5.72.0",
            "Parsing test.c",
            "Verification result: FAILURE",
            "Counterexample:",
            "State 1 file test.c line 10 function main",
            "x = 10",
            "State 2 file test.c line 11 function main",
            "y = 20",
            "FAILURE: array_bounds_violation",
            "Number of verified properties: 5",
            "Number of failed properties: 1",
        )

        result = self.parser.parse_result(output)
        assert result.status == VerificationStatus.FAILED
        assert len(result.counterexamples) == 1
        assert result.counterexamples[0].failure_type == "array_bounds_violation"

    def test_parse_timeout_result(self):
        """Output containing ``TIMEOUT`` is reported as a timeout."""
        output = self._text_output(
            "CBMC version 5.72.0",
            "Parsing test.c",
            "TIMEOUT",
        )

        result = self.parser.parse_result(output)
        assert result.status == VerificationStatus.TIMEOUT

    def test_parse_json_output(self):
        """JSON output is parsed when the JSON format is requested."""
        json_data = {
            "cbmcVersion": "5.72.0",
            "result": {
                "status": "successful",
                "properties": [
                    {"name": "property1", "status": "SUCCESS"},
                    {"name": "property2", "status": "SUCCESS"},
                ],
            },
            "statistics": {
                "verifiedProperties": 2,
                "failedProperties": 0,
            },
        }

        output = json.dumps(json_data)
        result = self.parser.parse_result(output, output_format=CBMCOutputFormat.JSON)
        assert result.status == VerificationStatus.SUCCESSFUL
        assert result.statistics.verified_properties == 2

    def test_parse_xml_output(self):
        """XML output is parsed when the XML format is requested."""
        # The XML declaration must be the very first thing in the document,
        # so this fixture has no leading newline.
        xml_data = "\n".join([
            '<?xml version="1.0" encoding="UTF-8"?>',
            "<cprover>",
            "<cbmcVersion>5.72.0</cbmcVersion>",
            '<result status="SUCCESS"/>',
            "<statistics>",
            "<verifiedProperties>3</verifiedProperties>",
            "<failedProperties>0</failedProperties>",
            "</statistics>",
            "</cprover>",
            "",
        ])

        result = self.parser.parse_result(xml_data, output_format=CBMCOutputFormat.XML)
        assert result.status == VerificationStatus.SUCCESSFUL
        assert result.statistics.verified_properties == 3

    def test_parse_error_output(self):
        """An ``ERROR:`` line produces ERROR status and carries the message."""
        output = self._text_output(
            "ERROR: Cannot open file 'nonexistent.c'",
        )

        result = self.parser.parse_result(output)
        assert result.status == VerificationStatus.ERROR
        assert "Cannot open file" in result.error_message

    def test_extract_counterexample_states(self):
        """Each ``State N ...`` header becomes one state with its location and assignments."""
        counterexample_text = self._text_output(
            "Counterexample:",
            "State 1 file test.c line 10 function main",
            "x = 10",
            "State 2 file test.c line 11 function main",
            "y = 20",
            "State 3 file test.c line 12 function main",
            "array[5] = 100",
        )

        states = self.parser._extract_counterexample_states(counterexample_text)
        assert len(states) == 3
        assert states[0].state_number == 1
        assert states[0].file_path == "test.c"
        assert states[0].line_number == 10
        assert states[0].function_name == "main"
        assert "x = 10" in states[0].assignments

    def test_parse_property_status(self):
        """A ``[STATUS] name: description`` line is split into its three parts."""
        property_line = "[SUCCESS] array_bounds: array_bounds_check"
        property_info = self.parser._parse_property_status(property_line)
        assert property_info['status'] == 'SUCCESS'
        assert property_info['name'] == 'array_bounds'
        assert property_info['description'] == 'array_bounds_check'

    def test_parse_statistics(self):
        """Property counts and the verification time are extracted from the text."""
        stats_text = self._text_output(
            "Number of verified properties: 10",
            "Number of failed properties: 2",
            "Verification time: 5.23s",
        )

        stats = self.parser._parse_statistics(stats_text)
        assert stats['verified_properties'] == 10
        assert stats['failed_properties'] == 2
        assert stats['verification_time'] == '5.23s'

    def test_parse_empty_output(self):
        """Empty output is expected to raise ``ResultParseError``."""
        with pytest.raises(ResultParseError):
            self.parser.parse_result("")

    def test_parse_malformed_json(self):
        """Malformed JSON is expected to raise ``ResultParseError``."""
        malformed_json = '{"invalid": json}'
        with pytest.raises(ResultParseError):
            self.parser.parse_result(malformed_json, output_format=CBMCOutputFormat.JSON)

    def test_parse_malformed_xml(self):
        """Malformed XML is expected to raise ``ResultParseError``."""
        malformed_xml = '<invalid><xml>'
        with pytest.raises(ResultParseError):
            self.parser.parse_result(malformed_xml, output_format=CBMCOutputFormat.XML)
|
|
|
|
|
|
|
|
|
class TestHarnessGenerator:
    """Test suite exercising ``HarnessGenerator``."""

    def setup_method(self):
        """Prepare a configuration and a generator built while ``get_config`` is patched."""
        self.config = CBMCConfiguration(
            working_directory="/tmp",
            harness_template_dir="/templates",
        )
        with patch('src.verify.harness_gen.get_config') as mock_get_config:
            mock_get_config.return_value = Mock()
            self.generator = HarnessGenerator(self.config)

    @staticmethod
    def _describe(name, return_type, parameters, file, line, column, complexity_score):
        """Assemble a ``FunctionInfo`` from its individual pieces."""
        return FunctionInfo(
            name=name,
            return_type=return_type,
            parameters=parameters,
            source_location=SourceLocation(file=file, line=line, column=column),
            complexity_score=complexity_score,
        )

    def _generator_for(self, directory):
        """Point the shared config at *directory* and build a new generator on it."""
        self.config.working_directory = directory
        return HarnessGenerator(self.config)

    def test_init(self):
        """The generator stores its config and exposes ``output_dirs``."""
        assert self.generator.config == self.config
        assert hasattr(self.generator, 'output_dirs')

    def test_create_output_directories(self):
        """Constructing a generator creates the harnesses/results/logs directories."""
        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)
            for key in ('harnesses', 'results', 'logs'):
                assert generator.output_dirs[key].exists()

    def test_generate_basic_harness(self):
        """A plain two-int function produces a harness file and metadata."""
        info = self._describe(
            "test_function",
            "int",
            [
                ParameterInfo(name="a", type="int", is_pointer=False),
                ParameterInfo(name="b", type="int", is_pointer=False),
            ],
            "test.c", 10, 1,
            0.5,
        )

        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)
            path, metadata = generator.generate_harness(info)

            assert metadata.function_name == "test_function"
            assert Path(path).exists()
            assert metadata.verification_goals

    def test_generate_memory_safety_harness(self):
        """Requesting MEMORY_SAFETY is reflected in goals and harness text."""
        info = self._describe(
            "memory_function",
            "void",
            [
                ParameterInfo(name="buffer", type="char*", is_pointer=True),
                ParameterInfo(name="size", type="size_t", is_pointer=False),
            ],
            "memory.c", 5, 1,
            0.8,
        )

        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)
            path, metadata = generator.generate_harness(
                info,
                verification_types=[VerificationType.MEMORY_SAFETY],
            )

            assert VerificationType.MEMORY_SAFETY in metadata.verification_goals
            text = Path(path).read_text()
            assert "bounds-check" in text or "pointer-check" in text

    def test_generate_overflow_detection_harness(self):
        """Requesting OVERFLOW_DETECTION is reflected in goals and harness text."""
        info = self._describe(
            "overflow_function",
            "int",
            [
                ParameterInfo(name="a", type="int", is_pointer=False),
                ParameterInfo(name="b", type="int", is_pointer=False),
            ],
            "overflow.c", 8, 1,
            0.6,
        )

        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)
            path, metadata = generator.generate_harness(
                info,
                verification_types=[VerificationType.OVERFLOW_DETECTION],
            )

            assert VerificationType.OVERFLOW_DETECTION in metadata.verification_goals
            text = Path(path).read_text()
            assert "overflow-check" in text

    def test_generate_pointer_validity_harness(self):
        """Requesting POINTER_VALIDITY is reflected in goals and harness text."""
        info = self._describe(
            "pointer_function",
            "void",
            [
                ParameterInfo(name="ptr", type="int*", is_pointer=True),
                ParameterInfo(name="count", type="int", is_pointer=False),
            ],
            "pointer.c", 12, 1,
            0.7,
        )

        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)
            path, metadata = generator.generate_harness(
                info,
                verification_types=[VerificationType.POINTER_VALIDITY],
            )

            assert VerificationType.POINTER_VALIDITY in metadata.verification_goals
            text = Path(path).read_text()
            assert "pointer-check" in text

    def test_generate_concurrency_harness(self):
        """Requesting CONCURRENCY is reflected in the metadata goals."""
        info = self._describe(
            "concurrent_function",
            "void",
            [
                ParameterInfo(name="shared_var", type="int*", is_pointer=True),
                ParameterInfo(name="mutex", type="pthread_mutex_t*", is_pointer=True),
            ],
            "concurrent.c", 15, 1,
            0.9,
        )

        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)
            _, metadata = generator.generate_harness(
                info,
                verification_types=[VerificationType.CONCURRENCY],
            )

            assert VerificationType.CONCURRENCY in metadata.verification_goals

    def test_generate_harness_with_custom_options(self):
        """Custom options (here an extra include) appear in the generated harness."""
        info = self._describe("custom_function", "int", [], "custom.c", 1, 1, 0.4)

        custom_options = {
            "unwind": "5",
            "depth": "10",
            "additional_includes": ["custom.h"],
        }

        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)
            path, _ = generator.generate_harness(
                info,
                custom_options=custom_options,
            )

            text = Path(path).read_text()
            # The custom include must have made it into the harness.
            assert "custom.h" in text

    def test_generate_harness_error_invalid_function(self):
        """An empty function description is expected to raise ``HarnessError``."""
        info = self._describe("", "", [], "", 0, 0, 0.0)

        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)
            with pytest.raises(HarnessError):
                generator.generate_harness(info)

    def test_validate_harness_syntax(self):
        """A freshly generated harness passes the generator's own syntax check."""
        info = self._describe("validate_function", "int", [], "validate.c", 1, 1, 0.3)

        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)
            path, _ = generator.generate_harness(info)
            assert generator.validate_harness_syntax(path)

    def test_setup_verification_environment(self):
        """Environment setup echoes the source files and reports readiness."""
        with tempfile.TemporaryDirectory() as workdir:
            generator = self._generator_for(workdir)

            source_files = ["main.c", "utils.c"]
            env_info = generator.setup_verification_environment(source_files)

            assert env_info.source_files == source_files
            assert env_info.environment_ready
|
|
|
|
|
|
|
|
|
class TestCBMCRunner:
    """Tests for the ``CBMCRunner`` class."""

    def setup_method(self):
        """Create a fresh configuration and runner before each test."""
        self.config = CBMCConfiguration(
            cbmc_path="cbmc",
            working_directory="/tmp",
            default_timeout=30,
            max_concurrent_jobs=2,
        )
        self.runner = CBMCRunner(self.config)

    @staticmethod
    def _fake_process(returncode, stdout, stderr):
        """Build a stand-in for an ``asyncio.subprocess.Process``.

        ``read``, ``communicate`` and ``wait`` are coroutines on the real
        object, so they are mocked with ``AsyncMock`` to stay awaitable.
        """
        process = Mock()
        process.returncode = returncode
        process.stdout.read = AsyncMock(return_value=stdout)
        process.stderr.read = AsyncMock(return_value=stderr)
        process.communicate = AsyncMock(return_value=(stdout, stderr))
        process.wait = AsyncMock(return_value=returncode)
        return process

    def test_init(self):
        """The runner stores its config and starts with no active sessions."""
        assert self.runner.config == self.config
        assert self.runner.active_sessions == {}

    @pytest.mark.asyncio
    async def test_run_verification_success(self):
        """A successful CBMC run is reported as SUCCESSFUL."""
        function_metadata = {"name": "test_function"}
        source_file = "test.c"
        specification = "void test_function() { }"

        with patch.object(self.runner, '_execute_cbmc_command') as mock_execute:
            mock_execute.return_value = "Verification result: SUCCESSFUL"

            result = await self.runner.run_verification(
                function_metadata, source_file, specification
            )

            assert result.status == VerificationStatus.SUCCESSFUL
            mock_execute.assert_called_once()

    @pytest.mark.asyncio
    async def test_run_verification_timeout(self):
        """An ``asyncio.TimeoutError`` from execution surfaces as ``CBMCTimeoutError``."""
        function_metadata = {"name": "test_function"}
        source_file = "test.c"
        specification = "void test_function() { }"

        with patch.object(self.runner, '_execute_cbmc_command') as mock_execute:
            mock_execute.side_effect = asyncio.TimeoutError()

            with pytest.raises(CBMCTimeoutError):
                await self.runner.run_verification(
                    function_metadata, source_file, specification, timeout=1
                )

    @pytest.mark.asyncio
    async def test_run_verification_cbmc_error(self):
        """A ``CalledProcessError`` from execution surfaces as ``VerificationError``."""
        function_metadata = {"name": "test_function"}
        source_file = "test.c"
        specification = "void test_function() { }"

        with patch.object(self.runner, '_execute_cbmc_command') as mock_execute:
            mock_execute.side_effect = subprocess.CalledProcessError(1, "cbmc")

            with pytest.raises(VerificationError):
                await self.runner.run_verification(function_metadata, source_file, specification)

    @pytest.mark.asyncio
    async def test_run_verification_concurrent_limit(self):
        """Starting a run with all job slots taken raises ``ConcurrencyError``."""
        function_metadata = {"name": "test_function"}
        source_file = "test.c"
        specification = "void test_function() { }"

        # Fill every available slot with a placeholder session.
        for i in range(self.config.max_concurrent_jobs):
            self.runner.active_sessions[f"session_{i}"] = Mock()

        with pytest.raises(ConcurrencyError):
            await self.runner.run_verification(function_metadata, source_file, specification)

    @pytest.mark.asyncio
    async def test_stream_verification_results(self):
        """Streaming yields every chunk produced by the underlying output stream."""
        function_metadata = {"name": "test_function"}
        source_file = "test.c"
        specification = "void test_function() { }"

        # Named differently from the patch handle below: reusing the same
        # name would rebind it to the Mock and the generator would never run.
        async def fake_stream():
            outputs = ["Parsing...", "Generating...", "Verification result: SUCCESSFUL"]
            for output in outputs:
                yield output
                await asyncio.sleep(0.01)

        with patch.object(self.runner, '_stream_cbmc_output') as mock_stream:
            mock_stream.return_value = fake_stream()

            results = []
            async for result in self.runner.stream_verification(
                function_metadata, source_file, specification
            ):
                results.append(result)

            assert len(results) == 3
            assert "SUCCESSFUL" in results[-1]

    def test_create_verification_session(self):
        """A new session records its inputs and gets an id and creation time."""
        function_metadata = {"name": "test_function"}
        source_files = ["test.c"]
        verification_types = [VerificationType.MEMORY_SAFETY]

        session = self.runner._create_verification_session(
            function_metadata, source_files, verification_types
        )

        assert session.function_metadata == function_metadata
        assert session.source_files == source_files
        assert session.verification_types == verification_types
        assert session.session_id is not None
        assert session.created_at is not None

    @pytest.mark.asyncio
    async def test_execute_cbmc_command_success(self):
        """A zero exit code returns the captured output."""
        # NOTE(review): other tests patch `_execute_cbmc_command`, this one
        # calls `_execute_cbmc_verification` -- confirm the intended method.
        command = ["cbmc", "test.c"]

        with patch('asyncio.create_subprocess_exec', new_callable=AsyncMock) as mock_create:
            # create_subprocess_exec is awaited (not used as an async context
            # manager), so the awaited result must be the fake process itself.
            mock_create.return_value = self._fake_process(
                0, b"Verification result: SUCCESSFUL", b""
            )

            result = await self.runner._execute_cbmc_verification(
                command, "test_function", "test.c", "test.c", time.time()
            )
            assert "SUCCESSFUL" in result

    @pytest.mark.asyncio
    async def test_execute_cbmc_command_failure(self):
        """A non-zero exit code is expected to raise ``CalledProcessError``."""
        command = ["cbmc", "test.c"]

        with patch('asyncio.create_subprocess_exec', new_callable=AsyncMock) as mock_create:
            mock_create.return_value = self._fake_process(
                1, b"ERROR: Test error", b"Error details"
            )

            with pytest.raises(subprocess.CalledProcessError):
                await self.runner._execute_cbmc_verification(
                    command, "test_function", "test.c", "test.c", time.time()
                )

    def test_check_cbmc_availability(self):
        """A successful version probe reports CBMC as available."""
        with patch('subprocess.run') as mock_run:
            mock_run.return_value.returncode = 0
            mock_run.return_value.stdout = b"CBMC version 5.72.0"

            result = self.runner._check_cbmc_availability()
            assert result is True

    def test_check_cbmc_unavailable(self):
        """A missing binary is expected to raise ``CBMCInstallationError``."""
        with patch('subprocess.run') as mock_run:
            mock_run.side_effect = FileNotFoundError()

            with pytest.raises(CBMCInstallationError):
                self.runner._check_cbmc_availability()

    def test_cleanup_session(self):
        """Cleaning up a session removes it from ``active_sessions``."""
        session_id = "test_session"
        self.runner.active_sessions[session_id] = Mock()

        self.runner._cleanup_session(session_id)
        assert session_id not in self.runner.active_sessions

    def test_get_active_sessions(self):
        """All registered sessions are returned."""
        session1 = Mock()
        session2 = Mock()
        self.runner.active_sessions["session1"] = session1
        self.runner.active_sessions["session2"] = session2

        sessions = self.runner.get_active_sessions()
        assert len(sessions) == 2
        assert session1 in sessions
        assert session2 in sessions

    def test_get_session_statistics(self):
        """Statistics count total, successful and failed sessions."""
        session1 = Mock()
        session1.verification_results = [Mock(status=VerificationStatus.SUCCESSFUL)]
        session2 = Mock()
        session2.verification_results = [Mock(status=VerificationStatus.FAILED)]

        self.runner.active_sessions["session1"] = session1
        self.runner.active_sessions["session2"] = session2

        stats = self.runner.get_session_statistics()
        assert stats['total_sessions'] == 2
        assert stats['successful_sessions'] == 1
        assert stats['failed_sessions'] == 1
|
|
|
|
|
|
|
|
|
class TestVerifyModuleIntegration:
    """Integration-style tests that chain the verify module's components."""

    def setup_method(self):
        """Create the configuration shared by the tests in this class."""
        self.config = CBMCConfiguration(
            cbmc_path="cbmc",
            working_directory="/tmp",
            default_timeout=30,
            max_concurrent_jobs=2,
        )

    @pytest.mark.asyncio
    async def test_complete_verification_workflow(self):
        """Generate a harness, build a command, then run a mocked verification."""
        # Simulated function metadata handed to the runner.
        function_metadata = {
            "name": "test_function",
            "parameters": [
                {"name": "a", "type": "int"},
                {"name": "b", "type": "int"},
            ],
            "return_type": "int",
        }

        # Command builder under test.
        builder = CommandBuilder(self.config)

        # Harness generator working inside a throw-away directory.
        with tempfile.TemporaryDirectory() as workdir:
            self.config.working_directory = workdir
            generator = HarnessGenerator(self.config)

            # Structured description of the function to verify.
            params = [
                ParameterInfo(name="a", type="int", is_pointer=False),
                ParameterInfo(name="b", type="int", is_pointer=False),
            ]
            function_info = FunctionInfo(
                name="test_function",
                return_type="int",
                parameters=params,
                source_location=SourceLocation(file="test.c", line=1, column=1),
                complexity_score=0.5,
            )

            # Generate the harness.
            harness_path, harness_metadata = generator.generate_harness(function_info)
            assert Path(harness_path).exists()

            # Build the CBMC command line.
            command = builder.build_command(
                harness_path,
                verification_types=[VerificationType.MEMORY_SAFETY],
            )
            assert command[0] == "cbmc"

            # Create the runner and simulate the execution step.
            runner = CBMCRunner(self.config)
            with patch.object(runner, '_execute_cbmc_command') as mock_execute:
                mock_execute.return_value = "Verification result: SUCCESSFUL"

                specification = harness_metadata.harness_file.read_text()
                result = await runner.run_verification(
                    function_metadata,
                    "test.c",
                    specification,
                )

                assert result.status == VerificationStatus.SUCCESSFUL

    def test_error_handling_integration(self):
        """Each component raises its own error type on invalid input."""
        builder = CommandBuilder(self.config)

        # Command building fails for a missing harness file.
        with pytest.raises(CommandBuildError):
            builder.build_command("/nonexistent/file.c")

        # Result parsing fails for empty output.
        parser = ResultParser()
        with pytest.raises(ResultParseError):
            parser.parse_result("")

        # Harness generation fails for an empty function description.
        with tempfile.TemporaryDirectory() as workdir:
            self.config.working_directory = workdir
            generator = HarnessGenerator(self.config)

            invalid_function = FunctionInfo(
                name="",
                return_type="",
                parameters=[],
                source_location=SourceLocation(file="", line=0, column=0),
                complexity_score=0.0,
            )

            with pytest.raises(HarnessError):
                generator.generate_harness(invalid_function)
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Allow running this file directly: hand it to pytest in verbose mode.
    pytest.main([__file__, "-v"])