You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

221 lines
8.2 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

"""
报告生成服务
"""
import os
import json
import pandas as pd
from jinja2 import Template
from typing import Dict, Any
from datetime import datetime
from app.models.scan import Scan
from app.models.project import Project
class ReportService:
    """Generate scan and project reports as HTML, PDF, JSON or Excel.

    HTML templates are read from ``self.templates_dir`` and every generated
    file is written into ``self.reports_dir``.

    NOTE: the ``async`` methods perform ordinary blocking file I/O (``open``,
    pandas, WeasyPrint); nothing in this class offloads that work.
    """

    # Column order of the vulnerability sheet in the Excel export. Fixing the
    # order here keeps the header row present even when a scan has no findings.
    _EXCEL_COLUMNS = (
        'ID',
        '规则ID',
        '严重程度',
        '分类',
        '文件路径',
        '行号',
        '描述',
        'AI增强',
        'AI置信度',
        'AI建议',
        '状态',
        '创建时间',
    )

    def __init__(self):
        """Set the template/output directories and create the output one."""
        self.templates_dir = "app/templates"
        self.reports_dir = "reports"
        os.makedirs(self.reports_dir, exist_ok=True)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _timestamp() -> str:
        """Return the current local time formatted for use in file names."""
        return datetime.now().strftime('%Y%m%d_%H%M%S')

    def _render_to_file(self, template_name: str, context: Dict[str, Any], stem: str) -> str:
        """Render *template_name* with *context* and save it as ``<stem>_<timestamp>.html``.

        Returns the path of the written file.
        """
        template_path = os.path.join(self.templates_dir, template_name)
        with open(template_path, 'r', encoding='utf-8') as f:
            template_source = f.read()

        # Findings carry text taken from scanned code (messages, file paths),
        # so escape it instead of injecting it verbatim into the HTML.
        # NOTE(review): assumes the templates do not rely on raw HTML inside
        # these fields; mark such values with ``|safe`` in the template if so.
        html_content = Template(template_source, autoescape=True).render(**context)

        report_path = os.path.join(self.reports_dir, f"{stem}_{self._timestamp()}.html")
        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(html_content)
        return report_path

    @staticmethod
    def _vulnerability_rows(vulnerabilities) -> list:
        """Flatten vulnerability objects into one dict per Excel row."""
        return [
            {
                'ID': vuln.id,
                '规则ID': vuln.rule_id,
                '严重程度': vuln.severity.value,
                '分类': vuln.category.value,
                '文件路径': vuln.file_path,
                '行号': vuln.line_number,
                '描述': vuln.message,
                # Previously both branches were '', so the column was always blank.
                'AI增强': '是' if vuln.ai_enhanced else '否',
                'AI置信度': vuln.ai_confidence,
                'AI建议': vuln.ai_suggestion,
                '状态': vuln.status.value,
                # Tolerate a missing timestamp rather than failing the export.
                '创建时间': (
                    vuln.created_at.strftime('%Y-%m-%d %H:%M:%S')
                    if vuln.created_at else ''
                ),
            }
            for vuln in vulnerabilities
        ]

    # ------------------------------------------------------------------
    # Scan reports
    # ------------------------------------------------------------------
    async def generate_html_report(self, scan: Scan) -> str:
        """Render ``scan_report.html`` for *scan* and return the file path."""
        report_data = await self._prepare_report_data(scan)
        return self._render_to_file("scan_report.html", report_data, f"scan_report_{scan.id}")

    async def generate_pdf_report(self, scan: Scan) -> str:
        """Render the scan report to PDF through WeasyPrint and return its path.

        Raises:
            RuntimeError: if WeasyPrint (or one of its system libraries)
                cannot be imported. The original error is chained as
                ``__cause__``.
        """
        # Import lazily so the service works without WeasyPrint installed, and
        # check *before* rendering so a failure leaves no stray HTML file.
        try:
            from weasyprint import HTML  # type: ignore
        except Exception as exc:  # ImportError, or a missing native library
            raise RuntimeError(
                "PDF 导出所需依赖缺失(WeasyPrint 及其系统库)。"
                "请先使用 HTML/Excel/JSON 导出,或按安装指南配置 WeasyPrint。原始错误: " + str(exc)
            ) from exc

        html_path = await self.generate_html_report(scan)

        pdf_filename = f"scan_report_{scan.id}_{self._timestamp()}.pdf"
        pdf_path = os.path.join(self.reports_dir, pdf_filename)
        HTML(filename=html_path).write_pdf(pdf_path)
        return pdf_path

    async def generate_json_report(self, scan: Scan) -> Dict[str, Any]:
        """Return the scan report context as a dictionary."""
        return await self._prepare_report_data(scan)

    async def generate_excel_report(self, scan: Scan) -> str:
        """Write an ``.xlsx`` workbook for *scan* and return its path.

        The workbook has a '漏洞详情' sheet (one row per vulnerability) and a
        '统计信息' sheet (counts by severity and category).
        """
        details_df = pd.DataFrame(
            self._vulnerability_rows(scan.vulnerabilities),
            columns=list(self._EXCEL_COLUMNS),
        )

        # The first row returned by _generate_stats_data is the header; use it
        # as column labels so the sheet does not get a spurious "0 1 2" row.
        stats_header, *stats_rows = await self._generate_stats_data(scan)
        stats_df = pd.DataFrame(stats_rows, columns=stats_header)

        excel_filename = f"scan_report_{scan.id}_{self._timestamp()}.xlsx"
        excel_path = os.path.join(self.reports_dir, excel_filename)
        with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
            details_df.to_excel(writer, sheet_name='漏洞详情', index=False)
            stats_df.to_excel(writer, sheet_name='统计信息', index=False)
        return excel_path

    # ------------------------------------------------------------------
    # Project reports
    # ------------------------------------------------------------------
    async def generate_project_html_report(self, project: Project, latest_scan: Scan) -> str:
        """Render ``project_report.html`` for *project* and return the file path."""
        report_data = await self._prepare_project_report_data(project, latest_scan)
        return self._render_to_file(
            "project_report.html", report_data, f"project_report_{project.id}"
        )

    async def generate_project_json_report(self, project: Project, latest_scan: Scan) -> Dict[str, Any]:
        """Return the project report context as a dictionary."""
        return await self._prepare_project_report_data(project, latest_scan)

    # ------------------------------------------------------------------
    # Report data preparation
    # ------------------------------------------------------------------
    async def _prepare_report_data(self, scan: Scan) -> Dict[str, Any]:
        """Build the scan report context, grouping findings by severity and category."""
        vulnerabilities = scan.vulnerabilities

        by_severity: Dict[Any, list] = {}
        by_category: Dict[Any, list] = {}
        for vuln in vulnerabilities:
            by_severity.setdefault(vuln.severity.value, []).append(vuln)
            by_category.setdefault(vuln.category.value, []).append(vuln)

        return {
            'scan': scan,
            'project': scan.project,
            'vulnerabilities': vulnerabilities,
            'by_severity': by_severity,
            'by_category': by_category,
            'total_vulnerabilities': len(vulnerabilities),
            'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }

    async def _prepare_project_report_data(self, project: Project, latest_scan: Scan) -> Dict[str, Any]:
        """Build the project report context with per-severity counts of the latest scan."""
        vulnerabilities = latest_scan.vulnerabilities

        # Count the four reported severity levels in a single pass; any other
        # severity value only contributes to the total.
        counts = dict.fromkeys(('critical', 'high', 'medium', 'low'), 0)
        for vuln in vulnerabilities:
            level = vuln.severity.value
            if level in counts:
                counts[level] += 1

        return {
            'project': project,
            'latest_scan': latest_scan,
            'vulnerabilities': vulnerabilities,
            'stats': {'total': len(vulnerabilities), **counts},
            'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        }

    async def _generate_stats_data(self, scan: Scan) -> list:
        """Return statistics rows for the Excel export.

        The first row is the header ``['统计类型', '分类', '数量']``; it is
        followed by the total, then one row per severity and one per category
        in first-seen order.
        """
        vulnerabilities = scan.vulnerabilities

        severity_stats: Dict[Any, int] = {}
        category_stats: Dict[Any, int] = {}
        for vuln in vulnerabilities:
            severity = vuln.severity.value
            category = vuln.category.value
            severity_stats[severity] = severity_stats.get(severity, 0) + 1
            category_stats[category] = category_stats.get(category, 0) + 1

        stats_data = [
            ['统计类型', '分类', '数量'],
            ['严重程度', '总计', len(vulnerabilities)],
        ]
        stats_data.extend(['严重程度', severity, count] for severity, count in severity_stats.items())
        stats_data.extend(['分类', category, count] for category, count in category_stats.items())
        return stats_data