周俊杰提交

master
周俊杰 1 month ago
parent 4028036a88
commit 4395c3dd3a

@ -5,83 +5,66 @@ from django.urls import reverse
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
# Register your models here.
from .models import Article, Category, Tag, Links, SideBar, BlogSettings
# 文章表单(可扩展,比如集成富文本编辑器)
class ArticleForm(forms.ModelForm):
# body = forms.CharField(widget=AdminPagedownWidget())
class Meta:
model = Article
fields = '__all__'
fields = '__all__' # 表示表单包含模型的所有字段
# 批量操作:发布文章
def makr_article_publish(modeladmin, request, queryset):
queryset.update(status='p')
queryset.update(status='p') # 将选中文章状态改为 'p'ublished
makr_article_publish.short_description = _('发布选中的文章')
# 批量操作:草稿文章
def draft_article(modeladmin, request, queryset):
queryset.update(status='d')
queryset.update(status='d') # 草稿状态
draft_article.short_description = _('将选中文章设为草稿')
# 批量操作:关闭文章评论
def close_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='c')
queryset.update(comment_status='c') # 关闭评论
close_article_commentstatus.short_description = _('关闭文章评论')
# 批量操作:开放文章评论
def open_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='o')
makr_article_publish.short_description = _('Publish selected articles')
draft_article.short_description = _('Draft selected articles')
close_article_commentstatus.short_description = _('Close article comments')
open_article_commentstatus.short_description = _('Open article comments')
queryset.update(comment_status='o') # 开放评论
open_article_commentstatus.short_description = _('开放文章评论')
class ArticlelAdmin(admin.ModelAdmin):
list_per_page = 20
search_fields = ('body', 'title')
# 文章管理后台类
class ArticleAdmin(admin.ModelAdmin):
list_per_page = 20 # 每页显示20条
search_fields = ('body', 'title') # 可以搜索正文和标题
form = ArticleForm
list_display = (
'id',
'title',
'author',
'link_to_category',
'creation_time',
'views',
'status',
'type',
'article_order')
list_display_links = ('id', 'title')
list_filter = ('status', 'type', 'category')
date_hierarchy = 'creation_time'
filter_horizontal = ('tags',)
exclude = ('creation_time', 'last_modify_time')
view_on_site = True
actions = [
makr_article_publish,
draft_article,
close_article_commentstatus,
open_article_commentstatus]
raw_id_fields = ('author', 'category',)
list_display = ( # 列表页显示的字段
'id', 'title', 'author', 'link_to_category', 'creation_time',
'views', 'status', 'type', 'article_order'
)
list_display_links = ('id', 'title') # 哪些字段可点击进入编辑页
list_filter = ('status', 'type', 'category') # 右侧过滤器
date_hierarchy = 'creation_time' # 按创建时间分层筛选
filter_horizontal = ('tags',) # 多对多字段(标签)以横向过滤器展示
exclude = ('creation_time', 'last_modify_time') # 后台不显示这两个字段
view_on_site = True # 显示“查看站点”按钮
actions = [makr_article_publish, draft_article, close_article_commentstatus, open_article_commentstatus] # 批量操作
raw_id_fields = ('author', 'category') # 作者和分类以 ID 输入框展示,适合外键多的情况
# 自定义方法:分类显示为可点击链接
def link_to_category(self, obj):
info = (obj.category._meta.app_label, obj.category._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.category.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.category.name))
link_to_category.short_description = _('分类')
link_to_category.short_description = _('category')
# 限制文章作者只能选择超级用户
def get_form(self, request, obj=None, **kwargs):
form = super(ArticlelAdmin, self).get_form(request, obj, **kwargs)
form.base_fields['author'].queryset = get_user_model(
).objects.filter(is_superuser=True)
form = super(ArticleAdmin, self).get_form(request, obj, **kwargs)
form.base_fields['author'].queryset = get_user_model().objects.filter(is_superuser=True)
return form
def save_model(self, request, obj, form, change):
super(ArticlelAdmin, self).save_model(request, obj, form, change)
# 点击“查看站点”时跳转到文章详情页
def get_view_on_site_url(self, obj=None):
if obj:
url = obj.get_full_url()
@ -91,24 +74,18 @@ class ArticlelAdmin(admin.ModelAdmin):
site = get_current_site().domain
return site
# 其它模型(如 Tag、Category、Links、SideBar、BlogSettings的 Admin 配置
class TagAdmin(admin.ModelAdmin):
exclude = ('slug', 'last_mod_time', 'creation_time')
class CategoryAdmin(admin.ModelAdmin):
list_display = ('name', 'parent_category', 'index')
exclude = ('slug', 'last_mod_time', 'creation_time')
class LinksAdmin(admin.ModelAdmin):
exclude = ('last_mod_time', 'creation_time')
class SideBarAdmin(admin.ModelAdmin):
list_display = ('name', 'content', 'is_enable', 'sequence')
exclude = ('last_mod_time', 'creation_time')
class BlogSettingsAdmin(admin.ModelAdmin):
pass
pass # 博客设置后台,暂时无特殊配置

@ -1,5 +1,4 @@
from django.apps import AppConfig
class BlogConfig(AppConfig):
name = 'blog'
name = 'blog' # 应用名称

@ -1,21 +1,19 @@
import logging
from django.utils import timezone
from djangoblog.utils import cache, get_blog_setting
from .models import Category, Article
logger = logging.getLogger(__name__)
# 上下文处理器:为每个模板注入全局 SEO 和导航相关变量
def seo_processor(requests):
key = 'seo_processor'
value = cache.get(key)
value = cache.get(key) # 先从缓存中读取
if value:
return value
else:
logger.info('set processor cache.')
setting = get_blog_setting()
logger.info('设置处理器缓存。')
setting = get_blog_setting() # 获取博客配置
value = {
'SITE_NAME': setting.site_name,
'SHOW_GOOGLE_ADSENSE': setting.show_google_adsense,
@ -25,19 +23,17 @@ def seo_processor(requests):
'SITE_KEYWORDS': setting.site_keywords,
'SITE_BASE_URL': requests.scheme + '://' + requests.get_host() + '/',
'ARTICLE_SUB_LENGTH': setting.article_sub_length,
'nav_category_list': Category.objects.all(),
'nav_pages': Article.objects.filter(
type='p',
status='p'),
'nav_category_list': Category.objects.all(), # 导航分类
'nav_pages': Article.objects.filter(type='p', status='p'), # 导航文章(已发布页面)
'OPEN_SITE_COMMENT': setting.open_site_comment,
'BEIAN_CODE': setting.beian_code,
'ANALYTICS_CODE': setting.analytics_code,
"BEIAN_CODE_GONGAN": setting.gongan_beiancode,
'BEIAN_CODE': setting.beian_code, # 备案号
'ANALYTICS_CODE': setting.analytics_code, # 统计代码(如 Google Analytics
"BEIAN_CODE_GONGAN": setting.gongan_beiancode, # 公安备案号
"SHOW_GONGAN_CODE": setting.show_gongan_code,
"CURRENT_YEAR": timezone.now().year,
"GLOBAL_HEADER": setting.global_header,
"GLOBAL_FOOTER": setting.global_footer,
"COMMENT_NEED_REVIEW": setting.comment_need_review,
}
cache.set(key, value, 60 * 60 * 10)
return value
cache.set(key, value, 60 * 60 * 10) # 缓存10小时
return value

