Compare commits

..

42 Commits

Author SHA1 Message Date
plhw57tbe b1359b18c1 ADD file via upload
3 months ago
plhw57tbe f1c373cf26 Delete 'doc/实践考评-开源软件大作业项目的自评报告.xlsx'
3 months ago
plhw57tbe f60e19d825 Merge pull request '合并分支' (#11) from smy_branch into master
3 months ago
pa2g3nmk9 9b20495c6e 上传文件至 'doc'
4 months ago
pa2g3nmk9 027dfca121 上传文件至 'doc'
4 months ago
pa2g3nmk9 4f4a5253ce 上传文件至 'doc'
4 months ago
plhw57tbe 68d571203a ADD file via upload
4 months ago
zxc 1c792651b0 Update urls.py
4 months ago
zxc 1a1515649c Update tests.py
4 months ago
zxc 8e224aab0e Update robot.py
4 months ago
zxc bb83dc7a1f Update models.py
4 months ago
zxc 1fee210e0b Update apps.py
4 months ago
zxc 235351d277 Update admin.py
4 months ago
zxc 5e97ac4351 Update MemcacheStorage.py
4 months ago
zxc 06981cdda4 Update 0002_alter_emailsendlog_options_and_more.py
4 months ago
zxc 7fdc0a59be Update 0001_initial.py
4 months ago
zxc ee81ca1a4d Update commonapi.py
4 months ago
zxc 4e8ad15d2f Update blogapi.py
4 months ago
zxc 59810b588d Update plugin.py
4 months ago
zxc 74c83abfde Update plugin.py
4 months ago
zxc 94c7dbed19 Update plugin.py
4 months ago
zxc 84127fe0af Update plugin.py
4 months ago
zxc f83339d3eb Update plugin.py
4 months ago
zxc b130e7f44e Update views.py
4 months ago
zxc 7b2ec798ca Update urls.py
4 months ago
zxc b243aefa4a Update tests.py
4 months ago
zxc 8bb5d004d7 Update models.py
4 months ago
zxc 75482f7bac Update apps.py
4 months ago
zxc d12d2aba96 Update admin.py
4 months ago
zxc 5bc4b18368 Update 0002_alter_owntracklog_options_and_more.py
4 months ago
zxc 0144aa7ddf Update 0001_initial.py
4 months ago
plhw57tbe b837cfe1f8 Merge pull request '合并分支' (#10) from smy_branch into master
4 months ago
pa2g3nmk9 c83a06d26a 上传文件至 'doc'
4 months ago
pa2g3nmk9 9f60f12057 上传文件至 'doc'
4 months ago
smy 89f603335c Merge branch 'master' of https://bdgit.educoder.net/plhw57tbe/SoftwareMethodology
5 months ago
smy 3ce0187398 Merge branch 'develop'
5 months ago
fz c7b1c2a3aa 合并develop分支:解决main.py冲突
5 months ago
fz f338bb2378 备份develop分支的修改
5 months ago
zxc 57eab4a72d 在src目录添加main.py文件
5 months ago
txb 3c06c750d4 在src目录添加main.py文件
5 months ago
smy 192f27dd24 在master分支添加doc目录及README.md文档
5 months ago
smy 3fac0dcd23 删除master分支中的README.md文件
5 months ago

@ -1 +1 @@
这是doc目录的说明文档
# Documentation

Binary file not shown.

@ -1,7 +1,17 @@
from django.contrib import admin
# Register your models here.
# 注册模型到Django管理后台使模型数据可通过后台界面管理
# 定义OwnTrackLogs模型的管理类用于配置模型在admin后台的展示和操作方式
class OwnTrackLogsAdmin(admin.ModelAdmin):
# 此处为管理类的配置区域目前未添加任何自定义配置pass表示空实现
# 可根据需求添加如下常见配置:
# list_display = ('tid', 'lat', 'lon', 'creation_time') # 列表页显示的字段
# list_filter = ('tid',) # 可用于过滤的字段
# search_fields = ('tid',) # 可搜索的字段
# ordering = ('-creation_time',) # 默认排序方式(按创建时间降序)
pass
# 注意:需要导入对应的模型才能完成注册,例如:
# from .models import OwnTrackLog
# admin.site.register(OwnTrackLog, OwnTrackLogsAdmin)

@ -2,4 +2,14 @@ from django.apps import AppConfig
class OwntracksConfig(AppConfig):
# 定义Django应用的配置类用于配置应用的元数据和行为
# 指定应用的名称,这是应用的唯一标识
# 在Django项目中通过该名称引用此应用如在INSTALLED_APPS中注册、迁移依赖等
name = 'owntracks'
# 可选配置(当前未设置):
# verbose_name应用的可读名称用于在admin后台等位置显示
# 例如verbose_name = '位置追踪日志'
# default_auto_field指定模型默认的主键字段类型
# 例如default_auto_field = 'django.db.models.BigAutoField'

@ -1,31 +1,56 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
# 由 Django 4.1.7 于 2023年3月2日 07:14 自动生成
# 该文件是Django的数据迁移文件用于定义数据库模型结构的变更
from django.db import migrations, models
import django.utils.timezone
class Migration(migrations.Migration):
# 标记为初始迁移(首次创建模型时使用)
initial = True
# 依赖的其他迁移文件,初始迁移无依赖
dependencies = [
]
# 迁移操作列表
operations = [
# 创建名为'OwnTrackLog'的数据模型
migrations.CreateModel(
name='OwnTrackLog',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('tid', models.CharField(max_length=100, verbose_name='用户')),
('lat', models.FloatField(verbose_name='纬度')),
('lon', models.FloatField(verbose_name='经度')),
('created_time', models.DateTimeField(default=django.utils.timezone.now, verbose_name='创建时间')),
# 自增主键字段类型为BigAutoField大整数自增
('id', models.BigAutoField(
auto_created=True, # 自动创建
primary_key=True, # 作为主键
serialize=False, # 不序列化
verbose_name='ID' # 字段显示名称
)),
# 用户标识字段字符串类型最大长度100
('tid', models.CharField(
max_length=100,
verbose_name='用户' # 显示名称为“用户”
)),
# 纬度字段,浮点型
('lat', models.FloatField(
verbose_name='纬度' # 显示名称为“纬度”
)),
# 经度字段,浮点型
('lon', models.FloatField(
verbose_name='经度' # 显示名称为“经度”
)),
# 创建时间字段,默认值为当前时间(时区感知)
('created_time', models.DateTimeField(
default=django.utils.timezone.now, # 默认值为当前时间
verbose_name='创建时间' # 显示名称为“创建时间”
)),
],
# 模型的额外配置选项
options={
'verbose_name': 'OwnTrackLogs',
'verbose_name_plural': 'OwnTrackLogs',
'ordering': ['created_time'],
'get_latest_by': 'created_time',
'verbose_name': 'OwnTrackLogs', # 模型的单数显示名称
'verbose_name_plural': 'OwnTrackLogs', # 模型的复数显示名称(此处与单数相同)
'ordering': ['created_time'], # 默认排序方式:按创建时间升序
'get_latest_by': 'created_time', # 指定通过created_time字段获取最新记录
},
),
]

@ -1,22 +1,31 @@
# Generated by Django 4.2.5 on 2023-09-06 13:19
# 由 Django 4.2.5 于 2023年9月6日 13:19 自动生成
# 该文件是Django的数据迁移文件用于修改已存在的数据模型结构
from django.db import migrations
class Migration(migrations.Migration):
# 依赖的迁移文件:依赖于'owntracks'应用下的0001_initial迁移
# 意味着执行当前迁移前必须先执行0001_initial迁移
dependencies = [
('owntracks', '0001_initial'),
]
# 迁移操作列表
operations = [
# 修改OwnTrackLog模型的配置选项
migrations.AlterModelOptions(
name='owntracklog',
name='owntracklog', # 目标模型名称
# 更新后的模型选项:
# - 按'creation_time'字段获取最新记录(原先是'created_time'
# - 按'creation_time'字段升序排序(原先是'created_time'
# - 单数和复数显示名称保持不变
options={'get_latest_by': 'creation_time', 'ordering': ['creation_time'], 'verbose_name': 'OwnTrackLogs', 'verbose_name_plural': 'OwnTrackLogs'},
),
# 重命名OwnTrackLog模型的字段
migrations.RenameField(
model_name='owntracklog',
old_name='created_time',
new_name='creation_time',
model_name='owntracklog', # 目标模型名称
old_name='created_time', # 原字段名
new_name='creation_time', # 新字段名
),
]

@ -4,17 +4,26 @@ from django.utils.timezone import now
# Create your models here.
# 定义OwnTrackLog模型用于存储用户的位置追踪日志数据
class OwnTrackLog(models.Model):
# 用户标识字段字符串类型最大长度100不允许为空显示名称为“用户”
tid = models.CharField(max_length=100, null=False, verbose_name='用户')
# 纬度字段:浮点型,显示名称为“纬度”
lat = models.FloatField(verbose_name='纬度')
# 经度字段:浮点型,显示名称为“经度”
lon = models.FloatField(verbose_name='经度')
# 创建时间字段DateTime类型显示名称为“创建时间”默认值为当前时间带时区
creation_time = models.DateTimeField('创建时间', default=now)
# 定义模型实例的字符串表示形式返回用户标识tid
def __str__(self):
return self.tid
# 模型的元数据配置
class Meta:
ordering = ['creation_time']
verbose_name = "OwnTrackLogs"
verbose_name_plural = verbose_name
get_latest_by = 'creation_time'
ordering = ['creation_time'] # 默认按创建时间升序排序
verbose_name = "OwnTrackLogs" # 模型的单数显示名称
verbose_name_plural = verbose_name # 模型的复数显示名称(与单数相同)
get_latest_by = 'creation_time' # 指定通过creation_time字段获取最新记录

@ -8,57 +8,85 @@ from .models import OwnTrackLog
# Create your tests here.
# 定义测试类继承自Django的TestCase用于测试OwnTrackLog相关功能
class OwnTrackLogTest(TestCase):
# 测试前的初始化方法,会在每个测试方法执行前调用
def setUp(self):
# 创建一个测试客户端用于模拟HTTP请求
self.client = Client()
# 创建一个请求工厂,用于构造更复杂的请求对象(本测试中未实际使用)
self.factory = RequestFactory()
# 核心测试方法测试OwnTrackLog的相关接口和功能
def test_own_track_log(self):
# 1. 测试正常提交位置数据
# 构造符合要求的测试数据包含tid、lat、lon三个必要字段
o = {
'tid': 12,
'lat': 123.123,
'lon': 134.341
}
# 模拟POST请求提交数据到日志记录接口
self.client.post(
'/owntracks/logtracks',
json.dumps(o),
content_type='application/json')
'/owntracks/logtracks', # 请求的URL
json.dumps(o), # 将数据序列化为JSON字符串
content_type='application/json' # 指定内容类型为JSON
)
# 验证数据库中是否成功创建了一条记录
length = len(OwnTrackLog.objects.all())
self.assertEqual(length, 1)
# 2. 测试提交不完整数据缺少lon字段
o = {
'tid': 12,
'lat': 123.123
'lat': 123.123 # 缺少经度lon字段
}
# 再次发送POST请求
self.client.post(
'/owntracks/logtracks',
json.dumps(o),
content_type='application/json')
content_type='application/json'
)
# 验证数据库记录数未增加(因数据不完整未创建新记录)
length = len(OwnTrackLog.objects.all())
self.assertEqual(length, 1)
# 3. 测试未登录状态访问受保护页面
# 访问地图展示页面
rsp = self.client.get('/owntracks/show_maps')
# 验证未登录时被重定向状态码302
self.assertEqual(rsp.status_code, 302)
# 4. 创建超级用户并登录,测试登录后访问功能
# 创建超级用户
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="liangliangyy1")
password="liangliangyy1"
)
# 使用测试客户端登录
self.client.login(username='liangliangyy1', password='liangliangyy1')
# 手动创建一条位置记录
s = OwnTrackLog()
s.tid = 12
s.lon = 123.234
s.lat = 34.234
s.save()
# 测试登录后访问各接口是否正常状态码200
rsp = self.client.get('/owntracks/show_dates')
self.assertEqual(rsp.status_code, 200)
rsp = self.client.get('/owntracks/show_maps')
self.assertEqual(rsp.status_code, 200)
rsp = self.client.get('/owntracks/get_datas')
self.assertEqual(rsp.status_code, 200)
# 测试带日期参数的接口请求
rsp = self.client.get('/owntracks/get_datas?date=2018-02-26')
self.assertEqual(rsp.status_code, 200)

