syj注释 #10

Merged
pqnvcz97o merged 6 commits from syj_branch into develop 5 months ago

@ -10,10 +10,28 @@ from djangoblog.utils import get_current_site
class BlogUser(AbstractUser):
"""
<<<<<<< HEAD
博客用户模型类
继承自Django的AbstractUser扩展了额外的用户信息字段
Attributes:
nickname (CharField): 用户昵称最大长度100个字符可为空
creation_time (DateTimeField): 用户创建时间默认为当前时间
last_modify_time (DateTimeField): 用户信息最后修改时间默认为当前时间
source (CharField): 用户创建来源最大长度100个字符可为空
Meta:
ordering: 按照id倒序排列
verbose_name: 用户的可读性名称
verbose_name_plural: 用户的复数形式名称
get_latest_by: 指定用于latest()查询的字段
"""
=======
博客用户模型继承自Django的AbstractUser
扩展了用户的基本信息
"""
# 用户昵称,可为空
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
nickname = models.CharField(_('nick name'), max_length=100, blank=True)
# 用户创建时间,默认为当前时间
creation_time = models.DateTimeField(_('creation time'), default=now)
@ -24,7 +42,15 @@ class BlogUser(AbstractUser):
def get_absolute_url(self):
"""
<<<<<<< HEAD
获取用户详情页URL
通过reverse函数解析URL使用author_detail命名URL模式
Returns:
str: 用户详情页的相对URL路径
=======
获取用户详情页的绝对URL
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
return reverse(
'blog:author_detail', kwargs={
@ -32,13 +58,28 @@ class BlogUser(AbstractUser):
def __str__(self):
"""
<<<<<<< HEAD
模型的字符串表示
Returns:
str: 用户邮箱地址
=======
定义对象的字符串表示返回用户的邮箱
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
return self.email
def get_full_url(self):
"""
<<<<<<< HEAD
获取用户页面的完整URL包含域名
用于构建完整的用户页面链接包含协议和域名
Returns:
str: 完整的用户页面URL格式为 https://{site}{path}
=======
获取用户的完整URL地址
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
@ -46,8 +87,15 @@ class BlogUser(AbstractUser):
return url
class Meta:
<<<<<<< HEAD
ordering = ['-id']
verbose_name = _('user')
verbose_name_plural = verbose_name
get_latest_by = 'id'
=======
ordering = ['-id'] # 默认按ID降序排列
verbose_name = _('user') # 单数名称
verbose_name_plural = verbose_name # 复数名称
get_latest_by = 'id' # 获取最新记录的字段
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc

@ -34,17 +34,32 @@ logger = logging.getLogger(__name__) # 创建日志记录器
class RegisterView(FormView):
"""
用户注册视图类
<<<<<<< HEAD
处理用户注册表单提交和验证邮箱功能
"""
form_class = RegisterForm
template_name = 'account/registration_form.html'
=======
"""
form_class = RegisterForm # 使用的表单类
template_name = 'account/registration_form.html' # 模板文件
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
@method_decorator(csrf_protect) # 添加CSRF保护装饰器
def dispatch(self, *args, **kwargs):
"""
调度方法添加CSRF保护装饰器
"""
return super(RegisterView, self).dispatch(*args, **kwargs)
def form_valid(self, form):
"""
<<<<<<< HEAD
处理有效的注册表单
保存用户信息发送验证邮件
=======
表单验证成功时的处理方法
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
if form.is_valid():
user = form.save(False) # 保存表单但不提交到数据库
@ -91,41 +106,77 @@ class RegisterView(FormView):
class LogoutView(RedirectView):
"""
用户登出视图类
<<<<<<< HEAD
处理用户登出逻辑并重定向到登录页面
"""
url = '/login/'
=======
"""
url = '/login/' # 登出后重定向的URL
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
@method_decorator(never_cache) # 添加不缓存装饰器
def dispatch(self, request, *args, **kwargs):
"""
调度方法添加不缓存装饰器
"""
return super(LogoutView, self).dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
"""
处理GET请求执行登出操作
"""
<<<<<<< HEAD
logout(request)
delete_sidebar_cache()
return super(LogoutView, self).get(request, *args, **kwargs)
=======
logout(request) # 执行登出
delete_sidebar_cache() # 删除侧边栏缓存
return super(LogoutView, self).get(request, *args, **kwargs) # 重定向到登录页
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
class LoginView(FormView):
"""
用户登录视图类
<<<<<<< HEAD
处理用户登录表单和认证逻辑
"""
form_class = LoginForm
template_name = 'account/login.html'
success_url = '/'
redirect_field_name = REDIRECT_FIELD_NAME
login_ttl = 2626560 # 一个月的时间
=======
"""
form_class = LoginForm # 使用的表单类
template_name = 'account/login.html' # 模板文件
success_url = '/' # 登录成功后重定向的URL
redirect_field_name = REDIRECT_FIELD_NAME # 重定向字段名
login_ttl = 2626560 # 登录会话保持时间(一个月)
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
@method_decorator(sensitive_post_parameters('password')) # 敏感参数保护
@method_decorator(csrf_protect) # CSRF保护
@method_decorator(never_cache) # 不缓存
def dispatch(self, request, *args, **kwargs):
<<<<<<< HEAD
"""
调度方法添加敏感参数保护CSRF保护和不缓存装饰器
"""
=======
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
return super(LoginView, self).dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
"""
<<<<<<< HEAD
获取上下文数据处理重定向URL
=======
获取上下文数据添加重定向URL
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
redirect_to = self.request.GET.get(self.redirect_field_name)
if redirect_to is None:
@ -136,7 +187,12 @@ class LoginView(FormView):
def form_valid(self, form):
"""
<<<<<<< HEAD
处理有效的登录表单
进行用户认证并登录
=======
表单验证成功时的处理方法
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
form = AuthenticationForm(data=self.request.POST, request=self.request)
@ -157,6 +213,10 @@ class LoginView(FormView):
"""
获取登录成功后的重定向URL
"""
<<<<<<< HEAD
=======
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
redirect_to = self.request.POST.get(self.redirect_field_name)
# 验证重定向URL是否安全
if not url_has_allowed_host_and_scheme(
@ -168,10 +228,18 @@ class LoginView(FormView):
def account_result(request):
"""
<<<<<<< HEAD
账户操作结果视图函数
处理注册和邮箱验证的结果页面显示
"""
type = request.GET.get('type')
id = request.GET.get('id')
=======
账户操作结果页面
"""
type = request.GET.get('type') # 获取操作类型
id = request.GET.get('id') # 获取用户ID
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
user = get_object_or_404(get_user_model(), id=id) # 获取用户对象
logger.info(type)
@ -208,6 +276,17 @@ def account_result(request):
class ForgetPasswordView(FormView):
"""
忘记密码视图类
<<<<<<< HEAD
处理用户忘记密码的重置操作
"""
form_class = ForgetPasswordForm
template_name = 'account/forget_password.html'
def form_valid(self, form):
"""
处理有效的忘记密码表单
更新用户密码
=======
"""
form_class = ForgetPasswordForm # 使用的表单类
template_name = 'account/forget_password.html' # 模板文件
@ -215,6 +294,7 @@ class ForgetPasswordView(FormView):
def form_valid(self, form):
"""
表单验证成功时的处理方法
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
if form.is_valid():
# 根据邮箱查找用户并更新密码
@ -228,14 +308,25 @@ class ForgetPasswordView(FormView):
class ForgetPasswordEmailCode(View):
"""
<<<<<<< HEAD
忘记密码邮箱验证码视图类
处理通过邮箱发送验证码的请求
=======
忘记密码时发送验证码视图类
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
def post(self, request: HttpRequest):
"""
<<<<<<< HEAD
处理POST请求发送验证码到用户邮箱
"""
form = ForgetPasswordCodeForm(request.POST)
=======
处理POST请求发送验证码邮件
"""
form = ForgetPasswordCodeForm(request.POST) # 验证表单
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
if not form.is_valid():
return HttpResponse("错误的邮箱") # 表单验证失败返回错误信息
to_email = form.cleaned_data["email"] # 获取邮箱
@ -246,3 +337,7 @@ class ForgetPasswordEmailCode(View):
return HttpResponse("ok") # 返回成功信息
<<<<<<< HEAD
return HttpResponse("ok")
=======
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc

@ -7,6 +7,17 @@ from blog.documents import ElapsedTimeDocument, ArticleDocumentManager, ElaspedT
# TODO 参数化
class Command(BaseCommand):
"""
<<<<<<< HEAD
Django管理命令用于构建Elasticsearch搜索索引
"""
help = 'build search index'
def handle(self, *args, **options):
"""
执行命令时的处理逻辑
如果启用了Elasticsearch则构建性能和文章的索引
"""
=======
Django管理命令用于构建搜索引擎索引
"""
help = 'build search index' # 命令帮助信息
@ -16,6 +27,7 @@ class Command(BaseCommand):
命令处理函数执行索引构建操作
"""
# 检查是否启用了Elasticsearch
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
if ELASTICSEARCH_ENABLED:
# 构建耗时文档索引
ElaspedTimeDocumentManager.build_index()

@ -456,6 +456,14 @@ logger = logging.getLogger(__name__)
class LinkShowType(models.TextChoices):
"""
链接显示类型枚举类
I: 首页显示
L: 列表页显示
P: 文章页显示
A: 所有页面显示
S: 幻灯片显示
"""
I = ('i', _('index'))
L = ('l', _('list'))
P = ('p', _('post'))
@ -464,11 +472,35 @@ class LinkShowType(models.TextChoices):
class BaseModel(models.Model):
id = models.AutoField(primary_key=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('modify time'), default=now)
"""
基础模型类
提供所有模型共有的字段和方法
包含主键创建时间和最后修改时间字段
Attributes:
id (AutoField): 主键字段
creation_time (DateTimeField): 创建时间
last_modify_time (DateTimeField): 最后修改时间
"""
id = models.AutoField(
primary_key=True,
help_text='主键ID')
creation_time = models.DateTimeField(
_('creation time'),
default=now,
help_text='记录创建时间')
last_modify_time = models.DateTimeField(
_('modify time'),
default=now,
help_text='记录最后修改时间')
def save(self, *args, **kwargs):
"""
重写保存方法
如果是更新文章浏览量则只更新浏览量字段
否则处理slug字段并调用父类保存方法
"""
is_update_views = isinstance(
self,
Article) and 'update_fields' in kwargs and kwargs['update_fields'] == ['views']
@ -483,6 +515,10 @@ class BaseModel(models.Model):
super().save(*args, **kwargs)
def get_full_url(self):
"""
获取完整URL地址
:return: 完整的URL地址包含域名
"""
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
@ -493,6 +529,10 @@ class BaseModel(models.Model):
@abstractmethod
def get_absolute_url(self):
"""
抽象方法子类必须实现
返回模型对象的绝对URL路径
"""
pass
@ -510,40 +550,79 @@ class Article(BaseModel):
('a', _('Article')),
('p', _('Page')),
)
title = models.CharField(_('title'), max_length=200, unique=True)
body = MDTextField(_('body'))
title = models.CharField(
_('title'),
max_length=200,
unique=True,
help_text='文章标题,必须唯一')
body = MDTextField(
_('body'),
help_text='文章正文内容支持Markdown语法')
pub_time = models.DateTimeField(
_('publish time'), blank=False, null=False, default=now)
_('publish time'),
blank=False,
null=False,
default=now,
help_text='文章发布时间')
status = models.CharField(
_('status'),
max_length=1,
choices=STATUS_CHOICES,
default='p')
default='p',
help_text='文章状态:草稿或已发布')
comment_status = models.CharField(
_('comment status'),
max_length=1,
choices=COMMENT_STATUS,
default='o')
type = models.CharField(_('type'), max_length=1, choices=TYPE, default='a')
views = models.PositiveIntegerField(_('views'), default=0)
default='o',
help_text='评论状态:开放或关闭')
type = models.CharField(
_('type'),
max_length=1,
choices=TYPE,
default='a',
help_text='文章类型:普通文章或页面')
views = models.PositiveIntegerField(
_('views'),
default=0,
help_text='文章浏览量')
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=False,
null=False,
on_delete=models.CASCADE)
on_delete=models.CASCADE,
help_text='文章作者')
article_order = models.IntegerField(
_('order'), blank=False, null=False, default=0)
show_toc = models.BooleanField(_('show toc'), blank=False, null=False, default=False)
_('order'),
blank=False,
null=False,
default=0,
help_text='文章排序')
show_toc = models.BooleanField(
_('show toc'),
blank=False,
null=False,
default=False,
help_text='是否显示目录')
category = models.ForeignKey(
'Category',
verbose_name=_('category'),
on_delete=models.CASCADE,
blank=False,
null=False)
tags = models.ManyToManyField('Tag', verbose_name=_('tag'), blank=True)
null=False,
help_text='文章分类')
tags = models.ManyToManyField(
'Tag',
verbose_name=_('tag'),
blank=True,
help_text='文章标签')
def body_to_string(self):
"""
将文章内容转换为字符串
:return: 文章内容字符串
"""
return self.body
def __str__(self):
@ -556,6 +635,10 @@ class Article(BaseModel):
get_latest_by = 'id'
def get_absolute_url(self):
"""
获取文章详情页的URL
:return: 文章详情页URL
"""
return reverse('blog:detailbyid', kwargs={
'article_id': self.id,
'year': self.creation_time.year,
@ -565,19 +648,34 @@ class Article(BaseModel):
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
"""
获取分类目录树
:return: 分类目录树列表
"""
tree = self.category.get_category_tree()
names = list(map(lambda c: (c.name, c.get_absolute_url()), tree))
return names
def save(self, *args, **kwargs):
"""
重写保存方法调用父类保存方法
"""
super().save(*args, **kwargs)
def viewed(self):
"""
增加文章浏览量
"""
self.views += 1
self.save(update_fields=['views'])
def comment_list(self):
"""
获取文章的评论列表
使用缓存机制提高性能
:return: 评论列表
"""
cache_key = 'article_comments_{id}'.format(id=self.id)
value = cache.get(cache_key)
if value:
@ -590,24 +688,37 @@ class Article(BaseModel):
return comments
def get_admin_url(self):
"""
获取文章在管理后台的编辑URL
:return: 管理后台编辑URL
"""
info = (self._meta.app_label, self._meta.model_name)
return reverse('admin:%s_%s_change' % info, args=(self.pk,))
@cache_decorator(expiration=60 * 100)
def next_article(self):
"""
获取下一篇文章
:return: 下一篇文章对象
"""
# 下一篇
return Article.objects.filter(
id__gt=self.id, status='p').order_by('id').first()
@cache_decorator(expiration=60 * 100)
def prev_article(self):
"""
获取上一篇文章
:return: 上一篇文章对象
"""
# 前一篇
return Article.objects.filter(id__lt=self.id, status='p').first()
def get_first_image_url(self):
"""
Get the first image url from article.body.
:return:
从文章正文中获取第一张图片的URL
使用正则表达式匹配Markdown格式的图片
:return: 第一张图片的URL如果没有找到则返回空字符串
"""
match = re.search(r'!\[.*?\]\((.+?)\)', self.body)
if match:
@ -617,15 +728,27 @@ class Article(BaseModel):
class Category(BaseModel):
"""文章分类"""
name = models.CharField(_('category name'), max_length=30, unique=True)
name = models.CharField(
_('category name'),
max_length=30,
unique=True,
help_text='分类名称,必须唯一')
parent_category = models.ForeignKey(
'self',
verbose_name=_('parent category'),
blank=True,
null=True,
on_delete=models.CASCADE)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
index = models.IntegerField(default=0, verbose_name=_('index'))
on_delete=models.CASCADE,
help_text='父级分类')
slug = models.SlugField(
default='no-slug',
max_length=60,
blank=True,
help_text='分类slug用于生成URL')
index = models.IntegerField(
default=0,
verbose_name=_('index'),
help_text='分类索引,用于排序')
class Meta:
ordering = ['-index']
@ -633,6 +756,10 @@ class Category(BaseModel):
verbose_name_plural = verbose_name
def get_absolute_url(self):
"""
获取分类详情页URL
:return: 分类详情页URL
"""
return reverse(
'blog:category_detail', kwargs={
'category_name': self.slug})
@ -644,7 +771,7 @@ class Category(BaseModel):
def get_category_tree(self):
"""
递归获得分类目录的父级
:return:
:return: 包含当前分类及其所有父级分类的列表
"""
categorys = []
@ -660,7 +787,7 @@ class Category(BaseModel):
def get_sub_categorys(self):
"""
获得当前分类目录所有子集
:return:
:return: 包含当前分类及其所有子分类的列表
"""
categorys = []
all_categorys = Category.objects.all()
@ -680,17 +807,33 @@ class Category(BaseModel):
class Tag(BaseModel):
"""文章标签"""
name = models.CharField(_('tag name'), max_length=30, unique=True)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
name = models.CharField(
_('tag name'),
max_length=30,
unique=True,
help_text='标签名称,必须唯一')
slug = models.SlugField(
default='no-slug',
max_length=60,
blank=True,
help_text='标签slug用于生成URL')
def __str__(self):
return self.name
def get_absolute_url(self):
"""
获取标签详情页URL
:return: 标签详情页URL
"""
return reverse('blog:tag_detail', kwargs={'tag_name': self.slug})
@cache_decorator(60 * 60 * 10)
def get_article_count(self):
"""
获取使用该标签的文章数量
:return: 文章数量
"""
return Article.objects.filter(tags__name=self.name).distinct().count()
class Meta:
@ -702,18 +845,38 @@ class Tag(BaseModel):
class Links(models.Model):
"""友情链接"""
name = models.CharField(_('link name'), max_length=30, unique=True)
link = models.URLField(_('link'))
sequence = models.IntegerField(_('order'), unique=True)
name = models.CharField(
_('link name'),
max_length=30,
unique=True,
help_text='链接名称')
link = models.URLField(
_('link'),
help_text='链接地址')
sequence = models.IntegerField(
_('order'),
unique=True,
help_text='链接排序')
is_enable = models.BooleanField(
_('is show'), default=True, blank=False, null=False)
_('is show'),
default=True,
blank=False,
null=False,
help_text='是否显示')
show_type = models.CharField(
_('show type'),
max_length=1,
choices=LinkShowType.choices,
default=LinkShowType.I)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
default=LinkShowType.I,
help_text='链接显示类型')
creation_time = models.DateTimeField(
_('creation time'),
default=now,
help_text='链接创建时间')
last_mod_time = models.DateTimeField(
_('modify time'),
default=now,
help_text='链接最后修改时间')
class Meta:
ordering = ['sequence']
@ -726,12 +889,29 @@ class Links(models.Model):
class SideBar(models.Model):
"""侧边栏,可以展示一些html内容"""
name = models.CharField(_('title'), max_length=100)
content = models.TextField(_('content'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(_('is enable'), default=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
name = models.CharField(
_('title'),
max_length=100,
help_text='侧边栏标题')
content = models.TextField(
_('content'),
help_text='侧边栏内容支持HTML')
sequence = models.IntegerField(
_('order'),
unique=True,
help_text='侧边栏排序')
is_enable = models.BooleanField(
_('is enable'),
default=True,
help_text='是否启用')
creation_time = models.DateTimeField(
_('creation time'),
default=now,
help_text='侧边栏创建时间')
last_mod_time = models.DateTimeField(
_('modify time'),
default=now,
help_text='侧边栏最后修改时间')
class Meta:
ordering = ['sequence']
@ -749,53 +929,103 @@ class BlogSettings(models.Model):
max_length=200,
null=False,
blank=False,
default='')
default='',
help_text='网站名称')
site_description = models.TextField(
_('site description'),
max_length=1000,
null=False,
blank=False,
default='')
default='',
help_text='网站描述')
site_seo_description = models.TextField(
_('site seo description'), max_length=1000, null=False, blank=False, default='')
_('site seo description'),
max_length=1000,
null=False,
blank=False,
default='',
help_text='网站SEO描述')
site_keywords = models.TextField(
_('site keywords'),
max_length=1000,
null=False,
blank=False,
default='')
article_sub_length = models.IntegerField(_('article sub length'), default=300)
sidebar_article_count = models.IntegerField(_('sidebar article count'), default=10)
sidebar_comment_count = models.IntegerField(_('sidebar comment count'), default=5)
article_comment_count = models.IntegerField(_('article comment count'), default=5)
show_google_adsense = models.BooleanField(_('show adsense'), default=False)
default='',
help_text='网站关键词')
article_sub_length = models.IntegerField(
_('article sub length'),
default=300,
help_text='文章摘要长度')
sidebar_article_count = models.IntegerField(
_('sidebar article count'),
default=10,
help_text='侧边栏文章显示数量')
sidebar_comment_count = models.IntegerField(
_('sidebar comment count'),
default=5,
help_text='侧边栏评论显示数量')
article_comment_count = models.IntegerField(
_('article comment count'),
default=5,
help_text='文章评论显示数量')
show_google_adsense = models.BooleanField(
_('show adsense'),
default=False,
help_text='是否显示Google广告')
google_adsense_codes = models.TextField(
_('adsense code'), max_length=2000, null=True, blank=True, default='')
open_site_comment = models.BooleanField(_('open site comment'), default=True)
global_header = models.TextField("公共头部", null=True, blank=True, default='')
global_footer = models.TextField("公共尾部", null=True, blank=True, default='')
_('adsense code'),
max_length=2000,
null=True,
blank=True,
default='',
help_text='Google广告代码')
open_site_comment = models.BooleanField(
_('open site comment'),
default=True,
help_text='是否开放站点评论')
global_header = models.TextField(
"公共头部",
null=True,
blank=True,
default='',
help_text='公共头部代码')
global_footer = models.TextField(
"公共尾部",
null=True,
blank=True,
default='',
help_text='公共尾部代码')
beian_code = models.CharField(
'备案号',
max_length=2000,
null=True,
blank=True,
default='')
default='',
help_text='网站备案号')
analytics_code = models.TextField(
"网站统计代码",
max_length=1000,
null=False,
blank=False,
default='')
default='',
help_text='网站统计代码,如百度统计')
show_gongan_code = models.BooleanField(
'是否显示公安备案号', default=False, null=False)
'是否显示公安备案号',
default=False,
null=False,
help_text='是否显示公安备案号')
gongan_beiancode = models.TextField(
'公安备案号',
max_length=2000,
null=True,
blank=True,
default='')
default='',
help_text='公安备案号')
comment_need_review = models.BooleanField(
'评论是否需要审核', default=False, null=False)
'评论是否需要审核',
default=False,
null=False,
help_text='评论是否需要审核')
class Meta:
verbose_name = _('Website configuration')
@ -805,10 +1035,18 @@ class BlogSettings(models.Model):
return self.site_name
def clean(self):
"""
验证模型数据
确保只存在一个配置实例
"""
if BlogSettings.objects.exclude(id=self.id).count():
raise ValidationError(_('There can only be one configuration'))
def save(self, *args, **kwargs):
"""
重写保存方法
保存后清除缓存
"""
super().save(*args, **kwargs)
from djangoblog.utils import cache
cache.clear()
cache.clear()

