You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

159 lines
6.1 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import mysql.connector
from mysql.connector import Error
from tkinter import messagebox
from Car import Car
class ParkManage:
def __init__(self, host="localhost", user="root", password="root", database="sys", user_gui=None):
"""初始化数据库连接"""
self.conn = mysql.connector.connect(
host="localhost",
user="root",
password="root",
database="sys"
)
self.cursor = self.conn.cursor()
self.create_tables()
self.user_gui = user_gui # 添加一个可选参数用于Tkinter根窗口
def create_tables(self):
"""创建必要的数据库表"""
try:
self.cursor.execute("""
CREATE TABLE IF NOT EXISTS parked_cars (
id INT AUTO_INCREMENT PRIMARY KEY,
license_plate VARCHAR(255) NOT NULL UNIQUE,
parked_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
picked_up_time TIMESTAMP NULL
);
""")
self.conn.commit()
except Error as e:
messagebox.showerror("数据库错误", f"创建表失败: {e}")
def park(self, license_plate):
"""处理停车逻辑并存储到数据库"""
try:
query = "INSERT INTO parked_cars (license_plate) VALUES (%s)"
values = (license_plate,)
self.cursor.execute(query, values)
self.conn.commit()
return True
except mysql.connector.IntegrityError: # 车牌号已存在
return False
except Error as e:
messagebox.showerror("数据库错误", f"停车操作时发生错误: {e}")
return False
def pickup(self, license_plate):
"""处理取车逻辑,更新数据库记录"""
try:
# 更新取车时间
query = "UPDATE parked_cars SET picked_up_time = CURRENT_TIMESTAMP WHERE license_plate = %s AND picked_up_time IS NULL"
values = (license_plate,)
self.cursor.execute(query, values)
if self.cursor.rowcount > 0:
self.conn.commit()
return True
else:
messagebox.showinfo("取车失败", f"车牌号{license_plate}的车辆未找到或已取走。")
return False
except Error as e:
messagebox.showerror("数据库错误", f"取车操作时发生错误: {e}")
return False
# 其他方法如查询、编辑、统计等可以根据需要类似地实现
def query_car_info(self, license_plate):
"""根据车牌号查询车辆停放信息"""
try:
query = """
SELECT
license_plate,
parked_time,
picked_up_time
FROM
parked_cars
WHERE
license_plate = %s
"""
values = (license_plate,)
self.cursor.execute(query, values)
car_info = self.cursor.fetchone()
if car_info:
# 格式化输出信息
parked_time_str = car_info[1].strftime('%Y-%m-%d %H:%M:%S')
picked_up_time_str = "尚未取车" if car_info[2] is None else car_info[2].strftime('%Y-%m-%d %H:%M:%S')
message = f"车牌号: {car_info[0]}\n停放时间: {parked_time_str}\n取车时间: {picked_up_time_str}"
messagebox.showinfo("车辆信息", message)
else:
messagebox.showinfo("查询结果", "未找到该车牌号的车辆信息。")
except Error as e:
messagebox.showerror("查询错误", f"查询车辆信息时发生错误: {e}")
def display_all_cars_info(self):
"""显示所有车辆的停放信息"""
try:
query = """
SELECT
license_plate,
parked_time,
picked_up_time
FROM
parked_cars
"""
self.cursor.execute(query)
car_info = self.cursor.fetchall()
if car_info:
# 遍历所有车辆信息并格式化输出
all_cars_message = "\n\n".join([
f"车牌号: {car[0]}\n"
f"停放时间: {car[1].strftime('%Y-%m-%d %H:%M:%S')}\n"
f"取车时间: {'未取车' if car[2] is None else car[2].strftime('%Y-%m-%d %H:%M:%S')}"
for car in car_info
])
messagebox.showinfo("所有车辆信息", all_cars_message)
else:
messagebox.showinfo("信息提示", "当前停车场内没有车辆信息。")
except Error as e:
messagebox.showerror("显示错误", f"显示所有车辆信息时发生错误: {e}")
def count_parked_cars(self):
"""统计当前停在停车场的车辆总数"""
query = "SELECT COUNT(*) FROM parked_cars"
self.cursor.execute(query)
parked_count = self.cursor.fetchone()[0]
return parked_count
def count_empty_slots(self, total_slots):
"""计算空闲车位数"""
parked_count = self.count_parked_cars()
empty_slots = total_slots - parked_count
return empty_slots
def most_frequent_parking_time(self):
"""
分析并返回最频繁的停车时段。
这里简化处理实际应用中可能需要更复杂的SQL查询来统计时段。
"""
query = """
SELECT
HOUR(parked_time) AS hour,
COUNT(*) AS frequency
FROM parked_cars
GROUP BY hour
ORDER BY frequency DESC
LIMIT 1
"""
self.cursor.execute(query)
most_frequent_hour, _ = self.cursor.fetchone()
return f"最频繁的停车时段是第{most_frequent_hour}小时"
def close_connection(self):
"""关闭数据库连接"""
if self.conn.is_connected():
self.cursor.close()
self.conn.close()