#!/usr/bin/env python3
"""
CodeDetect benchmark runner.

Runs and records system performance benchmarks, including:
- parsing performance
- verification performance
- mutation-generation performance
- system resource usage
- concurrency performance
"""
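# Example invocation (illustrative only; the script filename is an assumption,
# while the flags match the argparse options defined in main() below):
#
#     python run_benchmarks.py --output-dir benchmark_results --iterations 10 --verbose
#
# Results are written to the output directory as JSON, YAML, a Markdown report,
# and (when matplotlib is available) per-category PNG charts.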
import os
import sys
import json
import time
import asyncio
import statistics
import threading
import multiprocessing
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict, field
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

import psutil
import yaml

# matplotlib is imported lazily in _generate_performance_charts so that chart
# generation degrades gracefully when it is not installed.

@dataclass
class BenchmarkResult:
    """A single benchmark measurement."""
    name: str
    category: str
    metric: str
    value: float
    unit: str
    timestamp: datetime
    metadata: Dict[str, Any] = field(default_factory=dict)
    samples: List[float] = field(default_factory=list)


@dataclass
class BenchmarkSuite:
    """A collection of benchmark results plus run configuration."""
    name: str
    description: str
    results: List[BenchmarkResult] = field(default_factory=list)
    config: Dict[str, Any] = field(default_factory=dict)
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
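# Minimal sketch of how a measurement is recorded (illustrative values only;
# the benchmark classes below construct BenchmarkResult instances like this):
#
#     result = BenchmarkResult(
#         name="parsing_small_function",
#         category="parsing",
#         metric="time",
#         value=0.0123,
#         unit="seconds",
#         timestamp=datetime.now(),
#         metadata={"iterations": 5},
#         samples=[0.012, 0.013, 0.012],
#     )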

class SystemMonitor:
    """Samples system resource usage on a background thread."""

    def __init__(self, interval: float = 0.5):
        self.interval = interval
        self.monitoring = False
        self.cpu_samples = []
        self.memory_samples = []
        self.disk_io_samples = []
        self.network_io_samples = []
        self.thread = None

    def start(self):
        """Start monitoring."""
        self.monitoring = True
        self.cpu_samples = []
        self.memory_samples = []
        self.disk_io_samples = []
        self.network_io_samples = []

        self.thread = threading.Thread(target=self._monitor_loop)
        self.thread.daemon = True
        self.thread.start()

    def stop(self):
        """Stop monitoring."""
        self.monitoring = False
        if self.thread:
            self.thread.join(timeout=1.0)

    def _monitor_loop(self):
        """Monitoring loop."""
        while self.monitoring:
            try:
                # CPU utilization
                cpu_percent = psutil.cpu_percent(interval=None)
                self.cpu_samples.append(cpu_percent)

                # Memory usage
                memory = psutil.virtual_memory()
                self.memory_samples.append({
                    "total": memory.total,
                    "available": memory.available,
                    "used": memory.used,
                    "percent": memory.percent
                })

                # Disk I/O
                disk_io = psutil.disk_io_counters()
                if disk_io:
                    self.disk_io_samples.append({
                        "read_bytes": disk_io.read_bytes,
                        "write_bytes": disk_io.write_bytes,
                        "read_count": disk_io.read_count,
                        "write_count": disk_io.write_count
                    })

                # Network I/O
                network_io = psutil.net_io_counters()
                if network_io:
                    self.network_io_samples.append({
                        "bytes_sent": network_io.bytes_sent,
                        "bytes_recv": network_io.bytes_recv,
                        "packets_sent": network_io.packets_sent,
                        "packets_recv": network_io.packets_recv
                    })

                time.sleep(self.interval)
            except Exception as e:
                print(f"Monitoring error: {e}")
                break

    def get_stats(self) -> Dict[str, Any]:
        """Return monitoring statistics."""
        stats = {}

        if self.cpu_samples:
            stats["cpu"] = {
                "mean": statistics.mean(self.cpu_samples),
                "median": statistics.median(self.cpu_samples),
                "min": min(self.cpu_samples),
                "max": max(self.cpu_samples),
                "std": statistics.stdev(self.cpu_samples) if len(self.cpu_samples) > 1 else 0
            }

        if self.memory_samples:
            memory_percents = [sample["percent"] for sample in self.memory_samples]
            stats["memory"] = {
                "mean_percent": statistics.mean(memory_percents),
                "median_percent": statistics.median(memory_percents),
                "min_percent": min(memory_percents),
                "max_percent": max(memory_percents),
                "peak_used_mb": max(sample["used"] for sample in self.memory_samples) / 1024 / 1024
            }

        return stats
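# Typical usage of SystemMonitor (a sketch; BenchmarkRunner.run_all_benchmarks
# below follows the same start/stop/get_stats pattern around its workload):
#
#     monitor = SystemMonitor(interval=0.5)
#     monitor.start()
#     ...  # run the workload to be measured
#     monitor.stop()
#     print(monitor.get_stats())  # {"cpu": {...}, "memory": {...}}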

class CodeParsingBenchmark:
    """Code parsing performance benchmark."""

    def __init__(self):
        self.test_codes = self._generate_test_codes()

    def _generate_test_codes(self) -> List[Dict[str, Any]]:
        """Generate test code samples."""
        return [
            {
                "name": "small_function",
                "code": "int add(int a, int b) { return a + b; }",
                "expected_functions": 1
            },
            {
                "name": "medium_functions",
                "code": self._generate_medium_code(),
                "expected_functions": 5
            },
            {
                "name": "large_functions",
                "code": self._generate_large_code(),
                "expected_functions": 20
            },
            {
                "name": "complex_structures",
                "code": self._generate_complex_code(),
                "expected_functions": 10
            }
        ]
    def _generate_medium_code(self) -> str:
        """Generate medium-complexity code."""
        return """
int max(int a, int b) { return a > b ? a : b; }
int min(int a, int b) { return a < b ? a : b; }
int abs(int x) { return x >= 0 ? x : -x; }
int factorial(int n) { return n <= 1 ? 1 : n * factorial(n - 1); }
int gcd(int a, int b) { return b == 0 ? a : gcd(b, a % b); }
"""

    def _generate_large_code(self) -> str:
        """Generate a large code sample."""
        functions = []
        for i in range(20):
            func = f"""
int func_{i}(int x) {{
    int result = x * {i};
    for (int j = 0; j < {i % 10}; j++) {{
        result += j;
    }}
    return result;
}}
"""
            functions.append(func)
        return "\n".join(functions)
    def _generate_complex_code(self) -> str:
        """Generate complex code with structs and helper functions."""
        return """
typedef struct {
    int x;
    int y;
    char name[50];
} Point;

typedef struct {
    Point start;
    Point end;
    int id;
} LineSegment;

float distance(Point p1, Point p2) {
    int dx = p1.x - p2.x;
    int dy = p1.y - p2.y;
    return sqrt(dx*dx + dy*dy);
}

int is_collinear(Point p1, Point p2, Point p3) {
    int area = (p2.x - p1.x) * (p3.y - p1.y) - (p3.x - p1.x) * (p2.y - p1.y);
    return area == 0;
}

float line_length(LineSegment line) {
    return distance(line.start, line.end);
}

Point midpoint(LineSegment line) {
    Point mid;
    mid.x = (line.start.x + line.end.x) / 2;
    mid.y = (line.start.y + line.end.y) / 2;
    return mid;
}

int line_intersection(LineSegment l1, LineSegment l2, Point* result) {
    // Segment intersection logic
    return 0; // simplified implementation
}

void normalize_point(Point* p) {
    float mag = sqrt(p->x*p->x + p->y*p->y);
    if (mag > 0) {
        p->x /= mag;
        p->y /= mag;
    }
}

int point_in_rect(Point p, Point rect_min, Point rect_max) {
    return p.x >= rect_min.x && p.x <= rect_max.x &&
           p.y >= rect_min.y && p.y <= rect_max.y;
}

float angle_between_points(Point p1, Point p2, Point p3) {
    float v1x = p1.x - p2.x;
    float v1y = p1.y - p2.y;
    float v2x = p3.x - p2.x;
    float v2y = p3.y - p2.y;
    return atan2(v1x*v2y - v1y*v2x, v1x*v2x + v1y*v2y);
}
"""

    async def run_benchmark(self, iterations: int = 10) -> List[BenchmarkResult]:
        """Run the parsing benchmark."""
        results = []

        for test_case in self.test_codes:
            print(f"🔍 Running parsing benchmark: {test_case['name']}")

            # Import the parser dynamically
            try:
                sys.path.insert(0, str(Path(__file__).parent.parent))
                from src.parse.code_parser import CodeParser

                parser = CodeParser()
                parse_times = []

                # Write the test code to a temporary file
                import tempfile
                with tempfile.NamedTemporaryFile(mode='w', suffix='.c', delete=False) as f:
                    f.write(test_case['code'])
                    temp_file = f.name

                try:
                    # Run several times to obtain an average
                    for i in range(iterations):
                        start_time = time.time()
                        result = parser.parse_file(temp_file)
                        end_time = time.time()

                        parse_time = end_time - start_time
                        parse_times.append(parse_time)

                        # Sanity-check the parse result
                        if len(result.functions) != test_case['expected_functions']:
                            print(f"⚠️ Parse result mismatch: expected {test_case['expected_functions']}, got {len(result.functions)}")

                finally:
                    os.unlink(temp_file)

                # Compute statistics
                avg_time = statistics.mean(parse_times)
                median_time = statistics.median(parse_times)
                std_time = statistics.stdev(parse_times) if len(parse_times) > 1 else 0

                # Record the result
                result = BenchmarkResult(
                    name=f"parsing_{test_case['name']}",
                    category="parsing",
                    metric="time",
                    value=avg_time,
                    unit="seconds",
                    timestamp=datetime.now(),
                    metadata={
                        "code_size": len(test_case['code']),
                        "expected_functions": test_case['expected_functions'],
                        "iterations": iterations,
                        "median_time": median_time,
                        "std_time": std_time,
                        "min_time": min(parse_times),
                        "max_time": max(parse_times)
                    },
                    samples=parse_times
                )
                results.append(result)

            except ImportError as e:
                print(f"⚠️ Could not import the parser: {e}")
                continue

        return results

class VerificationBenchmark:
    """Verification performance benchmark."""

    async def run_benchmark(self, iterations: int = 5) -> List[BenchmarkResult]:
        """Run the verification benchmark."""
        results = []

        # Test cases
        test_cases = [
            {
                "name": "simple_arithmetic",
                "code": "int add(int a, int b) { return a + b; }",
                "spec": self._generate_simple_spec("add")
            },
            {
                "name": "array_processing",
                "code": """
int sum_array(int* arr, int size) {
    if (!arr || size <= 0) return 0;
    int sum = 0;
    for (int i = 0; i < size; i++) {
        sum += arr[i];
    }
    return sum;
}""",
                "spec": self._generate_array_spec()
            }
        ]

        for test_case in test_cases:
            print(f"🔍 Running verification benchmark: {test_case['name']}")

            try:
                sys.path.insert(0, str(Path(__file__).parent.parent))
                from src.verify.cbmc_runner import CBMCRunner

                runner = CBMCRunner()
                verification_times = []
                statuses = []

                # Write the test code to a temporary file
                import tempfile
                with tempfile.NamedTemporaryFile(mode='w', suffix='.c', delete=False) as f:
                    f.write(test_case['code'])
                    temp_file = f.name

                try:
                    for i in range(iterations):
                        start_time = time.time()
                        result = await runner.run_verification(
                            function_metadata={"name": "test_function"},
                            source_file=temp_file,
                            specification=test_case['spec']
                        )
                        end_time = time.time()

                        verification_time = end_time - start_time
                        verification_times.append(verification_time)

                        # Record the outcome of each run
                        statuses.append(result.status)

                finally:
                    os.unlink(temp_file)

                if verification_times:
                    avg_time = statistics.mean(verification_times)
                    result = BenchmarkResult(
                        name=f"verification_{test_case['name']}",
                        category="verification",
                        metric="time",
                        value=avg_time,
                        unit="seconds",
                        timestamp=datetime.now(),
                        metadata={
                            "iterations": iterations,
                            "success_rate": sum(1 for s in statuses if s == "success") / len(statuses),
                            "min_time": min(verification_times),
                            "max_time": max(verification_times)
                        },
                        samples=verification_times
                    )
                    results.append(result)

            except ImportError as e:
                print(f"⚠️ Could not import the verifier: {e}")
                continue

        return results

    def _generate_simple_spec(self, func_name: str) -> str:
        """Generate a simple specification."""
        return f"""
void {func_name}_test() {{
    int a = __CPROVER_nondet_int();
    int b = __CPROVER_nondet_int();
    __CPROVER_assume(a >= -1000 && a <= 1000);
    __CPROVER_assume(b >= -1000 && b <= 1000);

    int result = {func_name}(a, b);
    __CPROVER_assert(result == a + b, "addition_correct");
}}
"""

    def _generate_array_spec(self) -> str:
        """Generate an array specification."""
        return """
void sum_array_test() {
    int size = __CPROVER_nondet_int();
    __CPROVER_assume(size >= 0 && size <= 10);

    int arr[size];
    for (int i = 0; i < size; i++) {
        arr[i] = __CPROVER_nondet_int();
    }

    int result = sum_array(arr, size);
    __CPROVER_assert(result >= 0, "non_negative_result");
}
"""

class MutationBenchmark:
    """Mutation-generation performance benchmark."""

    async def run_benchmark(self, iterations: int = 10) -> List[BenchmarkResult]:
        """Run the mutation benchmark."""
        results = []

        try:
            sys.path.insert(0, str(Path(__file__).parent.parent))
            from src.mutate.engine import MutationEngine

            engine = MutationEngine()

            test_cases = [
                {
                    "name": "simple_function",
                    "spec": "void test() { int x = 0; }",
                    "metadata": [{"name": "test", "complexity_score": 0.1}]
                },
                {
                    "name": "complex_function",
                    "spec": self._generate_complex_spec(),
                    "metadata": [{"name": "complex_test", "complexity_score": 0.8}]
                }
            ]

            for test_case in test_cases:
                print(f"🔍 Running mutation benchmark: {test_case['name']}")

                mutation_times = []
                mutation_counts = []

                for i in range(iterations):
                    start_time = time.time()
                    mutations = engine.generate_mutations(
                        test_case['spec'],
                        test_case['metadata'],
                        max_mutations=5
                    )
                    end_time = time.time()

                    mutation_time = end_time - start_time
                    mutation_times.append(mutation_time)
                    mutation_counts.append(len(mutations))

                if mutation_times:
                    avg_time = statistics.mean(mutation_times)
                    avg_count = statistics.mean(mutation_counts)

                    result = BenchmarkResult(
                        name=f"mutation_{test_case['name']}",
                        category="mutation",
                        metric="time",
                        value=avg_time,
                        unit="seconds",
                        timestamp=datetime.now(),
                        metadata={
                            "iterations": iterations,
                            "avg_mutations": avg_count,
                            "min_time": min(mutation_times),
                            "max_time": max(mutation_times)
                        },
                        samples=mutation_times
                    )
                    results.append(result)

        except ImportError as e:
            print(f"⚠️ Could not import the mutation engine: {e}")

        return results

    def _generate_complex_spec(self) -> str:
        """Generate a complex specification."""
        return """
void complex_test() {
    int arr[10];
    for (int i = 0; i < 10; i++) {
        arr[i] = __CPROVER_nondet_int();
        __CPROVER_assume(arr[i] >= 0 && arr[i] <= 100);
    }

    int sum = 0;
    for (int i = 0; i < 10; i++) {
        if (arr[i] % 2 == 0) {
            sum += arr[i];
        }
    }

    __CPROVER_assert(sum >= 0, "sum_non_negative");
}
"""

class ConcurrencyBenchmark:
    """Concurrency performance benchmark."""

    async def run_benchmark(self, iterations: int = 5, max_workers: int = 8) -> List[BenchmarkResult]:
        """Run the concurrency benchmark."""
        results = []

        # Test different concurrency levels
        for workers in [1, 2, 4, 8]:
            if workers > max_workers:
                continue
            print(f"🔍 Running concurrency benchmark: {workers} workers")

            throughput_times = []
            success_count = 0

            def worker_task(task_id: int) -> float:
                """Worker task: a small CPU-bound workload."""
                start_time = time.time()

                # Simulated workload
                total = 0
                for i in range(10000):
                    total += i * task_id

                end_time = time.time()
                return end_time - start_time

            # Run the concurrent test
            for iteration in range(iterations):
                start_time = time.time()

                with ThreadPoolExecutor(max_workers=workers) as executor:
                    futures = [executor.submit(worker_task, i) for i in range(workers * 2)]
                    results_list = [future.result() for future in futures]

                end_time = time.time()
                total_time = end_time - start_time
                throughput_times.append(total_time)
                success_count += len(results_list)

            if throughput_times:
                avg_time = statistics.mean(throughput_times)
                total_elapsed = sum(throughput_times)
                # Throughput is total completed tasks over total elapsed time
                throughput = success_count / total_elapsed if total_elapsed > 0 else 0

                result = BenchmarkResult(
                    name=f"concurrent_workers_{workers}",
                    category="concurrency",
                    metric="throughput",
                    value=throughput,
                    unit="tasks/second",
                    timestamp=datetime.now(),
                    metadata={
                        "workers": workers,
                        "iterations": iterations,
                        "avg_time": avg_time,
                        "success_rate": success_count / (iterations * workers * 2)
                    },
                    samples=throughput_times
                )
                results.append(result)

        return results
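# Note: each benchmark class above exposes the same implicit interface,
#
#     async def run_benchmark(self, iterations: int = ...) -> List[BenchmarkResult]
#
# which is what lets BenchmarkRunner.run_all_benchmarks below drive all of them
# with a single `await benchmark.run_benchmark(iterations=...)` call. A sketch of
# a custom benchmark that would plug into the runner (hypothetical class name):
#
#     class MyBenchmark:
#         async def run_benchmark(self, iterations: int = 5) -> List[BenchmarkResult]:
#             samples = [0.01] * iterations  # replace with real measurements
#             return [BenchmarkResult(
#                 name="my_benchmark", category="custom", metric="time",
#                 value=statistics.mean(samples), unit="seconds",
#                 timestamp=datetime.now(), samples=samples,
#             )]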

class BenchmarkRunner:
    """Benchmark runner."""

    def __init__(self, output_dir: str):
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

        self.suites: List[BenchmarkSuite] = []
        self.system_monitor = SystemMonitor()

    async def run_all_benchmarks(self, config: Dict[str, Any]) -> BenchmarkSuite:
        """Run all benchmarks."""
        suite = BenchmarkSuite(
            name="complete_benchmark_suite",
            description="Complete CodeDetect performance benchmark",
            config=config,
            start_time=datetime.now()
        )

        print("🚀 Starting the benchmark suite...")

        # Start system monitoring
        self.system_monitor.start()

        try:
            # Run the individual benchmarks
            benchmarks = [
                ("code parsing", CodeParsingBenchmark()),
                ("verification", VerificationBenchmark()),
                ("mutation generation", MutationBenchmark()),
                ("concurrency", ConcurrencyBenchmark())
            ]

            for benchmark_name, benchmark in benchmarks:
                print(f"\n📊 Running {benchmark_name} benchmarks...")
                results = await benchmark.run_benchmark(
                    iterations=config.get("iterations", 5)
                )
                suite.results.extend(results)

        finally:
            # Stop system monitoring
            self.system_monitor.stop()

        suite.end_time = datetime.now()

        # Append the system monitoring results
        system_stats = self.system_monitor.get_stats()
        if system_stats:
            for metric_name, stats in system_stats.items():
                for stat_name, value in stats.items():
                    result = BenchmarkResult(
                        name=f"system_{metric_name}_{stat_name}",
                        category="system",
                        metric=stat_name,
                        value=value,
                        unit="percent" if "percent" in stat_name else "value",
                        timestamp=datetime.now(),
                        metadata=system_stats
                    )
                    suite.results.append(result)

        # Save the results
        self._save_suite_results(suite)
        self._generate_report(suite)

        return suite

    def _save_suite_results(self, suite: BenchmarkSuite):
        """Save the benchmark results."""
        # Convert to a serializable format
        suite_dict = asdict(suite)
        suite_dict["start_time"] = suite.start_time.isoformat()
        suite_dict["end_time"] = suite.end_time.isoformat()
        suite_dict["results"] = [
            {
                **asdict(result),
                "timestamp": result.timestamp.isoformat()
            }
            for result in suite.results
        ]

        # Save as JSON
        json_file = self.output_dir / f"{suite.name}.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(suite_dict, f, indent=2, ensure_ascii=False)

        # Save as YAML
        yaml_file = self.output_dir / f"{suite.name}.yaml"
        with open(yaml_file, 'w', encoding='utf-8') as f:
            yaml.dump(suite_dict, f, default_flow_style=False, allow_unicode=True)

    def _generate_report(self, suite: BenchmarkSuite):
        """Generate the benchmark report."""
        report_file = self.output_dir / f"{suite.name}_report.md"

        with open(report_file, 'w', encoding='utf-8') as f:
            f.write("# CodeDetect Benchmark Report\n\n")
            f.write(f"**Suite**: {suite.name}\n")
            f.write(f"**Start time**: {suite.start_time}\n")
            f.write(f"**End time**: {suite.end_time}\n")
            f.write(f"**Total duration**: {suite.end_time - suite.start_time}\n\n")

            # Group results by category
            categories = {}
            for result in suite.results:
                if result.category not in categories:
                    categories[result.category] = []
                categories[result.category].append(result)

            for category, results in categories.items():
                f.write(f"## {category.upper()} benchmarks\n\n")

                for result in results:
                    f.write(f"### {result.name}\n")
                    f.write(f"- **Metric**: {result.metric}\n")
                    f.write(f"- **Value**: {result.value:.4f} {result.unit}\n")
                    f.write(f"- **Samples**: {len(result.samples)}\n")

                    if result.metadata:
                        f.write("- **Additional information**:\n")
                        for key, value in result.metadata.items():
                            f.write(f"  - {key}: {value}\n")

                    f.write("\n")

        # Generate performance charts
        self._generate_performance_charts(suite)

    def _generate_performance_charts(self, suite: BenchmarkSuite):
        """Generate performance charts."""
        try:
            import matplotlib.pyplot as plt

            # Group results by category
            categories = {}
            for result in suite.results:
                if result.category not in categories:
                    categories[result.category] = []
                categories[result.category].append(result)

            for category, results in categories.items():
                # Only chart categories with enough data
                if len(results) < 2:
                    continue

                plt.figure(figsize=(12, 8))

                # Extract the data
                names = [result.name for result in results]
                values = [result.value for result in results]

                # Build a bar chart
                plt.bar(names, values)
                plt.title(f'{category.upper()} Performance Benchmark')
                plt.xlabel('Test Name')
                plt.ylabel(f'Value ({results[0].unit})')
                plt.xticks(rotation=45, ha='right')
                plt.tight_layout()

                # Save the chart
                chart_file = self.output_dir / f"{category}_benchmark.png"
                plt.savefig(chart_file, dpi=300, bbox_inches='tight')
                plt.close()

        except ImportError:
            print("⚠️ matplotlib is not installed; skipping chart generation")

async def main():
    """Main entry point."""
    import argparse

    parser = argparse.ArgumentParser(description='CodeDetect benchmark runner')
    parser.add_argument('--output-dir', type=str, default='benchmark_results',
                        help='Output directory (default: benchmark_results)')
    parser.add_argument('--iterations', type=int, default=5,
                        help='Number of iterations per test (default: 5)')
    parser.add_argument('--suite', type=str, default='default',
                        help='Benchmark suite name (default: default)')
    parser.add_argument('--verbose', action='store_true',
                        help='Verbose output')

    args = parser.parse_args()

    print("🚀 CodeDetect benchmark runner")
    print("=" * 50)

    # Create the runner
    runner = BenchmarkRunner(args.output_dir)

    # Configuration
    config = {
        "iterations": args.iterations,
        "created_at": datetime.now().isoformat(),
        "python_version": sys.version,
        "platform": sys.platform
    }

    # Run the benchmarks
    suite = await runner.run_all_benchmarks(config)

    print("\n✅ Benchmarks complete!")
    print(f"📁 Results directory: {args.output_dir}")
    print(f"📊 Number of results: {len(suite.results)}")
    print(f"⏱️ Total duration: {suite.end_time - suite.start_time}")

    if args.verbose:
        print("\n📈 Result summary:")
        categories = {}
        for result in suite.results:
            if result.category not in categories:
                categories[result.category] = []
            categories[result.category].append(result)

        for category, results in categories.items():
            avg_value = statistics.mean(r.value for r in results)
            print(f"  {category}: average {avg_value:.4f} {results[0].unit}")


if __name__ == "__main__":
    asyncio.run(main())