master
周潇潇 1 month ago
parent 15b198fde5
commit df7ae3a5fd

@ -1,32 +1,39 @@
# MemcacheStorage.py
from werobot.session import SessionStorage
from werobot.utils import json_loads, json_dumps
from djangoblog.utils import cache
from djangoblog.utils import cache # 导入Django缓存工具
class MemcacheStorage(SessionStorage):
# 基于Memcache的会话存储实现
def __init__(self, prefix='ws_'):
self.prefix = prefix
self.cache = cache
self.prefix = prefix # 缓存键前缀
self.cache = cache # Django缓存实例
@property
def is_available(self):
# 检查Memcache是否可用
value = "1"
self.set('checkavaliable', value=value)
return value == self.get('checkavaliable')
def key_name(self, s):
# 生成完整的缓存键名
return '{prefix}{s}'.format(prefix=self.prefix, s=s)
def get(self, id):
# 从缓存获取会话数据
id = self.key_name(id)
session_json = self.cache.get(id) or '{}'
return json_loads(session_json)
session_json = self.cache.get(id) or '{}' # 如果不存在返回空字典的JSON
return json_loads(session_json) # 解析JSON字符串为Python对象
def set(self, id, value):
# 设置会话数据到缓存
id = self.key_name(id)
self.cache.set(id, json_dumps(value))
self.cache.set(id, json_dumps(value)) # 将Python对象序列化为JSON字符串存储
def delete(self, id):
# 删除会话数据
id = self.key_name(id)
self.cache.delete(id)
self.cache.delete(id) # 从缓存中删除

@ -1,14 +1,17 @@
# admin.py
from django.contrib import admin
# Register your models here.
class CommandsAdmin(admin.ModelAdmin):
list_display = ('title', 'command', 'describe')
# 命令模型的后台管理配置
list_display = ('title', 'command', 'describe') # 列表页显示的字段
class EmailSendLogAdmin(admin.ModelAdmin):
list_display = ('title', 'emailto', 'send_result', 'creation_time')
readonly_fields = (
# 邮件发送日志的后台管理配置
list_display = ('title', 'emailto', 'send_result', 'creation_time') # 列表页显示的字段
readonly_fields = ( # 只读字段,不能编辑
'title',
'emailto',
'send_result',
@ -16,4 +19,5 @@ class EmailSendLogAdmin(admin.ModelAdmin):
'content')
def has_add_permission(self, request):
return False
# 禁止在后台添加新的邮件日志记录
return False

@ -1,27 +1,45 @@
# blogapi.py
from haystack.query import SearchQuerySet
from blog.models import Article, Category
class BlogApi:
"""博客API类提供博客文章的搜索和查询功能"""
def __init__(self):
# 初始化搜索查询集
self.searchqueryset = SearchQuerySet()
self.searchqueryset.auto_query('')
# 设置最大返回结果数量
self.__max_takecount__ = 8
def search_articles(self, query):
"""
搜索文章
:param query: 搜索查询字符串
:return: 搜索结果的查询集最多8条
"""
# 使用Haystack进行自动查询
sqs = self.searchqueryset.auto_query(query)
sqs = sqs.load_all()
return sqs[:self.__max_takecount__]
sqs = sqs.load_all() # 加载所有相关数据
return sqs[:self.__max_takecount__] # 返回前8条结果
def get_category_lists(self):
"""获取所有分类列表"""
return Category.objects.all()
def get_category_articles(self, categoryname):
"""
获取指定分类下的文章
:param categoryname: 分类名称
:return: 该分类下的文章列表最多8篇或None
"""
articles = Article.objects.filter(category__name=categoryname)
if articles:
return articles[:self.__max_takecount__]
return None
def get_recent_articles(self):
return Article.objects.all()[:self.__max_takecount__]
"""获取最近的文章列表最多8篇"""
return Article.objects.all()[:self.__max_takecount__]

@ -7,58 +7,87 @@ from servermanager.models import commands
logger = logging.getLogger(__name__)
# 设置OpenAI API密钥从环境变量中获取
openai.api_key = os.environ.get('OPENAI_API_KEY')
# 如果设置了HTTP代理则配置OpenAI使用代理
if os.environ.get('HTTP_PROXY'):
openai.proxy = os.environ.get('HTTP_PROXY')
class ChatGPT:
"""ChatGPT API封装类提供与GPT-3.5-turbo模型的对话功能"""
@staticmethod
def chat(prompt):
"""
静态方法向ChatGPT发送提示并获取回复
:param prompt: 用户输入的提示文本
:return: ChatGPT的回复内容或错误信息
"""
try:
# 调用OpenAI ChatCompletion API创建对话完成
completion = openai.ChatCompletion.create(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}])
# 返回第一个选择中的消息内容
return completion.choices[0].message.content
except Exception as e:
# 记录错误日志并返回用户友好的错误信息
logger.error(e)
return "服务器出错了"
class CommandHandler:
"""命令处理器类,用于执行预定义的系统命令"""
def __init__(self):
# 初始化时从数据库获取所有命令
self.commands = commands.objects.all()
def run(self, title):
"""
运行命令
:param title: 命令
:return: 返回命令执行结果
:param title: 命令标题
:return: 返回命令执行结果或帮助信息
"""
# 使用filter和lambda函数查找匹配的命令不区分大小写
cmd = list(
filter(
lambda x: x.title.upper() == title.upper(),
self.commands))
if cmd:
# 如果找到命令,执行该命令
return self.__run_command__(cmd[0].command)
else:
# 未找到命令时返回帮助提示
return "未找到相关命令请输入hepme获得帮助。"
def __run_command__(self, cmd):
"""
私有方法实际执行系统命令
:param cmd: 要执行的命令字符串
:return: 命令执行结果或错误信息
"""
try:
# 使用os.popen执行系统命令并读取结果
res = os.popen(cmd).read()
return res
except BaseException:
return '命令执行出错!'
def get_help(self):
"""
获取所有可用命令的帮助信息
:return: 格式化的命令帮助字符串
"""
rsp = ''
# 遍历所有命令,格式化输出命令标题和描述
for cmd in self.commands:
rsp += '{c}:{d}\n'.format(c=cmd.title, d=cmd.describe)
return rsp
if __name__ == '__main__':
# 测试代码创建ChatGPT实例并测试对话功能
chatbot = ChatGPT()
prompt = "写一篇1000字关于AI的论文"
print(chatbot.chat(prompt))
print(chatbot.chat(prompt))

