from datetime import datetime import pytz from sqlalchemy import func from app.models import db from app.models.train_station_lib import TrainStation #Train实体 class Train(db.Model): __tablename__: str = 'train' id = db.Column(db.Integer, primary_key=True) train_no = db.Column(db.String(120), unique=True, nullable=False, index=True) departure_station = db.Column(db.String(120)) arrival_station = db.Column(db.String(120)) departure_time = db.Column(db.DateTime) expiration_time = db.Column(db.DateTime) effective_time = db.Column(db.DateTime) arrival_time = db.Column(db.DateTime) created_at = db.Column(db.DateTime, default=func.now()) updated_at = db.Column(db.DateTime, default=func.now()) def __repr__(self): return f'' @classmethod def create(cls, new_train): db.session.add(new_train) db.session.commit() return new_train """ 根据出发站、到达站和日期查询符合条件的列车信息。 该方法首先根据出发站和到达站名称分别查询经过这两个站点的列车, 然后找出同时经过这两个站点的列车编号(train_no), 再通过判断站点顺序(出发站在到达站之前),筛选出有效的列车, 最后根据有效时间和列车编号查询最终结果。 参数: - from_station: 出发站名称(字符串) - to_station: 到达站名称(字符串) - date: 查询日期(datetime 对象) 返回: - 符合条件的列车列表 """ # # 根据列车编号和有效时间筛选符合条件的列车 # trains = Train.query.filter( # #Train.effective_time >= date, # 只查询在指定日期之后有效的列车 # Train.train_no.in_(valid_train_nos) # 使用有效列车编号进行过滤 # ).all() @classmethod # 业务层(服务层) def queryTrains(cls, from_station, to_station, date): # 查询所有经过出发站的列车信息 from_train = TrainStation.query.filter_by(station_name=from_station).all() # 查询所有经过到达站的列车信息 to_train = TrainStation.query.filter_by(station_name=to_station).all() # 提取出发站对应的所有列车编号(train_no) from_train_nos = {ts.train_no for ts in from_train} # 提取到达站对应的所有列车编号(train_no) to_train_nos = {ts.train_no for ts in to_train} # 找出两个站点共有的列车编号(即:哪些列车同时经过出发站和到达站) common_train_nos = from_train_nos & to_train_nos # 过滤出“出发站出现在到达站之前”的列车编号 valid_train_nos = [ train_no for train_no in common_train_nos if next(ts.index for ts in from_train if ts.train_no == train_no) < next(ts.index for ts in to_train if ts.train_no == train_no) ] # 获取当前系统时间 now = datetime.now(pytz.timezone('Asia/Shanghai')) #判断date是否大于当前系统时间 if date > now: # 根据列车编号和有效时间筛选符合条件的列车 trains = Train.query.filter( Train.effective_time < date, Train.expiration_time > date, Train.train_no.in_(valid_train_nos) # 使用有效列车编号进行过滤 ).all() else: # 根据列车编号和有效时间筛选符合条件的列车 trains = db.session.query(Train, TrainStation).join( TrainStation, Train.train_no == TrainStation.train_no and TrainStation.station_name == from_station ).filter( Train.effective_time < date, Train.expiration_time > date, # 不大于当前的系统时间还需判断当前的时间是否大于列车的出发时间 func.time(TrainStation.departure_time) > func.time(date), Train.train_no.in_(valid_train_nos) # 使用有效列车编号进行过滤 ).all() # 返回查询到的列车列表 return trains def buildTrain(params): # Create a new Train object train = Train( train_no=params['trainNo'], effective_time=params['effective_time'], expiration_time=params['expiration_time'] ) # Extract indexes for determining first and last station indexes = [e["index"] for e in params['stations']] for e in params['stations']: # Create and associate TrainStation objects train_station = TrainStation( station_name=e["name"], price=e["price"], departure_time=e["depTime"], arrival_time=e["arrTime"], index=e["index"] ) train.train_stations.append(train_station) # Determine the departure time and station for the first station if e["index"] == 0: train.departure_time = e["depTime"] train.departure_station = e["name"] # Determine the arrival time and station for the last station if e["index"] == max(indexes): train.arrival_time = e["arrTime"] train.arrival_station = e["name"] return train