Merge pull request #649 from liangliangyy/dev

add chatgpt && add proxy
sh_branch
且听风吟 3 years ago committed by GitHub
commit 07a1ded08e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -1,5 +1,6 @@
import json
import logging
import os
import urllib.parse
from abc import ABCMeta, abstractmethod
@ -145,7 +146,28 @@ class WBOauthManager(BaseOauthManager):
return datas['avatar_large']
class GoogleOauthManager(BaseOauthManager):
class ProxyManagerMixin:
def __init__(self, *args, **kwargs):
if os.environ.get("HTTP_PROXY"):
self.proxies = {
"http": os.environ.get("HTTP_PROXY"),
"https": os.environ.get("HTTP_PROXY")
}
else:
self.proxies = None
def do_get(self, url, params, headers=None):
rsp = requests.get(url=url, params=params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
def do_post(self, url, params, headers=None):
rsp = requests.post(url, params, headers=headers, proxies=self.proxies)
logger.info(rsp.text)
return rsp.text
class GoogleOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://accounts.google.com/o/oauth2/v2/auth'
TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
API_URL = 'https://www.googleapis.com/oauth2/v3/userinfo'
@ -223,7 +245,7 @@ class GoogleOauthManager(BaseOauthManager):
return datas['picture']
class GitHubOauthManager(BaseOauthManager):
class GitHubOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://github.com/login/oauth/authorize'
TOKEN_URL = 'https://github.com/login/oauth/access_token'
API_URL = 'https://api.github.com/user'
@ -240,14 +262,13 @@ class GitHubOauthManager(BaseOauthManager):
access_token=access_token,
openid=openid)
def get_authorization_url(self, nexturl='/'):
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url + '&next_url=' + nexturl,
'redirect_uri': f'{self.callback_url}&next_url={next_url}',
'scope': 'user'
}
# url = self.AUTH_URL + "?" + urllib.parse.urlencode(params, quote_via=urllib.parse.quote)
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url
@ -297,9 +318,9 @@ class GitHubOauthManager(BaseOauthManager):
return datas['avatar_url']
class FaceBookOauthManager(BaseOauthManager):
AUTH_URL = 'https://www.facebook.com/v2.10/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v2.10/oauth/access_token'
class FaceBookOauthManager(ProxyManagerMixin, BaseOauthManager):
AUTH_URL = 'https://www.facebook.com/v16.0/dialog/oauth'
TOKEN_URL = 'https://graph.facebook.com/v16.0/oauth/access_token'
API_URL = 'https://graph.facebook.com/me'
ICON_NAME = 'facebook'
@ -314,11 +335,11 @@ class FaceBookOauthManager(BaseOauthManager):
access_token=access_token,
openid=openid)
def get_authorization_url(self, nexturl='/'):
def get_authorization_url(self, next_url='/'):
params = {
'client_id': self.client_id,
'response_type': 'code',
'redirect_uri': self.callback_url, # + '&next_url=' + nexturl,
'redirect_uri': self.callback_url,
'scope': 'email,public_profile'
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
@ -389,11 +410,11 @@ class QQOauthManager(BaseOauthManager):
access_token=access_token,
openid=openid)
def get_authorization_url(self, nexturl='/'):
def get_authorization_url(self, next_url='/'):
params = {
'response_type': 'code',
'client_id': self.client_id,
'redirect_uri': self.callback_url + '&next_url=' + nexturl,
'redirect_uri': self.callback_url + '&next_url=' + next_url,
}
url = self.AUTH_URL + "?" + urllib.parse.urlencode(params)
return url

@ -25,3 +25,4 @@ WeRoBot==1.13.1
Whoosh==2.7.4
user-agents==2.2.0
redis==4.5.4
openai==0.27.2

@ -3,7 +3,7 @@ from haystack.query import SearchQuerySet
from blog.models import Article, Category
class BlogApi():
class BlogApi:
def __init__(self):
self.searchqueryset = SearchQuerySet()
self.searchqueryset.auto_query('')

