sh_djangoblog APP注释

sh_branch
孙慧 3 months ago
parent 190075c0fd
commit a40a2b80c4

@ -0,0 +1,2 @@
"""sh:该配置类通常位于djangoblog/apps.py文件中可用于定义应用的名称、信号注册、启动时执行的操作等"""
# Dotted path of the AppConfig subclass Django should use for this package.
# NOTE(review): Django 3.2+ discovers a single AppConfig automatically and
# deprecates this variable - confirm against the project's Django version.
default_app_config = 'djangoblog.apps.DjangoblogAppConfig'

@ -0,0 +1,63 @@
from django.contrib.admin import AdminSite
from django.contrib.admin.models import LogEntry
from django.contrib.sites.admin import SiteAdmin
from django.contrib.sites.models import Site
from accounts.admin import UserAdmin, GroupAdmin
from blog.admin import PostAdmin, CategoryAdmin
from blog.models import Post, Category
from comments.admin import CommentAdmin
from comments.models import Comment
from djangoblog.logentryadmin import LogEntryAdmin
from oauth.admin import OAuthAppAdmin, OAuthUserAdmin
from oauth.models import OAuthApp, OAuthUser
from owntracks.admin import DeviceAdmin, LocationAdmin
from owntracks.models import Device, Location
from servermanager.admin import ServerAdmin, TaskAdmin
from servermanager.models import Server, Task
class DjangoBlogAdminSite(AdminSite):
    """Custom admin site for the blog, open to superusers only."""

    # Text rendered at the top of every admin page.
    site_header = 'djangoblog administration'
    # Text used for the browser tab title.
    site_title = 'djangoblog site admin'

    def __init__(self, name='admin'):
        """Create the site under the URL namespace ``name``."""
        super().__init__(name)

    def has_permission(self, request):
        """Return whether the requesting user is a superuser.

        Overrides the base check, so staff users that are not superusers
        are refused.
        """
        current_user = request.user
        return current_user.is_superuser
# Single admin site instance that every model below is registered on.
# NOTE(review): most names registered here (Article, ArticlelAdmin, Tag,
# TagAdmin, Links, LinksAdmin, SideBar, SideBarAdmin, BlogSettings,
# BlogSettingsAdmin, commands, CommandsAdmin, EmailSendLog, EmailSendLogAdmin,
# BlogUser, BlogUserAdmin, OAuthConfig, OAuthConfigAdmin, OwnTrackLog,
# OwnTrackLogsAdmin) do not appear in the explicit imports at the top of this
# file section, which would raise NameError at import time - verify the
# imports against the app modules.
admin_site = DjangoBlogAdminSite(name='admin')
# Blog core models.
"""注册博客核心模型及对应的Admin配置"""
admin_site.register(Article, ArticlelAdmin)
admin_site.register(Category, CategoryAdmin)
admin_site.register(Tag, TagAdmin)
admin_site.register(Links, LinksAdmin)
admin_site.register(SideBar, SideBarAdmin)
admin_site.register(BlogSettings, BlogSettingsAdmin)
# Server-management models.
"""注册服务器管理相关模型"""
admin_site.register(commands, CommandsAdmin)
admin_site.register(EmailSendLog, EmailSendLogAdmin)
# User model.
"""注册用户相关模型"""
admin_site.register(BlogUser, BlogUserAdmin)
# Comment model.
"""注册评论模型"""
admin_site.register(Comment, CommentAdmin)
# OAuth models.
"""注册OAuth相关模型"""
admin_site.register(OAuthUser, OAuthUserAdmin)
admin_site.register(OAuthConfig, OAuthConfigAdmin)
# OwnTracks location-tracking model.
"""注册OwnTracks位置追踪模型"""
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin)
# Django's Site model and the admin action log.
"""注册站点和操作日志模型"""
admin_site.register(Site, SiteAdmin)
admin_site.register(LogEntry, LogEntryAdmin)

@ -0,0 +1,22 @@
from django.apps import AppConfig
class DjangoblogAppConfig(AppConfig):
    """Application configuration for the ``djangoblog`` package.

    Its ``ready`` hook triggers plugin loading.
    """

    # Field type used for implicit primary keys.
    default_auto_field = 'django.db.models.BigAutoField'
    # Dotted path of the application package.
    name = 'djangoblog'

    def ready(self):
        """Run the base ``ready`` hook, then import and call the plugin loader."""
        super().ready()
        # Imported inside the method, as in the original, so the loader module
        # is only imported when ``ready`` runs.
        from .plugin_manage import loader
        loader.load_plugins()

@ -0,0 +1,142 @@
import _thread
import logging
import django.dispatch
from django.conf import settings
from django.contrib.admin.models import LogEntry
from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.core.mail import EmailMultiAlternatives
from django.db.models.signals import post_save
from django.dispatch import receiver
from comments.models import Comment
from comments.utils import send_comment_email
from djangoblog.spider_notify import SpiderNotify
from djangoblog.utils import cache, expire_view_cache, delete_sidebar_cache, delete_view_cache
from djangoblog.utils import get_current_site
from oauth.models import OAuthUser
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Custom signals; the list passed to each one names the keyword arguments
# that senders supply.
# NOTE(review): on Django >= 4.0 ``Signal`` no longer accepts
# ``providing_args`` and its first positional parameter is ``use_caching`` -
# confirm against the project's Django version.
oauth_user_login_signal = django.dispatch.Signal(['id'])
send_email_signal = django.dispatch.Signal(
    ['emailto', 'title', 'content'])
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
    """Send an HTML e-mail and store an ``EmailSendLog`` row with the outcome.

    Reads ``emailto`` (iterable of addresses), ``title`` and ``content`` from
    ``kwargs``. A failure while sending is logged and recorded as
    ``send_result = False``; it is not re-raised.
    """
    emailto = kwargs['emailto']
    subject = kwargs['title']
    html_body = kwargs['content']

    # Build the message; the body is sent as HTML.
    message = EmailMultiAlternatives(
        subject,
        html_body,
        from_email=settings.DEFAULT_FROM_EMAIL,
        to=emailto,
    )
    message.content_subtype = "html"

    # Prepare the log record before attempting delivery.
    from servermanager.models import EmailSendLog
    record = EmailSendLog()
    record.title = subject
    record.content = html_body
    record.emailto = ','.join(emailto)

    try:
        sent_count = message.send()
        record.send_result = sent_count > 0
    except Exception as e:
        logger.error(f"失败邮箱号: {emailto}, {e}")
        record.send_result = False
    record.save()
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
    """Store an OAuth user's remote avatar locally, then clear the sidebar cache.

    ``kwargs['id']`` is the primary key of the ``OAuthUser``. The picture is
    re-saved through ``save_user_avatar`` only when it is set and does not
    already contain the current site's domain.
    """
    oauth_user_id = kwargs['id']
    oauthuser = OAuthUser.objects.get(id=oauth_user_id)
    domain = get_current_site().domain
    picture = oauthuser.picture
    if picture and picture.find(domain) == -1:
        from djangoblog.utils import save_user_avatar
        oauthuser.picture = save_user_avatar(oauthuser.picture)
        oauthuser.save()
    delete_sidebar_cache()
def _notify_baidu_spider(instance, is_update_views):
    """Push the instance URL to Baidu unless testing or only ``views`` changed.

    :param instance: saved object exposing ``get_full_url()``
    :param is_update_views: True when the save only updated the ``views`` field
    :return: True when the caller should clear the cache (any save that is not
        a pure view-counter update)
    """
    if not settings.TESTING and not is_update_views:
        try:
            notify_url = instance.get_full_url()
            SpiderNotify.baidu_notify([notify_url])
        except Exception as ex:
            # Best effort: a failed notification must not break the save.
            logger.error("notify spider: %s", ex)
    return not is_update_views


def _handle_comment_cache(instance):
    """Invalidate caches affected by a comment and e-mail a notification.

    :param instance: the saved comment; its ``article`` is used to find the
        cached detail page and comment list
    """
    path = instance.article.get_absolute_url()
    site = get_current_site().domain
    # Drop a ":port" suffix so only the host name is used for the cache key.
    if site.find(':') > 0:
        site = site[:site.find(':')]
    expire_view_cache(
        path,
        servername=site,
        serverport=80,
        key_prefix='blogdetail'
    )
    if cache.get('seo_processor'):
        cache.delete('seo_processor')
    comment_cache_key = 'article_comments_{id}'.format(id=instance.article.id)
    cache.delete(comment_cache_key)
    delete_sidebar_cache()
    delete_view_cache(prefix='article_comments', keys=[str(instance.article.pk)])
    # ``_thread.start_new_thread`` accepts positional arguments only; the
    # previous ``args=(...)`` keyword form raised TypeError.
    _thread.start_new_thread(send_comment_email, (instance,))


@receiver(post_save)
def model_post_save_callback(
        instance,
        update_fields,
        **kwargs
):
    """``post_save`` receiver: notify the spider and invalidate caches.

    The ``@receiver`` decorator belongs on this function: it accepts
    ``**kwargs`` as Django requires of receivers, whereas the helper it was
    previously attached to does not, which left this callback unconnected.

    :param instance: the model instance that was saved
    :param update_fields: the ``update_fields`` value forwarded by ``save()``
    """
    # Admin log entries never affect cached pages.
    if isinstance(instance, LogEntry):
        return

    clearcache = False

    # Objects with a public URL are pushed to the search-engine spider.
    if hasattr(instance, 'get_full_url'):
        is_update_views = update_fields == {'views'}
        clearcache = _notify_baidu_spider(instance, is_update_views) or clearcache

    # Enabled comments invalidate the article's cached views.
    if isinstance(instance, Comment) and instance.is_enable:
        _handle_comment_cache(instance)
        clearcache = True

    if clearcache:
        cache.clear()
@receiver(user_logged_in)
@receiver(user_logged_out)
def user_auth_callback(sender, request, user, **kwargs):
    """On login or logout of a named user, log the user and clear the sidebar cache."""
    if not (user and user.username):
        return
    logger.info(user)
    delete_sidebar_cache()