@ -28,7 +28,14 @@ register = template.Library() # 创建模板标签注册器
@register.simple_tag(takes_context=True)
def head_meta(context):
"""
<<<<<<< HEAD
头部元数据标签
通过插件系统应用过滤器生成头部元数据
:param context: 模板上下文
:return: 头部元数据HTML
=======
生成页面头部meta信息
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
return mark_safe(hooks.apply_filters('head_meta', '', context))
@ -36,7 +43,12 @@ def head_meta(context):
@register.simple_tag
def timeformat(data):
"""
<<<<<<< HEAD
时间格式化标签
将时间数据格式化为设置中指定的时间格式
=======
格式化时间显示
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
:param data: 时间数据
:return: 格式化后的时间字符串
"""
@ -50,8 +62,14 @@ def timeformat(data):
@register.simple_tag
def datetimeformat(data):
"""
<<<<<<< HEAD
日期时间格式化标签
将时间数据格式化为设置中指定的日期时间格式
:param data: 时间数据
=======
格式化日期时间显示
:param data: 日期时间数据
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
:return: 格式化后的日期时间字符串
"""
try:
@ -65,9 +83,16 @@ def datetimeformat(data):
@stringfilter
def custom_markdown(content):
"""
<<<<<<< HEAD
自定义Markdown转换过滤器
将Markdown格式的内容转换为HTML
:param content: Markdown格式的内容
:return: 转换后的HTML内容
=======
将内容转换为markdown格式
:param content: 原始内容
:return: markdown格式的内容
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
return mark_safe(CommonMarkdown.get_markdown(content))
@ -75,8 +100,14 @@ def custom_markdown(content):
@register.simple_tag
def get_markdown_toc(content):
"""
<<<<<<< HEAD
获取Markdown目录标签
从Markdown内容中提取目录结构
:param content: Markdown格式的内容
=======
获取markdown内容的目录
:param content: markdown内容
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
:return: 目录HTML
"""
from djangoblog.utils import CommonMarkdown
@ -88,9 +119,16 @@ def get_markdown_toc(content):
@stringfilter
def comment_markdown(content):
"""
<<<<<<< HEAD
评论Markdown转换过滤器
将Markdown格式的评论内容转换为HTML并清理不安全标签
:param content: Markdown格式的评论内容
:return: 转换并清理后的HTML内容
=======
将评论内容转换为markdown格式并清理HTML
:param content: 评论内容
:return: 清理后的markdown格式内容
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
content = CommonMarkdown.get_markdown(content)
return mark_safe(sanitize_html(content))
@ -101,8 +139,14 @@ def comment_markdown(content):
def truncatechars_content(content):
"""
获得文章内容的摘要
<<<<<<< HEAD
根据博客设置中的文章摘要长度截取内容
:param content: 原始内容
:return: 截取后的内容
=======
:param content: 文章内容
:return: 截取后的摘要内容
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
from django.template.defaultfilters import truncatechars_html
from djangoblog.utils import get_blog_setting
@ -114,9 +158,16 @@ def truncatechars_content(content):
@stringfilter
def truncate(content):
"""
<<<<<<< HEAD
内容截取过滤器
移除HTML标签并截取前150个字符
:param content: 原始内容
:return: 截取后的内容
=======
截取内容前150个字符去除HTML标签
:param content: 原始内容
:return: 截取后的纯文本内容
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
from django.utils.html import strip_tags
return strip_tags(content)[:150]
@ -125,7 +176,12 @@ def truncate(content):
@register.inclusion_tag('blog/tags/breadcrumb.html')
def load_breadcrumb(article):
"""
<<<<<<< HEAD
加载面包屑导航标签
生成文章的面包屑导航信息
=======
获得文章面包屑导航
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
:param article: 文章对象
:return: 面包屑导航数据
"""
@ -146,9 +202,16 @@ def load_breadcrumb(article):
@register.inclusion_tag('blog/tags/article_tag_list.html')
def load_articletags(article):
"""
<<<<<<< HEAD
加载文章标签列表标签
生成文章标签的显示数据
:param article: 文章对象
:return: 文章标签列表数据
=======
加载文章标签列表
:param article: 文章对象
:return: 标签列表数据
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
tags = article.tags.all()
tags_list = []
@ -166,9 +229,17 @@ def load_articletags(article):
@register.inclusion_tag('blog/tags/sidebar.html')
def load_sidebar(user, linktype):
"""
<<<<<<< HEAD
加载侧边栏标签
生成侧边栏显示数据包括文章分类标签等信息
使用缓存提高性能
:param user: 当前用户
:param linktype: 链接类型
=======
加载侧边栏
:param user: 当前用户
:param linktype: 链接显示类型
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
:return: 侧边栏数据
"""
value = cache.get("sidebar" + linktype)
@ -239,10 +310,18 @@ def load_sidebar(user, linktype):
@register.inclusion_tag('blog/tags/article_meta_info.html')
def load_article_metas(article, user):
"""
<<<<<<< HEAD
加载文章元信息标签
生成文章元信息显示数据
:param article: 文章对象
:param user: 当前用户
:return: 文章元信息数据
=======
获得文章meta信息
:param article: 文章对象
:param user: 当前用户
:return: 文章meta信息数据
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
return {
'article': article,
@ -253,11 +332,20 @@ def load_article_metas(article, user):
@register.inclusion_tag('blog/tags/article_pagination.html')
def load_pagination_info(page_obj, page_type, tag_name):
"""
<<<<<<< HEAD
加载分页信息标签
根据页面类型生成分页导航链接
:param page_obj: 分页对象
:param page_type: 页面类型
:param tag_name: 标签名称
:return: 分页信息数据
=======
加载分页信息
:param page_obj: 分页对象
:param page_type: 页面类型
:param tag_name: 标签名/分类名/作者名
:return: 分页链接数据
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
previous_url = ''
next_url = ''
@ -336,7 +424,12 @@ def load_pagination_info(page_obj, page_type, tag_name):
@register.inclusion_tag('blog/tags/article_info.html')
def load_article_detail(article, isindex, user):
"""
<<<<<<< HEAD
加载文章详情标签
生成文章详情显示数据
=======
加载文章详情
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
:param article: 文章对象
:param isindex: 是否为列表页
:param user: 当前用户
@ -417,6 +510,10 @@ def query(qs, **kwargs):
@register.filter
def addstr(arg1, arg2):
<<<<<<< HEAD
"""concatenate arg1 & arg2"""
return str(arg1) + str(arg2)
=======
"""
连接两个字符串
:param arg1: 第一个字符串
@ -425,3 +522,4 @@ def addstr(arg1, arg2):
"""
return str(arg1) + str(arg2)
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc

@ -26,7 +26,12 @@ logger = logging.getLogger(__name__) # 创建日志记录器
class ArticleListView(ListView):
"""
<<<<<<< HEAD
文章列表视图基类
继承自Django的ListView提供文章列表的通用功能
=======
文章列表视图基类提供分页和缓存功能
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
# template_name属性用于指定使用哪个模板进行渲染
template_name = 'blog/article_index.html'
@ -42,7 +47,12 @@ class ArticleListView(ListView):
def get_view_cache_key(self):
"""
<<<<<<< HEAD
获取视图缓存键
:return: 缓存键
=======
获取视图缓存key
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
return self.request.get['pages']
@ -50,6 +60,10 @@ class ArticleListView(ListView):
def page_number(self):
"""
获取当前页码
<<<<<<< HEAD
:return: 页码
=======
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
page_kwarg = self.page_kwarg
page = self.kwargs.get(
@ -95,7 +109,11 @@ class ArticleListView(ListView):
def get_context_data(self, **kwargs):
"""
<<<<<<< HEAD
获取上下文数据
=======
添加额外的上下文数据
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
kwargs['linktype'] = self.link_type
return super(ArticleListView, self).get_context_data(**kwargs)
@ -103,7 +121,12 @@ class ArticleListView(ListView):
class IndexView(ArticleListView):
'''
<<<<<<< HEAD
首页视图类
显示所有已发布文章的列表
=======
首页视图
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
'''
# 友情链接类型
link_type = LinkShowType.I
@ -111,13 +134,22 @@ class IndexView(ArticleListView):
def get_queryset_data(self):
"""
获取首页文章数据
<<<<<<< HEAD
:return: 已发布文章查询集
=======
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
article_list = Article.objects.filter(type='a', status='p')
return article_list
def get_queryset_cache_key(self):
"""
<<<<<<< HEAD
获取首页缓存键
:return: 缓存键
=======
获取首页缓存key
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
cache_key = 'index_{page}'.format(page=self.page_number)
return cache_key
@ -125,7 +157,12 @@ class IndexView(ArticleListView):
class ArticleDetailView(DetailView):
'''
<<<<<<< HEAD
文章详情页面视图类
显示单篇文章的详细内容及相关信息
=======
文章详情页面视图
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
'''
template_name = 'blog/article_detail.html'
model = Article
@ -134,9 +171,16 @@ class ArticleDetailView(DetailView):
def get_context_data(self, **kwargs):
"""
<<<<<<< HEAD
获取文章详情页上下文数据
包括评论表单评论列表分页信息等
"""
comment_form = CommentForm()
=======
获取文章详情页的上下文数据
"""
comment_form = CommentForm() # 评论表单
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
article_comments = self.object.comment_list() # 获取文章评论列表
parent_comments = article_comments.filter(parent_comment=None) # 获取父级评论
@ -189,13 +233,23 @@ class ArticleDetailView(DetailView):
class CategoryDetailView(ArticleListView):
'''
<<<<<<< HEAD
分类目录详情视图类
显示指定分类下的所有文章
=======
分类目录列表视图
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
'''
page_type = "分类目录归档"
def get_queryset_data(self):
"""
<<<<<<< HEAD
获取指定分类下的文章数据
:return: 文章查询集
=======
获取分类文章数据
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
@ -210,7 +264,12 @@ class CategoryDetailView(ArticleListView):
def get_queryset_cache_key(self):
"""
<<<<<<< HEAD
获取分类详情页缓存键
:return: 缓存键
=======
获取分类列表缓存key
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
"""
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
@ -222,8 +281,14 @@ class CategoryDetailView(ArticleListView):
def get_context_data(self, **kwargs):
"""
<<<<<<< HEAD
获取分类详情页上下文数据
"""
=======
添加分类相关上下文数据
"""
>>>>>>> ff8be05a63440d45c1cc36269ce152191eac8fdc
categoryname = self.categoryname
try:
categoryname = categoryname.split('/')[-1]

@ -1,7 +1,13 @@
ARTICLE_DETAIL_LOAD = 'article_detail_load'
ARTICLE_CREATE = 'article_create'
ARTICLE_UPDATE = 'article_update'
ARTICLE_DELETE = 'article_delete'
"""
钩子常量定义文件
定义了系统中使用的各种钩子名称常量
"""
ARTICLE_CONTENT_HOOK_NAME = "the_content"
# 文章相关钩子
ARTICLE_DETAIL_LOAD = 'article_detail_load' # 文章详情加载
ARTICLE_CREATE = 'article_create' # 文章创建
ARTICLE_UPDATE = 'article_update' # 文章更新
ARTICLE_DELETE = 'article_delete' # 文章删除
# 内容过滤钩子
ARTICLE_CONTENT_HOOK_NAME = "the_content" # 文章内容过滤

@ -8,6 +8,8 @@ _hooks = {}
def register(hook_name: str, callback: callable):
"""
注册一个钩子回调
:param hook_name: 钩子名称
:param callback: 回调函数
"""
if hook_name not in _hooks:
_hooks[hook_name] = []
@ -19,6 +21,7 @@ def run_action(hook_name: str, *args, **kwargs):
"""
执行一个 Action Hook
它会按顺序执行所有注册到该钩子上的回调函数
:param hook_name: 钩子名称
"""
if hook_name in _hooks:
logger.debug(f"Running action hook '{hook_name}'")
@ -33,6 +36,9 @@ def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行一个 Filter Hook
它会把 value 依次传递给所有注册的回调函数进行处理
:param hook_name: 钩子名称
:param value: 要处理的值
:return: 处理后的值
"""
if hook_name in _hooks:
logger.debug(f"Applying filter hook '{hook_name}'")
@ -41,4 +47,4 @@ def apply_filters(hook_name: str, value, *args, **kwargs):
value = callback(value, *args, **kwargs)
except Exception as e:
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
return value
return value

@ -6,8 +6,12 @@ logger = logging.getLogger(__name__)
def load_plugins():
"""
Dynamically loads and initializes plugins from the 'plugins' directory.
This function is intended to be called when the Django app registry is ready.
动态加载并初始化'plugins'目录中的插件
此函数应在Django应用注册表准备就绪时调用
通过遍历ACTIVE_PLUGINS设置中的插件名称
检查插件目录和plugin.py文件是否存在
如果存在则尝试导入插件模块
"""
for plugin_name in settings.ACTIVE_PLUGINS:
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
@ -16,4 +20,4 @@ def load_plugins():
__import__(f'plugins.{plugin_name}.plugin')
logger.info(f"Successfully loaded plugin: {plugin_name}")
except ImportError as e:
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)

@ -17,6 +17,12 @@ from django.utils.translation import gettext_lazy as _
def env_to_bool(env, default):
"""
将环境变量转换为布尔值
:param env: 环境变量名
:param default: 默认值
:return: 布尔值
"""
str_val = os.environ.get(env)
return default if str_val is None else str_val == 'True'

@ -61,4 +61,4 @@ urlpatterns += i18n_patterns(
, prefix_default_language=False) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL,
document_root=settings.MEDIA_ROOT)
document_root=settings.MEDIA_ROOT)

@ -21,17 +21,32 @@ logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""
获取最大的文章ID和评论ID
:return: (最大文章ID, 最大评论ID)
"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""
计算字符串的SHA256哈希值
:param str: 输入字符串
:return: SHA256哈希值
"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""
缓存装饰器
用于缓存函数的返回值提高性能
:param expiration: 缓存过期时间默认3分钟
:return: 装饰器函数
"""
def wrapper(func):
def news(*args, **kwargs):
try:
@ -94,13 +109,27 @@ def expire_view_cache(path, servername, serverport, key_prefix=None):
@cache_decorator()
def get_current_site():
"""
获取当前站点信息
使用缓存装饰器提高性能
:return: 当前站点对象
"""
site = Site.objects.get_current()
return site
class CommonMarkdown:
"""
Markdown处理工具类
提供Markdown到HTML的转换功能
"""
@staticmethod
def _convert_markdown(value):
"""
转换Markdown为HTML
:param value: Markdown格式的文本
:return: (HTML内容, 目录)
"""
md = markdown.Markdown(
extensions=[
'extra',
@ -115,16 +144,33 @@ class CommonMarkdown:
@staticmethod
def get_markdown_with_toc(value):
"""
获取带目录的Markdown HTML
:param value: Markdown格式的文本
:return: (HTML内容, 目录)
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
"""
获取Markdown HTML不带目录
:param value: Markdown格式的文本
:return: HTML内容
"""
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""
发送邮件
通过信号机制发送邮件
:param emailto: 收件人列表
:param title: 邮件标题
:param content: 邮件内容
"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
@ -134,11 +180,19 @@ def send_email(emailto, title, content):
def generate_code() -> str:
"""生成随机数验证码"""
"""
生成随机数验证码
:return: 6位随机数字验证码
"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""
将字典转换为URL参数
:param dict: 字典对象
:return: URL参数字符串
"""
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
@ -146,6 +200,11 @@ def parse_dict_to_url(dict):
def get_blog_setting():
"""
获取博客设置
使用缓存提高性能如果缓存不存在则从数据库获取
:return: 博客设置对象
"""
value = cache.get('get_blog_setting')
if value:
return value
@ -202,6 +261,10 @@ def save_user_avatar(url):
def delete_sidebar_cache():
"""
删除侧边栏缓存
清除所有侧边栏相关的缓存
"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
@ -210,12 +273,21 @@ def delete_sidebar_cache():
def delete_view_cache(prefix, keys):
"""
删除模板片段缓存
:param prefix: 缓存前缀
:param keys: 缓存键列表
"""
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""
获取资源URL
:return: 静态资源URL
"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
@ -229,4 +301,10 @@ ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['ti
def sanitize_html(html):
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
"""
清理HTML内容
只保留允许的标签和属性防止XSS攻击
:param html: 原始HTML内容
:return: 清理后的HTML内容
"""
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)

@ -7,48 +7,88 @@ from django.utils.html import format_html
logger = logging.getLogger(__name__)
class OAuthUserAdmin(admin.ModelAdmin):
"""
OAuth用户模型在Django管理后台的配置类
"""
# 设置搜索字段,可以在管理后台通过昵称或邮箱搜索用户
search_fields = ('nickname', 'email')
# 每页显示的记录数
list_per_page = 20
# 在列表页显示的字段
list_display = (
'id',
'nickname',
'link_to_usermodel',
'show_user_image',
'type',
'email',
'id', # 用户ID
'nickname', # 用户昵称
'link_to_usermodel', # 关联的系统用户链接
'show_user_image', # 显示用户头像
'type', # OAuth类型如GitHub、微博等
'email', # 用户邮箱
)
# 可以点击进入详情页的字段
list_display_links = ('id', 'nickname')
# 列表过滤器,可以通过作者和类型进行筛选
list_filter = ('author', 'type',)
# 只读字段列表(初始为空)
readonly_fields = []
def get_readonly_fields(self, request, obj=None):
"""
获取只读字段列表
将所有字段都设置为只读禁止在管理后台修改OAuth用户信息
"""
# 将所有模型字段和多对多字段都设置为只读
return list(self.readonly_fields) + \
[field.name for field in obj._meta.fields] + \
[field.name for field in obj._meta.many_to_many]
def has_add_permission(self, request):
"""
控制是否允许添加新记录
返回False表示禁止在管理后台手动添加OAuth用户
"""
return False
def link_to_usermodel(self, obj):
"""
创建指向关联系统用户的链接
如果该OAuth用户关联了系统用户则显示一个链接到该系统用户详情页的链接
"""
if obj.author:
# 获取关联用户模型的app_label和model_name
info = (obj.author._meta.app_label, obj.author._meta.model_name)
# 构造管理后台编辑页面的URL
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
# 返回格式化的HTML链接
return format_html(
u'<a href="%s">%s</a>' %
(link, obj.author.nickname if obj.author.nickname else obj.author.email))
def show_user_image(self, obj):
"""
显示用户头像
从picture字段获取图片URL并显示为50x50像素的缩略图
"""
img = obj.picture
# 返回格式化的HTML图片标签
return format_html(
u'<img src="%s" style="width:50px;height:50px"></img>' %
(img))
link_to_usermodel.short_description = '用户'
show_user_image.short_description = '用户头像'
# 设置自定义方法在列表中的显示名称
link_to_usermodel.short_description = '用户' # 关联用户列的显示名称
show_user_image.short_description = '用户头像' # 用户头像列的显示名称
class OAuthConfigAdmin(admin.ModelAdmin):
list_display = ('type', 'appkey', 'appsecret', 'is_enable')
"""
OAuth配置模型在Django管理后台的配置类
"""
# 在列表页显示的字段
list_display = (
'type', # OAuth类型
'appkey', # App Key
'appsecret', # App Secret
'is_enable' # 是否启用
)
# 列表过滤器,可以通过类型进行筛选
list_filter = ('type',)

@ -1,5 +1,30 @@
"""
OAuth应用配置
该模块定义了OAuth应用的配置类用于Django应用的初始化和配置
"""
from django.apps import AppConfig
class OauthConfig(AppConfig):
"""
OAuth应用的配置类
该类继承自Django的AppConfig用于配置OAuth应用的基本信息
当Django启动时会使用这个配置类来初始化OAuth应用
Attributes:
name (str): 应用名称必须与应用的目录名一致
verbose_name (str): 应用的显示名称可选
default_auto_field (str): 默认主键字段类型可选
"""
# 应用名称Django会根据这个名称来识别和加载相应的应用
# 必须与应用的目录名oauth一致
name = 'oauth'
# 应用的显示名称在Django管理后台中显示
verbose_name = 'OAuth第三方登录'
# 指定默认的主键字段类型Django 3.2+推荐使用
default_auto_field = 'django.db.models.BigAutoField'

@ -1,12 +1,55 @@
"""
OAuth模块表单定义
该模块包含OAuth登录过程中使用的表单类主要用于处理第三方登录时的用户信息收集
"""
from django.contrib.auth.forms import forms
from django.forms import widgets
class RequireEmailForm(forms.Form):
email = forms.EmailField(label='电子邮箱', required=True)
oauthid = forms.IntegerField(widget=forms.HiddenInput, required=False)
"""
要求用户提供邮箱的表单类
该表单用于在OAuth登录过程中要求用户输入电子邮箱地址
通常在第三方登录无法获取用户邮箱时使用例如某些OAuth提供商
可能不会在授权时返回用户的邮箱信息此时需要用户手动输入邮箱
来完成账号绑定
Attributes:
email (EmailField): 用户邮箱地址必填字段
oauthid (IntegerField): OAuth用户ID隐藏字段用于关联OAuth用户记录
"""
# 邮箱字段,设置为必填项,用于用户输入电子邮箱地址
email = forms.EmailField(
label='电子邮箱',
required=True,
help_text='请输入您的电子邮箱地址以完成账号绑定'
)
# OAuth ID字段用于存储第三方平台的用户ID隐藏字段非必填
oauthid = forms.IntegerField(
widget=forms.HiddenInput,
required=False,
help_text='OAuth用户ID由系统自动填充'
)
def __init__(self, *args, **kwargs):
"""
初始化表单设置邮箱输入框的样式和属性
Args:
*args: 位置参数
**kwargs: 关键字参数
"""
super(RequireEmailForm, self).__init__(*args, **kwargs)
# 自定义邮箱输入框的widget属性设置占位符和CSS样式类
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})
attrs={
'placeholder': "请输入您的邮箱地址",
"class": "form-control",
"autocomplete": "email"
}
)

