|
|
#!/usr/bin/env python3
|
|
|
"""
|
|
|
高级DeepSeek v3.1代码规约化测试
|
|
|
|
|
|
测试更复杂的函数和验证场景
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import requests
|
|
|
import json
|
|
|
import time
|
|
|
|
|
|
|
|
|
class AdvancedDeepSeekTest:
|
|
|
"""高级DeepSeek测试类"""
|
|
|
|
|
|
def __init__(self, api_key=None, base_url=None):
|
|
|
self.api_key = api_key or os.getenv('SILICONFLOW_API_KEY')
|
|
|
self.base_url = base_url or 'https://api.siliconflow.cn/v1'
|
|
|
self.model = 'deepseek-ai/DeepSeek-V3.1'
|
|
|
|
|
|
if not self.api_key:
|
|
|
raise ValueError("需要设置 SILICONFLOW_API_KEY 环境变量")
|
|
|
|
|
|
def generate_specification(self, function_info, verification_goals):
|
|
|
"""生成形式化规约"""
|
|
|
prompt = self._build_prompt(function_info, verification_goals)
|
|
|
|
|
|
messages = [
|
|
|
{
|
|
|
"role": "system",
|
|
|
"content": """你是一个专业的形式化验证专家,专门为C/C++代码生成CBMC格式的形式化规约。
|
|
|
请生成准确、完整的CBMC规约,严格遵循CBMC语法规范。
|
|
|
生成的规约应该:
|
|
|
1. 使用正确的CBMC语法(\\requires, \\ensures, \\assigns等)
|
|
|
2. 包含适当的前置条件和后置条件
|
|
|
3. 处理边界条件和错误情况
|
|
|
4. 只输出CBMC规约代码,不要包含其他解释"""
|
|
|
},
|
|
|
{
|
|
|
"role": "user",
|
|
|
"content": prompt
|
|
|
}
|
|
|
]
|
|
|
|
|
|
data = {
|
|
|
"model": self.model,
|
|
|
"messages": messages,
|
|
|
"temperature": 0.2, # 更低温度,更精确的输出
|
|
|
"max_tokens": 2048,
|
|
|
"stream": False
|
|
|
}
|
|
|
|
|
|
headers = {
|
|
|
"Authorization": f"Bearer {self.api_key}",
|
|
|
"Content-Type": "application/json"
|
|
|
}
|
|
|
|
|
|
start_time = time.time()
|
|
|
|
|
|
try:
|
|
|
response = requests.post(
|
|
|
f"{self.base_url}/chat/completions",
|
|
|
headers=headers,
|
|
|
json=data,
|
|
|
timeout=120
|
|
|
)
|
|
|
|
|
|
if response.status_code == 200:
|
|
|
result = response.json()
|
|
|
content = result['choices'][0]['message']['content']
|
|
|
tokens_used = result.get('usage', {}).get('total_tokens', 0)
|
|
|
|
|
|
# 后处理:提取CBMC规约
|
|
|
specification = self._extract_cbmc_spec(content)
|
|
|
|
|
|
generation_time = time.time() - start_time
|
|
|
|
|
|
return {
|
|
|
'specification': specification,
|
|
|
'raw_content': content,
|
|
|
'generation_time': generation_time,
|
|
|
'tokens_used': tokens_used,
|
|
|
'success': True
|
|
|
}
|
|
|
else:
|
|
|
return {
|
|
|
'success': False,
|
|
|
'error': f"API错误: {response.status_code} - {response.text}"
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {
|
|
|
'success': False,
|
|
|
'error': f"请求失败: {str(e)}"
|
|
|
}
|
|
|
|
|
|
def _build_prompt(self, function_info, verification_goals):
|
|
|
"""构建提示词"""
|
|
|
func_name = function_info.get('name', 'unknown')
|
|
|
return_type = function_info.get('return_type', 'void')
|
|
|
parameters = function_info.get('parameters', [])
|
|
|
source_code = function_info.get('source_code', '')
|
|
|
includes = function_info.get('includes', [])
|
|
|
description = function_info.get('description', '')
|
|
|
|
|
|
# 构建函数签名
|
|
|
param_str = ', '.join([f"{p['type']} {p['name']}" for p in parameters])
|
|
|
function_signature = f"{return_type} {func_name}({param_str})"
|
|
|
|
|
|
# 构建验证目标描述
|
|
|
goal_descriptions = {
|
|
|
'functional_correctness': '功能正确性验证',
|
|
|
'memory_safety': '内存安全性验证',
|
|
|
'pointer_validity': '指针有效性验证',
|
|
|
'integer_overflow': '整数溢出检查',
|
|
|
'array_bounds': '数组边界检查',
|
|
|
'error_handling': '错误处理验证',
|
|
|
'concurrency': '并发安全性',
|
|
|
'data_race': '数据竞争检测',
|
|
|
'deadlock': '死锁检测',
|
|
|
'resource_leak': '资源泄漏检测'
|
|
|
}
|
|
|
|
|
|
goals_text = []
|
|
|
for goal in verification_goals:
|
|
|
description = goal_descriptions.get(goal, goal)
|
|
|
goals_text.append(f"- {description}")
|
|
|
|
|
|
includes_text = '\\n'.join(includes) if includes else ''
|
|
|
|
|
|
prompt = f"""请为以下C函数生成完整的CBMC形式化验证规约:
|
|
|
|
|
|
{includes_text}
|
|
|
|
|
|
函数描述:{description}
|
|
|
|
|
|
函数签名:{function_signature}
|
|
|
|
|
|
源代码:
|
|
|
```c
|
|
|
{source_code}
|
|
|
```
|
|
|
|
|
|
验证目标:
|
|
|
{chr(10).join(goals_text)}
|
|
|
|
|
|
请生成完整的CBMC规约,包含:
|
|
|
1. \\requires 子句:前置条件(参数有效性、边界条件等)
|
|
|
2. \\ensures 子句:后置条件(返回值、状态变化等)
|
|
|
3. \\assigns 子句:赋值说明(修改的变量和内存位置)
|
|
|
4. 必要的辅助函数和断言
|
|
|
|
|
|
确保规约的完整性和准确性,能够检测所有指定的验证目标。
|
|
|
"""
|
|
|
return prompt
|
|
|
|
|
|
def _extract_cbmc_spec(self, content):
|
|
|
"""从生成内容中提取CBMC规约"""
|
|
|
lines = content.split('\n')
|
|
|
cbmc_lines = []
|
|
|
|
|
|
# 移除markdown代码块标记
|
|
|
in_code_block = False
|
|
|
for line in lines:
|
|
|
line = line.strip()
|
|
|
|
|
|
# 处理markdown代码块
|
|
|
if line.startswith('```'):
|
|
|
in_code_block = not in_code_block
|
|
|
continue
|
|
|
|
|
|
if in_code_block or not line.startswith('```'):
|
|
|
# 保留CBMC相关行和包含文件
|
|
|
if (line.startswith('#include') or
|
|
|
line.startswith('/*') or
|
|
|
line.startswith('*/') or
|
|
|
line.startswith('@') or
|
|
|
line.startswith(r'\requires') or
|
|
|
line.startswith(r'\ensures') or
|
|
|
line.startswith(r'\assigns') or
|
|
|
line.startswith(r'\valid') or
|
|
|
line.startswith(r'\forall') or
|
|
|
line.startswith(r'\exists') or
|
|
|
line.startswith(r'\invariant') or
|
|
|
line.startswith(r'\behavior') or
|
|
|
line.startswith('int') or
|
|
|
line.startswith('bool') or
|
|
|
line.startswith('size_t') or
|
|
|
'=' in line or
|
|
|
line.endswith(';') or
|
|
|
line.strip()):
|
|
|
cbmc_lines.append(line)
|
|
|
|
|
|
return '\n'.join(cbmc_lines) if cbmc_lines else content.strip()
|
|
|
|
|
|
|
|
|
def create_advanced_test_functions():
|
|
|
"""创建高级测试函数"""
|
|
|
return [
|
|
|
{
|
|
|
'name': 'array_copy_safe',
|
|
|
'return_type': 'bool',
|
|
|
'parameters': [
|
|
|
{'name': 'dest', 'type': 'int*'},
|
|
|
{'name': 'src', 'type': 'const int*'},
|
|
|
{'name': 'dest_size', 'type': 'size_t'},
|
|
|
{'name': 'src_size', 'type': 'size_t'},
|
|
|
{'name': 'copied', 'type': 'size_t*'}
|
|
|
],
|
|
|
'includes': ['#include <stddef.h>', '#include <stdbool.h>'],
|
|
|
'source_code': '''bool array_copy_safe(int *dest, const int *src, size_t dest_size, size_t src_size, size_t *copied) {
|
|
|
if (dest == NULL || src == NULL || copied == NULL) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
if (dest_size == 0 || src_size == 0) {
|
|
|
*copied = 0;
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
size_t copy_size = dest_size < src_size ? dest_size : src_size;
|
|
|
|
|
|
for (size_t i = 0; i < copy_size; i++) {
|
|
|
dest[i] = src[i];
|
|
|
}
|
|
|
|
|
|
*copied = copy_size;
|
|
|
return true;
|
|
|
}''',
|
|
|
'description': '安全的数组拷贝函数,检查所有边界条件',
|
|
|
'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling']
|
|
|
},
|
|
|
{
|
|
|
'name': 'circular_buffer_push',
|
|
|
'return_type': 'bool',
|
|
|
'parameters': [
|
|
|
{'name': 'buffer', 'type': 'int*'},
|
|
|
{'name': 'capacity', 'type': 'size_t'},
|
|
|
{'name': 'head', 'type': 'size_t*'},
|
|
|
{'name': 'tail', 'type': 'size_t*'},
|
|
|
{'name': 'value', 'type': 'int'}
|
|
|
],
|
|
|
'includes': ['#include <stddef.h>', '#include <stdbool.h>'],
|
|
|
'source_code': '''bool circular_buffer_push(int *buffer, size_t capacity, size_t *head, size_t *tail, int value) {
|
|
|
if (buffer == NULL || head == NULL || tail == NULL || capacity == 0) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
size_t next_tail = (*tail + 1) % capacity;
|
|
|
|
|
|
if (next_tail == *head) {
|
|
|
return false; // 缓冲区已满
|
|
|
}
|
|
|
|
|
|
buffer[*tail] = value;
|
|
|
*tail = next_tail;
|
|
|
return true;
|
|
|
}''',
|
|
|
'description': '环形缓冲区推送操作',
|
|
|
'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling', 'concurrency']
|
|
|
},
|
|
|
{
|
|
|
'name': 'thread_safe_counter',
|
|
|
'return_type': 'int',
|
|
|
'parameters': [
|
|
|
{'name': 'counter', 'type': 'volatile int*'},
|
|
|
{'name': 'increment', 'type': 'int'}
|
|
|
],
|
|
|
'includes': ['#include <stdatomic.h>'],
|
|
|
'source_code': '''int thread_safe_counter(volatile int *counter, int increment) {
|
|
|
if (counter == NULL) {
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
int old_value, new_value;
|
|
|
do {
|
|
|
old_value = *counter;
|
|
|
new_value = old_value + increment;
|
|
|
} while (!__atomic_compare_exchange_n(counter, &old_value, new_value,
|
|
|
false, __ATOMIC_ACQ_REL, __ATOMIC_ACQUIRE));
|
|
|
|
|
|
return new_value;
|
|
|
}''',
|
|
|
'description': '线程安全的计数器操作',
|
|
|
'verification_goals': ['functional_correctness', 'concurrency', 'data_race', 'memory_safety']
|
|
|
}
|
|
|
]
|
|
|
|
|
|
|
|
|
def evaluate_specification_quality(spec, verification_goals):
|
|
|
"""评估规约质量"""
|
|
|
score = 0.0
|
|
|
max_score = len(verification_goals) * 2.0
|
|
|
|
|
|
# 检查基本CBMC语法
|
|
|
if r'\requires' in spec:
|
|
|
score += 0.5
|
|
|
if r'\ensures' in spec:
|
|
|
score += 0.5
|
|
|
if r'\assigns' in spec:
|
|
|
score += 0.3
|
|
|
|
|
|
# 检查验证目标覆盖
|
|
|
if 'memory_safety' in verification_goals and r'\valid' in spec:
|
|
|
score += 0.5
|
|
|
if 'array_bounds' in verification_goals and ('size_t' in spec or 'bound' in spec.lower()):
|
|
|
score += 0.5
|
|
|
if 'error_handling' in verification_goals and ('NULL' in spec or 'false' in spec):
|
|
|
score += 0.5
|
|
|
if 'concurrency' in verification_goals and ('atomic' in spec.lower() or 'volatile' in spec):
|
|
|
score += 0.5
|
|
|
|
|
|
# 检查完整性
|
|
|
if '&&' in spec or '||' in spec:
|
|
|
score += 0.3 # 逻辑条件
|
|
|
if '\\result' in spec:
|
|
|
score += 0.3 # 返回值检查
|
|
|
|
|
|
return min(score / max_score, 1.0)
|
|
|
|
|
|
|
|
|
def main():
|
|
|
"""主测试函数"""
|
|
|
print("🚀 DeepSeek v3.1 高级代码规约化测试")
|
|
|
print("=" * 60)
|
|
|
|
|
|
try:
|
|
|
# 创建测试实例
|
|
|
tester = AdvancedDeepSeekTest()
|
|
|
print(f"📡 连接模型: {tester.model}")
|
|
|
|
|
|
# 测试函数
|
|
|
test_functions = create_advanced_test_functions()
|
|
|
results = []
|
|
|
|
|
|
for i, func in enumerate(test_functions, 1):
|
|
|
print(f"\n{'='*25} 高级测试 {i} {'='*25}")
|
|
|
print(f"📝 函数: {func['name']}")
|
|
|
print(f"📖 描述: {func['description']}")
|
|
|
print(f"🎯 目标: {', '.join(func['verification_goals'])}")
|
|
|
|
|
|
print("\n⏳ 正在生成规约...")
|
|
|
result = tester.generate_specification(func, func['verification_goals'])
|
|
|
|
|
|
if result['success']:
|
|
|
print(f"✅ 生成成功!")
|
|
|
print(f"⏱️ 时间: {result['generation_time']:.2f}s")
|
|
|
print(f"🔤 Token: {result['tokens_used']}")
|
|
|
|
|
|
# 评估质量
|
|
|
quality_score = evaluate_specification_quality(
|
|
|
result['specification'],
|
|
|
func['verification_goals']
|
|
|
)
|
|
|
print(f"📊 质量评分: {quality_score:.2f}")
|
|
|
|
|
|
print(f"\n📋 生成的CBMC规约:")
|
|
|
print("-" * 50)
|
|
|
print(result['specification'])
|
|
|
print("-" * 50)
|
|
|
|
|
|
results.append({
|
|
|
'function': func['name'],
|
|
|
'quality': quality_score,
|
|
|
'time': result['generation_time'],
|
|
|
'tokens': result['tokens_used']
|
|
|
})
|
|
|
else:
|
|
|
print(f"❌ 生成失败: {result['error']}")
|
|
|
|
|
|
# 总结报告
|
|
|
print(f"\n{'='*60}")
|
|
|
print("📊 测试总结报告")
|
|
|
print("="*60)
|
|
|
|
|
|
if results:
|
|
|
avg_quality = sum(r['quality'] for r in results) / len(results)
|
|
|
avg_time = sum(r['time'] for r in results) / len(results)
|
|
|
total_tokens = sum(r['tokens'] for r in results)
|
|
|
|
|
|
print(f"📈 总体统计:")
|
|
|
print(f" • 测试函数数: {len(results)}")
|
|
|
print(f" • 平均质量评分: {avg_quality:.2f}")
|
|
|
print(f" • 平均生成时间: {avg_time:.2f}s")
|
|
|
print(f" • 总Token消耗: {total_tokens}")
|
|
|
|
|
|
print(f"\n🏆 详细结果:")
|
|
|
for result in results:
|
|
|
quality_emoji = "🥇" if result['quality'] >= 0.9 else "🥈" if result['quality'] >= 0.7 else "🥉"
|
|
|
print(f" {quality_emoji} {result['function']}: 质量 {result['quality']:.2f}, 时间 {result['time']:.2f}s")
|
|
|
|
|
|
print(f"\n🎉 高级测试完成!")
|
|
|
|
|
|
# 使用建议
|
|
|
print(f"\n💡 使用建议:")
|
|
|
print("1. 复杂函数建议降低temperature参数获得更精确结果")
|
|
|
print("2. 并发相关函数需要更详细的规约描述")
|
|
|
print("3. 建议结合实际CBMC工具验证生成的规约")
|
|
|
print("4. 可以根据具体需求调整提示词模板")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"❌ 测试失败: {str(e)}")
|
|
|
import traceback
|
|
|
traceback.print_exc()
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main() |