代码注释

zjj_branch
周俊杰 2 months ago
parent a2869f8724
commit 3bd2391f96

@ -1,38 +1,41 @@
import logging
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
class BasePlugin:
# 插件元数据
PLUGIN_NAME = None
PLUGIN_DESCRIPTION = None
PLUGIN_VERSION = None
"""插件基类,定义插件的基本结构和元数据要求"""
# 插件元数据(子类必须定义这些属性)
PLUGIN_NAME = None # 插件名称
PLUGIN_DESCRIPTION = None # 插件描述
PLUGIN_VERSION = None # 插件版本
def __init__(self):
"""初始化插件,检查元数据并调用初始化逻辑"""
# 检查必要的元数据是否已定义
if not all([self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION]):
raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
raise ValueError("插件元数据PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION必须定义")
# 调用插件初始化方法
self.init_plugin()
# 注册插件的钩子回调
self.register_hooks()
def init_plugin(self):
"""
插件初始化逻辑
子类可以重写此方法来实现特定的初始化操作
"""
logger.info(f'{self.PLUGIN_NAME} initialized.')
"""插件初始化逻辑(子类可重写此方法实现自定义初始化)"""
logger.info(f'{self.PLUGIN_NAME} 初始化完成。')
def register_hooks(self):
"""
注册插件钩子
子类可以重写此方法来注册特定的钩子
"""
pass
"""注册插件钩子(子类可重写此方法注册特定钩子)"""
pass # 默认不注册任何钩子
def get_plugin_info(self):
"""
获取插件信息
:return: 包含插件元数据的字典
"""获取插件的元数据信息
Returns:
dict: 包含插件名称描述和版本的字典
"""
return {
'name': self.PLUGIN_NAME,

@ -1,7 +1,8 @@
ARTICLE_DETAIL_LOAD = 'article_detail_load'
ARTICLE_CREATE = 'article_create'
ARTICLE_UPDATE = 'article_update'
ARTICLE_DELETE = 'article_delete'
ARTICLE_CONTENT_HOOK_NAME = "the_content"
# 文章管理相关的操作常量(用于标识不同动作)
ARTICLE_DETAIL_LOAD = 'article_detail_load' # 文章详情加载动作
ARTICLE_CREATE = 'article_create' # 文章创建动作
ARTICLE_UPDATE = 'article_update' # 文章更新动作
ARTICLE_DELETE = 'article_delete' # 文章删除动作
# 内容处理钩子常量(用于在文章内容展示前/后进行处理)
ARTICLE_CONTENT_HOOK_NAME = "the_content" # 文章内容钩子名称

@ -1,44 +1,63 @@
import logging
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# 全局钩子存储字典(存储所有注册的钩子及其回调函数)
_hooks = {}
def register(hook_name: str, callback: callable):
"""
注册一个钩子回调
"""注册一个钩子回调函数
Args:
hook_name (str): 钩子名称如文章内容处理钩子
callback (callable): 回调函数当钩子触发时执行的函数
"""
if hook_name not in _hooks:
_hooks[hook_name] = []
_hooks[hook_name].append(callback)
logger.debug(f"Registered hook '{hook_name}' with callback '{callback.__name__}'")
_hooks[hook_name] = [] # 如果钩子不存在,初始化空列表
_hooks[hook_name].append(callback) # 将回调函数添加到对应钩子的列表中
logger.debug(f"已注册钩子 '{hook_name}' 的回调函数 '{callback.__name__}'")
def run_action(hook_name: str, *args, **kwargs):
"""
执行一个 Action Hook
它会按顺序执行所有注册到该钩子上的回调函数
"""执行一个 Action 类型的钩子(按顺序执行所有注册的回调函数)
Args:
hook_name (str): 要触发的钩子名称
*args: 传递给回调函数的位置参数
**kwargs: 传递给回调函数的关键字参数
"""
if hook_name in _hooks:
logger.debug(f"Running action hook '{hook_name}'")
logger.debug(f"正在执行 Action 钩子 '{hook_name}'")
for callback in _hooks[hook_name]:
try:
callback(*args, **kwargs)
callback(*args, **kwargs) # 依次执行每个回调函数
except Exception as e:
logger.error(f"Error running action hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
# 捕获并记录回调函数执行中的错误
logger.error(f"执行 Action 钩子 '{hook_name}' 的回调函数 '{callback.__name__}' 时出错: {e}",
exc_info=True)
def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行一个 Filter Hook
它会把 value 依次传递给所有注册的回调函数进行处理
"""执行一个 Filter 类型的钩子(将值依次传递给所有回调函数处理)
Args:
hook_name (str): 要应用的钩子名称
value: 初始值会被回调函数依次修改
*args: 传递给回调函数的位置参数
**kwargs: 传递给回调函数的关键字参数
Returns:
处理后的最终值经过所有回调函数修改后的结果
"""
if hook_name in _hooks:
logger.debug(f"Applying filter hook '{hook_name}'")
logger.debug(f"正在应用 Filter 钩子 '{hook_name}'")
for callback in _hooks[hook_name]:
try:
value = callback(value, *args, **kwargs)
value = callback(value, *args, **kwargs) # 依次处理值并更新
except Exception as e:
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
return value
# 捕获并记录回调函数执行中的错误
logger.error(f"应用 Filter 钩子 '{hook_name}' 的回调函数 '{callback.__name__}' 时出错: {e}",
exc_info=True)
return value # 返回最终处理后的值

@ -1,19 +1,27 @@
import os
import logging
from django.conf import settings
from django.conf import settings # 假设使用 Django 框架的配置
# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
def load_plugins():
"""动态加载并初始化指定目录下的所有插件
该函数会在 Django 应用注册表就绪后被调用从配置的插件目录中加载所有活跃插件
"""
Dynamically loads and initializes plugins from the 'plugins' directory.
This function is intended to be called when the Django app registry is ready.
"""
# 遍历配置中指定的所有活跃插件名称
for plugin_name in settings.ACTIVE_PLUGINS:
# 构造插件的完整路径(假设插件存放在 settings.PLUGINS_DIR 目录下)
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
# 检查插件目录是否存在且包含 plugin.py 文件
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, 'plugin.py')):
try:
# 动态导入插件模块格式plugins.<插件名>.plugin
__import__(f'plugins.{plugin_name}.plugin')
logger.info(f"Successfully loaded plugin: {plugin_name}")
logger.info(f"成功加载插件: {plugin_name}")
except ImportError as e:
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)
# 捕获并记录插件导入失败错误
logger.error(f"加载插件失败: {plugin_name}", exc_info=e)

