ZYY代码注释

ZYY_branch
zyy 4 months ago
parent d893d72e44
commit ee8428a382

@ -1,22 +1,25 @@
# ZYY 导入 Django 内置的 AdminSite 和 LogEntry 模型
from django.contrib.admin import AdminSite
from django.contrib.admin.models import LogEntry
from django.contrib.sites.admin import SiteAdmin
from django.contrib.sites.models import Site
from accounts.admin import *
from blog.admin import *
from blog.models import *
from comments.admin import *
from comments.models import *
from djangoblog.logentryadmin import LogEntryAdmin
from oauth.admin import *
from oauth.models import *
from owntracks.admin import *
from owntracks.models import *
from servermanager.admin import *
from servermanager.models import *
from django.contrib.admin.models import LogEntry # ZYY操作日志模型
from django.contrib.sites.admin import SiteAdmin # ZYYDjango 内置站点管理
from django.contrib.sites.models import Site # ZYY多站点支持模型
# ZYY 导入自定义应用的 admin 和 models
from accounts.admin import * #ZYY 用户账户管理
from blog.admin import *# ZYY博客核心管理
from blog.models import * # ZYY博客数据模型
from comments.admin import *#ZYY 评论管理
from comments.models import * # ZYY评论数据模型
# ZYY 导入自定义的 LogEntryAdmin
from djangoblog.logentryadmin import LogEntryAdmin # ZYY自定义日志管理
from oauth.admin import * # ZYY第三方登录管理
from oauth.models import * # ZYY第三方登录模型
from owntracks.admin import * #ZYY 位置跟踪管理
from owntracks.models import *# ZYY位置跟踪模型
from servermanager.admin import * #ZYY 服务器管理
from servermanager.models import *# ZYY服务器模型
# ZYY 自定义 AdminSite 类
class DjangoBlogAdminSite(AdminSite):
site_header = 'djangoblog administration'
site_title = 'djangoblog site admin'
@ -27,6 +30,7 @@ class DjangoBlogAdminSite(AdminSite):
def has_permission(self, request):
return request.user.is_superuser
# ZYY 自定义 URL 的示例(已注释)
# def get_urls(self):
# urls = super().get_urls()
# from django.urls import path
@ -37,28 +41,37 @@ class DjangoBlogAdminSite(AdminSite):
# ]
# return urls + my_urls
# ZYY 实例化自定义 AdminSite
admin_site = DjangoBlogAdminSite(name='admin')
admin_site.register(Article, ArticlelAdmin)
admin_site.register(Category, CategoryAdmin)
admin_site.register(Tag, TagAdmin)
admin_site.register(Links, LinksAdmin)
admin_site.register(SideBar, SideBarAdmin)
admin_site.register(BlogSettings, BlogSettingsAdmin)
# ZYY 注册 blog 应用的模型和管理类
admin_site.register(Article, ArticlelAdmin)# ZYY文章管理
admin_site.register(Category, CategoryAdmin) # ZYY分类管理
admin_site.register(Tag, TagAdmin) #ZYY 标签管理
admin_site.register(Links, LinksAdmin) # ZYY友情链接
admin_site.register(SideBar, SideBarAdmin)# ZYY侧边栏配置
admin_site.register(BlogSettings, BlogSettingsAdmin)# ZYY博客全局设置
#ZYY 注册 servermanager 应用的模型和管理类
admin_site.register(commands, CommandsAdmin) #ZYY 命令记录
admin_site.register(EmailSendLog, EmailSendLogAdmin)# ZYY邮件日志
admin_site.register(commands, CommandsAdmin)
admin_site.register(EmailSendLog, EmailSendLogAdmin)
# ZYY 注册 accounts 应用的模型和管理类
admin_site.register(BlogUser, BlogUserAdmin) # ZYY博客用户
admin_site.register(BlogUser, BlogUserAdmin)
# ZYY 注册 comments 应用的模型和管理类
admin_site.register(Comment, CommentAdmin)#ZYY 评论内容
admin_site.register(Comment, CommentAdmin)
admin_site.register(OAuthUser, OAuthUserAdmin)
admin_site.register(OAuthConfig, OAuthConfigAdmin)
# ZYY 注册 oauth 应用的模型和管理类
admin_site.register(OAuthUser, OAuthUserAdmin) #ZYY绑定账号
admin_site.register(OAuthConfig, OAuthConfigAdmin) #ZYY 平台配置
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin)
# ZYY 注册 owntracks 应用的模型和管理类
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin) # ZYY移动轨迹
admin_site.register(Site, SiteAdmin)
# ZYY 注册 Django 内置的 Site 模型和管理类
admin_site.register(Site, SiteAdmin) # ZYY多站点配置
admin_site.register(LogEntry, LogEntryAdmin)
# ZYY 注册 Django 内置的 LogEntry 模型和自定义 LogEntryAdmin
admin_site.register(LogEntry, LogEntryAdmin) # ZYY管理操作日志

@ -1,11 +1,27 @@
# ZYYDjango 应用配置类
from django.apps import AppConfig
class DjangoblogAppConfig(AppConfig):
# ZYY: 指定默认主键字段类型为BigAutoField64位自增ID
# ZYY替代旧版AutoField32位适合数据量大的应用
default_auto_field = 'django.db.models.BigAutoField'
# ZYY: 应用唯一标识,需与项目目录名一致
# 用于Django内部识别应用如管理后台、迁移等
name = 'djangoblog'
def ready(self):
super().ready()
# Import and load plugins here
"""ZYY: 应用启动时的初始化钩子
- Django在完成应用注册后会自动调用
- 适合执行启动时加载的任务如插件系统信号注册等
- 注意此方法可能被多次调用特别是在开发服务器热重载时
"""
super().ready() # 确保父类初始化逻辑执行
# ZYY: 插件系统加载入口
# ZYY设计说明
# ZYY1. 延迟导入避免循环依赖AppConfig初始化阶段不宜大量导入
# ZYY2. 插件系统应实现幂等性应对ready()多次调用)
# ZYY3. 建议添加异常处理防止插件加载失败影响应用启动
from .plugin_manage.loader import load_plugins
load_plugins()
load_plugins()

