|
|
"""
|
|
|
报告生成服务
|
|
|
"""
|
|
|
import os
|
|
|
import json
|
|
|
import pandas as pd
|
|
|
from jinja2 import Template
|
|
|
from typing import Dict, Any
|
|
|
from datetime import datetime
|
|
|
from app.models.scan import Scan
|
|
|
from app.models.project import Project
|
|
|
|
|
|
class ReportService:
|
|
|
"""报告生成服务"""
|
|
|
|
|
|
def __init__(self):
|
|
|
self.templates_dir = "app/templates"
|
|
|
self.reports_dir = "reports"
|
|
|
os.makedirs(self.reports_dir, exist_ok=True)
|
|
|
|
|
|
async def generate_html_report(self, scan: Scan) -> str:
|
|
|
"""生成HTML报告"""
|
|
|
# 准备报告数据
|
|
|
report_data = await self._prepare_report_data(scan)
|
|
|
|
|
|
# 读取HTML模板
|
|
|
template_path = os.path.join(self.templates_dir, "scan_report.html")
|
|
|
with open(template_path, 'r', encoding='utf-8') as f:
|
|
|
template_content = f.read()
|
|
|
|
|
|
# 渲染模板
|
|
|
template = Template(template_content)
|
|
|
html_content = template.render(**report_data)
|
|
|
|
|
|
# 保存HTML文件
|
|
|
report_filename = f"scan_report_{scan.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
|
|
|
report_path = os.path.join(self.reports_dir, report_filename)
|
|
|
|
|
|
with open(report_path, 'w', encoding='utf-8') as f:
|
|
|
f.write(html_content)
|
|
|
|
|
|
return report_path
|
|
|
|
|
|
async def generate_pdf_report(self, scan: Scan) -> str:
|
|
|
"""生成PDF报告"""
|
|
|
# 先生成HTML报告
|
|
|
html_path = await self.generate_html_report(scan)
|
|
|
|
|
|
# 按需导入WeasyPrint,并给出友好降级
|
|
|
try:
|
|
|
from weasyprint import HTML # type: ignore
|
|
|
except Exception as exc: # ImportError 或底层依赖缺失
|
|
|
raise RuntimeError(
|
|
|
"PDF 导出所需依赖缺失(WeasyPrint 及其系统库)。" \
|
|
|
"请先使用 HTML/Excel/JSON 导出,或按安装指南配置 WeasyPrint。原始错误: " + str(exc)
|
|
|
)
|
|
|
|
|
|
# 转换为PDF
|
|
|
pdf_filename = f"scan_report_{scan.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.pdf"
|
|
|
pdf_path = os.path.join(self.reports_dir, pdf_filename)
|
|
|
|
|
|
HTML(filename=html_path).write_pdf(pdf_path)
|
|
|
|
|
|
return pdf_path
|
|
|
|
|
|
async def generate_json_report(self, scan: Scan) -> Dict[str, Any]:
|
|
|
"""生成JSON报告"""
|
|
|
report_data = await self._prepare_report_data(scan)
|
|
|
return report_data
|
|
|
|
|
|
async def generate_excel_report(self, scan: Scan) -> str:
|
|
|
"""生成Excel报告"""
|
|
|
# 获取漏洞数据
|
|
|
vulnerabilities = scan.vulnerabilities
|
|
|
|
|
|
# 准备Excel数据
|
|
|
excel_data = []
|
|
|
for vuln in vulnerabilities:
|
|
|
excel_data.append({
|
|
|
'ID': vuln.id,
|
|
|
'规则ID': vuln.rule_id,
|
|
|
'严重程度': vuln.severity.value,
|
|
|
'分类': vuln.category.value,
|
|
|
'文件路径': vuln.file_path,
|
|
|
'行号': vuln.line_number,
|
|
|
'描述': vuln.message,
|
|
|
'AI增强': '是' if vuln.ai_enhanced else '否',
|
|
|
'AI置信度': vuln.ai_confidence,
|
|
|
'AI建议': vuln.ai_suggestion,
|
|
|
'状态': vuln.status.value,
|
|
|
'创建时间': vuln.created_at.strftime('%Y-%m-%d %H:%M:%S')
|
|
|
})
|
|
|
|
|
|
# 创建DataFrame
|
|
|
df = pd.DataFrame(excel_data)
|
|
|
|
|
|
# 保存Excel文件
|
|
|
excel_filename = f"scan_report_{scan.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
|
|
|
excel_path = os.path.join(self.reports_dir, excel_filename)
|
|
|
|
|
|
with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
|
|
|
df.to_excel(writer, sheet_name='漏洞详情', index=False)
|
|
|
|
|
|
# 添加统计信息表
|
|
|
stats_data = await self._generate_stats_data(scan)
|
|
|
stats_df = pd.DataFrame(stats_data)
|
|
|
stats_df.to_excel(writer, sheet_name='统计信息', index=False)
|
|
|
|
|
|
return excel_path
|
|
|
|
|
|
async def generate_project_html_report(self, project: Project, latest_scan: Scan) -> str:
|
|
|
"""生成项目汇总报告"""
|
|
|
# 准备项目报告数据
|
|
|
report_data = await self._prepare_project_report_data(project, latest_scan)
|
|
|
|
|
|
# 读取项目报告模板
|
|
|
template_path = os.path.join(self.templates_dir, "project_report.html")
|
|
|
with open(template_path, 'r', encoding='utf-8') as f:
|
|
|
template_content = f.read()
|
|
|
|
|
|
# 渲染模板
|
|
|
template = Template(template_content)
|
|
|
html_content = template.render(**report_data)
|
|
|
|
|
|
# 保存HTML文件
|
|
|
report_filename = f"project_report_{project.id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.html"
|
|
|
report_path = os.path.join(self.reports_dir, report_filename)
|
|
|
|
|
|
with open(report_path, 'w', encoding='utf-8') as f:
|
|
|
f.write(html_content)
|
|
|
|
|
|
return report_path
|
|
|
|
|
|
async def generate_project_json_report(self, project: Project, latest_scan: Scan) -> Dict[str, Any]:
|
|
|
"""生成项目JSON报告"""
|
|
|
report_data = await self._prepare_project_report_data(project, latest_scan)
|
|
|
return report_data
|
|
|
|
|
|
async def _prepare_report_data(self, scan: Scan) -> Dict[str, Any]:
|
|
|
"""准备报告数据"""
|
|
|
vulnerabilities = scan.vulnerabilities
|
|
|
|
|
|
# 按严重程度分组
|
|
|
by_severity = {}
|
|
|
by_category = {}
|
|
|
|
|
|
for vuln in vulnerabilities:
|
|
|
severity = vuln.severity.value
|
|
|
category = vuln.category.value
|
|
|
|
|
|
if severity not in by_severity:
|
|
|
by_severity[severity] = []
|
|
|
by_severity[severity].append(vuln)
|
|
|
|
|
|
if category not in by_category:
|
|
|
by_category[category] = []
|
|
|
by_category[category].append(vuln)
|
|
|
|
|
|
return {
|
|
|
'scan': scan,
|
|
|
'project': scan.project,
|
|
|
'vulnerabilities': vulnerabilities,
|
|
|
'by_severity': by_severity,
|
|
|
'by_category': by_category,
|
|
|
'total_vulnerabilities': len(vulnerabilities),
|
|
|
'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
}
|
|
|
|
|
|
async def _prepare_project_report_data(self, project: Project, latest_scan: Scan) -> Dict[str, Any]:
|
|
|
"""准备项目报告数据"""
|
|
|
vulnerabilities = latest_scan.vulnerabilities
|
|
|
|
|
|
# 统计信息
|
|
|
total_vulnerabilities = len(vulnerabilities)
|
|
|
critical_count = len([v for v in vulnerabilities if v.severity.value == 'critical'])
|
|
|
high_count = len([v for v in vulnerabilities if v.severity.value == 'high'])
|
|
|
medium_count = len([v for v in vulnerabilities if v.severity.value == 'medium'])
|
|
|
low_count = len([v for v in vulnerabilities if v.severity.value == 'low'])
|
|
|
|
|
|
return {
|
|
|
'project': project,
|
|
|
'latest_scan': latest_scan,
|
|
|
'vulnerabilities': vulnerabilities,
|
|
|
'stats': {
|
|
|
'total': total_vulnerabilities,
|
|
|
'critical': critical_count,
|
|
|
'high': high_count,
|
|
|
'medium': medium_count,
|
|
|
'low': low_count
|
|
|
},
|
|
|
'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
|
}
|
|
|
|
|
|
async def _generate_stats_data(self, scan: Scan) -> list:
|
|
|
"""生成统计信息数据"""
|
|
|
vulnerabilities = scan.vulnerabilities
|
|
|
|
|
|
# 按严重程度统计
|
|
|
severity_stats = {}
|
|
|
for vuln in vulnerabilities:
|
|
|
severity = vuln.severity.value
|
|
|
severity_stats[severity] = severity_stats.get(severity, 0) + 1
|
|
|
|
|
|
# 按分类统计
|
|
|
category_stats = {}
|
|
|
for vuln in vulnerabilities:
|
|
|
category = vuln.category.value
|
|
|
category_stats[category] = category_stats.get(category, 0) + 1
|
|
|
|
|
|
stats_data = []
|
|
|
stats_data.append(['统计类型', '分类', '数量'])
|
|
|
stats_data.append(['严重程度', '总计', len(vulnerabilities)])
|
|
|
|
|
|
for severity, count in severity_stats.items():
|
|
|
stats_data.append(['严重程度', severity, count])
|
|
|
|
|
|
for category, count in category_stats.items():
|
|
|
stats_data.append(['分类', category, count])
|
|
|
|
|
|
return stats_data
|