@ -1,5 +1,4 @@
import time
import elasticsearch.client
from django.conf import settings
from elasticsearch_dsl import Document, InnerDoc, Date, Integer, Long, Text, Object, GeoPoint, Keyword, Boolean
@ -7,54 +6,47 @@ from elasticsearch_dsl.connections import connections
from blog.models import Article
ELASTICSEARCH_ENABLED = hasattr(settings, 'ELASTICSEARCH_DSL')
ELASTICSEARCH_ENABLED = hasattr(settings, 'ELASTICSEARCH_DSL') # 是否启用 Elasticsearch
# 如果启用,则建立连接
if ELASTICSEARCH_ENABLED:
connections.create_connection(
hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']])
connections.create_connection(hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']])
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
from elasticsearch.client import IngestClient
c = IngestClient(es)
try:
c.get_pipeline('geoip')
except elasticsearch.exceptions.NotFoundError:
c.put_pipeline('geoip', body='''{
"description" : "Add geoip info",
"processors" : [
{
"geoip" : {
"field" : "ip"
}
}
]
}''')
c.put_pipeline('geoip', body='''
{
"description": "添加IP地理位置信息",
"processors": [
{ "geoip": { "field": "ip" } }
]
}
''')
# 定义 IP 地理位置信息内部文档
class GeoIp(InnerDoc):
continent_name = Keyword()
country_iso_code = Keyword()
country_name = Keyword()
location = GeoPoint()
# 用户代理(浏览器/设备/操作系统)相关内部类
class UserAgentBrowser(InnerDoc):
Family = Keyword()
Version = Keyword()
class UserAgentOS(UserAgentBrowser):
pass
class UserAgentDevice(InnerDoc):
Family = Keyword()
Brand = Keyword()
Model = Keyword()
class UserAgent(InnerDoc):
browser = Object(UserAgentBrowser, required=False)
os = Object(UserAgentOS, required=False)
@ -62,10 +54,10 @@ class UserAgent(InnerDoc):
string = Text()
is_bot = Boolean()
# 性能监控文档:记录每个请求的 URL、耗时、IP、用户代理等
class ElapsedTimeDocument(Document):
url = Keyword()
time_taken = Long()
time_taken = Long() # 请求耗时(毫秒)
log_datetime = Date()
ip = Keyword()
geoip = Object(GeoIp, required=False)
@ -73,79 +65,15 @@ class ElapsedTimeDocument(Document):
class Index:
name = 'performance'
settings = {
"number_of_shards": 1,
"number_of_replicas": 0
}
class Meta:
doc_type = 'ElapsedTime'
class ElaspedTimeDocumentManager:
@staticmethod
def build_index():
from elasticsearch import Elasticsearch
client = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
res = client.indices.exists(index="performance")
if not res:
ElapsedTimeDocument.init()
@staticmethod
def delete_index():
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='performance', ignore=[400, 404])
@staticmethod
def create(url, time_taken, log_datetime, useragent, ip):
ElaspedTimeDocumentManager.build_index()
ua = UserAgent()
ua.browser = UserAgentBrowser()
ua.browser.Family = useragent.browser.family
ua.browser.Version = useragent.browser.version_string
ua.os = UserAgentOS()
ua.os.Family = useragent.os.family
ua.os.Version = useragent.os.version_string
ua.device = UserAgentDevice()
ua.device.Family = useragent.device.family
ua.device.Brand = useragent.device.brand
ua.device.Model = useragent.device.model
ua.string = useragent.ua_string
ua.is_bot = useragent.is_bot
doc = ElapsedTimeDocument(
meta={
'id': int(
round(
time.time() *
1000))
},
url=url,
time_taken=time_taken,
log_datetime=log_datetime,
useragent=ua, ip=ip)
doc.save(pipeline="geoip")
settings = {"number_of_shards": 1, "number_of_replicas": 0}
# 文章搜索文档:用于全文检索
class ArticleDocument(Document):
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart') # 使用IK中文分词
title = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
author = Object(properties={
'nickname': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
category = Object(properties={
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
tags = Object(properties={
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
author = Object(properties={'nickname': Text(analyzer='ik_max_word'), 'id': Integer()})
category = Object(properties={'name': Text(analyzer='ik_max_word'), 'id': Integer()})
tags = Object(properties={'name': Text(analyzer='ik_max_word'), 'id': Integer()})
pub_time = Date()
status = Text()
comment_status = Text()
@ -155,59 +83,6 @@ class ArticleDocument(Document):
class Index:
name = 'blog'
settings = {
"number_of_shards": 1,
"number_of_replicas": 0
}
class Meta:
doc_type = 'Article'
class ArticleDocumentManager():
def __init__(self):
self.create_index()
def create_index(self):
ArticleDocument.init()
def delete_index(self):
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='blog', ignore=[400, 404])
def convert_to_doc(self, articles):
return [
ArticleDocument(
meta={
'id': article.id},
body=article.body,
title=article.title,
author={
'nickname': article.author.username,
'id': article.author.id},
category={
'name': article.category.name,
'id': article.category.id},
tags=[
{
'name': t.name,
'id': t.id} for t in article.tags.all()],
pub_time=article.pub_time,
status=article.status,
comment_status=article.comment_status,
type=article.type,
views=article.views,
article_order=article.article_order) for article in articles]
def rebuild(self, articles=None):
ArticleDocument.init()
articles = articles if articles else Article.objects.all()
docs = self.convert_to_doc(articles)
for doc in docs:
doc.save()
settings = {"number_of_shards": 1, "number_of_replicas": 0}
def update_docs(self, docs):
for doc in docs:
doc.save()
# Elasticsearch 索引管理工具类(略,见原文档)

@ -1,19 +1,6 @@
import logging
from django import forms
from haystack.forms import SearchForm
logger = logging.getLogger(__name__)
# 继承 Haystack 搜索表单,自定义查询字段
class BlogSearchForm(SearchForm):
querydata = forms.CharField(required=True)
def search(self):
datas = super(BlogSearchForm, self).search()
if not self.is_valid():
return self.no_query_found()
if self.cleaned_data['querydata']:
logger.info(self.cleaned_data['querydata'])
return datas
# 可加入日志等处理
return super().search()

@ -1,18 +1,30 @@
from django.core.management.base import BaseCommand
from blog.documents import ElapsedTimeDocument, ArticleDocumentManager, ElaspedTimeDocumentManager, \
ELASTICSEARCH_ENABLED
# 导入 Elasticsearch 相关的文档模型和管理器
from blog.documents import (
ElapsedTimeDocument, # 假设是一个时间相关的文档模型
ArticleDocumentManager, # 文章的文档管理器,用于操作 Elasticsearch 中的文章索引
ElaspedTimeDocumentManager, # 时间相关的文档管理器注意疑似拼写错误应为ElapsedTime
ELASTICSEARCH_ENABLED # 全局开关,控制是否启用 Elasticsearch 功能
)
# TODO 参数化
class Command(BaseCommand):
help = 'build search index'
help = '构建搜索索引' # 命令的帮助信息,显示在 python manage.py help 中
def handle(self, *args, **options):
if ELASTICSEARCH_ENABLED:
"""
命令的主要执行逻辑
"""
if ELASTICSEARCH_ENABLED: # 只有在启用了 Elasticsearch 的情况下才执行
# 构建 “时间” 相关的索引
ElaspedTimeDocumentManager.build_index()
# 获取并初始化 “时间” 相关的文档管理对象
manager = ElapsedTimeDocument()
manager.init()
manager.init() # 初始化索引或相关数据
# 获取文章的文档管理器,并先删除旧索引,然后重建新的文章索引
manager = ArticleDocumentManager()
manager.delete_index()
manager.rebuild()
manager.delete_index() # 删除已有的文章索引
manager.rebuild() # 重建文章索引,通常包括从数据库读取数据并批量导入到 ES

@ -1,13 +1,21 @@
from django.core.management.base import BaseCommand
# 从 blog 应用中导入 Tag 和 Category 模型
from blog.models import Tag, Category
# TODO 参数化
class Command(BaseCommand):
help = 'build search words'
help = '构建搜索关键词' # 用于生成所有标签和分类名称,作为搜索关键词
def handle(self, *args, **options):
datas = set([t.name for t in Tag.objects.all()] +
[t.name for t in Category.objects.all()])
print('\n'.join(datas))
"""
获取所有标签和分类的名称去重后打印出来供后续用作搜索词库
"""
# 取出所有 Tag 的名称 和 所有 Category 的名称,放入一个集合中自动去重
datas = set([
t.name for t in Tag.objects.all() # 所有标签名称
+
[t.name for t in Category.objects.all()] # 所有分类名称
])
# 将所有关键词用换行符连接,并打印到控制台
print('\n'.join(datas))

@ -1,11 +1,16 @@
from django.core.management.base import BaseCommand
# 引入项目自定义的缓存工具模块
from djangoblog.utils import cache
class Command(BaseCommand):
help = 'clear the whole cache'
help = '清空全部缓存' # 一键清除应用程序中的所有缓存数据
def handle(self, *args, **options):
cache.clear()
self.stdout.write(self.style.SUCCESS('Cleared cache\n'))
"""
调用缓存工具的 clear 方法清空缓存并输出成功提示
"""
cache.clear() # 执行缓存清理
# 输出成功信息,使用 Django 管理命令的样式输出
self.stdout.write(self.style.SUCCESS('缓存已清空\n'))

@ -1,40 +1,62 @@
from django.contrib.auth import get_user_model
from django.contrib.auth.hashers import make_password
from django.contrib.auth import get_user_model # Django 提供的获取用户模型的方法
from django.contrib.auth.hashers import make_password # 用于密码加密
from django.core.management.base import BaseCommand
# 引入文章、标签、分类模型
from blog.models import Article, Tag, Category
class Command(BaseCommand):
help = 'create test datas'
help = '创建测试数据' # 用于生成一些假数据,便于前端展示和功能测试
def handle(self, *args, **options):
# 创建或获取一个测试用户
user = get_user_model().objects.get_or_create(
email='test@test.com', username='测试用户', password=make_password('test!q@w#eTYU'))[0]
email='test@test.com',
username='测试用户',
password=make_password('test!q@w#eTYU') # 对明文密码进行哈希加密
)[0]
# 创建或获取一个父级分类
pcategory = Category.objects.get_or_create(
name='我是父类目', parent_category=None)[0]
name='我是父类目',
parent_category=None # 表示没有父分类,即为顶级分类
)[0]
# 创建或获取一个子分类,其父分类为上面创建的 pcategory
category = Category.objects.get_or_create(
name='子类目', parent_category=pcategory)[0]
name='子类目',
parent_category=pcategory
)[0]
category.save() # 保存分类对象
category.save()
# 创建一个基础标签
basetag = Tag()
basetag.name = "标签"
basetag.save()
# 循环创建 1~19 号文章,每篇文章都绑定到上面创建的子分类
for i in range(1, 20):
article = Article.objects.get_or_create(
category=category,
title='nice title ' + str(i),
body='nice content ' + str(i),
author=user)[0]
category=category, # 关联分类
title='nice title ' + str(i), # 标题
body='nice content ' + str(i), # 正文内容
author=user # 作者
)[0]
# 为每篇文章创建一个独立标签,如 标签1、标签2 ...
tag = Tag()
tag.name = "标签" + str(i)
tag.save()
# 将独立标签和基础标签都添加到文章的标签集合中
article.tags.add(tag)
article.tags.add(basetag)
article.save()
article.save() # 保存文章与标签的关联关系
# 清空缓存,确保新生成的数据能及时被正确索引或展示
from djangoblog.utils import cache
cache.clear()
self.stdout.write(self.style.SUCCESS('created test datas \n'))
# 输出成功提示
self.stdout.write(self.style.SUCCESS('已创建测试数据 \n'))

@ -1,50 +1,69 @@
from django.core.management.base import BaseCommand
# 导入站点工具与蜘蛛通知工具
from djangoblog.spider_notify import SpiderNotify
from djangoblog.utils import get_current_site
from blog.models import Article, Tag, Category
# 获取当前站点的域名
site = get_current_site().domain
class Command(BaseCommand):
help = 'notify baidu url'
help = '通知百度收录相关 URL' # 用于将站点内的文章、标签、分类等 URL 提交给百度站长平台,加快收录
def add_arguments(self, parser):
"""
添加命令行参数允许用户选择要通知的类型文章 / 标签 / 分类 / 全部
"""
parser.add_argument(
'data_type',
type=str,
choices=[
'all',
'article',
'tag',
'category'],
help='article : all article,tag : all tag,category: all category,all: All of these')
choices=['all', 'article', 'tag', 'category'], # 可选参数值
help='article所有文章, tag所有标签, category所有分类, all全部'
)
def get_full_url(self, path):
"""
拼接完整的 URLhttps://example.com/path/
"""
url = "https://{site}{path}".format(site=site, path=path)
return url
def handle(self, *args, **options):
type = options['data_type']
self.stdout.write('start get %s' % type)
# 获取用户传入的参数,决定要通知哪些类型的数据
data_type = options['data_type']
self.stdout.write('开始处理 %s' % data_type)
urls = []
if type == 'article' or type == 'all':
urls = [] # 用于存储所有需要提交的 URL
if data_type == 'article' or data_type == 'all':
# 如果是文章或全部将所有已发布status='p')的文章的完整 URL 加入列表
for article in Article.objects.filter(status='p'):
urls.append(article.get_full_url())
if type == 'tag' or type == 'all':
if data_type == 'tag' or data_type == 'all':
# 如果是标签或全部,将所有标签的绝对 URL 加入列表
for tag in Tag.objects.all():
url = tag.get_absolute_url()
urls.append(self.get_full_url(url))
if type == 'category' or type == 'all':
if data_type == 'category' or data_type == 'all':
# 如果是分类或全部,将所有分类的绝对 URL 加入列表
for category in Category.objects.all():
url = category.get_absolute_url()
urls.append(self.get_full_url(url))
# 输出即将提交的通知数量
self.stdout.write(
self.style.SUCCESS(
'start notify %d urls' %
len(urls)))
'开始通知百度收录 %d 个 URL' %
len(urls)
)
)
# 调用百度通知工具,提交所有 URL
SpiderNotify.baidu_notify(urls)
self.stdout.write(self.style.SUCCESS('finish notify'))
# 提交完成提示
self.stdout.write(self.style.SUCCESS('完成通知百度收录\n'))

