"""Views for third-party (OAuth) login and e-mail binding."""
import logging
from urllib.parse import urlparse

from django.conf import settings
from django.contrib.auth import get_user_model, login
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.http import HttpResponseForbidden, HttpResponseRedirect
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.generic import FormView

from djangoblog.blog_signals import oauth_user_login_signal
from djangoblog.utils import get_current_site, send_email, get_sha256
from oauth.forms import RequireEmailForm
from .models import OAuthUser
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException

logger = logging.getLogger(__name__)


def _strip_www_prefix(host):
    """Return ``host`` without a single leading ``www.`` label."""
    prefix = 'www.'
    return host[len(prefix):] if host.startswith(prefix) else host


def _default_username():
    """Build the fallback username: ``djangoblog`` plus a timestamp."""
    return "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')


def get_redirecturl(request):
    """Return a safe post-login redirect path taken from ``next_url``.

    Falls back to ``'/'`` when the parameter is missing, points at the login
    page, or names a host other than the current site.
    """
    nexturl = request.GET.get('next_url', None)
    if not nexturl or nexturl in ('/login/', '/login'):
        return '/'

    parsed = urlparse(nexturl)
    if parsed.netloc:
        site = get_current_site().domain
        # Only a leading "www." is ignored. Removing the substring anywhere
        # in the host would let e.g. "exawww.mple.com" pass for
        # "example.com" and reopen the cross-site redirect this guards.
        if _strip_www_prefix(parsed.netloc) != _strip_www_prefix(site):
            logger.info('非法url:' + nexturl)
            return "/"
    return nexturl


def oauthlogin(request):
    """Redirect to the authorization URL of the provider named by ``type``.

    Redirects to ``'/'`` when ``type`` is missing or no manager is
    registered for it.
    """
    oauth_type = request.GET.get('type', None)
    if not oauth_type:
        return HttpResponseRedirect('/')
    manager = get_manager_by_type(oauth_type)
    if not manager:
        return HttpResponseRedirect('/')
    nexturl = get_redirecturl(request)
    authorizeurl = manager.get_authorization_url(nexturl)
    return HttpResponseRedirect(authorizeurl)


def authorize(request):
    """Handle the provider callback: fetch the user info and sign the user in.

    When the provider supplies an e-mail address the OAuth record is linked
    to a local account (created if needed) and the user is logged in;
    otherwise the record is saved and the user is sent to the e-mail
    binding page.
    """
    oauth_type = request.GET.get('type', None)
    if not oauth_type:
        return HttpResponseRedirect('/')
    manager = get_manager_by_type(oauth_type)
    if not manager:
        return HttpResponseRedirect('/')

    # Exchange the authorization code for an access token.
    code = request.GET.get('code', None)
    try:
        rsp = manager.get_access_token_by_code(code)
    except OAuthAccessTokenException as e:
        logger.warning("OAuthAccessTokenException:" + str(e))
        return HttpResponseRedirect('/')
    except Exception as e:
        # Best effort: any other failure restarts the authorization below.
        logger.error(e)
        rsp = None

    nexturl = get_redirecturl(request)
    if not rsp:
        return HttpResponseRedirect(manager.get_authorization_url(nexturl))

    user = manager.get_oauth_userinfo()
    if not user:
        return HttpResponseRedirect(nexturl)

    # Never store an empty nickname.
    if not user.nickname or not user.nickname.strip():
        user.nickname = _default_username()

    # Refresh the stored record when this provider account is already known.
    try:
        existing = OAuthUser.objects.get(type=oauth_type, openid=user.openid)
    except ObjectDoesNotExist:
        pass
    else:
        existing.picture = user.picture
        existing.metadata = user.metadata
        existing.nickname = user.nickname
        user = existing

    # Facebook tokens are too long to store, so the token is cleared.
    if oauth_type == 'facebook':
        user.token = ''

    if not user.email:
        # No e-mail from the provider: ask the user for one.
        user.save()
        url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
        return HttpResponseRedirect(url)

    user_model = get_user_model()
    with transaction.atomic():
        author = None
        try:
            author = user_model.objects.get(id=user.author_id)
        except ObjectDoesNotExist:
            pass
        if not author:
            author, created = user_model.objects.get_or_create(
                email=user.email)
            if created:
                # Use the nickname as username unless it is already taken.
                try:
                    user_model.objects.get(username=user.nickname)
                except ObjectDoesNotExist:
                    author.username = user.nickname
                else:
                    author.username = _default_username()
                author.source = 'authorize'
                author.save()

        user.author = author
        user.save()

        oauth_user_login_signal.send(
            sender=authorize.__class__, id=user.id)
        login(request, author)
        return HttpResponseRedirect(nexturl)


