Compare commits

..

5 Commits

Author SHA1 Message Date
pwc3lphzf 2fff54c62e ADD file via upload
3 months ago
pwc3lphzf 629bc72a53 Add src
3 months ago
pwc3lphzf 569025a29e ADD file via upload
3 months ago
吕弘罡 7f7df033a5 nope
4 months ago
吕弘罡 4b13938caa nope
4 months ago

@ -0,0 +1,158 @@
# DjangoBlog
<p align="center">
<a href="https://github.com/liangliangyy/DjangoBlog/actions/workflows/django.yml"><img src="https://github.com/liangliangyy/DjangoBlog/actions/workflows/django.yml/badge.svg" alt="Django CI"></a>
<a href="https://github.com/liangliangyy/DjangoBlog/actions/workflows/codeql-analysis.yml"><img src="https://github.com/liangliangyy/DjangoBlog/actions/workflows/codeql-analysis.yml/badge.svg" alt="CodeQL"></a>
<a href="https://codecov.io/gh/liangliangyy/DjangoBlog"><img src="https://codecov.io/gh/liangliangyy/DjangoBlog/branch/master/graph/badge.svg" alt="codecov"></a>
<a href="https://github.com/liangliangyy/DjangoBlog/blob/master/LICENSE"><img src="https://img.shields.io/github/license/liangliangyy/djangoblog.svg" alt="license"></a>
</p>
<p align="center">
<b>一款功能强大、设计优雅的现代化博客系统</b>
<br>
<a href="/docs/README-en.md">English</a><b>简体中文</b>
</p>
---
DjangoBlog 是一款基于 Python 3.10 和 Django 4.0 构建的高性能博客平台。它不仅提供了传统博客的所有核心功能还通过一个灵活的插件系统让您可以轻松扩展和定制您的网站。无论您是个人博主、技术爱好者还是内容创作者DjangoBlog 都旨在为您提供一个稳定、高效且易于维护的写作和发布环境。
## ✨ 特性亮点
- **强大的内容管理**: 支持文章、独立页面、分类和标签的完整管理。内置强大的 Markdown 编辑器,支持代码语法高亮。
- **全文搜索**: 集成搜索引擎,提供快速、精准的文章内容搜索。
- **互动评论系统**: 支持回复、邮件提醒等功能,评论内容同样支持 Markdown。
- **灵活的侧边栏**: 可自定义展示最新文章、最多阅读、标签云等模块。
- **社交化登录**: 内置 OAuth 支持,已集成 Google, GitHub, Facebook, 微博, QQ 等主流平台。
- **高性能缓存**: 原生支持 Redis 缓存,并提供自动刷新机制,确保网站高速响应。
- **SEO 友好**: 具备基础 SEO 功能,新内容发布后可自动通知 Google 和百度。
- **便捷的插件系统**: 通过创建独立的插件来扩展博客功能代码解耦易于维护。我们已经通过插件实现了文章浏览计数、SEO 优化等功能!
- **集成图床**: 内置简单的图床功能,方便图片上传和管理。
- **自动化前端**: 集成 `django-compressor`,自动压缩和优化 CSS 及 JavaScript 文件。
- **健壮的运维**: 内置网站异常邮件提醒和微信公众号管理功能。
## 🛠️ 技术栈
- **后端**: Python 3.10, Django 4.0
- **数据库**: MySQL, SQLite (可配置)
- **缓存**: Redis
- **前端**: HTML5, CSS3, JavaScript
- **搜索**: Whoosh, Elasticsearch (可配置)
- **编辑器**: Markdown (mdeditor)
## 🚀 快速开始
### 1. 环境准备
确保您的系统中已安装 Python 3.10+ 和 MySQL/MariaDB。
### 2. 克隆与安装
```bash
# 克隆项目到本地
git clone https://github.com/liangliangyy/DjangoBlog.git
cd DjangoBlog
# 安装依赖
pip install -r requirements.txt
```
### 3. 项目配置
- **数据库**:
打开 `djangoblog/settings.py` 文件,找到 `DATABASES` 配置项,修改为您的 MySQL 连接信息。
```python
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'djangoblog',
'USER': 'root',
'PASSWORD': 'your_password',
'HOST': '127.0.0.1',
'PORT': 3306,
}
}
```
在 MySQL 中创建数据库:
```sql
CREATE DATABASE `djangoblog` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
```
- **更多配置**:
关于邮件发送、OAuth 登录、缓存等更多高级配置,请参阅我们的 [详细配置文档](/docs/config.md)。
### 4. 初始化数据库
```bash
python manage.py makemigrations
python manage.py migrate
# 创建一个超级管理员账户
python manage.py createsuperuser
```
### 5. 运行项目
```bash
# (可选) 生成一些测试数据
python manage.py create_testdata
# (可选) 收集和压缩静态文件
python manage.py collectstatic --noinput
python manage.py compress --force
# 启动开发服务器
python manage.py runserver
```
现在,在您的浏览器中访问 `http://127.0.0.1:8000/`,您应该能看到 DjangoBlog 的首页了!
## 部署
- **传统部署**: 我们为您准备了非常详细的 [服务器部署教程](https://www.lylinux.net/article/2019/8/5/58.html)。
- **Docker 部署**: 项目已全面支持 Docker。如果您熟悉容器化技术请参考 [Docker 部署文档](/docs/docker.md) 来快速启动。
- **Kubernetes 部署**: 我们也提供了完整的 [Kubernetes 部署指南](/docs/k8s.md),助您轻松上云。
## 🧩 插件系统
插件系统是 DjangoBlog 的核心特色之一。它允许您在不修改核心代码的情况下,通过编写独立的插件来为您的博客添加新功能。
- **工作原理**: 插件通过在预定义的“钩子”上注册回调函数来工作。例如,当一篇文章被渲染时,`after_article_body_get` 钩子会被触发,所有注册到此钩子的函数都会被执行。
- **现有插件**: `view_count`(浏览计数), `seo_optimizer`SEO优化等都是通过插件系统实现的。
- **开发您自己的插件**: 只需在 `plugins` 目录下创建一个新的文件夹,并编写您的 `plugin.py`。欢迎探索并为 DjangoBlog 社区贡献您的创意!
## 🤝 贡献指南
我们热烈欢迎任何形式的贡献!如果您有好的想法或发现了 Bug请随时提交 Issue 或 Pull Request。
## 📄 许可证
本项目基于 [MIT License](LICENSE) 开源。
---
## ❤️ 支持与赞助
如果您觉得这个项目对您有帮助,并且希望支持我继续维护和开发新功能,欢迎请我喝杯咖啡!您的每一份支持都是我前进的最大动力。
<p align="center">
<img src="/docs/imgs/alipay.jpg" width="150" alt="支付宝赞助">
<img src="/docs/imgs/wechat.jpg" width="150" alt="微信赞助">
</p>
<p align="center">
<i>(左) 支付宝 / (右) 微信</i>
</p>
## 🙏 鸣谢
特别感谢 **JetBrains** 为本项目提供的免费开源许可证。
<p align="center">
<a href="https://www.jetbrains.com/?from=DjangoBlog">
<img src="/docs/imgs/pycharm_logo.png" width="150" alt="JetBrains Logo">
</a>
</p>
---
> 如果本项目帮助到了你,请在[这里](https://github.com/liangliangyy/DjangoBlog/issues/214)留下你的网址,让更多的人看到。您的回复将会是我继续更新维护下去的动力。

@ -13,85 +13,51 @@ from .models import Comment
class CommentPostView(FormView):
"""评论提交视图类,处理文章评论的发布"""
# 使用自定义的评论表单
form_class = CommentForm
# 指定渲染模板为文章详情页
template_name = 'blog/article_detail.html'
@method_decorator(csrf_protect)
def dispatch(self, *args, **kwargs):
"""重写dispatch方法添加CSRF保护装饰器"""
return super(CommentPostView, self).dispatch(*args, **kwargs)
def get(self, request, *args, **kwargs):
"""处理GET请求 - 重定向到文章详情页的评论区域"""
# 从URL参数获取文章ID
article_id = self.kwargs['article_id']
# 获取文章对象不存在则返回404
article = get_object_or_404(Article, pk=article_id)
# 获取文章的绝对URL
url = article.get_absolute_url()
# 重定向到文章详情页的评论锚点区域
return HttpResponseRedirect(url + "#comments")
def form_invalid(self, form):
"""表单验证失败时的处理逻辑"""
# 获取文章ID
article_id = self.kwargs['article_id']
# 获取文章对象
article = get_object_or_404(Article, pk=article_id)
# 重新渲染模板,显示表单错误信息
return self.render_to_response({
'form': form, # 包含错误信息的表单
'article': article # 文章对象用于模板渲染
'form': form,
'article': article
})
def form_valid(self, form):
"""表单验证成功后的处理逻辑"""
# 获取当前登录用户
"""提交的数据验证合法后的逻辑"""
user = self.request.user
# 通过用户ID获取对应的BlogUser作者对象
author = BlogUser.objects.get(pk=user.pk)
# 从URL参数获取文章ID
article_id = self.kwargs['article_id']
# 获取文章对象
article = get_absolute_url(Article, pk=article_id)
article = get_object_or_404(Article, pk=article_id)
# 检查文章是否允许评论
if article.comment_status == 'c' or article.status == 'c':
raise ValidationError("该文章评论已关闭.")
# 保存表单数据但不提交到数据库(commit=False)
comment = form.save(False)
# 设置评论关联的文章
comment.article = article
# 获取博客设置
from djangoblog.utils import get_blog_setting
settings = get_blog_setting()
# 根据博客设置决定评论是否需要审核
if not settings.comment_need_review:
comment.is_enable = True # 不需要审核,直接启用评论
# 设置评论作者
comment.is_enable = True
comment.author = author
# 处理父级评论(回复功能)
if form.cleaned_data['parent_comment_id']:
# 获取父评论对象
parent_comment = Comment.objects.get(
pk=form.cleaned_data['parent_comment_id'])
# 设置评论的父评论
comment.parent_comment = parent_comment
# 保存评论到数据库
comment.save(True)
# 重定向到文章详情页的特定评论位置
return HttpResponseRedirect(
"%s#div-comment-%d" % # 使用锚点定位到新发布的评论
(article.get_absolute_url(), comment.pk))
"%s#div-comment-%d" %
(article.get_absolute_url(), comment.pk))

@ -0,0 +1 @@
default_app_config = 'djangoblog.apps.DjangoblogAppConfig'

@ -0,0 +1,64 @@
from django.contrib.admin import AdminSite
from django.contrib.admin.models import LogEntry
from django.contrib.sites.admin import SiteAdmin
from django.contrib.sites.models import Site
from accounts.admin import *
from blog.admin import *
from blog.models import *
from comments.admin import *
from comments.models import *
from djangoblog.logentryadmin import LogEntryAdmin
from oauth.admin import *
from oauth.models import *
from owntracks.admin import *
from owntracks.models import *
from servermanager.admin import *
from servermanager.models import *
class DjangoBlogAdminSite(AdminSite):
site_header = 'djangoblog administration'
site_title = 'djangoblog site admin'
def __init__(self, name='admin'):
super().__init__(name)
def has_permission(self, request):
return request.user.is_superuser
# def get_urls(self):
# urls = super().get_urls()
# from django.urls import path
# from blog.views import refresh_memcache
#
# my_urls = [
# path('refresh/', self.admin_view(refresh_memcache), name="refresh"),
# ]
# return urls + my_urls
admin_site = DjangoBlogAdminSite(name='admin')
admin_site.register(Article, ArticlelAdmin)
admin_site.register(Category, CategoryAdmin)
admin_site.register(Tag, TagAdmin)
admin_site.register(Links, LinksAdmin)
admin_site.register(SideBar, SideBarAdmin)
admin_site.register(BlogSettings, BlogSettingsAdmin)
admin_site.register(commands, CommandsAdmin)
admin_site.register(EmailSendLog, EmailSendLogAdmin)
admin_site.register(BlogUser, BlogUserAdmin)
admin_site.register(Comment, CommentAdmin)
admin_site.register(OAuthUser, OAuthUserAdmin)
admin_site.register(OAuthConfig, OAuthConfigAdmin)
admin_site.register(OwnTrackLog, OwnTrackLogsAdmin)
admin_site.register(Site, SiteAdmin)
admin_site.register(LogEntry, LogEntryAdmin)

@ -0,0 +1,11 @@
from django.apps import AppConfig
class DjangoblogAppConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'djangoblog'
def ready(self):
super().ready()
# Import and load plugins here
from .plugin_manage.loader import load_plugins
load_plugins()

@ -0,0 +1,122 @@
import _thread
import logging
import django.dispatch
from django.conf import settings
from django.contrib.admin.models import LogEntry
from django.contrib.auth.signals import user_logged_in, user_logged_out
from django.core.mail import EmailMultiAlternatives
from django.db.models.signals import post_save
from django.dispatch import receiver
from comments.models import Comment
from comments.utils import send_comment_email
from djangoblog.spider_notify import SpiderNotify
from djangoblog.utils import cache, expire_view_cache, delete_sidebar_cache, delete_view_cache
from djangoblog.utils import get_current_site
from oauth.models import OAuthUser
logger = logging.getLogger(__name__)
oauth_user_login_signal = django.dispatch.Signal(['id'])
send_email_signal = django.dispatch.Signal(
['emailto', 'title', 'content'])
@receiver(send_email_signal)
def send_email_signal_handler(sender, **kwargs):
emailto = kwargs['emailto']
title = kwargs['title']
content = kwargs['content']
msg = EmailMultiAlternatives(
title,
content,
from_email=settings.DEFAULT_FROM_EMAIL,
to=emailto)
msg.content_subtype = "html"
from servermanager.models import EmailSendLog
log = EmailSendLog()
log.title = title
log.content = content
log.emailto = ','.join(emailto)
try:
result = msg.send()
log.send_result = result > 0
except Exception as e:
logger.error(f"失败邮箱号: {emailto}, {e}")
log.send_result = False
log.save()
@receiver(oauth_user_login_signal)
def oauth_user_login_signal_handler(sender, **kwargs):
id = kwargs['id']
oauthuser = OAuthUser.objects.get(id=id)
site = get_current_site().domain
if oauthuser.picture and not oauthuser.picture.find(site) >= 0:
from djangoblog.utils import save_user_avatar
oauthuser.picture = save_user_avatar(oauthuser.picture)
oauthuser.save()
delete_sidebar_cache()
@receiver(post_save)
def model_post_save_callback(
sender,
instance,
created,
raw,
using,
update_fields,
**kwargs):
clearcache = False
if isinstance(instance, LogEntry):
return
if 'get_full_url' in dir(instance):
is_update_views = update_fields == {'views'}
if not settings.TESTING and not is_update_views:
try:
notify_url = instance.get_full_url()
SpiderNotify.baidu_notify([notify_url])
except Exception as ex:
logger.error("notify sipder", ex)
if not is_update_views:
clearcache = True
if isinstance(instance, Comment):
if instance.is_enable:
path = instance.article.get_absolute_url()
site = get_current_site().domain
if site.find(':') > 0:
site = site[0:site.find(':')]
expire_view_cache(
path,
servername=site,
serverport=80,
key_prefix='blogdetail')
if cache.get('seo_processor'):
cache.delete('seo_processor')
comment_cache_key = 'article_comments_{id}'.format(
id=instance.article.id)
cache.delete(comment_cache_key)
delete_sidebar_cache()
delete_view_cache('article_comments', [str(instance.article.pk)])
_thread.start_new_thread(send_comment_email, (instance,))
if clearcache:
cache.clear()
@receiver(user_logged_in)
@receiver(user_logged_out)
def user_auth_callback(sender, request, user, **kwargs):
if user and user.username:
logger.info(user)
delete_sidebar_cache()
# cache.clear()

@ -0,0 +1,183 @@
from django.utils.encoding import force_str
from elasticsearch_dsl import Q
from haystack.backends import BaseEngine, BaseSearchBackend, BaseSearchQuery, log_query
from haystack.forms import ModelSearchForm
from haystack.models import SearchResult
from haystack.utils import log as logging
from blog.documents import ArticleDocument, ArticleDocumentManager
from blog.models import Article
logger = logging.getLogger(__name__)
class ElasticSearchBackend(BaseSearchBackend):
def __init__(self, connection_alias, **connection_options):
super(
ElasticSearchBackend,
self).__init__(
connection_alias,
**connection_options)
self.manager = ArticleDocumentManager()
self.include_spelling = True
def _get_models(self, iterable):
models = iterable if iterable and iterable[0] else Article.objects.all()
docs = self.manager.convert_to_doc(models)
return docs
def _create(self, models):
self.manager.create_index()
docs = self._get_models(models)
self.manager.rebuild(docs)
def _delete(self, models):
for m in models:
m.delete()
return True
def _rebuild(self, models):
models = models if models else Article.objects.all()
docs = self.manager.convert_to_doc(models)
self.manager.update_docs(docs)
def update(self, index, iterable, commit=True):
models = self._get_models(iterable)
self.manager.update_docs(models)
def remove(self, obj_or_string):
models = self._get_models([obj_or_string])
self._delete(models)
def clear(self, models=None, commit=True):
self.remove(None)
@staticmethod
def get_suggestion(query: str) -> str:
"""获取推荐词, 如果没有找到添加原搜索词"""
search = ArticleDocument.search() \
.query("match", body=query) \
.suggest('suggest_search', query, term={'field': 'body'}) \
.execute()
keywords = []
for suggest in search.suggest.suggest_search:
if suggest["options"]:
keywords.append(suggest["options"][0]["text"])
else:
keywords.append(suggest["text"])
return ' '.join(keywords)
@log_query
def search(self, query_string, **kwargs):
logger.info('search query_string:' + query_string)
start_offset = kwargs.get('start_offset')
end_offset = kwargs.get('end_offset')
# 推荐词搜索
if getattr(self, "is_suggest", None):
suggestion = self.get_suggestion(query_string)
else:
suggestion = query_string
q = Q('bool',
should=[Q('match', body=suggestion), Q('match', title=suggestion)],
minimum_should_match="70%")
search = ArticleDocument.search() \
.query('bool', filter=[q]) \
.filter('term', status='p') \
.filter('term', type='a') \
.source(False)[start_offset: end_offset]
results = search.execute()
hits = results['hits'].total
raw_results = []
for raw_result in results['hits']['hits']:
app_label = 'blog'
model_name = 'Article'
additional_fields = {}
result_class = SearchResult
result = result_class(
app_label,
model_name,
raw_result['_id'],
raw_result['_score'],
**additional_fields)
raw_results.append(result)
facets = {}
spelling_suggestion = None if query_string == suggestion else suggestion
return {
'results': raw_results,
'hits': hits,
'facets': facets,
'spelling_suggestion': spelling_suggestion,
}
class ElasticSearchQuery(BaseSearchQuery):
def _convert_datetime(self, date):
if hasattr(date, 'hour'):
return force_str(date.strftime('%Y%m%d%H%M%S'))
else:
return force_str(date.strftime('%Y%m%d000000'))
def clean(self, query_fragment):
"""
Provides a mechanism for sanitizing user input before presenting the
value to the backend.
Whoosh 1.X differs here in that you can no longer use a backslash
to escape reserved characters. Instead, the whole word should be
quoted.
"""
words = query_fragment.split()
cleaned_words = []
for word in words:
if word in self.backend.RESERVED_WORDS:
word = word.replace(word, word.lower())
for char in self.backend.RESERVED_CHARACTERS:
if char in word:
word = "'%s'" % word
break
cleaned_words.append(word)
return ' '.join(cleaned_words)
def build_query_fragment(self, field, filter_type, value):
return value.query_string
def get_count(self):
results = self.get_results()
return len(results) if results else 0
def get_spelling_suggestion(self, preferred_query=None):
return self._spelling_suggestion
def build_params(self, spelling_query=None):
kwargs = super(ElasticSearchQuery, self).build_params(spelling_query=spelling_query)
return kwargs
class ElasticSearchModelSearchForm(ModelSearchForm):
def search(self):
# 是否建议搜索
self.searchqueryset.query.backend.is_suggest = self.data.get("is_suggest") != "no"
sqs = super().search()
return sqs
class ElasticSearchEngine(BaseEngine):
backend = ElasticSearchBackend
query = ElasticSearchQuery

@ -0,0 +1,40 @@
from django.contrib.auth import get_user_model
from django.contrib.syndication.views import Feed
from django.utils import timezone
from django.utils.feedgenerator import Rss201rev2Feed
from blog.models import Article
from djangoblog.utils import CommonMarkdown
class DjangoBlogFeed(Feed):
feed_type = Rss201rev2Feed
description = '大巧无工,重剑无锋.'
title = "且听风吟 大巧无工,重剑无锋. "
link = "/feed/"
def author_name(self):
return get_user_model().objects.first().nickname
def author_link(self):
return get_user_model().objects.first().get_absolute_url()
def items(self):
return Article.objects.filter(type='a', status='p').order_by('-pub_time')[:5]
def item_title(self, item):
return item.title
def item_description(self, item):
return CommonMarkdown.get_markdown(item.body)
def feed_copyright(self):
now = timezone.now()
return "Copyright© {year} 且听风吟".format(year=now.year)
def item_link(self, item):
return item.get_absolute_url()
def item_guid(self, item):
return

@ -0,0 +1,91 @@
from django.contrib import admin
from django.contrib.admin.models import DELETION
from django.contrib.contenttypes.models import ContentType
from django.urls import reverse, NoReverseMatch
from django.utils.encoding import force_str
from django.utils.html import escape
from django.utils.safestring import mark_safe
from django.utils.translation import gettext_lazy as _
class LogEntryAdmin(admin.ModelAdmin):
list_filter = [
'content_type'
]
search_fields = [
'object_repr',
'change_message'
]
list_display_links = [
'action_time',
'get_change_message',
]
list_display = [
'action_time',
'user_link',
'content_type',
'object_link',
'get_change_message',
]
def has_add_permission(self, request):
return False
def has_change_permission(self, request, obj=None):
return (
request.user.is_superuser or
request.user.has_perm('admin.change_logentry')
) and request.method != 'POST'
def has_delete_permission(self, request, obj=None):
return False
def object_link(self, obj):
object_link = escape(obj.object_repr)
content_type = obj.content_type
if obj.action_flag != DELETION and content_type is not None:
# try returning an actual link instead of object repr string
try:
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.object_id]
)
object_link = '<a href="{}">{}</a>'.format(url, object_link)
except NoReverseMatch:
pass
return mark_safe(object_link)
object_link.admin_order_field = 'object_repr'
object_link.short_description = _('object')
def user_link(self, obj):
content_type = ContentType.objects.get_for_model(type(obj.user))
user_link = escape(force_str(obj.user))
try:
# try returning an actual link instead of object repr string
url = reverse(
'admin:{}_{}_change'.format(content_type.app_label,
content_type.model),
args=[obj.user.pk]
)
user_link = '<a href="{}">{}</a>'.format(url, user_link)
except NoReverseMatch:
pass
return mark_safe(user_link)
user_link.admin_order_field = 'user'
user_link.short_description = _('user')
def get_queryset(self, request):
queryset = super(LogEntryAdmin, self).get_queryset(request)
return queryset.prefetch_related('content_type')
def get_actions(self, request):
actions = super(LogEntryAdmin, self).get_actions(request)
if 'delete_selected' in actions:
del actions['delete_selected']
return actions

