diff --git a/doc/开源软件的质量分析报告文档.docx b/doc/开源软件的质量分析报告文档.docx new file mode 100644 index 0000000..d941537 Binary files /dev/null and b/doc/开源软件的质量分析报告文档.docx differ diff --git a/doc/编码规范.docx b/doc/编码规范.docx new file mode 100644 index 0000000..15e67b5 Binary files /dev/null and b/doc/编码规范.docx differ diff --git a/src/DjangoBlog-master/accounts/apps.py b/src/DjangoBlog-master/accounts/apps.py index 9b3fc5a..89b9e88 100644 --- a/src/DjangoBlog-master/accounts/apps.py +++ b/src/DjangoBlog-master/accounts/apps.py @@ -2,4 +2,27 @@ from django.apps import AppConfig class AccountsConfig(AppConfig): + """ + 账户应用的配置类 + Django应用配置类,用于配置accounts应用的元数据和行为 + 继承自Django的AppConfig基类 + """ + + # 应用的Python路径,Django使用这个属性来识别应用 + # 这应该与应用的目录名一致 name = 'accounts' + + # 其他常用但未在此定义的配置选项包括: + # - verbose_name: 应用的易读名称(用于管理后台显示) + # - default_auto_field: 默认的主键字段类型 + # - label: 应用的简短标签(用于替代name) + # - path: 应用的文件系统路径 + + # 示例:如果需要配置verbose_name,可以这样添加: + # verbose_name = '用户账户管理' + + # 示例:如果需要自定义ready方法,可以这样添加: + # def ready(self): + # # 应用启动时执行的代码 + # # 通常用于信号注册等初始化操作 + # import accounts.signals diff --git a/src/DjangoBlog-master/accounts/forms.py b/src/DjangoBlog-master/accounts/forms.py index fce4137..50f8cd1 100644 --- a/src/DjangoBlog-master/accounts/forms.py +++ b/src/DjangoBlog-master/accounts/forms.py @@ -9,90 +9,116 @@ from .models import BlogUser class LoginForm(AuthenticationForm): + """自定义登录表单,继承自Django的AuthenticationForm""" + def __init__(self, *args, **kwargs): + """初始化方法,设置表单字段的widget属性""" super(LoginForm, self).__init__(*args, **kwargs) + # 设置用户名字段的输入框属性 self.fields['username'].widget = widgets.TextInput( attrs={'placeholder': "username", "class": "form-control"}) + # 设置密码字段的输入框属性 self.fields['password'].widget = widgets.PasswordInput( attrs={'placeholder': "password", "class": "form-control"}) class RegisterForm(UserCreationForm): + """自定义用户注册表单,继承自Django的UserCreationForm""" + def __init__(self, *args, **kwargs): + """初始化方法,设置所有表单字段的widget属性""" super(RegisterForm, self).__init__(*args, **kwargs) + # 设置用户名字段的输入框属性 self.fields['username'].widget = widgets.TextInput( attrs={'placeholder': "username", "class": "form-control"}) + # 设置邮箱字段的输入框属性 self.fields['email'].widget = widgets.EmailInput( attrs={'placeholder': "email", "class": "form-control"}) + # 设置密码字段的输入框属性 self.fields['password1'].widget = widgets.PasswordInput( attrs={'placeholder': "password", "class": "form-control"}) + # 设置确认密码字段的输入框属性 self.fields['password2'].widget = widgets.PasswordInput( attrs={'placeholder': "repeat password", "class": "form-control"}) def clean_email(self): + """邮箱字段验证方法""" email = self.cleaned_data['email'] + # 检查邮箱是否已存在 if get_user_model().objects.filter(email=email).exists(): raise ValidationError(_("email already exists")) return email class Meta: - model = get_user_model() - fields = ("username", "email") + """表单的元数据配置""" + model = get_user_model() # 使用当前激活的用户模型 + fields = ("username", "email") # 表单包含的字段 class ForgetPasswordForm(forms.Form): + """忘记密码重置表单""" + + # 新密码字段 new_password1 = forms.CharField( - label=_("New password"), - widget=forms.PasswordInput( + label=_("New password"), # 字段标签 + widget=forms.PasswordInput( # 密码输入框 attrs={ - "class": "form-control", - 'placeholder': _("New password") + "class": "form-control", # CSS类 + 'placeholder': _("New password") # 占位符文本 } ), ) + # 确认新密码字段 new_password2 = forms.CharField( - label="确认密码", - widget=forms.PasswordInput( + label="确认密码", # 字段标签 + widget=forms.PasswordInput( # 密码输入框 attrs={ - "class": "form-control", - 'placeholder': _("Confirm password") + "class": "form-control", # CSS类 + 'placeholder': _("Confirm password") # 占位符文本 } ), ) + # 邮箱字段 email = forms.EmailField( - label='邮箱', - widget=forms.TextInput( + label='邮箱', # 字段标签 + widget=forms.TextInput( # 文本输入框 attrs={ - 'class': 'form-control', - 'placeholder': _("Email") + 'class': 'form-control', # CSS类 + 'placeholder': _("Email") # 占位符文本 } ), ) + # 验证码字段 code = forms.CharField( - label=_('Code'), - widget=forms.TextInput( + label=_('Code'), # 字段标签 + widget=forms.TextInput( # 文本输入框 attrs={ - 'class': 'form-control', - 'placeholder': _("Code") + 'class': 'form-control', # CSS类 + 'placeholder': _("Code") # 占位符文本 } ), ) def clean_new_password2(self): + """确认密码字段验证方法""" password1 = self.data.get("new_password1") password2 = self.data.get("new_password2") + # 检查两次输入的密码是否一致 if password1 and password2 and password1 != password2: raise ValidationError(_("passwords do not match")) + # 使用Django的密码验证器验证密码强度 password_validation.validate_password(password2) return password2 def clean_email(self): + """邮箱字段验证方法""" user_email = self.cleaned_data.get("email") + # 检查邮箱对应的用户是否存在 if not BlogUser.objects.filter( email=user_email ).exists(): @@ -101,7 +127,9 @@ class ForgetPasswordForm(forms.Form): return user_email def clean_code(self): + """验证码字段验证方法""" code = self.cleaned_data.get("code") + # 使用utils模块验证邮箱和验证码是否匹配 error = utils.verify( email=self.cleaned_data.get("email"), code=code, @@ -112,6 +140,8 @@ class ForgetPasswordForm(forms.Form): class ForgetPasswordCodeForm(forms.Form): + """忘记密码验证码请求表单(仅包含邮箱字段)""" + email = forms.EmailField( - label=_('Email'), - ) + label=_('Email'), # 邮箱字段标签 + ) \ No newline at end of file diff --git a/src/DjangoBlog-master/accounts/migrations/0001_initial.py b/src/DjangoBlog-master/accounts/migrations/0001_initial.py index d2fbcab..be13df7 100644 --- a/src/DjangoBlog-master/accounts/migrations/0001_initial.py +++ b/src/DjangoBlog-master/accounts/migrations/0001_initial.py @@ -7,43 +7,89 @@ import django.utils.timezone class Migration(migrations.Migration): + """ + Django数据库迁移文件 + 用于创建BlogUser模型的数据库表结构 + 这是一个初始迁移文件(initial migration) + """ + # 标记为初始迁移,Django使用这个标志来识别应用的第一个迁移 initial = True + # 依赖关系:此迁移依赖于auth应用的特定迁移 + # 确保在创建用户表之前,权限相关的表已经存在 dependencies = [ ('auth', '0012_alter_user_first_name_max_length'), ] + # 迁移操作列表:定义要执行的具体数据库操作 operations = [ + # 创建BlogUser模型的数据库表 migrations.CreateModel( - name='BlogUser', + name='BlogUser', # 模型名称 fields=[ + # 主键字段:使用BigAutoField作为自增主键 ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + + # 密码字段:存储加密后的密码,最大长度128字符 ('password', models.CharField(max_length=128, verbose_name='password')), + + # 最后登录时间:记录用户最后一次登录的时间 ('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')), + + # 超级用户标志:标记用户是否拥有所有权限 ('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')), + + # 用户名字段:唯一标识用户,有严格的字符限制和验证 ('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')), + + # 名字字段:用户的名,可选 ('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')), + + # 姓氏字段:用户的姓,可选 ('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')), + + # 邮箱字段:用户的邮箱地址,可选 ('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')), + + # 员工状态:标记用户是否可以登录管理后台 ('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')), + + # 活跃状态:标记用户账户是否激活 ('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')), + + # 加入日期:用户注册的时间,默认为当前时间 ('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')), + + # 昵称字段:自定义字段,用户显示名称,可选 ('nickname', models.CharField(blank=True, max_length=100, verbose_name='昵称')), + + # 创建时间:自定义字段,记录用户创建时间 ('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')), + + # 最后修改时间:自定义字段,记录用户信息最后修改时间 ('last_mod_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='修改时间')), + + # 来源字段:自定义字段,记录用户创建来源(如注册渠道) ('source', models.CharField(blank=True, max_length=100, verbose_name='创建来源')), + + # 组关联:多对多关系,用户所属的权限组 ('groups', models.ManyToManyField(blank=True, help_text='The groups this user belongs to. A user will get all permissions granted to each of their groups.', related_name='user_set', related_query_name='user', to='auth.group', verbose_name='groups')), + + # 用户权限:多对多关系,用户特有的权限 ('user_permissions', models.ManyToManyField(blank=True, help_text='Specific permissions for this user.', related_name='user_set', related_query_name='user', to='auth.permission', verbose_name='user permissions')), ], + # 模型元选项 options={ - 'verbose_name': '用户', - 'verbose_name_plural': '用户', - 'ordering': ['-id'], - 'get_latest_by': 'id', + 'verbose_name': '用户', # 单数名称(用于管理后台) + 'verbose_name_plural': '用户', # 复数名称(用于管理后台) + 'ordering': ['-id'], # 默认排序:按ID降序(最新的在前) + 'get_latest_by': 'id', # 指定获取最新记录时使用的字段 }, + # 模型管理器 managers=[ + # 使用Django默认的UserManager来管理用户对象 ('objects', django.contrib.auth.models.UserManager()), ], ), - ] + ] \ No newline at end of file diff --git a/src/DjangoBlog-master/accounts/migrations/0002_alter_bloguser_options_remove_bloguser_created_time_and_more.py b/src/DjangoBlog-master/accounts/migrations/0002_alter_bloguser_options_remove_bloguser_created_time_and_more.py index 1a9f509..0cb6fc9 100644 --- a/src/DjangoBlog-master/accounts/migrations/0002_alter_bloguser_options_remove_bloguser_created_time_and_more.py +++ b/src/DjangoBlog-master/accounts/migrations/0002_alter_bloguser_options_remove_bloguser_created_time_and_more.py @@ -5,42 +5,82 @@ import django.utils.timezone class Migration(migrations.Migration): + """ + Django数据库迁移文件 + 用于修改BlogUser模型的结构和字段定义 + 这是一个数据模型重构迁移,主要更新字段命名和国际化 + """ + # 依赖关系:此迁移依赖于accounts应用的初始迁移 + # 确保在修改表结构之前,初始表已经创建 dependencies = [ - ('accounts', '0001_initial'), + ('accounts', '0001_initial'), # 依赖于accounts应用的第一个迁移文件 ] + # 迁移操作列表:定义要执行的具体数据库结构修改 operations = [ + # 修改模型的元选项(主要是国际化显示名称) migrations.AlterModelOptions( - name='bloguser', - options={'get_latest_by': 'id', 'ordering': ['-id'], 'verbose_name': 'user', 'verbose_name_plural': 'user'}, + name='bloguser', # 目标模型名称 + options={ + 'get_latest_by': 'id', # 保持按id获取最新记录 + 'ordering': ['-id'], # 保持按id降序排列 + 'verbose_name': 'user', # 更新单数名称为英文(国际化准备) + 'verbose_name_plural': 'user' # 更新复数名称为英文(国际化准备) + }, ), + + # 删除旧的创建时间字段(为后续添加新字段做准备) migrations.RemoveField( - model_name='bloguser', - name='created_time', + model_name='bloguser', # 目标模型 + name='created_time', # 要删除的字段名 ), + + # 删除旧的最后修改时间字段 migrations.RemoveField( - model_name='bloguser', - name='last_mod_time', + model_name='bloguser', # 目标模型 + name='last_mod_time', # 要删除的字段名 ), + + # 添加新的创建时间字段(使用国际化的字段名) migrations.AddField( - model_name='bloguser', - name='creation_time', - field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='creation time'), + model_name='bloguser', # 目标模型 + name='creation_time', # 新字段名 + field=models.DateTimeField( + default=django.utils.timezone.now, # 默认值为当前时间 + verbose_name='creation time' # 英文显示名称(国际化) + ), ), + + # 添加新的最后修改时间字段(使用国际化的字段名) migrations.AddField( - model_name='bloguser', - name='last_modify_time', - field=models.DateTimeField(default=django.utils.timezone.now, verbose_name='last modify time'), + model_name='bloguser', # 目标模型 + name='last_modify_time', # 新字段名 + field=models.DateTimeField( + default=django.utils.timezone.now, # 默认值为当前时间 + verbose_name='last modify time' # 英文显示名称(国际化) + ), ), + + # 修改昵称字段的显示名称(国际化) migrations.AlterField( - model_name='bloguser', - name='nickname', - field=models.CharField(blank=True, max_length=100, verbose_name='nick name'), + model_name='bloguser', # 目标模型 + name='nickname', # 要修改的字段 + field=models.CharField( + blank=True, # 保持允许为空 + max_length=100, # 保持最大长度100 + verbose_name='nick name' # 更新为英文显示名称 + ), ), + + # 修改来源字段的显示名称(国际化) migrations.AlterField( - model_name='bloguser', - name='source', - field=models.CharField(blank=True, max_length=100, verbose_name='create source'), + model_name='bloguser', # 目标模型 + name='source', # 要修改的字段 + field=models.CharField( + blank=True, # 保持允许为空 + max_length=100, # 保持最大长度100 + verbose_name='create source' # 更新为英文显示名称 + ), ), - ] + ] \ No newline at end of file diff --git a/src/DjangoBlog-master/accounts/models.py b/src/DjangoBlog-master/accounts/models.py index 3baddbb..44ae74b 100644 --- a/src/DjangoBlog-master/accounts/models.py +++ b/src/DjangoBlog-master/accounts/models.py @@ -9,27 +9,61 @@ from djangoblog.utils import get_current_site # Create your models here. class BlogUser(AbstractUser): + """ + 自定义用户模型,继承自Django的AbstractUser基类 + 扩展了博客系统的用户功能 + """ + + # 昵称字段,允许为空 nickname = models.CharField(_('nick name'), max_length=100, blank=True) + + # 用户创建时间,默认为当前时间 creation_time = models.DateTimeField(_('creation time'), default=now) + + # 最后修改时间,默认为当前时间 last_modify_time = models.DateTimeField(_('last modify time'), default=now) + + # 用户创建来源(如:网站注册、第三方登录等),允许为空 source = models.CharField(_('create source'), max_length=100, blank=True) def get_absolute_url(self): + """ + 获取用户的绝对URL,用于Django的通用视图和模板中 + 返回用户详情页的URL + """ return reverse( 'blog:author_detail', kwargs={ 'author_name': self.username}) def __str__(self): + """ + 定义模型的字符串表示形式 + 在管理后台和其他显示对象的地方使用 + 这里使用邮箱作为标识 + """ return self.email def get_full_url(self): - site = get_current_site().domain + """ + 获取用户的完整URL(包含域名) + 用于生成完整的用户主页链接 + """ + site = get_current_site().domain # 获取当前站点域名 url = "https://{site}{path}".format(site=site, path=self.get_absolute_url()) return url class Meta: + """模型的元数据配置""" + + # 默认按ID降序排列(最新的用户排在前面) ordering = ['-id'] + + # 在管理后台中显示的单数名称 verbose_name = _('user') + + # 在管理后台中显示的复数名称 verbose_name_plural = verbose_name - get_latest_by = 'id' + + # 指定获取最新记录时使用的字段 + get_latest_by = 'id' \ No newline at end of file diff --git a/src/DjangoBlog-master/accounts/tests.py b/src/DjangoBlog-master/accounts/tests.py index 6893411..6df4beb 100644 --- a/src/DjangoBlog-master/accounts/tests.py +++ b/src/DjangoBlog-master/accounts/tests.py @@ -13,172 +13,215 @@ from . import utils class AccountTest(TestCase): def setUp(self): - self.client = Client() - self.factory = RequestFactory() + """测试用例初始化方法,每个测试方法执行前都会运行""" + self.client = Client() # 创建测试客户端,用于模拟HTTP请求 + self.factory = RequestFactory() # 创建请求工厂,用于构建请求对象 + # 创建测试用户 self.blog_user = BlogUser.objects.create_user( username="test", email="admin@admin.com", password="12345678" ) - self.new_test = "xxx123--=" + self.new_test = "xxx123--=" # 测试用的新密码 def test_validate_account(self): - site = get_current_site().domain + """测试账户验证功能,包括登录、管理员权限和文章管理""" + site = get_current_site().domain # 获取当前站点域名 + # 创建超级用户 user = BlogUser.objects.create_superuser( email="liangliangyy1@gmail.com", username="liangliangyy1", password="qwer!@#$ggg") testuser = BlogUser.objects.get(username='liangliangyy1') + # 测试登录功能 loginresult = self.client.login( username='liangliangyy1', password='qwer!@#$ggg') - self.assertEqual(loginresult, True) + self.assertEqual(loginresult, True) # 断言登录成功 + + # 测试管理员页面访问 response = self.client.get('/admin/') - self.assertEqual(response.status_code, 200) + self.assertEqual(response.status_code, 200) # 断言可以访问管理员页面 + # 创建测试分类 category = Category() category.name = "categoryaaa" category.creation_time = timezone.now() category.last_modify_time = timezone.now() category.save() + # 创建测试文章 article = Article() article.title = "nicetitleaaa" article.body = "nicecontentaaa" article.author = user article.category = category - article.type = 'a' - article.status = 'p' + article.type = 'a' # 文章类型 + article.status = 'p' # 发布状态 article.save() + # 测试文章管理页面访问 response = self.client.get(article.get_admin_url()) - self.assertEqual(response.status_code, 200) + self.assertEqual(response.status_code, 200) # 断言可以访问文章管理页面 def test_validate_register(self): + """测试用户注册流程,包括注册、邮箱验证、登录和权限管理""" + # 验证注册前用户不存在 self.assertEquals( 0, len( BlogUser.objects.filter( email='user123@user.com'))) + + # 发送注册请求 response = self.client.post(reverse('account:register'), { 'username': 'user1233', 'email': 'user123@user.com', 'password1': 'password123!q@wE#R$T', 'password2': 'password123!q@wE#R$T', }) + + # 验证注册后用户已创建 self.assertEquals( 1, len( BlogUser.objects.filter( email='user123@user.com'))) user = BlogUser.objects.filter(email='user123@user.com')[0] + + # 生成邮箱验证签名 sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) path = reverse('accounts:result') url = '{path}?type=validation&id={id}&sign={sign}'.format( path=path, id=user.id, sign=sign) + + # 测试验证页面访问 response = self.client.get(url) self.assertEqual(response.status_code, 200) + # 测试登录功能 self.client.login(username='user1233', password='password123!q@wE#R$T') user = BlogUser.objects.filter(email='user123@user.com')[0] + + # 提升用户权限为管理员 user.is_superuser = True user.is_staff = True user.save() - delete_sidebar_cache() + delete_sidebar_cache() # 清理侧边栏缓存 + + # 创建测试分类 category = Category() category.name = "categoryaaa" category.creation_time = timezone.now() category.last_modify_time = timezone.now() category.save() + # 创建测试文章 article = Article() article.category = category article.title = "nicetitle333" article.body = "nicecontentttt" article.author = user - article.type = 'a' article.status = 'p' article.save() + # 测试文章管理页面访问 response = self.client.get(article.get_admin_url()) self.assertEqual(response.status_code, 200) + # 测试登出功能 response = self.client.get(reverse('account:logout')) - self.assertIn(response.status_code, [301, 302, 200]) + self.assertIn(response.status_code, [301, 302, 200]) # 登出通常会有重定向 + # 登出后测试文章管理页面访问(应该被拒绝) response = self.client.get(article.get_admin_url()) - self.assertIn(response.status_code, [301, 302, 200]) + self.assertIn(response.status_code, [301, 302, 200]) # 应该重定向到登录页 + # 重新登录测试(使用错误密码) response = self.client.post(reverse('account:login'), { 'username': 'user1233', - 'password': 'password123' + 'password': 'password123' # 错误的密码 }) self.assertIn(response.status_code, [301, 302, 200]) + # 登录后再次测试文章管理页面访问 response = self.client.get(article.get_admin_url()) self.assertIn(response.status_code, [301, 302, 200]) def test_verify_email_code(self): + """测试邮箱验证码功能""" to_email = "admin@admin.com" - code = generate_code() + code = generate_code() # 生成验证码 + + # 设置验证码到缓存 utils.set_code(to_email, code) + # 发送验证邮件 utils.send_verify_email(to_email, code) + # 测试正确的验证码验证 err = utils.verify("admin@admin.com", code) - self.assertEqual(err, None) + self.assertEqual(err, None) # 应该没有错误 + # 测试错误的邮箱验证 err = utils.verify("admin@123.com", code) - self.assertEqual(type(err), str) + self.assertEqual(type(err), str) # 应该返回错误信息字符串 def test_forget_password_email_code_success(self): + """测试成功发送忘记密码验证码""" resp = self.client.post( path=reverse("account:forget_password_code"), data=dict(email="admin@admin.com") ) self.assertEqual(resp.status_code, 200) - self.assertEqual(resp.content.decode("utf-8"), "ok") + self.assertEqual(resp.content.decode("utf-8"), "ok") # 断言返回成功消息 def test_forget_password_email_code_fail(self): + """测试忘记密码验证码发送失败的情况""" + # 测试空邮箱参数 resp = self.client.post( path=reverse("account:forget_password_code"), data=dict() ) self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱") + # 测试无效邮箱格式 resp = self.client.post( path=reverse("account:forget_password_code"), - data=dict(email="admin@com") + data=dict(email="admin@com") # 无效的邮箱格式 ) self.assertEqual(resp.content.decode("utf-8"), "错误的邮箱") def test_forget_password_email_success(self): + """测试成功重置密码""" code = generate_code() - utils.set_code(self.blog_user.email, code) + utils.set_code(self.blog_user.email, code) # 设置验证码到缓存 data = dict( new_password1=self.new_test, new_password2=self.new_test, email=self.blog_user.email, code=code, ) + # 提交密码重置请求 resp = self.client.post( path=reverse("account:forget_password"), data=data ) - self.assertEqual(resp.status_code, 302) + self.assertEqual(resp.status_code, 302) # 重置成功应该重定向 # 验证用户密码是否修改成功 blog_user = BlogUser.objects.filter( email=self.blog_user.email, ).first() # type: BlogUser - self.assertNotEqual(blog_user, None) - self.assertEqual(blog_user.check_password(data["new_password1"]), True) + self.assertNotEqual(blog_user, None) # 用户应该存在 + self.assertEqual(blog_user.check_password(data["new_password1"]), True) # 密码应该匹配 def test_forget_password_email_not_user(self): + """测试重置密码时用户不存在的情况""" data = dict( new_password1=self.new_test, new_password2=self.new_test, - email="123@123.com", + email="123@123.com", # 不存在的邮箱 code="123456", ) resp = self.client.post( @@ -186,22 +229,21 @@ class AccountTest(TestCase): data=data ) - self.assertEqual(resp.status_code, 200) - + self.assertEqual(resp.status_code, 200) # 应该返回错误页面而不是重定向 def test_forget_password_email_code_error(self): + """测试重置密码时验证码错误的情况""" code = generate_code() - utils.set_code(self.blog_user.email, code) + utils.set_code(self.blog_user.email, code) # 设置正确的验证码 data = dict( new_password1=self.new_test, new_password2=self.new_test, email=self.blog_user.email, - code="111111", + code="111111", # 错误的验证码 ) resp = self.client.post( path=reverse("account:forget_password"), data=data ) - self.assertEqual(resp.status_code, 200) - + self.assertEqual(resp.status_code, 200) # 应该返回错误页面而不是重定向 \ No newline at end of file diff --git a/src/DjangoBlog-master/accounts/urls.py b/src/DjangoBlog-master/accounts/urls.py index 107a801..4d273a8 100644 --- a/src/DjangoBlog-master/accounts/urls.py +++ b/src/DjangoBlog-master/accounts/urls.py @@ -4,25 +4,46 @@ from django.urls import re_path from . import views from .forms import LoginForm +# 定义应用的命名空间,用于URL反向解析 +# 在模板中使用如:{% url 'accounts:login' %} app_name = "accounts" -urlpatterns = [re_path(r'^login/$', - views.LoginView.as_view(success_url='/'), - name='login', - kwargs={'authentication_form': LoginForm}), - re_path(r'^register/$', - views.RegisterView.as_view(success_url="/"), - name='register'), - re_path(r'^logout/$', - views.LogoutView.as_view(), - name='logout'), - path(r'account/result.html', - views.account_result, - name='result'), - re_path(r'^forget_password/$', - views.ForgetPasswordView.as_view(), - name='forget_password'), - re_path(r'^forget_password_code/$', - views.ForgetPasswordEmailCode.as_view(), - name='forget_password_code'), - ] +# URL配置列表,定义所有用户账户相关的路由 +urlpatterns = [ + # 登录路由 - 使用正则表达式匹配以login/结尾的URL + re_path(r'^login/$', + # 使用基于类的视图,登录成功后重定向到首页 + views.LoginView.as_view(success_url='/'), + name='login', # URL名称,用于反向解析 + kwargs={'authentication_form': LoginForm}), # 传递自定义登录表单类 + + # 注册路由 - 使用正则表达式匹配以register/结尾的URL + re_path(r'^register/$', + # 注册视图,注册成功后重定向到首页 + views.RegisterView.as_view(success_url="/"), + name='register'), # URL名称 + + # 登出路由 - 使用正则表达式匹配以logout/结尾的URL + re_path(r'^logout/$', + # 登出视图,处理用户退出登录 + views.LogoutView.as_view(), + name='logout'), # URL名称 + + # 账户操作结果页面 - 使用path匹配精确路径 + path(r'account/result.html', + # 使用函数视图显示账户操作结果(如注册成功、密码重置成功等) + views.account_result, + name='result'), # URL名称 + + # 忘记密码页面 - 使用正则表达式匹配以forget_password/结尾的URL + re_path(r'^forget_password/$', + # 忘记密码视图,显示密码重置页面 + views.ForgetPasswordView.as_view(), + name='forget_password'), # URL名称 + + # 忘记密码验证码接口 - 使用正则表达式匹配以forget_password_code/结尾的URL + re_path(r'^forget_password_code/$', + # 处理忘记密码的邮箱验证码发送和验证 + views.ForgetPasswordEmailCode.as_view(), + name='forget_password_code'), # URL名称 +] \ No newline at end of file diff --git a/src/DjangoBlog-master/accounts/user_login_backend.py b/src/DjangoBlog-master/accounts/user_login_backend.py index 73cdca1..d339c68 100644 --- a/src/DjangoBlog-master/accounts/user_login_backend.py +++ b/src/DjangoBlog-master/accounts/user_login_backend.py @@ -4,23 +4,58 @@ from django.contrib.auth.backends import ModelBackend class EmailOrUsernameModelBackend(ModelBackend): """ - 允许使用用户名或邮箱登录 + 自定义认证后端,允许用户使用用户名或邮箱登录 + Extends ModelBackend to allow authentication using either username or email. """ def authenticate(self, request, username=None, password=None, **kwargs): + """ + 用户认证方法 + Authenticate a user based on username/email and password. + + Args: + request: HTTP请求对象 + username: 用户输入的用户名或邮箱 + password: 用户输入的密码 + **kwargs: 其他参数 + + Returns: + User: 认证成功的用户对象 + None: 认证失败 + """ + # 判断输入的是邮箱还是用户名 if '@' in username: + # 如果包含@符号,按邮箱处理 kwargs = {'email': username} else: + # 否则按用户名处理 kwargs = {'username': username} + try: + # 根据用户名或邮箱查找用户 user = get_user_model().objects.get(**kwargs) + # 验证密码是否正确 if user.check_password(password): return user except get_user_model().DoesNotExist: + # 用户不存在时返回None return None - def get_user(self, username): + def get_user(self, user_id): + """ + 根据用户ID获取用户对象 + Get a user by their primary key. + + Args: + user_id: 用户ID + + Returns: + User: 用户对象 + None: 用户不存在 + """ try: - return get_user_model().objects.get(pk=username) + # 通过主键查找用户 + return get_user_model().objects.get(pk=user_id) except get_user_model().DoesNotExist: - return None + # 用户不存在时返回None + return None \ No newline at end of file diff --git a/src/DjangoBlog-master/accounts/utils.py b/src/DjangoBlog-master/accounts/utils.py index 4b94bdf..3d434df 100644 --- a/src/DjangoBlog-master/accounts/utils.py +++ b/src/DjangoBlog-master/accounts/utils.py @@ -7,43 +7,58 @@ from django.utils.translation import gettext_lazy as _ from djangoblog.utils import send_email +# 验证码的生存时间(Time To Live),设置为5分钟 _code_ttl = timedelta(minutes=5) def send_verify_email(to_mail: str, code: str, subject: str = _("Verify Email")): - """发送重设密码验证码 + """发送验证邮件 Args: - to_mail: 接受邮箱 - subject: 邮件主题 - code: 验证码 + to_mail: 接收邮箱地址 + subject: 邮件主题,默认为"Verify Email" + code: 需要发送的验证码 """ + # 生成邮件HTML内容,包含验证码信息,并支持国际化 html_content = _( "You are resetting the password, the verification code is:%(code)s, valid within 5 minutes, please keep it " "properly") % {'code': code} + # 调用邮件发送工具函数发送邮件 send_email([to_mail], subject, html_content) def verify(email: str, code: str) -> typing.Optional[str]: - """验证code是否有效 + """验证邮箱和验证码是否匹配 Args: - email: 请求邮箱 - code: 验证码 + email: 需要验证的邮箱地址 + code: 用户输入的验证码 Return: - 如果有错误就返回错误str - Node: - 这里的错误处理不太合理,应该采用raise抛出 - 否测调用方也需要对error进行处理 + 如果验证失败返回错误信息字符串,验证成功返回None + Note: + 当前错误处理方式不够合理,建议改为抛出异常的方式, + 这样调用方可以通过try-except来处理错误,而不是检查返回值 """ + # 从缓存中获取该邮箱对应的验证码 cache_code = get_code(email) + # 比较缓存中的验证码和用户输入的验证码 if cache_code != code: return gettext("Verification code error") + # 验证成功返回None def set_code(email: str, code: str): - """设置code""" + """将验证码存储到缓存中 + Args: + email: 作为缓存键的邮箱地址 + code: 需要存储的验证码 + """ cache.set(email, code, _code_ttl.seconds) def get_code(email: str) -> typing.Optional[str]: - """获取code""" - return cache.get(email) + """从缓存中获取验证码 + Args: + email: 作为缓存键的邮箱地址 + Return: + 返回缓存中的验证码,如果不存在或已过期则返回None + """ + return cache.get(email) \ No newline at end of file diff --git a/src/DjangoBlog-master/accounts/views.py b/src/DjangoBlog-master/accounts/views.py index ae67aec..9933d75 100644 --- a/src/DjangoBlog-master/accounts/views.py +++ b/src/DjangoBlog-master/accounts/views.py @@ -29,31 +29,47 @@ from .models import BlogUser logger = logging.getLogger(__name__) -# Create your views here. - class RegisterView(FormView): + """ + 用户注册视图 + 处理用户注册流程,包括表单验证、用户创建和发送验证邮件 + """ form_class = RegisterForm template_name = 'account/registration_form.html' @method_decorator(csrf_protect) def dispatch(self, *args, **kwargs): + """确保视图受到CSRF保护""" return super(RegisterView, self).dispatch(*args, **kwargs) def form_valid(self, form): + """ + 处理有效的注册表单 + 创建非活跃用户,发送邮箱验证邮件 + """ if form.is_valid(): + # 创建用户但不立即保存到数据库 user = form.save(False) - user.is_active = False - user.source = 'Register' - user.save(True) + user.is_active = False # 邮箱验证前用户不可用 + user.source = 'Register' # 标记用户来源 + user.save(True) # 保存用户到数据库 + + # 获取当前站点信息 site = get_current_site().domain + + # 生成验证签名,用于验证链接的安全性 sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) + # 调试模式下使用本地地址 if settings.DEBUG: site = '127.0.0.1:8000' + + # 构建验证URL path = reverse('account:result') url = "http://{site}{path}?type=validation&id={id}&sign={sign}".format( site=site, path=path, id=user.id, sign=sign) + # 构建邮件内容 content = """

