Merge branch 'ZY_branch' of https://bdgit.educoder.net/pn39ysei2/project into ZY_branch

develop
ymq 3 months ago
commit e2d9e169d6

@ -1,7 +1,6 @@
import logging
from django.contrib import admin
# Register your models here.
from django.urls import reverse
from django.utils.html import format_html
@ -9,29 +8,30 @@ logger = logging.getLogger(__name__)
class OAuthUserAdmin(admin.ModelAdmin):
search_fields = ('nickname', 'email')
list_per_page = 20
search_fields = ('nickname', 'email') # zy: 管理员搜索字段配置
list_per_page = 20 # zy: 分页设置每页20条记录
list_display = (
'id',
'nickname',
'link_to_usermodel',
'show_user_image',
'link_to_usermodel', # zy: 自定义字段-关联用户链接
'show_user_image', # zy: 自定义字段-显示用户头像
'type',
'email',
)
list_display_links = ('id', 'nickname')
list_filter = ('author', 'type',)
readonly_fields = []
list_display_links = ('id', 'nickname') # zy: 可点击进入编辑页的字段
list_filter = ('author', 'type',) # zy: 右侧筛选器字段
def get_readonly_fields(self, request, obj=None):
# zy: 重要将所有字段设为只读防止管理员修改OAuth用户数据
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
def has_add_permission(self, request):
return False
return False # zy: 关键禁止在admin中添加OAuth用户只能通过OAuth流程创建
def link_to_usermodel(self, obj):
# zy: 核心功能:生成关联用户的可点击链接
if obj.author:
info = (obj.author._meta.app_label, obj.author._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
@ -40,15 +40,16 @@ class OAuthUserAdmin(admin.ModelAdmin):
(link, obj.author.nickname if obj.author.nickname else obj.author.email))
def show_user_image(self, obj):
# zy: 核心功能在admin列表中显示用户头像图片
img = obj.picture
return format_html(
u'<img src="%s" style="width:50px;height:50px"></img>' %
(img))
link_to_usermodel.short_description = '用户'
link_to_usermodel.short_description = '用户' # zy: 自定义列显示名称
show_user_image.short_description = '用户头像'
class OAuthConfigAdmin(admin.ModelAdmin):
list_display = ('type', 'appkey', 'appsecret', 'is_enable')
list_filter = ('type',)
list_display = ('type', 'appkey', 'appsecret', 'is_enable') # zy: OAuth配置列表显示字段
list_filter = ('type',) # zy: 按OAuth类型筛选配置

@ -2,4 +2,4 @@ from django.apps import AppConfig
class OauthConfig(AppConfig):
name = 'oauth'
name = 'oauth' # zy定义Django应用的名称必须与文件夹名称一致

@ -3,10 +3,10 @@ from django.forms import widgets
class RequireEmailForm(forms.Form):
email = forms.EmailField(label='电子邮箱', required=True)
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False)
email = forms.EmailField(label='电子邮箱', required=True) # zy必填邮箱字段用于OAuth登录时补充邮箱信息
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False) # zy隐藏字段用于传递OAuth用户ID
def __init__(self, *args, **kwargs):
super(RequireEmailForm, self).__init__(*args, **kwargs)
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})
attrs={'placeholder': "email", "class": "form-control"}) # zy自定义邮箱输入框样式添加Bootstrap类

@ -8,50 +8,50 @@ import django.utils.timezone
class Migration(migrations.Migration):
initial = True
initial = True # zy: 重要 - 标记为初始迁移文件
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
migrations.swappable_dependency(settings.AUTH_USER_MODEL), # zy: 关键依赖 - 依赖用户模型
]
operations = [
migrations.CreateModel(
name='OAuthConfig',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')),
('appkey', models.CharField(max_length=200, verbose_name='AppKey')),
('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')),
('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')),
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), # zy: 主键字段 - 自增BigAutoField
('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')), # zy: 关键字段 - OAuth类型选择包含五种服务商
('appkey', models.CharField(max_length=200, verbose_name='AppKey')), # zy: 重要字段 - 应用密钥ID
('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')), # zy: 重要字段 - 应用密钥,需安全存储
('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')), # zy: 关键字段 - OAuth回调地址默认值为百度
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')), # zy: 控制字段 - 是否启用该OAuth配置
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')), # zy: 时间字段 - 记录创建时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')), # zy: 时间字段 - 记录最后修改时间
],
options={
'verbose_name': 'oauth配置',
'verbose_name_plural': 'oauth配置',
'ordering': ['-created_time'],
'verbose_name': 'oauth配置', # zy: 单数显示名称
'verbose_name_plural': 'oauth配置', # zy: 复数显示名称
'ordering': ['-created_time'], # zy: 默认排序 - 按创建时间倒序
},
),
migrations.CreateModel(
name='OAuthUser',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('openid', models.CharField(max_length=50)),
('nickname', models.CharField(max_length=50, verbose_name='昵称')),
('token', models.CharField(blank=True, max_length=150, null=True)),
('picture', models.CharField(blank=True, max_length=350, null=True)),
('type', models.CharField(max_length=50)),
('email', models.CharField(blank=True, max_length=50, null=True)),
('metadata', models.TextField(blank=True, null=True)),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')),
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), # zy: 主键字段
('openid', models.CharField(max_length=50)), # zy: 关键字段 - OAuth服务商提供的用户唯一标识
('nickname', models.CharField(max_length=50, verbose_name='昵称')), # zy: 用户昵称字段
('token', models.CharField(blank=True, max_length=150, null=True)), # zy: 令牌字段 - 存储访问令牌,可为空
('picture', models.CharField(blank=True, max_length=350, null=True)), # zy: 头像字段 - 存储头像URL地址
('type', models.CharField(max_length=50)), # zy: 关键字段 - OAuth服务商类型
('email', models.CharField(blank=True, max_length=50, null=True)), # zy: 邮箱字段 - 用户邮箱,可为空
('metadata', models.TextField(blank=True, null=True)), # zy: 元数据字段 - 存储完整的OAuth用户信息
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')), # zy: 创建时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')), # zy: 修改时间
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')), # zy: 关键关联 - 关联系统用户,可为空(未绑定状态),级联删除
],
options={
'verbose_name': 'oauth用户',
'verbose_name_plural': 'oauth用户',
'ordering': ['-created_time'],
'verbose_name': 'oauth用户', # zy: 单数显示名称
'verbose_name_plural': 'oauth用户', # zy: 复数显示名称
'ordering': ['-created_time'], # zy: 默认排序 - 按创建时间倒序
},
),
]

