You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Django/doc/oauth/views.py

313 lines
14 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import logging
# Create your views here.
from urllib.parse import urlparse # 用于解析URL验证跳转地址合法性
from django.conf import settings
from django.contrib.auth import get_user_model # 用于动态获取用户模型
from django.contrib.auth import login # 用于用户登录
from django.core.exceptions import ObjectDoesNotExist # 用于处理对象不存在异常
from django.db import transaction # 用于数据库事务处理,确保操作原子性
from django.http import HttpResponseForbidden # 用于返回403禁止访问响应
from django.http import HttpResponseRedirect # 用于HTTP重定向
from django.shortcuts import get_object_or_404 # 用于获取对象不存在则返回404
from django.shortcuts import render # 用于渲染模板
from django.urls import reverse # 用于反向解析URL
from django.utils import timezone # 用于获取当前时间
from django.utils.translation import gettext_lazy as _ # 用于国际化翻译
from django.views.generic import FormView # 用于基于类的表单视图
from djangoblog.blog_signals import oauth_user_login_signal # 导入oauth登录信号
from djangoblog.utils import get_current_site # 用于获取当前站点信息
from djangoblog.utils import send_email, get_sha256 # 导入发送邮件和加密工具函数
from oauth.forms import RequireEmailForm # 导入补充邮箱的表单
from .models import OAuthUser # 导入OAuth用户模型
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException # 导入OAuth管理器和异常
# 初始化日志记录器
logger = logging.getLogger(__name__)
def get_redirecturl(request):
"""
处理并验证跳转URL确保跳转地址安全仅允许本站域名
:param request: 请求对象
:return: 验证后的合法跳转URL默认返回'/'
"""
nexturl = request.GET.get('next_url', None)
# 过滤非法或默认的跳转地址
if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/'
return nexturl
# 解析URL验证域名是否为本站
p = urlparse(nexturl)
if p.netloc: # 存在域名部分时验证
site = get_current_site().domain
# 移除www.前缀后比较,确保域名一致
if not p.netloc.replace('www.', '') == site.replace('www.', ''):
logger.info('非法url:' + nexturl)
return "/"
return nexturl
def oauthlogin(request):
"""
OAuth登录入口根据平台类型生成第三方授权链接并跳转
:param request: 请求对象,包含'type'参数如weibo、github
:return: 重定向到第三方平台授权页面
"""
type = request.GET.get('type', None)
if not type: # 未指定平台类型,跳转到首页
return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type)
if not manager: # 管理器不存在,跳转到首页
return HttpResponseRedirect('/')
# 获取合法的跳转地址(授权成功后返回的页面)
nexturl = get_redirecturl(request)
# 生成第三方平台的授权URL
authorizeurl = manager.get_authorization_url(nexturl)
# 重定向到授权页面
return HttpResponseRedirect(authorizeurl)
def authorize(request):
"""
OAuth授权回调处理接收第三方平台返回的code获取用户信息并完成登录
:param request: 请求对象,包含'type'(平台类型)和'code'(授权码)
:return: 重定向到目标页面或补充邮箱页面
"""
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type)
if not manager:
return HttpResponseRedirect('/')
# 获取授权码
code = request.GET.get('code', None)
try:
# 通过code获取access token
rsp = manager.get_access_token_by_code(code)
except OAuthAccessTokenException as e:
logger.warning("OAuthAccessTokenException:" + str(e))
return HttpResponseRedirect('/')
except Exception as e:
logger.error(e)
rsp = None
# 获取授权成功后的跳转地址
nexturl = get_redirecturl(request)
if not rsp: # 获取token失败重新跳转授权
return HttpResponseRedirect(manager.get_authorization_url(nexturl))
# 通过token获取第三方用户信息
user = manager.get_oauth_userinfo()
if user:
# 处理用户昵称为空的情况,生成默认昵称
if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
try:
# 若用户已存在(同平台+同openid更新用户信息
temp = OAuthUser.objects.get(type=type, openid=user.openid)
temp.picture = user.picture
temp.metadata = user.metadata
temp.nickname = user.nickname
user = temp
except ObjectDoesNotExist:
# 用户不存在,使用新用户对象
pass
# Facebook的token过长不存储
if type == 'facebook':
user.token = ''
# 若用户提供了邮箱,直接关联或创建本地用户并登录
if user.email:
with transaction.atomic(): # 数据库事务:确保操作原子性
author = None
try:
# 尝试获取已关联的本地用户
author = get_user_model().objects.get(id=user.author_id)
except ObjectDoesNotExist:
pass
if not author:
# 不存在则创建或获取本地用户(通过邮箱匹配)
result = get_user_model().objects.get_or_create(email=user.email)
author = result[0]
# 若为新创建的用户,设置用户名和来源
if result[1]:
try:
# 检查用户名是否已存在
get_user_model().objects.get(username=user.nickname)
except ObjectDoesNotExist:
author.username = user.nickname
else:
# 用户名已存在,生成唯一用户名
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize' # 标记来源为oauth授权
author.save()
# 关联本地用户并保存
user.author = author
user.save()
# 发送oauth用户登录信号用于后续处理如日志、统计等
oauth_user_login_signal.send(
sender=authorize.__class__, id=user.id)
# 登录用户
login(request, author)
# 重定向到目标页面
return HttpResponseRedirect(nexturl)
else:
# 未提供邮箱,保存用户信息并跳转到补充邮箱页面
user.save()
url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
return HttpResponseRedirect(url)
else:
# 获取用户信息失败,跳转到目标页面
return HttpResponseRedirect(nexturl)
def emailconfirm(request, id, sign):
"""
邮箱确认处理:验证签名合法性,完成用户与邮箱的绑定并登录
:param request: 请求对象
:param id: OAuthUser的ID
:param sign: 验证签名基于SECRET_KEY和id生成
:return: 重定向到绑定成功页面
"""
if not sign: # 签名为空返回403
return HttpResponseForbidden()
# 验证签名合法性(忽略大小写)
if not get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY).upper() == sign.upper():
return HttpResponseForbidden()
# 获取对应的OAuth用户
oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic(): # 数据库事务
if oauthuser.author:
# 已关联本地用户,直接获取
author = get_user_model().objects.get(pk=oauthuser.author_id)
else:
# 未关联,通过邮箱创建或获取本地用户
result = get_user_model().objects.get_or_create(email=oauthuser.email)
author = result[0]
if result[1]: # 新创建用户
author.source = 'emailconfirm' # 标记来源为邮箱确认
# 设置用户名为OAuth用户的昵称或生成默认
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip() else "djangoblog" + timezone.now().strftime(
'%y%m%d%I%M%S')
author.save()
# 关联本地用户并保存
oauthuser.author = author
oauthuser.save()
# 发送登录信号
oauth_user_login_signal.send(sender=emailconfirm.__class__, id=oauthuser.id)
# 登录用户
login(request, author)
# 发送绑定成功邮件
site = 'http://' + get_current_site().domain
content = _('''
<p>Congratulations, you have successfully bound your email address. You can use
%(oauthuser_type)s to directly log in to this website without a password.</p>
You are welcome to continue to follow this site, the address is
<a href="%(site)s" rel="bookmark">%(site)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
%(site)s
''') % {'oauthuser_type': oauthuser.type, 'site': site}
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content)
# 重定向到绑定成功页面
url = reverse('oauth:bindsuccess', kwargs={'oauthid': id})
url = url + '?type=success'
return HttpResponseRedirect(url)
class RequireEmailView(FormView):
"""
补充邮箱的类视图:显示表单收集用户邮箱,发送确认链接
"""
form_class = RequireEmailForm # 使用的表单类
template_name = 'oauth/require_email.html' # 渲染的模板
def get(self, request, *args, **kwargs):
"""处理GET请求获取OAuth用户若已填写邮箱则跳转注释中为跳转逻辑实际未启用"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.email:
pass # 若已填写邮箱,可在此处添加跳转逻辑
return super(RequireEmailView, self).get(request, *args, **kwargs)
def get_initial(self):
"""初始化表单数据预设oauthid字段"""
oauthid = self.kwargs['oauthid']
return {'email': '', 'oauthid': oauthid}
def get_context_data(self, **kwargs):
"""补充上下文数据:添加用户头像(若存在)"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture:
kwargs['picture'] = oauthuser.picture
return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form):
"""处理表单验证通过:保存邮箱,生成确认链接并发送邮件"""
email = form.cleaned_data['email']
oauthid = form.cleaned_data['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
# 保存用户邮箱
oauthuser.email = email
oauthuser.save()
# 生成验证签名
sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
# 获取当前站点域名
site = get_current_site().domain
if settings.DEBUG: # 开发环境使用本地地址
site = '127.0.0.1:8000'
# 生成邮箱确认链接
path = reverse('oauth:email_confirm', kwargs={'id': oauthid, 'sign': sign})
url = "http://{site}{path}".format(site=site, path=path)
# 发送确认邮件
content = _("""
<p>Please click the link below to bind your email</p>
<a href="%(url)s" rel="bookmark">%(url)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
<br />
%(url)s
""") % {'url': url}
send_email(emailto=[email, ], title=_('Bind your email'), content=content)
# 重定向到绑定成功页面(提示查收邮件)
url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauthid})
url = url + '?type=email'
return HttpResponseRedirect(url)
def bindsuccess(request, oauthid):
"""
绑定成功页面:根据类型显示不同的成功信息
:param request: 请求对象,包含'type'参数email/success
:param oauthid: OAuthUser的ID
:return: 渲染绑定成功模板
"""
type = request.GET.get('type', None)
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if type == 'email':
# 已发送验证邮件的提示
title = _('Bind your email')
content = _(
'Congratulations, the binding is just one step away. '
'Please log in to your email to check the email to complete the binding. Thank you.')
else:
# 绑定完成的提示
title = _('Binding successful')
content = _(
"Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s"
" to directly log in to this website without a password. You are welcome to continue to follow this site." % {
'oauthuser_type': oauthuser.type})
return render(request, 'oauth/bindsuccess.html', {'title': title, 'content': content})