You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Django/doc/oauth/oauthmanager.py

593 lines
23 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import logging
import os
import urllib.parse
from abc import ABCMeta, abstractmethod # 用于定义抽象基类
import requests # 用于发送HTTP请求
from djangoblog.utils import cache_decorator # 导入缓存装饰器
from oauth.models import OAuthUser, OAuthConfig # 导入OAuth相关模型
# 初始化日志记录器,用于记录当前模块的日志信息
logger = logging.getLogger(__name__)
class OAuthAccessTokenException(Exception):
'''
自定义异常OAuth授权过程中获取Access Token失败时抛出
'''
class BaseOauthManager(metaclass=ABCMeta):
"""
OAuth抽象基类定义第三方登录的通用接口和基础方法
所有第三方平台的OAuth管理器都需继承此类并实现抽象方法
"""
# 子类需重写授权页面URL用户跳转授权用
AUTH_URL = None
# 子类需重写获取Access Token的URL
TOKEN_URL = None
# 子类需重写获取用户信息的API URL
API_URL = None
# 子类需重写平台图标名称对应OAuthConfig的type字段
ICON_NAME = None
def __init__(self, access_token=None, openid=None):
"""
初始化OAuth管理器
:param access_token: 第三方平台返回的访问令牌
:param openid: 第三方平台用户唯一标识
"""
self.access_token = access_token # 存储访问令牌
self.openid = openid # 存储用户唯一标识
@property
def is_access_token_set(self):
"""属性判断Access Token是否已设置"""
return self.access_token is not None
@property
def is_authorized(self):
"""属性判断是否已完成授权Access Token和OpenID均存在"""
return self.is_access_token_set and self.access_token is not None and self.openid is not None
@abstractmethod
def get_authorization_url(self, nexturl='/'):
"""
抽象方法生成第三方授权页面的URL
:param nexturl: 授权成功后跳转的页面地址
:return: 完整的授权URL字符串
"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""
抽象方法通过授权码code获取Access Token
:param code: 第三方平台返回的授权码
:return: 成功返回用户信息或Token失败抛出异常
"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""
抽象方法通过Access Token获取第三方用户信息
:return: 构造好的OAuthUser对象失败返回None
"""
pass
@abstractmethod
def get_picture(self, metadata):
"""
抽象方法从用户元数据中提取头像URL
:param metadata: 存储用户信息的元数据JSON字符串
:return: 头像URL字符串
"""
pass
def do_get(self, url, params, headers=None):
"""
基础方法发送GET请求子类可重写
:param url: 请求地址
:param params: 请求参数
:param headers: 请求头
:return: 响应文本内容
"""
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text) # 记录响应日志
return rsp.text
def do_post(self, url, params, headers=None):
"""
基础方法发送POST请求子类可重写
:param url: 请求地址
:param params: 请求参数
:param headers: 请求头
:return: 响应文本内容
"""
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text) # 记录响应日志
return rsp.text
def get_config(self):
"""
获取当前平台的OAuth配置从OAuthConfig模型中查询
:return: OAuthConfig对象不存在返回None
"""
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
class WBOauthManager(BaseOauthManager):
"""微博OAuth登录管理器实现微博第三方登录的具体逻辑"""
AUTH_URL = 'https://api.weibo.com/oauth2/authorize' # 微博授权URL
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' # 微博Token获取URL
API_URL = 'https://api.weibo.com/2/users/show.json' # 微博用户信息API
ICON_NAME = 'weibo' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 先获取微博的OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else '' # 应用ID
self.client_secret = config.appsecret if config else '' # 应用密钥
self.callback_url = config.callback_url if config else '' # 回调地址
# 调用父类初始化方法
super(WBOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""生成微博授权URL拼接跳转地址参数"""
params = {
'client_id': self.client_id,
'response_type': 'code', # 授权类型为code
'redirect_uri': self.callback_url + '&next_url=' + nexturl # 回调地址+登录后跳转地址
}
# 拼接参数生成完整URL
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取微博Access Token并调用用户信息接口"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code', # 授权模式
'code': code, # 授权码
'redirect_uri': self.callback_url # 回调地址(需与授权时一致)
}
# 发送POST请求获取Token
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
# 成功获取Token则继续获取用户信息
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['uid']) # 微博用户唯一标识uid
return self.get_oauth_userinfo()
else:
# 失败则抛出异常
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过Access Token获取微博用户信息构造OAuthUser对象"""
if not self.is_authorized:
return None # 未授权则返回None
params = {
'uid': self.openid,
'access_token': self.access_token
}
# 发送GET请求获取用户信息
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp # 存储原始响应数据
user.picture = datas['avatar_large'] # 大尺寸头像
user.nickname = datas['screen_name'] # 微博昵称
user.openid = datas['id'] # 用户唯一标识
user.type = 'weibo' # 平台类型
user.token = self.access_token # 存储Access Token
# 若返回邮箱则赋值
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('weibo oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
"""从元数据中提取微博用户头像URL"""
datas = json.loads(metadata)
return datas['avatar_large']
class ProxyManagerMixin:
"""
代理混入类为HTTP请求添加代理支持
需与BaseOauthManager组合使用适用于需要代理访问的平台如谷歌、GitHub
"""
def __init__(self, *args, **kwargs):
# 从环境变量读取代理配置
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
"https": os.environ.get("HTTP_PROXY")
}
else:
self.proxies = None # 无代理则为None
# 调用父类初始化方法(注意:混入类需放在继承列表前面)
super().__init__(*args, **kwargs)
def do_get(self, url, params, headers=None):
"""重写GET方法添加代理参数"""
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""重写POST方法添加代理参数"""
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
"""谷歌OAuth登录管理器集成代理支持实现谷歌第三方登录逻辑"""
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' # 谷歌授权URL
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' # 谷歌Token获取URL
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' # 谷歌用户信息API
ICON_NAME = 'google' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 获取谷歌OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
# 调用父类ProxyManagerMixin初始化方法
super(GoogleOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""生成谷歌授权URL请求openid和email权限"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'openid email', # 授权范围:获取用户标识和邮箱
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取谷歌Access Token"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token']) # 谷歌用户唯一标识id_token
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过Access Token获取谷歌用户信息"""
if not self.is_authorized:
return None
params = {'access_token': self.access_token}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture'] # 头像URL
user.nickname = datas['name'] # 用户名
user.openid = datas['sub'] # 用户唯一标识
user.token = self.access_token
user.type = 'google'
if datas['email']:
user.email = datas['email'] # 邮箱(谷歌授权时已请求)
return user
except Exception as e:
logger.error(e)
logger.error('google oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
"""从元数据中提取谷歌用户头像URL"""
datas = json.loads(metadata)
return datas['picture']
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
"""GitHub OAuth登录管理器集成代理支持实现GitHub第三方登录逻辑"""
AUTH_URL = 'https://github.com/login/oauth/authorize' # GitHub授权URL
TOKEN_URL = 'https://github.com/login/oauth/access_token' # GitHub Token获取URL
API_URL = 'https://api.github.com/user' # GitHub用户信息API
ICON_NAME = 'github' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 获取GitHub OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
# 调用父类初始化方法
super(GitHubOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""生成GitHub授权URL请求user权限"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}', # 回调+跳转地址
'scope': 'user' # 授权范围:获取用户基本信息
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取GitHub Access Token返回格式为query string"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
# GitHub返回的Token是query string格式需解析
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
self.access_token = (r['access_token'][0]) # 取第一个值(列表格式)
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过Access Token获取GitHub用户信息需在请求头中携带Token"""
# GitHub需在请求头中传递Token而非URL参数
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url'] # 头像URL
user.nickname = datas['name'] # 用户名可能为None优先显示login
user.openid = datas['id'] # 用户唯一ID
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
# 若返回邮箱则赋值GitHub部分用户邮箱可能为None
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('github oauth error.rsp:' + rsp)
return None
def get_picture(self, metadata):
"""从元数据中提取GitHub用户头像URL"""
datas = json.loads(metadata)
return datas['avatar_url']
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
"""Facebook OAuth登录管理器集成代理支持实现Facebook第三方登录逻辑"""
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' # Facebook授权URLv16.0版本)
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' # Facebook Token获取URL
API_URL = 'https://graph.facebook.com/me' # Facebook用户信息API
ICON_NAME = 'facebook' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 获取Facebook OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
# 调用父类初始化方法
super(FaceBookOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""生成Facebook授权URL请求email和公开资料权限"""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile' # 授权范围
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取Facebook Access Token"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
token = str(obj['access_token'])
self.access_token = token
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过Access Token获取Facebook用户信息需指定返回字段"""
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email' # 指定返回的字段(减少冗余)
}
try:
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name'] # 用户名
user.openid = datas['id'] # 用户唯一标识
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
# 赋值邮箱可能为None
if 'email' in datas and datas['email']:
user.email = datas['email']
# 处理头像Facebook头像嵌套在picture.data.url中
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
return user
except Exception as e:
logger.error(e)
return None
def get_picture(self, metadata):
"""从元数据中提取Facebook用户头像URL处理嵌套结构"""
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
class QQOauthManager(BaseOauthManager):
"""QQ OAuth登录管理器实现QQ第三方登录逻辑需单独获取OpenID"""
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' # QQ授权URL
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' # QQ Token获取URL
API_URL = 'https://graph.qq.com/user/get_user_info' # QQ用户信息API
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # QQ OpenID获取URL单独接口
ICON_NAME = 'qq' # 对应配置中的平台类型
def __init__(self, access_token=None, openid=None):
# 获取QQ OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
# 调用父类初始化方法
super(QQOauthManager, self).__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""生成QQ授权URL拼接跳转地址"""
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取QQ Access Token返回格式为query string"""
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
# QQ获取Token使用GET请求
rsp = self.do_get(self.TOKEN_URL, params)
if rsp:
# 解析query string格式的响应
d = urllib.parse.parse_qs(rsp)
if 'access_token' in d:
token = d['access_token']
self.access_token = token[0] # 取第一个值
return token
else:
raise OAuthAccessTokenException(rsp)
def get_open_id(self):
"""单独获取QQ用户的OpenIDQQ OAuth特殊流程"""
if self.is_access_token_set:
params = {'access_token': self.access_token}
rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp:
# QQ返回的OpenID格式为callback包裹的JSON需处理格式
rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '')
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
def get_oauth_userinfo(self):
"""获取QQ用户信息需先获取OpenID"""
openid = self.get_open_id()
if openid:
params = {
'access_token': self.access_token,
'oauth_consumer_key': self.client_id, # QQ需额外传递client_id
'openid': self.openid
}
rsp = self.do_get(self.API_URL, params)
logger.info(rsp)
obj = json.loads(rsp)
user = OAuthUser()
user.nickname = obj['nickname'] # 昵称
user.openid = openid # 唯一标识
user.type = 'qq'
user.token = self.access_token
user.metadata = rsp
# 赋值邮箱可能为None
if 'email' in obj:
user.email = obj['email']
# 赋值头像figureurl为QQ头像URL
if 'figureurl' in obj:
user.picture = str(obj['figureurl'])
return user
def get_picture(self, metadata):
"""从元数据中提取QQ用户头像URL"""
datas = json.loads(metadata)
return str(datas['figureurl'])
@cache_decorator(expiration=100 * 60) # 缓存100分钟减少数据库查询
def get_oauth_apps():
"""
获取所有启用的OAuth应用管理器实例
:return: 启用的OAuthManager实例列表
"""
# 查询所有已启用的OAuth配置
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return [] # 无启用配置则返回空列表
# 提取已启用的平台类型
configtypes = [x.type for x in configs]
# 获取BaseOauthManager的所有子类各平台实现类
applications = BaseOauthManager.__subclasses__()
# 筛选出已启用的平台实例
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps
def get_manager_by_type(type):
"""
根据平台类型获取对应的OAuth管理器实例
:param type: 平台类型如weibo、github
:return: 对应的OAuthManager实例不存在返回None
"""
applications = get_oauth_apps()
if applications:
# 忽略大小写匹配平台类型
finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
if finds:
return finds[0] # 返回第一个匹配的实例
return None