Merge branch 'master' into zc_branch

zc_branch
张陈 3 months ago
commit cd78d783e3

Binary file not shown.

@ -2,4 +2,27 @@ from django.apps import AppConfig
class AccountsConfig(AppConfig):
"""
账户应用的配置类
Django应用配置类用于配置accounts应用的元数据和行为
继承自Django的AppConfig基类
"""
# 应用的Python路径Django使用这个属性来识别应用
# 这应该与应用的目录名一致
name = 'accounts'
# 其他常用但未在此定义的配置选项包括:
# - verbose_name: 应用的易读名称(用于管理后台显示)
# - default_auto_field: 默认的主键字段类型
# - label: 应用的简短标签用于替代name
# - path: 应用的文件系统路径
# 示例如果需要配置verbose_name可以这样添加
# verbose_name = '用户账户管理'
# 示例如果需要自定义ready方法可以这样添加
# def ready(self):
# # 应用启动时执行的代码
# # 通常用于信号注册等初始化操作
# import accounts.signals

@ -9,90 +9,116 @@ from .models import BlogUser
class LoginForm(AuthenticationForm):
"""自定义登录表单继承自Django的AuthenticationForm"""
def __init__(self, *args, **kwargs):
"""初始化方法设置表单字段的widget属性"""
super(LoginForm, self).__init__(*args, **kwargs)
# 设置用户名字段的输入框属性
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
# 设置密码字段的输入框属性
self.fields['password'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
class RegisterForm(UserCreationForm):
"""自定义用户注册表单继承自Django的UserCreationForm"""
def __init__(self, *args, **kwargs):
"""初始化方法设置所有表单字段的widget属性"""
super(RegisterForm, self).__init__(*args, **kwargs)
# 设置用户名字段的输入框属性
self.fields['username'].widget = widgets.TextInput(
attrs={'placeholder': "username", "class": "form-control"})
# 设置邮箱字段的输入框属性
self.fields['email'].widget = widgets.EmailInput(
attrs={'placeholder': "email", "class": "form-control"})
# 设置密码字段的输入框属性
self.fields['password1'].widget = widgets.PasswordInput(
attrs={'placeholder': "password", "class": "form-control"})
# 设置确认密码字段的输入框属性
self.fields['password2'].widget = widgets.PasswordInput(
attrs={'placeholder': "repeat password", "class": "form-control"})
def clean_email(self):
"""邮箱字段验证方法"""
email = self.cleaned_data['email']
# 检查邮箱是否已存在
if get_user_model().objects.filter(email=email).exists():
raise ValidationError(_("email already exists"))
return email
class Meta:
model = get_user_model()
fields = ("username", "email")
"""表单的元数据配置"""
model = get_user_model() # 使用当前激活的用户模型
fields = ("username", "email") # 表单包含的字段
class ForgetPasswordForm(forms.Form):
"""忘记密码重置表单"""
# 新密码字段
new_password1 = forms.CharField(
label=_("New password"),
widget=forms.PasswordInput(
label=_("New password"), # 字段标签
widget=forms.PasswordInput( # 密码输入框
attrs={
"class": "form-control",
'placeholder': _("New password")
"class": "form-control", # CSS类
'placeholder': _("New password") # 占位符文本
}
),
)
# 确认新密码字段
new_password2 = forms.CharField(
label="确认密码",
widget=forms.PasswordInput(
label="确认密码", # 字段标签
widget=forms.PasswordInput( # 密码输入框
attrs={
"class": "form-control",
'placeholder': _("Confirm password")
"class": "form-control", # CSS类
'placeholder': _("Confirm password") # 占位符文本
}
),
)
# 邮箱字段
email = forms.EmailField(
label='邮箱',
widget=forms.TextInput(
label='邮箱', # 字段标签
widget=forms.TextInput( # 文本输入框
attrs={
'class': 'form-control',
'placeholder': _("Email")
'class': 'form-control', # CSS类
'placeholder': _("Email") # 占位符文本
}
),
)
# 验证码字段
code = forms.CharField(
label=_('Code'),
widget=forms.TextInput(
label=_('Code'), # 字段标签
widget=forms.TextInput( # 文本输入框
attrs={
'class': 'form-control',
'placeholder': _("Code")
'class': 'form-control', # CSS类
'placeholder': _("Code") # 占位符文本
}
),
)
def clean_new_password2(self):
"""确认密码字段验证方法"""
password1 = self.data.get("new_password1")
password2 = self.data.get("new_password2")
# 检查两次输入的密码是否一致
if password1 and password2 and password1 != password2:
raise ValidationError(_("passwords do not match"))
# 使用Django的密码验证器验证密码强度
password_validation.validate_password(password2)
return password2
def clean_email(self):
"""邮箱字段验证方法"""
user_email = self.cleaned_data.get("email")
# 检查邮箱对应的用户是否存在
if not BlogUser.objects.filter(
email=user_email
).exists():
@ -101,7 +127,9 @@ class ForgetPasswordForm(forms.Form):
return user_email
def clean_code(self):
"""验证码字段验证方法"""
code = self.cleaned_data.get("code")
# 使用utils模块验证邮箱和验证码是否匹配
error = utils.verify(
email=self.cleaned_data.get("email"),
code=code,
@ -112,6 +140,8 @@ class ForgetPasswordForm(forms.Form):
class ForgetPasswordCodeForm(forms.Form):
"""忘记密码验证码请求表单(仅包含邮箱字段)"""
email = forms.EmailField(
label=_('Email'),
)
label=_('Email'), # 邮箱字段标签
)

@ -7,43 +7,89 @@ import django.utils.timezone
class Migration(migrations.Migration):
"""
Django数据库迁移文件
用于创建BlogUser模型的数据库表结构
这是一个初始迁移文件initial migration
"""
# 标记为初始迁移Django使用这个标志来识别应用的第一个迁移
initial = True
# 依赖关系此迁移依赖于auth应用的特定迁移
# 确保在创建用户表之前,权限相关的表已经存在
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
# 迁移操作列表:定义要执行的具体数据库操作
operations = [
# 创建BlogUser模型的数据库表
migrations.CreateModel(
name='BlogUser',
name='BlogUser', # 模型名称
fields=[
# 主键字段使用BigAutoField作为自增主键
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 密码字段存储加密后的密码最大长度128字符
('password', models.CharField(max_length=128, verbose_name='password')),
# 最后登录时间:记录用户最后一次登录的时间
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
# 超级用户标志:标记用户是否拥有所有权限
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
# 用户名字段:唯一标识用户,有严格的字符限制和验证
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
# 名字字段:用户的名,可选
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
# 姓氏字段:用户的姓,可选
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
# 邮箱字段:用户的邮箱地址,可选
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
# 员工状态:标记用户是否可以登录管理后台
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
# 活跃状态:标记用户账户是否激活
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
# 加入日期:用户注册的时间,默认为当前时间
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
# 昵称字段:自定义字段,用户显示名称,可选
('nickname', models.CharField(blank=True, max_length=100, verbose_name='昵称')),
# 创建时间:自定义字段,记录用户创建时间
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 最后修改时间:自定义字段,记录用户信息最后修改时间
('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')),
# 来源字段:自定义字段,记录用户创建来源(如注册渠道)
('source', models.CharField(blank=True, max_length=100, verbose_name='创建来源')),
# 组关联:多对多关系,用户所属的权限组
('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')),
# 用户权限:多对多关系,用户特有的权限
('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')),
],
# 模型元选项
options={
'verbose_name': '用户',
'verbose_name_plural': '用户',
'ordering': ['-id'],
'get_latest_by': 'id',
'verbose_name': '用户', # 单数名称(用于管理后台)
'verbose_name_plural': '用户', # 复数名称(用于管理后台)
'ordering': ['-id'], # 默认排序按ID降序最新的在前
'get_latest_by': 'id', # 指定获取最新记录时使用的字段
},
# 模型管理器
managers=[
# 使用Django默认的UserManager来管理用户对象
('objects', django.contrib.auth.models.UserManager()),
],
),
]
]

@ -5,42 +5,82 @@ import django.utils.timezone
class Migration(migrations.Migration):
"""
Django数据库迁移文件
用于修改BlogUser模型的结构和字段定义
这是一个数据模型重构迁移主要更新字段命名和国际化
"""
# 依赖关系此迁移依赖于accounts应用的初始迁移
# 确保在修改表结构之前,初始表已经创建
dependencies = [
('accounts', '0001_initial'),
('accounts', '0001_initial'), # 依赖于accounts应用的第一个迁移文件
]
# 迁移操作列表:定义要执行的具体数据库结构修改
operations = [
# 修改模型的元选项(主要是国际化显示名称)
migrations.AlterModelOptions(
name='bloguser',
options={'get_latest_by': 'id', 'ordering': ['-id'], 'verbose_name': 'user', 'verbose_name_plural': 'user'},
name='bloguser', # 目标模型名称
options={
'get_latest_by': 'id', # 保持按id获取最新记录
'ordering': ['-id'], # 保持按id降序排列
'verbose_name': 'user', # 更新单数名称为英文(国际化准备)
'verbose_name_plural': 'user' # 更新复数名称为英文(国际化准备)
},
),
# 删除旧的创建时间字段(为后续添加新字段做准备)
migrations.RemoveField(
model_name='bloguser',
name='created_time',
model_name='bloguser', # 目标模型
name='created_time', # 要删除的字段名
),
# 删除旧的最后修改时间字段
migrations.RemoveField(
model_name='bloguser',
name='last_mod_time',
model_name='bloguser', # 目标模型
name='last_mod_time', # 要删除的字段名
),
# 添加新的创建时间字段(使用国际化的字段名)
migrations.AddField(
model_name='bloguser',
name='creation_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'),
model_name='bloguser', # 目标模型
name='creation_time', # 新字段名
field=models.DateTimeField(
default=django.utils.timezone.now, # 默认值为当前时间
verbose_name='creation time' # 英文显示名称(国际化)
),
),
# 添加新的最后修改时间字段(使用国际化的字段名)
migrations.AddField(
model_name='bloguser',
name='last_modify_time',
field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'),
model_name='bloguser', # 目标模型
name='last_modify_time', # 新字段名
field=models.DateTimeField(
default=django.utils.timezone.now, # 默认值为当前时间
verbose_name='last modify time' # 英文显示名称(国际化)
),
),
# 修改昵称字段的显示名称(国际化)
migrations.AlterField(
model_name='bloguser',
name='nickname',
field=models.CharField(blank=True, max_length=100, verbose_name='nick name'),
model_name='bloguser', # 目标模型
name='nickname', # 要修改的字段
field=models.CharField(
blank=True, # 保持允许为空
max_length=100, # 保持最大长度100
verbose_name='nick name' # 更新为英文显示名称
),
),
# 修改来源字段的显示名称(国际化)
migrations.AlterField(
model_name='bloguser',
name='source',
field=models.CharField(blank=True, max_length=100, verbose_name='create source'),
model_name='bloguser', # 目标模型
name='source', # 要修改的字段
field=models.CharField(
blank=True, # 保持允许为空
max_length=100, # 保持最大长度100
verbose_name='create source' # 更新为英文显示名称
),
),
]
]

