You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
121 lines
4.4 KiB
121 lines
4.4 KiB
"""
Dashboard service.
"""
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import func, desc
|
|
from app.models.project import Project
|
|
from app.models.scan import Scan
|
|
from app.models.vulnerability import Vulnerability, VulnerabilityStatus, SeverityLevel
|
|
|
|
class DashboardService:
    """Aggregate project, scan and vulnerability figures for the dashboard.

    NOTE(review): every public method is declared ``async`` but only issues
    synchronous ``Session`` queries (nothing is awaited), so a call occupies
    the event loop while its queries run. Confirm this is acceptable for the
    callers, or run the methods in a worker thread.
    """

    # Number of most recent vulnerabilities included in the summary.
    _RECENT_LIMIT = 10
    # Number of projects listed in the "most vulnerabilities" ranking.
    _TOP_PROJECTS_LIMIT = 5
    # Label used when an enum column is NULL in the database.
    _UNKNOWN_LABEL = 'unknown'

    @staticmethod
    def _enum_value(member, default=None):
        """Return ``member.value``, or *default* when *member* is ``None``."""
        return default if member is None else member.value

    def _count_by_enum(self, db: Session, column) -> dict:
        """Count vulnerabilities grouped by an enum *column*.

        Returns a mapping of the enum's ``.value`` to the number of rows.
        Rows whose column is NULL are counted under ``'unknown'`` instead of
        raising ``AttributeError``.
        """
        rows = db.query(
            column,
            func.count(Vulnerability.id).label('count')
        ).group_by(column).all()

        counts = {}
        for member, count in rows:
            label = self._enum_value(member, self._UNKNOWN_LABEL)
            # Accumulate rather than assign so a NULL group cannot overwrite
            # an enum member whose value happens to equal the fallback label.
            counts[label] = counts.get(label, 0) + count
        return counts

    def _recent_vulnerabilities(self, db: Session) -> list:
        """Return the most recently created vulnerabilities as plain dicts."""
        rows = db.query(Vulnerability).order_by(
            desc(Vulnerability.created_at)
        ).limit(self._RECENT_LIMIT).all()

        return [
            {
                'id': vuln.id,
                'project_name': (
                    vuln.scan.project.name
                    if vuln.scan and vuln.scan.project
                    else 'Unknown'
                ),
                'category': self._enum_value(vuln.category),
                'severity': self._enum_value(vuln.severity),
                'file_path': vuln.file_path,
                'message': vuln.message,
                # Guard against rows without a timestamp instead of crashing.
                'created_at': (
                    vuln.created_at.isoformat()
                    if vuln.created_at is not None
                    else None
                ),
            }
            for vuln in rows
        ]

    def _top_projects(self, db: Session) -> list:
        """Return the projects with the most vulnerabilities, highest first."""
        rows = db.query(
            Project.name,
            func.count(Vulnerability.id).label('vulnerability_count')
        ).join(Scan, Project.id == Scan.project_id).join(
            Vulnerability, Scan.id == Vulnerability.scan_id
        ).group_by(Project.id, Project.name).order_by(
            desc('vulnerability_count'),
            # Tie-breaker so equal counts are ranked deterministically.
            Project.name
        ).limit(self._TOP_PROJECTS_LIMIT).all()

        return [
            {'project_name': project_name, 'vulnerability_count': count}
            for project_name, count in rows
        ]

    @staticmethod
    def _daily_counts(db: Session, model, since) -> list:
        """Count *model* rows per calendar day created at or after *since*.

        *model* must expose ``id`` and ``created_at`` columns. The result is
        ordered chronologically so it can be plotted directly.
        """
        day = func.date(model.created_at)
        rows = db.query(
            day.label('date'),
            func.count(model.id).label('count')
        ).filter(
            model.created_at >= since
        ).group_by(day).order_by(day).all()

        return [{'date': str(date), 'count': count} for date, count in rows]

    async def get_summary_data(self, db: Session) -> dict:
        """Return the headline figures shown on the dashboard.

        Args:
            db: Database session used for all queries.

        Returns:
            A dict with the keys ``projects`` (active projects), ``scans``,
            ``completed_scans``, ``vulnerabilities``, ``fixed``,
            ``severity_summary`` (severity value -> count),
            ``recent_vulnerabilities`` (list of dicts, newest first) and
            ``project_stats`` (projects with the most vulnerabilities).
        """
        # Project statistics.
        total_projects = db.query(Project).filter(
            # SQL expression, so ``==`` is required; ``is True`` would not work.
            Project.is_active == True  # noqa: E712
        ).count()

        # Scan statistics.
        total_scans = db.query(Scan).count()
        completed_scans = db.query(Scan).filter(Scan.status == "completed").count()

        # Vulnerability statistics.
        total_vulnerabilities = db.query(Vulnerability).count()
        fixed_vulnerabilities = db.query(Vulnerability).filter(
            Vulnerability.status == VulnerabilityStatus.FIXED
        ).count()

        return {
            'projects': total_projects,
            'scans': total_scans,
            'completed_scans': completed_scans,
            'vulnerabilities': total_vulnerabilities,
            'fixed': fixed_vulnerabilities,
            'severity_summary': self._count_by_enum(db, Vulnerability.severity),
            'recent_vulnerabilities': self._recent_vulnerabilities(db),
            'project_stats': self._top_projects(db)
        }

    async def get_trend_data(self, db: Session, days: int = 30) -> dict:
        """Return per-day scan and vulnerability counts for the last *days* days.

        Args:
            db: Database session used for all queries.
            days: Size of the look-back window, in days.

        Returns:
            A dict with ``daily_scans`` and ``daily_vulnerabilities``, each a
            chronologically ordered list of ``{'date': str, 'count': int}``.
            Days without any rows are not included.
        """
        from datetime import datetime, timedelta

        # NOTE(review): naive local time; confirm the ``created_at`` columns
        # are stored with the same convention.
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        return {
            'daily_scans': self._daily_counts(db, Scan, start_date),
            'daily_vulnerabilities': self._daily_counts(db, Vulnerability, start_date)
        }

    async def get_category_distribution(self, db: Session) -> dict:
        """Return the number of vulnerabilities per category.

        Args:
            db: Database session used for the query.

        Returns:
            A dict mapping each category's ``.value`` to its row count; rows
            without a category are counted under ``'unknown'``.
        """
        return self._count_by_enum(db, Vulnerability.category)
|