@ -0,0 +1,41 @@
import logging
logger = logging.getLogger(__name__)
class BasePlugin:
# 插件元数据
PLUGIN_NAME = None
PLUGIN_DESCRIPTION = None
PLUGIN_VERSION = None
def __init__(self):
if not all([self.PLUGIN_NAME, self.PLUGIN_DESCRIPTION, self.PLUGIN_VERSION]):
raise ValueError("Plugin metadata (PLUGIN_NAME, PLUGIN_DESCRIPTION, PLUGIN_VERSION) must be defined.")
self.init_plugin()
self.register_hooks()
def init_plugin(self):
"""
插件初始化逻辑
子类可以重写此方法来实现特定的初始化操作
"""
logger.info(f'{self.PLUGIN_NAME} initialized.')
def register_hooks(self):
"""
注册插件钩子
子类可以重写此方法来注册特定的钩子
"""
pass
def get_plugin_info(self):
"""
获取插件信息
:return: 包含插件元数据的字典
"""
return {
'name': self.PLUGIN_NAME,
'description': self.PLUGIN_DESCRIPTION,
'version': self.PLUGIN_VERSION
}

@ -0,0 +1,7 @@
ARTICLE_DETAIL_LOAD = 'article_detail_load'
ARTICLE_CREATE = 'article_create'
ARTICLE_UPDATE = 'article_update'
ARTICLE_DELETE = 'article_delete'
ARTICLE_CONTENT_HOOK_NAME = "the_content"

