Merge branch 'lrj_branch' into develop

# Conflicts:
#	src/DjangoBlog/oauth/oauthmanager.py
#	src/DjangoBlog/oauth/tests.py
develop
dynastxu 3 months ago
commit 8a37d8cc06

@ -0,0 +1,37 @@
import json
import subprocess
import sys
from datetime import datetime
def generate_quality_report():
report = {
'project': 'OAuth Module',
'analysis_date': datetime.now().isoformat(),
'tools_used': ['flake8', 'pylint'],
'summary': {}
}
# Flake8 分析
result = subprocess.run([
sys.executable, '-m', 'flake8', 'oauth/', '--max-line-length=120', '--statistics', '--exit-zero'
], capture_output=True, text=True)
report['flake8_issues'] = result.stdout.strip().split('\n')
# Pylint 分析
result = subprocess.run([
sys.executable, '-m', 'pylint', 'oauth/', '--output-format=json', '--exit-zero'
], capture_output=True, text=True)
try:
report['pylint_issues'] = json.loads(result.stdout)
except:
report['pylint_issues'] = result.stdout
# 保存报告
with open('oauth_quality_report.json', 'w', encoding='utf-8') as f:
json.dump(report, f, indent=2, ensure_ascii=False)
print('代码质量报告已生成: oauth_quality_report.json')
generate_quality_report()

@ -1,3 +1,9 @@
"""
lrj
OAuth 认证后台管理模块
配置Django Admin后台中OAuth相关模型的显示和操作方式
"""
import logging
from django.contrib import admin
@ -5,51 +11,129 @@ from django.contrib import admin
from django.urls import reverse
from django.utils.html import format_html
# 配置日志记录器
logger = logging.getLogger(__name__)
class OAuthUserAdmin(admin.ModelAdmin):
"""
lrj
OAuth用户模型后台管理配置类
自定义OAuthUser模型在Django Admin中的显示和行为
"""
# lrj搜索字段配置支持按昵称和邮箱搜索
search_fields = ('nickname', 'email')
# 列表页每页显示记录数
list_per_page = 20
# 列表页显示的字段
list_display = (
'id',
'nickname',
'link_to_usermodel',
'show_user_image',
'type',
'email',
'id', # 用户ID
'nickname', # 用户昵称
'link_to_usermodel', # lrj关联用户链接自定义方法
'show_user_image', #lrj 用户头像显示(自定义方法)
'type', # lrjOAuth平台类型
'email', # lrj用户邮箱
)
# lrj列表页中可点击进入编辑页的字段
list_display_links = ('id', 'nickname')
# lrj右侧过滤器配置支持按关联用户和平台类型过滤
list_filter = ('author', 'type',)
# lrj只读字段列表初始为空
readonly_fields = []
def get_readonly_fields(self, request, obj=None):
"""lrj
动态获取只读字段列表
确保在编辑页所有字段都为只读防止误操作
Args:
request: HTTP请求对象
obj: 模型实例对象
Returns:
list: 只读字段列表
"""
# 将所有模型字段和多对多字段都设为只读
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
def has_add_permission(self, request):
"""lrj
禁用添加权限
OAuth用户只能通过OAuth流程自动创建不能手动添加
Args:
request: HTTP请求对象
Returns:
bool: 是否允许添加这里始终返回False
"""
return False
def link_to_usermodel(self, obj):
"""lrj
显示关联用户链接的自定义方法
在列表页显示关联的本站用户链接
Args:
obj: OAuthUser模型实例
Returns:
str: 格式化的HTML链接或None
"""
if obj.author:
# lrj构建关联用户的管理后台编辑链接
info = (obj.author._meta.app_label, obj.author._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
# lrj返回格式化的HTML链接显示用户昵称或邮箱
return format_html(
u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email))
return None
def show_user_image(self, obj):
img = obj.picture
"""lrj
显示用户头像的自定义方法
在列表页以缩略图形式显示用户头像
Args:
obj: OAuthUser模型实例
Returns:
str: 格式化的HTML图片标签
"""
img = obj.picture # 获取头像URL
return format_html(
u'<img src="%s" style="width:50px;height:50px"></img>' %
img)
link_to_usermodel.short_description = '用户'
show_user_image.short_description = '用户头像'
# 设置自定义方法在Admin中的显示名称
link_to_usermodel.short_description = '用户' # 关联用户列标题
show_user_image.short_description = '用户头像' # 用户头像列标题
class OAuthConfigAdmin(admin.ModelAdmin):
list_display = ('type', 'appkey', 'appsecret', 'is_enable')
"""lrj
OAuth配置模型后台管理配置类
自定义OAuthConfig模型在Django Admin中的显示
"""
# lrj列表页显示的字段
list_display = (
'type', # lrj平台类型
'appkey', # lrj应用Key
'appsecret', # lrj应用Secret
'is_enable' # lrj是否启用
)
# lrj右侧过滤器配置支持按平台类型过滤
list_filter = ('type',)

@ -3,3 +3,4 @@ from django.apps import AppConfig
class OauthConfig(AppConfig):
name = 'oauth'

