import logging # Create your views here. from urllib.parse import urlparse # 用于解析URL,验证跳转地址合法性 from django.conf import settings from django.contrib.auth import get_user_model # 用于动态获取用户模型 from django.contrib.auth import login # 用于用户登录 from django.core.exceptions import ObjectDoesNotExist # 用于处理对象不存在异常 from django.db import transaction # 用于数据库事务处理,确保操作原子性 from django.http import HttpResponseForbidden # 用于返回403禁止访问响应 from django.http import HttpResponseRedirect # 用于HTTP重定向 from django.shortcuts import get_object_or_404 # 用于获取对象,不存在则返回404 from django.shortcuts import render # 用于渲染模板 from django.urls import reverse # 用于反向解析URL from django.utils import timezone # 用于获取当前时间 from django.utils.translation import gettext_lazy as _ # 用于国际化翻译 from django.views.generic import FormView # 用于基于类的表单视图 from djangoblog.blog_signals import oauth_user_login_signal # 导入oauth登录信号 from djangoblog.utils import get_current_site # 用于获取当前站点信息 from djangoblog.utils import send_email, get_sha256 # 导入发送邮件和加密工具函数 from oauth.forms import RequireEmailForm # 导入补充邮箱的表单 from .models import OAuthUser # 导入OAuth用户模型 from .oauthmanager import get_manager_by_type, OAuthAccessTokenException # 导入OAuth管理器和异常 # 初始化日志记录器 logger = logging.getLogger(__name__) def get_redirecturl(request): """ 处理并验证跳转URL,确保跳转地址安全(仅允许本站域名) :param request: 请求对象 :return: 验证后的合法跳转URL,默认返回'/' """ nexturl = request.GET.get('next_url', None) # 过滤非法或默认的跳转地址 if not nexturl or nexturl == '/login/' or nexturl == '/login': nexturl = '/' return nexturl # 解析URL,验证域名是否为本站 p = urlparse(nexturl) if p.netloc: # 存在域名部分时验证 site = get_current_site().domain # 移除www.前缀后比较,确保域名一致 if not p.netloc.replace('www.', '') == site.replace('www.', ''): logger.info('非法url:' + nexturl) return "/" return nexturl def oauthlogin(request): """ OAuth登录入口:根据平台类型生成第三方授权链接并跳转 :param request: 请求对象,包含'type'参数(如weibo、github) :return: 重定向到第三方平台授权页面 """ type = request.GET.get('type', None) if not type: # 未指定平台类型,跳转到首页 return HttpResponseRedirect('/') # 获取对应平台的OAuth管理器 manager = get_manager_by_type(type) if not manager: # 管理器不存在,跳转到首页 return HttpResponseRedirect('/') # 获取合法的跳转地址(授权成功后返回的页面) nexturl = get_redirecturl(request) # 生成第三方平台的授权URL authorizeurl = manager.get_authorization_url(nexturl) # 重定向到授权页面 return HttpResponseRedirect(authorizeurl) def authorize(request): """ OAuth授权回调处理:接收第三方平台返回的code,获取用户信息并完成登录 :param request: 请求对象,包含'type'(平台类型)和'code'(授权码) :return: 重定向到目标页面或补充邮箱页面 """ type = request.GET.get('type', None) if not type: return HttpResponseRedirect('/') # 获取对应平台的OAuth管理器 manager = get_manager_by_type(type) if not manager: return HttpResponseRedirect('/') # 获取授权码 code = request.GET.get('code', None) try: # 通过code获取access token rsp = manager.get_access_token_by_code(code) except OAuthAccessTokenException as e: logger.warning("OAuthAccessTokenException:" + str(e)) return HttpResponseRedirect('/') except Exception as e: logger.error(e) rsp = None # 获取授权成功后的跳转地址 nexturl = get_redirecturl(request) if not rsp: # 获取token失败,重新跳转授权 return HttpResponseRedirect(manager.get_authorization_url(nexturl)) # 通过token获取第三方用户信息 user = manager.get_oauth_userinfo() if user: # 处理用户昵称为空的情况,生成默认昵称 if not user.nickname or not user.nickname.strip(): user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') try: # 若用户已存在(同平台+同openid),更新用户信息 temp = OAuthUser.objects.get(type=type, openid=user.openid) temp.picture = user.picture temp.metadata = user.metadata temp.nickname = user.nickname user = temp except ObjectDoesNotExist: # 用户不存在,使用新用户对象 pass # Facebook的token过长,不存储 if type == 'facebook': user.token = '' # 若用户提供了邮箱,直接关联或创建本地用户并登录 if user.email: with transaction.atomic(): # 数据库事务:确保操作原子性 author = None try: # 尝试获取已关联的本地用户 author = get_user_model().objects.get(id=user.author_id) except ObjectDoesNotExist: pass if not author: # 不存在则创建或获取本地用户(通过邮箱匹配) result = get_user_model().objects.get_or_create(email=user.email) author = result[0] # 若为新创建的用户,设置用户名和来源 if result[1]: try: # 检查用户名是否已存在 get_user_model().objects.get(username=user.nickname) except ObjectDoesNotExist: author.username = user.nickname else: # 用户名已存在,生成唯一用户名 author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') author.source = 'authorize' # 标记来源为oauth授权 author.save() # 关联本地用户并保存 user.author = author user.save() # 发送oauth用户登录信号(用于后续处理,如日志、统计等) oauth_user_login_signal.send( sender=authorize.__class__, id=user.id) # 登录用户 login(request, author) # 重定向到目标页面 return HttpResponseRedirect(nexturl) else: # 未提供邮箱,保存用户信息并跳转到补充邮箱页面 user.save() url = reverse('oauth:require_email', kwargs={'oauthid': user.id}) return HttpResponseRedirect(url) else: # 获取用户信息失败,跳转到目标页面 return HttpResponseRedirect(nexturl) def emailconfirm(request, id, sign): """ 邮箱确认处理:验证签名合法性,完成用户与邮箱的绑定并登录 :param request: 请求对象 :param id: OAuthUser的ID :param sign: 验证签名(基于SECRET_KEY和id生成) :return: 重定向到绑定成功页面 """ if not sign: # 签名为空,返回403 return HttpResponseForbidden() # 验证签名合法性(忽略大小写) if not get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY).upper() == sign.upper(): return HttpResponseForbidden() # 获取对应的OAuth用户 oauthuser = get_object_or_404(OAuthUser, pk=id) with transaction.atomic(): # 数据库事务 if oauthuser.author: # 已关联本地用户,直接获取 author = get_user_model().objects.get(pk=oauthuser.author_id) else: # 未关联,通过邮箱创建或获取本地用户 result = get_user_model().objects.get_or_create(email=oauthuser.email) author = result[0] if result[1]: # 新创建用户 author.source = 'emailconfirm' # 标记来源为邮箱确认 # 设置用户名为OAuth用户的昵称(或生成默认) author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip() else "djangoblog" + timezone.now().strftime( '%y%m%d%I%M%S') author.save() # 关联本地用户并保存 oauthuser.author = author oauthuser.save() # 发送登录信号 oauth_user_login_signal.send(sender=emailconfirm.__class__, id=oauthuser.id) # 登录用户 login(request, author) # 发送绑定成功邮件 site = 'http://' + get_current_site().domain content = _('''
Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s to directly log in to this website without a password.
You are welcome to continue to follow this site, the address is %(site)s Thank you again!Please click the link below to bind your email
%(url)s Thank you again!