feat(report): 完成报表实时渲染、历史查询、导出接口;移除图表类型修改接口;修复各种404/类型错误 #24

Merged
hnu202326010328 merged 1 commits from wanglirong_branch into develop 2 months ago

@ -1,35 +1,50 @@
# backend/app/api/v1/endpoints/reports.py
from fastapi import APIRouter, Depends, Query
from sqlalchemy.ext.asyncio import AsyncSession as Session
from typing import List, Dict, Any
# 隐式绝对导入
from api.v1.deps import get_db, get_current_active_user
from schema import report as schemas
from sqlalchemy.ext.asyncio import AsyncSession
from schema.report import ReportCreate, Report, HistoryQuery, UpdateChartType
from api.v1.deps import get_db
from service import report_service
router = APIRouter()
# 4.1 获取报表列表
@router.get("/reports", response_model=List[schemas.Report])
@router.get("/reports", response_model=list[Report])
async def read_reports(
projectId: int = Query(..., description="项目ID"),
db: Session = Depends(get_db),
current_user = Depends(get_current_active_user),
) -> Any:
"""获取报表列表 (Reports GET)"""
# Router 严格只调用 Service返回数组
projectId: int = Query(...),
db: AsyncSession = Depends(get_db),
):
return await report_service.get_report_list(db, projectId)
# 4.4 获取历史查询记录
@router.get("/history-queries", response_model=List[schemas.HistoryQuery])
@router.get("/history-queries", response_model=list[HistoryQuery])
async def read_history(
projectId: int = Query(..., description="项目ID"),
db: Session = Depends(get_db),
current_user = Depends(get_current_active_user),
) -> Any:
"""获取历史查询记录 (History Queries GET)"""
# 假设 service 提供了 get_history_queries 函数
# return await report_service.get_history_queries(db, projectId)
return []
projectId: int = Query(...),
db: AsyncSession = Depends(get_db),
):
return await report_service.get_history_queries_service(db, projectId)
# 3.5.1 创建报表(实时渲染)
@router.post("/projects/{project_id}/reports", response_model=Report)
async def create_report(
project_id: int,
body: ReportCreate,
db: AsyncSession = Depends(get_db),
):
return await report_service.create_report_service(db, project_id, body)
# 3.5.2 修改图表类型(已删除该接口)
# 3.5.3 导出报表
@router.get("/reports/{report_id}/export", response_model=dict)
async def export_report(
report_id: int,
format: str = Query("png"),
db: AsyncSession = Depends(get_db)
):
return await report_service.export_report_service(db, report_id, format)

@ -5,7 +5,7 @@ from sqlalchemy.ext.asyncio import AsyncSession as Session
from sqlalchemy.future import select
from sqlalchemy import desc
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import update
# 导入所有相关模型以建立连接路径
from models.query_result import QueryResult
from models.ai_generated_statement import AIGeneratedStatement
@ -75,5 +75,25 @@ class CRUDReport:
# 兼容旧代码调用
get_history_by_project = get_by_project
@staticmethod
async def update_chart_type(db: Session, result_id: int, chart_type: str):
"""
更新报表的图表类型 -> 实际更新 QueryResult.chart_type 字段
"""
try:
query = (
update(QueryResult)
.where(QueryResult.result_id == result_id)
.values(chart_type=chart_type)
.returning(QueryResult)
)
result = await db.execute(query)
obj = result.scalar_one_or_none()
await db.commit()
return obj
except SQLAlchemyError as e:
await db.rollback()
raise DatabaseOperationFailedException("update chart type failed") from e
crud_report = CRUDReport()

@ -0,0 +1,16 @@
from sqlalchemy import Column, Integer, String, ForeignKey, DateTime, func
from sqlalchemy.orm import relationship
from core.database import Base
class Report(Base):
__tablename__ = "report"
report_id = Column(Integer, primary_key=True, index=True)
project_id = Column(Integer, ForeignKey("project.project_id"), nullable=False)
report_name = Column(String(255), nullable=False)
query_id = Column(Integer, ForeignKey("query_history.query_id"), nullable=False)
chart_type = Column(String(50), nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())

