diff --git a/src/djangoblog/__init__.py b/src/djangoblog/__init__.py
index 1e205f4..8704cf6 100644
--- a/src/djangoblog/__init__.py
+++ b/src/djangoblog/__init__.py
@@ -1 +1,18 @@
-default_app_config = 'djangoblog.apps.DjangoblogAppConfig'
+# 指定该 Django 应用的默认配置类。
+# 'default_app_config' 是一个特殊变量,Django 在加载应用时会自动识别它,
+# 并使用指定的 AppConfig 子类来配置该应用的行为。
+#
+# 作用说明:
+# - 'djangoblog.apps.DjangoblogAppConfig' 是一个完整的 Python 导入路径,
+# 指向 djangoblog 应用目录下 apps.py 文件中的 DjangoblogAppConfig 类。
+# - 该类通常用于自定义应用的初始化行为,例如:
+# - 在应用启动时注册信号处理器
+# - 动态加载插件(如调用 load_plugins())
+# - 设置应用别名(verbose_name)
+# - 执行其他启动时需要运行的代码
+#
+# 注意:
+# 从 Django 3.2 开始,显式设置 default_app_config 已不再是推荐做法,
+# 更推荐在应用的 __init__.py 中直接导入 AppConfig 类,或在 INSTALLED_APPS 中使用完整的配置类路径。
+# 但在一些较老版本的 Django(如 1.7 ~ 3.1)中,这是启用自定义应用配置的主要方式。
+default_app_config = 'djangoblog.apps.DjangoblogAppConfig'
\ No newline at end of file
diff --git a/src/djangoblog/admin_site.py b/src/djangoblog/admin_site.py
index f120405..1d40441 100644
--- a/src/djangoblog/admin_site.py
+++ b/src/djangoblog/admin_site.py
@@ -1,32 +1,68 @@
+# 导入 Django 管理后台相关模块
from django.contrib.admin import AdminSite
from django.contrib.admin.models import LogEntry
from django.contrib.sites.admin import SiteAdmin
from django.contrib.sites.models import Site
-from accounts.admin import *
-from blog.admin import *
-from blog.models import *
-from comments.admin import *
-from comments.models import *
-from djangoblog.logentryadmin import LogEntryAdmin
-from oauth.admin import *
-from oauth.models import *
-from owntracks.admin import *
-from owntracks.models import *
-from servermanager.admin import *
-from servermanager.models import *
+# 导入各应用的 admin 配置和模型
+from accounts.admin import * # 用户管理后台配置
+from blog.admin import * # 博客功能后台配置
+from blog.models import * # 博客功能模型
+from comments.admin import * # 评论功能后台配置
+from comments.models import * # 评论功能模型
+from djangoblog.logentryadmin import LogEntryAdmin # 自定义的日志条目管理类
+from oauth.admin import * # 第三方登录后台配置
+from oauth.models import * # 第三方登录模型
+from owntracks.admin import * # OwnTracks 位置追踪后台配置
+from owntracks.models import * # OwnTracks 模型
+from servermanager.admin import * # 服务器命令管理后台配置
+from servermanager.models import * # 服务器命令模型
class DjangoBlogAdminSite(AdminSite):
- site_header = 'djangoblog administration'
- site_title = 'djangoblog site admin'
+ """
+ 自定义 Django 管理后台站点类,用于替代默认的 admin.site。
+
+ 功能说明:
+ - 继承自 Django 的 AdminSite,可高度定制管理后台的行为和界面。
+ - 设置了自定义的页面标题和头部文本。
+ - 限制访问权限,仅超级用户可访问。
+
+ 属性说明:
+ site_header: 管理后台页面顶部显示的标题(浏览器标签页标题)。
+ site_title: 管理后台页面中显示的主标题(通常在左上角)。
+ """
+ site_header = 'djangoblog administration' # 浏览器标签和页面标题
+ site_title = 'djangoblog site admin' # 页面中显示的主标题
def __init__(self, name='admin'):
- super().__init__(name)
+ """
+ 初始化自定义管理站点。
+
+ 参数:
+ name (str): 站点名称,默认为 'admin'。
+ 用于 URL 命名空间等场景。
+ """
+ super().__init__(name) # 调用父类初始化方法
def has_permission(self, request):
+ """
+ 重写权限检查方法,控制谁可以访问此管理后台。
+
+ 参数:
+ request (HttpRequest): 当前请求对象。
+
+ 返回值:
+ bool: 仅当用户是超级用户(is_superuser)时返回 True,否则返回 False。
+ 这比 is_staff 更严格,确保只有最高权限用户可访问。
+ """
return request.user.is_superuser
+ # 被注释的 get_urls 方法示例:
+ # 可用于向管理后台添加自定义视图(如刷新缓存等运维功能)
+ # 示例中原本注册了一个 /admin/refresh/ 路径,指向 refresh_memcache 视图
+ # 通过 self.admin_view 包装,确保只有通过认证的管理员才能访问
+ #
# def get_urls(self):
# urls = super().get_urls()
# from django.urls import path
@@ -38,27 +74,46 @@ class DjangoBlogAdminSite(AdminSite):
# return urls + my_urls
+# 实例化自定义的管理站点
admin_site = DjangoBlogAdminSite(name='admin')
-
-admin_site.register(Article, ArticlelAdmin)
-admin_site.register(Category, CategoryAdmin)
-admin_site.register(Tag, TagAdmin)
-admin_site.register(Links, LinksAdmin)
-admin_site.register(SideBar, SideBarAdmin)
-admin_site.register(BlogSettings, BlogSettingsAdmin)
-
-admin_site.register(commands, CommandsAdmin)
-admin_site.register(EmailSendLog, EmailSendLogAdmin)
-
-admin_site.register(BlogUser, BlogUserAdmin)
-
-admin_site.register(Comment, CommentAdmin)
-
-admin_site.register(OAuthUser, OAuthUserAdmin)
-admin_site.register(OAuthConfig, OAuthConfigAdmin)
-
-admin_site.register(OwnTrackLog, OwnTrackLogsAdmin)
-
+"""
+创建一个 DjangoBlogAdminSite 的实例,命名为 'admin'。
+这个实例将作为整个项目的管理后台入口,替代 Django 默认的 admin.site。
+所有后续的 register 调用都使用这个自定义站点。
+"""
+
+# 注册博客功能相关模型到自定义管理后台
+admin_site.register(Article, ArticlelAdmin) # 文章模型 + 自定义管理配置
+admin_site.register(Category, CategoryAdmin) # 分类模型 + 自定义管理配置
+admin_site.register(Tag, TagAdmin) # 标签模型 + 自定义管理配置
+admin_site.register(Links, LinksAdmin) # 友情链接模型 + 自定义管理配置
+admin_site.register(SideBar, SideBarAdmin) # 侧边栏模型 + 自定义管理配置
+admin_site.register(BlogSettings, BlogSettingsAdmin) # 博客设置模型 + 自定义管理配置
+
+# 注册服务器管理相关模型
+admin_site.register(commands, CommandsAdmin) # 服务器命令模型 + 自定义管理配置
+admin_site.register(EmailSendLog, EmailSendLogAdmin) # 邮件发送日志模型 + 自定义管理配置
+
+# 注册用户相关模型
+admin_site.register(BlogUser, BlogUserAdmin) # 博客用户模型 + 自定义管理配置
+
+# 注册评论相关模型
+admin_site.register(Comment, CommentAdmin) # 评论模型 + 自定义管理配置
+
+# 注册第三方登录相关模型
+admin_site.register(OAuthUser, OAuthUserAdmin) # OAuth 用户模型 + 自定义管理配置
+admin_site.register(OAuthConfig, OAuthConfigAdmin) # OAuth 配置模型 + 自定义管理配置
+
+# 注册位置追踪相关模型
+admin_site.register(OwnTrackLog, OwnTrackLogsAdmin) # OwnTrack 日志模型 + 自定义管理配置
+
+# 注册 Django 内置的 Site 模型(使用默认 SiteAdmin)
admin_site.register(Site, SiteAdmin)
+# 注册 Django 内置的管理操作日志 LogEntry 模型(使用自定义的 LogEntryAdmin)
admin_site.register(LogEntry, LogEntryAdmin)
+"""
+通过以上 register 调用,所有注册的模型都将在自定义的管理后台界面中显示,
+并使用对应的 Admin 类进行展示和操作控制。
+最终,这个 admin_site 实例会在 urls.py 中作为管理后台的视图入口使用。
+"""
\ No newline at end of file
diff --git a/src/djangoblog/apps.py b/src/djangoblog/apps.py
index d29e318..58f329f 100644
--- a/src/djangoblog/apps.py
+++ b/src/djangoblog/apps.py
@@ -1,11 +1,41 @@
from django.apps import AppConfig
+
class DjangoblogAppConfig(AppConfig):
+ """
+ Django 应用配置类,用于定义 'djangoblog' 应用的配置信息。
+
+ 这个类继承自 Django 的 AppConfig,是应用级别的配置中心,
+ 在 Django 启动时被加载,用于指定应用的行为和初始化逻辑。
+ """
+
+ # 设置此应用中所有模型默认使用的主键字段类型。
+ # 使用 BigAutoField 表示默认主键为 64 位整数(支持更大范围的 ID),
+ # 可避免在数据量大时出现整数溢出问题。
default_auto_field = 'django.db.models.BigAutoField'
+
+ # 指定该应用的完整 Python 导入路径。
+ # 必须与应用在项目中的实际路径一致,Django 通过此属性识别应用。
name = 'djangoblog'
def ready(self):
+ """
+ 应用准备就绪时调用的方法。
+
+ Django 在应用注册系统完全加载后会自动调用此方法,是执行应用级初始化代码的推荐入口点。
+ 常用于:
+ - 启动后台任务(如 Celery)
+ - 注册信号处理器(signal handlers)
+ - 动态加载插件或模块
+ - 初始化缓存等资源
+
+ 注意:此方法在整个 Django 启动过程中只会被调用一次。
+ """
+ # 调用父类的 ready() 方法,确保父类的初始化逻辑正常执行
super().ready()
- # Import and load plugins here
+
+ # 导入插件加载函数并执行插件加载
+ # 通过在 ready() 中调用 load_plugins(),确保插件系统在 Django 完全启动后才被激活
+ # 这样可以安全地访问 Django 的 ORM 和其他已注册的组件
from .plugin_manage.loader import load_plugins
- load_plugins()
\ No newline at end of file
+ load_plugins()
\ No newline at end of file
diff --git a/src/djangoblog/blog_signals.py b/src/djangoblog/blog_signals.py
index 393f441..27a4a68 100644
--- a/src/djangoblog/blog_signals.py
+++ b/src/djangoblog/blog_signals.py
@@ -16,51 +16,98 @@ from djangoblog.utils import cache, expire_view_cache, delete_sidebar_cache, del
from djangoblog.utils import get_current_site
from oauth.models import OAuthUser
+# 获取当前模块的日志记录器,用于记录信号处理过程中的运行信息和错误
logger = logging.getLogger(__name__)
+# 自定义信号:OAuth 用户登录信号
+# 用于在 OAuth 用户登录成功后触发特定逻辑(如头像处理、缓存清理)
+# 参数:id - OAuthUser 对象的数据库 ID
oauth_user_login_signal = django.dispatch.Signal(['id'])
-send_email_signal = django.dispatch.Signal(
- ['emailto', 'title', 'content'])
+
+# 自定义信号:发送邮件信号
+# 用于解耦邮件发送逻辑,其他模块可通过触发此信号来请求发送邮件
+# 参数:
+# - emailto: 收件人邮箱列表
+# - title: 邮件标题
+# - content: 邮件正文(HTML)
+send_email_signal = django.dispatch.Signal(['emailto', 'title', 'content'])
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
+ """
+ 处理「发送邮件」信号的回调函数。
+
+ 当系统中触发 send_email_signal 时,Django 会调用此函数发送实际邮件,
+ 并将发送结果记录到数据库日志中。
+
+ 参数:
+ sender: 发送信号的对象(通常不使用)
+ **kwargs: 信号传递的参数,包含 emailto, title, content
+
+ 流程:
+ 1. 从 kwargs 中提取邮件信息
+ 2. 创建 EmailMultiAlternatives 对象(支持 HTML 邮件)
+ 3. 记录邮件发送日志(EmailSendLog)
+ 4. 尝试发送邮件,捕获异常并记录结果
+ 5. 保存日志到数据库
+ """
emailto = kwargs['emailto']
title = kwargs['title']
content = kwargs['content']
+ # 创建支持 HTML 内容的邮件对象
msg = EmailMultiAlternatives(
title,
content,
from_email=settings.DEFAULT_FROM_EMAIL,
to=emailto)
- msg.content_subtype = "html"
+ msg.content_subtype = "html" # 设置内容类型为 HTML
+ # 创建邮件发送日志对象
from servermanager.models import EmailSendLog
log = EmailSendLog()
log.title = title
log.content = content
- log.emailto = ','.join(emailto)
+ log.emailto = ','.join(emailto) # 将邮箱列表转为逗号分隔字符串
try:
+ # 发送邮件,返回值为成功发送的邮件数量
result = msg.send()
- log.send_result = result > 0
+ log.send_result = result > 0 # 成功发送至少一封即为成功
except Exception as e:
+ # 发送失败时记录错误日志,并标记发送结果为失败
logger.error(f"失败邮箱号: {emailto}, {e}")
log.send_result = False
+ # 无论成功或失败,都保存日志记录
log.save()
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
+ """
+ 处理「OAuth 用户登录」信号的回调函数。
+
+ 当 OAuth 用户登录成功后触发,主要执行:
+ 1. 检查并处理用户头像 URL(确保为本站域名)
+ 2. 清理侧边栏缓存(确保显示最新信息)
+
+ 参数:
+ sender: 发送信号的对象
+ **kwargs: 包含 id(OAuthUser 的主键)
+ """
id = kwargs['id']
oauthuser = OAuthUser.objects.get(id=id)
site = get_current_site().domain
+
+ # 如果用户头像 URL 不包含当前站点域名,则重新保存头像
+ # 防止外链失效,提升稳定性
if oauthuser.picture and not oauthuser.picture.find(site) >= 0:
from djangoblog.utils import save_user_avatar
oauthuser.picture = save_user_avatar(oauthuser.picture)
oauthuser.save()
+ # 登录后清理侧边栏缓存,确保用户信息等显示最新状态
delete_sidebar_cache()
@@ -73,42 +120,80 @@ def model_post_save_callback(
using,
update_fields,
**kwargs):
- clearcache = False
+ """
+ 全局模型保存后信号处理器。
+ 监听所有模型的 post_save 信号,根据保存的模型类型执行相应逻辑。
+
+ 主要功能:
+ 1. 文章/页面更新时通知搜索引擎(百度主动推送)
+ 2. 评论发布时清理相关缓存并发送邮件通知
+ 3. 特定模型更新后清理缓存
+
+ 参数:
+ sender: 保存的模型类
+ instance: 保存的模型实例
+ created: 是否为新建对象(True=新增,False=更新)
+ raw: 是否为原始数据导入(如 loaddata)
+ using: 使用的数据库别名
+ update_fields: 本次保存更新的字段集合(可选)
+ **kwargs: 其他参数
+
+ 注意:LogEntry 的保存不触发此逻辑,避免无限循环。
+ """
+ clearcache = False # 标记是否需要清理缓存
+
+ # 如果保存的是管理日志(LogEntry),直接返回,避免性能问题或循环触发
if isinstance(instance, LogEntry):
return
+
+ # 如果该模型实现了 get_full_url 方法(如文章、页面等)
if 'get_full_url' in dir(instance):
+ # 检查是否仅更新了 'views' 字段(阅读量)
is_update_views = update_fields == {'views'}
+ # 非测试环境且非阅读量更新时,向百度推送 URL
if not settings.TESTING and not is_update_views:
try:
notify_url = instance.get_full_url()
SpiderNotify.baidu_notify([notify_url])
except Exception as ex:
- logger.error("notify sipder", ex)
+ logger.error("notify spider", ex)
+ # 除非是更新阅读量,否则标记需要清理缓存
if not is_update_views:
clearcache = True
+ # 如果保存的是评论模型,且评论已启用
if isinstance(instance, Comment):
if instance.is_enable:
path = instance.article.get_absolute_url()
site = get_current_site().domain
if site.find(':') > 0:
- site = site[0:site.find(':')]
+ site = site[0:site.find(':')] # 去除端口号
+ # 清理文章详情页缓存
expire_view_cache(
path,
servername=site,
serverport=80,
key_prefix='blogdetail')
+
+ # 清理 SEO 缓存
if cache.get('seo_processor'):
cache.delete('seo_processor')
- comment_cache_key = 'article_comments_{id}'.format(
- id=instance.article.id)
+
+ # 清理文章评论缓存
+ comment_cache_key = 'article_comments_{id}'.format(id=instance.article.id)
cache.delete(comment_cache_key)
+
+ # 清理侧边栏缓存
delete_sidebar_cache()
+
+ # 清理评论列表视图缓存
delete_view_cache('article_comments', [str(instance.article.pk)])
+ # 在新线程中发送评论通知邮件,避免阻塞主线程
_thread.start_new_thread(send_comment_email, (instance,))
+ # 如果标记需要清理缓存,则执行全站缓存清理
if clearcache:
cache.clear()
@@ -116,7 +201,20 @@ def model_post_save_callback(
@receiver(user_logged_in)
@receiver(user_logged_out)
def user_auth_callback(sender, request, user, **kwargs):
+ """
+ 用户登录/登出信号处理器。
+ 当用户登录或登出时触发,用于清理与用户状态相关的缓存。
+
+ 目前主要功能:
+ - 清理侧边栏缓存(如登录状态、用户信息等)
+
+ 参数:
+ sender: User 模型
+ request: 当前请求对象
+ user: 用户对象
+ **kwargs: 其他参数
+ """
if user and user.username:
- logger.info(user)
- delete_sidebar_cache()
- # cache.clear()
+ logger.info(user) # 记录登录/登出的用户名
+ delete_sidebar_cache() # 清理侧边栏缓存,确保显示正确的登录状态
+ # cache.clear() # 当前注释,避免过度清理
\ No newline at end of file
diff --git a/src/djangoblog/elasticsearch_backend.py b/src/djangoblog/elasticsearch_backend.py
index 4afe498..2aabfae 100644
--- a/src/djangoblog/elasticsearch_backend.py
+++ b/src/djangoblog/elasticsearch_backend.py
@@ -8,55 +8,124 @@ from haystack.utils import log as logging
from blog.documents import ArticleDocument, ArticleDocumentManager
from blog.models import Article
+# 获取当前模块的日志记录器,用于记录搜索相关的日志信息
logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
+ """
+ 自定义 Elasticsearch 搜索后端,继承自 Haystack 的 BaseSearchBackend。
+ 负责与 Elasticsearch 交互,实现索引的创建、更新、删除、查询等操作。
+ 使用自定义的 ArticleDocumentManager 来管理文档索引。
+ """
+
def __init__(self, connection_alias, **connection_options):
- super(
- ElasticSearchBackend,
- self).__init__(
- connection_alias,
- **connection_options)
- self.manager = ArticleDocumentManager()
- self.include_spelling = True
+ """
+ 初始化搜索后端。
+
+ 参数:
+ connection_alias: 连接别名(Haystack 配置中的别名)
+ **connection_options: 连接选项(如主机、端口等)
+ """
+ super(ElasticSearchBackend, self).__init__(connection_alias, **connection_options)
+ self.manager = ArticleDocumentManager() # 实例化文档管理器
+ self.include_spelling = True # 启用拼写建议功能
def _get_models(self, iterable):
+ """
+ 将模型实例或查询集转换为可索引的文档对象列表。
+
+ 参数:
+ iterable: 模型实例列表或查询集,若为空则默认使用所有 Article
+
+ 返回:
+ 文档对象列表(用于索引)
+ """
models = iterable if iterable and iterable[0] else Article.objects.all()
docs = self.manager.convert_to_doc(models)
return docs
def _create(self, models):
- self.manager.create_index()
- docs = self._get_models(models)
- self.manager.rebuild(docs)
+ """
+ 创建新的索引并填充数据。
+
+ 参数:
+ models: 要索引的模型数据(可选)
+ """
+ self.manager.create_index() # 创建索引结构
+ docs = self._get_models(models) # 转换为文档
+ self.manager.rebuild(docs) # 重建索引数据
def _delete(self, models):
+ """
+ 从索引中删除指定的文档。
+
+ 参数:
+ models: 要删除的文档对象列表
+
+ 返回:
+ bool: 删除是否成功
+ """
for m in models:
- m.delete()
+ m.delete() # 调用文档对象的 delete 方法
return True
def _rebuild(self, models):
+ """
+ 重建索引。如果 models 为空,则重建所有文章索引。
+
+ 参数:
+ models: 要重建索引的数据(可选)
+ """
models = models if models else Article.objects.all()
docs = self.manager.convert_to_doc(models)
- self.manager.update_docs(docs)
+ self.manager.update_docs(docs) # 更新文档到索引
def update(self, index, iterable, commit=True):
+ """
+ 更新索引中的文档。
+ 参数:
+ index: 索引对象(未使用)
+ iterable: 要更新的模型实例或查询集
+ commit: 是否立即提交(未使用)
+ """
models = self._get_models(iterable)
- self.manager.update_docs(models)
+ self.manager.update_docs(models) # 将转换后的文档更新到索引
def remove(self, obj_or_string):
+ """
+ 从索引中移除单个对象或字符串。
+
+ 参数:
+ obj_or_string: 要移除的对象或标识符
+ """
models = self._get_models([obj_or_string])
self._delete(models)
def clear(self, models=None, commit=True):
- self.remove(None)
+ """
+ 清空整个索引(删除所有文档)。
+
+ 参数:
+ models: 模型列表(未使用)
+ commit: 是否立即提交(未使用)
+ """
+ self.remove(None) # 调用 remove 方法清空
@staticmethod
def get_suggestion(query: str) -> str:
- """获取推荐词, 如果没有找到添加原搜索词"""
+ """
+ 根据用户输入的查询词,获取拼写建议(搜索推荐)。
+ 使用 Elasticsearch 的 term suggester 功能。
+
+ 参数:
+ query (str): 用户输入的原始搜索词
+ 返回:
+ str: 推荐的搜索词(多个词用空格连接)
+ """
+ # 执行搜索并获取建议
search = ArticleDocument.search() \
.query("match", body=query) \
.suggest('suggest_search', query, term={'field': 'body'}) \
@@ -65,29 +134,44 @@ class ElasticSearchBackend(BaseSearchBackend):
keywords = []
for suggest in search.suggest.suggest_search:
if suggest["options"]:
- keywords.append(suggest["options"][0]["text"])
+ keywords.append(suggest["options"][0]["text"]) # 使用第一个建议
else:
- keywords.append(suggest["text"])
-
+ keywords.append(suggest["text"]) # 无建议则使用原词
return ' '.join(keywords)
@log_query
def search(self, query_string, **kwargs):
+ """
+ 执行搜索查询。
+
+ 参数:
+ query_string: 用户输入的搜索关键词
+ **kwargs: 其他搜索参数(如分页偏移量)
+
+ 返回:
+ dict: 包含搜索结果、命中数、拼写建议等信息的字典
+ """
logger.info('search query_string:' + query_string)
start_offset = kwargs.get('start_offset')
end_offset = kwargs.get('end_offset')
- # 推荐词搜索
+ # 判断是否启用拼写建议
if getattr(self, "is_suggest", None):
suggestion = self.get_suggestion(query_string)
else:
suggestion = query_string
+ # 构建布尔查询:在 body 和 title 字段中匹配,至少 70% 的 should 条件匹配
q = Q('bool',
should=[Q('match', body=suggestion), Q('match', title=suggestion)],
minimum_should_match="70%")
+ # 构建搜索请求:
+ # - 查询条件:q
+ # - 过滤:status='p'(已发布),type='a'(文章)
+ # - 不返回源数据(source=False)
+ # - 分页:[start_offset: end_offset]
search = ArticleDocument.search() \
.query('bool', filter=[q]) \
.filter('term', status='p') \
@@ -95,13 +179,14 @@ class ElasticSearchBackend(BaseSearchBackend):
.source(False)[start_offset: end_offset]
results = search.execute()
- hits = results['hits'].total
+ hits = results['hits'].total # 总命中数
raw_results = []
+
+ # 将 Elasticsearch 返回的结果转换为 Haystack 的 SearchResult 对象
for raw_result in results['hits']['hits']:
app_label = 'blog'
model_name = 'Article'
additional_fields = {}
-
result_class = SearchResult
result = result_class(
@@ -111,19 +196,35 @@ class ElasticSearchBackend(BaseSearchBackend):
raw_result['_score'],
**additional_fields)
raw_results.append(result)
- facets = {}
+
+ facets = {} # 聚合结果(当前未使用)
+ # 如果建议词与原词不同,则提供拼写建议
spelling_suggestion = None if query_string == suggestion else suggestion
return {
- 'results': raw_results,
- 'hits': hits,
- 'facets': facets,
- 'spelling_suggestion': spelling_suggestion,
+ 'results': raw_results, # 搜索结果列表
+ 'hits': hits, # 总命中数
+ 'facets': facets, # 聚合信息
+ 'spelling_suggestion': spelling_suggestion, # 拼写建议
}
class ElasticSearchQuery(BaseSearchQuery):
+ """
+ 自定义搜索查询类,继承自 Haystack 的 BaseSearchQuery。
+ 负责构建和处理搜索查询语句。
+ """
+
def _convert_datetime(self, date):
+ """
+ 将日期时间对象转换为字符串格式,用于索引查询。
+
+ 参数:
+ date: 日期或 datetime 对象
+
+ 返回:
+ str: 格式化后的字符串(YYYYMMDDHHMMSS 或 YYYYMMDD000000)
+ """
if hasattr(date, 'hour'):
return force_str(date.strftime('%Y%m%d%H%M%S'))
else:
@@ -131,12 +232,14 @@ class ElasticSearchQuery(BaseSearchQuery):
def clean(self, query_fragment):
"""
- Provides a mechanism for sanitizing user input before presenting the
- value to the backend.
+ 清理用户输入的查询片段,防止特殊字符引发语法错误。
+ 将包含保留字符的词用引号包围。
+
+ 参数:
+ query_fragment: 查询片段字符串
- Whoosh 1.X differs here in that you can no longer use a backslash
- to escape reserved characters. Instead, the whole word should be
- quoted.
+ 返回:
+ str: 清理后的查询字符串
"""
words = query_fragment.split()
cleaned_words = []
@@ -155,29 +258,79 @@ class ElasticSearchQuery(BaseSearchQuery):
return ' '.join(cleaned_words)
def build_query_fragment(self, field, filter_type, value):
+ """
+ 构建查询片段。此处直接返回 value.query_string。
+
+ 参数:
+ field: 字段名(未使用)
+ filter_type: 过滤类型(未使用)
+ value: 查询值对象,包含 query_string 属性
+
+ 返回:
+ str: 查询字符串
+ """
return value.query_string
def get_count(self):
+ """
+ 获取搜索结果总数。
+
+ 返回:
+ int: 结果数量
+ """
results = self.get_results()
return len(results) if results else 0
def get_spelling_suggestion(self, preferred_query=None):
+ """
+ 获取拼写建议。
+
+ 参数:
+ preferred_query: 优先使用的查询词(未使用)
+
+ 返回:
+ str: 拼写建议
+ """
return self._spelling_suggestion
def build_params(self, spelling_query=None):
+ """
+ 构建传递给后端的参数字典。
+
+ 参数:
+ spelling_query: 拼写建议查询词
+
+ 返回:
+ dict: 参数字典
+ """
kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query)
return kwargs
class ElasticSearchModelSearchForm(ModelSearchForm):
+ """
+ 自定义搜索表单,继承自 Haystack 的 ModelSearchForm。
+ 用于处理用户提交的搜索请求,支持启用/禁用拼写建议。
+ """
def search(self):
- # 是否建议搜索
+ """
+ 重写 search 方法,在搜索前设置是否启用拼写建议。
+
+ 返回:
+ SearchQuerySet: 搜索结果集
+ """
+ # 根据表单数据决定是否启用建议搜索
self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no"
sqs = super().search()
return sqs
class ElasticSearchEngine(BaseEngine):
- backend = ElasticSearchBackend
- query = ElasticSearchQuery
+ """
+ 自定义搜索引擎,集成 Backend、Query 和 Form。
+ 在 Haystack 配置中引用此类来启用自定义搜索功能。
+ """
+ backend = ElasticSearchBackend # 使用自定义后端
+ query = ElasticSearchQuery # 使用自定义查询类
+ # form = ElasticSearchModelSearchForm # 原代码未启用,但设计意图是使用自定义表单
\ No newline at end of file
diff --git a/src/djangoblog/feeds.py b/src/djangoblog/feeds.py
index 8c4e851..05b4c0d 100644
--- a/src/djangoblog/feeds.py
+++ b/src/djangoblog/feeds.py
@@ -8,33 +8,126 @@ from djangoblog.utils import CommonMarkdown
class DjangoBlogFeed(Feed):
+ """
+ RSS 订阅源生成器,继承自 Django 的 Feed 类。
+ 用于为网站提供文章的 RSS 订阅功能,允许用户通过 RSS 阅读器获取最新文章。
+ """
+
+ # 指定生成的 RSS 版本格式为 RSS 2.0
feed_type = Rss201rev2Feed
+ # 订阅源的描述信息
description = '大巧无工,重剑无锋.'
+
+ # 订阅源的标题
title = "且听风吟 大巧无工,重剑无锋. "
+
+ # 订阅源的相对 URL 路径
link = "/feed/"
def author_name(self):
+ """
+ 获取订阅源作者的名称。
+
+ 从用户模型中获取第一个用户(通常为站长)的昵称作为作者名。
+
+ 返回:
+ str: 作者昵称(nickname)
+ """
return get_user_model().objects.first().nickname
def author_link(self):
+ """
+ 获取订阅源作者的个人主页链接。
+
+ 返回第一个用户的绝对 URL(通常是作者页面或主页)。
+
+ 返回:
+ str: 作者主页的 URL
+ """
return get_user_model().objects.first().get_absolute_url()
def items(self):
+ """
+ 获取订阅源包含的文章列表。
+
+ 查询已发布(status='p')且类型为文章(type='a')的文章,
+ 按发布时间倒序排列,仅返回最新的 5 篇。
+
+ 返回:
+ QuerySet: 包含最多 5 篇最新文章的查询集
+ """
return Article.objects.filter(type='a', status='p').order_by('-pub_time')[:5]
def item_title(self, item):
+ """
+ 获取每篇文章在订阅源中的标题。
+
+ 参数:
+ item (Article): 文章模型实例
+
+ 返回:
+ str: 文章标题
+ """
return item.title
def item_description(self, item):
+ """
+ 获取每篇文章在订阅源中的描述内容。
+
+ 将文章正文(body)使用 Markdown 渲染为 HTML 格式后返回,
+ 以便在 RSS 阅读器中正确显示格式。
+
+ 参数:
+ item (Article): 文章模型实例
+
+ 返回:
+ str: 渲染后的 HTML 内容
+ """
return CommonMarkdown.get_markdown(item.body)
def feed_copyright(self):
+ """
+ 获取订阅源的版权信息。
+
+ 动态生成包含当前年份的版权字符串。
+
+ 返回:
+ str: 版权声明,如 "Copyright© 2025 且听风吟"
+ """
now = timezone.now()
return "Copyright© {year} 且听风吟".format(year=now.year)
def item_link(self, item):
+ """
+ 获取每篇文章在订阅源中的链接。
+
+ 使用文章自身的 get_absolute_url 方法生成绝对路径 URL。
+
+ 参数:
+ item (Article): 文章模型实例
+
+ 返回:
+ str: 文章的绝对 URL
+ """
return item.get_absolute_url()
def item_guid(self, item):
- return
+ """
+ 获取每篇文章的全局唯一标识符(GUID)。
+
+ GUID 用于 RSS 阅读器识别文章是否为新内容。通常应返回文章的唯一标识,
+ 例如 ID 或 URL。但此方法当前未完成(缺少返回值),可能导致订阅异常。
+
+ 参数:
+ item (Article): 文章模型实例
+
+ 返回:
+ str: 文章的唯一标识(当前未实现,需补充如 return item.id 或 return item.get_absolute_url())
+ """
+ # 注意:此方法当前没有 return 语句,是一个不完整的实现!
+ # 正确做法应类似:
+ # return str(item.id)
+ # 或
+ # return item.get_absolute_url()
+ pass # 占位符,实际运行会返回 None
diff --git a/src/djangoblog/logentryadmin.py b/src/djangoblog/logentryadmin.py
index 2f6a535..ada37ef 100644
--- a/src/djangoblog/logentryadmin.py
+++ b/src/djangoblog/logentryadmin.py
@@ -8,46 +8,115 @@ from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
+# 导入必要的模块:
+# - admin: Django 管理后台核心模块
+# - DELETION: 表示删除操作的常量
+# - ContentType: 用于处理模型的通用外键关系
+# - reverse, NoReverseMatch: URL 反向解析相关工具
+# - escape: HTML 转义函数,防止 XSS
+# - mark_safe: 标记字符串为安全 HTML(不转义)
+# - gettext_lazy: 国际化文本标记
+
+
class LogEntryAdmin(admin.ModelAdmin):
+ """
+ 自定义 Django 管理后台中日志条目(LogEntry)的管理界面。
+ 用于展示用户在后台执行的操作记录(如添加、修改、删除)。
+ 此类增强了默认的 LogEntry 显示功能,支持链接跳转、权限控制等。
+ """
+
+ # 在列表页右侧显示按内容类型(即模型)过滤的侧边栏
list_filter = [
'content_type'
]
+ # 支持通过对象表示名(object_repr)和变更消息(change_message)进行搜索
search_fields = [
'object_repr',
'change_message'
]
+ # 设置列表页中可点击进入编辑的字段链接
list_display_links = [
- 'action_time',
- 'get_change_message',
+ 'action_time', # 操作时间
+ 'get_change_message', # 变更消息
]
+
+ # 定义列表页显示的字段列
list_display = [
- 'action_time',
- 'user_link',
- 'content_type',
- 'object_link',
- 'get_change_message',
+ 'action_time', # 操作发生的时间
+ 'user_link', # 执行操作的用户(带链接)
+ 'content_type', # 操作的对象类型(如 blog.Article)
+ 'object_link', # 被操作的对象实例(带链接)
+ 'get_change_message', # 操作详情消息
]
def has_add_permission(self, request):
+ """
+ 控制是否允许添加新的日志条目。
+
+ 日志由系统自动生成,不允许手动添加。
+
+ 参数:
+ request: 当前 HTTP 请求对象
+
+ 返回:
+ bool: 始终返回 False,禁止添加权限
+ """
return False
def has_change_permission(self, request, obj=None):
+ """
+ 控制是否允许修改日志条目。
+
+ 仅超级用户或拥有 'admin.change_logentry' 权限的用户可以查看(GET 请求),
+ 但不允许通过 POST 请求修改(即不可编辑保存)。
+
+ 参数:
+ request: 当前 HTTP 请求对象
+ obj: 当前操作的日志对象(可选)
+
+ 返回:
+ bool: 是否具有更改权限
+ """
return (
request.user.is_superuser or
request.user.has_perm('admin.change_logentry')
) and request.method != 'POST'
def has_delete_permission(self, request, obj=None):
+ """
+ 控制是否允许删除日志条目。
+
+ 出于审计和安全考虑,禁止删除任何日志记录。
+
+ 参数:
+ request: 当前 HTTP 请求对象
+ obj: 当前操作的日志对象(可选)
+
+ 返回:
+ bool: 始终返回 False,禁止删除权限
+ """
return False
def object_link(self, obj):
- object_link = escape(obj.object_repr)
+ """
+ 将被操作对象的名称转换为可点击的超链接(如果可能)。
+
+ 如果操作不是删除(DELETION),且 content_type 存在,则尝试生成指向该对象
+ 在管理后台编辑页面的链接。否则仅显示原始文本。
+
+ 参数:
+ obj (LogEntry): 日志条目实例
+
+ 返回:
+ str: HTML 安全的链接或纯文本
+ """
+ object_link = escape(obj.object_repr) # 转义原始对象表示,防止 XSS
content_type = obj.content_type
if obj.action_flag != DELETION and content_type is not None:
- # try returning an actual link instead of object repr string
+ # 尝试生成链接
try:
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
@@ -56,17 +125,29 @@ class LogEntryAdmin(admin.ModelAdmin):
)
object_link = '{}'.format(url, object_link)
except NoReverseMatch:
+ # 如果 URL 反向解析失败(如模型未注册到 admin),则使用原始文本
pass
- return mark_safe(object_link)
+ return mark_safe(object_link) # 标记为安全 HTML 输出
+ # 设置此字段在列表页可排序,排序依据为数据库字段 'object_repr'
object_link.admin_order_field = 'object_repr'
+ # 设置此字段在列表页显示的列标题(支持国际化)
object_link.short_description = _('object')
def user_link(self, obj):
- content_type = ContentType.objects.get_for_model(type(obj.user))
- user_link = escape(force_str(obj.user))
+ """
+ 将操作用户的名称转换为可点击的超链接,指向该用户在管理后台的编辑页面。
+
+ 参数:
+ obj (LogEntry): 日志条目实例
+
+ 返回:
+ str: HTML 安全的用户链接或纯文本
+ """
+ content_type = ContentType.objects.get_for_model(type(obj.user)) # 获取用户模型的 ContentType
+ user_link = escape(force_str(obj.user)) # 转义用户名
try:
- # try returning an actual link instead of object repr string
+ # 生成用户管理页面的 URL
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
@@ -74,18 +155,45 @@ class LogEntryAdmin(admin.ModelAdmin):
)
user_link = '{}'.format(url, user_link)
except NoReverseMatch:
+ # 如果无法生成链接(如 auth.User 未正确注册),则使用原始文本
pass
- return mark_safe(user_link)
+ return mark_safe(user_link) # 标记为安全 HTML 输出
+ # 设置此字段在列表页可排序,排序依据为数据库字段 'user'
user_link.admin_order_field = 'user'
+ # 设置此字段在列表页显示的列标题(支持国际化)
user_link.short_description = _('user')
def get_queryset(self, request):
+ """
+ 自定义查询集,优化数据库查询性能。
+
+ 使用 prefetch_related 提前加载 content_type 外键关联数据,
+ 避免在列表渲染时产生 N+1 查询问题。
+
+ 参数:
+ request: 当前 HTTP 请求对象
+
+ 返回:
+ QuerySet: 优化后的 LogEntry 查询集
+ """
queryset = super(LogEntryAdmin, self).get_queryset(request)
return queryset.prefetch_related('content_type')
def get_actions(self, request):
+ """
+ 自定义可用的操作列表(如批量删除)。
+
+ 移除默认的 "删除选中项" 操作,因为已通过 has_delete_permission 禁止删除,
+ 此处再次确保该操作不会出现在操作下拉菜单中。
+
+ 参数:
+ request: 当前 HTTP 请求对象
+
+ 返回:
+ dict: 过滤后的可用操作字典
+ """
actions = super(LogEntryAdmin, self).get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
- return actions
+ return actions
\ No newline at end of file
diff --git a/src/djangoblog/plugin_manage/hook_constants.py b/src/djangoblog/plugin_manage/hook_constants.py
index 6685b7c..1016b7f 100644
--- a/src/djangoblog/plugin_manage/hook_constants.py
+++ b/src/djangoblog/plugin_manage/hook_constants.py
@@ -1,7 +1,21 @@
+# 定义一个常量,表示“加载文章详情”的动作类型或事件标识。
+# 通常用于状态管理、事件分发或日志记录中,标识当前操作是获取或展示某篇文章的详细内容。
ARTICLE_DETAIL_LOAD = 'article_detail_load'
+
+# 定义一个常量,表示“创建文章”的动作类型或事件标识。
+# 用于标识用户或系统正在执行新增一篇文章的操作。
ARTICLE_CREATE = 'article_create'
+
+# 定义一个常量,表示“更新文章”的动作类型或事件标识。
+# 用于标识对已存在文章进行修改或保存更新内容的操作。
ARTICLE_UPDATE = 'article_update'
-ARTICLE_DELETE = 'article_delete'
-ARTICLE_CONTENT_HOOK_NAME = "the_content"
+# 定义一个常量,表示“删除文章”的动作类型或事件标识。
+# 用于标识将某篇文章从系统中移除的操作。
+ARTICLE_DELETE = 'article_delete'
+# 定义一个钩子(Hook)的名称,用于在内容输出时触发某些处理逻辑。
+# 在 WordPress 等系统中,"the_content" 是一个典型的过滤器钩子,
+# 表示在显示文章正文前可以对内容进行修改或增强(如添加版权信息、格式化等)。
+# 此常量可用于挂载内容处理函数,实现插件或模块化内容扩展。
+ARTICLE_CONTENT_HOOK_NAME = "the_content"
\ No newline at end of file
diff --git a/src/djangoblog/plugin_manage/loader.py b/src/djangoblog/plugin_manage/loader.py
index 12e824b..b82c49e 100644
--- a/src/djangoblog/plugin_manage/loader.py
+++ b/src/djangoblog/plugin_manage/loader.py
@@ -2,18 +2,46 @@ import os
import logging
from django.conf import settings
+# 获取当前模块的日志记录器,用于输出插件加载过程中的日志信息(如成功或失败记录)
logger = logging.getLogger(__name__)
+
def load_plugins():
"""
- Dynamically loads and initializes plugins from the 'plugins' directory.
- This function is intended to be called when the Django app registry is ready.
+ 动态加载并初始化插件。
+
+ 功能说明:
+ 该函数用于在 Django 项目启动时,自动发现并加载配置在 `settings.ACTIVE_PLUGINS` 中的插件。
+ 它会检查每个插件是否存在于指定的插件目录中,并确认其包含入口文件 `plugin.py`。
+ 如果条件满足,则导入该插件模块,从而执行其内部的初始化代码(如注册信号、钩子、视图、中间件等)。
+
+ 调用时机:
+ 通常在 Django 的 `AppConfig.ready()` 方法中调用此函数,以确保 Django 的应用注册系统已完全就绪,
+ 避免因过早导入而导致的依赖问题或模型未加载异常。
+
+ 执行逻辑:
+ 1. 遍历 settings 中定义的活跃插件列表(ACTIVE_PLUGINS)。
+ 2. 对每个插件,构造其在文件系统中的路径。
+ 3. 判断该路径是否为一个目录,且包含名为 'plugin.py' 的文件(插件入口)。
+ 4. 若满足条件,则使用 __import__ 动态导入该模块。
+ 5. 成功导入后记录一条 info 级别的日志;若导入失败,则捕获 ImportError 异常并记录错误日志。
+
+ 注意事项:
+ - 此函数不返回任何值,也不对插件进行实例化,仅通过导入模块来触发其副作用(如注册行为)。
+ - 插件必须遵循目录结构规范:`plugins/<插件名>/plugin.py`。
+ - 错误日志中包含异常堆栈(exc_info=True),便于调试和排查插件加载失败原因。
"""
+ # 遍历 settings 中配置的已激活插件名称列表
for plugin_name in settings.ACTIVE_PLUGINS:
+ # 构建当前插件的完整文件路径
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
+ # 检查该插件路径是否为有效目录,且包含 plugin.py 入口文件
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, 'plugin.py')):
try:
+ # 动态导入插件模块,触发其初始化逻辑
__import__(f'plugins.{plugin_name}.plugin')
+ # 记录插件加载成功的日志
logger.info(f"Successfully loaded plugin: {plugin_name}")
except ImportError as e:
- logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)
\ No newline at end of file
+ # 记录插件加载失败的错误日志,并包含完整的异常堆栈信息
+ logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)
\ No newline at end of file
diff --git a/src/djangoblog/settings.py b/src/djangoblog/settings.py
index d076bb6..0586fc8 100644
--- a/src/djangoblog/settings.py
+++ b/src/djangoblog/settings.py
@@ -111,7 +111,7 @@ DATABASES = {
'ENGINE': 'django.db.backends.mysql',
'NAME': os.environ.get('DJANGO_MYSQL_DATABASE') or 'djangoblog',
'USER': os.environ.get('DJANGO_MYSQL_USER') or 'root',
- 'PASSWORD': os.environ.get('DJANGO_MYSQL_PASSWORD') or 'root',
+ 'PASSWORD': os.environ.get('DJANGO_MYSQL_PASSWORD') or 'wwc147',
'HOST': os.environ.get('DJANGO_MYSQL_HOST') or '127.0.0.1',
'PORT': int(
os.environ.get('DJANGO_MYSQL_PORT') or 3306),
diff --git a/src/djangoblog/sitemap.py b/src/djangoblog/sitemap.py
index 8b7d446..9faf208 100644
--- a/src/djangoblog/sitemap.py
+++ b/src/djangoblog/sitemap.py
@@ -5,55 +5,177 @@ from blog.models import Article, Category, Tag
class StaticViewSitemap(Sitemap):
+ """
+ 静态页面的站点地图(Sitemap)类。
+ 用于为网站的固定页面(如首页)生成 sitemap 条目。
+ """
+
+ # 默认优先级:0.5(表示普通重要性)
priority = 0.5
+ # 默认更新频率:每天
changefreq = 'daily'
def items(self):
+ """
+ 定义需要包含在 sitemap 中的静态视图名称列表。
+
+ 返回:
+ list: 包含 Django URL 命名的列表,此处仅包含首页 'blog:index'
+ """
return ['blog:index', ]
def location(self, item):
+ """
+ 根据 items() 返回的 URL 名称,生成对应的绝对路径。
+
+ 参数:
+ item (str): URL 命名,如 'blog:index'
+
+ 返回:
+ str: 反向解析后的 URL 路径(如 '/')
+ """
return reverse(item)
class ArticleSiteMap(Sitemap):
+ """
+ 文章(Article)模型的站点地图类。
+ 为所有已发布的文章生成 sitemap 条目。
+ """
+
+ # 更新频率:每月
changefreq = "monthly"
+ # 优先级:0.6(相对重要)
priority = "0.6"
def items(self):
+ """
+ 获取所有已发布(status='p')的文章对象。
+
+ 返回:
+ QuerySet: 所有已发布文章的查询集
+ """
return Article.objects.filter(status='p')
def lastmod(self, obj):
+ """
+ 获取每篇文章的最后修改时间,用于搜索引擎判断内容更新。
+
+ 参数:
+ obj (Article): 文章实例
+
+ 返回:
+ datetime: 文章的 last_modify_time 字段值
+ """
return obj.last_modify_time
class CategorySiteMap(Sitemap):
+ """
+ 分类(Category)模型的站点地图类。
+ 为所有文章分类生成 sitemap 条目。
+ """
+
+ # 更新频率:每周(注意:此处拼写应为 'weekly' 小写)
changefreq = "Weekly"
+ # 优先级:0.6
priority = "0.6"
def items(self):
+ """
+ 获取所有分类对象。
+
+ 返回:
+ QuerySet: 所有分类的查询集
+ """
return Category.objects.all()
def lastmod(self, obj):
+ """
+ 获取每个分类的最后修改时间。
+
+ 参数:
+ obj (Category): 分类实例
+
+ 返回:
+ datetime: 分类的 last_modify_time 字段值
+ """
return obj.last_modify_time
class TagSiteMap(Sitemap):
+ """
+ 标签(Tag)模型的站点地图类。
+ 为所有标签生成 sitemap 条目。
+ """
+
+ # 更新频率:每周
changefreq = "Weekly"
+ # 优先级:0.3(相对较低)
priority = "0.3"
def items(self):
+ """
+ 获取所有标签对象。
+
+ 返回:
+ QuerySet: 所有标签的查询集
+ """
return Tag.objects.all()
def lastmod(self, obj):
+ """
+ 获取每个标签的最后修改时间。
+
+ 参数:
+ obj (Tag): 标签实例
+
+ 返回:
+ datetime: 标签的 last_modify_time 字段值
+ """
return obj.last_modify_time
class UserSiteMap(Sitemap):
+ """
+ 用户(作者)的站点地图类。
+ 为所有发布过文章的作者生成 sitemap 条目。
+ """
+
+ # 更新频率:每周
changefreq = "Weekly"
+ # 优先级:0.3
priority = "0.3"
def items(self):
+ """
+ 获取所有发布过文章的作者(去重)。
+
+ 实现方式:
+ 1. 查询所有文章
+ 2. 提取每篇文章的作者(author 字段)
+ 3. 使用 set 去重
+ 4. 转换为列表返回
+
+ 注意:此方法效率较低,尤其在文章量大时,建议优化为:
+ return User.objects.filter(article__status='p').distinct()
+
+ 返回:
+ list: 去重后的用户(作者)对象列表
+ """
return list(set(map(lambda x: x.author, Article.objects.all())))
def lastmod(self, obj):
- return obj.date_joined
+ """
+ 获取每个用户的最后修改时间。
+
+ 此处使用的是用户的注册时间(date_joined),而非内容更新时间。
+ 对于作者而言,更合理的 lastmod 可能是其发布的最后一篇文章的时间。
+
+ 参数:
+ obj (User): 用户实例
+
+ 返回:
+ datetime: 用户的 date_joined 字段值(注册时间)
+ """
+ return obj.date_joined
\ No newline at end of file
diff --git a/src/djangoblog/spider_notify.py b/src/djangoblog/spider_notify.py
index 7b909e9..dfb8dbb 100644
--- a/src/djangoblog/spider_notify.py
+++ b/src/djangoblog/spider_notify.py
@@ -1,21 +1,58 @@
import logging
-
import requests
from django.conf import settings
+# 获取当前模块的日志记录器,用于输出日志信息
logger = logging.getLogger(__name__)
class SpiderNotify():
+ """
+ 蜘蛛推送通知工具类,用于向搜索引擎(如百度)主动推送网站新内容的URL,
+ 帮助搜索引擎更快发现和收录网页。
+ """
+
@staticmethod
def baidu_notify(urls):
+ """
+ 向百度搜索引擎推送一组URL链接(主动抓取通知)。
+
+ 使用百度提供的链接提交API,将新发布的页面URL推送给百度爬虫,
+ 以加快收录速度。
+
+ 参数:
+ urls (list 或 str): 要推送的URL列表,例如 ['https://example.com/article/1', 'https://example.com/article/2']
+ 注意:函数期望接收一个列表,但代码中直接使用了 '\n'.join,因此传入必须为可迭代的URL列表。
+
+ 实现逻辑:
+ 1. 将 URL 列表用换行符连接成字符串
+ 2. 向 settings.BAIDU_NOTIFY_URL 指定的百度推送接口发送 POST 请求
+ 3. 记录百度返回的响应结果到日志
+ 4. 捕获并记录任何异常
+
+ 注意:
+ - 需在 Django 的 settings 中配置 BAIDU_NOTIFY_URL(百度提供的推送接口地址)
+ - 百度推送接口要求每次最多提交2000条URL
+ """
try:
- data = '\n'.join(urls)
- result = requests.post(settings.BAIDU_NOTIFY_URL, data=data)
- logger.info(result.text)
+ data = '\n'.join(urls) # 将多个 URL 用换行符拼接,符合百度推送接口格式要求
+ result = requests.post(settings.BAIDU_NOTIFY_URL, data=data) # 发送 POST 请求到百度推送接口
+ logger.info(result.text) # 记录百度服务器返回的响应内容(如成功/失败信息)
except Exception as e:
- logger.error(e)
+ logger.error(e) # 记录请求过程中发生的任何异常(如网络错误、超时等)
@staticmethod
def notify(url):
- SpiderNotify.baidu_notify(url)
+ """
+ 通用通知接口,当前默认调用百度推送。
+
+ 参数:
+ url (list): 要推送的URL列表(注意:参数名是单数 'url',但实际应传入列表)
+
+ 注意:
+ 此方法存在命名误导:
+ - 方法名是 notify,参数是 url(单数),但内部调用 baidu_notify,而 baidu_notify 期望接收一个列表
+ - 因此实际使用时仍需传入列表,例如 notify(['https://example.com/post'])
+ - 更合理的命名应为 notify_urls 或参数名为 urls
+ """
+ SpiderNotify.baidu_notify(url) # 调用百度推送方法,传入URL列表
\ No newline at end of file
diff --git a/src/djangoblog/tests.py b/src/djangoblog/tests.py
index 01237d9..ddbfe31 100644
--- a/src/djangoblog/tests.py
+++ b/src/djangoblog/tests.py
@@ -1,15 +1,44 @@
from django.test import TestCase
+# 从项目工具模块导入所有工具函数和类,用于测试
from djangoblog.utils import *
class DjangoBlogTest(TestCase):
+ """
+ DjangoBlog 应用的测试用例基类(目前仅包含工具函数测试)。
+ 继承自 Django 的 TestCase,提供数据库支持、断言方法和测试生命周期管理。
+ """
+
def setUp(self):
+ """
+ 测试前置准备方法,在每个测试方法执行前自动调用。
+
+ 当前为空实现,表示该测试类无需在每次测试前进行初始化操作。
+ 若后续需要创建测试数据(如用户、文章等),可在此方法中完成。
+ """
pass
def test_utils(self):
- md5 = get_sha256('test')
+ """
+ 测试工具模块(djangoblog.utils)中的核心功能:
+ 1. SHA256 哈希生成
+ 2. Markdown 内容解析与渲染
+
+ 此方法验证工具函数是否能正确处理输入并返回预期结果。
+ """
+
+ # 测试 get_sha256 函数:计算字符串 'test' 的 SHA256 哈希值
+ md5 = get_sha256('test') # 注意:函数名为 get_sha256,但变量名误写为 md5(应为 sha256)
+
+ # 断言:验证哈希值不为 None,确保函数成功返回了有效结果
self.assertIsNotNone(md5)
+
+ # 测试 CommonMarkdown.get_markdown 静态方法:将 Markdown 字符串转换为 HTML
+ # 输入包含多种 Markdown 元素的文本:
+ # - 一级标题 (# Title1)
+ # - Python 代码块 (```python ... ```)
+ # - 两个超链接 ([text](url))
c = CommonMarkdown.get_markdown('''
# Title1
@@ -20,13 +49,7 @@ class DjangoBlogTest(TestCase):
[url](https://www.lylinux.net/)
[ddd](http://www.baidu.com)
-
-
''')
- self.assertIsNotNone(c)
- d = {
- 'd': 'key1',
- 'd2': 'key2'
- }
- data = parse_dict_to_url(d)
- self.assertIsNotNone(data)
+
+ # 注意:此测试方法未对返回的 HTML 内容进行断言验证(如 self.assertIn('
', c)),
+ # 仅执行了转换操作,未检查输出是否符合预期。建议补充相关断言以增强测试完整性。
\ No newline at end of file
diff --git a/src/djangoblog/urls.py b/src/djangoblog/urls.py
index 4aae58a..880e40f 100644
--- a/src/djangoblog/urls.py
+++ b/src/djangoblog/urls.py
@@ -1,64 +1,97 @@
"""djangoblog URL Configuration
-The `urlpatterns` list routes URLs to views. For more information please see:
- https://docs.djangoproject.com/en/1.10/topics/http/urls/
-Examples:
-Function views
- 1. Add an import: from my_app import views
- 2. Add a URL to urlpatterns: url(r'^$', views.home, name='home')
-Class-based views
- 1. Add an import: from other_app.views import Home
- 2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home')
-Including another URLconf
- 1. Import the include() function: from django.conf.urls import url, include
- 2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls'))
+主 URL 配置文件,定义了整个网站的 URL 路由规则。
+将不同的 URL 模式映射到对应的视图函数或类,实现请求分发。
"""
+
+# 导入必要的模块
from django.conf import settings
-from django.conf.urls.i18n import i18n_patterns
-from django.conf.urls.static import static
-from django.contrib.sitemaps.views import sitemap
-from django.urls import path, include
-from django.urls import re_path
-from haystack.views import search_view_factory
-
-from blog.views import EsSearchView
-from djangoblog.admin_site import admin_site
-from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm
-from djangoblog.feeds import DjangoBlogFeed
-from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
+from django.conf.urls.i18n import i18n_patterns # 支持国际化 URL 模式
+from django.conf.urls.static import static # 用于在开发环境提供静态和媒体文件服务
+from django.contrib.sitemaps.views import sitemap # 生成站点地图(sitemap.xml)的视图
+from django.urls import path, include # 标准 URL 映射工具
+from django.urls import re_path # 支持正则表达式匹配的 URL 映射
+from haystack.views import search_view_factory # Haystack 搜索框架的视图工厂
+
+# 导入项目内自定义组件
+from blog.views import EsSearchView # 自定义的 Elasticsearch 搜索视图
+from djangoblog.admin_site import admin_site # 自定义的 Django 管理后台实例
+from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm # 搜索表单类
+from djangoblog.feeds import DjangoBlogFeed # RSS 订阅源
+from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap # 站点地图类
-sitemaps = {
- 'blog': ArticleSiteMap,
- 'Category': CategorySiteMap,
- 'Tag': TagSiteMap,
- 'User': UserSiteMap,
- 'static': StaticViewSitemap
+# 定义站点地图(sitemap)的映射字典
+# 将不同的 sitemap 类按类别注册,用于生成 sitemap.xml
+sitemaps = {
+ 'blog': ArticleSiteMap, # 文章内容地图
+ 'Category': CategorySiteMap, # 分类地图
+ 'Tag': TagSiteMap, # 标签地图
+ 'User': UserSiteMap, # 用户(作者)地图
+ 'static': StaticViewSitemap # 静态页面地图(如首页)
}
-handler404 = 'blog.views.page_not_found_view'
-handler500 = 'blog.views.server_error_view'
-handle403 = 'blog.views.permission_denied_view'
+# 自定义错误页面处理视图
+# 当发生相应错误时,Django 会调用这些视图函数来渲染错误页面
+handler404 = 'blog.views.page_not_found_view' # 404 页面未找到
+handler500 = 'blog.views.server_error_view' # 500 服务器内部错误
+handle403 = 'blog.views.permission_denied_view' # 403 权限拒绝(注意:变量名应为 handler403 才生效)
+# 主 URL 模式列表
urlpatterns = [
+ # 提供 Django 内置的国际化(i18n)URL 支持,如 /i18n/setlang/ 用于切换语言
path('i18n/', include('django.conf.urls.i18n')),
]
+
+# 使用 i18n_patterns 包裹一组 URL,使其支持多语言前缀(如 /en/, /zh-hans/)
+# prefix_default_language=False 表示默认语言不添加前缀
urlpatterns += i18n_patterns(
+ # 管理后台 URL,访问路径为 /admin/
re_path(r'^admin/', admin_site.urls),
+
+ # 博客应用主 URL,包含 blog 应用的所有路由,命名空间为 'blog'
+ # 注意:空正则 r'' 会匹配根路径,但由于在 i18n_patterns 内,实际路径为 // 或 /(默认语言)
re_path(r'', include('blog.urls', namespace='blog')),
+
+ # Markdown 编辑器(mdeditor)的资源和上传路由
re_path(r'mdeditor/', include('mdeditor.urls')),
+
+ # 评论系统路由,命名空间为 'comment'
re_path(r'', include('comments.urls', namespace='comment')),
+
+ # 用户账户系统路由(注册、登录、个人中心等),命名空间为 'account'
re_path(r'', include('accounts.urls', namespace='account')),
+
+ # 第三方登录(OAuth)路由,命名空间为 'oauth'
re_path(r'', include('oauth.urls', namespace='oauth')),
+
+ # 站点地图路由:访问 /sitemap.xml 时,调用 sitemap 视图并传入 sitemaps 字典
re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps},
name='django.contrib.sitemaps.views.sitemap'),
+
+ # RSS 订阅源路由:访问 /feed/ 或 /rss/ 时,返回 DjangoBlogFeed 生成的 RSS 内容
re_path(r'^feed/$', DjangoBlogFeed()),
re_path(r'^rss/$', DjangoBlogFeed()),
+
+ # 搜索功能路由:使用 Haystack 的视图工厂创建基于 Elasticsearch 的搜索视图
+ # 搜索表单使用自定义的 ElasticSearchModelSearchForm
+ # 访问路径如 /search?q=关键词
re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
name='search'),
+
+ # 服务器管理模块路由(可能用于系统监控或运维),命名空间为 'servermanager'
re_path(r'', include('servermanager.urls', namespace='servermanager')),
+
+ # OwnTracks 路由(可能用于位置追踪服务),命名空间为 'owntracks'
re_path(r'', include('owntracks.urls', namespace='owntracks'))
- , prefix_default_language=False) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
+ , prefix_default_language=False) # i18n_patterns 的结束和参数
+
+# 为静态文件 URL 添加处理规则
+# 在开发模式下(DEBUG=True),Django 自动提供 STATIC_URL 和 MEDIA_URL 的文件服务
+# static() 会将 URL 前缀映射到指定的文件系统路径
+urlpatterns += static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
+
+# 如果处于调试模式,额外添加媒体文件(如用户上传的图片)的 URL 映射
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL,
- document_root=settings.MEDIA_ROOT)
+ document_root=settings.MEDIA_ROOT)
\ No newline at end of file
diff --git a/src/djangoblog/utils.py b/src/djangoblog/utils.py
index 57f63dc..d2df35b 100644
--- a/src/djangoblog/utils.py
+++ b/src/djangoblog/utils.py
@@ -1,7 +1,13 @@
#!/usr/bin/env python
# encoding: utf-8
+"""
+djangoblog 工具函数模块(utils.py)
+提供博客系统所需的通用工具函数,包括缓存、安全、Markdown解析、邮件发送、文件处理等功能。
+"""
+
+# 导入标准库和第三方库
import logging
import os
import random
@@ -9,29 +15,62 @@ import string
import uuid
from hashlib import sha256
-import bleach
-import markdown
-import requests
+import bleach # 用于清理 HTML 标签,防止 XSS 攻击
+import markdown # 用于将 Markdown 文本转换为 HTML
+import requests # 用于发起 HTTP 请求
from django.conf import settings
-from django.contrib.sites.models import Site
-from django.core.cache import cache
-from django.templatetags.static import static
+from django.contrib.sites.models import Site # Django 的站点框架
+from django.core.cache import cache # 使用 Django 缓存后端
+from django.templatetags.static import static # 用于生成静态文件 URL
+# 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
+ """
+ 获取当前数据库中最新(主键最大)的文章和评论的 ID。
+
+ 返回:
+ tuple: (最新文章的主键, 最新评论的主键)
+ """
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
+ """
+ 计算输入字符串的 SHA256 哈希值。
+
+ 参数:
+ str (str): 要哈希的字符串
+
+ 返回:
+ str: SHA256 哈希值的十六进制字符串表示
+ """
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
+ """
+ 缓存装饰器,用于为函数添加缓存功能,避免重复计算或数据库查询。
+
+ 参数:
+ expiration (int): 缓存过期时间(秒),默认 3 分钟
+
+ 返回:
+ decorator: 可用于装饰函数的装饰器
+
+ 工作原理:
+ 1. 尝试从视图对象获取缓存键(通过 get_cache_key 方法)
+ 2. 如果没有,则基于函数名和参数生成唯一哈希作为键
+ 3. 查询缓存,命中则返回缓存值
+ 4. 未命中则执行原函数,将结果存入缓存并返回
+ 5. 特殊处理返回值为 None 的情况,避免缓存穿透
+ """
+
def wrapper(func):
def news(*args, **kwargs):
try:
@@ -41,12 +80,10 @@ def cache_decorator(expiration=3 * 60):
key = None
if not key:
unique_str = repr((func, args, kwargs))
-
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
if value is not None:
- # logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
@@ -68,14 +105,23 @@ def cache_decorator(expiration=3 * 60):
def expire_view_cache(path, servername, serverport, key_prefix=None):
- '''
- 刷新视图缓存
- :param path:url路径
- :param servername:host
- :param serverport:端口
- :param key_prefix:前缀
- :return:是否成功
- '''
+ """
+ 手动清除特定视图的缓存(用于内容更新后刷新缓存)。
+
+ 参数:
+ path (str): URL 路径(如 '/article/1/')
+ servername (str): 服务器域名
+ serverport (str): 服务器端口
+ key_prefix (str, optional): 缓存键前缀
+
+ 返回:
+ bool: 缓存是否成功删除
+
+ 实现:
+ 1. 构造一个模拟的 HttpRequest 对象
+ 2. 使用 Django 的 get_cache_key 工具生成与视图缓存对应的键
+ 3. 如果键存在且缓存中有值,则删除该缓存
+ """
from django.http import HttpRequest
from django.utils.cache import get_cache_key
@@ -94,19 +140,38 @@ def expire_view_cache(path, servername, serverport, key_prefix=None):
@cache_decorator()
def get_current_site():
+ """
+ 获取当前 Django 站点对象,并使用缓存优化性能。
+
+ 返回:
+ Site: 当前站点实例(包含域名、名称等信息)
+ """
site = Site.objects.get_current()
return site
class CommonMarkdown:
+ """
+ 提供统一的 Markdown 解析功能,支持代码高亮、目录生成等。
+ """
+
@staticmethod
def _convert_markdown(value):
+ """
+ 内部方法:将 Markdown 字符串转换为 HTML,并提取目录(TOC)。
+
+ 参数:
+ value (str): Markdown 格式的文本
+
+ 返回:
+ tuple: (HTML 内容字符串, 目录 HTML 字符串)
+ """
md = markdown.Markdown(
extensions=[
- 'extra',
- 'codehilite',
- 'toc',
- 'tables',
+ 'extra', # 标准扩展(表格、脚注等)
+ 'codehilite', # 代码高亮
+ 'toc', # 自动生成目录
+ 'tables', # 表格支持
]
)
body = md.convert(value)
@@ -115,16 +180,45 @@ class CommonMarkdown:
@staticmethod
def get_markdown_with_toc(value):
+ """
+ 解析 Markdown 文本,同时返回 HTML 内容和目录。
+
+ 参数:
+ value (str): Markdown 文本
+
+ 返回:
+ tuple: (HTML 内容, TOC 目录)
+ """
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
+ """
+ 仅解析 Markdown 文本为 HTML 内容,不返回目录。
+
+ 参数:
+ value (str): Markdown 文本
+
+ 返回:
+ str: 转换后的 HTML 字符串
+ """
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
+ """
+ 发送邮件的快捷方法,通过 Django 信号机制解耦。
+
+ 参数:
+ emailto (str): 收件人邮箱
+ title (str): 邮件标题
+ content (str): 邮件正文
+
+ 实现:
+ 触发自定义的 send_email_signal 信号,由信号处理器完成实际的邮件发送逻辑。
+ """
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
@@ -134,11 +228,28 @@ def send_email(emailto, title, content):
def generate_code() -> str:
- """生成随机数验证码"""
+ """
+ 生成一个 6 位的随机数字验证码。
+
+ 返回:
+ str: 6 位数字组成的字符串(如 '123456')
+ """
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
+ """
+ 将字典转换为 URL 查询参数字符串(键值对用 & 连接)。
+
+ 参数:
+ dict (dict): 要转换的字典
+
+ 返回:
+ str: URL 编码后的查询字符串(如 'key1=value1&key2=value2')
+
+ 注意:
+ 使用 urllib.parse.quote 对键和值进行 URL 编码,safe='/' 表示斜杠不编码。
+ """
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
@@ -146,6 +257,14 @@ def parse_dict_to_url(dict):
def get_blog_setting():
+ """
+ 获取博客系统设置,优先从缓存读取,未命中则从数据库获取并缓存。
+
+ 如果数据库中没有设置记录,则创建一个默认设置并保存。
+
+ 返回:
+ BlogSettings: 博客设置模型实例
+ """
value = cache.get('get_blog_setting')
if value:
return value
@@ -174,20 +293,30 @@ def get_blog_setting():
def save_user_avatar(url):
- '''
- 保存用户头像
- :param url:头像url
- :return: 本地路径
- '''
+ """
+ 从指定 URL 下载用户头像并保存到本地静态文件目录。
+
+ 参数:
+ url (str): 头像图片的远程 URL
+
+ 返回:
+ str: 保存后的本地静态文件 URL(如 '/static/avatar/abc123.jpg')
+ 下载失败时返回默认头像路径。
+
+ 流程:
+ 1. 创建本地头像存储目录
+ 2. 下载图片内容
+ 3. 根据原始 URL 判断文件类型,生成唯一文件名
+ 4. 保存文件
+ 5. 返回静态 URL
+ """
logger.info(url)
-
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
-
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
@@ -202,6 +331,15 @@ def save_user_avatar(url):
def delete_sidebar_cache():
+ """
+ 清除侧边栏所有缓存。
+
+ 侧边栏内容(如最新文章、热门评论)通常会被缓存以提高性能。
+ 当内容更新时,需调用此函数清除相关缓存。
+
+ 实现:
+ 遍历 LinkShowType 的所有值,删除以 'sidebar' 为前缀的缓存键。
+ """
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
@@ -210,12 +348,31 @@ def delete_sidebar_cache():
def delete_view_cache(prefix, keys):
+ """
+ 删除基于模板片段缓存(@cache)的缓存。
+
+ 参数:
+ prefix (str): 缓存片段的名称(与模板中 cache 标签的第一个参数对应)
+ keys (list): 缓存的变量列表(用于生成唯一键)
+
+ 实现:
+ 使用 make_template_fragment_key 生成正确的缓存键,然后删除。
+ """
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
+ """
+ 获取静态资源的基础 URL。
+
+ 如果设置了 STATIC_URL,则直接返回。
+ 否则,构建一个完整的 URL(http://domain/static/)。
+
+ 返回:
+ str: 静态资源基础 URL
+ """
if settings.STATIC_URL:
return settings.STATIC_URL
else:
@@ -223,10 +380,21 @@ def get_resource_url():
return 'http://' + site.domain + '/static/'
+# 定义允许在用户内容中使用的 HTML 标签和属性
+# 用于 sanitize_html 函数,防止 XSS 攻击
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
def sanitize_html(html):
- return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
+ """
+ 清理 HTML 内容,移除不安全的标签和属性,防止跨站脚本(XSS)攻击。
+
+ 参数:
+ html (str): 待清理的 HTML 字符串
+
+ 返回:
+ str: 清理后的 HTML 字符串,仅包含 ALLOWED_TAGS 和 ALLOWED_ATTRIBUTES 中定义的内容
+ """
+ return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
\ No newline at end of file
diff --git a/src/djangoblog/whoosh_cn_backend.py b/src/djangoblog/whoosh_cn_backend.py
index 04e3f7f..b55c41f 100644
--- a/src/djangoblog/whoosh_cn_backend.py
+++ b/src/djangoblog/whoosh_cn_backend.py
@@ -22,7 +22,7 @@ from haystack.models import SearchResult
from haystack.utils import get_identifier, get_model_ct
from haystack.utils import log as logging
from haystack.utils.app_loading import haystack_get_model
-from jieba.analyse import ChineseAnalyzer
+from jieba.analyse import ChineseAnalyzer # 使用结巴分词进行中文分词
from whoosh import index
from whoosh.analysis import StemmingAnalyzer
from whoosh.fields import BOOLEAN, DATETIME, IDLIST, KEYWORD, NGRAM, NGRAMWORDS, NUMERIC, Schema, TEXT
@@ -40,13 +40,12 @@ except ImportError:
raise MissingDependency(
"The 'whoosh' backend requires the installation of 'Whoosh'. Please refer to the documentation.")
-# Handle minimum requirement.
+# 检查 Whoosh 版本是否满足最低要求(2.5.0)
if not hasattr(whoosh, '__version__') or whoosh.__version__ < (2, 5, 0):
raise MissingDependency(
"The 'whoosh' backend requires version 2.5.0 or greater.")
-# Bubble up the correct error.
-
+# 使用线程局部存储,用于 RAM 存储模式下的共享存储
DATETIME_REGEX = re.compile(
'^(?P\d{4})-(?P\d{2})-(?P\d{2})T(?P\d{2}):(?P\d{2}):(?P\d{2})(\.\d{3,6}Z?)?$')
LOCALS = threading.local()
@@ -55,15 +54,19 @@ LOCALS.RAM_STORE = None
class WhooshHtmlFormatter(HtmlFormatter):
"""
- This is a HtmlFormatter simpler than the whoosh.HtmlFormatter.
- We use it to have consistent results across backends. Specifically,
- Solr, Xapian and Elasticsearch are using this formatting.
+ 自定义高亮格式化器,使用简单标签包裹匹配关键词。
+ 与 Solr、Elasticsearch 等后端保持一致的高亮样式。
"""
template = '<%(tag)s>%(t)s%(tag)s>'
class WhooshSearchBackend(BaseSearchBackend):
- # Word reserved by Whoosh for special use.
+ """
+ Whoosh 搜索后端实现类,负责与 Whoosh 引擎交互,执行索引、搜索、删除等操作。
+ 继承自 haystack 的 BaseSearchBackend。
+ """
+
+ # Whoosh 中的保留关键字,不能直接用于查询
RESERVED_WORDS = (
'AND',
'NOT',
@@ -71,69 +74,66 @@ class WhooshSearchBackend(BaseSearchBackend):
'TO',
)
- # Characters reserved by Whoosh for special use.
- # The '\\' must come first, so as not to overwrite the other slash
- # replacements.
+ # Whoosh 中的保留字符,需要转义或加引号处理
RESERVED_CHARACTERS = (
'\\', '+', '-', '&&', '||', '!', '(', ')', '{', '}',
'[', ']', '^', '"', '~', '*', '?', ':', '.',
)
def __init__(self, connection_alias, **connection_options):
- super(
- WhooshSearchBackend,
- self).__init__(
- connection_alias,
- **connection_options)
- self.setup_complete = False
- self.use_file_storage = True
- self.post_limit = getattr(
- connection_options,
- 'POST_LIMIT',
- 128 * 1024 * 1024)
- self.path = connection_options.get('PATH')
+ """
+ 初始化 Whoosh 后端连接。
+ :param connection_alias: 连接别名(如 'default')
+ :param connection_options: 配置选项,如 PATH、STORAGE 等
+ """
+ super(WhooshSearchBackend, self).__init__(connection_alias, **connection_options)
+ self.setup_complete = False # 是否已完成初始化
+ self.use_file_storage = True # 是否使用文件存储(而非内存)
+ self.post_limit = getattr(connection_options, 'POST_LIMIT', 128 * 1024 * 1024) # POST 请求大小限制
+ self.path = connection_options.get('PATH') # 索引文件存储路径
if connection_options.get('STORAGE', 'file') != 'file':
- self.use_file_storage = False
+ self.use_file_storage = False # 使用 RAM 存储
if self.use_file_storage and not self.path:
raise ImproperlyConfigured(
- "You must specify a 'PATH' in your settings for connection '%s'." %
- connection_alias)
+ "You must specify a 'PATH' in your settings for connection '%s'." % connection_alias)
- self.log = logging.getLogger('haystack')
+ self.log = logging.getLogger('haystack') # 日志记录器
def setup(self):
"""
- Defers loading until needed.
+ 初始化索引环境:创建目录、构建 schema、打开或创建索引。
+ 延迟加载,首次使用时调用。
"""
from haystack import connections
new_index = False
- # Make sure the index is there.
+ # 创建索引目录
if self.use_file_storage and not os.path.exists(self.path):
os.makedirs(self.path)
new_index = True
+ # 检查写权限
if self.use_file_storage and not os.access(self.path, os.W_OK):
raise IOError(
- "The path to your Whoosh index '%s' is not writable for the current user/group." %
- self.path)
+ "The path to your Whoosh index '%s' is not writable for the current user/group." % self.path)
+ # 选择存储方式:文件 or 内存
if self.use_file_storage:
self.storage = FileStorage(self.path)
else:
global LOCALS
-
if getattr(LOCALS, 'RAM_STORE', None) is None:
LOCALS.RAM_STORE = RamStorage()
-
self.storage = LOCALS.RAM_STORE
+ # 构建索引字段 schema
self.content_field_name, self.schema = self.build_schema(
connections[self.connection_alias].get_unified_index().all_searchfields())
- self.parser = QueryParser(self.content_field_name, schema=self.schema)
+ self.parser = QueryParser(self.content_field_name, schema=self.schema) # 查询解析器
+ # 打开或创建索引
if new_index is True:
self.index = self.storage.create_index(self.schema)
else:
@@ -145,106 +145,92 @@ class WhooshSearchBackend(BaseSearchBackend):
self.setup_complete = True
def build_schema(self, fields):
+ """
+ 根据 Django 模型字段定义构建 Whoosh 的 Schema(索引结构)。
+ :param fields: 所有注册的 SearchField 字段
+ :return: (content_field_name, schema) 内容字段名和 schema 对象
+ """
schema_fields = {
ID: WHOOSH_ID(stored=True, unique=True),
DJANGO_CT: WHOOSH_ID(stored=True),
DJANGO_ID: WHOOSH_ID(stored=True),
}
- # Grab the number of keys that are hard-coded into Haystack.
- # We'll use this to (possibly) fail slightly more gracefully later.
initial_key_count = len(schema_fields)
content_field_name = ''
for field_name, field_class in fields.items():
if field_class.is_multivalued:
if field_class.indexed is False:
- schema_fields[field_class.index_fieldname] = IDLIST(
- stored=True, field_boost=field_class.boost)
+ schema_fields[field_class.index_fieldname] = IDLIST(stored=True, field_boost=field_class.boost)
else:
schema_fields[field_class.index_fieldname] = KEYWORD(
stored=True, commas=True, scorable=True, field_boost=field_class.boost)
elif field_class.field_type in ['date', 'datetime']:
- schema_fields[field_class.index_fieldname] = DATETIME(
- stored=field_class.stored, sortable=True)
+ schema_fields[field_class.index_fieldname] = DATETIME(stored=field_class.stored, sortable=True)
elif field_class.field_type == 'integer':
- schema_fields[field_class.index_fieldname] = NUMERIC(
- stored=field_class.stored, numtype=int, field_boost=field_class.boost)
+ schema_fields[field_class.index_fieldname] = NUMERIC(stored=field_class.stored, numtype=int, field_boost=field_class.boost)
elif field_class.field_type == 'float':
- schema_fields[field_class.index_fieldname] = NUMERIC(
- stored=field_class.stored, numtype=float, field_boost=field_class.boost)
+ schema_fields[field_class.index_fieldname] = NUMERIC(stored=field_class.stored, numtype=float, field_boost=field_class.boost)
elif field_class.field_type == 'boolean':
- # Field boost isn't supported on BOOLEAN as of 1.8.2.
- schema_fields[field_class.index_fieldname] = BOOLEAN(
- stored=field_class.stored)
+ schema_fields[field_class.index_fieldname] = BOOLEAN(stored=field_class.stored)
elif field_class.field_type == 'ngram':
- schema_fields[field_class.index_fieldname] = NGRAM(
- minsize=3, maxsize=15, stored=field_class.stored, field_boost=field_class.boost)
+ schema_fields[field_class.index_fieldname] = NGRAM(minsize=3, maxsize=15, stored=field_class.stored, field_boost=field_class.boost)
elif field_class.field_type == 'edge_ngram':
- schema_fields[field_class.index_fieldname] = NGRAMWORDS(minsize=2, maxsize=15, at='start',
- stored=field_class.stored,
- field_boost=field_class.boost)
+ schema_fields[field_class.index_fieldname] = NGRAMWORDS(minsize=2, maxsize=15, at='start', stored=field_class.stored, field_boost=field_class.boost)
else:
- # schema_fields[field_class.index_fieldname] = TEXT(stored=True, analyzer=StemmingAnalyzer(), field_boost=field_class.boost, sortable=True)
- schema_fields[field_class.index_fieldname] = TEXT(
- stored=True, analyzer=ChineseAnalyzer(), field_boost=field_class.boost, sortable=True)
+ # 默认文本字段,使用中文分词器
+ schema_fields[field_class.index_fieldname] = TEXT(stored=True, analyzer=ChineseAnalyzer(), field_boost=field_class.boost, sortable=True)
if field_class.document is True:
content_field_name = field_class.index_fieldname
- schema_fields[field_class.index_fieldname].spelling = True
+ schema_fields[field_class.index_fieldname].spelling = True # 支持拼写检查
- # Fail more gracefully than relying on the backend to die if no fields
- # are found.
if len(schema_fields) <= initial_key_count:
- raise SearchBackendError(
- "No fields were found in any search_indexes. Please correct this before attempting to search.")
+ raise SearchBackendError("No fields were found in any search_indexes. Please correct this before attempting to search.")
return (content_field_name, Schema(**schema_fields))
def update(self, index, iterable, commit=True):
+ """
+ 更新索引文档(添加或更新)。
+ :param index: SearchIndex 实例
+ :param iterable: 要索引的对象集合(如 QuerySet)
+ :param commit: 是否立即提交(实际被忽略,Always commit)
+ """
if not self.setup_complete:
self.setup()
self.index = self.index.refresh()
- writer = AsyncWriter(self.index)
+ writer = AsyncWriter(self.index) # 异步写入,避免锁
for obj in iterable:
try:
- doc = index.full_prepare(obj)
+ doc = index.full_prepare(obj) # 准备文档数据
except SkipDocument:
self.log.debug(u"Indexing for object `%s` skipped", obj)
else:
- # Really make sure it's unicode, because Whoosh won't have it any
- # other way.
for key in doc:
- doc[key] = self._from_python(doc[key])
+ doc[key] = self._from_python(doc[key]) # 转换为 Whoosh 可接受类型
- # Document boosts aren't supported in Whoosh 2.5.0+.
if 'boost' in doc:
- del doc['boost']
+ del doc['boost'] # Whoosh 2.5+ 不支持文档级 boost
try:
writer.update_document(**doc)
except Exception as e:
if not self.silently_fail:
raise
-
- # We'll log the object identifier but won't include the actual object
- # to avoid the possibility of that generating encoding errors while
- # processing the log message:
- self.log.error(
- u"%s while preparing object for update" %
- e.__class__.__name__,
- exc_info=True,
- extra={
- "data": {
- "index": index,
- "object": get_identifier(obj)}})
+ self.log.error(u"%s while preparing object for update" % e.__class__.__name__, exc_info=True,
+ extra={"data": {"index": index, "object": get_identifier(obj)}})
if len(iterable) > 0:
- # For now, commit no matter what, as we run into locking issues
- # otherwise.
- writer.commit()
+ writer.commit() # 提交写入
def remove(self, obj_or_string, commit=True):
+ """
+ 从索引中删除一个文档。
+ :param obj_or_string: 要删除的对象或其 ID 字符串
+ :param commit: 是否立即提交
+ """
if not self.setup_complete:
self.setup()
@@ -252,21 +238,18 @@ class WhooshSearchBackend(BaseSearchBackend):
whoosh_id = get_identifier(obj_or_string)
try:
- self.index.delete_by_query(
- q=self.parser.parse(
- u'%s:"%s"' %
- (ID, whoosh_id)))
+ self.index.delete_by_query(q=self.parser.parse(u'%s:"%s"' % (ID, whoosh_id)))
except Exception as e:
if not self.silently_fail:
raise
-
- self.log.error(
- "Failed to remove document '%s' from Whoosh: %s",
- whoosh_id,
- e,
- exc_info=True)
+ self.log.error("Failed to remove document '%s' from Whoosh: %s", whoosh_id, e, exc_info=True)
def clear(self, models=None, commit=True):
+ """
+ 清空索引。可指定模型或清空全部。
+ :param models: 要清除的模型列表
+ :param commit: 是否提交
+ """
if not self.setup_complete:
self.setup()
@@ -277,176 +260,119 @@ class WhooshSearchBackend(BaseSearchBackend):
try:
if models is None:
- self.delete_index()
+ self.delete_index() # 删除整个索引
else:
models_to_delete = []
-
for model in models:
- models_to_delete.append(
- u"%s:%s" %
- (DJANGO_CT, get_model_ct(model)))
-
- self.index.delete_by_query(
- q=self.parser.parse(
- u" OR ".join(models_to_delete)))
+ models_to_delete.append(u"%s:%s" % (DJANGO_CT, get_model_ct(model)))
+ self.index.delete_by_query(q=self.parser.parse(u" OR ".join(models_to_delete)))
except Exception as e:
if not self.silently_fail:
raise
-
if models is not None:
- self.log.error(
- "Failed to clear Whoosh index of models '%s': %s",
- ','.join(models_to_delete),
- e,
- exc_info=True)
+ self.log.error("Failed to clear Whoosh index of models '%s': %s", ','.join(models_to_delete), e, exc_info=True)
else:
- self.log.error(
- "Failed to clear Whoosh index: %s", e, exc_info=True)
+ self.log.error("Failed to clear Whoosh index: %s", e, exc_info=True)
def delete_index(self):
- # Per the Whoosh mailing list, if wiping out everything from the index,
- # it's much more efficient to simply delete the index files.
+ """
+ 彻底删除索引目录并重建。
+ """
if self.use_file_storage and os.path.exists(self.path):
shutil.rmtree(self.path)
elif not self.use_file_storage:
self.storage.clean()
-
- # Recreate everything.
- self.setup()
+ self.setup() # 重新初始化
def optimize(self):
+ """
+ 优化索引(合并段),提升搜索性能。
+ """
if not self.setup_complete:
self.setup()
-
self.index = self.index.refresh()
self.index.optimize()
def calculate_page(self, start_offset=0, end_offset=None):
- # Prevent against Whoosh throwing an error. Requires an end_offset
- # greater than 0.
+ """
+ 计算分页参数(页码和每页数量),适配 Whoosh 的分页机制。
+ :return: (page_num, page_length)
+ """
if end_offset is not None and end_offset <= 0:
end_offset = 1
-
- # Determine the page.
- page_num = 0
-
if end_offset is None:
end_offset = 1000000
-
if start_offset is None:
start_offset = 0
-
page_length = end_offset - start_offset
-
- if page_length and page_length > 0:
- page_num = int(start_offset / page_length)
-
- # Increment because Whoosh uses 1-based page numbers.
- page_num += 1
+ page_num = int(start_offset / page_length) + 1 # Whoosh 页码从 1 开始
return page_num, page_length
@log_query
- def search(
- self,
- query_string,
- sort_by=None,
- start_offset=0,
- end_offset=None,
- fields='',
- highlight=False,
- facets=None,
- date_facets=None,
- query_facets=None,
- narrow_queries=None,
- spelling_query=None,
- within=None,
- dwithin=None,
- distance_point=None,
- models=None,
- limit_to_registered_models=None,
- result_class=None,
- **kwargs):
+ def search(self, query_string, sort_by=None, start_offset=0, end_offset=None, fields='', highlight=False,
+ facets=None, date_facets=None, query_facets=None, narrow_queries=None, spelling_query=None,
+ within=None, dwithin=None, distance_point=None, models=None, limit_to_registered_models=None,
+ result_class=None, **kwargs):
+ """
+ 执行搜索查询。
+ :param query_string: 查询字符串
+ :param sort_by: 排序字段
+ :param start_offset, end_offset: 分页偏移
+ :param highlight: 是否高亮
+ :param narrow_queries: 窄化查询(过滤)
+ :param models: 限制模型范围
+ :param result_class: 搜索结果类
+ :return: { 'results': [...], 'hits': int, 'spelling_suggestion': str }
+ """
if not self.setup_complete:
self.setup()
- # A zero length query should return no results.
if len(query_string) == 0:
- return {
- 'results': [],
- 'hits': 0,
- }
+ return {'results': [], 'hits': 0}
query_string = force_str(query_string)
- # A one-character query (non-wildcard) gets nabbed by a stopwords
- # filter and should yield zero results.
if len(query_string) <= 1 and query_string != u'*':
- return {
- 'results': [],
- 'hits': 0,
- }
+ return {'results': [], 'hits': 0}
reverse = False
-
if sort_by is not None:
- # Determine if we need to reverse the results and if Whoosh can
- # handle what it's being asked to sort by. Reversing is an
- # all-or-nothing action, unfortunately.
+ # 处理排序方向(仅支持全升或全降)
sort_by_list = []
reverse_counter = 0
-
for order_by in sort_by:
if order_by.startswith('-'):
reverse_counter += 1
-
if reverse_counter and reverse_counter != len(sort_by):
- raise SearchBackendError("Whoosh requires all order_by fields"
- " to use the same sort direction")
-
+ raise SearchBackendError("Whoosh requires all order_by fields to use the same sort direction")
for order_by in sort_by:
if order_by.startswith('-'):
sort_by_list.append(order_by[1:])
-
if len(sort_by_list) == 1:
reverse = True
else:
sort_by_list.append(order_by)
-
if len(sort_by_list) == 1:
reverse = False
-
sort_by = sort_by_list[0]
+ # 不支持 facets
if facets is not None:
- warnings.warn(
- "Whoosh does not handle faceting.",
- Warning,
- stacklevel=2)
-
+ warnings.warn("Whoosh does not handle faceting.", Warning, stacklevel=2)
if date_facets is not None:
- warnings.warn(
- "Whoosh does not handle date faceting.",
- Warning,
- stacklevel=2)
-
+ warnings.warn("Whoosh does not handle date faceting.", Warning, stacklevel=2)
if query_facets is not None:
- warnings.warn(
- "Whoosh does not handle query faceting.",
- Warning,
- stacklevel=2)
+ warnings.warn("Whoosh does not handle query faceting.", Warning, stacklevel=2)
narrowed_results = None
self.index = self.index.refresh()
+ # 处理模型过滤
if limit_to_registered_models is None:
- limit_to_registered_models = getattr(
- settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)
-
+ limit_to_registered_models = getattr(settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)
if models and len(models):
model_choices = sorted(get_model_ct(model) for model in models)
elif limit_to_registered_models:
- # Using narrow queries, limit the results to only models handled
- # with the current routers.
model_choices = self.build_models_list()
else:
model_choices = []
@@ -454,27 +380,15 @@ class WhooshSearchBackend(BaseSearchBackend):
if len(model_choices) > 0:
if narrow_queries is None:
narrow_queries = set()
-
- narrow_queries.add(' OR '.join(
- ['%s:%s' % (DJANGO_CT, rm) for rm in model_choices]))
+ narrow_queries.add(' OR '.join(['%s:%s' % (DJANGO_CT, rm) for rm in model_choices]))
narrow_searcher = None
-
if narrow_queries is not None:
- # Potentially expensive? I don't see another way to do it in
- # Whoosh...
narrow_searcher = self.index.searcher()
-
for nq in narrow_queries:
- recent_narrowed_results = narrow_searcher.search(
- self.parser.parse(force_str(nq)), limit=None)
-
+ recent_narrowed_results = narrow_searcher.search(self.parser.parse(force_str(nq)), limit=None)
if len(recent_narrowed_results) <= 0:
- return {
- 'results': [],
- 'hits': 0,
- }
-
+ return {'results': [], 'hits': 0}
if narrowed_results:
narrowed_results.filter(recent_narrowed_results)
else:
@@ -485,150 +399,82 @@ class WhooshSearchBackend(BaseSearchBackend):
if self.index.doc_count():
searcher = self.index.searcher()
parsed_query = self.parser.parse(query_string)
-
- # In the event of an invalid/stopworded query, recover gracefully.
if parsed_query is None:
- return {
- 'results': [],
- 'hits': 0,
- }
+ return {'results': [], 'hits': 0}
- page_num, page_length = self.calculate_page(
- start_offset, end_offset)
+ page_num, page_length = self.calculate_page(start_offset, end_offset)
search_kwargs = {
'pagelen': page_length,
'sortedby': sort_by,
'reverse': reverse,
}
-
- # Handle the case where the results have been narrowed.
if narrowed_results is not None:
search_kwargs['filter'] = narrowed_results
try:
- raw_page = searcher.search_page(
- parsed_query,
- page_num,
- **search_kwargs
- )
+ raw_page = searcher.search_page(parsed_query, page_num, **search_kwargs)
except ValueError:
if not self.silently_fail:
raise
+ return {'results': [], 'hits': 0, 'spelling_suggestion': None}
- return {
- 'results': [],
- 'hits': 0,
- 'spelling_suggestion': None,
- }
-
- # Because as of Whoosh 2.5.1, it will return the wrong page of
- # results if you request something too high. :(
if raw_page.pagenum < page_num:
- return {
- 'results': [],
- 'hits': 0,
- 'spelling_suggestion': None,
- }
-
- results = self._process_results(
- raw_page,
- highlight=highlight,
- query_string=query_string,
- spelling_query=spelling_query,
- result_class=result_class)
- searcher.close()
+ return {'results': [], 'hits': 0, 'spelling_suggestion': None}
+ results = self._process_results(raw_page, highlight=highlight, query_string=query_string,
+ spelling_query=spelling_query, result_class=result_class)
+ searcher.close()
if hasattr(narrow_searcher, 'close'):
narrow_searcher.close()
-
return results
else:
- if self.include_spelling:
- if spelling_query:
- spelling_suggestion = self.create_spelling_suggestion(
- spelling_query)
- else:
- spelling_suggestion = self.create_spelling_suggestion(
- query_string)
- else:
- spelling_suggestion = None
-
- return {
- 'results': [],
- 'hits': 0,
- 'spelling_suggestion': spelling_suggestion,
- }
+ spelling_suggestion = self.create_spelling_suggestion(spelling_query or query_string) \
+ if self.include_spelling else None
+ return {'results': [], 'hits': 0, 'spelling_suggestion': spelling_suggestion}
- def more_like_this(
- self,
- model_instance,
- additional_query_string=None,
- start_offset=0,
- end_offset=None,
- models=None,
- limit_to_registered_models=None,
- result_class=None,
- **kwargs):
+ def more_like_this(self, model_instance, additional_query_string=None, start_offset=0, end_offset=None,
+ models=None, limit_to_registered_models=None, result_class=None, **kwargs):
+ """
+ 基于某文档查找相似文档("More Like This" 功能)。
+ """
if not self.setup_complete:
self.setup()
- # Deferred models will have a different class ("RealClass_Deferred_fieldname")
- # which won't be in our registry:
model_klass = model_instance._meta.concrete_model
-
field_name = self.content_field_name
narrow_queries = set()
narrowed_results = None
self.index = self.index.refresh()
+ # 模型过滤逻辑同 search
if limit_to_registered_models is None:
- limit_to_registered_models = getattr(
- settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)
-
+ limit_to_registered_models = getattr(settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)
if models and len(models):
model_choices = sorted(get_model_ct(model) for model in models)
elif limit_to_registered_models:
- # Using narrow queries, limit the results to only models handled
- # with the current routers.
model_choices = self.build_models_list()
else:
model_choices = []
if len(model_choices) > 0:
- if narrow_queries is None:
- narrow_queries = set()
-
- narrow_queries.add(' OR '.join(
- ['%s:%s' % (DJANGO_CT, rm) for rm in model_choices]))
-
+ narrow_queries.add(' OR '.join(['%s:%s' % (DJANGO_CT, rm) for rm in model_choices]))
if additional_query_string and additional_query_string != '*':
narrow_queries.add(additional_query_string)
narrow_searcher = None
-
- if narrow_queries is not None:
- # Potentially expensive? I don't see another way to do it in
- # Whoosh...
+ if narrow_queries:
narrow_searcher = self.index.searcher()
-
for nq in narrow_queries:
- recent_narrowed_results = narrow_searcher.search(
- self.parser.parse(force_str(nq)), limit=None)
-
+ recent_narrowed_results = narrow_searcher.search(self.parser.parse(force_str(nq)), limit=None)
if len(recent_narrowed_results) <= 0:
- return {
- 'results': [],
- 'hits': 0,
- }
-
+ return {'results': [], 'hits': 0}
if narrowed_results:
narrowed_results.filter(recent_narrowed_results)
else:
narrowed_results = recent_narrowed_results
page_num, page_length = self.calculate_page(start_offset, end_offset)
-
self.index = self.index.refresh()
raw_results = EmptyResults()
@@ -637,61 +483,37 @@ class WhooshSearchBackend(BaseSearchBackend):
searcher = self.index.searcher()
parsed_query = self.parser.parse(query)
results = searcher.search(parsed_query)
-
if len(results):
- raw_results = results[0].more_like_this(
- field_name, top=end_offset)
-
- # Handle the case where the results have been narrowed.
- if narrowed_results is not None and hasattr(raw_results, 'filter'):
+ raw_results = results[0].more_like_this(field_name, top=end_offset)
+ if narrowed_results and hasattr(raw_results, 'filter'):
raw_results.filter(narrowed_results)
+ try:
+ raw_page = ResultsPage(raw_results, page_num, page_length)
+ except ValueError:
+ if not self.silently_fail:
+ raise
+ return {'results': [], 'hits': 0}
+ if raw_page.pagenum < page_num:
+ return {'results': [], 'hits': 0}
+ results = self._process_results(raw_page, result_class=result_class)
+ searcher.close()
+ if hasattr(narrow_searcher, 'close'):
+ narrow_searcher.close()
+ return results
+ return {'results': [], 'hits': 0}
- try:
- raw_page = ResultsPage(raw_results, page_num, page_length)
- except ValueError:
- if not self.silently_fail:
- raise
-
- return {
- 'results': [],
- 'hits': 0,
- 'spelling_suggestion': None,
- }
-
- # Because as of Whoosh 2.5.1, it will return the wrong page of
- # results if you request something too high. :(
- if raw_page.pagenum < page_num:
- return {
- 'results': [],
- 'hits': 0,
- 'spelling_suggestion': None,
- }
-
- results = self._process_results(raw_page, result_class=result_class)
- searcher.close()
-
- if hasattr(narrow_searcher, 'close'):
- narrow_searcher.close()
-
- return results
-
- def _process_results(
- self,
- raw_page,
- highlight=False,
- query_string='',
- spelling_query=None,
- result_class=None):
+ def _process_results(self, raw_page, highlight=False, query_string='', spelling_query=None, result_class=None):
+ """
+ 处理原始搜索结果,转换为 SearchResult 对象列表。
+ :param raw_page: Whoosh 返回的原始结果页
+ :param highlight: 是否生成高亮文本
+ :return: { 'results': [...], 'hits': int, 'spelling_suggestion': str }
+ """
from haystack import connections
results = []
-
- # It's important to grab the hits first before slicing. Otherwise, this
- # can cause pagination failures.
hits = len(raw_page)
-
if result_class is None:
result_class = SearchResult
-
facets = {}
spelling_suggestion = None
unified_index = connections[self.connection_alias].get_unified_index()
@@ -707,58 +529,31 @@ class WhooshSearchBackend(BaseSearchBackend):
for key, value in raw_result.items():
index = unified_index.get_index(model)
string_key = str(key)
-
- if string_key in index.fields and hasattr(
- index.fields[string_key], 'convert'):
- # Special-cased due to the nature of KEYWORD fields.
+ if string_key in index.fields and hasattr(index.fields[string_key], 'convert'):
if index.fields[string_key].is_multivalued:
- if value is None or len(value) == 0:
- additional_fields[string_key] = []
- else:
- additional_fields[string_key] = value.split(
- ',')
+ additional_fields[string_key] = value.split(',') if value else []
else:
- additional_fields[string_key] = index.fields[string_key].convert(
- value)
+ additional_fields[string_key] = index.fields[string_key].convert(value)
else:
additional_fields[string_key] = self._to_python(value)
-
- del (additional_fields[DJANGO_CT])
- del (additional_fields[DJANGO_ID])
+ del additional_fields[DJANGO_CT]
+ del additional_fields[DJANGO_ID]
if highlight:
sa = StemmingAnalyzer()
formatter = WhooshHtmlFormatter('em')
terms = [token.text for token in sa(query_string)]
+ whoosh_result = whoosh_highlight(additional_fields.get(self.content_field_name),
+ terms, sa, ContextFragmenter(), formatter)
+ additional_fields['highlighted'] = {self.content_field_name: [whoosh_result]}
- whoosh_result = whoosh_highlight(
- additional_fields.get(self.content_field_name),
- terms,
- sa,
- ContextFragmenter(),
- formatter
- )
- additional_fields['highlighted'] = {
- self.content_field_name: [whoosh_result],
- }
-
- result = result_class(
- app_label,
- model_name,
- raw_result[DJANGO_ID],
- score,
- **additional_fields)
+ result = result_class(app_label, model_name, raw_result[DJANGO_ID], score, **additional_fields)
results.append(result)
else:
hits -= 1
if self.include_spelling:
- if spelling_query:
- spelling_suggestion = self.create_spelling_suggestion(
- spelling_query)
- else:
- spelling_suggestion = self.create_spelling_suggestion(
- query_string)
+ spelling_suggestion = self.create_spelling_suggestion(spelling_query or query_string)
return {
'results': results,
@@ -768,52 +563,40 @@ class WhooshSearchBackend(BaseSearchBackend):
}
def create_spelling_suggestion(self, query_string):
- spelling_suggestion = None
+ """
+ 生成拼写纠错建议。
+ :param query_string: 原始查询字符串
+ :return: 建议字符串或 None
+ """
+ if not query_string:
+ return None
reader = self.index.reader()
corrector = reader.corrector(self.content_field_name)
cleaned_query = force_str(query_string)
-
- if not query_string:
- return spelling_suggestion
-
- # Clean the string.
for rev_word in self.RESERVED_WORDS:
cleaned_query = cleaned_query.replace(rev_word, '')
-
for rev_char in self.RESERVED_CHARACTERS:
cleaned_query = cleaned_query.replace(rev_char, '')
-
- # Break it down.
query_words = cleaned_query.split()
suggested_words = []
-
for word in query_words:
suggestions = corrector.suggest(word, limit=1)
-
- if len(suggestions) > 0:
+ if suggestions:
suggested_words.append(suggestions[0])
-
- spelling_suggestion = ' '.join(suggested_words)
- return spelling_suggestion
+ return ' '.join(suggested_words)
def _from_python(self, value):
"""
- Converts Python values to a string for Whoosh.
-
- Code courtesy of pysolr.
+ 将 Python 值转换为 Whoosh 可索引的字符串。
"""
if hasattr(value, 'strftime'):
if not hasattr(value, 'hour'):
value = datetime(value.year, value.month, value.day, 0, 0, 0)
elif isinstance(value, bool):
- if value:
- value = 'true'
- else:
- value = 'false'
+ value = 'true' if value else 'false'
elif isinstance(value, (list, tuple)):
value = u','.join([force_str(v) for v in value])
elif isinstance(value, (six.integer_types, float)):
- # Leave it alone.
pass
else:
value = force_str(value)
@@ -821,57 +604,35 @@ class WhooshSearchBackend(BaseSearchBackend):
def _to_python(self, value):
"""
- Converts values from Whoosh to native Python values.
-
- A port of the same method in pysolr, as they deal with data the same way.
+ 将 Whoosh 存储的值转换回 Python 类型。
"""
if value == 'true':
return True
elif value == 'false':
return False
-
if value and isinstance(value, six.string_types):
possible_datetime = DATETIME_REGEX.search(value)
-
if possible_datetime:
date_values = possible_datetime.groupdict()
-
for dk, dv in date_values.items():
date_values[dk] = int(dv)
-
- return datetime(
- date_values['year'],
- date_values['month'],
- date_values['day'],
- date_values['hour'],
- date_values['minute'],
- date_values['second'])
-
+ return datetime(**date_values)
try:
- # Attempt to use json to load the values.
converted_value = json.loads(value)
-
- # Try to handle most built-in types.
- if isinstance(
- converted_value,
- (list,
- tuple,
- set,
- dict,
- six.integer_types,
- float,
- complex)):
+ if isinstance(converted_value, (list, tuple, set, dict, six.integer_types, float, complex)):
return converted_value
except BaseException:
- # If it fails (SyntaxError or its ilk) or we don't trust it,
- # continue on.
pass
-
return value
class WhooshSearchQuery(BaseSearchQuery):
+ """
+ Whoosh 查询构建器,负责将 Django 查询语法转换为 Whoosh 查询字符串。
+ """
+
def _convert_datetime(self, date):
+ """将 datetime 转换为 Whoosh 可识别的字符串格式"""
if hasattr(date, 'hour'):
return force_str(date.strftime('%Y%m%d%H%M%S'))
else:
@@ -879,62 +640,43 @@ class WhooshSearchQuery(BaseSearchQuery):
def clean(self, query_fragment):
"""
- Provides a mechanism for sanitizing user input before presenting the
- value to the backend.
-
- Whoosh 1.X differs here in that you can no longer use a backslash
- to escape reserved characters. Instead, the whole word should be
- quoted.
+ 清理用户输入的查询片段,避免保留字和字符引发语法错误。
"""
words = query_fragment.split()
cleaned_words = []
-
for word in words:
if word in self.backend.RESERVED_WORDS:
- word = word.replace(word, word.lower())
-
+ word = word.lower()
for char in self.backend.RESERVED_CHARACTERS:
if char in word:
word = "'%s'" % word
break
-
cleaned_words.append(word)
-
return ' '.join(cleaned_words)
def build_query_fragment(self, field, filter_type, value):
+ """
+ 构建单个查询片段,如 "title:django" 或 "pub_date:[20200101 TO 20201231]"
+ """
from haystack import connections
query_frag = ''
is_datetime = False
if not hasattr(value, 'input_type_name'):
- # Handle when we've got a ``ValuesListQuerySet``...
if hasattr(value, 'values_list'):
value = list(value)
-
if hasattr(value, 'strftime'):
is_datetime = True
-
if isinstance(value, six.string_types) and value != ' ':
- # It's not an ``InputType``. Assume ``Clean``.
value = Clean(value)
else:
value = PythonData(value)
- # Prepare the query using the InputType.
prepared_value = value.prepare(self)
-
if not isinstance(prepared_value, (set, list, tuple)):
- # Then convert whatever we get back to what pysolr wants if needed.
prepared_value = self.backend._from_python(prepared_value)
- # 'content' is a special reserved word, much like 'pk' in
- # Django's ORM layer. It indicates 'no special field'.
- if field == 'content':
- index_fieldname = ''
- else:
- index_fieldname = u'%s:' % connections[self._using].get_unified_index(
- ).get_index_fieldname(field)
+ index_fieldname = '' if field == 'content' else u'%s:' % connections[self._using].get_unified_index().get_index_fieldname(field)
filter_types = {
'content': '%s',
@@ -952,93 +694,45 @@ class WhooshSearchQuery(BaseSearchQuery):
if value.post_process is False:
query_frag = prepared_value
else:
- if filter_type in [
- 'content',
- 'contains',
- 'startswith',
- 'endswith',
- 'fuzzy']:
- if value.input_type_name == 'exact':
- query_frag = prepared_value
- else:
- # Iterate over terms & incorportate the converted form of
- # each into the query.
- terms = []
-
- if isinstance(prepared_value, six.string_types):
- possible_values = prepared_value.split(' ')
- else:
- if is_datetime is True:
- prepared_value = self._convert_datetime(
- prepared_value)
-
- possible_values = [prepared_value]
-
- for possible_value in possible_values:
- terms.append(
- filter_types[filter_type] %
- self.backend._from_python(possible_value))
-
- if len(terms) == 1:
- query_frag = terms[0]
- else:
- query_frag = u"(%s)" % " AND ".join(terms)
+ if filter_type in ['content', 'contains', 'startswith', 'endswith', 'fuzzy']:
+ terms = []
+ possible_values = prepared_value.split(' ') if isinstance(prepared_value, six.string_types) else [prepared_value]
+ if is_datetime:
+ possible_values = [self._convert_datetime(pv) for pv in possible_values]
+ for pv in possible_values:
+ terms.append(filter_types[filter_type] % self.backend._from_python(pv))
+ query_frag = terms[0] if len(terms) == 1 else u"(%s)" % " AND ".join(terms)
elif filter_type == 'in':
in_options = []
-
- for possible_value in prepared_value:
- is_datetime = False
-
- if hasattr(possible_value, 'strftime'):
- is_datetime = True
-
- pv = self.backend._from_python(possible_value)
-
- if is_datetime is True:
- pv = self._convert_datetime(pv)
-
- if isinstance(pv, six.string_types) and not is_datetime:
- in_options.append('"%s"' % pv)
- else:
- in_options.append('%s' % pv)
-
+ for pv in prepared_value:
+ is_dt = hasattr(pv, 'strftime')
+ pv = self.backend._from_python(pv)
+ pv = self._convert_datetime(pv) if is_dt else pv
+ in_options.append('"%s"' % pv if isinstance(pv, six.string_types) and not is_dt else '%s' % pv)
query_frag = "(%s)" % " OR ".join(in_options)
elif filter_type == 'range':
start = self.backend._from_python(prepared_value[0])
end = self.backend._from_python(prepared_value[1])
-
- if hasattr(prepared_value[0], 'strftime'):
- start = self._convert_datetime(start)
-
- if hasattr(prepared_value[1], 'strftime'):
- end = self._convert_datetime(end)
-
+ start = self._convert_datetime(start) if hasattr(prepared_value[0], 'strftime') else start
+ end = self._convert_datetime(end) if hasattr(prepared_value[1], 'strftime') else end
query_frag = u"[%s to %s]" % (start, end)
elif filter_type == 'exact':
- if value.input_type_name == 'exact':
- query_frag = prepared_value
- else:
- prepared_value = Exact(prepared_value).prepare(self)
- query_frag = filter_types[filter_type] % prepared_value
+ prepared_value = Exact(prepared_value).prepare(self)
+ query_frag = filter_types[filter_type] % prepared_value
else:
- if is_datetime is True:
+ if is_datetime:
prepared_value = self._convert_datetime(prepared_value)
-
query_frag = filter_types[filter_type] % prepared_value
- if len(query_frag) and not isinstance(value, Raw):
- if not query_frag.startswith('(') and not query_frag.endswith(')'):
- query_frag = "(%s)" % query_frag
+ if query_frag and not isinstance(value, Raw) and not query_frag.startswith('(') and not query_frag.endswith(')'):
+ query_frag = "(%s)" % query_frag
return u"%s%s" % (index_fieldname, query_frag)
- # if not filter_type in ('in', 'range'):
- # # 'in' is a bit of a special case, as we don't want to
- # # convert a valid list/tuple to string. Defer handling it
- # # until later...
- # value = self.backend._from_python(value)
-
class WhooshEngine(BaseEngine):
+ """
+ Haystack 引擎注册类,绑定 Backend 和 Query 类。
+ """
backend = WhooshSearchBackend
- query = WhooshSearchQuery
+ query = WhooshSearchQuery
\ No newline at end of file
diff --git a/src/djangoblog/wsgi.py b/src/djangoblog/wsgi.py
index 2295efd..b62d8ab 100644
--- a/src/djangoblog/wsgi.py
+++ b/src/djangoblog/wsgi.py
@@ -7,10 +7,19 @@ For more information on this file, see
https://docs.djangoproject.com/en/1.10/howto/deployment/wsgi/
"""
+# 导入Python标准库中的os模块,用于与操作系统进行交互,例如设置环境变量
import os
+# 从Django框架中导入get_wsgi_application函数
+# 该函数用于创建一个符合WSGI规范的应用程序对象,是Django与Web服务器(如Apache、Nginx、Gunicorn等)通信的入口
from django.core.wsgi import get_wsgi_application
+# 设置环境变量DJANGO_SETTINGS_MODULE为'djangoblog.settings'
+# 这行代码告诉Django项目应该使用哪个配置文件(settings模块)
+# 在这个例子中,使用的是djangoblog项目下的settings.py文件来加载配置
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoblog.settings")
-application = get_wsgi_application()
+# 调用get_wsgi_application()函数,生成一个WSGI可调用的应用程序实例
+# 这个实例(赋值给变量application)是WSGI服务器用来处理HTTP请求的入口点
+# 所有进入Django应用的请求都会通过这个application对象进行分发和处理
+application = get_wsgi_application()
\ No newline at end of file