@ -9,27 +9,61 @@ from djangoblog.utils import get_current_site
# Create your models here.
class BlogUser(AbstractUser):
"""
自定义用户模型继承自Django的AbstractUser基类
扩展了博客系统的用户功能
"""
# 昵称字段,允许为空
nickname = models.CharField(_('nick name'), max_length=100, blank=True)
# 用户创建时间,默认为当前时间
creation_time = models.DateTimeField(_('creation time'), default=now)
# 最后修改时间,默认为当前时间
last_modify_time = models.DateTimeField(_('last modify time'), default=now)
# 用户创建来源(如:网站注册、第三方登录等),允许为空
source = models.CharField(_('create source'), max_length=100, blank=True)
def get_absolute_url(self):
"""
获取用户的绝对URL用于Django的通用视图和模板中
返回用户详情页的URL
"""
return reverse(
'blog:author_detail', kwargs={
'author_name': self.username})
def __str__(self):
"""
定义模型的字符串表示形式
在管理后台和其他显示对象的地方使用
这里使用邮箱作为标识
"""
return self.email
def get_full_url(self):
site = get_current_site().domain
"""
获取用户的完整URL包含域名
用于生成完整的用户主页链接
"""
site = get_current_site().domain # 获取当前站点域名
url = "https://{site}{path}".format(site=site,
path=self.get_absolute_url())
return url
class Meta:
"""模型的元数据配置"""
# 默认按ID降序排列最新的用户排在前面
ordering = ['-id']
# 在管理后台中显示的单数名称
verbose_name = _('user')
# 在管理后台中显示的复数名称
verbose_name_plural = verbose_name
get_latest_by = 'id'
# 指定获取最新记录时使用的字段
get_latest_by = 'id'

@ -13,172 +13,215 @@ from . import utils
class AccountTest(TestCase):
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
"""测试用例初始化方法,每个测试方法执行前都会运行"""
self.client = Client() # 创建测试客户端用于模拟HTTP请求
self.factory = RequestFactory() # 创建请求工厂,用于构建请求对象
# 创建测试用户
self.blog_user = BlogUser.objects.create_user(
username="test",
email="admin@admin.com",
password="12345678"
)
self.new_test = "xxx123--="
self.new_test = "xxx123--=" # 测试用的新密码
def test_validate_account(self):
site = get_current_site().domain
"""测试账户验证功能,包括登录、管理员权限和文章管理"""
site = get_current_site().domain # 获取当前站点域名
# 创建超级用户
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="qwer!@#$ggg")
testuser = BlogUser.objects.get(username='liangliangyy1')
# 测试登录功能
loginresult = self.client.login(
username='liangliangyy1',
password='qwer!@#$ggg')
self.assertEqual(loginresult, True)
self.assertEqual(loginresult, True) # 断言登录成功
# 测试管理员页面访问
response = self.client.get('/admin/')
self.assertEqual(response.status_code, 200)
self.assertEqual(response.status_code, 200) # 断言可以访问管理员页面
# 创建测试分类
category = Category()
category.name = "categoryaaa"
category.creation_time = timezone.now()
category.last_modify_time = timezone.now()
category.save()
# 创建测试文章
article = Article()
article.title = "nicetitleaaa"
article.body = "nicecontentaaa"
article.author = user
article.category = category
article.type = 'a'
article.status = 'p'
article.type = 'a' # 文章类型
article.status = 'p' # 发布状态
article.save()
# 测试文章管理页面访问
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
self.assertEqual(response.status_code, 200) # 断言可以访问文章管理页面
def test_validate_register(self):
"""测试用户注册流程,包括注册、邮箱验证、登录和权限管理"""
# 验证注册前用户不存在
self.assertEquals(
0, len(
BlogUser.objects.filter(
email='user123@user.com')))
# 发送注册请求
response = self.client.post(reverse('account:register'), {
'username': 'user1233',
'email': 'user123@user.com',
'password1': 'password123!q@wE#R$T',
'password2': 'password123!q@wE#R$T',
})
# 验证注册后用户已创建
self.assertEquals(
1, len(
BlogUser.objects.filter(
email='user123@user.com')))
user = BlogUser.objects.filter(email='user123@user.com')[0]
# 生成邮箱验证签名
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
path = reverse('accounts:result')
url = '{path}?type=validation&id={id}&sign={sign}'.format(
path=path, id=user.id, sign=sign)
# 测试验证页面访问
response = self.client.get(url)
self.assertEqual(response.status_code, 200)
# 测试登录功能
self.client.login(username='user1233', password='password123!q@wE#R$T')
user = BlogUser.objects.filter(email='user123@user.com')[0]
# 提升用户权限为管理员
user.is_superuser = True
user.is_staff = True
user.save()
delete_sidebar_cache()
delete_sidebar_cache() # 清理侧边栏缓存
# 创建测试分类
category = Category()
category.name = "categoryaaa"
category.creation_time = timezone.now()
category.last_modify_time = timezone.now()
category.save()
# 创建测试文章
article = Article()
article.category = category
article.title = "nicetitle333"
article.body = "nicecontentttt"
article.author = user
article.type = 'a'
article.status = 'p'
article.save()
# 测试文章管理页面访问
response = self.client.get(article.get_admin_url())
self.assertEqual(response.status_code, 200)
# 测试登出功能
response = self.client.get(reverse('account:logout'))
self.assertIn(response.status_code, [301, 302, 200])
self.assertIn(response.status_code, [301, 302, 200]) # 登出通常会有重定向
# 登出后测试文章管理页面访问(应该被拒绝)
response = self.client.get(article.get_admin_url())
self.assertIn(response.status_code, [301, 302, 200])
self.assertIn(response.status_code, [301, 302, 200]) # 应该重定向到登录页
# 重新登录测试(使用错误密码)
response = self.client.post(reverse('account:login'), {
'username': 'user1233',
'password': 'password123'
'password': 'password123' # 错误的密码
})
self.assertIn(response.status_code, [301, 302, 200])
# 登录后再次测试文章管理页面访问
response = self.client.get(article.get_admin_url())
self.assertIn(response.status_code, [301, 302, 200])
def test_verify_email_code(self):
"""测试邮箱验证码功能"""
to_email = "admin@admin.com"
code = generate_code()
code = generate_code() # 生成验证码
# 设置验证码到缓存
utils.set_code(to_email, code)
# 发送验证邮件
utils.send_verify_email(to_email, code)
# 测试正确的验证码验证
err = utils.verify("admin@admin.com", code)
self.assertEqual(err, None)
self.assertEqual(err, None) # 应该没有错误
# 测试错误的邮箱验证
err = utils.verify("admin@123.com", code)
self.assertEqual(type(err), str)
self.assertEqual(type(err), str) # 应该返回错误信息字符串
def test_forget_password_email_code_success(self):
"""测试成功发送忘记密码验证码"""
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict(email="admin@admin.com")
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.content.decode("utf-8"), "ok")
self.assertEqual(resp.content.decode("utf-8"), "ok") # 断言返回成功消息
def test_forget_password_email_code_fail(self):
"""测试忘记密码验证码发送失败的情况"""
# 测试空邮箱参数
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict()
)
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
# 测试无效邮箱格式
resp = self.client.post(
path=reverse("account:forget_password_code"),
data=dict(email="admin@com")
data=dict(email="admin@com") # 无效的邮箱格式
)
self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱")
def test_forget_password_email_success(self):
"""测试成功重置密码"""
code = generate_code()
utils.set_code(self.blog_user.email, code)
utils.set_code(self.blog_user.email, code) # 设置验证码到缓存
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
email=self.blog_user.email,
code=code,
)
# 提交密码重置请求
resp = self.client.post(
path=reverse("account:forget_password"),
data=data
)
self.assertEqual(resp.status_code, 302)
self.assertEqual(resp.status_code, 302) # 重置成功应该重定向
# 验证用户密码是否修改成功
blog_user = BlogUser.objects.filter(
email=self.blog_user.email,
).first() # type: BlogUser
self.assertNotEqual(blog_user, None)
self.assertEqual(blog_user.check_password(data["new_password1"]), True)
self.assertNotEqual(blog_user, None) # 用户应该存在
self.assertEqual(blog_user.check_password(data["new_password1"]), True) # 密码应该匹配
def test_forget_password_email_not_user(self):
"""测试重置密码时用户不存在的情况"""
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
email="123@123.com",
email="123@123.com", # 不存在的邮箱
code="123456",
)
resp = self.client.post(
@ -186,22 +229,21 @@ class AccountTest(TestCase):
data=data
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.status_code, 200) # 应该返回错误页面而不是重定向
def test_forget_password_email_code_error(self):
"""测试重置密码时验证码错误的情况"""
code = generate_code()
utils.set_code(self.blog_user.email, code)
utils.set_code(self.blog_user.email, code) # 设置正确的验证码
data = dict(
new_password1=self.new_test,
new_password2=self.new_test,
email=self.blog_user.email,
code="111111",
code="111111", # 错误的验证码
)
resp = self.client.post(
path=reverse("account:forget_password"),
data=data
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.status_code, 200) # 应该返回错误页面而不是重定向

@ -4,25 +4,46 @@ from django.urls import re_path
from . import views
from .forms import LoginForm
# 定义应用的命名空间用于URL反向解析
# 在模板中使用如:{% url 'accounts:login' %}
app_name = "accounts"
urlpatterns = [re_path(r'^login/$',
views.LoginView.as_view(success_url='/'),
name='login',
kwargs={'authentication_form': LoginForm}),
re_path(r'^register/$',
views.RegisterView.as_view(success_url="/"),
name='register'),
re_path(r'^logout/$',
views.LogoutView.as_view(),
name='logout'),
path(r'account/result.html',
views.account_result,
name='result'),
re_path(r'^forget_password/$',
views.ForgetPasswordView.as_view(),
name='forget_password'),
re_path(r'^forget_password_code/$',
views.ForgetPasswordEmailCode.as_view(),
name='forget_password_code'),
]
# URL配置列表定义所有用户账户相关的路由
urlpatterns = [
# 登录路由 - 使用正则表达式匹配以login/结尾的URL
re_path(r'^login/$',
# 使用基于类的视图,登录成功后重定向到首页
views.LoginView.as_view(success_url='/'),
name='login', # URL名称用于反向解析
kwargs={'authentication_form': LoginForm}), # 传递自定义登录表单类
# 注册路由 - 使用正则表达式匹配以register/结尾的URL
re_path(r'^register/$',
# 注册视图,注册成功后重定向到首页
views.RegisterView.as_view(success_url="/"),
name='register'), # URL名称
# 登出路由 - 使用正则表达式匹配以logout/结尾的URL
re_path(r'^logout/$',
# 登出视图,处理用户退出登录
views.LogoutView.as_view(),
name='logout'), # URL名称
# 账户操作结果页面 - 使用path匹配精确路径
path(r'account/result.html',
# 使用函数视图显示账户操作结果(如注册成功、密码重置成功等)
views.account_result,
name='result'), # URL名称
# 忘记密码页面 - 使用正则表达式匹配以forget_password/结尾的URL
re_path(r'^forget_password/$',
# 忘记密码视图,显示密码重置页面
views.ForgetPasswordView.as_view(),
name='forget_password'), # URL名称
# 忘记密码验证码接口 - 使用正则表达式匹配以forget_password_code/结尾的URL
re_path(r'^forget_password_code/$',
# 处理忘记密码的邮箱验证码发送和验证
views.ForgetPasswordEmailCode.as_view(),
name='forget_password_code'), # URL名称
]

@ -4,23 +4,58 @@ from django.contrib.auth.backends import ModelBackend
class EmailOrUsernameModelBackend(ModelBackend):
"""
允许使用用户名或邮箱登录
自定义认证后端允许用户使用用户名或邮箱登录
Extends ModelBackend to allow authentication using either username or email.
"""
def authenticate(self, request, username=None, password=None, **kwargs):
"""
用户认证方法
Authenticate a user based on username/email and password.
Args:
request: HTTP请求对象
username: 用户输入的用户名或邮箱
password: 用户输入的密码
**kwargs: 其他参数
Returns:
User: 认证成功的用户对象
None: 认证失败
"""
# 判断输入的是邮箱还是用户名
if '@' in username:
# 如果包含@符号,按邮箱处理
kwargs = {'email': username}
else:
# 否则按用户名处理
kwargs = {'username': username}
try:
# 根据用户名或邮箱查找用户
user = get_user_model().objects.get(**kwargs)
# 验证密码是否正确
if user.check_password(password):
return user
except get_user_model().DoesNotExist:
# 用户不存在时返回None
return None
def get_user(self, username):
def get_user(self, user_id):
"""
根据用户ID获取用户对象
Get a user by their primary key.
Args:
user_id: 用户ID
Returns:
User: 用户对象
None: 用户不存在
"""
try:
return get_user_model().objects.get(pk=username)
# 通过主键查找用户
return get_user_model().objects.get(pk=user_id)
except get_user_model().DoesNotExist:
return None
# 用户不存在时返回None
return None

