updata forth

flj_branch
flj666 4 months ago
parent 782ad7f7b7
commit 8f09715ed3

@ -1,4 +1,4 @@
# 这个文件定义了用户相关的数据模型
#flj 这个文件定义了用户相关的数据模型
from django.contrib.auth.models import AbstractUser
from django.db import models
from django.urls import reverse
@ -9,27 +9,28 @@ from djangoblog.utils import get_current_site
# Create your models here.
#zxm 自定义用户模型扩展了Django的默认用户模型
class BlogUser(AbstractUser):
# 用户昵称
#zxm 用户昵称
nickname = models.CharField(_('nick name'), max_length=100, blank=True)
# 创建时间
#zxm 创建时间
creation_time = models.DateTimeField(_('creation time'), default=now)
# 最后修改时间
#zxm 最后修改时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 来源
#zxm 来源
source = models.CharField(_('create source'), max_length=100, blank=True)
# 获取用户详情页的url
#zxm 获取用户详情页的url
def get_absolute_url(self):
return reverse(
'blog:author_detail', kwargs={
'author_name': self.username})
# 返回邮箱作为用户标识
#zxm 返回邮箱作为用户标识
def __str__(self):
return self.email
# 获取用户的完整url
#zxm 获取用户的完整url
def get_full_url(self):
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
@ -37,7 +38,7 @@ class BlogUser(AbstractUser):
return url
class Meta:
ordering = ['-id']
verbose_name = _('user')
verbose_name_plural = verbose_name
get_latest_by = 'id'
ordering = ['-id'] #zxm 按ID倒序排列
verbose_name = _('user') #zxm 在管理后台显示的名称
verbose_name_plural = verbose_name #zxm 复数形式
get_latest_by = 'id' #zxm 获取最新记录的依据

