#!/usr/bin/env python3
"""Comprehensive DeepSeek v3.1 code-specification test.

Runs a small suite of C functions, ordered from simple to complex, through
the DeepSeek chat-completions endpoint, asks the model for a formal
specification of each function, prints the outcome and writes a JSON report.
"""

import json
import os
import time
from datetime import datetime

try:
    import requests
except ImportError:
    # The prompt/parsing/report helpers work offline; only the API call needs
    # the HTTP client, and it reports the missing dependency itself.
    requests = None


# C snippets used by the default suite.
_BASIC_ADD_SRC = """\
int basic_add(int a, int b) {
    return a + b;
}"""

_BASIC_MULTIPLY_SRC = """\
int basic_multiply(int a, int b) {
    return a * b;
}"""

_MEMORY_COPY_SRC = """\
void memory_copy(void* dest, const void* src, size_t n) {
    if (dest == NULL || src == NULL || n == 0) return;
    unsigned char* d = (unsigned char*)dest;
    const unsigned char* s = (const unsigned char*)src;
    for (size_t i = 0; i < n; i++) {
        d[i] = s[i];
    }
}"""

_SAFE_ARRAY_ACCESS_SRC = """\
bool safe_array_access(int* array, size_t size, size_t index, int* result) {
    if (array == NULL || result == NULL) return false;
    if (index >= size) return false;
    *result = array[index];
    return true;
}"""

_BINARY_SEARCH_SRC = """\
int binary_search(const int* array, size_t size, int target) {
    if (array == NULL || size == 0) return -1;
    size_t left = 0;
    size_t right = size - 1;
    while (left <= right) {
        size_t mid = left + (right - left) / 2;
        if (array[mid] == target) {
            return (int)mid;
        } else if (array[mid] < target) {
            left = mid + 1;
        } else {
            if (mid == 0) break;
            right = mid - 1;
        }
    }
    return -1;
}"""

# Default suite: categories of functions, each with a complexity level that
# selects the system prompt, temperature and token budget.
DEFAULT_TEST_SUITE = [
    {
        'category': '基础函数',
        'functions': [
            {
                'name': 'basic_add',
                'complexity': 'low',
                'verification_goals': ['functional_correctness'],
                'description': '基本加法函数',
                'source_code': _BASIC_ADD_SRC,
            },
            {
                'name': 'basic_multiply',
                'complexity': 'low',
                'verification_goals': ['functional_correctness', 'integer_overflow'],
                'description': '基本乘法函数',
                'source_code': _BASIC_MULTIPLY_SRC,
            },
        ],
    },
    {
        'category': '内存操作',
        'functions': [
            {
                'name': 'memory_copy',
                'complexity': 'medium',
                'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds'],
                'description': '内存拷贝函数',
                'source_code': _MEMORY_COPY_SRC,
            },
        ],
    },
    {
        'category': '错误处理',
        'functions': [
            {
                'name': 'safe_array_access',
                'complexity': 'medium',
                'verification_goals': ['functional_correctness', 'memory_safety', 'error_handling', 'array_bounds'],
                'description': '安全数组访问函数',
                'source_code': _SAFE_ARRAY_ACCESS_SRC,
            },
        ],
    },
    {
        'category': '复杂算法',
        'functions': [
            {
                'name': 'binary_search',
                'complexity': 'high',
                'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling'],
                'description': '二分查找算法',
                'source_code': _BINARY_SEARCH_SRC,
            },
        ],
    },
]


