From f7b8f511a16f53a28ce7a6d900d633f4622b1eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9F=8F=E7=92=90?= <3217621994@qq.com> Date: Sun, 9 Nov 2025 23:43:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0oauth=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E6=B3=A8=E9=87=8A=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/oauth代码注释.py | 442 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 442 insertions(+) create mode 100644 src/oauth代码注释.py diff --git a/src/oauth代码注释.py b/src/oauth代码注释.py new file mode 100644 index 0000000..d3ec8ff --- /dev/null +++ b/src/oauth代码注释.py @@ -0,0 +1,442 @@ +""" +OAuth认证模块 +提供第三方登录功能,支持微信、微博、GitHub、Google、QQ、Facebook等平台 +""" + +import logging +import json +import urllib.parse +from abc import ABC, abstractmethod +from django.urls import reverse +from django.shortcuts import get_object_or_404, render +from django.http import HttpResponseRedirect, HttpResponseForbidden +from django.contrib import auth +from django.utils.translation import gettext_lazy as _ +from .models import OAuthUser, OAuthConfig + +# 获取日志器 +logger = logging.getLogger(__name__) + + +class BaseOauthManager(ABC): + """OAuth认证管理器基类""" + + # 授权URL和API端点 + AUTH_URL = "" + TOKEN_URL = "" + OPEN_ID_URL = "" + USER_INFO_URL = "" + ICON_NAME = "" # 平台图标名称 + + def __init__(self): + """初始化OAuth管理器""" + self.access_token = None + self.openid = None + self.client_id = None + self.client_secret = None + self.callback_url = None + + @abstractmethod + def get_authorization_url(self, next_url='/'): + """获取授权URL""" + pass + + @abstractmethod + def get_access_token_by_code(self, code): + """通过授权码获取访问令牌""" + pass + + @abstractmethod + def get_oauth_userinfo(self): + """获取用户信息""" + pass + + def get_picture(self, metadata): + """获取用户头像(可选实现)""" + return "" + + def do_get(self, url, params, headers=None): + """ + 执行GET请求 + + Args: + url: 请求URL + params: 请求参数 + headers: 请求头 + + Returns: + str: 响应内容 + """ + try: + response = requests.get(url=url, params=params, headers=headers) + logger.info(f"GET Response: {response.text}") + return response.text + except Exception as e: + logger.error(f"GET request failed: {e}") + raise + + def do_post(self, url, params, headers=None): + """ + 执行POST请求 + + Args: + url: 请求URL + params: 请求参数 + headers: 请求头 + + Returns: + str: 响应内容 + """ + try: + response = requests.post(url, data=params, headers=headers) + logger.info(f"POST Response: {response.text}") + return response.text + except Exception as e: + logger.error(f"POST request failed: {e}") + raise + + @property + def is_access_token_set(self): + """检查访问令牌是否已设置""" + return self.access_token is not None + + def get_config(self): + """获取OAuth配置""" + try: + config = OAuthConfig.objects.filter( + type=self.ICON_NAME.lower(), + is_enable=True + ).first() + if config: + self.client_id = config.appkey + self.client_secret = config.appsecret + self.callback_url = config.callback_url + return config + except Exception as e: + logger.error(f"Get OAuth config failed: {e}") + return None + + +class WeiboOauthManager(BaseOauthManager): + """微博OAuth认证管理器""" + + AUTH_URL = "https://api.weibo.com/oauth2/authorize" + TOKEN_URL = "https://api.weibo.com/oauth2/access_token" + USER_INFO_URL = "https://api.weibo.com/2/users/show.json" + ICON_NAME = "weibo" + + def get_authorization_url(self, next_url='/'): + """ + 获取微博授权URL + + Args: + next_url: 授权成功后跳转的URL + + Returns: + str: 完整的授权URL + """ + if not self.get_config(): + return "" + + params = { + 'client_id': self.client_id, + 'response_type': 'code', + 'redirect_uri': self.callback_url + '&next_url=' + next_url, + } + url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) + return url + + def get_access_token_by_code(self, code): + """ + 通过授权码获取访问令牌 + + Args: + code: 授权码 + + Returns: + tuple: (access_token, 响应数据) + """ + params = { + 'grant_type': 'authorization_code', + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'code': code, + 'redirect_uri': self.callback_url + } + + try: + response_text = self.do_post(self.TOKEN_URL, params) + token_data = json.loads(response_text) + + if 'access_token' in token_data: + self.access_token = token_data['access_token'] + return self.access_token, token_data + else: + logger.error(f"Failed to get access token: {token_data}") + raise OAuthAccessTokenException(token_data) + + except json.JSONDecodeError as e: + logger.error(f"JSON decode error: {e}") + raise OAuthAccessTokenException("Invalid response format") + except Exception as e: + logger.error(f"Get access token failed: {e}") + raise OAuthAccessTokenException(str(e)) + + def get_oauth_userinfo(self): + """ + 获取微博用户信息 + + Returns: + OAuthUser: 用户信息对象 + """ + if not self.is_access_token_set: + raise ValueError("Access token not set") + + params = {'access_token': self.access_token} + response_text = self.do_get(self.USER_INFO_URL, params) + user_data = json.loads(response_text) + + # 创建OAuth用户对象 + oauth_user = OAuthUser() + oauth_user.nickname = user_data.get('screen_name', '') + oauth_user.picture = user_data.get('profile_image_url', '') + oauth_user.token = self.access_token + oauth_user.type = 'weibo' + oauth_user.email = user_data.get('email', '') + oauth_user.metadata = json.dumps(user_data) + + return oauth_user + + +class GitHubOauthManager(BaseOauthManager): + """GitHub OAuth认证管理器""" + + AUTH_URL = "https://github.com/login/oauth/authorize" + TOKEN_URL = "https://github.com/login/oauth/access_token" + USER_INFO_URL = "https://api.github.com/user" + ICON_NAME = "github" + + def get_authorization_url(self, next_url='/'): + """获取GitHub授权URL""" + if not self.get_config(): + return "" + + params = { + 'client_id': self.client_id, + 'scope': 'user:email', + 'redirect_uri': self.callback_url + '&next_url=' + next_url, + } + url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) + return url + + def get_access_token_by_code(self, code): + """通过授权码获取GitHub访问令牌""" + params = { + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'code': code, + } + headers = {'Accept': 'application/json'} + + response_text = self.do_post(self.TOKEN_URL, params, headers) + token_data = json.loads(response_text) + + if 'access_token' in token_data: + self.access_token = token_data['access_token'] + return self.access_token, token_data + else: + raise OAuthAccessTokenException(token_data) + + def get_oauth_userinfo(self): + """获取GitHub用户信息""" + if not self.is_access_token_set: + raise ValueError("Access token not set") + + headers = {'Authorization': f'token {self.access_token}'} + response_text = self.do_get(self.USER_INFO_URL, headers=headers) + user_data = json.loads(response_text) + + oauth_user = OAuthUser() + oauth_user.nickname = user_data.get('login', '') + oauth_user.picture = user_data.get('avatar_url', '') + oauth_user.token = self.access_token + oauth_user.type = 'github' + oauth_user.email = user_data.get('email', '') + oauth_user.metadata = json.dumps(user_data) + + return oauth_user + + +# OAuth异常类 +class OAuthException(Exception): + """OAuth异常基类""" + pass + + +class OAuthAccessTokenException(OAuthException): + """访问令牌获取异常""" + pass + + +class OAuthUserInfoException(OAuthException): + """用户信息获取异常""" + pass + + +# 工具函数 +def get_oauth_apps(): + """ + 获取所有可用的OAuth应用 + + Returns: + list: OAuth管理器实例列表 + """ + config_types = OAuthConfig.objects.filter( + is_enable=True + ).values_list('type', flat=True) + + applications = BaseOauthManager.__subclasses__() + apps = [ + app() for app in applications + if app().ICON_NAME.lower() in config_types + ] + return apps + + +def get_manager_by_type(oauth_type): + """ + 根据类型获取OAuth管理器 + + Args: + oauth_type: OAuth类型(weibo、github等) + + Returns: + BaseOauthManager: OAuth管理器实例 + """ + applications = get_oauth_apps() + if applications: + finds = list(filter( + lambda x: x.ICON_NAME.lower() == oauth_type.lower(), + applications + )) + return finds[0] if finds else None + return None + + +# 视图函数 +def oauth_login(request): + """ + OAuth登录入口 + + Args: + request: HTTP请求对象 + + Returns: + HttpResponseRedirect: 重定向到授权页面或首页 + """ + oauth_type = request.GET.get('type') # 修复:避免使用内置函数名 + if not oauth_type: + return HttpResponseRedirect('/') + + manager = get_manager_by_type(oauth_type) + if not manager: + return HttpResponseRedirect('/') + + next_url = request.GET.get('next_url', '/') + authorize_url = manager.get_authorization_url(next_url) + return HttpResponseRedirect(authorize_url) + + +def oauth_authorize(request): + """ + OAuth授权回调处理 + + Args: + request: HTTP请求对象 + + Returns: + HttpResponse: 授权结果页面或重定向 + """ + oauth_type = request.GET.get('type') # 修复:避免使用内置函数名 + code = request.GET.get('code') + next_url = request.GET.get('next_url', '/') + + if not oauth_type or not code: + return HttpResponseRedirect('/') + + try: + manager = get_manager_by_type(oauth_type) + if not manager: + return HttpResponseRedirect('/') + + # 获取访问令牌 + manager.get_access_token_by_code(code) + + # 获取用户信息 + oauth_user = manager.get_oauth_userinfo() + + # 处理用户登录或绑定 + user = process_oauth_user(oauth_user, request) + if user: + auth.login(request, user) + return HttpResponseRedirect(next_url) + else: + # 转到绑定页面 + return redirect_to_bind_page(oauth_user, request) + + except OAuthException as e: + logger.error(f"OAuth authorization failed: {e}") + return render(request, 'oauth/error.html', { + 'error_message': _('OAuth authentication failed') + }) + + +def process_oauth_user(oauth_user, request): + """ + 处理OAuth用户信息 + + Args: + oauth_user: OAuth用户对象 + request: HTTP请求对象 + + Returns: + User: 认证用户对象或None + """ + try: + # 查找已存在的OAuth用户 + existing_oauth_user = OAuthUser.objects.filter( + type=oauth_user.type, + openid=oauth_user.openid + ).first() + + if existing_oauth_user: + # 已绑定用户,直接登录 + if existing_oauth_user.author: + return existing_oauth_user.author + else: + # 未绑定用户,转到绑定页面 + return None + else: + # 新用户,保存OAuth信息并转到绑定页面 + oauth_user.save() + return None + + except Exception as e: + logger.error(f"Process OAuth user failed: {e}") + return None + + +def redirect_to_bind_page(oauth_user, request): + """ + 重定向到用户绑定页面 + + Args: + oauth_user: OAuth用户对象 + request: HTTP请求对象 + + Returns: + HttpResponseRedirect: 重定向到绑定页面 + """ + # 生成绑定URL + bind_url = reverse('oauth:bind') + f'?oauth_id={oauth_user.id}' + return HttpResponseRedirect(bind_url) \ No newline at end of file