Compare commits

...

5 Commits

@ -0,0 +1,2 @@
# szy:此文件用于将当前目录识别为一个Python包
default_app_config = 'djangoblog.apps.DjangoblogAppConfig'

@ -0,0 +1,67 @@
# szy功能描述自定义Django后台管理站点并注册各个模型
from django.contrib.admin import AdminSite
from django.contrib.admin.models import LogEntry
from django.contrib.sites.admin import SiteAdmin
from django.contrib.sites.models import Site
from accounts.admin import *
from blog.admin import *
from blog.models import *
from comments.admin import *
from comments.models import *
from djangoblog.logentryadmin import LogEntryAdmin
from oauth.admin import *
from oauth.models import *
from owntracks.admin import *
from owntracks.models import *
from servermanager.admin import *
from servermanager.models import *
# szy自定义Django后台管理站点并注册各个模型
class DjangoBlogAdminSite(AdminSite):
site_header = 'djangoblog administration'
site_title = 'djangoblog site admin'
# szy初始化管理站点设置站点名称
def __init__(self, name='admin'):
super().__init__(name)
# szy检查用户权限是否为超级管理员
def has_permission(self, request):
return request.user.is_superuser
# def get_urls(self):
# urls = super().get_urls()
# from django.urls import path
# from blog.views import refresh_memcache
#
# my_urls = [
# path('refresh/', self.admin_view(refresh_memcache), name="refresh"),
# ]
# return urls + my_urls
# szy注册各个模型到后台管理
admin_site = DjangoBlogAdminSite(name='admin')
admin_site.register(Article, ArticlelAdmin)
admin_site.register(Category, CategoryAdmin)
admin_site.register(Tag, TagAdmin)
admin_site.register(Links, LinksAdmin)
admin_site.register(SideBar, SideBarAdmin)
admin_site.register(BlogSettings, BlogSettingsAdmin)
admin_site.register(commands, CommandsAdmin)
admin_site.register(EmailSendLog, EmailSendLogAdmin)
admin_site.register(BlogUser, BlogUserAdmin)
admin_site.register(Comment, CommentAdmin)
admin_site.register(OAuthUser, OAuthUserAdmin)
admin_site.register(OAuthConfig, OAuthConfigAdmin)
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin)
admin_site.register(Site, SiteAdmin)
admin_site.register(LogEntry, LogEntryAdmin)

@ -0,0 +1,13 @@
from django.apps import AppConfig
# szyDjango应用配置类用于加载插件
class DjangoblogAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'djangoblog'
# szy应用准备时加载插件
def ready(self):
super().ready()
# Import and load plugins here
from .plugin_manage.loader import load_plugins
load_plugins()

@ -0,0 +1,123 @@
# szy定义Django信号并处理相关业务逻辑
import _thread
import logging
import django.dispatch
from django.conf import settings
from django.contrib.admin.models import LogEntry
from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.core.mail import EmailMultiAlternatives
from django.db.models.signals import post_save
from django.dispatch import receiver
from comments.models import Comment
from comments.utils import send_comment_email
from djangoblog.spider_notify import SpiderNotify
from djangoblog.utils import cache, expire_view_cache, delete_sidebar_cache, delete_view_cache
from djangoblog.utils import get_current_site
from oauth.models import OAuthUser
logger = logging.getLogger(__name__)
oauth_user_login_signal = django.dispatch.Signal(['id'])
send_email_signal = django.dispatch.Signal(
['emailto', 'title', 'content'])
# szy处理发送邮件的信号
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
emailto = kwargs['emailto']
title = kwargs['title']
content = kwargs['content']
msg = EmailMultiAlternatives(
title,
content,
from_email=settings.DEFAULT_FROM_EMAIL,
to=emailto)
msg.content_subtype = "html"
from servermanager.models import EmailSendLog
log = EmailSendLog()
log.title = title
log.content = content
log.emailto = ','.join(emailto)
try:
result = msg.send()
log.send_result = result > 0
except Exception as e:
logger.error(f"失败邮箱号: {emailto}, {e}")
log.send_result = False
log.save()
# szy处理OAuth用户登录信号
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
id = kwargs['id']
oauthuser = OAuthUser.objects.get(id=id)
site = get_current_site().domain
if oauthuser.picture and not oauthuser.picture.find(site) >= 0:
from djangoblog.utils import save_user_avatar
oauthuser.picture = save_user_avatar(oauthuser.picture)
oauthuser.save()
delete_sidebar_cache()
@receiver(post_save)
def model_post_save_callback(
sender,
instance,
created,
raw,
using,
update_fields,
**kwargs):
clearcache = False
if isinstance(instance, LogEntry):
return
if 'get_full_url' in dir(instance):
is_update_views = update_fields == {'views'}
if not settings.TESTING and not is_update_views:
try:
notify_url = instance.get_full_url()
SpiderNotify.baidu_notify([notify_url])
except Exception as ex:
logger.error("notify sipder", ex)
if not is_update_views:
clearcache = True
if isinstance(instance, Comment):
if instance.is_enable:
path = instance.article.get_absolute_url()
site = get_current_site().domain
if site.find(':') > 0:
site = site[0:site.find(':')]
expire_view_cache(
path,
servername=site,
serverport=80,
key_prefix='blogdetail')
if cache.get('seo_processor'):
cache.delete('seo_processor')
comment_cache_key = 'article_comments_{id}'.format(
id=instance.article.id)
cache.delete(comment_cache_key)
delete_sidebar_cache()
delete_view_cache('article_comments', [str(instance.article.pk)])
_thread.start_new_thread(send_comment_email, (instance,))
if clearcache:
cache.clear()
@receiver(user_logged_in)
@receiver(user_logged_out)
def user_auth_callback(sender, request, user, **kwargs):
if user and user.username:
logger.info(user)
delete_sidebar_cache()
# cache.clear()