@ -7,43 +7,58 @@ from django.utils.translation import gettext_lazy as _
from djangoblog.utils import send_email
# 验证码的生存时间Time To Live设置为5分钟
_code_ttl = timedelta(minutes=5)
def send_verify_email(to_mail: str, code: str, subject: str = _("Verify Email")):
"""发送重设密码验证码
"""发送验证邮件
Args:
to_mail: 受邮箱
subject: 邮件主题
code: 验证码
to_mail: 收邮箱地址
subject: 邮件主题默认为"Verify Email"
code: 需要发送的验证码
"""
# 生成邮件HTML内容包含验证码信息并支持国际化
html_content = _(
"You are resetting the password, the verification code is%(code)s, valid within 5 minutes, please keep it "
"properly") % {'code': code}
# 调用邮件发送工具函数发送邮件
send_email([to_mail], subject, html_content)
def verify(email: str, code: str) -> typing.Optional[str]:
"""验证code是否有效
"""验证邮箱和验证码是否匹配
Args:
email: 请求邮箱
code: 验证码
email: 需要验证的邮箱地址
code: 用户输入的验证码
Return:
如果有错误就返回错误str
Node:
这里的错误处理不太合理应该采用raise抛出
否测调用方也需要对error进行处理
如果验证失败返回错误信息字符串验证成功返回None
Note:
当前错误处理方式不够合理建议改为抛出异常的方式
这样调用方可以通过try-except来处理错误而不是检查返回值
"""
# 从缓存中获取该邮箱对应的验证码
cache_code = get_code(email)
# 比较缓存中的验证码和用户输入的验证码
if cache_code != code:
return gettext("Verification code error")
# 验证成功返回None
def set_code(email: str, code: str):
"""设置code"""
"""将验证码存储到缓存中
Args:
email: 作为缓存键的邮箱地址
code: 需要存储的验证码
"""
cache.set(email, code, _code_ttl.seconds)
def get_code(email: str) -> typing.Optional[str]:
"""获取code"""
return cache.get(email)
"""从缓存中获取验证码
Args:
email: 作为缓存键的邮箱地址
Return:
返回缓存中的验证码如果不存在或已过期则返回None
"""
return cache.get(email)

@ -29,31 +29,47 @@ from .models import BlogUser
logger = logging.getLogger(__name__)
# Create your views here.
class RegisterView(FormView):
"""
用户注册视图
处理用户注册流程包括表单验证用户创建和发送验证邮件
"""
form_class = RegisterForm
template_name = 'account/registration_form.html'
@method_decorator(csrf_protect)
def dispatch(self, *args, **kwargs):
"""确保视图受到CSRF保护"""
return super(RegisterView, self).dispatch(*args, **kwargs)
def form_valid(self, form):
"""
处理有效的注册表单
创建非活跃用户发送邮箱验证邮件
"""
if form.is_valid():
# 创建用户但不立即保存到数据库
user = form.save(False)
user.is_active = False
user.source = 'Register'
user.save(True)
user.is_active = False # 邮箱验证前用户不可用
user.source = 'Register' # 标记用户来源
user.save(True) # 保存用户到数据库
# 获取当前站点信息
site = get_current_site().domain
# 生成验证签名,用于验证链接的安全性
sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
# 调试模式下使用本地地址
if settings.DEBUG:
site = '127.0.0.1:8000'
# 构建验证URL
path = reverse('account:result')
url = "http://{site}{path}?type=validation&id={id}&sign={sign}".format(
site=site, path=path, id=user.id, sign=sign)
# 构建邮件内容
content = """
<p>请点击下面链接验证您的邮箱</p>
@ -64,6 +80,8 @@ class RegisterView(FormView):
如果上面链接无法打开请将此链接复制至浏览器
{url}
""".format(url=url)
# 发送验证邮件
send_email(
emailto=[
user.email,
@ -71,134 +89,195 @@ class RegisterView(FormView):
title='验证您的电子邮箱',
content=content)
# 重定向到结果页面
url = reverse('accounts:result') + \
'?type=register&id=' + str(user.id)
return HttpResponseRedirect(url)
else:
# 表单无效,重新渲染表单页面
return self.render_to_response({
'form': form
})
class LogoutView(RedirectView):
url = '/login/'
"""
用户登出视图
处理用户登出操作并清理相关缓存
"""
url = '/login/' # 登出后重定向的URL
@method_decorator(never_cache)
def dispatch(self, request, *args, **kwargs):
"""确保登出页面不被缓存"""
return super(LogoutView, self).dispatch(request, *args, **kwargs)
def get(self, request, *args, **kwargs):
logout(request)
delete_sidebar_cache()
"""处理GET请求的登出操作"""
logout(request) # 执行登出操作
delete_sidebar_cache() # 清理侧边栏缓存
return super(LogoutView, self).get(request, *args, **kwargs)
class LoginView(FormView):
"""
用户登录视图
处理用户认证和登录会话管理
"""
form_class = LoginForm
template_name = 'account/login.html'
success_url = '/'
redirect_field_name = REDIRECT_FIELD_NAME
login_ttl = 2626560 # 一个月的时间
success_url = '/' # 登录成功后默认重定向的URL
redirect_field_name = REDIRECT_FIELD_NAME # 重定向字段名
login_ttl = 2626560 # 一个月的时间(秒),用于"记住我"功能
@method_decorator(sensitive_post_parameters('password'))
@method_decorator(csrf_protect)
@method_decorator(never_cache)
@method_decorator(sensitive_post_parameters('password')) # 保护密码参数
@method_decorator(csrf_protect) # CSRF保护
@method_decorator(never_cache) # 禁止缓存
def dispatch(self, request, *args, **kwargs):
"""应用装饰器到视图分发方法"""
return super(LoginView, self).dispatch(request, *args, **kwargs)
def get_context_data(self, **kwargs):
"""向模板上下文添加重定向URL"""
redirect_to = self.request.GET.get(self.redirect_field_name)
if redirect_to is None:
redirect_to = '/'
redirect_to = '/' # 默认重定向到首页
kwargs['redirect_to'] = redirect_to
return super(LoginView, self).get_context_data(**kwargs)
def form_valid(self, form):
"""处理有效的登录表单"""
# 使用Django的AuthenticationForm进行认证
form = AuthenticationForm(data=self.request.POST, request=self.request)
if form.is_valid():
# 认证成功,清理缓存并记录日志
delete_sidebar_cache()
logger.info(self.redirect_field_name)
# 登录用户
auth.login(self.request, form.get_user())
# 处理"记住我"功能
if self.request.POST.get("remember"):
self.request.session.set_expiry(self.login_ttl)
return super(LoginView, self).form_valid(form)
# return HttpResponseRedirect('/')
else:
# 认证失败,重新显示表单
return self.render_to_response({
'form': form
})
def get_success_url(self):
"""获取登录成功后重定向的URL"""
redirect_to = self.request.POST.get(self.redirect_field_name)
# 验证重定向URL的安全性
if not url_has_allowed_host_and_scheme(
url=redirect_to, allowed_hosts=[
self.request.get_host()]):
redirect_to = self.success_url
redirect_to = self.success_url # 不安全的URL使用默认URL
return redirect_to
def account_result(request):
type = request.GET.get('type')
id = request.GET.get('id')
"""
账户操作结果页面
处理注册结果和邮箱验证
"""
type = request.GET.get('type') # 操作类型register或validation
id = request.GET.get('id') # 用户ID
# 获取用户对象如果不存在返回404
user = get_object_or_404(get_user_model(), id=id)
logger.info(type)
# 如果用户已激活,直接重定向到首页
if user.is_active:
return HttpResponseRedirect('/')
# 处理注册和验证操作
if type and type in ['register', 'validation']:
if type == 'register':
# 注册成功页面
content = '''
恭喜您注册成功一封验证邮件已经发送到您的邮箱请验证您的邮箱后登录本站
'''
title = '注册成功'
else:
# 邮箱验证处理
c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id)))
sign = request.GET.get('sign')
# 验证签名安全性
if sign != c_sign:
return HttpResponseForbidden()
return HttpResponseForbidden() # 签名不匹配,禁止访问
# 激活用户账户
user.is_active = True
user.save()
content = '''
恭喜您已经成功的完成邮箱验证您现在可以使用您的账号来登录本站
'''
title = '验证成功'
# 渲染结果页面
return render(request, 'account/result.html', {
'title': title,
'content': content
})
else:
# 无效的操作类型,重定向到首页
return HttpResponseRedirect('/')
class ForgetPasswordView(FormView):
"""
忘记密码视图
处理密码重置请求
"""
form_class = ForgetPasswordForm
template_name = 'account/forget_password.html'
def form_valid(self, form):
"""处理有效的密码重置表单"""
if form.is_valid():
# 根据邮箱查找用户
blog_user = BlogUser.objects.filter(email=form.cleaned_data.get("email")).get()
# 使用Django的密码哈希器设置新密码
blog_user.password = make_password(form.cleaned_data["new_password2"])
blog_user.save()
blog_user.save() # 保存新密码
# 重定向到登录页面
return HttpResponseRedirect('/login/')
else:
# 表单无效,重新显示表单
return self.render_to_response({'form': form})
class ForgetPasswordEmailCode(View):
"""
发送忘记密码验证码视图
处理密码重置验证码的发送
"""
def post(self, request: HttpRequest):
"""处理POST请求发送密码重置验证码"""
form = ForgetPasswordCodeForm(request.POST)
# 验证表单数据
if not form.is_valid():
return HttpResponse("错误的邮箱")
to_email = form.cleaned_data["email"]
# 生成并发送验证码
code = generate_code()
utils.send_verify_email(to_email, code)
utils.set_code(to_email, code)
utils.send_verify_email(to_email, code) # 发送验证邮件
utils.set_code(to_email, code) # 存储验证码(通常在缓存中)
return HttpResponse("ok")
return HttpResponse("ok") # 返回成功响应

@ -1,7 +1,27 @@
# 导入 Django 内置的 Admin 核心模块
# django.contrib.admin 提供了完整的后台管理界面生成、数据CRUD、权限控制等功能
from django.contrib import admin
# Register your models here.
# 说明:该注释为 Django 自动生成,提示开发者在此处注册需要通过后台管理的模型
# 注册方式:使用 admin.site.register(模型类, 自定义Admin类) 关联模型与管理配置
class OwnTrackLogsAdmin(admin.ModelAdmin):
"""
自定义 Admin 配置类继承自 Django 内置的 ModelAdmin
作用配置 OwnTrackLog 模型在后台管理界面的展示形式操作权限数据筛选等功能
若需扩展后台功能可在此类中添加属性/方法如列表显示字段搜索框过滤条件等
"""
# pass 关键字:表示当前类暂未定义额外配置,完全使用 ModelAdmin 的默认行为
# 默认效果:
# 1. 列表页显示模型的所有字段id、tid、lat、lon、creation_time
# 2. 支持点击主键id进入详情页编辑数据
# 3. 支持批量删除、简单搜索(默认搜索主键字段)
# 4. 按模型 Meta 中定义的 ordering 排序(即 creation_time 升序)
pass
# 【注】当前代码缺少模型注册语句,需补充以下代码才能在后台看到该模型(否则配置不生效)
# 需先导入 OwnTrackLog 模型(从对应的 models.py 中),再注册关联
# 完整注册代码示例:
# from .models import OwnTrackLog # 从当前应用的 models.py 导入模型类
# admin.site.register(OwnTrackLog, OwnTrackLogsAdmin) # 关联模型与自定义Admin配置

