You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoftwareMethodology/src/DjangoBlog-master/oauth/views.py

325 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import logging
# 定义视图函数(处理用户请求的逻辑)
from urllib.parse import urlparse # 用于解析URL验证跳转地址合法性
from django.conf import settings
from django.contrib.auth import get_user_model # 获取自定义用户模型
from django.contrib.auth import login # 用于用户登录
from django.core.exceptions import ObjectDoesNotExist # 用于捕获对象不存在异常
from django.db import transaction # 用于数据库事务,确保操作原子性
from django.http import HttpResponseForbidden # 用于返回403禁止访问响应
from django.http import HttpResponseRedirect # 用于重定向响应
from django.shortcuts import get_object_or_404 # 获取对象不存在则返回404
from django.shortcuts import render # 用于渲染模板
from django.urls import reverse # 用于反向解析URL
from django.utils import timezone # 用于获取当前时间
from django.utils.translation import gettext_lazy as _ # 用于国际化翻译
from django.views.generic import FormView # 基于类的表单视图
from djangoblog.blog_signals import oauth_user_login_signal # 导入OAuth登录信号
from djangoblog.utils import get_current_site # 获取当前网站域名
from djangoblog.utils import send_email, get_sha256 # 导入发送邮件和SHA256加密工具
from oauth.forms import RequireEmailForm # 导入补充邮箱表单
from .models import OAuthUser # 导入OAuth用户模型
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException # 导入OAuth管理器和异常
# 创建当前模块的日志记录器
logger = logging.getLogger(__name__)
def get_redirecturl(request):
"""
处理授权后的跳转URL确保安全性只允许本站内跳转
"""
# 从请求参数中获取跳转地址默认值为None
nexturl = request.GET.get('next_url', None)
# 过滤非法跳转地址(如登录页)
if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/'
return nexturl
# 解析URL检查是否为本站域名
p = urlparse(nexturl)
if p.netloc: # 若URL包含域名
site = get_current_site().domain # 获取当前网站域名
# 去除www.前缀后比较,确保跳转地址为本站
if not p.netloc.replace('www.', '') == site.replace('www.', ''):
logger.info('非法url:' + nexturl) # 记录非法URL日志
return "/" # 非法则跳转到首页
return nexturl # 返回合法的跳转地址
def oauthlogin(request):
"""
OAuth登录入口生成第三方平台的授权链接引导用户跳转
"""
# 获取请求中的平台类型(如'weibo'、'github'
type = request.GET.get('type', None)
if not type: # 若未指定平台类型,跳转到首页
return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type)
if not manager: # 若管理器不存在,跳转到首页
return HttpResponseRedirect('/')
# 获取安全的跳转地址(授权后返回的页面)
nexturl = get_redirecturl(request)
# 生成第三方平台的授权URL
authorizeurl = manager.get_authorization_url(nexturl)
# 重定向到第三方平台的授权页面
return HttpResponseRedirect(authorizeurl)
def authorize(request):
"""
OAuth授权回调处理解析授权码、获取用户信息、绑定本地用户
"""
# 获取平台类型
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type)
if not manager:
return HttpResponseRedirect('/')
# 获取授权码code第三方平台返回
code = request.GET.get('code', None)
try:
# 使用授权码获取access_token
rsp = manager.get_access_token_by_code(code)
except OAuthAccessTokenException as e:
# 捕获token获取失败异常记录日志并跳转首页
logger.warning("OAuthAccessTokenException:" + str(e))
return HttpResponseRedirect('/')
except Exception as e:
# 捕获其他异常,记录日志
logger.error(e)
rsp = None
# 获取授权后的跳转地址
nexturl = get_redirecturl(request)
if not rsp: # 若获取token失败重新生成授权链接并重定向
return HttpResponseRedirect(manager.get_authorization_url(nexturl))
# 通过token获取第三方用户信息
user = manager.get_oauth_userinfo()
if user: # 若成功获取用户信息
# 处理空昵称生成默认昵称djangoblog+时间戳)
if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
# 检查该第三方用户是否已存在(避免重复创建)
try:
temp = OAuthUser.objects.get(type=type, openid=user.openid)
# 更新已有用户的头像、元数据、昵称
temp.picture = user.picture
temp.metadata = user.metadata
temp.nickname = user.nickname
user = temp # 使用已有用户对象
except ObjectDoesNotExist:
# 若不存在,则使用新创建的用户对象
pass
# Facebook的token过长这里清空根据实际需求处理
if type == 'facebook':
user.token = ''
# 若用户信息包含邮箱,直接绑定本地用户
if user.email:
with transaction.atomic(): # 数据库事务:确保操作原子性
author = None
# 尝试获取已关联的本地用户
try:
author = get_user_model().objects.get(id=user.author_id)
except ObjectDoesNotExist:
pass
# 若未关联本地用户,则创建或获取本地用户
if not author:
# 按邮箱查询或创建本地用户
result = get_user_model().objects.get_or_create(email=user.email)
author = result[0]
# 若为新创建的用户,设置用户名和来源
if result[1]:
try:
# 检查昵称是否已存在
get_user_model().objects.get(username=user.nickname)
except ObjectDoesNotExist:
# 昵称不存在,直接使用
author.username = user.nickname
else:
# 昵称已存在,生成默认用户名
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize' # 标记来源为OAuth授权
author.save() # 保存用户信息
# 关联本地用户到OAuthUser
user.author = author
user.save()
# 发送OAuth用户登录信号供其他模块监听处理
oauth_user_login_signal.send(
sender=authorize.__class__, id=user.id)
# 登录用户(创建会话)
login(request, author)
# 重定向到授权前的页面
return HttpResponseRedirect(nexturl)
else:
# 若用户信息不含邮箱保存OAuthUser并跳转到补充邮箱页面
user.save()
url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
return HttpResponseRedirect(url)
else:
# 若未获取到用户信息,跳转到授权前的页面
return HttpResponseRedirect(nexturl)
def emailconfirm(request, id, sign):
"""
邮箱验证:验证签名合法性,完成本地用户绑定
"""
if not sign: # 签名为空返回403禁止访问
return HttpResponseForbidden()
# 验证签名使用SECRET_KEY+ID+SECRET_KEY生成SHA256签名
if not get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY).upper() == sign.upper():
return HttpResponseForbidden() # 签名不匹配返回403
# 获取对应的OAuthUser对象不存在则返回404
oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic(): # 数据库事务
# 检查是否已关联本地用户
if oauthuser.author:
author = get_user_model().objects.get(pk=oauthuser.author_id)
else:
# 按邮箱查询或创建本地用户
result = get_user_model().objects.get_or_create(email=oauthuser.email)
author = result[0]
# 若为新创建的用户,设置用户名和来源
if result[1]:
author.source = 'emailconfirm' # 标记来源为邮箱验证
# 设置用户名为OAuth用户的昵称为空则生成默认值
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip() else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.save()
# 关联本地用户到OAuthUser
oauthuser.author = author
oauthuser.save()
# 发送OAuth用户登录信号
oauth_user_login_signal.send(
sender=emailconfirm.__class__, id=oauthuser.id)
# 登录用户
login(request, author)
# 发送绑定成功邮件
site = 'http://' + get_current_site().domain
content = _('''
<p>Congratulations, you have successfully bound your email address. You can use
%(oauthuser_type)s to directly log in to this website without a password.</p>
You are welcome to continue to follow this site, the address is
<a href="%(site)s" rel="bookmark">%(site)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
%(site)s
''') % {'oauthuser_type': oauthuser.type, 'site': site}
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content)
# 重定向到绑定成功页面
url = reverse('oauth:bindsuccess', kwargs={'oauthid': id})
url = url + '?type=success'
return HttpResponseRedirect(url)
class RequireEmailView(FormView):
"""
补充邮箱视图:处理用户补充邮箱的表单提交
"""
form_class = RequireEmailForm # 使用的表单类
template_name = 'oauth/require_email.html' # 渲染的模板
def get(self, request, *args, **kwargs):
"""处理GET请求展示补充邮箱表单"""
oauthid = self.kwargs['oauthid'] # 从URL中获取OAuthUser的ID
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
# 若已存在邮箱,可在此处添加逻辑(如跳转到首页)
# if oauthuser.email:
# return HttpResponseRedirect('/')
return super(RequireEmailView, self).get(request, *args, **kwargs)
def get_initial(self):
"""设置表单初始值填充oauthid"""
oauthid = self.kwargs['oauthid']
return {'email': '', 'oauthid': oauthid}
def get_context_data(self,** kwargs):
"""向模板传递额外上下文:用户头像"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture:
kwargs['picture'] = oauthuser.picture # 传递头像URL
return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form):
"""处理表单验证通过的逻辑:保存邮箱并发送验证邮件"""
email = form.cleaned_data['email'] # 获取清洗后的邮箱
oauthid = form.cleaned_data['oauthid'] # 获取OAuthUser的ID
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
# 保存用户邮箱
oauthuser.email = email
oauthuser.save()
# 生成邮箱验证签名
sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
# 获取当前网站域名开发环境使用127.0.0.1:8000
site = get_current_site().domain
if settings.DEBUG:
site = '127.0.0.1:8000'
# 生成邮箱验证链接
path = reverse('oauth:email_confirm', kwargs={'id': oauthid, 'sign': sign})
url = "http://{site}{path}".format(site=site, path=path)
# 发送邮箱验证邮件
content = _("""
<p>Please click the link below to bind your email</p>
<a href="%(url)s" rel="bookmark">%(url)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
<br />
%(url)s
""") % {'url': url}
send_email(emailto=[email, ], title=_('Bind your email'), content=content)
# 重定向到绑定成功页面(提示用户查收邮件)
url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauthid})
url = url + '?type=email'
return HttpResponseRedirect(url)
def bindsuccess(request, oauthid):
"""
绑定成功页面:展示绑定结果信息
"""
type = request.GET.get('type', None) # 获取类型email待验证success已验证
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) # 获取对应的OAuthUser
# 根据类型设置页面标题和内容
if type == 'email':
title = _('Bind your email')
content = _(
'Congratulations, the binding is just one step away. '
'Please log in to your email to check the email to complete the binding. Thank you.')
else:
title = _('Binding successful')
content = _(
"Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s"
" to directly log in to this website without a password. You are welcome to continue to follow this site." % {
'oauthuser_type': oauthuser.type})
# 渲染绑定成功页面
return render(request, 'oauth/bindsuccess.html', {
'title': title,
'content': content
})