import mysql.connector from mysql.connector import Error # 数据库配置信息 config = { 'user': 'root', 'password': '123456', 'host': 'localhost', 'database': 'drivingschool', 'raise_on_warnings': True } # 连接数据库 def connect_to_db(config): try: connection = mysql.connector.connect(**config) return connection except Error as e: print(f"The error '{e}' occurred") # 断开数据库连接 def close_db_connection(connection): if connection.is_connected(): connection.close() # 添加新学生 def add_new_student(connection, student_id, name, password, phone, email): cursor = connection.cursor() query = """ CALL AddNewStudent(%s, %s, %s, %s, %s) """ cursor.execute(query, (student_id, name, password, phone, email)) connection.commit() cursor.close() # 更新预约状态 def update_appointment_status(connection, appointment_id, new_status): cursor = connection.cursor() query = """ CALL UpdateAppointmentStatus(%s, %s) """ cursor.execute(query, (appointment_id, new_status)) connection.commit() cursor.close() # 查询学生详细信息 def get_student_details(connection, student_id): cursor = connection.cursor() query = """ SELECT * FROM student_details WHERE student_id = %s """ cursor.execute(query, (student_id,)) result = cursor.fetchone() cursor.close() return result # 查询教练详细信息 def get_coach_details(connection, coach_id): cursor = connection.cursor() query = """ SELECT * FROM coach_details WHERE coach_id = %s """ cursor.execute(query, (coach_id,)) result = cursor.fetchone() cursor.close() return result # 查询预约详细信息 def get_appointment_details(connection, appointment_id): cursor = connection.cursor() query = """ SELECT * FROM appointment_details WHERE appointment_id = %s """ cursor.execute(query, (appointment_id,)) result = cursor.fetchone() cursor.close() return result # 查询训练记录详细信息 def get_training_record_details(connection, record_id): cursor = connection.cursor() query = """ SELECT * FROM training_record_view WHERE record_id = %s """ cursor.execute(query, (record_id,)) result = cursor.fetchone() cursor.close() return result # 查询发票详细信息 def get_invoice_details(connection, invoice_id): cursor = connection.cursor() query = """ SELECT * FROM invoice_details WHERE invoice_id = %s """ cursor.execute(query, (invoice_id,)) result = cursor.fetchone() cursor.close() return result # 查询所有学生详细信息 def get_all_students(connection): cursor = connection.cursor() query = """ SELECT * FROM student_details """ cursor.execute(query) results = cursor.fetchall() cursor.close() return results # 查询所有教练详细信息 def get_all_coaches(connection): cursor = connection.cursor() query = """ SELECT * FROM coach_details """ cursor.execute(query) results = cursor.fetchall() cursor.close() return results # 查询所有预约详细信息 def get_all_appointments(connection): cursor = connection.cursor() query = """ SELECT * FROM appointment_details """ cursor.execute(query) results = cursor.fetchall() cursor.close() return results # 查询所有训练记录详细信息 def get_all_training_records(connection): cursor = connection.cursor() query = """ SELECT * FROM training_record_view """ cursor.execute(query) results = cursor.fetchall() cursor.close() return results # 查询所有发票详细信息 def get_all_invoices(connection): cursor = connection.cursor() query = """ SELECT * FROM invoice_details """ cursor.execute(query) results = cursor.fetchall() cursor.close() return results # 查询所有反馈 def get_all_feedback(connection): cursor = connection.cursor() query = """ SELECT * FROM feedback """ cursor.execute(query) results = cursor.fetchall() cursor.close() return results # 查询所有日志 def get_all_logs(connection): cursor = connection.cursor() query = """ SELECT * FROM log """ cursor.execute(query) results = cursor.fetchall() cursor.close() return results def delete_student(connection, student_id): """删除学生""" cursor = connection.cursor() query = """ DELETE FROM students WHERE student_id = %s """ try: cursor.execute(query, (student_id,)) connection.commit() print("学生删除成功") except Error as e: print(f"删除学生时出错: {e}") connection.rollback() finally: cursor.close() # 根据用户输入执行不同的操作 def update_student(connection, student_id, name=None, password=None, phone=None, email=None): """更新学生信息""" cursor = connection.cursor() updates = [] params = [student_id] if name is not None: updates.append("name=%s") params.insert(0, name) if password is not None: updates.append("password=%s") params.insert(1, password) if phone is not None: updates.append("phone=%s") params.insert(2, phone) if email is not None: updates.append("email=%s") params.insert(3, email) if updates: query = f"UPDATE students SET {', '.join(updates)} WHERE student_id=%s" try: cursor.execute(query, tuple(params)) connection.commit() print("学生信息更新成功") except Error as e: print(f"更新学生信息时出错: {e}") connection.rollback() finally: cursor.close() else: print("没有提供要更新的信息") def main(): connection = connect_to_db(config) if connection is not None: while True: print("\n1. 添加新学生") print("2. 更新预约状态") print("3. 查询学生详细信息") print("4. 查询教练详细信息") print("5. 查询预约详细信息") print("6. 查询训练记录详细信息") print("7. 查询发票详细信息") print("8. 查询所有学生详细信息") print("9. 查询所有教练详细信息") print("10. 查询所有预约详细信息") print("11. 查询所有训练记录详细信息") print("12. 查询所有发票详细信息") print("13. 查询所有反馈") print("14. 查询所有日志") print("15. 删除学生") print("16. 更新学生信息") print("17. 退出") choice = input("请选择一个操作:") if choice == '1': student_id = input("输入学生ID:") name = input("输入学生姓名:") password = input("输入学生密码:") phone = input("输入学生电话:") email = input("输入学生邮箱:") add_new_student(connection, student_id, name, password, phone, email) print("新学生添加成功。") elif choice == '2': appointment_id = int(input("输入预约ID:")) new_status = input("输入新的预约状态:") update_appointment_status(connection, appointment_id, new_status) print("预约状态更新成功。") elif choice == '3': student_id = input("输入学生ID:") details = get_student_details(connection, student_id) print("学生详细信息:", details) elif choice == '4': coach_id = input("输入教练ID:") details = get_coach_details(connection, coach_id) print("教练详细信息:", details) elif choice == '5': appointment_id = int(input("输入预约ID:")) details = get_appointment_details(connection, appointment_id) print("预约详细信息:", details) elif choice == '6': record_id = int(input("输入训练记录ID:")) details = get_training_record_details(connection, record_id) print("训练记录详细信息:", details) elif choice == '7': invoice_id = int(input("输入发票ID:")) details = get_invoice_details(connection, invoice_id) print("发票详细信息:", details) elif choice == '8': all_students = get_all_students(connection) print("所有学生详细信息:") for student in all_students: print(student) elif choice == '9': all_coaches = get_all_coaches(connection) print("所有教练详细信息:") for coach in all_coaches: print(coach) elif choice == '10': all_appointments = get_all_appointments(connection) print("所有预约详细信息:") for appointment in all_appointments: print(appointment) elif choice == '11': all_training_records = get_all_training_records(connection) print("所有训练记录详细信息:") for record in all_training_records: print(record) elif choice == '12': all_invoices = get_all_invoices(connection) print("所有发票详细信息:") for invoice in all_invoices: print(invoice) elif choice == '13': all_feedback = get_all_feedback(connection) print("所有反馈:") for feedback in all_feedback: print(feedback) elif choice == '14': all_logs = get_all_logs(connection) print("所有日志:") for log in all_logs: print(log) elif choice == '15': student_id = input("输入要删除的学生ID:") delete_student(connection, student_id) elif choice == '16': student_id = input("输入要更新的学生ID:") name = input("输入新的学生姓名 (留空则不更改): ") or None password = input("输入新的学生密码 (留空则不更改): ") or None phone = input("输入新的学生电话 (留空则不更改): ") or None email = input("输入新的学生邮箱 (留空则不更改): ") or None update_student(connection, student_id, name, password, phone, email) elif choice == '17': break else: print("无效的选择,请重新输入。") close_db_connection(connection) close_db_connection(connection) if __name__ == "__main__": main()