You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
cbmc/codedetect/tests/performance/test_system_performance.py

699 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
系统性能测试
测试CodeDetect系统的整体性能特征包括
- 端到端验证流程性能
- 并发处理能力
- 内存使用情况
- 响应时间
- 吞吐量
- 系统资源监控
"""
import pytest
import asyncio
import time
import psutil
import os
import threading
import tempfile
import json
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any
import statistics
import gc
import tracemalloc
from src.mutate.engine import MutationEngine
from src.verify.cbmc_runner import CBMCRunner
from src.ui.api import CodeDetectAPI
from src.ui.web_app import create_app
class TestSystemPerformance:
    """System-level performance tests for the CodeDetect pipeline."""

    @pytest.fixture(scope="class")
    def performance_config(self):
        """Shared performance-test thresholds (one instance per test class)."""
        return {
            "concurrent_users": [1, 5, 10, 20],
            "test_duration": 30,  # seconds
            "warmup_time": 5,  # warm-up time in seconds
            "memory_threshold_mb": 1024,  # memory-growth threshold
            "response_time_threshold_ms": 5000,  # response-time threshold
            "throughput_threshold": 2.0,  # requests-per-second threshold
            "max_cpu_percent": 80.0  # CPU-usage threshold
        }
@pytest.fixture(scope="class")
def test_code_samples(self):
"""测试代码样本"""
return {
"simple": """
int add(int a, int b) {
return a + b;
}
""",
"medium": """
int factorial(int n) {
if (n <= 1) return 1;
return n * factorial(n - 1);
}
int array_sum(int* arr, int size) {
int sum = 0;
for (int i = 0; i < size; i++) {
sum += arr[i];
}
return sum;
}
""",
"complex": """
#include <stdlib.h>
typedef struct Node {
int data;
struct Node* next;
} Node;
Node* create_node(int data) {
Node* node = (Node*)malloc(sizeof(Node));
if (node == NULL) return NULL;
node->data = data;
node->next = NULL;
return node;
}
void free_list(Node* head) {
while (head != NULL) {
Node* temp = head;
head = head->next;
free(temp);
}
}
int list_length(Node* head) {
int count = 0;
while (head != NULL) {
count++;
head = head->next;
}
return count;
}
"""
}
def test_memory_usage_baseline(self, performance_config, test_code_samples):
"""测试内存使用基线"""
# 启动内存跟踪
tracemalloc.start()
# 测试内存使用
initial_memory = psutil.Process().memory_info().rss / 1024 / 1024
# 创建系统组件
mutation_engine = MutationEngine()
cbmc_runner = CBMCRunner()
# 模拟一些处理负载
test_code = test_code_samples["medium"]
# 执行一些操作
for _ in range(10):
try:
# 模拟验证流程
metadata = [{"name": "test_func", "complexity_score": 0.5}]
specification = "void test_func() { }"
# 模拟突变生成
mutations = mutation_engine.generate_mutations(
specification,
metadata,
max_mutations=5
)
# 清理内存
del mutations
gc.collect()
except Exception:
pass # 忽略错误,只关注内存使用
final_memory = psutil.Process().memory_info().rss / 1024 / 1024
memory_increase = final_memory - initial_memory
# 获取内存快照
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"\n内存使用统计:")
print(f" 初始内存: {initial_memory:.2f} MB")
print(f" 最终内存: {final_memory:.2f} MB")
print(f" 内存增长: {memory_increase:.2f} MB")
print(f" 当前跟踪: {current / 1024 / 1024:.2f} MB")
print(f" 峰值内存: {peak / 1024 / 1024:.2f} MB")
# 验证内存使用在阈值内
assert memory_increase < performance_config["memory_threshold_mb"], \
f"内存增长 {memory_increase:.2f}MB 超过阈值 {performance_config['memory_threshold_mb']}MB"
# 清理
del mutation_engine
del cbmc_runner
gc.collect()
@pytest.mark.asyncio
async def test_concurrent_verification_performance(self, performance_config, test_code_samples):
"""测试并发验证性能"""
results = []
async def run_verification(user_id: int) -> Dict[str, Any]:
"""运行单个验证任务"""
start_time = time.time()
try:
# 模拟完整的验证流程
test_code = test_code_samples["medium"]
# 创建组件
mutation_engine = MutationEngine()
# 模拟元数据
metadata = [{
"name": f"test_func_{user_id}",
"complexity_score": 0.6
}]
# 生成规范
specification = f"void test_func_{user_id}(int x) {{ __CPROVER_assume(x > 0); }}"
# 生成突变
mutations = mutation_engine.generate_mutations(
specification,
metadata,
max_mutations=3
)
# 模拟验证时间
await asyncio.sleep(0.1)
end_time = time.time()
return {
"user_id": user_id,
"success": True,
"response_time": (end_time - start_time) * 1000, # 毫秒
"mutations_count": len(mutations) if mutations else 0
}
except Exception as e:
end_time = time.time()
return {
"user_id": user_id,
"success": False,
"response_time": (end_time - start_time) * 1000,
"error": str(e)
}
# 测试不同并发级别
for concurrent_count in performance_config["concurrent_users"]:
print(f"\n测试 {concurrent_count} 个并发用户...")
# 预热
await asyncio.gather(*[
run_verification(i) for i in range(min(3, concurrent_count))
])
await asyncio.sleep(performance_config["warmup_time"])
# 正式测试
start_time = time.time()
tasks = [run_verification(i) for i in range(concurrent_count)]
results_batch = await asyncio.gather(*tasks)
end_time = time.time()
# 分析结果
successful_results = [r for r in results_batch if r["success"]]
response_times = [r["response_time"] for r in successful_results]
if response_times:
avg_response_time = statistics.mean(response_times)
max_response_time = max(response_times)
min_response_time = min(response_times)
throughput = len(successful_results) / (end_time - start_time)
print(f" 成功率: {len(successful_results)}/{concurrent_count} ({len(successful_results)/concurrent_count*100:.1f}%)")
print(f" 平均响应时间: {avg_response_time:.2f}ms")
print(f" 最大响应时间: {max_response_time:.2f}ms")
print(f" 最小响应时间: {min_response_time:.2f}ms")
print(f" 吞吐量: {throughput:.2f} requests/second")
# 性能断言
assert avg_response_time < performance_config["response_time_threshold_ms"], \
f"平均响应时间 {avg_response_time:.2f}ms 超过阈值 {performance_config['response_time_threshold_ms']}ms"
assert throughput >= performance_config["throughput_threshold"], \
f"吞吐量 {throughput:.2f} 低于阈值 {performance_config['throughput_threshold']}"
results.extend(results_batch)
# 总体统计
total_successful = len([r for r in results if r["success"]])
total_response_times = [r["response_time"] for r in results if r["success"]]
if total_response_times:
overall_avg = statistics.mean(total_response_times)
overall_p95 = statistics.quantiles(total_response_times, n=20)[18] # 95th percentile
print(f"\n总体性能统计:")
print(f" 总成功数: {total_successful}/{len(results)}")
print(f" 总平均响应时间: {overall_avg:.2f}ms")
print(f" 95th percentile: {overall_p95:.2f}ms")
def test_cpu_usage_monitoring(self, performance_config):
"""测试CPU使用率监控"""
import threading
import multiprocessing
def cpu_intensive_task():
"""CPU密集型任务"""
result = 0
for i in range(1000000):
result += i * i
return result
# 监控CPU使用率
cpu_monitor = []
monitor_duration = 10 # 监控10秒
monitor_interval = 0.5 # 每0.5秒采样一次
def monitor_cpu():
start_time = time.time()
while time.time() - start_time < monitor_duration:
cpu_percent = psutil.cpu_percent(interval=None)
cpu_monitor.append(cpu_percent)
time.sleep(monitor_interval)
# 启动监控线程
monitor_thread = threading.Thread(target=monitor_cpu)
monitor_thread.start()
# 启动工作线程
num_workers = multiprocessing.cpu_count() - 1 or 1
with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(cpu_intensive_task) for _ in range(num_workers * 2)]
# 等待任务完成
for future in as_completed(futures):
try:
future.result()
except Exception:
pass
# 等待监控完成
monitor_thread.join()
# 分析CPU使用率
if cpu_monitor:
avg_cpu = statistics.mean(cpu_monitor)
max_cpu = max(cpu_monitor)
print(f"\nCPU使用率监控:")
print(f" 平均CPU使用率: {avg_cpu:.1f}%")
print(f" 最大CPU使用率: {max_cpu:.1f}%")
print(f" 采样点数: {len(cpu_monitor)}")
# 验证CPU使用率在合理范围内
assert avg_cpu < performance_config["max_cpu_percent"], \
f"平均CPU使用率 {avg_cpu:.1f}% 超过阈值 {performance_config['max_cpu_percent']}%"
def test_system_throughput_scalability(self, test_code_samples):
"""测试系统吞吐量可扩展性"""
throughput_results = []
def measure_throughput(concurrent_requests: int, duration: int) -> float:
"""测量指定并发级别的吞吐量"""
start_time = time.time()
completed_requests = 0
async def process_request(request_id: int):
nonlocal completed_requests
try:
# 模拟验证处理
test_code = test_code_samples["simple"]
# 创建组件(在实际应用中应该重用)
mutation_engine = MutationEngine()
metadata = [{"name": f"func_{request_id}", "complexity_score": 0.3}]
specification = f"void func_{request_id}() {{ }}"
# 生成突变
mutations = mutation_engine.generate_mutations(
specification,
metadata,
max_mutations=2
)
completed_requests += 1
except Exception:
pass # 忽略错误,只测量吞吐量
# 创建任务
tasks = [process_request(i) for i in range(concurrent_requests)]
# 在指定时间内运行
async def run_with_timeout():
try:
await asyncio.wait_for(
asyncio.gather(*tasks),
timeout=duration
)
except asyncio.TimeoutError:
pass # 预期的超时
asyncio.run(run_with_timeout())
end_time = time.time()
actual_duration = end_time - start_time
return completed_requests / actual_duration
# 测试不同并发级别
concurrency_levels = [1, 2, 5, 10, 15]
for concurrency in concurrency_levels:
throughput = measure_throughput(concurrency, duration=10)
throughput_results.append((concurrency, throughput))
print(f"并发数 {concurrency}: 吞吐量 {throughput:.2f} req/s")
# 分析可扩展性
if len(throughput_results) > 1:
# 计算扩展效率
baseline_throughput = throughput_results[0][1]
for i, (concurrency, throughput) in enumerate(throughput_results[1:], 1):
ideal_throughput = baseline_throughput * concurrency
efficiency = (throughput / ideal_throughput) * 100
print(f" 并发 {concurrency} 效率: {efficiency:.1f}%")
# 验证可扩展性效率不应过低
assert efficiency > 30, \
f"并发 {concurrency} 的扩展效率 {efficiency:.1f}% 过低"
def test_response_time_distribution(self, test_code_samples):
"""测试响应时间分布"""
response_times = []
sample_size = 50
async def measure_single_request():
"""测量单个请求的响应时间"""
start_time = time.time()
try:
# 模拟验证流程
test_code = test_code_samples["medium"]
mutation_engine = MutationEngine()
metadata = [{"name": "test_func", "complexity_score": 0.5}]
specification = "void test_func(int x) { __CPROVER_assume(x > 0); }"
# 生成突变
mutations = mutation_engine.generate_mutations(
specification,
metadata,
max_mutations=3
)
end_time = time.time()
return (end_time - start_time) * 1000 # 转换为毫秒
except Exception as e:
end_time = time.time()
print(f"请求失败: {e}")
return (end_time - start_time) * 1000
# 收集响应时间样本
for _ in range(sample_size):
response_time = asyncio.run(measure_single_request())
response_times.append(response_time)
# 分析响应时间分布
if response_times:
avg_time = statistics.mean(response_times)
median_time = statistics.median(response_times)
min_time = min(response_times)
max_time = max(response_times)
# 计算百分位数
percentiles = statistics.quantiles(response_times, n=10)
p90 = percentiles[8] # 90th percentile
p95 = percentiles[9] if len(percentiles) > 9 else max_time
# 计算标准差
stdev = statistics.stdev(response_times) if len(response_times) > 1 else 0
print(f"\n响应时间分布 ({sample_size} 个样本):")
print(f" 平均值: {avg_time:.2f}ms")
print(f" 中位数: {median_time:.2f}ms")
print(f" 最小值: {min_time:.2f}ms")
print(f" 最大值: {max_time:.2f}ms")
print(f" 90th percentile: {p90:.2f}ms")
print(f" 95th percentile: {p95:.2f}ms")
print(f" 标准差: {stdev:.2f}ms")
# 验证响应时间分布的合理性
assert max_time < 10000, f"最大响应时间 {max_time:.2f}ms 过长"
assert stdev < avg_time, f"响应时间标准差 {stdev:.2f}ms 相对于平均值过大"
def test_memory_leak_detection(self, performance_config):
"""检测内存泄漏"""
tracemalloc.start()
# 记录初始内存
snapshot1 = tracemalloc.take_snapshot()
# 执行一系列操作
for iteration in range(5):
# 创建组件并执行操作
mutation_engine = MutationEngine()
# 执行一些处理
for i in range(10):
try:
metadata = [{"name": f"func_{i}", "complexity_score": 0.4}]
specification = f"void func_{i}() {{ }}"
mutations = mutation_engine.generate_mutations(
specification,
metadata,
max_mutations=2
)
except Exception:
pass
# 删除引用
del mutation_engine
gc.collect()
# 记录最终内存
snapshot2 = tracemalloc.take_snapshot()
# 比较内存快照
top_stats = snapshot2.compare_to(snapshot1, 'lineno')
print("\n内存泄漏检测:")
total_increase = sum(stat.size_diff for stat in top_stats[:10])
print(f" 总内存增长: {total_increase / 1024:.2f} KB")
# 显示增长最多的文件
for stat in top_stats[:5]:
if stat.size_diff > 0:
print(f" {stat.traceback.format()[-1]}: +{stat.size_diff / 1024:.2f} KB")
# 验证没有显著内存泄漏
assert total_increase < 1024 * 100, # 小于100KB
f"检测到可能的内存泄漏,增长 {total_increase / 1024:.2f} KB"
tracemalloc.stop()
@pytest.mark.asyncio
async def test_system_stability_under_load(self, performance_config):
"""测试系统在负载下的稳定性"""
test_duration = 60 # 60秒
sample_interval = 5 # 每5秒采样一次
system_metrics = {
"timestamp": [],
"memory_mb": [],
"cpu_percent": [],
"active_threads": []
}
def collect_metrics():
"""收集系统指标"""
process = psutil.Process()
timestamp = time.time()
memory_mb = process.memory_info().rss / 1024 / 1024
cpu_percent = process.cpu_percent()
active_threads = process.num_threads()
system_metrics["timestamp"].append(timestamp)
system_metrics["memory_mb"].append(memory_mb)
system_metrics["cpu_percent"].append(cpu_percent)
system_metrics["active_threads"].append(active_threads)
async def sustained_load_task():
"""持续负载任务"""
task_id = threading.current_thread().ident
while time.time() < start_time + test_duration:
try:
# 模拟验证工作
mutation_engine = MutationEngine()
metadata = [{"name": f"load_func_{task_id}", "complexity_score": 0.5}]
specification = f"void load_func_{task_id}() {{ }}"
mutations = mutation_engine.generate_mutations(
specification,
metadata,
max_mutations=1
)
# 短暂休眠
await asyncio.sleep(0.1)
except Exception:
pass
# 启动监控
start_time = time.time()
# 启动负载任务
num_load_tasks = 5
load_tasks = [sustained_load_task() for _ in range(num_load_tasks)]
# 在测试期间定期收集指标
async def monitor_during_load():
while time.time() - start_time < test_duration:
collect_metrics()
await asyncio.sleep(sample_interval)
# 运行监控和负载
await asyncio.gather(monitor_during_load(), *load_tasks)
# 分析稳定性指标
memory_values = system_metrics["memory_mb"]
cpu_values = system_metrics["cpu_percent"]
thread_values = system_metrics["active_threads"]
if memory_values:
memory_trend = (memory_values[-1] - memory_values[0]) / len(memory_values)
memory_volatility = statistics.stdev(memory_values) if len(memory_values) > 1 else 0
print(f"\n系统稳定性分析 ({test_duration}秒):")
print(f" 内存趋势: {memory_trend:.2f} MB/样本")
print(f" 内存波动: {memory_volatility:.2f} MB")
print(f" 最终内存: {memory_values[-1]:.2f} MB")
print(f" CPU平均值: {statistics.mean(cpu_values):.1f}%")
print(f" 线程数范围: {min(thread_values)}-{max(thread_values)}")
# 验证稳定性
assert abs(memory_trend) < 1.0, f"内存趋势 {memory_trend:.2f} MB/样本 表明可能存在内存问题"
assert memory_volatility < 50, f"内存波动 {memory_volatility:.2f} MB 过高"
assert max(thread_values) < 100, f"线程数 {max(thread_values)} 过多"
class TestDatabasePerformance:
    """Database-adjacent performance tests (cache and file I/O)."""

    def test_cache_performance(self):
        """Compare lru_cache-backed lookups against uncached calls."""
        from functools import lru_cache

        @lru_cache(maxsize=100)
        def cached_function(key: str) -> str:
            """Simulated expensive lookup."""
            time.sleep(0.001)  # simulated processing time
            return f"result_{key}"

        # 100 lookups cycling through 20 distinct keys → 80% guaranteed hits.
        keys = [f"key_{i % 20}" for i in range(100)]

        # Warm the cache with every distinct key.
        for key in keys[:20]:
            cached_function(key)

        start_time = time.time()
        for key in keys:
            result = cached_function(key)
        cached_time = time.time() - start_time

        @lru_cache(maxsize=0)  # maxsize=0 keeps nothing: every call misses
        def uncached_function(key: str) -> str:
            time.sleep(0.001)
            return f"result_{key}"

        start_time = time.time()
        for key in keys:
            result = uncached_function(key)
        uncached_time = time.time() - start_time

        speedup = uncached_time / cached_time
        print(f"\n缓存性能测试:")
        print(f" 缓存时间: {cached_time:.3f}s")
        print(f" 无缓存时间: {uncached_time:.3f}s")
        print(f" 加速比: {speedup:.1f}x")
        assert speedup > 5, f"缓存加速比 {speedup:.1f}x 过低"

    def test_file_io_performance(self):
        """Time writing then reading a generated C file in a temp directory."""
        with tempfile.TemporaryDirectory() as temp_dir:
            test_file = os.path.join(temp_dir, "test.c")
            # Repeat the snippet to simulate a larger file (~6 KB).
            test_content = """
int test_function(int x) {
    return x * 2;
}
""" * 100

            start_time = time.time()
            with open(test_file, 'w') as f:
                f.write(test_content)
            write_time = time.time() - start_time

            start_time = time.time()
            with open(test_file, 'r') as f:
                content = f.read()
            read_time = time.time() - start_time

            print(f"\n文件I/O性能:")
            print(f" 写入时间: {write_time:.4f}s")
            print(f" 读取时间: {read_time:.4f}s")
            print(f" 文件大小: {len(test_content)} bytes")
            assert write_time < 0.1, f"写入时间 {write_time:.4f}s 过长"
            assert read_time < 0.1, f"读取时间 {read_time:.4f}s 过长"
if __name__ == "__main__":
pytest.main([__file__, "-v", "--tb=short"])