@ -1,34 +1,41 @@
import _thread
# ZYY信号处理与系统通知模块
import _thread # ZYY: 使用底层线程处理耗时操作(如邮件发送),避免阻塞主请求
import logging
import django.dispatch
from django.conf import settings
from django.contrib.admin.models import LogEntry
from django.contrib.admin.models import LogEntry # ZYY: 排除管理后台操作日志的缓存清理
from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.core.mail import EmailMultiAlternatives
from django.db.models.signals import post_save
from django.dispatch import receiver
from comments.models import Comment
from comments.utils import send_comment_email
from djangoblog.spider_notify import SpiderNotify
from comments.utils import send_comment_email # ZYY: 异步发送评论通知邮件
from djangoblog.spider_notify import SpiderNotify # ZYY: 搜索引擎推送接口
from djangoblog.utils import cache, expire_view_cache, delete_sidebar_cache, delete_view_cache
from djangoblog.utils import get_current_site
from oauth.models import OAuthUser
logger = logging.getLogger(__name__)
oauth_user_login_signal = django.dispatch.Signal(['id'])
send_email_signal = django.dispatch.Signal(
['emailto', 'title', 'content'])
# ZYY: 自定义信号定义
oauth_user_login_signal = django.dispatch.Signal(['id']) # ZYY: OAuth用户登录后处理信号
send_email_signal = django.dispatch.Signal(['emailto', 'title', 'content']) # ZYY: 邮件发送信号
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
"""ZYY: 邮件发送信号处理器
- 使用信号机制解耦邮件发送逻辑
- 自动记录发送日志到数据库
- 捕获异常避免影响主流程
"""
emailto = kwargs['emailto']
title = kwargs['title']
content = kwargs['content']
# ZYY: 构造多部分邮件支持HTML内容
msg = EmailMultiAlternatives(
title,
content,
@ -36,6 +43,7 @@ def send_email_signal_handler(sender, **kwargs):
to=emailto)
msg.content_subtype = "html"
# ZYY: 记录邮件发送日志
from servermanager.models import EmailSendLog
log = EmailSendLog()
log.title = title
@ -44,7 +52,7 @@ def send_email_signal_handler(sender, **kwargs):
try:
result = msg.send()
log.send_result = result > 0
log.send_result = result > 0 # ZYY: 根据返回值判断是否发送成功
except Exception as e:
logger.error(f"失败邮箱号: {emailto}, {e}")
log.send_result = False
@ -53,62 +61,78 @@ def send_email_signal_handler(sender, **kwargs):
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
"""ZYY: OAuth用户登录后处理
- 自动处理头像域名适配
- 清理侧边栏缓存
"""
id = kwargs['id']
oauthuser = OAuthUser.objects.get(id=id)
site = get_current_site().domain
# ZYY: 处理头像URL域名适配避免混合内容警告
if oauthuser.picture and not oauthuser.picture.find(site) >= 0:
from djangoblog.utils import save_user_avatar
oauthuser.picture = save_user_avatar(oauthuser.picture)
oauthuser.save()
delete_sidebar_cache()
delete_sidebar_cache() # ZYY: 用户信息变更后清理相关缓存
@receiver(post_save)
def model_post_save_callback(
sender,
instance,
created,
raw,
using,
update_fields,
**kwargs):
def model_post_save_callback(sender, instance, created, raw, using, update_fields, **kwargs):
"""ZYY: 模型保存后通用处理器
- 处理内容更新后的缓存清理
- 搜索引擎URL提交
- 评论通知的异步处理
"""
clearcache = False
# ZYY: 排除管理后台日志对象
if isinstance(instance, LogEntry):
return
# ZYY: 处理支持URL获取的模型如文章、页面等
if 'get_full_url' in dir(instance):
is_update_views = update_fields == {'views'}
is_update_views = update_fields == {'views'} # ZYY: 仅浏览量更新时不触发完整处理
# ZYY: 非测试环境且非浏览量更新时推送搜索引擎
if not settings.TESTING and not is_update_views:
try:
notify_url = instance.get_full_url()
SpiderNotify.baidu_notify([notify_url])
SpiderNotify.baidu_notify([notify_url]) # ZYY: 百度站长推送
except Exception as ex:
logger.error("notify sipder", ex)
if not is_update_views:
clearcache = True
clearcache = True # ZYY: 标记需要清理缓存
# ZYY: 评论处理特别逻辑
if isinstance(instance, Comment):
if instance.is_enable:
if instance.is_enable: # ZYY: 仅处理已启用的评论
path = instance.article.get_absolute_url()
site = get_current_site().domain
if site.find(':') > 0:
if site.find(':') > 0: # ZYY: 去除端口号
site = site[0:site.find(':')]
# ZYY: 清理多级缓存文章详情页、SEO数据、评论列表等
expire_view_cache(
path,
servername=site,
serverport=80,
key_prefix='blogdetail')
if cache.get('seo_processor'):
cache.delete('seo_processor')
comment_cache_key = 'article_comments_{id}'.format(
id=instance.article.id)
comment_cache_key = 'article_comments_{id}'.format(id=instance.article.id)
cache.delete(comment_cache_key)
delete_sidebar_cache()
delete_view_cache('article_comments', [str(instance.article.pk)])
# ZYY: 使用独立线程发送评论通知邮件
_thread.start_new_thread(send_comment_email, (instance,))
# ZYY: 通用缓存清理(当内容更新时)
if clearcache:
cache.clear()
@ -116,7 +140,11 @@ def model_post_save_callback(
@receiver(user_logged_in)
@receiver(user_logged_out)
def user_auth_callback(sender, request, user, **kwargs):
"""ZYY: 用户认证状态变化处理
- 记录用户登录日志
- 清理用户相关缓存
"""
if user and user.username:
logger.info(user)
delete_sidebar_cache()
# cache.clear()
logger.info(user) # ZYY: 记录用户认证行为
delete_sidebar_cache() # ZYY: 用户状态变化后清理缓存
# cache.clear() # ZYY: 注释掉的激进缓存清理策略

@ -13,49 +13,83 @@ logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
def __init__(self, connection_alias, **connection_options):
# ZYY 初始化Elasticsearch搜索后端
# ZYY参数:
# ZYY connection_alias: 连接别名
# ZYY connection_options: 连接选项
super(
ElasticSearchBackend,
self).__init__(
connection_alias,
**connection_options)
self.manager = ArticleDocumentManager()
self.include_spelling = True
self.manager = ArticleDocumentManager()# ZYY 文章文档管理器实例
self.include_spelling = True # ZYY 是否包含拼写建议
def _get_models(self, iterable):
# ZYY 获取模型列表并转换为文档格式
# ZYY参数:
# ZYY iterable: 可迭代对象,包含要处理的模型实例
# ZYY返回:
# ZYY 转换后的文档列表
models = iterable if iterable and iterable[0] else Article.objects.all()
docs = self.manager.convert_to_doc(models)
return docs
def _create(self, models):
# ZYY 创建索引并重建文档
# 参数:
# models: 要处理的模型列表
self.manager.create_index()
docs = self._get_models(models)
self.manager.rebuild(docs)
def _delete(self, models):
# ZYY 删除模型对应的文档
# 参数:
# models: 要删除的模型列表
for m in models:
m.delete()
return True
def _rebuild(self, models):
# ZYY 重建索引
# 参数:
# models: 要重建的模型列表,为空则处理所有文章
models = models if models else Article.objects.all()
docs = self.manager.convert_to_doc(models)
self.manager.update_docs(docs)
def update(self, index, iterable, commit=True):
# ZYY 更新索引
# 参数:
# index: 索引名称
# iterable: 可迭代对象,包含要更新的模型实例
# commit: 是否立即提交
models = self._get_models(iterable)
self.manager.update_docs(models)
def remove(self, obj_or_string):
# ZYY 从索引中移除对象
# 参数:
# obj_or_string: 要移除的对象或字符串
models = self._get_models([obj_or_string])
self._delete(models)
def clear(self, models=None, commit=True):
# ZYY 清空索引
# 参数:
# models: 要清空的模型列表(未使用,始终清空所有)
# commit: 是否立即提交
self.remove(None)
@staticmethod
def get_suggestion(query: str) -> str:
"""获取推荐词, 如果没有找到添加原搜索词"""
"""ZYY获取推荐词, 如果没有找到添加原搜索词
参数:
query: 原始查询字符串
返回:
推荐的搜索词字符串"""
search = ArticleDocument.search() \
.query("match", body=query) \
@ -73,17 +107,27 @@ class ElasticSearchBackend(BaseSearchBackend):
@log_query
def search(self, query_string, **kwargs):
# ZYY 执行搜索
# 参数:
# query_string: 查询字符串
# **kwargs: 其他搜索参数,包括:
# start_offset: 起始偏移量
# end_offset: 结束偏移量
logger.info('search query_string:' + query_string)
start_offset = kwargs.get('start_offset')
end_offset = kwargs.get('end_offset')
# 推荐词搜索
# ZYY推荐词搜索
if getattr(self, "is_suggest", None):
suggestion = self.get_suggestion(query_string)
else:
suggestion = query_string
# ZYY构建复合查询:
# ZYY1. 应该匹配body或title字段
# ZYY2. 最小匹配度为70%
# ZYY3. 过滤状态为'p'(已发布)和类型为'a'(文章)
q = Q('bool',
should=[Q('match', body=suggestion), Q('match', title=suggestion)],
minimum_should_match="70%")
@ -95,15 +139,16 @@ class ElasticSearchBackend(BaseSearchBackend):
.source(False)[start_offset: end_offset]
results = search.execute()
hits = results['hits'].total
hits = results['hits'].total # ZYY 总命中数
raw_results = []
for raw_result in results['hits']['hits']:
app_label = 'blog'
model_name = 'Article'
additional_fields = {}
app_label = 'blog'# ZYY 应用标签
model_name = 'Article'# ZYY 模型名称
additional_fields = {}# ZYY 额外字段(当前未使用)
result_class = SearchResult
# ZYY: 构建搜索结果对象并收集到列表中
result = result_class(
app_label,
model_name,
@ -111,25 +156,32 @@ class ElasticSearchBackend(BaseSearchBackend):
raw_result['_score'],
**additional_fields)
raw_results.append(result)
# ZYY: 初始化分面数据和拼写建议
facets = {}
spelling_suggestion = None if query_string == suggestion else suggestion
# ZYY: 返回标准化的搜索结果结构
return {
'results': raw_results,
'hits': hits,
'facets': facets,
'spelling_suggestion': spelling_suggestion,
'results': raw_results, # ZYY: 实际搜索结果对象列表
'hits': hits,# ZYY: 总命中数
'facets': facets,# ZYY: 分面统计数据(当前为空)
'spelling_suggestion': spelling_suggestion,# ZYY: 拼写纠正建议
}
class ElasticSearchQuery(BaseSearchQuery):
# ZYY: 将datetime对象转换为ES兼容的字符串格式
def _convert_datetime(self, date):
if hasattr(date, 'hour'):
return force_str(date.strftime('%Y%m%d%H%M%S'))
return force_str(date.strftime('%Y%m%d%H%M%S')) # ZYY: 包含时间的完整格式
else:
return force_str(date.strftime('%Y%m%d000000'))
return force_str(date.strftime('%Y%m%d000000')) # ZYY: 仅日期的格式
# ZYY: 清理用户输入的查询片段
def clean(self, query_fragment):
"""
ZYY: 提供在将用户输入呈现给后端之前进行净化的机制
Whoosh 1.X在此有所不同,不能再使用反斜杠转义保留字符而是应该引用整个单词
"""
"""
Provides a mechanism for sanitizing user input before presenting the
value to the backend.
@ -142,9 +194,11 @@ class ElasticSearchQuery(BaseSearchQuery):
cleaned_words = []
for word in words:
# ZYY: 处理保留字(转为小写)
if word in self.backend.RESERVED_WORDS:
word = word.replace(word, word.lower())
# ZYY: 处理保留字符(用单引号包裹整个词)
for char in self.backend.RESERVED_CHARACTERS:
if char in word:
word = "'%s'" % word
@ -153,31 +207,35 @@ class ElasticSearchQuery(BaseSearchQuery):
cleaned_words.append(word)
return ' '.join(cleaned_words)
# ZYY: 构建查询片段(这里直接返回原始查询字符串)
def build_query_fragment(self, field, filter_type, value):
return value.query_string
# ZYY: 获取结果总数
def get_count(self):
results = self.get_results()
return len(results) if results else 0
# ZYY: 获取拼写建议
def get_spelling_suggestion(self, preferred_query=None):
return self._spelling_suggestion
# ZYY: 构建查询参数
def build_params(self, spelling_query=None):
kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query)
return kwargs
# ZYY: 扩展ModelSearchForm以支持Elasticsearch特定功能
class ElasticSearchModelSearchForm(ModelSearchForm):
def search(self):
# 是否建议搜索
# ZYY: 根据请求参数设置是否启用搜索建议
self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no"
sqs = super().search()
sqs = super().search()# ZYY: 调用父类搜索方法
return sqs
# ZYY: Elasticsearch搜索引擎实现
class ElasticSearchEngine(BaseEngine):
backend = ElasticSearchBackend
query = ElasticSearchQuery
backend = ElasticSearchBackend # ZYY: 指定使用的后端
query = ElasticSearchQuery # ZYY: 指定使用的查询类

@ -6,35 +6,60 @@ from django.utils.feedgenerator import Rss201rev2Feed
from blog.models import Article
from djangoblog.utils import CommonMarkdown
# ZYY: Django内置Feed类用于生成RSS/Atom订阅源
class DjangoBlogFeed(Feed):
# ZYY: 指定使用RSS 2.0规范(支持命名空间扩展)
feed_type = Rss201rev2Feed
description = '大巧无工,重剑无锋.'
title = "且听风吟 大巧无工,重剑无锋. "
link = "/feed/"
# ZYY: ================ 订阅源元数据配置 ================
description = '大巧无工,重剑无锋.'# ZYY: 订阅源副标题/描述
title = "且听风吟 大巧无工,重剑无锋. "# ZYY: 订阅源主标题
link = "/feed/" # ZYY: 订阅源自引用URL实际应为网站根URL
# ZYY: ================ 作者信息方法 ================
# ZYY: 注意这些方法在每次生成feed时都会查询数据库
def author_name(self):
# ZYY: 获取站点作者昵称(潜在问题:未处理无用户情况)
return get_user_model().objects.first().nickname
def author_link(self):
# ZYY: 获取作者个人页面URL假设用户模型有get_absolute_url方法
return get_user_model().objects.first().get_absolute_url()
# ZYY: ================ 订阅内容核心方法 ================
def items(self):
# ZYY: 筛选条件:
# ZYYtype='a' - 只包含文章类型(可能区分文章/页面等)
# ZYYstatus='p' - 只包含已发布状态(避免草稿泄露)
# ZYYorder_by('-pub_time') - 按发布时间降序
# ZYY[:5] - 限制最近5篇
return Article.objects.filter(type='a', status='p').order_by('-pub_time')[:5]
# ZYY: ================ 单个条目字段映射 ================
def item_title(self, item):
# ZYY: 直接使用文章标题作为条目标题
return item.title
def item_description(self, item):
# ZYY: 将文章内容通过Markdown渲染后作为描述
# 注意可能产生XSS风险需确保CommonMarkdown有净化处理
return CommonMarkdown.get_markdown(item.body)
# ZYY: ================ 订阅源附加信息 ================
def feed_copyright(self):
# ZYY: 动态生成版权声明(自动更新年份)
now = timezone.now()
return "Copyright© {year} 且听风吟".format(year=now.year)
def item_link(self, item):
# ZYY: 使用文章自身的绝对URL作为条目链接
return item.get_absolute_url()
# ZYY: ================ 条目唯一标识 ================
def item_guid(self, item):
return
# ZYY: 原代码不完整,通常应实现为:
# ZYYreturn item.get_absolute_url() # 使用URL作为唯一标识
#ZYY 或 return str(item.id) # 使用数据库ID
# ZYY当前实现缺失会导致某些阅读器无法识别条目更新
return # ZYY: 注意这里缺少返回值,实际使用会报错

@ -9,83 +9,107 @@ from django.utils.translation import gettext_lazy as _
class LogEntryAdmin(admin.ModelAdmin):
# ZYY: 列表过滤器配置 - 允许按内容类型筛选日志
list_filter = [
'content_type'
]
# ZYY: 搜索字段配置 - 允许按对象表示和变更消息搜索
search_fields = [
'object_repr',
'change_message'
]
# ZYY: 可点击的列表显示字段 - 指定哪些字段可以点击进入详情页
list_display_links = [
'action_time',
'get_change_message',
]
# ZYY: 列表显示字段配置 - 定义在日志列表中显示的字段
list_display = [
'action_time',
'user_link',
'content_type',
'object_link',
'get_change_message',
'action_time', #ZYY 操作时间
'user_link', # ZYY用户链接自定义方法
'content_type', # ZYY内容类型
'object_link', # ZYY对象链接自定义方法
'get_change_message', #ZYY 变更消息
]
# ZYY: 权限控制 - 禁止通过admin添加日志条目
def has_add_permission(self, request):
return False
# ZYY: 权限控制 - 允许超级用户或有特定权限的用户修改日志但禁止POST请求
def has_change_permission(self, request, obj=None):
return (
request.user.is_superuser or
request.user.has_perm('admin.change_logentry')
) and request.method != 'POST'
# ZYY: 权限控制 - 禁止通过admin删除日志条目
def has_delete_permission(self, request, obj=None):
return False
# ZYY: 自定义方法 - 生成对象链接(如果是删除操作则显示纯文本)
def object_link(self, obj):
# ZYY: 初始化为转义后的对象表示字符串防XSS
object_link = escape(obj.object_repr)
content_type = obj.content_type
# ZYY: 如果不是删除操作且有内容类型,尝试生成可点击链接
if obj.action_flag != DELETION and content_type is not None:
# try returning an actual link instead of object repr string
try:
# ZYY: 构建admin修改页面的URL
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.object_id]
)
# ZYY: 生成HTML链接后续会标记为安全
object_link = '<a href="{}">{}</a>'.format(url, object_link)
except NoReverseMatch:
# ZYY: 如果URL反转失败保持纯文本显示
pass
# ZYY: 标记字符串为安全HTML因为我们已经正确转义和构建
return mark_safe(object_link)
object_link.admin_order_field = 'object_repr'
object_link.short_description = _('object')
# ZYY: 配置列表排序和描述信息
object_link.admin_order_field = 'object_repr' # 按object_repr字段排序
object_link.short_description = _('object') # 列标题显示为"object"
# ZYY: 自定义方法 - 生成用户链接
def user_link(self, obj):
# ZYY: 获取用户模型的内容类型
content_type = ContentType.objects.get_for_model(type(obj.user))
# ZYY: 初始化为转义后的用户字符串表示
user_link = escape(force_str(obj.user))
try:
# try returning an actual link instead of object repr string
# ZYY: 尝试构建用户admin修改页面的URL
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.user.pk]
)
# ZYY: 生成HTML链接
user_link = '<a href="{}">{}</a>'.format(url, user_link)
except NoReverseMatch:
# ZYY: URL反转失败时保持纯文本
pass
# ZYY: 标记为安全HTML
return mark_safe(user_link)
user_link.admin_order_field = 'user'
user_link.short_description = _('user')
# ZYY: 配置列表排序和描述信息
user_link.admin_order_field = 'user' # ZYY按user字段排序
user_link.short_description = _('user') # ZYY列标题显示为"user"
# ZYY: 优化查询集 - 预取关联的内容类型减少查询次数
def get_queryset(self, request):
queryset = super(LogEntryAdmin, self).get_queryset(request)
return queryset.prefetch_related('content_type')
# ZYY: 操作控制 - 移除默认的批量删除操作
def get_actions(self, request):
actions = super(LogEntryAdmin, self).get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
return actions

@ -1,41 +1,60 @@
# ZYY: 导入Python标准库的logging模块用于记录日志
import logging
# ZYY: 获取当前模块的日志记录器
# ZYY: __name__表示当前模块名logging会根据模块名自动组织日志层次
logger = logging.getLogger(__name__)
# ZYY: 定义插件基类,所有插件应继承此类
class BasePlugin:
# 插件元数据
# ZYY: 插件元数据字段声明(类属性)
# ZYY: PLUGIN_NAME: 插件名称(必须)
PLUGIN_NAME = None
# ZYY: PLUGIN_DESCRIPTION: 插件描述(必须)
PLUGIN_DESCRIPTION = None
# ZYY: PLUGIN_VERSION: 插件版本(必须)
PLUGIN_VERSION = None
# ZYY: 构造函数,在实例化插件时自动调用
def __init__(self):
# ZYY: 检查必需的元数据是否已定义
# ZYY: 如果任何元数据为None则抛出ValueError异常
if not all([self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION]):
raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
# ZYY: 调用插件初始化方法
self.init_plugin()
# ZYY: 调用钩子注册方法
self.register_hooks()
# ZYY: 插件初始化方法(可被子类重写)
def init_plugin(self):
"""
插件初始化逻辑
子类可以重写此方法来实现特定的初始化操作
ZYY: 插件初始化逻辑
ZYY: 子类可以重写此方法来实现特定的初始化操作
"""
# ZYY: 记录插件初始化日志
logger.info(f'{self.PLUGIN_NAME} initialized.')
# ZYY: 插件钩子注册方法(可被子类重写)
def register_hooks(self):
"""
注册插件钩子
子类可以重写此方法来注册特定的钩子
ZYY: 注册插件钩子
ZYY: 子类可以重写此方法来注册特定的钩子
"""
# ZYY: 基类不实现具体钩子注册逻辑
pass
# ZYY: 获取插件信息的方法
def get_plugin_info(self):
"""
获取插件信息
ZYY: 获取插件信息
:return: 包含插件元数据的字典
"""
# ZYY: 返回包含插件名称、描述和版本的字典
return {
'name': self.PLUGIN_NAME,
'description': self.PLUGIN_DESCRIPTION,
'version': self.PLUGIN_VERSION
}
}

@ -1,7 +1,20 @@
# ZYY: 定义文章详情页加载的钩子名称常量
# ZYY: 用于标识文章详情页加载事件,插件可监听此事件执行自定义逻辑
ARTICLE_DETAIL_LOAD = 'article_detail_load'
# ZYY: 定义文章创建的钩子名称常量
# ZYY: 用于标识文章创建事件,通常在文章保存前触发
ARTICLE_CREATE = 'article_create'
# ZYY: 定义文章更新的钩子名称常量
# ZYY: 用于标识文章更新事件,通常在文章修改后触发
ARTICLE_UPDATE = 'article_update'
ARTICLE_DELETE = 'article_delete'
ARTICLE_CONTENT_HOOK_NAME = "the_content"
# ZYY: 定义文章删除的钩子名称常量
# ZYY: 用于标识文章删除事件,通常在文章被删除前触发
ARTICLE_DELETE = 'article_delete'
# ZYY: 定义文章内容处理的钩子名称常量
# ZYY: 参考WordPress的the_content过滤器概念用于在文章内容渲染时进行处理
# ZYY: 插件可以通过此钩子修改文章最终显示的内容
ARTICLE_CONTENT_HOOK_NAME = "the_content"

@ -1,44 +1,77 @@
# ZYY: 导入Python标准库logging模块用于记录钩子系统的运行日志
import logging
# ZYY: 获取当前模块的日志记录器,用于记录调试和错误信息
logger = logging.getLogger(__name__)
# ZYY: 定义全局钩子存储字典,键为钩子名称,值为回调函数列表
_hooks = {}
# ZYY: 钩子注册函数,用于将回调函数注册到指定钩子
def register(hook_name: str, callback: callable):
"""
注册一个钩子回调
ZYY: 注册一个钩子回调函数到指定钩子名称
ZYY: 如果钩子不存在则创建新列表然后将回调添加到对应钩子的回调列表中
ZYY: @param hook_name: 要注册的钩子名称
ZYY: @param callback: 要注册的回调函数必须是可调用对象
"""
# ZYY: 检查钩子是否已存在,不存在则初始化空列表
if hook_name not in _hooks:
_hooks[hook_name] = []
# ZYY: 将回调函数添加到对应钩子的回调列表
_hooks[hook_name].append(callback)
# ZYY: 记录调试日志,显示成功注册的钩子和回调函数名
logger.debug(f"Registered hook '{hook_name}' with callback '{callback.__name__}'")
# ZYY: Action钩子执行函数用于触发指定名称的所有Action钩子回调
def run_action(hook_name: str, *args, **kwargs):
"""
执行一个 Action Hook
它会按顺序执行所有注册到该钩子上的回调函数
ZYY: 执行一个Action Hook无返回值的钩子
ZYY: 会按注册顺序依次执行所有回调函数不处理返回值
ZYY: @param hook_name: 要触发的钩子名称
ZYY: @param args: 传递给回调函数的位置参数
ZYY: @param kwargs: 传递给回调函数的关键字参数
"""
# ZYY: 检查是否有回调注册到该钩子
if hook_name in _hooks:
logger.debug(f"Running action hook '{hook_name}'")
# ZYY: 遍历并执行所有注册的回调函数
for callback in _hooks[hook_name]:
try:
# ZYY: 执行回调函数,可能抛出异常
callback(*args, **kwargs)
except Exception as e:
# ZYY: 捕获并记录回调执行中的异常,避免影响其他回调
logger.error(f"Error running action hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
# ZYY: Filter钩子执行函数用于触发指定名称的所有Filter钩子回调并处理返回值
def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行一个 Filter Hook
它会把 value 依次传递给所有注册的回调函数进行处理
ZYY: 执行一个Filter Hook有返回值的钩子
ZYY: 会将初始值依次传递给所有回调函数处理最终返回处理后的值
ZYY: @param hook_name: 要触发的钩子名称
ZYY: @param value: 要处理的初始值
ZYY: @param args: 传递给回调函数的位置参数
ZYY: @param kwargs: 传递给回调函数的关键字参数
ZYY: @return: 经过所有回调处理后的最终值
"""
# ZYY: 检查是否有回调注册到该钩子
if hook_name in _hooks:
logger.debug(f"Applying filter hook '{hook_name}'")
# ZYY: 遍历并执行所有注册的回调函数每次用返回值更新value
for callback in _hooks[hook_name]:
try:
value = callback(value, *args, **kwargs)
except Exception as e:
# ZYY: 捕获并记录回调执行中的异常,避免影响其他回调
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
return value
# ZYY: 返回最终处理后的值
return value

@ -1,19 +1,32 @@
# ZYY: 导入Python标准库os模块用于处理文件和目录路径
import os
# ZYY: 导入Python标准库logging模块用于记录插件加载过程中的日志信息
import logging
# ZYY: 导入Django配置模块用于访问Django项目的设置参数
from django.conf import settings
# ZYY: 获取当前模块的日志记录器,用于记录插件加载的调试和错误信息
logger = logging.getLogger(__name__)
# ZYY: 定义插件加载函数用于动态加载和初始化Django插件
def load_plugins():
"""
Dynamically loads and initializes plugins from the 'plugins' directory.
This function is intended to be called when the Django app registry is ready.
ZYY: 动态加载并初始化位于'plugins'目录中的插件
ZYY: 该函数应在Django应用注册表就绪后调用以确保所有依赖项可用
ZYY: 加载过程会读取settings.ACTIVE_PLUGINS配置并尝试加载每个指定的插件
"""
# ZYY: 遍历settings.ACTIVE_PLUGINS中配置的所有活跃插件名称
for plugin_name in settings.ACTIVE_PLUGINS:
# ZYY: 构建插件的完整路径(目录路径)
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
# ZYY: 检查插件目录是否存在且包含plugin.py文件插件主模块
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, 'plugin.py')):
try:
# ZYY: 动态导入插件模块格式plugins.<plugin_name>.plugin
__import__(f'plugins.{plugin_name}.plugin')
# ZYY: 记录成功加载插件的日志信息
logger.info(f"Successfully loaded plugin: {plugin_name}")
except ImportError as e:
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)
# ZYY: 捕获并记录插件导入失败的错误信息(包含异常堆栈)
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)