@ -1,3 +1,4 @@
#flj 用户账户相关视图
import logging
from django.utils.translation import gettext_lazy as _
from django.conf import settings
@ -31,29 +32,32 @@ logger = logging.getLogger(__name__)
# Create your views here.
#zxm 用户注册视图类
class RegisterView(FormView):
form_class = RegisterForm
template_name = 'account/registration_form.html'
form_class = RegisterForm #zxm 使用注册表单
template_name = 'account/registration_form.html' #zxm 注册页面模板
@method_decorator(csrf_protect)
@method_decorator(csrf_protect) #zxm 防止CSRF攻击
def dispatch(self, *args, **kwargs):
return super(RegisterView, self).dispatch(*args, **kwargs)
#zxm 表单验证成功后的处理
def form_valid(self, form):
if form.is_valid():
user = form.save(False)
user.is_active = False
user.source = 'Register'
user.save(True)
site = get_current_site().domain
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
user = form.save(False) #zxm 不保存到数据库
user.is_active = False #zxm 初始状态为未激活
user.source = 'Register' #zxm 注册来源
user.save(True) #zxm 保存到数据库
site = get_current_site().domain #zxm 获取当前站点域名
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) #zxm 生成验证签名
if settings.DEBUG:
site = '127.0.0.1:8000'
path = reverse('account:result')
site = '127.0.0.1:8000' #zxm 开发环境使用本地地址
path = reverse('account:result') #zxm 获取结果页面URL
url = "http://{site}{path}?type=validation&id={id}&sign={sign}".format(
site=site, path=path, id=user.id, sign=sign)
site=site, path=path, id=user.id, sign=sign) #zxm 生成验证链接
#zxm 邮件内容
content = """
<p>请点击下面链接验证您的邮箱</p>
@ -64,6 +68,7 @@ class RegisterView(FormView):
如果上面链接无法打开请将此链接复制至浏览器
{url}
""".format(url=url)
#zxm 发送验证邮件
send_email(
emailto=[
user.email,
@ -80,33 +85,35 @@ class RegisterView(FormView):
})
#fkc 用户登出视图类
class LogoutView(RedirectView):
url = '/login/'
url = '/login/' #zxm 登出后重定向到登录页
@method_decorator(never_cache)
@method_decorator(never_cache) #zxm 不缓存页面
def dispatch(self, request, *args, **kwargs):
return super(LogoutView, self).dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
logout(request)
delete_sidebar_cache()
logout(request) #zxm 登出用户
delete_sidebar_cache() #zxm 删除侧边栏缓存
return super(LogoutView, self).get(request, *args, **kwargs)
#cll 用户登录视图类
class LoginView(FormView):
form_class = LoginForm
template_name = 'account/login.html'
success_url = '/'
redirect_field_name = REDIRECT_FIELD_NAME
login_ttl = 2626560 # 一个月的时间
@method_decorator(sensitive_post_parameters('password'))
@method_decorator(csrf_protect)
@method_decorator(never_cache)
form_class = LoginForm #zxm 使用登录表单
template_name = 'account/login.html' #zxm 登录页面模板
success_url = '/' #zxm 登录成功后重定向到首页
redirect_field_name = REDIRECT_FIELD_NAME #zxm 重定向字段名
login_ttl = 2626560 #zxm 一个月的时间(记住我功能)
@method_decorator(sensitive_post_parameters('password')) #zxm 敏感参数保护
@method_decorator(csrf_protect) #zxm 防止CSRF攻击
@method_decorator(never_cache) #zxm 不缓存页面
def dispatch(self, request, *args, **kwargs):
return super(LoginView, self).dispatch(request, *args, **kwargs)
#zxm 获取上下文数据
def get_context_data(self, **kwargs):
redirect_to = self.request.GET.get(self.redirect_field_name)
if redirect_to is None:
@ -115,90 +122,95 @@ class LoginView(FormView):
return super(LoginView, self).get_context_data(**kwargs)
#zxm 表单验证成功后的处理
def form_valid(self, form):
form = AuthenticationForm(data=self.request.POST, request=self.request)
if form.is_valid():
delete_sidebar_cache()
delete_sidebar_cache() #zxm 删除侧边栏缓存
logger.info(self.redirect_field_name)
auth.login(self.request, form.get_user())
if self.request.POST.get("remember"):
self.request.session.set_expiry(self.login_ttl)
auth.login(self.request, form.get_user()) #zxm 登录用户
if self.request.POST.get("remember"): #zxm 如果勾选记住我
self.request.session.set_expiry(self.login_ttl) #zxm 设置会话过期时间
return super(LoginView, self).form_valid(form)
# return HttpResponseRedirect('/')
else:
return self.render_to_response({
'form': form
})
#zxm 获取成功后重定向的URL
def get_success_url(self):
redirect_to = self.request.POST.get(self.redirect_field_name)
if not url_has_allowed_host_and_scheme(
url=redirect_to, allowed_hosts=[
self.request.get_host()]):
self.request.get_host()]): #zxm 安全检查
redirect_to = self.success_url
return redirect_to
#xy 账户结果处理函数
def account_result(request):
type = request.GET.get('type')
id = request.GET.get('id')
type = request.GET.get('type') #zxm 获取类型参数
id = request.GET.get('id') #zxm 获取用户ID
user = get_object_or_404(get_user_model(), id=id)
user = get_object_or_404(get_user_model(), id=id) #zxm 获取用户对象
logger.info(type)
if user.is_active:
return HttpResponseRedirect('/')
if type and type in ['register', 'validation']:
if type == 'register':
if user.is_active: #zxm 如果用户已激活
return HttpResponseRedirect('/') #zxm 重定向到首页
if type and type in ['register', 'validation']: #zxm 处理注册或验证类型
if type == 'register': #zxm 注册成功
content = '''
恭喜您注册成功一封验证邮件已经发送到您的邮箱请验证您的邮箱后登录本站
'''
title = '注册成功'
else:
c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
sign = request.GET.get('sign')
if sign != c_sign:
return HttpResponseForbidden()
user.is_active = True
user.save()
else: #zxm 邮箱验证
c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) #zxm 计算签名
sign = request.GET.get('sign') #zxm 获取请求中的签名
if sign != c_sign: #zxm 验证签名
return HttpResponseForbidden() #zxm 签名不匹配,禁止访问
user.is_active = True #zxm 激活用户
user.save() #zxm 保存修改
content = '''
恭喜您已经成功的完成邮箱验证您现在可以使用您的账号来登录本站
'''
title = '验证成功'
return render(request, 'account/result.html', {
return render(request, 'account/result.html', { #zxm 渲染结果页面
'title': title,
'content': content
})
else:
return HttpResponseRedirect('/')
return HttpResponseRedirect('/') #zxm 其他情况重定向到首页
#zhj 忘记密码视图类
class ForgetPasswordView(FormView):
form_class = ForgetPasswordForm
template_name = 'account/forget_password.html'
form_class = ForgetPasswordForm #zxm 使用忘记密码表单
template_name = 'account/forget_password.html' #zxm 忘记密码页面模板
#zxm 表单验证成功后的处理
def form_valid(self, form):
if form.is_valid():
blog_user = BlogUser.objects.filter(email=form.cleaned_data.get("email")).get()
blog_user.password = make_password(form.cleaned_data["new_password2"])
blog_user.save()
return HttpResponseRedirect('/login/')
blog_user = BlogUser.objects.filter(email=form.cleaned_data.get("email")).get() #zxm 获取用户
blog_user.password = make_password(form.cleaned_data["new_password2"]) #zxm 设置新密码
blog_user.save() #zxm 保存修改
return HttpResponseRedirect('/login/') #zxm 重定向到登录页
else:
return self.render_to_response({'form': form})
return self.render_to_response({'form': form}) #zxm 渲染表单错误
#flj 忘记密码邮箱验证码视图
class ForgetPasswordEmailCode(View):
#zxm 处理POST请求
def post(self, request: HttpRequest):
form = ForgetPasswordCodeForm(request.POST)
form = ForgetPasswordCodeForm(request.POST) #zxm 验证表单
if not form.is_valid():
return HttpResponse("错误的邮箱")
to_email = form.cleaned_data["email"]
return HttpResponse("错误的邮箱") #zxm 返回错误信息
to_email = form.cleaned_data["email"] #zxm 获取邮箱
code = generate_code()
utils.send_verify_email(to_email, code)
utils.set_code(to_email, code)
code = generate_code() #zxm 生成验证码
utils.send_verify_email(to_email, code) #zxm 发送验证邮件
utils.set_code(to_email, code) #zxm 保存验证码
return HttpResponse("ok")
return HttpResponse("ok") #zxm 返回成功信息

@ -1,3 +1,4 @@
#zxm 评论相关模型
from django.conf import settings
from django.db import models
from django.utils.timezone import now
@ -8,32 +9,41 @@ from blog.models import Article
# Create your models here.
#fkc 评论模型类
class Comment(models.Model):
#fkc 评论正文
body = models.TextField('正文', max_length=300)
#fkc 创建时间
creation_time = models.DateTimeField(_('creation time'), default=now)
#fkc 最后修改时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
#fkc 评论作者(外键关联用户)
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
on_delete=models.CASCADE)
#fkc 评论的文章(外键关联文章)
article = models.ForeignKey(
Article,
verbose_name=_('article'),
on_delete=models.CASCADE)
#fkc 父评论(支持回复功能)
parent_comment = models.ForeignKey(
'self',
verbose_name=_('parent comment'),
blank=True,
null=True,
on_delete=models.CASCADE)
#fkc 是否启用(审核功能)
is_enable = models.BooleanField(_('enable'),
default=False, blank=False, null=False)
class Meta:
ordering = ['-id']
verbose_name = _('comment')
verbose_name_plural = verbose_name
get_latest_by = 'id'
ordering = ['-id'] #fkc 按ID倒序排列
verbose_name = _('comment') #fkc 在管理后台显示的名称
verbose_name_plural = verbose_name #fkc 复数形式
get_latest_by = 'id' #fkc 获取最新记录的依据
#fkc 返回评论正文作为字符串表示
def __str__(self):
return self.body

@ -1,4 +1,5 @@
# Create your views here.
#cll 评论相关视图
from django.core.exceptions import ValidationError
from django.http import HttpResponseRedirect
from django.shortcuts import get_object_or_404
@ -12,52 +13,58 @@ from .forms import CommentForm
from .models import Comment
#xy 评论提交视图类
class CommentPostView(FormView):
form_class = CommentForm
template_name = 'blog/article_detail.html'
form_class = CommentForm #xy 使用评论表单
template_name = 'blog/article_detail.html' #xy 文章详情页面模板
@method_decorator(csrf_protect)
@method_decorator(csrf_protect) #xy 防止CSRF攻击
def dispatch(self, *args, **kwargs):
return super(CommentPostView, self).dispatch(*args, **kwargs)
#xy 处理GET请求
def get(self, request, *args, **kwargs):
article_id = self.kwargs['article_id']
article = get_object_or_404(Article, pk=article_id)
url = article.get_absolute_url()
return HttpResponseRedirect(url + "#comments")
article_id = self.kwargs['article_id'] #xy 获取文章ID
article = get_object_or_404(Article, pk=article_id) #xy 获取文章对象
url = article.get_absolute_url() #xy 获取文章详情页URL
return HttpResponseRedirect(url + "#comments") #xy 重定向到评论区
#xy 表单验证失败后的处理
def form_invalid(self, form):
article_id = self.kwargs['article_id']
article = get_object_or_404(Article, pk=article_id)
article_id = self.kwargs['article_id'] #xy 获取文章ID
article = get_object_or_404(Article, pk=article_id) #xy 获取文章对象
return self.render_to_response({
'form': form,
'article': article
'form': form, #xy 带错误的表单
'article': article #xy 文章对象
})
#xy 表单验证成功后的处理
def form_valid(self, form):
"""提交的数据验证合法后的逻辑"""
user = self.request.user
author = BlogUser.objects.get(pk=user.pk)
article_id = self.kwargs['article_id']
article = get_object_or_404(Article, pk=article_id)
user = self.request.user #xy 获取当前用户
author = BlogUser.objects.get(pk=user.pk) #xy 获取用户对象
article_id = self.kwargs['article_id'] #xy 获取文章ID
article = get_object_or_404(Article, pk=article_id) #xy 获取文章对象
#xy 检查文章是否允许评论
if article.comment_status == 'c' or article.status == 'c':
raise ValidationError("该文章评论已关闭.")
comment = form.save(False)
comment.article = article
comment = form.save(False) #xy 不保存到数据库
comment.article = article #xy 设置评论所属文章
from djangoblog.utils import get_blog_setting
settings = get_blog_setting()
if not settings.comment_need_review:
comment.is_enable = True
comment.author = author
settings = get_blog_setting() #xy 获取博客设置
if not settings.comment_need_review: #xy 如果不需要审核
comment.is_enable = True #xy 直接启用评论
comment.author = author #xy 设置评论作者
if form.cleaned_data['parent_comment_id']:
#xy 处理回复评论
if form.cleaned_data['parent_comment_id']: #xy 如果有父评论ID
parent_comment = Comment.objects.get(
pk=form.cleaned_data['parent_comment_id'])
comment.parent_comment = parent_comment
pk=form.cleaned_data['parent_comment_id']) #xy 获取父评论
comment.parent_comment = parent_comment #xy 设置父评论
comment.save(True)
comment.save(True) #xy 保存评论到数据库
return HttpResponseRedirect(
"%s#div-comment-%d" %
(article.get_absolute_url(), comment.pk))
(article.get_absolute_url(), comment.pk)) #xy 重定向到评论位置

