Compare commits

..

16 Commits
master ... main

Before

Width:  |  Height:  |  Size: 18 KiB

After

Width:  |  Height:  |  Size: 18 KiB

Before

Width:  |  Height:  |  Size: 129 KiB

After

Width:  |  Height:  |  Size: 129 KiB

Before

Width:  |  Height:  |  Size: 24 KiB

After

Width:  |  Height:  |  Size: 24 KiB

@ -0,0 +1,10 @@
[run]
source = .
include = *.py
omit =
*migrations*
*tests*
*.html
*whoosh_cn_backend*
*settings.py*
*venv*

@ -0,0 +1,11 @@
bin/data/
# virtualenv
venv/
collectedstatic/
djangoblog/whoosh_index/
uploads/
settings_production.py
*.md
docs/
logs/
static/

@ -0,0 +1,6 @@
blog/static/* linguist-vendored
*.js linguist-vendored
*.css linguist-vendored
* text=auto
*.sh text eol=lf
*.conf text eol=lf

@ -0,0 +1,80 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.pot
# Django stuff:
*.log
logs/
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# PyCharm
# http://www.jetbrains.com/pycharm/webhelp/project.html
.idea
.iml
static/
# virtualenv
venv/
collectedstatic/
djangoblog/whoosh_index/
google93fd32dbd906620a.html
baidu_verify_FlHL7cUyC9.html
BingSiteAuth.xml
cb9339dbe2ff86a5aa169d28dba5f615.txt
werobot_session.*
django.jpg
uploads/
settings_production.py
werobot_session.db
bin/datas/

@ -0,0 +1,158 @@
# DjangoBlog
<p align="center">
<a href="https://github.com/liangliangyy/DjangoBlog/actions/workflows/django.yml"><img src="https://github.com/liangliangyy/DjangoBlog/actions/workflows/django.yml/badge.svg" alt="Django CI"></a>
<a href="https://github.com/liangliangyy/DjangoBlog/actions/workflows/codeql-analysis.yml"><img src="https://github.com/liangliangyy/DjangoBlog/actions/workflows/codeql-analysis.yml/badge.svg" alt="CodeQL"></a>
<a href="https://codecov.io/gh/liangliangyy/DjangoBlog"><img src="https://codecov.io/gh/liangliangyy/DjangoBlog/branch/master/graph/badge.svg" alt="codecov"></a>
<a href="https://github.com/liangliangyy/DjangoBlog/blob/master/LICENSE"><img src="https://img.shields.io/github/license/liangliangyy/djangoblog.svg" alt="license"></a>
</p>
<p align="center">
<b>一款功能强大、设计优雅的现代化博客系统</b>
<br>
<a href="/docs/README-en.md">English</a><b>简体中文</b>
</p>
---
DjangoBlog 是一款基于 Python 3.10 和 Django 4.0 构建的高性能博客平台。它不仅提供了传统博客的所有核心功能还通过一个灵活的插件系统让您可以轻松扩展和定制您的网站。无论您是个人博主、技术爱好者还是内容创作者DjangoBlog 都旨在为您提供一个稳定、高效且易于维护的写作和发布环境。
## ✨ 特性亮点
- **强大的内容管理**: 支持文章、独立页面、分类和标签的完整管理。内置强大的 Markdown 编辑器,支持代码语法高亮。
- **全文搜索**: 集成搜索引擎,提供快速、精准的文章内容搜索。
- **互动评论系统**: 支持回复、邮件提醒等功能,评论内容同样支持 Markdown。
- **灵活的侧边栏**: 可自定义展示最新文章、最多阅读、标签云等模块。
- **社交化登录**: 内置 OAuth 支持,已集成 Google, GitHub, Facebook, 微博, QQ 等主流平台。
- **高性能缓存**: 原生支持 Redis 缓存,并提供自动刷新机制,确保网站高速响应。
- **SEO 友好**: 具备基础 SEO 功能,新内容发布后可自动通知 Google 和百度。
- **便捷的插件系统**: 通过创建独立的插件来扩展博客功能代码解耦易于维护。我们已经通过插件实现了文章浏览计数、SEO 优化等功能!
- **集成图床**: 内置简单的图床功能,方便图片上传和管理。
- **自动化前端**: 集成 `django-compressor`,自动压缩和优化 CSS 及 JavaScript 文件。
- **健壮的运维**: 内置网站异常邮件提醒和微信公众号管理功能。
## 🛠️ 技术栈
- **后端**: Python 3.10, Django 4.0
- **数据库**: MySQL, SQLite (可配置)
- **缓存**: Redis
- **前端**: HTML5, CSS3, JavaScript
- **搜索**: Whoosh, Elasticsearch (可配置)
- **编辑器**: Markdown (mdeditor)
## 🚀 快速开始
### 1. 环境准备
确保您的系统中已安装 Python 3.10+ 和 MySQL/MariaDB。
### 2. 克隆与安装
```bash
# 克隆项目到本地
git clone https://github.com/liangliangyy/DjangoBlog.git
cd DjangoBlog
# 安装依赖
pip install -r requirements.txt
```
### 3. 项目配置
- **数据库**:
打开 `djangoblog/settings.py` 文件,找到 `DATABASES` 配置项,修改为您的 MySQL 连接信息。
```python
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.mysql',
'NAME': 'djangoblog',
'USER': 'root',
'PASSWORD': 'your_password',
'HOST': '127.0.0.1',
'PORT': 3306,
}
}
```
在 MySQL 中创建数据库:
```sql
CREATE DATABASE `djangoblog` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
```
- **更多配置**:
关于邮件发送、OAuth 登录、缓存等更多高级配置,请参阅我们的 [详细配置文档](/docs/config.md)。
### 4. 初始化数据库
```bash
python manage.py makemigrations
python manage.py migrate
# 创建一个超级管理员账户
python manage.py createsuperuser
```
### 5. 运行项目
```bash
# (可选) 生成一些测试数据
python manage.py create_testdata
# (可选) 收集和压缩静态文件
python manage.py collectstatic --noinput
python manage.py compress --force
# 启动开发服务器
python manage.py runserver
```
现在,在您的浏览器中访问 `http://127.0.0.1:8000/`,您应该能看到 DjangoBlog 的首页了!
## 部署
- **传统部署**: 我们为您准备了非常详细的 [服务器部署教程](https://www.lylinux.net/article/2019/8/5/58.html)。
- **Docker 部署**: 项目已全面支持 Docker。如果您熟悉容器化技术请参考 [Docker 部署文档](/docs/docker.md) 来快速启动。
- **Kubernetes 部署**: 我们也提供了完整的 [Kubernetes 部署指南](/docs/k8s.md),助您轻松上云。
## 🧩 插件系统
插件系统是 DjangoBlog 的核心特色之一。它允许您在不修改核心代码的情况下,通过编写独立的插件来为您的博客添加新功能。
- **工作原理**: 插件通过在预定义的“钩子”上注册回调函数来工作。例如,当一篇文章被渲染时,`after_article_body_get` 钩子会被触发,所有注册到此钩子的函数都会被执行。
- **现有插件**: `view_count`(浏览计数), `seo_optimizer`SEO优化等都是通过插件系统实现的。
- **开发您自己的插件**: 只需在 `plugins` 目录下创建一个新的文件夹,并编写您的 `plugin.py`。欢迎探索并为 DjangoBlog 社区贡献您的创意!
## 🤝 贡献指南
我们热烈欢迎任何形式的贡献!如果您有好的想法或发现了 Bug请随时提交 Issue 或 Pull Request。
## 📄 许可证
本项目基于 [MIT License](LICENSE) 开源。
---
## ❤️ 支持与赞助
如果您觉得这个项目对您有帮助,并且希望支持我继续维护和开发新功能,欢迎请我喝杯咖啡!您的每一份支持都是我前进的最大动力。
<p align="center">
<img src="/docs/imgs/alipay.jpg" width="150" alt="支付宝赞助">
<img src="/docs/imgs/wechat.jpg" width="150" alt="微信赞助">
</p>
<p align="center">
<i>(左) 支付宝 / (右) 微信</i>
</p>
## 🙏 鸣谢
特别感谢 **JetBrains** 为本项目提供的免费开源许可证。
<p align="center">
<a href="https://www.jetbrains.com/?from=DjangoBlog">
<img src="/docs/imgs/pycharm_logo.png" width="150" alt="JetBrains Logo">
</a>
</p>
---
> 如果本项目帮助到了你,请在[这里](https://github.com/liangliangyy/DjangoBlog/issues/214)留下你的网址,让更多的人看到。您的回复将会是我继续更新维护下去的动力。

@ -0,0 +1,50 @@
from django.contrib.auth.models import AbstractUser
from django.db import models
from django.urls import reverse
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from djangoblog.utils import get_current_site
# Create your models here.
class BlogUser(AbstractUser):
nickname = models.CharField(_('nick name'), max_length=100, blank=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
source = models.CharField(_('create source'), max_length=100, blank=True)
def get_absolute_url(self):
return reverse(
'blog:author_detail', kwargs={
'author_name': self.username})
def __str__(self):
return self.email
def get_full_url(self):
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
def get_subscriber_count(self):
"""获取订阅该作者的用户数量"""
from blog.models import Subscription
return Subscription.objects.filter(author=self, subscription_type='author').count()
def get_subscribed_articles_count(self):
"""获取该用户订阅的文章数量"""
from blog.models import Subscription
return Subscription.objects.filter(user=self, subscription_type='article').count()
def get_subscribed_authors_count(self):
"""获取该用户订阅的作者数量"""
from blog.models import Subscription
return Subscription.objects.filter(user=self, subscription_type='author').count()
class Meta:
ordering = ['-id']
verbose_name = _('user')
verbose_name_plural = verbose_name
get_latest_by = 'id'

@ -0,0 +1,233 @@
from django import forms
from django.contrib import admin
from django.contrib.auth import get_user_model
from django.urls import reverse
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
# Register your models here.
from .models import Article, Subscription, Like, Notification
from django.db.models import Count, Q
from django.utils.html import format_html
from django.urls import reverse
from django.utils import timezone
from datetime import timedelta
class ArticleForm(forms.ModelForm):
# body = forms.CharField(widget=AdminPagedownWidget())
class Meta:
model = Article
fields = '__all__'
def makr_article_publish(modeladmin, request, queryset):
queryset.update(status='p')
def draft_article(modeladmin, request, queryset):
queryset.update(status='d')
def close_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='c')
def open_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='o')
makr_article_publish.short_description = _('Publish selected articles')
draft_article.short_description = _('Draft selected articles')
close_article_commentstatus.short_description = _('Close article comments')
open_article_commentstatus.short_description = _('Open article comments')
class ArticlelAdmin(admin.ModelAdmin):
list_per_page = 20
search_fields = ('body', 'title')
form = ArticleForm
list_display = (
'id',
'title',
'author',
'link_to_category',
'creation_time',
'views',
'likes',
'subscriptions',
'status',
'type',
'article_order')
list_display_links = ('id', 'title')
list_filter = ('status', 'type', 'category')
filter_horizontal = ('tags',)
exclude = ('creation_time', 'last_modify_time')
view_on_site = True
actions = [
makr_article_publish,
draft_article,
close_article_commentstatus,
open_article_commentstatus]
def link_to_category(self, obj):
info = (obj.category._meta.app_label, obj.category._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.category.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.category.name))
link_to_category.short_description = _('category')
def get_form(self, request, obj=None, **kwargs):
form = super(ArticlelAdmin, self).get_form(request, obj, **kwargs)
form.base_fields['author'].queryset = get_user_model(
).objects.filter(is_superuser=True)
return form
def save_model(self, request, obj, form, change):
super(ArticlelAdmin, self).save_model(request, obj, form, change)
def get_view_on_site_url(self, obj=None):
if obj:
url = obj.get_full_url()
return url
else:
from djangoblog.utils import get_current_site
site = get_current_site().domain
return site
class TagAdmin(admin.ModelAdmin):
exclude = ('slug', 'last_mod_time', 'creation_time')
class CategoryAdmin(admin.ModelAdmin):
list_display = ('name', 'parent_category', 'index')
exclude = ('slug', 'last_mod_time', 'creation_time')
class LinksAdmin(admin.ModelAdmin):
exclude = ('last_mod_time', 'creation_time')
class SideBarAdmin(admin.ModelAdmin):
list_display = ('name', 'content', 'is_enable', 'sequence')
exclude = ('last_mod_time', 'creation_time')
class BlogSettingsAdmin(admin.ModelAdmin):
pass
class SubscriptionAdmin(admin.ModelAdmin):
"""订阅记录管理"""
list_display = ('id', 'user', 'subscription_type', 'article_link', 'author_link', 'created_time')
list_filter = ('subscription_type', 'created_time')
search_fields = ('user__username', 'article__title', 'author__username')
readonly_fields = ('created_time',)
list_per_page = 20
date_hierarchy = 'created_time'
def article_link(self, obj):
if obj.article:
info = (obj.article._meta.app_label, obj.article._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.article.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.article.title))
return '-'
article_link.short_description = '文章'
def author_link(self, obj):
if obj.author:
from accounts.models import BlogUser
info = (BlogUser._meta.app_label, BlogUser._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.author.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.author.username))
return '-'
author_link.short_description = '作者'
def changelist_view(self, request, extra_context=None):
"""添加订阅数据分析"""
response = super().changelist_view(request, extra_context=extra_context)
try:
qs = response.context_data['cl'].queryset
except (AttributeError, KeyError):
return response
# 订阅统计
total_subscriptions = qs.count()
article_subscriptions = qs.filter(subscription_type='article').count()
author_subscriptions = qs.filter(subscription_type='author').count()
# 今日订阅数
today = timezone.now().date()
today_subscriptions = qs.filter(created_time__date=today).count()
# 本周订阅数
week_ago = today - timedelta(days=7)
week_subscriptions = qs.filter(created_time__date__gte=week_ago).count()
# 本月订阅数
month_ago = today - timedelta(days=30)
month_subscriptions = qs.filter(created_time__date__gte=month_ago).count()
# 最受欢迎的文章(订阅数最多的文章)
popular_articles = Article.objects.annotate(
subscription_count=Count('subscription', filter=Q(subscription__subscription_type='article'))
).filter(subscription_count__gt=0).order_by('-subscription_count')[:10]
# 最受欢迎的作者(订阅数最多的作者)
from accounts.models import BlogUser
popular_authors = BlogUser.objects.annotate(
subscriber_count=Count('subscribed_by', filter=Q(subscribed_by__subscription_type='author'))
).filter(subscriber_count__gt=0).order_by('-subscriber_count')[:10]
# 添加到上下文
if extra_context is None:
extra_context = {}
extra_context.update({
'total_subscriptions': total_subscriptions,
'article_subscriptions': article_subscriptions,
'author_subscriptions': author_subscriptions,
'today_subscriptions': today_subscriptions,
'week_subscriptions': week_subscriptions,
'month_subscriptions': month_subscriptions,
'popular_articles': popular_articles,
'popular_authors': popular_authors,
})
response.context_data.update(extra_context)
return response
# 注意如果需要独立的数据分析页面可以在admin_site.py中添加自定义URL
# SubscriptionAdmin的changelist_view已经包含了基本的数据分析功能
class NotificationAdmin(admin.ModelAdmin):
"""通知管理"""
list_display = ('id', 'user', 'notification_type', 'title', 'is_read', 'created_time', 'article_link')
list_filter = ('notification_type', 'is_read', 'created_time')
search_fields = ('user__username', 'title', 'content')
readonly_fields = ('created_time',)
list_per_page = 20
date_hierarchy = 'created_time'
actions = ['mark_as_read', 'mark_as_unread']
def article_link(self, obj):
if obj.article:
info = (obj.article._meta.app_label, obj.article._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.article.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.article.title))
return '-'
article_link.short_description = '相关文章'
def mark_as_read(self, request, queryset):
queryset.update(is_read=True)
self.message_user(request, f'已标记 {queryset.count()} 条通知为已读')
mark_as_read.short_description = '标记为已读'
def mark_as_unread(self, request, queryset):
queryset.update(is_read=False)
self.message_user(request, f'已标记 {queryset.count()} 条通知为未读')
mark_as_unread.short_description = '标记为未读'

@ -0,0 +1,373 @@
"""
管理后台统计视图
"""
from django.contrib.admin.views.decorators import staff_member_required
from django.shortcuts import render
from django.http import JsonResponse
from django.db.models import Sum, Count, Q
from django.utils import timezone
from datetime import timedelta, datetime
from django.db.models.functions import TruncMonth, TruncYear, TruncQuarter
from blog.models import Article
from comments.models import Comment
@staff_member_required
def statistics_dashboard(request):
"""
统计仪表盘主页面
"""
return render(request, 'admin/blog/statistics_dashboard.html')
@staff_member_required
def get_statistics_overview(request):
"""
获取统计概览数据API
"""
try:
# 总文章数(已发布)
total_articles = Article.objects.filter(status='p', type='a').count()
# 总评论数
total_comments = Comment.objects.filter(is_enable=True).count()
# 总阅读量
total_views = Article.objects.filter(status='p').aggregate(
total=Sum('views')
)['total'] or 0
# 总点赞数
total_likes = Article.objects.filter(status='p').aggregate(
total=Sum('likes')
)['total'] or 0
# 总订阅数
total_subscriptions = Article.objects.filter(status='p').aggregate(
total=Sum('subscriptions')
)['total'] or 0
# 今日新增
today = timezone.now().date()
today_articles = Article.objects.filter(
status='p',
creation_time__date=today
).count()
today_comments = Comment.objects.filter(
is_enable=True,
creation_time__date=today
).count()
# 本周新增
week_ago = today - timedelta(days=7)
week_articles = Article.objects.filter(
status='p',
creation_time__date__gte=week_ago
).count()
week_comments = Comment.objects.filter(
is_enable=True,
creation_time__date__gte=week_ago
).count()
# 本月新增
month_ago = today - timedelta(days=30)
month_articles = Article.objects.filter(
status='p',
creation_time__date__gte=month_ago
).count()
month_comments = Comment.objects.filter(
is_enable=True,
creation_time__date__gte=month_ago
).count()
return JsonResponse({
'success': True,
'data': {
'total_articles': total_articles,
'total_comments': total_comments,
'total_views': total_views,
'total_likes': total_likes,
'total_subscriptions': total_subscriptions,
'today_articles': today_articles,
'today_comments': today_comments,
'week_articles': week_articles,
'week_comments': week_comments,
'month_articles': month_articles,
'month_comments': month_comments,
}
})
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
@staff_member_required
def get_monthly_statistics(request):
"""
获取月度统计数据API
"""
try:
# 获取时间范围参数
start_date = request.GET.get('start_date')
end_date = request.GET.get('end_date')
period = request.GET.get('period', 'month') # month, quarter, year
# 默认查询最近12个月
if not start_date or not end_date:
end_date = timezone.now().date()
start_date = end_date - timedelta(days=365)
else:
start_date = datetime.strptime(start_date, '%Y-%m-%d').date()
end_date = datetime.strptime(end_date, '%Y-%m-%d').date()
# 按月统计文章数
if period == 'month':
articles_data = Article.objects.filter(
status='p',
creation_time__date__gte=start_date,
creation_time__date__lte=end_date
).annotate(
month=TruncMonth('creation_time')
).values('month').annotate(
count=Count('id'),
views=Sum('views'),
likes=Sum('likes'),
subscriptions=Sum('subscriptions')
).order_by('month')
# 按月统计评论数
comments_data = Comment.objects.filter(
is_enable=True,
creation_time__date__gte=start_date,
creation_time__date__lte=end_date
).annotate(
month=TruncMonth('creation_time')
).values('month').annotate(
count=Count('id')
).order_by('month')
elif period == 'quarter':
articles_data = Article.objects.filter(
status='p',
creation_time__date__gte=start_date,
creation_time__date__lte=end_date
).annotate(
quarter=TruncQuarter('creation_time')
).values('quarter').annotate(
count=Count('id'),
views=Sum('views'),
likes=Sum('likes'),
subscriptions=Sum('subscriptions')
).order_by('quarter')
comments_data = Comment.objects.filter(
is_enable=True,
creation_time__date__gte=start_date,
creation_time__date__lte=end_date
).annotate(
quarter=TruncQuarter('creation_time')
).values('quarter').annotate(
count=Count('id')
).order_by('quarter')
else: # year
articles_data = Article.objects.filter(
status='p',
creation_time__date__gte=start_date,
creation_time__date__lte=end_date
).annotate(
year=TruncYear('creation_time')
).values('year').annotate(
count=Count('id'),
views=Sum('views'),
likes=Sum('likes'),
subscriptions=Sum('subscriptions')
).order_by('year')
comments_data = Comment.objects.filter(
is_enable=True,
creation_time__date__gte=start_date,
creation_time__date__lte=end_date
).annotate(
year=TruncYear('creation_time')
).values('year').annotate(
count=Count('id')
).order_by('year')
# 格式化数据
articles_list = []
for item in articles_data:
time_key = 'month' if period == 'month' else ('quarter' if period == 'quarter' else 'year')
time_value = item[time_key]
if period == 'month':
label = time_value.strftime('%Y-%m')
elif period == 'quarter':
label = f"{time_value.year}Q{(time_value.month-1)//3+1}"
else:
label = str(time_value.year)
articles_list.append({
'period': label,
'articles': item['count'],
'views': item['views'] or 0,
'likes': item['likes'] or 0,
'subscriptions': item['subscriptions'] or 0,
})
comments_list = []
for item in comments_data:
time_key = 'month' if period == 'month' else ('quarter' if period == 'quarter' else 'year')
time_value = item[time_key]
if period == 'month':
label = time_value.strftime('%Y-%m')
elif period == 'quarter':
label = f"{time_value.year}Q{(time_value.month-1)//3+1}"
else:
label = str(time_value.year)
comments_list.append({
'period': label,
'comments': item['count'],
})
return JsonResponse({
'success': True,
'data': {
'articles': articles_list,
'comments': comments_list,
'period': period,
'start_date': start_date.strftime('%Y-%m-%d'),
'end_date': end_date.strftime('%Y-%m-%d'),
}
})
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
@staff_member_required
def get_top_articles(request):
"""
获取热门文章排行API
"""
try:
limit = int(request.GET.get('limit', 10))
sort_by = request.GET.get('sort_by', 'views') # views, comments, likes, subscriptions
articles = Article.objects.filter(status='p').annotate(
comment_count=Count('comment', filter=Q(comment__is_enable=True))
)
if sort_by == 'views':
articles = articles.order_by('-views')[:limit]
elif sort_by == 'comments':
articles = articles.order_by('-comment_count')[:limit]
elif sort_by == 'likes':
articles = articles.order_by('-likes')[:limit]
elif sort_by == 'subscriptions':
articles = articles.order_by('-subscriptions')[:limit]
else:
articles = articles.order_by('-views')[:limit]
articles_list = []
for article in articles:
articles_list.append({
'id': article.id,
'title': article.title,
'views': article.views,
'likes': article.likes,
'subscriptions': article.subscriptions,
'comments': article.comment_count,
'author': article.author.username,
'created_time': article.creation_time.strftime('%Y-%m-%d'),
'url': article.get_absolute_url(),
})
return JsonResponse({
'success': True,
'data': articles_list
})
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
@staff_member_required
def get_category_statistics(request):
"""
获取分类统计数据API
"""
try:
from blog.models import Category
categories = Category.objects.annotate(
article_count=Count('article', filter=Q(article__status='p')),
total_views=Sum('article__views', filter=Q(article__status='p')),
total_likes=Sum('article__likes', filter=Q(article__status='p')),
total_comments=Count('article__comment', filter=Q(article__status='p', article__comment__is_enable=True))
).filter(article_count__gt=0).order_by('-article_count')
categories_list = []
for category in categories:
categories_list.append({
'name': category.name,
'article_count': category.article_count,
'total_views': category.total_views or 0,
'total_likes': category.total_likes or 0,
'total_comments': category.total_comments or 0,
})
return JsonResponse({
'success': True,
'data': categories_list
})
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)
@staff_member_required
def get_author_statistics(request):
"""
获取作者统计数据API
"""
try:
from accounts.models import BlogUser
authors = BlogUser.objects.annotate(
article_count=Count('article', filter=Q(article__status='p')),
total_views=Sum('article__views', filter=Q(article__status='p')),
total_likes=Sum('article__likes', filter=Q(article__status='p')),
total_comments=Count('article__comment', filter=Q(article__status='p', article__comment__is_enable=True)),
subscriber_count=Count('subscribed_by', filter=Q(subscribed_by__subscription_type='author'))
).filter(article_count__gt=0).order_by('-article_count')
authors_list = []
for author in authors:
authors_list.append({
'username': author.username,
'article_count': author.article_count,
'total_views': author.total_views or 0,
'total_likes': author.total_likes or 0,
'total_comments': author.total_comments or 0,
'subscriber_count': author.subscriber_count or 0,
})
return JsonResponse({
'success': True,
'data': authors_list
})
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
}, status=500)

@ -0,0 +1,78 @@
# Generated by Django 5.2.4 on 2025-11-19 22:16
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models, connection
def add_likes_field_if_not_exists(apps, schema_editor):
"""只在likes字段不存在时添加"""
with connection.cursor() as cursor:
# 检查字段是否已存在
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'blog_article'
AND COLUMN_NAME = 'likes'
""")
exists = cursor.fetchone()[0] > 0
if not exists:
# 字段不存在,添加字段
cursor.execute("""
ALTER TABLE blog_article
ADD COLUMN likes INTEGER UNSIGNED NOT NULL DEFAULT 0
""")
def reverse_add_likes_field(apps, schema_editor):
"""回滚操作移除likes字段"""
with connection.cursor() as cursor:
cursor.execute("""
ALTER TABLE blog_article
DROP COLUMN IF EXISTS likes
""")
class Migration(migrations.Migration):
dependencies = [
('blog', '0006_alter_blogsettings_options'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.RunPython(
add_likes_field_if_not_exists,
reverse_add_likes_field,
),
migrations.CreateModel(
name='Like',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
('article', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='blog.article', verbose_name='文章')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='用户')),
],
options={
'verbose_name': '点赞记录',
'verbose_name_plural': '点赞记录',
'unique_together': {('user', 'article')},
},
),
# 使用SeparateDatabaseAndState数据库操作已在RunPython中完成只更新Django状态
migrations.SeparateDatabaseAndState(
database_operations=[
# 数据库操作已经在RunPython中完成这里不需要再执行
],
state_operations=[
migrations.AddField(
model_name='article',
name='likes',
field=models.PositiveIntegerField(default=0, verbose_name='点赞数'),
),
],
),
]

@ -0,0 +1,73 @@
# Generated by Django 5.2.4 on 2025-11-22 19:27
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("blog", "0007_article_likes_like"),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.RemoveField(
model_name="tag",
name="creation_time",
),
migrations.RemoveField(
model_name="tag",
name="last_modify_time",
),
migrations.AlterField(
model_name="tag",
name="id",
field=models.BigAutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
),
),
migrations.CreateModel(
name="TagSubscription",
fields=[
(
"id",
models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name="ID",
),
),
(
"created_time",
models.DateTimeField(
default=django.utils.timezone.now, verbose_name="创建时间"
),
),
(
"tag",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to="blog.tag",
verbose_name="标签",
),
),
(
"user",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
to=settings.AUTH_USER_MODEL,
verbose_name="用户",
),
),
],
options={
"verbose_name": "标签订阅",
"verbose_name_plural": "标签订阅",
"unique_together": {("user", "tag")},
},
),
]

@ -0,0 +1,58 @@
# Generated by Django 5.2.4 on 2025-01-XX XX:XX
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('blog', '0008_remove_tag_creation_time_remove_tag_last_modify_time_and_more'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.AddField(
model_name='article',
name='subscriptions',
field=models.PositiveIntegerField(default=0, verbose_name='订阅数'),
),
migrations.CreateModel(
name='Subscription',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('subscription_type', models.CharField(choices=[('article', '文章'), ('author', '作者')], default='article', max_length=10, verbose_name='订阅类型')),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
('article', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='blog.article', verbose_name='订阅的文章')),
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='subscribed_by', to=settings.AUTH_USER_MODEL, verbose_name='订阅的作者')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='订阅用户')),
],
options={
'verbose_name': '订阅记录',
'verbose_name_plural': '订阅记录',
},
),
migrations.AddIndex(
model_name='subscription',
index=models.Index(fields=['user', 'subscription_type'], name='blog_subscr_user_id_idx'),
),
migrations.AddIndex(
model_name='subscription',
index=models.Index(fields=['article'], name='blog_subscr_article_idx'),
),
migrations.AddIndex(
model_name='subscription',
index=models.Index(fields=['author'], name='blog_subscr_author_idx'),
),
migrations.AddIndex(
model_name='subscription',
index=models.Index(fields=['user', 'article'], name='blog_subscr_user_article_idx'),
),
migrations.AddIndex(
model_name='subscription',
index=models.Index(fields=['user', 'author'], name='blog_subscr_user_author_idx'),
),
]

@ -0,0 +1,38 @@
# Generated by Django 5.2.4 on 2025-11-22 20:06
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
("blog", "0009_article_subscriptions_subscription"),
]
operations = [
migrations.RenameIndex(
model_name="subscription",
new_name="blog_subscr_user_id_ae3348_idx",
old_name="blog_subscr_user_id_idx",
),
migrations.RenameIndex(
model_name="subscription",
new_name="blog_subscr_article_dc0890_idx",
old_name="blog_subscr_article_idx",
),
migrations.RenameIndex(
model_name="subscription",
new_name="blog_subscr_author__279c7a_idx",
old_name="blog_subscr_author_idx",
),
migrations.RenameIndex(
model_name="subscription",
new_name="blog_subscr_user_id_be80aa_idx",
old_name="blog_subscr_user_article_idx",
),
migrations.RenameIndex(
model_name="subscription",
new_name="blog_subscr_user_id_a74bfc_idx",
old_name="blog_subscr_user_author_idx",
),
]

@ -0,0 +1,45 @@
# Generated by Django 5.2.4 on 2025-11-22 XX:XX
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('blog', '0010_rename_blog_subscr_user_id_idx_blog_subscr_user_id_ae3348_idx_and_more'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
]
operations = [
migrations.CreateModel(
name='Notification',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('notification_type', models.CharField(choices=[('article_published', '文章发布'), ('article_updated', '文章更新'), ('author_new_article', '作者新文章')], default='article_published', max_length=20, verbose_name='通知类型')),
('title', models.CharField(max_length=200, verbose_name='通知标题')),
('content', models.TextField(blank=True, verbose_name='通知内容')),
('is_read', models.BooleanField(default=False, verbose_name='已读')),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
('article', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='blog.article', verbose_name='相关文章')),
('author', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, related_name='sent_notifications', to=settings.AUTH_USER_MODEL, verbose_name='相关作者')),
('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='notifications', to=settings.AUTH_USER_MODEL, verbose_name='接收用户')),
],
options={
'verbose_name': '通知',
'verbose_name_plural': '通知',
'ordering': ['-created_time'],
},
),
migrations.AddIndex(
model_name='notification',
index=models.Index(fields=['user', 'is_read'], name='blog_notifi_user_id_idx'),
),
migrations.AddIndex(
model_name='notification',
index=models.Index(fields=['created_time'], name='blog_notifi_created_idx'),
),
]

@ -0,0 +1,577 @@
import logging
import re
from abc import abstractmethod
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from mdeditor.fields import MDTextField
from uuslug import slugify
from djangoblog.utils import cache_decorator, cache
from djangoblog.utils import get_current_site
logger = logging.getLogger(__name__)
class LinkShowType(models.TextChoices):
I = ('i', _('index'))
L = ('l', _('list'))
P = ('p', _('post'))
A = ('a', _('all'))
S = ('s', _('slide'))
class BaseModel(models.Model):
id = models.AutoField(primary_key=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('modify time'), default=now)
def save(self, *args, **kwargs):
is_update_views = isinstance(
self,
Article) and 'update_fields' in kwargs and kwargs['update_fields'] == ['views']
if is_update_views:
Article.objects.filter(pk=self.pk).update(views=self.views)
else:
if 'slug' in self.__dict__:
slug = getattr(
self, 'title') if 'title' in self.__dict__ else getattr(
self, 'name')
setattr(self, 'slug', slugify(slug))
super().save(*args, **kwargs)
def get_full_url(self):
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
class Meta:
abstract = True
@abstractmethod
def get_absolute_url(self):
pass
class Article(BaseModel):
"""文章"""
STATUS_CHOICES = (
('d', _('Draft')),
('p', _('Published')),
)
COMMENT_STATUS = (
('o', _('Open')),
('c', _('Close')),
)
TYPE = (
('a', _('Article')),
('p', _('Page')),
)
title = models.CharField(_('title'), max_length=200, unique=True)
body = MDTextField(_('body'))
pub_time = models.DateTimeField(
_('publish time'), blank=False, null=False, default=now)
status = models.CharField(
_('status'),
max_length=1,
choices=STATUS_CHOICES,
default='p')
comment_status = models.CharField(
_('comment status'),
max_length=1,
choices=COMMENT_STATUS,
default='o')
type = models.CharField(_('type'), max_length=1, choices=TYPE, default='a')
views = models.PositiveIntegerField(_('views'), default=0)
likes = models.PositiveIntegerField('点赞数', default=0) # 添加点赞字段
subscriptions = models.PositiveIntegerField('订阅数', default=0) # 添加订阅字段
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=False,
null=False,
on_delete=models.CASCADE)
article_order = models.IntegerField(
_('order'), blank=False, null=False, default=0)
show_toc = models.BooleanField(_('show toc'), blank=False, null=False, default=False)
category = models.ForeignKey(
'Category',
verbose_name=_('category'),
on_delete=models.CASCADE,
blank=False,
null=False)
tags = models.ManyToManyField('Tag', verbose_name=_('tag'), blank=True)
def body_to_string(self):
return self.body
def __str__(self):
return self.title
class Meta:
ordering = ['-article_order', '-pub_time']
verbose_name = _('article')
verbose_name_plural = verbose_name
get_latest_by = 'id'
def get_absolute_url(self):
return reverse('blog:detailbyid', kwargs={
'article_id': self.id,
'year': self.creation_time.year,
'month': self.creation_time.month,
'day': self.creation_time.day
})
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
tree = self.category.get_category_tree()
names = list(map(lambda c: (c.name, c.get_absolute_url()), tree))
return names
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
def viewed(self):
self.views += 1
self.save(update_fields=['views'])
def comment_list(self):
cache_key = 'article_comments_{id}'.format(id=self.id)
value = cache.get(cache_key)
if value:
logger.info('get article comments:{id}'.format(id=self.id))
return value
else:
comments = self.comment_set.filter(is_enable=True).order_by('-id')
cache.set(cache_key, comments, 60 * 100)
logger.info('set article comments:{id}'.format(id=self.id))
return comments
def get_admin_url(self):
info = (self._meta.app_label, self._meta.model_name)
return reverse('admin:%s_%s_change' % info, args=(self.pk,))
@cache_decorator(expiration=60 * 100)
def next_article(self):
# 下一篇
return Article.objects.filter(
id__gt=self.id, status='p').order_by('id').first()
@cache_decorator(expiration=60 * 100)
def prev_article(self):
# 前一篇
return Article.objects.filter(id__lt=self.id, status='p').first()
def get_first_image_url(self):
"""
Get the first image url from article.body.
:return:
"""
match = re.search(r'!\[.*?\]\((.+?)\)', self.body)
if match:
return match.group(1)
return ""
def increase_views(self):
self.views += 1
self.save(update_fields=['views'])
def like(self):
"""增加点赞数"""
self.likes += 1
self.save(update_fields=['likes'])
def unlike(self):
"""取消点赞"""
if self.likes > 0:
self.likes -= 1
self.save(update_fields=['likes'])
def subscribe(self):
"""增加订阅数"""
self.subscriptions += 1
self.save(update_fields=['subscriptions'])
# 清除缓存
cache_key = f'article_subscriptions_{self.id}'
cache.delete(cache_key)
def unsubscribe(self):
"""取消订阅"""
if self.subscriptions > 0:
self.subscriptions -= 1
self.save(update_fields=['subscriptions'])
# 清除缓存
cache_key = f'article_subscriptions_{self.id}'
cache.delete(cache_key)
class Like(models.Model):
"""点赞记录模型"""
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, verbose_name='用户')
article = models.ForeignKey(Article, on_delete=models.CASCADE, verbose_name='文章')
created_time = models.DateTimeField('创建时间', default=now)
class Meta:
unique_together = ('user', 'article') # 确保每个用户只能对同一篇文章点赞一次
verbose_name = '点赞记录'
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.user.username} likes {self.article.title}"
def save(self, *args, **kwargs):
# 检查是否已存在该用户的点赞记录
if not self.pk and Like.objects.filter(user=self.user, article=self.article).exists():
raise ValidationError("您已经点过赞了")
super().save(*args, **kwargs)
# 同步更新文章点赞数
self.article.like()
def delete(self, *args, **kwargs):
# 先保存文章引用
article = self.article
super().delete(*args, **kwargs)
# 同步更新文章点赞数
article.unlike()
class Subscription(models.Model):
"""订阅记录模型 - 支持订阅文章和作者"""
SUBSCRIPTION_TYPE_CHOICES = (
('article', '文章'),
('author', '作者'),
)
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, verbose_name='订阅用户')
article = models.ForeignKey(Article, on_delete=models.CASCADE, verbose_name='订阅的文章', null=True, blank=True)
author = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, verbose_name='订阅的作者',
related_name='subscribed_by', null=True, blank=True)
subscription_type = models.CharField('订阅类型', max_length=10, choices=SUBSCRIPTION_TYPE_CHOICES, default='article')
created_time = models.DateTimeField('创建时间', default=now)
class Meta:
# 注意unique_together不能很好地处理null值所以在save方法中手动检查
verbose_name = '订阅记录'
verbose_name_plural = verbose_name
indexes = [
models.Index(fields=['user', 'subscription_type']),
models.Index(fields=['article']),
models.Index(fields=['author']),
models.Index(fields=['user', 'article']),
models.Index(fields=['user', 'author']),
]
def __str__(self):
if self.subscription_type == 'article' and self.article:
return f"{self.user.username} 订阅文章 {self.article.title}"
elif self.subscription_type == 'author' and self.author:
return f"{self.user.username} 订阅作者 {self.author.username}"
return f"{self.user.username} 的订阅记录"
def clean(self):
"""验证订阅类型和对应字段的一致性"""
from django.core.exceptions import ValidationError
if self.subscription_type == 'article' and not self.article:
raise ValidationError("订阅文章时必须指定文章")
if self.subscription_type == 'author' and not self.author:
raise ValidationError("订阅作者时必须指定作者")
if self.subscription_type == 'article' and self.author:
raise ValidationError("订阅文章时不能指定作者")
if self.subscription_type == 'author' and self.article:
raise ValidationError("订阅作者时不能指定文章")
def save(self, *args, **kwargs):
self.clean()
# 检查是否已存在该订阅记录(手动实现唯一约束)
if not self.pk:
if self.subscription_type == 'article' and self.article:
existing = Subscription.objects.filter(
user=self.user,
article=self.article,
subscription_type='article'
)
if existing.exists():
raise ValidationError("您已经订阅过这篇文章了")
elif self.subscription_type == 'author' and self.author:
existing = Subscription.objects.filter(
user=self.user,
author=self.author,
subscription_type='author'
)
if existing.exists():
raise ValidationError("您已经订阅过这位作者了")
super().save(*args, **kwargs)
# 同步更新订阅数
if self.subscription_type == 'article' and self.article:
self.article.subscribe()
# 注意作者订阅数可以通过related_name查询不需要单独字段
def delete(self, *args, **kwargs):
# 先保存引用
article = self.article
author = self.author
subscription_type = self.subscription_type
super().delete(*args, **kwargs)
# 同步更新订阅数
if subscription_type == 'article' and article:
article.unsubscribe()
class Category(BaseModel):
"""文章分类"""
name = models.CharField(_('category name'), max_length=30, unique=True)
parent_category = models.ForeignKey(
'self',
verbose_name=_('parent category'),
blank=True,
null=True,
on_delete=models.CASCADE)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
index = models.IntegerField(default=0, verbose_name=_('index'))
class Meta:
ordering = ['-index']
verbose_name = _('category')
verbose_name_plural = verbose_name
def get_absolute_url(self):
return reverse(
'blog:category_detail', kwargs={
'category_name': self.slug})
def __str__(self):
return self.name
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
"""
递归获得分类目录的父级
:return:
"""
categorys = []
def parse(category):
categorys.append(category)
if category.parent_category:
parse(category.parent_category)
parse(self)
return categorys
@cache_decorator(60 * 60 * 10)
def get_sub_categorys(self):
"""
获得当前分类目录所有子集
:return:
"""
categorys = []
all_categorys = Category.objects.all()
def parse(category):
if category not in categorys:
categorys.append(category)
childs = all_categorys.filter(parent_category=category)
for child in childs:
if category not in categorys:
categorys.append(child)
parse(child)
parse(self)
return categorys
class Tag(models.Model):
"""文章标签"""
name = models.CharField(_('tag name'), max_length=30, unique=True)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
def __str__(self):
return self.name
def get_absolute_url(self):
return reverse('blog:tag_detail', kwargs={'tag_name': self.slug})
@cache_decorator(60 * 60 * 10)
def get_article_count(self):
return Article.objects.filter(tags__name=self.name).distinct().count()
class Meta:
ordering = ['name']
verbose_name = _('tag')
verbose_name_plural = verbose_name
class Links(models.Model):
"""友情链接"""
name = models.CharField(_('link name'), max_length=30, unique=True)
link = models.URLField(_('link'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(
_('is show'), default=True, blank=False, null=False)
show_type = models.CharField(
_('show type'),
max_length=1,
choices=LinkShowType.choices,
default=LinkShowType.I)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
class Meta:
ordering = ['sequence']
verbose_name = _('link')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class SideBar(models.Model):
"""侧边栏,可以展示一些html内容"""
name = models.CharField(_('title'), max_length=100)
content = models.TextField(_('content'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(_('is enable'), default=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
class Meta:
ordering = ['sequence']
verbose_name = _('sidebar')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class BlogSettings(models.Model):
"""blog的配置"""
site_name = models.CharField(
_('site name'),
max_length=200,
null=False,
blank=False,
default='')
site_description = models.TextField(
_('site description'),
max_length=1000,
null=False,
blank=False,
default='')
site_seo_description = models.TextField(
_('site seo description'), max_length=1000, null=False, blank=False, default='')
site_keywords = models.TextField(
_('site keywords'),
max_length=1000,
null=False,
blank=False,
default='')
article_sub_length = models.IntegerField(_('article sub length'), default=300)
sidebar_article_count = models.IntegerField(_('sidebar article count'), default=10)
sidebar_comment_count = models.IntegerField(_('sidebar comment count'), default=5)
article_comment_count = models.IntegerField(_('article comment count'), default=5)
show_google_adsense = models.BooleanField(_('show adsense'), default=False)
google_adsense_codes = models.TextField(
_('adsense code'), max_length=2000, null=True, blank=True, default='')
open_site_comment = models.BooleanField(_('open site comment'), default=True)
global_header = models.TextField("公共头部", null=True, blank=True, default='')
global_footer = models.TextField("公共尾部", null=True, blank=True, default='')
beian_code = models.CharField(
'备案号',
max_length=2000,
null=True,
blank=True,
default='')
analytics_code = models.TextField(
"网站统计代码",
max_length=1000,
null=False,
blank=False,
default='')
show_gongan_code = models.BooleanField(
'是否显示公安备案号', default=False, null=False)
gongan_beiancode = models.TextField(
'公安备案号',
max_length=2000,
null=True,
blank=True,
default='')
comment_need_review = models.BooleanField(
'评论是否需要审核', default=False, null=False)
class Meta:
verbose_name = _('Website configuration')
verbose_name_plural = verbose_name
def __str__(self):
return self.site_name
def clean(self):
if BlogSettings.objects.exclude(id=self.id).count():
raise ValidationError(_('There can only be one configuration'))
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
from djangoblog.utils import cache
cache.clear()
class TagSubscription(models.Model):
"""
标签订阅模型
"""
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, verbose_name='用户')
tag = models.ForeignKey('Tag', on_delete=models.CASCADE, verbose_name='标签')
created_time = models.DateTimeField(default=now, verbose_name='创建时间')
class Meta:
unique_together = ('user', 'tag') # 确保用户不能重复订阅同一标签
verbose_name = '标签订阅'
verbose_name_plural = verbose_name
def __str__(self):
return f"{self.user.username} 订阅 {self.tag.name}"
class Notification(models.Model):
"""通知模型 - 用于订阅内容更新通知"""
NOTIFICATION_TYPE_CHOICES = (
('article_published', '文章发布'),
('article_updated', '文章更新'),
('author_new_article', '作者新文章'),
)
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, verbose_name='接收用户', related_name='notifications')
notification_type = models.CharField('通知类型', max_length=20, choices=NOTIFICATION_TYPE_CHOICES, default='article_published')
title = models.CharField('通知标题', max_length=200)
content = models.TextField('通知内容', blank=True)
article = models.ForeignKey(Article, on_delete=models.CASCADE, verbose_name='相关文章', null=True, blank=True)
author = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, verbose_name='相关作者', related_name='sent_notifications', null=True, blank=True)
is_read = models.BooleanField('已读', default=False)
created_time = models.DateTimeField('创建时间', default=now)
class Meta:
ordering = ['-created_time']
verbose_name = '通知'
verbose_name_plural = verbose_name
indexes = [
models.Index(fields=['user', 'is_read']),
models.Index(fields=['created_time']),
]
def __str__(self):
return f"{self.user.username} - {self.title}"
def mark_as_read(self):
"""标记为已读"""
self.is_read = True
self.save(update_fields=['is_read'])

@ -0,0 +1,370 @@
import hashlib
import logging
import random
import urllib
from django import template
from django.conf import settings
from django.template.defaultfilters import stringfilter
from django.utils.safestring import mark_safe
from django.urls import reverse
from django.core.cache import cache
from django.utils.html import strip_tags
from django.contrib.sites.models import Site
from django.db.models import Q
from blog.models import Article, Category, Tag, LinkShowType, Links, SideBar
from oauth.models import OAuthUser
from django.templatetags.static import static
from django.shortcuts import get_object_or_404
from djangoblog.utils import get_current_site, sanitize_html, CommonMarkdown
from comments.models import Comment
register = template.Library()
logger = logging.getLogger(__name__)
@register.simple_tag
def keywords_to_str(article):
"""
将文章的标签转换为字符串
"""
tags = article.tags.all()
if tags:
return mark_safe(', '.join([tag.name for tag in tags]))
return ''
@register.simple_tag(takes_context=True)
def head_meta(context):
from djangoblog.plugin_manage import hooks
return mark_safe(hooks.apply_filters('head_meta', '', context))
@register.simple_tag
def timeformat(data):
try:
return data.strftime(settings.TIME_FORMAT)
except Exception as e:
logger.error(e)
return ""
@register.simple_tag
def datetimeformat(data):
try:
# 如果传入的是字符串则尝试解析为datetime对象
if isinstance(data, str):
from datetime import datetime
# 尝试常见的日期时间格式
try:
# 假设是ISO格式的字符串
data = datetime.fromisoformat(data.replace('Z', '+00:00'))
except ValueError:
# 如果失败则原样返回
return data
return data.strftime(settings.DATE_TIME_FORMAT)
except Exception as e:
logger.error(e)
return ""
@register.filter()
@stringfilter
def custom_markdown(content):
from djangoblog.utils import CommonMarkdown
return mark_safe(CommonMarkdown.get_markdown(content))
@register.simple_tag
def get_markdown_toc(content):
from djangoblog.utils import CommonMarkdown
body, toc = CommonMarkdown.get_markdown_with_toc(content)
return mark_safe(toc)
@register.filter()
@stringfilter
def comment_markdown(content):
content = CommonMarkdown.get_markdown(content)
return mark_safe(sanitize_html(content))
@register.filter(is_safe=True)
@stringfilter
def truncatechars_content(content):
"""
获得文章内容的摘要
:param content:
:return:
"""
from django.template.defaultfilters import truncatechars_html
from djangoblog.utils import get_blog_setting
blogsetting = get_blog_setting()
return truncatechars_html(content, blogsetting.article_sub_length)
@register.filter(is_safe=True)
@stringfilter
def truncate(content):
from django.utils.html import strip_tags
return strip_tags(content)[:150]
@register.inclusion_tag('blog/tags/breadcrumb.html')
def load_breadcrumb(article):
"""
获得文章面包屑
:param article:
:return:
"""
names = article.get_category_tree()
from djangoblog.utils import get_blog_setting
blogsetting = get_blog_setting()
site = get_current_site().domain
names.append((blogsetting.site_name, '/'))
names = names[::-1]
return {
'names': names,
'title': article.title,
'count': len(names) + 1
}
@register.inclusion_tag('blog/tags/article_tag_list.html')
def load_articletags(article):
"""
文章标签
:param article:
:return:
"""
tags = article.tags.all()
tags_list = []
for tag in tags:
url = tag.get_absolute_url()
count = tag.get_article_count()
tags_list.append((
url, count, tag, random.choice(settings.BOOTSTRAP_COLOR_TYPES)
))
return {
'article_tags_list': tags_list
}
@register.inclusion_tag('blog/tags/sidebar.html')
def load_sidebar(user, linktype):
"""
加载侧边栏
:return:
"""
value = cache.get("sidebar" + linktype)
if not value:
logger.info('load sidebar')
from djangoblog.utils import get_blog_setting
blogsetting = get_blog_setting()
recent_articles = Article.objects.filter(
status='p')[:blogsetting.sidebar_article_count]
sidebar_categorys = Category.objects.all()
extra_sidebars = SideBar.objects.filter(
is_enable=True).order_by('sequence')
most_read_articles = Article.objects.filter(status='p').order_by(
'-views')[:blogsetting.sidebar_article_count]
dates = Article.objects.datetimes('creation_time', 'month', order='DESC')
links = Links.objects.filter(is_enable=True).filter(
Q(show_type=str(linktype)) | Q(show_type=LinkShowType.A))
commment_list = Comment.objects.filter(is_enable=True).order_by(
'-id')[:blogsetting.sidebar_comment_count]
# 标签云 计算字体大小
# 根据总数计算出平均值 大小为 (数目/平均值)*步长
increment = 3 # 减小步长,让字体大小差异更小
tags = Tag.objects.all()
sidebar_tags = None
if tags and len(tags) > 0:
s = [t for t in [(t, t.get_article_count()) for t in tags] if t[1]]
count = sum([t[1] for t in s])
dd = 1 if (count == 0 or not len(tags)) else count / len(tags)
import random
sidebar_tags = list(
map(lambda x: (x[0], x[1], (x[1] / dd) * increment + 8), s)) # 基础大小从10改为8
random.shuffle(sidebar_tags)
value = {
'recent_articles': recent_articles,
'sidebar_categorys': sidebar_categorys,
'most_read_articles': most_read_articles,
'article_dates': dates,
'sidebar_comments': commment_list,
'sidabar_links': links,
'show_google_adsense': blogsetting.show_google_adsense,
'google_adsense_codes': blogsetting.google_adsense_codes,
'open_site_comment': blogsetting.open_site_comment,
'show_gongan_code': blogsetting.show_gongan_code,
'sidebar_tags': sidebar_tags,
'extra_sidebars': extra_sidebars
}
cache.set("sidebar" + linktype, value, 60 * 60 * 60 * 3)
logger.info('set sidebar cache.key:{key}'.format(key="sidebar" + linktype))
# 获取用户的标签订阅状态(每次都需要实时获取,不缓存)
tag_subscriptions = set()
if user and user.is_authenticated:
from blog.models import TagSubscription
tag_subscriptions = set(TagSubscription.objects.filter(user=user).values_list('tag_id', flat=True))
value['user'] = user
value['tag_subscriptions'] = tag_subscriptions
return value
@register.inclusion_tag('blog/tags/article_meta_info.html')
def load_article_metas(article, user):
"""
获得文章meta信息
:param article:
:return:
"""
return {
'article': article,
'user': user
}
@register.inclusion_tag('blog/tags/article_pagination.html')
def load_pagination_info(page_obj, page_type, tag_name):
previous_url = ''
next_url = ''
if page_type == '':
if page_obj.has_next():
next_number = page_obj.next_page_number()
next_url = reverse('blog:index_page', kwargs={'page': next_number})
if page_obj.has_previous():
previous_number = page_obj.previous_page_number()
previous_url = reverse(
'blog:index_page', kwargs={
'page': previous_number})
if page_type == '分类标签归档':
tag = get_object_or_404(Tag, name=tag_name)
if page_obj.has_next():
next_number = page_obj.next_page_number()
next_url = reverse(
'blog:tag_detail_page',
kwargs={
'page': next_number,
'tag_name': tag.slug})
if page_obj.has_previous():
previous_number = page_obj.previous_page_number()
previous_url = reverse(
'blog:tag_detail_page',
kwargs={
'page': previous_number,
'tag_name': tag.slug})
if page_type == '作者文章归档':
if page_obj.has_next():
next_number = page_obj.next_page_number()
next_url = reverse(
'blog:author_detail_page',
kwargs={
'page': next_number,
'author_name': tag_name})
if page_obj.has_previous():
previous_number = page_obj.previous_page_number()
previous_url = reverse(
'blog:author_detail_page',
kwargs={
'page': previous_number,
'author_name': tag_name})
if page_type == '分类目录归档':
category = get_object_or_404(Category, name=tag_name)
if page_obj.has_next():
next_number = page_obj.next_page_number()
next_url = reverse(
'blog:category_detail_page',
kwargs={
'page': next_number,
'category_name': category.slug})
if page_obj.has_previous():
previous_number = page_obj.previous_page_number()
previous_url = reverse(
'blog:category_detail_page',
kwargs={
'page': previous_number,
'category_name': category.slug})
return {
'previous_url': previous_url,
'next_url': next_url,
'page_obj': page_obj
}
@register.inclusion_tag('blog/tags/article_info.html')
def load_article_detail(article, isindex, user):
"""
加载文章详情
:param article:
:param isindex:是否列表页若是列表页只显示摘要
:return:
"""
from djangoblog.utils import get_blog_setting
blogsetting = get_blog_setting()
return {
'article': article,
'isindex': isindex,
'user': user,
'open_site_comment': blogsetting.open_site_comment,
}
# return only the URL of the gravatar
# TEMPLATE USE: {{ email|gravatar_url:150 }}
@register.filter
def gravatar_url(email, size=40):
"""获得gravatar头像"""
cachekey = 'gravatat/' + email
url = cache.get(cachekey)
if url:
return url
else:
usermodels = OAuthUser.objects.filter(email=email)
if usermodels:
o = list(filter(lambda x: x.picture is not None, usermodels))
if o:
return o[0].picture
email = email.encode('utf-8')
default = static('blog/img/avatar.png')
url = "https://www.gravatar.com/avatar/%s?%s" % (hashlib.md5(
email.lower()).hexdigest(), urllib.parse.urlencode({'d': default, 's': str(size)}))
cache.set(cachekey, url, 60 * 60 * 10)
logger.info('set gravatar cache.key:{key}'.format(key=cachekey))
return url
@register.filter
def gravatar(email, size=40):
"""获得gravatar头像"""
url = gravatar_url(email, size)
return mark_safe(
'<img src="%s" height="%d" width="%d">' %
(url, size, size))
@register.simple_tag
def query(qs, **kwargs):
""" template tag which allows queryset filtering. Usage:
{% query books author=author as mybooks %}
{% for book in mybooks %}
...
{% endfor %}
"""
return qs.filter(**kwargs)
@register.filter
def addstr(arg1, arg2):
"""concatenate arg1 & arg2"""
return str(arg1) + str(arg2)

@ -0,0 +1,74 @@
from django.urls import path
from . import views
app_name = 'blog'
urlpatterns = [
path(
r'',
views.IndexView.as_view(),
name='index'),
path(
r'page/<int:page>/',
views.IndexView.as_view(),
name='index_page'),
path(
r'article/<int:year>/<int:month>/<int:day>/<int:article_id>.html',
views.ArticleDetailView.as_view(),
name='detailbyid'),
path(
r'category/<slug:category_name>.html',
views.CategoryDetailView.as_view(),
name='category_detail'),
path(
r'category/<slug:category_name>/<int:page>.html',
views.CategoryDetailView.as_view(),
name='category_detail_page'),
path(
r'author/<author_name>.html',
views.AuthorDetailView.as_view(),
name='author_detail'),
path(
r'author/<author_name>/<int:page>.html',
views.AuthorDetailView.as_view(),
name='author_detail_page'),
path(
'archives.html',
views.ArchivesView.as_view(),
name='archives'),
path(
'links.html',
views.LinkListView.as_view(),
name='links'),
path(
r'upload',
views.fileupload,
name='upload'),
path(
r'clean',
views.clean_cache_view,
name='clean'),
path('article/<int:article_id>/like/', views.like_article, name='like_article'),
path('article/<int:article_id>/like/check/', views.check_like_status, name='check_like_status'),
# 订阅相关路由 - 必须在标签详情路由之前,避免路由冲突
path('article/<int:article_id>/subscribe/', views.subscribe_article, name='subscribe_article'),
path('article/<int:article_id>/subscribe/check/', views.check_subscribe_status, name='check_subscribe_status'),
path('author/<int:author_id>/subscribe/', views.subscribe_author, name='subscribe_author'),
path('author/<int:author_id>/subscribe/check/', views.check_subscribe_status, name='check_author_subscribe_status'),
path('tag/<int:tag_id>/subscribe/', views.subscribe_tag, name='subscribe_tag'),
path(
r'tag/<slug:tag_name>.html',
views.TagDetailView.as_view(),
name='tag_detail'),
path(
r'tag/<slug:tag_name>/<int:page>.html',
views.TagDetailView.as_view(),
name='tag_detail_page'),
path('subscription/history/', views.subscription_history, name='subscription_history'),
# 通知相关路由
path('notifications/', views.notification_list, name='notification_list'),
path('notifications/unread/count/', views.get_unread_notification_count, name='unread_notification_count'),
path('notifications/recent/', views.get_recent_notifications, name='recent_notifications'),
path('notifications/<int:notification_id>/read/', views.mark_notification_read, name='mark_notification_read'),
path('notifications/read/all/', views.mark_all_notifications_read, name='mark_all_notifications_read'),
]

@ -0,0 +1,936 @@
import json
from django.http import JsonResponse
from django.views.decorators.csrf import csrf_exempt
from django.views.decorators.http import require_http_methods
from .models import Article
# 添加检查点赞状态的接口
@require_http_methods(["GET"])
def check_like_status(request, article_id):
"""
检查用户是否已点赞文章
"""
if not request.user.is_authenticated:
return JsonResponse({'liked': False})
try:
from .models import Like
liked = Like.objects.filter(user=request.user, article_id=article_id).exists()
return JsonResponse({'liked': liked})
except Exception as e:
return JsonResponse({'liked': False, 'error': str(e)})
import logging
import os
import uuid
from django.conf import settings
from django.core.paginator import Paginator
from django.http import HttpResponse, HttpResponseForbidden
from django.shortcuts import render, get_object_or_404, redirect
from django.http import JsonResponse
from django.contrib.auth.decorators import login_required
from django.views.generic import DetailView
from .models import Article, Tag, TagSubscription, Subscription, Notification
from django.db.models import Count, Q
from django.core.paginator import Paginator
# Adding a view function to like an article
@login_required
@require_http_methods(["POST", "DELETE"])
def like_article(request, article_id):
"""
处理文章点赞和取消点赞
POST: 点赞文章
DELETE: 取消点赞
"""
try:
article = get_object_or_404(Article, id=article_id)
if request.method == "POST":
# 点赞文章
from .models import Like
# 检查是否已经点赞
if Like.objects.filter(user=request.user, article=article).exists():
return JsonResponse({
'liked': True,
'likes_count': article.likes,
'detail': '您已经点过赞了'
}, status=400)
# 创建点赞记录
like = Like(user=request.user, article=article)
like.save()
return JsonResponse({
'liked': True,
'likes_count': article.likes,
'detail': '点赞成功'
})
elif request.method == "DELETE":
# 取消点赞
from .models import Like
try:
like = Like.objects.get(user=request.user, article=article)
like.delete()
return JsonResponse({
'liked': False,
'likes_count': article.likes,
'detail': '已取消点赞'
})
except Like.DoesNotExist:
return JsonResponse({
'liked': False,
'likes_count': article.likes,
'detail': '您还没有点赞'
}, status=400)
except Exception as e:
return JsonResponse({
'liked': False,
'likes_count': article.likes if 'article' in locals() else 0,
'detail': str(e)
}, status=500)
# 订阅相关视图函数
@require_http_methods(["GET"])
def check_subscribe_status(request, article_id=None, author_id=None):
"""
检查用户是否已订阅文章或作者
"""
if not request.user.is_authenticated:
return JsonResponse({'subscribed': False})
try:
if article_id:
subscribed = Subscription.objects.filter(
user=request.user,
article_id=article_id,
subscription_type='article'
).exists()
elif author_id:
subscribed = Subscription.objects.filter(
user=request.user,
author_id=author_id,
subscription_type='author'
).exists()
else:
return JsonResponse({'subscribed': False, 'error': '缺少参数'}, status=400)
return JsonResponse({'subscribed': subscribed})
except Exception as e:
return JsonResponse({'subscribed': False, 'error': str(e)})
@login_required
@require_http_methods(["POST", "DELETE"])
def subscribe_article(request, article_id):
"""
处理文章订阅和取消订阅
POST: 订阅文章
DELETE: 取消订阅
"""
try:
article = get_object_or_404(Article, id=article_id)
if request.method == "POST":
# 订阅文章
# 检查是否已经订阅
if Subscription.objects.filter(
user=request.user,
article=article,
subscription_type='article'
).exists():
return JsonResponse({
'subscribed': True,
'subscriptions_count': article.subscriptions,
'detail': '您已经订阅过这篇文章了'
}, status=400)
# 创建订阅记录
subscription = Subscription(
user=request.user,
article=article,
subscription_type='article'
)
subscription.save()
# 清除缓存
from djangoblog.utils import cache
cache_key = f'article_subscriptions_{article.id}'
cache.delete(cache_key)
return JsonResponse({
'subscribed': True,
'subscriptions_count': article.subscriptions,
'detail': '订阅成功'
})
elif request.method == "DELETE":
# 取消订阅
try:
subscription = Subscription.objects.get(
user=request.user,
article=article,
subscription_type='article'
)
subscription.delete()
# 清除缓存
from djangoblog.utils import cache
cache_key = f'article_subscriptions_{article.id}'
cache.delete(cache_key)
return JsonResponse({
'subscribed': False,
'subscriptions_count': article.subscriptions,
'detail': '已取消订阅'
})
except Subscription.DoesNotExist:
return JsonResponse({
'subscribed': False,
'subscriptions_count': article.subscriptions,
'detail': '您还没有订阅这篇文章'
}, status=400)
except Exception as e:
return JsonResponse({
'subscribed': False,
'subscriptions_count': article.subscriptions if 'article' in locals() else 0,
'detail': str(e)
}, status=500)
@login_required
@require_http_methods(["POST", "DELETE"])
def subscribe_author(request, author_id):
"""
处理作者订阅和取消订阅
POST: 订阅作者
DELETE: 取消订阅
"""
try:
from accounts.models import BlogUser
author = get_object_or_404(BlogUser, id=author_id)
# 不能订阅自己
if request.user.id == author_id:
return JsonResponse({
'subscribed': False,
'detail': '不能订阅自己'
}, status=400)
if request.method == "POST":
# 订阅作者
# 检查是否已经订阅
if Subscription.objects.filter(
user=request.user,
author=author,
subscription_type='author'
).exists():
subscriber_count = author.get_subscriber_count()
return JsonResponse({
'subscribed': True,
'subscriber_count': subscriber_count,
'detail': '您已经订阅过这位作者了'
}, status=400)
# 创建订阅记录
subscription = Subscription(
user=request.user,
author=author,
subscription_type='author'
)
subscription.save()
subscriber_count = author.get_subscriber_count()
return JsonResponse({
'subscribed': True,
'subscriber_count': subscriber_count,
'detail': '订阅成功'
})
elif request.method == "DELETE":
# 取消订阅
try:
subscription = Subscription.objects.get(
user=request.user,
author=author,
subscription_type='author'
)
subscription.delete()
subscriber_count = author.get_subscriber_count()
return JsonResponse({
'subscribed': False,
'subscriber_count': subscriber_count,
'detail': '已取消订阅'
})
except Subscription.DoesNotExist:
subscriber_count = author.get_subscriber_count()
return JsonResponse({
'subscribed': False,
'subscriber_count': subscriber_count,
'detail': '您还没有订阅这位作者'
}, status=400)
except Exception as e:
return JsonResponse({
'subscribed': False,
'detail': str(e)
}, status=500)
@login_required
def subscription_history(request):
"""
订阅历史页面 - 显示用户订阅的文章和作者
"""
user = request.user
# 获取订阅的文章
subscribed_articles = Subscription.objects.filter(
user=user,
subscription_type='article'
).select_related('article').order_by('-created_time')
# 获取订阅的作者
subscribed_authors = Subscription.objects.filter(
user=user,
subscription_type='author'
).select_related('author').order_by('-created_time')
# 分页处理
article_paginator = Paginator(subscribed_articles, 10)
author_paginator = Paginator(subscribed_authors, 10)
article_page = request.GET.get('article_page', 1)
author_page = request.GET.get('author_page', 1)
try:
article_page_obj = article_paginator.page(article_page)
except:
article_page_obj = article_paginator.page(1)
try:
author_page_obj = author_paginator.page(author_page)
except:
author_page_obj = author_paginator.page(1)
context = {
'subscribed_articles': article_page_obj,
'subscribed_authors': author_page_obj,
'articles_count': subscribed_articles.count(),
'authors_count': subscribed_authors.count(),
}
return render(request, 'blog/subscription_history.html', context)
# 通知相关视图
@require_http_methods(["GET"])
def get_unread_notification_count(request):
"""
获取未读通知数量API
"""
if not request.user.is_authenticated:
return JsonResponse({'count': 0})
try:
count = Notification.objects.filter(user=request.user, is_read=False).count()
return JsonResponse({'count': count})
except Exception as e:
return JsonResponse({'count': 0, 'error': str(e)})
@login_required
def notification_list(request):
"""
通知列表页面
"""
notifications = Notification.objects.filter(user=request.user).order_by('-created_time')
# 分页
paginator = Paginator(notifications, 20)
page = request.GET.get('page', 1)
try:
page_obj = paginator.page(page)
except:
page_obj = paginator.page(1)
context = {
'notifications': page_obj,
'unread_count': Notification.objects.filter(user=request.user, is_read=False).count(),
}
return render(request, 'blog/notification_list.html', context)
@login_required
@require_http_methods(["POST"])
def mark_notification_read(request, notification_id):
"""
标记通知为已读
"""
try:
notification = get_object_or_404(Notification, id=notification_id, user=request.user)
notification.mark_as_read()
return JsonResponse({'success': True, 'message': '已标记为已读'})
except Exception as e:
return JsonResponse({'success': False, 'message': str(e)}, status=400)
@login_required
@require_http_methods(["POST"])
def mark_all_notifications_read(request):
"""
标记所有通知为已读
"""
try:
Notification.objects.filter(user=request.user, is_read=False).update(is_read=True)
return JsonResponse({'success': True, 'message': '已全部标记为已读'})
except Exception as e:
return JsonResponse({'success': False, 'message': str(e)}, status=400)
@login_required
@require_http_methods(["GET"])
def get_recent_notifications(request):
"""
获取最近的通知用于下拉菜单显示
"""
try:
notifications = Notification.objects.filter(user=request.user).order_by('-created_time')[:10]
notification_list = []
for notif in notifications:
notification_list.append({
'id': notif.id,
'title': notif.title,
'content': notif.content,
'type': notif.notification_type,
'is_read': notif.is_read,
'created_time': notif.created_time.strftime('%Y-%m-%d %H:%M'),
'article_url': notif.article.get_absolute_url() if notif.article else None,
})
unread_count = Notification.objects.filter(user=request.user, is_read=False).count()
return JsonResponse({
'notifications': notification_list,
'unread_count': unread_count
})
except Exception as e:
return JsonResponse({'notifications': [], 'unread_count': 0, 'error': str(e)})
# 标签订阅功能 - 按照文章订阅的逻辑实现
@login_required
@require_http_methods(["POST", "DELETE"])
def subscribe_tag(request, tag_id):
"""
处理标签订阅和取消订阅
POST: 订阅标签
DELETE: 取消订阅
"""
try:
tag = get_object_or_404(Tag, id=tag_id)
if request.method == "POST":
# 订阅标签
# 检查是否已经订阅
if TagSubscription.objects.filter(
user=request.user,
tag=tag
).exists():
subscription_count = TagSubscription.objects.filter(tag=tag).count()
return JsonResponse({
'status': 'success',
'subscribed': True,
'subscription_count': subscription_count,
'message': '您已经订阅过这个标签了'
})
# 创建订阅记录 - 直接更新数据库
subscription = TagSubscription(
user=request.user,
tag=tag
)
subscription.save()
# 清除相关缓存
from djangoblog.utils import cache
cache_key = f'tag_subscriptions_{tag.id}'
cache.delete(cache_key)
# 清除侧边栏缓存
cache.delete('sidebarL')
cache.delete('sidebarI')
subscription_count = TagSubscription.objects.filter(tag=tag).count()
return JsonResponse({
'status': 'success',
'subscribed': True,
'subscription_count': subscription_count,
'message': '订阅成功'
})
elif request.method == "DELETE":
# 取消订阅
try:
subscription = TagSubscription.objects.get(
user=request.user,
tag=tag
)
subscription.delete()
# 清除相关缓存
from djangoblog.utils import cache
cache_key = f'tag_subscriptions_{tag.id}'
cache.delete(cache_key)
# 清除侧边栏缓存
cache.delete('sidebarL')
cache.delete('sidebarI')
subscription_count = TagSubscription.objects.filter(tag=tag).count()
return JsonResponse({
'status': 'success',
'subscribed': False,
'subscription_count': subscription_count,
'message': '已取消订阅'
})
except TagSubscription.DoesNotExist:
subscription_count = TagSubscription.objects.filter(tag=tag).count()
return JsonResponse({
'status': 'success',
'subscribed': False,
'subscription_count': subscription_count,
'message': '您还没有订阅这个标签'
})
except Exception as e:
import logging
logger = logging.getLogger(__name__)
logger.error(f'标签订阅错误: {str(e)}')
return JsonResponse({
'status': 'error',
'subscribed': False,
'subscription_count': 0,
'message': f'操作失败: {str(e)}'
}, status=500)
# Modify the tag detail view to pass the subscription status
class TagDetailView(DetailView):
model = Tag
template_name = 'blog/tag_detail.html'
context_object_name = 'tag'
def get_object(self):
return get_object_or_404(Tag, slug=self.kwargs['tag_name'])
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
tag = context['tag']
# Get articles under the tag
articles = Article.objects.filter(tags=tag, status='p').order_by('-article_order', '-pub_time')
context['articles'] = articles
# Get subscription count
subscription_count = TagSubscription.objects.filter(tag=tag).count()
context['subscription_count'] = subscription_count
# Check if the current user is subscribed (if they are logged in)
if self.request.user.is_authenticated:
is_subscribed = TagSubscription.objects.filter(
user=self.request.user,
tag=tag
).exists()
context['is_subscribed'] = is_subscribed
else:
context['is_subscribed'] = False
return context
from django.templatetags.static import static
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.detail import DetailView
from django.views.generic.list import ListView
from haystack.views import SearchView
from blog.models import Article, Category, LinkShowType, Links, Tag
from comments.forms import CommentForm
from djangoblog.plugin_manage import hooks
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
from djangoblog.utils import cache, get_blog_setting, get_sha256
logger = logging.getLogger(__name__)
class ArticleListView(ListView):
# template_name属性用于指定使用哪个模板进行渲染
template_name = 'blog/article_index.html'
# context_object_name属性用于给上下文变量取名在模板中使用该名字
context_object_name = 'article_list'
# 页面类型,分类目录或标签列表等
page_type = ''
paginate_by = settings.PAGINATE_BY
page_kwarg = 'page'
link_type = LinkShowType.L
def get_view_cache_key(self):
return self.request.get['pages']
@property
def page_number(self):
page_kwarg = self.page_kwarg
page = self.kwargs.get(
page_kwarg) or self.request.GET.get(page_kwarg) or 1
return page
def get_queryset_cache_key(self):
"""
子类重写.获得queryset的缓存key
"""
raise NotImplementedError()
def get_queryset_data(self):
"""
子类重写.获取queryset的数据
"""
raise NotImplementedError()
def get_queryset_from_cache(self, cache_key):
'''
缓存页面数据
:param cache_key: 缓存key
:return:
'''
value = cache.get(cache_key)
if value:
logger.info('get view cache.key:{key}'.format(key=cache_key))
return value
else:
article_list = self.get_queryset_data()
cache.set(cache_key, article_list)
logger.info('set view cache.key:{key}'.format(key=cache_key))
return article_list
def get_queryset(self):
'''
重写默认从缓存获取数据
:return:
'''
key = self.get_queryset_cache_key()
value = self.get_queryset_from_cache(key)
return value
def get_context_data(self, **kwargs):
kwargs['linktype'] = self.link_type
return super(ArticleListView, self).get_context_data(**kwargs)
class IndexView(ArticleListView):
'''
首页
'''
# 友情链接类型
link_type = LinkShowType.I
def get_queryset_data(self):
article_list = Article.objects.filter(type='a', status='p')
return article_list
def get_queryset_cache_key(self):
cache_key = 'index_{page}'.format(page=self.page_number)
return cache_key
class ArticleDetailView(DetailView):
'''
文章详情页面
'''
template_name = 'blog/article_detail.html'
model = Article
pk_url_kwarg = 'article_id'
context_object_name = "article"
def get_context_data(self, **kwargs):
comment_form = CommentForm()
article_comments = self.object.comment_list()
parent_comments = article_comments.filter(parent_comment=None)
blog_setting = get_blog_setting()
paginator = Paginator(parent_comments, blog_setting.article_comment_count)
page = self.request.GET.get('comment_page', '1')
if not page.isnumeric():
page = 1
else:
page = int(page)
if page < 1:
page = 1
if page > paginator.num_pages:
page = paginator.num_pages
p_comments = paginator.page(page)
next_page = p_comments.next_page_number() if p_comments.has_next() else None
prev_page = p_comments.previous_page_number() if p_comments.has_previous() else None
if next_page:
kwargs[
'comment_next_page_url'] = self.object.get_absolute_url() + f'?comment_page={next_page}#commentlist-container'
if prev_page:
kwargs[
'comment_prev_page_url'] = self.object.get_absolute_url() + f'?comment_page={prev_page}#commentlist-container'
kwargs['form'] = comment_form
kwargs['article_comments'] = article_comments
kwargs['p_comments'] = p_comments
kwargs['comment_count'] = len(
article_comments) if article_comments else 0
kwargs['next_article'] = self.object.next_article
kwargs['prev_article'] = self.object.prev_article
context = super(ArticleDetailView, self).get_context_data(**kwargs)
article = self.object
# 添加订阅状态
if self.request.user.is_authenticated:
is_subscribed = Subscription.objects.filter(
user=self.request.user,
article=article,
subscription_type='article'
).exists()
context['is_subscribed'] = is_subscribed
else:
context['is_subscribed'] = False
# 添加作者订阅状态
if self.request.user.is_authenticated and self.request.user != article.author:
is_author_subscribed = Subscription.objects.filter(
user=self.request.user,
author=article.author,
subscription_type='author'
).exists()
context['is_author_subscribed'] = is_author_subscribed
else:
context['is_author_subscribed'] = False
# Action Hook, 通知插件"文章详情已获取"
hooks.run_action('after_article_body_get', article=article, request=self.request)
# # Filter Hook, 允许插件修改文章正文
article.body = hooks.apply_filters(ARTICLE_CONTENT_HOOK_NAME, article.body, article=article,
request=self.request)
return context
class CategoryDetailView(ArticleListView):
'''
分类目录列表
'''
page_type = "分类目录归档"
def get_queryset_data(self):
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
self.categoryname = categoryname
categorynames = list(
map(lambda c: c.name, category.get_sub_categorys()))
article_list = Article.objects.filter(
category__name__in=categorynames, status='p')
return article_list
def get_queryset_cache_key(self):
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
self.categoryname = categoryname
cache_key = 'category_list_{categoryname}_{page}'.format(
categoryname=categoryname, page=self.page_number)
return cache_key
def get_context_data(self, **kwargs):
categoryname = self.categoryname
try:
categoryname = categoryname.split('/')[-1]
except BaseException:
pass
kwargs['page_type'] = CategoryDetailView.page_type
kwargs['tag_name'] = categoryname
return super(CategoryDetailView, self).get_context_data(**kwargs)
class AuthorDetailView(ArticleListView):
'''
作者详情页
'''
page_type = '作者文章归档'
def get_queryset_cache_key(self):
from uuslug import slugify
author_name = slugify(self.kwargs['author_name'])
cache_key = 'author_{author_name}_{page}'.format(
author_name=author_name, page=self.page_number)
return cache_key
def get_queryset_data(self):
author_name = self.kwargs['author_name']
article_list = Article.objects.filter(
author__username=author_name, type='a', status='p')
return article_list
def get_context_data(self, **kwargs):
author_name = self.kwargs['author_name']
kwargs['page_type'] = AuthorDetailView.page_type
kwargs['tag_name'] = author_name
# 获取作者对象
from accounts.models import BlogUser
try:
author = BlogUser.objects.get(username=author_name)
kwargs['author'] = author
kwargs['subscriber_count'] = author.get_subscriber_count()
# 检查当前用户是否订阅了该作者
if self.request.user.is_authenticated and self.request.user != author:
is_author_subscribed = Subscription.objects.filter(
user=self.request.user,
author=author,
subscription_type='author'
).exists()
kwargs['is_author_subscribed'] = is_author_subscribed
else:
kwargs['is_author_subscribed'] = False
except BlogUser.DoesNotExist:
pass
return super(AuthorDetailView, self).get_context_data(**kwargs)
class ArchivesView(ArticleListView):
'''
文章归档页面
'''
page_type = '文章归档'
paginate_by = None
page_kwarg = None
template_name = 'blog/article_archives.html'
def get_queryset_data(self):
return Article.objects.filter(status='p').all()
def get_queryset_cache_key(self):
cache_key = 'archives'
return cache_key
class LinkListView(ListView):
model = Links
template_name = 'blog/links_list.html'
def get_queryset(self):
return Links.objects.filter(is_enable=True)
class EsSearchView(SearchView):
def get_context(self):
paginator, page = self.build_page()
context = {
"query": self.query,
"form": self.form,
"page": page,
"paginator": paginator,
"suggestion": None,
}
if hasattr(self.results, "query") and self.results.query.backend.include_spelling:
context["suggestion"] = self.results.query.get_spelling_suggestion()
context.update(self.extra_context())
return context
def fileupload(request):
"""
该方法需自己写调用端来上传图片该方法仅提供图床功能
:param request:
:return:
"""
if request.method == 'POST':
sign = request.GET.get('sign', None)
if not sign:
return HttpResponseForbidden()
if not sign == get_sha256(get_sha256(settings.SECRET_KEY)):
return HttpResponseForbidden()
response = []
for filename in request.FILES:
timestr = timezone.now().strftime('%Y/%m/%d')
imgextensions = ['jpg', 'png', 'jpeg', 'bmp']
fname = u''.join(str(filename))
isimage = len([i for i in imgextensions if fname.find(i) >= 0]) > 0
base_dir = os.path.join(settings.STATICFILES, "files" if not isimage else "image", timestr)
if not os.path.exists(base_dir):
os.makedirs(base_dir)
savepath = os.path.normpath(os.path.join(base_dir, f"{uuid.uuid4().hex}{os.path.splitext(filename)[-1]}"))
if not savepath.startswith(base_dir):
return HttpResponse("only for post")
with open(savepath, 'wb+') as wfile:
for chunk in request.FILES[filename].chunks():
wfile.write(chunk)
if isimage:
from PIL import Image
image = Image.open(savepath)
image.save(savepath, quality=20, optimize=True)
url = static(savepath)
response.append(url)
return HttpResponse(response)
else:
return HttpResponse("only for post")
def page_not_found_view(
request,
exception,
template_name='blog/error_page.html'):
if exception:
logger.error(exception)
url = request.get_full_path()
return render(request,
template_name,
{'message': _('Sorry, the page you requested is not found, please click the home page to see other?'),
'statuscode': '404'},
status=404)
def server_error_view(request, template_name='blog/error_page.html'):
return render(request,
template_name,
{'message': _('Sorry, the server is busy, please click the home page to see other?'),
'statuscode': '500'},
status=500)
def permission_denied_view(
request,
exception,
template_name='blog/error_page.html'):
if exception:
logger.error(exception)
return render(
request, template_name, {
'message': _('Sorry, you do not have permission to access this page?'),
'statuscode': '403'}, status=403)
def clean_cache_view(request):
cache.clear()
return HttpResponse('ok')

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save