hyt_第五周注释

pull/10/head
hyt 4 months ago
parent 290d1fad9a
commit deb2fa6a9b

@ -0,0 +1,131 @@
from django import forms
from django.contrib import admin
from django.contrib.auth import get_user_model
from django.urls import reverse
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
# Register your models here.
from .models import Article
class ArticleForm(forms.ModelForm):
# body = forms.CharField(widget=AdminPagedownWidget())
class Meta:
model = Article
fields = '__all__'
# 管理员动作函数 - 发布选中文章
def makr_article_publish(modeladmin, request, queryset):
"""将选中的文章状态设置为已发布"""
queryset.update(status='p')
# 管理员动作函数 - 将选中文章设为草稿
def draft_article(modeladmin, request, queryset):
"""将选中的文章状态设置为草稿"""
queryset.update(status='d')
# 管理员动作函数 - 关闭文章评论
def close_article_commentstatus(modeladmin, request, queryset):
"""关闭选中文章的评论功能"""
queryset.update(comment_status='c')
# 管理员动作函数 - 开启文章评论
def open_article_commentstatus(modeladmin, request, queryset):
"""开启选中文章的评论功能"""
queryset.update(comment_status='o')
# 设置管理员动作的显示名称
makr_article_publish.short_description = _('Publish selected articles')
draft_article.short_description = _('Draft selected articles')
close_article_commentstatus.short_description = _('Close article comments')
open_article_commentstatus.short_description = _('Open article comments')
class ArticlelAdmin(admin.ModelAdmin):
"""文章模型的后台管理配置"""
list_per_page = 20 # 每页显示20条记录
search_fields = ('body', 'title') # 搜索字段
form = ArticleForm # 使用自定义表单
list_display = (
'id',
'title',
'author',
'link_to_category',
'creation_time',
'views',
'status',
'type',
'article_order') # 列表页显示的字段
list_display_links = ('id', 'title') # 可点击链接的字段
list_filter = ('status', 'type', 'category') # 右侧过滤器
filter_horizontal = ('tags',) # 水平多选控件用于标签
exclude = ('creation_time', 'last_modify_time') # 排除的字段
view_on_site = True # 启用"在站点查看"功能
actions = [ # 管理员动作列表
makr_article_publish,
draft_article,
close_article_commentstatus,
open_article_commentstatus]
def link_to_category(self, obj):
"""生成分类的管理后台链接"""
info = (obj.category._meta.app_label, obj.category._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.category.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.category.name))
link_to_category.short_description = _('category') # 设置列显示名称
def get_form(self, request, obj=None, **kwargs):
"""自定义表单,限制作者只能选择超级用户"""
form = super(ArticlelAdmin, self).get_form(request, obj, **kwargs)
form.base_fields['author'].queryset = get_user_model(
).objects.filter(is_superuser=True)
return form
def save_model(self, request, obj, form, change):
"""保存模型时的自定义逻辑"""
super(ArticlelAdmin, self).save_model(request, obj, form, change)
def get_view_on_site_url(self, obj=None):
"""获取"在站点查看"的URL"""
if obj:
url = obj.get_full_url() # 文章的完整URL
return url
else:
from djangoblog.utils import get_current_site
site = get_current_site().domain # 站点域名
return site
class TagAdmin(admin.ModelAdmin):
"""标签模型的后台管理配置"""
exclude = ('slug', 'last_mod_time', 'creation_time') # 排除自动生成的字段
class CategoryAdmin(admin.ModelAdmin):
"""分类模型的后台管理配置"""
list_display = ('name', 'parent_category', 'index') # 列表显示字段
exclude = ('slug', 'last_mod_time', 'creation_time') # 排除自动生成的字段
class LinksAdmin(admin.ModelAdmin):
"""友情链接模型的后台管理配置"""
exclude = ('last_mod_time', 'creation_time') # 排除时间字段
class SideBarAdmin(admin.ModelAdmin):
"""侧边栏模型的后台管理配置"""
list_display = ('name', 'content', 'is_enable', 'sequence') # 列表显示字段
exclude = ('last_mod_time', 'creation_time') # 排除时间字段
class BlogSettingsAdmin(admin.ModelAdmin):
"""博客设置模型的后台管理配置"""
pass # 使用默认管理配置

@ -0,0 +1,12 @@
from django.apps import AppConfig
class BlogConfig(AppConfig):
"""博客应用配置类
这个类用于配置Django博客应用的基本信息
它继承自Django的AppConfig基类用于定义应用的元数据和行为
"""
# 应用的完整Python路径Django使用这个名称来识别应用
name = 'blog'

@ -0,0 +1,86 @@
import logging
from django.utils import timezone
from djangoblog.utils import cache, get_blog_setting
from .models import Category, Article
logger = logging.getLogger(__name__)
def seo_processor(requests):
"""
SEO上下文处理器
这个函数是一个Django上下文处理器用于向所有模板传递SEO相关的变量
它使用缓存来提高性能避免每次请求都查询数据库
Args:
requests: Django请求对象包含当前请求的信息
Returns:
dict: 包含SEO和网站设置信息的字典这些变量将在所有模板中可用
"""
# 缓存键名
key = 'seo_processor'
# 尝试从缓存中获取数据
value = cache.get(key)
if value:
# 如果缓存存在,直接返回缓存数据
return value
else:
# 缓存不存在,重新生成数据
logger.info('set processor cache.')
# 获取博客全局设置
setting = get_blog_setting()
# 构建包含所有SEO和网站设置信息的字典
value = {
# 网站基本信息
'SITE_NAME': setting.site_name, # 网站名称
'SITE_SEO_DESCRIPTION': setting.site_seo_description, # SEO描述
'SITE_DESCRIPTION': setting.site_description, # 网站描述
'SITE_KEYWORDS': setting.site_keywords, # 网站关键词
# 网站URL相关
'SITE_BASE_URL': requests.scheme + '://' + requests.get_host() + '/', # 网站基础URL
# 文章相关设置
'ARTICLE_SUB_LENGTH': setting.article_sub_length, # 文章摘要长度
# 导航数据
'nav_category_list': Category.objects.all(), # 所有分类(用于导航菜单)
'nav_pages': Article.objects.filter(
type='p', # 页面类型
status='p'), # 已发布状态
# 评论系统设置
'OPEN_SITE_COMMENT': setting.open_site_comment, # 是否开启评论
'COMMENT_NEED_REVIEW': setting.comment_need_review, # 评论是否需要审核
# 备案信息
'BEIAN_CODE': setting.beian_code, # ICP备案号
"BEIAN_CODE_GONGAN": setting.gongan_beiancode, # 公安备案号
"SHOW_GONGAN_CODE": setting.show_gongan_code, # 是否显示公安备案
# 广告相关
'SHOW_GOOGLE_ADSENSE': setting.show_google_adsense, # 是否显示Google广告
'GOOGLE_ADSENSE_CODES': setting.google_adsense_codes, # Google广告代码
# 统计代码
'ANALYTICS_CODE': setting.analytics_code, # 网站统计代码(如百度统计)
# 时间信息
"CURRENT_YEAR": timezone.now().year, # 当前年份
# 全局页头页脚
"GLOBAL_HEADER": setting.global_header, # 全局头部HTML
"GLOBAL_FOOTER": setting.global_footer, # 全局尾部HTML
}
# 将数据存入缓存有效期10小时60 * 60 * 10秒
cache.set(key, value, 60 * 60 * 10)
return value

