Compare commits

...

16 Commits

@ -0,0 +1,4 @@
default_app_config = 'djangoblog.apps.DjangoblogAppConfig'
# Dotted path of the default application configuration class for the djangoblog app.
# The class is DjangoblogAppConfig, defined in djangoblog/apps.py.

@ -0,0 +1,90 @@
# 导入Django内置的AdminSite后台管理站点基类
from django.contrib.admin import AdminSite
# 导入日志条目模型(记录后台操作日志)
from django.contrib.admin.models import LogEntry
# 导入站点管理类和站点模型Django内置的多站点配置
from django.contrib.sites.admin import SiteAdmin
from django.contrib.sites.models import Site
# 导入各个应用的Admin类和模型项目内自定义的后台配置
from accounts.admin import * # 账号相关的后台管理配置
from blog.admin import * # 博客核心功能的后台管理配置
from blog.models import * # 博客核心模型(文章、分类等)
from comments.admin import * # 评论功能的后台管理配置
from comments.models import *# 评论模型
from djangoblog.logentryadmin import LogEntryAdmin # 自定义的日志条目管理类
from oauth.admin import * # OAuth第三方登录的后台管理配置
from oauth.models import * # OAuth相关模型
from owntracks.admin import *# OwnTracks位置追踪的后台管理配置
from owntracks.models import *# OwnTracks相关模型
from servermanager.admin import *# 服务器管理的后台管理配置
from servermanager.models import *# 服务器管理相关模型
# Project-specific admin site, derived from Django's AdminSite.
class DjangoBlogAdminSite(AdminSite):
    """Admin site for djangoblog that only lets superusers in."""

    # Title rendered at the top of the admin pages.
    site_header = 'djangoblog administration'
    # Title used for the browser tab.
    site_title = 'djangoblog site admin'

    def __init__(self, name='admin'):
        """Initialise the underlying AdminSite with ``name``."""
        super().__init__(name)

    def has_permission(self, request):
        """Return the ``is_superuser`` flag of the requesting user."""
        current_user = request.user
        return current_user.is_superuser

    # Disabled code: custom admin route exposing a "refresh cache" entry point.
    # def get_urls(self):
    #     # Start from the default admin URLs.
    #     urls = super().get_urls()
    #     from django.urls import path
    #     # Custom view that refreshes the cache.
    #     from blog.views import refresh_memcache
    #
    #     my_urls = [
    #         path('refresh/', self.admin_view(refresh_memcache), name="refresh"),
    #     ]
    #     # Default routes followed by the custom ones.
    #     return urls + my_urls
# Instantiate the project admin site.
admin_site = DjangoBlogAdminSite(name='admin')

# (model, ModelAdmin) pairs exposed through the admin site, grouped by app.
_ADMIN_REGISTRATIONS = (
    # blog
    (Article, ArticlelAdmin),
    (Category, CategoryAdmin),
    (Tag, TagAdmin),
    (Links, LinksAdmin),
    (SideBar, SideBarAdmin),
    (BlogSettings, BlogSettingsAdmin),
    # servermanager
    (commands, CommandsAdmin),
    (EmailSendLog, EmailSendLogAdmin),
    # accounts
    (BlogUser, BlogUserAdmin),
    # comments
    (Comment, CommentAdmin),
    # oauth
    (OAuthUser, OAuthUserAdmin),
    (OAuthConfig, OAuthConfigAdmin),
    # owntracks
    (OwnTrackLog, OwnTrackLogsAdmin),
    # django.contrib.sites
    (Site, SiteAdmin),
    # admin action log, shown through the project's LogEntryAdmin
    (LogEntry, LogEntryAdmin),
)

for _model, _model_admin in _ADMIN_REGISTRATIONS:
    admin_site.register(_model, _model_admin)

@ -0,0 +1,20 @@
# 导入Django的应用配置基类AppConfig
from django.apps import AppConfig
# Application configuration for the djangoblog app.
class DjangoblogAppConfig(AppConfig):
    """AppConfig for ``djangoblog`` that loads plugins from ``ready()``."""

    # Application name.
    name = 'djangoblog'
    # Default primary-key field type for models.
    default_auto_field = 'django.db.models.BigAutoField'

    def ready(self):
        """Run the parent ``ready`` hook, then load the plugins."""
        super().ready()
        # Imported inside the method (not at module level), as in the original code.
        from .plugin_manage import loader
        loader.load_plugins()

@ -0,0 +1,165 @@
# 导入线程模块(用于异步执行任务)
import _thread
# 导入日志模块(记录运行信息/错误)
import logging
# 导入Django信号相关工具
import django.dispatch
from django.conf import settings # 导入Django项目配置
from django.contrib.admin.models import LogEntry # 后台操作日志模型
# 导入用户登录/登出的内置信号
from django.contrib.auth.signals import user_logged_in, user_logged_out
# 导入Django邮件发送工具支持多格式邮件
from django.core.mail import EmailMultiAlternatives
# 导入模型保存后的内置信号
from django.db.models.signals import post_save
from django.dispatch import receiver # 信号接收器装饰器
# 导入自定义模型和工具函数
from comments.models import Comment # 评论模型
from comments.utils import send_comment_email # 发送评论通知邮件的工具
from djangoblog.spider_notify import SpiderNotify # 爬虫通知工具(如百度收录推送)
# 导入缓存操作、缓存过期等工具函数
from djangoblog.utils import cache, expire_view_cache, delete_sidebar_cache, delete_view_cache
from djangoblog.utils import get_current_site # 获取当前站点域名
from oauth.models import OAuthUser # OAuth第三方用户模型
# Module-level logger.
logger = logging.getLogger(__name__)

# NOTE: the signals are created without positional arguments. The former
# ``providing_args`` list was documentation only and has been removed from
# Django; passing a list positionally on current Django versions would be
# interpreted as ``use_caching`` instead.

# Custom signal for an OAuth user login. Receivers read the keyword argument ``id``.
oauth_user_login_signal = django.dispatch.Signal()

# Custom signal requesting an e-mail to be sent. Receivers read the keyword
# arguments ``emailto``, ``title`` and ``content``.
send_email_signal = django.dispatch.Signal()
# Receiver for send_email_signal.
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
    """Send an HTML e-mail and store an ``EmailSendLog`` row with the outcome.

    Reads ``emailto`` (joined with ``,`` for the log), ``title`` and
    ``content`` from ``kwargs``.
    """
    recipients = kwargs['emailto']
    subject = kwargs['title']
    body = kwargs['content']

    message = EmailMultiAlternatives(
        subject,
        body,
        from_email=settings.DEFAULT_FROM_EMAIL,
        to=recipients,
    )
    # The body is sent as HTML.
    message.content_subtype = "html"

    # Imported locally, as in the original code.
    from servermanager.models import EmailSendLog
    record = EmailSendLog()
    record.title = subject
    record.content = body
    record.emailto = ','.join(recipients)

    try:
        # send() returns a count; anything above zero is recorded as success.
        sent_count = message.send()
        record.send_result = sent_count > 0
    except Exception as e:
        logger.error(f"失败邮箱号: {recipients}, {e}")
        record.send_result = False

    # The log row is saved whether or not sending succeeded.
    record.save()
# Receiver for oauth_user_login_signal.
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
    """Localise the OAuth user's avatar if needed, then drop the sidebar cache.

    ``kwargs['id']`` is used to look up the ``OAuthUser``.
    """
    oauth_user = OAuthUser.objects.get(id=kwargs['id'])
    site_domain = get_current_site().domain

    # A picture URL that does not contain the site domain is downloaded and
    # replaced by the value returned from save_user_avatar.
    if oauth_user.picture and site_domain not in oauth_user.picture:
        from djangoblog.utils import save_user_avatar
        oauth_user.picture = save_user_avatar(oauth_user.picture)
        oauth_user.save()

    delete_sidebar_cache()
# Receiver for post_save, connected without a sender filter.
@receiver(post_save)
def model_post_save_callback(
        sender,
        instance,
        created,
        raw,
        using,
        update_fields,
        **kwargs):
    """Notify the search spider and invalidate caches after a model is saved.

    * ``LogEntry`` instances are ignored.
    * Instances exposing ``get_full_url`` are pushed to Baidu (unless
      ``settings.TESTING`` is set or only ``views`` was updated) and cause a
      full cache clear.
    * Enabled ``Comment`` instances expire the caches of their article and
      start a thread that sends the comment notification e-mail.
    """
    if isinstance(instance, LogEntry):
        return

    clearcache = False

    if hasattr(instance, 'get_full_url'):
        # A save that only touched the ``views`` counter is not a content change.
        is_update_views = update_fields == {'views'}
        if not settings.TESTING and not is_update_views:
            try:
                notify_url = instance.get_full_url()
                SpiderNotify.baidu_notify([notify_url])
            except Exception as ex:
                # The message needs a placeholder for ``ex``; without one the
                # logging call itself fails to format and the error is lost.
                logger.error("notify spider: %s", ex)
        if not is_update_views:
            clearcache = True

    if isinstance(instance, Comment) and instance.is_enable:
        path = instance.article.get_absolute_url()
        site = get_current_site().domain
        # Drop a ":port" suffix from the domain, e.g. "localhost:8000" -> "localhost".
        colon_index = site.find(':')
        if colon_index > 0:
            site = site[:colon_index]

        # Expire the cached article detail page.
        expire_view_cache(
            path,
            servername=site,
            serverport=80,
            key_prefix='blogdetail')
        if cache.get('seo_processor'):
            cache.delete('seo_processor')
        # Per-article comment cache entry.
        comment_cache_key = 'article_comments_{id}'.format(
            id=instance.article.id)
        cache.delete(comment_cache_key)
        delete_sidebar_cache()
        delete_view_cache('article_comments', [str(instance.article.pk)])

        # Send the notification e-mail from a separate thread.
        _thread.start_new_thread(send_comment_email, (instance,))

    if clearcache:
        cache.clear()
# Receiver for both the user_logged_in and user_logged_out signals.
@receiver(user_logged_in)
@receiver(user_logged_out)
def user_auth_callback(sender, request, user, **kwargs):
    """Log the user and drop the sidebar cache on login/logout."""
    # Nothing to do without a user that has a username.
    if not (user and user.username):
        return
    logger.info(user)
    delete_sidebar_cache()
    # cache.clear()  # clearing the whole cache is left disabled

@ -0,0 +1,144 @@
# 导入Django字符串处理工具及Elasticsearch相关依赖
from django.utils.encoding import force_str
from elasticsearch_dsl import Q
# 导入Haystack搜索引擎基础类和工具
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
from haystack.forms import ModelSearchForm
from haystack.models import SearchResult
from haystack.utils import log as logging
# 导入博客相关的ES文档类、管理类和模型
from blog.documents import ArticleDocument, ArticleDocumentManager
from blog.models import Article
# 创建日志对象
logger = logging.getLogger(__name__)
# Haystack search backend backed by the blog's Elasticsearch documents.
class ElasticSearchBackend(BaseSearchBackend):
    """Search backend that indexes and queries ``ArticleDocument``."""

    def __init__(self, connection_alias, **connection_options):
        super().__init__(connection_alias, **connection_options)
        self.manager = ArticleDocumentManager()
        self.include_spelling = True

    def _get_models(self, iterable):
        """Convert ``iterable`` to documents.

        Falls back to every ``Article`` when ``iterable`` is empty or its
        first element is falsy.
        """
        if iterable and iterable[0]:
            source = iterable
        else:
            source = Article.objects.all()
        return self.manager.convert_to_doc(source)

    def _create(self, models):
        """Create the index and rebuild it from ``models``."""
        self.manager.create_index()
        docs = self._get_models(models)
        self.manager.rebuild(docs)

    def _delete(self, models):
        """Call ``delete()`` on every item of ``models``; always returns True."""
        for doc in models:
            doc.delete()
        return True

    def _rebuild(self, models):
        """Update the documents for ``models`` (all articles when falsy)."""
        models = models or Article.objects.all()
        docs = self.manager.convert_to_doc(models)
        self.manager.update_docs(docs)

    def update(self, index, iterable, commit=True):
        """Update the documents built from ``iterable``."""
        docs = self._get_models(iterable)
        self.manager.update_docs(docs)

    def remove(self, obj_or_string):
        """Delete the document(s) derived from ``obj_or_string``."""
        docs = self._get_models([obj_or_string])
        self._delete(docs)

    def clear(self, models=None, commit=True):
        """Clear the index by delegating to ``remove(None)``."""
        self.remove(None)

    @staticmethod
    def get_suggestion(query: str) -> str:
        """Return the suggested query text for ``query``.

        For each suggestion entry the first option's text is used, or the
        entry's own text when it has no options; the pieces are joined with
        single spaces.
        """
        response = ArticleDocument.search() \
            .query("match", body=query) \
            .suggest('suggest_search', query, term={'field': 'body'}) \
            .execute()
        return ' '.join(
            entry["options"][0]["text"] if entry["options"] else entry["text"]
            for entry in response.suggest.suggest_search
        )

    @log_query
    def search(self, query_string, **kwargs):
        """Run the search and return Haystack's result dictionary."""
        logger.info('search query_string:' + query_string)
        start_offset = kwargs.get('start_offset')
        end_offset = kwargs.get('end_offset')

        # Suggestions are only looked up when ``is_suggest`` is set on the backend.
        if getattr(self, "is_suggest", None):
            suggestion = self.get_suggestion(query_string)
        else:
            suggestion = query_string

        # Match against body or title.
        text_query = Q(
            'bool',
            should=[Q('match', body=suggestion), Q('match', title=suggestion)],
            minimum_should_match="70%")

        # Restrict to status 'p' and type 'a'; sources are not fetched.
        request = ArticleDocument.search() \
            .query('bool', filter=[text_query]) \
            .filter('term', status='p') \
            .filter('term', type='a') \
            .source(False)[start_offset: end_offset]
        results = request.execute()
        hits = results['hits'].total

        raw_results = [
            SearchResult('blog', 'Article', hit['_id'], hit['_score'])
            for hit in results['hits']['hits']
        ]

        # Only report a spelling suggestion when it differs from the query.
        if query_string == suggestion:
            spelling_suggestion = None
        else:
            spelling_suggestion = suggestion
        return {
            'results': raw_results,
            'hits': hits,
            'facets': {},
            'spelling_suggestion': spelling_suggestion,
        }
# Haystack query class used with ElasticSearchBackend.
class ElasticSearchQuery(BaseSearchQuery):
    """Query class that passes the raw query string to the backend."""

    def _convert_datetime(self, date):
        """Format ``date`` as ``YYYYMMDDHHMMSS`` (time zeroed when it has no ``hour``)."""
        if hasattr(date, 'hour'):
            fmt = '%Y%m%d%H%M%S'
        else:
            fmt = '%Y%m%d000000'
        return force_str(date.strftime(fmt))

    def clean(self, query_fragment):
        """Lower-case reserved words and quote words containing reserved characters."""
        cleaned_words = []
        for token in query_fragment.split():
            if token in self.backend.RESERVED_WORDS:
                token = token.lower()
            if any(ch in token for ch in self.backend.RESERVED_CHARACTERS):
                token = f"'{token}'"
            cleaned_words.append(token)
        return ' '.join(cleaned_words)

    def build_query_fragment(self, field, filter_type, value):
        """Return the ``query_string`` attribute of ``value``."""
        return value.query_string

    def get_count(self):
        """Return the number of results, or 0 when there are none."""
        results = self.get_results()
        if not results:
            return 0
        return len(results)

    def get_spelling_suggestion(self, preferred_query=None):
        """Return the stored spelling suggestion."""
        return self._spelling_suggestion

    def build_params(self, spelling_query=None):
        """Delegate to the parent implementation."""
        return super().build_params(spelling_query=spelling_query)
# Search form used with the Elasticsearch backend.
class ElasticSearchModelSearchForm(ModelSearchForm):
    """ModelSearchForm that toggles suggestions on the backend before searching."""

    def search(self):
        """Set ``is_suggest`` on the backend, then run the parent search.

        Suggestions are on unless the form data has ``is_suggest`` equal to ``"no"``.
        """
        suggest_enabled = self.data.get("is_suggest") != "no"
        self.searchqueryset.query.backend.is_suggest = suggest_enabled
        return super().search()
# Haystack engine wiring the backend and query classes together.
class ElasticSearchEngine(BaseEngine):
    """Engine that uses ElasticSearchBackend with ElasticSearchQuery."""

    backend = ElasticSearchBackend
    query = ElasticSearchQuery

@ -0,0 +1,51 @@
# 导入用户模型、RSS订阅核心类等依赖
from django.contrib.auth import get_user_model
from django.contrib.syndication.views import Feed # Django RSS订阅基础类
from django.utils import timezone # 时间处理工具
from django.utils.feedgenerator import Rss201rev2Feed # RSS2.0格式生成器
# 导入博客文章模型和Markdown转换工具
from blog.models import Article
from djangoblog.utils import CommonMarkdown
# RSS feed of the blog, built on Django's Feed class.
class DjangoBlogFeed(Feed):
    """RSS 2.0 feed listing up to five articles, newest ``pub_time`` first."""

    feed_type = Rss201rev2Feed
    title = "且听风吟 大巧无工,重剑无锋. "
    description = '大巧无工,重剑无锋.'
    link = "/feed/"

    def author_name(self):
        """Return the nickname of the first user."""
        first_user = get_user_model().objects.first()
        return first_user.nickname

    def author_link(self):
        """Return the absolute URL of the first user."""
        first_user = get_user_model().objects.first()
        return first_user.get_absolute_url()

    def items(self):
        """Return up to five articles with type 'a' and status 'p'."""
        published = Article.objects.filter(type='a', status='p')
        return published.order_by('-pub_time')[:5]

    def item_title(self, item):
        """Return the article title."""
        return item.title

    def item_description(self, item):
        """Return the article body converted by CommonMarkdown.get_markdown."""
        return CommonMarkdown.get_markdown(item.body)

    def feed_copyright(self):
        """Return the copyright line containing the current year."""
        current_year = timezone.now().year
        return f"Copyright© {current_year} 且听风吟"

    def item_link(self, item):
        """Return the article's absolute URL."""
        return item.get_absolute_url()

    def item_guid(self, item):
        """Not implemented: returns None."""
        return None

@ -0,0 +1,77 @@
# 导入Django后台管理及相关工具类
from django.contrib import admin
from django.contrib.admin.models import DELETION # 操作类型:删除
from django.contrib.contenttypes.models import ContentType # 内容类型模型
from django.urls import reverse, NoReverseMatch # URL反向解析相关
from django.utils.encoding import force_str # 字符串编码处理
from django.utils.html import escape # HTML转义
from django.utils.safestring import mark_safe # 标记安全HTML内容
from django.utils.translation import gettext_lazy as _ # 国际化翻译
# ModelAdmin controlling how admin log entries are listed in the admin.
class LogEntryAdmin(admin.ModelAdmin):
    """Admin for log entries: no add, no delete, change view without POST."""

    # Sidebar filter.
    list_filter = ['content_type']
    # Fields covered by the search box.
    search_fields = ['object_repr', 'change_message']
    # Columns that link to the entry itself.
    list_display_links = ['action_time', 'get_change_message']
    # Columns of the change list.
    list_display = ['action_time', 'user_link', 'content_type', 'object_link', 'get_change_message']

    def has_add_permission(self, request):
        """Log entries cannot be added by hand."""
        return False

    def has_change_permission(self, request, obj=None):
        """Allow superusers / users with ``admin.change_logentry``, except for POST requests."""
        user = request.user
        allowed = user.is_superuser or user.has_perm('admin.change_logentry')
        return allowed and request.method != 'POST'

    def has_delete_permission(self, request, obj=None):
        """Log entries cannot be deleted."""
        return False

    def object_link(self, obj):
        """Return the escaped object repr, linked to its admin change page when possible.

        No link is produced for deletions, for entries without a content
        type, or when the change URL cannot be reversed.
        """
        object_link = escape(obj.object_repr)
        content_type = obj.content_type
        if obj.action_flag != DELETION and content_type is not None:
            try:
                url = reverse(
                    f'admin:{content_type.app_label}_{content_type.model}_change',
                    args=[obj.object_id])
                # The reversed URL must be placed in the href; an empty href
                # produces a link that goes nowhere.
                object_link = f'<a href="{url}">{object_link}</a>'
            except NoReverseMatch:
                # Fall back to plain text.
                pass
        return mark_safe(object_link)

    object_link.admin_order_field = 'object_repr'
    object_link.short_description = _('object')

    def user_link(self, obj):
        """Return the escaped user string, linked to the user's admin change page when possible."""
        content_type = ContentType.objects.get_for_model(type(obj.user))
        user_link = escape(force_str(obj.user))
        try:
            url = reverse(
                f'admin:{content_type.app_label}_{content_type.model}_change',
                args=[obj.user.pk])
            user_link = f'<a href="{url}">{user_link}</a>'
        except NoReverseMatch:
            pass
        return mark_safe(user_link)

    user_link.admin_order_field = 'user'
    user_link.short_description = _('user')

    def get_queryset(self, request):
        """Return the default queryset with ``content_type`` prefetched."""
        queryset = super().get_queryset(request)
        return queryset.prefetch_related('content_type')

    def get_actions(self, request):
        """Return the default actions without ``delete_selected``."""
        actions = super().get_actions(request)
        if 'delete_selected' in actions:
            del actions['delete_selected']
        return actions

@ -0,0 +1,77 @@
# 导入Django后台管理及相关工具类
from django.contrib import admin
from django.contrib.admin.models import DELETION # 操作类型:删除
from django.contrib.contenttypes.models import ContentType # 内容类型模型
from django.urls import reverse, NoReverseMatch # URL反向解析相关
from django.utils.encoding import force_str # 字符串编码处理
from django.utils.html import escape # HTML转义
from django.utils.safestring import mark_safe # 标记安全HTML内容
from django.utils.translation import gettext_lazy as _ # 国际化翻译
# ModelAdmin controlling how admin log entries are listed in the admin.
class LogEntryAdmin(admin.ModelAdmin):
    """Admin for log entries: no add, no delete, change view without POST."""

    # Sidebar filter.
    list_filter = ['content_type']
    # Fields covered by the search box.
    search_fields = ['object_repr', 'change_message']
    # Columns that link to the entry itself.
    list_display_links = ['action_time', 'get_change_message']
    # Columns of the change list.
    list_display = ['action_time', 'user_link', 'content_type', 'object_link', 'get_change_message']

    def has_add_permission(self, request):
        """Log entries cannot be added by hand."""
        return False

    def has_change_permission(self, request, obj=None):
        """Allow superusers / users with ``admin.change_logentry``, except for POST requests."""
        user = request.user
        allowed = user.is_superuser or user.has_perm('admin.change_logentry')
        return allowed and request.method != 'POST'

    def has_delete_permission(self, request, obj=None):
        """Log entries cannot be deleted."""
        return False

    def object_link(self, obj):
        """Return the escaped object repr, linked to its admin change page when possible.

        No link is produced for deletions, for entries without a content
        type, or when the change URL cannot be reversed.
        """
        object_link = escape(obj.object_repr)
        content_type = obj.content_type
        if obj.action_flag != DELETION and content_type is not None:
            try:
                url = reverse(
                    f'admin:{content_type.app_label}_{content_type.model}_change',
                    args=[obj.object_id])
                # The reversed URL must be placed in the href; an empty href
                # produces a link that goes nowhere.
                object_link = f'<a href="{url}">{object_link}</a>'
            except NoReverseMatch:
                # Fall back to plain text.
                pass
        return mark_safe(object_link)

    object_link.admin_order_field = 'object_repr'
    object_link.short_description = _('object')

    def user_link(self, obj):
        """Return the escaped user string, linked to the user's admin change page when possible."""
        content_type = ContentType.objects.get_for_model(type(obj.user))
        user_link = escape(force_str(obj.user))
        try:
            url = reverse(
                f'admin:{content_type.app_label}_{content_type.model}_change',
                args=[obj.user.pk])
            user_link = f'<a href="{url}">{user_link}</a>'
        except NoReverseMatch:
            pass
        return mark_safe(user_link)

    user_link.admin_order_field = 'user'
    user_link.short_description = _('user')

    def get_queryset(self, request):
        """Return the default queryset with ``content_type`` prefetched."""
        queryset = super().get_queryset(request)
        return queryset.prefetch_related('content_type')

    def get_actions(self, request):
        """Return the default actions without ``delete_selected``."""
        actions = super().get_actions(request)
        if 'delete_selected' in actions:
            del actions['delete_selected']
        return actions

@ -0,0 +1,80 @@
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
from blog.models import Article, Category, Tag
class StaticViewSitemap(Sitemap):
    """Sitemap for fixed pages that are addressed by URL name."""

    changefreq = 'daily'
    priority = 0.5

    def items(self):
        """Return the URL names to include (currently only the blog index)."""
        url_names = ['blog:index']
        return url_names

    def location(self, item):
        """Resolve a URL name to its path with ``reverse``."""
        return reverse(item)
class ArticleSiteMap(Sitemap):
    """Sitemap entries for articles with status 'p'."""

    priority = "0.6"
    changefreq = "monthly"

    def items(self):
        """Return the articles whose status is 'p'."""
        published = Article.objects.filter(status='p')
        return published

    def lastmod(self, obj):
        """Return the article's ``last_modify_time``."""
        return obj.last_modify_time
class CategorySiteMap(Sitemap):
    """Sitemap entries for every category."""

    # The sitemap protocol only defines lowercase change frequencies.
    changefreq = "weekly"
    priority = "0.6"

    def items(self):
        """Return all categories."""
        return Category.objects.all()

    def lastmod(self, obj):
        """Return the category's ``last_modify_time``."""
        return obj.last_modify_time
class TagSiteMap(Sitemap):
    """Sitemap entries for every tag."""

    # The sitemap protocol only defines lowercase change frequencies.
    changefreq = "weekly"
    priority = "0.3"

    def items(self):
        """Return all tags."""
        return Tag.objects.all()

    def lastmod(self, obj):
        """Return the tag's ``last_modify_time``."""
        return obj.last_modify_time
class UserSiteMap(Sitemap):
    """Sitemap entries for the authors of articles."""

    # The sitemap protocol only defines lowercase change frequencies.
    changefreq = "weekly"
    priority = "0.3"

    def items(self):
        """Return the distinct authors of all articles, as a list."""
        authors = {article.author for article in Article.objects.all()}
        return list(authors)

    def lastmod(self, obj):
        """Return the user's ``date_joined``."""
        return obj.date_joined

@ -0,0 +1,55 @@
import logging
import requests
from django.conf import settings
# 获取当前模块的日志记录器,用于记录日志信息
logger = logging.getLogger(__name__)
class SpiderNotify:
    """Pushes URLs to a search engine so that it can discover them."""

    @staticmethod
    def baidu_notify(urls, timeout=10):
        """POST ``urls`` to ``settings.BAIDU_NOTIFY_URL``, one URL per line.

        Args:
            urls (list): URLs to push.
            timeout: Seconds to wait for the HTTP request. Without a timeout
                an unresponsive endpoint would block the caller indefinitely.

        Any exception is logged and swallowed; nothing is returned.
        """
        try:
            # e.g. ['http://example.com/1', 'http://example.com/2']
            #   -> "http://example.com/1\nhttp://example.com/2"
            data = '\n'.join(urls)
            result = requests.post(
                settings.BAIDU_NOTIFY_URL, data=data, timeout=timeout)
            logger.info(result.text)
        except Exception as e:
            # Best effort: a failed push must not break the caller.
            logger.error(e)

    @staticmethod
    def notify(url):
        """Push a single URL string or a list of URLs.

        Args:
            url (str or list): One URL, or a list of URLs.
        """
        # Wrap a single URL so baidu_notify always receives a list.
        if isinstance(url, str):
            url = [url]
        SpiderNotify.baidu_notify(url)

@ -0,0 +1,29 @@
from django.test import TestCase # 导入Django测试框架的核心类
from djangoblog.utils import * # 导入需要测试的工具函数
class DjangoBlogTest(TestCase):
"""Django博客工具函数测试类继承自TestCase以获得Django测试框架的全部功能"""
def setUp(self):
"""
测试前置设置方法在每个测试方法执行前自动调用
用于初始化测试环境如创建测试数据配置设置等
当前测试不需要特殊设置所以使用pass跳过
"""
pass
# 如果需要,可以在这里创建测试用的模型实例或设置测试环境
# 例如self.user = User.objects.create_user(username='testuser', password='testpass')
def test_utils(self):
"""测试工具函数的核心测试方法,包含多个工具函数的验证"""
# 测试SHA256哈希函数
md5 = get_sha256('test') # 对'test'字符串进行SHA256加密
self.assertIsNotNone(md5) # 断言结果不为None验证函数正常工作
# 更完整的测试可以添加self.assertEqual(len(md5), 64) # SHA256结果应为64字符
# 测试CommonMarkdown的Markdown解析功能
c = CommonMarkdown.get_markdown('''
# Title1 # 一级标题

@ -0,0 +1,106 @@
"""
djangoblog URL Configuration
URL配置是Django网站的目录本质是URL与视图函数之间的映射表
通过此文件告诉Django对于哪个URL调用哪段代码
配置说明文档
https://docs.djangoproject.com/en/1.10/topics/http/urls/
示例
函数视图
1. 导入视图from my_app import views
2. 添加URL模式url(r'^$', views.home, name='home')
基于类的视图
1. 导入视图from other_app.views import Home
2. 添加URL模式url(r'^$', Home.as_view(), name='home')
包含其他URL配置
1. 导入include函数from django.conf.urls import url, include
2. 添加URL模式url(r'^blog/', include('blog.urls'))
"""
from django.conf import settings
from django.conf.urls.i18n import i18n_patterns # 国际化URL模式支持
from django.conf.urls.static import static # 静态文件服务
from django.contrib.sitemaps.views import sitemap # 站点地图视图
from django.urls import path, include
from django.urls import re_path # 正则表达式URL匹配
from haystack.views import search_view_factory # Haystack搜索视图工厂
# 导入自定义模块
from blog.views import EsSearchView
from djangoblog.admin_site import admin_site # 自定义admin站点
from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm
from djangoblog.feeds import DjangoBlogFeed # RSS订阅源
from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
# Sitemap sections served by the sitemap view.
sitemaps = {
    'blog': ArticleSiteMap,
    'Category': CategorySiteMap,
    'Tag': TagSiteMap,
    'User': UserSiteMap,
    'static': StaticViewSitemap
}

# Custom error views. Django looks these up by the exact names
# handler404 / handler500 / handler403 in the root URLconf.
handler404 = 'blog.views.page_not_found_view'
handler500 = 'blog.views.server_error_view'
handler403 = 'blog.views.permission_denied_view'

# Patterns that are not wrapped by i18n_patterns.
urlpatterns = [
    # Language switching endpoints.
    path('i18n/', include('django.conf.urls.i18n')),
]

# Patterns wrapped by i18n_patterns, followed by the static-file patterns.
# The "+ static(...)" must stay part of this one expression: on a line of its
# own it would be a separate statement applying unary plus to a list.
urlpatterns += i18n_patterns(
    # Admin, served by the project's admin_site.
    re_path(r'^admin/', admin_site.urls),
    # Blog application.
    re_path(r'', include('blog.urls', namespace='blog')),
    # Markdown editor.
    re_path(r'mdeditor/', include('mdeditor.urls')),
    # Comments.
    re_path(r'', include('comments.urls', namespace='comment')),
    # Accounts.
    re_path(r'', include('accounts.urls', namespace='account')),
    # OAuth.
    re_path(r'', include('oauth.urls', namespace='oauth')),
    # sitemap.xml
    re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps},
            name='django.contrib.sitemaps.views.sitemap'),
    # RSS feed, available under both /feed/ and /rss/.
    re_path(r'^feed/$', DjangoBlogFeed()),
    re_path(r'^rss/$', DjangoBlogFeed()),
    # Search, using the Elasticsearch search form.
    re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
            name='search'),
    # Server manager.
    re_path(r'', include('servermanager.urls', namespace='servermanager')),
    # OwnTracks.
    re_path(r'', include('owntracks.urls', namespace='owntracks')),
    # The default language gets no URL prefix.
    prefix_default_language=False
) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)

# Media files are only routed when DEBUG is on.
if settings.DEBUG:
    urlpatterns += static(settings.MEDIA_URL,
                          document_root=settings.MEDIA_ROOT)

@ -0,0 +1,313 @@
#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach # HTML清理库用于防止XSS攻击
import markdown # Markdown解析库
import requests # HTTP请求库
from django.conf import settings
from django.contrib.sites.models import Site # Django站点框架
from django.core.cache import cache # Django缓存框架
from django.templatetags.static import static # 静态文件URL生成
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
    """Return ``(article_pk, comment_pk)`` for ``Article.objects.latest()`` and ``Comment.objects.latest()``."""
    from blog.models import Article
    from comments.models import Comment
    latest_article = Article.objects.latest()
    latest_comment = Comment.objects.latest()
    return (latest_article.pk, latest_comment.pk)
def get_sha256(str):
    """Return the SHA-256 hex digest of a string.

    Args:
        str: Text to hash; it is encoded as UTF-8 first.

    Returns:
        str: The hexadecimal digest.
    """
    digest = sha256()
    digest.update(str.encode('utf-8'))
    return digest.hexdigest()
def cache_decorator(expiration=3 * 60):
    """Decorator factory that caches a function's return value.

    Args:
        expiration: Cache timeout passed to ``cache.set`` (default ``3 * 60``).

    Returns:
        function: The decorator.
    """
    def wrapper(func):
        def news(*args, **kwargs):
            try:
                # Prefer a key supplied by the first positional argument.
                view = args[0]
                key = view.get_cache_key()
            except Exception:
                # No usable first argument / no get_cache_key. ``Exception``
                # rather than a bare ``except`` so that KeyboardInterrupt and
                # SystemExit are not swallowed.
                key = None
            if not key:
                # Derive a key from the function and its arguments.
                # NOTE(review): repr(func) usually embeds a memory address, so
                # this key is presumably not shared across processes - confirm.
                unique_str = repr((func, args, kwargs))
                key = sha256(unique_str.encode('utf-8')).hexdigest()

            value = cache.get(key)
            if value is not None:
                # The sentinel string stands for a cached ``None`` result.
                if str(value) == '__default_cache_value__':
                    return None
                return value

            # Cache miss: call the function and store the result.
            logger.debug('cache_decorator set cache:%s key:%s' % (func.__name__, key))
            value = func(*args, **kwargs)
            if value is None:
                cache.set(key, '__default_cache_value__', expiration)
            else:
                cache.set(key, value, expiration)
            return value

        return news

    return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
    """Delete the cached page for ``path``.

    Args:
        path: URL path.
        servername: Value used for ``SERVER_NAME`` of the synthetic request.
        serverport: Value used for ``SERVER_PORT`` of the synthetic request.
        key_prefix: Cache key prefix.

    Returns:
        bool: True when a cache key was found for the request, else False.
    """
    from django.http import HttpRequest
    from django.utils.cache import get_cache_key

    # Build a request object that only carries what the key lookup is given here.
    fake_request = HttpRequest()
    fake_request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
    fake_request.path = path

    cache_key = get_cache_key(fake_request, key_prefix=key_prefix, cache=cache)
    if not cache_key:
        return False

    logger.info('expire_view_cache:get key:{path}'.format(path=path))
    if cache.get(cache_key):
        cache.delete(cache_key)
    return True
@cache_decorator()  # result is cached by cache_decorator
def get_current_site():
    """Return ``Site.objects.get_current()``."""
    return Site.objects.get_current()
class CommonMarkdown:
    """Helpers for converting Markdown text to HTML."""

    @staticmethod
    def _convert_markdown(value):
        """Convert Markdown text.

        Args:
            value: Markdown source text.

        Returns:
            tuple: ``(html, toc)`` from the Markdown converter.
        """
        enabled_extensions = [
            'extra',
            'codehilite',
            'toc',
            'tables',
        ]
        converter = markdown.Markdown(extensions=enabled_extensions)
        html = converter.convert(value)
        return html, converter.toc

    @staticmethod
    def get_markdown_with_toc(value):
        """Return ``(html, toc)`` for the Markdown text ``value``."""
        html, toc = CommonMarkdown._convert_markdown(value)
        return html, toc

    @staticmethod
    def get_markdown(value):
        """Return only the HTML for the Markdown text ``value``."""
        html, _toc = CommonMarkdown._convert_markdown(value)
        return html
def send_email(emailto, title, content):
    """Request an e-mail by sending ``send_email_signal``.

    Args:
        emailto: Recipient(s), forwarded as ``emailto``.
        title: Subject, forwarded as ``title``.
        content: Body, forwarded as ``content``.
    """
    # Imported locally, as in the original code.
    from djangoblog.blog_signals import send_email_signal

    payload = {
        'emailto': emailto,
        'title': title,
        'content': content,
    }
    send_email_signal.send(send_email.__class__, **payload)
def generate_code() -> str:
    """Return a random verification code made of six decimal digits.

    Each digit is drawn independently with the ``secrets`` module, so digits
    may repeat and the code is not produced by the non-cryptographic
    ``random`` generator.
    """
    import secrets
    return ''.join(secrets.choice(string.digits) for _ in range(6))
def parse_dict_to_url(dict):
    """Serialise a mapping as a URL query string.

    Args:
        dict: Mapping of parameter names to values.

    Returns:
        str: ``key=value`` pairs joined by ``&``; keys and values are
        percent-quoted with ``/`` left as is.
    """
    from urllib.parse import quote

    pairs = []
    for name, val in dict.items():
        quoted_name = quote(name, safe='/')
        quoted_val = quote(val, safe='/')
        pairs.append('{}={}'.format(quoted_name, quoted_val))
    return '&'.join(pairs)
def get_blog_setting():
    """Return the blog settings object, using the cache when possible.

    When no ``BlogSettings`` row exists, one is created with default values
    before the first row is fetched and cached.

    Returns:
        BlogSettings: The blog settings object.
    """
    cached = cache.get('get_blog_setting')
    if cached:
        return cached

    # Cache miss: read from the database.
    from blog.models import BlogSettings
    if not BlogSettings.objects.count():
        defaults = {
            'site_name': 'djangoblog',
            'site_description': '基于Django的博客系统',
            'site_seo_description': '基于Django的博客系统',
            'site_keywords': 'Django,Python',
            'article_sub_length': 300,
            'sidebar_article_count': 10,
            'sidebar_comment_count': 5,
            'show_google_adsense': False,
            'open_site_comment': True,
            'analytics_code': '',
            'beian_code': '',
            'show_gongan_code': False,
            'comment_need_review': False,
        }
        setting = BlogSettings()
        for attr_name, attr_value in defaults.items():
            setattr(setting, attr_name, attr_value)
        setting.save()

    value = BlogSettings.objects.first()
    logger.info('set cache get_blog_setting')
    cache.set('get_blog_setting', value)
    return value
def save_user_avatar(url):
    """Download a user avatar into the local ``avatar`` directory.

    Args:
        url: Avatar URL.

    Returns:
        str: Static URL of the saved avatar, or of the default avatar when
        the download fails for any reason (exception or non-200 response).
    """
    logger.info(url)
    try:
        basedir = os.path.join(settings.STATICFILES, 'avatar')
        rsp = requests.get(url, timeout=2)
        if rsp.status_code == 200:
            os.makedirs(basedir, exist_ok=True)
            # Keep the URL's extension only when it is a known image type.
            image_extensions = ('.jpg', '.png', '.jpeg', '.gif')
            if url.endswith(image_extensions):
                ext = os.path.splitext(url)[1]
            else:
                ext = '.jpg'
            # Random file name so that avatars never overwrite each other.
            save_filename = uuid.uuid4().hex + ext
            save_path = os.path.join(basedir, save_filename)
            logger.info('保存用户头像:' + save_path)
            with open(save_path, 'wb') as file:
                file.write(rsp.content)
            return static('avatar/' + save_filename)
    except Exception as e:
        logger.error(e)
    # Reached on an exception and on a non-200 response, so callers never
    # receive None.
    return static('blog/img/avatar.png')
def delete_sidebar_cache():
    """Delete the ``"sidebar" + value`` cache entry for every ``LinkShowType`` value."""
    from blog.models import LinkShowType
    for show_type in LinkShowType.values:
        cache_key = "sidebar" + show_type
        logger.info('delete sidebar key:' + cache_key)
        cache.delete(cache_key)
def delete_view_cache(prefix, keys):
    """Delete a cached template fragment.

    Args:
        prefix: Fragment name passed to ``make_template_fragment_key``.
        keys: Values the fragment key varies on.
    """
    from django.core.cache.utils import make_template_fragment_key
    fragment_key = make_template_fragment_key(prefix, keys)
    cache.delete(fragment_key)
def get_resource_url():
    """Return ``settings.STATIC_URL``, or ``http://<site domain>/static/`` when it is empty."""
    static_url = settings.STATIC_URL
    if static_url:
        return static_url
    current_site = get_current_site()
    return 'http://' + current_site.domain + '/static/'
# Whitelists of HTML tags and attributes handed to bleach by sanitize_html.
ALLOWED_TAGS = [
    'a', 'abbr', 'acronym', 'b',
    'blockquote', 'code', 'em', 'i',
    'li', 'ol', 'pre', 'strong',
    'ul', 'h1', 'h2', 'p',
]
ALLOWED_ATTRIBUTES = {
    'a': ['href', 'title'],
    'abbr': ['title'],
    'acronym': ['title'],
}


def sanitize_html(html):
    """Clean HTML with ``bleach.clean`` using the whitelists above.

    Args:
        html: HTML content to clean.

    Returns:
        str: The cleaned HTML.
    """
    cleaned = bleach.clean(
        html,
        tags=ALLOWED_TAGS,
        attributes=ALLOWED_ATTRIBUTES,
    )
    return cleaned

@ -0,0 +1,928 @@
# encoding: utf-8
from __future__ import absolute_import, division, print_function, unicode_literals
import json
import os
import re
import shutil
import threading
import warnings
import six
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from datetime import datetime
from django.utils.encoding import force_str
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, EmptyResults, log_query
from haystack.constants import DJANGO_CT, DJANGO_ID, ID
from haystack.exceptions import MissingDependency, SearchBackendError, SkipDocument
from haystack.inputs import Clean, Exact, PythonData, Raw
from haystack.models import SearchResult
from haystack.utils import get_identifier, get_model_ct
from haystack.utils import log as logging
from haystack.utils.app_loading import haystack_get_model
from jieba.analyse import ChineseAnalyzer # 导入jieba中文分词器
from whoosh import index
from whoosh.analysis import StemmingAnalyzer
from whoosh.fields import BOOLEAN, DATETIME, IDLIST, KEYWORD, NGRAM, NGRAMWORDS, NUMERIC, Schema, TEXT
from whoosh.fields import ID as WHOOSH_ID
from whoosh.filedb.filestore import FileStorage, RamStorage
from whoosh.highlight import ContextFragmenter, HtmlFormatter
from whoosh.highlight import highlight as whoosh_highlight
from whoosh.qparser import QueryParser
from whoosh.searching import ResultsPage
from whoosh.writing import AsyncWriter
# Make sure the Whoosh dependency can be imported.
try:
    import whoosh
except ImportError:
    raise MissingDependency(
        "The 'whoosh' backend requires the installation of 'Whoosh'. Please refer to the documentation.")

# Make sure the installed Whoosh exposes a version of at least 2.5.0.
if not (hasattr(whoosh, '__version__') and whoosh.__version__ >= (2, 5, 0)):
    raise MissingDependency(
        "The 'whoosh' backend requires version 2.5.0 or greater.")
# Pattern for timestamps of the form "YYYY-MM-DDTHH:MM:SS", optionally followed
# by fractional seconds (3-6 digits) and a trailing "Z".
# A raw string is used so that "\d" and "\." reach the regex engine verbatim
# instead of being treated as (invalid) Python string escapes.
DATETIME_REGEX = re.compile(
    r'^(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})T(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})(\.\d{3,6}Z?)?$')

# Thread-local holder for the in-memory (RAM) storage.
LOCALS = threading.local()
LOCALS.RAM_STORE = None
class WhooshHtmlFormatter(HtmlFormatter):
    """Minimal HTML formatter for highlighted search results.

    Simpler than Whoosh's stock ``HtmlFormatter``; it is kept this plain so
    that highlighting output stays consistent across search backends.
    """

    # Markup wrapped around every highlighted term.
    template = '<%(tag)s>%(t)s</%(tag)s>'
class WhooshSearchBackend(BaseSearchBackend):
    """Haystack search backend built on Whoosh.

    Plain text fields are indexed with jieba's ``ChineseAnalyzer``.
    """

    # Words reserved by the Whoosh query language.
    RESERVED_WORDS = (
        'AND', 'NOT', 'OR', 'TO',
    )

    # Characters reserved by the Whoosh query language.
    RESERVED_CHARACTERS = (
        '\\', '+', '-', '&&', '||', '!', '(', ')', '{', '}',
        '[', ']', '^', '"', '~', '*', '?', ':', '.',
    )

    def __init__(self, connection_alias, **connection_options):
        """Configure the backend from a Haystack connection definition.

        Args:
            connection_alias: Name of the Haystack connection.
            **connection_options: Connection settings. ``PATH`` (index
                directory), ``STORAGE`` (``'file'`` by default; any other
                value selects in-memory storage) and ``POST_LIMIT`` are read.

        Raises:
            ImproperlyConfigured: If file storage is selected without ``PATH``.
        """
        super(WhooshSearchBackend, self).__init__(connection_alias, **connection_options)
        self.setup_complete = False
        # connection_options is a dict, so the option has to be read with
        # .get(); getattr() on a dict would always return the default.
        self.post_limit = connection_options.get('POST_LIMIT', 128 * 1024 * 1024)
        self.path = connection_options.get('PATH')
        self.use_file_storage = connection_options.get('STORAGE', 'file') == 'file'

        if self.use_file_storage and not self.path:
            raise ImproperlyConfigured(
                "You must specify a 'PATH' in your settings for connection '%s'." % connection_alias)

        self.log = logging.getLogger('haystack')

    def setup(self):
        """Open (or create) the index; called lazily on first use.

        Sets ``storage``, ``content_field_name``, ``schema``, ``parser`` and
        ``index`` on the instance, then marks setup as complete.

        Raises:
            IOError: If the file-storage index directory is not writable.
        """
        from haystack import connections

        new_index = False

        if self.use_file_storage:
            # Make sure the index directory exists and can be written to.
            if not os.path.exists(self.path):
                os.makedirs(self.path)
                new_index = True

            if not os.access(self.path, os.W_OK):
                raise IOError(
                    "The path to your Whoosh index '%s' is not writable for the current user/group." % self.path)

            self.storage = FileStorage(self.path)
        else:
            # The RAM storage lives on the thread-local LOCALS object and is
            # created on first use in each thread.
            if getattr(LOCALS, 'RAM_STORE', None) is None:
                LOCALS.RAM_STORE = RamStorage()

            self.storage = LOCALS.RAM_STORE

        self.content_field_name, self.schema = self.build_schema(
            connections[self.connection_alias].get_unified_index().all_searchfields())
        self.parser = QueryParser(self.content_field_name, schema=self.schema)

        if new_index is True:
            self.index = self.storage.create_index(self.schema)
        else:
            try:
                self.index = self.storage.open_index(schema=self.schema)
            except index.EmptyIndexError:
                # The storage exists but holds no index yet.
                self.index = self.storage.create_index(self.schema)

        self.setup_complete = True

    def build_schema(self, fields):
        """Translate Haystack search fields into a Whoosh schema.

        Args:
            fields: Mapping of field name to Haystack field object.

        Returns:
            tuple: ``(content_field_name, schema)`` where ``content_field_name``
            is the index name of the field flagged ``document=True`` (``''``
            if there is none).

        Raises:
            SearchBackendError: If ``fields`` contributes no schema fields.
        """
        # Fields every Haystack document carries.
        schema_fields = {
            ID: WHOOSH_ID(stored=True, unique=True),
            DJANGO_CT: WHOOSH_ID(stored=True),
            DJANGO_ID: WHOOSH_ID(stored=True),
        }
        # Remember how many built-in fields there are so that an index with no
        # user-defined fields can be detected below.
        initial_key_count = len(schema_fields)
        content_field_name = ''

        for field_class in fields.values():
            if field_class.is_multivalued:
                if field_class.indexed is False:
                    schema_fields[field_class.index_fieldname] = IDLIST(
                        stored=True, field_boost=field_class.boost)
                else:
                    schema_fields[field_class.index_fieldname] = KEYWORD(
                        stored=True, commas=True, scorable=True, field_boost=field_class.boost)
            elif field_class.field_type in ['date', 'datetime']:
                schema_fields[field_class.index_fieldname] = DATETIME(
                    stored=field_class.stored, sortable=True)
            elif field_class.field_type == 'integer':
                schema_fields[field_class.index_fieldname] = NUMERIC(
                    stored=field_class.stored, numtype=int, field_boost=field_class.boost)
            elif field_class.field_type == 'float':
                schema_fields[field_class.index_fieldname] = NUMERIC(
                    stored=field_class.stored, numtype=float, field_boost=field_class.boost)
            elif field_class.field_type == 'boolean':
                schema_fields[field_class.index_fieldname] = BOOLEAN(stored=field_class.stored)
            elif field_class.field_type == 'ngram':
                schema_fields[field_class.index_fieldname] = NGRAM(
                    minsize=3, maxsize=15, stored=field_class.stored, field_boost=field_class.boost)
            elif field_class.field_type == 'edge_ngram':
                schema_fields[field_class.index_fieldname] = NGRAMWORDS(
                    minsize=2, maxsize=15, at='start', stored=field_class.stored, field_boost=field_class.boost)
            else:
                # Any other field type is indexed as text with the Chinese analyzer.
                schema_fields[field_class.index_fieldname] = TEXT(
                    stored=True, analyzer=ChineseAnalyzer(), field_boost=field_class.boost, sortable=True)

            if field_class.document is True:
                # The main document field is the default search field and is
                # flagged for spelling suggestions.
                content_field_name = field_class.index_fieldname
                schema_fields[field_class.index_fieldname].spelling = True

        if len(schema_fields) <= initial_key_count:
            raise SearchBackendError(
                "No fields were found in any search_indexes. Please correct this before attempting to search.")

        return (content_field_name, Schema(**schema_fields))

    def update(self, index, iterable, commit=True):
        """Add or update the documents for the given objects.

        Args:
            index: Haystack ``SearchIndex`` used to prepare each object.
            iterable: Objects to index. Any iterable is accepted, including
                generators.
            commit: Accepted for API compatibility; the writer is committed
                whenever at least one object was supplied.
        """
        if not self.setup_complete:
            self.setup()

        self.index = self.index.refresh()
        writer = AsyncWriter(self.index)
        # Track whether anything was iterated instead of calling len(), so
        # that generators and other unsized iterables work too.
        saw_object = False

        for obj in iterable:
            saw_object = True
            try:
                doc = index.full_prepare(obj)
            except SkipDocument:
                self.log.debug(u"Indexing for object `%s` skipped", obj)
            else:
                # Convert every value to the representation Whoosh expects.
                for key in doc:
                    doc[key] = self._from_python(doc[key])

                # Document boosts are not supported by Whoosh 2.5.0+.
                if 'boost' in doc:
                    del doc['boost']

                try:
                    writer.update_document(**doc)
                except Exception as e:
                    if not self.silently_fail:
                        raise

                    # Only the object's identifier is logged, not the object itself.
                    self.log.error(u"%s while preparing object for update" % e.__class__.__name__,
                                   exc_info=True, extra={"data": {"index": index, "object": get_identifier(obj)}})

        if saw_object:
            writer.commit()

    def remove(self, obj_or_string, commit=True):
        """Remove one document from the index.

        Args:
            obj_or_string: Object or identifier string; resolved with
                ``get_identifier``.
            commit: Accepted for API compatibility; not used here.
        """
        if not self.setup_complete:
            self.setup()

        self.index = self.index.refresh()
        whoosh_id = get_identifier(obj_or_string)

        try:
            self.index.delete_by_query(q=self.parser.parse(u'%s:"%s"' % (ID, whoosh_id)))
        except Exception as e:
            if not self.silently_fail:
                raise

            self.log.error("Failed to remove document '%s' from Whoosh: %s", whoosh_id, e, exc_info=True)

    def clear(self, models=None, commit=True):
        """Clear the whole index, or only the documents of some models.

        Args:
            models: List/tuple of models whose documents are removed;
                ``None`` wipes the entire index.
            commit: Accepted for API compatibility; not used here.

        Raises:
            AssertionError: If ``models`` is neither ``None`` nor a list/tuple.
        """
        if not self.setup_complete:
            self.setup()

        self.index = self.index.refresh()

        if models is not None:
            assert isinstance(models, (list, tuple))

        try:
            if models is None:
                self.delete_index()
            else:
                models_to_delete = [u"%s:%s" % (DJANGO_CT, get_model_ct(model)) for model in models]
                self.index.delete_by_query(q=self.parser.parse(u" OR ".join(models_to_delete)))
        except Exception as e:
            if not self.silently_fail:
                raise

            if models is not None:
                self.log.error("Failed to clear Whoosh index of models '%s': %s",
                               ','.join(models_to_delete), e, exc_info=True)
            else:
                self.log.error("Failed to clear Whoosh index: %s", e, exc_info=True)

    def delete_index(self):
        """Drop the whole index and recreate an empty one."""
        if self.use_file_storage and os.path.exists(self.path):
            # File storage: remove the index directory outright.
            shutil.rmtree(self.path)
        elif not self.use_file_storage:
            # RAM storage: clean the storage object.
            self.storage.clean()

        # Recreate everything.
        self.setup()

    def optimize(self):
        """Run Whoosh's ``optimize()`` on the refreshed index."""
        if not self.setup_complete:
            self.setup()

        self.index = self.index.refresh()
        self.index.optimize()

    def calculate_page(self, start_offset=0, end_offset=None):
        """Convert a start/end offset pair into a 1-based page number and page length.

        Args:
            start_offset: Offset of the first wanted result (``None`` means 0).
            end_offset: Offset just past the last wanted result (``None``
                means 1000000; values <= 0 are raised to 1).

        Returns:
            tuple: ``(page_num, page_length)``.
        """
        # Whoosh errors out unless end_offset is greater than 0.
        if end_offset is not None and end_offset <= 0:
            end_offset = 1

        page_num = 0

        if end_offset is None:
            end_offset = 1000000

        if start_offset is None:
            start_offset = 0

        page_length = end_offset - start_offset

        if page_length and page_length > 0:
            page_num = int(start_offset / page_length)

        # Whoosh uses 1-based page numbers.
        page_num += 1
        return page_num, page_length

    def _resolve_model_choices(self, models, limit_to_registered_models):
        """Return the content-type names a query must be restricted to (possibly empty)."""
        if limit_to_registered_models is None:
            limit_to_registered_models = getattr(settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)

        if models and len(models):
            return sorted(get_model_ct(model) for model in models)
        if limit_to_registered_models:
            return self.build_models_list()
        return []

    @log_query
    def search(self, query_string, sort_by=None, start_offset=0, end_offset=None, fields='',
               highlight=False, facets=None, date_facets=None, query_facets=None, narrow_queries=None,
               spelling_query=None, within=None, dwithin=None, distance_point=None, models=None,
               limit_to_registered_models=None, result_class=None, **kwargs):
        """Run a search query against the index.

        Args:
            query_string: The Whoosh query string.
            sort_by: Optional list of field names; a leading ``-`` means
                descending. All fields must share one direction, and only the
                first field is passed on to Whoosh.
            start_offset: Offset of the first wanted result.
            end_offset: Offset just past the last wanted result.
            highlight: Whether to add highlighted snippets to the results.
            facets, date_facets, query_facets: Not supported; a warning is
                issued when any of them is given.
            narrow_queries: Optional set of query strings used to filter the
                results. A model filter may be added to this set.
            spelling_query: Alternative text for the spelling suggestion.
            models: Optional models to restrict the search to.
            limit_to_registered_models: Restrict results to registered models;
                defaults to ``settings.HAYSTACK_LIMIT_TO_REGISTERED_MODELS``.
            result_class: Class used to build each result.
            fields, within, dwithin, distance_point, **kwargs: Accepted but
                not used by this backend.

        Returns:
            dict: At least ``results`` and ``hits``.

        Raises:
            SearchBackendError: If ``sort_by`` mixes ascending and descending
                fields.
        """
        if not self.setup_complete:
            self.setup()

        # A zero-length query yields no results.
        if len(query_string) == 0:
            return {'results': [], 'hits': 0}

        query_string = force_str(query_string)

        # A single-character, non-wildcard query yields no results either.
        if len(query_string) <= 1 and query_string != u'*':
            return {'results': [], 'hits': 0}

        reverse = False

        if sort_by is not None:
            # Reversing is all-or-nothing: every field must use the same direction.
            descending = [order_by.startswith('-') for order_by in sort_by]

            if any(descending) and not all(descending):
                raise SearchBackendError("Whoosh requires all order_by fields to use the same sort direction")

            sort_by_list = [order_by[1:] if order_by.startswith('-') else order_by for order_by in sort_by]

            if sort_by_list:
                reverse = descending[0]
                # Only the first sort field is handed to Whoosh.
                sort_by = sort_by_list[0]
            else:
                # An empty sort list means "no sorting" rather than an IndexError.
                sort_by = None

        if facets is not None:
            warnings.warn("Whoosh does not handle faceting.", Warning, stacklevel=2)

        if date_facets is not None:
            warnings.warn("Whoosh does not handle date faceting.", Warning, stacklevel=2)

        if query_facets is not None:
            warnings.warn("Whoosh does not handle query faceting.", Warning, stacklevel=2)

        narrowed_results = None
        self.index = self.index.refresh()

        model_choices = self._resolve_model_choices(models, limit_to_registered_models)

        if len(model_choices) > 0:
            if narrow_queries is None:
                narrow_queries = set()

            narrow_queries.add(' OR '.join(['%s:%s' % (DJANGO_CT, rm) for rm in model_choices]))

        narrow_searcher = None
        searcher = None

        # try/finally guarantees both searchers are closed on every exit path,
        # including the early returns below.
        try:
            if narrow_queries is not None:
                narrow_searcher = self.index.searcher()

                for nq in narrow_queries:
                    recent_narrowed_results = narrow_searcher.search(
                        self.parser.parse(force_str(nq)), limit=None)

                    if len(recent_narrowed_results) <= 0:
                        return {'results': [], 'hits': 0}

                    if narrowed_results:
                        narrowed_results.filter(recent_narrowed_results)
                    else:
                        narrowed_results = recent_narrowed_results

            self.index = self.index.refresh()

            if not self.index.doc_count():
                # Empty index: nothing to search, only a spelling suggestion.
                if self.include_spelling:
                    spelling_suggestion = self.create_spelling_suggestion(
                        spelling_query if spelling_query else query_string)
                else:
                    spelling_suggestion = None

                return {
                    'results': [],
                    'hits': 0,
                    'spelling_suggestion': spelling_suggestion,
                }

            searcher = self.index.searcher()
            parsed_query = self.parser.parse(query_string)

            # Recover gracefully from a query that could not be parsed.
            if parsed_query is None:
                return {'results': [], 'hits': 0}

            page_num, page_length = self.calculate_page(start_offset, end_offset)

            search_kwargs = {
                'pagelen': page_length,
                'sortedby': sort_by,
                'reverse': reverse,
            }

            if narrowed_results is not None:
                search_kwargs['filter'] = narrowed_results

            try:
                raw_page = searcher.search_page(parsed_query, page_num, **search_kwargs)
            except ValueError:
                if not self.silently_fail:
                    raise

                return {'results': [], 'hits': 0}

            # A page number lower than the requested one means the request
            # was past the last page.
            if raw_page.pagenum < page_num:
                return {'results': [], 'hits': 0}

            return self._process_results(raw_page, highlight=highlight, query_string=query_string,
                                         spelling_query=spelling_query, result_class=result_class)
        finally:
            if searcher is not None:
                searcher.close()

            if narrow_searcher is not None:
                narrow_searcher.close()

    def more_like_this(self, model_instance, additional_query_string=None, start_offset=0, end_offset=None,
                       models=None, limit_to_registered_models=None, result_class=None, **kwargs):
        """Find documents similar to the one indexed for ``model_instance``.

        Args:
            model_instance: Object whose indexed document is the reference.
            additional_query_string: Extra query used to narrow the results
                (ignored when empty or ``'*'``).
            start_offset: Offset of the first wanted result.
            end_offset: Offset just past the last wanted result; also passed
                to Whoosh as the ``top`` limit.
            models: Optional models to restrict the results to.
            limit_to_registered_models: Restrict results to registered models;
                defaults to ``settings.HAYSTACK_LIMIT_TO_REGISTERED_MODELS``.
            result_class: Class used to build each result.
            **kwargs: Accepted but not used.

        Returns:
            dict: At least ``results`` and ``hits``.
        """
        if not self.setup_complete:
            self.setup()

        field_name = self.content_field_name
        narrow_queries = set()
        narrowed_results = None
        self.index = self.index.refresh()

        model_choices = self._resolve_model_choices(models, limit_to_registered_models)

        if len(model_choices) > 0:
            narrow_queries.add(' OR '.join(['%s:%s' % (DJANGO_CT, rm) for rm in model_choices]))

        if additional_query_string and additional_query_string != '*':
            narrow_queries.add(additional_query_string)

        narrow_searcher = None
        # Stays None when the index is empty, so cleanup never touches an
        # unbound name.
        searcher = None

        try:
            narrow_searcher = self.index.searcher()

            for nq in narrow_queries:
                recent_narrowed_results = narrow_searcher.search(self.parser.parse(force_str(nq)), limit=None)

                if len(recent_narrowed_results) <= 0:
                    return {'results': [], 'hits': 0}

                if narrowed_results:
                    narrowed_results.filter(recent_narrowed_results)
                else:
                    narrowed_results = recent_narrowed_results

            page_num, page_length = self.calculate_page(start_offset, end_offset)

            self.index = self.index.refresh()
            raw_results = EmptyResults()

            if self.index.doc_count():
                # Look up the reference document by its identifier.
                query = "%s:%s" % (ID, get_identifier(model_instance))
                searcher = self.index.searcher()
                parsed_query = self.parser.parse(query)
                results = searcher.search(parsed_query)

                if len(results):
                    raw_results = results[0].more_like_this(field_name, top=end_offset)

                if narrowed_results is not None and hasattr(raw_results, 'filter'):
                    raw_results.filter(narrowed_results)

            try:
                raw_page = ResultsPage(raw_results, page_num, page_length)
            except ValueError:
                if not self.silently_fail:
                    raise

                return {'results': [], 'hits': 0}

            if raw_page.pagenum < page_num:
                return {'results': [], 'hits': 0}

            return self._process_results(raw_page, result_class=result_class)
        finally:
            if searcher is not None:
                searcher.close()

            if narrow_searcher is not None:
                narrow_searcher.close()

    def _process_results(self, raw_page, highlight=False, query_string='', spelling_query=None, result_class=None):
        """Turn a Whoosh results page into Haystack's result dictionary.

        Args:
            raw_page: Whoosh results page.
            highlight: Whether to add a ``highlighted`` entry to each result.
            query_string: Query text; used for highlighting and spelling.
            spelling_query: Alternative text for the spelling suggestion.
            result_class: Result class; defaults to ``SearchResult``.

        Returns:
            dict: ``results``, ``hits``, ``facets`` (always empty) and
            ``spelling_suggestion``.
        """
        from haystack import connections

        results = []
        # Take the hit count before iterating the page.
        hits = len(raw_page)

        if result_class is None:
            result_class = SearchResult

        facets = {}
        spelling_suggestion = None
        unified_index = connections[self.connection_alias].get_unified_index()
        indexed_models = unified_index.get_indexed_models()

        for doc_offset, raw_result in enumerate(raw_page):
            score = raw_page.score(doc_offset) or 0
            app_label, model_name = raw_result[DJANGO_CT].split('.')
            additional_fields = {}
            model = haystack_get_model(app_label, model_name)

            if model and model in indexed_models:
                # The model's index is the same for every field of this
                # document, so it is fetched once per result.
                model_index = unified_index.get_index(model)

                for key, value in raw_result.items():
                    string_key = str(key)

                    if string_key in model_index.fields and hasattr(model_index.fields[string_key], 'convert'):
                        # Multi-valued fields are stored comma-separated.
                        if model_index.fields[string_key].is_multivalued:
                            if value is None or len(value) == 0:
                                additional_fields[string_key] = []
                            else:
                                additional_fields[string_key] = value.split(',')
                        else:
                            additional_fields[string_key] = model_index.fields[string_key].convert(value)
                    else:
                        additional_fields[string_key] = self._to_python(value)

                # These two are passed to the result class positionally.
                del additional_fields[DJANGO_CT]
                del additional_fields[DJANGO_ID]

                if highlight:
                    # NOTE(review): highlighting tokenizes with StemmingAnalyzer
                    # while text fields are indexed with ChineseAnalyzer --
                    # confirm this mismatch is intended.
                    sa = StemmingAnalyzer()
                    formatter = WhooshHtmlFormatter('em')
                    terms = [token.text for token in sa(query_string)]

                    whoosh_result = whoosh_highlight(
                        additional_fields.get(self.content_field_name), terms, sa,
                        ContextFragmenter(), formatter
                    )
                    additional_fields['highlighted'] = {self.content_field_name: [whoosh_result]}

                result = result_class(app_label, model_name, raw_result[DJANGO_ID], score, **additional_fields)
                results.append(result)
            else:
                # Documents of unknown or unindexed models do not count as hits.
                hits -= 1

        if self.include_spelling:
            spelling_suggestion = self.create_spelling_suggestion(
                spelling_query if spelling_query else query_string)

        return {
            'results': results,
            'hits': hits,
            'facets': facets,
            'spelling_suggestion': spelling_suggestion,
        }

    def create_spelling_suggestion(self, query_string):
        """Build a spelling suggestion for a query.

        Args:
            query_string: The query text.

        Returns:
            ``None`` for an empty query; otherwise the space-joined top
            suggestion for each word that has one (possibly ``''``).
        """
        # Check for an empty query before opening a reader.
        if not query_string:
            return None

        cleaned_query = force_str(query_string)

        # Strip reserved words and characters from the query.
        for rev_word in self.RESERVED_WORDS:
            cleaned_query = cleaned_query.replace(rev_word, '')

        for rev_char in self.RESERVED_CHARACTERS:
            cleaned_query = cleaned_query.replace(rev_char, '')

        suggested_words = []
        reader = self.index.reader()

        try:
            corrector = reader.corrector(self.content_field_name)

            for word in cleaned_query.split():
                suggestions = corrector.suggest(word, limit=1)

                if len(suggestions) > 0:
                    suggested_words.append(suggestions[0])
        finally:
            # Always release the reader opened above.
            reader.close()

        return ' '.join(suggested_words)

    def _from_python(self, value):
        """Convert a Python value into the form stored in / queried from Whoosh.

        Args:
            value: Any Python value.

        Returns:
            A ``datetime`` for date-like values, ``'true'``/``'false'`` for
            booleans, a comma-joined string for lists/tuples, the number
            itself for ints/floats, and ``force_str(value)`` otherwise.
        """
        if hasattr(value, 'strftime'):
            # A plain date (no hour attribute) becomes midnight of that day.
            if not hasattr(value, 'hour'):
                value = datetime(value.year, value.month, value.day, 0, 0, 0)
        elif isinstance(value, bool):
            value = 'true' if value else 'false'
        elif isinstance(value, (list, tuple)):
            value = u','.join([force_str(v) for v in value])
        elif isinstance(value, (six.integer_types, float)):
            # Numbers are left untouched.
            pass
        else:
            value = force_str(value)

        return value

    def _to_python(self, value):
        """Convert a value read back from Whoosh into a native Python value.

        Args:
            value: Stored value.

        Returns:
            ``True``/``False`` for ``'true'``/``'false'``, a ``datetime`` for
            strings matching ``DATETIME_REGEX``, the JSON-decoded value when
            it decodes to a list, dict or number, and ``value`` unchanged
            otherwise.
        """
        if value == 'true':
            return True
        elif value == 'false':
            return False

        if value and isinstance(value, six.string_types):
            possible_datetime = DATETIME_REGEX.search(value)

            if possible_datetime:
                date_values = possible_datetime.groupdict()

                for dk, dv in date_values.items():
                    date_values[dk] = int(dv)

                return datetime(date_values['year'], date_values['month'], date_values['day'],
                                date_values['hour'], date_values['minute'], date_values['second'])

        try:
            # The original type is unknown, so try decoding the value as JSON.
            converted_value = json.loads(value)

            if isinstance(converted_value, (list, tuple, set, dict, six.integer_types, float, complex)):
                return converted_value
        except Exception:
            # Not JSON (or not decodable): fall through and return the value
            # as-is. ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt/SystemExit are not swallowed.
            pass

        return value
class WhooshSearchQuery(BaseSearchQuery):
    """Builds Whoosh query strings from Haystack filters."""

    def _convert_datetime(self, date):
        """Format a date or datetime as a 14-digit ``YYYYMMDDHHMMSS`` string.

        Values without an ``hour`` attribute get ``000000`` as the time part.
        """
        pattern = '%Y%m%d%H%M%S' if hasattr(date, 'hour') else '%Y%m%d000000'
        return force_str(date.strftime(pattern))

    def clean(self, query_fragment):
        """Neutralise reserved words and characters in user input.

        Args:
            query_fragment: Raw query text.

        Returns:
            str: The whitespace-split words re-joined with single spaces,
            with reserved words lower-cased and any word containing a
            reserved character wrapped in single quotes.
        """
        cleaned = []

        for token in query_fragment.split():
            if token in self.backend.RESERVED_WORDS:
                token = token.lower()

            if any(reserved in token for reserved in self.backend.RESERVED_CHARACTERS):
                token = "'%s'" % token

            cleaned.append(token)

        return ' '.join(cleaned)

    def build_query_fragment(self, field, filter_type, value):
        """Build the query string fragment for one filter.

        Args:
            field: Field name; ``'content'`` means "no specific field".
            filter_type: Lookup type (``contains``, ``exact``, ``in``,
                ``range``, ``gt``, ...).
            value: Filter value, either a Haystack input type or a plain
                Python value.

        Returns:
            str: The fragment, prefixed with ``<index field name>:`` unless
            ``field`` is ``'content'``.
        """
        from haystack import connections

        fragment = ''
        value_is_datetime = False

        if not hasattr(value, 'input_type_name'):
            # Plain Python value: wrap it in an input type.
            if hasattr(value, 'strftime'):
                value_is_datetime = True

            if isinstance(value, six.string_types) and value != ' ':
                value = Clean(value)
            else:
                value = PythonData(value)

        prepared = value.prepare(self)

        if not isinstance(prepared, (set, list, tuple)):
            prepared = self.backend._from_python(prepared)

        if field == 'content':
            field_prefix = ''
        else:
            field_prefix = u'%s:' % connections[self._using].get_unified_index().get_index_fieldname(field)

        # Query template for each lookup type.
        templates = {
            'content': '%s',
            'contains': '*%s*',
            'endswith': "*%s",
            'startswith': "%s*",
            'exact': '%s',
            'gt': "{%s to}",
            'gte': "[%s to]",
            'lt': "{to %s}",
            'lte': "[to %s]",
            'fuzzy': u'%s~',
        }

        if value.post_process is False:
            fragment = prepared
        elif filter_type in ['content', 'contains', 'startswith', 'endswith', 'fuzzy']:
            if value.input_type_name == 'exact':
                fragment = prepared
            else:
                # One term per space-separated word, AND-ed together.
                if isinstance(prepared, six.string_types):
                    candidates = prepared.split(' ')
                else:
                    if value_is_datetime is True:
                        prepared = self._convert_datetime(prepared)

                    candidates = [prepared]

                terms = [templates[filter_type] % self.backend._from_python(candidate)
                         for candidate in candidates]

                if len(terms) == 1:
                    fragment = terms[0]
                else:
                    fragment = u"(%s)" % " AND ".join(terms)
        elif filter_type == 'in':
            options = []

            for candidate in prepared:
                candidate_is_datetime = hasattr(candidate, 'strftime')
                converted = self.backend._from_python(candidate)

                if candidate_is_datetime:
                    converted = self._convert_datetime(converted)

                # Strings are quoted; datetimes and other values are not.
                if isinstance(converted, six.string_types) and not candidate_is_datetime:
                    options.append('"%s"' % converted)
                else:
                    options.append('%s' % converted)

            fragment = "(%s)" % " OR ".join(options)
        elif filter_type == 'range':
            lower = self.backend._from_python(prepared[0])
            upper = self.backend._from_python(prepared[1])

            if hasattr(prepared[0], 'strftime'):
                lower = self._convert_datetime(lower)

            if hasattr(prepared[1], 'strftime'):
                upper = self._convert_datetime(upper)

            fragment = u"[%s to %s]" % (lower, upper)
        elif filter_type == 'exact':
            if value.input_type_name == 'exact':
                fragment = prepared
            else:
                prepared = Exact(prepared).prepare(self)
                fragment = templates[filter_type] % prepared
        else:
            if value_is_datetime is True:
                prepared = self._convert_datetime(prepared)

            fragment = templates[filter_type] % prepared

        # Wrap non-raw fragments in parentheses unless they already start or
        # end with one.
        if len(fragment) and not isinstance(value, Raw):
            if not (fragment.startswith('(') or fragment.endswith(')')):
                fragment = "(%s)" % fragment

        return u"%s%s" % (field_prefix, fragment)
class WhooshEngine(BaseEngine):
    """Haystack engine that pairs the Whoosh backend with its query class."""

    # Search backend class used by this engine.
    backend = WhooshSearchBackend
    # Query class used by this engine.
    query = WhooshSearchQuery

@ -0,0 +1,22 @@
"""
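WSGI config for djangoblog project.
WSGI (Web Server Gateway Interface) is Python's web server gateway interface and Django's primary deployment platform.
This file exposes the WSGI callable as a module-level variable named `application`.
For more information on this file, see
https://docs.djangoproject.com/en/1.10/howto/deployment/wsgi/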
"""
import os
from django.core.wsgi import get_wsgi_application
# Set the default Django settings module.
# The DJANGO_SETTINGS_MODULE environment variable tells Django which settings
# module to use; setdefault() leaves a value already present in the
# environment untouched.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoblog.settings")
# Module-level WSGI callable named `application`, obtained from
# get_wsgi_application(); the settings variable above is set before this call.
application = get_wsgi_application()
Loading…
Cancel
Save