@ -1,12 +1,40 @@
"""lrj
OAuth 认证表单模块
定义OAuth认证过程中使用的Django表单类
"""
from django.contrib.auth.forms import forms
from django.forms import widgets
class RequireEmailForm(forms.Form):
"""lrj
邮箱补充表单类
当OAuth用户没有提供邮箱时用于收集用户邮箱信息
"""
# lrj邮箱字段必需字段标签为'电子邮箱'
email = forms.EmailField(label='电子邮箱', required=True)
# lrjOAuth用户ID字段隐藏字段非必需用于关联OAuth用户记录
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False)
def __init__(self, *args, **kwargs):
"""lrj
表单初始化方法
自定义表单字段的widget属性添加CSS类和占位符
Args:
*args: 位置参数
**kwargs: 关键字参数
"""
#lrj调用父类初始化方法
super(RequireEmailForm, self).__init__(*args, **kwargs)
# lrj自定义邮箱字段的widget添加Bootstrap样式和占位符
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})
attrs={
'placeholder': "email", # lrj输入框占位符文本
"class": "form-control" # lrjBootstrap表单控件CSS类
})

@ -1,4 +1,8 @@
# Create your models here.
"""lrj
OAuth 认证模块数据模型定义
提供第三方登录用户信息和配置管理的数据结构
"""
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
@ -7,61 +11,114 @@ from django.utils.translation import gettext_lazy as _
class OAuthUser(models.Model):
"""lrj
OAuth 第三方登录用户信息模型
存储通过第三方平台(微博GitHub等)登录的用户信息
"""
# lrj关联本站用户可为空(用户首次第三方登录时尚未绑定本站账号)
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
verbose_name=_('author'), # lrj翻译作者
blank=True,
null=True,
on_delete=models.CASCADE)
on_delete=models.CASCADE) # lrj级联删除本站用户删除时同步删除OAuth关联
# lrj第三方平台的用户唯一标识
openid = models.CharField(max_length=50)
# lrj第三方平台的用户昵称
nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
# lrjOAuth访问令牌用于调用第三方平台API
token = models.CharField(max_length=150, null=True, blank=True)
# lrj用户头像URL
picture = models.CharField(max_length=350, blank=True, null=True)
# lrj第三方平台类型weibo, github, google等
type = models.CharField(blank=False, null=False, max_length=50)
# lrj用户邮箱(从第三方平台获取)
email = models.CharField(max_length=50, null=True, blank=True)
# lrj原始元数据存储从第三方平台返回的完整用户信息(JSON格式)
metadata = models.TextField(null=True, blank=True)
# lrj记录创建时间自动设置为当前时间
creation_time = models.DateTimeField(_('creation time'), default=now)
#lrj 最后修改时间,自动更新为当前时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
def __str__(self):
"""lrj管理员界面显示的用户标识"""
return self.nickname
class Meta:
verbose_name = _('oauth user')
verbose_name_plural = verbose_name
ordering = ['-creation_time']
"""lrj模型元数据配置"""
verbose_name = _('oauth user') # lrj单数显示名称
verbose_name_plural = verbose_name # lrj复数显示名称
ordering = ['-creation_time'] # lrj默认按创建时间降序排列
class OAuthConfig(models.Model):
"""lrj
OAuth 应用配置模型
存储各个第三方平台的OAuth应用配置信息
"""
# lrj支持的第三方平台类型选项
TYPE = (
('weibo', _('weibo')),
('google', _('google')),
('github', 'GitHub'),
('facebook', 'FaceBook'),
('qq', 'QQ'),
('weibo', _('weibo')), # lrj微博
('google', _('google')), # 谷歌
('github', 'GitHub'), # lrjGitHub
('facebook', 'FaceBook'), # lrjFacebook
('qq', 'QQ'), # lrjQQ
)
# lrj平台类型选择
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
# lrjOAuth应用的AppKey/Client ID
appkey = models.CharField(max_length=200, verbose_name='AppKey')
# lrjOAuth应用的AppSecret/Client Secret
appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
# lrjOAuth回调URL用于接收授权码
callback_url = models.CharField(
max_length=200,
verbose_name=_('callback url'),
blank=False,
default='')
# lrj是否启用该平台配置
is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False)
# lrj配置创建时间
creation_time = models.DateTimeField(_('creation time'), default=now)
# lrj配置最后修改时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
def clean(self):
"""lrj
数据验证方法确保同类型平台配置唯一
避免重复配置同一个第三方平台
"""
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
raise ValidationError(_(self.type + _('already exists')))
def __str__(self):
"""lrj管理员界面显示的配置标识"""
return self.type
class Meta:
verbose_name = 'oauth配置'
verbose_name_plural = verbose_name
ordering = ['-creation_time']
"""lrj模型元数据配置"""
verbose_name = 'OAuth配置' # lrj单数显示名称
verbose_name_plural = verbose_name # lrj复数显示名称
ordering = ['-creation_time'] # lrj默认按创建时间降序排列