@ -1,12 +1,27 @@
from django.urls import path
# 导入当前应用owntracks的views模块用于关联URL与视图函数
from . import views
# 定义应用的命名空间为"owntracks"
# 作用在使用reverse()或模板中引用URL时可通过"owntracks:URL名称"的格式精准定位避免不同应用间URL名称冲突
app_name = "owntracks"
# 定义URL路由列表将URL路径与对应的视图函数绑定
urlpatterns = [
# 1. 位置日志提交接口接收POST请求存储位置数据
# 路径:/owntracks/logtracks关联视图函数manage_owntrack_logURL名称为logtracks
path('owntracks/logtracks', views.manage_owntrack_log, name='logtracks'),
# 2. 地图展示页面:展示位置数据的地图视图
# 路径:/owntracks/show_maps关联视图函数show_mapsURL名称为show_maps
path('owntracks/show_maps', views.show_maps, name='show_maps'),
# 3. 数据查询接口:获取位置日志数据(支持带日期参数筛选)
# 路径:/owntracks/get_datas关联视图函数get_datasURL名称为get_datas
path('owntracks/get_datas', views.get_datas, name='get_datas'),
# 4. 日期列表页面:展示有位置日志的日期列表
# 路径:/owntracks/show_dates关联视图函数show_log_datesURL名称为show_dates
path('owntracks/show_dates', views.show_log_dates, name='show_dates')
]

@ -16,86 +16,110 @@ from django.views.decorators.csrf import csrf_exempt
from .models import OwnTrackLog
# 初始化日志记录器,用于记录视图中的操作和错误信息
logger = logging.getLogger(__name__)
# 处理位置日志数据提交的视图函数禁用CSRF保护方便外部设备提交数据
@csrf_exempt
def manage_owntrack_log(request):
try:
# 解析请求体中的JSON数据
s = json.loads(request.read().decode('utf-8'))
# 提取必要的字段(用户标识、纬度、经度)
tid = s['tid']
lat = s['lat']
lon = s['lon']
# 记录日志信息
logger.info(
'tid:{tid}.lat:{lat}.lon:{lon}'.format(
tid=tid, lat=lat, lon=lon))
# 验证字段不为空
if tid and lat and lon:
# 创建并保存位置记录
m = OwnTrackLog()
m.tid = tid
m.lat = lat
m.lon = lon
m.save()
return HttpResponse('ok')
return HttpResponse('ok') # 成功响应
else:
return HttpResponse('data error')
return HttpResponse('data error') # 数据不完整错误
except Exception as e:
# 记录异常信息
logger.error(e)
return HttpResponse('error')
return HttpResponse('error') # 异常响应
# 显示地图页面的视图函数,要求用户登录且为超级用户
@login_required
def show_maps(request):
if request.user.is_superuser:
# 获取默认日期当前UTC日期或请求中的日期参数
defaultdate = str(datetime.datetime.now(timezone.utc).date())
date = request.GET.get('date', defaultdate)
# 传递日期参数到模板
context = {
'date': date
}
return render(request, 'owntracks/show_maps.html', context)
else:
# 非超级用户拒绝访问
from django.http import HttpResponseForbidden
return HttpResponseForbidden()
# 显示日志日期列表的视图函数,要求用户登录
@login_required
def show_log_dates(request):
# 获取所有记录的创建时间,提取日期并去重排序
dates = OwnTrackLog.objects.values_list('creation_time', flat=True)
results = list(sorted(set(map(lambda x: x.strftime('%Y-%m-%d'), dates))))
# 传递日期列表到模板
context = {
'results': results
}
return render(request, 'owntracks/show_log_dates.html', context)
# 将GPS坐标转换为高德地图坐标的工具函数
def convert_to_amap(locations):
convert_result = []
it = iter(locations)
it = iter(locations) # 创建迭代器
# 每次处理30个坐标高德API限制
item = list(itertools.islice(it, 30))
while item:
# 拼接坐标字符串(格式:"lon1,lat1;lon2,lat2"
datas = ';'.join(
set(map(lambda x: str(x.lon) + ',' + str(x.lat), item)))
key = '8440a376dfc9743d8924bf0ad141f28e'
# 高德坐标转换API参数
key = '8440a376dfc9743d8924bf0ad141f28e' # 高德API密钥
api = 'http://restapi.amap.com/v3/assistant/coordinate/convert'
query = {
'key': key,
'locations': datas,
'coordsys': 'gps'
'coordsys': 'gps' # 源坐标系统为GPS
}
# 调用高德API进行坐标转换
rsp = requests.get(url=api, params=query)
result = json.loads(rsp.text)
if "locations" in result:
convert_result.append(result['locations'])
# 处理下一批坐标
item = list(itertools.islice(it, 30))
# 拼接所有转换结果
return ";".join(convert_result)
# 获取位置数据的视图函数,要求用户登录
@login_required
def get_datas(request):
# 确定查询日期默认今天可通过date参数指定
now = django.utils.timezone.now().replace(tzinfo=timezone.utc)
querydate = django.utils.timezone.datetime(
now.year, now.month, now.day, 0, 0, 0)
@ -103,25 +127,32 @@ def get_datas(request):
date = list(map(lambda x: int(x), request.GET.get('date').split('-')))
querydate = django.utils.timezone.datetime(
date[0], date[1], date[2], 0, 0, 0)
# 计算查询日期的结束时间次日0点
nextdate = querydate + datetime.timedelta(days=1)
# 查询该日期范围内的所有位置记录
models = OwnTrackLog.objects.filter(
creation_time__range=(querydate, nextdate))
result = list()
if models and len(models):
# 按用户标识tid分组
for tid, item in groupby(
sorted(models, key=lambda k: k.tid), key=lambda k: k.tid):
d = dict()
d["name"] = tid
d["name"] = tid # 用户名
paths = list()
# 使用高德转换后的经纬度
# 注释掉的代码:使用高德转换后的坐标
# locations = convert_to_amap(
# sorted(item, key=lambda x: x.creation_time))
# for i in locations.split(';'):
# paths.append(i.split(','))
# 使用GPS原始经纬度
# 当前使用GPS原始坐标按创建时间排序
for location in sorted(item, key=lambda x: x.creation_time):
paths.append([str(location.lon), str(location.lat)])
d["path"] = paths
d["path"] = paths # 位置路径
result.append(d)
# 返回JSON格式的位置数据
return JsonResponse(result, safe=False)

