""" 扫描服务 """ import asyncio from typing import List from sqlalchemy.orm import Session from app.database import SessionLocal from app.models.scan import Scan, ScanStatus from app.models.vulnerability import Vulnerability, VulnerabilityStatus, SeverityLevel, VulnerabilityCategory from app.services.analyzer_service import AnalyzerService from app.services.report_service import ReportService import json class ScanService: """扫描服务""" def __init__(self): self.analyzer_service = AnalyzerService() self.report_service = ReportService() async def run_scan(self, scan_id: int): """运行扫描任务""" db = SessionLocal() try: # 获取扫描任务 scan = db.query(Scan).filter(Scan.id == scan_id).first() if not scan: print(f"扫描任务不存在: {scan_id}") return # 更新扫描状态为运行中 scan.status = ScanStatus.RUNNING scan.started_at = asyncio.get_event_loop().time() db.commit() try: # 获取项目信息 project = scan.project if not project: raise Exception("项目不存在") # 解析扫描配置 scan_config = {} if scan.scan_config: scan_config = json.loads(scan.scan_config) # 执行代码分析 vulnerabilities_data = await self.analyzer_service.analyze_project( project_path=project.project_path, language=project.language, config=scan_config ) # 保存漏洞数据到数据库 await self._save_vulnerabilities(db, scan, vulnerabilities_data) # 更新扫描统计 scan.total_files = scan_config.get('total_files', 100) # 模拟文件数 scan.scanned_files = scan.total_files scan.total_vulnerabilities = len(vulnerabilities_data) scan.status = ScanStatus.COMPLETED scan.completed_at = asyncio.get_event_loop().time() # 生成结果摘要 summary = { 'total_vulnerabilities': scan.total_vulnerabilities, 'by_severity': {}, 'by_category': {} } for vuln_data in vulnerabilities_data: severity = vuln_data.get('severity', 'medium') category = vuln_data.get('category', 'maintainability') summary['by_severity'][severity] = summary['by_severity'].get(severity, 0) + 1 summary['by_category'][category] = summary['by_category'].get(category, 0) + 1 scan.result_summary = json.dumps(summary) db.commit() print(f"扫描完成: {scan_id}, 发现 {scan.total_vulnerabilities} 个漏洞") except Exception as e: # 扫描失败 scan.status = ScanStatus.FAILED scan.error_message = str(e) db.commit() print(f"扫描失败: {scan_id}, 错误: {str(e)}") finally: db.close() async def _save_vulnerabilities(self, db: Session, scan: Scan, vulnerabilities_data: List[dict]): """保存漏洞数据到数据库""" for vuln_data in vulnerabilities_data: # 映射严重程度 severity_mapping = { 'critical': SeverityLevel.CRITICAL, 'high': SeverityLevel.HIGH, 'medium': SeverityLevel.MEDIUM, 'low': SeverityLevel.LOW, 'info': SeverityLevel.INFO } # 映射分类 category_mapping = { 'security': VulnerabilityCategory.SECURITY, 'performance': VulnerabilityCategory.PERFORMANCE, 'maintainability': VulnerabilityCategory.MAINTAINABILITY, 'reliability': VulnerabilityCategory.RELIABILITY, 'usability': VulnerabilityCategory.USABILITY } vulnerability = Vulnerability( scan_id=scan.id, rule_id=vuln_data.get('rule_id', 'unknown'), message=vuln_data.get('message', ''), category=category_mapping.get(vuln_data.get('category', 'maintainability'), VulnerabilityCategory.MAINTAINABILITY), severity=severity_mapping.get(vuln_data.get('severity', 'medium'), SeverityLevel.MEDIUM), file_path=vuln_data.get('file_path', ''), line_number=vuln_data.get('line_number'), column_number=vuln_data.get('column_number'), end_line=vuln_data.get('end_line'), end_column=vuln_data.get('end_column'), code_snippet=vuln_data.get('code_snippet', ''), context_before=vuln_data.get('context_before', ''), context_after=vuln_data.get('context_after', ''), ai_enhanced=vuln_data.get('ai_enhanced', False), ai_confidence=vuln_data.get('ai_confidence'), ai_suggestion=vuln_data.get('ai_suggestion', ''), status=VulnerabilityStatus.OPEN ) db.add(vulnerability) db.commit() def get_scan_progress(self, scan_id: int) -> dict: """获取扫描进度""" db = SessionLocal() try: scan = db.query(Scan).filter(Scan.id == scan_id).first() if not scan: return {'error': '扫描任务不存在'} progress = 0 if scan.total_files > 0: progress = (scan.scanned_files / scan.total_files) * 100 return { 'scan_id': scan_id, 'status': scan.status.value, 'progress': progress, 'total_files': scan.total_files, 'scanned_files': scan.scanned_files, 'total_vulnerabilities': scan.total_vulnerabilities, 'started_at': scan.started_at, 'completed_at': scan.completed_at, 'error_message': scan.error_message } finally: db.close()