“3063176711@qq.com” 3 months ago
commit afaac59a88

@ -0,0 +1,442 @@
"""
OAuth认证模块
提供第三方登录功能支持微信微博GitHubGoogleQQFacebook等平台
"""
import logging
import json
import urllib.parse
from abc import ABC, abstractmethod
from django.urls import reverse
from django.shortcuts import get_object_or_404, render
from django.http import HttpResponseRedirect, HttpResponseForbidden
from django.contrib import auth
from django.utils.translation import gettext_lazy as _
from .models import OAuthUser, OAuthConfig
# 获取日志器
logger = logging.getLogger(__name__)
class BaseOauthManager(ABC):
"""OAuth认证管理器基类"""
# 授权URL和API端点
AUTH_URL = ""
TOKEN_URL = ""
OPEN_ID_URL = ""
USER_INFO_URL = ""
ICON_NAME = "" # 平台图标名称
def __init__(self):
"""初始化OAuth管理器"""
self.access_token = None
self.openid = None
self.client_id = None
self.client_secret = None
self.callback_url = None
@abstractmethod
def get_authorization_url(self, next_url='/'):
"""获取授权URL"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""通过授权码获取访问令牌"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""获取用户信息"""
pass
def get_picture(self, metadata):
"""获取用户头像(可选实现)"""
return ""
def do_get(self, url, params, headers=None):
"""
执行GET请求
Args:
url: 请求URL
params: 请求参数
headers: 请求头
Returns:
str: 响应内容
"""
try:
response = requests.get(url=url, params=params, headers=headers)
logger.info(f"GET Response: {response.text}")
return response.text
except Exception as e:
logger.error(f"GET request failed: {e}")
raise
def do_post(self, url, params, headers=None):
"""
执行POST请求
Args:
url: 请求URL
params: 请求参数
headers: 请求头
Returns:
str: 响应内容
"""
try:
response = requests.post(url, data=params, headers=headers)
logger.info(f"POST Response: {response.text}")
return response.text
except Exception as e:
logger.error(f"POST request failed: {e}")
raise
@property
def is_access_token_set(self):
"""检查访问令牌是否已设置"""
return self.access_token is not None
def get_config(self):
"""获取OAuth配置"""
try:
config = OAuthConfig.objects.filter(
type=self.ICON_NAME.lower(),
is_enable=True
).first()
if config:
self.client_id = config.appkey
self.client_secret = config.appsecret
self.callback_url = config.callback_url
return config
except Exception as e:
logger.error(f"Get OAuth config failed: {e}")
return None
class WeiboOauthManager(BaseOauthManager):
"""微博OAuth认证管理器"""
AUTH_URL = "https://api.weibo.com/oauth2/authorize"
TOKEN_URL = "https://api.weibo.com/oauth2/access_token"
USER_INFO_URL = "https://api.weibo.com/2/users/show.json"
ICON_NAME = "weibo"
def get_authorization_url(self, next_url='/'):
"""
获取微博授权URL
Args:
next_url: 授权成功后跳转的URL
Returns:
str: 完整的授权URL
"""
if not self.get_config():
return ""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""
通过授权码获取访问令牌
Args:
code: 授权码
Returns:
tuple: (access_token, 响应数据)
"""
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
try:
response_text = self.do_post(self.TOKEN_URL, params)
token_data = json.loads(response_text)
if 'access_token' in token_data:
self.access_token = token_data['access_token']
return self.access_token, token_data
else:
logger.error(f"Failed to get access token: {token_data}")
raise OAuthAccessTokenException(token_data)
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e}")
raise OAuthAccessTokenException("Invalid response format")
except Exception as e:
logger.error(f"Get access token failed: {e}")
raise OAuthAccessTokenException(str(e))
def get_oauth_userinfo(self):
"""
获取微博用户信息
Returns:
OAuthUser: 用户信息对象
"""
if not self.is_access_token_set:
raise ValueError("Access token not set")
params = {'access_token': self.access_token}
response_text = self.do_get(self.USER_INFO_URL, params)
user_data = json.loads(response_text)
# 创建OAuth用户对象
oauth_user = OAuthUser()
oauth_user.nickname = user_data.get('screen_name', '')
oauth_user.picture = user_data.get('profile_image_url', '')
oauth_user.token = self.access_token
oauth_user.type = 'weibo'
oauth_user.email = user_data.get('email', '')
oauth_user.metadata = json.dumps(user_data)
return oauth_user
class GitHubOauthManager(BaseOauthManager):
"""GitHub OAuth认证管理器"""
AUTH_URL = "https://github.com/login/oauth/authorize"
TOKEN_URL = "https://github.com/login/oauth/access_token"
USER_INFO_URL = "https://api.github.com/user"
ICON_NAME = "github"
def get_authorization_url(self, next_url='/'):
"""获取GitHub授权URL"""
if not self.get_config():
return ""
params = {
'client_id': self.client_id,
'scope': 'user:email',
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取GitHub访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
}
headers = {'Accept': 'application/json'}
response_text = self.do_post(self.TOKEN_URL, params, headers)
token_data = json.loads(response_text)
if 'access_token' in token_data:
self.access_token = token_data['access_token']
return self.access_token, token_data
else:
raise OAuthAccessTokenException(token_data)
def get_oauth_userinfo(self):
"""获取GitHub用户信息"""
if not self.is_access_token_set:
raise ValueError("Access token not set")
headers = {'Authorization': f'token {self.access_token}'}
response_text = self.do_get(self.USER_INFO_URL, headers=headers)
user_data = json.loads(response_text)
oauth_user = OAuthUser()
oauth_user.nickname = user_data.get('login', '')
oauth_user.picture = user_data.get('avatar_url', '')
oauth_user.token = self.access_token
oauth_user.type = 'github'
oauth_user.email = user_data.get('email', '')
oauth_user.metadata = json.dumps(user_data)
return oauth_user
# OAuth异常类
class OAuthException(Exception):
"""OAuth异常基类"""
pass
class OAuthAccessTokenException(OAuthException):
"""访问令牌获取异常"""
pass
class OAuthUserInfoException(OAuthException):
"""用户信息获取异常"""
pass
# 工具函数
def get_oauth_apps():
"""
获取所有可用的OAuth应用
Returns:
list: OAuth管理器实例列表
"""
config_types = OAuthConfig.objects.filter(
is_enable=True
).values_list('type', flat=True)
applications = BaseOauthManager.__subclasses__()
apps = [
app() for app in applications
if app().ICON_NAME.lower() in config_types
]
return apps
def get_manager_by_type(oauth_type):
"""
根据类型获取OAuth管理器
Args:
oauth_type: OAuth类型weibogithub等
Returns:
BaseOauthManager: OAuth管理器实例
"""
applications = get_oauth_apps()
if applications:
finds = list(filter(
lambda x: x.ICON_NAME.lower() == oauth_type.lower(),
applications
))
return finds[0] if finds else None
return None
# 视图函数
def oauth_login(request):
"""
OAuth登录入口
Args:
request: HTTP请求对象
Returns:
HttpResponseRedirect: 重定向到授权页面或首页
"""
oauth_type = request.GET.get('type') # 修复:避免使用内置函数名
if not oauth_type:
return HttpResponseRedirect('/')
manager = get_manager_by_type(oauth_type)
if not manager:
return HttpResponseRedirect('/')
next_url = request.GET.get('next_url', '/')
authorize_url = manager.get_authorization_url(next_url)
return HttpResponseRedirect(authorize_url)
def oauth_authorize(request):
"""
OAuth授权回调处理
Args:
request: HTTP请求对象
Returns:
HttpResponse: 授权结果页面或重定向
"""
oauth_type = request.GET.get('type') # 修复:避免使用内置函数名
code = request.GET.get('code')
next_url = request.GET.get('next_url', '/')
if not oauth_type or not code:
return HttpResponseRedirect('/')
try:
manager = get_manager_by_type(oauth_type)
if not manager:
return HttpResponseRedirect('/')
# 获取访问令牌
manager.get_access_token_by_code(code)
# 获取用户信息
oauth_user = manager.get_oauth_userinfo()
# 处理用户登录或绑定
user = process_oauth_user(oauth_user, request)
if user:
auth.login(request, user)
return HttpResponseRedirect(next_url)
else:
# 转到绑定页面
return redirect_to_bind_page(oauth_user, request)
except OAuthException as e:
logger.error(f"OAuth authorization failed: {e}")
return render(request, 'oauth/error.html', {
'error_message': _('OAuth authentication failed')
})
def process_oauth_user(oauth_user, request):
"""
处理OAuth用户信息
Args:
oauth_user: OAuth用户对象
request: HTTP请求对象
Returns:
User: 认证用户对象或None
"""
try:
# 查找已存在的OAuth用户
existing_oauth_user = OAuthUser.objects.filter(
type=oauth_user.type,
openid=oauth_user.openid
).first()
if existing_oauth_user:
# 已绑定用户,直接登录
if existing_oauth_user.author:
return existing_oauth_user.author
else:
# 未绑定用户,转到绑定页面
return None
else:
# 新用户保存OAuth信息并转到绑定页面
oauth_user.save()
return None
except Exception as e:
logger.error(f"Process OAuth user failed: {e}")
return None
def redirect_to_bind_page(oauth_user, request):
"""
重定向到用户绑定页面
Args:
oauth_user: OAuth用户对象
request: HTTP请求对象
Returns:
HttpResponseRedirect: 重定向到绑定页面
"""
# 生成绑定URL
bind_url = reverse('oauth:bind') + f'?oauth_id={oauth_user.id}'
return HttpResponseRedirect(bind_url)
Loading…
Cancel
Save