You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cbmc/codedetect/tests/deepseek/scripts/advanced_deepseek_test.py

409 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
高级DeepSeek v3.1代码规约化测试
测试更复杂的函数和验证场景
"""
import os
import requests
import json
import time
class AdvancedDeepSeekTest:
"""高级DeepSeek测试类"""
def __init__(self, api_key=None, base_url=None):
self.api_key = api_key or os.getenv('SILICONFLOW_API_KEY')
self.base_url = base_url or 'https://api.siliconflow.cn/v1'
self.model = 'deepseek-ai/DeepSeek-V3.1'
if not self.api_key:
raise ValueError("需要设置 SILICONFLOW_API_KEY 环境变量")
def generate_specification(self, function_info, verification_goals):
"""生成形式化规约"""
prompt = self._build_prompt(function_info, verification_goals)
messages = [
{
"role": "system",
"content": """你是一个专业的形式化验证专家专门为C/C++代码生成CBMC格式的形式化规约。
请生成准确、完整的CBMC规约严格遵循CBMC语法规范。
生成的规约应该:
1. 使用正确的CBMC语法\\requires, \\ensures, \\assigns等
2. 包含适当的前置条件和后置条件
3. 处理边界条件和错误情况
4. 只输出CBMC规约代码不要包含其他解释"""
},
{
"role": "user",
"content": prompt
}
]
data = {
"model": self.model,
"messages": messages,
"temperature": 0.2, # 更低温度,更精确的输出
"max_tokens": 2048,
"stream": False
}
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
start_time = time.time()
try:
response = requests.post(
f"{self.base_url}/chat/completions",
headers=headers,
json=data,
timeout=120
)
if response.status_code == 200:
result = response.json()
content = result['choices'][0]['message']['content']
tokens_used = result.get('usage', {}).get('total_tokens', 0)
# 后处理提取CBMC规约
specification = self._extract_cbmc_spec(content)
generation_time = time.time() - start_time
return {
'specification': specification,
'raw_content': content,
'generation_time': generation_time,
'tokens_used': tokens_used,
'success': True
}
else:
return {
'success': False,
'error': f"API错误: {response.status_code} - {response.text}"
}
except Exception as e:
return {
'success': False,
'error': f"请求失败: {str(e)}"
}
def _build_prompt(self, function_info, verification_goals):
"""构建提示词"""
func_name = function_info.get('name', 'unknown')
return_type = function_info.get('return_type', 'void')
parameters = function_info.get('parameters', [])
source_code = function_info.get('source_code', '')
includes = function_info.get('includes', [])
description = function_info.get('description', '')
# 构建函数签名
param_str = ', '.join([f"{p['type']} {p['name']}" for p in parameters])
function_signature = f"{return_type} {func_name}({param_str})"
# 构建验证目标描述
goal_descriptions = {
'functional_correctness': '功能正确性验证',
'memory_safety': '内存安全性验证',
'pointer_validity': '指针有效性验证',
'integer_overflow': '整数溢出检查',
'array_bounds': '数组边界检查',
'error_handling': '错误处理验证',
'concurrency': '并发安全性',
'data_race': '数据竞争检测',
'deadlock': '死锁检测',
'resource_leak': '资源泄漏检测'
}
goals_text = []
for goal in verification_goals:
description = goal_descriptions.get(goal, goal)
goals_text.append(f"- {description}")
includes_text = '\\n'.join(includes) if includes else ''
prompt = f"""请为以下C函数生成完整的CBMC形式化验证规约
{includes_text}
函数描述:{description}
函数签名:{function_signature}
源代码:
```c
{source_code}
```
验证目标:
{chr(10).join(goals_text)}
请生成完整的CBMC规约包含
1. \\requires 子句:前置条件(参数有效性、边界条件等)
2. \\ensures 子句:后置条件(返回值、状态变化等)
3. \\assigns 子句:赋值说明(修改的变量和内存位置)
4. 必要的辅助函数和断言
确保规约的完整性和准确性,能够检测所有指定的验证目标。
"""
return prompt
def _extract_cbmc_spec(self, content):
"""从生成内容中提取CBMC规约"""
lines = content.split('\n')
cbmc_lines = []
# 移除markdown代码块标记
in_code_block = False
for line in lines:
line = line.strip()
# 处理markdown代码块
if line.startswith('```'):
in_code_block = not in_code_block
continue
if in_code_block or not line.startswith('```'):
# 保留CBMC相关行和包含文件
if (line.startswith('#include') or
line.startswith('/*') or
line.startswith('*/') or
line.startswith('@') or
line.startswith(r'\requires') or
line.startswith(r'\ensures') or
line.startswith(r'\assigns') or
line.startswith(r'\valid') or
line.startswith(r'\forall') or
line.startswith(r'\exists') or
line.startswith(r'\invariant') or
line.startswith(r'\behavior') or
line.startswith('int') or
line.startswith('bool') or
line.startswith('size_t') or
'=' in line or
line.endswith(';') or
line.strip()):
cbmc_lines.append(line)
return '\n'.join(cbmc_lines) if cbmc_lines else content.strip()
def create_advanced_test_functions():
"""创建高级测试函数"""
return [
{
'name': 'array_copy_safe',
'return_type': 'bool',
'parameters': [
{'name': 'dest', 'type': 'int*'},
{'name': 'src', 'type': 'const int*'},
{'name': 'dest_size', 'type': 'size_t'},
{'name': 'src_size', 'type': 'size_t'},
{'name': 'copied', 'type': 'size_t*'}
],
'includes': ['#include <stddef.h>', '#include <stdbool.h>'],
'source_code': '''bool array_copy_safe(int *dest, const int *src, size_t dest_size, size_t src_size, size_t *copied) {
if (dest == NULL || src == NULL || copied == NULL) {
return false;
}
if (dest_size == 0 || src_size == 0) {
*copied = 0;
return true;
}
size_t copy_size = dest_size < src_size ? dest_size : src_size;
for (size_t i = 0; i < copy_size; i++) {
dest[i] = src[i];
}
*copied = copy_size;
return true;
}''',
'description': '安全的数组拷贝函数,检查所有边界条件',
'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling']
},
{
'name': 'circular_buffer_push',
'return_type': 'bool',
'parameters': [
{'name': 'buffer', 'type': 'int*'},
{'name': 'capacity', 'type': 'size_t'},
{'name': 'head', 'type': 'size_t*'},
{'name': 'tail', 'type': 'size_t*'},
{'name': 'value', 'type': 'int'}
],
'includes': ['#include <stddef.h>', '#include <stdbool.h>'],
'source_code': '''bool circular_buffer_push(int *buffer, size_t capacity, size_t *head, size_t *tail, int value) {
if (buffer == NULL || head == NULL || tail == NULL || capacity == 0) {
return false;
}
size_t next_tail = (*tail + 1) % capacity;
if (next_tail == *head) {
return false; // 缓冲区已满
}
buffer[*tail] = value;
*tail = next_tail;
return true;
}''',
'description': '环形缓冲区推送操作',
'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling', 'concurrency']
},
{
'name': 'thread_safe_counter',
'return_type': 'int',
'parameters': [
{'name': 'counter', 'type': 'volatile int*'},
{'name': 'increment', 'type': 'int'}
],
'includes': ['#include <stdatomic.h>'],
'source_code': '''int thread_safe_counter(volatile int *counter, int increment) {
if (counter == NULL) {
return 0;
}
int old_value, new_value;
do {
old_value = *counter;
new_value = old_value + increment;
} while (!__atomic_compare_exchange_n(counter, &old_value, new_value,
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE));
return new_value;
}''',
'description': '线程安全的计数器操作',
'verification_goals': ['functional_correctness', 'concurrency', 'data_race', 'memory_safety']
}
]
def evaluate_specification_quality(spec, verification_goals):
"""评估规约质量"""
score = 0.0
max_score = len(verification_goals) * 2.0
# 检查基本CBMC语法
if r'\requires' in spec:
score += 0.5
if r'\ensures' in spec:
score += 0.5
if r'\assigns' in spec:
score += 0.3
# 检查验证目标覆盖
if 'memory_safety' in verification_goals and r'\valid' in spec:
score += 0.5
if 'array_bounds' in verification_goals and ('size_t' in spec or 'bound' in spec.lower()):
score += 0.5
if 'error_handling' in verification_goals and ('NULL' in spec or 'false' in spec):
score += 0.5
if 'concurrency' in verification_goals and ('atomic' in spec.lower() or 'volatile' in spec):
score += 0.5
# 检查完整性
if '&&' in spec or '||' in spec:
score += 0.3 # 逻辑条件
if '\\result' in spec:
score += 0.3 # 返回值检查
return min(score / max_score, 1.0)
def main():
"""主测试函数"""
print("🚀 DeepSeek v3.1 高级代码规约化测试")
print("=" * 60)
try:
# 创建测试实例
tester = AdvancedDeepSeekTest()
print(f"📡 连接模型: {tester.model}")
# 测试函数
test_functions = create_advanced_test_functions()
results = []
for i, func in enumerate(test_functions, 1):
print(f"\n{'='*25} 高级测试 {i} {'='*25}")
print(f"📝 函数: {func['name']}")
print(f"📖 描述: {func['description']}")
print(f"🎯 目标: {', '.join(func['verification_goals'])}")
print("\n⏳ 正在生成规约...")
result = tester.generate_specification(func, func['verification_goals'])
if result['success']:
print(f"✅ 生成成功!")
print(f"⏱️ 时间: {result['generation_time']:.2f}s")
print(f"🔤 Token: {result['tokens_used']}")
# 评估质量
quality_score = evaluate_specification_quality(
result['specification'],
func['verification_goals']
)
print(f"📊 质量评分: {quality_score:.2f}")
print(f"\n📋 生成的CBMC规约:")
print("-" * 50)
print(result['specification'])
print("-" * 50)
results.append({
'function': func['name'],
'quality': quality_score,
'time': result['generation_time'],
'tokens': result['tokens_used']
})
else:
print(f"❌ 生成失败: {result['error']}")
# 总结报告
print(f"\n{'='*60}")
print("📊 测试总结报告")
print("="*60)
if results:
avg_quality = sum(r['quality'] for r in results) / len(results)
avg_time = sum(r['time'] for r in results) / len(results)
total_tokens = sum(r['tokens'] for r in results)
print(f"📈 总体统计:")
print(f" • 测试函数数: {len(results)}")
print(f" • 平均质量评分: {avg_quality:.2f}")
print(f" • 平均生成时间: {avg_time:.2f}s")
print(f" • 总Token消耗: {total_tokens}")
print(f"\n🏆 详细结果:")
for result in results:
quality_emoji = "🥇" if result['quality'] >= 0.9 else "🥈" if result['quality'] >= 0.7 else "🥉"
print(f" {quality_emoji} {result['function']}: 质量 {result['quality']:.2f}, 时间 {result['time']:.2f}s")
print(f"\n🎉 高级测试完成!")
# 使用建议
print(f"\n💡 使用建议:")
print("1. 复杂函数建议降低temperature参数获得更精确结果")
print("2. 并发相关函数需要更详细的规约描述")
print("3. 建议结合实际CBMC工具验证生成的规约")
print("4. 可以根据具体需求调整提示词模板")
except Exception as e:
print(f"❌ 测试失败: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()