@ -0,0 +1,209 @@
from django.utils.encoding import force_str
from elasticsearch_dsl import Q
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
from haystack.forms import ModelSearchForm
from haystack.models import SearchResult
from haystack.utils import log as logging
from blog.documents import ArticleDocument, ArticleDocumentManager
from blog.models import Article
# Module-level logger; ``logging`` here is the alias imported from
# ``haystack.utils`` above.
logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
    """Haystack search backend that stores and queries articles in Elasticsearch.

    Index maintenance is delegated to ``ArticleDocumentManager``; queries are
    issued through ``ArticleDocument.search()``.
    """

    def __init__(self, connection_alias, **connection_options):
        super().__init__(connection_alias, **connection_options)
        self.manager = ArticleDocumentManager()
        self.include_spelling = True

    def _get_models(self, iterable):
        """Convert model instances to documents.

        Falls back to every ``Article`` when ``iterable`` is empty or its
        first element is falsy.
        """
        if iterable and iterable[0]:
            source = iterable
        else:
            source = Article.objects.all()
        return self.manager.convert_to_doc(source)

    def _create(self, models):
        """Create the index and rebuild it from the given models."""
        self.manager.create_index()
        self.manager.rebuild(self._get_models(models))

    def _delete(self, models):
        """Call ``delete()`` on every item in ``models`` and return True."""
        for document in models:
            document.delete()
        return True

    def _rebuild(self, models):
        """Update the documents for ``models`` (all articles when falsy)."""
        source = models or Article.objects.all()
        self.manager.update_docs(self.manager.convert_to_doc(source))

    def update(self, index, iterable, commit=True):
        """Haystack hook: push the documents for ``iterable`` to the index."""
        documents = self._get_models(iterable)
        self.manager.update_docs(documents)

    def remove(self, obj_or_string):
        """Haystack hook: delete the document(s) for ``obj_or_string``."""
        documents = self._get_models([obj_or_string])
        self._delete(documents)

    def clear(self, models=None, commit=True):
        """Haystack hook: clear the index by removing every article document."""
        self.remove(None)

    @staticmethod
    def get_suggestion(query: str) -> str:
        """Return suggested terms for ``query``, keeping terms without a suggestion."""
        response = (
            ArticleDocument.search()
            .query("match", body=query)
            .suggest('suggest_search', query, term={'field': 'body'})
            .execute()
        )
        keywords = [
            entry["options"][0]["text"] if entry["options"] else entry["text"]
            for entry in response.suggest.suggest_search
        ]
        return ' '.join(keywords)

    @log_query
    def search(self, query_string, **kwargs):
        """Run the article search and wrap the hits for Haystack.

        :param query_string: text entered by the user
        :param kwargs: may carry ``start_offset`` / ``end_offset`` for slicing
        :return: dict with ``results``, ``hits``, ``facets`` and
            ``spelling_suggestion``
        """
        logger.info('search query_string:' + query_string)
        start_offset = kwargs.get('start_offset')
        end_offset = kwargs.get('end_offset')

        # Use the suggested terms only when the form enabled suggestions.
        if getattr(self, "is_suggest", None):
            suggestion = self.get_suggestion(query_string)
        else:
            suggestion = query_string

        text_match = Q(
            'bool',
            should=[Q('match', body=suggestion), Q('match', title=suggestion)],
            minimum_should_match="70%",
        )
        request = (
            ArticleDocument.search()
            .query('bool', filter=[text_match])
            .filter('term', status='p')
            .filter('term', type='a')
            .source(False)[start_offset: end_offset]
        )
        results = request.execute()
        hits = results['hits'].total

        raw_results = [
            SearchResult('blog', 'Article', hit['_id'], hit['_score'])
            for hit in results['hits']['hits']
        ]
        spelling_suggestion = None if query_string == suggestion else suggestion
        return {
            'results': raw_results,
            'hits': hits,
            'facets': {},
            'spelling_suggestion': spelling_suggestion,
        }
class ElasticSearchQuery(BaseSearchQuery):
    """Haystack query class used with ``ElasticSearchBackend``."""

    def _convert_datetime(self, date):
        """Format a date or datetime as ``YYYYmmddHHMMSS`` (time zeroed for dates)."""
        pattern = '%Y%m%d%H%M%S' if hasattr(date, 'hour') else '%Y%m%d000000'
        return force_str(date.strftime(pattern))

    def clean(self, query_fragment):
        """Lower-case reserved words and quote words containing reserved characters."""
        cleaned_words = []
        for token in query_fragment.split():
            if token in self.backend.RESERVED_WORDS:
                token = token.lower()
            if any(char in token for char in self.backend.RESERVED_CHARACTERS):
                token = "'%s'" % token
            cleaned_words.append(token)
        return ' '.join(cleaned_words)

    def build_query_fragment(self, field, filter_type, value):
        """Return the raw query string carried by ``value``."""
        return value.query_string

    def get_count(self):
        """Return the number of results, or 0 when there are none."""
        results = self.get_results()
        if not results:
            return 0
        return len(results)

    def get_spelling_suggestion(self, preferred_query=None):
        """Return the spelling suggestion stored on this query."""
        return self._spelling_suggestion

    def build_params(self, spelling_query=None):
        """Return the parent's search parameters unchanged."""
        return super().build_params(spelling_query=spelling_query)
class ElasticSearchModelSearchForm(ModelSearchForm):
    """Search form that toggles term suggestions on the backend."""

    def search(self):
        """Enable suggestions unless ``is_suggest`` is ``"no"``, then run the search."""
        suggest_enabled = self.data.get("is_suggest") != "no"
        self.searchqueryset.query.backend.is_suggest = suggest_enabled
        return super().search()
class ElasticSearchEngine(BaseEngine):
    """Haystack engine that pairs the custom backend and query classes.

    Referenced from settings as
    ``djangoblog.elasticsearch_backend.ElasticSearchEngine``.
    """
    backend = ElasticSearchBackend
    query = ElasticSearchQuery

@ -0,0 +1,55 @@
from django.contrib.auth import get_user_model
from django.contrib.syndication.views import Feed
from django.utils import timezone
from django.utils.feedgenerator import Rss201rev2Feed
from blog.models import Article
from djangoblog.utils import CommonMarkdown
class DjangoBlogFeed(Feed):
    """RSS 2.0 feed of the latest published articles."""

    feed_type = Rss201rev2Feed
    description = '大巧无工,重剑无锋.'
    title = "且听风吟 大巧无工,重剑无锋. "
    link = "/feed/"

    @staticmethod
    def _first_user():
        """Return the first user in the database, or None when there is none."""
        return get_user_model().objects.first()

    def author_name(self):
        """Return the first user's nickname, or None when no user exists."""
        user = self._first_user()
        # ``first()`` returns None on an empty table; avoid AttributeError.
        return user.nickname if user is not None else None

    def author_link(self):
        """Return the first user's profile URL, or None when no user exists."""
        user = self._first_user()
        return user.get_absolute_url() if user is not None else None

    def items(self):
        """Return the five most recent articles with type ``'a'`` and status ``'p'``."""
        return Article.objects.filter(type='a', status='p').order_by('-pub_time')[:5]

    def item_title(self, item):
        """Return the article title."""
        return item.title

    def item_description(self, item):
        """Return the article body rendered from Markdown to HTML."""
        return CommonMarkdown.get_markdown(item.body)

    def feed_copyright(self):
        """Return the copyright line containing the current year."""
        now = timezone.now()
        return "Copyright© {year} 且听风吟".format(year=now.year)

    def item_link(self, item):
        """Return the article's absolute URL."""
        return item.get_absolute_url()

    def item_guid(self, item):
        """Return no GUID for the item.

        NOTE(review): returning None looks like it suppresses the default
        link-based GUID - confirm this is intended.
        """
        return None

@ -0,0 +1,112 @@
from django.contrib import admin
from django.contrib.admin.models import DELETION
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse, NoReverseMatch
from django.utils.encoding import force_str
from django.utils.html import escape
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
class LogEntryAdmin(admin.ModelAdmin):
    """Read-only admin for ``LogEntry`` rows with links to the object and user."""

    list_filter = ['content_type']
    search_fields = ['object_repr', 'change_message']
    list_display_links = ['action_time', 'get_change_message']
    list_display = [
        'action_time',
        'user_link',
        'content_type',
        'object_link',
        'get_change_message',
    ]

    def has_add_permission(self, request):
        """Log entries can never be added by hand."""
        return False

    def has_change_permission(self, request, obj=None):
        """Allow viewing for superusers or holders of ``admin.change_logentry``.

        POST requests are always refused, so nothing can be modified.
        """
        user = request.user
        allowed = user.is_superuser or user.has_perm('admin.change_logentry')
        return allowed and request.method != 'POST'

    def has_delete_permission(self, request, obj=None):
        """Log entries can never be deleted by hand."""
        return False

    def object_link(self, obj):
        """Return the object's repr, linked to its admin change page when possible.

        Deleted objects, entries without a content type and objects whose
        admin URL cannot be reversed are shown as plain escaped text.
        """
        label = escape(obj.object_repr)
        content_type = obj.content_type
        if obj.action_flag == DELETION or content_type is None:
            return mark_safe(label)
        url_name = 'admin:{}_{}_change'.format(content_type.app_label,
                                               content_type.model)
        try:
            url = reverse(url_name, args=[obj.object_id])
        except NoReverseMatch:
            return mark_safe(label)
        return mark_safe('<a href="{}">{}</a>'.format(url, label))

    object_link.admin_order_field = 'object_repr'
    object_link.short_description = _('object')

    def user_link(self, obj):
        """Return the acting user's name, linked to its admin change page when possible."""
        content_type = ContentType.objects.get_for_model(type(obj.user))
        label = escape(force_str(obj.user))
        url_name = 'admin:{}_{}_change'.format(content_type.app_label,
                                               content_type.model)
        try:
            url = reverse(url_name, args=[obj.user.pk])
        except NoReverseMatch:
            return mark_safe(label)
        return mark_safe('<a href="{}">{}</a>'.format(url, label))

    user_link.admin_order_field = 'user'
    user_link.short_description = _('user')

    def get_queryset(self, request):
        """Return the base queryset with ``content_type`` prefetched."""
        base = super().get_queryset(request)
        return base.prefetch_related('content_type')

    def get_actions(self, request):
        """Return the available actions without ``delete_selected``."""
        actions = super().get_actions(request)
        actions.pop('delete_selected', None)
        return actions

@ -0,0 +1,42 @@
import logging
# Module-level logger used by BasePlugin.
logger = logging.getLogger(__name__)
class BasePlugin:
    """Base class for plugins; subclasses must define the three metadata fields."""

    # Plugin metadata, to be overridden by subclasses.
    PLUGIN_NAME = None
    PLUGIN_DESCRIPTION = None
    PLUGIN_VERSION = None

    def __init__(self):
        """Validate the metadata, then run ``init_plugin`` and ``register_hooks``.

        :raises ValueError: when any metadata field is missing or empty
        """
        metadata = (self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION)
        for field_value in metadata:
            if not field_value:
                raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
        self.init_plugin()
        self.register_hooks()

    def init_plugin(self):
        """Initialisation hook; the default only logs. Subclasses may override."""
        logger.info(f'{self.PLUGIN_NAME} initialized.')

    def register_hooks(self):
        """Hook-registration point; the default does nothing. Subclasses may override."""
        pass

    def get_plugin_info(self):
        """Return the plugin metadata.

        :return: dict with the keys ``name``, ``description`` and ``version``
        """
        return dict(
            name=self.PLUGIN_NAME,
            description=self.PLUGIN_DESCRIPTION,
            version=self.PLUGIN_VERSION,
        )

