"""
System performance tests.

Exercise the end-to-end performance characteristics of the CodeDetect system, covering:

- end-to-end verification pipeline performance
- concurrent processing capacity
- memory usage
- response times
- throughput
- system resource monitoring
"""

import pytest
import asyncio
import time
import psutil
import os
import threading
import tempfile
import json
from unittest.mock import Mock, AsyncMock, patch, MagicMock
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Dict, Any
import statistics
import gc
import tracemalloc

from src.mutate.engine import MutationEngine
from src.verify.cbmc_runner import CBMCRunner
from src.ui.api import CodeDetectAPI
from src.ui.web_app import create_app


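# Note: these tests rely on third-party dependencies — psutil for process/CPU
# metrics, and the pytest-asyncio plugin, which provides the @pytest.mark.asyncio
# marker used by the async tests below.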
class TestSystemPerformance:
    """System-level performance tests."""

    @pytest.fixture(scope="class")
    def performance_config(self):
        """Performance-test configuration."""
        return {
            "concurrent_users": [1, 5, 10, 20],
            "test_duration": 30,                  # seconds
            "warmup_time": 5,                     # warm-up time in seconds
            "memory_threshold_mb": 1024,          # memory-growth threshold
            "response_time_threshold_ms": 5000,   # response-time threshold
            "throughput_threshold": 2.0,          # minimum requests processed per second
            "max_cpu_percent": 80.0               # CPU-usage threshold
        }

    @pytest.fixture(scope="class")
    def test_code_samples(self):
        """C source samples used as test inputs."""
        return {
            "simple": """
int add(int a, int b) {
    return a + b;
}
""",
            "medium": """
int factorial(int n) {
    if (n <= 1) return 1;
    return n * factorial(n - 1);
}

int array_sum(int* arr, int size) {
    int sum = 0;
    for (int i = 0; i < size; i++) {
        sum += arr[i];
    }
    return sum;
}
""",
            "complex": """
#include <stdlib.h>

typedef struct Node {
    int data;
    struct Node* next;
} Node;

Node* create_node(int data) {
    Node* node = (Node*)malloc(sizeof(Node));
    if (node == NULL) return NULL;
    node->data = data;
    node->next = NULL;
    return node;
}

void free_list(Node* head) {
    while (head != NULL) {
        Node* temp = head;
        head = head->next;
        free(temp);
    }
}

int list_length(Node* head) {
    int count = 0;
    while (head != NULL) {
        count++;
        head = head->next;
    }
    return count;
}
"""
        }

    def test_memory_usage_baseline(self, performance_config, test_code_samples):
        """Establish a baseline for memory usage."""
        # Start memory tracing
        tracemalloc.start()

        # Record the starting resident set size
        initial_memory = psutil.Process().memory_info().rss / 1024 / 1024

        # Create system components
        mutation_engine = MutationEngine()
        cbmc_runner = CBMCRunner()

        # Simulate some processing load
        test_code = test_code_samples["medium"]

        # Perform a handful of operations
        for _ in range(10):
            try:
                # Simulate the verification flow
                metadata = [{"name": "test_func", "complexity_score": 0.5}]
                specification = "void test_func() { }"

                # Simulate mutation generation
                mutations = mutation_engine.generate_mutations(
                    specification,
                    metadata,
                    max_mutations=5
                )

                # Release memory
                del mutations
                gc.collect()

            except Exception:
                pass  # Ignore errors; this test only tracks memory usage

        final_memory = psutil.Process().memory_info().rss / 1024 / 1024
        memory_increase = final_memory - initial_memory

        # Collect the tracemalloc figures
        current, peak = tracemalloc.get_traced_memory()
        tracemalloc.stop()

        print("\nMemory usage statistics:")
        print(f"  Initial memory: {initial_memory:.2f} MB")
        print(f"  Final memory: {final_memory:.2f} MB")
        print(f"  Memory growth: {memory_increase:.2f} MB")
        print(f"  Currently traced: {current / 1024 / 1024:.2f} MB")
        print(f"  Peak traced: {peak / 1024 / 1024:.2f} MB")

        # Verify that memory usage stays within the threshold
        assert memory_increase < performance_config["memory_threshold_mb"], \
            f"Memory growth {memory_increase:.2f}MB exceeds threshold {performance_config['memory_threshold_mb']}MB"

        # Clean up
        del mutation_engine
        del cbmc_runner
        gc.collect()

    @pytest.mark.asyncio
    async def test_concurrent_verification_performance(self, performance_config, test_code_samples):
        """Measure verification performance under concurrency."""
        results = []

        async def run_verification(user_id: int) -> Dict[str, Any]:
            """Run a single verification task."""
            start_time = time.time()

            try:
                # Simulate the full verification flow
                test_code = test_code_samples["medium"]

                # Create components
                mutation_engine = MutationEngine()

                # Simulated metadata
                metadata = [{
                    "name": f"test_func_{user_id}",
                    "complexity_score": 0.6
                }]

                # Build the specification
                specification = f"void test_func_{user_id}(int x) {{ __CPROVER_assume(x > 0); }}"

                # Generate mutations
                mutations = mutation_engine.generate_mutations(
                    specification,
                    metadata,
                    max_mutations=3
                )

                # Simulate verification latency
                await asyncio.sleep(0.1)

                end_time = time.time()

                return {
                    "user_id": user_id,
                    "success": True,
                    "response_time": (end_time - start_time) * 1000,  # milliseconds
                    "mutations_count": len(mutations) if mutations else 0
                }

            except Exception as e:
                end_time = time.time()
                return {
                    "user_id": user_id,
                    "success": False,
                    "response_time": (end_time - start_time) * 1000,
                    "error": str(e)
                }

        # Exercise each concurrency level
        for concurrent_count in performance_config["concurrent_users"]:
            print(f"\nTesting {concurrent_count} concurrent users...")

            # Warm up
            await asyncio.gather(*[
                run_verification(i) for i in range(min(3, concurrent_count))
            ])
            await asyncio.sleep(performance_config["warmup_time"])

            # Measured run
            start_time = time.time()
            tasks = [run_verification(i) for i in range(concurrent_count)]
            results_batch = await asyncio.gather(*tasks)
            end_time = time.time()

            # Analyse the results
            successful_results = [r for r in results_batch if r["success"]]
            response_times = [r["response_time"] for r in successful_results]

            if response_times:
                avg_response_time = statistics.mean(response_times)
                max_response_time = max(response_times)
                min_response_time = min(response_times)
                throughput = len(successful_results) / (end_time - start_time)

                print(f"  Success rate: {len(successful_results)}/{concurrent_count} ({len(successful_results)/concurrent_count*100:.1f}%)")
                print(f"  Average response time: {avg_response_time:.2f}ms")
                print(f"  Maximum response time: {max_response_time:.2f}ms")
                print(f"  Minimum response time: {min_response_time:.2f}ms")
                print(f"  Throughput: {throughput:.2f} requests/second")

                # Performance assertions
                assert avg_response_time < performance_config["response_time_threshold_ms"], \
                    f"Average response time {avg_response_time:.2f}ms exceeds threshold {performance_config['response_time_threshold_ms']}ms"

                assert throughput >= performance_config["throughput_threshold"], \
                    f"Throughput {throughput:.2f} is below threshold {performance_config['throughput_threshold']}"

            results.extend(results_batch)

        # Overall statistics
        total_successful = len([r for r in results if r["success"]])
        total_response_times = [r["response_time"] for r in results if r["success"]]

        if total_response_times:
            overall_avg = statistics.mean(total_response_times)
            overall_p95 = statistics.quantiles(total_response_times, n=20)[18]  # 95th percentile

            print("\nOverall performance statistics:")
            print(f"  Total successes: {total_successful}/{len(results)}")
            print(f"  Overall average response time: {overall_avg:.2f}ms")
            print(f"  95th percentile: {overall_p95:.2f}ms")

    def test_cpu_usage_monitoring(self, performance_config):
        """Monitor CPU usage under a CPU-bound workload."""
        import multiprocessing

        def cpu_intensive_task():
            """CPU-bound workload."""
            result = 0
            for i in range(1000000):
                result += i * i
            return result

        # Monitor CPU usage
        cpu_monitor = []
        monitor_duration = 10   # monitor for 10 seconds
        monitor_interval = 0.5  # sample every 0.5 seconds

        def monitor_cpu():
            # Prime the counter: the first cpu_percent(interval=None) reading has no
            # reference interval and would be meaningless, so discard it.
            psutil.cpu_percent(interval=None)
            time.sleep(monitor_interval)
            start_time = time.time()
            while time.time() - start_time < monitor_duration:
                cpu_percent = psutil.cpu_percent(interval=None)
                cpu_monitor.append(cpu_percent)
                time.sleep(monitor_interval)

        # Start the monitoring thread
        monitor_thread = threading.Thread(target=monitor_cpu)
        monitor_thread.start()

        # Start the worker threads
        num_workers = multiprocessing.cpu_count() - 1 or 1
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            futures = [executor.submit(cpu_intensive_task) for _ in range(num_workers * 2)]

            # Wait for the tasks to finish
            for future in as_completed(futures):
                try:
                    future.result()
                except Exception:
                    pass

        # Wait for monitoring to finish
        monitor_thread.join()

        # Analyse CPU usage
        if cpu_monitor:
            avg_cpu = statistics.mean(cpu_monitor)
            max_cpu = max(cpu_monitor)

            print("\nCPU usage monitoring:")
            print(f"  Average CPU usage: {avg_cpu:.1f}%")
            print(f"  Maximum CPU usage: {max_cpu:.1f}%")
            print(f"  Number of samples: {len(cpu_monitor)}")

            # Verify CPU usage stays within a reasonable range
            assert avg_cpu < performance_config["max_cpu_percent"], \
                f"Average CPU usage {avg_cpu:.1f}% exceeds threshold {performance_config['max_cpu_percent']}%"

    def test_system_throughput_scalability(self, test_code_samples):
        """Measure how system throughput scales with concurrency."""
        throughput_results = []

        def measure_throughput(concurrent_requests: int, duration: int) -> float:
            """Measure throughput at the given concurrency level."""
            start_time = time.time()
            completed_requests = 0

            async def process_request(request_id: int):
                nonlocal completed_requests
                try:
                    # Simulate verification work
                    test_code = test_code_samples["simple"]

                    # Create components (a real deployment would reuse these)
                    mutation_engine = MutationEngine()

                    metadata = [{"name": f"func_{request_id}", "complexity_score": 0.3}]
                    specification = f"void func_{request_id}() {{ }}"

                    # Generate mutations
                    mutations = mutation_engine.generate_mutations(
                        specification,
                        metadata,
                        max_mutations=2
                    )

                    completed_requests += 1

                except Exception:
                    pass  # Ignore errors; only throughput is measured

            # Create the tasks
            tasks = [process_request(i) for i in range(concurrent_requests)]

            # Run them, capped at the requested duration
            async def run_with_timeout():
                try:
                    await asyncio.wait_for(
                        asyncio.gather(*tasks),
                        timeout=duration
                    )
                except asyncio.TimeoutError:
                    pass  # The timeout is expected

            asyncio.run(run_with_timeout())

            end_time = time.time()
            actual_duration = end_time - start_time

            return completed_requests / actual_duration

        # Exercise different concurrency levels
        concurrency_levels = [1, 2, 5, 10, 15]

        for concurrency in concurrency_levels:
            throughput = measure_throughput(concurrency, duration=10)
            throughput_results.append((concurrency, throughput))

            print(f"Concurrency {concurrency}: throughput {throughput:.2f} req/s")

        # Analyse scalability
        if len(throughput_results) > 1:
            # Compute scaling efficiency against the single-request baseline
            baseline_throughput = throughput_results[0][1]
            for concurrency, throughput in throughput_results[1:]:
                ideal_throughput = baseline_throughput * concurrency
                efficiency = (throughput / ideal_throughput) * 100

                print(f"  Concurrency {concurrency} efficiency: {efficiency:.1f}%")

                # Scaling efficiency must not drop too low
                assert efficiency > 30, \
                    f"Scaling efficiency {efficiency:.1f}% at concurrency {concurrency} is too low"

    def test_response_time_distribution(self, test_code_samples):
        """Characterise the distribution of response times."""
        response_times = []
        sample_size = 50

        async def measure_single_request():
            """Measure the response time of a single request."""
            start_time = time.time()

            try:
                # Simulate the verification flow
                test_code = test_code_samples["medium"]

                mutation_engine = MutationEngine()

                metadata = [{"name": "test_func", "complexity_score": 0.5}]
                specification = "void test_func(int x) { __CPROVER_assume(x > 0); }"

                # Generate mutations
                mutations = mutation_engine.generate_mutations(
                    specification,
                    metadata,
                    max_mutations=3
                )

                end_time = time.time()
                return (end_time - start_time) * 1000  # convert to milliseconds

            except Exception as e:
                end_time = time.time()
                print(f"Request failed: {e}")
                return (end_time - start_time) * 1000

        # Collect the response-time samples
        for _ in range(sample_size):
            response_time = asyncio.run(measure_single_request())
            response_times.append(response_time)

        # Analyse the response-time distribution
        if response_times:
            avg_time = statistics.mean(response_times)
            median_time = statistics.median(response_times)
            min_time = min(response_times)
            max_time = max(response_times)

            # Percentiles: with n=20, cut point 17 is the 90th and 18 the 95th percentile
            quantile_points = statistics.quantiles(response_times, n=20)
            p90 = quantile_points[17]  # 90th percentile
            p95 = quantile_points[18]  # 95th percentile

            # Standard deviation
            stdev = statistics.stdev(response_times) if len(response_times) > 1 else 0

            print(f"\nResponse-time distribution ({sample_size} samples):")
            print(f"  Mean: {avg_time:.2f}ms")
            print(f"  Median: {median_time:.2f}ms")
            print(f"  Minimum: {min_time:.2f}ms")
            print(f"  Maximum: {max_time:.2f}ms")
            print(f"  90th percentile: {p90:.2f}ms")
            print(f"  95th percentile: {p95:.2f}ms")
            print(f"  Standard deviation: {stdev:.2f}ms")

            # Sanity-check the distribution
            assert max_time < 10000, f"Maximum response time {max_time:.2f}ms is too long"
            assert stdev < avg_time, \
                f"Response-time standard deviation {stdev:.2f}ms is too large relative to the mean"

    def test_memory_leak_detection(self, performance_config):
        """Detect memory leaks across repeated engine use."""
        tracemalloc.start()

        # Take the initial memory snapshot
        snapshot1 = tracemalloc.take_snapshot()

        # Run a series of operations
        for _ in range(5):
            # Create a component and exercise it
            mutation_engine = MutationEngine()

            # Do some processing
            for i in range(10):
                try:
                    metadata = [{"name": f"func_{i}", "complexity_score": 0.4}]
                    specification = f"void func_{i}() {{ }}"

                    mutations = mutation_engine.generate_mutations(
                        specification,
                        metadata,
                        max_mutations=2
                    )

                except Exception:
                    pass

            # Drop references
            del mutation_engine
            gc.collect()

        # Take the final memory snapshot
        snapshot2 = tracemalloc.take_snapshot()

        # Compare the snapshots
        top_stats = snapshot2.compare_to(snapshot1, 'lineno')

        print("\nMemory leak detection:")
        total_increase = sum(stat.size_diff for stat in top_stats[:10])
        print(f"  Total memory growth: {total_increase / 1024:.2f} KB")

        # Show the locations with the largest growth
        for stat in top_stats[:5]:
            if stat.size_diff > 0:
                print(f"  {stat.traceback.format()[-1]}: +{stat.size_diff / 1024:.2f} KB")

        # Verify there is no significant leak (growth must stay under 100 KB)
        assert total_increase < 1024 * 100, \
            f"Possible memory leak detected: growth of {total_increase / 1024:.2f} KB"

        tracemalloc.stop()

    @pytest.mark.asyncio
    async def test_system_stability_under_load(self, performance_config):
        """Check that the system stays stable under sustained load."""
        test_duration = 60   # seconds
        sample_interval = 5  # sample every 5 seconds

        system_metrics = {
            "timestamp": [],
            "memory_mb": [],
            "cpu_percent": [],
            "active_threads": []
        }

        # Reuse a single Process handle so cpu_percent() has a reference interval;
        # a fresh psutil.Process() per sample would always report 0.0.
        process = psutil.Process()
        process.cpu_percent()  # prime the CPU counter

        def collect_metrics():
            """Collect system metrics."""
            timestamp = time.time()
            memory_mb = process.memory_info().rss / 1024 / 1024
            cpu_percent = process.cpu_percent()
            active_threads = process.num_threads()

            system_metrics["timestamp"].append(timestamp)
            system_metrics["memory_mb"].append(memory_mb)
            system_metrics["cpu_percent"].append(cpu_percent)
            system_metrics["active_threads"].append(active_threads)

        async def sustained_load_task():
            """Generate load for the duration of the test."""
            task_id = threading.current_thread().ident

            while time.time() < start_time + test_duration:
                try:
                    # Simulate verification work
                    mutation_engine = MutationEngine()

                    metadata = [{"name": f"load_func_{task_id}", "complexity_score": 0.5}]
                    specification = f"void load_func_{task_id}() {{ }}"

                    mutations = mutation_engine.generate_mutations(
                        specification,
                        metadata,
                        max_mutations=1
                    )

                    # Brief pause between iterations
                    await asyncio.sleep(0.1)

                except Exception:
                    pass

        # Start the clock
        start_time = time.time()

        # Launch the load tasks
        num_load_tasks = 5
        load_tasks = [sustained_load_task() for _ in range(num_load_tasks)]

        # Collect metrics periodically while the load runs
        async def monitor_during_load():
            while time.time() - start_time < test_duration:
                collect_metrics()
                await asyncio.sleep(sample_interval)

        # Run the monitor and the load tasks together
        await asyncio.gather(monitor_during_load(), *load_tasks)

        # Analyse the stability metrics
        memory_values = system_metrics["memory_mb"]
        cpu_values = system_metrics["cpu_percent"]
        thread_values = system_metrics["active_threads"]

        if memory_values:
            memory_trend = (memory_values[-1] - memory_values[0]) / len(memory_values)
            memory_volatility = statistics.stdev(memory_values) if len(memory_values) > 1 else 0

            print(f"\nSystem stability analysis ({test_duration}s):")
            print(f"  Memory trend: {memory_trend:.2f} MB/sample")
            print(f"  Memory volatility: {memory_volatility:.2f} MB")
            print(f"  Final memory: {memory_values[-1]:.2f} MB")
            print(f"  Average CPU: {statistics.mean(cpu_values):.1f}%")
            print(f"  Thread-count range: {min(thread_values)}-{max(thread_values)}")

            # Stability assertions
            assert abs(memory_trend) < 1.0, \
                f"Memory trend of {memory_trend:.2f} MB/sample suggests a memory problem"
            assert memory_volatility < 50, f"Memory volatility of {memory_volatility:.2f} MB is too high"
            assert max(thread_values) < 100, f"Thread count {max(thread_values)} is too high"


class TestDatabasePerformance:
    """Database performance tests (if a database is used)."""

    def test_cache_performance(self):
        """Measure the benefit of caching."""
        from functools import lru_cache

        @lru_cache(maxsize=100)
        def cached_function(key: str) -> str:
            """Simulated cached computation."""
            time.sleep(0.001)  # simulate processing time
            return f"result_{key}"

        # Exercise cache hits: 100 lookups over 20 distinct keys
        keys = [f"key_{i % 20}" for i in range(100)]

        # Warm the cache
        for key in keys[:20]:
            cached_function(key)

        # Measure the cached path
        start_time = time.time()
        for key in keys:
            result = cached_function(key)
        end_time = time.time()

        cached_time = end_time - start_time

        # Measure the uncached path
        @lru_cache(maxsize=0)  # maxsize=0 disables caching: every call is a miss
        def uncached_function(key: str) -> str:
            time.sleep(0.001)
            return f"result_{key}"

        start_time = time.time()
        for key in keys:
            result = uncached_function(key)
        end_time = time.time()

        uncached_time = end_time - start_time

        speedup = uncached_time / cached_time

        print("\nCache performance test:")
        print(f"  Cached time: {cached_time:.3f}s")
        print(f"  Uncached time: {uncached_time:.3f}s")
        print(f"  Speed-up: {speedup:.1f}x")

        # Verify the cache is effective
        assert speedup > 5, f"Cache speed-up of {speedup:.1f}x is too low"

    def test_file_io_performance(self):
        """Measure file I/O performance."""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create the test file
            test_file = os.path.join(temp_dir, "test.c")
            test_content = """
int test_function(int x) {
    return x * 2;
}
""" * 100  # repeat the snippet to simulate a larger file

            # Measure write performance
            start_time = time.time()
            with open(test_file, 'w') as f:
                f.write(test_content)
            write_time = time.time() - start_time

            # Measure read performance
            start_time = time.time()
            with open(test_file, 'r') as f:
                content = f.read()
            read_time = time.time() - start_time

            print("\nFile I/O performance:")
            print(f"  Write time: {write_time:.4f}s")
            print(f"  Read time: {read_time:.4f}s")
            print(f"  File size: {len(test_content)} bytes")

            # Verify I/O performance
            assert write_time < 0.1, f"Write time {write_time:.4f}s is too long"
            assert read_time < 0.1, f"Read time {read_time:.4f}s is too long"


if __name__ == "__main__":
    pytest.main([__file__, "-v", "--tb=short"])