You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
273 lines
8.5 KiB
273 lines
8.5 KiB
from flask import Flask, request, jsonify, render_template, send_from_directory
|
|
from flask_cors import CORS
|
|
import os
|
|
import sys
|
|
import json
|
|
import subprocess
|
|
import threading
|
|
import time
|
|
from datetime import datetime
|
|
import uuid
|
|
|
|
# Put the parent directory on sys.path so the SpecGen modules can be imported.
_this_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.append(os.path.dirname(_this_dir))

app = Flask(
    __name__,
    static_folder='../frontend/build',
    static_url_path='',
)
CORS(app)

# Module-level store of task state, keyed by task id.
active_tasks = dict()
|
|
|
|
class SpecGenTask:
    """State record for a single code-verification task.

    Holds the submitted source, the caller-supplied options, and the
    lifecycle fields (status, progress, result, error, timestamps) that
    the rest of the module reads and updates.
    """

    def __init__(self, task_id, input_code, options=None):
        """Create a task in the ``'pending'`` state.

        Args:
            task_id: Identifier of the task.
            input_code: Source text submitted for processing.
            options: Optional settings; any falsy value is replaced by ``{}``.
        """
        self.task_id = task_id
        self.input_code = input_code
        self.options = options if options else {}

        # Lifecycle state.
        self.status = 'pending'
        self.progress = 0
        self.result = None
        self.error = None

        # Timestamps come from datetime.now(); the last two stay None
        # until something else assigns them.
        self.created_at = datetime.now()
        self.started_at = None
        self.completed_at = None

    def to_dict(self):
        """Return a snapshot of the task as a plain dict.

        Timestamps are rendered with ``isoformat()`` (unset ones stay
        ``None``). ``input_code`` is not part of the snapshot.
        """
        started_text = None
        if self.started_at:
            started_text = self.started_at.isoformat()

        completed_text = None
        if self.completed_at:
            completed_text = self.completed_at.isoformat()

        return dict(
            task_id=self.task_id,
            status=self.status,
            progress=self.progress,
            result=self.result,
            error=self.error,
            created_at=self.created_at.isoformat(),
            started_at=started_text,
            completed_at=completed_text,
            options=self.options,
        )
|
|
|
|
@app.route('/')
def index():
    """Serve ``index.html`` from the ``'..'`` directory for the root URL."""
    directory, filename = '..', 'index.html'
    return send_from_directory(directory, filename)
|
|
|
|
@app.route('/api/tasks', methods=['POST'])
def create_task():
    """Create a new code-verification task and start processing it.

    Expects a JSON object body with:
        input_code (str): the source to process; must not be blank.
        options (optional): settings stored on the task as given.

    Returns:
        201 with ``{'task_id': ..., 'status': 'pending'}`` on success.
        400 when the body is not a JSON object, or ``input_code`` is not a
        string or is blank.
        500 when creating or starting the task fails unexpectedly.
    """
    # silent=True makes get_json() return None for a missing or malformed
    # JSON body instead of raising, so bad requests are answered with a
    # 400 here rather than falling into the generic 500 handler.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求体必须是JSON对象'}), 400

    input_code = data.get('input_code', '')
    if not isinstance(input_code, str):
        return jsonify({'error': '输入代码必须是字符串'}), 400
    if not input_code.strip():
        return jsonify({'error': '输入代码不能为空'}), 400

    options = data.get('options', {})

    try:
        task_id = str(uuid.uuid4())
        task = SpecGenTask(task_id, input_code, options)
        active_tasks[task_id] = task

        # Process the task on a background daemon thread so the request
        # returns immediately.
        thread = threading.Thread(target=process_task, args=(task,))
        thread.daemon = True
        thread.start()

        return jsonify({'task_id': task_id, 'status': 'pending'}), 201

    except Exception as e:
        return jsonify({'error': f'创建任务失败: {str(e)}'}), 500