@ -0,0 +1,44 @@
import logging
logger = logging.getLogger(__name__)
_hooks = {}
def register(hook_name: str, callback: callable):
"""
注册一个钩子回调
"""
if hook_name not in _hooks:
_hooks[hook_name] = []
_hooks[hook_name].append(callback)
logger.debug(f"Registered hook '{hook_name}' with callback '{callback.__name__}'")
def run_action(hook_name: str, *args, **kwargs):
"""
执行一个 Action Hook
它会按顺序执行所有注册到该钩子上的回调函数
"""
if hook_name in _hooks:
logger.debug(f"Running action hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
callback(*args, **kwargs)
except Exception as e:
logger.error(f"Error running action hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
def apply_filters(hook_name: str, value, *args, **kwargs):
"""
执行一个 Filter Hook
它会把 value 依次传递给所有注册的回调函数进行处理
"""
if hook_name in _hooks:
logger.debug(f"Applying filter hook '{hook_name}'")
for callback in _hooks[hook_name]:
try:
value = callback(value, *args, **kwargs)
except Exception as e:
logger.error(f"Error applying filter hook '{hook_name}' callback '{callback.__name__}': {e}", exc_info=True)
return value

@ -0,0 +1,19 @@
import os
import logging
from django.conf import settings
logger = logging.getLogger(__name__)
def load_plugins():
"""
Dynamically loads and initializes plugins from the 'plugins' directory.
This function is intended to be called when the Django app registry is ready.
"""
for plugin_name in settings.ACTIVE_PLUGINS:
plugin_path = os.path.join(settings.PLUGINS_DIR, plugin_name)
if os.path.isdir(plugin_path) and os.path.exists(os.path.join(plugin_path, 'plugin.py')):
try:
__import__(f'plugins.{plugin_name}.plugin')
logger.info(f"Successfully loaded plugin: {plugin_name}")
except ImportError as e:
logger.error(f"Failed to import plugin: {plugin_name}", exc_info=e)

@ -0,0 +1,343 @@
"""
Django settings for djangoblog project.
Generated by 'django-admin startproject' using Django 1.10.2.
For more information on this file, see
https://docs.djangoproject.com/en/1.10/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.10/ref/settings/
"""
import os
import sys
from pathlib import Path
from django.utils.translation import gettext_lazy as _
def env_to_bool(env, default):
str_val = os.environ.get(env)
return default if str_val is None else str_val == 'True'
# Build paths inside the project like this: BASE_DIR / 'subdir'.
BASE_DIR = Path(__file__).resolve().parent.parent
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.10/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = os.environ.get(
'DJANGO_SECRET_KEY') or 'n9ceqv38)#&mwuat@(mjb_p%em$e8$qyr#fw9ot!=ba6lijx-6'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = env_to_bool('DJANGO_DEBUG', True)
# DEBUG = False
TESTING = len(sys.argv) > 1 and sys.argv[1] == 'test'
# ALLOWED_HOSTS = []
ALLOWED_HOSTS = ['*', '127.0.0.1', 'example.com']
# django 4.0新增配置
CSRF_TRUSTED_ORIGINS = ['http://example.com']
# Application definition
INSTALLED_APPS = [
# 'django.contrib.admin',
'django.contrib.admin.apps.SimpleAdminConfig',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django.contrib.sites',
'django.contrib.sitemaps',
'mdeditor',
'haystack',
'blog',
'accounts',
'comments',
'oauth',
'servermanager',
'owntracks',
'compressor',
'djangoblog'
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.locale.LocaleMiddleware',
'django.middleware.gzip.GZipMiddleware',
# 'django.middleware.cache.UpdateCacheMiddleware',
'django.middleware.common.CommonMiddleware',
# 'django.middleware.cache.FetchFromCacheMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django.middleware.http.ConditionalGetMiddleware',
'blog.middleware.OnlineMiddleware'
]
ROOT_URLCONF = 'djangoblog.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [os.path.join(BASE_DIR, 'templates')],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
'blog.context_processors.seo_processor'
],
},
},
]
WSGI_APPLICATION = 'djangoblog.wsgi.application'
# Database
# https://docs.djangoproject.com/en/1.10/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': os.environ.get('DJANGO_MYSQL_DATABASE') or 'djangoblog',
'USER': os.environ.get('DJANGO_MYSQL_USER') or 'root',
'PASSWORD': os.environ.get('DJANGO_MYSQL_PASSWORD') or 'root',
'HOST': os.environ.get('DJANGO_MYSQL_HOST') or '127.0.0.1',
'PORT': int(
os.environ.get('DJANGO_MYSQL_PORT') or 3306),
'OPTIONS': {
'charset': 'utf8mb4'},
}}
# Password validation
# https://docs.djangoproject.com/en/1.10/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
LANGUAGES = (
('en', _('English')),
('zh-hans', _('Simplified Chinese')),
('zh-hant', _('Traditional Chinese')),
)
LOCALE_PATHS = (
os.path.join(BASE_DIR, 'locale'),
)
LANGUAGE_CODE = 'zh-hans'
TIME_ZONE = 'Asia/Shanghai'
USE_I18N = True
USE_L10N = True
USE_TZ = False
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.10/howto/static-files/
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.whoosh_cn_backend.WhooshEngine',
'PATH': os.path.join(os.path.dirname(__file__), 'whoosh_index'),
},
}
# Automatically update searching index
HAYSTACK_SIGNAL_PROCESSOR = 'haystack.signals.RealtimeSignalProcessor'
# Allow user login with username and password
AUTHENTICATION_BACKENDS = [
'accounts.user_login_backend.EmailOrUsernameModelBackend']
STATIC_ROOT = os.path.join(BASE_DIR, 'collectedstatic')
STATIC_URL = '/static/'
STATICFILES = os.path.join(BASE_DIR, 'static')
AUTH_USER_MODEL = 'accounts.BlogUser'
LOGIN_URL = '/login/'
TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
DATE_TIME_FORMAT = '%Y-%m-%d'
# bootstrap color styles
BOOTSTRAP_COLOR_TYPES = [
'default', 'primary', 'success', 'info', 'warning', 'danger'
]
# paginate
PAGINATE_BY = 10
# http cache timeout
CACHE_CONTROL_MAX_AGE = 2592000
# cache setting
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
'TIMEOUT': 10800,
'LOCATION': 'unique-snowflake',
}
}
# 使用redis作为缓存
if os.environ.get("DJANGO_REDIS_URL"):
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.redis.RedisCache',
'LOCATION': f'redis://{os.environ.get("DJANGO_REDIS_URL")}',
}
}
SITE_ID = 1
BAIDU_NOTIFY_URL = os.environ.get('DJANGO_BAIDU_NOTIFY_URL') \
or 'http://data.zz.baidu.com/urls?site=https://www.lylinux.net&token=1uAOGrMsUm5syDGn'
# Email:
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend'
EMAIL_USE_TLS = env_to_bool('DJANGO_EMAIL_TLS', False)
EMAIL_USE_SSL = env_to_bool('DJANGO_EMAIL_SSL', True)
EMAIL_HOST = os.environ.get('DJANGO_EMAIL_HOST') or 'smtp.mxhichina.com'
EMAIL_PORT = int(os.environ.get('DJANGO_EMAIL_PORT') or 465)
EMAIL_HOST_USER = os.environ.get('DJANGO_EMAIL_USER')
EMAIL_HOST_PASSWORD = os.environ.get('DJANGO_EMAIL_PASSWORD')
DEFAULT_FROM_EMAIL = EMAIL_HOST_USER
SERVER_EMAIL = EMAIL_HOST_USER
# Setting debug=false did NOT handle except email notifications
ADMINS = [('admin', os.environ.get('DJANGO_ADMIN_EMAIL') or 'admin@admin.com')]
# WX ADMIN password(Two times md5)
WXADMIN = os.environ.get(
'DJANGO_WXADMIN_PASSWORD') or '995F03AC401D6CABABAEF756FC4D43C7'
LOG_PATH = os.path.join(BASE_DIR, 'logs')
if not os.path.exists(LOG_PATH):
os.makedirs(LOG_PATH, exist_ok=True)
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'root': {
'level': 'INFO',
'handlers': ['console', 'log_file'],
},
'formatters': {
'verbose': {
'format': '[%(asctime)s] %(levelname)s [%(name)s.%(funcName)s:%(lineno)d %(module)s] %(message)s',
}
},
'filters': {
'require_debug_false': {
'()': 'django.utils.log.RequireDebugFalse',
},
'require_debug_true': {
'()': 'django.utils.log.RequireDebugTrue',
},
},
'handlers': {
'log_file': {
'level': 'INFO',
'class': 'logging.handlers.TimedRotatingFileHandler',
'filename': os.path.join(LOG_PATH, 'djangoblog.log'),
'when': 'D',
'formatter': 'verbose',
'interval': 1,
'delay': True,
'backupCount': 5,
'encoding': 'utf-8'
},
'console': {
'level': 'DEBUG',
'filters': ['require_debug_true'],
'class': 'logging.StreamHandler',
'formatter': 'verbose'
},
'null': {
'class': 'logging.NullHandler',
},
'mail_admins': {
'level': 'ERROR',
'filters': ['require_debug_false'],
'class': 'django.utils.log.AdminEmailHandler'
}
},
'loggers': {
'djangoblog': {
'handlers': ['log_file', 'console'],
'level': 'INFO',
'propagate': True,
},
'django.request': {
'handlers': ['mail_admins'],
'level': 'ERROR',
'propagate': False,
}
}
}
STATICFILES_FINDERS = (
'django.contrib.staticfiles.finders.FileSystemFinder',
'django.contrib.staticfiles.finders.AppDirectoriesFinder',
# other
'compressor.finders.CompressorFinder',
)
COMPRESS_ENABLED = True
# COMPRESS_OFFLINE = True
COMPRESS_CSS_FILTERS = [
# creates absolute urls from relative ones
'compressor.filters.css_default.CssAbsoluteFilter',
# css minimizer
'compressor.filters.cssmin.CSSMinFilter'
]
COMPRESS_JS_FILTERS = [
'compressor.filters.jsmin.JSMinFilter'
]
MEDIA_ROOT = os.path.join(BASE_DIR, 'uploads')
MEDIA_URL = '/media/'
X_FRAME_OPTIONS = 'SAMEORIGIN'
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
if os.environ.get('DJANGO_ELASTICSEARCH_HOST'):
ELASTICSEARCH_DSL = {
'default': {
'hosts': os.environ.get('DJANGO_ELASTICSEARCH_HOST')
},
}
HAYSTACK_CONNECTIONS = {
'default': {
'ENGINE': 'djangoblog.elasticsearch_backend.ElasticSearchEngine',
},
}
# Plugin System
PLUGINS_DIR = BASE_DIR / 'plugins'
ACTIVE_PLUGINS = [
'article_copyright',
'reading_time',
'external_links',
'view_count',
'seo_optimizer'
]