@ -5,53 +5,76 @@ from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
# 标记这是一个初始迁移文件
initial = True
# 定义此迁移依赖的其他迁移
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
# 定义要执行的操作
operations = [
# 创建 OAuthConfig 模型
migrations.CreateModel(
name='OAuthConfig',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# OAuth类型选择包含微博、谷歌、GitHub、Facebook和QQ
('type', models.CharField(choices=[('weibo', '微博'), ('google', '谷歌'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='类型')),
# AppKey字段
('appkey', models.CharField(max_length=200, verbose_name='AppKey')),
# AppSecret字段
('appsecret', models.CharField(max_length=200, verbose_name='AppSecret')),
# 回调URL默认为百度首页
('callback_url', models.CharField(default='http://www.baidu.com', max_length=200, verbose_name='回调地址')),
# 是否启用该配置
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')),
# 创建时间,默认为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间,默认为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
],
# 模型选项设置
options={
'verbose_name': 'oauth配置',
'verbose_name_plural': 'oauth配置',
'ordering': ['-created_time'],
'ordering': ['-created_time'], # 按创建时间倒序排列
},
),
# 创建 OAuthUser 模型
migrations.CreateModel(
name='OAuthUser',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 用户在第三方平台的唯一标识
('openid', models.CharField(max_length=50)),
# 用户昵称
('nickname', models.CharField(max_length=50, verbose_name='昵称')),
# 访问令牌
('token', models.CharField(blank=True, max_length=150, null=True)),
# 用户头像URL
('picture', models.CharField(blank=True, max_length=350, null=True)),
# OAuth类型如weibo, github等
('type', models.CharField(max_length=50)),
# 用户邮箱
('email', models.CharField(blank=True, max_length=50, null=True)),
# 其他元数据信息
('metadata', models.TextField(blank=True, null=True)),
# 创建时间,默认为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间,默认为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
# 关联到系统用户,允许为空
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')),
],
# 模型选项设置
options={
'verbose_name': 'oauth用户',
'verbose_name_plural': 'oauth用户',
'ordering': ['-created_time'],
'ordering': ['-created_time'], # 按创建时间倒序排列
},
),
]

