Compare commits

...

2 Commits

@ -0,0 +1,3 @@
# 设置Django应用的默认配置类
# 当Django启动时会自动使用这里指定的应用配置类
default_app_config = 'djangoblog.apps.DjangoblogAppConfig'

@ -0,0 +1,81 @@
from django.contrib.admin import AdminSite
from django.contrib.admin.models import LogEntry
from django.contrib.sites.admin import SiteAdmin
from django.contrib.sites.models import Site
# 导入各个应用的admin模块和模型
from accounts.admin import *
from blog.admin import *
from blog.models import *
from comments.admin import *
from comments.models import *
from djangoblog.logentryadmin import LogEntryAdmin
from oauth.admin import *
from oauth.models import *
from owntracks.admin import *
from owntracks.models import *
from servermanager.admin import *
from servermanager.models import *
class DjangoBlogAdminSite(AdminSite):
"""自定义DjangoBlog管理站点"""
# 管理站点头部标题
site_header = 'djangoblog administration'
# 管理站点页面标题
site_title = 'djangoblog site admin'
def __init__(self, name='admin'):
"""初始化管理站点"""
super().__init__(name)
def has_permission(self, request):
"""检查用户权限:只允许超级用户访问"""
return request.user.is_superuser
# 以下是注释掉的URL配置示例可用于添加自定义管理页面
# def get_urls(self):
# urls = super().get_urls()
# from django.urls import path
# from blog.views import refresh_memcache
#
# my_urls = [
# path('refresh/', self.admin_view(refresh_memcache), name="refresh"),
# ]
# return urls + my_urls
# 创建DjangoBlog管理站点实例
admin_site = DjangoBlogAdminSite(name='admin')
# 注册博客相关的模型和管理类
admin_site.register(Article, ArticlelAdmin)
admin_site.register(Category, CategoryAdmin)
admin_site.register(Tag, TagAdmin)
admin_site.register(Links, LinksAdmin)
admin_site.register(SideBar, SideBarAdmin)
admin_site.register(BlogSettings, BlogSettingsAdmin)
# 注册服务器管理相关的模型
admin_site.register(commands, CommandsAdmin)
admin_site.register(EmailSendLog, EmailSendLogAdmin)
# 注册用户账户模型
admin_site.register(BlogUser, BlogUserAdmin)
# 注册评论模型
admin_site.register(Comment, CommentAdmin)
# 注册OAuth认证相关模型
admin_site.register(OAuthUser, OAuthUserAdmin)
admin_site.register(OAuthConfig, OAuthConfigAdmin)
# 注册OwnTracks位置跟踪模型
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin)
# 注册Django站点模型
admin_site.register(Site, SiteAdmin)
# 注册Django日志条目模型
admin_site.register(LogEntry, LogEntryAdmin)

@ -0,0 +1,20 @@
from django.apps import AppConfig
class DjangoblogAppConfig(AppConfig):
"""Djangoblog应用的配置类"""
# 设置默认的自动主键字段类型为BigAutoField64位自增整数
default_auto_field = 'django.db.models.BigAutoField'
# 指定应用名称
name = 'djangoblog'
def ready(self):
"""应用准备就绪时执行的方法"""
# 调用父类的ready方法
super().ready()
# 导入并加载插件
# 这里在应用启动时自动加载所有插件
from .plugin_manage.loader import load_plugins
load_plugins()

