第七—八周代码注释oauth

wzm_branch
yna1217 3 months ago
parent 088fc7cec3
commit 44e267d822

@ -0,0 +1,93 @@
import logging
from django.contrib import admin
# 注册模型到admin站点
from django.urls import reverse
from django.utils.html import format_html
# 初始化日志记录器,用于记录当前模块的日志信息
logger = logging.getLogger(__name__)
class OAuthUserAdmin(admin.ModelAdmin):
"""
OAuthUser模型的Admin管理类用于在Django admin后台管理第三方登录用户信息
"""
# 配置搜索字段,支持通过昵称和邮箱搜索
search_fields = ('nickname', 'email')
# 配置每页显示20条记录
list_per_page = 20
# 配置列表页显示的字段,包括自定义字段
list_display = (
'id',
'nickname', # 昵称
'link_to_usermodel', # 关联的本地用户(自定义链接字段)
'show_user_image', # 用户头像(自定义图片显示字段)
'type', # 第三方平台类型
'email', # 邮箱
)
# 配置列表页中可点击跳转编辑页的字段
list_display_links = ('id', 'nickname')
# 配置列表页的过滤条件
list_filter = ('author', 'type',)
# 初始只读字段列表(后续会动态扩展)
readonly_fields = []
def get_readonly_fields(self, request, obj=None):
"""
重写只读字段方法当编辑对象时将所有字段设为只读
新增时obj为None不生效编辑时obj存在所有字段只读
"""
if obj: # 编辑已有对象时
# 合并初始只读字段 + 模型所有普通字段 + 所有多对多字段
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
return self.readonly_fields # 新增时使用初始只读字段
def has_add_permission(self, request):
"""
禁用在admin后台手动添加OAuthUser的权限第三方用户信息应通过登录自动创建
"""
return False
def link_to_usermodel(self, obj):
"""
自定义列表字段生成关联本地用户的admin编辑页链接
"""
if obj.author: # 如果存在关联的本地用户
# 获取关联用户模型的app标签和模型名称
info = (obj.author._meta.app_label, obj.author._meta.model_name)
# 反转生成用户编辑页的URL
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
# 返回带链接的HTML使用format_html确保安全渲染
return format_html(
u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email)
# 显示昵称,若昵称不存在则显示邮箱
)
return None # 无关联用户时返回空
def show_user_image(self, obj):
"""
自定义列表字段显示用户头像图片
"""
img = obj.picture # 获取头像图片URL
if img: # 若头像存在
# 返回图片HTML标签限制宽高为50px
return format_html(u'<img src="%s" style="width:50px;height:50px"></img>' % (img))
return None # 无头像时返回空
# 定义自定义字段在列表页的显示名称
link_to_usermodel.short_description = '用户'
show_user_image.short_description = '用户头像'
class OAuthConfigAdmin(admin.ModelAdmin):
"""
OAuthConfig模型的Admin管理类用于在Django admin后台管理第三方登录配置信息
"""
# 配置列表页显示的字段
list_display = ('type', 'appkey', 'appsecret', 'is_enable')
# 配置列表页的过滤条件(按第三方平台类型过滤)
list_filter = ('type',)

@ -0,0 +1,11 @@
# 导入Django的AppConfig类用于配置应用的基本信息
from django.apps import AppConfig
class OauthConfig(AppConfig):
"""
oauth应用的配置类用于定义应用的核心信息
继承自Django的AppConfig是Django应用配置的标准方式
"""
# 应用的名称Django通过该名称识别和管理当前应用
name = 'oauth'

@ -0,0 +1,26 @@
# 导入Django表单基础类和控件模块
from django.contrib.auth.forms import forms
from django.forms import widgets
class RequireEmailForm(forms.Form):
"""
用于收集用户电子邮箱的表单类
通常在OAuth第三方登录时若用户未提供邮箱信息用于补充收集
"""
# 电子邮箱字段使用EmailField进行格式验证标签为"电子邮箱",且为必填项
email = forms.EmailField(label='电子邮箱', required=True)
# OAuth用户ID字段隐藏控件HiddenInput用于关联第三方登录用户非必填
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False)
def __init__(self, *args, **kwargs):
"""
重写初始化方法自定义表单字段的控件属性
主要用于设置邮箱输入框的占位符和CSS样式类
"""
# 调用父类的初始化方法,确保表单正常初始化
super(RequireEmailForm, self).__init__(*args, **kwargs)
# 为email字段设置自定义控件EmailInput
# 添加placeholder提示文本和form-control的CSS类通常用于Bootstrap样式
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})

@ -0,0 +1,84 @@
# Generated by Django 4.1.7 on 2023-03-07 09:53
# 导入Django相关模块
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
"""迁移类,定义数据库表结构的变更操作"""
# 标识为初始迁移(首次创建表结构)
initial = True
# 依赖的其他迁移,此处依赖于用户模型的迁移
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
# 具体的迁移操作列表
operations = [
# 创建OAuthConfig模型存储OAuth第三方登录的配置信息
migrations.CreateModel(
name='OAuthConfig',
fields=[
# 自增主键ID
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# OAuth类型字段限定可选值为常见第三方平台默认值为'a'(可能需要后续调整)
('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')),
# 应用AppKey字段
('appkey', models.CharField(max_length=200, verbose_name='AppKey')),
# 应用AppSecret字段
('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')),
# 回调地址字段,默认值为百度首页
('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')),
# 是否启用该OAuth配置的布尔字段默认启用
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')),
# 创建时间字段,默认值为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间字段,默认值为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
],
# 模型元数据配置
options={
'verbose_name': 'oauth配置', # 单数显示名称
'verbose_name_plural': 'oauth配置', # 复数显示名称
'ordering': ['-created_time'], # 排序方式:按创建时间倒序
},
),
# 创建OAuthUser模型存储通过OAuth登录的用户信息
migrations.CreateModel(
name='OAuthUser',
fields=[
# 自增主键ID
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 第三方平台的openid
('openid', models.CharField(max_length=50)),
# 第三方平台的用户昵称
('nickname', models.CharField(max_length=50, verbose_name='昵称')),
# 访问令牌,可为空
('token', models.CharField(blank=True, max_length=150, null=True)),
# 头像图片地址,可为空
('picture', models.CharField(blank=True, max_length=350, null=True)),
# OAuth类型对应第三方平台
('type', models.CharField(max_length=50)),
# 邮箱地址,可为空
('email', models.CharField(blank=True, max_length=50, null=True)),
# 额外元数据,文本类型,可为空
('metadata', models.TextField(blank=True, null=True)),
# 创建时间字段,默认值为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间字段,默认值为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
# 关联到本地用户模型的外键,可为空,级联删除
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')),
],
# 模型元数据配置
options={
'verbose_name': 'oauth用户', # 单数显示名称
'verbose_name_plural': 'oauth用户', # 复数显示名称
'ordering': ['-created_time'], # 排序方式:按创建时间倒序
},
),
]

