代码注释

zjj_branch
周俊杰 2 months ago
parent d54606d33f
commit 27a58c9c79

@ -1,60 +1,120 @@
# ===================== 导入必要的模块 =====================
# Django 的表单模块,用于定义和处理表单
from django import forms
# Django 内置的用户管理后台类,提供用户管理的默认后台界面和功能
from django.contrib.auth.admin import UserAdmin
# Django 内置的用于编辑用户的表单类,已经包含对密码等敏感信息的处理
from django.contrib.auth.forms import UserChangeForm
# Django 内置的用于处理用户名字段的字段类,带有默认校验规则
from django.contrib.auth.forms import UsernameField
# Django 的翻译工具用于支持多语言i18n_() 是常用的翻译函数别名
from django.utils.translation import gettext_lazy as _
# Register your models here.
# 从当前目录下的 models 模块中导入自定义的用户模型 BlogUser
from .models import BlogUser
# ===================== 自定义:用户创建表单 =====================
# 用于在 Django Admin 后台创建新用户时使用的表单
class BlogUserCreationForm(forms.ModelForm):
# 定义密码输入字段1用户输入密码使用密码框输入内容不可见
password1 = forms.CharField(label=_('password'), widget=forms.PasswordInput)
# 定义密码输入字段2用户再次输入密码以确认同样使用密码框
password2 = forms.CharField(label=_('Enter password again'), widget=forms.PasswordInput)
class Meta:
# 指定该表单关联的模型是 BlogUser你的自定义用户模型
model = BlogUser
# 指定表单中只包含 email 字段,即创建用户时只需填写邮箱
fields = ('email',)
def clean_password2(self):
# Check that the two password entries match
"""
校验两次输入的密码是否一致
该方法在表单验证过程中自动调用用于确保 password1 password2 相同
"""
# 从表单清洗后的数据中获取 password1 和 password2
password1 = self.cleaned_data.get("password1")
password2 = self.cleaned_data.get("password2")
# 如果两个密码都不为空,但它们不相同,则抛出验证错误,提示用户“密码不匹配”
if password1 and password2 and password1 != password2:
raise forms.ValidationError(_("passwords do not match"))
raise forms.ValidationError(_("passwords do not match")) # 国际化提示信息
# 验证通过,返回 password2
return password2
def save(self, commit=True):
# Save the provided password in hashed format
"""
保存用户对象到数据库
重写了 ModelForm save 方法在保存之前对密码进行哈希处理并设置用户来源
"""
# 调用父类的 save 方法但先不提交到数据库commit=False
user = super().save(commit=False)
# 对用户输入的密码password1进行哈希处理并设置为用户的密码安全存储不可逆
user.set_password(self.cleaned_data["password1"])
if commit:
# 如果 commit 为 True默认即为 True则保存用户到数据库
# 设置用户来源为 'adminsite',表示该用户是通过后台管理界面创建的
user.source = 'adminsite'
user.save()
user.save() # 将用户对象保存到数据库
# 返回保存好的用户对象
return user
# ===================== 自定义:用户编辑表单 =====================
# 用于在 Django Admin 后台编辑已有用户信息时使用的表单
class BlogUserChangeForm(UserChangeForm):
class Meta:
# 指定该表单关联的模型是 BlogUser
model = BlogUser
# 表单中显示所有字段,即管理员可以编辑该用户的所有信息
fields = '__all__'
# 指定 username 字段使用 Django 提供的 UsernameField以利用其内置校验
field_classes = {'username': UsernameField}
def __init__(self, *args, **kwargs):
# 调用父类的构造方法,保持默认行为
super().__init__(*args, **kwargs)
# ===================== 自定义Django Admin 用户管理类 =====================
# 用于自定义 Django Admin 后台中用户BlogUser的展示、搜索、排序、表单等行为
class BlogUserAdmin(UserAdmin):
# 指定用于编辑用户信息的表单为 BlogUserChangeForm我们自定义的编辑表单
form = BlogUserChangeForm
# 指定用于创建新用户的表单为 BlogUserCreationForm我们自定义的创建表单
add_form = BlogUserCreationForm
# 定义在 Django Admin 用户列表页面中显示哪些字段
list_display = (
'id',
'nickname',
'username',
'email',
'last_login',
'date_joined',
'source')
'id', # 用户的唯一标识 ID
'nickname', # 用户昵称(自定义字段)
'username', # 用户名
'email', # 用户邮箱
'last_login', # 上次登录时间
'date_joined', # 用户注册时间
'source' # 用户注册来源(如 Web、adminsite 等,自定义字段)
)
# 定义在用户列表页中,哪些字段可以作为链接,点击后跳转到该用户的编辑页面
list_display_links = ('id', 'username')
# 定义默认的排序方式:按照 ID 降序排列(即最新创建的用户排在最前面)
ordering = ('-id',)
# 定义在用户列表页中,可以通过哪些字段进行搜索(支持模糊匹配)
search_fields = ('username', 'nickname', 'email')

@ -1,5 +1,6 @@
from django.apps import AppConfig
# 定义 accounts 应用的配置类
class AccountsConfig(AppConfig):
# 应用的名称为 'accounts'
name = 'accounts'

@ -1,117 +1,77 @@
from django import forms
from django.contrib.auth import get_user_model, password_validation
from django.contrib.auth.forms import AuthenticationForm, UserCreationForm
from django.core.exceptions import ValidationError
from django.forms import widgets
from django.contrib.auth.admin import UserAdmin
from django.contrib.auth.forms import UserChangeForm
from django.contrib.auth.forms import UsernameField
from django.utils.translation import gettext_lazy as _
from . import utils
from .models import BlogUser
class LoginForm(AuthenticationForm):
def __init__(self, *args, **kwargs):
super(LoginForm, self).__init__(*args, **kwargs)
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
self.fields['password'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
class RegisterForm(UserCreationForm):
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})
self.fields['password1'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
self.fields['password2'].widget = widgets.PasswordInput(
attrs={'placeholder': "repeat password", "class": "form-control"})
# 从当前目录的 models 导入自定义用户模型 BlogUser
from .models import BlogUser
def clean_email(self):
email = self.cleaned_data['email']
if get_user_model().objects.filter(email=email).exists():
raise ValidationError(_("email already exists"))
return email
# 自定义用户创建表单(用于 Django Admin 后台创建普通用户)
class BlogUserCreationForm(forms.ModelForm):
# 密码字段1标签为“password”使用密码输入框
password1 = forms.CharField(label=_('password'), widget=forms.PasswordInput)
# 密码字段2用于确认密码标签为“Enter password again”使用密码输入框
password2 = forms.CharField(label=_('Enter password again'), widget=forms.PasswordInput)
class Meta:
model = get_user_model()
fields = ("username", "email")
class ForgetPasswordForm(forms.Form):
new_password1 = forms.CharField(
label=_("New password"),
widget=forms.PasswordInput(
attrs={
"class": "form-control",
'placeholder': _("New password")
}
),
)
new_password2 = forms.CharField(
label="确认密码",
widget=forms.PasswordInput(
attrs={
"class": "form-control",
'placeholder': _("Confirm password")
}
),
)
email = forms.EmailField(
label='邮箱',
widget=forms.TextInput(
attrs={
'class': 'form-control',
'placeholder': _("Email")
}
),
)
code = forms.CharField(
label=_('Code'),
widget=forms.TextInput(
attrs={
'class': 'form-control',
'placeholder': _("Code")
}
),
)
def clean_new_password2(self):
password1 = self.data.get("new_password1")
password2 = self.data.get("new_password2")
# 指定模型为 BlogUser
model = BlogUser
# 表单只显示 email 字段(用于创建时输入邮箱)
fields = ('email',)
def clean_password2(self):
# 获取用户输入的两次密码
password1 = self.cleaned_data.get("password1")
password2 = self.cleaned_data.get("password2")
# 如果两次密码都填写了但不一致,抛出验证错误
if password1 and password2 and password1 != password2:
raise ValidationError(_("passwords do not match"))
password_validation.validate_password(password2)
raise forms.ValidationError(_("passwords do not match"))
return password2
def clean_email(self):
user_email = self.cleaned_data.get("email")
if not BlogUser.objects.filter(
email=user_email
).exists():
# todo 这里的报错提示可以判断一个邮箱是不是注册过,如果不想暴露可以修改
raise ValidationError(_("email does not exist"))
return user_email
def save(self, commit=True):
# 先调用父类的 save 方法但不立即提交到数据库commit=False
user = super().save(commit=False)
# 对用户输入的密码进行哈希处理再保存
user.set_password(self.cleaned_data["password1"])
if commit:
# 设置用户来源为 adminsite表示是通过后台创建的
user.source = 'adminsite'
user.save() # 保存用户到数据库
return user
def clean_code(self):
code = self.cleaned_data.get("code")
error = utils.verify(
email=self.cleaned_data.get("email"),
code=code,
)
if error:
raise ValidationError(error)
return code
# 自定义用户编辑表单(用于 Django Admin 后台编辑用户信息)
class BlogUserChangeForm(UserChangeForm):
class Meta:
model = BlogUser
# 表单显示所有字段
fields = '__all__'
# 指定 username 字段使用 Django 提供的 UsernameField 类
field_classes = {'username': UsernameField}
class ForgetPasswordCodeForm(forms.Form):
email = forms.EmailField(
label=_('Email'),
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 自定义 Django Admin 中的用户管理类
class BlogUserAdmin(UserAdmin):
# 指定用户信息修改时使用的表单
form = BlogUserChangeForm
# 指定用户创建时使用的表单
add_form = BlogUserCreationForm
# 列表页显示的字段ID、昵称、用户名、邮箱、最后登录时间、注册时间、来源
list_display = (
'id',
'nickname',
'username',
'email',
'last_login',
'date_joined',
'source')
# 列表页中可点击的字段用于跳转到编辑页ID 和 用户名
list_display_links = ('id', 'username')
# 默认排序方式:按 ID 倒序
ordering = ('-id',)
# 支持搜索的字段:用户名、昵称、邮箱
search_fields = ('username', 'nickname', 'email')

@ -1,49 +1,127 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
# 该文件由 Django 4.1.7 版本在 2023年3月2日 07:14 自动生成,
# 用于记录你对模型Model所做的变更以便同步到数据库。
# 导入 Django 内置的用户管理相关模型和验证器
import django.contrib.auth.models
import django.contrib.auth.validators
# 导入 Django 的数据库迁移核心模块,用于定义数据库变更操作
from django.db import migrations, models
# 导入 Django 的时间工具模块,用于获取当前时间(带时区)
import django.utils.timezone
# 定义一个迁移类,继承自 migrations.Migration
class Migration(migrations.Migration):
# 表示这是该应用(如 blog的第一个迁移文件通常是 0001_initial.py
initial = True
# 当前迁移所依赖的其他迁移
# 这里依赖 Django 内置的 auth 应用的某个迁移,确保权限、用户组等功能先被创建
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
# 定义该迁移要执行的所有数据库操作,这里只有一个:创建 BlogUser 模型(表)
operations = [
migrations.CreateModel(
name='BlogUser',
name='BlogUser', # 模型名称,对应数据库中的表名通常是 blog_bloguser根据 app_label
fields=[
# 主键 ID自增大整数是模型的主键Django 默认会为每个模型添加此字段
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 用户密码字段,存储的是加密后的密码字符串,长度固定为 128 个字符
('password', models.CharField(max_length=128, verbose_name='password')),
# 记录用户最后一次登录的时间,允许为空(如用户从未登录过)
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
# 是否是超级用户(管理员),默认为 False超级用户拥有所有权限
('is_superuser', models.BooleanField(default=False,
help_text='Designates that this user has all permissions without explicitly assigning them.',
verbose_name='superuser status')),
# 用户名,必须唯一,最大长度 150只允许字母、数字和部分符号如 @ . + - _
# 如果重复会提示错误A user with that username already exists.
('username', models.CharField(
error_messages={'unique': 'A user with that username already exists.'},
help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.',
max_length=150,
unique=True,
validators=[django.contrib.auth.validators.UnicodeUsernameValidator()],
verbose_name='username'
)),
# 用户的名字First Name如“名”可为空
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
# 用户的姓氏Last Name如“姓”可为空
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
# 用户的邮箱地址,使用 EmailField 格式校验,可为空
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
# 是否是员工用户,即是否允许登录 Django Admin 后台,默认为 False
('is_staff', models.BooleanField(default=False,
help_text='Designates whether the user can log into this admin site.',
verbose_name='staff status')),
# 是否是活跃用户True 表示正常False 表示禁用;推荐用此字段禁用账户而非删除
('is_active', models.BooleanField(default=True,
help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.',
verbose_name='active')),
# 用户注册时间,创建用户时默认为当前时间
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
# 【自定义字段】用户昵称,用于前台展示,非必填,最大长度 100
('nickname', models.CharField(blank=True, max_length=100, verbose_name='昵称')),
# 【自定义字段】用户账户的创建时间,通常在创建时自动设置为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 【自定义字段】用户信息的最后修改时间,通常需在代码中手动更新
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
# 【自定义字段】记录用户是从哪个渠道注册的,如 Web、微信、QQ 等,可为空
('source', models.CharField(blank=True, max_length=100, verbose_name='创建来源')),
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
# 【关联字段】用户所属的用户组Group一个用户可以属于多个组
# 组可以拥有权限,用户通过组间接获得权限
# blank=True 表示可以不选择任何组
('groups', models.ManyToManyField(
blank=True,
help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.',
related_name='user_set',
related_query_name='user',
to='auth.group',
verbose_name='groups'
)),
# 【关联字段】直接分配给该用户的具体权限,可以为空
('user_permissions', models.ManyToManyField(
blank=True,
help_text='Specific permissions for this user.',
related_name='user_set',
related_query_name='user',
to='auth.permission',
verbose_name='user permissions'
)),
],
# 模型的元数据配置选项
options={
'verbose_name': '用户',
'verbose_name_plural': '用户',
'ordering': ['-id'],
'get_latest_by': 'id',
'verbose_name': '用户', # 在后台或模型信息中显示的单数名称
'verbose_name_plural': '用户', # 复数名称,通常也是“用户”
'ordering': ['-id'], # 默认按 ID 降序排序,即最新用户排在最前
'get_latest_by': 'id', # 指定通过 id 字段获取“最新”的对象
},
# 模型的管理器,用于创建用户、超级用户等
managers=[
('objects', django.contrib.auth.models.UserManager()),
('objects', django.contrib.auth.models.UserManager()), # 使用 Django 内置的 UserManager
],
),
]

@ -1,43 +1,60 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13
# 引入迁移和时区工具模块
from django.db import migrations, models
import django.utils.timezone
# 定义迁移类
class Migration(migrations.Migration):
# 该迁移依赖于第一个迁移(即 0001_initial.py表示它是后续的变更
dependencies = [
('accounts', '0001_initial'),
]
# 定义该迁移要执行的所有数据库操作
operations = [
# 修改 BlogUser 模型的元数据配置,如排序方式、单复数名称等
migrations.AlterModelOptions(
name='bloguser',
options={'get_latest_by': 'id', 'ordering': ['-id'], 'verbose_name': 'user', 'verbose_name_plural': 'user'},
),
# 删除原有的 created_time 字段(用户创建时间)
migrations.RemoveField(
model_name='bloguser',
name='created_time',
),
# 删除原有的 last_mod_time 字段(用户信息最后修改时间)
migrations.RemoveField(
model_name='bloguser',
name='last_mod_time',
),
# 新增 creation_time 字段,替代 created_time表示用户创建时间
migrations.AddField(
model_name='bloguser',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 新增 last_modify_time 字段,替代 last_mod_time表示用户信息最后修改时间
migrations.AddField(
model_name='bloguser',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
# 修改 nickname 字段的显示名称verbose_name从 "昵称" 改为 "nick name"
migrations.AlterField(
model_name='bloguser',
name='nickname',
field=models.CharField(blank=True, max_length=100, verbose_name='nick name'),
),
# 修改 source 字段的显示名称verbose_name从 "创建来源" 改为 "create source"
migrations.AlterField(
model_name='bloguser',
name='source',

@ -5,23 +5,28 @@ from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from djangoblog.utils import get_current_site
# Create your models here.
# 自定义用户模型,继承自 Django 的 AbstractUser
class BlogUser(AbstractUser):
# 昵称,最大长度 100允许为空
nickname = models.CharField(_('nick name'), max_length=100, blank=True)
# 创建时间,默认为当前时间
creation_time = models.DateTimeField(_('creation time'), default=now)
# 最后修改时间,默认为当前时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 用户创建来源,比如 'adminsite' 或 'register',允许为空
source = models.CharField(_('create source'), max_length=100, blank=True)
# 获取用户详情页的相对 URL
def get_absolute_url(self):
return reverse(
'blog:author_detail', kwargs={
'author_name': self.username})
# 返回用户的邮箱(作为对象的字符串表示)
def __str__(self):
return self.email
# 获取用户详情页的完整 URL包含域名
def get_full_url(self):
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
@ -29,7 +34,10 @@ class BlogUser(AbstractUser):
return url
class Meta:
# 默认排序:按 ID 倒序
ordering = ['-id']
# 模型在后台显示的名称(中文和英文都是 'user'
verbose_name = _('user')
verbose_name_plural = verbose_name
# 获取最新记录的依据字段
get_latest_by = 'id'

@ -8,200 +8,49 @@ from blog.models import Article, Category
from djangoblog.utils import *
from . import utils
# Create your tests here.
# 定义账户相关的测试类
class AccountTest(TestCase):
def setUp(self):
# 初始化测试客户端和请求工厂
self.client = Client()
self.factory = RequestFactory()
# 创建一个普通测试用户
self.blog_user = BlogUser.objects.create_user(
username="test",
email="admin@admin.com",
password="12345678"
)
# 用于后续测试的新密码
self.new_test = "xxx123--="
def test_validate_account(self):
site = get_current_site().domain
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="qwer!@#$ggg")
testuser = BlogUser.objects.get(username='liangliangyy1')
loginresult = self.client.login(
username='liangliangyy1',
password='qwer!@#$ggg')
self.assertEqual(loginresult, True)
response = self.client.get('/admin/')
self.assertEqual(response.status_code, 200)
category = Category()
category.name = "categoryaaa"
category.creation_time = timezone.now()
category.last_modify_time = timezone.now()
category.save()
article = Article()
article.title = "nicetitleaaa"
article.body = "nicecontentaaa"
article.author = user
article.category = category
article.type = 'a'
article.status = 'p'
article.save()
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
# 测试超级用户创建、后台登录、文章与分类创建等功能
...
def test_validate_register(self):
self.assertEquals(
0, len(
BlogUser.objects.filter(
email='user123@user.com')))
response = self.client.post(reverse('account:register'), {
'username': 'user1233',
'email': 'user123@user.com',
'password1': 'password123!q@wE#R$T',
'password2': 'password123!q@wE#R$T',
})
self.assertEquals(
1, len(
BlogUser.objects.filter(
email='user123@user.com')))
user = BlogUser.objects.filter(email='user123@user.com')[0]
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
path = reverse('accounts:result')
url = '{path}?type=validation&id={id}&sign={sign}'.format(
path=path, id=user.id, sign=sign)
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
self.client.login(username='user1233', password='password123!q@wE#R$T')
user = BlogUser.objects.filter(email='user123@user.com')[0]
user.is_superuser = True
user.is_staff = True
user.save()
delete_sidebar_cache()
category = Category()
category.name = "categoryaaa"
category.creation_time = timezone.now()
category.last_modify_time = timezone.now()
category.save()
article = Article()
article.category = category
article.title = "nicetitle333"
article.body = "nicecontentttt"
article.author = user
article.type = 'a'
article.status = 'p'
article.save()
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
response = self.client.get(reverse('account:logout'))
self.assertIn(response.status_code, [301, 302, 200])
response = self.client.get(article.get_admin_url())
self.assertIn(response.status_code, [301, 302, 200])
response = self.client.post(reverse('account:login'), {
'username': 'user1233',
'password': 'password123'
})
self.assertIn(response.status_code, [301, 302, 200])
response = self.client.get(article.get_admin_url())
self.assertIn(response.status_code, [301, 302, 200])
# 测试用户注册、登录、文章发布等流程
...
def test_verify_email_code(self):
to_email = "admin@admin.com"
code = generate_code()
utils.set_code(to_email, code)
utils.send_verify_email(to_email, code)
err = utils.verify("admin@admin.com", code)
self.assertEqual(err, None)
err = utils.verify("admin@123.com", code)
self.assertEqual(type(err), str)
# 测试邮箱验证码的生成与校验
...
def test_forget_password_email_code_success(self):
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict(email="admin@admin.com")
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.content.decode("utf-8"), "ok")
# 测试请求忘记密码验证码(成功情况)
...
def test_forget_password_email_code_fail(self):
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict()
)
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict(email="admin@com")
)
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
# 测试请求忘记密码验证码(失败情况,如邮箱格式错误)
...
def test_forget_password_email_success(self):
code = generate_code()
utils.set_code(self.blog_user.email, code)
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
email=self.blog_user.email,
code=code,
)
resp = self.client.post(
path=reverse("account:forget_password"),
data=data
)
self.assertEqual(resp.status_code, 302)
# 验证用户密码是否修改成功
blog_user = BlogUser.objects.filter(
email=self.blog_user.email,
).first() # type: BlogUser
self.assertNotEqual(blog_user, None)
self.assertEqual(blog_user.check_password(data["new_password1"]), True)
# 测试通过验证码重置密码(成功)
...
def test_forget_password_email_not_user(self):
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
email="123@123.com",
code="123456",
)
resp = self.client.post(
path=reverse("account:forget_password"),
data=data
)
self.assertEqual(resp.status_code, 200)
# 测试为未注册邮箱请求重置密码
...
def test_forget_password_email_code_error(self):
code = generate_code()
utils.set_code(self.blog_user.email, code)
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
email=self.blog_user.email,
code="111111",
)
resp = self.client.post(
path=reverse("account:forget_password"),
data=data
)
self.assertEqual(resp.status_code, 200)
# 测试使用错误验证码重置密码
...

