|
|
import logging
|
|
|
#zwz: Create your views here.
|
|
|
from urllib.parse import urlparse
|
|
|
|
|
|
#zwz: Import Django core modules, utilities, and project-local modules
|
|
|
from django.conf import settings
|
|
|
from django.contrib.auth import get_user_model, login
|
|
|
from django.core.exceptions import ObjectDoesNotExist
|
|
|
from django.db import transaction
|
|
|
from django.http import HttpResponseForbidden, HttpResponseRedirect
|
|
|
from django.shortcuts import get_object_or_404, render
|
|
|
from django.urls import reverse
|
|
|
from django.utils import timezone
|
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
from django.views.generic import FormView
|
|
|
|
|
|
from djangoblog.blog_signals import oauth_user_login_signal
|
|
|
from djangoblog.utils import get_current_site, send_email, get_sha256
|
|
|
from oauth.forms import RequireEmailForm
|
|
|
from .models import OAuthUser
|
|
|
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException
|
|
|
|
|
|
logger = logging.getLogger(__name__)  # Module-level logger, named after this module
|
|
|
|
|
|
|
|
|
#zwz: Redirect URL handling: filter out illegal addresses and return a safe redirect path
|
|
|
def get_redirecturl(request):
    """Return a safe post-login redirect target taken from ``?next_url=``.

    ``next_url`` is user-controlled, so it is only returned when it is a
    relative path or an http(s) URL whose host is the current site (with or
    without a leading ``www.``). Anything else falls back to the home page.

    Args:
        request: The current request; only ``request.GET['next_url']`` is read.

    Returns:
        ``'/'`` when ``next_url`` is missing, empty, points at the login page,
        or fails validation; otherwise ``next_url`` unchanged.
    """
    nexturl = request.GET.get('next_url', None)
    # No target, or a target that would loop back to the login page: go home.
    if not nexturl or nexturl in ('/login/', '/login'):
        return '/'

    # Function-scope import so this block does not depend on the file's
    # import section being edited.
    from django.utils.http import url_has_allowed_host_and_scheme

    # Accept the site's domain in both its bare and "www." forms. Only a
    # *leading* "www." is stripped: removing it anywhere in the host would
    # make e.g. "exawww.mple.com" compare equal to "example.com".
    site = get_current_site().domain
    bare_domain = site[len('www.'):] if site.startswith('www.') else site
    allowed_hosts = {site, bare_domain, 'www.' + bare_domain}

    # Checking urlparse().netloc alone is not enough: browsers also follow
    # "/\evil.com", "///evil.com" and "https:evil.com" off-site even though
    # urlparse reports no netloc for them. Django's helper covers these.
    if not url_has_allowed_host_and_scheme(nexturl, allowed_hosts=allowed_hosts):
        logger.info('非法url:%s', nexturl)
        return '/'
    return nexturl
|
|
|
|
|
|
|
|
|
#zwz: Third-party login entry point: obtain the authorization link for the given platform type
|
|
|
def oauthlogin(request):
    """Send the visitor to the chosen OAuth provider's authorization page.

    The provider is selected by the ``type`` query parameter. When it is
    missing, or no manager is registered for it, the visitor is redirected to
    the home page instead.

    Returns:
        An ``HttpResponseRedirect`` to the provider's authorization URL, or
        to ``'/'`` when the provider cannot be resolved.
    """
    platform = request.GET.get('type', None)
    # Only look up a manager when a platform name was actually supplied.
    manager = get_manager_by_type(platform) if platform else None
    if not manager:
        return HttpResponseRedirect('/')

    # The validated post-login target is handed to the manager when it
    # builds the authorization URL.
    target = get_redirecturl(request)
    return HttpResponseRedirect(manager.get_authorization_url(target))