@ -1,5 +1,12 @@
# apps.py
from django.apps import AppConfig
class ServermanagerConfig(AppConfig):
# ServermanagerConfig 类继承自 Django 的 AppConfig 基类
# 这是 Django 应用的配置类,用于对 servermanager 应用进行配置
name = 'servermanager'
# name 属性指定了该配置类对应的 Django 应用的名称
# 这里的值是 'servermanager',表示这个配置类用于配置名为 servermanager 的 Django 应用
# 这个名称应该与应用的目录名保持一致

@ -1,45 +1,61 @@
# Generated by Django 4.1.7 on 2023-03-02 07:14
# 0001_initial.py
# 由Django 4.1.7于2023-03-02 07:14生成
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
initial = True # 这是初始迁移文件
dependencies = [
# 没有依赖项,因为这是第一个迁移
]
operations = [
# 创建commands模型表
migrations.CreateModel(
name='commands',
fields=[
# 主键ID自增BigAutoField
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 命令标题字段最大长度300字符
('title', models.CharField(max_length=300, verbose_name='命令标题')),
# 命令字段最大长度2000字符
('command', models.CharField(max_length=2000, verbose_name='命令')),
# 命令描述字段最大长度300字符
('describe', models.CharField(max_length=300, verbose_name='命令描述')),
# 创建时间字段,自动设置为记录创建时间
('created_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
# 最后修改时间字段,自动更新为最后修改时间
('last_mod_time', models.DateTimeField(auto_now=True, verbose_name='修改时间')),
],
options={
'verbose_name': '命令',
'verbose_name_plural': '命令',
'verbose_name': '命令', # 单数显示名称
'verbose_name_plural': '命令', # 复数显示名称
},
),
# 创建EmailSendLog模型表
migrations.CreateModel(
name='EmailSendLog',
fields=[
# 主键ID自增BigAutoField
('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
# 收件人字段最大长度300字符
('emailto', models.CharField(max_length=300, verbose_name='收件人')),
# 邮件标题字段最大长度2000字符
('title', models.CharField(max_length=2000, verbose_name='邮件标题')),
# 邮件内容字段,文本类型
('content', models.TextField(verbose_name='邮件内容')),
# 发送结果字段布尔值默认False
('send_result', models.BooleanField(default=False, verbose_name='结果')),
# 创建时间字段,自动设置为记录创建时间
('created_time', models.DateTimeField(auto_now_add=True, verbose_name='创建时间')),
],
options={
'verbose_name': '邮件发送log',
'verbose_name_plural': '邮件发送log',
'ordering': ['-created_time'],
'verbose_name': '邮件发送log', # 单数显示名称
'verbose_name_plural': '邮件发送log', # 复数显示名称
'ordering': ['-created_time'], # 按创建时间降序排列
},
),
]
]

@ -1,32 +1,42 @@
# Generated by Django 4.2.5 on 2023-09-06 13:19
# 0002_alter_emailsendlog_options_and_more.py
# 由Django 4.2.5于2023-09-06 13:19生成
from django.db import migrations
class Migration(migrations.Migration):
# 依赖之前的初始迁移文件
dependencies = [
('servermanager', '0001_initial'),
]
operations = [
# 修改EmailSendLog模型的选项配置
migrations.AlterModelOptions(
name='emailsendlog',
options={'ordering': ['-creation_time'], 'verbose_name': '邮件发送log', 'verbose_name_plural': '邮件发送log'},
options={
'ordering': ['-creation_time'], # 修改排序字段为creation_time
'verbose_name': '邮件发送log', # 保持单数显示名称不变
'verbose_name_plural': '邮件发送log', # 保持复数显示名称不变
},
),
# 重命名commands模型的created_time字段为creation_time
migrations.RenameField(
model_name='commands',
old_name='created_time',
new_name='creation_time',
old_name='created_time', # 原字段名
new_name='creation_time', # 新字段名
),
# 重命名commands模型的last_mod_time字段为last_modify_time
migrations.RenameField(
model_name='commands',
old_name='last_mod_time',
new_name='last_modify_time',
old_name='last_mod_time', # 原字段名
new_name='last_modify_time', # 新字段名
),
# 重命名EmailSendLog模型的created_time字段为creation_time
migrations.RenameField(
model_name='emailsendlog',
old_name='created_time',
new_name='creation_time',
old_name='created_time', # 原字段名
new_name='creation_time', # 新字段名
),
]
]