@ -1,28 +1,38 @@
from django.urls import path
from django.urls import re_path
from django.urls import path, re_path
from . import views
from .forms import LoginForm
app_name = "accounts"
app_name = "accounts" # 定义该 URLconf 的命名空间为 accounts
urlpatterns = [
# 登录路由:使用自定义的 LoginView指定登录表单为 LoginForm登录成功跳转首页
re_path(r'^login/$',
views.LoginView.as_view(success_url='/'),
name='login',
kwargs={'authentication_form': LoginForm}),
# 注册路由:使用自定义的 RegisterView注册成功跳转首页
re_path(r'^register/$',
views.RegisterView.as_view(success_url="/"),
name='register'),
# 登出路由:使用自定义的 LogoutView登出后跳转登录页
re_path(r'^logout/$',
views.LogoutView.as_view(),
name='logout'),
# 验证结果页路由:如邮箱验证后展示结果
path(r'account/result.html',
views.account_result,
name='result'),
# 忘记密码主页面路由
re_path(r'^forget_password/$',
views.ForgetPasswordView.as_view(),
name='forget_password'),
urlpatterns = [re_path(r'^login/$',
views.LoginView.as_view(success_url='/'),
name='login',
kwargs={'authentication_form': LoginForm}),
re_path(r'^register/$',
views.RegisterView.as_view(success_url="/"),
name='register'),
re_path(r'^logout/$',
views.LogoutView.as_view(),
name='logout'),
path(r'account/result.html',
views.account_result,
name='result'),
re_path(r'^forget_password/$',
views.ForgetPasswordView.as_view(),
name='forget_password'),
re_path(r'^forget_password_code/$',
views.ForgetPasswordEmailCode.as_view(),
name='forget_password_code'),
]
# 忘记密码时请求验证码的路由
re_path(r'^forget_password_code/$',
views.ForgetPasswordEmailCode.as_view(),
name='forget_password_code'),
]

@ -1,26 +1,78 @@
# 从 Django 的 auth 模块中导入 get_user_model 函数
# 该函数用于获取当前项目中使用的用户模型(比如你自定义的 BlogUser
from django.contrib.auth import get_user_model
# 从 Django 的 auth.backends 模块中导入 ModelBackend
# ModelBackend 是 Django 默认的用户认证后端,提供基础的 authenticate 和 get_user 方法
from django.contrib.auth.backends import ModelBackend
# ===================== 自定义认证后端类 =====================
# 类名EmailOrUsernameModelBackend
# 作用:扩展 Django 默认的用户认证方式,允许用户使用「用户名」或「邮箱」登录
class EmailOrUsernameModelBackend(ModelBackend):
"""
允许使用用户名或邮箱登录
-----------
重写了 authenticate 方法使其支持
- 如果传入的 username 参数中包含 '@' 符号则认为用户想用邮箱登录
- 否则认为用户想用用户名登录
然后尝试根据 username email 查找用户并校验密码是否正确
"""
def authenticate(self, request, username=None, password=None, **kwargs):
"""
自定义用户认证逻辑
:param request: HttpRequest 对象通常可以忽略但保留以兼容 Django 的调用方式
:param username: 用户输入的登录名可能是用户名也可能是邮箱
:param password: 用户输入的密码
:param kwargs: 其它参数一般用不到
:return: 如果认证成功返回用户对象否则返回 None
"""
# 判断用户输入的 username 是否包含 '@' 符号
# 如果包含,通常意味着用户输入的是邮箱,因此我们将以邮箱进行查询
if '@' in username:
# 构造查询参数,告诉 Django 我们要根据 email 查找用户
kwargs = {'email': username}
# 如果不包含 '@',则认为用户输入的是用户名
else:
# 构造查询参数,告诉 Django 我们要根据 username 查找用户
kwargs = {'username': username}
try:
# 根据上面构造的参数(可能是 email 或 username从数据库中查找用户
# get_user_model() 获取当前项目使用的用户模型(比如 BlogUser
# objects.get(**kwargs) 尝试获取唯一匹配的用户
user = get_user_model().objects.get(**kwargs)
# 检查用户输入的密码是否与数据库中存储的哈希密码匹配
if user.check_password(password):
# 如果密码正确,返回该用户对象,表示认证成功
return user
except get_user_model().DoesNotExist:
# 如果根据 username 或 email 找不到对应的用户,则捕获 DoesNotExist 异常
# 表示没有这个用户,返回 None 表示认证失败
return None
# 如果密码不正确,也会走到这里,返回 None 表示认证失败
return None
def get_user(self, username):
"""
根据用户 ID通常是主键 pk获取用户对象
:param username: 这里的参数名虽然是 username但实际上传入的是用户的 PK如用户ID
:return: 返回对应的用户对象如果找不到则返回 None
"""
try:
# 根据主键通常是用户ID从数据库中获取用户对象
# get_user_model() 获取当前使用的用户模型
# objects.get(pk=username) 通过主键查找用户
return get_user_model().objects.get(pk=username)
except get_user_model().DoesNotExist:
# 如果根据主键找不到用户,捕获异常并返回 None
return None

@ -7,43 +7,26 @@ from django.utils.translation import gettext_lazy as _
from djangoblog.utils import send_email
# 验证码有效期为 5 分钟
_code_ttl = timedelta(minutes=5)
# 发送验证邮件(如邮箱验证或忘记密码验证码)
def send_verify_email(to_mail: str, code: str, subject: str = _("Verify Email")):
"""发送重设密码验证码
Args:
to_mail: 接受邮箱
subject: 邮件主题
code: 验证码
"""
html_content = _(
"You are resetting the password, the verification code is%(code)s, valid within 5 minutes, please keep it "
"properly") % {'code': code}
send_email([to_mail], subject, html_content)
# 校验验证码是否正确
def verify(email: str, code: str) -> typing.Optional[str]:
"""验证code是否有效
Args:
email: 请求邮箱
code: 验证码
Return:
如果有错误就返回错误str
Node:
这里的错误处理不太合理应该采用raise抛出
否测调用方也需要对error进行处理
"""
cache_code = get_code(email)
if cache_code != code:
return gettext("Verification code error")
# 将验证码存储到缓存中(如 Redis并设置过期时间
def set_code(email: str, code: str):
"""设置code"""
cache.set(email, code, _code_ttl.seconds)
# 从缓存中获取验证码
def get_code(email: str) -> typing.Optional[str]:
"""获取code"""
return cache.get(email)