@ -0,0 +1,11 @@
# Event identifier: an article detail page is loaded.
ARTICLE_DETAIL_LOAD = 'article_detail_load'
# Event identifier: an article is created.
ARTICLE_CREATE = 'article_create'
# Event identifier: an article is updated.
ARTICLE_UPDATE = 'article_update'
# Event identifier: an article is deleted.
ARTICLE_DELETE = 'article_delete'
# Hook name for article content (presumably used when registering filter
# callbacks that process the article body - verify against callers).
ARTICLE_CONTENT_HOOK_NAME = "the_content"

@ -0,0 +1,45 @@
import logging
logger = logging.getLogger(__name__)
_hooks = {}
def register(hook_name: str, callback: callable):
"""
sh:
注册一个钩子回调
"""
if hook_name not in _hooks:
_hooks[hook_name] = []
_hooks[hook_name].append(callback)
logger.debug(f"Registered hook '{hook_name}' with callback '{callback.__name__}'")
def run_action(hook_name: str, *args, **kwargs):
"""
执行一个 Action Hook
它会按顺序执行所有注册到该钩子上的回调函数
"""
if hook_name in _hooks:
logger.debug(f"Running action hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
callback(*args, **kwargs)
except Exception as e:
logger.error(f"Error running action hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行一个 Filter Hook
它会把 value 依次传递给所有注册的回调函数进行处理
"""
if hook_name in _hooks:
logger.debug(f"Applying filter hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
value = callback(value, *args, **kwargs)
except Exception as e:
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
return value

@ -0,0 +1,27 @@
import os
import logging
from django.conf import settings
# Module-level logger used to report plugin loading results.
logger = logging.getLogger(__name__)
def load_plugins():
    """Import every plugin listed in ``settings.ACTIVE_PLUGINS``.

    A plugin is imported as ``plugins.<name>.plugin`` when
    ``settings.PLUGINS_DIR/<name>`` is a directory containing ``plugin.py``.
    Active plugins that are missing on disk are reported with a warning
    instead of being skipped silently. Import errors are logged and do not
    stop the remaining plugins from loading.
    """
    for plugin_name in settings.ACTIVE_PLUGINS:
        plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
        entry_point = os.path.join(plugin_path, 'plugin.py')
        if not (os.path.isdir(plugin_path) and os.path.exists(entry_point)):
            logger.warning(
                "Active plugin %s skipped: %s not found", plugin_name, entry_point)
            continue
        try:
            __import__(f'plugins.{plugin_name}.plugin')
        except ImportError as e:
            logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)
        else:
            logger.info(f"Successfully loaded plugin: {plugin_name}")

@ -0,0 +1,317 @@
import os
import sys
from pathlib import Path
from django.utils.translation import gettext_lazy as _
def env_to_bool(env, default):
    """Read environment variable ``env`` as a boolean.

    :param env: name of the environment variable
    :param default: value returned when the variable is not set
    :return: ``default`` when unset, otherwise True only for the exact
        string ``'True'``
    """
    raw = os.environ.get(env)
    if raw is None:
        return default
    return raw == 'True'
"""项目根目录(当前文件所在目录的父级目录)"""
# Parent of the directory that contains this settings file.
BASE_DIR = Path(__file__).resolve().parent.parent
# NOTE(review): falls back to a key committed in source when
# DJANGO_SECRET_KEY is unset - set the variable in any real deployment.
SECRET_KEY = os.environ.get(
    'DJANGO_SECRET_KEY') or 'n9ceqv38)#&mwuat@(mjb_p%em$e8$qyr#fw9ot!=ba6lijx-6'
# Debug mode defaults to True when DJANGO_DEBUG is unset.
DEBUG = env_to_bool('DJANGO_DEBUG', True)
# True when the first command-line argument is "test".
TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'
# NOTE(review): '*' accepts every Host header, making the other entries
# redundant - confirm this is intended.
ALLOWED_HOSTS = ['*', '127.0.0.1', 'example.com']
CSRF_TRUSTED_ORIGINS = ['http://example.com']
INSTALLED_APPS = [
    # Django's admin with the simplified config (SimpleAdminConfig).
    # This note must be a comment: a string literal placed here is implicitly
    # concatenated with the next entry and corrupts the app label.
    'django.contrib.admin.apps.SimpleAdminConfig',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'django.contrib.sites',
    'django.contrib.sitemaps',
    'mdeditor',
    'haystack',
    'blog',
    'accounts',
    'comments',
    'oauth',
    'servermanager',
    'owntracks',
    'compressor',
    'djangoblog',
]
"""中间件配置(请求/响应处理流水线)"""
# Middleware pipeline, listed in the order given to Django.
MIDDLEWARE = [
    'django.middleware.security.SecurityMiddleware',
    'django.contrib.sessions.middleware.SessionMiddleware',
    'django.middleware.locale.LocaleMiddleware',
    'django.middleware.gzip.GZipMiddleware',
    'django.middleware.common.CommonMiddleware',
    'django.middleware.csrf.CsrfViewMiddleware',
    'django.contrib.auth.middleware.AuthenticationMiddleware',
    'django.contrib.messages.middleware.MessageMiddleware',
    'django.middleware.clickjacking.XFrameOptionsMiddleware',
    'django.middleware.http.ConditionalGetMiddleware',
    'blog.middleware.OnlineMiddleware'
]
# Root URL configuration module.
ROOT_URLCONF = 'djangoblog.urls'
"""模板配置"""
# Django template engine: project-level "templates" directory plus
# per-app template directories.
TEMPLATES = [
    {
        'BACKEND': 'django.template.backends.django.DjangoTemplates',
        'DIRS': [os.path.join(BASE_DIR, 'templates')],
        'APP_DIRS': True,
        'OPTIONS': {
            'context_processors': [
                'django.template.context_processors.debug',
                'django.template.context_processors.request',
                'django.contrib.auth.context_processors.auth',
                'django.contrib.messages.context_processors.messages',
                'blog.context_processors.seo_processor'
            ],
        },
    },
]
# WSGI entry point.
WSGI_APPLICATION = 'djangoblog.wsgi.application'
"""数据库配置MySQL"""
# MySQL connection. Every credential can be overridden through the
# environment; the fallbacks keep the previously hard-coded values so
# existing setups continue to work.
# NOTE(review): the password fallback is a credential committed in source -
# set DJANGO_MYSQL_PASSWORD and rotate the value.
DATABASES = {
    'default': {
        'ENGINE': 'django.db.backends.mysql',
        'NAME': os.environ.get('DJANGO_MYSQL_DATABASE') or 'djangoblog',
        'USER': os.environ.get('DJANGO_MYSQL_USER') or 'root',
        'PASSWORD': os.environ.get('DJANGO_MYSQL_PASSWORD') or '2315304313',
        'HOST': os.environ.get('DJANGO_MYSQL_HOST') or '127.0.0.1',
        'PORT': int(os.environ.get('DJANGO_MYSQL_PORT') or 3306),
        'OPTIONS': {
            'charset': 'utf8mb4',
        },
    },
}
"""密码验证规则"""
# Password validation rules applied by django.contrib.auth.
AUTH_PASSWORD_VALIDATORS = [
    {
        'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
    },
    {
        'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
    },
]
# Languages offered by the site.
LANGUAGES = (
    ('en', _('English')),
    ('zh-hans', _('Simplified Chinese')),
    ('zh-hant', _('Traditional Chinese')),
)
# Directory holding the project's translation files.
LOCALE_PATHS = (
    os.path.join(BASE_DIR, 'locale'),
)
# Default language and time zone.
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
# Time-zone support is disabled.
USE_TZ = False
# Default search engine: Whoosh, with the index stored next to this file.
HAYSTACK_CONNECTIONS = {
    'default': {
        'ENGINE': 'djangoblog.whoosh_cn_backend.WhooshEngine',
        'PATH': os.path.join(os.path.dirname(__file__), 'whoosh_index'),
    },
}
# Signal processor that keeps the search index updated in real time.
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
# Authentication backend (by its name, accepts e-mail or username).
AUTHENTICATION_BACKENDS = [
    'accounts.user_login_backend.EmailOrUsernameModelBackend']
# Static files: collection target, URL prefix and source directory.
STATIC_ROOT = os.path.join(BASE_DIR, 'collectedstatic')
STATIC_URL = '/static/'
# NOTE(review): Django reads STATICFILES_DIRS, not STATICFILES - confirm
# whether project code uses this name directly.
STATICFILES = os.path.join(BASE_DIR, 'static')
# Custom user model and login URL.
AUTH_USER_MODEL = 'accounts.BlogUser'
LOGIN_URL = '/login/'
# strftime-style patterns.
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
DATE_TIME_FORMAT = '%Y-%m-%d'
# Bootstrap colour class names.
BOOTSTRAP_COLOR_TYPES = [
    'default', 'primary', 'success', 'info', 'warning', 'danger'
]
# Items per page.
PAGINATE_BY = 10
# Cache-Control max-age in seconds (30 days).
CACHE_CONTROL_MAX_AGE = 2592000
# Default cache: in-process local memory with a 3-hour timeout.
CACHES = {
    'default': {
        'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
        'TIMEOUT': 10800,
        'LOCATION': 'unique-snowflake',
    }
}
# Switch to Redis when DJANGO_REDIS_URL is set (host[:port], no scheme).
if os.environ.get("DJANGO_REDIS_URL"):
    CACHES = {
        'default': {
            'BACKEND': 'django.core.cache.backends.redis.RedisCache',
            'LOCATION': 'redis://{}'.format(os.environ.get("DJANGO_REDIS_URL")),
        }
    }
# Primary key of the current django.contrib.sites Site.
SITE_ID = 1
# Baidu push endpoint.
# NOTE(review): the fallback URL embeds a site and token committed in
# source - prefer setting DJANGO_BAIDU_NOTIFY_URL.
BAIDU_NOTIFY_URL = os.environ.get('DJANGO_BAIDU_NOTIFY_URL') \
                   or 'http://data.zz.baidu.com/urls?site=https://www.lylinux.net&token=1uAOGrMsUm5syDGn'
# Outgoing e-mail over SMTP; defaults are SSL on port 465.
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_USE_TLS = env_to_bool('DJANGO_EMAIL_TLS', False)
EMAIL_USE_SSL = env_to_bool('DJANGO_EMAIL_SSL', True)
EMAIL_HOST = os.environ.get('DJANGO_EMAIL_HOST') or 'smtp.mxhichina.com'
EMAIL_PORT = int(os.environ.get('DJANGO_EMAIL_PORT') or 465)
# None when the corresponding environment variables are unset.
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER')
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD')
# Sender addresses reuse the SMTP account.
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
# Recipients of error e-mails.
ADMINS = [('admin', os.environ.get('DJANGO_ADMIN_EMAIL') or 'admin@admin.com')]
# NOTE(review): the fallback looks like a credential hash committed in
# source - prefer setting DJANGO_WXADMIN_PASSWORD.
WXADMIN = os.environ.get(
    'DJANGO_WXADMIN_PASSWORD') or '995F03AC401D6CABABAEF756FC4D43C7'
# Directory for log files; created at import time when missing.
LOG_PATH = os.path.join(BASE_DIR, 'logs')
if not os.path.exists(LOG_PATH):
    os.makedirs(LOG_PATH, exist_ok=True)
# Logging: INFO and above go to a daily-rotated file (5 backups) and, while
# DEBUG is on, to the console; request errors are mailed to the admins when
# DEBUG is off.
LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'root': {
        'level': 'INFO',
        'handlers': ['console', 'log_file'],
    },
    'formatters': {
        'verbose': {
            'format': '[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d %(module)s] %(message)s',
        }
    },
    'filters': {
        'require_debug_false': {
            '()': 'django.utils.log.RequireDebugFalse',
        },
        'require_debug_true': {
            '()': 'django.utils.log.RequireDebugTrue',
        },
    },
    'handlers': {
        'log_file': {
            'level': 'INFO',
            'class': 'logging.handlers.TimedRotatingFileHandler',
            'filename': os.path.join(LOG_PATH, 'djangoblog.log'),
            'when': 'D',
            'formatter': 'verbose',
            'interval': 1,
            'delay': True,
            'backupCount': 5,
            'encoding': 'utf-8'
        },
        'console': {
            'level': 'DEBUG',
            'filters': ['require_debug_true'],
            'class': 'logging.StreamHandler',
            'formatter': 'verbose'
        },
        'null': {
            'class': 'logging.NullHandler',
        },
        'mail_admins': {
            'level': 'ERROR',
            'filters': ['require_debug_false'],
            'class': 'django.utils.log.AdminEmailHandler'
        }
    },
    'loggers': {
        'djangoblog': {
            'handlers': ['log_file', 'console'],
            'level': 'INFO',
            'propagate': True,
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        }
    }
}
# Static file finders, including django-compressor's.
STATICFILES_FINDERS = (
    'django.contrib.staticfiles.finders.FileSystemFinder',
    'django.contrib.staticfiles.finders.AppDirectoriesFinder',
    # other
    'compressor.finders.CompressorFinder',
)
# django-compressor: enabled, with CSS and JS minification filters.
COMPRESS_ENABLED = True
COMPRESS_CSS_FILTERS = [
    'compressor.filters.css_default.CssAbsoluteFilter',
    'compressor.filters.cssmin.CSSMinFilter'
]
COMPRESS_JS_FILTERS = [
    'compressor.filters.jsmin.JSMinFilter'
]
# Uploaded media: storage directory and URL prefix.
MEDIA_ROOT = os.path.join(BASE_DIR, 'uploads')
MEDIA_URL = '/media/'
# Allow framing by pages of the same origin only.
X_FRAME_OPTIONS = 'SAMEORIGIN'
# Field type used for implicit primary keys.
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
# When DJANGO_ELASTICSEARCH_HOST is set, use Elasticsearch instead of the
# Whoosh search engine configured earlier.
if os.environ.get('DJANGO_ELASTICSEARCH_HOST'):
    ELASTICSEARCH_DSL = {
        'default': {
            'hosts': os.environ.get('DJANGO_ELASTICSEARCH_HOST')
        },
    }
    HAYSTACK_CONNECTIONS = {
        'default': {
            'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine',
        },
    }
# Plugin system: directory holding the plugins and the names to load.
PLUGINS_DIR = BASE_DIR / 'plugins'
ACTIVE_PLUGINS = [
    'article_copyright',
    'reading_time',
    'external_links',
    'view_count',
    'seo_optimizer'
]

@ -0,0 +1,84 @@
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
from blog.models import Article, Category, Tag
class StaticViewSitemap(Sitemap):
    """Sitemap section for pages addressed by URL name rather than by model."""

    priority = 0.5
    changefreq = 'daily'

    def items(self):
        """Return the URL names to include in the sitemap."""
        url_names = ['blog:index']
        return url_names

    def location(self, item):
        """Resolve a URL name to its path."""
        return reverse(item)
class ArticleSiteMap(Sitemap):
    """Sitemap section listing articles whose status is ``'p'``."""

    changefreq = "monthly"
    priority = "0.6"

    def items(self):
        """Return the articles with status ``'p'``."""
        published = Article.objects.filter(status='p')
        return published

    def lastmod(self, obj):
        """Return the article's last modification time."""
        return obj.last_modify_time
class CategorySiteMap(Sitemap):
    """Sitemap section listing every category page."""

    # The sitemap protocol defines lowercase change frequencies only;
    # the previous value "Weekly" was not one of them.
    changefreq = "weekly"
    priority = "0.6"

    def items(self):
        """Return all categories."""
        return Category.objects.all()

    def lastmod(self, obj):
        """Return the category's last modification time."""
        return obj.last_modify_time
class TagSiteMap(Sitemap):
    """Sitemap section listing every tag page."""

    # The sitemap protocol defines lowercase change frequencies only;
    # the previous value "Weekly" was not one of them.
    changefreq = "weekly"
    priority = "0.3"

    def items(self):
        """Return all tags."""
        return Tag.objects.all()

    def lastmod(self, obj):
        """Return the tag's last modification time."""
        return obj.last_modify_time
class UserSiteMap(Sitemap):
    """Sitemap section listing the authors that have at least one article."""

    # The sitemap protocol defines lowercase change frequencies only;
    # the previous value "Weekly" was not one of them.
    changefreq = "weekly"
    priority = "0.3"

    def items(self):
        """Return the distinct authors of all articles, in no particular order."""
        return list({x.author for x in Article.objects.all()})

    def lastmod(self, obj):
        """Return the author's ``date_joined`` as the page's modification time."""
        return obj.date_joined

@ -0,0 +1,35 @@
import logging
import requests
from django.conf import settings
# Module-level logger used to record push responses and failures.
logger = logging.getLogger(__name__)
class SpiderNotify():
    """Push site URLs to search engines (currently Baidu only)."""

    # Seconds to wait for the push endpoint before giving up, so a slow or
    # unreachable endpoint cannot block the caller indefinitely.
    REQUEST_TIMEOUT = 10

    @staticmethod
    def baidu_notify(urls):
        """POST ``urls`` to ``settings.BAIDU_NOTIFY_URL``, one URL per line.

        :param urls: a list of URLs, or a single URL string
        :return: None; the response text is logged, and any failure
            (including a timeout) is logged rather than raised
        """
        # A bare string would otherwise be joined character by character.
        if isinstance(urls, str):
            urls = [urls]
        try:
            data = '\n'.join(urls)
            result = requests.post(
                settings.BAIDU_NOTIFY_URL,
                data=data,
                timeout=SpiderNotify.REQUEST_TIMEOUT)
            logger.info(result.text)
        except Exception as e:
            logger.error(e)

    @staticmethod
    def notify(url):
        """Push a single URL or a list of URLs to every supported engine.

        :param url: one URL string or a list of URL strings
        """
        SpiderNotify.baidu_notify(url)

@ -0,0 +1,41 @@
from django.test import TestCase
from djangoblog.utils import get_sha256, CommonMarkdown
class DjangoBlogTest(TestCase):
    """Unit tests for the helpers in ``djangoblog.utils``."""

    def setUp(self):
        """No fixtures are needed.

        This must not raise: an exception in ``setUp`` makes every test in
        the class error before it runs.
        """
        pass

    def test_utils(self):
        """Smoke-test hashing, Markdown rendering and query-string building."""
        # Imported here because the module-level import of this test file
        # only brings in get_sha256 and CommonMarkdown.
        from djangoblog.utils import parse_dict_to_url

        digest = get_sha256('test')
        self.assertIsNotNone(digest)
        c = CommonMarkdown.get_markdown('''
# Title1
```python
import os
```
[url](https://www.lylinux.net/)
[ddd](http://www.baidu.com)
''')
        self.assertIsNotNone(c)
        d = {
            'd': 'key1',
            'd2': 'key2'
        }
        data = parse_dict_to_url(d)
        self.assertIsNotNone(data)

@ -0,0 +1,53 @@
from django.conf import settings
from django.conf.urls.i18n import i18n_patterns
from django.conf.urls.static import static
from django.contrib.sitemaps.views import sitemap
from django.urls import path, include
from django.urls import re_path
from haystack.views import search_view_factory
from blog.views import EsSearchView
from djangoblog.admin_site import admin_site
from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm
from djangoblog.feeds import DjangoBlogFeed
from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
"""sh:站点地图集合:关联各类型页面的站点地图类"""
# Sitemap sections passed to the sitemap view, keyed by section name.
sitemaps = {
    'blog': ArticleSiteMap,
    'Category': CategorySiteMap,
    'Tag': TagSiteMap,
    'User': UserSiteMap,
    'static': StaticViewSitemap
}
"""自定义错误页面视图覆盖Django默认错误页"""
# Custom error views. Django looks these up in the root URLconf by the exact
# names handler404 / handler500 / handler403.
handler404 = 'blog.views.page_not_found_view'
handler500 = 'blog.views.server_error_view'
handler403 = 'blog.views.permission_denied_view'
# Backward-compatible alias for the previously misspelled name, which Django
# never read.
handle403 = handler403
"""基础URL配置无语言前缀"""
# Routes served without a language prefix.
urlpatterns = [
    path('i18n/', include('django.conf.urls.i18n')),
]
# Routes wrapped in i18n_patterns; prefix_default_language=False leaves the
# default language without a prefix. Static file routes are appended after
# the i18n block.
"""多语言URL配置自动添加语言前缀如/en/、/zh-hans/"""
urlpatterns += i18n_patterns(
    re_path(r'^admin/', admin_site.urls),
    re_path(r'', include('blog.urls', namespace='blog')),
    re_path(r'mdeditor/', include('mdeditor.urls')),
    re_path(r'', include('comments.urls', namespace='comment')),
    re_path(r'', include('accounts.urls', namespace='account')),
    re_path(r'', include('oauth.urls', namespace='oauth')),
    re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps},
            name='django.contrib.sitemaps.views.sitemap'),
    re_path(r'^feed/$', DjangoBlogFeed()),
    re_path(r'^rss/$', DjangoBlogFeed()),
    re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
            name='search'),
    re_path(r'', include('servermanager.urls', namespace='servermanager')),
    re_path(r'', include('owntracks.urls', namespace='owntracks'))
    , prefix_default_language=False) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
