yyd_第五周注释 #9

Merged
pnry42fjm merged 1 commits from yyd_branch into develop 4 months ago

@ -0,0 +1,19 @@
# 当该包通常是Django应用被导入时打印初始化信息
# 主要用于开发调试,确认包的加载时机和状态
print("__init__.py is running")
# 定义Django应用的默认配置类
# Django在启动时会根据此配置加载应用包括应用名称、信号注册、初始化逻辑等
# 'djangoblog.apps.DjangoblogAppConfig'表示配置类的完整路径:
# - djangoblog应用所在的包名
# - apps存放配置类的模块名
# - DjangoblogAppConfig具体的配置类继承自django.apps.AppConfig
default_app_config = 'djangoblog.apps.DjangoblogAppConfig'
# 导入pymysql库Python连接MySQL的第三方库
import pymysql
# 将pymysql伪装成MySQLdb库
# 背景早期Django默认使用MySQLdb库连接MySQL但MySQLdb不支持Python3
# 作用让Django在使用`import MySQLdb`时实际导入pymysql实现Python3环境下的MySQL连接兼容
pymysql.install_as_MySQLdb()

@ -0,0 +1,103 @@
# 导入Django Admin相关核心组件
from django.contrib.admin import AdminSite # Django Admin站点基类
from django.contrib.admin.models import LogEntry # 管理员操作日志模型
from django.contrib.sites.admin import SiteAdmin # 站点管理的默认Admin配置
from django.contrib.sites.models import Site # Django内置的站点模型用于多站点管理
# 导入各应用的Admin配置和数据模型
from accounts.admin import * # 账户相关的Admin配置
from blog.admin import * # 博客核心功能文章、分类等的Admin配置
from blog.models import * # 博客核心数据模型
from comments.admin import * # 评论功能的Admin配置
from comments.models import * # 评论相关数据模型
from djangoblog.logentryadmin import LogEntryAdmin # 自定义的操作日志Admin配置
from oauth.admin import * # 第三方登录OAuth的Admin配置
from oauth.models import * # OAuth相关数据模型
from owntracks.admin import * # 位置追踪OwnTracks的Admin配置
from owntracks.models import *# OwnTracks相关数据模型
from servermanager.admin import * # 服务器管理的Admin配置
from servermanager.models import * # 服务器管理相关数据模型
class DjangoBlogAdminSite(AdminSite):
"""
自定义的Django Admin站点类继承自Django内置的AdminSite
作用通过重写基类属性和方法定制Admin后台的外观和权限控制
"""
# 定制Admin站点的页面头部标题显示在登录页和后台顶部导航栏
site_header = 'djangoblog administration'
# 定制Admin站点的页面标题显示在浏览器标签页
site_title = 'djangoblog site admin'
def __init__(self, name='admin'):
"""
初始化自定义Admin站点
:param name: 站点名称默认'admin'与Django默认Admin站点名称保持一致避免路由冲突
"""
super().__init__(name) # 调用父类构造方法初始化
def has_permission(self, request):
"""
重写权限检查方法控制谁能访问Admin后台
:param request: HTTP请求对象包含当前用户信息
:return: 布尔值True表示允许访问False表示拒绝访问
此处限制仅超级用户is_superuser可访问比默认的is_staff更严格
"""
return request.user.is_superuser
# 以下为注释掉的自定义URL示例可根据需求启用
# def get_urls(self):
# """
# 扩展Admin站点的URL路由添加自定义功能入口
# """
# # 先获取父类默认的URL配置
# urls = super().get_urls()
# # 导入URL路径处理和自定义视图
# from django.urls import path
# from blog.views import refresh_memcache # 示例:缓存刷新视图
#
# # 定义自定义URL规则使用admin_view()包装确保权限检查
# my_urls = [
# path('refresh/', self.admin_view(refresh_memcache), name="refresh"),
# ]
# # 合并默认URL和自定义URL自定义URL优先
# return urls + my_urls
# 实例化自定义的Admin站点名称为'admin'与Django默认Admin站点名称一致接管后台
admin_site = DjangoBlogAdminSite(name='admin')
# 注册数据模型与对应的Admin配置到自定义Admin站点
# 博客核心内容
admin_site.register(Article, ArticlelAdmin) # 文章模型 + 其Admin配置
admin_site.register(Category, CategoryAdmin) # 分类模型 + 其Admin配置
admin_site.register(Tag, TagAdmin) # 标签模型 + 其Admin配置
admin_site.register(Links, LinksAdmin) # 友情链接模型 + 其Admin配置
admin_site.register(SideBar, SideBarAdmin) # 侧边栏模型 + 其Admin配置
admin_site.register(BlogSettings, BlogSettingsAdmin) # 博客设置模型 + 其Admin配置
# 服务器管理
admin_site.register(commands, CommandsAdmin) # 命令模型 + 其Admin配置
admin_site.register(EmailSendLog, EmailSendLogAdmin)# 邮件发送日志模型 + 其Admin配置
# 账户管理
admin_site.register(BlogUser, BlogUserAdmin) # 自定义用户模型 + 其Admin配置
# 评论管理
admin_site.register(Comment, CommentAdmin) # 评论模型 + 其Admin配置
# OAuth第三方登录
admin_site.register(OAuthUser, OAuthUserAdmin) # OAuth用户模型 + 其Admin配置
admin_site.register(OAuthConfig, OAuthConfigAdmin) # OAuth配置模型 + 其Admin配置
# 位置追踪
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin) # 位置日志模型 + 其Admin配置
# 站点管理Django内置
admin_site.register(Site, SiteAdmin) # 站点模型 + Django默认的SiteAdmin配置
# 操作日志管理
admin_site.register(LogEntry, LogEntryAdmin) # 管理员操作日志模型 + 自定义Admin配置

