"""CBMC specification validator.

This module validates generated CBMC specifications: syntax checks, logical
consistency checks, completeness assessment and quality metrics. It is used
to test whether a generated specification is usable.
"""

import json
import logging
import re
from collections import Counter, defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

logger = logging.getLogger(__name__)


class ValidationLevel(Enum):
    """Category a validation rule belongs to."""

    SYNTAX = "syntax"  # syntactic correctness
    LOGIC = "logic"  # logical consistency
    COMPLETENESS = "completeness"  # completeness
    QUALITY = "quality"  # quality metrics


class ValidationResult:
    """Outcome of validating one specification.

    Collects errors, warnings and suggestions, the raw per-rule outputs
    (``validation_results``, keyed by rule name), derived ``metrics`` and a
    ``quality_score`` clamped to the range [0.0, 1.0].
    """

    def __init__(self, spec: str, function_info: Dict[str, Any]):
        self.spec = spec
        self.function_info = function_info
        self.is_valid = True
        self.errors: List[str] = []
        self.warnings: List[str] = []
        self.suggestions: List[str] = []
        self.quality_score = 0.0
        self.validation_results: Dict[str, Any] = {}
        self.metrics: Dict[str, Any] = {}

    def add_error(self, error: str):
        """Record an error and mark the result as invalid."""
        self.errors.append(error)
        self.is_valid = False

    def add_warning(self, warning: str):
        """Record a warning (does not affect validity)."""
        self.warnings.append(warning)

    def add_suggestion(self, suggestion: str):
        """Record an improvement suggestion."""
        self.suggestions.append(suggestion)

    def calculate_quality_score(self):
        """Compute ``quality_score`` from penalties and bonuses.

        Starts at 1.0, subtracts 0.3 per error and 0.1 per warning, adds the
        completeness and syntax bonuses, and clamps the result to [0.0, 1.0].
        """
        error_penalty = len(self.errors) * 0.3
        warning_penalty = len(self.warnings) * 0.1
        bonus = self._calculate_completeness_bonus() + self._calculate_syntax_bonus()
        raw_score = 1.0 - error_penalty - warning_penalty + bonus
        self.quality_score = min(1.0, max(0.0, raw_score))

    def _find_rule_value(self, key: str, default: Any) -> Any:
        """Return the value stored for *key* by any rule, or *default*.

        Rule outputs are stored as ``validation_results[rule_name] = {...}``,
        so the value is normally nested one level down. A non-dict value
        stored directly under *key* is honoured first.
        """
        direct = self.validation_results.get(key)
        if direct is not None and not isinstance(direct, dict):
            return direct
        for rule_output in self.validation_results.values():
            if isinstance(rule_output, dict) and key in rule_output:
                return rule_output[key]
        return default

    def _calculate_completeness_bonus(self) -> float:
        """Return the completeness bonus (at most 0.5)."""
        bonus = 0.0

        # Reward each required clause that was found.
        found_clauses = self._find_rule_value('found_clauses', [])
        for clause in ('requires', 'ensures'):
            if clause in found_clauses:
                bonus += 0.1

        # Reward the fraction of parameters mentioned in the specification.
        params = self.function_info.get('parameters', [])
        if params:
            covered = self._find_rule_value('parameter_coverage', 0)
            bonus += (covered / len(params)) * 0.2

        return min(bonus, 0.5)

    def _calculate_syntax_bonus(self) -> float:
        """Return the syntax bonus (at most 0.3)."""
        bonus = 0.0
        if self._find_rule_value('syntax_valid', False):
            bonus += 0.2
        bonus += self._find_rule_value('format_score', 0) * 0.1
        return min(bonus, 0.3)

    def to_dict(self) -> Dict[str, Any]:
        """Return the result as a plain dictionary."""
        return {
            'spec': self.spec,
            'is_valid': self.is_valid,
            'errors': self.errors,
            'warnings': self.warnings,
            'suggestions': self.suggestions,
            'quality_score': self.quality_score,
            'validation_results': self.validation_results,
            'metrics': self.metrics,
        }


@dataclass
class ValidationRule:
    """A named validation rule.

    ``check_function`` is called as ``check_function(spec, function_info)``
    and returns either a dict of measurements, a flat list of error
    messages, or a tuple of up to three lists
    ``(errors, warnings, suggestions)``.
    """

    name: str
    description: str
    level: ValidationLevel
    check_function: Callable[..., Any]
    enabled: bool = True