@ -1,5 +1,26 @@
# 导入 Django 应用配置核心类 AppConfig
# django.apps.AppConfig 是 Django 管理应用元数据的基础类,用于定义应用的名称、初始化逻辑、信号绑定等
from django.apps import AppConfig
class OwntracksConfig(AppConfig):
"""
自定义应用配置类继承自 Django 内置的 AppConfig
作用管理 'owntracks' 应用的核心配置包括应用名称初始化行为模型注册信号监听等
每个 Django 应用建议创建独立的 AppConfig 便于后续扩展应用功能如添加启动时初始化逻辑
"""
# 应用名称Django 识别应用的唯一标识,必须与应用目录名一致(此处为 'owntracks'
# 作用:
# 1. 作为应用的核心标识,用于迁移命令(如 python manage.py migrate owntracks、权限控制等
# 2. 关联 models、views、admin 等模块,确保 Django 能正确识别应用内的组件
# 3. 若需跨应用引用模型,需通过该名称定位(如 from owntracks.models import OwnTrackLog
name = 'owntracks'
# 【可选扩展配置】若需添加更多应用级配置,可在此处补充(示例):
# 1. 应用verbose名称后台管理界面显示的应用名称支持中文
# verbose_name = '用户轨迹管理'
# 2. 定义应用初始化逻辑(如启动时加载数据、绑定信号)
# def ready(self):
# # 导入信号处理模块(避免循环导入,需在 ready 方法内导入)
# import owntracks.signals

@ -1,31 +1,64 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
# 说明:该文件为 Django 自动生成的数据迁移文件,用于创建数据库表结构
# 生成条件:执行 makemigrations 命令时Django 检测到 models.py 中新增 OwnTrackLog 模型后自动生成
# 兼容版本Django 4.1.7(迁移文件与 Django 版本强相关,修改需注意兼容性)
# 导入 Django 迁移核心模块和模型字段类
from django.db import migrations, models
# 导入 Django 时区工具(用于处理时间字段的时区一致性)
import django.utils.timezone
class Migration(migrations.Migration):
"""
数据迁移类Django 迁移系统的核心载体用于定义数据库结构变更逻辑
每个迁移类对应一次数据库操作如建表改字段删索引等
"""
# 标记是否为初始迁移(首次创建模型时为 True后续修改为 False
initial = True
# 依赖迁移列表:当前迁移依赖的其他迁移文件(为空表示无依赖)
# 若需依赖其他 app 的迁移,格式为 ['其他app名称.迁移文件名前缀']
dependencies = [
]
# 迁移操作列表:定义具体的数据库变更操作
operations = [
# 创建数据库表操作:对应 models.py 中的 OwnTrackLog 模型
migrations.CreateModel(
# 模型名称(必须与 models.py 中定义的类名一致)
name='OwnTrackLog',
# 字段配置:与模型类中的 field 定义一一对应,决定表的列结构
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('tid', models.CharField(max_length=100, verbose_name='用户')),
# 主键字段BigAutoField 为自增bigint类型Django 默认主键类型
('id', models.BigAutoField(
auto_created=True, # 自动创建,无需手动赋值
primary_key=True, # 标记为主键
serialize=False, # 不序列化(主键默认不参与序列化)
verbose_name='ID' # 后台管理界面显示的字段名称
)),
# 用户标识字段CharField 对应数据库 varchar 类型
('tid', models.CharField(
max_length=100, # 最大长度100必填参数
verbose_name='用户' # 后台显示名称,支持中文
)),
# 纬度字段FloatField 对应数据库 float 类型,存储地理纬度(如 39.9042
('lat', models.FloatField(verbose_name='纬度')),
# 经度字段FloatField 对应数据库 float 类型,存储地理经度(如 116.4074
('lon', models.FloatField(verbose_name='经度')),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 创建时间字段DateTimeField 对应数据库 datetime 类型
('created_time', models.DateTimeField(
default=django.utils.timezone.now, # 默认值:当前时区的当前时间
verbose_name='创建时间' # 后台显示名称
)),
],
# 模型元数据配置:对应模型类中的 Meta 内部类,影响表的整体属性
options={
'verbose_name': 'OwnTrackLogs',
'verbose_name_plural': 'OwnTrackLogs',
'ordering': ['created_time'],
'get_latest_by': 'created_time',
'verbose_name': 'OwnTrackLogs', # 单数形式的表名称(后台显示)
'verbose_name_plural': 'OwnTrackLogs', # 复数形式的表名称(后台列表页显示)
'ordering': ['created_time'], # 默认排序:按创建时间升序排列(-created_time 表示降序)
'get_latest_by': 'created_time', # 支持使用 Model.objects.latest() 方法,默认按创建时间取最新记录
},
),
]
]

@ -1,22 +1,44 @@
# Generated by Django 4.2.5 on 2023-09-06 13:19
# 说明:该文件为 Django 自动生成的**增量迁移文件**,用于更新数据库表结构
# 生成条件:修改 models.py 中 OwnTrackLog 模型的 Meta 配置排序字段、最新记录字段和字段名created_time → creation_time执行 makemigrations 命令生成
# 兼容版本Django 4.2.5(与初始迁移文件 0001_initial 版本需匹配,避免迁移冲突)
# 核心作用1. 重命名字段 created_time 为 creation_time2. 更新模型元数据的排序和最新记录查询字段
# 导入 Django 迁移核心模块(仅需 migrations无需额外字段类因无新增字段
from django.db import migrations
class Migration(migrations.Migration):
"""
数据迁移类定义数据库结构的增量变更操作
本次迁移依赖初始迁移文件仅修改字段名称和模型元数据不改变表结构核心逻辑
"""
# 依赖迁移列表:当前迁移必须在 'owntracks' 应用的 0001_initial 迁移执行后才能运行
# 格式:['应用名称.迁移文件前缀'],确保迁移顺序正确,避免字段不存在导致的报错
dependencies = [
('owntracks', '0001_initial'),
('owntracks', '0001_initial'), # 依赖初始迁移(创建 OwnTrackLog 表的迁移)
]
# 迁移操作列表:包含两个核心变更操作,按顺序执行
operations = [
# 操作1修改模型的元数据配置对应 models.py 中 OwnTrackLog 类的 Meta 内部类)
migrations.AlterModelOptions(
name='owntracklog',
options={'get_latest_by': 'creation_time', 'ordering': ['creation_time'], 'verbose_name': 'OwnTrackLogs', 'verbose_name_plural': 'OwnTrackLogs'},
name='owntracklog', # 目标模型名称(必须与 models.py 中类名一致)
options={
'get_latest_by': 'creation_time', # 更新「查询最新记录」的字段:从 created_time 改为新字段 creation_time
# 影响 Model.objects.latest() 方法的默认查询逻辑
'ordering': ['creation_time'], # 更新默认排序字段:从 created_time 改为 creation_time升序
'verbose_name': 'OwnTrackLogs', # 单数显示名称(未修改,与初始迁移一致)
'verbose_name_plural': 'OwnTrackLogs', # 复数显示名称(未修改)
},
),
# 操作2重命名模型的字段数据库表中对应列名也会同步修改
migrations.RenameField(
model_name='owntracklog',
old_name='created_time',
new_name='creation_time',
model_name='owntracklog', # 目标模型名称
old_name='created_time', # 旧字段名(原模型中定义的字段名)
new_name='creation_time', # 新字段名(修改后的字段名)
# 说明该操作会同步更新数据库表中对应的列名created_time → creation_time且保留原有字段数据
# 无需手动处理数据迁移Django 会自动完成字段名映射和数据保留
),
]
]

@ -1,20 +1,54 @@
# 导入 Django ORM 核心模块models 用于定义数据模型,对应数据库表结构
from django.db import models
# 导入 Django 时区工具now() 用于获取当前时区的时间(避免时区不一致问题)
from django.utils.timezone import now
# Create your models here.
# 说明:该注释为 Django 自动生成,提示开发者在此处定义数据模型类
# 模型类与数据库表的映射关系:每个模型类对应一张数据库表,类属性对应表字段
class OwnTrackLog(models.Model):
tid = models.CharField(max_length=100, null=False, verbose_name='用户')
lat = models.FloatField(verbose_name='纬度')
"""
轨迹数据模型类继承自 Django 内置的 models.Model所有数据模型的基类
核心作用存储用户的地理轨迹信息用户标识经纬度创建时间
映射数据库表名默认生成规则为应用名_模型名小写 owntracks_owntracklog
"""
# 1. 用户标识字段存储用户唯一标识如设备ID、用户名等
tid = models.CharField(
max_length=100, # 字段最大长度CharField 必填参数),适配多数用户标识场景
null=False, # 数据库层面不允许为空(必填字段),确保数据完整性
verbose_name='用户' # Django 后台管理界面显示的字段名称(支持中文)
)
# 2. 纬度字段:存储地理纬度值(如 39.9042,支持正负值,适配全球地理坐标)
lat = models.FloatField(verbose_name='纬度') # FloatField 对应数据库 float 类型,满足精度需求
# 3. 经度字段:存储地理经度值(如 116.4074,与纬度配合定位地理坐标)
lon = models.FloatField(verbose_name='经度')
creation_time = models.DateTimeField('创建时间', default=now)
# 4. 创建时间字段:记录轨迹数据的生成时间
creation_time = models.DateTimeField(
'创建时间', # verbose_name 的简写形式(第一个参数直接指定后台显示名称)
default=now # 默认值当前时区的当前时间now 是可调用对象,每次创建记录时动态获取)
# 注:区别于 datetime.datetime.now()django.utils.timezone.now() 包含时区信息,符合 Django 时区配置
)
def __str__(self):
"""
模型实例的字符串表示方法
作用 Django 后台终端打印实例时显示直观的标识而非默认的 <OwnTrackLog object>
返回值以用户标识tid作为实例的字符串描述便于区分不同用户的轨迹数据
"""
return self.tid
class Meta:
ordering = ['creation_time']
verbose_name = "OwnTrackLogs"
verbose_name_plural = verbose_name
get_latest_by = 'creation_time'
"""
模型元数据类用于配置模型的整体属性不对应表字段影响表的行为和显示
所有配置仅作用于当前模型不影响其他模型
"""
ordering = ['creation_time'] # 默认排序规则:按创建时间升序排列(-creation_time 表示降序)
verbose_name = "OwnTrackLogs" # 后台管理界面显示的「单数模型名称」
verbose_name_plural = verbose_name # 后台管理界面显示的「复数模型名称」(此处与单数一致,避免英文复数歧义)
get_latest_by = 'creation_time' # 支持 Model.objects.latest() 方法,默认按创建时间获取最新一条记录

