APP代码注释 #20

Merged
pwe53ixz6 merged 1 commits from gq_branch into develop 3 months ago

@ -1,13 +1,20 @@
#gq:
# 从 Django 内置的 admin 模块导入 AdminSite 基类
from django.contrib.admin import AdminSite
# 导入 LogEntry 模型,用于记录管理员操作日志
from django.contrib.admin.models import LogEntry
# 导入 Site 模型及其默认的 Admin 配置
from django.contrib.sites.admin import SiteAdmin
from django.contrib.sites.models import Site
# 批量导入各个自定义 App 的 Admin 配置和模型
# 这种星号(*)导入方式在项目规模较小时很方便,但大型项目中可能影响代码可读性
from accounts.admin import *
from blog.admin import *
from blog.models import *
from comments.admin import *
from comments.models import *
# 导入自定义的 LogEntryAdmin用于自定义操作日志的后台显示
from djangoblog.logentryadmin import LogEntryAdmin
from oauth.admin import *
from oauth.models import *
@ -18,28 +25,59 @@ from servermanager.models import *
class DjangoBlogAdminSite(AdminSite):
"""
自定义的 Admin 站点类继承自 Django AdminSite
用于定制 Admin 后台的外观和行为
"""
# 定制 Admin 后台顶部的标题
site_header = 'djangoblog administration'
# 定制浏览器标签页上的标题
site_title = 'djangoblog site admin'
def __init__(self, name='admin'):
"""
初始化方法
:param name: 站点的名称默认是 'admin'这会影响 URL 反向解析等
"""
super().__init__(name)
def has_permission(self, request):
"""
重写权限检查方法
这个方法决定了一个请求是否有权限访问 Admin 后台
:param request: 当前的 HTTP 请求对象
:return: Boolean表示是否允许访问
"""
# 只有超级用户(superuser)才能访问这个自定义的 Admin 站点
# 这是一个比默认更严格的权限控制
return request.user.is_superuser
# def get_urls(self):
# """
# (已注释)重写 get_urls 方法来添加自定义的 URL 路由。
# 这是一个示例,展示了如何在 Admin 后台中加入自己的视图。
# """
# # 先获取父类的所有 URL
# urls = super().get_urls()
# from django.urls import path
# # 导入一个自定义的视图函数,用于刷新缓存
# from blog.views import refresh_memcache
#
# # 定义自己的 URL 模式
# my_urls = [
# # 使用 self.admin_view() 包装自定义视图,以确保它受到 Admin 权限保护
# path('refresh/', self.admin_view(refresh_memcache), name="refresh"),
# ]
# # 返回合并后的 URL 列表
# return urls + my_urls
# 创建一个自定义 Admin 站点的实例
# 这个实例将被用于注册所有的模型
admin_site = DjangoBlogAdminSite(name='admin')
# --- 开始注册各个 App 的模型到自定义的 admin_site ---
# 注册 blog App 的模型
admin_site.register(Article, ArticlelAdmin)
admin_site.register(Category, CategoryAdmin)
admin_site.register(Tag, TagAdmin)
@ -47,18 +85,25 @@ admin_site.register(Links, LinksAdmin)
admin_site.register(SideBar, SideBarAdmin)
admin_site.register(BlogSettings, BlogSettingsAdmin)
# 注册 servermanager App 的模型
admin_site.register(commands, CommandsAdmin)
admin_site.register(EmailSendLog, EmailSendLogAdmin)
# 注册 accounts App 的模型
admin_site.register(BlogUser, BlogUserAdmin)
# 注册 comments App 的模型
admin_site.register(Comment, CommentAdmin)
# 注册 oauth App 的模型
admin_site.register(OAuthUser, OAuthUserAdmin)
admin_site.register(OAuthConfig, OAuthConfigAdmin)
# 注册 owntracks App 的模型
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin)
# 注册 Django 内置的 Site 模型
admin_site.register(Site, SiteAdmin)
admin_site.register(LogEntry, LogEntryAdmin)
# 注册 Django 内置的 LogEntry 模型,使用自定义的 LogEntryAdmin 配置
admin_site.register(LogEntry, LogEntryAdmin)

@ -1,11 +1,14 @@
#gq:
from django.apps import AppConfig
class DjangoblogAppConfig(AppConfig):
# 默认主键类型BigAutoField自增 BigInteger避免ID溢出
default_auto_field = 'django.db.models.BigAutoField'
# 应用唯一标识与settings.py中INSTALLED_APPS配置一致
name = 'djangoblog'
def ready(self):
super().ready()
# Import and load plugins here
super().ready() # 执行父类启动逻辑
# 应用启动时自动加载所有插件
from .plugin_manage.loader import load_plugins
load_plugins()
load_plugins()