@ -0,0 +1,111 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13
# 导入Django配置、数据库迁移相关模块
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
"""迁移类定义对OAuth相关模型的结构修改操作"""
# 依赖的迁移依赖于用户模型的迁移和oauth应用的初始迁移(0001_initial)
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('oauth', '0001_initial'),
]
# 具体的迁移操作列表
operations = [
# 修改OAuthConfig模型的元选项
migrations.AlterModelOptions(
name='oauthconfig',
# 排序方式改为按creation_time倒序显示名称保持为"oauth配置"
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'},
),
# 修改OAuthUser模型的元选项
migrations.AlterModelOptions(
name='oauthuser',
# 排序方式改为按creation_time倒序显示名称改为英文"oauth user"
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'},
),
# 移除OAuthConfig模型中的created_time字段旧时间字段
migrations.RemoveField(
model_name='oauthconfig',
name='created_time',
),
# 移除OAuthConfig模型中的last_mod_time字段旧修改时间字段
migrations.RemoveField(
model_name='oauthconfig',
name='last_mod_time',
),
# 移除OAuthUser模型中的created_time字段旧时间字段
migrations.RemoveField(
model_name='oauthuser',
name='created_time',
),
# 移除OAuthUser模型中的last_mod_time字段旧修改时间字段
migrations.RemoveField(
model_name='oauthuser',
name='last_mod_time',
),
# 为OAuthConfig模型添加creation_time字段新创建时间字段
migrations.AddField(
model_name='oauthconfig',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 为OAuthConfig模型添加last_modify_time字段新修改时间字段
migrations.AddField(
model_name='oauthconfig',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
# 为OAuthUser模型添加creation_time字段新创建时间字段
migrations.AddField(
model_name='oauthuser',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 为OAuthUser模型添加last_modify_time字段新修改时间字段
migrations.AddField(
model_name='oauthuser',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
# 修改OAuthConfig模型的callback_url字段属性
migrations.AlterField(
model_name='oauthconfig',
name='callback_url',
# 默认值从"http://www.baidu.com"改为空字符串;显示名称改为英文"callback url"
field=models.CharField(default='', max_length=200, verbose_name='callback url'),
),
# 修改OAuthConfig模型的is_enable字段属性
migrations.AlterField(
model_name='oauthconfig',
name='is_enable',
# 显示名称改为英文"is enable"
field=models.BooleanField(default=True, verbose_name='is enable'),
),
# 修改OAuthConfig模型的type字段属性
migrations.AlterField(
model_name='oauthconfig',
name='type',
# 选项中的显示文本部分改为英文(如"微博"改为"weibo");显示名称改为英文"type"
field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'),
),
# 修改OAuthUser模型的author字段属性
migrations.AlterField(
model_name='oauthuser',
name='author',
# 显示名称改为英文"author"
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
),
# 修改OAuthUser模型的nickname字段属性
migrations.AlterField(
model_name='oauthuser',
name='nickname',
# 显示名称改为英文"nickname"
field=models.CharField(max_length=50, verbose_name='nickname'),
),
]

@ -0,0 +1,23 @@
# Generated by Django 4.2.7 on 2024-01-26 02:41
# 导入Django数据库迁移相关模块
from django.db import migrations, models
class Migration(migrations.Migration):
"""迁移类定义对OAuthUser模型的字段修改操作"""
# 依赖的迁移依赖于oauth应用的上一个迁移文件(0002_...)
dependencies = [
('oauth', '0002_alter_oauthconfig_options_alter_oauthuser_options_and_more'),
]
# 具体的迁移操作列表
operations = [
# 修改OAuthUser模型的nickname字段属性
migrations.AlterField(
model_name='oauthuser', # 目标模型为OAuthUser
name='nickname', # 目标字段为nickname
# 字段的verbose_name从'nickname'修改为'nick name'其他属性如max_length=50保持不变
field=models.CharField(max_length=50, verbose_name='nick name'),
),
]

@ -0,0 +1,99 @@
# Create your models here.
# 导入Django配置、模型相关模块及工具类
from django.conf import settings
from django.core.exceptions import ValidationError # 用于数据验证抛出异常
from django.db import models
from django.utils.timezone import now # 用于获取当前时间
from django.utils.translation import gettext_lazy as _ # 用于国际化翻译
class OAuthUser(models.Model):
"""
OAuthUser模型存储通过第三方OAuth登录的用户信息
关联本地用户模型记录第三方平台的用户标识昵称头像等核心信息
"""
# 关联本地用户模型AUTH_USER_MODEL可为空级联删除
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'), # 国际化字段名:作者/用户
blank=True, # 表单提交时可空
null=True, # 数据库中可空
on_delete=models.CASCADE) # 关联用户删除时,该记录同步删除
# 第三方平台用户唯一标识如微博、GitHub的openid
openid = models.CharField(max_length=50)
# 用户昵称,支持国际化显示
nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
# 第三方平台访问令牌,可为空
token = models.CharField(max_length=150, null=True, blank=True)
# 用户头像图片URL可为空
picture = models.CharField(max_length=350, blank=True, null=True)
# OAuth登录平台类型如weibo、github非空
type = models.CharField(blank=False, null=False, max_length=50)
# 用户邮箱,可为空
email = models.CharField(max_length=50, null=True, blank=True)
# 额外元数据(存储第三方返回的其他信息),文本类型,可为空
metadata = models.TextField(null=True, blank=True)
# 创建时间,默认当前时间,支持国际化
creation_time = models.DateTimeField(_('creation time'), default=now)
# 最后修改时间,默认当前时间,支持国际化
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
def __str__(self):
"""模型实例的字符串表示:返回用户昵称"""
return self.nickname
class Meta:
verbose_name = _('oauth user') # 模型单数显示名(国际化)
verbose_name_plural = verbose_name # 模型复数显示名(与单数一致)
ordering = ['-creation_time'] # 排序规则:按创建时间倒序
class OAuthConfig(models.Model):
"""
OAuthConfig模型存储第三方OAuth登录的平台配置信息
记录各平台的AppKeyAppSecret回调地址等核心配置
"""
# 第三方平台类型选项元组形式用于choices参数
TYPE = (
('weibo', _('weibo')), # 微博(支持国际化)
('google', _('google')), # 谷歌(支持国际化)
('github', 'GitHub'), # GitHub固定显示
('facebook', 'FaceBook'), # FaceBook固定显示
('qq', 'QQ'), # QQ固定显示
)
# 平台类型关联TYPE选项默认值为'a',支持国际化
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
# 应用AppKey第三方平台分配的客户端ID
appkey = models.CharField(max_length=200, verbose_name='AppKey')
# 应用AppSecret第三方平台分配的客户端密钥
appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
# 回调地址OAuth授权成功后的跳转地址非空默认空字符串支持国际化
callback_url = models.CharField(
max_length=200,
verbose_name=_('callback url'),
blank=False, # 表单提交时不可空
default='')
# 是否启用该配置,默认启用,非空
is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False)
# 创建时间,默认当前时间,支持国际化
creation_time = models.DateTimeField(_('creation time'), default=now)
# 最后修改时间,默认当前时间,支持国际化
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
def clean(self):
"""数据验证方法:确保同一平台类型的配置不重复"""
# 排除当前记录(编辑时),查询是否已存在同类型的配置
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
# 抛出验证异常,提示该平台配置已存在
raise ValidationError(_(self.type + _('already exists')))
def __str__(self):
"""模型实例的字符串表示:返回平台类型"""
return self.type
class Meta:
verbose_name = 'oauth配置' # 模型单数显示名(中文)
verbose_name_plural = verbose_name # 模型复数显示名(与单数一致)
ordering = ['-creation_time'] # 排序规则:按创建时间倒序

@ -0,0 +1,593 @@
import json
import logging
import os
import urllib.parse
from abc import ABCMeta, abstractmethod # 用于定义抽象基类
import requests # 用于发送HTTP请求
from djangoblog.utils import cache_decorator # 导入缓存装饰器
from oauth.models import OAuthUser, OAuthConfig # 导入OAuth相关模型
# 初始化日志记录器,用于记录当前模块的日志信息
logger = logging.getLogger(__name__)
class OAuthAccessTokenException(Exception):
'''
自定义异常OAuth授权过程中获取Access Token失败时抛出
'''
class BaseOauthManager(metaclass=ABCMeta):
"""
OAuth抽象基类定义第三方登录的通用接口和基础方法
所有第三方平台的OAuth管理器都需继承此类并实现抽象方法
"""
# 子类需重写授权页面URL用户跳转授权用
AUTH_URL = None
# 子类需重写获取Access Token的URL
TOKEN_URL = None
# 子类需重写获取用户信息的API URL
API_URL = None
# 子类需重写平台图标名称对应OAuthConfig的type字段
ICON_NAME = None
def __init__(self, access_token=None, openid=None):
"""
初始化OAuth管理器
:param access_token: 第三方平台返回的访问令牌
:param openid: 第三方平台用户唯一标识
"""
self.access_token = access_token # 存储访问令牌
self.openid = openid # 存储用户唯一标识
@property
def is_access_token_set(self):
"""属性判断Access Token是否已设置"""
return self.access_token is not None
@property
def is_authorized(self):
"""属性判断是否已完成授权Access Token和OpenID均存在"""
return self.is_access_token_set and self.access_token is not None and self.openid is not None
@abstractmethod
def get_authorization_url(self, nexturl='/'):
"""
抽象方法生成第三方授权页面的URL
:param nexturl: 授权成功后跳转的页面地址
:return: 完整的授权URL字符串
"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""
抽象方法通过授权码code获取Access Token
:param code: 第三方平台返回的授权码
:return: 成功返回用户信息或Token失败抛出异常
"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""
抽象方法通过Access Token获取第三方用户信息
:return: 构造好的OAuthUser对象失败返回None
"""
pass
@abstractmethod
def get_picture(self, metadata):
"""
抽象方法从用户元数据中提取头像URL
:param metadata: 存储用户信息的元数据JSON字符串
:return: 头像URL字符串
"""
pass
def do_get(self, url, params, headers=None):
"""
基础方法发送GET请求子类可重写
:param url: 请求地址
:param params: 请求参数
:param headers: 请求头
:return: 响应文本内容
"""
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text) # 记录响应日志
return rsp.text
def do_post(self, url, params, headers=None):
"""
基础方法发送POST请求子类可重写
:param url: 请求地址
:param params: 请求参数
:param headers: 请求头
:return: 响应文本内容
"""
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text) # 记录响应日志
return rsp.text
def get_config(self):
"""
获取当前平台的OAuth配置从OAuthConfig模型中查询
:return: OAuthConfig对象不存在返回None
"""
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
class WBOauthManager(BaseOauthManager):
"""微博OAuth登录管理器实现微博第三方登录的具体逻辑"""
AUTH_URL = 'https://api.weibo.com/oauth2/authorize' # 微博授权URL
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' # 微博Token获取URL
API_URL = 'https://api.weibo.com/2/users/show.json' # 微博用户信息API
ICON_NAME = 'weibo' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 先获取微博的OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else '' # 应用ID
self.client_secret = config.appsecret if config else '' # 应用密钥
self.callback_url = config.callback_url if config else '' # 回调地址
# 调用父类初始化方法
super(WBOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""生成微博授权URL拼接跳转地址参数"""
params = {
'client_id': self.client_id,
'response_type': 'code', # 授权类型为code
'redirect_uri': self.callback_url + '&next_url=' + nexturl # 回调地址+登录后跳转地址
}
# 拼接参数生成完整URL
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取微博Access Token并调用用户信息接口"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code', # 授权模式
'code': code, # 授权码
'redirect_uri': self.callback_url # 回调地址(需与授权时一致)
}
# 发送POST请求获取Token
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
# 成功获取Token则继续获取用户信息
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['uid']) # 微博用户唯一标识uid
return self.get_oauth_userinfo()
else:
# 失败则抛出异常
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过Access Token获取微博用户信息构造OAuthUser对象"""
if not self.is_authorized:
return None # 未授权则返回None
params = {
'uid': self.openid,
'access_token': self.access_token
}
# 发送GET请求获取用户信息
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp # 存储原始响应数据
user.picture = datas['avatar_large'] # 大尺寸头像
user.nickname = datas['screen_name'] # 微博昵称
user.openid = datas['id'] # 用户唯一标识
user.type = 'weibo' # 平台类型
user.token = self.access_token # 存储Access Token
# 若返回邮箱则赋值
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('weibo oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
"""从元数据中提取微博用户头像URL"""
datas = json.loads(metadata)
return datas['avatar_large']
class ProxyManagerMixin:
"""
代理混入类为HTTP请求添加代理支持
需与BaseOauthManager组合使用适用于需要代理访问的平台如谷歌GitHub
"""
def __init__(self, *args, **kwargs):
# 从环境变量读取代理配置
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
"https": os.environ.get("HTTP_PROXY")
}
else:
self.proxies = None # 无代理则为None
# 调用父类初始化方法(注意:混入类需放在继承列表前面)
super().__init__(*args, **kwargs)
def do_get(self, url, params, headers=None):
"""重写GET方法添加代理参数"""
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""重写POST方法添加代理参数"""
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
"""谷歌OAuth登录管理器集成代理支持实现谷歌第三方登录逻辑"""
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' # 谷歌授权URL
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' # 谷歌Token获取URL
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' # 谷歌用户信息API
ICON_NAME = 'google' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 获取谷歌OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
# 调用父类ProxyManagerMixin初始化方法
super(GoogleOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""生成谷歌授权URL请求openid和email权限"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'openid email', # 授权范围:获取用户标识和邮箱
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取谷歌Access Token"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token']) # 谷歌用户唯一标识id_token
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过Access Token获取谷歌用户信息"""
if not self.is_authorized:
return None
params = {'access_token': self.access_token}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture'] # 头像URL
user.nickname = datas['name'] # 用户名
user.openid = datas['sub'] # 用户唯一标识
user.token = self.access_token
user.type = 'google'
if datas['email']:
user.email = datas['email'] # 邮箱(谷歌授权时已请求)
return user
except Exception as e:
logger.error(e)
logger.error('google oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
"""从元数据中提取谷歌用户头像URL"""
datas = json.loads(metadata)
return datas['picture']
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
"""GitHub OAuth登录管理器集成代理支持实现GitHub第三方登录逻辑"""
AUTH_URL = 'https://github.com/login/oauth/authorize' # GitHub授权URL
TOKEN_URL = 'https://github.com/login/oauth/access_token' # GitHub Token获取URL
API_URL = 'https://api.github.com/user' # GitHub用户信息API
ICON_NAME = 'github' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 获取GitHub OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
# 调用父类初始化方法
super(GitHubOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""生成GitHub授权URL请求user权限"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}', # 回调+跳转地址
'scope': 'user' # 授权范围:获取用户基本信息
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取GitHub Access Token返回格式为query string"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
# GitHub返回的Token是query string格式需解析
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
self.access_token = (r['access_token'][0]) # 取第一个值(列表格式)
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过Access Token获取GitHub用户信息需在请求头中携带Token"""
# GitHub需在请求头中传递Token而非URL参数
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url'] # 头像URL
user.nickname = datas['name'] # 用户名可能为None优先显示login
user.openid = datas['id'] # 用户唯一ID
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
# 若返回邮箱则赋值GitHub部分用户邮箱可能为None
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('github oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
"""从元数据中提取GitHub用户头像URL"""
datas = json.loads(metadata)
return datas['avatar_url']
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
"""Facebook OAuth登录管理器集成代理支持实现Facebook第三方登录逻辑"""
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' # Facebook授权URLv16.0版本)
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' # Facebook Token获取URL
API_URL = 'https://graph.facebook.com/me' # Facebook用户信息API
ICON_NAME = 'facebook' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 获取Facebook OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
# 调用父类初始化方法
super(FaceBookOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""生成Facebook授权URL请求email和公开资料权限"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile' # 授权范围
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取Facebook Access Token"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
token = str(obj['access_token'])
self.access_token = token
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过Access Token获取Facebook用户信息需指定返回字段"""
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email' # 指定返回的字段(减少冗余)
}
try:
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name'] # 用户名
user.openid = datas['id'] # 用户唯一标识
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
# 赋值邮箱可能为None
if 'email' in datas and datas['email']:
user.email = datas['email']
# 处理头像Facebook头像嵌套在picture.data.url中
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
return user
except Exception as e:
logger.error(e)
return None
def get_picture(self, metadata):
"""从元数据中提取Facebook用户头像URL处理嵌套结构"""
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
class QQOauthManager(BaseOauthManager):
"""QQ OAuth登录管理器实现QQ第三方登录逻辑需单独获取OpenID"""
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' # QQ授权URL
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' # QQ Token获取URL
API_URL = 'https://graph.qq.com/user/get_user_info' # QQ用户信息API
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # QQ OpenID获取URL单独接口
ICON_NAME = 'qq' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 获取QQ OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
# 调用父类初始化方法
super(QQOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""生成QQ授权URL拼接跳转地址"""
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取QQ Access Token返回格式为query string"""
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
# QQ获取Token使用GET请求
rsp = self.do_get(self.TOKEN_URL, params)
if rsp:
# 解析query string格式的响应
d = urllib.parse.parse_qs(rsp)
if 'access_token' in d:
token = d['access_token']
self.access_token = token[0] # 取第一个值
return token
else:
raise OAuthAccessTokenException(rsp)
def get_open_id(self):
"""单独获取QQ用户的OpenIDQQ OAuth特殊流程"""
if self.is_access_token_set:
params = {'access_token': self.access_token}
rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp:
# QQ返回的OpenID格式为callback包裹的JSON需处理格式
rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '')
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
def get_oauth_userinfo(self):
"""获取QQ用户信息需先获取OpenID"""
openid = self.get_open_id()
if openid:
params = {
'access_token': self.access_token,
'oauth_consumer_key': self.client_id, # QQ需额外传递client_id
'openid': self.openid
}
rsp = self.do_get(self.API_URL, params)
logger.info(rsp)
obj = json.loads(rsp)
user = OAuthUser()
user.nickname = obj['nickname'] # 昵称
user.openid = openid # 唯一标识
user.type = 'qq'
user.token = self.access_token
user.metadata = rsp
# 赋值邮箱可能为None
if 'email' in obj:
user.email = obj['email']
# 赋值头像figureurl为QQ头像URL
if 'figureurl' in obj:
user.picture = str(obj['figureurl'])
return user
def get_picture(self, metadata):
"""从元数据中提取QQ用户头像URL"""
datas = json.loads(metadata)
return str(datas['figureurl'])
@cache_decorator(expiration=100 * 60) # 缓存100分钟减少数据库查询
def get_oauth_apps():
"""
获取所有启用的OAuth应用管理器实例
:return: 启用的OAuthManager实例列表
"""
# 查询所有已启用的OAuth配置
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return [] # 无启用配置则返回空列表
# 提取已启用的平台类型
configtypes = [x.type for x in configs]
# 获取BaseOauthManager的所有子类各平台实现类
applications = BaseOauthManager.__subclasses__()
# 筛选出已启用的平台实例
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps
def get_manager_by_type(type):
"""
根据平台类型获取对应的OAuth管理器实例
:param type: 平台类型如weibogithub
:return: 对应的OAuthManager实例不存在返回None
"""
applications = get_oauth_apps()
if applications:
# 忽略大小写匹配平台类型
finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
if finds:
return finds[0] # 返回第一个匹配的实例
return None

@ -0,0 +1,54 @@
# 导入Django模板相关模块和URL反转函数
from django import template
from django.urls import reverse
# 导入获取OAuth应用配置的工具函数
from oauth.oauthmanager import get_oauth_apps
# 注册一个模板标签库,用于在模板中使用自定义标签
register = template.Library()
@register.inclusion_tag('oauth/oauth_applications.html')
def load_oauth_applications(request):
"""
自定义模板包含标签用于加载可用的OAuth登录应用信息并传递给模板
功能
1. 获取所有启用的OAuth应用配置
2. 为每个应用构建对应的登录URL包含类型和跳转路径
3. 将处理后的应用列表传递给'oauth/oauth_applications.html'模板
参数
request: 当前请求对象用于获取当前完整路径登录后跳转使用
返回
包含应用列表的字典供模板渲染使用
"""
# 获取所有可用的OAuth应用配置
applications = get_oauth_apps()
if applications:
# 生成OAuth登录的基础URL通过URL名称反转得到
baseurl = reverse('oauth:oauthlogin')
# 获取当前请求的完整路径(用于登录成功后跳转回原页面)
path = request.get_full_path()
# 处理每个应用生成包含图标名称和完整登录URL的元组列表
apps = list(map(lambda x: (
x.ICON_NAME, # 应用图标名称
# 构建登录URL包含应用类型和跳转路径参数
'{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl,
type=x.ICON_NAME,
next=path
)
), applications))
else:
# 若没有可用应用,返回空列表
apps = []
# 返回上下文数据,供模板使用
return {
'apps': apps
}

@ -0,0 +1,319 @@
import json
from unittest.mock import patch # 用于模拟外部依赖如第三方API调用
from django.conf import settings
from django.contrib import auth # 用于用户认证相关操作
from django.test import Client, RequestFactory, TestCase # Django测试工具
from django.urls import reverse # 用于反向解析URL
from djangoblog.utils import get_sha256 # 导入自定义加密工具函数
from oauth.models import OAuthConfig # 导入OAuth配置模型
from oauth.oauthmanager import BaseOauthManager # 导入OAuth基础管理器
# Create your tests here.
class OAuthConfigTest(TestCase):
"""测试OAuth配置模型及基础登录流程"""
def setUp(self):
"""测试前初始化:创建客户端和请求工厂"""
self.client = Client() # 用于模拟HTTP请求的客户端
self.factory = RequestFactory() # 用于创建请求对象的工厂
def test_oauth_login_test(self):
"""测试OAuth配置创建后登录链接和授权回调的正确性"""
# 创建一个微博的OAuth配置
c = OAuthConfig()
c.type = 'weibo'
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
# 测试访问微博OAuth登录链接是否重定向到正确的授权页面
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302) # 验证重定向状态码
self.assertTrue("api.weibo.com" in response.url) # 验证重定向到微博API
# 测试授权回调接口是否正常重定向
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302) # 验证重定向状态码
self.assertEqual(response.url, '/') # 验证默认重定向到首页
class OauthLoginTest(TestCase):
"""测试各第三方平台的OAuth登录流程"""
def setUp(self) -> None:
"""测试前初始化创建客户端、请求工厂并初始化所有平台的OAuth配置"""
self.client = Client()
self.factory = RequestFactory()
self.apps = self.init_apps() # 初始化所有启用的OAuth应用
def init_apps(self):
"""为所有BaseOauthManager的子类各平台管理器创建对应的OAuth配置"""
# 获取所有第三方平台的OAuth管理器实例
applications = [p() for p in BaseOauthManager.__subclasses__()]
# 为每个平台创建默认配置并保存到数据库
for application in applications:
c = OAuthConfig()
c.type = application.ICON_NAME.lower()
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
return applications
def get_app_by_type(self, type):
"""根据平台类型获取对应的OAuth管理器实例"""
for app in self.apps:
if app.ICON_NAME.lower() == type:
return app
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_login(self, mock_do_get, mock_do_post):
"""测试微博登录流程获取授权链接、Token及用户信息"""
# 获取微博OAuth管理器实例
weibo_app = self.get_app_by_type('weibo')
assert weibo_app # 确保实例存在
# 验证授权链接生成无需mock直接调用方法
url = weibo_app.get_authorization_url()
# 模拟获取Token的响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"uid": "uid"
})
# 模拟获取用户信息的响应
mock_do_get.return_value = json.dumps({
"avatar_large": "avatar_large",
"screen_name": "screen_name",
"id": "id",
"email": "email",
})
# 测试通过code获取用户信息
userinfo = weibo_app.get_access_token_by_code('code')
# 验证用户信息是否正确
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'id')
@patch("oauth.oauthmanager.GoogleOauthManager.do_post")
@patch("oauth.oauthmanager.GoogleOauthManager.do_get")
def test_google_login(self, mock_do_get, mock_do_post):
"""测试谷歌登录流程获取Token及用户信息"""
google_app = self.get_app_by_type('google')
assert google_app
# 验证授权链接生成
url = google_app.get_authorization_url()
# 模拟获取Token的响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"id_token": "id_token",
})
# 模拟获取用户信息的响应
mock_do_get.return_value = json.dumps({
"picture": "picture",
"name": "name",
"sub": "sub",
"email": "email",
})
# 测试获取Token和用户信息
token = google_app.get_access_token_by_code('code')
userinfo = google_app.get_oauth_userinfo()
# 验证用户信息
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'sub')
@patch("oauth.oauthmanager.GitHubOauthManager.do_post")
@patch("oauth.oauthmanager.GitHubOauthManager.do_get")
def test_github_login(self, mock_do_get, mock_do_post):
"""测试GitHub登录流程验证授权链接、Token及用户信息"""
github_app = self.get_app_by_type('github')
assert github_app
# 验证授权链接包含GitHub域名和client_id参数
url = github_app.get_authorization_url()
self.assertTrue("github.com" in url)
self.assertTrue("client_id" in url)
# 模拟GitHub返回的Tokenquery string格式
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer"
# 模拟用户信息响应
mock_do_get.return_value = json.dumps({
"avatar_url": "avatar_url",
"name": "name",
"id": "id",
"email": "email",
})
# 测试获取Token和用户信息
token = github_app.get_access_token_by_code('code')
userinfo = github_app.get_oauth_userinfo()
# 验证Token和openid
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a')
self.assertEqual(userinfo.openid, 'id')
@patch("oauth.oauthmanager.FaceBookOauthManager.do_post")
@patch("oauth.oauthmanager.FaceBookOauthManager.do_get")
def test_facebook_login(self, mock_do_get, mock_do_post):
"""测试Facebook登录流程验证授权链接、Token及用户信息"""
facebook_app = self.get_app_by_type('facebook')
assert facebook_app
# 验证授权链接包含Facebook域名
url = facebook_app.get_authorization_url()
self.assertTrue("facebook.com" in url)
# 模拟获取Token的响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
})
# 模拟用户信息响应(包含嵌套的头像结构)
mock_do_get.return_value = json.dumps({
"name": "name",
"id": "id",
"email": "email",
"picture": {
"data": {
"url": "url"
}
}
})
# 测试获取Token和用户信息
token = facebook_app.get_access_token_by_code('code')
userinfo = facebook_app.get_oauth_userinfo()
# 验证Token
self.assertEqual(userinfo.token, 'access_token')
@patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[
# 模拟三次GET请求的响应1.获取Token 2.获取OpenID 3.获取用户信息
'access_token=access_token&expires_in=3600',
'callback({"client_id":"appid","openid":"openid"} );',
json.dumps({
"nickname": "nickname",
"email": "email",
"figureurl": "figureurl",
"openid": "openid",
})
])
def test_qq_login(self, mock_do_get):
"""测试QQ登录流程QQ需单独获取OpenID"""
qq_app = self.get_app_by_type('qq')
assert qq_app
# 验证授权链接包含QQ域名
url = qq_app.get_authorization_url()
self.assertTrue("qq.com" in url)
# 测试获取Token和用户信息
token = qq_app.get_access_token_by_code('code')
userinfo = qq_app.get_oauth_userinfo()
# 验证Token
self.assertEqual(userinfo.token, 'access_token')
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post):
"""测试微博登录(用户提供邮箱):验证自动注册登录流程"""
# 模拟获取Token的响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"uid": "uid"
})
# 模拟包含邮箱的用户信息
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
"id": "id",
"email": "email",
}
mock_do_get.return_value = json.dumps(mock_user_info)
# 测试访问登录链接是否重定向到微博授权页
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
# 测试授权回调后是否重定向到首页
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
# 验证用户已登录,信息正确
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
# 测试退出登录后再次登录是否正常
self.client.logout()
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
# 再次验证用户登录状态和信息
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post):
"""测试微博登录(用户未提供邮箱):验证补充邮箱、绑定及确认流程"""
# 模拟获取Token的响应
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"uid": "uid"
})
# 模拟不包含邮箱的用户信息
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
"id": "id",
}
mock_do_get.return_value = json.dumps(mock_user_info)
# 测试访问登录链接
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
# 测试授权后是否跳转到补充邮箱页面
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
# 提取补充邮箱页面的OAuth用户ID
oauth_user_id = int(response.url.split('/')[-1].split('.')[0])
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html')
# 测试提交邮箱后是否跳转到绑定成功页
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id})
self.assertEqual(response.status_code, 302)
# 生成验证签名(模拟邮箱确认流程)
sign = get_sha256(settings.SECRET_KEY + str(oauth_user_id) + settings.SECRET_KEY)
url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauth_user_id})
self.assertEqual(response.url, f'{url}?type=email')
# 测试邮箱确认后是否跳转到成功页,且用户登录状态正确
path = reverse('oauth:email_confirm', kwargs={'id': oauth_user_id, 'sign': sign})
response = self.client.get(path)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success')
# 验证用户信息用户名、邮箱及OAuth用户关联正确
user = auth.get_user(self.client)
from oauth.models import OAuthUser
oauth_user = OAuthUser.objects.get(author=user)
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, 'test@gmail.com')
self.assertEqual(oauth_user.pk, oauth_user_id)

