You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

448 lines
15 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import conn
cur = conn.cur
def login(username, password):
"""
用户登录
:param username: 用户名
:param password: 密码
:return:
"""
try:
sql = "select password from userlogin where username = '%s'" % username
cur.execute(sql)
res = cur.fetchone()
if res is None:
raise Exception("用户名错误,登录失败")
else:
my_password = res[0]
if my_password == password:
return True
else:
return False
except Exception as e:
raise e
def query_all_employees():
"""
查询所有员工信息
:return:
"""
try:
sql = "SELECT * FROM Employee"
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"员工ID: {row[0]}\t姓名: {row[1]}\t性别: {row[2]}\t职位: {row[4]}\t部门ID: {row[5]}\t联系方式: {row[6]}\t入职日期: {row[7]}\t状态: {row[8]}\t出生日期: {row[3]}")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_employee_by_id(employee_id):
"""
根据员工ID查询员工信息
:param employee_id: 员工ID
:return:
"""
try:
sql = "SELECT * FROM Employee WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
res = cur.fetchone()
if res:
print(f"员工ID: {row[0]}\t姓名: {row[1]}\t性别: {row[2]}\t职位: {row[4]}\t部门ID: {row[5]}\t联系方式: {row[6]}\t入职日期: {row[7]}\t状态: {row[8]}\t出生日期: {row[3]}")
else:
print("没有找到该员工信息!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def add_employee():
"""
添加新员工信息
:return:
"""
print("-" * 50)
print("请输入新员工的信息:")
employee_id = input("员工ID")
name = input("姓名:")
gender = input("性别(男/女):")
position = input("职位:")
department_id = input("部门ID")
contact_info = input("联系方式:")
hire_date = input("入职日期YYYY-MM-DD")
employment_status = input("状态(在职/离职/休假):")
date_of_birth = input("出生日期YYYY-MM-DD") # 新增出生日期
try:
sql = """
INSERT INTO Employee (EmployeeID, Name, Gender, Position, DepartmentID, ContactInfo, HireDate, EmploymentStatus, DateOfBirth)
VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')
""" % (employee_id, name, gender, position, department_id, contact_info, hire_date, employment_status, date_of_birth)
cur.execute(sql)
conn.conn.commit()
print("添加新员工成功!")
print("-" * 50)
except Exception as e:
print(f"添加失败: {e}")
def update_employee():
"""
修改员工信息(如职位、状态、出生日期等)
:return:
"""
print("-" * 50)
employee_id = input("请输入要修改信息的员工ID")
print("请选择要修改的信息:")
print("1. 修改职位")
print("2. 修改状态")
print("3. 修改出生日期") # 新增修改出生日期选项
choice = input("请输入选择的编号:")
try:
if choice == "1":
new_position = input("请输入新的职位:")
sql = "UPDATE Employee SET Position = '%s' WHERE EmployeeID = '%s'" % (new_position, employee_id)
cur.execute(sql)
conn.conn.commit()
print(f"员工ID {employee_id} 的职位已更新为:{new_position}")
elif choice == "2":
new_status = input("请输入新的状态(在职/离职/休假):")
sql = "UPDATE Employee SET EmploymentStatus = '%s' WHERE EmployeeID = '%s'" % (new_status, employee_id)
cur.execute(sql)
conn.conn.commit()
print(f"员工ID {employee_id} 的状态已更新为:{new_status}")
elif choice == "3":
new_date_of_birth = input("请输入新的出生日期YYYY-MM-DD") # 修改出生日期
sql = "UPDATE Employee SET DateOfBirth = '%s' WHERE EmployeeID = '%s'" % (new_date_of_birth, employee_id)
cur.execute(sql)
conn.conn.commit()
print(f"员工ID {employee_id} 的出生日期已更新为:{new_date_of_birth}")
else:
print("无效选择!")
print("-" * 50)
except Exception as e:
print(f"修改失败: {e}")
def delete_employee():
"""
删除员工信息
:return:
"""
print("-" * 50)
employee_id = input("请输入要删除的员工ID")
try:
# 删除员工时先删除该员工的相关信息(如考勤记录、假期申请等)
sql = "DELETE FROM Attendance WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
sql = "DELETE FROM LeaveRequest WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
sql = "DELETE FROM PerformanceAppraisal WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
sql = "DELETE FROM TrainingRecords WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
sql = "DELETE FROM Employee WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
conn.conn.commit()
print(f"员工ID {employee_id} 及相关信息已删除!")
print("-" * 50)
except Exception as e:
print(f"删除失败: {e}")
def query_id():
"""
根据员工ID查询员工信息
:return:
"""
print("-" * 50)
print("根据员工ID查询员工信息")
employee_id = input("请输入要查询员工的ID")
try:
# 使用传入的员工ID进行查询
sql = "SELECT * FROM Employee WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
res = cur.fetchone()
# 如果查询到结果,输出员工信息
if res:
print("员工信息如下:")
print("员工ID\t\t姓名\t\t性别\t\t职位\t\t部门ID\t\t联系方式\t入职日期\t\t状态")
print("-" * 50)
print(f"{res[0]}\t\t{res[1]}\t\t{res[2]}\t\t{res[3]}\t\t{res[4]}\t\t{res[5]}\t{res[6]}\t\t{res[7]}")
print("-" * 50)
else:
print("没有找到该员工的信息!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_employee_department():
"""
查询某个员工所在的部门
:return:
"""
print("-" * 50)
print("查询员工所在的部门信息")
employee_id = input("请输入要查询员工的ID")
try:
# 首先通过员工ID查找员工所在的DepartmentID
sql_employee = "SELECT DepartmentID, DateOfBirth FROM Employee WHERE EmployeeID = '%s'" % employee_id # 获取出生日期
cur.execute(sql_employee)
res = cur.fetchone()
if res:
department_id = res[0]
date_of_birth = res[1]
print(f"员工ID {employee_id} 的出生日期为:{date_of_birth}")
# 根据DepartmentID查询部门信息
sql_department = "SELECT DepartmentName FROM Department WHERE DepartmentID = '%s'" % department_id
cur.execute(sql_department)
department_info = cur.fetchone()
if department_info:
print(f"员工ID {employee_id} 所在的部门为:{department_info[0]}")
else:
print("没有找到该部门的信息!")
else:
print("没有找到该员工的信息!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_attendance_by_employee_id(employee_id):
"""
查询某个员工的考勤记录
:param employee_id: 员工ID
:return:
"""
try:
sql = """
SELECT a.AttendanceID, a.AttendanceDate, a.CheckInTime, a.CheckOutTime, a.WorkingHours, a.OvertimeHours, a.AttendanceStatus
FROM Attendance a
WHERE a.EmployeeID = '%s'
""" % employee_id
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"考勤ID: {row[0]}\t考勤日期: {row[1]}\t上班时间: {row[2]}\t下班时间: {row[3]}\t工作时长: {row[4]}\t加班时长: {row[5]}\t考勤状态: {row[6]}")
print("-" * 50)
except Exception as e:
print(e)
def query_employees_by_department(department_id):
"""
查询某个部门的员工信息
:param department_id: 部门ID
:return:
"""
try:
sql = """
SELECT e.EmployeeID, e.Name, e.Position, e.ContactInfo, e.HireDate, e.EmploymentStatus
FROM Employee e
WHERE e.DepartmentID = '%s'
""" % department_id
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"员工ID: {row[0]}\t姓名: {row[1]}\t职位: {row[2]}\t联系方式: {row[3]}\t入职日期: {row[4]}\t状态: {row[5]}")
print("-" * 50)
except Exception as e:
print(e)
def query_leave_requests_by_employee(employee_id):
"""
查询某个员工的假期申请记录
:param employee_id: 员工ID
:return:
"""
try:
sql = """
SELECT lr.RequestID, lr.LeaveType, lr.StartDate, lr.EndDate, lr.LeaveDays, lr.RequestStatus, lr.Remarks
FROM LeaveRequest lr
WHERE lr.EmployeeID = '%s'
""" % employee_id
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"申请ID: {row[0]}\t假期类型: {row[1]}\t开始日期: {row[2]}\t结束日期: {row[3]}\t假期天数: {row[4]}\t申请状态: {row[5]}\t备注: {row[6]}")
print("-" * 50)
except Exception as e:
print(e)
def query_attendance_summary_by_employee(employee_id, month):
"""
查询某个员工的考勤统计(按月)
:param employee_id: 员工ID
:param month: 统计月份YYYY-MM
:return:
"""
try:
sql = """
SELECT asy.PresentDays, asy.LateCount, asy.EarlyLeaveCount, asy.AbsentDays, asy.TotalOvertimeHours
FROM AttendanceSummary asy
WHERE asy.EmployeeID = '%s' AND asy.Month = '%s'
""" % (employee_id, month)
cur.execute(sql)
res = cur.fetchone()
if res:
print(f"出勤天数: {res[0]}\t迟到次数: {res[1]}\t早退次数: {res[2]}\t缺勤天数: {res[3]}\t加班总时长: {res[4]}")
else:
print("该员工在此月份的考勤统计信息不存在!")
print("-" * 50)
except Exception as e:
print(e)
def query_all_user_roles():
"""
查询所有用户角色及描述
:return:
"""
try:
sql = "SELECT RoleID, RoleName, RoleDescription FROM UserRoles"
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"角色ID: {row[0]}\t角色名称: {row[1]}\t角色描述: {row[2]}")
print("-" * 50)
except Exception as e:
print(e)
def query_performance_reviews_by_employee(employee_id):
"""
查询某个员工的绩效考核记录
:param employee_id: 员工ID
:return:
"""
try:
sql = """
SELECT pr.ReviewID, pr.ReviewDate, pr.ReviewPeriod, pr.ReviewScore, pr.ReviewGrade, pr.ReviewerID
FROM PerformanceAppraisal pr
WHERE pr.EmployeeID = '%s'
""" % employee_id
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"考核ID: {row[0]}\t考核日期: {row[1]}\t考核周期: {row[2]}\t考核评分: {row[3]}\t考核等级: {row[4]}\t考核人ID: {row[5]}")
print("-" * 50)
except Exception as e:
print(e)
def query_employees_after_hire_date():
"""
查询在某个入职日期之后的员工
:return:
"""
# 获取用户输入的日期
hire_date = input("请输入入职日期格式YYYY-MM-DD")
try:
# 根据入职日期筛选员工,查找入职日期大于输入日期的员工
sql = "SELECT * FROM Employee WHERE HireDate > '%s'" % hire_date
cur.execute(sql)
res = cur.fetchall()
if res:
print("入职日期之后的员工信息:")
print("员工ID\t\t姓名\t\t性别\t\t职位\t\t部门ID\t联系方式\t入职日期\t状态")
print("-" * 50)
for row in res:
print(f"{row[0]}\t\t{row[1]}\t\t{row[2]}\t\t{row[3]}\t\t{row[4]}\t\t{row[5]}\t{row[6]}\t\t{row[7]}")
print("-" * 50)
else:
print("没有找到符合条件的员工!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_attendance_by_status():
"""
查询某个考勤状态的考勤记录
:return:
"""
# 获取用户输入的考勤状态
attendance_status = input("请输入考勤状态(如:正常、迟到、早退、缺勤):")
try:
# 根据考勤状态筛选考勤记录
sql = "SELECT * FROM Attendance WHERE AttendanceStatus = '%s'" % attendance_status
cur.execute(sql)
res = cur.fetchall()
if res:
print(f"考勤状态为'{attendance_status}'的考勤记录:")
print("考勤ID\t\t员工ID\t考勤日期\t上班时间\t下班时间\t工作时长\t加班时长\t考勤状态")
print("-" * 50)
for row in res:
print(f"{row[0]}\t{row[1]}\t{row[2]}\t{row[3]}\t{row[4]}\t{row[5]}\t{row[6]}\t{row[7]}")
print("-" * 50)
else:
print(f"没有找到考勤状态为'{attendance_status}'的考勤记录!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_department_manager():
"""
查询某个部门的主管名称
:return:
"""
print("-" * 50)
print("查询某个部门的主管名称")
# 获取用户输入的部门ID
department_id = input("请输入要查询的部门ID")
try:
# 根据部门ID查询主管的EmployeeIDManagerID
sql_department = "SELECT ManagerID FROM Department WHERE DepartmentID = '%s'" % department_id
cur.execute(sql_department)
res = cur.fetchone()
if res:
manager_id = res[0]
print(f"部门ID {department_id} 的主管ID是{manager_id}")
# 根据ManagerID查询主管的姓名
sql_employee = "SELECT Name FROM Employee WHERE EmployeeID = '%s'" % manager_id
cur.execute(sql_employee)
manager_info = cur.fetchone()
if manager_info:
print(f"部门ID {department_id} 的主管姓名是:{manager_info[0]}")
else:
print("没有找到该主管的信息!")
else:
print("没有找到该部门的信息!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def close():
"""
关闭数据库
:return:
"""
cur.close()
conn.conn.close()