""" 系统性能测试 测试CodeDetect系统的整体性能特征,包括: - 端到端验证流程性能 - 并发处理能力 - 内存使用情况 - 响应时间 - 吞吐量 - 系统资源监控 """ import pytest import asyncio import time import psutil import os import threading import tempfile import json from unittest.mock import Mock, AsyncMock, patch, MagicMock from concurrent.futures import ThreadPoolExecutor, as_completed from typing import List, Dict, Any import statistics import gc import tracemalloc from src.mutate.engine import MutationEngine from src.verify.cbmc_runner import CBMCRunner from src.ui.api import CodeDetectAPI from src.ui.web_app import create_app class TestSystemPerformance: """系统级性能测试""" @pytest.fixture(scope="class") def performance_config(self): """性能测试配置""" return { "concurrent_users": [1, 5, 10, 20], "test_duration": 30, # 秒 "warmup_time": 5, # 预热时间 "memory_threshold_mb": 1024, # 内存阈值 "response_time_threshold_ms": 5000, # 响应时间阈值 "throughput_threshold": 2.0, # 每秒处理数阈值 "max_cpu_percent": 80.0 # CPU使用率阈值 } @pytest.fixture(scope="class") def test_code_samples(self): """测试代码样本""" return { "simple": """ int add(int a, int b) { return a + b; } """, "medium": """ int factorial(int n) { if (n <= 1) return 1; return n * factorial(n - 1); } int array_sum(int* arr, int size) { int sum = 0; for (int i = 0; i < size; i++) { sum += arr[i]; } return sum; } """, "complex": """ #include typedef struct Node { int data; struct Node* next; } Node; Node* create_node(int data) { Node* node = (Node*)malloc(sizeof(Node)); if (node == NULL) return NULL; node->data = data; node->next = NULL; return node; } void free_list(Node* head) { while (head != NULL) { Node* temp = head; head = head->next; free(temp); } } int list_length(Node* head) { int count = 0; while (head != NULL) { count++; head = head->next; } return count; } """ } def test_memory_usage_baseline(self, performance_config, test_code_samples): """测试内存使用基线""" # 启动内存跟踪 tracemalloc.start() # 测试内存使用 initial_memory = psutil.Process().memory_info().rss / 1024 / 1024 # 创建系统组件 mutation_engine = MutationEngine() cbmc_runner = CBMCRunner() # 模拟一些处理负载 test_code = test_code_samples["medium"] # 执行一些操作 for _ in range(10): try: # 模拟验证流程 metadata = [{"name": "test_func", "complexity_score": 0.5}] specification = "void test_func() { }" # 模拟突变生成 mutations = mutation_engine.generate_mutations( specification, metadata, max_mutations=5 ) # 清理内存 del mutations gc.collect() except Exception: pass # 忽略错误,只关注内存使用 final_memory = psutil.Process().memory_info().rss / 1024 / 1024 memory_increase = final_memory - initial_memory # 获取内存快照 current, peak = tracemalloc.get_traced_memory() tracemalloc.stop() print(f"\n内存使用统计:") print(f" 初始内存: {initial_memory:.2f} MB") print(f" 最终内存: {final_memory:.2f} MB") print(f" 内存增长: {memory_increase:.2f} MB") print(f" 当前跟踪: {current / 1024 / 1024:.2f} MB") print(f" 峰值内存: {peak / 1024 / 1024:.2f} MB") # 验证内存使用在阈值内 assert memory_increase < performance_config["memory_threshold_mb"], \ f"内存增长 {memory_increase:.2f}MB 超过阈值 {performance_config['memory_threshold_mb']}MB" # 清理 del mutation_engine del cbmc_runner gc.collect() @pytest.mark.asyncio async def test_concurrent_verification_performance(self, performance_config, test_code_samples): """测试并发验证性能""" results = [] async def run_verification(user_id: int) -> Dict[str, Any]: """运行单个验证任务""" start_time = time.time() try: # 模拟完整的验证流程 test_code = test_code_samples["medium"] # 创建组件 mutation_engine = MutationEngine() # 模拟元数据 metadata = [{ "name": f"test_func_{user_id}", "complexity_score": 0.6 }] # 生成规范 specification = f"void test_func_{user_id}(int x) {{ __CPROVER_assume(x > 0); }}" # 生成突变 

    @pytest.mark.asyncio
    async def test_concurrent_verification_performance(self, performance_config, test_code_samples):
        """Test concurrent verification performance"""
        results = []

        async def run_verification(user_id: int) -> Dict[str, Any]:
            """Run a single verification task"""
            start_time = time.time()
            try:
                # Simulate the full verification flow
                test_code = test_code_samples["medium"]

                # Create components
                mutation_engine = MutationEngine()

                # Simulated metadata
                metadata = [{
                    "name": f"test_func_{user_id}",
                    "complexity_score": 0.6
                }]

                # Generate the specification
                specification = f"void test_func_{user_id}(int x) {{ __CPROVER_assume(x > 0); }}"

                # Generate mutations
                mutations = mutation_engine.generate_mutations(
                    specification, metadata, max_mutations=3
                )

                # Simulate verification time
                await asyncio.sleep(0.1)

                end_time = time.time()
                return {
                    "user_id": user_id,
                    "success": True,
                    "response_time": (end_time - start_time) * 1000,  # milliseconds
                    "mutations_count": len(mutations) if mutations else 0
                }
            except Exception as e:
                end_time = time.time()
                return {
                    "user_id": user_id,
                    "success": False,
                    "response_time": (end_time - start_time) * 1000,
                    "error": str(e)
                }

        # Test at each concurrency level
        for concurrent_count in performance_config["concurrent_users"]:
            print(f"\nTesting {concurrent_count} concurrent users...")

            # Warm up
            await asyncio.gather(*[
                run_verification(i) for i in range(min(3, concurrent_count))
            ])
            await asyncio.sleep(performance_config["warmup_time"])

            # Measured run
            start_time = time.time()
            tasks = [run_verification(i) for i in range(concurrent_count)]
            results_batch = await asyncio.gather(*tasks)
            end_time = time.time()

            # Analyse the results
            successful_results = [r for r in results_batch if r["success"]]
            response_times = [r["response_time"] for r in successful_results]

            if response_times:
                avg_response_time = statistics.mean(response_times)
                max_response_time = max(response_times)
                min_response_time = min(response_times)
                throughput = len(successful_results) / (end_time - start_time)

                print(f"  Success rate: {len(successful_results)}/{concurrent_count} "
                      f"({len(successful_results)/concurrent_count*100:.1f}%)")
                print(f"  Average response time: {avg_response_time:.2f}ms")
                print(f"  Max response time: {max_response_time:.2f}ms")
                print(f"  Min response time: {min_response_time:.2f}ms")
                print(f"  Throughput: {throughput:.2f} requests/second")

                # Performance assertions
                assert avg_response_time < performance_config["response_time_threshold_ms"], \
                    f"Average response time {avg_response_time:.2f}ms exceeds threshold " \
                    f"{performance_config['response_time_threshold_ms']}ms"
                assert throughput >= performance_config["throughput_threshold"], \
                    f"Throughput {throughput:.2f} is below threshold " \
                    f"{performance_config['throughput_threshold']}"

            results.extend(results_batch)

        # Overall statistics
        total_successful = len([r for r in results if r["success"]])
        total_response_times = [r["response_time"] for r in results if r["success"]]

        if total_response_times:
            overall_avg = statistics.mean(total_response_times)
            overall_p95 = statistics.quantiles(total_response_times, n=20)[18]  # 95th percentile

            print(f"\nOverall performance statistics:")
            print(f"  Total successes: {total_successful}/{len(results)}")
            print(f"  Overall average response time: {overall_avg:.2f}ms")
            print(f"  95th percentile: {overall_p95:.2f}ms")

    def test_cpu_usage_monitoring(self, performance_config):
        """Test CPU usage monitoring"""
        import multiprocessing

        def cpu_intensive_task():
            """CPU-bound task"""
            result = 0
            for i in range(1000000):
                result += i * i
            return result

        # Monitor CPU usage while the workers run
        cpu_monitor = []
        monitor_duration = 10   # monitor for 10 seconds
        monitor_interval = 0.5  # sample every 0.5 seconds

        def monitor_cpu():
            # Prime the counter: the first cpu_percent(interval=None) call
            # returns a meaningless 0.0
            psutil.cpu_percent(interval=None)
            start_time = time.time()
            while time.time() - start_time < monitor_duration:
                cpu_percent = psutil.cpu_percent(interval=None)
                cpu_monitor.append(cpu_percent)
                time.sleep(monitor_interval)

        # Start the monitoring thread
        monitor_thread = threading.Thread(target=monitor_cpu)
        monitor_thread.start()

        # Start the worker threads
        num_workers = multiprocessing.cpu_count() - 1 or 1
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [executor.submit(cpu_intensive_task) for _ in range(num_workers * 2)]

            # Wait for the tasks to finish
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception:
                    pass

        # Wait for monitoring to finish
        monitor_thread.join()

        # Analyse CPU usage
        if cpu_monitor:
            avg_cpu = statistics.mean(cpu_monitor)
            max_cpu = max(cpu_monitor)

            print(f"\nCPU usage monitoring:")
            print(f"  Average CPU usage: {avg_cpu:.1f}%")
            print(f"  Max CPU usage: {max_cpu:.1f}%")
            print(f"  Sample count: {len(cpu_monitor)}")
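
            # cpu_percent(interval=None) is non-blocking and reports usage
            # since the previous call, so each sample covers roughly one
            # monitor_interval window; averaging below smooths short spikes.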
performance_config["max_cpu_percent"], \ f"平均CPU使用率 {avg_cpu:.1f}% 超过阈值 {performance_config['max_cpu_percent']}%" def test_system_throughput_scalability(self, test_code_samples): """测试系统吞吐量可扩展性""" throughput_results = [] def measure_throughput(concurrent_requests: int, duration: int) -> float: """测量指定并发级别的吞吐量""" start_time = time.time() completed_requests = 0 async def process_request(request_id: int): nonlocal completed_requests try: # 模拟验证处理 test_code = test_code_samples["simple"] # 创建组件(在实际应用中应该重用) mutation_engine = MutationEngine() metadata = [{"name": f"func_{request_id}", "complexity_score": 0.3}] specification = f"void func_{request_id}() {{ }}" # 生成突变 mutations = mutation_engine.generate_mutations( specification, metadata, max_mutations=2 ) completed_requests += 1 except Exception: pass # 忽略错误,只测量吞吐量 # 创建任务 tasks = [process_request(i) for i in range(concurrent_requests)] # 在指定时间内运行 async def run_with_timeout(): try: await asyncio.wait_for( asyncio.gather(*tasks), timeout=duration ) except asyncio.TimeoutError: pass # 预期的超时 asyncio.run(run_with_timeout()) end_time = time.time() actual_duration = end_time - start_time return completed_requests / actual_duration # 测试不同并发级别 concurrency_levels = [1, 2, 5, 10, 15] for concurrency in concurrency_levels: throughput = measure_throughput(concurrency, duration=10) throughput_results.append((concurrency, throughput)) print(f"并发数 {concurrency}: 吞吐量 {throughput:.2f} req/s") # 分析可扩展性 if len(throughput_results) > 1: # 计算扩展效率 baseline_throughput = throughput_results[0][1] for i, (concurrency, throughput) in enumerate(throughput_results[1:], 1): ideal_throughput = baseline_throughput * concurrency efficiency = (throughput / ideal_throughput) * 100 print(f" 并发 {concurrency} 效率: {efficiency:.1f}%") # 验证可扩展性效率不应过低 assert efficiency > 30, \ f"并发 {concurrency} 的扩展效率 {efficiency:.1f}% 过低" def test_response_time_distribution(self, test_code_samples): """测试响应时间分布""" response_times = [] sample_size = 50 async def measure_single_request(): """测量单个请求的响应时间""" start_time = time.time() try: # 模拟验证流程 test_code = test_code_samples["medium"] mutation_engine = MutationEngine() metadata = [{"name": "test_func", "complexity_score": 0.5}] specification = "void test_func(int x) { __CPROVER_assume(x > 0); }" # 生成突变 mutations = mutation_engine.generate_mutations( specification, metadata, max_mutations=3 ) end_time = time.time() return (end_time - start_time) * 1000 # 转换为毫秒 except Exception as e: end_time = time.time() print(f"请求失败: {e}") return (end_time - start_time) * 1000 # 收集响应时间样本 for _ in range(sample_size): response_time = asyncio.run(measure_single_request()) response_times.append(response_time) # 分析响应时间分布 if response_times: avg_time = statistics.mean(response_times) median_time = statistics.median(response_times) min_time = min(response_times) max_time = max(response_times) # 计算百分位数 percentiles = statistics.quantiles(response_times, n=10) p90 = percentiles[8] # 90th percentile p95 = percentiles[9] if len(percentiles) > 9 else max_time # 计算标准差 stdev = statistics.stdev(response_times) if len(response_times) > 1 else 0 print(f"\n响应时间分布 ({sample_size} 个样本):") print(f" 平均值: {avg_time:.2f}ms") print(f" 中位数: {median_time:.2f}ms") print(f" 最小值: {min_time:.2f}ms") print(f" 最大值: {max_time:.2f}ms") print(f" 90th percentile: {p90:.2f}ms") print(f" 95th percentile: {p95:.2f}ms") print(f" 标准差: {stdev:.2f}ms") # 验证响应时间分布的合理性 assert max_time < 10000, f"最大响应时间 {max_time:.2f}ms 过长" assert stdev < avg_time, f"响应时间标准差 {stdev:.2f}ms 相对于平均值过大" def test_memory_leak_detection(self, 
performance_config): """检测内存泄漏""" tracemalloc.start() # 记录初始内存 snapshot1 = tracemalloc.take_snapshot() # 执行一系列操作 for iteration in range(5): # 创建组件并执行操作 mutation_engine = MutationEngine() # 执行一些处理 for i in range(10): try: metadata = [{"name": f"func_{i}", "complexity_score": 0.4}] specification = f"void func_{i}() {{ }}" mutations = mutation_engine.generate_mutations( specification, metadata, max_mutations=2 ) except Exception: pass # 删除引用 del mutation_engine gc.collect() # 记录最终内存 snapshot2 = tracemalloc.take_snapshot() # 比较内存快照 top_stats = snapshot2.compare_to(snapshot1, 'lineno') print("\n内存泄漏检测:") total_increase = sum(stat.size_diff for stat in top_stats[:10]) print(f" 总内存增长: {total_increase / 1024:.2f} KB") # 显示增长最多的文件 for stat in top_stats[:5]: if stat.size_diff > 0: print(f" {stat.traceback.format()[-1]}: +{stat.size_diff / 1024:.2f} KB") # 验证没有显著内存泄漏 assert total_increase < 1024 * 100, # 小于100KB f"检测到可能的内存泄漏,增长 {total_increase / 1024:.2f} KB" tracemalloc.stop() @pytest.mark.asyncio async def test_system_stability_under_load(self, performance_config): """测试系统在负载下的稳定性""" test_duration = 60 # 60秒 sample_interval = 5 # 每5秒采样一次 system_metrics = { "timestamp": [], "memory_mb": [], "cpu_percent": [], "active_threads": [] } def collect_metrics(): """收集系统指标""" process = psutil.Process() timestamp = time.time() memory_mb = process.memory_info().rss / 1024 / 1024 cpu_percent = process.cpu_percent() active_threads = process.num_threads() system_metrics["timestamp"].append(timestamp) system_metrics["memory_mb"].append(memory_mb) system_metrics["cpu_percent"].append(cpu_percent) system_metrics["active_threads"].append(active_threads) async def sustained_load_task(): """持续负载任务""" task_id = threading.current_thread().ident while time.time() < start_time + test_duration: try: # 模拟验证工作 mutation_engine = MutationEngine() metadata = [{"name": f"load_func_{task_id}", "complexity_score": 0.5}] specification = f"void load_func_{task_id}() {{ }}" mutations = mutation_engine.generate_mutations( specification, metadata, max_mutations=1 ) # 短暂休眠 await asyncio.sleep(0.1) except Exception: pass # 启动监控 start_time = time.time() # 启动负载任务 num_load_tasks = 5 load_tasks = [sustained_load_task() for _ in range(num_load_tasks)] # 在测试期间定期收集指标 async def monitor_during_load(): while time.time() - start_time < test_duration: collect_metrics() await asyncio.sleep(sample_interval) # 运行监控和负载 await asyncio.gather(monitor_during_load(), *load_tasks) # 分析稳定性指标 memory_values = system_metrics["memory_mb"] cpu_values = system_metrics["cpu_percent"] thread_values = system_metrics["active_threads"] if memory_values: memory_trend = (memory_values[-1] - memory_values[0]) / len(memory_values) memory_volatility = statistics.stdev(memory_values) if len(memory_values) > 1 else 0 print(f"\n系统稳定性分析 ({test_duration}秒):") print(f" 内存趋势: {memory_trend:.2f} MB/样本") print(f" 内存波动: {memory_volatility:.2f} MB") print(f" 最终内存: {memory_values[-1]:.2f} MB") print(f" CPU平均值: {statistics.mean(cpu_values):.1f}%") print(f" 线程数范围: {min(thread_values)}-{max(thread_values)}") # 验证稳定性 assert abs(memory_trend) < 1.0, f"内存趋势 {memory_trend:.2f} MB/样本 表明可能存在内存问题" assert memory_volatility < 50, f"内存波动 {memory_volatility:.2f} MB 过高" assert max(thread_values) < 100, f"线程数 {max(thread_values)} 过多" class TestDatabasePerformance: """数据库性能测试(如果使用数据库)""" def test_cache_performance(self): """测试缓存性能""" from functools import lru_cache @lru_cache(maxsize=100) def cached_function(key: str) -> str: """模拟缓存函数""" time.sleep(0.001) # 模拟处理时间 return f"result_{key}" # 测试缓存命中 keys = 
[f"key_{i % 20}" for i in range(100)] # 20个不同的键 # 预热缓存 for key in keys[:20]: cached_function(key) # 测试缓存性能 start_time = time.time() for key in keys: result = cached_function(key) end_time = time.time() cached_time = end_time - start_time # 测试无缓存性能 @lru_cache(maxsize=0) # 禁用缓存 def uncached_function(key: str) -> str: time.sleep(0.001) return f"result_{key}" start_time = time.time() for key in keys: result = uncached_function(key) end_time = time.time() uncached_time = end_time - start_time speedup = uncached_time / cached_time print(f"\n缓存性能测试:") print(f" 缓存时间: {cached_time:.3f}s") print(f" 无缓存时间: {uncached_time:.3f}s") print(f" 加速比: {speedup:.1f}x") # 验证缓存效果 assert speedup > 5, f"缓存加速比 {speedup:.1f}x 过低" def test_file_io_performance(self): """测试文件I/O性能""" with tempfile.TemporaryDirectory() as temp_dir: # 创建测试文件 test_file = os.path.join(temp_dir, "test.c") test_content = """ int test_function(int x) { return x * 2; } """ * 100 # 重复内容模拟大文件 # 测试写入性能 start_time = time.time() with open(test_file, 'w') as f: f.write(test_content) write_time = time.time() - start_time # 测试读取性能 start_time = time.time() with open(test_file, 'r') as f: content = f.read() read_time = time.time() - start_time print(f"\n文件I/O性能:") print(f" 写入时间: {write_time:.4f}s") print(f" 读取时间: {read_time:.4f}s") print(f" 文件大小: {len(test_content)} bytes") # 验证I/O性能 assert write_time < 0.1, f"写入时间 {write_time:.4f}s 过长" assert read_time < 0.1, f"读取时间 {read_time:.4f}s 过长" if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short"])