@ -12,332 +12,344 @@ https://docs.djangoproject.com/en/1.10/ref/settings/
import os
import sys
from pathlib import Path
from django.utils.translation import gettext_lazy as _
# ZYY 将环境变量字符串转换为布尔值
def env_to_bool(env, default):
str_val = os.environ.get(env)
return default if str_val is None else str_val == 'True'
str_val = os.environ.get(env)
return default if str_val is None else str_val == 'True'
# Build paths inside the project like this: BASE_DIR / 'subdir'.
# ZYY 在项目内部构建路径例如BASE_DIR / 'subdir'
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
# ZYY 快速启动开发设置 - 不适用于生产环境
# ZYY 参考链接: https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/
# ZYY 安全警告: 请保密生产环境中使用的密钥!
SECRET_KEY = os.environ.get(
'DJANGO_SECRET_KEY') or 'n9ceqv38)#&mwuat@(mjb_p%em$e8$qyr#fw9ot!=ba6lijx-6'
# SECURITY WARNING: don't run with debug turned on in production!
'DJANGO_SECRET_KEY') or 'n9ceqv38)#&mwuat@ (mjb_p%em$e8$qyr#fw9ot!=ba6lijx-6'
# ZYY 安全警告: 不要在生产环境中开启调试模式!
DEBUG = env_to_bool('DJANGO_DEBUG', True)
# DEBUG = False
# ZYY 检测是否为测试环境
TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'
# ZYY 允许的主机列表
# ALLOWED_HOSTS = []
ALLOWED_HOSTS = ['*', '127.0.0.1', 'example.com']
# django 4.0新增配置
# ZYY Django 4.0新增配置: CSRF信任来源
CSRF_TRUSTED_ORIGINS = ['http://example.com']
# Application definition
# ZYY 应用定义
INSTALLED_APPS = [
# 'django.contrib.admin',
'django.contrib.admin.apps.SimpleAdminConfig',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django.contrib.sites',
'django.contrib.sitemaps',
'mdeditor',
'haystack',
'blog',
'accounts',
'comments',
'oauth',
'servermanager',
'owntracks',
'compressor',
'djangoblog'
# 'django.contrib.admin',
'django.contrib.admin.apps.SimpleAdminConfig',
'django.contrib.auth',
'django. contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django.contrib.sites',
'django.contrib.sitemaps',
'mdeditor',
'haystack',
'blog',
'accounts',
'comments',
'oauth',
' servermanager',
'owntracks',
'compressor',
'djangoblog'
]
# ZYY 中间件配置
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.locale.LocaleMiddleware',
'django.middleware.gzip.GZipMiddleware',
# 'django.middleware.cache.UpdateCacheMiddleware',
'django.middleware.common.CommonMiddleware',
# 'django.middleware.cache.FetchFromCacheMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django.middleware.http.ConditionalGetMiddleware',
'blog.middleware.OnlineMiddleware'
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.locale.LocaleMiddleware',
'django.middleware.gzip.GZipMiddleware',
# 'django.middleware.cache.UpdateCacheMiddleware',
'django.middleware.common.CommonMiddleware',
# 'django.middleware.cache.FetchFromCacheMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib. messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django.middleware.http.ConditionalGetMiddleware',
'blog.middleware.OnlineMiddleware'
]
# ZYY 根URL配置
ROOT_URLCONF = 'djangoblog.urls'
# ZYY 模板配置
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR, 'templates')],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
'blog.context_processors.seo_processor'
],
},
},
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR , 'templates')],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
'blog.context_processors.seo_processor'
],
},
},
]
# ZYY WSGI应用配置
WSGI_APPLICATION = 'djangoblog.wsgi.application'
# Database
# https://docs.djangoproject.com/en/1.10/ref/settings/#databases
# ZYY 数据库配置
# ZYY 参考链接: https://docs.djangoproject.com/en/1.10/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': os.environ.get('DJANGO_MYSQL_DATABASE') or 'djangoblog',
'USER': os.environ.get('DJANGO_MYSQL_USER') or 'root',
'PASSWORD': os.environ.get('DJANGO_MYSQL_PASSWORD') or 'root',
'HOST': os.environ.get('DJANGO_MYSQL_HOST') or '127.0.0.1',
'PORT': int(
os.environ.get('DJANGO_MYSQL_PORT') or 3306),
'OPTIONS': {
'charset': 'utf8mb4'},
}}
# Password validation
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
'default': {
'ENGINE': ' django.db.backends.mysql',
'NAME': os.environ.get('DJANGO_MYSQL_DATABASE') or 'djangoblog',
'USER': os.environ.get('DJANGO_MYSQL_USER') or 'root',
'PASSWORD': os.environ.get('DJANGO_MYSQL_PASSWORD') or 'root',
'HOST': os.environ.get('DJANGO_MYSQL_HOST') or '127.0.0.1',
'PORT': int(
os.environ.get('DJANGO_MYSQL_PORT') or 3306),
'OPTIONS': {
'charset': 'utf8mb4'},
}}
# ZYY 密码验证配置
# ZYY 参考链接: https://docs.djang oproject.com/en/1.10/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# ZYY 语言配置
LANGUAGES = (
('en', _('English')),
('zh-hans', _('Simplified Chinese')),
('zh-hant', _('Traditional Chinese')),
('en', _('English')),
('zh-hans', _('Simplified Chinese')),
('zh-hant', _('Traditional Chinese')),
)
LOCALE_PATHS = (
os.path.join(BASE_DIR, 'locale'),
os.path.join(BASE_DIR, 'locale'),
)
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
USE_TZ = False
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.10/howto/static-files/
# ZYY 静态文件配置(CSS, JavaScript, Images)
# ZYY 参考链接: https://docs.djangoproject.com/en/1.10/howto /static-files/
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.whoosh_cn_backend.WhooshEngine',
'PATH': os.path.join(os.path.dirname(__file__), 'whoosh_index'),
},
'default': {
'ENGINE': 'djangoblog.whoosh_cn_backend.WhooshEngine',
'PATH': os.path.join(os.path.dirname(__file__), 'whoosh_index'),
},
}
# Automatically update searching index
# ZYY 自动更新搜索索引
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
# Allow user login with username and password
# ZYY 允许用户使用用户名和密码登录
AUTHENTICATION_BACKENDS = [
'accounts.user_login_backend.EmailOrUsernameModelBackend']
'accounts.user_login_backend.EmailOrUsernameModelBackend']
# ZYY 静态文件根目录
STATIC_ROOT = os.path.join(BASE_DIR, 'collectedstatic')
STATIC_URL = '/static/'
STATICFILES = os.path.join(BASE_DIR, 'static')
# ZYY 自定义用户模型
AUTH_USER_MODEL = 'accounts.BlogUser'
LOGIN_URL = '/login/'
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
DATE_TIME_FORMAT = '%Y-%m-%d'
# bootstrap color styles
# ZYY Bootstrap颜色样式
BOOTSTRAP_COLOR_TYPES = [
'default', 'primary', 'success', 'info', 'warning', 'danger'
'default', 'primary', 'success', 'info', 'warning', 'danger'
]
# paginate
# ZYY 分页配置
PAGINATE_BY = 10
# http cache timeout
# ZYY HTTP缓存超时时间
CACHE_CONTROL_MAX_AGE = 2592000
# cache setting
# ZYY 缓存配置
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'TIMEOUT': 10800,
'LOCATION': 'unique-snowflake',
}
'default': {
'BACKEND': 'django.core .cache.backends.locmem.LocMemCache',
'TIMEOUT': 10800,
'LOCATION': 'unique-snowflake',
}
}
# 使用redis作为缓存
# ZYY 使用redis作为缓存
if os.environ.get("DJANGO_REDIS_URL"):
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': f'redis://{os.environ.get("DJANGO_REDIS_URL")}',
}
}
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': f'redis://{os.environ.get("DJANGO_REDIS_URL")}',
}
}
SITE_ID = 1
BAIDU_NOTIFY_URL = os.environ.get('DJANGO_BAIDU_NOTIFY_URL') \
or 'http://data.zz.baidu.com/urls?site=https://www.lylinux.net&token=1uAOGrMsUm5syDGn'
# Email:
# ZYY 百度推送URL配置
BAIDU_NOTIFY_URL = os.environ .get('DJANGO_BAIDU_NOTIFY_URL') \
or 'http://data.zz.baidu.com/urls?site=https://www.lylinux.net&token=1uAOGrMsUm5syDGn'
# ZYY 邮件配置
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_USE_TLS = env_to_bool('DJANGO_EMAIL_TLS', False)
EMAIL_USE_SSL = env_to_bool('DJANGO_EMAIL_SSL', True)
EMAIL_HOST = os.environ.get('DJANGO_EMAIL_HOST') or 'smtp.mxhichina.com'
EMAIL_HOST = os.environ.get('DJANGO_EMAIL_HOST') or 'smtp.mxhichina.com '
EMAIL_PORT = int(os.environ.get('DJANGO_EMAIL_PORT') or 465)
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER')
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD')
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
# Setting debug=false did NOT handle except email notifications
# ZYY 设置debug=false时不会处理异常邮件通知
ADMINS = [('admin', os.environ.get('DJANGO_ADMIN_EMAIL') or 'admin@admin.com')]
# WX ADMIN password(Two times md5)
# ZYY 微信管理员密码(两次md5 加密)
WXADMIN = os.environ.get(
'DJANGO_WXADMIN_PASSWORD') or '995F03AC401D6CABABAEF756FC4D43C7'
'DJANGO_WXADMIN_PASSWORD') or '995F03AC401D6CABABAEF756FC4D43C7'
# ZYY 日志目录配置
LOG_PATH = os.path.join(BASE_DIR, 'logs')
if not os.path.exists(LOG_PATH):
os.makedirs(LOG_PATH, exist_ok=True)
os.makedirs(LOG_PATH, exist_ok=True)
# ZYY 日志配置
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'root': {
'level': 'INFO',
'handlers': ['console', 'log_file'],
},
'formatters': {
'verbose': {
'format': '[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d %(module)s] %(message)s',
}
},
'filters': {
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse',
},
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',
},
},
'handlers': {
'log_file': {
'level': 'INFO',
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename': os.path.join(LOG_PATH, 'djangoblog.log'),
'when': 'D',
'formatter': 'verbose',
'interval': 1,
'delay': True,
'backupCount': 5,
'encoding': 'utf-8'
},
'console': {
'level': 'DEBUG',
'filters': ['require_debug_true'],
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'null': {
'class': 'logging.NullHandler',
},
'mail_admins': {
'level': 'ERROR',
'filters': ['require_debug_false'],
'class': 'django.utils.log.AdminEmailHandler'
}
},
'loggers': {
'djangoblog': {
'handlers': ['log_file', 'console'],
'level': 'INFO',
'propagate': True,
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
}
}
'version': 1,
'disable_existing_loggers': False,
'root': {
'level': 'INFO',
'handlers': ['console', 'log_file'],
},
'formatters': {
'verbose': {
'format': '[%( asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d %(module)s] %(message)s',
}
},
'filters': {
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse',
},
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',
},
},
'handlers': {
'log_file': {
'level': 'INFO',
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename': os.path.join(LOG_PATH, 'djangoblog.log'),
'when': 'D',
'formatter': 'verbose',
'interval': 1,
'delay': True ,
'backupCount': 5,
'encoding': 'utf-8'
},
'console': {
'level': 'DEBUG',
'filters': ['require_debug_true'],
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'null': {
'class': 'logging.NullHandler',
},
'mail_admins': {
'level': 'ERROR',
'filters': ['require_debug_false'],
'class': 'django.utils.log.AdminEmailHandler'
}
},
'loggers': {
'djangoblog': {
'handlers': ['log_file', 'console'],
'level': 'INFO',
'propagate': True,
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
}
}
}
# ZYY 静态文件查找器配置
STATICFILES_FINDERS = (
'django.contrib.staticfiles.finders.FileSystemFinder',
'django.contrib.staticfiles.finders.AppDirectoriesFinder',
# other
'compressor.finders.CompressorFinder',
)
'django.contrib.staticfiles.finders.FileSystemFinder',
'django.contrib.staticfiles.finders.AppDirectoriesFinder',
# other
'compressor.finders.CompressorFinder',
)
# ZYY 压缩配置
COMPRESS_ENABLED = True
# COMPRESS_OFFLINE = True
COMPRESS_CSS_FILTERS = [
# creates absolute urls from relative ones
'compressor.filters.css_default.CssAbsoluteFilter',
# css minimizer
'compressor.filters.cssmin.CSSMinFilter'
# 创建绝对URL
'compressor.filters.css_default.CssAbsoluteFilter',
# CSS压缩
'compressor.filters.cssmin.CSSMinFilter'
]
COMPRESS_JS_FILTERS = [
'compressor.filters.jsmin.JSMinFilter'
'compressor.filters.jsmin.JSMinFilter'
]
# ZYY 媒体文件配置
MEDIA_ROOT = os.path.join(BASE_DIR, 'uploads')
MEDIA_URL = '/media/'
# ZYY X-Frame-Options配置
X_FRAME_OPTIONS = 'SAMEORIGIN'
# ZYY 默认自增字段类型
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# ZYY Elasticsearch配置
if os.environ.get('DJANGO_ELASTICSEARCH_HOST'):
ELASTICSEARCH_DSL = {
'default': {
'hosts': os.environ.get('DJANGO_ELASTICSEARCH_HOST')
},
}
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine',
},
}
# Plugin System
ELASTICSEARCH_DSL = {
'default': {
'hosts': os.environ.get('DJANGO_ELASTICSEARCH_ HOST')
},
}
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine',
},
}
# ZYY 插件系统配置
PLUGINS_DIR = BASE_DIR / 'plugins'
ACTIVE_PLUGINS = [
'article_copyright',
'reading_time',
'external_links',
'view_count',
'seo_optimizer'
'article_copyright',
'reading_time',
' external_links',
'view_count',
'seo_optimizer'
]

