From bdf928c7bdc0f72cf82668e69dcc41758aedb367 Mon Sep 17 00:00:00 2001 From: m5tqol2ew Date: Sun, 29 Dec 2024 16:16:33 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E5=A2=9E=E5=88=A0=E6=94=B9?= =?UTF-8?q?=E6=9F=A5=E7=AD=89=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- operation.py | 447 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 447 insertions(+) create mode 100644 operation.py diff --git a/operation.py b/operation.py new file mode 100644 index 0000000..486ad10 --- /dev/null +++ b/operation.py @@ -0,0 +1,447 @@ + + +import conn + +cur = conn.cur + + +def login(username, password): + """ + 用户登录 + :param username: 用户名 + :param password: 密码 + :return: + """ + try: + sql = "select password from userlogin where username = '%s'" % username + cur.execute(sql) + res = cur.fetchone() + if res is None: + raise Exception("用户名错误,登录失败") + else: + my_password = res[0] + if my_password == password: + return True + else: + return False + except Exception as e: + raise e + + +def query_all_employees(): + """ + 查询所有员工信息 + :return: + """ + try: + sql = "SELECT * FROM Employee" + cur.execute(sql) + res = cur.fetchall() + for row in res: + print(f"员工ID: {row[0]}\t姓名: {row[1]}\t性别: {row[2]}\t职位: {row[4]}\t部门ID: {row[5]}\t联系方式: {row[6]}\t入职日期: {row[7]}\t状态: {row[8]}\t出生日期: {row[3]}") + print("-" * 50) + except Exception as e: + print(f"查询失败: {e}") + + + +def query_employee_by_id(employee_id): + """ + 根据员工ID查询员工信息 + :param employee_id: 员工ID + :return: + """ + try: + sql = "SELECT * FROM Employee WHERE EmployeeID = '%s'" % employee_id + cur.execute(sql) + res = cur.fetchone() + if res: + print(f"员工ID: {row[0]}\t姓名: {row[1]}\t性别: {row[2]}\t职位: {row[4]}\t部门ID: {row[5]}\t联系方式: {row[6]}\t入职日期: {row[7]}\t状态: {row[8]}\t出生日期: {row[3]}") + else: + print("没有找到该员工信息!") + print("-" * 50) + except Exception as e: + print(f"查询失败: {e}") + + +def add_employee(): + """ + 添加新员工信息 + :return: + """ + print("-" * 50) + print("请输入新员工的信息:") + employee_id = input("员工ID:") + name = input("姓名:") + gender = input("性别(男/女):") + position = input("职位:") + department_id = input("部门ID:") + contact_info = input("联系方式:") + hire_date = input("入职日期(YYYY-MM-DD):") + employment_status = input("状态(在职/离职/休假):") + date_of_birth = input("出生日期(YYYY-MM-DD):") # 新增出生日期 + + try: + sql = """ + INSERT INTO Employee (EmployeeID, Name, Gender, Position, DepartmentID, ContactInfo, HireDate, EmploymentStatus, DateOfBirth) + VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s') + """ % (employee_id, name, gender, position, department_id, contact_info, hire_date, employment_status, date_of_birth) + cur.execute(sql) + conn.conn.commit() + print("添加新员工成功!") + print("-" * 50) + except Exception as e: + print(f"添加失败: {e}") + + +def update_employee(): + """ + 修改员工信息(如职位、状态、出生日期等) + :return: + """ + print("-" * 50) + employee_id = input("请输入要修改信息的员工ID:") + print("请选择要修改的信息:") + print("1. 修改职位") + print("2. 修改状态") + print("3. 修改出生日期") # 新增修改出生日期选项 + choice = input("请输入选择的编号:") + + try: + if choice == "1": + new_position = input("请输入新的职位:") + sql = "UPDATE Employee SET Position = '%s' WHERE EmployeeID = '%s'" % (new_position, employee_id) + cur.execute(sql) + conn.conn.commit() + print(f"员工ID {employee_id} 的职位已更新为:{new_position}") + elif choice == "2": + new_status = input("请输入新的状态(在职/离职/休假):") + sql = "UPDATE Employee SET EmploymentStatus = '%s' WHERE EmployeeID = '%s'" % (new_status, employee_id) + cur.execute(sql) + conn.conn.commit() + print(f"员工ID {employee_id} 的状态已更新为:{new_status}") + elif choice == "3": + new_date_of_birth = input("请输入新的出生日期(YYYY-MM-DD):") # 修改出生日期 + sql = "UPDATE Employee SET DateOfBirth = '%s' WHERE EmployeeID = '%s'" % (new_date_of_birth, employee_id) + cur.execute(sql) + conn.conn.commit() + print(f"员工ID {employee_id} 的出生日期已更新为:{new_date_of_birth}") + else: + print("无效选择!") + print("-" * 50) + except Exception as e: + print(f"修改失败: {e}") + + +def delete_employee(): + """ + 删除员工信息 + :return: + """ + print("-" * 50) + employee_id = input("请输入要删除的员工ID:") + + try: + # 删除员工时先删除该员工的相关信息(如考勤记录、假期申请等) + sql = "DELETE FROM Attendance WHERE EmployeeID = '%s'" % employee_id + cur.execute(sql) + sql = "DELETE FROM LeaveRequest WHERE EmployeeID = '%s'" % employee_id + cur.execute(sql) + sql = "DELETE FROM PerformanceAppraisal WHERE EmployeeID = '%s'" % employee_id + cur.execute(sql) + sql = "DELETE FROM TrainingRecords WHERE EmployeeID = '%s'" % employee_id + cur.execute(sql) + sql = "DELETE FROM Employee WHERE EmployeeID = '%s'" % employee_id + cur.execute(sql) + conn.conn.commit() + print(f"员工ID {employee_id} 及相关信息已删除!") + print("-" * 50) + except Exception as e: + print(f"删除失败: {e}") + + +def query_id(): + """ + 根据员工ID查询员工信息 + :return: + """ + print("-" * 50) + print("根据员工ID查询员工信息") + employee_id = input("请输入要查询员工的ID:") + + try: + # 使用传入的员工ID进行查询 + sql = "SELECT * FROM Employee WHERE EmployeeID = '%s'" % employee_id + cur.execute(sql) + res = cur.fetchone() + + # 如果查询到结果,输出员工信息 + if res: + print("员工信息如下:") + print("员工ID\t\t姓名\t\t性别\t\t职位\t\t部门ID\t\t联系方式\t入职日期\t\t状态") + print("-" * 50) + print(f"{res[0]}\t\t{res[1]}\t\t{res[2]}\t\t{res[3]}\t\t{res[4]}\t\t{res[5]}\t{res[6]}\t\t{res[7]}") + print("-" * 50) + else: + print("没有找到该员工的信息!") + print("-" * 50) + except Exception as e: + print(f"查询失败: {e}") + + +def query_employee_department(): + """ + 查询某个员工所在的部门 + :return: + """ + print("-" * 50) + print("查询员工所在的部门信息") + employee_id = input("请输入要查询员工的ID:") + + try: + # 首先通过员工ID查找员工所在的DepartmentID + sql_employee = "SELECT DepartmentID, DateOfBirth FROM Employee WHERE EmployeeID = '%s'" % employee_id # 获取出生日期 + cur.execute(sql_employee) + res = cur.fetchone() + + if res: + department_id = res[0] + date_of_birth = res[1] + print(f"员工ID {employee_id} 的出生日期为:{date_of_birth}") + + # 根据DepartmentID查询部门信息 + sql_department = "SELECT DepartmentName FROM Department WHERE DepartmentID = '%s'" % department_id + cur.execute(sql_department) + department_info = cur.fetchone() + + if department_info: + print(f"员工ID {employee_id} 所在的部门为:{department_info[0]}") + else: + print("没有找到该部门的信息!") + else: + print("没有找到该员工的信息!") + + print("-" * 50) + except Exception as e: + print(f"查询失败: {e}") + + +def query_attendance_by_employee_id(employee_id): + """ + 查询某个员工的考勤记录 + :param employee_id: 员工ID + :return: + """ + try: + sql = """ + SELECT a.AttendanceID, a.AttendanceDate, a.CheckInTime, a.CheckOutTime, a.WorkingHours, a.OvertimeHours, a.AttendanceStatus + FROM Attendance a + WHERE a.EmployeeID = '%s' + """ % employee_id + cur.execute(sql) + res = cur.fetchall() + for row in res: + print(f"考勤ID: {row[0]}\t考勤日期: {row[1]}\t上班时间: {row[2]}\t下班时间: {row[3]}\t工作时长: {row[4]}\t加班时长: {row[5]}\t考勤状态: {row[6]}") + print("-" * 50) + except Exception as e: + print(e) + +def query_employees_by_department(department_id): + """ + 查询某个部门的员工信息 + :param department_id: 部门ID + :return: + """ + try: + sql = """ + SELECT e.EmployeeID, e.Name, e.Position, e.ContactInfo, e.HireDate, e.EmploymentStatus + FROM Employee e + WHERE e.DepartmentID = '%s' + """ % department_id + cur.execute(sql) + res = cur.fetchall() + for row in res: + print(f"员工ID: {row[0]}\t姓名: {row[1]}\t职位: {row[2]}\t联系方式: {row[3]}\t入职日期: {row[4]}\t状态: {row[5]}") + print("-" * 50) + except Exception as e: + print(e) + +def query_leave_requests_by_employee(employee_id): + """ + 查询某个员工的假期申请记录 + :param employee_id: 员工ID + :return: + """ + try: + sql = """ + SELECT lr.RequestID, lr.LeaveType, lr.StartDate, lr.EndDate, lr.LeaveDays, lr.RequestStatus, lr.Remarks + FROM LeaveRequest lr + WHERE lr.EmployeeID = '%s' + """ % employee_id + cur.execute(sql) + res = cur.fetchall() + for row in res: + print(f"申请ID: {row[0]}\t假期类型: {row[1]}\t开始日期: {row[2]}\t结束日期: {row[3]}\t假期天数: {row[4]}\t申请状态: {row[5]}\t备注: {row[6]}") + print("-" * 50) + except Exception as e: + print(e) + +def query_attendance_summary_by_employee(employee_id, month): + """ + 查询某个员工的考勤统计(按月) + :param employee_id: 员工ID + :param month: 统计月份(YYYY-MM) + :return: + """ + try: + sql = """ + SELECT asy.PresentDays, asy.LateCount, asy.EarlyLeaveCount, asy.AbsentDays, asy.TotalOvertimeHours + FROM AttendanceSummary asy + WHERE asy.EmployeeID = '%s' AND asy.Month = '%s' + """ % (employee_id, month) + cur.execute(sql) + res = cur.fetchone() + if res: + print(f"出勤天数: {res[0]}\t迟到次数: {res[1]}\t早退次数: {res[2]}\t缺勤天数: {res[3]}\t加班总时长: {res[4]}") + else: + print("该员工在此月份的考勤统计信息不存在!") + print("-" * 50) + except Exception as e: + print(e) + +def query_all_user_roles(): + """ + 查询所有用户角色及描述 + :return: + """ + try: + sql = "SELECT RoleID, RoleName, RoleDescription FROM UserRoles" + cur.execute(sql) + res = cur.fetchall() + for row in res: + print(f"角色ID: {row[0]}\t角色名称: {row[1]}\t角色描述: {row[2]}") + print("-" * 50) + except Exception as e: + print(e) + +def query_performance_reviews_by_employee(employee_id): + """ + 查询某个员工的绩效考核记录 + :param employee_id: 员工ID + :return: + """ + try: + sql = """ + SELECT pr.ReviewID, pr.ReviewDate, pr.ReviewPeriod, pr.ReviewScore, pr.ReviewGrade, pr.ReviewerID + FROM PerformanceAppraisal pr + WHERE pr.EmployeeID = '%s' + """ % employee_id + cur.execute(sql) + res = cur.fetchall() + for row in res: + print(f"考核ID: {row[0]}\t考核日期: {row[1]}\t考核周期: {row[2]}\t考核评分: {row[3]}\t考核等级: {row[4]}\t考核人ID: {row[5]}") + print("-" * 50) + except Exception as e: + print(e) + +def query_employees_after_hire_date(): + """ + 查询在某个入职日期之后的员工 + :return: + """ + # 获取用户输入的日期 + hire_date = input("请输入入职日期(格式:YYYY-MM-DD):") + + try: + # 根据入职日期筛选员工,查找入职日期大于输入日期的员工 + sql = "SELECT * FROM Employee WHERE HireDate > '%s'" % hire_date + cur.execute(sql) + res = cur.fetchall() + + if res: + print("入职日期之后的员工信息:") + print("员工ID\t\t姓名\t\t性别\t\t职位\t\t部门ID\t联系方式\t入职日期\t状态") + print("-" * 50) + for row in res: + print(f"{row[0]}\t\t{row[1]}\t\t{row[2]}\t\t{row[3]}\t\t{row[4]}\t\t{row[5]}\t{row[6]}\t\t{row[7]}") + print("-" * 50) + else: + print("没有找到符合条件的员工!") + print("-" * 50) + except Exception as e: + print(f"查询失败: {e}") + +def query_attendance_by_status(): + """ + 查询某个考勤状态的考勤记录 + :return: + """ + # 获取用户输入的考勤状态 + attendance_status = input("请输入考勤状态(如:正常、迟到、早退、缺勤):") + + try: + # 根据考勤状态筛选考勤记录 + sql = "SELECT * FROM Attendance WHERE AttendanceStatus = '%s'" % attendance_status + cur.execute(sql) + res = cur.fetchall() + + if res: + print(f"考勤状态为'{attendance_status}'的考勤记录:") + print("考勤ID\t\t员工ID\t考勤日期\t上班时间\t下班时间\t工作时长\t加班时长\t考勤状态") + print("-" * 50) + for row in res: + print(f"{row[0]}\t{row[1]}\t{row[2]}\t{row[3]}\t{row[4]}\t{row[5]}\t{row[6]}\t{row[7]}") + print("-" * 50) + else: + print(f"没有找到考勤状态为'{attendance_status}'的考勤记录!") + print("-" * 50) + except Exception as e: + print(f"查询失败: {e}") + +def query_department_manager(): + """ + 查询某个部门的主管名称 + :return: + """ + print("-" * 50) + print("查询某个部门的主管名称") + + # 获取用户输入的部门ID + department_id = input("请输入要查询的部门ID:") + + try: + # 根据部门ID查询主管的EmployeeID(ManagerID) + sql_department = "SELECT ManagerID FROM Department WHERE DepartmentID = '%s'" % department_id + cur.execute(sql_department) + res = cur.fetchone() + + if res: + manager_id = res[0] + print(f"部门ID {department_id} 的主管ID是:{manager_id}") + + # 根据ManagerID查询主管的姓名 + sql_employee = "SELECT Name FROM Employee WHERE EmployeeID = '%s'" % manager_id + cur.execute(sql_employee) + manager_info = cur.fetchone() + + if manager_info: + print(f"部门ID {department_id} 的主管姓名是:{manager_info[0]}") + else: + print("没有找到该主管的信息!") + else: + print("没有找到该部门的信息!") + + print("-" * 50) + except Exception as e: + print(f"查询失败: {e}") + + + +def close(): + """ + 关闭数据库 + :return: + """ + cur.close() + conn.conn.close() +