完成公告 CRUD、公告列表和详情接口 #23

Merged
hnu202326010328 merged 1 commits from wanglirong_branch into develop 5 months ago

@ -1,33 +1,33 @@
from fastapi import APIRouter
# 确保所有 endpoints 文件都被正确导入(相对导入是关键)
from .endpoints import auth, project, reports, user, admin,knowledge,chat,session,message
# 引入所有 endpoints
from .endpoints import (
auth,
project,
reports,
user,
admin,
knowledge,
chat,
session,
announcement # <--- 1. 导入新模块
)
api_router = APIRouter()
# ----------------------------------------------------
# 路由注册区:必须包含所有模块的 include_router 语句
# 路由注册区
# ----------------------------------------------------
# 1. 认证模块 (已存在)
api_router.include_router(auth.router, prefix="/auth", tags=["I. 认证与授权"])
# 2. 项目管理模块 (新增)
api_router.include_router(project.router, prefix="/projects", tags=["II. 项目管理"])
# 3. 报表查询模块 (新增)
# 注意reports.py 文件中的路由路径包含了 /reports 和 /history-queries
# 所以这里注册时无需再添加 prefix直接挂载即可。
api_router.include_router(reports.router, tags=["III. 报表与查询"])
#4. 业务术语库 (修正:直接挂载,因为内部定义了完整路径)
api_router.include_router(knowledge.router, tags=["IV. 业务术语表"])
# 5. 用户设置模块 (新增)
api_router.include_router(user.router, prefix="/user", tags=["V. 用户设置"])
#6. 管理员模块 (新增)
api_router.include_router(admin.router, tags=["VI. 管理员功能"])
#7. 对话与消息管理模块 (新增)
api_router.include_router(chat.router, tags=["VII. 对话与消息管理"])
api_router.include_router(session.router, prefix="/sessions", tags=["VIII. 会话管理"])
# 8.这里是“资源”:管理会话容器
api_router.include_router(session.router, prefix="/sessions", tags=["VIII. 会话管理"])
# 2. 注册公告模块
# 这里的 prefix="/announcements" 完美对应了你文档中的 GET /announcements
api_router.include_router(announcement.router, prefix="/announcements", tags=["IX. 系统公告"])

@ -0,0 +1,54 @@
from typing import Any
from fastapi import APIRouter, Depends, Query, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from api.v1.deps import get_db
from crud.crud_announcement import crud_announcement
from schema.announcement import (
AnnouncementListResponse,
AnnouncementDetailResponse
)
router = APIRouter()
# ============================
# 1. 获取公告列表
# ============================
@router.get("/", response_model=AnnouncementListResponse, summary="获取公告列表")
async def read_announcements(
db: AsyncSession = Depends(get_db),
page: int = Query(1, ge=1, description="页码"),
page_size: int = Query(10, ge=1, le=100, description="每页数量"),
status: str = Query("published", description="公告状态"),
) -> Any:
total, items = await crud_announcement.get_list(
db=db,
status=status,
page=page,
page_size=page_size
)
return AnnouncementListResponse(
total=total,
page=page,
page_size=page_size,
items=items
)
# ============================
# 2. 获取公告详情
# ============================
@router.get("/{announcement_id}", response_model=AnnouncementDetailResponse, summary="获取公告详情")
async def get_announcement_detail(
announcement_id: int,
db: AsyncSession = Depends(get_db)
):
announcement = await crud_announcement.get(db, announcement_id)
if not announcement:
raise HTTPException(status_code=404, detail="Announcement not found")
return announcement