@ -1,3 +1,4 @@
#gq:
import _thread
import logging
@ -18,13 +19,15 @@ from oauth.models import OAuthUser
logger = logging.getLogger(__name__)
# 自定义信号OAuth用户登录成功后触发
oauth_user_login_signal = django.dispatch.Signal(['id'])
send_email_signal = django.dispatch.Signal(
['emailto', 'title', 'content'])
# 自定义信号:需要发送邮件时触发
send_email_signal = django.dispatch.Signal(['emailto', 'title', 'content'])
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
"""发送HTML邮件并记录发送日志"""
emailto = kwargs['emailto']
title = kwargs['title']
content = kwargs['content']
@ -53,9 +56,11 @@ def send_email_signal_handler(sender, **kwargs):
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
"""OAuth用户登录后处理头像本地化并清理侧边栏缓存"""
id = kwargs['id']
oauthuser = OAuthUser.objects.get(id=id)
site = get_current_site().domain
# 如果头像是外部链接,则下载本地化
if oauthuser.picture and not oauthuser.picture.find(site) >= 0:
from djangoblog.utils import save_user_avatar
oauthuser.picture = save_user_avatar(oauthuser.picture)
@ -73,20 +78,31 @@ def model_post_save_callback(
using,
update_fields,
**kwargs):
"""
模型保存后触发
1. 对有get_full_url方法的模型如文章通知搜索引擎
2. 对评论清理相关缓存并异步发送邮件通知
"""
clearcache = False
# 忽略Admin日志
if isinstance(instance, LogEntry):
return
# 处理内容模型(如文章)
if 'get_full_url' in dir(instance):
is_update_views = update_fields == {'views'}
# 非测试环境且非仅更新浏览量,则通知搜索引擎
if not settings.TESTING and not is_update_views:
try:
notify_url = instance.get_full_url()
SpiderNotify.baidu_notify([notify_url])
except Exception as ex:
logger.error("notify sipder", ex)
# 非仅更新浏览量,则标记需要清理缓存
if not is_update_views:
clearcache = True
# 处理评论
if isinstance(instance, Comment):
if instance.is_enable:
path = instance.article.get_absolute_url()
@ -94,21 +110,19 @@ def model_post_save_callback(
if site.find(':') > 0:
site = site[0:site.find(':')]
expire_view_cache(
path,
servername=site,
serverport=80,
key_prefix='blogdetail')
# 清理文章页、评论、侧边栏等相关缓存
expire_view_cache(path, servername=site, serverport=80, key_prefix='blogdetail')
if cache.get('seo_processor'):
cache.delete('seo_processor')
comment_cache_key = 'article_comments_{id}'.format(
id=instance.article.id)
comment_cache_key = 'article_comments_{id}'.format(id=instance.article.id)
cache.delete(comment_cache_key)
delete_sidebar_cache()
delete_view_cache('article_comments', [str(instance.article.pk)])
# 异步发送评论通知邮件
_thread.start_new_thread(send_comment_email, (instance,))
# 根据标记清理全局缓存
if clearcache:
cache.clear()
@ -116,7 +130,8 @@ def model_post_save_callback(
@receiver(user_logged_in)
@receiver(user_logged_out)
def user_auth_callback(sender, request, user, **kwargs):
"""用户登录/登出后,清理侧边栏缓存"""
if user and user.username:
logger.info(user)
delete_sidebar_cache()
# cache.clear()
# cache.clear() # 注释掉了全局缓存清理,仅清理侧边栏

@ -1,3 +1,4 @@
#gq:
from django.utils.encoding import force_str
from elasticsearch_dsl import Q
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
@ -12,51 +13,57 @@ logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
"""Elasticsearch搜索后端实现Haystack接口处理索引与搜索逻辑"""
def __init__(self, connection_alias, **connection_options):
super(
ElasticSearchBackend,
self).__init__(
connection_alias,
**connection_options)
self.manager = ArticleDocumentManager()
self.include_spelling = True
self.manager = ArticleDocumentManager() # 文档索引管理工具
self.include_spelling = True # 启用拼写建议
def _get_models(self, iterable):
"""将模型实例/ID转换为Elasticsearch文档对象"""
models = iterable if iterable and iterable[0] else Article.objects.all()
docs = self.manager.convert_to_doc(models)
return docs
def _create(self, models):
"""创建索引并批量重建文档"""
self.manager.create_index()
docs = self._get_models(models)
self.manager.rebuild(docs)
def _delete(self, models):
"""删除指定文档"""
for m in models:
m.delete()
return True
def _rebuild(self, models):
"""增量更新索引文档"""
models = models if models else Article.objects.all()
docs = self.manager.convert_to_doc(models)
docs = self._get_models(models)
self.manager.update_docs(docs)
def update(self, index, iterable, commit=True):
"""更新索引将模型实例同步到Elasticsearch"""
models = self._get_models(iterable)
self.manager.update_docs(models)
def remove(self, obj_or_string):
"""从索引中删除单个对象"""
models = self._get_models([obj_or_string])
self._delete(models)
def clear(self, models=None, commit=True):
"""清空整个索引"""
self.remove(None)
@staticmethod
def get_suggestion(query: str) -> str:
"""获取推荐词, 如果没有找到添加原搜索词"""
"""获取搜索推荐词,无建议则返回原查询词"""
search = ArticleDocument.search() \
.query("match", body=query) \
.suggest('suggest_search', query, term={'field': 'body'}) \
@ -64,30 +71,31 @@ class ElasticSearchBackend(BaseSearchBackend):
keywords = []
for suggest in search.suggest.suggest_search:
if suggest["options"]:
keywords.append(suggest["options"][0]["text"])
else:
keywords.append(suggest["text"])
# 有建议取第一个,无则用原词
keywords.append(suggest["options"][0]["text"] if suggest["options"] else suggest["text"])
return ' '.join(keywords)
@log_query
def search(self, query_string, **kwargs):
"""核心搜索逻辑:匹配文章标题/正文,过滤已发布文章,支持分页和拼写建议"""
logger.info('search query_string:' + query_string)
start_offset = kwargs.get('start_offset')
end_offset = kwargs.get('end_offset')
start_offset = kwargs.get('start_offset') # 分页起始位置
end_offset = kwargs.get('end_offset') # 分页结束位置
# 推荐词搜索
# 启用推荐词搜索
if getattr(self, "is_suggest", None):
suggestion = self.get_suggestion(query_string)
else:
suggestion = query_string
# 构建查询匹配正文或标题最低70%匹配度
q = Q('bool',
should=[Q('match', body=suggestion), Q('match', title=suggestion)],
minimum_should_match="70%")
# 执行搜索过滤已发布status='p'、文章类型type='a'),不返回原始文档
search = ArticleDocument.search() \
.query('bool', filter=[q]) \
.filter('term', status='p') \
@ -95,8 +103,9 @@ class ElasticSearchBackend(BaseSearchBackend):
.source(False)[start_offset: end_offset]
results = search.execute()
hits = results['hits'].total
hits = results['hits'].total # 总命中数
raw_results = []
# 格式化结果为Haystack兼容的SearchResult对象
for raw_result in results['hits']['hits']:
app_label = 'blog'
model_name = 'Article'
@ -107,11 +116,12 @@ class ElasticSearchBackend(BaseSearchBackend):
result = result_class(
app_label,
model_name,
raw_result['_id'],
raw_result['_score'],
raw_result['_id'], # 文档ID
raw_result['_score'], # 相关性得分
**additional_fields)
raw_results.append(result)
facets = {}
# 若推荐词与原词不同则返回建议
spelling_suggestion = None if query_string == suggestion else suggestion
return {
@ -123,21 +133,16 @@ class ElasticSearchBackend(BaseSearchBackend):
class ElasticSearchQuery(BaseSearchQuery):
"""Elasticsearch查询构建类适配Haystack接口"""
def _convert_datetime(self, date):
"""将datetime转换为Elasticsearch兼容的字符串格式"""
if hasattr(date, 'hour'):
return force_str(date.strftime('%Y%m%d%H%M%S'))
else:
return force_str(date.strftime('%Y%m%d000000'))
def clean(self, query_fragment):
"""
Provides a mechanism for sanitizing user input before presenting the
value to the backend.
Whoosh 1.X differs here in that you can no longer use a backslash
to escape reserved characters. Instead, the whole word should be
quoted.
"""
"""清理查询片段处理保留字和特殊字符兼容Whoosh逻辑"""
words = query_fragment.split()
cleaned_words = []
@ -155,29 +160,34 @@ class ElasticSearchQuery(BaseSearchQuery):
return ' '.join(cleaned_words)
def build_query_fragment(self, field, filter_type, value):
"""构建查询片段,直接返回查询字符串"""
return value.query_string
def get_count(self):
"""获取查询结果总数"""
results = self.get_results()
return len(results) if results else 0
def get_spelling_suggestion(self, preferred_query=None):
"""返回拼写建议词"""
return self._spelling_suggestion
def build_params(self, spelling_query=None):
"""构建查询参数,复用父类逻辑"""
kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query)
return kwargs
class ElasticSearchModelSearchForm(ModelSearchForm):
"""自定义搜索表单,控制是否启用搜索建议"""
def search(self):
# 是否建议搜索
# 根据请求参数决定是否启用建议is_suggest != "no"时启用)
self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no"
sqs = super().search()
return sqs
class ElasticSearchEngine(BaseEngine):
"""Elasticsearch引擎入口指定后端和查询类"""
backend = ElasticSearchBackend
query = ElasticSearchQuery
query = ElasticSearchQuery

@ -1,40 +1,49 @@
#gq:
from django.contrib.auth import get_user_model
from django.contrib.syndication.views import Feed
from django.contrib.syndication.views import Feed # Django内置RSS/Atom订阅生成视图
from django.utils import timezone
from django.utils.feedgenerator import Rss201rev2Feed
from blog.models import Article
from djangoblog.utils import CommonMarkdown
from django.utils.feedgenerator import Rss201rev2Feed # RSS 2.0标准生成器
from blog.models import Article # 博客文章模型
from djangoblog.utils import CommonMarkdown # Markdown解析工具将Markdown转为HTML
class DjangoBlogFeed(Feed):
feed_type = Rss201rev2Feed
"""博客RSS订阅Feed类生成符合RSS 2.0标准的订阅内容"""
feed_type = Rss201rev2Feed # 指定Feed类型为RSS 2.0
description = '大巧无工,重剑无锋.'
title = "且听风吟 大巧无工,重剑无锋. "
link = "/feed/"
description = '大巧无工,重剑无锋.' # Feed描述
title = "且听风吟 大巧无工,重剑无锋. " # Feed标题订阅列表中显示
link = "/feed/" # Feed的URL地址
def author_name(self):
"""Feed作者名称取系统第一个用户的昵称"""
return get_user_model().objects.first().nickname
def author_link(self):
"""Feed作者链接取系统第一个用户的个人主页URL"""
return get_user_model().objects.first().get_absolute_url()
def items(self):
"""Feed订阅的内容列表最新5篇已发布status='p'的文章type='a'"""
return Article.objects.filter(type='a', status='p').order_by('-pub_time')[:5]
def item_title(self, item):
"""单个订阅项(文章)的标题:使用文章标题"""
return item.title
def item_description(self, item):
"""单个订阅项的描述将文章Markdown正文转为HTML"""
return CommonMarkdown.get_markdown(item.body)
def feed_copyright(self):
"""Feed版权信息显示当前年份的版权声明"""
now = timezone.now()
return "Copyright© {year} 且听风吟".format(year=now.year)
def item_link(self, item):
"""单个订阅项的链接文章详情页URL"""
return item.get_absolute_url()
def item_guid(self, item):
return
"""单个订阅项的唯一标识(预留方法,暂未实现)"""
return

@ -1,3 +1,4 @@
#gq:
from django.contrib import admin
from django.contrib.admin.models import DELETION
from django.contrib.contenttypes.models import ContentType
@ -7,54 +8,32 @@ from django.utils.html import escape
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
class LogEntryAdmin(admin.ModelAdmin):
list_filter = [
'content_type'
]
search_fields = [
'object_repr',
'change_message'
]
list_display_links = [
'action_time',
'get_change_message',
]
list_display = [
'action_time',
'user_link',
'content_type',
'object_link',
'get_change_message',
]
"""Admin操作日志自定义管理类优化展示与权限控制"""
list_filter = ['content_type'] # 按内容类型筛选
search_fields = ['object_repr', 'change_message'] # 搜索对象描述、操作信息
list_display_links = ['action_time', 'get_change_message'] # 可点击跳转字段
list_display = ['action_time', 'user_link', 'content_type', 'object_link', 'get_change_message'] # 列表展示字段
def has_add_permission(self, request):
"""禁用添加:日志自动生成,不允许手动添加"""
return False
def has_change_permission(self, request, obj=None):
return (
request.user.is_superuser or
request.user.has_perm('admin.change_logentry')
) and request.method != 'POST'
"""仅超级用户/有权限用户可查看禁止POST修改"""
return (request.user.is_superuser or request.user.has_perm('admin.change_logentry')) and request.method != 'POST'
def has_delete_permission(self, request, obj=None):
"""禁用删除:日志需保留"""
return False
def object_link(self, obj):
"""操作对象字段非删除操作显示Admin编辑链接"""
object_link = escape(obj.object_repr)
content_type = obj.content_type
if obj.action_flag != DELETION and content_type is not None:
# try returning an actual link instead of object repr string
if obj.action_flag != DELETION and obj.content_type:
try:
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.object_id]
)
object_link = '<a href="{}">{}</a>'.format(url, object_link)
url = reverse(f'admin:{obj.content_type.app_label}_{obj.content_type.model}_change', args=[obj.object_id])
object_link = f'<a href="{url}">{object_link}</a>'
except NoReverseMatch:
pass
return mark_safe(object_link)
@ -63,16 +42,12 @@ class LogEntryAdmin(admin.ModelAdmin):
object_link.short_description = _('object')
def user_link(self, obj):
"""操作用户字段显示用户Admin编辑链接"""
content_type = ContentType.objects.get_for_model(type(obj.user))
user_link = escape(force_str(obj.user))
try:
# try returning an actual link instead of object repr string
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.user.pk]
)
user_link = '<a href="{}">{}</a>'.format(url, user_link)
url = reverse(f'admin:{content_type.app_label}_{content_type.model}_change', args=[obj.user.pk])
user_link = f'<a href="{url}">{user_link}</a>'
except NoReverseMatch:
pass
return mark_safe(user_link)
@ -81,11 +56,11 @@ class LogEntryAdmin(admin.ModelAdmin):
user_link.short_description = _('user')
def get_queryset(self, request):
queryset = super(LogEntryAdmin, self).get_queryset(request)
return queryset.prefetch_related('content_type')
"""预加载content_type优化查询性能"""
return super().get_queryset(request).prefetch_related('content_type')
def get_actions(self, request):
actions = super(LogEntryAdmin, self).get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions
"""移除批量删除操作"""
actions = super().get_actions(request)
del actions['delete_selected']
return actions