@ -1,59 +1,73 @@
# ZYY 导入必要的模块和类
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
from blog.models import Article, Category, Tag
# ZYY 定义静态视图站点地图类,用于生成博客首页等静态页面的站点地图
class StaticViewSitemap(Sitemap):
priority = 0.5
changefreq = 'daily'
priority = 0.5 # ZYY 优先级设置为0.5
changefreq = 'daily' # ZYY 更新频率设置为每天
def items(self):
# ZYY 返回需要生成站点地图的URL名称列表这里只有博客首页
return ['blog:index', ]
def location(self, item):
# ZYY 根据URL名称生成具体的URL地址
return reverse(item)
# ZYY 定义文章站点地图类,用于生成文章页面的站点地图
class ArticleSiteMap(Sitemap):
changefreq = "monthly"
priority = "0.6"
changefreq = "monthly" # ZYY 更新频率设置为每月
priority = "0.6" # ZYY 优先级设置为0.6
def items(self):
# ZYY 返回所有状态为'p'(已发布)的文章对象
return Article.objects.filter(status='p')
def lastmod(self, obj):
# ZYY 返回文章的最后修改时间
return obj.last_modify_time
# ZYY 定义分类站点地图类,用于生成分类页面的站点地图
class CategorySiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.6"
changefreq = "Weekly" # ZYY 更新频率设置为每周
priority = "0.6" # ZYY 优先级设置为0.6
def items(self):
# ZYY 返回所有的分类对象
return Category.objects.all()
def lastmod(self, obj):
# ZYY 返回分类的最后修改时间
return obj.last_modify_time
# ZYY 定义标签站点地图类,用于生成标签页面的站点地图
class TagSiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.3"
changefreq = "Weekly" # ZYY 更新频率设置为每周
priority = "0.3" # ZYY 优先级设置为0.3
def items(self):
# ZYY 返回所有的标签对象
return Tag.objects.all()
def lastmod(self, obj):
# ZYY 返回标签的最后修改时间
return obj.last_modify_time
# ZYY 定义用户站点地图类,用于生成用户页面的站点地图
class UserSiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.3"
changefreq = "Weekly" # ZYY 更新频率设置为每周
priority = "0.3" # ZYY 优先级设置为0.3
def items(self):
# ZYY 返回所有文章的作者集合(去重)
return list(set(map(lambda x: x.author, Article.objects.all())))
def lastmod(self, obj):
return obj.date_joined
# ZYY 返回用户的注册时间作为最后修改时间
return obj.date_joined