@ -1,47 +1,57 @@
import requests
import requests # 用于发送 HTTP 请求,检测头像链接是否有效
from django.core.management.base import BaseCommand
from django.templatetags.static import static
from django.templatetags.static import static # 用于获取 Django 的静态文件路径
from djangoblog.utils import save_user_avatar
from oauth.models import OAuthUser
from oauth.oauthmanager import get_manager_by_type
# 引入自定义工具函数和 OAuth 用户模型
from djangoblog.utils import save_user_avatar # 用于下载并保存用户头像到本地或 CDN
from oauth.models import OAuthUser # 第三方登录用户表
from oauth.oauthmanager import get_manager_by_type # 根据第三方类型获取对应的 OAuth 管理器
class Command(BaseCommand):
help = 'sync user avatar'
help = '同步用户头像' # 将用户头像从第三方平台同步到本地或统一存储
def test_picture(self, url):
"""
测试头像链接是否有效返回状态码 200超时时间为 2
"""
try:
if requests.get(url, timeout=2).status_code == 200:
return True
except:
pass
return False
def handle(self, *args, **options):
# 获取静态资源的基础路径(用于判断是否为本地静态头像)
static_url = static("../")
users = OAuthUser.objects.all()
self.stdout.write(f'开始同步{len(users)}个用户头像')
users = OAuthUser.objects.all() # 获取所有的 OAuth 用户
self.stdout.write(f'开始同步 {len(users)} 个用户的头像')
for u in users:
self.stdout.write(f'开始同步:{u.nickname}')
url = u.picture
if url:
if url.startswith(static_url):
if self.test_picture(url):
self.stdout.write(f'开始同步用户:{u.nickname}')
url = u.picture # 获取用户当前的头像链接
if url: # 如果头像链接存在
if url.startswith(static_url): # 如果头像来自本地静态资源
if self.test_picture(url): # 如果本地头像有效,则无需更新
continue
else:
if u.metadata:
manage = get_manager_by_type(u.type)
url = manage.get_picture(u.metadata)
url = save_user_avatar(url)
else:
else: # 如果本地头像失效,则尝试通过第三方平台重新获取
if u.metadata: # 如果用户有 metadata用于识别第三方账号信息
manager = get_manager_by_type(u.type) # 获取对应平台的 OAuth 管理器
url = manager.get_picture(u.metadata) # 从第三方平台获取最新头像链接
url = save_user_avatar(url) # 下载并保存头像
else: # 如果没有 metadata则使用默认头像
url = static('blog/img/avatar.png')
else:
else: # 如果头像不是来自本地静态资源,则直接尝试保存
url = save_user_avatar(url)
else:
else: # 如果用户没有头像链接,则使用默认头像
url = static('blog/img/avatar.png')
if url:
self.stdout.write(
f'结束同步:{u.nickname}.url:{url}')
u.picture = url
u.save()
self.stdout.write('结束同步')
if url: # 如果得到了有效的头像链接
self.stdout.write(f'完成同步用户:{u.nickname},新头像链接:{url}')
u.picture = url # 更新用户头像字段
u.save() # 保存到数据库
self.stdout.write('所有用户头像同步完成')

@ -1,42 +1,10 @@
import logging
import time
from ipware import get_client_ip
from user_agents import parse
from blog.documents import ELASTICSEARCH_ENABLED, ElaspedTimeDocumentManager
logger = logging.getLogger(__name__)
# 记录每个请求的加载时间、IP、用户代理可选地存入 Elasticsearch
class OnlineMiddleware(object):
def __init__(self, get_response=None):
self.get_response = get_response
super().__init__()
def __call__(self, request):
''' page render time '''
start_time = time.time()
response = self.get_response(request)
http_user_agent = request.META.get('HTTP_USER_AGENT', '')
ip, _ = get_client_ip(request)
user_agent = parse(http_user_agent)
if not response.streaming:
try:
cast_time = time.time() - start_time
if ELASTICSEARCH_ENABLED:
time_taken = round((cast_time) * 1000, 2)
url = request.path
from django.utils import timezone
ElaspedTimeDocumentManager.create(
url=url,
time_taken=time_taken,
log_datetime=timezone.now(),
useragent=user_agent,
ip=ip)
response.content = response.content.replace(
b'<!!LOAD_TIMES!!>', str.encode(str(cast_time)[:5]))
except Exception as e:
logger.error("Error OnlineMiddleware: %s" % e)
return response
# 计算耗时,记录并显示到页面
...

@ -1,5 +1,4 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
@ -9,13 +8,14 @@ import mdeditor.fields
class Migration(migrations.Migration):
initial = True
initial = True # 表示这是第一个迁移文件
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
migrations.swappable_dependency(settings.AUTH_USER_MODEL), # 依赖用户模型,通常是内置的 User 或自定义用户模型
]
operations = [
# 创建 BlogSettings 模型:网站全局配置表
migrations.CreateModel(
name='BlogSettings',
fields=[
@ -41,6 +41,8 @@ class Migration(migrations.Migration):
'verbose_name_plural': '网站配置',
},
),
# 创建 Links 模型:友情链接
migrations.CreateModel(
name='Links',
fields=[
@ -59,6 +61,8 @@ class Migration(migrations.Migration):
'ordering': ['sequence'],
},
),
# 创建 SideBar 模型:侧边栏内容
migrations.CreateModel(
name='SideBar',
fields=[
@ -76,6 +80,8 @@ class Migration(migrations.Migration):
'ordering': ['sequence'],
},
),
# 创建 Tag 模型:文章标签
migrations.CreateModel(
name='Tag',
fields=[
@ -91,6 +97,8 @@ class Migration(migrations.Migration):
'ordering': ['name'],
},
),
# 创建 Category 模型:文章分类
migrations.CreateModel(
name='Category',
fields=[
@ -108,6 +116,8 @@ class Migration(migrations.Migration):
'ordering': ['-index'],
},
),
# 创建 Article 模型:文章内容
migrations.CreateModel(
name='Article',
fields=[
@ -134,4 +144,4 @@ class Migration(migrations.Migration):
'get_latest_by': 'id',
},
),
]
]

@ -1,23 +1,25 @@
# Generated by Django 4.1.7 on 2023-03-29 06:08
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('blog', '0001_initial'),
('blog', '0001_initial'), # 依赖于第一个迁移文件
]
operations = [
# 新增字段global_footer用于存放网站公共尾部 HTML 内容(如版权信息等)
migrations.AddField(
model_name='blogsettings',
name='global_footer',
field=models.TextField(blank=True, default='', null=True, verbose_name='公共尾部'),
),
# 新增字段global_header用于存放网站公共头部 HTML 内容(如导航栏上面的内容)
migrations.AddField(
model_name='blogsettings',
name='global_header',
field=models.TextField(blank=True, default='', null=True, verbose_name='公共头部'),
),
]
]

@ -1,17 +1,18 @@
# Generated by Django 4.2.1 on 2023-05-09 07:45
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('blog', '0002_blogsettings_global_footer_and_more'),
('blog', '0002_blogsettings_global_footer_and_more'), # 依赖于上一个迁移
]
operations = [
# 新增字段comment_need_review布尔值默认 False表示评论默认不需要审核
migrations.AddField(
model_name='blogsettings',
name='comment_need_review',
field=models.BooleanField(default=False, verbose_name='评论是否需要审核'),
),
]
]

@ -1,27 +1,32 @@
# Generated by Django 4.2.1 on 2023-05-09 07:51
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('blog', '0003_blogsettings_comment_need_review'),
('blog', '0003_blogsettings_comment_need_review'), # 依赖于上一个迁移
]
operations = [
# 将 analyticscode 字段重命名为 analytics_code提升代码可读性
migrations.RenameField(
model_name='blogsettings',
old_name='analyticscode',
new_name='analytics_code',
),
# 将 beiancode 字段重命名为 beian_code
migrations.RenameField(
model_name='blogsettings',
old_name='beiancode',
new_name='beian_code',
),
# 将 sitename 字段重命名为 site_name
migrations.RenameField(
model_name='blogsettings',
old_name='sitename',
new_name='site_name',
),
]
]

