jiang0110-Jan 2 months ago
commit 21b7b9bebf

@ -1,13 +0,0 @@
R
^R
:wq
x

@ -1,103 +0,0 @@
# 导入Django表单模块
from django import forms
# 导入Django默认用户管理类
from django.contrib.auth.admin import UserAdmin
# 导入用户修改表单
from django.contrib.auth.forms import UserChangeForm
# 导入用户名字段
from django.contrib.auth.forms import UsernameField
# 导入国际化翻译函数
from django.utils.translation import gettext_lazy as _
# 注册模型到管理后台
# 导入自定义用户模型
from .models import BlogUser
class BlogUserCreationForm(forms.ModelForm):
"""自定义用户创建表单,用于管理员后台创建用户"""
# 密码字段1 - 输入密码
password1 = forms.CharField(
label=_('password'), # 字段标签:密码
widget=forms.PasswordInput # 使用密码输入控件
)
# 密码字段2 - 确认密码
password2 = forms.CharField(
label=_('Enter password again'), # 字段标签:再次输入密码
widget=forms.PasswordInput # 使用密码输入控件
)
class Meta:
# 指定使用的模型
model = BlogUser
# 表单包含的字段:仅邮箱
fields = ('email',)
def clean_password2(self):
"""清理和验证密码确认字段"""
# 从已清理数据中获取密码1
password1 = self.cleaned_data.get("password1")
# 从已清理数据中获取密码2
password2 = self.cleaned_data.get("password2")
# 检查两个密码是否存在且匹配
if password1 and password2 and password1 != password2:
# 如果不匹配,抛出验证错误
raise forms.ValidationError(_("passwords do not match"))
# 返回验证通过的密码2
return password2
def save(self, commit=True):
"""保存用户实例,处理密码哈希"""
# 调用父类save方法但不立即提交到数据库
user = super().save(commit=False)
# 使用Django的密码哈希方法设置密码
user.set_password(self.cleaned_data["password1"])
# 如果设置为立即提交
if commit:
# 设置用户来源为管理员站点
user.source = 'adminsite'
# 保存用户到数据库
user.save()
# 返回用户实例
return user
class BlogUserChangeForm(UserChangeForm):
"""自定义用户信息修改表单"""
class Meta:
# 指定使用的模型
model = BlogUser
# 包含所有字段
fields = '__all__'
# 字段类映射,用户名使用特定字段类
field_classes = {'username': UsernameField}
def __init__(self, *args, **kwargs):
"""初始化表单"""
# 调用父类初始化方法
super().__init__(*args, **kwargs)
class BlogUserAdmin(UserAdmin):
"""自定义用户管理类配置Django管理后台的用户界面"""
# 指定修改用户时使用的表单
form = BlogUserChangeForm
# 指定创建用户时使用的表单
add_form = BlogUserCreationForm
# 列表页面显示的字段
list_display = (
'id', # 用户ID
'nickname', # 昵称
'username', # 用户名
'email', # 邮箱
'last_login', # 最后登录时间
'date_joined', # 注册时间
'source' # 用户来源
)
# 列表中可作为链接点击的字段
list_display_links = ('id', 'username')
# 默认排序字段按ID降序排列
ordering = ('-id',)

@ -1,9 +0,0 @@
# 导入Django应用配置基类
from django.apps import AppConfig
class AccountsConfig(AppConfig):
"""账户应用的配置类"""
# 应用的名称Python路径
name = 'accounts'

@ -1,194 +0,0 @@
# 导入Django表单模块
from django import forms
# 导入用户模型获取函数和密码验证工具
from django.contrib.auth import get_user_model, password_validation
# 导入Django内置认证表单
from django.contrib.auth.forms import AuthenticationForm, UserCreationForm
# 导入验证异常
from django.core.exceptions import ValidationError
# 导入表单控件
from django.forms import widgets
# 导入国际化翻译函数
from django.utils.translation import gettext_lazy as _
# 导入工具函数
from . import utils
# 导入用户模型
from .models import BlogUser
class LoginForm(AuthenticationForm):
"""用户登录表单继承自Django内置认证表单"""
def __init__(self, *args, **kwargs):
"""初始化表单,自定义字段控件"""
# 调用父类初始化方法
super(LoginForm, self).__init__(*args, **kwargs)
# 自定义用户名字段控件文本输入框带占位符和CSS类
self.fields['username'].widget = widgets.TextInput(
attrs={
'placeholder': "username", # 输入框占位符文本
"class": "form-control" # CSS类名用于样式
}
)
# 自定义密码字段控件密码输入框带占位符和CSS类
self.fields['password'].widget = widgets.PasswordInput(
attrs={
'placeholder': "password", # 输入框占位符文本
"class": "form-control" # CSS类名用于样式
}
)
class RegisterForm(UserCreationForm):
"""用户注册表单继承自Django内置用户创建表单"""
def __init__(self, *args, **kwargs):
"""初始化表单,自定义所有字段的控件"""
# 调用父类初始化方法
super(RegisterForm, self).__init__(*args, **kwargs)
# 自定义用户名字段控件
self.fields['username'].widget = widgets.TextInput(
attrs={
'placeholder': "username", # 占位符:用户名
"class": "form-control" # CSS类
}
)
# 自定义邮箱字段控件
self.fields['email'].widget = widgets.EmailInput(
attrs={
'placeholder': "email", # 占位符:邮箱
"class": "form-control" # CSS类
}
)
# 自定义密码字段控件
self.fields['password1'].widget = widgets.PasswordInput(
attrs={
'placeholder': "password", # 占位符:密码
"class": "form-control" # CSS类
}
)
# 自定义密码确认字段控件
self.fields['password2'].widget = widgets.PasswordInput(
attrs={
'placeholder': "repeat password", # 占位符:重复密码
"class": "form-control" # CSS类
}
)
def clean_email(self):
"""清理和验证邮箱字段,确保邮箱唯一性"""
# 从已清理数据中获取邮箱
email = self.cleaned_data['email']
# 检查数据库中是否已存在该邮箱
if get_user_model().objects.filter(email=email).exists():
# 如果邮箱已存在,抛出验证错误
raise ValidationError(_("email already exists"))
# 返回验证通过的邮箱
return email
class Meta:
"""表单元数据配置"""
# 指定表单关联的模型
model = get_user_model()
# 表单包含的字段:用户名和邮箱
fields = ("username", "email")
class ForgetPasswordForm(forms.Form):
"""忘记密码重置表单"""
# 新密码字段1
new_password1 = forms.CharField(
label=_("New password"), # 字段标签:新密码
widget=forms.PasswordInput( # 使用密码输入控件
attrs={
"class": "form-control", # CSS类
'placeholder': _("New password") # 占位符:新密码
}
),
)
# 新密码字段2 - 确认密码
new_password2 = forms.CharField(
label="确认密码", # 字段标签:确认密码(硬编码中文)
widget=forms.PasswordInput( # 使用密码输入控件
attrs={
"class": "form-control", # CSS类
'placeholder': _("Confirm password") # 占位符:确认密码
}
),
)
# 邮箱字段
email = forms.EmailField(
label='邮箱', # 字段标签:邮箱(硬编码中文)
widget=forms.TextInput( # 使用文本输入控件
attrs={
'class': 'form-control', # CSS类
'placeholder': _("Email") # 占位符:邮箱
}
),
)
# 验证码字段
code = forms.CharField(
label=_('Code'), # 字段标签:验证码
widget=forms.TextInput( # 使用文本输入控件
attrs={
'class': 'form-control', # CSS类
'placeholder': _("Code") # 占位符:验证码
}
),
)
def clean_new_password2(self):
"""清理和验证密码确认字段"""
# 从原始数据中获取新密码1不使用cleaned_data因为可能还未验证
password1 = self.data.get("new_password1")
# 从原始数据中获取新密码2
password2 = self.data.get("new_password2")
# 检查两个密码是否存在且匹配
if password1 and password2 and password1 != password2:
# 如果不匹配,抛出验证错误
raise ValidationError(_("passwords do not match"))
# 使用Django内置密码验证器验证密码强度
password_validation.validate_password(password2)
# 返回验证通过的密码
return password2
def clean_email(self):
"""清理和验证邮箱字段,确保邮箱已注册"""
# 从已清理数据中获取邮箱
user_email = self.cleaned_data.get("email")
# 检查数据库中是否存在该邮箱的用户
if not BlogUser.objects.filter(email=user_email).exists():
# 安全提示:这里会暴露邮箱是否注册,可根据安全需求修改
# 如果邮箱不存在,抛出验证错误
raise ValidationError(_("email does not exist"))
# 返回验证通过的邮箱
return user_email
def clean_code(self):
"""清理和验证验证码字段"""
# 从已清理数据中获取验证码
code = self.cleaned_data.get("code")
# 调用工具函数验证验证码是否正确
error = utils.verify(
email=self.cleaned_data.get("email"), # 传入邮箱
code=code, # 传入验证码
)
# 如果验证返回错误信息
if error:
# 抛出验证错误
raise ValidationError(error)
# 返回验证通过的验证码
return code
class ForgetPasswordCodeForm(forms.Form):
"""获取忘记密码验证码的表单"""
# 邮箱字段
email = forms.EmailField(
label=_('Email'), # 字段标签:邮箱
)

