import mysql.connector from mysql.connector import Error from tkinter import messagebox from Car import Car class ParkManage: def __init__(self, host="localhost", user="root", password="root", database="sys", user_gui=None): """初始化数据库连接""" self.conn = mysql.connector.connect( host="localhost", user="root", password="root", database="sys" ) self.cursor = self.conn.cursor() self.create_tables() self.user_gui = user_gui # 添加一个可选参数用于Tkinter根窗口 def create_tables(self): """创建必要的数据库表""" try: self.cursor.execute(""" CREATE TABLE IF NOT EXISTS parked_cars ( id INT AUTO_INCREMENT PRIMARY KEY, license_plate VARCHAR(255) NOT NULL UNIQUE, parked_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP, picked_up_time TIMESTAMP NULL ); """) self.conn.commit() except Error as e: messagebox.showerror("数据库错误", f"创建表失败: {e}") def park(self, license_plate): """处理停车逻辑并存储到数据库""" try: query = "INSERT INTO parked_cars (license_plate) VALUES (%s)" values = (license_plate,) self.cursor.execute(query, values) self.conn.commit() return True except mysql.connector.IntegrityError: # 车牌号已存在 return False except Error as e: messagebox.showerror("数据库错误", f"停车操作时发生错误: {e}") return False def pickup(self, license_plate): """处理取车逻辑,更新数据库记录""" try: # 更新取车时间 query = "UPDATE parked_cars SET picked_up_time = CURRENT_TIMESTAMP WHERE license_plate = %s AND picked_up_time IS NULL" values = (license_plate,) self.cursor.execute(query, values) if self.cursor.rowcount > 0: self.conn.commit() return True else: messagebox.showinfo("取车失败", f"车牌号{license_plate}的车辆未找到或已取走。") return False except Error as e: messagebox.showerror("数据库错误", f"取车操作时发生错误: {e}") return False # 其他方法如查询、编辑、统计等可以根据需要类似地实现 def query_car_info(self, license_plate): """根据车牌号查询车辆停放信息""" try: query = """ SELECT license_plate, parked_time, picked_up_time FROM parked_cars WHERE license_plate = %s """ values = (license_plate,) self.cursor.execute(query, values) car_info = self.cursor.fetchone() if car_info: # 格式化输出信息 parked_time_str = car_info[1].strftime('%Y-%m-%d %H:%M:%S') picked_up_time_str = "尚未取车" if car_info[2] is None else car_info[2].strftime('%Y-%m-%d %H:%M:%S') message = f"车牌号: {car_info[0]}\n停放时间: {parked_time_str}\n取车时间: {picked_up_time_str}" messagebox.showinfo("车辆信息", message) else: messagebox.showinfo("查询结果", "未找到该车牌号的车辆信息。") except Error as e: messagebox.showerror("查询错误", f"查询车辆信息时发生错误: {e}") def display_all_cars_info(self): """显示所有车辆的停放信息""" try: query = """ SELECT license_plate, parked_time, picked_up_time FROM parked_cars """ self.cursor.execute(query) all_cars_info = self.cursor.fetchall() if all_cars_info: # 遍历所有车辆信息并格式化输出 all_cars_message = "\n\n".join([ f"车牌号: {car[0]}\n" f"停放时间: {car[1].strftime('%Y-%m-%d %H:%M:%S')}\n" f"取车时间: {'未取车' if car[2] is None else car[2].strftime('%Y-%m-%d %H:%M:%S')}" for car in all_cars_info ]) messagebox.showinfo("所有车辆信息", all_cars_message) else: messagebox.showinfo("信息提示", "当前停车场内没有车辆信息。") except Error as e: messagebox.showerror("显示错误", f"显示所有车辆信息时发生错误: {e}") def edit_car_pickup_time(self, license_plate): """编辑车辆的取车时间""" try: # 查询指定车牌号的车辆是否存在 check_query = """ SELECT COUNT(*) FROM parked_cars WHERE license_plate = %s """ self.cursor.execute(check_query, (license_plate,)) car_exists = self.cursor.fetchone()[0] if car_exists: # 弹出对话框让用户输入新的取车时间 new_picked_up_time_str = simpledialog.askstring("编辑取车时间", "请输入新的取车时间(YYYY-MM-DD HH:MM:SS):", initialvalue="2023-04-01 14:00:00") if new_picked_up_time_str: try: # 尝试将输入的字符串转换为datetime对象 from datetime import datetime new_picked_up_time = datetime.strptime(new_picked_up_time_str, '%Y-%m-%d %H:%M:%S') except ValueError: messagebox.showerror("输入错误", "请输入正确的日期时间格式!") return # 更新数据库中的取车时间 update_query = """ UPDATE parked_cars SET picked_up_time = %s WHERE license_plate = %s """ self.cursor.execute(update_query, (new_picked_up_time, license_plate)) self.connection.commit() # 提交事务 messagebox.showinfo("更新成功", f"车牌号为{license_plate}的车辆取车时间已更新。") else: messagebox.showinfo("取消操作", "未输入取车时间,操作已取消。") else: messagebox.showinfo("查询结果", "未找到该车牌号的车辆信息。") except Error as e: messagebox.showerror("编辑错误", f"编辑车辆取车时间时发生错误: {e}") def count_parked_cars(self): """统计当前停在停车场的车辆总数""" query = "SELECT COUNT(*) FROM parked_cars" self.cursor.execute(query) parked_count = self.cursor.fetchone()[0] return parked_count def count_empty_slots(self, total_slots): """计算空闲车位数""" parked_count = self.count_parked_cars() empty_slots = total_slots - parked_count return empty_slots def most_frequent_parking_time(self): """ 分析并返回最频繁的停车时段。 这里简化处理,实际应用中可能需要更复杂的SQL查询来统计时段。 """ query = """ SELECT HOUR(parked_time) AS hour, COUNT(*) AS frequency FROM parked_cars GROUP BY hour ORDER BY frequency DESC LIMIT 1 """ self.cursor.execute(query) most_frequent_hour, _ = self.cursor.fetchone() return f"最频繁的停车时段是第{most_frequent_hour}小时" def close_connection(self): """关闭数据库连接""" if self.conn.is_connected(): self.cursor.close() self.conn.close()