@ -0,0 +1,283 @@
import time
import elasticsearch.client
from django.conf import settings
from elasticsearch_dsl import Document, InnerDoc, Date, Integer, Long, Text, Object, GeoPoint, Keyword, Boolean
from elasticsearch_dsl.connections import connections
from blog.models import Article
# 检查是否启用了Elasticsearch配置
ELASTICSEARCH_ENABLED = hasattr(settings, 'ELASTICSEARCH_DSL')
if ELASTICSEARCH_ENABLED:
# 创建Elasticsearch连接
connections.create_connection(
hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']])
from elasticsearch import Elasticsearch
# 初始化Elasticsearch客户端
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
from elasticsearch.client import IngestClient
# 创建Ingest管道客户端用于数据处理管道
c = IngestClient(es)
try:
# 检查是否已存在geoip管道
c.get_pipeline('geoip')
except elasticsearch.exceptions.NotFoundError:
# 如果不存在创建geoip管道用于IP地理位置解析
c.put_pipeline('geoip', body='''{
"description" : "Add geoip info",
"processors" : [
{
"geoip" : {
"field" : "ip"
}
}
]
}''')
class GeoIp(InnerDoc):
"""IP地理位置信息内嵌文档"""
continent_name = Keyword() # 大洲名称
country_iso_code = Keyword() # 国家ISO代码
country_name = Keyword() # 国家名称
location = GeoPoint() # 地理位置坐标
class UserAgentBrowser(InnerDoc):
"""用户代理浏览器信息"""
Family = Keyword() # 浏览器家族
Version = Keyword() # 浏览器版本
class UserAgentOS(UserAgentBrowser):
"""用户代理操作系统信息"""
pass # 继承自UserAgentBrowser具有相同的字段结构
class UserAgentDevice(InnerDoc):
"""用户代理设备信息"""
Family = Keyword() # 设备家族
Brand = Keyword() # 设备品牌
Model = Keyword() # 设备型号
class UserAgent(InnerDoc):
"""完整的用户代理信息"""
browser = Object(UserAgentBrowser, required=False) # 浏览器信息对象
os = Object(UserAgentOS, required=False) # 操作系统信息对象
device = Object(UserAgentDevice, required=False) # 设备信息对象
string = Text() # 原始用户代理字符串
is_bot = Boolean() # 是否为爬虫/机器人
class ElapsedTimeDocument(Document):
"""
性能监控文档 - 用于记录请求响应时间等性能数据
这个文档类型用于存储网站性能监控数据包括
- 请求URL和响应时间
- 用户IP和地理位置
- 用户代理信息
"""
url = Keyword() # 请求的URL
time_taken = Long() # 请求耗时(毫秒)
log_datetime = Date() # 日志时间
ip = Keyword() # 用户IP地址
geoip = Object(GeoIp, required=False) # IP地理位置信息
useragent = Object(UserAgent, required=False) # 用户代理信息
class Index:
"""索引配置"""
name = 'performance' # 索引名称
settings = {
"number_of_shards": 1, # 分片数量
"number_of_replicas": 0 # 副本数量
}
class Meta:
doc_type = 'ElapsedTime' # 文档类型
class ElaspedTimeDocumentManager:
"""性能监控文档管理器"""
@staticmethod
def build_index():
"""创建性能监控索引"""
from elasticsearch import Elasticsearch
client = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
res = client.indices.exists(index="performance")
if not res:
ElapsedTimeDocument.init() # 初始化索引映射
@staticmethod
def delete_index():
"""删除性能监控索引"""
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='performance', ignore=[400, 404]) # 忽略404错误
@staticmethod
def create(url, time_taken, log_datetime, useragent, ip):
"""
创建性能监控记录
Args:
url: 请求URL
time_taken: 请求耗时
log_datetime: 日志时间
useragent: 用户代理对象
ip: 用户IP地址
"""
ElaspedTimeDocumentManager.build_index()
# 构建用户代理信息
ua = UserAgent()
ua.browser = UserAgentBrowser()
ua.browser.Family = useragent.browser.family # 浏览器家族
ua.browser.Version = useragent.browser.version_string # 浏览器版本
ua.os = UserAgentOS()
ua.os.Family = useragent.os.family # 操作系统家族
ua.os.Version = useragent.os.version_string # 操作系统版本
ua.device = UserAgentDevice()
ua.device.Family = useragent.device.family # 设备家族
ua.device.Brand = useragent.device.brand # 设备品牌
ua.device.Model = useragent.device.model # 设备型号
ua.string = useragent.ua_string # 原始UA字符串
ua.is_bot = useragent.is_bot # 是否为机器人
# 创建文档并使用geoip管道处理IP地理位置
doc = ElapsedTimeDocument(
meta={
'id': int(round(time.time() * 1000)) # 使用时间戳作为文档ID
},
url=url,
time_taken=time_taken,
log_datetime=log_datetime,
useragent=ua,
ip=ip
)
doc.save(pipeline="geoip") # 使用geoip管道自动添加地理位置信息
class ArticleDocument(Document):
"""
文章搜索文档 - 用于Elasticsearch全文搜索
这个文档类型定义了文章在Elasticsearch中的索引结构
支持对文章标题内容作者分类标签等进行全文搜索
"""
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart') # 文章内容使用IK中文分词器
title = Text(analyzer='ik_max_word', search_analyzer='ik_smart') # 文章标题使用IK中文分词器
author = Object(properties={
'nickname': Text(analyzer='ik_max_word', search_analyzer='ik_smart'), # 作者昵称
'id': Integer() # 作者ID
})
category = Object(properties={
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'), # 分类名称
'id': Integer() # 分类ID
})
tags = Object(properties={
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'), # 标签名称
'id': Integer() # 标签ID
})
# 文章元数据字段
pub_time = Date() # 发布时间
status = Text() # 文章状态(发布/草稿)
comment_status = Text() # 评论状态(开启/关闭)
type = Text() # 文章类型(文章/页面)
views = Integer() # 浏览次数
article_order = Integer() # 文章排序
class Index:
"""索引配置"""
name = 'blog' # 索引名称
settings = {
"number_of_shards": 1, # 分片数量
"number_of_replicas": 0 # 副本数量
}
class Meta:
doc_type = 'Article' # 文档类型
class ArticleDocumentManager():
"""文章文档管理器 - 负责文章搜索索引的创建、更新和管理"""
def __init__(self):
self.create_index()
def create_index(self):
"""创建文章搜索索引"""
ArticleDocument.init()
def delete_index(self):
"""删除文章搜索索引"""
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='blog', ignore=[400, 404]) # 忽略404错误
def convert_to_doc(self, articles):
"""
将Django文章对象转换为Elasticsearch文档对象
Args:
articles: Django文章查询集
Returns:
list: Elasticsearch文档对象列表
"""
return [
ArticleDocument(
meta={'id': article.id}, # 使用文章ID作为文档ID
body=article.body,
title=article.title,
author={
'nickname': article.author.username,
'id': article.author.id
},
category={
'name': article.category.name,
'id': article.category.id
},
tags=[
{'name': t.name, 'id': t.id} for t in article.tags.all() # 转换标签列表
],
pub_time=article.pub_time,
status=article.status,
comment_status=article.comment_status,
type=article.type,
views=article.views,
article_order=article.article_order
) for article in articles
]
def rebuild(self, articles=None):
"""
重建文章搜索索引
Args:
articles: 要索引的文章列表如果为None则索引所有文章
"""
ArticleDocument.init() # 重新初始化索引
articles = articles if articles else Article.objects.all() # 获取所有文章或指定文章
docs = self.convert_to_doc(articles) # 转换为文档对象
for doc in docs:
doc.save() # 保存到Elasticsearch
def update_docs(self, docs):
"""
更新文档索引
Args:
docs: 要更新的文档列表
"""
for doc in docs:
doc.save() # 保存更新到Elasticsearch