@ -1,4 +1,4 @@
# Create your models here.
#xy OAuth相关模型
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
@ -6,33 +6,47 @@ from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
#zhj OAuth用户模型类
class OAuthUser(models.Model):
#zhj 关联的博客用户
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=True,
null=True,
on_delete=models.CASCADE)
#zhj 第三方平台openid
openid = models.CharField(max_length=50)
#zhj 用户昵称
nickname = models.CharField(max_length=50, verbose_name=_('nick name'))
#zhj 访问令牌
token = models.CharField(max_length=150, null=True, blank=True)
#zhj 用户头像
picture = models.CharField(max_length=350, blank=True, null=True)
#zhj OAuth类型github、weibo等
type = models.CharField(blank=False, null=False, max_length=50)
#zhj 用户邮箱
email = models.CharField(max_length=50, null=True, blank=True)
#zhj 元数据
metadata = models.TextField(null=True, blank=True)
#zhj 创建时间
creation_time = models.DateTimeField(_('creation time'), default=now)
#zhj 最后修改时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
#zhj 返回用户昵称作为字符串表示
def __str__(self):
return self.nickname
class Meta:
verbose_name = _('oauth user')
verbose_name_plural = verbose_name
ordering = ['-creation_time']
verbose_name = _('oauth user') #zhj 在管理后台显示的名称
verbose_name_plural = verbose_name #zhj 复数形式
ordering = ['-creation_time'] #zhj 按创建时间倒序排列
#flj OAuth配置模型类
class OAuthConfig(models.Model):
#flj OAuth类型选项
TYPE = (
('weibo', _('weibo')),
('google', _('google')),
@ -40,28 +54,37 @@ class OAuthConfig(models.Model):
('facebook', 'FaceBook'),
('qq', 'QQ'),
)
#flj OAuth类型
type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a')
#flj 应用密钥
appkey = models.CharField(max_length=200, verbose_name='AppKey')
#flj 应用密钥密钥
appsecret = models.CharField(max_length=200, verbose_name='AppSecret')
#flj 回调URL
callback_url = models.CharField(
max_length=200,
verbose_name=_('callback url'),
blank=False,
default='')
#flj 是否启用
is_enable = models.BooleanField(
_('is enable'), default=True, blank=False, null=False)
#flj 创建时间
creation_time = models.DateTimeField(_('creation time'), default=now)
#flj 最后修改时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
#flj 验证方法,确保每种类型只有一个配置
def clean(self):
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
raise ValidationError(_(self.type + _('already exists')))
#flj 返回OAuth类型作为字符串表示
def __str__(self):
return self.type
class Meta:
verbose_name = 'oauth配置'
verbose_name_plural = verbose_name
ordering = ['-creation_time']
verbose_name = 'oauth配置' #flj 在管理后台显示的名称
verbose_name_plural = verbose_name #flj 复数形式
ordering = ['-creation_time'] #flj 按创建时间倒序排列

