From 8f09715ed35d7fbf6b3254f326f87b2f547d46c4 Mon Sep 17 00:00:00 2001 From: flj666 Date: Sun, 9 Nov 2025 00:24:11 +0800 Subject: [PATCH] updata forth --- src/DjangoBlog/accounts/models.py | 25 ++-- src/DjangoBlog/accounts/views.py | 138 +++++++++--------- src/DjangoBlog/comments/models.py | 18 ++- src/DjangoBlog/comments/views.py | 59 ++++---- src/DjangoBlog/oauth/models.py | 37 ++++- src/DjangoBlog/oauth/oauthmanager.py | 201 ++++++++++++++++----------- 6 files changed, 285 insertions(+), 193 deletions(-) diff --git a/src/DjangoBlog/accounts/models.py b/src/DjangoBlog/accounts/models.py index 178ee7a..acca91d 100644 --- a/src/DjangoBlog/accounts/models.py +++ b/src/DjangoBlog/accounts/models.py @@ -1,4 +1,4 @@ -# 这个文件定义了用户相关的数据模型 +#flj 这个文件定义了用户相关的数据模型 from django.contrib.auth.models import AbstractUser from django.db import models from django.urls import reverse @@ -9,27 +9,28 @@ from djangoblog.utils import get_current_site # Create your models here. +#zxm 自定义用户模型,扩展了Django的默认用户模型 class BlogUser(AbstractUser): - # 用户昵称 + #zxm 用户昵称 nickname = models.CharField(_('nick name'), max_length=100, blank=True) - # 创建时间 + #zxm 创建时间 creation_time = models.DateTimeField(_('creation time'), default=now) - # 最后修改时间 + #zxm 最后修改时间 last_modify_time = models.DateTimeField(_('last modify time'), default=now) - # 来源 + #zxm 来源 source = models.CharField(_('create source'), max_length=100, blank=True) - # 获取用户详情页的url + #zxm 获取用户详情页的url def get_absolute_url(self): return reverse( 'blog:author_detail', kwargs={ 'author_name': self.username}) - # 返回邮箱作为用户标识 + #zxm 返回邮箱作为用户标识 def __str__(self): return self.email - # 获取用户的完整url + #zxm 获取用户的完整url def get_full_url(self): site = get_current_site().domain url = "https://{site}{path}".format(site=site, @@ -37,7 +38,7 @@ class BlogUser(AbstractUser): return url class Meta: - ordering = ['-id'] - verbose_name = _('user') - verbose_name_plural = verbose_name - get_latest_by = 'id' + ordering = ['-id'] #zxm 按ID倒序排列 + verbose_name = _('user') #zxm 在管理后台显示的名称 + verbose_name_plural = verbose_name #zxm 复数形式 + get_latest_by = 'id' #zxm 获取最新记录的依据 diff --git a/src/DjangoBlog/accounts/views.py b/src/DjangoBlog/accounts/views.py index ae67aec..57afb09 100644 --- a/src/DjangoBlog/accounts/views.py +++ b/src/DjangoBlog/accounts/views.py @@ -1,3 +1,4 @@ +#flj 用户账户相关视图 import logging from django.utils.translation import gettext_lazy as _ from django.conf import settings @@ -31,29 +32,32 @@ logger = logging.getLogger(__name__) # Create your views here. +#zxm 用户注册视图类 class RegisterView(FormView): - form_class = RegisterForm - template_name = 'account/registration_form.html' + form_class = RegisterForm #zxm 使用注册表单 + template_name = 'account/registration_form.html' #zxm 注册页面模板 - @method_decorator(csrf_protect) + @method_decorator(csrf_protect) #zxm 防止CSRF攻击 def dispatch(self, *args, **kwargs): return super(RegisterView, self).dispatch(*args, **kwargs) + #zxm 表单验证成功后的处理 def form_valid(self, form): if form.is_valid(): - user = form.save(False) - user.is_active = False - user.source = 'Register' - user.save(True) - site = get_current_site().domain - sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) + user = form.save(False) #zxm 不保存到数据库 + user.is_active = False #zxm 初始状态为未激活 + user.source = 'Register' #zxm 注册来源 + user.save(True) #zxm 保存到数据库 + site = get_current_site().domain #zxm 获取当前站点域名 + sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) #zxm 生成验证签名 if settings.DEBUG: - site = '127.0.0.1:8000' - path = reverse('account:result') + site = '127.0.0.1:8000' #zxm 开发环境使用本地地址 + path = reverse('account:result') #zxm 获取结果页面URL url = "http://{site}{path}?type=validation&id={id}&sign={sign}".format( - site=site, path=path, id=user.id, sign=sign) + site=site, path=path, id=user.id, sign=sign) #zxm 生成验证链接 + #zxm 邮件内容 content = """