@ -0,0 +1,59 @@
from django.contrib.sitemaps import Sitemap
from django.urls import reverse
from blog.models import Article, Category, Tag
class StaticViewSitemap(Sitemap):
priority = 0.5
changefreq = 'daily'
def items(self):
return ['blog:index', ]
def location(self, item):
return reverse(item)
class ArticleSiteMap(Sitemap):
changefreq = "monthly"
priority = "0.6"
def items(self):
return Article.objects.filter(status='p')
def lastmod(self, obj):
return obj.last_modify_time
class CategorySiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.6"
def items(self):
return Category.objects.all()
def lastmod(self, obj):
return obj.last_modify_time
class TagSiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.3"
def items(self):
return Tag.objects.all()
def lastmod(self, obj):
return obj.last_modify_time
class UserSiteMap(Sitemap):
changefreq = "Weekly"
priority = "0.3"
def items(self):
return list(set(map(lambda x: x.author, Article.objects.all())))
def lastmod(self, obj):
return obj.date_joined

@ -0,0 +1,21 @@
import logging
import requests
from django.conf import settings
logger = logging.getLogger(__name__)
class SpiderNotify():
@staticmethod
def baidu_notify(urls):
try:
data = '\n'.join(urls)
result = requests.post(settings.BAIDU_NOTIFY_URL, data=data)
logger.info(result.text)
except Exception as e:
logger.error(e)
@staticmethod
def notify(url):
SpiderNotify.baidu_notify(url)