@ -4,29 +4,37 @@ from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
class ArticleCopyrightPlugin(BasePlugin):
PLUGIN_NAME = '文章结尾版权声明'
PLUGIN_DESCRIPTION = '一个在文章正文末尾添加版权声明的插件。'
PLUGIN_VERSION = '0.2.0'
PLUGIN_AUTHOR = 'liangliangyy'
# 插件基本信息定义
PLUGIN_NAME = '文章结尾版权声明' # 插件名称,用于在管理界面显示
PLUGIN_DESCRIPTION = '一个在文章正文末尾添加版权声明的插件。' # 插件功能描述
PLUGIN_VERSION = '0.2.0' # 插件版本号
PLUGIN_AUTHOR = 'liangliangyy' # 插件作者
# 2. 实现 register_hooks 方法,专门用于注册钩子
# 2. 实现钩子注册方法,用于将插件功能绑定到系统钩子
def register_hooks(self):
# 在这里将插件的方法注册到指定的钩子上
# 将当前插件的add_copyright_to_content方法注册到文章内容钩子
# 当系统触发ARTICLE_CONTENT_HOOK_NAME钩子时会自动执行该方法
hooks.register(ARTICLE_CONTENT_HOOK_NAME, self.add_copyright_to_content)
def add_copyright_to_content(self, content, *args, **kwargs):
"""
这个方法会被注册到 'the_content' 过滤器钩子上
它接收原始内容并返回添加了版权信息的新内容
具体的插件功能实现在文章内容末尾添加版权声明
该方法会被注册到文章内容处理的钩子上接收原始内容并返回处理后的内容
"""
# 从关键字参数中获取当前文章对象
article = kwargs.get('article')
# 如果没有文章对象(如非文章场景),直接返回原始内容
if not article:
return content
# 构造版权声明内容,包含文章作者信息
copyright_info = f"\n<hr><p>本文由 {article.author.username} 原创,转载请注明出处。</p>"
# 将版权声明追加到原始内容末尾并返回
return content + copyright_info
# 3. 实例化插件。
# 这会自动调用 BasePlugin.__init__然后 BasePlugin.__init__ 会调用我们上面定义的 register_hooks 方法。
# 3. 实例化插件
# 实例化时会自动调用父类BasePlugin的__init__方法
# 父类初始化过程中会调用当前类的register_hooks方法完成钩子注册
# 从而使插件功能生效
plugin = ArticleCopyrightPlugin()

@ -6,43 +6,51 @@ from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
class ExternalLinksPlugin(BasePlugin):
PLUGIN_NAME = '外部链接处理器'
PLUGIN_DESCRIPTION = '自动为文章中的外部链接添加 target="_blank" 和 rel="noopener noreferrer" 属性。'
PLUGIN_VERSION = '0.1.0'
PLUGIN_AUTHOR = 'liangliangyy'
# 插件元信息定义,用于系统识别和管理插件
PLUGIN_NAME = '外部链接处理器' # 插件名称,显示在插件管理界面
PLUGIN_DESCRIPTION = '自动为文章中的外部链接添加 target="_blank" 和 rel="noopener noreferrer" 属性。' # 功能描述
PLUGIN_VERSION = '0.1.0' # 插件版本号
PLUGIN_AUTHOR = 'liangliangyy' # 插件作者
# 注册钩子:将插件功能绑定到文章内容处理钩子
def register_hooks(self):
# 当系统触发ARTICLE_CONTENT_HOOK_NAME文章内容钩子执行process_external_links方法
hooks.register(ARTICLE_CONTENT_HOOK_NAME, self.process_external_links)
# 核心功能:处理文章中的外部链接,添加安全属性
def process_external_links(self, content, *args, **kwargs):
# 导入工具函数,获取当前网站的域名(用于判断是否为外部链接)
from djangoblog.utils import get_current_site
site_domain = get_current_site().domain
# 正则表达式查找所有 <a> 标签
# 正则表达式:匹配文章中的<a>标签,捕获 href 属性值及标签前后内容
# 匹配规则:<a ... href="链接地址" .../>,不区分大小写
link_pattern = re.compile(r'(<a\s+(?:[^>]*?\s+)?href=")([^"]*)(".*?/a>)', re.IGNORECASE)
# 替换函数:对匹配到的<a>标签进行处理
def replacer(match):
# match.group(1) 是 <a ... href="
# match.group(2) 是链接 URL
# match.group(3) 是 ">...</a>
# 解构匹配结果group(1)=<a ... href="group(2)=链接URLgroup(3)=".../a>
href = match.group(2)
full_a_tag = match.group(0)
# 如果链接已经有 target 属性,则不处理
if 'target=' in match.group(0).lower():
return match.group(0)
# 跳过已包含target属性的链接避免重复添加
if 'target=' in full_a_tag.lower():
return full_a_tag
# 解析链接
# 解析链接URL提取域名netloc
parsed_url = urlparse(href)
# 如果链接是外部的 (有域名且域名不等于当前网站域名)
# 判断是否为外部链接:有域名(非相对路径)且域名不等于当前网站域名
if parsed_url.netloc and parsed_url.netloc != site_domain:
# 添加 target 和 rel 属性
# 为外部链接添加 target="_blank"(新窗口打开)和 rel="noopener noreferrer"(安全防护)
return f'{match.group(1)}{href}" target="_blank" rel="noopener noreferrer"{match.group(3)}'
# 否则返回原样
return match.group(0)
# 内部链接(相对路径或同域名):返回原标签,不做修改
return full_a_tag
# 用replacer函数替换content中所有匹配的<a>标签,返回处理后的内容
return link_pattern.sub(replacer, content)
# 实例化插件:自动触发父类初始化,完成钩子注册,使插件生效
plugin = ExternalLinksPlugin()

@ -1,4 +1,4 @@
import math
import math
import re
from djangoblog.plugin_manage.base_plugin import BasePlugin
from djangoblog.plugin_manage import hooks
@ -6,38 +6,44 @@ from djangoblog.plugin_manage.hook_constants import ARTICLE_CONTENT_HOOK_NAME
class ReadingTimePlugin(BasePlugin):
PLUGIN_NAME = '阅读时间预测'
PLUGIN_DESCRIPTION = '估算文章阅读时间并显示在文章开头。'
PLUGIN_VERSION = '0.1.0'
PLUGIN_AUTHOR = 'liangliangyy'
# 插件元信息:定义插件的基础标识与说明
PLUGIN_NAME = '阅读时间预测' # 插件名称,用于插件管理界面展示
PLUGIN_DESCRIPTION = '估算文章阅读时间并显示在文章开头。' # 插件功能描述
PLUGIN_VERSION = '0.1.0' # 插件版本号
PLUGIN_AUTHOR = 'liangliangyy' # 插件作者
# 钩子注册:将插件功能绑定到文章内容处理钩子
def register_hooks(self):
# 当系统处理文章内容触发ARTICLE_CONTENT_HOOK_NAME钩子执行add_reading_time方法
hooks.register(ARTICLE_CONTENT_HOOK_NAME, self.add_reading_time)
def add_reading_time(self, content, *args, **kwargs):
"""
计算阅读时间并添加到内容开头
核心功能计算文章阅读时间将结果添加到内容开头
"""
# 移除HTML标签和空白字符以获得纯文本
clean_content = re.sub(r'<[^>]*>', '', content)
clean_content = clean_content.strip()
# 1. 清理内容移除HTML标签避免标签干扰字数统计再去除首尾空白字符
clean_content = re.sub(r'<[^>]*>', '', content) # 匹配所有<...>格式的HTML标签并删除
clean_content = clean_content.strip() # 去除文本前后的空格、换行等空白字符
# 文和英文单词混合计数的一个简单方法
# 匹配中文字符或连续的非中文字符(视为单词)
# 2. 统计有效字数:支持中英文混合计数
# 正则匹配规则:匹配单个中文字符([\u4e00-\u9fa5])或连续的非中文字符(视为英文单词,\w+
words = re.findall(r'[\u4e00-\u9fa5]|\w+', clean_content)
word_count = len(words)
word_count = len(words) # 统计匹配到的字符/单词总数
# 按平均每分钟200字的速度计
reading_speed = 200
reading_minutes = math.ceil(word_count / reading_speed)
# 如果阅读时间少于1分钟则显示为1分钟
# 3. 计算阅读时间:按平均阅读速度估
reading_speed = 200 # 设定平均阅读速度每分钟200字中英文通用参考值
reading_minutes = math.ceil(word_count / reading_speed) # 向上取整避免0分钟的情况
# 4. 处理边界值若计算结果小于1分钟强制显示为1分钟符合用户认知
if reading_minutes < 1:
reading_minutes = 1
# 5. 构造阅读时间的HTML片段设置浅灰色文字斜体样式不干扰正文阅读
reading_time_html = f'<p style="color: #888;"><em>预计阅读时间:{reading_minutes} 分钟</em></p>'
# 6. 将阅读时间片段添加到文章内容开头,返回处理后的完整内容
return reading_time_html + content
plugin = ReadingTimePlugin()
# 实例化插件自动触发父类BasePlugin的初始化逻辑完成钩子注册使插件在系统中生效
plugin = ReadingTimePlugin()