@ -1,18 +1,25 @@
#gq:
import logging
# 获取当前模块的日志记录器,方便在插件中输出日志
logger = logging.getLogger(__name__)
class BasePlugin:
# 插件元数据
# 插件元数据,子类必须定义这些属性,否则会在初始化时抛出异常
PLUGIN_NAME = None
PLUGIN_DESCRIPTION = None
PLUGIN_VERSION = None
def __init__(self):
# 检查插件元数据是否完整,不完整则抛出 ValueError 异常
if not all([self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION]):
raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
# 调用插件初始化方法,子类可以在这里完成特定的初始化操作
self.init_plugin()
# 注册插件钩子,子类可以在这里注册需要监听的钩子
self.register_hooks()
def init_plugin(self):
@ -20,6 +27,7 @@ class BasePlugin:
插件初始化逻辑
子类可以重写此方法来实现特定的初始化操作
"""
# 默认行为:输出插件初始化日志
logger.info(f'{self.PLUGIN_NAME} initialized.')
def register_hooks(self):
@ -27,6 +35,7 @@ class BasePlugin:
注册插件钩子
子类可以重写此方法来注册特定的钩子
"""
# 默认行为:空实现,子类根据需要覆盖
pass
def get_plugin_info(self):
@ -34,8 +43,9 @@ class BasePlugin:
获取插件信息
:return: 包含插件元数据的字典
"""
# 返回插件的基本信息,供外部调用者获取
return {
'name': self.PLUGIN_NAME,
'description': self.PLUGIN_DESCRIPTION,
'version': self.PLUGIN_VERSION
}
}

@ -1,13 +1,18 @@
#gq:
import logging
# 获取当前模块的日志记录器,用于输出钩子系统的日志
logger = logging.getLogger(__name__)
# 私有全局字典,用于存储所有注册的钩子
# 结构:{ "hook_name": [callback1, callback2, ...] }
_hooks = {}
def register(hook_name: str, callback: callable):
"""
注册一个钩子回调
如果钩子名不存在则初始化为空列表再将回调函数追加进去
"""
if hook_name not in _hooks:
_hooks[hook_name] = []
@ -17,8 +22,9 @@ def register(hook_name: str, callback: callable):
def run_action(hook_name: str, *args, **kwargs):
"""
执行一个 Action Hook
它会按顺序执行所有注册到该钩子上的回调函数
执行一个 Action Hook动作钩子
它会按顺序执行所有注册到该钩子上的回调函数无返回值
如果某个回调抛出异常记录错误日志并继续执行下一个回调
"""
if hook_name in _hooks:
logger.debug(f"Running action hook '{hook_name}'")
@ -31,8 +37,10 @@ def run_action(hook_name: str, *args, **kwargs):
def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行一个 Filter Hook
它会把 value 依次传递给所有注册的回调函数进行处理
执行一个 Filter Hook过滤钩子
它会将 value 依次传递给所有注册的回调函数进行处理每次处理结果作为下次输入
如果某个回调抛出异常记录错误日志并跳过该回调
最后返回经过所有过滤器处理后的值
"""
if hook_name in _hooks:
logger.debug(f"Applying filter hook '{hook_name}'")
@ -41,4 +49,4 @@ def apply_filters(hook_name: str, value, *args, **kwargs):
value = callback(value, *args, **kwargs)
except Exception as e:
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
return value
return value