@ -1,21 +1,32 @@
# ZYY 导入日志模块,用于记录程序运行状态
import logging
# ZYY 导入requests库用于发送HTTP请求
import requests
# ZYY 导入Django配置用于获取项目设置
from django.conf import settings
# ZYY 获取当前模块的日志记录器
logger = logging.getLogger(__name__)
# ZYY 定义蜘蛛通知类用于向搜索引擎提交URL通知
class SpiderNotify():
@staticmethod
# ZYY 静态方法用于通知百度蜘蛛抓取指定URL列表
def baidu_notify(urls):
try:
# ZYY 将URL列表拼接成字符串每个URL占一行
data = '\n'.join(urls)
# ZYY 向百度推送接口发送POST请求提交URL数据
result = requests.post(settings.BAIDU_NOTIFY_URL, data=data)
# ZYY 记录百度接口返回的结果(成功响应)
logger.info(result.text)
except Exception as e:
# ZYY 捕获并记录所有异常(如网络错误、接口错误等)
logger.error(e)
@staticmethod
# ZYY 静态方法,提供统一的通知接口(目前仅调用百度通知)
def notify(url):
SpiderNotify.baidu_notify(url)
# ZYY 调用百度通知方法注意这里参数名是url但实际传入的是单个URL可能存在命名歧义
SpiderNotify.baidu_notify(url)

@ -5,11 +5,18 @@ from djangoblog.utils import *
class DjangoBlogTest(TestCase):
def setUp(self):
# ZYY: 测试用例初始化方法,当前为空实现
pass
def test_utils(self):
# ZYY: 测试utils模块中的工具函数
# ZYY: 测试get_sha256函数验证字符串'test'的sha256哈希值不为空
md5 = get_sha256('test')
self.assertIsNotNone(md5)
# ZYY: 测试CommonMarkdown类的get_markdown方法
# ZYY: 输入包含标题、代码块和链接的Markdown文本验证输出不为空
c = CommonMarkdown.get_markdown('''
# Title1
@ -24,9 +31,12 @@ class DjangoBlogTest(TestCase):
''')
self.assertIsNotNone(c)
# ZYY: 测试parse_dict_to_url函数
# ZYY: 创建测试字典验证字典转URL参数字符串的功能
d = {
'd': 'key1',
'd2': 'key2'
}
data = parse_dict_to_url(d)
self.assertIsNotNone(data)
self.assertIsNotNone(data)# ZYY: 验证转换结果不为空