@ -1,32 +1,64 @@
import json
import logging
import os
import requests
import openai
logger = logging.getLogger(__name__)
from servermanager.models import commands
logger = logging.getLogger(__name__)
class TuLing:
def __init__(self):
self.__key__ = '2f1446eb0321804291b0a1e217c25bb5'
self.__appid__ = 137762
openai.api_key = os.environ.get('OPENAI_API_KEY')
if os.environ.get('PROXY'):
openai.proxy = os.environ.get('PROXY')
def _build_req_url(self, content):
return 'http://www.tuling123.com/openapi/api?key=%s&info=%s&userid=%s' % (
self.__key__, content, self.__appid__)
def UserAgent(self, url):
rsp = requests.get(url)
return rsp.content
class ChatGPT:
def getdata(self, content):
@staticmethod
def chat(prompt):
try:
requrl = self._build_req_url(content)
res = self.UserAgent(requrl).decode('utf-8')
jsons = json.loads(res, encoding='utf-8')
if str(jsons["code"]) == '100000':
return jsons["text"]
completion = openai.ChatCompletion.create(model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}])
return completion.choices[0].message.content
except Exception as e:
logger.error(e)
return "哎呀,出错啦。"
return "服务器出错了"
class CommandHandler:
def __init__(self):
self.commands = commands.objects.all()
def run(self, title):
"""
运行命令
:param title: 命令
:return: 返回命令执行结果
"""
cmd = list(
filter(
lambda x: x.title.upper() == title.upper(),
self.commands))
if cmd:
return self.__run_command__(cmd[0].command)
else:
return "未找到相关命令请输入hepme获得帮助。"
def __run_command__(self, cmd):
try:
res = os.popen(cmd).read()
return res
except BaseException:
return '命令执行出错!'
def get_help(self):
rsp = ''
for cmd in self.commands:
rsp += '{c}:{d}\n'.format(c=cmd.title, d=cmd.describe)
return rsp
if __name__ == '__main__':
chatbot = ChatGPT()
prompt = "写一篇1000字关于AI的论文"
print(chatbot.chat(prompt))

