用户模块完成

master
BINGWU 10 months ago
parent 4a3f7bf5d0
commit 4901734d9d

@ -0,0 +1,18 @@
def execute_before(func):
def wrapper(*args, **kwargs):
# 在这里执行需要提前运行的代码
print("Executing code before the function")
# 然后调用原始函数
return func(*args, **kwargs)
return wrapper
@execute_before
def my_function(age):
print("My function")
print(age)
# 调用my_function将会先执行装饰器中的代码然后再执行my_function
my_function(9999)

@ -1,12 +1,35 @@
from mongoengine import Document, EmbeddedDocument, fields
class SubjectModel(EmbeddedDocument):
subjectName = fields.StringField()
subjectScore = fields.IntField()
class ChildrenModel(EmbeddedDocument):
index = fields.StringField(max_length=30)
name = fields.StringField(max_length=30)
class AsideDataModel(EmbeddedDocument):
icon = fields.StringField(max_length=50)
index = fields.StringField(max_length=30)
name = fields.StringField(max_length=30)
children = fields.ListField(fields.EmbeddedDocumentField(ChildrenModel))
class MetaModel(EmbeddedDocument):
name = fields.StringField(max_length=30)
url = fields.StringField(max_length=30)
class RouterDataModel(EmbeddedDocument):
name = fields.StringField(max_length=30)
url = fields.StringField(max_length=30)
path = fields.StringField(max_length=30)
meta = fields.EmbeddedDocumentField(MetaModel)
class UserModel(Document):
avatar = fields.StringField(max_length=30)
username = fields.StringField(max_length=30)
password = fields.StringField(max_length=30)
subjects = fields.ListField(fields.EmbeddedDocumentField(SubjectModel))
authData = fields.StringField(max_length=50)
asideData = fields.ListField(fields.EmbeddedDocumentField(AsideDataModel))
RouterData = fields.ListField(fields.EmbeddedDocumentField(RouterDataModel))
userType = fields.StringField(max_length=30)

@ -4,17 +4,14 @@ from jwt import ExpiredSignatureError
import jwt
def get_jwt_token(user_name, role_data='default'):
def get_jwt_token(username, password, role_data='default'):
"""
生成jwt-token
:param unit_name:
:param role_data:
:return:
"""
payload = {
'exp': datetime.utcnow() + timedelta(seconds=10), # 单位秒
'exp': datetime.utcnow() + timedelta(seconds=300), # 单位秒
'iat': datetime.utcnow(),
'data': {'username': user_name, 'role_data': role_data}
'data': {'username': username, 'password': password, 'role_data': role_data}
}
encoded_jwt = jwt.encode(payload, JWT_SECRET_KEY, algorithm=JWT_ALGORITHM)
return str(encoded_jwt)
@ -22,12 +19,11 @@ def get_jwt_token(user_name, role_data='default'):
def decode_jwt_token(encoded_jwt):
try:
# 关闭过期时间检验
decoded_token = jwt.decode(encoded_jwt, JWT_SECRET_KEY, algorithms=['HS256'])
return decoded_token
return {'code': 2000}
except ExpiredSignatureError:
# JWT 令牌已过期
return {'error': 'JWT token has expired'}
return {'error': 'token已过期', 'code': 1000}
except jwt.InvalidTokenError:
# 其他 JWT 令牌验证错误
return {'error': 'Invalid JWT token'}
return {'error': 'token不可用', 'code': 1000}