@ -1,64 +1,124 @@
# 导入 JSON 模块:用于将 Python 字典序列化为 JSON 字符串(适配接口的 JSON 数据格式)
import json
# 导入 Django 测试核心工具:
# - Client模拟客户端发起 HTTP 请求GET/POST 等),用于测试视图接口
# - RequestFactory生成原始请求对象适用于单独测试视图函数/类,本用例未直接使用)
# - TestCaseDjango 单元测试基类,提供断言方法、测试环境初始化/清理等功能
from django.test import Client, RequestFactory, TestCase
# 导入跨应用模型BlogUser用户模型用于测试登录权限相关接口
from accounts.models import BlogUser
# 导入当前应用的测试目标模型OwnTrackLog轨迹数据模型用于验证数据读写
from .models import OwnTrackLog
# Create your tests here.
# 说明:该注释为 Django 自动生成,提示开发者在此处定义测试类/测试方法
class OwnTrackLogTest(TestCase):
"""
轨迹数据相关接口与模型单元测试类
继承自 TestCase专注测试 OwnTrackLog 模型的数据读写及相关视图接口/owntracks/ 下的接口
测试覆盖场景数据提交合法/非法接口权限控制响应状态码验证
"""
def setUp(self):
"""
测试前置初始化方法
在每个测试方法执行前自动调用用于创建测试所需的公共资源
作用避免重复代码确保每个测试方法的环境一致性
"""
# 初始化客户端对象:模拟浏览器发起 HTTP 请求,后续所有接口测试均通过该对象执行
self.client = Client()
# 初始化请求工厂对象:用于生成自定义请求(本用例未直接使用,预留扩展)
self.factory = RequestFactory()
def test_own_track_log(self):
"""
核心测试方法命名以 test_ 开头Django 测试框架自动识别执行
测试内容
1. 合法轨迹数据提交完整字段 验证数据是否成功写入数据库
2. 非法轨迹数据提交缺少必填字段 验证数据是否被拒绝数据库无新增
3. 未登录状态访问需权限接口 验证是否重定向302
4. 管理员登录后访问接口 验证是否正常响应200
5. 管理员登录后操作模型 验证数据写入及接口查询功能
"""
# --------------- 场景1提交完整合法的轨迹数据tid、lat、lon 字段齐全)---------------
# 构造合法的请求数据字典:包含模型所需的所有必填字段
o = {
'tid': 12,
'lat': 123.123,
'lon': 134.341
'tid': 12, # 用户标识(整数类型,模型中 CharField 会自动转换为字符串存储)
'lat': 123.123, # 纬度(合法浮点数)
'lon': 134.341 # 经度(合法浮点数)
}
# 模拟 POST 请求:向轨迹提交接口发送 JSON 格式数据
self.client.post(
'/owntracks/logtracks',
json.dumps(o),
content_type='application/json')
length = len(OwnTrackLog.objects.all())
self.assertEqual(length, 1)
'/owntracks/logtracks', # 请求接口路径(需与 urls.py 中配置一致)
json.dumps(o), # 请求体:将字典序列化为 JSON 字符串
content_type='application/json' # 指定请求头:声明数据格式为 JSON
)
# 验证:数据库中是否新增 1 条轨迹记录(断言实际数量与预期一致)
length = len(OwnTrackLog.objects.all()) # 查询所有轨迹记录的数量
self.assertEqual(length, 1) # 断言数量为 1 → 验证合法数据提交成功
# --------------- 场景2提交非法轨迹数据缺少必填字段 lon---------------
# 构造非法请求数据缺少经度lon字段模型中 lon 为必填字段,无 null=True 配置)
o = {
'tid': 12,
'lat': 123.123
'tid': 12, # 用户标识
'lat': 123.123 # 纬度(仅含该字段,缺少 lon
}
# 再次向同一接口发送非法数据
self.client.post(
'/owntracks/logtracks',
json.dumps(o),
content_type='application/json')
# 验证:数据库记录数量是否仍为 1非法数据未被写入
length = len(OwnTrackLog.objects.all())
self.assertEqual(length, 1)
self.assertEqual(length, 1) # 断言数量不变 → 验证非法数据被拒绝
# --------------- 场景3未登录状态访问需权限的地图展示接口 ---------------
# 模拟 GET 请求:未登录状态下访问 /owntracks/show_maps 接口
rsp = self.client.get('/owntracks/show_maps')
self.assertEqual(rsp.status_code, 302)
# 验证:响应状态码是否为 302重定向通常跳转到登录页
self.assertEqual(rsp.status_code, 302) # 断言重定向 → 验证接口权限控制生效
# --------------- 场景4创建管理员用户并登录 ---------------
# 创建超级用户(管理员):用于测试登录后访问权限接口
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="liangliangyy1")
email="liangliangyy1@gmail.com", # 邮箱(超级用户必填字段)
username="liangliangyy1", # 用户名(登录账号)
password="liangliangyy1") # 密码(登录密码)
# 模拟管理员登录:使用上述创建的账号密码登录系统
self.client.login(username='liangliangyy1', password='liangliangyy1')
# 手动创建一条轨迹记录(用于后续接口查询测试)
s = OwnTrackLog()
s.tid = 12
s.lon = 123.234
s.lat = 34.234
s.save()
s.tid = 12 # 设置用户标识
s.lon = 123.234 # 设置经度
s.lat = 34.234 # 设置纬度
# creation_time 字段使用默认值(当前时间),无需手动赋值
s.save() # 保存到数据库
# --------------- 场景5登录后访问各类接口验证响应状态 ---------------
# 1. 访问日期列表接口 → 预期 200正常响应
rsp = self.client.get('/owntracks/show_dates')
self.assertEqual(rsp.status_code, 200)
# 2. 再次访问地图展示接口 → 预期 200已登录权限通过
rsp = self.client.get('/owntracks/show_maps')
self.assertEqual(rsp.status_code, 200)
# 3. 访问轨迹数据查询接口(无日期参数)→ 预期 200
rsp = self.client.get('/owntracks/get_datas')
self.assertEqual(rsp.status_code, 200)
# 4. 访问轨迹数据查询接口(带日期参数)→ 预期 200
rsp = self.client.get('/owntracks/get_datas?date=2018-02-26')
self.assertEqual(rsp.status_code, 200)
# 注:此处仅验证接口是否正常响应(状态码 200未验证返回数据的正确性可根据需求补充数据断言

@ -1,12 +1,40 @@
# 导入 Django 路由核心函数path 用于定义 URL 路径与视图函数的映射关系
from django.urls import path
# 导入当前应用的视图模块views 中包含所有路由对应的业务处理函数
from . import views
# 应用路由命名空间:用于区分不同应用的同名路由(避免反向解析时冲突)
# 作用在模板或视图中通过「app_name:route_name」反向生成 URL如 reverse('owntracks:logtracks')
app_name = "owntracks"
# 路由配置列表:每个 path 对应一条 URL 规则,按定义顺序匹配(优先匹配靠前的规则)
urlpatterns = [
path('owntracks/logtracks', views.manage_owntrack_log, name='logtracks'),
path('owntracks/show_maps', views.show_maps, name='show_maps'),
path('owntracks/get_datas', views.get_datas, name='get_datas'),
path('owntracks/show_dates', views.show_log_dates, name='show_dates')
]
# 1. 轨迹数据提交接口:接收客户端发送的轨迹数据(经纬度、用户标识)并存储
path(
'owntracks/logtracks', # URL 路径:客户端访问的接口地址(需完整匹配)
views.manage_owntrack_log, # 对应的视图函数:处理该 URL 的业务逻辑(如数据验证、写入数据库)
name='logtracks' # 路由别名:用于反向解析 URL替代硬编码路径便于维护
),
# 2. 地图展示接口:渲染包含用户轨迹的地图页面(需登录权限)
path(
'owntracks/show_maps', # URL 路径:地图展示页面地址
views.show_maps, # 视图函数:查询轨迹数据并传递给模板渲染地图
name='show_maps' # 路由别名:如模板中使用 {% url 'owntracks:show_maps' %} 生成 URL
),
# 3. 轨迹数据查询接口:返回指定条件的轨迹数据(如按日期筛选),通常用于前端异步请求
path(
'owntracks/get_datas', # URL 路径:数据查询接口地址(支持带查询参数,如 ?date=2023-09-06
views.get_datas, # 视图函数:处理查询条件,从数据库筛选数据并返回(如 JSON 格式)
name='get_datas' # 路由别名:前端 AJAX 请求时可通过反向解析获取接口地址
),
# 4. 轨迹日期列表接口:返回所有轨迹数据的日期列表(用于前端筛选日期选择)
path(
'owntracks/show_dates', # URL 路径:日期列表展示/查询地址
views.show_log_dates, # 视图函数:查询数据库中轨迹数据的所有日期并返回(去重处理)
name='show_dates' # 路由别名:用于反向生成日期筛选接口的 URL
)
]