@ -1,6 +1,6 @@
import json
from django.utils.html import strip_tags
from django.template.defaultfilters import truncatewords
from django.template.defaultfiltersfilters import truncatewords
from djangoblog.plugin_manage.base_plugin import BasePlugin
from djangoblog.plugin_manage import hooks
from blog.models import Article, Category, Tag
@ -8,22 +8,29 @@ from djangoblog.utils import get_blog_setting
class SeoOptimizerPlugin(BasePlugin):
PLUGIN_NAME = 'SEO 优化器'
PLUGIN_DESCRIPTION = '为文章、页面等提供 SEO 优化,动态生成 meta 标签和 JSON-LD 结构化数据。'
PLUGIN_VERSION = '0.2.0'
PLUGIN_AUTHOR = 'liuangliangyy'
# 插件元信息定义
PLUGIN_NAME = 'SEO 优化器' # 插件名称
PLUGIN_DESCRIPTION = '为文章、页面等提供 SEO 优化,动态生成 meta 标签和 JSON-LD 结构化数据。' # 功能描述
PLUGIN_VERSION = '0.2.0' # 版本号
PLUGIN_AUTHOR = 'liuangliangyy' # 作者
# 注册钩子将SEO生成逻辑绑定到'head_meta'钩子(页面头部元信息钩子)
def register_hooks(self):
hooks.register('head_meta', self.dispatch_seo_generation)
# 生成文章页面的SEO数据
def _get_article_seo_data(self, context, request, blog_setting):
# 从上下文获取文章对象验证是否为Article实例
article = context.get('article')
if not isinstance(article, Article):
return None
# 提取文章描述移除HTML标签截取前150字符
description = strip_tags(article.body)[:150]
# 提取关键词(标签名称组合,默认使用网站关键词)
keywords = ",".join([tag.name for tag in article.tags.all()]) or blog_setting.site_keywords
# 生成Open Graph社交分享meta标签
meta_tags = f'''
<meta property="og:type" content="article"/>
<meta property="og:title" content="{article.title}"/>
@ -34,49 +41,58 @@ class SeoOptimizerPlugin(BasePlugin):
<meta property="article:author" content="{article.author.username}"/>
<meta property="article:section" content="{article.category.name}"/>
'''
# 为每个标签添加meta标签
for tag in article.tags.all():
meta_tags += f'<meta property="article:tag" content="{tag.name}"/>'
meta_tags += f'<meta property="og:site_name" content="{blog_setting.site_name}"/>'
# 生成JSON-LD结构化数据供搜索引擎解析的标准化数据
structured_data = {
"@context": "https://schema.org",
"@type": "Article",
"mainEntityOfPage": {"@type": "WebPage", "@id": request.build_absolute_uri()},
"headline": article.title,
"description": description,
"image": request.build_absolute_uri(article.get_first_image_url()),
"image": request.build_absolute_uri(article.get_first_image_url()), # 文章首图
"datePublished": article.pub_time.isoformat(),
"dateModified": article.last_modify_time.isoformat(),
"author": {"@type": "Person", "name": article.author.username},
"publisher": {"@type": "Organization", "name": blog_setting.site_name}
}
# 若没有图片则移除image字段
if not structured_data.get("image"):
del structured_data["image"]
return {
"title": f"{article.title} | {blog_setting.site_name}",
"title": f"{article.title} | {blog_setting.site_name}", # 页面标题(含网站名)
"description": description,
"keywords": keywords,
"meta_tags": meta_tags,
"json_ld": structured_data
}
# 生成分类页面的SEO数据
def _get_category_seo_data(self, context, request, blog_setting):
# 从上下文获取分类名称
category_name = context.get('tag_name')
if not category_name:
return None
# 查询分类对象
category = Category.objects.filter(name=category_name).first()
if not category:
return None
# 构造页面标题、描述和关键词
title = f"{category.name} | {blog_setting.site_name}"
description = strip_tags(category.name) or blog_setting.site_description
keywords = category.name
# BreadcrumbList structured data for category page
breadcrumb_items = [{"@type": "ListItem", "position": 1, "name": "首页", "item": request.build_absolute_uri('/')}]
breadcrumb_items.append({"@type": "ListItem", "position": 2, "name": category.name, "item": request.build_absolute_uri()})
# 生成面包屑导航的JSON-LD数据提升页面结构可读性
breadcrumb_items = [
{"@type": "ListItem", "position": 1, "name": "首页", "item": request.build_absolute_uri('/')},
{"@type": "ListItem", "position": 2, "name": category.name, "item": request.build_absolute_uri()}
]
structured_data = {
"@context": "https://schema.org",
@ -88,12 +104,13 @@ class SeoOptimizerPlugin(BasePlugin):
"title": title,
"description": description,
"keywords": keywords,
"meta_tags": "",
"meta_tags": "", # 分类页暂不添加额外meta标签
"json_ld": structured_data
}
# 生成默认页面如首页的SEO数据
def _get_default_seo_data(self, context, request, blog_setting):
# Homepage and other default pages
# 生成网站级JSON-LD数据含搜索功能描述
structured_data = {
"@context": "https://schema.org",
"@type": "WebSite",
@ -101,36 +118,44 @@ class SeoOptimizerPlugin(BasePlugin):
"potentialAction": {
"@type": "SearchAction",
"target": f"{request.build_absolute_uri('/search/')}?q={{search_term_string}}",
"query-input": "required name=search_term_string"
"query-input": "required name=search_term_string" # 声明搜索框参数
}
}
return {
"title": f"{blog_setting.site_name} | {blog_setting.site_description}",
"title": f"{blog_setting.site_name} | {blog_setting.site_description}", # 首页标题
"description": blog_setting.site_description,
"keywords": blog_setting.site_keywords,
"meta_tags": "",
"json_ld": structured_data
}
# 分发SEO数据生成逻辑根据页面类型调用对应生成方法
def dispatch_seo_generation(self, metas, context):
# 从上下文获取请求对象
request = context.get('request')
if not request:
return metas
# 获取当前视图名称(判断页面类型)
view_name = request.resolver_match.view_name
# 获取博客全局设置
blog_setting = get_blog_setting()
# 根据不同页面类型生成对应SEO数据
seo_data = None
if view_name == 'blog:detailbyid':
if view_name == 'blog:detailbyid': # 文章详情页
seo_data = self._get_article_seo_data(context, request, blog_setting)
elif view_name == 'blog:category_detail':
elif view_name == 'blog:category_detail': # 分类详情页
seo_data = self._get_category_seo_data(context, request, blog_setting)
# 若未匹配到特定页面类型使用默认SEO数据
if not seo_data:
seo_data = self._get_default_seo_data(context, request, blog_setting)
# 生成JSON-LD脚本标签
json_ld_script = f'<script type="application/ld+json">{json.dumps(seo_data.get("json_ld", {}), ensure_ascii=False, indent=4)}</script>'
# 组合所有SEO标签并返回
return f"""
<title>{seo_data.get("title", "")}</title>
<meta name="description" content="{seo_data.get("description", "")}">
@ -139,4 +164,6 @@ class SeoOptimizerPlugin(BasePlugin):
{json_ld_script}
"""
# 实例化插件,自动注册钩子使其生效
plugin = SeoOptimizerPlugin()

@ -1,18 +1,25 @@
from djangoblog.plugin_manage.base_plugin import BasePlugin
from djangoblog.plugin_manage.base_plugin import BasePlugin
from djangoblog.plugin_manage import hooks
class ViewCountPlugin(BasePlugin):
PLUGIN_NAME = '文章浏览次数统计'
PLUGIN_DESCRIPTION = '统计文章的浏览次数'
PLUGIN_VERSION = '0.1.0'
PLUGIN_AUTHOR = 'liangliangyy'
# 插件元信息定义
PLUGIN_NAME = '文章浏览次数统计' # 插件名称,用于管理界面展示
PLUGIN_DESCRIPTION = '统计文章的浏览次数' # 插件功能描述
PLUGIN_VERSION = '0.1.0' # 插件版本号
PLUGIN_AUTHOR = 'liangliangyy' # 插件作者
# 注册钩子:将统计逻辑绑定到文章内容获取后的钩子
def register_hooks(self):
# 当系统触发'after_article_body_get'钩子文章内容加载完成后执行record_view方法
hooks.register('after_article_body_get', self.record_view)
# 核心功能:记录文章浏览次数
def record_view(self, article, *args, **kwargs):
# 调用文章对象的viewed()方法,实现浏览次数+1的逻辑
# 注viewed()方法需在Article模型中预先定义通常包含计数器自增和保存操作
article.viewed()
plugin = ViewCountPlugin()
# 实例化插件:自动触发钩子注册,使插件生效
plugin = ViewCountPlugin()

