You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
cbmc/codedetect/tests/deepseek/scripts/final_deepseek_demo.py

282 lines
9.3 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python3
"""
DeepSeek v3.1 最终演示脚本
展示实际API调用的完整功能
"""
import os
import requests
import json
import time
from datetime import datetime
class FinalDeepSeekDemo:
    """Demo driver that asks DeepSeek v3.1 for CBMC-style specifications.

    The class sends three sample C functions to the chat-completions
    endpoint under ``base_url``, prints each reply together with timing and
    token usage, prints a summary report and stores the results as JSON.

    The API key is read from the ``SILICONFLOW_API_KEY`` environment variable.
    """

    # File the JSON report is written to unless save_results() is told otherwise.
    RESULTS_FILE = 'deepseek_demo_results.json'
    # A reply line is kept as a specification clause when it starts with one of these.
    SPEC_PREFIXES = ('\\requires', '\\ensures', '\\assigns', '\\valid')
    # Seconds to wait for the HTTP request before giving up.
    REQUEST_TIMEOUT = 45

    def __init__(self):
        """Read the API key and set the endpoint and model name.

        Raises:
            ValueError: if ``SILICONFLOW_API_KEY`` is unset or empty.
        """
        self.api_key = os.getenv('SILICONFLOW_API_KEY')
        self.base_url = 'https://api.siliconflow.cn/v1'
        self.model = 'deepseek-ai/DeepSeek-V3.1'
        if not self.api_key:
            raise ValueError("需要设置 SILICONFLOW_API_KEY 环境变量")

    def _masked_api_key(self):
        """Return a display-safe form of the API key (short prefix only)."""
        key = self.api_key or ''
        # Very short keys would be revealed almost entirely by a prefix.
        if len(key) <= 8:
            return '***'
        return f"{key[:4]}..."

    @staticmethod
    def _build_messages(function_info, verification_goals):
        """Build the system/user chat messages for one function.

        Args:
            function_info: mapping with ``name``, ``description`` and
                ``source_code`` keys.
            verification_goals: iterable of goal names, joined with ``', '``.

        Returns:
            A two-element list of ``{"role", "content"}`` dicts.

        Raises:
            KeyError: if ``function_info`` lacks one of the required keys.
        """
        user_prompt = f"""函数:{function_info['name']}
{function_info['description']}
源代码:
{function_info['source_code']}
验证目标:{', '.join(verification_goals)}
生成CBMC规约
\\requires ...
\\ensures ...
\\assigns ..."""
        return [
            {
                "role": "system",
                "content": "你是CBMC形式化验证专家为C/C++代码生成准确的CBMC规约。只输出CBMC代码不要解释。"
            },
            {
                "role": "user",
                "content": user_prompt
            }
        ]

    @classmethod
    def _extract_specification(cls, content):
        """Keep only the specification clauses of a model reply.

        Each line is stripped; lines starting with one of ``SPEC_PREFIXES``
        are kept and joined with newlines. When no line matches, the whole
        reply (stripped) is returned instead.
        """
        clauses = [
            stripped
            for stripped in (line.strip() for line in content.split('\n'))
            if stripped.startswith(cls.SPEC_PREFIXES)
        ]
        return '\n'.join(clauses) if clauses else content.strip()

    def generate_specification(self, function_info, verification_goals):
        """Request a specification for one function from the API.

        Args:
            function_info: mapping with ``name``, ``description`` and
                ``source_code`` keys.
            verification_goals: iterable of goal names for the prompt.

        Returns:
            On success a dict with ``success=True``, ``specification``,
            ``raw_content``, ``generation_time`` (seconds, measured from just
            before the HTTP request until the reply has been parsed) and
            ``tokens_used``. On failure a dict with ``success=False`` and an
            ``error`` message.

        Raises:
            KeyError: if ``function_info`` lacks one of the required keys.
        """
        payload = {
            "model": self.model,
            "messages": self._build_messages(function_info, verification_goals),
            "temperature": 0.1,
            "max_tokens": 1024,
            "stream": False
        }
        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        # Start the clock BEFORE the request so the figure reflects the API
        # round trip, not just the local post-processing.
        started = time.perf_counter()
        try:
            response = requests.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload,
                timeout=self.REQUEST_TIMEOUT
            )
        except requests.RequestException as exc:
            return {'success': False, 'error': f'请求失败: {str(exc)}'}

        if response.status_code != 200:
            return {'success': False, 'error': f'API错误: {response.status_code}'}

        try:
            result = response.json()
            content = result['choices'][0]['message']['content']
            tokens_used = result.get('usage', {}).get('total_tokens', 0)
            specification = self._extract_specification(content)
        except (ValueError, KeyError, IndexError, TypeError, AttributeError) as exc:
            # Body was not JSON or did not have the expected shape.
            return {'success': False, 'error': f'请求失败: {str(exc)}'}

        return {
            'specification': specification,
            'raw_content': content,
            'generation_time': time.perf_counter() - started,
            'tokens_used': tokens_used,
            'success': True
        }

    @staticmethod
    def _demo_functions():
        """Return the sample C functions used by the demo."""
        return [
            {
                'name': 'basic_max',
                'description': '求最大值函数',
                'source_code': 'int basic_max(int a, int b) { return a > b ? a : b; }',
                'verification_goals': ['functional_correctness']
            },
            {
                'name': 'safe_string_copy',
                'description': '安全字符串拷贝',
                'source_code': '''size_t safe_string_copy(char* dest, const char* src, size_t dest_size) {
if (dest == NULL || src == NULL || dest_size == 0) return 0;
size_t i;
for (i = 0; i < dest_size - 1 && src[i] != '\\0'; i++) {
dest[i] = src[i];
}
dest[i] = '\\0';
return i;
}''',
                'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds', 'error_handling']
            },
            {
                'name': 'bubble_sort',
                'description': '冒泡排序算法',
                'source_code': '''void bubble_sort(int* array, size_t size) {
if (array == NULL || size <= 1) return;
for (size_t i = 0; i < size - 1; i++) {
for (size_t j = 0; j < size - i - 1; j++) {
if (array[j] > array[j + 1]) {
int temp = array[j];
array[j] = array[j + 1];
array[j + 1] = temp;
}
}
}
}''',
                'verification_goals': ['functional_correctness', 'memory_safety', 'array_bounds']
            }
        ]

    @staticmethod
    def _print_summary(results):
        """Print aggregate statistics and one status line per function."""
        succeeded = [r for r in results if r['success']]
        total = len(results)
        success_rate = len(succeeded) / total * 100 if total else 0.0

        print(f"\n{'='*55}")
        print("📊 演示总结报告")
        print("=" * 55)
        print("📈 统计信息:")
        print(f" • 测试函数数: {total}")
        print(f" • 成功调用: {len(succeeded)}")
        print(f" • 成功率: {success_rate:.1f}%")
        if results:
            avg_time = sum(r['time'] for r in succeeded) / len(succeeded) if succeeded else 0
            total_tokens = sum(r['tokens'] for r in succeeded)
            print(f" • 平均响应时间: {avg_time:.2f}s")
            print(f" • 总Token消耗: {total_tokens}")

        print("\n🏆 详细结果:")
        for entry in results:
            if entry['success']:
                status = "✅"
                details = f"{entry['time']:.1f}s, {entry['tokens']} tokens"
            else:
                status = "❌"
                details = entry['error']
            print(f" {status} {entry['function']}: {details}")

    @staticmethod
    def _print_feature_overview():
        """Print the static feature/value overview shown after the summary."""
        print("\n🔧 DeepSeek v3.1 代码规约化功能:")
        print(" ✅ 自动CBMC规约生成")
        print(" ✅ 多维度验证目标支持")
        print(" ✅ 复杂度自适应")
        print(" ✅ 实时API调用")
        print(" ✅ 质量评估")
        print(" ✅ 错误处理")
        print("\n🎯 生成的规约特征:")
        print("\\requires: 前置条件和约束")
        print("\\ensures: 后置条件和保证")
        print("\\assigns: 内存修改说明")
        print("\\valid: 有效性检查")
        print(" • 逻辑表达式: 复杂条件验证")
        print("\n💡 实际应用价值:")
        print(" 🛡️ 形式化验证自动化")
        print(" 🐛 缺陷检测和预防")
        print(" ⚡ 代码质量提升")
        print(" 📊 验证覆盖率分析")
        print(" 🔧 开发效率优化")

    def run_demo(self):
        """Run the demo: query every sample function, report, and save."""
        print("🚀 DeepSeek v3.1 代码规约化 - 实际API演示")
        print("=" * 55)
        print(f"📅 时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(f"🤖 模型: {self.model}")
        # Never echo the credential itself; a short prefix is enough to
        # tell which key is in use.
        print(f"🔑 API密钥: {self._masked_api_key()}")
        print("=" * 55)

        demo_functions = self._demo_functions()
        total = len(demo_functions)
        results = []
        for i, func in enumerate(demo_functions, 1):
            print(f"\n{'='*25} 演示 {i}/{total} {'='*25}")
            print(f"📝 函数: {func['name']}")
            print(f"📖 描述: {func['description']}")
            print(f"🎯 目标: {', '.join(func['verification_goals'])}")
            print("\n⏳ 正在调用DeepSeek v3.1 API...")

            result = self.generate_specification(func, func['verification_goals'])
            if not result['success']:
                print(f"❌ API调用失败: {result['error']}")
                results.append({
                    'function': func['name'],
                    'success': False,
                    'error': result['error']
                })
                continue

            print("✅ API调用成功!")
            print(f"⏱️ 响应时间: {result['generation_time']:.2f}s")
            print(f"🔤 消耗Token: {result['tokens_used']}")
            print("\n📋 生成的CBMC规约:")
            print("-" * 45)
            print(result['specification'])
            print("-" * 45)
            print("\n📄 完整API响应:")
            print("-" * 30)
            print(result['raw_content'])
            print("-" * 30)
            results.append({
                'function': func['name'],
                'success': True,
                'time': result['generation_time'],
                'tokens': result['tokens_used'],
                'specification': result['specification']
            })

        self._print_summary(results)
        self._print_feature_overview()

        self.save_results(results)
        print(f"\n🎉 演示完成! 结果已保存到: {self.RESULTS_FILE}")

    def save_results(self, results, output_path=None):
        """Write the demo results as UTF-8 JSON.

        Args:
            results: list of per-function result dicts (must be JSON
                serializable).
            output_path: destination file; defaults to ``RESULTS_FILE`` in
                the current working directory.
        """
        demo_data = {
            'demo_info': {
                'model': self.model,
                'timestamp': datetime.now().isoformat(),
                'total_functions': len(results)
            },
            'results': results
        }
        target = self.RESULTS_FILE if output_path is None else output_path
        with open(target, 'w', encoding='utf-8') as f:
            json.dump(demo_data, f, ensure_ascii=False, indent=2)
def main():
    """Run the demo and report the outcome as a process exit status.

    Returns:
        int: 0 when the demo ran to completion; 1 when an exception escaped
        (its message and traceback are printed before returning).
    """
    try:
        demo = FinalDeepSeekDemo()
        demo.run_demo()
    except Exception as e:  # top-level boundary: report, then signal failure
        print(f"❌ 演示失败: {str(e)}")
        import traceback
        traceback.print_exc()
        return 1
    return 0


if __name__ == "__main__":
    # Propagate main()'s status so shells and CI can detect a failed demo.
    raise SystemExit(main())