@ -0,0 +1,205 @@
from django.utils.encoding import force_str
from elasticsearch_dsl import Q
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
from haystack.forms import ModelSearchForm
from haystack.models import SearchResult
from haystack.utils import log as logging
from blog.documents import ArticleDocument, ArticleDocumentManager
from blog.models import Article
logger = logging.getLogger(__name__)
# szy定义Elasticsearch后端处理索引和查询
class ElasticSearchBackend(BaseSearchBackend):
def __init__(self, connection_alias, **connection_options):
super(
ElasticSearchBackend,
self).__init__(
connection_alias,
**connection_options)
self.manager = ArticleDocumentManager()
self.include_spelling = True
# szy获取要索引的模型数据
def _get_models(self, iterable):
models = iterable if iterable and iterable[0] else Article.objects.all()
docs = self.manager.convert_to_doc(models)
return docs
# szy创建索引
def _create(self, models):
self.manager.create_index()
docs = self._get_models(models)
self.manager.rebuild(docs)
# szy删除索引
def _delete(self, models):
for m in models:
m.delete()
return True
# szy重组索引
def _rebuild(self, models):
models = models if models else Article.objects.all()
docs = self.manager.convert_to_doc(models)
self.manager.update_docs(docs)
# szy更新索引
def update(self, index, iterable, commit=True):
models = self._get_models(iterable)
self.manager.update_docs(models)
# szy移除索引
def remove(self, obj_or_string):
models = self._get_models([obj_or_string])
self._delete(models)
# szy清空索引
def clear(self, models=None, commit=True):
self.remove(None)
# szy获取搜索建议词
@staticmethod
def get_suggestion(query: str) -> str:
"""获取推荐词, 如果没有找到添加原搜索词"""
search = ArticleDocument.search() \
.query("match", body=query) \
.suggest('suggest_search', query, term={'field': 'body'}) \
.execute()
keywords = []
for suggest in search.suggest.suggest_search:
if suggest["options"]:
keywords.append(suggest["options"][0]["text"])
else:
keywords.append(suggest["text"])
return ' '.join(keywords)
# szy执行搜索并返回结果
@log_query
def search(self, query_string, **kwargs):
logger.info('search query_string:' + query_string)
start_offset = kwargs.get('start_offset')
end_offset = kwargs.get('end_offset')
# 推荐词搜索
if getattr(self, "is_suggest", None):
suggestion = self.get_suggestion(query_string)
else:
suggestion = query_string
# szy构建查询条件匹配标题或正文设置最小匹配度
q = Q('bool',
should=[Q('match', body=suggestion), Q('match', title=suggestion)],
minimum_should_match="70%")
# szy执行搜索查询过滤已发布的状态和文章类型
search = ArticleDocument.search() \
.query('bool', filter=[q]) \
.filter('term', status='p') \
.filter('term', type='a') \
.source(False)[start_offset: end_offset]
results = search.execute()
hits = results['hits'].total
raw_results = []
# szy处理搜索结果构建SearchResult对象
for raw_result in results['hits']['hits']:
app_label = 'blog'
model_name = 'Article'
additional_fields = {}
result_class = SearchResult
result = result_class(
app_label,
model_name,
raw_result['_id'],
raw_result['_score'],
**additional_fields)
raw_results.append(result)
facets = {}
# szy设置拼写建议如果查询词与建议词不同则返回建议词
spelling_suggestion = None if query_string == suggestion else suggestion
return {
'results': raw_results,
'hits': hits,
'facets': facets,
'spelling_suggestion': spelling_suggestion,
}
# szy定义Elasticsearch查询类
class ElasticSearchQuery(BaseSearchQuery):
def _convert_datetime(self, date):
if hasattr(date, 'hour'):
return force_str(date.strftime('%Y%m%d%H%M%S'))
else:
return force_str(date.strftime('%Y%m%d000000'))
# szy清理查询片段处理保留字和特殊字符
def clean(self, query_fragment):
"""
Provides a mechanism for sanitizing user input before presenting the
value to the backend.
Whoosh 1.X differs here in that you can no longer use a backslash
to escape reserved characters. Instead, the whole word should be
quoted.
"""
words = query_fragment.split()
cleaned_words = []
for word in words:
if word in self.backend.RESERVED_WORDS:
word = word.replace(word, word.lower())
for char in self.backend.RESERVED_CHARACTERS:
if char in word:
word = "'%s'" % word
break
cleaned_words.append(word)
return ' '.join(cleaned_words)
# szy构建查询片段
def build_query_fragment(self, field, filter_type, value):
return value.query_string
# szy获取查询结果数量
def get_count(self):
results = self.get_results()
return len(results) if results else 0
# szy获取拼写建议
def get_spelling_suggestion(self, preferred_query=None):
return self._spelling_suggestion
# szy构建查询参数
def build_params(self, spelling_query=None):
kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query)
return kwargs
# szy定义Elasticsearch模型搜索表单
class ElasticSearchModelSearchForm(ModelSearchForm):
# szy重写搜索方法支持建议搜索功能
def search(self):
# 是否建议搜索
self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no"
sqs = super().search()
return sqs
# szy定义Elasticsearch搜索引擎
class ElasticSearchEngine(BaseEngine):
backend = ElasticSearchBackend
query = ElasticSearchQuery