# Uploaded media is served by Django only while DEBUG is on.
if settings.DEBUG:
    urlpatterns += static(settings.MEDIA_URL,
                          document_root=settings.MEDIA_ROOT)

@ -0,0 +1,271 @@
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
# Module-level logger shared by the helpers below.
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
    """Return the primary keys of the latest article and the latest comment.

    :return: tuple ``(article_pk, comment_pk)`` taken from each model's
        ``latest()`` row
    """
    from blog.models import Article
    from comments.models import Comment
    latest_article_pk = Article.objects.latest().pk
    latest_comment_pk = Comment.objects.latest().pk
    return (latest_article_pk, latest_comment_pk)
def get_sha256(str):
    """Return the SHA-256 hex digest of a string.

    :param str: text to hash; it is encoded as UTF-8 first
    :return: 64-character lowercase hexadecimal digest
    """
    encoded = str.encode('utf-8')
    return sha256(encoded).hexdigest()
def cache_decorator(expiration=3 * 60):
    """Build a decorator that caches a function's return value.

    The cache key comes from ``args[0].get_cache_key()`` when that call
    succeeds and returns a truthy value; otherwise it is the SHA-256 of
    ``repr((func, args, kwargs))``. A ``None`` result is stored as a
    placeholder string so it can be told apart from a cache miss.

    :param expiration: cache lifetime in seconds (default 3 minutes)
    :return: the decorator
    """
    def wrapper(func):
        def news(*args, **kwargs):
            try:
                key = args[0].get_cache_key()
            except Exception:
                key = None
            if not key:
                fingerprint = repr((func, args, kwargs))
                key = sha256(fingerprint.encode('utf-8')).hexdigest()

            cached = cache.get(key)
            if cached is None:
                # Cache miss: compute the value and store it.
                logger.debug(
                    'cache_decorator set cache:%s key:%s' %
                    (func.__name__, key))
                result = func(*args, **kwargs)
                stored = '__default_cache_value__' if result is None else result
                cache.set(key, stored, expiration)
                return result

            # Cache hit: translate the placeholder back to None.
            if str(cached) == '__default_cache_value__':
                return None
            return cached
        return news
    return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
    """Delete the cached response of a view.

    :param path: URL path of the view
    :param servername: host name placed in the synthetic request
    :param serverport: port placed in the synthetic request
    :param key_prefix: cache key prefix used when the view was cached
    :return: True when a cache key could be computed for the request
        (whether or not an entry was deleted), otherwise False
    """
    from django.http import HttpRequest
    from django.utils.cache import get_cache_key

    # Build a minimal request so Django can derive the same cache key.
    fake_request = HttpRequest()
    fake_request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
    fake_request.path = path

    key = get_cache_key(fake_request, key_prefix=key_prefix, cache=cache)
    if not key:
        return False
    logger.info('expire_view_cache:get key:{path}'.format(path=path))
    if cache.get(key):
        cache.delete(key)
    return True