class ComprehensiveDeepSeekTest:
    """Drive the DeepSeek specification-generation test suite.

    Successful generations are collected in ``self.results`` and summarised
    by :meth:`generate_report`.
    """

    # Default location of the JSON report written by save_detailed_report().
    REPORT_PATH = 'deepseek_test_report.json'

    # A response line is kept as part of the specification when it starts
    # with one of these markers. ``__CPROVER_`` covers CBMC's own contract
    # clauses in addition to the backslash-style clauses the prompt shows.
    _SPEC_LINE_PREFIXES = (
        r'\requires', r'\ensures', r'\assigns',
        r'\valid', r'\forall', r'\exists',
        '__CPROVER_',
        '/*', '*/', '@', '#include',
    )

    _SYSTEM_PROMPTS = {
        'high': (
            "你是一位资深的CBMC形式化验证专家,精通复杂的C/C++程序验证。\n"
            "请生成精确、完整、可执行的CBMC规约,确保能够检测所有潜在的缺陷。\n"
            "严格使用CBMC语法,包含所有必要的验证条件。"
        ),
        'medium': (
            "你是CBMC形式化验证专家,擅长为C/C++代码生成形式化规约。\n"
            "请生成准确的CBMC规约,覆盖主要的验证目标。"
        ),
    }
    # Used for 'low' and for any unrecognised complexity value.
    _DEFAULT_SYSTEM_PROMPT = (
        "你是CBMC形式化验证助手,为简单的C函数生成基础规约。\n"
        "生成简洁的CBMC规约,确保基本正确性。"
    )

    def __init__(self, api_key=None):
        """Store the API configuration.

        Args:
            api_key: API key; falls back to the ``SILICONFLOW_API_KEY``
                environment variable when omitted.

        Raises:
            ValueError: If no key is given and the variable is unset/empty.
        """
        self.api_key = api_key or os.getenv('SILICONFLOW_API_KEY')
        self.base_url = 'https://api.siliconflow.cn/v1'
        self.model = 'deepseek-ai/DeepSeek-V3.1'
        self.results = []

        if not self.api_key:
            raise ValueError("需要设置 SILICONFLOW_API_KEY 环境变量")

    def generate_specification(self, function_info, verification_goals, complexity='medium'):
        """Ask the model for a formal specification of one function.

        Args:
            function_info: Dict with ``name``, ``return_type``,
                ``parameters``, ``source_code`` and ``description``.
            verification_goals: List of goal names included in the prompt.
            complexity: ``'high'``, ``'medium'`` or anything else (treated as
                low); selects prompt, temperature and ``max_tokens``.

        Returns:
            On success a dict with ``success=True``, ``specification``,
            ``raw_content``, ``generation_time`` and ``tokens_used``;
            otherwise ``{'success': False, 'error': <message>}``.
        """
        if requests is None:
            return {'success': False, 'error': "请求失败: 未安装 requests 库"}

        messages = [
            {"role": "system", "content": self._get_system_prompt(complexity)},
            {"role": "user", "content": self._build_prompt(function_info, verification_goals, complexity)},
        ]
        data = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.1 if complexity in ('high', 'medium') else 0.3,
            "max_tokens": 2048 if complexity == 'high' else 1024,
            "stream": False,
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        start_time = time.time()
        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=data,
                timeout=90,
            )
            if response.status_code != 200:
                return {'success': False, 'error': f"API错误: {response.status_code}"}

            result = response.json()
            content = result['choices'][0]['message']['content']
            tokens_used = result.get('usage', {}).get('total_tokens', 0)
            specification = self._extract_cbmc_spec(content)
            generation_time = time.time() - start_time
        except (requests.RequestException, KeyError, IndexError,
                TypeError, ValueError, AttributeError) as e:
            # Transport failures plus malformed/unexpected response payloads.
            return {'success': False, 'error': f"请求失败: {str(e)}"}

        return {
            'specification': specification,
            'raw_content': content,
            'generation_time': generation_time,
            'tokens_used': tokens_used,
            'success': True,
        }

    def _get_system_prompt(self, complexity):
        """Return the system prompt for the given complexity level."""
        return self._SYSTEM_PROMPTS.get(complexity, self._DEFAULT_SYSTEM_PROMPT)

    def _build_prompt(self, function_info, verification_goals, complexity):
        """Build the user prompt describing the function and the goals.

        Missing ``function_info`` keys fall back to defaults
        (``'unknown'``, ``'void'``, no parameters, empty strings).
        """
        func_name = function_info.get('name', 'unknown')
        return_type = function_info.get('return_type', 'void')
        parameters = function_info.get('parameters', [])
        source_code = function_info.get('source_code', '')
        description = function_info.get('description', '')

        param_str = ', '.join(f"{p['type']} {p['name']}" for p in parameters)
        function_signature = f"{return_type} {func_name}({param_str})"
        goals_text = ', '.join(verification_goals)

        if complexity == 'high':
            lines = [
                f"函数:{description}",
                f"签名:{function_signature}",
                "源代码:",
                "```c",
                source_code,
                "```",
                f"验证目标:{goals_text}",
                "请生成完整的CBMC规约,包括:",
                "- 详细的前置条件(参数有效性、内存安全、边界检查)",
                "- 精确的后置条件(返回值、状态不变式)",
                "- 赋值说明(修改的内存位置)",
                "- 辅助断言(如果需要)",
                "确保规约能够检测所有指定的验证目标。",
            ]
        else:
            lines = [
                f"函数:{func_name}",
                f"签名:{function_signature}",
                "源代码:",
                "```c",
                source_code,
                "```",
                f"验证目标:{goals_text}",
                "生成CBMC规约:",
                r"\requires ...",
                r"\ensures ...",
                r"\assigns ...",
            ]
        return '\n'.join(lines)

    def _extract_cbmc_spec(self, content):
        """Keep only the specification-looking lines of a model response.

        A stripped line is kept when it starts with a known clause/comment
        marker, contains ``=`` or ends with ``;``. If nothing matches, the
        whole stripped response is returned instead.
        """
        stripped_lines = (raw.strip() for raw in content.split('\n'))
        kept = [
            line for line in stripped_lines
            if line.startswith(self._SPEC_LINE_PREFIXES)
            or '=' in line
            or line.endswith(';')
        ]
        return '\n'.join(kept) if kept else content.strip()

    def run_comprehensive_test(self, test_suite=None):
        """Run every function of the suite and then print/save the report.

        Args:
            test_suite: Optional list of category dicts shaped like
                ``DEFAULT_TEST_SUITE``; the default suite is used when omitted.
        """
        suite = DEFAULT_TEST_SUITE if test_suite is None else test_suite
        # Start from a clean slate so repeated runs do not mix statistics.
        self.results = []

        print("🚀 DeepSeek v3.1 综合代码规约化测试")
        print("=" * 60)
        print(f"📅 测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"🤖 模型: {self.model}")
        print("=" * 60)

        total_functions = sum(len(category['functions']) for category in suite)
        successful_tests = 0

        for category in suite:
            print(f"\n📂 {category['category']}")
            print("-" * 40)

            for func in category['functions']:
                print(f"\n🔍 {func['name']} ({func['complexity'].upper()})")
                print(f"📝 {func['description']}")
                print(f"🎯 {', '.join(func['verification_goals'])}")

                function_info = self._build_function_info(func)

                print("\n⏳ 生成规约中...")
                result = self.generate_specification(
                    function_info,
                    func['verification_goals'],
                    func['complexity'],
                )

                if not result['success']:
                    print(f"❌ 失败: {result['error']}")
                    continue

                successful_tests += 1
                print(f"✅ 成功! ⏱️ {result['generation_time']:.2f}s, 🔤 {result['tokens_used']} tokens")
                print("\n📋 CBMC规约:")
                print("-" * 50)
                print(result['specification'])
                print("-" * 50)

                # Only successful generations are recorded.
                self.results.append({
                    'function': func['name'],
                    'category': category['category'],
                    'complexity': func['complexity'],
                    'verification_goals': func['verification_goals'],
                    'specification': result['specification'],
                    'generation_time': result['generation_time'],
                    'tokens_used': result['tokens_used'],
                    'timestamp': datetime.now().isoformat(),
                })

        self.generate_report(successful_tests, total_functions)

    def _build_function_info(self, func):
        """Derive the ``function_info`` dict for one suite entry."""
        source_code = func['source_code']
        return {
            'name': func['name'],
            'return_type': self._extract_return_type(source_code),
            'parameters': self._extract_parameters(source_code),
            'source_code': source_code,
            'description': func['description'],
        }

    def _extract_return_type(self, source_code):
        """Return the C return type written before the function name.

        Everything before the first ``(`` is split on whitespace; the last
        token is the function name and the preceding tokens form the type.
        Leading ``*`` on the name (``char *f(``) are moved onto the type.
        Falls back to ``'void'`` when no type can be found.
        """
        header, paren, _ = source_code.partition('(')
        tokens = header.split()
        if not paren or len(tokens) < 2:
            return 'void'
        *type_tokens, declarator = tokens
        pointer_stars = declarator[:len(declarator) - len(declarator.lstrip('*'))]
        return ' '.join(type_tokens) + pointer_stars

    def _extract_parameters(self, source_code):
        """Extract ``{'type', 'name'}`` dicts from the first parameter list.

        This is a simplified parser: the text between the first ``(`` and the
        next ``(`` or ``)`` is split on commas, and each piece is split at its
        last whitespace. Pieces without whitespace (empty list, ``void``) are
        skipped. Returns ``[]`` when the source has no ``(``.
        """
        _, paren, after = source_code.partition('(')
        if not paren:
            return []
        param_section = after.split('(')[0].split(')')[0]

        params = []
        for raw in param_section.split(','):
            pieces = raw.strip().rsplit(None, 1)
            if len(pieces) == 2:
                type_part, name_part = pieces
                params.append({'type': type_part.strip(), 'name': name_part.strip()})
        return params

    def _group_stats(self, key):
        """Aggregate count, time and tokens of ``self.results`` by ``key``."""
        grouped = {}
        for entry in self.results:
            stats = grouped.setdefault(
                entry[key], {'count': 0, 'total_time': 0, 'total_tokens': 0}
            )
            stats['count'] += 1
            stats['total_time'] += entry['generation_time']
            stats['total_tokens'] += entry['tokens_used']
        return grouped

    def generate_report(self, successful, total):
        """Print summary statistics and save the detailed JSON report.

        Args:
            successful: Number of functions whose generation succeeded.
            total: Number of functions attempted.
        """
        print(f"\n{'='*60}")
        print("📊 综合测试报告")
        print("=" * 60)

        # Overall statistics; both averages guard against empty input.
        success_rate = (successful / total) * 100 if total > 0 else 0
        result_count = len(self.results)
        overall_avg_time = (
            sum(r['generation_time'] for r in self.results) / result_count
            if result_count else 0
        )
        total_tokens = sum(r['tokens_used'] for r in self.results)

        print("📈 测试统计:")
        print(f" • 总函数数: {total}")
        print(f" • 成功测试: {successful}")
        print(f" • 成功率: {success_rate:.1f}%")
        print(f" • 平均时间: {overall_avg_time:.2f}s")
        print(f" • 总Token消耗: {total_tokens}")

        # Per-complexity statistics.
        print("\n🎯 复杂度分析:")
        for complexity, stats in self._group_stats('complexity').items():
            avg_time = stats['total_time'] / stats['count']
            avg_tokens = stats['total_tokens'] / stats['count']
            print(f" • {complexity.upper()}: {stats['count']}个函数, 平均 {avg_time:.1f}s, {avg_tokens:.0f} tokens")

        # Per-category statistics.
        print("\n📂 类别分析:")
        for category, stats in self._group_stats('category').items():
            avg_time = stats['total_time'] / stats['count']
            print(f" • {category}: {stats['count']}个函数, 平均 {avg_time:.1f}s")

        report_path = self.save_detailed_report(total_functions=total)

        print("\n🎉 综合测试完成!")
        print(f"💾 详细报告已保存到: {report_path}")

    def save_detailed_report(self, path=None, total_functions=None):
        """Write the collected results to a UTF-8 JSON file.

        Args:
            path: Output file; defaults to ``REPORT_PATH``.
            total_functions: Number of functions attempted; defaults to the
                number of recorded (successful) results.

        Returns:
            The path the report was written to.
        """
        target = path or self.REPORT_PATH
        recorded = len(self.results)
        report = {
            'test_info': {
                'model': self.model,
                'test_time': datetime.now().isoformat(),
                'total_functions': recorded if total_functions is None else total_functions,
                'successful_functions': recorded,
            },
            'results': self.results,
        }

        with open(target, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        return target


def main():
    """Script entry point: run the suite and print any fatal error."""
    try:
        tester = ComprehensiveDeepSeekTest()
        tester.run_comprehensive_test()
    except Exception as e:
        # Top-level boundary: show the message and the traceback.
        print(f"❌ 测试失败: {str(e)}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    main()