@ -1,5 +1,4 @@
import logging
# Create your views here.
from urllib . parse import urlparse
from django . conf import settings
@ -7,308 +6,212 @@ from django.contrib.auth import get_user_model
from django . contrib . auth import login
from django . core . exceptions import ObjectDoesNotExist
from django . db import transaction
from django . http import HttpResponseForbidden
from django . http import HttpResponseRedirect
from django . shortcuts import get_object_or_404
from django . shortcuts import render
from django . http import HttpResponseForbidden , HttpResponseRedirect
from django . shortcuts import get_object_or_404 , render
from django . urls import reverse
from django . utils import timezone
from django . utils . translation import gettext_lazy as _
from django . views . generic import FormView
from djangoblog . blog_signals import oauth_user_login_signal
from djangoblog . utils import get_current_site
from djangoblog . utils import send_email , get_sha256
from djangoblog . utils import get_current_site , send_email , get_sha256
from oauth . forms import RequireEmailForm
from . models import OAuthUser
from . oauthmanager import get_manager_by_type , OAuthAccessTokenException
# 获取logger实例用于记录日志
logger = logging . getLogger ( __name__ )
def get_redirecturl ( request ) :
"""
获取重定向URL , 并进行安全验证
Args :
request : HTTP请求对象
Returns :
str : 安全的重定向URL
"""
# 从请求参数获取next_url, 默认为None
nexturl = request . GET . get ( ' next_url ' , None )
# 如果nexturl为空或是登录页面, 则重定向到首页
if not nexturl or nexturl == ' /login/ ' or nexturl == ' /login ' :
nexturl = ' / '
return nexturl
# 解析URL, 检查域名安全性
p = urlparse ( nexturl )
if p . netloc :
next_url = request . GET . get ( ' next_url ' , None )
if not next_url or next_url in ( ' /login/ ' , ' /login ' ) :
next_url = ' / '
return next_url
parsed_url = urlparse ( next_url )
if parsed_url . netloc :
site = get_current_site ( ) . domain
# 检查域名是否匹配当前站点,防止开放重定向攻击
if not p . netloc . replace ( ' www. ' , ' ' ) == site . replace ( ' www. ' , ' ' ) :
logger . info ( ' 非法url: ' + nexturl )
if not parsed_url . netloc . replace ( ' www. ' , ' ' ) == site . replace ( ' www. ' , ' ' ) :
logger . info ( ' 非法url: ' + next_url )
return " / "
return next url
return next_url
def oauthlogin ( request ) :
"""
OAuth登录入口 - 重定向到第三方授权页面
Args :
request : HTTP请求对象
Returns :
HttpResponseRedirect : 重定向响应
"""
# 获取OAuth类型( 如weibo、github等)
type = request . GET . get ( ' type ' , None )
if not type :
oauth_type = request . GET . get ( ' type ' , None ) # 修复: 重命名type为oauth_type
if not oauth_type :
return HttpResponseRedirect ( ' / ' )
# 获取对应的OAuth管理器
manager = get_manager_by_type ( type )
if not manager :
oauth_manager = get_manager_by_type ( oauth_type ) # 修复: 重命名manager为oauth_manager
if not oauth_manager :
return HttpResponseRedirect ( ' / ' )
# 获取安全的重定向URL
nexturl = get_redirecturl ( request )
# 获取第三方授权URL并重定向
authorizeurl = manager . get_authorization_url ( nexturl )
return HttpResponseRedirect ( authorizeurl )
next_url = get_redirecturl ( request )
authorize_url = oauth_manager . get_authorization_url ( next_url ) # 修复: 重命名authorizeurl
return HttpResponseRedirect ( authorize_url )
def authorize ( request ) :
"""
OAuth授权回调处理 - 处理第三方登录回调
Args :
request : HTTP请求对象
Returns :
HttpResponseRedirect : 重定向响应
"""
type = request . GET . get ( ' type ' , None )
if not type :
oauth_type = request . GET . get ( ' type ' , None ) # 修复: 重命名type为oauth_type
if not oauth_type :
return HttpResponseRedirect ( ' / ' )
manager = get_manager_by_type ( type )
if not manager:
oauth_manager = get_manager_by_type ( oauth_type ) # 修复: 重命名manager为oauth_manager
if not oauth_manager :
return HttpResponseRedirect ( ' / ' )
# 获取授权码
code = request . GET . get ( ' code ' , None )
auth_code = request . GET . get ( ' code ' , None ) # 修复: 重命名code为auth_code
try :
# 使用授权码获取访问令牌
rsp = manager . get_access_token_by_code ( code )
token_response = oauth_manager . get_access_token_by_code ( auth_code ) # 修复: 重命名rsp
except OAuthAccessTokenException as e :
logger . warning ( " OAuthAccessTokenException: " + str ( e ) )
return HttpResponseRedirect ( ' / ' )
except Exception as e :
logger . error ( e )
rsp = None
token_ re sponse = None
nexturl = get_redirecturl ( request )
if not rsp :
# 如果获取token失败, 重新跳转到授权页面
return HttpResponseRedirect ( manager . get_authorization_url ( nexturl ) )
next_url = get_redirecturl ( request )
if not token_response :
return HttpResponseRedirect ( oauth_manager . get_authorization_url ( next_url ) )
# 获取用户信息
user = manager . get_oauth_userinfo ( )
if user :
# 处理昵称为空的情况
if not user . nickname or not user . nickname . strip ( ) :
user . nickname = " djangoblog " + timezone . now ( ) . strftime ( ' % y % m %d % I % M % S ' )
oauth_user_info = oauth_manager . get_oauth_userinfo ( ) # 修复: 重命名user为oauth_user_info
if oauth_user_info :
if not oauth_user_info . nickname or not oauth_user_info . nickname . strip ( ) :
oauth_user_info . nickname = f " djangoblog { timezone . now ( ) . strftime ( ' % y % m %d % I % M % S ' ) } "
try :
# 检查是否已存在该OAuth用户
temp = OAuthUser . objects . get ( type = type , openid = user . openid )
# 更新用户信息
temp . picture = user . picture
temp . metadata = user . metadata
temp . nickname = user . nickname
user = temp
existing_oauth_user = OAuthUser . objects . get ( type = oauth_type , openid = oauth_user_info . openid ) # 修复: 重命名temp
existing_oauth_user . picture = oauth_user_info . picture
existing_oauth_user . metadata = oauth_user_info . metadata
existing_oauth_user . nickname = oauth_user_info . nickname
oauth_user_info = existing_oauth_user
except ObjectDoesNotExist :
pass
# Facebook的token过长, 清空处理
if type == ' facebook ' :
user . token = ' '
if oauth_type == ' facebook ' :
oauth_user_info . token = ' '
# 如果用户有邮箱,直接处理登录
if user . email :
with transaction . atomic ( ) : # 使用事务保证数据一致性
author = None
if oauth_user_info . email :
with transaction . atomic ( ) :
blog_author = None # 修复: 重命名author为blog_author( 区分系统用户和OAuth用户)
try :
author = get_user_model ( ) . objects . get ( id = user. author_id )
blog_ author = get_user_model ( ) . objects . get ( id = oauth_ user_info . author_id )
except ObjectDoesNotExist :
pass
if not author :
# 创建或获取用户
result = get_user_model ( ) . objects . get_or_create ( email = user . email )
author = result [ 0 ]
if result [ 1 ] : # 如果是新创建的用户
if not blog_author :
result = get_user_model ( ) . objects . get_or_create ( email = oauth_user_info . email )
blog_author = result [ 0 ]
if result [ 1 ] :
try :
get_user_model ( ) . objects . get ( username = user. nickname )
get_user_model ( ) . objects . get ( username = oauth_ user_info . nickname )
except ObjectDoesNotExist :
author. username = user. nickname
blog_ author. username = oauth_ user_info . nickname
else :
# 用户名冲突时生成唯一用户名
author . username = " djangoblog " + timezone . now ( ) . strftime ( ' % y % m %d % I % M % S ' )
author . source = ' authorize '
author . save ( )
blog_author . username = f " djangoblog { timezone . now ( ) . strftime ( ' % y % m %d % I % M % S ' ) } "
blog_author . source = ' authorize '
blog_author . save ( )
# 关联OAuth用户和系统用户
user . author = author
user . save ( )
oauth_user_info . author = blog_author
oauth_user_info . save ( )
# 发送OAuth用户登录信号
oauth_user_login_signal . send (
sender = authorize . __class__ , id = user . id )
# 登录用户
login ( request , author )
return HttpResponseRedirect ( nexturl )
sender = authorize . __class__ , id = oauth_user_info . id )
login ( request , blog_author )
return HttpResponseRedirect ( next_url )
else :
# 没有邮箱, 保存OAuth用户信息并跳转到邮箱输入页面
user . save ( )
url = reverse ( ' oauth:require_email ' , kwargs = {
' oauthid ' : user . id
} )
oauth_user_info . save ( )
url = reverse ( ' oauth:require_email ' , kwargs = { ' oauthid ' : oauth_user_info . id } )
return HttpResponseRedirect ( url )
else :
return HttpResponseRedirect ( next url)
return HttpResponseRedirect ( next_url )
def emailconfirm ( request , id , sign ) :
"""
邮箱确认处理 - 验证邮箱并完成用户绑定
Args :
request : HTTP请求对象
id : OAuth用户ID
sign : 安全签名
Returns :
HttpResponseRedirect : 重定向响应
"""
if not sign :
return HttpResponseForbidden ( )
# 验证签名安全性
if not get_sha256 ( settings . SECRET_KEY +
str ( id ) +
settings . SECRET_KEY ) . upper ( ) == sign . upper ( ) :
if not get_sha256 ( f " { settings . SECRET_KEY } { id } { settings . SECRET_KEY } " ) . upper ( ) == sign . upper ( ) :
return HttpResponseForbidden ( )
# 获取OAuth用户
oauthuser = get_object_or_404 ( OAuthUser , pk = id )
oauth_user = get_object_or_404 ( OAuthUser , pk = id ) # 修复: 重命名oauthuser为oauth_user
with transaction . atomic ( ) :
if oauth user. author :
author = get_user_model ( ) . objects . get ( pk = oauth user. author_id )
if oauth_user . author :
blog_author = get_user_model ( ) . objects . get ( pk = oauth_user . author_id ) # 修复: 重命名author为blog_author
else :
# 创建或获取系统用户
result = get_user_model ( ) . objects . get_or_create ( email = oauthuser . email )
author = result [ 0 ]
if result [ 1 ] : # 新创建的用户
author. source = ' emailconfirm '
author . username = oauth user. nickname . strip ( ) if oauthuser . nickname . strip (
) else " djangoblog " + timezone . now ( ) . strftime ( ' % y % m %d % I % M % S ' )
author . save ( )
# 关联用户
oauthuser . author = author
oauth user. save ( )
# 发送登录信号并登录用户
result = get_user_model ( ) . objects . get_or_create ( email = oauth_user . email )
blog_author = result [ 0 ]
if result [ 1 ] :
blog_author . source = ' emailconfirm '
blog_author. username = (
oauth_ user. nickname . strip ( )
if oauth_user . nickname . strip ( )
else f " djangoblog { timezone . now ( ) . strftime ( ' % y % m %d % I % M % S ' ) } "
)
blog_author . save ( )
oauth _user. author = blog_author
oauth_user . save ( )
oauth_user_login_signal . send (
sender = emailconfirm . __class__ ,
id = oauthuser . id )
login ( request , author )
sender = emailconfirm . __class__ , id = oauth_user . id )
login ( request , blog_author )
# 发送绑定成功邮件
site = ' http:// ' + get_current_site ( ) . domain
content = _ ( '''
< p > 恭喜您 , 您已成功绑定邮箱 。 您可以使用 % ( oauth user _type) s直接登录本站 , 无需密码 。 < / p >
< p > 恭喜您 , 您已成功绑定邮箱 。 您可以使用 % ( oauth _type) s直接登录本站 , 无需密码 。 < / p >
欢迎您继续关注本站 , 地址是 < a href = " %(site)s " rel = " bookmark " > % ( site ) s < / a >
再次感谢 !
< br / >
如果上面的链接无法打开 , 请将此链接复制到浏览器 。
% ( site ) s
''' ) % { ' oauth user _type' : oauth user.type, ' site ' : site}
''' ) % { ' oauth _type' : oauth _ user.type, ' site ' : site} # 修复: 变量名oauthuser_type改为oauth_type
send_email ( emailto = [ oauth user. email , ] , title = _ ( ' 恭喜您绑定成功! ' ) , content = content )
send_email ( emailto = [ oauth _ user. email , ] , title = _ ( ' 恭喜您绑定成功! ' ) , content = content )
# 跳转到绑定成功页面
url = reverse ( ' oauth:bindsuccess ' , kwargs = {
' oauthid ' : id
} )
url = url + ' ?type=success '
url = reverse ( ' oauth:bindsuccess ' , kwargs = { ' oauthid ' : id } )
url + = ' ?type=success '
return HttpResponseRedirect ( url )
class RequireEmailView ( FormView ) :
"""
需要邮箱视图 - 处理用户输入邮箱地址
"""
form_class = RequireEmailForm # 使用的表单类
template_name = ' oauth/require_email.html ' # 模板名称
form_class = RequireEmailForm
template_name = ' oauth/require_email.html '
def get ( self , request , * args , * * kwargs ) :
""" GET请求处理 """
oauthid = self . kwargs [ ' oauthid ' ]
oauthuser = get_object_or_404 ( OAuthUser , pk = oauthid )
# 如果已有邮箱,直接跳过(这里注释掉了重定向逻辑)
if oauthuser . email :
oauth_id = self . kwargs [ ' oauthid ' ] # 修复: 重命名oauthid为oauth_id
oauth_user = get_object_or_404 ( OAuthUser , pk = oauth_id )
if oauth_user . email :
pass
# return HttpResponseRedirect('/')
return super ( RequireEmailView , self ) . get ( request , * args , * * kwargs )
return super ( ) . get ( request , * args , * * kwargs )
def get_initial ( self ) :
""" 获取表单初始数据 """
oauthid = self . kwargs [ ' oauthid ' ]
return {
' email ' : ' ' ,
' oauthid ' : oauthid
}
def get_context_data ( self , * * kwargs ) :
""" 获取模板上下文数据 """
oauthid = self . kwargs [ ' oauthid ' ]
oauthuser = get_object_or_404 ( OAuthUser , pk = oauthid )
# 添加用户头像到上下文
if oauthuser . picture :
kwargs [ ' picture ' ] = oauthuser . picture
return super ( RequireEmailView , self ) . get_context_data ( * * kwargs )
oauth_id = self . kwargs [ ' oauthid ' ]
return { ' email ' : ' ' , ' oauthid ' : oauth_id }
def get_context_data ( self , * * kwargs ) :
oauth_id = self . kwargs [ ' oauthid ' ]
oauth_user = get_object_or_404 ( OAuthUser , pk = oauth_id )
if oauth_user . picture :
kwargs [ ' picture ' ] = oauth_user . picture
return super ( ) . get_context_data ( * * kwargs )
def form_valid ( self , form ) :
""" 表单验证通过后的处理 """
email = form . cleaned_data [ ' email ' ]
oauthid = form . cleaned_data [ ' oauthid ' ]
oauthuser = get_object_or_404 ( OAuthUser , pk = oauthid )
oauthuser . email = email
oauthuser . save ( )
# 生成安全签名
sign = get_sha256 ( settings . SECRET_KEY +
str ( oauthuser . id ) + settings . SECRET_KEY )
oauth_id = form . cleaned_data [ ' oauthid ' ]
oauth_user = get_object_or_404 ( OAuthUser , pk = oauth_id )
oauth_user . email = email
oauth_user . save ( )
sign = get_sha256 ( f " { settings . SECRET_KEY } { oauth_user . id } { settings . SECRET_KEY } " )
site = get_current_site ( ) . domain
if settings . DEBUG :
site = ' 127.0.0.1:8000 '
# 构建邮箱确认URL
path = reverse ( ' oauth:email_confirm ' , kwargs = {
' id ' : oauthid ,
' sign ' : sign
} )
url = " http:// {site} {path} " . format ( site = site , path = path )
path = reverse ( ' oauth:email_confirm ' , kwargs = { ' id ' : oauth_id , ' sign ' : sign } )
confirm_url = f " http:// { site } { path } " # 修复: 重命名url为confirm_url
# 发送确认邮件
content = _ ( """
< p > 请点击下面的链接完成邮箱绑定 < / p >
< a href = " %(url)s " rel = " bookmark " > % ( url ) s < / a >
@ -317,43 +220,26 @@ class RequireEmailView(FormView):
如果上面的链接无法打开 , 请将此链接复制到浏览器 。
< br / >
% ( url ) s
""" ) % { ' url ' : url}
""" ) % { ' url ' : confirm_ url}
send_email ( emailto = [ email , ] , title = _ ( ' 绑定邮箱 ' ) , content = content )
# 跳转到绑定成功提示页面
url = reverse ( ' oauth:bindsuccess ' , kwargs = {
' oauthid ' : oauthid
} )
url = url + ' ?type=email '
return HttpResponseRedirect ( url )
redirect_url = reverse ( ' oauth:bindsuccess ' , kwargs = { ' oauthid ' : oauth_id } )
redirect_url + = ' ?type=email '
return HttpResponseRedirect ( redirect_url )
def bindsuccess ( request , oauthid ) :
"""
绑定成功页面 - 显示绑定状态信息
Args :
request : HTTP请求对象
oauthid : OAuth用户ID
notify_type = request . GET . get ( ' type ' , None ) # 修复: 重命名type为notify_type
oauth_user = get_object_or_404 ( OAuthUser , pk = oauthid )
Returns :
HttpResponse : 渲染的响应
"""
type = request . GET . get ( ' type ' , None )
oauthuser = get_object_or_404 ( OAuthUser , pk = oauthid )
# 根据类型显示不同的提示信息
if type == ' email ' :
if notify_type == ' email ' :
title = _ ( ' 绑定邮箱 ' )
content = _ (
' 恭喜您,绑定只差一步之遥。请登录您的邮箱查看邮件完成绑定。谢谢。 ' )
content = _ ( ' 恭喜您,绑定只差一步之遥。请登录您的邮箱查看邮件完成绑定。谢谢。 ' )
else :
title = _ ( ' 绑定成功 ' )
content = _ (
" 恭喜您,您已成功绑定邮箱地址。您可以使用 %(oauthuser_type)s 直接登录本站,无需密码。欢迎您继续关注本站。 " % {
' oauthuser_type ' : oauthuser . type } )
" 恭喜您,您已成功绑定邮箱地址。您可以使用 %(oauth_type)s 直接登录本站,无需密码。欢迎您继续关注本站。 " % {
' oauth_type ' : oauth_user . type
} )
return render ( request , ' oauth/bindsuccess.html ' , {
' title ' : title ,
' content ' : content
} )
return render ( request , ' oauth/bindsuccess.html ' , { ' title ' : title , ' content ' : content } )