ly_第六周注释 #18

Merged
pnry42fjm merged 1 commits from ly_branch into develop 3 months ago

@ -0,0 +1,89 @@
import logging
from django.contrib import admin
# Register your models here.
from django.urls import reverse
from django.utils.html import format_html
logger = logging.getLogger(__name__)
class OAuthUserAdmin(admin.ModelAdmin):
"""
OAuth用户模型的管理界面配置
用于管理第三方登录的用户信息
"""
# 搜索字段配置
search_fields = ('nickname', 'email')
# 每页显示数量
list_per_page = 20
# 列表页显示的字段
list_display = (
'id',
'nickname',
'link_to_usermodel', # 自定义字段:链接到用户模型
'show_user_image', # 自定义字段:显示用户头像
'type', # OAuth类型github、weibo等
'email',
)
# 可点击进入编辑页的字段
list_display_links = ('id', 'nickname')
# 右侧筛选器
list_filter = ('author', 'type',)
# 只读字段列表
readonly_fields = []
def get_readonly_fields(self, request, obj=None):
"""
动态设置所有字段为只读防止在admin中修改OAuth用户数据
"""
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
def has_add_permission(self, request):
"""
禁止在admin中添加新的OAuth用户
OAuth用户只能通过第三方登录自动创建
"""
return False
def link_to_usermodel(self, obj):
"""
自定义字段显示链接到关联的用户模型
如果OAuth用户已绑定本地用户显示可点击的链接
"""
if obj.author:
# 获取用户模型的admin变更URL信息
info = (obj.author._meta.app_label, obj.author._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
return format_html(
u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email))
def show_user_image(self, obj):
"""
自定义字段显示用户头像
在admin列表中显示50x50像素的头像图片
"""
img = obj.picture
return format_html(
u'<img src="%s" style="width:50px;height:50px"></img>' %
(img))
# 设置自定义字段在admin中的显示名称
link_to_usermodel.short_description = '用户'
show_user_image.short_description = '用户头像'
class OAuthConfigAdmin(admin.ModelAdmin):
"""
OAuth配置模型的管理界面配置
用于管理第三方登录的配置信息
"""
# 列表页显示的字段
list_display = ('type', 'appkey', 'appsecret', 'is_enable')
# 右侧筛选器
list_filter = ('type',)

@ -0,0 +1,12 @@
from django.apps import AppConfig
class OauthConfig(AppConfig):
"""
OAuth应用配置类
用于配置Django应用中OAuth模块的元数据和行为
"""
# 指定Django应用的完整Python路径
# 这应该与应用目录名和settings.INSTALLED_APPS中的名称一致
name = 'oauth'

@ -0,0 +1,30 @@
# 导入Django的表单基础模块和组件模块
from django.contrib.auth.forms import forms
from django.forms import widgets
# 定义一个用于获取用户邮箱的表单类继承自Django的基础表单类forms.Form
class RequireEmailForm(forms.Form):
# 定义邮箱字段:
# - 类型为EmailField自动验证邮箱格式
# - label设置表单显示的标签为"电子邮箱"
# - required=True表示该字段为必填项
email = forms.EmailField(label='电子邮箱', required=True)
# 定义oauthid字段
# - 类型为IntegerField用于存储第三方登录的关联ID
# - widget=forms.HiddenInput设置为隐藏输入框不在页面显式展示
# - required=False表示该字段为非必填项
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False)
# 重写表单的初始化方法
def __init__(self, *args, **kwargs):
# 调用父类的初始化方法,确保表单基础功能正常
super(RequireEmailForm, self).__init__(*args, **kwargs)
# 自定义email字段的渲染组件
# - 使用widgets.EmailInput作为输入组件语义化邮箱输入框
# - attrs设置HTML属性
# - placeholder="email":输入框默认提示文本
# - "class": "form-control"添加CSS类用于样式控制通常配合Bootstrap等框架
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})

@ -0,0 +1,99 @@
# Generated by Django 4.1.7 on 2023-03-07 09:53
# 备注此迁移文件由Django 4.1.7版本自动生成生成时间为2023年3月7日9:53
from django.conf import settings
# 备注导入Django的配置模块用于获取项目设置如用户模型
from django.db import migrations, models
# 备注导入Django的数据库迁移和模型字段模块用于定义迁移操作和模型结构
import django.db.models.deletion
# 备注导入Django的外键删除行为模块用于定义外键关联的删除策略
import django.utils.timezone
# 备注导入Django的时区工具用于处理时间字段的默认值
class Migration(migrations.Migration):
# 备注:定义迁移类,所有数据库迁移操作都通过此类实现
initial = True
# 备注:标记为初始迁移(首次创建模型时的迁移)
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
# 备注:迁移依赖配置,依赖项目中配置的用户模型(确保用户表先于当前表创建)
operations = [
# 备注:定义迁移操作列表,包含需要执行的数据库操作
migrations.CreateModel(
# 备注:创建第一个模型(数据库表)的操作
name='OAuthConfig',
# 备注:模型名称,对应数据库表名为"oauth_oauthconfig"应用名_模型名
fields=[
# 备注:模型字段定义列表
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 备注主键字段自增BigInteger类型自动创建不可序列化显示名为"ID"
('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')),
# 备注第三方平台类型字段字符类型最大长度10可选值为微博/谷歌等,默认值为'a'(可能需要修正),显示名为"类型"
('appkey', models.CharField(max_length=200, verbose_name='AppKey')),
# 备注第三方应用的AppKey字段字符类型最大长度200显示名为"AppKey"
('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')),
# 备注第三方应用的AppSecret字段字符类型最大长度200显示名为"AppSecret"
('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')),
# 备注授权回调地址字段字符类型默认值为百度地址需替换为实际地址最大长度200显示名为"回调地址"
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')),
# 备注是否启用该平台的字段布尔类型默认值为True启用显示名为"是否显示"
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 备注:创建时间字段,日期时间类型,默认值为当前时间,显示名为"创建时间"
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
# 备注:最后修改时间字段,日期时间类型,默认值为当前时间,显示名为"修改时间"
],
options={
# 备注:模型的额外配置
'verbose_name': 'oauth配置',
# 备注:模型的单数显示名称
'verbose_name_plural': 'oauth配置',
# 备注:模型的复数显示名称(与单数相同)
'ordering': ['-created_time'],
# 备注:默认排序方式,按创建时间倒序(最新的在前)
},
),
migrations.CreateModel(
# 备注:创建第二个模型(数据库表)的操作
name='OAuthUser',
# 备注:模型名称,对应数据库表名为"oauth_oauthuser"
fields=[
# 备注:模型字段定义列表
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 备注主键字段同OAuthConfig的id字段
('openid', models.CharField(max_length=50)),
# 备注第三方平台返回的用户唯一标识字符类型最大长度50
('nickname', models.CharField(max_length=50, verbose_name='昵称')),
# 备注第三方用户的昵称字符类型最大长度50显示名为"昵称"
('token', models.CharField(blank=True, max_length=150, null=True)),
# 备注第三方授权令牌字符类型允许为空最大长度150可设为null
('picture', models.CharField(blank=True, max_length=350, null=True)),
# 备注第三方用户的头像地址字符类型允许为空最大长度350可设为null
('type', models.CharField(max_length=50)),
# 备注:关联的第三方平台类型(如"weibo"字符类型最大长度50
('email', models.CharField(blank=True, max_length=50, null=True)),
# 备注第三方用户的邮箱字符类型允许为空最大长度50可设为null
('metadata', models.TextField(blank=True, null=True)),
# 备注存储额外的第三方用户信息文本类型允许为空可设为null
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 备注创建时间字段同OAuthConfig
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
# 备注最后修改时间字段同OAuthConfig
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')),
# 备注:外键字段,关联项目的用户模型,允许为空(支持未绑定本地账号的场景),删除策略为级联删除(用户删除时关联记录也删除),显示名为"用户"
],
options={
# 备注:模型的额外配置
'verbose_name': 'oauth用户',
# 备注:模型的单数显示名称
'verbose_name_plural': 'oauth用户',
# 备注:模型的复数显示名称(与单数相同)
'ordering': ['-created_time'],
# 备注:默认排序方式,按创建时间倒序
},
),
]