@ -1,19 +1,30 @@
#gq:
import os
import logging
from django.conf import settings
# 获取当前模块的日志记录器,用于输出插件加载过程中的日志
logger = logging.getLogger(__name__)
def load_plugins():
"""
Dynamically loads and initializes plugins from the 'plugins' directory.
This function is intended to be called when the Django app registry is ready.
动态加载并初始化 plugins 目录下的插件
应在 Django App Registry 完全就绪后调用 AppConfig.ready()
遍历 settings.ACTIVE_PLUGINS 列表依次导入每个插件的 plugin.py 模块
"""
# 遍历配置中启用的插件列表
for plugin_name in settings.ACTIVE_PLUGINS:
# 拼接插件目录绝对路径
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
# 仅当目录存在且目录下包含 plugin.py 文件时才尝试导入
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, 'plugin.py')):
try:
# 使用 __import__ 动态导入插件模块
# 导入路径示例plugins.<plugin_name>.plugin
__import__(f'plugins.{plugin_name}.plugin')
logger.info(f"Successfully loaded plugin: {plugin_name}")
except ImportError as e:
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)
# 导入失败时记录错误日志,包含异常堆栈
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)

@ -1,59 +1,70 @@
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
from blog.models import Article, Category, Tag
#gq:
from django.contrib.sitemaps import Sitemap # Django内置站点地图生成类
from django.urls import reverse # URL反向解析
from blog.models import Article, Category, Tag # 导入博客相关模型
class StaticViewSitemap(Sitemap):
priority = 0.5
changefreq = 'daily'
"""静态页面站点地图:如首页"""
priority = 0.5 # 页面优先级0.0-1.0
changefreq = 'daily' # 内容更新频率
def items(self):
"""返回要包含的静态页面名称列表对应URL名称"""
return ['blog:index', ]
def location(self, item):
"""返回每个静态页面的URL"""
return reverse(item)
class ArticleSiteMap(Sitemap):
changefreq = "monthly"
priority = "0.6"
"""文章页面站点地图"""
changefreq = "monthly" # 文章更新频率
priority = "0.6" # 文章页面优先级
def items(self):
"""返回所有已发布status='p')的文章"""
return Article.objects.filter(status='p')
def lastmod(self, obj):
"""返回文章的最后修改时间"""
return obj.last_modify_time
class CategorySiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.6"
"""分类页面站点地图"""
changefreq = "Weekly" # 分类更新频率
priority = "0.6" # 分类页面优先级
def items(self):
"""返回所有分类"""
return Category.objects.all()
def lastmod(self, obj):
"""返回分类的最后修改时间"""
return obj.last_modify_time
class TagSiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.3"
"""标签页面站点地图"""
changefreq = "Weekly" # 标签更新频率
priority = "0.3" # 标签页面优先级(低于文章和分类)
def items(self):
"""返回所有标签"""
return Tag.objects.all()
def lastmod(self, obj):
"""返回标签的最后修改时间"""
return obj.last_modify_time
class UserSiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.3"
"""用户页面站点地图"""
changefreq = "Weekly" # 用户信息更新频率
priority = "0.3" # 用户页面优先级
def items(self):
"""返回所有发布过文章的不重复作者列表"""
return list(set(map(lambda x: x.author, Article.objects.all())))
def lastmod(self, obj):
return obj.date_joined
"""返回用户的注册时间(作为站点地图的最后更新时间)"""
return obj.date_joined