@ -1,72 +0,0 @@
# 导入Django抽象用户基类
from django.contrib.auth.models import AbstractUser
# 导入Django数据库模型
from django.db import models
# 导入URL反向解析函数
from django.urls import reverse
# 导入当前时间获取函数
from django.utils.timezone import now
# 导入国际化翻译函数
from django.utils.translation import gettext_lazy as _
# 导入获取当前站点的工具函数
from djangoblog.utils import get_current_site
# 在这里创建模型
class BlogUser(AbstractUser):
"""自定义用户模型继承自Django抽象用户基类"""
# 昵称字段最大长度100字符允许为空
nickname = models.CharField(
_('nick name'), # 字段显示名称:昵称
max_length=100, # 最大长度
blank=True # 允许为空
)
# 创建时间字段,默认值为当前时间
creation_time = models.DateTimeField(
_('creation time'), # 字段显示名称:创建时间
default=now # 默认值:当前时间
)
# 最后修改时间字段,默认值为当前时间
last_modify_time = models.DateTimeField(
_('last modify time'), # 字段显示名称:最后修改时间
default=now # 默认值:当前时间
)
# 用户来源字段最大长度100字符允许为空
source = models.CharField(
_('create source'), # 字段显示名称:创建来源
max_length=100, # 最大长度
blank=True # 允许为空
)
def get_absolute_url(self):
"""获取用户的绝对URL用于生成用户详情页链接"""
# 使用reverse反向解析URL传入用户名作为参数
return reverse(
'blog:author_detail', # URL模式名称
kwargs={'author_name': self.username} # URL参数作者用户名
)
def __str__(self):
"""对象的字符串表示,返回邮箱地址"""
return self.email
def get_full_url(self):
"""获取用户的完整URL包含域名"""
# 获取当前站点域名
site = get_current_site().domain
# 构建完整URLhttps://域名 + 用户详情页路径
url = "https://{site}{path}".format(
site=site, # 站点域名
path=self.get_absolute_url() # 用户详情页路径
)
# 返回完整URL
return url
class Meta:
"""模型的元数据配置"""
ordering = ['-id'] # 默认排序按ID降序排列
verbose_name = _('user') # 单数显示名称:用户
verbose_name_plural = verbose_name # 复数显示名称:与单数相同
get_latest_by = 'id' # 获取最新记录的依据字段ID