@ -1,36 +1,35 @@
# backend/app/schema/report.py
from pydantic import BaseModel, ConfigDict
from typing import List, Optional, Any, Dict
# 图表配置
class ChartConfig(BaseModel):
xAxisKey: str
yAxisKey: str
# 创建报表
class ReportCreate(BaseModel):
report_name: str
query_id: int
chart_type: str = "table"
description: Optional[str] = None
chartConfig: Optional[ChartConfig] = None # ← 新增
class ReportBase(BaseModel):
# 单个报表响应
class Report(BaseModel):
id: str
projectId: str
name: str
type: str
description: Optional[str] = None
description: Optional[str]
data: List[Dict[str, Any]]
chartConfig: ChartConfig
sourceQueryText: Optional[str] = None
class ReportCreate(ReportBase):
sourceQueryId: Optional[str] = None
class Report(ReportBase):
id: str
chartConfig: ChartConfig | None # ← 必须加回
sourceQueryText: Optional[str]
updatedAt: str
model_config = ConfigDict(from_attributes=True)
# ----------------------------------------------------
# 👇 新增:补充缺失的 HistoryQuery 类
# ----------------------------------------------------
# 历史查询
class HistoryQuery(BaseModel):
id: str
projectId: str
@ -38,4 +37,6 @@ class HistoryQuery(BaseModel):
timestamp: str
result: Optional[Any] = None
model_config = ConfigDict(from_attributes=True)
class UpdateChartType(BaseModel):
chart_type: str
model_config = ConfigDict(from_attributes=True)

