parent
54b4de7c17
commit
9b4fa3827e
@ -0,0 +1,2 @@
|
||||
.DS_Store
|
||||
*.DS_Store
|
||||
Binary file not shown.
@ -1,5 +0,0 @@
|
||||
from django.apps import AppConfig
|
||||
|
||||
#评论应用的配置类
|
||||
class CommentsConfig(AppConfig):
|
||||
name = 'comments'# 指定应用名称为'comments',对应安装的应用名
|
||||
@ -1,85 +0,0 @@
|
||||
from django.test import Client, RequestFactory, TransactionTestCase
|
||||
from django.urls import reverse
|
||||
|
||||
from accounts.models import BlogUser
|
||||
from blog.models import Category, Article
|
||||
from comments.models import Comment
|
||||
from comments.templatetags.comments_tags import *
|
||||
from djangoblog.utils import get_max_articleid_commentid
|
||||
|
||||
|
||||
# 评论功能测试类(使用数据库事务隔离)
|
||||
class CommentsTest(TransactionTestCase):
|
||||
def setUp(self):
|
||||
# 初始化测试客户端和请求工厂
|
||||
self.client = Client()
|
||||
self.factory = RequestFactory()
|
||||
|
||||
# 设置评论需要审核才能显示
|
||||
from blog.models import BlogSettings
|
||||
value = BlogSettings()
|
||||
value.comment_need_review = True
|
||||
value.save()
|
||||
|
||||
# 创建超级用户用于登录测试
|
||||
self.user = BlogUser.objects.create_superuser(
|
||||
email="liangliangyy1@gmail.com",
|
||||
username="liangliangyy1",
|
||||
password="liangliangyy1")
|
||||
|
||||
def update_article_comment_status(self, article):
|
||||
"""批量启用文章下的所有评论"""
|
||||
comments = article.comment_set.all()
|
||||
for comment in comments:
|
||||
comment.is_enable = True
|
||||
comment.save()
|
||||
|
||||
def test_validate_comment(self):
|
||||
# 登录测试用户
|
||||
self.client.login(username='liangliangyy1', password='liangliangyy1')
|
||||
|
||||
# 创建测试分类
|
||||
category = Category()
|
||||
category.name = "categoryccc"
|
||||
category.save()
|
||||
|
||||
# 创建测试文章
|
||||
article = Article()
|
||||
article.title = "nicetitleccc"
|
||||
article.body = "nicecontentccc"
|
||||
article.author = self.user
|
||||
article.category = category
|
||||
article.type = 'a'
|
||||
article.status = 'p'
|
||||
article.save()
|
||||
|
||||
# 获取评论提交的URL
|
||||
comment_url = reverse(
|
||||
'comments:postcomment', kwargs={
|
||||
'article_id': article.id})
|
||||
|
||||
# 测试发布第一条评论
|
||||
response = self.client.post(comment_url,
|
||||
{
|
||||
'body': '123ffffffffff'
|
||||
})
|
||||
|
||||
self.assertEqual(response.status_code, 302) # 验证重定向
|
||||
|
||||
#断言:评论未审核时不显示
|
||||
article = Article.objects.get(pk=article.pk)
|
||||
self.assertEqual(len(article.comment_list()), 0)
|
||||
|
||||
# 审核通过所有评论
|
||||
self.update_article_comment_status(article)
|
||||
self.assertEqual(len(article.comment_list()), 1) # 验证评论已显示
|
||||
|
||||
# 测试发布第二条评论
|
||||
response = self.client.post(comment_url,
|
||||
{
|
||||
'body': '123ffffffffff',
|
||||
})
|
||||
|
||||
self.assertEqual(response.status_code, 302)
|
||||
|
||||
article
|
||||
@ -1,43 +0,0 @@
|
||||
import logging
|
||||
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from djangoblog.utils import get_current_site
|
||||
from djangoblog.utils import send_email
|
||||
|
||||
logger = logging.getLogger(__name__) # 获取当前模块的日志记录器
|
||||
|
||||
|
||||
def send_comment_email(comment):
|
||||
"""发送评论相关邮件的主函数"""
|
||||
site = get_current_site().domain# 获取当前站点域名
|
||||
subject = _('Thanks for your comment')# 邮件标题(国际化)
|
||||
article_url = f"https://{site}{comment.article.get_absolute_url()}" # 构建文章完整URL
|
||||
|
||||
# 1. 给评论作者发送感谢邮件
|
||||
html_content = _("""<p>Thank you very much for your comments on this site</p>
|
||||
You can visit <a href="%(article_url)s" rel="bookmark">%(article_title)s</a>
|
||||
to review your comments,
|
||||
Thank you again!
|
||||
<br />
|
||||
If the link above cannot be opened, please copy this link to your browser.
|
||||
%(article_url)s""") % {'article_url': article_url, 'article_title': comment.article.title}
|
||||
tomail = comment.author.email# 收件人邮箱
|
||||
send_email([tomail], subject, html_content)
|
||||
|
||||
# 2. 如果是回复评论,给被回复者发送通知邮件
|
||||
try:
|
||||
if comment.parent_comment:
|
||||
html_content = _("""Your comment on <a href="%(article_url)s" rel="bookmark">%(article_title)s</a><br/> has
|
||||
received a reply. <br/> %(comment_body)s
|
||||
<br/>
|
||||
go check it out!
|
||||
<br/>
|
||||
If the link above cannot be opened, please copy this link to your browser.
|
||||
%(article_url)s
|
||||
""") % {'article_url': article_url, 'article_title': comment.article.title,
|
||||
'comment_body': comment.parent_comment.body}
|
||||
tomail = comment.parent_comment.author.email # 被回复者邮箱
|
||||
send_email([tomail], subject, html_content)
|
||||
except Exception as e:
|
||||
logger.error(e)#记录邮件发送异常
|
||||
@ -1,2 +0,0 @@
|
||||
# Zxy指定默认的应用配置类
|
||||
default_app_config = 'djangoblog.apps.DjangoblogAppConfig'
|
||||
@ -1,60 +0,0 @@
|
||||
# Zxy导入 Django 的 AdminSite 类和其他相关模块
|
||||
from django.contrib.admin import AdminSite
|
||||
from django.contrib.admin.models import LogEntry
|
||||
from django.contrib.sites.admin import SiteAdmin
|
||||
from django.contrib.sites.models import Site
|
||||
|
||||
# Zxy导入自定义模块和模型
|
||||
from accounts.admin import *
|
||||
from blog.admin import *
|
||||
from blog.models import *
|
||||
from comments.admin import *
|
||||
from comments.models import *
|
||||
from djangoblog.logentryadmin import LogEntryAdmin
|
||||
from oauth.admin import *
|
||||
from oauth.models import *
|
||||
from owntracks.admin import *
|
||||
from owntracks.models import *
|
||||
from servermanager.admin import *
|
||||
from servermanager.models import *
|
||||
|
||||
# Zxy定义自定义 AdminSite 类
|
||||
class DjangoBlogAdminSite(AdminSite):
|
||||
# 自定义站点标题和头部
|
||||
site_header = 'djangoblog administration'
|
||||
site_title = 'djangoblog site admin'
|
||||
|
||||
# 初始化方法
|
||||
def __init__(self, name='admin'):
|
||||
super().__init__(name)
|
||||
|
||||
# 定义权限检查方法,仅允许超级用户访问
|
||||
def has_permission(self, request):
|
||||
return request.user.is_superuser
|
||||
|
||||
# Zxy创建自定义 AdminSite 实例
|
||||
admin_site = DjangoBlogAdminSite(name='admin')
|
||||
|
||||
# Zxy注册模型到自定义 AdminSite
|
||||
admin_site.register(Article, ArticleAdmin)
|
||||
admin_site.register(Category, CategoryAdmin)
|
||||
admin_site.register(Tag, TagAdmin)
|
||||
admin_site.register(Links, LinksAdmin)
|
||||
admin_site.register(SideBar, SideBarAdmin)
|
||||
admin_site.register(BlogSettings, BlogSettingsAdmin)
|
||||
|
||||
admin_site.register(commands, CommandsAdmin)
|
||||
admin_site.register(EmailSendLog, EmailSendLogAdmin)
|
||||
|
||||
admin_site.register(BlogUser, BlogUserAdmin)
|
||||
|
||||
admin_site.register(Comment, CommentAdmin)
|
||||
|
||||
admin_site.register(OAuthUser, OAuthUserAdmin)
|
||||
admin_site.register(OAuthConfig, OAuthConfigAdmin)
|
||||
|
||||
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin)
|
||||
|
||||
admin_site.register(Site, SiteAdmin)
|
||||
|
||||
admin_site.register(LogEntry, LogEntryAdmin)
|
||||
@ -1,196 +0,0 @@
|
||||
# Zxy导入Django的编码工具
|
||||
from django.utils.encoding import force_str
|
||||
# Zxy导入Elasticsearch DSL的查询构造器
|
||||
from elasticsearch_dsl import Q
|
||||
# Zxy导入Haystack的搜索后端模块
|
||||
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
|
||||
# Zxy导入Haystack的表单模块
|
||||
from haystack.forms import ModelSearchForm
|
||||
# Zxy导入Haystack的搜索结果模型
|
||||
from haystack.models import SearchResult
|
||||
# Zxy导入Haystack的日志工具
|
||||
from haystack.utils import log as logging
|
||||
|
||||
# Zxy导入项目中的Elasticsearch文档和管理器
|
||||
from blog.documents import ArticleDocument, ArticleDocumentManager
|
||||
# Zxy导入项目中的文章模型
|
||||
from blog.models import Article
|
||||
|
||||
# Zxy获取日志记录器
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Zxy定义Elasticsearch搜索后端类
|
||||
class ElasticSearchBackend(BaseSearchBackend):
|
||||
# Zxy初始化方法
|
||||
def __init__(self, connection_alias, **connection_options):
|
||||
super(ElasticSearchBackend, self).__init__(connection_alias, **connection_options)
|
||||
self.manager = ArticleDocumentManager() # Zxy初始化文档管理器
|
||||
self.include_spelling = True # Zxy启用拼写建议
|
||||
|
||||
# Zxy获取模型数据并转换为文档
|
||||
def _get_models(self, iterable):
|
||||
models = iterable if iterable and iterable[0] else Article.objects.all() # Zxy获取模型数据
|
||||
docs = self.manager.convert_to_doc(models) # Zxy转换为Elasticsearch文档
|
||||
return docs
|
||||
|
||||
# Zxy创建索引并重建数据
|
||||
def _create(self, models):
|
||||
self.manager.create_index() # Zxy创建索引
|
||||
docs = self._get_models(models) # Zxy获取文档
|
||||
self.manager.rebuild(docs) # Zxy重建索引数据
|
||||
|
||||
# Zxy删除模型数据
|
||||
def _delete(self, models):
|
||||
for m in models:
|
||||
m.delete() # Zxy删除模型实例
|
||||
return True
|
||||
|
||||
# Zxy重建索引数据
|
||||
def _rebuild(self, models):
|
||||
models = models if models else Article.objects.all() # Zxy获取模型数据
|
||||
docs = self.manager.convert_to_doc(models) # Zxy转换为文档
|
||||
self.manager.update_docs(docs) # Zxy更新索引数据
|
||||
|
||||
# Zxy更新索引
|
||||
def update(self, index, iterable, commit=True):
|
||||
models = self._get_models(iterable) # Zxy获取文档
|
||||
self.manager.update_docs(models) # Zxy更新索引
|
||||
|
||||
# Zxy删除文档
|
||||
def remove(self, obj_or_string):
|
||||
models = self._get_models([obj_or_string]) # Zxy获取文档
|
||||
self._delete(models) # Zxy删除文档
|
||||
|
||||
# Zxy清空索引
|
||||
def clear(self, models=None, commit=True):
|
||||
self.remove(None) # Zxy删除所有文档
|
||||
|
||||
# Zxy获取拼写建议
|
||||
@staticmethod
|
||||
def get_suggestion(query: str) -> str:
|
||||
"""获取推荐词,如果没有找到则返回原搜索词"""
|
||||
search = ArticleDocument.search() \
|
||||
.query("match", body=query) \
|
||||
.suggest('suggest_search', query, term={'field': 'body'}) \
|
||||
.execute()
|
||||
|
||||
keywords = []
|
||||
for suggest in search.suggest.suggest_search:
|
||||
if suggest["options"]: # Zxy检查是否有建议选项
|
||||
keywords.append(suggest["options"][0]["text"]) # Zxy添加建议词
|
||||
else:
|
||||
keywords.append(suggest["text"]) # Zxy添加原搜索词
|
||||
|
||||
return ' '.join(keywords) # Zxy返回拼写建议
|
||||
|
||||
# Zxy执行搜索查询
|
||||
@log_query
|
||||
def search(self, query_string, **kwargs):
|
||||
logger.info('search query_string:' + query_string) # Zxy记录搜索查询
|
||||
|
||||
start_offset = kwargs.get('start_offset') # Zxy获取起始偏移量
|
||||
end_offset = kwargs.get('end_offset') # Zxy获取结束偏移量
|
||||
|
||||
# Zxy检查是否启用拼写建议
|
||||
if getattr(self, "is_suggest", None):
|
||||
suggestion = self.get_suggestion(query_string) # Zxy获取拼写建议
|
||||
else:
|
||||
suggestion = query_string # Zxy使用原搜索词
|
||||
|
||||
q = Q('bool', # Zxy构造布尔查询
|
||||
should=[Q('match', body=suggestion), Q('match', title=suggestion)], # Zxy匹配标题或正文
|
||||
minimum_should_match="70%") # Zxy至少匹配70%
|
||||
|
||||
search = ArticleDocument.search() \
|
||||
.query('bool', filter=[q]) \
|
||||
.filter('term', status='p') \
|
||||
.filter('term', type='a') \
|
||||
.source(False)[start_offset: end_offset] # Zxy执行搜索
|
||||
|
||||
results = search.execute() # Zxy执行搜索查询
|
||||
hits = results['hits'].total # Zxy获取总匹配数
|
||||
raw_results = [] # Zxy初始化结果列表
|
||||
|
||||
for raw_result in results['hits']['hits']: # Zxy遍历搜索结果
|
||||
app_label = 'blog' # Zxy应用标签
|
||||
model_name = 'Article' # Zxy模型名称
|
||||
additional_fields = {} # Zxy额外字段
|
||||
|
||||
result_class = SearchResult # Zxy搜索结果类
|
||||
|
||||
result = result_class( # Zxy创建搜索结果对象
|
||||
app_label,
|
||||
model_name,
|
||||
raw_result['_id'],
|
||||
raw_result['_score'],
|
||||
**additional_fields
|
||||
)
|
||||
raw_results.append(result) # Zxy添加到结果列表
|
||||
|
||||
facets = {} # Zxy初始化分面信息
|
||||
spelling_suggestion = None if query_string == suggestion else suggestion # Zxy拼写建议
|
||||
|
||||
return {
|
||||
'results': raw_results, # Zxy返回搜索结果
|
||||
'hits': hits, # Zxy返回匹配数
|
||||
'facets': facets, # Zxy返回分面信息
|
||||
'spelling_suggestion': spelling_suggestion, # Zxy返回拼写建议
|
||||
}
|
||||
|
||||
# Zxy定义Elasticsearch搜索查询类
|
||||
class ElasticSearchQuery(BaseSearchQuery):
|
||||
# Zxy转换日期时间格式
|
||||
def _convert_datetime(self, date):
|
||||
if hasattr(date, 'hour'):
|
||||
return force_str(date.strftime('%Y%m%d%H%M%S'))
|
||||
else:
|
||||
return force_str(date.strftime('%Y%m%d000000'))
|
||||
|
||||
# Zxy清理用户输入
|
||||
def clean(self, query_fragment):
|
||||
words = query_fragment.split()
|
||||
cleaned_words = []
|
||||
|
||||
for word in words:
|
||||
if word in self.backend.RESERVED_WORDS:
|
||||
word = word.replace(word, word.lower())
|
||||
|
||||
for char in self.backend.RESERVED_CHARACTERS:
|
||||
if char in word:
|
||||
word = "'%s'" % word
|
||||
break
|
||||
|
||||
cleaned_words.append(word)
|
||||
|
||||
return ' '.join(cleaned_words)
|
||||
|
||||
# Zxy构建查询片段
|
||||
def build_query_fragment(self, field, filter_type, value):
|
||||
return value.query_string
|
||||
|
||||
# Zxy获取结果数量
|
||||
def get_count(self):
|
||||
results = self.get_results()
|
||||
return len(results) if results else 0
|
||||
|
||||
# Zxy获取拼写建议
|
||||
def get_spelling_suggestion(self, preferred_query=None):
|
||||
return self._spelling_suggestion
|
||||
|
||||
# Zxy构建查询参数
|
||||
def build_params(self, spelling_query=None):
|
||||
kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query)
|
||||
return kwargs
|
||||
|
||||
# Zxy定义Elasticsearch模型搜索表单
|
||||
class ElasticSearchModelSearchForm(ModelSearchForm):
|
||||
# Zxy执行搜索
|
||||
def search(self):
|
||||
self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no" # Zxy检查是否启用拼写建议
|
||||
sqs = super().search() # Zxy调用父类搜索方法
|
||||
return sqs
|
||||
|
||||
# Zxy定义Elasticsearch搜索引擎
|
||||
class ElasticSearchEngine(BaseEngine):
|
||||
backend = ElasticSearchBackend # Zxy后端类
|
||||
query = ElasticSearchQuery # Zxy查询类
|
||||
@ -1,54 +0,0 @@
|
||||
# Zxy导入Django的用户模型
|
||||
from django.contrib.auth import get_user_model
|
||||
# Zxy导入Django的Feed视图
|
||||
from django.contrib.syndication.views import Feed
|
||||
# Zxy导入Django的时区工具
|
||||
from django.utils import timezone
|
||||
# Zxy导入RSS 2.0 Feed生成器
|
||||
from django.utils.feedgenerator import Rss201rev2Feed
|
||||
|
||||
# Zxy导入项目中的文章模型
|
||||
from blog.models import Article
|
||||
# Zxy导入Markdown工具
|
||||
from djangoblog.utils import CommonMarkdown
|
||||
|
||||
# Zxy定义Django博客Feed
|
||||
class DjangoBlogFeed(Feed):
|
||||
feed_type = Rss201rev2Feed # Zxy使用RSS 2.0格式
|
||||
|
||||
description = '大巧无工,重剑无锋.' # ZxyFeed描述
|
||||
title = "且听风吟 大巧无工,重剑无锋." # ZxyFeed标题
|
||||
link = "/feed/" # ZxyFeed链接
|
||||
|
||||
# Zxy获取作者名称
|
||||
def author_name(self):
|
||||
return get_user_model().objects.first().nickname
|
||||
|
||||
# Zxy获取作者链接
|
||||
def author_link(self):
|
||||
return get_user_model().objects.first().get_absolute_url()
|
||||
|
||||
# Zxy获取Feed项
|
||||
def items(self):
|
||||
return Article.objects.filter(type='a', status='p').order_by('-pub_time')[:5] # Zxy获取最近5篇已发布的文章
|
||||
|
||||
# Zxy获取Feed项标题
|
||||
def item_title(self, item):
|
||||
return item.title
|
||||
|
||||
# Zxy获取Feed项描述
|
||||
def item_description(self, item):
|
||||
return CommonMarkdown.get_markdown(item.body) # Zxy将文章内容转换为Markdown格式
|
||||
|
||||
# Zxy获取Feed版权信息
|
||||
def feed_copyright(self):
|
||||
now = timezone.now()
|
||||
return "Copyright© {year} 且听风吟".format(year=now.year) # Zxy动态生成版权年份
|
||||
|
||||
# Zxy获取Feed项链接
|
||||
def item_link(self, item):
|
||||
return item.get_absolute_url()
|
||||
|
||||
# Zxy获取Feed项GUID
|
||||
def item_guid(self, item):
|
||||
return
|
||||
@ -1,88 +0,0 @@
|
||||
# Zxy导入Django的admin模块
|
||||
from django.contrib import admin
|
||||
# Zxy导入Django的LogEntry模型
|
||||
from django.contrib.admin.models import DELETION
|
||||
# Zxy导入Django的内容类型模型
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
# Zxy导入Django的URL工具
|
||||
from django.urls import reverse, NoReverseMatch
|
||||
# Zxy导入Django的编码工具
|
||||
from django.utils.encoding import force_str
|
||||
# Zxy导入Django的HTML工具
|
||||
from django.utils.html import escape
|
||||
# Zxy导入Django的安全字符串工具
|
||||
from django.utils.safestring import mark_safe
|
||||
# Zxy导入Django的翻译工具
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
# Zxy定义LogEntryAdmin类
|
||||
class LogEntryAdmin(admin.ModelAdmin):
|
||||
list_filter = ['content_type'] # Zxy按内容类型过滤
|
||||
search_fields = ['object_repr', 'change_message'] # Zxy搜索字段
|
||||
|
||||
list_display_links = ['action_time', 'get_change_message'] # Zxy显示链接的字段
|
||||
list_display = ['action_time', 'user_link', 'content_type', 'object_link', 'get_change_message'] # Zxy显示字段
|
||||
|
||||
# Zxy检查是否有添加权限
|
||||
def has_add_permission(self, request):
|
||||
return False
|
||||
|
||||
# Zxy检查是否有修改权限
|
||||
def has_change_permission(self, request, obj=None):
|
||||
return (
|
||||
request.user.is_superuser or
|
||||
request.user.has_perm('admin.change_logentry')
|
||||
) and request.method != 'POST'
|
||||
|
||||
# Zxy检查是否有删除权限
|
||||
def has_delete_permission(self, request, obj=None):
|
||||
return False
|
||||
|
||||
# Zxy获取对象链接
|
||||
def object_link(self, obj):
|
||||
object_link = escape(obj.object_repr) # Zxy转义对象表示
|
||||
content_type = obj.content_type
|
||||
|
||||
if obj.action_flag != DELETION and content_type is not None: # Zxy检查是否为删除操作
|
||||
try:
|
||||
url = reverse( # Zxy生成反向URL
|
||||
'admin:{}_{}_change'.format(content_type.app_label, content_type.model),
|
||||
args=[obj.object_id]
|
||||
)
|
||||
object_link = '<a href="{}">{}</a>'.format(url, object_link) # Zxy生成链接
|
||||
except NoReverseMatch:
|
||||
pass
|
||||
return mark_safe(object_link) # Zxy标记为安全字符串
|
||||
|
||||
object_link.admin_order_field = 'object_repr' # Zxy排序字段
|
||||
object_link.short_description = _('object') # Zxy字段描述
|
||||
|
||||
# Zxy获取用户链接
|
||||
def user_link(self, obj):
|
||||
content_type = ContentType.objects.get_for_model(type(obj.user)) # Zxy获取用户的内容类型
|
||||
user_link = escape(force_str(obj.user)) # Zxy转义用户表示
|
||||
|
||||
try:
|
||||
url = reverse( # Zxy生成反向URL
|
||||
'admin:{}_{}_change'.format(content_type.app_label, content_type.model),
|
||||
args=[obj.user.pk]
|
||||
)
|
||||
user_link = '<a href="{}">{}</a>'.format(url, user_link) # Zxy生成链接
|
||||
except NoReverseMatch:
|
||||
pass
|
||||
return mark_safe(user_link) # Zxy标记为安全字符串
|
||||
|
||||
user_link.admin_order_field = 'user' # Zxy排序字段
|
||||
user_link.short_description = _('user') # Zxy字段描述
|
||||
|
||||
# Zxy获取查询集
|
||||
def get_queryset(self, request):
|
||||
queryset = super(LogEntryAdmin, self).get_queryset(request)
|
||||
return queryset.prefetch_related('content_type') # Zxy预加载内容类型
|
||||
|
||||
# Zxy获取操作
|
||||
def get_actions(self, request):
|
||||
actions = super(LogEntryAdmin, self).get_actions(request)
|
||||
if 'delete_selected' in actions: # Zxy移除删除操作
|
||||
del actions['delete_selected']
|
||||
return actions
|
||||
@ -1,48 +0,0 @@
|
||||
# Zxy导入日志模块
|
||||
import logging
|
||||
|
||||
# Zxy获取日志记录器
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Zxy定义基础插件类
|
||||
class BasePlugin:
|
||||
# Zxy插件元数据
|
||||
PLUGIN_NAME = None # Zxy插件名称
|
||||
PLUGIN_DESCRIPTION = None # Zxy插件描述
|
||||
PLUGIN_VERSION = None # Zxy插件版本
|
||||
|
||||
# Zxy初始化插件
|
||||
def __init__(self):
|
||||
# Zxy检查插件元数据是否已定义
|
||||
if not all([self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION]):
|
||||
raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
|
||||
self.init_plugin() # Zxy调用初始化方法
|
||||
self.register_hooks() # Zxy注册钩子
|
||||
|
||||
# Zxy插件初始化逻辑
|
||||
def init_plugin(self):
|
||||
"""
|
||||
插件初始化逻辑
|
||||
子类可以重写此方法来实现特定的初始化操作
|
||||
"""
|
||||
logger.info(f'{self.PLUGIN_NAME} initialized.') # Zxy记录插件初始化日志
|
||||
|
||||
# Zxy注册插件钩子
|
||||
def register_hooks(self):
|
||||
"""
|
||||
注册插件钩子
|
||||
子类可以重写此方法来注册特定的钩子
|
||||
"""
|
||||
pass
|
||||
|
||||
# Zxy获取插件信息
|
||||
def get_plugin_info(self):
|
||||
"""
|
||||
获取插件信息
|
||||
:return: 包含插件元数据的字典
|
||||
"""
|
||||
return {
|
||||
'name': self.PLUGIN_NAME, # Zxy插件名称
|
||||
'description': self.PLUGIN_DESCRIPTION, # Zxy插件描述
|
||||
'version': self.PLUGIN_VERSION # Zxy插件版本
|
||||
}
|
||||
@ -1,8 +0,0 @@
|
||||
# Zxy定义文章相关的钩子常量
|
||||
ARTICLE_DETAIL_LOAD = 'article_detail_load' # Zxy文章详情加载时触发的钩子
|
||||
ARTICLE_CREATE = 'article_create' # Zxy文章创建时触发的钩子
|
||||
ARTICLE_UPDATE = 'article_update' # Zxy文章更新时触发的钩子
|
||||
ARTICLE_DELETE = 'article_delete' # Zxy文章删除时触发的钩子
|
||||
|
||||
# Zxy定义文章内容处理的钩子名称
|
||||
ARTICLE_CONTENT_HOOK_NAME = "the_content" # Zxy文章内容处理的钩子名称
|
||||
@ -1,47 +0,0 @@
|
||||
# Zxy导入日志模块
|
||||
import logging
|
||||
|
||||
# Zxy获取日志记录器
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Zxy定义全局钩子字典
|
||||
_hooks = {}
|
||||
|
||||
# Zxy注册钩子回调
|
||||
def register(hook_name: str, callback: callable):
|
||||
"""
|
||||
注册一个钩子回调。
|
||||
"""
|
||||
if hook_name not in _hooks: # Zxy检查钩子是否已存在
|
||||
_hooks[hook_name] = [] # Zxy初始化钩子列表
|
||||
_hooks[hook_name].append(callback) # Zxy添加回调到钩子列表
|
||||
logger.debug(f"Registered hook '{hook_name}' with callback '{callback.__name__}'") # Zxy记录注册日志
|
||||
|
||||
# Zxy执行Action Hook
|
||||
def run_action(hook_name: str, *args, **kwargs):
|
||||
"""
|
||||
执行一个 Action Hook。
|
||||
它会按顺序执行所有注册到该钩子上的回调函数。
|
||||
"""
|
||||
if hook_name in _hooks: # Zxy检查钩子是否存在
|
||||
logger.debug(f"Running action hook '{hook_name}'") # Zxy记录执行日志
|
||||
for callback in _hooks[hook_name]: # Zxy遍历钩子回调
|
||||
try:
|
||||
callback(*args, **kwargs) # Zxy执行回调
|
||||
except Exception as e:
|
||||
logger.error(f"Error running action hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True) # Zxy记录错误日志
|
||||
|
||||
# Zxy执行Filter Hook
|
||||
def apply_filters(hook_name: str, value, *args, **kwargs):
|
||||
"""
|
||||
执行一个 Filter Hook。
|
||||
它会把 value 依次传递给所有注册的回调函数进行处理。
|
||||
"""
|
||||
if hook_name in _hooks: # Zxy检查钩子是否存在
|
||||
logger.debug(f"Applying filter hook '{hook_name}'") # Zxy记录执行日志
|
||||
for callback in _hooks[hook_name]: # Zxy遍历钩子回调
|
||||
try:
|
||||
value = callback(value, *args, **kwargs) # Zxy调用回调处理值
|
||||
except Exception as e:
|
||||
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True) # Zxy记录错误日志
|
||||
return value # Zxy返回处理后的值
|
||||
@ -1,26 +0,0 @@
|
||||
# Zxy导入日志模块
|
||||
import logging
|
||||
|
||||
# Zxy导入 requests 模块用于发送 HTTP 请求
|
||||
import requests
|
||||
from django.conf import settings
|
||||
|
||||
# Zxy获取日志记录器
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Zxy定义爬虫通知类
|
||||
class SpiderNotify():
|
||||
@staticmethod
|
||||
def baidu_notify(urls):
|
||||
# Zxy向百度站长平台发送 URL 提交请求
|
||||
try:
|
||||
data = '\n'.join(urls) # Zxy将 URL 列表拼接为字符串
|
||||
result = requests.post(settings.BAIDU_NOTIFY_URL, data=data) # Zxy发送 POST 请求
|
||||
logger.info(result.text) # Zxy记录响应内容
|
||||
except Exception as e:
|
||||
logger.error(e) # Zxy记录异常信息
|
||||
|
||||
@staticmethod
|
||||
def notify(url):
|
||||
# Zxy调用百度通知方法
|
||||
SpiderNotify.baidu_notify(url)
|
||||
@ -1,68 +0,0 @@
|
||||
# Zxy定义项目的 URL 配置
|
||||
"""djangoblog URL Configuration
|
||||
|
||||
The `urlpatterns` list routes URLs to views. For more information please see:
|
||||
https://docs.djangoproject.com/en/1.10/topics/http/urls/
|
||||
Examples:
|
||||
Function views
|
||||
1. Add an import: from my_app import views
|
||||
2. Add a URL to urlpatterns: url(r'^$', views.home, name='home')
|
||||
Class-based views
|
||||
1. Add an import: from other_app.views import Home
|
||||
2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home')
|
||||
Including another URLconf
|
||||
1. Import the include() function: from django.conf.urls import url, include
|
||||
2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls'))
|
||||
"""
|
||||
from django.conf import settings
|
||||
from django.conf.urls.i18n import i18n_patterns
|
||||
from django.conf.urls.static import static
|
||||
from django.contrib.sitemaps.views import sitemap
|
||||
from django.urls import path, include
|
||||
from django.urls import re_path
|
||||
from haystack.views import search_view_factory
|
||||
|
||||
from blog.views import EsSearchView
|
||||
from djangoblog.admin_site import admin_site
|
||||
from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm
|
||||
from djangoblog.feeds import DjangoBlogFeed
|
||||
from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
|
||||
|
||||
# Zxy定义站点地图
|
||||
sitemaps = {
|
||||
'blog': ArticleSiteMap,
|
||||
'Category': CategorySiteMap,
|
||||
'Tag': TagSiteMap,
|
||||
'User': UserSiteMap,
|
||||
'static': StaticViewSitemap
|
||||
}
|
||||
|
||||
# Zxy定义 404、500 和 403 错误页面的视图
|
||||
handler404 = 'blog.views.page_not_found_view'
|
||||
handler500 = 'blog.views.server_error_view'
|
||||
handle403 = 'blog.views.permission_denied_view'
|
||||
|
||||
# Zxy定义 URL 模式
|
||||
urlpatterns = [
|
||||
path('i18n/', include('django.conf.urls.i18n')), # Zxy国际化语言切换
|
||||
]
|
||||
urlpatterns += i18n_patterns(
|
||||
re_path(r'^admin/', admin_site.urls), # Zxy自定义 Admin 站点
|
||||
re_path(r'', include('blog.urls', namespace='blog')), # Zxy博客应用的 URL
|
||||
re_path(r'mdeditor/', include('mdeditor.urls')), # Zxy Markdown 编辑器的 URL
|
||||
re_path(r'', include('comments.urls', namespace='comment')), # Zxy评论应用的 URL
|
||||
re_path(r'', include('accounts.urls', namespace='account')), # Zxy用户账户的 URL
|
||||
re_path(r'', include('oauth.urls', namespace='oauth')), # Zxy OAuth 应用的 URL
|
||||
re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps}, # Zxy站点地图
|
||||
name='django.contrib.sitemaps.views.sitemap'),
|
||||
re_path(r'^feed/$', DjangoBlogFeed()), # Zxy RSS 订阅
|
||||
re_path(r'^rss/$', DjangoBlogFeed()), # Zxy RSS 订阅
|
||||
re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm), # Zxy搜索功能
|
||||
name='search'),
|
||||
re_path(r'', include('servermanager.urls', namespace='servermanager')), # Zxy服务器管理应用的 URL
|
||||
re_path(r'', include('owntracks.urls', namespace='owntracks')), # Zxy位置跟踪应用的 URL
|
||||
prefix_default_language=False
|
||||
) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) # Zxy静态文件的 URL
|
||||
if settings.DEBUG:
|
||||
urlpatterns += static(settings.MEDIA_URL, # Zxy媒体文件的 URL
|
||||
document_root=settings.MEDIA_ROOT)
|
||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in new issue