@ -1,294 +0,0 @@
# 导入Django测试客户端、请求工厂、测试用例
from django.test import Client, RequestFactory, TestCase
# 导入URL反向解析
from django.urls import reverse
# 导入时区工具
from django.utils import timezone
# 导入延迟翻译函数
from django.utils.translation import gettext_lazy as _
# 导入账户模型
from accounts.models import BlogUser
# 导入博客模型
from blog.models import Article, Category
# 导入项目工具函数
from djangoblog.utils import *
# 导入当前应用的工具函数
from . import utils
# 在这里创建测试
class AccountTest(TestCase):
"""账户功能测试类"""
def setUp(self):
"""测试前置设置,每个测试方法执行前都会调用"""
# 创建测试客户端
self.client = Client()
# 创建请求工厂
self.factory = RequestFactory()
# 创建测试用户
self.blog_user = BlogUser.objects.create_user(
username="test", # 用户名
email="admin@admin.com", # 邮箱
password="12345678" # 密码
)
# 设置新测试密码
self.new_test = "xxx123--="
def test_validate_account(self):
"""测试账户验证功能"""
# 获取当前站点
site = get_current_site().domain
# 创建超级用户
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com", # 邮箱
username="liangliangyy1", # 用户名
password="qwer!@#$ggg" # 密码
)
# 从数据库获取刚创建的用户
testuser = BlogUser.objects.get(username='liangliangyy1')
# 尝试登录
loginresult = self.client.login(
username='liangliangyy1', # 用户名
password='qwer!@#$ggg' # 密码
)
# 断言登录成功
self.assertEqual(loginresult, True)
# 访问管理员页面
response = self.client.get('/admin/')
# 断言页面访问成功
self.assertEqual(response.status_code, 200)
# 创建分类
category = Category()
category.name = "categoryaaa" # 分类名称
category.creation_time = timezone.now() # 创建时间
category.last_modify_time = timezone.now() # 最后修改时间
category.save()
# 创建文章
article = Article()
article.title = "nicetitleaaa" # 文章标题
article.body = "nicecontentaaa" # 文章内容
article.author = user # 文章作者
article.category = category # 文章分类
article.type = 'a' # 文章类型
article.status = 'p' # 文章状态:发布
article.save()
# 访问文章管理页面
response = self.client.get(article.get_admin_url())
# 断言页面访问成功
self.assertEqual(response.status_code, 200)
def test_validate_register(self):
"""测试用户注册功能"""
# 断言注册前邮箱不存在
self.assertEquals(
0, len(BlogUser.objects.filter(email='user123@user.com')))
# 发送注册POST请求
response = self.client.post(reverse('account:register'), {
'username': 'user1233', # 用户名
'email': 'user123@user.com', # 邮箱
'password1': 'password123!q@wE#R$T', # 密码
'password2': 'password123!q@wE#R$T', # 确认密码
})
# 断言注册后邮箱存在
self.assertEquals(
1, len(BlogUser.objects.filter(email='user123@user.com')))
# 获取刚注册的用户
user = BlogUser.objects.filter(email='user123@user.com')[0]
# 生成验证签名
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
# 构建验证URL路径
path = reverse('accounts:result')
# 构建完整验证URL
url = '{path}?type=validation&id={id}&sign={sign}'.format(
path=path, # 路径
id=user.id, # 用户ID
sign=sign # 签名
)
# 访问验证URL
response = self.client.get(url)
# 断言页面访问成功
self.assertEqual(response.status_code, 200)
# 登录用户
self.client.login(username='user1233', password='password123!q@wE#R$T')
# 获取用户并设置为管理员
user = BlogUser.objects.filter(email='user123@user.com')[0]
user.is_superuser = True # 设置为超级用户
user.is_staff = True # 设置为工作人员
user.save()
# 删除侧边栏缓存
delete_sidebar_cache()
# 创建分类
category = Category()
category.name = "categoryaaa" # 分类名称
category.creation_time = timezone.now() # 创建时间
category.last_modify_time = timezone.now() # 最后修改时间
category.save()
# 创建文章
article = Article()
article.category = category # 文章分类
article.title = "nicetitle333" # 文章标题
article.body = "nicecontentttt" # 文章内容
article.author = user # 文章作者
article.type = 'a' # 文章类型
article.status = 'p' # 文章状态:发布
article.save()
# 访问文章管理页面
response = self.client.get(article.get_admin_url())
# 断言页面访问成功
self.assertEqual(response.status_code, 200)
# 登出用户
response = self.client.get(reverse('account:logout'))
# 断言登出成功(重定向状态码)
self.assertIn(response.status_code, [301, 302, 200])
# 再次访问文章管理页面(应该被重定向到登录页)
response = self.client.get(article.get_admin_url())
# 断言被重定向
self.assertIn(response.status_code, [301, 302, 200])
# 使用错误密码尝试登录
response = self.client.post(reverse('account:login'), {
'username': 'user1233', # 用户名
'password': 'password123' # 错误密码
})
# 断言登录失败(重定向状态码)
self.assertIn(response.status_code, [301, 302, 200])
# 再次访问文章管理页面(应该仍然被重定向)
response = self.client.get(article.get_admin_url())
# 断言被重定向
self.assertIn(response.status_code, [301, 302, 200])
def test_verify_email_code(self):
"""测试邮箱验证码功能"""
# 测试邮箱
to_email = "admin@admin.com"
# 生成验证码
code = generate_code()
# 设置验证码到缓存
utils.set_code(to_email, code)
# 发送验证邮件
utils.send_verify_email(to_email, code)
# 验证正确验证码
err = utils.verify("admin@admin.com", code)
# 断言验证成功返回None
self.assertEqual(err, None)
# 验证错误验证码
err = utils.verify("admin@123.com", code)
# 断言验证失败(返回错误信息字符串)
self.assertEqual(type(err), str)
def test_forget_password_email_code_success(self):
"""测试成功获取忘记密码验证码"""
# 发送获取验证码的POST请求
resp = self.client.post(
path=reverse("account:forget_password_code"), # URL路径
data=dict(email="admin@admin.com") # 请求数据:邮箱
)
# 断言响应状态码为200
self.assertEqual(resp.status_code, 200)
# 断言响应内容为"ok"
self.assertEqual(resp.content.decode("utf-8"), "ok")
def test_forget_password_email_code_fail(self):
"""测试获取忘记密码验证码失败情况"""
# 发送空数据的POST请求
resp = self.client.post(
path=reverse("account:forget_password_code"), # URL路径
data=dict() # 空数据
)
# 断言返回错误信息
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
# 发送无效邮箱的POST请求
resp = self.client.post(
path=reverse("account:forget_password_code"), # URL路径
data=dict(email="admin@com") # 无效邮箱格式
)
# 断言返回错误信息
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
def test_forget_password_email_success(self):
"""测试成功重置密码"""
# 生成验证码
code = generate_code()
# 设置验证码到缓存
utils.set_code(self.blog_user.email, code)
# 准备请求数据
data = dict(
new_password1=self.new_test, # 新密码
new_password2=self.new_test, # 确认密码
email=self.blog_user.email, # 邮箱
code=code, # 验证码
)
# 发送重置密码的POST请求
resp = self.client.post(
path=reverse("account:forget_password"), # URL路径
data=data # 请求数据
)
# 断言重定向响应状态码302
self.assertEqual(resp.status_code, 302)
# 验证用户密码是否修改成功
blog_user = BlogUser.objects.filter(
email=self.blog_user.email,
).first() # 类型注解BlogUser
# 断言用户存在
self.assertNotEqual(blog_user, None)
# 断言新密码验证通过
self.assertEqual(blog_user.check_password(data["new_password1"]), True)
def test_forget_password_email_not_user(self):
"""测试重置密码时邮箱不存在的情况"""
# 准备请求数据(不存在的邮箱)
data = dict(
new_password1=self.new_test, # 新密码
new_password2=self.new_test, # 确认密码
email="123@123.com", # 不存在的邮箱
code="123456", # 验证码
)
# 发送重置密码的POST请求
resp = self.client.post(
path=reverse("account:forget_password"), # URL路径
data=data # 请求数据
)
# 断言返回表单页面状态码200
self.assertEqual(resp.status_code, 200)
def test_forget_password_email_code_error(self):
"""测试重置密码时验证码错误的情况"""
# 生成验证码
code = generate_code()
# 设置验证码到缓存
utils.set_code(self.blog_user.email, code)
# 准备请求数据(错误的验证码)
data = dict(
new_password1=self.new_test, # 新密码
new_password2=self.new_test, # 确认密码
email=self.blog_user.email, # 邮箱
code="111111", # 错误的验证码
)
# 发送重置密码的POST请求
resp = self.client.post(
path=reverse("account:forget_password"), # URL路径
data=data # 请求数据
)
# 断言返回表单页面状态码200
self.assertEqual(resp.status_code, 200)

@ -1,53 +0,0 @@
# 导入URL路径函数
from django.urls import path
# 导入正则URL路径函数兼容老版本
from django.urls import re_path
# 导入当前应用的视图
from . import views
# 导入登录表单
from .forms import LoginForm
# 应用命名空间用于URL反向解析
app_name = "accounts"
# URL模式列表
urlpatterns = [
# 登录URL - 使用正则表达式匹配 /login/ 路径
re_path(r'^login/$',
# 使用LoginView视图类登录成功后重定向到首页
views.LoginView.as_view(success_url='/'),
name='login', # URL名称login
# 传入额外参数:指定认证表单类
kwargs={'authentication_form': LoginForm}),
# 注册URL - 使用正则表达式匹配 /register/ 路径
re_path(r'^register/$',
# 使用RegisterView视图类注册成功后重定向到首页
views.RegisterView.as_view(success_url="/"),
name='register'), # URL名称register
# 登出URL - 使用正则表达式匹配 /logout/ 路径
re_path(r'^logout/$',
# 使用LogoutView视图类
views.LogoutView.as_view(),
name='logout'), # URL名称logout
# 账户结果页面URL - 使用path匹配固定路径
path(r'account/result.html',
# 使用account_result函数视图
views.account_result,
name='result'), # URL名称result
# 忘记密码URL - 使用正则表达式匹配 /forget_password/ 路径
re_path(r'^forget_password/$',
# 使用ForgetPasswordView视图类
views.ForgetPasswordView.as_view(),
name='forget_password'), # URL名称forget_password
# 获取忘记密码验证码URL - 使用正则表达式匹配 /forget_password_code/ 路径
re_path(r'^forget_password_code/$',
# 使用ForgetPasswordEmailCode视图类
views.ForgetPasswordEmailCode.as_view(),
name='forget_password_code'), # URL名称forget_password_code
]

@ -1,42 +0,0 @@
# 导入获取用户模型的函数
from django.contrib.auth import get_user_model
# 导入模型后端认证基类
from django.contrib.auth.backends import ModelBackend
class EmailOrUsernameModelBackend(ModelBackend):
"""
自定义认证后端允许使用用户名或邮箱登录
继承自Django的ModelBackend
"""
def authenticate(self, request, username=None, password=None, **kwargs):
"""用户认证方法"""
# 检查用户名中是否包含@符号(判断是否为邮箱)
if '@' in username:
# 如果是邮箱,设置查询参数为邮箱
kwargs = {'email': username}
else:
# 如果是用户名,设置查询参数为用户名
kwargs = {'username': username}
try:
# 根据查询参数获取用户对象
user = get_user_model().objects.get(**kwargs)
# 检查密码是否正确
if user.check_password(password):
# 密码正确,返回用户对象
return user
# 捕获用户不存在的异常
except get_user_model().DoesNotExist:
# 用户不存在返回None
return None
def get_user(self, username):
"""根据用户ID获取用户对象"""
try:
# 根据主键用户ID获取用户对象
return get_user_model().objects.get(pk=username)
# 捕获用户不存在的异常
except get_user_model().DoesNotExist:
# 用户不存在返回None
return None