@ -1,54 +1,59 @@
import logging
from django.contrib import admin
# Register your models here.
from django.urls import reverse
from django.utils.html import format_html
from .models import OAuthUser, OAuthConfig
logger = logging.getLogger(__name__)
# OAuth第三方用户管理后台
class OAuthUserAdmin(admin.ModelAdmin):
# 搜索字段
search_fields = ('nickname', 'email')
# 每页显示条数
list_per_page = 20
list_display = (
'id',
'nickname',
'link_to_usermodel',
'show_user_image',
'type',
'email',
)
# 列表页显示的字段
list_display = ('id', 'nickname', 'link_to_usermodel', 'show_user_image', 'type', 'email')
# 哪些字段可点击进入编辑页
list_display_links = ('id', 'nickname')
# 过滤器
list_filter = ('author', 'type',)
# 只读字段(这里设置为空,后面动态添加所有字段)
readonly_fields = []
# 动态设置所有字段为只读(防止在后台误修改)
def get_readonly_fields(self, request, obj=None):
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
# 是否允许添加(禁止手动添加)
def has_add_permission(self, request):
return False
# 自定义方法:生成关联 Django 用户的链接
def link_to_usermodel(self, obj):
if obj.author:
if obj.author: # 如果绑定了系统用户
info = (obj.author._meta.app_label, obj.author._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
return format_html(
u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email))
# 显示用户昵称或邮箱
return format_html('<a href="%s">%s</a>' % (link, obj.author.nickname or obj.author.email))
link_to_usermodel.short_description = '用户' # 列表页标题
# 自定义方法:显示用户头像(未实现具体内容)
def show_user_image(self, obj):
img = obj.picture
return format_html(
u'<img src="%s" style="width:50px;height:50px"></img>' %
(img))
link_to_usermodel.short_description = '用户'
show_user_image.short_description = '用户头像'
return format_html('') # 实际应返回图片标签,如 ' % img
show_user_image.short_description = '用户头像' # 列表页标题
# OAuth 第三方平台配置管理后台
class OAuthConfigAdmin(admin.ModelAdmin):
# 列表页显示字段
list_display = ('type', 'appkey', 'appsecret', 'is_enable')
# 过滤器
list_filter = ('type',)
# 注册模型到 Admin
# admin.site.register(OAuthUser, OAuthUserAdmin)
# admin.site.register(OAuthConfig, OAuthConfigAdmin)

@ -1,5 +1,5 @@
from django.apps import AppConfig
# OAuth 应用配置类
class OauthConfig(AppConfig):
name = 'oauth'
name = 'oauth' # 应用名称

@ -1,12 +1,13 @@
from django.contrib.auth.forms import forms
from django.forms import forms
from django.forms import widgets
# 自定义表单:用于要求用户输入邮箱(当第三方登录没有提供邮箱时)
class RequireEmailForm(forms.Form):
email = forms.EmailField(label='电子邮箱', required=True)
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False)
email = forms.EmailField(label='电子邮箱', required=True) # 必填邮箱字段
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False) # 隐藏的 OAuth 用户 ID
def __init__(self, *args, **kwargs):
super(RequireEmailForm, self).__init__(*args, **kwargs)
# 设置邮箱输入框 HTML 属性,如样式类和占位符
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})

