You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

121 lines
4.4 KiB

"""
仪表板服务
"""
from sqlalchemy.orm import Session
from sqlalchemy import func, desc
from app.models.project import Project
from app.models.scan import Scan
from app.models.vulnerability import Vulnerability, VulnerabilityStatus, SeverityLevel
class DashboardService:
    """Dashboard service.

    Groups the aggregate queries (counts, distributions, daily trends) that
    feed the dashboard. Every method only issues SELECT-style queries through
    the session it is given and returns plain dicts/lists of built-in values.

    NOTE(review): the methods are coroutines, but the ``db.query(...)`` calls
    are not awaited, so with a regular synchronous ``Session`` the queries run
    inline on the event loop — confirm this is acceptable for the callers.
    """

    @staticmethod
    def _enum_value(member):
        """Return ``member.value`` for enum members, else ``member`` unchanged.

        A bare ``.value`` access raises ``AttributeError`` when the column is
        NULL (``None``) or holds a plain string rather than an enum member;
        a single such row would otherwise fail the whole dashboard request.
        """
        return getattr(member, 'value', member)

    async def get_summary_data(self, db: Session) -> dict:
        """Collect the headline figures for the dashboard.

        Args:
            db: SQLAlchemy session used for every query below.

        Returns:
            A dict with the keys:

            * ``projects`` – number of projects with ``is_active`` true
            * ``scans`` – total number of scans
            * ``completed_scans`` – scans whose status is ``"completed"``
            * ``vulnerabilities`` – total number of vulnerabilities
            * ``fixed`` – vulnerabilities with status ``FIXED``
            * ``severity_summary`` – ``{severity value: count}``
            * ``recent_vulnerabilities`` – the 10 newest vulnerabilities
            * ``project_stats`` – the 5 projects with the most vulnerabilities
        """
        # Project statistics. ``== True`` is intentional: SQLAlchemy needs the
        # comparison operator to build the SQL expression.
        total_projects = (
            db.query(Project).filter(Project.is_active == True).count()  # noqa: E712
        )

        # Scan statistics.
        total_scans = db.query(Scan).count()
        completed_scans = db.query(Scan).filter(Scan.status == "completed").count()

        # Vulnerability statistics.
        total_vulnerabilities = db.query(Vulnerability).count()
        fixed_vulnerabilities = (
            db.query(Vulnerability)
            .filter(Vulnerability.status == VulnerabilityStatus.FIXED)
            .count()
        )

        # Vulnerability count per severity level.
        severity_rows = (
            db.query(
                Vulnerability.severity,
                func.count(Vulnerability.id).label('count'),
            )
            .group_by(Vulnerability.severity)
            .all()
        )
        severity_summary = {
            self._enum_value(severity): count for severity, count in severity_rows
        }

        # Most recently created vulnerabilities.
        # NOTE(review): ``vuln.scan.project`` is resolved per row; if those
        # relationships are lazy-loaded this costs extra queries per
        # vulnerability — consider eager loading once the mapping is confirmed.
        recent_vulnerabilities = (
            db.query(Vulnerability)
            .order_by(desc(Vulnerability.created_at))
            .limit(10)
            .all()
        )
        recent_vuln_data = [
            {
                'id': vuln.id,
                'project_name': (
                    vuln.scan.project.name
                    if vuln.scan and vuln.scan.project
                    else 'Unknown'
                ),
                'category': self._enum_value(vuln.category),
                'severity': self._enum_value(vuln.severity),
                'file_path': vuln.file_path,
                'message': vuln.message,
                # A row without a timestamp is reported as None instead of
                # aborting the whole summary.
                'created_at': (
                    vuln.created_at.isoformat()
                    if vuln.created_at is not None
                    else None
                ),
            }
            for vuln in recent_vulnerabilities
        ]

        # Top 5 projects by vulnerability count. The inner joins mean that
        # projects without any vulnerability never appear in this list.
        project_rows = (
            db.query(
                Project.name,
                func.count(Vulnerability.id).label('vulnerability_count'),
            )
            .join(Scan, Project.id == Scan.project_id)
            .join(Vulnerability, Scan.id == Vulnerability.scan_id)
            .group_by(Project.id, Project.name)
            .order_by(desc('vulnerability_count'))
            .limit(5)
            .all()
        )
        project_vuln_data = [
            {'project_name': project_name, 'vulnerability_count': count}
            for project_name, count in project_rows
        ]

        return {
            'projects': total_projects,
            'scans': total_scans,
            'completed_scans': completed_scans,
            'vulnerabilities': total_vulnerabilities,
            'fixed': fixed_vulnerabilities,
            'severity_summary': severity_summary,
            'recent_vulnerabilities': recent_vuln_data,
            'project_stats': project_vuln_data,
        }

    async def get_trend_data(self, db: Session, days: int = 30) -> dict:
        """Return per-day scan and vulnerability counts for the recent past.

        Args:
            db: SQLAlchemy session used for the queries.
            days: Size of the look-back window, counted back from the current
                local time (``datetime.now()``).

        Returns:
            ``{'daily_scans': [...], 'daily_vulnerabilities': [...]}`` where
            each list holds ``{'date': str, 'count': int}`` entries in
            ascending date order. Days without any rows are absent from the
            lists (they are not zero-filled).
        """
        from datetime import datetime, timedelta

        start_date = datetime.now() - timedelta(days=days)

        # Scans per day. The explicit ORDER BY matters: GROUP BY alone leaves
        # the row order up to the database, which would scramble the series.
        scan_day = func.date(Scan.created_at)
        daily_scans = (
            db.query(scan_day.label('date'), func.count(Scan.id).label('count'))
            .filter(Scan.created_at >= start_date)
            .group_by(scan_day)
            .order_by(scan_day)
            .all()
        )

        # Vulnerabilities per day, ordered the same way.
        vuln_day = func.date(Vulnerability.created_at)
        daily_vulnerabilities = (
            db.query(
                vuln_day.label('date'),
                func.count(Vulnerability.id).label('count'),
            )
            .filter(Vulnerability.created_at >= start_date)
            .group_by(vuln_day)
            .order_by(vuln_day)
            .all()
        )

        return {
            'daily_scans': [
                {'date': str(day), 'count': count} for day, count in daily_scans
            ],
            'daily_vulnerabilities': [
                {'date': str(day), 'count': count}
                for day, count in daily_vulnerabilities
            ],
        }

    async def get_category_distribution(self, db: Session) -> dict:
        """Return the number of vulnerabilities in each category.

        Args:
            db: SQLAlchemy session used for the query.

        Returns:
            ``{category value: count}`` with one entry per distinct category
            present in the vulnerability table.
        """
        category_rows = (
            db.query(
                Vulnerability.category,
                func.count(Vulnerability.id).label('count'),
            )
            .group_by(Vulnerability.category)
            .all()
        )
        return {
            self._enum_value(category): count for category, count in category_rows
        }