|
|
|
|
|
|
|
|
|
#zwz: Authorization callback: fetch the third-party user info and bind it to a local account
|
|
|
def authorize(request):
    """Handle the OAuth provider callback and sign the visitor in.

    Reads ``type`` and ``code`` from the query string, exchanges the code for
    an access token and loads the provider-side profile. If the profile has
    an e-mail address, the linked local account is looked up (or created) and
    logged in. Otherwise the profile is saved and the visitor is sent to the
    e-mail collection page.

    Returns:
        An ``HttpResponseRedirect`` in every case.
    """
    platform = request.GET.get('type', None)
    manager = get_manager_by_type(platform) if platform else None
    if not manager:
        return HttpResponseRedirect('/')

    auth_code = request.GET.get('code', None)
    # Exchange the authorization code for an access token.
    try:
        token_response = manager.get_access_token_by_code(auth_code)
    except OAuthAccessTokenException as exc:
        logger.warning("OAuthAccessTokenException:" + str(exc))
        return HttpResponseRedirect('/')
    except Exception as exc:
        # Any other failure is logged and treated as "no token".
        logger.error(exc)
        token_response = None

    nexturl = get_redirecturl(request)
    if not token_response:
        # No token: restart the authorization round-trip.
        return HttpResponseRedirect(manager.get_authorization_url(nexturl))

    oauth_user = manager.get_oauth_userinfo()
    if not oauth_user:
        return HttpResponseRedirect(nexturl)

    # Fill in a generated nickname when the provider gave none (or only blanks).
    if not (oauth_user.nickname and oauth_user.nickname.strip()):
        oauth_user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')

    # Reuse the stored record for this provider/openid pair when there is
    # one, refreshing its profile fields from the fresh provider data.
    try:
        stored = OAuthUser.objects.get(type=platform, openid=oauth_user.openid)
        stored.picture = oauth_user.picture
        stored.metadata = oauth_user.metadata
        stored.nickname = oauth_user.nickname
        oauth_user = stored
    except ObjectDoesNotExist:
        pass

    # Facebook tokens are too long to store, so the token is blanked.
    if platform == 'facebook':
        oauth_user.token = ''

    if not oauth_user.email:
        # No e-mail from the provider: persist the record and ask for one.
        oauth_user.save()
        require_email_url = reverse('oauth:require_email', kwargs={'oauthid': oauth_user.id})
        return HttpResponseRedirect(require_email_url)

    # E-mail available: link to a local account inside one transaction.
    with transaction.atomic():
        try:
            author = get_user_model().objects.get(id=oauth_user.author_id)
        except ObjectDoesNotExist:
            author = None

        if not author:
            # Find the local user by e-mail, creating it when absent.
            author, created = get_user_model().objects.get_or_create(email=oauth_user.email)
            if created:
                # Use the nickname as username unless it is already taken.
                try:
                    get_user_model().objects.get(username=oauth_user.nickname)
                except ObjectDoesNotExist:
                    author.username = oauth_user.nickname
                else:
                    author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
                author.source = 'authorize'
                author.save()

        # Link the OAuth record to the local account and log in.
        oauth_user.author = author
        oauth_user.save()
        oauth_user_login_signal.send(sender=authorize.__class__, id=oauth_user.id)
        login(request, author)
        return HttpResponseRedirect(nexturl)
|
|
|
|
|
|
|
|
|
#zwz: E-mail verification: confirm the e-mail address and complete the binding
|
|
|
def emailconfirm(request, id, sign):
    """Confirm an e-mail binding link, bind the account and log the user in.

    Args:
        request: The current request.
        id: Primary key of the ``OAuthUser`` being confirmed. The name
            shadows the builtin but is the view's existing keyword argument.
        sign: Signature from the confirmation link; must equal the SHA-256
            of ``SECRET_KEY + str(id) + SECRET_KEY`` (case-insensitive).

    Returns:
        ``HttpResponseForbidden`` when the signature is missing or wrong,
        otherwise a redirect to the bind-success page.

    Raises:
        Http404: When no ``OAuthUser`` with this ``id`` exists.
    """
    # Function-scope import so this block does not depend on the file's
    # import section being edited.
    import hmac

    if not sign:
        return HttpResponseForbidden()
    expected = get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY)
    # Compare in constant time so response timing does not leak how many
    # leading characters matched. Bytes are compared because
    # hmac.compare_digest raises TypeError for non-ASCII str input.
    if not hmac.compare_digest(expected.upper().encode('utf-8'),
                               sign.upper().encode('utf-8')):
        return HttpResponseForbidden()

    oauthuser = get_object_or_404(OAuthUser, pk=id)
    with transaction.atomic():
        # Link to the already-bound local user, or find/create one by e-mail.
        if oauthuser.author:
            author = get_user_model().objects.get(pk=oauthuser.author_id)
        else:
            author, created = get_user_model().objects.get_or_create(email=oauthuser.email)
            if created:
                author.source = 'emailconfirm'
                # Tolerate a missing nickname instead of failing on None.strip().
                nickname = (oauthuser.nickname or '').strip()
                author.username = nickname or "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
                author.save()
        oauthuser.author = author
        oauthuser.save()

    # Announce the login and sign the user in.
    oauth_user_login_signal.send(sender=emailconfirm.__class__, id=oauthuser.id)
    login(request, author)

    # Send the "binding succeeded" mail. The link's href previously held a
    # single blank, so the anchor pointed nowhere.
    site = 'http://' + get_current_site().domain
    content = _(
        '<p>恭喜您成功绑定邮箱,可直接用%(oauthuser_type)s登录本站。</p >\n'
        '欢迎关注本站:<a href="%(site)s">%(site)s</a >'
    ) % {'oauthuser_type': oauthuser.type, 'site': site}
    send_email(emailto=[oauthuser.email, ], title=_('绑定成功!'), content=content)

    return HttpResponseRedirect(reverse('oauth:bindsuccess', kwargs={'oauthid': id}) + '?type=success')