@ -0,0 +1,49 @@
from django.contrib.auth import get_user_model
from django.contrib.syndication.views import Feed
from django.utils import timezone
from django.utils.feedgenerator import Rss201rev2Feed
from blog.models import Article
from djangoblog.utils import CommonMarkdown
# szy定义Django博客的RSS订阅类
class DjangoBlogFeed(Feed):
feed_type = Rss201rev2Feed
# szy设置Feed的描述、标题和链接
description = '大巧无工,重剑无锋.'
title = "且听风吟 大巧无工,重剑无锋. "
link = "/feed/"
# szy获取作者名称博客主昵称
def author_name(self):
return get_user_model().objects.first().nickname
# szy获取作者链接地址
def author_link(self):
return get_user_model().objects.first().get_absolute_url()
# szy获取要包含在Feed中的文章项
def items(self):
return Article.objects.filter(type='a', status='p').order_by('-pub_time')[:5]
# szy获取文章项的标题
def item_title(self, item):
return item.title
# szy获取文章项的描述使用Markdown渲染正文
def item_description(self, item):
return CommonMarkdown.get_markdown(item.body)
# szy设置Feed的版权信息
def feed_copyright(self):
now = timezone.now()
return "Copyright© {year} 且听风吟".format(year=now.year)
# szy获取文章项的链接地址
def item_link(self, item):
return item.get_absolute_url()
# szy获取文章项的全局唯一标识符
def item_guid(self, item):
return

