You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
tentest/doc/DjangoBlog/oauth/views.py

211 lines
8.8 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import logging
#zwz: Create your views here.
from urllib.parse import urlparse
#zwz: 导入Django核心模块、工具及项目内模块
from django.conf import settings
from django.contrib.auth import get_user_model, login
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.http import HttpResponseForbidden, HttpResponseRedirect
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.generic import FormView
from djangoblog.blog_signals import oauth_user_login_signal
from djangoblog.utils import get_current_site, send_email, get_sha256
from oauth.forms import RequireEmailForm
from .models import OAuthUser
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException
logger = logging.getLogger(__name__)  # module-level logger used by the views below
#zwz: 处理重定向URL过滤非法地址返回安全的跳转路径
def _strip_www(host):
    """Return ``host`` lower-cased with one leading ``www.`` label removed."""
    host = host.lower()
    return host[4:] if host.startswith('www.') else host


def get_redirecturl(request):
    """Return a safe post-login redirect target taken from ``next_url``.

    Reads ``next_url`` from the query string. Falls back to ``'/'`` when the
    parameter is missing/empty, points at the login page, cannot be parsed,
    or names a host other than the current site (ignoring case and a leading
    ``www.``). Otherwise the original ``next_url`` value is returned.
    """
    nexturl = request.GET.get('next_url', None)
    if not nexturl or nexturl in ('/login/', '/login'):
        return '/'
    try:
        # Browsers treat "\" like "/", so "/\evil.com" is effectively the
        # protocol-relative "//evil.com"; normalise before inspecting the host.
        p = urlparse(nexturl.replace('\\', '/'))
    except ValueError:
        # urlparse rejects some malformed inputs (e.g. unbalanced IPv6 brackets).
        logger.info('非法url:' + nexturl)
        return '/'
    if p.netloc:
        site = get_current_site().domain
        # Only a *leading* "www." may be ignored: stripping the substring
        # anywhere would let "exawww.mple.com" pass as "example.com".
        if _strip_www(p.netloc) != _strip_www(site):
            logger.info('非法url:' + nexturl)
            return "/"
    return nexturl
#zwz: 第三方登录入口:根据平台类型获取授权链接
def oauthlogin(request):
    """Start a third-party login by redirecting to the provider's authorize URL.

    The provider is chosen by the ``type`` query parameter. When it is missing
    or no manager is registered for it, the user is redirected to ``'/'``.
    """
    oauth_type = request.GET.get('type', None)
    # Resolve the manager only when a type was supplied.
    manager = get_manager_by_type(oauth_type) if oauth_type else None
    if not manager:
        return HttpResponseRedirect('/')
    # The sanitised next_url is handed to the manager when building the link.
    return HttpResponseRedirect(
        manager.get_authorization_url(get_redirecturl(request)))
