You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

851 lines
26 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
import sys
import os
# 添加当前目录到路径
sys.path.insert(0, os.path.dirname(__file__))
from database import SessionLocal
from models import (
GuestSummary, HousekeepingList, RoomInventoryStats,
Customer, Order, Room, RoomType, Employee
)
import uvicorn
# ===== 请求模型 =====
class CustomerCreate(BaseModel):
name: str
phone: str
id_card: str
class CustomerUpdate(BaseModel):
name: str = None
phone: str = None
class EmployeeCreate(BaseModel):
name: str
role: str
permission: str
phone: str
account: str
password: str
class EmployeeUpdate(BaseModel):
name: str = None
role: str = None
phone: str = None
class OrderCreate(BaseModel):
customer_id: int
checkin_date: str
checkout_date: str
status: str = "pending"
channel: str = "online"
class OrderUpdate(BaseModel):
status: str = None
checkout_date: str = None
class RoomUpdate(BaseModel):
status: str
# ===== 新业务流程模型 =====
class CheckinData(BaseModel):
"""入住登记数据"""
name: str
phone: str
id_card: str
gender: str = "M"
room_id: int
nights: int
checkin_date: str
price: float
class CheckoutData(BaseModel):
"""退房数据"""
order_id: str
room_id: int
room_fee: float
additional_fee: float
payment_method: str
remarks: str = ""
class ServiceRequestData(BaseModel):
"""服务请求"""
room_id: int
service_type: str
description: str
priority: str = ""
class ReservationData(BaseModel):
"""预订数据"""
guest_name: str
phone: str
checkin_date: str
checkout_date: str
room_type: str
channel: str
class BillData(BaseModel):
"""账单数据"""
order_id: str
room_fee: float
service_fee: float = 0
other_fee: float = 0
payment_method: str = ""
status: str = "未支付"
app = FastAPI(title="宾馆管理系统接口")
# 解决跨域问题,确保 index.html 可以访问
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
def strip_str(value):
"""处理字符串的空格和 NULL"""
if value is None:
return ""
if isinstance(value, str):
return value.strip()
return str(value)
# ===== 宾客管理相关 API =====
@app.get("/api/guests")
def get_guest_list():
"""获取当前入住的所有宾客信息(来自视图 v_guest_stay_summary"""
db = SessionLocal()
try:
results = db.query(GuestSummary).all()
payload = []
for g in results:
payload.append({
"order_id": strip_str(g.order_id),
"name": strip_str(g.guest_name) or "匿名",
"phone": strip_str(g.phone),
"room": g.room_number,
"type": strip_str(g.room_type) or "未知",
"orientation": strip_str(g.orientation) or "未设定",
"checkin": g.checkin_date.strftime("%Y-%m-%d %H:%M") if g.checkin_date else "N/A",
"status": strip_str(g.order_status) or "未知",
"preferences": strip_str(g.all_preferences) or "无特殊要求"
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"宾客查询失败: {str(e)}")
finally:
db.close()
@app.get("/api/customers")
def get_customers():
"""获取所有客户信息"""
db = SessionLocal()
try:
results = db.query(Customer).all()
payload = []
for c in results:
payload.append({
"customer_id": c.customer_id,
"name": strip_str(c.name),
"phone": strip_str(c.phone),
"id_card": strip_str(c.id_card),
"register_time": c.register_time.strftime("%Y-%m-%d %H:%M") if c.register_time else "N/A"
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"客户查询失败: {str(e)}")
finally:
db.close()
@app.get("/api/orders")
def get_orders():
"""获取所有订单"""
db = SessionLocal()
try:
results = db.query(Order).all()
payload = []
for o in results:
payload.append({
"order_id": strip_str(o.order_id),
"customer_id": o.customer_id,
"checkin_date": o.checkin_date.strftime("%Y-%m-%d %H:%M") if o.checkin_date else "N/A",
"checkout_date": o.checkout_date.strftime("%Y-%m-%d %H:%M") if o.checkout_date else "N/A",
"status": strip_str(o.status) or "未知",
"channel": strip_str(o.order_channel),
"create_time": o.create_time.strftime("%Y-%m-%d %H:%M") if o.create_time else "N/A"
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"订单查询失败: {str(e)}")
finally:
db.close()
# ===== 房间管理相关 API =====
@app.get("/api/rooms")
def get_rooms():
"""获取所有房间及其状态"""
db = SessionLocal()
try:
results = db.query(Room).all()
payload = []
for r in results:
payload.append({
"room_id": r.room_id,
"type_id": r.room_type_id,
"floor": r.floor,
"status": strip_str(r.status) or "未知",
"orientation": strip_str(r.orientation),
"facility": strip_str(r.facility),
"building_area": float(r.building_area) if r.building_area else 0,
"usable_area": float(r.usable_area) if r.usable_area else 0,
"position": strip_str(r.floor_position),
"window_type": strip_str(r.window_type),
"layout": strip_str(r.room_layout)
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"房间查询失败: {str(e)}")
finally:
db.close()
@app.get("/api/room-types")
def get_room_types():
"""获取所有房型及价格"""
db = SessionLocal()
try:
results = db.query(RoomType).all()
payload = []
for rt in results:
payload.append({
"type_id": rt.type_id,
"type_name": strip_str(rt.type_name),
"bed_type": strip_str(rt.bed_type),
"price": float(rt.price) if rt.price else 0,
"max_people": rt.max_people
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"房型查询失败: {str(e)}")
finally:
db.close()
# ===== 视图数据 API =====
@app.get("/api/housekeeping-list")
def get_housekeeping_list():
"""获取保洁任务列表(来自视图 v_housekeeping_list"""
db = SessionLocal()
try:
results = db.query(HousekeepingList).all()
payload = []
for h in results:
payload.append({
"room_number": h.room_number,
"floor": h.floor,
"orientation": strip_str(h.orientation),
"type_name": strip_str(h.type_name),
"booking_status": strip_str(h.booking_status) or "未知",
"guest_name": strip_str(h.guest_name) or "空房",
"instruction": strip_str(h.specific_instruction) or "无特殊说明"
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"保洁列表查询失败: {str(e)}")
finally:
db.close()
@app.get("/api/room-inventory-stats")
def get_room_inventory_stats():
"""获取房间库存统计(来自视图 v_room_inventory_stats"""
db = SessionLocal()
try:
results = db.query(RoomInventoryStats).all()
payload = []
for r in results:
payload.append({
"orientation": strip_str(r.orientation) or "未知",
"total_rooms": r.total_rooms or 0,
"available_types": strip_str(r.available_types) or "",
"avg_price": float(r.avg_price) if r.avg_price else 0,
"avg_usable_area": float(r.avg_usable_area) if r.avg_usable_area else 0
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"房间统计查询失败: {str(e)}")
finally:
db.close()
# ===== 员工管理 API =====
@app.get("/api/employees")
def get_employees():
"""获取所有员工信息"""
db = SessionLocal()
try:
results = db.query(Employee).all()
payload = []
for e in results:
payload.append({
"employee_id": e.employee_id,
"name": strip_str(e.name),
"role": strip_str(e.role),
"permission": strip_str(e.permission),
"phone": strip_str(e.phone),
"account": strip_str(e.account)
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"员工查询失败: {str(e)}")
finally:
db.close()
# ===== 客户管理 CRUD 操作 =====
@app.post("/api/customers")
def create_customer(customer: CustomerCreate):
"""新增客户"""
db = SessionLocal()
try:
from datetime import datetime
new_customer = Customer(
name=customer.name,
phone=customer.phone,
id_card=customer.id_card,
register_time=datetime.now()
)
db.add(new_customer)
db.commit()
db.refresh(new_customer)
return {
"customer_id": new_customer.customer_id,
"name": new_customer.name,
"phone": new_customer.phone
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"添加客户失败: {str(e)}")
finally:
db.close()
@app.put("/api/customers/{customer_id}")
def update_customer(customer_id: int, update_data: CustomerUpdate):
"""编辑客户信息"""
db = SessionLocal()
try:
customer = db.query(Customer).filter(Customer.customer_id == customer_id).first()
if not customer:
raise HTTPException(status_code=404, detail="客户不存在")
if update_data.name:
customer.name = update_data.name
if update_data.phone:
customer.phone = update_data.phone
db.commit()
return {"success": True, "message": "客户信息更新成功"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"更新失败: {str(e)}")
finally:
db.close()
@app.delete("/api/customers/{customer_id}")
def delete_customer(customer_id: int):
"""删除客户"""
db = SessionLocal()
try:
customer = db.query(Customer).filter(Customer.customer_id == customer_id).first()
if not customer:
raise HTTPException(status_code=404, detail="客户不存在")
db.delete(customer)
db.commit()
return {"success": True, "message": "客户删除成功"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}")
finally:
db.close()
# ===== 员工管理 CRUD 操作 =====
@app.post("/api/employees")
def create_employee(employee: EmployeeCreate):
"""新增员工"""
db = SessionLocal()
try:
new_employee = Employee(
name=employee.name,
role=employee.role,
permission=employee.permission,
phone=employee.phone,
account=employee.account,
password=employee.password
)
db.add(new_employee)
db.commit()
db.refresh(new_employee)
return {
"employee_id": new_employee.employee_id,
"name": new_employee.name,
"role": new_employee.role
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"添加员工失败: {str(e)}")
finally:
db.close()
@app.put("/api/employees/{employee_id}")
def update_employee(employee_id: int, update_data: EmployeeUpdate):
"""编辑员工信息"""
db = SessionLocal()
try:
employee = db.query(Employee).filter(Employee.employee_id == employee_id).first()
if not employee:
raise HTTPException(status_code=404, detail="员工不存在")
if update_data.name:
employee.name = update_data.name
if update_data.role:
employee.role = update_data.role
if update_data.phone:
employee.phone = update_data.phone
db.commit()
return {"success": True, "message": "员工信息更新成功"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"更新失败: {str(e)}")
finally:
db.close()
@app.delete("/api/employees/{employee_id}")
def delete_employee(employee_id: int):
"""删除员工"""
db = SessionLocal()
try:
employee = db.query(Employee).filter(Employee.employee_id == employee_id).first()
if not employee:
raise HTTPException(status_code=404, detail="员工不存在")
db.delete(employee)
db.commit()
return {"success": True, "message": "员工删除成功"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}")
finally:
db.close()
# ===== 房间管理 CRUD 操作 =====
@app.put("/api/rooms/{room_id}")
def update_room_status(room_id: int, update_data: RoomUpdate):
"""更新房间状态"""
db = SessionLocal()
try:
room = db.query(Room).filter(Room.room_id == room_id).first()
if not room:
raise HTTPException(status_code=404, detail="房间不存在")
room.status = update_data.status
db.commit()
return {"success": True, "message": "房间状态更新成功"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"更新失败: {str(e)}")
finally:
db.close()
# ===== 订单管理 CRUD 操作 =====
@app.post("/api/orders")
def create_order(order_data: OrderCreate):
"""新增订单"""
db = SessionLocal()
try:
from datetime import datetime
new_order = Order(
order_id=f"ORD{datetime.now().strftime('%Y%m%d%H%M%S')}",
customer_id=order_data.customer_id,
checkin_date=datetime.fromisoformat(order_data.checkin_date),
checkout_date=datetime.fromisoformat(order_data.checkout_date),
status=order_data.status,
create_time=datetime.now(),
order_channel=order_data.channel
)
db.add(new_order)
db.commit()
db.refresh(new_order)
return {
"order_id": new_order.order_id,
"customer_id": new_order.customer_id,
"status": new_order.status
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"创建订单失败: {str(e)}")
finally:
db.close()
@app.put("/api/orders/{order_id}")
def update_order(order_id: str, update_data: OrderUpdate):
"""编辑订单"""
db = SessionLocal()
try:
order = db.query(Order).filter(Order.order_id == order_id).first()
if not order:
raise HTTPException(status_code=404, detail="订单不存在")
if update_data.status:
order.status = update_data.status
if update_data.checkout_date:
from datetime import datetime
order.checkout_date = datetime.fromisoformat(update_data.checkout_date)
db.commit()
return {"success": True, "message": "订单更新成功"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"更新失败: {str(e)}")
finally:
db.close()
@app.delete("/api/orders/{order_id}")
def delete_order(order_id: str):
"""删除订单"""
db = SessionLocal()
try:
order = db.query(Order).filter(Order.order_id == order_id).first()
if not order:
raise HTTPException(status_code=404, detail="订单不存在")
db.delete(order)
db.commit()
return {"success": True, "message": "订单删除成功"}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"删除失败: {str(e)}")
finally:
db.close()
# ===== 新业务流程 API =====
@app.post("/api/checkin")
def process_checkin(checkin_data: CheckinData):
"""处理入住登记
流程:
1. 创建新客户(如果不存在)
2. 创建新订单
3. 更新房间状态为已占用
"""
db = SessionLocal()
try:
from datetime import datetime, timedelta
# 1. 检查或创建客户
existing_customer = db.query(Customer).filter(
Customer.id_card == checkin_data.id_card
).first()
if existing_customer:
customer_id = existing_customer.customer_id
else:
new_customer = Customer(
name=checkin_data.name,
phone=checkin_data.phone,
id_card=checkin_data.id_card,
register_time=datetime.now()
)
db.add(new_customer)
db.flush()
customer_id = new_customer.customer_id
# 2. 创建订单
checkin_dt = datetime.fromisoformat(checkin_data.checkin_date)
checkout_dt = checkin_dt + timedelta(days=checkin_data.nights)
new_order = Order(
order_id=f"ORD{datetime.now().strftime('%Y%m%d%H%M%S%f')[-6:]}",
customer_id=customer_id,
checkin_date=checkin_dt,
checkout_date=checkout_dt,
status="已入住",
create_time=datetime.now(),
order_channel="前台入住"
)
db.add(new_order)
db.flush()
# 3. 更新房间状态
room = db.query(Room).filter(Room.room_id == checkin_data.room_id).first()
if room:
room.status = "已占用"
db.commit()
return {
"success": True,
"message": f"宾客 {checkin_data.name} 成功入住房间 {checkin_data.room_id}",
"order_id": new_order.order_id,
"customer_id": customer_id
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"入住处理失败: {str(e)}")
finally:
db.close()
@app.post("/api/checkout")
def process_checkout(checkout_data: CheckoutData):
"""处理退房结账
流程:
1. 更新订单状态为已退房
2. 更新房间状态为清洁中
3. 生成账单记录
"""
db = SessionLocal()
try:
from datetime import datetime
# 1. 更新订单
order = db.query(Order).filter(Order.order_id == checkout_data.order_id).first()
if not order:
raise HTTPException(status_code=404, detail="订单不存在")
order.status = "已退房"
order.checkout_date = datetime.now()
# 2. 更新房间状态
room = db.query(Room).filter(Room.room_id == checkout_data.room_id).first()
if room:
room.status = "清洁中"
# 3. 记录账单(可选:如果有独立的账单表)
total_amount = checkout_data.room_fee + checkout_data.additional_fee
db.commit()
return {
"success": True,
"message": f"房间 {checkout_data.room_id} 已完成退房结账",
"total_amount": total_amount,
"payment_method": checkout_data.payment_method,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"退房处理失败: {str(e)}")
finally:
db.close()
@app.post("/api/services")
def create_service_request(service_data: ServiceRequestData):
"""创建服务请求
用于宾客请求客房服务、维修、清洁等
"""
db = SessionLocal()
try:
from datetime import datetime
# 这里可以创建一个新的服务请求表(如果有的话)
# 现在只是返回确认信息
return {
"success": True,
"message": f"服务请求已提交: {service_data.service_type}",
"service_id": f"SRV{datetime.now().strftime('%Y%m%d%H%M%S%f')[-6:]}",
"room_id": service_data.room_id,
"priority": service_data.priority,
"status": "待处理",
"created_time": datetime.now().isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"服务请求失败: {str(e)}")
finally:
db.close()
@app.get("/api/reservations")
def get_reservations():
"""获取所有预订(使用订单表中状态为'预约中'的数据)"""
db = SessionLocal()
try:
results = db.query(Order).filter(Order.status == "预约中").all()
payload = []
for o in results:
customer = db.query(Customer).filter(Customer.customer_id == o.customer_id).first()
payload.append({
"order_id": o.order_id,
"guest_name": customer.name if customer else "未知",
"phone": customer.phone if customer else "未知",
"checkin_date": o.checkin_date.strftime("%Y-%m-%d") if o.checkin_date else "N/A",
"checkout_date": o.checkout_date.strftime("%Y-%m-%d") if o.checkout_date else "N/A",
"status": o.status,
"channel": o.order_channel
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"预订查询失败: {str(e)}")
finally:
db.close()
@app.post("/api/reservations")
def create_reservation(res_data: ReservationData):
"""创建新预订"""
db = SessionLocal()
try:
from datetime import datetime
# 检查或创建客户
existing_customer = db.query(Customer).filter(
Customer.phone == res_data.phone
).first()
if existing_customer:
customer_id = existing_customer.customer_id
else:
new_customer = Customer(
name=res_data.guest_name,
phone=res_data.phone,
id_card="",
register_time=datetime.now()
)
db.add(new_customer)
db.flush()
customer_id = new_customer.customer_id
# 创建预订订单
new_order = Order(
order_id=f"RES{datetime.now().strftime('%Y%m%d%H%M%S%f')[-6:]}",
customer_id=customer_id,
checkin_date=datetime.fromisoformat(res_data.checkin_date),
checkout_date=datetime.fromisoformat(res_data.checkout_date),
status="预约中",
create_time=datetime.now(),
order_channel=res_data.channel
)
db.add(new_order)
db.commit()
return {
"success": True,
"message": f"预订成功: {res_data.guest_name} - {res_data.room_type}",
"reservation_id": new_order.order_id,
"customer_id": customer_id
}
except Exception as e:
db.rollback()
raise HTTPException(status_code=500, detail=f"创建预订失败: {str(e)}")
finally:
db.close()
@app.get("/api/bills")
def get_bills():
"""获取所有已完成的账单(已退房订单)"""
db = SessionLocal()
try:
results = db.query(Order).filter(Order.status == "已退房").all()
payload = []
for o in results:
customer = db.query(Customer).filter(Customer.customer_id == o.customer_id).first()
payload.append({
"order_id": o.order_id,
"guest_name": customer.name if customer else "未知",
"checkin_date": o.checkin_date.strftime("%Y-%m-%d") if o.checkin_date else "N/A",
"checkout_date": o.checkout_date.strftime("%Y-%m-%d") if o.checkout_date else "N/A",
"status": "已结账",
"create_time": o.create_time.strftime("%Y-%m-%d %H:%M") if o.create_time else "N/A"
})
return payload
except Exception as e:
raise HTTPException(status_code=500, detail=f"账单查询失败: {str(e)}")
finally:
db.close()
@app.post("/api/bills")
def create_bill(bill_data: BillData):
"""创建账单"""
db = SessionLocal()
try:
total_amount = bill_data.room_fee + bill_data.service_fee + bill_data.other_fee
return {
"success": True,
"message": "账单已生成",
"order_id": bill_data.order_id,
"total_amount": total_amount,
"status": bill_data.status
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"账单生成失败: {str(e)}")
finally:
db.close()
@app.post("/api/bills/{bill_id}")
def confirm_bill_payment(bill_id: str):
"""确认账单支付"""
db = SessionLocal()
try:
# 这里可以根据bill_id查询实际的账单信息
# 现在只是返回成功确认
return {
"success": True,
"message": f"账单 {bill_id} 支付已确认",
"bill_id": bill_id,
"status": "已支付",
"timestamp": __import__('datetime').datetime.now().isoformat()
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"确认支付失败: {str(e)}")
finally:
db.close()
# ===== 健康检查 =====
@app.get("/health")
def health_check():
"""系统健康检查"""
return {"status": "healthy", "message": "宾馆管理系统运行中"}
if __name__ == "__main__":
uvicorn.run(app, host="127.0.0.1", port=8000)