@ -0,0 +1,103 @@
from django.contrib import admin
from django.contrib.admin.models import DELETION
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse, NoReverseMatch
from django.utils.encoding import force_str
from django.utils.html import escape
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
# szy定义日志条目管理类
class LogEntryAdmin(admin.ModelAdmin):
# szy设置列表页过滤器字段
list_filter = [
'content_type'
]
# szy设置搜索字段
search_fields = [
'object_repr',
'change_message'
]
# szy设置列表页可点击链接的字段
list_display_links = [
'action_time',
'get_change_message',
]
# szy设置列表页显示的字段
list_display = [
'action_time',
'user_link',
'content_type',
'object_link',
'get_change_message',
]
# szy禁用添加权限
def has_add_permission(self, request):
return False
# szy设置修改权限仅超级用户或有特定权限的用户可以查看
def has_change_permission(self, request, obj=None):
return (
request.user.is_superuser or
request.user.has_perm('admin.change_logentry')
) and request.method != 'POST'
# szy禁用删除权限
def has_delete_permission(self, request, obj=None):
return False
# szy创建对象链接如果是删除操作则不生成链接
def object_link(self, obj):
object_link = escape(obj.object_repr)
content_type = obj.content_type
if obj.action_flag != DELETION and content_type is not None:
# try returning an actual link instead of object repr string
try:
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.object_id]
)
object_link = '<a href="{}">{}</a>'.format(url, object_link)
except NoReverseMatch:
pass
return mark_safe(object_link)
object_link.admin_order_field = 'object_repr'
object_link.short_description = _('object')
# szy创建用户链接链接到用户编辑页面
def user_link(self, obj):
content_type = ContentType.objects.get_for_model(type(obj.user))
user_link = escape(force_str(obj.user))
try:
# try returning an actual link instead of object repr string
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.user.pk]
)
user_link = '<a href="{}">{}</a>'.format(url, user_link)
except NoReverseMatch:
pass
return mark_safe(user_link)
user_link.admin_order_field = 'user'
user_link.short_description = _('user')
# szy优化查询集预取关联的内容类型数据
def get_queryset(self, request):
queryset = super(LogEntryAdmin, self).get_queryset(request)
return queryset.prefetch_related('content_type')
# szy重写获取actions的方法移除删除选中项的action
def get_actions(self, request):
actions = super(LogEntryAdmin, self).get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions

@ -0,0 +1,45 @@
import logging
logger = logging.getLogger(__name__)
# szy基础插件类所有插件都应该继承此类
class BasePlugin:
# szy插件元数据定义
PLUGIN_NAME = None
PLUGIN_DESCRIPTION = None
PLUGIN_VERSION = None
# szy插件初始化方法
def __init__(self):
if not all([self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION]):
raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
self.init_plugin()
self.register_hooks()
# szy插件初始化逻辑子类可重写实现特定初始化
def init_plugin(self):
"""
插件初始化逻辑
子类可以重写此方法来实现特定的初始化操作
"""
logger.info(f'{self.PLUGIN_NAME} initialized.')
# szy注册插件钩子子类可重写实现特定钩子注册
def register_hooks(self):
"""
注册插件钩子
子类可以重写此方法来注册特定的钩子
"""
pass
# szy获取插件信息
def get_plugin_info(self):
"""
获取插件信息
:return: 包含插件元数据的字典
"""
return {
'name': self.PLUGIN_NAME,
'description': self.PLUGIN_DESCRIPTION,
'version': self.PLUGIN_VERSION
}

@ -0,0 +1,10 @@
# szy文章详情加载事件钩子名称
ARTICLE_DETAIL_LOAD = 'article_detail_load'
# szy文章创建事件钩子名称
ARTICLE_CREATE = 'article_create'
# szy文章更新事件钩子名称
ARTICLE_UPDATE = 'article_update'
# szy文章删除事件钩子名称
ARTICLE_DELETE = 'article_delete'
# szy文章内容处理钩子名称
ARTICLE_CONTENT_HOOK_NAME = "the_content"