@ -1,127 +1,237 @@
# Create your views here.
import datetime
import itertools
import json
import logging
from datetime import timezone
from itertools import groupby
import django
import requests
from django.contrib.auth.decorators import login_required
from django.http import HttpResponse
from django.http import JsonResponse
from django.shortcuts import render
from django.views.decorators.csrf import csrf_exempt
from .models import OwnTrackLog
# 说明:该文件为 Django 视图层核心文件,包含所有 /owntracks/ 路由对应的业务处理逻辑
# 视图函数职责:接收请求、处理数据(数据库读写/第三方接口调用)、返回响应(页面/JSON/状态码)
# 导入标准库模块
import datetime # 处理日期时间相关操作(如日期计算、格式化)
import itertools # 提供迭代器工具(如切片、分组,用于批量处理经纬度数据)
import json # 处理 JSON 数据序列化/反序列化(适配接口请求/响应)
import logging # 日志模块:记录业务日志(信息/错误),便于问题排查
from datetime import timezone # 处理时区相关(确保时间计算一致性)
from itertools import groupby # 分组工具按用户标识tid分组轨迹数据
# 导入第三方库/框架模块
import django # Django 核心模块(用于时区时间处理)
import requests # HTTP 请求库:调用高德地图坐标转换接口
from django.contrib.auth.decorators import login_required # 登录验证装饰器:限制未登录用户访问
from django.http import HttpResponse # 基础响应类:返回文本/状态码响应
from django.http import JsonResponse # JSON 响应类:返回 JSON 格式数据(适配前端异步请求)
from django.shortcuts import render # 页面渲染函数:加载模板并返回 HTML 页面
from django.views.decorators.csrf import csrf_exempt # CSRF 豁免装饰器:关闭跨站请求伪造保护(适配第三方客户端提交数据)
# 导入当前应用模块
from .models import OwnTrackLog # 轨迹数据模型:用于数据库读写操作
# 初始化日志对象:按当前模块名创建日志实例,日志输出会携带模块标识
logger = logging.getLogger(__name__)
@csrf_exempt
@csrf_exempt # 豁免 CSRF 验证:因客户端(如设备/第三方系统)可能无法提供 CSRF Token故关闭保护
def manage_owntrack_log(request):
"""
轨迹数据提交接口视图
功能接收客户端 POST 提交的 JSON 格式轨迹数据tid/经纬度验证后写入数据库
请求方式POST仅支持 POST其他方式会因缺少请求体报错
请求体格式{"tid": "用户标识", "lat": 纬度值, "lon": 经度值}
响应
- 成功写入返回 "ok"HTTP 200
- 数据不完整返回 "data error"HTTP 200
- 异常报错返回 "error"HTTP 200并记录错误日志
"""
try:
# 读取请求体:将 JSON 字符串解码为 Python 字典utf-8 编码适配中文/特殊字符)
s = json.loads(request.read().decode('utf-8'))
# 提取请求数据中的核心字段(用户标识、纬度、经度)
tid = s['tid']
lat = s['lat']
lon = s['lon']
# 记录信息日志:打印提交的轨迹数据(便于追踪数据流转)
logger.info(
'tid:{tid}.lat:{lat}.lon:{lon}'.format(
tid=tid, lat=lat, lon=lon))
tid=tid, lat=lat, lon=lon)
)
# 数据合法性校验:确保核心字段非空(避免写入无效数据)
if tid and lat and lon:
# 创建模型实例并赋值
m = OwnTrackLog()
m.tid = tid
m.lat = lat
m.lon = lon
m.save()
return HttpResponse('ok')
m.tid = tid # 用户标识
m.lat = lat # 纬度
m.lon = lon # 经度
# creation_time 字段使用默认值(当前时间),无需手动赋值
m.save() # 保存到数据库
return HttpResponse('ok') # 响应成功标识
else:
# 数据不完整:返回错误提示
return HttpResponse('data error')
except Exception as e:
logger.error(e)
return HttpResponse('error')
# 捕获所有异常(如 JSON 解析失败、字段缺失、数据库报错等)
logger.error(e) # 记录错误日志(包含异常堆栈信息,便于排查)
return HttpResponse('error') # 响应错误标识
@login_required
@login_required # 登录验证:仅登录用户可访问,未登录自动重定向到登录页
def show_maps(request):
"""
地图展示页面视图
功能渲染包含用户轨迹的地图页面需管理员权限
请求方式GET
请求参数?date=YYYY-MM-DD可选默认当前日期
响应
- 管理员登录返回地图 HTML 页面携带日期参数
- 非管理员登录返回 403 禁止访问
"""
# 权限二次校验仅超级管理员is_superuser=True可访问普通登录用户无权限
if request.user.is_superuser:
# 计算默认日期:当前 UTC 时间的日期格式YYYY-MM-DD
defaultdate = str(datetime.datetime.now(timezone.utc).date())
# 获取请求参数中的日期(若未传则使用默认日期)
date = request.GET.get('date', defaultdate)
# 构造模板上下文:传递日期参数给前端模板(用于筛选该日期的轨迹数据)
context = {
'date': date
}
# 渲染模板:加载 show_maps.html 模板并传入上下文,返回 HTML 响应
return render(request, 'owntracks/show_maps.html', context)
else:
# 非管理员:导入并返回 403 禁止访问响应
from django.http import HttpResponseForbidden
return HttpResponseForbidden()
@login_required
@login_required # 登录验证:仅登录用户可访问
def show_log_dates(request):
"""
轨迹日期列表页面视图
功能查询数据库中所有轨迹数据的日期去重渲染日期列表页面用于前端筛选
请求方式GET
响应返回日期列表 HTML 页面包含去重后的所有轨迹日期
"""
# 查询所有轨迹记录的创建时间(仅取 creation_time 字段flat=True 返回一维列表)
dates = OwnTrackLog.objects.values_list('creation_time', flat=True)
# 日期处理:
# 1. map 转换:将 datetime 对象格式化为 'YYYY-MM-DD' 字符串
# 2. set 去重:去除重复日期
# 3. sorted 排序:按日期升序排列
# 4. list 转换:转为列表用于模板渲染
results = list(sorted(set(map(lambda x: x.strftime('%Y-%m-%d'), dates))))
# 构造上下文:传递日期列表给模板
context = {
'results': results
}
# 渲染日期列表模板
return render(request, 'owntracks/show_log_dates.html', context)
def convert_to_amap(locations):
convert_result = []
it = iter(locations)
"""
高德地图坐标转换工具函数
功能 GPS 坐标系WGS84的经纬度转换为高德坐标系GCJ02
原因GPS 原始坐标在高德地图上会有偏移转换后可精准定位
参数locations - OwnTrackLog 模型实例列表包含 lon/lat 字段
返回值转换后的经纬度字符串格式"lon1,lat1;lon2,lat2;..."
限制高德接口单次最多支持 30 个坐标故分批次转换
"""
convert_result = [] # 存储所有批次的转换结果
it = iter(locations) # 将列表转为迭代器,便于分批次切片
# 循环分批次处理:每次取 30 个坐标(适配高德接口限制)
item = list(itertools.islice(it, 30))
while item:
# 构造坐标字符串:将每个实例的 lon/lat 拼接为 "lon,lat",再用 ";" 连接多个坐标
# set 去重:避免重复坐标提交(减少接口调用量)
datas = ';'.join(
set(map(lambda x: str(x.lon) + ',' + str(x.lat), item)))
set(map(lambda x: str(x.lon) + ',' + str(x.lat), item))
)
key = '8440a376dfc9743d8924bf0ad141f28e'
api = 'http://restapi.amap.com/v3/assistant/coordinate/convert'
# 高德地图坐标转换接口配置
key = '8440a376dfc9743d8924bf0ad141f28e' # 高德开发者密钥(需替换为有效密钥)
api = 'http://restapi.amap.com/v3/assistant/coordinate/convert' # 转换接口地址
query = {
'key': key,
'locations': datas,
'coordsys': 'gps'
'key': key, # 开发者密钥(必填)
'locations': datas, # 待转换的坐标字符串
'coordsys': 'gps' # 源坐标系gpsWGS84
}
# 调用高德接口GET 请求)
rsp = requests.get(url=api, params=query)
# 解析接口响应JSON 转字典)
result = json.loads(rsp.text)
# 若响应包含 "locations" 字段(转换成功),添加到结果列表
if "locations" in result:
convert_result.append(result['locations'])
# 处理下一批次坐标
item = list(itertools.islice(it, 30))
# 拼接所有批次结果,返回统一格式的坐标字符串
return ";".join(convert_result)
@login_required
@login_required # 登录验证:仅登录用户可访问
def get_datas(request):
"""
轨迹数据查询接口视图
功能按日期筛选轨迹数据按用户标识tid分组返回 JSON 格式的轨迹路径经纬度列表
请求方式GET
请求参数?date=YYYY-MM-DD可选默认当前日期
响应JSON 数组格式[{"name": "tid1", "path": [[lon1,lat1], [lon2,lat2], ...]}, ...]
"""
# 获取当前 UTC 时间(带时区信息,确保与数据库时间字段时区一致)
now = django.utils.timezone.now().replace(tzinfo=timezone.utc)
# 构造默认查询日期:当前日期的 00:00:00UTC 时间)
querydate = django.utils.timezone.datetime(
now.year, now.month, now.day, 0, 0, 0)
now.year, now.month, now.day, 0, 0, 0
)
# 若请求携带 date 参数,解析为指定日期的 00:00:00
if request.GET.get('date', None):
# 拆分日期字符串YYYY-MM-DD → [年, 月, 日])并转为整数
date = list(map(lambda x: int(x), request.GET.get('date').split('-')))
querydate = django.utils.timezone.datetime(
date[0], date[1], date[2], 0, 0, 0)
date[0], date[1], date[2], 0, 0, 0
)
# 构造查询结束日期:查询日期的次日 00:00:00即筛选 [querydate, nextdate) 区间的数据)
nextdate = querydate + datetime.timedelta(days=1)
# 数据库查询:筛选指定日期区间内的所有轨迹记录
models = OwnTrackLog.objects.filter(
creation_time__range=(querydate, nextdate))
result = list()
creation_time__range=(querydate, nextdate)
)
result = list() # 存储最终返回的 JSON 数据
# 若查询到数据,按 tid 分组并构造轨迹路径
if models and len(models):
# 1. sorted按 tid 排序(确保相同 tid 的记录连续,为 groupby 分组做准备)
# 2. groupby按 tid 分组key 为分组依据tid
for tid, item in groupby(
sorted(models, key=lambda k: k.tid), key=lambda k: k.tid):
# 构造单个用户的轨迹数据字典
d = dict()
d["name"] = tid
paths = list()
# 使用高德转换后的经纬度
d["name"] = tid # 用户标识(用于前端区分不同用户的轨迹)
paths = list() # 存储该用户的经纬度路径列表
# 【可选】使用高德转换后的经纬度(当前注释未启用,默认使用 GPS 原始坐标)
# locations = convert_to_amap(
# sorted(item, key=lambda x: x.creation_time))
# sorted(item, key=lambda x: x.creation_time) # 按创建时间排序,确保轨迹顺序正确
# )
# for i in locations.split(';'):
# paths.append(i.split(','))
# 使用GPS原始经纬度
# paths.append(i.split(',')) # 拆分坐标为 [lon, lat] 列表
# 使用 GPS 原始经纬度(默认启用)
# 按创建时间排序:确保轨迹点按时间顺序排列(避免路径错乱)
for location in sorted(item, key=lambda x: x.creation_time):
# 转为字符串格式(避免 JSON 序列化时的精度问题),添加到路径列表
paths.append([str(location.lon), str(location.lat)])
d["path"] = paths
result.append(d)
d["path"] = paths # 关联路径列表到用户字典
result.append(d) # 添加到最终结果列表
# 返回 JSON 响应safe=False 允许返回非字典类型(此处为列表)
return JsonResponse(result, safe=False)

@ -5,28 +5,83 @@ from djangoblog.utils import cache
class MemcacheStorage(SessionStorage):
"""
基于Memcache的会话存储实现类
该类继承自SessionStorage使用memcache作为后端存储来管理会话数据
Args:
prefix (str): 存储键名的前缀默认为'ws_'
"""
def __init__(self, prefix='ws_'):
self.prefix = prefix
self.cache = cache
@property
def is_available(self):
"""
检查存储是否可用
通过设置并获取一个测试值来验证存储服务的可用性
Returns:
bool: 存储服务可用返回True否则返回False
"""
value = "1"
self.set('checkavaliable', value=value)
return value == self.get('checkavaliable')
def key_name(self, s):
"""
生成带前缀的完整键名
Args:
s (str): 原始键名
Returns:
str: 添加前缀后的完整键名
"""
return '{prefix}{s}'.format(prefix=self.prefix, s=s)
def get(self, id):
"""
根据ID获取会话数据
Args:
id (str): 会话ID
Returns:
dict: 解析后的会话数据字典
"""
# 构造完整的缓存键名
id = self.key_name(id)
# 从缓存中获取会话数据如果不存在则返回空JSON对象
session_json = self.cache.get(id) or '{}'
# 将JSON字符串解析为Python对象并返回
return json_loads(session_json)
def set(self, id, value):
"""
设置会话数据
Args:
id (str): 会话ID
value (any): 要存储的会话数据
"""
# 构造完整的缓存键名
id = self.key_name(id)
# 将数据序列化为JSON字符串并存储到缓存中
self.cache.set(id, json_dumps(value))
def delete(self, id):
"""
删除指定ID的会话数据
Args:
id (str): 要删除的会话ID
"""
# 构造完整的缓存键名
id = self.key_name(id)
# 从缓存中删除对应的会话数据
self.cache.delete(id)

@ -3,10 +3,25 @@ from django.contrib import admin
class CommandsAdmin(admin.ModelAdmin):
"""
命令管理后台类
用于在Django管理后台中展示和管理命令信息配置了列表页面显示的字段
"""
list_display = ('title', 'command', 'describe')
class EmailSendLogAdmin(admin.ModelAdmin):
"""
邮件发送日志管理后台类
用于在Django管理后台中展示和管理邮件发送日志信息配置了列表页面显示的字段
和只读字段并重写了权限控制方法
Attributes:
list_display: 列表页面显示的字段元组
readonly_fields: 只读字段元组
"""
list_display = ('title', 'emailto', 'send_result', 'creation_time')
readonly_fields = (
'title',
@ -16,4 +31,17 @@ class EmailSendLogAdmin(admin.ModelAdmin):
'content')
def has_add_permission(self, request):
"""
控制是否具有添加新记录的权限
重写父类方法禁止用户在管理后台手动添加邮件发送日志记录
Args:
request: HTTP请求对象
Returns:
bool: 总是返回False表示没有添加权限
"""
return False

@ -1,27 +1,72 @@
from haystack.query import SearchQuerySet
#hz代码注释
from haystack.query import SearchQuerySet
from blog.models import Article, Category
class BlogApi:
"""
博客API类提供文章搜索分类获取等相关功能
Attributes:
searchqueryset (SearchQuerySet): 搜索查询集对象
__max_takecount__ (int): 最大返回记录数默认为8
"""
def __init__(self):
"""
初始化BlogApi实例
"""
self.searchqueryset = SearchQuerySet()
self.searchqueryset.auto_query('')
self.__max_takecount__ = 8
def search_articles(self, query):
"""
根据查询关键字搜索文章
Args:
query (str): 搜索关键字
Returns:
list: 匹配的文章列表最多返回__max_takecount__条记录
"""
sqs = self.searchqueryset.auto_query(query)
sqs = sqs.load_all()
return sqs[:self.__max_takecount__]
def get_category_lists(self):
"""
获取所有文章分类列表
Returns:
QuerySet: 所有分类对象的查询集
"""
return Category.objects.all()
def get_category_articles(self, categoryname):
"""
根据分类名称获取该分类下的文章列表
Args:
categoryname (str): 分类名称
Returns:
QuerySet or None: 指定分类下的文章查询集最多返回__max_takecount__条记录
如果没有找到相关文章则返回None
"""
articles = Article.objects.filter(category__name=categoryname)
if articles:
return articles[:self.__max_takecount__]
return None
def get_recent_articles(self):
"""
获取最近发布的文章列表
Returns:
QuerySet: 最近发布的文章查询集最多返回__max_takecount__条记录
"""
return Article.objects.all()[:self.__max_takecount__]