@ -0,0 +1,43 @@
# 导入Django的AppConfig类用于配置Django应用的生命周期和元数据
from django.apps import AppConfig
class DjangoblogAppConfig(AppConfig):
"""
Django博客应用djangoblog的配置类用于定义应用的核心配置和生命周期钩子
作用
1. 配置应用的数据库主键生成规则
2. 标识应用的唯一名称
3. 定义应用就绪后的初始化逻辑如加载插件
"""
# 配置Django模型默认的自增主键字段类型
# BigAutoField是64位整数型自增字段支持更大的主键范围适用于数据量较大的博客
# 替代默认的AutoField32位整数避免数据量增长后主键溢出问题
default_auto_field = 'django.db.models.BigAutoField'
# 应用的唯一名称,必须与项目中应用的目录名一致(此处为'djangoblog'
# Django通过该名称识别应用用于注册路由、加载模型等核心操作
name = 'djangoblog'
def ready(self):
"""
Django应用就绪后的钩子方法在应用完全加载并初始化后自动调用
执行时机
- 项目启动时如runservercelery启动
- 应用注册表app registry完成所有应用加载后
注意该方法可能会被多次调用如开发环境自动重载时需确保逻辑可重入
核心功能
调用插件加载函数在应用就绪后初始化所有已激活的插件
"""
# 调用父类的ready()方法确保Django默认的应用就绪逻辑正常执行
super().ready()
# 导入并执行插件加载函数:
# 1. 从当前应用djangoblog的plugin_manage.loader模块中导入load_plugins函数
# 2. 调用load_plugins()触发插件动态加载(如导入插件模块、初始化插件实例)
# 此处是插件系统与Django应用生命周期的绑定点确保插件在应用就绪后启动
from .plugin_manage.loader import load_plugins
load_plugins()