@ -1,27 +1,30 @@
# models.py
from django.db import models
# Create your models here.
class commands(models.Model):
# 命令模型,用于存储可执行的系统命令
title = models.CharField('命令标题', max_length=300)
command = models.CharField('命令', max_length=2000)
describe = models.CharField('命令描述', max_length=300)
creation_time = models.DateTimeField('创建时间', auto_now_add=True)
last_modify_time = models.DateTimeField('修改时间', auto_now=True)
creation_time = models.DateTimeField('创建时间', auto_now_add=True) # 自动设置创建时间
last_modify_time = models.DateTimeField('修改时间', auto_now=True) # 自动更新修改时间
def __str__(self):
return self.title
class Meta:
verbose_name = '命令'
verbose_name_plural = verbose_name
verbose_name_plural = verbose_name # 复数形式显示名称
class EmailSendLog(models.Model):
# 邮件发送日志模型
emailto = models.CharField('收件人', max_length=300)
title = models.CharField('邮件标题', max_length=2000)
content = models.TextField('邮件内容')
send_result = models.BooleanField('结果', default=False)
send_result = models.BooleanField('结果', default=False) # 发送结果
creation_time = models.DateTimeField('创建时间', auto_now_add=True)
def __str__(self):
@ -30,4 +33,4 @@ class EmailSendLog(models.Model):
class Meta:
verbose_name = '邮件发送log'
verbose_name_plural = verbose_name
ordering = ['-creation_time']
ordering = ['-creation_time'] # 按创建时间降序排列

