zwz_branch
Zhao Wangzi 6 months ago
parent be802e64d9
commit 2dac869369

@ -1,54 +0,0 @@
import logging
from django.contrib import admin
# Register your models here.
from django.urls import reverse
from django.utils.html import format_html
logger = logging.getLogger(__name__)
class OAuthUserAdmin(admin.ModelAdmin):
search_fields = ('nickname', 'email')
list_per_page = 20
list_display = (
'id',
'nickname',
'link_to_usermodel',
'show_user_image',
'type',
'email',
)
list_display_links = ('id', 'nickname')
list_filter = ('author', 'type',)
readonly_fields = []
def get_readonly_fields(self, request, obj=None):
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
def has_add_permission(self, request):
return False
def link_to_usermodel(self, obj):
if obj.author:
info = (obj.author._meta.app_label, obj.author._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
return format_html(
u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email))
def show_user_image(self, obj):
img = obj.picture
return format_html(
u'<img src="%s" style="width:50px;height:50px"></img>' %
(img))
link_to_usermodel.short_description = '用户'
show_user_image.short_description = '用户头像'
class OAuthConfigAdmin(admin.ModelAdmin):
list_display = ('type', 'appkey', 'appsecret', 'is_enable')
list_filter = ('type',)

@ -1,5 +0,0 @@
from django.apps import AppConfig
class OauthConfig(AppConfig):
name = 'oauth'

@ -1,12 +0,0 @@
from django.contrib.auth.forms import forms
from django.forms import widgets
class RequireEmailForm(forms.Form):
email = forms.EmailField(label='电子邮箱', required=True)
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False)
def __init__(self, *args, **kwargs):
super(RequireEmailForm, self).__init__(*args, **kwargs)
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})

@ -1,57 +0,0 @@
# Generated by Django 4.1.7 on 2023-03-07 09:53
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
initial = True
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='OAuthConfig',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')),
('appkey', models.CharField(max_length=200, verbose_name='AppKey')),
('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')),
('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')),
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
],
options={
'verbose_name': 'oauth配置',
'verbose_name_plural': 'oauth配置',
'ordering': ['-created_time'],
},
),
migrations.CreateModel(
name='OAuthUser',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('openid', models.CharField(max_length=50)),
('nickname', models.CharField(max_length=50, verbose_name='昵称')),
('token', models.CharField(blank=True, max_length=150, null=True)),
('picture', models.CharField(blank=True, max_length=350, null=True)),
('type', models.CharField(max_length=50)),
('email', models.CharField(blank=True, max_length=50, null=True)),
('metadata', models.TextField(blank=True, null=True)),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')),
],
options={
'verbose_name': 'oauth用户',
'verbose_name_plural': 'oauth用户',
'ordering': ['-created_time'],
},
),
]