@ -9,78 +9,78 @@ import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('oauth', '0001_initial'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL), # zy: 关键依赖 - 依赖用户模型
('oauth', '0001_initial'), # zy: 重要依赖 - 依赖初始迁移文件
]
operations = [
migrations.AlterModelOptions(
name='oauthconfig',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'},
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'}, # zy: 修改排序字段为creation_time保持中文显示名称
),
migrations.AlterModelOptions(
name='oauthuser',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'},
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'}, # zy: 修改排序字段为creation_time改为英文显示名称
),
migrations.RemoveField(
model_name='oauthconfig',
name='created_time',
name='created_time', # zy: 删除旧字段 - 原创建时间字段
),
migrations.RemoveField(
model_name='oauthconfig',
name='last_mod_time',
name='last_mod_time', # zy: 删除旧字段 - 原修改时间字段
),
migrations.RemoveField(
model_name='oauthuser',
name='created_time',
name='created_time', # zy: 删除旧字段 - 原创建时间字段
),
migrations.RemoveField(
model_name='oauthuser',
name='last_mod_time',
name='last_mod_time', # zy: 删除旧字段 - 原修改时间字段
),
migrations.AddField(
model_name='oauthconfig',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
name='creation_time', # zy: 新增字段 - 标准化的创建时间字段
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'), # zy: 使用国际化字段名称
),
migrations.AddField(
model_name='oauthconfig',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
name='last_modify_time', # zy: 新增字段 - 标准化的最后修改时间字段
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'), # zy: 使用国际化字段名称
),
migrations.AddField(
model_name='oauthuser',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
name='creation_time', # zy: 新增字段 - 标准化的创建时间字段
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'), # zy: 使用国际化字段名称
),
migrations.AddField(
model_name='oauthuser',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
name='last_modify_time', # zy: 新增字段 - 标准化的最后修改时间字段
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'), # zy: 使用国际化字段名称
),
migrations.AlterField(
model_name='oauthconfig',
name='callback_url',
field=models.CharField(default='', max_length=200, verbose_name='callback url'),
name='callback_url', # zy: 修改字段 - 回调地址字段
field=models.CharField(default='', max_length=200, verbose_name='callback url'), # zy: 重要变更 - 默认值改为空字符串,使用国际化字段名称
),
migrations.AlterField(
model_name='oauthconfig',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is enable'),
name='is_enable', # zy: 修改字段 - 启用状态字段
field=models.BooleanField(default=True, verbose_name='is enable'), # zy: 使用国际化字段名称
),
migrations.AlterField(
model_name='oauthconfig',
name='type',
field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'),
name='type', # zy: 修改字段 - 类型选择字段
field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'), # zy: 重要变更 - 选项值改为英文,使用国际化字段名称
),
migrations.AlterField(
model_name='oauthuser',
name='author',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
name='author', # zy: 修改字段 - 关联用户字段
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'), # zy: 使用国际化字段名称
),
migrations.AlterField(
model_name='oauthuser',
name='nickname',
field=models.CharField(max_length=50, verbose_name='nickname'),
name='nickname', # zy: 修改字段 - 昵称字段
field=models.CharField(max_length=50, verbose_name='nickname'), # zy: 使用国际化字段名称
),
]
]

@ -6,13 +6,13 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('oauth', '0002_alter_oauthconfig_options_alter_oauthuser_options_and_more'),
('oauth', '0002_alter_oauthconfig_options_alter_oauthuser_options_and_more'), # zy: 依赖 - 依赖之前的第二次迁移文件
]
operations = [
migrations.AlterField(
model_name='oauthuser',
name='nickname',
field=models.CharField(max_length=50, verbose_name='nick name'),
name='nickname', # zy: 修改字段 - 用户昵称字段
field=models.CharField(max_length=50, verbose_name='nick name'), # zy: 细微调整 - 将字段显示名称从'nickname'改为'nick name'(添加空格)
),
]

@ -12,24 +12,24 @@ class OAuthUser(models.Model):
verbose_name=_('author'),
blank=True,
null=True,
on_delete=models.CASCADE)
openid = models.CharField(max_length=50)
nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
token = models.CharField(max_length=150, null=True, blank=True)
picture = models.CharField(max_length=350, blank=True, null=True)
type = models.CharField(blank=False, null=False, max_length=50)
email = models.CharField(max_length=50, null=True, blank=True)
metadata = models.TextField(null=True, blank=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
on_delete=models.CASCADE) # zy: 核心关联 - 关联系统用户,可为空(未绑定状态)
openid = models.CharField(max_length=50) # zy: 关键字段 - OAuth服务商提供的用户唯一标识
nickname = models.CharField(max_length=50, verbose_name=_('nick name')) # zy: 用户昵称
token = models.CharField(max_length=150, null=True, blank=True) # zy: 重要 - 访问令牌用于API调用
picture = models.CharField(max_length=350, blank=True, null=True) # zy: 用户头像URL
type = models.CharField(blank=False, null=False, max_length=50) # zy: 关键 - OAuth类型weibo、github等
email = models.CharField(max_length=50, null=True, blank=True) # zy: 用户邮箱
metadata = models.TextField(null=True, blank=True) # zy: 重要 - 存储完整的OAuth用户信息JSON
creation_time = models.DateTimeField(_('creation time'), default=now) # zy: 记录创建时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now) # zy: 最后修改时间
def __str__(self):
return self.nickname
return self.nickname # zy: 对象显示为昵称
class Meta:
verbose_name = _('oauth user')
verbose_name_plural = verbose_name
ordering = ['-creation_time']
verbose_name = _('oauth user') # zy: 单数显示名称
verbose_name_plural = verbose_name # zy: 复数显示名称
ordering = ['-creation_time'] # zy: 默认按创建时间倒序排列
class OAuthConfig(models.Model):
@ -39,29 +39,30 @@ class OAuthConfig(models.Model):
('github', 'GitHub'),
('facebook', 'FaceBook'),
('qq', 'QQ'),
)
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
appkey = models.CharField(max_length=200, verbose_name='AppKey')
appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
) # zy: 核心配置 - 支持的OAuth类型选项
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a') # zy: 关键 - OAuth服务商类型
appkey = models.CharField(max_length=200, verbose_name='AppKey') # zy: 重要 - 应用密钥ID
appsecret = models.CharField(max_length=200, verbose_name='AppSecret') # zy: 重要 - 应用密钥,需加密存储
callback_url = models.CharField(
max_length=200,
verbose_name=_('callback url'),
blank=False,
default='')
default='') # zy: 关键 - OAuth回调URL
is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
_('is enable'), default=True, blank=False, null=False) # zy: 重要 - 控制该配置是否启用
creation_time = models.DateTimeField(_('creation time'), default=now) # zy: 配置创建时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now) # zy: 配置最后修改时间
def clean(self):
# zy: 核心验证 - 确保同类型OAuth配置唯一防止重复配置
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
raise ValidationError(_(self.type + _('already exists')))
def __str__(self):
return self.type
return self.type # zy: 对象显示为类型名称
class Meta:
verbose_name = 'oauth配置'
verbose_name_plural = verbose_name
ordering = ['-creation_time']
verbose_name = 'oauth配置' # zy: 中文显示名称
verbose_name_plural = verbose_name # zy: 复数显示名称
ordering = ['-creation_time'] # zy: 按创建时间倒序排列