@ -0,0 +1,40 @@
# 导入Django的URL路径配置模块
from django.urls import path
# 导入当前应用oauth的视图模块
from . import views
# 定义应用命名空间,用于模板或反向解析时指定应用(如:{% url 'oauth:oauthlogin' %}
app_name = "oauth"
# URL路由配置列表映射URL路径到对应的视图函数/类
urlpatterns = [
# 1. OAuth授权回调接口接收第三方平台返回的授权码code处理后续登录逻辑
path(
r'oauth/authorize',
views.authorize), # 对应视图函数authorize
# 2. 补充邮箱页面:第三方登录时用户未提供邮箱,跳转至此页面补充
path(
r'oauth/requireemail/<int:oauthid>.html', # 路径参数oauthidOAuthUser的ID
views.RequireEmailView.as_view(), # 对应基于类的视图RequireEmailView
name='require_email'), # 路由名称:用于反向解析
# 3. 邮箱确认接口验证用户补充邮箱的有效性通过sign签名验证
path(
r'oauth/emailconfirm/<int:id>/<sign>.html', # 路径参数idOAuthUser的ID、sign验证签名
views.emailconfirm, # 对应视图函数emailconfirm
name='email_confirm'), # 路由名称:用于反向解析
# 4. 绑定成功页面:邮箱补充或账号绑定完成后,展示成功提示
path(
r'oauth/bindsuccess/<int:oauthid>.html', # 路径参数oauthidOAuthUser的ID
views.bindsuccess, # 对应视图函数bindsuccess
name='bindsuccess'), # 路由名称:用于反向解析
# 5. OAuth登录入口生成第三方平台的授权链接跳转至第三方授权页面
path(
r'oauth/oauthlogin',
views.oauthlogin, # 对应视图函数oauthlogin
name='oauthlogin') # 路由名称:用于反向解析
]

