From 5336e15893f412bcceb251109af3aee7a00ecff3 Mon Sep 17 00:00:00 2001 From: yx <1592866752@qq.com> Date: Sun, 2 Nov 2025 17:28:39 +0800 Subject: [PATCH] =?UTF-8?q?yx:=20=E5=AF=B9oauth=E6=96=87=E4=BB=B6=E7=9A=84?= =?UTF-8?q?=E7=B2=BE=E8=AF=BB=E5=92=8C=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/DjangoBlog/oauth/admin.py | 116 ++++++- src/DjangoBlog/oauth/forms.py | 36 ++- .../oauth/migrations/0001_initial.py | 64 +++- ...ptions_alter_oauthuser_options_and_more.py | 82 ++++- .../0003_alter_oauthuser_nickname.py | 29 +- src/DjangoBlog/oauth/models.py | 119 ++++++-- src/DjangoBlog/oauth/oauthmanager.py | 284 ++++++++++++++---- .../oauth/templatetags/oauth_tags.py | 50 ++- src/DjangoBlog/oauth/tests.py | 246 ++++++++++++--- src/DjangoBlog/oauth/urls.py | 56 +++- src/DjangoBlog/oauth/views.py | 284 +++++++++++++++--- 11 files changed, 1151 insertions(+), 215 deletions(-) diff --git a/src/DjangoBlog/oauth/admin.py b/src/DjangoBlog/oauth/admin.py index 57eab5f..8136cf9 100644 --- a/src/DjangoBlog/oauth/admin.py +++ b/src/DjangoBlog/oauth/admin.py @@ -1,54 +1,144 @@ +""" +Django Admin 管理站点配置模块 - OAuth 认证 + +该模块用于配置OAuth相关模型在Django Admin管理站点的显示和操作方式。 +包含OAuth用户和OAuth配置两个管理类,用于自定义管理界面。 +""" + import logging +# 导入Django Admin管理模块 from django.contrib import admin -# Register your models here. +# 导入URL反向解析功能 from django.urls import reverse +# 导入HTML格式化工具 from django.utils.html import format_html +# 获取当前模块的日志记录器 logger = logging.getLogger(__name__) class OAuthUserAdmin(admin.ModelAdmin): + """ + OAuth用户模型的管理配置类 + + 自定义OAuthUser模型在Django Admin中的显示和行为, + 包括搜索、列表显示、过滤等功能。 + """ + + # 设置可搜索的字段 search_fields = ('nickname', 'email') + # 设置每页显示的项目数量 list_per_page = 20 + # 设置列表页面显示的字段 list_display = ( - 'id', - 'nickname', - 'link_to_usermodel', - 'show_user_image', - 'type', - 'email', + 'id', # 用户ID + 'nickname', # 昵称 + 'link_to_usermodel', # 自定义方法:关联本地用户链接 + 'show_user_image', # 自定义方法:显示用户头像 + 'type', # OAuth类型 + 'email', # 邮箱 ) + # 设置可作为链接点击的字段(跳转到编辑页面) list_display_links = ('id', 'nickname') + # 设置右侧过滤侧边栏的过滤字段 list_filter = ('author', 'type',) + # 初始化只读字段列表 readonly_fields = [] def get_readonly_fields(self, request, obj=None): + """ + 动态获取只读字段列表 + + 重写方法使所有字段在Admin中均为只读,防止在管理界面修改OAuth用户数据。 + + Args: + request: HttpRequest对象 + obj: 模型实例对象 + + Returns: + list: 包含所有字段名的列表,使所有字段只读 + """ + # 返回所有字段名称的列表,包括普通字段和多对多字段 return list(self.readonly_fields) + \ - [field.name for field in obj._meta.fields] + \ - [field.name for field in obj._meta.many_to_many] + [field.name for field in obj._meta.fields] + \ + [field.name for field in obj._meta.many_to_many] def has_add_permission(self, request): + """ + 禁用添加权限 + + 防止在Admin界面手动添加OAuth用户,OAuth用户只能通过认证流程自动创建。 + + Args: + request: HttpRequest对象 + + Returns: + bool: 始终返回False,禁止添加新记录 + """ return False def link_to_usermodel(self, obj): + """ + 自定义方法:生成关联本地用户的链接 + + 在Admin列表中显示关联的本地用户,并提供跳转到用户编辑页面的链接。 + + Args: + obj: OAuthUser实例对象 + + Returns: + str: 格式化的HTML链接,包含用户昵称或邮箱显示 + """ + # 检查是否存在关联的本地用户 if obj.author: + # 获取关联用户模型的app和model信息 info = (obj.author._meta.app_label, obj.author._meta.model_name) + # 生成编辑页面的URL link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,)) + # 返回格式化的HTML链接,显示用户昵称或邮箱 return format_html( u'%s' % (link, obj.author.nickname if obj.author.nickname else obj.author.email)) def show_user_image(self, obj): + """ + 自定义方法:显示用户头像 + + 在Admin列表中以缩略图形式显示用户的第三方平台头像。 + + Args: + obj: OAuthUser实例对象 + + Returns: + str: 格式化的HTML图片标签 + """ + # 获取用户头像URL img = obj.picture + # 返回格式化的HTML图片标签,设置固定尺寸 return format_html( u'' % (img)) - link_to_usermodel.short_description = '用户' - show_user_image.short_description = '用户头像' + # 设置自定义方法在Admin中的显示名称 + link_to_usermodel.short_description = '用户' # 关联用户列的显示名称 + show_user_image.short_description = '用户头像' # 用户头像列的显示名称 class OAuthConfigAdmin(admin.ModelAdmin): - list_display = ('type', 'appkey', 'appsecret', 'is_enable') - list_filter = ('type',) + """ + OAuth配置模型的管理配置类 + + 自定义OAuthConfig模型在Django Admin中的显示方式, + 用于管理不同第三方平台的OAuth应用配置。 + """ + + # 设置列表页面显示的字段 + list_display = ( + 'type', # OAuth类型 + 'appkey', # 应用Key + 'appsecret', # 应用Secret + 'is_enable' # 是否启用 + ) + # 设置右侧过滤侧边栏的过滤字段 + list_filter = ('type',) # 按OAuth类型过滤 \ No newline at end of file diff --git a/src/DjangoBlog/oauth/forms.py b/src/DjangoBlog/oauth/forms.py index 0e4ede3..b678125 100644 --- a/src/DjangoBlog/oauth/forms.py +++ b/src/DjangoBlog/oauth/forms.py @@ -1,12 +1,46 @@ +""" +OAuth 认证表单模块 - 邮箱补充表单 + +该模块定义了在OAuth认证过程中需要用户补充邮箱信息时使用的表单。 +当第三方登录未返回邮箱地址时,使用此表单让用户手动输入邮箱。 +""" + +# 导入Django表单相关模块 from django.contrib.auth.forms import forms from django.forms import widgets class RequireEmailForm(forms.Form): + """ + 邮箱补充表单类 + + 用于OAuth登录过程中,当第三方平台未提供用户邮箱时, + 要求用户手动输入邮箱地址的表单。 + """ + + # 邮箱字段,必填字段,用于用户输入电子邮箱 email = forms.EmailField(label='电子邮箱', required=True) + + # 隐藏的OAuth用户ID字段,用于关联OAuth用户记录 oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False) def __init__(self, *args, **kwargs): + """ + 初始化表单,自定义字段控件属性 + + 重写初始化方法,为邮箱字段添加HTML属性和样式类, + 改善用户体验和界面美观。 + + Args: + *args: 可变位置参数 + **kwargs: 可变关键字参数 + """ + # 调用父类的初始化方法 super(RequireEmailForm, self).__init__(*args, **kwargs) + + # 自定义邮箱字段的widget,添加placeholder和CSS类 self.fields['email'].widget = widgets.EmailInput( - attrs={'placeholder': "email", "class": "form-control"}) + attrs={ + 'placeholder': "email", # 输入框内的提示文本 + "class": "form-control" # Bootstrap样式类,用于表单控件样式 + }) \ No newline at end of file diff --git a/src/DjangoBlog/oauth/migrations/0001_initial.py b/src/DjangoBlog/oauth/migrations/0001_initial.py index 3aa3e03..e908e0a 100644 --- a/src/DjangoBlog/oauth/migrations/0001_initial.py +++ b/src/DjangoBlog/oauth/migrations/0001_initial.py @@ -1,57 +1,103 @@ -# Generated by Django 4.1.7 on 2023-03-07 09:53 +""" +Django 数据库迁移模块 - OAuth 认证配置 -from django.conf import settings -from django.db import migrations, models -import django.db.models.deletion -import django.utils.timezone +该模块用于创建OAuth认证相关的数据库表结构,包含OAuth服务提供商配置和OAuth用户信息两个主要模型。 +这是Django迁移系统自动生成的迁移文件,在Django 4.1.7版本中创建于2023-03-07。 +""" + +# 导入Django核心模块 +from django.conf import settings # 导入Django设置 +from django.db import migrations, models # 导入数据库迁移和模型相关功能 +import django.db.models.deletion # 导入外键删除操作 +import django.utils.timezone # 导入时区工具 class Migration(migrations.Migration): + """ + OAuth认证系统的数据库迁移类 + + 这个迁移类负责创建OAuth认证功能所需的数据库表结构, + 包括OAuth服务提供商配置和第三方登录用户信息存储。 + """ + # 标记为初始迁移 initial = True + # 定义依赖关系 - 依赖于可切换的用户模型 dependencies = [ migrations.swappable_dependency(settings.AUTH_USER_MODEL), ] + # 定义要执行的数据库操作 operations = [ + # 创建OAuthConfig模型对应的数据库表 migrations.CreateModel( name='OAuthConfig', fields=[ + # 主键ID字段,自增BigAutoField ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')), + # OAuth服务类型选择字段,支持多种第三方登录 + ('type', models.CharField( + choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), + ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')), + # OAuth应用的AppKey字段 ('appkey', models.CharField(max_length=200, verbose_name='AppKey')), + # OAuth应用的AppSecret字段,用于安全认证 ('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')), - ('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')), + # OAuth回调地址字段,用于接收授权码 + ('callback_url', + models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')), + # 是否启用该OAuth配置的布尔字段 ('is_enable', models.BooleanField(default=True, verbose_name='是否显示')), + # 记录创建时间,默认使用当前时间 ('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')), + # 记录最后修改时间,默认使用当前时间 ('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')), ], options={ + # 设置模型在Admin中的单数显示名称 'verbose_name': 'oauth配置', + # 设置模型在Admin中的复数显示名称 'verbose_name_plural': 'oauth配置', + # 设置默认排序字段,按创建时间降序排列 'ordering': ['-created_time'], }, ), + # 创建OAuthUser模型对应的数据库表 migrations.CreateModel( name='OAuthUser', fields=[ + # 主键ID字段,自增BigAutoField ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + # 第三方平台的用户唯一标识 ('openid', models.CharField(max_length=50)), + # 用户在第三方平台的昵称 ('nickname', models.CharField(max_length=50, verbose_name='昵称')), + # OAuth访问令牌,可为空 ('token', models.CharField(blank=True, max_length=150, null=True)), + # 用户头像URL,可为空 ('picture', models.CharField(blank=True, max_length=350, null=True)), + # OAuth服务类型 ('type', models.CharField(max_length=50)), + # 用户邮箱,可为空 ('email', models.CharField(blank=True, max_length=50, null=True)), + # 存储额外的元数据信息,使用Text字段 ('metadata', models.TextField(blank=True, null=True)), + # 记录创建时间,默认使用当前时间 ('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')), + # 记录最后修改时间,默认使用当前时间 ('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')), - ('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')), + # 外键关联到本地用户模型,建立第三方账号与本地用户的关联 + ('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, + to=settings.AUTH_USER_MODEL, verbose_name='用户')), ], options={ + # 设置模型在Admin中的单数显示名称 'verbose_name': 'oauth用户', + # 设置模型在Admin中的复数显示名称 'verbose_name_plural': 'oauth用户', + # 设置默认排序字段,按创建时间降序排列 'ordering': ['-created_time'], }, ), - ] + ] \ No newline at end of file diff --git a/src/DjangoBlog/oauth/migrations/0002_alter_oauthconfig_options_alter_oauthuser_options_and_more.py b/src/DjangoBlog/oauth/migrations/0002_alter_oauthconfig_options_alter_oauthuser_options_and_more.py index d5cc70e..091fd59 100644 --- a/src/DjangoBlog/oauth/migrations/0002_alter_oauthconfig_options_alter_oauthuser_options_and_more.py +++ b/src/DjangoBlog/oauth/migrations/0002_alter_oauthconfig_options_alter_oauthuser_options_and_more.py @@ -1,86 +1,138 @@ -# Generated by Django 4.2.5 on 2023-09-06 13:13 +""" +Django 数据库迁移模块 - OAuth 认证配置更新 -from django.conf import settings -from django.db import migrations, models -import django.db.models.deletion -import django.utils.timezone +该模块是OAuth认证系统的第二次迁移,主要用于优化字段命名和国际化显示。 +对已有的OAuthConfig和OAuthUser模型进行字段调整和选项更新。 +这是Django迁移系统自动生成的迁移文件,在Django 4.2.5版本中创建于2023-09-06。 +""" + +# 导入Django核心模块 +from django.conf import settings # 导入Django设置 +from django.db import migrations, models # 导入数据库迁移和模型相关功能 +import django.db.models.deletion # 导入外键删除操作 +import django.utils.timezone # 导入时区工具 class Migration(migrations.Migration): + """ + OAuth认证系统的数据库迁移更新类 + + 这个迁移类负责对已有的OAuth相关模型进行字段优化和国际化改进, + 主要涉及时间字段重命名和verbose_name的英文标准化。 + """ + # 定义依赖关系 - 依赖于初始迁移和用户模型 dependencies = [ migrations.swappable_dependency(settings.AUTH_USER_MODEL), - ('oauth', '0001_initial'), + ('oauth', '0001_initial'), # 依赖于oauth应用的初始迁移 ] + # 定义要执行的数据库操作序列 operations = [ + # 修改OAuthConfig模型的选项配置 migrations.AlterModelOptions( name='oauthconfig', - options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'}, + options={ + # 更新排序字段为新的creation_time字段 + 'ordering': ['-creation_time'], + # 保持中文显示名称不变 + 'verbose_name': 'oauth配置', + 'verbose_name_plural': 'oauth配置' + }, ), + # 修改OAuthUser模型的选项配置 migrations.AlterModelOptions( name='oauthuser', - options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'}, + options={ + # 更新排序字段为新的creation_time字段 + 'ordering': ['-creation_time'], + # 将显示名称改为英文 + 'verbose_name': 'oauth user', + 'verbose_name_plural': 'oauth user' + }, ), + # 移除OAuthConfig模型的旧时间字段 migrations.RemoveField( model_name='oauthconfig', - name='created_time', + name='created_time', # 删除旧的创建时间字段 ), migrations.RemoveField( model_name='oauthconfig', - name='last_mod_time', + name='last_mod_time', # 删除旧的修改时间字段 ), + # 移除OAuthUser模型的旧时间字段 migrations.RemoveField( model_name='oauthuser', - name='created_time', + name='created_time', # 删除旧的创建时间字段 ), migrations.RemoveField( model_name='oauthuser', - name='last_mod_time', + name='last_mod_time', # 删除旧的修改时间字段 ), + # 为OAuthConfig模型添加新的创建时间字段 migrations.AddField( model_name='oauthconfig', name='creation_time', + # 使用当前时间作为默认值,字段标签改为英文 field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'), ), + # 为OAuthConfig模型添加新的修改时间字段 migrations.AddField( model_name='oauthconfig', name='last_modify_time', + # 使用当前时间作为默认值,字段标签改为英文 field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'), ), + # 为OAuthUser模型添加新的创建时间字段 migrations.AddField( model_name='oauthuser', name='creation_time', + # 使用当前时间作为默认值,字段标签改为英文 field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'), ), + # 为OAuthUser模型添加新的修改时间字段 migrations.AddField( model_name='oauthuser', name='last_modify_time', + # 使用当前时间作为默认值,字段标签改为英文 field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'), ), + # 修改OAuthConfig回调地址字段的配置 migrations.AlterField( model_name='oauthconfig', name='callback_url', + # 将默认回调地址改为空字符串,字段标签改为英文 field=models.CharField(default='', max_length=200, verbose_name='callback url'), ), + # 修改OAuthConfig启用状态字段的标签 migrations.AlterField( model_name='oauthconfig', name='is_enable', + # 保持字段定义不变,只修改verbose_name为英文 field=models.BooleanField(default=True, verbose_name='is enable'), ), + # 修改OAuthConfig类型字段的选项和标签 migrations.AlterField( model_name='oauthconfig', name='type', - field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'), + # 将微博和谷歌的显示名称改为英文,其他保持不变 + field=models.CharField( + choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), + ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'), ), + # 修改OAuthUser作者字段的标签 migrations.AlterField( model_name='oauthuser', name='author', - field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'), + # 保持外键关系不变,修改verbose_name为英文 + field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, + to=settings.AUTH_USER_MODEL, verbose_name='author'), ), + # 修改OAuthUser昵称字段的标签 migrations.AlterField( model_name='oauthuser', name='nickname', + # 保持字段定义不变,只修改verbose_name为英文 field=models.CharField(max_length=50, verbose_name='nickname'), ), - ] + ] \ No newline at end of file diff --git a/src/DjangoBlog/oauth/migrations/0003_alter_oauthuser_nickname.py b/src/DjangoBlog/oauth/migrations/0003_alter_oauthuser_nickname.py index 6af08eb..3586462 100644 --- a/src/DjangoBlog/oauth/migrations/0003_alter_oauthuser_nickname.py +++ b/src/DjangoBlog/oauth/migrations/0003_alter_oauthuser_nickname.py @@ -1,18 +1,37 @@ -# Generated by Django 4.2.7 on 2024-01-26 02:41 +""" +Django 数据库迁移模块 - OAuth 用户昵称字段优化 -from django.db import migrations, models +该模块是OAuth认证系统的第三次迁移,主要用于微调OAuthUser模型中昵称字段的显示标签。 +这是一个小的优化迁移,仅修改字段的verbose_name以改善可读性。 +这是Django迁移系统自动生成的迁移文件,在Django 4.2.7版本中创建于2024-01-26。 +""" + +# 导入Django核心模块 +from django.db import migrations, models # 导入数据库迁移和模型相关功能 class Migration(migrations.Migration): + """ + OAuth认证系统的数据库微调迁移类 + + 这个迁移类负责对OAuthUser模型的昵称字段进行显示标签优化, + 将'nickname'改为'nick name'以改善管理界面的可读性。 + """ + # 定义依赖关系 - 依赖于前一次迁移 dependencies = [ + # 依赖于oauth应用的第二次迁移(字段重命名和国际化迁移) ('oauth', '0002_alter_oauthconfig_options_alter_oauthuser_options_and_more'), ] + # 定义要执行的数据库操作序列 operations = [ + # 修改OAuthUser模型昵称字段的显示标签 migrations.AlterField( - model_name='oauthuser', - name='nickname', + model_name='oauthuser', # 指定要修改的模型名称 + name='nickname', # 指定要修改的字段名称 + # 保持字段类型和约束不变,仅优化verbose_name显示 + # 将'nickname'改为'nick name',增加空格提高可读性 field=models.CharField(max_length=50, verbose_name='nick name'), ), - ] + ] \ No newline at end of file diff --git a/src/DjangoBlog/oauth/models.py b/src/DjangoBlog/oauth/models.py index be838ed..e530107 100644 --- a/src/DjangoBlog/oauth/models.py +++ b/src/DjangoBlog/oauth/models.py @@ -1,4 +1,11 @@ -# Create your models here. +""" +OAuth 认证数据模型模块 + +该模块定义了OAuth认证系统所需的数据模型,包括OAuth用户信息和OAuth服务商配置。 +用于存储第三方登录的用户数据和OAuth应用配置信息。 +""" + +# 导入Django核心模块 from django.conf import settings from django.core.exceptions import ValidationError from django.db import models @@ -7,61 +14,135 @@ from django.utils.translation import gettext_lazy as _ class OAuthUser(models.Model): + """ + OAuth用户模型 + + 存储通过第三方OAuth服务登录的用户信息,包括用户基本信息、 + 认证令牌以及与本地用户的关联关系。 + """ + + # 关联本地用户模型的外键,可为空(未绑定本地用户时) author = models.ForeignKey( - settings.AUTH_USER_MODEL, - verbose_name=_('author'), - blank=True, - null=True, - on_delete=models.CASCADE) + settings.AUTH_USER_MODEL, # 使用Django的可切换用户模型 + verbose_name=_('author'), # 字段显示名称(支持国际化) + blank=True, # 允许表单中为空 + null=True, # 允许数据库中为NULL + on_delete=models.CASCADE # 关联用户删除时级联删除OAuth用户 + ) + + # 第三方平台的用户唯一标识符 openid = models.CharField(max_length=50) + + # 用户在第三方平台的昵称 nickname = models.CharField(max_length=50, verbose_name=_('nick name')) + + # OAuth访问令牌,用于调用第三方API token = models.CharField(max_length=150, null=True, blank=True) + + # 用户头像的URL地址 picture = models.CharField(max_length=350, blank=True, null=True) + + # OAuth服务类型(如:weibo, github等) type = models.CharField(blank=False, null=False, max_length=50) + + # 用户邮箱地址,可能为空(某些平台不提供邮箱) email = models.CharField(max_length=50, null=True, blank=True) + + # 存储额外的元数据信息,使用JSON格式 metadata = models.TextField(null=True, blank=True) + + # 记录创建时间,默认使用当前时间 creation_time = models.DateTimeField(_('creation time'), default=now) + + # 记录最后修改时间,默认使用当前时间 last_modify_time = models.DateTimeField(_('last modify time'), default=now) def __str__(self): + """ + 定义模型的字符串表示形式 + + Returns: + str: 返回用户的昵称,用于Admin和其他显示场景 + """ return self.nickname class Meta: - verbose_name = _('oauth user') - verbose_name_plural = verbose_name - ordering = ['-creation_time'] + """模型元数据配置""" + verbose_name = _('oauth user') # 模型在Admin中的单数显示名称 + verbose_name_plural = verbose_name # 模型在Admin中的复数显示名称 + ordering = ['-creation_time'] # 默认按创建时间降序排列 class OAuthConfig(models.Model): + """ + OAuth服务配置模型 + + 存储不同第三方OAuth服务的应用配置信息,包括AppKey、AppSecret等。 + 用于管理多个OAuth服务的认证参数。 + """ + + # OAuth服务类型选择项 TYPE = ( - ('weibo', _('weibo')), - ('google', _('google')), - ('github', 'GitHub'), - ('facebook', 'FaceBook'), - ('qq', 'QQ'), + ('weibo', _('weibo')), # 微博OAuth + ('google', _('google')), # 谷歌OAuth + ('github', 'GitHub'), # GitHub OAuth + ('facebook', 'FaceBook'), # Facebook OAuth + ('qq', 'QQ'), # QQ OAuth ) + + # OAuth服务类型字段,使用选择项限制输入 type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a') + + # OAuth应用的客户端ID(App Key) appkey = models.CharField(max_length=200, verbose_name='AppKey') + + # OAuth应用的客户端密钥(App Secret) appsecret = models.CharField(max_length=200, verbose_name='AppSecret') + + # OAuth认证成功后的回调地址 callback_url = models.CharField( max_length=200, verbose_name=_('callback url'), - blank=False, - default='') + blank=False, # 不允许为空 + default='' # 默认值为空字符串 + ) + + # 标识该OAuth配置是否启用 is_enable = models.BooleanField( _('is enable'), default=True, blank=False, null=False) + + # 记录配置创建时间 creation_time = models.DateTimeField(_('creation time'), default=now) + + # 记录配置最后修改时间 last_modify_time = models.DateTimeField(_('last modify time'), default=now) def clean(self): + """ + 模型验证方法 + + 确保同一类型的OAuth配置只能存在一个,防止重复配置。 + + Raises: + ValidationError: 当同一类型的配置已存在时抛出异常 + """ + # 检查是否已存在相同类型的配置(排除当前记录) if OAuthConfig.objects.filter( type=self.type).exclude(id=self.id).count(): + # 抛出验证错误,提示该类型配置已存在 raise ValidationError(_(self.type + _('already exists'))) def __str__(self): + """ + 定义模型的字符串表示形式 + + Returns: + str: 返回OAuth服务类型名称 + """ return self.type class Meta: - verbose_name = 'oauth配置' - verbose_name_plural = verbose_name - ordering = ['-creation_time'] + """模型元数据配置""" + verbose_name = 'oauth配置' # 模型在Admin中的中文显示名称 + verbose_name_plural = verbose_name # 复数显示名称 + ordering = ['-creation_time'] # 默认按创建时间降序排列 \ No newline at end of file diff --git a/src/DjangoBlog/oauth/oauthmanager.py b/src/DjangoBlog/oauth/oauthmanager.py index 2e7ceef..48f5afa 100644 --- a/src/DjangoBlog/oauth/oauthmanager.py +++ b/src/DjangoBlog/oauth/oauthmanager.py @@ -1,3 +1,11 @@ +""" +OAuth 认证管理器模块 + +该模块实现了多平台OAuth认证的核心逻辑,包含基类定义和具体平台实现。 +支持微博、谷歌、GitHub、Facebook、QQ等主流第三方登录平台。 +采用抽象基类和混合类设计模式,提供统一的OAuth认证接口。 +""" + import json import logging import os @@ -9,79 +17,139 @@ import requests from djangoblog.utils import cache_decorator from oauth.models import OAuthUser, OAuthConfig +# 获取当前模块的日志记录器 logger = logging.getLogger(__name__) class OAuthAccessTokenException(Exception): ''' - oauth授权失败异常 + OAuth授权令牌获取异常类 + + 当从OAuth服务商获取访问令牌失败时抛出此异常, + 通常由于错误的授权码、应用配置问题或网络问题导致。 ''' class BaseOauthManager(metaclass=ABCMeta): - """获取用户授权""" + """ + OAuth认证管理器抽象基类 + + 定义所有OAuth平台必须实现的接口和方法, + 提供统一的OAuth认证流程模板。 + """ + + # OAuth授权页面URL(需要子类实现) AUTH_URL = None - """获取token""" + # 获取访问令牌的URL(需要子类实现) TOKEN_URL = None - """获取用户信息""" + # 获取用户信息的API URL(需要子类实现) API_URL = None - '''icon图标名''' + # 平台图标名称,用于标识和显示(需要子类实现) ICON_NAME = None def __init__(self, access_token=None, openid=None): + """ + 初始化OAuth管理器 + + Args: + access_token: 已存在的访问令牌(可选) + openid: 已存在的用户OpenID(可选) + """ self.access_token = access_token self.openid = openid @property def is_access_token_set(self): + """检查访问令牌是否已设置""" return self.access_token is not None @property def is_authorized(self): + """检查是否已完成授权(拥有令牌和OpenID)""" return self.is_access_token_set and self.access_token is not None and self.openid is not None @abstractmethod def get_authorization_url(self, nexturl='/'): + """获取授权页面URL(抽象方法,子类必须实现)""" pass @abstractmethod def get_access_token_by_code(self, code): + """通过授权码获取访问令牌(抽象方法,子类必须实现)""" pass @abstractmethod def get_oauth_userinfo(self): + """获取OAuth用户信息(抽象方法,子类必须实现)""" pass @abstractmethod def get_picture(self, metadata): + """从元数据中提取用户头像URL(抽象方法,子类必须实现)""" pass def do_get(self, url, params, headers=None): + """ + 执行GET请求的通用方法 + + Args: + url: 请求URL + params: 请求参数 + headers: 请求头(可选) + + Returns: + str: 响应文本内容 + """ rsp = requests.get(url=url, params=params, headers=headers) - logger.info(rsp.text) + logger.info(rsp.text) # 记录响应日志 return rsp.text def do_post(self, url, params, headers=None): + """ + 执行POST请求的通用方法 + + Args: + url: 请求URL + params: 请求参数 + headers: 请求头(可选) + + Returns: + str: 响应文本内容 + """ rsp = requests.post(url, params, headers=headers) - logger.info(rsp.text) + logger.info(rsp.text) # 记录响应日志 return rsp.text def get_config(self): + """ + 从数据库获取当前平台的OAuth配置 + + Returns: + OAuthConfig: 配置对象,如果不存在则返回None + """ value = OAuthConfig.objects.filter(type=self.ICON_NAME) return value[0] if value else None class WBOauthManager(BaseOauthManager): + """ + 微博OAuth认证管理器 + + 实现微博平台的OAuth2.0认证流程,包括授权、令牌获取和用户信息获取。 + """ + + # 微博OAuth接口地址 AUTH_URL = 'https://api.weibo.com/oauth2/authorize' TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' API_URL = 'https://api.weibo.com/2/users/show.json' ICON_NAME = 'weibo' def __init__(self, access_token=None, openid=None): + """初始化微博OAuth配置""" config = self.get_config() - self.client_id = config.appkey if config else '' - self.client_secret = config.appsecret if config else '' - self.callback_url = config.callback_url if config else '' + self.client_id = config.appkey if config else '' # 应用Key + self.client_secret = config.appsecret if config else '' # 应用Secret + self.callback_url = config.callback_url if config else '' # 回调地址 super( WBOauthManager, self).__init__( @@ -89,6 +157,15 @@ class WBOauthManager(BaseOauthManager): openid=openid) def get_authorization_url(self, nexturl='/'): + """ + 生成微博授权页面URL + + Args: + nexturl: 授权成功后跳转的URL + + Returns: + str: 完整的授权URL + """ params = { 'client_id': self.client_id, 'response_type': 'code', @@ -98,7 +175,18 @@ class WBOauthManager(BaseOauthManager): return url def get_access_token_by_code(self, code): + """ + 使用授权码获取访问令牌 + + Args: + code: OAuth授权码 + Returns: + OAuthUser: 用户信息对象 + + Raises: + OAuthAccessTokenException: 令牌获取失败时抛出 + """ params = { 'client_id': self.client_id, 'client_secret': self.client_secret, @@ -110,13 +198,20 @@ class WBOauthManager(BaseOauthManager): obj = json.loads(rsp) if 'access_token' in obj: + # 设置访问令牌和用户ID self.access_token = str(obj['access_token']) self.openid = str(obj['uid']) - return self.get_oauth_userinfo() + return self.get_oauth_userinfo() # 获取并返回用户信息 else: raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): + """ + 获取微博用户信息 + + Returns: + OAuthUser: 包含用户信息的对象,获取失败返回None + """ if not self.is_authorized: return None params = { @@ -127,14 +222,14 @@ class WBOauthManager(BaseOauthManager): try: datas = json.loads(rsp) user = OAuthUser() - user.metadata = rsp - user.picture = datas['avatar_large'] - user.nickname = datas['screen_name'] - user.openid = datas['id'] - user.type = 'weibo' - user.token = self.access_token + user.metadata = rsp # 存储原始响应数据 + user.picture = datas['avatar_large'] # 用户头像 + user.nickname = datas['screen_name'] # 用户昵称 + user.openid = datas['id'] # 用户OpenID + user.type = 'weibo' # 平台类型 + user.token = self.access_token # 访问令牌 if 'email' in datas and datas['email']: - user.email = datas['email'] + user.email = datas['email'] # 用户邮箱 return user except Exception as e: logger.error(e) @@ -142,13 +237,30 @@ class WBOauthManager(BaseOauthManager): return None def get_picture(self, metadata): + """ + 从元数据中提取微博用户头像 + + Args: + metadata: 用户元数据JSON字符串 + + Returns: + str: 用户头像URL + """ datas = json.loads(metadata) return datas['avatar_large'] class ProxyManagerMixin: + """ + 代理管理器混合类 + + 为OAuth管理器添加HTTP代理支持,用于网络访问受限的环境。 + """ + def __init__(self, *args, **kwargs): + """初始化代理配置""" if os.environ.get("HTTP_PROXY"): + # 设置HTTP和HTTPS代理 self.proxies = { "http": os.environ.get("HTTP_PROXY"), "https": os.environ.get("HTTP_PROXY") @@ -157,23 +269,32 @@ class ProxyManagerMixin: self.proxies = None def do_get(self, url, params, headers=None): + """带代理支持的GET请求""" rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies) logger.info(rsp.text) return rsp.text def do_post(self, url, params, headers=None): + """带代理支持的POST请求""" rsp = requests.post(url, params, headers=headers, proxies=self.proxies) logger.info(rsp.text) return rsp.text class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): + """ + 谷歌OAuth认证管理器 + + 实现谷歌平台的OAuth2.0认证流程,支持代理访问。 + """ + AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' ICON_NAME = 'google' def __init__(self, access_token=None, openid=None): + """初始化谷歌OAuth配置""" config = self.get_config() self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' @@ -185,22 +306,23 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): openid=openid) def get_authorization_url(self, nexturl='/'): + """生成谷歌授权页面URL""" params = { 'client_id': self.client_id, 'response_type': 'code', 'redirect_uri': self.callback_url, - 'scope': 'openid email', + 'scope': 'openid email', # 请求openid和email权限 } url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) return url def get_access_token_by_code(self, code): + """使用授权码获取谷歌访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'grant_type': 'authorization_code', 'code': code, - 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) @@ -216,6 +338,7 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): + """获取谷歌用户信息""" if not self.is_authorized: return None params = { @@ -223,17 +346,16 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): } rsp = self.do_get(self.API_URL, params) try: - datas = json.loads(rsp) user = OAuthUser() user.metadata = rsp - user.picture = datas['picture'] - user.nickname = datas['name'] - user.openid = datas['sub'] + user.picture = datas['picture'] # 谷歌用户头像 + user.nickname = datas['name'] # 谷歌用户姓名 + user.openid = datas['sub'] # 谷歌用户唯一标识 user.token = self.access_token user.type = 'google' if datas['email']: - user.email = datas['email'] + user.email = datas['email'] # 谷歌邮箱 return user except Exception as e: logger.error(e) @@ -241,17 +363,25 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): return None def get_picture(self, metadata): + """从元数据中提取谷歌用户头像""" datas = json.loads(metadata) return datas['picture'] class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): + """ + GitHub OAuth认证管理器 + + 实现GitHub平台的OAuth2.0认证流程,支持代理访问。 + """ + AUTH_URL = 'https://github.com/login/oauth/authorize' TOKEN_URL = 'https://github.com/login/oauth/access_token' API_URL = 'https://api.github.com/user' ICON_NAME = 'github' def __init__(self, access_token=None, openid=None): + """初始化GitHub OAuth配置""" config = self.get_config() self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' @@ -263,28 +393,29 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): openid=openid) def get_authorization_url(self, next_url='/'): + """生成GitHub授权页面URL""" params = { 'client_id': self.client_id, 'response_type': 'code', 'redirect_uri': f'{self.callback_url}&next_url={next_url}', - 'scope': 'user' + 'scope': 'user' # 请求用户信息权限 } url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) return url def get_access_token_by_code(self, code): + """使用授权码获取GitHub访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'grant_type': 'authorization_code', 'code': code, - 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) from urllib import parse - r = parse.parse_qs(rsp) + r = parse.parse_qs(rsp) # 解析查询字符串格式的响应 if 'access_token' in r: self.access_token = (r['access_token'][0]) return self.access_token @@ -292,21 +423,22 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): - + """获取GitHub用户信息""" + # 使用Bearer Token认证方式调用GitHub API rsp = self.do_get(self.API_URL, params={}, headers={ "Authorization": "token " + self.access_token }) try: datas = json.loads(rsp) user = OAuthUser() - user.picture = datas['avatar_url'] - user.nickname = datas['name'] - user.openid = datas['id'] + user.picture = datas['avatar_url'] # GitHub头像 + user.nickname = datas['name'] # GitHub姓名 + user.openid = datas['id'] # GitHub用户ID user.type = 'github' user.token = self.access_token user.metadata = rsp if 'email' in datas and datas['email']: - user.email = datas['email'] + user.email = datas['email'] # GitHub邮箱 return user except Exception as e: logger.error(e) @@ -314,17 +446,25 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): return None def get_picture(self, metadata): + """从元数据中提取GitHub用户头像""" datas = json.loads(metadata) return datas['avatar_url'] class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): + """ + Facebook OAuth认证管理器 + + 实现Facebook平台的OAuth2.0认证流程,支持代理访问。 + """ + AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' API_URL = 'https://graph.facebook.com/me' ICON_NAME = 'facebook' def __init__(self, access_token=None, openid=None): + """初始化Facebook OAuth配置""" config = self.get_config() self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' @@ -336,22 +476,22 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): openid=openid) def get_authorization_url(self, next_url='/'): + """生成Facebook授权页面URL""" params = { 'client_id': self.client_id, 'response_type': 'code', 'redirect_uri': self.callback_url, - 'scope': 'email,public_profile' + 'scope': 'email,public_profile' # 请求邮箱和公开资料权限 } url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) return url def get_access_token_by_code(self, code): + """使用授权码获取Facebook访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, - # 'grant_type': 'authorization_code', 'code': code, - 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) @@ -365,21 +505,23 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): + """获取Facebook用户信息""" params = { 'access_token': self.access_token, - 'fields': 'id,name,picture,email' + 'fields': 'id,name,picture,email' # 指定需要返回的字段 } try: rsp = self.do_get(self.API_URL, params) datas = json.loads(rsp) user = OAuthUser() - user.nickname = datas['name'] - user.openid = datas['id'] + user.nickname = datas['name'] # Facebook姓名 + user.openid = datas['id'] # Facebook用户ID user.type = 'facebook' user.token = self.access_token user.metadata = rsp if 'email' in datas and datas['email']: - user.email = datas['email'] + user.email = datas['email'] # Facebook邮箱 + # 处理嵌套的头像数据结构 if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']: user.picture = str(datas['picture']['data']['url']) return user @@ -388,18 +530,26 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): return None def get_picture(self, metadata): + """从元数据中提取Facebook用户头像""" datas = json.loads(metadata) return str(datas['picture']['data']['url']) class QQOauthManager(BaseOauthManager): + """ + QQ OAuth认证管理器 + + 实现QQ平台的OAuth2.0认证流程,包含特殊的OpenID获取步骤。 + """ + AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' API_URL = 'https://graph.qq.com/user/get_user_info' - OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' + OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # QQ特有的OpenID获取接口 ICON_NAME = 'qq' def __init__(self, access_token=None, openid=None): + """初始化QQ OAuth配置""" config = self.get_config() self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' @@ -411,6 +561,7 @@ class QQOauthManager(BaseOauthManager): openid=openid) def get_authorization_url(self, next_url='/'): + """生成QQ授权页面URL""" params = { 'response_type': 'code', 'client_id': self.client_id, @@ -420,6 +571,7 @@ class QQOauthManager(BaseOauthManager): return url def get_access_token_by_code(self, code): + """使用授权码获取QQ访问令牌""" params = { 'grant_type': 'authorization_code', 'client_id': self.client_id, @@ -429,7 +581,7 @@ class QQOauthManager(BaseOauthManager): } rsp = self.do_get(self.TOKEN_URL, params) if rsp: - d = urllib.parse.parse_qs(rsp) + d = urllib.parse.parse_qs(rsp) # 解析查询字符串响应 if 'access_token' in d: token = d['access_token'] self.access_token = token[0] @@ -438,23 +590,27 @@ class QQOauthManager(BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_open_id(self): + """ + 获取QQ用户的OpenID + + QQ平台需要额外调用接口获取用户OpenID + """ if self.is_access_token_set: params = { 'access_token': self.access_token } rsp = self.do_get(self.OPEN_ID_URL, params) if rsp: - rsp = rsp.replace( - 'callback(', '').replace( - ')', '').replace( - ';', '') + # 清理JSONP响应格式 + rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '') obj = json.loads(rsp) openid = str(obj['openid']) self.openid = openid return openid def get_oauth_userinfo(self): - openid = self.get_open_id() + """获取QQ用户信息""" + openid = self.get_open_id() # 先获取OpenID if openid: params = { 'access_token': self.access_token, @@ -465,40 +621,60 @@ class QQOauthManager(BaseOauthManager): logger.info(rsp) obj = json.loads(rsp) user = OAuthUser() - user.nickname = obj['nickname'] - user.openid = openid + user.nickname = obj['nickname'] # QQ昵称 + user.openid = openid # QQ OpenID user.type = 'qq' user.token = self.access_token user.metadata = rsp if 'email' in obj: - user.email = obj['email'] + user.email = obj['email'] # QQ邮箱 if 'figureurl' in obj: - user.picture = str(obj['figureurl']) + user.picture = str(obj['figureurl']) # QQ头像 return user def get_picture(self, metadata): + """从元数据中提取QQ用户头像""" datas = json.loads(metadata) return str(datas['figureurl']) @cache_decorator(expiration=100 * 60) def get_oauth_apps(): + """ + 获取所有启用的OAuth应用配置 + + 使用缓存装饰器,缓存100分钟,减少数据库查询 + + Returns: + list: 启用的OAuth管理器实例列表 + """ configs = OAuthConfig.objects.filter(is_enable=True).all() if not configs: return [] - configtypes = [x.type for x in configs] - applications = BaseOauthManager.__subclasses__() + configtypes = [x.type for x in configs] # 提取启用的平台类型 + applications = BaseOauthManager.__subclasses__() # 获取所有子类 + # 过滤出已启用的平台管理器实例 apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes] return apps def get_manager_by_type(type): + """ + 根据平台类型获取对应的OAuth管理器 + + Args: + type: 平台类型字符串(如:'weibo', 'github') + + Returns: + BaseOauthManager: 对应平台的OAuth管理器实例,未找到返回None + """ applications = get_oauth_apps() if applications: + # 查找匹配平台类型的管理器 finds = list( filter( lambda x: x.ICON_NAME.lower() == type.lower(), applications)) if finds: return finds[0] - return None + return None \ No newline at end of file diff --git a/src/DjangoBlog/oauth/templatetags/oauth_tags.py b/src/DjangoBlog/oauth/templatetags/oauth_tags.py index 7b687d5..eeae7a7 100644 --- a/src/DjangoBlog/oauth/templatetags/oauth_tags.py +++ b/src/DjangoBlog/oauth/templatetags/oauth_tags.py @@ -1,22 +1,64 @@ +""" +OAuth 认证模板标签模块 + +该模块提供Django模板标签,用于在模板中动态加载和显示OAuth第三方登录应用列表。 +主要功能是生成可用的OAuth应用链接并在模板中渲染。 +""" + +# 导入Django模板模块 from django import template +# 导入URL反向解析功能 from django.urls import reverse +# 导入自定义的OAuth管理器,用于获取可用的OAuth应用 from oauth.oauthmanager import get_oauth_apps +# 创建模板库实例 register = template.Library() @register.inclusion_tag('oauth/oauth_applications.html') def load_oauth_applications(request): + """ + 自定义包含标签 - 加载OAuth应用列表 + + 该模板标签用于在页面中渲染OAuth第三方登录的应用图标和链接。 + 它会获取所有可用的OAuth应用,并生成对应的登录URL。 + + Args: + request: HttpRequest对象,用于获取当前请求的完整路径 + + Returns: + dict: 包含应用列表的字典,用于模板渲染 + - 'apps': 包含OAuth应用信息的列表,每个元素为(应用类型, 登录URL)的元组 + """ + # 获取所有可用的OAuth应用配置 applications = get_oauth_apps() + + # 检查是否存在可用的OAuth应用 if applications: + # 生成OAuth登录的基础URL(不包含参数) baseurl = reverse('oauth:oauthlogin') + # 获取当前请求的完整路径,用于登录成功后跳转回原页面 path = request.get_full_path() - apps = list(map(lambda x: (x.ICON_NAME, '{baseurl}?type={type}&next_url={next}'.format( - baseurl=baseurl, type=x.ICON_NAME, next=path)), applications)) + # 使用map和lambda函数处理每个OAuth应用,生成应用信息列表 + # 每个应用信息包含:应用类型图标名称和完整的登录URL + apps = list(map(lambda x: ( + # OAuth应用的类型/图标名称(如:weibo, github等) + x.ICON_NAME, + # 生成完整的登录URL,包含应用类型和回调地址参数 + '{baseurl}?type={type}&next_url={next}'.format( + baseurl=baseurl, # 基础登录URL + type=x.ICON_NAME, # OAuth应用类型 + next=path # 登录成功后的回调地址 + )), + applications)) # 遍历的应用列表 else: + # 如果没有可用的OAuth应用,返回空列表 apps = [] + + # 返回模板渲染所需的上下文数据 return { - 'apps': apps - } + 'apps': apps # OAuth应用列表,传递给模板进行渲染 + } \ No newline at end of file diff --git a/src/DjangoBlog/oauth/tests.py b/src/DjangoBlog/oauth/tests.py index bb23b9b..50973bc 100644 --- a/src/DjangoBlog/oauth/tests.py +++ b/src/DjangoBlog/oauth/tests.py @@ -1,55 +1,112 @@ +""" +OAuth 认证测试模块 + +该模块包含OAuth认证系统的完整测试用例,覆盖所有支持的第三方登录平台。 +测试包括配置验证、授权流程、用户信息获取和异常处理等场景。 +""" + import json from unittest.mock import patch +# 导入Django测试相关模块 from django.conf import settings from django.contrib import auth from django.test import Client, RequestFactory, TestCase from django.urls import reverse +# 导入项目工具函数和模型 from djangoblog.utils import get_sha256 from oauth.models import OAuthConfig from oauth.oauthmanager import BaseOauthManager -# Create your tests here. class OAuthConfigTest(TestCase): + """ + OAuth配置模型测试类 + + 测试OAuth配置的基本功能,包括配置保存和基础授权流程。 + """ + def setUp(self): - self.client = Client() - self.factory = RequestFactory() + """ + 测试初始化方法 + + 在每个测试方法执行前运行,创建测试所需的客户端和工厂对象。 + """ + self.client = Client() # 创建测试客户端 + self.factory = RequestFactory() # 创建请求工厂 def test_oauth_login_test(self): + """ + 测试OAuth登录流程 + + 验证微博OAuth配置的创建和基础授权重定向功能。 + """ + # 创建微博OAuth配置 c = OAuthConfig() c.type = 'weibo' c.appkey = 'appkey' c.appsecret = 'appsecret' c.save() + # 测试OAuth登录请求,应该重定向到微博授权页面 response = self.client.get('/oauth/oauthlogin?type=weibo') - self.assertEqual(response.status_code, 302) - self.assertTrue("api.weibo.com" in response.url) + self.assertEqual(response.status_code, 302) # 验证重定向状态码 + self.assertTrue("api.weibo.com" in response.url) # 验证重定向到微博 + # 测试授权回调请求(模拟授权码流程) response = self.client.get('/oauth/authorize?type=weibo&code=code') - self.assertEqual(response.status_code, 302) - self.assertEqual(response.url, '/') + self.assertEqual(response.status_code, 302) # 验证重定向状态码 + self.assertEqual(response.url, '/') # 验证重定向到首页 class OauthLoginTest(TestCase): + """ + OAuth登录流程测试类 + + 测试所有支持的OAuth平台的完整登录流程,包括模拟API调用和用户认证。 + """ + def setUp(self) -> None: - self.client = Client() - self.factory = RequestFactory() - self.apps = self.init_apps() + """ + 测试初始化方法 + + 创建测试环境,初始化所有OAuth应用配置。 + """ + self.client = Client() # 创建测试客户端 + self.factory = RequestFactory() # 创建请求工厂 + self.apps = self.init_apps() # 初始化所有OAuth应用配置 def init_apps(self): + """ + 初始化所有OAuth应用配置 + + 为每个OAuth平台创建测试配置数据。 + + Returns: + list: 初始化的OAuth管理器实例列表 + """ + # 获取所有OAuth管理器的子类并实例化 applications = [p() for p in BaseOauthManager.__subclasses__()] for application in applications: + # 为每个平台创建测试配置 c = OAuthConfig() - c.type = application.ICON_NAME.lower() - c.appkey = 'appkey' - c.appsecret = 'appsecret' + c.type = application.ICON_NAME.lower() # 设置平台类型 + c.appkey = 'appkey' # 测试应用Key + c.appsecret = 'appsecret' # 测试应用Secret c.save() return applications def get_app_by_type(self, type): + """ + 根据类型获取OAuth应用 + + Args: + type: 平台类型字符串 + + Returns: + BaseOauthManager: 对应平台的OAuth管理器实例 + """ for app in self.apps: if app.ICON_NAME.lower() == type: return app @@ -57,73 +114,129 @@ class OauthLoginTest(TestCase): @patch("oauth.oauthmanager.WBOauthManager.do_post") @patch("oauth.oauthmanager.WBOauthManager.do_get") def test_weibo_login(self, mock_do_get, mock_do_post): + """ + 测试微博OAuth登录流程 + + 使用模拟对象测试微博授权码获取令牌和用户信息的完整流程。 + + Args: + mock_do_get: 模拟GET请求的mock对象 + mock_do_post: 模拟POST请求的mock对象 + """ weibo_app = self.get_app_by_type('weibo') - assert weibo_app + assert weibo_app # 验证微博应用存在 + + # 测试授权URL生成 url = weibo_app.get_authorization_url() + + # 设置模拟返回值 - 令牌获取响应 mock_do_post.return_value = json.dumps({"access_token": "access_token", "uid": "uid" }) + # 设置模拟返回值 - 用户信息获取响应 mock_do_get.return_value = json.dumps({ "avatar_large": "avatar_large", "screen_name": "screen_name", "id": "id", "email": "email", }) + + # 执行授权码换取用户信息流程 userinfo = weibo_app.get_access_token_by_code('code') - self.assertEqual(userinfo.token, 'access_token') - self.assertEqual(userinfo.openid, 'id') + + # 验证返回的用户信息正确性 + self.assertEqual(userinfo.token, 'access_token') # 验证访问令牌 + self.assertEqual(userinfo.openid, 'id') # 验证用户OpenID @patch("oauth.oauthmanager.GoogleOauthManager.do_post") @patch("oauth.oauthmanager.GoogleOauthManager.do_get") def test_google_login(self, mock_do_get, mock_do_post): + """ + 测试谷歌OAuth登录流程 + + 验证谷歌OAuth的令牌获取和用户信息解析。 + """ google_app = self.get_app_by_type('google') - assert google_app + assert google_app # 验证谷歌应用存在 + + # 测试授权URL生成 url = google_app.get_authorization_url() + + # 设置模拟返回值 - 令牌获取响应 mock_do_post.return_value = json.dumps({ "access_token": "access_token", "id_token": "id_token", }) + # 设置模拟返回值 - 用户信息获取响应 mock_do_get.return_value = json.dumps({ "picture": "picture", "name": "name", "sub": "sub", "email": "email", }) + + # 执行授权流程 token = google_app.get_access_token_by_code('code') userinfo = google_app.get_oauth_userinfo() - self.assertEqual(userinfo.token, 'access_token') - self.assertEqual(userinfo.openid, 'sub') + + # 验证用户信息正确性 + self.assertEqual(userinfo.token, 'access_token') # 验证访问令牌 + self.assertEqual(userinfo.openid, 'sub') # 验证用户唯一标识 @patch("oauth.oauthmanager.GitHubOauthManager.do_post") @patch("oauth.oauthmanager.GitHubOauthManager.do_get") def test_github_login(self, mock_do_get, mock_do_post): + """ + 测试GitHub OAuth登录流程 + + 验证GitHub的特殊令牌响应格式和用户信息获取。 + """ github_app = self.get_app_by_type('github') - assert github_app + assert github_app # 验证GitHub应用存在 + + # 测试授权URL生成 url = github_app.get_authorization_url() - self.assertTrue("github.com" in url) - self.assertTrue("client_id" in url) + self.assertTrue("github.com" in url) # 验证URL包含GitHub域名 + self.assertTrue("client_id" in url) # 验证URL包含客户端ID + + # 设置模拟返回值 - GitHub特殊的查询字符串格式响应 mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer" + # 设置模拟返回值 - 用户信息获取响应 mock_do_get.return_value = json.dumps({ "avatar_url": "avatar_url", "name": "name", "id": "id", "email": "email", }) + + # 执行授权流程 token = github_app.get_access_token_by_code('code') userinfo = github_app.get_oauth_userinfo() - self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a') - self.assertEqual(userinfo.openid, 'id') + + # 验证用户信息正确性 + self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a') # 验证GitHub令牌 + self.assertEqual(userinfo.openid, 'id') # 验证用户ID @patch("oauth.oauthmanager.FaceBookOauthManager.do_post") @patch("oauth.oauthmanager.FaceBookOauthManager.do_get") def test_facebook_login(self, mock_do_get, mock_do_post): + """ + 测试Facebook OAuth登录流程 + + 验证Facebook的令牌获取和嵌套头像数据结构处理。 + """ facebook_app = self.get_app_by_type('facebook') - assert facebook_app + assert facebook_app # 验证Facebook应用存在 + + # 测试授权URL生成 url = facebook_app.get_authorization_url() - self.assertTrue("facebook.com" in url) + self.assertTrue("facebook.com" in url) # 验证URL包含Facebook域名 + + # 设置模拟返回值 - 令牌获取响应 mock_do_post.return_value = json.dumps({ "access_token": "access_token", }) + # 设置模拟返回值 - 用户信息获取响应(包含嵌套的头像数据) mock_do_get.return_value = json.dumps({ "name": "name", "id": "id", @@ -134,14 +247,18 @@ class OauthLoginTest(TestCase): } } }) + + # 执行授权流程 token = facebook_app.get_access_token_by_code('code') userinfo = facebook_app.get_oauth_userinfo() + + # 验证访问令牌正确性 self.assertEqual(userinfo.token, 'access_token') @patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[ - 'access_token=access_token&expires_in=3600', - 'callback({"client_id":"appid","openid":"openid"} );', - json.dumps({ + 'access_token=access_token&expires_in=3600', # 第一次调用:令牌获取响应 + 'callback({"client_id":"appid","openid":"openid"} );', # 第二次调用:OpenID获取响应 + json.dumps({ # 第三次调用:用户信息获取响应 "nickname": "nickname", "email": "email", "figureurl": "figureurl", @@ -149,21 +266,41 @@ class OauthLoginTest(TestCase): }) ]) def test_qq_login(self, mock_do_get): + """ + 测试QQ OAuth登录流程 + + 验证QQ的特殊三步骤流程:获取令牌 → 获取OpenID → 获取用户信息。 + + Args: + mock_do_get: 配置了side_effect的模拟GET请求对象 + """ qq_app = self.get_app_by_type('qq') - assert qq_app + assert qq_app # 验证QQ应用存在 + + # 测试授权URL生成 url = qq_app.get_authorization_url() - self.assertTrue("qq.com" in url) + self.assertTrue("qq.com" in url) # 验证URL包含QQ域名 + + # 执行授权流程(mock_do_get会根据side_effect顺序返回不同的响应) token = qq_app.get_access_token_by_code('code') userinfo = qq_app.get_oauth_userinfo() + + # 验证访问令牌正确性 self.assertEqual(userinfo.token, 'access_token') @patch("oauth.oauthmanager.WBOauthManager.do_post") @patch("oauth.oauthmanager.WBOauthManager.do_get") def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post): + """ + 测试带邮箱的微博授权登录完整流程 + 验证用户首次登录和重复登录的场景,确保用户认证状态正确。 + """ + # 设置模拟返回值 - 令牌获取响应 mock_do_post.return_value = json.dumps({"access_token": "access_token", "uid": "uid" }) + # 设置模拟用户信息(包含邮箱) mock_user_info = { "avatar_large": "avatar_large", "screen_name": "screen_name1", @@ -172,25 +309,32 @@ class OauthLoginTest(TestCase): } mock_do_get.return_value = json.dumps(mock_user_info) + # 第一步:发起OAuth登录请求 response = self.client.get('/oauth/oauthlogin?type=weibo') - self.assertEqual(response.status_code, 302) - self.assertTrue("api.weibo.com" in response.url) + self.assertEqual(response.status_code, 302) # 验证重定向 + self.assertTrue("api.weibo.com" in response.url) # 验证重定向到微博 + # 第二步:模拟授权回调(首次登录) response = self.client.get('/oauth/authorize?type=weibo&code=code') - self.assertEqual(response.status_code, 302) - self.assertEqual(response.url, '/') + self.assertEqual(response.status_code, 302) # 验证重定向 + self.assertEqual(response.url, '/') # 验证重定向到首页 + # 验证用户认证状态 user = auth.get_user(self.client) - assert user.is_authenticated + assert user.is_authenticated # 验证用户已认证 self.assertTrue(user.is_authenticated) - self.assertEqual(user.username, mock_user_info['screen_name']) - self.assertEqual(user.email, mock_user_info['email']) + self.assertEqual(user.username, mock_user_info['screen_name']) # 验证用户名 + self.assertEqual(user.email, mock_user_info['email']) # 验证邮箱 + + # 登出用户 self.client.logout() + # 第三步:模拟再次登录(测试重复登录场景) response = self.client.get('/oauth/authorize?type=weibo&code=code') self.assertEqual(response.status_code, 302) self.assertEqual(response.url, '/') + # 验证用户再次认证状态 user = auth.get_user(self.client) assert user.is_authenticated self.assertTrue(user.is_authenticated) @@ -200,10 +344,16 @@ class OauthLoginTest(TestCase): @patch("oauth.oauthmanager.WBOauthManager.do_post") @patch("oauth.oauthmanager.WBOauthManager.do_get") def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post): + """ + 测试不带邮箱的微博授权登录完整流程 + 验证需要补充邮箱的场景,包括邮箱表单提交和邮箱确认流程。 + """ + # 设置模拟返回值 - 令牌获取响应 mock_do_post.return_value = json.dumps({"access_token": "access_token", "uid": "uid" }) + # 设置模拟用户信息(不包含邮箱) mock_user_info = { "avatar_large": "avatar_large", "screen_name": "screen_name1", @@ -211,28 +361,34 @@ class OauthLoginTest(TestCase): } mock_do_get.return_value = json.dumps(mock_user_info) + # 第一步:发起OAuth登录请求 response = self.client.get('/oauth/oauthlogin?type=weibo') self.assertEqual(response.status_code, 302) self.assertTrue("api.weibo.com" in response.url) + # 第二步:模拟授权回调(应该重定向到邮箱补充页面) response = self.client.get('/oauth/authorize?type=weibo&code=code') - self.assertEqual(response.status_code, 302) + # 解析OAuth用户ID从重定向URL中 oauth_user_id = int(response.url.split('/')[-1].split('.')[0]) self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html') + # 第三步:提交邮箱表单 response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id}) - self.assertEqual(response.status_code, 302) + + # 生成邮箱确认签名 sign = get_sha256(settings.SECRET_KEY + str(oauth_user_id) + settings.SECRET_KEY) + # 验证重定向到绑定成功页面 url = reverse('oauth:bindsuccess', kwargs={ 'oauthid': oauth_user_id, }) self.assertEqual(response.url, f'{url}?type=email') + # 第四步:模拟邮箱确认链接点击 path = reverse('oauth:email_confirm', kwargs={ 'id': oauth_user_id, 'sign': sign @@ -240,10 +396,12 @@ class OauthLoginTest(TestCase): response = self.client.get(path) self.assertEqual(response.status_code, 302) self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success') + + # 验证最终用户认证状态和关联信息 user = auth.get_user(self.client) from oauth.models import OAuthUser oauth_user = OAuthUser.objects.get(author=user) - self.assertTrue(user.is_authenticated) - self.assertEqual(user.username, mock_user_info['screen_name']) - self.assertEqual(user.email, 'test@gmail.com') - self.assertEqual(oauth_user.pk, oauth_user_id) + self.assertTrue(user.is_authenticated) # 验证用户已认证 + self.assertEqual(user.username, mock_user_info['screen_name']) # 验证用户名 + self.assertEqual(user.email, 'test@gmail.com') # 验证补充的邮箱 + self.assertEqual(oauth_user.pk, oauth_user_id) # 验证OAuth用户关联正确 \ No newline at end of file diff --git a/src/DjangoBlog/oauth/urls.py b/src/DjangoBlog/oauth/urls.py index c4a12a0..fce2abb 100644 --- a/src/DjangoBlog/oauth/urls.py +++ b/src/DjangoBlog/oauth/urls.py @@ -1,25 +1,53 @@ +""" +OAuth 认证URL路由配置模块 + +该模块定义了OAuth认证系统的所有URL路由,包括授权回调、邮箱验证、绑定成功等端点。 +这些路由处理第三方登录的完整流程,从初始授权到最终的用户绑定。 +""" + +# 导入Django URL路由相关模块 from django.urls import path +# 导入当前应用的视图模块 from . import views +# 定义应用命名空间,用于URL反向解析 app_name = "oauth" + +# 定义URL模式列表,将URL路径映射到对应的视图处理函数 urlpatterns = [ + # OAuth授权回调端点 - 处理第三方平台返回的授权码 path( - r'oauth/authorize', - views.authorize), + r'oauth/authorize', # URL路径:/oauth/authorize + views.authorize, # 对应的视图函数 + # 名称:未指定,使用默认(可通过views.authorize.__name__访问) + ), + + # 邮箱补充页面 - 当第三方登录未提供邮箱时显示 path( - r'oauth/requireemail/.html', - views.RequireEmailView.as_view(), - name='require_email'), + r'oauth/requireemail/.html', # URL路径,包含OAuth用户ID参数 + views.RequireEmailView.as_view(), # 类视图,需要调用as_view()方法 + name='require_email' # URL名称,用于反向解析 + ), + + # 邮箱确认端点 - 验证用户提交的邮箱地址 path( - r'oauth/emailconfirm//.html', - views.emailconfirm, - name='email_confirm'), + r'oauth/emailconfirm//.html', # URL路径,包含用户ID和签名参数 + views.emailconfirm, # 对应的视图函数 + name='email_confirm' # URL名称,用于反向解析 + ), + + # 绑定成功页面 - 显示OAuth账号绑定成功信息 path( - r'oauth/bindsuccess/.html', - views.bindsuccess, - name='bindsuccess'), + r'oauth/bindsuccess/.html', # URL路径,包含OAuth用户ID参数 + views.bindsuccess, # 对应的视图函数 + name='bindsuccess' # URL名称,用于反向解析 + ), + + # OAuth登录入口 - 初始化第三方登录流程 path( - r'oauth/oauthlogin', - views.oauthlogin, - name='oauthlogin')] + r'oauth/oauthlogin', # URL路径:/oauth/oauthlogin + views.oauthlogin, # 对应的视图函数 + name='oauthlogin' # URL名称,用于反向解析 + ) +] \ No newline at end of file diff --git a/src/DjangoBlog/oauth/views.py b/src/DjangoBlog/oauth/views.py index 12e3a6e..f34a9e4 100644 --- a/src/DjangoBlog/oauth/views.py +++ b/src/DjangoBlog/oauth/views.py @@ -1,119 +1,223 @@ +""" +OAuth 认证视图模块 + +该模块实现了OAuth认证系统的核心视图逻辑,处理第三方登录的完整流程。 +包括授权初始化、回调处理、邮箱验证、用户绑定等功能。 +""" + import logging -# Create your views here. -from urllib.parse import urlparse +# 导入日志模块 +from urllib.parse import urlparse # 导入URL解析工具 +# 导入Django核心模块 from django.conf import settings -from django.contrib.auth import get_user_model -from django.contrib.auth import login -from django.core.exceptions import ObjectDoesNotExist -from django.db import transaction -from django.http import HttpResponseForbidden -from django.http import HttpResponseRedirect -from django.shortcuts import get_object_or_404 -from django.shortcuts import render -from django.urls import reverse -from django.utils import timezone -from django.utils.translation import gettext_lazy as _ -from django.views.generic import FormView - -from djangoblog.blog_signals import oauth_user_login_signal -from djangoblog.utils import get_current_site -from djangoblog.utils import send_email, get_sha256 -from oauth.forms import RequireEmailForm -from .models import OAuthUser -from .oauthmanager import get_manager_by_type, OAuthAccessTokenException +from django.contrib.auth import get_user_model # 获取用户模型 +from django.contrib.auth import login # 用户登录功能 +from django.core.exceptions import ObjectDoesNotExist # 对象不存在异常 +from django.db import transaction # 数据库事务 +from django.http import HttpResponseForbidden # 403禁止访问响应 +from django.http import HttpResponseRedirect # 重定向响应 +from django.shortcuts import get_object_or_404 # 获取对象或404 +from django.shortcuts import render # 模板渲染 +from django.urls import reverse # URL反向解析 +from django.utils import timezone # 时区工具 +from django.utils.translation import gettext_lazy as _ # 国际化翻译 +from django.views.generic import FormView # 表单视图基类 + +# 导入项目自定义模块 +from djangoblog.blog_signals import oauth_user_login_signal # 信号量 +from djangoblog.utils import get_current_site # 获取当前站点 +from djangoblog.utils import send_email, get_sha256 # 邮件发送和加密工具 +from oauth.forms import RequireEmailForm # 邮箱表单 +from .models import OAuthUser # OAuth用户模型 +from .oauthmanager import get_manager_by_type, OAuthAccessTokenException # OAuth管理器 +# 获取当前模块的日志记录器 logger = logging.getLogger(__name__) def get_redirecturl(request): + """ + 获取安全的重定向URL + + 验证next_url参数的安全性,防止开放重定向漏洞。 + + Args: + request: HttpRequest对象 + + Returns: + str: 安全的跳转URL,默认为首页 + """ + # 从请求参数获取跳转URL nexturl = request.GET.get('next_url', None) + + # 如果nexturl为空或是登录页面,则重定向到首页 if not nexturl or nexturl == '/login/' or nexturl == '/login': nexturl = '/' return nexturl + + # 解析URL以验证安全性 p = urlparse(nexturl) + + # 检查URL是否指向外部域名(防止开放重定向攻击) if p.netloc: site = get_current_site().domain + # 比较域名(忽略www前缀),如果不匹配则视为非法URL if not p.netloc.replace('www.', '') == site.replace('www.', ''): logger.info('非法url:' + nexturl) - return "/" + return "/" # 重定向到首页 + return nexturl def oauthlogin(request): + """ + OAuth登录入口视图 + + 根据平台类型初始化第三方登录流程,重定向到对应平台的授权页面。 + + Args: + request: HttpRequest对象 + + Returns: + HttpResponseRedirect: 重定向到第三方授权页面或首页 + """ + # 从请求参数获取OAuth平台类型 type = request.GET.get('type', None) if not type: - return HttpResponseRedirect('/') + return HttpResponseRedirect('/') # 类型为空则重定向到首页 + + # 获取对应平台的OAuth管理器 manager = get_manager_by_type(type) if not manager: - return HttpResponseRedirect('/') + return HttpResponseRedirect('/') # 管理器不存在则重定向到首页 + + # 获取安全的跳转URL nexturl = get_redirecturl(request) + + # 生成第三方平台的授权URL authorizeurl = manager.get_authorization_url(nexturl) + + # 重定向到第三方授权页面 return HttpResponseRedirect(authorizeurl) def authorize(request): + """ + OAuth授权回调视图 + + 处理第三方平台返回的授权码,获取访问令牌和用户信息, + 完成用户认证或引导用户补充信息。 + + Args: + request: HttpRequest对象 + + Returns: + HttpResponseRedirect: 重定向到相应页面 + """ + # 从请求参数获取OAuth平台类型 type = request.GET.get('type', None) if not type: return HttpResponseRedirect('/') + + # 获取对应平台的OAuth管理器 manager = get_manager_by_type(type) if not manager: return HttpResponseRedirect('/') + + # 从请求参数获取授权码 code = request.GET.get('code', None) + try: + # 使用授权码获取访问令牌和用户信息 rsp = manager.get_access_token_by_code(code) except OAuthAccessTokenException as e: + # 处理令牌获取异常 logger.warning("OAuthAccessTokenException:" + str(e)) return HttpResponseRedirect('/') except Exception as e: + # 处理其他异常 logger.error(e) rsp = None + + # 获取安全的跳转URL nexturl = get_redirecturl(request) + + # 如果获取用户信息失败,重新跳转到授权页面 if not rsp: return HttpResponseRedirect(manager.get_authorization_url(nexturl)) + + # 获取OAuth用户信息 user = manager.get_oauth_userinfo() + if user: + # 处理昵称为空的情况,生成默认昵称 if not user.nickname or not user.nickname.strip(): user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') + try: + # 检查是否已存在相同平台和OpenID的用户 temp = OAuthUser.objects.get(type=type, openid=user.openid) + # 更新现有用户信息 temp.picture = user.picture temp.metadata = user.metadata temp.nickname = user.nickname user = temp except ObjectDoesNotExist: + # 用户不存在,使用新用户对象 pass - # facebook的token过长 + + # Facebook的token过长,清空存储 if type == 'facebook': user.token = '' + + # 如果用户有邮箱,直接完成登录流程 if user.email: - with transaction.atomic(): + with transaction.atomic(): # 使用事务保证数据一致性 author = None try: + # 尝试获取已关联的本地用户 author = get_user_model().objects.get(id=user.author_id) except ObjectDoesNotExist: pass + + # 如果没有关联的本地用户 if not author: + # 根据邮箱获取或创建本地用户 result = get_user_model().objects.get_or_create(email=user.email) author = result[0] + + # 如果是新创建的用户 if result[1]: try: + # 检查昵称是否已被使用 get_user_model().objects.get(username=user.nickname) except ObjectDoesNotExist: + # 昵称可用,设置为用户名 author.username = user.nickname else: + # 昵称已被使用,生成唯一用户名 author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') + + # 设置用户来源和保存 author.source = 'authorize' author.save() + # 关联OAuth用户和本地用户 user.author = author user.save() + # 发送用户登录信号 oauth_user_login_signal.send( sender=authorize.__class__, id=user.id) + + # 登录用户 login(request, author) + + # 重定向到目标页面 return HttpResponseRedirect(nexturl) else: + # 用户没有邮箱,保存用户信息并跳转到邮箱补充页面 user.save() url = reverse('oauth:require_email', kwargs={ 'oauthid': user.id @@ -121,35 +225,68 @@ def authorize(request): return HttpResponseRedirect(url) else: + # 获取用户信息失败,重定向到目标页面 return HttpResponseRedirect(nexturl) def emailconfirm(request, id, sign): + """ + 邮箱确认视图 + + 验证邮箱确认链接的签名,完成OAuth用户与本地用户的绑定。 + + Args: + request: HttpRequest对象 + id: OAuth用户ID + sign: 安全签名 + + Returns: + HttpResponseRedirect: 重定向到绑定成功页面 + """ + # 验证签名是否存在 if not sign: return HttpResponseForbidden() + + # 验证签名是否正确 if not get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY).upper() == sign.upper(): return HttpResponseForbidden() + + # 获取OAuth用户对象 oauthuser = get_object_or_404(OAuthUser, pk=id) - with transaction.atomic(): + + with transaction.atomic(): # 使用事务保证数据一致性 + # 处理用户关联 if oauthuser.author: + # 已有关联用户,直接获取 author = get_user_model().objects.get(pk=oauthuser.author_id) else: + # 没有关联用户,根据邮箱创建或获取用户 result = get_user_model().objects.get_or_create(email=oauthuser.email) author = result[0] + + # 如果是新创建的用户 if result[1]: - author.source = 'emailconfirm' + author.source = 'emailconfirm' # 设置用户来源 + # 设置用户名(使用昵称或生成唯一用户名) author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip( ) else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') author.save() + + # 保存用户关联关系 oauthuser.author = author oauthuser.save() + + # 发送用户登录信号 oauth_user_login_signal.send( sender=emailconfirm.__class__, id=oauthuser.id) + + # 登录用户 login(request, author) + # 准备邮件内容 site = 'http://' + get_current_site().domain content = _('''