@ -0,0 +1,97 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('oauth', '0001_initial'),
]
operations = [
# 修改模型选项:设置排序方式和中文显示名称
migrations.AlterModelOptions(
name='oauthconfig',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'},
),
migrations.AlterModelOptions(
name='oauthuser',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'},
),
# 移除旧的时间字段(从 created_time 和 last_mod_time 改为新的字段命名)
migrations.RemoveField(
model_name='oauthconfig',
name='created_time',
),
migrations.RemoveField(
model_name='oauthconfig',
name='last_mod_time',
),
migrations.RemoveField(
model_name='oauthuser',
name='created_time',
),
migrations.RemoveField(
model_name='oauthuser',
name='last_mod_time',
),
# 添加新的时间字段(使用新的字段命名 convention
migrations.AddField(
model_name='oauthconfig',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
migrations.AddField(
model_name='oauthconfig',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
migrations.AddField(
model_name='oauthuser',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
migrations.AddField(
model_name='oauthuser',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
# 修改字段定义和选项
migrations.AlterField(
model_name='oauthconfig',
name='callback_url',
field=models.CharField(default='', max_length=200, verbose_name='callback url'),
),
migrations.AlterField(
model_name='oauthconfig',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is enable'),
),
# 修改OAuth类型选择项定义支持的第三方登录平台
migrations.AlterField(
model_name='oauthconfig',
name='type',
field=models.CharField(
choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'),
('qq', 'QQ')], default='a', max_length=10, verbose_name='type'),
),
# 修改外键关系,关联到用户模型
migrations.AlterField(
model_name='oauthuser',
name='author',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL, verbose_name='author'),
),
migrations.AlterField(
model_name='oauthuser',
name='nickname',
field=models.CharField(max_length=50, verbose_name='nickname'),
),
]

@ -0,0 +1,20 @@
# Generated by Django 4.2.7 on 2024-01-26 02:41
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('oauth', '0002_alter_oauthconfig_options_alter_oauthuser_options_and_more'),
]
operations = [
# 修改OAuth用户昵称字段的显示名称
# 将verbose_name从'nickname'改为'nick name'(添加空格)
migrations.AlterField(
model_name='oauthuser',
name='nickname',
field=models.CharField(max_length=50, verbose_name='nick name'),
),
]

@ -0,0 +1,115 @@
# Create your models here.
# 备注当前文件定义了OAuth相关的数据模型
# 导入Django配置、异常、模型、时间工具和翻译工具
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
class OAuthUser(models.Model):
"""第三方登录用户关联模型,存储用户与第三方平台的绑定信息"""
# 关联本地用户模型(外键)
# - 允许为空(支持未绑定本地账号的场景)
# - 级联删除(本地用户删除时,关联记录也删除)
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'), # 国际化显示名称"作者/用户"
blank=True,
null=True,
on_delete=models.CASCADE)
# 第三方平台返回的用户唯一标识如微信openid、GitHub id等
openid = models.CharField(max_length=50)
# 第三方用户的昵称
nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
# 第三方平台返回的授权令牌(可能过期,允许为空)
token = models.CharField(max_length=150, null=True, blank=True)
# 第三方用户的头像图片地址(允许为空)
picture = models.CharField(max_length=350, blank=True, null=True)
# 第三方平台类型(如'weibo'、'github'等,必选字段)
type = models.CharField(blank=False, null=False, max_length=50)
# 第三方用户的邮箱(可能未提供,允许为空)
email = models.CharField(max_length=50, null=True, blank=True)
# 存储额外的第三方用户元数据(如性别、地区等,文本类型)
metadata = models.TextField(null=True, blank=True)
# 记录创建时间(默认当前时间)
creation_time = models.DateTimeField(_('creation time'), default=now)
# 记录最后修改时间(默认当前时间)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 模型实例的字符串表示(显示昵称)
def __str__(self):
return self.nickname
# 模型元数据配置
class Meta:
verbose_name = _('oauth user') # 单数显示名称(国际化)
verbose_name_plural = verbose_name # 复数显示名称(与单数相同)
ordering = ['-creation_time'] # 默认按创建时间倒序排序
class OAuthConfig(models.Model):
"""第三方登录平台配置模型,存储各平台的授权参数"""
# 定义支持的第三方平台类型选项(元组形式,(存储值, 显示值)
TYPE = (
('weibo', _('weibo')), # 微博(支持国际化)
('google', _('google')), # 谷歌(支持国际化)
('github', 'GitHub'), # GitHub固定显示名称
('facebook', 'FaceBook'), # FaceBook固定显示名称
('qq', 'QQ'), # QQ固定显示名称
)
# 平台类型关联TYPE选项默认值为'a'可能需要修正)
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
# 第三方平台申请的AppKey
appkey = models.CharField(max_length=200, verbose_name='AppKey')
# 第三方平台申请的AppSecret
appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
# 授权回调地址(必选字段,默认空字符串)
callback_url = models.CharField(
max_length=200,
verbose_name=_('callback url'), # 国际化显示名称"回调地址"
blank=False,
default='')
# 是否启用该平台(默认启用,必选字段)
is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False)
# 记录创建时间
creation_time = models.DateTimeField(_('creation time'), default=now)
# 记录最后修改时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 数据验证方法(保存时触发)
def clean(self):
# 检查同类型的配置是否已存在(排除当前记录自身)
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
# 若存在则抛出验证错误(提示该平台配置已存在)
raise ValidationError(_(self.type + _('already exists')))
# 模型实例的字符串表示(显示平台类型)
def __str__(self):
return self.type
# 模型元数据配置
class Meta:
verbose_name = 'oauth配置' # 单数显示名称(中文)
verbose_name_plural = verbose_name # 复数显示名称(与单数相同)
ordering = ['-creation_time'] # 默认按创建时间倒序排序