@ -15,73 +15,73 @@ logger = logging.getLogger(__name__)
class OAuthAccessTokenException(Exception):
'''
oauth授权失败异常
'''
''' # zy: 核心异常类 - 专门处理OAuth授权过程中的令牌获取失败
class BaseOauthManager(metaclass=ABCMeta):
"""获取用户授权"""
AUTH_URL = None
AUTH_URL = None # zy: 关键URL - OAuth授权页面地址子类必须实现
"""获取token"""
TOKEN_URL = None
TOKEN_URL = None # zy: 关键URL - 获取访问令牌的API地址子类必须实现
"""获取用户信息"""
API_URL = None
API_URL = None # zy: 关键URL - 获取用户信息的API地址子类必须实现
'''icon图标名'''
ICON_NAME = None
ICON_NAME = None # zy: 标识符 - OAuth服务商类型标识用于匹配配置
def __init__(self, access_token=None, openid=None):
self.access_token = access_token
self.openid = openid
self.access_token = access_token # zy: 核心属性 - OAuth访问令牌
self.openid = openid # zy: 核心属性 - 用户在OAuth服务商的唯一ID
@property
def is_access_token_set(self):
return self.access_token is not None
return self.access_token is not None # zy: 重要检查 - 判断访问令牌是否已设置
@property
def is_authorized(self):
return self.is_access_token_set and self.access_token is not None and self.openid is not None
return self.is_access_token_set and self.access_token is not None and self.openid is not None # zy: 核心检查 - 判断是否已完成授权
@abstractmethod
def get_authorization_url(self, nexturl='/'):
pass
pass # zy: 抽象方法 - 生成授权URL子类必须实现
@abstractmethod
def get_access_token_by_code(self, code):
pass
pass # zy: 抽象方法 - 通过授权码获取访问令牌,子类必须实现
@abstractmethod
def get_oauth_userinfo(self):
pass
pass # zy: 抽象方法 - 获取用户信息,子类必须实现
@abstractmethod
def get_picture(self, metadata):
pass
pass # zy: 抽象方法 - 从元数据中提取头像,子类必须实现
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text)
rsp = requests.get(url=url, params=params, headers=headers) # zy: 核心方法 - 执行GET请求
logger.info(rsp.text) # zy: 重要日志 - 记录API响应
return rsp.text
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text)
rsp = requests.post(url, params, headers=headers) # zy: 核心方法 - 执行POST请求
logger.info(rsp.text) # zy: 重要日志 - 记录API响应
return rsp.text
def get_config(self):
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
value = OAuthConfig.objects.filter(type=self.ICON_NAME) # zy: 关键查询 - 获取对应类型的OAuth配置
return value[0] if value else None # zy: 返回第一个配置或None
class WBOauthManager(BaseOauthManager):
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
API_URL = 'https://api.weibo.com/2/users/show.json'
ICON_NAME = 'weibo'
AUTH_URL = 'https://api.weibo.com/oauth2/authorize' # zy: 微博授权URL
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' # zy: 微博令牌获取URL
API_URL = 'https://api.weibo.com/2/users/show.json' # zy: 微博用户信息API
ICON_NAME = 'weibo' # zy: 微博标识
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
self.client_id = config.appkey if config else '' # zy: 关键配置 - 应用Key
self.client_secret = config.appsecret if config else '' # zy: 关键配置 - 应用Secret
self.callback_url = config.callback_url if config else '' # zy: 关键配置 - 回调地址
super(
WBOauthManager,
self).__init__(
@ -92,13 +92,12 @@ class WBOauthManager(BaseOauthManager):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url + '&next_url=' + nexturl
'redirect_uri': self.callback_url + '&next_url=' + nexturl # zy: 重要参数 - 携带next_url用于跳转
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) # zy: 生成完整的授权URL
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
@ -106,19 +105,19 @@ class WBOauthManager(BaseOauthManager):
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
rsp = self.do_post(self.TOKEN_URL, params) # zy: 关键调用 - 获取访问令牌
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['uid'])
return self.get_oauth_userinfo()
self.access_token = str(obj['access_token']) # zy: 保存访问令牌
self.openid = str(obj['uid']) # zy: 保存用户ID
return self.get_oauth_userinfo() # zy: 重要流程 - 获取令牌后立即获取用户信息
else:
raise OAuthAccessTokenException(rsp)
raise OAuthAccessTokenException(rsp) # zy: 令牌获取失败抛出异常
def get_oauth_userinfo(self):
if not self.is_authorized:
return None
return None # zy: 安全检查 - 确保已授权
params = {
'uid': self.openid,
'access_token': self.access_token
@ -126,24 +125,24 @@ class WBOauthManager(BaseOauthManager):
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['avatar_large']
user.nickname = datas['screen_name']
user.openid = datas['id']
user.type = 'weibo'
user.token = self.access_token
user = OAuthUser() # zy: 创建OAuth用户对象
user.metadata = rsp # zy: 保存原始响应数据
user.picture = datas['avatar_large'] # zy: 设置用户头像
user.nickname = datas['screen_name'] # zy: 设置用户昵称
user.openid = datas['id'] # zy: 设置开放ID
user.type = 'weibo' # zy: 设置类型
user.token = self.access_token # zy: 设置访问令牌
if 'email' in datas and datas['email']:
user.email = datas['email']
user.email = datas['email'] # zy: 设置邮箱(如果有)
return user
except Exception as e:
logger.error(e)
logger.error('weibo oauth error.rsp:' + rsp)
logger.error('weibo oauth error.rsp:' + rsp) # zy: 重要错误日志
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['avatar_large']
return datas['avatar_large'] # zy: 从元数据中提取头像URL
class ProxyManagerMixin:
@ -151,27 +150,27 @@ class ProxyManagerMixin:
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
"https": os.environ.get("HTTP_PROXY")
"https": os.environ.get("HTTP_PROXY") # zy: 重要配置 - 设置HTTP代理
}
else:
self.proxies = None
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies) # zy: 带代理的GET请求
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
rsp = requests.post(url, params, headers=headers, proxies=self.proxies) # zy: 带代理的POST请求
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
ICON_NAME = 'google'
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' # zy: Google授权URL
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' # zy: Google令牌URL
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' # zy: Google用户信息API
ICON_NAME = 'google' # zy: Google标识
def __init__(self, access_token=None, openid=None):
config = self.get_config()
@ -189,7 +188,7 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'openid email',
'scope': 'openid email', # zy: 重要参数 - 请求openid和email权限
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
@ -200,7 +199,6 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
@ -209,9 +207,9 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token'])
self.openid = str(obj['id_token']) # zy: Google使用id_token作为openid
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token
return self.access_token # zy: 返回访问令牌
else:
raise OAuthAccessTokenException(rsp)
@ -223,13 +221,12 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture']
user.picture = datas['picture'] # zy: Google头像字段
user.nickname = datas['name']
user.openid = datas['sub']
user.openid = datas['sub'] # zy: Google用户ID字段
user.token = self.access_token
user.type = 'google'
if datas['email']:
@ -246,10 +243,10 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
ICON_NAME = 'github'
AUTH_URL = 'https://github.com/login/oauth/authorize' # zy: GitHub授权URL
TOKEN_URL = 'https://github.com/login/oauth/access_token' # zy: GitHub令牌URL
API_URL = 'https://api.github.com/user' # zy: GitHub用户信息API
ICON_NAME = 'github' # zy: GitHub标识
def __init__(self, access_token=None, openid=None):
config = self.get_config()
@ -266,8 +263,8 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}',
'scope': 'user'
'redirect_uri': f'{self.callback_url}&next_url={next_url}', # zy: 使用f-string格式化URL
'scope': 'user' # zy: 请求user权限范围
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
@ -278,13 +275,12 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
from urllib import parse
r = parse.parse_qs(rsp)
r = parse.parse_qs(rsp) # zy: 重要 - GitHub返回的是查询字符串格式
if 'access_token' in r:
self.access_token = (r['access_token'][0])
return self.access_token
@ -292,14 +288,13 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
"Authorization": "token " + self.access_token # zy: 关键 - GitHub需要在header中传递token
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url']
user.picture = datas['avatar_url'] # zy: GitHub头像字段
user.nickname = datas['name']
user.openid = datas['id']
user.type = 'github'
@ -319,10 +314,10 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
ICON_NAME = 'facebook'
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' # zy: Facebook授权URL指定API版本
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' # zy: Facebook令牌URL
API_URL = 'https://graph.facebook.com/me' # zy: Facebook用户信息API
ICON_NAME = 'facebook' # zy: Facebook标识
def __init__(self, access_token=None, openid=None):
config = self.get_config()
@ -340,7 +335,7 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile'
'scope': 'email,public_profile' # zy: 请求邮箱和公开资料权限
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
@ -349,9 +344,7 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
# 'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
@ -367,7 +360,7 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
def get_oauth_userinfo(self):
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email'
'fields': 'id,name,picture,email' # zy: 重要 - 指定需要返回的字段
}
try:
rsp = self.do_get(self.API_URL, params)
@ -381,7 +374,7 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
if 'email' in datas and datas['email']:
user.email = datas['email']
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
user.picture = str(datas['picture']['data']['url']) # zy: Facebook头像嵌套在data对象中
return user
except Exception as e:
logger.error(e)
@ -393,11 +386,11 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
class QQOauthManager(BaseOauthManager):
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize'
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token'
API_URL = 'https://graph.qq.com/user/get_user_info'
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me'
ICON_NAME = 'qq'
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' # zy: QQ授权URL
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' # zy: QQ令牌URL
API_URL = 'https://graph.qq.com/user/get_user_info' # zy: QQ用户信息API
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # zy: 关键 - QQ需要单独获取openid
ICON_NAME = 'qq' # zy: QQ标识
def __init__(self, access_token=None, openid=None):
config = self.get_config()
@ -429,7 +422,7 @@ class QQOauthManager(BaseOauthManager):
}
rsp = self.do_get(self.TOKEN_URL, params)
if rsp:
d = urllib.parse.parse_qs(rsp)
d = urllib.parse.parse_qs(rsp) # zy: 重要 - QQ返回查询字符串格式
if 'access_token' in d:
token = d['access_token']
self.access_token = token[0]
@ -447,18 +440,18 @@ class QQOauthManager(BaseOauthManager):
rsp = rsp.replace(
'callback(', '').replace(
')', '').replace(
';', '')
';', '') # zy: 关键处理 - 清理JSONP响应格式
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
def get_oauth_userinfo(self):
openid = self.get_open_id()
openid = self.get_open_id() # zy: 重要 - 先获取openid
if openid:
params = {
'access_token': self.access_token,
'oauth_consumer_key': self.client_id,
'oauth_consumer_key': self.client_id, # zy: QQ需要传递client_id作为oauth_consumer_key
'openid': self.openid
}
rsp = self.do_get(self.API_URL, params)
@ -473,7 +466,7 @@ class QQOauthManager(BaseOauthManager):
if 'email' in obj:
user.email = obj['email']
if 'figureurl' in obj:
user.picture = str(obj['figureurl'])
user.picture = str(obj['figureurl']) # zy: QQ头像字段
return user
def get_picture(self, metadata):
@ -483,22 +476,22 @@ class QQOauthManager(BaseOauthManager):
@cache_decorator(expiration=100 * 60)
def get_oauth_apps():
configs = OAuthConfig.objects.filter(is_enable=True).all()
configs = OAuthConfig.objects.filter(is_enable=True).all() # zy: 关键查询 - 只获取启用的OAuth配置
if not configs:
return []
configtypes = [x.type for x in configs]
applications = BaseOauthManager.__subclasses__()
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
configtypes = [x.type for x in configs] # zy: 提取配置类型列表
applications = BaseOauthManager.__subclasses__() # zy: 重要 - 获取所有子类OAuth管理器
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes] # zy: 创建对应的管理器实例
return apps
def get_manager_by_type(type):
applications = get_oauth_apps()
applications = get_oauth_apps() # zy: 获取所有可用的OAuth应用
if applications:
finds = list(
filter(
lambda x: x.ICON_NAME.lower() == type.lower(),
lambda x: x.ICON_NAME.lower() == type.lower(), # zy: 按类型过滤
applications))
if finds:
return finds[0]
return None
return finds[0] # zy: 返回第一个匹配的管理器
return None # zy: 未找到返回None

@ -3,20 +3,20 @@ from django.urls import reverse
from oauth.oauthmanager import get_oauth_apps
register = template.Library()
register = template.Library() # zy: 重要 - 创建Django模板标签库实例
@register.inclusion_tag('oauth/oauth_applications.html')
def load_oauth_applications(request):
applications = get_oauth_apps()
if applications:
baseurl = reverse('oauth:oauthlogin')
path = request.get_full_path()
@register.inclusion_tag('oauth/oauth_applications.html') # zy: 核心装饰器 - 注册包含标签,指定渲染模板
def load_oauth_applications(request): # zy: 关键函数 - 加载OAuth应用列表的模板标签
applications = get_oauth_apps() # zy: 重要调用 - 获取所有启用的OAuth应用管理器
if applications: # zy: 安全检查 - 确保有可用的OAuth应用
baseurl = reverse('oauth:oauthlogin') # zy: 关键调用 - 生成OAuth登录入口URL
path = request.get_full_path() # zy: 获取当前完整路径,用于登录后跳转
apps = list(map(lambda x: (x.ICON_NAME, '{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl, type=x.ICON_NAME, next=path)), applications))
baseurl=baseurl, type=x.ICON_NAME, next=path)), applications)) # zy: 核心逻辑 - 生成每个OAuth应用的登录URL
else:
apps = []
apps = [] # zy: 默认值 - 无可用应用时返回空列表
return {
'apps': apps
}
'apps': apps # zy: 返回字典 - 包含应用列表供模板使用
}

@ -14,64 +14,64 @@ from oauth.oauthmanager import BaseOauthManager
# Create your tests here.
class OAuthConfigTest(TestCase):
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
self.client = Client() # zy: 测试客户端 - 用于模拟HTTP请求
self.factory = RequestFactory() # zy: 请求工厂 - 用于创建请求对象
def test_oauth_login_test(self):
c = OAuthConfig()
c.type = 'weibo'
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
c = OAuthConfig() # zy: 创建OAuth配置对象
c.type = 'weibo' # zy: 设置OAuth类型为微博
c.appkey = 'appkey' # zy: 设置应用Key
c.appsecret = 'appsecret' # zy: 设置应用Secret
c.save() # zy: 保存配置到数据库
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
response = self.client.get('/oauth/oauthlogin?type=weibo') # zy: 模拟OAuth登录请求
self.assertEqual(response.status_code, 302) # zy: 断言重定向状态码
self.assertTrue("api.weibo.com" in response.url) # zy: 断言跳转到微博授权页面
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
response = self.client.get('/oauth/authorize?type=weibo&code=code') # zy: 模拟授权回调请求
self.assertEqual(response.status_code, 302) # zy: 断言重定向状态码
self.assertEqual(response.url, '/') # zy: 断言跳转到首页
class OauthLoginTest(TestCase):
def setUp(self) -> None:
self.client = Client()
self.factory = RequestFactory()
self.apps = self.init_apps()
self.apps = self.init_apps() # zy: 初始化所有OAuth应用
def init_apps(self):
applications = [p() for p in BaseOauthManager.__subclasses__()]
applications = [p() for p in BaseOauthManager.__subclasses__()] # zy: 重要 - 获取所有OAuth管理器子类的实例
for application in applications:
c = OAuthConfig()
c.type = application.ICON_NAME.lower()
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
c.type = application.ICON_NAME.lower() # zy: 设置配置类型
c.appkey = 'appkey' # zy: 模拟应用Key
c.appsecret = 'appsecret' # zy: 模拟应用Secret
c.save() # zy: 保存每个OAuth配置
return applications
def get_app_by_type(self, type):
for app in self.apps:
if app.ICON_NAME.lower() == type:
if app.ICON_NAME.lower() == type: # zy: 按类型查找对应的OAuth管理器
return app
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
@patch("oauth.oauthmanager.WBOauthManager.do_post") # zy: 关键 - 模拟微博POST请求
@patch("oauth.oauthmanager.WBOauthManager.do_get") # zy: 关键 - 模拟微博GET请求
def test_weibo_login(self, mock_do_get, mock_do_post):
weibo_app = self.get_app_by_type('weibo')
weibo_app = self.get_app_by_type('weibo') # zy: 获取微博OAuth管理器
assert weibo_app
url = weibo_app.get_authorization_url()
url = weibo_app.get_authorization_url() # zy: 获取授权URL
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
}) # zy: 模拟令牌接口返回
mock_do_get.return_value = json.dumps({
"avatar_large": "avatar_large",
"screen_name": "screen_name",
"id": "id",
"email": "email",
})
userinfo = weibo_app.get_access_token_by_code('code')
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'id')
}) # zy: 模拟用户信息接口返回
userinfo = weibo_app.get_access_token_by_code('code') # zy: 关键调用 - 通过授权码获取用户信息
self.assertEqual(userinfo.token, 'access_token') # zy: 断言令牌正确
self.assertEqual(userinfo.openid, 'id') # zy: 断言用户ID正确
@patch("oauth.oauthmanager.GoogleOauthManager.do_post")
@patch("oauth.oauthmanager.GoogleOauthManager.do_get")
@ -81,18 +81,18 @@ class OauthLoginTest(TestCase):
url = google_app.get_authorization_url()
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"id_token": "id_token",
"id_token": "id_token", # zy: Google特有字段 - ID令牌
})
mock_do_get.return_value = json.dumps({
"picture": "picture",
"name": "name",
"sub": "sub",
"sub": "sub", # zy: Google用户ID字段
"email": "email",
})
token = google_app.get_access_token_by_code('code')
userinfo = google_app.get_oauth_userinfo()
userinfo = google_app.get_oauth_userinfo() # zy: 重要 - 分开获取用户信息
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'sub')
self.assertEqual(userinfo.openid, 'sub') # zy: 断言Google用户ID
@patch("oauth.oauthmanager.GitHubOauthManager.do_post")
@patch("oauth.oauthmanager.GitHubOauthManager.do_get")
@ -100,18 +100,18 @@ class OauthLoginTest(TestCase):
github_app = self.get_app_by_type('github')
assert github_app
url = github_app.get_authorization_url()
self.assertTrue("github.com" in url)
self.assertTrue("client_id" in url)
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer"
self.assertTrue("github.com" in url) # zy: 断言GitHub授权URL
self.assertTrue("client_id" in url) # zy: 断言包含client_id参数
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer" # zy: 重要 - GitHub返回查询字符串格式
mock_do_get.return_value = json.dumps({
"avatar_url": "avatar_url",
"avatar_url": "avatar_url", # zy: GitHub头像字段
"name": "name",
"id": "id",
"email": "email",
})
token = github_app.get_access_token_by_code('code')
userinfo = github_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a')
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a') # zy: 断言GitHub令牌格式
self.assertEqual(userinfo.openid, 'id')
@patch("oauth.oauthmanager.FaceBookOauthManager.do_post")
@ -120,7 +120,7 @@ class OauthLoginTest(TestCase):
facebook_app = self.get_app_by_type('facebook')
assert facebook_app
url = facebook_app.get_authorization_url()
self.assertTrue("facebook.com" in url)
self.assertTrue("facebook.com" in url) # zy: 断言Facebook授权URL
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
})
@ -130,7 +130,7 @@ class OauthLoginTest(TestCase):
"email": "email",
"picture": {
"data": {
"url": "url"
"url": "url" # zy: Facebook头像嵌套结构
}
}
})
@ -139,20 +139,20 @@ class OauthLoginTest(TestCase):
self.assertEqual(userinfo.token, 'access_token')
@patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[
'access_token=access_token&expires_in=3600',
'callback({"client_id":"appid","openid":"openid"} );',
'access_token=access_token&expires_in=3600', # zy: 第一次调用 - 获取令牌
'callback({"client_id":"appid","openid":"openid"} );', # zy: 第二次调用 - 获取openidJSONP格式
json.dumps({
"nickname": "nickname",
"email": "email",
"figureurl": "figureurl",
"figureurl": "figureurl", # zy: QQ头像字段
"openid": "openid",
})
}) # zy: 第三次调用 - 获取用户信息
])
def test_qq_login(self, mock_do_get):
qq_app = self.get_app_by_type('qq')
assert qq_app
url = qq_app.get_authorization_url()
self.assertTrue("qq.com" in url)
self.assertTrue("qq.com" in url) # zy: 断言QQ授权URL
token = qq_app.get_access_token_by_code('code')
userinfo = qq_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
@ -160,7 +160,7 @@ class OauthLoginTest(TestCase):
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post):
# zy: 重要测试 - 测试带邮箱的微博登录完整流程
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
@ -168,7 +168,7 @@ class OauthLoginTest(TestCase):
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
"id": "id",
"email": "email",
"email": "email", # zy: 包含邮箱信息
}
mock_do_get.return_value = json.dumps(mock_user_info)
@ -176,17 +176,18 @@ class OauthLoginTest(TestCase):
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
response = self.client.get('/oauth/authorize?type=weibo&code=code')
response = self.client.get('/oauth/authorize?type=weibo&code=code') # zy: 模拟授权回调
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
self.assertEqual(response.url, '/') # zy: 直接跳转首页(有邮箱)
user = auth.get_user(self.client)
assert user.is_authenticated
user = auth.get_user(self.client) # zy: 获取当前登录用户
assert user.is_authenticated # zy: 断言用户已认证
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
self.client.logout()
self.assertEqual(user.username, mock_user_info['screen_name']) # zy: 断言用户名
self.assertEqual(user.email, mock_user_info['email']) # zy: 断言邮箱
self.client.logout() # zy: 注销用户
# zy: 重复登录测试
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
@ -200,7 +201,7 @@ class OauthLoginTest(TestCase):
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post):
# zy: 重要测试 - 测试不带邮箱的微博登录流程(需要补充邮箱)
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
@ -208,6 +209,7 @@ class OauthLoginTest(TestCase):
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
"id": "id",
# zy: 故意不包含邮箱字段
}
mock_do_get.return_value = json.dumps(mock_user_info)
@ -219,31 +221,31 @@ class OauthLoginTest(TestCase):
self.assertEqual(response.status_code, 302)
oauth_user_id = int(response.url.split('/')[-1].split('.')[0])
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html')
oauth_user_id = int(response.url.split('/')[-1].split('.')[0]) # zy: 从URL中提取OAuth用户ID
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html') # zy: 断言跳转到邮箱补充页面
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id})
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id}) # zy: 提交邮箱表单
self.assertEqual(response.status_code, 302)
sign = get_sha256(settings.SECRET_KEY +
str(oauth_user_id) + settings.SECRET_KEY)
str(oauth_user_id) + settings.SECRET_KEY) # zy: 重要 - 生成邮箱确认签名
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauth_user_id,
})
self.assertEqual(response.url, f'{url}?type=email')
self.assertEqual(response.url, f'{url}?type=email') # zy: 断言跳转到绑定成功页面
path = reverse('oauth:email_confirm', kwargs={
'id': oauth_user_id,
'sign': sign
'sign': sign # zy: 使用签名进行邮箱确认
})
response = self.client.get(path)
response = self.client.get(path) # zy: 模拟邮箱确认链接点击
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success')
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success') # zy: 最终成功页面
user = auth.get_user(self.client)
from oauth.models import OAuthUser
oauth_user = OAuthUser.objects.get(author=user)
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, 'test@gmail.com')
self.assertEqual(oauth_user.pk, oauth_user_id)
oauth_user = OAuthUser.objects.get(author=user) # zy: 查询关联的OAuth用户
self.assertTrue(user.is_authenticated) # zy: 断言用户已登录
self.assertEqual(user.username, mock_user_info['screen_name']) # zy: 断言用户名
self.assertEqual(user.email, 'test@gmail.com') # zy: 断言补充的邮箱
self.assertEqual(oauth_user.pk, oauth_user_id) # zy: 断言OAuth用户ID匹配

@ -2,24 +2,24 @@ from django.urls import path
from . import views
app_name = "oauth"
app_name = "oauth" # zy: 重要 - 定义应用命名空间用于URL反向解析
urlpatterns = [
path(
r'oauth/authorize',
views.authorize),
r'oauth/authorize', # zy: 核心路径 - OAuth授权回调处理
views.authorize), # zy: 关键视图 - 处理OAuth服务商回调获取用户信息
path(
r'oauth/requireemail/<int:oauthid>.html',
views.RequireEmailView.as_view(),
name='require_email'),
r'oauth/requireemail/<int:oauthid>.html', # zy: 动态路径 - 需要补充邮箱页面带OAuth用户ID参数
views.RequireEmailView.as_view(), # zy: 类视图 - 处理邮箱补充表单
name='require_email'), # zy: URL名称 - 用于反向解析
path(
r'oauth/emailconfirm/<int:id>/<sign>.html',
views.emailconfirm,
name='email_confirm'),
r'oauth/emailconfirm/<int:id>/<sign>.html', # zy: 关键路径 - 邮箱确认链接带ID和签名参数
views.emailconfirm, # zy: 重要视图 - 验证邮箱确认签名并完成绑定
name='email_confirm'), # zy: URL名称 - 邮箱确认功能
path(
r'oauth/bindsuccess/<int:oauthid>.html',
views.bindsuccess,
name='bindsuccess'),
r'oauth/bindsuccess/<int:oauthid>.html', # zy: 成功页面 - OAuth绑定成功提示
views.bindsuccess, # zy: 视图函数 - 显示绑定成功信息
name='bindsuccess'), # zy: URL名称 - 绑定成功页面
path(
r'oauth/oauthlogin',
views.oauthlogin,
name='oauthlogin')]
r'oauth/oauthlogin', # zy: 入口路径 - OAuth登录入口
views.oauthlogin, # zy: 入口视图 - 跳转到OAuth服务商授权页面
name='oauthlogin')] # zy: URL名称 - OAuth登录功能

@ -27,29 +27,29 @@ logger = logging.getLogger(__name__)
def get_redirecturl(request):
nexturl = request.GET.get('next_url', None)
nexturl = request.GET.get('next_url', None) # zy: 获取重定向URL参数
if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/'
nexturl = '/' # zy: 默认重定向到首页
return nexturl
p = urlparse(nexturl)
p = urlparse(nexturl) # zy: 解析URL防止开放重定向攻击
if p.netloc:
site = get_current_site().domain
if not p.netloc.replace('www.', '') == site.replace('www.', ''):
logger.info('非法url:' + nexturl)
logger.info('非法url:' + nexturl) # zy: 安全记录 - 记录非法URL
return "/"
return nexturl
def oauthlogin(request):
type = request.GET.get('type', None)
type = request.GET.get('type', None) # zy: 获取OAuth类型参数
if not type:
return HttpResponseRedirect('/')
manager = get_manager_by_type(type)
manager = get_manager_by_type(type) # zy: 关键调用 - 获取对应类型的OAuth管理器
if not manager:
return HttpResponseRedirect('/')
nexturl = get_redirecturl(request)
authorizeurl = manager.get_authorization_url(nexturl)
return HttpResponseRedirect(authorizeurl)
authorizeurl = manager.get_authorization_url(nexturl) # zy: 核心功能 - 生成授权URL
return HttpResponseRedirect(authorizeurl) # zy: 重定向到OAuth服务商授权页面
def authorize(request):
@ -59,96 +59,96 @@ def authorize(request):
manager = get_manager_by_type(type)
if not manager:
return HttpResponseRedirect('/')
code = request.GET.get('code', None)
code = request.GET.get('code', None) # zy: 关键参数 - OAuth服务商返回的授权码
try:
rsp = manager.get_access_token_by_code(code)
rsp = manager.get_access_token_by_code(code) # zy: 核心调用 - 使用授权码获取访问令牌
except OAuthAccessTokenException as e:
logger.warning("OAuthAccessTokenException:" + str(e))
logger.warning("OAuthAccessTokenException:" + str(e)) # zy: 重要日志 - 令牌获取异常
return HttpResponseRedirect('/')
except Exception as e:
logger.error(e)
rsp = None
nexturl = get_redirecturl(request)
if not rsp:
return HttpResponseRedirect(manager.get_authorization_url(nexturl))
user = manager.get_oauth_userinfo()
return HttpResponseRedirect(manager.get_authorization_url(nexturl)) # zy: 失败时重新授权
user = manager.get_oauth_userinfo() # zy: 关键调用 - 获取用户信息
if user:
if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') # zy: 生成默认昵称
try:
temp = OAuthUser.objects.get(type=type, openid=user.openid)
temp = OAuthUser.objects.get(type=type, openid=user.openid) # zy: 检查是否已存在该OAuth用户
temp.picture = user.picture
temp.metadata = user.metadata
temp.nickname = user.nickname
user = temp
user = temp # zy: 使用已存在的用户记录
except ObjectDoesNotExist:
pass
# facebook的token过长
if type == 'facebook':
user.token = ''
if user.email:
with transaction.atomic():
user.token = '' # zy: 特殊处理 - Facebook令牌过长清空存储
if user.email: # zy: 关键判断 - 用户有邮箱直接登录
with transaction.atomic(): # zy: 重要 - 数据库事务保证数据一致性
author = None
try:
author = get_user_model().objects.get(id=user.author_id)
author = get_user_model().objects.get(id=user.author_id) # zy: 查找已关联的用户
except ObjectDoesNotExist:
pass
if not author:
result = get_user_model().objects.get_or_create(email=user.email)
result = get_user_model().objects.get_or_create(email=user.email) # zy: 根据邮箱获取或创建用户
author = result[0]
if result[1]:
if result[1]: # zy: 判断是否为新创建的用户
try:
get_user_model().objects.get(username=user.nickname)
except ObjectDoesNotExist:
author.username = user.nickname
author.username = user.nickname # zy: 使用OAuth昵称作为用户名
else:
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize'
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') # zy: 昵称冲突时生成唯一用户名
author.source = 'authorize' # zy: 标记用户来源
author.save()
user.author = author
user.author = author # zy: 关联OAuth用户到系统用户
user.save()
oauth_user_login_signal.send(
sender=authorize.__class__, id=user.id)
login(request, author)
sender=authorize.__class__, id=user.id) # zy: 重要 - 发送登录信号
login(request, author) # zy: 核心功能 - 登录用户
return HttpResponseRedirect(nexturl)
else:
else: # zy: 用户没有邮箱,需要补充
user.save()
url = reverse('oauth:require_email', kwargs={
'oauthid': user.id
})
}) # zy: 生成邮箱补充页面URL
return HttpResponseRedirect(url)
return HttpResponseRedirect(url) # zy: 重定向到邮箱补充页面
else:
return HttpResponseRedirect(nexturl)
def emailconfirm(request, id, sign):
if not sign:
return HttpResponseForbidden()
return HttpResponseForbidden() # zy: 安全拒绝 - 无签名参数
if not get_sha256(settings.SECRET_KEY +
str(id) +
settings.SECRET_KEY).upper() == sign.upper():
settings.SECRET_KEY).upper() == sign.upper(): # zy: 重要 - 验证签名防止篡改
return HttpResponseForbidden()
oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic():
if oauthuser.author:
author = get_user_model().objects.get(pk=oauthuser.author_id)
else:
result = get_user_model().objects.get_or_create(email=oauthuser.email)
result = get_user_model().objects.get_or_create(email=oauthuser.email) # zy: 创建系统用户
author = result[0]
if result[1]:
author.source = 'emailconfirm'
author.source = 'emailconfirm' # zy: 标记来源为邮箱确认
author.username = oauthuser.nickname.strip() if oauthuser.nickname.strip(
) else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
) else "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S') # zy: 设置用户名
author.save()
oauthuser.author = author
oauthuser.author = author # zy: 完成关联
oauthuser.save()
oauth_user_login_signal.send(
sender=emailconfirm.__class__,
id=oauthuser.id)
login(request, author)
id=oauthuser.id) # zy: 发送登录信号
login(request, author) # zy: 登录用户
site = 'http://' + get_current_site().domain
content = _('''
@ -162,22 +162,22 @@ def emailconfirm(request, id, sign):
%(site)s
''') % {'oauthuser_type': oauthuser.type, 'site': site}
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content)
send_email(emailto=[oauthuser.email, ], title=_('Congratulations on your successful binding!'), content=content) # zy: 发送绑定成功邮件
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': id
})
url = url + '?type=success'
return HttpResponseRedirect(url)
return HttpResponseRedirect(url) # zy: 重定向到成功页面
class RequireEmailView(FormView):
form_class = RequireEmailForm
template_name = 'oauth/require_email.html'
form_class = RequireEmailForm # zy: 使用邮箱表单类
template_name = 'oauth/require_email.html' # zy: 模板路径
def get(self, request, *args, **kwargs):
oauthid = self.kwargs['oauthid']
oauthid = self.kwargs['oauthid'] # zy: 获取URL参数中的OAuth用户ID
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.email:
if oauthuser.email: # zy: 安全检查 - 如果已有邮箱直接跳过
pass
# return HttpResponseRedirect('/')
@ -187,32 +187,32 @@ class RequireEmailView(FormView):
oauthid = self.kwargs['oauthid']
return {
'email': '',
'oauthid': oauthid
'oauthid': oauthid # zy: 初始化表单数据
}
def get_context_data(self, **kwargs):
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture:
kwargs['picture'] = oauthuser.picture
kwargs['picture'] = oauthuser.picture # zy: 传递用户头像到模板
return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form):
email = form.cleaned_data['email']
email = form.cleaned_data['email'] # zy: 获取验证后的邮箱
oauthid = form.cleaned_data['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
oauthuser.email = email
oauthuser.email = email # zy: 保存邮箱到OAuth用户
oauthuser.save()
sign = get_sha256(settings.SECRET_KEY +
str(oauthuser.id) + settings.SECRET_KEY)
str(oauthuser.id) + settings.SECRET_KEY) # zy: 生成邮箱确认签名
site = get_current_site().domain
if settings.DEBUG:
site = '127.0.0.1:8000'
site = '127.0.0.1:8000' # zy: 开发环境域名
path = reverse('oauth:email_confirm', kwargs={
'id': oauthid,
'sign': sign
})
url = "http://{site}{path}".format(site=site, path=path)
url = "http://{site}{path}".format(site=site, path=path) # zy: 生成完整的确认链接
content = _("""
<p>Please click the link below to bind your email</p>
@ -225,29 +225,29 @@ class RequireEmailView(FormView):
<br />
%(url)s
""") % {'url': url}
send_email(emailto=[email, ], title=_('Bind your email'), content=content)
send_email(emailto=[email, ], title=_('Bind your email'), content=content) # zy: 发送邮箱确认邮件
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauthid
})
url = url + '?type=email'
return HttpResponseRedirect(url)
url = url + '?type=email' # zy: 添加类型参数
return HttpResponseRedirect(url) # zy: 重定向到绑定成功页面
def bindsuccess(request, oauthid):
type = request.GET.get('type', None)
type = request.GET.get('type', None) # zy: 获取成功类型
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if type == 'email':
title = _('Bind your email')
content = _(
'Congratulations, the binding is just one step away. '
'Please log in to your email to check the email to complete the binding. Thank you.')
'Please log in to your email to check the email to complete the binding. Thank you.') # zy: 等待邮箱确认提示
else:
title = _('Binding successful')
content = _(
"Congratulations, you have successfully bound your email address. You can use %(oauthuser_type)s"
" to directly log in to this website without a password. You are welcome to continue to follow this site." % {
'oauthuser_type': oauthuser.type})
'oauthuser_type': oauthuser.type}) # zy: 绑定成功提示
return render(request, 'oauth/bindsuccess.html', {
'title': title,
'content': content
})
'content': content # zy: 渲染成功页面
})
Loading…
Cancel
Save