import conn cur = conn.cur def login(username, password): """ 用户登录 :param username: 用户名 :param password: 密码 :return: """ try: sql = "select password from userlogin where username = '%s'" % username cur.execute(sql) res = cur.fetchone() if res is None: raise Exception("用户名错误,登录失败") else: my_password = res[0] if my_password == password: return True else: return False except Exception as e: raise e def query_all_employees(): """ 查询所有员工信息 :return: """ try: sql = "SELECT * FROM Employee" cur.execute(sql) res = cur.fetchall() for row in res: print(f"员工ID: {row[0]}\t姓名: {row[1]}\t性别: {row[2]}\t职位: {row[4]}\t部门ID: {row[5]}\t联系方式: {row[6]}\t入职日期: {row[7]}\t状态: {row[8]}\t出生日期: {row[3]}") print("-" * 50) except Exception as e: print(f"查询失败: {e}") def query_employee_by_id(employee_id): """ 根据员工ID查询员工信息 :param employee_id: 员工ID :return: """ try: sql = "SELECT * FROM Employee WHERE EmployeeID = '%s'" % employee_id cur.execute(sql) res = cur.fetchone() if res: print(f"员工ID: {row[0]}\t姓名: {row[1]}\t性别: {row[2]}\t职位: {row[4]}\t部门ID: {row[5]}\t联系方式: {row[6]}\t入职日期: {row[7]}\t状态: {row[8]}\t出生日期: {row[3]}") else: print("没有找到该员工信息!") print("-" * 50) except Exception as e: print(f"查询失败: {e}") def add_employee(): """ 添加新员工信息 :return: """ print("-" * 50) print("请输入新员工的信息:") employee_id = input("员工ID:") name = input("姓名:") gender = input("性别(男/女):") position = input("职位:") department_id = input("部门ID:") contact_info = input("联系方式:") hire_date = input("入职日期(YYYY-MM-DD):") employment_status = input("状态(在职/离职/休假):") date_of_birth = input("出生日期(YYYY-MM-DD):") # 新增出生日期 try: sql = """ INSERT INTO Employee (EmployeeID, Name, Gender, Position, DepartmentID, ContactInfo, HireDate, EmploymentStatus, DateOfBirth) VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s') """ % (employee_id, name, gender, position, department_id, contact_info, hire_date, employment_status, date_of_birth) cur.execute(sql) conn.conn.commit() print("添加新员工成功!") print("-" * 50) except Exception as e: print(f"添加失败: {e}") def update_employee(): """ 修改员工信息(如职位、状态、出生日期等) :return: """ print("-" * 50) employee_id = input("请输入要修改信息的员工ID:") print("请选择要修改的信息:") print("1. 修改职位") print("2. 修改状态") print("3. 修改出生日期") # 新增修改出生日期选项 choice = input("请输入选择的编号:") try: if choice == "1": new_position = input("请输入新的职位:") sql = "UPDATE Employee SET Position = '%s' WHERE EmployeeID = '%s'" % (new_position, employee_id) cur.execute(sql) conn.conn.commit() print(f"员工ID {employee_id} 的职位已更新为:{new_position}") elif choice == "2": new_status = input("请输入新的状态(在职/离职/休假):") sql = "UPDATE Employee SET EmploymentStatus = '%s' WHERE EmployeeID = '%s'" % (new_status, employee_id) cur.execute(sql) conn.conn.commit() print(f"员工ID {employee_id} 的状态已更新为:{new_status}") elif choice == "3": new_date_of_birth = input("请输入新的出生日期(YYYY-MM-DD):") # 修改出生日期 sql = "UPDATE Employee SET DateOfBirth = '%s' WHERE EmployeeID = '%s'" % (new_date_of_birth, employee_id) cur.execute(sql) conn.conn.commit() print(f"员工ID {employee_id} 的出生日期已更新为:{new_date_of_birth}") else: print("无效选择!") print("-" * 50) except Exception as e: print(f"修改失败: {e}") def delete_employee(): """ 删除员工信息 :return: """ print("-" * 50) employee_id = input("请输入要删除的员工ID:") try: # 删除员工时先删除该员工的相关信息(如考勤记录、假期申请等) sql = "DELETE FROM Attendance WHERE EmployeeID = '%s'" % employee_id cur.execute(sql) sql = "DELETE FROM LeaveRequest WHERE EmployeeID = '%s'" % employee_id cur.execute(sql) sql = "DELETE FROM PerformanceAppraisal WHERE EmployeeID = '%s'" % employee_id cur.execute(sql) sql = "DELETE FROM TrainingRecords WHERE EmployeeID = '%s'" % employee_id cur.execute(sql) sql = "DELETE FROM Employee WHERE EmployeeID = '%s'" % employee_id cur.execute(sql) conn.conn.commit() print(f"员工ID {employee_id} 及相关信息已删除!") print("-" * 50) except Exception as e: print(f"删除失败: {e}") def query_id(): """ 根据员工ID查询员工信息 :return: """ print("-" * 50) print("根据员工ID查询员工信息") employee_id = input("请输入要查询员工的ID:") try: # 使用传入的员工ID进行查询 sql = "SELECT * FROM Employee WHERE EmployeeID = '%s'" % employee_id cur.execute(sql) res = cur.fetchone() # 如果查询到结果,输出员工信息 if res: print("员工信息如下:") print("员工ID\t\t姓名\t\t性别\t\t职位\t\t部门ID\t\t联系方式\t入职日期\t\t状态") print("-" * 50) print(f"{res[0]}\t\t{res[1]}\t\t{res[2]}\t\t{res[3]}\t\t{res[4]}\t\t{res[5]}\t{res[6]}\t\t{res[7]}") print("-" * 50) else: print("没有找到该员工的信息!") print("-" * 50) except Exception as e: print(f"查询失败: {e}") def query_employee_department(): """ 查询某个员工所在的部门 :return: """ print("-" * 50) print("查询员工所在的部门信息") employee_id = input("请输入要查询员工的ID:") try: # 首先通过员工ID查找员工所在的DepartmentID sql_employee = "SELECT DepartmentID, DateOfBirth FROM Employee WHERE EmployeeID = '%s'" % employee_id # 获取出生日期 cur.execute(sql_employee) res = cur.fetchone() if res: department_id = res[0] date_of_birth = res[1] print(f"员工ID {employee_id} 的出生日期为:{date_of_birth}") # 根据DepartmentID查询部门信息 sql_department = "SELECT DepartmentName FROM Department WHERE DepartmentID = '%s'" % department_id cur.execute(sql_department) department_info = cur.fetchone() if department_info: print(f"员工ID {employee_id} 所在的部门为:{department_info[0]}") else: print("没有找到该部门的信息!") else: print("没有找到该员工的信息!") print("-" * 50) except Exception as e: print(f"查询失败: {e}") def query_attendance_by_employee_id(employee_id): """ 查询某个员工的考勤记录 :param employee_id: 员工ID :return: """ try: sql = """ SELECT a.AttendanceID, a.AttendanceDate, a.CheckInTime, a.CheckOutTime, a.WorkingHours, a.OvertimeHours, a.AttendanceStatus FROM Attendance a WHERE a.EmployeeID = '%s' """ % employee_id cur.execute(sql) res = cur.fetchall() for row in res: print(f"考勤ID: {row[0]}\t考勤日期: {row[1]}\t上班时间: {row[2]}\t下班时间: {row[3]}\t工作时长: {row[4]}\t加班时长: {row[5]}\t考勤状态: {row[6]}") print("-" * 50) except Exception as e: print(e) def query_employees_by_department(department_id): """ 查询某个部门的员工信息 :param department_id: 部门ID :return: """ try: sql = """ SELECT e.EmployeeID, e.Name, e.Position, e.ContactInfo, e.HireDate, e.EmploymentStatus FROM Employee e WHERE e.DepartmentID = '%s' """ % department_id cur.execute(sql) res = cur.fetchall() for row in res: print(f"员工ID: {row[0]}\t姓名: {row[1]}\t职位: {row[2]}\t联系方式: {row[3]}\t入职日期: {row[4]}\t状态: {row[5]}") print("-" * 50) except Exception as e: print(e) def query_leave_requests_by_employee(employee_id): """ 查询某个员工的假期申请记录 :param employee_id: 员工ID :return: """ try: sql = """ SELECT lr.RequestID, lr.LeaveType, lr.StartDate, lr.EndDate, lr.LeaveDays, lr.RequestStatus, lr.Remarks FROM LeaveRequest lr WHERE lr.EmployeeID = '%s' """ % employee_id cur.execute(sql) res = cur.fetchall() for row in res: print(f"申请ID: {row[0]}\t假期类型: {row[1]}\t开始日期: {row[2]}\t结束日期: {row[3]}\t假期天数: {row[4]}\t申请状态: {row[5]}\t备注: {row[6]}") print("-" * 50) except Exception as e: print(e) def query_attendance_summary_by_employee(employee_id, month): """ 查询某个员工的考勤统计(按月) :param employee_id: 员工ID :param month: 统计月份(YYYY-MM) :return: """ try: sql = """ SELECT asy.PresentDays, asy.LateCount, asy.EarlyLeaveCount, asy.AbsentDays, asy.TotalOvertimeHours FROM AttendanceSummary asy WHERE asy.EmployeeID = '%s' AND asy.Month = '%s' """ % (employee_id, month) cur.execute(sql) res = cur.fetchone() if res: print(f"出勤天数: {res[0]}\t迟到次数: {res[1]}\t早退次数: {res[2]}\t缺勤天数: {res[3]}\t加班总时长: {res[4]}") else: print("该员工在此月份的考勤统计信息不存在!") print("-" * 50) except Exception as e: print(e) def query_all_user_roles(): """ 查询所有用户角色及描述 :return: """ try: sql = "SELECT RoleID, RoleName, RoleDescription FROM UserRoles" cur.execute(sql) res = cur.fetchall() for row in res: print(f"角色ID: {row[0]}\t角色名称: {row[1]}\t角色描述: {row[2]}") print("-" * 50) except Exception as e: print(e) def query_performance_reviews_by_employee(employee_id): """ 查询某个员工的绩效考核记录 :param employee_id: 员工ID :return: """ try: sql = """ SELECT pr.ReviewID, pr.ReviewDate, pr.ReviewPeriod, pr.ReviewScore, pr.ReviewGrade, pr.ReviewerID FROM PerformanceAppraisal pr WHERE pr.EmployeeID = '%s' """ % employee_id cur.execute(sql) res = cur.fetchall() for row in res: print(f"考核ID: {row[0]}\t考核日期: {row[1]}\t考核周期: {row[2]}\t考核评分: {row[3]}\t考核等级: {row[4]}\t考核人ID: {row[5]}") print("-" * 50) except Exception as e: print(e) def query_employees_after_hire_date(): """ 查询在某个入职日期之后的员工 :return: """ # 获取用户输入的日期 hire_date = input("请输入入职日期(格式:YYYY-MM-DD):") try: # 根据入职日期筛选员工,查找入职日期大于输入日期的员工 sql = "SELECT * FROM Employee WHERE HireDate > '%s'" % hire_date cur.execute(sql) res = cur.fetchall() if res: print("入职日期之后的员工信息:") print("员工ID\t\t姓名\t\t性别\t\t职位\t\t部门ID\t联系方式\t入职日期\t状态") print("-" * 50) for row in res: print(f"{row[0]}\t\t{row[1]}\t\t{row[2]}\t\t{row[3]}\t\t{row[4]}\t\t{row[5]}\t{row[6]}\t\t{row[7]}") print("-" * 50) else: print("没有找到符合条件的员工!") print("-" * 50) except Exception as e: print(f"查询失败: {e}") def query_attendance_by_status(): """ 查询某个考勤状态的考勤记录 :return: """ # 获取用户输入的考勤状态 attendance_status = input("请输入考勤状态(如:正常、迟到、早退、缺勤):") try: # 根据考勤状态筛选考勤记录 sql = "SELECT * FROM Attendance WHERE AttendanceStatus = '%s'" % attendance_status cur.execute(sql) res = cur.fetchall() if res: print(f"考勤状态为'{attendance_status}'的考勤记录:") print("考勤ID\t\t员工ID\t考勤日期\t上班时间\t下班时间\t工作时长\t加班时长\t考勤状态") print("-" * 50) for row in res: print(f"{row[0]}\t{row[1]}\t{row[2]}\t{row[3]}\t{row[4]}\t{row[5]}\t{row[6]}\t{row[7]}") print("-" * 50) else: print(f"没有找到考勤状态为'{attendance_status}'的考勤记录!") print("-" * 50) except Exception as e: print(f"查询失败: {e}") def query_department_manager(): """ 查询某个部门的主管名称 :return: """ print("-" * 50) print("查询某个部门的主管名称") # 获取用户输入的部门ID department_id = input("请输入要查询的部门ID:") try: # 根据部门ID查询主管的EmployeeID(ManagerID) sql_department = "SELECT ManagerID FROM Department WHERE DepartmentID = '%s'" % department_id cur.execute(sql_department) res = cur.fetchone() if res: manager_id = res[0] print(f"部门ID {department_id} 的主管ID是:{manager_id}") # 根据ManagerID查询主管的姓名 sql_employee = "SELECT Name FROM Employee WHERE EmployeeID = '%s'" % manager_id cur.execute(sql_employee) manager_info = cur.fetchone() if manager_info: print(f"部门ID {department_id} 的主管姓名是:{manager_info[0]}") else: print("没有找到该主管的信息!") else: print("没有找到该部门的信息!") print("-" * 50) except Exception as e: print(f"查询失败: {e}") def close(): """ 关闭数据库 :return: """ cur.close() conn.conn.close()