Congratulations, you have successfully bound your email address. You can use @@ -162,7 +299,10 @@ def emailconfirm(request, id, sign): %(site)s ''') % {'oauthuser_type': oauthuser.type, 'site': site} + # 发送绑定成功邮件 send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content) + + # 重定向到绑定成功页面 url = reverse('oauth:bindsuccess', kwargs={ 'oauthid': id }) @@ -171,49 +311,96 @@ def emailconfirm(request, id, sign): class RequireEmailView(FormView): - form_class = RequireEmailForm - template_name = 'oauth/require_email.html' + """ + 邮箱补充表单视图 + + 当第三方登录未提供邮箱时,显示表单让用户输入邮箱地址。 + """ + + form_class = RequireEmailForm # 使用的表单类 + template_name = 'oauth/require_email.html' # 模板名称 def get(self, request, *args, **kwargs): - oauthid = self.kwargs['oauthid'] + """ + GET请求处理 + + 检查OAuth用户是否已有邮箱,如有则跳过此步骤。 + """ + oauthid = self.kwargs['oauthid'] # 获取OAuth用户ID oauthuser = get_object_or_404(OAuthUser, pk=oauthid) + + # 如果用户已有邮箱,理论上应该跳过此步骤 if oauthuser.email: pass - # return HttpResponseRedirect('/') + # 这里可以添加重定向逻辑:return HttpResponseRedirect('/') return super(RequireEmailView, self).get(request, *args, **kwargs) def get_initial(self): + """ + 设置表单初始值 + + Returns: + dict: 包含初始值的字典 + """ oauthid = self.kwargs['oauthid'] return { - 'email': '', - 'oauthid': oauthid + 'email': '', # 邮箱初始值为空 + 'oauthid': oauthid # 隐藏的OAuth用户ID } def get_context_data(self, **kwargs): + """ + 添加上下文数据 + + 将OAuth用户的头像URL添加到模板上下文。 + """ oauthid = self.kwargs['oauthid'] oauthuser = get_object_or_404(OAuthUser, pk=oauthid) + + # 如果用户有头像,添加到上下文 if oauthuser.picture: kwargs['picture'] = oauthuser.picture + return super(RequireEmailView, self).get_context_data(**kwargs) def form_valid(self, form): + """ + 表单验证通过后的处理 + + 保存用户邮箱,发送确认邮件。 + + Args: + form: 验证通过的表单实例 + + Returns: + HttpResponseRedirect: 重定向到邮件发送提示页面 + """ + # 获取表单数据 email = form.cleaned_data['email'] oauthid = form.cleaned_data['oauthid'] + + # 获取OAuth用户并更新邮箱 oauthuser = get_object_or_404(OAuthUser, pk=oauthid) oauthuser.email = email oauthuser.save() + + # 生成安全签名 sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY) + + # 构建确认链接 site = get_current_site().domain if settings.DEBUG: - site = '127.0.0.1:8000' + site = '127.0.0.1:8000' # 调试模式使用本地地址 + path = reverse('oauth:email_confirm', kwargs={ 'id': oauthid, 'sign': sign }) url = "http://{site}{path}".format(site=site, path=path) + # 准备邮件内容 content = _("""

Please click the link below to bind your email

@@ -225,29 +412,52 @@ class RequireEmailView(FormView):
%(url)s """) % {'url': url} + + # 发送确认邮件 send_email(emailto=[email, ], title=_('Bind your email'), content=content) + + # 重定向到提示页面 url = reverse('oauth:bindsuccess', kwargs={ 'oauthid': oauthid }) - url = url + '?type=email' + url = url + '?type=email' # 添加类型参数 return HttpResponseRedirect(url) def bindsuccess(request, oauthid): + """ + 绑定成功页面视图 + + 根据绑定状态显示不同的成功信息。 + + Args: + request: HttpRequest对象 + oauthid: OAuth用户ID + + Returns: + HttpResponse: 渲染的绑定成功页面 + """ + # 获取绑定类型 type = request.GET.get('type', None) oauthuser = get_object_or_404(OAuthUser, pk=oauthid) + + # 根据类型设置不同的显示内容 if type == 'email': + # 邮箱已发送状态 title = _('Bind your email') content = _( 'Congratulations, the binding is just one step away. ' 'Please log in to your email to check the email to complete the binding. Thank you.') else: + # 绑定完成状态 title = _('Binding successful') content = _( "Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s" " to directly log in to this website without a password. You are welcome to continue to follow this site." % { 'oauthuser_type': oauthuser.type}) + + # 渲染绑定成功页面 return render(request, 'oauth/bindsuccess.html', { 'title': title, 'content': content - }) + }) \ No newline at end of file