@ -2,4 +2,14 @@ from django.apps import AppConfig
class ServermanagerConfig(AppConfig):
"""
Django应用配置类
该类用于配置servermanager应用的基本信息继承自Django的AppConfig基类
通过设置name属性来指定应用的名称Django框架会使用这个配置来识别和管理应用
属性:
name (str): 应用的名称用于Django框架识别该应用模块
"""
name = 'servermanager'

@ -1,9 +1,16 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
#hz代码注释
from django.db import migrations, models
class Migration(migrations.Migration):
"""
Django数据库迁移类用于创建初始数据表结构
该迁移文件包含两个模型的创建操作
1. commands模型 - 用于存储命令信息
2. EmailSendLog模型 - 用于记录邮件发送日志
"""
initial = True
@ -11,6 +18,7 @@ class Migration(migrations.Migration):
]
operations = [
# 创建commands数据表用于存储命令相关信息
migrations.CreateModel(
name='commands',
fields=[
@ -26,6 +34,7 @@ class Migration(migrations.Migration):
'verbose_name_plural': '命令',
},
),
# 创建EmailSendLog数据表用于记录邮件发送日志信息
migrations.CreateModel(
name='EmailSendLog',
fields=[
@ -43,3 +52,4 @@ class Migration(migrations.Migration):
},
),
]

