|
|
#!/usr/bin/env python3
|
|
|
"""
|
|
|
DeepSeek v3.1 综合代码规约化测试
|
|
|
|
|
|
展示从简单到复杂的完整测试流程
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import requests
|
|
|
import json
|
|
|
import time
|
|
|
from datetime import datetime
|
|
|
|
|
|
|
|
|
class ComprehensiveDeepSeekTest:
    """Harness that asks DeepSeek V3.1 (via SiliconFlow) for CBMC specifications.

    The harness sends a set of built-in C functions to the chat-completions
    endpoint, extracts the specification-looking lines from each answer,
    prints a summary and stores the successful results in a JSON report.
    """

    # Default location of the JSON report written by save_detailed_report().
    REPORT_FILE = 'deepseek_test_report.json'

    # A response line starting with any of these is kept as specification text.
    _SPEC_PREFIXES = (
        r'\requires',
        r'\ensures',
        r'\assigns',
        r'\valid',
        r'\forall',
        r'\exists',
        '/*',
        '*/',
        '@',
        '#include',
    )

    def __init__(self, api_key=None):
        """Store the connection settings.

        Args:
            api_key: API key; when falsy, the SILICONFLOW_API_KEY environment
                variable is used instead.

        Raises:
            ValueError: if no key is available from either source.
        """
        self.api_key = api_key or os.getenv('SILICONFLOW_API_KEY')
        self.base_url = 'https://api.siliconflow.cn/v1'
        self.model = 'deepseek-ai/DeepSeek-V3.1'
        self.results = []

        if not self.api_key:
            raise ValueError("需要设置 SILICONFLOW_API_KEY 环境变量")

    def generate_specification(self, function_info, verification_goals, complexity='medium'):
        """Request a specification for one function from the model.

        Args:
            function_info: dict with the keys read by _build_prompt()
                ('name', 'return_type', 'parameters', 'source_code',
                'description').
            verification_goals: iterable of goal names, joined into the prompt.
            complexity: 'high', 'medium' or anything else (treated as low);
                selects the system prompt, temperature and token budget.

        Returns:
            On success a dict with 'specification', 'raw_content',
            'generation_time' (seconds), 'tokens_used' and 'success': True.
            On failure a dict with 'success': False and an 'error' message.
            Transport errors and malformed responses are reported through the
            failure dict instead of being raised.
        """
        prompt = self._build_prompt(function_info, verification_goals, complexity)

        messages = [
            {"role": "system", "content": self._get_system_prompt(complexity)},
            {"role": "user", "content": prompt},
        ]
        data = {
            "model": self.model,
            "messages": messages,
            "temperature": 0.1 if complexity in ('high', 'medium') else 0.3,
            "max_tokens": 2048 if complexity == 'high' else 1024,
            "stream": False,
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
        }

        # perf_counter() is monotonic, so the measured duration cannot be
        # distorted by wall-clock adjustments during the request.
        start_time = time.perf_counter()

        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=data,
                timeout=90,
            )

            if response.status_code != 200:
                return {
                    'success': False,
                    'error': f"API错误: {response.status_code}",
                }

            result = response.json()
            content = result['choices'][0]['message']['content']
            # 'usage' may be absent or null in the payload.
            tokens_used = (result.get('usage') or {}).get('total_tokens', 0)
            specification = self._extract_cbmc_spec(content)
        except (requests.RequestException, ValueError, KeyError,
                IndexError, TypeError, AttributeError) as e:
            # Network failures, invalid JSON and unexpected payload shapes.
            return {
                'success': False,
                'error': f"请求失败: {str(e)}",
            }

        return {
            'specification': specification,
            'raw_content': content,
            'generation_time': time.perf_counter() - start_time,
            'tokens_used': tokens_used,
            'success': True,
        }

    def _get_system_prompt(self, complexity):
        """Return the system prompt for the given complexity level.

        'high' and 'medium' have dedicated prompts; every other value gets the
        basic prompt.
        """
        if complexity == 'high':
            return (
                "你是一位资深的CBMC形式化验证专家,精通复杂的C/C++程序验证。\n"
                "请生成精确、完整、可执行的CBMC规约,确保能够检测所有潜在的缺陷。\n"
                "严格使用CBMC语法,包含所有必要的验证条件。"
            )
        if complexity == 'medium':
            return (
                "你是CBMC形式化验证专家,擅长为C/C++代码生成形式化规约。\n"
                "请生成准确的CBMC规约,覆盖主要的验证目标。"
            )
        return (
            "你是CBMC形式化验证助手,为简单的C函数生成基础规约。\n"
            "生成简洁的CBMC规约,确保基本正确性。"
        )

    def _build_prompt(self, function_info, verification_goals, complexity):
        """Build the user prompt for one function.

        Missing keys in function_info fall back to 'unknown' / 'void' / empty
        values. The 'high' prompt is headed by the description and asks for a
        detailed specification; every other level is headed by the function
        name and ends with a requires/ensures/assigns skeleton.
        """
        func_name = function_info.get('name', 'unknown')
        return_type = function_info.get('return_type', 'void')
        parameters = function_info.get('parameters', [])
        source_code = function_info.get('source_code', '')
        description = function_info.get('description', '')

        param_str = ', '.join(f"{p['type']} {p['name']}" for p in parameters)
        function_signature = f"{return_type} {func_name}({param_str})"
        goals_text = ', '.join(verification_goals)

        # Section shared by both prompt variants.
        shared = [
            f"签名:{function_signature}",
            "",
            "源代码:",
            "```c",
            source_code,
            "```",
            "",
            f"验证目标:{goals_text}",
            "",
        ]

        if complexity == 'high':
            lines = [
                f"函数:{description}",
                *shared,
                "请生成完整的CBMC规约,包括:",
                "- 详细的前置条件(参数有效性、内存安全、边界检查)",
                "- 精确的后置条件(返回值、状态不变式)",
                "- 赋值说明(修改的内存位置)",
                "- 辅助断言(如果需要)",
                "",
                "确保规约能够检测所有指定的验证目标。",
            ]
        else:
            lines = [
                f"函数:{func_name}",
                *shared,
                "生成CBMC规约:",
                r"\requires ...",
                r"\ensures ...",
                r"\assigns ...",
            ]

        return "\n".join(lines)

    def _extract_cbmc_spec(self, content):
        """Keep only the lines of a model answer that look like specification.

        A stripped line is kept when it starts with one of _SPEC_PREFIXES,
        contains '=' or ends with ';'. When no line qualifies, the whole
        stripped answer is returned unchanged.
        """
        cbmc_lines = []
        for raw_line in content.split('\n'):
            line = raw_line.strip()
            if (line.startswith(self._SPEC_PREFIXES)
                    or '=' in line
                    or line.endswith(';')):
                cbmc_lines.append(line)

        return '\n'.join(cbmc_lines) if cbmc_lines else content.strip()

    @staticmethod
    def _build_test_suite():
        """Return the built-in test suite: categories, each with C functions.

        NOTE(review): the whitespace inside the C snippets is conventional
        4-space formatting; confirm against the authoritative copy of the
        file if exact prompt bytes matter.
        """
        return [
            {
                'category': '基础函数',
                'functions': [
                    {
                        'name': 'basic_add',
                        'complexity': 'low',
                        'verification_goals': ['functional_correctness'],
                        'description': '基本加法函数',
                        'source_code': (
                            "int basic_add(int a, int b) {\n"
                            "    return a + b;\n"
                            "}"
                        ),
                    },
                    {
                        'name': 'basic_multiply',
                        'complexity': 'low',
                        'verification_goals': ['functional_correctness', 'integer_overflow'],
                        'description': '基本乘法函数',
                        'source_code': (
                            "int basic_multiply(int a, int b) {\n"
                            "    return a * b;\n"
                            "}"
                        ),
                    },
                ],
            },
            {
                'category': '内存操作',
                'functions': [
                    {
                        'name': 'memory_copy',
                        'complexity': 'medium',
                        'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds'],
                        'description': '内存拷贝函数',
                        'source_code': (
                            "void memory_copy(void* dest, const void* src, size_t n) {\n"
                            "    if (dest == NULL || src == NULL || n == 0) return;\n"
                            "\n"
                            "    unsigned char* d = (unsigned char*)dest;\n"
                            "    const unsigned char* s = (const unsigned char*)src;\n"
                            "\n"
                            "    for (size_t i = 0; i < n; i++) {\n"
                            "        d[i] = s[i];\n"
                            "    }\n"
                            "}"
                        ),
                    },
                ],
            },
            {
                'category': '错误处理',
                'functions': [
                    {
                        'name': 'safe_array_access',
                        'complexity': 'medium',
                        'verification_goals': ['functional_correctness', 'memory_safety', 'error_handling', 'array_bounds'],
                        'description': '安全数组访问函数',
                        'source_code': (
                            "bool safe_array_access(int* array, size_t size, size_t index, int* result) {\n"
                            "    if (array == NULL || result == NULL) return false;\n"
                            "    if (index >= size) return false;\n"
                            "\n"
                            "    *result = array[index];\n"
                            "    return true;\n"
                            "}"
                        ),
                    },
                ],
            },
            {
                'category': '复杂算法',
                'functions': [
                    {
                        'name': 'binary_search',
                        'complexity': 'high',
                        'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling'],
                        'description': '二分查找算法',
                        'source_code': (
                            "int binary_search(const int* array, size_t size, int target) {\n"
                            "    if (array == NULL || size == 0) return -1;\n"
                            "\n"
                            "    size_t left = 0;\n"
                            "    size_t right = size - 1;\n"
                            "\n"
                            "    while (left <= right) {\n"
                            "        size_t mid = left + (right - left) / 2;\n"
                            "\n"
                            "        if (array[mid] == target) {\n"
                            "            return (int)mid;\n"
                            "        } else if (array[mid] < target) {\n"
                            "            left = mid + 1;\n"
                            "        } else {\n"
                            "            if (mid == 0) break;\n"
                            "            right = mid - 1;\n"
                            "        }\n"
                            "    }\n"
                            "\n"
                            "    return -1;\n"
                            "}"
                        ),
                    },
                ],
            },
        ]

    def run_comprehensive_test(self):
        """Run every function of the built-in suite and print a report.

        Successful generations are appended to self.results; failures are only
        printed. generate_report() is called once at the end.
        """
        print("🚀 DeepSeek v3.1 综合代码规约化测试")
        print("=" * 60)
        print(f"📅 测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"🤖 模型: {self.model}")
        print("=" * 60)

        test_suite = self._build_test_suite()
        total_functions = sum(len(category['functions']) for category in test_suite)
        successful_tests = 0

        for category in test_suite:
            print(f"\n📂 {category['category']}")
            print("-" * 40)

            for func in category['functions']:
                print(f"\n🔍 {func['name']} ({func['complexity'].upper()})")
                print(f"📝 {func['description']}")
                print(f"🎯 {', '.join(func['verification_goals'])}")

                function_info = {
                    'name': func['name'],
                    # Taken from the text in front of the function name; the
                    # previous check inspected the name itself and therefore
                    # reported 'void' for almost every function.
                    'return_type': self._extract_return_type(func['source_code']),
                    'parameters': self._extract_parameters(func['source_code']),
                    'source_code': func['source_code'],
                    'description': func['description'],
                }

                print("\n⏳ 生成规约中...")
                result = self.generate_specification(
                    function_info,
                    func['verification_goals'],
                    func['complexity'],
                )

                if not result['success']:
                    print(f"❌ 失败: {result['error']}")
                    continue

                successful_tests += 1
                print(f"✅ 成功! ⏱️ {result['generation_time']:.2f}s, 🔤 {result['tokens_used']} tokens")

                print("\n📋 CBMC规约:")
                print("-" * 50)
                print(result['specification'])
                print("-" * 50)

                self.results.append({
                    'function': func['name'],
                    'category': category['category'],
                    'complexity': func['complexity'],
                    'verification_goals': func['verification_goals'],
                    'specification': result['specification'],
                    'generation_time': result['generation_time'],
                    'tokens_used': result['tokens_used'],
                    'timestamp': datetime.now().isoformat(),
                })

        self.generate_report(successful_tests, total_functions)

    @staticmethod
    def _extract_return_type(source_code):
        """Return the return type written in front of a C function name.

        Everything before the first '(' is split at its last space: the right
        part is the function name, the left part the return type. Stars glued
        to the name ("char *f") are moved to the type. Falls back to 'void'
        when no type text is present.
        """
        head = source_code.partition('(')[0].strip()
        type_part, _, name_part = head.rpartition(' ')
        stars = len(name_part) - len(name_part.lstrip('*'))
        return_type = type_part.strip() + '*' * stars
        return return_type or 'void'

    def _extract_parameters(self, source_code):
        """Extract the parameters of the first parenthesised list.

        This is a simplified parser: the text between the first '(' and the
        next ')' is split on commas, and each piece is split at its last
        space into type and name. Pieces without a space (such as a lone
        "void") are skipped. Stars glued to the name ("int *p") are moved to
        the type.

        Returns:
            List of {'type': ..., 'name': ...} dicts; empty when the source
            has no '('.
        """
        _, paren, rest = source_code.partition('(')
        if not paren:
            return []

        params = []
        for piece in rest.partition(')')[0].split(','):
            param = piece.strip()
            if ' ' not in param:
                continue
            type_part, name_part = param.rsplit(' ', 1)
            name = name_part.strip()
            bare_name = name.lstrip('*')
            stars = len(name) - len(bare_name)
            params.append({
                'type': type_part.strip() + '*' * stars,
                'name': bare_name,
            })
        return params

    def generate_report(self, successful, total):
        """Print the summary statistics and write the JSON report.

        Args:
            successful: number of functions whose generation succeeded.
            total: number of functions that were attempted.

        Averages and token sums are computed over self.results, which holds
        only the successful runs.
        """
        print(f"\n{'='*60}")
        print("📊 综合测试报告")
        print("="*60)

        success_rate = (successful / total) * 100 if total > 0 else 0
        if self.results:
            avg_time = sum(r['generation_time'] for r in self.results) / len(self.results)
        else:
            avg_time = 0
        total_tokens = sum(r['tokens_used'] for r in self.results)

        print("📈 测试统计:")
        print(f" • 总函数数: {total}")
        print(f" • 成功测试: {successful}")
        print(f" • 成功率: {success_rate:.1f}%")
        print(f" • 平均时间: {avg_time:.2f}s")
        print(f" • 总Token消耗: {total_tokens}")

        # Aggregate per complexity level.
        complexity_stats = {}
        for result in self.results:
            stats = complexity_stats.setdefault(
                result['complexity'],
                {'count': 0, 'total_time': 0, 'total_tokens': 0},
            )
            stats['count'] += 1
            stats['total_time'] += result['generation_time']
            stats['total_tokens'] += result['tokens_used']

        print("\n🎯 复杂度分析:")
        for complexity, stats in complexity_stats.items():
            group_time = stats['total_time'] / stats['count']
            group_tokens = stats['total_tokens'] / stats['count']
            print(f" • {complexity.upper()}: {stats['count']}个函数, 平均 {group_time:.1f}s, {group_tokens:.0f} tokens")

        # Aggregate per category.
        category_stats = {}
        for result in self.results:
            stats = category_stats.setdefault(
                result['category'], {'count': 0, 'total_time': 0}
            )
            stats['count'] += 1
            stats['total_time'] += result['generation_time']

        print("\n📂 类别分析:")
        for category, stats in category_stats.items():
            group_time = stats['total_time'] / stats['count']
            print(f" • {category}: {stats['count']}个函数, 平均 {group_time:.1f}s")

        report_path = self.save_detailed_report(total_functions=total)

        print("\n🎉 综合测试完成!")
        print(f"💾 详细报告已保存到: {report_path}")

    def save_detailed_report(self, path=None, total_functions=None):
        """Write self.results and run metadata to a JSON file.

        Args:
            path: destination file; defaults to REPORT_FILE.
            total_functions: number of functions attempted. When omitted, the
                number of stored (successful) results is used.

        Returns:
            The path the report was written to.
        """
        if path is None:
            path = self.REPORT_FILE
        if total_functions is None:
            total_functions = len(self.results)

        report = {
            'test_info': {
                'model': self.model,
                'test_time': datetime.now().isoformat(),
                'total_functions': total_functions,
                'successful_functions': len(self.results),
            },
            'results': self.results,
        }

        with open(path, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)

        return path
|
|
|
|
|
|
|
|
|
def main():
    """Run the comprehensive test suite as a command-line entry point.

    Any exception raised while constructing the tester or running the suite
    is reported on stdout together with its traceback instead of propagating.

    Returns:
        0 when the run finished without an exception, 1 otherwise, so a
        caller can distinguish a failed run from a completed one.
    """
    try:
        tester = ComprehensiveDeepSeekTest()
        tester.run_comprehensive_test()
    except Exception as e:
        print(f"❌ 测试失败: {str(e)}")
        # Imported lazily: only needed on the failure path.
        import traceback
        traceback.print_exc()
        return 1
    return 0
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
    # Use main()'s result as the process exit status; a None result exits 0.
    raise SystemExit(main())