"""Flask backend exposing a small REST API for SpecGen code-verification tasks.

Tasks are kept in an in-process registry (``active_tasks``) and processed on
daemon threads; nothing is persisted across restarts.
"""
import json
import os
import shutil
import subprocess
import sys
import threading
import time
import uuid
from datetime import datetime

from flask import Flask, jsonify, render_template, request, send_from_directory
from flask_cors import CORS

# Add the parent directory to the import path so SpecGen modules can be imported.
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

app = Flask(__name__, static_folder='../frontend/build', static_url_path='')
CORS(app)

# Global registry of task state, keyed by task id.
active_tasks = {}


class SpecGenTask:
    """State of a single verification task tracked by this server."""

    def __init__(self, task_id, input_code, options=None):
        """Create a task in the ``'pending'`` state.

        Args:
            task_id: Unique identifier used as the key in ``active_tasks``.
            input_code: Source code submitted by the client.
            options: Optional client-supplied options; defaults to ``{}``.
        """
        self.task_id = task_id
        self.input_code = input_code
        self.options = options or {}
        self.status = 'pending'
        self.progress = 0
        self.result = None
        self.error = None
        self.created_at = datetime.now()
        self.started_at = None
        self.completed_at = None

    def to_dict(self):
        """Return a JSON-serialisable summary (timestamps as ISO-8601 or None)."""
        return {
            'task_id': self.task_id,
            'status': self.status,
            'progress': self.progress,
            'result': self.result,
            'error': self.error,
            'created_at': self.created_at.isoformat(),
            'started_at': self.started_at.isoformat() if self.started_at else None,
            'completed_at': self.completed_at.isoformat() if self.completed_at else None,
            'options': self.options,
        }


def _lookup_completed_task(task_id):
    """Return ``(task, None)`` for a completed task, else ``(None, error_response)``.

    The error response is a ``(json, status)`` tuple: 404 when the id is
    unknown, 400 when the task exists but has not completed.
    """
    task = active_tasks.get(task_id)
    if task is None:
        return None, (jsonify({'error': '任务不存在'}), 404)
    if task.status != 'completed':
        return None, (jsonify({'error': '任务尚未完成'}), 400)
    return task, None


@app.route('/')
def index():
    """Serve ``index.html`` from the parent directory."""
    return send_from_directory('..', 'index.html')


@app.route('/api/tasks', methods=['POST'])
def create_task():
    """Create a new code verification task and start it on a background thread.

    Expects a JSON object with ``input_code`` (non-blank string) and optional
    ``options``. Returns 201 with the new task id, 400 when the input code is
    missing/blank, or 500 on an unexpected failure.
    """
    try:
        # silent=True yields None instead of raising on a missing/invalid JSON
        # body, so malformed requests become a 400 below rather than a 500.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            data = {}

        input_code = data.get('input_code', '')
        options = data.get('options', {})

        if not isinstance(input_code, str) or not input_code.strip():
            return jsonify({'error': '输入代码不能为空'}), 400

        task_id = str(uuid.uuid4())
        task = SpecGenTask(task_id, input_code, options)
        active_tasks[task_id] = task

        # Process the task in the background so the request returns immediately.
        thread = threading.Thread(target=process_task, args=(task,))
        thread.daemon = True
        thread.start()

        return jsonify({'task_id': task_id, 'status': 'pending'}), 201
    except Exception as e:
        return jsonify({'error': f'创建任务失败: {str(e)}'}), 500