@ -0,0 +1,32 @@
from django.test import TestCase
from djangoblog.utils import *
class DjangoBlogTest(TestCase):
def setUp(self):
pass
def test_utils(self):
md5 = get_sha256('test')
self.assertIsNotNone(md5)
c = CommonMarkdown.get_markdown('''
# Title1
```python
import os
```
[url](https://www.lylinux.net/)
[ddd](http://www.baidu.com)
''')
self.assertIsNotNone(c)
d = {
'd': 'key1',
'd2': 'key2'
}
data = parse_dict_to_url(d)
self.assertIsNotNone(data)

@ -0,0 +1,64 @@
"""djangoblog URL Configuration
The `urlpatterns` list routes URLs to views. For more information please see:
https://docs.djangoproject.com/en/1.10/topics/http/urls/
Examples:
Function views
1. Add an import: from my_app import views
2. Add a URL to urlpatterns: url(r'^$', views.home, name='home')
Class-based views
1. Add an import: from other_app.views import Home
2. Add a URL to urlpatterns: url(r'^$', Home.as_view(), name='home')
Including another URLconf
1. Import the include() function: from django.conf.urls import url, include
2. Add a URL to urlpatterns: url(r'^blog/', include('blog.urls'))
"""
from django.conf import settings
from django.conf.urls.i18n import i18n_patterns
from django.conf.urls.static import static
from django.contrib.sitemaps.views import sitemap
from django.urls import path, include
from django.urls import re_path
from haystack.views import search_view_factory
from blog.views import EsSearchView
from djangoblog.admin_site import admin_site
from djangoblog.elasticsearch_backend import ElasticSearchModelSearchForm
from djangoblog.feeds import DjangoBlogFeed
from djangoblog.sitemap import ArticleSiteMap, CategorySiteMap, StaticViewSitemap, TagSiteMap, UserSiteMap
sitemaps = {
'blog': ArticleSiteMap,
'Category': CategorySiteMap,
'Tag': TagSiteMap,
'User': UserSiteMap,
'static': StaticViewSitemap
}
handler404 = 'blog.views.page_not_found_view'
handler500 = 'blog.views.server_error_view'
handle403 = 'blog.views.permission_denied_view'
urlpatterns = [
path('i18n/', include('django.conf.urls.i18n')),
]
urlpatterns += i18n_patterns(
re_path(r'^admin/', admin_site.urls),
re_path(r'', include('blog.urls', namespace='blog')),
re_path(r'mdeditor/', include('mdeditor.urls')),
re_path(r'', include('comments.urls', namespace='comment')),
re_path(r'', include('accounts.urls', namespace='account')),
re_path(r'', include('oauth.urls', namespace='oauth')),
re_path(r'^sitemap\.xml$', sitemap, {'sitemaps': sitemaps},
name='django.contrib.sitemaps.views.sitemap'),
re_path(r'^feed/$', DjangoBlogFeed()),
re_path(r'^rss/$', DjangoBlogFeed()),
re_path('^search', search_view_factory(view_class=EsSearchView, form_class=ElasticSearchModelSearchForm),
name='search'),
re_path(r'', include('servermanager.urls', namespace='servermanager')),
re_path(r'', include('owntracks.urls', namespace='owntracks'))
, prefix_default_language=False) + static(settings.STATIC_URL, document_root=settings.STATIC_ROOT)
if settings.DEBUG:
urlpatterns += static(settings.MEDIA_URL,
document_root=settings.MEDIA_ROOT)

