#!/usr/bin/env python3
# CodeDetect benchmark runner
import os
import sys
import time
import json
import statistics
import asyncio
import psutil
import tracemalloc
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
from concurrent.futures import ThreadPoolExecutor, as_completed
@dataclass
class BenchmarkResult:
    """Outcome of a single benchmark run.

    Produced by BenchmarkRunner; serialized to JSON via dataclasses.asdict.
    """
    test_name: str               # benchmark identifier
    execution_time_ms: float     # wall-clock duration in milliseconds
    memory_usage_mb: float       # RSS delta in MiB (can be negative)
    cpu_usage_percent: float     # CPU-usage delta between two samples (can be negative)
    success: bool                # True when the benchmark completed without raising
    error_message: Optional[str] = None  # exception text when success is False
    # Fixed: default is None, so the annotation must be Optional (was Dict[str, Any] = None).
    metadata: Optional[Dict[str, Any]] = None
@dataclass
class BenchmarkSuite:
    """A named collection of benchmark results plus the run's summary and configuration."""
    name: str                       # suite identifier
    description: str                # human-readable description
    results: List[BenchmarkResult]  # individual benchmark outcomes
    summary: Dict[str, Any]         # aggregate statistics over `results`
    config: Dict[str, Any]          # configuration the suite was run with