请点击下面链接验证您的邮箱

@@ -64,6 +68,7 @@ class RegisterView(FormView): 如果上面链接无法打开,请将此链接复制至浏览器。 {url} """.format(url=url) + #zxm 发送验证邮件 send_email( emailto=[ user.email, @@ -80,33 +85,35 @@ class RegisterView(FormView): }) +#fkc 用户登出视图类 class LogoutView(RedirectView): - url = '/login/' + url = '/login/' #zxm 登出后重定向到登录页 - @method_decorator(never_cache) + @method_decorator(never_cache) #zxm 不缓存页面 def dispatch(self, request, *args, **kwargs): return super(LogoutView, self).dispatch(request, *args, **kwargs) def get(self, request, *args, **kwargs): - logout(request) - delete_sidebar_cache() + logout(request) #zxm 登出用户 + delete_sidebar_cache() #zxm 删除侧边栏缓存 return super(LogoutView, self).get(request, *args, **kwargs) +#cll 用户登录视图类 class LoginView(FormView): - form_class = LoginForm - template_name = 'account/login.html' - success_url = '/' - redirect_field_name = REDIRECT_FIELD_NAME - login_ttl = 2626560 # 一个月的时间 - - @method_decorator(sensitive_post_parameters('password')) - @method_decorator(csrf_protect) - @method_decorator(never_cache) + form_class = LoginForm #zxm 使用登录表单 + template_name = 'account/login.html' #zxm 登录页面模板 + success_url = '/' #zxm 登录成功后重定向到首页 + redirect_field_name = REDIRECT_FIELD_NAME #zxm 重定向字段名 + login_ttl = 2626560 #zxm 一个月的时间(记住我功能) + + @method_decorator(sensitive_post_parameters('password')) #zxm 敏感参数保护 + @method_decorator(csrf_protect) #zxm 防止CSRF攻击 + @method_decorator(never_cache) #zxm 不缓存页面 def dispatch(self, request, *args, **kwargs): - return super(LoginView, self).dispatch(request, *args, **kwargs) + #zxm 获取上下文数据 def get_context_data(self, **kwargs): redirect_to = self.request.GET.get(self.redirect_field_name) if redirect_to is None: @@ -115,90 +122,95 @@ class LoginView(FormView): return super(LoginView, self).get_context_data(**kwargs) + #zxm 表单验证成功后的处理 def form_valid(self, form): form = AuthenticationForm(data=self.request.POST, request=self.request) if form.is_valid(): - delete_sidebar_cache() + delete_sidebar_cache() #zxm 删除侧边栏缓存 logger.info(self.redirect_field_name) - auth.login(self.request, form.get_user()) - if self.request.POST.get("remember"): - self.request.session.set_expiry(self.login_ttl) + auth.login(self.request, form.get_user()) #zxm 登录用户 + if self.request.POST.get("remember"): #zxm 如果勾选记住我 + self.request.session.set_expiry(self.login_ttl) #zxm 设置会话过期时间 return super(LoginView, self).form_valid(form) - # return HttpResponseRedirect('/') else: return self.render_to_response({ 'form': form }) + #zxm 获取成功后重定向的URL def get_success_url(self): - redirect_to = self.request.POST.get(self.redirect_field_name) if not url_has_allowed_host_and_scheme( url=redirect_to, allowed_hosts=[ - self.request.get_host()]): + self.request.get_host()]): #zxm 安全检查 redirect_to = self.success_url return redirect_to +#xy 账户结果处理函数 def account_result(request): - type = request.GET.get('type') - id = request.GET.get('id') + type = request.GET.get('type') #zxm 获取类型参数 + id = request.GET.get('id') #zxm 获取用户ID - user = get_object_or_404(get_user_model(), id=id) + user = get_object_or_404(get_user_model(), id=id) #zxm 获取用户对象 logger.info(type) - if user.is_active: - return HttpResponseRedirect('/') - if type and type in ['register', 'validation']: - if type == 'register': + if user.is_active: #zxm 如果用户已激活 + return HttpResponseRedirect('/') #zxm 重定向到首页 + if type and type in ['register', 'validation']: #zxm 处理注册或验证类型 + if type == 'register': #zxm 注册成功 content = ''' 恭喜您注册成功,一封验证邮件已经发送到您的邮箱,请验证您的邮箱后登录本站。 ''' title = '注册成功' - else: - c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) - sign = request.GET.get('sign') - if sign != c_sign: - return HttpResponseForbidden() - user.is_active = True - user.save() + else: #zxm 邮箱验证 + c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) #zxm 计算签名 + sign = request.GET.get('sign') #zxm 获取请求中的签名 + if sign != c_sign: #zxm 验证签名 + return HttpResponseForbidden() #zxm 签名不匹配,禁止访问 + user.is_active = True #zxm 激活用户 + user.save() #zxm 保存修改 content = ''' 恭喜您已经成功的完成邮箱验证,您现在可以使用您的账号来登录本站。 ''' title = '验证成功' - return render(request, 'account/result.html', { + return render(request, 'account/result.html', { #zxm 渲染结果页面 'title': title, 'content': content }) else: - return HttpResponseRedirect('/') + return HttpResponseRedirect('/') #zxm 其他情况重定向到首页 +#zhj 忘记密码视图类 class ForgetPasswordView(FormView): - form_class = ForgetPasswordForm - template_name = 'account/forget_password.html' + form_class = ForgetPasswordForm #zxm 使用忘记密码表单 + template_name = 'account/forget_password.html' #zxm 忘记密码页面模板 + #zxm 表单验证成功后的处理 def form_valid(self, form): if form.is_valid(): - blog_user = BlogUser.objects.filter(email=form.cleaned_data.get("email")).get() - blog_user.password = make_password(form.cleaned_data["new_password2"]) - blog_user.save() - return HttpResponseRedirect('/login/') + blog_user = BlogUser.objects.filter(email=form.cleaned_data.get("email")).get() #zxm 获取用户 + blog_user.password = make_password(form.cleaned_data["new_password2"]) #zxm 设置新密码 + blog_user.save() #zxm 保存修改 + return HttpResponseRedirect('/login/') #zxm 重定向到登录页 else: - return self.render_to_response({'form': form}) + return self.render_to_response({'form': form}) #zxm 渲染表单错误 +#flj 忘记密码邮箱验证码视图 class ForgetPasswordEmailCode(View): + #zxm 处理POST请求 def post(self, request: HttpRequest): - form = ForgetPasswordCodeForm(request.POST) + form = ForgetPasswordCodeForm(request.POST) #zxm 验证表单 if not form.is_valid(): - return HttpResponse("错误的邮箱") - to_email = form.cleaned_data["email"] + return HttpResponse("错误的邮箱") #zxm 返回错误信息 + to_email = form.cleaned_data["email"] #zxm 获取邮箱 - code = generate_code() - utils.send_verify_email(to_email, code) - utils.set_code(to_email, code) + code = generate_code() #zxm 生成验证码 + utils.send_verify_email(to_email, code) #zxm 发送验证邮件 + utils.set_code(to_email, code) #zxm 保存验证码 - return HttpResponse("ok") + return HttpResponse("ok") #zxm 返回成功信息 diff --git a/src/DjangoBlog/comments/models.py b/src/DjangoBlog/comments/models.py index 7c3bbc8..b83fa4f 100644 --- a/src/DjangoBlog/comments/models.py +++ b/src/DjangoBlog/comments/models.py @@ -1,3 +1,4 @@ +#zxm 评论相关模型 from django.conf import settings from django.db import models from django.utils.timezone import now @@ -8,32 +9,41 @@ from blog.models import Article # Create your models here. +#fkc 评论模型类 class Comment(models.Model): + #fkc 评论正文 body = models.TextField('正文', max_length=300) + #fkc 创建时间 creation_time = models.DateTimeField(_('creation time'), default=now) + #fkc 最后修改时间 last_modify_time = models.DateTimeField(_('last modify time'), default=now) + #fkc 评论作者(外键关联用户) author = models.ForeignKey( settings.AUTH_USER_MODEL, verbose_name=_('author'), on_delete=models.CASCADE) + #fkc 评论的文章(外键关联文章) article = models.ForeignKey( Article, verbose_name=_('article'), on_delete=models.CASCADE) + #fkc 父评论(支持回复功能) parent_comment = models.ForeignKey( 'self', verbose_name=_('parent comment'), blank=True, null=True, on_delete=models.CASCADE) + #fkc 是否启用(审核功能) is_enable = models.BooleanField(_('enable'), default=False, blank=False, null=False) class Meta: - ordering = ['-id'] - verbose_name = _('comment') - verbose_name_plural = verbose_name - get_latest_by = 'id' + ordering = ['-id'] #fkc 按ID倒序排列 + verbose_name = _('comment') #fkc 在管理后台显示的名称 + verbose_name_plural = verbose_name #fkc 复数形式 + get_latest_by = 'id' #fkc 获取最新记录的依据 + #fkc 返回评论正文作为字符串表示 def __str__(self): return self.body diff --git a/src/DjangoBlog/comments/views.py b/src/DjangoBlog/comments/views.py index ad9b2b9..a0bb7b4 100644 --- a/src/DjangoBlog/comments/views.py +++ b/src/DjangoBlog/comments/views.py @@ -1,4 +1,5 @@ # Create your views here. +#cll 评论相关视图 from django.core.exceptions import ValidationError from django.http import HttpResponseRedirect from django.shortcuts import get_object_or_404 @@ -12,52 +13,58 @@ from .forms import CommentForm from .models import Comment +#xy 评论提交视图类 class CommentPostView(FormView): - form_class = CommentForm - template_name = 'blog/article_detail.html' + form_class = CommentForm #xy 使用评论表单 + template_name = 'blog/article_detail.html' #xy 文章详情页面模板 - @method_decorator(csrf_protect) + @method_decorator(csrf_protect) #xy 防止CSRF攻击 def dispatch(self, *args, **kwargs): return super(CommentPostView, self).dispatch(*args, **kwargs) + #xy 处理GET请求 def get(self, request, *args, **kwargs): - article_id = self.kwargs['article_id'] - article = get_object_or_404(Article, pk=article_id) - url = article.get_absolute_url() - return HttpResponseRedirect(url + "#comments") + article_id = self.kwargs['article_id'] #xy 获取文章ID + article = get_object_or_404(Article, pk=article_id) #xy 获取文章对象 + url = article.get_absolute_url() #xy 获取文章详情页URL + return HttpResponseRedirect(url + "#comments") #xy 重定向到评论区 + #xy 表单验证失败后的处理 def form_invalid(self, form): - article_id = self.kwargs['article_id'] - article = get_object_or_404(Article, pk=article_id) + article_id = self.kwargs['article_id'] #xy 获取文章ID + article = get_object_or_404(Article, pk=article_id) #xy 获取文章对象 return self.render_to_response({ - 'form': form, - 'article': article + 'form': form, #xy 带错误的表单 + 'article': article #xy 文章对象 }) + #xy 表单验证成功后的处理 def form_valid(self, form): """提交的数据验证合法后的逻辑""" - user = self.request.user - author = BlogUser.objects.get(pk=user.pk) - article_id = self.kwargs['article_id'] - article = get_object_or_404(Article, pk=article_id) + user = self.request.user #xy 获取当前用户 + author = BlogUser.objects.get(pk=user.pk) #xy 获取用户对象 + article_id = self.kwargs['article_id'] #xy 获取文章ID + article = get_object_or_404(Article, pk=article_id) #xy 获取文章对象 + #xy 检查文章是否允许评论 if article.comment_status == 'c' or article.status == 'c': raise ValidationError("该文章评论已关闭.") - comment = form.save(False) - comment.article = article + comment = form.save(False) #xy 不保存到数据库 + comment.article = article #xy 设置评论所属文章 from djangoblog.utils import get_blog_setting - settings = get_blog_setting() - if not settings.comment_need_review: - comment.is_enable = True - comment.author = author + settings = get_blog_setting() #xy 获取博客设置 + if not settings.comment_need_review: #xy 如果不需要审核 + comment.is_enable = True #xy 直接启用评论 + comment.author = author #xy 设置评论作者 - if form.cleaned_data['parent_comment_id']: + #xy 处理回复评论 + if form.cleaned_data['parent_comment_id']: #xy 如果有父评论ID parent_comment = Comment.objects.get( - pk=form.cleaned_data['parent_comment_id']) - comment.parent_comment = parent_comment + pk=form.cleaned_data['parent_comment_id']) #xy 获取父评论 + comment.parent_comment = parent_comment #xy 设置父评论 - comment.save(True) + comment.save(True) #xy 保存评论到数据库 return HttpResponseRedirect( "%s#div-comment-%d" % - (article.get_absolute_url(), comment.pk)) + (article.get_absolute_url(), comment.pk)) #xy 重定向到评论位置 diff --git a/src/DjangoBlog/oauth/models.py b/src/DjangoBlog/oauth/models.py index be838ed..92411c2 100644 --- a/src/DjangoBlog/oauth/models.py +++ b/src/DjangoBlog/oauth/models.py @@ -1,4 +1,4 @@ -# Create your models here. +#xy OAuth相关模型 from django.conf import settings from django.core.exceptions import ValidationError from django.db import models @@ -6,33 +6,47 @@ from django.utils.timezone import now from django.utils.translation import gettext_lazy as _ +#zhj OAuth用户模型类 class OAuthUser(models.Model): + #zhj 关联的博客用户 author = models.ForeignKey( settings.AUTH_USER_MODEL, verbose_name=_('author'), blank=True, null=True, on_delete=models.CASCADE) + #zhj 第三方平台openid openid = models.CharField(max_length=50) + #zhj 用户昵称 nickname = models.CharField(max_length=50, verbose_name=_('nick name')) + #zhj 访问令牌 token = models.CharField(max_length=150, null=True, blank=True) + #zhj 用户头像 picture = models.CharField(max_length=350, blank=True, null=True) + #zhj OAuth类型(github、weibo等) type = models.CharField(blank=False, null=False, max_length=50) + #zhj 用户邮箱 email = models.CharField(max_length=50, null=True, blank=True) + #zhj 元数据 metadata = models.TextField(null=True, blank=True) + #zhj 创建时间 creation_time = models.DateTimeField(_('creation time'), default=now) + #zhj 最后修改时间 last_modify_time = models.DateTimeField(_('last modify time'), default=now) + #zhj 返回用户昵称作为字符串表示 def __str__(self): return self.nickname class Meta: - verbose_name = _('oauth user') - verbose_name_plural = verbose_name - ordering = ['-creation_time'] + verbose_name = _('oauth user') #zhj 在管理后台显示的名称 + verbose_name_plural = verbose_name #zhj 复数形式 + ordering = ['-creation_time'] #zhj 按创建时间倒序排列 +#flj OAuth配置模型类 class OAuthConfig(models.Model): + #flj OAuth类型选项 TYPE = ( ('weibo', _('weibo')), ('google', _('google')), @@ -40,28 +54,37 @@ class OAuthConfig(models.Model): ('facebook', 'FaceBook'), ('qq', 'QQ'), ) + #flj OAuth类型 type = models.CharField(_('type'), max_length=10, choices=TYPE, default='a') + #flj 应用密钥 appkey = models.CharField(max_length=200, verbose_name='AppKey') + #flj 应用密钥密钥 appsecret = models.CharField(max_length=200, verbose_name='AppSecret') + #flj 回调URL callback_url = models.CharField( max_length=200, verbose_name=_('callback url'), blank=False, default='') + #flj 是否启用 is_enable = models.BooleanField( _('is enable'), default=True, blank=False, null=False) + #flj 创建时间 creation_time = models.DateTimeField(_('creation time'), default=now) + #flj 最后修改时间 last_modify_time = models.DateTimeField(_('last modify time'), default=now) + #flj 验证方法,确保每种类型只有一个配置 def clean(self): if OAuthConfig.objects.filter( type=self.type).exclude(id=self.id).count(): raise ValidationError(_(self.type + _('already exists'))) + #flj 返回OAuth类型作为字符串表示 def __str__(self): return self.type class Meta: - verbose_name = 'oauth配置' - verbose_name_plural = verbose_name - ordering = ['-creation_time'] + verbose_name = 'oauth配置' #flj 在管理后台显示的名称 + verbose_name_plural = verbose_name #flj 复数形式 + ordering = ['-creation_time'] #flj 按创建时间倒序排列 diff --git a/src/DjangoBlog/oauth/oauthmanager.py b/src/DjangoBlog/oauth/oauthmanager.py index 2e7ceef..4523432 100644 --- a/src/DjangoBlog/oauth/oauthmanager.py +++ b/src/DjangoBlog/oauth/oauthmanager.py @@ -9,86 +9,95 @@ import requests from djangoblog.utils import cache_decorator from oauth.models import OAuthUser, OAuthConfig -logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) #flj -class OAuthAccessTokenException(Exception): - ''' - oauth授权失败异常 - ''' +class OAuthAccessTokenException(Exception): #fkc + '''oauth授权失败异常''' -class BaseOauthManager(metaclass=ABCMeta): - """获取用户授权""" +class BaseOauthManager(metaclass=ABCMeta): #cll + """OAuth授权基类,定义统一接口规范""" + #cll 授权页面URL AUTH_URL = None - """获取token""" + #cll 获取访问令牌URL TOKEN_URL = None - """获取用户信息""" + #cll 获取用户信息URL API_URL = None - '''icon图标名''' + #cll 平台图标标识名 ICON_NAME = None def __init__(self, access_token=None, openid=None): + #cll 访问令牌 self.access_token = access_token + #cll 第三方平台用户唯一标识 self.openid = openid @property def is_access_token_set(self): + """判断访问令牌是否已设置""" return self.access_token is not None @property def is_authorized(self): + """判断是否已完成授权(令牌和openid均存在)""" return self.is_access_token_set and self.access_token is not None and self.openid is not None @abstractmethod def get_authorization_url(self, nexturl='/'): + """抽象方法:获取授权跳转URL""" pass @abstractmethod def get_access_token_by_code(self, code): + """抽象方法:通过授权码获取访问令牌""" pass @abstractmethod def get_oauth_userinfo(self): + """抽象方法:通过访问令牌获取用户信息""" pass @abstractmethod def get_picture(self, metadata): + """抽象方法:从元数据中提取用户头像URL""" pass def do_get(self, url, params, headers=None): + """通用GET请求方法""" rsp = requests.get(url=url, params=params, headers=headers) logger.info(rsp.text) return rsp.text def do_post(self, url, params, headers=None): + """通用POST请求方法""" rsp = requests.post(url, params, headers=headers) logger.info(rsp.text) return rsp.text def get_config(self): + """获取当前平台的OAuth配置(从数据库读取)""" value = OAuthConfig.objects.filter(type=self.ICON_NAME) return value[0] if value else None -class WBOauthManager(BaseOauthManager): +class WBOauthManager(BaseOauthManager): #xy + """微博OAuth授权管理器""" AUTH_URL = 'https://api.weibo.com/oauth2/authorize' TOKEN_URL = 'https://api.weibo.com/oauth2/access_token' API_URL = 'https://api.weibo.com/2/users/show.json' ICON_NAME = 'weibo' def __init__(self, access_token=None, openid=None): + # cll获取微博OAuth配置 config = self.get_config() self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' self.callback_url = config.callback_url if config else '' - super( - WBOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + super().__init__(access_token=access_token, openid=openid) def get_authorization_url(self, nexturl='/'): + """构建微博授权跳转URL,包含回调地址和后续跳转路径""" params = { 'client_id': self.client_id, 'response_type': 'code', @@ -98,7 +107,7 @@ class WBOauthManager(BaseOauthManager): return url def get_access_token_by_code(self, code): - + """通过授权码获取微博访问令牌和用户UID""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, @@ -107,8 +116,9 @@ class WBOauthManager(BaseOauthManager): 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) - obj = json.loads(rsp) + + # cll 成功获取令牌后,存储并返回用户信息 if 'access_token' in obj: self.access_token = str(obj['access_token']) self.openid = str(obj['uid']) @@ -117,22 +127,27 @@ class WBOauthManager(BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): + """通过访问令牌获取微博用户信息(昵称、头像、邮箱等)""" if not self.is_authorized: return None + params = { 'uid': self.openid, 'access_token': self.access_token } rsp = self.do_get(self.API_URL, params) + try: datas = json.loads(rsp) user = OAuthUser() - user.metadata = rsp - user.picture = datas['avatar_large'] - user.nickname = datas['screen_name'] - user.openid = datas['id'] - user.type = 'weibo' - user.token = self.access_token + user.metadata = rsp # cll 存储原始返回数据 + user.picture = datas['avatar_large'] # cll 大尺寸头像 + user.nickname = datas['screen_name'] # cll 昵称 + user.openid = datas['id'] # cll 用户唯一标识 + user.type = 'weibo' # cll 平台类型 + user.token = self.access_token # cll 存储访问令牌 + + # cll 若返回邮箱则存储 if 'email' in datas and datas['email']: user.email = datas['email'] return user @@ -142,12 +157,15 @@ class WBOauthManager(BaseOauthManager): return None def get_picture(self, metadata): + """从元数据中提取微博用户头像URL""" datas = json.loads(metadata) return datas['avatar_large'] -class ProxyManagerMixin: +class ProxyManagerMixin: + """代理请求混入类,支持通过环境变量配置HTTP代理""" def __init__(self, *args, **kwargs): + # cll 从环境变量读取代理配置 if os.environ.get("HTTP_PROXY"): self.proxies = { "http": os.environ.get("HTTP_PROXY"), @@ -157,17 +175,20 @@ class ProxyManagerMixin: self.proxies = None def do_get(self, url, params, headers=None): + """带代理的GET请求""" rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies) logger.info(rsp.text) return rsp.text def do_post(self, url, params, headers=None): + """带代理的POST请求""" rsp = requests.post(url, params, headers=headers, proxies=self.proxies) logger.info(rsp.text) return rsp.text -class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): +class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): + """Google OAuth授权管理器(支持代理)""" AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth' TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token' API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo' @@ -178,13 +199,10 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' self.callback_url = config.callback_url if config else '' - super( - GoogleOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + super().__init__(access_token=access_token, openid=openid) def get_authorization_url(self, nexturl='/'): + """构建Google授权跳转URL,请求openid和email权限""" params = { 'client_id': self.client_id, 'response_type': 'code', @@ -195,43 +213,45 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): return url def get_access_token_by_code(self, code): + """通过授权码获取Google访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'grant_type': 'authorization_code', 'code': code, - 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) - obj = json.loads(rsp) - + if 'access_token' in obj: self.access_token = str(obj['access_token']) - self.openid = str(obj['id_token']) + self.openid = str(obj['id_token']) # cll Google用id_token作为openid logger.info(self.ICON_NAME + ' oauth ' + rsp) return self.access_token else: raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): + """通过访问令牌获取Google用户信息""" if not self.is_authorized: return None + params = { 'access_token': self.access_token } rsp = self.do_get(self.API_URL, params) + try: - datas = json.loads(rsp) user = OAuthUser() user.metadata = rsp - user.picture = datas['picture'] - user.nickname = datas['name'] - user.openid = datas['sub'] + user.picture = datas['picture'] # cll 头像URL + user.nickname = datas['name'] # cll 姓名 + user.openid = datas['sub'] # cll 唯一标识 user.token = self.access_token user.type = 'google' + if datas['email']: user.email = datas['email'] return user @@ -241,11 +261,13 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager): return None def get_picture(self, metadata): + """从元数据提取Google用户头像""" datas = json.loads(metadata) return datas['picture'] class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): + """GitHub OAuth授权管理器(支持代理)""" AUTH_URL = 'https://github.com/login/oauth/authorize' TOKEN_URL = 'https://github.com/login/oauth/access_token' API_URL = 'https://api.github.com/user' @@ -256,13 +278,10 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' self.callback_url = config.callback_url if config else '' - super( - GitHubOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + super().__init__(access_token=access_token, openid=openid) def get_authorization_url(self, next_url='/'): + """构建GitHub授权跳转URL,请求user权限""" params = { 'client_id': self.client_id, 'response_type': 'code', @@ -273,16 +292,17 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): return url def get_access_token_by_code(self, code): + """通过授权码获取GitHub访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, 'grant_type': 'authorization_code', 'code': code, - 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) - + + # cll GitHub返回格式为form-encoded,需解析 from urllib import parse r = parse.parse_qs(rsp) if 'access_token' in r: @@ -292,19 +312,21 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): - + """通过访问令牌获取GitHub用户信息(需在请求头携带令牌)""" rsp = self.do_get(self.API_URL, params={}, headers={ "Authorization": "token " + self.access_token }) + try: datas = json.loads(rsp) user = OAuthUser() - user.picture = datas['avatar_url'] - user.nickname = datas['name'] - user.openid = datas['id'] + user.picture = datas['avatar_url'] # cll 头像URL + user.nickname = datas['name'] # cll 姓名(可能为空) + user.openid = datas['id'] # cll 唯一标识 user.type = 'github' user.token = self.access_token user.metadata = rsp + if 'email' in datas and datas['email']: user.email = datas['email'] return user @@ -314,11 +336,13 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager): return None def get_picture(self, metadata): + """从元数据提取GitHub用户头像""" datas = json.loads(metadata) return datas['avatar_url'] -class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): +class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): #fkc + """Facebook OAuth授权管理器(支持代理)""" AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth' TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token' API_URL = 'https://graph.facebook.com/me' @@ -329,13 +353,10 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' self.callback_url = config.callback_url if config else '' - super( - FaceBookOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + super().__init__(access_token=access_token, openid=openid) def get_authorization_url(self, next_url='/'): + """构建Facebook授权跳转URL,请求邮箱和公开资料权限""" params = { 'client_id': self.client_id, 'response_type': 'code', @@ -346,17 +367,16 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): return url def get_access_token_by_code(self, code): + """通过授权码获取Facebook访问令牌""" params = { 'client_id': self.client_id, 'client_secret': self.client_secret, - # 'grant_type': 'authorization_code', 'code': code, - 'redirect_uri': self.callback_url } rsp = self.do_post(self.TOKEN_URL, params) - obj = json.loads(rsp) + if 'access_token' in obj: token = str(obj['access_token']) self.access_token = token @@ -365,21 +385,26 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): raise OAuthAccessTokenException(rsp) def get_oauth_userinfo(self): + """通过访问令牌获取Facebook用户信息(指定返回字段)""" params = { 'access_token': self.access_token, - 'fields': 'id,name,picture,email' + 'fields': 'id,name,picture,email' # cll 指定需要返回的字段 } + try: rsp = self.do_get(self.API_URL, params) datas = json.loads(rsp) user = OAuthUser() - user.nickname = datas['name'] - user.openid = datas['id'] + user.nickname = datas['name'] # cll 姓名 + user.openid = datas['id'] # cll 唯一标识 user.type = 'facebook' user.token = self.access_token user.metadata = rsp + if 'email' in datas and datas['email']: user.email = datas['email'] + + # cll 解析头像URL(Facebook返回格式嵌套较深) if 'picture' in datas and datas['picture'] and datas['picture']['data'] and datas['picture']['data']['url']: user.picture = str(datas['picture']['data']['url']) return user @@ -388,14 +413,17 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager): return None def get_picture(self, metadata): + """从元数据提取Facebook用户头像""" datas = json.loads(metadata) return str(datas['picture']['data']['url']) -class QQOauthManager(BaseOauthManager): +class QQOauthManager(BaseOauthManager): #cll + """QQ OAuth授权管理器""" AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize' TOKEN_URL = 'https://graph.qq.com/oauth2.0/token' API_URL = 'https://graph.qq.com/user/get_user_info' + # cll QQ需单独请求openid的URL OPEN_ID_URL = 'https://graph.qq.com/oauth2.0/me' ICON_NAME = 'qq' @@ -404,13 +432,10 @@ class QQOauthManager(BaseOauthManager): self.client_id = config.appkey if config else '' self.client_secret = config.appsecret if config else '' self.callback_url = config.callback_url if config else '' - super( - QQOauthManager, - self).__init__( - access_token=access_token, - openid=openid) + super().__init__(access_token=access_token, openid=openid) def get_authorization_url(self, next_url='/'): + """构建QQ授权跳转URL""" params = { 'response_type': 'code', 'client_id': self.client_id, @@ -420,6 +445,7 @@ class QQOauthManager(BaseOauthManager): return url def get_access_token_by_code(self, code): + """通过授权码获取QQ访问令牌""" params = { 'grant_type': 'authorization_code', 'client_id': self.client_id, @@ -428,77 +454,90 @@ class QQOauthManager(BaseOauthManager): 'redirect_uri': self.callback_url } rsp = self.do_get(self.TOKEN_URL, params) + if rsp: + # cll QQ返回格式为form-encoded,解析令牌 d = urllib.parse.parse_qs(rsp) if 'access_token' in d: token = d['access_token'] self.access_token = token[0] return token - else: - raise OAuthAccessTokenException(rsp) + raise OAuthAccessTokenException(rsp) def get_open_id(self): + """QQ需单独请求openid(通过访问令牌获取)""" if self.is_access_token_set: params = { 'access_token': self.access_token } rsp = self.do_get(self.OPEN_ID_URL, params) + if rsp: - rsp = rsp.replace( - 'callback(', '').replace( - ')', '').replace( - ';', '') + # 去除QQ返回的callback包裹符 + rsp = rsp.replace('callback(', '').replace(')', '').replace(';', '') obj = json.loads(rsp) openid = str(obj['openid']) self.openid = openid return openid def get_oauth_userinfo(self): + """获取QQ用户信息(需先获取openid)""" openid = self.get_open_id() if openid: params = { 'access_token': self.access_token, - 'oauth_consumer_key': self.client_id, + 'oauth_consumer_key': self.client_id, # cll QQ要求传入appkey 'openid': self.openid } rsp = self.do_get(self.API_URL, params) logger.info(rsp) obj = json.loads(rsp) + user = OAuthUser() - user.nickname = obj['nickname'] - user.openid = openid + user.nickname = obj['nickname'] # cll 昵称 + user.openid = openid # cll 唯一标识 user.type = 'qq' user.token = self.access_token user.metadata = rsp + if 'email' in obj: user.email = obj['email'] if 'figureurl' in obj: - user.picture = str(obj['figureurl']) + user.picture = str(obj['figureurl']) # cll 头像URL return user def get_picture(self, metadata): + """从元数据提取QQ用户头像""" datas = json.loads(metadata) return str(datas['figureurl']) @cache_decorator(expiration=100 * 60) -def get_oauth_apps(): +def get_oauth_apps(): #xy + """获取所有启用的OAuth应用(缓存100分钟)""" + # cll 读取数据库中启用的OAuth配置 configs = OAuthConfig.objects.filter(is_enable=True).all() if not configs: return [] + + # 提取已启用的平台类型 configtypes = [x.type for x in configs] + # cll 获取所有BaseOauthManager的子类(各平台实现) applications = BaseOauthManager.__subclasses__() + # cll 筛选出已启用的平台实例 apps = [x() for x in applications if x().ICON_NAME.lower() in configtypes] return apps -def get_manager_by_type(type): +def get_manager_by_type(type): + """根据平台类型获取对应的OAuth管理器实例""" applications = get_oauth_apps() if applications: + # cll 匹配平台类型(不区分大小写) finds = list( filter( lambda x: x.ICON_NAME.lower() == type.lower(), applications)) if finds: return finds[0] - return None + return None \ No newline at end of file