#zwz: 授权回调处理:获取用户信息并绑定本地账号
def authorize(request):
    """Handle the provider callback: fetch the OAuth user and log them in.

    Flow, as implemented below:
      1. Resolve the manager from the ``type`` query parameter (redirect to
         ``'/'`` if absent/unknown).
      2. Exchange ``code`` for an access token; on
         ``OAuthAccessTokenException`` redirect to ``'/'``, on any other error
         or an empty response restart the authorization.
      3. Load the provider's user info, merge it into an existing
         ``OAuthUser`` row when one matches ``type`` + ``openid``.
      4. With an email: link (or create) the local user, save, send the login
         signal, log in and redirect to the sanitised ``next_url``.
         Without an email: save and redirect to the require-email page.
    """
    oauth_type = request.GET.get('type', None)
    if not oauth_type:
        return HttpResponseRedirect('/')
    manager = get_manager_by_type(oauth_type)
    if not manager:
        return HttpResponseRedirect('/')

    # Exchange the authorization code for an access token.
    code = request.GET.get('code', None)
    try:
        rsp = manager.get_access_token_by_code(code)
    except OAuthAccessTokenException as e:
        logger.warning("OAuthAccessTokenException:" + str(e))
        return HttpResponseRedirect('/')
    except Exception as e:
        logger.error(e)
        rsp = None

    nexturl = get_redirecturl(request)
    if not rsp:
        # No token: send the user through the authorization step again.
        return HttpResponseRedirect(manager.get_authorization_url(nexturl))

    user = manager.get_oauth_userinfo()
    if not user:
        return HttpResponseRedirect(nexturl)

    # Fill in a generated nickname when the provider gave none / only blanks.
    if not (user.nickname and user.nickname.strip()):
        user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')

    # Reuse the stored record for this provider account if there is one,
    # refreshing the fields that come from the provider.
    try:
        stored = OAuthUser.objects.get(type=oauth_type, openid=user.openid)
        stored.picture = user.picture
        stored.metadata = user.metadata
        stored.nickname = user.nickname
        user = stored
    except ObjectDoesNotExist:
        pass

    # Facebook tokens are too long to store, so the token is blanked.
    if oauth_type == 'facebook':
        user.token = ''

    if not user.email:
        # No email yet: persist the OAuth user and ask for an address.
        user.save()
        url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
        return HttpResponseRedirect(url)

    with transaction.atomic():
        user_model = get_user_model()
        try:
            author = user_model.objects.get(id=user.author_id)
        except ObjectDoesNotExist:
            author = None
        if not author:
            author, created = user_model.objects.get_or_create(email=user.email)
            if created:
                # Use the nickname as username unless it is already taken.
                try:
                    user_model.objects.get(username=user.nickname)
                except ObjectDoesNotExist:
                    author.username = user.nickname
                else:
                    author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
                author.source = 'authorize'
                author.save()

        # Link the OAuth record to the local account and log in.
        user.author = author
        user.save()
        oauth_user_login_signal.send(sender=authorize.__class__, id=user.id)
        login(request, author)
        return HttpResponseRedirect(nexturl)
#zwz: 邮箱验证:确认邮箱并完成绑定
def emailconfirm(request, id, sign):
    """Confirm an email binding link, attach a local user and log them in.

    Args:
        request: the current HTTP request.
        id: primary key of the ``OAuthUser`` being confirmed.
        sign: signature from the link; must equal (case-insensitively)
            ``get_sha256(SECRET_KEY + str(id) + SECRET_KEY)``.

    Returns:
        ``HttpResponseForbidden`` when the signature is missing or wrong,
        otherwise a redirect to the bind-success page with ``?type=success``.
        A 404 is raised when no ``OAuthUser`` has this ``id``.
    """
    import hmac  # local import so this block does not depend on file-level imports

    if not sign:
        return HttpResponseForbidden()
    expected = get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY)
    # Constant-time comparison avoids leaking how many leading characters matched.
    if not hmac.compare_digest(expected.upper().encode('utf-8'),
                               sign.upper().encode('utf-8')):
        return HttpResponseForbidden()
    # NOTE(review): the signature covers only the id, not the email address.
    # RequireEmailView.form_valid overwrites OAuthUser.email on every submit, so
    # a link issued for one address still validates after the stored email has
    # been changed — consider signing the email too (needs a matching change
    # where the link is generated). TODO confirm and fix in both places.
    oauthuser = get_object_or_404(OAuthUser, pk=id)
    with transaction.atomic():
        # Reuse the already-linked local user, otherwise get/create one by email.
        if oauthuser.author:
            author = get_user_model().objects.get(pk=oauthuser.author_id)
        else:
            result = get_user_model().objects.get_or_create(email=oauthuser.email)
            author = result[0]
            if result[1]:
                author.source = 'emailconfirm'
                # Tolerate a missing nickname instead of raising AttributeError.
                author.username = (oauthuser.nickname or '').strip() \
                    or "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
                author.save()
        oauthuser.author = author
        oauthuser.save()
    # Notify listeners and log the user in.
    oauth_user_login_signal.send(sender=emailconfirm.__class__, id=oauthuser.id)
    login(request, author)
    # Send the "binding succeeded" email; the link now actually points at the
    # site (the href was previously blank).
    site = 'http://' + get_current_site().domain
    content = _(
        '<p>恭喜您成功绑定邮箱,可直接用%(oauthuser_type)s登录本站。</p >\n'
        '欢迎关注本站:<a href="%(site)s">%(site)s</a >'
    ) % {'oauthuser_type': oauthuser.type, 'site': site}
    send_email(emailto=[oauthuser.email, ], title=_('绑定成功!'), content=content)
    return HttpResponseRedirect(reverse('oauth:bindsuccess', kwargs={'oauthid': id}) + '?type=success')
