diff --git a/oauth/oauthmanager.py b/oauth/oauthmanager.py deleted file mode 100644 index 2e7ceef..0000000 --- a/oauth/oauthmanager.py +++ /dev/null @@ -1,504 +0,0 @@ -import json -import logging -import os -import urllib.parse -from abc import ABCMeta, abstractmethod - -import requests - -from djangoblog.utils import cache_decorator -from oauth.models import OAuthUser, OAuthConfig - -logger = logging.getLogger(__name__) - - -class OAuthAccessTokenException(Exception): - ''' - oauth授权失败异常 - ''' - - -class BaseOauthManager(metaclass=ABCMeta): - """获取用户授权""" - AUTH_URL = None - """获取token""" - TOKEN_URL = None - """获取用户信息""" - API_URL = None - '''icon图标名''' - ICON_NAME = None - - def __init__(self, access_token=None, openid=None): - self.access_token = access_token - self.openid = openid - - @property - def is_access_token_set(self): - return self.access_token is not None - - @property - def is_authorized(self): - return self.is_access_token_set and self.access_token is not None and self.openid is not None - - @abstractmethod - def get_authorization_url(self, nexturl='/'): - pass - - @abstractmethod - def get_access_token_by_code(self, code): - pass - - @abstractmethod - def get_oauth_userinfo(self): - pass - - @abstractmethod - def get_picture(self, metadata): - pass - - def do_get(self, url, params, headers=None): - rsp = requests.get(url=url, params=params, headers=headers) - logger.info(rsp.text) - return rsp.text - - def do_post(self, url, params, headers=None): - rsp = requests.post(url, params, headers=headers) - logger.info(rsp.text) - return rsp.text - - def get_config(self): - value = OAuthConfig.objects.filter(type=self.ICON_NAME) - return value[0] if value else None - - -class WBOauthManager(BaseOauthManager): - AUTH_URL = 'https://api.weibo.com/oauth2/authorize' - TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' - API_URL = 'https://api.weibo.com/2/users/show.json' - ICON_NAME = 'weibo' - - def __init__(self, access_token=None, openid=None): - config = self.get_config() - self.client_id = config.appkey if config else '' - self.client_secret = config.appsecret if config else '' - self.callback_url = config.callback_url if config else '' - super( - WBOauthManager, - self).__init__( - access_token=access_token, - openid=openid) - - def get_authorization_url(self, nexturl='/'): - params = { - 'client_id': self.client_id, - 'response_type': 'code', - 'redirect_uri': self.callback_url + '&next_url=' + nexturl - } - url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) - return url - - def get_access_token_by_code(self, code): - - params = { - 'client_id': self.client_id, - 'client_secret': self.client_secret, - 'grant_type': 'authorization_code', - 'code': code, - 'redirect_uri': self.callback_url - } - rsp = self.do_post(self.TOKEN_URL, params) - - obj = json.loads(rsp) - if 'access_token' in obj: - self.access_token = str(obj['access_token']) - self.openid = str(obj['uid']) - return self.get_oauth_userinfo() - else: - raise OAuthAccessTokenException(rsp) - - def get_oauth_userinfo(self): - if not self.is_authorized: - return None - params = { - 'uid': self.openid, - 'access_token': self.access_token - } - rsp = self.do_get(self.API_URL, params) - try: - datas = json.loads(rsp) - user = OAuthUser() - user.metadata = rsp - user.picture = datas['avatar_large'] - user.nickname = datas['screen_name'] - user.openid = datas['id'] - user.type = 'weibo' - user.token = self.access_token - if 'email' in datas and datas['email']: - user.email = datas['email'] - return user - except Exception as e: - logger.error(e) - logger.error('weibo oauth error.rsp:' + rsp) - return None - - def get_picture(self, metadata): - datas = json.loads(metadata) - return datas['avatar_large'] - - -class ProxyManagerMixin: - def __init__(self, *args, **kwargs): - if os.environ.get("HTTP_PROXY"): - self.proxies = { - "http": os.environ.get("HTTP_PROXY"), - "https": os.environ.get("HTTP_PROXY") - } - else: - self.proxies = None - - def do_get(self, url, params, headers=None): - rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies) - logger.info(rsp.text) - return rsp.text - - def do_post(self, url, params, headers=None): - rsp = requests.post(url, params, headers=headers, proxies=self.proxies) - logger.info(rsp.text) - return rsp.text - - -class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): - AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' - TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' - API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' - ICON_NAME = 'google' - - def __init__(self, access_token=None, openid=None): - config = self.get_config() - self.client_id = config.appkey if config else '' - self.client_secret = config.appsecret if config else '' - self.callback_url = config.callback_url if config else '' - super( - GoogleOauthManager, - self).__init__( - access_token=access_token, - openid=openid) - - def get_authorization_url(self, nexturl='/'): - params = { - 'client_id': self.client_id, - 'response_type': 'code', - 'redirect_uri': self.callback_url, - 'scope': 'openid email', - } - url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) - return url - - def get_access_token_by_code(self, code): - params = { - 'client_id': self.client_id, - 'client_secret': self.client_secret, - 'grant_type': 'authorization_code', - 'code': code, - - 'redirect_uri': self.callback_url - } - rsp = self.do_post(self.TOKEN_URL, params) - - obj = json.loads(rsp) - - if 'access_token' in obj: - self.access_token = str(obj['access_token']) - self.openid = str(obj['id_token']) - logger.info(self.ICON_NAME + ' oauth ' + rsp) - return self.access_token - else: - raise OAuthAccessTokenException(rsp) - - def get_oauth_userinfo(self): - if not self.is_authorized: - return None - params = { - 'access_token': self.access_token - } - rsp = self.do_get(self.API_URL, params) - try: - - datas = json.loads(rsp) - user = OAuthUser() - user.metadata = rsp - user.picture = datas['picture'] - user.nickname = datas['name'] - user.openid = datas['sub'] - user.token = self.access_token - user.type = 'google' - if datas['email']: - user.email = datas['email'] - return user - except Exception as e: - logger.error(e) - logger.error('google oauth error.rsp:' + rsp) - return None - - def get_picture(self, metadata): - datas = json.loads(metadata) - return datas['picture'] - - -class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): - AUTH_URL = 'https://github.com/login/oauth/authorize' - TOKEN_URL = 'https://github.com/login/oauth/access_token' - API_URL = 'https://api.github.com/user' - ICON_NAME = 'github' - - def __init__(self, access_token=None, openid=None): - config = self.get_config() - self.client_id = config.appkey if config else '' - self.client_secret = config.appsecret if config else '' - self.callback_url = config.callback_url if config else '' - super( - GitHubOauthManager, - self).__init__( - access_token=access_token, - openid=openid) - - def get_authorization_url(self, next_url='/'): - params = { - 'client_id': self.client_id, - 'response_type': 'code', - 'redirect_uri': f'{self.callback_url}&next_url={next_url}', - 'scope': 'user' - } - url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) - return url - - def get_access_token_by_code(self, code): - params = { - 'client_id': self.client_id, - 'client_secret': self.client_secret, - 'grant_type': 'authorization_code', - 'code': code, - - 'redirect_uri': self.callback_url - } - rsp = self.do_post(self.TOKEN_URL, params) - - from urllib import parse - r = parse.parse_qs(rsp) - if 'access_token' in r: - self.access_token = (r['access_token'][0]) - return self.access_token - else: - raise OAuthAccessTokenException(rsp) - - def get_oauth_userinfo(self): - - rsp = self.do_get(self.API_URL, params={}, headers={ - "Authorization": "token " + self.access_token - }) - try: - datas = json.loads(rsp) - user = OAuthUser() - user.picture = datas['avatar_url'] - user.nickname = datas['name'] - user.openid = datas['id'] - user.type = 'github' - user.token = self.access_token - user.metadata = rsp - if 'email' in datas and datas['email']: - user.email = datas['email'] - return user - except Exception as e: - logger.error(e) - logger.error('github oauth error.rsp:' + rsp) - return None - - def get_picture(self, metadata): - datas = json.loads(metadata) - return datas['avatar_url'] - - -class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): - AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' - TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' - API_URL = 'https://graph.facebook.com/me' - ICON_NAME = 'facebook' - - def __init__(self, access_token=None, openid=None): - config = self.get_config() - self.client_id = config.appkey if config else '' - self.client_secret = config.appsecret if config else '' - self.callback_url = config.callback_url if config else '' - super( - FaceBookOauthManager, - self).__init__( - access_token=access_token, - openid=openid) - - def get_authorization_url(self, next_url='/'): - params = { - 'client_id': self.client_id, - 'response_type': 'code', - 'redirect_uri': self.callback_url, - 'scope': 'email,public_profile' - } - url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) - return url - - def get_access_token_by_code(self, code): - params = { - 'client_id': self.client_id, - 'client_secret': self.client_secret, - # 'grant_type': 'authorization_code', - 'code': code, - - 'redirect_uri': self.callback_url - } - rsp = self.do_post(self.TOKEN_URL, params) - - obj = json.loads(rsp) - if 'access_token' in obj: - token = str(obj['access_token']) - self.access_token = token - return self.access_token - else: - raise OAuthAccessTokenException(rsp) - - def get_oauth_userinfo(self): - params = { - 'access_token': self.access_token, - 'fields': 'id,name,picture,email' - } - try: - rsp = self.do_get(self.API_URL, params) - datas = json.loads(rsp) - user = OAuthUser() - user.nickname = datas['name'] - user.openid = datas['id'] - user.type = 'facebook' - user.token = self.access_token - user.metadata = rsp - if 'email' in datas and datas['email']: - user.email = datas['email'] - if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']: - user.picture = str(datas['picture']['data']['url']) - return user - except Exception as e: - logger.error(e) - return None - - def get_picture(self, metadata): - datas = json.loads(metadata) - return str(datas['picture']['data']['url']) - - -class QQOauthManager(BaseOauthManager): - AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' - TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' - API_URL = 'https://graph.qq.com/user/get_user_info' - OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' - ICON_NAME = 'qq' - - def __init__(self, access_token=None, openid=None): - config = self.get_config() - self.client_id = config.appkey if config else '' - self.client_secret = config.appsecret if config else '' - self.callback_url = config.callback_url if config else '' - super( - QQOauthManager, - self).__init__( - access_token=access_token, - openid=openid) - - def get_authorization_url(self, next_url='/'): - params = { - 'response_type': 'code', - 'client_id': self.client_id, - 'redirect_uri': self.callback_url + '&next_url=' + next_url, - } - url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) - return url - - def get_access_token_by_code(self, code): - params = { - 'grant_type': 'authorization_code', - 'client_id': self.client_id, - 'client_secret': self.client_secret, - 'code': code, - 'redirect_uri': self.callback_url - } - rsp = self.do_get(self.TOKEN_URL, params) - if rsp: - d = urllib.parse.parse_qs(rsp) - if 'access_token' in d: - token = d['access_token'] - self.access_token = token[0] - return token - else: - raise OAuthAccessTokenException(rsp) - - def get_open_id(self): - if self.is_access_token_set: - params = { - 'access_token': self.access_token - } - rsp = self.do_get(self.OPEN_ID_URL, params) - if rsp: - rsp = rsp.replace( - 'callback(', '').replace( - ')', '').replace( - ';', '') - obj = json.loads(rsp) - openid = str(obj['openid']) - self.openid = openid - return openid - - def get_oauth_userinfo(self): - openid = self.get_open_id() - if openid: - params = { - 'access_token': self.access_token, - 'oauth_consumer_key': self.client_id, - 'openid': self.openid - } - rsp = self.do_get(self.API_URL, params) - logger.info(rsp) - obj = json.loads(rsp) - user = OAuthUser() - user.nickname = obj['nickname'] - user.openid = openid - user.type = 'qq' - user.token = self.access_token - user.metadata = rsp - if 'email' in obj: - user.email = obj['email'] - if 'figureurl' in obj: - user.picture = str(obj['figureurl']) - return user - - def get_picture(self, metadata): - datas = json.loads(metadata) - return str(datas['figureurl']) - - -@cache_decorator(expiration=100 * 60) -def get_oauth_apps(): - configs = OAuthConfig.objects.filter(is_enable=True).all() - if not configs: - return [] - configtypes = [x.type for x in configs] - applications = BaseOauthManager.__subclasses__() - apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes] - return apps - - -def get_manager_by_type(type): - applications = get_oauth_apps() - if applications: - finds = list( - filter( - lambda x: x.ICON_NAME.lower() == type.lower(), - applications)) - if finds: - return finds[0] - return None