From 85347e776b04d82096221df11ad52136556afbc7 Mon Sep 17 00:00:00 2001 From: nch Date: Fri, 24 Oct 2025 18:57:43 +0800 Subject: [PATCH] =?UTF-8?q?nch=E7=9A=84=E6=B3=A8=E9=87=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DjangoBlog-master/oauth/oauthmanager.py | 196 +++++++++++------- 1 file changed, 119 insertions(+), 77 deletions(-) diff --git a/src/DjangoBlog-master(1)/DjangoBlog-master/oauth/oauthmanager.py b/src/DjangoBlog-master(1)/DjangoBlog-master/oauth/oauthmanager.py index 2e7ceef..23f4315 100644 --- a/src/DjangoBlog-master(1)/DjangoBlog-master/oauth/oauthmanager.py +++ b/src/DjangoBlog-master(1)/DjangoBlog-master/oauth/oauthmanager.py @@ -9,86 +9,115 @@ import requests from djangoblog.utils import cache_decorator from oauth.models import OAuthUser, OAuthConfig +import logging +import requests +import json +import urllib.parse +import os +from abc import ABCMeta, abstractmethod +from django.core.cache import cache +from cache_decorator import cache_decorator + +# 获取logger实例 logger = logging.getLogger(__name__) class OAuthAccessTokenException(Exception): ''' - oauth授权失败异常 + OAuth授权失败异常类 ''' class BaseOauthManager(metaclass=ABCMeta): - """获取用户授权""" + """OAuth授权管理器基类""" + + # 授权URL AUTH_URL = None - """获取token""" + # 获取token的URL TOKEN_URL = None - """获取用户信息""" + # 获取用户信息的API URL API_URL = None - '''icon图标名''' + # icon图标名 ICON_NAME = None def __init__(self, access_token=None, openid=None): + """ + 初始化OAuth管理器 + + Args: + access_token: 访问令牌 + openid: 用户唯一标识 + """ self.access_token = access_token self.openid = openid @property def is_access_token_set(self): + """检查access_token是否已设置""" return self.access_token is not None @property def is_authorized(self): + """检查是否已授权(既有access_token又有openid)""" return self.is_access_token_set and self.access_token is not None and self.openid is not None @abstractmethod def get_authorization_url(self, nexturl='/'): + """获取授权URL(抽象方法)""" pass @abstractmethod def get_access_token_by_code(self, code): + """通过授权码获取访问令牌(抽象方法)""" pass @abstractmethod def get_oauth_userinfo(self): + """获取用户信息(抽象方法)""" pass @abstractmethod def get_picture(self, metadata): + """从元数据中获取用户头像(抽象方法)""" pass def do_get(self, url, params, headers=None): + """执行GET请求""" rsp = requests.get(url=url, params=params, headers=headers) logger.info(rsp.text) return rsp.text def do_post(self, url, params, headers=None): + """执行POST请求""" rsp = requests.post(url, params, headers=headers) logger.info(rsp.text) return rsp.text def get_config(self): + """获取OAuth配置""" value = OAuthConfig.objects.filter(type=self.ICON_NAME) return value[0] if value else None class WBOauthManager(BaseOauthManager): + """微博OAuth管理器""" + + # 微博OAuth相关URL AUTH_URL = 'https://api.weibo.com/oauth2/authorize' TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' API_URL = 'https://api.weibo.com/2/users/show.json' ICON_NAME = 'weibo' def __init__(self, access_token=None, openid=None): + """初始化微博OAuth管理器""" config = self.get_config() - self.client_id = config.appkey if config else '' - self.client_secret = config.appsecret if config else '' - self.callback_url = config.callback_url if config else '' - super( - WBOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + self.client_id = config.appkey if config else '' # 应用Key + self.client_secret = config.appsecret if config else '' # 应用Secret + self.callback_url = config.callback_url if config else '' # 回调URL + super(WBOauthManager, self).__init__(access_token=access_token, openid=openid) def get_authorization_url(self, nexturl='/'): + """获取微博授权URL""" params = { 'client_id': self.client_id, 'response_type': 'code', @@ -98,7 +127,7 @@ class WBOauthManager(BaseOauthManager): return url def get_access_token_by_code(self, code): - + """通过授权码获取访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, @@ -117,6 +146,7 @@ class WBOauthManager(BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): + """获取微博用户信息""" if not self.is_authorized: return None params = { @@ -127,14 +157,14 @@ class WBOauthManager(BaseOauthManager): try: datas = json.loads(rsp) user = OAuthUser() - user.metadata = rsp - user.picture = datas['avatar_large'] - user.nickname = datas['screen_name'] - user.openid = datas['id'] - user.type = 'weibo' - user.token = self.access_token + user.metadata = rsp # 原始元数据 + user.picture = datas['avatar_large'] # 用户头像 + user.nickname = datas['screen_name'] # 用户昵称 + user.openid = datas['id'] # 用户OpenID + user.type = 'weibo' # 用户类型 + user.token = self.access_token # 访问令牌 if 'email' in datas and datas['email']: - user.email = datas['email'] + user.email = datas['email'] # 用户邮箱 return user except Exception as e: logger.error(e) @@ -142,12 +172,16 @@ class WBOauthManager(BaseOauthManager): return None def get_picture(self, metadata): + """从元数据中获取用户头像""" datas = json.loads(metadata) return datas['avatar_large'] class ProxyManagerMixin: + """代理管理器混入类,用于处理网络代理""" + def __init__(self, *args, **kwargs): + """初始化代理设置""" if os.environ.get("HTTP_PROXY"): self.proxies = { "http": os.environ.get("HTTP_PROXY"), @@ -157,50 +191,52 @@ class ProxyManagerMixin: self.proxies = None def do_get(self, url, params, headers=None): + """使用代理执行GET请求""" rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies) logger.info(rsp.text) return rsp.text def do_post(self, url, params, headers=None): + """使用代理执行POST请求""" rsp = requests.post(url, params, headers=headers, proxies=self.proxies) logger.info(rsp.text) return rsp.text class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): + """Google OAuth管理器""" + AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' ICON_NAME = 'google' def __init__(self, access_token=None, openid=None): + """初始化Google OAuth管理器""" config = self.get_config() self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' self.callback_url = config.callback_url if config else '' - super( - GoogleOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + super(GoogleOauthManager, self).__init__(access_token=access_token, openid=openid) def get_authorization_url(self, nexturl='/'): + """获取Google授权URL""" params = { 'client_id': self.client_id, 'response_type': 'code', 'redirect_uri': self.callback_url, - 'scope': 'openid email', + 'scope': 'openid email', # 请求的权限范围 } url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) return url def get_access_token_by_code(self, code): + """通过授权码获取访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'grant_type': 'authorization_code', 'code': code, - 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) @@ -216,6 +252,7 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): + """获取Google用户信息""" if not self.is_authorized: return None params = { @@ -223,17 +260,16 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): } rsp = self.do_get(self.API_URL, params) try: - datas = json.loads(rsp) user = OAuthUser() user.metadata = rsp - user.picture = datas['picture'] - user.nickname = datas['name'] - user.openid = datas['sub'] + user.picture = datas['picture'] # 用户头像 + user.nickname = datas['name'] # 用户昵称 + user.openid = datas['sub'] # 用户唯一标识 user.token = self.access_token user.type = 'google' if datas['email']: - user.email = datas['email'] + user.email = datas['email'] # 用户邮箱 return user except Exception as e: logger.error(e) @@ -241,48 +277,50 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): return None def get_picture(self, metadata): + """从元数据中获取用户头像""" datas = json.loads(metadata) return datas['picture'] class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): + """GitHub OAuth管理器""" + AUTH_URL = 'https://github.com/login/oauth/authorize' TOKEN_URL = 'https://github.com/login/oauth/access_token' API_URL = 'https://api.github.com/user' ICON_NAME = 'github' def __init__(self, access_token=None, openid=None): + """初始化GitHub OAuth管理器""" config = self.get_config() self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' self.callback_url = config.callback_url if config else '' - super( - GitHubOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + super(GitHubOauthManager, self).__init__(access_token=access_token, openid=openid) def get_authorization_url(self, next_url='/'): + """获取GitHub授权URL""" params = { 'client_id': self.client_id, 'response_type': 'code', 'redirect_uri': f'{self.callback_url}&next_url={next_url}', - 'scope': 'user' + 'scope': 'user' # 请求的用户权限 } url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) return url def get_access_token_by_code(self, code): + """通过授权码获取访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'grant_type': 'authorization_code', 'code': code, - 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) + # 解析URL编码的响应 from urllib import parse r = parse.parse_qs(rsp) if 'access_token' in r: @@ -292,21 +330,21 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): - + """获取GitHub用户信息""" rsp = self.do_get(self.API_URL, params={}, headers={ - "Authorization": "token " + self.access_token + "Authorization": "token " + self.access_token # 使用token进行认证 }) try: datas = json.loads(rsp) user = OAuthUser() - user.picture = datas['avatar_url'] - user.nickname = datas['name'] - user.openid = datas['id'] + user.picture = datas['avatar_url'] # 用户头像 + user.nickname = datas['name'] # 用户昵称 + user.openid = datas['id'] # 用户ID user.type = 'github' user.token = self.access_token user.metadata = rsp if 'email' in datas and datas['email']: - user.email = datas['email'] + user.email = datas['email'] # 用户邮箱 return user except Exception as e: logger.error(e) @@ -314,44 +352,44 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): return None def get_picture(self, metadata): + """从元数据中获取用户头像""" datas = json.loads(metadata) return datas['avatar_url'] class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): + """Facebook OAuth管理器""" + AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' API_URL = 'https://graph.facebook.com/me' ICON_NAME = 'facebook' def __init__(self, access_token=None, openid=None): + """初始化Facebook OAuth管理器""" config = self.get_config() self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' self.callback_url = config.callback_url if config else '' - super( - FaceBookOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + super(FaceBookOauthManager, self).__init__(access_token=access_token, openid=openid) def get_authorization_url(self, next_url='/'): + """获取Facebook授权URL""" params = { 'client_id': self.client_id, 'response_type': 'code', 'redirect_uri': self.callback_url, - 'scope': 'email,public_profile' + 'scope': 'email,public_profile' # 请求的权限范围 } url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) return url def get_access_token_by_code(self, code): + """通过授权码获取访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, - # 'grant_type': 'authorization_code', 'code': code, - 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) @@ -365,52 +403,54 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): + """获取Facebook用户信息""" params = { 'access_token': self.access_token, - 'fields': 'id,name,picture,email' + 'fields': 'id,name,picture,email' # 请求的用户字段 } try: rsp = self.do_get(self.API_URL, params) datas = json.loads(rsp) user = OAuthUser() - user.nickname = datas['name'] - user.openid = datas['id'] + user.nickname = datas['name'] # 用户昵称 + user.openid = datas['id'] # 用户ID user.type = 'facebook' user.token = self.access_token user.metadata = rsp if 'email' in datas and datas['email']: - user.email = datas['email'] + user.email = datas['email'] # 用户邮箱 if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']: - user.picture = str(datas['picture']['data']['url']) + user.picture = str(datas['picture']['data']['url']) # 用户头像 return user except Exception as e: logger.error(e) return None def get_picture(self, metadata): + """从元数据中获取用户头像""" datas = json.loads(metadata) return str(datas['picture']['data']['url']) class QQOauthManager(BaseOauthManager): + """QQ OAuth管理器""" + AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' API_URL = 'https://graph.qq.com/user/get_user_info' - OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' + OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' # 获取OpenID的URL ICON_NAME = 'qq' def __init__(self, access_token=None, openid=None): + """初始化QQ OAuth管理器""" config = self.get_config() self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' self.callback_url = config.callback_url if config else '' - super( - QQOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + super(QQOauthManager, self).__init__(access_token=access_token, openid=openid) def get_authorization_url(self, next_url='/'): + """获取QQ授权URL""" params = { 'response_type': 'code', 'client_id': self.client_id, @@ -420,6 +460,7 @@ class QQOauthManager(BaseOauthManager): return url def get_access_token_by_code(self, code): + """通过授权码获取访问令牌""" params = { 'grant_type': 'authorization_code', 'client_id': self.client_id, @@ -429,6 +470,7 @@ class QQOauthManager(BaseOauthManager): } rsp = self.do_get(self.TOKEN_URL, params) if rsp: + # 解析URL编码的响应 d = urllib.parse.parse_qs(rsp) if 'access_token' in d: token = d['access_token'] @@ -438,22 +480,22 @@ class QQOauthManager(BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_open_id(self): + """获取用户的OpenID""" if self.is_access_token_set: params = { 'access_token': self.access_token } rsp = self.do_get(self.OPEN_ID_URL, params) if rsp: - rsp = rsp.replace( - 'callback(', '').replace( - ')', '').replace( - ';', '') + # 清理响应格式(JSONP格式) + rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '') obj = json.loads(rsp) openid = str(obj['openid']) self.openid = openid return openid def get_oauth_userinfo(self): + """获取QQ用户信息""" openid = self.get_open_id() if openid: params = { @@ -465,24 +507,26 @@ class QQOauthManager(BaseOauthManager): logger.info(rsp) obj = json.loads(rsp) user = OAuthUser() - user.nickname = obj['nickname'] + user.nickname = obj['nickname'] # 用户昵称 user.openid = openid user.type = 'qq' user.token = self.access_token user.metadata = rsp if 'email' in obj: - user.email = obj['email'] + user.email = obj['email'] # 用户邮箱 if 'figureurl' in obj: - user.picture = str(obj['figureurl']) + user.picture = str(obj['figureurl']) # 用户头像 return user def get_picture(self, metadata): + """从元数据中获取用户头像""" datas = json.loads(metadata) return str(datas['figureurl']) @cache_decorator(expiration=100 * 60) def get_oauth_apps(): + """获取所有启用的OAuth应用(带缓存)""" configs = OAuthConfig.objects.filter(is_enable=True).all() if not configs: return [] @@ -493,12 +537,10 @@ def get_oauth_apps(): def get_manager_by_type(type): + """根据类型获取对应的OAuth管理器""" applications = get_oauth_apps() if applications: - finds = list( - filter( - lambda x: x.ICON_NAME.lower() == type.lower(), - applications)) + finds = list(filter(lambda x: x.ICON_NAME.lower() == type.lower(), applications)) if finds: return finds[0] - return None + return None \ No newline at end of file