@ -0,0 +1,95 @@
# 导入Python标准库中的logging模块用于实现插件运行过程中的日志记录功能
# 日志可以帮助开发者追踪插件的运行状态、排查错误等
import logging
# 创建一个日志记录器实例,其名称与当前模块(__name__)绑定
# 这样可以确保日志信息能够准确关联到插件模块,便于日志的分类和筛选
logger = logging.getLogger(__name__)
class BasePlugin:
"""
插件系统的基类抽象基类角色所有自定义插件都必须继承此类
该类的核心作用是
1. 定义插件必须包含的元数据规范名称描述版本
2. 提供插件初始化和钩子注册的统一流程
3. 封装获取插件信息的通用方法
子类通过继承此类并实现特定方法即可快速接入插件系统
"""
# 插件元数据字段(子类必须显式赋值,否则初始化会失败)
PLUGIN_NAME = None # 插件的唯一标识名称,用于在系统中区分不同插件,例如"DataCleanPlugin"
PLUGIN_DESCRIPTION = None # 插件功能的详细描述,说明插件的作用和使用场景,例如"用于清洗CSV格式的原始数据"
PLUGIN_VERSION = None # 插件的版本号,遵循语义化版本规范(如"1.0.0"),用于版本管理和兼容性判断
def __init__(self):
"""
插件实例的构造方法负责插件的初始化流程控制
执行逻辑
1. 首先验证子类是否完整实现了元数据名称描述版本
2. 若元数据不完整抛出ValueError异常阻止实例化
3. 元数据验证通过后依次调用初始化方法和钩子注册方法
确保插件在使用前完成必要的准备工作
"""
# 使用all()函数检查三个元数据字段是否都有值非None
# 若存在任何一个未定义的字段,触发异常
if not all([self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION]):
raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
# 调用插件初始化方法,执行子类自定义的初始化逻辑
self.init_plugin()
# 调用钩子注册方法,让子类注册需要监听的系统事件
self.register_hooks()
def init_plugin(self):
"""
插件初始化的具体实现方法用于执行插件启动前的准备工作
基类默认实现
- 输出一条INFO级别的日志提示插件已完成初始化
子类可重写此方法实现特定逻辑例如
- 加载配置文件
- 建立数据库连接
- 初始化缓存数据结构等
注意重写时若需要保留默认日志可通过super().init_plugin()调用父类方法
"""
# 记录插件初始化成功的日志,包含插件名称便于追踪
logger.info(f'{self.PLUGIN_NAME} initialized.')
def register_hooks(self):
"""
插件钩子注册方法用于将插件功能与系统事件关联
钩子Hook机制说明
系统在运行过程中会触发一系列事件"数据处理前""任务完成后"
插件通过注册钩子可以在特定事件发生时自动执行对应逻辑
基类默认实现空方法pass
子类需根据自身功能重写此方法例如
- 调用系统提供的register_hook()方法注册事件回调
- 定义需要监听的事件类型和对应的处理函数
"""
pass
def get_plugin_info(self):
"""
获取插件元数据的统一接口用于系统展示或管理插件信息
返回值说明
- 字典类型包含三个键值对
- 'name'对应PLUGIN_NAME
- 'description'对应PLUGIN_DESCRIPTION
- 'version'对应PLUGIN_VERSION
应用场景
- 插件管理界面展示插件列表
- 系统启动时收集所有插件信息进行校验
- 插件间依赖关系判断时获取版本信息
"""
return {
'name': self.PLUGIN_NAME,
'description': self.PLUGIN_DESCRIPTION,
'version': self.PLUGIN_VERSION
}