def emailconfirm(request, id, sign):
    """Confirm a bound e-mail address and log the user in.

    ``sign`` must equal the SHA-256 of ``SECRET_KEY + id + SECRET_KEY``
    (compared case-insensitively); otherwise a 403 response is returned.
    """
    if not sign:
        return HttpResponseForbidden()
    expected = get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY)
    if expected.upper() != sign.upper():
        return HttpResponseForbidden()

    oauthuser = get_object_or_404(OAuthUser, pk=id)
    with transaction.atomic():
        # Reuse the linked local account, or get/create one by e-mail.
        if oauthuser.author:
            author = get_user_model().objects.get(pk=oauthuser.author_id)
        else:
            author, created = get_user_model().objects.get_or_create(
                email=oauthuser.email)
            if created:
                author.source = 'emailconfirm'
                author.username = (
                    oauthuser.nickname.strip() or _default_username())
                author.save()
        oauthuser.author = author
        oauthuser.save()

    oauth_user_login_signal.send(
        sender=emailconfirm.__class__, id=oauthuser.id)
    login(request, author)

    # Notify the user that the binding succeeded.
    site = 'http://' + get_current_site().domain
    content = _(
        '\n\n恭喜您成功绑定邮箱,可直接用%(oauthuser_type)s登录本站。'
        '\n\n欢迎关注本站:%(site)s'
    ) % {'oauthuser_type': oauthuser.type, 'site': site}
    send_email(emailto=[oauthuser.email, ], title=_('绑定成功!'),
               content=content)

    url = reverse('oauth:bindsuccess', kwargs={'oauthid': id})
    return HttpResponseRedirect(url + '?type=success')


class RequireEmailView(FormView):
    """Form page asking an OAuth user without an e-mail address for one."""

    form_class = RequireEmailForm
    template_name = 'oauth/require_email.html'

    def get(self, request, *args, **kwargs):
        """Render the form; respond with 404 for an unknown ``oauthid``."""
        oauthid = self.kwargs['oauthid']
        get_object_or_404(OAuthUser, pk=oauthid)
        return super().get(request, *args, **kwargs)

    def get_initial(self):
        """Seed the form with an empty e-mail and the OAuth record id."""
        return {'email': '', 'oauthid': self.kwargs['oauthid']}

    def get_context_data(self, **kwargs):
        """Add the OAuth user's picture to the context when one is set."""
        oauthuser = get_object_or_404(OAuthUser, pk=self.kwargs['oauthid'])
        if oauthuser.picture:
            kwargs['picture'] = oauthuser.picture
        return super().get_context_data(**kwargs)

    def form_valid(self, form):
        """Store the submitted e-mail and send the confirmation link."""
        email = form.cleaned_data['email']
        oauthid = form.cleaned_data['oauthid']
        oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
        oauthuser.email = email
        oauthuser.save()

        # Signature that emailconfirm() recomputes to validate the link.
        sign = get_sha256(
            settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
        site = (get_current_site().domain if not settings.DEBUG
                else '127.0.0.1:8000')
        path = reverse('oauth:email_confirm',
                       kwargs={'id': oauthid, 'sign': sign})
        url = f"http://{site}{path}"

        # Interpolate after the translation lookup: an f-string would be
        # formatted before gettext sees it, so the msgid could never match.
        content = _('\n\n请点击链接绑定邮箱:%(url)s\n\n') % {'url': url}
        send_email(emailto=[email, ], title=_('绑定邮箱'), content=content)

        redirect_to = reverse('oauth:bindsuccess',
                              kwargs={'oauthid': oauthid})
        return HttpResponseRedirect(redirect_to + '?type=email')


def bindsuccess(request, oauthid):
    """Render the binding result page.

    ``?type=email`` shows the "check your mailbox" message; any other value
    shows the success message. Responds with 404 for an unknown ``oauthid``.
    """
    result_type = request.GET.get('type', None)
    oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
    if result_type == 'email':
        title = _('绑定邮箱')
        content = _('请登录邮箱完成最后一步绑定')
    else:
        title = _('绑定成功')
        # Constant msgid with a placeholder so the string is translatable.
        content = _("已用%(oauthuser_type)s绑定账号,可直接登录") % {
            'oauthuser_type': oauthuser.type}
    return render(request, 'oauth/bindsuccess.html',
                  {'title': title, 'content': content})