|
|
|
|
@ -0,0 +1,442 @@
|
|
|
|
|
"""
|
|
|
|
|
OAuth认证模块
|
|
|
|
|
提供第三方登录功能,支持微信、微博、GitHub、Google、QQ、Facebook等平台
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import logging
|
|
|
|
|
import json
|
|
|
|
|
import urllib.parse
|
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
|
from django.urls import reverse
|
|
|
|
|
from django.shortcuts import get_object_or_404, render
|
|
|
|
|
from django.http import HttpResponseRedirect, HttpResponseForbidden
|
|
|
|
|
from django.contrib import auth
|
|
|
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
|
from .models import OAuthUser, OAuthConfig
|
|
|
|
|
|
|
|
|
|
# 获取日志器
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class BaseOauthManager(ABC):
|
|
|
|
|
"""OAuth认证管理器基类"""
|
|
|
|
|
|
|
|
|
|
# 授权URL和API端点
|
|
|
|
|
AUTH_URL = ""
|
|
|
|
|
TOKEN_URL = ""
|
|
|
|
|
OPEN_ID_URL = ""
|
|
|
|
|
USER_INFO_URL = ""
|
|
|
|
|
ICON_NAME = "" # 平台图标名称
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
"""初始化OAuth管理器"""
|
|
|
|
|
self.access_token = None
|
|
|
|
|
self.openid = None
|
|
|
|
|
self.client_id = None
|
|
|
|
|
self.client_secret = None
|
|
|
|
|
self.callback_url = None
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def get_authorization_url(self, next_url='/'):
|
|
|
|
|
"""获取授权URL"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def get_access_token_by_code(self, code):
|
|
|
|
|
"""通过授权码获取访问令牌"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
@abstractmethod
|
|
|
|
|
def get_oauth_userinfo(self):
|
|
|
|
|
"""获取用户信息"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def get_picture(self, metadata):
|
|
|
|
|
"""获取用户头像(可选实现)"""
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
def do_get(self, url, params, headers=None):
|
|
|
|
|
"""
|
|
|
|
|
执行GET请求
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
url: 请求URL
|
|
|
|
|
params: 请求参数
|
|
|
|
|
headers: 请求头
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: 响应内容
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
response = requests.get(url=url, params=params, headers=headers)
|
|
|
|
|
logger.info(f"GET Response: {response.text}")
|
|
|
|
|
return response.text
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"GET request failed: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
def do_post(self, url, params, headers=None):
|
|
|
|
|
"""
|
|
|
|
|
执行POST请求
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
url: 请求URL
|
|
|
|
|
params: 请求参数
|
|
|
|
|
headers: 请求头
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: 响应内容
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
response = requests.post(url, data=params, headers=headers)
|
|
|
|
|
logger.info(f"POST Response: {response.text}")
|
|
|
|
|
return response.text
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"POST request failed: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def is_access_token_set(self):
|
|
|
|
|
"""检查访问令牌是否已设置"""
|
|
|
|
|
return self.access_token is not None
|
|
|
|
|
|
|
|
|
|
def get_config(self):
|
|
|
|
|
"""获取OAuth配置"""
|
|
|
|
|
try:
|
|
|
|
|
config = OAuthConfig.objects.filter(
|
|
|
|
|
type=self.ICON_NAME.lower(),
|
|
|
|
|
is_enable=True
|
|
|
|
|
).first()
|
|
|
|
|
if config:
|
|
|
|
|
self.client_id = config.appkey
|
|
|
|
|
self.client_secret = config.appsecret
|
|
|
|
|
self.callback_url = config.callback_url
|
|
|
|
|
return config
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Get OAuth config failed: {e}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WeiboOauthManager(BaseOauthManager):
|
|
|
|
|
"""微博OAuth认证管理器"""
|
|
|
|
|
|
|
|
|
|
AUTH_URL = "https://api.weibo.com/oauth2/authorize"
|
|
|
|
|
TOKEN_URL = "https://api.weibo.com/oauth2/access_token"
|
|
|
|
|
USER_INFO_URL = "https://api.weibo.com/2/users/show.json"
|
|
|
|
|
ICON_NAME = "weibo"
|
|
|
|
|
|
|
|
|
|
def get_authorization_url(self, next_url='/'):
|
|
|
|
|
"""
|
|
|
|
|
获取微博授权URL
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
next_url: 授权成功后跳转的URL
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
str: 完整的授权URL
|
|
|
|
|
"""
|
|
|
|
|
if not self.get_config():
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'response_type': 'code',
|
|
|
|
|
'redirect_uri': self.callback_url + '&next_url=' + next_url,
|
|
|
|
|
}
|
|
|
|
|
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
|
def get_access_token_by_code(self, code):
|
|
|
|
|
"""
|
|
|
|
|
通过授权码获取访问令牌
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
code: 授权码
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
tuple: (access_token, 响应数据)
|
|
|
|
|
"""
|
|
|
|
|
params = {
|
|
|
|
|
'grant_type': 'authorization_code',
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'client_secret': self.client_secret,
|
|
|
|
|
'code': code,
|
|
|
|
|
'redirect_uri': self.callback_url
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
response_text = self.do_post(self.TOKEN_URL, params)
|
|
|
|
|
token_data = json.loads(response_text)
|
|
|
|
|
|
|
|
|
|
if 'access_token' in token_data:
|
|
|
|
|
self.access_token = token_data['access_token']
|
|
|
|
|
return self.access_token, token_data
|
|
|
|
|
else:
|
|
|
|
|
logger.error(f"Failed to get access token: {token_data}")
|
|
|
|
|
raise OAuthAccessTokenException(token_data)
|
|
|
|
|
|
|
|
|
|
except json.JSONDecodeError as e:
|
|
|
|
|
logger.error(f"JSON decode error: {e}")
|
|
|
|
|
raise OAuthAccessTokenException("Invalid response format")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Get access token failed: {e}")
|
|
|
|
|
raise OAuthAccessTokenException(str(e))
|
|
|
|
|
|
|
|
|
|
def get_oauth_userinfo(self):
|
|
|
|
|
"""
|
|
|
|
|
获取微博用户信息
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
OAuthUser: 用户信息对象
|
|
|
|
|
"""
|
|
|
|
|
if not self.is_access_token_set:
|
|
|
|
|
raise ValueError("Access token not set")
|
|
|
|
|
|
|
|
|
|
params = {'access_token': self.access_token}
|
|
|
|
|
response_text = self.do_get(self.USER_INFO_URL, params)
|
|
|
|
|
user_data = json.loads(response_text)
|
|
|
|
|
|
|
|
|
|
# 创建OAuth用户对象
|
|
|
|
|
oauth_user = OAuthUser()
|
|
|
|
|
oauth_user.nickname = user_data.get('screen_name', '')
|
|
|
|
|
oauth_user.picture = user_data.get('profile_image_url', '')
|
|
|
|
|
oauth_user.token = self.access_token
|
|
|
|
|
oauth_user.type = 'weibo'
|
|
|
|
|
oauth_user.email = user_data.get('email', '')
|
|
|
|
|
oauth_user.metadata = json.dumps(user_data)
|
|
|
|
|
|
|
|
|
|
return oauth_user
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class GitHubOauthManager(BaseOauthManager):
|
|
|
|
|
"""GitHub OAuth认证管理器"""
|
|
|
|
|
|
|
|
|
|
AUTH_URL = "https://github.com/login/oauth/authorize"
|
|
|
|
|
TOKEN_URL = "https://github.com/login/oauth/access_token"
|
|
|
|
|
USER_INFO_URL = "https://api.github.com/user"
|
|
|
|
|
ICON_NAME = "github"
|
|
|
|
|
|
|
|
|
|
def get_authorization_url(self, next_url='/'):
|
|
|
|
|
"""获取GitHub授权URL"""
|
|
|
|
|
if not self.get_config():
|
|
|
|
|
return ""
|
|
|
|
|
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'scope': 'user:email',
|
|
|
|
|
'redirect_uri': self.callback_url + '&next_url=' + next_url,
|
|
|
|
|
}
|
|
|
|
|
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
|
|
|
|
|
return url
|
|
|
|
|
|
|
|
|
|
def get_access_token_by_code(self, code):
|
|
|
|
|
"""通过授权码获取GitHub访问令牌"""
|
|
|
|
|
params = {
|
|
|
|
|
'client_id': self.client_id,
|
|
|
|
|
'client_secret': self.client_secret,
|
|
|
|
|
'code': code,
|
|
|
|
|
}
|
|
|
|
|
headers = {'Accept': 'application/json'}
|
|
|
|
|
|
|
|
|
|
response_text = self.do_post(self.TOKEN_URL, params, headers)
|
|
|
|
|
token_data = json.loads(response_text)
|
|
|
|
|
|
|
|
|
|
if 'access_token' in token_data:
|
|
|
|
|
self.access_token = token_data['access_token']
|
|
|
|
|
return self.access_token, token_data
|
|
|
|
|
else:
|
|
|
|
|
raise OAuthAccessTokenException(token_data)
|
|
|
|
|
|
|
|
|
|
def get_oauth_userinfo(self):
|
|
|
|
|
"""获取GitHub用户信息"""
|
|
|
|
|
if not self.is_access_token_set:
|
|
|
|
|
raise ValueError("Access token not set")
|
|
|
|
|
|
|
|
|
|
headers = {'Authorization': f'token {self.access_token}'}
|
|
|
|
|
response_text = self.do_get(self.USER_INFO_URL, headers=headers)
|
|
|
|
|
user_data = json.loads(response_text)
|
|
|
|
|
|
|
|
|
|
oauth_user = OAuthUser()
|
|
|
|
|
oauth_user.nickname = user_data.get('login', '')
|
|
|
|
|
oauth_user.picture = user_data.get('avatar_url', '')
|
|
|
|
|
oauth_user.token = self.access_token
|
|
|
|
|
oauth_user.type = 'github'
|
|
|
|
|
oauth_user.email = user_data.get('email', '')
|
|
|
|
|
oauth_user.metadata = json.dumps(user_data)
|
|
|
|
|
|
|
|
|
|
return oauth_user
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# OAuth异常类
|
|
|
|
|
class OAuthException(Exception):
|
|
|
|
|
"""OAuth异常基类"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class OAuthAccessTokenException(OAuthException):
|
|
|
|
|
"""访问令牌获取异常"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class OAuthUserInfoException(OAuthException):
|
|
|
|
|
"""用户信息获取异常"""
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 工具函数
|
|
|
|
|
def get_oauth_apps():
|
|
|
|
|
"""
|
|
|
|
|
获取所有可用的OAuth应用
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
list: OAuth管理器实例列表
|
|
|
|
|
"""
|
|
|
|
|
config_types = OAuthConfig.objects.filter(
|
|
|
|
|
is_enable=True
|
|
|
|
|
).values_list('type', flat=True)
|
|
|
|
|
|
|
|
|
|
applications = BaseOauthManager.__subclasses__()
|
|
|
|
|
apps = [
|
|
|
|
|
app() for app in applications
|
|
|
|
|
if app().ICON_NAME.lower() in config_types
|
|
|
|
|
]
|
|
|
|
|
return apps
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_manager_by_type(oauth_type):
|
|
|
|
|
"""
|
|
|
|
|
根据类型获取OAuth管理器
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
oauth_type: OAuth类型(weibo、github等)
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
BaseOauthManager: OAuth管理器实例
|
|
|
|
|
"""
|
|
|
|
|
applications = get_oauth_apps()
|
|
|
|
|
if applications:
|
|
|
|
|
finds = list(filter(
|
|
|
|
|
lambda x: x.ICON_NAME.lower() == oauth_type.lower(),
|
|
|
|
|
applications
|
|
|
|
|
))
|
|
|
|
|
return finds[0] if finds else None
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 视图函数
|
|
|
|
|
def oauth_login(request):
|
|
|
|
|
"""
|
|
|
|
|
OAuth登录入口
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
request: HTTP请求对象
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
HttpResponseRedirect: 重定向到授权页面或首页
|
|
|
|
|
"""
|
|
|
|
|
oauth_type = request.GET.get('type') # 修复:避免使用内置函数名
|
|
|
|
|
if not oauth_type:
|
|
|
|
|
return HttpResponseRedirect('/')
|
|
|
|
|
|
|
|
|
|
manager = get_manager_by_type(oauth_type)
|
|
|
|
|
if not manager:
|
|
|
|
|
return HttpResponseRedirect('/')
|
|
|
|
|
|
|
|
|
|
next_url = request.GET.get('next_url', '/')
|
|
|
|
|
authorize_url = manager.get_authorization_url(next_url)
|
|
|
|
|
return HttpResponseRedirect(authorize_url)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def oauth_authorize(request):
|
|
|
|
|
"""
|
|
|
|
|
OAuth授权回调处理
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
request: HTTP请求对象
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
HttpResponse: 授权结果页面或重定向
|
|
|
|
|
"""
|
|
|
|
|
oauth_type = request.GET.get('type') # 修复:避免使用内置函数名
|
|
|
|
|
code = request.GET.get('code')
|
|
|
|
|
next_url = request.GET.get('next_url', '/')
|
|
|
|
|
|
|
|
|
|
if not oauth_type or not code:
|
|
|
|
|
return HttpResponseRedirect('/')
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
manager = get_manager_by_type(oauth_type)
|
|
|
|
|
if not manager:
|
|
|
|
|
return HttpResponseRedirect('/')
|
|
|
|
|
|
|
|
|
|
# 获取访问令牌
|
|
|
|
|
manager.get_access_token_by_code(code)
|
|
|
|
|
|
|
|
|
|
# 获取用户信息
|
|
|
|
|
oauth_user = manager.get_oauth_userinfo()
|
|
|
|
|
|
|
|
|
|
# 处理用户登录或绑定
|
|
|
|
|
user = process_oauth_user(oauth_user, request)
|
|
|
|
|
if user:
|
|
|
|
|
auth.login(request, user)
|
|
|
|
|
return HttpResponseRedirect(next_url)
|
|
|
|
|
else:
|
|
|
|
|
# 转到绑定页面
|
|
|
|
|
return redirect_to_bind_page(oauth_user, request)
|
|
|
|
|
|
|
|
|
|
except OAuthException as e:
|
|
|
|
|
logger.error(f"OAuth authorization failed: {e}")
|
|
|
|
|
return render(request, 'oauth/error.html', {
|
|
|
|
|
'error_message': _('OAuth authentication failed')
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def process_oauth_user(oauth_user, request):
|
|
|
|
|
"""
|
|
|
|
|
处理OAuth用户信息
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
oauth_user: OAuth用户对象
|
|
|
|
|
request: HTTP请求对象
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
User: 认证用户对象或None
|
|
|
|
|
"""
|
|
|
|
|
try:
|
|
|
|
|
# 查找已存在的OAuth用户
|
|
|
|
|
existing_oauth_user = OAuthUser.objects.filter(
|
|
|
|
|
type=oauth_user.type,
|
|
|
|
|
openid=oauth_user.openid
|
|
|
|
|
).first()
|
|
|
|
|
|
|
|
|
|
if existing_oauth_user:
|
|
|
|
|
# 已绑定用户,直接登录
|
|
|
|
|
if existing_oauth_user.author:
|
|
|
|
|
return existing_oauth_user.author
|
|
|
|
|
else:
|
|
|
|
|
# 未绑定用户,转到绑定页面
|
|
|
|
|
return None
|
|
|
|
|
else:
|
|
|
|
|
# 新用户,保存OAuth信息并转到绑定页面
|
|
|
|
|
oauth_user.save()
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.error(f"Process OAuth user failed: {e}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def redirect_to_bind_page(oauth_user, request):
|
|
|
|
|
"""
|
|
|
|
|
重定向到用户绑定页面
|
|
|
|
|
|
|
|
|
|
Args:
|
|
|
|
|
oauth_user: OAuth用户对象
|
|
|
|
|
request: HTTP请求对象
|
|
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
|
HttpResponseRedirect: 重定向到绑定页面
|
|
|
|
|
"""
|
|
|
|
|
# 生成绑定URL
|
|
|
|
|
bind_url = reverse('oauth:bind') + f'?oauth_id={oauth_user.id}'
|
|
|
|
|
return HttpResponseRedirect(bind_url)
|