@ -0,0 +1,207 @@
# 导入必要模块
import _thread # 用于创建多线程,处理异步任务(如发送邮件)
import logging # 日志记录模块,记录信号处理过程中的关键信息和错误
import django.dispatch # Django信号系统用于定义和发送自定义信号
from django.conf import settings # 导入Django项目配置
from django.contrib.admin.models import LogEntry # 管理员操作日志模型
from django.contrib.auth.signals import user_logged_in, user_logged_out # Django内置的用户登录/登出信号
from django.core.mail import EmailMultiAlternatives # 用于发送HTML格式的邮件
from django.db.models.signals import post_save # Django模型保存后的信号
from django.dispatch import receiver # 用于注册信号接收器
# 导入项目内部模块
from comments.models import Comment # 评论模型
from comments.utils import send_comment_email # 发送评论通知邮件的工具函数
from djangoblog.spider_notify import SpiderNotify # 搜索引擎推送工具类
from djangoblog.utils import ( # 项目工具函数
cache, # 缓存操作对象
expire_view_cache, # 清除视图缓存
delete_sidebar_cache, # 清除侧边栏缓存
delete_view_cache # 清除指定视图缓存
)
from djangoblog.utils import get_current_site # 获取当前站点信息
from oauth.models import OAuthUser # OAuth用户模型
# 创建当前模块的日志记录器,用于记录信号处理相关日志
logger = logging.getLogger(__name__)
# 定义自定义信号OAuth用户登录信号
# 触发时机当用户通过OAuth第三方登录成功登录时
# 参数:['id'] 表示信号会携带OAuthUser的id
oauth_user_login_signal = django.dispatch.Signal(['id'])
# 定义自定义信号:发送邮件信号
# 触发时机:需要发送邮件时(解耦邮件发送逻辑,便于多处调用)
# 参数:['emailto', 'title', 'content'] 分别表示收件人、邮件标题、邮件内容
send_email_signal = django.dispatch.Signal(['emailto', 'title', 'content'])
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
"""
发送邮件信号的接收器处理实际的邮件发送逻辑
:param sender: 信号发送者通常无需关注
:param kwargs: 信号携带的参数emailto, title, content
"""
# 从信号参数中提取邮件信息
emailto = kwargs['emailto'] # 收件人列表
title = kwargs['title'] # 邮件标题
content = kwargs['content'] # 邮件内容HTML格式
# 创建HTML格式邮件对象
# from_email发件人从项目配置中获取
# to收件人列表
msg = EmailMultiAlternatives(
title,
content,
from_email=settings.DEFAULT_FROM_EMAIL,
to=emailto
)
msg.content_subtype = "html" # 声明邮件内容为HTML格式
# 记录邮件发送日志(存入数据库)
from servermanager.models import EmailSendLog # 导入邮件发送日志模型
log = EmailSendLog()
log.title = title
log.content = content
log.emailto = ','.join(emailto) # 将收件人列表转为字符串存储
try:
# 发送邮件,返回成功发送的数量
result = msg.send()
log.send_result = result > 0 # 若成功发送数量>0标记为发送成功
except Exception as e:
# 发送失败时记录错误日志
logger.error(f"失败邮箱号: {emailto}, {e}")
log.send_result = False # 标记为发送失败
log.save() # 保存日志记录到数据库
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
"""
OAuth用户登录信号的接收器处理第三方登录后的后续操作
:param sender: 信号发送者
:param kwargs: 信号携带的参数idOAuthUser的id
"""
id = kwargs['id'] # 获取OAuth用户ID
oauthuser = OAuthUser.objects.get(id=id) # 查询对应的OAuth用户对象
# 获取当前站点域名(用于判断头像是否为本站资源)
site = get_current_site().domain
# 若用户头像存在且不是本站资源如第三方平台的头像URL则下载并保存到本地
if oauthuser.picture and not oauthuser.picture.find(site) >= 0:
from djangoblog.utils import save_user_avatar # 导入保存用户头像的工具函数
oauthuser.picture = save_user_avatar(oauthuser.picture) # 下载并替换头像URL为本地路径
oauthuser.save() # 保存更新后的用户信息
# 清除侧边栏缓存(用户登录状态可能影响侧边栏展示内容)
delete_sidebar_cache()
@receiver(post_save)
def model_post_save_callback(
sender,
instance,
created,
raw,
using,
update_fields,
**kwargs):
"""
Django模型保存后post_save信号的接收器处理模型保存后的联动操作
:param sender: 发送信号的模型类
:param instance: 被保存的模型实例
:param created: 是否为新创建的记录True表示新建False表示更新
:param raw: 是否为原始保存如通过loaddata导入数据时为True
:param using: 使用的数据库别名
:param update_fields: 被更新的字段列表None表示全量更新
:param kwargs: 其他参数
"""
clearcache = False # 标记是否需要清除缓存
# 若保存的是管理员操作日志LogEntry直接返回无需处理
if isinstance(instance, LogEntry):
return
# 若模型实例有get_full_url方法通常表示是可被搜索引擎收录的内容如文章
if 'get_full_url' in dir(instance):
# 判断是否仅更新了浏览量字段views
is_update_views = update_fields == {'views'}
# 非测试环境且不是仅更新浏览量时,通知搜索引擎更新内容
if not settings.TESTING and not is_update_views:
try:
notify_url = instance.get_full_url() # 获取内容的完整URL
SpiderNotify.baidu_notify([notify_url]) # 向百度搜索引擎推送更新
except Exception as ex:
logger.error("notify spider", ex) # 推送失败时记录错误日志
# 若不是仅更新浏览量,标记需要清除缓存
if not is_update_views:
clearcache = True
# 若保存的是评论Comment实例
if isinstance(instance, Comment):
# 仅处理已启用的评论is_enable=True
if instance.is_enable:
# 获取评论所属文章的URL路径
path = instance.article.get_absolute_url()
# 获取当前站点域名
site = get_current_site().domain
# 处理域名中的端口(若有),仅保留主域名
if site.find(':') > 0:
site = site[0:site.find(':')]
# 清除文章详情页的视图缓存(评论更新后页面内容需同步更新)
expire_view_cache(
path,
servername=site,
serverport=80,
key_prefix='blogdetail'
)
# 清除SEO处理器缓存评论可能影响页面SEO信息
if cache.get('seo_processor'):
cache.delete('seo_processor')
# 清除该文章的评论缓存
comment_cache_key = 'article_comments_{id}'.format(id=instance.article.id)
cache.delete(comment_cache_key)
# 清除侧边栏缓存(侧边栏可能包含最新评论等动态内容)
delete_sidebar_cache()
# 清除评论列表视图的缓存
delete_view_cache('article_comments', [str(instance.article.pk)])
# 启动新线程异步发送评论通知邮件(避免阻塞主请求)
_thread.start_new_thread(send_comment_email, (instance,))
# 若标记需要清除缓存,则执行缓存清除
if clearcache:
cache.clear()
@receiver(user_logged_in) # 注册为用户登录信号的接收器
@receiver(user_logged_out) # 同时注册为用户登出信号的接收器
def user_auth_callback(sender, request, user, **kwargs):
"""
用户登录/登出信号的接收器处理认证状态变化后的操作
:param sender: 信号发送者
:param request: HTTP请求对象
:param user: 当前用户对象
:param kwargs: 其他参数
"""
# 若用户存在且用户名有效
if user and user.username:
logger.info(user) # 记录用户登录/登出日志
delete_sidebar_cache() # 清除侧边栏缓存(登录状态可能影响侧边栏内容,如显示用户名)
# cache.clear() # 注释:可根据需求开启全量缓存清除(通常侧边栏缓存足够)

