"""
LLM generation performance tests.

This module implements performance tests for LLM generation, covering API
response time, token usage, quality metrics, and comparisons between different
generation strategies. It provides benchmarking and performance-analysis
utilities.
"""

import asyncio
import time
import json
import os
import statistics
import tempfile
import psutil
import threading
from typing import List, Dict, Any, Optional, Tuple
from dataclasses import dataclass, asdict
from pathlib import Path

import aiohttp
import pytest
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

from src.spec.llm_generator import LLMGenerator, GenerationRequest, GenerationResult
from src.parser.c_parser import CParserFactory
from src.parser.ast_extractor import ASTExtractor
from src.utils.logger import get_logger
from test_data.simple_c_examples import get_performance_test_suite

logger = get_logger(__name__)
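
# Illustrative usage sketch (not executed; assumes a valid SILICONFLOW_API_KEY
# in the environment and uses the run_performance_benchmark /
# save_performance_report helpers defined at the bottom of this module):
#
#     results = asyncio.run(run_performance_benchmark(os.environ["SILICONFLOW_API_KEY"]))
#     report_path = save_performance_report(results, output_dir="performance_reports")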


@dataclass
class PerformanceMetrics:
    """Performance metrics for a single generation run."""
    function_name: str
    generation_time: float        # wall-clock time for the full call, seconds
    tokens_used: int
    quality_score: float
    api_response_time: float      # generation time reported by the API result, seconds
    token_rate: float             # tokens per second
    memory_usage: float           # resident memory delta, MB
    cpu_usage: float              # CPU usage delta, percent
    timestamp: float
    complexity_level: str
    verification_goals: List[str]


@dataclass
class BenchmarkResult:
    """Aggregated results for one benchmark run."""
    test_name: str
    total_tests: int
    successful_tests: int
    average_generation_time: float
    average_tokens_used: float
    average_quality_score: float
    average_token_rate: float
    min_generation_time: float
    max_generation_time: float
    std_generation_time: float
    total_memory_usage: float
    total_cpu_usage: float
    metrics: List[PerformanceMetrics]


class PerformanceMonitor:
    """Lightweight process resource monitor based on psutil."""

    def __init__(self):
        self.process = psutil.Process()
        self.start_memory = None
        self.start_cpu = None
        self.start_time = None

    def start_monitoring(self):
        """Start monitoring: record the current memory and CPU baseline."""
        self.start_memory = self.process.memory_info().rss / 1024 / 1024  # MB
        self.start_cpu = self.process.cpu_percent()
        self.start_time = time.time()

    def stop_monitoring(self) -> Tuple[float, float]:
        """Stop monitoring and return (memory_delta_mb, cpu_delta_percent)."""
        end_memory = self.process.memory_info().rss / 1024 / 1024  # MB
        end_cpu = self.process.cpu_percent()

        memory_usage = end_memory - self.start_memory if self.start_memory is not None else 0.0
        cpu_usage = end_cpu - self.start_cpu if self.start_cpu is not None else 0.0

        return memory_usage, cpu_usage
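
# Minimal usage sketch for PerformanceMonitor (illustrative only; the returned
# deltas are approximate, since psutil.cpu_percent() measures relative to the
# previous call and other work in the process is not isolated):
#
#     monitor = PerformanceMonitor()
#     monitor.start_monitoring()
#     ...  # workload being measured
#     memory_delta_mb, cpu_delta_percent = monitor.stop_monitoring()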


class LLMPerformanceTester:
    """Performance tester for LLM specification generation."""

    def __init__(self, api_key: str, base_url: str = "https://api.siliconflow.cn/v1"):
        self.api_key = api_key
        self.base_url = base_url
        self.monitor = PerformanceMonitor()
        self.parser = CParserFactory.create_parser()
        self.ast_extractor = ASTExtractor()
        self.logger = logger

    async def benchmark_api_performance(self, test_iterations: int = 10) -> BenchmarkResult:
        """Benchmark raw API performance with a minimal request."""
        self.logger.info(f"Starting API performance benchmark with {test_iterations} iterations")

        # Build a simple test case
        test_request = GenerationRequest(
            function_name="test_function",
            function_info={
                'name': 'test_function',
                'return_type': 'int',
                'parameters': [
                    {'name': 'a', 'type': 'int'},
                    {'name': 'b', 'type': 'int'}
                ]
            },
            verification_goals=['functional_correctness'],
            max_retries=1,
            validate=False,
            store=False
        )

        metrics = []

        async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator:
            for i in range(test_iterations):
                self.monitor.start_monitoring()

                try:
                    start_time = time.time()
                    result = await generator.generate_specification(test_request)
                    end_time = time.time()

                    memory_usage, cpu_usage = self.monitor.stop_monitoring()
                    elapsed = end_time - start_time

                    metric = PerformanceMetrics(
                        function_name="test_function",
                        generation_time=elapsed,
                        tokens_used=result.tokens_used,
                        quality_score=result.quality_score,
                        api_response_time=result.generation_time,
                        token_rate=result.tokens_used / elapsed if elapsed > 0 else 0.0,
                        memory_usage=memory_usage,
                        cpu_usage=cpu_usage,
                        timestamp=time.time(),
                        complexity_level="basic",
                        verification_goals=['functional_correctness']
                    )
                    metrics.append(metric)

                    if (i + 1) % 5 == 0:
                        self.logger.info(f"Completed {i + 1}/{test_iterations} iterations")

                except Exception as e:
                    self.logger.error(f"Iteration {i + 1} failed: {e}")
                    continue

        return self._create_benchmark_result("API Performance Benchmark", metrics)

    async def benchmark_by_complexity(self) -> Dict[str, BenchmarkResult]:
        """Benchmark generation grouped by function complexity level."""
        self.logger.info("Starting complexity-based benchmarking")

        test_cases = get_performance_test_suite()
        complexity_groups = {
            'basic': [tc for tc in test_cases if tc.complexity_level == 'basic'],
            'intermediate': [tc for tc in test_cases if tc.complexity_level == 'intermediate'],
            'advanced': [tc for tc in test_cases if tc.complexity_level == 'advanced']
        }

        results = {}

        for complexity, cases in complexity_groups.items():
            if cases:
                self.logger.info(f"Benchmarking {complexity} complexity functions")
                metrics = []

                async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator:
                    for test_case in cases:
                        # Write the source code to a temporary file
                        with tempfile.NamedTemporaryFile(mode='w', suffix='.c', delete=False) as f:
                            f.write(test_case.source_code)
                            temp_path = f.name

                        try:
                            # Parse the file and extract function metadata
                            ast = self.parser.parse_file(temp_path)
                            metadata = self.ast_extractor.extract_metadata(ast)

                            if metadata.functions:
                                func_name = list(metadata.functions.keys())[0]
                                func_info = metadata.functions[func_name]

                                request = GenerationRequest(
                                    function_name=func_name,
                                    function_info=func_info.to_dict(),
                                    verification_goals=test_case.verification_goals,
                                    max_retries=2,
                                    validate=True,
                                    store=False
                                )

                                self.monitor.start_monitoring()
                                start_time = time.time()
                                result = await generator.generate_specification(request)
                                end_time = time.time()

                                memory_usage, cpu_usage = self.monitor.stop_monitoring()
                                elapsed = end_time - start_time

                                metric = PerformanceMetrics(
                                    function_name=func_name,
                                    generation_time=elapsed,
                                    tokens_used=result.tokens_used,
                                    quality_score=result.quality_score,
                                    api_response_time=result.generation_time,
                                    token_rate=result.tokens_used / elapsed if elapsed > 0 else 0.0,
                                    memory_usage=memory_usage,
                                    cpu_usage=cpu_usage,
                                    timestamp=time.time(),
                                    complexity_level=complexity,
                                    verification_goals=test_case.verification_goals
                                )
                                metrics.append(metric)

                        except Exception as e:
                            self.logger.error(f"Failed to benchmark {test_case.name}: {e}")
                        finally:
                            os.unlink(temp_path)

                results[complexity] = self._create_benchmark_result(
                    f"{complexity.title()} Complexity Benchmark", metrics
                )

        return results

    async def benchmark_concurrent_requests(self, concurrent_count: int = 5, test_iterations: int = 3) -> BenchmarkResult:
        """Benchmark concurrent request performance."""
        self.logger.info(f"Starting concurrent request benchmark: {concurrent_count} concurrent requests")

        test_request = GenerationRequest(
            function_name="concurrent_test",
            function_info={
                'name': 'concurrent_test',
                'return_type': 'int',
                'parameters': [{'name': 'x', 'type': 'int'}]
            },
            verification_goals=['functional_correctness'],
            max_retries=1,
            validate=False,
            store=False
        )

        metrics = []

        async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator:
            for iteration in range(test_iterations):
                self.logger.info(f"Concurrent iteration {iteration + 1}/{test_iterations}")

                # Limit the number of in-flight requests
                semaphore = asyncio.Semaphore(concurrent_count)

                async def make_request():
                    async with semaphore:
                        # Note: the shared PerformanceMonitor gives only approximate
                        # per-request readings when requests overlap.
                        self.monitor.start_monitoring()
                        start_time = time.time()

                        try:
                            result = await generator.generate_specification(test_request)
                            end_time = time.time()

                            memory_usage, cpu_usage = self.monitor.stop_monitoring()
                            elapsed = end_time - start_time

                            metric = PerformanceMetrics(
                                function_name="concurrent_test",
                                generation_time=elapsed,
                                tokens_used=result.tokens_used,
                                quality_score=result.quality_score,
                                api_response_time=result.generation_time,
                                token_rate=result.tokens_used / elapsed if elapsed > 0 else 0.0,
                                memory_usage=memory_usage,
                                cpu_usage=cpu_usage,
                                timestamp=time.time(),
                                complexity_level="basic",
                                verification_goals=['functional_correctness']
                            )
                            return metric

                        except Exception as e:
                            self.logger.error(f"Concurrent request failed: {e}")
                            return None

                # Fire the concurrent requests
                tasks = [make_request() for _ in range(concurrent_count)]
                results = await asyncio.gather(*tasks, return_exceptions=True)

                # Collect metrics from successful requests
                for result in results:
                    if result and not isinstance(result, Exception):
                        metrics.append(result)

        return self._create_benchmark_result(f"Concurrent Requests ({concurrent_count}) Benchmark", metrics)

    async def benchmark_retry_impact(self) -> BenchmarkResult:
        """Benchmark the impact of the retry mechanism."""
        self.logger.info("Starting retry impact benchmark")

        test_request = GenerationRequest(
            function_name="retry_test",
            function_info={
                'name': 'retry_test',
                'return_type': 'int',
                'parameters': [{'name': 'x', 'type': 'int'}]
            },
            verification_goals=['functional_correctness'],
            max_retries=3,
            validate=True,
            store=False
        )

        metrics = []

        async with LLMGenerator(api_key=self.api_key, base_url=self.base_url) as generator:
            for i in range(10):
                self.monitor.start_monitoring()
                start_time = time.time()

                try:
                    result = await generator.generate_specification(test_request)
                    end_time = time.time()

                    memory_usage, cpu_usage = self.monitor.stop_monitoring()
                    elapsed = end_time - start_time

                    metric = PerformanceMetrics(
                        function_name="retry_test",
                        generation_time=elapsed,
                        tokens_used=result.tokens_used,
                        quality_score=result.quality_score,
                        api_response_time=result.generation_time,
                        token_rate=result.tokens_used / elapsed if elapsed > 0 else 0.0,
                        memory_usage=memory_usage,
                        cpu_usage=cpu_usage,
                        timestamp=time.time(),
                        complexity_level="basic",
                        verification_goals=['functional_correctness']
                    )
                    metrics.append(metric)

                except Exception as e:
                    self.logger.error(f"Retry test {i + 1} failed: {e}")

        return self._create_benchmark_result("Retry Impact Benchmark", metrics)

    def _create_benchmark_result(self, test_name: str, metrics: List[PerformanceMetrics]) -> BenchmarkResult:
        """Aggregate a list of metrics into a BenchmarkResult."""
        if not metrics:
            return BenchmarkResult(
                test_name=test_name,
                total_tests=0,
                successful_tests=0,
                average_generation_time=0,
                average_tokens_used=0,
                average_quality_score=0,
                average_token_rate=0,
                min_generation_time=0,
                max_generation_time=0,
                std_generation_time=0,
                total_memory_usage=0,
                total_cpu_usage=0,
                metrics=[]
            )

        successful_metrics = [m for m in metrics if m.quality_score > 0]
        generation_times = [m.generation_time for m in successful_metrics]

        return BenchmarkResult(
            test_name=test_name,
            total_tests=len(metrics),
            successful_tests=len(successful_metrics),
            average_generation_time=statistics.mean(generation_times) if generation_times else 0,
            average_tokens_used=statistics.mean([m.tokens_used for m in successful_metrics]) if successful_metrics else 0,
            average_quality_score=statistics.mean([m.quality_score for m in successful_metrics]) if successful_metrics else 0,
            average_token_rate=statistics.mean([m.token_rate for m in successful_metrics]) if successful_metrics else 0,
            min_generation_time=min(generation_times) if generation_times else 0,
            max_generation_time=max(generation_times) if generation_times else 0,
            std_generation_time=statistics.stdev(generation_times) if len(generation_times) > 1 else 0,
            total_memory_usage=sum(m.memory_usage for m in metrics),
            total_cpu_usage=sum(m.cpu_usage for m in metrics),
            metrics=metrics
        )

    def generate_performance_report(self, results: Dict[str, BenchmarkResult], output_dir: str = "performance_reports"):
        """Generate a JSON performance report plus visualizations."""
        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)

        report = {
            'timestamp': time.time(),
            'summary': self._generate_performance_summary(results),
            'detailed_results': {name: asdict(result) for name, result in results.items()},
            'recommendations': self._generate_recommendations(results)
        }

        # Save the JSON report
        json_file = output_path / f"performance_report_{int(time.time())}.json"
        with open(json_file, 'w') as f:
            json.dump(report, f, indent=2)

        # Generate the visualization charts
        self._generate_visualizations(results, output_path)

        self.logger.info(f"Performance report generated: {json_file}")
        return str(json_file)

    def _generate_performance_summary(self, results: Dict[str, BenchmarkResult]) -> Dict[str, Any]:
        """Generate a summary across all benchmark results."""
        all_metrics = []
        for result in results.values():
            all_metrics.extend(result.metrics)

        if not all_metrics:
            return {'error': 'No performance data available'}

        generation_times = [m.generation_time for m in all_metrics if m.generation_time > 0]
        quality_scores = [m.quality_score for m in all_metrics if m.quality_score > 0]
        token_rates = [m.token_rate for m in all_metrics if m.token_rate > 0]

        total_tests = sum(r.total_tests for r in results.values())
        successful_tests = sum(r.successful_tests for r in results.values())

        return {
            'total_tests': total_tests,
            'successful_tests': successful_tests,
            'success_rate': successful_tests / total_tests if total_tests > 0 else 0,
            'average_generation_time': statistics.mean(generation_times) if generation_times else 0,
            'average_quality_score': statistics.mean(quality_scores) if quality_scores else 0,
            'average_token_rate': statistics.mean(token_rates) if token_rates else 0,
            'total_memory_usage': sum(r.total_memory_usage for r in results.values()),
            'total_cpu_usage': sum(r.total_cpu_usage for r in results.values()),
            # Keep only the test names so the summary stays JSON-serializable
            'best_performing_test': max(results.items(), key=lambda x: x[1].average_token_rate)[0],
            'slowest_test': max(results.items(), key=lambda x: x[1].average_generation_time)[0]
        }

    def _generate_recommendations(self, results: Dict[str, BenchmarkResult]) -> List[str]:
        """Generate performance recommendations."""
        recommendations = []

        # Analyze average generation time
        avg_times = [r.average_generation_time for r in results.values() if r.average_generation_time > 0]
        if avg_times:
            avg_time = statistics.mean(avg_times)
            if avg_time > 30:
                recommendations.append("Average generation time is high (>30s). Consider optimizing prompts or using faster models.")

        # Analyze success rate
        total_tests = sum(r.total_tests for r in results.values())
        successful_tests = sum(r.successful_tests for r in results.values())
        if total_tests > 0:
            success_rate = successful_tests / total_tests
            if success_rate < 0.9:
                recommendations.append("Success rate is below 90%. Check API reliability and error handling.")

        # Analyze token rate
        token_rates = [r.average_token_rate for r in results.values() if r.average_token_rate > 0]
        if token_rates:
            avg_token_rate = statistics.mean(token_rates)
            if avg_token_rate < 10:
                recommendations.append("Token generation rate is low. Consider network optimization or model selection.")

        # Analyze memory usage
        total_memory = sum(r.total_memory_usage for r in results.values())
        if total_memory > 1000:  # 1 GB (values are in MB)
            recommendations.append("High memory usage detected. Consider optimizing memory management.")

        return recommendations

    def _generate_visualizations(self, results: Dict[str, BenchmarkResult], output_path: Path):
        """Generate visualization charts."""
        # Set the chart style
        plt.style.use('seaborn-v0_8')

        # 1. Performance comparison charts
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle('LLM Generation Performance Analysis', fontsize=16)

        test_names = list(results.keys())
        generation_times = [r.average_generation_time for r in results.values()]
        quality_scores = [r.average_quality_score for r in results.values()]
        token_rates = [r.average_token_rate for r in results.values()]

        # Generation time comparison
        axes[0, 0].bar(test_names, generation_times, color='skyblue')
        axes[0, 0].set_title('Average Generation Time')
        axes[0, 0].set_ylabel('Time (seconds)')
        axes[0, 0].tick_params(axis='x', rotation=45)

        # Quality score comparison
        axes[0, 1].bar(test_names, quality_scores, color='lightgreen')
        axes[0, 1].set_title('Average Quality Score')
        axes[0, 1].set_ylabel('Quality Score')
        axes[0, 1].tick_params(axis='x', rotation=45)

        # Token rate comparison
        axes[1, 0].bar(test_names, token_rates, color='lightcoral')
        axes[1, 0].set_title('Average Token Rate')
        axes[1, 0].set_ylabel('Tokens/second')
        axes[1, 0].tick_params(axis='x', rotation=45)

        # Success rate comparison
        success_rates = [r.successful_tests / r.total_tests if r.total_tests > 0 else 0 for r in results.values()]
        axes[1, 1].bar(test_names, success_rates, color='gold')
        axes[1, 1].set_title('Success Rate')
        axes[1, 1].set_ylabel('Success Rate')
        axes[1, 1].tick_params(axis='x', rotation=45)

        plt.tight_layout()
        plt.savefig(output_path / 'performance_comparison.png', dpi=300, bbox_inches='tight')
        plt.close()

        # 2. Complexity comparison chart (if complexity benchmarks are present)
        complexity_results = {k: v for k, v in results.items() if 'complexity' in k.lower()}
        if complexity_results:
            fig, ax = plt.subplots(1, 1, figsize=(10, 6))

            complexities = list(complexity_results.keys())
            times = [complexity_results[c].average_generation_time for c in complexities]
            qualities = [complexity_results[c].average_quality_score for c in complexities]

            x = range(len(complexities))
            width = 0.35

            ax.bar([i - width / 2 for i in x], times, width, label='Generation Time', color='skyblue')
            ax.bar([i + width / 2 for i in x], qualities, width, label='Quality Score', color='lightgreen')

            ax.set_xlabel('Complexity Level')
            ax.set_ylabel('Value')
            ax.set_title('Performance vs Complexity')
            ax.set_xticks(x)
            ax.set_xticklabels(complexities)
            ax.legend()

            plt.tight_layout()
            plt.savefig(output_path / 'complexity_analysis.png', dpi=300, bbox_inches='tight')
            plt.close()

        self.logger.info(f"Performance visualizations saved to {output_path}")


# Convenience functions
async def run_performance_benchmark(api_key: str, base_url: str = "https://api.siliconflow.cn/v1") -> Dict[str, BenchmarkResult]:
    """Convenience function that runs the full performance benchmark suite."""
    tester = LLMPerformanceTester(api_key, base_url)

    # Run all benchmarks
    results = {}

    # API performance benchmark
    results['api_performance'] = await tester.benchmark_api_performance()

    # Complexity benchmarks
    complexity_results = await tester.benchmark_by_complexity()
    results.update(complexity_results)

    # Concurrent request benchmark
    results['concurrent_5'] = await tester.benchmark_concurrent_requests(5)

    # Retry impact benchmark
    results['retry_impact'] = await tester.benchmark_retry_impact()

    return results


def save_performance_report(results: Dict[str, BenchmarkResult], output_dir: str = "performance_reports"):
    """Convenience function that saves a performance report."""
    tester = LLMPerformanceTester("dummy_key")  # Only used for report generation
    return tester.generate_performance_report(results, output_dir)


if __name__ == "__main__":
    # Run the performance tests
    api_key = os.getenv('SILICONFLOW_API_KEY')
    if not api_key:
        print("Please set SILICONFLOW_API_KEY environment variable")
        raise SystemExit(1)

    async def main():
        results = await run_performance_benchmark(api_key)
        report_path = save_performance_report(results)
        print(f"Performance report saved to: {report_path}")

    asyncio.run(main())