@ -0,0 +1,232 @@
#!/usr/bin/env python
# encoding: utf-8
import logging
import os
import random
import string
import uuid
from hashlib import sha256
import bleach
import markdown
import requests
from django.conf import settings
from django.contrib.sites.models import Site
from django.core.cache import cache
from django.templatetags.static import static
logger = logging.getLogger(__name__)
def get_max_articleid_commentid():
from blog.models import Article
from comments.models import Comment
return (Article.objects.latest().pk, Comment.objects.latest().pk)
def get_sha256(str):
m = sha256(str.encode('utf-8'))
return m.hexdigest()
def cache_decorator(expiration=3 * 60):
def wrapper(func):
def news(*args, **kwargs):
try:
view = args[0]
key = view.get_cache_key()
except:
key = None
if not key:
unique_str = repr((func, args, kwargs))
m = sha256(unique_str.encode('utf-8'))
key = m.hexdigest()
value = cache.get(key)
if value is not None:
# logger.info('cache_decorator get cache:%s key:%s' % (func.__name__, key))
if str(value) == '__default_cache_value__':
return None
else:
return value
else:
logger.debug(
'cache_decorator set cache:%s key:%s' %
(func.__name__, key))
value = func(*args, **kwargs)
if value is None:
cache.set(key, '__default_cache_value__', expiration)
else:
cache.set(key, value, expiration)
return value
return news
return wrapper
def expire_view_cache(path, servername, serverport, key_prefix=None):
'''
刷新视图缓存
:param path:url路径
:param servername:host
:param serverport:端口
:param key_prefix:前缀
:return:是否成功
'''
from django.http import HttpRequest
from django.utils.cache import get_cache_key
request = HttpRequest()
request.META = {'SERVER_NAME': servername, 'SERVER_PORT': serverport}
request.path = path
key = get_cache_key(request, key_prefix=key_prefix, cache=cache)
if key:
logger.info('expire_view_cache:get key:{path}'.format(path=path))
if cache.get(key):
cache.delete(key)
return True
return False
@cache_decorator()
def get_current_site():
site = Site.objects.get_current()
return site
class CommonMarkdown:
@staticmethod
def _convert_markdown(value):
md = markdown.Markdown(
extensions=[
'extra',
'codehilite',
'toc',
'tables',
]
)
body = md.convert(value)
toc = md.toc
return body, toc
@staticmethod
def get_markdown_with_toc(value):
body, toc = CommonMarkdown._convert_markdown(value)
return body, toc
@staticmethod
def get_markdown(value):
body, toc = CommonMarkdown._convert_markdown(value)
return body
def send_email(emailto, title, content):
from djangoblog.blog_signals import send_email_signal
send_email_signal.send(
send_email.__class__,
emailto=emailto,
title=title,
content=content)
def generate_code() -> str:
"""生成随机数验证码"""
return ''.join(random.sample(string.digits, 6))
def parse_dict_to_url(dict):
from urllib.parse import quote
url = '&'.join(['{}={}'.format(quote(k, safe='/'), quote(v, safe='/'))
for k, v in dict.items()])
return url
def get_blog_setting():
value = cache.get('get_blog_setting')
if value:
return value
else:
from blog.models import BlogSettings
if not BlogSettings.objects.count():
setting = BlogSettings()
setting.site_name = 'djangoblog'
setting.site_description = '基于Django的博客系统'
setting.site_seo_description = '基于Django的博客系统'
setting.site_keywords = 'Django,Python'
setting.article_sub_length = 300
setting.sidebar_article_count = 10
setting.sidebar_comment_count = 5
setting.show_google_adsense = False
setting.open_site_comment = True
setting.analytics_code = ''
setting.beian_code = ''
setting.show_gongan_code = False
setting.comment_need_review = False
setting.save()
value = BlogSettings.objects.first()
logger.info('set cache get_blog_setting')
cache.set('get_blog_setting', value)
return value
def save_user_avatar(url):
'''
保存用户头像
:param url:头像url
:return: 本地路径
'''
logger.info(url)
try:
basedir = os.path.join(settings.STATICFILES, 'avatar')
rsp = requests.get(url, timeout=2)
if rsp.status_code == 200:
if not os.path.exists(basedir):
os.makedirs(basedir)
image_extensions = ['.jpg', '.png', 'jpeg', '.gif']
isimage = len([i for i in image_extensions if url.endswith(i)]) > 0
ext = os.path.splitext(url)[1] if isimage else '.jpg'
save_filename = str(uuid.uuid4().hex) + ext
logger.info('保存用户头像:' + basedir + save_filename)
with open(os.path.join(basedir, save_filename), 'wb+') as file:
file.write(rsp.content)
return static('avatar/' + save_filename)
except Exception as e:
logger.error(e)
return static('blog/img/avatar.png')
def delete_sidebar_cache():
from blog.models import LinkShowType
keys = ["sidebar" + x for x in LinkShowType.values]
for k in keys:
logger.info('delete sidebar key:' + k)
cache.delete(k)
def delete_view_cache(prefix, keys):
from django.core.cache.utils import make_template_fragment_key
key = make_template_fragment_key(prefix, keys)
cache.delete(key)
def get_resource_url():
if settings.STATIC_URL:
return settings.STATIC_URL
else:
site = get_current_site()
return 'http://' + site.domain + '/static/'
ALLOWED_TAGS = ['a', 'abbr', 'acronym', 'b', 'blockquote', 'code', 'em', 'i', 'li', 'ol', 'pre', 'strong', 'ul', 'h1',
'h2', 'p']
ALLOWED_ATTRIBUTES = {'a': ['href', 'title'], 'abbr': ['title'], 'acronym': ['title']}
def sanitize_html(html):
return bleach.clean(html, tags=ALLOWED_TAGS, attributes=ALLOWED_ATTRIBUTES)

File diff suppressed because it is too large Load Diff

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save