@ -5,28 +5,66 @@ from djangoblog.utils import cache
class MemcacheStorage(SessionStorage):
"""
基于Memcache的微信WeRoBot会话存储实现用于在微信机器人开发中持久 持久化存储用户会话数据如对话状态临时信息等
"""
def __init__(self, prefix='ws_'):
self.prefix = prefix
self.cache = cache
"""
初始化会话存储
:param prefix: 缓存键的前缀用于区分不同类型的缓存数据默认'ws_'WeChat Session缩写
"""
self.prefix = prefix # 缓存键前缀
self.cache = cache # 引入Django项目中的缓存实例通常配置为Memcache
@property
def is_available(self):
value = "1"
self.set('checkavaliable', value=value)
return value == self.get('checkavaliable')
"""
检查缓存存储是否可用
:return: 布尔值True表示可用False表示不可用
"""
test_value = "1"
# 尝试写入测试数据
self.set('check_available', value=test_value)
# 读取测试数据并对比,验证缓存读写功能是否正常
return test_value == self.get('check_available')
def key_name(self, s):
return '{prefix}{s}'.format(prefix=self.prefix, s=s)
"""
生成带前缀的缓存键名避免不同类型数据的键冲突
:param s: 原始键名如用户ID
:return: 带前缀的完整键名'ws_user123'
"""
return f'{self.prefix}{s}'
def get(self, id):
id = self.key_name(id)
session_json = self.cache.get(id) or '{}'
"""
从缓存中获取会话数据
:param id: 会话ID通常为用户标识如OpenID
:return: 反序列化后的会话数据字典
"""
# 生成带前缀的键名
cache_key = self.key_name(id)
# 从缓存读取数据默认返回空JSON字符串'{}'
session_json = self.cache.get(cache_key) or '{}'
# 将JSON字符串反序列化为Python字典
return json_loads(session_json)
def set(self, id, value):
id = self.key_name(id)
self.cache.set(id, json_dumps(value))
"""
将会话数据存入缓存
:param id: 会话ID
:param value: 要存储的会话数据字典类型
"""
cache_key = self.key_name(id)
# 将数据字典序列化为JSON字符串后存入缓存
self.cache.set(cache_key, json_dumps(value))
def delete(self, id):
id = self.key_name(id)
self.cache.delete(id)
"""
从缓存中删除会话数据
:param id: 会话ID
"""
cache_key = self.key_name(id)
# 删除指定键的缓存数据
self.cache.delete(cache_key)

@ -2,18 +2,36 @@ from django.contrib import admin
# Register your models here.
# 定义Commands模型在Admin后台的管理配置类
class CommandsAdmin(admin.ModelAdmin):
# 配置列表页显示的字段:命令标题、命令内容、命令描述
# 作用在Admin后台查看Commands列表时直接展示这三个核心字段无需点击进入详情页
list_display = ('title', 'command', 'describe')
# 定义EmailSendLog模型在Admin后台的管理配置类
class EmailSendLogAdmin(admin.ModelAdmin):
# 配置列表页显示的字段:邮件标题、收件人、发送结果、创建时间
# 作用:快速预览邮件发送的关键信息,尤其是发送结果(成功/失败)和时间
list_display = ('title', 'emailto', 'send_result', 'creation_time')
# 配置详情页的只读字段:用户无法修改这些字段的值
# 作用:邮件发送记录属于日志类数据,通常不允许手动编辑,确保数据真实性
readonly_fields = (
'title',
'emailto',
'send_result',
'creation_time',
'content')
'title', # 邮件标题
'emailto', # 收件人
'send_result', # 发送结果True/False
'creation_time',# 记录创建时间
'content' # 邮件内容
)
# 重写添加权限方法禁止在Admin后台手动添加邮件发送记录
# 作用:邮件发送记录应由系统自动生成(如发送邮件时触发),避免手动录入虚假日志
def has_add_permission(self, request):
return False
# 注意需补充模型注册代码才能在Admin后台显示示例如下
# from .models import commands, EmailSendLog
# admin.site.register(commands, CommandsAdmin)
# admin.site.register(EmailSendLog, EmailSendLogAdmin)

@ -5,23 +5,51 @@ from blog.models import Article, Category
class BlogApi:
def __init__(self):
# 初始化Haystack的搜索查询集SearchQuerySet
self.searchqueryset = SearchQuerySet()
# 执行空关键词自动查询,初始化查询集(无实际筛选,仅构建基础查询对象)
self.searchqueryset.auto_query('')
# 定义默认最大返回数量最多返回8条数据
self.__max_takecount__ = 8
def search_articles(self, query):
"""
根据关键词搜索文章
:param query: 搜索关键词
:return: 匹配的文章查询集最多8条
"""
# 基于关键词执行自动查询Haystack会处理分词、索引匹配等逻辑
sqs = self.searchqueryset.auto_query(query)
# 加载关联的完整模型数据(避免后续访问关联字段时触发额外数据库查询)
sqs = sqs.load_all()
# 返回前N条结果N=__max_takecount__
return sqs[:self.__max_takecount__]
def get_category_lists(self):
"""
获取所有文章分类
:return: 所有Category模型对象的查询集
"""
# 查询并返回所有分类(无数量限制,因分类通常数量较少)
return Category.objects.all()
def get_category_articles(self, categoryname):
"""
根据分类名称获取该分类下的文章
:param categoryname: 分类名称
:return: 该分类下的文章查询集最多8条无匹配时返回None
"""
# 筛选分类名称匹配的文章通过外键category关联查询
articles = Article.objects.filter(category__name=categoryname)
# 若有匹配文章返回前8条无匹配则返回None
if articles:
return articles[:self.__max_takecount__]
return None
def get_recent_articles(self):
return Article.objects.all()[:self.__max_takecount__]
"""
获取最新发布的文章
:return: 最新的文章查询集最多8条
"""
# 查询所有文章默认按主键降序通常主键自增等价于按发布时间降序返回前8条
return Article.objects.all()[:self.__max_takecount__]

@ -3,62 +3,100 @@ import os
import openai
# 注意原代码导入可能存在拼写问题推测应为从servermanager.models导入Commands模型首字母通常大写
from servermanager.models import commands
# 初始化日志记录器,用于记录操作日志和错误信息
logger = logging.getLogger(__name__)
# 从环境变量获取OpenAI API密钥并配置
openai.api_key = os.environ.get('OPENAI_API_KEY')
# 若环境变量中配置了HTTP代理为OpenAI客户端设置代理
if os.environ.get('HTTP_PROXY'):
openai.proxy = os.environ.get('HTTP_PROXY')
class ChatGPT:
"""封装OpenAI GPT-3.5-turbo模型的对话功能"""
@staticmethod
def chat(prompt):
"""
调用GPT-3.5-turbo模型生成对话响应
:param prompt: 用户输入的提示词
:return: 模型生成的响应内容出错时返回错误提示
"""
try:
completion = openai.ChatCompletion.create(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}])
# 调用OpenAI的ChatCompletion接口使用gpt-3.5-turbo模型
completion = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}] # 构造用户角色的对话消息
)
# 提取并返回模型生成的内容取第一个选项的message内容
return completion.choices[0].message.content
except Exception as e:
# 记录异常信息到日志
logger.error(e)
# 返回友好的错误提示给用户
return "服务器出错了"
class CommandHandler:
"""处理命令查询、执行与帮助信息展示的类"""
def __init__(self):
"""初始化加载所有已配置的命令从commands模型查询"""
self.commands = commands.objects.all()
def run(self, title):
"""
运行命令
:param title: 命令
:return: 返回命令执行结果
根据命令标题查找并执行对应的命令
:param title: 用户输入的命令标题不区分大小写
:return: 命令执行结果未找到命令时返回帮助提示
"""
# 过滤出标题(不区分大小写)匹配的命令
cmd = list(
filter(
lambda x: x.title.upper() == title.upper(),
self.commands))
self.commands
)
)
# 若找到匹配命令,执行命令;否则返回未找到提示
if cmd:
return self.__run_command__(cmd[0].command)
else:
return "未找到相关命令请输入hepme获得帮助。"
return "未找到相关命令请输入hepme获得帮助。" # 注意:推测"hepme"应为"help"的笔误
def __run_command__(self, cmd):
"""
私有方法执行系统命令并返回结果
:param cmd: 要执行的系统命令字符串
:return: 命令执行输出结果出错时返回错误提示
"""
try:
# 使用os.popen执行命令并读取输出os.popen安全性较低不建议执行用户输入的未知命令
res = os.popen(cmd).read()
return res
except BaseException:
return '命令执行出错!'
def get_help(self):
"""
生成所有命令的帮助信息
:return: 包含命令标题和描述的字符串每行一条命令
"""
rsp = ''
# 遍历所有命令,拼接"命令标题:命令描述"格式的帮助信息
for cmd in self.commands:
rsp += '{c}:{d}\n'.format(c=cmd.title, d=cmd.describe)
return rsp
# 主程序入口测试ChatGPT类功能
if __name__ == '__main__':
# 实例化ChatGPT对象
chatbot = ChatGPT()
# 定义测试提示词生成1000字关于AI的论文
prompt = "写一篇1000字关于AI的论文"
# 调用chat方法并打印结果原代码缺少右括号此处已补充
print(chatbot.chat(prompt))

