You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
482 lines
15 KiB
482 lines
15 KiB
#!/usr/bin/env python3
|
|
# CodeDetect benchmark runner
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import statistics
|
|
import asyncio
|
|
import psutil
|
|
import tracemalloc
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
from dataclasses import dataclass, asdict
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
@dataclass
class BenchmarkResult:
    """Outcome of a single benchmark run.

    One instance is produced per benchmark case; a failed run keeps the
    numeric fields at 0 and records the exception text in ``error_message``.
    """

    test_name: str                            # unique name of the benchmark case
    execution_time_ms: float                  # wall-clock duration in milliseconds
    memory_usage_mb: float                    # RSS delta over the run, in MiB
    cpu_usage_percent: float                  # CPU-percent delta sampled around the run
    success: bool                             # True if the benchmark completed without raising
    error_message: Optional[str] = None       # str(exception) when success is False
    # Was annotated `Dict[str, Any] = None`, which mistypes the None default;
    # Optional[...] states the actual contract without changing behavior.
    metadata: Optional[Dict[str, Any]] = None  # extra per-test info (sizes, concurrency, ...)
|
|
|
|
@dataclass
class BenchmarkSuite:
    """A named collection of benchmark results plus run-level metadata."""

    name: str                       # suite identifier
    description: str                # human-readable description of the suite
    results: List[BenchmarkResult]  # individual benchmark outcomes
    summary: Dict[str, Any]         # aggregate statistics computed over results
    config: Dict[str, Any]          # configuration the suite was executed with