@ -5,30 +5,34 @@ from serve.utils.connectdb import connectdb
from bson.objectid import ObjectId
from serve.middleware.handleErrorMethod import handleErrorMethod
from serve.middleware.handleEmpty import handleEmpty
from serve.utils.pyjwt import get_jwt_token,decode_jwt_token
from serve.utils.pyjwt import get_jwt_token, decode_jwt_token
collection = connectdb('user_model')
def handleUserData(user):
user['_id'] = str(user['_id'])
return user
# 解码字节字符串并解析为 JSON 数据
def decodeBody(request):
body_str = request.body.decode('utf-8')
data = json.loads(body_str)
return data
def createUser(request):
result = handleErrorMethod(request.method, 'POST')
if result[0]:
data = json.loads(request.body)
print(data)
data1 = {
'username': 'Alice',
'password': '123',
'subjects': [
{
'subjectName': '1234',
'subjectScore': 90
}
ret = decode_jwt_token(request.META.get('HTTP_AUTHORIZATION'))
if ret['code'] == 2000:
data = decodeBody(request)
collection.insert_one(data)
return JsonResponse({"code": 2000, "msg": "添加成功"})
else:
return JsonResponse(ret)
]
}
collection.insert_one(data1)
return JsonResponse({"code": 2000, "msg": "添加成功"})
else:
return JsonResponse(result[1])
@ -36,11 +40,17 @@ def createUser(request):
def deleteUser(request):
result = handleErrorMethod(request.method, 'DELETE')
if result[0]:
_ids = ['660188e85f5269231b716281']
filter_criteria = {'_id': {'$in': [ObjectId(_id) for _id in _ids]}}
deleteResult = collection.delete_many(filter_criteria)
res = handleEmpty(deleteResult.deleted_count, '删除')
return JsonResponse(res)
ret = decode_jwt_token(request.META.get('HTTP_AUTHORIZATION'))
if ret['code'] == 2000:
data = decodeBody(request)
_ids = list(data['_ids'])
filter_criteria = {'_id': {'$in': [ObjectId(_id) for _id in _ids]}}
deleteResult = collection.delete_many(filter_criteria)
res = handleEmpty(deleteResult.deleted_count, '删除')
return JsonResponse(res)
else:
return JsonResponse(ret)
else:
return JsonResponse(result[1])
@ -48,14 +58,18 @@ def deleteUser(request):
def updateUser(request):
result = handleErrorMethod(request.method, 'PUT')
if result[0]:
_id = '6601b0749157ae8a26e2af7b'
update_data = {
'username': 'juuuujjj',
'password': 'hhhhuuuuh3'
}
updateResult = collection.update_one({'_id': ObjectId(_id)}, {"$set": update_data})
res = handleEmpty(updateResult.modified_count, '更新')
return JsonResponse(res)
ret = decode_jwt_token(request.META.get('HTTP_AUTHORIZATION'))
if ret['code'] == 2000:
data = decodeBody(request)
_id = data['_id']
update_data = data['userData']
print(_id)
updateResult = collection.update_one({'_id': ObjectId(_id)}, {"$set": update_data})
res = handleEmpty(updateResult.modified_count, '更新用户')
return JsonResponse(res)
else:
return JsonResponse(ret)
else:
return JsonResponse(result[1])
@ -63,36 +77,75 @@ def updateUser(request):
def getUser(request):
result = handleErrorMethod(request.method, 'GET')
if result[0]:
_id = '660195d50d5cb377419134f7' # 替换为你要查找的文档的 _id
user = collection.find_one({'_id': ObjectId(_id)})
if user:
user['_id'] = str(user['_id'])
return JsonResponse(user)
ret = decode_jwt_token(request.META.get('HTTP_AUTHORIZATION'))
if ret['code'] == 2000:
_id = request.GET.get('_id', None)
user = collection.find_one({'_id': ObjectId(_id)})
if user:
user['_id'] = str(user['_id'])
return JsonResponse(user)
else:
return JsonResponse({"code": 1000, "msg": "数据不存在"})
else:
return JsonResponse({"code": 1000, "msg": "数据不存在"})
return JsonResponse(ret)
else:
return JsonResponse(result[1])
def getAllUser(request):
result = handleErrorMethod(request.method, 'GET')
if result[0]:
ret = decode_jwt_token(request.META.get('HTTP_AUTHORIZATION'))
if ret['code'] == 2000:
pageSize = int(request.GET.get('pageSize', None))
pageIndex = int(request.GET.get('pageIndex', None))
# 计算要跳过的文档数量
skip_count = (pageIndex - 1) * pageSize
# 执行分页查询
cursor = collection.find().skip(skip_count).limit(pageSize)
# 将查询结果转换为列表
result = [{**doc, '_id': str(doc['_id'])} for doc in cursor]
# 构建JSON格式的响应
response_data = {
'total': collection.count_documents({}), # 获取总文档数
'data': result,
'code': 2000
}
return JsonResponse(response_data)
else:
return JsonResponse(ret)
else:
return JsonResponse(result[1])
def loginUser(request):
result = handleErrorMethod(request.method, 'POST')
token = get_jwt_token('abc')
if result[0]:
data = decodeBody(request)
query = {
"$and": [
{"username": data['username']},
{"password": data['password']}
]
}
user = collection.find_one(query)
if user:
token = get_jwt_token(data['username'], data['password'])
return JsonResponse({"code": 2000, "msg": "登陆成功", "token": token})
else:
return JsonResponse({"code": 1000, "msg": "数据不存在"})
return JsonResponse({"code": 2000, "msg": "登陆成功", "token": token})
else:
return JsonResponse(result[1])
def testUser(request):
result = handleErrorMethod(request.method, 'POST')
data = decode_jwt_token(
"eyJhbGciOiJIUzI1NiIsInR5cCsI6IkpXVCJ9.eyJleHAiOjE3MTE1MDIzMDcsImlhdCI6MTcxMTUwMjI5NywiZGF0YSI6eyJ1c2VybmFtZSI6ImFiYyIsInJvbGVfZGF0YSI6ImRlZmF1bHQifX0.KYpzmz0RWqyAMI3P7UN2I_TdrZoNaXafrFcZuSajsqQ")
print(data)
token = request.META.get('HTTP_AUTHORIZATION')
data = decode_jwt_token(token)
if result[0]:
return JsonResponse(data)
else:
return JsonResponse(result[1])
@ -103,6 +156,7 @@ user_url = [
path('user/delete', deleteUser),
path('user/update', updateUser),
path('user/get', getUser),
path('user/get-all', getAllUser),
path('user/login', loginUser),
path('user/test', testUser),
]

Loading…
Cancel
Save