gs注释 #7

Merged
pqnvcz97o merged 3 commits from gs_branch into develop 5 months ago

@ -146,7 +146,7 @@ python manage.py runserver
## 🙏 鸣谢
特别感谢 **JetBrains** 为本项目提供的免费开源许可证。gs ls syj zyd164
特别感谢 **JetBrains** 为本项目提供的免费开源许可证。gs130 ls syj zyd164
<p align="center">
<a href="https://www.jetbrains.com/?from=DjangoBlog">

@ -9,14 +9,21 @@ from .models import BlogUser
class BlogUserCreationForm(forms.ModelForm):
"""
用于在管理员界面创建BlogUser的表单
"""
# 密码输入字段
password1 = forms.CharField(label=_('password'), widget=forms.PasswordInput)
password2 = forms.CharField(label=_('Enter password again'), widget=forms.PasswordInput)
class Meta:
model = BlogUser
fields = ('email',)
fields = ('email',) # 表单包含的字段
def clean_password2(self):
"""
验证两个密码字段是否匹配
"""
# Check that the two password entries match
password1 = self.cleaned_data.get("password1")
password2 = self.cleaned_data.get("password2")
@ -25,28 +32,39 @@ class BlogUserCreationForm(forms.ModelForm):
return password2
def save(self, commit=True):
"""
以哈希格式保存密码
"""
# Save the provided password in hashed format
user = super().save(commit=False)
user.set_password(self.cleaned_data["password1"])
if commit:
user.source = 'adminsite'
user.source = 'adminsite' # 设置用户来源为管理站点
user.save()
return user
class BlogUserChangeForm(UserChangeForm):
"""
用于在管理员界面修改BlogUser信息的表单
"""
class Meta:
model = BlogUser
fields = '__all__'
field_classes = {'username': UsernameField}
fields = '__all__' # 包含所有字段
field_classes = {'username': UsernameField} # 指定username字段的类型
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
class BlogUserAdmin(UserAdmin):
form = BlogUserChangeForm
add_form = BlogUserCreationForm
"""
BlogUser模型在管理员界面的配置类
"""
form = BlogUserChangeForm # 修改用户时使用的表单
add_form = BlogUserCreationForm # 创建用户时使用的表单
# 在列表中显示的字段
list_display = (
'id',
'nickname',
@ -55,5 +73,10 @@ class BlogUserAdmin(UserAdmin):
'last_login',
'date_joined',
'source')
# 在列表中可以点击跳转到编辑页的字段
list_display_links = ('id', 'username')
# 默认排序方式
ordering = ('-id',)

@ -2,4 +2,8 @@ from django.apps import AppConfig
class AccountsConfig(AppConfig):
name = 'accounts'
"""
Django应用配置类用于配置accounts应用的基本信息
"""
name = 'accounts' # 定义应用的名称,与项目中的应用目录名一致