@ -1,86 +0,0 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('oauth', '0001_initial'),
]
operations = [
migrations.AlterModelOptions(
name='oauthconfig',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'},
),
migrations.AlterModelOptions(
name='oauthuser',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'},
),
migrations.RemoveField(
model_name='oauthconfig',
name='created_time',
),
migrations.RemoveField(
model_name='oauthconfig',
name='last_mod_time',
),
migrations.RemoveField(
model_name='oauthuser',
name='created_time',
),
migrations.RemoveField(
model_name='oauthuser',
name='last_mod_time',
),
migrations.AddField(
model_name='oauthconfig',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
migrations.AddField(
model_name='oauthconfig',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
migrations.AddField(
model_name='oauthuser',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
migrations.AddField(
model_name='oauthuser',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
migrations.AlterField(
model_name='oauthconfig',
name='callback_url',
field=models.CharField(default='', max_length=200, verbose_name='callback url'),
),
migrations.AlterField(
model_name='oauthconfig',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is enable'),
),
migrations.AlterField(
model_name='oauthconfig',
name='type',
field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'),
),
migrations.AlterField(
model_name='oauthuser',
name='author',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
),
migrations.AlterField(
model_name='oauthuser',
name='nickname',
field=models.CharField(max_length=50, verbose_name='nickname'),
),
]

@ -1,18 +0,0 @@
# Generated by Django 4.2.7 on 2024-01-26 02:41
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('oauth', '0002_alter_oauthconfig_options_alter_oauthuser_options_and_more'),
]
operations = [
migrations.AlterField(
model_name='oauthuser',
name='nickname',
field=models.CharField(max_length=50, verbose_name='nick name'),
),
]

@ -1,94 +0,0 @@
#zwz: Create your models here.
#zwz: 导入Django配置、异常、数据库模型等核心模块
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
#zwz: OAuth用户表存储第三方登录用户的关联信息
class OAuthUser(models.Model):
#zwz: 关联Django系统用户可为空未绑定本地用户时用
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'), # 字段显示名称(多语言支持)
blank=True,
null=True,
on_delete=models.CASCADE) # 关联用户删除时,此记录也删除
#zwz: 第三方平台的用户唯一标识如GitHub的ID
openid = models.CharField(max_length=50)
#zwz: 第三方平台的用户昵称
nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
#zwz: 第三方平台的授权令牌(可选)
token = models.CharField(max_length=150, null=True, blank=True)
#zwz: 第三方平台的用户头像链接
picture = models.CharField(max_length=350, blank=True, null=True)
#zwz: 第三方平台类型如github/weibo
type = models.CharField(blank=False, null=False, max_length=50)
#zwz: 第三方平台的用户邮箱(可选)
email = models.CharField(max_length=50, null=True, blank=True)
#zwz: 第三方平台返回的额外元数据存JSON等
metadata = models.TextField(null=True, blank=True)
#zwz: 记录创建时间(默认当前时间)
creation_time = models.DateTimeField(_('creation time'), default=now)
#zwz: 记录最后修改时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
#zwz: 打印对象时显示昵称
def __str__(self):
return self.nickname
#zwz: 模型元信息配置
class Meta:
verbose_name = _('oauth user') # 模型显示名称
verbose_name_plural = verbose_name # 复数显示名称
ordering = ['-creation_time'] # 默认按创建时间倒序排列
#zwz: OAuth平台配置表存储第三方登录的AppKey、密钥等信息
class OAuthConfig(models.Model):
#zwz: 支持的第三方平台类型(固定选项)
TYPE = (
('weibo', _('weibo')),
('google', _('google')),
('github', 'GitHub'),
('facebook', 'FaceBook'),
('qq', 'QQ'),
)
#zwz: 平台类型关联上面的TYPE选项
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
#zwz: 第三方平台的AppKey
appkey = models.CharField(max_length=200, verbose_name='AppKey')
#zwz: 第三方平台的AppSecret
appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
#zwz: 第三方平台的授权回调地址
callback_url = models.CharField(
max_length=200,
verbose_name=_('callback url'),
blank=False,
default='')
#zwz: 是否启用该平台的登录功能
is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False)
#zwz: 配置创建时间
creation_time = models.DateTimeField(_('creation time'), default=now)
#zwz: 配置最后修改时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
#zwz: 数据验证:同一平台只能有一条配置
def clean(self):
#zwz: 排除当前记录后,检查是否已有同类型配置
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
raise ValidationError(_(self.type + _('already exists')))
#zwz: 打印对象时显示平台类型
def __str__(self):
return self.type
#zwz: 模型元信息配置
class Meta:
verbose_name = 'oauth配置' # 模型显示名称(中文)
verbose_name_plural = verbose_name # 复数显示名称
ordering = ['-creation_time'] # 默认按创建时间倒序排列

@ -1,504 +0,0 @@
import json
import logging
import os
import urllib.parse
from abc import ABCMeta, abstractmethod
import requests
from djangoblog.utils import cache_decorator
from oauth.models import OAuthUser, OAuthConfig
logger = logging.getLogger(__name__)
class OAuthAccessTokenException(Exception):
'''
oauth授权失败异常
'''
class BaseOauthManager(metaclass=ABCMeta):
"""获取用户授权"""
AUTH_URL = None
"""获取token"""
TOKEN_URL = None
"""获取用户信息"""
API_URL = None
'''icon图标名'''
ICON_NAME = None
def __init__(self, access_token=None, openid=None):
self.access_token = access_token
self.openid = openid
@property
def is_access_token_set(self):
return self.access_token is not None
@property
def is_authorized(self):
return self.is_access_token_set and self.access_token is not None and self.openid is not None
@abstractmethod
def get_authorization_url(self, nexturl='/'):
pass
@abstractmethod
def get_access_token_by_code(self, code):
pass
@abstractmethod
def get_oauth_userinfo(self):
pass
@abstractmethod
def get_picture(self, metadata):
pass
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text)
return rsp.text
def get_config(self):
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
class WBOauthManager(BaseOauthManager):
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
API_URL = 'https://api.weibo.com/2/users/show.json'
ICON_NAME = 'weibo'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
WBOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, nexturl='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url + '&next_url=' + nexturl
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['uid'])
return self.get_oauth_userinfo()
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
if not self.is_authorized:
return None
params = {
'uid': self.openid,
'access_token': self.access_token
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['avatar_large']
user.nickname = datas['screen_name']
user.openid = datas['id']
user.type = 'weibo'
user.token = self.access_token
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('weibo oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['avatar_large']
class ProxyManagerMixin:
def __init__(self, *args, **kwargs):
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
"https": os.environ.get("HTTP_PROXY")
}
else:
self.proxies = None
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
ICON_NAME = 'google'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GoogleOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, nexturl='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'openid email',
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token'])
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
if not self.is_authorized:
return None
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture']
user.nickname = datas['name']
user.openid = datas['sub']
user.token = self.access_token
user.type = 'google'
if datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('google oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['picture']
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
ICON_NAME = 'github'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GitHubOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}',
'scope': 'user'
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
self.access_token = (r['access_token'][0])
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url']
user.nickname = datas['name']
user.openid = datas['id']
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('github oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['avatar_url']
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
ICON_NAME = 'facebook'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
FaceBookOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile'
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
# 'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
token = str(obj['access_token'])
self.access_token = token
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email'
}
try:
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name']
user.openid = datas['id']
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
return user
except Exception as e:
logger.error(e)
return None
def get_picture(self, metadata):
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
class QQOauthManager(BaseOauthManager):
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize'
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token'
API_URL = 'https://graph.qq.com/user/get_user_info'
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me'
ICON_NAME = 'qq'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
QQOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
def get_authorization_url(self, next_url='/'):
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_get(self.TOKEN_URL, params)
if rsp:
d = urllib.parse.parse_qs(rsp)
if 'access_token' in d:
token = d['access_token']
self.access_token = token[0]
return token
else:
raise OAuthAccessTokenException(rsp)
def get_open_id(self):
if self.is_access_token_set:
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp:
rsp = rsp.replace(
'callback(', '').replace(
')', '').replace(
';', '')
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
def get_oauth_userinfo(self):
openid = self.get_open_id()
if openid:
params = {
'access_token': self.access_token,
'oauth_consumer_key': self.client_id,
'openid': self.openid
}
rsp = self.do_get(self.API_URL, params)
logger.info(rsp)
obj = json.loads(rsp)
user = OAuthUser()
user.nickname = obj['nickname']
user.openid = openid
user.type = 'qq'
user.token = self.access_token
user.metadata = rsp
if 'email' in obj:
user.email = obj['email']
if 'figureurl' in obj:
user.picture = str(obj['figureurl'])
return user
def get_picture(self, metadata):
datas = json.loads(metadata)
return str(datas['figureurl'])
@cache_decorator(expiration=100 * 60)
def get_oauth_apps():
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return []
configtypes = [x.type for x in configs]
applications = BaseOauthManager.__subclasses__()
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps
def get_manager_by_type(type):
applications = get_oauth_apps()
if applications:
finds = list(
filter(
lambda x: x.ICON_NAME.lower() == type.lower(),
applications))
if finds:
return finds[0]
return None

@ -1,22 +0,0 @@
from django import template
from django.urls import reverse
from oauth.oauthmanager import get_oauth_apps
register = template.Library()
@register.inclusion_tag('oauth/oauth_applications.html')
def load_oauth_applications(request):
applications = get_oauth_apps()
if applications:
baseurl = reverse('oauth:oauthlogin')
path = request.get_full_path()
apps = list(map(lambda x: (x.ICON_NAME, '{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl, type=x.ICON_NAME, next=path)), applications))
else:
apps = []
return {
'apps': apps
}

@ -1,249 +0,0 @@
import json
from unittest.mock import patch
from django.conf import settings
from django.contrib import auth
from django.test import Client, RequestFactory, TestCase
from django.urls import reverse
from djangoblog.utils import get_sha256
from oauth.models import OAuthConfig
from oauth.oauthmanager import BaseOauthManager
# Create your tests here.
class OAuthConfigTest(TestCase):
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
def test_oauth_login_test(self):
c = OAuthConfig()
c.type = 'weibo'
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
class OauthLoginTest(TestCase):
def setUp(self) -> None:
self.client = Client()
self.factory = RequestFactory()
self.apps = self.init_apps()
def init_apps(self):
applications = [p() for p in BaseOauthManager.__subclasses__()]
for application in applications:
c = OAuthConfig()
c.type = application.ICON_NAME.lower()
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
return applications
def get_app_by_type(self, type):
for app in self.apps:
if app.ICON_NAME.lower() == type:
return app
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_login(self, mock_do_get, mock_do_post):
weibo_app = self.get_app_by_type('weibo')
assert weibo_app
url = weibo_app.get_authorization_url()
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
mock_do_get.return_value = json.dumps({
"avatar_large": "avatar_large",
"screen_name": "screen_name",
"id": "id",
"email": "email",
})
userinfo = weibo_app.get_access_token_by_code('code')
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'id')
@patch("oauth.oauthmanager.GoogleOauthManager.do_post")
@patch("oauth.oauthmanager.GoogleOauthManager.do_get")
def test_google_login(self, mock_do_get, mock_do_post):
google_app = self.get_app_by_type('google')
assert google_app
url = google_app.get_authorization_url()
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"id_token": "id_token",
})
mock_do_get.return_value = json.dumps({
"picture": "picture",
"name": "name",
"sub": "sub",
"email": "email",
})
token = google_app.get_access_token_by_code('code')
userinfo = google_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
self.assertEqual(userinfo.openid, 'sub')
@patch("oauth.oauthmanager.GitHubOauthManager.do_post")
@patch("oauth.oauthmanager.GitHubOauthManager.do_get")
def test_github_login(self, mock_do_get, mock_do_post):
github_app = self.get_app_by_type('github')
assert github_app
url = github_app.get_authorization_url()
self.assertTrue("github.com" in url)
self.assertTrue("client_id" in url)
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer"
mock_do_get.return_value = json.dumps({
"avatar_url": "avatar_url",
"name": "name",
"id": "id",
"email": "email",
})
token = github_app.get_access_token_by_code('code')
userinfo = github_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'gho_16C7e42F292c6912E7710c838347Ae178B4a')
self.assertEqual(userinfo.openid, 'id')
@patch("oauth.oauthmanager.FaceBookOauthManager.do_post")
@patch("oauth.oauthmanager.FaceBookOauthManager.do_get")
def test_facebook_login(self, mock_do_get, mock_do_post):
facebook_app = self.get_app_by_type('facebook')
assert facebook_app
url = facebook_app.get_authorization_url()
self.assertTrue("facebook.com" in url)
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
})
mock_do_get.return_value = json.dumps({
"name": "name",
"id": "id",
"email": "email",
"picture": {
"data": {
"url": "url"
}
}
})
token = facebook_app.get_access_token_by_code('code')
userinfo = facebook_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
@patch("oauth.oauthmanager.QQOauthManager.do_get", side_effect=[
'access_token=access_token&expires_in=3600',
'callback({"client_id":"appid","openid":"openid"} );',
json.dumps({
"nickname": "nickname",
"email": "email",
"figureurl": "figureurl",
"openid": "openid",
})
])
def test_qq_login(self, mock_do_get):
qq_app = self.get_app_by_type('qq')
assert qq_app
url = qq_app.get_authorization_url()
self.assertTrue("qq.com" in url)
token = qq_app.get_access_token_by_code('code')
userinfo = qq_app.get_oauth_userinfo()
self.assertEqual(userinfo.token, 'access_token')
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post):
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
"id": "id",
"email": "email",
}
mock_do_get.return_value = json.dumps(mock_user_info)
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
self.client.logout()
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, mock_user_info['email'])
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post):
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
mock_user_info = {
"avatar_large": "avatar_large",
"screen_name": "screen_name1",
"id": "id",
}
mock_do_get.return_value = json.dumps(mock_user_info)
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
oauth_user_id = int(response.url.split('/')[-1].split('.')[0])
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html')
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id})
self.assertEqual(response.status_code, 302)
sign = get_sha256(settings.SECRET_KEY +
str(oauth_user_id) + settings.SECRET_KEY)
url = reverse('oauth:bindsuccess', kwargs={
'oauthid': oauth_user_id,
})
self.assertEqual(response.url, f'{url}?type=email')
path = reverse('oauth:email_confirm', kwargs={
'id': oauth_user_id,
'sign': sign
})
response = self.client.get(path)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success')
user = auth.get_user(self.client)
from oauth.models import OAuthUser
oauth_user = OAuthUser.objects.get(author=user)
self.assertTrue(user.is_authenticated)
self.assertEqual(user.username, mock_user_info['screen_name'])
self.assertEqual(user.email, 'test@gmail.com')
self.assertEqual(oauth_user.pk, oauth_user_id)