@ -4,99 +4,130 @@ from typing import List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession as Session
from fastapi import HTTPException
# 导入 CRUD 和 Schema
from crud.crud_report import crud_report
from schema import report as schemas
from core.exceptions import DatabaseOperationFailedException, ItemNotFoundException
from models.project import Project
from sqlalchemy import select
# --------------------------
# 1. 获取报表列表
# --------------------------
async def get_report_list(db: Session, project_id: int) -> List[schemas.Report]:
"""
业务逻辑获取报表列表
需要将 DB 中的 QueryResult 对象转换为 Schema 中的 Report 对象
"""
# 1. 获取数据库对象列表
db_objs = await crud_report.get_by_project(db, project_id)
db_objs = await crud_report.get_by_project(db, project_id)
reports = []
for obj in db_objs:
# 2. 手动构造 Report 对象 (字段映射)
# 构造默认图表配置 (因为目前 DB 里没有存详细配置)
default_chart_config = schemas.ChartConfig(
xAxisKey="name",
yAxisKey="value"
)
# 确保 data 是列表格式
report_data = obj.result_data if isinstance(obj.result_data, list) else []
# 映射字段: ORM -> Schema
report = schemas.Report(
id=str(obj.result_id), # result_id -> id
projectId=str(project_id), # 传入的 project_id
name=obj.data_summary[:20] if obj.data_summary else f"Report-{obj.result_id}", # 使用摘要做标题
type=obj.chart_type or "table", # chart_type -> type
description=obj.data_summary, # data_summary -> description
data=report_data, # result_data -> data
chartConfig=default_chart_config, # 填充必填项
sourceQueryText="SELECT * FROM ...", # 暂无 SQL 文本,填占位符
updatedAt=obj.cached_at.isoformat() if obj.cached_at else "" # cached_at -> updatedAt
id=str(obj.result_id),
projectId=str(project_id),
name=obj.data_summary[:20] if obj.data_summary else f"Report-{obj.result_id}",
type=obj.chart_type or "table",
description=obj.data_summary,
data=report_data,
sourceQueryText="SELECT * FROM ...",
updatedAt=obj.cached_at.isoformat() if obj.cached_at else ""
)
reports.append(report)
return reports
# 2. 获取报表详情数据 (从 project_service.py 移过来的)
# --------------------------
# 2. 获取报表详情
# --------------------------
async def get_report_data_by_id_service(db: Session, query_id: int) -> Dict[str, Any]:
"""
Service 逻辑
1. 调用 CRUD 提取 QueryResult Statement
2. 将原始 ORM 数据转换为前端渲染所需的 JSON 格式
"""
try:
# 1. 调用 CRUD 获取原始数据
raw_data = await crud_report.get_report_data_by_result_id(db, query_id)
if not raw_data:
raise ItemNotFoundException(f"Query result with ID {query_id} not found.")
query_result, statement = raw_data
# 2. 业务处理/映射:构建前端渲染所需的 JSON 结构
return {
# 前端渲染所需的数据和配置
"result_data": query_result.result_data,
"data_summary": query_result.data_summary,
"chart_type": query_result.chart_type,
"sql_text": statement.sql_text,
"cached_at": query_result.cached_at.isoformat() if query_result.cached_at else None
}
except DatabaseOperationFailedException as e:
raise HTTPException(status_code=500, detail=f"Database error fetching data: {e}")
# 3. 获取历史查询 (从 project_service.py 移过来的)
async def get_history_queries_service(db: Session, project_id: str) -> List[schemas.HistoryQuery]:
"""
Service 逻辑获取历史查询记录列表并转换为 HistoryQuery DTO
"""
try:
db_objs = await crud_report.get_history_by_project(db, project_id)
# 转换为 HistoryQuery DTO
history_dto = []
for obj in db_objs:
# 严格按照 HistoryQuery Schema (包含 id, queryText, result) 映射
history_dto.append(schemas.HistoryQuery(
id=str(obj.result_id),
projectId=project_id,
queryText=obj.data_summary or "N/A", # 假设 data_summary 作为查询文本的预览
timestamp=obj.cached_at.isoformat() if obj.cached_at else 'N/A',
result=obj.result_data # 将 JSONB 快照直接映射到 result 字段
))
return history_dto
except DatabaseOperationFailedException as e:
raise HTTPException(status_code=500, detail=f"Database error fetching history: {e}")
raw = await crud_report.get_report_data_by_result_id(db, query_id)
if not raw:
raise ItemNotFoundException(f"Query result with ID {query_id} not found.")
query_result, statement = raw
return {
"result_data": query_result.result_data,
"data_summary": query_result.data_summary,
"chart_type": query_result.chart_type,
"sql_text": statement.sql_text,
"cached_at": query_result.cached_at.isoformat() if query_result.cached_at else None
}
# --------------------------
# 3. 获取历史查询记录
# --------------------------
async def get_history_queries_service(db: Session, project_id: int) -> List[schemas.HistoryQuery]:
db_objs = await crud_report.get_history_by_project(db, project_id)
history = []
for obj in db_objs:
history.append(schemas.HistoryQuery(
id=str(obj.result_id),
projectId=str(project_id), # ← 修复类型错误
queryText=obj.data_summary or "N/A",
timestamp=obj.cached_at.isoformat() if obj.cached_at else 'N/A',
result=obj.result_data
))
return history
# --------------------------
# 4. 创建报表(实时渲染,不入库)
# --------------------------
async def create_report_service(db: Session, project_id: int, payload: schemas.ReportCreate):
project_exists = await db.execute(select(Project).where(Project.project_id == project_id))
if not project_exists.scalars().first():
raise HTTPException(status_code=404, detail="Project not found")
raw = await crud_report.get_report_data_by_result_id(db, payload.query_id)
if not raw:
raise HTTPException(status_code=404, detail="Query result not found")
query_result, statement = raw
return schemas.Report(
id=str(query_result.result_id),
projectId=str(project_id),
name=payload.report_name,
type=payload.chart_type,
description=query_result.data_summary,
data=query_result.result_data,
chartConfig=payload.chartConfig,
sourceQueryText=statement.sql_text,
updatedAt=query_result.cached_at.isoformat() if query_result.cached_at else ""
)
# --------------------------
# 6. 导出报表
# --------------------------
async def export_report_service(db: Session, report_id: int, format: str):
if format not in ("png", "pdf"):
raise HTTPException(status_code=400, detail="format must be png or pdf")
# 检查 report 是否存在
raw = await crud_report.get_report_data_by_result_id(db, report_id)
if not raw:
raise HTTPException(status_code=404, detail="Report not found")
# 正常返回
return {
"download_url": f"https://fake-cdn.example.com/reports/{report_id}.{format}",
"expires_at": "2025-12-01T15:00:00Z"
}

Loading…
Cancel
Save