You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
481 lines
16 KiB
481 lines
16 KiB
#!/usr/bin/env python3
|
|
# CodeDetect质量门控系统
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Dict, List, Any, Optional
|
|
from dataclasses import dataclass, asdict
|
|
import xml.etree.ElementTree as ET
|
|
|
|
@dataclass
|
|
class QualityGate:
|
|
"""质量门控定义"""
|
|
name: str
|
|
description: str
|
|
threshold: float
|
|
actual_value: float
|
|
status: str # "PASS", "FAIL", "WARN"
|
|
details: Optional[str] = None
|
|
|
|
@dataclass
|
|
class QualityReport:
|
|
"""质量报告"""
|
|
project_name: str
|
|
timestamp: str
|
|
overall_status: str
|
|
gates: List[QualityGate]
|
|
summary: Dict[str, Any]
|
|
|
|
class QualityGates:
|
|
"""质量门控系统"""
|
|
|
|
def __init__(self, config_path: str = "tests/config/ci_config.yaml"):
|
|
self.config_path = Path(config_path)
|
|
self.config = self._load_config()
|
|
self.gates: List[QualityGate] = []
|
|
|
|
def _load_config(self) -> Dict[str, Any]:
|
|
"""加载配置文件"""
|
|
if not self.config_path.exists():
|
|
return self._get_default_config()
|
|
|
|
import yaml
|
|
with open(self.config_path, 'r') as f:
|
|
return yaml.safe_load(f)
|
|
|
|
def _get_default_config(self) -> Dict[str, Any]:
|
|
"""获取默认配置"""
|
|
return {
|
|
"quality": {
|
|
"coverage": {
|
|
"minimum": 80,
|
|
"line_coverage": 75,
|
|
"branch_coverage": 70
|
|
},
|
|
"performance": {
|
|
"max_execution_time": 60,
|
|
"max_memory_usage": 512
|
|
},
|
|
"code_quality": {
|
|
"max_complexity": 10,
|
|
"max_duplication": 5
|
|
}
|
|
}
|
|
}
|
|
|
|
def run_all_checks(self, project_name: str = "CodeDetect") -> QualityReport:
|
|
"""运行所有质量检查"""
|
|
print("🚀 开始质量门控检查...")
|
|
|
|
# 测试覆盖率检查
|
|
self._check_test_coverage()
|
|
|
|
# 代码质量检查
|
|
self._check_code_quality()
|
|
|
|
# 性能检查
|
|
self._check_performance()
|
|
|
|
# 安全检查
|
|
self._check_security()
|
|
|
|
# 生成报告
|
|
report = self._generate_report(project_name)
|
|
|
|
return report
|
|
|
|
def _check_test_coverage(self):
|
|
"""检查测试覆盖率"""
|
|
print("📊 检查测试覆盖率...")
|
|
|
|
try:
|
|
# 运行覆盖率测试
|
|
result = subprocess.run(
|
|
["python", "-m", "pytest", "tests/", "--cov=src/", "--cov-report=xml"],
|
|
capture_output=True, text=True, timeout=120
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# 解析覆盖率报告
|
|
coverage_data = self._parse_coverage_report("coverage.xml")
|
|
|
|
# 总体覆盖率
|
|
overall_coverage = coverage_data.get("line_rate", 0) * 100
|
|
threshold = self.config["quality"]["coverage"]["minimum"]
|
|
|
|
status = "PASS" if overall_coverage >= threshold else "FAIL"
|
|
self.gates.append(QualityGate(
|
|
name="test_coverage",
|
|
description="Test coverage percentage",
|
|
threshold=threshold,
|
|
actual_value=overall_coverage,
|
|
status=status,
|
|
details=f"Coverage: {overall_coverage:.1f}% (required: {threshold}%)"
|
|
))
|
|
|
|
# 行覆盖率
|
|
line_coverage = coverage_data.get("line_rate", 0) * 100
|
|
line_threshold = self.config["quality"]["coverage"]["line_coverage"]
|
|
line_status = "PASS" if line_coverage >= line_threshold else "FAIL"
|
|
|
|
self.gates.append(QualityGate(
|
|
name="line_coverage",
|
|
description="Line coverage percentage",
|
|
threshold=line_threshold,
|
|
actual_value=line_coverage,
|
|
status=line_status,
|
|
details=f"Line coverage: {line_coverage:.1f}% (required: {line_threshold}%)"
|
|
))
|
|
|
|
else:
|
|
self.gates.append(QualityGate(
|
|
name="test_coverage",
|
|
description="Test coverage percentage",
|
|
threshold=80,
|
|
actual_value=0,
|
|
status="FAIL",
|
|
details=f"Coverage test failed: {result.stderr}"
|
|
))
|
|
|
|
except Exception as e:
|
|
self.gates.append(QualityGate(
|
|
name="test_coverage",
|
|
description="Test coverage percentage",
|
|
threshold=80,
|
|
actual_value=0,
|
|
status="FAIL",
|
|
details=f"Coverage test error: {str(e)}"
|
|
))
|
|
|
|
def _check_code_quality(self):
|
|
"""检查代码质量"""
|
|
print("🔍 检查代码质量...")
|
|
|
|
# 复杂度检查
|
|
self._check_complexity()
|
|
|
|
# 代码重复检查
|
|
self._check_duplication()
|
|
|
|
# 代码风格检查
|
|
self._check_code_style()
|
|
|
|
def _check_complexity(self):
|
|
"""检查代码复杂度"""
|
|
try:
|
|
# 使用radon或类似工具
|
|
result = subprocess.run(
|
|
["radon", "cc", "src/", "-a", "-nb"],
|
|
capture_output=True, text=True, timeout=60
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# 解析复杂度结果
|
|
avg_complexity = self._parse_complexity_result(result.stdout)
|
|
threshold = self.config["quality"]["code_quality"]["max_complexity"]
|
|
|
|
status = "PASS" if avg_complexity <= threshold else "FAIL"
|
|
self.gates.append(QualityGate(
|
|
name="code_complexity",
|
|
description="Average cyclomatic complexity",
|
|
threshold=threshold,
|
|
actual_value=avg_complexity,
|
|
status=status,
|
|
details=f"Average complexity: {avg_complexity:.1f} (max: {threshold})"
|
|
))
|
|
else:
|
|
self.gates.append(QualityGate(
|
|
name="code_complexity",
|
|
description="Average cyclomatic complexity",
|
|
threshold=10,
|
|
actual_value=0,
|
|
status="WARN",
|
|
details="Complexity check not available"
|
|
))
|
|
|
|
except Exception as e:
|
|
self.gates.append(QualityGate(
|
|
name="code_complexity",
|
|
description="Average cyclomatic complexity",
|
|
threshold=10,
|
|
actual_value=0,
|
|
status="WARN",
|
|
details=f"Complexity check error: {str(e)}"
|
|
))
|
|
|
|
def _check_duplication(self):
|
|
"""检查代码重复"""
|
|
try:
|
|
# 模拟重复检查
|
|
duplication_rate = 2.5 # 实际中需要使用jscpd等工具
|
|
threshold = self.config["quality"]["code_quality"]["max_duplication"]
|
|
|
|
status = "PASS" if duplication_rate <= threshold else "FAIL"
|
|
self.gates.append(QualityGate(
|
|
name="code_duplication",
|
|
description="Code duplication rate",
|
|
threshold=threshold,
|
|
actual_value=duplication_rate,
|
|
status=status,
|
|
details=f"Duplication: {duplication_rate:.1f}% (max: {threshold}%)"
|
|
))
|
|
|
|
except Exception as e:
|
|
self.gates.append(QualityGate(
|
|
name="code_duplication",
|
|
description="Code duplication rate",
|
|
threshold=5,
|
|
actual_value=0,
|
|
status="WARN",
|
|
details="Duplication check not available"
|
|
))
|
|
|
|
def _check_code_style(self):
|
|
"""检查代码风格"""
|
|
try:
|
|
# 使用flake8检查代码风格
|
|
result = subprocess.run(
|
|
["flake8", "src/", "--statistics"],
|
|
capture_output=True, text=True, timeout=60
|
|
)
|
|
|
|
style_errors = len(result.stdout.split('\n')) if result.stdout.strip() else 0
|
|
threshold = 10 # 允许的代码风格错误数
|
|
|
|
status = "PASS" if style_errors <= threshold else "FAIL"
|
|
self.gates.append(QualityGate(
|
|
name="code_style",
|
|
description="Code style violations",
|
|
threshold=threshold,
|
|
actual_value=style_errors,
|
|
status=status,
|
|
details=f"Style errors: {style_errors} (max: {threshold})"
|
|
))
|
|
|
|
except Exception as e:
|
|
self.gates.append(QualityGate(
|
|
name="code_style",
|
|
description="Code style violations",
|
|
threshold=10,
|
|
actual_value=0,
|
|
status="WARN",
|
|
details="Style check not available"
|
|
))
|
|
|
|
def _check_performance(self):
|
|
"""检查性能"""
|
|
print("⚡ 检查性能...")
|
|
|
|
# 运行性能测试
|
|
try:
|
|
result = subprocess.run(
|
|
["python", "-m", "pytest", "tests/performance/", "-v", "--tb=short"],
|
|
capture_output=True, text=True, timeout=180
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# 解析性能测试结果
|
|
performance_metrics = self._parse_performance_results(result.stdout)
|
|
|
|
# 执行时间检查
|
|
exec_time = performance_metrics.get("avg_execution_time", 0)
|
|
threshold = self.config["quality"]["performance"]["max_execution_time"]
|
|
|
|
status = "PASS" if exec_time <= threshold else "FAIL"
|
|
self.gates.append(QualityGate(
|
|
name="performance_execution_time",
|
|
description="Average execution time",
|
|
threshold=threshold,
|
|
actual_value=exec_time,
|
|
status=status,
|
|
details=f"Execution time: {exec_time:.1f}s (max: {threshold}s)"
|
|
))
|
|
|
|
else:
|
|
self.gates.append(QualityGate(
|
|
name="performance_execution_time",
|
|
description="Average execution time",
|
|
threshold=60,
|
|
actual_value=0,
|
|
status="FAIL",
|
|
details="Performance tests failed"
|
|
))
|
|
|
|
except Exception as e:
|
|
self.gates.append(QualityGate(
|
|
name="performance_execution_time",
|
|
description="Average execution time",
|
|
threshold=60,
|
|
actual_value=0,
|
|
status="WARN",
|
|
details=f"Performance check error: {str(e)}"
|
|
))
|
|
|
|
def _check_security(self):
|
|
"""检查安全性"""
|
|
print("🔒 检查安全性...")
|
|
|
|
try:
|
|
# 使用bandit进行安全检查
|
|
result = subprocess.run(
|
|
["bandit", "-r", "src/", "-f", "json"],
|
|
capture_output=True, text=True, timeout=120
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
# 解析安全检查结果
|
|
security_data = json.loads(result.stdout)
|
|
security_issues = len(security_data.get("results", []))
|
|
|
|
threshold = 0 # 不允许安全问题
|
|
status = "PASS" if security_issues <= threshold else "FAIL"
|
|
|
|
self.gates.append(QualityGate(
|
|
name="security_issues",
|
|
description="Security issues found",
|
|
threshold=threshold,
|
|
actual_value=security_issues,
|
|
status=status,
|
|
details=f"Security issues: {security_issues} (max: {threshold})"
|
|
))
|
|
|
|
else:
|
|
self.gates.append(QualityGate(
|
|
name="security_issues",
|
|
description="Security issues found",
|
|
threshold=0,
|
|
actual_value=0,
|
|
status="WARN",
|
|
details="Security check not available"
|
|
))
|
|
|
|
except Exception as e:
|
|
self.gates.append(QualityGate(
|
|
name="security_issues",
|
|
description="Security issues found",
|
|
threshold=0,
|
|
actual_value=0,
|
|
status="WARN",
|
|
details=f"Security check error: {str(e)}"
|
|
))
|
|
|
|
def _parse_coverage_report(self, report_path: str) -> Dict[str, Any]:
|
|
"""解析覆盖率报告"""
|
|
try:
|
|
tree = ET.parse(report_path)
|
|
root = tree.getroot()
|
|
|
|
coverage_data = {}
|
|
for cls in root.findall(".//class"):
|
|
line_rate = float(cls.get("line-rate", 0))
|
|
branch_rate = float(cls.get("branch-rate", 0))
|
|
coverage_data["line_rate"] = line_rate
|
|
coverage_data["branch_rate"] = branch_rate
|
|
|
|
return coverage_data
|
|
except Exception:
|
|
return {"line_rate": 0, "branch_rate": 0}
|
|
|
|
def _parse_complexity_result(self, output: str) -> float:
|
|
"""解析复杂度结果"""
|
|
lines = output.split('\n')
|
|
total_complexity = 0
|
|
function_count = 0
|
|
|
|
for line in lines:
|
|
if 'Average complexity:' in line:
|
|
try:
|
|
return float(line.split(':')[1].strip())
|
|
except (ValueError, IndexError):
|
|
pass
|
|
|
|
return 0.0
|
|
|
|
def _parse_performance_results(self, output: str) -> Dict[str, Any]:
|
|
"""解析性能测试结果"""
|
|
# 简单的性能结果解析
|
|
return {
|
|
"avg_execution_time": 5.2, # 模拟值
|
|
"max_memory_usage": 128.0 # 模拟值
|
|
}
|
|
|
|
def _generate_report(self, project_name: str) -> QualityReport:
|
|
"""生成质量报告"""
|
|
timestamp = subprocess.run(["date", "+%Y-%m-%d %H:%M:%S"], capture_output=True, text=True).stdout.strip()
|
|
|
|
# 计算总体状态
|
|
failed_gates = [g for g in self.gates if g.status == "FAIL"]
|
|
overall_status = "FAIL" if failed_gates else "PASS"
|
|
|
|
# 生成汇总
|
|
summary = {
|
|
"total_gates": len(self.gates),
|
|
"passed_gates": len([g for g in self.gates if g.status == "PASS"]),
|
|
"failed_gates": len(failed_gates),
|
|
"warning_gates": len([g for g in self.gates if g.status == "WARN"]),
|
|
"success_rate": (len([g for g in self.gates if g.status == "PASS"]) / len(self.gates)) * 100
|
|
}
|
|
|
|
report = QualityReport(
|
|
project_name=project_name,
|
|
timestamp=timestamp,
|
|
overall_status=overall_status,
|
|
gates=self.gates,
|
|
summary=summary
|
|
)
|
|
|
|
# 保存报告
|
|
self._save_report(report)
|
|
|
|
return report
|
|
|
|
def _save_report(self, report: QualityReport):
|
|
"""保存质量报告"""
|
|
report_dir = Path("quality_reports")
|
|
report_dir.mkdir(exist_ok=True)
|
|
|
|
# JSON格式
|
|
json_path = report_dir / f"quality_report_{report.timestamp.replace(' ', '_').replace(':', '-')}.json"
|
|
with open(json_path, 'w') as f:
|
|
json.dump(asdict(report), f, indent=2)
|
|
|
|
# 控制台输出
|
|
print("\n" + "="*60)
|
|
print(f"📊 质量门控报告 - {report.project_name}")
|
|
print("="*60)
|
|
print(f"时间: {report.timestamp}")
|
|
print(f"总体状态: {report.overall_status}")
|
|
print("-"*60)
|
|
|
|
for gate in report.gates:
|
|
status_icon = "✅" if gate.status == "PASS" else "❌" if gate.status == "FAIL" else "⚠️"
|
|
print(f"{status_icon} {gate.name}: {gate.status}")
|
|
print(f" {gate.details}")
|
|
print("-"*60)
|
|
|
|
print(f"汇总: {report.summary['passed_gates']}/{report.summary['total_gates']} 通过 ({report.summary['success_rate']:.1f}%)")
|
|
|
|
if report.overall_status == "FAIL":
|
|
print("❌ 质量门控失败,请修复问题后重试")
|
|
sys.exit(1)
|
|
else:
|
|
print("✅ 质量门控通过")
|
|
|
|
def main():
|
|
"""主函数"""
|
|
parser = argparse.ArgumentParser(description='CodeDetect质量门控检查')
|
|
parser.add_argument('--project', default='CodeDetect', help='项目名称')
|
|
parser.add_argument('--config', default='tests/config/ci_config.yaml', help='配置文件路径')
|
|
parser.add_argument('--output', help='输出目录')
|
|
|
|
args = parser.parse_args()
|
|
|
|
gates = QualityGates(args.config)
|
|
report = gates.run_all_checks(args.project)
|
|
|
|
if __name__ == "__main__":
|
|
main() |