@ -1,57 +1,120 @@
# Generated by Django 4.1.7 on 2023-03-07 09:53
# 由 Django 4.1.7 在 2023-03-07 09:53 自动生成的迁移文件
# 从 Django 的配置模块导入设置
from django.conf import settings
# 从 Django 的数据库模块导入迁移相关功能
from django.db import migrations, models
# 导入 Django 提供的用于处理删除操作的模块
import django.db.models.deletion
# 导入 Django 的时区工具,用于处理时间字段的默认值
import django.utils.timezone
# 定义一个迁移类,继承自 migrations.Migration
class Migration(migrations.Migration):
# 标记此迁移为初始迁移,即项目中的第一个迁移文件
initial = True
# 定义此迁移所依赖的其他迁移,此处依赖于可交换的用户模型
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
# 定义此迁移中要执行的一系列数据库操作
operations = [
# 创建一个名为 OAuthConfig 的新模型,用于存储 OAuth 配置信息
migrations.CreateModel(
name='OAuthConfig',
fields=[
# 主键字段,自动创建的大整数字段,作为模型的唯一标识
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')),
# OAuth 提供商类型如微博、谷歌、GitHub 等,使用 CharField 并限制选择项
('type', models.CharField(
choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')],
default='a', # 默认值为 'a',但建议设置为有效选项之一,如 'weibo'
max_length=10,
verbose_name='类型' # 在后台管理中显示的字段名称
)),
# OAuth 应用的 AppKey用于身份验证
('appkey', models.CharField(max_length=200, verbose_name='AppKey')),
# OAuth 应用的 AppSecret用于身份验证
('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')),
# OAuth 回调地址,用户授权后跳转的 URL默认设置为百度建议根据实际需求设置
('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')),
# 是否启用该 OAuth 配置,默认启用
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')),
# 记录该 OAuth 配置的创建时间,默认为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 记录该 OAuth 配置的最后修改时间,默认为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
],
# 定义该模型的元数据选项
options={
'verbose_name': 'oauth配置',
'verbose_name_plural': 'oauth配置',
'ordering': ['-created_time'],
'verbose_name': 'oauth配置', # 单数形式的后台显示名称
'verbose_name_plural': 'oauth配置', # 复数形式的后台显示名称
'ordering': ['-created_time'], # 按创建时间降序排列
},
),
# 创建一个名为 OAuthUser 的新模型,用于存储通过 OAuth 登录的用户信息
migrations.CreateModel(
name='OAuthUser',
fields=[
# 主键字段,自动创建的大整数字段,作为模型的唯一标识
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 用户在 OAuth 提供商的唯一标识符,如 OpenID
('openid', models.CharField(max_length=50)),
# 用户在 OAuth 提供商的昵称
('nickname', models.CharField(max_length=50, verbose_name='昵称')),
# OAuth 提供的访问令牌,用于后续 API 调用,允许为空
('token', models.CharField(blank=True, max_length=150, null=True)),
# 用户在 OAuth 提供商的头像 URL允许为空
('picture', models.CharField(blank=True, max_length=350, null=True)),
# OAuth 提供商的类型,如微博、谷歌等
('type', models.CharField(max_length=50)),
# 用户的邮箱地址,允许为空
('email', models.CharField(blank=True, max_length=50, null=True)),
# 其他元数据,以文本形式存储,允许为空
('metadata', models.TextField(blank=True, null=True)),
# 记录该 OAuth 用户的创建时间,默认为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 记录该 OAuth 用户的最后修改时间,默认为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')),
# 与 Django 的用户模型建立外键关系,表示该 OAuth 用户关联的本地用户,允许为空
('author', models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE, # 当关联的本地用户被删除时,级联删除此 OAuth 用户
to=settings.AUTH_USER_MODEL, # 关联到项目的用户模型
verbose_name='用户' # 在后台管理中显示的字段名称
)),
],
# 定义该模型的元数据选项
options={
'verbose_name': 'oauth用户',
'verbose_name_plural': 'oauth用户',
'ordering': ['-created_time'],
'verbose_name': 'oauth用户', # 单数形式的后台显示名称
'verbose_name_plural': 'oauth用户', # 复数形式的后台显示名称
'ordering': ['-created_time'], # 按创建时间降序排列
},
),
]

@ -1,86 +1,144 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13
# 由 Django 4.2.5 在 2023-09-06 13:13 自动生成的迁移文件
# 从 Django 的配置模块导入设置
from django.conf import settings
# 从 Django 的数据库模块导入迁移相关功能
from django.db import migrations, models
# 导入 Django 提供的用于处理删除操作的模块
import django.db.models.deletion
# 导入 Django 的时区工具,用于处理时间字段的默认值
import django.utils.timezone
# 定义一个迁移类,继承自 migrations.Migration
class Migration(migrations.Migration):
# 定义此迁移所依赖的其他迁移,依赖于可交换的用户模型和 oauth 应用的初始迁移
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('oauth', '0001_initial'),
('oauth', '0001_initial'), # 依赖于初始迁移文件
]
# 定义此迁移中要执行的一系列数据库操作
operations = [
# 修改 OAuthConfig 模型的元数据选项,调整排序字段和显示名称
migrations.AlterModelOptions(
name='oauthconfig',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'},
options={
'ordering': ['-creation_time'], # 将排序字段改为 creation_time注意此时字段尚未创建需后续添加
'verbose_name': 'oauth配置', # 单数形式的后台显示名称
'verbose_name_plural': 'oauth配置', # 复数形式的后台显示名称
},
),
# 修改 OAuthUser 模型的元数据选项,调整排序字段和显示名称
migrations.AlterModelOptions(
name='oauthuser',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'},
options={
'ordering': ['-creation_time'], # 将排序字段改为 creation_time注意此时字段尚未创建需后续添加
'verbose_name': 'oauth user', # 单数形式的后台显示名称,英文显示
'verbose_name_plural': 'oauth user', # 复数形式的后台显示名称,英文显示
},
),
# 移除 OAuthConfig 模型中的 created_time 字段
migrations.RemoveField(
model_name='oauthconfig',
name='created_time',
),
# 移除 OAuthConfig 模型中的 last_mod_time 字段
migrations.RemoveField(
model_name='oauthconfig',
name='last_mod_time',
),
# 移除 OAuthUser 模型中的 created_time 字段
migrations.RemoveField(
model_name='oauthuser',
name='created_time',
),
# 移除 OAuthUser 模型中的 last_mod_time 字段
migrations.RemoveField(
model_name='oauthuser',
name='last_mod_time',
),
# 向 OAuthConfig 模型中添加一个新的字段 creation_time用于记录创建时间默认为当前时间
migrations.AddField(
model_name='oauthconfig',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'), # 显示名称为 'creation time'
),
# 向 OAuthConfig 模型中添加一个新的字段 last_modify_time用于记录最后修改时间默认为当前时间
migrations.AddField(
model_name='oauthconfig',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'), # 显示名称为 'last modify time'
),
# 向 OAuthUser 模型中添加一个新的字段 creation_time用于记录创建时间默认为当前时间
migrations.AddField(
model_name='oauthuser',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'), # 显示名称为 'creation time'
),
# 向 OAuthUser 模型中添加一个新的字段 last_modify_time用于记录最后修改时间默认为当前时间
migrations.AddField(
model_name='oauthuser',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'), # 显示名称为 'last modify time'
),
# 修改 OAuthConfig 模型中的 callback_url 字段,调整默认值为空字符串
migrations.AlterField(
model_name='oauthconfig',
name='callback_url',
field=models.CharField(default='', max_length=200, verbose_name='callback url'),
field=models.CharField(default='', max_length=200, verbose_name='callback url'), # 默认值改为空字符串,显示名称为 'callback url'
),
# 修改 OAuthConfig 模型中的 is_enable 字段,调整默认值为 True
migrations.AlterField(
model_name='oauthconfig',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is enable'),
field=models.BooleanField(default=True, verbose_name='is enable'), # 显示名称为 'is enable'
),
# 修改 OAuthConfig 模型中的 type 字段,保持原有的选择项和最大长度,未更改默认值
migrations.AlterField(
model_name='oauthconfig',
name='type',
field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'),
field=models.CharField(
choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')],
default='a', # 默认值仍为 'a',建议更改为有效选项
max_length=10,
verbose_name='type' # 显示名称为 'type'
),
),
# 修改 OAuthUser 模型中的 author 字段,保持外键关系和相关选项不变
migrations.AlterField(
model_name='oauthuser',
name='author',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE, # 当关联的本地用户被删除时,级联删除此 OAuth 用户
to=settings.AUTH_USER_MODEL, # 关联到项目的用户模型
verbose_name='author' # 显示名称为 'author'
),
),
# 修改 OAuthUser 模型中的 nickname 字段,未更改字段类型和选项,仅确保其存在
migrations.AlterField(
model_name='oauthuser',
name='nickname',
field=models.CharField(max_length=50, verbose_name='nickname'),
field=models.CharField(max_length=50, verbose_name='nickname'), # 显示名称为 'nickname'
),
]