@ -1,29 +1,38 @@
# Generated by Django 4.2.5 on 2023-09-06 13:19
#hx代码注释
from django.db import migrations
class Migration(migrations.Migration):
"""
Django数据库迁移类用于执行模型字段重命名和模型选项修改操作
该迁移依赖于servermanager应用的0001_initial迁移文件
"""
dependencies = [
('servermanager', '0001_initial'),
]
operations = [
# 修改EmailSendLog模型的元数据选项设置排序规则和显示名称
migrations.AlterModelOptions(
name='emailsendlog',
options={'ordering': ['-creation_time'], 'verbose_name': '邮件发送log', 'verbose_name_plural': '邮件发送log'},
),
# 重命名Commands模型的created_time字段为creation_time
migrations.RenameField(
model_name='commands',
old_name='created_time',
new_name='creation_time',
),
# 重命名Commands模型的last_mod_time字段为last_modify_time
migrations.RenameField(
model_name='commands',
old_name='last_mod_time',
new_name='last_modify_time',
),
# 重命名EmailSendLog模型的created_time字段为creation_time
migrations.RenameField(
model_name='emailsendlog',
old_name='created_time',

@ -3,6 +3,18 @@ from django.db import models
# Create your models here.
class commands(models.Model):
"""
命令模型类
用于存储命令相关信息的数据库模型
Attributes:
title (CharField): 命令标题最大长度300字符
command (CharField): 命令内容最大长度2000字符
describe (CharField): 命令描述最大长度300字符
creation_time (DateTimeField): 创建时间自动设置为记录创建时的时间
last_modify_time (DateTimeField): 修改时间自动更新为记录每次修改的时间
"""
title = models.CharField('命令标题', max_length=300)
command = models.CharField('命令', max_length=2000)
describe = models.CharField('命令描述', max_length=300)
@ -10,14 +22,37 @@ class commands(models.Model):
last_modify_time = models.DateTimeField('修改时间', auto_now=True)
def __str__(self):
"""
返回命令对象的字符串表示
Returns:
str: 命令的标题
"""
return self.title
class Meta:
"""
模型元数据配置
配置模型在Django管理界面中的显示名称
"""
verbose_name = '命令'
verbose_name_plural = verbose_name
class EmailSendLog(models.Model):
"""
邮件发送日志模型类
用于记录邮件发送历史和结果的数据库模型
Attributes:
emailto (CharField): 收件人邮箱地址最大长度300字符
title (CharField): 邮件标题最大长度2000字符
content (TextField): 邮件正文内容
send_result (BooleanField): 邮件发送结果True表示成功False表示失败
creation_time (DateTimeField): 创建时间自动设置为记录创建时的时间
"""
emailto = models.CharField('收件人', max_length=300)
title = models.CharField('邮件标题', max_length=2000)
content = models.TextField('邮件内容')
@ -25,9 +60,20 @@ class EmailSendLog(models.Model):
creation_time = models.DateTimeField('创建时间', auto_now_add=True)
def __str__(self):
"""
返回邮件发送日志对象的字符串表示
Returns:
str: 邮件的标题
"""
return self.title
class Meta:
"""
模型元数据配置
配置模型在Django管理界面中的显示名称和排序规则
"""
verbose_name = '邮件发送log'
verbose_name_plural = verbose_name
ordering = ['-creation_time']

@ -13,29 +13,46 @@ from servermanager.api.blogapi import BlogApi
from servermanager.api.commonapi import ChatGPT, CommandHandler
from .MemcacheStorage import MemcacheStorage
# 初始化微信机器人实例配置token和启用session功能
robot = WeRoBot(token=os.environ.get('DJANGO_WEROBOT_TOKEN')
or 'lylinux', enable_session=True)
# 创建Memcache存储实例用于session存储
memstorage = MemcacheStorage()
# 根据存储可用性配置机器人的session存储方式
if memstorage.is_available:
robot.config['SESSION_STORAGE'] = memstorage
else:
# 如果文件存储存在则删除旧文件使用文件存储作为session存储
if os.path.exists(os.path.join(settings.BASE_DIR, 'werobot_session')):
os.remove(os.path.join(settings.BASE_DIR, 'werobot_session'))
robot.config['SESSION_STORAGE'] = FileStorage(filename='werobot_session')
# 初始化博客API和命令处理器实例
blogapi = BlogApi()
cmd_handler = CommandHandler()
logger = logging.getLogger(__name__)
def convert_to_article_reply(articles, message):
"""
将文章列表转换为微信文章回复格式
Args:
articles: 文章对象列表
message: 微信消息对象
Returns:
ArticlesReply: 微信文章回复对象
"""
reply = ArticlesReply(message=message)
from blog.templatetags.blog_tags import truncatechars_content
for post in articles:
# 提取文章中的图片URL
imgs = re.findall(r'(?:http\:|https\:)?\/\/.*\.(?:png|jpg)', post.body)
imgurl = ''
if imgs:
imgurl = imgs[0]
# 创建单篇文章对象
article = Article(
title=post.title,
description=truncatechars_content(post.body),
@ -48,6 +65,16 @@ def convert_to_article_reply(articles, message):
@robot.filter(re.compile(r"^\?.*"))
def search(message, session):
"""
处理文章搜索请求根据关键词搜索文章并返回结果
Args:
message: 微信消息对象包含搜索关键词
session: 用户会话对象
Returns:
ArticlesReply或str: 搜索结果或提示信息
"""
s = message.content
searchstr = str(s).replace('?', '')
result = blogapi.search_articles(searchstr)
@ -61,6 +88,16 @@ def search(message, session):
@robot.filter(re.compile(r'^category\s*$', re.I))
def category(message, session):
"""
获取所有文章分类目录信息
Args:
message: 微信消息对象
session: 用户会话对象
Returns:
str: 包含所有分类名称的字符串
"""
categorys = blogapi.get_category_lists()
content = ','.join(map(lambda x: x.name, categorys))
return '所有文章分类目录:' + content
@ -68,6 +105,16 @@ def category(message, session):
@robot.filter(re.compile(r'^recent\s*$', re.I))
def recents(message, session):
"""
获取最新发布的文章列表
Args:
message: 微信消息对象
session: 用户会话对象
Returns:
ArticlesReply或str: 最新文章列表或提示信息
"""
articles = blogapi.get_recent_articles()
if articles:
reply = convert_to_article_reply(articles, message)
@ -78,6 +125,16 @@ def recents(message, session):
@robot.filter(re.compile('^help$', re.I))
def help(message, session):
"""
返回系统帮助信息包含所有可用命令说明
Args:
message: 微信消息对象
session: 用户会话对象
Returns:
str: 帮助信息文本
"""
return '''欢迎关注!
默认会与图灵机器人聊天~~
你可以通过下面这些命令来获得信息
@ -100,22 +157,61 @@ def help(message, session):
@robot.filter(re.compile(r'^weather\:.*$', re.I))
def weather(message, session):
"""
处理天气查询请求待实现
Args:
message: 微信消息对象
session: 用户会话对象
Returns:
str: 建设中提示信息
"""
return "建设中..."
@robot.filter(re.compile(r'^idcard\:.*$', re.I))
def idcard(message, session):
"""
处理身份证信息查询请求待实现
Args:
message: 微信消息对象
session: 用户会话对象
Returns:
str: 建设中提示信息
"""
return "建设中..."
@robot.handler
def echo(message, session):
"""
主消息处理函数创建消息处理器并处理用户消息
Args:
message: 微信消息对象
session: 用户会话对象
Returns:
str或其他类型: 处理结果
"""
handler = MessageHandler(message, session)
return handler.handler()
class MessageHandler:
"""微信消息处理器类,负责处理各种用户消息和命令"""
def __init__(self, message, session):
"""
初始化消息处理器
Args:
message: 微信消息对象
session: 用户会话对象
"""
userid = message.source
self.message = message
self.session = session
@ -129,27 +225,51 @@ class MessageHandler:
@property
def is_admin(self):
"""
判断当前用户是否为管理员
Returns:
bool: 是否为管理员
"""
return self.userinfo.isAdmin
@property
def is_password_set(self):
"""
判断管理员密码是否已设置
Returns:
bool: 密码是否已设置
"""
return self.userinfo.isPasswordSet
def save_session(self):
"""
保存用户会话信息到session中
"""
info = jsonpickle.encode(self.userinfo)
self.session[self.userid] = info
def handler(self):
"""
主要的消息处理逻辑根据用户状态和输入内容进行相应处理
Returns:
str: 处理结果响应文本
"""
info = self.message.content
# 处理管理员退出命令
if self.userinfo.isAdmin and info.upper() == 'EXIT':
self.userinfo = WxUserInfo()
self.save_session()
return "退出成功"
# 处理管理员登录命令
if info.upper() == 'ADMIN':
self.userinfo.isAdmin = True
self.save_session()
return "输入管理员密码"
# 处理管理员密码验证
if self.userinfo.isAdmin and not self.userinfo.isPasswordSet:
passwd = settings.WXADMIN
if settings.TESTING:
@ -159,6 +279,7 @@ class MessageHandler:
self.save_session()
return "验证通过,请输入命令或者要执行的命令代码:输入helpme获得帮助"
else:
# 处理密码错误次数限制
if self.userinfo.Count >= 3:
self.userinfo = WxUserInfo()
self.save_session()
@ -166,6 +287,7 @@ class MessageHandler:
self.userinfo.Count += 1
self.save_session()
return "验证失败,请重新输入管理员密码:"
# 处理管理员命令执行
if self.userinfo.isAdmin and self.userinfo.isPasswordSet:
if self.userinfo.Command != '' and info.upper() == 'Y':
return cmd_handler.run(self.userinfo.Command)
@ -176,12 +298,19 @@ class MessageHandler:
self.save_session()
return "确认执行: " + info + " 命令?"
# 默认使用ChatGPT处理普通消息
return ChatGPT.chat(info)
class WxUserInfo():
"""微信用户信息类,存储用户的状态信息"""
def __init__(self):
"""
初始化用户信息默认为非管理员状态
"""
self.isAdmin = False
self.isPasswordSet = False
self.Count = 0
self.Command = ''

@ -12,15 +12,32 @@ from .robot import search, category, recents
# Create your tests here.
class ServerManagerTest(TestCase):
"""
服务器管理模块的测试类用于测试聊天机器人命令处理文章搜索等功能
"""
def setUp(self):
"""
测试初始化方法在每个测试方法执行前运行
创建用于模拟HTTP请求的Client和RequestFactory实例
"""
self.client = Client()
self.factory = RequestFactory()
def test_chat_gpt(self):
"""
测试ChatGPT聊天功能
验证调用ChatGPT.chat方法能否返回非空内容
"""
content = ChatGPT.chat("你好")
self.assertIsNotNone(content)
def test_validate_comment(self):
"""
测试评论验证及相关功能包括用户登录文章创建命令处理和消息处理等
验证搜索分类最近文章命令执行和消息处理等功能是否正常运行
"""
# 创建超级用户并登录
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
@ -28,10 +45,12 @@ class ServerManagerTest(TestCase):
self.client.login(username='liangliangyy1', password='liangliangyy1')
# 创建分类
c = Category()
c.name = "categoryccc"
c.save()
# 创建文章
article = Article()
article.title = "nicetitleccc"
article.body = "nicecontentccc"
@ -40,23 +59,33 @@ class ServerManagerTest(TestCase):
article.type = 'a'
article.status = 'p'
article.save()
# 测试搜索功能
s = TextMessage([])
s.content = "nice"
rsp = search(s, None)
# 测试分类功能
rsp = category(None, None)
self.assertIsNotNone(rsp)
# 测试最近文章功能
rsp = recents(None, None)
self.assertTrue(rsp != '暂时还没有文章')
# 创建并保存命令
cmd = commands()
cmd.title = "test"
cmd.command = "ls"
cmd.describe = "test"
cmd.save()
# 测试命令处理器
cmdhandler = CommandHandler()
rsp = cmdhandler.run('test')
self.assertIsNotNone(rsp)
# 测试消息处理器的各种场景
s.source = 'u'
s.content = 'test'
msghandler = MessageHandler(s, {})
@ -77,3 +106,5 @@ class ServerManagerTest(TestCase):
s.content = 'exit'
msghandler.handler()

@ -5,6 +5,7 @@ from .robot import robot
app_name = "servermanager"
urlpatterns = [
# 将微信机器人接口映射到/robot路径
path(r'robot', make_view(robot)),
]

@ -0,0 +1,442 @@
"""
OAuth认证模块
提供第三方登录功能支持微信微博GitHubGoogleQQFacebook等平台
"""
import logging
import json
import urllib.parse
from abc import ABC, abstractmethod
from django.urls import reverse
from django.shortcuts import get_object_or_404, render
from django.http import HttpResponseRedirect, HttpResponseForbidden
from django.contrib import auth
from django.utils.translation import gettext_lazy as _
from .models import OAuthUser, OAuthConfig
# 获取日志器
logger = logging.getLogger(__name__)
class BaseOauthManager(ABC):
"""OAuth认证管理器基类"""
# 授权URL和API端点
AUTH_URL = ""
TOKEN_URL = ""
OPEN_ID_URL = ""
USER_INFO_URL = ""
ICON_NAME = "" # 平台图标名称
def __init__(self):
"""初始化OAuth管理器"""
self.access_token = None
self.openid = None
self.client_id = None
self.client_secret = None
self.callback_url = None
@abstractmethod
def get_authorization_url(self, next_url='/'):
"""获取授权URL"""
pass
@abstractmethod
def get_access_token_by_code(self, code):
"""通过授权码获取访问令牌"""
pass
@abstractmethod
def get_oauth_userinfo(self):
"""获取用户信息"""
pass
def get_picture(self, metadata):
"""获取用户头像(可选实现)"""
return ""
def do_get(self, url, params, headers=None):
"""
执行GET请求
Args:
url: 请求URL
params: 请求参数
headers: 请求头
Returns:
str: 响应内容
"""
try:
response = requests.get(url=url, params=params, headers=headers)
logger.info(f"GET Response: {response.text}")
return response.text
except Exception as e:
logger.error(f"GET request failed: {e}")
raise
def do_post(self, url, params, headers=None):
"""
执行POST请求
Args:
url: 请求URL
params: 请求参数
headers: 请求头
Returns:
str: 响应内容
"""
try:
response = requests.post(url, data=params, headers=headers)
logger.info(f"POST Response: {response.text}")
return response.text
except Exception as e:
logger.error(f"POST request failed: {e}")
raise
@property
def is_access_token_set(self):
"""检查访问令牌是否已设置"""
return self.access_token is not None
def get_config(self):
"""获取OAuth配置"""
try:
config = OAuthConfig.objects.filter(
type=self.ICON_NAME.lower(),
is_enable=True
).first()
if config:
self.client_id = config.appkey
self.client_secret = config.appsecret
self.callback_url = config.callback_url
return config
except Exception as e:
logger.error(f"Get OAuth config failed: {e}")
return None
class WeiboOauthManager(BaseOauthManager):
"""微博OAuth认证管理器"""
AUTH_URL = "https://api.weibo.com/oauth2/authorize"
TOKEN_URL = "https://api.weibo.com/oauth2/access_token"
USER_INFO_URL = "https://api.weibo.com/2/users/show.json"
ICON_NAME = "weibo"
def get_authorization_url(self, next_url='/'):
"""
获取微博授权URL
Args:
next_url: 授权成功后跳转的URL
Returns:
str: 完整的授权URL
"""
if not self.get_config():
return ""
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""
通过授权码获取访问令牌
Args:
code: 授权码
Returns:
tuple: (access_token, 响应数据)
"""
params = {
'grant_type': 'authorization_code',
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
'redirect_uri': self.callback_url
}
try:
response_text = self.do_post(self.TOKEN_URL, params)
token_data = json.loads(response_text)
if 'access_token' in token_data:
self.access_token = token_data['access_token']
return self.access_token, token_data
else:
logger.error(f"Failed to get access token: {token_data}")
raise OAuthAccessTokenException(token_data)
except json.JSONDecodeError as e:
logger.error(f"JSON decode error: {e}")
raise OAuthAccessTokenException("Invalid response format")
except Exception as e:
logger.error(f"Get access token failed: {e}")
raise OAuthAccessTokenException(str(e))
def get_oauth_userinfo(self):
"""
获取微博用户信息
Returns:
OAuthUser: 用户信息对象
"""
if not self.is_access_token_set:
raise ValueError("Access token not set")
params = {'access_token': self.access_token}
response_text = self.do_get(self.USER_INFO_URL, params)
user_data = json.loads(response_text)
# 创建OAuth用户对象
oauth_user = OAuthUser()
oauth_user.nickname = user_data.get('screen_name', '')
oauth_user.picture = user_data.get('profile_image_url', '')
oauth_user.token = self.access_token
oauth_user.type = 'weibo'
oauth_user.email = user_data.get('email', '')
oauth_user.metadata = json.dumps(user_data)
return oauth_user
class GitHubOauthManager(BaseOauthManager):
"""GitHub OAuth认证管理器"""
AUTH_URL = "https://github.com/login/oauth/authorize"
TOKEN_URL = "https://github.com/login/oauth/access_token"
USER_INFO_URL = "https://api.github.com/user"
ICON_NAME = "github"
def get_authorization_url(self, next_url='/'):
"""获取GitHub授权URL"""
if not self.get_config():
return ""
params = {
'client_id': self.client_id,
'scope': 'user:email',
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
def get_access_token_by_code(self, code):
"""通过授权码获取GitHub访问令牌"""
params = {
'client_id': self.client_id,
'client_secret': self.client_secret,
'code': code,
}
headers = {'Accept': 'application/json'}
response_text = self.do_post(self.TOKEN_URL, params, headers)
token_data = json.loads(response_text)
if 'access_token' in token_data:
self.access_token = token_data['access_token']
return self.access_token, token_data
else:
raise OAuthAccessTokenException(token_data)
def get_oauth_userinfo(self):
"""获取GitHub用户信息"""
if not self.is_access_token_set:
raise ValueError("Access token not set")
headers = {'Authorization': f'token {self.access_token}'}
response_text = self.do_get(self.USER_INFO_URL, headers=headers)
user_data = json.loads(response_text)
oauth_user = OAuthUser()
oauth_user.nickname = user_data.get('login', '')
oauth_user.picture = user_data.get('avatar_url', '')
oauth_user.token = self.access_token
oauth_user.type = 'github'
oauth_user.email = user_data.get('email', '')
oauth_user.metadata = json.dumps(user_data)
return oauth_user
# OAuth异常类
class OAuthException(Exception):
"""OAuth异常基类"""
pass
class OAuthAccessTokenException(OAuthException):
"""访问令牌获取异常"""
pass
class OAuthUserInfoException(OAuthException):
"""用户信息获取异常"""
pass
# 工具函数
def get_oauth_apps():
"""
获取所有可用的OAuth应用
Returns:
list: OAuth管理器实例列表
"""
config_types = OAuthConfig.objects.filter(
is_enable=True
).values_list('type', flat=True)
applications = BaseOauthManager.__subclasses__()
apps = [
app() for app in applications
if app().ICON_NAME.lower() in config_types
]
return apps
def get_manager_by_type(oauth_type):
"""
根据类型获取OAuth管理器
Args:
oauth_type: OAuth类型weibogithub等
Returns:
BaseOauthManager: OAuth管理器实例
"""
applications = get_oauth_apps()
if applications:
finds = list(filter(
lambda x: x.ICON_NAME.lower() == oauth_type.lower(),
applications
))
return finds[0] if finds else None
return None
# 视图函数
def oauth_login(request):
"""
OAuth登录入口
Args:
request: HTTP请求对象
Returns:
HttpResponseRedirect: 重定向到授权页面或首页
"""
oauth_type = request.GET.get('type') # 修复:避免使用内置函数名
if not oauth_type:
return HttpResponseRedirect('/')
manager = get_manager_by_type(oauth_type)
if not manager:
return HttpResponseRedirect('/')
next_url = request.GET.get('next_url', '/')
authorize_url = manager.get_authorization_url(next_url)
return HttpResponseRedirect(authorize_url)
def oauth_authorize(request):
"""
OAuth授权回调处理
Args:
request: HTTP请求对象
Returns:
HttpResponse: 授权结果页面或重定向
"""
oauth_type = request.GET.get('type') # 修复:避免使用内置函数名
code = request.GET.get('code')
next_url = request.GET.get('next_url', '/')
if not oauth_type or not code:
return HttpResponseRedirect('/')
try:
manager = get_manager_by_type(oauth_type)
if not manager:
return HttpResponseRedirect('/')
# 获取访问令牌
manager.get_access_token_by_code(code)
# 获取用户信息
oauth_user = manager.get_oauth_userinfo()
# 处理用户登录或绑定
user = process_oauth_user(oauth_user, request)
if user:
auth.login(request, user)
return HttpResponseRedirect(next_url)
else:
# 转到绑定页面
return redirect_to_bind_page(oauth_user, request)
except OAuthException as e:
logger.error(f"OAuth authorization failed: {e}")
return render(request, 'oauth/error.html', {
'error_message': _('OAuth authentication failed')
})
def process_oauth_user(oauth_user, request):
"""
处理OAuth用户信息
Args:
oauth_user: OAuth用户对象
request: HTTP请求对象
Returns:
User: 认证用户对象或None
"""
try:
# 查找已存在的OAuth用户
existing_oauth_user = OAuthUser.objects.filter(
type=oauth_user.type,
openid=oauth_user.openid
).first()
if existing_oauth_user:
# 已绑定用户,直接登录
if existing_oauth_user.author:
return existing_oauth_user.author
else:
# 未绑定用户,转到绑定页面
return None
else:
# 新用户保存OAuth信息并转到绑定页面
oauth_user.save()
return None
except Exception as e:
logger.error(f"Process OAuth user failed: {e}")
return None
def redirect_to_bind_page(oauth_user, request):
"""
重定向到用户绑定页面
Args:
oauth_user: OAuth用户对象
request: HTTP请求对象
Returns:
HttpResponseRedirect: 重定向到绑定页面
"""
# 生成绑定URL
bind_url = reverse('oauth:bind') + f'?oauth_id={oauth_user.id}'
return HttpResponseRedirect(bind_url)
Loading…
Cancel
Save