#!/usr/bin/env python3
"""
CodeDetect benchmark runner tool.
Runs and records system performance benchmarks, including:
- parsing performance benchmarks
- verification performance benchmarks
- mutation generation performance benchmarks
- system resource usage benchmarks
- concurrency performance benchmarks
"""
import os
import sys
import json
import time
import asyncio
import psutil
import statistics
import threading
import multiprocessing
from pathlib import Path
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict, field
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
try:  # Optional: used only for chart generation (see _generate_performance_charts)
    import matplotlib.pyplot as plt
    import numpy as np
except ImportError:
    plt = np = None
import yaml
@dataclass
class BenchmarkResult:
"""基准测试结果"""
name: str
category: str
metric: str
value: float
unit: str
timestamp: datetime
metadata: Dict[str, Any] = field(default_factory=dict)
samples: List[float] = field(default_factory=list)
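    # `value` stores the aggregated measurement (typically the mean), while
    # `samples` keeps the raw per-iteration measurements that produced it.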
@dataclass
class BenchmarkSuite:
"""基准测试套件"""
name: str
description: str
results: List[BenchmarkResult] = field(default_factory=list)
config: Dict[str, Any] = field(default_factory=dict)
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
class SystemMonitor:
"""系统资源监控器"""
def __init__(self, interval: float = 0.5):
self.interval = interval
self.monitoring = False
self.cpu_samples = []
self.memory_samples = []
self.disk_io_samples = []
self.network_io_samples = []
self.thread = None
def start(self):
"""开始监控"""
self.monitoring = True
self.cpu_samples = []
self.memory_samples = []
self.disk_io_samples = []
self.network_io_samples = []
self.thread = threading.Thread(target=self._monitor_loop)
self.thread.daemon = True
self.thread.start()
def stop(self):
"""停止监控"""
self.monitoring = False
if self.thread:
self.thread.join(timeout=1.0)
def _monitor_loop(self):
"""监控循环"""
while self.monitoring:
try:
                # CPU utilization
cpu_percent = psutil.cpu_percent(interval=None)
self.cpu_samples.append(cpu_percent)
                # Memory usage
memory = psutil.virtual_memory()
self.memory_samples.append({
"total": memory.total,
"available": memory.available,
"used": memory.used,
"percent": memory.percent
})
                # Disk I/O
disk_io = psutil.disk_io_counters()
if disk_io:
self.disk_io_samples.append({
"read_bytes": disk_io.read_bytes,
"write_bytes": disk_io.write_bytes,
"read_count": disk_io.read_count,
"write_count": disk_io.write_count
})
                # Network I/O
network_io = psutil.net_io_counters()
if network_io:
self.network_io_samples.append({
"bytes_sent": network_io.bytes_sent,
"bytes_recv": network_io.bytes_recv,
"packets_sent": network_io.packets_sent,
"packets_recv": network_io.packets_recv
})
time.sleep(self.interval)
except Exception as e:
print(f"监控错误: {e}")
break
def get_stats(self) -> Dict[str, Any]:
"""获取监控统计"""
stats = {}
if self.cpu_samples:
stats["cpu"] = {
"mean": statistics.mean(self.cpu_samples),
"median": statistics.median(self.cpu_samples),
"min": min(self.cpu_samples),
"max": max(self.cpu_samples),
"std": statistics.stdev(self.cpu_samples) if len(self.cpu_samples) > 1 else 0
}
if self.memory_samples:
memory_percents = [sample["percent"] for sample in self.memory_samples]
stats["memory"] = {
"mean_percent": statistics.mean(memory_percents),
"median_percent": statistics.median(memory_percents),
"min_percent": min(memory_percents),
"max_percent": max(memory_percents),
"peak_used_mb": max(sample["used"] for sample in self.memory_samples) / 1024 / 1024
}
return stats
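    # Typical usage:
    #   monitor = SystemMonitor(interval=0.5)
    #   monitor.start()
    #   ... run the workload under measurement ...
    #   monitor.stop()
    #   stats = monitor.get_stats()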
class CodeParsingBenchmark:
"""代码解析性能基准测试"""
def __init__(self):
self.test_codes = self._generate_test_codes()
def _generate_test_codes(self) -> List[Dict[str, Any]]:
"""生成测试代码"""
return [
{
"name": "small_function",
"code": "int add(int a, int b) { return a + b; }",
"expected_functions": 1
},
{
"name": "medium_functions",
"code": self._generate_medium_code(),
"expected_functions": 5
},
{
"name": "large_functions",
"code": self._generate_large_code(),
"expected_functions": 20
},
{
"name": "complex_structures",
"code": self._generate_complex_code(),
"expected_functions": 10
}
]
def _generate_medium_code(self) -> str:
"""生成中等复杂度代码"""
return """
int max(int a, int b) { return a > b ? a : b; }
int min(int a, int b) { return a < b ? a : b; }
int abs(int x) { return x >= 0 ? x : -x; }
int factorial(int n) { return n <= 1 ? 1 : n * factorial(n - 1); }
int gcd(int a, int b) { return b == 0 ? a : gcd(b, a % b); }
"""
def _generate_large_code(self) -> str:
"""生成大型代码"""
functions = []
for i in range(20):
func = f"""
int func_{i}(int x) {{
int result = x * {i};
for (int j = 0; j < {i % 10}; j++) {{
result += j;
}}
return result;
}}
"""
functions.append(func)
return "\n".join(functions)
def _generate_complex_code(self) -> str:
"""生成复杂代码"""
return """
typedef struct {
int x;
int y;
char name[50];
} Point;
typedef struct {
Point start;
Point end;
int id;
} LineSegment;
float distance(Point p1, Point p2) {
int dx = p1.x - p2.x;
int dy = p1.y - p2.y;
return sqrt(dx*dx + dy*dy);
}
int is_collinear(Point p1, Point p2, Point p3) {
int area = (p2.x - p1.x) * (p3.y - p1.y) - (p3.x - p1.x) * (p2.y - p1.y);
return area == 0;
}
float line_length(LineSegment line) {
return distance(line.start, line.end);
}
Point midpoint(LineSegment line) {
Point mid;
mid.x = (line.start.x + line.end.x) / 2;
mid.y = (line.start.y + line.end.y) / 2;
return mid;
}
int line_intersection(LineSegment l1, LineSegment l2, Point* result) {
    // Segment intersection check (omitted)
    return 0; // simplified implementation
}
void normalize_point(Point* p) {
float mag = sqrt(p->x*p->x + p->y*p->y);
if (mag > 0) {
p->x /= mag;
p->y /= mag;
}
}
int point_in_rect(Point p, Point rect_min, Point rect_max) {
return p.x >= rect_min.x && p.x <= rect_max.x &&
p.y >= rect_min.y && p.y <= rect_max.y;
}
float angle_between_points(Point p1, Point p2, Point p3) {
float v1x = p1.x - p2.x;
float v1y = p1.y - p2.y;
float v2x = p3.x - p2.x;
float v2y = p3.y - p2.y;
return atan2(v1x*v2y - v1y*v2x, v1x*v2x + v1y*v2y);
}
"""
async def run_benchmark(self, iterations: int = 10) -> List[BenchmarkResult]:
"""运行解析基准测试"""
results = []
for test_case in self.test_codes:
print(f"🔍 运行解析基准测试: {test_case['name']}")
# 动态导入解析器
try:
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.parse.code_parser import CodeParser
parser = CodeParser()
parse_times = []
                # Write the code to a temporary file
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.c', delete=False) as f:
f.write(test_case['code'])
temp_file = f.name
try:
                    # Run several times to average out noise
for i in range(iterations):
start_time = time.time()
result = parser.parse_file(temp_file)
end_time = time.time()
parse_time = end_time - start_time
parse_times.append(parse_time)
                        # Sanity-check the parse result
if len(result.functions) != test_case['expected_functions']:
print(f"⚠️ 解析结果不匹配: 期望 {test_case['expected_functions']}, 实际 {len(result.functions)}")
finally:
os.unlink(temp_file)
                # Compute summary statistics
avg_time = statistics.mean(parse_times)
median_time = statistics.median(parse_times)
std_time = statistics.stdev(parse_times) if len(parse_times) > 1 else 0
                # Build the benchmark result
result = BenchmarkResult(
name=f"parsing_{test_case['name']}",
category="parsing",
metric="time",
value=avg_time,
unit="seconds",
timestamp=datetime.now(),
metadata={
"code_size": len(test_case['code']),
"expected_functions": test_case['expected_functions'],
"iterations": iterations,
"median_time": median_time,
"std_time": std_time,
"min_time": min(parse_times),
"max_time": max(parse_times)
},
samples=parse_times
)
results.append(result)
except ImportError as e:
print(f"⚠️ 无法导入解析器: {e}")
continue
return results
class VerificationBenchmark:
"""验证性能基准测试"""
async def run_benchmark(self, iterations: int = 5) -> List[BenchmarkResult]:
"""运行验证基准测试"""
results = []
        # Test cases
test_cases = [
{
"name": "simple_arithmetic",
"code": "int add(int a, int b) { return a + b; }",
"spec": self._generate_simple_spec("add")
},
{
"name": "array_processing",
"code": """
int sum_array(int* arr, int size) {
if (!arr || size <= 0) return 0;
int sum = 0;
for (int i = 0; i < size; i++) {
sum += arr[i];
}
return sum;
}""",
"spec": self._generate_array_spec()
}
]
for test_case in test_cases:
print(f"🔍 运行验证基准测试: {test_case['name']}")
try:
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.verify.cbmc_runner import CBMCRunner
runner = CBMCRunner()
verification_times = []
                # Write the code to a temporary file
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.c', delete=False) as f:
f.write(test_case['code'])
temp_file = f.name
try:
for i in range(iterations):
start_time = time.time()
result = await runner.run_verification(
function_metadata={"name": "test_function"},
source_file=temp_file,
specification=test_case['spec']
)
end_time = time.time()
verification_time = end_time - start_time
verification_times.append(verification_time)
                        # Record the status of the most recent run
metadata = {
"status": result.status,
"execution_time": result.execution_time,
"success": result.status == "success"
}
finally:
os.unlink(temp_file)
if verification_times:
avg_time = statistics.mean(verification_times)
result = BenchmarkResult(
name=f"verification_{test_case['name']}",
category="verification",
metric="time",
value=avg_time,
unit="seconds",
timestamp=datetime.now(),
                        metadata={
                            "iterations": iterations,
                            # Heuristic: runs finishing within 30s count as successes
                            "success_rate": sum(1 for t in verification_times if t < 30) / len(verification_times),
                            "min_time": min(verification_times),
                            "max_time": max(verification_times),
                            "last_run": metadata
                        },
samples=verification_times
)
results.append(result)
except ImportError as e:
print(f"⚠️ 无法导入验证器: {e}")
continue
return results
def _generate_simple_spec(self, func_name: str) -> str:
"""生成简单规范"""
return f"""
void {func_name}_test() {{
int a = __CPROVER_nondet_int();
int b = __CPROVER_nondet_int();
__CPROVER_assume(a >= -1000 && a <= 1000);
__CPROVER_assume(b >= -1000 && b <= 1000);
int result = {func_name}(a, b);
__CPROVER_assert(result == a + b, "addition_correct");
}}
"""
def _generate_array_spec(self) -> str:
"""生成数组规范"""
return """
void sum_array_test() {
int size = __CPROVER_nondet_int();
__CPROVER_assume(size >= 0 && size <= 10);
int arr[size];
for (int i = 0; i < size; i++) {
arr[i] = __CPROVER_nondet_int();
}
int result = sum_array(arr, size);
__CPROVER_assert(result >= 0, "non_negative_result");
}
"""
class MutationBenchmark:
"""突变生成性能基准测试"""
async def run_benchmark(self, iterations: int = 10) -> List[BenchmarkResult]:
"""运行突变基准测试"""
results = []
try:
sys.path.insert(0, str(Path(__file__).parent.parent))
from src.mutate.engine import MutationEngine
engine = MutationEngine()
test_cases = [
{
"name": "simple_function",
"spec": "void test() { int x = 0; }",
"metadata": [{"name": "test", "complexity_score": 0.1}]
},
{
"name": "complex_function",
"spec": self._generate_complex_spec(),
"metadata": [{"name": "complex_test", "complexity_score": 0.8}]
}
]
for test_case in test_cases:
print(f"🔍 运行突变基准测试: {test_case['name']}")
mutation_times = []
mutation_counts = []
for i in range(iterations):
start_time = time.time()
mutations = engine.generate_mutations(
test_case['spec'],
test_case['metadata'],
max_mutations=5
)
end_time = time.time()
mutation_time = end_time - start_time
mutation_times.append(mutation_time)
mutation_counts.append(len(mutations))
if mutation_times:
avg_time = statistics.mean(mutation_times)
avg_count = statistics.mean(mutation_counts)
result = BenchmarkResult(
name=f"mutation_{test_case['name']}",
category="mutation",
metric="time",
value=avg_time,
unit="seconds",
timestamp=datetime.now(),
metadata={
"iterations": iterations,
"avg_mutations": avg_count,
"min_time": min(mutation_times),
"max_time": max(mutation_times)
},
samples=mutation_times
)
results.append(result)
except ImportError as e:
print(f"⚠️ 无法导入突变引擎: {e}")
return results
def _generate_complex_spec(self) -> str:
"""生成复杂规范"""
return """
void complex_test() {
int arr[10];
for (int i = 0; i < 10; i++) {
arr[i] = __CPROVER_nondet_int();
__CPROVER_assume(arr[i] >= 0 && arr[i] <= 100);
}
int sum = 0;
for (int i = 0; i < 10; i++) {
if (arr[i] % 2 == 0) {
sum += arr[i];
}
}
__CPROVER_assert(sum >= 0, "sum_non_negative");
}
"""
class ConcurrencyBenchmark:
"""并发性能基准测试"""
    async def run_benchmark(self, iterations: int = 5, max_workers: int = 8) -> List[BenchmarkResult]:
        """Run the concurrency benchmarks."""
results = []
        # Test different concurrency levels
        for workers in [1, 2, 4, 8]:
            print(f"🔍 Running concurrency benchmark: {workers} workers")
throughput_times = []
success_count = 0
def worker_task(task_id: int) -> float:
"""工作线程任务"""
start_time = time.time()
                # Simulate a CPU-bound workload
total = 0
for i in range(10000):
total += i * task_id
end_time = time.time()
return end_time - start_time
            # Run the concurrency test
            for iteration in range(iterations):
start_time = time.time()
with ThreadPoolExecutor(max_workers=workers) as executor:
futures = [executor.submit(worker_task, i) for i in range(workers * 2)]
results_list = [future.result() for future in futures]
end_time = time.time()
total_time = end_time - start_time
throughput_times.append(total_time)
success_count += len(results_list)
if throughput_times:
                avg_time = statistics.mean(throughput_times)
throughput = success_count / avg_time if avg_time > 0 else 0
result = BenchmarkResult(
name=f"concurrent_workers_{workers}",
category="concurrency",
metric="throughput",
value=throughput,
unit="tasks/second",
timestamp=datetime.now(),
metadata={
"workers": workers,
"iterations": 5,
"avg_time": avg_time,
"success_rate": success_count / (5 * workers * 2)
},
samples=throughput_times
)
results.append(result)
return results
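    # Note: worker_task is pure-Python and CPU-bound, so under CPython the GIL
    # serializes the threads; the reported throughput mostly reflects thread
    # pool and scheduling overhead rather than true parallel speed-up.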
class BenchmarkRunner:
"""基准测试运行器"""
def __init__(self, output_dir: str):
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.suites: List[BenchmarkSuite] = []
self.system_monitor = SystemMonitor()
async def run_all_benchmarks(self, config: Dict[str, Any]) -> BenchmarkSuite:
"""运行所有基准测试"""
suite = BenchmarkSuite(
name="complete_benchmark_suite",
description="完整的CodeDetect性能基准测试",
config=config,
start_time=datetime.now()
)
print("🚀 开始运行基准测试套件...")
# 开始系统监控
self.system_monitor.start()
try:
            # Run each benchmark category
            benchmarks = [
                ("Code parsing", CodeParsingBenchmark()),
                ("Verification", VerificationBenchmark()),
                ("Mutation generation", MutationBenchmark()),
                ("Concurrency", ConcurrencyBenchmark())
]
for benchmark_name, benchmark in benchmarks:
print(f"\n📊 运行 {benchmark_name} 基准测试...")
results = await benchmark.run_benchmark(
iterations=config.get("iterations", 5)
)
suite.results.extend(results)
finally:
            # Stop system monitoring
self.system_monitor.stop()
suite.end_time = datetime.now()
        # Append system monitoring results
system_stats = self.system_monitor.get_stats()
if system_stats:
for metric_name, stats in system_stats.items():
for stat_name, value in stats.items():
result = BenchmarkResult(
name=f"system_{metric_name}_{stat_name}",
category="system",
metric=stat_name,
value=value,
unit="percent" if "percent" in stat_name else "value",
timestamp=datetime.now(),
metadata=system_stats
)
suite.results.append(result)
        # Persist the results
self._save_suite_results(suite)
self._generate_report(suite)
return suite
def _save_suite_results(self, suite: BenchmarkSuite):
"""保存基准测试结果"""
# 转换为可序列化格式
suite_dict = asdict(suite)
suite_dict["start_time"] = suite.start_time.isoformat()
suite_dict["end_time"] = suite.end_time.isoformat()
suite_dict["results"] = [
{
**asdict(result),
"timestamp": result.timestamp.isoformat()
}
for result in suite.results
]
        # Save as JSON
json_file = self.output_dir / f"{suite.name}.json"
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(suite_dict, f, indent=2, ensure_ascii=False)
        # Save as YAML
yaml_file = self.output_dir / f"{suite.name}.yaml"
with open(yaml_file, 'w', encoding='utf-8') as f:
yaml.dump(suite_dict, f, default_flow_style=False, allow_unicode=True)
def _generate_report(self, suite: BenchmarkSuite):
"""生成基准测试报告"""
report_file = self.output_dir / f"{suite.name}_report.md"
with open(report_file, 'w', encoding='utf-8') as f:
f.write(f"# CodeDetect 基准测试报告\n\n")
f.write(f"**测试套件**: {suite.name}\n")
f.write(f"**开始时间**: {suite.start_time}\n")
f.write(f"**结束时间**: {suite.end_time}\n")
f.write(f"**总耗时**: {suite.end_time - suite.start_time}\n\n")
            # Group results by category
categories = {}
for result in suite.results:
if result.category not in categories:
categories[result.category] = []
categories[result.category].append(result)
for category, results in categories.items():
f.write(f"## {category.upper()} 基准测试\n\n")
for result in results:
f.write(f"### {result.name}\n")
f.write(f"- **指标**: {result.metric}\n")
f.write(f"- **值**: {result.value:.4f} {result.unit}\n")
f.write(f"- **样本数**: {len(result.samples)}\n")
if result.metadata:
f.write("- **附加信息**:\n")
for key, value in result.metadata.items():
f.write(f" - {key}: {value}\n")
f.write("\n")
        # Generate performance charts
self._generate_performance_charts(suite)
def _generate_performance_charts(self, suite: BenchmarkSuite):
"""生成性能趋势图"""
try:
import matplotlib.pyplot as plt
            # Group by category
categories = {}
for result in suite.results:
if result.category not in categories:
categories[result.category] = []
categories[result.category].append(result)
for category, results in categories.items():
                # Only chart categories with enough data points
if len(results) < 2:
continue
plt.figure(figsize=(12, 8))
                # Extract data
names = [result.name for result in results]
values = [result.value for result in results]
                # Create a bar chart
plt.bar(names, values)
plt.title(f'{category.upper()} Performance Benchmark')
plt.xlabel('Test Name')
plt.ylabel(f'Value ({results[0].unit})')
plt.xticks(rotation=45, ha='right')
plt.tight_layout()
                # Save the chart
chart_file = self.output_dir / f"{category}_benchmark.png"
plt.savefig(chart_file, dpi=300, bbox_inches='tight')
plt.close()
except ImportError:
print("⚠️ matplotlib未安装跳过图表生成")
async def main():
"""主函数"""
import argparse
    parser = argparse.ArgumentParser(description='CodeDetect benchmark runner')
    parser.add_argument('--output-dir', type=str, default='benchmark_results',
                        help='Output directory (default: benchmark_results)')
    parser.add_argument('--iterations', type=int, default=5,
                        help='Number of iterations per test (default: 5)')
    parser.add_argument('--suite', type=str, default='default',
                        help='Benchmark suite name (default: default)')
    parser.add_argument('--verbose', action='store_true',
                        help='Verbose output')
args = parser.parse_args()
print("🚀 CodeDetect基准测试运行器")
print("=" * 50)
    # Create the runner
runner = BenchmarkRunner(args.output_dir)
    # Configuration
config = {
"iterations": args.iterations,
"created_at": datetime.now().isoformat(),
"python_version": sys.version,
"platform": sys.platform
}
    # Run the benchmarks
suite = await runner.run_all_benchmarks(config)
print(f"\n✅ 基准测试完成!")
print(f"📁 结果目录: {args.output_dir}")
print(f"📊 测试数量: {len(suite.results)}")
print(f"⏱️ 总耗时: {suite.end_time - suite.start_time}")
if args.verbose:
print("\n📈 结果摘要:")
categories = {}
for result in suite.results:
if result.category not in categories:
categories[result.category] = []
categories[result.category].append(result)
for category, results in categories.items():
avg_value = statistics.mean(r.value for r in results)
print(f" {category}: 平均 {avg_value:.4f} {results[0].unit}")
if __name__ == "__main__":
asyncio.run(main())