cbmc/codedetect/tests/performance/test_llm_performance.py

"""
LLM生成性能测试
本模块实现LLM生成性能测试包括API响应时间、令牌使用、质量度量和
不同生成策略的比较。提供基准测试和性能分析功能。
"""
import asyncio
import time
import json
import statistics
import psutil
import threading
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path
import aiohttp
import pytest
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
from src.spec.llm_generator import LLMGenerator, GenerationRequest, GenerationResult
from src.parser.c_parser import CParserFactory
from src.parser.ast_extractor import ASTExtractor
from src.utils.logger import get_logger
from test_data.simple_c_examples import get_performance_test_suite
logger = get_logger(__name__)
@dataclass
class PerformanceMetrics:
"""性能指标数据类"""
function_name: str
    generation_time: float      # wall-clock time for the full generate call, seconds
    tokens_used: int
    quality_score: float
    api_response_time: float    # generation time reported by the LLM generator, seconds
    token_rate: float           # tokens per second
    memory_usage: float         # RSS delta during the call, MB
    cpu_usage: float            # CPU-percent delta during the call
timestamp: float
complexity_level: str
verification_goals: List[str]
@dataclass
class BenchmarkResult:
"""基准测试结果"""
test_name: str
total_tests: int
successful_tests: int
average_generation_time: float
average_tokens_used: int
average_quality_score: float
average_token_rate: float
min_generation_time: float
max_generation_time: float
std_generation_time: float
total_memory_usage: float
total_cpu_usage: float
metrics: List[PerformanceMetrics]
class PerformanceMonitor:
"""性能监控器"""
def __init__(self):
self.process = psutil.Process()
self.start_memory = None
self.start_cpu = None
self.start_time = None
def start_monitoring(self):
"""开始监控"""
self.start_memory = self.process.memory_info().rss / 1024 / 1024 # MB
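        # Note: the first cpu_percent() call for a process returns 0.0 and only primes
        # psutil's internal counters, so the first CPU delta may read as zero.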
self.start_cpu = self.process.cpu_percent()
self.start_time = time.time()
def stop_monitoring(self) -> Tuple[float, float]:
"""停止监控并返回资源使用情况"""
end_memory = self.process.memory_info().rss / 1024 / 1024 # MB
end_cpu = self.process.cpu_percent()
        memory_usage = end_memory - self.start_memory if self.start_memory is not None else 0
        cpu_usage = end_cpu - self.start_cpu if self.start_cpu is not None else 0
return memory_usage, cpu_usage
class LLMPerformanceTester:
"""LLM性能测试器"""
def __init__(self, api_key: str, base_url: str = "https://api.siliconflow.cn/v1"):
self.api_key = api_key
self.base_url = base_url
self.monitor = PerformanceMonitor()
self.parser = CParserFactory.create_parser()
self.ast_extractor = ASTExtractor()
self.logger = logger
async def benchmark_api_performance(self, test_iterations: int = 10) -> BenchmarkResult:
"""基准测试API性能"""
self.logger.info(f"Starting API performance benchmark with {test_iterations} iterations")
        # Create a simple test request
test_request = GenerationRequest(
function_name="test_function",
function_info={
'name': 'test_function',
'return_type': 'int',
'parameters': [
{'name': 'a', 'type': 'int'},
{'name': 'b', 'type': 'int'}
]
},
verification_goals=['functional_correctness'],
max_retries=1,
validate=False,
store=False
)
metrics = []
async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator:
for i in range(test_iterations):
self.monitor.start_monitoring()
try:
start_time = time.time()
result = await generator.generate_specification(test_request)
end_time = time.time()
memory_usage, cpu_usage = self.monitor.stop_monitoring()
metric = PerformanceMetrics(
function_name="test_function",
generation_time=end_time - start_time,
tokens_used=result.tokens_used,
quality_score=result.quality_score,
api_response_time=result.generation_time,
token_rate=result.tokens_used / (end_time - start_time),
memory_usage=memory_usage,
cpu_usage=cpu_usage,
timestamp=time.time(),
complexity_level="basic",
verification_goals=['functional_correctness']
)
metrics.append(metric)
if (i + 1) % 5 == 0:
self.logger.info(f"Completed {i + 1}/{test_iterations} iterations")
except Exception as e:
self.logger.error(f"Iteration {i + 1} failed: {e}")
continue
return self._create_benchmark_result("API Performance Benchmark", metrics)
async def benchmark_by_complexity(self) -> Dict[str, BenchmarkResult]:
"""按复杂度进行基准测试"""
self.logger.info("Starting complexity-based benchmarking")
test_cases = get_performance_test_suite()
complexity_groups = {
'basic': [tc for tc in test_cases if tc.complexity_level == 'basic'],
'intermediate': [tc for tc in test_cases if tc.complexity_level == 'intermediate'],
'advanced': [tc for tc in test_cases if tc.complexity_level == 'advanced']
}
results = {}
for complexity, cases in complexity_groups.items():
if cases:
self.logger.info(f"Benchmarking {complexity} complexity functions")
metrics = []
async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator:
for test_case in cases:
                        # Create a temporary file for the test case source
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.c', delete=False) as f:
f.write(test_case.source_code)
temp_path = f.name
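                        # NamedTemporaryFile(delete=False) keeps the file on disk so the
                        # parser can open it by path; the finally block below removes it.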
try:
                            # Parse the file
ast = self.parser.parse_file(temp_path)
metadata = self.ast_extractor.extract_metadata(ast)
if metadata.functions:
func_name = list(metadata.functions.keys())[0]
func_info = metadata.functions[func_name]
request = GenerationRequest(
function_name=func_name,
function_info=func_info.to_dict(),
verification_goals=test_case.verification_goals,
max_retries=2,
validate=True,
store=False
)
self.monitor.start_monitoring()
start_time = time.time()
result = await generator.generate_specification(request)
end_time = time.time()
memory_usage, cpu_usage = self.monitor.stop_monitoring()
metric = PerformanceMetrics(
function_name=func_name,
generation_time=end_time - start_time,
tokens_used=result.tokens_used,
quality_score=result.quality_score,
api_response_time=result.generation_time,
token_rate=result.tokens_used / (end_time - start_time),
memory_usage=memory_usage,
cpu_usage=cpu_usage,
timestamp=time.time(),
complexity_level=complexity,
verification_goals=test_case.verification_goals
)
metrics.append(metric)
except Exception as e:
self.logger.error(f"Failed to benchmark {test_case.name}: {e}")
finally:
import os
os.unlink(temp_path)
results[complexity] = self._create_benchmark_result(f"{complexity.title()} Complexity Benchmark", metrics)
return results
async def benchmark_concurrent_requests(self, concurrent_count: int = 5, test_iterations: int = 3) -> BenchmarkResult:
"""基准测试并发请求性能"""
self.logger.info(f"Starting concurrent request benchmark: {concurrent_count} concurrent requests")
test_request = GenerationRequest(
function_name="concurrent_test",
function_info={
'name': 'concurrent_test',
'return_type': 'int',
'parameters': [{'name': 'x', 'type': 'int'}]
},
verification_goals=['functional_correctness'],
max_retries=1,
validate=False,
store=False
)
metrics = []
async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator:
for iteration in range(test_iterations):
self.logger.info(f"Concurrent iteration {iteration + 1}/{test_iterations}")
                # Create the concurrent requests
semaphore = asyncio.Semaphore(concurrent_count)
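                # The semaphore caps in-flight API calls at concurrent_count;
                # asyncio.gather below starts all requests and awaits them together.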
async def make_request():
async with semaphore:
self.monitor.start_monitoring()
start_time = time.time()
try:
result = await generator.generate_specification(test_request)
end_time = time.time()
memory_usage, cpu_usage = self.monitor.stop_monitoring()
metric = PerformanceMetrics(
function_name="concurrent_test",
generation_time=end_time - start_time,
tokens_used=result.tokens_used,
quality_score=result.quality_score,
api_response_time=result.generation_time,
token_rate=result.tokens_used / (end_time - start_time),
memory_usage=memory_usage,
cpu_usage=cpu_usage,
timestamp=time.time(),
complexity_level="basic",
verification_goals=['functional_correctness']
)
return metric
except Exception as e:
self.logger.error(f"Concurrent request failed: {e}")
return None
                # Issue the concurrent requests
tasks = [make_request() for _ in range(concurrent_count)]
results = await asyncio.gather(*tasks, return_exceptions=True)
                # Collect metrics from successful requests
for result in results:
if result and not isinstance(result, Exception):
metrics.append(result)
return self._create_benchmark_result(f"Concurrent Requests ({concurrent_count}) Benchmark", metrics)
async def benchmark_retry_impact(self) -> BenchmarkResult:
"""基准测试重试机制的影响"""
self.logger.info("Starting retry impact benchmark")
test_request = GenerationRequest(
function_name="retry_test",
function_info={
'name': 'retry_test',
'return_type': 'int',
'parameters': [{'name': 'x', 'type': 'int'}]
},
verification_goals=['functional_correctness'],
max_retries=3,
validate=True,
store=False
)
metrics = []
async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator:
for i in range(10):
self.monitor.start_monitoring()
start_time = time.time()
try:
result = await generator.generate_specification(test_request)
end_time = time.time()
memory_usage, cpu_usage = self.monitor.stop_monitoring()
metric = PerformanceMetrics(
function_name="retry_test",
generation_time=end_time - start_time,
tokens_used=result.tokens_used,
quality_score=result.quality_score,
api_response_time=result.generation_time,
token_rate=result.tokens_used / (end_time - start_time),
memory_usage=memory_usage,
cpu_usage=cpu_usage,
timestamp=time.time(),
complexity_level="basic",
verification_goals=['functional_correctness']
)
metrics.append(metric)
except Exception as e:
self.logger.error(f"Retry test {i + 1} failed: {e}")
return self._create_benchmark_result("Retry Impact Benchmark", metrics)
def _create_benchmark_result(self, test_name: str, metrics: List[PerformanceMetrics]) -> BenchmarkResult:
"""创建基准测试结果"""
if not metrics:
return BenchmarkResult(
test_name=test_name,
total_tests=0,
successful_tests=0,
average_generation_time=0,
average_tokens_used=0,
average_quality_score=0,
average_token_rate=0,
min_generation_time=0,
max_generation_time=0,
std_generation_time=0,
total_memory_usage=0,
total_cpu_usage=0,
metrics=[]
)
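        # A run counts as successful when the generator returned a non-zero quality score.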
successful_metrics = [m for m in metrics if m.quality_score > 0]
generation_times = [m.generation_time for m in successful_metrics]
return BenchmarkResult(
test_name=test_name,
total_tests=len(metrics),
successful_tests=len(successful_metrics),
average_generation_time=statistics.mean(generation_times) if generation_times else 0,
average_tokens_used=statistics.mean([m.tokens_used for m in successful_metrics]) if successful_metrics else 0,
average_quality_score=statistics.mean([m.quality_score for m in successful_metrics]) if successful_metrics else 0,
average_token_rate=statistics.mean([m.token_rate for m in successful_metrics]) if successful_metrics else 0,
min_generation_time=min(generation_times) if generation_times else 0,
max_generation_time=max(generation_times) if generation_times else 0,
std_generation_time=statistics.stdev(generation_times) if len(generation_times) > 1 else 0,
total_memory_usage=sum(m.memory_usage for m in metrics),
total_cpu_usage=sum(m.cpu_usage for m in metrics),
metrics=metrics
)
def generate_performance_report(self, results: Dict[str, BenchmarkResult], output_dir: str = "performance_reports"):
"""生成性能报告"""
output_path = Path(output_dir)
output_path.mkdir(parents=True, exist_ok=True)
report = {
'timestamp': time.time(),
'summary': self._generate_performance_summary(results),
'detailed_results': {name: asdict(result) for name, result in results.items()},
'recommendations': self._generate_recommendations(results)
}
        # Save the JSON report
json_file = output_path / f"performance_report_{int(time.time())}.json"
with open(json_file, 'w') as f:
json.dump(report, f, indent=2)
        # Generate the visual report
self._generate_visualizations(results, output_path)
self.logger.info(f"Performance report generated: {json_file}")
return str(json_file)
def _generate_performance_summary(self, results: Dict[str, BenchmarkResult]) -> Dict[str, Any]:
"""生成性能摘要"""
all_metrics = []
for result in results.values():
all_metrics.extend(result.metrics)
if not all_metrics:
return {'error': 'No performance data available'}
generation_times = [m.generation_time for m in all_metrics if m.generation_time > 0]
quality_scores = [m.quality_score for m in all_metrics if m.quality_score > 0]
token_rates = [m.token_rate for m in all_metrics if m.token_rate > 0]
return {
'total_tests': sum(r.total_tests for r in results.values()),
'successful_tests': sum(r.successful_tests for r in results.values()),
'success_rate': sum(r.successful_tests for r in results.values()) / sum(r.total_tests for r in results.values()),
'average_generation_time': statistics.mean(generation_times) if generation_times else 0,
'average_quality_score': statistics.mean(quality_scores) if quality_scores else 0,
'average_token_rate': statistics.mean(token_rates) if token_rates else 0,
'total_memory_usage': sum(r.total_memory_usage for r in results.values()),
'total_cpu_usage': sum(r.total_cpu_usage for r in results.values()),
            # Report only the test names so the summary stays JSON-serializable.
            'best_performing_test': max(results.items(), key=lambda x: x[1].average_token_rate)[0],
            'slowest_test': max(results.items(), key=lambda x: x[1].average_generation_time)[0]
}
def _generate_recommendations(self, results: Dict[str, BenchmarkResult]) -> List[str]:
"""生成性能建议"""
recommendations = []
        # Analyze average generation time
avg_times = [r.average_generation_time for r in results.values() if r.average_generation_time > 0]
if avg_times:
avg_time = statistics.mean(avg_times)
if avg_time > 30:
recommendations.append("Average generation time is high (>30s). Consider optimizing prompts or using faster models.")
        # Analyze success rate
total_tests = sum(r.total_tests for r in results.values())
successful_tests = sum(r.successful_tests for r in results.values())
if total_tests > 0:
success_rate = successful_tests / total_tests
if success_rate < 0.9:
recommendations.append("Success rate is below 90%. Check API reliability and error handling.")
        # Analyze token rate
token_rates = [r.average_token_rate for r in results.values() if r.average_token_rate > 0]
if token_rates:
avg_token_rate = statistics.mean(token_rates)
if avg_token_rate < 10:
recommendations.append("Token generation rate is low. Consider network optimization or model selection.")
        # Analyze memory usage
total_memory = sum(r.total_memory_usage for r in results.values())
if total_memory > 1000: # 1GB
recommendations.append("High memory usage detected. Consider optimizing memory management.")
return recommendations
def _generate_visualizations(self, results: Dict[str, BenchmarkResult], output_path: Path):
"""生成可视化图表"""
        # Set the chart style
plt.style.use('seaborn-v0_8')
        # 1. Performance comparison chart
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle('LLM Generation Performance Analysis', fontsize=16)
test_names = list(results.keys())
generation_times = [r.average_generation_time for r in results.values()]
quality_scores = [r.average_quality_score for r in results.values()]
token_rates = [r.average_token_rate for r in results.values()]
        # Generation time comparison
axes[0, 0].bar(test_names, generation_times, color='skyblue')
axes[0, 0].set_title('Average Generation Time')
axes[0, 0].set_ylabel('Time (seconds)')
axes[0, 0].tick_params(axis='x', rotation=45)
        # Quality score comparison
axes[0, 1].bar(test_names, quality_scores, color='lightgreen')
axes[0, 1].set_title('Average Quality Score')
axes[0, 1].set_ylabel('Quality Score')
axes[0, 1].tick_params(axis='x', rotation=45)
        # Token rate comparison
axes[1, 0].bar(test_names, token_rates, color='lightcoral')
axes[1, 0].set_title('Average Token Rate')
axes[1, 0].set_ylabel('Tokens/second')
axes[1, 0].tick_params(axis='x', rotation=45)
        # Success rate comparison
success_rates = [r.successful_tests / r.total_tests if r.total_tests > 0 else 0 for r in results.values()]
axes[1, 1].bar(test_names, success_rates, color='gold')
axes[1, 1].set_title('Success Rate')
axes[1, 1].set_ylabel('Success Rate')
axes[1, 1].tick_params(axis='x', rotation=45)
plt.tight_layout()
plt.savefig(output_path / 'performance_comparison.png', dpi=300, bbox_inches='tight')
plt.close()
        # 2. Complexity comparison chart (if available). benchmark_by_complexity keys its
        # results by level ('basic', ...), so match on the test name as well as the key.
        complexity_results = {k: v for k, v in results.items() if 'complexity' in k.lower() or 'complexity' in v.test_name.lower()}
if complexity_results:
fig, ax = plt.subplots(1, 1, figsize=(10, 6))
complexities = list(complexity_results.keys())
times = [complexity_results[c].average_generation_time for c in complexities]
qualities = [complexity_results[c].average_quality_score for c in complexities]
x = range(len(complexities))
width = 0.35
ax.bar([i - width/2 for i in x], times, width, label='Generation Time', color='skyblue')
ax.bar([i + width/2 for i in x], qualities, width, label='Quality Score', color='lightgreen')
ax.set_xlabel('Complexity Level')
ax.set_ylabel('Value')
ax.set_title('Performance vs Complexity')
ax.set_xticks(x)
ax.set_xticklabels(complexities)
ax.legend()
plt.tight_layout()
plt.savefig(output_path / 'complexity_analysis.png', dpi=300, bbox_inches='tight')
plt.close()
self.logger.info(f"Performance visualizations saved to {output_path}")
# Convenience functions
async def run_performance_benchmark(api_key: str, base_url: str = "https://api.siliconflow.cn/v1") -> Dict[str, BenchmarkResult]:
"""运行性能基准测试的便捷函数"""
tester = LLMPerformanceTester(api_key, base_url)
    # Run all benchmarks
results = {}
    # API performance benchmark
results['api_performance'] = await tester.benchmark_api_performance()
    # Complexity benchmark
complexity_results = await tester.benchmark_by_complexity()
results.update(complexity_results)
    # Concurrent request benchmark
results['concurrent_5'] = await tester.benchmark_concurrent_requests(5)
    # Retry impact benchmark
results['retry_impact'] = await tester.benchmark_retry_impact()
return results
def save_performance_report(results: Dict[str, BenchmarkResult], output_dir: str = "performance_reports"):
"""保存性能报告的便捷函数"""
tester = LLMPerformanceTester("dummy_key") # 只用于生成报告
return tester.generate_performance_report(results, output_dir)
if __name__ == "__main__":
    # Run the performance tests
import os
api_key = os.getenv('SILICONFLOW_API_KEY')
if not api_key:
print("Please set SILICONFLOW_API_KEY environment variable")
exit(1)
async def main():
results = await run_performance_benchmark(api_key)
report_path = save_performance_report(results)
print(f"Performance report saved to: {report_path}")
asyncio.run(main())