@ -0,0 +1,316 @@
# 导入必要模块
from django.utils.encoding import force_str # 用于将数据转换为字符串兼容Python 2/3
from elasticsearch_dsl import Q # Elasticsearch DSL的查询构建工具
from haystack.backends import ( # Haystack搜索框架的基础类
BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
)
from haystack.forms import ModelSearchForm # Haystack默认的模型搜索表单
from haystack.models import SearchResult # Haystack的搜索结果封装类
from haystack.utils import log as logging # Haystack的日志工具
# 导入项目内部模块
from blog.documents import ArticleDocument, ArticleDocumentManager # 文章的Elasticsearch文档定义及管理器
from blog.models import Article # 博客文章模型
# 创建当前模块的日志记录器,用于记录搜索相关日志
logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
"""
基于Elasticsearch的搜索后端实现继承自Haystack的BaseSearchBackend
作用实现与Elasticsearch的交互逻辑包括索引的创建更新删除
以及搜索查询的执行拼写建议等功能
"""
def __init__(self, connection_alias, **connection_options):
"""
初始化搜索后端
:param connection_alias: 数据库连接别名用于多后端配置
:param connection_options: 连接参数如主机端口等
"""
super(ElasticSearchBackend, self).__init__(connection_alias,** connection_options)
self.manager = ArticleDocumentManager() # 初始化文章文档管理器(处理索引操作)
self.include_spelling = True # 启用拼写建议功能
def _get_models(self, iterable):
"""
将模型实例列表转换为Elasticsearch文档对象
:param iterable: 模型实例列表如Article对象列表
:return: 转换后的Elasticsearch文档列表
"""
# 若输入为空,默认使用所有已发布的文章
models = iterable if iterable and iterable[0] else Article.objects.all()
# 通过管理器将模型转换为文档
docs = self.manager.convert_to_doc(models)
return docs
def _create(self, models):
"""
创建索引并初始化文档全量重建索引时使用
:param models: 模型实例列表
"""
self.manager.create_index() # 创建Elasticsearch索引若不存在
docs = self._get_models(models) # 转换模型为文档
self.manager.rebuild(docs) # 全量重建索引(清空旧数据后插入新数据)
def _delete(self, models):
"""
从索引中删除指定模型对应的文档
:param models: 要删除的模型实例列表
:return: 操作是否成功始终返回True
"""
for m in models:
m.delete() # 调用文档的删除方法
return True
def _rebuild(self, models):
"""
增量更新索引适用于部分数据更新
:param models: 需要更新的模型实例列表若为空则更新所有文章
"""
models = models if models else Article.objects.all() # 处理空输入
docs = self._get_models(models) # 转换模型为文档
self.manager.update_docs(docs) # 增量更新文档
def update(self, index, iterable, commit=True):
"""
Haystack标准接口更新索引用于实时同步模型变更
:param index: 索引名称当前实现未使用由管理器处理
:param iterable: 模型实例列表
:param commit: 是否立即提交当前实现未使用
"""
models = self._get_models(iterable) # 转换模型为文档
self.manager.update_docs(models) # 执行更新
def remove(self, obj_or_string):
"""
Haystack标准接口从索引中移除指定对象
:param obj_or_string: 模型实例或ID字符串
"""
models = self._get_models([obj_or_string]) # 转换为文档
self._delete(models) # 执行删除
def clear(self, models=None, commit=True):
"""
Haystack标准接口清空索引或指定模型的索引
:param models: 可选指定要清空的模型类当前实现忽略清空所有
:param commit: 是否立即提交当前实现未使用
"""
self.remove(None) # 调用删除方法清空所有
@staticmethod
def get_suggestion(query: str) -> str:
"""
获取搜索建议词基于Elasticsearch的拼写纠错功能
:param query: 用户输入的搜索词
:return: 建议的修正词多个词用空格拼接
"""
# 构建搜索查询:匹配文章内容,并启用拼写建议
search = ArticleDocument.search() \
.query("match", body=query) \
.suggest('suggest_search', query, term={'field': 'body'}) \
.execute()
keywords = []
# 提取建议结果
for suggest in search.suggest.suggest_search:
if suggest["options"]: # 若有建议词,取第一个
keywords.append(suggest["options"][0]["text"])
else: # 若无建议,保留原词
keywords.append(suggest["text"])
return ' '.join(keywords) # 拼接建议词为字符串
@log_query # Haystack装饰器记录查询日志
def search(self, query_string, **kwargs):
"""
执行搜索查询的核心方法
:param query_string: 用户输入的搜索字符串
:param kwargs: 额外参数如分页偏移量start_offset/end_offset
:return: 搜索结果字典包含结果列表命中数拼写建议等
"""
logger.info('search query_string:' + query_string) # 记录搜索词
# 获取分页参数(用于限制返回结果范围)
start_offset = kwargs.get('start_offset', 0)
end_offset = kwargs.get('end_offset', 10) # 默认返回前10条
# 判断是否需要启用拼写建议通过is_suggest参数控制
if getattr(self, "is_suggest", None):
suggestion = self.get_suggestion(query_string) # 获取建议词
else:
suggestion = query_string # 不启用建议,使用原搜索词
# 构建Elasticsearch查询条件
# 1. 布尔查询should匹配标题或内容至少满足70%的条件
q = Q('bool',
should=[Q('match', body=suggestion), Q('match', title=suggestion)],
minimum_should_match="70%")
# 构建完整搜索:
# - 应用上述查询条件
# - 过滤仅包含已发布status='p'的文章type='a'
# - 不返回原始文档内容source=False
# - 应用分页偏移
search = ArticleDocument.search() \
.query('bool', filter=[q]) \
.filter('term', status='p') \
.filter('term', type='a') \
.source(False)[start_offset: end_offset]
# 执行搜索并处理结果
results = search.execute()
hits = results['hits'].total # 总命中数
raw_results = []
# 遍历搜索结果转换为Haystack的SearchResult格式
for raw_result in results['hits']['hits']:
app_label = 'blog' # 应用标签(固定为博客应用)
model_name = 'Article' # 模型名称(固定为文章模型)
additional_fields = {} # 额外字段(当前未使用)
# 创建SearchResult实例适配Haystack的结果格式
result = SearchResult(
app_label,
model_name,
raw_result['_id'], # 文档ID对应文章ID
raw_result['_score'], # 匹配得分
**additional_fields
)
raw_results.append(result)
# 构建返回结果字典
facets = {} # 分面搜索结果(当前未实现)
# 若建议词与原词不同则返回建议词否则为None
spelling_suggestion = None if query_string == suggestion else suggestion
return {
'results': raw_results, # 搜索结果列表SearchResult实例
'hits': hits, # 总命中数
'facets': facets, # 分面数据
'spelling_suggestion': spelling_suggestion, # 拼写建议
}
class ElasticSearchQuery(BaseSearchQuery):
"""
自定义搜索查询类继承自Haystack的BaseSearchQuery
作用处理搜索查询的构建逻辑包括查询字符串清洗参数转换等
"""
def _convert_datetime(self, date):
"""
转换日期时间为Elasticsearch兼容的字符串格式
:param date: 日期时间对象
:return: 格式化的字符串'20231018123000'
"""
if hasattr(date, 'hour'): # 若包含时间信息datetime对象
return force_str(date.strftime('%Y%m%d%H%M%S'))
else: # 仅日期date对象时间部分设为00:00:00
return force_str(date.strftime('%Y%m%d000000'))
def clean(self, query_fragment):
"""
清洗用户输入的查询片段处理保留字和特殊字符
:param query_fragment: 用户输入的查询字符串片段
:return: 清洗后的查询字符串
"""
words = query_fragment.split() # 按空格拆分词语
cleaned_words = []
for word in words:
# 处理Elasticsearch保留字转为小写
if word in self.backend.RESERVED_WORDS:
word = word.replace(word, word.lower())
# 处理特殊字符(若包含特殊字符,用单引号包裹)
for char in self.backend.RESERVED_CHARACTERS:
if char in word:
word = "'%s'" % word
break
cleaned_words.append(word)
return ' '.join(cleaned_words) # 拼接清洗后的词语
def build_query_fragment(self, field, filter_type, value):
"""
构建查询片段适配Elasticsearch的查询语法
:param field: 搜索字段
:param filter_type: 过滤类型
:param value: 查询值
:return: 构建的查询字符串
"""
return value.query_string # 直接使用查询字符串由value提供
def get_count(self):
"""
获取搜索结果总数
:return: 结果数量
"""
results = self.get_results()
return len(results) if results else 0
def get_spelling_suggestion(self, preferred_query=None):
"""
获取拼写建议适配Haystack接口
:param preferred_query: 优先使用的查询未使用
:return: 拼写建议词
"""
return self._spelling_suggestion
def build_params(self, spelling_query=None):
"""
构建查询参数适配Haystack接口
:param spelling_query: 拼写建议查询未使用
:return: 构建的参数字典
"""
kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query)
return kwargs
class ElasticSearchModelSearchForm(ModelSearchForm):
"""
自定义搜索表单继承自Haystack的ModelSearchForm
作用扩展默认搜索表单支持控制是否启用拼写建议
"""
def search(self):
"""
执行搜索根据表单参数控制拼写建议
:return: 搜索结果集SearchQuerySet
"""
# 通过表单数据中的"is_suggest"参数控制是否启用拼写建议
# 若"is_suggest"为"no",则禁用建议
self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no"
# 调用父类方法执行搜索
sqs = super().search()
return sqs
class ElasticSearchEngine(BaseEngine):
"""
Elasticsearch搜索引擎入口类继承自Haystack的BaseEngine
作用绑定后端实现和查询类作为Haystack的引擎配置入口
"""
backend = ElasticSearchBackend # 指定使用的搜索后端
query = ElasticSearchQuery # 指定使用的查询类