class BenchmarkRunner:
    """Benchmark runner.

    Executes simulated parsing / verification / mutation / system workloads,
    records per-test timing, memory and CPU figures as BenchmarkResult rows,
    and writes a JSON report into ``output_dir``.
    """

    def __init__(self, output_dir: str = "benchmark_results"):
        # All result files are written beneath this directory.
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.results: List[BenchmarkResult] = []

    async def run_all_benchmarks(self, config: Dict[str, Any]) -> BenchmarkSuite:
        """Run every benchmark group, then summarize and persist the results."""
        print("🚀 开始运行基准测试...")
        # Allocation tracing is enabled for the whole run; the trace is not
        # consumed here, but keeping it active allows ad-hoc snapshots.
        tracemalloc.start()
        try:
            await self._run_code_parsing_benchmarks()
            await self._run_verification_benchmarks()
            await self._run_mutation_benchmarks()
            await self._run_system_benchmarks()
        finally:
            # Always stop tracing, even if a benchmark group raised.
            tracemalloc.stop()
        summary = self._generate_summary()
        suite = BenchmarkSuite(
            name="complete_benchmark_suite",
            description="CodeDetect完整基准测试套件",
            results=self.results,
            summary=summary,
            config=config,
        )
        self._save_results(suite)
        return suite

    async def _run_code_parsing_benchmarks(self):
        """Benchmark parsing of the generated small/medium/large C snippets."""
        print("📝 运行代码解析基准测试...")
        for name, code in self._generate_test_codes().items():
            print(f" 测试: {name}")
            self.results.append(await self._benchmark_code_parsing(name, code))

    async def _run_verification_benchmarks(self):
        """Benchmark simulated verification of a simple and a complex spec."""
        print("🔍 运行验证基准测试...")
        verification_specs = [
            ("simple_verification", "void test() { }"),
            ("complex_verification", """
void complex_test(int* arr, int size) {
    if (arr == NULL || size <= 0) return;
    for (int i = 0; i < size; i++) {
        __CPROVER_assume(arr[i] >= 0);
        arr[i] = arr[i] * 2;
    }
}
"""),
        ]
        for name, spec in verification_specs:
            print(f" 测试: {name}")
            self.results.append(await self._benchmark_verification(name, spec))

    async def _run_mutation_benchmarks(self):
        """Benchmark simulated mutation generation for three spec sizes."""
        print("🧬 运行突变基准测试...")
        mutation_specs = [
            ("small_mutation", "void small_func() { }"),
            ("medium_mutation", "void medium_func(int x) { __CPROVER_assume(x > 0); }"),
            ("large_mutation", "void large_func(int* arr, int size) { /* complex logic */ }"),
        ]
        for name, spec in mutation_specs:
            print(f" 测试: {name}")
            self.results.append(await self._benchmark_mutation(name, spec))

    async def _run_system_benchmarks(self):
        """Benchmark concurrency at several fan-out levels, then memory usage."""
        print("⚙️ 运行系统基准测试...")
        for concurrency in [1, 5, 10, 20]:
            print(f" 测试: concurrent_{concurrency}")
            self.results.append(
                await self._benchmark_concurrency(f"concurrent_{concurrency}", concurrency)
            )
        print(" 测试: memory_usage")
        self.results.append(await self._benchmark_memory_usage("memory_usage"))

    async def _measure(self, name, action, metadata, sample_cpu: bool = True) -> BenchmarkResult:
        """Await ``action()`` while sampling wall time, RSS delta and CPU delta.

        Shared measurement core: the original four near-identical
        ``_benchmark_*`` bodies are collapsed here. Never raises — a failure
        is reported as a BenchmarkResult with success=False and the exception
        text in error_message.
        """
        try:
            start_time = time.time()
            start_memory = psutil.Process().memory_info().rss / 1024 / 1024
            start_cpu = psutil.cpu_percent() if sample_cpu else 0.0
            await action()
            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss / 1024 / 1024
            end_cpu = psutil.cpu_percent() if sample_cpu else 0.0
            return BenchmarkResult(
                test_name=name,
                execution_time_ms=(end_time - start_time) * 1000,
                memory_usage_mb=end_memory - start_memory,
                # NOTE(review): successive cpu_percent() deltas are noisy and
                # can be negative; preserved from the original measurement.
                cpu_usage_percent=end_cpu - start_cpu,
                success=True,
                metadata=metadata,
            )
        except Exception as e:
            return BenchmarkResult(
                test_name=name,
                execution_time_ms=0,
                memory_usage_mb=0,
                cpu_usage_percent=0,
                success=False,
                error_message=str(e),
            )

    async def _benchmark_code_parsing(self, name: str, code: str) -> BenchmarkResult:
        """Benchmark: code parsing."""
        return await self._measure(
            name, lambda: self._simulate_code_parsing(code), {"code_size": len(code)}
        )

    async def _benchmark_verification(self, name: str, spec: str) -> BenchmarkResult:
        """Benchmark: verification."""
        return await self._measure(
            name, lambda: self._simulate_verification(spec), {"spec_size": len(spec)}
        )

    async def _benchmark_mutation(self, name: str, spec: str) -> BenchmarkResult:
        """Benchmark: mutation generation."""
        return await self._measure(
            name, lambda: self._simulate_mutation(spec), {"spec_size": len(spec)}
        )

    async def _benchmark_concurrency(self, name: str, concurrency: int) -> BenchmarkResult:
        """Benchmark: ``concurrency`` simulated tasks run in parallel."""
        async def run_tasks():
            await asyncio.gather(
                *(self._simulate_concurrent_task(f"task_{i}") for i in range(concurrency))
            )
        return await self._measure(name, run_tasks, {"concurrency": concurrency})

    async def _benchmark_memory_usage(self, name: str) -> BenchmarkResult:
        """Benchmark: memory-intensive operation (CPU column not sampled, as before)."""
        return await self._measure(
            name,
            self._simulate_memory_operations,
            {"memory_operations": "intensive"},
            sample_cpu=False,
        )

    async def _simulate_code_parsing(self, code: str):
        """Stand-in for real parsing: fixed 10 ms sleep."""
        await asyncio.sleep(0.01)

    async def _simulate_verification(self, spec: str):
        """Stand-in for real verification: fixed 50 ms sleep."""
        await asyncio.sleep(0.05)

    async def _simulate_mutation(self, spec: str):
        """Stand-in for real mutation generation: fixed 30 ms sleep."""
        await asyncio.sleep(0.03)

    async def _simulate_concurrent_task(self, task_id: str):
        """Stand-in for a concurrent unit of work: fixed 100 ms sleep."""
        await asyncio.sleep(0.1)

    async def _simulate_memory_operations(self):
        """Allocate ~100k ints, hold them briefly, then release."""
        data = [[i] * 100 for i in range(1000)]
        await asyncio.sleep(0.1)
        del data

    def _generate_test_codes(self) -> Dict[str, str]:
        """Return named C snippets of increasing size used as parsing inputs."""
        return {
            "small_function": """
int add(int a, int b) {
    return a + b;
}
""",
            "medium_function": """
#include <stdlib.h>
int array_sum(int* arr, int size) {
    if (arr == NULL || size <= 0) return 0;
    int sum = 0;
    for (int i = 0; i < size; i++) {
        sum += arr[i];
    }
    return sum;
}
""",
            "large_function": """
#include <stdlib.h>
#include <string.h>
typedef struct Node {
    int data;
    struct Node* next;
} Node;
Node* create_list(int size) {
    Node* head = NULL;
    Node** current = &head;
    for (int i = 0; i < size; i++) {
        *current = malloc(sizeof(Node));
        (*current)->data = i;
        (*current)->next = NULL;
        current = &((*current)->next);
    }
    return head;
}
void free_list(Node* head) {
    while (head != NULL) {
        Node* temp = head;
        head = head->next;
        free(temp);
    }
}
int list_sum(Node* head) {
    int sum = 0;
    while (head != NULL) {
        sum += head->data;
        head = head->next;
    }
    return sum;
}
""",
        }

    @staticmethod
    def _distribution(values: List[float], unit: str) -> Dict[str, float]:
        """Return avg/min/max/median of *values* keyed with the given unit suffix.

        Empty input yields all-zero stats — statistics.mean/median raise
        StatisticsError on empty data, which previously crashed the summary
        when every benchmark failed.
        """
        if not values:
            return {f"avg_{unit}": 0, f"min_{unit}": 0, f"max_{unit}": 0, f"median_{unit}": 0}
        return {
            f"avg_{unit}": statistics.mean(values),
            f"min_{unit}": min(values),
            f"max_{unit}": max(values),
            f"median_{unit}": statistics.median(values),
        }

    def _generate_summary(self) -> Dict[str, Any]:
        """Aggregate self.results into counts, success rate and per-metric stats."""
        if not self.results:
            return {"error": "No results"}
        successful_results = [r for r in self.results if r.success]
        failed_results = [r for r in self.results if not r.success]
        return {
            "total_tests": len(self.results),
            "successful_tests": len(successful_results),
            "failed_tests": len(failed_results),
            "success_rate": len(successful_results) / len(self.results) * 100,
            "execution_time_stats": self._distribution(
                [r.execution_time_ms for r in successful_results], "ms"
            ),
            "memory_usage_stats": self._distribution(
                [r.memory_usage_mb for r in successful_results], "mb"
            ),
            "cpu_usage_stats": self._distribution(
                [r.cpu_usage_percent for r in successful_results], "percent"
            ),
        }

    def _save_results(self, suite: BenchmarkSuite):
        """Serialize the suite to a timestamped JSON file and print a summary."""
        timestamp = int(time.time())
        filepath = self.output_dir / f"benchmark_results_{timestamp}.json"
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(asdict(suite), f, indent=2, ensure_ascii=False)
        print(f"✅ 基准测试结果已保存到: {filepath}")
        print("\n📊 基准测试汇总:")
        print(f" 总测试数: {suite.summary['total_tests']}")
        print(f" 成功测试数: {suite.summary['successful_tests']}")
        print(f" 失败测试数: {suite.summary['failed_tests']}")
        print(f" 成功率: {suite.summary['success_rate']:.1f}%")
        print(f" 平均执行时间: {suite.summary['execution_time_stats']['avg_ms']:.2f}ms")
        print(f" 平均内存使用: {suite.summary['memory_usage_stats']['avg_mb']:.2f}MB")
def main():
    """CLI entry point: run the full benchmark suite with the default config."""
    runner = BenchmarkRunner()
    config = {
        "iterations": 3,
        "warmup_iterations": 1,
        "timeout_seconds": 60,
        "memory_threshold_mb": 1024,
    }
    asyncio.run(runner.run_all_benchmarks(config))


if __name__ == "__main__":
    main()