@ -2,4 +2,17 @@ from django.apps import AppConfig
class ServermanagerConfig(AppConfig):
# 定义Django应用“servermanager”的配置类用于管理应用的元数据和初始化行为
# 指定应用的唯一标识名称,必须与应用目录名一致
# 项目中通过该名称引用此应用如在INSTALLED_APPS注册、迁移命令指定应用等
name = 'servermanager'
# 可选扩展配置(当前未设置,可根据需求添加):
# 1. 应用的可读名称用于Admin后台等界面显示默认显示“servermanager”
# verbose_name = '服务器管理'
# 2. 模型默认主键字段类型Django 3.2+推荐显式指定,避免版本兼容问题)
# default_auto_field = 'django.db.models.BigAutoField'
# 3. 应用就绪时的初始化操作(如注册信号、加载扩展功能等)
# def ready(self):
# import servermanager.signals # 导入信号模块

@ -1,45 +1,73 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
# 由 Django 4.1.7 于 2023年3月2日 07:14 自动生成
# 该文件是Django的数据迁移文件用于初始化创建两个核心数据模型的数据库表结构
from django.db import migrations, models
class Migration(migrations.Migration):
# 标记为初始迁移(首次创建模型时使用,无前置迁移依赖)
initial = True
# 依赖的其他迁移文件:初始迁移无依赖,为空列表
dependencies = [
]
# 迁移操作列表:包含两个模型的创建操作
operations = [
# 1. 创建 "commands" 模型(用于存储预设系统命令)
migrations.CreateModel(
name='commands',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 自增主键字段:大整数类型,自动创建,作为主键
('id', models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name='ID'
)),
# 命令标题字段字符串类型最大长度300用于标识命令如"查看日志"
('title', models.CharField(max_length=300, verbose_name='命令标题')),
# 命令内容字段字符串类型最大长度2000存储实际执行的系统命令如"ls /var/log"
('command', models.CharField(max_length=2000, verbose_name='命令')),
# 命令描述字段字符串类型最大长度300说明命令功能如"查看系统日志目录内容"
('describe', models.CharField(max_length=300, verbose_name='命令描述')),
# 创建时间字段:自动记录模型创建时的时间,后续不自动更新
('created_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
# 修改时间字段:自动记录模型每次更新时的时间
('last_mod_time', models.DateTimeField(auto_now=True, verbose_name='修改时间')),
],
# 模型元配置
options={
'verbose_name': '命令',
'verbose_name_plural': '命令',
'verbose_name': '命令', # 模型单数显示名称如Admin后台中
'verbose_name_plural': '命令', # 模型复数显示名称(与单数一致,避免中文复数歧义)
},
),
# 2. 创建 "EmailSendLog" 模型(用于记录邮件发送历史)
migrations.CreateModel(
name='EmailSendLog',
fields=[
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 自增主键字段:大整数类型,自动创建,作为主键
('id', models.BigAutoField(
auto_created=True,
primary_key=True,
serialize=False,
verbose_name='ID'
)),
# 收件人字段字符串类型最大长度300存储收件人邮箱可多个用逗号分隔
('emailto', models.CharField(max_length=300, verbose_name='收件人')),
# 邮件标题字段字符串类型最大长度2000存储邮件主题
('title', models.CharField(max_length=2000, verbose_name='邮件标题')),
# 邮件内容字段:文本类型,存储邮件正文(支持长文本,无长度限制)
('content', models.TextField(verbose_name='邮件内容')),
# 发送结果字段布尔类型默认值False未成功标记邮件是否发送成功
('send_result', models.BooleanField(default=False, verbose_name='结果')),
# 创建时间字段:自动记录邮件发送记录创建时的时间
('created_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
],
# 模型元配置
options={
'verbose_name': '邮件发送log',
'verbose_name_plural': '邮件发送log',
'ordering': ['-created_time'],
'verbose_name': '邮件发送log', # 模型单数显示名称
'verbose_name_plural': '邮件发送log', # 模型复数显示名称
'ordering': ['-created_time'], # 默认排序:按创建时间降序(最新记录在前)
},
),
]

@ -1,32 +1,41 @@
# Generated by Django 4.2.5 on 2023-09-06 13:19
# 由 Django 4.2.5 于 2023年9月6日 13:19 自动生成
# 该文件是Django的数据迁移文件用于修改已存在的模型字段名和配置属于数据库结构的迭代更新
from django.db import migrations
class Migration(migrations.Migration):
# 依赖的迁移文件:依赖于'servermanager'应用下的初始迁移0001_initial
# 意味着执行当前迁移前必须先完成0001_initial迁移即创建commands和EmailSendLog模型的表结构
dependencies = [
('servermanager', '0001_initial'),
]
# 迁移操作列表:包含模型配置修改和字段重命名两类操作
operations = [
# 1. 修改EmailSendLog模型的配置选项
migrations.AlterModelOptions(
name='emailsendlog',
name='emailsendlog', # 目标模型名称
# 更新后的模型选项:
# - 排序规则改为按'creation_time'降序(原先是按'created_time'降序,因字段重命名同步调整)
# - 单数和复数显示名称保持不变(仍为"邮件发送log"
options={'ordering': ['-creation_time'], 'verbose_name': '邮件发送log', 'verbose_name_plural': '邮件发送log'},
),
# 2. 重命名commands模型的"created_time"字段为"creation_time"
migrations.RenameField(
model_name='commands',
old_name='created_time',
new_name='creation_time',
model_name='commands', # 目标模型名称
old_name='created_time', # 原字段名
new_name='creation_time', # 新字段名
),
# 3. 重命名commands模型的"last_mod_time"字段为"last_modify_time"
migrations.RenameField(
model_name='commands',
old_name='last_mod_time',
new_name='last_modify_time',
model_name='commands', # 目标模型名称
old_name='last_mod_time', # 原字段名(简写形式)
new_name='last_modify_time', # 新字段名(完整形式,统一命名风格)
),
# 4. 重命名EmailSendLog模型的"created_time"字段为"creation_time"
migrations.RenameField(
model_name='emailsendlog',
old_name='created_time',
new_name='creation_time',
model_name='emailsendlog', # 目标模型名称
old_name='created_time', # 原字段名
new_name='creation_time', # 新字段名
),
]
]

@ -3,31 +3,51 @@ from django.db import models
# Create your models here.
class commands(models.Model):
"""
存储预设系统命令的模型用于管理可执行的系统指令
"""
# 命令标题:用于标识命令(如"查看系统状态"字符串类型最大长度300
title = models.CharField('命令标题', max_length=300)
# 命令内容:实际执行的系统命令字符串(如"df -h"最大长度2000
command = models.CharField('命令', max_length=2000)
# 命令描述:说明命令的功能和用途,方便管理员理解
describe = models.CharField('命令描述', max_length=300)
# 创建时间:自动记录命令添加时间,创建时自动填充,后续不更新
creation_time = models.DateTimeField('创建时间', auto_now_add=True)
# 修改时间:自动记录命令最后一次修改时间,每次保存时更新
last_modify_time = models.DateTimeField('修改时间', auto_now=True)
# 定义模型实例的字符串表示形式,返回命令标题
def __str__(self):
return self.title
# 模型元数据配置
class Meta:
verbose_name = '命令'
verbose_name_plural = verbose_name
verbose_name = '命令' # 模型的单数显示名称
verbose_name_plural = verbose_name # 复数显示名称与单数一致
class EmailSendLog(models.Model):
"""
记录邮件发送历史的日志模型用于追踪邮件发送状态
"""
# 收件人存储收件人邮箱地址多个邮箱用逗号分隔最大长度300
emailto = models.CharField('收件人', max_length=300)
# 邮件标题存储邮件的主题最大长度2000
title = models.CharField('邮件标题', max_length=2000)
# 邮件内容:存储邮件正文,文本类型(无长度限制)
content = models.TextField('邮件内容')
# 发送结果布尔值标记邮件是否发送成功默认False未成功
send_result = models.BooleanField('结果', default=False)
# 创建时间:自动记录日志创建时间(即邮件发送时间)
creation_time = models.DateTimeField('创建时间', auto_now_add=True)
# 定义模型实例的字符串表示形式,返回邮件标题
def __str__(self):
return self.title
# 模型元数据配置
class Meta:
verbose_name = '邮件发送log'
verbose_name_plural = verbose_name
ordering = ['-creation_time']
verbose_name = '邮件发送log' # 模型的单数显示名称
verbose_name_plural = verbose_name # 复数显示名称与单数一致
ordering = ['-creation_time'] # 默认按创建时间降序排序(最新记录在前)

