You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
cbmc/codedetect/tests/deepseek/scripts/comprehensive_deepseek_test.py

432 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
DeepSeek v3.1 综合代码规约化测试
展示从简单到复杂的完整测试流程
"""
import os
import requests
import json
import time
from datetime import datetime
class ComprehensiveDeepSeekTest:
    """Drive DeepSeek through a graded suite of CBMC specification requests.

    Each sample C function is sent to the SiliconFlow chat-completions
    endpoint, the specification-looking lines are pulled out of the reply,
    and timing/token statistics are printed and saved as a JSON report.
    """

    # File written by save_detailed_report() and named in the final summary.
    REPORT_FILE = 'deepseek_test_report.json'

    # Line prefixes that _extract_cbmc_spec() keeps as specification text.
    _SPEC_PREFIXES = (
        r'\requires',
        r'\ensures',
        r'\assigns',
        r'\valid',
        r'\forall',
        r'\exists',
        '/*',
        '*/',
        '@',
        '#include',
    )

    def __init__(self, api_key=None):
        """Store the API settings and start with an empty result list.

        Args:
            api_key: API key to send as a bearer token. When falsy, the
                SILICONFLOW_API_KEY environment variable is used instead.

        Raises:
            ValueError: If neither the argument nor the environment
                variable provides a non-empty key.
        """
        self.api_key = api_key or os.getenv('SILICONFLOW_API_KEY')
        self.base_url = 'https://api.siliconflow.cn/v1'
        self.model = 'deepseek-ai/DeepSeek-V3.1'
        self.results = []
        if not self.api_key:
            raise ValueError("需要设置 SILICONFLOW_API_KEY 环境变量")

    def generate_specification(self, function_info, verification_goals, complexity='medium'):
        """Ask the model for a formal specification of one function.

        Args:
            function_info: Dict with the keys read by _build_prompt()
                ('name', 'return_type', 'parameters', 'source_code',
                'description'); missing keys fall back to defaults there.
            verification_goals: List of goal names joined into the prompt.
            complexity: 'high', 'medium', or anything else (treated as low).
                Selects the system prompt, temperature and token budget.

        Returns:
            On success, a dict with 'success' True plus 'specification',
            'raw_content', 'generation_time' (seconds) and 'tokens_used'.
            On failure, a dict with 'success' False and an 'error' message.
            This method never raises; every exception is reported in 'error'.
        """
        payload = {
            "model": self.model,
            "messages": [
                {
                    "role": "system",
                    "content": self._get_system_prompt(complexity)
                },
                {
                    "role": "user",
                    "content": self._build_prompt(function_info, verification_goals, complexity)
                }
            ],
            "temperature": 0.1 if complexity in ('high', 'medium') else 0.3,
            "max_tokens": 2048 if complexity == 'high' else 1024,
            "stream": False
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }
        start_time = time.time()
        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=90
            )
            if response.status_code != 200:
                return {
                    'success': False,
                    'error': f"API错误: {response.status_code}"
                }
            result = response.json()
            content = result['choices'][0]['message']['content']
            tokens_used = result.get('usage', {}).get('total_tokens', 0)
            specification = self._extract_cbmc_spec(content)
            generation_time = time.time() - start_time
            return {
                'specification': specification,
                'raw_content': content,
                'generation_time': generation_time,
                'tokens_used': tokens_used,
                'success': True
            }
        except Exception as e:
            # Deliberately broad: network errors, malformed JSON and an
            # unexpected response shape must all become a failed test
            # entry instead of aborting the remaining suite.
            return {
                'success': False,
                'error': f"请求失败: {str(e)}"
            }

    def _get_system_prompt(self, complexity):
        """Return the system prompt for 'high', 'medium' or any other level."""
        if complexity == 'high':
            return (
                "你是一位资深的CBMC形式化验证专家精通复杂的C/C++程序验证。\n"
                "请生成精确、完整、可执行的CBMC规约确保能够检测所有潜在的缺陷。\n"
                "严格使用CBMC语法包含所有必要的验证条件。"
            )
        if complexity == 'medium':
            return (
                "你是CBMC形式化验证专家擅长为C/C++代码生成形式化规约。\n"
                "请生成准确的CBMC规约覆盖主要的验证目标。"
            )
        return (
            "你是CBMC形式化验证助手为简单的C函数生成基础规约。\n"
            "生成简洁的CBMC规约确保基本正确性。"
        )

    def _build_prompt(self, function_info, verification_goals, complexity):
        """Build the user prompt describing the function to specify.

        Args:
            function_info: Dict; 'name', 'return_type', 'parameters',
                'source_code' and 'description' are read with defaults.
                Each entry of 'parameters' must have 'type' and 'name'.
            verification_goals: Iterable of strings, joined with ', '.
            complexity: 'high' selects the detailed template; every other
                value selects the short template.

        Returns:
            The prompt text as a single string.
        """
        func_name = function_info.get('name', 'unknown')
        return_type = function_info.get('return_type', 'void')
        parameters = function_info.get('parameters', [])
        source_code = function_info.get('source_code', '')
        description = function_info.get('description', '')
        param_str = ', '.join(f"{p['type']} {p['name']}" for p in parameters)
        function_signature = f"{return_type} {func_name}({param_str})"
        goals_text = ', '.join(verification_goals)
        if complexity == 'high':
            prompt_lines = [
                f"函数:{description}",
                f"签名:{function_signature}",
                "源代码:",
                "```c",
                f"{source_code}",
                "```",
                f"验证目标:{goals_text}",
                "请生成完整的CBMC规约包括",
                "- 详细的前置条件(参数有效性、内存安全、边界检查)",
                "- 精确的后置条件(返回值、状态不变式)",
                "- 赋值说明(修改的内存位置)",
                "- 辅助断言(如果需要)",
                "确保规约能够检测所有指定的验证目标。",
            ]
        else:
            prompt_lines = [
                f"函数:{func_name}",
                f"签名:{function_signature}",
                "源代码:",
                "```c",
                f"{source_code}",
                "```",
                f"验证目标:{goals_text}",
                "生成CBMC规约",
                "\\requires ...",
                "\\ensures ...",
                "\\assigns ...",
            ]
        return '\n'.join(prompt_lines)

    def _extract_cbmc_spec(self, content):
        """Keep only the lines of a model reply that look like specification.

        A stripped line is kept when it starts with one of _SPEC_PREFIXES,
        contains '=', or ends with ';'.

        Returns:
            The kept lines joined by newlines, or the stripped full reply
            when no line matched.
        """
        kept = []
        for raw_line in content.split('\n'):
            line = raw_line.strip()
            if (line.startswith(self._SPEC_PREFIXES)
                    or '=' in line
                    or line.endswith(';')):
                kept.append(line)
        return '\n'.join(kept) if kept else content.strip()

    @staticmethod
    def _build_test_suite():
        """Return the built-in suite: categories of sample C functions."""
        return [
            {
                'category': '基础函数',
                'functions': [
                    {
                        'name': 'basic_add',
                        'complexity': 'low',
                        'verification_goals': ['functional_correctness'],
                        'description': '基本加法函数',
                        'source_code': (
                            "int basic_add(int a, int b) {\n"
                            "return a + b;\n"
                            "}"
                        )
                    },
                    {
                        'name': 'basic_multiply',
                        'complexity': 'low',
                        'verification_goals': ['functional_correctness', 'integer_overflow'],
                        'description': '基本乘法函数',
                        'source_code': (
                            "int basic_multiply(int a, int b) {\n"
                            "return a * b;\n"
                            "}"
                        )
                    }
                ]
            },
            {
                'category': '内存操作',
                'functions': [
                    {
                        'name': 'memory_copy',
                        'complexity': 'medium',
                        'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds'],
                        'description': '内存拷贝函数',
                        'source_code': (
                            "void memory_copy(void* dest, const void* src, size_t n) {\n"
                            "if (dest == NULL || src == NULL || n == 0) return;\n"
                            "unsigned char* d = (unsigned char*)dest;\n"
                            "const unsigned char* s = (const unsigned char*)src;\n"
                            "for (size_t i = 0; i < n; i++) {\n"
                            "d[i] = s[i];\n"
                            "}\n"
                            "}"
                        )
                    }
                ]
            },
            {
                'category': '错误处理',
                'functions': [
                    {
                        'name': 'safe_array_access',
                        'complexity': 'medium',
                        'verification_goals': ['functional_correctness', 'memory_safety', 'error_handling', 'array_bounds'],
                        'description': '安全数组访问函数',
                        'source_code': (
                            "bool safe_array_access(int* array, size_t size, size_t index, int* result) {\n"
                            "if (array == NULL || result == NULL) return false;\n"
                            "if (index >= size) return false;\n"
                            "*result = array[index];\n"
                            "return true;\n"
                            "}"
                        )
                    }
                ]
            },
            {
                'category': '复杂算法',
                'functions': [
                    {
                        'name': 'binary_search',
                        'complexity': 'high',
                        'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling'],
                        'description': '二分查找算法',
                        'source_code': (
                            "int binary_search(const int* array, size_t size, int target) {\n"
                            "if (array == NULL || size == 0) return -1;\n"
                            "size_t left = 0;\n"
                            "size_t right = size - 1;\n"
                            "while (left <= right) {\n"
                            "size_t mid = left + (right - left) / 2;\n"
                            "if (array[mid] == target) {\n"
                            "return (int)mid;\n"
                            "} else if (array[mid] < target) {\n"
                            "left = mid + 1;\n"
                            "} else {\n"
                            "if (mid == 0) break;\n"
                            "right = mid - 1;\n"
                            "}\n"
                            "}\n"
                            "return -1;\n"
                            "}"
                        )
                    }
                ]
            }
        ]

    def run_comprehensive_test(self):
        """Run every function in the built-in suite and print a report.

        Successful generations are appended to self.results; failures are
        only printed. generate_report() is called once at the end.
        """
        print("🚀 DeepSeek v3.1 综合代码规约化测试")
        print("=" * 60)
        print(f"📅 测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"🤖 模型: {self.model}")
        print("=" * 60)
        test_suite = self._build_test_suite()
        total_functions = sum(len(category['functions']) for category in test_suite)
        successful_tests = 0
        for category in test_suite:
            print(f"\n📂 {category['category']}")
            print("-" * 40)
            for func in category['functions']:
                print(f"\n🔍 {func['name']} ({func['complexity'].upper()})")
                print(f"📝 {func['description']}")
                print(f"🎯 {', '.join(func['verification_goals'])}")
                function_info = {
                    'name': func['name'],
                    # Parse the declared return type from the source. The
                    # earlier check looked for 'int' inside the function
                    # NAME, so every signature was sent as 'void'.
                    'return_type': self._extract_return_type(func['source_code']),
                    'parameters': self._extract_parameters(func['source_code']),
                    'source_code': func['source_code'],
                    'description': func['description']
                }
                print("\n⏳ 生成规约中...")
                result = self.generate_specification(
                    function_info,
                    func['verification_goals'],
                    func['complexity']
                )
                if not result['success']:
                    print(f"❌ 失败: {result['error']}")
                    continue
                successful_tests += 1
                print(f"✅ 成功! ⏱️ {result['generation_time']:.2f}s, 🔤 {result['tokens_used']} tokens")
                print("\n📋 CBMC规约:")
                print("-" * 50)
                print(result['specification'])
                print("-" * 50)
                self.results.append({
                    'function': func['name'],
                    'category': category['category'],
                    'complexity': func['complexity'],
                    'verification_goals': func['verification_goals'],
                    'specification': result['specification'],
                    'generation_time': result['generation_time'],
                    'tokens_used': result['tokens_used'],
                    'timestamp': datetime.now().isoformat()
                })
        self.generate_report(successful_tests, total_functions)

    def _extract_return_type(self, source_code):
        """Return the declared return type of the first C function in the text.

        Everything before the first '(' is split on whitespace; the last
        token is taken as the function name and the preceding tokens as the
        type. Leading '*' on the name are moved onto the type. Falls back
        to 'void' when no type token precedes the name.
        """
        head_tokens = source_code.split('(', 1)[0].split()
        if len(head_tokens) < 2:
            return 'void'
        name_token = head_tokens[-1]
        pointer_depth = len(name_token) - len(name_token.lstrip('*'))
        return ' '.join(head_tokens[:-1]) + '*' * pointer_depth

    def _extract_parameters(self, source_code):
        """Extract the parameters of the first C function in the text.

        This is a simplified parser: it reads the text between the first
        '(' and the next ')', splits it on commas, and splits each piece at
        its last whitespace into type and name. Pieces without whitespace
        (for example 'void') are skipped, and leading '*' on a name are
        moved onto the type.

        Returns:
            A list of {'type': ..., 'name': ...} dicts; empty when the text
            has no parenthesised parameter list.
        """
        open_idx = source_code.find('(')
        if open_idx == -1:
            return []
        close_idx = source_code.find(')', open_idx + 1)
        if close_idx == -1:
            return []
        params = []
        for piece in source_code[open_idx + 1:close_idx].split(','):
            parts = piece.strip().rsplit(None, 1)
            if len(parts) != 2:
                continue
            type_part, name_part = parts
            bare_name = name_part.lstrip('*')
            pointer_depth = len(name_part) - len(bare_name)
            params.append({
                'type': type_part.strip() + '*' * pointer_depth,
                'name': bare_name
            })
        return params

    def generate_report(self, successful, total):
        """Print summary statistics and save the detailed JSON report.

        Args:
            successful: Number of functions whose generation succeeded.
            total: Number of functions that were attempted.
        """
        print(f"\n{'='*60}")
        print("📊 综合测试报告")
        print("="*60)
        success_rate = (successful / total) * 100 if total > 0 else 0
        avg_time = sum(r['generation_time'] for r in self.results) / len(self.results) if self.results else 0
        total_tokens = sum(r['tokens_used'] for r in self.results)
        print("📈 测试统计:")
        print(f" • 总函数数: {total}")
        print(f" • 成功测试: {successful}")
        print(f" • 成功率: {success_rate:.1f}%")
        print(f" • 平均时间: {avg_time:.2f}s")
        print(f" • 总Token消耗: {total_tokens}")
        # Aggregate per complexity level.
        complexity_stats = {}
        for result in self.results:
            stats = complexity_stats.setdefault(
                result['complexity'],
                {'count': 0, 'total_time': 0, 'total_tokens': 0}
            )
            stats['count'] += 1
            stats['total_time'] += result['generation_time']
            stats['total_tokens'] += result['tokens_used']
        print("\n🎯 复杂度分析:")
        for complexity, stats in complexity_stats.items():
            avg_time = stats['total_time'] / stats['count']
            avg_tokens = stats['total_tokens'] / stats['count']
            print(f"{complexity.upper()}: {stats['count']}个函数, 平均 {avg_time:.1f}s, {avg_tokens:.0f} tokens")
        # Aggregate per category.
        category_stats = {}
        for result in self.results:
            stats = category_stats.setdefault(
                result['category'],
                {'count': 0, 'total_time': 0}
            )
            stats['count'] += 1
            stats['total_time'] += result['generation_time']
        print("\n📂 类别分析:")
        for category, stats in category_stats.items():
            avg_time = stats['total_time'] / stats['count']
            print(f"{category}: {stats['count']}个函数, 平均 {avg_time:.1f}s")
        # Pass the attempted count so the saved report does not mistake the
        # number of successes for the number of functions tested.
        self.save_detailed_report(total)
        print("\n🎉 综合测试完成!")
        print(f"💾 详细报告已保存到: {self.REPORT_FILE}")

    def save_detailed_report(self, total_functions=None):
        """Write the model name, timestamp and all results to REPORT_FILE.

        Args:
            total_functions: Number of functions attempted. When omitted,
                the number of stored (successful) results is used, which
                matches the behaviour of calling this method with no
                arguments before the parameter existed.
        """
        if total_functions is None:
            total_functions = len(self.results)
        report = {
            'test_info': {
                'model': self.model,
                'test_time': datetime.now().isoformat(),
                'total_functions': total_functions
            },
            'results': self.results
        }
        with open(self.REPORT_FILE, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
def main():
    """Run the comprehensive suite, printing any failure with its traceback.

    Every exception raised while constructing the tester or running the
    suite is caught here, reported on stdout, and followed by the stack
    trace; nothing is re-raised.
    """
    import traceback

    try:
        ComprehensiveDeepSeekTest().run_comprehensive_test()
    except Exception as exc:
        print(f"❌ 测试失败: {str(exc)}")
        traceback.print_exc()
# Run the suite only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()