|
|
|
|
@ -1,3 +1,11 @@
|
|
|
|
|
"""
|
|
|
|
|
OAuth 认证管理器模块
|
|
|
|
|
|
|
|
|
|
该模块实现了多平台OAuth认证的核心逻辑,包含基类定义和具体平台实现。
|
|
|
|
|
支持微博、谷歌、GitHub、Facebook、QQ等主流第三方登录平台。
|
|
|
|
|
采用抽象基类和混合类设计模式,提供统一的OAuth认证接口。
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
@ -9,79 +17,139 @@ import requests
|
|
|
|
|
from djangoblog.utils import cache_decorator
|
|
|
|
|
from oauth.models import OAuthUser, OAuthConfig
|
|
|
|
|
|
|
|
|
|
# 获取当前模块的日志记录器
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class OAuthAccessTokenException(Exception):
|
|
|
|
|
'''
|
|
|
|
|
oauth授权失败异常
|
|
|
|
|
OAuth授权令牌获取异常类
|
|
|
|
|
|
|
|
|
|
当从OAuth服务商获取访问令牌失败时抛出此异常,
|
|
|
|
|
通常由于错误的授权码、应用配置问题或网络问题导致。
|
|
|
|
|
'''
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BaseOauthManager(metaclass=ABCMeta):
|
|
|
|
|
"""获取用户授权"""
|
|
|
|
|
"""
|
|
|
|
|
OAuth认证管理器抽象基类
|
|
|
|
|
|
|
|
|
|
定义所有OAuth平台必须实现的接口和方法,
|
|
|
|
|
提供统一的OAuth认证流程模板。
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# OAuth授权页面URL(需要子类实现)
|
|
|
|
|
AUTH_URL = None
|
|
|
|
|
"""获取token"""
|
|
|
|
|
# 获取访问令牌的URL(需要子类实现)
|
|
|
|
|
TOKEN_URL = None
|
|
|
|
|
"""获取用户信息"""
|
|
|
|
|
# 获取用户信息的API URL(需要子类实现)
|
|
|
|
|
API_URL = None
|
|
|
|
|
'''icon图标名'''
|
|
|
|
|
# 平台图标名称,用于标识和显示(需要子类实现)
|
|
|
|
|
ICON_NAME = None
|
|
|
|
|
|
|
|
|
|
def __init__(self, access_token=None, openid=None):
|
|
|
|
|
"""
|
|
|
|
|
初始化OAuth管理器
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
access_token: 已存在的访问令牌(可选)
|
|
|
|
|
openid: 已存在的用户OpenID(可选)
|
|
|
|
|
"""
|
|
|
|
|
self.access_token = access_token
|
|
|
|
|
self.openid = openid
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def is_access_token_set(self):
|
|
|
|
|
"""检查访问令牌是否已设置"""
|
|
|
|
|
return self.access_token is not None
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def is_authorized(self):
|
|
|
|
|
"""检查是否已完成授权(拥有令牌和OpenID)"""
|
|
|
|
|
return self.is_access_token_set and self.access_token is not None and self.openid is not None
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def get_authorization_url(self, nexturl='/'):
|
|
|
|
|
"""获取授权页面URL(抽象方法,子类必须实现)"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def get_access_token_by_code(self, code):
|
|
|
|
|
"""通过授权码获取访问令牌(抽象方法,子类必须实现)"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def get_oauth_userinfo(self):
|
|
|
|
|
"""获取OAuth用户信息(抽象方法,子类必须实现)"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def get_picture(self, metadata):
|
|
|
|
|
"""从元数据中提取用户头像URL(抽象方法,子类必须实现)"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def do_get(self, url, params, headers=None):
|
|
|
|
|
"""
|
|
|
|
|
执行GET请求的通用方法
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
url: 请求URL
|
|
|
|
|
params: 请求参数
|
|
|
|
|
headers: 请求头(可选)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: 响应文本内容
|
|
|
|
|
"""
|
|
|
|
|
rsp = requests.get(url=url, params=params, headers=headers)
|
|
|
|
|
logger.info(rsp.text)
|
|
|
|
|
logger.info(rsp.text) # 记录响应日志
|
|
|
|
|
return rsp.text
|
|
|
|
|
|
|
|
|
|
def do_post(self, url, params, headers=None):
|
|
|
|
|
"""
|
|
|
|
|
执行POST请求的通用方法
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
url: 请求URL
|
|
|
|
|
params: 请求参数
|
|
|
|
|
headers: 请求头(可选)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: 响应文本内容
|
|
|
|
|
"""
|
|
|
|
|
rsp = requests.post(url, params, headers=headers)
|
|
|
|
|
logger.info(rsp.text)
|
|
|
|
|
logger.info(rsp.text) # 记录响应日志
|
|
|
|
|
return rsp.text
|
|
|
|
|
|
|
|
|
|
def get_config(self):
|
|
|
|
|
"""
|
|
|
|
|
从数据库获取当前平台的OAuth配置
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
OAuthConfig: 配置对象,如果不存在则返回None
|
|
|
|
|
"""
|
|
|
|
|
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
|
|
|
|
|
return value[0] if value else None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WBOauthManager(BaseOauthManager):
|
|
|
|
|
"""
|
|
|
|
|
微博OAuth认证管理器
|
|
|
|
|
|
|
|
|
|
实现微博平台的OAuth2.0认证流程,包括授权、令牌获取和用户信息获取。
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# 微博OAuth接口地址
|
|
|
|
|
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
|
|
|
|
|
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
|
|
|
|
|
API_URL = 'https://api.weibo.com/2/users/show.json'
|
|
|
|
|
ICON_NAME = 'weibo'
|
|
|
|
|
|
|
|
|
|
def __init__(self, access_token=None, openid=None):
|
|
|
|
|
"""初始化微博OAuth配置"""
|
|
|
|
|
config = self.get_config()
|
|
|
|
|
self.client_id = config.appkey if config else ''
|
|
|
|
|
self.client_secret = config.appsecret if config else ''
|
|
|
|
|
self.callback_url = config.callback_url if config else ''
|
|
|
|
|
self.client_id = config.appkey if config else '' # 应用Key
|
|
|
|
|
self.client_secret = config.appsecret if config else '' # 应用Secret
|
|
|
|
|
self.callback_url = config.callback_url if config else '' # 回调地址
|
|
|
|
|
super(
|
|
|
|
|
WBOauthManager,
|
|
|
|
|
self).__init__(
|
|
|
|
|
@ -89,6 +157,15 @@ class WBOauthManager(BaseOauthManager):
|
|
|
|
|
openid=openid)
|
|
|
|
|
|
|
|
|
|
def get_authorization_url(self, nexturl='/'):
|
|
|
|
|
"""
|
|
|
|
|
生成微博授权页面URL
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
nexturl: 授权成功后跳转的URL
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: 完整的授权URL
|
|
|
|
|
"""
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'response_type': 'code',
|
|
|
|
|
@ -98,7 +175,18 @@ class WBOauthManager(BaseOauthManager):
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
|
def get_access_token_by_code(self, code):
|
|
|
|
|
"""
|
|
|
|
|
使用授权码获取访问令牌
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
code: OAuth授权码
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
OAuthUser: 用户信息对象
|
|
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
|
OAuthAccessTokenException: 令牌获取失败时抛出
|
|
|
|
|
"""
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'client_secret': self.client_secret,
|
|
|
|
|
@ -110,13 +198,20 @@ class WBOauthManager(BaseOauthManager):
|
|
|
|
|
|
|
|
|
|
obj = json.loads(rsp)
|
|
|
|
|
if 'access_token' in obj:
|
|
|
|
|
# 设置访问令牌和用户ID
|
|
|
|
|
self.access_token = str(obj['access_token'])
|
|
|
|
|
self.openid = str(obj['uid'])
|
|
|
|
|
return self.get_oauth_userinfo()
|
|
|
|
|
return self.get_oauth_userinfo() # 获取并返回用户信息
|
|
|
|
|
else:
|
|
|
|
|
raise OAuthAccessTokenException(rsp)
|
|
|
|
|
|
|
|
|
|
def get_oauth_userinfo(self):
|
|
|
|
|
"""
|
|
|
|
|
获取微博用户信息
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
OAuthUser: 包含用户信息的对象,获取失败返回None
|
|
|
|
|
"""
|
|
|
|
|
if not self.is_authorized:
|
|
|
|
|
return None
|
|
|
|
|
params = {
|
|
|
|
|
@ -127,14 +222,14 @@ class WBOauthManager(BaseOauthManager):
|
|
|
|
|
try:
|
|
|
|
|
datas = json.loads(rsp)
|
|
|
|
|
user = OAuthUser()
|
|
|
|
|
user.metadata = rsp
|
|
|
|
|
user.picture = datas['avatar_large']
|
|
|
|
|
user.nickname = datas['screen_name']
|
|
|
|
|
user.openid = datas['id']
|
|
|
|
|
user.type = 'weibo'
|
|
|
|
|
user.token = self.access_token
|
|
|
|
|
user.metadata = rsp # 存储原始响应数据
|
|
|
|
|
user.picture = datas['avatar_large'] # 用户头像
|
|
|
|
|
user.nickname = datas['screen_name'] # 用户昵称
|
|
|
|
|
user.openid = datas['id'] # 用户OpenID
|
|
|
|
|
user.type = 'weibo' # 平台类型
|
|
|
|
|
user.token = self.access_token # 访问令牌
|
|
|
|
|
if 'email' in datas and datas['email']:
|
|
|
|
|
user.email = datas['email']
|
|
|
|
|
user.email = datas['email'] # 用户邮箱
|
|
|
|
|
return user
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(e)
|
|
|
|
|
@ -142,13 +237,30 @@ class WBOauthManager(BaseOauthManager):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def get_picture(self, metadata):
|
|
|
|
|
"""
|
|
|
|
|
从元数据中提取微博用户头像
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
metadata: 用户元数据JSON字符串
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: 用户头像URL
|
|
|
|
|
"""
|
|
|
|
|
datas = json.loads(metadata)
|
|
|
|
|
return datas['avatar_large']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ProxyManagerMixin:
|
|
|
|
|
"""
|
|
|
|
|
代理管理器混合类
|
|
|
|
|
|
|
|
|
|
为OAuth管理器添加HTTP代理支持,用于网络访问受限的环境。
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
|
|
|
"""初始化代理配置"""
|
|
|
|
|
if os.environ.get("HTTP_PROXY"):
|
|
|
|
|
# 设置HTTP和HTTPS代理
|
|
|
|
|
self.proxies = {
|
|
|
|
|
"http": os.environ.get("HTTP_PROXY"),
|
|
|
|
|
"https": os.environ.get("HTTP_PROXY")
|
|
|
|
|
@ -157,23 +269,32 @@ class ProxyManagerMixin:
|
|
|
|
|
self.proxies = None
|
|
|
|
|
|
|
|
|
|
def do_get(self, url, params, headers=None):
|
|
|
|
|
"""带代理支持的GET请求"""
|
|
|
|
|
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
|
|
|
|
|
logger.info(rsp.text)
|
|
|
|
|
return rsp.text
|
|
|
|
|
|
|
|
|
|
def do_post(self, url, params, headers=None):
|
|
|
|
|
"""带代理支持的POST请求"""
|
|
|
|
|
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
|
|
|
|
|
logger.info(rsp.text)
|
|
|
|
|
return rsp.text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
"""
|
|
|
|
|
谷歌OAuth认证管理器
|
|
|
|
|
|
|
|
|
|
实现谷歌平台的OAuth2.0认证流程,支持代理访问。
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
|
|
|
|
|
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
|
|
|
|
|
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
|
|
|
|
|
ICON_NAME = 'google'
|
|
|
|
|
|
|
|
|
|
def __init__(self, access_token=None, openid=None):
|
|
|
|
|
"""初始化谷歌OAuth配置"""
|
|
|
|
|
config = self.get_config()
|
|
|
|
|
self.client_id = config.appkey if config else ''
|
|
|
|
|
self.client_secret = config.appsecret if config else ''
|
|
|
|
|
@ -185,22 +306,23 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
openid=openid)
|
|
|
|
|
|
|
|
|
|
def get_authorization_url(self, nexturl='/'):
|
|
|
|
|
"""生成谷歌授权页面URL"""
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'response_type': 'code',
|
|
|
|
|
'redirect_uri': self.callback_url,
|
|
|
|
|
'scope': 'openid email',
|
|
|
|
|
'scope': 'openid email', # 请求openid和email权限
|
|
|
|
|
}
|
|
|
|
|
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
|
def get_access_token_by_code(self, code):
|
|
|
|
|
"""使用授权码获取谷歌访问令牌"""
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'client_secret': self.client_secret,
|
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
|
'code': code,
|
|
|
|
|
|
|
|
|
|
'redirect_uri': self.callback_url
|
|
|
|
|
}
|
|
|
|
|
rsp = self.do_post(self.TOKEN_URL, params)
|
|
|
|
|
@ -216,6 +338,7 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
raise OAuthAccessTokenException(rsp)
|
|
|
|
|
|
|
|
|
|
def get_oauth_userinfo(self):
|
|
|
|
|
"""获取谷歌用户信息"""
|
|
|
|
|
if not self.is_authorized:
|
|
|
|
|
return None
|
|
|
|
|
params = {
|
|
|
|
|
@ -223,17 +346,16 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
}
|
|
|
|
|
rsp = self.do_get(self.API_URL, params)
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
|
|
datas = json.loads(rsp)
|
|
|
|
|
user = OAuthUser()
|
|
|
|
|
user.metadata = rsp
|
|
|
|
|
user.picture = datas['picture']
|
|
|
|
|
user.nickname = datas['name']
|
|
|
|
|
user.openid = datas['sub']
|
|
|
|
|
user.picture = datas['picture'] # 谷歌用户头像
|
|
|
|
|
user.nickname = datas['name'] # 谷歌用户姓名
|
|
|
|
|
user.openid = datas['sub'] # 谷歌用户唯一标识
|
|
|
|
|
user.token = self.access_token
|
|
|
|
|
user.type = 'google'
|
|
|
|
|
if datas['email']:
|
|
|
|
|
user.email = datas['email']
|
|
|
|
|
user.email = datas['email'] # 谷歌邮箱
|
|
|
|
|
return user
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(e)
|
|
|
|
|
@ -241,17 +363,25 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def get_picture(self, metadata):
|
|
|
|
|
"""从元数据中提取谷歌用户头像"""
|
|
|
|
|
datas = json.loads(metadata)
|
|
|
|
|
return datas['picture']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
"""
|
|
|
|
|
GitHub OAuth认证管理器
|
|
|
|
|
|
|
|
|
|
实现GitHub平台的OAuth2.0认证流程,支持代理访问。
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
AUTH_URL = 'https://github.com/login/oauth/authorize'
|
|
|
|
|
TOKEN_URL = 'https://github.com/login/oauth/access_token'
|
|
|
|
|
API_URL = 'https://api.github.com/user'
|
|
|
|
|
ICON_NAME = 'github'
|
|
|
|
|
|
|
|
|
|
def __init__(self, access_token=None, openid=None):
|
|
|
|
|
"""初始化GitHub OAuth配置"""
|
|
|
|
|
config = self.get_config()
|
|
|
|
|
self.client_id = config.appkey if config else ''
|
|
|
|
|
self.client_secret = config.appsecret if config else ''
|
|
|
|
|
@ -263,28 +393,29 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
openid=openid)
|
|
|
|
|
|
|
|
|
|
def get_authorization_url(self, next_url='/'):
|
|
|
|
|
"""生成GitHub授权页面URL"""
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'response_type': 'code',
|
|
|
|
|
'redirect_uri': f'{self.callback_url}&next_url={next_url}',
|
|
|
|
|
'scope': 'user'
|
|
|
|
|
'scope': 'user' # 请求用户信息权限
|
|
|
|
|
}
|
|
|
|
|
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
|
def get_access_token_by_code(self, code):
|
|
|
|
|
"""使用授权码获取GitHub访问令牌"""
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'client_secret': self.client_secret,
|
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
|
'code': code,
|
|
|
|
|
|
|
|
|
|
'redirect_uri': self.callback_url
|
|
|
|
|
}
|
|
|
|
|
rsp = self.do_post(self.TOKEN_URL, params)
|
|
|
|
|
|
|
|
|
|
from urllib import parse
|
|
|
|
|
r = parse.parse_qs(rsp)
|
|
|
|
|
r = parse.parse_qs(rsp) # 解析查询字符串格式的响应
|
|
|
|
|
if 'access_token' in r:
|
|
|
|
|
self.access_token = (r['access_token'][0])
|
|
|
|
|
return self.access_token
|
|
|
|
|
@ -292,21 +423,22 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
raise OAuthAccessTokenException(rsp)
|
|
|
|
|
|
|
|
|
|
def get_oauth_userinfo(self):
|
|
|
|
|
|
|
|
|
|
"""获取GitHub用户信息"""
|
|
|
|
|
# 使用Bearer Token认证方式调用GitHub API
|
|
|
|
|
rsp = self.do_get(self.API_URL, params={}, headers={
|
|
|
|
|
"Authorization": "token " + self.access_token
|
|
|
|
|
})
|
|
|
|
|
try:
|
|
|
|
|
datas = json.loads(rsp)
|
|
|
|
|
user = OAuthUser()
|
|
|
|
|
user.picture = datas['avatar_url']
|
|
|
|
|
user.nickname = datas['name']
|
|
|
|
|
user.openid = datas['id']
|
|
|
|
|
user.picture = datas['avatar_url'] # GitHub头像
|
|
|
|
|
user.nickname = datas['name'] # GitHub姓名
|
|
|
|
|
user.openid = datas['id'] # GitHub用户ID
|
|
|
|
|
user.type = 'github'
|
|
|
|
|
user.token = self.access_token
|
|
|
|
|
user.metadata = rsp
|
|
|
|
|
if 'email' in datas and datas['email']:
|
|
|
|
|
user.email = datas['email']
|
|
|
|
|
user.email = datas['email'] # GitHub邮箱
|
|
|
|
|
return user
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(e)
|
|
|
|
|
@ -314,17 +446,25 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def get_picture(self, metadata):
|
|
|
|
|
"""从元数据中提取GitHub用户头像"""
|
|
|
|
|
datas = json.loads(metadata)
|
|
|
|
|
return datas['avatar_url']
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
"""
|
|
|
|
|
Facebook OAuth认证管理器
|
|
|
|
|
|
|
|
|
|
实现Facebook平台的OAuth2.0认证流程,支持代理访问。
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
|
|
|
|
|
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
|
|
|
|
|
API_URL = 'https://graph.facebook.com/me'
|
|
|
|
|
ICON_NAME = 'facebook'
|
|
|
|
|
|
|
|
|
|
def __init__(self, access_token=None, openid=None):
|
|
|
|
|
"""初始化Facebook OAuth配置"""
|
|
|
|
|
config = self.get_config()
|
|
|
|
|
self.client_id = config.appkey if config else ''
|
|
|
|
|
self.client_secret = config.appsecret if config else ''
|
|
|
|
|
@ -336,22 +476,22 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
openid=openid)
|
|
|
|
|
|
|
|
|
|
def get_authorization_url(self, next_url='/'):
|
|
|
|
|
"""生成Facebook授权页面URL"""
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'response_type': 'code',
|
|
|
|
|
'redirect_uri': self.callback_url,
|
|
|
|
|
'scope': 'email,public_profile'
|
|
|
|
|
'scope': 'email,public_profile' # 请求邮箱和公开资料权限
|
|
|
|
|
}
|
|
|
|
|
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
|
def get_access_token_by_code(self, code):
|
|
|
|
|
"""使用授权码获取Facebook访问令牌"""
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'client_secret': self.client_secret,
|
|
|
|
|
# 'grant_type': 'authorization_code',
|
|
|
|
|
'code': code,
|
|
|
|
|
|
|
|
|
|
'redirect_uri': self.callback_url
|
|
|
|
|
}
|
|
|
|
|
rsp = self.do_post(self.TOKEN_URL, params)
|
|
|
|
|
@ -365,21 +505,23 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
raise OAuthAccessTokenException(rsp)
|
|
|
|
|
|
|
|
|
|
def get_oauth_userinfo(self):
|
|
|
|
|
"""获取Facebook用户信息"""
|
|
|
|
|
params = {
|
|
|
|
|
'access_token': self.access_token,
|
|
|
|
|
'fields': 'id,name,picture,email'
|
|
|
|
|
'fields': 'id,name,picture,email' # 指定需要返回的字段
|
|
|
|
|
}
|
|
|
|
|
try:
|
|
|
|
|
rsp = self.do_get(self.API_URL, params)
|
|
|
|
|
datas = json.loads(rsp)
|
|
|
|
|
user = OAuthUser()
|
|
|
|
|
user.nickname = datas['name']
|
|
|
|
|
user.openid = datas['id']
|
|
|
|
|
user.nickname = datas['name'] # Facebook姓名
|
|
|
|
|
user.openid = datas['id'] # Facebook用户ID
|
|
|
|
|
user.type = 'facebook'
|
|
|
|
|
user.token = self.access_token
|
|
|
|
|
user.metadata = rsp
|
|
|
|
|
if 'email' in datas and datas['email']:
|
|
|
|
|
user.email = datas['email']
|
|
|
|
|
user.email = datas['email'] # Facebook邮箱
|
|
|
|
|
# 处理嵌套的头像数据结构
|
|
|
|
|
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
|
|
|
|
|
user.picture = str(datas['picture']['data']['url'])
|
|
|
|
|
return user
|
|
|
|
|
@ -388,18 +530,26 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
def get_picture(self, metadata):
|
|
|
|
|
"""从元数据中提取Facebook用户头像"""
|
|
|
|
|
datas = json.loads(metadata)
|
|
|
|
|
return str(datas['picture']['data']['url'])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class QQOauthManager(BaseOauthManager):
|
|
|
|
|
"""
|
|
|
|
|
QQ OAuth认证管理器
|
|
|
|
|
|
|
|
|
|
实现QQ平台的OAuth2.0认证流程,包含特殊的OpenID获取步骤。
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize'
|
|
|
|
|
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token'
|
|
|
|
|
API_URL = 'https://graph.qq.com/user/get_user_info'
|
|
|
|
|
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me'
|
|
|
|
|
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # QQ特有的OpenID获取接口
|
|
|
|
|
ICON_NAME = 'qq'
|
|
|
|
|
|
|
|
|
|
def __init__(self, access_token=None, openid=None):
|
|
|
|
|
"""初始化QQ OAuth配置"""
|
|
|
|
|
config = self.get_config()
|
|
|
|
|
self.client_id = config.appkey if config else ''
|
|
|
|
|
self.client_secret = config.appsecret if config else ''
|
|
|
|
|
@ -411,6 +561,7 @@ class QQOauthManager(BaseOauthManager):
|
|
|
|
|
openid=openid)
|
|
|
|
|
|
|
|
|
|
def get_authorization_url(self, next_url='/'):
|
|
|
|
|
"""生成QQ授权页面URL"""
|
|
|
|
|
params = {
|
|
|
|
|
'response_type': 'code',
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
@ -420,6 +571,7 @@ class QQOauthManager(BaseOauthManager):
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
|
def get_access_token_by_code(self, code):
|
|
|
|
|
"""使用授权码获取QQ访问令牌"""
|
|
|
|
|
params = {
|
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
@ -429,7 +581,7 @@ class QQOauthManager(BaseOauthManager):
|
|
|
|
|
}
|
|
|
|
|
rsp = self.do_get(self.TOKEN_URL, params)
|
|
|
|
|
if rsp:
|
|
|
|
|
d = urllib.parse.parse_qs(rsp)
|
|
|
|
|
d = urllib.parse.parse_qs(rsp) # 解析查询字符串响应
|
|
|
|
|
if 'access_token' in d:
|
|
|
|
|
token = d['access_token']
|
|
|
|
|
self.access_token = token[0]
|
|
|
|
|
@ -438,23 +590,27 @@ class QQOauthManager(BaseOauthManager):
|
|
|
|
|
raise OAuthAccessTokenException(rsp)
|
|
|
|
|
|
|
|
|
|
def get_open_id(self):
|
|
|
|
|
"""
|
|
|
|
|
获取QQ用户的OpenID
|
|
|
|
|
|
|
|
|
|
QQ平台需要额外调用接口获取用户OpenID
|
|
|
|
|
"""
|
|
|
|
|
if self.is_access_token_set:
|
|
|
|
|
params = {
|
|
|
|
|
'access_token': self.access_token
|
|
|
|
|
}
|
|
|
|
|
rsp = self.do_get(self.OPEN_ID_URL, params)
|
|
|
|
|
if rsp:
|
|
|
|
|
rsp = rsp.replace(
|
|
|
|
|
'callback(', '').replace(
|
|
|
|
|
')', '').replace(
|
|
|
|
|
';', '')
|
|
|
|
|
# 清理JSONP响应格式
|
|
|
|
|
rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '')
|
|
|
|
|
obj = json.loads(rsp)
|
|
|
|
|
openid = str(obj['openid'])
|
|
|
|
|
self.openid = openid
|
|
|
|
|
return openid
|
|
|
|
|
|
|
|
|
|
def get_oauth_userinfo(self):
|
|
|
|
|
openid = self.get_open_id()
|
|
|
|
|
"""获取QQ用户信息"""
|
|
|
|
|
openid = self.get_open_id() # 先获取OpenID
|
|
|
|
|
if openid:
|
|
|
|
|
params = {
|
|
|
|
|
'access_token': self.access_token,
|
|
|
|
|
@ -465,40 +621,60 @@ class QQOauthManager(BaseOauthManager):
|
|
|
|
|
logger.info(rsp)
|
|
|
|
|
obj = json.loads(rsp)
|
|
|
|
|
user = OAuthUser()
|
|
|
|
|
user.nickname = obj['nickname']
|
|
|
|
|
user.openid = openid
|
|
|
|
|
user.nickname = obj['nickname'] # QQ昵称
|
|
|
|
|
user.openid = openid # QQ OpenID
|
|
|
|
|
user.type = 'qq'
|
|
|
|
|
user.token = self.access_token
|
|
|
|
|
user.metadata = rsp
|
|
|
|
|
if 'email' in obj:
|
|
|
|
|
user.email = obj['email']
|
|
|
|
|
user.email = obj['email'] # QQ邮箱
|
|
|
|
|
if 'figureurl' in obj:
|
|
|
|
|
user.picture = str(obj['figureurl'])
|
|
|
|
|
user.picture = str(obj['figureurl']) # QQ头像
|
|
|
|
|
return user
|
|
|
|
|
|
|
|
|
|
def get_picture(self, metadata):
|
|
|
|
|
"""从元数据中提取QQ用户头像"""
|
|
|
|
|
datas = json.loads(metadata)
|
|
|
|
|
return str(datas['figureurl'])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@cache_decorator(expiration=100 * 60)
|
|
|
|
|
def get_oauth_apps():
|
|
|
|
|
"""
|
|
|
|
|
获取所有启用的OAuth应用配置
|
|
|
|
|
|
|
|
|
|
使用缓存装饰器,缓存100分钟,减少数据库查询
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
list: 启用的OAuth管理器实例列表
|
|
|
|
|
"""
|
|
|
|
|
configs = OAuthConfig.objects.filter(is_enable=True).all()
|
|
|
|
|
if not configs:
|
|
|
|
|
return []
|
|
|
|
|
configtypes = [x.type for x in configs]
|
|
|
|
|
applications = BaseOauthManager.__subclasses__()
|
|
|
|
|
configtypes = [x.type for x in configs] # 提取启用的平台类型
|
|
|
|
|
applications = BaseOauthManager.__subclasses__() # 获取所有子类
|
|
|
|
|
# 过滤出已启用的平台管理器实例
|
|
|
|
|
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
|
|
|
|
|
return apps
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_manager_by_type(type):
|
|
|
|
|
"""
|
|
|
|
|
根据平台类型获取对应的OAuth管理器
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
type: 平台类型字符串(如:'weibo', 'github')
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
BaseOauthManager: 对应平台的OAuth管理器实例,未找到返回None
|
|
|
|
|
"""
|
|
|
|
|
applications = get_oauth_apps()
|
|
|
|
|
if applications:
|
|
|
|
|
# 查找匹配平台类型的管理器
|
|
|
|
|
finds = list(
|
|
|
|
|
filter(
|
|
|
|
|
lambda x: x.ICON_NAME.lower() == type.lower(),
|
|
|
|
|
applications))
|
|
|
|
|
if finds:
|
|
|
|
|
return finds[0]
|
|
|
|
|
return None
|
|
|
|
|
return None
|