You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cbmc/codedetect/tests/utils/cbmc_spec_validator.py

673 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
CBMC规范验证器
本模块提供CBMC规范验证功能包括语法正确性检查、逻辑一致性验证、
完整性评估和质量度量。用于测试生成的CBMC规范的有效性。
"""
import re
import logging
from typing import Dict, List, Any, Optional, Tuple, Set
from dataclasses import dataclass
from enum import Enum
import json
logger = logging.getLogger(__name__)
class ValidationLevel(Enum):
"""验证级别"""
SYNTAX = "syntax" # 语法正确性
LOGIC = "logic" # 逻辑一致性
COMPLETENESS = "completeness" # 完整性
QUALITY = "quality" # 质量度量
class ValidationResult:
"""验证结果数据类"""
def __init__(self, spec: str, function_info: Dict[str, Any]):
self.spec = spec
self.function_info = function_info
self.is_valid = True
self.errors: List[str] = []
self.warnings: List[str] = []
self.suggestions: List[str] = []
self.quality_score = 0.0
self.validation_results: Dict[str, Any] = {}
self.metrics: Dict[str, Any] = {}
def add_error(self, error: str):
"""添加错误"""
self.errors.append(error)
self.is_valid = False
def add_warning(self, warning: str):
"""添加警告"""
self.warnings.append(warning)
def add_suggestion(self, suggestion: str):
"""添加建议"""
self.suggestions.append(suggestion)
def calculate_quality_score(self):
"""计算质量分数"""
# 基础分数
base_score = 1.0
# 错误扣分
error_penalty = len(self.errors) * 0.3
warning_penalty = len(self.warnings) * 0.1
# 奖励因素
completeness_bonus = self._calculate_completeness_bonus()
syntax_bonus = self._calculate_syntax_bonus()
self.quality_score = max(0.0, base_score - error_penalty - warning_penalty + completeness_bonus + syntax_bonus)
def _calculate_completeness_bonus(self) -> float:
"""计算完整性奖励"""
bonus = 0.0
# 检查必需的CBMC子句
required_clauses = ['requires', 'ensures']
found_clauses = self.validation_results.get('found_clauses', [])
for clause in required_clauses:
if clause in found_clauses:
bonus += 0.1
# 检查参数覆盖
params = self.function_info.get('parameters', [])
if len(params) > 0:
param_coverage = self.validation_results.get('parameter_coverage', 0)
bonus += (param_coverage / len(params)) * 0.2
return min(bonus, 0.5)
def _calculate_syntax_bonus(self) -> float:
"""计算语法奖励"""
bonus = 0.0
# 检查语法正确性
syntax_valid = self.validation_results.get('syntax_valid', False)
if syntax_valid:
bonus += 0.2
# 检查格式规范性
format_score = self.validation_results.get('format_score', 0)
bonus += format_score * 0.1
return min(bonus, 0.3)
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
'spec': self.spec,
'is_valid': self.is_valid,
'errors': self.errors,
'warnings': self.warnings,
'suggestions': self.suggestions,
'quality_score': self.quality_score,
'validation_results': self.validation_results,
'metrics': self.metrics
}
@dataclass
class ValidationRule:
"""验证规则数据类"""
name: str
description: str
level: ValidationLevel
check_function: callable
enabled: bool = True
class CBMCSpecificationValidator:
"""CBMC规范验证器"""
def __init__(self):
self.rules: List[ValidationRule] = []
self.logger = logger
self._initialize_rules()
def _initialize_rules(self):
"""初始化验证规则"""
# 语法规则
self.rules.extend([
ValidationRule(
name="cbmc_syntax",
description="CBMC语法正确性",
level=ValidationLevel.SYNTAX,
check_function=self._check_cbmc_syntax
),
ValidationRule(
name="clause_format",
description="子句格式正确性",
level=ValidationLevel.SYNTAX,
check_function=self._check_clause_format
),
ValidationRule(
name="escape_sequences",
description="转义序列正确性",
level=ValidationLevel.SYNTAX,
check_function=self._check_escape_sequences
)
])
# 逻辑规则
self.rules.extend([
ValidationRule(
name="logic_consistency",
description="逻辑一致性",
level=ValidationLevel.LOGIC,
check_function=self._check_logic_consistency
),
ValidationRule(
name="parameter_usage",
description="参数使用一致性",
level=ValidationLevel.LOGIC,
check_function=self._check_parameter_usage
),
ValidationRule(
name="return_value_consistency",
description="返回值一致性",
level=ValidationLevel.LOGIC,
check_function=self._check_return_value_consistency
)
])
# 完整性规则
self.rules.extend([
ValidationRule(
name="required_clauses",
description="必需子句存在性",
level=ValidationLevel.COMPLETENESS,
check_function=self._check_required_clauses
),
ValidationRule(
name="parameter_coverage",
description="参数覆盖度",
level=ValidationLevel.COMPLETENESS,
check_function=self._check_parameter_coverage
),
ValidationRule(
name="memory_safety_coverage",
description="内存安全覆盖度",
level=ValidationLevel.COMPLETENESS,
check_function=self._check_memory_safety_coverage
)
])
# 质量规则
self.rules.extend([
ValidationRule(
name="specification_clarity",
description="规范清晰度",
level=ValidationLevel.QUALITY,
check_function=self._check_specification_clarity
),
ValidationRule(
name="domain_specific_rules",
description="领域特定规则",
level=ValidationLevel.QUALITY,
check_function=self._check_domain_specific_rules
),
ValidationRule(
name="best_practices",
description="最佳实践遵循度",
level=ValidationLevel.QUALITY,
check_function=self._check_best_practices
)
])
def validate_specification(self, spec: str, function_info: Dict[str, Any]) -> ValidationResult:
"""验证CBMC规范"""
result = ValidationResult(spec, function_info)
if not spec or not spec.strip():
result.add_error("Specification is empty")
return result
try:
# 应用所有启用的规则
for rule in self.rules:
if rule.enabled:
try:
rule_result = rule.check_function(spec, function_info)
if isinstance(rule_result, dict):
result.validation_results[rule.name] = rule_result
elif isinstance(rule_result, (list, tuple)):
errors, warnings, suggestions = self._process_rule_result(rule_result)
result.errors.extend(errors)
result.warnings.extend(warnings)
result.suggestions.extend(suggestions)
except Exception as e:
self.logger.error(f"Error applying rule {rule.name}: {e}")
result.add_warning(f"Validation rule {rule.name} failed: {str(e)}")
# 计算质量分数
result.calculate_quality_score()
# 计算指标
result.metrics = self._calculate_metrics(result)
self.logger.info(f"Validation completed for {function_info.get('name', 'unknown')}: "
f"quality={result.quality_score:.2f}, errors={len(result.errors)}")
except Exception as e:
result.add_error(f"Validation failed: {str(e)}")
self.logger.error(f"Validation error: {e}")
return result
def _process_rule_result(self, rule_result) -> Tuple[List[str], List[str], List[str]]:
"""处理规则结果"""
if len(rule_result) == 3:
return rule_result # (errors, warnings, suggestions)
elif len(rule_result) == 2:
return rule_result[0], rule_result[1], [] # (errors, warnings)
elif len(rule_result) == 1:
return rule_result[0], [], [] # (errors,)
else:
return [], [], []
def _calculate_metrics(self, result: ValidationResult) -> Dict[str, Any]:
"""计算验证指标"""
spec_lines = result.spec.split('\\n')
spec_text = result.spec.lower()
return {
'spec_length': len(result.spec),
'line_count': len(spec_lines),
'clause_count': len([line for line in spec_lines if line.strip().startswith('\\')]),
'complexity_score': self._calculate_complexity_score(result.spec),
'readability_score': self._calculate_readability_score(result.spec),
'has_requires': '\\requires' in spec_text,
'has_ensures': '\\ensures' in spec_text,
'has_assigns': '\\assigns' in spec_text,
'has_valid': '\\valid' in spec_text
}
def _calculate_complexity_score(self, spec: str) -> float:
"""计算复杂度分数"""
# 基于逻辑运算符、量化符等的数量
operators = ['&&', '\\|\\|', '==', '!=', '<', '>', '<=', '>=']
quantifiers = ['\\forall', '\\exists']
operator_count = sum(spec.count(op) for op in operators)
quantifier_count = sum(spec.count(q) for q in quantifiers)
# 简单的复杂度计算
complexity = (operator_count * 0.1) + (quantifier_count * 0.2)
return min(complexity, 1.0)
def _calculate_readability_score(self, spec: str) -> float:
"""计算可读性分数"""
lines = spec.split('\\n')
if not lines:
return 0.0
# 检查格式规范性和注释
formatted_lines = sum(1 for line in lines if line.strip().startswith('\\'))
total_lines = len([line for line in lines if line.strip()])
if total_lines == 0:
return 0.0
return formatted_lines / total_lines
# 语法检查函数
def _check_cbmc_syntax(self, spec: str, function_info: Dict[str, Any]) -> Dict[str, Any]:
"""检查CBMC语法正确性"""
errors = []
warnings = []
suggestions = []
lines = spec.split('\\n')
found_clauses = []
for i, line in enumerate(lines):
line = line.strip()
if not line:
continue
# 检查CBMC子句格式
if line.startswith('\\'):
if not any(line.startswith(clause) for clause in [
'\\requires', '\\ensures', '\\assigns', '\\valid',
'\\forall', '\\exists', '\\invariant', '\\axiom',
'\\behavior', '\\complete', '\\disjoint'
]):
warnings.append(f"Line {i+1}: Unknown CBMC clause type")
else:
warnings.append(f"Line {i+1}: Line should start with CBMC clause (\\)")
# 收集找到的子句
if line.startswith('\\requires'):
found_clauses.append('requires')
elif line.startswith('\\ensures'):
found_clauses.append('ensures')
elif line.startswith('\\assigns'):
found_clauses.append('assigns')
return {
'syntax_valid': len(errors) == 0,
'found_clauses': found_clauses,
'format_score': 1.0 - (len(warnings) / max(len(lines), 1))
}
def _check_clause_format(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查子句格式"""
errors = []
lines = spec.split('\\n')
for i, line in enumerate(lines):
line = line.strip()
if not line or not line.startswith('\\'):
continue
# 检查子句结构
clause_match = re.match(r'\\([a-zA-Z_]+)', line)
if not clause_match:
errors.append(f"Line {i+1}: Invalid clause format")
return errors
def _check_escape_sequences(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查转义序列"""
errors = []
# 检查双重反斜杠
if '\\\\' in spec:
errors.append("Invalid double backslash escape sequence")
return errors
# 逻辑检查函数
def _check_logic_consistency(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查逻辑一致性"""
errors = []
warnings = []
# 检查常见的逻辑错误
if 'a == a' in spec:
warnings.append("Tautology detected: a == a")
if 'a != a' in spec:
errors.append("Contradiction detected: a != a")
return errors
def _check_parameter_usage(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查参数使用一致性"""
warnings = []
suggestions = []
params = function_info.get('parameters', [])
param_names = [p['name'] for p in params if 'name' in p]
for param_name in param_names:
if param_name not in spec:
suggestions.append(f"Parameter '{param_name}' not used in specification")
return [], warnings, suggestions
def _check_return_value_consistency(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查返回值一致性"""
errors = []
warnings = []
return_type = function_info.get('return_type', 'void')
has_ensures = '\\ensures' in spec
if return_type != 'void' and not has_ensures:
warnings.append("Non-void function should have ensures clause")
if return_type == 'void' and '\\return' in spec:
errors.append("Void function should not have return value specification")
return errors
# 完整性检查函数
def _check_required_clauses(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查必需子句"""
warnings = []
suggestions = []
required_clauses = ['requires', 'ensures']
missing_clauses = []
for clause in required_clauses:
if f'\\{clause}' not in spec:
missing_clauses.append(clause)
if missing_clauses:
suggestions.append(f"Consider adding missing clauses: {', '.join(missing_clauses)}")
return [], warnings, suggestions
def _check_parameter_coverage(self, spec: str, function_info: Dict[str, Any]) -> Dict[str, Any]:
"""检查参数覆盖度"""
params = function_info.get('parameters', [])
param_names = [p['name'] for p in params if 'name' in p]
covered_params = []
for param_name in param_names:
if param_name in spec:
covered_params.append(param_name)
coverage_count = len(covered_params)
total_count = len(param_names)
return {
'covered_parameters': covered_params,
'total_parameters': total_count,
'parameter_coverage': coverage_count,
'coverage_percentage': (coverage_count / total_count * 100) if total_count > 0 else 0
}
def _check_memory_safety_coverage(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查内存安全覆盖度"""
suggestions = []
# 检查指针参数
has_pointers = any(p.get('is_pointer', False) for p in function_info.get('parameters', []))
if has_pointers and '\\valid' not in spec:
suggestions.append("Consider adding validity checks for pointer parameters")
# 检查数组参数
has_arrays = any(p.get('is_array', False) for p in function_info.get('parameters', []))
if has_arrays and 'bounds' not in spec.lower():
suggestions.append("Consider adding array bounds checks")
return [], [], suggestions
# 质量检查函数
def _check_specification_clarity(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查规范清晰度"""
suggestions = []
# 检查规范长度
if len(spec) < 50:
suggestions.append("Specification is very short, consider adding more details")
if len(spec) > 1000:
suggestions.append("Specification is very long, consider simplifying")
# 检查注释
if '/*' not in spec and '//' not in spec:
suggestions.append("Consider adding comments to explain complex conditions")
return [], [], suggestions
def _check_domain_specific_rules(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查领域特定规则"""
suggestions = []
# 检查FreeRTOS特定规则
if 'task' in function_info.get('name', '').lower():
if 'freertos' not in spec.lower():
suggestions.append("Consider adding FreeRTOS-specific safety constraints")
# 检查嵌入式系统规则
params = function_info.get('parameters', [])
for param in params:
if param.get('type', '').endswith('*') and 'null' not in spec.lower():
suggestions.append("Consider adding null pointer checks")
return [], [], suggestions
def _check_best_practices(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
"""检查最佳实践遵循度"""
suggestions = []
# 检查变量命名
if 'temp' in spec.lower():
suggestions.append("Avoid using 'temp' variable names in specifications")
# 检查硬编码值
if re.search(r'\\b\\d+\\b', spec):
suggestions.append("Consider using named constants instead of hardcoded values")
# 检查复杂条件
if '&&' in spec and spec.count('&&') > 3:
suggestions.append("Consider breaking down complex conditions into simpler ones")
return [], [], suggestions
def validate_batch(self, specs: List[Tuple[str, Dict[str, Any]]]) -> List[ValidationResult]:
"""批量验证规范"""
results = []
for spec, function_info in specs:
result = self.validate_specification(spec, function_info)
results.append(result)
return results
def generate_validation_report(self, results: List[ValidationResult]) -> Dict[str, Any]:
"""生成验证报告"""
if not results:
return {'error': 'No validation results'}
total_specs = len(results)
valid_specs = sum(1 for r in results if r.is_valid)
invalid_specs = total_specs - valid_specs
# 质量分布
quality_scores = [r.quality_score for r in results]
avg_quality = sum(quality_scores) / len(quality_scores) if quality_scores else 0.0
quality_distribution = {
'excellent': len([s for s in quality_scores if s >= 0.9]),
'good': len([s for s in quality_scores if 0.7 <= s < 0.9]),
'fair': len([s for s in quality_scores if 0.5 <= s < 0.7]),
'poor': len([s for s in quality_scores if s < 0.5])
}
# 错误和警告统计
total_errors = sum(len(r.errors) for r in results)
total_warnings = sum(len(r.warnings) for r in results)
total_suggestions = sum(len(r.suggestions) for r in results)
# 常见错误
common_errors = {}
for result in results:
for error in result.errors:
common_errors[error] = common_errors.get(error, 0) + 1
# 按函数分组
function_results = {}
for result in results:
func_name = result.function_info.get('name', 'unknown')
if func_name not in function_results:
function_results[func_name] = []
function_results[func_name].append(result)
report = {
'summary': {
'total_specifications': total_specs,
'valid_specifications': valid_specs,
'invalid_specifications': invalid_specs,
'success_rate': valid_specs / total_specs if total_specs > 0 else 0,
'average_quality': avg_quality,
'total_errors': total_errors,
'total_warnings': total_warnings,
'total_suggestions': total_suggestions
},
'quality_distribution': quality_distribution,
'common_errors': dict(sorted(common_errors.items(), key=lambda x: x[1], reverse=True)[:10]),
'function_results': {
name: {
'count': len(results),
'avg_quality': sum(r.quality_score for r in results) / len(results),
'valid_count': sum(1 for r in results if r.is_valid)
}
for name, results in function_results.items()
},
'detailed_results': [result.to_dict() for result in results]
}
return report
def save_validation_report(self, report: Dict[str, Any], output_path: str):
"""保存验证报告"""
try:
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
self.logger.info(f"Validation report saved to: {output_path}")
except Exception as e:
self.logger.error(f"Failed to save validation report: {e}")
def enable_rule(self, rule_name: str):
"""启用验证规则"""
for rule in self.rules:
if rule.name == rule_name:
rule.enabled = True
self.logger.info(f"Enabled rule: {rule_name}")
return
self.logger.warning(f"Rule not found: {rule_name}")
def disable_rule(self, rule_name: str):
"""禁用验证规则"""
for rule in self.rules:
if rule.name == rule_name:
rule.enabled = False
self.logger.info(f"Disabled rule: {rule_name}")
return
self.logger.warning(f"Rule not found: {rule_name}")
def get_rules_info(self) -> List[Dict[str, Any]]:
"""获取规则信息"""
return [
{
'name': rule.name,
'description': rule.description,
'level': rule.level.value,
'enabled': rule.enabled
}
for rule in self.rules
]
# 便捷函数
def validate_cbmc_specification(spec: str, function_info: Dict[str, Any]) -> ValidationResult:
"""验证CBMC规范的便捷函数"""
validator = CBMCSpecificationValidator()
return validator.validate_specification(spec, function_info)
def validate_cbmc_specifications_batch(specs: List[Tuple[str, Dict[str, Any]]]) -> List[ValidationResult]:
"""批量验证CBMC规范的便捷函数"""
validator = CBMCSpecificationValidator()
return validator.validate_batch(specs)
def generate_validation_summary(results: List[ValidationResult]) -> Dict[str, Any]:
"""生成验证摘要的便捷函数"""
validator = CBMCSpecificationValidator()
return validator.generate_validation_report(results)