@ -1,14 +1,16 @@
import logging
import os
import re
import jsonpickle
from django.conf import settings
from werobot import WeRoBot
from werobot.replies import ArticlesReply, Article
from django.conf import settings
from werobot.session.filestorage import FileStorage
from djangoblog.utils import get_sha256
from servermanager.api.blogapi import BlogApi
from servermanager.api.commonapi import TuLing
from servermanager.models import commands
from servermanager.api.commonapi import ChatGPT, CommandHandler
from .MemcacheStorage import MemcacheStorage
robot = WeRoBot(token=os.environ.get('DJANGO_WEROBOT_TOKEN')
@ -17,18 +19,16 @@ memstorage = MemcacheStorage()
if memstorage.is_available:
robot.config['SESSION_STORAGE'] = memstorage
else:
from werobot.session.filestorage import FileStorage
import os
from django.conf import settings
if os.path.exists(os.path.join(settings.BASE_DIR, 'werobot_session')):
os.remove(os.path.join(settings.BASE_DIR, 'werobot_session'))
robot.config['SESSION_STORAGE'] = FileStorage(filename='werobot_session')
blogapi = BlogApi()
tuling = TuLing()
cmd_handler = CommandHandler()
logger = logging.getLogger(__name__)
def convert_to_articlereply(articles, message):
def convert_to_article_reply(articles, message):
reply = ArticlesReply(message=message)
from blog.templatetags.blog_tags import truncatechars_content
for post in articles:
@ -53,7 +53,7 @@ def search(message, session):
result = blogapi.search_articles(searchstr)
if result:
articles = list(map(lambda x: x.object, result))
reply = convert_to_articlereply(articles, message)
reply = convert_to_article_reply(articles, message)
return reply
else:
return '没有找到相关文章。'
@ -70,7 +70,7 @@ def category(message, session):
def recents(message, session):
articles = blogapi.get_recent_articles()
if articles:
reply = convert_to_articlereply(articles, message)
reply = convert_to_article_reply(articles, message)
return reply
else:
return "暂时还没有文章"
@ -114,38 +114,7 @@ def echo(message, session):
return handler.handler()
class CommandHandler():
def __init__(self):
self.commands = commands.objects.all()
def run(self, title):
cmd = list(
filter(
lambda x: x.title.upper() == title.upper(),
self.commands))
if cmd:
return self.__run_command__(cmd[0].command)
else:
return "未找到相关命令请输入hepme获得帮助。"
def __run_command__(self, cmd):
try:
str = os.popen(cmd).read()
return str
except BaseException:
return '命令执行出错!'
def get_help(self):
rsp = ''
for cmd in self.commands:
rsp += '{c}:{d}\n'.format(c=cmd.title, d=cmd.describe)
return rsp
cmdhandler = CommandHandler()
class MessageHandler():
class MessageHandler:
def __init__(self, message, session):
userid = message.source
self.message = message
@ -154,7 +123,7 @@ class MessageHandler():
try:
info = session[userid]
self.userinfo = jsonpickle.decode(info)
except BaseException:
except Exception as e:
userinfo = WxUserInfo()
self.userinfo = userinfo
@ -166,7 +135,7 @@ class MessageHandler():
def is_password_set(self):
return self.userinfo.isPasswordSet
def savesession(self):
def save_session(self):
info = jsonpickle.encode(self.userinfo)
self.session[self.userid] = info
@ -175,11 +144,11 @@ class MessageHandler():
if self.userinfo.isAdmin and info.upper() == 'EXIT':
self.userinfo = WxUserInfo()
self.savesession()
self.save_session()
return "退出成功"
if info.upper() == 'ADMIN':
self.userinfo.isAdmin = True
self.savesession()
self.save_session()
return "输入管理员密码"
if self.userinfo.isAdmin and not self.userinfo.isPasswordSet:
passwd = settings.WXADMIN
@ -187,27 +156,27 @@ class MessageHandler():
passwd = '123'
if passwd.upper() == get_sha256(get_sha256(info)).upper():
self.userinfo.isPasswordSet = True
self.savesession()
self.save_session()
return "验证通过,请输入命令或者要执行的命令代码:输入helpme获得帮助"
else:
if self.userinfo.Count >= 3:
self.userinfo = WxUserInfo()
self.savesession()
self.save_session()
return "超过验证次数"
self.userinfo.Count += 1
self.savesession()
self.save_session()
return "验证失败,请重新输入管理员密码:"
if self.userinfo.isAdmin and self.userinfo.isPasswordSet:
if self.userinfo.Command != '' and info.upper() == 'Y':
return cmdhandler.run(self.userinfo.Command)
return cmd_handler.run(self.userinfo.Command)
else:
if info.upper() == 'HELPME':
return cmdhandler.get_help()
return cmd_handler.get_help()
self.userinfo.Command = info
self.savesession()
self.save_session()
return "确认执行: " + info + " 命令?"
rsp = tuling.getdata(info)
return rsp
return ChatGPT.chat(info)
class WxUserInfo():
@ -216,17 +185,3 @@ class WxUserInfo():
self.isPasswordSet = False
self.Count = 0
self.Command = ''
"""
@robot.handler
def hello(message, session):
blogapi = BlogApi()
result = blogapi.search_articles(message.content)
if result:
articles = list(map(lambda x: x.object, result))
reply = convert_to_articlereply(articles, message)
return reply
else:
return '没有找到相关文章。'
"""

@ -2,10 +2,9 @@ from django.test import Client, RequestFactory, TestCase
from django.utils import timezone
from werobot.messages.messages import TextMessage
from djangoblog.utils import get_current_site
from accounts.models import BlogUser
from blog.models import Category, Article
from servermanager.api.commonapi import TuLing
from servermanager.api.commonapi import ChatGPT
from .models import commands
from .robot import MessageHandler, CommandHandler
from .robot import search, category, recents
@ -17,13 +16,11 @@ class ServerManagerTest(TestCase):
self.client = Client()
self.factory = RequestFactory()
def test_tuling(self):
t = TuLing()
content = t.getdata('test')
def test_chat_gpt(self):
content = ChatGPT.chat("你好")
self.assertIsNotNone(content)
def test_validate_comment(self):
site = get_current_site().domain
user = BlogUser.objects.create_superuser(
email="liangliangyy1@gmail.com",
username="liangliangyy1",

Loading…
Cancel
Save