实现增删改查等功能

main
m5tqol2ew 8 months ago
parent b346cf8c96
commit bdf928c7bd

@ -0,0 +1,447 @@
import conn
cur = conn.cur
def login(username, password):
"""
用户登录
:param username: 用户名
:param password: 密码
:return:
"""
try:
sql = "select password from userlogin where username = '%s'" % username
cur.execute(sql)
res = cur.fetchone()
if res is None:
raise Exception("用户名错误,登录失败")
else:
my_password = res[0]
if my_password == password:
return True
else:
return False
except Exception as e:
raise e
def query_all_employees():
"""
查询所有员工信息
:return:
"""
try:
sql = "SELECT * FROM Employee"
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"员工ID: {row[0]}\t姓名: {row[1]}\t性别: {row[2]}\t职位: {row[4]}\t部门ID: {row[5]}\t联系方式: {row[6]}\t入职日期: {row[7]}\t状态: {row[8]}\t出生日期: {row[3]}")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_employee_by_id(employee_id):
"""
根据员工ID查询员工信息
:param employee_id: 员工ID
:return:
"""
try:
sql = "SELECT * FROM Employee WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
res = cur.fetchone()
if res:
print(f"员工ID: {row[0]}\t姓名: {row[1]}\t性别: {row[2]}\t职位: {row[4]}\t部门ID: {row[5]}\t联系方式: {row[6]}\t入职日期: {row[7]}\t状态: {row[8]}\t出生日期: {row[3]}")
else:
print("没有找到该员工信息!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def add_employee():
"""
添加新员工信息
:return:
"""
print("-" * 50)
print("请输入新员工的信息:")
employee_id = input("员工ID")
name = input("姓名:")
gender = input("性别(男/女):")
position = input("职位:")
department_id = input("部门ID")
contact_info = input("联系方式:")
hire_date = input("入职日期YYYY-MM-DD")
employment_status = input("状态(在职/离职/休假):")
date_of_birth = input("出生日期YYYY-MM-DD") # 新增出生日期
try:
sql = """
INSERT INTO Employee (EmployeeID, Name, Gender, Position, DepartmentID, ContactInfo, HireDate, EmploymentStatus, DateOfBirth)
VALUES ('%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s', '%s')
""" % (employee_id, name, gender, position, department_id, contact_info, hire_date, employment_status, date_of_birth)
cur.execute(sql)
conn.conn.commit()
print("添加新员工成功!")
print("-" * 50)
except Exception as e:
print(f"添加失败: {e}")
def update_employee():
"""
修改员工信息如职位状态出生日期等
:return:
"""
print("-" * 50)
employee_id = input("请输入要修改信息的员工ID")
print("请选择要修改的信息:")
print("1. 修改职位")
print("2. 修改状态")
print("3. 修改出生日期") # 新增修改出生日期选项
choice = input("请输入选择的编号:")
try:
if choice == "1":
new_position = input("请输入新的职位:")
sql = "UPDATE Employee SET Position = '%s' WHERE EmployeeID = '%s'" % (new_position, employee_id)
cur.execute(sql)
conn.conn.commit()
print(f"员工ID {employee_id} 的职位已更新为:{new_position}")
elif choice == "2":
new_status = input("请输入新的状态(在职/离职/休假):")
sql = "UPDATE Employee SET EmploymentStatus = '%s' WHERE EmployeeID = '%s'" % (new_status, employee_id)
cur.execute(sql)
conn.conn.commit()
print(f"员工ID {employee_id} 的状态已更新为:{new_status}")
elif choice == "3":
new_date_of_birth = input("请输入新的出生日期YYYY-MM-DD") # 修改出生日期
sql = "UPDATE Employee SET DateOfBirth = '%s' WHERE EmployeeID = '%s'" % (new_date_of_birth, employee_id)
cur.execute(sql)
conn.conn.commit()
print(f"员工ID {employee_id} 的出生日期已更新为:{new_date_of_birth}")
else:
print("无效选择!")
print("-" * 50)
except Exception as e:
print(f"修改失败: {e}")
def delete_employee():
"""
删除员工信息
:return:
"""
print("-" * 50)
employee_id = input("请输入要删除的员工ID")
try:
# 删除员工时先删除该员工的相关信息(如考勤记录、假期申请等)
sql = "DELETE FROM Attendance WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
sql = "DELETE FROM LeaveRequest WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
sql = "DELETE FROM PerformanceAppraisal WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
sql = "DELETE FROM TrainingRecords WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
sql = "DELETE FROM Employee WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
conn.conn.commit()
print(f"员工ID {employee_id} 及相关信息已删除!")
print("-" * 50)
except Exception as e:
print(f"删除失败: {e}")
def query_id():
"""
根据员工ID查询员工信息
:return:
"""
print("-" * 50)
print("根据员工ID查询员工信息")
employee_id = input("请输入要查询员工的ID")
try:
# 使用传入的员工ID进行查询
sql = "SELECT * FROM Employee WHERE EmployeeID = '%s'" % employee_id
cur.execute(sql)
res = cur.fetchone()
# 如果查询到结果,输出员工信息
if res:
print("员工信息如下:")
print("员工ID\t\t姓名\t\t性别\t\t职位\t\t部门ID\t\t联系方式\t入职日期\t\t状态")
print("-" * 50)
print(f"{res[0]}\t\t{res[1]}\t\t{res[2]}\t\t{res[3]}\t\t{res[4]}\t\t{res[5]}\t{res[6]}\t\t{res[7]}")
print("-" * 50)
else:
print("没有找到该员工的信息!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_employee_department():
"""
查询某个员工所在的部门
:return:
"""
print("-" * 50)
print("查询员工所在的部门信息")
employee_id = input("请输入要查询员工的ID")
try:
# 首先通过员工ID查找员工所在的DepartmentID
sql_employee = "SELECT DepartmentID, DateOfBirth FROM Employee WHERE EmployeeID = '%s'" % employee_id # 获取出生日期
cur.execute(sql_employee)
res = cur.fetchone()
if res:
department_id = res[0]
date_of_birth = res[1]
print(f"员工ID {employee_id} 的出生日期为:{date_of_birth}")
# 根据DepartmentID查询部门信息
sql_department = "SELECT DepartmentName FROM Department WHERE DepartmentID = '%s'" % department_id
cur.execute(sql_department)
department_info = cur.fetchone()
if department_info:
print(f"员工ID {employee_id} 所在的部门为:{department_info[0]}")
else:
print("没有找到该部门的信息!")
else:
print("没有找到该员工的信息!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_attendance_by_employee_id(employee_id):
"""
查询某个员工的考勤记录
:param employee_id: 员工ID
:return:
"""
try:
sql = """
SELECT a.AttendanceID, a.AttendanceDate, a.CheckInTime, a.CheckOutTime, a.WorkingHours, a.OvertimeHours, a.AttendanceStatus
FROM Attendance a
WHERE a.EmployeeID = '%s'
""" % employee_id
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"考勤ID: {row[0]}\t考勤日期: {row[1]}\t上班时间: {row[2]}\t下班时间: {row[3]}\t工作时长: {row[4]}\t加班时长: {row[5]}\t考勤状态: {row[6]}")
print("-" * 50)
except Exception as e:
print(e)
def query_employees_by_department(department_id):
"""
查询某个部门的员工信息
:param department_id: 部门ID
:return:
"""
try:
sql = """
SELECT e.EmployeeID, e.Name, e.Position, e.ContactInfo, e.HireDate, e.EmploymentStatus
FROM Employee e
WHERE e.DepartmentID = '%s'
""" % department_id
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"员工ID: {row[0]}\t姓名: {row[1]}\t职位: {row[2]}\t联系方式: {row[3]}\t入职日期: {row[4]}\t状态: {row[5]}")
print("-" * 50)
except Exception as e:
print(e)
def query_leave_requests_by_employee(employee_id):
"""
查询某个员工的假期申请记录
:param employee_id: 员工ID
:return:
"""
try:
sql = """
SELECT lr.RequestID, lr.LeaveType, lr.StartDate, lr.EndDate, lr.LeaveDays, lr.RequestStatus, lr.Remarks
FROM LeaveRequest lr
WHERE lr.EmployeeID = '%s'
""" % employee_id
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"申请ID: {row[0]}\t假期类型: {row[1]}\t开始日期: {row[2]}\t结束日期: {row[3]}\t假期天数: {row[4]}\t申请状态: {row[5]}\t备注: {row[6]}")
print("-" * 50)
except Exception as e:
print(e)
def query_attendance_summary_by_employee(employee_id, month):
"""
查询某个员工的考勤统计按月
:param employee_id: 员工ID
:param month: 统计月份YYYY-MM
:return:
"""
try:
sql = """
SELECT asy.PresentDays, asy.LateCount, asy.EarlyLeaveCount, asy.AbsentDays, asy.TotalOvertimeHours
FROM AttendanceSummary asy
WHERE asy.EmployeeID = '%s' AND asy.Month = '%s'
""" % (employee_id, month)
cur.execute(sql)
res = cur.fetchone()
if res:
print(f"出勤天数: {res[0]}\t迟到次数: {res[1]}\t早退次数: {res[2]}\t缺勤天数: {res[3]}\t加班总时长: {res[4]}")
else:
print("该员工在此月份的考勤统计信息不存在!")
print("-" * 50)
except Exception as e:
print(e)
def query_all_user_roles():
"""
查询所有用户角色及描述
:return:
"""
try:
sql = "SELECT RoleID, RoleName, RoleDescription FROM UserRoles"
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"角色ID: {row[0]}\t角色名称: {row[1]}\t角色描述: {row[2]}")
print("-" * 50)
except Exception as e:
print(e)
def query_performance_reviews_by_employee(employee_id):
"""
查询某个员工的绩效考核记录
:param employee_id: 员工ID
:return:
"""
try:
sql = """
SELECT pr.ReviewID, pr.ReviewDate, pr.ReviewPeriod, pr.ReviewScore, pr.ReviewGrade, pr.ReviewerID
FROM PerformanceAppraisal pr
WHERE pr.EmployeeID = '%s'
""" % employee_id
cur.execute(sql)
res = cur.fetchall()
for row in res:
print(f"考核ID: {row[0]}\t考核日期: {row[1]}\t考核周期: {row[2]}\t考核评分: {row[3]}\t考核等级: {row[4]}\t考核人ID: {row[5]}")
print("-" * 50)
except Exception as e:
print(e)
def query_employees_after_hire_date():
"""
查询在某个入职日期之后的员工
:return:
"""
# 获取用户输入的日期
hire_date = input("请输入入职日期格式YYYY-MM-DD")
try:
# 根据入职日期筛选员工,查找入职日期大于输入日期的员工
sql = "SELECT * FROM Employee WHERE HireDate > '%s'" % hire_date
cur.execute(sql)
res = cur.fetchall()
if res:
print("入职日期之后的员工信息:")
print("员工ID\t\t姓名\t\t性别\t\t职位\t\t部门ID\t联系方式\t入职日期\t状态")
print("-" * 50)
for row in res:
print(f"{row[0]}\t\t{row[1]}\t\t{row[2]}\t\t{row[3]}\t\t{row[4]}\t\t{row[5]}\t{row[6]}\t\t{row[7]}")
print("-" * 50)
else:
print("没有找到符合条件的员工!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_attendance_by_status():
"""
查询某个考勤状态的考勤记录
:return:
"""
# 获取用户输入的考勤状态
attendance_status = input("请输入考勤状态(如:正常、迟到、早退、缺勤):")
try:
# 根据考勤状态筛选考勤记录
sql = "SELECT * FROM Attendance WHERE AttendanceStatus = '%s'" % attendance_status
cur.execute(sql)
res = cur.fetchall()
if res:
print(f"考勤状态为'{attendance_status}'的考勤记录:")
print("考勤ID\t\t员工ID\t考勤日期\t上班时间\t下班时间\t工作时长\t加班时长\t考勤状态")
print("-" * 50)
for row in res:
print(f"{row[0]}\t{row[1]}\t{row[2]}\t{row[3]}\t{row[4]}\t{row[5]}\t{row[6]}\t{row[7]}")
print("-" * 50)
else:
print(f"没有找到考勤状态为'{attendance_status}'的考勤记录!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def query_department_manager():
"""
查询某个部门的主管名称
:return:
"""
print("-" * 50)
print("查询某个部门的主管名称")
# 获取用户输入的部门ID
department_id = input("请输入要查询的部门ID")
try:
# 根据部门ID查询主管的EmployeeIDManagerID
sql_department = "SELECT ManagerID FROM Department WHERE DepartmentID = '%s'" % department_id
cur.execute(sql_department)
res = cur.fetchone()
if res:
manager_id = res[0]
print(f"部门ID {department_id} 的主管ID是{manager_id}")
# 根据ManagerID查询主管的姓名
sql_employee = "SELECT Name FROM Employee WHERE EmployeeID = '%s'" % manager_id
cur.execute(sql_employee)
manager_info = cur.fetchone()
if manager_info:
print(f"部门ID {department_id} 的主管姓名是:{manager_info[0]}")
else:
print("没有找到该主管的信息!")
else:
print("没有找到该部门的信息!")
print("-" * 50)
except Exception as e:
print(f"查询失败: {e}")
def close():
"""
关闭数据库
:return:
"""
cur.close()
conn.conn.close()
Loading…
Cancel
Save