@ -0,0 +1,313 @@
import logging
# Create your views here.
from urllib.parse import urlparse # 用于解析URL验证跳转地址合法性
from django.conf import settings
from django.contrib.auth import get_user_model # 用于动态获取用户模型
from django.contrib.auth import login # 用于用户登录
from django.core.exceptions import ObjectDoesNotExist # 用于处理对象不存在异常
from django.db import transaction # 用于数据库事务处理,确保操作原子性
from django.http import HttpResponseForbidden # 用于返回403禁止访问响应
from django.http import HttpResponseRedirect # 用于HTTP重定向
from django.shortcuts import get_object_or_404 # 用于获取对象不存在则返回404
from django.shortcuts import render # 用于渲染模板
from django.urls import reverse # 用于反向解析URL
from django.utils import timezone # 用于获取当前时间
from django.utils.translation import gettext_lazy as _ # 用于国际化翻译
from django.views.generic import FormView # 用于基于类的表单视图
from djangoblog.blog_signals import oauth_user_login_signal # 导入oauth登录信号
from djangoblog.utils import get_current_site # 用于获取当前站点信息
from djangoblog.utils import send_email, get_sha256 # 导入发送邮件和加密工具函数
from oauth.forms import RequireEmailForm # 导入补充邮箱的表单
from .models import OAuthUser # 导入OAuth用户模型
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException # 导入OAuth管理器和异常
# 初始化日志记录器
logger = logging.getLogger(__name__)
def get_redirecturl(request):
"""
处理并验证跳转URL确保跳转地址安全仅允许本站域名
:param request: 请求对象
:return: 验证后的合法跳转URL默认返回'/'
"""
nexturl = request.GET.get('next_url', None)
# 过滤非法或默认的跳转地址
if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/'
return nexturl
# 解析URL验证域名是否为本站
p = urlparse(nexturl)
if p.netloc: # 存在域名部分时验证
site = get_current_site().domain
# 移除www.前缀后比较,确保域名一致
if not p.netloc.replace('www.', '') == site.replace('www.', ''):
logger.info('非法url:' + nexturl)
return "/"
return nexturl
def oauthlogin(request):
"""
OAuth登录入口根据平台类型生成第三方授权链接并跳转
:param request: 请求对象包含'type'参数如weibogithub
:return: 重定向到第三方平台授权页面
"""
type = request.GET.get('type', None)
if not type: # 未指定平台类型,跳转到首页
return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type)
if not manager: # 管理器不存在,跳转到首页
return HttpResponseRedirect('/')
# 获取合法的跳转地址(授权成功后返回的页面)
nexturl = get_redirecturl(request)
# 生成第三方平台的授权URL
authorizeurl = manager.get_authorization_url(nexturl)
# 重定向到授权页面
return HttpResponseRedirect(authorizeurl)
def authorize(request):
"""
OAuth授权回调处理接收第三方平台返回的code获取用户信息并完成登录
:param request: 请求对象包含'type'平台类型'code'授权码
:return: 重定向到目标页面或补充邮箱页面
"""
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
# 获取对应平台的OAuth管理器
manager = get_manager_by_type(type)
if not manager:
return HttpResponseRedirect('/')
# 获取授权码
code = request.GET.get('code', None)
try:
# 通过code获取access token
rsp = manager.get_access_token_by_code(code)
except OAuthAccessTokenException as e:
logger.warning("OAuthAccessTokenException:" + str(e))
return HttpResponseRedirect('/')
except Exception as e:
logger.error(e)
rsp = None
# 获取授权成功后的跳转地址
nexturl = get_redirecturl(request)
if not rsp: # 获取token失败重新跳转授权
return HttpResponseRedirect(manager.get_authorization_url(nexturl))
# 通过token获取第三方用户信息
user = manager.get_oauth_userinfo()
if user:
# 处理用户昵称为空的情况,生成默认昵称
if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
try:
# 若用户已存在(同平台+同openid更新用户信息
temp = OAuthUser.objects.get(type=type, openid=user.openid)
temp.picture = user.picture
temp.metadata = user.metadata
temp.nickname = user.nickname
user = temp
except ObjectDoesNotExist:
# 用户不存在,使用新用户对象
pass
# Facebook的token过长不存储
if type == 'facebook':
user.token = ''
# 若用户提供了邮箱,直接关联或创建本地用户并登录
if user.email:
with transaction.atomic(): # 数据库事务:确保操作原子性
author = None
try:
# 尝试获取已关联的本地用户
author = get_user_model().objects.get(id=user.author_id)
except ObjectDoesNotExist:
pass
if not author:
# 不存在则创建或获取本地用户(通过邮箱匹配)
result = get_user_model().objects.get_or_create(email=user.email)
author = result[0]
# 若为新创建的用户,设置用户名和来源
if result[1]:
try:
# 检查用户名是否已存在
get_user_model().objects.get(username=user.nickname)
except ObjectDoesNotExist:
author.username = user.nickname
else:
# 用户名已存在,生成唯一用户名
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize' # 标记来源为oauth授权
author.save()
# 关联本地用户并保存
user.author = author
user.save()
# 发送oauth用户登录信号用于后续处理如日志、统计等
oauth_user_login_signal.send(
sender=authorize.__class__, id=user.id)
# 登录用户
login(request, author)
# 重定向到目标页面
return HttpResponseRedirect(nexturl)
else:
# 未提供邮箱,保存用户信息并跳转到补充邮箱页面
user.save()
url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
return HttpResponseRedirect(url)
else:
# 获取用户信息失败,跳转到目标页面
return HttpResponseRedirect(nexturl)
def emailconfirm(request, id, sign):
"""
邮箱确认处理验证签名合法性完成用户与邮箱的绑定并登录
:param request: 请求对象
:param id: OAuthUser的ID
:param sign: 验证签名基于SECRET_KEY和id生成
:return: 重定向到绑定成功页面
"""
if not sign: # 签名为空返回403
return HttpResponseForbidden()
# 验证签名合法性(忽略大小写)
if not get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY).upper() == sign.upper():
return HttpResponseForbidden()
# 获取对应的OAuth用户
oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic(): # 数据库事务
if oauthuser.author:
# 已关联本地用户,直接获取
author = get_user_model().objects.get(pk=oauthuser.author_id)
else:
# 未关联,通过邮箱创建或获取本地用户
result = get_user_model().objects.get_or_create(email=oauthuser.email)
author = result[0]
if result[1]: # 新创建用户
author.source = 'emailconfirm' # 标记来源为邮箱确认
# 设置用户名为OAuth用户的昵称或生成默认
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip() else "djangoblog" + timezone.now().strftime(
'%y%m%d%I%M%S')
author.save()
# 关联本地用户并保存
oauthuser.author = author
oauthuser.save()
# 发送登录信号
oauth_user_login_signal.send(sender=emailconfirm.__class__, id=oauthuser.id)
# 登录用户
login(request, author)
# 发送绑定成功邮件
site = 'http://' + get_current_site().domain
content = _('''
<p>Congratulations, you have successfully bound your email address. You can use
%(oauthuser_type)s to directly log in to this website without a password.</p>
You are welcome to continue to follow this site, the address is
<a href="%(site)s" rel="bookmark">%(site)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
%(site)s
''') % {'oauthuser_type': oauthuser.type, 'site': site}
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content)
# 重定向到绑定成功页面
url = reverse('oauth:bindsuccess', kwargs={'oauthid': id})
url = url + '?type=success'
return HttpResponseRedirect(url)
class RequireEmailView(FormView):
"""
补充邮箱的类视图显示表单收集用户邮箱发送确认链接
"""
form_class = RequireEmailForm # 使用的表单类
template_name = 'oauth/require_email.html' # 渲染的模板
def get(self, request, *args, **kwargs):
"""处理GET请求获取OAuth用户若已填写邮箱则跳转注释中为跳转逻辑实际未启用"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.email:
pass # 若已填写邮箱,可在此处添加跳转逻辑
return super(RequireEmailView, self).get(request, *args, **kwargs)
def get_initial(self):
"""初始化表单数据预设oauthid字段"""
oauthid = self.kwargs['oauthid']
return {'email': '', 'oauthid': oauthid}
def get_context_data(self, **kwargs):
"""补充上下文数据:添加用户头像(若存在)"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture:
kwargs['picture'] = oauthuser.picture
return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form):
"""处理表单验证通过:保存邮箱,生成确认链接并发送邮件"""
email = form.cleaned_data['email']
oauthid = form.cleaned_data['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
# 保存用户邮箱
oauthuser.email = email
oauthuser.save()
# 生成验证签名
sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
# 获取当前站点域名
site = get_current_site().domain
if settings.DEBUG: # 开发环境使用本地地址
site = '127.0.0.1:8000'
# 生成邮箱确认链接
path = reverse('oauth:email_confirm', kwargs={'id': oauthid, 'sign': sign})
url = "http://{site}{path}".format(site=site, path=path)
# 发送确认邮件
content = _("""
<p>Please click the link below to bind your email</p>
<a href="%(url)s" rel="bookmark">%(url)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
<br />
%(url)s
""") % {'url': url}
send_email(emailto=[email, ], title=_('Bind your email'), content=content)
# 重定向到绑定成功页面(提示查收邮件)
url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauthid})
url = url + '?type=email'
return HttpResponseRedirect(url)
def bindsuccess(request, oauthid):
"""
绑定成功页面根据类型显示不同的成功信息
:param request: 请求对象包含'type'参数email/success
:param oauthid: OAuthUser的ID
:return: 渲染绑定成功模板
"""
type = request.GET.get('type', None)
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if type == 'email':
# 已发送验证邮件的提示
title = _('Bind your email')
content = _(
'Congratulations, the binding is just one step away. '
'Please log in to your email to check the email to complete the binding. Thank you.')
else:
# 绑定完成的提示
title = _('Binding successful')
content = _(
"Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s"
" to directly log in to this website without a password. You are welcome to continue to follow this site." % {
'oauthuser_type': oauthuser.type})
return render(request, 'oauth/bindsuccess.html', {'title': title, 'content': content})
Loading…
Cancel
Save