@ -0,0 +1,30 @@
# 文章相关系统事件常量定义
# 用途:统一管理插件系统中与文章操作相关的事件名称,避免硬编码导致的不一致问题
# 所有事件名称均采用大写蛇形命名法UPPER_SNAKE_CASE符合Python常量命名规范
# 事件:文章详情页加载完成
# 触发时机:当用户访问某篇文章的详情页,页面内容加载完成后触发
# 应用场景:插件可监听此事件,执行详情页相关的自定义逻辑(如添加页面统计代码、注入额外内容等)
ARTICLE_DETAIL_LOAD = 'article_detail_load'
# 事件:文章创建完成
# 触发时机:当一篇新文章在系统中创建成功(如数据库写入完成、状态设为"已发布"或"草稿")后触发
# 应用场景:插件可监听此事件,执行创建后的后续操作(如自动生成文章摘要、同步到外部平台、发送通知等)
ARTICLE_CREATE = 'article_create'
# 事件:文章更新完成
# 触发时机:当已存在的文章内容、属性(如标题、分类、状态)修改并保存成功后触发
# 应用场景:插件可监听此事件,执行更新后的联动操作(如更新文章索引、记录修改日志、重新生成相关统计数据等)
ARTICLE_UPDATE = 'article_update'
# 事件:文章删除完成
# 触发时机:当一篇文章从系统中删除(物理删除或逻辑删除,如标记为"已删除"状态)后触发
# 应用场景:插件可监听此事件,执行删除后的清理操作(如删除关联的评论、移除相关缓存、同步删除外部存储的附件等)
ARTICLE_DELETE = 'article_delete'
# 文章内容钩子名称常量
# 用途:定义专门用于拦截、修改文章内容的钩子标识,与上述"操作事件"区分(事件侧重流程节点,钩子侧重内容处理)
# 命名格式与事件常量保持一致,确保插件系统中钩子名称的唯一性和可识别性
# 应用场景插件可注册此钩子在文章内容渲染前如详情页展示、导出为PDF对内容进行自定义处理如过滤敏感词、替换关键词、添加水印等
ARTICLE_CONTENT_HOOK_NAME = "the_content"

