#!/usr/bin/env python3 # CodeDetect基准测试运行器 import os import sys import time import json import statistics import asyncio import psutil import tracemalloc from pathlib import Path from typing import Dict, List, Any, Optional from dataclasses import dataclass, asdict from concurrent.futures import ThreadPoolExecutor, as_completed @dataclass class BenchmarkResult: """基准测试结果""" test_name: str execution_time_ms: float memory_usage_mb: float cpu_usage_percent: float success: bool error_message: Optional[str] = None metadata: Dict[str, Any] = None @dataclass class BenchmarkSuite: """基准测试套件""" name: str description: str results: List[BenchmarkResult] summary: Dict[str, Any] config: Dict[str, Any] class BenchmarkRunner: """基准测试运行器""" def __init__(self, output_dir: str = "benchmark_results"): self.output_dir = Path(output_dir) self.output_dir.mkdir(parents=True, exist_ok=True) self.results: List[BenchmarkResult] = [] async def run_all_benchmarks(self, config: Dict[str, Any]) -> BenchmarkSuite: """运行所有基准测试""" print("🚀 开始运行基准测试...") # 启动内存跟踪 tracemalloc.start() # 运行基准测试 await self._run_code_parsing_benchmarks() await self._run_verification_benchmarks() await self._run_mutation_benchmarks() await self._run_system_benchmarks() # 停止内存跟踪 tracemalloc.stop() # 生成汇总 summary = self._generate_summary() # 创建基准测试套件 suite = BenchmarkSuite( name="complete_benchmark_suite", description="CodeDetect完整基准测试套件", results=self.results, summary=summary, config=config ) # 保存结果 self._save_results(suite) return suite async def _run_code_parsing_benchmarks(self): """运行代码解析基准测试""" print("📝 运行代码解析基准测试...") # 生成测试代码 test_codes = self._generate_test_codes() for name, code in test_codes.items(): print(f" 测试: {name}") result = await self._benchmark_code_parsing(name, code) self.results.append(result) async def _run_verification_benchmarks(self): """运行验证基准测试""" print("🔍 运行验证基准测试...") # 模拟验证测试 verification_specs = [ ("simple_verification", "void test() { }"), ("complex_verification", """ void complex_test(int* arr, int size) { if (arr == NULL || size <= 0) return; for (int i = 0; i < size; i++) { __CPROVER_assume(arr[i] >= 0); arr[i] = arr[i] * 2; } } """) ] for name, spec in verification_specs: print(f" 测试: {name}") result = await self._benchmark_verification(name, spec) self.results.append(result) async def _run_mutation_benchmarks(self): """运行突变基准测试""" print("🧬 运行突变基准测试...") mutation_specs = [ ("small_mutation", "void small_func() { }"), ("medium_mutation", "void medium_func(int x) { __CPROVER_assume(x > 0); }"), ("large_mutation", "void large_func(int* arr, int size) { /* complex logic */ }") ] for name, spec in mutation_specs: print(f" 测试: {name}") result = await self._benchmark_mutation(name, spec) self.results.append(result) async def _run_system_benchmarks(self): """运行系统基准测试""" print("⚙️ 运行系统基准测试...") # 并发性能测试 for concurrency in [1, 5, 10, 20]: print(f" 测试: concurrent_{concurrency}") result = await self._benchmark_concurrency(f"concurrent_{concurrency}", concurrency) self.results.append(result) # 内存使用测试 print(f" 测试: memory_usage") result = await self._benchmark_memory_usage("memory_usage") self.results.append(result) async def _benchmark_code_parsing(self, name: str, code: str) -> BenchmarkResult: """基准测试:代码解析""" try: # 记录开始时间 start_time = time.time() start_memory = psutil.Process().memory_info().rss / 1024 / 1024 start_cpu = psutil.cpu_percent() # 模拟代码解析 await self._simulate_code_parsing(code) # 记录结束时间 end_time = time.time() end_memory = psutil.Process().memory_info().rss / 1024 / 1024 end_cpu = psutil.cpu_percent() return BenchmarkResult( test_name=name, execution_time_ms=(end_time - start_time) * 1000, memory_usage_mb=end_memory - start_memory, cpu_usage_percent=end_cpu - start_cpu, success=True, metadata={"code_size": len(code)} ) except Exception as e: return BenchmarkResult( test_name=name, execution_time_ms=0, memory_usage_mb=0, cpu_usage_percent=0, success=False, error_message=str(e) ) async def _benchmark_verification(self, name: str, spec: str) -> BenchmarkResult: """基准测试:验证""" try: # 记录开始时间 start_time = time.time() start_memory = psutil.Process().memory_info().rss / 1024 / 1024 start_cpu = psutil.cpu_percent() # 模拟验证过程 await self._simulate_verification(spec) # 记录结束时间 end_time = time.time() end_memory = psutil.Process().memory_info().rss / 1024 / 1024 end_cpu = psutil.cpu_percent() return BenchmarkResult( test_name=name, execution_time_ms=(end_time - start_time) * 1000, memory_usage_mb=end_memory - start_memory, cpu_usage_percent=end_cpu - start_cpu, success=True, metadata={"spec_size": len(spec)} ) except Exception as e: return BenchmarkResult( test_name=name, execution_time_ms=0, memory_usage_mb=0, cpu_usage_percent=0, success=False, error_message=str(e) ) async def _benchmark_mutation(self, name: str, spec: str) -> BenchmarkResult: """基准测试:突变""" try: # 记录开始时间 start_time = time.time() start_memory = psutil.Process().memory_info().rss / 1024 / 1024 start_cpu = psutil.cpu_percent() # 模拟突变生成 await self._simulate_mutation(spec) # 记录结束时间 end_time = time.time() end_memory = psutil.Process().memory_info().rss / 1024 / 1024 end_cpu = psutil.cpu_percent() return BenchmarkResult( test_name=name, execution_time_ms=(end_time - start_time) * 1000, memory_usage_mb=end_memory - start_memory, cpu_usage_percent=end_cpu - start_cpu, success=True, metadata={"spec_size": len(spec)} ) except Exception as e: return BenchmarkResult( test_name=name, execution_time_ms=0, memory_usage_mb=0, cpu_usage_percent=0, success=False, error_message=str(e) ) async def _benchmark_concurrency(self, name: str, concurrency: int) -> BenchmarkResult: """基准测试:并发""" try: # 记录开始时间 start_time = time.time() start_memory = psutil.Process().memory_info().rss / 1024 / 1024 start_cpu = psutil.cpu_percent() # 运行并发任务 tasks = [] for i in range(concurrency): task = self._simulate_concurrent_task(f"task_{i}") tasks.append(task) await asyncio.gather(*tasks) # 记录结束时间 end_time = time.time() end_memory = psutil.Process().memory_info().rss / 1024 / 1024 end_cpu = psutil.cpu_percent() return BenchmarkResult( test_name=name, execution_time_ms=(end_time - start_time) * 1000, memory_usage_mb=end_memory - start_memory, cpu_usage_percent=end_cpu - start_cpu, success=True, metadata={"concurrency": concurrency} ) except Exception as e: return BenchmarkResult( test_name=name, execution_time_ms=0, memory_usage_mb=0, cpu_usage_percent=0, success=False, error_message=str(e) ) async def _benchmark_memory_usage(self, name: str) -> BenchmarkResult: """基准测试:内存使用""" try: # 记录开始内存 start_memory = psutil.Process().memory_info().rss / 1024 / 1024 start_time = time.time() # 模拟内存密集型操作 await self._simulate_memory_operations() # 记录结束内存 end_memory = psutil.Process().memory_info().rss / 1024 / 1024 end_time = time.time() return BenchmarkResult( test_name=name, execution_time_ms=(end_time - start_time) * 1000, memory_usage_mb=end_memory - start_memory, cpu_usage_percent=0, success=True, metadata={"memory_operations": "intensive"} ) except Exception as e: return BenchmarkResult( test_name=name, execution_time_ms=0, memory_usage_mb=0, cpu_usage_percent=0, success=False, error_message=str(e) ) async def _simulate_code_parsing(self, code: str): """模拟代码解析""" await asyncio.sleep(0.01) # 模拟解析时间 async def _simulate_verification(self, spec: str): """模拟验证过程""" await asyncio.sleep(0.05) # 模拟验证时间 async def _simulate_mutation(self, spec: str): """模拟突变生成""" await asyncio.sleep(0.03) # 模拟突变时间 async def _simulate_concurrent_task(self, task_id: str): """模拟并发任务""" await asyncio.sleep(0.1) async def _simulate_memory_operations(self): """模拟内存操作""" # 创建一些内存使用 data = [] for i in range(1000): data.append([i] * 100) await asyncio.sleep(0.1) del data def _generate_test_codes(self) -> Dict[str, str]: """生成测试代码""" return { "small_function": """ int add(int a, int b) { return a + b; } """, "medium_function": """ #include int array_sum(int* arr, int size) { if (arr == NULL || size <= 0) return 0; int sum = 0; for (int i = 0; i < size; i++) { sum += arr[i]; } return sum; } """, "large_function": """ #include #include typedef struct Node { int data; struct Node* next; } Node; Node* create_list(int size) { Node* head = NULL; Node** current = &head; for (int i = 0; i < size; i++) { *current = malloc(sizeof(Node)); (*current)->data = i; (*current)->next = NULL; current = &((*current)->next); } return head; } void free_list(Node* head) { while (head != NULL) { Node* temp = head; head = head->next; free(temp); } } int list_sum(Node* head) { int sum = 0; while (head != NULL) { sum += head->data; head = head->next; } return sum; } """ } def _generate_summary(self) -> Dict[str, Any]: """生成基准测试汇总""" if not self.results: return {"error": "No results"} successful_results = [r for r in self.results if r.success] failed_results = [r for r in self.results if not r.success] execution_times = [r.execution_time_ms for r in successful_results] memory_usage = [r.memory_usage_mb for r in successful_results] cpu_usage = [r.cpu_usage_percent for r in successful_results] summary = { "total_tests": len(self.results), "successful_tests": len(successful_results), "failed_tests": len(failed_results), "success_rate": len(successful_results) / len(self.results) * 100, "execution_time_stats": { "avg_ms": statistics.mean(execution_times), "min_ms": min(execution_times), "max_ms": max(execution_times), "median_ms": statistics.median(execution_times) }, "memory_usage_stats": { "avg_mb": statistics.mean(memory_usage), "min_mb": min(memory_usage), "max_mb": max(memory_usage), "median_mb": statistics.median(memory_usage) }, "cpu_usage_stats": { "avg_percent": statistics.mean(cpu_usage), "min_percent": min(cpu_usage), "max_percent": max(cpu_usage), "median_percent": statistics.median(cpu_usage) } } return summary def _save_results(self, suite: BenchmarkSuite): """保存基准测试结果""" timestamp = int(time.time()) filename = f"benchmark_results_{timestamp}.json" filepath = self.output_dir / filename with open(filepath, 'w', encoding='utf-8') as f: json.dump(asdict(suite), f, indent=2, ensure_ascii=False) print(f"✅ 基准测试结果已保存到: {filepath}") # 打印汇总 print("\n📊 基准测试汇总:") print(f" 总测试数: {suite.summary['total_tests']}") print(f" 成功测试数: {suite.summary['successful_tests']}") print(f" 失败测试数: {suite.summary['failed_tests']}") print(f" 成功率: {suite.summary['success_rate']:.1f}%") print(f" 平均执行时间: {suite.summary['execution_time_stats']['avg_ms']:.2f}ms") print(f" 平均内存使用: {suite.summary['memory_usage_stats']['avg_mb']:.2f}MB") def main(): """主函数""" runner = BenchmarkRunner() config = { "iterations": 3, "warmup_iterations": 1, "timeout_seconds": 60, "memory_threshold_mb": 1024 } suite = asyncio.run(runner.run_all_benchmarks(config)) if __name__ == "__main__": main()