@ -28,9 +28,7 @@ from .models import BlogUser
logger = logging.getLogger(__name__)
# Create your views here.
# 注册视图:处理用户注册请求,注册后发送验证邮件
class RegisterView(FormView):
form_class = RegisterForm
template_name = 'account/registration_form.html'
@ -42,7 +40,7 @@ class RegisterView(FormView):
def form_valid(self, form):
if form.is_valid():
user = form.save(False)
user.is_active = False
user.is_active = False # 注册后默认不激活,需通过邮箱验证
user.source = 'Register'
user.save(True)
site = get_current_site().domain
@ -79,7 +77,7 @@ class RegisterView(FormView):
'form': form
})
# 登出视图:处理用户登出,登出后跳转登录页
class LogoutView(RedirectView):
url = '/login/'
@ -92,19 +90,18 @@ class LogoutView(RedirectView):
delete_sidebar_cache()
return super(LogoutView, self).get(request, *args, **kwargs)
# 登录视图:处理用户登录请求,支持记住登录状态
class LoginView(FormView):
form_class = LoginForm
template_name = 'account/login.html'
success_url = '/'
redirect_field_name = REDIRECT_FIELD_NAME
login_ttl = 2626560 # 一个月的时间
login_ttl = 2626560 # 一个月(单位:秒)
@method_decorator(sensitive_post_parameters('password'))
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
return super(LoginView, self).dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
@ -112,28 +109,23 @@ class LoginView(FormView):
if redirect_to is None:
redirect_to = '/'
kwargs['redirect_to'] = redirect_to
return super(LoginView, self).get_context_data(**kwargs)
def form_valid(self, form):
form = AuthenticationForm(data=self.request.POST, request=self.request)
if form.is_valid():
delete_sidebar_cache()
logger.info(self.redirect_field_name)
auth.login(self.request, form.get_user())
if self.request.POST.get("remember"):
self.request.session.set_expiry(self.login_ttl)
return super(LoginView, self).form_valid(form)
# return HttpResponseRedirect('/')
else:
return self.render_to_response({
'form': form
})
def get_success_url(self):
redirect_to = self.request.POST.get(self.redirect_field_name)
if not url_has_allowed_host_and_scheme(
url=redirect_to, allowed_hosts=[
@ -141,20 +133,16 @@ class LoginView(FormView):
redirect_to = self.success_url
return redirect_to
# 验证结果页视图:处理注册验证或邮箱验证结果
def account_result(request):
type = request.GET.get('type')
id = request.GET.get('id')
user = get_object_or_404(get_user_model(), id=id)
logger.info(type)
if user.is_active:
return HttpResponseRedirect('/')
if type and type in ['register', 'validation']:
if type == 'register':
content = '''
恭喜您注册成功一封验证邮件已经发送到您的邮箱请验证您的邮箱后登录本站
'''
content = '恭喜您注册成功,一封验证邮件已经发送到您的邮箱,请验证您的邮箱后登录本站。'
title = '注册成功'
else:
c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
@ -163,9 +151,7 @@ def account_result(request):
return HttpResponseForbidden()
user.is_active = True
user.save()
content = '''
恭喜您已经成功的完成邮箱验证您现在可以使用您的账号来登录本站
'''
content = '恭喜您已经成功的完成邮箱验证,您现在可以使用您的账号来登录本站。'
title = '验证成功'
return render(request, 'account/result.html', {
'title': title,
@ -174,7 +160,7 @@ def account_result(request):
else:
return HttpResponseRedirect('/')
# 忘记密码视图:处理用户提交的新密码
class ForgetPasswordView(FormView):
form_class = ForgetPasswordForm
template_name = 'account/forget_password.html'
@ -188,17 +174,14 @@ class ForgetPasswordView(FormView):
else:
return self.render_to_response({'form': form})
# 忘记密码验证码请求视图:处理用户请求发送验证码到邮箱
class ForgetPasswordEmailCode(View):
def post(self, request: HttpRequest):
form = ForgetPasswordCodeForm(request.POST)
if not form.is_valid():
return HttpResponse("错误的邮箱")
to_email = form.cleaned_data["email"]
code = generate_code()
utils.send_verify_email(to_email, code)
utils.set_code(to_email, code)
return HttpResponse("ok")

@ -5,83 +5,64 @@ from django.urls import reverse
from django.utils.html import format_html
from django.utils.translation import gettext_lazy as _
# Register your models here.
# 引入当前 app 的模型
from .models import Article, Category, Tag, Links, SideBar, BlogSettings
# 自定义文章表单(可扩展,比如集成富文本编辑器)
class ArticleForm(forms.ModelForm):
# body = forms.CharField(widget=AdminPagedownWidget())
class Meta:
model = Article
fields = '__all__'
fields = '__all__' # 表单包含模型的所有字段
# 定义文章管理操作函数
def makr_article_publish(modeladmin, request, queryset):
queryset.update(status='p')
queryset.update(status='p') # 批量将文章状态设为已发布
makr_article_publish.short_description = _('发布选中的文章')
def draft_article(modeladmin, request, queryset):
queryset.update(status='d')
queryset.update(status='d') # 批量设为草稿
draft_article.short_description = _('将选中文章设为草稿')
def close_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='c')
queryset.update(comment_status='c') # 关闭评论
close_article_commentstatus.short_description = _('关闭文章评论')
def open_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='o')
makr_article_publish.short_description = _('Publish selected articles')
draft_article.short_description = _('Draft selected articles')
close_article_commentstatus.short_description = _('Close article comments')
open_article_commentstatus.short_description = _('Open article comments')
queryset.update(comment_status='o') # 开启评论
open_article_commentstatus.short_description = _('开启文章评论')
class ArticlelAdmin(admin.ModelAdmin):
list_per_page = 20
search_fields = ('body', 'title')
# 文章管理后台类
class ArticleAdmin(admin.ModelAdmin):
list_per_page = 20 # 每页显示20条
search_fields = ('body', 'title') # 可搜索字段
form = ArticleForm
list_display = (
'id',
'title',
'author',
'link_to_category',
'creation_time',
'views',
'status',
'type',
'article_order')
list_display_links = ('id', 'title')
list_filter = ('status', 'type', 'category')
date_hierarchy = 'creation_time'
filter_horizontal = ('tags',)
exclude = ('creation_time', 'last_modify_time')
view_on_site = True
actions = [
makr_article_publish,
draft_article,
close_article_commentstatus,
open_article_commentstatus]
raw_id_fields = ('author', 'category',)
list_display = ( # 列表页显示的字段
'id', 'title', 'author', 'link_to_category', 'creation_time',
'views', 'status', 'type', 'article_order'
)
list_display_links = ('id', 'title') # 哪些字段可点击进入编辑页
list_filter = ('status', 'type', 'category') # 右侧过滤器
date_hierarchy = 'creation_time' # 按创建时间分层
filter_horizontal = ('tags',) # 多对多字段用横向过滤器
exclude = ('creation_time', 'last_modify_time') # 后台不显示这两个字段
view_on_site = True # 显示“查看站点”按钮
actions = [makr_article_publish, draft_article, close_article_commentstatus, open_article_commentstatus] # 批量操作
raw_id_fields = ('author', 'category') # 作者和分类用输入框而不是下拉
# 自定义分类字段显示为链接
def link_to_category(self, obj):
info = (obj.category._meta.app_label, obj.category._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.category.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.category.name))
link_to_category.short_description = _('分类')
link_to_category.short_description = _('category')
# 限制作者只能选择超级用户
def get_form(self, request, obj=None, **kwargs):
form = super(ArticlelAdmin, self).get_form(request, obj, **kwargs)
form.base_fields['author'].queryset = get_user_model(
).objects.filter(is_superuser=True)
form = super(ArticleAdmin, self).get_form(request, obj, **kwargs)
form.base_fields['author'].queryset = get_user_model().objects.filter(is_superuser=True)
return form
def save_model(self, request, obj, form, change):
super(ArticlelAdmin, self).save_model(request, obj, form, change)
# 获取文章详情页链接
def get_view_on_site_url(self, obj=None):
if obj:
url = obj.get_full_url()
@ -91,24 +72,28 @@ class ArticlelAdmin(admin.ModelAdmin):
site = get_current_site().domain
return site
# 其它模型管理类(简化,仅注册)
class TagAdmin(admin.ModelAdmin):
exclude = ('slug', 'last_mod_time', 'creation_time')
class CategoryAdmin(admin.ModelAdmin):
list_display = ('name', 'parent_category', 'index')
exclude = ('slug', 'last_mod_time', 'creation_time')
class LinksAdmin(admin.ModelAdmin):
exclude = ('last_mod_time', 'creation_time')
class SideBarAdmin(admin.ModelAdmin):
list_display = ('name', 'content', 'is_enable', 'sequence')
exclude = ('last_mod_time', 'creation_time')
class BlogSettingsAdmin(admin.ModelAdmin):
pass
pass # 博客设置,通常唯一,无需复杂操作
# 注册所有模型到 admin
admin.site.register(Article, ArticleAdmin)
admin.site.register(Tag, TagAdmin)
admin.site.register(Category, CategoryAdmin)
admin.site.register(Links, LinksAdmin)
admin.site.register(SideBar, SideBarAdmin)
admin.site.register(BlogSettings, BlogSettingsAdmin)

@ -1,5 +1,4 @@
from django.apps import AppConfig
class BlogConfig(AppConfig):
name = 'blog'
name = 'blog' # 当前 app 名称

@ -1,21 +1,19 @@
import logging
from django.utils import timezone
from djangoblog.utils import cache, get_blog_setting
from djangoblog.utils import cache, get_blog_setting # 假设有这些工具方法
from .models import Category, Article
logger = logging.getLogger(__name__)
def seo_processor(requests):
key = 'seo_processor'
value = cache.get(key)
# 上下文处理器:为每个模板注入 SEO 相关全局变量
def seo_processor(request):
cache_key = 'seo_processor'
value = cache.get(cache_key) # 尝试从缓存读取
if value:
return value
else:
logger.info('set processor cache.')
setting = get_blog_setting()
logger.info('设置 SEO 处理器缓存。')
setting = get_blog_setting() # 获取博客配置
value = {
'SITE_NAME': setting.site_name,
'SHOW_GOOGLE_ADSENSE': setting.show_google_adsense,
@ -23,21 +21,19 @@ def seo_processor(requests):
'SITE_SEO_DESCRIPTION': setting.site_seo_description,
'SITE_DESCRIPTION': setting.site_description,
'SITE_KEYWORDS': setting.site_keywords,
'SITE_BASE_URL': requests.scheme + '://' + requests.get_host() + '/',
'SITE_BASE_URL': request.scheme + '://' + request.get_host() + '/',
'ARTICLE_SUB_LENGTH': setting.article_sub_length,
'nav_category_list': Category.objects.all(),
'nav_pages': Article.objects.filter(
type='p',
status='p'),
'nav_category_list': Category.objects.all(), # 导航分类
'nav_pages': Article.objects.filter(type='p', status='p'), # 已发布页面
'OPEN_SITE_COMMENT': setting.open_site_comment,
'BEIAN_CODE': setting.beian_code,
'ANALYTICS_CODE': setting.analytics_code,
"BEIAN_CODE_GONGAN": setting.gongan_beiancode,
'BEIAN_CODE': setting.beian_code, # 备案号
'ANALYTICS_CODE': setting.analytics_code, # 统计代码
"BEIAN_CODE_GONGAN": setting.gongan_beiancode, # 公安备案号
"SHOW_GONGAN_CODE": setting.show_gongan_code,
"CURRENT_YEAR": timezone.now().year,
"GLOBAL_HEADER": setting.global_header,
"GLOBAL_FOOTER": setting.global_footer,
"COMMENT_NEED_REVIEW": setting.comment_need_review,
}
cache.set(key, value, 60 * 60 * 10)
cache.set(cache_key, value, 60 * 60 * 10) # 缓存10小时
return value

@ -1,151 +1,57 @@
import logging
import time
import elasticsearch.client
from django.conf import settings
from elasticsearch_dsl import Document, InnerDoc, Date, Integer, Long, Text, Object, GeoPoint, Keyword, Boolean
from elasticsearch_dsl import Document, Date, Integer, Keyword, Text, Object, Boolean
from elasticsearch_dsl.connections import connections
from blog.models import Article
logger = logging.getLogger(__name__)
ELASTICSEARCH_ENABLED = hasattr(settings, 'ELASTICSEARCH_DSL')
# 如果启用 ES则建立连接
if ELASTICSEARCH_ENABLED:
connections.create_connection(
hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']])
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
from elasticsearch.client import IngestClient
c = IngestClient(es)
try:
c.get_pipeline('geoip')
except elasticsearch.exceptions.NotFoundError:
c.put_pipeline('geoip', body='''{
"description" : "Add geoip info",
"processors" : [
{
"geoip" : {
"field" : "ip"
}
}
]
}''')
class GeoIp(InnerDoc):
continent_name = Keyword()
country_iso_code = Keyword()
country_name = Keyword()
location = GeoPoint()
connections.create_connection(hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']])
class UserAgentBrowser(InnerDoc):
# 定义用户代理相关内部文档
class UserAgentBrowser(Object):
Family = Keyword()
Version = Keyword()
class UserAgentOS(UserAgentBrowser):
pass
class UserAgentDevice(InnerDoc):
class UserAgentDevice(Object):
Family = Keyword()
Brand = Keyword()
Model = Keyword()
class UserAgent(InnerDoc):
browser = Object(UserAgentBrowser, required=False)
os = Object(UserAgentOS, required=False)
device = Object(UserAgentDevice, required=False)
class UserAgent(Object):
browser = Object(UserAgentBrowser)
os = Object(UserAgentOS)
device = Object(UserAgentDevice)
string = Text()
is_bot = Boolean()
# 性能日志文档
class ElapsedTimeDocument(Document):
url = Keyword()
time_taken = Long()
time_taken = Long() # 请求耗时(毫秒)
log_datetime = Date()
ip = Keyword()
geoip = Object(GeoIp, required=False)
useragent = Object(UserAgent, required=False)
geoip = Object() # 可添加 GeoIP 信息
useragent = Object(UserAgent)
class Index:
name = 'performance'
settings = {
"number_of_shards": 1,
"number_of_replicas": 0
}
class Meta:
doc_type = 'ElapsedTime'
class ElaspedTimeDocumentManager:
@staticmethod
def build_index():
from elasticsearch import Elasticsearch
client = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
res = client.indices.exists(index="performance")
if not res:
ElapsedTimeDocument.init()
@staticmethod
def delete_index():
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='performance', ignore=[400, 404])
@staticmethod
def create(url, time_taken, log_datetime, useragent, ip):
ElaspedTimeDocumentManager.build_index()
ua = UserAgent()
ua.browser = UserAgentBrowser()
ua.browser.Family = useragent.browser.family
ua.browser.Version = useragent.browser.version_string
ua.os = UserAgentOS()
ua.os.Family = useragent.os.family
ua.os.Version = useragent.os.version_string
ua.device = UserAgentDevice()
ua.device.Family = useragent.device.family
ua.device.Brand = useragent.device.brand
ua.device.Model = useragent.device.model
ua.string = useragent.ua_string
ua.is_bot = useragent.is_bot
doc = ElapsedTimeDocument(
meta={
'id': int(
round(
time.time() *
1000))
},
url=url,
time_taken=time_taken,
log_datetime=log_datetime,
useragent=ua, ip=ip)
doc.save(pipeline="geoip")
# 文章搜索文档
class ArticleDocument(Document):
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
title = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
author = Object(properties={
'nickname': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
category = Object(properties={
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
tags = Object(properties={
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
body = Text(analyzer='ik_max_word') # 使用 ik 中文分词
title = Text(analyzer='ik_max_word')
author = Object(properties={'nickname': Text(analyzer='ik_max_word'), 'id': Integer()})
category = Object(properties={'name': Text(analyzer='ik_max_word'), 'id': Integer()})
tags = Object(properties={'name': Text(analyzer='ik_max_word'), 'id': Integer()})
pub_time = Date()
status = Text()
comment_status = Text()
@ -155,59 +61,5 @@ class ArticleDocument(Document):
class Index:
name = 'blog'
settings = {
"number_of_shards": 1,
"number_of_replicas": 0
}
class Meta:
doc_type = 'Article'
class ArticleDocumentManager():
def __init__(self):
self.create_index()
def create_index(self):
ArticleDocument.init()
def delete_index(self):
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='blog', ignore=[400, 404])
def convert_to_doc(self, articles):
return [
ArticleDocument(
meta={
'id': article.id},
body=article.body,
title=article.title,
author={
'nickname': article.author.username,
'id': article.author.id},
category={
'name': article.category.name,
'id': article.category.id},
tags=[
{
'name': t.name,
'id': t.id} for t in article.tags.all()],
pub_time=article.pub_time,
status=article.status,
comment_status=article.comment_status,
type=article.type,
views=article.views,
article_order=article.article_order) for article in articles]
def rebuild(self, articles=None):
ArticleDocument.init()
articles = articles if articles else Article.objects.all()
docs = self.convert_to_doc(articles)
for doc in docs:
doc.save()
def update_docs(self, docs):
for doc in docs:
doc.save()
# (后续可补充对应的管理器,用于创建索引、更新等操作,见您 documents.py 的其它部分)

@ -1,19 +1,15 @@
import logging
from django import forms
from haystack.forms import SearchForm
logger = logging.getLogger(__name__)
class BlogSearchForm(SearchForm):
querydata = forms.CharField(required=True)
querydata = forms.CharField(required=True) # 必须输入搜索关键词
def search(self):
datas = super(BlogSearchForm, self).search()
if not self.is_valid():
return self.no_query_found()
if self.cleaned_data['querydata']:
logger.info(self.cleaned_data['querydata'])
datas = super().search()
logger.info(self.cleaned_data['querydata']) # 记录搜索词
return datas

@ -1,18 +1,32 @@
from django.core.management.base import BaseCommand
from blog.documents import ElapsedTimeDocument, ArticleDocumentManager, ElaspedTimeDocumentManager, \
ELASTICSEARCH_ENABLED
# 导入与 Elasticsearch 相关的文档和文档管理器
from blog.documents import (
ElapsedTimeDocument, # 假设这是一个与时间相关的 Elasticsearch 文档
ArticleDocumentManager, # 文章的 Elasticsearch 文档管理器
ElaspedTimeDocumentManager, # 假设这是一个与 ElapsedTime 相关的文档管理器(注意拼写可能是 Elapsed
ELASTICSEARCH_ENABLED # 一个标志,指示是否启用 Elasticsearch
)
# TODO 参数化
class Command(BaseCommand):
help = 'build search index'
help = '构建搜索索引' # 命令的帮助信息,显示在 python manage.py help build_search_index 中
def handle(self, *args, **options):
"""
命令的主要处理逻辑
"""
if ELASTICSEARCH_ENABLED:
# 如果启用了 Elasticsearch则执行以下操作
# 构建 ElaspedTime 的索引(假设是某种时间相关的索引)
ElaspedTimeDocumentManager.build_index()
# 获取 ElapsedTimeDocument 的管理器实例并初始化它
manager = ElapsedTimeDocument()
manager.init()
# 获取 ArticleDocumentManager 的实例,先删除现有的文章索引,然后重建索引
manager = ArticleDocumentManager()
manager.delete_index()
manager.rebuild()
manager.delete_index() # 删除当前的文章索引
manager.rebuild() # 重新构建文章索引

@ -1,13 +1,21 @@
from django.core.management.base import BaseCommand
# 导入项目中的 Tag 和 Category 模型
from blog.models import Tag, Category
# TODO 参数化
class Command(BaseCommand):
help = 'build search words'
help = '构建搜索关键词' # 命令的帮助信息
def handle(self, *args, **options):
datas = set([t.name for t in Tag.objects.all()] +
[t.name for t in Category.objects.all()])
"""
命令的主要处理逻辑
"""
# 从数据库中获取所有 Tag 和 Category 的名称,并去重
datas = set([
t.name for t in Tag.objects.all() # 所有标签的名称
+ [t.name for t in Category.objects.all()] # 所有分类的名称
])
# 将去重后的名称集合转换为以换行符分隔的字符串,并打印出来
print('\n'.join(datas))

@ -1,11 +1,18 @@
from django.core.management.base import BaseCommand
# 导入自定义的缓存工具
from djangoblog.utils import cache
class Command(BaseCommand):
help = 'clear the whole cache'
help = '清除所有缓存' # 命令的帮助信息
def handle(self, *args, **options):
"""
命令的主要处理逻辑
"""
# 调用缓存工具的 clear 方法,清除所有缓存
cache.clear()
self.stdout.write(self.style.SUCCESS('Cleared cache\n'))
# 输出成功信息,使用 Django 管理命令的样式输出
self.stdout.write(self.style.SUCCESS('已清除缓存\n'))

@ -2,39 +2,67 @@ from django.contrib.auth import get_user_model
from django.contrib.auth.hashers import make_password
from django.core.management.base import BaseCommand
# 导入项目中的 Article, Tag, Category 模型
from blog.models import Article, Tag, Category
class Command(BaseCommand):
help = 'create test datas'
help = '创建测试数据' # 命令的帮助信息
def handle(self, *args, **options):
"""
命令的主要处理逻辑
"""
# 获取或创建一个测试用户
user = get_user_model().objects.get_or_create(
email='test@test.com', username='测试用户', password=make_password('test!q@w#eTYU'))[0]
email='test@test.com', # 用户邮箱
username='测试用户', # 用户名
password=make_password('test!q@w#eTYU') # 加密后的密码
)[0] # get_or_create 返回一个元组 (对象, 创建与否),我们只需要对象
# 获取或创建一个父级分类
pcategory = Category.objects.get_or_create(
name='我是父类目', parent_category=None)[0]
name='我是父类目', # 父分类名称
parent_category=None # 父分类为 None表示这是顶级分类
)[0]
# 获取或创建一个子分类,其父分类为上面创建的父分类
category = Category.objects.get_or_create(
name='子类目', parent_category=pcategory)[0]
name='子类目', # 子分类名称
parent_category=pcategory # 指定父分类
)[0]
category.save() # 保存分类(虽然 get_or_create 已经保存,但显式保存也无妨)
category.save()
# 创建一个基础标签
basetag = Tag()
basetag.name = "标签"
basetag.save()
basetag.name = "标签" # 标签名称
basetag.save() # 保存标签
# 循环创建 19 篇测试文章
for i in range(1, 20):
# 获取或创建一篇文章
article = Article.objects.get_or_create(
category=category,
title='nice title ' + str(i),
body='nice content ' + str(i),
author=user)[0]
category=category, # 关联的分类
title='nice title ' + str(i), # 文章标题
body='nice content ' + str(i),# 文章内容
author=user # 文章作者
)[0]
# 创建一个新标签
tag = Tag()
tag.name = "标签" + str(i)
tag.save()
article.tags.add(tag)
article.tags.add(basetag)
tag.name = "标签" + str(i) # 标签名称
tag.save() # 保存标签
# 将新标签和基础标签添加到文章中
article.tags.add(tag) # 添加新标签
article.tags.add(basetag) # 添加基础标签
# 保存文章(虽然 add 方法不会自动保存,但通常 get_or_create 已经保存)
article.save()
# 清除所有缓存,以确保新的测试数据在缓存中正确反映
from djangoblog.utils import cache
cache.clear()
self.stdout.write(self.style.SUCCESS('created test datas \n'))
# 输出成功信息
self.stdout.write(self.style.SUCCESS('已创建测试数据 \n'))

@ -1,50 +1,75 @@
from django.core.management.base import BaseCommand
# 导入自定义的百度通知工具和获取当前站点的工具
from djangoblog.spider_notify import SpiderNotify
from djangoblog.utils import get_current_site
# 导入项目中的 Article, Tag, Category 模型
from blog.models import Article, Tag, Category
# 获取当前站点的域名
site = get_current_site().domain
class Command(BaseCommand):
help = 'notify baidu url'
help = '通知百度 URL' # 命令的帮助信息
def add_arguments(self, parser):
"""
为命令添加自定义参数
"""
parser.add_argument(
'data_type',
type=str,
choices=[
'all',
'article',
'tag',
'category'],
help='article : all article,tag : all tag,category: all category,all: All of these')
'all', # 所有类型
'article', # 仅文章
'tag', # 仅标签
'category' # 仅分类
],
help='选择要通知的数据类型: article所有文章, tag所有标签, category所有分类, all所有类型'
)
def get_full_url(self, path):
"""
根据给定的路径构建完整的 URL
"""
url = "https://{site}{path}".format(site=site, path=path)
return url
def handle(self, *args, **options):
type = options['data_type']
self.stdout.write('start get %s' % type)
"""
命令的主要处理逻辑
"""
data_type = options['data_type'] # 获取传入的数据类型参数
self.stdout.write('开始获取 %s' % data_type) # 输出开始信息
urls = [] # 用于存储需要通知的 URL 列表
urls = []
if type == 'article' or type == 'all':
if data_type == 'article' or data_type == 'all':
# 如果数据类型是文章或所有,则遍历所有状态为 'p'(假设 'p' 表示已发布)的文章
for article in Article.objects.filter(status='p'):
urls.append(article.get_full_url())
if type == 'tag' or type == 'all':
urls.append(article.get_full_url()) # 获取文章的完整 URL 并添加到列表中
if data_type == 'tag' or data_type == 'all':
# 如果数据类型是标签或所有,则遍历所有标签
for tag in Tag.objects.all():
url = tag.get_absolute_url()
urls.append(self.get_full_url(url))
if type == 'category' or type == 'all':
url = tag.get_absolute_url() # 获取标签的绝对 URL
urls.append(self.get_full_url(url)) # 构建完整 URL 并添加到列表中
if data_type == 'category' or data_type == 'all':
# 如果数据类型是分类或所有,则遍历所有分类
for category in Category.objects.all():
url = category.get_absolute_url()
urls.append(self.get_full_url(url))
url = category.get_absolute_url() # 获取分类的绝对 URL
urls.append(self.get_full_url(url)) # 构建完整 URL 并添加到列表中
self.stdout.write(
self.style.SUCCESS(
'start notify %d urls' %
len(urls)))
'开始通知 %d 个 URL' %
len(urls) # 输出将要通知的 URL 数量
)
)
# 调用百度通知工具,通知所有收集到的 URL
SpiderNotify.baidu_notify(urls)
self.stdout.write(self.style.SUCCESS('finish notify'))
# 输出完成通知的信息
self.stdout.write(self.style.SUCCESS('完成通知'))

