|
|
import mysql.connector
|
|
|
from mysql.connector import Error
|
|
|
from tkinter import messagebox
|
|
|
from Car import Car
|
|
|
|
|
|
class ParkManage:
|
|
|
def __init__(self, host="localhost", user="root", password="root", database="sys", user_gui=None):
|
|
|
"""初始化数据库连接"""
|
|
|
self.conn = mysql.connector.connect(
|
|
|
host="localhost",
|
|
|
user="root",
|
|
|
password="root",
|
|
|
database="sys"
|
|
|
)
|
|
|
self.cursor = self.conn.cursor()
|
|
|
self.create_tables()
|
|
|
self.user_gui = user_gui # 添加一个可选参数用于Tkinter根窗口
|
|
|
|
|
|
def create_tables(self):
|
|
|
"""创建必要的数据库表"""
|
|
|
try:
|
|
|
self.cursor.execute("""
|
|
|
CREATE TABLE IF NOT EXISTS parked_cars (
|
|
|
id INT AUTO_INCREMENT PRIMARY KEY,
|
|
|
license_plate VARCHAR(255) NOT NULL UNIQUE,
|
|
|
parked_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
|
picked_up_time TIMESTAMP NULL
|
|
|
);
|
|
|
""")
|
|
|
self.conn.commit()
|
|
|
except Error as e:
|
|
|
messagebox.showerror("数据库错误", f"创建表失败: {e}")
|
|
|
|
|
|
def park(self, license_plate):
|
|
|
"""处理停车逻辑并存储到数据库"""
|
|
|
try:
|
|
|
query = "INSERT INTO parked_cars (license_plate) VALUES (%s)"
|
|
|
values = (license_plate,)
|
|
|
self.cursor.execute(query, values)
|
|
|
self.conn.commit()
|
|
|
return True
|
|
|
except mysql.connector.IntegrityError: # 车牌号已存在
|
|
|
return False
|
|
|
except Error as e:
|
|
|
messagebox.showerror("数据库错误", f"停车操作时发生错误: {e}")
|
|
|
return False
|
|
|
|
|
|
def pickup(self, license_plate):
|
|
|
"""处理取车逻辑,更新数据库记录"""
|
|
|
try:
|
|
|
# 更新取车时间
|
|
|
query = "UPDATE parked_cars SET picked_up_time = CURRENT_TIMESTAMP WHERE license_plate = %s AND picked_up_time IS NULL"
|
|
|
values = (license_plate,)
|
|
|
self.cursor.execute(query, values)
|
|
|
if self.cursor.rowcount > 0:
|
|
|
self.conn.commit()
|
|
|
return True
|
|
|
else:
|
|
|
messagebox.showinfo("取车失败", f"车牌号{license_plate}的车辆未找到或已取走。")
|
|
|
return False
|
|
|
except Error as e:
|
|
|
messagebox.showerror("数据库错误", f"取车操作时发生错误: {e}")
|
|
|
return False
|
|
|
|
|
|
# 其他方法如查询、编辑、统计等可以根据需要类似地实现
|
|
|
def query_car_info(self, license_plate):
|
|
|
"""根据车牌号查询车辆停放信息"""
|
|
|
try:
|
|
|
query = """
|
|
|
SELECT
|
|
|
license_plate,
|
|
|
parked_time,
|
|
|
picked_up_time
|
|
|
FROM
|
|
|
parked_cars
|
|
|
WHERE
|
|
|
license_plate = %s
|
|
|
"""
|
|
|
values = (license_plate,)
|
|
|
self.cursor.execute(query, values)
|
|
|
car_info = self.cursor.fetchone()
|
|
|
|
|
|
if car_info:
|
|
|
# 格式化输出信息
|
|
|
parked_time_str = car_info[1].strftime('%Y-%m-%d %H:%M:%S')
|
|
|
picked_up_time_str = "尚未取车" if car_info[2] is None else car_info[2].strftime('%Y-%m-%d %H:%M:%S')
|
|
|
message = f"车牌号: {car_info[0]}\n停放时间: {parked_time_str}\n取车时间: {picked_up_time_str}"
|
|
|
messagebox.showinfo("车辆信息", message)
|
|
|
else:
|
|
|
messagebox.showinfo("查询结果", "未找到该车牌号的车辆信息。")
|
|
|
except Error as e:
|
|
|
messagebox.showerror("查询错误", f"查询车辆信息时发生错误: {e}")
|
|
|
|
|
|
def display_all_cars_info(self):
|
|
|
"""显示所有车辆的停放信息"""
|
|
|
try:
|
|
|
query = """
|
|
|
SELECT
|
|
|
license_plate,
|
|
|
parked_time,
|
|
|
picked_up_time
|
|
|
FROM
|
|
|
parked_cars
|
|
|
"""
|
|
|
self.cursor.execute(query)
|
|
|
car_info = self.cursor.fetchall()
|
|
|
|
|
|
if car_info:
|
|
|
# 遍历所有车辆信息并格式化输出
|
|
|
all_cars_message = "\n\n".join([
|
|
|
f"车牌号: {car[0]}\n"
|
|
|
f"停放时间: {car[1].strftime('%Y-%m-%d %H:%M:%S')}\n"
|
|
|
f"取车时间: {'未取车' if car[2] is None else car[2].strftime('%Y-%m-%d %H:%M:%S')}"
|
|
|
for car in car_info
|
|
|
])
|
|
|
messagebox.showinfo("所有车辆信息", all_cars_message)
|
|
|
else:
|
|
|
messagebox.showinfo("信息提示", "当前停车场内没有车辆信息。")
|
|
|
except Error as e:
|
|
|
messagebox.showerror("显示错误", f"显示所有车辆信息时发生错误: {e}")
|
|
|
|
|
|
|
|
|
def count_parked_cars(self):
|
|
|
"""统计当前停在停车场的车辆总数"""
|
|
|
query = "SELECT COUNT(*) FROM parked_cars"
|
|
|
self.cursor.execute(query)
|
|
|
parked_count = self.cursor.fetchone()[0]
|
|
|
return parked_count
|
|
|
|
|
|
def count_empty_slots(self, total_slots):
|
|
|
"""计算空闲车位数"""
|
|
|
parked_count = self.count_parked_cars()
|
|
|
empty_slots = total_slots - parked_count
|
|
|
return empty_slots
|
|
|
|
|
|
def most_frequent_parking_time(self):
|
|
|
"""
|
|
|
分析并返回最频繁的停车时段。
|
|
|
这里简化处理,实际应用中可能需要更复杂的SQL查询来统计时段。
|
|
|
"""
|
|
|
query = """
|
|
|
SELECT
|
|
|
HOUR(parked_time) AS hour,
|
|
|
COUNT(*) AS frequency
|
|
|
FROM parked_cars
|
|
|
GROUP BY hour
|
|
|
ORDER BY frequency DESC
|
|
|
LIMIT 1
|
|
|
"""
|
|
|
self.cursor.execute(query)
|
|
|
most_frequent_hour, _ = self.cursor.fetchone()
|
|
|
return f"最频繁的停车时段是第{most_frequent_hour}小时"
|
|
|
|
|
|
|
|
|
def close_connection(self):
|
|
|
"""关闭数据库连接"""
|
|
|
if self.conn.is_connected():
|
|
|
self.cursor.close()
|
|
|
self.conn.close() |