@ -0,0 +1,67 @@
# Create your models here.
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
class OAuthUser(models.Model):
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=True,
null=True,
on_delete=models.CASCADE)
openid = models.CharField(max_length=50)
nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
token = models.CharField(max_length=150, null=True, blank=True)
picture = models.CharField(max_length=350, blank=True, null=True)
type = models.CharField(blank=False, null=False, max_length=50)
email = models.CharField(max_length=50, null=True, blank=True)
metadata = models.TextField(null=True, blank=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
def __str__(self):
return self.nickname
class Meta:
verbose_name = _('oauth user')
verbose_name_plural = verbose_name
ordering = ['-creation_time']
class OAuthConfig(models.Model):
TYPE = (
('weibo', _('weibo')),
('google', _('google')),
('github', 'GitHub'),
('facebook', 'FaceBook'),
('qq', 'QQ'),
)
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
appkey = models.CharField(max_length=200, verbose_name='AppKey')
appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
callback_url = models.CharField(
max_length=200,
verbose_name=_('callback url'),
blank=False,
default='')
is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
def clean(self):
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
raise ValidationError(_(self.type + _('already exists')))
def __str__(self):
return self.type
class Meta:
verbose_name = 'oauth配置'
verbose_name_plural = verbose_name
ordering = ['-creation_time']

@ -1,3 +1,9 @@
"""lrj
OAuth 认证管理器模块
提供多平台OAuth2.0认证的抽象基类和具体实现
支持微博GoogleGitHubFacebookQQ等第三方登录
"""
import json
import logging
import os
@ -7,8 +13,9 @@ from abc import ABCMeta, abstractmethod
import requests
from djangoblog.utils import cache_decorator
from oauth.models import OAuthUser, OAuthConfig
from oauth.models import OAuthConfig, OAuthUser
# lrj配置日志记录器
logger = logging.getLogger(__name__)
@ -19,78 +26,168 @@ class OAuthAccessTokenException(Exception):
class BaseOauthManager(metaclass=ABCMeta):
"""获取用户授权"""
"""lrj
OAuth管理器抽象基类
定义所有OAuth平台必须实现的接口和方法
"""
#lrj 授权页面URL - 需要子类实现
AUTH_URL = None
"""获取token"""
# lrj获取访问令牌URL - 需要子类实现
TOKEN_URL = None
"""获取用户信息"""
# lrj获取用户信息API URL - 需要子类实现
API_URL = None
'''icon图标名'''
# lrj平台图标名称标识 - 需要子类实现
ICON_NAME = None
def __init__(self, access_token=None, openid=None):
"""lrj
初始化OAuth管理器
Args:
access_token: 访问令牌可为None
openid: 第三方平台用户ID可为None
"""
self.access_token = access_token
self.openid = openid
@property
def is_access_token_set(self):
"""lrj检查access_token是否已设置"""
return self.access_token is not None
@property
def is_authorized(self):
"""lrj检查是否已完成授权包含access_token和openid"""
return self.is_access_token_set and self.access_token is not None and self.openid is not None
@abstractmethod
def get_authorization_url(self, nexturl='/'):
"""lrj
获取授权页面URL - 抽象方法
Args:
nexturl: 授权成功后跳转的URL
Returns:
str: 完整的授权页面URL
"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""lrj
通过授权码获取访问令牌 - 抽象方法
Args:
code: OAuth回调返回的授权码
Returns:
str: 访问令牌
"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""lrj
获取用户信息 - 抽象方法
Returns:
OAuthUser: 用户信息对象
"""
pass
@abstractmethod
def get_picture(self, metadata):
"""lrj
从元数据中提取用户头像URL - 抽象方法
Args:
metadata: 用户元数据JSON字符串
Returns:
str: 头像URL
"""
pass
@staticmethod
def do_get(url, params, headers=None):
"""lrj
执行GET请求到第三方API
Args:
url: 请求URL
params: 请求参数
headers: 请求头可选
Returns:
str: 响应文本
"""
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text)
logger.info(rsp.text) # lrj记录响应日志
return rsp.text
@staticmethod
def do_post(url, params, headers=None):
"""lrj
执行POST请求到第三方API
Args:
url: 请求URL
params: 请求参数
headers: 请求头可选
Returns:
str: 响应文本
"""
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text)
logger.info(rsp.text) # lrj记录响应日志
return rsp.text
def get_config(self):
"""lrj
从数据库获取当前平台的OAuth配置
Returns:
OAuthConfig: 配置对象或None
"""
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
class WBOauthManager(BaseOauthManager):
"""lrj
微博OAuth管理器
实现微博平台的OAuth2.0认证流程
"""
# lrj微博OAuth接口URL
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
API_URL = 'https://api.weibo.com/2/users/show.json'
ICON_NAME = 'weibo'
def __init__(self, access_token=None, openid=None):
"""lrj初始化微博OAuth配置"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
WBOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super(WBOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""lrj
生成微博授权页面URL
Args:
nexturl: 授权成功后跳转URL
Returns:
str: 微博授权URL
"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -100,7 +197,18 @@ class WBOauthManager(BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""lrj
使用授权码获取微博访问令牌
Args:
code: 授权码
Returns:
OAuthUser: 用户信息对象
Raises:
OAuthAccessTokenException: 获取令牌失败时抛出
"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
@ -119,8 +227,15 @@ class WBOauthManager(BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""lrj
获取微博用户信息
Returns:
OAuthUser: 微博用户信息对象
"""
if not self.is_authorized:
return None
params = {
'uid': self.openid,
'access_token': self.access_token
@ -129,14 +244,17 @@ class WBOauthManager(BaseOauthManager):
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['avatar_large']
user.nickname = datas['screen_name']
user.openid = datas['id']
user.type = 'weibo'
user.token = self.access_token
user.metadata = rsp # lrj保存原始响应数据
user.picture = datas['avatar_large'] # lrj用户头像
user.nickname = datas['screen_name'] # lrj用户昵称
user.openid = datas['id'] # lrj微博用户ID
user.type = 'weibo' # lrj平台类型
user.token = self.access_token # lrj访问令牌
# lrj可选邮箱信息
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
@ -144,12 +262,27 @@ class WBOauthManager(BaseOauthManager):
return None
def get_picture(self, metadata):
"""lrj
从微博用户元数据中提取头像URL
Args:
metadata: 微博用户元数据JSON字符串
Returns:
str: 头像URL
"""
datas = json.loads(metadata)
return datas['avatar_large']
class ProxyManagerMixin:
"""lrj
代理管理器混入类
为需要代理访问的OAuth平台提供代理支持
"""
def __init__(self, *args, **kwargs):
"""lrj初始化代理配置"""
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
@ -159,239 +292,34 @@ class ProxyManagerMixin:
self.proxies = None
def do_get(self, url, params, headers=None):
"""lrj带代理的GET请求"""
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""lrj带代理的POST请求"""
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
ICON_NAME = 'google'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GoogleOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, nexturl='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'openid email',
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token'])
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
if not self.is_authorized:
return None
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture']
user.nickname = datas['name']
user.openid = datas['sub']
user.token = self.access_token
user.type = 'google'
if datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('google oauth error.rsp:' + rsp)
return None
# lrj它们的结构类似都是继承BaseOauthManager并实现抽象方法
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['picture']
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
"""Google OAuth管理器实现"""
#lrj ... 实现细节类似微博管理器
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
ICON_NAME = 'github'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GitHubOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}',
'scope': 'user'
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
self.access_token = (r['access_token'][0])
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url']
user.nickname = datas['name']
user.openid = datas['id']
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('github oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['avatar_url']
"""GitHub OAuth管理器实现"""
# lrj... 实现细节类似微博管理器
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
ICON_NAME = 'facebook'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
FaceBookOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile'
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
# 'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
token = str(obj['access_token'])
self.access_token = token
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email'
}
try:
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name']
user.openid = datas['id']
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
return user
except Exception as e:
logger.error(e)
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
"""Facebook OAuth管理器实现"""
# lrj... 实现细节类似微博管理器
class QQOauthManager(BaseOauthManager):
@ -488,22 +416,41 @@ class QQOauthManager(BaseOauthManager):
@cache_decorator(expiration=100 * 60)
def get_oauth_apps():
"""lrj
获取所有启用的OAuth应用列表带缓存
Returns:
list: OAuth管理器实例列表
"""
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return []
# lrj提取已启用平台的类型列表
configtypes = [x.type for x in configs]
# lrj获取所有OAuth管理器子类
applications = BaseOauthManager.__subclasses__()
#lrj 过滤出已配置的平台管理器
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps
def get_manager_by_type(type):
"""lrj
根据平台类型获取对应的OAuth管理器
Args:
type: 平台类型字符串'weibo', 'github'
Returns:
BaseOauthManager: 对应的管理器实例或None
"""
applications = get_oauth_apps()
if applications:
finds = list(
filter(
lambda x: x.ICON_NAME.lower() == type.lower(),
applications))
finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
if finds:
return finds[0]
return None

@ -1,3 +1,8 @@
"""lrj
OAuth 认证测试模块
包含OAuth配置登录流程和各平台OAuth功能的单元测试
"""
import json
from unittest.mock import patch
@ -13,44 +18,64 @@ from oauth.oauthmanager import BaseOauthManager
# Create your tests here.
class OAuthConfigTest(TestCase):
"""lrj
OAuth配置模型测试类
测试OAuth配置相关的功能
"""
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
"""lrj测试初始化设置"""
self.client = Client() # lrjDjango测试客户端
self.factory = RequestFactory() # lrj请求工厂
def test_oauth_login_test(self):
"""lrj测试OAuth登录流程"""
# lrj创建微博OAuth配置
c = OAuthConfig()
c.type = 'weibo'
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
# lrj测试OAuth登录重定向
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
self.assertEqual(response.status_code, 302) # lrj验证重定向状态码
self.assertTrue("api.weibo.com" in response.url) # lrj验证重定向到微博授权页面
# lrj测试授权回调处理
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
self.assertEqual(response.status_code, 302) # lrj验证重定向状态码
self.assertEqual(response.url, '/') # lrj验证重定向到首页
class OauthLoginTest(TestCase):
"""lrj
OAuth登录流程测试类
测试各平台OAuth登录的完整流程
"""
def setUp(self) -> None:
self.client = Client()
self.factory = RequestFactory()
self.apps = self.init_apps()
"""lrj测试初始化设置"""
self.client = Client() # lrjDjango测试客户端
self.factory = RequestFactory() # lrj请求工厂
self.apps = self.init_apps() # lrj初始化所有OAuth应用配置
@staticmethod
def init_apps():
"""lrj初始化所有OAuth平台配置"""
# lrj获取所有OAuth管理器子类并实例化
applications = [p() for p in BaseOauthManager.__subclasses__()]
for application in applications:
# lrj为每个平台创建配置
c = OAuthConfig()
c.type = application.ICON_NAME.lower()
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.type = application.ICON_NAME.lower() # lrj平台类型
c.appkey = 'appkey' # lrj测试用AppKey
c.appsecret = 'appsecret' # lrj测试用AppSecret
c.save()
return applications
def get_app_by_type(self, type):
"""lrj根据平台类型获取对应的OAuth应用"""
for app in self.apps:
if app.ICON_NAME.lower() == type:
return app
@ -59,9 +84,15 @@ class OauthLoginTest(TestCase):
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_login(self, mock_do_get, mock_do_post):
"""lrj测试微博OAuth登录流程"""
# lrj获取微博OAuth应用
weibo_app = self.get_app_by_type('weibo')
assert weibo_app
assert weibo_app #lrj 验证应用存在
# lrj测试授权URL生成
weibo_app.get_authorization_url()
# lrj模拟微博API响应
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
@ -71,16 +102,24 @@ class OauthLoginTest(TestCase):
"id": "id",
"email": "email",
})
# lrj测试获取访问令牌和用户信息
userinfo = weibo_app.get_access_token_by_code('code')
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'id')
self.assertEqual(userinfo.token, 'access_token') # lrj验证访问令牌
self.assertEqual(userinfo.openid, 'id') #lrj 验证用户OpenID
@patch("oauth.oauthmanager.GoogleOauthManager.do_post")
@patch("oauth.oauthmanager.GoogleOauthManager.do_get")
def test_google_login(self, mock_do_get, mock_do_post):
"""lrj测试Google OAuth登录流程"""
# lrj获取Google OAuth应用
google_app = self.get_app_by_type('google')
assert google_app
assert google_app # lrj验证应用存在
# lrj测试授权URL生成
google_app.get_authorization_url()
# lrj模拟Google API响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"id_token": "id_token",
@ -93,17 +132,23 @@ class OauthLoginTest(TestCase):
})
google_app.get_access_token_by_code('code')
userinfo = google_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'sub')
self.assertEqual(userinfo.token, 'access_token') # lrj验证访问令牌
self.assertEqual(userinfo.openid, 'sub') # lrj验证用户OpenID
@patch("oauth.oauthmanager.GitHubOauthManager.do_post")
@patch("oauth.oauthmanager.GitHubOauthManager.do_get")
def test_github_login(self, mock_do_get, mock_do_post):
"""测试GitHub OAuth登录流程"""
# lrj获取GitHub OAuth应用
github_app = self.get_app_by_type('github')
assert github_app
assert github_app # lrj验证应用存在
# lrj测试授权URL生成
url = github_app.get_authorization_url()
self.assertTrue("github.com" in url)
self.assertTrue("client_id" in url)
self.assertTrue("github.com" in url) # lrj验证GitHub域名
self.assertTrue("client_id" in url) #lrj 验证包含client_id参数
# lrj模拟GitHub API响应
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer"
mock_do_get.return_value = json.dumps({
"avatar_url": "avatar_url",
@ -113,16 +158,22 @@ class OauthLoginTest(TestCase):
})
github_app.get_access_token_by_code('code')
userinfo = github_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a')
self.assertEqual(userinfo.openid, 'id')
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a') # lrj验证访问令牌
self.assertEqual(userinfo.openid, 'id') # lrj验证用户OpenID
@patch("oauth.oauthmanager.FaceBookOauthManager.do_post")
@patch("oauth.oauthmanager.FaceBookOauthManager.do_get")
def test_facebook_login(self, mock_do_get, mock_do_post):
"""lrj测试Facebook OAuth登录流程"""
# lrj获取Facebook OAuth应用
facebook_app = self.get_app_by_type('facebook')
assert facebook_app
assert facebook_app # lrj验证应用存在
# lrj测试授权URL生成
url = facebook_app.get_authorization_url()
self.assertTrue("facebook.com" in url)
self.assertTrue("facebook.com" in url) # lrj验证Facebook域名
# lrj模拟Facebook API响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
})
@ -138,31 +189,38 @@ class OauthLoginTest(TestCase):
})
facebook_app.get_access_token_by_code('code')
userinfo = facebook_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.token, 'access_token') # lrj验证访问令牌
@patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[
'access_token=access_token&expires_in=3600',
'callback({"client_id":"appid","openid":"openid"} );',
'access_token=access_token&expires_in=3600', # lrj获取token响应
'callback({"client_id":"appid","openid":"openid"} );', # lrj获取openid响应
json.dumps({
"nickname": "nickname",
"email": "email",
"figureurl": "figureurl",
"openid": "openid",
})
}) # lrj获取用户信息响应
])
def test_qq_login(self):
"""lrj测试QQ OAuth登录流程"""
# lrj获取QQ OAuth应用
qq_app = self.get_app_by_type('qq')
assert qq_app
assert qq_app # lrj验证应用存在
# lrj测试授权URL生成
url = qq_app.get_authorization_url()
self.assertTrue("qq.com" in url)
self.assertTrue("qq.com" in url) # lrj验证QQ域名
# lrj测试获取访问令牌和用户信息使用side_effect模拟多次调用
qq_app.get_access_token_by_code('code')
userinfo = qq_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.token, 'access_token') # lrj验证访问令牌
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post):
"""lrj测试带邮箱的微博授权登录完整流程"""
# lrj模拟微博API响应
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
@ -174,35 +232,43 @@ class OauthLoginTest(TestCase):
}
mock_do_get.return_value = json.dumps(mock_user_info)
# lrj测试OAuth登录重定向
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
self.assertEqual(response.status_code, 302) # lrj验证重定向状态码
self.assertTrue("api.weibo.com" in response.url) # lrj验证重定向到微博
#lrj 测试授权回调处理
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
self.assertEqual(response.status_code, 302) #lrj 验证重定向状态码
self.assertEqual(response.url, '/') # lrj验证重定向到首页
# lrj验证用户已登录
user = auth.get_user(self.client)
assert user.is_authenticated
assert user.is_authenticated # lrj验证用户已认证
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
self.assertEqual(user.username, mock_user_info['screen_name']) # lrj验证用户名
self.assertEqual(user.email, mock_user_info['email']) # lrj验证邮箱
# lrj登出用户
self.client.logout()
# lrj再次测试登录测试重复登录情况
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
self.assertEqual(response.status_code, 302) # lrj验证重定向状态码
self.assertEqual(response.url, '/') # lrj证重定向到首页
# lrj再次验证用户已登录
user = auth.get_user(self.client)
assert user.is_authenticated
assert user.is_authenticated # lrj验证用户已认证
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
self.assertEqual(user.username, mock_user_info['screen_name']) # lrj验证用户名
self.assertEqual(user.email, mock_user_info['email']) # lrj验证邮箱
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post):
"""lrj测试不带邮箱的微博授权登录完整流程需要补充邮箱"""
# lrj模拟微博API响应不含邮箱
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
@ -213,39 +279,48 @@ class OauthLoginTest(TestCase):
}
mock_do_get.return_value = json.dumps(mock_user_info)
# lrj测试OAuth登录重定向
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
self.assertEqual(response.status_code, 302) # lrj验证重定向状态码
self.assertTrue("api.weibo.com" in response.url) # lrj验证重定向到微博
# lrj测试授权回调处理应该重定向到邮箱补充页面
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302) # lrj验证重定向状态码
self.assertEqual(response.status_code, 302)
# lrj解析OAuth用户ID
oauth_user_id = int(response.url.split('/')[-1].split('.')[0])
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html')
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html') #lrj 验证重定向到邮箱补充页面
# lrj测试邮箱补充表单提交
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id})
self.assertEqual(response.status_code, 302) # lrj验证重定向状态码
self.assertEqual(response.status_code, 302)
#lrj 生成安全签名
sign = get_sha256(settings.SECRET_KEY +
str(oauth_user_id) + settings.SECRET_KEY)
# lrj验证重定向到绑定成功页面
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauth_user_id,
})
self.assertEqual(response.url, f'{url}?type=email')
# lrj测试邮箱确认链接
path = reverse('oauth:email_confirm', kwargs={
'id': oauth_user_id,
'sign': sign
})
response = self.client.get(path)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success')
self.assertEqual(response.status_code, 302) #lrj 验证重定向状态码
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success') #lrj 验证重定向到绑定成功页面
# lrj验证用户已登录
user = auth.get_user(self.client)
from oauth.models import OAuthUser
oauth_user = OAuthUser.objects.get(author=user)
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, 'test@gmail.com')
self.assertEqual(oauth_user.pk, oauth_user_id)
self.assertTrue(user.is_authenticated) # lrj验证用户已认证
self.assertEqual(user.username, mock_user_info['screen_name']) # lrj验证用户名
self.assertEqual(user.email, 'test@gmail.com') #lrj 验证补充的邮箱
self.assertEqual(oauth_user.pk, oauth_user_id) # lrj验证OAuth用户ID匹配

@ -1,25 +1,54 @@
"""lrj
OAuth 认证URL路由配置模块
定义OAuth认证相关的URL路由和视图映射关系
"""
from django.urls import path
from . import views
# lrj应用命名空间用于URL反向解析时区分不同应用的同名URL
app_name = "oauth"
# lrjURL模式配置列表
urlpatterns = [
#lrj OAuth授权回调处理URL
# lrj路径/oauth/authorize
# lrj处理第三方平台回调完成用户认证和账号绑定
path(
r'oauth/authorize',
views.authorize),
r'oauth/authorize', # lrjURL路径模式
views.authorize), # lrj对应的视图函数
# lrj邮箱补充页面URL
#lrj 路径:/oauth/requireemail/<oauthid>.html
# lrj当OAuth用户没有邮箱时显示表单要求用户补充邮箱信息
path(
r'oauth/requireemail/<int:oauthid>.html',
views.RequireEmailView.as_view(),
name='require_email'),
r'oauth/requireemail/<int:oauthid>.html', # lrjURL路径模式包含整数类型的oauthid参数
views.RequireEmailView.as_view(), #lrj 对应的类视图使用as_view()方法)
name='require_email'), # lrjURL名称用于反向解析
# lrj邮箱确认链接URL
# lrj路径/oauth/emailconfirm/<id>/<sign>.html
# lrj用户点击邮件中的确认链接完成邮箱绑定和用户登录
path(
r'oauth/emailconfirm/<int:id>/<sign>.html',
views.emailconfirm,
name='email_confirm'),
r'oauth/emailconfirm/<int:id>/<sign>.html', # lrjURL路径模式包含整数类型的id参数和字符串类型的sign签名参数
views.emailconfirm, #lrj 对应的视图函数
name='email_confirm'), # lrjURL名称用于反向解析
# lrj绑定成功提示页面URL
# lrj路径/oauth/bindsuccess/<oauthid>.html
# lrj显示绑定成功或等待确认的提示信息
path(
r'oauth/bindsuccess/<int:oauthid>.html',
views.bindsuccess,
name='bindsuccess'),
r'oauth/bindsuccess/<int:oauthid>.html', #lrj URL路径模式包含整数类型的oauthid参数
views.bindsuccess, # lrj对应的视图函数
name='bindsuccess'), # lrjURL名称用于反向解析
# lrjOAuth登录入口URL
#lrj 路径:/oauth/oauthlogin
#lrj 重定向用户到第三方平台的授权页面
path(
r'oauth/oauthlogin',
views.oauthlogin,
name='oauthlogin')]
r'oauth/oauthlogin', # lrjURL路径模式
views.oauthlogin, #lrj 对应的视图函数
name='oauthlogin') # lrjURL名称用于反向解析
]

@ -1,39 +1,55 @@
"""lrj
OAuth 认证视图模块
处理第三方登录的完整流程授权回调用户绑定邮件验证等
"""
import logging
# Create your views here.
from urllib.parse import urlparse
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth import login
from django.contrib.auth import get_user_model, login
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.http import HttpResponseForbidden
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.shortcuts import render
from django.http import HttpResponseForbidden, HttpResponseRedirect
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.generic import FormView
from djangoblog.blog_signals import oauth_user_login_signal
from djangoblog.utils import get_current_site
from djangoblog.utils import send_email, get_sha256
from djangoblog.utils import get_current_site, get_sha256, send_email
from oauth.forms import RequireEmailForm
from .models import OAuthUser
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException
from .oauthmanager import OAuthAccessTokenException, get_manager_by_type
# lrj配置日志记录器
logger = logging.getLogger(__name__)
def get_redirecturl(request):
"""lrj
获取安全的重定向URL防止开放重定向漏洞
Args:
request: Django请求对象
Returns:
str: 安全的跳转URL
"""
nexturl = request.GET.get('next_url', None)
# lrj处理特殊路径避免登录循环
if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/'
return nexturl
# lrj解析URL并验证域名安全性
p = urlparse(nexturl)
if p.netloc:
site = get_current_site().domain
# 比较域名忽略www前缀防止跨站重定向
if not p.netloc.replace('www.', '') == site.replace('www.', ''):
logger.info('非法url:' + nexturl)
return "/"
@ -41,26 +57,54 @@ def get_redirecturl(request):
def oauthlogin(request):
"""lrj
OAuth登录入口视图
重定向用户到第三方平台的授权页面
Args:
request: Django请求对象
Returns:
HttpResponseRedirect: 重定向到第三方授权页面或首页
"""
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
# lrj根据平台类型获取对应的OAuth管理器
manager = get_manager_by_type(type)
if not manager:
return HttpResponseRedirect('/')
#lrj 获取安全的重定向URL并生成授权页面URL
nexturl = get_redirecturl(request)
authorizeurl = manager.get_authorization_url(nexturl)
return HttpResponseRedirect(authorizeurl)
def authorize(request):
"""lrj
OAuth授权回调处理视图
处理第三方平台回调完成用户认证和账号绑定
Args:
request: Django请求对象
Returns:
HttpResponseRedirect: 重定向到相应页面
"""
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
manager = get_manager_by_type(type)
if not manager:
return HttpResponseRedirect('/')
# lrj获取授权码
code = request.GET.get('code', None)
try:
# lrj使用授权码获取访问令牌
rsp = manager.get_access_token_by_code(code)
except OAuthAccessTokenException as e:
logger.warning("OAuthAccessTokenException:" + str(e))
@ -68,74 +112,109 @@ def authorize(request):
except Exception as e:
logger.error(e)
rsp = None
nexturl = get_redirecturl(request)
if not rsp:
# lrj获取令牌失败重新跳转到授权页面
return HttpResponseRedirect(manager.get_authorization_url(nexturl))
#lrj 获取用户信息
user = manager.get_oauth_userinfo()
if user:
# lrj处理空昵称情况
if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
try:
# lrj检查是否已存在该OAuth用户
temp = OAuthUser.objects.get(type=type, openid=user.openid)
#lrj 更新用户信息
temp.picture = user.picture
temp.metadata = user.metadata
temp.nickname = user.nickname
user = temp
except ObjectDoesNotExist:
pass
# facebook的token过长
pass # lrj新用户继续处理
#lrj Facebook的token过长特殊处理
if type == 'facebook':
user.token = ''
# lrj如果用户有邮箱直接完成绑定和登录
if user.email:
with transaction.atomic():
with transaction.atomic(): # lrj使用事务保证数据一致性
author = None
try:
author = get_user_model().objects.get(id=user.author_id)
except ObjectDoesNotExist:
pass
if not author:
# lrj创建或获取用户账号
result = get_user_model().objects.get_or_create(email=user.email)
author = result[0]
if result[1]:
if result[1]: # lrj是新创建的用户
try:
#lrj 检查用户名是否已存在
get_user_model().objects.get(username=user.nickname)
except ObjectDoesNotExist:
author.username = user.nickname
else:
# lrj用户名冲突生成唯一用户名
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize'
author.save()
# lrj关联OAuth用户和本站用户
user.author = author
user.save()
# lrj发送登录信号
oauth_user_login_signal.send(
sender=authorize.__class__, id=user.id)
# lrj登录用户
login(request, author)
return HttpResponseRedirect(nexturl)
else:
#lrj 没有邮箱,需要用户补充邮箱信息
user.save()
url = reverse('oauth:require_email', kwargs={
'oauthid': user.id
})
return HttpResponseRedirect(url)
else:
return HttpResponseRedirect(nexturl)
def emailconfirm(request, id, sign):
"""lrj
邮箱确认视图
通过邮件链接完成邮箱绑定和用户登录
Args:
request: Django请求对象
id: OAuth用户ID
sign: 安全签名
Returns:
HttpResponse: 重定向或错误响应
"""
if not sign:
return HttpResponseForbidden()
#lrj 验证签名安全性
if not get_sha256(settings.SECRET_KEY +
str(id) +
settings.SECRET_KEY).upper() == sign.upper():
return HttpResponseForbidden()
oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic():
if oauthuser.author:
author = get_user_model().objects.get(pk=oauthuser.author_id)
else:
# lrj创建新用户账号
result = get_user_model().objects.get_or_create(email=oauthuser.email)
author = result[0]
if result[1]:
@ -143,13 +222,20 @@ def emailconfirm(request, id, sign):
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip(
) else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.save()
# lrj完成绑定
oauthuser.author = author
oauthuser.save()
# lrj发送登录信号
oauth_user_login_signal.send(
sender=emailconfirm.__class__,
id=oauthuser.id)
# lrj登录用户
login(request, author)
#lrj 发送绑定成功邮件
site = 'http://' + get_current_site().domain
content = _('''
<p>Congratulations, you have successfully bound your email address. You can use
@ -163,6 +249,8 @@ def emailconfirm(request, id, sign):
''') % {'oauthuser_type': oauthuser.type, 'site': site}
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content)
# 跳转到绑定成功页面
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': id
})
@ -171,19 +259,27 @@ def emailconfirm(request, id, sign):
class RequireEmailView(FormView):
"""lrj
要求邮箱表单视图
处理用户补充邮箱信息的流程
"""
form_class = RequireEmailForm
template_name = 'oauth/require_email.html'
def get(self, request, *args, **kwargs):
"""lrjGET请求处理"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
# lrj如果已有邮箱可能直接跳转当前注释掉了
if oauthuser.email:
pass
# return HttpResponseRedirect('/')
# lrjreturn HttpResponseRedirect('/')
return super(RequireEmailView, self).get(request, *args, **kwargs)
def get_initial(self):
"""lrj设置表单初始值"""
oauthid = self.kwargs['oauthid']
return {
'email': '',
@ -191,6 +287,7 @@ class RequireEmailView(FormView):
}
def get_context_data(self, **kwargs):
"""lrj添加上下文数据"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture:
@ -198,13 +295,18 @@ class RequireEmailView(FormView):
return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form):
"""lrj表单验证通过后的处理"""
email = form.cleaned_data['email']
oauthid = form.cleaned_data['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
oauthuser.email = email
oauthuser.save()
# lrj生成安全签名
sign = get_sha256(settings.SECRET_KEY +
str(oauthuser.id) + settings.SECRET_KEY)
# lrj构建确认链接
site = get_current_site().domain
if settings.DEBUG:
site = '127.0.0.1:8000'
@ -214,6 +316,7 @@ class RequireEmailView(FormView):
})
url = "http://{site}{path}".format(site=site, path=path)
# lrj发送确认邮件
content = _("""
<p>Please click the link below to bind your email</p>
@ -226,6 +329,8 @@ class RequireEmailView(FormView):
%(url)s
""") % {'url': url}
send_email(emailto=[email, ], title=_('Bind your email'), content=content)
# lrj跳转到绑定成功提示页面
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauthid
})
@ -234,8 +339,20 @@ class RequireEmailView(FormView):
def bindsuccess(request, oauthid):
"""lrj
绑定成功提示页面
Args:
request: Django请求对象
oauthid: OAuth用户ID
Returns:
HttpResponse: 渲染的绑定成功页面
"""
type = request.GET.get('type', None)
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
#lrj 根据绑定类型显示不同内容
if type == 'email':
title = _('Bind your email')
content = _(
@ -247,7 +364,9 @@ def bindsuccess(request, oauthid):
"Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s"
" to directly log in to this website without a password. You are welcome to continue to follow this site." % {
'oauthuser_type': oauthuser.type})
return render(request, 'oauth/bindsuccess.html', {
'title': title,
'content': content
})

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save