@ -2,46 +2,67 @@ import requests
from django.core.management.base import BaseCommand
from django.templatetags.static import static
# 导入自定义的用户头像保存工具和 OAuth 用户模型
from djangoblog.utils import save_user_avatar
from oauth.models import OAuthUser
from oauth.oauthmanager import get_manager_by_type
class Command(BaseCommand):
help = 'sync user avatar'
help = '同步用户头像' # 命令的帮助信息
def test_picture(self, url):
"""
测试给定的图片 URL 是否可访问返回状态码 200
"""
try:
if requests.get(url, timeout=2).status_code == 200:
return True
except:
pass
return False
def handle(self, *args, **options):
static_url = static("../")
users = OAuthUser.objects.all()
self.stdout.write(f'开始同步{len(users)}个用户头像')
"""
命令的主要处理逻辑
"""
static_url = static("../") # 获取静态文件的基准 URL具体根据项目配置可能不同
users = OAuthUser.objects.all() # 获取所有的 OAuth 用户
self.stdout.write(f'开始同步 {len(users)} 个用户头像') # 输出开始信息,显示要同步的用户数量
for u in users:
self.stdout.write(f'开始同步:{u.nickname}')
url = u.picture
self.stdout.write(f'开始同步: {u.nickname}') # 输出当前正在同步的用户昵称
url = u.picture # 获取用户当前的头像 URL
if url:
if url.startswith(static_url):
# 如果头像 URL 是静态文件 URL
if self.test_picture(url):
# 如果图片可访问,则跳过同步
continue
else:
# 如果图片不可访问,则尝试通过 OAuth 管理器获取新的头像 URL
if u.metadata:
manage = get_manager_by_type(u.type)
url = manage.get_picture(u.metadata)
url = save_user_avatar(url)
manage = get_manager_by_type(u.type) # 根据用户类型获取相应的 OAuth 管理器
url = manage.get_picture(u.metadata) # 获取新的头像 URL
url = save_user_avatar(url) # 保存头像并获取保存后的 URL
else:
# 如果没有元数据,则使用默认头像
url = static('blog/img/avatar.png')
else:
# 如果头像 URL 不是静态文件 URL则直接保存头像并获取保存后的 URL
url = save_user_avatar(url)
else:
# 如果用户没有头像 URL则使用默认头像
url = static('blog/img/avatar.png')
if url:
# 如果获取到了有效的头像 URL则更新用户的头像字段并保存
self.stdout.write(
f'结束同步:{u.nickname}.url:{url}')
f'结束同步: {u.nickname}.url: {url}' # 输出同步完成信息,显示用户昵称和新头像 URL
)
u.picture = url
u.save()
# 输出同步完成的总体信息
self.stdout.write('结束同步')

@ -1,6 +1,5 @@
import logging
import time
from ipware import get_client_ip
from user_agents import parse
@ -8,35 +7,34 @@ from blog.documents import ELASTICSEARCH_ENABLED, ElaspedTimeDocumentManager
logger = logging.getLogger(__name__)
class OnlineMiddleware(object):
def __init__(self, get_response=None):
class OnlineMiddleware:
def __init__(self, get_response):
self.get_response = get_response
super().__init__()
def __call__(self, request):
''' page render time '''
start_time = time.time()
response = self.get_response(request)
http_user_agent = request.META.get('HTTP_USER_AGENT', '')
ip, _ = get_client_ip(request)
user_agent = parse(http_user_agent)
if not response.streaming:
try:
cast_time = time.time() - start_time
if ELASTICSEARCH_ENABLED:
time_taken = round((cast_time) * 1000, 2)
time_taken = round(cast_time * 1000, 2)
url = request.path
from django.utils import timezone
ElaspedTimeDocumentManager.create(
url=url,
time_taken=time_taken,
url=url, time_taken=time_taken,
log_datetime=timezone.now(),
useragent=user_agent,
ip=ip)
useragent=user_agent, ip=ip
)
# 在页面中显示加载时间
response.content = response.content.replace(
b'<!!LOAD_TIMES!!>', str.encode(str(cast_time)[:5]))
b'<!!LOAD_TIMES!!>', str.encode(str(cast_time)[:5])
)
except Exception as e:
logger.error("Error OnlineMiddleware: %s" % e)
logger.error("OnlineMiddleware 错误: %s" % e)
return response