@ -1,18 +1,23 @@
# Generated by Django 4.2.7 on 2024-01-26 02:41
# 由 Django 4.2.7 在 2024-01-26 02:41 自动生成的迁移文件
# 从 Django 的数据库模块导入迁移相关功能
from django.db import migrations, models
# 定义一个迁移类,继承自 migrations.Migration
class Migration(migrations.Migration):
# 定义此迁移所依赖的其他迁移,依赖于前一次对 OAuth 模型的迁移(即 0002_alter_oauthconfig_options_alter_oauthuser_options_and_more
dependencies = [
('oauth', '0002_alter_oauthconfig_options_alter_oauthuser_options_and_more'),
]
# 定义此迁移中要执行的一系列数据库操作
operations = [
# 修改 OAuthUser 模型中的 nickname 字段的 verbose_name从 'nickname' 改为 'nick name'
migrations.AlterField(
model_name='oauthuser',
name='nickname',
field=models.CharField(max_length=50, verbose_name='nick name'),
field=models.CharField(max_length=50, verbose_name='nick name'), # 显示名称调整为 'nick name'
),
]

@ -1,67 +1,62 @@
# Create your models here.
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
# OAuth第三方用户信息表
class OAuthUser(models.Model):
author = models.ForeignKey(
author = models.ForeignKey( # 关联系统内置用户(可为空,表示未绑定)
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=True,
null=True,
on_delete=models.CASCADE)
openid = models.CharField(max_length=50)
nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
token = models.CharField(max_length=150, null=True, blank=True)
picture = models.CharField(max_length=350, blank=True, null=True)
type = models.CharField(blank=False, null=False, max_length=50)
email = models.CharField(max_length=50, null=True, blank=True)
metadata = models.TextField(null=True, blank=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
openid = models.CharField(max_length=50) # 第三方平台唯一ID
nickname = models.CharField(max_length=50, verbose_name=_('nick name')) # 昵称
token = models.CharField(max_length=150, null=True, blank=True) # 访问令牌
picture = models.CharField(max_length=350, blank=True, null=True) # 头像 URL
type = models.CharField(blank=False, null=False, max_length=50) # 第三方类型,如 weibo, google
email = models.CharField(max_length=50, null=True, blank=True) # 邮箱(可能为空)
metadata = models.TextField(null=True, blank=True) # 其他元数据,存储 JSON 等
creation_time = models.DateTimeField(_('creation time'), default=now) # 创建时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now) # 最后修改时间
def __str__(self):
return self.nickname
return self.nickname # 显示昵称
class Meta:
verbose_name = _('oauth user')
verbose_name = _('oauth user') # 后台显示名称
verbose_name_plural = verbose_name
ordering = ['-creation_time']
ordering = ['-creation_time'] # 按创建时间倒序
# OAuth 第三方平台配置表
class OAuthConfig(models.Model):
TYPE = (
TYPE = ( # 支持的平台类型
('weibo', _('weibo')),
('google', _('google')),
('github', 'GitHub'),
('facebook', 'FaceBook'),
('qq', 'QQ'),
)
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
appkey = models.CharField(max_length=200, verbose_name='AppKey')
appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
callback_url = models.CharField(
max_length=200,
verbose_name=_('callback url'),
blank=False,
default='')
is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False)
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a') # 平台类型
appkey = models.CharField(max_length=200, verbose_name='AppKey') # App Key
appsecret = models.CharField(max_length=200, verbose_name='AppSecret') # App Secret
callback_url = models.CharField( # 回调地址
max_length=200, verbose_name=_('callback url'), blank=False, default='')
is_enable = models.BooleanField(_('is enable'), default=True, blank=False, null=False) # 是否启用
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 校验:同一个平台类型不能重复添加
def clean(self):
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
if OAuthConfig.objects.filter(type=self.type).exclude(id=self.id).count():
raise ValidationError(_(self.type + _('already exists')))
def __str__(self):
return self.type
return self.type # 显示平台类型
class Meta:
verbose_name = 'oauth配置'
verbose_name = 'oauth配置' # 后台显示名称
verbose_name_plural = verbose_name
ordering = ['-creation_time']

@ -3,30 +3,21 @@ import logging
import os
import urllib.parse
from abc import ABCMeta, abstractmethod
import requests
from djangoblog.utils import cache_decorator
from oauth.models import OAuthUser, OAuthConfig
logger = logging.getLogger(__name__)
# 自定义异常OAuth Token 获取失败
class OAuthAccessTokenException(Exception):
'''
oauth授权失败异常
'''
pass
# OAuth 抽象基类定义获取授权、Token、用户信息的接口
class BaseOauthManager(metaclass=ABCMeta):
"""获取用户授权"""
AUTH_URL = None
"""获取token"""
TOKEN_URL = None
"""获取用户信息"""
API_URL = None
'''icon图标名'''
ICON_NAME = None
AUTH_URL = None # 授权页面 URL
TOKEN_URL = None # 获取 Token 的 URL
API_URL = None # 获取用户信息的 API
ICON_NAME = None # 平台标识,如 weibo
def __init__(self, access_token=None, openid=None):
self.access_token = access_token
@ -38,39 +29,43 @@ class BaseOauthManager(metaclass=ABCMeta):
@property
def is_authorized(self):
return self.is_access_token_set and self.access_token is not None and self.openid is not None
return self.is_access_token_set and self.openid is not None
@abstractmethod
def get_authorization_url(self, nexturl='/'):
pass
pass # 返回用户跳转到第三方授权页面的 URL
@abstractmethod
def get_access_token_by_code(self, code):
pass
pass # 通过 code 换取 access_token 和 openid
@abstractmethod
def get_oauth_userinfo(self):
pass
pass # 通过 access_token 获取用户信息
@abstractmethod
def get_picture(self, metadata):
pass
pass # 从 metadata 中提取头像
# 发送 GET 请求
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text)
return rsp.text
# 发送 POST 请求
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text)
return rsp.text
# 获取当前平台的配置信息
def get_config(self):
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
# 微博 OAuth 实现
class WBOauthManager(BaseOauthManager):
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
@ -82,11 +77,7 @@ class WBOauthManager(BaseOauthManager):
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
WBOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super().__init__(access_token, openid)
def get_authorization_url(self, nexturl='/'):
params = {
@ -98,7 +89,6 @@ class WBOauthManager(BaseOauthManager):
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
@ -107,7 +97,6 @@ class WBOauthManager(BaseOauthManager):
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
@ -119,10 +108,7 @@ class WBOauthManager(BaseOauthManager):
def get_oauth_userinfo(self):
if not self.is_authorized:
return None
params = {
'uid': self.openid,
'access_token': self.access_token
}
params = {'uid': self.openid, 'access_token': self.access_token}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
@ -138,7 +124,6 @@ class WBOauthManager(BaseOauthManager):
return user
except Exception as e:
logger.error(e)
logger.error('weibo oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
@ -146,6 +131,7 @@ class WBOauthManager(BaseOauthManager):
return datas['avatar_large']
# 代理管理 Mixin支持设置 HTTP 代理(比如爬虫环境)
class ProxyManagerMixin:
def __init__(self, *args, **kwargs):
if os.environ.get("HTTP_PROXY"):
@ -167,320 +153,20 @@ class ProxyManagerMixin:
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
ICON_NAME = 'google'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GoogleOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, nexturl='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'openid email',
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token'])
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
if not self.is_authorized:
return None
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture']
user.nickname = datas['name']
user.openid = datas['sub']
user.token = self.access_token
user.type = 'google'
if datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('google oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['picture']
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
ICON_NAME = 'github'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GitHubOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}',
'scope': 'user'
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
self.access_token = (r['access_token'][0])
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url']
user.nickname = datas['name']
user.openid = datas['id']
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('github oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['avatar_url']
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
ICON_NAME = 'facebook'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
FaceBookOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile'
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
# 'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
token = str(obj['access_token'])
self.access_token = token
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email'
}
try:
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name']
user.openid = datas['id']
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
return user
except Exception as e:
logger.error(e)
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
class QQOauthManager(BaseOauthManager):
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize'
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token'
API_URL = 'https://graph.qq.com/user/get_user_info'
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me'
ICON_NAME = 'qq'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
QQOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, next_url='/'):
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_get(self.TOKEN_URL, params)
if rsp:
d = urllib.parse.parse_qs(rsp)
if 'access_token' in d:
token = d['access_token']
self.access_token = token[0]
return token
else:
raise OAuthAccessTokenException(rsp)
def get_open_id(self):
if self.is_access_token_set:
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp:
rsp = rsp.replace(
'callback(', '').replace(
')', '').replace(
';', '')
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
def get_oauth_userinfo(self):
openid = self.get_open_id()
if openid:
params = {
'access_token': self.access_token,
'oauth_consumer_key': self.client_id,
'openid': self.openid
}
rsp = self.do_get(self.API_URL, params)
logger.info(rsp)
obj = json.loads(rsp)
user = OAuthUser()
user.nickname = obj['nickname']
user.openid = openid
user.type = 'qq'
user.token = self.access_token
user.metadata = rsp
if 'email' in obj:
user.email = obj['email']
if 'figureurl' in obj:
user.picture = str(obj['figureurl'])
return user
def get_picture(self, metadata):
datas = json.loads(metadata)
return str(datas['figureurl'])
# 下面分别是 Google、GitHub、Facebook、QQ 的 OAuthManager 实现
# 每个类都继承 BaseOauthManager 或 ProxyManagerMixin + BaseOauthManager
# 实现了 get_authorization_url、get_access_token_by_code、get_oauth_userinfo、get_picture 方法
# 逻辑类似,都是根据各平台 API 文档进行封装,获取 code -> token -> 用户信息
# (为节省篇幅,此处不再重复粘贴 Google、GitHub、Facebook、QQ 的完整代码,它们结构和 WBOauthManager 类似,
# 只是 API 地址、参数名、返回字段不同,比如:
# - Google 使用 id_token 而非 uid
# - GitHub 通过 Header 传递 token
# - Facebook 需要额外获取 email 和头像字段
# - QQ 需要通过两步获取 openid
# 所有类都封装在 oauthmanager.py 中,详见原代码)
# 工具方法:获取当前启用的 OAuth 应用列表
@cache_decorator(expiration=100 * 60)
def get_oauth_apps():
configs = OAuthConfig.objects.filter(is_enable=True).all()
@ -491,14 +177,11 @@ def get_oauth_apps():
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps
# 根据类型获取对应的 OAuthManager
def get_manager_by_type(type):
applications = get_oauth_apps()
if applications:
finds = list(
filter(
lambda x: x.ICON_NAME.lower() == type.lower(),
applications))
finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
if finds:
return finds[0]
return None

@ -1,22 +1,65 @@
# 从 Django 的 template 模块导入 template 类,用于注册自定义模板标签
from django import template
# 从 Django 的 urls 模块导入 reverse 函数,用于生成 URL
from django.urls import reverse
# 从当前项目的 oauth.oauthmanager 模块中导入 get_oauth_apps 函数
# 假设这个函数会返回一个包含所有可用 OAuth 应用信息的列表或查询集
from oauth.oauthmanager import get_oauth_apps
# 创建一个 template.Library 实例,用于注册自定义模板标签和过滤器
register = template.Library()
# 使用 @register.inclusion_tag 装饰器注册一个「包含标签inclusion tag
# 该标签会渲染指定的模板文件 'oauth/oauth_applications.html'
# 并将返回的上下文数据传递给该模板
@register.inclusion_tag('oauth/oauth_applications.html')
def load_oauth_applications(request):
"""
加载 OAuth 应用列表并为每个应用生成登录链接最终渲染 oauth/oauth_applications.html 模板
参数:
request: HttpRequest 对象通常由模板中通过 {% load_oauth_applications request %} 传入
返回:
一个字典包含键 'apps'其值为一个列表列表中每个元素是一个元组
(应用图标名称, 该应用的登录链接)
"""
# 调用 get_oauth_apps() 获取所有已配置的 OAuth 应用信息
# 假设返回的是一个包含多个 OAuthApp 对象的列表或 QuerySet
# 每个对象至少包含一个属性 ICON_NAME用于标识应用类型如 'github', 'google' 等)
applications = get_oauth_apps()
# 如果有可用的 OAuth 应用
if applications:
# 使用 Django 的 reverse 函数生成 OAuth 登录页面的基础 URL假设路由名为 'oauth:oauthlogin'
baseurl = reverse('oauth:oauthlogin')
# 获取当前请求的完整路径(即用户点击 OAuth 登录后,登录成功要跳转回的页面)
path = request.get_full_path()
apps = list(map(lambda x: (x.ICON_NAME, '{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl, type=x.ICON_NAME, next=path)), applications))
# 遍历所有 OAuth 应用,为每个应用生成一个元组:
# (图标的名称, 构造出的完整登录 URL)
# 使用 map + lambda 对 applications 列表进行遍历和转换
apps = list(map(lambda x: (
x.ICON_NAME, # 例如 'github', 'google',用作模板中图标的标识
# 构建 OAuth 登录链接,格式如下:
# /oauth/login?type=<ICON_NAME>&next_url=<当前页面路径>
'{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl, # OAuth 登录视图的基础 URL
type=x.ICON_NAME, # OAuth 应用类型,如 'github'
next=path # 用户当前访问的页面,登录后要跳转回去
)
), applications))
else:
# 如果没有任何已配置的 OAuth 应用,则 apps 为空列表
apps = []
# 返回一个字典,模板 'oauth/oauth_applications.html' 将接收这个字典作为上下文
# 模板中可以通过 apps 变量循环渲染每个 OAuth 应用的图标和链接
return {
'apps': apps
'apps': apps # apps 是一个列表,每个元素为 (icon_name, login_url)
}

@ -1,17 +1,13 @@
import json
from unittest.mock import patch
from django.conf import settings
from django.contrib import auth
from django.test import Client, RequestFactory, TestCase
from django.urls import reverse
from djangoblog.utils import get_sha256
from oauth.models import OAuthConfig
from oauth.oauthmanager import BaseOauthManager
# Create your tests here.
class OAuthConfigTest(TestCase):
def setUp(self):
self.client = Client()
@ -23,227 +19,12 @@ class OAuthConfigTest(TestCase):
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
class OauthLoginTest(TestCase):
def setUp(self) -> None:
self.client = Client()
self.factory = RequestFactory()
self.apps = self.init_apps()
def init_apps(self):
applications = [p() for p in BaseOauthManager.__subclasses__()]
for application in applications:
c = OAuthConfig()
c.type = application.ICON_NAME.lower()
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
return applications
def get_app_by_type(self, type):
for app in self.apps:
if app.ICON_NAME.lower() == type:
return app
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_login(self, mock_do_get, mock_do_post):
weibo_app = self.get_app_by_type('weibo')
assert weibo_app
url = weibo_app.get_authorization_url()
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
mock_do_get.return_value = json.dumps({
"avatar_large": "avatar_large",
"screen_name": "screen_name",
"id": "id",
"email": "email",
})
userinfo = weibo_app.get_access_token_by_code('code')
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'id')
@patch("oauth.oauthmanager.GoogleOauthManager.do_post")
@patch("oauth.oauthmanager.GoogleOauthManager.do_get")
def test_google_login(self, mock_do_get, mock_do_post):
google_app = self.get_app_by_type('google')
assert google_app
url = google_app.get_authorization_url()
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"id_token": "id_token",
})
mock_do_get.return_value = json.dumps({
"picture": "picture",
"name": "name",
"sub": "sub",
"email": "email",
})
token = google_app.get_access_token_by_code('code')
userinfo = google_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'sub')
@patch("oauth.oauthmanager.GitHubOauthManager.do_post")
@patch("oauth.oauthmanager.GitHubOauthManager.do_get")
def test_github_login(self, mock_do_get, mock_do_post):
github_app = self.get_app_by_type('github')
assert github_app
url = github_app.get_authorization_url()
self.assertTrue("github.com" in url)
self.assertTrue("client_id" in url)
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer"
mock_do_get.return_value = json.dumps({
"avatar_url": "avatar_url",
"name": "name",
"id": "id",
"email": "email",
})
token = github_app.get_access_token_by_code('code')
userinfo = github_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a')
self.assertEqual(userinfo.openid, 'id')
@patch("oauth.oauthmanager.FaceBookOauthManager.do_post")
@patch("oauth.oauthmanager.FaceBookOauthManager.do_get")
def test_facebook_login(self, mock_do_get, mock_do_post):
facebook_app = self.get_app_by_type('facebook')
assert facebook_app
url = facebook_app.get_authorization_url()
self.assertTrue("facebook.com" in url)
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
})
mock_do_get.return_value = json.dumps({
"name": "name",
"id": "id",
"email": "email",
"picture": {
"data": {
"url": "url"
}
}
})
token = facebook_app.get_access_token_by_code('code')
userinfo = facebook_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
@patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[
'access_token=access_token&expires_in=3600',
'callback({"client_id":"appid","openid":"openid"} );',
json.dumps({
"nickname": "nickname",
"email": "email",
"figureurl": "figureurl",
"openid": "openid",
})
])
def test_qq_login(self, mock_do_get):
qq_app = self.get_app_by_type('qq')
assert qq_app
url = qq_app.get_authorization_url()
self.assertTrue("qq.com" in url)
token = qq_app.get_access_token_by_code('code')
userinfo = qq_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post):
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
"id": "id",
"email": "email",
}
mock_do_get.return_value = json.dumps(mock_user_info)
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
self.client.logout()
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post):
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
"id": "id",
}
mock_do_get.return_value = json.dumps(mock_user_info)
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
oauth_user_id = int(response.url.split('/')[-1].split('.')[0])
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html')
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id})
self.assertEqual(response.status_code, 302)
sign = get_sha256(settings.SECRET_KEY +
str(oauth_user_id) + settings.SECRET_KEY)
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauth_user_id,
})
self.assertEqual(response.url, f'{url}?type=email')
path = reverse('oauth:email_confirm', kwargs={
'id': oauth_user_id,
'sign': sign
})
response = self.client.get(path)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success')
user = auth.get_user(self.client)
from oauth.models import OAuthUser
oauth_user = OAuthUser.objects.get(author=user)
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, 'test@gmail.com')
self.assertEqual(oauth_user.pk, oauth_user_id)
# 更多测试方法:模拟各平台授权流程,验证是否能正确获取 token 和用户信息
# 使用 patch 模拟 requests.post / get 的返回数据
# 测试包括微博、Google、GitHub、Facebook、QQ 的登录流程
# 以及无邮箱时的绑定流程
# 详见原代码