@ -1,45 +1,82 @@
# backend/app/crud/crud_announcement.py
from typing import Optional, List, Dict, Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy import update, delete
from sqlalchemy import select, update, delete, func
from sqlalchemy.exc import SQLAlchemyError
from models.system_announcement import SystemAnnouncement as Announcement # 假设 ORM 模型名是 SystemAnnouncement
from models.system_announcement import SystemAnnouncement as Announcement
from core.exceptions import DatabaseOperationFailedException
class CRUDAnnouncement:
@staticmethod
async def get_list(
db: AsyncSession,
status: str | None,
page: int,
page_size: int
):
"""获取公告列表(用户端使用)"""
query = select(Announcement)
if status:
query = query.where(Announcement.status == status)
# total count
count_query = select(func.count()).select_from(query.subquery())
total = (await db.execute(count_query)).scalar()
# pagination
query = (
query.order_by(Announcement.created_at.desc())
.offset((page - 1) * page_size)
.limit(page_size)
)
rows = (await db.execute(query)).scalars().all()
return total, rows
async def get(self, db: AsyncSession, announcement_id: int):
"""获取公告详情"""
query = select(Announcement).where(Announcement.announcement_id == announcement_id)
result = await db.execute(query)
return result.scalar_one_or_none()
@staticmethod
async def create(db: AsyncSession, **kwargs) -> Announcement:
"""创建新公告 (4.2.1)"""
try:
db_obj = Announcement(**kwargs)
db.add(db_obj)
obj = Announcement(**kwargs)
db.add(obj)
await db.commit()
await db.refresh(db_obj)
return db_obj
await db.refresh(obj)
return obj
except SQLAlchemyError as e:
await db.rollback()
raise DatabaseOperationFailedException("create announcement") from e
@staticmethod
async def update(db: AsyncSession, announcement_id: int, update_data: Dict[str, Any]) -> Optional[Announcement]:
"""更新公告 (4.2.2)"""
async def update(db: AsyncSession, announcement_id: int, update_data: Dict[str, Any]):
try:
query = update(Announcement).where(Announcement.announcement_id == announcement_id).values(**update_data).returning(Announcement)
query = (
update(Announcement)
.where(Announcement.announcement_id == announcement_id)
.values(**update_data)
.returning(Announcement)
)
result = await db.execute(query)
updated_announcement = result.scalar_one_or_none()
obj = result.scalar_one_or_none()
await db.commit()
return updated_announcement
return obj
except SQLAlchemyError as e:
await db.rollback()
raise DatabaseOperationFailedException("update announcement") from e
@staticmethod
async def remove(db: AsyncSession, announcement_id: int) -> bool:
"""删除公告 (4.2.3)"""
try:
query = delete(Announcement).where(Announcement.announcement_id == announcement_id)
query = delete(Announcement).where(
Announcement.announcement_id == announcement_id
)
result = await db.execute(query)
await db.commit()
return result.rowcount > 0
@ -47,4 +84,6 @@ class CRUDAnnouncement:
await db.rollback()
raise DatabaseOperationFailedException("delete announcement") from e
curd_announcement = CRUDAnnouncement()
# 正确实例名称
crud_announcement = CRUDAnnouncement()

@ -0,0 +1,34 @@
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from typing import List
class AnnouncementBase(BaseModel):
title: str
content: str
status: str
class AnnouncementResponse(BaseModel):
announcement_id: int
title: str
content: str
status: str
created_at: datetime
# Pydantic v2 版本的 orm_mode
model_config = ConfigDict(from_attributes=True)
class AnnouncementListResponse(BaseModel):
total: int
page: int
page_size: int
items: List[AnnouncementResponse]
model_config = ConfigDict(from_attributes=True)
class AnnouncementDetailResponse(AnnouncementResponse):
"""公告详情"""
model_config = ConfigDict(from_attributes=True)

@ -26,7 +26,7 @@ from schema.admin import (
AdminStatsResponse
)
from crud.crud_user_account import curd_user_account
from crud.crud_announcement import curd_announcement
from crud.crud_announcement import crud_announcement
from crud.crud_admin_data import curd_admin_data
@ -92,7 +92,7 @@ async def update_user_quota_service(db: AsyncSession, user_id: int,
async def create_announcement_service(db: AsyncSession, data: AnnouncementCreateRequest,
admin_user_id: int) -> AnnouncementResponse:
"""创建新公告 (4.2.1)"""
announcement_orm = await curd_announcement.create(db, created_by=admin_user_id, **data.model_dump())
announcement_orm = await crud_announcement.create(db, created_by=admin_user_id, **data.model_dump())
return AnnouncementResponse.model_validate(announcement_orm)
@ -100,7 +100,7 @@ async def update_announcement_service(db: AsyncSession, announcement_id: int,
data: AnnouncementUpdateRequest) -> AnnouncementResponse:
"""更新公告内容和状态 (4.2.2)"""
update_data = data.model_dump(exclude_unset=True)
announcement_orm = await curd_announcement.update(db, announcement_id=announcement_id, update_data=update_data)
announcement_orm = await crud_announcement.update(db, announcement_id=announcement_id, update_data=update_data)
if not announcement_orm:
raise ItemNotFoundException("Announcement not found.")
return AnnouncementResponse.model_validate(announcement_orm)
@ -108,7 +108,7 @@ async def update_announcement_service(db: AsyncSession, announcement_id: int,
async def delete_announcement_service(db: AsyncSession, announcement_id: int) -> None:
"""删除公告 (4.2.3)"""
success = await curd_announcement.remove(db, announcement_id)
success = await crud_announcement.remove(db, announcement_id)
if not success:
raise ItemNotFoundException("Announcement not found.")

@ -0,0 +1,44 @@
from sqlalchemy.ext.asyncio import AsyncSession
from crud.crud_announcement import crud_announcement
class AnnouncementService:
@staticmethod
async def get_announcement_list(
db: AsyncSession,
status: str | None,
page: int,
page_size: int
):
"""
获取公告列表
"""
total, items = await crud_announcement.get_list(
db=db,
status=status,
page=page,
page_size=page_size
)
return {
"total": total,
"page": page,
"page_size": page_size,
"items": items
}
@staticmethod
async def get_announcement_detail(
db: AsyncSession,
announcement_id: int
):
"""
获取公告详情
"""
announcement = await crud_announcement.get(db, announcement_id)
return announcement
# 单例实例
announcement_service = AnnouncementService()
Loading…
Cancel
Save