#zwz: 邮箱绑定表单页:处理无邮箱用户的邮箱填写
class RequireEmailView(FormView):
    """Form page that collects an email address for an ``OAuthUser``.

    On a valid submit the address is stored on the ``OAuthUser`` and a signed
    confirmation link is emailed to it.
    """
    form_class = RequireEmailForm
    template_name = 'oauth/require_email.html'

    def get(self, request, *args, **kwargs):
        """Render the form; raise 404 when the ``oauthid`` in the URL is unknown."""
        oauthid = self.kwargs['oauthid']
        get_object_or_404(OAuthUser, pk=oauthid)
        return super().get(request, *args, **kwargs)

    def get_initial(self):
        """Pre-fill the form with an empty email and the ``oauthid`` from the URL."""
        return {'email': '', 'oauthid': self.kwargs['oauthid']}

    def get_context_data(self, **kwargs):
        """Add the OAuth user's ``picture`` to the template context when set."""
        oauthuser = get_object_or_404(OAuthUser, pk=self.kwargs['oauthid'])
        if oauthuser.picture:
            kwargs['picture'] = oauthuser.picture
        return super().get_context_data(**kwargs)

    def form_valid(self, form):
        """Store the submitted email, send the confirmation mail and redirect.

        Returns a redirect to the bind-success page with ``?type=email``.
        """
        email = form.cleaned_data['email']
        oauthid = form.cleaned_data['oauthid']
        oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
        oauthuser.email = email
        oauthuser.save()
        # Signature checked by the email-confirm view for this OAuth user id.
        sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
        # In DEBUG the link targets the local development server.
        site = get_current_site().domain if not settings.DEBUG else '127.0.0.1:8000'
        path = reverse('oauth:email_confirm', kwargs={'id': oauthid, 'sign': sign})
        url = f"http://{site}{path}"
        # Keep the msgid constant and interpolate afterwards: an f-string inside
        # _() bakes the URL into the msgid, so it can never match a translation.
        content = _('<p>请点击链接绑定邮箱:<a href="%(url)s">%(url)s</a ></p >') % {'url': url}
        send_email(emailto=[email, ], title=_('绑定邮箱'), content=content)
        return HttpResponseRedirect(reverse('oauth:bindsuccess', kwargs={'oauthid': oauthid}) + '?type=email')
#zwz: 绑定结果页:显示成功提示
def bindsuccess(request, oauthid):
    """Render the binding result page for the given ``OAuthUser``.

    The ``type`` query parameter selects the message: ``'email'`` shows the
    "check your mailbox" prompt, anything else the success message. Raises
    404 when no ``OAuthUser`` has primary key ``oauthid``.
    """
    bind_type = request.GET.get('type', None)
    oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
    if bind_type == 'email':
        title = _('绑定邮箱')
        content = _('请登录邮箱完成最后一步绑定')
    else:
        title = _('绑定成功')
        # Constant msgid with a placeholder; an f-string inside _() would make
        # the msgid depend on the provider name and defeat translation lookup.
        content = _("已用%(oauth_type)s绑定账号,可直接登录") % {'oauth_type': oauthuser.type}
    return render(request, 'oauth/bindsuccess.html', {'title': title, 'content': content})