@ -1,70 +0,0 @@
# 导入类型提示模块
import typing
# 导入时间间隔类
from datetime import timedelta
# 导入Django缓存框架
from django.core.cache import cache
# 导入国际化翻译函数
from django.utils.translation import gettext
# 导入延迟翻译函数
from django.utils.translation import gettext_lazy as _
# 导入发送邮件的工具函数
from djangoblog.utils import send_email
# 验证码有效期5分钟
_code_ttl = timedelta(minutes=5)
def send_verify_email(to_mail: str, code: str, subject: str = _("Verify Email")):
"""发送验证邮件,用于密码重置等场景
Args:
to_mail: 接收邮件的邮箱地址
subject: 邮件主题默认为"验证邮箱"
code: 验证码内容
"""
# 构建邮件HTML内容包含验证码信息
html_content = _(
# 翻译文本:您正在重置密码,验证码是:{code}5分钟内有效请妥善保管
"You are resetting the password, the verification code is%(code)s, valid within 5 minutes, please keep it "
"properly"
) % {'code': code} # 将code插入到格式化字符串中
# 调用发送邮件函数
send_email([to_mail], subject, html_content)
def verify(email: str, code: str) -> typing.Optional[str]:
"""验证验证码是否有效
Args:
email: 请求验证的邮箱地址
code: 用户输入的验证码
Return:
如果验证失败返回错误信息字符串验证成功返回None
Note:
这里的错误处理不太合理应该采用raise抛出异常
否则调用方也需要对error进行处理
"""
# 从缓存中获取该邮箱对应的验证码
cache_code = get_code(email)
# 比较缓存中的验证码和用户输入的验证码
if cache_code != code:
# 如果不匹配,返回错误信息
return gettext("Verification code error")
# 验证成功返回None
def set_code(email: str, code: str):
"""将验证码设置到缓存中"""
# 使用cache.set方法key为邮箱value为验证码设置过期时间
cache.set(email, code, _code_ttl.seconds)
def get_code(email: str) -> typing.Optional[str]:
"""从缓存中获取验证码"""
# 使用cache.get方法根据邮箱获取验证码
return cache.get(email)

@ -1,327 +0,0 @@
# 导入日志模块
import logging
# 导入延迟翻译函数
from django.utils.translation import gettext_lazy as _
# 导入Django设置
from django.conf import settings
# 导入Django认证框架
from django.contrib import auth
# 导入重定向字段名常量
from django.contrib.auth import REDIRECT_FIELD_NAME
# 导入获取用户模型的函数
from django.contrib.auth import get_user_model
# 导入登出函数
from django.contrib.auth import logout
# 导入Django内置认证表单
from django.contrib.auth.forms import AuthenticationForm
# 导入密码哈希函数
from django.contrib.auth.hashers import make_password
# 导入HTTP响应重定向和禁止访问响应
from django.http import HttpResponseRedirect, HttpResponseForbidden
# 导入HTTP请求类型
from django.http.request import HttpRequest
# 导入HTTP响应类型
from django.http.response import HttpResponse
# 导入快捷函数获取对象或404错误
from django.shortcuts import get_object_or_404
# 导入快捷函数:渲染模板
from django.shortcuts import render
# 导入URL反向解析
from django.urls import reverse
# 导入方法装饰器
from django.utils.decorators import method_decorator
# 导入URL安全验证函数
from django.utils.http import url_has_allowed_host_and_scheme
# 导入基于类的视图基类
from django.views import View
# 导入禁止缓存装饰器
from django.views.decorators.cache import never_cache
# 导入CSRF保护装饰器
from django.views.decorators.csrf import csrf_protect
# 导入敏感参数保护装饰器
from django.views.decorators.debug import sensitive_post_parameters
# 导入通用视图类
from django.views.generic import FormView, RedirectView
# 导入项目工具函数
from djangoblog.utils import send_email, get_sha256, get_current_site, generate_code, delete_sidebar_cache
# 导入当前应用的工具函数
from . import utils
# 导入自定义表单
from .forms import RegisterForm, LoginForm, ForgetPasswordForm, ForgetPasswordCodeForm
# 导入用户模型
from .models import BlogUser
# 获取当前模块的日志器
logger = logging.getLogger(__name__)
# 在这里创建视图
class RegisterView(FormView):
"""用户注册视图"""
# 指定使用的表单类
form_class = RegisterForm
# 指定使用的模板
template_name = 'account/registration_form.html'
@method_decorator(csrf_protect) # CSRF保护装饰器
def dispatch(self, *args, **kwargs):
"""处理请求分发"""
# 调用父类的dispatch方法
return super(RegisterView, self).dispatch(*args, **kwargs)
def form_valid(self, form):
"""处理表单验证通过的情况"""
# 检查表单是否有效
if form.is_valid():
# 保存表单数据但不提交到数据库commit=False
user = form.save(False)
# 设置用户为未激活状态(需要邮箱验证)
user.is_active = False
# 设置用户来源为注册页面
user.source = 'Register'
# 保存用户到数据库
user.save(True)
# 获取当前站点域名
site = get_current_site().domain
# 生成验证签名:对密钥+用户ID进行双重SHA256哈希
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
# 如果是调试模式,使用本地地址
if settings.DEBUG:
site = '127.0.0.1:8000'
# 获取结果页面的URL路径
path = reverse('account:result')
# 构建完整的验证URL
url = "http://{site}{path}?type=validation&id={id}&sign={sign}".format(
site=site, # 站点地址
path=path, # 结果页面路径
id=user.id, # 用户ID
sign=sign # 验证签名
)
# 构建邮件内容
content = """
<p>请点击下面链接验证您的邮箱</p>
<a href="{url}" rel="bookmark">{url}</a>
再次感谢您
<br />
如果上面链接无法打开请将此链接复制至浏览器
{url}
""".format(url=url)
# 发送验证邮件
send_email(
emailto=[user.email], # 收件人邮箱
title='验证您的电子邮箱', # 邮件标题
content=content # 邮件内容
)
# 构建注册结果页面URL
url = reverse('accounts:result') + '?type=register&id=' + str(user.id)
# 重定向到结果页面
return HttpResponseRedirect(url)
else:
# 表单无效,重新渲染表单并显示错误
return self.render_to_response({'form': form})
class LogoutView(RedirectView):
"""用户登出视图"""
# 登出后重定向的URL
url = '/login/'
@method_decorator(never_cache) # 禁止缓存装饰器
def dispatch(self, request, *args, **kwargs):
"""处理请求分发"""
# 调用父类的dispatch方法
return super(LogoutView, self).dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
"""处理GET请求登出操作"""
# 调用Django登出函数
logout(request)
# 删除侧边栏缓存
delete_sidebar_cache()
# 调用父类的get方法进行重定向
return super(LogoutView, self).get(request, *args, **kwargs)
class LoginView(FormView):
"""用户登录视图"""
# 指定使用的表单类
form_class = LoginForm
# 指定使用的模板
template_name = 'account/login.html'
# 登录成功后的重定向URL
success_url = '/'
# 重定向字段名
redirect_field_name = REDIRECT_FIELD_NAME
# 登录会话保持时间:一个月(秒数)
login_ttl = 2626560
# 方法装饰器保护敏感参数、CSRF保护、禁止缓存
@method_decorator(sensitive_post_parameters('password'))
@method_decorator(csrf_protect)
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
"""处理请求分发"""
# 调用父类的dispatch方法
return super(LoginView, self).dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
"""获取模板上下文数据"""
# 从GET参数中获取重定向URL
redirect_to = self.request.GET.get(self.redirect_field_name)
# 如果重定向URL为空设置为首页
if redirect_to is None:
redirect_to = '/'
# 将重定向URL添加到上下文数据中
kwargs['redirect_to'] = redirect_to
# 调用父类方法获取基础上下文数据
return super(LoginView, self).get_context_data(**kwargs)
def form_valid(self, form):
"""处理表单验证通过的情况"""
# 创建认证表单实例使用POST数据
form = AuthenticationForm(data=self.request.POST, request=self.request)
# 检查表单是否有效
if form.is_valid():
# 删除侧边栏缓存
delete_sidebar_cache()
# 记录日志
logger.info(self.redirect_field_name)
# 登录用户
auth.login(self.request, form.get_user())
# 如果用户选择了"记住我"
if self.request.POST.get("remember"):
# 设置会话过期时间为一个月
self.request.session.set_expiry(self.login_ttl)
# 调用父类的form_valid方法会处理重定向
return super(LoginView, self).form_valid(form)
else:
# 表单无效,重新渲染表单并显示错误
return self.render_to_response({'form': form})
def get_success_url(self):
"""获取登录成功后的重定向URL"""
# 从POST数据中获取重定向URL
redirect_to = self.request.POST.get(self.redirect_field_name)
# 验证重定向URL是否安全同源策略
if not url_has_allowed_host_and_scheme(
url=redirect_to,
allowed_hosts=[self.request.get_host()]):
# 如果不安全使用默认的成功URL
redirect_to = self.success_url
# 返回重定向URL
return redirect_to
def account_result(request):
"""账户操作结果页面视图函数"""
# 从GET参数获取操作类型
type = request.GET.get('type')
# 从GET参数获取用户ID
id = request.GET.get('id')
# 根据ID获取用户对象如果不存在返回404
user = get_object_or_404(get_user_model(), id=id)
# 记录日志
logger.info(type)
# 如果用户已激活,重定向到首页
if user.is_active:
return HttpResponseRedirect('/')
# 检查操作类型是否为注册或验证
if type and type in ['register', 'validation']:
# 如果是注册操作
if type == 'register':
# 设置注册成功的内容和标题
content = '''
恭喜您注册成功一封验证邮件已经发送到您的邮箱请验证您的邮箱后登录本站
'''
title = '注册成功'
else:
# 生成验证签名
c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
# 从GET参数获取签名
sign = request.GET.get('sign')
# 验证签名是否正确
if sign != c_sign:
# 签名错误,返回禁止访问
return HttpResponseForbidden()
# 激活用户账户
user.is_active = True
# 保存用户
user.save()
# 设置验证成功的内容和标题
content = '''
恭喜您已经成功的完成邮箱验证您现在可以使用您的账号来登录本站
'''
title = '验证成功'
# 渲染结果页面
return render(request, 'account/result.html', {
'title': title, # 页面标题
'content': content # 页面内容
})
else:
# 无效的操作类型,重定向到首页
return HttpResponseRedirect('/')
class ForgetPasswordView(FormView):
"""忘记密码重置视图"""
# 指定使用的表单类
form_class = ForgetPasswordForm
# 指定使用的模板
template_name = 'account/forget_password.html'
def form_valid(self, form):
"""处理表单验证通过的情况"""
# 检查表单是否有效
if form.is_valid():
# 根据邮箱获取用户对象
blog_user = BlogUser.objects.filter(
email=form.cleaned_data.get("email")
).get()
# 使用新密码的哈希值更新用户密码
blog_user.password = make_password(form.cleaned_data["new_password2"])
# 保存用户
blog_user.save()
# 重定向到登录页面
return HttpResponseRedirect('/login/')
else:
# 表单无效,重新渲染表单并显示错误
return self.render_to_response({'form': form})
class ForgetPasswordEmailCode(View):
"""获取忘记密码验证码的API视图"""
def post(self, request: HttpRequest):
"""处理POST请求"""
# 创建表单实例并验证数据
form = ForgetPasswordCodeForm(request.POST)
# 检查表单是否有效
if not form.is_valid():
# 表单无效,返回错误响应
return HttpResponse("错误的邮箱")
# 从已验证数据中获取邮箱
to_email = form.cleaned_data["email"]
# 生成验证码
code = generate_code()
# 发送验证邮件
utils.send_verify_email(to_email, code)
# 将验证码保存到缓存
utils.set_code(to_email, code)
# 返回成功响应
return HttpResponse("ok")