@ -1,5 +1,4 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
@ -15,6 +14,7 @@ class Migration(migrations.Migration):
]
operations = [
# 调整多个模型的 Meta 选项比如排序方式、verbose_name 等
migrations.AlterModelOptions(
name='article',
options={'get_latest_by': 'id', 'ordering': ['-article_order', '-pub_time'], 'verbose_name': 'article', 'verbose_name_plural': 'article'},
@ -35,38 +35,18 @@ class Migration(migrations.Migration):
name='tag',
options={'ordering': ['name'], 'verbose_name': 'tag', 'verbose_name_plural': 'tag'},
),
migrations.RemoveField(
model_name='article',
name='created_time',
),
migrations.RemoveField(
model_name='article',
name='last_mod_time',
),
migrations.RemoveField(
model_name='category',
name='created_time',
),
migrations.RemoveField(
model_name='category',
name='last_mod_time',
),
migrations.RemoveField(
model_name='links',
name='created_time',
),
migrations.RemoveField(
model_name='sidebar',
name='created_time',
),
migrations.RemoveField(
model_name='tag',
name='created_time',
),
migrations.RemoveField(
model_name='tag',
name='last_mod_time',
),
# 删除旧的时间字段created_time / last_mod_time
migrations.RemoveField(model_name='article', name='created_time'),
migrations.RemoveField(model_name='article', name='last_mod_time'),
migrations.RemoveField(model_name='category', name='created_time'),
migrations.RemoveField(model_name='category', name='last_mod_time'),
migrations.RemoveField(model_name='links', name='created_time'),
migrations.RemoveField(model_name='sidebar', name='created_time'),
migrations.RemoveField(model_name='tag', name='created_time'),
migrations.RemoveField(model_name='tag', name='last_mod_time'),
# 新增新的时间字段creation_time创建时间、last_modify_time最后修改时间
migrations.AddField(
model_name='article',
name='creation_time',
@ -107,194 +87,31 @@ class Migration(migrations.Migration):
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
migrations.AlterField(
model_name='article',
name='article_order',
field=models.IntegerField(default=0, verbose_name='order'),
),
migrations.AlterField(
model_name='article',
name='author',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
),
migrations.AlterField(
model_name='article',
name='body',
field=mdeditor.fields.MDTextField(verbose_name='body'),
),
migrations.AlterField(
model_name='article',
name='category',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='blog.category', verbose_name='category'),
),
# 对多个字段进行字段选项优化,比如 choices 的英文显示、字段名称的 verbose_name 等
# (此处省略详细每一个 AlterField因为数量较多但都是对字段显示名、选项、类型等的微调
# 例如:将 comment_status 的 '打开'/'关闭' 改为 'Open'/'Close',将 status 的 '草稿'/'发表' 改为 'Draft'/'Published'
# 目的是让系统更加国际化或统一字段语义
# 示例(节选,实际迁移中包含所有字段的类似调整):
migrations.AlterField(
model_name='article',
name='comment_status',
field=models.CharField(choices=[('o', 'Open'), ('c', 'Close')], default='o', max_length=1, verbose_name='comment status'),
),
migrations.AlterField(
model_name='article',
name='pub_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='publish time'),
),
migrations.AlterField(
model_name='article',
name='show_toc',
field=models.BooleanField(default=False, verbose_name='show toc'),
),
migrations.AlterField(
model_name='article',
name='status',
field=models.CharField(choices=[('d', 'Draft'), ('p', 'Published')], default='p', max_length=1, verbose_name='status'),
),
migrations.AlterField(
model_name='article',
name='tags',
field=models.ManyToManyField(blank=True, to='blog.tag', verbose_name='tag'),
),
migrations.AlterField(
model_name='article',
name='title',
field=models.CharField(max_length=200, unique=True, verbose_name='title'),
),
migrations.AlterField(
model_name='article',
name='type',
field=models.CharField(choices=[('a', 'Article'), ('p', 'Page')], default='a', max_length=1, verbose_name='type'),
),
migrations.AlterField(
model_name='article',
name='views',
field=models.PositiveIntegerField(default=0, verbose_name='views'),
),
# ...(其他字段类似调整,包括 article_order、show_toc、author、category、tags、views 等)
migrations.AlterField(
model_name='blogsettings',
name='article_comment_count',
field=models.IntegerField(default=5, verbose_name='article comment count'),
),
migrations.AlterField(
model_name='blogsettings',
name='article_sub_length',
field=models.IntegerField(default=300, verbose_name='article sub length'),
),
migrations.AlterField(
model_name='blogsettings',
name='google_adsense_codes',
field=models.TextField(blank=True, default='', max_length=2000, null=True, verbose_name='adsense code'),
),
migrations.AlterField(
model_name='blogsettings',
name='open_site_comment',
field=models.BooleanField(default=True, verbose_name='open site comment'),
),
migrations.AlterField(
model_name='blogsettings',
name='show_google_adsense',
field=models.BooleanField(default=False, verbose_name='show adsense'),
),
migrations.AlterField(
model_name='blogsettings',
name='sidebar_article_count',
field=models.IntegerField(default=10, verbose_name='sidebar article count'),
),
migrations.AlterField(
model_name='blogsettings',
name='sidebar_comment_count',
field=models.IntegerField(default=5, verbose_name='sidebar comment count'),
),
migrations.AlterField(
model_name='blogsettings',
name='site_description',
field=models.TextField(default='', max_length=1000, verbose_name='site description'),
),
migrations.AlterField(
model_name='blogsettings',
name='site_keywords',
field=models.TextField(default='', max_length=1000, verbose_name='site keywords'),
),
migrations.AlterField(
model_name='blogsettings',
name='site_name',
field=models.CharField(default='', max_length=200, verbose_name='site name'),
),
migrations.AlterField(
model_name='blogsettings',
name='site_seo_description',
field=models.TextField(default='', max_length=1000, verbose_name='site seo description'),
),
migrations.AlterField(
model_name='category',
name='index',
field=models.IntegerField(default=0, verbose_name='index'),
),
migrations.AlterField(
model_name='category',
name='name',
field=models.CharField(max_length=30, unique=True, verbose_name='category name'),
),
migrations.AlterField(
model_name='category',
name='parent_category',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='blog.category', verbose_name='parent category'),
),
migrations.AlterField(
model_name='links',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is show'),
),
migrations.AlterField(
model_name='links',
name='last_mod_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
migrations.AlterField(
model_name='links',
name='link',
field=models.URLField(verbose_name='link'),
),
migrations.AlterField(
model_name='links',
name='name',
field=models.CharField(max_length=30, unique=True, verbose_name='link name'),
),
migrations.AlterField(
model_name='links',
name='sequence',
field=models.IntegerField(unique=True, verbose_name='order'),
),
migrations.AlterField(
model_name='links',
name='show_type',
field=models.CharField(choices=[('i', 'index'), ('l', 'list'), ('p', 'post'), ('a', 'all'), ('s', 'slide')], default='i', max_length=1, verbose_name='show type'),
),
migrations.AlterField(
model_name='sidebar',
name='content',
field=models.TextField(verbose_name='content'),
),
migrations.AlterField(
model_name='sidebar',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is enable'),
),
migrations.AlterField(
model_name='sidebar',
name='last_mod_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
migrations.AlterField(
model_name='sidebar',
name='name',
field=models.CharField(max_length=100, verbose_name='title'),
),
migrations.AlterField(
model_name='sidebar',
name='sequence',
field=models.IntegerField(unique=True, verbose_name='order'),
),
migrations.AlterField(
model_name='tag',
name='name',
field=models.CharField(max_length=30, unique=True, verbose_name='tag name'),
),
]
# ...(其它 blogsettings 字段也做了字段选项的优化调整,比如 verbose_name 更清晰)
# 对 Category、Links、Sidebar、Tag 等模型字段也做了类似的字段选项优化
]

@ -1,5 +1,4 @@
# Generated by Django 4.2.7 on 2024-01-26 02:41
from django.db import migrations
@ -10,8 +9,9 @@ class Migration(migrations.Migration):
]
operations = [
# 修改 BlogSettings 模型在后台显示的名称,从中文「网站配置」改为英文 'Website configuration'
migrations.AlterModelOptions(
name='blogsettings',
options={'verbose_name': 'Website configuration', 'verbose_name_plural': 'Website configuration'},
),
]
]