@ -13,52 +13,92 @@ Including another URLconf
1. Import the include() function: from django.conf.urls import url, include
2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls'))
"""
# ZYY: 导入Django基础配置模块
from django.conf import settings
# ZYY: 导入国际化URL模式工具
from django.conf.urls.i18n import i18n_patterns
# ZYY: 导入静态文件URL处理工具
from django.conf.urls.static import static
# ZYY: 导入站点地图视图
from django.contrib.sitemaps.views import sitemap
# ZYY: 导入URL路径处理函数
from django.urls import path, include
# ZYY: 导入正则表达式URL模式
from django.urls import re_path
# ZYY: 导入Haystack搜索视图工厂
from haystack.views import search_view_factory
# ZYY: 导入自定义视图和模块
from blog.views import EsSearchView
from djangoblog.admin_site import admin_site
from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm
from djangoblog.feeds import DjangoBlogFeed
from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
# ZYY: 定义站点地图配置,包含各类内容的站点地图
sitemaps = {
'blog': ArticleSiteMap,
'Category': CategorySiteMap,
'Tag': TagSiteMap,
'User': UserSiteMap,
'static': StaticViewSitemap
'blog': ArticleSiteMap, # ZYY: 文章站点地图
'Category': CategorySiteMap, # ZYY: 分类站点地图
'Tag': TagSiteMap, # ZYY: 标签站点地图
'User': UserSiteMap, # ZYY: 用户站点地图
'static': StaticViewSitemap # ZYY: 静态页面站点地图
}
handler404 = 'blog.views.page_not_found_view'
handler500 = 'blog.views.server_error_view'
handle403 = 'blog.views.permission_denied_view'
# ZYY: 定义自定义错误处理视图
handler404 = 'blog.views.page_not_found_view' # ZYY: 404错误处理
handler500 = 'blog.views.server_error_view' # ZYY: 500错误处理
handle403 = 'blog.views.permission_denied_view' # ZYY: 403错误处理
# ZYY: 基础URL模式配置
urlpatterns = [
# ZYY: 国际化URL前缀处理
path('i18n/', include('django.conf.urls.i18n')),
]
# ZYY: 添加国际化URL模式
urlpatterns += i18n_patterns(
re_path(r'^admin/', admin_site.urls),
# ZYY: 管理后台URL使用自定义admin_site
re_path(r'admin/', admin_site.urls),
# ZYY: 博客应用URL命名空间为blog
re_path(r'', include('blog.urls', namespace='blog')),
# ZYY: Markdown编辑器URL
re_path(r'mdeditor/', include('mdeditor.urls')),
# ZYY: 评论系统URL命名空间为comment
re_path(r'', include('comments.urls', namespace='comment')),
# ZYY: 账户系统URL命名空间为account
re_path(r'', include('accounts.urls', namespace='account')),
# ZYY: 第三方登录URL命名空间为oauth
re_path(r'', include('oauth.urls', namespace='oauth')),
re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps},
# ZYY: 站点地图URL生成sitemap.xml
re_path(r'sitemap\.xml$', sitemap, {'sitemaps': sitemaps},
name='django.contrib.sitemaps.views.sitemap'),
re_path(r'^feed/$', DjangoBlogFeed()),
re_path(r'^rss/$', DjangoBlogFeed()),
re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
# ZYY: RSS订阅URL两种访问路径
re_path(r'feed/$', DjangoBlogFeed()),
re_path(r'rss/$', DjangoBlogFeed()),
# ZYY: 搜索功能URL使用Elasticsearch搜索视图
re_path('search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
name='search'),
# ZYY: 服务器管理URL命名空间为servermanager
re_path(r'', include('servermanager.urls', namespace='servermanager')),
re_path(r'', include('owntracks.urls', namespace='owntracks'))
, prefix_default_language=False) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
# ZYY: OwnTracks应用URL命名空间为owntracks
re_path(r'', include('owntracks.urls', namespace='owntracks')),
# ZYY: 禁用默认语言前缀prefix_default_language=False
prefix_default_language=False
) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) # ZYY: 添加静态文件URL处理
# ZYY: 开发环境专用配置DEBUG模式下
if settings.DEBUG:
# ZYY: 添加媒体文件URL处理仅开发环境
urlpatterns += static(settings.MEDIA_URL,
document_root=settings.MEDIA_ROOT)
document_root=settings.MEDIA_ROOT)

@ -8,7 +8,6 @@ import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
@ -16,21 +15,20 @@ from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
logger = logging.getLogger(__name__)
# ZYY: 获取最新的文章ID和评论ID
def get_max_articleid_commentid():
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
# ZYY: 获取字符串的SHA256哈希值
def get_sha256(str):
m = sha256(str.encode('utf-8'))
return m.hexdigest()
# ZYY: 缓存装饰器 ,用于缓存函数结果
def cache_decorator(expiration=3 * 60):
def wrapper(func):
def news(*args, **kwargs):
@ -41,7 +39,6 @@ def cache_decorator(expiration=3 * 60):
key = None
if not key:
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
@ -53,7 +50,7 @@ def cache_decorator(expiration=3 * 60):
return value
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
@ -61,44 +58,40 @@ def cache_decorator(expiration=3 * 60):
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
# ZYY: 刷新视图缓存
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
: param key_prefix:前缀
:return:是否成功
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.META = {'SERVER_NAME': servername , 'SERVER_PORT': serverport}
request.path = path
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return True
return False
# ZYY: 使用缓存装饰器获取当前站点信息
@cache_decorator()
def get_current_site():
site = Site.objects.get_current()
site = Site.objects.get_current ()
return site
class CommonMarkdown:
# ZYY: 将Markdown文本转换为HTML并生成目录
@staticmethod
def _convert_markdown(value):
md = markdown.Markdown(
@ -113,17 +106,19 @@ class CommonMarkdown:
toc = md.toc
return body, toc
# ZYY: 获取带有目录的Markdown转换结果
@staticmethod
def get_markdown_with_toc(value):
body, toc = CommonMarkdown._convert_markdown(value)
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
# ZYY: 获取Markdown转换结果不带目录
@staticmethod
def get_markdown(value):
body, toc = CommonMarkdown._convert_markdown(value)
return body
# ZYY: 发送邮件
def send_email(emailto, title, content):
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
@ -132,19 +127,19 @@ def send_email(emailto, title, content):
title=title,
content=content)
# ZYY: 生成随机数验证码
def generate_code() -> str:
"""生成随机数验证码"""
return ''.join(random.sample(string.digits, 6))
return ''.join(random .sample(string.digits, 6))
# ZYY: 将字典解析为URL字符串
def parse_dict_to_url(dict):
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
# ZYY: 获取博客设置,如果缓存中没有则从数据库加载并设置缓存
def get_blog_setting():
value = cache.get('get_blog_setting')
if value:
@ -163,7 +158,7 @@ def get_blog_setting():
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
@ -172,7 +167,7 @@ def get_blog_setting():
cache.set('get_blog_setting', value)
return value
# ZYY: 保存用户头像到本地
def save_user_avatar(url):
'''
保存用户头像
@ -180,14 +175,12 @@ def save_user_avatar(url):
:return: 本地路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
@ -195,12 +188,12 @@ def save_user_avatar(url):
logger.info('保存用户头像:' + basedir + save_filename)
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename)
return static('avatar /' + save_filename)
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png')
# ZYY: 删除侧边栏缓存
def delete_sidebar_cache():
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
@ -208,13 +201,13 @@ def delete_sidebar_cache():
logger.info('delete sidebar key:' + k)
cache.delete(k)
# ZYY: 删除视图缓存
def delete_view_cache(prefix, keys):
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
# ZYY: 获取静态资源URL
def get_resource_url():
if settings.STATIC_URL:
return settings.STATIC_URL
@ -222,11 +215,11 @@ def get_resource_url():
site = get_current_site()
return 'http://' + site.domain + '/static/'
# ZYY: 允许的HTML标签和属性
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
# ZYY: 清理HTML只保留允许的标签和属性
def sanitize_html(html):
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)

@ -1,7 +1,7 @@
# encoding: utf-8
from __future__ import absolute_import, division, print_function, unicode_literals
# ZYY: 导入必要的模块和函数
import json
import os
import re
@ -34,12 +34,14 @@ from whoosh.qparser import QueryParser
from whoosh.searching import ResultsPage
from whoosh.writing import AsyncWriter
# ZYY: 尝试导入whoosh模块如果失败则抛出异常
try:
import whoosh
except ImportError:
raise MissingDependency(
"The 'whoosh' backend requires the installation of 'Whoosh'. Please refer to the documentation.")
# ZYY: 检查whoosh版本是否满足要求
# Handle minimum requirement.
if not hasattr(whoosh, '__version__') or whoosh.__version__ < (2, 5, 0):
raise MissingDependency(
@ -52,16 +54,20 @@ DATETIME_REGEX = re.compile(
LOCALS = threading.local()
LOCALS.RAM_STORE = None
# ZYY: 自定义的HtmlFormatter类用于在whoosh中统一结果格式
class WhooshHtmlFormatter(HtmlFormatter):
"""
This is a HtmlFormatter simpler than the whoosh.HtmlFormatter.
We use it to have consistent results across backends. Specifically,
Solr, Xapian and Elasticsearch are using this formatting.
"""
"""
ZYY: 这是一个比whoosh.HtmlFormatter更简单的HtmlFormatter
我们使用它来确保不同后端如Solr, Xapian和Elasticsearch之间的结果一致性
"""
template = '<%(tag)s>%(t)s</%(tag)s>'
# ZYY: Whoosh搜索后端类
class WhooshSearchBackend(BaseSearchBackend):
# Word reserved by Whoosh for special use.
RESERVED_WORDS = (
@ -70,7 +76,7 @@ class WhooshSearchBackend(BaseSearchBackend):
'OR',
'TO',
)
# ZYY: Whoosh保留的特殊用途字符
# Characters reserved by Whoosh for special use.
# The '\\' must come first, so as not to overwrite the other slash
# replacements.
@ -80,6 +86,7 @@ class WhooshSearchBackend(BaseSearchBackend):
)
def __init__(self, connection_alias, **connection_options):
# ZYY: 初始化Whoosh搜索后端
super(
WhooshSearchBackend,
self).__init__(
@ -107,9 +114,13 @@ class WhooshSearchBackend(BaseSearchBackend):
"""
Defers loading until needed.
"""
"""
ZYY: 延迟加载直到需要时才加载
"""
from haystack import connections
new_index = False
# ZYY: 确保索引存在
# Make sure the index is there.
if self.use_file_storage and not os.path.exists(self.path):
os.makedirs(self.path)
@ -144,12 +155,14 @@ class WhooshSearchBackend(BaseSearchBackend):
self.setup_complete = True
# ZYY: 构建索引模式
def build_schema(self, fields):
schema_fields = {
ID: WHOOSH_ID(stored=True, unique=True),
DJANGO_CT: WHOOSH_ID(stored=True),
DJANGO_ID: WHOOSH_ID(stored=True),
}
# ZYY: 获取Haystack中硬编码的键数量用于后续的错误处理
# Grab the number of keys that are hard-coded into Haystack.
# We'll use this to (possibly) fail slightly more gracefully later.
initial_key_count = len(schema_fields)
@ -173,6 +186,7 @@ class WhooshSearchBackend(BaseSearchBackend):
schema_fields[field_class.index_fieldname] = NUMERIC(
stored=field_class.stored, numtype=float, field_boost=field_class.boost)
elif field_class.field_type == 'boolean':
# ZYY: BOOLEAN字段不支持field_boost
# Field boost isn't supported on BOOLEAN as of 1.8.2.
schema_fields[field_class.index_fieldname] = BOOLEAN(
stored=field_class.stored)
@ -184,13 +198,14 @@ class WhooshSearchBackend(BaseSearchBackend):
stored=field_class.stored,
field_boost=field_class.boost)
else:
# ZYY: 使用中文分析器
# schema_fields[field_class.index_fieldname] = TEXT(stored=True, analyzer=StemmingAnalyzer(), field_boost=field_class.boost, sortable=True)
schema_fields[field_class.index_fieldname] = TEXT(
stored=True, analyzer=ChineseAnalyzer(), field_boost=field_class.boost, sortable=True)
if field_class.document is True:
content_field_name = field_class.index_fieldname
schema_fields[field_class.index_fieldname].spelling = True
# ZYY: 如果没有找到任何字段,则抛出错误
# Fail more gracefully than relying on the backend to die if no fields
# are found.
if len(schema_fields) <= initial_key_count:
@ -198,7 +213,7 @@ class WhooshSearchBackend(BaseSearchBackend):
"No fields were found in any search_indexes. Please correct this before attempting to search.")
return (content_field_name, Schema(**schema_fields))
# ZYY: 更新索引
def update(self, index, iterable, commit=True):
if not self.setup_complete:
self.setup()
@ -212,11 +227,13 @@ class WhooshSearchBackend(BaseSearchBackend):
except SkipDocument:
self.log.debug(u"Indexing for object `%s` skipped", obj)
else:
# ZYY: 确保所有数据都是unicode格式
# Really make sure it's unicode, because Whoosh won't have it any
# other way.
for key in doc:
doc[key] = self._from_python(doc[key])
# ZYY: Whoosh 2.5.0+不支持文档boost
# Document boosts aren't supported in Whoosh 2.5.0+.
if 'boost' in doc:
del doc['boost']
@ -227,6 +244,7 @@ class WhooshSearchBackend(BaseSearchBackend):
if not self.silently_fail:
raise
# ZYY: 记录错误信息
# We'll log the object identifier but won't include the actual object
# to avoid the possibility of that generating encoding errors while
# processing the log message:
@ -240,10 +258,12 @@ class WhooshSearchBackend(BaseSearchBackend):
"object": get_identifier(obj)}})
if len(iterable) > 0:
# ZYY: 无论如何都提交更改,以避免锁定问题
# For now, commit no matter what, as we run into locking issues
# otherwise.
writer.commit()
# ZYY: 从索引中删除文档
def remove(self, obj_or_string, commit=True):
if not self.setup_complete:
self.setup()
@ -266,6 +286,7 @@ class WhooshSearchBackend(BaseSearchBackend):
e,
exc_info=True)
# ZYY: 清空索引
def clear(self, models=None, commit=True):
if not self.setup_complete:
self.setup()
@ -302,8 +323,9 @@ class WhooshSearchBackend(BaseSearchBackend):
else:
self.log.error(
"Failed to clear Whoosh index: %s", e, exc_info=True)
# ZYY: 删除索引文件
def delete_index(self):
# ZYY: 根据Whoosh邮件列表的建议如果要从索引中删除所有内容更高效的方法是直接删除索引文件
# Per the Whoosh mailing list, if wiping out everything from the index,
# it's much more efficient to simply delete the index files.
if self.use_file_storage and os.path.exists(self.path):
@ -311,9 +333,11 @@ class WhooshSearchBackend(BaseSearchBackend):
elif not self.use_file_storage:
self.storage.clean()
# ZYY: 重新创建索引
# Recreate everything.
self.setup()
# ZYY: 优化索引
def optimize(self):
if not self.setup_complete:
self.setup()
@ -321,12 +345,15 @@ class WhooshSearchBackend(BaseSearchBackend):
self.index = self.index.refresh()
self.index.optimize()
# ZYY: 计算分页
def calculate_page(self, start_offset=0, end_offset=None):
# Prevent against Whoosh throwing an error. Requires an end_offset
# greater than 0.
# ZYY: 防止Whoosh抛出错误要求end_offset大于0
if end_offset is not None and end_offset <= 0:
end_offset = 1
# ZYY: 确定页码
# Determine the page.
page_num = 0
@ -341,6 +368,7 @@ class WhooshSearchBackend(BaseSearchBackend):
if page_length and page_length > 0:
page_num = int(start_offset / page_length)
# ZYY: Whoosh使用1-based的页码
# Increment because Whoosh uses 1-based page numbers.
page_num += 1
return page_num, page_length
@ -368,7 +396,8 @@ class WhooshSearchBackend(BaseSearchBackend):
**kwargs):
if not self.setup_complete:
self.setup()
# ZYY: 空查询应该返回无结果
# A zero length query should return no results.
if len(query_string) == 0:
return {
@ -377,7 +406,7 @@ class WhooshSearchBackend(BaseSearchBackend):
}
query_string = force_str(query_string)
# ZYY: 单字符查询(非通配符)会被停用词过滤器过滤掉,应该返回无结果
# A one-character query (non-wildcard) gets nabbed by a stopwords
# filter and should yield zero results.
if len(query_string) <= 1 and query_string != u'*':
@ -389,6 +418,7 @@ class WhooshSearchBackend(BaseSearchBackend):
reverse = False
if sort_by is not None:
# ZYY: 确定是否需要反转结果以及Whoosh是否能处理排序要求
# Determine if we need to reverse the results and if Whoosh can
# handle what it's being asked to sort by. Reversing is an
# all-or-nothing action, unfortunately.
@ -445,6 +475,7 @@ class WhooshSearchBackend(BaseSearchBackend):
if models and len(models):
model_choices = sorted(get_model_ct(model) for model in models)
elif limit_to_registered_models:
# ZYY: 使用窄查询限制结果到当前路由器处理的模型
# Using narrow queries, limit the results to only models handled
# with the current routers.
model_choices = self.build_models_list()
@ -461,6 +492,7 @@ class WhooshSearchBackend(BaseSearchBackend):
narrow_searcher = None
if narrow_queries is not None:
# ZYY: 潜在的高开销操作
# Potentially expensive? I don't see another way to do it in
# Whoosh...
narrow_searcher = self.index.searcher()
@ -486,6 +518,7 @@ class WhooshSearchBackend(BaseSearchBackend):
searcher = self.index.searcher()
parsed_query = self.parser.parse(query_string)
# ZYY: 处理无效/停用词查询,优雅恢复
# In the event of an invalid/stopworded query, recover gracefully.
if parsed_query is None:
return {
@ -502,6 +535,7 @@ class WhooshSearchBackend(BaseSearchBackend):
'reverse': reverse,
}
# ZYY: 处理已缩小范围的查询结果
# Handle the case where the results have been narrowed.
if narrowed_results is not None:
search_kwargs['filter'] = narrowed_results
@ -521,7 +555,7 @@ class WhooshSearchBackend(BaseSearchBackend):
'hits': 0,
'spelling_suggestion': None,
}
# ZYY: Whoosh 2.5.1如果请求的页码过高,会返回错误的页码
# Because as of Whoosh 2.5.1, it will return the wrong page of
# results if you request something too high. :(
if raw_page.pagenum < page_num:
@ -559,7 +593,7 @@ class WhooshSearchBackend(BaseSearchBackend):
'hits': 0,
'spelling_suggestion': spelling_suggestion,
}
# ZYY: 查找类似文档
def more_like_this(
self,
model_instance,
@ -572,7 +606,7 @@ class WhooshSearchBackend(BaseSearchBackend):
**kwargs):
if not self.setup_complete:
self.setup()
# ZYY: 延迟模型将具有不同的类名("RealClass_Deferred_fieldname"),该类名不会在我们的注册表中
# Deferred models will have a different class ("RealClass_Deferred_fieldname")
# which won't be in our registry:
model_klass = model_instance._meta.concrete_model
@ -589,6 +623,7 @@ class WhooshSearchBackend(BaseSearchBackend):
if models and len(models):
model_choices = sorted(get_model_ct(model) for model in models)
elif limit_to_registered_models:
# ZYY: 使用窄查询限制结果到当前路由器处理的模型
# Using narrow queries, limit the results to only models handled
# with the current routers.
model_choices = self.build_models_list()
@ -608,6 +643,7 @@ class WhooshSearchBackend(BaseSearchBackend):
narrow_searcher = None
if narrow_queries is not None:
# ZYY: 潜在的高开销操作
# Potentially expensive? I don't see another way to do it in
# Whoosh...
narrow_searcher = self.index.searcher()
@ -642,10 +678,11 @@ class WhooshSearchBackend(BaseSearchBackend):
raw_results = results[0].more_like_this(
field_name, top=end_offset)
# ZYY: 处理已缩小范围的查询结果
# Handle the case where the results have been narrowed.
if narrowed_results is not None and hasattr(raw_results, 'filter'):
raw_results.filter(narrowed_results)
# ZYY: Whoosh 2.5.1如果请求的页码过高,会返回错误的页码
try:
raw_page = ResultsPage(raw_results, page_num, page_length)
except ValueError:
@ -674,7 +711,7 @@ class WhooshSearchBackend(BaseSearchBackend):
narrow_searcher.close()
return results
# ZYY: 处理查询结果
def _process_results(
self,
raw_page,
@ -684,174 +721,153 @@ class WhooshSearchBackend(BaseSearchBackend):
result_class=None):
from haystack import connections
results = []
# ZYY: 获取命中数
# It's important to grab the hits first before slicing. Otherwise, this
# can cause pagination failures.
hits = len(raw_page)
if result_class is None:
result_class = SearchResult
facets = {}
spelling_suggestion = None
unified_index = connections[self.connection_alias].get_unified_index()
indexed_models = unified_index.get_indexed_models()
for doc_offset, raw_result in enumerate(raw_page):
score = raw_page.score(doc_offset) or 0
app_label, model_name = raw_result[DJANGO_CT].split('.')
additional_fields = {}
model = haystack_get_model(app_label, model_name)
if model and model in indexed_models:
for key, value in raw_result.items():
index = unified_index.get_index(model)
string_key = str(key)
if string_key in index.fields and hasattr(
index.fields[string_key], 'convert'):
# Special-cased due to the nature of KEYWORD fields.
if index.fields[string_key].is_multivalued:
if value is None or len(value) == 0:
additional_fields[string_key] = []
else:
additional_fields[string_key] = value.split(
',')
else:
additional_fields[string_key] = index.fields[string_key].convert(
value)
# ZYY: 如果result_class为None则将其设置为SearchResult
if result_class is None:
result_class = SearchResult
facets = {}
spelling_suggestion = None
# ZYY: 获取统一索引
unified_index = connections[self.connection_alias].get_unified_index()
# ZYY: 获取已索引的模型列表
indexed_models = unified_index.get_indexed_models()
for doc_offset, raw_result in enumerate(raw_page):
# ZYY: 获取文档得分如果没有则为0
score = raw_page.score(doc_offset) or 0
app_label, model_name = raw_result[DJANGO_CT].split('.')
additional_fields = {}
# ZYY: 根据app_label和model_name获取模型
model = haystack_get_model(app_label, model_name)
if model and model in indexed_models:
for key, value in raw_result.items():
index = unified_index.get_index(model)
string_key = str(key)
# ZYY : 检查字段是否在索引中且具有convert方法特殊处理KEYWORD字段
if string_key in index.fields and hasattr(
index.fields[string_key], 'convert'):
# ZYY: 处理多值字段
if index.fields[string_key].is_multivalued:
if value is None or len(value) == 0:
additional_fields[string_key] = []
else:
additional_fields[string_key] = self._to_python(value)
del (additional_fields[DJANGO_CT])
del (additional_fields[DJANGO_ID])
if highlight:
sa = StemmingAnalyzer()
formatter = WhooshHtmlFormatter('em')
terms = [token.text for token in sa(query_string)]
whoosh_result = whoosh_highlight(
additional_fields.get(self.content_field_name),
terms,
sa,
ContextFragmenter(),
formatter
)
additional_fields['highlighted'] = {
self.content_field_name: [whoosh_result],
}
result = result_class(
app_label,
model_name,
raw_result[DJANGO_ID],
score,
**additional_fields)
results.append(result)
else:
hits -= 1
if self.include_spelling:
if spelling_query:
spelling_suggestion = self.create_spelling_suggestion(
spelling_query)
additional_fields[string_key] = value.split(
',')
else:
additional_fields[string_key] = index.fields[string_key].convert(
value)
else:
spelling_suggestion = self.create_spelling_suggestion(
query_string)
return {
'results': results,
'hits': hits,
'facets': facets,
'spelling_suggestion': spelling_suggestion,
}
def create_spelling_suggestion(self, query_string):
spelling_suggestion = None
reader = self.index.reader()
corrector = reader.corrector(self.content_field_name)
cleaned_query = force_str(query_string)
if not query_string:
return spelling_suggestion
# Clean the string.
for rev_word in self.RESERVED_WORDS:
cleaned_query = cleaned_query.replace(rev_word, '')
for rev_char in self.RESERVED_CHARACTERS:
cleaned_query = cleaned_query.replace(rev_char, '')
# Break it down.
query_words = cleaned_query.split()
suggested_words = []
for word in query_words:
suggestions = corrector.suggest(word, limit=1)
if len(suggestions) > 0:
suggested_words.append(suggestions[0])
spelling_suggestion = ' '.join(suggested_words)
additional_fields[string_key] = self._to_python(value)
del (additional_fields[DJANGO_CT])
del (additional_fields[DJANGO_ID])
if highlight:
sa = StemmingAnalyzer()
formatter = WhooshHtmlFormatter('em')
terms = [token .text for token in sa(query_string)]
# ZYY: 使用Whoosh进行高亮显示
whoosh_result = whoosh_highlight(
additional_fields.get(self.content_field_name),
terms,
sa,
ContextFragmenter(),
formatter
)
additional_fields['highlighted'] = {
self.content_field_name: [whoosh_result],
}
result = result_class(
app_label,
model_name,
raw_result[DJANGO_ID],
score,
**additional_fields)
results.append(result)
else:
hits -= 1
if self.include_spelling:
if spelling_query:
# ZYY: 根据spelling_query创建拼写建议
spelling_suggestion = self.create_spelling_suggestion(
spelling_query)
else:
# ZYY: 根据query_string创建拼写建议
spelling_suggestion = self.create_spelling_suggestion(
query_string)
return {
'results': results,
'hits': hits,
'facets': facets,
'spelling_suggestion': spelling_suggestion,
}
# ZYY: 创建拼写建议的方法
def create_spelling_suggestion(self, query_string):
spelling_suggestion = None
reader = self.index.reader()
corrector = reader.corrector(self.content_field_name)
cleaned_query = force_str(query_string)
if not query_string:
return spelling_suggestion
def _from_python(self, value):
"""
Converts Python values to a string for Whoosh.
Code courtesy of pysolr.
"""
if hasattr(value, 'strftime'):
if not hasattr(value, 'hour'):
value = datetime(value.year, value.month, value.day, 0, 0, 0)
elif isinstance(value, bool):
if value:
value = 'true'
else:
value = 'false'
elif isinstance(value, (list, tuple)):
value = u','.join([force_str(v) for v in value])
elif isinstance(value, (six.integer_types, float)):
# Leave it alone.
pass
# ZYY: 清理字符串,移除保留字和保留字符
for rev_word in self.RESERVED_WORDS:
cleaned_query = cleaned_query.replace(rev_word, '')
for rev_char in self.RESERVED_CHARACTERS:
cleaned_query = cleaned_query.replace(rev_char, '')
# ZYY: 分解查询词
query_words = cleaned_query.split()
suggested_words = []
for word in query_words:
suggestions = corrector.suggest(word, limit=1)
if len(suggestions) > 0:
suggested_words.append(suggestions[0])
spelling_suggestion = ' '.join(suggested_words)
return spelling_suggestion
# ZYY: 将Python值转换为Whoosh使用的字符串
def _from_python(self, value):
if hasattr(value, 'strftime'):
if not hasattr(value, 'hour'):
value = datetime(value.year, value.month, value.day, 0, 0, 0)
elif isinstance(value, bool):
if value:
value = ' true'
else:
value = force_str(value)
return value
def _to_python(self, value):
"""
Converts values from Whoosh to native Python values.
A port of the same method in pysolr, as they deal with data the same way.
"""
if value == 'true':
return True
elif value == 'false':
return False
if value and isinstance(value, six.string_types):
possible_datetime = DATETIME_REGEX.search(value)
if possible_datetime:
date_values = possible_datetime.groupdict()
for dk, dv in date_values.items():
date_values[dk] = int(dv)
return datetime(
date_values['year'],
date_values['month'],
date_values['day'],
date_values['hour'],
date_values['minute'],
date_values['second'])
value = 'false'
elif isinstance(value, (list, tuple)):
value = u','.join([force_str(v) for v in value])
elif isinstance(value, (six.integer_types, float)):
# ZYY: 整数和浮点数保持不变
pass
else:
value = force_str(value)
return value
# ZYY: 将Whoosh的值转换为Python原生值
def _to_python(self, value):
if value == 'true ':
return True
elif value == 'false':
return False
if value and isinstance(value, six.string_types):
possible_datetime = DATETIME_REGEX.search(value)
if possible_datetime:
date_values = possible_datetime.groupdict()
for dk, dv in date_values.items():
date_values[dk] = int(dv)
return datetime(
date_values['year'],
date_values['month'],
date_values ['day'],
date_values['hour'],
date_values['minute'],
date_values['second'])
try:
# Attempt to use json to load the values.
# ZYY: 尝试使用json加载值
converted_value = json.loads(value)
# Try to handle most built-in types.
# ZYY: 尝试处理大多数内置类型
if isinstance(
converted_value,
(list,
@ -863,182 +879,139 @@ class WhooshSearchBackend(BaseSearchBackend):
complex)):
return converted_value
except BaseException:
# If it fails (SyntaxError or its ilk) or we don't trust it,
# continue on.
pass
return value
# ZYY: 如果失败或不信任,则继续
pass
return value
class WhooshSearchQuery(BaseSearchQuery):
# ZYY: 将datetime对象转换为Whoosh使用的字符串
def _convert_datetime(self, date):
if hasattr(date, 'hour'):
return force_str(date.strftime('%Y%m%d%H%M%S'))
else:
return force_str(date.strftime('%Y%m%d000000'))
return force_str(date.strftime('%Y%m%d000000'))
# ZYY: 清理查询片段,处理保留字和保留字符
def clean(self, query_fragment):
"""
Provides a mechanism for sanitizing user input before presenting the
value to the backend.
Whoosh 1.X differs here in that you can no longer use a backslash
to escape reserved characters. Instead, the whole word should be
quoted.
"""
words = query_fragment.split()
cleaned_words = []
for word in words:
if word in self.backend.RESERVED_WORDS:
word = word.replace(word, word.lower())
for char in self.backend.RESERVED_CHARACTERS:
if char in word:
word = "'%s'" % word
break
cleaned_words.append(word)
cleaned_words. append(word)
return ' '.join(cleaned_words)
# ZYY: 构建查询片段
def build_query_fragment(self, field, filter_type, value):
from haystack import connections
query_frag = ''
is_datetime = False
if not hasattr(value, 'input_type_name'):
# Handle when we've got a ``ValuesListQuerySet``...
# ZYY: 处理ValuesListQuerySet的情况
if hasattr(value, 'values_list'):
value = list(value)
if hasattr(value, 'strftime'):
if hasattr(value, ' strftime'):
is_datetime = True
if isinstance(value, six.string_types) and value != ' ':
# It's not an ``InputType``. Assume ``Clean``.
# ZYY: 假设为Clean类型
value = Clean(value)
else:
value = PythonData(value)
# Prepare the query using the InputType.
prepared_value = value.prepare(self)
if not isinstance(prepared_value, (set, list, tuple)):
# Then convert whatever we get back to what pysolr wants if needed.
prepared_value = self.backend._from_python(prepared_value)
# 'content' is a special reserved word, much like 'pk' in
# Django's ORM layer. It indicates 'no special field'.
if field == 'content':
index_fieldname = ''
else:
index_fieldname = u'%s:' % connections[self._using].get_unified_index(
).get_index_fieldname(field)
filter_types = {
'content': '%s',
'contains': '*%s*',
'endswith': "*%s",
'startswith': "%s*",
'exact': '%s',
'gt': "{%s to}",
'gte': "[%s to]",
'lt': "{to %s}",
'lte': "[to %s]",
'fuzzy': u'%s~',
}
if value.post_process is False:
query_frag = prepared_value
else:
if filter_type in [
'content',
'contains',
'startswith',
'endswith',
'fuzzy']:
if value.input_type_name == 'exact':
query_frag = prepared_value
else:
# Iterate over terms & incorportate the converted form of
# each into the query.
terms = []
if isinstance(prepared_value, six.string_types):
possible_values = prepared_value.split(' ')
# ZYY: 使用InputType准备查询
prepared_value = value.prepare(self)
if not isinstance(prepared_value, (set, list, tuple )):
# ZYY: 转换为pysolr需要的格式
prepared_value = self.backend._from_python(prepared_value)
# ZYY: 'content'是特殊保留字,表示无特殊字段
if field == 'content':
index_fieldname = ''
else:
index_fieldname = u'%s:' % connections[self._using].get_unified_index(
).get_index_fieldname(field)
filter_types = {
'content': '%s',
'contains': '*%s*',
' endswith': "*%s",
'startswith': "%s*",
'exact': '%s',
'gt': "{%s to}",
'gte': "[%s to]",
'lt': "{to %s}",
'lte': "[to %s]",
'fuzzy': u'%s~',
}
if value.post_process is False:
query_frag = prepared_value
else:
if filter_type in [
'content',
'contains',
'startswith ',
'endswith',
'fuzzy']:
if value.input_type_name == 'exact':
query_frag = prepared_value
else:
# ZYY: 遍历词项并合并转换后的形式
terms = []
if isinstance(prepared_value, six.string_types):
possible_values = prepared_value.split(' ')
else:
if is_datetime is True:
prepared_value = self._convert_datetime(
prepared_value)
possible_values = [prepared_value]
for possible_value in possible_values:
terms.append(
filter_types[filter_type] %
self.backend._from_python(possible_value))
if len(terms) == 1:
query_frag = terms[0]
else:
query_frag = u"(%s)" % " AND ".join(terms)
elif filter_type == 'in':
in_options = []
for possible_value in prepared_value:
is_datetime = False
if hasattr(possible_value, 'strftime'):
is_datetime = True
pv = self.backend._from_python (possible_value)
if is_datetime is True:
prepared_value = self._convert_datetime(
prepared_value)
possible_values = [prepared_value]
for possible_value in possible_values:
terms.append(
filter_types[filter_type] %
self.backend._from_python(possible_value))
if len(terms) == 1:
query_frag = terms[0]
else:
query_frag = u"(%s)" % " AND ".join(terms)
elif filter_type == 'in':
in_options = []
for possible_value in prepared_value:
is_datetime = False
if hasattr(possible_value, 'strftime'):
is_datetime = True
pv = self.backend._from_python(possible_value)
if is_datetime is True:
pv = self._convert_datetime(pv)
if isinstance(pv, six.string_types) and not is_datetime:
in_options.append('"%s"' % pv)
pv = self._convert_datetime(pv)
if isinstance(pv, six.string_types) and not is_datetime:
in_options.append('"%s"' % pv)
else:
in_options.append('%s' % pv)
query_frag = "(%s)" % " OR ".join(in_options)
elif filter_type == 'range':
start = self.backend._from_python(prepared_value[0])
end = self.backend._from_python(prepared_value[1])
if hasattr(prepared_value [0], 'strftime'):
start = self._convert_datetime(start)
if hasattr(prepared_value[1], 'strftime'):
end = self._convert_datetime(end)
query_frag = u"[%s to %s]" % (start, end)
elif filter_type == 'exact':
if value.input_type_name == 'exact':
query_frag = prepared_value
else:
in_options.append('%s' % pv)
query_frag = "(%s)" % " OR ".join(in_options)
elif filter_type == 'range':
start = self.backend._from_python(prepared_value[0])
end = self.backend._from_python(prepared_value[1])
if hasattr(prepared_value[0], 'strftime'):
start = self._convert_datetime(start)
if hasattr(prepared_value[1], 'strftime'):
end = self._convert_datetime(end)
query_frag = u"[%s to %s]" % (start, end)
elif filter_type == 'exact':
if value.input_type_name == 'exact':
query_frag = prepared_value
prepared_value = Exact(prepared_value).prepare(self)
query_frag = filter_types[filter_type] % prepared_value
else:
prepared_value = Exact(prepared_value).prepare(self)
if is_datetime is True:
prepared_value = self._convert_datetime(prepared_value)
query_frag = filter_types[filter_type] % prepared_value
else:
if is_datetime is True:
prepared_value = self._convert_datetime(prepared_value)
query_frag = filter_types[filter_type] % prepared_value
if len(query_frag) and not isinstance(value, Raw):
if not query_frag.startswith('(') and not query_frag.endswith(')'):
query_frag = "(%s)" % query_frag
if len(query_frag) and not isinstance(value, Raw):
if not query_frag.startswith('(') and not query_frag.endswith(')'):
query_frag = "(%s)" % query_frag
return u"%s%s" % (index_fieldname, query_frag)
# if not filter_type in ('in', 'range'):
# # 'in' is a bit of a special case, as we don't want to
# # convert a valid list/tuple to string. Defer handling it
# # until later...
# value = self.backend._from_python(value)
class WhooshEngine(BaseEngine):
backend = WhooshSearchBackend
query = WhooshSearchQuery
query = WhooshSearchQuery

@ -6,11 +6,19 @@ It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/1.10/howto/deployment/wsgi/
"""
# ZYY: 导入操作系统模块,用于与操作系统交互(如环境变量设置)
import os
# ZYY: 从Django核心模块导入WSGI应用获取函数
# ZYY: WSGI (Web Server Gateway Interface) 是Python web应用与服务器之间的标准接口
from django.core.wsgi import get_wsgi_application
# ZYY: 设置默认环境变量DJANGO_SETTINGS_MODULE
# ZYY: 该变量指定Django项目的设置模块路径格式为"项目名.settings"
# ZYY: os.environ.setdefault()表示如果环境变量已存在则不覆盖
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoblog.settings")
# ZYY: 获取WSGI应用实例并赋值给application变量
# ZYY: 这是WSGI服务器的入口点服务器将通过这个变量调用Django应用
# ZYY: get_wsgi_application()会加载Django设置并初始化应用
application = get_wsgi_application()

Loading…
Cancel
Save