@ -13,175 +13,222 @@ from servermanager.api.blogapi import BlogApi
from servermanager.api.commonapi import ChatGPT, CommandHandler
from .MemcacheStorage import MemcacheStorage
robot = WeRoBot(token=os.environ.get('DJANGO_WEROBOT_TOKEN')
or 'lylinux', enable_session=True)
# 初始化微信机器人WeRoBot
# 从环境变量获取Token默认值为'lylinux';启用会话功能以保存用户状态
robot = WeRoBot(
token=os.environ.get('DJANGO_WEROBOT_TOKEN') or 'lylinux',
enable_session=True
)
# 配置会话存储优先使用Memcache失败则降级为文件存储
memstorage = MemcacheStorage()
if memstorage.is_available:
if memstorage.is_available: # 检查Memcache是否可用
robot.config['SESSION_STORAGE'] = memstorage
else:
if os.path.exists(os.path.join(settings.BASE_DIR, 'werobot_session')):
os.remove(os.path.join(settings.BASE_DIR, 'werobot_session'))
# 清理旧的文件存储,避免残留数据
session_file = os.path.join(settings.BASE_DIR, 'werobot_session')
if os.path.exists(session_file):
os.remove(session_file)
# 使用文件存储会话数据适合开发或Memcache不可用场景
robot.config['SESSION_STORAGE'] = FileStorage(filename='werobot_session')
blogapi = BlogApi()
cmd_handler = CommandHandler()
logger = logging.getLogger(__name__)
# 初始化依赖组件
blogapi = BlogApi() # 博客数据接口(文章搜索、分类查询等)
cmd_handler = CommandHandler() # 系统命令处理(执行预设命令)
logger = logging.getLogger(__name__) # 日志记录器
def convert_to_article_reply(articles, message):
"""
将文章列表转换为微信公众号的图文消息回复格式
:param articles: 文章对象列表
:param message: 微信接收的消息对象用于构建回复
:return: ArticlesReply 图文回复对象
"""
reply = ArticlesReply(message=message)
# 导入自定义模板标签,用于截取文章内容作为描述
from blog.templatetags.blog_tags import truncatechars_content
for post in articles:
# 正则提取文章正文中的第一张图片(作为图文消息封面)
imgs = re.findall(r'(?:http\:|https\:)?\/\/.*\.(?:png|jpg)', post.body)
imgurl = ''
if imgs:
imgurl = imgs[0]
imgurl = imgs[0] if imgs else '' # 无图片则用空字符串
# 构建单条图文信息
article = Article(
title=post.title,
description=truncatechars_content(post.body),
img=imgurl,
url=post.get_full_url()
title=post.title, # 文章标题
description=truncatechars_content(post.body), # 截取内容作为描述
img=imgurl, # 封面图片URL
url=post.get_full_url() # 文章详情页URL
)
reply.add_article(article)
reply.add_article(article) # 添加到图文回复中
return reply
@robot.filter(re.compile(r"^\?.*"))
# ------------------------------ 微信消息处理过滤器 ------------------------------
@robot.filter(re.compile(r"^\?.*")) # 匹配以"?"开头的消息(文章搜索)
def search(message, session):
s = message.content
searchstr = str(s).replace('?', '')
result = blogapi.search_articles(searchstr)
"""处理文章搜索:输入“?关键词”返回匹配的图文消息"""
searchstr = message.content.replace('?', '') # 提取关键词(去除开头的"?"
result = blogapi.search_articles(searchstr) # 调用博客接口搜索文章
if result:
articles = list(map(lambda x: x.object, result))
reply = convert_to_article_reply(articles, message)
return reply
# 将搜索结果SearchQuerySet转换为文章对象列表
articles = [x.object for x in result]
return convert_to_article_reply(articles, message) # 返回图文回复
else:
return '没有找到相关文章。'
return '没有找到相关文章。' # 无结果提示
@robot.filter(re.compile(r'^category\s*$', re.I))
@robot.filter(re.compile(r'^category\s*$', re.I)) # 匹配"category"(不区分大小写)
def category(message, session):
categorys = blogapi.get_category_lists()
content = ','.join(map(lambda x: x.name, categorys))
return '所有文章分类目录:' + content
"""获取所有文章分类输入“category”返回分类列表"""
categorys = blogapi.get_category_lists() # 调用接口获取所有分类
# 拼接分类名称为字符串如“Python,Java,前端”)
category_names = ','.join([x.name for x in categorys])
return f'所有文章分类目录:{category_names}'
@robot.filter(re.compile(r'^recent\s*$', re.I))
@robot.filter(re.compile(r'^recent\s*$', re.I)) # 匹配"recent"(不区分大小写)
def recents(message, session):
articles = blogapi.get_recent_articles()
"""获取最新文章输入“recent”返回最新文章的图文消息"""
articles = blogapi.get_recent_articles() # 调用接口获取最新文章
if articles:
reply = convert_to_article_reply(articles, message)
return reply
return convert_to_article_reply(articles, message) # 返回图文回复
else:
return "暂时还没有文章"
return "暂时还没有文章" # 无文章提示
@robot.filter(re.compile('^help$', re.I))
@robot.filter(re.compile('^help$', re.I)) # 匹配"help"(不区分大小写)
def help(message, session):
"""获取帮助输入“help”返回功能说明"""
return '''欢迎关注!
默认会与图灵机器人聊天~~
你可以通过下面这些命令来获得信息
?关键字搜索文章.
?python.
category获得文章分类目录及文章数.
category-***获得该分类目录文章
如category-python
recent获得最新文章
help获得帮助.
weather:获得天气
如weather:西安
idcard:获得身份证信息
如idcard:61048119xxxxxxxxxx
music:音乐搜索
如music:阴天快乐
PS:以上标点符号都不支持中文标点~~
'''
@robot.filter(re.compile(r'^weather\:.*$', re.I))
默认会与图灵机器人聊天~~
你可以通过下面这些命令来获得信息
1. ?关键字 搜索文章?python
2. category 获得文章分类目录
3. category-*** 获得该分类下的文章如category-python
4. recent 获得最新文章
5. help 获得帮助
6. weather:城市 获得天气如weather:西安
7. idcard:号码 获得身份证信息如idcard:61048119xxxxxxxxxx
8. music:歌名 音乐搜索如music:阴天快乐
PS: 以上标点符号不支持中文标点~~
'''
@robot.filter(re.compile(r'^weather\:.*$', re.I)) # 匹配"weather:城市"格式
def weather(message, session):
"""天气查询(待开发):返回“建设中”提示"""
return "建设中..."
@robot.filter(re.compile(r'^idcard\:.*$', re.I))
@robot.filter(re.compile(r'^idcard\:.*$', re.I)) # 匹配"idcard:号码"格式
def idcard(message, session):
"""身份证信息查询(待开发):返回“建设中”提示"""
return "建设中..."
@robot.handler
# ------------------------------ 默认消息处理器 ------------------------------
@robot.handler # 未被上述过滤器匹配的消息,进入此默认处理器
def echo(message, session):
"""默认消息处理转发给MessageHandler处理用户状态管理、管理员命令等"""
handler = MessageHandler(message, session)
return handler.handler()
# ------------------------------ 用户状态与命令管理 ------------------------------
class MessageHandler:
"""处理用户消息的核心类:管理用户状态(普通用户/管理员)、执行管理员命令"""
def __init__(self, message, session):
userid = message.source
self.message = message
self.session = session
self.userid = userid
self.message = message # 微信消息对象
self.session = session # 会话存储(保存用户状态)
self.userid = message.source # 用户唯一标识微信OpenID
# 从会话中加载用户状态用jsonpickle反序列化
try:
info = session[userid]
self.userinfo = jsonpickle.decode(info)
except Exception as e:
userinfo = WxUserInfo()
self.userinfo = userinfo
user_info_json = session[self.userid]
self.userinfo = jsonpickle.decode(user_info_json)
except Exception:
# 会话中无用户状态,初始化新的用户信息
self.userinfo = WxUserInfo()
@property
def is_admin(self):
"""判断当前用户是否处于“管理员模式”"""
return self.userinfo.isAdmin
@property
def is_password_set(self):
"""判断管理员是否已通过密码验证"""
return self.userinfo.isPasswordSet
def save_session(self):
info = jsonpickle.encode(self.userinfo)
self.session[self.userid] = info
"""将用户状态序列化后保存到会话"""
user_info_json = jsonpickle.encode(self.userinfo)
self.session[self.userid] = user_info_json
def handler(self):
info = self.message.content
"""核心处理逻辑:根据用户状态分发消息处理"""
user_input = self.message.content # 用户输入的内容
if self.userinfo.isAdmin and info.upper() == 'EXIT':
self.userinfo = WxUserInfo()
# 1. 管理员退出已验证的管理员输入“EXIT”退出管理员模式
if self.is_admin and self.is_password_set and user_input.upper() == 'EXIT':
self.userinfo = WxUserInfo() # 重置用户状态为普通用户
self.save_session()
return "退出成功"
if info.upper() == 'ADMIN':
self.userinfo.isAdmin = True
# 2. 进入管理员模式普通用户输入“ADMIN”触发管理员验证流程
if user_input.upper() == 'ADMIN' and not self.is_admin:
self.userinfo.isAdmin = True # 标记为管理员模式
self.save_session()
return "输入管理员密码"
if self.userinfo.isAdmin and not self.userinfo.isPasswordSet:
passwd = settings.WXADMIN
if settings.TESTING:
passwd = '123'
if passwd.upper() == get_sha256(get_sha256(info)).upper():
self.userinfo.isPasswordSet = True
# 3. 管理员密码验证:处于管理员模式但未验证密码
if self.is_admin and not self.is_password_set:
# 获取正确密码(测试环境用'123'正式环境用settings中的WXADMIN
correct_passwd = '123' if settings.TESTING else settings.WXADMIN
# 密码加密比对两次SHA256加密避免明文传输风险
input_passwd = get_sha256(get_sha256(user_input)).upper()
if input_passwd == correct_passwd.upper():
self.userinfo.isPasswordSet = True # 标记为已验证
self.save_session()
return "验证通过,请输入命令或者要执行的命令代码:输入helpme获得帮助"
return "验证通过,请输入命令或执行代码:输入helpme获得帮助"
else:
# 密码错误次数限制3次后重置管理员模式
self.userinfo.Count += 1
if self.userinfo.Count >= 3:
self.userinfo = WxUserInfo()
self.userinfo = WxUserInfo() # 重置为普通用户
self.save_session()
return "超过验证次数"
self.userinfo.Count += 1
return "超过验证次数,已退出管理员模式"
self.save_session()
return "验证失败,请重新输入管理员密码:"
if self.userinfo.isAdmin and self.userinfo.isPasswordSet:
if self.userinfo.Command != '' and info.upper() == 'Y':
return f"验证失败(剩余{3 - self.userinfo.Count}次),请重新输入管理员密码:"
# 4. 管理员命令执行:已验证的管理员输入命令
if self.is_admin and self.is_password_set:
# 确认执行命令若之前已输入命令且当前输入“Y”则执行
if self.userinfo.Command != '' and user_input.upper() == 'Y':
return cmd_handler.run(self.userinfo.Command)
# 查看帮助输入“helpme”返回命令列表
elif user_input.upper() == 'HELPME':
return cmd_handler.get_help()
# 暂存命令:输入新命令,提示确认
else:
if info.upper() == 'HELPME':
return cmd_handler.get_help()
self.userinfo.Command = info
self.userinfo.Command = user_input
self.save_session()
return "确认执行: " + info + " 命令?"
return f"确认执行命令:{user_input}输入Y执行"
return ChatGPT.chat(info)
# 5. 普通用户默认转发给ChatGPT生成回复
return ChatGPT.chat(user_input)
class WxUserInfo():
class WxUserInfo:
"""用户状态类:存储用户是否为管理员、密码验证状态、命令暂存等信息"""
def __init__(self):
self.isAdmin = False
self.isPasswordSet = False
self.Count = 0
self.Command = ''
self.isAdmin = False # 是否处于管理员模式(默认否)
self.isPasswordSet = False # 是否已通过密码验证(默认否)
self.Count = 0 # 密码错误次数默认0
self.Command = '' # 暂存的管理员命令(默认空)

@ -1,6 +1,6 @@
from django.test import Client, RequestFactory, TestCase
from django.utils import timezone
from werobot.messages.messages import TextMessage
from werobot.messages.messages.messages import TextMessage
from accounts.models import BlogUser
from blog.models import Category, Article
@ -12,68 +12,100 @@ from .robot import search, category, recents
# Create your tests here.
class ServerManagerTest(TestCase):
"""测试servermanager应用的核心功能包括ChatGPT调用、文章查询、命令执行和消息处理逻辑"""
def setUp(self):
self.client = Client()
self.factory = RequestFactory()
"""测试前的初始化工作:创建测试客户端、请求工厂,为后续测试准备基础数据"""
self.client = Client() # Django测试客户端用于模拟HTTP请求
self.factory = RequestFactory() # 请求工厂,用于构建测试请求对象
def test_chat_gpt(self):
content = ChatGPT.chat("你好")
self.assertIsNotNone(content)
"""测试ChatGPT接口调用是否正常返回结果"""
content = ChatGPT.chat("你好") # 调用ChatGPT发送简单问候
self.assertIsNotNone(content) # 断言返回结果不为空(验证接口可用)
def test_validate_comment(self):
"""综合测试:文章搜索、分类查询、命令执行、消息处理器等功能"""
# 1. 创建测试用户(超级管理员)并登录
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="liangliangyy1")
self.client.login(username='liangliangyy1', password='liangliangyy1')
c = Category()
c.name = "categoryccc"
c.save()
article = Article()
article.title = "nicetitleccc"
article.body = "nicecontentccc"
article.author = user
article.category = c
article.type = 'a'
article.status = 'p'
article.save()
s = TextMessage([])
s.content = "nice"
rsp = search(s, None)
rsp = category(None, None)
self.assertIsNotNone(rsp)
rsp = recents(None, None)
self.assertTrue(rsp != '暂时还没有文章')
cmd = commands()
cmd.title = "test"
cmd.command = "ls"
cmd.describe = "test"
cmd.save()
cmdhandler = CommandHandler()
rsp = cmdhandler.run('test')
self.assertIsNotNone(rsp)
s.source = 'u'
s.content = 'test'
msghandler = MessageHandler(s, {})
# msghandler.userinfo.isPasswordSet = True
# msghandler.userinfo.isAdmin = True
msghandler.handler()
s.content = 'y'
msghandler.handler()
s.content = 'idcard:12321233'
msghandler.handler()
s.content = 'weather:上海'
msghandler.handler()
s.content = 'admin'
msghandler.handler()
s.content = '123'
msghandler.handler()
s.content = 'exit'
msghandler.handler()
password="liangliangyy1"
)
self.client.login(username='liangliangyy1', password='liangliangyy1') # 模拟登录
# 2. 创建测试分类和文章(用于后续搜索、分类查询测试)
test_category = Category(name="categoryccc")
test_category.save() # 保存分类到数据库
test_article = Article(
title="nicetitleccc",
body="nicecontentccc",
author=user,
category=test_category,
type='a', # 假设'a'表示文章类型
status='p' # 假设'p'表示已发布
)
test_article.save() # 保存文章到数据库
# 3. 测试文章搜索功能
# 创建模拟文本消息(内容为"nice",用于搜索)
search_msg = TextMessage([])
search_msg.content = "nice"
search_rsp = search(search_msg, None) # 调用搜索函数
# (此处未断言搜索结果,因实际结果依赖搜索配置,仅验证无异常)
# 4. 测试分类查询功能
category_rsp = category(None, None) # 调用分类查询函数
self.assertIsNotNone(category_rsp) # 断言返回结果不为空
# 5. 测试最新文章查询功能
recents_rsp = recents(None, None) # 调用最新文章查询函数
self.assertTrue(recents_rsp != '暂时还没有文章') # 断言返回结果不是"无文章"提示
# 6. 测试命令执行功能
# 创建测试命令
test_cmd = commands(
title="test",
command="ls", # 简单的系统命令(列出目录内容)
describe="test"
)
test_cmd.save() # 保存命令到数据库
cmd_handler = CommandHandler()
cmd_rsp = cmd_handler.run('test') # 执行"test"命令
self.assertIsNotNone(cmd_rsp) # 断言命令执行结果不为空
# 7. 测试消息处理器MessageHandler的各种场景
# 模拟用户消息(来源为'u',内容为'test'
msg = TextMessage([])
msg.source = 'u'
msg.content = 'test'
msg_handler = MessageHandler(msg, {}) # 初始化消息处理器(空会话)
# 7.1 测试普通消息处理(非管理员模式)
msg_handler.handler() # 处理内容为'test'的消息
# 7.2 测试命令确认(假设已处于管理员模式,输入'y'确认执行)
msg.content = 'y'
msg_handler.handler()
# 7.3 测试待开发功能(身份证查询)
msg.content = 'idcard:12321233'
msg_handler.handler()
# 7.4 测试待开发功能(天气查询)
msg.content = 'weather:上海'
msg_handler.handler()
# 7.5 测试进入管理员模式(输入'admin'
msg.content = 'admin'
msg_handler.handler()
# 7.6 测试管理员密码验证(输入'123'
msg.content = '123'
msg_handler.handler()
# 7.7 测试退出管理员模式(输入'exit'
msg.content = 'exit'
msg_handler.handler()

@ -1,10 +1,15 @@
from django.urls import path
from werobot.contrib.django import make_view
# 导入微信机器人实例已在robot.py中初始化配置
from .robot import robot
# 定义应用的命名空间用于在模板和反向解析中标识该应用的URL
app_name = "servermanager"
# URL路由配置将微信机器人视图绑定到指定路径
urlpatterns = [
# 将robot实例通过make_view转换为Django视图绑定到'/robot'路径
# 微信公众号服务器会将用户消息POST到该路径由robot实例处理
path(r'robot', make_view(robot)),
]

@ -1 +1,2 @@
print('hello world')
# This is the main Python file

Loading…
Cancel
Save