@ -0,0 +1,538 @@
import json
import logging
import os
import urllib.parse
from abc import ABCMeta, abstractmethod
import requests
from djangoblog.utils import cache_decorator
from oauth.models import OAuthUser, OAuthConfig
logger = logging.getLogger(__name__)
class OAuthAccessTokenException(Exception):
'''
OAuth授权失败异常类
当获取access_token过程中发生错误时抛出
'''
class BaseOauthManager(metaclass=ABCMeta):
"""
OAuth管理器基类
定义所有OAuth平台通用的接口和方法
"""
AUTH_URL = None # 授权页面URL
TOKEN_URL = None # 获取token的URL
API_URL = None # 获取用户信息的API URL
ICON_NAME = None # 平台标识名称
def __init__(self, access_token=None, openid=None):
self.access_token = access_token
self.openid = openid
@property
def is_access_token_set(self):
"""检查access_token是否已设置"""
return self.access_token is not None
@property
def is_authorized(self):
"""检查是否已完成授权"""
return self.is_access_token_set and self.access_token is not None and self.openid is not None
@abstractmethod
def get_authorization_url(self, nexturl='/'):
"""获取授权URL抽象方法"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""通过授权码获取access_token抽象方法"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""获取用户信息(抽象方法)"""
pass
@abstractmethod
def get_picture(self, metadata):
"""从元数据中提取用户头像(抽象方法)"""
pass
def do_get(self, url, params, headers=None):
"""执行GET请求"""
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""执行POST请求"""
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text)
return rsp.text
def get_config(self):
"""从数据库获取OAuth配置"""
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
class WBOauthManager(BaseOauthManager):
"""
微博OAuth管理器
实现微博平台的OAuth2.0认证流程
"""
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
API_URL = 'https://api.weibo.com/2/users/show.json'
ICON_NAME = 'weibo'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(WBOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""构造微博授权URL"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url + '&next_url=' + nexturl
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""使用授权码获取access_token"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['uid'])
return self.get_oauth_userinfo()
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""获取微博用户信息"""
if not self.is_authorized:
return None
params = {
'uid': self.openid,
'access_token': self.access_token
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp # 原始响应数据
user.picture = datas['avatar_large'] # 用户头像
user.nickname = datas['screen_name'] # 昵称
user.openid = datas['id'] # 用户唯一ID
user.type = 'weibo' # 平台类型
user.token = self.access_token # access_token
if 'email' in datas and datas['email']:
user.email = datas['email'] # 邮箱
return user
except Exception as e:
logger.error(e)
logger.error('weibo oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
"""从元数据中提取微博用户头像"""
datas = json.loads(metadata)
return datas['avatar_large']
class ProxyManagerMixin:
"""
代理混合类
为需要代理的OAuth管理器提供代理支持
"""
def __init__(self, *args, **kwargs):
# 从环境变量获取代理配置
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
"https": os.environ.get("HTTP_PROXY")
}
else:
self.proxies = None
def do_get(self, url, params, headers=None):
"""使用代理执行GET请求"""
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""使用代理执行POST请求"""
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
"""
Google OAuth管理器
实现Google平台的OAuth2.0认证流程
"""
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
ICON_NAME = 'google'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(GoogleOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""构造Google授权URL"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'openid email', # 请求的权限范围
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""使用授权码获取access_token"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token'])
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""获取Google用户信息"""
if not self.is_authorized:
return None
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture'] # 用户头像
user.nickname = datas['name'] # 昵称
user.openid = datas['sub'] # 用户唯一ID
user.token = self.access_token
user.type = 'google'
if datas['email']:
user.email = datas['email'] # 邮箱
return user
except Exception as e:
logger.error(e)
logger.error('google oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
"""从元数据中提取Google用户头像"""
datas = json.loads(metadata)
return datas['picture']
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
"""
GitHub OAuth管理器
实现GitHub平台的OAuth2.0认证流程
"""
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
ICON_NAME = 'github'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(GitHubOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""构造GitHub授权URL"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}',
'scope': 'user' # 请求用户信息权限
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""使用授权码获取access_token"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
# GitHub返回的是查询字符串格式需要解析
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
self.access_token = (r['access_token'][0])
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""获取GitHub用户信息"""
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token # GitHub需要使用token认证
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url'] # 用户头像
user.nickname = datas['name'] # 昵称
user.openid = datas['id'] # 用户唯一ID
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email'] # 邮箱
return user
except Exception as e:
logger.error(e)
logger.error('github oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
"""从元数据中提取GitHub用户头像"""
datas = json.loads(metadata)
return datas['avatar_url']
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
"""
Facebook OAuth管理器
实现Facebook平台的OAuth2.0认证流程
"""
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
ICON_NAME = 'facebook'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(FaceBookOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""构造Facebook授权URL"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile' # 请求邮箱和公开资料权限
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""使用授权码获取access_token"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
token = str(obj['access_token'])
self.access_token = token
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""获取Facebook用户信息"""
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email' # 指定需要的字段
}
try:
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name'] # 昵称
user.openid = datas['id'] # 用户唯一ID
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email'] # 邮箱
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url']) # 嵌套的头像URL
return user
except Exception as e:
logger.error(e)
return None
def get_picture(self, metadata):
"""从元数据中提取Facebook用户头像"""
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
class QQOauthManager(BaseOauthManager):
"""
QQ OAuth管理器
实现QQ平台的OAuth2.0认证流程
"""
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize'
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token'
API_URL = 'https://graph.qq.com/user/get_user_info'
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # QQ需要单独获取openid
ICON_NAME = 'qq'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(QQOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""构造QQ授权URL"""
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""使用授权码获取access_token"""
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_get(self.TOKEN_URL, params)
if rsp:
# QQ返回的是查询字符串格式
d = urllib.parse.parse_qs(rsp)
if 'access_token' in d:
token = d['access_token']
self.access_token = token[0]
return token
else:
raise OAuthAccessTokenException(rsp)
def get_open_id(self):
"""获取QQ用户的openidQQ特有步骤"""
if self.is_access_token_set:
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp:
# 处理JSONP响应格式
rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '')
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
def get_oauth_userinfo(self):
"""获取QQ用户信息"""
openid = self.get_open_id()
if openid:
params = {
'access_token': self.access_token,
'oauth_consumer_key': self.client_id,
'openid': self.openid
}
rsp = self.do_get(self.API_URL, params)
logger.info(rsp)
obj = json.loads(rsp)
user = OAuthUser()
user.nickname = obj['nickname'] # 昵称
user.openid = openid # 用户唯一ID
user.type = 'qq'
user.token = self.access_token
user.metadata = rsp
if 'email' in obj:
user.email = obj['email'] # 邮箱
if 'figureurl' in obj:
user.picture = str(obj['figureurl']) # 用户头像
return user
def get_picture(self, metadata):
"""从元数据中提取QQ用户头像"""
datas = json.loads(metadata)
return str(datas['figureurl'])
@cache_decorator(expiration=100 * 60)
def get_oauth_apps():
"""
获取所有启用的OAuth应用
使用缓存装饰器100分钟过期
"""
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return []
configtypes = [x.type for x in configs]
applications = BaseOauthManager.__subclasses__()
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps
def get_manager_by_type(type):
"""
根据类型获取对应的OAuth管理器
"""
applications = get_oauth_apps()
if applications:
finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
if finds:
return finds[0]
return None

@ -0,0 +1,54 @@
from django import template
from django.urls import reverse
from oauth.oauthmanager import get_oauth_apps
# 注册Django模板标签库
register = template.Library()
@register.inclusion_tag('oauth/oauth_applications.html')
def load_oauth_applications(request):
"""
自定义模板标签加载OAuth认证应用程序列表
功能
- 获取所有可用的OAuth应用配置
- 生成每个OAuth应用的登录URL
- 通过包含模板的方式渲染OAuth登录按钮
参数
request: HttpRequest对象用于获取当前请求路径
返回
包含apps列表的字典用于渲染模板
"""
# 获取所有配置的OAuth应用
applications = get_oauth_apps()
if applications:
# 获取OAuth登录的基础URL
baseurl = reverse('oauth:oauthlogin')
# 获取当前请求的完整路径(用于登录成功后跳转回原页面)
path = request.get_full_path()
# 为每个OAuth应用生成登录URL
# 格式:/oauth/login/?type=github&next_url=/current/path/
apps = list(map(lambda x: (
x.ICON_NAME, # OAuth应用类型标识github、weibo等
'{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl,
type=x.ICON_NAME,
next=path
)),
applications
))
else:
# 如果没有配置任何OAuth应用返回空列表
apps = []
# 返回模板上下文数据
return {
'apps': apps # 包含(OAuth类型, 登录URL)元组的列表
}

@ -0,0 +1,323 @@
# 导入必要的模块JSON处理、单元测试Mock工具、Django配置/认证/测试/URL工具、项目工具类及OAuth相关模型和管理器
import json
from unittest.mock import patch # 用于Mock测试中替换真实函数/方法,模拟第三方接口返回
from django.conf import settings # 导入Django项目配置
from django.contrib import auth # 导入Django认证模块用于获取/验证用户
from django.test import Client, RequestFactory, TestCase # Django测试基础类Client模拟HTTP请求RequestFactory构建请求对象TestCase测试基类
from django.urls import reverse # 用于通过URL名称反向解析URL
from djangoblog.utils import get_sha256 # 导入项目自定义工具类用于生成SHA256加密字符串
from oauth.models import OAuthConfig # 导入OAuth配置模型用于测试中创建配置数据
from oauth.oauthmanager import BaseOauthManager # 导入OAuth管理器基类用于获取所有第三方登录管理器子类
# Create your tests here.
# 定义OAuth配置相关的测试类继承Django测试基类TestCase
class OAuthConfigTest(TestCase):
# 测试前置方法:在每个测试方法执行前初始化测试环境
def setUp(self):
self.client = Client() # 创建HTTP客户端对象用于发送测试请求
self.factory = RequestFactory() # 创建请求工厂对象,用于构建自定义请求
# 测试OAuth登录流程的基础功能以微博为例
def test_oauth_login_test(self):
# 1. 创建测试用的微博OAuth配置数据并保存到数据库
c = OAuthConfig()
c.type = 'weibo' # 平台类型为微博
c.appkey = 'appkey' # 模拟AppKey
c.appsecret = 'appsecret' # 模拟AppSecret
c.save() # 保存到数据库
# 2. 发送GET请求到微博OAuth登录地址测试跳转是否正常
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302) # 断言响应状态码为302重定向符合OAuth授权流程
self.assertTrue("api.weibo.com" in response.url) # 断言重定向URL包含微博API域名确认跳转到微博授权页
# 3. 模拟授权成功后回调发送带code参数的请求到授权处理地址
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302) # 断言响应状态码为302处理后重定向
self.assertEqual(response.url, '/') # 断言重定向到首页,确认授权后跳转正常
# 定义第三方登录流程的测试类继承TestCase
class OauthLoginTest(TestCase):
# 测试前置方法:初始化客户端、请求工厂,并创建所有支持的第三方平台配置
def setUp(self) -> None:
self.client = Client() # 初始化HTTP客户端
self.factory = RequestFactory() # 初始化请求工厂
self.apps = self.init_apps() # 调用自定义方法,创建所有第三方平台的测试配置
# 初始化所有第三方登录平台的配置基于BaseOauthManager的子类
def init_apps(self):
# 1. 获取BaseOauthManager的所有子类即各第三方平台的具体管理器如微博、谷歌等
applications = [p() for p in BaseOauthManager.__subclasses__()]
# 2. 为每个平台创建对应的OAuthConfig数据并保存
for application in applications:
c = OAuthConfig()
c.type = application.ICON_NAME.lower() # 平台类型与管理器的ICON_NAME一致小写
c.appkey = 'appkey' # 模拟AppKey
c.appsecret = 'appsecret' # 模拟AppSecret
c.save() # 保存到数据库
return applications # 返回所有平台管理器实例,供后续测试使用
# 根据平台类型获取对应的管理器实例
def get_app_by_type(self, type):
for app in self.apps:
if app.ICON_NAME.lower() == type: # 匹配平台类型(小写)
return app
return None # 未找到时返回None
# 测试微博登录流程(使用@patch模拟管理器的do_post/do_get方法避免真实调用第三方接口
@patch("oauth.oauthmanager.WBOauthManager.do_post") # Mock微博管理器的POST请求方法
@patch("oauth.oauthmanager.WBOauthManager.do_get") # Mock微博管理器的GET请求方法
def test_weibo_login(self, mock_do_get, mock_do_post):
# 1. 获取微博管理器实例,断言实例存在(确认初始化成功)
weibo_app = self.get_app_by_type('weibo')
assert weibo_app
# 2. 调用获取授权URL的方法仅验证方法可执行未断言URL内容
url = weibo_app.get_authorization_url()
# 3. 模拟do_post返回第三方平台返回的access_token和uid
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
# 4. 模拟do_get返回第三方平台返回的用户信息如头像、昵称等
mock_do_get.return_value = json.dumps({
"avatar_large": "avatar_large",
"screen_name": "screen_name",
"id": "id",
"email": "email",
})
# 5. 调用获取access_token的方法获取用户信息对象
userinfo = weibo_app.get_access_token_by_code('code')
# 6. 断言用户信息中的token和openid与模拟返回一致验证流程正确性
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'id')
# 测试谷歌登录流程逻辑与微博类似Mock谷歌管理器的请求方法
@patch("oauth.oauthmanager.GoogleOauthManager.do_post")
@patch("oauth.oauthmanager.GoogleOauthManager.do_get")
def test_google_login(self, mock_do_get, mock_do_post):
# 1. 获取谷歌管理器实例,断言存在
google_app = self.get_app_by_type('google')
assert google_app
# 2. 调用获取授权URL的方法
url = google_app.get_authorization_url()
# 3. 模拟do_post返回谷歌返回的access_token和id_token
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"id_token": "id_token",
})
# 4. 模拟do_get返回谷歌返回的用户信息
mock_do_get.return_value = json.dumps({
"picture": "picture",
"name": "name",
"sub": "sub", # 谷歌用户唯一标识对应openid
"email": "email",
})
# 5. 调用获取token和用户信息的方法
token = google_app.get_access_token_by_code('code')
userinfo = google_app.get_oauth_userinfo()
# 6. 断言token和openid与模拟返回一致
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'sub')
# 测试GitHub登录流程
@patch("oauth.oauthmanager.GitHubOauthManager.do_post")
@patch("oauth.oauthmanager.GitHubOauthManager.do_get")
def test_github_login(self, mock_do_get, mock_do_post):
# 1. 获取GitHub管理器实例断言存在
github_app = self.get_app_by_type('github')
assert github_app
# 2. 调用获取授权URL断言URL包含GitHub域名和client_id验证授权URL正确性
url = github_app.get_authorization_url()
self.assertTrue("github.com" in url) # 确认跳转到GitHub授权页
self.assertTrue("client_id" in url) # 确认URL包含client_id参数
# 3. 模拟do_post返回GitHub返回的token字符串非JSON格式
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer"
# 4. 模拟do_get返回GitHub返回的用户信息
mock_do_get.return_value = json.dumps({
"avatar_url": "avatar_url",
"name": "name",
"id": "id", # GitHub用户唯一标识
"email": "email",
})
# 5. 调用获取token和用户信息的方法
token = github_app.get_access_token_by_code('code')
userinfo = github_app.get_oauth_userinfo()
# 6. 断言token与模拟返回一致
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a')
self.assertEqual(userinfo.openid, 'id')
# 测试Facebook登录流程
@patch("oauth.oauthmanager.FaceBookOauthManager.do_post")
@patch("oauth.oauthmanager.FaceBookOauthManager.do_get")
def test_facebook_login(self, mock_do_get, mock_do_post):
# 1. 获取Facebook管理器实例断言存在
facebook_app = self.get_app_by_type('facebook')
assert facebook_app
# 2. 调用获取授权URL断言包含Facebook域名
url = facebook_app.get_authorization_url()
self.assertTrue("facebook.com" in url)
# 3. 模拟do_post返回Facebook返回的access_token
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
})
# 4. 模拟do_get返回Facebook返回的用户信息头像嵌套在picture.data.url中
mock_do_get.return_value = json.dumps({
"name": "name",
"id": "id", # Facebook用户唯一标识
"email": "email",
"picture": {
"data": {
"url": "url"
}
}
})
# 5. 调用获取token和用户信息的方法
token = facebook_app.get_access_token_by_code('code')
userinfo = facebook_app.get_oauth_userinfo()
# 6. 断言token与模拟返回一致
self.assertEqual(userinfo.token, 'access_token')
# 测试QQ登录流程Mock do_get方法模拟多步返回token、openid、用户信息
@patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[
# 模拟三次do_get调用的返回值QQ授权流程需多步请求
'access_token=access_token&expires_in=3600', # 第一步获取token
'callback({"client_id":"appid","openid":"openid"} );', # 第二步获取openid带callback包裹
json.dumps({ # 第三步:获取用户信息
"nickname": "nickname",
"email": "email",
"figureurl": "figureurl",
"openid": "openid",
})
])
def test_qq_login(self, mock_do_get):
# 1. 获取QQ管理器实例断言存在
qq_app = self.get_app_by_type('qq')
assert qq_app
# 2. 调用获取授权URL断言包含QQ域名
url = qq_app.get_authorization_url()
self.assertTrue("qq.com" in url)
# 3. 调用获取token和用户信息的方法
token = qq_app.get_access_token_by_code('code')
userinfo = qq_app.get_oauth_userinfo()
# 4. 断言token与模拟返回一致
self.assertEqual(userinfo.token, 'access_token')
# 测试微博登录(用户信息包含邮箱的场景):验证登录后自动绑定用户、跳转首页
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post):
# 1. 模拟do_post返回access_token和uid
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
# 2. 模拟do_get返回包含邮箱的用户信息
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1", # 昵称将作为Django用户名
"id": "id",
"email": "email", # 包含邮箱
}
mock_do_get.return_value = json.dumps(mock_user_info)
# 3. 发送请求到微博登录地址断言重定向到微博API
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
# 4. 模拟授权回调发送带code的请求到授权处理地址
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/') # 断言授权后跳转到首页
# 5. 验证用户已登录,且用户信息与模拟数据一致
user = auth.get_user(self.client) # 获取当前登录用户
assert user.is_authenticated # 断言用户已认证
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name']) # 用户名=微博昵称
self.assertEqual(user.email, mock_user_info['email']) # 邮箱=微博返回的邮箱
# 6. 测试登出后再次登录(验证重复登录逻辑)
self.client.logout() # 登出当前用户
response = self.client.get('/oauth/authorize?type=weibo&code=code') # 再次授权
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
# 7. 再次验证用户登录状态和信息
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
# 测试微博登录(用户信息不含邮箱的场景):验证跳转邮箱填写页、绑定邮箱后登录
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post):
# 1. 模拟do_post返回access_token和uid
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
# 2. 模拟do_get返回不含邮箱的用户信息
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
"id": "id",
}
mock_do_get.return_value = json.dumps(mock_user_info)
# 3. 发送请求到微博登录地址断言重定向到微博API
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
# 4. 模拟授权回调:因无邮箱,应跳转到邮箱填写页
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302) # 重定向到邮箱填写页
# 5. 解析邮箱填写页URL中的oauth_user_id第三方用户记录ID
oauth_user_id = int(response.url.split('/')[-1].split('.')[0])
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html') # 断言跳转地址正确
# 6. 模拟提交邮箱发送POST请求到邮箱填写页传入邮箱和oauth_user_id
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id})
self.assertEqual(response.status_code, 302) # 提交后重定向到绑定成功页
# 7. 生成邮箱验证的签名使用项目工具类基于SECRET_KEY和oauth_user_id
sign = get_sha256(settings.SECRET_KEY + str(oauth_user_id) + settings.SECRET_KEY)
# 8. 反向解析绑定成功页URL断言提交邮箱后跳转地址正确
url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauth_user_id})
self.assertEqual(response.url, f'{url}?type=email') # 跳转带type=email参数的绑定成功页
# 9. 模拟访问邮箱验证链接(确认邮箱绑定)
path = reverse('oauth:email_confirm', kwargs={'id': oauth_user_id, 'sign': sign})
response = self.client.get(path)
self.assertEqual(response.status_code, 302)
# 断言验证后跳转到带type=success的绑定成功页
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success')
# 10. 验证用户已登录,且信息与填写的邮箱一致
user = auth.get_user(self.client)
from oauth.models import OAuthUser # 导入OAuthUser模型避免循环导入
oauth_user = OAuthUser.objects.get(author=user) # 获取关联的第三方用户记录
self.assertTrue(user.is_authenticated) # 断言用户已认证
self.assertEqual(user.username, mock_user_info['screen_name']) # 用户名=微博昵称
self.assertEqual(user.email, 'test@gmail.com') # 邮箱=填写的测试邮箱
self.assertEqual(oauth_user.pk, oauth_user_id) # 关联的第三方用户ID正确