请点击下面链接验证您的邮箱

@@ -64,6 +80,8 @@ class RegisterView(FormView): 如果上面链接无法打开,请将此链接复制至浏览器。 {url} """.format(url=url) + + # 发送验证邮件 send_email( emailto=[ user.email, @@ -71,134 +89,195 @@ class RegisterView(FormView): title='验证您的电子邮箱', content=content) + # 重定向到结果页面 url = reverse('accounts:result') + \ '?type=register&id=' + str(user.id) return HttpResponseRedirect(url) else: + # 表单无效,重新渲染表单页面 return self.render_to_response({ 'form': form }) class LogoutView(RedirectView): - url = '/login/' + """ + 用户登出视图 + 处理用户登出操作并清理相关缓存 + """ + url = '/login/' # 登出后重定向的URL @method_decorator(never_cache) def dispatch(self, request, *args, **kwargs): + """确保登出页面不被缓存""" return super(LogoutView, self).dispatch(request, *args, **kwargs) def get(self, request, *args, **kwargs): - logout(request) - delete_sidebar_cache() + """处理GET请求的登出操作""" + logout(request) # 执行登出操作 + delete_sidebar_cache() # 清理侧边栏缓存 return super(LogoutView, self).get(request, *args, **kwargs) class LoginView(FormView): + """ + 用户登录视图 + 处理用户认证和登录会话管理 + """ form_class = LoginForm template_name = 'account/login.html' - success_url = '/' - redirect_field_name = REDIRECT_FIELD_NAME - login_ttl = 2626560 # 一个月的时间 + success_url = '/' # 登录成功后默认重定向的URL + redirect_field_name = REDIRECT_FIELD_NAME # 重定向字段名 + login_ttl = 2626560 # 一个月的时间(秒),用于"记住我"功能 - @method_decorator(sensitive_post_parameters('password')) - @method_decorator(csrf_protect) - @method_decorator(never_cache) + @method_decorator(sensitive_post_parameters('password')) # 保护密码参数 + @method_decorator(csrf_protect) # CSRF保护 + @method_decorator(never_cache) # 禁止缓存 def dispatch(self, request, *args, **kwargs): - + """应用装饰器到视图分发方法""" return super(LoginView, self).dispatch(request, *args, **kwargs) def get_context_data(self, **kwargs): + """向模板上下文添加重定向URL""" redirect_to = self.request.GET.get(self.redirect_field_name) if redirect_to is None: - redirect_to = '/' + redirect_to = '/' # 默认重定向到首页 kwargs['redirect_to'] = redirect_to return super(LoginView, self).get_context_data(**kwargs) def form_valid(self, form): + """处理有效的登录表单""" + # 使用Django的AuthenticationForm进行认证 form = AuthenticationForm(data=self.request.POST, request=self.request) if form.is_valid(): + # 认证成功,清理缓存并记录日志 delete_sidebar_cache() logger.info(self.redirect_field_name) + # 登录用户 auth.login(self.request, form.get_user()) + + # 处理"记住我"功能 if self.request.POST.get("remember"): self.request.session.set_expiry(self.login_ttl) + return super(LoginView, self).form_valid(form) - # return HttpResponseRedirect('/') else: + # 认证失败,重新显示表单 return self.render_to_response({ 'form': form }) def get_success_url(self): - + """获取登录成功后重定向的URL""" redirect_to = self.request.POST.get(self.redirect_field_name) + + # 验证重定向URL的安全性 if not url_has_allowed_host_and_scheme( url=redirect_to, allowed_hosts=[ self.request.get_host()]): - redirect_to = self.success_url + redirect_to = self.success_url # 不安全的URL使用默认URL + return redirect_to def account_result(request): - type = request.GET.get('type') - id = request.GET.get('id') - + """ + 账户操作结果页面 + 处理注册结果和邮箱验证 + """ + type = request.GET.get('type') # 操作类型:register或validation + id = request.GET.get('id') # 用户ID + + # 获取用户对象,如果不存在返回404 user = get_object_or_404(get_user_model(), id=id) logger.info(type) + + # 如果用户已激活,直接重定向到首页 if user.is_active: return HttpResponseRedirect('/') + + # 处理注册和验证操作 if type and type in ['register', 'validation']: if type == 'register': + # 注册成功页面 content = ''' 恭喜您注册成功,一封验证邮件已经发送到您的邮箱,请验证您的邮箱后登录本站。 ''' title = '注册成功' else: + # 邮箱验证处理 c_sign = get_sha256(get_sha256(settings.SECRET_KEY + str(user.id))) sign = request.GET.get('sign') + + # 验证签名安全性 if sign != c_sign: - return HttpResponseForbidden() + return HttpResponseForbidden() # 签名不匹配,禁止访问 + + # 激活用户账户 user.is_active = True user.save() + content = ''' 恭喜您已经成功的完成邮箱验证,您现在可以使用您的账号来登录本站。 ''' title = '验证成功' + + # 渲染结果页面 return render(request, 'account/result.html', { 'title': title, 'content': content }) else: + # 无效的操作类型,重定向到首页 return HttpResponseRedirect('/') class ForgetPasswordView(FormView): + """ + 忘记密码视图 + 处理密码重置请求 + """ form_class = ForgetPasswordForm template_name = 'account/forget_password.html' def form_valid(self, form): + """处理有效的密码重置表单""" if form.is_valid(): + # 根据邮箱查找用户 blog_user = BlogUser.objects.filter(email=form.cleaned_data.get("email")).get() + # 使用Django的密码哈希器设置新密码 blog_user.password = make_password(form.cleaned_data["new_password2"]) - blog_user.save() + blog_user.save() # 保存新密码 + + # 重定向到登录页面 return HttpResponseRedirect('/login/') else: + # 表单无效,重新显示表单 return self.render_to_response({'form': form}) class ForgetPasswordEmailCode(View): + """ + 发送忘记密码验证码视图 + 处理密码重置验证码的发送 + """ def post(self, request: HttpRequest): + """处理POST请求,发送密码重置验证码""" form = ForgetPasswordCodeForm(request.POST) + + # 验证表单数据 if not form.is_valid(): return HttpResponse("错误的邮箱") + to_email = form.cleaned_data["email"] + # 生成并发送验证码 code = generate_code() - utils.send_verify_email(to_email, code) - utils.set_code(to_email, code) + utils.send_verify_email(to_email, code) # 发送验证邮件 + utils.set_code(to_email, code) # 存储验证码(通常在缓存中) - return HttpResponse("ok") + return HttpResponse("ok") # 返回成功响应 \ No newline at end of file diff --git a/src/DjangoBlog-master/owntracks/admin.py b/src/DjangoBlog-master/owntracks/admin.py index 655b535..50ca1fc 100644 --- a/src/DjangoBlog-master/owntracks/admin.py +++ b/src/DjangoBlog-master/owntracks/admin.py @@ -1,7 +1,27 @@ +# 导入 Django 内置的 Admin 核心模块 +# django.contrib.admin 提供了完整的后台管理界面生成、数据CRUD、权限控制等功能 from django.contrib import admin # Register your models here. - +# 说明:该注释为 Django 自动生成,提示开发者在此处注册需要通过后台管理的模型 +# 注册方式:使用 admin.site.register(模型类, 自定义Admin类) 关联模型与管理配置 class OwnTrackLogsAdmin(admin.ModelAdmin): + """ + 自定义 Admin 配置类:继承自 Django 内置的 ModelAdmin + 作用:配置 OwnTrackLog 模型在后台管理界面的展示形式、操作权限、数据筛选等功能 + 若需扩展后台功能,可在此类中添加属性/方法(如列表显示字段、搜索框、过滤条件等) + """ + # pass 关键字:表示当前类暂未定义额外配置,完全使用 ModelAdmin 的默认行为 + # 默认效果: + # 1. 列表页显示模型的所有字段(id、tid、lat、lon、creation_time) + # 2. 支持点击主键(id)进入详情页编辑数据 + # 3. 支持批量删除、简单搜索(默认搜索主键字段) + # 4. 按模型 Meta 中定义的 ordering 排序(即 creation_time 升序) pass + +# 【注】当前代码缺少模型注册语句,需补充以下代码才能在后台看到该模型(否则配置不生效) +# 需先导入 OwnTrackLog 模型(从对应的 models.py 中),再注册关联 +# 完整注册代码示例: +# from .models import OwnTrackLog # 从当前应用的 models.py 导入模型类 +# admin.site.register(OwnTrackLog, OwnTrackLogsAdmin) # 关联模型与自定义Admin配置 diff --git a/src/DjangoBlog-master/owntracks/apps.py b/src/DjangoBlog-master/owntracks/apps.py index 1bc5f12..e75abb4 100644 --- a/src/DjangoBlog-master/owntracks/apps.py +++ b/src/DjangoBlog-master/owntracks/apps.py @@ -1,5 +1,26 @@ +# 导入 Django 应用配置核心类 AppConfig +# django.apps.AppConfig 是 Django 管理应用元数据的基础类,用于定义应用的名称、初始化逻辑、信号绑定等 from django.apps import AppConfig class OwntracksConfig(AppConfig): + """ + 自定义应用配置类:继承自 Django 内置的 AppConfig + 作用:管理 'owntracks' 应用的核心配置,包括应用名称、初始化行为、模型注册、信号监听等 + 每个 Django 应用建议创建独立的 AppConfig 类,便于后续扩展应用功能(如添加启动时初始化逻辑) + """ + + # 应用名称:Django 识别应用的唯一标识,必须与应用目录名一致(此处为 'owntracks') + # 作用: + # 1. 作为应用的核心标识,用于迁移命令(如 python manage.py migrate owntracks)、权限控制等 + # 2. 关联 models、views、admin 等模块,确保 Django 能正确识别应用内的组件 + # 3. 若需跨应用引用模型,需通过该名称定位(如 from owntracks.models import OwnTrackLog) name = 'owntracks' + + # 【可选扩展配置】若需添加更多应用级配置,可在此处补充(示例): + # 1. 应用verbose名称(后台管理界面显示的应用名称,支持中文) + # verbose_name = '用户轨迹管理' + # 2. 定义应用初始化逻辑(如启动时加载数据、绑定信号) + # def ready(self): + # # 导入信号处理模块(避免循环导入,需在 ready 方法内导入) + # import owntracks.signals \ No newline at end of file diff --git a/src/DjangoBlog-master/owntracks/migrations/0001_initial.py b/src/DjangoBlog-master/owntracks/migrations/0001_initial.py index 9eee55c..23733e5 100644 --- a/src/DjangoBlog-master/owntracks/migrations/0001_initial.py +++ b/src/DjangoBlog-master/owntracks/migrations/0001_initial.py @@ -1,31 +1,64 @@ # Generated by Django 4.1.7 on 2023-03-02 07:14 +# 说明:该文件为 Django 自动生成的数据迁移文件,用于创建数据库表结构 +# 生成条件:执行 makemigrations 命令时,Django 检测到 models.py 中新增 OwnTrackLog 模型后自动生成 +# 兼容版本:Django 4.1.7(迁移文件与 Django 版本强相关,修改需注意兼容性) +# 导入 Django 迁移核心模块和模型字段类 from django.db import migrations, models +# 导入 Django 时区工具(用于处理时间字段的时区一致性) import django.utils.timezone class Migration(migrations.Migration): + """ + 数据迁移类:Django 迁移系统的核心载体,用于定义数据库结构变更逻辑 + 每个迁移类对应一次数据库操作(如建表、改字段、删索引等) + """ + # 标记是否为初始迁移(首次创建模型时为 True,后续修改为 False) initial = True + # 依赖迁移列表:当前迁移依赖的其他迁移文件(为空表示无依赖) + # 若需依赖其他 app 的迁移,格式为 ['其他app名称.迁移文件名前缀'] dependencies = [ ] + # 迁移操作列表:定义具体的数据库变更操作 operations = [ + # 创建数据库表操作:对应 models.py 中的 OwnTrackLog 模型 migrations.CreateModel( + # 模型名称(必须与 models.py 中定义的类名一致) name='OwnTrackLog', + # 字段配置:与模型类中的 field 定义一一对应,决定表的列结构 fields=[ - ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), - ('tid', models.CharField(max_length=100, verbose_name='用户')), + # 主键字段:BigAutoField 为自增bigint类型,Django 默认主键类型 + ('id', models.BigAutoField( + auto_created=True, # 自动创建,无需手动赋值 + primary_key=True, # 标记为主键 + serialize=False, # 不序列化(主键默认不参与序列化) + verbose_name='ID' # 后台管理界面显示的字段名称 + )), + # 用户标识字段:CharField 对应数据库 varchar 类型 + ('tid', models.CharField( + max_length=100, # 最大长度100(必填参数) + verbose_name='用户' # 后台显示名称,支持中文 + )), + # 纬度字段:FloatField 对应数据库 float 类型,存储地理纬度(如 39.9042) ('lat', models.FloatField(verbose_name='纬度')), + # 经度字段:FloatField 对应数据库 float 类型,存储地理经度(如 116.4074) ('lon', models.FloatField(verbose_name='经度')), - ('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')), + # 创建时间字段:DateTimeField 对应数据库 datetime 类型 + ('created_time', models.DateTimeField( + default=django.utils.timezone.now, # 默认值:当前时区的当前时间 + verbose_name='创建时间' # 后台显示名称 + )), ], + # 模型元数据配置:对应模型类中的 Meta 内部类,影响表的整体属性 options={ - 'verbose_name': 'OwnTrackLogs', - 'verbose_name_plural': 'OwnTrackLogs', - 'ordering': ['created_time'], - 'get_latest_by': 'created_time', + 'verbose_name': 'OwnTrackLogs', # 单数形式的表名称(后台显示) + 'verbose_name_plural': 'OwnTrackLogs', # 复数形式的表名称(后台列表页显示) + 'ordering': ['created_time'], # 默认排序:按创建时间升序排列(-created_time 表示降序) + 'get_latest_by': 'created_time', # 支持使用 Model.objects.latest() 方法,默认按创建时间取最新记录 }, ), - ] + ] \ No newline at end of file diff --git a/src/DjangoBlog-master/owntracks/migrations/0002_alter_owntracklog_options_and_more.py b/src/DjangoBlog-master/owntracks/migrations/0002_alter_owntracklog_options_and_more.py index b4f8dec..1b5dfaf 100644 --- a/src/DjangoBlog-master/owntracks/migrations/0002_alter_owntracklog_options_and_more.py +++ b/src/DjangoBlog-master/owntracks/migrations/0002_alter_owntracklog_options_and_more.py @@ -1,22 +1,44 @@ # Generated by Django 4.2.5 on 2023-09-06 13:19 +# 说明:该文件为 Django 自动生成的**增量迁移文件**,用于更新数据库表结构 +# 生成条件:修改 models.py 中 OwnTrackLog 模型的 Meta 配置(排序字段、最新记录字段)和字段名(created_time → creation_time)后,执行 makemigrations 命令生成 +# 兼容版本:Django 4.2.5(与初始迁移文件 0001_initial 版本需匹配,避免迁移冲突) +# 核心作用:1. 重命名字段 created_time 为 creation_time;2. 更新模型元数据的排序和最新记录查询字段 +# 导入 Django 迁移核心模块(仅需 migrations,无需额外字段类,因无新增字段) from django.db import migrations class Migration(migrations.Migration): + """ + 数据迁移类:定义数据库结构的增量变更操作 + 本次迁移依赖初始迁移文件,仅修改字段名称和模型元数据,不改变表结构核心逻辑 + """ + # 依赖迁移列表:当前迁移必须在 'owntracks' 应用的 0001_initial 迁移执行后才能运行 + # 格式:['应用名称.迁移文件前缀'],确保迁移顺序正确,避免字段不存在导致的报错 dependencies = [ - ('owntracks', '0001_initial'), + ('owntracks', '0001_initial'), # 依赖初始迁移(创建 OwnTrackLog 表的迁移) ] + # 迁移操作列表:包含两个核心变更操作,按顺序执行 operations = [ + # 操作1:修改模型的元数据配置(对应 models.py 中 OwnTrackLog 类的 Meta 内部类) migrations.AlterModelOptions( - name='owntracklog', - options={'get_latest_by': 'creation_time', 'ordering': ['creation_time'], 'verbose_name': 'OwnTrackLogs', 'verbose_name_plural': 'OwnTrackLogs'}, + name='owntracklog', # 目标模型名称(必须与 models.py 中类名一致) + options={ + 'get_latest_by': 'creation_time', # 更新「查询最新记录」的字段:从 created_time 改为新字段 creation_time + # 影响 Model.objects.latest() 方法的默认查询逻辑 + 'ordering': ['creation_time'], # 更新默认排序字段:从 created_time 改为 creation_time(升序) + 'verbose_name': 'OwnTrackLogs', # 单数显示名称(未修改,与初始迁移一致) + 'verbose_name_plural': 'OwnTrackLogs', # 复数显示名称(未修改) + }, ), + # 操作2:重命名模型的字段(数据库表中对应列名也会同步修改) migrations.RenameField( - model_name='owntracklog', - old_name='created_time', - new_name='creation_time', + model_name='owntracklog', # 目标模型名称 + old_name='created_time', # 旧字段名(原模型中定义的字段名) + new_name='creation_time', # 新字段名(修改后的字段名) + # 说明:该操作会同步更新数据库表中对应的列名(created_time → creation_time),且保留原有字段数据 + # 无需手动处理数据迁移,Django 会自动完成字段名映射和数据保留 ), - ] + ] \ No newline at end of file diff --git a/src/DjangoBlog-master/owntracks/models.py b/src/DjangoBlog-master/owntracks/models.py index 760942c..206522e 100644 --- a/src/DjangoBlog-master/owntracks/models.py +++ b/src/DjangoBlog-master/owntracks/models.py @@ -1,20 +1,54 @@ +# 导入 Django ORM 核心模块:models 用于定义数据模型,对应数据库表结构 from django.db import models +# 导入 Django 时区工具:now() 用于获取当前时区的时间(避免时区不一致问题) from django.utils.timezone import now # Create your models here. +# 说明:该注释为 Django 自动生成,提示开发者在此处定义数据模型类 +# 模型类与数据库表的映射关系:每个模型类对应一张数据库表,类属性对应表字段 class OwnTrackLog(models.Model): - tid = models.CharField(max_length=100, null=False, verbose_name='用户') - lat = models.FloatField(verbose_name='纬度') + """ + 轨迹数据模型类:继承自 Django 内置的 models.Model(所有数据模型的基类) + 核心作用:存储用户的地理轨迹信息(用户标识、经纬度、创建时间) + 映射数据库表名:默认生成规则为「应用名_模型名小写」→ owntracks_owntracklog + """ + + # 1. 用户标识字段:存储用户唯一标识(如设备ID、用户名等) + tid = models.CharField( + max_length=100, # 字段最大长度(CharField 必填参数),适配多数用户标识场景 + null=False, # 数据库层面不允许为空(必填字段),确保数据完整性 + verbose_name='用户' # Django 后台管理界面显示的字段名称(支持中文) + ) + + # 2. 纬度字段:存储地理纬度值(如 39.9042,支持正负值,适配全球地理坐标) + lat = models.FloatField(verbose_name='纬度') # FloatField 对应数据库 float 类型,满足精度需求 + + # 3. 经度字段:存储地理经度值(如 116.4074,与纬度配合定位地理坐标) lon = models.FloatField(verbose_name='经度') - creation_time = models.DateTimeField('创建时间', default=now) + + # 4. 创建时间字段:记录轨迹数据的生成时间 + creation_time = models.DateTimeField( + '创建时间', # verbose_name 的简写形式(第一个参数直接指定后台显示名称) + default=now # 默认值:当前时区的当前时间(now 是可调用对象,每次创建记录时动态获取) + # 注:区别于 datetime.datetime.now(),django.utils.timezone.now() 包含时区信息,符合 Django 时区配置 + ) def __str__(self): + """ + 模型实例的字符串表示方法: + 作用:在 Django 后台、终端打印实例时,显示直观的标识(而非默认的 ) + 返回值:以用户标识(tid)作为实例的字符串描述,便于区分不同用户的轨迹数据 + """ return self.tid class Meta: - ordering = ['creation_time'] - verbose_name = "OwnTrackLogs" - verbose_name_plural = verbose_name - get_latest_by = 'creation_time' + """ + 模型元数据类:用于配置模型的整体属性(不对应表字段,影响表的行为和显示) + 所有配置仅作用于当前模型,不影响其他模型 + """ + ordering = ['creation_time'] # 默认排序规则:按创建时间升序排列(-creation_time 表示降序) + verbose_name = "OwnTrackLogs" # 后台管理界面显示的「单数模型名称」 + verbose_name_plural = verbose_name # 后台管理界面显示的「复数模型名称」(此处与单数一致,避免英文复数歧义) + get_latest_by = 'creation_time' # 支持 Model.objects.latest() 方法,默认按创建时间获取最新一条记录 \ No newline at end of file diff --git a/src/DjangoBlog-master/owntracks/tests.py b/src/DjangoBlog-master/owntracks/tests.py index 3b4b9d8..ce208f5 100644 --- a/src/DjangoBlog-master/owntracks/tests.py +++ b/src/DjangoBlog-master/owntracks/tests.py @@ -1,64 +1,124 @@ +# 导入 JSON 模块:用于将 Python 字典序列化为 JSON 字符串(适配接口的 JSON 数据格式) import json +# 导入 Django 测试核心工具: +# - Client:模拟客户端发起 HTTP 请求(GET/POST 等),用于测试视图接口 +# - RequestFactory:生成原始请求对象(适用于单独测试视图函数/类,本用例未直接使用) +# - TestCase:Django 单元测试基类,提供断言方法、测试环境初始化/清理等功能 from django.test import Client, RequestFactory, TestCase +# 导入跨应用模型:BlogUser(用户模型,用于测试登录权限相关接口) from accounts.models import BlogUser +# 导入当前应用的测试目标模型:OwnTrackLog(轨迹数据模型,用于验证数据读写) from .models import OwnTrackLog # Create your tests here. +# 说明:该注释为 Django 自动生成,提示开发者在此处定义测试类/测试方法 class OwnTrackLogTest(TestCase): + """ + 轨迹数据相关接口与模型单元测试类: + 继承自 TestCase,专注测试 OwnTrackLog 模型的数据读写及相关视图接口(/owntracks/ 下的接口) + 测试覆盖场景:数据提交(合法/非法)、接口权限控制、响应状态码验证 + """ + def setUp(self): + """ + 测试前置初始化方法: + 在每个测试方法执行前自动调用,用于创建测试所需的公共资源 + 作用:避免重复代码,确保每个测试方法的环境一致性 + """ + # 初始化客户端对象:模拟浏览器发起 HTTP 请求,后续所有接口测试均通过该对象执行 self.client = Client() + # 初始化请求工厂对象:用于生成自定义请求(本用例未直接使用,预留扩展) self.factory = RequestFactory() def test_own_track_log(self): + """ + 核心测试方法:命名以 test_ 开头(Django 测试框架自动识别执行) + 测试内容: + 1. 合法轨迹数据提交(完整字段)→ 验证数据是否成功写入数据库 + 2. 非法轨迹数据提交(缺少必填字段)→ 验证数据是否被拒绝(数据库无新增) + 3. 未登录状态访问需权限接口 → 验证是否重定向(302) + 4. 管理员登录后访问接口 → 验证是否正常响应(200) + 5. 管理员登录后操作模型 → 验证数据写入及接口查询功能 + """ + # --------------- 场景1:提交完整合法的轨迹数据(tid、lat、lon 字段齐全)--------------- + # 构造合法的请求数据字典:包含模型所需的所有必填字段 o = { - 'tid': 12, - 'lat': 123.123, - 'lon': 134.341 + 'tid': 12, # 用户标识(整数类型,模型中 CharField 会自动转换为字符串存储) + 'lat': 123.123, # 纬度(合法浮点数) + 'lon': 134.341 # 经度(合法浮点数) } + # 模拟 POST 请求:向轨迹提交接口发送 JSON 格式数据 self.client.post( - '/owntracks/logtracks', - json.dumps(o), - content_type='application/json') - length = len(OwnTrackLog.objects.all()) - self.assertEqual(length, 1) + '/owntracks/logtracks', # 请求接口路径(需与 urls.py 中配置一致) + json.dumps(o), # 请求体:将字典序列化为 JSON 字符串 + content_type='application/json' # 指定请求头:声明数据格式为 JSON + ) + + # 验证:数据库中是否新增 1 条轨迹记录(断言实际数量与预期一致) + length = len(OwnTrackLog.objects.all()) # 查询所有轨迹记录的数量 + self.assertEqual(length, 1) # 断言数量为 1 → 验证合法数据提交成功 + # --------------- 场景2:提交非法轨迹数据(缺少必填字段 lon)--------------- + # 构造非法请求数据:缺少经度(lon)字段(模型中 lon 为必填字段,无 null=True 配置) o = { - 'tid': 12, - 'lat': 123.123 + 'tid': 12, # 用户标识 + 'lat': 123.123 # 纬度(仅含该字段,缺少 lon) } + # 再次向同一接口发送非法数据 self.client.post( '/owntracks/logtracks', json.dumps(o), content_type='application/json') + + # 验证:数据库记录数量是否仍为 1(非法数据未被写入) length = len(OwnTrackLog.objects.all()) - self.assertEqual(length, 1) + self.assertEqual(length, 1) # 断言数量不变 → 验证非法数据被拒绝 + # --------------- 场景3:未登录状态访问需权限的地图展示接口 --------------- + # 模拟 GET 请求:未登录状态下访问 /owntracks/show_maps 接口 rsp = self.client.get('/owntracks/show_maps') - self.assertEqual(rsp.status_code, 302) + # 验证:响应状态码是否为 302(重定向,通常跳转到登录页) + self.assertEqual(rsp.status_code, 302) # 断言重定向 → 验证接口权限控制生效 + + # --------------- 场景4:创建管理员用户并登录 --------------- + # 创建超级用户(管理员):用于测试登录后访问权限接口 user = BlogUser.objects.create_superuser( - email="liangliangyy1@gmail.com", - username="liangliangyy1", - password="liangliangyy1") + email="liangliangyy1@gmail.com", # 邮箱(超级用户必填字段) + username="liangliangyy1", # 用户名(登录账号) + password="liangliangyy1") # 密码(登录密码) + # 模拟管理员登录:使用上述创建的账号密码登录系统 self.client.login(username='liangliangyy1', password='liangliangyy1') + + # 手动创建一条轨迹记录(用于后续接口查询测试) s = OwnTrackLog() - s.tid = 12 - s.lon = 123.234 - s.lat = 34.234 - s.save() + s.tid = 12 # 设置用户标识 + s.lon = 123.234 # 设置经度 + s.lat = 34.234 # 设置纬度 + # creation_time 字段使用默认值(当前时间),无需手动赋值 + s.save() # 保存到数据库 + # --------------- 场景5:登录后访问各类接口,验证响应状态 --------------- + # 1. 访问日期列表接口 → 预期 200(正常响应) rsp = self.client.get('/owntracks/show_dates') self.assertEqual(rsp.status_code, 200) + + # 2. 再次访问地图展示接口 → 预期 200(已登录,权限通过) rsp = self.client.get('/owntracks/show_maps') self.assertEqual(rsp.status_code, 200) + + # 3. 访问轨迹数据查询接口(无日期参数)→ 预期 200 rsp = self.client.get('/owntracks/get_datas') self.assertEqual(rsp.status_code, 200) + + # 4. 访问轨迹数据查询接口(带日期参数)→ 预期 200 rsp = self.client.get('/owntracks/get_datas?date=2018-02-26') self.assertEqual(rsp.status_code, 200) + # 注:此处仅验证接口是否正常响应(状态码 200),未验证返回数据的正确性,可根据需求补充数据断言 \ No newline at end of file diff --git a/src/DjangoBlog-master/owntracks/urls.py b/src/DjangoBlog-master/owntracks/urls.py index c19ada8..60221ef 100644 --- a/src/DjangoBlog-master/owntracks/urls.py +++ b/src/DjangoBlog-master/owntracks/urls.py @@ -1,12 +1,40 @@ +# 导入 Django 路由核心函数:path 用于定义 URL 路径与视图函数的映射关系 from django.urls import path +# 导入当前应用的视图模块:views 中包含所有路由对应的业务处理函数 from . import views +# 应用路由命名空间:用于区分不同应用的同名路由(避免反向解析时冲突) +# 作用:在模板或视图中通过「app_name:route_name」反向生成 URL(如 reverse('owntracks:logtracks')) app_name = "owntracks" +# 路由配置列表:每个 path 对应一条 URL 规则,按定义顺序匹配(优先匹配靠前的规则) urlpatterns = [ - path('owntracks/logtracks', views.manage_owntrack_log, name='logtracks'), - path('owntracks/show_maps', views.show_maps, name='show_maps'), - path('owntracks/get_datas', views.get_datas, name='get_datas'), - path('owntracks/show_dates', views.show_log_dates, name='show_dates') -] + # 1. 轨迹数据提交接口:接收客户端发送的轨迹数据(经纬度、用户标识)并存储 + path( + 'owntracks/logtracks', # URL 路径:客户端访问的接口地址(需完整匹配) + views.manage_owntrack_log, # 对应的视图函数:处理该 URL 的业务逻辑(如数据验证、写入数据库) + name='logtracks' # 路由别名:用于反向解析 URL(替代硬编码路径,便于维护) + ), + + # 2. 地图展示接口:渲染包含用户轨迹的地图页面(需登录权限) + path( + 'owntracks/show_maps', # URL 路径:地图展示页面地址 + views.show_maps, # 视图函数:查询轨迹数据并传递给模板渲染地图 + name='show_maps' # 路由别名:如模板中使用 {% url 'owntracks:show_maps' %} 生成 URL + ), + + # 3. 轨迹数据查询接口:返回指定条件的轨迹数据(如按日期筛选),通常用于前端异步请求 + path( + 'owntracks/get_datas', # URL 路径:数据查询接口地址(支持带查询参数,如 ?date=2023-09-06) + views.get_datas, # 视图函数:处理查询条件,从数据库筛选数据并返回(如 JSON 格式) + name='get_datas' # 路由别名:前端 AJAX 请求时可通过反向解析获取接口地址 + ), + + # 4. 轨迹日期列表接口:返回所有轨迹数据的日期列表(用于前端筛选日期选择) + path( + 'owntracks/show_dates', # URL 路径:日期列表展示/查询地址 + views.show_log_dates, # 视图函数:查询数据库中轨迹数据的所有日期并返回(去重处理) + name='show_dates' # 路由别名:用于反向生成日期筛选接口的 URL + ) +] \ No newline at end of file diff --git a/src/DjangoBlog-master/owntracks/views.py b/src/DjangoBlog-master/owntracks/views.py index 4c72bdd..11289cd 100644 --- a/src/DjangoBlog-master/owntracks/views.py +++ b/src/DjangoBlog-master/owntracks/views.py @@ -1,127 +1,237 @@ # Create your views here. -import datetime -import itertools -import json -import logging -from datetime import timezone -from itertools import groupby - -import django -import requests -from django.contrib.auth.decorators import login_required -from django.http import HttpResponse -from django.http import JsonResponse -from django.shortcuts import render -from django.views.decorators.csrf import csrf_exempt - -from .models import OwnTrackLog - +# 说明:该文件为 Django 视图层核心文件,包含所有 /owntracks/ 路由对应的业务处理逻辑 +# 视图函数职责:接收请求、处理数据(数据库读写/第三方接口调用)、返回响应(页面/JSON/状态码) + +# 导入标准库模块 +import datetime # 处理日期时间相关操作(如日期计算、格式化) +import itertools # 提供迭代器工具(如切片、分组,用于批量处理经纬度数据) +import json # 处理 JSON 数据序列化/反序列化(适配接口请求/响应) +import logging # 日志模块:记录业务日志(信息/错误),便于问题排查 +from datetime import timezone # 处理时区相关(确保时间计算一致性) +from itertools import groupby # 分组工具:按用户标识(tid)分组轨迹数据 + +# 导入第三方库/框架模块 +import django # Django 核心模块(用于时区时间处理) +import requests # HTTP 请求库:调用高德地图坐标转换接口 +from django.contrib.auth.decorators import login_required # 登录验证装饰器:限制未登录用户访问 +from django.http import HttpResponse # 基础响应类:返回文本/状态码响应 +from django.http import JsonResponse # JSON 响应类:返回 JSON 格式数据(适配前端异步请求) +from django.shortcuts import render # 页面渲染函数:加载模板并返回 HTML 页面 +from django.views.decorators.csrf import csrf_exempt # CSRF 豁免装饰器:关闭跨站请求伪造保护(适配第三方客户端提交数据) + +# 导入当前应用模块 +from .models import OwnTrackLog # 轨迹数据模型:用于数据库读写操作 + +# 初始化日志对象:按当前模块名创建日志实例,日志输出会携带模块标识 logger = logging.getLogger(__name__) -@csrf_exempt +@csrf_exempt # 豁免 CSRF 验证:因客户端(如设备/第三方系统)可能无法提供 CSRF Token,故关闭保护 def manage_owntrack_log(request): + """ + 轨迹数据提交接口视图: + 功能:接收客户端 POST 提交的 JSON 格式轨迹数据(tid/经纬度),验证后写入数据库 + 请求方式:POST(仅支持 POST,其他方式会因缺少请求体报错) + 请求体格式:{"tid": "用户标识", "lat": 纬度值, "lon": 经度值} + 响应: + - 成功写入:返回 "ok"(HTTP 200) + - 数据不完整:返回 "data error"(HTTP 200) + - 异常报错:返回 "error"(HTTP 200)并记录错误日志 + """ try: + # 读取请求体:将 JSON 字符串解码为 Python 字典(utf-8 编码适配中文/特殊字符) s = json.loads(request.read().decode('utf-8')) + # 提取请求数据中的核心字段(用户标识、纬度、经度) tid = s['tid'] lat = s['lat'] lon = s['lon'] + # 记录信息日志:打印提交的轨迹数据(便于追踪数据流转) logger.info( 'tid:{tid}.lat:{lat}.lon:{lon}'.format( - tid=tid, lat=lat, lon=lon)) + tid=tid, lat=lat, lon=lon) + ) + + # 数据合法性校验:确保核心字段非空(避免写入无效数据) if tid and lat and lon: + # 创建模型实例并赋值 m = OwnTrackLog() - m.tid = tid - m.lat = lat - m.lon = lon - m.save() - return HttpResponse('ok') + m.tid = tid # 用户标识 + m.lat = lat # 纬度 + m.lon = lon # 经度 + # creation_time 字段使用默认值(当前时间),无需手动赋值 + m.save() # 保存到数据库 + return HttpResponse('ok') # 响应成功标识 else: + # 数据不完整:返回错误提示 return HttpResponse('data error') except Exception as e: - logger.error(e) - return HttpResponse('error') + # 捕获所有异常(如 JSON 解析失败、字段缺失、数据库报错等) + logger.error(e) # 记录错误日志(包含异常堆栈信息,便于排查) + return HttpResponse('error') # 响应错误标识 -@login_required +@login_required # 登录验证:仅登录用户可访问,未登录自动重定向到登录页 def show_maps(request): + """ + 地图展示页面视图: + 功能:渲染包含用户轨迹的地图页面(需管理员权限) + 请求方式:GET + 请求参数:?date=YYYY-MM-DD(可选,默认当前日期) + 响应: + - 管理员登录:返回地图 HTML 页面(携带日期参数) + - 非管理员登录:返回 403 禁止访问 + """ + # 权限二次校验:仅超级管理员(is_superuser=True)可访问,普通登录用户无权限 if request.user.is_superuser: + # 计算默认日期:当前 UTC 时间的日期(格式:YYYY-MM-DD) defaultdate = str(datetime.datetime.now(timezone.utc).date()) + # 获取请求参数中的日期(若未传则使用默认日期) date = request.GET.get('date', defaultdate) + # 构造模板上下文:传递日期参数给前端模板(用于筛选该日期的轨迹数据) context = { 'date': date } + # 渲染模板:加载 show_maps.html 模板并传入上下文,返回 HTML 响应 return render(request, 'owntracks/show_maps.html', context) else: + # 非管理员:导入并返回 403 禁止访问响应 from django.http import HttpResponseForbidden return HttpResponseForbidden() -@login_required +@login_required # 登录验证:仅登录用户可访问 def show_log_dates(request): + """ + 轨迹日期列表页面视图: + 功能:查询数据库中所有轨迹数据的日期(去重),渲染日期列表页面(用于前端筛选) + 请求方式:GET + 响应:返回日期列表 HTML 页面(包含去重后的所有轨迹日期) + """ + # 查询所有轨迹记录的创建时间(仅取 creation_time 字段,flat=True 返回一维列表) dates = OwnTrackLog.objects.values_list('creation_time', flat=True) + # 日期处理: + # 1. map 转换:将 datetime 对象格式化为 'YYYY-MM-DD' 字符串 + # 2. set 去重:去除重复日期 + # 3. sorted 排序:按日期升序排列 + # 4. list 转换:转为列表用于模板渲染 results = list(sorted(set(map(lambda x: x.strftime('%Y-%m-%d'), dates)))) + # 构造上下文:传递日期列表给模板 context = { 'results': results } + # 渲染日期列表模板 return render(request, 'owntracks/show_log_dates.html', context) def convert_to_amap(locations): - convert_result = [] - it = iter(locations) - + """ + 高德地图坐标转换工具函数: + 功能:将 GPS 坐标系(WGS84)的经纬度转换为高德坐标系(GCJ02) + 原因:GPS 原始坐标在高德地图上会有偏移,转换后可精准定位 + 参数:locations - OwnTrackLog 模型实例列表(包含 lon/lat 字段) + 返回值:转换后的经纬度字符串(格式:"lon1,lat1;lon2,lat2;...") + 限制:高德接口单次最多支持 30 个坐标,故分批次转换 + """ + convert_result = [] # 存储所有批次的转换结果 + it = iter(locations) # 将列表转为迭代器,便于分批次切片 + + # 循环分批次处理:每次取 30 个坐标(适配高德接口限制) item = list(itertools.islice(it, 30)) while item: + # 构造坐标字符串:将每个实例的 lon/lat 拼接为 "lon,lat",再用 ";" 连接多个坐标 + # set 去重:避免重复坐标提交(减少接口调用量) datas = ';'.join( - set(map(lambda x: str(x.lon) + ',' + str(x.lat), item))) + set(map(lambda x: str(x.lon) + ',' + str(x.lat), item)) + ) - key = '8440a376dfc9743d8924bf0ad141f28e' - api = 'http://restapi.amap.com/v3/assistant/coordinate/convert' + # 高德地图坐标转换接口配置 + key = '8440a376dfc9743d8924bf0ad141f28e' # 高德开发者密钥(需替换为有效密钥) + api = 'http://restapi.amap.com/v3/assistant/coordinate/convert' # 转换接口地址 query = { - 'key': key, - 'locations': datas, - 'coordsys': 'gps' + 'key': key, # 开发者密钥(必填) + 'locations': datas, # 待转换的坐标字符串 + 'coordsys': 'gps' # 源坐标系:gps(WGS84) } + + # 调用高德接口(GET 请求) rsp = requests.get(url=api, params=query) + # 解析接口响应(JSON 转字典) result = json.loads(rsp.text) + + # 若响应包含 "locations" 字段(转换成功),添加到结果列表 if "locations" in result: convert_result.append(result['locations']) + + # 处理下一批次坐标 item = list(itertools.islice(it, 30)) + # 拼接所有批次结果,返回统一格式的坐标字符串 return ";".join(convert_result) -@login_required +@login_required # 登录验证:仅登录用户可访问 def get_datas(request): + """ + 轨迹数据查询接口视图: + 功能:按日期筛选轨迹数据,按用户标识(tid)分组,返回 JSON 格式的轨迹路径(经纬度列表) + 请求方式:GET + 请求参数:?date=YYYY-MM-DD(可选,默认当前日期) + 响应:JSON 数组(格式:[{"name": "tid1", "path": [[lon1,lat1], [lon2,lat2], ...]}, ...]) + """ + # 获取当前 UTC 时间(带时区信息,确保与数据库时间字段时区一致) now = django.utils.timezone.now().replace(tzinfo=timezone.utc) + # 构造默认查询日期:当前日期的 00:00:00(UTC 时间) querydate = django.utils.timezone.datetime( - now.year, now.month, now.day, 0, 0, 0) + now.year, now.month, now.day, 0, 0, 0 + ) + + # 若请求携带 date 参数,解析为指定日期的 00:00:00 if request.GET.get('date', None): + # 拆分日期字符串(YYYY-MM-DD → [年, 月, 日])并转为整数 date = list(map(lambda x: int(x), request.GET.get('date').split('-'))) querydate = django.utils.timezone.datetime( - date[0], date[1], date[2], 0, 0, 0) + date[0], date[1], date[2], 0, 0, 0 + ) + + # 构造查询结束日期:查询日期的次日 00:00:00(即筛选 [querydate, nextdate) 区间的数据) nextdate = querydate + datetime.timedelta(days=1) + + # 数据库查询:筛选指定日期区间内的所有轨迹记录 models = OwnTrackLog.objects.filter( - creation_time__range=(querydate, nextdate)) - result = list() + creation_time__range=(querydate, nextdate) + ) + + result = list() # 存储最终返回的 JSON 数据 + + # 若查询到数据,按 tid 分组并构造轨迹路径 if models and len(models): + # 1. sorted:按 tid 排序(确保相同 tid 的记录连续,为 groupby 分组做准备) + # 2. groupby:按 tid 分组,key 为分组依据(tid) for tid, item in groupby( sorted(models, key=lambda k: k.tid), key=lambda k: k.tid): + # 构造单个用户的轨迹数据字典 d = dict() - d["name"] = tid - paths = list() - # 使用高德转换后的经纬度 + d["name"] = tid # 用户标识(用于前端区分不同用户的轨迹) + paths = list() # 存储该用户的经纬度路径列表 + + # 【可选】使用高德转换后的经纬度(当前注释未启用,默认使用 GPS 原始坐标) # locations = convert_to_amap( - # sorted(item, key=lambda x: x.creation_time)) + # sorted(item, key=lambda x: x.creation_time) # 按创建时间排序,确保轨迹顺序正确 + # ) # for i in locations.split(';'): - # paths.append(i.split(',')) - # 使用GPS原始经纬度 + # paths.append(i.split(',')) # 拆分坐标为 [lon, lat] 列表 + + # 使用 GPS 原始经纬度(默认启用) + # 按创建时间排序:确保轨迹点按时间顺序排列(避免路径错乱) for location in sorted(item, key=lambda x: x.creation_time): + # 转为字符串格式(避免 JSON 序列化时的精度问题),添加到路径列表 paths.append([str(location.lon), str(location.lat)]) - d["path"] = paths - result.append(d) + + d["path"] = paths # 关联路径列表到用户字典 + result.append(d) # 添加到最终结果列表 + + # 返回 JSON 响应:safe=False 允许返回非字典类型(此处为列表) return JsonResponse(result, safe=False) diff --git a/src/DjangoBlog-master/servermanager/MemcacheStorage.py b/src/DjangoBlog-master/servermanager/MemcacheStorage.py index 38a7990..2019f91 100644 --- a/src/DjangoBlog-master/servermanager/MemcacheStorage.py +++ b/src/DjangoBlog-master/servermanager/MemcacheStorage.py @@ -5,28 +5,83 @@ from djangoblog.utils import cache class MemcacheStorage(SessionStorage): + """ + 基于Memcache的会话存储实现类 + + 该类继承自SessionStorage,使用memcache作为后端存储来管理会话数据 + + Args: + prefix (str): 存储键名的前缀,默认为'ws_' + """ def __init__(self, prefix='ws_'): self.prefix = prefix self.cache = cache @property def is_available(self): + """ + 检查存储是否可用 + + 通过设置并获取一个测试值来验证存储服务的可用性 + + Returns: + bool: 存储服务可用返回True,否则返回False + """ value = "1" self.set('checkavaliable', value=value) return value == self.get('checkavaliable') def key_name(self, s): + """ + 生成带前缀的完整键名 + + Args: + s (str): 原始键名 + + Returns: + str: 添加前缀后的完整键名 + """ return '{prefix}{s}'.format(prefix=self.prefix, s=s) def get(self, id): + """ + 根据ID获取会话数据 + + Args: + id (str): 会话ID + + Returns: + dict: 解析后的会话数据字典 + """ + # 构造完整的缓存键名 id = self.key_name(id) + # 从缓存中获取会话数据,如果不存在则返回空JSON对象 session_json = self.cache.get(id) or '{}' + # 将JSON字符串解析为Python对象并返回 return json_loads(session_json) def set(self, id, value): + """ + 设置会话数据 + + Args: + id (str): 会话ID + value (any): 要存储的会话数据 + """ + # 构造完整的缓存键名 id = self.key_name(id) + # 将数据序列化为JSON字符串并存储到缓存中 self.cache.set(id, json_dumps(value)) def delete(self, id): + """ + 删除指定ID的会话数据 + + Args: + id (str): 要删除的会话ID + """ + # 构造完整的缓存键名 id = self.key_name(id) + # 从缓存中删除对应的会话数据 self.cache.delete(id) + diff --git a/src/DjangoBlog-master/servermanager/__init__.py b/src/DjangoBlog-master/servermanager/__init__.py index e69de29..34f89ee 100644 --- a/src/DjangoBlog-master/servermanager/__init__.py +++ b/src/DjangoBlog-master/servermanager/__init__.py @@ -0,0 +1 @@ +#初始化 diff --git a/src/DjangoBlog-master/servermanager/admin.py b/src/DjangoBlog-master/servermanager/admin.py index f26f4f6..757ba1e 100644 --- a/src/DjangoBlog-master/servermanager/admin.py +++ b/src/DjangoBlog-master/servermanager/admin.py @@ -3,10 +3,25 @@ from django.contrib import admin class CommandsAdmin(admin.ModelAdmin): + """ + 命令管理后台类 + + 用于在Django管理后台中展示和管理命令信息,配置了列表页面显示的字段 + """ list_display = ('title', 'command', 'describe') class EmailSendLogAdmin(admin.ModelAdmin): + """ + 邮件发送日志管理后台类 + + 用于在Django管理后台中展示和管理邮件发送日志信息,配置了列表页面显示的字段 + 和只读字段,并重写了权限控制方法 + + Attributes: + list_display: 列表页面显示的字段元组 + readonly_fields: 只读字段元组 + """ list_display = ('title', 'emailto', 'send_result', 'creation_time') readonly_fields = ( 'title', @@ -16,4 +31,17 @@ class EmailSendLogAdmin(admin.ModelAdmin): 'content') def has_add_permission(self, request): + """ + 控制是否具有添加新记录的权限 + + 重写父类方法,禁止用户在管理后台手动添加邮件发送日志记录 + + Args: + request: HTTP请求对象 + + Returns: + bool: 总是返回False,表示没有添加权限 + """ return False + + diff --git a/src/DjangoBlog-master/servermanager/api/blogapi.py b/src/DjangoBlog-master/servermanager/api/blogapi.py index 8a4d6ac..4e8bfaf 100644 --- a/src/DjangoBlog-master/servermanager/api/blogapi.py +++ b/src/DjangoBlog-master/servermanager/api/blogapi.py @@ -1,27 +1,72 @@ +from haystack.query import SearchQuerySet +#hz代码注释 + from haystack.query import SearchQuerySet from blog.models import Article, Category class BlogApi: + """ + 博客API类,提供文章搜索、分类获取等相关功能 + + Attributes: + searchqueryset (SearchQuerySet): 搜索查询集对象 + __max_takecount__ (int): 最大返回记录数,默认为8 + """ def __init__(self): + """ + 初始化BlogApi实例 + """ self.searchqueryset = SearchQuerySet() self.searchqueryset.auto_query('') self.__max_takecount__ = 8 def search_articles(self, query): + """ + 根据查询关键字搜索文章 + + Args: + query (str): 搜索关键字 + + Returns: + list: 匹配的文章列表,最多返回__max_takecount__条记录 + """ sqs = self.searchqueryset.auto_query(query) sqs = sqs.load_all() return sqs[:self.__max_takecount__] def get_category_lists(self): + """ + 获取所有文章分类列表 + + Returns: + QuerySet: 所有分类对象的查询集 + """ return Category.objects.all() def get_category_articles(self, categoryname): + """ + 根据分类名称获取该分类下的文章列表 + + Args: + categoryname (str): 分类名称 + + Returns: + QuerySet or None: 指定分类下的文章查询集,最多返回__max_takecount__条记录, + 如果没有找到相关文章则返回None + """ articles = Article.objects.filter(category__name=categoryname) if articles: return articles[:self.__max_takecount__] return None def get_recent_articles(self): + """ + 获取最近发布的文章列表 + + Returns: + QuerySet: 最近发布的文章查询集,最多返回__max_takecount__条记录 + """ return Article.objects.all()[:self.__max_takecount__] + diff --git a/src/DjangoBlog-master/servermanager/apps.py b/src/DjangoBlog-master/servermanager/apps.py index 03cc38d..1d5ef59 100644 --- a/src/DjangoBlog-master/servermanager/apps.py +++ b/src/DjangoBlog-master/servermanager/apps.py @@ -2,4 +2,14 @@ from django.apps import AppConfig class ServermanagerConfig(AppConfig): + """ + Django应用配置类 + + 该类用于配置servermanager应用的基本信息,继承自Django的AppConfig基类。 + 通过设置name属性来指定应用的名称,Django框架会使用这个配置来识别和管理应用。 + + 属性: + name (str): 应用的名称,用于Django框架识别该应用模块 + """ name = 'servermanager' + diff --git a/src/DjangoBlog-master/servermanager/migrations/0001_initial.py b/src/DjangoBlog-master/servermanager/migrations/0001_initial.py index bbdbf77..10d92d0 100644 --- a/src/DjangoBlog-master/servermanager/migrations/0001_initial.py +++ b/src/DjangoBlog-master/servermanager/migrations/0001_initial.py @@ -1,9 +1,16 @@ # Generated by Django 4.1.7 on 2023-03-02 07:14 - +#hz代码注释 from django.db import migrations, models class Migration(migrations.Migration): + """ + Django数据库迁移类,用于创建初始数据表结构 + + 该迁移文件包含两个模型的创建操作: + 1. commands模型 - 用于存储命令信息 + 2. EmailSendLog模型 - 用于记录邮件发送日志 + """ initial = True @@ -11,6 +18,7 @@ class Migration(migrations.Migration): ] operations = [ + # 创建commands数据表,用于存储命令相关信息 migrations.CreateModel( name='commands', fields=[ @@ -26,6 +34,7 @@ class Migration(migrations.Migration): 'verbose_name_plural': '命令', }, ), + # 创建EmailSendLog数据表,用于记录邮件发送日志信息 migrations.CreateModel( name='EmailSendLog', fields=[ @@ -43,3 +52,4 @@ class Migration(migrations.Migration): }, ), ] + diff --git a/src/DjangoBlog-master/servermanager/migrations/0002_alter_emailsendlog_options_and_more.py b/src/DjangoBlog-master/servermanager/migrations/0002_alter_emailsendlog_options_and_more.py index 4858857..22716ae 100644 --- a/src/DjangoBlog-master/servermanager/migrations/0002_alter_emailsendlog_options_and_more.py +++ b/src/DjangoBlog-master/servermanager/migrations/0002_alter_emailsendlog_options_and_more.py @@ -1,29 +1,38 @@ # Generated by Django 4.2.5 on 2023-09-06 13:19 - +#hx代码注释 from django.db import migrations class Migration(migrations.Migration): + """ + Django数据库迁移类,用于执行模型字段重命名和模型选项修改操作 + + 该迁移依赖于servermanager应用的0001_initial迁移文件 + """ dependencies = [ ('servermanager', '0001_initial'), ] operations = [ + # 修改EmailSendLog模型的元数据选项,设置排序规则和显示名称 migrations.AlterModelOptions( name='emailsendlog', options={'ordering': ['-creation_time'], 'verbose_name': '邮件发送log', 'verbose_name_plural': '邮件发送log'}, ), + # 重命名Commands模型的created_time字段为creation_time migrations.RenameField( model_name='commands', old_name='created_time', new_name='creation_time', ), + # 重命名Commands模型的last_mod_time字段为last_modify_time migrations.RenameField( model_name='commands', old_name='last_mod_time', new_name='last_modify_time', ), + # 重命名EmailSendLog模型的created_time字段为creation_time migrations.RenameField( model_name='emailsendlog', old_name='created_time', diff --git a/src/DjangoBlog-master/servermanager/models.py b/src/DjangoBlog-master/servermanager/models.py index 4326c65..169ec39 100644 --- a/src/DjangoBlog-master/servermanager/models.py +++ b/src/DjangoBlog-master/servermanager/models.py @@ -3,6 +3,18 @@ from django.db import models # Create your models here. class commands(models.Model): + """ + 命令模型类 + + 用于存储命令相关信息的数据库模型 + + Attributes: + title (CharField): 命令标题,最大长度300字符 + command (CharField): 命令内容,最大长度2000字符 + describe (CharField): 命令描述,最大长度300字符 + creation_time (DateTimeField): 创建时间,自动设置为记录创建时的时间 + last_modify_time (DateTimeField): 修改时间,自动更新为记录每次修改的时间 + """ title = models.CharField('命令标题', max_length=300) command = models.CharField('命令', max_length=2000) describe = models.CharField('命令描述', max_length=300) @@ -10,14 +22,37 @@ class commands(models.Model): last_modify_time = models.DateTimeField('修改时间', auto_now=True) def __str__(self): + """ + 返回命令对象的字符串表示 + + Returns: + str: 命令的标题 + """ return self.title class Meta: + """ + 模型元数据配置 + + 配置模型在Django管理界面中的显示名称 + """ verbose_name = '命令' verbose_name_plural = verbose_name class EmailSendLog(models.Model): + """ + 邮件发送日志模型类 + + 用于记录邮件发送历史和结果的数据库模型 + + Attributes: + emailto (CharField): 收件人邮箱地址,最大长度300字符 + title (CharField): 邮件标题,最大长度2000字符 + content (TextField): 邮件正文内容 + send_result (BooleanField): 邮件发送结果,True表示成功,False表示失败 + creation_time (DateTimeField): 创建时间,自动设置为记录创建时的时间 + """ emailto = models.CharField('收件人', max_length=300) title = models.CharField('邮件标题', max_length=2000) content = models.TextField('邮件内容') @@ -25,9 +60,20 @@ class EmailSendLog(models.Model): creation_time = models.DateTimeField('创建时间', auto_now_add=True) def __str__(self): + """ + 返回邮件发送日志对象的字符串表示 + + Returns: + str: 邮件的标题 + """ return self.title class Meta: + """ + 模型元数据配置 + + 配置模型在Django管理界面中的显示名称和排序规则 + """ verbose_name = '邮件发送log' verbose_name_plural = verbose_name ordering = ['-creation_time'] diff --git a/src/DjangoBlog-master/servermanager/robot.py b/src/DjangoBlog-master/servermanager/robot.py index 7b45736..25adb96 100644 --- a/src/DjangoBlog-master/servermanager/robot.py +++ b/src/DjangoBlog-master/servermanager/robot.py @@ -13,29 +13,46 @@ from servermanager.api.blogapi import BlogApi from servermanager.api.commonapi import ChatGPT, CommandHandler from .MemcacheStorage import MemcacheStorage +# 初始化微信机器人实例,配置token和启用session功能 robot = WeRoBot(token=os.environ.get('DJANGO_WEROBOT_TOKEN') or 'lylinux', enable_session=True) +# 创建Memcache存储实例用于session存储 memstorage = MemcacheStorage() +# 根据存储可用性配置机器人的session存储方式 if memstorage.is_available: robot.config['SESSION_STORAGE'] = memstorage else: + # 如果文件存储存在则删除旧文件,使用文件存储作为session存储 if os.path.exists(os.path.join(settings.BASE_DIR, 'werobot_session')): os.remove(os.path.join(settings.BASE_DIR, 'werobot_session')) robot.config['SESSION_STORAGE'] = FileStorage(filename='werobot_session') +# 初始化博客API和命令处理器实例 blogapi = BlogApi() cmd_handler = CommandHandler() logger = logging.getLogger(__name__) def convert_to_article_reply(articles, message): + """ + 将文章列表转换为微信文章回复格式 + + Args: + articles: 文章对象列表 + message: 微信消息对象 + + Returns: + ArticlesReply: 微信文章回复对象 + """ reply = ArticlesReply(message=message) from blog.templatetags.blog_tags import truncatechars_content for post in articles: + # 提取文章中的图片URL imgs = re.findall(r'(?:http\:|https\:)?\/\/.*\.(?:png|jpg)', post.body) imgurl = '' if imgs: imgurl = imgs[0] + # 创建单篇文章对象 article = Article( title=post.title, description=truncatechars_content(post.body), @@ -48,6 +65,16 @@ def convert_to_article_reply(articles, message): @robot.filter(re.compile(r"^\?.*")) def search(message, session): + """ + 处理文章搜索请求,根据关键词搜索文章并返回结果 + + Args: + message: 微信消息对象,包含搜索关键词 + session: 用户会话对象 + + Returns: + ArticlesReply或str: 搜索结果或提示信息 + """ s = message.content searchstr = str(s).replace('?', '') result = blogapi.search_articles(searchstr) @@ -61,6 +88,16 @@ def search(message, session): @robot.filter(re.compile(r'^category\s*$', re.I)) def category(message, session): + """ + 获取所有文章分类目录信息 + + Args: + message: 微信消息对象 + session: 用户会话对象 + + Returns: + str: 包含所有分类名称的字符串 + """ categorys = blogapi.get_category_lists() content = ','.join(map(lambda x: x.name, categorys)) return '所有文章分类目录:' + content @@ -68,6 +105,16 @@ def category(message, session): @robot.filter(re.compile(r'^recent\s*$', re.I)) def recents(message, session): + """ + 获取最新发布的文章列表 + + Args: + message: 微信消息对象 + session: 用户会话对象 + + Returns: + ArticlesReply或str: 最新文章列表或提示信息 + """ articles = blogapi.get_recent_articles() if articles: reply = convert_to_article_reply(articles, message) @@ -78,6 +125,16 @@ def recents(message, session): @robot.filter(re.compile('^help$', re.I)) def help(message, session): + """ + 返回系统帮助信息,包含所有可用命令说明 + + Args: + message: 微信消息对象 + session: 用户会话对象 + + Returns: + str: 帮助信息文本 + """ return '''欢迎关注! 默认会与图灵机器人聊天~~ 你可以通过下面这些命令来获得信息 @@ -100,22 +157,61 @@ def help(message, session): @robot.filter(re.compile(r'^weather\:.*$', re.I)) def weather(message, session): + """ + 处理天气查询请求(待实现) + + Args: + message: 微信消息对象 + session: 用户会话对象 + + Returns: + str: 建设中提示信息 + """ return "建设中..." @robot.filter(re.compile(r'^idcard\:.*$', re.I)) def idcard(message, session): + """ + 处理身份证信息查询请求(待实现) + + Args: + message: 微信消息对象 + session: 用户会话对象 + + Returns: + str: 建设中提示信息 + """ return "建设中..." @robot.handler def echo(message, session): + """ + 主消息处理函数,创建消息处理器并处理用户消息 + + Args: + message: 微信消息对象 + session: 用户会话对象 + + Returns: + str或其他类型: 处理结果 + """ handler = MessageHandler(message, session) return handler.handler() class MessageHandler: + """微信消息处理器类,负责处理各种用户消息和命令""" + def __init__(self, message, session): + """ + 初始化消息处理器 + + Args: + message: 微信消息对象 + session: 用户会话对象 + """ userid = message.source self.message = message self.session = session @@ -129,27 +225,51 @@ class MessageHandler: @property def is_admin(self): + """ + 判断当前用户是否为管理员 + + Returns: + bool: 是否为管理员 + """ return self.userinfo.isAdmin @property def is_password_set(self): + """ + 判断管理员密码是否已设置 + + Returns: + bool: 密码是否已设置 + """ return self.userinfo.isPasswordSet def save_session(self): + """ + 保存用户会话信息到session中 + """ info = jsonpickle.encode(self.userinfo) self.session[self.userid] = info def handler(self): + """ + 主要的消息处理逻辑,根据用户状态和输入内容进行相应处理 + + Returns: + str: 处理结果响应文本 + """ info = self.message.content + # 处理管理员退出命令 if self.userinfo.isAdmin and info.upper() == 'EXIT': self.userinfo = WxUserInfo() self.save_session() return "退出成功" + # 处理管理员登录命令 if info.upper() == 'ADMIN': self.userinfo.isAdmin = True self.save_session() return "输入管理员密码" + # 处理管理员密码验证 if self.userinfo.isAdmin and not self.userinfo.isPasswordSet: passwd = settings.WXADMIN if settings.TESTING: @@ -159,6 +279,7 @@ class MessageHandler: self.save_session() return "验证通过,请输入命令或者要执行的命令代码:输入helpme获得帮助" else: + # 处理密码错误次数限制 if self.userinfo.Count >= 3: self.userinfo = WxUserInfo() self.save_session() @@ -166,6 +287,7 @@ class MessageHandler: self.userinfo.Count += 1 self.save_session() return "验证失败,请重新输入管理员密码:" + # 处理管理员命令执行 if self.userinfo.isAdmin and self.userinfo.isPasswordSet: if self.userinfo.Command != '' and info.upper() == 'Y': return cmd_handler.run(self.userinfo.Command) @@ -176,12 +298,19 @@ class MessageHandler: self.save_session() return "确认执行: " + info + " 命令?" + # 默认使用ChatGPT处理普通消息 return ChatGPT.chat(info) class WxUserInfo(): + """微信用户信息类,存储用户的状态信息""" + def __init__(self): + """ + 初始化用户信息,默认为非管理员状态 + """ self.isAdmin = False self.isPasswordSet = False self.Count = 0 self.Command = '' + diff --git a/src/DjangoBlog-master/servermanager/tests.py b/src/DjangoBlog-master/servermanager/tests.py index 22a6689..924179e 100644 --- a/src/DjangoBlog-master/servermanager/tests.py +++ b/src/DjangoBlog-master/servermanager/tests.py @@ -12,15 +12,32 @@ from .robot import search, category, recents # Create your tests here. class ServerManagerTest(TestCase): + """ + 服务器管理模块的测试类,用于测试聊天机器人、命令处理、文章搜索等功能。 + """ + def setUp(self): + """ + 测试初始化方法,在每个测试方法执行前运行。 + 创建用于模拟HTTP请求的Client和RequestFactory实例。 + """ self.client = Client() self.factory = RequestFactory() def test_chat_gpt(self): + """ + 测试ChatGPT聊天功能。 + 验证调用ChatGPT.chat方法能否返回非空内容。 + """ content = ChatGPT.chat("你好") self.assertIsNotNone(content) def test_validate_comment(self): + """ + 测试评论验证及相关功能,包括用户登录、文章创建、命令处理和消息处理等。 + 验证搜索、分类、最近文章、命令执行和消息处理等功能是否正常运行。 + """ + # 创建超级用户并登录 user = BlogUser.objects.create_superuser( email="liangliangyy1@gmail.com", username="liangliangyy1", @@ -28,10 +45,12 @@ class ServerManagerTest(TestCase): self.client.login(username='liangliangyy1', password='liangliangyy1') + # 创建分类 c = Category() c.name = "categoryccc" c.save() + # 创建文章 article = Article() article.title = "nicetitleccc" article.body = "nicecontentccc" @@ -40,23 +59,33 @@ class ServerManagerTest(TestCase): article.type = 'a' article.status = 'p' article.save() + + # 测试搜索功能 s = TextMessage([]) s.content = "nice" rsp = search(s, None) + + # 测试分类功能 rsp = category(None, None) self.assertIsNotNone(rsp) + + # 测试最近文章功能 rsp = recents(None, None) self.assertTrue(rsp != '暂时还没有文章') + # 创建并保存命令 cmd = commands() cmd.title = "test" cmd.command = "ls" cmd.describe = "test" cmd.save() + # 测试命令处理器 cmdhandler = CommandHandler() rsp = cmdhandler.run('test') self.assertIsNotNone(rsp) + + # 测试消息处理器的各种场景 s.source = 'u' s.content = 'test' msghandler = MessageHandler(s, {}) @@ -77,3 +106,5 @@ class ServerManagerTest(TestCase): s.content = 'exit' msghandler.handler() + + diff --git a/src/DjangoBlog-master/servermanager/urls.py b/src/DjangoBlog-master/servermanager/urls.py index 8d134d2..8b04251 100644 --- a/src/DjangoBlog-master/servermanager/urls.py +++ b/src/DjangoBlog-master/servermanager/urls.py @@ -5,6 +5,7 @@ from .robot import robot app_name = "servermanager" urlpatterns = [ + # 将微信机器人接口映射到/robot路径 path(r'robot', make_view(robot)), ] diff --git a/src/oauth代码注释.py b/src/oauth代码注释.py new file mode 100644 index 0000000..d3ec8ff --- /dev/null +++ b/src/oauth代码注释.py @@ -0,0 +1,442 @@ +""" +OAuth认证模块 +提供第三方登录功能,支持微信、微博、GitHub、Google、QQ、Facebook等平台 +""" + +import logging +import json +import urllib.parse +from abc import ABC, abstractmethod +from django.urls import reverse +from django.shortcuts import get_object_or_404, render +from django.http import HttpResponseRedirect, HttpResponseForbidden +from django.contrib import auth +from django.utils.translation import gettext_lazy as _ +from .models import OAuthUser, OAuthConfig + +# 获取日志器 +logger = logging.getLogger(__name__) + + +class BaseOauthManager(ABC): + """OAuth认证管理器基类""" + + # 授权URL和API端点 + AUTH_URL = "" + TOKEN_URL = "" + OPEN_ID_URL = "" + USER_INFO_URL = "" + ICON_NAME = "" # 平台图标名称 + + def __init__(self): + """初始化OAuth管理器""" + self.access_token = None + self.openid = None + self.client_id = None + self.client_secret = None + self.callback_url = None + + @abstractmethod + def get_authorization_url(self, next_url='/'): + """获取授权URL""" + pass + + @abstractmethod + def get_access_token_by_code(self, code): + """通过授权码获取访问令牌""" + pass + + @abstractmethod + def get_oauth_userinfo(self): + """获取用户信息""" + pass + + def get_picture(self, metadata): + """获取用户头像(可选实现)""" + return "" + + def do_get(self, url, params, headers=None): + """ + 执行GET请求 + + Args: + url: 请求URL + params: 请求参数 + headers: 请求头 + + Returns: + str: 响应内容 + """ + try: + response = requests.get(url=url, params=params, headers=headers) + logger.info(f"GET Response: {response.text}") + return response.text + except Exception as e: + logger.error(f"GET request failed: {e}") + raise + + def do_post(self, url, params, headers=None): + """ + 执行POST请求 + + Args: + url: 请求URL + params: 请求参数 + headers: 请求头 + + Returns: + str: 响应内容 + """ + try: + response = requests.post(url, data=params, headers=headers) + logger.info(f"POST Response: {response.text}") + return response.text + except Exception as e: + logger.error(f"POST request failed: {e}") + raise + + @property + def is_access_token_set(self): + """检查访问令牌是否已设置""" + return self.access_token is not None + + def get_config(self): + """获取OAuth配置""" + try: + config = OAuthConfig.objects.filter( + type=self.ICON_NAME.lower(), + is_enable=True + ).first() + if config: + self.client_id = config.appkey + self.client_secret = config.appsecret + self.callback_url = config.callback_url + return config + except Exception as e: + logger.error(f"Get OAuth config failed: {e}") + return None + + +class WeiboOauthManager(BaseOauthManager): + """微博OAuth认证管理器""" + + AUTH_URL = "https://api.weibo.com/oauth2/authorize" + TOKEN_URL = "https://api.weibo.com/oauth2/access_token" + USER_INFO_URL = "https://api.weibo.com/2/users/show.json" + ICON_NAME = "weibo" + + def get_authorization_url(self, next_url='/'): + """ + 获取微博授权URL + + Args: + next_url: 授权成功后跳转的URL + + Returns: + str: 完整的授权URL + """ + if not self.get_config(): + return "" + + params = { + 'client_id': self.client_id, + 'response_type': 'code', + 'redirect_uri': self.callback_url + '&next_url=' + next_url, + } + url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) + return url + + def get_access_token_by_code(self, code): + """ + 通过授权码获取访问令牌 + + Args: + code: 授权码 + + Returns: + tuple: (access_token, 响应数据) + """ + params = { + 'grant_type': 'authorization_code', + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'code': code, + 'redirect_uri': self.callback_url + } + + try: + response_text = self.do_post(self.TOKEN_URL, params) + token_data = json.loads(response_text) + + if 'access_token' in token_data: + self.access_token = token_data['access_token'] + return self.access_token, token_data + else: + logger.error(f"Failed to get access token: {token_data}") + raise OAuthAccessTokenException(token_data) + + except json.JSONDecodeError as e: + logger.error(f"JSON decode error: {e}") + raise OAuthAccessTokenException("Invalid response format") + except Exception as e: + logger.error(f"Get access token failed: {e}") + raise OAuthAccessTokenException(str(e)) + + def get_oauth_userinfo(self): + """ + 获取微博用户信息 + + Returns: + OAuthUser: 用户信息对象 + """ + if not self.is_access_token_set: + raise ValueError("Access token not set") + + params = {'access_token': self.access_token} + response_text = self.do_get(self.USER_INFO_URL, params) + user_data = json.loads(response_text) + + # 创建OAuth用户对象 + oauth_user = OAuthUser() + oauth_user.nickname = user_data.get('screen_name', '') + oauth_user.picture = user_data.get('profile_image_url', '') + oauth_user.token = self.access_token + oauth_user.type = 'weibo' + oauth_user.email = user_data.get('email', '') + oauth_user.metadata = json.dumps(user_data) + + return oauth_user + + +class GitHubOauthManager(BaseOauthManager): + """GitHub OAuth认证管理器""" + + AUTH_URL = "https://github.com/login/oauth/authorize" + TOKEN_URL = "https://github.com/login/oauth/access_token" + USER_INFO_URL = "https://api.github.com/user" + ICON_NAME = "github" + + def get_authorization_url(self, next_url='/'): + """获取GitHub授权URL""" + if not self.get_config(): + return "" + + params = { + 'client_id': self.client_id, + 'scope': 'user:email', + 'redirect_uri': self.callback_url + '&next_url=' + next_url, + } + url = self.AUTH_URL + "?" + urllib.parse.urlencode(params) + return url + + def get_access_token_by_code(self, code): + """通过授权码获取GitHub访问令牌""" + params = { + 'client_id': self.client_id, + 'client_secret': self.client_secret, + 'code': code, + } + headers = {'Accept': 'application/json'} + + response_text = self.do_post(self.TOKEN_URL, params, headers) + token_data = json.loads(response_text) + + if 'access_token' in token_data: + self.access_token = token_data['access_token'] + return self.access_token, token_data + else: + raise OAuthAccessTokenException(token_data) + + def get_oauth_userinfo(self): + """获取GitHub用户信息""" + if not self.is_access_token_set: + raise ValueError("Access token not set") + + headers = {'Authorization': f'token {self.access_token}'} + response_text = self.do_get(self.USER_INFO_URL, headers=headers) + user_data = json.loads(response_text) + + oauth_user = OAuthUser() + oauth_user.nickname = user_data.get('login', '') + oauth_user.picture = user_data.get('avatar_url', '') + oauth_user.token = self.access_token + oauth_user.type = 'github' + oauth_user.email = user_data.get('email', '') + oauth_user.metadata = json.dumps(user_data) + + return oauth_user + + +# OAuth异常类 +class OAuthException(Exception): + """OAuth异常基类""" + pass + + +class OAuthAccessTokenException(OAuthException): + """访问令牌获取异常""" + pass + + +class OAuthUserInfoException(OAuthException): + """用户信息获取异常""" + pass + + +# 工具函数 +def get_oauth_apps(): + """ + 获取所有可用的OAuth应用 + + Returns: + list: OAuth管理器实例列表 + """ + config_types = OAuthConfig.objects.filter( + is_enable=True + ).values_list('type', flat=True) + + applications = BaseOauthManager.__subclasses__() + apps = [ + app() for app in applications + if app().ICON_NAME.lower() in config_types + ] + return apps + + +def get_manager_by_type(oauth_type): + """ + 根据类型获取OAuth管理器 + + Args: + oauth_type: OAuth类型(weibo、github等) + + Returns: + BaseOauthManager: OAuth管理器实例 + """ + applications = get_oauth_apps() + if applications: + finds = list(filter( + lambda x: x.ICON_NAME.lower() == oauth_type.lower(), + applications + )) + return finds[0] if finds else None + return None + + +# 视图函数 +def oauth_login(request): + """ + OAuth登录入口 + + Args: + request: HTTP请求对象 + + Returns: + HttpResponseRedirect: 重定向到授权页面或首页 + """ + oauth_type = request.GET.get('type') # 修复:避免使用内置函数名 + if not oauth_type: + return HttpResponseRedirect('/') + + manager = get_manager_by_type(oauth_type) + if not manager: + return HttpResponseRedirect('/') + + next_url = request.GET.get('next_url', '/') + authorize_url = manager.get_authorization_url(next_url) + return HttpResponseRedirect(authorize_url) + + +def oauth_authorize(request): + """ + OAuth授权回调处理 + + Args: + request: HTTP请求对象 + + Returns: + HttpResponse: 授权结果页面或重定向 + """ + oauth_type = request.GET.get('type') # 修复:避免使用内置函数名 + code = request.GET.get('code') + next_url = request.GET.get('next_url', '/') + + if not oauth_type or not code: + return HttpResponseRedirect('/') + + try: + manager = get_manager_by_type(oauth_type) + if not manager: + return HttpResponseRedirect('/') + + # 获取访问令牌 + manager.get_access_token_by_code(code) + + # 获取用户信息 + oauth_user = manager.get_oauth_userinfo() + + # 处理用户登录或绑定 + user = process_oauth_user(oauth_user, request) + if user: + auth.login(request, user) + return HttpResponseRedirect(next_url) + else: + # 转到绑定页面 + return redirect_to_bind_page(oauth_user, request) + + except OAuthException as e: + logger.error(f"OAuth authorization failed: {e}") + return render(request, 'oauth/error.html', { + 'error_message': _('OAuth authentication failed') + }) + + +def process_oauth_user(oauth_user, request): + """ + 处理OAuth用户信息 + + Args: + oauth_user: OAuth用户对象 + request: HTTP请求对象 + + Returns: + User: 认证用户对象或None + """ + try: + # 查找已存在的OAuth用户 + existing_oauth_user = OAuthUser.objects.filter( + type=oauth_user.type, + openid=oauth_user.openid + ).first() + + if existing_oauth_user: + # 已绑定用户,直接登录 + if existing_oauth_user.author: + return existing_oauth_user.author + else: + # 未绑定用户,转到绑定页面 + return None + else: + # 新用户,保存OAuth信息并转到绑定页面 + oauth_user.save() + return None + + except Exception as e: + logger.error(f"Process OAuth user failed: {e}") + return None + + +def redirect_to_bind_page(oauth_user, request): + """ + 重定向到用户绑定页面 + + Args: + oauth_user: OAuth用户对象 + request: HTTP请求对象 + + Returns: + HttpResponseRedirect: 重定向到绑定页面 + """ + # 生成绑定URL + bind_url = reverse('oauth:bind') + f'?oauth_id={oauth_user.id}' + return HttpResponseRedirect(bind_url) \ No newline at end of file