#!/usr/bin/env python3 """ 高级DeepSeek v3.1代码规约化测试 测试更复杂的函数和验证场景 """ import os import requests import json import time class AdvancedDeepSeekTest: """高级DeepSeek测试类""" def __init__(self, api_key=None, base_url=None): self.api_key = api_key or os.getenv('SILICONFLOW_API_KEY') self.base_url = base_url or 'https://api.siliconflow.cn/v1' self.model = 'deepseek-ai/DeepSeek-V3.1' if not self.api_key: raise ValueError("需要设置 SILICONFLOW_API_KEY 环境变量") def generate_specification(self, function_info, verification_goals): """生成形式化规约""" prompt = self._build_prompt(function_info, verification_goals) messages = [ { "role": "system", "content": """你是一个专业的形式化验证专家,专门为C/C++代码生成CBMC格式的形式化规约。 请生成准确、完整的CBMC规约,严格遵循CBMC语法规范。 生成的规约应该: 1. 使用正确的CBMC语法(\\requires, \\ensures, \\assigns等) 2. 包含适当的前置条件和后置条件 3. 处理边界条件和错误情况 4. 只输出CBMC规约代码,不要包含其他解释""" }, { "role": "user", "content": prompt } ] data = { "model": self.model, "messages": messages, "temperature": 0.2, # 更低温度,更精确的输出 "max_tokens": 2048, "stream": False } headers = { "Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json" } start_time = time.time() try: response = requests.post( f"{self.base_url}/chat/completions", headers=headers, json=data, timeout=120 ) if response.status_code == 200: result = response.json() content = result['choices'][0]['message']['content'] tokens_used = result.get('usage', {}).get('total_tokens', 0) # 后处理:提取CBMC规约 specification = self._extract_cbmc_spec(content) generation_time = time.time() - start_time return { 'specification': specification, 'raw_content': content, 'generation_time': generation_time, 'tokens_used': tokens_used, 'success': True } else: return { 'success': False, 'error': f"API错误: {response.status_code} - {response.text}" } except Exception as e: return { 'success': False, 'error': f"请求失败: {str(e)}" } def _build_prompt(self, function_info, verification_goals): """构建提示词""" func_name = function_info.get('name', 'unknown') return_type = function_info.get('return_type', 'void') parameters = function_info.get('parameters', []) source_code = function_info.get('source_code', '') includes = function_info.get('includes', []) description = function_info.get('description', '') # 构建函数签名 param_str = ', '.join([f"{p['type']} {p['name']}" for p in parameters]) function_signature = f"{return_type} {func_name}({param_str})" # 构建验证目标描述 goal_descriptions = { 'functional_correctness': '功能正确性验证', 'memory_safety': '内存安全性验证', 'pointer_validity': '指针有效性验证', 'integer_overflow': '整数溢出检查', 'array_bounds': '数组边界检查', 'error_handling': '错误处理验证', 'concurrency': '并发安全性', 'data_race': '数据竞争检测', 'deadlock': '死锁检测', 'resource_leak': '资源泄漏检测' } goals_text = [] for goal in verification_goals: description = goal_descriptions.get(goal, goal) goals_text.append(f"- {description}") includes_text = '\\n'.join(includes) if includes else '' prompt = f"""请为以下C函数生成完整的CBMC形式化验证规约: {includes_text} 函数描述:{description} 函数签名:{function_signature} 源代码: ```c {source_code} ``` 验证目标: {chr(10).join(goals_text)} 请生成完整的CBMC规约,包含: 1. \\requires 子句:前置条件(参数有效性、边界条件等) 2. \\ensures 子句:后置条件(返回值、状态变化等) 3. \\assigns 子句:赋值说明(修改的变量和内存位置) 4. 必要的辅助函数和断言 确保规约的完整性和准确性,能够检测所有指定的验证目标。 """ return prompt def _extract_cbmc_spec(self, content): """从生成内容中提取CBMC规约""" lines = content.split('\n') cbmc_lines = [] # 移除markdown代码块标记 in_code_block = False for line in lines: line = line.strip() # 处理markdown代码块 if line.startswith('```'): in_code_block = not in_code_block continue if in_code_block or not line.startswith('```'): # 保留CBMC相关行和包含文件 if (line.startswith('#include') or line.startswith('/*') or line.startswith('*/') or line.startswith('@') or line.startswith(r'\requires') or line.startswith(r'\ensures') or line.startswith(r'\assigns') or line.startswith(r'\valid') or line.startswith(r'\forall') or line.startswith(r'\exists') or line.startswith(r'\invariant') or line.startswith(r'\behavior') or line.startswith('int') or line.startswith('bool') or line.startswith('size_t') or '=' in line or line.endswith(';') or line.strip()): cbmc_lines.append(line) return '\n'.join(cbmc_lines) if cbmc_lines else content.strip() def create_advanced_test_functions(): """创建高级测试函数""" return [ { 'name': 'array_copy_safe', 'return_type': 'bool', 'parameters': [ {'name': 'dest', 'type': 'int*'}, {'name': 'src', 'type': 'const int*'}, {'name': 'dest_size', 'type': 'size_t'}, {'name': 'src_size', 'type': 'size_t'}, {'name': 'copied', 'type': 'size_t*'} ], 'includes': ['#include ', '#include '], 'source_code': '''bool array_copy_safe(int *dest, const int *src, size_t dest_size, size_t src_size, size_t *copied) { if (dest == NULL || src == NULL || copied == NULL) { return false; } if (dest_size == 0 || src_size == 0) { *copied = 0; return true; } size_t copy_size = dest_size < src_size ? dest_size : src_size; for (size_t i = 0; i < copy_size; i++) { dest[i] = src[i]; } *copied = copy_size; return true; }''', 'description': '安全的数组拷贝函数,检查所有边界条件', 'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling'] }, { 'name': 'circular_buffer_push', 'return_type': 'bool', 'parameters': [ {'name': 'buffer', 'type': 'int*'}, {'name': 'capacity', 'type': 'size_t'}, {'name': 'head', 'type': 'size_t*'}, {'name': 'tail', 'type': 'size_t*'}, {'name': 'value', 'type': 'int'} ], 'includes': ['#include ', '#include '], 'source_code': '''bool circular_buffer_push(int *buffer, size_t capacity, size_t *head, size_t *tail, int value) { if (buffer == NULL || head == NULL || tail == NULL || capacity == 0) { return false; } size_t next_tail = (*tail + 1) % capacity; if (next_tail == *head) { return false; // 缓冲区已满 } buffer[*tail] = value; *tail = next_tail; return true; }''', 'description': '环形缓冲区推送操作', 'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling', 'concurrency'] }, { 'name': 'thread_safe_counter', 'return_type': 'int', 'parameters': [ {'name': 'counter', 'type': 'volatile int*'}, {'name': 'increment', 'type': 'int'} ], 'includes': ['#include '], 'source_code': '''int thread_safe_counter(volatile int *counter, int increment) { if (counter == NULL) { return 0; } int old_value, new_value; do { old_value = *counter; new_value = old_value + increment; } while (!__atomic_compare_exchange_n(counter, &old_value, new_value, false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE)); return new_value; }''', 'description': '线程安全的计数器操作', 'verification_goals': ['functional_correctness', 'concurrency', 'data_race', 'memory_safety'] } ] def evaluate_specification_quality(spec, verification_goals): """评估规约质量""" score = 0.0 max_score = len(verification_goals) * 2.0 # 检查基本CBMC语法 if r'\requires' in spec: score += 0.5 if r'\ensures' in spec: score += 0.5 if r'\assigns' in spec: score += 0.3 # 检查验证目标覆盖 if 'memory_safety' in verification_goals and r'\valid' in spec: score += 0.5 if 'array_bounds' in verification_goals and ('size_t' in spec or 'bound' in spec.lower()): score += 0.5 if 'error_handling' in verification_goals and ('NULL' in spec or 'false' in spec): score += 0.5 if 'concurrency' in verification_goals and ('atomic' in spec.lower() or 'volatile' in spec): score += 0.5 # 检查完整性 if '&&' in spec or '||' in spec: score += 0.3 # 逻辑条件 if '\\result' in spec: score += 0.3 # 返回值检查 return min(score / max_score, 1.0) def main(): """主测试函数""" print("🚀 DeepSeek v3.1 高级代码规约化测试") print("=" * 60) try: # 创建测试实例 tester = AdvancedDeepSeekTest() print(f"📡 连接模型: {tester.model}") # 测试函数 test_functions = create_advanced_test_functions() results = [] for i, func in enumerate(test_functions, 1): print(f"\n{'='*25} 高级测试 {i} {'='*25}") print(f"📝 函数: {func['name']}") print(f"📖 描述: {func['description']}") print(f"🎯 目标: {', '.join(func['verification_goals'])}") print("\n⏳ 正在生成规约...") result = tester.generate_specification(func, func['verification_goals']) if result['success']: print(f"✅ 生成成功!") print(f"⏱️ 时间: {result['generation_time']:.2f}s") print(f"🔤 Token: {result['tokens_used']}") # 评估质量 quality_score = evaluate_specification_quality( result['specification'], func['verification_goals'] ) print(f"📊 质量评分: {quality_score:.2f}") print(f"\n📋 生成的CBMC规约:") print("-" * 50) print(result['specification']) print("-" * 50) results.append({ 'function': func['name'], 'quality': quality_score, 'time': result['generation_time'], 'tokens': result['tokens_used'] }) else: print(f"❌ 生成失败: {result['error']}") # 总结报告 print(f"\n{'='*60}") print("📊 测试总结报告") print("="*60) if results: avg_quality = sum(r['quality'] for r in results) / len(results) avg_time = sum(r['time'] for r in results) / len(results) total_tokens = sum(r['tokens'] for r in results) print(f"📈 总体统计:") print(f" • 测试函数数: {len(results)}") print(f" • 平均质量评分: {avg_quality:.2f}") print(f" • 平均生成时间: {avg_time:.2f}s") print(f" • 总Token消耗: {total_tokens}") print(f"\n🏆 详细结果:") for result in results: quality_emoji = "🥇" if result['quality'] >= 0.9 else "🥈" if result['quality'] >= 0.7 else "🥉" print(f" {quality_emoji} {result['function']}: 质量 {result['quality']:.2f}, 时间 {result['time']:.2f}s") print(f"\n🎉 高级测试完成!") # 使用建议 print(f"\n💡 使用建议:") print("1. 复杂函数建议降低temperature参数获得更精确结果") print("2. 并发相关函数需要更详细的规约描述") print("3. 建议结合实际CBMC工具验证生成的规约") print("4. 可以根据具体需求调整提示词模板") except Exception as e: print(f"❌ 测试失败: {str(e)}") import traceback traceback.print_exc() if __name__ == "__main__": main()