@ -0,0 +1,162 @@
import _thread
import logging
import django.dispatch
from django.conf import settings
from django.contrib.admin.models import LogEntry
from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.core.mail import EmailMultiAlternatives
from django.db.models.signals import post_save
from django.dispatch import receiver
# 导入自定义模型和工具函数
from comments.models import Comment
from comments.utils import send_comment_email
from djangoblog.spider_notify import SpiderNotify
from djangoblog.utils import cache, expire_view_cache, delete_sidebar_cache, delete_view_cache
from djangoblog.utils import get_current_site
from oauth.models import OAuthUser
# 获取logger实例
logger = logging.getLogger(__name__)
# 定义自定义信号
# OAuth用户登录信号传递用户id
oauth_user_login_signal = django.dispatch.Signal(['id'])
# 发送邮件信号,传递收件人、标题和内容
send_email_signal = django.dispatch.Signal(
['emailto', 'title', 'content'])
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
"""发送邮件信号处理器"""
# 从信号参数中获取邮件相关信息
emailto = kwargs['emailto']
title = kwargs['title']
content = kwargs['content']
# 创建邮件消息对象
msg = EmailMultiAlternatives(
title,
content,
from_email=settings.DEFAULT_FROM_EMAIL,
to=emailto)
msg.content_subtype = "html" # 设置邮件内容类型为HTML
# 记录邮件发送日志
from servermanager.models import EmailSendLog
log = EmailSendLog()
log.title = title
log.content = content
log.emailto = ','.join(emailto)
try:
# 尝试发送邮件
result = msg.send()
log.send_result = result > 0 # 发送成功结果为True
except Exception as e:
# 记录发送失败日志
logger.error(f"失败邮箱号: {emailto}, {e}")
log.send_result = False
log.save() # 保存邮件发送日志
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
"""OAuth用户登录信号处理器"""
id = kwargs['id']
oauthuser = OAuthUser.objects.get(id=id)
site = get_current_site().domain
# 如果用户头像不在当前站点域名下,则保存头像到本地
if oauthuser.picture and not oauthuser.picture.find(site) >= 0:
from djangoblog.utils import save_user_avatar
oauthuser.picture = save_user_avatar(oauthuser.picture)
oauthuser.save()
# 删除侧边栏缓存
delete_sidebar_cache()
@receiver(post_save)
def model_post_save_callback(
sender,
instance,
created,
raw,
using,
update_fields,
**kwargs):
"""模型保存后的通用回调函数"""
clearcache = False
# 如果是LogEntryDjango管理员日志直接返回
if isinstance(instance, LogEntry):
return
# 检查实例是否有get_full_url方法通常是有URL的模型
if 'get_full_url' in dir(instance):
# 判断是否只是更新浏览量
is_update_views = update_fields == {'views'}
# 如果不是测试环境且不是更新浏览量,则通知搜索引擎
if not settings.TESTING and not is_update_views:
try:
notify_url = instance.get_full_url()
SpiderNotify.baidu_notify([notify_url]) # 通知百度搜索引擎
except Exception as ex:
logger.error("notify sipder", ex)
# 如果不是更新浏览量,设置清除缓存标志
if not is_update_views:
clearcache = True
# 如果是评论模型
if isinstance(instance, Comment):
# 如果评论是启用的
if instance.is_enable:
path = instance.article.get_absolute_url()
site = get_current_site().domain
# 处理站点域名(移除端口号)
if site.find(':') > 0:
site = site[0:site.find(':')]
# 使文章详情页缓存过期
expire_view_cache(
path,
servername=site,
serverport=80,
key_prefix='blogdetail')
# 删除SEO处理器缓存
if cache.get('seo_processor'):
cache.delete('seo_processor')
# 删除文章评论缓存
comment_cache_key = 'article_comments_{id}'.format(
id=instance.article.id)
cache.delete(comment_cache_key)
# 删除侧边栏缓存
delete_sidebar_cache()
# 删除文章评论视图缓存
delete_view_cache('article_comments', [str(instance.article.pk)])
# 在新线程中发送评论通知邮件
_thread.start_new_thread(send_comment_email, (instance,))
# 如果需要清除缓存
if clearcache:
cache.clear() # 清除所有缓存
@receiver(user_logged_in)
@receiver(user_logged_out)
def user_auth_callback(sender, request, user, **kwargs):
"""用户登录/登出信号处理器"""
if user and user.username:
logger.info(user) # 记录用户信息
delete_sidebar_cache() # 删除侧边栏缓存
# cache.clear() # 注释掉的清除所有缓存代码

@ -0,0 +1,216 @@
from django.utils.encoding import force_str
from elasticsearch_dsl import Q
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
from haystack.forms import ModelSearchForm
from haystack.models import SearchResult
from haystack.utils import log as logging
# 导入自定义的Elasticsearch文档和模型
from blog.documents import ArticleDocument, ArticleDocumentManager
from blog.models import Article
# 获取日志记录器
logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
"""Elasticsearch搜索后端实现"""
def __init__(self, connection_alias, **connection_options):
"""初始化Elasticsearch后端"""
super(
ElasticSearchBackend,
self).__init__(
connection_alias,
**connection_options)
self.manager = ArticleDocumentManager() # 文章文档管理器
self.include_spelling = True # 是否包含拼写建议
def _get_models(self, iterable):
"""获取模型实例并转换为文档"""
# 如果有提供模型列表则使用,否则获取所有文章
models = iterable if iterable and iterable[0] else Article.objects.all()
docs = self.manager.convert_to_doc(models) # 将模型转换为Elasticsearch文档
return docs
def _create(self, models):
"""创建索引并重建文档"""
self.manager.create_index() # 创建Elasticsearch索引
docs = self._get_models(models)
self.manager.rebuild(docs) # 重建所有文档
def _delete(self, models):
"""删除文档"""
for m in models:
m.delete()
return True
def _rebuild(self, models):
"""重建索引文档"""
models = models if models else Article.objects.all()
docs = self.manager.convert_to_doc(models)
self.manager.update_docs(docs) # 更新文档
def update(self, index, iterable, commit=True):
"""更新文档"""
models = self._get_models(iterable)
self.manager.update_docs(models)
def remove(self, obj_or_string):
"""移除指定对象"""
models = self._get_models([obj_or_string])
self._delete(models)
def clear(self, models=None, commit=True):
"""清空索引"""
self.remove(None)
@staticmethod
def get_suggestion(query: str) -> str:
"""获取搜索建议词,如果没有找到建议词则返回原搜索词"""
# 构建搜索建议查询
search = ArticleDocument.search() \
.query("match", body=query) \
.suggest('suggest_search', query, term={'field': 'body'}) \
.execute()
keywords = []
# 处理建议结果
for suggest in search.suggest.suggest_search:
if suggest["options"]:
keywords.append(suggest["options"][0]["text"]) # 使用建议词
else:
keywords.append(suggest["text"]) # 使用原词
return ' '.join(keywords)
@log_query # 记录查询日志的装饰器
def search(self, query_string, **kwargs):
"""执行搜索查询"""
logger.info('search query_string:' + query_string)
# 获取分页参数
start_offset = kwargs.get('start_offset')
end_offset = kwargs.get('end_offset')
# 推荐词搜索:如果启用建议,则获取建议词
if getattr(self, "is_suggest", None):
suggestion = self.get_suggestion(query_string)
else:
suggestion = query_string
# 构建搜索查询在标题和正文中匹配设置最小匹配度70%
q = Q('bool',
should=[Q('match', body=suggestion), Q('match', title=suggestion)],
minimum_should_match="70%")
# 执行搜索过滤已发布的状态为p且类型为a的文章
search = ArticleDocument.search() \
.query('bool', filter=[q]) \
.filter('term', status='p') \
.filter('term', type='a') \
.source(False)[start_offset: end_offset] # 不返回源文档内容,只返回元数据
results = search.execute()
hits = results['hits'].total # 总命中数
raw_results = []
# 处理搜索结果
for raw_result in results['hits']['hits']:
app_label = 'blog'
model_name = 'Article'
additional_fields = {}
result_class = SearchResult
# 创建搜索结果对象
result = result_class(
app_label,
model_name,
raw_result['_id'], # 文档ID
raw_result['_score'], # 相关性分数
**additional_fields)
raw_results.append(result)
facets = {}
# 如果查询词与建议词不同,则设置拼写建议
spelling_suggestion = None if query_string == suggestion else suggestion
return {
'results': raw_results, # 搜索结果列表
'hits': hits, # 总命中数
'facets': facets, # 分面搜索数据
'spelling_suggestion': spelling_suggestion, # 拼写建议
}
class ElasticSearchQuery(BaseSearchQuery):
"""Elasticsearch查询构建器"""
def _convert_datetime(self, date):
"""转换日期时间格式"""
if hasattr(date, 'hour'):
return force_str(date.strftime('%Y%m%d%H%M%S')) # 包含时间的格式
else:
return force_str(date.strftime('%Y%m%d000000')) # 只包含日期的格式
def clean(self, query_fragment):
"""
清理用户输入的查询片段转义保留字符
Whoosh 1.X与此不同不再使用反斜杠转义保留字符
而是应该引用整个单词
"""
words = query_fragment.split()
cleaned_words = []
for word in words:
# 处理保留字
if word in self.backend.RESERVED_WORDS:
word = word.replace(word, word.lower())
# 处理保留字符
for char in self.backend.RESERVED_CHARACTERS:
if char in word:
word = "'%s'" % word # 用引号包围包含保留字符的单词
break
cleaned_words.append(word)
return ' '.join(cleaned_words)
def build_query_fragment(self, field, filter_type, value):
"""构建查询片段"""
return value.query_string
def get_count(self):
"""获取搜索结果数量"""
results = self.get_results()
return len(results) if results else 0
def get_spelling_suggestion(self, preferred_query=None):
"""获取拼写建议"""
return self._spelling_suggestion
def build_params(self, spelling_query=None):
"""构建查询参数"""
kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query)
return kwargs
class ElasticSearchModelSearchForm(ModelSearchForm):
"""Elasticsearch模型搜索表单"""
def search(self):
"""执行搜索,根据参数决定是否使用建议搜索"""
# 是否建议搜索从请求数据中获取is_suggest参数
self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no"
sqs = super().search() # 调用父类搜索方法
return sqs
class ElasticSearchEngine(BaseEngine):
"""Elasticsearch搜索引擎"""
backend = ElasticSearchBackend # 指定后端类
query = ElasticSearchQuery # 指定查询类