@ -0,0 +1,48 @@
import logging
logger = logging.getLogger(__name__)
# szy全局钩子字典存储所有注册的钩子
_hooks = {}
def register(hook_name: str, callback: callable):
"""
注册一个钩子回调
"""
if hook_name not in _hooks:
_hooks[hook_name] = []
_hooks[hook_name].append(callback)
logger.debug(f"Registered hook '{hook_name}' with callback '{callback.__name__}'")
def run_action(hook_name: str, *args, **kwargs):
"""
执行一个 Action Hook
它会按顺序执行所有注册到该钩子上的回调函数
"""
if hook_name in _hooks:
logger.debug(f"Running action hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
callback(*args, **kwargs)
except Exception as e:
logger.error(f"Error running action hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行一个 Filter Hook
它会把 value 依次传递给所有注册的回调函数进行处理
"""
# szy检查钩子是否存在存在则依次应用所有过滤器
if hook_name in _hooks:
logger.debug(f"Applying filter hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
# szy将当前值传递给回调函数进行处理并更新值
value = callback(value, *args, **kwargs)
except Exception as e:
# szy捕获并记录过滤器执行过程中的异常
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
# szy返回处理后的最终值
return value

@ -0,0 +1,24 @@
import os
import logging
from django.conf import settings
logger = logging.getLogger(__name__)
def load_plugins():
"""
Dynamically loads and initializes plugins from the 'plugins' directory.
This function is intended to be called when the Django app registry is ready.
"""
# szy遍历settings中配置的激活插件列表
for plugin_name in settings.ACTIVE_PLUGINS:
# szy构建插件路径
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
# szy检查插件目录是否存在且包含plugin.py文件
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, 'plugin.py')):
try:
# szy动态导入插件模块
__import__(f'plugins.{plugin_name}.plugin')
logger.info(f"Successfully loaded plugin: {plugin_name}")
except ImportError as e:
# szy记录插件导入失败的错误信息
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)

@ -0,0 +1,358 @@
"""
Django settings for djangoblog project.
Generated by 'django-admin startproject' using Django 1.10.2.
For more information on this file, see
https://docs.djangoproject.com/en/1.10/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.10/ref/settings/
"""
import os
import sys
from pathlib import Path
from django.utils.translation import gettext_lazy as _
# szy将环境变量转换为布尔值
def env_to_bool(env, default):
str_val = os.environ.get(env)
return default if str_val is None else str_val == 'True'
# szy构建项目基础路径
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.environ.get(
'DJANGO_SECRET_KEY') or 'n9ceqv38)#&mwuat@(mjb_p%em$e8$qyr#fw9ot!=ba6lijx-6'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = env_to_bool('DJANGO_DEBUG', True)
# DEBUG = False
TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'
# ALLOWED_HOSTS = []
ALLOWED_HOSTS = ['*', '127.0.0.1', 'example.com']
# django 4.0新增配置
CSRF_TRUSTED_ORIGINS = ['http://example.com']
# Application definition
# szy已安装的应用列表
INSTALLED_APPS = [
# 'django.contrib.admin',
'django.contrib.admin.apps.SimpleAdminConfig',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django.contrib.sites',
'django.contrib.sitemaps',
'mdeditor',
'haystack',
'blog',
'accounts',
'comments',
'oauth',
'servermanager',
'owntracks',
'compressor',
'djangoblog'
]
# szy中间件配置
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.locale.LocaleMiddleware',
'django.middleware.gzip.GZipMiddleware',
# 'django.middleware.cache.UpdateCacheMiddleware',
'django.middleware.common.CommonMiddleware',
# 'django.middleware.cache.FetchFromCacheMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django.middleware.http.ConditionalGetMiddleware',
'blog.middleware.OnlineMiddleware'
]
ROOT_URLCONF = 'djangoblog.urls'
# szy模板配置
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR, 'templates')],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
'blog.context_processors.seo_processor'
],
},
},
]
WSGI_APPLICATION = 'djangoblog.wsgi.application'
# Database
# https://docs.djangoproject.com/en/1.10/ref/settings/#databases
# szyMySQL数据库配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': os.environ.get('DJANGO_MYSQL_DATABASE') or 'djangoblog',
'USER': os.environ.get('DJANGO_MYSQL_USER') or 'root',
'PASSWORD': os.environ.get('DJANGO_MYSQL_PASSWORD') or 'root',
'HOST': os.environ.get('DJANGO_MYSQL_HOST') or '127.0.0.1',
'PORT': int(
os.environ.get('DJANGO_MYSQL_PORT') or 3306),
'OPTIONS': {
'charset': 'utf8mb4'},
}}
# szy密码验证配置
# Password validation
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# szy国际化配置 - 支持的语言
LANGUAGES = (
('en', _('English')),
('zh-hans', _('Simplified Chinese')),
('zh-hant', _('Traditional Chinese')),
)
# szy本地化文件路径
LOCALE_PATHS = (
os.path.join(BASE_DIR, 'locale'),
)
# szy默认语言代码
LANGUAGE_CODE = 'zh-hans'
# szy时区设置
TIME_ZONE = 'Asia/Shanghai'
# szy启用国际化
USE_I18N = True
# szy启用本地化
USE_L10N = True
# szy使用时区支持
USE_TZ = False
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.10/howto/static-files/
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.whoosh_cn_backend.WhooshEngine',
'PATH': os.path.join(os.path.dirname(__file__), 'whoosh_index'),
},
}
# Automatically update searching index
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
# Allow user login with username and password
AUTHENTICATION_BACKENDS = [
'accounts.user_login_backend.EmailOrUsernameModelBackend']
STATIC_ROOT = os.path.join(BASE_DIR, 'collectedstatic')
STATIC_URL = '/static/'
STATICFILES = os.path.join(BASE_DIR, 'static')
AUTH_USER_MODEL = 'accounts.BlogUser'
LOGIN_URL = '/login/'
# szy时间和日期格式
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
DATE_TIME_FORMAT = '%Y-%m-%d'
# bootstrap color styles
BOOTSTRAP_COLOR_TYPES = [
'default', 'primary', 'success', 'info', 'warning', 'danger'
]
# szy分页设置
# paginate
PAGINATE_BY = 10
# http cache timeout
CACHE_CONTROL_MAX_AGE = 2592000
# cache setting
# szy缓存配置
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'TIMEOUT': 10800,
'LOCATION': 'unique-snowflake',
}
}
# 使用redis作为缓存
if os.environ.get("DJANGO_REDIS_URL"):
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': f'redis://{os.environ.get("DJANGO_REDIS_URL")}',
}
}
# szy站点ID
SITE_ID = 1
BAIDU_NOTIFY_URL = os.environ.get('DJANGO_BAIDU_NOTIFY_URL') \
or 'http://data.zz.baidu.com/urls?site=https://www.lylinux.net&token=1uAOGrMsUm5syDGn'
# szy邮件配置
# Email:
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_USE_TLS = env_to_bool('DJANGO_EMAIL_TLS', False)
EMAIL_USE_SSL = env_to_bool('DJANGO_EMAIL_SSL', True)
EMAIL_HOST = os.environ.get('DJANGO_EMAIL_HOST') or 'smtp.mxhichina.com'
EMAIL_PORT = int(os.environ.get('DJANGO_EMAIL_PORT') or 465)
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER')
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD')
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
# Setting debug=false did NOT handle except email notifications
ADMINS = [('admin', os.environ.get('DJANGO_ADMIN_EMAIL') or 'admin@admin.com')]
# WX ADMIN password(Two times md5)
WXADMIN = os.environ.get(
'DJANGO_WXADMIN_PASSWORD') or '995F03AC401D6CABABAEF756FC4D43C7'
# szy日志配置
LOG_PATH = os.path.join(BASE_DIR, 'logs')
if not os.path.exists(LOG_PATH):
os.makedirs(LOG_PATH, exist_ok=True)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'root': {
'level': 'INFO',
'handlers': ['console', 'log_file'],
},
'formatters': {
'verbose': {
'format': '[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d %(module)s] %(message)s',
}
},
'filters': {
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse',
},
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',
},
},
'handlers': {
'log_file': {
'level': 'INFO',
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename': os.path.join(LOG_PATH, 'djangoblog.log'),
'when': 'D',
'formatter': 'verbose',
'interval': 1,
'delay': True,
'backupCount': 5,
'encoding': 'utf-8'
},
'console': {
'level': 'DEBUG',
'filters': ['require_debug_true'],
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'null': {
'class': 'logging.NullHandler',
},
'mail_admins': {
'level': 'ERROR',
'filters': ['require_debug_false'],
'class': 'django.utils.log.AdminEmailHandler'
}
},
'loggers': {
'djangoblog': {
'handlers': ['log_file', 'console'],
'level': 'INFO',
'propagate': True,
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
}
}
}
STATICFILES_FINDERS = (
'django.contrib.staticfiles.finders.FileSystemFinder',
'django.contrib.staticfiles.finders.AppDirectoriesFinder',
# other
'compressor.finders.CompressorFinder',
)
# szy启用压缩
COMPRESS_ENABLED = True
# COMPRESS_OFFLINE = True
COMPRESS_CSS_FILTERS = [
# creates absolute urls from relative ones
'compressor.filters.css_default.CssAbsoluteFilter',
# css minimizer
'compressor.filters.cssmin.CSSMinFilter'
]
COMPRESS_JS_FILTERS = [
'compressor.filters.jsmin.JSMinFilter'
]
# szy媒体文件配置
MEDIA_ROOT = os.path.join(BASE_DIR, 'uploads')
MEDIA_URL = '/media/'
X_FRAME_OPTIONS = 'SAMEORIGIN'
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
if os.environ.get('DJANGO_ELASTICSEARCH_HOST'):
ELASTICSEARCH_DSL = {
'default': {
'hosts': os.environ.get('DJANGO_ELASTICSEARCH_HOST')
},
}
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine',
},
}
# szy插件系统配置
# Plugin System
PLUGINS_DIR = BASE_DIR / 'plugins'
ACTIVE_PLUGINS = [
'article_copyright',
'reading_time',
'external_links',
'view_count',
'seo_optimizer'
]

@ -0,0 +1,74 @@
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
from blog.models import Article, Category, Tag
# szy静态视图站点地图类
class StaticViewSitemap(Sitemap):
priority = 0.5
changefreq = 'daily'
# szy定义包含在站点地图中的项
def items(self):
return ['blog:index', ]
# szy获取项的URL位置
def location(self, item):
return reverse(item)
# szy文章站点地图类
class ArticleSiteMap(Sitemap):
# szy设置文章更新频率和优先级
changefreq = "monthly"
priority = "0.6"
# szy获取所有已发布的文章
def items(self):
return Article.objects.filter(status='p')
# szy获取文章的最后修改时间
def lastmod(self, obj):
return obj.last_modify_time
# szy分类站点地图类
class CategorySiteMap(Sitemap):
# szy设置分类更新频率和优先级
changefreq = "Weekly"
priority = "0.6"
# szy获取所有分类
def items(self):
return Category.objects.all()
# szy获取分类的最后修改时间
def lastmod(self, obj):
return obj.last_modify_time
# szy标签站点地图类
class TagSiteMap(Sitemap):
# szy设置标签更新频率和优先级
changefreq = "Weekly"
priority = "0.3"
# szy获取所有标签
def items(self):
return Tag.objects.all()
# szy获取标签的最后修改时间
def lastmod(self, obj):
return obj.last_modify_time
# szy用户站点地图类
class UserSiteMap(Sitemap):
# szy设置用户更新频率和优先级
changefreq = "Weekly"
priority = "0.3"
# szy获取所有有文章的作者去重
def items(self):
return list(set(map(lambda x: x.author, Article.objects.all())))
# szy获取用户的注册时间
def lastmod(self, obj):
return obj.date_joined

@ -0,0 +1,26 @@
import logging
import requests
from django.conf import settings
logger = logging.getLogger(__name__)
# szy蜘蛛爬虫通知类用于向搜索引擎推送URL
class SpiderNotify():
@staticmethod
def baidu_notify(urls):
try:
# szy将URL列表转换为换行分隔的字符串
data = '\n'.join(urls)
result = requests.post(settings.BAIDU_NOTIFY_URL, data=data)
# szy记录推送结果日志
logger.info(result.text)
except Exception as e:
# szy记录推送异常日志
logger.error(e)
# szy推送单个URL的便捷方法
@staticmethod
def notify(url):
SpiderNotify.baidu_notify(url)

@ -0,0 +1,39 @@
from django.test import TestCase
from djangoblog.utils import *
# szyDjangoBlog测试类用于测试工具函数
class DjangoBlogTest(TestCase):
# szy测试初始化设置
def setUp(self):
pass
# szy测试工具函数功能
def test_utils(self):
# szy测试SHA256加密函数
md5 = get_sha256('test')
self.assertIsNotNone(md5)
# szy测试Markdown解析函数
c = CommonMarkdown.get_markdown('''
# Title1
```python
import os
```
[url](https://www.lylinux.net/)
[ddd](http://www.baidu.com)
''')
self.assertIsNotNone(c)
# szy测试字典转URL参数字符串函数
d = {
'd': 'key1',
'd2': 'key2'
}
data = parse_dict_to_url(d)
self.assertIsNotNone(data)

@ -0,0 +1,81 @@
"""djangoblog URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/1.10/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: url(r'^$', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.conf.urls import url, include
2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls'))
"""
from django.conf import settings
from django.conf.urls.i18n import i18n_patterns
from django.conf.urls.static import static
from django.contrib.sitemaps.views import sitemap
from django.urls import path, include
from django.urls import re_path
from haystack.views import search_view_factory
from blog.views import EsSearchView
from djangoblog.admin_site import admin_site
from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm
from djangoblog.feeds import DjangoBlogFeed
from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
# szy定义站点地图配置
sitemaps = {
'blog': ArticleSiteMap,
'Category': CategorySiteMap,
'Tag': TagSiteMap,
'User': UserSiteMap,
'static': StaticViewSitemap
}
# szy定义错误处理视图
handler404 = 'blog.views.page_not_found_view'
handler500 = 'blog.views.server_error_view'
handle403 = 'blog.views.permission_denied_view'
# szy基础URL模式配置
urlpatterns = [
path('i18n/', include('django.conf.urls.i18n')),
]
# szy国际化URL模式配置
urlpatterns += i18n_patterns(
# szy管理员后台URL
re_path(r'^admin/', admin_site.urls),
# szy博客应用URL
re_path(r'', include('blog.urls', namespace='blog')),
# szyMarkdown编辑器URL
re_path(r'mdeditor/', include('mdeditor.urls')),
# szy评论系统URL
re_path(r'', include('comments.urls', namespace='comment')),
# szy账户管理URL
re_path(r'', include('accounts.urls', namespace='account')),
# szyOAuth认证URL
re_path(r'', include('oauth.urls', namespace='oauth')),
# szy站点地图URL
re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps},
name='django.contrib.sitemaps.views.sitemap'),
# szyFeed订阅URL
re_path(r'^feed/$', DjangoBlogFeed()),
# szyRSS订阅URL
re_path(r'^rss/$', DjangoBlogFeed()),
# szy搜索功能URL使用ElasticSearch视图和表单
re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
name='search'),
# szy服务器管理URL
re_path(r'', include('servermanager.urls', namespace='servermanager')),
# szy位置追踪URL
re_path(r'', include('owntracks.urls', namespace='owntracks'))
, prefix_default_language=False) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
# szy调试模式下添加媒体文件服务
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL,
document_root=settings.MEDIA_ROOT)

@ -0,0 +1,244 @@
#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
logger = logging.getLogger(__name__)
# szy获取最大文章ID和评论ID
def get_max_articleid_commentid():
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
# szy计算字符串的SHA256哈希值
def get_sha256(str):
m = sha256(str.encode('utf-8'))
return m.hexdigest()
# szy缓存装饰器用于函数结果缓存
def cache_decorator(expiration=3 * 60):
def wrapper(func):
def news(*args, **kwargs):
try:
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
# szy刷新视图缓存
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return False
# szy获取当前站点信息带缓存
@cache_decorator()
def get_current_site():
site = Site.objects.get_current()
return site
# szy通用Markdown处理类
class CommonMarkdown:
# szy转换Markdown文本为HTML
@staticmethod
def _convert_markdown(value):
md = markdown.Markdown(
extensions=[
'extra',
'codehilite',
'toc',
'tables',
]
)
body = md.convert(value)
toc = md.toc
return body, toc
# szy获取带目录的Markdown内容
@staticmethod
def get_markdown_with_toc(value):
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
# szy获取Markdown内容
@staticmethod
def get_markdown(value):
body, toc = CommonMarkdown._convert_markdown(value)
return body
# szy发送邮件函数
def send_email(emailto, title, content):
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
# szy生成6位随机数字验证码
def generate_code() -> str:
"""生成随机数验证码"""
return ''.join(random.sample(string.digits, 6))
# szy将字典转换为URL参数字符串
def parse_dict_to_url(dict):
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
# szy获取博客设置
def get_blog_setting():
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value)
return value
# szy保存用户头像到本地
def save_user_avatar(url):
'''
保存用户头像
:param url:头像url
:return: 本地路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename)
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png')
# szy删除侧边栏缓存
def delete_sidebar_cache():
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
cache.delete(k)
# szy删除视图缓存
def delete_view_cache(prefix, keys):
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
# szy获取资源URL
def get_resource_url():
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
# szy清洗HTML内容移除不安全的标签和属性
def sanitize_html(html):
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)

File diff suppressed because it is too large Load Diff

@ -0,0 +1,18 @@
"""
WSGI config for djangoblog project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/1.10/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
# szy设置Django的默认设置模块环境变量
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoblog.settings")
# szy获取WSGI应用实例用于服务器部署
application = get_wsgi_application()

Binary file not shown.

Binary file not shown.
Loading…
Cancel
Save