@ -9,86 +9,95 @@ import requests
from djangoblog.utils import cache_decorator
from oauth.models import OAuthUser, OAuthConfig
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) #flj
class OAuthAccessTokenException(Exception):
'''
oauth授权失败异常
'''
class OAuthAccessTokenException(Exception): #fkc
'''oauth授权失败异常'''
class BaseOauthManager(metaclass=ABCMeta):
"""获取用户授权"""
class BaseOauthManager(metaclass=ABCMeta): #cll
"""OAuth授权基类定义统一接口规范"""
#cll 授权页面URL
AUTH_URL = None
"""获取token"""
#cll 获取访问令牌URL
TOKEN_URL = None
"""获取用户信息"""
#cll 获取用户信息URL
API_URL = None
'''icon图标名'''
#cll 平台图标标识名
ICON_NAME = None
def __init__(self, access_token=None, openid=None):
#cll 访问令牌
self.access_token = access_token
#cll 第三方平台用户唯一标识
self.openid = openid
@property
def is_access_token_set(self):
"""判断访问令牌是否已设置"""
return self.access_token is not None
@property
def is_authorized(self):
"""判断是否已完成授权令牌和openid均存在"""
return self.is_access_token_set and self.access_token is not None and self.openid is not None
@abstractmethod
def get_authorization_url(self, nexturl='/'):
"""抽象方法获取授权跳转URL"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""抽象方法:通过授权码获取访问令牌"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""抽象方法:通过访问令牌获取用户信息"""
pass
@abstractmethod
def get_picture(self, metadata):
"""抽象方法从元数据中提取用户头像URL"""
pass
def do_get(self, url, params, headers=None):
"""通用GET请求方法"""
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""通用POST请求方法"""
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text)
return rsp.text
def get_config(self):
"""获取当前平台的OAuth配置从数据库读取"""
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
class WBOauthManager(BaseOauthManager):
class WBOauthManager(BaseOauthManager): #xy
"""微博OAuth授权管理器"""
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
API_URL = 'https://api.weibo.com/2/users/show.json'
ICON_NAME = 'weibo'
def __init__(self, access_token=None, openid=None):
# cll获取微博OAuth配置
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
WBOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super().__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""构建微博授权跳转URL包含回调地址和后续跳转路径"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -98,7 +107,7 @@ class WBOauthManager(BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""通过授权码获取微博访问令牌和用户UID"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
@ -107,8 +116,9 @@ class WBOauthManager(BaseOauthManager):
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
# cll 成功获取令牌后,存储并返回用户信息
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['uid'])
@ -117,22 +127,27 @@ class WBOauthManager(BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过访问令牌获取微博用户信息(昵称、头像、邮箱等)"""
if not self.is_authorized:
return None
params = {
'uid': self.openid,
'access_token': self.access_token
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['avatar_large']
user.nickname = datas['screen_name']
user.openid = datas['id']
user.type = 'weibo'
user.token = self.access_token
user.metadata = rsp # cll 存储原始返回数据
user.picture = datas['avatar_large'] # cll 大尺寸头像
user.nickname = datas['screen_name'] # cll 昵称
user.openid = datas['id'] # cll 用户唯一标识
user.type = 'weibo' # cll 平台类型
user.token = self.access_token # cll 存储访问令牌
# cll 若返回邮箱则存储
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
@ -142,12 +157,15 @@ class WBOauthManager(BaseOauthManager):
return None
def get_picture(self, metadata):
"""从元数据中提取微博用户头像URL"""
datas = json.loads(metadata)
return datas['avatar_large']
class ProxyManagerMixin:
class ProxyManagerMixin:
"""代理请求混入类支持通过环境变量配置HTTP代理"""
def __init__(self, *args, **kwargs):
# cll 从环境变量读取代理配置
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
@ -157,17 +175,20 @@ class ProxyManagerMixin:
self.proxies = None
def do_get(self, url, params, headers=None):
"""带代理的GET请求"""
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""带代理的POST请求"""
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
"""Google OAuth授权管理器支持代理"""
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
@ -178,13 +199,10 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GoogleOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super().__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, nexturl='/'):
"""构建Google授权跳转URL请求openid和email权限"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -195,43 +213,45 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""通过授权码获取Google访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
self.access_token = str(obj['access_token'])
self.openid = str(obj['id_token'])
self.openid = str(obj['id_token']) # cll Google用id_token作为openid
logger.info(self.ICON_NAME + ' oauth ' + rsp)
return self.access_token
else:
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过访问令牌获取Google用户信息"""
if not self.is_authorized:
return None
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.API_URL, params)
try:
datas = json.loads(rsp)
user = OAuthUser()
user.metadata = rsp
user.picture = datas['picture']
user.nickname = datas['name']
user.openid = datas['sub']
user.picture = datas['picture'] # cll 头像URL
user.nickname = datas['name'] # cll 姓名
user.openid = datas['sub'] # cll 唯一标识
user.token = self.access_token
user.type = 'google'
if datas['email']:
user.email = datas['email']
return user
@ -241,11 +261,13 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
return None
def get_picture(self, metadata):
"""从元数据提取Google用户头像"""
datas = json.loads(metadata)
return datas['picture']
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
"""GitHub OAuth授权管理器支持代理"""
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
@ -256,13 +278,10 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
GitHubOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super().__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""构建GitHub授权跳转URL请求user权限"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -273,16 +292,17 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""通过授权码获取GitHub访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
# cll GitHub返回格式为form-encoded需解析
from urllib import parse
r = parse.parse_qs(rsp)
if 'access_token' in r:
@ -292,19 +312,21 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过访问令牌获取GitHub用户信息需在请求头携带令牌"""
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
})
try:
datas = json.loads(rsp)
user = OAuthUser()
user.picture = datas['avatar_url']
user.nickname = datas['name']
user.openid = datas['id']
user.picture = datas['avatar_url'] # cll 头像URL
user.nickname = datas['name'] # cll 姓名(可能为空)
user.openid = datas['id'] # cll 唯一标识
user.type = 'github'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
return user
@ -314,11 +336,13 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
return None
def get_picture(self, metadata):
"""从元数据提取GitHub用户头像"""
datas = json.loads(metadata)
return datas['avatar_url']
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): #fkc
"""Facebook OAuth授权管理器支持代理"""
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
@ -329,13 +353,10 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
FaceBookOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super().__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""构建Facebook授权跳转URL请求邮箱和公开资料权限"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -346,17 +367,16 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""通过授权码获取Facebook访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
# 'grant_type': 'authorization_code',
'code': code,
'redirect_uri': self.callback_url
}
rsp = self.do_post(self.TOKEN_URL, params)
obj = json.loads(rsp)
if 'access_token' in obj:
token = str(obj['access_token'])
self.access_token = token
@ -365,21 +385,26 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""通过访问令牌获取Facebook用户信息指定返回字段"""
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email'
'fields': 'id,name,picture,email' # cll 指定需要返回的字段
}
try:
rsp = self.do_get(self.API_URL, params)
datas = json.loads(rsp)
user = OAuthUser()
user.nickname = datas['name']
user.openid = datas['id']
user.nickname = datas['name'] # cll 姓名
user.openid = datas['id'] # cll 唯一标识
user.type = 'facebook'
user.token = self.access_token
user.metadata = rsp
if 'email' in datas and datas['email']:
user.email = datas['email']
# cll 解析头像URLFacebook返回格式嵌套较深
if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']:
user.picture = str(datas['picture']['data']['url'])
return user
@ -388,14 +413,17 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
return None
def get_picture(self, metadata):
"""从元数据提取Facebook用户头像"""
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
class QQOauthManager(BaseOauthManager):
class QQOauthManager(BaseOauthManager): #cll
"""QQ OAuth授权管理器"""
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize'
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token'
API_URL = 'https://graph.qq.com/user/get_user_info'
# cll QQ需单独请求openid的URL
OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me'
ICON_NAME = 'qq'
@ -404,13 +432,10 @@ class QQOauthManager(BaseOauthManager):
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
self.callback_url = config.callback_url if config else ''
super(
QQOauthManager,
self).__init__(
access_token=access_token,
openid=openid)
super().__init__(access_token=access_token, openid=openid)
def get_authorization_url(self, next_url='/'):
"""构建QQ授权跳转URL"""
params = {
'response_type': 'code',
'client_id': self.client_id,
@ -420,6 +445,7 @@ class QQOauthManager(BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""通过授权码获取QQ访问令牌"""
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
@ -428,77 +454,90 @@ class QQOauthManager(BaseOauthManager):
'redirect_uri': self.callback_url
}
rsp = self.do_get(self.TOKEN_URL, params)
if rsp:
# cll QQ返回格式为form-encoded解析令牌
d = urllib.parse.parse_qs(rsp)
if 'access_token' in d:
token = d['access_token']
self.access_token = token[0]
return token
else:
raise OAuthAccessTokenException(rsp)
raise OAuthAccessTokenException(rsp)
def get_open_id(self):
"""QQ需单独请求openid通过访问令牌获取"""
if self.is_access_token_set:
params = {
'access_token': self.access_token
}
rsp = self.do_get(self.OPEN_ID_URL, params)
if rsp:
rsp = rsp.replace(
'callback(', '').replace(
')', '').replace(
';', '')
# 去除QQ返回的callback包裹符
rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '')
obj = json.loads(rsp)
openid = str(obj['openid'])
self.openid = openid
return openid
def get_oauth_userinfo(self):
"""获取QQ用户信息需先获取openid"""
openid = self.get_open_id()
if openid:
params = {
'access_token': self.access_token,
'oauth_consumer_key': self.client_id,
'oauth_consumer_key': self.client_id, # cll QQ要求传入appkey
'openid': self.openid
}
rsp = self.do_get(self.API_URL, params)
logger.info(rsp)
obj = json.loads(rsp)
user = OAuthUser()
user.nickname = obj['nickname']
user.openid = openid
user.nickname = obj['nickname'] # cll 昵称
user.openid = openid # cll 唯一标识
user.type = 'qq'
user.token = self.access_token
user.metadata = rsp
if 'email' in obj:
user.email = obj['email']
if 'figureurl' in obj:
user.picture = str(obj['figureurl'])
user.picture = str(obj['figureurl']) # cll 头像URL
return user
def get_picture(self, metadata):
"""从元数据提取QQ用户头像"""
datas = json.loads(metadata)
return str(datas['figureurl'])
@cache_decorator(expiration=100 * 60)
def get_oauth_apps():
def get_oauth_apps(): #xy
"""获取所有启用的OAuth应用缓存100分钟"""
# cll 读取数据库中启用的OAuth配置
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return []
# 提取已启用的平台类型
configtypes = [x.type for x in configs]
# cll 获取所有BaseOauthManager的子类各平台实现
applications = BaseOauthManager.__subclasses__()
# cll 筛选出已启用的平台实例
apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes]
return apps
def get_manager_by_type(type):
def get_manager_by_type(type):
"""根据平台类型获取对应的OAuth管理器实例"""
applications = get_oauth_apps()
if applications:
# cll 匹配平台类型(不区分大小写)
finds = list(
filter(
lambda x: x.ICON_NAME.lower() == type.lower(),
applications))
if finds:
return finds[0]
return None
return None
Loading…
Cancel
Save