|
|
|
|
class BenchmarkRunner:
    """Runs the CodeDetect benchmark suite and persists the results.

    Every timed benchmark is sampled the same way (wall-clock time, RSS
    delta and CPU-percent delta via psutil — see ``_timed_run``) and
    recorded as a :class:`BenchmarkResult`.  :meth:`run_all_benchmarks`
    aggregates the collected results into a :class:`BenchmarkSuite` and
    writes it as JSON into ``output_dir``.
    """

    def __init__(self, output_dir: str = "benchmark_results"):
        """Create the runner and ensure the output directory exists."""
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)
        self.results: List[BenchmarkResult] = []

    async def run_all_benchmarks(self, config: Dict[str, Any]) -> BenchmarkSuite:
        """Run every benchmark group, then aggregate and save the results.

        Args:
            config: Run configuration; stored verbatim on the suite.

        Returns:
            The populated :class:`BenchmarkSuite`.
        """
        print("🚀 开始运行基准测试...")

        # Track allocations for the whole run; try/finally guarantees the
        # tracer is stopped even when a benchmark group raises (the
        # original leaked an active tracemalloc session in that case).
        tracemalloc.start()
        try:
            await self._run_code_parsing_benchmarks()
            await self._run_verification_benchmarks()
            await self._run_mutation_benchmarks()
            await self._run_system_benchmarks()
        finally:
            tracemalloc.stop()

        suite = BenchmarkSuite(
            name="complete_benchmark_suite",
            description="CodeDetect完整基准测试套件",
            results=self.results,
            summary=self._generate_summary(),
            config=config,
        )
        self._save_results(suite)
        return suite

    async def _run_code_parsing_benchmarks(self):
        """Benchmark (simulated) parsing of generated C snippets."""
        print("📝 运行代码解析基准测试...")

        for name, code in self._generate_test_codes().items():
            print(f" 测试: {name}")
            self.results.append(await self._benchmark_code_parsing(name, code))

    async def _run_verification_benchmarks(self):
        """Benchmark (simulated) verification of two C specs."""
        print("🔍 运行验证基准测试...")

        verification_specs = [
            ("simple_verification", "void test() { }"),
            ("complex_verification", """
void complex_test(int* arr, int size) {
    if (arr == NULL || size <= 0) return;
    for (int i = 0; i < size; i++) {
        __CPROVER_assume(arr[i] >= 0);
        arr[i] = arr[i] * 2;
    }
}
"""),
        ]

        for name, spec in verification_specs:
            print(f" 测试: {name}")
            self.results.append(await self._benchmark_verification(name, spec))

    async def _run_mutation_benchmarks(self):
        """Benchmark (simulated) mutation generation for three spec sizes."""
        print("🧬 运行突变基准测试...")

        mutation_specs = [
            ("small_mutation", "void small_func() { }"),
            ("medium_mutation", "void medium_func(int x) { __CPROVER_assume(x > 0); }"),
            ("large_mutation", "void large_func(int* arr, int size) { /* complex logic */ }"),
        ]

        for name, spec in mutation_specs:
            print(f" 测试: {name}")
            self.results.append(await self._benchmark_mutation(name, spec))

    async def _run_system_benchmarks(self):
        """Benchmark system-level behaviour: concurrency scaling and memory."""
        print("⚙️ 运行系统基准测试...")

        # Concurrency scaling at several fan-out levels.
        for concurrency in [1, 5, 10, 20]:
            print(f" 测试: concurrent_{concurrency}")
            self.results.append(
                await self._benchmark_concurrency(f"concurrent_{concurrency}", concurrency)
            )

        # Memory-intensive workload.
        print(f" 测试: memory_usage")
        self.results.append(await self._benchmark_memory_usage("memory_usage"))

    async def _timed_run(
        self, name: str, operation, metadata: Dict[str, Any]
    ) -> BenchmarkResult:
        """Await ``operation()`` while sampling time, memory and CPU usage.

        Shared measurement core for the parsing / verification / mutation /
        concurrency benchmarks (the original duplicated this body four
        times).  A failing operation is recorded as an unsuccessful result
        instead of aborting the whole suite.

        NOTE(review): psutil.cpu_percent() reports usage since its previous
        call, so ``end - start`` can be negative or noisy for very short
        operations — kept as-is to preserve the original metric.
        """
        try:
            start_time = time.time()
            start_memory = psutil.Process().memory_info().rss / 1024 / 1024
            start_cpu = psutil.cpu_percent()

            await operation()

            end_time = time.time()
            end_memory = psutil.Process().memory_info().rss / 1024 / 1024
            end_cpu = psutil.cpu_percent()

            return BenchmarkResult(
                test_name=name,
                execution_time_ms=(end_time - start_time) * 1000,
                memory_usage_mb=end_memory - start_memory,
                cpu_usage_percent=end_cpu - start_cpu,
                success=True,
                metadata=metadata,
            )
        except Exception as e:
            # Record the failure; numeric fields stay at 0 by convention.
            return BenchmarkResult(
                test_name=name,
                execution_time_ms=0,
                memory_usage_mb=0,
                cpu_usage_percent=0,
                success=False,
                error_message=str(e),
            )

    async def _benchmark_code_parsing(self, name: str, code: str) -> BenchmarkResult:
        """Benchmark: code parsing (simulated)."""
        return await self._timed_run(
            name, lambda: self._simulate_code_parsing(code), {"code_size": len(code)}
        )

    async def _benchmark_verification(self, name: str, spec: str) -> BenchmarkResult:
        """Benchmark: verification (simulated)."""
        return await self._timed_run(
            name, lambda: self._simulate_verification(spec), {"spec_size": len(spec)}
        )

    async def _benchmark_mutation(self, name: str, spec: str) -> BenchmarkResult:
        """Benchmark: mutation generation (simulated)."""
        return await self._timed_run(
            name, lambda: self._simulate_mutation(spec), {"spec_size": len(spec)}
        )

    async def _benchmark_concurrency(self, name: str, concurrency: int) -> BenchmarkResult:
        """Benchmark: ``concurrency`` simulated tasks executed in parallel."""

        async def run_batch():
            # gather() runs all simulated tasks concurrently.
            await asyncio.gather(
                *(self._simulate_concurrent_task(f"task_{i}") for i in range(concurrency))
            )

        return await self._timed_run(name, run_batch, {"concurrency": concurrency})

    async def _benchmark_memory_usage(self, name: str) -> BenchmarkResult:
        """Benchmark: memory growth of an allocation-heavy operation.

        Kept separate from ``_timed_run`` because this benchmark
        deliberately reports no CPU figure (``cpu_usage_percent`` is 0).
        """
        try:
            start_memory = psutil.Process().memory_info().rss / 1024 / 1024
            start_time = time.time()

            await self._simulate_memory_operations()

            end_memory = psutil.Process().memory_info().rss / 1024 / 1024
            end_time = time.time()

            return BenchmarkResult(
                test_name=name,
                execution_time_ms=(end_time - start_time) * 1000,
                memory_usage_mb=end_memory - start_memory,
                cpu_usage_percent=0,
                success=True,
                metadata={"memory_operations": "intensive"},
            )
        except Exception as e:
            return BenchmarkResult(
                test_name=name,
                execution_time_ms=0,
                memory_usage_mb=0,
                cpu_usage_percent=0,
                success=False,
                error_message=str(e),
            )

    async def _simulate_code_parsing(self, code: str):
        """Stand-in for real parsing: sleep roughly as long as parsing takes."""
        await asyncio.sleep(0.01)

    async def _simulate_verification(self, spec: str):
        """Stand-in for a real verification run."""
        await asyncio.sleep(0.05)

    async def _simulate_mutation(self, spec: str):
        """Stand-in for real mutation generation."""
        await asyncio.sleep(0.03)

    async def _simulate_concurrent_task(self, task_id: str):
        """Stand-in for one unit of concurrent work."""
        await asyncio.sleep(0.1)

    async def _simulate_memory_operations(self):
        """Allocate ~100k ints, hold them briefly, then release them."""
        data = [[i] * 100 for i in range(1000)]
        await asyncio.sleep(0.1)
        # Release before returning so peak usage is attributable to this run.
        del data

    def _generate_test_codes(self) -> Dict[str, str]:
        """Return named C snippets (small/medium/large) used as parser input."""
        return {
            "small_function": """
int add(int a, int b) {
    return a + b;
}
""",
            "medium_function": """
#include <stdlib.h>

int array_sum(int* arr, int size) {
    if (arr == NULL || size <= 0) return 0;

    int sum = 0;
    for (int i = 0; i < size; i++) {
        sum += arr[i];
    }
    return sum;
}
""",
            "large_function": """
#include <stdlib.h>
#include <string.h>

typedef struct Node {
    int data;
    struct Node* next;
} Node;

Node* create_list(int size) {
    Node* head = NULL;
    Node** current = &head;

    for (int i = 0; i < size; i++) {
        *current = malloc(sizeof(Node));
        (*current)->data = i;
        (*current)->next = NULL;
        current = &((*current)->next);
    }

    return head;
}

void free_list(Node* head) {
    while (head != NULL) {
        Node* temp = head;
        head = head->next;
        free(temp);
    }
}

int list_sum(Node* head) {
    int sum = 0;
    while (head != NULL) {
        sum += head->data;
        head = head->next;
    }
    return sum;
}
""",
        }

    def _generate_summary(self) -> Dict[str, Any]:
        """Aggregate ``self.results`` into count/rate/time/memory/CPU stats."""
        if not self.results:
            return {"error": "No results"}

        successful = [r for r in self.results if r.success]
        failed = [r for r in self.results if not r.success]

        def stats(values: List[float], unit: str) -> Dict[str, float]:
            # BUG FIX: the original called statistics.mean()/min()/max() on
            # the raw lists, which raise on an all-failed run (empty list).
            # Report zeros in that case instead of crashing.
            if not values:
                return {f"{k}_{unit}": 0.0 for k in ("avg", "min", "max", "median")}
            return {
                f"avg_{unit}": statistics.mean(values),
                f"min_{unit}": min(values),
                f"max_{unit}": max(values),
                f"median_{unit}": statistics.median(values),
            }

        return {
            "total_tests": len(self.results),
            "successful_tests": len(successful),
            "failed_tests": len(failed),
            "success_rate": len(successful) / len(self.results) * 100,
            "execution_time_stats": stats([r.execution_time_ms for r in successful], "ms"),
            "memory_usage_stats": stats([r.memory_usage_mb for r in successful], "mb"),
            "cpu_usage_stats": stats([r.cpu_usage_percent for r in successful], "percent"),
        }

    def _save_results(self, suite: BenchmarkSuite):
        """Serialize *suite* to a timestamped JSON file and print a summary."""
        filepath = self.output_dir / f"benchmark_results_{int(time.time())}.json"

        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(asdict(suite), f, indent=2, ensure_ascii=False)

        print(f"✅ 基准测试结果已保存到: {filepath}")

        summary = suite.summary
        if "error" in summary:
            # No benchmarks ran, so there are no aggregate figures to print
            # (the original raised KeyError on the lookups below).
            return

        print("\n📊 基准测试汇总:")
        print(f" 总测试数: {summary['total_tests']}")
        print(f" 成功测试数: {summary['successful_tests']}")
        print(f" 失败测试数: {summary['failed_tests']}")
        print(f" 成功率: {summary['success_rate']:.1f}%")
        print(f" 平均执行时间: {summary['execution_time_stats']['avg_ms']:.2f}ms")
        print(f" 平均内存使用: {summary['memory_usage_stats']['avg_mb']:.2f}MB")
|
|
def main():
    """CLI entry point: run the full benchmark suite with default settings.

    Returns:
        The resulting BenchmarkSuite, so the function is also usable
        programmatically (the original bound it to an unused local; callers
        that ignore the return value are unaffected).
    """
    runner = BenchmarkRunner()

    # Default run configuration, stored verbatim on the suite.
    config = {
        "iterations": 3,
        "warmup_iterations": 1,
        "timeout_seconds": 60,
        "memory_threshold_mb": 1024,
    }

    return asyncio.run(runner.run_all_benchmarks(config))
|
|
|
|
# Script entry point: run the benchmark suite only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()