You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
SoftwareMethodology/src/DjangoBlog-master/oauth/oauthmanager.py

547 lines
24 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import logging
import os
import urllib.parse # 用于URL参数编码/解码
from abc import ABCMeta, abstractmethod # 用于定义抽象基类
import requests # 用于发送HTTP请求获取授权、token、用户信息
from djangoblog.utils import cache_decorator # 导入缓存装饰器,优化重复查询
from oauth.models import OAuthUser, OAuthConfig # 导入OAuth相关模型
# 创建当前模块的日志记录器用于记录OAuth流程中的关键信息和错误
logger = logging.getLogger(__name__)
# 自定义异常类用于表示OAuth授权过程中获取token失败的情况
class OAuthAccessTokenException(Exception):
'''
oauth授权失败异常
'''
# 抽象基类定义所有第三方OAuth管理器的统一接口模板方法模式
class BaseOauthManager(metaclass=ABCMeta):
"""获取用户授权的抽象基类"""
# 子类需重写的常量第三方平台的授权URL、token获取URL、用户信息API URL、图标名称
AUTH_URL = None # 授权页面URL用户跳转授权的地址
TOKEN_URL = None # 获取access_token的API URL
API_URL = None # 获取用户信息的API URL
ICON_NAME = None # 平台图标标识需与OAuthConfig的type字段对应
def __init__(self, access_token=None, openid=None):
# 初始化access_token访问令牌和openid第三方平台用户唯一标识
self.access_token = access_token
self.openid = openid
# 属性判断access_token是否已设置
@property
def is_access_token_set(self):
return self.access_token is not None
# 属性判断是否已完成授权需同时拥有有效access_token和openid
@property
def is_authorized(self):
return self.is_access_token_set and self.access_token is not None and self.openid is not None
# 抽象方法生成授权URL子类需实现返回用户跳转的授权链接
@abstractmethod
def get_authorization_url(self, nexturl='/'):
pass
# 抽象方法通过授权码code获取access_token子类需实现完成token交换
@abstractmethod
def get_access_token_by_code(self, code):
pass
# 抽象方法通过access_token获取第三方用户信息子类需实现返回OAuthUser对象
@abstractmethod
def get_oauth_userinfo(self):
pass
# 抽象方法从用户元数据中提取头像URL子类需实现适配不同平台的字段差异
@abstractmethod
def get_picture(self, metadata):
pass
# 通用HTTP GET请求方法封装请求逻辑打印响应日志
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text) # 记录响应内容,便于调试
return rsp.text
# 通用HTTP POST请求方法封装请求逻辑打印响应日志
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text) # 记录响应内容,便于调试
return rsp.text
# 获取当前平台的OAuth配置从OAuthConfig模型中查询按ICON_NAME匹配
def get_config(self):
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None # 存在则返回第一条配置否则返回None
# 微博OAuth管理器继承BaseOauthManager实现微博平台的授权逻辑
class WBOauthManager(BaseOauthManager):
# 微博平台的固定URL和图标标识
AUTH_URL = 'https://api.weibo.com/oauth2/authorize' # 微博授权页URL
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' # 微博token获取URL
API_URL = 'https://api.weibo.com/2/users/show.json' # 微博用户信息API
ICON_NAME = 'weibo' # 与OAuthConfig的type字段对应
def __init__(self, access_token=None, openid=None):
# 从配置中获取微博的AppKey、AppSecret、回调地址
config = self.get_config()
self.client_id = config.appkey if config else '' # 微博开放平台AppKey
self.client_secret = config.appsecret if config else '' # 微博开放平台AppSecret
self.callback_url = config.callback_url if config else ''# 微博授权回调地址
# 调用父类初始化方法设置access_token和openid
super(WBOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成微博授权URL拼接client_id、响应类型、回调地址等参数
def get_authorization_url(self, nexturl='/'):
params = {
'client_id': self.client_id,
'response_type': 'code', # 授权类型为code授权码模式
'redirect_uri': self.callback_url + '&next_url=' + nexturl # 回调地址+授权后跳转地址
}
# 拼接URL和参数urllib.parse.urlencode将字典转为URL查询字符串
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取微博access_token并自动获取用户信息
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code', # 授权模式为授权码模式
'code': code, # 前端获取的授权码
'redirect_uri': self.callback_url # 回调地址(需与平台配置一致)
}
# 发送POST请求获取token响应
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp) # 解析JSON响应
# 若响应中包含access_token说明获取成功
if 'access_token' in obj:
self.access_token = str(obj['access_token']) # 保存access_token
self.openid = str(obj['uid']) # 微博用户唯一标识为uid
return self.get_oauth_userinfo() # 自动获取用户信息并返回
else:
# 获取失败,抛出异常(携带响应内容便于排查)
raise OAuthAccessTokenException(rsp)
# 通过access_token获取微博用户信息返回OAuthUser对象
def get_oauth_userinfo(self):
if not self.is_authorized: # 未授权则返回None
return None
# 构造用户信息查询参数需uid和access_token
params = {
'uid': self.openid,
'access_token': self.access_token
}
# 发送GET请求获取用户信息
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp) # 解析用户信息JSON
user = OAuthUser() # 创建OAuthUser对象
user.metadata = rsp # 保存原始元数据(便于后续扩展)
user.picture = datas['avatar_large'] # 微博头像URL大尺寸
user.nickname = datas['screen_name'] # 微博昵称
user.openid = datas['id'] # 微博用户唯一ID
user.type = 'weibo' # 平台类型
user.token = self.access_token # 保存access_token
# 若响应中包含邮箱,则赋值(微博需额外申请权限才能获取邮箱)
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
# 捕获异常并记录日志(避免流程崩溃)
logger.error(e)
logger.error('weibo oauth error.rsp:' + rsp)
return None
# 从微博用户元数据中提取头像URL
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['avatar_large'] # 微博头像字段为avatar_large
# 代理混合类Mixin为需要代理的OAuth管理器提供HTTP代理支持
class ProxyManagerMixin:
def __init__(self, *args, **kwargs):
# 从环境变量中读取HTTP代理配置若存在则设置代理
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
"https": os.environ.get("HTTP_PROXY") # HTTPS也使用相同代理
}
else:
self.proxies = None # 无代理则为None
# 重写GET方法添加代理参数
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
# 重写POST方法添加代理参数
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
# 谷歌OAuth管理器继承ProxyManagerMixin和BaseOauthManager支持代理+谷歌授权)
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
# 谷歌平台的固定URL和图标标识
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' # 谷歌授权页URL
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' # 谷歌token获取URL
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' # 谷歌用户信息API
ICON_NAME = 'google' # 与OAuthConfig的type字段对应
def __init__(self, access_token=None, openid=None):
# 从配置中获取谷歌的AppKey、AppSecret、回调地址
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
# 调用父类初始化先初始化ProxyManagerMixin再初始化BaseOauthManager
super(GoogleOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成谷歌授权URL需指定openid和email权限
def get_authorization_url(self, nexturl='/'):
params = {
'client_id': self.client_id,
'response_type': 'code', # 授权码模式
'redirect_uri': self.callback_url, # 回调地址
'scope': 'openid email' # 申请的权限获取用户ID和邮箱
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取谷歌access_token
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token']) # 谷歌用id_token作为用户唯一标识
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token # 返回access_token后续需手动调用get_oauth_userinfo
else:
raise OAuthAccessTokenException(rsp)
# 通过access_token获取谷歌用户信息
def get_oauth_userinfo(self):
if not self.is_authorized:
return None
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture'] # 谷歌头像URL
user.nickname = datas['name'] # 谷歌用户名
user.openid = datas['sub'] # 谷歌用户唯一标识sub字段
user.token = self.access_token
user.type = 'google'
if datas['email']: # 谷歌默认返回邮箱(需申请权限)
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('google oauth error.rsp:' + rsp)
return None
# 从谷歌元数据中提取头像URL
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['picture']
# GitHub OAuth管理器继承ProxyManagerMixin和BaseOauthManager支持代理+GitHub授权
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
# GitHub平台的固定URL和图标标识
AUTH_URL = 'https://github.com/login/oauth/authorize' # GitHub授权页URL
TOKEN_URL = 'https://github.com/login/oauth/access_token' # GitHub token URL
API_URL = 'https://api.github.com/user' # GitHub用户信息API
ICON_NAME = 'github'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(GitHubOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成GitHub授权URL申请user权限用于获取用户信息
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': f'{self.callback_url}&next_url={next_url}', # 回调+跳转地址
'scope': 'user' # GitHub权限获取用户基本信息
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取GitHub access_tokenGitHub返回格式为表单需特殊解析
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
# GitHub返回的是表单格式如access_token=xxx&token_type=bearer需用parse_qs解析
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
self.access_token = (r['access_token'][0]) # parse_qs返回列表取第一个元素
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
# 通过access_token获取GitHub用户信息GitHub需在Header中携带token
def get_oauth_userinfo(self):
# GitHub API要求在Header中用Authorization: token xxx传递token
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url'] # GitHub头像URL
user.nickname = datas['name'] or datas['login'] # 优先用name无则用login用户名
user.openid = datas['id'] # GitHub用户唯一ID数字
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
# GitHub邮箱可能为空用户未公开需判断
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
except Exception as e:
logger.error(e)
logger.error('github oauth error.rsp:' + rsp)
return None
# 从GitHub元数据中提取头像URL
def get_picture(self, metadata):
datas = json.loads(metadata)
return datas['avatar_url']
# Facebook OAuth管理器继承ProxyManagerMixin和BaseOauthManager支持代理+Facebook授权
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
# Facebook平台的固定URL和图标标识注意API版本号v16.0
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' # Facebook授权页URL
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' # Facebook token URL
API_URL = 'https://graph.facebook.com/me' # Facebook用户信息API
ICON_NAME = 'facebook'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(FaceBookOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成Facebook授权URL申请email和public_profile权限
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url,
'scope': 'email,public_profile' # 申请邮箱和公开资料权限
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取Facebook access_tokenFacebook无需显式grant_type
def get_access_token_by_code(self, code):
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
# 'grant_type': 'authorization_code', # Facebook可省略该参数
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
token = str(obj['access_token'])
self.access_token = token
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
# 通过access_token获取Facebook用户信息需指定fields参数否则返回默认字段
def get_oauth_userinfo(self):
# Facebook需显式指定要获取的字段id、name、picture、email
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email'
}
try:
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name'] # Facebook用户名
user.openid = datas['id'] # Facebook用户唯一ID
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
# 处理邮箱(可能为空,用户未公开)
if 'email' in datas and datas['email']:
user.email = datas['email']
# 处理头像Facebook头像嵌套在picture.data.url中
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
return user
except Exception as e:
logger.error(e)
return None
# 从Facebook元数据中提取头像URL处理嵌套结构
def get_picture(self, metadata):
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
# QQ OAuth管理器继承BaseOauthManager实现QQ平台的授权逻辑
class QQOauthManager(BaseOauthManager):
# QQ平台的固定URL和图标标识QQ需额外请求openid接口
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' # QQ授权页URL
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' # QQ token获取URL
API_URL = 'https://graph.qq.com/user/get_user_info' # QQ用户信息API
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # QQ openid获取URLQQ特有
ICON_NAME = 'qq'
def __init__(self, access_token=None, openid=None):
config = self.get_config()
self.client_id = config.appkey if config else '' # QQ的AppID
self.client_secret = config.appsecret if config else '' # QQ的AppKey
self.callback_url = config.callback_url if config else ''
super(QQOauthManager, self).__init__(access_token=access_token, openid=openid)
# 生成QQ授权URL
def get_authorization_url(self, next_url='/'):
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
# 通过授权码code获取QQ access_tokenQQ返回格式为表单
def get_access_token_by_code(self, code):
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_get(self.TOKEN_URL, params) # QQ的token接口用GET请求
if rsp:
# QQ返回表单格式如access_token=xxx&expires_in=7776000&refresh_token=xxx
d = urllib.parse.parse_qs(rsp)
if 'access_token' in d:
token = d['access_token']
self.access_token = token[0] # 取列表第一个元素
return token
else:
raise OAuthAccessTokenException(rsp)
# QQ特有通过access_token获取openidQQ的openid需单独请求接口
def get_open_id(self):
if self.is_access_token_set:
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp:
# QQ返回格式为callback({"client_id":"xxx","openid":"xxx"}); 需处理格式
rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '')
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
# 通过access_token和openid获取QQ用户信息
def get_oauth_userinfo(self):
openid = self.get_open_id() # 先获取openid
if openid:
# QQ用户信息接口需传递access_token、oauth_consumer_key即AppID、openid
params = {
'access_token': self.access_token,
'oauth_consumer_key': self.client_id,
'openid': self.openid
}
rsp = self.do_get(self.API_URL, params)
logger.info(rsp)
obj = json.loads(rsp)
user = OAuthUser()
user.nickname = obj['nickname'] # QQ昵称
user.openid = openid # QQ openid
user.type = 'qq'
user.token = self.access_token
user.metadata = rsp
# 处理邮箱QQ需额外申请权限可能为空
if 'email' in obj:
user.email = obj['email']
# 处理头像QQ头像字段为figureurl
if 'figureurl' in obj:
user.picture = str(obj['figureurl'])
return user
# 从QQ元数据中提取头像URL
def get_picture(self, metadata):
datas = json.loads(metadata)
return str(datas['figureurl'])
# 获取所有已启用的OAuth应用带缓存100分钟过期减少数据库查询
@cache_decorator(expiration=100 * 60) # 缓存100分钟100*60秒
def get_oauth_apps():
# 1. 查询所有已启用的OAuth配置is_enable=True
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return [] # 无配置则返回空列表
# 2. 提取已启用的平台类型(如['weibo', 'github']
configtypes = [x.type for x in configs]
# 3. 获取所有BaseOauthManager的子类即所有平台的管理器
applications = BaseOauthManager.__subclasses__()
# 4. 筛选出已启用的管理器ICON_NAME在configtypes中
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps
# 根据平台类型获取对应的OAuth管理器
def get_manager_by_type(type):
applications = get_oauth_apps() # 获取已启用的应用
if applications:
# 筛选出类型匹配的管理器(不区分大小写)
finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications))
if finds:
return finds[0] # 返回第一个匹配的管理器
return None # 无匹配则返回None