代码注释

zjj_branch
周俊杰 2 months ago
parent 4ae5ff5fcb
commit b334590d1f

@ -1,64 +1,81 @@
# fix_pet_blog.py
# 引入操作系统接口模块,用于设置环境变量
import os
# 引入 Django 模块,用于初始化 Django 环境
import django
# 设置 Django 的 settings 模块为 'djangoblog.settings'
# 这一步是必须的,以便 Django 知道使用哪个配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoblog.settings')
# 初始化 Django 环境,加载所有应用和配置
django.setup()
# 从 blog 应用的 models 模块中导入 Article文章、Category分类、Tag标签模型
from blog.models import Article, Category, Tag
# 从 accounts 应用的 models 模块中导入 BlogUser博客用户模型
from accounts.models import BlogUser
# 打印脚本开始信息
print("=== 修复宠物博客数据 ===")
# 获取用户
# 获取第一个用户对象,假设这是博客的管理员或主要用户
# 如果 BlogUser 表中没有任何用户,这个查询将返回 None
user = BlogUser.objects.first()
# 检查是否找到了用户
if not user:
# 如果没有找到任何用户,打印错误信息并退出脚本
print("错误:没有找到用户")
exit()
# 为每个分类创建至少一篇文章
# 定义一个包含多篇文章数据的列表
# 每个元素是一个字典,包含文章的标题、内容、分类名称和标签列表
articles_data = [
# 狗狗日常
# 狗狗日常分类下的文章
{
'title': '我家狗狗的表演',
'body': '欲擒故纵,你们见过吗。就是要出去的时候,故意离你远远的,让人你没法给它套上项圈,其实它很想出去。',
'category': '狗狗日常',
'tags': ['图文', '狗狗社交', '遛狗']
'title': '我家狗狗的表演', # 文章标题
'body': '欲擒故纵,你们见过吗。就是要出去的时候,故意离你远远的,让人你没法给它套上项圈,其实它很想出去。', # 文章内容
'category': '狗狗日常', # 分类名称
'tags': ['图文', '狗狗社交', '遛狗'] # 标签列表
},
# 猫咪生活
# 猫咪生活分类下的文章
{
'title': '猫咪的日常护理',
'body': '定期为猫咪梳理毛发,保持清洁,注意观察猫咪的健康状况。',
'category': '猫咪生活',
'tags': ['宠物美容', '宠物健康']
},
# 宠物健康
# 宠物健康分类下的文章
{
'title': '宠物健康检查指南',
'body': '定期带宠物进行健康检查,注意疫苗接种和驱虫的重要性。',
'category': '宠物健康',
'tags': ['宠物医疗', '宠物健康']
},
# 训练技巧
# 训练技巧分类下的文章
{
'title': '如何训练狗狗坐下',
'body': '使用零食诱导,当狗狗完成动作时及时奖励,重复训练。',
'category': '训练技巧',
'tags': ['训练方法', '图文']
},
# 宠物用品
# 宠物用品分类下的文章
{
'title': '推荐几款好用的宠物玩具',
'body': '这些玩具既安全又有趣,能让宠物保持活跃和快乐。',
'category': '宠物用品',
'tags': ['宠物玩具', '宠物用品']
},
# 额外文章确保内容丰富
# 额外文章确保狗狗日常分类有更多内容
{
'title': '带狗狗散步的注意事项',
'body': '选择合适的牵引绳,注意天气和路况,确保狗狗的安全。',
'category': '狗狗日常',
'tags': ['遛狗', '狗狗社交']
},
# 额外文章:确保宠物健康分类有更多内容
{
'title': '猫咪饮食健康指南',
'body': '了解猫咪的营养需求,选择合适的猫粮和零食。',
@ -67,30 +84,50 @@ articles_data = [
}
]
# 删除现有文章,重新创建
# 删除数据库中所有的现有文章
# 注意:这将永久删除所有文章,谨慎操作!
Article.objects.all().delete()
print("已清理现有文章")
# 创建文章
# 遍历 articles_data 列表中的每一篇文章数据,逐个创建文章
for data in articles_data:
try:
# 根据分类名称从 Category 模型中获取对应的分类对象
# 如果找不到对应的分类,这里会抛出 Category.DoesNotExist 异常
category = Category.objects.get(name=data['category'])
# 创建一个新的 Article 对象,并保存到数据库中
article = Article.objects.create(
title=data['title'],
body=data['body'],
author=user,
category=category,
status='p'
title=data['title'], # 设置文章标题
body=data['body'], # 设置文章内容
author=user, # 设置文章作者为之前获取的用户
category=category, # 设置文章分类
status='p' # 设置文章状态为 'p'(通常代表已发布)
)
# 遍历当前文章数据中的每一个标签名称
for tag_name in data['tags']:
# 根据标签名称获取或创建一个 Tag 对象
# 如果标签不存在,则创建一个新的标签
tag, _ = Tag.objects.get_or_create(name=tag_name)
# 将该标签添加到文章的标签集合中
article.tags.add(tag)
# 打印成功创建文章的信息,包括文章标题和所属分类
print(f'创建文章: {data["title"]} (分类: {data["category"]})')
except Exception as e:
# 如果在创建文章的过程中发生任何异常,打印错误信息,包括文章标题和异常详情
print(f'创建文章失败 {data["title"]}: {e}')
# 打印脚本完成信息
print("=== 修复完成 ===")
# 打印当前数据库中所有文章的总数
print(f"总文章数: {Article.objects.count()}")
# 遍历所有分类,打印每个分类的名称及其下的文章数量
for category in Category.objects.all():
count = Article.objects.filter(category=category).count()
print(f"分类 '{category.name}': {count} 篇文章")

@ -1,22 +1,58 @@
#!/usr/bin/env python
"""
此脚本是 Django 项目的命令行管理入口通常命名为 manage.py
它允许您通过命令行执行各种 Django 管理任务如运行开发服务器执行数据库迁移启动交互式 Shell
使用方法
python manage.py <command> [options]
常用命令示例
python manage.py runserver # 启动开发服务器
python manage.py migrate # 执行数据库迁移
python manage.py createsuperuser # 创建超级用户
python manage.py shell # 启动 Django Shell
"""
# 引入 Python 的标准库模块 os用于与操作系统交互如设置环境变量
import os
# 引入 Python 的标准库模块 sys用于访问与 Python 解释器紧密相关的变量和函数,如命令行参数
import sys
# __name__ 是当前模块的名称当此脚本作为主程序运行时__name__ 的值为 '__main__'
if __name__ == "__main__":
"""
设置 Django settings 模块环境变量
Django 需要知道使用哪个设置模块来加载项目的配置
通常这个设置模块的路径是 '项目名称.settings'例如 'djangoblog.settings'
"""
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djangoblog.settings")
try:
"""
尝试从 django.core.management 模块中导入 execute_from_command_line 函数
该函数负责解析命令行参数并调用相应的 Django 管理命令
"""
from django.core.management import execute_from_command_line
except ImportError:
# The above import may fail for some other reason. Ensure that the
# issue is really that Django is missing to avoid masking other
# exceptions on Python 2.
# 如果导入 django.core.management 失败,可能是由于 Django 未安装或不在 Python 路径中。
# 为了更准确地诊断问题,首先尝试导入 django 模块本身。
try:
import django
except ImportError:
# 如果连 django 模块都无法导入,说明 Django 未正确安装或不在 PYTHONPATH 中。
# 抛出一个明确的 ImportError提示用户检查 Django 是否安装以及虚拟环境是否激活。
raise ImportError(
"Couldn't import Django. Are you sure it's installed and "
"available on your PYTHONPATH environment variable? Did you "
"forget to activate a virtual environment?"
"无法导入 Django。请确保 Django 已正确安装并且 "
"在您的 PYTHONPATH 环境变量中可用。您是否忘记激活虚拟环境?"
)
raise
execute_from_command_line(sys.argv)
else:
# 如果 django 模块可以导入,但 django.core.management 无法导入,
# 这通常意味着 Django 安装不完整或存在其他问题。
# 重新抛出之前的 ImportError以便用户了解问题所在。
raise
# 如果成功导入了 execute_from_command_line 函数,
# 则调用该函数并传入命令行参数 sys.argv。
# sys.argv 是一个包含命令行参数的列表,其中 sys.argv[0] 是脚本名称,
# sys.argv[1:] 是传递给脚本的参数。
execute_from_command_line(sys.argv)

@ -1,37 +1,43 @@
# ============================
# 插件1文章结尾版权声明插件
# ============================
from djangoblog.plugin_manage.base_plugin import BasePlugin
from djangoblog.plugin_manage import hooks
from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
class ArticleCopyrightPlugin(BasePlugin):
"""
功能在文章正文末尾添加版权声明标明文章作者提醒转载需注明出处
"""
PLUGIN_NAME = '文章结尾版权声明'
PLUGIN_DESCRIPTION = '一个在文章正文末尾添加版权声明的插件。'
PLUGIN_VERSION = '0.2.0'
PLUGIN_AUTHOR = 'liangliangyy'
# 2. 实现 register_hooks 方法,专门用于注册钩子
def register_hooks(self):
# 在这里将插件的方法注册到指定的钩子上
# 将本插件的版权添加方法注册到文章内容钩子上
hooks.register(ARTICLE_CONTENT_HOOK_NAME, self.add_copyright_to_content)
def add_copyright_to_content(self, content, *args, **kwargs):
"""
这个方法会被注册到 'the_content' 过滤器钩子上
它接收原始内容并返回添加了版权信息的新内容
给文章内容追加版权声明信息
:param content: 原始文章内容
:param kwargs: 可能包含 article文章对象is_summary是否摘要模式
:return: 添加了版权声明的新内容
"""
article = kwargs.get('article')
if not article:
return content
# 如果是摘要模式(首页),不添加版权声明
is_summary = kwargs.get('is_summary', False)
return content # 没有文章对象,直接返回原文
is_summary = kwargs.get('is_summary', False) # 是否为摘要(如首页列表页)
if is_summary:
return content
return content # 摘要模式下不显示版权信息
# 拼接版权声明 HTML
copyright_info = f"\n<hr><p>本文由 {article.author.username} 原创,转载请注明出处。</p>"
return content + copyright_info
# 3. 实例化插件。
# 这会自动调用 BasePlugin.__init__然后 BasePlugin.__init__ 会调用我们上面定义的 register_hooks 方法。
plugin = ArticleCopyrightPlugin()
# 实例化插件,自动调用 register_hooks 方法
plugin = ArticleCopyrightPlugin()

@ -1,3 +1,6 @@
# ============================
# 插件2外部链接处理器插件
# ============================
import re
from urllib.parse import urlparse
from djangoblog.plugin_manage.base_plugin import BasePlugin
@ -6,43 +9,47 @@ from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
class ExternalLinksPlugin(BasePlugin):
"""
功能自动为文章中的外部链接添加 target="_blank" rel="noopener noreferrer"
提高安全性防止标签页劫持
"""
PLUGIN_NAME = '外部链接处理器'
PLUGIN_DESCRIPTION = '自动为文章中的外部链接添加 target="_blank" 和 rel="noopener noreferrer" 属性。'
PLUGIN_VERSION = '0.1.0'
PLUGIN_AUTHOR = 'liangliangyy'
def register_hooks(self):
# 注册处理函数到文章内容钩子
hooks.register(ARTICLE_CONTENT_HOOK_NAME, self.process_external_links)
def process_external_links(self, content, *args, **kwargs):
"""
查找并处理文章中的所有 <a> 标签为外部链接添加安全属性
"""
from djangoblog.utils import get_current_site
site_domain = get_current_site().domain
site_domain = get_current_site().domain # 当前网站域名
# 正则表达式查找所有 <a> 标签
# 正则匹配 <a href="...">...</a>
link_pattern = re.compile(r'(<a\s+(?:[^>]*?\s+)?href=")([^"]*)(".*?/a>)', re.IGNORECASE)
def replacer(match):
# match.group(1) 是 <a ... href="
# match.group(2) 是链接 URL
# match.group(3) 是 ">...</a>
href = match.group(2)
prefix = match.group(1) # <a ... href="
href = match.group(2) # 链接地址
suffix = match.group(3) # ">...</a>
# 如果链接已经有 target 属性,不处理
# 如果已经有 target 属性,不处理
if 'target=' in match.group(0).lower():
return match.group(0)
# 解析链接
parsed_url = urlparse(href)
# 如果链接是外部的 (有域名且域名不等于当前网站域名)
# 判断是否为外部链接(有域名且非本站)
if parsed_url.netloc and parsed_url.netloc != site_domain:
# 添加 target 和 rel 属性
return f'{match.group(1)}{href}" target="_blank" rel="noopener noreferrer"{match.group(3)}'
# 否则返回原样
return match.group(0)
# 添加安全属性
return f'{prefix}{href}" target="_blank" rel="noopener noreferrer"{suffix}'
return match.group(0) # 内部链接,不处理
# 替换所有符合条件的链接
return link_pattern.sub(replacer, content)
plugin = ExternalLinksPlugin()
plugin = ExternalLinksPlugin()

@ -1,3 +1,6 @@
# ============================
# 插件3图片性能优化插件
# ============================
import re
import hashlib
from urllib.parse import urlparse
@ -7,170 +10,130 @@ from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
class ImageOptimizationPlugin(BasePlugin):
"""
功能为文章中的 标签添加懒加载异步解码响应式alt等属性
优化页面加载性能和用户体验
"""
PLUGIN_NAME = '图片性能优化插件'
PLUGIN_DESCRIPTION = '自动为文章中的图片添加懒加载、异步解码等性能优化属性,显著提升页面加载速度。'
PLUGIN_VERSION = '1.0.0'
PLUGIN_AUTHOR = 'liangliangyy'
def __init__(self):
# 插件配置
# 插件配置参数
self.config = {
'enable_lazy_loading': True, # 启用懒加载
'enable_async_decoding': True, # 启用异步解码
'add_loading_placeholder': True, # 添加加载占位符
'optimize_external_images': True, # 优化外部图片
'add_responsive_attributes': True, # 添加响应式属性
'skip_first_image': True, # 跳过第一张图片LCP优化
'enable_lazy_loading': True, # 是否启用懒加载
'enable_async_decoding': True, # 是否启用异步解码
'add_loading_placeholder': True, # 是否添加加载样式
'optimize_external_images': True, # 是否优化外部图片
'add_responsive_attributes': True, # 是否添加响应式属性
'skip_first_image': True, # 是否跳过第一张图片(优化LCP
}
super().__init__()
def register_hooks(self):
# 注册图片优化函数到文章内容钩子
hooks.register(ARTICLE_CONTENT_HOOK_NAME, self.optimize_images)
def optimize_images(self, content, *args, **kwargs):
"""
优化文章中的图片标签
"""
if not content:
return content
# 正则表达式匹配 img 标签
img_pattern = re.compile(
r'<img\s+([^>]*?)(?:\s*/)?>',
re.IGNORECASE | re.DOTALL
)
# 匹配所有 标签
img_pattern = re.compile(r']*?)(?:\s*/)?>', re.IGNORECASE | re.DOTALL)
image_count = 0
def replace_img_tag(match):
nonlocal image_count
image_count += 1
# 获取原始属性
original_attrs = match.group(1)
# 解析现有属性
attrs = self._parse_img_attributes(original_attrs)
# 应用优化
optimized_attrs = self._apply_optimizations(attrs, image_count)
# 重构 img 标签
return self._build_img_tag(optimized_attrs)
# 替换所有 img 标签
optimized_content = img_pattern.sub(replace_img_tag, content)
return optimized_content
def _parse_img_attributes(self, attr_string):
"""
解析 img 标签的属性
"""
# 解析如 src="x" alt="y" 这类属性为字典
attrs = {}
# 正则表达式匹配属性
attr_pattern = re.compile(r'(\w+)=(["\'])(.*?)\2')
for match in attr_pattern.finditer(attr_string):
attr_name = match.group(1).lower()
attr_value = match.group(3)
attrs[attr_name] = attr_value
key = match.group(1).lower()
val = match.group(3)
attrs[key] = val
return attrs
def _apply_optimizations(self, attrs, image_index):
"""
应用各种图片优化
"""
# 1. 懒加载优化跳过第一张图片以优化LCP
# 懒加载:跳过第一张图片
if self.config['enable_lazy_loading']:
if not (self.config['skip_first_image'] and image_index == 1):
if 'loading' not in attrs:
attrs['loading'] = 'lazy'
# 2. 异步解码
# 异步解码
if self.config['enable_async_decoding']:
if 'decoding' not in attrs:
attrs['decoding'] = 'async'
# 3. 添加样式优化
# 样式优化:限制最大宽度
current_style = attrs.get('style', '')
# 确保图片不会超出容器
if 'max-width' not in current_style:
if current_style and not current_style.endswith(';'):
current_style += ';'
current_style += 'max-width:100%;height:auto;'
attrs['style'] = current_style
# 4. 添加 alt 属性SEO和可访问性
# Alt 属性提升可访问性和SEO
if 'alt' not in attrs:
# 尝试从图片URL生成有意义的alt文本
src = attrs.get('src', '')
if src:
# 从文件名生成alt文本
filename = src.split('/')[-1].split('.')[0]
# 移除常见的无意义字符
clean_name = re.sub(r'[0-9a-f]{8,}', '', filename) # 移除长hash
clean_name = re.sub(r'[0-9a-f]{8,}', '', filename)
clean_name = re.sub(r'[_-]+', ' ', clean_name).strip()
attrs['alt'] = clean_name if clean_name else '文章图片'
else:
attrs['alt'] = '文章图片'
# 5. 外部图片优化
# 外部图片添加referrer和跨域属性
if self.config['optimize_external_images'] and 'src' in attrs:
src = attrs['src']
parsed_url = urlparse(src)
# 如果是外部图片,添加 referrerpolicy
if parsed_url.netloc and parsed_url.netloc != self._get_current_domain():
attrs['referrerpolicy'] = 'no-referrer-when-downgrade'
# 为外部图片添加crossorigin属性以支持性能监控
if 'crossorigin' not in attrs:
attrs['crossorigin'] = 'anonymous'
# 6. 响应式图片属性(如果配置启用)
# 响应式属性
if self.config['add_responsive_attributes']:
# 添加 sizes 属性(如果没有的话)
if 'sizes' not in attrs and 'srcset' not in attrs:
attrs['sizes'] = '(max-width: 768px) 100vw, (max-width: 1024px) 50vw, 33vw'
# 7. 添加图片唯一标识符用于性能追踪
# 图片唯一ID用于性能追踪
if 'data-img-id' not in attrs and 'src' in attrs:
img_hash = hashlib.md5(attrs['src'].encode()).hexdigest()[:8]
attrs['data-img-id'] = f'img-{img_hash}'
# 8. 为第一张图片添加高优先级提示LCP优化
# 第一张图片优化:提高加载优先级
if image_index == 1 and self.config['skip_first_image']:
attrs['fetchpriority'] = 'high'
# 移除懒加载以确保快速加载
if 'loading' in attrs:
del attrs['loading']
return attrs
def _build_img_tag(self, attrs):
"""
重新构建 img 标签
"""
# 重新构建优化后的 标签
attr_strings = []
# 确保 src 属性在最前面
if 'src' in attrs:
attr_strings.append(f'src="{attrs["src"]}"')
# 添加其他属性
for key, value in attrs.items():
if key != 'src': # src 已经添加过了
if key != 'src':
attr_strings.append(f'{key}="{value}"')
return f'<img {" ".join(attr_strings)}>'
return f''
def _get_current_domain(self):
"""
获取当前网站域名
"""
# 获取当前站点域名
try:
from djangoblog.utils import get_current_site
return get_current_site().domain
@ -178,5 +141,4 @@ class ImageOptimizationPlugin(BasePlugin):
return ''
# 实例化插件
plugin = ImageOptimizationPlugin()
plugin = ImageOptimizationPlugin()

@ -1,3 +1,6 @@
# ============================
# 插件4阅读时间预测插件
# ============================
import math
import re
from djangoblog.plugin_manage.base_plugin import BasePlugin
@ -6,46 +9,50 @@ from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
class ReadingTimePlugin(BasePlugin):
"""
功能根据文章内容的字数估算阅读时间并在文章开头显示预计阅读时间
提升用户对内容长度的预期仅对文章详情页生效
"""
PLUGIN_NAME = '阅读时间预测'
PLUGIN_DESCRIPTION = '估算文章阅读时间并显示在文章开头。'
PLUGIN_VERSION = '0.1.0'
PLUGIN_AUTHOR = 'liangliangyy'
def register_hooks(self):
# 注册到文章内容钩子,在渲染文章内容时调用
hooks.register(ARTICLE_CONTENT_HOOK_NAME, self.add_reading_time)
def add_reading_time(self, content, *args, **kwargs):
"""
计算阅读时间并添加到内容开头
只在文章详情页显示首页文章列表页不显示
计算阅读时间并插入到文章内容最前面仅非摘要模式如非首页
:param content: 原始文章内容
:param kwargs: 可能包含 is_summary是否为摘要模式如首页
:return: 添加了阅读时间提示的文章内容
"""
# 检查是否为摘要模式(首页/文章列表页)
# 通过kwargs中的is_summary参数判断
is_summary = kwargs.get('is_summary', False)
if is_summary:
# 如果是摘要模式(首页),直接返回原内容,不添加阅读时间
# 如果是摘要模式(如首页文章列表),不显示阅读时间
return content
# 移除HTML标签和空白字符以获得纯文本
# 去掉所有 HTML 标签,只保留纯文本内容
clean_content = re.sub(r'<[^>]*>', '', content)
clean_content = clean_content.strip()
# 中文和英文单词混合计数的一个简单方法
# 匹配中文字符或连续的非中文字符(视为单词)
# 匹配中文字符或连续的英文字符/数字(简单模拟单词统计)
words = re.findall(r'[\u4e00-\u9fa5]|\w+', clean_content)
word_count = len(words)
# 按平均每分钟200字的速度计算
# 按每分钟 200 字计算阅读时间
reading_speed = 200
reading_minutes = math.ceil(word_count / reading_speed)
# 如果阅读时间少于1分钟则显示为1分钟
# 最少显示 1 分钟
if reading_minutes < 1:
reading_minutes = 1
# 拼接阅读时间提示 HTML
reading_time_html = f'<p style="color: #888;"><em>预计阅读时间:{reading_minutes} 分钟</em></p>'
return reading_time_html + content
plugin = ReadingTimePlugin()
plugin = ReadingTimePlugin()

@ -1,3 +1,6 @@
# ============================
# 插件5SEO 优化器插件
# ============================
import json
from django.utils.html import strip_tags
from django.template.defaultfilters import truncatewords
@ -8,12 +11,17 @@ from djangoblog.utils import get_blog_setting
class SeoOptimizerPlugin(BasePlugin):
"""
功能为文章详情页分类页等动态生成 SEO 相关的 meta 标签与 JSON-LD 结构化数据
优化搜索引擎收录效果和展示内容
"""
PLUGIN_NAME = 'SEO 优化器'
PLUGIN_DESCRIPTION = '为文章、页面等提供 SEO 优化,动态生成 meta 标签和 JSON-LD 结构化数据。'
PLUGIN_VERSION = '0.2.0'
PLUGIN_AUTHOR = 'liuangliangyy'
def register_hooks(self):
# 注册到 head_meta 钩子,一般用于向 <head> 中插入 SEO 相关标签
hooks.register('head_meta', self.dispatch_seo_generation)
def _get_article_seo_data(self, context, request, blog_setting):
@ -21,9 +29,11 @@ class SeoOptimizerPlugin(BasePlugin):
if not isinstance(article, Article):
return None
# 构造文章描述和关键词
description = strip_tags(article.body)[:150]
keywords = ",".join([tag.name for tag in article.tags.all()]) or blog_setting.site_keywords
# OpenGraph 和基础 Meta 标签
meta_tags = f'''
<meta property="og:type" content="article"/>
<meta property="og:title" content="{article.title}"/>
@ -38,6 +48,7 @@ class SeoOptimizerPlugin(BasePlugin):
meta_tags += f'<meta property="article:tag" content="{tag.name}"/>'
meta_tags += f'<meta property="og:site_name" content="{blog_setting.site_name}"/>'
# JSON-LD 结构化数据Schema.org
structured_data = {
"@context": "https://schema.org",
"@type": "Article",
@ -65,7 +76,6 @@ class SeoOptimizerPlugin(BasePlugin):
category_name = context.get('tag_name')
if not category_name:
return None
category = Category.objects.filter(name=category_name).first()
if not category:
return None
@ -74,10 +84,11 @@ class SeoOptimizerPlugin(BasePlugin):
description = strip_tags(category.name) or blog_setting.site_description
keywords = category.name
# BreadcrumbList structured data for category page
breadcrumb_items = [{"@type": "ListItem", "position": 1, "name": "首页", "item": request.build_absolute_uri('/')}]
breadcrumb_items.append({"@type": "ListItem", "position": 2, "name": category.name, "item": request.build_absolute_uri()})
# Breadcrumb 结构化数据
breadcrumb_items = [
{"@type": "ListItem", "position": 1, "name": "首页", "item": request.build_absolute_uri('/')},
{"@type": "ListItem", "position": 2, "name": category.name, "item": request.build_absolute_uri()}
]
structured_data = {
"@context": "https://schema.org",
"@type": "BreadcrumbList",
@ -93,7 +104,7 @@ class SeoOptimizerPlugin(BasePlugin):
}
def _get_default_seo_data(self, context, request, blog_setting):
# Homepage and other default pages
# 默认数据,例如首页
structured_data = {
"@context": "https://schema.org",
"@type": "WebSite",
@ -121,18 +132,20 @@ class SeoOptimizerPlugin(BasePlugin):
view_name = request.resolver_match.view_name
blog_setting = get_blog_setting()
seo_data = None
if view_name == 'blog:detailbyid':
seo_data = self._get_article_seo_data(context, request, blog_setting)
elif view_name == 'blog:category_detail':
seo_data = self._get_category_seo_data(context, request, blog_setting)
if not seo_data:
seo_data = self._get_default_seo_data(context, request, blog_setting)
seo_data = self._get_default_seo_data(context, request, blog_setting)
# 构建 JSON-LD 脚本标签
json_ld_script = f'<script type="application/ld+json">{json.dumps(seo_data.get("json_ld", {}), ensure_ascii=False, indent=4)}</script>'
# 拼接所有 SEO 相关内容
seo_html = f"""
<title>{seo_data.get("title", "")}</title>
<meta name="description" content="{seo_data.get("description", "")}">
@ -140,8 +153,7 @@ class SeoOptimizerPlugin(BasePlugin):
{seo_data.get("meta_tags", "")}
{json_ld_script}
"""
# 将SEO内容追加到现有的metas内容上
return metas + seo_html
plugin = SeoOptimizerPlugin()
plugin = SeoOptimizerPlugin()

@ -1,18 +1,26 @@
# ============================
# 插件6文章浏览次数统计插件
# ============================
from djangoblog.plugin_manage.base_plugin import BasePlugin
from djangoblog.plugin_manage import hooks
class ViewCountPlugin(BasePlugin):
"""
功能在每次获取文章内容时统计该文章的浏览次数用于分析文章热度
"""
PLUGIN_NAME = '文章浏览次数统计'
PLUGIN_DESCRIPTION = '统计文章的浏览次数'
PLUGIN_VERSION = '0.1.0'
PLUGIN_AUTHOR = 'liangliangyy'
def register_hooks(self):
# 注册到 after_article_body_get 钩子,通常在文章内容加载后触发
hooks.register('after_article_body_get', self.record_view)
def record_view(self, article, *args, **kwargs):
# 调用 article 对象的 viewed() 方法来增加浏览量(需模型方法支持)
article.viewed()
plugin = ViewCountPlugin()
plugin = ViewCountPlugin()

@ -1,32 +1,35 @@
from werobot.session import SessionStorage
from werobot.utils import json_loads, json_dumps
from djangoblog.utils import cache
from djangoblog.utils import cache # 假设这是一个封装了 Django 缓存的工具模块
class MemcacheStorage(SessionStorage):
def __init__(self, prefix='ws_'):
self.prefix = prefix
self.cache = cache
self.prefix = prefix # 会话键前缀,避免与其他缓存冲突
self.cache = cache # Django 缓存实例,如 Redis 或 Memcached
@property
def is_available(self):
# 检查当前存储是否可用,通过设置和获取一个测试值
value = "1"
self.set('checkavaliable', value=value)
return value == self.get('checkavaliable')
def key_name(self, s):
return '{prefix}{s}'.format(prefix=self.prefix, s=s)
# 为每个会话 ID 添加前缀,生成唯一的缓存键
return f'{self.prefix}{s}'
def get(self, id):
# 根据 ID 获取会话数据,如果不存在则返回空字典字符串 '{}'
id = self.key_name(id)
session_json = self.cache.get(id) or '{}'
return json_loads(session_json)
return json_loads(session_json) # 反序列化为 Python 字典
def set(self, id, value):
# 将会话数据序列化后存入缓存
id = self.key_name(id)
self.cache.set(id, json_dumps(value))
def delete(self, id):
# 删除指定的会话数据
id = self.key_name(id)
self.cache.delete(id)
self.cache.delete(id)

@ -1,19 +1,23 @@
from django.contrib import admin
# Register your models here.
# 假设 commands 和 EmailSendLog 是来自 .models 的模型,这里为了示例直接使用
# 实际使用时请确保 from .models import commands, EmailSendLog
class CommandsAdmin(admin.ModelAdmin):
# 在后台列表页显示这些字段
list_display = ('title', 'command', 'describe')
# 邮件发送日志的后台管理
class EmailSendLogAdmin(admin.ModelAdmin):
# 列表页显示字段
list_display = ('title', 'emailto', 'send_result', 'creation_time')
readonly_fields = (
'title',
'emailto',
'send_result',
'creation_time',
'content')
# 这些字段为只读,不允许在后台修改
readonly_fields = ('title', 'emailto', 'send_result', 'creation_time', 'content')
# 禁止通过后台添加新的日志条目,只能通过程序逻辑创建
def has_add_permission(self, request):
return False
# 注册模型与对应的管理类(通常在文件末尾,这里假设已导入 models
# admin.site.register(commands, CommandsAdmin)
# admin.site.register(EmailSendLog, EmailSendLogAdmin)

@ -1,27 +1,75 @@
# 从 Haystack 的查询模块导入 SearchQuerySet用于全文检索
from haystack.query import SearchQuerySet
# 从本地 blog 应用的 models 模块中导入 Article文章和 Category分类模型
from blog.models import Article, Category
class BlogApi:
def __init__(self):
"""
初始化 BlogApi
self.searchqueryset 是一个 Haystack SearchQuerySet 对象用于执行搜索
self.__max_takecount__ 是一个私有属性表示每次查询或获取时最多返回的文章数量这里设为 8
"""
# 创建一个 SearchQuerySet 实例,用于后续的搜索操作
self.searchqueryset = SearchQuerySet()
# 执行一个空的自动查询(暂时没有实际作用,可能为预留或初始化)
self.searchqueryset.auto_query('')
# 定义每次查询返回的最大文章数
self.__max_takecount__ = 8
def search_articles(self, query):
"""
根据关键字 query 搜索相关的文章
参数:
query (str): 用户输入的搜索关键词
返回:
SearchQuerySet: 包含匹配文章的查询集最多返回 __max_takecount__ 条结果
"""
# 使用 Haystack 根据 query 自动构建搜索
sqs = self.searchqueryset.auto_query(query)
# 加载所有关联的模型数据(比如加载完整的 Article 对象而不仅是搜索快照)
sqs = sqs.load_all()
# 返回前 __max_takecount__ 条搜索结果
return sqs[:self.__max_takecount__]
def get_category_lists(self):
"""
获取所有的文章分类列表
返回:
QuerySet: 包含所有 Category 对象的查询集
"""
return Category.objects.all()
def get_category_articles(self, categoryname):
"""
根据分类名称获取该分类下的文章列表
参数:
categoryname (str): 分类名称
返回:
QuerySet or None: 该分类下的文章查询集最多 __max_takecount__ 如果分类不存在则返回 None
"""
# 从 Article 表中筛选出 category__name 等于传入的 categoryname 的文章
articles = Article.objects.filter(category__name=categoryname)
if articles:
# 如果有文章,返回前 __max_takecount__ 条
return articles[:self.__max_takecount__]
# 如果该分类下没有文章,返回 None
return None
def get_recent_articles(self):
return Article.objects.all()[:self.__max_takecount__]
"""
获取最近发布的文章列表默认最新的几篇文章
返回:
QuerySet: 最新的文章查询集最多返回 __max_takecount__
"""
# 从 Article 表中获取所有文章,但只返回前 __max_takecount__ 条,通常可以按时间倒序优化
return Article.objects.all()[:self.__max_takecount__]

@ -1,64 +1,117 @@
# 导入 Python 标准库中的日志模块,用于记录错误和运行信息
import logging
# 导入 os 模块,用于访问环境变量和执行系统命令
import os
import openai
# 从本地的 servermanager 应用的 models 模块中导入 commands 模型(应该是一个存储命令的数据表)
from servermanager.models import commands
# 创建一个日志记录器,用于当前模块的日志输出
logger = logging.getLogger(__name__)
# 从环境变量中获取 OPENAI_API_KEY这是调用 OpenAI API 所必需的密钥
openai.api_key = os.environ.get('OPENAI_API_KEY')
# 如果环境变量中设置了 HTTP_PROXY则将其作为 OpenAI 的代理设置
if os.environ.get('HTTP_PROXY'):
openai.proxy = os.environ.get('HTTP_PROXY')
class ChatGPT:
@staticmethod
def chat(prompt):
"""
调用 OpenAI ChatCompletion 接口 GPT-3.5-turbo 模型发送用户提示并获取回复
参数:
prompt (str): 用户输入的对话内容或问题
返回:
str: GPT 模型返回的回复内容如果发生异常则返回错误提示信息
"""
try:
completion = openai.ChatCompletion.create(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}])
# 调用 OpenAI 的 ChatCompletion.create 方法,使用 gpt-3.5-turbo 模型
completion = openai.ChatCompletion.create(
model="gpt-3.5-turbo", # 指定使用的模型
messages=[{"role": "user", "content": prompt}] # 构造对话消息,角色为用户,内容为 prompt
)
# 从返回结果中提取第一个选择的回复内容
return completion.choices[0].message.content
except Exception as e:
# 如果出现任何异常如网络错误、API Key 错误等),记录错误日志
logger.error(e)
# 返回用户友好的错误提示
return "服务器出错了"
# 定义一个处理系统命令的类
class CommandHandler:
def __init__(self):
"""
初始化 CommandHandler从数据库中加载所有的命令记录
"""
# 从数据库中获取所有的命令对象,应该是存储在 commands 表中的数据
self.commands = commands.objects.all()
def run(self, title):
"""
运行命令
:param title: 命令
:return: 返回命令执行结果
根据命令标题 title 查找对应的命令并执行该命令
参数:
title (str): 命令的名称或标题
返回:
str: 命令执行后的输出内容如果未找到对应命令返回提示信息
"""
# 从所有命令中筛选出 title不区分大小写与传入参数一致的命令对象
cmd = list(
filter(
lambda x: x.title.upper() == title.upper(),
self.commands))
lambda x: x.title.upper() == title.upper(), # 不区分大小写匹配命令标题
self.commands
)
)
if cmd:
# 如果找到了命令,取出第一个匹配项的 command 字段(应该是实际的 shell 命令)
return self.__run_command__(cmd[0].command)
else:
# 如果未找到命令,返回提示让用户输入 helpme 获取帮助
return "未找到相关命令请输入hepme获得帮助。"
def __run_command__(self, cmd):
"""
内部方法用于实际执行传入的系统命令并返回结果
参数:
cmd (str): 要执行的系统命令字符串
返回:
str: 命令执行的输出内容如果执行出错返回错误提示
"""
try:
# 使用 os.popen 执行命令并读取命令的标准输出
res = os.popen(cmd).read()
return res
except BaseException:
return '命令执行出错!'
# 捕获所有可能的异常(如命令不存在、权限问题等)
return '命令执行出错!' # 返回用户友好的错误信息
def get_help(self):
rsp = ''
"""
获取所有可用命令的帮助信息包括命令标题和描述
返回:
str: 格式化后的命令帮助信息每行包含一个命令及其描述
"""
rsp = '' # 初始化返回的字符串
for cmd in self.commands:
# 遍历所有命令,格式化为 "命令标题:命令描述" 并追加到返回字符串中
rsp += '{c}:{d}\n'.format(c=cmd.title, d=cmd.describe)
return rsp
# 当该脚本被直接运行时(而不是作为模块导入),执行以下测试代码
if __name__ == '__main__':
# 创建一个 ChatGPT 类的实例
chatbot = ChatGPT()
# 设定一个示例 prompt要求写一篇关于 AI 的 1000 字论文
prompt = "写一篇1000字关于AI的论文"
print(chatbot.chat(prompt))
# 调用 chat 方法并打印返回的 GPT 回复
print(chatbot.chat(prompt))

@ -1,5 +1,5 @@
from django.apps import AppConfig
class ServermanagerConfig(AppConfig):
name = 'servermanager'
# 定义本 Django app 的名称,需与项目中的 app 文件夹名称一致
name = 'servermanager'

@ -1,45 +1,67 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
# 由 Django 4.1.7 于 2023-03-02 07:14 生成
from django.db import migrations, models
class Migration(migrations.Migration):
# 标记此迁移为初始迁移,即项目中的第一个迁移
initial = True
# 此迁移不依赖于其他迁移
dependencies = [
]
# 定义要执行的操作列表
operations = [
# 创建名为 'commands' 的模型
migrations.CreateModel(
name='commands',
fields=[
# 主键字段,自动生成的大整数,作为模型的唯一标识
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 命令标题最大长度为300个字符
('title', models.CharField(max_length=300, verbose_name='命令标题')),
# 具体的命令内容最大长度为2000个字符
('command', models.CharField(max_length=2000, verbose_name='命令')),
# 命令的描述信息最大长度为300个字符
('describe', models.CharField(max_length=300, verbose_name='命令描述')),
# 命令的创建时间,自动设置为对象首次创建时的时间
('created_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
# 命令的最后修改时间,每次保存对象时自动更新为当前时间
('last_mod_time', models.DateTimeField(auto_now=True, verbose_name='修改时间')),
],
options={
# 模型的单数显示名称
'verbose_name': '命令',
# 模型的复数显示名称
'verbose_name_plural': '命令',
},
),
# 创建名为 'EmailSendLog' 的模型,用于记录邮件发送日志
migrations.CreateModel(
name='EmailSendLog',
fields=[
# 主键字段,自动生成的大整数,作为模型的唯一标识
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 邮件的收件人最大长度为300个字符
('emailto', models.CharField(max_length=300, verbose_name='收件人')),
# 邮件的标题最大长度为2000个字符
('title', models.CharField(max_length=2000, verbose_name='邮件标题')),
# 邮件的内容,使用文本字段以支持较长的内容
('content', models.TextField(verbose_name='邮件内容')),
# 邮件发送的结果,布尔值,默认为 False 表示未成功
('send_result', models.BooleanField(default=False, verbose_name='结果')),
# 邮件发送日志的创建时间,自动设置为对象首次创建时的时间
('created_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
],
options={
# 模型的单数显示名称
'verbose_name': '邮件发送log',
# 模型的复数显示名称
'verbose_name_plural': '邮件发送log',
# 默认的排序方式,按照创建时间降序排列
'ordering': ['-created_time'],
},
),
]
]

@ -1,32 +1,67 @@
# Generated by Django 4.2.5 on 2023-09-06 13:19
# 由 Django 4.1.7 于 2023-03-02 07:14 生成
from django.db import migrations
from django.db import migrations, models
class Migration(migrations.Migration):
# 标记此迁移为初始迁移,即项目中的第一个迁移
initial = True
# 此迁移不依赖于其他迁移
dependencies = [
('servermanager', '0001_initial'),
]
# 定义要执行的操作列表
operations = [
migrations.AlterModelOptions(
name='emailsendlog',
options={'ordering': ['-creation_time'], 'verbose_name': '邮件发送log', 'verbose_name_plural': '邮件发送log'},
),
migrations.RenameField(
model_name='commands',
old_name='created_time',
new_name='creation_time',
# 创建名为 'commands' 的模型
migrations.CreateModel(
name='commands',
fields=[
# 主键字段,自动生成的大整数,作为模型的唯一标识
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 命令标题最大长度为300个字符
('title', models.CharField(max_length=300, verbose_name='命令标题')),
# 具体的命令内容最大长度为2000个字符
('command', models.CharField(max_length=2000, verbose_name='命令')),
# 命令的描述信息最大长度为300个字符
('describe', models.CharField(max_length=300, verbose_name='命令描述')),
# 命令的创建时间,自动设置为对象首次创建时的时间
('created_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
# 命令的最后修改时间,每次保存对象时自动更新为当前时间
('last_mod_time', models.DateTimeField(auto_now=True, verbose_name='修改时间')),
],
options={
# 模型的单数显示名称
'verbose_name': '命令',
# 模型的复数显示名称
'verbose_name_plural': '命令',
},
),
migrations.RenameField(
model_name='commands',
old_name='last_mod_time',
new_name='last_modify_time',
# 创建名为 'EmailSendLog' 的模型,用于记录邮件发送日志
migrations.CreateModel(
name='EmailSendLog',
fields=[
# 主键字段,自动生成的大整数,作为模型的唯一标识
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 邮件的收件人最大长度为300个字符
('emailto', models.CharField(max_length=300, verbose_name='收件人')),
# 邮件的标题最大长度为2000个字符
('title', models.CharField(max_length=2000, verbose_name='邮件标题')),
# 邮件的内容,使用文本字段以支持较长的内容
('content', models.TextField(verbose_name='邮件内容')),
# 邮件发送的结果,布尔值,默认为 False 表示未成功
('send_result', models.BooleanField(default=False, verbose_name='结果')),
# 邮件发送日志的创建时间,自动设置为对象首次创建时的时间
('created_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
],
options={
# 模型的单数显示名称
'verbose_name': '邮件发送log',
# 模型的复数显示名称
'verbose_name_plural': '邮件发送log',
# 默认的排序方式,按照创建时间降序排列
'ordering': ['-created_time'],
},
),
migrations.RenameField(
model_name='emailsendlog',
old_name='created_time',
new_name='creation_time',
),
]
]

@ -1,33 +1,32 @@
from django.db import models
# Create your models here.
# 命令模型,用于存储可在微信中执行的系统命令
class commands(models.Model):
title = models.CharField('命令标题', max_length=300)
command = models.CharField('命令', max_length=2000)
describe = models.CharField('命令描述', max_length=300)
creation_time = models.DateTimeField('创建时间', auto_now_add=True)
last_modify_time = models.DateTimeField('修改时间', auto_now=True)
title = models.CharField('命令标题', max_length=300) # 命令展示名称
command = models.CharField('命令', max_length=2000) # 实际要执行的命令内容
describe = models.CharField('命令描述', max_length=300) # 命令用途描述
creation_time = models.DateTimeField('创建时间', auto_now_add=True) # 创建时间(自动)
last_modify_time = models.DateTimeField('修改时间', auto_now=True) # 最后修改时间(自动)
def __str__(self):
return self.title
return self.title # 在后台等地方显示命令标题
class Meta:
verbose_name = '命令'
verbose_name_plural = verbose_name
verbose_name = '命令' # 单数显示名
verbose_name_plural = verbose_name # 复数显示名(与单数相同)
# 邮件发送日志模型,记录每次邮件发送的详情
class EmailSendLog(models.Model):
emailto = models.CharField('收件人', max_length=300)
title = models.CharField('邮件标题', max_length=2000)
content = models.TextField('邮件内容')
send_result = models.BooleanField('结果', default=False)
creation_time = models.DateTimeField('创建时间', auto_now_add=True)
emailto = models.CharField('收件人', max_length=300) # 收件人邮箱地址
title = models.CharField('邮件标题', max_length=2000) # 邮件标题
content = models.TextField('邮件内容') # 邮件正文内容
send_result = models.BooleanField('结果', default=False) # 发送是否成功
creation_time = models.DateTimeField('创建时间', auto_now_add=True) # 发送时间
def __str__(self):
return self.title
return self.title # 显示邮件标题
class Meta:
verbose_name = '邮件发送log'
verbose_name_plural = verbose_name
ordering = ['-creation_time']
verbose_name = '邮件发送log' # 单数显示名
verbose_name_plural = verbose_name # 复数显示名
ordering = ['-creation_time'] # 默认按创建时间倒序排列(最新的在前)

@ -1,56 +1,44 @@
import logging
import os
import re
import jsonpickle
from django.conf import settings
from werobot import WeRoBot
from werobot.replies import ArticlesReply, Article
from werobot.session.filestorage import FileStorage
from djangoblog.utils import get_sha256
from servermanager.api.blogapi import BlogApi
from servermanager.api.commonapi import ChatGPT, CommandHandler
from .MemcacheStorage import MemcacheStorage
robot = WeRoBot(token=os.environ.get('DJANGO_WEROBOT_TOKEN')
or 'lylinux', enable_session=True)
from djangoblog.utils import get_sha256 # 假设有一个获取 SHA256 的工具函数
from servermanager.api.blogapi import BlogApi # 自定义博客 API用于检索文章
from servermanager.api.commonapi import ChatGPT, CommandHandler # 自定义聊天与命令处理模块
from .MemcacheStorage import MemcacheStorage # 自定义会话存储
# 初始化微信机器人token 从环境变量读取,或使用默认值 'lylinux'
robot = WeRoBot(
token=os.environ.get('DJANGO_WEROBOT_TOKEN') or 'lylinux',
enable_session=True # 启用会话功能
)
# 初始化自定义会话存储
memstorage = MemcacheStorage()
if memstorage.is_available:
# 如果 Memcache 存储可用,则使用它
robot.config['SESSION_STORAGE'] = memstorage
else:
# 否则使用文件存储,并删除可能存在的旧会话文件
if os.path.exists(os.path.join(settings.BASE_DIR, 'werobot_session')):
os.remove(os.path.join(settings.BASE_DIR, 'werobot_session'))
robot.config['SESSION_STORAGE'] = FileStorage(filename='werobot_session')
blogapi = BlogApi()
cmd_handler = CommandHandler()
logger = logging.getLogger(__name__)
def convert_to_article_reply(articles, message):
reply = ArticlesReply(message=message)
from blog.templatetags.blog_tags import truncatechars_content
for post in articles:
imgs = re.findall(r'(?:http\:|https\:)?\/\/.*\.(?:png|jpg)', post.body)
imgurl = ''
if imgs:
imgurl = imgs[0]
article = Article(
title=post.title,
description=truncatechars_content(post.body),
img=imgurl,
url=post.get_full_url()
)
reply.add_article(article)
return reply
# 实例化自定义模块
blogapi = BlogApi() # 博客文章检索 API
cmd_handler = CommandHandler() # 命令执行处理器
logger = logging.getLogger(__name__) # 日志记录器
# 搜索文章:当用户消息以 "?" 开头时触发
@robot.filter(re.compile(r"^\?.*"))
def search(message, session):
s = message.content
searchstr = str(s).replace('?', '')
result = blogapi.search_articles(searchstr)
searchstr = str(s).replace('?', '') # 去掉问号
result = blogapi.search_articles(searchstr) # 调用 API 搜索文章
if result:
articles = list(map(lambda x: x.object, result))
reply = convert_to_article_reply(articles, message)
@ -58,25 +46,25 @@ def search(message, session):
else:
return '没有找到相关文章。'
# 获取文章分类:当用户发送 "category"(忽略大小写)
@robot.filter(re.compile(r'^category\s*$', re.I))
def category(message, session):
categorys = blogapi.get_category_lists()
categorys = blogapi.get_category_lists() # 获取分类列表
content = ','.join(map(lambda x: x.name, categorys))
return '所有文章分类目录:' + content
# 获取最近文章:当用户发送 "recent"(忽略大小写)
@robot.filter(re.compile(r'^recent\s*$', re.I))
def recents(message, session):
articles = blogapi.get_recent_articles()
articles = blogapi.get_recent_articles() # 获取最近文章
if articles:
reply = convert_to_article_reply(articles, message)
return reply
else:
return "暂时还没有文章"
@robot.filter(re.compile('^help$', re.I))
# 帮助信息:当用户发送 "help"(忽略大小写)
@robot.filter(re.compile(r'^help$', re.I))
def help(message, session):
return '''欢迎关注!
默认会与图灵机器人聊天~~
@ -97,68 +85,81 @@ def help(message, session):
PS:以上标点符号都不支持中文标点~~
'''
# 天气查询功能(暂未实现)
@robot.filter(re.compile(r'^weather\:.*$', re.I))
def weather(message, session):
return "建设中..."
# 身份证查询功能(暂未实现)
@robot.filter(re.compile(r'^idcard\:.*$', re.I))
def idcard(message, session):
return "建设中..."
# 默认消息处理器
@robot.handler
def echo(message, session):
handler = MessageHandler(message, session)
return handler.handler()
# 定义一个消息处理者类,用于处理复杂的交互逻辑,如管理员验证、命令执行等
class MessageHandler:
def __init__(self, message, session):
userid = message.source
userid = message.source # 用户唯一标识,如微信 openid
self.message = message
self.session = session
self.userid = userid
try:
# 尝试从会话中获取用户信息(序列化存储)
info = session[userid]
self.userinfo = jsonpickle.decode(info)
except Exception as e:
# 若获取失败,初始化一个默认的用户信息对象
userinfo = WxUserInfo()
self.userinfo = userinfo
@property
def is_admin(self):
# 是否是管理员
return self.userinfo.isAdmin
@property
def is_password_set(self):
# 是否已设置管理员密码
return self.userinfo.isPasswordSet
def save_session(self):
# 将当前用户信息重新序列化并保存回会话
info = jsonpickle.encode(self.userinfo)
self.session[self.userid] = info
def handler(self):
# 根据用户输入的消息内容,进行不同的处理
info = self.message.content
if self.userinfo.isAdmin and info.upper() == 'EXIT':
# 管理员输入 EXIT 退出管理员模式
self.userinfo = WxUserInfo()
self.save_session()
return "退出成功"
if info.upper() == 'ADMIN':
# 输入 ADMIN 进入管理员模式,需要输入密码
self.userinfo.isAdmin = True
self.save_session()
return "输入管理员密码"
if self.userinfo.isAdmin and not self.userinfo.isPasswordSet:
passwd = settings.WXADMIN
# 管理员已标记但尚未设置密码,进行密码验证
passwd = settings.WXADMIN # 从 Django 配置中获取管理员密码
if settings.TESTING:
passwd = '123'
passwd = '123' # 测试环境下使用简单密码
if passwd.upper() == get_sha256(get_sha256(info)).upper():
# 双重 SHA256 加密后比对
self.userinfo.isPasswordSet = True
self.save_session()
return "验证通过,请输入命令或者要执行的命令代码:输入helpme获得帮助"
else:
# 密码错误,计数并提示
if self.userinfo.Count >= 3:
self.userinfo = WxUserInfo()
self.save_session()
@ -166,22 +167,27 @@ class MessageHandler:
self.userinfo.Count += 1
self.save_session()
return "验证失败,请重新输入管理员密码:"
if self.userinfo.isAdmin and self.userinfo.isPasswordSet:
# 已验证的管理员,可以执行命令
if self.userinfo.Command != '' and info.upper() == 'Y':
# 用户确认执行之前输入的命令
return cmd_handler.run(self.userinfo.Command)
else:
if info.upper() == 'HELPME':
# 请求命令帮助
return cmd_handler.get_help()
self.userinfo.Command = info
self.userinfo.Command = info # 记录用户输入的待确认命令
self.save_session()
return "确认执行: " + info + " 命令?"
# 默认情况下,将消息交给 ChatGPT 处理(即普通聊天机器人回复)
return ChatGPT.chat(info)
# 定义一个简单的用户信息类,用于保存当前会话的用户状态
class WxUserInfo():
def __init__(self):
self.isAdmin = False
self.isPasswordSet = False
self.Count = 0
self.Command = ''
self.isAdmin = False # 是否是管理员
self.isPasswordSet = False # 是否设置了管理员密码
self.Count = 0 # 密码尝试次数
self.Command = '' # 待确认执行的命令

@ -9,18 +9,19 @@ from .models import commands
from .robot import MessageHandler, CommandHandler
from .robot import search, category, recents
# Create your tests here.
class ServerManagerTest(TestCase):
def setUp(self):
# 创建测试客户端和请求工厂
self.client = Client()
self.factory = RequestFactory()
def test_chat_gpt(self):
# 测试 ChatGPT 聊天接口是否返回内容
content = ChatGPT.chat("你好")
self.assertIsNotNone(content)
def test_validate_comment(self):
# 创建超级用户、分类、文章,并测试搜索、分类、最近文章、命令执行等
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
@ -40,6 +41,7 @@ class ServerManagerTest(TestCase):
article.type = 'a'
article.status = 'p'
article.save()
s = TextMessage([])
s.content = "nice"
rsp = search(s, None)
@ -57,10 +59,12 @@ class ServerManagerTest(TestCase):
cmdhandler = CommandHandler()
rsp = cmdhandler.run('test')
self.assertIsNotNone(rsp)
s.source = 'u'
s.content = 'test'
msghandler = MessageHandler(s, {})
# 以下为模拟用户交互,包括设置管理员、输入命令、确认执行等
# msghandler.userinfo.isPasswordSet = True
# msghandler.userinfo.isAdmin = True
msghandler.handler()
@ -76,4 +80,4 @@ class ServerManagerTest(TestCase):
msghandler.handler()
s.content = 'exit'
msghandler.handler()
msghandler.handler()

@ -1,10 +1,11 @@
from django.urls import path
from werobot.contrib.django import make_view
from werobot.contrib.django import make_view # WeRoBot 提供的 Django 视图适配器
from .robot import robot
from .robot import robot # 导入我们初始化的机器人实例
app_name = "servermanager" # 应用命名空间
app_name = "servermanager"
urlpatterns = [
# 将 /robot 路径映射到 WeRoBot 的处理视图
path(r'robot', make_view(robot)),
]
]

@ -1,50 +1,98 @@
# setup_pet_blog.py
# 引入操作系统接口模块,用于设置环境变量
import os
# 引入 Django 模块,用于初始化 Django 环境
import django
# 设置 Django 的 settings 模块为 'djangoblog.settings'
# 这一步是必须的,以便 Django 知道使用哪个配置文件
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoblog.settings')
# 初始化 Django 环境,加载所有应用和配置
django.setup()
# 从 blog 应用的 models 模块中导入 Article文章、Category分类、Tag标签模型
from blog.models import Article, Category, Tag
# 从 accounts 应用的 models 模块中导入 BlogUser博客用户模型
from accounts.models import BlogUser
# 打印脚本开始信息
print("=== 设置宠物博客主题 ===")
# -------------------------------
# 1. 清理旧数据
# -------------------------------
print("清理旧数据...")
# 删除数据库中所有的现有文章
Article.objects.all().delete()
# 删除数据库中所有的现有分类
Category.objects.all().delete()
# 删除数据库中所有的现有标签
Tag.objects.all().delete()
print("旧数据已清理。")
# -------------------------------
# 2. 创建分类
# -------------------------------
print("创建分类...")
# 定义一个包含多个分类名称的列表
categories = ['狗狗日常', '猫咪生活', '宠物健康', '训练技巧', '宠物用品']
# 遍历分类名称列表,为每个分类创建一个 Category 对象
for cat in categories:
# 使用 get_or_create 方法获取或创建一个分类
# 如果分类已存在,则获取它;如果不存在,则创建一个新的分类
Category.objects.get_or_create(name=cat)
# 打印当前正在创建的分类名称
print(f'创建分类: {cat}')
print("分类已创建。")
# -------------------------------
# 3. 创建标签
# -------------------------------
print("创建标签...")
# 定义一个包含多个标签名称的列表
tags = ['图文', '狗狗社交', '遛狗', '宠物医疗', '宠物食品', '训练方法', '宠物美容', '宠物玩具']
# 遍历标签名称列表,为每个标签创建一个 Tag 对象
for tag in tags:
# 使用 get_or_create 方法获取或创建一个标签
# 如果标签已存在,则获取它;如果不存在,则创建一个新的标签
Tag.objects.get_or_create(name=tag)
# 打印当前正在创建的标签名称
print(f'创建标签: {tag}')
print("标签已创建。")
# -------------------------------
# 4. 创建文章
# -------------------------------
print("创建文章...")
try:
# 尝试获取第一个用户对象,假设这是博客的管理员或主要用户
user = BlogUser.objects.first()
# 如果未找到任何用户,则创建一个新的用户
if not user:
user = BlogUser.objects.create(username='pet_owner', email='pet@example.com')
# 创建一个新的 BlogUser 对象
user = BlogUser.objects.create(
username='pet_owner', # 用户名
email='pet@example.com' # 用户邮箱
)
# 为用户设置密码,密码将被哈希存储
user.set_password('123456')
# 保存用户对象到数据库
user.save()
# 打印用户创建信息(可选)
print("创建新用户: pet_owner")
# 定义一个包含多篇文章数据的列表
# 每个元素是一个字典,包含文章的标题、内容、分类名称和标签列表
articles_data = [
{
'title': '我家狗狗的表演',
'title': '我家狗狗的表演', # 文章标题
'body': '欲擒故纵,你们见过吗。就是要出去的时候,故意离你远远的,让人你没法给它套上项圈,其实它很想出去。',
'category': '狗狗日常',
'tags': ['图文', '狗狗社交', '遛狗']
# 文章内容
'category': '狗狗日常', # 分类名称
'tags': ['图文', '狗狗社交', '遛狗'] # 标签列表
},
{
'title': '猫咪的日常护理',
@ -60,21 +108,38 @@ try:
}
]
# 遍历 articles_data 列表中的每一篇文章数据,逐个创建文章
for data in articles_data:
# 根据分类名称从 Category 模型中获取对应的分类对象
# 如果找不到对应的分类,这里会抛出 Category.DoesNotExist 异常
category = Category.objects.get(name=data['category'])
# 创建一个新的 Article 对象,并保存到数据库中
article = Article.objects.create(
title=data['title'],
body=data['body'],
author=user,
category=category,
status='p'
title=data['title'], # 设置文章标题
body=data['body'], # 设置文章内容
author=user, # 设置文章作者为之前获取或创建的用户
category=category, # 设置文章分类
status='p' # 设置文章状态为 'p'(通常代表已发布)
)
# 遍历当前文章数据中的每一个标签名称
for tag_name in data['tags']:
# 根据标签名称获取或创建一个 Tag 对象
# 如果标签不存在,则创建一个新的标签
tag, _ = Tag.objects.get_or_create(name=tag_name)
# 将该标签添加到文章的标签集合中
article.tags.add(tag)
# 打印成功创建文章的信息,包括文章标题
print(f'创建文章: {data["title"]}')
except Exception as e:
# 如果在创建文章的过程中发生任何异常,打印错误信息,包括异常详情
print(f'创建文章时出错: {e}')
# -------------------------------
# 5. 输出设置完成信息
# -------------------------------
print("=== 宠物博客主题设置完成 ===")
Loading…
Cancel
Save