@ -0,0 +1,49 @@
import logging
from django import forms
from haystack.forms import SearchForm
logger = logging.getLogger(__name__)
class BlogSearchForm(SearchForm):
"""
博客搜索表单类
继承自Haystack的SearchForm用于处理博客文章的搜索功能
这个表单定义了搜索框的验证规则和搜索逻辑
"""
# 搜索查询字段,设置为必填字段
querydata = forms.CharField(required=True)
def search(self):
"""
执行搜索操作
重写父类的search方法添加自定义搜索逻辑
1. 调用父类的搜索方法获取基础搜索结果
2. 验证表单数据是否有效
3. 记录搜索关键词到日志
4. 返回搜索结果
Returns:
SearchQuerySet: 搜索结果的查询集
Raises:
如果表单无效返回空搜索结果
"""
# 调用父类的search方法获取基础搜索结果
datas = super(BlogSearchForm, self).search()
# 检查表单数据是否有效
if not self.is_valid():
# 如果表单无效,返回空搜索结果
return self.no_query_found()
# 如果搜索关键词存在,记录到日志中(用于搜索统计和分析)
if self.cleaned_data['querydata']:
logger.info(self.cleaned_data['querydata'])
# 返回搜索结果
return datas

@ -0,0 +1,104 @@
import logging
import time
from ipware import get_client_ip
from user_agents import parse
from blog.documents import ELASTICSEARCH_ENABLED, ElaspedTimeDocumentManager
logger = logging.getLogger(__name__)
class OnlineMiddleware(object):
"""
在线性能监控中间件
这个中间件用于监控网站的性能指标包括
- 页面渲染时间
- 用户访问信息
- 用户代理分析
- IP地理位置通过Elasticsearch geoip管道
继承自object是Django中间件的标准写法
"""
def __init__(self, get_response=None):
"""
初始化中间件
Args:
get_response: Django的下一个中间件或视图函数
"""
self.get_response = get_response
super().__init__()
def __call__(self, request):
"""
中间件主处理逻辑
这个方在每次请求时被调用用于
1. 记录请求开始时间
2. 执行后续中间件和视图
3. 计算页面渲染时间
4. 收集用户访问数据
5. 将数据存储到Elasticsearch如果启用
6. 在响应内容中插入加载时间
Args:
request: Django请求对象
Returns:
HttpResponse: 处理后的响应对象
"""
# 记录请求开始时间,用于计算页面渲染时间
start_time = time.time()
# 调用后续中间件和视图函数,获取响应
response = self.get_response(request)
# 从请求头中获取用户代理字符串
http_user_agent = request.META.get('HTTP_USER_AGENT', '')
# 获取客户端IP地址使用ipware库处理代理情况
ip, _ = get_client_ip(request)
# 解析用户代理字符串,获取浏览器、设备等信息
user_agent = parse(http_user_agent)
# 只处理非流式响应(避免对大文件下载等操作进行监控)
if not response.streaming:
try:
# 计算页面渲染总时间(秒)
cast_time = time.time() - start_time
# 如果启用了Elasticsearch记录性能数据
if ELASTICSEARCH_ENABLED:
# 将时间转换为毫秒并保留2位小数
time_taken = round((cast_time) * 1000, 2)
# 获取请求的URL路径
url = request.path
# 导入时区模块,获取当前时间
from django.utils import timezone
# 创建性能监控记录到Elasticsearch
ElaspedTimeDocumentManager.create(
url=url, # 请求URL
time_taken=time_taken, # 耗时(毫秒)
log_datetime=timezone.now(), # 记录时间
useragent=user_agent, # 用户代理信息
ip=ip # 客户端IP
)
# 在响应内容中替换加载时间占位符
# 将<!!LOAD_TIMES!!>替换为实际的加载时间取前5位
response.content = response.content.replace(
b'<!!LOAD_TIMES!!>', str.encode(str(cast_time)[:5]))
except Exception as e:
# 记录中间件执行过程中的任何错误
logger.error("Error OnlineMiddleware: %s" % e)
# 返回处理后的响应
return response