@ -0,0 +1,92 @@
# 导入logging模块用于记录钩子系统运行过程中的日志如注册信息、错误信息等
import logging
# 创建当前模块的日志记录器,日志名称与模块绑定,便于区分不同组件的日志输出
logger = logging.getLogger(__name__)
# 全局钩子存储字典,用于保存所有注册的钩子及其对应的回调函数
# 键:钩子名称(字符串,如"article_create"
# 值:回调函数列表(所有注册到该钩子的可调用对象将按注册顺序存储)
_hooks = {}
def register(hook_name: str, callback: callable):
"""
注册一个钩子回调函数将其添加到指定钩子名称对应的回调列表中
核心作用建立"钩子名称""处理逻辑(回调函数)"的映射关系
使后续触发钩子时能自动执行所有注册的回调
:param hook_name: 钩子名称字符串需与触发时使用的名称一致如ARTICLE_CREATE
:param callback: 可调用对象函数方法等当钩子被触发时会执行此对象
回调函数的参数需与钩子触发时传递的参数匹配
"""
# 若钩子名称尚未在全局字典中,初始化一个空列表用于存储回调
if hook_name not in _hooks:
_hooks[hook_name] = []
# 将回调函数添加到对应钩子的列表中(按注册顺序存储,触发时也按此顺序执行)
_hooks[hook_name].append(callback)
# 记录DEBUG级日志说明钩子注册成功包含钩子名称和回调函数名便于调试
logger.debug(f"Registered hook '{hook_name}' with callback '{callback.__name__}'")
def run_action(hook_name: str, *args, **kwargs):
"""
执行指定名称的"动作钩子Action Hook"按注册顺序调用所有关联的回调函数
动作钩子特性用于触发一系列操作不关注返回值仅执行回调逻辑
典型场景文章创建后发送通知记录日志等执行动作但无需修改数据
:param hook_name: 要触发的钩子名称需已被注册过
:param *args: 传递给回调函数的位置参数可变参数根据钩子场景定义
:param **kwargs: 传递给回调函数的关键字参数可变参数根据钩子场景定义
"""
# 检查该钩子是否有已注册的回调函数
if hook_name in _hooks:
# 记录DEBUG级日志说明开始执行该动作钩子
logger.debug(f"Running action hook '{hook_name}'")
# 按注册顺序遍历所有回调函数并执行
for callback in _hooks[hook_name]:
try:
# 传递位置参数和关键字参数给回调函数
callback(*args, **kwargs)
except Exception as e:
# 若回调执行出错记录ERROR级日志包含详细异常信息
# exc_info=True 会在日志中附带堆栈跟踪,便于排查错误
logger.error(
f"Error running action hook '{hook_name}' callback '{callback.__name__}': {e}",
exc_info=True
)
def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行指定名称的"过滤钩子Filter Hook"通过回调函数链式处理初始值并返回最终结果
过滤钩子特性用于对数据进行加工处理每个回调函数接收上一个函数的输出作为输入
最终返回经过所有回调处理后的值
典型场景文章内容过滤敏感词格式化文本等修改数据并返回新值
:param hook_name: 要触发的钩子名称需已被注册过
:param value: 初始值需要被过滤/处理的数据如文章内容字符串
:param *args: 传递给回调函数的额外位置参数
:param **kwargs: 传递给回调函数的额外关键字参数
:return: 经过所有回调函数处理后的最终值
"""
# 检查该钩子是否有已注册的回调函数
if hook_name in _hooks:
# 记录DEBUG级日志说明开始执行该过滤钩子
logger.debug(f"Applying filter hook '{hook_name}'")
# 按注册顺序遍历所有回调函数,链式处理初始值
for callback in _hooks[hook_name]:
try:
# 调用回调函数,将当前值和额外参数传入,更新值为回调返回的结果
value = callback(value, *args, **kwargs)
except Exception as e:
# 若回调执行出错记录ERROR级日志包含详细异常信息
logger.error(
f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}",
exc_info=True
)
# 返回经过所有过滤处理后的最终值
return value

@ -0,0 +1,51 @@
# 导入必要的模块
# os: 用于处理文件路径和目录操作
# logging: 用于记录插件加载过程中的日志信息(成功/失败状态)
# django.conf.settings: 用于获取Django项目的配置信息如插件目录、激活的插件列表
import os
import logging
from django.conf import settings
# 创建当前模块的日志记录器,日志名称与模块绑定,便于追踪插件加载相关的日志
logger = logging.getLogger(__name__)
def load_plugins():
"""
动态加载并初始化位于'plugins'目录中的插件
功能说明
- 从Django配置中读取激活的插件列表settings.ACTIVE_PLUGINS
- 检查每个插件的目录结构是否合法是否存在plugin.py文件
- 动态导入插件的核心模块plugin.py触发插件的初始化流程
- 通过日志记录每个插件的加载结果成功/失败及原因
调用时机
该函数应在Django应用注册表app registry准备就绪后调用
通常在项目启动时如通过AppConfig.ready()方法触发确保Django环境已初始化完成
"""
# 遍历配置中激活的所有插件名称settings.ACTIVE_PLUGINS是一个插件名称列表
for plugin_name in settings.ACTIVE_PLUGINS:
# 构建插件的完整目录路径:
# settings.PLUGINS_DIR是项目中存放所有插件的根目录如"project_root/plugins"
# 拼接根目录与当前插件名称,得到具体插件的目录路径(如"project_root/plugins/my_plugin"
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
# 验证插件目录的合法性:
# 1. 必须是一个存在的目录os.path.isdir(plugin_path)
# 2. 目录中必须包含核心文件plugin.py插件的入口模块
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, 'plugin.py')):
try:
# 动态导入插件的plugin.py模块
# 导入路径格式为"plugins.{插件名称}.plugin"基于Python包结构
# 导入后会自动执行plugin.py中的顶级代码如插件类的定义和注册逻辑
__import__(f'plugins.{plugin_name}.plugin')
# 记录INFO级日志提示插件加载成功
logger.info(f"Successfully loaded plugin: {plugin_name}")
# 捕获导入过程中的异常(如模块不存在、语法错误、依赖缺失等)
except ImportError as e:
# 记录ERROR级日志提示插件导入失败并附带异常信息
# exc_info=e 会将异常堆栈信息写入日志,便于排查问题
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)
Loading…
Cancel
Save