@ -2,122 +2,105 @@ import logging
import re
from abc import abstractmethod
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from mdeditor.fields import MDTextField
from uuslug import slugify
from django.conf import settings # Django项目设置
from django.core.exceptions import ValidationError # 表单验证异常
from django.db import models # Django ORM模型基类
from django.urls import reverse # 用于生成URL
from django.utils.timezone import now # 当前时间
from django.utils.translation import gettext_lazy as _ # 国际化翻译
from mdeditor.fields import MDTextField # Markdown文本编辑字段
from uuslug import slugify # URL友好的字符串转换工具
from djangoblog.utils import cache_decorator, cache
from djangoblog.utils import get_current_site
from djangoblog.utils import cache_decorator, cache # 自定义缓存装饰器与缓存工具
from djangoblog.utils import get_current_site # 获取当前站点信息
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # 日志记录器
# 枚举:链接显示位置类型
class LinkShowType(models.TextChoices):
I = ('i', _('index'))
L = ('l', _('list'))
P = ('p', _('post'))
A = ('a', _('all'))
S = ('s', _('slide'))
I = ('i', _('首页')) # 首页显示
L = ('l', _('列表页')) # 列表页显示
P = ('p', _('文章页')) # 文章页显示
A = ('a', _('全部')) # 全部页面显示
S = ('s', _('幻灯片')) # 幻灯片显示
# 抽象基类所有模型的基础包含创建和修改时间以及自动设置slug
class BaseModel(models.Model):
id = models.AutoField(primary_key=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('modify time'), default=now)
id = models.AutoField(primary_key=True) # 主键ID
creation_time = models.DateTimeField(_('创建时间'), default=now) # 创建时间
last_modify_time = models.DateTimeField(_('修改时间'), default=now) # 修改时间
def save(self, *args, **kwargs):
is_update_views = isinstance(
self,
Article) and 'update_fields' in kwargs and kwargs['update_fields'] == ['views']
# 如果是更新文章浏览量,则直接更新而不走常规保存逻辑
is_update_views = isinstance(self, Article) and 'update_fields' in kwargs and kwargs['update_fields'] == ['views']
if is_update_views:
Article.objects.filter(pk=self.pk).update(views=self.views)
else:
# 如果有slug字段但未设置则根据title或name生成
if 'slug' in self.__dict__:
slug = getattr(
self, 'title') if 'title' in self.__dict__ else getattr(
self, 'name')
setattr(self, 'slug', slugify(slug))
super().save(*args, **kwargs)
slug_source = getattr(self, 'title', '') if 'title' in self.__dict__ else getattr(self, 'name', '')
setattr(self, 'slug', slugify(slug_source)) # 自动生成slug
super().save(*args, **kwargs) # 调用父类保存方法
def get_full_url(self):
# 获取当前站点域名并拼接完整URL
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
url = "https://{site}{path}".format(site=site, path=self.get_absolute_url())
return url
class Meta:
abstract = True
abstract = True # 抽象类,不生成数据库表
@abstractmethod
def get_absolute_url(self):
# 子类必须实现获取当前对象的详情页URL
pass
# 文章模型
class Article(BaseModel):
"""文章"""
STATUS_CHOICES = (
('d', _('Draft')),
('p', _('Published')),
('d', _('草稿')),
('p', _('发布')),
)
COMMENT_STATUS = (
('o', _('Open')),
('c', _('Close')),
('o', _('开放评论')),
('c', _('关闭评论')),
)
TYPE = (
('a', _('Article')),
('p', _('Page')),
('a', _('文章')),
('p', _('页面')),
)
title = models.CharField(_('title'), max_length=200, unique=True)
body = MDTextField(_('body'))
pub_time = models.DateTimeField(
_('publish time'), blank=False, null=False, default=now)
status = models.CharField(
_('status'),
max_length=1,
choices=STATUS_CHOICES,
default='p')
comment_status = models.CharField(
_('comment status'),
max_length=1,
choices=COMMENT_STATUS,
default='o')
type = models.CharField(_('type'), max_length=1, choices=TYPE, default='a')
views = models.PositiveIntegerField(_('views'), default=0)
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=False,
null=False,
on_delete=models.CASCADE)
article_order = models.IntegerField(
_('order'), blank=False, null=False, default=0)
show_toc = models.BooleanField(_('show toc'), blank=False, null=False, default=False)
category = models.ForeignKey(
'Category',
verbose_name=_('category'),
on_delete=models.CASCADE,
blank=False,
null=False)
tags = models.ManyToManyField('Tag', verbose_name=_('tag'), blank=True)
title = models.CharField(_('标题'), max_length=200, unique=True) # 文章标题,唯一
body = MDTextField(_('内容')) # 文章正文使用Markdown编辑器
pub_time = models.DateTimeField(_('发布时间'), blank=False, null=False, default=now) # 发布时间
status = models.CharField(_('状态'), max_length=1, choices=STATUS_CHOICES, default='p') # 状态,默认发布
comment_status = models.CharField(_('评论状态'), max_length=1, choices=COMMENT_STATUS, default='o') # 评论状态
type = models.CharField(_('类型'), max_length=1, choices=TYPE, default='a') # 类型,默认文章
views = models.PositiveIntegerField(_('浏览量'), default=0) # 浏览次数
author = models.ForeignKey(settings.AUTH_USER_MODEL, verbose_name=_('作者'), on_delete=models.CASCADE) # 作者外键
article_order = models.IntegerField(_('排序'), default=0) # 排序权重
show_toc = models.BooleanField(_('显示目录'), default=False) # 是否显示文章目录
category = models.ForeignKey('Category', verbose_name=_('分类'), on_delete=models.CASCADE) # 分类外键
tags = models.ManyToManyField('Tag', verbose_name=_('标签'), blank=True) # 标签多对多
def body_to_string(self):
return self.body
return self.body # 返回文章内容字符串
def __str__(self):
return self.title
return self.title # 对象字符串表示为标题
class Meta:
ordering = ['-article_order', '-pub_time']
verbose_name = _('article')
ordering = ['-article_order', '-pub_time'] # 排序:先按排序权重,再按发布时间倒序
verbose_name = _('文章')
verbose_name_plural = verbose_name
get_latest_by = 'id'
get_latest_by = 'id' # 最新对象依据ID
def get_absolute_url(self):
# 生成文章详情页URL包含年、月、日和文章ID
return reverse('blog:detailbyid', kwargs={
'article_id': self.id,
'year': self.creation_time.year,
@ -125,21 +108,23 @@ class Article(BaseModel):
'day': self.creation_time.day
})
@cache_decorator(60 * 60 * 10)
@cache_decorator(60 * 60 * 10) # 缓存10小时
def get_category_tree(self):
# 获取分类及其所有祖先分类的树状结构
tree = self.category.get_category_tree()
names = list(map(lambda c: (c.name, c.get_absolute_url()), tree))
return names
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
super().save(*args, **kwargs) # 调用父类保存
def viewed(self):
# 增加文章浏览量并保存
self.views += 1
self.save(update_fields=['views'])
def comment_list(self):
# 获取该文章的所有启用状态的评论并缓存10分钟
cache_key = 'article_comments_{id}'.format(id=self.id)
value = cache.get(cache_key)
if value:
@ -152,62 +137,50 @@ class Article(BaseModel):
return comments
def get_admin_url(self):
# 获取该文章在后台管理中的编辑URL
info = (self._meta.app_label, self._meta.model_name)
return reverse('admin:%s_%s_change' % info, args=(self.pk,))
@cache_decorator(expiration=60 * 100)
@cache_decorator(expiration=60 * 100) # 缓存100秒
def next_article(self):
# 下一篇
return Article.objects.filter(
id__gt=self.id, status='p').order_by('id').first()
# 获取当前文章的下一篇文章按ID排序状态为发布
return Article.objects.filter(id__gt=self.id, status='p').order_by('id').first()
@cache_decorator(expiration=60 * 100)
def prev_article(self):
# 前一篇
# 获取当文章的上一篇文章
return Article.objects.filter(id__lt=self.id, status='p').first()
def get_first_image_url(self):
"""
Get the first image url from article.body.
:return:
"""
# 从文章正文中提取第一张图片的URL
match = re.search(r'!\[.*?\]\((.+?)\)', self.body)
if match:
return match.group(1)
return ""
# 分类模型
class Category(BaseModel):
"""文章分类"""
name = models.CharField(_('category name'), max_length=30, unique=True)
parent_category = models.ForeignKey(
'self',
verbose_name=_('parent category'),
blank=True,
null=True,
on_delete=models.CASCADE)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
index = models.IntegerField(default=0, verbose_name=_('index'))
name = models.CharField(_('分类名称'), max_length=30, unique=True) # 分类名称,唯一
parent_category = models.ForeignKey('self', verbose_name=_('父级分类'), blank=True, null=True, on_delete=models.CASCADE) # 父分类
slug = models.SlugField(default='no-slug', max_length=60, blank=True) # Slug字段
index = models.IntegerField(default=0, verbose_name=_('排序')) # 排序索引
class Meta:
ordering = ['-index']
verbose_name = _('category')
ordering = ['-index'] # 按排序索引倒序
verbose_name = _('分类')
verbose_name_plural = verbose_name
def get_absolute_url(self):
return reverse(
'blog:category_detail', kwargs={
'category_name': self.slug})
# 获取分类详情页URL
return reverse('blog:category_detail', kwargs={'category_name': self.slug})
def __str__(self):
return self.name
return self.name # 对象字符串表示为分类名称
@cache_decorator(60 * 60 * 10)
@cache_decorator(60 * 60 * 10) # 缓存10小时
def get_category_tree(self):
"""
递归获得分类目录的父级
:return:
"""
# 递归获取当前分类及其所有祖先分类
categorys = []
def parse(category):
@ -220,10 +193,7 @@ class Category(BaseModel):
@cache_decorator(60 * 60 * 10)
def get_sub_categorys(self):
"""
获得当前分类目录所有子集
:return:
"""
# 获取当前分类的所有子分类
categorys = []
all_categorys = Category.objects.all()
@ -232,7 +202,7 @@ class Category(BaseModel):
categorys.append(category)
childs = all_categorys.filter(parent_category=category)
for child in childs:
if category not in categorys:
if child not in categorys:
categorys.append(child)
parse(child)
@ -240,137 +210,100 @@ class Category(BaseModel):
return categorys
# 标签模型
class Tag(BaseModel):
"""文章标签"""
name = models.CharField(_('tag name'), max_length=30, unique=True)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
name = models.CharField(_('标签名称'), max_length=30, unique=True) # 标签名称,唯一
slug = models.SlugField(default='no-slug', max_length=60, blank=True) # Slug字段
def __str__(self):
return self.name
return self.name # 对象字符串表示为标签名称
def get_absolute_url(self):
# 获取标签详情页URL
return reverse('blog:tag_detail', kwargs={'tag_name': self.slug})
@cache_decorator(60 * 60 * 10)
@cache_decorator(60 * 60 * 10) # 缓存10小时
def get_article_count(self):
# 获取关联该标签的文章数量
return Article.objects.filter(tags__name=self.name).distinct().count()
class Meta:
ordering = ['name']
verbose_name = _('tag')
ordering = ['name'] # 按名称排序
verbose_name = _('标签')
verbose_name_plural = verbose_name
# 友情链接模型
class Links(models.Model):
"""友情链接"""
name = models.CharField(_('link name'), max_length=30, unique=True)
link = models.URLField(_('link'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(
_('is show'), default=True, blank=False, null=False)
show_type = models.CharField(
_('show type'),
max_length=1,
choices=LinkShowType.choices,
default=LinkShowType.I)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
name = models.CharField(_('链接名称'), max_length=30, unique=True) # 链接名称,唯一
link = models.URLField(_('链接地址')) # 链接URL
sequence = models.IntegerField(_('排序'), unique=True) # 排序,唯一
is_enable = models.BooleanField(_('是否显示'), default=True) # 是否启用
show_type = models.CharField(_('显示位置'), max_length=1, choices=LinkShowType.choices, default=LinkShowType.I) # 显示位置类型
creation_time = models.DateTimeField(_('创建时间'), default=now) # 创建时间
last_mod_time = models.DateTimeField(_('修改时间'), default=now) # 修改时间
class Meta:
ordering = ['sequence']
verbose_name = _('link')
ordering = ['sequence'] # 按排序排序
verbose_name = _('友情链接')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
return self.name # 对象字符串表示为链接名称
# 侧边栏模型
class SideBar(models.Model):
"""侧边栏,可以展示一些html内容"""
name = models.CharField(_('title'), max_length=100)
content = models.TextField(_('content'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(_('is enable'), default=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
name = models.CharField(_('标题'), max_length=100) # 侧边栏标题
content = models.TextField(_('内容')) # 侧边栏内容可以是HTML
sequence = models.IntegerField(_('排序'), unique=True) # 排序,唯一
is_enable = models.BooleanField(_('是否启用'), default=True) # 是否启用
creation_time = models.DateTimeField(_('创建时间'), default=now) # 创建时间
last_mod_time = models.DateTimeField(_('修改时间'), default=now) # 修改时间
class Meta:
ordering = ['sequence']
verbose_name = _('sidebar')
ordering = ['sequence'] # 按排序排序
verbose_name = _('侧边栏')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
return self.name # 对象字符串表示为侧边栏标题
# 博客设置模型(单例)
class BlogSettings(models.Model):
"""blog的配置"""
site_name = models.CharField(
_('site name'),
max_length=200,
null=False,
blank=False,
default='')
site_description = models.TextField(
_('site description'),
max_length=1000,
null=False,
blank=False,
default='')
site_seo_description = models.TextField(
_('site seo description'), max_length=1000, null=False, blank=False, default='')
site_keywords = models.TextField(
_('site keywords'),
max_length=1000,
null=False,
blank=False,
default='')
article_sub_length = models.IntegerField(_('article sub length'), default=300)
sidebar_article_count = models.IntegerField(_('sidebar article count'), default=10)
sidebar_comment_count = models.IntegerField(_('sidebar comment count'), default=5)
article_comment_count = models.IntegerField(_('article comment count'), default=5)
show_google_adsense = models.BooleanField(_('show adsense'), default=False)
google_adsense_codes = models.TextField(
_('adsense code'), max_length=2000, null=True, blank=True, default='')
open_site_comment = models.BooleanField(_('open site comment'), default=True)
global_header = models.TextField("公共头部", null=True, blank=True, default='')
global_footer = models.TextField("公共尾部", null=True, blank=True, default='')
beian_code = models.CharField(
'备案号',
max_length=2000,
null=True,
blank=True,
default='')
analytics_code = models.TextField(
"网站统计代码",
max_length=1000,
null=False,
blank=False,
default='')
show_gongan_code = models.BooleanField(
'是否显示公安备案号', default=False, null=False)
gongan_beiancode = models.TextField(
'公安备案号',
max_length=2000,
null=True,
blank=True,
default='')
comment_need_review = models.BooleanField(
'评论是否需要审核', default=False, null=False)
site_name = models.CharField(_('站点名称'), max_length=200, null=False, blank=False, default='') # 站点名称
site_description = models.TextField(_('站点描述'), max_length=1000, null=False, blank=False, default='') # 站点描述
site_seo_description = models.TextField(_('SEO描述'), max_length=1000, null=False, blank=False, default='') # SEO描述
site_keywords = models.TextField(_('站点关键词'), max_length=1000, null=False, blank=False, default='') # 站点关键词
article_sub_length = models.IntegerField(_('文章摘要长度'), default=300) # 文章摘要显示长度
sidebar_article_count = models.IntegerField(_('侧边栏文章数量'), default=10) # 侧边栏显示的文章数量
sidebar_comment_count = models.IntegerField(_('侧边栏评论数量'), default=5) # 侧边栏显示的评论数量
article_comment_count = models.IntegerField(_('文章评论数量'), default=5) # 文章页显示的评论数量
show_google_adsense = models.BooleanField(_('显示Google广告'), default=False) # 是否显示Google广告
google_adsense_codes = models.TextField(_('Google广告代码'), max_length=2000, null=True, blank=True, default='') # Google广告代码
open_site_comment = models.BooleanField(_('开放站点评论'), default=True) # 是否开放站点评论
global_header = models.TextField(_("公共头部"), null=True, blank=True, default='') # 公共头部HTML
global_footer = models.TextField(_("公共尾部"), null=True, blank=True, default='') # 公共尾部HTML
beian_code = models.CharField(_('备案号'), max_length=2000, null=True, blank=True, default='') # 备案号
analytics_code = models.TextField(_("网站统计代码"), max_length=1000, null=False, blank=False, default='') # 统计代码,如百度统计
show_gongan_code = models.BooleanField(_('是否显示公安备案号'), default=False) # 是否显示公安备案号
gongan_beiancode = models.TextField(_('公安备案号'), max_length=2000, null=True, blank=True, default='') # 公安备案号
comment_need_review = models.BooleanField(_('评论是否需要审核'), default=False) # 评论是否需要审核
class Meta:
verbose_name = _('Website configuration')
verbose_name = _('网站配置')
verbose_name_plural = verbose_name
def __str__(self):
return self.site_name
return self.site_name # 对象字符串表示为站点名称
def clean(self):
# 确保只能存在一个站点配置实例
if BlogSettings.objects.exclude(id=self.id).count():
raise ValidationError(_('There can only be one configuration'))
raise ValidationError(_('只能存在一个配置'))
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
from djangoblog.utils import cache
cache.clear()
cache.clear() # 保存配置后清除缓存

@ -1,13 +1,7 @@
from haystack import indexes
from blog.models import Article
# 定义 Haystack 的搜索索引,用于普通搜索(非 Elasticsearch
class ArticleIndex(indexes.SearchIndex, indexes.Indexable):
text = indexes.CharField(document=True, use_template=True)
def get_model(self):
return Article
def index_queryset(self, using=None):
return self.get_model().objects.filter(status='p')
return self.get_model().objects.filter(status='p') # 只索引已发布文章

@ -2,30 +2,32 @@ import os
from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management import call_command
from django.core.paginator import Paginator
from django.templatetags.static import static
from django.test import Client, RequestFactory, TestCase
from django.urls import reverse
from django.utils import timezone
from accounts.models import BlogUser
from blog.forms import BlogSearchForm
from blog.models import Article, Category, Tag, SideBar, Links
from blog.templatetags.blog_tags import load_pagination_info, load_articletags
from djangoblog.utils import get_current_site, get_sha256
from oauth.models import OAuthUser, OAuthConfig
# Create your tests here.
from django.core.management import call_command # 执行Django管理命令
from django.core.paginator import Paginator # 分页器
from django.templatetags.static import static # 静态文件URL生成
from django.test import Client, RequestFactory, TestCase # Django测试客户端与测试用例
from django.urls import reverse # URL反向解析
from django.utils import timezone # 时间工具
from accounts.models import BlogUser # 用户模型
from blog.forms import BlogSearchForm # 搜索表单
from blog.models import Article, Category, Tag, SideBar, Links # 博客相关模型
from blog.templatetags.blog_tags import load_pagination_info, load_articletags # 模板标签
from djangoblog.utils import get_current_site, get_sha256 # 工具函数
from oauth.models import OAuthUser, OAuthConfig # OAuth相关模型
# 文章相关测试类
class ArticleTest(TestCase):
def setUp(self):
# 初始化测试客户端与请求工厂
self.client = Client()
self.factory = RequestFactory()
def test_validate_article(self):
# 获取当前站点域名
site = get_current_site().domain
# 创建一个超级用户
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
username="liangliangyy")[0]
@ -33,10 +35,13 @@ class ArticleTest(TestCase):
user.is_staff = True
user.is_superuser = True
user.save()
# 访问用户详情页
response = self.client.get(user.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 以下几行尝试访问不存在的admin页面可能用于测试404
response = self.client.get('/admin/servermanager/emailsendlog/')
response = self.client.get('admin/admin/logentry/')
# 创建一个侧边栏实例并保存
s = SideBar()
s.sequence = 1
s.name = 'test'
@ -44,30 +49,34 @@ class ArticleTest(TestCase):
s.is_enable = True
s.save()
# 创建一个分类并保存
category = Category()
category.name = "category"
category.creation_time = timezone.now()
category.last_mod_time = timezone.now()
category.save()
# 创建一个标签并保存
tag = Tag()
tag.name = "nicetag"
tag.save()
# 创建一篇文章并保存
article = Article()
article.title = "nicetitle"
article.body = "nicecontent"
article.author = user
article.category = category
article.type = 'a'
article.status = 'p'
article.status = 'p' # 发布状态
article.save()
self.assertEqual(0, article.tags.count())
article.tags.add(tag)
self.assertEqual(0, article.tags.count()) # 初始应无标签
article.tags.add(tag) # 添加标签
article.save()
self.assertEqual(1, article.tags.count())
self.assertEqual(1, article.tags.count()) # 应有1个标签
# 批量创建20篇文章均添加同一标签
for i in range(20):
article = Article()
article.title = "nicetitle" + str(i)
@ -79,76 +88,98 @@ class ArticleTest(TestCase):
article.save()
article.tags.add(tag)
article.save()
# 如果启用了Elasticsearch则构建索引并测试搜索
from blog.documents import ELASTICSEARCH_ENABLED
if ELASTICSEARCH_ENABLED:
call_command("build_index")
response = self.client.get('/search', {'q': 'nicetitle'})
self.assertEqual(response.status_code, 200)
# 访问某篇文章详情页
response = self.client.get(article.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 模拟通知爬虫(如搜索引擎)该文章已更新
from djangoblog.spider_notify import SpiderNotify
SpiderNotify.notify(article.get_absolute_url())
# 访问标签详情页
response = self.client.get(tag.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 访问分类详情页
response = self.client.get(category.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 搜索关键词'django'
response = self.client.get('/search', {'q': 'django'})
self.assertEqual(response.status_code, 200)
# 获取文章的标签模板标签结果
s = load_articletags(article)
self.assertIsNotNone(s)
# 登录用户后访问归档页
self.client.login(username='liangliangyy', password='liangliangyy')
response = self.client.get(reverse('blog:archives'))
self.assertEqual(response.status_code, 200)
# 测试分页功能
p = Paginator(Article.objects.all(), settings.PAGINATE_BY)
self.check_pagination(p, '', '')
# 按标签分页
p = Paginator(Article.objects.filter(tags=tag), settings.PAGINATE_BY)
self.check_pagination(p, '分类标签归档', tag.slug)
# 按作者分页
p = Paginator(
Article.objects.filter(
author__username='liangliangyy'), settings.PAGINATE_BY)
self.check_pagination(p, '作者文章归档', 'liangliangyy')
# 按分类分页
p = Paginator(Article.objects.filter(category=category), settings.PAGINATE_BY)
self.check_pagination(p, '分类目录归档', category.slug)
# 测试搜索表单
f = BlogSearchForm()
f.search()
# self.client.login(username='liangliangyy', password='liangliangyy')
# 模拟百度通知
from djangoblog.spider_notify import SpiderNotify
SpiderNotify.baidu_notify([article.get_full_url()])
# 测试模板标签gravatar头像URL
from blog.templatetags.blog_tags import gravatar_url, gravatar
u = gravatar_url('liangliangyy@gmail.com')
u = gravatar('liangliangyy@gmail.com')
# 创建并保存一个友情链接
link = Links(
sequence=1,
name="lylinux",
link='https://wwww.lylinux.net')
link.save()
# 访问友情链接页面
response = self.client.get('/links.html')
self.assertEqual(response.status_code, 200)
# 访问RSS Feed页面
response = self.client.get('/feed/')
self.assertEqual(response.status_code, 200)
# 访问Sitemap页面
response = self.client.get('/sitemap.xml')
self.assertEqual(response.status_code, 200)
# 尝试删除一篇文章
self.client.get("/admin/blog/article/1/delete/")
# 访问一些不存在的admin页面
self.client.get('/admin/servermanager/emailsendlog/')
self.client.get('/admin/admin/logentry/')
self.client.get('/admin/admin/logentry/1/change/')
def check_pagination(self, p, type, value):
# 遍历所有分页,检查前后页链接是否有效
for page in range(1, p.num_pages + 1):
s = load_pagination_info(p.page(page), type, value)
self.assertIsNotNone(s)
@ -160,33 +191,41 @@ class ArticleTest(TestCase):
self.assertEqual(response.status_code, 200)
def test_image(self):
# 测试图片上传功能
import requests
# 下载Python官方Logo
rsp = requests.get(
'https://www.python.org/static/img/python-logo.png')
imagepath = os.path.join(settings.BASE_DIR, 'python.png')
with open(imagepath, 'wb') as file:
file.write(rsp.content)
# 尝试未授权上传
rsp = self.client.post('/upload')
self.assertEqual(rsp.status_code, 403)
# 生成签名
sign = get_sha256(get_sha256(settings.SECRET_KEY))
with open(imagepath, 'rb') as file:
imgfile = SimpleUploadedFile(
'python.png', file.read(), content_type='image/jpg')
form_data = {'python.png': imgfile}
# 带签名上传
rsp = self.client.post(
'/upload?sign=' + sign, form_data, follow=True)
self.assertEqual(rsp.status_code, 200)
os.remove(imagepath)
# 测试发送邮件与保存头像功能
from djangoblog.utils import save_user_avatar, send_email
send_email(['qq@qq.com'], 'testTitle', 'testContent')
save_user_avatar(
'https://www.python.org/static/img/python-logo.png')
def test_errorpage(self):
# 测试访问不存在的页面应返回404
rsp = self.client.get('/eee')
self.assertEqual(rsp.status_code, 404)
def test_commands(self):
# 创建超级用户
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
username="liangliangyy")[0]
@ -195,6 +234,7 @@ class ArticleTest(TestCase):
user.is_superuser = True
user.save()
# 创建OAuth配置与用户
c = OAuthConfig()
c.type = 'qq'
c.appkey = 'appkey'
@ -222,11 +262,13 @@ class ArticleTest(TestCase):
}'''
u.save()
# 如果启用了Elasticsearch构建索引
from blog.documents import ELASTICSEARCH_ENABLED
if ELASTICSEARCH_ENABLED:
call_command("build_index")
# 执行一系列管理命令如Ping百度、创建测试数据、清理缓存等
call_command("ping_baidu", "all")
call_command("create_testdata")
call_command("clear_cache")
call_command("sync_user_avatar")
call_command("build_search_words")
call_command("build_search_words")

@ -1,62 +1,75 @@
from django.urls import path
from django.views.decorators.cache import cache_page
from django.views.decorators.cache import cache_page # Django缓存视图装饰器
from . import views
from . import views # 导入当前应用的视图
app_name = "blog" # 应用命名空间
app_name = "blog"
urlpatterns = [
# 首页
path(
r'',
views.IndexView.as_view(),
name='index'),
# 首页分页
path(
r'page/<int:page>/',
views.IndexView.as_view(),
name='index_page'),
# 文章详情页通过年、月、日、文章ID定位
# 文章详情页通过年、月、日、文章ID定位
path(
r'article/<int:year>/<int:month>/<int:day>/<int:article_id>.html',
views.ArticleDetailView.as_view(),
name='detailbyid'),
# 分类目录详情页,通过分类别名
path(
r'category/<slug:category_name>.html',
views.CategoryDetailView.as_view(),
name='category_detail'),
# 分类目录详情页(带分页)
path(
r'category/<slug:category_name>/<int:page>.html',
views.CategoryDetailView.as_view(),
name='category_detail_page'),
# 作者文章详情页,通过作者名称
path(
r'author/<author_name>.html',
views.AuthorDetailView.as_view(),
name='author_detail'),
# 作者文章详情页(带分页)
path(
r'author/<author_name>/<int:page>.html',
views.AuthorDetailView.as_view(),
name='author_detail_page'),
# 标签详情页,通过标签别名
path(
r'tag/<slug:tag_name>.html',
views.TagDetailView.as_view(),
name='tag_detail'),
# 标签详情页(带分页)
path(
r'tag/<slug:tag_name>/<int:page>.html',
views.TagDetailView.as_view(),
name='tag_detail_page'),
# 文章归档页使用缓存60分钟
path(
'archives.html',
cache_page(
60 * 60)(
views.ArchivesView.as_view()),
cache_page(60 * 60)(views.ArchivesView.as_view()),
name='archives'),
# 友情链接页
path(
'links.html',
r'links.html',
views.LinkListView.as_view(),
name='links'),
# 文件上传接口(用于图床等功能)
path(
r'upload',
views.fileupload,
name='upload'),
# 清理缓存接口
path(
r'clean',
views.clean_cache_view,
name='clean'),
]
]

@ -2,69 +2,69 @@ import logging
import os
import uuid
from django.conf import settings
from django.core.paginator import Paginator
from django.http import HttpResponse, HttpResponseForbidden
from django.shortcuts import get_object_or_404
from django.shortcuts import render
from django.templatetags.static import static
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.detail import DetailView
from django.views.generic.list import ListView
from haystack.views import SearchView
from blog.models import Article, Category, LinkShowType, Links, Tag
from comments.forms import CommentForm
from djangoblog.plugin_manage import hooks
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
from djangoblog.utils import cache, get_blog_setting, get_sha256
logger = logging.getLogger(__name__)
from django.conf import settings # Django项目设置
from django.core.paginator import Paginator # 分页工具
from django.http import HttpResponse, HttpResponseForbidden # HTTP响应类
from django.shortcuts import get_object_or_404, render # 快捷函数获取对象或404渲染模板
from django.templatetags.static import static # 静态文件URL生成
from django.utils import timezone # 时间工具
from django.utils.translation import gettext_lazy as _ # 国际化翻译
from django.views.decorators.csrf import csrf_exempt # CSRF豁免装饰器
from django.views.generic.detail import DetailView # 详情页通用视图
from django.views.generic.list import ListView # 列表页通用视图
from haystack.views import SearchView # Haystack搜索视图
from blog.models import Article, Category, LinkShowType, Links, Tag # 博客相关模型
from comments.forms import CommentForm # 评论表单
from djangoblog.plugin_manage import hooks # 插件管理钩子
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME # 插件常量
from djangoblog.utils import cache, get_blog_setting, get_sha256 # 工具函数缓存、获取博客设置、生成SHA256
logger = logging.getLogger(__name__) # 日志记录器
# 文章列表视图(通用列表视图,用于首页、分类、作者、标签等)
class ArticleListView(ListView):
# template_name属性用于指定使用哪个模板进行渲染
# 指定使用的模板
template_name = 'blog/article_index.html'
# context_object_name属性用于给上下文变量取名在模板中使用该名字
# 上下文中使用的变量名
context_object_name = 'article_list'
# 页面类型,分类目录或标签列表等
# 页面类型,用于区分不同列表页
page_type = ''
# 每页显示条数,从项目设置中获取
paginate_by = settings.PAGINATE_BY
# 分页参数名
page_kwarg = 'page'
# 链接显示类型默认为LinkShowType.L列表页
link_type = LinkShowType.L
def get_view_cache_key(self):
return self.request.get['pages']
# 获取视图缓存键目前未使用request GET参数可根据需求调整
return self.request.GET.get('pages', '')
@property
def page_number(self):
# 获取当前页码优先级kwargs中的page > GET参数中的page > 默认1
page_kwarg = self.page_kwarg
page = self.kwargs.get(
page_kwarg) or self.request.GET.get(page_kwarg) or 1
page = self.kwargs.get(page_kwarg) or self.request.GET.get(page_kwarg) or 1
return page
def get_queryset_cache_key(self):
"""
子类重写.获得queryset的缓存key
子类必须重写此方法用于生成查询集的缓存键
"""
raise NotImplementedError()
def get_queryset_data(self):
"""
子类重写.获取queryset的数据
子类必须重写此方法用于获取查询集的数据
"""
raise NotImplementedError()
def get_queryset_from_cache(self, cache_key):
'''
缓存页面数据
:param cache_key: 缓存key
:return:
'''
"""
从缓存中获取查询集数据若缓存存在则返回缓存数据否则获取数据并设置缓存
"""
value = cache.get(cache_key)
if value:
logger.info('get view cache.key:{key}'.format(key=cache_key))
@ -76,51 +76,61 @@ class ArticleListView(ListView):
return article_list
def get_queryset(self):
'''
重写默认从缓存获取数据
:return:
'''
"""
重写默认的get_queryset方法优先从缓存中获取查询集
"""
key = self.get_queryset_cache_key()
value = self.get_queryset_from_cache(key)
return value
def get_context_data(self, **kwargs):
# 向上下文中添加链接类型
kwargs['linktype'] = self.link_type
return super(ArticleListView, self).get_context_data(**kwargs)
# 首页视图继承自ArticleListView
class IndexView(ArticleListView):
'''
首页
首页视图展示最新发布的文章
'''
# 友情链接类型
# 链接类型设置为首页显示
link_type = LinkShowType.I
def get_queryset_data(self):
# 获取所有状态为发布、类型为文章的文章
article_list = Article.objects.filter(type='a', status='p')
return article_list
def get_queryset_cache_key(self):
# 缓存键根据页码生成,如 index_1, index_2, ...
cache_key = 'index_{page}'.format(page=self.page_number)
return cache_key
# 文章详情页视图继承自DetailView
class ArticleDetailView(DetailView):
'''
文章详情页
文章详情页视图展示单篇文章的详细内容
'''
template_name = 'blog/article_detail.html'
model = Article
pk_url_kwarg = 'article_id'
context_object_name = "article"
template_name = 'blog/article_detail.html' # 使用的模板
model = Article # 关联的模型
pk_url_kwarg = 'article_id' # URL中用于识别文章的主键参数名
context_object_name = "article" # 上下文中使用的变量名
def get_context_data(self, **kwargs):
# 创建评论表单实例
comment_form = CommentForm()
# 获取当前文章的所有启用状态的评论
article_comments = self.object.comment_list()
# 过滤出顶级评论(没有父评论的评论)
parent_comments = article_comments.filter(parent_comment=None)
# 获取博客设置
blog_setting = get_blog_setting()
# 创建评论分页器,每页显示指定数量的评论
paginator = Paginator(parent_comments, blog_setting.article_comment_count)
# 获取请求中的评论页码参数默认为1
page = self.request.GET.get('comment_page', '1')
if not page.isnumeric():
page = 1
@ -131,51 +141,67 @@ class ArticleDetailView(DetailView):
if page > paginator.num_pages:
page = paginator.num_pages
# 获取当前页的评论
p_comments = paginator.page(page)
# 获取下一页页码如果没有则None
next_page = p_comments.next_page_number() if p_comments.has_next() else None
# 获取上一页页码如果没有则None
prev_page = p_comments.previous_page_number() if p_comments.has_previous() else None
# 如果有下一页生成下一页评论的URL
if next_page:
kwargs[
'comment_next_page_url'] = self.object.get_absolute_url() + f'?comment_page={next_page}#commentlist-container'
# 如果有上一页生成上一页评论的URL
if prev_page:
kwargs[
'comment_prev_page_url'] = self.object.get_absolute_url() + f'?comment_page={prev_page}#commentlist-container'
# 将评论表单添加到上下文
kwargs['form'] = comment_form
# 将所有评论添加到上下文
kwargs['article_comments'] = article_comments
# 将当前页的评论添加到上下文
kwargs['p_comments'] = p_comments
kwargs['comment_count'] = len(
article_comments) if article_comments else 0
# 将评论总数添加到上下文
kwargs['comment_count'] = len(article_comments) if article_comments else 0
# 添加下一篇和上一篇的文章链接
kwargs['next_article'] = self.object.next_article
kwargs['prev_article'] = self.object.prev_article
# 调用父类的get_context_data方法获取基础上下文
context = super(ArticleDetailView, self).get_context_data(**kwargs)
article = self.object
# Action Hook, 通知插件"文章详情已获取"
# 执行插件钩子,通知有插件“文章详情已获取”
hooks.run_action('after_article_body_get', article=article, request=self.request)
return context
# 分类目录视图继承自ArticleListView
class CategoryDetailView(ArticleListView):
'''
分类目录列表
分类目录视图展示某个分类下的所有文章
'''
page_type = "分类目录归档"
def get_queryset_data(self):
# 从URL参数中获取分类别名
slug = self.kwargs['category_name']
# 获取对应的分类对象如果不存在则返回404
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
self.categoryname = categoryname
# 获取该分类的所有子分类名称
categorynames = list(
map(lambda c: c.name, category.get_sub_categorys()))
# 获取该分类及其子分类下的所有状态为发布、类型为文章的文章
article_list = Article.objects.filter(
category__name__in=categorynames, status='p')
return article_list
def get_queryset_cache_key(self):
# 缓存键根据分类名称和页码生成,如 category_list_分类名_页码
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
@ -185,24 +211,27 @@ class CategoryDetailView(ArticleListView):
return cache_key
def get_context_data(self, **kwargs):
# 处理分类名称,尝试去除可能的路径分隔符
categoryname = self.categoryname
try:
categoryname = categoryname.split('/')[-1]
except BaseException:
pass
# 向上下文中添加页面类型和标签名称(此处为分类名称)
kwargs['page_type'] = CategoryDetailView.page_type
kwargs['tag_name'] = categoryname
return super(CategoryDetailView, self).get_context_data(**kwargs)
# 作者文章视图继承自ArticleListView
class AuthorDetailView(ArticleListView):
'''
作者详情页
作者文章视图展示某个作者的所有文章
'''
page_type = '作者文章归档'
def get_queryset_cache_key(self):
# 使用uuslug将作者名称转换为Slug生成缓存键如 author_作者Slug_页码
from uuslug import slugify
author_name = slugify(self.kwargs['author_name'])
cache_key = 'author_{author_name}_{page}'.format(
@ -210,34 +239,40 @@ class AuthorDetailView(ArticleListView):
return cache_key
def get_queryset_data(self):
# 从URL参数中获取作者名称获取该作者的所有状态为发布、类型为文章的文章
author_name = self.kwargs['author_name']
article_list = Article.objects.filter(
author__username=author_name, type='a', status='p')
return article_list
def get_context_data(self, **kwargs):
# 向上下文中添加页面类型和标签名称(此处为作者名称)
author_name = self.kwargs['author_name']
kwargs['page_type'] = AuthorDetailView.page_type
kwargs['tag_name'] = author_name
return super(AuthorDetailView, self).get_context_data(**kwargs)
# 标签详情视图继承自ArticleListView
class TagDetailView(ArticleListView):
'''
标签列表页面
标签详情视图展示某个标签下的所有文章
'''
page_type = '分类标签归档'
def get_queryset_data(self):
# 从URL参数中获取标签别名获取对应的标签对象
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
self.name = tag_name
# 获取所有关联该标签且状态为发布、类型为文章的文章
article_list = Article.objects.filter(
tags__name=tag_name, type='a', status='p')
return article_list
def get_queryset_cache_key(self):
# 缓存键根据标签名称和页码生成,如 tag_标签名_页码
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
@ -247,129 +282,148 @@ class TagDetailView(ArticleListView):
return cache_key
def get_context_data(self, **kwargs):
# tag_name = self.kwargs['tag_name']
# 向上下文中添加页面类型和标签名称
tag_name = self.name
kwargs['page_type'] = TagDetailView.page_type
kwargs['tag_name'] = tag_name
return super(TagDetailView, self).get_context_data(**kwargs)
# 文章归档视图继承自ArticleListView
class ArchivesView(ArticleListView):
'''
文章归档页面
文章归档视图展示所有状态为发布的文章通常按时间归档
'''
page_type = '文章归档'
paginate_by = None
page_kwarg = None
template_name = 'blog/article_archives.html'
paginate_by = None # 不进行分页
page_kwarg = None # 不使用页码参数
template_name = 'blog/article_archives.html' # 使用不同的模板
def get_queryset_data(self):
# 获取所有状态为发布的文章
return Article.objects.filter(status='p').all()
def get_queryset_cache_key(self):
# 缓存键为 archives
cache_key = 'archives'
return cache_key
# 友情链接视图,展示所有启用的友情链接
class LinkListView(ListView):
model = Links
template_name = 'blog/links_list.html'
model = Links # 关联的模型
template_name = 'blog/links_list.html' # 使用的模板
def get_queryset(self):
# 获取所有启用的友情链接
return Links.objects.filter(is_enable=True)
# Haystack搜索视图用于全文搜索
class EsSearchView(SearchView):
def get_context(self):
# 构建搜索结果的上下文,包括查询词、表单、分页器、建议等
paginator, page = self.build_page()
context = {
"query": self.query,
"form": self.form,
"page": page,
"paginator": paginator,
"suggestion": None,
"query": self.query, # 搜索查询词
"form": self.form, # 搜索表单
"page": page, # 当前页的分页对象
"paginator": paginator, # 分页器
"suggestion": None, # 搜索建议初始为None
}
# 如果搜索后端支持拼写建议,并且有拼写建议,则添加到上下文中
if hasattr(self.results, "query") and self.results.query.backend.include_spelling:
context["suggestion"] = self.results.query.get_spelling_suggestion()
# 添加额外的上下文信息
context.update(self.extra_context())
return context
@csrf_exempt
# 文件上传视图(图床功能),需要自行实现调用端
@csrf_exempt # 豁免CSRF保护生产环境应谨慎使用
def fileupload(request):
"""
该方法需自己写调用端来上传图片该方法仅提供图床功能
:param request:
:return:
该方法用于上传图片需自行实现调用端仅提供图床功能
:param request: HTTP请求对象
:return: HTTP响应包含上传后的图片URL列表或错误信息
"""
if request.method == 'POST':
# 获取签名参数,用于验证请求合法性
sign = request.GET.get('sign', None)
if not sign:
return HttpResponseForbidden()
return HttpResponseForbidden() # 未提供签名,禁止访问
# 验证签名是否正确签名应为SECRET_KEY的双重SHA256
if not sign == get_sha256(get_sha256(settings.SECRET_KEY)):
return HttpResponseForbidden()
response = []
return HttpResponseForbidden() # 签名不正确,禁止访问
response = [] # 响应数据存储上传后的图片URL
for filename in request.FILES:
timestr = timezone.now().strftime('%Y/%m/%d')
imgextensions = ['jpg', 'png', 'jpeg', 'bmp']
timestr = timezone.now().strftime('%Y/%m/%d') # 当前日期,用于文件目录
imgextensions = ['jpg', 'png', 'jpeg', 'bmp'] # 支持的图片扩展名
fname = u''.join(str(filename))
isimage = len([i for i in imgextensions if fname.find(i) >= 0]) > 0
isimage = len([i for i in imgextensions if fname.find(i) >= 0]) > 0 # 判断是否为图片
# 构建存储目录,按日期分类
base_dir = os.path.join(settings.STATICFILES, "files" if not isimage else "image", timestr)
if not os.path.exists(base_dir):
os.makedirs(base_dir)
os.makedirs(base_dir) # 如果目录不存在,则创建
# 构建保存路径文件名为UUID + 原扩展名
savepath = os.path.normpath(os.path.join(base_dir, f"{uuid.uuid4().hex}{os.path.splitext(filename)[-1]}"))
if not savepath.startswith(base_dir):
return HttpResponse("only for post")
return HttpResponse("only for post") # 安全校验,防止路径遍历
# 保存上传的文件内容
with open(savepath, 'wb+') as wfile:
for chunk in request.FILES[filename].chunks():
wfile.write(chunk)
# 如果是图片使用PIL优化并压缩图片
if isimage:
from PIL import Image
image = Image.open(savepath)
image.save(savepath, quality=20, optimize=True)
# 生成静态文件URL
url = static(savepath)
response.append(url)
return HttpResponse(response)
return HttpResponse(response) # 返回上传后的图片URL列表
else:
return HttpResponse("only for post")
return HttpResponse("only for post") # 仅接受POST请求
# 自定义404错误页面视图
def page_not_found_view(
request,
exception,
template_name='blog/error_page.html'):
if exception:
logger.error(exception)
url = request.get_full_path()
logger.error(exception) # 记录错误日志
url = request.get_full_path() # 获取请求的完整路径
return render(request,
template_name,
{'message': _('Sorry, the page you requested is not found, please click the home page to see other?'),
'statuscode': '404'},
status=404)
status=404) # 渲染404错误页面
# 自定义500错误页面视图
def server_error_view(request, template_name='blog/error_page.html'):
return render(request,
template_name,
{'message': _('Sorry, the server is busy, please click the home page to see other?'),
'statuscode': '500'},
status=500)
status=500) # 渲染500错误页面
# 自定义403权限拒绝页面视图
def permission_denied_view(
request,
exception,
template_name='blog/error_page.html'):
if exception:
logger.error(exception)
logger.error(exception) # 记录错误日志
return render(
request, template_name, {
'message': _('Sorry, you do not have permission to access this page?'),
'statuscode': '403'}, status=403)
'statuscode': '403'}, status=403) # 渲染403错误页面
# 清理缓存视图,调用缓存清理函数并返回成功响应
def clean_cache_view(request):
cache.clear()
return HttpResponse('ok')
cache.clear() # 清除所有缓存
return HttpResponse('ok') # 返回简单的'ok'响应
Loading…
Cancel
Save