@ -5,79 +5,124 @@ from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
class Migration(migrations.Migration):
# 定义此迁移依赖的其他迁移文件
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('oauth', '0001_initial'),
('oauth', '0001_initial'), # 依赖于oauth应用的初始迁移
]
operations = [
# 修改 OAuthConfig 模型选项
migrations.AlterModelOptions(
name='oauthconfig',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth配置', 'verbose_name_plural': 'oauth配置'},
options={
'ordering': ['-creation_time'], # 按创建时间倒序排列
'verbose_name': 'oauth配置', # 单数名称
'verbose_name_plural': 'oauth配置' # 复数名称
},
),
# 修改 OAuthUser 模型选项
migrations.AlterModelOptions(
name='oauthuser',
options={'ordering': ['-creation_time'], 'verbose_name': 'oauth user', 'verbose_name_plural': 'oauth user'},
options={
'ordering': ['-creation_time'], # 按创建时间倒序排列
'verbose_name': 'oauth user', # 单数名称
'verbose_name_plural': 'oauth user' # 复数名称
},
),
# 移除 OAuthConfig 模型中的旧时间字段
migrations.RemoveField(
model_name='oauthconfig',
name='created_time',
name='created_time', # 删除created_time字段
),
migrations.RemoveField(
model_name='oauthconfig',
name='last_mod_time',
name='last_mod_time', # 删除last_mod_time字段
),
# 移除 OAuthUser 模型中的旧时间字段
migrations.RemoveField(
model_name='oauthuser',
name='created_time',
name='created_time', # 删除created_time字段
),
migrations.RemoveField(
model_name='oauthuser',
name='last_mod_time',
name='last_mod_time', # 删除last_mod_time字段
),
# 为 OAuthConfig 模型添加新的时间字段
migrations.AddField(
model_name='oauthconfig',
name='creation_time',
name='creation_time', # 添加creation_time字段
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
migrations.AddField(
model_name='oauthconfig',
name='last_modify_time',
name='last_modify_time', # 添加last_modify_time字段
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
# 为 OAuthUser 模型添加新的时间字段
migrations.AddField(
model_name='oauthuser',
name='creation_time',
name='creation_time', # 添加creation_time字段
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
migrations.AddField(
model_name='oauthuser',
name='last_modify_time',
name='last_modify_time', # 添加last_modify_time字段
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
# 修改 OAuthConfig 模型的callback_url字段
migrations.AlterField(
model_name='oauthconfig',
name='callback_url',
field=models.CharField(default='', max_length=200, verbose_name='callback url'),
),
# 修改 OAuthConfig 模型的is_enable字段仅更改显示名称
migrations.AlterField(
model_name='oauthconfig',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is enable'),
),
# 修改 OAuthConfig 模型的type字段选项中文标签改为英文
migrations.AlterField(
model_name='oauthconfig',
name='type',
field=models.CharField(choices=[('weibo', 'weibo'), ('google', 'google'), ('github', 'GitHub'), ('facebook', 'FaceBook'), ('qq', 'QQ')], default='a', max_length=10, verbose_name='type'),
field=models.CharField(
choices=[
('weibo', 'weibo'),
('google', 'google'),
('github', 'GitHub'),
('facebook', 'FaceBook'),
('qq', 'QQ')
],
default='a',
max_length=10,
verbose_name='type'
),
),
# 修改 OAuthUser 模型的author字段显示名称
migrations.AlterField(
model_name='oauthuser',
name='author',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
field=models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
verbose_name='author'
),
),
# 修改 OAuthUser 模型的nickname字段显示名称中文改为英文
migrations.AlterField(
model_name='oauthuser',
name='nickname',

@ -2,17 +2,21 @@
from django.db import migrations, models
class Migration(migrations.Migration):
# 定义此迁移依赖的前一个迁移文件
dependencies = [
('oauth', '0002_alter_oauthconfig_options_alter_oauthuser_options_and_more'),
]
# 定义要执行的迁移操作
operations = [
# 修改 OAuthUser 模型中的 nickname 字段
migrations.AlterField(
model_name='oauthuser',
name='nickname',
field=models.CharField(max_length=50, verbose_name='nick name'),
model_name='oauthuser', # 目标模型为 OAuthUser
name='nickname', # 字段名为 nickname
field=models.CharField(
max_length=50, # 最大长度仍为50个字符
verbose_name='nick name' # 将字段的显示名称从 "nickname" 改为 "nick name"
),
),
]

@ -7,6 +7,27 @@ from django.utils.translation import gettext_lazy as _
class OAuthUser(models.Model):
"""
OAuth用户模型
用于存储通过第三方平台登录的用户信息
Attributes:
author (ForeignKey): 关联的本地用户可为空
openid (CharField): 第三方平台的用户唯一标识最大长度50
nickname (CharField): 用户在第三方平台的昵称最大长度50
token (CharField): OAuth认证令牌最大长度150可为空
picture (CharField): 用户头像URL最大长度350可为空
type (CharField): 第三方平台类型如weibogoogle等最大长度50
email (CharField): 用户邮箱最大长度50可为空
metadata (TextField): 其他元数据信息可为空
creation_time (DateTimeField): 记录创建时间默认为当前时间
last_modify_time (DateTimeField): 记录最后修改时间默认为当前时间
Meta:
verbose_name: OAuth用户的可读性名称
verbose_name_plural: OAuth用户的复数形式名称
ordering: 按照创建时间倒序排列
"""
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
@ -24,6 +45,12 @@ class OAuthUser(models.Model):
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
def __str__(self):
"""
模型的字符串表示
Returns:
str: 用户昵称
"""
return self.nickname
class Meta:
@ -33,6 +60,25 @@ class OAuthUser(models.Model):
class OAuthConfig(models.Model):
"""
OAuth配置模型
用于存储第三方OAuth登录的配置信息
Attributes:
TYPE (tuple): 支持的OAuth平台类型选项
type (CharField): OAuth平台类型从TYPE中选择
appkey (CharField): 第三方平台分配的应用Key最大长度200
appsecret (CharField): 第三方平台分配的应用密钥最大长度200
callback_url (CharField): OAuth回调URL最大长度200
is_enable (BooleanField): 是否启用该OAuth配置
creation_time (DateTimeField): 配置创建时间默认为当前时间
last_modify_time (DateTimeField): 配置最后修改时间默认为当前时间
Meta:
verbose_name: OAuth配置的可读性名称
verbose_name_plural: OAuth配置的复数形式名称
ordering: 按照创建时间倒序排列
"""
TYPE = (
('weibo', _('weibo')),
('google', _('google')),
@ -54,14 +100,27 @@ class OAuthConfig(models.Model):
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
def clean(self):
"""
验证模型数据
确保同类型的配置只能存在一个实例
Raises:
ValidationError: 当已存在相同类型的配置时抛出验证错误
"""
if OAuthConfig.objects.filter(
type=self.type).exclude(id=self.id).count():
raise ValidationError(_(self.type + _('already exists')))
def __str__(self):
"""
模型的字符串表示
Returns:
str: 配置类型
"""
return self.type
class Meta:
verbose_name = 'oauth配置'
verbose_name_plural = verbose_name
ordering = ['-creation_time']
ordering = ['-creation_time']

@ -29,55 +29,133 @@ class BaseOauthManager(metaclass=ABCMeta):
ICON_NAME = None
def __init__(self, access_token=None, openid=None):
"""
初始化OAuth管理器
Args:
access_token (str, optional): 访问令牌
openid (str, optional): 用户唯一标识
"""
self.access_token = access_token
self.openid = openid
@property
def is_access_token_set(self):
"""检查访问令牌是否已设置"""
return self.access_token is not None
@property
def is_authorized(self):
"""检查是否已授权访问令牌和openid都已设置"""
return self.is_access_token_set and self.access_token is not None and self.openid is not None
@abstractmethod
def get_authorization_url(self, nexturl='/'):
"""
获取授权URL
Args:
nexturl (str): 授权后跳转的URL
Returns:
str: 授权页面URL
"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""
通过授权码获取访问令牌
Args:
code (str): 授权码
Returns:
str: 访问令牌
"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""
获取OAuth用户信息
Returns:
OAuthUser: OAuth用户对象
"""
pass
@abstractmethod
def get_picture(self, metadata):
"""
从元数据中获取用户头像
Args:
metadata (str): 用户元数据JSON字符串
Returns:
str: 头像URL
"""
pass
def do_get(self, url, params, headers=None):
"""
发送GET请求
Args:
url (str): 请求URL
params (dict): 请求参数
headers (dict, optional): 请求头
Returns:
str: 响应文本
"""
rsp = requests.get(url=url, params=params, headers=headers)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""
发送POST请求
Args:
url (str): 请求URL
params (dict): 请求参数
headers (dict, optional): 请求头
Returns:
str: 响应文本
"""
rsp = requests.post(url, params, headers=headers)
logger.info(rsp.text)
return rsp.text
def get_config(self):
"""
获取OAuth配置信息
Returns:
OAuthConfig: OAuth配置对象
"""
value = OAuthConfig.objects.filter(type=self.ICON_NAME)
return value[0] if value else None
class WBOauthManager(BaseOauthManager):
"""新浪微博OAuth管理器"""
AUTH_URL = 'https://api.weibo.com/oauth2/authorize'
TOKEN_URL = 'https://api.weibo.com/oauth2/access_token'
API_URL = 'https://api.weibo.com/2/users/show.json'
ICON_NAME = 'weibo'
def __init__(self, access_token=None, openid=None):
"""
初始化新浪微博OAuth管理器
Args:
access_token (str, optional): 访问令牌
openid (str, optional): 用户唯一标识
"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
@ -89,6 +167,15 @@ class WBOauthManager(BaseOauthManager):
openid=openid)
def get_authorization_url(self, nexturl='/'):
"""
获取新浪微博授权URL
Args:
nexturl (str): 授权后跳转的URL
Returns:
str: 新浪微博授权页面URL
"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -98,7 +185,18 @@ class WBOauthManager(BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""
通过授权码获取新浪微博访问令牌
Args:
code (str): 授权码
Returns:
OAuthUser: OAuth用户对象
Raises:
OAuthAccessTokenException: 获取访问令牌失败时抛出异常
"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
@ -117,6 +215,12 @@ class WBOauthManager(BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""
获取新浪微博用户信息
Returns:
OAuthUser: OAuth用户对象
"""
if not self.is_authorized:
return None
params = {
@ -142,12 +246,26 @@ class WBOauthManager(BaseOauthManager):
return None
def get_picture(self, metadata):
"""
从新浪微博元数据中获取用户头像
Args:
metadata (str): 用户元数据JSON字符串
Returns:
str: 头像URL
"""
datas = json.loads(metadata)
return datas['avatar_large']
class ProxyManagerMixin:
"""代理管理混入类用于支持HTTP代理"""
def __init__(self, *args, **kwargs):
"""
初始化代理设置
"""
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
@ -157,23 +275,53 @@ class ProxyManagerMixin:
self.proxies = None
def do_get(self, url, params, headers=None):
"""
发送带代理的GET请求
Args:
url (str): 请求URL
params (dict): 请求参数
headers (dict, optional): 请求头
Returns:
str: 响应文本
"""
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
"""
发送带代理的POST请求
Args:
url (str): 请求URL
params (dict): 请求参数
headers (dict, optional): 请求头
Returns:
str: 响应文本
"""
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
"""Google OAuth管理器"""
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
ICON_NAME = 'google'
def __init__(self, access_token=None, openid=None):
"""
初始化Google OAuth管理器
Args:
access_token (str, optional): 访问令牌
openid (str, optional): 用户唯一标识
"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
@ -185,6 +333,15 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
openid=openid)
def get_authorization_url(self, nexturl='/'):
"""
获取Google授权URL
Args:
nexturl (str): 授权后跳转的URL
Returns:
str: Google授权页面URL
"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -195,6 +352,18 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""
通过授权码获取Google访问令牌
Args:
code (str): 授权码
Returns:
str: 访问令牌
Raises:
OAuthAccessTokenException: 获取访问令牌失败时抛出异常
"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
@ -216,6 +385,12 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""
获取Google用户信息
Returns:
OAuthUser: OAuth用户对象
"""
if not self.is_authorized:
return None
params = {
@ -241,17 +416,34 @@ class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
return None
def get_picture(self, metadata):
"""
从Google元数据中获取用户头像
Args:
metadata (str): 用户元数据JSON字符串
Returns:
str: 头像URL
"""
datas = json.loads(metadata)
return datas['picture']
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
"""GitHub OAuth管理器"""
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
ICON_NAME = 'github'
def __init__(self, access_token=None, openid=None):
"""
初始化GitHub OAuth管理器
Args:
access_token (str, optional): 访问令牌
openid (str, optional): 用户唯一标识
"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
@ -263,6 +455,15 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
openid=openid)
def get_authorization_url(self, next_url='/'):
"""
获取GitHub授权URL
Args:
next_url (str): 授权后跳转的URL
Returns:
str: GitHub授权页面URL
"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -273,6 +474,18 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""
通过授权码获取GitHub访问令牌
Args:
code (str): 授权码
Returns:
str: 访问令牌
Raises:
OAuthAccessTokenException: 获取访问令牌失败时抛出异常
"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
@ -292,7 +505,12 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""
获取GitHub用户信息
Returns:
OAuthUser: OAuth用户对象
"""
rsp = self.do_get(self.API_URL, params={}, headers={
"Authorization": "token " + self.access_token
})
@ -314,17 +532,34 @@ class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
return None
def get_picture(self, metadata):
"""
从GitHub元数据中获取用户头像
Args:
metadata (str): 用户元数据JSON字符串
Returns:
str: 头像URL
"""
datas = json.loads(metadata)
return datas['avatar_url']
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
"""Facebook OAuth管理器"""
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
ICON_NAME = 'facebook'
def __init__(self, access_token=None, openid=None):
"""
初始化Facebook OAuth管理器
Args:
access_token (str, optional): 访问令牌
openid (str, optional): 用户唯一标识
"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
@ -336,6 +571,15 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
openid=openid)
def get_authorization_url(self, next_url='/'):
"""
获取Facebook授权URL
Args:
next_url (str): 授权后跳转的URL
Returns:
str: Facebook授权页面URL
"""
params = {
'client_id': self.client_id,
'response_type': 'code',
@ -346,6 +590,18 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""
通过授权码获取Facebook访问令牌
Args:
code (str): 授权码
Returns:
str: 访问令牌
Raises:
OAuthAccessTokenException: 获取访问令牌失败时抛出异常
"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
@ -365,6 +621,12 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_oauth_userinfo(self):
"""
获取Facebook用户信息
Returns:
OAuthUser: OAuth用户对象
"""
params = {
'access_token': self.access_token,
'fields': 'id,name,picture,email'
@ -388,11 +650,21 @@ class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
return None
def get_picture(self, metadata):
"""
从Facebook元数据中获取用户头像
Args:
metadata (str): 用户元数据JSON字符串
Returns:
str: 头像URL
"""
datas = json.loads(metadata)
return str(datas['picture']['data']['url'])
class QQOauthManager(BaseOauthManager):
"""QQ OAuth管理器"""
AUTH_URL = 'https://graph.qq.com/oauth2.0/authorize'
TOKEN_URL = 'https://graph.qq.com/oauth2.0/token'
API_URL = 'https://graph.qq.com/user/get_user_info'
@ -400,6 +672,13 @@ class QQOauthManager(BaseOauthManager):
ICON_NAME = 'qq'
def __init__(self, access_token=None, openid=None):
"""
初始化QQ OAuth管理器
Args:
access_token (str, optional): 访问令牌
openid (str, optional): 用户唯一标识
"""
config = self.get_config()
self.client_id = config.appkey if config else ''
self.client_secret = config.appsecret if config else ''
@ -411,6 +690,15 @@ class QQOauthManager(BaseOauthManager):
openid=openid)
def get_authorization_url(self, next_url='/'):
"""
获取QQ授权URL
Args:
next_url (str): 授权后跳转的URL
Returns:
str: QQ授权页面URL
"""
params = {
'response_type': 'code',
'client_id': self.client_id,
@ -420,6 +708,18 @@ class QQOauthManager(BaseOauthManager):
return url
def get_access_token_by_code(self, code):
"""
通过授权码获取QQ访问令牌
Args:
code (str): 授权码
Returns:
str: 访问令牌
Raises:
OAuthAccessTokenException: 获取访问令牌失败时抛出异常
"""
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
@ -438,6 +738,12 @@ class QQOauthManager(BaseOauthManager):
raise OAuthAccessTokenException(rsp)
def get_open_id(self):
"""
获取QQ用户openid
Returns:
str: 用户openid
"""
if self.is_access_token_set:
params = {
'access_token': self.access_token
@ -454,6 +760,12 @@ class QQOauthManager(BaseOauthManager):
return openid
def get_oauth_userinfo(self):
"""
获取QQ用户信息
Returns:
OAuthUser: OAuth用户对象
"""
openid = self.get_open_id()
if openid:
params = {
@ -477,12 +789,27 @@ class QQOauthManager(BaseOauthManager):
return user
def get_picture(self, metadata):
"""
从QQ元数据中获取用户头像
Args:
metadata (str): 用户元数据JSON字符串
Returns:
str: 头像URL
"""
datas = json.loads(metadata)
return str(datas['figureurl'])
@cache_decorator(expiration=100 * 60)
def get_oauth_apps():
"""
获取所有启用的OAuth应用
Returns:
list: OAuth应用管理器实例列表
"""
configs = OAuthConfig.objects.filter(is_enable=True).all()
if not configs:
return []
@ -493,6 +820,15 @@ def get_oauth_apps():
def get_manager_by_type(type):
"""
根据类型获取OAuth管理器
Args:
type (str): OAuth类型('weibo', 'google', 'github')
Returns:
BaseOauthManager: 对应的OAuth管理器实例
"""
applications = get_oauth_apps()
if applications:
finds = list(

@ -1,22 +1,65 @@
"""
OAuth模块模板标签
该模块提供用于在模板中显示OAuth登录按钮的模板标签
"""
from django import template
from django.urls import reverse
from oauth.oauthmanager import get_oauth_apps
# 注册模板标签库
register = template.Library()
@register.inclusion_tag('oauth/oauth_applications.html')
def load_oauth_applications(request):
"""
加载OAuth应用程序模板标签
该模板标签用于在模板中生成OAuth登录按钮它会获取所有已启用的OAuth配置
并为每个OAuth提供商生成相应的登录链接
Args:
request: HTTP请求对象用于获取当前页面路径作为登录后的跳转地址
Returns:
dict: 包含OAuth应用程序信息的字典
- apps: OAuth应用程序列表每个元素包含(平台名称, 登录URL)
Template:
该标签会渲染'oauth/oauth_applications.html'模板文件
Usage:
在模板中使用: {% load oauth_tags %} {% load_oauth_applications request %}
"""
# 获取所有已启用的OAuth应用程序配置
applications = get_oauth_apps()
if applications:
# 获取OAuth登录的基础URL
baseurl = reverse('oauth:oauthlogin')
# 获取当前请求的完整路径,用作登录成功后的跳转地址
path = request.get_full_path()
apps = list(map(lambda x: (x.ICON_NAME, '{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl, type=x.ICON_NAME, next=path)), applications))
# 为每个OAuth应用程序生成登录URL
# 格式: baseurl?type={platform}&next_url={current_path}
apps = list(map(
lambda x: (
x.ICON_NAME, # OAuth平台名称如weibo、google、github等
'{baseurl}?type={type}&next_url={next}'.format(
baseurl=baseurl,
type=x.ICON_NAME,
next=path
)
),
applications
))
else:
# 如果没有可用的OAuth应用程序返回空列表
apps = []
return {
'apps': apps
'apps': apps # 返回OAuth应用程序列表供模板使用
}

@ -13,33 +13,55 @@ from oauth.oauthmanager import BaseOauthManager
# Create your tests here.
class OAuthConfigTest(TestCase):
"""OAuth配置测试类"""
def setUp(self):
"""
测试初始化设置
"""
self.client = Client()
self.factory = RequestFactory()
def test_oauth_login_test(self):
"""
测试OAuth登录功能
"""
# 创建微博OAuth配置
c = OAuthConfig()
c.type = 'weibo'
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
# 测试获取OAuth登录链接
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
# 测试授权回调
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
class OauthLoginTest(TestCase):
"""OAuth登录测试类"""
def setUp(self) -> None:
"""
测试初始化设置
"""
self.client = Client()
self.factory = RequestFactory()
self.apps = self.init_apps()
def init_apps(self):
"""
初始化所有OAuth应用
Returns:
list: OAuth应用管理器实例列表
"""
applications = [p() for p in BaseOauthManager.__subclasses__()]
for application in applications:
c = OAuthConfig()
@ -50,6 +72,15 @@ class OauthLoginTest(TestCase):
return applications
def get_app_by_type(self, type):
"""
根据类型获取OAuth应用管理器
Args:
type (str): OAuth类型
Returns:
BaseOauthManager: 对应的OAuth管理器实例
"""
for app in self.apps:
if app.ICON_NAME.lower() == type:
return app
@ -57,12 +88,21 @@ class OauthLoginTest(TestCase):
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_login(self, mock_do_get, mock_do_post):
"""
测试微博登录流程
Args:
mock_do_get: 模拟GET请求
mock_do_post: 模拟POST请求
"""
weibo_app = self.get_app_by_type('weibo')
assert weibo_app
url = weibo_app.get_authorization_url()
# 模拟返回访问令牌
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
})
# 模拟返回用户信息
mock_do_get.return_value = json.dumps({
"avatar_large": "avatar_large",
"screen_name": "screen_name",
@ -76,13 +116,22 @@ class OauthLoginTest(TestCase):
@patch("oauth.oauthmanager.GoogleOauthManager.do_post")
@patch("oauth.oauthmanager.GoogleOauthManager.do_get")
def test_google_login(self, mock_do_get, mock_do_post):
"""
测试Google登录流程
Args:
mock_do_get: 模拟GET请求
mock_do_post: 模拟POST请求
"""
google_app = self.get_app_by_type('google')
assert google_app
url = google_app.get_authorization_url()
# 模拟返回访问令牌
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
"id_token": "id_token",
})
# 模拟返回用户信息
mock_do_get.return_value = json.dumps({
"picture": "picture",
"name": "name",
@ -97,12 +146,21 @@ class OauthLoginTest(TestCase):
@patch("oauth.oauthmanager.GitHubOauthManager.do_post")
@patch("oauth.oauthmanager.GitHubOauthManager.do_get")
def test_github_login(self, mock_do_get, mock_do_post):
"""
测试GitHub登录流程
Args:
mock_do_get: 模拟GET请求
mock_do_post: 模拟POST请求
"""
github_app = self.get_app_by_type('github')
assert github_app
url = github_app.get_authorization_url()
self.assertTrue("github.com" in url)
self.assertTrue("client_id" in url)
# 模拟返回访问令牌
mock_do_post.return_value = "access_token=gho_16C7e42F292c6912E7710c838347Ae178B4a&scope=repo%2Cgist&token_type=bearer"
# 模拟返回用户信息
mock_do_get.return_value = json.dumps({
"avatar_url": "avatar_url",
"name": "name",
@ -117,13 +175,22 @@ class OauthLoginTest(TestCase):
@patch("oauth.oauthmanager.FaceBookOauthManager.do_post")
@patch("oauth.oauthmanager.FaceBookOauthManager.do_get")
def test_facebook_login(self, mock_do_get, mock_do_post):
"""
测试Facebook登录流程
Args:
mock_do_get: 模拟GET请求
mock_do_post: 模拟POST请求
"""
facebook_app = self.get_app_by_type('facebook')
assert facebook_app
url = facebook_app.get_authorization_url()
self.assertTrue("facebook.com" in url)
# 模拟返回访问令牌
mock_do_post.return_value = json.dumps({
"access_token": "access_token",
})
# 模拟返回用户信息
mock_do_get.return_value = json.dumps({
"name": "name",
"id": "id",
@ -149,6 +216,12 @@ class OauthLoginTest(TestCase):
})
])
def test_qq_login(self, mock_do_get):
"""
测试QQ登录流程
Args:
mock_do_get: 模拟GET请求序列
"""
qq_app = self.get_app_by_type('qq')
assert qq_app
url = qq_app.get_authorization_url()
@ -160,6 +233,13 @@ class OauthLoginTest(TestCase):
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_with_email(self, mock_do_get, mock_do_post):
"""
测试微博授权登录带邮箱
Args:
mock_do_get: 模拟GET请求
mock_do_post: 模拟POST请求
"""
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
@ -172,14 +252,17 @@ class OauthLoginTest(TestCase):
}
mock_do_get.return_value = json.dumps(mock_user_info)
# 测试获取OAuth登录链接
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
# 测试授权回调
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
# 验证用户登录状态
user = auth.get_user(self.client)
assert user.is_authenticated
self.assertTrue(user.is_authenticated)
@ -187,6 +270,7 @@ class OauthLoginTest(TestCase):
self.assertEqual(user.email, mock_user_info['email'])
self.client.logout()
# 再次测试授权登录
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, '/')
@ -200,6 +284,13 @@ class OauthLoginTest(TestCase):
@patch("oauth.oauthmanager.WBOauthManager.do_post")
@patch("oauth.oauthmanager.WBOauthManager.do_get")
def test_weibo_authoriz_login_without_email(self, mock_do_get, mock_do_post):
"""
测试微博授权登录不带邮箱需要用户输入邮箱
Args:
mock_do_get: 模拟GET请求
mock_do_post: 模拟POST请求
"""
mock_do_post.return_value = json.dumps({"access_token": "access_token",
"uid": "uid"
@ -211,17 +302,21 @@ class OauthLoginTest(TestCase):
}
mock_do_get.return_value = json.dumps(mock_user_info)
# 测试获取OAuth登录链接
response = self.client.get('/oauth/oauthlogin?type=weibo')
self.assertEqual(response.status_code, 302)
self.assertTrue("api.weibo.com" in response.url)
# 测试授权回调(无邮箱情况)
response = self.client.get('/oauth/authorize?type=weibo&code=code')
self.assertEqual(response.status_code, 302)
# 验证跳转到邮箱输入页面
oauth_user_id = int(response.url.split('/')[-1].split('.')[0])
self.assertEqual(response.url, f'/oauth/requireemail/{oauth_user_id}.html')
# 提交邮箱表单
response = self.client.post(response.url, {'email': 'test@gmail.com', 'oauthid': oauth_user_id})
self.assertEqual(response.status_code, 302)
@ -233,6 +328,7 @@ class OauthLoginTest(TestCase):
})
self.assertEqual(response.url, f'{url}?type=email')
# 验证邮箱确认链接
path = reverse('oauth:email_confirm', kwargs={
'id': oauth_user_id,
'sign': sign
@ -240,6 +336,8 @@ class OauthLoginTest(TestCase):
response = self.client.get(path)
self.assertEqual(response.status_code, 302)
self.assertEqual(response.url, f'/oauth/bindsuccess/{oauth_user_id}.html?type=success')
# 验证用户登录状态和信息
user = auth.get_user(self.client)
from oauth.models import OAuthUser
oauth_user = OAuthUser.objects.get(author=user)

@ -1,25 +1,62 @@
"""
OAuth模块URL配置
该模块定义了OAuth登录相关的所有URL路由包括授权回调邮箱绑定
登录入口等功能的路由配置
"""
from django.urls import path
from . import views
# 应用命名空间用于URL反向解析
app_name = "oauth"
# OAuth相关URL路由配置
urlpatterns = [
# OAuth授权回调处理路由
# 当用户在第三方平台完成授权后第三方平台会重定向到这个URL
# 系统会处理授权码,获取用户信息并完成登录流程
path(
r'oauth/authorize',
views.authorize),
views.authorize,
name='authorize'
),
# 要求用户输入邮箱的页面路由
# 当OAuth登录时第三方平台未返回邮箱信息时引导用户输入邮箱
# oauthid: OAuth用户记录的ID用于关联用户信息
path(
r'oauth/requireemail/<int:oauthid>.html',
views.RequireEmailView.as_view(),
name='require_email'),
name='require_email'
),
# 邮箱确认路由
# 用户点击邮箱中的确认链接后访问此路由,完成邮箱绑定
# id: OAuth用户ID
# sign: 安全签名,用于验证链接的有效性
path(
r'oauth/emailconfirm/<int:id>/<sign>.html',
views.emailconfirm,
name='email_confirm'),
name='email_confirm'
),
# OAuth绑定成功页面路由
# 显示绑定成功或需要验证邮箱的提示信息
# oauthid: OAuth用户记录的ID
path(
r'oauth/bindsuccess/<int:oauthid>.html',
views.bindsuccess,
name='bindsuccess'),
name='bindsuccess'
),
# OAuth登录入口路由
# 用户点击第三方登录按钮时访问此路由,系统会重定向到第三方授权页面
# 支持通过type参数指定第三方平台类型如weibo、google、github等
path(
r'oauth/oauthlogin',
views.oauthlogin,
name='oauthlogin')]
name='oauthlogin'
)
]

@ -27,6 +27,11 @@ logger = logging.getLogger(__name__)
def get_redirecturl(request):
"""
获取重定向URL
:param request: HTTP请求对象
:return: 重定向URL
"""
nexturl = request.GET.get('next_url', None)
if not nexturl or nexturl == '/login/' or nexturl == '/login':
nexturl = '/'
@ -41,6 +46,12 @@ def get_redirecturl(request):
def oauthlogin(request):
"""
OAuth登录入口
根据请求类型重定向到相应的OAuth提供商
:param request: HTTP请求对象
:return: 重定向响应
"""
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
@ -53,6 +64,12 @@ def oauthlogin(request):
def authorize(request):
"""
OAuth授权回调处理
处理OAuth提供商返回的授权码获取用户信息并登录
:param request: HTTP请求对象
:return: 重定向响应
"""
type = request.GET.get('type', None)
if not type:
return HttpResponseRedirect('/')
@ -125,6 +142,14 @@ def authorize(request):
def emailconfirm(request, id, sign):
"""
邮箱确认绑定处理
验证签名并完成邮箱绑定流程
:param request: HTTP请求对象
:param id: OAuth用户ID
:param sign: 签名
:return: 重定向响应
"""
if not sign:
return HttpResponseForbidden()
if not get_sha256(settings.SECRET_KEY +
@ -171,10 +196,18 @@ def emailconfirm(request, id, sign):
class RequireEmailView(FormView):
"""
要求邮箱视图类
当OAuth用户没有邮箱时要求用户提供邮箱地址
"""
form_class = RequireEmailForm
template_name = 'oauth/require_email.html'
def get(self, request, *args, **kwargs):
"""
处理GET请求
检查OAuth用户是否已有邮箱
"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.email:
@ -184,6 +217,10 @@ class RequireEmailView(FormView):
return super(RequireEmailView, self).get(request, *args, **kwargs)
def get_initial(self):
"""
获取表单初始数据
:return: 包含OAuth用户ID的字典
"""
oauthid = self.kwargs['oauthid']
return {
'email': '',
@ -191,6 +228,10 @@ class RequireEmailView(FormView):
}
def get_context_data(self, **kwargs):
"""
获取上下文数据
添加用户头像等信息到模板上下文
"""
oauthid = self.kwargs['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if oauthuser.picture:
@ -198,6 +239,10 @@ class RequireEmailView(FormView):
return super(RequireEmailView, self).get_context_data(**kwargs)
def form_valid(self, form):
"""
处理有效的表单提交
保存邮箱信息并发送确认邮件
"""
email = form.cleaned_data['email']
oauthid = form.cleaned_data['oauthid']
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
@ -234,6 +279,13 @@ class RequireEmailView(FormView):
def bindsuccess(request, oauthid):
"""
绑定成功页面
显示绑定成功或需要验证邮箱的提示信息
:param request: HTTP请求对象
:param oauthid: OAuth用户ID
:return: 渲染的模板响应
"""
type = request.GET.get('type', None)
oauthuser = get_object_or_404(OAuthUser, pk=oauthid)
if type == 'email':
@ -250,4 +302,4 @@ def bindsuccess(request, oauthid):
return render(request, 'oauth/bindsuccess.html', {
'title': title,
'content': content
})
})

@ -1,7 +1,65 @@
"""
OwnTracks位置跟踪模块Django管理后台配置
该模块定义了OwnTracks相关模型在Django管理后台的配置
包括列表显示搜索过滤等功能
"""
from django.contrib import admin
# Register your models here.
from .models import OwnTrackLog
@admin.register(OwnTrackLog)
class OwnTrackLogsAdmin(admin.ModelAdmin):
pass
"""
OwnTrackLogs模型的Django管理后台配置类
用于在Django管理界面中管理OwnTracks位置日志数据
提供搜索过滤分页等功能方便管理员查看和管理位置数据
Attributes:
list_display: 列表页显示的字段
list_filter: 右侧过滤器字段
search_fields: 搜索字段
list_per_page: 每页显示的记录数
ordering: 默认排序方式
readonly_fields: 只读字段
"""
# 在列表页显示的字段
list_display = (
'id', # 记录ID
'tid', # 用户/设备标识符
'lat', # 纬度
'lon', # 经度
'creation_time' # 创建时间
)
# 右侧过滤器,可以通过这些字段进行筛选
list_filter = (
'tid', # 按用户/设备筛选
'creation_time', # 按创建时间筛选
)
# 搜索字段,支持在这些字段中进行关键词搜索
search_fields = (
'tid', # 搜索用户/设备标识符
)
# 每页显示的记录数
list_per_page = 50
# 默认按创建时间倒序排列(最新的记录在前)
ordering = ['-creation_time']
# 只读字段,这些字段在编辑时不可修改
readonly_fields = (
'creation_time', # 创建时间通常不允许修改
)
# 可以点击进入详情页的字段
list_display_links = ('id', 'tid')
# 日期层次导航,按创建时间分层显示
date_hierarchy = 'creation_time'

@ -1,5 +1,30 @@
"""
OwnTracks位置跟踪应用配置
该模块定义了OwnTracks应用的配置类用于Django应用的初始化和配置
"""
from django.apps import AppConfig
class OwntracksConfig(AppConfig):
"""
OwnTracks应用的配置类
该类继承自Django的AppConfig用于配置OwnTracks位置跟踪应用的基本信息
当Django启动时会使用这个配置类来初始化OwnTracks应用
Attributes:
name (str): 应用名称必须与应用的目录名一致
verbose_name (str): 应用的显示名称可选
default_auto_field (str): 默认主键字段类型可选
"""
# 应用名称Django会根据这个名称来识别和加载相应的应用
# 必须与应用的目录名owntracks一致
name = 'owntracks'
# 应用的显示名称在Django管理后台中显示
verbose_name = 'OwnTracks位置跟踪'
# 指定默认的主键字段类型Django 3.2+推荐使用
default_auto_field = 'django.db.models.BigAutoField'

@ -5,27 +5,37 @@ import django.utils.timezone
class Migration(migrations.Migration):
"""
初始迁移文件用于创建OwnTrackLog模型表
"""
initial = True
dependencies = [
# 依赖列表为空,表示这是初始迁移
]
operations = [
# 创建OwnTrackLog模型的操作
migrations.CreateModel(
name='OwnTrackLog',
fields=[
# 主键字段,自动创建
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 用户标识字段最大长度100
('tid', models.CharField(max_length=100, verbose_name='用户')),
# 纬度字段,浮点数类型
('lat', models.FloatField(verbose_name='纬度')),
# 经度字段,浮点数类型
('lon', models.FloatField(verbose_name='经度')),
# 创建时间字段,默认值为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
],
options={
'verbose_name': 'OwnTrackLogs',
'verbose_name_plural': 'OwnTrackLogs',
'ordering': ['created_time'],
'get_latest_by': 'created_time',
'verbose_name': 'OwnTrackLogs', # 模型的可读名称
'verbose_name_plural': 'OwnTrackLogs', # 模型的复数可读名称
'ordering': ['created_time'], # 默认排序字段
'get_latest_by': 'created_time', # 获取最新记录的字段
},
),
]

@ -4,16 +4,27 @@ from django.db import migrations
class Migration(migrations.Migration):
"""
数据库迁移文件用于修改 OwnTrackLog 模型的选项和字段名
"""
dependencies = [
# 依赖于 owntracks 应用的 0001_initial 迁移文件
('owntracks', '0001_initial'),
]
operations = [
# 修改 OwnTrackLog 模型的选项配置
migrations.AlterModelOptions(
name='owntracklog',
options={'get_latest_by': 'creation_time', 'ordering': ['creation_time'], 'verbose_name': 'OwnTrackLogs', 'verbose_name_plural': 'OwnTrackLogs'},
options={
'get_latest_by': 'creation_time', # 更改获取最新记录的字段为 creation_time
'ordering': ['creation_time'], # 更改默认排序字段为 creation_time
'verbose_name': 'OwnTrackLogs', # 模型的可读名称
'verbose_name_plural': 'OwnTrackLogs' # 模型的复数可读名称
},
),
# 重命名字段:将 created_time 字段更名为 creation_time
migrations.RenameField(
model_name='owntracklog',
old_name='created_time',

@ -1,20 +1,89 @@
"""
OwnTracks位置跟踪模块数据模型
该模块定义了OwnTracks位置跟踪应用的数据模型
OwnTracks是一个开源的位置跟踪应用用户可以通过手机应用
自动记录位置信息并发送到服务器进行存储和可视化
主要功能
- 接收来自OwnTracks客户端的位置数据
- 存储用户的位置轨迹信息
- 提供位置数据的查询和展示功能
"""
from django.db import models
from django.utils.timezone import now
# Create your models here.
class OwnTrackLog(models.Model):
tid = models.CharField(max_length=100, null=False, verbose_name='用户')
lat = models.FloatField(verbose_name='纬度')
lon = models.FloatField(verbose_name='经度')
creation_time = models.DateTimeField('创建时间', default=now)
"""
OwnTracks位置日志模型
用于存储来自OwnTracks客户端的位置跟踪数据每条记录包含
用户标识经纬度坐标和记录时间等信息
Attributes:
tid (CharField): 用户/设备标识符用于区分不同的用户或设备
最大长度100个字符不能为空
lat (FloatField): 纬度坐标使用WGS84坐标系
lon (FloatField): 经度坐标使用WGS84坐标系
creation_time (DateTimeField): 日志创建时间默认为当前时间
Meta:
ordering: 按照创建时间升序排列便于按时间顺序查看轨迹
verbose_name: OwnTrack日志的可读性名称
verbose_name_plural: OwnTrack日志的复数形式名称
get_latest_by: 指定用于latest()查询的字段
Example:
>>> log = OwnTrackLog(tid='user123', lat=39.9042, lon=116.4074)
>>> log.save()
"""
# 用户/设备标识符,用于区分不同的用户或设备
tid = models.CharField(
max_length=100,
null=False,
verbose_name='用户',
help_text='用户或设备的唯一标识符'
)
# 纬度坐标使用WGS84坐标系
lat = models.FloatField(
verbose_name='纬度',
help_text='位置纬度使用WGS84坐标系'
)
# 经度坐标使用WGS84坐标系
lon = models.FloatField(
verbose_name='经度',
help_text='位置经度使用WGS84坐标系'
)
# 记录创建时间,默认为当前时间
creation_time = models.DateTimeField(
'创建时间',
default=now,
help_text='位置记录的时间戳'
)
def __str__(self):
"""
模型的字符串表示
返回用户ID用于在Django管理后台和调试时显示
Returns:
str: 用户标识符
"""
return self.tid
class Meta:
# 按照创建时间升序排列,便于按时间顺序查看轨迹
ordering = ['creation_time']
verbose_name = "OwnTrackLogs"
verbose_name_plural = verbose_name
get_latest_by = 'creation_time'
# 在Django管理后台中的显示名称
verbose_name = "OwnTracks位置日志"
verbose_name_plural = "OwnTracks位置日志"
# 指定用于latest()查询的字段
get_latest_by = 'creation_time'

@ -9,11 +9,32 @@ from .models import OwnTrackLog
# Create your tests here.
class OwnTrackLogTest(TestCase):
"""
OwnTrackLog模型的测试类
用于测试OwnTracks功能的相关接口和数据处理逻辑
"""
def setUp(self):
"""
测试初始化方法
创建测试客户端和请求工厂实例用于模拟HTTP请求
"""
self.client = Client()
self.factory = RequestFactory()
def test_own_track_log(self):
"""
测试OwnTrack日志记录功能
验证以下功能
1. 正常的日志数据能够正确保存到数据库
2. 缺少必要字段的日志数据会被拒绝
3. 地图展示功能的访问权限控制
4. 管理员用户登录后可以正常访问地图和数据接口
"""
# 测试正常数据提交
o = {
'tid': 12,
'lat': 123.123,
@ -27,6 +48,7 @@ class OwnTrackLogTest(TestCase):
length = len(OwnTrackLog.objects.all())
self.assertEqual(length, 1)
# 测试缺少必要字段的数据提交(应被拒绝)
o = {
'tid': 12,
'lat': 123.123
@ -39,21 +61,26 @@ class OwnTrackLogTest(TestCase):
length = len(OwnTrackLog.objects.all())
self.assertEqual(length, 1)
# 测试未登录用户访问地图功能(应重定向到登录页面)
rsp = self.client.get('/owntracks/show_maps')
self.assertEqual(rsp.status_code, 302)
# 创建管理员用户并登录
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="liangliangyy1")
self.client.login(username='liangliangyy1', password='liangliangyy1')
# 添加测试数据
s = OwnTrackLog()
s.tid = 12
s.lon = 123.234
s.lat = 34.234
s.save()
# 测试各种数据展示接口的访问
rsp = self.client.get('/owntracks/show_dates')
self.assertEqual(rsp.status_code, 200)
rsp = self.client.get('/owntracks/show_maps')

@ -1,12 +1,60 @@
"""
OwnTracks位置跟踪模块URL配置
该模块定义了OwnTracks位置跟踪相关的所有URL路由包括
- 接收客户端位置数据的API接口
- 显示位置轨迹地图的页面
- 提供位置数据的JSON API
- 管理位置日志数据的页面
主要路由
1. logtracks: 接收OwnTracks客户端发送的位置数据
2. show_maps: 显示位置轨迹地图页面
3. get_datas: 获取位置数据的JSON API接口
4. show_dates: 显示有位置记录的日期列表页面
"""
from django.urls import path
from . import views
# 应用命名空间用于URL反向解析
app_name = "owntracks"
# OwnTracks位置跟踪相关URL路由配置
urlpatterns = [
path('owntracks/logtracks', views.manage_owntrack_log, name='logtracks'),
path('owntracks/show_maps', views.show_maps, name='show_maps'),
path('owntracks/get_datas', views.get_datas, name='get_datas'),
path('owntracks/show_dates', views.show_log_dates, name='show_dates')
# 接收OwnTracks客户端位置数据的日志记录接口
# 该接口接收POST请求包含JSON格式的位置数据
# OwnTracks客户端会自动向此接口发送位置信息
path(
'owntracks/logtracks',
views.manage_owntrack_log,
name='logtracks'
),
# 显示地图页面,展示位置轨迹
# 仅允许超级用户访问,显示指定日期的位置轨迹数据
# 支持通过GET参数指定查询日期格式: ?date=YYYY-MM-DD
path(
'owntracks/show_maps',
views.show_maps,
name='show_maps'
),
# 获取位置数据接口,用于前端地图渲染
# 返回JSON格式的位置轨迹数据供前端JavaScript调用
# 支持通过GET参数指定查询日期格式: ?date=YYYY-MM-DD
path(
'owntracks/get_datas',
views.get_datas,
name='get_datas'
),
# 显示可用的日志日期列表页面
# 显示数据库中有位置记录的所有日期,方便用户选择查看
path(
'owntracks/show_dates',
views.show_log_dates,
name='show_dates'
)
]

@ -1,4 +1,19 @@
# Create your views here.
"""
OwnTracks位置跟踪模块视图函数
该模块包含处理OwnTracks位置跟踪相关的所有视图函数包括
- 接收来自OwnTracks客户端的位置数据
- 显示位置轨迹地图
- 提供位置数据的API接口
- 管理位置日志数据
主要功能
1. manage_owntrack_log: 接收并存储位置数据
2. show_maps: 显示位置轨迹地图页面
3. show_log_dates: 显示有位置记录的日期列表
4. get_datas: 提供位置数据的JSON API接口
"""
import datetime
import itertools
import json
@ -16,11 +31,24 @@ from django.views.decorators.csrf import csrf_exempt
from .models import OwnTrackLog
# 获取日志记录器
logger = logging.getLogger(__name__)
@csrf_exempt
def manage_owntrack_log(request):
"""
处理OwnTracks客户端发送的位置日志数据
该视图接收来自OwnTracks应用程序的POST请求解析其中的tid(设备ID)
lat(纬度)lon(经度)等位置信息并保存到OwnTrackLog数据库中
Args:
request: HTTP请求对象包含JSON格式的位置数据
Returns:
HttpResponse: 返回处理结果状态('ok'/'data error'/'error')
"""
try:
s = json.loads(request.read().decode('utf-8'))
tid = s['tid']
@ -46,6 +74,18 @@ def manage_owntrack_log(request):
@login_required
def show_maps(request):
"""
显示位置轨迹地图页面
仅允许超级用户访问显示指定日期的位置轨迹数据
如果未指定日期则默认显示当天数据
Args:
request: HTTP请求对象
Returns:
HttpResponse: 地图页面或403 Forbidden响应
"""
if request.user.is_superuser:
defaultdate = str(datetime.datetime.now(timezone.utc).date())
date = request.GET.get('date', defaultdate)
@ -60,6 +100,17 @@ def show_maps(request):
@login_required
def show_log_dates(request):
"""
显示有位置记录的日期列表
获取数据库中所有位置记录的日期并去重排序后返回给前端
Args:
request: HTTP请求对象
Returns:
HttpResponse: 包含日期列表的页面
"""
dates = OwnTrackLog.objects.values_list('creation_time', flat=True)
results = list(sorted(set(map(lambda x: x.strftime('%Y-%m-%d'), dates))))
@ -70,58 +121,131 @@ def show_log_dates(request):
def convert_to_amap(locations):
"""
将GPS坐标转换为高德地图坐标
由于高德地图API限制每次最多转换30个坐标点因此需要分批处理
GPS坐标使用WGS84坐标系而高德地图使用GCJ02坐标系需要进行坐标转换
Args:
locations: OwnTrackLog对象列表包含需要转换的位置数据
Returns:
str: 转换后的坐标字符串格式为"经度,纬度;经度,纬度;..."
Note:
- 该函数使用了高德地图的坐标转换API
- API Key是硬编码的在生产环境中应该从配置文件读取
- 每次最多转换30个坐标点超过的需要分批处理
"""
convert_result = []
it = iter(locations)
# 每次处理30个坐标点高德地图API限制
item = list(itertools.islice(it, 30))
while item:
# 将坐标点转换为"经度,纬度"格式的字符串,并用分号连接
# 使用set去重避免重复的坐标点
datas = ';'.join(
set(map(lambda x: str(x.lon) + ',' + str(x.lat), item)))
# 高德地图API配置注意生产环境中应从配置文件读取
key = '8440a376dfc9743d8924bf0ad141f28e'
api = 'http://restapi.amap.com/v3/assistant/coordinate/convert'
# 构建API请求参数
query = {
'key': key,
'locations': datas,
'coordsys': 'gps'
'key': key, # API密钥
'locations': datas, # 需要转换的坐标点
'coordsys': 'gps' # 源坐标系GPS/WGS84
}
# 发送API请求
rsp = requests.get(url=api, params=query)
result = json.loads(rsp.text)
# 检查API响应是否包含转换后的坐标
if "locations" in result:
convert_result.append(result['locations'])
# 获取下一批坐标点
item = list(itertools.islice(it, 30))
# 将所有转换结果用分号连接返回
return ";".join(convert_result)
@login_required
def get_datas(request):
"""
获取位置轨迹数据接口
根据指定日期查询位置数据按设备ID分组并按时间排序
返回JSON格式的数据供前端地图渲染使用
Args:
request: HTTP请求对象可通过GET参数指定查询日期
- date: 查询日期格式为'YYYY-MM-DD'如未指定则使用当天
Returns:
JsonResponse: 位置轨迹数据格式为:
[
{
"name": "设备ID",
"path": [["经度", "纬度"], ["经度", "纬度"], ...]
},
...
]
Example:
请求: GET /owntracks/get_datas?date=2023-12-01
响应: [{"name": "user123", "path": [["116.4074", "39.9042"], ...]}]
"""
# 获取当前时间作为默认查询日期
now = django.utils.timezone.now().replace(tzinfo=timezone.utc)
querydate = django.utils.timezone.datetime(
now.year, now.month, now.day, 0, 0, 0)
# 如果请求中指定了日期参数,则使用指定的日期
if request.GET.get('date', None):
date = list(map(lambda x: int(x), request.GET.get('date').split('-')))
date_str = request.GET.get('date')
# 解析日期字符串,格式: YYYY-MM-DD
date = list(map(lambda x: int(x), date_str.split('-')))
querydate = django.utils.timezone.datetime(
date[0], date[1], date[2], 0, 0, 0)
# 计算查询的结束时间第二天0点
nextdate = querydate + datetime.timedelta(days=1)
# 查询指定日期范围内的位置数据
models = OwnTrackLog.objects.filter(
creation_time__range=(querydate, nextdate))
result = list()
if models and len(models):
# 按设备ID分组每个设备生成一条轨迹记录
for tid, item in groupby(
sorted(models, key=lambda k: k.tid), key=lambda k: k.tid):
d = dict()
d["name"] = tid
d["name"] = tid # 设备ID作为轨迹名称
paths = list()
# 使用高德转换后的经纬度
# 选项1: 使用高德地图转换后的经纬度(需要坐标转换)
# 注意当前代码中这部分被注释掉了直接使用原始GPS坐标
# locations = convert_to_amap(
# sorted(item, key=lambda x: x.creation_time))
# for i in locations.split(';'):
# paths.append(i.split(','))
# 使用GPS原始经纬度
# 选项2: 使用GPS原始经纬度当前使用的方案
# 按时间排序,确保轨迹点的顺序正确
for location in sorted(item, key=lambda x: x.creation_time):
paths.append([str(location.lon), str(location.lat)])
d["path"] = paths
result.append(d)
# 返回JSON响应safe=False允许返回非字典类型的数据
return JsonResponse(result, safe=False)

Loading…
Cancel
Save