|
|
|
|
@ -1,3 +1,6 @@
|
|
|
|
|
#yyd:
|
|
|
|
|
# coding: utf-8
|
|
|
|
|
|
|
|
|
|
# 导入必要的模块
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
@ -79,7 +82,7 @@ def search(message, session):
|
|
|
|
|
|
|
|
|
|
if result:
|
|
|
|
|
# 将搜索结果转换为文章对象列表
|
|
|
|
|
articles = list(map(lambda x: x.object, result))
|
|
|
|
|
articles = [x.object for x in result]
|
|
|
|
|
reply = convert_to_article_reply(articles, message)
|
|
|
|
|
return reply
|
|
|
|
|
else:
|
|
|
|
|
@ -90,11 +93,12 @@ def search(message, session):
|
|
|
|
|
def category(message, session):
|
|
|
|
|
"""
|
|
|
|
|
获取所有文章分类
|
|
|
|
|
触发方式: category
|
|
|
|
|
触发方式:category
|
|
|
|
|
"""
|
|
|
|
|
categorys = blogapi.get_category_lists()
|
|
|
|
|
content = ','.join(map(lambda x: x.name, categorys))
|
|
|
|
|
return '所有文章分类目录:' + content
|
|
|
|
|
content = ','.join([x.name for x in categorys])
|
|
|
|
|
return '所有文章分类目录: ' + content
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@robot.filter(re.compile(r'^recent\s*$', re.I))
|
|
|
|
|
@ -171,7 +175,7 @@ class MessageHandler:
|
|
|
|
|
# 从会话中获取用户信息
|
|
|
|
|
info = session[userid]
|
|
|
|
|
self.userinfo = jsonpickle.decode(info)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
except Exception:
|
|
|
|
|
# 如果会话中没有用户信息,创建新的用户信息对象
|
|
|
|
|
userinfo = WxUserInfo()
|
|
|
|
|
self.userinfo = userinfo
|
|
|
|
|
@ -193,63 +197,92 @@ class MessageHandler:
|
|
|
|
|
|
|
|
|
|
def handler(self):
|
|
|
|
|
"""主消息处理方法"""
|
|
|
|
|
info = self.message.content
|
|
|
|
|
info = self.message.content.strip().upper()
|
|
|
|
|
|
|
|
|
|
# 管理员退出命令
|
|
|
|
|
if self.userinfo.isAdmin and info.upper() == 'EXIT':
|
|
|
|
|
self.userinfo = WxUserInfo()
|
|
|
|
|
self.save_session()
|
|
|
|
|
if self._handle_exit(info):
|
|
|
|
|
return "退出成功"
|
|
|
|
|
|
|
|
|
|
# 进入管理员模式
|
|
|
|
|
if info.upper() == 'ADMIN':
|
|
|
|
|
self.userinfo.isAdmin = True
|
|
|
|
|
self.save_session()
|
|
|
|
|
|
|
|
|
|
if self._handle_admin_entry(info):
|
|
|
|
|
return "输入管理员密码"
|
|
|
|
|
|
|
|
|
|
# 管理员密码验证
|
|
|
|
|
if self.userinfo.isAdmin and not self.userinfo.isPasswordSet:
|
|
|
|
|
passwd = settings.WXADMIN # 从设置获取管理员密码
|
|
|
|
|
if settings.TESTING: # 测试环境下使用简单密码
|
|
|
|
|
passwd = '123'
|
|
|
|
|
# 验证密码(双重SHA256加密)
|
|
|
|
|
if passwd.upper() == get_sha256(get_sha256(info)).upper():
|
|
|
|
|
self.userinfo.isPasswordSet = True
|
|
|
|
|
self.save_session()
|
|
|
|
|
return "验证通过,请输入命令或者要执行的命令代码:输入helpme获得帮助"
|
|
|
|
|
else:
|
|
|
|
|
# 密码错误次数限制
|
|
|
|
|
if self.userinfo.Count >= 3:
|
|
|
|
|
self.userinfo = WxUserInfo()
|
|
|
|
|
self.save_session()
|
|
|
|
|
return "超过验证次数"
|
|
|
|
|
self.userinfo.Count += 1
|
|
|
|
|
self.save_session()
|
|
|
|
|
return "验证失败,请重新输入管理员密码:"
|
|
|
|
|
|
|
|
|
|
# 管理员命令执行
|
|
|
|
|
if self.userinfo.isAdmin and self.userinfo.isPasswordSet:
|
|
|
|
|
# 确认执行命令
|
|
|
|
|
if self.userinfo.Command != '' and info.upper() == 'Y':
|
|
|
|
|
return cmd_handler.run(self.userinfo.Command)
|
|
|
|
|
else:
|
|
|
|
|
# 获取命令帮助
|
|
|
|
|
if info.upper() == 'HELPME':
|
|
|
|
|
return cmd_handler.get_help()
|
|
|
|
|
# 保存命令等待确认
|
|
|
|
|
self.userinfo.Command = info
|
|
|
|
|
self.save_session()
|
|
|
|
|
return "确认执行: " + info + " 命令?"
|
|
|
|
|
|
|
|
|
|
# 默认使用ChatGPT回复
|
|
|
|
|
return ChatGPT.chat(info)
|
|
|
|
|
|
|
|
|
|
if self._needs_password_verification():
|
|
|
|
|
return self._handle_password_verify(info)
|
|
|
|
|
|
|
|
|
|
if self._can_execute_admin_command():
|
|
|
|
|
return self._handle_admin_command(info)
|
|
|
|
|
|
|
|
|
|
return self._handle_default_chat(info)
|
|
|
|
|
|
|
|
|
|
# ------------------ 以下为子逻辑模块 ------------------
|
|
|
|
|
|
|
|
|
|
def _handle_exit(self, info: str) -> bool:
|
|
|
|
|
"""管理员退出命令"""
|
|
|
|
|
if self.userinfo.isAdmin and info == 'EXIT':
|
|
|
|
|
self.userinfo = WxUserInfo()
|
|
|
|
|
self.save_session()
|
|
|
|
|
return True
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def _handle_admin_entry(self, info: str) -> bool:
|
|
|
|
|
"""进入管理员模式"""
|
|
|
|
|
if info == 'ADMIN':
|
|
|
|
|
self.userinfo.isAdmin = True
|
|
|
|
|
self.save_session()
|
|
|
|
|
return True
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def _needs_password_verification(self) -> bool:
|
|
|
|
|
"""判断是否需要进行密码验证"""
|
|
|
|
|
return self.userinfo.isAdmin and not self.userinfo.isPasswordSet
|
|
|
|
|
|
|
|
|
|
def _handle_password_verify(self, info: str) -> str:
|
|
|
|
|
"""管理员密码验证"""
|
|
|
|
|
passwd = settings.WXADMIN
|
|
|
|
|
if settings.TESTING:
|
|
|
|
|
passwd = '123'
|
|
|
|
|
|
|
|
|
|
# 双重加密验证
|
|
|
|
|
if passwd.upper() == get_sha256(get_sha256(info)).upper():
|
|
|
|
|
self.userinfo.isPasswordSet = True
|
|
|
|
|
self.save_session()
|
|
|
|
|
return "验证通过,请输入命令或者要执行的命令代码:输入helpme获得帮助"
|
|
|
|
|
|
|
|
|
|
# 密码错误次数限制
|
|
|
|
|
if self.userinfo.Count >= 3:
|
|
|
|
|
self.userinfo = WxUserInfo()
|
|
|
|
|
self.save_session()
|
|
|
|
|
return "超过验证次数"
|
|
|
|
|
|
|
|
|
|
self.userinfo.Count += 1
|
|
|
|
|
self.save_session()
|
|
|
|
|
return "验证失败,请重新输入管理员密码:"
|
|
|
|
|
|
|
|
|
|
def _can_execute_admin_command(self) -> bool:
|
|
|
|
|
"""是否处于可执行管理员命令状态"""
|
|
|
|
|
return self.userinfo.isAdmin and self.userinfo.isPasswordSet
|
|
|
|
|
|
|
|
|
|
def _handle_admin_command(self, info: str) -> str:
|
|
|
|
|
"""管理员命令执行"""
|
|
|
|
|
if self.userinfo.Command and info == 'Y':
|
|
|
|
|
return cmd_handler.run(self.userinfo.Command)
|
|
|
|
|
|
|
|
|
|
if info == 'HELPME':
|
|
|
|
|
return cmd_handler.get_help()
|
|
|
|
|
|
|
|
|
|
self.userinfo.Command = info
|
|
|
|
|
self.save_session()
|
|
|
|
|
return f"确认执行: {info} 命令?"
|
|
|
|
|
|
|
|
|
|
def _handle_default_chat(self, info: str) -> str:
|
|
|
|
|
"""普通模式下使用 ChatGPT 回复"""
|
|
|
|
|
return ChatGPT.chat(info)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class WxUserInfo():
|
|
|
|
|
"""微信用户信息类,用于存储用户会话状态"""
|
|
|
|
|
|
|
|
|
|
def __init__(self):
|
|
|
|
|
self.isAdmin = False # 是否为管理员
|
|
|
|
|
self.isPasswordSet = False # 是否通过密码验证
|
|
|
|
|
self.Count = 0 # 密码尝试次数
|
|
|
|
|
self.Command = '' # 待确认的命令
|
|
|
|
|
self.is_admin = False # 是否为管理员.
|
|
|
|
|
self.is_password_set = False # 是否通过密码验证1
|
|
|
|
|
self.count = 0 # 密码尝试次数
|
|
|
|
|
self.command = '' # 待确认的命令
|