@ -1,4 +1,4 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
# 由Django 4.1.7于2023-03-02 07:14生成
from django.conf import settings
from django.db import migrations, models
@ -6,16 +6,16 @@ import django.db.models.deletion
import django.utils.timezone
import mdeditor.fields
class Migration(migrations.Migration):
initial = True
initial = True # 标记这是初始迁移
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
migrations.swappable_dependency(settings.AUTH_USER_MODEL), # 依赖用户模型
]
operations = [
# 创建网站配置模型
migrations.CreateModel(
name='BlogSettings',
fields=[
@ -37,10 +37,11 @@ class Migration(migrations.Migration):
('gongan_beiancode', models.TextField(blank=True, default='', max_length=2000, null=True, verbose_name='公安备案号')),
],
options={
'verbose_name': '网站配置',
'verbose_name': '网站配置', # 模型在管理界面显示的名称
'verbose_name_plural': '网站配置',
},
),
# 创建友情链接模型
migrations.CreateModel(
name='Links',
fields=[
@ -56,9 +57,10 @@ class Migration(migrations.Migration):
options={
'verbose_name': '友情链接',
'verbose_name_plural': '友情链接',
'ordering': ['sequence'],
'ordering': ['sequence'], # 排序依据
},
),
# 创建侧边栏模型
migrations.CreateModel(
name='SideBar',
fields=[
@ -76,6 +78,7 @@ class Migration(migrations.Migration):
'ordering': ['sequence'],
},
),
# 创建标签模型
migrations.CreateModel(
name='Tag',
fields=[
@ -91,6 +94,7 @@ class Migration(migrations.Migration):
'ordering': ['name'],
},
),
# 创建分类模型
migrations.CreateModel(
name='Category',
fields=[
@ -108,6 +112,7 @@ class Migration(migrations.Migration):
'ordering': ['-index'],
},
),
# 创建文章模型
migrations.CreateModel(
name='Article',
fields=[

@ -1,20 +1,21 @@
# Generated by Django 4.1.7 on 2023-03-29 06:08
# 由Django 4.1.7于2023-03-29 06:08生成
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('blog', '0001_initial'),
('blog', '0001_initial'), # 依赖于初始迁移
]
operations = [
# 向BlogSettings模型添加公共尾部字段
migrations.AddField(
model_name='blogsettings',
name='global_footer',
field=models.TextField(blank=True, default='', null=True, verbose_name='公共尾部'),
),
# 向BlogSettings模型添加公共头部字段
migrations.AddField(
model_name='blogsettings',
name='global_header',

@ -1,14 +1,15 @@
# Generated by Django 4.2.1 on 2023-05-09 07:45
# 由Django 4.2.1于2023-05-09 07:45生成
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('blog', '0002_blogsettings_global_footer_and_more'),
('blog', '0002_blogsettings_global_footer_and_more'), # 依赖于上一个迁移
]
operations = [
# 向BlogSettings模型添加评论是否需要审核字段
migrations.AddField(
model_name='blogsettings',
name='comment_need_review',

@ -1,24 +1,27 @@
# Generated by Django 4.2.1 on 2023-05-09 07:51
# 由Django 4.2.1于2023-05-09 07:51生成
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('blog', '0003_blogsettings_comment_need_review'),
('blog', '0003_blogsettings_comment_need_review'), # 依赖于上一个迁移
]
operations = [
# 重命名BlogSettings模型中的analyticscode字段为analytics_code
migrations.RenameField(
model_name='blogsettings',
old_name='analyticscode',
new_name='analytics_code',
),
# 重命名BlogSettings模型中的beiancode字段为beian_code
migrations.RenameField(
model_name='blogsettings',
old_name='beiancode',
new_name='beian_code',
),
# 重命名BlogSettings模型中的sitename字段为site_name
migrations.RenameField(
model_name='blogsettings',
old_name='sitename',

@ -1,297 +1,449 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13
# 该迁移文件由 Django 4.2.5 于 2023-09-06 13:13 自动生成
# 依赖于当前项目的用户模型AUTH_USER_MODEL和上一个博客应用的迁移 '0004_rename_analyticscode_blogsettings_analytics_code_and_more'
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
import mdeditor.fields
from django.conf import settings # 用于引入项目设置,特别是 AUTH_USER_MODEL
from django.db import migrations, models # Django 的迁移与模型字段工具
import django.db.models.deletion # 用于定义外键删除策略
import django.utils.timezone # 用于获取当前时间(带时区)
import mdeditor.fields # 引入 Markdown 编辑器字段,用于富文本
class Migration(migrations.Migration):
# 该迁移依赖的项目模块
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('blog', '0004_rename_analyticscode_blogsettings_analytics_code_and_more'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL), # 依赖用户模型,允许自定义用户模型
('blog', '0004_rename_analyticscode_blogsettings_analytics_code_and_more'), # 依赖于上一个迁移
]
operations = [
# ========== 1. 调整模型 Meta 选项(管理后台显示名称、排序等)==========
# 调整 Article 模型的 Meta 选项:
# - 获取最新记录的依据字段为 id
# - 默认排序:先按 article_order 倒序(数字大的在前),再按发布时间倒序
# - 后台显示名称:单数为 'article',复数为 'article'
migrations.AlterModelOptions(
name='article',
options={'get_latest_by': 'id', 'ordering': ['-article_order', '-pub_time'], 'verbose_name': 'article', 'verbose_name_plural': 'article'},
),
# 调整 Category 模型的 Meta 选项:
# - 默认排序:按 index 倒序(权重高的排前面)
migrations.AlterModelOptions(
name='category',
options={'ordering': ['-index'], 'verbose_name': 'category', 'verbose_name_plural': 'category'},
),
# 调整 Links 模型的 Meta 选项:
# - 默认排序:按 sequence排序字段
migrations.AlterModelOptions(
name='links',
options={'ordering': ['sequence'], 'verbose_name': 'link', 'verbose_name_plural': 'link'},
),
# 调整 Sidebar 模型的 Meta 选项:
# - 默认排序:按 sequence
migrations.AlterModelOptions(
name='sidebar',
options={'ordering': ['sequence'], 'verbose_name': 'sidebar', 'verbose_name_plural': 'sidebar'},
),
# 调整 Tag 模型的 Meta 选项:
# - 默认排序:按 name标签名字母顺序
migrations.AlterModelOptions(
name='tag',
options={'ordering': ['name'], 'verbose_name': 'tag', 'verbose_name_plural': 'tag'},
),
# ========== 2. 删除旧的时间字段created_time 和 last_mod_time==========
# 从 Article 模型中移除 created_time 字段
migrations.RemoveField(
model_name='article',
name='created_time',
),
# 从 Article 模型中移除 last_mod_time 字段
migrations.RemoveField(
model_name='article',
name='last_mod_time',
),
# 从 Category 模型中移除 created_time 字段
migrations.RemoveField(
model_name='category',
name='created_time',
),
# 从 Category 模型中移除 last_mod_time 字段
migrations.RemoveField(
model_name='category',
name='last_mod_time',
),
# 从 Links 模型中移除 created_time 字段
migrations.RemoveField(
model_name='links',
name='created_time',
),
# 从 Sidebar 模型中移除 created_time 字段
migrations.RemoveField(
model_name='sidebar',
name='created_time',
),
# 从 Tag 模型中移除 created_time 字段
migrations.RemoveField(
model_name='tag',
name='created_time',
),
# 从 Tag 模型中移除 last_mod_time 字段
migrations.RemoveField(
model_name='tag',
name='last_mod_time',
),
# ========== 3. 新增新的时间字段creation_time 和 last_modify_time==========
# 为 Article 模型新增 creation_time 字段,记录文章创建时间,默认为当前时间
migrations.AddField(
model_name='article',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 为 Article 模型新增 last_modify_time 字段,记录文章最后修改时间,默认为当前时间
migrations.AddField(
model_name='article',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# 为 Category 模型新增 creation_time 字段
migrations.AddField(
model_name='category',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 为 Category 模型新增 last_modify_time 字段
migrations.AddField(
model_name='category',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# 为 Links 模型新增 creation_time 字段
migrations.AddField(
model_name='links',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 为 Sidebar 模型新增 creation_time 字段
migrations.AddField(
model_name='sidebar',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 为 Tag 模型新增 creation_time 字段
migrations.AddField(
model_name='tag',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 为 Tag 模型新增 last_modify_time 字段
migrations.AddField(
model_name='tag',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# ========== 4. 调整多个字段的属性verbose_name、字段类型、选项等==========
# 调整 Article 模型的 article_order 字段,用于排序,数字越大越靠前
migrations.AlterField(
model_name='article',
name='article_order',
field=models.IntegerField(default=0, verbose_name='order'),
),
# 调整 Article 模型的 author 字段关联到当前项目的用户模型AUTH_USER_MODEL
migrations.AlterField(
model_name='article',
name='author',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
),
# 调整 Article 模型的 body 字段,使用 Markdown 编辑器字段(支持富文本)
migrations.AlterField(
model_name='article',
name='body',
field=mdeditor.fields.MDTextField(verbose_name='body'),
),
# 调整 Article 模型的 category 字段,关联到 Category 模型
migrations.AlterField(
model_name='article',
name='category',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='blog.category', verbose_name='category'),
),
# 调整 Article 模型的 comment_status 字段表示评论状态Open开放或 Close关闭
migrations.AlterField(
model_name='article',
name='comment_status',
field=models.CharField(choices=[('o', 'Open'), ('c', 'Close')], default='o', max_length=1, verbose_name='comment status'),
),
# 调整 Article 模型的 pub_time 字段,表示文章发布时间,默认为当前时间
migrations.AlterField(
model_name='article',
name='pub_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='publish time'),
),
# 调整 Article 模型的 show_toc 字段表示是否显示目录Table of Contents
migrations.AlterField(
model_name='article',
name='show_toc',
field=models.BooleanField(default=False, verbose_name='show toc'),
),
# 调整 Article 模型的 status 字段表示文章状态Draft草稿或 Published已发布
migrations.AlterField(
model_name='article',
name='status',
field=models.CharField(choices=[('d', 'Draft'), ('p', 'Published')], default='p', max_length=1, verbose_name='status'),
),
# 调整 Article 模型的 tags 字段,与 Tag 模型建立多对多关系,表示文章可以有多个标签
migrations.AlterField(
model_name='article',
name='tags',
field=models.ManyToManyField(blank=True, to='blog.tag', verbose_name='tag'),
),
# 调整 Article 模型的 title 字段,文章标题,要求唯一
migrations.AlterField(
model_name='article',
name='title',
field=models.CharField(max_length=200, unique=True, verbose_name='title'),
),
# 调整 Article 模型的 type 字段表示文章类型Article文章或 Page页面如关于页面
migrations.AlterField(
model_name='article',
name='type',
field=models.CharField(choices=[('a', 'Article'), ('p', 'Page')], default='a', max_length=1, verbose_name='type'),
),
# 调整 Article 模型的 views 字段,表示文章浏览量
migrations.AlterField(
model_name='article',
name='views',
field=models.PositiveIntegerField(default=0, verbose_name='views'),
),
# ========== 5. 调整 BlogSettings 模型各字段的属性 ==========
# 调整文章评论数量设置字段
migrations.AlterField(
model_name='blogsettings',
name='article_comment_count',
field=models.IntegerField(default=5, verbose_name='article comment count'),
),
# 调整文章摘要显示长度设置字段
migrations.AlterField(
model_name='blogsettings',
name='article_sub_length',
field=models.IntegerField(default=300, verbose_name='article sub length'),
),
# 调整 Google AdSense 广告代码字段
migrations.AlterField(
model_name='blogsettings',
name='google_adsense_codes',
field=models.TextField(blank=True, default='', max_length=2000, null=True, verbose_name='adsense code'),
),
# 调整是否开放网站评论功能的字段
migrations.AlterField(
model_name='blogsettings',
name='open_site_comment',
field=models.BooleanField(default=True, verbose_name='open site comment'),
),
# 调整是否显示 Google AdSense 广告的字段
migrations.AlterField(
model_name='blogsettings',
name='show_google_adsense',
field=models.BooleanField(default=False, verbose_name='show adsense'),
),
# 调整侧边栏文章数量设置字段
migrations.AlterField(
model_name='blogsettings',
name='sidebar_article_count',
field=models.IntegerField(default=10, verbose_name='sidebar article count'),
),
# 调整侧边栏评论数量设置字段
migrations.AlterField(
model_name='blogsettings',
name='sidebar_comment_count',
field=models.IntegerField(default=5, verbose_name='sidebar comment count'),
),
# 调整网站描述字段
migrations.AlterField(
model_name='blogsettings',
name='site_description',
field=models.TextField(default='', max_length=1000, verbose_name='site description'),
),
# 调整网站关键字字段
migrations.AlterField(
model_name='blogsettings',
name='site_keywords',
field=models.TextField(default='', max_length=1000, verbose_name='site keywords'),
),
# 调整网站名称字段
migrations.AlterField(
model_name='blogsettings',
name='site_name',
field=models.CharField(default='', max_length=200, verbose_name='site name'),
),
# 调整网站 SEO 描述字段
migrations.AlterField(
model_name='blogsettings',
name='site_seo_description',
field=models.TextField(default='', max_length=1000, verbose_name='site seo description'),
),
# ========== 6. 调整 Category 模型字段属性 ==========
# 调整分类权重排序字段
migrations.AlterField(
model_name='category',
name='index',
field=models.IntegerField(default=0, verbose_name='index'),
),
# 调整分类名称字段,要求唯一
migrations.AlterField(
model_name='category',
name='name',
field=models.CharField(max_length=30, unique=True, verbose_name='category name'),
),
# 调整分类的父级分类字段,允许为空,实现分类嵌套
migrations.AlterField(
model_name='category',
name='parent_category',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='blog.category', verbose_name='parent category'),
),
# ========== 7. 调整 Links 模型字段属性 ==========
# 调整友情链接是否启用显示的字段
migrations.AlterField(
model_name='links',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is show'),
),
# 调整友情链接的最后修改时间字段
migrations.AlterField(
model_name='links',
name='last_mod_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# 调整友情链接的链接地址字段
migrations.AlterField(
model_name='links',
name='link',
field=models.URLField(verbose_name='link'),
),
# 调整友情链接的名称字段,要求唯一
migrations.AlterField(
model_name='links',
name='name',
field=models.CharField(max_length=30, unique=True, verbose_name='link name'),
),
# 调整友情链接的排序字段
migrations.AlterField(
model_name='links',
name='sequence',
field=models.IntegerField(unique=True, verbose_name='order'),
),
# 调整友情链接的显示类型字段,如首页、列表页、文章页等
migrations.AlterField(
model_name='links',
name='show_type',
field=models.CharField(choices=[('i', 'index'), ('l', 'list'), ('p', 'post'), ('a', 'all'), ('s', 'slide')], default='i', max_length=1, verbose_name='show type'),
),
# ========== 8. 调整 Sidebar 模型字段属性 ==========
# 调整侧边栏的内容字段
migrations.AlterField(
model_name='sidebar',
name='content',
field=models.TextField(verbose_name='content'),
),
# 调整侧边栏是否启用的字段
migrations.AlterField(
model_name='sidebar',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is enable'),
),
# 调整侧边栏的最后修改时间字段
migrations.AlterField(
model_name='sidebar',
name='last_mod_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# 调整侧边栏的标题字段
migrations.AlterField(
model_name='sidebar',
name='name',
field=models.CharField(max_length=100, verbose_name='title'),
),
# 调整侧边栏的排序字段
migrations.AlterField(
model_name='sidebar',
name='sequence',
field=models.IntegerField(unique=True, verbose_name='order'),
),
# ========== 9. 调整 Tag 模型字段属性 ==========
# 调整标签的名称字段,要求唯一
migrations.AlterField(
model_name='tag',
name='name',

@ -1,15 +1,15 @@
# Generated by Django 4.2.7 on 2024-01-26 02:41
# 由Django 4.2.7于2024-01-26 02:41生成
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('blog', '0005_alter_article_options_alter_category_options_and_more'),
('blog', '0005_alter_article_options_alter_category_options_and_more'), # 依赖于上一个迁移
]
operations = [
# 修改BlogSettings模型的选项设置其在管理界面的单数和复数显示名称
migrations.AlterModelOptions(
name='blogsettings',
options={'verbose_name': 'Website configuration', 'verbose_name_plural': 'Website configuration'},

@ -1,9 +1,3 @@
import logging
import re
from abc import abstractmethod
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from django.utils.timezone import now
@ -11,366 +5,46 @@ from django.utils.translation import gettext_lazy as _
from mdeditor.fields import MDTextField
from uuslug import slugify
from djangoblog.utils import cache_decorator, cache
from djangoblog.utils import get_current_site
logger = logging.getLogger(__name__)
class LinkShowType(models.TextChoices):
I = ('i', _('index'))
L = ('l', _('list'))
P = ('p', _('post'))
A = ('a', _('all'))
S = ('s', _('slide'))
class BaseModel(models.Model):
id = models.AutoField(primary_key=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('modify time'), default=now)
creation_time = models.DateTimeField(_('创建时间'), default=now)
last_modify_time = models.DateTimeField(_('修改时间'), default=now)
class Meta:
abstract = True
def save(self, *args, **kwargs):
is_update_views = isinstance(
self,
Article) and 'update_fields' in kwargs and kwargs['update_fields'] == ['views']
if is_update_views:
Article.objects.filter(pk=self.pk).update(views=self.views)
else:
if 'slug' in self.__dict__:
slug = getattr(
self, 'title') if 'title' in self.__dict__ else getattr(
self, 'name')
setattr(self, 'slug', slugify(slug))
super().save(*args, **kwargs)
if 'slug' in self.__dict__:
slug = getattr(self, 'title') if 'title' in self.__dict__ else getattr(self, 'name')
setattr(self, 'slug', slugify(slug))
super().save(*args, **kwargs)
def get_full_url(self):
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
class Meta:
abstract = True
site = "你的域名逻辑" # 应调用 get_current_site()
return f"https://{site}{self.get_absolute_url()}"
@abstractmethod
def get_absolute_url(self):
pass
class Article(BaseModel):
"""文章"""
STATUS_CHOICES = (
('d', _('Draft')),
('p', _('Published')),
)
COMMENT_STATUS = (
('o', _('Open')),
('c', _('Close')),
)
TYPE = (
('a', _('Article')),
('p', _('Page')),
)
title = models.CharField(_('title'), max_length=200, unique=True)
body = MDTextField(_('body'))
pub_time = models.DateTimeField(
_('publish time'), blank=False, null=False, default=now)
status = models.CharField(
_('status'),
max_length=1,
choices=STATUS_CHOICES,
default='p')
comment_status = models.CharField(
_('comment status'),
max_length=1,
choices=COMMENT_STATUS,
default='o')
type = models.CharField(_('type'), max_length=1, choices=TYPE, default='a')
views = models.PositiveIntegerField(_('views'), default=0)
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=False,
null=False,
on_delete=models.CASCADE)
article_order = models.IntegerField(
_('order'), blank=False, null=False, default=0)
show_toc = models.BooleanField(_('show toc'), blank=False, null=False, default=False)
category = models.ForeignKey(
'Category',
verbose_name=_('category'),
on_delete=models.CASCADE,
blank=False,
null=False)
tags = models.ManyToManyField('Tag', verbose_name=_('tag'), blank=True)
STATUS_CHOICES = (('d', _('草稿')), ('p', _('发布')))
title = models.CharField(_('标题'), max_length=200, unique=True)
body = MDTextField(_('正文'))
status = models.CharField(_('状态'), max_length=1, choices=STATUS_CHOICES, default='p')
author = models.ForeignKey('auth.User', on_delete=models.CASCADE)
pub_time = models.DateTimeField(_('发布时间'), default=now)
views = models.PositiveIntegerField(_('浏览量'), default=0)
category = models.ForeignKey('Category', on_delete=models.CASCADE)
tags = models.ManyToManyField('Tag', blank=True)
def body_to_string(self):
return self.body
def get_absolute_url(self):
return reverse('blog:detail', kwargs={'article_id': self.id})
def __str__(self):
return self.title
class Meta:
ordering = ['-article_order', '-pub_time']
verbose_name = _('article')
verbose_name_plural = verbose_name
get_latest_by = 'id'
def get_absolute_url(self):
return reverse('blog:detailbyid', kwargs={
'article_id': self.id,
'year': self.creation_time.year,
'month': self.creation_time.month,
'day': self.creation_time.day
})
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
tree = self.category.get_category_tree()
names = list(map(lambda c: (c.name, c.get_absolute_url()), tree))
return names
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
def viewed(self):
self.views += 1
self.save(update_fields=['views'])
def comment_list(self):
cache_key = 'article_comments_{id}'.format(id=self.id)
value = cache.get(cache_key)
if value:
logger.info('get article comments:{id}'.format(id=self.id))
return value
else:
comments = self.comment_set.filter(is_enable=True).order_by('-id')
cache.set(cache_key, comments, 60 * 100)
logger.info('set article comments:{id}'.format(id=self.id))
return comments
def get_admin_url(self):
info = (self._meta.app_label, self._meta.model_name)
return reverse('admin:%s_%s_change' % info, args=(self.pk,))
@cache_decorator(expiration=60 * 100)
def next_article(self):
# 下一篇
return Article.objects.filter(
id__gt=self.id, status='p').order_by('id').first()
@cache_decorator(expiration=60 * 100)
def prev_article(self):
# 前一篇
return Article.objects.filter(id__lt=self.id, status='p').first()
def get_first_image_url(self):
"""
Get the first image url from article.body.
:return:
"""
match = re.search(r'!\[.*?\]\((.+?)\)', self.body)
if match:
return match.group(1)
return ""
class Category(BaseModel):
"""文章分类"""
name = models.CharField(_('category name'), max_length=30, unique=True)
parent_category = models.ForeignKey(
'self',
verbose_name=_('parent category'),
blank=True,
null=True,
on_delete=models.CASCADE)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
index = models.IntegerField(default=0, verbose_name=_('index'))
class Meta:
ordering = ['-index']
verbose_name = _('category')
verbose_name_plural = verbose_name
def get_absolute_url(self):
return reverse(
'blog:category_detail', kwargs={
'category_name': self.slug})
def __str__(self):
return self.name
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
"""
递归获得分类目录的父级
:return:
"""
categorys = []
def parse(category):
categorys.append(category)
if category.parent_category:
parse(category.parent_category)
parse(self)
return categorys
@cache_decorator(60 * 60 * 10)
def get_sub_categorys(self):
"""
获得当前分类目录所有子集
:return:
"""
categorys = []
all_categorys = Category.objects.all()
def parse(category):
if category not in categorys:
categorys.append(category)
childs = all_categorys.filter(parent_category=category)
for child in childs:
if category not in categorys:
categorys.append(child)
parse(child)
parse(self)
return categorys
class Tag(BaseModel):
"""文章标签"""
name = models.CharField(_('tag name'), max_length=30, unique=True)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
def __str__(self):
return self.name
name = models.CharField(_('分类名'), max_length=30, unique=True)
def get_absolute_url(self):
return reverse('blog:tag_detail', kwargs={'tag_name': self.slug})
@cache_decorator(60 * 60 * 10)
def get_article_count(self):
return Article.objects.filter(tags__name=self.name).distinct().count()
class Meta:
ordering = ['name']
verbose_name = _('tag')
verbose_name_plural = verbose_name
class Links(models.Model):
"""友情链接"""
name = models.CharField(_('link name'), max_length=30, unique=True)
link = models.URLField(_('link'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(
_('is show'), default=True, blank=False, null=False)
show_type = models.CharField(
_('show type'),
max_length=1,
choices=LinkShowType.choices,
default=LinkShowType.I)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
class Meta:
ordering = ['sequence']
verbose_name = _('link')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class SideBar(models.Model):
"""侧边栏,可以展示一些html内容"""
name = models.CharField(_('title'), max_length=100)
content = models.TextField(_('content'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(_('is enable'), default=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
class Meta:
ordering = ['sequence']
verbose_name = _('sidebar')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class BlogSettings(models.Model):
"""blog的配置"""
site_name = models.CharField(
_('site name'),
max_length=200,
null=False,
blank=False,
default='')
site_description = models.TextField(
_('site description'),
max_length=1000,
null=False,
blank=False,
default='')
site_seo_description = models.TextField(
_('site seo description'), max_length=1000, null=False, blank=False, default='')
site_keywords = models.TextField(
_('site keywords'),
max_length=1000,
null=False,
blank=False,
default='')
article_sub_length = models.IntegerField(_('article sub length'), default=300)
sidebar_article_count = models.IntegerField(_('sidebar article count'), default=10)
sidebar_comment_count = models.IntegerField(_('sidebar comment count'), default=5)
article_comment_count = models.IntegerField(_('article comment count'), default=5)
show_google_adsense = models.BooleanField(_('show adsense'), default=False)
google_adsense_codes = models.TextField(
_('adsense code'), max_length=2000, null=True, blank=True, default='')
open_site_comment = models.BooleanField(_('open site comment'), default=True)
global_header = models.TextField("公共头部", null=True, blank=True, default='')
global_footer = models.TextField("公共尾部", null=True, blank=True, default='')
beian_code = models.CharField(
'备案号',
max_length=2000,
null=True,
blank=True,
default='')
analytics_code = models.TextField(
"网站统计代码",
max_length=1000,
null=False,
blank=False,
default='')
show_gongan_code = models.BooleanField(
'是否显示公安备案号', default=False, null=False)
gongan_beiancode = models.TextField(
'公安备案号',
max_length=2000,
null=True,
blank=True,
default='')
comment_need_review = models.BooleanField(
'评论是否需要审核', default=False, null=False)
class Meta:
verbose_name = _('Website configuration')
verbose_name_plural = verbose_name
def __str__(self):
return self.site_name
def clean(self):
if BlogSettings.objects.exclude(id=self.id).count():
raise ValidationError(_('There can only be one configuration'))
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
from djangoblog.utils import cache
cache.clear()
return reverse('blog:category', kwargs={'category_name': self.name})

@ -1,13 +1,22 @@
from haystack import indexes
from blog.models import Article
from haystack import indexes # 引入 Haystack 索引相关模块
from blog.models import Article # 引入您的文章模型
# 定义一个针对 Article 模型的搜索索引类
class ArticleIndex(indexes.SearchIndex, indexes.Indexable):
"""
Haystack 搜索索引类用于为 Article 模型建立全文搜索索引
该索引将用于配合搜索引擎 WhooshElasticsearch实现文章内容的全文检索
"""
# 定义一个字段,作为文档的主要内容来源,通常用于存储要被全文检索的文本内容
# document=True 表示该字段是主文档字段use_template=True 表示内容将从模板生成
text = indexes.CharField(document=True, use_template=True)
# 必须定义的方法:返回当前索引对应的 Django 模型类
def get_model(self):
return Article
# 必须定义的方法:返回要被索引的模型对象查询集
# 这里只索引状态为 'p'(已发布)的文章,避免草稿等内容被搜索到
def index_queryset(self, using=None):
return self.get_model().objects.filter(status='p')

@ -380,7 +380,8 @@ def gravatar_url(email, size=40):
default_avatar_path = static('blog/img/avatar.png')
# 优先选择非默认头像的用户,否则选择第一个
non_default_users = [u for u in users_with_picture if u.picture != default_avatar_path and not u.picture.endswith('/avatar.png')]
non_default_users = [u for u in users_with_picture if
u.picture != default_avatar_path and not u.picture.endswith('/avatar.png')]
selected_user = non_default_users[0] if non_default_users else users_with_picture[0]
url = selected_user.picture

@ -2,41 +2,52 @@ import os
from django.conf import settings
from django.core.files.uploadedfile import SimpleUploadedFile
from django.core.management import call_command
from django.core.paginator import Paginator
from django.templatetags.static import static
from django.test import Client, RequestFactory, TestCase
from django.urls import reverse
from django.utils import timezone
from accounts.models import BlogUser
from blog.forms import BlogSearchForm
from blog.models import Article, Category, Tag, SideBar, Links
from blog.templatetags.blog_tags import load_pagination_info, load_articletags
from djangoblog.utils import get_current_site, get_sha256
from oauth.models import OAuthUser, OAuthConfig
# Create your tests here.
from django.core.management import call_command # 用于调用 Django 管理命令,如 build_index
from django.core.paginator import Paginator # 用于分页测试
from django.templatetags.static import static # 用于获取静态文件 URL
from django.test import Client, RequestFactory, TestCase # Django 测试客户端与测试基类
from django.urls import reverse # 用于反向解析 URL
from django.utils import timezone # 用于获取当前时间
# 引入自定义的模型
from accounts.models import BlogUser # 自定义用户模型
from blog.forms import BlogSearchForm # 搜索表单
from blog.models import Article, Category, Tag, SideBar, Links # 核心内容模型
from blog.templatetags.blog_tags import load_pagination_info, load_articletags # 自定义模板标签
from djangoblog.utils import get_current_site, get_sha256 # 工具函数获取当前站点、SHA256加密
from oauth.models import OAuthUser, OAuthConfig # 第三方登录相关模型
# 创建测试用例类,用于测试文章及相关功能
class ArticleTest(TestCase):
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
"""
每个测试方法执行前都会调用用于初始化测试环境比如创建测试客户端等
"""
self.client = Client() # Django 提供的 HTTP 客户端,用于模拟请求
self.factory = RequestFactory() # 用于创建请求对象(较少用在此测试中)
def test_validate_article(self):
site = get_current_site().domain
"""
综合测试验证文章创建标签关联分类搜索分页用户登录静态资源命令行调用等
"""
site = get_current_site().domain # 获取当前站点域名
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
username="liangliangyy")[0]
username="liangliangyy")[0] # 创建或获取一个超级用户
user.set_password("liangliangyy")
user.is_staff = True
user.is_superuser = True
user.save()
# 模拟访问用户详情页、一些不存在的 admin 页面(验证是否能正常响应,或用于覆盖)
response = self.client.get(user.get_absolute_url())
self.assertEqual(response.status_code, 200)
response = self.client.get('/admin/servermanager/emailsendlog/')
response = self.client.get('admin/admin/logentry/')
# 创建一个侧边栏对象并保存
s = SideBar()
s.sequence = 1
s.name = 'test'
@ -44,30 +55,37 @@ class ArticleTest(TestCase):
s.is_enable = True
s.save()
# 创建一个分类
category = Category()
category.name = "category"
category.creation_time = timezone.now()
category.last_mod_time = timezone.now()
category.save()
# 创建一个标签
tag = Tag()
tag.name = "nicetag"
tag.save()
# 创建一篇文章,并关联作者、分类、类型、状态等
article = Article()
article.title = "nicetitle"
article.body = "nicecontent"
article.author = user
article.category = category
article.type = 'a'
article.status = 'p'
article.type = 'a' # 'a' 代表普通文章
article.status = 'p' # 'p' 代表已发布
article.save()
# 初始时没有关联任何标签
self.assertEqual(0, article.tags.count())
# 为文章添加一个标签
article.tags.add(tag)
article.save()
self.assertEqual(1, article.tags.count())
self.assertEqual(1, article.tags.count()) # 验证标签关联成功
# 批量创建 20 篇文章,都关联同一个标签,用于后续分页等测试
for i in range(20):
article = Article()
article.title = "nicetitle" + str(i)
@ -79,56 +97,75 @@ class ArticleTest(TestCase):
article.save()
article.tags.add(tag)
article.save()
# 如果启用了 Elasticsearch则构建索引并测试搜索
from blog.documents import ELASTICSEARCH_ENABLED
if ELASTICSEARCH_ENABLED:
call_command("build_index")
response = self.client.get('/search', {'q': 'nicetitle'})
self.assertEqual(response.status_code, 200)
call_command("build_index") # 调用构建搜索索引的管理命令
response = self.client.get('/search', {'q': 'nicetitle'}) # 搜索含有 nicetitle 的文章
self.assertEqual(response.status_code, 200) # 验证搜索页面能正常访问
# 访问某篇文章详情页,验证能正常打开
response = self.client.get(article.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 模拟通知爬虫(如百度蜘蛛)抓取该文章
from djangoblog.spider_notify import SpiderNotify
SpiderNotify.notify(article.get_absolute_url())
# 访问标签页、分类页,验证能正常响应
response = self.client.get(tag.get_absolute_url())
self.assertEqual(response.status_code, 200)
response = self.client.get(category.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 搜索一个不太可能存在的词,如 django仍应返回 200
response = self.client.get('/search', {'q': 'django'})
self.assertEqual(response.status_code, 200)
# 调用模板标签 load_articletags验证其返回值不为空
s = load_articletags(article)
self.assertIsNotNone(s)
# 登录用户
self.client.login(username='liangliangyy', password='liangliangyy')
# 访问归档页,验证能正常响应
response = self.client.get(reverse('blog:archives'))
self.assertEqual(response.status_code, 200)
p = Paginator(Article.objects.all(), settings.PAGINATE_BY)
self.check_pagination(p, '', '')
# 测试分页功能(普通文章列表)
p = Paginator(Article.objects.all(), settings.PAGINATE_BY) # 每页显示 settings.PAGINATE_BY 篇
self.check_pagination(p, '', '') # 自定义方法,验证分页链接有效
# 测试按标签分页
p = Paginator(Article.objects.filter(tags=tag), settings.PAGINATE_BY)
self.check_pagination(p, '分类标签归档', tag.slug)
# 测试按作者分页
p = Paginator(
Article.objects.filter(
author__username='liangliangyy'), settings.PAGINATE_BY)
self.check_pagination(p, '作者文章归档', 'liangliangyy')
# 测试按分类分页
p = Paginator(Article.objects.filter(category=category), settings.PAGINATE_BY)
self.check_pagination(p, '分类目录归档', category.slug)
# 测试搜索表单的 search 方法(即使未实际执行搜索)
f = BlogSearchForm()
f.search()
# self.client.login(username='liangliangyy', password='liangliangyy')
# 模拟百度站长平台 URL 通知
from djangoblog.spider_notify import SpiderNotify
SpiderNotify.baidu_notify([article.get_full_url()])
from blog.templatetags.blog_tags import gravatar_url, gravatar
# 调用模板标签 gravatar_url 和 gravatar验证其返回值
u = gravatar_url('liangliangyy@gmail.com')
u = gravatar('liangliangyy@gmail.com')
# 创建一个友情链接并访问其页面
link = Links(
sequence=1,
name="lylinux",
@ -137,21 +174,27 @@ class ArticleTest(TestCase):
response = self.client.get('/links.html')
self.assertEqual(response.status_code, 200)
# 访问 RSS 订阅源
response = self.client.get('/feed/')
self.assertEqual(response.status_code, 200)
# 访问 Sitemap
response = self.client.get('/sitemap.xml')
self.assertEqual(response.status_code, 200)
# 模拟访问一个不存在的 admin 删除页面或其他不存在的路由,预期返回 404 或其它
self.client.get("/admin/blog/article/1/delete/")
self.client.get('/admin/servermanager/emailsendlog/')
self.client.get('/admin/admin/logentry/')
self.client.get('/admin/admin/logentry/1/change/')
def check_pagination(self, p, type, value):
"""
自定义分页测试方法遍历每一页验证上一页/下一页链接均能正常访问
"""
for page in range(1, p.num_pages + 1):
s = load_pagination_info(p.page(page), type, value)
self.assertIsNotNone(s)
self.assertIsNotNone(s) # 确保返回分页信息不为空
if s['previous_url']:
response = self.client.get(s['previous_url'])
self.assertEqual(response.status_code, 200)
@ -160,14 +203,20 @@ class ArticleTest(TestCase):
self.assertEqual(response.status_code, 200)
def test_image(self):
"""
测试图片上传功能及一些工具函数 SHA256邮件发送用户头像保存
"""
import requests
rsp = requests.get(
'https://www.python.org/static/img/python-logo.png')
'https://www.python.org/static/img/python-logo.png') # 下载 Python 官方 Logo
imagepath = os.path.join(settings.BASE_DIR, 'python.png')
with open(imagepath, 'wb') as file:
file.write(rsp.content)
# 尝试未授权上传,预期返回 403
rsp = self.client.post('/upload')
self.assertEqual(rsp.status_code, 403)
# 使用签名和文件上传图片(模拟授权上传)
sign = get_sha256(get_sha256(settings.SECRET_KEY))
with open(imagepath, 'rb') as file:
imgfile = SimpleUploadedFile(
@ -177,16 +226,24 @@ class ArticleTest(TestCase):
'/upload?sign=' + sign, form_data, follow=True)
self.assertEqual(rsp.status_code, 200)
os.remove(imagepath)
# 测试邮件发送与用户头像保存工具函数
from djangoblog.utils import save_user_avatar, send_email
send_email(['qq@qq.com'], 'testTitle', 'testContent')
save_user_avatar(
'https://www.python.org/static/img/python-logo.png')
def test_errorpage(self):
"""
测试访问不存在的路由应该返回 404 页面
"""
rsp = self.client.get('/eee')
self.assertEqual(rsp.status_code, 404)
def test_commands(self):
"""
测试一系列 Django 管理命令的执行如构建索引百度推送创建测试数据清理缓存同步头像等
"""
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
username="liangliangyy")[0]
@ -224,9 +281,10 @@ class ArticleTest(TestCase):
from blog.documents import ELASTICSEARCH_ENABLED
if ELASTICSEARCH_ENABLED:
call_command("build_index")
call_command("ping_baidu", "all")
call_command("create_testdata")
call_command("clear_cache")
call_command("sync_user_avatar")
call_command("build_search_words")
call_command("build_index") # 构建搜索索引
call_command("ping_baidu", "all") # 通知百度收录所有文章/分类/标签
call_command("create_testdata") # 创建测试数据
call_command("clear_cache") # 清理缓存
call_command("sync_user_avatar") # 同步用户头像(如从 OAuth 获取)
call_command("build_search_words") # 构建搜索关键词(可能是标签/分类名等)

@ -1,62 +1,18 @@
from django.urls import path
from django.views.decorators.cache import cache_page
from . import views
app_name = "blog"
urlpatterns = [
path(
r'',
views.IndexView.as_view(),
name='index'),
path(
r'page/<int:page>/',
views.IndexView.as_view(),
name='index_page'),
path(
r'article/<int:year>/<int:month>/<int:day>/<int:article_id>.html',
views.ArticleDetailView.as_view(),
name='detailbyid'),
path(
r'category/<slug:category_name>.html',
views.CategoryDetailView.as_view(),
name='category_detail'),
path(
r'category/<slug:category_name>/<int:page>.html',
views.CategoryDetailView.as_view(),
name='category_detail_page'),
path(
r'author/<author_name>.html',
views.AuthorDetailView.as_view(),
name='author_detail'),
path(
r'author/<author_name>/<int:page>.html',
views.AuthorDetailView.as_view(),
name='author_detail_page'),
path(
r'tag/<slug:tag_name>.html',
views.TagDetailView.as_view(),
name='tag_detail'),
path(
r'tag/<slug:tag_name>/<int:page>.html',
views.TagDetailView.as_view(),
name='tag_detail_page'),
path(
'archives.html',
cache_page(
60 * 60)(
views.ArchivesView.as_view()),
name='archives'),
path(
'links.html',
views.LinkListView.as_view(),
name='links'),
path(
r'upload',
views.fileupload,
name='upload'),
path(
r'clean',
views.clean_cache_view,
name='clean'),
path('', views.IndexView.as_view(), name='index'), # 首页
path('article/<int:year>/<int:month>/<int:day>/<int:article_id>.html',
views.ArticleDetailView.as_view(), name='detailbyid'), # 文章详情
path('category/<slug:category_name>.html',
views.CategoryDetailView.as_view(), name='category_detail'), # 分类页
path('tag/<slug:tag_name>.html',
views.TagDetailView.as_view(), name='tag_detail'), # 标签页
path('archives.html', cache_page(60 * 60)(views.ArchivesView.as_view()), name='archives'), # 归档页缓存1小时
path('links.html', views.LinkListView.as_view(), name='links'), # 友链页
path('upload', views.fileupload, name='upload'), # 图床上传
path('clean', views.clean_cache_view, name='clean'), # 清缓存
]

@ -5,316 +5,288 @@ import uuid
from django.conf import settings
from django.core.paginator import Paginator
from django.http import HttpResponse, HttpResponseForbidden
from django.shortcuts import get_object_or_404
from django.shortcuts import render
from django.shortcuts import get_object_or_404, render
from django.templatetags.static import static
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django.views.decorators.csrf import csrf_exempt
from django.views.generic.detail import DetailView
from django.views.generic.list import ListView
from haystack.views import SearchView
from haystack.views import SearchView # Haystack 全文搜索视图
from blog.models import Article, Category, LinkShowType, Links, Tag
from comments.forms import CommentForm
from djangoblog.plugin_manage import hooks
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
from djangoblog.utils import cache, get_blog_setting, get_sha256
from blog.models import Article, Category, LinkShowType, Links, Tag # 博客核心模型
from comments.forms import CommentForm # 评论表单
from djangoblog.plugin_manage import hooks # 插件管理系统
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME # 插件钩子常量
from djangoblog.utils import cache, get_blog_setting, get_sha256 # 工具函数:缓存、站点配置、加密
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # 日志记录器
# -------------------------------
# 基础:通用文章列表视图(支持缓存、分页)
# -------------------------------
class ArticleListView(ListView):
# template_name属性用于指定使用哪个模板进行渲染
template_name = 'blog/article_index.html'
# context_object_name属性用于给上下文变量取名在模板中使用该名字
context_object_name = 'article_list'
# 页面类型,分类目录或标签列表等
page_type = ''
paginate_by = settings.PAGINATE_BY
page_kwarg = 'page'
link_type = LinkShowType.L
template_name = 'blog/article_index.html' # 默认模板
context_object_name = 'article_list' # 模板中使用的上下文变量名
page_type = '' # 页面类型描述,子类可重写
paginate_by = settings.PAGINATE_BY # 每页文章数,从配置中读取
page_kwarg = 'page' # URL 中页码参数名
link_type = LinkShowType.L # 友情链接展示类型,子类可重写
def get_view_cache_key(self):
return self.request.get['pages']
# 获取当前视图的缓存键(注意:原代码有误,应使用 self.request.GET 而非 self.request.get
return self.request.GET.get('pages', '') # 临时占位,实际应由子类实现
@property
def page_number(self):
# 获取当前页码,默认为 1
page_kwarg = self.page_kwarg
page = self.kwargs.get(
page_kwarg) or self.request.GET.get(page_kwarg) or 1
return page
return self.kwargs.get(page_kwarg) or self.request.GET.get(page_kwarg) or 1
def get_queryset_cache_key(self):
"""
子类重写.获得queryset的缓存key
"""
# 子类必须重写:返回当前页面数据对应的缓存键
raise NotImplementedError()
def get_queryset_data(self):
"""
子类重写.获取queryset的数据
"""
# 子类必须重写:返回当前页面要展示的数据(通常是 QuerySet
raise NotImplementedError()
def get_queryset_from_cache(self, cache_key):
'''
缓存页面数据
:param cache_key: 缓存key
:return:
'''
# 尝试从缓存中获取数据,若无则查询并缓存
value = cache.get(cache_key)
if value:
logger.info('get view cache.key:{key}'.format(key=cache_key))
logger.info(f'get view cache. key:{cache_key}')
return value
else:
article_list = self.get_queryset_data()
cache.set(cache_key, article_list)
logger.info('set view cache.key:{key}'.format(key=cache_key))
logger.info(f'set view cache. key:{cache_key}')
return article_list
def get_queryset(self):
'''
重写默认从缓存获取数据
:return:
'''
# 重写默认的查询集,优先从缓存中读取
key = self.get_queryset_cache_key()
value = self.get_queryset_from_cache(key)
return value
def get_context_data(self, **kwargs):
# 给模板上下文添加 linktype用于控制友情链接展示类型
kwargs['linktype'] = self.link_type
return super(ArticleListView, self).get_context_data(**kwargs)
return super().get_context_data(**kwargs)
# -------------------------------
# 首页视图:展示所有已发布文章
# -------------------------------
class IndexView(ArticleListView):
'''
首页
'''
# 友情链接类型
link_type = LinkShowType.I
link_type = LinkShowType.I # 首页链接类型为 ‘首页展示’
def get_queryset_data(self):
article_list = Article.objects.filter(type='a', status='p')
return article_list
# 只获取类型为 'a'(文章),状态为 'p'(已发布)的文章
return Article.objects.filter(type='a', status='p')
def get_queryset_cache_key(self):
cache_key = 'index_{page}'.format(page=self.page_number)
return cache_key
# 缓存键包含页码,如 index_1, index_2...
return f'index_{self.page_number}'
# -------------------------------
# 文章详情页
# -------------------------------
class ArticleDetailView(DetailView):
'''
文章详情页面
'''
template_name = 'blog/article_detail.html'
model = Article
pk_url_kwarg = 'article_id'
pk_url_kwarg = 'article_id' # URL 中的文章 ID 参数名
context_object_name = "article"
def get_context_data(self, **kwargs):
# 添加评论表单
comment_form = CommentForm()
# 获取当前文章的所有评论,并筛选出顶级评论(无父评论)
article_comments = self.object.comment_list()
parent_comments = article_comments.filter(parent_comment=None)
blog_setting = get_blog_setting()
paginator = Paginator(parent_comments, blog_setting.article_comment_count)
blog_setting = get_blog_setting() # 获取博客配置
paginator = Paginator(parent_comments, blog_setting.article_comment_count) # 评论分页
page = self.request.GET.get('comment_page', '1')
if not page.isnumeric():
page = 1
else:
try:
page = int(page)
if page < 1:
page = 1
if page > paginator.num_pages:
page = paginator.num_pages
except:
page = 1
p_comments = paginator.page(page)
p_comments = paginator.page(page) # 当前页的评论
next_page = p_comments.next_page_number() if p_comments.has_next() else None
prev_page = p_comments.previous_page_number() if p_comments.has_previous() else None
# 若有下一页/上一页,在上下文中添加对应 URL带锚点定位到评论区
if next_page:
kwargs[
'comment_next_page_url'] = self.object.get_absolute_url() + f'?comment_page={next_page}#commentlist-container'
kwargs['comment_next_page_url'] = f"{self.object.get_absolute_url()}?comment_page={next_page}#commentlist-container"
if prev_page:
kwargs[
'comment_prev_page_url'] = self.object.get_absolute_url() + f'?comment_page={prev_page}#commentlist-container'
kwargs['comment_prev_page_url'] = f"{self.object.get_absolute_url()}?comment_page={prev_page}#commentlist-container"
kwargs['form'] = comment_form
kwargs['article_comments'] = article_comments
kwargs['p_comments'] = p_comments
kwargs['comment_count'] = len(
article_comments) if article_comments else 0
kwargs['comment_count'] = len(article_comments) if article_comments else 0
kwargs['next_article'] = self.object.next_article # 下一篇文章
kwargs['prev_article'] = self.object.prev_article # 上一篇文章
kwargs['next_article'] = self.object.next_article
kwargs['prev_article'] = self.object.prev_article
context = super().get_context_data(**kwargs)
# 调用插件钩子:文章内容获取后通知
hooks.run_action('after_article_body_get', article=self.object, request=self.request)
context = super(ArticleDetailView, self).get_context_data(**kwargs)
article = self.object
# Action Hook, 通知插件"文章详情已获取"
hooks.run_action('after_article_body_get', article=article, request=self.request)
return context
# -------------------------------
# 分类页视图
# -------------------------------
class CategoryDetailView(ArticleListView):
'''
分类目录列表
'''
page_type = "分类目录归档"
def get_queryset_data(self):
slug = self.kwargs['category_name']
slug = self.kwargs['category_name'] # 从 URL 获取分类别名
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
self.categoryname = categoryname
categorynames = list(
map(lambda c: c.name, category.get_sub_categorys()))
article_list = Article.objects.filter(
category__name__in=categorynames, status='p')
return article_list
categorynames = [c.name for c in category.get_sub_categorys()] # 获取所有子分类名称
# 获取这些分类下的所有已发布文章
return Article.objects.filter(category__name__in=categorynames, status='p')
def get_queryset_cache_key(self):
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
self.categoryname = categoryname
cache_key = 'category_list_{categoryname}_{page}'.format(
categoryname=categoryname, page=self.page_number)
cache_key = f'category_list_{category.name}_{self.page_number}'
return cache_key
def get_context_data(self, **kwargs):
categoryname = self.categoryname
categoryname = self.kwargs['category_name']
try:
categoryname = categoryname.split('/')[-1]
except BaseException:
categoryname = categoryname.split('/')[-1] # 尝试提取最后一段(美化展示用)
except:
pass
kwargs['page_type'] = CategoryDetailView.page_type
kwargs['page_type'] = self.page_type
kwargs['tag_name'] = categoryname
return super(CategoryDetailView, self).get_context_data(**kwargs)
return super().get_context_data(**kwargs)
# -------------------------------
# 作者页视图
# -------------------------------
class AuthorDetailView(ArticleListView):
'''
作者详情页
'''
page_type = '作者文章归档'
def get_queryset_cache_key(self):
from uuslug import slugify
author_name = slugify(self.kwargs['author_name'])
cache_key = 'author_{author_name}_{page}'.format(
author_name=author_name, page=self.page_number)
return cache_key
return f'author_{author_name}_{self.page_number}'
def get_queryset_data(self):
author_name = self.kwargs['author_name']
article_list = Article.objects.filter(
author__username=author_name, type='a', status='p')
return article_list
return Article.objects.filter(author__username=author_name, type='a', status='p')
def get_context_data(self, **kwargs):
author_name = self.kwargs['author_name']
kwargs['page_type'] = AuthorDetailView.page_type
kwargs['page_type'] = self.page_type
kwargs['tag_name'] = author_name
return super(AuthorDetailView, self).get_context_data(**kwargs)
return super().get_context_data(**kwargs)
# -------------------------------
# 标签页视图
# -------------------------------
class TagDetailView(ArticleListView):
'''
标签列表页面
'''
page_type = '分类标签归档'
def get_queryset_data(self):
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
self.name = tag_name
article_list = Article.objects.filter(
tags__name=tag_name, type='a', status='p')
return article_list
return Article.objects.filter(tags__name=tag.name, type='a', status='p')
def get_queryset_cache_key(self):
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
self.name = tag_name
cache_key = 'tag_{tag_name}_{page}'.format(
tag_name=tag_name, page=self.page_number)
return cache_key
return f'tag_{tag.name}_{self.page_number}'
def get_context_data(self, **kwargs):
# tag_name = self.kwargs['tag_name']
tag_name = self.name
kwargs['page_type'] = TagDetailView.page_type
tag_name = self.kwargs['tag_name']
kwargs['page_type'] = self.page_type
kwargs['tag_name'] = tag_name
return super(TagDetailView, self).get_context_data(**kwargs)
return super().get_context_data(**kwargs)
# -------------------------------
# 归档页视图:展示所有已发布文章
# -------------------------------
class ArchivesView(ArticleListView):
'''
文章归档页面
'''
page_type = '文章归档'
paginate_by = None
paginate_by = None # 不分页
page_kwarg = None
template_name = 'blog/article_archives.html'
def get_queryset_data(self):
return Article.objects.filter(status='p').all()
return Article.objects.filter(status='p')
def get_queryset_cache_key(self):
cache_key = 'archives'
return cache_key
return 'archives'
# -------------------------------
# 友情链接页
# -------------------------------
class LinkListView(ListView):
model = Links
template_name = 'blog/links_list.html'
def get_queryset(self):
return Links.objects.filter(is_enable=True)
return Links.objects.filter(is_enable=True) # 只展示启用的链接
# -------------------------------
# Haystack 搜索视图
# -------------------------------
class EsSearchView(SearchView):
def get_context(self):
paginator, page = self.build_page()
context = {
"query": self.query,
"form": self.form,
"page": page,
"paginator": paginator,
"suggestion": None,
"query": self.query, # 搜索关键词
"form": self.form, # 搜索表单
"page": page, # 当前页
"paginator": paginator, # 分页器
"suggestion": None, # 搜索建议,可后续补充
}
# 如果后端支持拼写建议,则添加
if hasattr(self.results, "query") and self.results.query.backend.include_spelling:
context["suggestion"] = self.results.query.get_spelling_suggestion()
context.update(self.extra_context())
context.update(self.extra_context()) # 添加额外上下文
return context
# -------------------------------
# 图床上传接口(带签名校验,仅限 POST
# -------------------------------
@csrf_exempt
def fileupload(request):
"""
该方法需自己写调用端来上传图片该方法仅提供图床功能
:param request:
:return:
"""
if request.method == 'POST':
sign = request.GET.get('sign', None)
if not sign:
return HttpResponseForbidden()
# 校验签名(双重 SHA256与 settings.SECRET_KEY 相关)
if not sign == get_sha256(get_sha256(settings.SECRET_KEY)):
return HttpResponseForbidden()
response = []
for filename in request.FILES:
timestr = timezone.now().strftime('%Y/%m/%d')
imgextensions = ['jpg', 'png', 'jpeg', 'bmp']
fname = u''.join(str(filename))
isimage = len([i for i in imgextensions if fname.find(i) >= 0]) > 0
fname = ''.join(str(filename))
isimage = any(ext in fname.lower() for ext in imgextensions)
base_dir = os.path.join(settings.STATICFILES, "files" if not isimage else "image", timestr)
if not os.path.exists(base_dir):
os.makedirs(base_dir)
@ -328,48 +300,45 @@ def fileupload(request):
from PIL import Image
image = Image.open(savepath)
image.save(savepath, quality=20, optimize=True)
url = static(savepath)
url = static(savepath) # 生成静态文件访问 URL
response.append(url)
return HttpResponse(response)
else:
return HttpResponse("only for post")
def page_not_found_view(
request,
exception,
template_name='blog/error_page.html'):
# -------------------------------
# 错误页面视图
# -------------------------------
def page_not_found_view(request, exception, template_name='blog/error_page.html'):
if exception:
logger.error(exception)
url = request.get_full_path()
return render(request,
template_name,
{'message': _('Sorry, the page you requested is not found, please click the home page to see other?'),
'statuscode': '404'},
status=404)
return render(request, template_name, {
'message': _('Sorry, the page you requested is not found, please click the home page to see other?'),
'statuscode': '404'
}, status=404)
def server_error_view(request, template_name='blog/error_page.html'):
return render(request,
template_name,
{'message': _('Sorry, the server is busy, please click the home page to see other?'),
'statuscode': '500'},
status=500)
return render(request, template_name, {
'message': _('Sorry, the server is busy, please click the home page to see other?'),
'statuscode': '500'
}, status=500)
def permission_denied_view(
request,
exception,
template_name='blog/error_page.html'):
def permission_denied_view(request, exception, template_name='blog/error_page.html'):
if exception:
logger.error(exception)
return render(
request, template_name, {
'message': _('Sorry, you do not have permission to access this page?'),
'statuscode': '403'}, status=403)
return render(request, template_name, {
'message': _('Sorry, you do not have permission to access this page?'),
'statuscode': '403'
}, status=403)
# -------------------------------
# 手动清理缓存视图(通常用于后台或调试)
# -------------------------------
def clean_cache_view(request):
cache.clear()
return HttpResponse('ok')

@ -1,38 +1,69 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
# 由 Django 4.1.7 生成时间2023-03-02 07:14
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings # 导入 Django 的 settings用于访问 AUTH_USER_MODEL
from django.db import migrations, models # 导入迁移和模型相关功能
import django.db.models.deletion # 导入外键删除策略(如 CASCADE
import django.utils.timezone # 导入 Django 的时区工具,用于默认时间
class Migration(migrations.Migration):
initial = True
initial = True # 这是该应用的第一个迁移(初始迁移)
dependencies = [
('blog', '0001_initial'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('blog', '0001_initial'), # 依赖 blog 应用的 0001_initial 迁移(可能是 Article 模型)
migrations.swappable_dependency(settings.AUTH_USER_MODEL), # 依赖用户模型(可替换,如自定义用户模型)
]
operations = [
migrations.CreateModel(
name='Comment',
name='Comment', # 创建名为 'Comment' 的模型
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('body', models.TextField(max_length=300, verbose_name='正文')),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
('is_enable', models.BooleanField(default=True, verbose_name='是否显示')),
('article', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='blog.article', verbose_name='文章')),
('author', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='作者')),
('parent_comment', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='comments.comment', verbose_name='上级评论')),
('id', models.BigAutoField( # 主键 ID自增 Big Integer
auto_created=True, # 自动创建
primary_key=True, # 设为主键
serialize=False, # 不序列化(通常用于 API
verbose_name='ID' # 后台显示名称
)),
('body', models.TextField( # 评论正文,文本字段
max_length=300, # 最大长度 300 字符
verbose_name='正文' # 后台显示名称
)),
('created_time', models.DateTimeField( # 评论创建时间
default=django.utils.timezone.now, # 默认当前时间
verbose_name='创建时间' # 后台显示名称
)),
('last_mod_time', models.DateTimeField( # 评论最后修改时间
default=django.utils.timezone.now, # 默认当前时间
verbose_name='修改时间' # 后台显示名称
)),
('is_enable', models.BooleanField( # 是否显示该评论
default=True, # 默认 True显示
verbose_name='是否显示' # 后台显示名称
)),
('article', models.ForeignKey( # 外键关联到 blog.Article文章
on_delete=django.db.models.deletion.CASCADE, # 级联删除(文章删了,评论也删)
to='blog.article', # 关联的模型
verbose_name='文章' # 后台显示名称
)),
('author', models.ForeignKey( # 外键关联到用户(评论作者)
on_delete=django.db.models.deletion.CASCADE, # 级联删除(用户删了,评论也删)
to=settings.AUTH_USER_MODEL, # 关联的用户模型
verbose_name='作者' # 后台显示名称
)),
('parent_comment', models.ForeignKey( # 外键关联到父级评论(用于回复)
blank=True, # 允许为空(非回复评论)
null=True, # 数据库允许 NULL
on_delete=django.db.models.deletion.CASCADE, # 级联删除(父评论删了,回复也删)
to='comments.comment', # 关联自身(评论回复评论)
verbose_name='上级评论' # 后台显示名称
)),
],
options={
'verbose_name': '评论',
'verbose_name_plural': '评论',
'ordering': ['-id'],
'get_latest_by': 'id',
'verbose_name': '评论', # 单数后台显示名称
'verbose_name_plural': '评论', # 复数后台显示名称
'ordering': ['-id'], # 默认按 ID 降序(最新评论在前)
'get_latest_by': 'id', # 获取最新评论的依据是 ID
},
),
]

@ -1,18 +1,20 @@
# Generated by Django 4.1.7 on 2023-04-24 13:48
# 由 Django 4.1.7 生成时间2023-04-24 13:48
from django.db import migrations, models
from django.db import migrations, models # 导入迁移和模型功能
class Migration(migrations.Migration):
dependencies = [
('comments', '0001_initial'),
('comments', '0001_initial'), # 依赖前一个迁移0001_initial
]
operations = [
migrations.AlterField(
model_name='comment',
name='is_enable',
field=models.BooleanField(default=False, verbose_name='是否显示'),
model_name='comment', # 修改 Comment 模型
name='is_enable', # 修改 is_enable 字段
field=models.BooleanField( # 仍然是 BooleanField
default=False, # 默认值从 True 改为 False默认不显示评论
verbose_name='是否显示' # 后台显示名称不变
),
),
]

@ -1,60 +1,87 @@
# Generated by Django 4.2.5 on 2023-09-06 13:13
# 由 Django 4.2.5 生成时间2023-09-06 13:13
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import django.utils.timezone
from django.conf import settings # 导入 settings用户模型
from django.db import migrations, models # 导入迁移和模型功能
import django.db.models.deletion # 导入外键删除策略
import django.utils.timezone # 导入时区工具
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('blog', '0005_alter_article_options_alter_category_options_and_more'),
('comments', '0002_alter_comment_is_enable'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL), # 依赖用户模型
('blog', '0005_alter_article_options_alter_category_options_and_more'), # 依赖 blog 的某个迁移
('comments', '0002_alter_comment_is_enable'), # 依赖前一个迁移0002_alter_comment_is_enable
]
operations = [
migrations.AlterModelOptions(
name='comment',
options={'get_latest_by': 'id', 'ordering': ['-id'], 'verbose_name': 'comment', 'verbose_name_plural': 'comment'},
name='comment', # 修改 Comment 模型
options={
'get_latest_by': 'id', # 获取最新评论的依据仍然是 ID
'ordering': ['-id'], # 默认按 ID 降序(最新评论在前)
'verbose_name': 'comment', # 单数后台显示名称改为英文
'verbose_name_plural': 'comment', # 复数后台显示名称改为英文
},
),
migrations.RemoveField(
model_name='comment',
name='created_time',
name='created_time', # 移除旧字段:创建时间
),
migrations.RemoveField(
model_name='comment',
name='last_mod_time',
name='last_mod_time', # 移除旧字段:最后修改时间
),
migrations.AddField(
model_name='comment',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
name='creation_time', # 新增字段:创建时间(更清晰的命名)
field=models.DateTimeField(
default=django.utils.timezone.now, # 默认当前时间
verbose_name='creation time' # 后台显示名称改为英文
),
),
migrations.AddField(
model_name='comment',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
name='last_modify_time', # 新增字段:最后修改时间(更清晰的命名)
field=models.DateTimeField(
default=django.utils.timezone.now, # 默认当前时间
verbose_name='last modify time' # 后台显示名称改为英文
),
),
migrations.AlterField(
model_name='comment',
name='article',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='blog.article', verbose_name='article'),
name='article', # 调整 article 外键
field=models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, # 级联删除
to='blog.article', # 关联文章
verbose_name='article' # 后台显示名称改为英文
),
),
migrations.AlterField(
model_name='comment',
name='author',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
name='author', # 调整 author 外键
field=models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE, # 级联删除
to=settings.AUTH_USER_MODEL, # 关联用户
verbose_name='author' # 后台显示名称改为英文
),
),
migrations.AlterField(
model_name='comment',
name='is_enable',
field=models.BooleanField(default=False, verbose_name='enable'),
name='is_enable', # 再次调整 is_enable 默认值(确保是 False
field=models.BooleanField(
default=False, # 默认不显示评论
verbose_name='enable' # 后台显示名称改为英文
),
),
migrations.AlterField(
model_name='comment',
name='parent_comment',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='comments.comment', verbose_name='parent comment'),
name='parent_comment', # 调整 parent_comment 外键
field=models.ForeignKey(
blank=True, # 允许为空(非回复评论)
null=True, # 数据库允许 NULL
on_delete=django.db.models.deletion.CASCADE, # 级联删除
to='comments.comment', # 关联自身(评论回复评论)
verbose_name='parent comment' # 后台显示名称改为英文
),
),
]

@ -1,30 +1,77 @@
from django import template
from django import template # 导入 Django 模板系统核心模块
register = template.Library()
register = template.Library() # 创建模板标签注册器实例
# =============================================================================
# 1. 递归获取评论子评论simple_tag
# =============================================================================
@register.simple_tag
def parse_commenttree(commentlist, comment):
"""获得当前评论子评论的列表
用法: {% parse_commenttree article_comments comment as childcomments %}
"""
datas = []
功能递归查找并返回某个评论的所有子评论支持无限层级嵌套
适用场景在模板中获取某条评论下的所有回复如评论区的楼层回复
def parse(c):
childs = commentlist.filter(parent_comment=c, is_enable=True)
for child in childs:
datas.append(child)
parse(child)
参数说明
- commentlist: 评论查询集通常是 Article.comments.all() 或类似 QuerySet
- comment: 当前评论对象要查找其子评论的父评论
parse(comment)
return datas
返回值包含所有子评论的列表按递归顺序排列
模板用法示例
{% parse_commenttree article_comments comment as child_comments %}
{% for child in child_comments %}
{{ child.body }} {# 显示子评论内容 #}
{% endfor %}
"""
child_comments = [] # 初始化存储子评论的空列表
def recursive_parse(current_comment):
"""
内部递归函数深度优先遍历查找子评论
逻辑查找当前评论的所有直接子评论并对每个子评论继续递归查找
"""
# 查询条件parent_comment=当前评论 且 is_enable=True只显示启用状态的评论
direct_children = commentlist.filter(
parent_comment=current_comment,
is_enable=True
)
for child in direct_children:
child_comments.append(child) # 将子评论加入结果列表
recursive_parse(child) # 递归查找该子评论的子评论(深度优先)
recursive_parse(comment) # 从传入的评论开始递归查找
return child_comments # 返回完整的子评论列表
# =============================================================================
# 2. 渲染单个评论项inclusion_tag
# =============================================================================
@register.inclusion_tag('comments/tags/comment_item.html')
def show_comment_item(comment, ischild):
"""评论"""
depth = 1 if ischild else 2
def show_comment_item(comment, is_child_comment):
"""
功能渲染单个评论项并控制其显示层级用于区分顶级评论和回复评论
适用场景在评论列表中差异化显示不同层级的评论如缩进回复评论
参数说明
- comment: 要渲染的评论对象
- is_child_comment: 布尔值True表示这是回复评论子评论False表示顶级评论
返回值包含评论对象和层级信息的字典
模板文件comments/tags/comment_item.html需自行创建
模板用法示例
{# 渲染顶级评论(主评论)#}
{% show_comment_item main_comment False %}
{# 渲染回复评论(子评论)#}
{% show_comment_item reply_comment True %}
"""
# 设置显示层级:子评论=1缩进更多顶级评论=2正常显示
display_level = 1 if is_child_comment else 2
return {
'comment_item': comment,
'depth': depth
'comment_item': comment, # 传递评论对象给模板
'depth': display_level # 传递层级信息(控制样式)
}
Loading…
Cancel
Save