import mysql.connector from mysql.connector import Error from tkinter import messagebox from Car import Car class ParkManage: def __init__(self, host="localhost", user="root", password="root", database="sys", user_gui=None): """初始化数据库连接""" self.conn = mysql.connector.connect( host="localhost", user="root", password="root", database="sys" ) self.cursor = self.conn.cursor() self.create_tables() self.user_gui = user_gui # 添加一个可选参数用于Tkinter根窗口 def create_tables(self): """创建必要的数据库表""" try: self.cursor.execute(""" CREATE TABLE IF NOT EXISTS parked_cars ( id INT AUTO_INCREMENT PRIMARY KEY, license_plate VARCHAR(255) NOT NULL UNIQUE, parked_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, picked_up_time TIMESTAMP NULL ); """) self.conn.commit() except Error as e: messagebox.showerror("数据库错误", f"创建表失败: {e}") def park(self, license_plate): """处理停车逻辑并存储到数据库""" try: query = "INSERT INTO parked_cars (license_plate) VALUES (%s)" values = (license_plate,) self.cursor.execute(query, values) self.conn.commit() return True except mysql.connector.IntegrityError: # 车牌号已存在 return False except Error as e: messagebox.showerror("数据库错误", f"停车操作时发生错误: {e}") return False def pickup(self, license_plate): """处理取车逻辑,更新数据库记录""" try: # 更新取车时间 query = "UPDATE parked_cars SET picked_up_time = CURRENT_TIMESTAMP WHERE license_plate = %s AND picked_up_time IS NULL" values = (license_plate,) self.cursor.execute(query, values) if self.cursor.rowcount > 0: self.conn.commit() return True else: messagebox.showinfo("取车失败", f"车牌号{license_plate}的车辆未找到或已取走。") return False except Error as e: messagebox.showerror("数据库错误", f"取车操作时发生错误: {e}") return False # 其他方法如查询、编辑、统计等可以根据需要类似地实现 def query_car_info(self, license_plate): """根据车牌号查询车辆停放信息""" try: query = """ SELECT license_plate, parked_time, picked_up_time FROM parked_cars WHERE license_plate = %s """ values = (license_plate,) self.cursor.execute(query, values) car_info = self.cursor.fetchone() if car_info: # 格式化输出信息 parked_time_str = car_info[1].strftime('%Y-%m-%d %H:%M:%S') picked_up_time_str = "尚未取车" if car_info[2] is None else car_info[2].strftime('%Y-%m-%d %H:%M:%S') message = f"车牌号: {car_info[0]}\n停放时间: {parked_time_str}\n取车时间: {picked_up_time_str}" messagebox.showinfo("车辆信息", message) else: messagebox.showinfo("查询结果", "未找到该车牌号的车辆信息。") except Error as e: messagebox.showerror("查询错误", f"查询车辆信息时发生错误: {e}") def display_all_cars_info(self): """显示所有车辆的停放信息""" try: query = """ SELECT license_plate, parked_time, picked_up_time FROM parked_cars """ self.cursor.execute(query) car_info = self.cursor.fetchall() if car_info: # 遍历所有车辆信息并格式化输出 all_cars_message = "\n\n".join([ f"车牌号: {car[0]}\n" f"停放时间: {car[1].strftime('%Y-%m-%d %H:%M:%S')}\n" f"取车时间: {'未取车' if car[2] is None else car[2].strftime('%Y-%m-%d %H:%M:%S')}" for car in car_info ]) messagebox.showinfo("所有车辆信息", all_cars_message) else: messagebox.showinfo("信息提示", "当前停车场内没有车辆信息。") except Error as e: messagebox.showerror("显示错误", f"显示所有车辆信息时发生错误: {e}") def count_parked_cars(self): """统计当前停在停车场的车辆总数""" query = "SELECT COUNT(*) FROM parked_cars" self.cursor.execute(query) parked_count = self.cursor.fetchone()[0] return parked_count def count_empty_slots(self, total_slots): """计算空闲车位数""" parked_count = self.count_parked_cars() empty_slots = total_slots - parked_count return empty_slots def most_frequent_parking_time(self): """ 分析并返回最频繁的停车时段。 这里简化处理,实际应用中可能需要更复杂的SQL查询来统计时段。 """ query = """ SELECT HOUR(parked_time) AS hour, COUNT(*) AS frequency FROM parked_cars GROUP BY hour ORDER BY frequency DESC LIMIT 1 """ self.cursor.execute(query) most_frequent_hour, _ = self.cursor.fetchone() return f"最频繁的停车时段是第{most_frequent_hour}小时" def close_connection(self): """关闭数据库连接""" if self.conn.is_connected(): self.cursor.close() self.conn.close()