|
|
|
|
@app.route('/api/tasks/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Return the status snapshot of one task, or a 404 error if unknown."""
    if task_id in active_tasks:
        found = active_tasks[task_id]
        return jsonify(found.to_dict())

    return jsonify({'error': '任务不存在'}), 404
|
|
|
|
@app.route('/api/tasks/<task_id>/result', methods=['GET'])
def get_task_result(task_id):
    """Return the detailed result of a completed task.

    Responds with 404 when the task id is unknown and 400 when the task's
    status is anything other than ``'completed'``.
    """
    if task_id not in active_tasks:
        return jsonify({'error': '任务不存在'}), 404

    job = active_tasks[task_id]
    if job.status == 'completed':
        outcome = job.result
        payload = {'task_id': task_id, 'input_code': job.input_code}
        # These fields are copied straight from the result; absent ones
        # come back as None.
        for field in ('generated_code', 'verification_result',
                      'summary', 'analysis_report'):
            payload[field] = outcome.get(field)
        payload['analysis_generated'] = outcome.get('analysis_generated', False)
        payload['analysis_error'] = outcome.get('analysis_error')
        payload['options'] = job.options
        return jsonify(payload)

    return jsonify({'error': '任务尚未完成'}), 400
|
|
|
|
@app.route('/api/tasks/<task_id>/analysis', methods=['GET'])
def get_task_analysis(task_id):
    """Return only the analysis-report fields of a completed task.

    Responds with 404 when the task id is unknown and 400 when the task's
    status is anything other than ``'completed'``.
    """
    if task_id not in active_tasks:
        return jsonify({'error': '任务不存在'}), 404

    job = active_tasks[task_id]
    if job.status == 'completed':
        outcome = job.result
        report = outcome.get('analysis_report')
        generated = outcome.get('analysis_generated', False)
        failure = outcome.get('analysis_error')
        return jsonify({
            'task_id': task_id,
            'analysis_report': report,
            'analysis_generated': generated,
            'analysis_error': failure,
        })

    return jsonify({'error': '任务尚未完成'}), 400
|
|
|
|
@app.route('/api/tasks', methods=['GET'])
def list_tasks():
    """Return every known task as a JSON array, newest ``created_at`` first."""
    snapshots = []
    for entry in active_tasks.values():
        snapshots.append(entry.to_dict())

    # 'created_at' is the string produced by to_dict(), so this orders the
    # snapshots by comparing those strings, largest first.
    snapshots.sort(key=lambda item: item['created_at'], reverse=True)
    return jsonify(snapshots)
|
|
|
|
def process_task(task):
    """Run the demo generation pipeline for ``task`` in the background.

    Progress and outcome are reported by mutating ``task`` in place:
    ``status`` becomes ``'running'`` and then ``'completed'`` or
    ``'failed'``; ``result`` receives the generator's result dict, extended
    with ``analysis_report`` / ``analysis_generated`` (and
    ``analysis_error`` when the report could not be produced); ``error``
    is set on failure.

    The scratch directory ``temp/<task_id>`` is removed on every exit
    path, including failures.

    Args:
        task: Object exposing the attributes used here (``task_id``,
            ``input_code``, ``status``, ``progress``, ``result``,
            ``error``, ``started_at``, ``completed_at``).
    """
    import shutil

    temp_dir = f'temp/{task.task_id}'
    try:
        task.status = 'running'
        task.started_at = datetime.now()
        task.progress = 10

        # Write the submitted code into the task's scratch directory.
        os.makedirs(temp_dir, exist_ok=True)
        input_file = os.path.join(temp_dir, 'input.c')
        with open(input_file, 'w', encoding='utf-8') as f:
            f.write(task.input_code)

        task.progress = 20

        # Use the demo version of the code generator.
        try:
            from demo_code import generate_demo_result
            basic_result = generate_demo_result(task.input_code)

            task.progress = 60

            # Generate the analysis report. A failure here is recorded on
            # the result and does not fail the task.
            try:
                from simple_analysis import generate_demo_analysis_report
                analysis_result = generate_demo_analysis_report(
                    task.input_code,
                    basic_result['generated_code'],
                    basic_result['verification_result']['output'],
                )

                if analysis_result['success']:
                    # Attach the report to the result.
                    basic_result['analysis_report'] = analysis_result['report']
                    basic_result['analysis_generated'] = True
                else:
                    basic_result['analysis_report'] = None
                    basic_result['analysis_generated'] = False
                    basic_result['analysis_error'] = analysis_result.get('error', '未知错误')

            except Exception as e:
                basic_result['analysis_report'] = None
                basic_result['analysis_generated'] = False
                basic_result['analysis_error'] = f'分析报告生成失败: {str(e)}'

            # Publish the result before flipping the status so readers that
            # see 'completed' always find a result.
            task.result = basic_result
            task.progress = 100
            task.status = 'completed'
            task.completed_at = datetime.now()

        except Exception as e:
            task.status = 'failed'
            task.error = f'演示代码生成失败: {str(e)}'
            task.completed_at = datetime.now()

    except subprocess.TimeoutExpired:
        task.status = 'failed'
        task.error = '任务执行超时'
        task.completed_at = datetime.now()
    except Exception as e:
        task.status = 'failed'
        task.error = f'任务执行失败: {str(e)}'
        task.completed_at = datetime.now()
    finally:
        # Best-effort cleanup on success and failure alike; a directory
        # that is missing or cannot be removed must not change the outcome.
        shutil.rmtree(temp_dir, ignore_errors=True)
|
|
|
|
@app.route('/api/demo/sample', methods=['GET'])
def get_demo_sample():
    """Return a sample C program as JSON under the ``sample_code`` key."""
    sample_code = '''#include <stdio.h>

/**
 * Simple function to calculate the sum of an array
 */
int array_sum(int* arr, int size) {
    int sum = 0;
    for (int i = 0; i < size; i++) {
        sum += arr[i];
    }
    return sum;
}

/**
 * Function to find the maximum value in an array
 */
int find_max(int* arr, int size) {
    if (size <= 0) {
        return 0;
    }

    int max = arr[0];
    for (int i = 1; i < size; i++) {
        if (arr[i] > max) {
            max = arr[i];
        }
    }
    return max;
}

int main() {
    int test_array[] = {1, 2, 3, 4, 5};
    int size = 5;

    int sum = array_sum(test_array, size);
    int max = find_max(test_array, size);

    printf("Sum: %d, Max: %d\\n", sum, max);

    return 0;
}'''

    return jsonify({'sample_code': sample_code})
|
|
|
|
@app.route('/api/system/info', methods=['GET'])
def get_system_info():
    """Report tool versions and task counts.

    Returns:
        JSON with ``cbmc_version`` (``'Unknown'`` when ``cbmc`` cannot be
        run, exits non-zero, or does not answer within the timeout),
        ``specgen_version``, and the number of tasks whose status is
        ``'running'``, ``'completed'`` and ``'failed'``.
        500 with ``{'error': ...}`` if anything else fails.
    """
    try:
        # Probe the CBMC version. A missing binary raises OSError and a hung
        # one raises TimeoutExpired (a SubprocessError); both are reported
        # as 'Unknown' instead of failing the whole endpoint.
        try:
            cbmc_result = subprocess.run(
                ['cbmc', '--version'],
                capture_output=True,
                text=True,
                timeout=10,
            )
        except (OSError, subprocess.SubprocessError):
            cbmc_version = 'Unknown'
        else:
            if cbmc_result.returncode == 0:
                cbmc_version = cbmc_result.stdout.strip()
            else:
                cbmc_version = 'Unknown'

        # Tally the three reported statuses in a single pass.
        status_counts = {'running': 0, 'completed': 0, 'failed': 0}
        for t in active_tasks.values():
            if t.status in status_counts:
                status_counts[t.status] += 1

        return jsonify({
            'cbmc_version': cbmc_version,
            'specgen_version': '2.0',
            'active_tasks': status_counts['running'],
            'completed_tasks': status_counts['completed'],
            'failed_tasks': status_counts['failed'],
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500
|
|
|
|
if __name__ == '__main__':
    os.makedirs('temp', exist_ok=True)

    # The server listens on all interfaces, and Flask's debug mode enables
    # the interactive debugger, which can execute arbitrary code. Debug is
    # therefore off unless explicitly requested, e.g. FLASK_DEBUG=1.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').strip().lower() in (
        '1', 'true', 'yes', 'on',
    )
    app.run(debug=debug_enabled, host='0.0.0.0', port=8080)