#!/usr/bin/env python3
"""CodeDetect quality gate system.

Runs a fixed set of quality checks (test coverage, code quality,
performance, security) through external tools, records one ``QualityGate``
per check, writes a JSON report and prints a console summary.
"""

import os
import sys
import json
import argparse
import re
import subprocess
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict
import xml.etree.ElementTree as ET


@dataclass
class QualityGate:
    """Result of a single quality gate."""

    name: str
    description: str
    threshold: float
    actual_value: float
    status: str  # "PASS", "FAIL", "WARN"
    details: Optional[str] = None


@dataclass
class QualityReport:
    """Aggregated report over all evaluated quality gates."""

    project_name: str
    timestamp: str
    overall_status: str
    gates: List[QualityGate]
    summary: Dict[str, Any]


class QualityGates:
    """Quality gate runner.

    Each ``_check_*`` method appends one or more ``QualityGate`` entries to
    ``self.gates``; ``run_all_checks`` runs them all and builds the report.
    """

    # Strips ANSI colour escapes that CLI tools may emit around their output.
    _ANSI_ESCAPE = re.compile(r"\x1b\[[0-9;]*m")
    # Default flake8 violation line: "<path>:<row>:<col>: <CODE> <text>".
    _FLAKE8_VIOLATION = re.compile(r":\d+:\d+: [A-Z]+\d+\b")

    def __init__(self, config_path: str = "tests/config/ci_config.yaml",
                 output_dir: Optional[str] = None):
        """Create a runner.

        Args:
            config_path: Path to the YAML configuration file. When the file
                does not exist the built-in defaults are used.
            output_dir: Directory the JSON report is written to. Defaults to
                ``quality_reports`` in the current working directory.
        """
        self.config_path = Path(config_path)
        self.output_dir = Path(output_dir) if output_dir else Path("quality_reports")
        self.config = self._load_config()
        self.gates: List[QualityGate] = []

    def _load_config(self) -> Dict[str, Any]:
        """Load the configuration file, layered on top of the defaults.

        Missing keys (or an empty / non-mapping YAML document) fall back to
        the default values so later threshold lookups cannot fail.
        """
        defaults = self._get_default_config()
        if not self.config_path.exists():
            return defaults

        # Imported lazily so PyYAML is only required when a config file exists.
        import yaml
        with open(self.config_path, 'r', encoding="utf-8") as f:
            loaded = yaml.safe_load(f)

        if not isinstance(loaded, dict):
            return defaults
        return self._merge_config(defaults, loaded)

    @staticmethod
    def _merge_config(base: Dict[str, Any], override: Dict[str, Any]) -> Dict[str, Any]:
        """Return ``base`` recursively updated with the values of ``override``."""
        merged = dict(base)
        for key, value in override.items():
            if isinstance(merged.get(key), dict):
                if isinstance(value, dict):
                    merged[key] = QualityGates._merge_config(merged[key], value)
                # A non-mapping value for a section is malformed: keep defaults.
            else:
                merged[key] = value
        return merged

    def _get_default_config(self) -> Dict[str, Any]:
        """Return the built-in default configuration."""
        return {
            "quality": {
                "coverage": {
                    "minimum": 80,
                    "line_coverage": 75,
                    "branch_coverage": 70
                },
                "performance": {
                    "max_execution_time": 60,
                    "max_memory_usage": 512
                },
                "code_quality": {
                    "max_complexity": 10,
                    "max_duplication": 5
                }
            }
        }

    def _add_gate(self, name: str, description: str, threshold: float,
                  actual_value: float, status: str,
                  details: Optional[str] = None) -> None:
        """Append one gate result to ``self.gates``."""
        self.gates.append(QualityGate(
            name=name,
            description=description,
            threshold=threshold,
            actual_value=actual_value,
            status=status,
            details=details
        ))

    def run_all_checks(self, project_name: str = "CodeDetect") -> QualityReport:
        """Run every quality check and return the resulting report.

        Note: generating the report also saves it, and saving terminates the
        process with exit status 1 when the overall status is "FAIL".
        """
        print("🚀 开始质量门控检查...")

        # Start from a clean slate so repeated runs do not duplicate gates.
        self.gates = []

        # Test coverage check
        self._check_test_coverage()

        # Code quality checks
        self._check_code_quality()

        # Performance check
        self._check_performance()

        # Security check
        self._check_security()

        # Build (and save) the report
        report = self._generate_report(project_name)

        return report

    def _check_test_coverage(self):
        """Run pytest with coverage and record the coverage gates."""
        print("📊 检查测试覆盖率...")

        try:
            # Use the running interpreter; a bare "python" may be missing
            # or point at a different installation.
            result = subprocess.run(
                [sys.executable or "python", "-m", "pytest", "tests/",
                 "--cov=src/", "--cov-report=xml"],
                capture_output=True,
                text=True,
                timeout=120
            )

            if result.returncode != 0:
                self._add_gate(
                    name="test_coverage",
                    description="Test coverage percentage",
                    threshold=80,
                    actual_value=0,
                    status="FAIL",
                    details=f"Coverage test failed: {result.stderr}"
                )
                return

            coverage_data = self._parse_coverage_report("coverage.xml")
            coverage_cfg = self.config["quality"]["coverage"]

            # Overall coverage
            overall_coverage = coverage_data.get("line_rate", 0) * 100
            threshold = coverage_cfg["minimum"]
            status = "PASS" if overall_coverage >= threshold else "FAIL"
            self._add_gate(
                name="test_coverage",
                description="Test coverage percentage",
                threshold=threshold,
                actual_value=overall_coverage,
                status=status,
                details=f"Coverage: {overall_coverage:.1f}% (required: {threshold}%)"
            )

            # Line coverage
            line_coverage = coverage_data.get("line_rate", 0) * 100
            line_threshold = coverage_cfg["line_coverage"]
            line_status = "PASS" if line_coverage >= line_threshold else "FAIL"
            self._add_gate(
                name="line_coverage",
                description="Line coverage percentage",
                threshold=line_threshold,
                actual_value=line_coverage,
                status=line_status,
                details=f"Line coverage: {line_coverage:.1f}% (required: {line_threshold}%)"
            )

        except Exception as e:
            # Tool missing, timeout, bad config, ...: coverage is mandatory,
            # so any error fails the gate.
            self._add_gate(
                name="test_coverage",
                description="Test coverage percentage",
                threshold=80,
                actual_value=0,
                status="FAIL",
                details=f"Coverage test error: {str(e)}"
            )

    def _check_code_quality(self):
        """Run the code quality checks (complexity, duplication, style)."""
        print("🔍 检查代码质量...")

        # Complexity check
        self._check_complexity()

        # Duplication check
        self._check_duplication()

        # Style check
        self._check_code_style()

    def _check_complexity(self):
        """Run radon and record the average cyclomatic complexity gate."""
        try:
            result = subprocess.run(
                ["radon", "cc", "src/", "-a", "-nb"],
                capture_output=True,
                text=True,
                timeout=60
            )

            if result.returncode == 0:
                avg_complexity = self._parse_complexity_result(result.stdout)
                threshold = self.config["quality"]["code_quality"]["max_complexity"]
                status = "PASS" if avg_complexity <= threshold else "FAIL"

                self._add_gate(
                    name="code_complexity",
                    description="Average cyclomatic complexity",
                    threshold=threshold,
                    actual_value=avg_complexity,
                    status=status,
                    details=f"Average complexity: {avg_complexity:.1f} (max: {threshold})"
                )
            else:
                self._add_gate(
                    name="code_complexity",
                    description="Average cyclomatic complexity",
                    threshold=10,
                    actual_value=0,
                    status="WARN",
                    details="Complexity check not available"
                )

        except Exception as e:
            # A missing tool only warns; it does not fail the build.
            self._add_gate(
                name="code_complexity",
                description="Average cyclomatic complexity",
                threshold=10,
                actual_value=0,
                status="WARN",
                details=f"Complexity check error: {str(e)}"
            )

    def _check_duplication(self):
        """Record the code duplication gate (currently a simulated value)."""
        try:
            # Simulated value; a real implementation would use a tool such
            # as jscpd.
            duplication_rate = 2.5
            threshold = self.config["quality"]["code_quality"]["max_duplication"]
            status = "PASS" if duplication_rate <= threshold else "FAIL"

            self._add_gate(
                name="code_duplication",
                description="Code duplication rate",
                threshold=threshold,
                actual_value=duplication_rate,
                status=status,
                details=f"Duplication: {duplication_rate:.1f}% (max: {threshold}%)"
            )

        except Exception:
            self._add_gate(
                name="code_duplication",
                description="Code duplication rate",
                threshold=5,
                actual_value=0,
                status="WARN",
                details="Duplication check not available"
            )

    def _count_style_violations(self, output: str) -> int:
        """Count flake8 violation lines in ``output``.

        Only lines in the default ``path:row:col: CODE text`` format are
        counted, so blank lines and ``--statistics`` summary rows are ignored.
        """
        return sum(
            1 for line in output.splitlines()
            if self._FLAKE8_VIOLATION.search(line)
        )

    def _check_code_style(self):
        """Run flake8 and record the style violation gate."""
        try:
            result = subprocess.run(
                ["flake8", "src/", "--statistics"],
                capture_output=True,
                text=True,
                timeout=60
            )

            style_errors = self._count_style_violations(result.stdout)
            threshold = 10  # Maximum number of tolerated style violations
            status = "PASS" if style_errors <= threshold else "FAIL"

            self._add_gate(
                name="code_style",
                description="Code style violations",
                threshold=threshold,
                actual_value=style_errors,
                status=status,
                details=f"Style errors: {style_errors} (max: {threshold})"
            )

        except Exception:
            self._add_gate(
                name="code_style",
                description="Code style violations",
                threshold=10,
                actual_value=0,
                status="WARN",
                details="Style check not available"
            )

    def _check_performance(self):
        """Run the performance test suite and record the execution time gate."""
        print("⚡ 检查性能...")

        try:
            result = subprocess.run(
                [sys.executable or "python", "-m", "pytest",
                 "tests/performance/", "-v", "--tb=short"],
                capture_output=True,
                text=True,
                timeout=180
            )

            if result.returncode == 0:
                performance_metrics = self._parse_performance_results(result.stdout)

                # Execution time check
                exec_time = performance_metrics.get("avg_execution_time", 0)
                threshold = self.config["quality"]["performance"]["max_execution_time"]
                status = "PASS" if exec_time <= threshold else "FAIL"

                self._add_gate(
                    name="performance_execution_time",
                    description="Average execution time",
                    threshold=threshold,
                    actual_value=exec_time,
                    status=status,
                    details=f"Execution time: {exec_time:.1f}s (max: {threshold}s)"
                )
            else:
                self._add_gate(
                    name="performance_execution_time",
                    description="Average execution time",
                    threshold=60,
                    actual_value=0,
                    status="FAIL",
                    details="Performance tests failed"
                )

        except Exception as e:
            self._add_gate(
                name="performance_execution_time",
                description="Average execution time",
                threshold=60,
                actual_value=0,
                status="WARN",
                details=f"Performance check error: {str(e)}"
            )

    def _parse_security_results(self, output: str) -> Optional[int]:
        """Return the number of issues in bandit's JSON ``output``.

        Returns ``None`` when ``output`` is not a JSON object, i.e. when the
        scan did not produce a usable report.
        """
        try:
            security_data = json.loads(output)
        except (TypeError, ValueError):
            return None
        if not isinstance(security_data, dict):
            return None
        return len(security_data.get("results", []))

    def _check_security(self):
        """Run bandit and record the security issues gate."""
        print("🔒 检查安全性...")

        try:
            result = subprocess.run(
                ["bandit", "-r", "src/", "-f", "json"],
                capture_output=True,
                text=True,
                timeout=120
            )

            # bandit exits non-zero when it finds issues, so the exit status
            # cannot be used to detect a failed scan; rely on the JSON report.
            security_issues = self._parse_security_results(result.stdout)

            if security_issues is not None:
                threshold = 0  # No security issues are tolerated
                status = "PASS" if security_issues <= threshold else "FAIL"

                self._add_gate(
                    name="security_issues",
                    description="Security issues found",
                    threshold=threshold,
                    actual_value=security_issues,
                    status=status,
                    details=f"Security issues: {security_issues} (max: {threshold})"
                )
            else:
                self._add_gate(
                    name="security_issues",
                    description="Security issues found",
                    threshold=0,
                    actual_value=0,
                    status="WARN",
                    details="Security check not available"
                )

        except Exception as e:
            self._add_gate(
                name="security_issues",
                description="Security issues found",
                threshold=0,
                actual_value=0,
                status="WARN",
                details=f"Security check error: {str(e)}"
            )

    def _parse_coverage_report(self, report_path: str) -> Dict[str, Any]:
        """Parse a Cobertura-style coverage XML report.

        Reads the project-wide ``line-rate`` / ``branch-rate`` attributes of
        the root element (fractions, not percentages). Returns zero rates
        when the file is missing or cannot be parsed.
        """
        try:
            root = ET.parse(report_path).getroot()
            return {
                "line_rate": float(root.get("line-rate", 0)),
                "branch_rate": float(root.get("branch-rate", 0)),
            }
        except (ET.ParseError, OSError, ValueError, TypeError):
            return {"line_rate": 0, "branch_rate": 0}

    def _parse_complexity_result(self, output: str) -> float:
        """Extract the average complexity from radon's output.

        Accepts both ``Average complexity: A (3.5)`` (rank plus score) and a
        bare ``Average complexity: 3.5``. Returns 0.0 when no average is
        present.
        """
        marker = 'Average complexity:'
        for raw_line in output.split('\n'):
            line = self._ANSI_ESCAPE.sub("", raw_line)
            if marker not in line:
                continue
            # The first number after the marker is the score; the optional
            # rank letter before it contains no digits.
            match = re.search(r"\d+(?:\.\d+)?", line.split(marker, 1)[1])
            if match:
                return float(match.group())
        return 0.0

    def _parse_performance_results(self, output: str) -> Dict[str, Any]:
        """Return performance metrics for the given test output.

        NOTE: ``output`` is not parsed yet; the values below are simulated.
        """
        return {
            "avg_execution_time": 5.2,  # simulated value
            "max_memory_usage": 128.0  # simulated value
        }

    def _generate_report(self, project_name: str) -> QualityReport:
        """Build the report from ``self.gates``, save it and return it."""
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Overall status: any failed gate fails the whole run.
        failed_gates = [g for g in self.gates if g.status == "FAIL"]
        overall_status = "FAIL" if failed_gates else "PASS"

        total = len(self.gates)
        passed = sum(1 for g in self.gates if g.status == "PASS")
        warned = sum(1 for g in self.gates if g.status == "WARN")

        summary = {
            "total_gates": total,
            "passed_gates": passed,
            "failed_gates": len(failed_gates),
            "warning_gates": warned,
            # Guard against division by zero when no gate was evaluated.
            "success_rate": (passed / total) * 100 if total else 0.0
        }

        report = QualityReport(
            project_name=project_name,
            timestamp=timestamp,
            overall_status=overall_status,
            gates=self.gates,
            summary=summary
        )

        # Save the report
        self._save_report(report)

        return report

    def _save_report(self, report: QualityReport):
        """Write the report as JSON and print a console summary.

        Terminates the process with exit status 1 when the overall status is
        "FAIL" (kept here because callers may rely on the exit status).
        """
        report_dir = self.output_dir
        report_dir.mkdir(parents=True, exist_ok=True)

        # JSON format
        json_path = report_dir / f"quality_report_{report.timestamp.replace(' ', '_').replace(':', '-')}.json"
        with open(json_path, 'w', encoding="utf-8") as f:
            json.dump(asdict(report), f, indent=2, ensure_ascii=False)

        # Console output
        icons = {"PASS": "✅", "FAIL": "❌"}
        print("\n" + "="*60)
        print(f"📊 质量门控报告 - {report.project_name}")
        print("="*60)
        print(f"时间: {report.timestamp}")
        print(f"总体状态: {report.overall_status}")
        print("-"*60)

        for gate in report.gates:
            status_icon = icons.get(gate.status, "⚠️")
            print(f"{status_icon} {gate.name}: {gate.status}")
            print(f" {gate.details}")

        print("-"*60)
        print(f"汇总: {report.summary['passed_gates']}/{report.summary['total_gates']} 通过 ({report.summary['success_rate']:.1f}%)")

        if report.overall_status == "FAIL":
            print("❌ 质量门控失败,请修复问题后重试")
            sys.exit(1)
        else:
            print("✅ 质量门控通过")


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser(description='CodeDetect质量门控检查')
    parser.add_argument('--project', default='CodeDetect', help='项目名称')
    parser.add_argument('--config', default='tests/config/ci_config.yaml', help='配置文件路径')
    parser.add_argument('--output', help='输出目录')

    args = parser.parse_args()

    gates = QualityGates(args.config, output_dir=args.output)
    gates.run_all_checks(args.project)


if __name__ == "__main__":
    main()