|
|
#!/usr/bin/env python3
|
|
|
"""
|
|
|
简化的DeepSeek v3.1代码规约化测试脚本
|
|
|
|
|
|
直接使用aiohttp调用DeepSeek API,无需复杂的依赖
|
|
|
"""
|
|
|
|
|
|
import os
|
|
|
import asyncio
|
|
|
import aiohttp
|
|
|
import json
|
|
|
import time
|
|
|
|
|
|
|
|
|
class SimpleDeepSeekTest:
|
|
|
"""简化的DeepSeek测试类"""
|
|
|
|
|
|
def __init__(self, api_key=None, base_url=None):
|
|
|
self.api_key = api_key or os.getenv('SILICONFLOW_API_KEY')
|
|
|
self.base_url = base_url or 'https://api.siliconflow.cn/v1'
|
|
|
self.model = 'deepseek-ai/DeepSeek-V3.1'
|
|
|
|
|
|
if not self.api_key:
|
|
|
raise ValueError("需要设置 SILICONFLOW_API_KEY 环境变量")
|
|
|
|
|
|
async def generate_specification(self, function_info, verification_goals):
|
|
|
"""生成形式化规约"""
|
|
|
# 构建提示词
|
|
|
prompt = self._build_prompt(function_info, verification_goals)
|
|
|
|
|
|
# 准备请求数据
|
|
|
messages = [
|
|
|
{
|
|
|
"role": "system",
|
|
|
"content": "你是一个专业的形式化验证专家,专门为C/C++代码生成CBMC格式的形式化规约。请生成准确、完整的CBMC规约,只包含CBMC条款,不要包含其他解释。"
|
|
|
},
|
|
|
{
|
|
|
"role": "user",
|
|
|
"content": prompt
|
|
|
}
|
|
|
]
|
|
|
|
|
|
data = {
|
|
|
"model": self.model,
|
|
|
"messages": messages,
|
|
|
"temperature": 0.3,
|
|
|
"max_tokens": 1024,
|
|
|
"stream": False
|
|
|
}
|
|
|
|
|
|
headers = {
|
|
|
"Authorization": f"Bearer {self.api_key}",
|
|
|
"Content-Type": "application/json"
|
|
|
}
|
|
|
|
|
|
start_time = time.time()
|
|
|
|
|
|
try:
|
|
|
async with aiohttp.ClientSession() as session:
|
|
|
async with session.post(
|
|
|
f"{self.base_url}/chat/completions",
|
|
|
headers=headers,
|
|
|
json=data,
|
|
|
timeout=aiohttp.ClientTimeout(total=60)
|
|
|
) as response:
|
|
|
|
|
|
if response.status == 200:
|
|
|
result = await response.json()
|
|
|
content = result['choices'][0]['message']['content']
|
|
|
tokens_used = result.get('usage', {}).get('total_tokens', 0)
|
|
|
|
|
|
# 后处理:提取CBMC规约
|
|
|
specification = self._extract_cbmc_spec(content)
|
|
|
|
|
|
generation_time = time.time() - start_time
|
|
|
|
|
|
return {
|
|
|
'specification': specification,
|
|
|
'raw_content': content,
|
|
|
'generation_time': generation_time,
|
|
|
'tokens_used': tokens_used,
|
|
|
'success': True
|
|
|
}
|
|
|
else:
|
|
|
error_text = await response.text()
|
|
|
return {
|
|
|
'success': False,
|
|
|
'error': f"API错误: {response.status} - {error_text}"
|
|
|
}
|
|
|
|
|
|
except Exception as e:
|
|
|
return {
|
|
|
'success': False,
|
|
|
'error': f"请求失败: {str(e)}"
|
|
|
}
|
|
|
|
|
|
def _build_prompt(self, function_info, verification_goals):
|
|
|
"""构建提示词"""
|
|
|
func_name = function_info.get('name', 'unknown')
|
|
|
return_type = function_info.get('return_type', 'void')
|
|
|
parameters = function_info.get('parameters', [])
|
|
|
source_code = function_info.get('source_code', '')
|
|
|
|
|
|
# 构建函数签名
|
|
|
param_str = ', '.join([f"{p['type']} {p['name']}" for p in parameters])
|
|
|
function_signature = f"{return_type} {func_name}({param_str})"
|
|
|
|
|
|
# 构建验证目标描述
|
|
|
goal_descriptions = {
|
|
|
'functional_correctness': '功能正确性验证',
|
|
|
'memory_safety': '内存安全性验证',
|
|
|
'pointer_validity': '指针有效性验证',
|
|
|
'integer_overflow': '整数溢出检查',
|
|
|
'array_bounds': '数组边界检查',
|
|
|
'error_handling': '错误处理验证'
|
|
|
}
|
|
|
|
|
|
goals_text = []
|
|
|
for goal in verification_goals:
|
|
|
description = goal_descriptions.get(goal, goal)
|
|
|
goals_text.append(f"- {description}")
|
|
|
|
|
|
prompt = f"""
|
|
|
请为以下C函数生成CBMC形式化验证规约:
|
|
|
|
|
|
函数签名:{function_signature}
|
|
|
|
|
|
源代码:
|
|
|
```c
|
|
|
{source_code}
|
|
|
```
|
|
|
|
|
|
验证目标:
|
|
|
{chr(10).join(goals_text)}
|
|
|
|
|
|
请生成完整的CBMC规约,包含:
|
|
|
1. \\requires 子句:前置条件
|
|
|
2. \\ensures 子句:后置条件
|
|
|
3. \\assigns 子句:赋值说明(如果需要)
|
|
|
|
|
|
只输出CBMC规约,不要包含其他解释。
|
|
|
"""
|
|
|
return prompt
|
|
|
|
|
|
def _extract_cbmc_spec(self, content):
|
|
|
"""从生成内容中提取CBMC规约"""
|
|
|
lines = content.split('\n')
|
|
|
cbmc_lines = []
|
|
|
|
|
|
for line in lines:
|
|
|
line = line.strip()
|
|
|
# 保留CBMC条款
|
|
|
if (line.startswith(r'\requires') or
|
|
|
line.startswith(r'\ensures') or
|
|
|
line.startswith(r'\assigns') or
|
|
|
line.startswith(r'\valid') or
|
|
|
line.startswith(r'\forall') or
|
|
|
line.startswith(r'\exists')):
|
|
|
cbmc_lines.append(line)
|
|
|
|
|
|
return '\n'.join(cbmc_lines) if cbmc_lines else content.strip()
|
|
|
|
|
|
async def health_check(self):
|
|
|
"""健康检查"""
|
|
|
try:
|
|
|
async with aiohttp.ClientSession() as session:
|
|
|
async with session.post(
|
|
|
f"{self.base_url}/chat/completions",
|
|
|
headers={
|
|
|
"Authorization": f"Bearer {self.api_key}",
|
|
|
"Content-Type": "application/json"
|
|
|
},
|
|
|
json={
|
|
|
"model": self.model,
|
|
|
"messages": [{"role": "user", "content": "test"}],
|
|
|
"max_tokens": 5
|
|
|
},
|
|
|
timeout=aiohttp.ClientTimeout(total=10)
|
|
|
) as response:
|
|
|
|
|
|
return {
|
|
|
'status': 'healthy' if response.status == 200 else 'unhealthy',
|
|
|
'response_time': 0.0, # 简化版本
|
|
|
'status_code': response.status
|
|
|
}
|
|
|
except Exception as e:
|
|
|
return {
|
|
|
'status': 'unhealthy',
|
|
|
'error': str(e)
|
|
|
}
|
|
|
|
|
|
|
|
|
def create_test_functions():
|
|
|
"""创建测试函数"""
|
|
|
return [
|
|
|
{
|
|
|
'name': 'add_numbers',
|
|
|
'return_type': 'int',
|
|
|
'parameters': [
|
|
|
{'name': 'a', 'type': 'int'},
|
|
|
{'name': 'b', 'type': 'int'}
|
|
|
],
|
|
|
'source_code': 'int add_numbers(int a, int b) { return a + b; }',
|
|
|
'verification_goals': ['functional_correctness']
|
|
|
},
|
|
|
{
|
|
|
'name': 'safe_divide',
|
|
|
'return_type': 'bool',
|
|
|
'parameters': [
|
|
|
{'name': 'a', 'type': 'int'},
|
|
|
{'name': 'b', 'type': 'int'},
|
|
|
{'name': 'result', 'type': 'int*'}
|
|
|
],
|
|
|
'source_code': '''
|
|
|
bool safe_divide(int a, int b, int *result) {
|
|
|
if (b == 0 || result == NULL) {
|
|
|
return false;
|
|
|
}
|
|
|
*result = a / b;
|
|
|
return true;
|
|
|
}''',
|
|
|
'verification_goals': ['functional_correctness', 'memory_safety', 'error_handling']
|
|
|
}
|
|
|
]
|
|
|
|
|
|
|
|
|
async def main():
|
|
|
"""主测试函数"""
|
|
|
print("🚀 DeepSeek v3.1 代码规约化简化测试")
|
|
|
print("=" * 50)
|
|
|
|
|
|
# 检查API密钥
|
|
|
if not os.getenv('SILICONFLOW_API_KEY'):
|
|
|
print("❌ 错误: 未设置 SILICONFLOW_API_KEY 环境变量")
|
|
|
print("请设置: export SILICONFLOW_API_KEY=your_api_key")
|
|
|
return
|
|
|
|
|
|
try:
|
|
|
# 创建测试实例
|
|
|
tester = SimpleDeepSeekTest()
|
|
|
print(f"📡 连接模型: {tester.model}")
|
|
|
|
|
|
# 健康检查
|
|
|
print("\n🔍 检查API状态...")
|
|
|
health = await tester.health_check()
|
|
|
if health['status'] == 'healthy':
|
|
|
print("✅ API健康")
|
|
|
else:
|
|
|
print(f"❌ API不健康: {health.get('error', '未知错误')}")
|
|
|
return
|
|
|
|
|
|
# 测试函数
|
|
|
test_functions = create_test_functions()
|
|
|
|
|
|
for i, func in enumerate(test_functions, 1):
|
|
|
print(f"\n{'='*20} 测试 {i} {'='*20}")
|
|
|
print(f"📝 函数: {func['name']}")
|
|
|
print(f"🎯 目标: {', '.join(func['verification_goals'])}")
|
|
|
|
|
|
print("\n⏳ 正在生成规约...")
|
|
|
result = await tester.generate_specification(func, func['verification_goals'])
|
|
|
|
|
|
if result['success']:
|
|
|
print(f"✅ 生成成功!")
|
|
|
print(f"⏱️ 时间: {result['generation_time']:.2f}s")
|
|
|
print(f"🔤 Token: {result['tokens_used']}")
|
|
|
|
|
|
print(f"\n📋 生成的CBMC规约:")
|
|
|
print("-" * 40)
|
|
|
print(result['specification'])
|
|
|
print("-" * 40)
|
|
|
|
|
|
print(f"\n📄 原始输出:")
|
|
|
print("-" * 40)
|
|
|
print(result['raw_content'])
|
|
|
print("-" * 40)
|
|
|
else:
|
|
|
print(f"❌ 生成失败: {result['error']}")
|
|
|
|
|
|
print(f"\n🎉 测试完成!")
|
|
|
|
|
|
except Exception as e:
|
|
|
print(f"❌ 测试失败: {str(e)}")
|
|
|
import traceback
|
|
|
traceback.print_exc()
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
asyncio.run(main()) |