nch的注释

pull/34/head
nch 4 months ago
parent 9b8e0064d8
commit 85347e776b

@ -9,86 +9,115 @@ import requests
from djangoblog.utils import cache_decorator
from oauth.models import OAuthUser, OAuthConfig
import logging
import requests
import json
import urllib.parse
import os
from abc import ABCMeta, abstractmethod
from django.core.cache import cache
from cache_decorator import cache_decorator
# 获取logger实例
logger = logging.getLogger(__name__)
class OAuthAccessTokenException(Exception):
'''
oauth授权失败异常
OAuth授权失败异常类
'''
class BaseOauthManager(metaclass=ABCMeta):
"""获取用户授权"""
"""OAuth授权管理器基类"""
# 授权URL
AUTH_URL = None
"""获取token"""
# 获取token的URL
TOKEN_URL = None
"""获取用户信息"""
# 获取用户信息的API URL
API_URL = None
'''icon图标名'''
# icon图标名
ICON_NAME = None
def __init__(self, access_token=None, openid=None):
"""
初始化OAuth管理器
Args:
access_token: 访问令牌
openid: 用户唯一标识
"""
self.access_token = access_token
self.openid = openid
@property
def is_access_token_set(self):
"""检查access_token是否已设置"""
return self.access_token is not None
@property
def is_authorized(self):
"""检查是否已授权既有access_token又有openid"""
return self.is_access_token_set and self.access_token is not None and self.openid is not None
@abstractmethod
def get_authorization_url(self, nexturl='/'):
"""获取授权URL抽象方法"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""通过授权码获取访问令牌(抽象方法)"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""获取用户信息(抽象方法)"""
pass
@abstractmethod
def get_picture(self, metadata):
"""从元数据中获取用户头像(抽象方法)"""
pass
def do_get(self, url, params, headers=None):
"""执行GET请求"""
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""执行POST请求"""
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text)
return rsp.text
def get_config(self):
"""获取OAuth配置"""
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
class WBOauthManager(BaseOauthManager):
"""微博OAuth管理器"""
# 微博OAuth相关URL
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
API_URL = 'https://api.weibo.com/2/users/show.json'
ICON_NAME = 'weibo'
def __init__(self, access_token=None, openid=None):
"""初始化微博OAuth管理器"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
WBOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
self.client_id = config.appkey if config else '' # 应用Key
self.client_secret = config.appsecret if config else '' # 应用Secret
self.callback_url = config.callback_url if config else '' # 回调URL
super(WBOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""获取微博授权URL"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -98,7 +127,7 @@ class WBOauthManager(BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""通过授权码获取访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
@ -117,6 +146,7 @@ class WBOauthManager(BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""获取微博用户信息"""
if not self.is_authorized:
return None
params = {
@ -127,14 +157,14 @@ class WBOauthManager(BaseOauthManager):
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['avatar_large']
user.nickname = datas['screen_name']
user.openid = datas['id']
user.type = 'weibo'
user.token = self.access_token
user.metadata = rsp # 原始元数据
user.picture = datas['avatar_large'] # 用户头像
user.nickname = datas['screen_name'] # 用户昵称
user.openid = datas['id'] # 用户OpenID
user.type = 'weibo' # 用户类型
user.token = self.access_token # 访问令牌
if 'email' in datas and datas['email']:
user.email = datas['email']
user.email = datas['email'] # 用户邮箱
return user
except Exception as e:
logger.error(e)
@ -142,12 +172,16 @@ class WBOauthManager(BaseOauthManager):
return None
def get_picture(self, metadata):
"""从元数据中获取用户头像"""
datas = json.loads(metadata)
return datas['avatar_large']
class ProxyManagerMixin:
"""代理管理器混入类,用于处理网络代理"""
def __init__(self, *args, **kwargs):
"""初始化代理设置"""
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
@ -157,50 +191,52 @@ class ProxyManagerMixin:
self.proxies = None
def do_get(self, url, params, headers=None):
"""使用代理执行GET请求"""
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""使用代理执行POST请求"""
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
"""Google OAuth管理器"""
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
ICON_NAME = 'google'
def __init__(self, access_token=None, openid=None):
"""初始化Google OAuth管理器"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GoogleOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super(GoogleOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""获取Google授权URL"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'openid email',
'scope': 'openid email', # 请求的权限范围
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
@ -216,6 +252,7 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""获取Google用户信息"""
if not self.is_authorized:
return None
params = {
@ -223,17 +260,16 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture']
user.nickname = datas['name']
user.openid = datas['sub']
user.picture = datas['picture'] # 用户头像
user.nickname = datas['name'] # 用户昵称
user.openid = datas['sub'] # 用户唯一标识
user.token = self.access_token
user.type = 'google'
if datas['email']:
user.email = datas['email']
user.email = datas['email'] # 用户邮箱
return user
except Exception as e:
logger.error(e)
@ -241,48 +277,50 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
return None
def get_picture(self, metadata):
"""从元数据中获取用户头像"""
datas = json.loads(metadata)
return datas['picture']
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
"""GitHub OAuth管理器"""
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
ICON_NAME = 'github'
def __init__(self, access_token=None, openid=None):
"""初始化GitHub OAuth管理器"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GitHubOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super(GitHubOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""获取GitHub授权URL"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}',
'scope': 'user'
'scope': 'user' # 请求的用户权限
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
# 解析URL编码的响应
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
@ -292,21 +330,21 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""获取GitHub用户信息"""
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
"Authorization": "token " + self.access_token # 使用token进行认证
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url']
user.nickname = datas['name']
user.openid = datas['id']
user.picture = datas['avatar_url'] # 用户头像
user.nickname = datas['name'] # 用户昵称
user.openid = datas['id'] # 用户ID
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
user.email = datas['email'] # 用户邮箱
return user
except Exception as e:
logger.error(e)
@ -314,44 +352,44 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
return None
def get_picture(self, metadata):
"""从元数据中获取用户头像"""
datas = json.loads(metadata)
return datas['avatar_url']
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
"""Facebook OAuth管理器"""
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
ICON_NAME = 'facebook'
def __init__(self, access_token=None, openid=None):
"""初始化Facebook OAuth管理器"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
FaceBookOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super(FaceBookOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""获取Facebook授权URL"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile'
'scope': 'email,public_profile' # 请求的权限范围
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
# 'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
@ -365,52 +403,54 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""获取Facebook用户信息"""
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email'
'fields': 'id,name,picture,email' # 请求的用户字段
}
try:
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name']
user.openid = datas['id']
user.nickname = datas['name'] # 用户昵称
user.openid = datas['id'] # 用户ID
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
user.email = datas['email'] # 用户邮箱
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
user.picture = str(datas['picture']['data']['url']) # 用户头像
return user
except Exception as e:
logger.error(e)
return None
def get_picture(self, metadata):
"""从元数据中获取用户头像"""
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
class QQOauthManager(BaseOauthManager):
"""QQ OAuth管理器"""
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize'
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token'
API_URL = 'https://graph.qq.com/user/get_user_info'
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me'
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # 获取OpenID的URL
ICON_NAME = 'qq'
def __init__(self, access_token=None, openid=None):
"""初始化QQ OAuth管理器"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
QQOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super(QQOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""获取QQ授权URL"""
params = {
'response_type': 'code',
'client_id': self.client_id,
@ -420,6 +460,7 @@ class QQOauthManager(BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""通过授权码获取访问令牌"""
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
@ -429,6 +470,7 @@ class QQOauthManager(BaseOauthManager):
}
rsp = self.do_get(self.TOKEN_URL, params)
if rsp:
# 解析URL编码的响应
d = urllib.parse.parse_qs(rsp)
if 'access_token' in d:
token = d['access_token']
@ -438,22 +480,22 @@ class QQOauthManager(BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_open_id(self):
"""获取用户的OpenID"""
if self.is_access_token_set:
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp:
rsp = rsp.replace(
'callback(', '').replace(
')', '').replace(
';', '')
# 清理响应格式JSONP格式
rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '')
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
def get_oauth_userinfo(self):
"""获取QQ用户信息"""
openid = self.get_open_id()
if openid:
params = {
@ -465,24 +507,26 @@ class QQOauthManager(BaseOauthManager):
logger.info(rsp)
obj = json.loads(rsp)
user = OAuthUser()
user.nickname = obj['nickname']
user.nickname = obj['nickname'] # 用户昵称
user.openid = openid
user.type = 'qq'
user.token = self.access_token
user.metadata = rsp
if 'email' in obj:
user.email = obj['email']
user.email = obj['email'] # 用户邮箱
if 'figureurl' in obj:
user.picture = str(obj['figureurl'])
user.picture = str(obj['figureurl']) # 用户头像
return user
def get_picture(self, metadata):
"""从元数据中获取用户头像"""
datas = json.loads(metadata)
return str(datas['figureurl'])
@cache_decorator(expiration=100 * 60)
def get_oauth_apps():
"""获取所有启用的OAuth应用带缓存"""
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return []
@ -493,12 +537,10 @@ def get_oauth_apps():
def get_manager_by_type(type):
"""根据类型获取对应的OAuth管理器"""
applications = get_oauth_apps()
if applications:
finds = list(
filter(
lambda x: x.ICON_NAME.lower() == type.lower(),
applications))
finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
if finds:
return finds[0]
return None
return None
Loading…
Cancel
Save