@ -1,21 +1,26 @@
#gq:
import logging
import requests # HTTP请求库用于向搜索引擎提交URL
from django.conf import settings # 导入Django项目配置
import requests
from django.conf import settings
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # 初始化日志对象
class SpiderNotify():
"""搜索引擎爬虫通知类向百度等搜索引擎提交新URL加速收录"""
@staticmethod
def baidu_notify(urls):
"""向百度搜索引擎提交URL列表请求收录"""
try:
# 格式化URL数据每行一个URL百度要求的提交格式
data = '\n'.join(urls)
# 发送POST请求到百度收录接口配置在settings.BAIDU_NOTIFY_URL
result = requests.post(settings.BAIDU_NOTIFY_URL, data=data)
logger.info(result.text)
logger.info(result.text) # 记录提交结果
except Exception as e:
logger.error(e)
logger.error(e) # 记录提交失败异常
@staticmethod
def notify(url):
SpiderNotify.baidu_notify(url)
"""通用通知方法:调用百度收录提交(可扩展支持其他搜索引擎)"""
SpiderNotify.baidu_notify(url)

@ -1,15 +1,21 @@
#gq:
from django.test import TestCase
from djangoblog.utils import *
class DjangoBlogTest(TestCase):
"""测试 djangoblog.utils 模块的核心工具函数"""
def setUp(self):
pass
def test_utils(self):
# 测试SHA-256哈希生成
md5 = get_sha256('test')
self.assertIsNotNone(md5)
# 测试Markdown解析含标题、代码块、链接
c = CommonMarkdown.get_markdown('''
# Title1

@ -1,3 +1,4 @@
#gq:
"""djangoblog URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
@ -17,48 +18,59 @@ from django.conf import settings
from django.conf.urls.i18n import i18n_patterns
from django.conf.urls.static import static
from django.contrib.sitemaps.views import sitemap
from django.urls import path, include
from django.urls import re_path
from django.urls import path, include, re_path
from haystack.views import search_view_factory
from blog.views import EsSearchView
from djangoblog.admin_site import admin_site
from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm
from djangoblog.feeds import DjangoBlogFeed
from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
from djangoblog.sitemap import (
ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
)
# 站点地图配置
sitemaps = {
'blog': ArticleSiteMap,
'Category': CategorySiteMap,
'Tag': TagSiteMap,
'User': UserSiteMap,
'static': StaticViewSitemap
'blog': ArticleSiteMap, # 文章
'Category': CategorySiteMap, # 分类
'Tag': TagSiteMap, # 标签
'User': UserSiteMap, # 用户
'static': StaticViewSitemap # 静态页面
}
handler404 = 'blog.views.page_not_found_view'
handler500 = 'blog.views.server_error_view'
handle403 = 'blog.views.permission_denied_view'
# 自定义错误页面
handler404 = 'blog.views.page_not_found_view' # 404
handler500 = 'blog.views.server_error_view' # 500
handle403 = 'blog.views.permission_denied_view'# 403
urlpatterns = [
path('i18n/', include('django.conf.urls.i18n')),
path('i18n/', include('django.conf.urls.i18n')), # 国际化
]
# 国际化URL多语言支持
urlpatterns += i18n_patterns(
re_path(r'^admin/', admin_site.urls),
re_path(r'', include('blog.urls', namespace='blog')),
re_path(r'mdeditor/', include('mdeditor.urls')),
re_path(r'', include('comments.urls', namespace='comment')),
re_path(r'', include('accounts.urls', namespace='account')),
re_path(r'', include('oauth.urls', namespace='oauth')),
re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps},
name='django.contrib.sitemaps.views.sitemap'),
re_path(r'^feed/$', DjangoBlogFeed()),
re_path(r'^rss/$', DjangoBlogFeed()),
re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
name='search'),
re_path(r'', include('servermanager.urls', namespace='servermanager')),
re_path(r'', include('owntracks.urls', namespace='owntracks'))
, prefix_default_language=False) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
re_path(r'^admin/', admin_site.urls), # 后台管理
re_path(r'', include('blog.urls', namespace='blog')), # 博客主体
re_path(r'mdeditor/', include('mdeditor.urls')), # Markdown编辑器
re_path(r'', include('comments.urls', namespace='comment')), # 评论
re_path(r'', include('accounts.urls', namespace='account')), # 账户
re_path(r'', include('oauth.urls', namespace='oauth')), # 第三方登录
re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps}), # 站点地图
re_path(r'^feed/$', DjangoBlogFeed()), # RSS订阅
re_path(r'^rss/$', DjangoBlogFeed()), # RSS订阅(备用)
# Elasticsearch搜索
re_path('^search', search_view_factory(
view_class=EsSearchView,
form_class=ElasticSearchModelSearchForm
), name='search'),
re_path(r'', include('servermanager.urls', namespace='servermanager')), # 服务器管理
re_path(r'', include('owntracks.urls', namespace='owntracks')), # 位置跟踪
prefix_default_language=False
)
# 静态文件URL
urlpatterns += static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
# 开发环境媒体文件URL
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL,
document_root=settings.MEDIA_ROOT)
urlpatterns += static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

@ -1,13 +1,14 @@
#gq:
#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import random
import string
import uuid
from hashlib import sha256
from urllib.parse import quote
import bleach
import markdown
@ -15,52 +16,51 @@ import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.core.cache.utils import make_template_fragment_key
from django.http import HttpRequest
from django.templatetags.static import static
from django.utils.cache import get_cache_key
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""获取最新文章和评论的ID"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""计算字符串的SHA-256哈希值"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""函数缓存装饰器默认缓存3分钟"""
def wrapper(func):
def news(*args, **kwargs):
try:
# 尝试从请求对象获取缓存键
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
# 否则根据函数和参数生成唯一键
unique_str = repr((func, args, kwargs))
key = get_sha256(unique_str)
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
# 返回缓存值,处理空值标记
return None if str(value) == '__default_cache_value__' else value
# 缓存未命中,执行函数并缓存结果
logger.debug(f'cache_decorator set cache:{func.__name__} key:{key}')
value = func(*args, **kwargs)
cache.set(key, value if value is not None else '__default_cache_value__', expiration)
return value
return news
@ -68,165 +68,143 @@ def cache_decorator(expiration=3 * 60):
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
"""刷新指定URL的视图缓存"""
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
logger.info(f'expire_view_cache:get key:{path}')
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
site = Site.objects.get_current()
return site
"""获取当前站点信息(带缓存)"""
return Site.objects.get_current()
class CommonMarkdown:
"""Markdown解析工具类"""
@staticmethod
def _convert_markdown(value):
md = markdown.Markdown(
extensions=[
'extra',
'codehilite',
'toc',
'tables',
]
)
body = md.convert(value)
toc = md.toc
return body, toc
"""内部方法执行Markdown转换返回HTML和目录"""
md = markdown.Markdown(extensions=['extra', 'codehilite', 'toc', 'tables'])
return md.convert(value), md.toc
@staticmethod
def get_markdown_with_toc(value):
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
"""转换Markdown为HTML含目录"""
return CommonMarkdown._convert_markdown(value)
@staticmethod
def get_markdown(value):
body, toc = CommonMarkdown._convert_markdown(value)
"""转换Markdown为HTML不含目录"""
body, _ = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""发送邮件(通过信号解耦)"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
send_email_signal.send(send_email.__class__, emailto=emailto, title=title, content=content)
def generate_code() -> str:
"""生成随机数验证码"""
"""生成6位随机数字验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
"""将字典转换为URL查询字符串"""
return '&'.join([f'{quote(k, safe="/")}={quote(v, safe="/")}' for k, v in dict.items()])
def get_blog_setting():
"""获取博客系统设置(带缓存,无数据时初始化)"""
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value)
return value
from blog.models import BlogSettings
if not BlogSettings.objects.count():
# 初始化默认设置
setting = BlogSettings(
site_name='djangoblog',
site_description='基于Django的博客系统',
site_seo_description='基于Django的博客系统',
site_keywords='Django,Python',
article_sub_length=300,
sidebar_article_count=10,
sidebar_comment_count=5,
show_google_adsense=False,
open_site_comment=True,
analytics_code='',
beian_code='',
show_gongan_code=False,
comment_need_review=False
)
setting.save()
value = BlogSettings.objects.first()
cache.set('get_blog_setting', value)
return value
def save_user_avatar(url):
'''
保存用户头像
:param url:头像url
:return: 本地路径
'''
logger.info(url)
def save_user_avatar(url):
"""下载并保存用户头像到本地返回静态文件URL"""
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
os.makedirs(basedir, exist_ok=True)
# 确定文件扩展名
ext = os.path.splitext(url)[1] if any(
url.endswith(ext) for ext in ['.jpg', '.png', 'jpeg', '.gif']) else '.jpg'
save_filename = f'{uuid.uuid4().hex}{ext}'
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename)
return static(f'avatar/{save_filename}')
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png')
return static('blog/img/avatar.png') # 返回默认头像
def delete_sidebar_cache():
"""删除侧边栏相关缓存"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
keys = [f"sidebar{x}" for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
logger.info(f'delete sidebar key:{k}')
cache.delete(k)
def delete_view_cache(prefix, keys):
from django.core.cache.utils import make_template_fragment_key
"""删除指定模板片段缓存"""
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""获取静态资源基础URL"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
site = get_current_site()
return f'http://{site.domain}/static/'
# HTML清理配置
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
def sanitize_html(html):
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)
"""清理HTML只保留允许的标签和属性"""
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)

File diff suppressed because it is too large Load Diff
Loading…
Cancel
Save