@cache_decorator()
def get_current_site():
    """Return the current ``Site`` object (result goes through cache_decorator)."""
    return Site.objects.get_current()
class CommonMarkdown:
    """Helpers that render Markdown text to HTML, optionally with a table of contents."""

    @staticmethod
    def _convert_markdown(value):
        """Render Markdown text with the ``markdown`` library.

        :param value: Markdown source text
        :return: ``(html_body, toc_html)``
        """
        renderer = markdown.Markdown(
            extensions=['extra', 'codehilite', 'toc', 'tables'])
        html_body = renderer.convert(value)
        # The toc attribute is read after convert() has been called.
        return html_body, renderer.toc

    @staticmethod
    def get_markdown_with_toc(value):
        """Return ``(html_body, toc_html)`` for the given Markdown text."""
        html_body, toc_html = CommonMarkdown._convert_markdown(value)
        return html_body, toc_html

    @staticmethod
    def get_markdown(value):
        """Return only the rendered HTML body for the given Markdown text."""
        return CommonMarkdown._convert_markdown(value)[0]
def send_email(emailto, title, content):
    """Request an e-mail by firing ``send_email_signal``.

    The function itself does not send anything; it only dispatches the signal
    with the message data as keyword arguments.

    :param emailto: recipients
    :param title: subject line
    :param content: message body
    """
    from djangoblog.blog_signals import send_email_signal

    message = {'emailto': emailto, 'title': title, 'content': content}
    send_email_signal.send(send_email.__class__, **message)
def generate_code() -> str:
    """Return a random 6-digit numeric verification code.

    Each digit is drawn independently with ``secrets.choice`` so digits may
    repeat and the code comes from a cryptographically strong source
    (``random.sample`` could never produce a repeated digit).
    """
    import secrets

    return ''.join(secrets.choice(string.digits) for _ in range(6))
def parse_dict_to_url(dict):
    """Serialize a mapping into a URL query string.

    Keys and values are percent-encoded with ``/`` left unescaped.  Values that
    are neither ``str`` nor ``bytes`` (for example integers) are converted with
    ``str()`` first, because ``urllib.parse.quote`` rejects them.

    :param dict: mapping of parameters, e.g. ``{'name': 'test', 'age': 18}``
    :return: encoded string, e.g. ``'name=test&age=18'``
    """
    from urllib.parse import quote

    def _encode(item):
        # quote() accepts only str/bytes; stringify everything else.
        text = item if isinstance(item, (str, bytes)) else str(item)
        return quote(text, safe='/')

    return '&'.join(
        '{}={}'.format(_encode(k), _encode(v)) for k, v in dict.items())
def get_blog_setting():
    """Return the blog settings object, using the cache when possible.

    On a cache miss the first ``BlogSettings`` row is loaded; when the table is
    empty a row with default values is created first.  The loaded object is
    stored in the cache under ``'get_blog_setting'``.
    """
    cached = cache.get('get_blog_setting')
    if cached:
        return cached

    from blog.models import BlogSettings

    if not BlogSettings.objects.count():
        # No settings row yet: create one populated with defaults.
        defaults = {
            'site_name': 'djangoblog',
            'site_description': '基于Django的博客系统',
            'site_seo_description': '基于Django的博客系统',
            'site_keywords': 'Django,Python',
            'article_sub_length': 300,
            'sidebar_article_count': 10,
            'sidebar_comment_count': 5,
            'show_google_adsense': False,
            'open_site_comment': True,
            'analytics_code': '',
            'beian_code': '',
            'show_gongan_code': False,
            'comment_need_review': False,
        }
        setting = BlogSettings()
        for attr_name, attr_value in defaults.items():
            setattr(setting, attr_name, attr_value)
        setting.save()

    value = BlogSettings.objects.first()
    logger.info('set cache get_blog_setting')
    cache.set('get_blog_setting', value)
    return value
def save_user_avatar(url):
    """Download a user's avatar and store it under the static ``avatar`` directory.

    :param url: URL of the avatar image
    :return: static URL of the saved file, or the static URL of the default
        avatar when the download fails or the server does not answer 200
    """
    logger.info(url)
    default_avatar = 'blog/img/avatar.png'

    try:
        basedir = os.path.join(settings.STATICFILES, 'avatar')
        rsp = requests.get(url, timeout=2)
        if rsp.status_code != 200:
            # Previously this path fell through and returned None.
            logger.warning(
                'save_user_avatar: unexpected status %s for %s',
                rsp.status_code, url)
            return static(default_avatar)

        os.makedirs(basedir, exist_ok=True)

        # Keep the URL's extension only for known image types.
        image_extensions = ('.jpg', '.png', '.jpeg', '.gif')
        ext = os.path.splitext(url)[1] if url.endswith(image_extensions) else '.jpg'
        save_filename = uuid.uuid4().hex + ext
        save_path = os.path.join(basedir, save_filename)
        logger.info('保存用户头像:' + save_path)

        with open(save_path, 'wb') as file:
            file.write(rsp.content)
        return static('avatar/' + save_filename)
    except Exception as e:
        # Best effort: any failure falls back to the default avatar.
        logger.error(e)
        return static(default_avatar)
def delete_sidebar_cache():
    """Delete the cache entry ``"sidebar" + value`` for every ``LinkShowType`` value."""
    from blog.models import LinkShowType

    sidebar_keys = tuple("sidebar" + value for value in LinkShowType.values)
    for cache_key in sidebar_keys:
        logger.info('delete sidebar key:' + cache_key)
        cache.delete(cache_key)
def delete_view_cache(prefix, keys):
    """Delete a template-fragment cache entry.

    :param prefix: fragment name passed to ``make_template_fragment_key``
    :param keys: vary-on values passed to ``make_template_fragment_key``
    """
    from django.core.cache.utils import make_template_fragment_key

    cache.delete(make_template_fragment_key(prefix, keys))
def get_resource_url():
    """Return the base URL for static resources.

    ``settings.STATIC_URL`` wins when it is set; otherwise the URL is built
    from the current site's domain.
    """
    configured = settings.STATIC_URL
    if configured:
        return configured
    return 'http://' + get_current_site().domain + '/static/'
# Tags that survive sanitize_html().
ALLOWED_TAGS = [
    'a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol',
    'pre', 'strong', 'ul', 'h1', 'h2', 'p',
]

# Attributes that survive sanitize_html(), keyed by tag name.
ALLOWED_ATTRIBUTES = {
    'a': ['href', 'title'],
    'abbr': ['title'],
    'acronym': ['title'],
}


def sanitize_html(html):
    """Clean ``html`` with bleach using ALLOWED_TAGS and ALLOWED_ATTRIBUTES."""
    return bleach.clean(
        html,
        tags=ALLOWED_TAGS,
        attributes=ALLOWED_ATTRIBUTES)

@ -0,0 +1,944 @@
# encoding: utf-8
from __future__ import absolute_import, division, print_function, unicode_literals
import json
import os
import re
import shutil
import threading
import warnings
import six
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from datetime import datetime
from django.utils.encoding import force_str
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, EmptyResults, log_query
from haystack.constants import DJANGO_CT, DJANGO_ID, ID
from haystack.exceptions import MissingDependency, SearchBackendError, SkipDocument
from haystack.inputs import Clean, Exact, PythonData, Raw
from haystack.models import SearchResult
from haystack.utils import get_identifier, get_model_ct
from haystack.utils import log as logging
from haystack.utils.app_loading import haystack_get_model
from jieba.analyse import ChineseAnalyzer
from whoosh import index
from whoosh.analysis import StemmingAnalyzer
from whoosh.fields import BOOLEAN, DATETIME, IDLIST, KEYWORD, NGRAM, NGRAMWORDS, NUMERIC, Schema, TEXT
from whoosh.fields import ID as WHOOSH_ID
from whoosh.filedb.filestore import FileStorage, RamStorage
from whoosh.highlight import ContextFragmenter, HtmlFormatter
from whoosh.highlight import highlight as whoosh_highlight
from whoosh.qparser import QueryParser
from whoosh.searching import ResultsPage
from whoosh.writing import AsyncWriter
try:
import whoosh
except ImportError:
raise MissingDependency(
"The 'whoosh' backend requires the installation of 'Whoosh'. Please refer to the documentation.")
if not hasattr(whoosh, '__version__') or whoosh.__version__ < (2, 5, 0):
raise MissingDependency(
"The 'whoosh' backend requires version 2.5.0 or greater.")
"""日期时间格式正则用于解析Whoosh返回的日期字符串"""
# Raw string: the pattern contains regex escapes such as \d that are not valid
# Python string escapes.
DATETIME_REGEX = re.compile(
    r'^(?P<year>\d{4})-(?P<month>\d{2})-(?P<day>\d{2})T(?P<hour>\d{2}):(?P<minute>\d{2}):(?P<second>\d{2})(\.\d{3,6}Z?)?$')