@ -0,0 +1,58 @@
from django.contrib.auth import get_user_model
from django.contrib.syndication.views import Feed
from django.utils import timezone
from django.utils.feedgenerator import Rss201rev2Feed
# 导入自定义模型和工具
from blog.models import Article
from djangoblog.utils import CommonMarkdown
class DjangoBlogFeed(Feed):
"""DjangoBlog的RSS订阅源类"""
# 指定Feed类型为RSS 2.0
feed_type = Rss201rev2Feed
# Feed的描述信息
description = '大巧无工,重剑无锋.'
# Feed的标题
title = "且听风吟 大巧无工,重剑无锋. "
# Feed的链接地址
link = "/feed/"
def author_name(self):
"""获取作者名称 - 返回第一个用户的昵称"""
return get_user_model().objects.first().nickname
def author_link(self):
"""获取作者链接 - 返回第一个用户的绝对URL"""
return get_user_model().objects.first().get_absolute_url()
def items(self):
"""获取要在Feed中显示的项目列表"""
# 返回最近发布的5篇文章过滤条件类型为'article'且状态为'published'
return Article.objects.filter(type='a', status='p').order_by('-pub_time')[:5]
def item_title(self, item):
"""获取单个项目的标题"""
return item.title
def item_description(self, item):
"""获取单个项目的描述 - 将Markdown内容转换为HTML"""
return CommonMarkdown.get_markdown(item.body)
def feed_copyright(self):
"""获取Feed的版权信息"""
now = timezone.now()
return "Copyright© {year} 且听风吟".format(year=now.year)
def item_link(self, item):
"""获取单个项目的链接"""
return item.get_absolute_url()
def item_guid(self, item):
"""获取单个项目的全局唯一标识符(当前未实现)"""
# 注意:这个方法目前没有返回值,可能需要根据需求实现
# 通常应该返回一个唯一标识项目的字符串如文章的ID或永久链接
pass