class CBMCSpecificationValidator:
    """Applies a configurable set of rules to CBMC specifications."""

    # Clause keywords accepted at the start of a specification line.
    _KNOWN_CLAUSES = (
        '\\requires', '\\ensures', '\\assigns', '\\valid',
        '\\forall', '\\exists', '\\invariant', '\\axiom',
        '\\behavior', '\\complete', '\\disjoint',
    )
    _QUANTIFIERS = ('\\forall', '\\exists')
    # Two-character operators come first so "<=" is not also counted as "<".
    _OPERATOR_RE = re.compile(r'&&|\|\||==|!=|<=|>=|<|>')
    _CLAUSE_RE = re.compile(r'\\([a-zA-Z_]+)')
    _NUMBER_RE = re.compile(r'\b\d+\b')

    def __init__(self):
        self.rules: List[ValidationRule] = []
        self.logger = logger
        self._initialize_rules()

    def _initialize_rules(self):
        """Register the built-in rules, grouped by validation level."""
        rule_table = [
            # Syntax rules
            ("cbmc_syntax", "CBMC语法正确性", ValidationLevel.SYNTAX,
             self._check_cbmc_syntax),
            ("clause_format", "子句格式正确性", ValidationLevel.SYNTAX,
             self._check_clause_format),
            ("escape_sequences", "转义序列正确性", ValidationLevel.SYNTAX,
             self._check_escape_sequences),
            # Logic rules
            ("logic_consistency", "逻辑一致性", ValidationLevel.LOGIC,
             self._check_logic_consistency),
            ("parameter_usage", "参数使用一致性", ValidationLevel.LOGIC,
             self._check_parameter_usage),
            ("return_value_consistency", "返回值一致性", ValidationLevel.LOGIC,
             self._check_return_value_consistency),
            # Completeness rules
            ("required_clauses", "必需子句存在性", ValidationLevel.COMPLETENESS,
             self._check_required_clauses),
            ("parameter_coverage", "参数覆盖度", ValidationLevel.COMPLETENESS,
             self._check_parameter_coverage),
            ("memory_safety_coverage", "内存安全覆盖度", ValidationLevel.COMPLETENESS,
             self._check_memory_safety_coverage),
            # Quality rules
            ("specification_clarity", "规范清晰度", ValidationLevel.QUALITY,
             self._check_specification_clarity),
            ("domain_specific_rules", "领域特定规则", ValidationLevel.QUALITY,
             self._check_domain_specific_rules),
            ("best_practices", "最佳实践遵循度", ValidationLevel.QUALITY,
             self._check_best_practices),
        ]
        self.rules.extend(
            ValidationRule(name=name, description=description, level=level,
                           check_function=check)
            for name, description, level, check in rule_table
        )

    def validate_specification(self, spec: str, function_info: Dict[str, Any]) -> ValidationResult:
        """Run every enabled rule against *spec* and return the result.

        An empty or whitespace-only specification yields a single error. A
        rule that raises is logged and reported as a warning; it does not
        abort the remaining rules.
        """
        result = ValidationResult(spec, function_info)

        if not spec or not spec.strip():
            result.add_error("Specification is empty")
            return result

        try:
            for rule in self.rules:
                if not rule.enabled:
                    continue
                try:
                    outcome = rule.check_function(spec, function_info)
                    if isinstance(outcome, dict):
                        result.validation_results[rule.name] = outcome
                    elif isinstance(outcome, (list, tuple)):
                        errors, warnings, suggestions = self._process_rule_result(outcome)
                        # Go through add_error so is_valid reflects rule errors.
                        for message in errors:
                            result.add_error(message)
                        result.warnings.extend(warnings)
                        result.suggestions.extend(suggestions)
                except Exception as e:
                    self.logger.error(f"Error applying rule {rule.name}: {e}")
                    result.add_warning(f"Validation rule {rule.name} failed: {str(e)}")

            result.calculate_quality_score()
            result.metrics = self._calculate_metrics(result)

            self.logger.info(f"Validation completed for {function_info.get('name', 'unknown')}: "
                             f"quality={result.quality_score:.2f}, errors={len(result.errors)}")
        except Exception as e:
            result.add_error(f"Validation failed: {str(e)}")
            self.logger.error(f"Validation error: {e}")

        return result

    def _process_rule_result(self, rule_result) -> Tuple[List[str], List[str], List[str]]:
        """Normalise a rule's list/tuple output to (errors, warnings, suggestions).

        A sequence made only of strings is a flat list of error messages.
        Otherwise the first three elements are taken as the error, warning
        and suggestion groups; missing groups default to empty lists.
        """
        items = list(rule_result)
        if all(isinstance(item, str) for item in items):
            return items, [], []

        def as_messages(group) -> List[str]:
            if group is None:
                return []
            if isinstance(group, str):
                return [group]
            return list(group)

        groups = [as_messages(group) for group in items[:3]]
        while len(groups) < 3:
            groups.append([])
        return groups[0], groups[1], groups[2]

    def _calculate_metrics(self, result: ValidationResult) -> Dict[str, Any]:
        """Compute size, structure and clause-presence metrics for a result."""
        spec_lines = result.spec.splitlines()
        spec_text = result.spec.lower()
        return {
            'spec_length': len(result.spec),
            'line_count': len(spec_lines),
            'clause_count': sum(1 for line in spec_lines if line.strip().startswith('\\')),
            'complexity_score': self._calculate_complexity_score(result.spec),
            'readability_score': self._calculate_readability_score(result.spec),
            'has_requires': '\\requires' in spec_text,
            'has_ensures': '\\ensures' in spec_text,
            'has_assigns': '\\assigns' in spec_text,
            'has_valid': '\\valid' in spec_text,
        }

    def _calculate_complexity_score(self, spec: str) -> float:
        """Return a complexity score in [0.0, 1.0].

        Each logical/comparison operator adds 0.1 and each quantifier 0.2.
        """
        operator_count = len(self._OPERATOR_RE.findall(spec))
        quantifier_count = sum(spec.count(q) for q in self._QUANTIFIERS)
        complexity = (operator_count * 0.1) + (quantifier_count * 0.2)
        return min(complexity, 1.0)

    def _calculate_readability_score(self, spec: str) -> float:
        """Return the fraction of non-blank lines that start with a clause."""
        non_blank = [line.strip() for line in spec.splitlines() if line.strip()]
        if not non_blank:
            return 0.0
        clause_lines = sum(1 for line in non_blank if line.startswith('\\'))
        return clause_lines / len(non_blank)

    @staticmethod
    def _parameter_names(function_info: Dict[str, Any]) -> List[str]:
        """Return the names of all parameters that declare a 'name' key."""
        params = function_info.get('parameters', [])
        return [p['name'] for p in params if 'name' in p]

    @staticmethod
    def _mentions_identifier(spec: str, identifier: str) -> bool:
        """Return True if *identifier* occurs in *spec* as a whole word."""
        pattern = rf'(?<!\w){re.escape(identifier)}(?!\w)'
        return re.search(pattern, spec) is not None

    # Syntax checks

    def _check_cbmc_syntax(self, spec: str, function_info: Dict[str, Any]) -> Dict[str, Any]:
        """Check line-level clause syntax.

        Returns a dict with ``syntax_valid``, ``found_clauses`` (requires /
        ensures / assigns, in order of appearance), ``format_score`` and the
        per-line ``errors`` / ``warnings`` that were collected.
        """
        # No check currently produces a hard syntax error.
        errors: List[str] = []
        warnings: List[str] = []
        found_clauses: List[str] = []
        lines = spec.splitlines()

        for line_no, raw_line in enumerate(lines, start=1):
            line = raw_line.strip()
            if not line:
                continue
            if not line.startswith('\\'):
                warnings.append(f"Line {line_no}: Line should start with CBMC clause (\\)")
                continue
            if not line.startswith(self._KNOWN_CLAUSES):
                warnings.append(f"Line {line_no}: Unknown CBMC clause type")
                continue
            for clause in ('requires', 'ensures', 'assigns'):
                if line.startswith('\\' + clause):
                    found_clauses.append(clause)
                    break

        return {
            'syntax_valid': not errors,
            'found_clauses': found_clauses,
            'format_score': 1.0 - (len(warnings) / max(len(lines), 1)),
            'errors': errors,
            'warnings': warnings,
        }

    def _check_clause_format(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
        """Return errors for clause lines not of the form ``\\name``."""
        errors = []
        for line_no, raw_line in enumerate(spec.splitlines(), start=1):
            line = raw_line.strip()
            if not line.startswith('\\'):
                continue
            if not self._CLAUSE_RE.match(line):
                errors.append(f"Line {line_no}: Invalid clause format")
        return errors

    def _check_escape_sequences(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
        """Return an error if the specification contains a doubled backslash."""
        if '\\\\' in spec:
            return ["Invalid double backslash escape sequence"]
        return []

    # Logic checks

    def _check_logic_consistency(self, spec: str, function_info: Dict[str, Any]) -> Tuple[List[str], List[str], List[str]]:
        """Detect trivially true / trivially false conditions.

        Returns ``(errors, warnings, suggestions)``.
        """
        errors = []
        warnings = []
        if 'a == a' in spec:
            warnings.append("Tautology detected: a == a")
        if 'a != a' in spec:
            errors.append("Contradiction detected: a != a")
        return errors, warnings, []

    def _check_parameter_usage(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
        """Suggest mentioning every parameter the specification leaves out.

        Returns ``(errors, warnings, suggestions)``.
        """
        warnings = []
        suggestions = [
            f"Parameter '{name}' not used in specification"
            for name in self._parameter_names(function_info)
            if not self._mentions_identifier(spec, name)
        ]
        return [], warnings, suggestions

    def _check_return_value_consistency(self, spec: str, function_info: Dict[str, Any]) -> Tuple[List[str], List[str], List[str]]:
        """Check that the clauses agree with the function's return type.

        Returns ``(errors, warnings, suggestions)``.
        """
        errors = []
        warnings = []
        return_type = function_info.get('return_type', 'void')

        if return_type != 'void' and '\\ensures' not in spec:
            warnings.append("Non-void function should have ensures clause")
        if return_type == 'void' and '\\return' in spec:
            errors.append("Void function should not have return value specification")

        return errors, warnings, []

    # Completeness checks

    def _check_required_clauses(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
        """Suggest adding any missing ``requires`` / ``ensures`` clause.

        Returns ``(errors, warnings, suggestions)``.
        """
        warnings = []
        suggestions = []
        missing = [c for c in ('requires', 'ensures') if f'\\{c}' not in spec]
        if missing:
            suggestions.append(f"Consider adding missing clauses: {', '.join(missing)}")
        return [], warnings, suggestions

    def _check_parameter_coverage(self, spec: str, function_info: Dict[str, Any]) -> Dict[str, Any]:
        """Measure how many named parameters the specification mentions."""
        names = self._parameter_names(function_info)
        covered = [name for name in names if self._mentions_identifier(spec, name)]
        total = len(names)
        return {
            'covered_parameters': covered,
            'total_parameters': total,
            'parameter_coverage': len(covered),
            'coverage_percentage': (len(covered) / total * 100) if total > 0 else 0,
        }

    def _check_memory_safety_coverage(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
        """Suggest validity / bounds checks for pointer and array parameters.

        Returns ``(errors, warnings, suggestions)``.
        """
        suggestions = []
        params = function_info.get('parameters', [])

        if any(p.get('is_pointer', False) for p in params) and '\\valid' not in spec:
            suggestions.append("Consider adding validity checks for pointer parameters")
        if any(p.get('is_array', False) for p in params) and 'bounds' not in spec.lower():
            suggestions.append("Consider adding array bounds checks")

        return [], [], suggestions

    # Quality checks

    def _check_specification_clarity(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
        """Suggest improvements based on length and presence of comments.

        Returns ``(errors, warnings, suggestions)``.
        """
        suggestions = []
        if len(spec) < 50:
            suggestions.append("Specification is very short, consider adding more details")
        if len(spec) > 1000:
            suggestions.append("Specification is very long, consider simplifying")
        if '/*' not in spec and '//' not in spec:
            suggestions.append("Consider adding comments to explain complex conditions")
        return [], [], suggestions

    def _check_domain_specific_rules(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
        """Apply FreeRTOS / embedded-specific heuristics.

        Returns ``(errors, warnings, suggestions)``.
        """
        suggestions = []
        spec_lower = spec.lower()

        if 'task' in function_info.get('name', '').lower() and 'freertos' not in spec_lower:
            suggestions.append("Consider adding FreeRTOS-specific safety constraints")

        # Emit the suggestion once, however many pointer parameters exist.
        params = function_info.get('parameters', [])
        has_pointer_type = any(p.get('type', '').endswith('*') for p in params)
        if has_pointer_type and 'null' not in spec_lower:
            suggestions.append("Consider adding null pointer checks")

        return [], [], suggestions

    def _check_best_practices(self, spec: str, function_info: Dict[str, Any]) -> List[str]:
        """Flag naming, magic-number and condition-complexity smells.

        Returns ``(errors, warnings, suggestions)``.
        """
        suggestions = []
        if 'temp' in spec.lower():
            suggestions.append("Avoid using 'temp' variable names in specifications")
        if self._NUMBER_RE.search(spec):
            suggestions.append("Consider using named constants instead of hardcoded values")
        if spec.count('&&') > 3:
            suggestions.append("Consider breaking down complex conditions into simpler ones")
        return [], [], suggestions

    def validate_batch(self, specs: List[Tuple[str, Dict[str, Any]]]) -> List[ValidationResult]:
        """Validate each ``(spec, function_info)`` pair, preserving order."""
        return [self.validate_specification(spec, info) for spec, info in specs]

    def generate_validation_report(self, results: List[ValidationResult]) -> Dict[str, Any]:
        """Aggregate results into a summary report.

        Returns ``{'error': 'No validation results'}`` when *results* is
        empty; otherwise a dict with summary counts, quality distribution,
        the ten most common errors, per-function statistics and the detailed
        per-result dictionaries.
        """
        if not results:
            return {'error': 'No validation results'}

        total_specs = len(results)
        valid_specs = sum(1 for r in results if r.is_valid)

        scores = [r.quality_score for r in results]
        quality_distribution = {
            'excellent': sum(1 for s in scores if s >= 0.9),
            'good': sum(1 for s in scores if 0.7 <= s < 0.9),
            'fair': sum(1 for s in scores if 0.5 <= s < 0.7),
            'poor': sum(1 for s in scores if s < 0.5),
        }

        error_counts = Counter(error for r in results for error in r.errors)

        by_function: Dict[str, List[ValidationResult]] = defaultdict(list)
        for r in results:
            by_function[r.function_info.get('name', 'unknown')].append(r)

        return {
            'summary': {
                'total_specifications': total_specs,
                'valid_specifications': valid_specs,
                'invalid_specifications': total_specs - valid_specs,
                'success_rate': valid_specs / total_specs if total_specs > 0 else 0,
                'average_quality': sum(scores) / len(scores) if scores else 0.0,
                'total_errors': sum(len(r.errors) for r in results),
                'total_warnings': sum(len(r.warnings) for r in results),
                'total_suggestions': sum(len(r.suggestions) for r in results),
            },
            'quality_distribution': quality_distribution,
            'common_errors': dict(error_counts.most_common(10)),
            'function_results': {
                name: {
                    'count': len(group),
                    'avg_quality': sum(r.quality_score for r in group) / len(group),
                    'valid_count': sum(1 for r in group if r.is_valid),
                }
                for name, group in by_function.items()
            },
            'detailed_results': [r.to_dict() for r in results],
        }

    def save_validation_report(self, report: Dict[str, Any], output_path: str):
        """Write *report* to *output_path* as UTF-8 JSON.

        Best effort: failures are logged, not raised.
        """
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(report, f, indent=2, ensure_ascii=False)
            self.logger.info(f"Validation report saved to: {output_path}")
        except Exception as e:
            self.logger.error(f"Failed to save validation report: {e}")

    def _set_rule_enabled(self, rule_name: str, enabled: bool):
        """Set the enabled flag on the first rule named *rule_name*."""
        for rule in self.rules:
            if rule.name == rule_name:
                rule.enabled = enabled
                state = "Enabled" if enabled else "Disabled"
                self.logger.info(f"{state} rule: {rule_name}")
                return
        self.logger.warning(f"Rule not found: {rule_name}")

    def enable_rule(self, rule_name: str):
        """Enable the rule called *rule_name*; log a warning if unknown."""
        self._set_rule_enabled(rule_name, True)

    def disable_rule(self, rule_name: str):
        """Disable the rule called *rule_name*; log a warning if unknown."""
        self._set_rule_enabled(rule_name, False)

    def get_rules_info(self) -> List[Dict[str, Any]]:
        """Return name, description, level and enabled flag of every rule."""
        return [
            {
                'name': rule.name,
                'description': rule.description,
                'level': rule.level.value,
                'enabled': rule.enabled,
            }
            for rule in self.rules
        ]


# Convenience functions

def validate_cbmc_specification(spec: str, function_info: Dict[str, Any]) -> ValidationResult:
    """Validate a single specification with a default validator."""
    validator = CBMCSpecificationValidator()
    return validator.validate_specification(spec, function_info)


def validate_cbmc_specifications_batch(specs: List[Tuple[str, Dict[str, Any]]]) -> List[ValidationResult]:
    """Validate several specifications with a default validator."""
    validator = CBMCSpecificationValidator()
    return validator.validate_batch(specs)


def generate_validation_summary(results: List[ValidationResult]) -> Dict[str, Any]:
    """Build a validation report from existing results."""
    validator = CBMCSpecificationValidator()
    return validator.generate_validation_report(results)