# Thread-local holder for the in-memory (RAM) index storage.
LOCALS = threading.local()
LOCALS.RAM_STORE = None
class WhooshHtmlFormatter(HtmlFormatter):
    """HTML highlight formatter with a simplified template.

    The template wraps each highlighted term in a bare tag without any
    attributes.  NOTE(review): the original comment states the purpose is to
    keep highlight output consistent with the other search backends.
    """

    template = '<%(tag)s>%(t)s</%(tag)s>'
class WhooshSearchBackend(BaseSearchBackend):
    """Haystack search backend that stores and queries documents with Whoosh."""

    # Words removed from spelling queries and lower-cased by the query cleaner.
    RESERVED_WORDS = ('AND', 'NOT', 'OR', 'TO')

    # Characters removed from spelling queries; words containing any of them
    # are quoted by the query cleaner.
    RESERVED_CHARACTERS = (
        '\\', '+', '-', '&&', '||', '!', '(', ')', '{', '}',
        '[', ']', '^', '"', '~', '*', '?', ':', '.',
    )
def __init__(self, connection_alias, **connection_options):
    """Read the connection options; the index itself is opened later by setup().

    :param connection_alias: name of the Haystack connection
    :param connection_options: connection settings; ``PATH``, ``STORAGE`` and
        ``POST_LIMIT`` are read here
    :raises ImproperlyConfigured: file storage is selected but no ``PATH`` is set
    """
    super(WhooshSearchBackend, self).__init__(
        connection_alias, **connection_options)
    self.setup_complete = False
    self.use_file_storage = True
    # connection_options is a dict, so the option has to be read with .get();
    # getattr() on a dict never finds the key and always returned the default.
    self.post_limit = connection_options.get('POST_LIMIT', 128 * 1024 * 1024)
    self.path = connection_options.get('PATH')

    # Any STORAGE value other than 'file' selects the in-memory store.
    if connection_options.get('STORAGE', 'file') != 'file':
        self.use_file_storage = False

    # File storage needs a directory to write to.
    if self.use_file_storage and not self.path:
        raise ImproperlyConfigured(
            "You must specify a 'PATH' in your settings for connection '%s'." %
            connection_alias)

    self.log = logging.getLogger('haystack')
def setup(self):
    """Create or open the index storage, schema, parser and index.

    Called by the other methods when ``setup_complete`` is still false.

    :raises IOError: the index directory is not writable
    """
    from haystack import connections

    directory_created = False
    if self.use_file_storage:
        if not os.path.exists(self.path):
            os.makedirs(self.path)
            directory_created = True
        if not os.access(self.path, os.W_OK):
            raise IOError(
                "The path to your Whoosh index '%s' is not writable for the current user/group." %
                self.path)
        self.storage = FileStorage(self.path)
    else:
        # Reuse the thread-local RAM store, creating it on first use.
        if getattr(LOCALS, 'RAM_STORE', None) is None:
            LOCALS.RAM_STORE = RamStorage()
        self.storage = LOCALS.RAM_STORE

    unified_index = connections[self.connection_alias].get_unified_index()
    self.content_field_name, self.schema = self.build_schema(
        unified_index.all_searchfields())
    self.parser = QueryParser(self.content_field_name, schema=self.schema)

    if directory_created:
        # A directory that was just created cannot contain an index yet.
        self.index = self.storage.create_index(self.schema)
    else:
        try:
            self.index = self.storage.open_index(schema=self.schema)
        except index.EmptyIndexError:
            self.index = self.storage.create_index(self.schema)

    self.setup_complete = True
def _create_field(self, field_class):
    """Return the Whoosh schema field that matches a Haystack field.

    :param field_class: Haystack field; ``is_multivalued``, ``indexed``,
        ``field_type``, ``stored`` and ``boost`` are consulted
    :return: a Whoosh field instance
    """
    if field_class.is_multivalued:
        if field_class.indexed is False:
            return IDLIST(stored=True, field_boost=field_class.boost)
        return KEYWORD(
            stored=True, commas=True, scorable=True,
            field_boost=field_class.boost)

    kind = field_class.field_type
    if kind in ('date', 'datetime'):
        return DATETIME(stored=field_class.stored, sortable=True)
    if kind == 'integer':
        return NUMERIC(
            stored=field_class.stored, numtype=int,
            field_boost=field_class.boost)
    if kind == 'float':
        return NUMERIC(
            stored=field_class.stored, numtype=float,
            field_boost=field_class.boost)
    if kind == 'boolean':
        return BOOLEAN(stored=field_class.stored)
    if kind == 'ngram':
        return NGRAM(
            minsize=3, maxsize=15, stored=field_class.stored,
            field_boost=field_class.boost)
    if kind == 'edge_ngram':
        return NGRAMWORDS(
            minsize=2, maxsize=15, at='start', stored=field_class.stored,
            field_boost=field_class.boost)

    # Everything else is indexed as text analyzed with jieba's ChineseAnalyzer.
    return TEXT(
        stored=True, analyzer=ChineseAnalyzer(),
        field_boost=field_class.boost, sortable=True)
def build_schema(self, fields):
    """Build the Whoosh schema for the given Haystack fields.

    :param fields: mapping of field name to Haystack field object
    :return: ``(content_field_name, Schema)``; the content field name is the
        index field name of the field flagged ``document=True``
    :raises SearchBackendError: ``fields`` contributes no schema fields
    """
    # Fields every document carries regardless of the search indexes.
    schema_fields = {
        ID: WHOOSH_ID(stored=True, unique=True),
        DJANGO_CT: WHOOSH_ID(stored=True),
        DJANGO_ID: WHOOSH_ID(stored=True),
    }
    initial_key_count = len(schema_fields)
    content_field_name = ''

    for field_name, field_class in fields.items():
        # _create_field is a method, so it must be called through self.
        field = self._create_field(field_class)
        schema_fields[field_class.index_fieldname] = field
        if field_class.document is True:
            content_field_name = field_class.index_fieldname
            field.spelling = True

    if len(schema_fields) <= initial_key_count:
        raise SearchBackendError(
            "No fields were found in any search_indexes. Please correct this before attempting to search."
        )

    return (content_field_name, Schema(**schema_fields))
def _process_doc(self, doc):
"""处理文档字段的编码和boost字段清理"""
for key in doc:
doc[key] = self._from_python(doc[key])
if 'boost' in doc:
del doc['boost']
return doc
def _handle_update_error(self, e, obj, index):
    """Re-raise or log an error that occurred while updating a document.

    Intended to be called from inside an ``except`` block: when
    ``silently_fail`` is false the exception currently being handled is
    re-raised with a bare ``raise``; otherwise the failure is logged.

    :param e: the caught exception
    :param obj: the object that was being indexed
    :param index: the search index used to prepare the object
    """
    if not self.silently_fail:
        raise

    log_extra = {
        "data": {
            "index": index,
            "object": get_identifier(obj),
        },
    }
    self.log.error(
        u"%s while preparing object for update" % e.__class__.__name__,
        exc_info=True,
        extra=log_extra)
def update(self, index, iterable, commit=True):
    """Write the objects in ``iterable`` to the Whoosh index.

    :param index: search index whose ``full_prepare`` builds each document
    :param iterable: objects to index; must support ``len()``
    :param commit: accepted for interface compatibility; this method does not
        consult it and commits whenever ``iterable`` is non-empty
    """
    if not self.setup_complete:
        self.setup()

    self.index = self.index.refresh()
    writer = AsyncWriter(self.index)

    for obj in iterable:
        try:
            prepared = index.full_prepare(obj)
        except SkipDocument:
            self.log.debug(u"Indexing for object `%s` skipped", obj)
        else:
            document = self._process_doc(prepared)
            try:
                writer.update_document(**document)
            except Exception as e:
                self._handle_update_error(e, obj, index)

    if len(iterable) > 0:
        writer.commit()
def remove(self, obj_or_string, commit=True):
    """Delete the document identified by ``obj_or_string`` from the index.

    :param obj_or_string: object or identifier passed to ``get_identifier``
    :param commit: accepted for interface compatibility; not consulted here
    """
    if not self.setup_complete:
        self.setup()

    self.index = self.index.refresh()
    whoosh_id = get_identifier(obj_or_string)

    try:
        delete_query = self.parser.parse(u'%s:"%s"' % (ID, whoosh_id))
        self.index.delete_by_query(q=delete_query)
    except Exception as e:
        if not self.silently_fail:
            raise
        self.log.error(
            "Failed to remove document '%s' from Whoosh: %s",
            whoosh_id,
            e,
            exc_info=True)
def clear(self, models=None, commit=True):
    """Remove documents from the index.

    :param models: list/tuple of models whose documents are deleted; ``None``
        deletes and recreates the whole index
    :param commit: accepted for interface compatibility; not consulted here
    """
    if not self.setup_complete:
        self.setup()

    self.index = self.index.refresh()
    whole_index = models is None
    if not whole_index:
        assert isinstance(models, (list, tuple))

    try:
        if whole_index:
            self.delete_index()
        else:
            models_to_delete = []
            for model in models:
                models_to_delete.append(
                    u"%s:%s" % (DJANGO_CT, get_model_ct(model)))
            combined_query = u" OR ".join(models_to_delete)
            self.index.delete_by_query(q=self.parser.parse(combined_query))
    except Exception as e:
        if not self.silently_fail:
            raise
        if whole_index:
            self.log.error(
                "Failed to clear Whoosh index: %s", e, exc_info=True)
        else:
            self.log.error(
                "Failed to clear Whoosh index of models '%s': %s",
                ','.join(models_to_delete),
                e,
                exc_info=True)
def delete_index(self):
    """Discard the stored index data and run setup() again."""
    if not self.use_file_storage:
        self.storage.clean()
    elif os.path.exists(self.path):
        shutil.rmtree(self.path)

    # Rebuild storage, schema and index from scratch.
    self.setup()
def optimize(self):
    """Refresh the index and call its ``optimize()`` method."""
    if not self.setup_complete:
        self.setup()

    refreshed = self.index.refresh()
    self.index = refreshed
    refreshed.optimize()