@ -0,0 +1,113 @@
from django.contrib import admin
from django.contrib.admin.models import DELETION
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse, NoReverseMatch
from django.utils.encoding import force_str
from django.utils.html import escape
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
class LogEntryAdmin(admin.ModelAdmin):
"""Django管理员日志条目的自定义管理界面"""
# 列表页过滤器配置:按内容类型过滤
list_filter = [
'content_type'
]
# 搜索字段配置:可按对象表示和变更消息搜索
search_fields = [
'object_repr',
'change_message'
]
# 列表页中可点击的链接字段
list_display_links = [
'action_time',
'get_change_message',
]
# 列表页显示的字段
list_display = [
'action_time', # 操作时间
'user_link', # 用户链接(自定义)
'content_type', # 内容类型
'object_link', # 对象链接(自定义)
'get_change_message', # 变更消息
]
def has_add_permission(self, request):
"""禁止添加新的日志条目"""
return False
def has_change_permission(self, request, obj=None):
"""修改权限控制只允许超级用户或具有特定权限的用户查看不允许POST修改"""
return (
request.user.is_superuser or
request.user.has_perm('admin.change_logentry')
) and request.method != 'POST'
def has_delete_permission(self, request, obj=None):
"""禁止删除日志条目"""
return False
def object_link(self, obj):
"""生成对象链接的显示"""
object_link = escape(obj.object_repr) # 转义对象表示字符串
content_type = obj.content_type
# 如果不是删除操作且内容类型存在,尝试生成可点击的链接
if obj.action_flag != DELETION and content_type is not None:
try:
# 构建管理员修改页面的URL
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.object_id]
)
# 创建HTML链接
object_link = '<a href="{}">{}</a>'.format(url, object_link)
except NoReverseMatch:
# 如果无法生成URL保持原样
pass
return mark_safe(object_link) # 标记为安全HTML
# 设置对象链接字段的排序和显示名称
object_link.admin_order_field = 'object_repr'
object_link.short_description = _('object')
def user_link(self, obj):
"""生成用户链接的显示"""
content_type = ContentType.objects.get_for_model(type(obj.user))
user_link = escape(force_str(obj.user)) # 转义用户字符串
try:
# 构建用户修改页面的URL
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.user.pk]
)
# 创建HTML链接
user_link = '<a href="{}">{}</a>'.format(url, user_link)
except NoReverseMatch:
# 如果无法生成URL保持原样
pass
return mark_safe(user_link) # 标记为安全HTML
# 设置用户链接字段的排序和显示名称
user_link.admin_order_field = 'user'
user_link.short_description = _('user')
def get_queryset(self, request):
"""获取查询集预取content_type关系以提高性能"""
queryset = super(LogEntryAdmin, self).get_queryset(request)
return queryset.prefetch_related('content_type')
def get_actions(self, request):
"""获取可用的批量操作,移除删除选中操作"""
actions = super(LogEntryAdmin, self).get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected'] # 移除批量删除选项
return actions

@ -0,0 +1,41 @@
import logging
logger = logging.getLogger(__name__)
class BasePlugin:
# 插件元数据
PLUGIN_NAME = None
PLUGIN_DESCRIPTION = None
PLUGIN_VERSION = None
def __init__(self):
if not all([self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION]):
raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
self.init_plugin()
self.register_hooks()
def init_plugin(self):
"""
插件初始化逻辑
子类可以重写此方法来实现特定的初始化操作
"""
logger.info(f'{self.PLUGIN_NAME} initialized.')
def register_hooks(self):
"""
注册插件钩子
子类可以重写此方法来注册特定的钩子
"""
pass
def get_plugin_info(self):
"""
获取插件信息
:return: 包含插件元数据的字典
"""
return {
'name': self.PLUGIN_NAME,
'description': self.PLUGIN_DESCRIPTION,
'version': self.PLUGIN_VERSION
}

@ -0,0 +1,7 @@
ARTICLE_DETAIL_LOAD = 'article_detail_load'
ARTICLE_CREATE = 'article_create'
ARTICLE_UPDATE = 'article_update'
ARTICLE_DELETE = 'article_delete'
ARTICLE_CONTENT_HOOK_NAME = "the_content"