@ -1,27 +0,0 @@
from django.urls import path
from . import views
app_name = "oauth" # zwz: 命名空间,方便模板和视图中反向解析
urlpatterns = [
# zwz: 授权回调处理
path(r'oauth/authorize', views.authorize),
# zwz: 邮箱绑定页面
path(r'oauth/requireemail/<<<<int:oauthid>.html',
views.RequireEmailView.as_view(),
name='require_email'),
# zwz: 邮箱验证链接修复sign参数格式
path(r'oauth/emailconfirm/<<<<int:id>/<<str:sign>.html',
views.emailconfirm,
name='email_confirm'),
# zwz: 绑定成功页面
path(r'oauth/bindsuccess/<<<<int:oauthid>.html',
views.bindsuccess,
name='bindsuccess'),
# zwz: 第三方登录入口
path(r'oauth/oauthlogin', views.oauthlogin, name='oauthlogin')
]

@ -1,211 +0,0 @@
import logging
#zwz: Create your views here.
from urllib.parse import urlparse
#zwz: 导入Django核心模块、工具及项目内模块
from django.conf import settings
from django.contrib.auth import get_user_model, login
from django.core.exceptions import ObjectDoesNotExist
from django.db import transaction
from django.http import HttpResponseForbidden, HttpResponseRedirect
from django.shortcuts import get_object_or_404, render
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.generic import FormView
from djangoblog.blog_signals import oauth_user_login_signal
from djangoblog.utils import get_current_site, send_email, get_sha256
from oauth.forms import RequireEmailForm
from .models import OAuthUser
from .oauthmanager import get_manager_by_type, OAuthAccessTokenException
logger = logging.getLogger(__name__) # 初始化日志记录器
#zwz: 处理重定向URL过滤非法地址返回安全的跳转路径
def get_redirecturl(request):
nexturl = request.GET.get('next_url', None)
#zwz: 无URL或为登录页时默认跳首页
if not nexturl or nexturl in ('/login/', '/login'):
return '/'
#zwz: 校验URL是否为本站地址防止跨站跳转
p = urlparse(nexturl)
if p.netloc:
site = get_current_site().domain
if p.netloc.replace('www.', '') != site.replace('www.', ''):
logger.info('非法url:' + nexturl)
return "/"
return nexturl
#zwz: 第三方登录入口:根据平台类型获取授权链接
def oauthlogin(request):
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
manager = get_manager_by_type(type) # 获取对应平台的授权管理器
if not manager:
return HttpResponseRedirect('/')
nexturl = get_redirecturl(request)
authorizeurl = manager.get_authorization_url(nexturl) # 生成授权链接
return HttpResponseRedirect(authorizeurl)
#zwz: 授权回调处理:获取用户信息并绑定本地账号
def authorize(request):
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
manager = get_manager_by_type(type)
if not manager:
return HttpResponseRedirect('/')
code = request.GET.get('code', None)
#zwz: 通过授权码获取access_token
try:
rsp = manager.get_access_token_by_code(code)
except OAuthAccessTokenException as e:
logger.warning("OAuthAccessTokenException:" + str(e))
return HttpResponseRedirect('/')
except Exception as e:
logger.error(e)
rsp = None
nexturl = get_redirecturl(request)
if not rsp:
return HttpResponseRedirect(manager.get_authorization_url(nexturl))
#zwz: 获取第三方用户信息
user = manager.get_oauth_userinfo()
if user:
#zwz: 补全默认昵称(防止空值)
if not user.nickname or not user.nickname.strip():
user.nickname = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
#zwz: 存在旧记录则更新,否则创建新记录
try:
temp = OAuthUser.objects.get(type=type, openid=user.openid)
temp.picture = user.picture
temp.metadata = user.metadata
temp.nickname = user.nickname
user = temp
except ObjectDoesNotExist:
pass
#zwz: Facebook的token过长这里清空
if type == 'facebook':
user.token = ''
#zwz: 有邮箱则直接绑定本地用户
if user.email:
with transaction.atomic(): # 事务保证数据一致性
author = None
try:
author = get_user_model().objects.get(id=user.author_id)
except ObjectDoesNotExist:
pass
if not author:
#zwz: 邮箱不存在则创建新用户
result = get_user_model().objects.get_or_create(email=user.email)
author = result[0]
if result[1]:
#zwz: 处理昵称重复
try:
get_user_model().objects.get(username=user.nickname)
except ObjectDoesNotExist:
author.username = user.nickname
else:
author.username = "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.source = 'authorize'
author.save()
#zwz: 关联用户并登录
user.author = author
user.save()
oauth_user_login_signal.send(sender=authorize.__class__, id=user.id)
login(request, author)
return HttpResponseRedirect(nexturl)
#zwz: 无邮箱则跳转到邮箱绑定页
else:
user.save()
url = reverse('oauth:require_email', kwargs={'oauthid': user.id})
return HttpResponseRedirect(url)
else:
return HttpResponseRedirect(nexturl)
#zwz: 邮箱验证:确认邮箱并完成绑定
def emailconfirm(request, id, sign):
#zwz: 校验签名是否合法
if not sign or get_sha256(settings.SECRET_KEY + str(id) + settings.SECRET_KEY).upper() != sign.upper():
return HttpResponseForbidden()
oauthuser = get_object_or_404(OAuthUser, pk=id)
with transaction.atomic():
#zwz: 关联或创建本地用户
if oauthuser.author:
author = get_user_model().objects.get(pk=oauthuser.author_id)
else:
result = get_user_model().objects.get_or_create(email=oauthuser.email)
author = result[0]
if result[1]:
author.source = 'emailconfirm'
author.username = oauthuser.nickname.strip() or "djangoblog" + timezone.now().strftime('%y%m%d%I%M%S')
author.save()
oauthuser.author = author
oauthuser.save()
#zwz: 发送登录信号并完成登录
oauth_user_login_signal.send(sender=emailconfirm.__class__, id=oauthuser.id)
login(request, author)
#zwz: 发送绑定成功邮件
site = 'http://' + get_current_site().domain
content = _('''<p>恭喜您成功绑定邮箱,可直接用%(oauthuser_type)s登录本站。</p >
欢迎关注本站<a href=" ">%(site)s</a >''') % {'oauthuser_type': oauthuser.type, 'site': site}
send_email(emailto=[oauthuser.email, ], title=_('绑定成功!'), content=content)
return HttpResponseRedirect(reverse('oauth:bindsuccess', kwargs={'oauthid': id}) + '?type=success')
#zwz: 邮箱绑定表单页:处理无邮箱用户的邮箱填写
class RequireEmailView(FormView):
form_class = RequireEmailForm
template_name = 'oauth/require_email.html'
def get(self, request, *args, **kwargs):
oauthid = self.kwargs['oauthid']
get_object_or_404(OAuthUser, pk=oauthid)
return super().get(request, *args, **kwargs)
def get_initial(self):
#zwz: 初始化表单默认值
return {'email': '', 'oauthid': self.kwargs['oauthid']}
def get_context_data(self, **kwargs):
#zwz: 传递用户头像到模板
oauthuser = get_object_or_404(OAuthUser, pk=self.kwargs['oauthid'])
if oauthuser.picture:
kwargs['picture'] = oauthuser.picture
return super().get_context_data(**kwargs)
def form_valid(self, form):
#zwz: 提交邮箱后发送验证邮件
email = form.cleaned_data['email']
oauthid = form.cleaned_data['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
oauthuser.email = email
oauthuser.save()
#zwz: 生成验证签名
sign = get_sha256(settings.SECRET_KEY + str(oauthuser.id) + settings.SECRET_KEY)
site = get_current_site().domain if not settings.DEBUG else '127.0.0.1:8000'
path = reverse('oauth:email_confirm', kwargs={'id': oauthid, 'sign': sign})
url = f"http://{site}{path}"
#zwz: 发送验证邮件
content = _(f'<p>请点击链接绑定邮箱:<a href="{url}">{url}</a ></p >')
send_email(emailto=[email, ], title=_('绑定邮箱'), content=content)
return HttpResponseRedirect(reverse('oauth:bindsuccess', kwargs={'oauthid': oauthid}) + '?type=email')
#zwz: 绑定结果页:显示成功提示
def bindsuccess(request, oauthid):
type = request.GET.get('type', None)
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
#zwz: 根据类型返回不同提示内容
if type == 'email':
title = _('绑定邮箱')
content = _('请登录邮箱完成最后一步绑定')
else:
title = _('绑定成功')
content = _(f"已用{oauthuser.type}绑定账号,可直接登录")
return render(request, 'oauth/bindsuccess.html', {'title': title, 'content': content})
Loading…
Cancel
Save