def calculate_page(self, start_offset=0, end_offset=None):
    """Translate a ``[start_offset, end_offset)`` slice into a page request.

    :param start_offset: first result offset; ``None`` is treated as 0
    :param end_offset: end offset; ``None`` becomes 1000000 and values <= 0
        become 1
    :return: ``(page_num, page_length)`` with a 1-based page number
    """
    if end_offset is None:
        end_offset = 1000000
    elif end_offset <= 0:
        end_offset = 1

    if start_offset is None:
        start_offset = 0

    page_length = end_offset - start_offset

    # Pages are 1-based; a non-positive length leaves the page number at 1.
    page_num = 1
    if page_length > 0:
        page_num += int(start_offset / page_length)

    return page_num, page_length
def _handle_sorting(self, sort_by):
    """Resolve ``sort_by`` into the single sort field and direction used.

    :param sort_by: list of field names, each optionally prefixed with ``-``
        for descending order, or ``None``
    :return: ``(field_name, reverse)``; ``(None, False)`` when nothing is
        requested.  Only the first field is used.
    :raises SearchBackendError: the fields mix ascending and descending order
    """
    if not sort_by:
        return None, False

    descending = [order_by.startswith('-') for order_by in sort_by]
    if any(descending) and not all(descending):
        raise SearchBackendError(
            "Whoosh requires all order_by fields to use the same sort direction")

    primary = sort_by[0]
    if descending[0]:
        return primary[1:], True
    return primary, False


def _handle_model_filters(self, models, limit_to_registered_models):
    """Return the set of narrow queries that restrict results by model.

    :param models: models to restrict the search to, if any
    :param limit_to_registered_models: when ``None`` the
        ``HAYSTACK_LIMIT_TO_REGISTERED_MODELS`` setting (default ``True``) is used
    :return: a set holding zero or one ``django_ct`` OR-query
    """
    if limit_to_registered_models is None:
        limit_to_registered_models = getattr(
            settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)

    if models and len(models):
        model_choices = sorted(get_model_ct(model) for model in models)
    elif limit_to_registered_models:
        model_choices = self.build_models_list()
    else:
        model_choices = []

    narrow_queries = set()
    if len(model_choices) > 0:
        narrow_queries.add(
            ' OR '.join(['%s:%s' % (DJANGO_CT, rm) for rm in model_choices]))
    return narrow_queries


def _process_narrow_queries(self, narrow_queries):
    """Run the narrow queries and intersect their results.

    :param narrow_queries: iterable of query strings
    :return: always a 2-tuple ``(narrowed_results, narrow_searcher)``:
        ``(None, None)`` when there is nothing to narrow by;
        ``(dict, None)`` with an empty result payload when some narrow query
        matches nothing (the searcher is already closed);
        otherwise the combined results and the still-open searcher, which the
        caller must close.
    """
    if not narrow_queries:
        return None, None

    narrow_searcher = self.index.searcher()
    narrowed_results = None
    try:
        for nq in narrow_queries:
            recent_narrowed_results = narrow_searcher.search(
                self.parser.parse(force_str(nq)), limit=None)
            if len(recent_narrowed_results) <= 0:
                narrow_searcher.close()
                return {'results': [], 'hits': 0}, None
            if narrowed_results:
                narrowed_results.filter(recent_narrowed_results)
            else:
                narrowed_results = recent_narrowed_results
    except Exception:
        # Do not leak the searcher when parsing or searching fails.
        narrow_searcher.close()
        raise
    return narrowed_results, narrow_searcher


def _execute_search(self, parsed_query, page_num, page_length, sort_by, reverse,
                    narrowed_results, highlight=False, query_string='',
                    spelling_query=None, result_class=None):
    """Run the parsed query and convert the requested page of results.

    The trailing keyword parameters are forwarded to ``_process_results``.

    :return: the result dictionary built by ``_process_results``, or an empty
        payload when the page cannot be served
    """
    search_kwargs = {
        'pagelen': page_length,
        'sortedby': sort_by,
        'reverse': reverse,
    }
    if narrowed_results is not None:
        search_kwargs['filter'] = narrowed_results

    searcher = self.index.searcher()
    try:
        try:
            raw_page = searcher.search_page(
                parsed_query, page_num, **search_kwargs)
        except ValueError:
            if not self.silently_fail:
                raise
            return {'results': [], 'hits': 0, 'spelling_suggestion': None}

        # The page actually served is earlier than the one asked for, i.e.
        # the requested page is past the end of the results.
        if raw_page.pagenum < page_num:
            return {'results': [], 'hits': 0, 'spelling_suggestion': None}

        return self._process_results(
            raw_page,
            highlight=highlight,
            query_string=query_string,
            spelling_query=spelling_query,
            result_class=result_class)
    finally:
        searcher.close()


def _get_spelling_suggestion(self, query_string, spelling_query=None):
    """Return a spelling suggestion, or ``None`` when spelling is disabled.

    ``spelling_query`` takes precedence over ``query_string`` when given.
    """
    if not self.include_spelling:
        return None
    text = spelling_query or query_string
    return self.create_spelling_suggestion(text) if text else None


def _return_empty_results(self, query_string, spelling_query=None):
    """Return an empty result payload together with a spelling suggestion."""
    return {
        'results': [],
        'hits': 0,
        'spelling_suggestion': self._get_spelling_suggestion(
            query_string, spelling_query),
    }


@log_query
def search(
        self,
        query_string,
        sort_by=None,
        start_offset=0,
        end_offset=None,
        fields='',
        highlight=False,
        facets=None,
        date_facets=None,
        query_facets=None,
        narrow_queries=None,
        spelling_query=None,
        within=None,
        dwithin=None,
        distance_point=None,
        models=None,
        limit_to_registered_models=None,
        result_class=None,
        **kwargs
):
    """Search the Whoosh index.

    Faceting arguments are not supported and only trigger a warning.

    :return: dictionary with ``results`` and ``hits`` keys; most paths also
        include ``spelling_suggestion`` and, for real result pages, ``facets``
    """
    if not self.setup_complete:
        self.setup()

    # Empty and single-character queries (other than '*') are not searched.
    if len(query_string) <= 1 and query_string != u'*':
        return self._return_empty_results(query_string)

    query_string = force_str(query_string)
    sort_field, reverse = self._handle_sorting(sort_by)

    unsupported_facets = (
        ('', facets),
        ('date ', date_facets),
        ('query ', query_facets),
    )
    for label, requested in unsupported_facets:
        if requested is not None:
            warnings.warn(
                "Whoosh does not handle %sfaceting." % label,
                Warning,
                stacklevel=2)

    # Work on a copy so the caller's set is not modified.
    narrow_queries = set(narrow_queries) if narrow_queries else set()
    narrow_queries.update(
        self._handle_model_filters(models, limit_to_registered_models))

    # Refresh before any searcher is opened so every searcher sees the same,
    # current index.
    self.index = self.index.refresh()
    if not self.index.doc_count():
        return self._return_empty_results(query_string, spelling_query)

    narrowed_results, narrow_searcher = self._process_narrow_queries(
        narrow_queries)
    try:
        # A dict means some narrow query matched nothing.
        if isinstance(narrowed_results, dict):
            return narrowed_results

        parsed_query = self.parser.parse(query_string)
        if parsed_query is None:
            return self._return_empty_results(query_string)

        page_num, page_length = self.calculate_page(start_offset, end_offset)
        return self._execute_search(
            parsed_query,
            page_num,
            page_length,
            sort_field,
            reverse,
            narrowed_results,
            highlight=highlight,
            query_string=query_string,
            spelling_query=spelling_query,
            result_class=result_class)
    finally:
        if narrow_searcher is not None:
            narrow_searcher.close()
def more_like_this(
        self,
        model_instance,
        additional_query_string=None,
        start_offset=0,
        end_offset=None,
        models=None,
        limit_to_registered_models=None,
        result_class=None,
        **kwargs):
    """Find documents similar to ``model_instance``.

    :param model_instance: object whose indexed document is the reference
    :param additional_query_string: extra narrow query; ignored when ``'*'``
    :param start_offset: first result offset
    :param end_offset: end offset; also passed as ``top`` to Whoosh
    :param models: models to restrict the results to
    :param limit_to_registered_models: when ``None`` the
        ``HAYSTACK_LIMIT_TO_REGISTERED_MODELS`` setting (default ``True``) is used
    :param result_class: class used to wrap each hit
    :return: dictionary with at least ``results`` and ``hits`` keys
    """
    if not self.setup_complete:
        self.setup()

    field_name = self.content_field_name
    narrow_queries = set()
    narrowed_results = None
    self.index = self.index.refresh()

    if limit_to_registered_models is None:
        limit_to_registered_models = getattr(
            settings, 'HAYSTACK_LIMIT_TO_REGISTERED_MODELS', True)

    if models and len(models):
        model_choices = sorted(get_model_ct(model) for model in models)
    elif limit_to_registered_models:
        # Using narrow queries, limit the results to only models handled
        # with the current routers.
        model_choices = self.build_models_list()
    else:
        model_choices = []

    if len(model_choices) > 0:
        narrow_queries.add(' OR '.join(
            ['%s:%s' % (DJANGO_CT, rm) for rm in model_choices]))

    if additional_query_string and additional_query_string != '*':
        narrow_queries.add(additional_query_string)

    # Both searchers are closed in the finally block on every exit path.
    narrow_searcher = None
    searcher = None
    try:
        if narrow_queries:
            narrow_searcher = self.index.searcher()
            for nq in narrow_queries:
                recent_narrowed_results = narrow_searcher.search(
                    self.parser.parse(force_str(nq)), limit=None)
                if len(recent_narrowed_results) <= 0:
                    return {
                        'results': [],
                        'hits': 0,
                    }
                if narrowed_results:
                    narrowed_results.filter(recent_narrowed_results)
                else:
                    narrowed_results = recent_narrowed_results

        page_num, page_length = self.calculate_page(start_offset, end_offset)
        self.index = self.index.refresh()
        raw_results = EmptyResults()

        if self.index.doc_count():
            query = "%s:%s" % (ID, get_identifier(model_instance))
            searcher = self.index.searcher()
            parsed_query = self.parser.parse(query)
            results = searcher.search(parsed_query)
            if len(results):
                raw_results = results[0].more_like_this(
                    field_name, top=end_offset)

        # Handle the case where the results have been narrowed.
        if narrowed_results is not None and hasattr(raw_results, 'filter'):
            raw_results.filter(narrowed_results)

        try:
            raw_page = ResultsPage(raw_results, page_num, page_length)
        except ValueError:
            if not self.silently_fail:
                raise
            return {
                'results': [],
                'hits': 0,
                'spelling_suggestion': None,
            }

        # The page actually served is earlier than the one asked for.
        if raw_page.pagenum < page_num:
            return {
                'results': [],
                'hits': 0,
                'spelling_suggestion': None,
            }

        return self._process_results(raw_page, result_class=result_class)
    finally:
        # searcher stays None when the index holds no documents.
        if searcher is not None:
            searcher.close()
        if narrow_searcher is not None:
            narrow_searcher.close()