@ -1,8 +1,3 @@
"""
LJX: Django后台管理配置模块
负责blog应用中各模型在Django admin后台的显示和操作配置
包括文章分类标签友情链接侧边栏等模型的后台管理界面设置
"""
from django import forms
from django.contrib import admin
from django.contrib.auth import get_user_model
@ -15,7 +10,6 @@ from .models import Article
class ArticleForm(forms.ModelForm):
"""LJX: 文章表单类,用于后台文章编辑"""
# body = forms.CharField(widget=AdminPagedownWidget())
class Meta:
@ -24,26 +18,21 @@ class ArticleForm(forms.ModelForm):
def makr_article_publish(modeladmin, request, queryset):
"""LJX: 批量发布文章的管理动作"""
queryset.update(status='p')
def draft_article(modeladmin, request, queryset):
"""LJX: 批量将文章设为草稿的管理动作"""
queryset.update(status='d')
def close_article_commentstatus(modeladmin, request, queryset):
"""LJX: 批量关闭文章评论的管理动作"""
queryset.update(comment_status='c')
def open_article_commentstatus(modeladmin, request, queryset):
"""LJX: 批量开启文章评论的管理动作"""
queryset.update(comment_status='o')
# LJX: 设置管理动作的描述信息
makr_article_publish.short_description = _('Publish selected articles')
draft_article.short_description = _('Draft selected articles')
close_article_commentstatus.short_description = _('Close article comments')
@ -51,33 +40,31 @@ open_article_commentstatus.short_description = _('Open article comments')
class ArticlelAdmin(admin.ModelAdmin):
"""LJX: 文章模型的后台管理配置"""
list_per_page = 20 # LJX: 每页显示20条记录
search_fields = ('body', 'title') # LJX: 可搜索的字段
form = ArticleForm # LJX: 使用自定义表单
list_per_page = 20
search_fields = ('body', 'title')
form = ArticleForm
list_display = (
'id',
'title',
'author',
'link_to_category', # LJX: 自定义字段显示分类链接
'link_to_category',
'creation_time',
'views',
'status',
'type',
'article_order')
list_display_links = ('id', 'title') # LJX: 可点击的字段
list_filter = ('status', 'type', 'category') # LJX: 右侧过滤器
filter_horizontal = ('tags',) # LJX: 水平选择器用于多对多字段
exclude = ('creation_time', 'last_modify_time') # LJX: 排除的字段
view_on_site = True # LJX: 显示"在站点查看"按钮
actions = [ # LJX: 可用的批量动作
list_display_links = ('id', 'title')
list_filter = ('status', 'type', 'category')
filter_horizontal = ('tags',)
exclude = ('creation_time', 'last_modify_time')
view_on_site = True
actions = [
makr_article_publish,
draft_article,
close_article_commentstatus,
open_article_commentstatus]
def link_to_category(self, obj):
"""LJX: 自定义方法,显示分类名称并链接到分类编辑页面"""
info = (obj.category._meta.app_label, obj.category._meta.model_name)
link = reverse('admin:%s_%s_change' % info, args=(obj.category.id,))
return format_html(u'<a href="%s">%s</a>' % (link, obj.category.name))
@ -85,20 +72,17 @@ class ArticlelAdmin(admin.ModelAdmin):
link_to_category.short_description = _('category')
def get_form(self, request, obj=None, **kwargs):
"""LJX: 重写获取表单方法,限制作者只能选择超级用户"""
form = super(ArticlelAdmin, self).get_form(request, obj, **kwargs)
form.base_fields['author'].queryset = get_user_model(
).objects.filter(is_superuser=True)
return form
def save_model(self, request, obj, form, change):
"""LJX: 重写保存模型方法"""
super(ArticlelAdmin, self).save_model(request, obj, form, change)
def get_view_on_site_url(self, obj=None):
"""LJX: 获取在站点查看的URL"""
if obj:
url = obj.get_full_url() # LJX: 使用文章的完整URL
url = obj.get_full_url()
return url
else:
from djangoblog.utils import get_current_site
@ -107,27 +91,22 @@ class ArticlelAdmin(admin.ModelAdmin):
class TagAdmin(admin.ModelAdmin):
"""LJX: 标签模型的后台管理配置"""
exclude = ('slug', 'last_mod_time', 'creation_time') # LJX: 排除自动生成的字段
exclude = ('slug', 'last_mod_time', 'creation_time')
class CategoryAdmin(admin.ModelAdmin):
"""LJX: 分类模型的后台管理配置"""
list_display = ('name', 'parent_category', 'index') # LJX: 列表显示字段
exclude = ('slug', 'last_mod_time', 'creation_time') # LJX: 排除自动生成的字段
list_display = ('name', 'parent_category', 'index')
exclude = ('slug', 'last_mod_time', 'creation_time')
class LinksAdmin(admin.ModelAdmin):
"""LJX: 友情链接模型的后台管理配置"""
exclude = ('last_mod_time', 'creation_time') # LJX: 排除时间字段
exclude = ('last_mod_time', 'creation_time')
class SideBarAdmin(admin.ModelAdmin):
"""LJX: 侧边栏模型的后台管理配置"""
list_display = ('name', 'content', 'is_enable', 'sequence') # LJX: 列表显示字段
exclude = ('last_mod_time', 'creation_time') # LJX: 排除时间字段
list_display = ('name', 'content', 'is_enable', 'sequence')
exclude = ('last_mod_time', 'creation_time')
class BlogSettingsAdmin(admin.ModelAdmin):
"""LJX: 博客设置模型的后台管理配置"""
pass
pass

@ -0,0 +1,5 @@
from django.apps import AppConfig
class BlogConfig(AppConfig):
name = 'blog'

@ -0,0 +1,43 @@
import logging
from django.utils import timezone
from djangoblog.utils import cache, get_blog_setting
from .models import Category, Article
logger = logging.getLogger(__name__)
def seo_processor(requests):
key = 'seo_processor'
value = cache.get(key)
if value:
return value
else:
logger.info('set processor cache.')
setting = get_blog_setting()
value = {
'SITE_NAME': setting.site_name,
'SHOW_GOOGLE_ADSENSE': setting.show_google_adsense,
'GOOGLE_ADSENSE_CODES': setting.google_adsense_codes,
'SITE_SEO_DESCRIPTION': setting.site_seo_description,
'SITE_DESCRIPTION': setting.site_description,
'SITE_KEYWORDS': setting.site_keywords,
'SITE_BASE_URL': requests.scheme + '://' + requests.get_host() + '/',
'ARTICLE_SUB_LENGTH': setting.article_sub_length,
'nav_category_list': Category.objects.all(),
'nav_pages': Article.objects.filter(
type='p',
status='p'),
'OPEN_SITE_COMMENT': setting.open_site_comment,
'BEIAN_CODE': setting.beian_code,
'ANALYTICS_CODE': setting.analytics_code,
"BEIAN_CODE_GONGAN": setting.gongan_beiancode,
"SHOW_GONGAN_CODE": setting.show_gongan_code,
"CURRENT_YEAR": timezone.now().year,
"GLOBAL_HEADER": setting.global_header,
"GLOBAL_FOOTER": setting.global_footer,
"COMMENT_NEED_REVIEW": setting.comment_need_review,
}
cache.set(key, value, 60 * 60 * 10)
return value

@ -1,8 +1,3 @@
"""
LJX: Elasticsearch文档定义模块
定义Elasticsearch索引的文档结构和数据模型
用于博客文章的全文搜索和性能监控数据的存储
"""
import time
import elasticsearch.client
@ -12,11 +7,9 @@ from elasticsearch_dsl.connections import connections
from blog.models import Article
# LJX: 检查是否启用Elasticsearch
ELASTICSEARCH_ENABLED = hasattr(settings, 'ELASTICSEARCH_DSL')
if ELASTICSEARCH_ENABLED:
# LJX: 创建Elasticsearch连接
connections.create_connection(
hosts=[settings.ELASTICSEARCH_DSL['default']['hosts']])
from elasticsearch import Elasticsearch
@ -26,9 +19,8 @@ if ELASTICSEARCH_ENABLED:
c = IngestClient(es)
try:
c.get_pipeline('geoip') # LJX: 检查geoip管道是否存在
c.get_pipeline('geoip')
except elasticsearch.exceptions.NotFoundError:
# LJX: 创建geoip处理管道用于IP地址地理位置解析
c.put_pipeline('geoip', body='''{
"description" : "Add geoip info",
"processors" : [
@ -42,84 +34,72 @@ if ELASTICSEARCH_ENABLED:
class GeoIp(InnerDoc):
"""LJX: IP地理位置信息内嵌文档"""
continent_name = Keyword() # LJX: 大洲名称
country_iso_code = Keyword() # LJX: 国家ISO代码
country_name = Keyword() # LJX: 国家名称
location = GeoPoint() # LJX: 地理位置坐标
continent_name = Keyword()
country_iso_code = Keyword()
country_name = Keyword()
location = GeoPoint()
class UserAgentBrowser(InnerDoc):
"""LJX: 用户代理浏览器信息"""
Family = Keyword() # LJX: 浏览器家族
Version = Keyword() # LJX: 浏览器版本
Family = Keyword()
Version = Keyword()
class UserAgentOS(UserAgentBrowser):
"""LJX: 用户代理操作系统信息"""
pass
class UserAgentDevice(InnerDoc):
"""LJX: 用户代理设备信息"""
Family = Keyword() # LJX: 设备家族
Brand = Keyword() # LJX: 设备品牌
Model = Keyword() # LJX: 设备型号
Family = Keyword()
Brand = Keyword()
Model = Keyword()
class UserAgent(InnerDoc):
"""LJX: 完整的用户代理信息"""
browser = Object(UserAgentBrowser, required=False) # LJX: 浏览器信息
os = Object(UserAgentOS, required=False) # LJX: 操作系统信息
device = Object(UserAgentDevice, required=False) # LJX: 设备信息
string = Text() # LJX: 原始用户代理字符串
is_bot = Boolean() # LJX: 是否是爬虫
browser = Object(UserAgentBrowser, required=False)
os = Object(UserAgentOS, required=False)
device = Object(UserAgentDevice, required=False)
string = Text()
is_bot = Boolean()
class ElapsedTimeDocument(Document):
"""LJX: 性能监控耗时文档,记录页面加载时间等性能数据"""
url = Keyword() # LJX: 请求URL
time_taken = Long() # LJX: 耗时(毫秒)
log_datetime = Date() # LJX: 日志时间
ip = Keyword() # LJX: IP地址
geoip = Object(GeoIp, required=False) # LJX: 地理位置信息
useragent = Object(UserAgent, required=False) # LJX: 用户代理信息
url = Keyword()
time_taken = Long()
log_datetime = Date()
ip = Keyword()
geoip = Object(GeoIp, required=False)
useragent = Object(UserAgent, required=False)
class Index:
"""LJX: 索引配置"""
name = 'performance' # LJX: 索引名称
name = 'performance'
settings = {
"number_of_shards": 1, # LJX: 分片数量
"number_of_replicas": 0 # LJX: 副本数量
"number_of_shards": 1,
"number_of_replicas": 0
}
class Meta:
doc_type = 'ElapsedTime' # LJX: 文档类型
doc_type = 'ElapsedTime'
class ElaspedTimeDocumentManager:
"""LJX: 性能监控文档管理器"""
@staticmethod
def build_index():
"""LJX: 构建性能监控索引"""
from elasticsearch import Elasticsearch
client = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
res = client.indices.exists(index="performance")
if not res:
ElapsedTimeDocument.init() # LJX: 初始化索引
ElapsedTimeDocument.init()
@staticmethod
def delete_index():
"""LJX: 删除性能监控索引"""
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='performance', ignore=[400, 404])
@staticmethod
def create(url, time_taken, log_datetime, useragent, ip):
"""LJX: 创建性能监控记录"""
ElaspedTimeDocumentManager.build_index()
# LJX: 构建用户代理信息对象
ua = UserAgent()
ua.browser = UserAgentBrowser()
ua.browser.Family = useragent.browser.family
@ -136,13 +116,12 @@ class ElaspedTimeDocumentManager:
ua.string = useragent.ua_string
ua.is_bot = useragent.is_bot
# LJX: 创建文档并保存使用geoip管道处理IP地址
doc = ElapsedTimeDocument(
meta={
'id': int(
round(
time.time() *
1000)) # LJX: 使用时间戳作为文档ID
1000))
},
url=url,
time_taken=time_taken,
@ -152,64 +131,57 @@ class ElaspedTimeDocumentManager:
class ArticleDocument(Document):
"""LJX: 博客文章搜索文档,用于全文搜索"""
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart') # LJX: 文章内容使用IK分词器
title = Text(analyzer='ik_max_word', search_analyzer='ik_smart') # LJX: 文章标题
author = Object(properties={ # LJX: 作者信息
body = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
title = Text(analyzer='ik_max_word', search_analyzer='ik_smart')
author = Object(properties={
'nickname': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
category = Object(properties={ # LJX: 分类信息
category = Object(properties={
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
tags = Object(properties={ # LJX: 标签信息
tags = Object(properties={
'name': Text(analyzer='ik_max_word', search_analyzer='ik_smart'),
'id': Integer()
})
# LJX: 文章元数据字段
pub_time = Date() # LJX: 发布时间
status = Text() # LJX: 状态
comment_status = Text() # LJX: 评论状态
type = Text() # LJX: 类型
views = Integer() # LJX: 浏览量
article_order = Integer() # LJX: 文章排序
pub_time = Date()
status = Text()
comment_status = Text()
type = Text()
views = Integer()
article_order = Integer()
class Index:
"""LJX: 文章索引配置"""
name = 'blog' # LJX: 索引名称
name = 'blog'
settings = {
"number_of_shards": 1,
"number_of_replicas": 0
}
class Meta:
doc_type = 'Article' # LJX: 文档类型
doc_type = 'Article'
class ArticleDocumentManager():
"""LJX: 文章文档管理器,处理文章搜索索引的创建和更新"""
def __init__(self):
self.create_index() # LJX: 初始化时创建索引
self.create_index()
def create_index(self):
"""LJX: 创建文章索引"""
ArticleDocument.init()
def delete_index(self):
"""LJX: 删除文章索引"""
from elasticsearch import Elasticsearch
es = Elasticsearch(settings.ELASTICSEARCH_DSL['default']['hosts'])
es.indices.delete(index='blog', ignore=[400, 404])
def convert_to_doc(self, articles):
"""LJX: 将文章对象转换为搜索文档对象"""
return [
ArticleDocument(
meta={
'id': article.id}, # LJX: 使用文章ID作为文档ID
'id': article.id},
body=article.body,
title=article.title,
author={
@ -221,7 +193,7 @@ class ArticleDocumentManager():
tags=[
{
'name': t.name,
'id': t.id} for t in article.tags.all()], # LJX: 转换标签列表
'id': t.id} for t in article.tags.all()],
pub_time=article.pub_time,
status=article.status,
comment_status=article.comment_status,
@ -230,14 +202,12 @@ class ArticleDocumentManager():
article_order=article.article_order) for article in articles]
def rebuild(self, articles=None):
"""LJX: 重建搜索索引"""
ArticleDocument.init()
articles = articles if articles else Article.objects.all() # LJX: 如果没有指定文章,则使用所有文章
articles = articles if articles else Article.objects.all()
docs = self.convert_to_doc(articles)
for doc in docs:
doc.save() # LJX: 保存所有文档到索引
doc.save()
def update_docs(self, docs):
"""LJX: 更新搜索文档"""
for doc in docs:
doc.save()
doc.save()

@ -0,0 +1,19 @@
import logging
from django import forms
from haystack.forms import SearchForm
logger = logging.getLogger(__name__)
class BlogSearchForm(SearchForm):
querydata = forms.CharField(required=True)
def search(self):
datas = super(BlogSearchForm, self).search()
if not self.is_valid():
return self.no_query_found()
if self.cleaned_data['querydata']:
logger.info(self.cleaned_data['querydata'])
return datas

@ -0,0 +1,42 @@
import logging
import time
from ipware import get_client_ip
from user_agents import parse
from blog.documents import ELASTICSEARCH_ENABLED, ElaspedTimeDocumentManager
logger = logging.getLogger(__name__)
class OnlineMiddleware(object):
def __init__(self, get_response=None):
self.get_response = get_response
super().__init__()
def __call__(self, request):
''' page render time '''
start_time = time.time()
response = self.get_response(request)
http_user_agent = request.META.get('HTTP_USER_AGENT', '')
ip, _ = get_client_ip(request)
user_agent = parse(http_user_agent)
if not response.streaming:
try:
cast_time = time.time() - start_time
if ELASTICSEARCH_ENABLED:
time_taken = round((cast_time) * 1000, 2)
url = request.path
from django.utils import timezone
ElaspedTimeDocumentManager.create(
url=url,
time_taken=time_taken,
log_datetime=timezone.now(),
useragent=user_agent,
ip=ip)
response.content = response.content.replace(
b'<!!LOAD_TIMES!!>', str.encode(str(cast_time)[:5]))
except Exception as e:
logger.error("Error OnlineMiddleware: %s" % e)
return response

@ -0,0 +1,376 @@
import logging
import re
from abc import abstractmethod
from django.conf import settings
from django.core.exceptions import ValidationError
from django.db import models
from django.urls import reverse
from django.utils.timezone import now
from django.utils.translation import gettext_lazy as _
from mdeditor.fields import MDTextField
from uuslug import slugify
from djangoblog.utils import cache_decorator, cache
from djangoblog.utils import get_current_site
logger = logging.getLogger(__name__)
class LinkShowType(models.TextChoices):
I = ('i', _('index'))
L = ('l', _('list'))
P = ('p', _('post'))
A = ('a', _('all'))
S = ('s', _('slide'))
class BaseModel(models.Model):
id = models.AutoField(primary_key=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_modify_time = models.DateTimeField(_('modify time'), default=now)
def save(self, *args, **kwargs):
is_update_views = isinstance(
self,
Article) and 'update_fields' in kwargs and kwargs['update_fields'] == ['views']
if is_update_views:
Article.objects.filter(pk=self.pk).update(views=self.views)
else:
if 'slug' in self.__dict__:
slug = getattr(
self, 'title') if 'title' in self.__dict__ else getattr(
self, 'name')
setattr(self, 'slug', slugify(slug))
super().save(*args, **kwargs)
def get_full_url(self):
site = get_current_site().domain
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
class Meta:
abstract = True
@abstractmethod
def get_absolute_url(self):
pass
class Article(BaseModel):
"""文章"""
STATUS_CHOICES = (
('d', _('Draft')),
('p', _('Published')),
)
COMMENT_STATUS = (
('o', _('Open')),
('c', _('Close')),
)
TYPE = (
('a', _('Article')),
('p', _('Page')),
)
title = models.CharField(_('title'), max_length=200, unique=True)
body = MDTextField(_('body'))
pub_time = models.DateTimeField(
_('publish time'), blank=False, null=False, default=now)
status = models.CharField(
_('status'),
max_length=1,
choices=STATUS_CHOICES,
default='p')
comment_status = models.CharField(
_('comment status'),
max_length=1,
choices=COMMENT_STATUS,
default='o')
type = models.CharField(_('type'), max_length=1, choices=TYPE, default='a')
views = models.PositiveIntegerField(_('views'), default=0)
author = models.ForeignKey(
settings.AUTH_USER_MODEL,
verbose_name=_('author'),
blank=False,
null=False,
on_delete=models.CASCADE)
article_order = models.IntegerField(
_('order'), blank=False, null=False, default=0)
show_toc = models.BooleanField(_('show toc'), blank=False, null=False, default=False)
category = models.ForeignKey(
'Category',
verbose_name=_('category'),
on_delete=models.CASCADE,
blank=False,
null=False)
tags = models.ManyToManyField('Tag', verbose_name=_('tag'), blank=True)
def body_to_string(self):
return self.body
def __str__(self):
return self.title
class Meta:
ordering = ['-article_order', '-pub_time']
verbose_name = _('article')
verbose_name_plural = verbose_name
get_latest_by = 'id'
def get_absolute_url(self):
return reverse('blog:detailbyid', kwargs={
'article_id': self.id,
'year': self.creation_time.year,
'month': self.creation_time.month,
'day': self.creation_time.day
})
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
tree = self.category.get_category_tree()
names = list(map(lambda c: (c.name, c.get_absolute_url()), tree))
return names
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
def viewed(self):
self.views += 1
self.save(update_fields=['views'])
def comment_list(self):
cache_key = 'article_comments_{id}'.format(id=self.id)
value = cache.get(cache_key)
if value:
logger.info('get article comments:{id}'.format(id=self.id))
return value
else:
comments = self.comment_set.filter(is_enable=True).order_by('-id')
cache.set(cache_key, comments, 60 * 100)
logger.info('set article comments:{id}'.format(id=self.id))
return comments
def get_admin_url(self):
info = (self._meta.app_label, self._meta.model_name)
return reverse('admin:%s_%s_change' % info, args=(self.pk,))
@cache_decorator(expiration=60 * 100)
def next_article(self):
# 下一篇
return Article.objects.filter(
id__gt=self.id, status='p').order_by('id').first()
@cache_decorator(expiration=60 * 100)
def prev_article(self):
# 前一篇
return Article.objects.filter(id__lt=self.id, status='p').first()
def get_first_image_url(self):
"""
Get the first image url from article.body.
:return:
"""
match = re.search(r'!\[.*?\]\((.+?)\)', self.body)
if match:
return match.group(1)
return ""
class Category(BaseModel):
"""文章分类"""
name = models.CharField(_('category name'), max_length=30, unique=True)
parent_category = models.ForeignKey(
'self',
verbose_name=_('parent category'),
blank=True,
null=True,
on_delete=models.CASCADE)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
index = models.IntegerField(default=0, verbose_name=_('index'))
class Meta:
ordering = ['-index']
verbose_name = _('category')
verbose_name_plural = verbose_name
def get_absolute_url(self):
return reverse(
'blog:category_detail', kwargs={
'category_name': self.slug})
def __str__(self):
return self.name
@cache_decorator(60 * 60 * 10)
def get_category_tree(self):
"""
递归获得分类目录的父级
:return:
"""
categorys = []
def parse(category):
categorys.append(category)
if category.parent_category:
parse(category.parent_category)
parse(self)
return categorys
@cache_decorator(60 * 60 * 10)
def get_sub_categorys(self):
"""
获得当前分类目录所有子集
:return:
"""
categorys = []
all_categorys = Category.objects.all()
def parse(category):
if category not in categorys:
categorys.append(category)
childs = all_categorys.filter(parent_category=category)
for child in childs:
if category not in categorys:
categorys.append(child)
parse(child)
parse(self)
return categorys
class Tag(BaseModel):
"""文章标签"""
name = models.CharField(_('tag name'), max_length=30, unique=True)
slug = models.SlugField(default='no-slug', max_length=60, blank=True)
def __str__(self):
return self.name
def get_absolute_url(self):
return reverse('blog:tag_detail', kwargs={'tag_name': self.slug})
@cache_decorator(60 * 60 * 10)
def get_article_count(self):
return Article.objects.filter(tags__name=self.name).distinct().count()
class Meta:
ordering = ['name']
verbose_name = _('tag')
verbose_name_plural = verbose_name
class Links(models.Model):
"""友情链接"""
name = models.CharField(_('link name'), max_length=30, unique=True)
link = models.URLField(_('link'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(
_('is show'), default=True, blank=False, null=False)
show_type = models.CharField(
_('show type'),
max_length=1,
choices=LinkShowType.choices,
default=LinkShowType.I)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
class Meta:
ordering = ['sequence']
verbose_name = _('link')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class SideBar(models.Model):
"""侧边栏,可以展示一些html内容"""
name = models.CharField(_('title'), max_length=100)
content = models.TextField(_('content'))
sequence = models.IntegerField(_('order'), unique=True)
is_enable = models.BooleanField(_('is enable'), default=True)
creation_time = models.DateTimeField(_('creation time'), default=now)
last_mod_time = models.DateTimeField(_('modify time'), default=now)
class Meta:
ordering = ['sequence']
verbose_name = _('sidebar')
verbose_name_plural = verbose_name
def __str__(self):
return self.name
class BlogSettings(models.Model):
"""blog的配置"""
site_name = models.CharField(
_('site name'),
max_length=200,
null=False,
blank=False,
default='')
site_description = models.TextField(
_('site description'),
max_length=1000,
null=False,
blank=False,
default='')
site_seo_description = models.TextField(
_('site seo description'), max_length=1000, null=False, blank=False, default='')
site_keywords = models.TextField(
_('site keywords'),
max_length=1000,
null=False,
blank=False,
default='')
article_sub_length = models.IntegerField(_('article sub length'), default=300)
sidebar_article_count = models.IntegerField(_('sidebar article count'), default=10)
sidebar_comment_count = models.IntegerField(_('sidebar comment count'), default=5)
article_comment_count = models.IntegerField(_('article comment count'), default=5)
show_google_adsense = models.BooleanField(_('show adsense'), default=False)
google_adsense_codes = models.TextField(
_('adsense code'), max_length=2000, null=True, blank=True, default='')
open_site_comment = models.BooleanField(_('open site comment'), default=True)
global_header = models.TextField("公共头部", null=True, blank=True, default='')
global_footer = models.TextField("公共尾部", null=True, blank=True, default='')
beian_code = models.CharField(
'备案号',
max_length=2000,
null=True,
blank=True,
default='')
analytics_code = models.TextField(
"网站统计代码",
max_length=1000,
null=False,
blank=False,
default='')
show_gongan_code = models.BooleanField(
'是否显示公安备案号', default=False, null=False)
gongan_beiancode = models.TextField(
'公安备案号',
max_length=2000,
null=True,
blank=True,
default='')
comment_need_review = models.BooleanField(
'评论是否需要审核', default=False, null=False)
class Meta:
verbose_name = _('Website configuration')
verbose_name_plural = verbose_name
def __str__(self):
return self.site_name
def clean(self):
if BlogSettings.objects.exclude(id=self.id).count():
raise ValidationError(_('There can only be one configuration'))
def save(self, *args, **kwargs):
super().save(*args, **kwargs)
from djangoblog.utils import cache
cache.clear()

@ -0,0 +1,13 @@
from haystack import indexes
from blog.models import Article
class ArticleIndex(indexes.SearchIndex, indexes.Indexable):
text = indexes.CharField(document=True, use_template=True)
def get_model(self):
return Article
def index_queryset(self, using=None):
return self.get_model().objects.filter(status='p')

Before

Width:  |  Height:  |  Size: 221 B

After

Width:  |  Height:  |  Size: 221 B

Before

Width:  |  Height:  |  Size: 1.5 KiB

After

Width:  |  Height:  |  Size: 1.5 KiB

Before

Width:  |  Height:  |  Size: 25 KiB

After

Width:  |  Height:  |  Size: 25 KiB

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save