@ -0,0 +1,46 @@
from django.urls import path
from . import views
# 定义应用命名空间用于URL反向解析时区分不同应用的URL
app_name = "oauth"
# OAuth认证模块的URL配置
urlpatterns = [
# OAuth授权回调端点
# 第三方平台授权成功后回调此URL
path(
r'oauth/authorize',
views.authorize),
# 邮箱补充页面
# 当OAuth用户没有邮箱时需要补充邮箱信息
# oauthid: OAuth用户ID
path(
r'oauth/requireemail/<int:oauthid>.html',
views.RequireEmailView.as_view(),
name='require_email'),
# 邮箱确认端点
# 验证用户邮箱的确认链接
# id: 用户ID, sign: 安全签名
path(
r'oauth/emailconfirm/<int:id>/<sign>.html',
views.emailconfirm,
name='email_confirm'),
# 绑定成功页面
# OAuth账号绑定成功后的展示页面
# oauthid: OAuth用户ID
path(
r'oauth/bindsuccess/<int:oauthid>.html',
views.bindsuccess,
name='bindsuccess'),
# OAuth登录入口
# 发起第三方登录请求的端点
path(
r'oauth/oauthlogin',
views.oauthlogin,
name='oauthlogin')
]

@ -0,0 +1,313 @@
import logging # 导入日志模块,用于记录系统运行日志
# Create your views here.
from urllib.parse import urlparse # 导入URL解析工具用于验证跳转URL的合法性
from django.conf import settings # 导入Django项目配置
from django.contrib.auth import get_user_model # 导入获取用户模型的工具(支持自定义用户模型)
from django.contrib.auth import login # 导入登录函数,用于实现用户登录
from django.core.exceptions import ObjectDoesNotExist # 导入对象不存在异常类
from django.db import transaction # 导入数据库事务工具,确保数据操作的原子性
from django.http import HttpResponseForbidden # 导入403禁止访问响应类
from django.http import HttpResponseRedirect # 导入重定向响应类
from django.shortcuts import get_object_or_404 # 导入获取对象或返回404的工具
from django.shortcuts import render # 导入渲染模板函数
from django.urls import reverse # 导入URL反向解析工具
from django.utils import timezone # 导入时间工具,用于生成时间相关的唯一标识
from django.utils.translation import gettext_lazy as _ # 导入国际化翻译工具
from django.views.generic import FormView # 导入表单视图基类,用于处理表单提交
from djangoblog.blog_signals import oauth_user_login_signal # 导入OAuth用户登录信号
from djangoblog.utils import get_current_site # 导入获取当前站点信息的工具
from djangoblog.utils import send_email, get_sha256 # 导入发送邮件和SHA256加密工具
from oauth.forms import RequireEmailForm # 导入获取邮箱的表单类
from .models import OAuthUser # 导入OAuth用户模型
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException # 导入OAuth管理器和异常类
# 初始化日志记录器,用于记录当前模块的日志
logger = logging.getLogger(__name__)
def get_redirecturl(request):
"""
处理跳转URL的验证和清洗确保跳转地址合法
:param request: HTTP请求对象
:return: 清洗后的合法跳转URL
"""
nexturl = request.GET.get('next_url', None) # 从请求参数中获取跳转URL
# 处理默认情况无next_url或指向登录页时默认跳转到首页
if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/'
return nexturl
# 解析URL验证域名合法性防止跳转到外部恶意网站
p = urlparse(nexturl)
if p.netloc: # 如果URL包含域名非相对路径
site = get_current_site().domain # 获取当前网站域名
# 比较跳转URL的域名与当前网站域名忽略www.前缀)
if not p.netloc.replace('www.', '') == site.replace('www.', ''):
logger.info('非法url:' + nexturl) # 记录非法URL日志
return "/" # 非法URL时跳转到首页
return nexturl # 返回合法的跳转URL
def oauthlogin(request):
"""
处理第三方登录请求生成并跳转到第三方平台的授权页面
:param request: HTTP请求对象
:return: 重定向到第三方授权页面的响应
"""
type = request.GET.get('type', None) # 获取第三方平台类型如weibo、github等
if not type: # 无平台类型时跳转到首页
return HttpResponseRedirect('/')
# 根据平台类型获取对应的OAuth管理器如微博管理器、GitHub管理器
manager = get_manager_by_type(type)
if not manager: # 管理器不存在时跳转到首页
return HttpResponseRedirect('/')
nexturl = get_redirecturl(request) # 获取并验证跳转URL
# 通过管理器生成第三方平台的授权URL包含回调地址、state等参数
authorizeurl = manager.get_authorization_url(nexturl)
return HttpResponseRedirect(authorizeurl) # 重定向到第三方授权页面
def authorize(request):
"""
处理第三方平台的授权回调验证授权码并完成用户登录/绑定流程
:param request: HTTP请求对象
:return: 重定向到目标页面或邮箱填写页的响应
"""
type = request.GET.get('type', None) # 获取第三方平台类型
if not type: # 无平台类型时跳转到首页
return HttpResponseRedirect('/')
manager = get_manager_by_type(type) # 获取对应的OAuth管理器
if not manager: # 管理器不存在时跳转到首页
return HttpResponseRedirect('/')
code = request.GET.get('code', None) # 获取第三方平台返回的授权码
try:
# 使用授权码获取访问令牌access token
rsp = manager.get_access_token_by_code(code)
except OAuthAccessTokenException as e: # 捕获令牌获取失败异常
logger.warning("OAuthAccessTokenException:" + str(e)) # 记录警告日志
return HttpResponseRedirect('/')
except Exception as e: # 捕获其他异常
logger.error(e) # 记录错误日志
rsp = None
nexturl = get_redirecturl(request) # 获取验证后的跳转URL
if not rsp: # 令牌获取失败时重新生成授权URL并跳转
return HttpResponseRedirect(manager.get_authorization_url(nexturl))
# 通过令牌获取第三方用户信息(如昵称、头像、邮箱等)
user = manager.get_oauth_userinfo()
if user: # 成功获取用户信息
# 处理空昵称:生成基于时间的默认昵称
if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
try:
# 查找是否已存在该第三方用户的关联记录
temp = OAuthUser.objects.get(type=type, openid=user.openid)
# 更新已有记录的头像、元数据和昵称
temp.picture = user.picture
temp.metadata = user.metadata
temp.nickname = user.nickname
user = temp # 复用已有记录
except ObjectDoesNotExist:
# 不存在时使用新用户信息(后续会保存)
pass
# 特殊处理Facebook的token因过长可能导致存储问题此处清空
if type == 'facebook':
user.token = ''
if user.email: # 若第三方用户信息包含邮箱(可直接绑定/创建本地用户)
with transaction.atomic(): # 使用数据库事务确保操作原子性
author = None
try:
# 尝试通过关联ID获取本地用户
author = get_user_model().objects.get(id=user.author_id)
except ObjectDoesNotExist:
pass # 关联用户不存在时继续处理
if not author: # 本地用户不存在时,通过邮箱查找或创建
# 查找或创建与第三方邮箱一致的本地用户
result = get_user_model().objects.get_or_create(email=user.email)
author = result[0]
# 若为新创建的用户result[1]为True
if result[1]:
try:
# 检查昵称是否已被使用
get_user_model().objects.get(username=user.nickname)
except ObjectDoesNotExist:
# 未被使用则设置昵称
author.username = user.nickname
else:
# 已被使用则生成基于时间的唯一用户名
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize' # 标记用户来源为OAuth授权
author.save() # 保存本地用户信息
# 关联第三方用户记录与本地用户
user.author = author
user.save() # 保存第三方用户记录
# 发送OAuth用户登录信号供其他模块监听处理
oauth_user_login_signal.send(
sender=authorize.__class__, id=user.id)
login(request, author) # 登录本地用户
return HttpResponseRedirect(nexturl) # 重定向到目标页面
else: # 第三方用户信息不含邮箱(需要用户补充邮箱)
user.save() # 先保存第三方用户记录(无关联本地用户)
# 生成邮箱填写页的URL
url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
return HttpResponseRedirect(url) # 重定向到邮箱填写页
else: # 未获取到用户信息时,跳转到目标页面
return HttpResponseRedirect(nexturl)
def emailconfirm(request, id, sign):
"""
处理邮箱验证请求完成第三方用户与本地用户的绑定
:param request: HTTP请求对象
:param id: OAuthUser记录ID
:param sign: 验证签名防止恶意请求
:return: 重定向到绑定成功页或403禁止访问
"""
if not sign: # 无签名时返回403
return HttpResponseForbidden()
# 验证签名合法性使用SECRET_KEY和ID生成SHA256比对
if not get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY).upper() == sign.upper():
return HttpResponseForbidden() # 签名不匹配时返回403
# 获取对应的第三方用户记录不存在则返回404
oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic(): # 数据库事务确保操作原子性
if oauthuser.author: # 若已关联本地用户
author = get_user_model().objects.get(pk=oauthuser.author_id)
else: # 未关联本地用户时,通过邮箱创建或查找
result = get_user_model().objects.get_or_create(email=oauthuser.email)
author = result[0]
# 若为新创建的用户
if result[1]:
author.source = 'emailconfirm' # 标记用户来源为邮箱验证
# 设置用户名为第三方昵称(为空时生成默认值)
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip() else "djangoblog" + timezone.now().strftime(
'%y%m%d%I%M%S')
author.save() # 保存本地用户
# 关联第三方用户记录与本地用户
oauthuser.author = author
oauthuser.save() # 保存第三方用户记录
# 发送OAuth用户登录信号
oauth_user_login_signal.send(
sender=emailconfirm.__class__, id=oauthuser.id)
login(request, author) # 登录本地用户
# 生成欢迎邮件内容(包含网站信息)
site = 'http://' + get_current_site().domain
content = _('''
<p>Congratulations, you have successfully bound your email address. You can use
%(oauthuser_type)s to directly log in to this website without a password.</p>
You are welcome to continue to follow this site, the address is
<a href="%(site)s" rel="bookmark">%(site)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
%(site)s
''') % {'oauthuser_type': oauthuser.type, 'site': site}
# 发送绑定成功邮件
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content)
# 生成绑定成功页URL
url = reverse('oauth:bindsuccess', kwargs={'oauthid': id})
url = url + '?type=success' # 添加成功标识参数
return HttpResponseRedirect(url) # 重定向到绑定成功页
class RequireEmailView(FormView):
"""处理邮箱填写的表单视图继承自Django的FormView"""
form_class = RequireEmailForm # 指定使用的表单类
template_name = 'oauth/require_email.html' # 指定渲染的模板
def get(self, request, *args, **kwargs):
"""处理GET请求获取第三方用户记录若已填写邮箱则跳转注释中逻辑"""
oauthid = self.kwargs['oauthid'] # 从URL参数中获取第三方用户ID
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) # 获取第三方用户记录
if oauthuser.email:
pass # 若已填写邮箱,此处可添加跳转逻辑(当前注释)
# 调用父类的GET方法渲染表单页面
return super(RequireEmailView, self).get(request, *args, **kwargs)
def get_initial(self):
"""设置表单初始值预填第三方用户ID"""
oauthid = self.kwargs['oauthid']
return {'email': '', 'oauthid': oauthid} # 邮箱为空ID为URL参数中的值
def get_context_data(self, **kwargs):
"""扩展上下文数据:添加第三方用户的头像(用于模板显示)"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture: # 若存在头像地址
kwargs['picture'] = oauthuser.picture # 添加到上下文
# 调用父类方法,返回扩展后的上下文
return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form):
"""处理合法的表单提交:保存邮箱并发送验证邮件"""
email = form.cleaned_data['email'] # 获取清洗后的邮箱
oauthid = form.cleaned_data['oauthid'] # 获取第三方用户ID
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) # 获取第三方用户记录
oauthuser.email = email # 设置邮箱
oauthuser.save() # 保存记录
# 生成邮箱验证的签名
sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
# 获取当前站点域名开发环境使用127.0.0.1:8000
site = get_current_site().domain
if settings.DEBUG:
site = '127.0.0.1:8000'
# 生成邮箱验证链接
path = reverse('oauth:email_confirm', kwargs={'id': oauthid, 'sign': sign})
url = "http://{site}{path}".format(site=site, path=path)
# 生成验证邮件内容
content = _("""
<p>Please click the link below to bind your email</p>
<a href="%(url)s" rel="bookmark">%(url)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
<br />
%(url)s
""") % {'url': url}
# 发送验证邮件
send_email(emailto=[email, ], title=_('Bind your email'), content=content)
# 生成绑定成功页URL带邮件发送标识
url = reverse('oauth:bindsuccess', kwargs={'oauthid': oauthid})
url = url + '?type=email'
return HttpResponseRedirect(url) # 重定向到绑定成功页
def bindsuccess(request, oauthid):
"""
显示绑定成功页面根据类型显示不同内容
:param request: HTTP请求对象
:param oauthid: 第三方用户ID
:return: 渲染绑定成功页面的响应
"""
type = request.GET.get('type', None) # 获取类型参数email或success
oauthuser = get_object_or_404(OAuthUser, pk=oauthid) # 获取第三方用户记录
if type == 'email': # 邮件已发送但未验证
title = _('Bind your email')
content = _(
'Congratulations, the binding is just one step away. '
'Please log in to your email to check the email to complete the binding. Thank you.')
else: # 绑定已完成
title = _('Binding successful')
content = _(
"Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s"
" to directly log in to this website without a password. You are welcome to continue to follow this site." % {
'oauthuser_type': oauthuser.type})
# 渲染绑定成功页面,传递标题和内容
return render(request, 'oauth/bindsuccess.html', {'title': title, 'content': content})
Loading…
Cancel
Save