def _process_results(
        self,
        raw_page,
        highlight=False,
        query_string='',
        spelling_query=None,
        result_class=None):
    """Convert a page of raw Whoosh hits into Haystack result objects.

    Hits whose model is not among the indexed models are skipped and
    subtracted from the hit count.

    :param raw_page: page of Whoosh results
    :param highlight: when true, add a ``highlighted`` entry for the content field
    :param query_string: query text used for highlighting and spelling
    :param spelling_query: text preferred over ``query_string`` for spelling
    :param result_class: class used to wrap each hit (default ``SearchResult``)
    :return: dictionary with ``results``, ``hits``, ``facets`` and
        ``spelling_suggestion``
    """
    from haystack import connections

    results = []
    hits = len(raw_page)
    if result_class is None:
        result_class = SearchResult

    facets = {}
    spelling_suggestion = None
    unified_index = connections[self.connection_alias].get_unified_index()
    indexed_models = unified_index.get_indexed_models()

    # The analyzer and the highlight terms depend only on the query, so they
    # are built once instead of once per hit.
    analyzer = None
    highlight_terms = None
    if highlight:
        analyzer = StemmingAnalyzer()
        highlight_terms = [token.text for token in analyzer(query_string)]

    for doc_offset, raw_result in enumerate(raw_page):
        score = raw_page.score(doc_offset) or 0
        app_label, model_name = raw_result[DJANGO_CT].split('.')
        model = haystack_get_model(app_label, model_name)

        if not (model and model in indexed_models):
            hits -= 1
            continue

        # Looked up once per hit rather than once per stored field.
        model_index = unified_index.get_index(model)
        additional_fields = {}

        for key, value in raw_result.items():
            string_key = str(key)
            if string_key in model_index.fields and hasattr(
                    model_index.fields[string_key], 'convert'):
                index_field = model_index.fields[string_key]
                # Special-cased due to the nature of KEYWORD fields.
                if index_field.is_multivalued:
                    if value is None or len(value) == 0:
                        additional_fields[string_key] = []
                    else:
                        additional_fields[string_key] = value.split(',')
                else:
                    additional_fields[string_key] = index_field.convert(value)
            else:
                additional_fields[string_key] = self._to_python(value)

        # These two are passed to the result class positionally below.
        del (additional_fields[DJANGO_CT])
        del (additional_fields[DJANGO_ID])

        if highlight:
            whoosh_result = whoosh_highlight(
                additional_fields.get(self.content_field_name),
                highlight_terms,
                analyzer,
                ContextFragmenter(),
                WhooshHtmlFormatter('em')
            )
            additional_fields['highlighted'] = {
                self.content_field_name: [whoosh_result],
            }

        result = result_class(
            app_label,
            model_name,
            raw_result[DJANGO_ID],
            score,
            **additional_fields)
        results.append(result)

    if self.include_spelling:
        if spelling_query:
            spelling_suggestion = self.create_spelling_suggestion(
                spelling_query)
        else:
            spelling_suggestion = self.create_spelling_suggestion(
                query_string)

    return {
        'results': results,
        'hits': hits,
        'facets': facets,
        'spelling_suggestion': spelling_suggestion,
    }
def create_spelling_suggestion(self, query_string):
    """Suggest a corrected spelling for ``query_string``.

    Reserved words and characters are stripped, then each remaining word is
    replaced by the corrector's best suggestion; words without a suggestion
    are dropped.

    :param query_string: the text to correct
    :return: the suggested words joined by spaces, or ``None`` when
        ``query_string`` is empty
    """
    # Return before opening a reader when there is nothing to correct.
    if not query_string:
        return None

    cleaned_query = force_str(query_string)

    # Clean the string.
    for rev_word in self.RESERVED_WORDS:
        cleaned_query = cleaned_query.replace(rev_word, '')
    for rev_char in self.RESERVED_CHARACTERS:
        cleaned_query = cleaned_query.replace(rev_char, '')

    suggested_words = []
    reader = self.index.reader()
    try:
        corrector = reader.corrector(self.content_field_name)
        # Break it down.
        for word in cleaned_query.split():
            suggestions = corrector.suggest(word, limit=1)
            if len(suggestions) > 0:
                suggested_words.append(suggestions[0])
    finally:
        # The reader was previously never closed.
        reader.close()

    return ' '.join(suggested_words)
def _from_python(self, value):
if hasattr(value, 'strftime'):
if not hasattr(value, 'hour'):
value = datetime(value.year, value.month, value.day, 0, 0, 0)
elif isinstance(value, bool):
if value:
value = 'true'
else:
value = 'false'
elif isinstance(value, (list, tuple)):
value = u','.join([force_str(v) for v in value])
elif isinstance(value, (six.integer_types, float)):
# Leave it alone.
pass
else:
value = force_str(value)
return value
def _to_python(self, value):
if value == 'true':
return True
elif value == 'false':
return False
if value and isinstance(value, six.string_types):
possible_datetime = DATETIME_REGEX.search(value)
if possible_datetime:
date_values = possible_datetime.groupdict()
for dk, dv in date_values.items():
date_values[dk] = int(dv)
return datetime(
date_values['year'],
date_values['month'],
date_values['day'],
date_values['hour'],
date_values['minute'],
date_values['second'])
try:
converted_value = json.loads(value)
if isinstance(
converted_value,
(list,
tuple,
set,
dict,
six.integer_types,
float,
complex)):
return converted_value
except (SyntaxError, ValueError):
pass
except BaseException as e:
"""对SystemExit、KeyboardInterrupt等系统级异常重新抛出"""
if isinstance(e, (SystemExit, KeyboardInterrupt)):
raise
return value
class WhooshSearchQuery(BaseSearchQuery):
    """Builds Whoosh query-language fragments for Haystack queries."""

    def _convert_datetime(self, date):
        """Format a date or datetime as a ``YYYYMMDDHHMMSS`` string.

        Objects without an ``hour`` attribute get ``000000`` as the time part.
        """
        pattern = '%Y%m%d%H%M%S' if hasattr(date, 'hour') else '%Y%m%d000000'
        return force_str(date.strftime(pattern))

    def clean(self, query_fragment):
        """Neutralize reserved words and characters in user input.

        Reserved words are lower-cased; any word containing a reserved
        character is wrapped in single quotes.
        """
        cleaned_words = []
        for word in query_fragment.split():
            if word in self.backend.RESERVED_WORDS:
                word = word.lower()
            if any(char in word for char in self.backend.RESERVED_CHARACTERS):
                word = "'%s'" % word
            cleaned_words.append(word)
        return ' '.join(cleaned_words)

    def build_query_fragment(self, field, filter_type, value):
        """Build the query fragment for one ``field``/``filter_type``/``value`` filter.

        :param field: field name; ``'content'`` yields no field prefix
        :param filter_type: lookup type such as ``contains``, ``in`` or ``range``
        :param value: raw value or a Haystack input-type object
        :return: the fragment, prefixed with the index field name when applicable
        """
        from haystack import connections

        query_frag = ''
        is_datetime = False

        # Wrap plain values in an input type so they can be prepared uniformly.
        if not hasattr(value, 'input_type_name'):
            if hasattr(value, 'values_list'):
                value = list(value)
            if hasattr(value, 'strftime'):
                is_datetime = True
            if isinstance(value, six.string_types) and value != ' ':
                value = Clean(value)
            else:
                value = PythonData(value)

        prepared_value = value.prepare(self)
        if not isinstance(prepared_value, (set, list, tuple)):
            prepared_value = self.backend._from_python(prepared_value)

        if field == 'content':
            index_fieldname = ''
        else:
            unified_index = connections[self._using].get_unified_index()
            index_fieldname = u'%s:' % unified_index.get_index_fieldname(field)

        # Format templates per lookup type.
        filter_types = {
            'content': '%s',
            'contains': '*%s*',
            'endswith': "*%s",
            'startswith': "%s*",
            'exact': '%s',
            'gt': "{%s to}",
            'gte': "[%s to]",
            'lt': "{to %s}",
            'lte': "[to %s]",
            'fuzzy': u'%s~',
        }
        text_lookups = ('content', 'contains', 'startswith', 'endswith', 'fuzzy')

        if value.post_process is False:
            query_frag = prepared_value
        elif filter_type in text_lookups:
            if value.input_type_name == 'exact':
                query_frag = prepared_value
            else:
                if isinstance(prepared_value, six.string_types):
                    possible_values = prepared_value.split(' ')
                else:
                    if is_datetime is True:
                        prepared_value = self._convert_datetime(prepared_value)
                    possible_values = [prepared_value]

                template = filter_types[filter_type]
                terms = [
                    template % self.backend._from_python(possible_value)
                    for possible_value in possible_values
                ]
                if len(terms) == 1:
                    query_frag = terms[0]
                else:
                    query_frag = u"(%s)" % " AND ".join(terms)
        elif filter_type == 'in':
            in_options = []
            for possible_value in prepared_value:
                option_is_datetime = hasattr(possible_value, 'strftime')
                pv = self.backend._from_python(possible_value)
                if option_is_datetime:
                    pv = self._convert_datetime(pv)
                # Plain strings are quoted; datetimes and other values are not.
                if isinstance(pv, six.string_types) and not option_is_datetime:
                    in_options.append('"%s"' % pv)
                else:
                    in_options.append('%s' % pv)
            query_frag = "(%s)" % " OR ".join(in_options)
        elif filter_type == 'range':
            start = self.backend._from_python(prepared_value[0])
            end = self.backend._from_python(prepared_value[1])
            if hasattr(prepared_value[0], 'strftime'):
                start = self._convert_datetime(start)
            if hasattr(prepared_value[1], 'strftime'):
                end = self._convert_datetime(end)
            query_frag = u"[%s to %s]" % (start, end)
        elif filter_type == 'exact':
            if value.input_type_name == 'exact':
                query_frag = prepared_value
            else:
                prepared_value = Exact(prepared_value).prepare(self)
                query_frag = filter_types[filter_type] % prepared_value
        else:
            if is_datetime is True:
                prepared_value = self._convert_datetime(prepared_value)
            query_frag = filter_types[filter_type] % prepared_value

        # Parenthesize non-raw fragments unless they already start with '('
        # or end with ')'.
        if len(query_frag) and not isinstance(value, Raw):
            if not query_frag.startswith('(') and not query_frag.endswith(')'):
                query_frag = "(%s)" % query_frag

        return u"%s%s" % (index_fieldname, query_frag)
class WhooshEngine(BaseEngine):
    """Haystack engine pairing the Whoosh backend with its query class."""

    backend = WhooshSearchBackend
    query = WhooshSearchQuery

@ -0,0 +1,13 @@
import os

from django.core.wsgi import get_wsgi_application

# Use djangoblog.settings as the settings module unless
# DJANGO_SETTINGS_MODULE is already set in the environment.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoblog.settings")

# Module-level WSGI application object created by Django.
application = get_wsgi_application()
Loading…
Cancel
Save