@ -0,0 +1,380 @@
import logging
import re
from abc import abstractmethod
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from mdeditor.fields import MDTextField
from uuslug import slugify
from djangoblog.utils import cache_decorator, cache
from djangoblog.utils import get_current_site
logger = logging.getLogger(__name__)
class LinkShowType(models.TextChoices):
"""
链接显示类型选择
定义友情链接在网站中的显示位置
"""
I = ('i', _('index')) # 首页显示
L = ('l', _('list')) # 列表页显示
P = ('p', _('post')) # 文章页显示
A = ('a', _('all')) # 所有页面显示
S = ('s', _('slide')) # 幻灯片显示
class BaseModel(models.Model):
"""
基础模型类
所有模型的基类提供公共字段和方法
"""
id = models.AutoField(primary_key=True) # 自增主键
creation_time = models.DateTimeField(_('creation time'), default=now) # 创建时间
last_modify_time = models.DateTimeField(_('modify time'), default=now) # 最后修改时间
def save(self, *args, **kwargs):
"""
重写保存方法
处理文章浏览量更新和自动生成slug
"""
# 检查是否为文章视图更新操作(优化性能,避免完整保存)
is_update_views = isinstance(
self,
Article) and 'update_fields' in kwargs and kwargs['update_fields'] == ['views']
if is_update_views:
# 只更新浏览量字段,提高性能
Article.objects.filter(pk=self.pk).update(views=self.views)
else:
# 自动生成slugURL友好字符串
if 'slug' in self.__dict__:
slug = getattr(
self, 'title') if 'title' in self.__dict__ else getattr(
self, 'name')
setattr(self, 'slug', slugify(slug))
# 调用父类保存方法
super().save(*args, **kwargs)
def get_full_url(self):
"""获取完整的URL地址包含域名"""
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
class Meta:
abstract = True # 抽象基类,不会创建数据库表
@abstractmethod
def get_absolute_url(self):
"""抽象方法获取对象的绝对URL子类必须实现"""
pass
class Article(BaseModel):
"""
文章模型
博客系统的核心模型存储所有文章内容
"""
# 文章状态选择
STATUS_CHOICES = (
('d', _('Draft')), # 草稿
('p', _('Published')), # 已发布
)
# 评论状态选择
COMMENT_STATUS = (
('o', _('Open')), # 开启评论
('c', _('Close')), # 关闭评论
)
# 内容类型选择
TYPE = (
('a', _('Article')), # 普通文章
('p', _('Page')), # 独立页面
)
# 基础字段
title = models.CharField(_('title'), max_length=200, unique=True) # 文章标题,唯一
body = MDTextField(_('body')) # 文章内容使用Markdown编辑器
pub_time = models.DateTimeField(_('publish time'), blank=False, null=False, default=now) # 发布时间
# 状态字段
status = models.CharField(_('status'), max_length=1, choices=STATUS_CHOICES, default='p') # 发布状态
comment_status = models.CharField(_('comment status'), max_length=1, choices=COMMENT_STATUS, default='o') # 评论状态
type = models.CharField(_('type'), max_length=1, choices=TYPE, default='a') # 内容类型
# 统计字段
views = models.PositiveIntegerField(_('views'), default=0) # 浏览次数
# 关联字段
author = models.ForeignKey(settings.AUTH_USER_MODEL, verbose_name=_('author'),
blank=False, null=False, on_delete=models.CASCADE) # 作者
article_order = models.IntegerField(_('order'), blank=False, null=False, default=0) # 文章排序
# 功能字段
show_toc = models.BooleanField(_('show toc'), blank=False, null=False, default=False) # 是否显示目录
category = models.ForeignKey('Category', verbose_name=_('category'),
on_delete=models.CASCADE, blank=False, null=False) # 分类
tags = models.ManyToManyField('Tag', verbose_name=_('tag'), blank=True) # 标签,多对多关系
def body_to_string(self):
"""将文章内容转换为字符串"""
return self.body
def __str__(self):
"""对象的字符串表示"""
return self.title
class Meta:
ordering = ['-article_order', '-pub_time'] # 默认按排序和发布时间降序排列
verbose_name = _('article') # 单数显示名称
verbose_name_plural = verbose_name # 复数显示名称
get_latest_by = 'id' # 获取最新记录的依据字段
def get_absolute_url(self):
"""获取文章的绝对URL包含年月日信息用于SEO"""
return reverse('blog:detailbyid', kwargs={
'article_id': self.id,
'year': self.creation_time.year,
'month': self.creation_time.month,
'day': self.creation_time.day
})
@cache_decorator(60 * 60 * 10) # 缓存10小时
def get_category_tree(self):
"""获取文章所属分类的树形结构,用于面包屑导航"""
tree = self.category.get_category_tree()
names = list(map(lambda c: (c.name, c.get_absolute_url()), tree))
return names
def save(self, *args, **kwargs):
"""保存文章,调用父类保存逻辑"""
super().save(*args, **kwargs)
def viewed(self):
"""增加文章浏览量使用update_fields优化性能"""
self.views += 1
self.save(update_fields=['views'])
def comment_list(self):
"""获取文章评论列表(带缓存)"""
cache_key = 'article_comments_{id}'.format(id=self.id)
value = cache.get(cache_key)
if value:
logger.info('get article comments:{id}'.format(id=self.id))
return value
else:
# 获取已启用的评论并按ID降序排列
comments = self.comment_set.filter(is_enable=True).order_by('-id')
cache.set(cache_key, comments, 60 * 100) # 缓存100分钟
logger.info('set article comments:{id}'.format(id=self.id))
return comments
def get_admin_url(self):
"""获取文章在Admin后台的URL"""
info = (self._meta.app_label, self._meta.model_name)
return reverse('admin:%s_%s_change' % info, args=(self.pk,))
@cache_decorator(expiration=60 * 100) # 缓存100分钟
def next_article(self):
"""获取下一篇文章按ID顺序"""
return Article.objects.filter(id__gt=self.id, status='p').order_by('id').first()
@cache_decorator(expiration=60 * 100) # 缓存100分钟
def prev_article(self):
"""获取上一篇文章按ID顺序"""
return Article.objects.filter(id__lt=self.id, status='p').first()
def get_first_image_url(self):
"""
从文章内容中提取第一张图片的URL
用于文章列表的缩略图显示
"""
# 使用正则表达式匹配Markdown图片语法
match = re.search(r'!\[.*?\]\((.+?)\)', self.body)
if match:
return match.group(1)
return ""
class Category(BaseModel):
"""
文章分类模型
用于组织和管理博客文章的类别支持多级分类结构
"""
name = models.CharField(_('category name'), max_length=30, unique=True) # 分类名称,唯一
parent_category = models.ForeignKey('self', verbose_name=_('parent category'),
blank=True, null=True, on_delete=models.CASCADE) # 父级分类,支持层级结构
slug = models.SlugField(default='no-slug', max_length=60, blank=True) # URL友好名称
index = models.IntegerField(default=0, verbose_name=_('index')) # 分类排序索引
class Meta:
ordering = ['-index'] # 按索引降序排列
verbose_name = _('category') # 单数显示名称
verbose_name_plural = verbose_name # 复数显示名称
def get_absolute_url(self):
"""获取分类的绝对URL地址使用slug作为URL参数"""
return reverse('blog:category_detail', kwargs={'category_name': self.slug})
def __str__(self):
"""对象的字符串表示"""
return self.name
@cache_decorator(60 * 60 * 10) # 缓存10小时
def get_category_tree(self):
"""
递归获得分类目录的父级
返回从当前分类到根分类的路径用于面包屑导航
"""
categorys = []
def parse(category):
categorys.append(category)
if category.parent_category:
parse(category.parent_category)
parse(self)
return categorys
@cache_decorator(60 * 60 * 10) # 缓存10小时
def get_sub_categorys(self):
"""
获得当前分类目录所有子集
返回所有子分类的列表
"""
categorys = []
all_categorys = Category.objects.all()
def parse(category):
if category not in categorys:
categorys.append(category)
childs = all_categorys.filter(parent_category=category)
for child in childs:
if category not in categorys:
categorys.append(child)
parse(child)
parse(self)
return categorys
class Tag(BaseModel):
"""文章标签模型"""
name = models.CharField(_('tag name'), max_length=30, unique=True) # 标签名称,唯一
slug = models.SlugField(default='no-slug', max_length=60, blank=True) # URL友好名称
def __str__(self):
return self.name
def get_absolute_url(self):
"""获取标签的绝对URL使用slug作为URL参数"""
return reverse('blog:tag_detail', kwargs={'tag_name': self.slug})
@cache_decorator(60 * 60 * 10) # 缓存10小时
def get_article_count(self):
"""获取该标签下的文章数量使用distinct去重"""
return Article.objects.filter(tags__name=self.name).distinct().count()
class Meta:
ordering = ['name'] # 按名称升序排列
verbose_name = _('tag') # 单数显示名称
verbose_name_plural = verbose_name # 复数显示名称
class Links(models.Model):
"""友情链接模型"""
name = models.CharField(_('link name'), max_length=30, unique=True) # 链接名称,唯一
link = models.URLField(_('link')) # 链接地址
sequence = models.IntegerField(_('order'), unique=True) # 显示顺序,唯一
is_enable = models.BooleanField(_('is show'), default=True, blank=False, null=False) # 是否启用
show_type = models.CharField(_('show type'), max_length=1, choices=LinkShowType.choices,
default=LinkShowType.I) # 显示类型
creation_time = models.DateTimeField(_('creation time'), default=now) # 创建时间
last_mod_time = models.DateTimeField(_('modify time'), default=now) # 最后修改时间
class Meta:
ordering = ['sequence'] # 按顺序升序排列
verbose_name = _('link') # 单数显示名称
verbose_name_plural = verbose_name # 复数显示名称
def __str__(self):
return self.name
class SideBar(models.Model):
"""侧边栏模型可以展示一些html内容"""
name = models.CharField(_('title'), max_length=100) # 侧边栏标题
content = models.TextField(_('content')) # 侧边栏内容HTML
sequence = models.IntegerField(_('order'), unique=True) # 显示顺序,唯一
is_enable = models.BooleanField(_('is enable'), default=True) # 是否启用
creation_time = models.DateTimeField(_('creation time'), default=now) # 创建时间
last_mod_time = models.DateTimeField(_('modify time'), default=now) # 最后修改时间
class Meta:
ordering = ['sequence'] # 按顺序升序排列
verbose_name = _('sidebar') # 单数显示名称
verbose_name_plural = verbose_name # 复数显示名称
def __str__(self):
return self.name
class BlogSettings(models.Model):
"""博客全局配置模型,使用单例模式确保只有一份配置"""
# 网站基本信息
site_name = models.CharField(_('site name'), max_length=200, null=False, blank=False, default='') # 网站名称
site_description = models.TextField(_('site description'), max_length=1000, null=False, blank=False,
default='') # 网站描述
site_seo_description = models.TextField(_('site seo description'), max_length=1000, null=False, blank=False,
default='') # SEO描述
site_keywords = models.TextField(_('site keywords'), max_length=1000, null=False, blank=False, default='') # 网站关键词
# 内容显示设置
article_sub_length = models.IntegerField(_('article sub length'), default=300) # 文章摘要长度
sidebar_article_count = models.IntegerField(_('sidebar article count'), default=10) # 侧边栏文章数量
sidebar_comment_count = models.IntegerField(_('sidebar comment count'), default=5) # 侧边栏评论数量
article_comment_count = models.IntegerField(_('article comment count'), default=5) # 文章页评论数量
# 广告设置
show_google_adsense = models.BooleanField(_('show adsense'), default=False) # 是否显示Google广告
google_adsense_codes = models.TextField(_('adsense code'), max_length=2000, null=True, blank=True,
default='') # 广告代码
# 功能开关
open_site_comment = models.BooleanField(_('open site comment'), default=True) # 是否开启全站评论
comment_need_review = models.BooleanField('评论是否需要审核', default=False, null=False) # 评论是否需要审核
# 页面布局
global_header = models.TextField("公共头部", null=True, blank=True, default='') # 全局头部HTML
global_footer = models.TextField("公共尾部", null=True, blank=True, default='') # 全局尾部HTML
# 备案信息
beian_code = models.CharField('备案号', max_length=2000, null=True, blank=True, default='') # ICP备案号
show_gongan_code = models.BooleanField('是否显示公安备案号', default=False, null=False) # 是否显示公安备案
gongan_beiancode = models.TextField('公安备案号', max_length=2000, null=True, blank=True, default='') # 公安备案号
# 统计代码
analytics_code = models.TextField("网站统计代码", max_length=1000, null=False, blank=False, default='') # 网站统计代码
class Meta:
verbose_name = _('Website configuration') # 单数显示名称
verbose_name_plural = verbose_name # 复数显示名称
def __str__(self):
return self.site_name
def clean(self):
"""验证配置唯一性,确保只有一个配置实例(单例模式)"""
if BlogSettings.objects.exclude(id=self.id).count():
raise ValidationError(_('There can only be one configuration'))
def save(self, *args, **kwargs):
"""保存配置并清除缓存,确保配置变更立即生效"""
super().save(*args, **kwargs)
from djangoblog.utils import cache
cache.clear() # 清除所有缓存