@app.route('/api/tasks/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Return the status summary of one task, or 404 if the id is unknown."""
    task = active_tasks.get(task_id)
    if task is None:
        return jsonify({'error': '任务不存在'}), 404
    return jsonify(task.to_dict())


@app.route('/api/tasks/<task_id>/result', methods=['GET'])
def get_task_result(task_id):
    """Return the detailed result of a completed task (404 unknown, 400 unfinished)."""
    task, error_response = _lookup_completed_task(task_id)
    if error_response is not None:
        return error_response

    result = task.result
    return jsonify({
        'task_id': task_id,
        'input_code': task.input_code,
        'generated_code': result.get('generated_code'),
        'verification_result': result.get('verification_result'),
        'summary': result.get('summary'),
        'analysis_report': result.get('analysis_report'),
        'analysis_generated': result.get('analysis_generated', False),
        'analysis_error': result.get('analysis_error'),
        'options': task.options,
    })


@app.route('/api/tasks/<task_id>/analysis', methods=['GET'])
def get_task_analysis(task_id):
    """Return the analysis report of a completed task (404 unknown, 400 unfinished)."""
    task, error_response = _lookup_completed_task(task_id)
    if error_response is not None:
        return error_response

    result = task.result
    return jsonify({
        'task_id': task_id,
        'analysis_report': result.get('analysis_report'),
        'analysis_generated': result.get('analysis_generated', False),
        'analysis_error': result.get('analysis_error'),
    })


@app.route('/api/tasks', methods=['GET'])
def list_tasks():
    """Return all known tasks, newest first (ordered by ``created_at``)."""
    tasks = [task.to_dict() for task in list(active_tasks.values())]
    tasks.sort(key=lambda item: item['created_at'], reverse=True)
    return jsonify(tasks)


def _attach_analysis(input_code, basic_result):
    """Add the analysis-report fields to ``basic_result`` in place.

    Any failure while importing or running the analysis helper is recorded in
    ``analysis_error`` instead of being raised, so a broken analysis step does
    not fail the whole task.
    """
    try:
        from simple_analysis import generate_demo_analysis_report

        analysis_result = generate_demo_analysis_report(
            input_code,
            basic_result['generated_code'],
            basic_result['verification_result']['output'],
        )

        if analysis_result['success']:
            basic_result['analysis_report'] = analysis_result['report']
            basic_result['analysis_generated'] = True
        else:
            basic_result['analysis_report'] = None
            basic_result['analysis_generated'] = False
            basic_result['analysis_error'] = analysis_result.get('error', '未知错误')
    except Exception as e:
        basic_result['analysis_report'] = None
        basic_result['analysis_generated'] = False
        basic_result['analysis_error'] = f'分析报告生成失败: {str(e)}'


def process_task(task):
    """Run a SpecGen task to completion on a background thread.

    Updates ``task.status``, ``task.progress``, ``task.result`` and
    ``task.error`` in place. The per-task temp directory is removed on every
    exit path, including failures.
    """
    temp_dir = f'temp/{task.task_id}'
    try:
        task.status = 'running'
        task.started_at = datetime.now()
        task.progress = 10

        # Write the submitted code into a per-task temp directory.
        os.makedirs(temp_dir, exist_ok=True)
        input_file = os.path.join(temp_dir, 'input.c')
        with open(input_file, 'w', encoding='utf-8') as f:
            f.write(task.input_code)
        task.progress = 20

        # Use the demo code generator.
        try:
            from demo_code import generate_demo_result

            basic_result = generate_demo_result(task.input_code)
            task.progress = 60

            # Generate the analysis report and merge it into the result.
            _attach_analysis(task.input_code, basic_result)

            task.result = basic_result
            task.progress = 100
            task.status = 'completed'
            task.completed_at = datetime.now()
        except Exception as e:
            task.status = 'failed'
            task.error = f'演示代码生成失败: {str(e)}'
            task.completed_at = datetime.now()
    except subprocess.TimeoutExpired:
        task.status = 'failed'
        task.error = '任务执行超时'
        task.completed_at = datetime.now()
    except Exception as e:
        task.status = 'failed'
        task.error = f'任务执行失败: {str(e)}'
        task.completed_at = datetime.now()
    finally:
        # Best-effort cleanup; runs on failure paths too so temp dirs don't leak.
        shutil.rmtree(temp_dir, ignore_errors=True)


@app.route('/api/demo/sample', methods=['GET'])
def get_demo_sample():
    """Return a sample C program clients can submit as ``input_code``."""
    sample_code = '''#include <stdio.h>

/**
 * Simple function to calculate the sum of an array
 */
int array_sum(int* arr, int size) {
    int sum = 0;
    for (int i = 0; i < size; i++) {
        sum += arr[i];
    }
    return sum;
}

/**
 * Function to find the maximum value in an array
 */
int find_max(int* arr, int size) {
    if (size <= 0) {
        return 0;
    }
    int max = arr[0];
    for (int i = 1; i < size; i++) {
        if (arr[i] > max) {
            max = arr[i];
        }
    }
    return max;
}

int main() {
    int test_array[] = {1, 2, 3, 4, 5};
    int size = 5;
    int sum = array_sum(test_array, size);
    int max = find_max(test_array, size);
    printf("Sum: %d, Max: %d\\n", sum, max);
    return 0;
}'''
    return jsonify({'sample_code': sample_code})


@app.route('/api/system/info', methods=['GET'])
def get_system_info():
    """Return the CBMC version string and per-status task counts.

    ``cbmc_version`` is ``'Unknown'`` when the ``cbmc`` executable is missing,
    exits non-zero, or does not answer within the timeout.
    """
    try:
        # Probe the CBMC version; a missing or hung binary must not fail the endpoint.
        try:
            cbmc_result = subprocess.run(
                ['cbmc', '--version'],
                capture_output=True,
                text=True,
                timeout=10,
            )
            cbmc_version = (
                cbmc_result.stdout.strip() if cbmc_result.returncode == 0 else 'Unknown'
            )
        except (OSError, subprocess.SubprocessError):
            cbmc_version = 'Unknown'

        # Snapshot the statuses once so the three counts are mutually consistent.
        statuses = [t.status for t in list(active_tasks.values())]

        return jsonify({
            'cbmc_version': cbmc_version,
            'specgen_version': '2.0',
            'active_tasks': statuses.count('running'),
            'completed_tasks': statuses.count('completed'),
            'failed_tasks': statuses.count('failed'),
        })
    except Exception as e:
        return jsonify({'error': str(e)}), 500


if __name__ == '__main__':
    os.makedirs('temp', exist_ok=True)
    # NOTE(review): debug=True together with host='0.0.0.0' exposes the Werkzeug
    # debugger on every network interface; acceptable only for local development.
    app.run(debug=True, host='0.0.0.0', port=8080)