|
|
|
|
|
|
|
|
|
#zwz: E-mail binding form page: lets users without an e-mail address supply one
|
|
|
class RequireEmailView(FormView):
    """Form page where an OAuth user without an e-mail address supplies one.

    On a valid submission the address is stored on the ``OAuthUser`` and a
    signed confirmation link is mailed to it.
    """

    form_class = RequireEmailForm
    template_name = 'oauth/require_email.html'

    def get(self, request, *args, **kwargs):
        """Render the form; respond 404 when the OAuth record does not exist."""
        oauthid = self.kwargs['oauthid']
        get_object_or_404(OAuthUser, pk=oauthid)
        return super().get(request, *args, **kwargs)

    def get_initial(self):
        """Pre-fill the form with an empty e-mail and the OAuth record id."""
        return {'email': '', 'oauthid': self.kwargs['oauthid']}

    def get_context_data(self, **kwargs):
        """Add the user's avatar to the template context when one is stored."""
        oauthuser = get_object_or_404(OAuthUser, pk=self.kwargs['oauthid'])
        if oauthuser.picture:
            kwargs['picture'] = oauthuser.picture
        return super().get_context_data(**kwargs)

    def form_valid(self, form):
        """Store the submitted e-mail and mail a signed confirmation link.

        Returns:
            A redirect to the bind-success page with ``?type=email``.
        """
        email = form.cleaned_data['email']
        # NOTE(review): oauthid is taken from the submitted form, not from
        # the URL kwarg — confirm the form validates it appropriately.
        oauthid = form.cleaned_data['oauthid']
        oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
        oauthuser.email = email
        oauthuser.save()

        # Signature checked later by the e-mail confirmation view.
        sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
        # In DEBUG the link targets the local development server.
        site = get_current_site().domain if not settings.DEBUG else '127.0.0.1:8000'
        path = reverse('oauth:email_confirm', kwargs={'id': oauthid, 'sign': sign})
        url = f"http://{site}{path}"

        # The msgid must be a literal with a placeholder: an f-string is
        # interpolated before the translation lookup, so it can never be
        # extracted or matched against a catalog entry.
        content = _('<p>请点击链接绑定邮箱:<a href="%(url)s">%(url)s</a ></p >') % {'url': url}
        send_email(emailto=[email, ], title=_('绑定邮箱'), content=content)

        return HttpResponseRedirect(reverse('oauth:bindsuccess', kwargs={'oauthid': oauthid}) + '?type=email')
|
|
|
|
|
|
|
|
|
#zwz: Binding result page: shows the success message
|
|
|
def bindsuccess(request, oauthid):
    """Render the binding result page for an OAuth record.

    Args:
        request: The current request; the ``type`` query parameter selects
            which message is shown.
        oauthid: Primary key of the ``OAuthUser`` the page refers to.

    Returns:
        The rendered ``oauth/bindsuccess.html`` page with ``title`` and
        ``content`` in its context.

    Raises:
        Http404: When no ``OAuthUser`` with this ``oauthid`` exists.
    """
    # Named bind_type rather than type to avoid shadowing the builtin.
    bind_type = request.GET.get('type', None)
    oauthuser = get_object_or_404(OAuthUser, pk=oauthid)

    if bind_type == 'email':
        # Confirmation mail sent; the user still has to click the link.
        title = _('绑定邮箱')
        content = _('请登录邮箱完成最后一步绑定')
    else:
        title = _('绑定成功')
        # Literal msgid with a placeholder: an f-string would be interpolated
        # before the translation lookup and could never be translated.
        content = _('已用%(oauthuser_type)s绑定账号,可直接登录') % {'oauthuser_type': oauthuser.type}

    return render(request, 'oauth/bindsuccess.html', {'title': title, 'content': content})