@ -9,28 +9,43 @@ from .models import BlogUser
class LoginForm(AuthenticationForm):
"""
用户登录表单继承自Django内置的AuthenticationForm
"""
def __init__(self, *args, **kwargs):
super(LoginForm, self).__init__(*args, **kwargs)
# 自定义用户名输入框的样式和属性
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
# 自定义密码输入框的样式和属性
self.fields['password'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
class RegisterForm(UserCreationForm):
"""
用户注册表单继承自Django内置的UserCreationForm
"""
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
# 自定义用户名输入框样式
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
# 自定义邮箱输入框样式
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})
# 自定义密码输入框样式
self.fields['password1'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
# 自定义确认密码输入框样式
self.fields['password2'].widget = widgets.PasswordInput(
attrs={'placeholder': "repeat password", "class": "form-control"})
def clean_email(self):
"""
验证邮箱是否已经被注册
"""
email = self.cleaned_data['email']
if get_user_model().objects.filter(email=email).exists():
raise ValidationError(_("email already exists"))
@ -38,10 +53,14 @@ class RegisterForm(UserCreationForm):
class Meta:
model = get_user_model()
fields = ("username", "email")
fields = ("username", "email") # 定义表单包含的字段
class ForgetPasswordForm(forms.Form):
"""
忘记密码表单用于用户重置密码
"""
# 新密码输入字段
new_password1 = forms.CharField(
label=_("New password"),
widget=forms.PasswordInput(
@ -52,6 +71,7 @@ class ForgetPasswordForm(forms.Form):
),
)
# 确认新密码字段
new_password2 = forms.CharField(
label="确认密码",
widget=forms.PasswordInput(
@ -62,6 +82,7 @@ class ForgetPasswordForm(forms.Form):
),
)
# 邮箱字段
email = forms.EmailField(
label='邮箱',
widget=forms.TextInput(
@ -72,6 +93,7 @@ class ForgetPasswordForm(forms.Form):
),
)
# 验证码字段
code = forms.CharField(
label=_('Code'),
widget=forms.TextInput(
@ -83,6 +105,9 @@ class ForgetPasswordForm(forms.Form):
)
def clean_new_password2(self):
"""
验证两次输入的密码是否一致并验证密码强度
"""
password1 = self.data.get("new_password1")
password2 = self.data.get("new_password2")
if password1 and password2 and password1 != password2:
@ -92,6 +117,9 @@ class ForgetPasswordForm(forms.Form):
return password2
def clean_email(self):
"""
验证邮箱是否存在
"""
user_email = self.cleaned_data.get("email")
if not BlogUser.objects.filter(
email=user_email
@ -101,6 +129,9 @@ class ForgetPasswordForm(forms.Form):
return user_email
def clean_code(self):
"""
验证验证码是否正确
"""
code = self.cleaned_data.get("code")
error = utils.verify(
email=self.cleaned_data.get("email"),
@ -112,6 +143,10 @@ class ForgetPasswordForm(forms.Form):
class ForgetPasswordCodeForm(forms.Form):
"""
忘记密码时获取验证码的表单
"""
email = forms.EmailField(
label=_('Email'),
)

@ -7,43 +7,66 @@ import django.utils.timezone
class Migration(migrations.Migration):
# 标记这是一个初始迁移文件
initial = True
# 定义依赖关系依赖于auth应用的0012_alter_user_first_name_max_length迁移
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
# 定义具体的操作
operations = [
# 创建BlogUser模型
migrations.CreateModel(
name='BlogUser',
fields=[
# 主键字段自动创建的BigAutoField
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 密码字段最大长度128字符
('password', models.CharField(max_length=128, verbose_name='password')),
# 最后登录时间,可为空
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
# 超级用户状态,用于标识是否具有所有权限
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
# 用户名字段,唯一且有验证器
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
# 名字字段,可为空
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
# 姓氏字段,可为空
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
# 邮箱地址字段,可为空
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
# 员工状态,标识用户是否可以登录管理站点
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
# 活跃状态,标识用户是否应该被视为活跃用户
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
# 加入日期,默认为当前时间
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
# 昵称字段,可为空
('nickname', models.CharField(blank=True, max_length=100, verbose_name='昵称')),
# 创建时间,默认为当前时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间,默认为当前时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
# 创建来源字段,可为空
('source', models.CharField(blank=True, max_length=100, verbose_name='创建来源')),
# 用户组关联字段,多对多关系
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
# 用户权限字段,多对多关系
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
],
# 模型选项配置
options={
'verbose_name': '用户',
'verbose_name_plural': '用户',
'ordering': ['-id'],
'get_latest_by': 'id',
'verbose_name': '用户', # 单数名称
'verbose_name_plural': '用户', # 复数名称
'ordering': ['-id'], # 默认排序方式
'get_latest_by': 'id', # 获取最新记录的字段
},
# 模型管理器
managers=[
('objects', django.contrib.auth.models.UserManager()),
],
),
]

@ -5,42 +5,56 @@ import django.utils.timezone
class Migration(migrations.Migration):
# 定义该迁移文件的依赖关系依赖于accounts应用的0001_initial迁移
dependencies = [
('accounts', '0001_initial'),
]
# 定义具体的迁移操作
operations = [
# 修改BlogUser模型的选项配置
migrations.AlterModelOptions(
name='bloguser',
options={'get_latest_by': 'id', 'ordering': ['-id'], 'verbose_name': 'user', 'verbose_name_plural': 'user'},
options={
'get_latest_by': 'id', # 获取最新记录的字段
'ordering': ['-id'], # 默认排序方式
'verbose_name': 'user', # 单数名称(英文)
'verbose_name_plural': 'user' # 复数名称(英文)
},
),
# 移除BlogUser模型中的created_time字段
migrations.RemoveField(
model_name='bloguser',
name='created_time',
),
# 移除BlogUser模型中的last_mod_time字段
migrations.RemoveField(
model_name='bloguser',
name='last_mod_time',
),
# 添加creation_time字段到BlogUser模型
migrations.AddField(
model_name='bloguser',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 添加last_modify_time字段到BlogUser模型
migrations.AddField(
model_name='bloguser',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
),
# 修改BlogUser模型中的nickname字段
migrations.AlterField(
model_name='bloguser',
name='nickname',
field=models.CharField(blank=True, max_length=100, verbose_name='nick name'),
),
# 修改BlogUser模型中的source字段
migrations.AlterField(
model_name='bloguser',
name='source',
field=models.CharField(blank=True, max_length=100, verbose_name='create source'),
),
]

@ -9,27 +9,45 @@ from djangoblog.utils import get_current_site
# Create your models here.
class BlogUser(AbstractUser):
"""
博客用户模型继承自Django的AbstractUser
扩展了用户的基本信息
"""
# 用户昵称,可为空
nickname = models.CharField(_('nick name'), max_length=100, blank=True)
# 用户创建时间,默认为当前时间
creation_time = models.DateTimeField(_('creation time'), default=now)
# 用户最后修改时间,默认为当前时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 用户创建来源,可为空
source = models.CharField(_('create source'), max_length=100, blank=True)
def get_absolute_url(self):
"""
获取用户详情页的绝对URL
"""
return reverse(
'blog:author_detail', kwargs={
'author_name': self.username})
def __str__(self):
"""
定义对象的字符串表示返回用户的邮箱
"""
return self.email
def get_full_url(self):
"""
获取用户的完整URL地址
"""
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
class Meta:
ordering = ['-id']
verbose_name = _('user')
verbose_name_plural = verbose_name
get_latest_by = 'id'
ordering = ['-id'] # 默认按ID降序排列
verbose_name = _('user') # 单数名称
verbose_name_plural = verbose_name # 复数名称
get_latest_by = 'id' # 获取最新记录的字段

@ -12,17 +12,26 @@ from . import utils
# Create your tests here.
class AccountTest(TestCase):
"""
账户相关功能的测试类
"""
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
"""
测试前的准备工作创建测试客户端请求工厂和测试用户
"""
self.client = Client() # 创建测试客户端
self.factory = RequestFactory() # 创建请求工厂
self.blog_user = BlogUser.objects.create_user(
username="test",
email="admin@admin.com",
password="12345678"
)
self.new_test = "xxx123--="
self.new_test = "xxx123--=" # 测试用的新密码
def test_validate_account(self):
"""
测试账户验证功能
"""
site = get_current_site().domain
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
@ -30,6 +39,7 @@ class AccountTest(TestCase):
password="qwer!@#$ggg")
testuser = BlogUser.objects.get(username='liangliangyy1')
# 测试登录
loginresult = self.client.login(
username='liangliangyy1',
password='qwer!@#$ggg')
@ -37,12 +47,14 @@ class AccountTest(TestCase):
response = self.client.get('/admin/')
self.assertEqual(response.status_code, 200)
# 创建测试分类
category = Category()
category.name = "categoryaaa"
category.creation_time = timezone.now()
category.last_modify_time = timezone.now()
category.save()
# 创建测试文章
article = Article()
article.title = "nicetitleaaa"
article.body = "nicecontentaaa"
@ -52,14 +64,19 @@ class AccountTest(TestCase):
article.status = 'p'
article.save()
# 测试访问文章管理URL
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
def test_validate_register(self):
"""
测试用户注册功能
"""
self.assertEquals(
0, len(
BlogUser.objects.filter(
email='user123@user.com')))
# 测试用户注册
response = self.client.post(reverse('account:register'), {
'username': 'user1233',
'email': 'user123@user.com',
@ -78,6 +95,7 @@ class AccountTest(TestCase):
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
# 测试登录后操作
self.client.login(username='user1233', password='password123!q@wE#R$T')
user = BlogUser.objects.filter(email='user123@user.com')[0]
user.is_superuser = True
@ -103,12 +121,14 @@ class AccountTest(TestCase):
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
# 测试登出
response = self.client.get(reverse('account:logout'))
self.assertIn(response.status_code, [301, 302, 200])
response = self.client.get(article.get_admin_url())
self.assertIn(response.status_code, [301, 302, 200])
# 测试错误密码登录
response = self.client.post(reverse('account:login'), {
'username': 'user1233',
'password': 'password123'
@ -119,18 +139,26 @@ class AccountTest(TestCase):
self.assertIn(response.status_code, [301, 302, 200])
def test_verify_email_code(self):
"""
测试邮箱验证码验证功能
"""
to_email = "admin@admin.com"
code = generate_code()
utils.set_code(to_email, code)
utils.send_verify_email(to_email, code)
# 测试正确验证码
err = utils.verify("admin@admin.com", code)
self.assertEqual(err, None)
# 测试错误邮箱
err = utils.verify("admin@123.com", code)
self.assertEqual(type(err), str)
def test_forget_password_email_code_success(self):
"""
测试忘记密码时成功获取验证码
"""
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict(email="admin@admin.com")
@ -140,12 +168,17 @@ class AccountTest(TestCase):
self.assertEqual(resp.content.decode("utf-8"), "ok")
def test_forget_password_email_code_fail(self):
"""
测试忘记密码时获取验证码失败的情况
"""
# 测试空邮箱
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict()
)
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
# 测试无效邮箱格式
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict(email="admin@com")
@ -153,6 +186,9 @@ class AccountTest(TestCase):
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
def test_forget_password_email_success(self):
"""
测试通过邮箱成功重置密码
"""
code = generate_code()
utils.set_code(self.blog_user.email, code)
data = dict(
@ -175,6 +211,9 @@ class AccountTest(TestCase):
self.assertEqual(blog_user.check_password(data["new_password1"]), True)
def test_forget_password_email_not_user(self):
"""
测试为不存在的用户重置密码
"""
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
@ -188,8 +227,10 @@ class AccountTest(TestCase):
self.assertEqual(resp.status_code, 200)
def test_forget_password_email_code_error(self):
"""
测试使用错误验证码重置密码
"""
code = generate_code()
utils.set_code(self.blog_user.email, code)
data = dict(

@ -4,25 +4,38 @@ from django.urls import re_path
from . import views
from .forms import LoginForm
app_name = "accounts"
app_name = "accounts" # 定义应用的命名空间
urlpatterns = [re_path(r'^login/$',
urlpatterns = [
# 登录URL使用自定义的LoginForm表单
re_path(r'^login/$',
views.LoginView.as_view(success_url='/'),
name='login',
kwargs={'authentication_form': LoginForm}),
re_path(r'^register/$',
# 注册URL
re_path(r'^register/$',
views.RegisterView.as_view(success_url="/"),
name='register'),
re_path(r'^logout/$',
# 登出URL
re_path(r'^logout/$',
views.LogoutView.as_view(),
name='logout'),
path(r'account/result.html',
# 账户操作结果页面URL
path(r'account/result.html',
views.account_result,
name='result'),
re_path(r'^forget_password/$',
# 忘记密码页面URL
re_path(r'^forget_password/$',
views.ForgetPasswordView.as_view(),
name='forget_password'),
re_path(r'^forget_password_code/$',
# 忘记密码验证码发送URL
re_path(r'^forget_password_code/$',
views.ForgetPasswordEmailCode.as_view(),
name='forget_password_code'),
]
]

@ -4,23 +4,52 @@ from django.contrib.auth.backends import ModelBackend
class EmailOrUsernameModelBackend(ModelBackend):
"""
允许使用用户名或邮箱登录
自定义认证后端允许用户使用用户名或邮箱进行登录
"""
def authenticate(self, request, username=None, password=None, **kwargs):
"""
重写authenticate方法支持邮箱或用户名登录
Args:
request: HTTP请求对象
username: 用户名或邮箱
password: 密码
**kwargs: 其他参数
Returns:
用户对象或None
"""
# 判断输入的是邮箱还是用户名
if '@' in username:
kwargs = {'email': username}
kwargs = {'email': username} # 使用邮箱查询
else:
kwargs = {'username': username}
kwargs = {'username': username} # 使用用户名查询
try:
# 获取用户对象
user = get_user_model().objects.get(**kwargs)
# 验证密码是否正确
if user.check_password(password):
return user
except get_user_model().DoesNotExist:
# 用户不存在时返回None
return None
def get_user(self, username):
"""
根据用户ID获取用户对象
Args:
username: 用户ID
Returns:
用户对象或None
"""
try:
# 根据主键获取用户
return get_user_model().objects.get(pk=username)
except get_user_model().DoesNotExist:
# 用户不存在时返回None
return None

@ -7,7 +7,7 @@ from django.utils.translation import gettext_lazy as _
from djangoblog.utils import send_email
_code_ttl = timedelta(minutes=5)
_code_ttl = timedelta(minutes=5) # 验证码有效期为5分钟
def send_verify_email(to_mail: str, code: str, subject: str = _("Verify Email")):
@ -17,6 +17,7 @@ def send_verify_email(to_mail: str, code: str, subject: str = _("Verify Email"))
subject: 邮件主题
code: 验证码
"""
# 构造邮件内容,包含验证码和有效期提示
html_content = _(
"You are resetting the password, the verification code is%(code)s, valid within 5 minutes, please keep it "
"properly") % {'code': code}
@ -30,20 +31,32 @@ def verify(email: str, code: str) -> typing.Optional[str]:
code: 验证码
Return:
如果有错误就返回错误str
Node:
Note:
这里的错误处理不太合理应该采用raise抛出
否测调用方也需要对error进行处理
"""
cache_code = get_code(email)
cache_code = get_code(email) # 从缓存中获取存储的验证码
if cache_code != code:
return gettext("Verification code error")
return gettext("Verification code error") # 验证码不匹配时返回错误信息
def set_code(email: str, code: str):
"""设置code"""
cache.set(email, code, _code_ttl.seconds)
"""设置code
将验证码存储到缓存中设置过期时间
Args:
email: 邮箱地址作为缓存的键
code: 验证码作为缓存的值
"""
cache.set(email, code, _code_ttl.seconds) # 使用邮箱作为键存储验证码
def get_code(email: str) -> typing.Optional[str]:
"""获取code"""
return cache.get(email)
"""获取code
从缓存中获取指定邮箱的验证码
Args:
email: 邮箱地址
Returns:
验证码字符串或None如果不存在或已过期
"""
return cache.get(email) # 从缓存中获取验证码

@ -26,34 +26,41 @@ from . import utils
from .forms import RegisterForm, LoginForm, ForgetPasswordForm, ForgetPasswordCodeForm
from .models import BlogUser
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # 创建日志记录器
# Create your views here.
class RegisterView(FormView):
form_class = RegisterForm
template_name = 'account/registration_form.html'
"""
用户注册视图类
"""
form_class = RegisterForm # 使用的表单类
template_name = 'account/registration_form.html' # 模板文件
@method_decorator(csrf_protect)
@method_decorator(csrf_protect) # 添加CSRF保护装饰器
def dispatch(self, *args, **kwargs):
return super(RegisterView, self).dispatch(*args, **kwargs)
def form_valid(self, form):
"""
表单验证成功时的处理方法
"""
if form.is_valid():
user = form.save(False)
user.is_active = False
user.source = 'Register'
user.save(True)
site = get_current_site().domain
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
user = form.save(False) # 保存表单但不提交到数据库
user.is_active = False # 设置用户为非活跃状态,需要邮箱验证
user.source = 'Register' # 设置用户来源
user.save(True) # 提交到数据库
site = get_current_site().domain # 获取当前站点域名
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) # 生成验证签名
if settings.DEBUG:
site = '127.0.0.1:8000'
path = reverse('account:result')
site = '127.0.0.1:8000' # 调试模式下使用本地地址
path = reverse('account:result') # 获取验证结果页面URL
url = "http://{site}{path}?type=validation&id={id}&sign={sign}".format(
site=site, path=path, id=user.id, sign=sign)
# 构造验证邮件内容
content = """
<p>请点击下面链接验证您的邮箱</p>
@ -64,6 +71,7 @@ class RegisterView(FormView):
如果上面链接无法打开请将此链接复制至浏览器
{url}
""".format(url=url)
# 发送验证邮件
send_email(
emailto=[
user.email,
@ -73,7 +81,7 @@ class RegisterView(FormView):
url = reverse('accounts:result') + \
'?type=register&id=' + str(user.id)
return HttpResponseRedirect(url)
return HttpResponseRedirect(url) # 重定向到注册结果页面
else:
return self.render_to_response({
'form': form
@ -81,33 +89,44 @@ class RegisterView(FormView):
class LogoutView(RedirectView):
url = '/login/'
"""
用户登出视图类
"""
url = '/login/' # 登出后重定向的URL
@method_decorator(never_cache)
@method_decorator(never_cache) # 添加不缓存装饰器
def dispatch(self, request, *args, **kwargs):
return super(LogoutView, self).dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
logout(request)
delete_sidebar_cache()
return super(LogoutView, self).get(request, *args, **kwargs)
"""
处理GET请求执行登出操作
"""
logout(request) # 执行登出
delete_sidebar_cache() # 删除侧边栏缓存
return super(LogoutView, self).get(request, *args, **kwargs) # 重定向到登录页
class LoginView(FormView):
form_class = LoginForm
template_name = 'account/login.html'
success_url = '/'
redirect_field_name = REDIRECT_FIELD_NAME
login_ttl = 2626560 # 一个月的时间
@method_decorator(sensitive_post_parameters('password'))
@method_decorator(csrf_protect)
@method_decorator(never_cache)
"""
用户登录视图类
"""
form_class = LoginForm # 使用的表单类
template_name = 'account/login.html' # 模板文件
success_url = '/' # 登录成功后重定向的URL
redirect_field_name = REDIRECT_FIELD_NAME # 重定向字段名
login_ttl = 2626560 # 登录会话保持时间(一个月)
@method_decorator(sensitive_post_parameters('password')) # 敏感参数保护
@method_decorator(csrf_protect) # CSRF保护
@method_decorator(never_cache) # 不缓存
def dispatch(self, request, *args, **kwargs):
return super(LoginView, self).dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
"""
获取上下文数据添加重定向URL
"""
redirect_to = self.request.GET.get(self.redirect_field_name)
if redirect_to is None:
redirect_to = '/'
@ -116,25 +135,30 @@ class LoginView(FormView):
return super(LoginView, self).get_context_data(**kwargs)
def form_valid(self, form):
"""
表单验证成功时的处理方法
"""
form = AuthenticationForm(data=self.request.POST, request=self.request)
if form.is_valid():
delete_sidebar_cache()
delete_sidebar_cache() # 删除侧边栏缓存
logger.info(self.redirect_field_name)
auth.login(self.request, form.get_user())
if self.request.POST.get("remember"):
self.request.session.set_expiry(self.login_ttl)
auth.login(self.request, form.get_user()) # 执行登录
if self.request.POST.get("remember"): # 如果选择了记住登录
self.request.session.set_expiry(self.login_ttl) # 设置会话过期时间
return super(LoginView, self).form_valid(form)
# return HttpResponseRedirect('/')
else:
return self.render_to_response({
'form': form
})
def get_success_url(self):
"""
获取登录成功后的重定向URL
"""
redirect_to = self.request.POST.get(self.redirect_field_name)
# 验证重定向URL是否安全
if not url_has_allowed_host_and_scheme(
url=redirect_to, allowed_hosts=[
self.request.get_host()]):
@ -143,62 +167,82 @@ class LoginView(FormView):
def account_result(request):
type = request.GET.get('type')
id = request.GET.get('id')
"""
账户操作结果页面
"""
type = request.GET.get('type') # 获取操作类型
id = request.GET.get('id') # 获取用户ID
user = get_object_or_404(get_user_model(), id=id)
user = get_object_or_404(get_user_model(), id=id) # 获取用户对象
logger.info(type)
if user.is_active:
return HttpResponseRedirect('/')
return HttpResponseRedirect('/') # 如果用户已激活,重定向到首页
if type and type in ['register', 'validation']:
if type == 'register':
# 注册成功提示
content = '''
恭喜您注册成功一封验证邮件已经发送到您的邮箱请验证您的邮箱后登录本站
'''
title = '注册成功'
else:
# 邮箱验证处理
c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
sign = request.GET.get('sign')
if sign != c_sign:
return HttpResponseForbidden()
user.is_active = True
return HttpResponseForbidden() # 签名验证失败
user.is_active = True # 激活用户
user.save()
content = '''
恭喜您已经成功的完成邮箱验证您现在可以使用您的账号来登录本站
'''
title = '验证成功'
# 渲染结果页面
return render(request, 'account/result.html', {
'title': title,
'content': content
})
else:
return HttpResponseRedirect('/')
return HttpResponseRedirect('/') # 重定向到首页
class ForgetPasswordView(FormView):
form_class = ForgetPasswordForm
template_name = 'account/forget_password.html'
"""
忘记密码视图类
"""
form_class = ForgetPasswordForm # 使用的表单类
template_name = 'account/forget_password.html' # 模板文件
def form_valid(self, form):
"""
表单验证成功时的处理方法
"""
if form.is_valid():
# 根据邮箱查找用户并更新密码
blog_user = BlogUser.objects.filter(email=form.cleaned_data.get("email")).get()
blog_user.password = make_password(form.cleaned_data["new_password2"])
blog_user.password = make_password(form.cleaned_data["new_password2"]) # 加密新密码
blog_user.save()
return HttpResponseRedirect('/login/')
return HttpResponseRedirect('/login/') # 重定向到登录页面
else:
return self.render_to_response({'form': form})
class ForgetPasswordEmailCode(View):
"""
忘记密码时发送验证码视图类
"""
def post(self, request: HttpRequest):
form = ForgetPasswordCodeForm(request.POST)
"""
处理POST请求发送验证码邮件
"""
form = ForgetPasswordCodeForm(request.POST) # 验证表单
if not form.is_valid():
return HttpResponse("错误的邮箱")
to_email = form.cleaned_data["email"]
return HttpResponse("错误的邮箱") # 表单验证失败返回错误信息
to_email = form.cleaned_data["email"] # 获取邮箱
code = generate_code() # 生成验证码
utils.send_verify_email(to_email, code) # 发送验证码邮件
utils.set_code(to_email, code) # 将验证码存储到缓存
code = generate_code()
utils.send_verify_email(to_email, code)
utils.set_code(to_email, code)
return HttpResponse("ok") # 返回成功信息
return HttpResponse("ok")

@ -10,29 +10,45 @@ from .models import Article
class ArticleForm(forms.ModelForm):
"""
文章表单类用于在管理界面编辑文章
"""
# body = forms.CharField(widget=AdminPagedownWidget())
class Meta:
model = Article
fields = '__all__'
fields = '__all__' # 包含所有字段
def makr_article_publish(modeladmin, request, queryset):
queryset.update(status='p')
"""
批量发布文章操作
"""
queryset.update(status='p') # 将选中的文章状态设置为已发布
def draft_article(modeladmin, request, queryset):
queryset.update(status='d')
"""
批量将文章设为草稿操作
"""
queryset.update(status='d') # 将选中的文章状态设置为草稿
def close_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='c')
"""
批量关闭文章评论功能
"""
queryset.update(comment_status='c') # 将选中的文章评论状态设置为关闭
def open_article_commentstatus(modeladmin, request, queryset):
queryset.update(comment_status='o')
"""
批量开启文章评论功能
"""
queryset.update(comment_status='o') # 将选中的文章评论状态设置为开启
# 为批量操作设置显示名称
makr_article_publish.short_description = _('Publish selected articles')
draft_article.short_description = _('Draft selected articles')
close_article_commentstatus.short_description = _('Close article comments')
@ -40,10 +56,13 @@ open_article_commentstatus.short_description = _('Open article comments')
class ArticlelAdmin(admin.ModelAdmin):
list_per_page = 20
search_fields = ('body', 'title')
form = ArticleForm
list_display = (
"""
文章模型在管理界面的配置类
"""
list_per_page = 20 # 每页显示20条记录
search_fields = ('body', 'title') # 设置搜索字段
form = ArticleForm # 使用自定义表单
list_display = ( # 列表页显示的字段
'id',
'title',
'author',
@ -53,34 +72,46 @@ class ArticlelAdmin(admin.ModelAdmin):
'status',
'type',
'article_order')
list_display_links = ('id', 'title')
list_filter = ('status', 'type', 'category')
filter_horizontal = ('tags',)
exclude = ('creation_time', 'last_modify_time')
view_on_site = True
actions = [
list_display_links = ('id', 'title') # 列表页中可点击进入编辑页的字段
list_filter = ('status', 'type', 'category') # 设置过滤器
filter_horizontal = ('tags',) # 标签字段使用水平过滤器
exclude = ('creation_time', 'last_modify_time') # 在表单中排除这些字段
view_on_site = True # 显示"在站点上查看"链接
actions = [ # 注册批量操作
makr_article_publish,
draft_article,
close_article_commentstatus,
open_article_commentstatus]
def link_to_category(self, obj):
"""
在列表页显示分类的链接
"""
info = (obj.category._meta.app_label, obj.category._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.category.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.category.name))
link_to_category.short_description = _('category')
link_to_category.short_description = _('category') # 设置列名
def get_form(self, request, obj=None, **kwargs):
"""
获取表单限制作者字段只能选择超级用户
"""
form = super(ArticlelAdmin, self).get_form(request, obj, **kwargs)
form.base_fields['author'].queryset = get_user_model(
).objects.filter(is_superuser=True)
return form
def save_model(self, request, obj, form, change):
"""
保存模型实例
"""
super(ArticlelAdmin, self).save_model(request, obj, form, change)
def get_view_on_site_url(self, obj=None):
"""
获取在站点上查看的URL
"""
if obj:
url = obj.get_full_url()
return url
@ -91,22 +122,38 @@ class ArticlelAdmin(admin.ModelAdmin):
class TagAdmin(admin.ModelAdmin):
exclude = ('slug', 'last_mod_time', 'creation_time')
"""
标签模型在管理界面的配置类
"""
exclude = ('slug', 'last_mod_time', 'creation_time') # 排除字段
class CategoryAdmin(admin.ModelAdmin):
list_display = ('name', 'parent_category', 'index')
exclude = ('slug', 'last_mod_time', 'creation_time')
"""
分类模型在管理界面的配置类
"""
list_display = ('name', 'parent_category', 'index') # 列表页显示字段
exclude = ('slug', 'last_mod_time', 'creation_time') # 排除字段
class LinksAdmin(admin.ModelAdmin):
exclude = ('last_mod_time', 'creation_time')
"""
友情链接模型在管理界面的配置类
"""
exclude = ('last_mod_time', 'creation_time') # 排除字段
class SideBarAdmin(admin.ModelAdmin):
list_display = ('name', 'content', 'is_enable', 'sequence')
exclude = ('last_mod_time', 'creation_time')
"""
侧边栏模型在管理界面的配置类
"""
list_display = ('name', 'content', 'is_enable', 'sequence') # 列表页显示字段
exclude = ('last_mod_time', 'creation_time') # 排除字段
class BlogSettingsAdmin(admin.ModelAdmin):
"""
博客设置模型在管理界面的配置类
"""
pass

@ -2,4 +2,7 @@ from django.apps import AppConfig
class BlogConfig(AppConfig):
name = 'blog'
"""
Django应用配置类用于配置blog应用的基本信息
"""
name = 'blog' # 定义应用的名称,与项目中的应用目录名一致

@ -5,39 +5,55 @@ from django.utils import timezone
from djangoblog.utils import cache, get_blog_setting
from .models import Category, Article
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # 创建日志记录器
def seo_processor(requests):
key = 'seo_processor'
value = cache.get(key)
"""
SEO上下文处理器为模板提供SEO相关数据
Args:
requests: HTTP请求对象
Returns:
dict: 包含SEO和网站配置信息的字典
"""
key = 'seo_processor' # 缓存键名
value = cache.get(key) # 从缓存中获取数据
if value:
# 如果缓存中存在数据,直接返回
return value
else:
# 如果缓存中没有数据,记录日志并生成新数据
logger.info('set processor cache.')
setting = get_blog_setting()
setting = get_blog_setting() # 获取博客设置
# 构造返回值字典包含网站SEO和配置信息
value = {
'SITE_NAME': setting.site_name,
'SHOW_GOOGLE_ADSENSE': setting.show_google_adsense,
'GOOGLE_ADSENSE_CODES': setting.google_adsense_codes,
'SITE_SEO_DESCRIPTION': setting.site_seo_description,
'SITE_DESCRIPTION': setting.site_description,
'SITE_KEYWORDS': setting.site_keywords,
'SITE_BASE_URL': requests.scheme + '://' + requests.get_host() + '/',
'ARTICLE_SUB_LENGTH': setting.article_sub_length,
'nav_category_list': Category.objects.all(),
'nav_pages': Article.objects.filter(
'SITE_NAME': setting.site_name, # 网站名称
'SHOW_GOOGLE_ADSENSE': setting.show_google_adsense, # 是否显示谷歌广告
'GOOGLE_ADSENSE_CODES': setting.google_adsense_codes, # 谷歌广告代码
'SITE_SEO_DESCRIPTION': setting.site_seo_description, # 网站SEO描述
'SITE_DESCRIPTION': setting.site_description, # 网站描述
'SITE_KEYWORDS': setting.site_keywords, # 网站关键词
'SITE_BASE_URL': requests.scheme + '://' + requests.get_host() + '/', # 网站基础URL
'ARTICLE_SUB_LENGTH': setting.article_sub_length, # 文章摘要长度
'nav_category_list': Category.objects.all(), # 导航分类列表
'nav_pages': Article.objects.filter( # 导航页面列表
type='p',
status='p'),
'OPEN_SITE_COMMENT': setting.open_site_comment,
'BEIAN_CODE': setting.beian_code,
'ANALYTICS_CODE': setting.analytics_code,
"BEIAN_CODE_GONGAN": setting.gongan_beiancode,
"SHOW_GONGAN_CODE": setting.show_gongan_code,
"CURRENT_YEAR": timezone.now().year,
"GLOBAL_HEADER": setting.global_header,
"GLOBAL_FOOTER": setting.global_footer,
"COMMENT_NEED_REVIEW": setting.comment_need_review,
'OPEN_SITE_COMMENT': setting.open_site_comment, # 是否开启网站评论
'BEIAN_CODE': setting.beian_code, # 备案号
'ANALYTICS_CODE': setting.analytics_code, # 网站统计代码
"BEIAN_CODE_GONGAN": setting.gongan_beiancode, # 公安备案号
"SHOW_GONGAN_CODE": setting.show_gongan_code, # 是否显示公安备案号
"CURRENT_YEAR": timezone.now().year, # 当前年份
"GLOBAL_HEADER": setting.global_header, # 公共头部内容
"GLOBAL_FOOTER": setting.global_footer, # 公共尾部内容
"COMMENT_NEED_REVIEW": setting.comment_need_review, # 评论是否需要审核
}
# 将数据缓存10小时
cache.set(key, value, 60 * 60 * 10)
return value

@ -7,9 +7,11 @@ from elasticsearch_dsl.connections import connections
from blog.models import Article
# 检查是否在Django设置中配置了Elasticsearch
ELASTICSEARCH_ENABLED = hasattr(settings, 'ELASTICSEARCH_DSL')
if ELASTICSEARCH_ENABLED:
# 创建Elasticsearch连接
connections.create_connection(
hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']])
from elasticsearch import Elasticsearch
@ -19,8 +21,10 @@ if ELASTICSEARCH_ENABLED:
c = IngestClient(es)
try:
# 尝试获取geoip管道
c.get_pipeline('geoip')
except elasticsearch.exceptions.NotFoundError:
# 如果geoip管道不存在则创建一个用于添加地理位置信息的管道
c.put_pipeline('geoip', body='''{
"description" : "Add geoip info",
"processors" : [
@ -34,57 +38,81 @@ if ELASTICSEARCH_ENABLED:
class GeoIp(InnerDoc):
continent_name = Keyword()
country_iso_code = Keyword()
country_name = Keyword()
location = GeoPoint()
"""
地理IP信息内部文档类
"""
continent_name = Keyword() # 大洲名称
country_iso_code = Keyword() # 国家ISO代码
country_name = Keyword() # 国家名称
location = GeoPoint() # 地理位置坐标
class UserAgentBrowser(InnerDoc):
Family = Keyword()
Version = Keyword()
"""
用户代理浏览器信息内部文档类
"""
Family = Keyword() # 浏览器家族
Version = Keyword() # 浏览器版本
class UserAgentOS(UserAgentBrowser):
"""
用户代理操作系统信息内部文档类
"""
pass
class UserAgentDevice(InnerDoc):
Family = Keyword()
Brand = Keyword()
Model = Keyword()
"""
用户代理设备信息内部文档类
"""
Family = Keyword() # 设备家族
Brand = Keyword() # 设备品牌
Model = Keyword() # 设备型号
class UserAgent(InnerDoc):
browser = Object(UserAgentBrowser, required=False)
os = Object(UserAgentOS, required=False)
device = Object(UserAgentDevice, required=False)
string = Text()
is_bot = Boolean()
"""
用户代理完整信息内部文档类
"""
browser = Object(UserAgentBrowser, required=False) # 浏览器信息
os = Object(UserAgentOS, required=False) # 操作系统信息
device = Object(UserAgentDevice, required=False) # 设备信息
string = Text() # 完整的User-Agent字符串
is_bot = Boolean() # 是否为爬虫
class ElapsedTimeDocument(Document):
url = Keyword()
time_taken = Long()
log_datetime = Date()
ip = Keyword()
geoip = Object(GeoIp, required=False)
useragent = Object(UserAgent, required=False)
"""
页面响应时间文档类用于记录网站性能数据
"""
url = Keyword() # 请求URL
time_taken = Long() # 耗时(毫秒)
log_datetime = Date() # 记录时间
ip = Keyword() # 访问者IP
geoip = Object(GeoIp, required=False) # 地理位置信息
useragent = Object(UserAgent, required=False) # 用户代理信息
class Index:
name = 'performance'
name = 'performance' # 索引名称
settings = {
"number_of_shards": 1,
"number_of_replicas": 0
"number_of_shards": 1, # 分片数量
"number_of_replicas": 0 # 副本数量
}
class Meta:
doc_type = 'ElapsedTime'
doc_type = 'ElapsedTime' # 文档类型
class ElaspedTimeDocumentManager:
"""
页面响应时间文档管理器类
"""
@staticmethod
def build_index():
"""
构建性能监控索引
"""
from elasticsearch import Elasticsearch
client = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
res = client.indices.exists(index="performance")
@ -93,13 +121,20 @@ class ElaspedTimeDocumentManager:
@staticmethod
def delete_index():
"""
删除性能监控索引
"""
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='performance', ignore=[400, 404])
@staticmethod
def create(url, time_taken, log_datetime, useragent, ip):
"""
创建并保存页面响应时间记录
"""
ElaspedTimeDocumentManager.build_index()
# 构建用户代理信息对象
ua = UserAgent()
ua.browser = UserAgentBrowser()
ua.browser.Family = useragent.browser.family
@ -116,98 +151,125 @@ class ElaspedTimeDocumentManager:
ua.string = useragent.ua_string
ua.is_bot = useragent.is_bot
# 创建文档实例
doc = ElapsedTimeDocument(
meta={
'id': int(
round(
time.time() *
1000))
1000)) # 使用当前时间戳作为文档ID
},
url=url,
time_taken=time_taken,
log_datetime=log_datetime,
useragent=ua, ip=ip)
useragent=ua,
ip=ip)
# 保存文档并使用geoip管道处理IP地理位置信息
doc.save(pipeline="geoip")
class ArticleDocument(Document):
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
title = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
author = Object(properties={
"""
文章文档类用于Elasticsearch全文搜索
"""
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart') # 文章正文使用IK分词器
title = Text(analyzer='ik_max_word', search_analyzer='ik_smart') # 文章标题使用IK分词器
author = Object(properties={ # 作者信息
'nickname': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
category = Object(properties={
category = Object(properties={ # 分类信息
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
tags = Object(properties={
tags = Object(properties={ # 标签信息
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
pub_time = Date()
status = Text()
comment_status = Text()
type = Text()
views = Integer()
article_order = Integer()
pub_time = Date() # 发布时间
status = Text() # 文章状态
comment_status = Text() # 评论状态
type = Text() # 文章类型
views = Integer() # 浏览量
article_order = Integer() # 文章排序
class Index:
name = 'blog'
name = 'blog' # 索引名称
settings = {
"number_of_shards": 1,
"number_of_replicas": 0
"number_of_shards": 1, # 分片数量
"number_of_replicas": 0 # 副本数量
}
class Meta:
doc_type = 'Article'
doc_type = 'Article' # 文档类型
class ArticleDocumentManager():
"""
文章文档管理器类用于管理文章在Elasticsearch中的索引
"""
def __init__(self):
self.create_index()
def create_index(self):
"""
创建文章索引
"""
ArticleDocument.init()
def delete_index(self):
"""
删除文章索引
"""
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='blog', ignore=[400, 404])
def convert_to_doc(self, articles):
"""
将Django文章对象转换为Elasticsearch文档对象
"""
return [
ArticleDocument(
meta={
'id': article.id},
body=article.body,
title=article.title,
author={
'id': article.id}, # 文档ID
body=article.body, # 文章正文
title=article.title, # 文章标题
author={ # 作者信息
'nickname': article.author.username,
'id': article.author.id},
category={
category={ # 分类信息
'name': article.category.name,
'id': article.category.id},
tags=[
tags=[ # 标签信息
{
'name': t.name,
'id': t.id} for t in article.tags.all()],
pub_time=article.pub_time,
status=article.status,
comment_status=article.comment_status,
type=article.type,
views=article.views,
pub_time=article.pub_time, # 发布时间
status=article.status, # 文章状态
comment_status=article.comment_status, # 评论状态
type=article.type, # 文章类型
views=article.views, # 浏览量
article_order=article.article_order) for article in articles]
def rebuild(self, articles=None):
"""
重新构建文章索引
"""
ArticleDocument.init()
# 如果没有提供文章列表,则获取所有文章
articles = articles if articles else Article.objects.all()
docs = self.convert_to_doc(articles)
# 逐个保存文档
for doc in docs:
doc.save()
def update_docs(self, docs):
"""
更新文档
"""
for doc in docs:
doc.save()

@ -3,17 +3,33 @@ import logging
from django import forms
from haystack.forms import SearchForm
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # 创建日志记录器
class BlogSearchForm(SearchForm):
"""
博客搜索表单类继承自Haystack的SearchForm
"""
# 定义搜索查询字段,设置为必填
querydata = forms.CharField(required=True)
def search(self):
"""
执行搜索操作
Returns:
搜索结果数据
"""
# 调用父类的搜索方法
datas = super(BlogSearchForm, self).search()
# 验证表单数据
if not self.is_valid():
return self.no_query_found()
return self.no_query_found() # 如果表单无效,返回无查询结果
# 如果查询数据存在,记录查询日志
if self.cleaned_data['querydata']:
logger.info(self.cleaned_data['querydata'])
return datas
return datas # 返回搜索结果

@ -6,13 +6,27 @@ from blog.documents import ElapsedTimeDocument, ArticleDocumentManager, ElaspedT
# TODO 参数化
class Command(BaseCommand):
help = 'build search index'
"""
Django管理命令用于构建搜索引擎索引
"""
help = 'build search index' # 命令帮助信息
def handle(self, *args, **options):
"""
命令处理函数执行索引构建操作
"""
# 检查是否启用了Elasticsearch
if ELASTICSEARCH_ENABLED:
# 构建耗时文档索引
ElaspedTimeDocumentManager.build_index()
# 初始化耗时文档
manager = ElapsedTimeDocument()
manager.init()
# 重新构建文章文档索引
manager = ArticleDocumentManager()
manager.delete_index()
manager.rebuild()
manager.delete_index() # 删除现有索引
manager.rebuild() # 重新构建索引
else:
# 如果未启用Elasticsearch可以添加提示信息或处理逻辑
pass

@ -5,9 +5,19 @@ from blog.models import Tag, Category
# TODO 参数化
class Command(BaseCommand):
help = 'build search words'
"""
Django管理命令用于构建搜索词列表
"""
help = 'build search words' # 命令帮助信息
def handle(self, *args, **options):
datas = set([t.name for t in Tag.objects.all()] +
[t.name for t in Category.objects.all()])
"""
命令处理函数生成标签和分类名称的搜索词列表
"""
# 收集所有标签和分类的名称使用set去重
datas = set([t.name for t in Tag.objects.all()] + # 获取所有标签名称
[t.name for t in Category.objects.all()]) # 获取所有分类名称
# 将搜索词列表按行打印输出
print('\n'.join(datas))

@ -4,8 +4,17 @@ from djangoblog.utils import cache
class Command(BaseCommand):
help = 'clear the whole cache'
"""
Django管理命令用于清除整个缓存
"""
help = 'clear the whole cache' # 命令帮助信息
def handle(self, *args, **options):
cache.clear()
"""
命令处理函数执行缓存清除操作
"""
cache.clear() # 清除所有缓存
# 输出成功信息到控制台
self.stdout.write(self.style.SUCCESS('Cleared cache\n'))

@ -6,35 +6,61 @@ from blog.models import Article, Tag, Category
class Command(BaseCommand):
help = 'create test datas'
"""
Django管理命令用于创建测试数据
"""
help = 'create test datas' # 命令帮助信息
def handle(self, *args, **options):
"""
命令处理函数创建测试用的用户分类标签和文章数据
"""
# 创建或获取测试用户
user = get_user_model().objects.get_or_create(
email='test@test.com', username='测试用户', password=make_password('test!q@w#eTYU'))[0]
email='test@test.com',
username='测试用户',
password=make_password('test!q@w#eTYU'))[0]
# 创建父分类
pcategory = Category.objects.get_or_create(
name='我是父类目', parent_category=None)[0]
name='我是父类目',
parent_category=None)[0]
# 创建子分类
category = Category.objects.get_or_create(
name='子类目', parent_category=pcategory)[0]
name='子类目',
parent_category=pcategory)[0]
category.save()
# 创建基础标签
basetag = Tag()
basetag.name = "标签"
basetag.save()
# 创建20篇文章及其对应的标签
for i in range(1, 20):
# 创建文章
article = Article.objects.get_or_create(
category=category,
title='nice title ' + str(i),
body='nice content ' + str(i),
author=user)[0]
# 为每篇文章创建专属标签
tag = Tag()
tag.name = "标签" + str(i)
tag.save()
# 将标签关联到文章
article.tags.add(tag)
article.tags.add(basetag)
article.save()
# 清除缓存
from djangoblog.utils import cache
cache.clear()
# 输出成功信息
self.stdout.write(self.style.SUCCESS('created test datas \n'))

@ -4,47 +4,73 @@ from djangoblog.spider_notify import SpiderNotify
from djangoblog.utils import get_current_site
from blog.models import Article, Tag, Category
site = get_current_site().domain
site = get_current_site().domain # 获取当前站点域名
class Command(BaseCommand):
help = 'notify baidu url'
"""
Django管理命令用于向百度搜索引擎提交URL进行收录通知
"""
help = 'notify baidu url' # 命令帮助信息
def add_arguments(self, parser):
"""
添加命令行参数
"""
parser.add_argument(
'data_type',
type=str,
choices=[
'data_type', # 参数名称
type=str, # 参数类型
choices=[ # 参数可选值
'all',
'article',
'tag',
'category'],
help='article : all article,tag : all tag,category: all category,all: All of these')
help='article : all article,tag : all tag,category: all category,all: All of these') # 参数帮助信息
def get_full_url(self, path):
"""
根据相对路径生成完整URL
Args:
path: 相对路径
Returns:
完整的HTTPS URL
"""
url = "https://{site}{path}".format(site=site, path=path)
return url
def handle(self, *args, **options):
type = options['data_type']
self.stdout.write('start get %s' % type)
"""
命令处理函数执行百度URL提交操作
"""
type = options['data_type'] # 获取数据类型参数
self.stdout.write('start get %s' % type) # 输出开始信息
urls = []
urls = [] # 存储需要提交的URL列表
# 根据数据类型收集相应的URL
if type == 'article' or type == 'all':
# 收集所有已发布文章的URL
for article in Article.objects.filter(status='p'):
urls.append(article.get_full_url())
if type == 'tag' or type == 'all':
# 收集所有标签页面的URL
for tag in Tag.objects.all():
url = tag.get_absolute_url()
urls.append(self.get_full_url(url))
if type == 'category' or type == 'all':
# 收集所有分类页面的URL
for category in Category.objects.all():
url = category.get_absolute_url()
urls.append(self.get_full_url(url))
# 输出准备提交的URL数量
self.stdout.write(
self.style.SUCCESS(
'start notify %d urls' %
len(urls)))
'start notify %d urls' % len(urls)))
# 向百度提交URL
SpiderNotify.baidu_notify(urls)
# 输出完成信息
self.stdout.write(self.style.SUCCESS('finish notify'))

@ -8,40 +8,68 @@ from oauth.oauthmanager import get_manager_by_type
class Command(BaseCommand):
help = 'sync user avatar'
"""
Django管理命令用于同步用户头像
"""
help = 'sync user avatar' # 命令帮助信息
def test_picture(self, url):
"""
测试图片URL是否有效
Args:
url: 图片URL
Returns:
bool: URL是否有效
"""
try:
# 发送GET请求测试图片URL超时时间为2秒
if requests.get(url, timeout=2).status_code == 200:
return True
except:
# 出现异常说明URL无效
pass
def handle(self, *args, **options):
static_url = static("../")
users = OAuthUser.objects.all()
self.stdout.write(f'开始同步{len(users)}个用户头像')
"""
命令处理函数执行用户头像同步操作
"""
static_url = static("../") # 获取静态文件基础URL
users = OAuthUser.objects.all() # 获取所有OAuth用户
self.stdout.write(f'开始同步{len(users)}个用户头像') # 输出开始信息
# 遍历所有用户进行头像同步
for u in users:
self.stdout.write(f'开始同步:{u.nickname}')
url = u.picture
self.stdout.write(f'开始同步:{u.nickname}') # 输出当前同步的用户
url = u.picture # 获取用户当前头像URL
if url:
# 如果URL以静态URL开头说明是本地头像
if url.startswith(static_url):
# 测试图片是否有效
if self.test_picture(url):
continue
continue # 有效则跳过
else:
# 无效则尝试从元数据重新获取
if u.metadata:
manage = get_manager_by_type(u.type)
url = manage.get_picture(u.metadata)
url = save_user_avatar(url)
manage = get_manager_by_type(u.type) # 获取对应的OAuth管理器
url = manage.get_picture(u.metadata) # 从元数据获取新图片URL
url = save_user_avatar(url) # 保存用户头像
else:
# 没有元数据则使用默认头像
url = static('blog/img/avatar.png')
else:
# 非本地头像则保存新头像
url = save_user_avatar(url)
else:
# 没有头像URL则使用默认头像
url = static('blog/img/avatar.png')
# 如果获取到有效的头像URL则更新用户头像
if url:
self.stdout.write(
f'结束同步:{u.nickname}.url:{url}')
u.picture = url
u.save()
self.stdout.write('结束同步')
f'结束同步:{u.nickname}.url:{url}') # 输出同步结果
u.picture = url # 更新用户头像URL
u.save() # 保存到数据库
self.stdout.write('结束同步') # 输出结束信息

@ -6,37 +6,66 @@ from user_agents import parse
from blog.documents import ELASTICSEARCH_ENABLED, ElaspedTimeDocumentManager
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # 创建日志记录器
class OnlineMiddleware(object):
"""
在线中间件类用于记录页面加载时间和用户访问信息
"""
def __init__(self, get_response=None):
"""
初始化中间件
Args:
get_response: Django的响应获取函数
"""
self.get_response = get_response
super().__init__()
def __call__(self, request):
"""
中间件调用方法处理请求和响应
Args:
request: HTTP请求对象
Returns:
HTTP响应对象
"""
''' page render time '''
start_time = time.time()
response = self.get_response(request)
http_user_agent = request.META.get('HTTP_USER_AGENT', '')
start_time = time.time() # 记录请求开始时间
response = self.get_response(request) # 获取响应
# 获取用户IP地址
ip, _ = get_client_ip(request)
# 解析用户代理信息
http_user_agent = request.META.get('HTTP_USER_AGENT', '')
user_agent = parse(http_user_agent)
# 如果响应不是流式响应,则处理性能数据
if not response.streaming:
try:
# 计算页面渲染耗时
cast_time = time.time() - start_time
# 如果启用了Elasticsearch则记录性能数据
if ELASTICSEARCH_ENABLED:
time_taken = round((cast_time) * 1000, 2)
url = request.path
time_taken = round((cast_time) * 1000, 2) # 转换为毫秒
url = request.path # 获取请求路径
from django.utils import timezone
# 创建并保存性能文档
ElaspedTimeDocumentManager.create(
url=url,
time_taken=time_taken,
log_datetime=timezone.now(),
useragent=user_agent,
ip=ip)
# 在响应内容中替换加载时间占位符
response.content = response.content.replace(
b'<!!LOAD_TIMES!!>', str.encode(str(cast_time)[:5]))
except Exception as e:
# 记录错误日志
logger.error("Error OnlineMiddleware: %s" % e)
return response
return response # 返回响应

@ -8,14 +8,17 @@ import mdeditor.fields
class Migration(migrations.Migration):
# 标记这是一个初始迁移文件
initial = True
# 定义依赖关系
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
migrations.swappable_dependency(settings.AUTH_USER_MODEL), # 依赖用户模型
]
# 定义具体的操作
operations = [
# 创建BlogSettings模型网站配置
migrations.CreateModel(
name='BlogSettings',
fields=[
@ -41,6 +44,8 @@ class Migration(migrations.Migration):
'verbose_name_plural': '网站配置',
},
),
# 创建Links模型友情链接
migrations.CreateModel(
name='Links',
fields=[
@ -59,6 +64,8 @@ class Migration(migrations.Migration):
'ordering': ['sequence'],
},
),
# 创建SideBar模型侧边栏
migrations.CreateModel(
name='SideBar',
fields=[
@ -76,6 +83,8 @@ class Migration(migrations.Migration):
'ordering': ['sequence'],
},
),
# 创建Tag模型标签
migrations.CreateModel(
name='Tag',
fields=[
@ -91,6 +100,8 @@ class Migration(migrations.Migration):
'ordering': ['name'],
},
),
# 创建Category模型分类
migrations.CreateModel(
name='Category',
fields=[
@ -108,6 +119,8 @@ class Migration(migrations.Migration):
'ordering': ['-index'],
},
),
# 创建Article模型文章
migrations.CreateModel(
name='Article',
fields=[
@ -115,7 +128,7 @@ class Migration(migrations.Migration):
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
('title', models.CharField(max_length=200, unique=True, verbose_name='标题')),
('body', mdeditor.fields.MDTextField(verbose_name='正文')),
('body', mdeditor.fields.MDTextField(verbose_name='正文')), # 使用Markdown编辑器字段
('pub_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='发布时间')),
('status', models.CharField(choices=[('d', '草稿'), ('p', '发表')], default='p', max_length=1, verbose_name='文章状态')),
('comment_status', models.CharField(choices=[('o', '打开'), ('c', '关闭')], default='o', max_length=1, verbose_name='评论状态')),
@ -130,8 +143,9 @@ class Migration(migrations.Migration):
options={
'verbose_name': '文章',
'verbose_name_plural': '文章',
'ordering': ['-article_order', '-pub_time'],
'ordering': ['-article_order', '-pub_time'], # 按排序和发布时间降序排列
'get_latest_by': 'id',
},
),
]

@ -4,20 +4,24 @@ from django.db import migrations, models
class Migration(migrations.Migration):
# 定义该迁移文件的依赖关系依赖于blog应用的0001_initial迁移
dependencies = [
('blog', '0001_initial'),
]
# 定义具体的迁移操作
operations = [
# 为BlogSettings模型添加global_footer字段
migrations.AddField(
model_name='blogsettings',
name='global_footer',
field=models.TextField(blank=True, default='', null=True, verbose_name='公共尾部'),
),
# 为BlogSettings模型添加global_header字段
migrations.AddField(
model_name='blogsettings',
name='global_header',
field=models.TextField(blank=True, default='', null=True, verbose_name='公共头部'),
),
]

@ -4,14 +4,18 @@ from django.db import migrations, models
class Migration(migrations.Migration):
# 定义该迁移文件的依赖关系依赖于blog应用的0002_blogsettings_global_footer_and_more迁移
dependencies = [
('blog', '0002_blogsettings_global_footer_and_more'),
]
# 定义具体的迁移操作
operations = [
# 为BlogSettings模型添加comment_need_review字段
migrations.AddField(
model_name='blogsettings',
name='comment_need_review',
field=models.BooleanField(default=False, verbose_name='评论是否需要审核'),
),
]

@ -4,24 +4,30 @@ from django.db import migrations
class Migration(migrations.Migration):
# 定义该迁移文件的依赖关系依赖于blog应用的0003_blogsettings_comment_need_review迁移
dependencies = [
('blog', '0003_blogsettings_comment_need_review'),
]
# 定义具体的迁移操作
operations = [
# 将BlogSettings模型中的analyticscode字段重命名为analytics_code
migrations.RenameField(
model_name='blogsettings',
old_name='analyticscode',
new_name='analytics_code',
),
# 将BlogSettings模型中的beiancode字段重命名为beian_code
migrations.RenameField(
model_name='blogsettings',
old_name='beiancode',
new_name='beian_code',
),
# 将BlogSettings模型中的sitename字段重命名为site_name
migrations.RenameField(
model_name='blogsettings',
old_name='sitename',
new_name='site_name',
),
]

@ -8,293 +8,357 @@ import mdeditor.fields
class Migration(migrations.Migration):
# 定义该迁移文件的依赖关系
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('blog', '0004_rename_analyticscode_blogsettings_analytics_code_and_more'),
]
# 定义具体的迁移操作
operations = [
# 修改Article模型的选项配置将显示名称改为英文
migrations.AlterModelOptions(
name='article',
options={'get_latest_by': 'id', 'ordering': ['-article_order', '-pub_time'], 'verbose_name': 'article', 'verbose_name_plural': 'article'},
),
# 修改Category模型的选项配置将显示名称改为英文
migrations.AlterModelOptions(
name='category',
options={'ordering': ['-index'], 'verbose_name': 'category', 'verbose_name_plural': 'category'},
),
# 修改Links模型的选项配置将显示名称改为英文
migrations.AlterModelOptions(
name='links',
options={'ordering': ['sequence'], 'verbose_name': 'link', 'verbose_name_plural': 'link'},
),
# 修改SideBar模型的选项配置将显示名称改为英文
migrations.AlterModelOptions(
name='sidebar',
options={'ordering': ['sequence'], 'verbose_name': 'sidebar', 'verbose_name_plural': 'sidebar'},
),
# 修改Tag模型的选项配置将显示名称改为英文
migrations.AlterModelOptions(
name='tag',
options={'ordering': ['name'], 'verbose_name': 'tag', 'verbose_name_plural': 'tag'},
),
# 移除Article模型中的created_time字段
migrations.RemoveField(
model_name='article',
name='created_time',
),
# 移除Article模型中的last_mod_time字段
migrations.RemoveField(
model_name='article',
name='last_mod_time',
),
# 移除Category模型中的created_time字段
migrations.RemoveField(
model_name='category',
name='created_time',
),
# 移除Category模型中的last_mod_time字段
migrations.RemoveField(
model_name='category',
name='last_mod_time',
),
# 移除Links模型中的created_time字段
migrations.RemoveField(
model_name='links',
name='created_time',
),
# 移除SideBar模型中的created_time字段
migrations.RemoveField(
model_name='sidebar',
name='created_time',
),
# 移除Tag模型中的created_time字段
migrations.RemoveField(
model_name='tag',
name='created_time',
),
# 移除Tag模型中的last_mod_time字段
migrations.RemoveField(
model_name='tag',
name='last_mod_time',
),
# 添加Article模型中的creation_time字段
migrations.AddField(
model_name='article',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 添加Article模型中的last_modify_time字段
migrations.AddField(
model_name='article',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# 添加Category模型中的creation_time字段
migrations.AddField(
model_name='category',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 添加Category模型中的last_modify_time字段
migrations.AddField(
model_name='category',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# 添加Links模型中的creation_time字段
migrations.AddField(
model_name='links',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 添加SideBar模型中的creation_time字段
migrations.AddField(
model_name='sidebar',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 添加Tag模型中的creation_time字段
migrations.AddField(
model_name='tag',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
),
# 添加Tag模型中的last_modify_time字段
migrations.AddField(
model_name='tag',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# 修改Article模型中的article_order字段显示名称
migrations.AlterField(
model_name='article',
name='article_order',
field=models.IntegerField(default=0, verbose_name='order'),
),
# 修改Article模型中的author字段显示名称
migrations.AlterField(
model_name='article',
name='author',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL, verbose_name='author'),
),
# 修改Article模型中的body字段显示名称
migrations.AlterField(
model_name='article',
name='body',
field=mdeditor.fields.MDTextField(verbose_name='body'),
),
# 修改Article模型中的category字段显示名称
migrations.AlterField(
model_name='article',
name='category',
field=models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='blog.category', verbose_name='category'),
),
# 修改Article模型中的comment_status字段显示名称和选项
migrations.AlterField(
model_name='article',
name='comment_status',
field=models.CharField(choices=[('o', 'Open'), ('c', 'Close')], default='o', max_length=1, verbose_name='comment status'),
),
# 修改Article模型中的pub_time字段显示名称
migrations.AlterField(
model_name='article',
name='pub_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='publish time'),
),
# 修改Article模型中的show_toc字段显示名称
migrations.AlterField(
model_name='article',
name='show_toc',
field=models.BooleanField(default=False, verbose_name='show toc'),
),
# 修改Article模型中的status字段显示名称和选项
migrations.AlterField(
model_name='article',
name='status',
field=models.CharField(choices=[('d', 'Draft'), ('p', 'Published')], default='p', max_length=1, verbose_name='status'),
),
# 修改Article模型中的tags字段显示名称
migrations.AlterField(
model_name='article',
name='tags',
field=models.ManyToManyField(blank=True, to='blog.tag', verbose_name='tag'),
),
# 修改Article模型中的title字段显示名称
migrations.AlterField(
model_name='article',
name='title',
field=models.CharField(max_length=200, unique=True, verbose_name='title'),
),
# 修改Article模型中的type字段显示名称和选项
migrations.AlterField(
model_name='article',
name='type',
field=models.CharField(choices=[('a', 'Article'), ('p', 'Page')], default='a', max_length=1, verbose_name='type'),
),
# 修改Article模型中的views字段显示名称
migrations.AlterField(
model_name='article',
name='views',
field=models.PositiveIntegerField(default=0, verbose_name='views'),
),
# 修改BlogSettings模型中的article_comment_count字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='article_comment_count',
field=models.IntegerField(default=5, verbose_name='article comment count'),
),
# 修改BlogSettings模型中的article_sub_length字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='article_sub_length',
field=models.IntegerField(default=300, verbose_name='article sub length'),
),
# 修改BlogSettings模型中的google_adsense_codes字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='google_adsense_codes',
field=models.TextField(blank=True, default='', max_length=2000, null=True, verbose_name='adsense code'),
),
# 修改BlogSettings模型中的open_site_comment字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='open_site_comment',
field=models.BooleanField(default=True, verbose_name='open site comment'),
),
# 修改BlogSettings模型中的show_google_adsense字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='show_google_adsense',
field=models.BooleanField(default=False, verbose_name='show adsense'),
),
# 修改BlogSettings模型中的sidebar_article_count字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='sidebar_article_count',
field=models.IntegerField(default=10, verbose_name='sidebar article count'),
),
# 修改BlogSettings模型中的sidebar_comment_count字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='sidebar_comment_count',
field=models.IntegerField(default=5, verbose_name='sidebar comment count'),
),
# 修改BlogSettings模型中的site_description字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='site_description',
field=models.TextField(default='', max_length=1000, verbose_name='site description'),
),
# 修改BlogSettings模型中的site_keywords字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='site_keywords',
field=models.TextField(default='', max_length=1000, verbose_name='site keywords'),
),
# 修改BlogSettings模型中的site_name字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='site_name',
field=models.CharField(default='', max_length=200, verbose_name='site name'),
),
# 修改BlogSettings模型中的site_seo_description字段显示名称
migrations.AlterField(
model_name='blogsettings',
name='site_seo_description',
field=models.TextField(default='', max_length=1000, verbose_name='site seo description'),
),
# 修改Category模型中的index字段显示名称
migrations.AlterField(
model_name='category',
name='index',
field=models.IntegerField(default=0, verbose_name='index'),
),
# 修改Category模型中的name字段显示名称
migrations.AlterField(
model_name='category',
name='name',
field=models.CharField(max_length=30, unique=True, verbose_name='category name'),
),
# 修改Category模型中的parent_category字段显示名称
migrations.AlterField(
model_name='category',
name='parent_category',
field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='blog.category', verbose_name='parent category'),
),
# 修改Links模型中的is_enable字段显示名称
migrations.AlterField(
model_name='links',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is show'),
),
# 修改Links模型中的last_mod_time字段显示名称
migrations.AlterField(
model_name='links',
name='last_mod_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# 修改Links模型中的link字段显示名称
migrations.AlterField(
model_name='links',
name='link',
field=models.URLField(verbose_name='link'),
),
# 修改Links模型中的name字段显示名称
migrations.AlterField(
model_name='links',
name='name',
field=models.CharField(max_length=30, unique=True, verbose_name='link name'),
),
# 修改Links模型中的sequence字段显示名称
migrations.AlterField(
model_name='links',
name='sequence',
field=models.IntegerField(unique=True, verbose_name='order'),
),
# 修改Links模型中的show_type字段显示名称和选项
migrations.AlterField(
model_name='links',
name='show_type',
field=models.CharField(choices=[('i', 'index'), ('l', 'list'), ('p', 'post'), ('a', 'all'), ('s', 'slide')], default='i', max_length=1, verbose_name='show type'),
),
# 修改SideBar模型中的content字段显示名称
migrations.AlterField(
model_name='sidebar',
name='content',
field=models.TextField(verbose_name='content'),
),
# 修改SideBar模型中的is_enable字段显示名称
migrations.AlterField(
model_name='sidebar',
name='is_enable',
field=models.BooleanField(default=True, verbose_name='is enable'),
),
# 修改SideBar模型中的last_mod_time字段显示名称
migrations.AlterField(
model_name='sidebar',
name='last_mod_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='modify time'),
),
# 修改SideBar模型中的name字段显示名称
migrations.AlterField(
model_name='sidebar',
name='name',
field=models.CharField(max_length=100, verbose_name='title'),
),
# 修改SideBar模型中的sequence字段显示名称
migrations.AlterField(
model_name='sidebar',
name='sequence',
field=models.IntegerField(unique=True, verbose_name='order'),
),
# 修改Tag模型中的name字段显示名称
migrations.AlterField(
model_name='tag',
name='name',
field=models.CharField(max_length=30, unique=True, verbose_name='tag name'),
),
]

@ -4,14 +4,17 @@ from django.db import migrations
class Migration(migrations.Migration):
# 定义该迁移文件的依赖关系依赖于blog应用的0005_alter_article_options_alter_category_options_and_more迁移
dependencies = [
('blog', '0005_alter_article_options_alter_category_options_and_more'),
]
# 定义具体的迁移操作
operations = [
# 修改BlogSettings模型的选项配置将显示名称改为英文
migrations.AlterModelOptions(
name='blogsettings',
options={'verbose_name': 'Website configuration', 'verbose_name_plural': 'Website configuration'},
),
]

@ -14,6 +14,444 @@ from uuslug import slugify
from djangoblog.utils import cache_decorator, cache
from djangoblog.utils import get_current_site
logger = logging.getLogger(__name__) # 创建日志记录器
class LinkShowType(models.TextChoices):
"""
链接显示类型枚举类
"""
I = ('i', _('index')) # 首页
L = ('l', _('list')) # 列表页
P = ('p', _('post')) # 文章页面
A = ('a', _('all')) # 全站
S = ('s', _('slide')) # 幻灯片
class BaseModel(models.Model):
"""
基础模型类提供通用字段和方法
"""
id = models.AutoField(primary_key=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('modify time'), default=now)
def save(self, *args, **kwargs):
"""
重写保存方法处理slug生成和视图计数更新
"""
# 判断是否是仅更新浏览量的操作
is_update_views = isinstance(
self,
Article) and 'update_fields' in kwargs and kwargs['update_fields'] == ['views']
if is_update_views:
# 如果是更新浏览量,直接更新数据库避免触发其他逻辑
Article.objects.filter(pk=self.pk).update(views=self.views)
else:
# 如果有slug字段自动生成slug
if 'slug' in self.__dict__:
slug = getattr(
self, 'title') if 'title' in self.__dict__ else getattr(
self, 'name')
setattr(self, 'slug', slugify(slug))
super().save(*args, **kwargs)
def get_full_url(self):
"""
获取完整的URL地址
"""
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
class Meta:
abstract = True # 声明为抽象类
@abstractmethod
def get_absolute_url(self):
"""
抽象方法子类必须实现获取绝对URL的方法
"""
pass
class Article(BaseModel):
"""文章模型"""
STATUS_CHOICES = (
('d', _('Draft')), # 草稿
('p', _('Published')), # 已发布
)
COMMENT_STATUS = (
('o', _('Open')), # 开启评论
('c', _('Close')), # 关闭评论
)
TYPE = (
('a', _('Article')), # 文章
('p', _('Page')), # 页面
)
title = models.CharField(_('title'), max_length=200, unique=True)
body = MDTextField(_('body')) # 使用Markdown编辑器字段
pub_time = models.DateTimeField(
_('publish time'), blank=False, null=False, default=now)
status = models.CharField(
_('status'),
max_length=1,
choices=STATUS_CHOICES,
default='p')
comment_status = models.CharField(
_('comment status'),
max_length=1,
choices=COMMENT_STATUS,
default='o')
type = models.CharField(_('type'), max_length=1, choices=TYPE, default='a')
views = models.PositiveIntegerField(_('views'), default=0)
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=False,
null=False,
on_delete=models.CASCADE)
article_order = models.IntegerField(
_('order'), blank=False, null=False, default=0)
show_toc = models.BooleanField(_('show toc'), blank=False, null=False, default=False)
category = models.ForeignKey(
'Category',
verbose_name=_('category'),
on_delete=models.CASCADE,
blank=False,
null=False)
tags = models.ManyToManyField('Tag', verbose_name=_('tag'), blank=True)
def body_to_string(self):
"""
将文章正文转换为字符串
"""
return self.body
def __str__(self):
return self.title
class Meta:
ordering = ['-article_order', '-pub_time'] # 按排序和发布时间降序排列
verbose_name = _('article')
verbose_name_plural = verbose_name
get_latest_by = 'id'
def get_absolute_url(self):
"""
获取文章绝对URL
"""
return reverse('blog:detailbyid', kwargs={
'article_id': self.id,
'year': self.creation_time.year,
'month': self.creation_time.month,
'day': self.creation_time.day
})
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
"""
获取分类树结构
"""
tree = self.category.get_category_tree()
names = list(map(lambda c: (c.name, c.get_absolute_url()), tree))
return names
def save(self, *args, **kwargs):
"""
保存文章
"""
super().save(*args, **kwargs)
def viewed(self):
"""
增加文章浏览量
"""
self.views += 1
self.save(update_fields=['views'])
def comment_list(self):
"""
获取文章评论列表带缓存
"""
cache_key = 'article_comments_{id}'.format(id=self.id)
value = cache.get(cache_key)
if value:
logger.info('get article comments:{id}'.format(id=self.id))
return value
else:
comments = self.comment_set.filter(is_enable=True).order_by('-id')
cache.set(cache_key, comments, 60 * 100)
logger.info('set article comments:{id}'.format(id=self.id))
return comments
def get_admin_url(self):
"""
获取管理后台的文章编辑URL
"""
info = (self._meta.app_label, self._meta.model_name)
return reverse('admin:%s_%s_change' % info, args=(self.pk,))
@cache_decorator(expiration=60 * 100)
def next_article(self):
"""
获取下一篇已发布的文章
"""
# 下一篇
return Article.objects.filter(
id__gt=self.id, status='p').order_by('id').first()
@cache_decorator(expiration=60 * 100)
def prev_article(self):
"""
获取上一篇已发布的文章
"""
# 前一篇
return Article.objects.filter(id__lt=self.id, status='p').first()
def get_first_image_url(self):
"""
从文章正文中获取第一张图片的URL
:return: 图片URL或空字符串
"""
match = re.search(r'!\[.*?\]\((.+?)\)', self.body)
if match:
return match.group(1)
return ""
class Category(BaseModel):
"""文章分类模型"""
name = models.CharField(_('category name'), max_length=30, unique=True)
parent_category = models.ForeignKey(
'self',
verbose_name=_('parent category'),
blank=True,
null=True,
on_delete=models.CASCADE)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
index = models.IntegerField(default=0, verbose_name=_('index'))
class Meta:
ordering = ['-index'] # 按索引降序排列
verbose_name = _('category')
verbose_name_plural = verbose_name
def get_absolute_url(self):
"""
获取分类绝对URL
"""
return reverse(
'blog:category_detail', kwargs={
'category_name': self.slug})
def __str__(self):
return self.name
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
"""
递归获得分类目录的父级分类树
:return: 分类列表
"""
categorys = []
def parse(category):
categorys.append(category)
if category.parent_category:
parse(category.parent_category)
parse(self)
return categorys
@cache_decorator(60 * 60 * 10)
def get_sub_categorys(self):
"""
获得当前分类目录所有子分类
:return: 子分类列表
"""
categorys = []
all_categorys = Category.objects.all()
def parse(category):
if category not in categorys:
categorys.append(category)
childs = all_categorys.filter(parent_category=category)
for child in childs:
if category not in categorys:
categorys.append(child)
parse(child)
parse(self)
return categorys
class Tag(BaseModel):
"""文章标签模型"""
name = models.CharField(_('tag name'), max_length=30, unique=True)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
def __str__(self):
return self.name
def get_absolute_url(self):
"""
获取标签绝对URL
"""
return reverse('blog:tag_detail', kwargs={'tag_name': self.slug})
@cache_decorator(60 * 60 * 10)
def get_article_count(self):
"""
获取该标签下的文章数量
"""
return Article.objects.filter(tags__name=self.name).distinct().count()
class Meta:
ordering = ['name'] # 按名称升序排列
verbose_name = _('tag')
verbose_name_plural = verbose_name
class Links(models.Model):
"""友情链接模型"""
name = models.CharField(_('link name'), max_length=30, unique=True)
link = models.URLField(_('link'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(
_('is show'), default=True, blank=False, null=False)
show_type = models.CharField(
_('show type'),
max_length=1,
choices=LinkShowType.choices,
default=LinkShowType.I)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
class Meta:
ordering = ['sequence'] # 按顺序排列
verbose_name = _('link')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class SideBar(models.Model):
"""侧边栏模型,可以展示一些html内容"""
name = models.CharField(_('title'), max_length=100)
content = models.TextField(_('content'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(_('is enable'), default=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
class Meta:
ordering = ['sequence'] # 按顺序排列
verbose_name = _('sidebar')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class BlogSettings(models.Model):
"""博客配置模型"""
site_name = models.CharField(
_('site name'),
max_length=200,
null=False,
blank=False,
default='')
site_description = models.TextField(
_('site description'),
max_length=1000,
null=False,
blank=False,
default='')
site_seo_description = models.TextField(
_('site seo description'), max_length=1000, null=False, blank=False, default='')
site_keywords = models.TextField(
_('site keywords'),
max_length=1000,
null=False,
blank=False,
default='')
article_sub_length = models.IntegerField(_('article sub length'), default=300)
sidebar_article_count = models.IntegerField(_('sidebar article count'), default=10)
sidebar_comment_count = models.IntegerField(_('sidebar comment count'), default=5)
article_comment_count = models.IntegerField(_('article comment count'), default=5)
show_google_adsense = models.BooleanField(_('show adsense'), default=False)
google_adsense_codes = models.TextField(
_('adsense code'), max_length=2000, null=True, blank=True, default='')
open_site_comment = models.BooleanField(_('open site comment'), default=True)
global_header = models.TextField("公共头部", null=True, blank=True, default='')
global_footer = models.TextField("公共尾部", null=True, blank=True, default='')
beian_code = models.CharField(
'备案号',
max_length=2000,
null=True,
blank=True,
default='')
analytics_code = models.TextField(
"网站统计代码",
max_length=1000,
null=False,
blank=False,
default='')
show_gongan_code = models.BooleanField(
'是否显示公安备案号', default=False, null=False)
gongan_beiancode = models.TextField(
'公安备案号',
max_length=2000,
null=True,
blank=True,
default='')
comment_need_review = models.BooleanField(
'评论是否需要审核', default=False, null=False)
class Meta:
verbose_name = _('Website configuration')
verbose_name_plural = verbose_name
def __str__(self):
return self.site_name
def clean(self):
"""
验证模型数据确保只存在一个配置实例
"""
if BlogSettings.objects.exclude(id=self.id).count():
raise ValidationError(_('There can only be one configuration'))
def save(self, *args, **kwargs):
"""
保存配置并清除缓存
"""
super().save(*args, **kwargs)
from djangoblog.utils import cache
cache.clear()
import logging
import re
from abc import abstractmethod
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from mdeditor.fields import MDTextField
from uuslug import slugify
from djangoblog.utils import cache_decorator, cache
from djangoblog.utils import get_current_site
logger = logging.getLogger(__name__)

@ -4,10 +4,25 @@ from blog.models import Article
class ArticleIndex(indexes.SearchIndex, indexes.Indexable):
"""
文章搜索索引类用于Haystack全文搜索
"""
# 定义文档字段use_template=True表示使用模板来构建索引内容
text = indexes.CharField(document=True, use_template=True)
def get_model(self):
"""
返回索引对应的模型类
"""
return Article
def index_queryset(self, using=None):
return self.get_model().objects.filter(status='p')
"""
返回需要建立索引的查询集
Args:
using: 数据库别名
Returns:
已发布文章的查询集
"""
return self.get_model().objects.filter(status='p') # 只对已发布的文章建立索引

@ -20,18 +20,26 @@ from djangoblog.utils import get_current_site
from oauth.models import OAuthUser
from djangoblog.plugin_manage import hooks
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # 创建日志记录器
register = template.Library()
register = template.Library() # 创建模板标签注册器
@register.simple_tag(takes_context=True)
def head_meta(context):
"""
生成页面头部meta信息
"""
return mark_safe(hooks.apply_filters('head_meta', '', context))
@register.simple_tag
def timeformat(data):
"""
格式化时间显示
:param data: 时间数据
:return: 格式化后的时间字符串
"""
try:
return data.strftime(settings.TIME_FORMAT)
except Exception as e:
@ -41,6 +49,11 @@ def timeformat(data):
@register.simple_tag
def datetimeformat(data):
"""
格式化日期时间显示
:param data: 日期时间数据
:return: 格式化后的日期时间字符串
"""
try:
return data.strftime(settings.DATE_TIME_FORMAT)
except Exception as e:
@ -51,11 +64,21 @@ def datetimeformat(data):
@register.filter()
@stringfilter
def custom_markdown(content):
"""
将内容转换为markdown格式
:param content: 原始内容
:return: markdown格式的内容
"""
return mark_safe(CommonMarkdown.get_markdown(content))
@register.simple_tag
def get_markdown_toc(content):
"""
获取markdown内容的目录
:param content: markdown内容
:return: 目录HTML
"""
from djangoblog.utils import CommonMarkdown
body, toc = CommonMarkdown.get_markdown_with_toc(content)
return mark_safe(toc)
@ -64,6 +87,11 @@ def get_markdown_toc(content):
@register.filter()
@stringfilter
def comment_markdown(content):
"""
将评论内容转换为markdown格式并清理HTML
:param content: 评论内容
:return: 清理后的markdown格式内容
"""
content = CommonMarkdown.get_markdown(content)
return mark_safe(sanitize_html(content))
@ -73,8 +101,8 @@ def comment_markdown(content):
def truncatechars_content(content):
"""
获得文章内容的摘要
:param content:
:return:
:param content: 文章内容
:return: 截取后的摘要内容
"""
from django.template.defaultfilters import truncatechars_html
from djangoblog.utils import get_blog_setting
@ -85,17 +113,21 @@ def truncatechars_content(content):
@register.filter(is_safe=True)
@stringfilter
def truncate(content):
"""
截取内容前150个字符去除HTML标签
:param content: 原始内容
:return: 截取后的纯文本内容
"""
from django.utils.html import strip_tags
return strip_tags(content)[:150]
@register.inclusion_tag('blog/tags/breadcrumb.html')
def load_breadcrumb(article):
"""
获得文章面包屑
:param article:
:return:
获得文章面包屑导航
:param article: 文章对象
:return: 面包屑导航数据
"""
names = article.get_category_tree()
from djangoblog.utils import get_blog_setting
@ -114,9 +146,9 @@ def load_breadcrumb(article):
@register.inclusion_tag('blog/tags/article_tag_list.html')
def load_articletags(article):
"""
文章标签
:param article:
:return:
加载文章标签列表
:param article: 文章对象
:return: 标签列表数据
"""
tags = article.tags.all()
tags_list = []
@ -135,7 +167,9 @@ def load_articletags(article):
def load_sidebar(user, linktype):
"""
加载侧边栏
:return:
:param user: 当前用户
:param linktype: 链接显示类型
:return: 侧边栏数据
"""
value = cache.get("sidebar" + linktype)
if value:
@ -145,28 +179,38 @@ def load_sidebar(user, linktype):
logger.info('load sidebar')
from djangoblog.utils import get_blog_setting
blogsetting = get_blog_setting()
# 获取最新文章
recent_articles = Article.objects.filter(
status='p')[:blogsetting.sidebar_article_count]
# 获取所有分类
sidebar_categorys = Category.objects.all()
# 获取启用的额外侧边栏
extra_sidebars = SideBar.objects.filter(
is_enable=True).order_by('sequence')
# 获取阅读量最高的文章
most_read_articles = Article.objects.filter(status='p').order_by(
'-views')[:blogsetting.sidebar_article_count]
# 获取文章发布日期
dates = Article.objects.datetimes('creation_time', 'month', order='DESC')
# 获取友情链接
links = Links.objects.filter(is_enable=True).filter(
Q(show_type=str(linktype)) | Q(show_type=LinkShowType.A))
# 获取最新评论
commment_list = Comment.objects.filter(is_enable=True).order_by(
'-id')[:blogsetting.sidebar_comment_count]
# 标签云 计算字体大小
# 根据总数计算出平均值 大小为 (数目/平均值)*步长
increment = 5
tags = Tag.objects.all()
sidebar_tags = None
if tags and len(tags) > 0:
# 获取每个标签的文章数量
s = [t for t in [(t, t.get_article_count()) for t in tags] if t[1]]
count = sum([t[1] for t in s])
dd = 1 if (count == 0 or not len(tags)) else count / len(tags)
import random
# 计算标签字体大小
sidebar_tags = list(
map(lambda x: (x[0], x[1], (x[1] / dd) * increment + 10), s))
random.shuffle(sidebar_tags)
@ -185,6 +229,7 @@ def load_sidebar(user, linktype):
'sidebar_tags': sidebar_tags,
'extra_sidebars': extra_sidebars
}
# 缓存侧边栏数据3小时
cache.set("sidebar" + linktype, value, 60 * 60 * 60 * 3)
logger.info('set sidebar cache.key:{key}'.format(key="sidebar" + linktype))
value['user'] = user
@ -195,8 +240,9 @@ def load_sidebar(user, linktype):
def load_article_metas(article, user):
"""
获得文章meta信息
:param article:
:return:
:param article: 文章对象
:param user: 当前用户
:return: 文章meta信息数据
"""
return {
'article': article,
@ -206,8 +252,17 @@ def load_article_metas(article, user):
@register.inclusion_tag('blog/tags/article_pagination.html')
def load_pagination_info(page_obj, page_type, tag_name):
"""
加载分页信息
:param page_obj: 分页对象
:param page_type: 页面类型
:param tag_name: 标签名/分类名/作者名
:return: 分页链接数据
"""
previous_url = ''
next_url = ''
# 首页分页
if page_type == '':
if page_obj.has_next():
next_number = page_obj.next_page_number()
@ -217,6 +272,8 @@ def load_pagination_info(page_obj, page_type, tag_name):
previous_url = reverse(
'blog:index_page', kwargs={
'page': previous_number})
# 标签页面分页
if page_type == '分类标签归档':
tag = get_object_or_404(Tag, name=tag_name)
if page_obj.has_next():
@ -233,6 +290,8 @@ def load_pagination_info(page_obj, page_type, tag_name):
kwargs={
'page': previous_number,
'tag_name': tag.slug})
# 作者文章页面分页
if page_type == '作者文章归档':
if page_obj.has_next():
next_number = page_obj.next_page_number()
@ -249,6 +308,7 @@ def load_pagination_info(page_obj, page_type, tag_name):
'page': previous_number,
'author_name': tag_name})
# 分类目录页面分页
if page_type == '分类目录归档':
category = get_object_or_404(Category, name=tag_name)
if page_obj.has_next():
@ -277,9 +337,10 @@ def load_pagination_info(page_obj, page_type, tag_name):
def load_article_detail(article, isindex, user):
"""
加载文章详情
:param article:
:param isindex:是否列表页若是列表页只显示摘要
:return:
:param article: 文章对象
:param isindex: 是否为列表页
:param user: 当前用户
:return: 文章详情数据
"""
from djangoblog.utils import get_blog_setting
blogsetting = get_blog_setting()
@ -296,12 +357,18 @@ def load_article_detail(article, isindex, user):
# TEMPLATE USE: {{ email|gravatar_url:150 }}
@register.filter
def gravatar_url(email, size=40):
"""获得gravatar头像"""
"""
获得gravatar头像URL
:param email: 邮箱地址
:param size: 头像大小
:return: gravatar头像URL
"""
cachekey = 'gravatat/' + email
url = cache.get(cachekey)
if url:
return url
else:
# 查找OAuth用户是否有自定义头像
usermodels = OAuthUser.objects.filter(email=email)
if usermodels:
o = list(filter(lambda x: x.picture is not None, usermodels))
@ -311,8 +378,10 @@ def gravatar_url(email, size=40):
default = static('blog/img/avatar.png')
# 生成gravatar URL
url = "https://www.gravatar.com/avatar/%s?%s" % (hashlib.md5(
email.lower()).hexdigest(), urllib.parse.urlencode({'d': default, 's': str(size)}))
# 缓存头像URL 10小时
cache.set(cachekey, url, 60 * 60 * 10)
logger.info('set gravatar cache.key:{key}'.format(key=cachekey))
return url
@ -320,25 +389,39 @@ def gravatar_url(email, size=40):
@register.filter
def gravatar(email, size=40):
"""获得gravatar头像"""
"""
获得gravatar头像HTML标签
:param email: 邮箱地址
:param size: 头像大小
:return: 头像img标签
"""
url = gravatar_url(email, size)
return mark_safe(
'<img src="%s" height="%d" width="%d">' %
(url, size, size))
'<img src="%s" height="%d" width="%d">' % (url, size, size))
@register.simple_tag
def query(qs, **kwargs):
""" template tag which allows queryset filtering. Usage:
{% query books author=author as mybooks %}
{% for book in mybooks %}
...
{% endfor %}
"""
模板标签允许查询集过滤用法:
{% query books author=author as mybooks %}
{% for book in mybooks %}
...
{% endfor %}
:param qs: 查询集
:param kwargs: 过滤条件
:return: 过滤后的查询集
"""
return qs.filter(**kwargs)
@register.filter
def addstr(arg1, arg2):
"""concatenate arg1 & arg2"""
"""
连接两个字符串
:param arg1: 第一个字符串
:param arg2: 第二个字符串
:return: 连接后的字符串
"""
return str(arg1) + str(arg2)

@ -20,12 +20,22 @@ from oauth.models import OAuthUser, OAuthConfig
# Create your tests here.
class ArticleTest(TestCase):
"""
文章相关功能测试类
"""
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
"""
测试前的准备工作
"""
self.client = Client() # 创建测试客户端
self.factory = RequestFactory() # 创建请求工厂
def test_validate_article(self):
"""
测试文章相关功能验证
"""
site = get_current_site().domain
# 创建或获取测试用户
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
username="liangliangyy")[0]
@ -33,10 +43,16 @@ class ArticleTest(TestCase):
user.is_staff = True
user.is_superuser = True
user.save()
# 测试用户详情页访问
response = self.client.get(user.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 测试管理页面访问
response = self.client.get('/admin/servermanager/emailsendlog/')
response = self.client.get('admin/admin/logentry/')
# 创建侧边栏
s = SideBar()
s.sequence = 1
s.name = 'test'
@ -44,16 +60,19 @@ class ArticleTest(TestCase):
s.is_enable = True
s.save()
# 创建分类
category = Category()
category.name = "category"
category.creation_time = timezone.now()
category.last_mod_time = timezone.now()
category.save()
# 创建标签
tag = Tag()
tag.name = "nicetag"
tag.save()
# 创建文章
article = Article()
article.title = "nicetitle"
article.body = "nicecontent"
@ -68,6 +87,7 @@ class ArticleTest(TestCase):
article.save()
self.assertEqual(1, article.tags.count())
# 创建更多测试文章
for i in range(20):
article = Article()
article.title = "nicetitle" + str(i)
@ -79,32 +99,46 @@ class ArticleTest(TestCase):
article.save()
article.tags.add(tag)
article.save()
# 如果启用了Elasticsearch测试搜索功能
from blog.documents import ELASTICSEARCH_ENABLED
if ELASTICSEARCH_ENABLED:
call_command("build_index")
response = self.client.get('/search', {'q': 'nicetitle'})
self.assertEqual(response.status_code, 200)
# 测试文章详情页访问
response = self.client.get(article.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 测试蜘蛛通知功能
from djangoblog.spider_notify import SpiderNotify
SpiderNotify.notify(article.get_absolute_url())
# 测试标签页访问
response = self.client.get(tag.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 测试分类页访问
response = self.client.get(category.get_absolute_url())
self.assertEqual(response.status_code, 200)
# 测试搜索功能
response = self.client.get('/search', {'q': 'django'})
self.assertEqual(response.status_code, 200)
# 测试加载文章标签功能
s = load_articletags(article)
self.assertIsNotNone(s)
# 用户登录测试
self.client.login(username='liangliangyy', password='liangliangyy')
# 测试归档页面访问
response = self.client.get(reverse('blog:archives'))
self.assertEqual(response.status_code, 200)
# 测试分页功能
p = Paginator(Article.objects.all(), settings.PAGINATE_BY)
self.check_pagination(p, '', '')
@ -119,16 +153,20 @@ class ArticleTest(TestCase):
p = Paginator(Article.objects.filter(category=category), settings.PAGINATE_BY)
self.check_pagination(p, '分类目录归档', category.slug)
# 测试搜索表单
f = BlogSearchForm()
f.search()
# self.client.login(username='liangliangyy', password='liangliangyy')
# 测试百度通知功能
from djangoblog.spider_notify import SpiderNotify
SpiderNotify.baidu_notify([article.get_full_url()])
# 测试头像功能
from blog.templatetags.blog_tags import gravatar_url, gravatar
u = gravatar_url('liangliangyy@gmail.com')
u = gravatar('liangliangyy@gmail.com')
# 测试友情链接
link = Links(
sequence=1,
name="lylinux",
@ -137,37 +175,53 @@ class ArticleTest(TestCase):
response = self.client.get('/links.html')
self.assertEqual(response.status_code, 200)
# 测试RSS订阅
response = self.client.get('/feed/')
self.assertEqual(response.status_code, 200)
# 测试站点地图
response = self.client.get('/sitemap.xml')
self.assertEqual(response.status_code, 200)
# 测试管理后台功能
self.client.get("/admin/blog/article/1/delete/")
self.client.get('/admin/servermanager/emailsendlog/')
self.client.get('/admin/admin/logentry/')
self.client.get('/admin/admin/logentry/1/change/')
def check_pagination(self, p, type, value):
"""
检查分页功能
"""
for page in range(1, p.num_pages + 1):
s = load_pagination_info(p.page(page), type, value)
self.assertIsNotNone(s)
# 测试上一页链接
if s['previous_url']:
response = self.client.get(s['previous_url'])
self.assertEqual(response.status_code, 200)
# 测试下一页链接
if s['next_url']:
response = self.client.get(s['next_url'])
self.assertEqual(response.status_code, 200)
def test_image(self):
"""
测试图片上传功能
"""
import requests
# 下载测试图片
rsp = requests.get(
'https://www.python.org/static/img/python-logo.png')
imagepath = os.path.join(settings.BASE_DIR, 'python.png')
with open(imagepath, 'wb') as file:
file.write(rsp.content)
# 测试未授权上传
rsp = self.client.post('/upload')
self.assertEqual(rsp.status_code, 403)
# 测试授权上传
sign = get_sha256(get_sha256(settings.SECRET_KEY))
with open(imagepath, 'rb') as file:
imgfile = SimpleUploadedFile(
@ -176,17 +230,28 @@ class ArticleTest(TestCase):
rsp = self.client.post(
'/upload?sign=' + sign, form_data, follow=True)
self.assertEqual(rsp.status_code, 200)
# 清理测试文件
os.remove(imagepath)
# 测试用户头像保存和邮件发送功能
from djangoblog.utils import save_user_avatar, send_email
send_email(['qq@qq.com'], 'testTitle', 'testContent')
save_user_avatar(
'https://www.python.org/static/img/python-logo.png')
def test_errorpage(self):
"""
测试错误页面
"""
rsp = self.client.get('/eee')
self.assertEqual(rsp.status_code, 404)
def test_commands(self):
"""
测试管理命令
"""
# 创建测试用户
user = BlogUser.objects.get_or_create(
email="liangliangyy@gmail.com",
username="liangliangyy")[0]
@ -195,12 +260,14 @@ class ArticleTest(TestCase):
user.is_superuser = True
user.save()
# 创建OAuth配置
c = OAuthConfig()
c.type = 'qq'
c.appkey = 'appkey'
c.appsecret = 'appsecret'
c.save()
# 创建OAuth用户
u = OAuthUser()
u.type = 'qq'
u.openid = 'openid'
@ -222,6 +289,7 @@ class ArticleTest(TestCase):
}'''
u.save()
# 测试各种管理命令
from blog.documents import ELASTICSEARCH_ENABLED
if ELASTICSEARCH_ENABLED:
call_command("build_index")
@ -230,3 +298,4 @@ class ArticleTest(TestCase):
call_command("clear_cache")
call_command("sync_user_avatar")
call_command("build_search_words")

@ -3,60 +3,74 @@ from django.views.decorators.cache import cache_page
from . import views
app_name = "blog"
app_name = "blog" # 定义应用的命名空间
urlpatterns = [
# 首页路由
path(
r'',
views.IndexView.as_view(),
name='index'),
# 首页分页路由
path(
r'page/<int:page>/',
views.IndexView.as_view(),
name='index_page'),
# 文章详情页路由根据年月日和文章ID访问
path(
r'article/<int:year>/<int:month>/<int:day>/<int:article_id>.html',
views.ArticleDetailView.as_view(),
name='detailbyid'),
# 分类详情页路由
path(
r'category/<slug:category_name>.html',
views.CategoryDetailView.as_view(),
name='category_detail'),
# 分类详情页分页路由
path(
r'category/<slug:category_name>/<int:page>.html',
views.CategoryDetailView.as_view(),
name='category_detail_page'),
# 作者详情页路由
path(
r'author/<author_name>.html',
views.AuthorDetailView.as_view(),
name='author_detail'),
# 作者详情页分页路由
path(
r'author/<author_name>/<int:page>.html',
views.AuthorDetailView.as_view(),
name='author_detail_page'),
# 标签详情页路由
path(
r'tag/<slug:tag_name>.html',
views.TagDetailView.as_view(),
name='tag_detail'),
# 标签详情页分页路由
path(
r'tag/<slug:tag_name>/<int:page>.html',
views.TagDetailView.as_view(),
name='tag_detail_page'),
# 归档页面路由缓存1小时
path(
'archives.html',
cache_page(
60 * 60)(
views.ArchivesView.as_view()),
name='archives'),
# 友情链接页面路由
path(
'links.html',
views.LinkListView.as_view(),
name='links'),
# 文件上传路由
path(
r'upload',
views.fileupload,
name='upload'),
# 清理缓存路由
path(
r'clean',
views.clean_cache_view,
name='clean'),
]

@ -21,10 +21,13 @@ from djangoblog.plugin_manage import hooks
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
from djangoblog.utils import cache, get_blog_setting, get_sha256
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__) # 创建日志记录器
class ArticleListView(ListView):
"""
文章列表视图基类提供分页和缓存功能
"""
# template_name属性用于指定使用哪个模板进行渲染
template_name = 'blog/article_index.html'
@ -33,15 +36,21 @@ class ArticleListView(ListView):
# 页面类型,分类目录或标签列表等
page_type = ''
paginate_by = settings.PAGINATE_BY
page_kwarg = 'page'
link_type = LinkShowType.L
paginate_by = settings.PAGINATE_BY # 每页文章数量
page_kwarg = 'page' # 分页参数名
link_type = LinkShowType.L # 链接显示类型
def get_view_cache_key(self):
"""
获取视图缓存key
"""
return self.request.get['pages']
@property
def page_number(self):
"""
获取当前页码
"""
page_kwarg = self.page_kwarg
page = self.kwargs.get(
page_kwarg) or self.request.GET.get(page_kwarg) or 1
@ -63,7 +72,7 @@ class ArticleListView(ListView):
'''
缓存页面数据
:param cache_key: 缓存key
:return:
:return: 查询结果集
'''
value = cache.get(cache_key)
if value:
@ -78,36 +87,45 @@ class ArticleListView(ListView):
def get_queryset(self):
'''
重写默认从缓存获取数据
:return:
:return: 查询结果集
'''
key = self.get_queryset_cache_key()
value = self.get_queryset_from_cache(key)
return value
def get_context_data(self, **kwargs):
"""
添加额外的上下文数据
"""
kwargs['linktype'] = self.link_type
return super(ArticleListView, self).get_context_data(**kwargs)
class IndexView(ArticleListView):
'''
首页
首页视图
'''
# 友情链接类型
link_type = LinkShowType.I
def get_queryset_data(self):
"""
获取首页文章数据
"""
article_list = Article.objects.filter(type='a', status='p')
return article_list
def get_queryset_cache_key(self):
"""
获取首页缓存key
"""
cache_key = 'index_{page}'.format(page=self.page_number)
return cache_key
class ArticleDetailView(DetailView):
'''
文章详情页面
文章详情页面视图
'''
template_name = 'blog/article_detail.html'
model = Article
@ -115,11 +133,15 @@ class ArticleDetailView(DetailView):
context_object_name = "article"
def get_context_data(self, **kwargs):
comment_form = CommentForm()
"""
获取文章详情页的上下文数据
"""
comment_form = CommentForm() # 评论表单
article_comments = self.object.comment_list()
parent_comments = article_comments.filter(parent_comment=None)
blog_setting = get_blog_setting()
article_comments = self.object.comment_list() # 获取文章评论列表
parent_comments = article_comments.filter(parent_comment=None) # 获取父级评论
blog_setting = get_blog_setting() # 获取博客设置
# 对父级评论进行分页
paginator = Paginator(parent_comments, blog_setting.article_comment_count)
page = self.request.GET.get('comment_page', '1')
if not page.isnumeric():
@ -132,6 +154,7 @@ class ArticleDetailView(DetailView):
page = paginator.num_pages
p_comments = paginator.page(page)
# 计算评论分页的前后页链接
next_page = p_comments.next_page_number() if p_comments.has_next() else None
prev_page = p_comments.previous_page_number() if p_comments.has_previous() else None
@ -141,12 +164,15 @@ class ArticleDetailView(DetailView):
if prev_page:
kwargs[
'comment_prev_page_url'] = self.object.get_absolute_url() + f'?comment_page={prev_page}#commentlist-container'
# 添加评论相关数据到上下文
kwargs['form'] = comment_form
kwargs['article_comments'] = article_comments
kwargs['p_comments'] = p_comments
kwargs['comment_count'] = len(
article_comments) if article_comments else 0
# 添加上下篇文章信息
kwargs['next_article'] = self.object.next_article
kwargs['prev_article'] = self.object.prev_article
@ -154,7 +180,7 @@ class ArticleDetailView(DetailView):
article = self.object
# Action Hook, 通知插件"文章详情已获取"
hooks.run_action('after_article_body_get', article=article, request=self.request)
# # Filter Hook, 允许插件修改文章正文
# Filter Hook, 允许插件修改文章正文
article.body = hooks.apply_filters(ARTICLE_CONTENT_HOOK_NAME, article.body, article=article,
request=self.request)
@ -163,11 +189,14 @@ class ArticleDetailView(DetailView):
class CategoryDetailView(ArticleListView):
'''
分类目录列表
分类目录列表视图
'''
page_type = "分类目录归档"
def get_queryset_data(self):
"""
获取分类文章数据
"""
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
@ -180,6 +209,9 @@ class CategoryDetailView(ArticleListView):
return article_list
def get_queryset_cache_key(self):
"""
获取分类列表缓存key
"""
slug = self.kwargs['category_name']
category = get_object_or_404(Category, slug=slug)
categoryname = category.name
@ -189,7 +221,9 @@ class CategoryDetailView(ArticleListView):
return cache_key
def get_context_data(self, **kwargs):
"""
添加分类相关上下文数据
"""
categoryname = self.categoryname
try:
categoryname = categoryname.split('/')[-1]
@ -202,11 +236,14 @@ class CategoryDetailView(ArticleListView):
class AuthorDetailView(ArticleListView):
'''
作者详情页
作者详情页视图
'''
page_type = '作者文章归档'
def get_queryset_cache_key(self):
"""
获取作者文章列表缓存key
"""
from uuslug import slugify
author_name = slugify(self.kwargs['author_name'])
cache_key = 'author_{author_name}_{page}'.format(
@ -214,12 +251,18 @@ class AuthorDetailView(ArticleListView):
return cache_key
def get_queryset_data(self):
"""
获取作者文章数据
"""
author_name = self.kwargs['author_name']
article_list = Article.objects.filter(
author__username=author_name, type='a', status='p')
return article_list
def get_context_data(self, **kwargs):
"""
添加作者相关上下文数据
"""
author_name = self.kwargs['author_name']
kwargs['page_type'] = AuthorDetailView.page_type
kwargs['tag_name'] = author_name
@ -228,11 +271,14 @@ class AuthorDetailView(ArticleListView):
class TagDetailView(ArticleListView):
'''
标签列表页面
标签列表页面视图
'''
page_type = '分类标签归档'
def get_queryset_data(self):
"""
获取标签文章数据
"""
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
@ -242,6 +288,9 @@ class TagDetailView(ArticleListView):
return article_list
def get_queryset_cache_key(self):
"""
获取标签列表缓存key
"""
slug = self.kwargs['tag_name']
tag = get_object_or_404(Tag, slug=slug)
tag_name = tag.name
@ -251,6 +300,9 @@ class TagDetailView(ArticleListView):
return cache_key
def get_context_data(self, **kwargs):
"""
添加标签相关上下文数据
"""
# tag_name = self.kwargs['tag_name']
tag_name = self.name
kwargs['page_type'] = TagDetailView.page_type
@ -260,31 +312,49 @@ class TagDetailView(ArticleListView):
class ArchivesView(ArticleListView):
'''
文章归档页面
文章归档页面视图
'''
page_type = '文章归档'
paginate_by = None
page_kwarg = None
paginate_by = None # 不分页
page_kwarg = None # 无分页参数
template_name = 'blog/article_archives.html'
def get_queryset_data(self):
"""
获取所有已发布文章
"""
return Article.objects.filter(status='p').all()
def get_queryset_cache_key(self):
"""
获取归档页面缓存key
"""
cache_key = 'archives'
return cache_key
class LinkListView(ListView):
"""
友情链接列表视图
"""
model = Links
template_name = 'blog/links_list.html'
def get_queryset(self):
"""
获取启用的友情链接
"""
return Links.objects.filter(is_enable=True)
class EsSearchView(SearchView):
"""
Elasticsearch搜索视图
"""
def get_context(self):
"""
获取搜索上下文数据
"""
paginator, page = self.build_page()
context = {
"query": self.query,
@ -303,31 +373,37 @@ class EsSearchView(SearchView):
@csrf_exempt
def fileupload(request):
"""
该方法需自己写调用端来上传图片该方法仅提供图床功能
:param request:
:return:
文件上传视图该方法需自己写调用端来上传图片该方法仅提供图床功能
:param request: HTTP请求对象
:return: 上传文件的URL列表
"""
if request.method == 'POST':
sign = request.GET.get('sign', None)
# 验证签名,确保是授权上传
if not sign:
return HttpResponseForbidden()
if not sign == get_sha256(get_sha256(settings.SECRET_KEY)):
return HttpResponseForbidden()
response = []
# 处理上传的每个文件
for filename in request.FILES:
timestr = timezone.now().strftime('%Y/%m/%d')
imgextensions = ['jpg', 'png', 'jpeg', 'bmp']
fname = u''.join(str(filename))
isimage = len([i for i in imgextensions if fname.find(i) >= 0]) > 0
# 根据文件类型确定保存目录
base_dir = os.path.join(settings.STATICFILES, "files" if not isimage else "image", timestr)
if not os.path.exists(base_dir):
os.makedirs(base_dir)
# 生成安全的保存路径
savepath = os.path.normpath(os.path.join(base_dir, f"{uuid.uuid4().hex}{os.path.splitext(filename)[-1]}"))
if not savepath.startswith(base_dir):
return HttpResponse("only for post")
# 保存文件
with open(savepath, 'wb+') as wfile:
for chunk in request.FILES[filename].chunks():
wfile.write(chunk)
# 如果是图片,进行压缩优化
if isimage:
from PIL import Image
image = Image.open(savepath)
@ -344,6 +420,9 @@ def page_not_found_view(
request,
exception,
template_name='blog/error_page.html'):
"""
404页面不存在视图
"""
if exception:
logger.error(exception)
url = request.get_full_path()
@ -355,6 +434,9 @@ def page_not_found_view(
def server_error_view(request, template_name='blog/error_page.html'):
"""
500服务器错误视图
"""
return render(request,
template_name,
{'message': _('Sorry, the server is busy, please click the home page to see other?'),
@ -366,6 +448,9 @@ def permission_denied_view(
request,
exception,
template_name='blog/error_page.html'):
"""
403权限拒绝视图
"""
if exception:
logger.error(exception)
return render(
@ -375,5 +460,9 @@ def permission_denied_view(
def clean_cache_view(request):
"""
清理缓存视图
"""
cache.clear()
return HttpResponse('ok')

Loading…
Cancel
Save