@ -1,3 +1,4 @@
# robot.py
import logging
import os
import re
@ -13,8 +14,11 @@ from servermanager.api.blogapi import BlogApi
from servermanager.api.commonapi import ChatGPT, CommandHandler
from .MemcacheStorage import MemcacheStorage
# 创建微信机器人实例使用环境变量中的token或默认值
robot = WeRoBot(token=os.environ.get('DJANGO_WEROBOT_TOKEN')
or 'lylinux', enable_session=True)
# 配置会话存储优先使用Memcache否则使用文件存储
memstorage = MemcacheStorage()
if memstorage.is_available:
robot.config['SESSION_STORAGE'] = memstorage
@ -23,24 +27,28 @@ else:
os.remove(os.path.join(settings.BASE_DIR, 'werobot_session'))
robot.config['SESSION_STORAGE'] = FileStorage(filename='werobot_session')
# 初始化API处理器
blogapi = BlogApi()
cmd_handler = CommandHandler()
logger = logging.getLogger(__name__)
def convert_to_article_reply(articles, message):
# 将文章列表转换为微信图文回复格式
reply = ArticlesReply(message=message)
from blog.templatetags.blog_tags import truncatechars_content
for post in articles:
# 从文章内容中提取图片URL
imgs = re.findall(r'(?:http\:|https\:)?\/\/.*\.(?:png|jpg)', post.body)
imgurl = ''
if imgs:
imgurl = imgs[0]
# 创建图文消息条目
article = Article(
title=post.title,
description=truncatechars_content(post.body),
img=imgurl,
url=post.get_full_url()
description=truncatechars_content(post.body), # 截断文章内容作为描述
img=imgurl, # 文章封面图
url=post.get_full_url() # 文章完整URL
)
reply.add_article(article)
return reply
@ -48,8 +56,9 @@ def convert_to_article_reply(articles, message):
@robot.filter(re.compile(r"^\?.*"))
def search(message, session):
# 搜索文章功能,以?开头触发
s = message.content
searchstr = str(s).replace('?', '')
searchstr = str(s).replace('?', '') # 去除问号得到搜索关键词
result = blogapi.search_articles(searchstr)
if result:
articles = list(map(lambda x: x.object, result))
@ -61,6 +70,7 @@ def search(message, session):
@robot.filter(re.compile(r'^category\s*$', re.I))
def category(message, session):
# 获取所有分类目录
categorys = blogapi.get_category_lists()
content = ','.join(map(lambda x: x.name, categorys))
return '所有文章分类目录:' + content
@ -68,6 +78,7 @@ def category(message, session):
@robot.filter(re.compile(r'^recent\s*$', re.I))
def recents(message, session):
# 获取最新文章
articles = blogapi.get_recent_articles()
if articles:
reply = convert_to_article_reply(articles, message)
@ -78,6 +89,7 @@ def recents(message, session):
@robot.filter(re.compile('^help$', re.I))
def help(message, session):
# 帮助信息
return '''欢迎关注!
默认会与图灵机器人聊天~~
你可以通过下面这些命令来获得信息
@ -100,56 +112,71 @@ def help(message, session):
@robot.filter(re.compile(r'^weather\:.*$', re.I))
def weather(message, session):
# 天气查询功能(建设中)
return "建设中..."
@robot.filter(re.compile(r'^idcard\:.*$', re.I))
def idcard(message, session):
# 身份证查询功能(建设中)
return "建设中..."
@robot.handler
def echo(message, session):
# 默认消息处理器
handler = MessageHandler(message, session)
return handler.handler()
class MessageHandler:
# 消息处理器类,处理用户会话状态和命令执行
def __init__(self, message, session):
userid = message.source
self.message = message
self.session = session
self.userid = userid
try:
# 从会话中获取用户信息
info = session[userid]
self.userinfo = jsonpickle.decode(info)
except Exception as e:
# 如果会话不存在,创建新的用户信息
userinfo = WxUserInfo()
self.userinfo = userinfo
@property
def is_admin(self):
# 检查用户是否为管理员
return self.userinfo.isAdmin
@property
def is_password_set(self):
# 检查管理员密码是否已验证
return self.userinfo.isPasswordSet
def save_session(self):
# 保存用户信息到会话
info = jsonpickle.encode(self.userinfo)
self.session[self.userid] = info
def handler(self):
# 主要消息处理逻辑
info = self.message.content
# 管理员退出处理
if self.userinfo.isAdmin and info.upper() == 'EXIT':
self.userinfo = WxUserInfo()
self.save_session()
return "退出成功"
# 进入管理员模式
if info.upper() == 'ADMIN':
self.userinfo.isAdmin = True
self.save_session()
return "输入管理员密码"
# 管理员密码验证
if self.userinfo.isAdmin and not self.userinfo.isPasswordSet:
passwd = settings.WXADMIN
if settings.TESTING:
@ -166,6 +193,8 @@ class MessageHandler:
self.userinfo.Count += 1
self.save_session()
return "验证失败,请重新输入管理员密码:"
# 管理员命令执行
if self.userinfo.isAdmin and self.userinfo.isPasswordSet:
if self.userinfo.Command != '' and info.upper() == 'Y':
return cmd_handler.run(self.userinfo.Command)
@ -176,12 +205,14 @@ class MessageHandler:
self.save_session()
return "确认执行: " + info + " 命令?"
# 默认使用ChatGPT回复
return ChatGPT.chat(info)
class WxUserInfo():
# 微信用户信息类,存储用户会话状态
def __init__(self):
self.isAdmin = False
self.isPasswordSet = False
self.Count = 0
self.Command = ''
self.isAdmin = False # 是否为管理员
self.isPasswordSet = False # 密码是否已验证
self.Count = 0 # 密码尝试次数
self.Command = '' # 待执行的命令

@ -1,3 +1,4 @@
# tests.py
from django.test import Client, RequestFactory, TestCase
from django.utils import timezone
from werobot.messages.messages import TextMessage
@ -13,25 +14,31 @@ from .robot import search, category, recents
# Create your tests here.
class ServerManagerTest(TestCase):
def setUp(self):
# 初始化测试客户端和请求工厂
self.client = Client()
self.factory = RequestFactory()
def test_chat_gpt(self):
# 测试ChatGPT功能
content = ChatGPT.chat("你好")
self.assertIsNotNone(content)
self.assertIsNotNone(content) # 断言回复内容不为空
def test_validate_comment(self):
# 创建超级用户用于测试
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",
password="liangliangyy1")
# 登录用户
self.client.login(username='liangliangyy1', password='liangliangyy1')
# 创建分类
c = Category()
c.name = "categoryccc"
c.save()
# 创建文章
article = Article()
article.title = "nicetitleccc"
article.body = "nicecontentccc"
@ -40,29 +47,38 @@ class ServerManagerTest(TestCase):
article.type = 'a'
article.status = 'p'
article.save()
# 测试搜索功能
s = TextMessage([])
s.content = "nice"
rsp = search(s, None)
# 测试分类功能
rsp = category(None, None)
self.assertIsNotNone(rsp)
# 测试最近文章功能
rsp = recents(None, None)
self.assertTrue(rsp != '暂时还没有文章')
# 创建测试命令
cmd = commands()
cmd.title = "test"
cmd.command = "ls"
cmd.describe = "test"
cmd.save()
# 测试命令执行
cmdhandler = CommandHandler()
rsp = cmdhandler.run('test')
self.assertIsNotNone(rsp)
# 测试消息处理器
s.source = 'u'
s.content = 'test'
msghandler = MessageHandler(s, {})
# msghandler.userinfo.isPasswordSet = True
# msghandler.userinfo.isAdmin = True
# 测试各种消息处理场景
msghandler.handler()
s.content = 'y'
msghandler.handler()
@ -76,4 +92,4 @@ class ServerManagerTest(TestCase):
msghandler.handler()
s.content = 'exit'
msghandler.handler()
msghandler.handler()

@ -1,10 +1,10 @@
from django.urls import path
from werobot.contrib.django import make_view
from .robot import robot
from .robot import robot # 导入微信机器人实例
app_name = "servermanager"
app_name = "servermanager" # 定义应用命名空间
urlpatterns = [
# 配置微信机器人路由所有发送到robot的请求都由微信机器人处理
path(r'robot', make_view(robot)),
]
]
Loading…
Cancel
Save