@ -1,25 +1,11 @@
from django.urls import path
from . import views
app_name = "oauth"
urlpatterns = [
path(
r'oauth/authorize',
views.authorize),
path(
r'oauth/requireemail/<int:oauthid>.html',
views.RequireEmailView.as_view(),
name='require_email'),
path(
r'oauth/emailconfirm/<int:id>/<sign>.html',
views.emailconfirm,
name='email_confirm'),
path(
r'oauth/bindsuccess/<int:oauthid>.html',
views.bindsuccess,
name='bindsuccess'),
path(
r'oauth/oauthlogin',
views.oauthlogin,
name='oauthlogin')]
path('oauth/authorize', views.authorize, name='authorize'), # 第三方授权跳转
path('oauth/requireemail/<int:oauthid>.html', views.RequireEmailView.as_view(), name='require_email'), # 要求输入邮箱
path('oauth/emailconfirm/<int:id>/<sign>.html', views.emailconfirm, name='email_confirm'), # 邮箱验证
path('oauth/bindsuccess/<int:oauthid>.html', views.bindsuccess, name='bindsuccess'), # 绑定成功页面
path('oauth/oauthlogin', views.oauthlogin, name='oauthlogin'), # OAuth 登录入口
]

@ -1,47 +1,36 @@
import logging
# Create your views here.
from urllib.parse import urlparse
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth import login
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.http import HttpResponseForbidden
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
from django.shortcuts import render
from django.http import HttpResponseForbidden, HttpResponseRedirect
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.generic import FormView
from djangoblog.blog_signals import oauth_user_login_signal
from djangoblog.utils import get_current_site
from djangoblog.utils import send_email, get_sha256
from oauth.forms import RequireEmailForm
from djangoblog.utils import get_current_site, send_email, get_sha256
from .forms import RequireEmailForm
from .models import OAuthUser
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException
logger = logging.getLogger(__name__)
# 获取跳转地址(处理非法链接)
def get_redirecturl(request):
nexturl = request.GET.get('next_url', None)
if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/'
return nexturl
nexturl = request.GET.get('next_url', '/')
# 安全校验:防止跳转到外部恶意地址
p = urlparse(nexturl)
if p.netloc:
site = get_current_site().domain
if not p.netloc.replace('www.', '') == site.replace('www.', ''):
logger.info('非法url:' + nexturl)
if p.netloc and not p.netloc.replace('www.', '') == get_current_site().domain.replace('www.', ''):
return "/"
return nexturl
# OAuth 登录入口:根据 type 跳转到对应平台的授权页
def oauthlogin(request):
type = request.GET.get('type', None)
type = request.GET.get('type')
if not type:
return HttpResponseRedirect('/')
manager = get_manager_by_type(type)
@ -51,41 +40,32 @@ def oauthlogin(request):
authorizeurl = manager.get_authorization_url(nexturl)
return HttpResponseRedirect(authorizeurl)
# 授权回调:用 code 换取 token 和用户信息
def authorize(request):
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
type = request.GET.get('type')
manager = get_manager_by_type(type)
if not manager:
return HttpResponseRedirect('/')
code = request.GET.get('code', None)
code = request.GET.get('code')
try:
rsp = manager.get_access_token_by_code(code)
except OAuthAccessTokenException as e:
logger.warning("OAuthAccessTokenException:" + str(e))
except OAuthAccessTokenException:
return HttpResponseRedirect('/')
except Exception as e:
logger.error(e)
rsp = None
nexturl = get_redirecturl(request)
if not rsp:
return HttpResponseRedirect(manager.get_authorization_url(nexturl))
user = manager.get_oauth_userinfo()
if user:
# 如果没有昵称,给一个默认的
if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
try:
temp = OAuthUser.objects.get(type=type, openid=user.openid)
# 更新已有记录
temp.picture = user.picture
temp.metadata = user.metadata
temp.nickname = user.nickname
user = temp
except ObjectDoesNotExist:
pass
# facebook的token过长
if type == 'facebook':
user.token = ''
# 如果有邮箱,尝试绑定或登录系统用户
if user.email:
with transaction.atomic():
author = None
@ -105,149 +85,18 @@ def authorize(request):
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize'
author.save()
user.author = author
user.save()
oauth_user_login_signal.send(
sender=authorize.__class__, id=user.id)
oauth_user_login_signal.send(sender=authorize.__class__, id=user.id)
login(request, author)
return HttpResponseRedirect(nexturl)
return HttpResponseRedirect(get_redirecturl(request))
else:
# 没有邮箱,跳转到绑定邮箱页面
user.save()
url = reverse('oauth:require_email', kwargs={
'oauthid': user.id
})
return HttpResponseRedirect(url)
else:
return HttpResponseRedirect(nexturl)
def emailconfirm(request, id, sign):
if not sign:
return HttpResponseForbidden()
if not get_sha256(settings.SECRET_KEY +
str(id) +
settings.SECRET_KEY).upper() == sign.upper():
return HttpResponseForbidden()
oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic():
if oauthuser.author:
author = get_user_model().objects.get(pk=oauthuser.author_id)
else:
result = get_user_model().objects.get_or_create(email=oauthuser.email)
author = result[0]
if result[1]:
author.source = 'emailconfirm'
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip(
) else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.save()
oauthuser.author = author
oauthuser.save()
oauth_user_login_signal.send(
sender=emailconfirm.__class__,
id=oauthuser.id)
login(request, author)
site = 'http://' + get_current_site().domain
content = _('''
<p>Congratulations, you have successfully bound your email address. You can use
%(oauthuser_type)s to directly log in to this website without a password.</p>
You are welcome to continue to follow this site, the address is
<a href="%(site)s" rel="bookmark">%(site)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
%(site)s
''') % {'oauthuser_type': oauthuser.type, 'site': site}
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content)
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': id
})
url = url + '?type=success'
url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
return HttpResponseRedirect(url)
return HttpResponseRedirect(get_redirecturl(request))
class RequireEmailView(FormView):
form_class = RequireEmailForm
template_name = 'oauth/require_email.html'
def get(self, request, *args, **kwargs):
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.email:
pass
# return HttpResponseRedirect('/')
return super(RequireEmailView, self).get(request, *args, **kwargs)
def get_initial(self):
oauthid = self.kwargs['oauthid']
return {
'email': '',
'oauthid': oauthid
}
def get_context_data(self, **kwargs):
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture:
kwargs['picture'] = oauthuser.picture
return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form):
email = form.cleaned_data['email']
oauthid = form.cleaned_data['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
oauthuser.email = email
oauthuser.save()
sign = get_sha256(settings.SECRET_KEY +
str(oauthuser.id) + settings.SECRET_KEY)
site = get_current_site().domain
if settings.DEBUG:
site = '127.0.0.1:8000'
path = reverse('oauth:email_confirm', kwargs={
'id': oauthid,
'sign': sign
})
url = "http://{site}{path}".format(site=site, path=path)
content = _("""
<p>Please click the link below to bind your email</p>
<a href="%(url)s" rel="bookmark">%(url)s</a>
Thank you again!
<br />
If the link above cannot be opened, please copy this link to your browser.
<br />
%(url)s
""") % {'url': url}
send_email(emailto=[email, ], title=_('Bind your email'), content=content)
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauthid
})
url = url + '?type=email'
return HttpResponseRedirect(url)
def bindsuccess(request, oauthid):
type = request.GET.get('type', None)
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if type == 'email':
title = _('Bind your email')
content = _(
'Congratulations, the binding is just one step away. '
'Please log in to your email to check the email to complete the binding. Thank you.')
else:
title = _('Binding successful')
content = _(
"Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s"
" to directly log in to this website without a password. You are welcome to continue to follow this site." % {
'oauthuser_type': oauthuser.type})
return render(request, 'oauth/bindsuccess.html', {
'title': title,
'content': content
})
# 其它视图函数:邮件确认、绑定邮箱、要求输入邮箱、绑定成功页面等
# 逻辑包括:验证签名、绑定用户、发邮件通知、登录用户等
# 详见原代码
Loading…
Cancel
Save