@ -0,0 +1,44 @@
import logging
logger = logging.getLogger(__name__)
_hooks = {}
def register(hook_name: str, callback: callable):
"""
注册一个钩子回调
"""
if hook_name not in _hooks:
_hooks[hook_name] = []
_hooks[hook_name].append(callback)
logger.debug(f"Registered hook '{hook_name}' with callback '{callback.__name__}'")
def run_action(hook_name: str, *args, **kwargs):
"""
执行一个 Action Hook
它会按顺序执行所有注册到该钩子上的回调函数
"""
if hook_name in _hooks:
logger.debug(f"Running action hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
callback(*args, **kwargs)
except Exception as e:
logger.error(f"Error running action hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行一个 Filter Hook
它会把 value 依次传递给所有注册的回调函数进行处理
"""
if hook_name in _hooks:
logger.debug(f"Applying filter hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
value = callback(value, *args, **kwargs)
except Exception as e:
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
return value

@ -0,0 +1,19 @@
import os
import logging
from django.conf import settings
logger = logging.getLogger(__name__)
def load_plugins():
"""
Dynamically loads and initializes plugins from the 'plugins' directory.
This function is intended to be called when the Django app registry is ready.
"""
for plugin_name in settings.ACTIVE_PLUGINS:
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, 'plugin.py')):
try:
__import__(f'plugins.{plugin_name}.plugin')
logger.info(f"Successfully loaded plugin: {plugin_name}")
except ImportError as e:
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)

@ -0,0 +1,345 @@
"""
Django settings for djangoblog project.
Generated by 'django-admin startproject' using Django 1.10.2.
For more information on this file, see
https://docs.djangoproject.com/en/1.10/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.10/ref/settings/
"""
import os
import sys
from pathlib import Path
from django.utils.translation import gettext_lazy as _
def env_to_bool(env, default):
"""将环境变量转换为布尔值"""
str_val = os.environ.get(env)
return default if str_val is None else str_val == 'True'
# 构建项目内部路径BASE_DIR / 'subdir'
BASE_DIR = Path(__file__).resolve().parent.parent
# 快速开发配置 - 不适用于生产环境
# 参见 https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/
# 安全警告:在生产环境中保持密钥保密!
SECRET_KEY = os.environ.get(
'DJANGO_SECRET_KEY') or 'n9ceqv38)#&mwuat@(mjb_p%em$e8$qyr#fw9ot!=ba6lijx-6'
# 安全警告:在生产环境中不要开启调试模式!
DEBUG = env_to_bool('DJANGO_DEBUG', True)
# DEBUG = False
TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test' # 检测是否在测试模式
# 允许的主机名
# ALLOWED_HOSTS = []
ALLOWED_HOSTS = ['*', '127.0.0.1', 'example.com']
# Django 4.0新增配置受信任的CSRF来源
CSRF_TRUSTED_ORIGINS = ['http://example.com']
# 应用定义
INSTALLED_APPS = [
# 'django.contrib.admin',
'django.contrib.admin.apps.SimpleAdminConfig', # 使用简化的管理员配置
'django.contrib.auth', # 认证系统
'django.contrib.contenttypes', # 内容类型框架
'django.contrib.sessions', # 会话框架
'django.contrib.messages', # 消息框架
'django.contrib.staticfiles', # 静态文件管理
'django.contrib.sites', # 站点框架
'django.contrib.sitemaps', # 站点地图
'mdeditor', # Markdown编辑器
'haystack', # 搜索框架
'blog', # 博客应用
'accounts', # 账户应用
'comments', # 评论应用
'oauth', # OAuth认证
'servermanager', # 服务器管理
'owntracks', # 位置跟踪
'compressor', # 静态文件压缩
'djangoblog' # 主应用
]
# 中间件配置
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware', # 安全中间件
'django.contrib.sessions.middleware.SessionMiddleware', # 会话中间件
'django.middleware.locale.LocaleMiddleware', # 国际化中间件
'django.middleware.gzip.GZipMiddleware', # Gzip压缩
# 'django.middleware.cache.UpdateCacheMiddleware', # 缓存更新(注释)
'django.middleware.common.CommonMiddleware', # 通用中间件
# 'django.middleware.cache.FetchFromCacheMiddleware', # 缓存获取(注释)
'django.middleware.csrf.CsrfViewMiddleware', # CSRF保护
'django.contrib.auth.middleware.AuthenticationMiddleware', # 认证中间件
'django.contrib.messages.middleware.MessageMiddleware', # 消息中间件
'django.middleware.clickjacking.XFrameOptionsMiddleware', # 点击劫持保护
'django.middleware.http.ConditionalGetMiddleware', # 条件GET请求
'blog.middleware.OnlineMiddleware' # 自定义在线用户中间件
]
# 根URL配置
ROOT_URLCONF = 'djangoblog.urls'
# 模板配置
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates', # Django模板引擎
'DIRS': [os.path.join(BASE_DIR, 'templates')], # 模板目录
'APP_DIRS': True, # 启用应用模板目录
'OPTIONS': {
'context_processors': [ # 上下文处理器
'django.template.context_processors.debug', # 调试信息
'django.template.context_processors.request', # 请求对象
'django.contrib.auth.context_processors.auth', # 认证信息
'django.contrib.messages.context_processors.messages', # 消息框架
'blog.context_processors.seo_processor' # 自定义SEO处理器
],
},
},
]
# WSGI应用配置
WSGI_APPLICATION = 'djangoblog.wsgi.application'
# 数据库配置
# https://docs.djangoproject.com/en/1.10/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql', # MySQL数据库引擎
'NAME': os.environ.get('DJANGO_MYSQL_DATABASE') or 'djangoblog', # 数据库名
'USER': os.environ.get('DJANGO_MYSQL_USER') or 'django_user', # 用户名
'PASSWORD': os.environ.get('DJANGO_MYSQL_PASSWORD') or 'wzm216921', # 密码
'HOST': os.environ.get('DJANGO_MYSQL_HOST') or '127.0.0.1', # 主机
'PORT': int(
os.environ.get('DJANGO_MYSQL_PORT') or 3306), # 端口
'OPTIONS': {
'charset': 'utf8mb4'}, # 字符集配置
}}
# 密码验证
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator', # 用户属性相似性验证
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator', # 最小长度验证
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator', # 常见密码验证
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator', # 数字密码验证
},
]
# 国际化配置
LANGUAGES = (
('en', _('English')), # 英语
('zh-hans', _('Simplified Chinese')), # 简体中文
('zh-hant', _('Traditional Chinese')), # 繁体中文
)
LOCALE_PATHS = (
os.path.join(BASE_DIR, 'locale'), # 本地化文件路径
)
LANGUAGE_CODE = 'zh-hans' # 默认语言
TIME_ZONE = 'Asia/Shanghai' # 时区
USE_I18N = True # 启用国际化
USE_L10N = True # 启用本地化
USE_TZ = False # 不使用时区支持
# 静态文件 (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.10/howto/static-files/
# Haystack搜索配置
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.whoosh_cn_backend.WhooshEngine', # Whoosh搜索引擎
'PATH': os.path.join(os.path.dirname(__file__), 'whoosh_index'), # 索引路径
},
}
# 自动更新搜索索引
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
# 允许用户使用邮箱或用户名登录
AUTHENTICATION_BACKENDS = [
'accounts.user_login_backend.EmailOrUsernameModelBackend']
STATIC_ROOT = os.path.join(BASE_DIR, 'collectedstatic') # 静态文件收集目录
STATIC_URL = '/static/' # 静态文件URL
STATICFILES = os.path.join(BASE_DIR, 'static') # 静态文件目录
AUTH_USER_MODEL = 'accounts.BlogUser' # 自定义用户模型
LOGIN_URL = '/login/' # 登录URL
TIME_FORMAT = '%Y-%m-%d %H:%M:%S' # 时间格式
DATE_TIME_FORMAT = '%Y-%m-%d' # 日期格式
# bootstrap颜色样式
BOOTSTRAP_COLOR_TYPES = [
'default', 'primary', 'success', 'info', 'warning', 'danger'
]
# 分页设置
PAGINATE_BY = 10
# HTTP缓存超时时间
CACHE_CONTROL_MAX_AGE = 2592000
# 缓存设置
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', # 本地内存缓存
'TIMEOUT': 10800, # 缓存超时时间3小时
'LOCATION': 'unique-snowflake', # 缓存位置标识
}
}
# 使用redis作为缓存
if os.environ.get("DJANGO_REDIS_URL"):
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache', # Redis缓存后端
'LOCATION': f'redis://{os.environ.get("DJANGO_REDIS_URL")}', # Redis连接URL
}
}
SITE_ID = 1 # 站点ID
# 百度站长平台通知URL
BAIDU_NOTIFY_URL = os.environ.get('DJANGO_BAIDU_NOTIFY_URL') \
or 'http://data.zz.baidu.com/urls?site=https://www.lylinux.net&token=1uAOGrMsUm5syDGn'
# 邮箱配置:
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' # SMTP后端
EMAIL_USE_TLS = env_to_bool('DJANGO_EMAIL_TLS', False) # 是否使用TLS
EMAIL_USE_SSL = env_to_bool('DJANGO_EMAIL_SSL', True) # 是否使用SSL
EMAIL_HOST = os.environ.get('DJANGO_EMAIL_HOST') or 'smtp.mxhichina.com' # SMTP主机
EMAIL_PORT = int(os.environ.get('DJANGO_EMAIL_PORT') or 465) # SMTP端口
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER') # 邮箱用户
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD') # 邮箱密码
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER # 默认发件人
SERVER_EMAIL = EMAIL_HOST_USER # 服务器邮箱
# 设置debug=false不会处理异常邮件通知
ADMINS = [('admin', os.environ.get('DJANGO_ADMIN_EMAIL') or 'admin@admin.com')] # 管理员邮箱
# 微信管理员密码两次MD5加密
WXADMIN = os.environ.get(
'DJANGO_WXADMIN_PASSWORD') or '995F03AC401D6CABABAEF756FC4D43C7'
# 日志配置
LOG_PATH = os.path.join(BASE_DIR, 'logs') # 日志路径
if not os.path.exists(LOG_PATH):
os.makedirs(LOG_PATH, exist_ok=True) # 创建日志目录
LOGGING = {
'version': 1, # 日志配置版本
'disable_existing_loggers': False, # 不禁用现有日志记录器
'root': {
'level': 'INFO', # 根日志级别
'handlers': ['console', 'log_file'], # 处理器
},
'formatters': { # 日志格式
'verbose': {
'format': '[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d %(module)s] %(message)s', # 详细格式
}
},
'filters': { # 过滤器
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse', # 要求调试模式为False
},
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue', # 要求调试模式为True
},
},
'handlers': { # 处理器
'log_file': {
'level': 'INFO', # 日志级别
'class': 'logging.handlers.TimedRotatingFileHandler', # 按时间轮转的文件处理器
'filename': os.path.join(LOG_PATH, 'djangoblog.log'), # 日志文件路径
'when': 'D', # 按天轮转
'formatter': 'verbose', # 使用详细格式
'interval': 1, # 间隔1天
'delay': True, # 延迟创建
'backupCount': 5, # 保留5个备份
'encoding': 'utf-8' # 文件编码
},
'console': { # 控制台处理器
'level': 'DEBUG',
'filters': ['require_debug_true'], # 仅在调试模式下生效
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'null': { # 空处理器
'class': 'logging.NullHandler',
},
'mail_admins': { # 管理员邮件处理器
'level': 'ERROR',
'filters': ['require_debug_false'], # 仅在非调试模式下生效
'class': 'django.utils.log.AdminEmailHandler'
}
},
'loggers': { # 日志记录器
'djangoblog': {
'handlers': ['log_file', 'console'],
'level': 'INFO',
'propagate': True, # 向上传播
},
'django.request': { # Django请求日志
'handlers': ['mail_admins'], # 发送邮件给管理员
'level': 'ERROR',
'propagate': False, # 不向上传播
}
}
}
# 静态文件查找器
STATICFILES_FINDERS = (
'django.contrib.staticfiles.finders.FileSystemFinder', # 文件系统查找器
'django.contrib.staticfiles.finders.AppDirectoriesFinder', # 应用目录查找器
# other
'compressor.finders.CompressorFinder', # 压缩文件查找器
)
COMPRESS_ENABLED = True # 启用压缩
# COMPRESS_OFFLINE = True # 离线压缩(注释)
# CSS压缩过滤器
COMPRESS_CSS_FILTERS = [
# 从相对URL创建绝对URL
'compressor.filters.css_default.CssAbsoluteFilter',
# CSS压缩器
'compressor.filters.cssmin.CSSMinFilter'
]
# JS压缩过滤器
COMPRESS_JS_FILTERS = [
'compressor.filters.jsmin.JSMinFilter' # JS压缩器
]
MEDIA_ROOT = os.path.join(BASE_DIR, 'uploads') # 媒体文件根目录
MEDIA_URL = '/media/' # 媒体文件URL
X_FRAME_OPTIONS = 'SAMEORIGIN' # 帧选项:同源策略
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField' # 默认自增字段类型
# Elasticsearch配置
if os.environ.get('DJANGO_ELASTICSEARCH_HOST'):
ELASTICSEARCH_DSL = {
'default': {
'hosts': os.environ.get('DJANGO_ELASTICSEARCH_HOST') # Elasticsearch主机
},
}
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine', # Elasticsearch引擎
},
}
# 插件系统
PLUGINS_DIR = BASE_DIR / 'plugins' # 插件目录
ACTIVE_PLUGINS = [ # 激活的插件列表
'article_copyright', # 文章版权
'reading_time', # 阅读时间
'external_links', # 外部链接
'view_count', # 浏览量统计
'seo_optimizer' # SEO优化
]

@ -0,0 +1,81 @@
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
# 导入博客相关模型
from blog.models import Article, Category, Tag
class StaticViewSitemap(Sitemap):
"""静态页面站点地图"""
priority = 0.5 # 优先级0.0-1.0
changefreq = 'daily' # 更新频率:每天
def items(self):
"""返回包含在站点地图中的项目列表"""
return ['blog:index', ] # 博客首页
def location(self, item):
"""返回每个项目的绝对URL"""
return reverse(item) # 通过反向解析生成URL
class ArticleSiteMap(Sitemap):
"""文章站点地图"""
changefreq = "monthly" # 更新频率:每月
priority = "0.6" # 优先级0.6
def items(self):
"""返回所有已发布的文章"""
return Article.objects.filter(status='p') # 状态为'p'(已发布)的文章
def lastmod(self, obj):
"""返回文章的最后修改时间"""
return obj.last_modify_time # 文章的最后修改时间
class CategorySiteMap(Sitemap):
"""分类站点地图"""
changefreq = "Weekly" # 更新频率:每周
priority = "0.6" # 优先级0.6
def items(self):
"""返回所有分类"""
return Category.objects.all() # 所有分类
def lastmod(self, obj):
"""返回分类的最后修改时间"""
return obj.last_modify_time # 分类的最后修改时间
class TagSiteMap(Sitemap):
"""标签站点地图"""
changefreq = "Weekly" # 更新频率:每周
priority = "0.3" # 优先级0.3(标签页优先级较低)
def items(self):
"""返回所有标签"""
return Tag.objects.all() # 所有标签
def lastmod(self, obj):
"""返回标签的最后修改时间"""
return obj.last_modify_time # 标签的最后修改时间
class UserSiteMap(Sitemap):
"""用户站点地图"""
changefreq = "Weekly" # 更新频率:每周
priority = "0.3" # 优先级0.3(用户页优先级较低)
def items(self):
"""返回所有有文章的作者(去重)"""
# 获取所有文章的作者并通过set去重再转换为列表
return list(set(map(lambda x: x.author, Article.objects.all())))
def lastmod(self, obj):
"""返回用户的注册时间"""
return obj.date_joined # 用户的注册时间

@ -0,0 +1,40 @@
import logging
import requests
from django.conf import settings
# 获取日志记录器
logger = logging.getLogger(__name__)
class SpiderNotify():
"""搜索引擎爬虫通知类"""
@staticmethod
def baidu_notify(urls):
"""
向百度站长平台提交链接通知百度爬虫抓取更新内容
Args:
urls: 需要通知的URL列表
"""
try:
# 将URL列表转换为换行分隔的字符串格式
data = '\n'.join(urls)
# 向百度站长平台API提交URL数据
result = requests.post(settings.BAIDU_NOTIFY_URL, data=data)
# 记录API返回结果
logger.info(result.text)
except Exception as e:
# 记录通知过程中的错误
logger.error(e)
@staticmethod
def notify(url):
"""
通用的爬虫通知方法目前仅支持百度
Args:
url: 需要通知的URL
"""
SpiderNotify.baidu_notify(url)

@ -0,0 +1,43 @@
from django.test import TestCase
# 导入工具函数
from djangoblog.utils import *
class DjangoBlogTest(TestCase):
"""DjangoBlog应用测试类"""
def setUp(self):
"""测试初始化方法"""
# 可以在这里设置测试数据,当前为空
pass
def test_utils(self):
"""测试工具函数"""
# 测试SHA256加密函数
md5 = get_sha256('test')
self.assertIsNotNone(md5) # 断言加密结果不为空
# 测试Markdown转换函数
c = CommonMarkdown.get_markdown('''
# Title1
```python
import os
```
[url](https://www.lylinux.net/)
[ddd](http://www.baidu.com)
''')
self.assertIsNotNone(c) # 断言Markdown转换结果不为空
# 测试字典转URL参数字符串函数
d = {
'd': 'key1',
'd2': 'key2'
}
data = parse_dict_to_url(d)
self.assertIsNotNone(data) # 断言转换结果不为空

@ -0,0 +1,73 @@
"""djangoblog URL 配置
`urlpatterns` 列表将 URL 路由到视图更多信息请参阅
https://docs.djangoproject.com/en/1.10/topics/http/urls/
示例
函数视图
1. 导入: from my_app import views
2. 添加 URL urlpatterns: url(r'^$', views.home, name='home')
基于类的视图
1. 导入: from other_app.views import Home
2. 添加 URL urlpatterns: url(r'^$', Home.as_view(), name='home')
包含其他 URLconf
1. 导入 include() 函数: from django.conf.urls import url, include
2. 添加 URL urlpatterns: url(r'^blog/', include('blog.urls'))
"""
from django.conf import settings
from django.conf.urls.i18n import i18n_patterns # 国际化URL模式
from django.conf.urls.static import static # 静态文件服务
from django.contrib.sitemaps.views import sitemap # 站点地图视图
from django.urls import path, include
from django.urls import re_path
from haystack.views import search_view_factory # Haystack搜索视图工厂
# 导入自定义视图和组件
from blog.views import EsSearchView
from djangoblog.admin_site import admin_site # 自定义管理员站点
from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm # Elasticsearch搜索表单
from djangoblog.feeds import DjangoBlogFeed # RSS订阅源
from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
# 站点地图配置字典
sitemaps = {
'blog': ArticleSiteMap, # 文章站点地图
'Category': CategorySiteMap, # 分类站点地图
'Tag': TagSiteMap, # 标签站点地图
'User': UserSiteMap, # 用户站点地图
'static': StaticViewSitemap # 静态页面站点地图
}
# 自定义错误处理视图
handler404 = 'blog.views.page_not_found_view' # 404页面未找到
handler500 = 'blog.views.server_error_view' # 500服务器错误
handle403 = 'blog.views.permission_denied_view' # 403权限拒绝
# 基本URL模式
urlpatterns = [
path('i18n/', include('django.conf.urls.i18n')), # 国际化URL
]
# 添加国际化URL模式
urlpatterns += i18n_patterns(
re_path(r'^admin/', admin_site.urls), # 自定义管理员后台URL
re_path(r'', include('blog.urls', namespace='blog')), # 博客应用URL
re_path(r'mdeditor/', include('mdeditor.urls')), # Markdown编辑器URL
re_path(r'', include('comments.urls', namespace='comment')), # 评论应用URL
re_path(r'', include('accounts.urls', namespace='account')), # 账户应用URL
re_path(r'', include('oauth.urls', namespace='oauth')), # OAuth认证URL
re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps}, # 站点地图XML
name='django.contrib.sitemaps.views.sitemap'),
re_path(r'^feed/$', DjangoBlogFeed()), # RSS订阅源URL
re_path(r'^rss/$', DjangoBlogFeed()), # RSS订阅源别名
# 搜索URL使用自定义的Elasticsearch视图和表单
re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
name='search'),
re_path(r'', include('servermanager.urls', namespace='servermanager')), # 服务器管理URL
re_path(r'', include('owntracks.urls', namespace='owntracks')), # OwnTracks位置跟踪URL
prefix_default_language=False # 不在默认语言的URL前添加语言前缀
) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT) # 静态文件服务
# 调试模式下启用媒体文件服务
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL,
document_root=settings.MEDIA_ROOT)

@ -0,0 +1,262 @@
#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
# 获取日志记录器
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
"""获取最新的文章ID和评论ID"""
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
"""计算字符串的SHA256哈希值"""
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
"""缓存装饰器,用于缓存函数结果
Args:
expiration: 缓存过期时间默认3分钟
"""
def wrapper(func):
def news(*args, **kwargs):
try:
# 尝试从视图获取缓存键
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
# 如果没有缓存键,根据函数和参数生成唯一键
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
# 如果函数返回None设置默认缓存值
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
# 创建模拟请求对象
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
# 获取缓存键并删除缓存
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
"""获取当前站点(带缓存)"""
site = Site.objects.get_current()
return site
class CommonMarkdown:
"""Markdown处理工具类"""
@staticmethod
def _convert_markdown(value):
"""内部方法转换Markdown文本为HTML"""
md = markdown.Markdown(
extensions=[
'extra', # 额外扩展
'codehilite', # 代码高亮
'toc', # 目录生成
'tables', # 表格支持
]
)
body = md.convert(value) # 转换Markdown为HTML
toc = md.toc # 获取目录
return body, toc
@staticmethod
def get_markdown_with_toc(value):
"""获取带目录的Markdown转换结果"""
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
"""获取Markdown转换结果不带目录"""
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
"""发送邮件(通过信号机制)"""
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
def generate_code() -> str:
"""生成6位随机数验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
"""将字典转换为URL参数字符串"""
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
def get_blog_setting():
"""获取博客设置(带缓存)"""
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
# 如果数据库中没有博客设置,创建默认设置
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value) # 设置缓存
return value
def save_user_avatar(url):
'''
保存用户头像到本地
:param url:头像url
:return: 本地路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2) # 下载头像
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir) # 创建头像目录
# 检查文件是否为图片格式
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
save_filename = str(uuid.uuid4().hex) + ext # 生成唯一文件名
logger.info('保存用户头像:' + basedir + save_filename)
# 保存头像文件
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename) # 返回静态文件URL
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png') # 返回默认头像
def delete_sidebar_cache():
"""删除侧边栏相关缓存"""
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
cache.delete(k)
def delete_view_cache(prefix, keys):
"""删除视图缓存"""
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
"""获取资源URL静态文件URL"""
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
# HTML清理允许的标签
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
# HTML清理允许的属性
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
def sanitize_html(html):
"""清理HTML移除不安全的标签和属性"""
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)

File diff suppressed because it is too large Load Diff

@ -0,0 +1,20 @@
"""
WSGI config for djangoblog project.
它将WSGI可调用对象暴露为名为``application``的模块级变量
有关此文件的更多信息请参阅
https://docs.djangoproject.com/en/1.10/howto/deployment/wsgi/
"""
import os
# 导入Django的WSGI应用获取函数
from django.core.wsgi import get_wsgi_application
# 设置Django的默认设置模块
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoblog.settings")
# 获取WSGI应用实例
# 这个application变量将被WSGI服务器如Gunicorn、uWSGI使用来服务Django应用
application = get_wsgi_application()
Loading…
Cancel
Save