@ -0,0 +1,40 @@
from haystack import indexes
from blog.models import Article
class ArticleIndex(indexes.SearchIndex, indexes.Indexable):
"""
文章搜索索引类
这个类用于定义Django Haystack搜索引擎中文章的索引结构
它继承自SearchIndex和Indexable提供了文章模型的全文搜索功能
"""
# 主搜索字段document=True表示这是主要的搜索内容字段
# use_template=True表示使用模板文件来定义索引内容
text = indexes.CharField(document=True, use_template=True)
def get_model(self):
"""
获取与此索引关联的Django模型
Returns:
Model: 返回Article模型类
"""
return Article
def index_queryset(self, using=None):
"""
定义要建立索引的查询集
这个方法返回需要被索引的文章集合这里只索引已发布(status='p')的文章
草稿文章不会被包含在搜索索引中
Args:
using: 可选参数指定使用的搜索引擎别名
Returns:
QuerySet: 包含所有已发布文章的查询集
"""
return self.get_model().objects.filter(status='p')

@ -0,0 +1,329 @@
import os
from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management import call_command
from django.core.paginator import Paginator
from django.templatetags.static import static
from django.test import Client, RequestFactory, TestCase
from django.urls import reverse
from django.utils import timezone
from accounts.models import BlogUser
from blog.forms import BlogSearchForm
from blog.models import Article, Category, Tag, SideBar, Links
from blog.templatetags.blog_tags import load_pagination_info, load_articletags
from djangoblog.utils import get_current_site, get_sha256
from oauth.models import OAuthUser, OAuthConfig
# Create your tests here.
class ArticleTest(TestCase):
"""
文章模型测试类
这个测试类用于测试博客系统的核心功能包括
- 文章创建和验证
- 搜索功能
- 分页功能
- 文件上传
- 管理命令
- 错误页面处理
"""
def setUp(self):
"""
测试初始化方法
在每个测试方法执行前运行用于设置测试环境
"""
self.client = Client() # Django测试客户端用于模拟HTTP请求
self.factory = RequestFactory() # 请求工厂,用于创建请求对象
def test_validate_article(self):
"""
测试文章验证和核心功能
这个测试方法验证博客系统的核心功能
- 用户创建和认证
- 文章创建和关联
- 搜索功能
- 分页功能
- RSS和站点地图
- 管理后台访问
"""
# 获取当前站点域名
site = get_current_site().domain
# 创建测试用户(超级用户)
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
username="liangliangyy")[0]
user.set_password("liangliangyy")
user.is_staff = True # 设置为管理员
user.is_superuser = True # 设置为超级用户
user.save()
# 测试用户详情页访问
response = self.client.get(user.get_absolute_url())
self.assertEqual(response.status_code, 200) # 断言返回200状态码
# 测试管理后台页面访问
response = self.client.get('/admin/servermanager/emailsendlog/')
response = self.client.get('admin/admin/logentry/')
# 创建侧边栏测试数据
s = SideBar()
s.sequence = 1
s.name = 'test'
s.content = 'test content'
s.is_enable = True
s.save()
# 创建分类测试数据
category = Category()
category.name = "category"
category.creation_time = timezone.now()
category.last_mod_time = timezone.now()
category.save()
# 创建标签测试数据
tag = Tag()
tag.name = "nicetag"
tag.save()
# 创建文章测试数据
article = Article()
article.title = "nicetitle"
article.body = "nicecontent"
article.author = user
article.category = category
article.type = 'a' # 文章类型
article.status = 'p' # 发布状态
article.save()
# 验证初始标签数量为0
self.assertEqual(0, article.tags.count())
# 添加标签到文章
article.tags.add(tag)
article.save()
# 验证标签数量为1
self.assertEqual(1, article.tags.count())
# 批量创建20篇文章用于测试分页和搜索
for i in range(20):
article = Article()
article.title = "nicetitle" + str(i)
article.body = "nicetitle" + str(i)
article.author = user
article.category = category
article.type = 'a'
article.status = 'p'
article.save()
article.tags.add(tag)
article.save()
# 测试搜索功能如果启用了Elasticsearch
from blog.documents import ELASTICSEARCH_ENABLED
if ELASTICSEARCH_ENABLED:
call_command("build_index") # 构建搜索索引
response = self.client.get('/search', {'q': 'nicetitle'})
self.assertEqual(response.status_code, 200)
# 测试文章详情页访问
response = self.client.get(article.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 测试百度推送通知
from djangoblog.spider_notify import SpiderNotify
SpiderNotify.notify(article.get_absolute_url())
# 测试标签页访问
response = self.client.get(tag.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 测试分类页访问
response = self.client.get(category.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 测试搜索页面
response = self.client.get('/search', {'q': 'django'})
self.assertEqual(response.status_code, 200)
# 测试模板标签函数
s = load_articletags(article)
self.assertIsNotNone(s)
# 用户登录测试
self.client.login(username='liangliangyy', password='liangliangyy')
# 测试文章归档页面
response = self.client.get(reverse('blog:archives'))
self.assertEqual(response.status_code, 200)
# 测试各种分页场景
p = Paginator(Article.objects.all(), settings.PAGINATE_BY)
self.check_pagination(p, '', '') # 基础分页
p = Paginator(Article.objects.filter(tags=tag), settings.PAGINATE_BY)
self.check_pagination(p, '分类标签归档', tag.slug) # 标签分页
p = Paginator(
Article.objects.filter(
author__username='liangliangyy'), settings.PAGINATE_BY)
self.check_pagination(p, '作者文章归档', 'liangliangyy') # 作者分页
p = Paginator(Article.objects.filter(category=category), settings.PAGINATE_BY)
self.check_pagination(p, '分类目录归档', category.slug) # 分类分页
# 测试搜索表单
f = BlogSearchForm()
f.search()
# 测试百度批量推送
from djangoblog.spider_notify import SpiderNotify
SpiderNotify.baidu_notify([article.get_full_url()])
# 测试Gravatar相关功能
from blog.templatetags.blog_tags import gravatar_url, gravatar
u = gravatar_url('liangliangyy@gmail.com')
u = gravatar('liangliangyy@gmail.com')
# 测试友情链接功能
link = Links(
sequence=1,
name="lylinux",
link='https://wwww.lylinux.net')
link.save()
response = self.client.get('/links.html')
self.assertEqual(response.status_code, 200)
# 测试RSS订阅
response = self.client.get('/feed/')
self.assertEqual(response.status_code, 200)
# 测试站点地图
response = self.client.get('/sitemap.xml')
self.assertEqual(response.status_code, 200)
# 测试管理后台操作
self.client.get("/admin/blog/article/1/delete/")
self.client.get('/admin/servermanager/emailsendlog/')
self.client.get('/admin/admin/logentry/')
self.client.get('/admin/admin/logentry/1/change/')
def check_pagination(self, p, type, value):
"""
分页功能测试辅助方法
Args:
p: Paginator分页对象
type: 分页类型用于生成URL
value: 分页参数值
"""
for page in range(1, p.num_pages + 1):
# 加载分页信息
s = load_pagination_info(p.page(page), type, value)
self.assertIsNotNone(s)
# 测试上一页链接
if s['previous_url']:
response = self.client.get(s['previous_url'])
self.assertEqual(response.status_code, 200)
# 测试下一页链接
if s['next_url']:
response = self.client.get(s['next_url'])
self.assertEqual(response.status_code, 200)
def test_image(self):
"""
图片上传和头像处理测试
"""
import requests
# 下载测试图片
rsp = requests.get(
'https://www.python.org/static/img/python-logo.png')
imagepath = os.path.join(settings.BASE_DIR, 'python.png')
with open(imagepath, 'wb') as file:
file.write(rsp.content)
# 测试未授权上传应该返回403
rsp = self.client.post('/upload')
self.assertEqual(rsp.status_code, 403)
# 生成签名用于授权上传
sign = get_sha256(get_sha256(settings.SECRET_KEY))
with open(imagepath, 'rb') as file:
imgfile = SimpleUploadedFile(
'python.png', file.read(), content_type='image/jpg')
form_data = {'python.png': imgfile}
# 测试授权上传
rsp = self.client.post(
'/upload?sign=' + sign, form_data, follow=True)
self.assertEqual(rsp.status_code, 200)
# 清理测试文件
os.remove(imagepath)
# 测试工具函数
from djangoblog.utils import save_user_avatar, send_email
send_email(['qq@qq.com'], 'testTitle', 'testContent')
save_user_avatar(
'https://www.python.org/static/img/python-logo.png')
def test_errorpage(self):
"""测试404错误页面"""
rsp = self.client.get('/eee')
self.assertEqual(rsp.status_code, 404)
def test_commands(self):
"""
测试Django管理命令
验证系统提供的各种管理命令是否能正常执行
"""
# 创建测试用户
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
username="liangliangyy")[0]
user.set_password("liangliangyy")
user.is_staff = True
user.is_superuser = True
user.save()
# 创建OAuth配置
c = OAuthConfig()
c.type = 'qq'
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
# 创建OAuth用户
u = OAuthUser()
u.type = 'qq'
u.openid = 'openid'
u.user = user
u.picture = static("/blog/img/avatar.png")
u.metadata = '''
{
"figureurl": "https://qzapp.qlogo.cn/qzapp/101513904/C740E30B4113EAA80E0D9918ABC78E82/30"
}'''
u.save()
u = OAuthUser()
u.type = 'qq'
u.openid = 'openid1'
u.picture = 'https://qzapp.qlogo.cn/qzapp/101513904/C740E30B4113EAA80E0D9918ABC78E82/30'
u.metadata = '''
{
"figureurl": "https://qzapp.qlogo.cn/qzapp/101513904/C740E30B4113EAA80E0D9918ABC78E82/30"
}'''
u.save()
# 测试各种管理命令
from blog.documents import ELASTICSEARCH_ENABLED
if ELASTICSEARCH_ENABLED:
call_command("build_index") # 构建搜索索引
call_command("ping_baidu", "all") # 百度推送
call_command("create_testdata") # 创建测试数据
call_command("clear_cache") # 清理缓存
call_command("sync_user_avatar") # 同步用户头像
call_command("build_search_words") # 构建搜索词

@ -0,0 +1,101 @@
from django.urls import path
from django.views.decorators.cache import cache_page
from . import views
# 应用命名空间用于URL反向解析时区分不同应用的URL
app_name = "blog"
# URL模式配置定义了博客应用的所有URL路由
urlpatterns = [
# 首页路由
path(
r'', # 空路径匹配根URL/ 或 /blog/
views.IndexView.as_view(), # 使用类视图处理首页
name='index' # URL名称用于反向解析
),
# 首页分页路由
path(
r'page/<int:page>/', # 带页码的路径(如:/page/2/
views.IndexView.as_view(), # 使用相同的类视图,但会处理分页
name='index_page' # URL名称
),
# 文章详情页路由SEO友好URL
path(
r'article/<int:year>/<int:month>/<int:day>/<int:article_id>.html', # 包含年月日和文章ID的URL
views.ArticleDetailView.as_view(), # 文章详情类视图
name='detailbyid' # URL名称
),
# 分类详情页路由
path(
r'category/<slug:category_name>.html', # 使用分类名称的slug格式
views.CategoryDetailView.as_view(), # 分类详情类视图
name='category_detail' # URL名称
),
# 分类详情分页路由
path(
r'category/<slug:category_name>/<int:page>.html', # 带页码的分类URL
views.CategoryDetailView.as_view(), # 相同的类视图处理分页
name='category_detail_page' # URL名称
),
# 作者详情页路由
path(
r'author/<author_name>.html', # 使用作者名称的URL
views.AuthorDetailView.as_view(), # 作者详情类视图
name='author_detail' # URL名称
),
# 作者详情分页路由
path(
r'author/<author_name>/<int:page>.html', # 带页码的作者URL
views.AuthorDetailView.as_view(), # 相同的类视图处理分页
name='author_detail_page' # URL名称
),
# 标签详情页路由
path(
r'tag/<slug:tag_name>.html', # 使用标签名称的slug格式
views.TagDetailView.as_view(), # 标签详情类视图
name='tag_detail' # URL名称
),
# 标签详情分页路由
path(
r'tag/<slug:tag_name>/<int:page>.html', # 带页码的标签URL
views.TagDetailView.as_view(), # 相同的类视图处理分页
name='tag_detail_page' # URL名称
),
# 文章归档页路由(带缓存)
path(
'archives.html', # 归档页面URL
cache_page(60 * 60)(views.ArchivesView.as_view()), # 使用缓存装饰器缓存1小时
name='archives' # URL名称
),
# 友情链接页面路由
path(
'links.html', # 友情链接页面URL
views.LinkListView.as_view(), # 链接列表类视图
name='links' # URL名称
),
# 文件上传路由
path(
r'upload', # 文件上传端点
views.fileupload, # 使用函数视图处理文件上传
name='upload' # URL名称
),
# 缓存清理路由
path(
r'clean', # 缓存清理端点
views.clean_cache_view, # 使用函数视图处理缓存清理
name='clean' # URL名称
),
]

@ -0,0 +1,500 @@
import logging
import os
import uuid
from django.conf import settings
from django.core.paginator import Paginator
from django.http import HttpResponse, HttpResponseForbidden
from django.shortcuts import get_object_or_404
from django.shortcuts import render
from django.templatetags.static import static
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.detail import DetailView
from django.views.generic.list import ListView
from haystack.views import SearchView
from blog.models import Article, Category, LinkShowType, Links, Tag
from comments.forms import CommentForm
from djangoblog.plugin_manage import hooks
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
from djangoblog.utils import cache, get_blog_setting, get_sha256
logger = logging.getLogger(__name__)
class ArticleListView(ListView):
"""
文章列表基类视图
提供通用的文章列表功能和缓存机制
所有文章列表视图都应该继承此类
"""
# template_name属性用于指定使用哪个模板进行渲染
template_name = 'blog/article_index.html'
# context_object_name属性用于给上下文变量取名在模板中使用该名字
context_object_name = 'article_list'
# 页面类型,分类目录或标签列表等
page_type = ''
paginate_by = settings.PAGINATE_BY # 每页显示的文章数量
page_kwarg = 'page' # URL中页码参数的名称
link_type = LinkShowType.L # 友情链接显示类型
def get_view_cache_key(self):
"""获取视图缓存键 - 需要子类实现"""
return self.request.get['pages']
@property
def page_number(self):
"""获取当前页码"""
page_kwarg = self.page_kwarg
page = self.kwargs.get(
page_kwarg) or self.request.GET.get(page_kwarg) or 1
return page
def get_queryset_cache_key(self):
"""
获取查询集缓存键
子类必须重写此方法
"""
raise NotImplementedError()
def get_queryset_data(self):
"""
获取查询集数据
子类必须重写此方法
"""
raise NotImplementedError()
def get_queryset_from_cache(self, cache_key):
"""
从缓存获取查询集数据
Args:
cache_key: 缓存键
Returns:
QuerySet: 文章查询集
"""
value = cache.get(cache_key)
if value:
logger.info('get view cache.key:{key}'.format(key=cache_key))
return value
else:
article_list = self.get_queryset_data()
cache.set(cache_key, article_list)
logger.info('set view cache.key:{key}'.format(key=cache_key))
return article_list
def get_queryset(self):
"""
获取查询集 - 从缓存获取数据
Returns:
QuerySet: 文章查询集
"""
key = self.get_queryset_cache_key()
value = self.get_queryset_from_cache(key)
return value
def get_context_data(self, **kwargs):
"""添加上下文数据"""
kwargs['linktype'] = self.link_type
return super(ArticleListView, self).get_context_data(**kwargs)
class IndexView(ArticleListView):
"""
首页视图
显示最新的文章列表
"""
# 友情链接类型 - 首页显示
link_type = LinkShowType.I
def get_queryset_data(self):
"""获取首页文章数据 - 只获取已发布的普通文章"""
article_list = Article.objects.filter(type='a', status='p')
return article_list
def get_queryset_cache_key(self):
"""获取首页缓存键 - 基于页码"""
cache_key = 'index_{page}'.format(page=self.page_number)
return cache_key
class ArticleDetailView(DetailView):
"""
文章详情页面视图
显示单篇文章的详细内容和评论
"""
template_name = 'blog/article_detail.html'
model = Article # 关联的模型
pk_url_kwarg = 'article_id' # URL中主键参数的名称
context_object_name = "article" # 模板中使用的变量名
def get_context_data(self, **kwargs):
"""添加上下文数据 - 文章详情和评论信息"""
# 创建评论表单
comment_form = CommentForm()
# 获取文章评论列表
article_comments = self.object.comment_list()
# 获取顶级评论(没有父评论的评论)
parent_comments = article_comments.filter(parent_comment=None)
# 获取博客设置
blog_setting = get_blog_setting()
# 对评论进行分页
paginator = Paginator(parent_comments, blog_setting.article_comment_count)
page = self.request.GET.get('comment_page', '1')
# 验证页码
if not page.isnumeric():
page = 1
else:
page = int(page)
if page < 1:
page = 1
if page > paginator.num_pages:
page = paginator.num_pages
# 获取当前页的评论
p_comments = paginator.page(page)
# 计算下一页和上一页
next_page = p_comments.next_page_number() if p_comments.has_next() else None
prev_page = p_comments.previous_page_number() if p_comments.has_previous() else None
# 构建评论分页URL
if next_page:
kwargs[
'comment_next_page_url'] = self.object.get_absolute_url() + f'?comment_page={next_page}#commentlist-container'
if prev_page:
kwargs[
'comment_prev_page_url'] = self.object.get_absolute_url() + f'?comment_page={prev_page}#commentlist-container'
# 添加上下文数据
kwargs['form'] = comment_form
kwargs['article_comments'] = article_comments
kwargs['p_comments'] = p_comments
kwargs['comment_count'] = len(article_comments) if article_comments else 0
# 添加上下篇文章信息
kwargs['next_article'] = self.object.next_article
kwargs['prev_article'] = self.object.prev_article
# 调用父类方法获取基础上下文
context = super(ArticleDetailView, self).get_context_data(**kwargs)
article = self.object
# Action Hook, 通知插件"文章详情已获取"
hooks.run_action('after_article_body_get', article=article, request=self.request)
# Filter Hook, 允许插件修改文章正文
article.body = hooks.apply_filters(ARTICLE_CONTENT_HOOK_NAME, article.body, article=article,
request=self.request)
return context
class CategoryDetailView(ArticleListView):
"""
分类目录列表视图
显示指定分类下的所有文章包括子分类
"""
page_type = "分类目录归档"
def get_queryset_data(self):
"""获取分类文章数据 - 包括所有子分类的文章"""
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
self.categoryname = categoryname
# 获取所有子分类的名称
categorynames = list(
map(lambda c: c.name, category.get_sub_categorys()))
# 获取这些分类下的所有已发布文章
article_list = Article.objects.filter(
category__name__in=categorynames, status='p')
return article_list
def get_queryset_cache_key(self):
"""获取分类页面缓存键 - 基于分类名称和页码"""
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
self.categoryname = categoryname
cache_key = 'category_list_{categoryname}_{page}'.format(
categoryname=categoryname, page=self.page_number)
return cache_key
def get_context_data(self, **kwargs):
"""添加上下文数据"""
categoryname = self.categoryname
try:
categoryname = categoryname.split('/')[-1] # 处理多层分类名称
except BaseException:
pass
kwargs['page_type'] = CategoryDetailView.page_type
kwargs['tag_name'] = categoryname
return super(CategoryDetailView, self).get_context_data(**kwargs)
class AuthorDetailView(ArticleListView):
"""
作者详情页视图
显示指定作者的所有文章
"""
page_type = '作者文章归档'
def get_queryset_cache_key(self):
"""获取作者页面缓存键 - 基于作者名称和页码"""
from uuslug import slugify
author_name = slugify(self.kwargs['author_name']) # 使用slugify处理作者名
cache_key = 'author_{author_name}_{page}'.format(
author_name=author_name, page=self.page_number)
return cache_key
def get_queryset_data(self):
"""获取作者文章数据 - 指定作者的所有已发布文章"""
author_name = self.kwargs['author_name']
article_list = Article.objects.filter(
author__username=author_name, type='a', status='p')
return article_list
def get_context_data(self, **kwargs):
"""添加上下文数据"""
author_name = self.kwargs['author_name']
kwargs['page_type'] = AuthorDetailView.page_type
kwargs['tag_name'] = author_name
return super(AuthorDetailView, self).get_context_data(**kwargs)
class TagDetailView(ArticleListView):
"""
标签列表页面视图
显示指定标签下的所有文章
"""
page_type = '分类标签归档'
def get_queryset_data(self):
"""获取标签文章数据 - 指定标签的所有已发布文章"""
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
self.name = tag_name
article_list = Article.objects.filter(
tags__name=tag_name, type='a', status='p')
return article_list
def get_queryset_cache_key(self):
"""获取标签页面缓存键 - 基于标签名称和页码"""
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
self.name = tag_name
cache_key = 'tag_{tag_name}_{page}'.format(
tag_name=tag_name, page=self.page_number)
return cache_key
def get_context_data(self, **kwargs):
"""添加上下文数据"""
tag_name = self.name
kwargs['page_type'] = TagDetailView.page_type
kwargs['tag_name'] = tag_name
return super(TagDetailView, self).get_context_data(**kwargs)
class ArchivesView(ArticleListView):
"""
文章归档页面视图
按时间顺序显示所有文章不分页
"""
page_type = '文章归档'
paginate_by = None # 不分页
page_kwarg = None
template_name = 'blog/article_archives.html' # 使用专门的归档模板
def get_queryset_data(self):
"""获取归档数据 - 所有已发布文章"""
return Article.objects.filter(status='p').all()
def get_queryset_cache_key(self):
"""获取归档页面缓存键 - 固定键名"""
cache_key = 'archives'
return cache_key
class LinkListView(ListView):
"""
友情链接列表视图
显示所有启用的友情链接
"""
model = Links # 关联的模型
template_name = 'blog/links_list.html' # 友情链接模板
def get_queryset(self):
"""获取启用的友情链接"""
return Links.objects.filter(is_enable=True)
class EsSearchView(SearchView):
"""
Elasticsearch搜索视图
扩展Haystack的搜索功能
"""
def get_context(self):
"""获取搜索上下文数据"""
paginator, page = self.build_page() # 构建分页
context = {
"query": self.query, # 搜索关键词
"form": self.form, # 搜索表单
"page": page, # 当前页
"paginator": paginator, # 分页器
"suggestion": None, # 搜索建议
}
# 添加拼写建议
if hasattr(self.results, "query") and self.results.query.backend.include_spelling:
context["suggestion"] = self.results.query.get_spelling_suggestion()
context.update(self.extra_context())
return context
@csrf_exempt # 免除CSRF验证用于文件上传
def fileupload(request):
"""
文件上传视图
提供图床功能支持图片和文件上传
Args:
request: HTTP请求对象
Returns:
HttpResponse: 上传结果
"""
if request.method == 'POST':
# 验证签名,确保上传请求合法
sign = request.GET.get('sign', None)
if not sign:
return HttpResponseForbidden()
if not sign == get_sha256(get_sha256(settings.SECRET_KEY)):
return HttpResponseForbidden()
response = []
# 处理所有上传的文件
for filename in request.FILES:
# 按日期创建目录结构
timestr = timezone.now().strftime('%Y/%m/%d')
imgextensions = ['jpg', 'png', 'jpeg', 'bmp'] # 图片文件扩展名
fname = u''.join(str(filename))
# 判断是否为图片文件
isimage = len([i for i in imgextensions if fname.find(i) >= 0]) > 0
# 创建存储目录
base_dir = os.path.join(settings.STATICFILES, "files" if not isimage else "image", timestr)
if not os.path.exists(base_dir):
os.makedirs(base_dir)
# 生成唯一文件名
savepath = os.path.normpath(os.path.join(base_dir, f"{uuid.uuid4().hex}{os.path.splitext(filename)[-1]}"))
# 安全检查:确保文件保存在指定目录内
if not savepath.startswith(base_dir):
return HttpResponse("only for post")
# 保存文件
with open(savepath, 'wb+') as wfile:
for chunk in request.FILES[filename].chunks():
wfile.write(chunk)
# 如果是图片,进行压缩优化
if isimage:
from PIL import Image
image = Image.open(savepath)
image.save(savepath, quality=20, optimize=True) # 压缩质量20%
# 生成静态文件URL
url = static(savepath)
response.append(url)
return HttpResponse(response)
else:
return HttpResponse("only for post") # 只支持POST请求
def page_not_found_view(
request,
exception,
template_name='blog/error_page.html'):
"""
404页面未找到视图
Args:
request: 请求对象
exception: 异常信息
template_name: 模板名称
Returns:
HttpResponse: 404错误页面
"""
if exception:
logger.error(exception) # 记录异常日志
url = request.get_full_path()
return render(request,
template_name,
{'message': _('Sorry, the page you requested is not found, please click the home page to see other?'),
'statuscode': '404'},
status=404)
def server_error_view(request, template_name='blog/error_page.html'):
"""
500服务器错误视图
Args:
request: 请求对象
template_name: 模板名称
Returns:
HttpResponse: 500错误页面
"""
return render(request,
template_name,
{'message': _('Sorry, the server is busy, please click the home page to see other?'),
'statuscode': '500'},
status=500)
def permission_denied_view(
request,
exception,
template_name='blog/error_page.html'):
"""
403权限拒绝视图
Args:
request: 请求对象
exception: 异常信息
template_name: 模板名称
Returns:
HttpResponse: 403错误页面
"""
if exception:
logger.error(exception) # 记录异常日志
return render(
request, template_name, {
'message': _('Sorry, you do not have permission to access this page?'),
'statuscode': '403'}, status=403)
def clean_cache_view(request):
"""
清理缓存视图
用于手动清理系统缓存
Args:
request: 请求对象
Returns:
HttpResponse: 清理结果
"""
cache.clear()
return HttpResponse('ok')
Loading…
Cancel
Save