""" LLM生成性能测试 本模块实现LLM生成性能测试,包括API响应时间、令牌使用、质量度量和 不同生成策略的比较。提供基准测试和性能分析功能。 """ import asyncio import time import json import statistics import psutil import threading from typing import List, Dict, Any, Optional, Tuple from dataclasses import dataclass, asdict from pathlib import Path import aiohttp import pytest import matplotlib.pyplot as plt import seaborn as sns import pandas as pd from src.spec.llm_generator import LLMGenerator, GenerationRequest, GenerationResult from src.parser.c_parser import CParserFactory from src.parser.ast_extractor import ASTExtractor from src.utils.logger import get_logger from test_data.simple_c_examples import get_performance_test_suite logger = get_logger(__name__) @dataclass class PerformanceMetrics: """性能指标数据类""" function_name: str generation_time: float tokens_used: int quality_score: float api_response_time: float token_rate: float memory_usage: float cpu_usage: float timestamp: float complexity_level: str verification_goals: List[str] @dataclass class BenchmarkResult: """基准测试结果""" test_name: str total_tests: int successful_tests: int average_generation_time: float average_tokens_used: int average_quality_score: float average_token_rate: float min_generation_time: float max_generation_time: float std_generation_time: float total_memory_usage: float total_cpu_usage: float metrics: List[PerformanceMetrics] class PerformanceMonitor: """性能监控器""" def __init__(self): self.process = psutil.Process() self.start_memory = None self.start_cpu = None self.start_time = None def start_monitoring(self): """开始监控""" self.start_memory = self.process.memory_info().rss / 1024 / 1024 # MB self.start_cpu = self.process.cpu_percent() self.start_time = time.time() def stop_monitoring(self) -> Tuple[float, float]: """停止监控并返回资源使用情况""" end_memory = self.process.memory_info().rss / 1024 / 1024 # MB end_cpu = self.process.cpu_percent() memory_usage = end_memory - self.start_memory if self.start_memory else 0 cpu_usage = end_cpu - self.start_cpu if self.start_cpu else 0 return memory_usage, cpu_usage class LLMPerformanceTester: """LLM性能测试器""" def __init__(self, api_key: str, base_url: str = "https://api.siliconflow.cn/v1"): self.api_key = api_key self.base_url = base_url self.monitor = PerformanceMonitor() self.parser = CParserFactory.create_parser() self.ast_extractor = ASTExtractor() self.logger = logger async def benchmark_api_performance(self, test_iterations: int = 10) -> BenchmarkResult: """基准测试API性能""" self.logger.info(f"Starting API performance benchmark with {test_iterations} iterations") # 创建简单的测试用例 test_request = GenerationRequest( function_name="test_function", function_info={ 'name': 'test_function', 'return_type': 'int', 'parameters': [ {'name': 'a', 'type': 'int'}, {'name': 'b', 'type': 'int'} ] }, verification_goals=['functional_correctness'], max_retries=1, validate=False, store=False ) metrics = [] async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator: for i in range(test_iterations): self.monitor.start_monitoring() try: start_time = time.time() result = await generator.generate_specification(test_request) end_time = time.time() memory_usage, cpu_usage = self.monitor.stop_monitoring() metric = PerformanceMetrics( function_name="test_function", generation_time=end_time - start_time, tokens_used=result.tokens_used, quality_score=result.quality_score, api_response_time=result.generation_time, token_rate=result.tokens_used / (end_time - start_time), memory_usage=memory_usage, cpu_usage=cpu_usage, timestamp=time.time(), 
complexity_level="basic", verification_goals=['functional_correctness'] ) metrics.append(metric) if (i + 1) % 5 == 0: self.logger.info(f"Completed {i + 1}/{test_iterations} iterations") except Exception as e: self.logger.error(f"Iteration {i + 1} failed: {e}") continue return self._create_benchmark_result("API Performance Benchmark", metrics) async def benchmark_by_complexity(self) -> Dict[str, BenchmarkResult]: """按复杂度进行基准测试""" self.logger.info("Starting complexity-based benchmarking") test_cases = get_performance_test_suite() complexity_groups = { 'basic': [tc for tc in test_cases if tc.complexity_level == 'basic'], 'intermediate': [tc for tc in test_cases if tc.complexity_level == 'intermediate'], 'advanced': [tc for tc in test_cases if tc.complexity_level == 'advanced'] } results = {} for complexity, cases in complexity_groups.items(): if cases: self.logger.info(f"Benchmarking {complexity} complexity functions") metrics = [] async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator: for test_case in cases: # 创建临时文件 import tempfile with tempfile.NamedTemporaryFile(mode='w', suffix='.c', delete=False) as f: f.write(test_case.source_code) temp_path = f.name try: # 解析文件 ast = self.parser.parse_file(temp_path) metadata = self.ast_extractor.extract_metadata(ast) if metadata.functions: func_name = list(metadata.functions.keys())[0] func_info = metadata.functions[func_name] request = GenerationRequest( function_name=func_name, function_info=func_info.to_dict(), verification_goals=test_case.verification_goals, max_retries=2, validate=True, store=False ) self.monitor.start_monitoring() start_time = time.time() result = await generator.generate_specification(request) end_time = time.time() memory_usage, cpu_usage = self.monitor.stop_monitoring() metric = PerformanceMetrics( function_name=func_name, generation_time=end_time - start_time, tokens_used=result.tokens_used, quality_score=result.quality_score, api_response_time=result.generation_time, token_rate=result.tokens_used / (end_time - start_time), memory_usage=memory_usage, cpu_usage=cpu_usage, timestamp=time.time(), complexity_level=complexity, verification_goals=test_case.verification_goals ) metrics.append(metric) except Exception as e: self.logger.error(f"Failed to benchmark {test_case.name}: {e}") finally: import os os.unlink(temp_path) results[complexity] = self._create_benchmark_result(f"{complexity.title()} Complexity Benchmark", metrics) return results async def benchmark_concurrent_requests(self, concurrent_count: int = 5, test_iterations: int = 3) -> BenchmarkResult: """基准测试并发请求性能""" self.logger.info(f"Starting concurrent request benchmark: {concurrent_count} concurrent requests") test_request = GenerationRequest( function_name="concurrent_test", function_info={ 'name': 'concurrent_test', 'return_type': 'int', 'parameters': [{'name': 'x', 'type': 'int'}] }, verification_goals=['functional_correctness'], max_retries=1, validate=False, store=False ) metrics = [] async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator: for iteration in range(test_iterations): self.logger.info(f"Concurrent iteration {iteration + 1}/{test_iterations}") # 创建并发请求 semaphore = asyncio.Semaphore(concurrent_count) async def make_request(): async with semaphore: self.monitor.start_monitoring() start_time = time.time() try: result = await generator.generate_specification(test_request) end_time = time.time() memory_usage, cpu_usage = self.monitor.stop_monitoring() metric = PerformanceMetrics( 
function_name="concurrent_test", generation_time=end_time - start_time, tokens_used=result.tokens_used, quality_score=result.quality_score, api_response_time=result.generation_time, token_rate=result.tokens_used / (end_time - start_time), memory_usage=memory_usage, cpu_usage=cpu_usage, timestamp=time.time(), complexity_level="basic", verification_goals=['functional_correctness'] ) return metric except Exception as e: self.logger.error(f"Concurrent request failed: {e}") return None # 执行并发请求 tasks = [make_request() for _ in range(concurrent_count)] results = await asyncio.gather(*tasks, return_exceptions=True) # 收集成功的指标 for result in results: if result and not isinstance(result, Exception): metrics.append(result) return self._create_benchmark_result(f"Concurrent Requests ({concurrent_count}) Benchmark", metrics) async def benchmark_retry_impact(self) -> BenchmarkResult: """基准测试重试机制的影响""" self.logger.info("Starting retry impact benchmark") test_request = GenerationRequest( function_name="retry_test", function_info={ 'name': 'retry_test', 'return_type': 'int', 'parameters': [{'name': 'x', 'type': 'int'}] }, verification_goals=['functional_correctness'], max_retries=3, validate=True, store=False ) metrics = [] async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator: for i in range(10): self.monitor.start_monitoring() start_time = time.time() try: result = await generator.generate_specification(test_request) end_time = time.time() memory_usage, cpu_usage = self.monitor.stop_monitoring() metric = PerformanceMetrics( function_name="retry_test", generation_time=end_time - start_time, tokens_used=result.tokens_used, quality_score=result.quality_score, api_response_time=result.generation_time, token_rate=result.tokens_used / (end_time - start_time), memory_usage=memory_usage, cpu_usage=cpu_usage, timestamp=time.time(), complexity_level="basic", verification_goals=['functional_correctness'] ) metrics.append(metric) except Exception as e: self.logger.error(f"Retry test {i + 1} failed: {e}") return self._create_benchmark_result("Retry Impact Benchmark", metrics) def _create_benchmark_result(self, test_name: str, metrics: List[PerformanceMetrics]) -> BenchmarkResult: """创建基准测试结果""" if not metrics: return BenchmarkResult( test_name=test_name, total_tests=0, successful_tests=0, average_generation_time=0, average_tokens_used=0, average_quality_score=0, average_token_rate=0, min_generation_time=0, max_generation_time=0, std_generation_time=0, total_memory_usage=0, total_cpu_usage=0, metrics=[] ) successful_metrics = [m for m in metrics if m.quality_score > 0] generation_times = [m.generation_time for m in successful_metrics] return BenchmarkResult( test_name=test_name, total_tests=len(metrics), successful_tests=len(successful_metrics), average_generation_time=statistics.mean(generation_times) if generation_times else 0, average_tokens_used=statistics.mean([m.tokens_used for m in successful_metrics]) if successful_metrics else 0, average_quality_score=statistics.mean([m.quality_score for m in successful_metrics]) if successful_metrics else 0, average_token_rate=statistics.mean([m.token_rate for m in successful_metrics]) if successful_metrics else 0, min_generation_time=min(generation_times) if generation_times else 0, max_generation_time=max(generation_times) if generation_times else 0, std_generation_time=statistics.stdev(generation_times) if len(generation_times) > 1 else 0, total_memory_usage=sum(m.memory_usage for m in metrics), total_cpu_usage=sum(m.cpu_usage for m in metrics), 
            metrics=metrics
        )

    def generate_performance_report(self, results: Dict[str, BenchmarkResult],
                                    output_dir: str = "performance_reports"):
        """Generate a performance report."""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        report = {
            'timestamp': time.time(),
            'summary': self._generate_performance_summary(results),
            'detailed_results': {name: asdict(result) for name, result in results.items()},
            'recommendations': self._generate_recommendations(results)
        }

        # Save the JSON report
        json_file = output_path / f"performance_report_{int(time.time())}.json"
        with open(json_file, 'w') as f:
            json.dump(report, f, indent=2)

        # Generate visualization charts
        self._generate_visualizations(results, output_path)

        self.logger.info(f"Performance report generated: {json_file}")
        return str(json_file)

    def _generate_performance_summary(self, results: Dict[str, BenchmarkResult]) -> Dict[str, Any]:
        """Generate a performance summary."""
        all_metrics = []
        for result in results.values():
            all_metrics.extend(result.metrics)

        if not all_metrics:
            return {'error': 'No performance data available'}

        generation_times = [m.generation_time for m in all_metrics if m.generation_time > 0]
        quality_scores = [m.quality_score for m in all_metrics if m.quality_score > 0]
        token_rates = [m.token_rate for m in all_metrics if m.token_rate > 0]

        total_tests = sum(r.total_tests for r in results.values())
        successful_tests = sum(r.successful_tests for r in results.values())

        return {
            'total_tests': total_tests,
            'successful_tests': successful_tests,
            'success_rate': successful_tests / total_tests if total_tests > 0 else 0,
            'average_generation_time': statistics.mean(generation_times) if generation_times else 0,
            'average_quality_score': statistics.mean(quality_scores) if quality_scores else 0,
            'average_token_rate': statistics.mean(token_rates) if token_rates else 0,
            'total_memory_usage': sum(r.total_memory_usage for r in results.values()),
            'total_cpu_usage': sum(r.total_cpu_usage for r in results.values()),
            # Report only the test names so the summary stays JSON-serializable
            'best_performing_test': max(results.items(), key=lambda x: x[1].average_token_rate)[0],
            'slowest_test': max(results.items(), key=lambda x: x[1].average_generation_time)[0]
        }

    def _generate_recommendations(self, results: Dict[str, BenchmarkResult]) -> List[str]:
        """Generate performance recommendations."""
        recommendations = []

        # Analyze average generation time
        avg_times = [r.average_generation_time for r in results.values() if r.average_generation_time > 0]
        if avg_times:
            avg_time = statistics.mean(avg_times)
            if avg_time > 30:
                recommendations.append("Average generation time is high (>30s). Consider optimizing prompts or using faster models.")

        # Analyze success rate
        total_tests = sum(r.total_tests for r in results.values())
        successful_tests = sum(r.successful_tests for r in results.values())
        if total_tests > 0:
            success_rate = successful_tests / total_tests
            if success_rate < 0.9:
                recommendations.append("Success rate is below 90%. Check API reliability and error handling.")

        # Analyze token rate
        token_rates = [r.average_token_rate for r in results.values() if r.average_token_rate > 0]
        if token_rates:
            avg_token_rate = statistics.mean(token_rates)
            if avg_token_rate < 10:
                recommendations.append("Token generation rate is low. Consider network optimization or model selection.")

        # Analyze memory usage
        total_memory = sum(r.total_memory_usage for r in results.values())
        if total_memory > 1000:  # ~1 GB
            recommendations.append("High memory usage detected. Consider optimizing memory management.")

        return recommendations

    def _generate_visualizations(self, results: Dict[str, BenchmarkResult], output_path: Path):
        """Generate visualization charts."""
        # Set the chart style
        plt.style.use('seaborn-v0_8')

        # 1. Performance comparison chart
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('LLM Generation Performance Analysis', fontsize=16)

        test_names = list(results.keys())
        generation_times = [r.average_generation_time for r in results.values()]
        quality_scores = [r.average_quality_score for r in results.values()]
        token_rates = [r.average_token_rate for r in results.values()]

        # Generation time comparison
        axes[0, 0].bar(test_names, generation_times, color='skyblue')
        axes[0, 0].set_title('Average Generation Time')
        axes[0, 0].set_ylabel('Time (seconds)')
        axes[0, 0].tick_params(axis='x', rotation=45)

        # Quality score comparison
        axes[0, 1].bar(test_names, quality_scores, color='lightgreen')
        axes[0, 1].set_title('Average Quality Score')
        axes[0, 1].set_ylabel('Quality Score')
        axes[0, 1].tick_params(axis='x', rotation=45)

        # Token rate comparison
        axes[1, 0].bar(test_names, token_rates, color='lightcoral')
        axes[1, 0].set_title('Average Token Rate')
        axes[1, 0].set_ylabel('Tokens/second')
        axes[1, 0].tick_params(axis='x', rotation=45)

        # Success rate comparison
        success_rates = [r.successful_tests / r.total_tests if r.total_tests > 0 else 0
                         for r in results.values()]
        axes[1, 1].bar(test_names, success_rates, color='gold')
        axes[1, 1].set_title('Success Rate')
        axes[1, 1].set_ylabel('Success Rate')
        axes[1, 1].tick_params(axis='x', rotation=45)

        plt.tight_layout()
        plt.savefig(output_path / 'performance_comparison.png', dpi=300, bbox_inches='tight')
        plt.close()

        # 2. Complexity comparison chart (if complexity benchmarks are present);
        # match on the result's test_name, since complexity results are keyed by level
        complexity_results = {k: v for k, v in results.items() if 'complexity' in v.test_name.lower()}
        if complexity_results:
            fig, ax = plt.subplots(1, 1, figsize=(10, 6))

            complexities = list(complexity_results.keys())
            times = [complexity_results[c].average_generation_time for c in complexities]
            qualities = [complexity_results[c].average_quality_score for c in complexities]

            x = range(len(complexities))
            width = 0.35

            ax.bar([i - width / 2 for i in x], times, width, label='Generation Time', color='skyblue')
            ax.bar([i + width / 2 for i in x], qualities, width, label='Quality Score', color='lightgreen')

            ax.set_xlabel('Complexity Level')
            ax.set_ylabel('Value')
            ax.set_title('Performance vs Complexity')
            ax.set_xticks(x)
            ax.set_xticklabels(complexities)
            ax.legend()

            plt.tight_layout()
            plt.savefig(output_path / 'complexity_analysis.png', dpi=300, bbox_inches='tight')
            plt.close()

        self.logger.info(f"Performance visualizations saved to {output_path}")


# Convenience functions
async def run_performance_benchmark(api_key: str,
                                    base_url: str = "https://api.siliconflow.cn/v1") -> Dict[str, BenchmarkResult]:
    """Convenience function that runs all performance benchmarks."""
    tester = LLMPerformanceTester(api_key, base_url)

    # Run all benchmarks
    results = {}

    # API performance benchmark
    results['api_performance'] = await tester.benchmark_api_performance()

    # Complexity benchmark
    complexity_results = await tester.benchmark_by_complexity()
    results.update(complexity_results)

    # Concurrent request benchmark
    results['concurrent_5'] = await tester.benchmark_concurrent_requests(5)

    # Retry impact benchmark
    results['retry_impact'] = await tester.benchmark_retry_impact()

    return results


def save_performance_report(results: Dict[str, BenchmarkResult], output_dir: str = "performance_reports"):
    """Convenience function that saves the performance report."""
    tester = LLMPerformanceTester("dummy_key")  # Only used to generate the report
    return tester.generate_performance_report(results, output_dir)


if __name__ == "__main__":
    # Run the performance tests
    api_key = os.getenv('SILICONFLOW_API_KEY')
    if not api_key:
        print("Please set SILICONFLOW_API_KEY environment variable")
        exit(1)

    async def main():
        results = await run_performance_benchmark(api_key)
        report_path = save_performance_report(results)
        print(f"Performance report saved to: {report_path}")

    asyncio.run(main())
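

# A minimal pytest sketch (an addition, assuming pytest-asyncio is installed) of
# how benchmark_api_performance could be exercised as a smoke test from the test
# suite. The test name and the 2-iteration budget are illustrative choices, not
# part of the original module; the test is skipped when SILICONFLOW_API_KEY is
# not set, so ordinary test runs do not hit the API.
@pytest.mark.asyncio
@pytest.mark.skipif(not os.getenv("SILICONFLOW_API_KEY"),
                    reason="SILICONFLOW_API_KEY not set")
async def test_api_benchmark_smoke():
    tester = LLMPerformanceTester(os.environ["SILICONFLOW_API_KEY"])
    result = await tester.benchmark_api_performance(test_iterations=2)
    # Each attempted iteration contributes at most one metric.
    assert result.total_tests <= 2
    assert result.successful_tests <= result.total_tests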