11 #27

Closed
pzthu8lr3 wants to merge 6 commits from yuankaifeng_branch into master

@ -0,0 +1,3 @@
# 默认忽略的文件
/shelf/
/workspace.xml

@ -0,0 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.12 (G:)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PackageRequirementsSettings">
<option name="requirementsPath" value="" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="EPYTEXT" />
<option name="myDocStringFormat" value="Epytext" />
</component>
</module>

@ -0,0 +1,6 @@
<component name="InspectionProjectProfileManager">
<settings>
<option name="USE_PROJECT_PROFILE" value="false" />
<version value="1.0" />
</settings>
</component>

@ -0,0 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="Black">
<option name="sdkName" value="Python 3.12 (controller)" />
</component>
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.12 (G:)" project-jdk-type="Python SDK" />
</project>

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/controller.iml" filepath="$PROJECT_DIR$/.idea/controller.iml" />
</modules>
</component>
</project>

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$/.." vcs="Git" />
</component>
</project>

@ -0,0 +1,11 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: confControl.py
# @time: 2024/6/21 11:18
# @desc:
#TODO 通过ssh在线更改nginx的conf文件

@ -0,0 +1,350 @@
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: DatabaseOp.py
# @time: 2024/6/19 9:22
# @desc:
import mysql
from mysql.connector import Error
# 连接数据库函数
def connect_user_db():
"""
@author: 原凯峰
@return:
"""
try:
conn = mysql.connector.connect(
host="localhost",
user="rtsw",
password="123456",
database="nginxdb",
port=3306
)
if conn.is_connected():
print("Successfully connected to the database")
return conn
except Error as e:
print("Connection error:", e)
# 插入用户数据函数
def insert_user_data(conn, username, real_name, ID, password, tel):
"""
@author: 原凯峰
@param conn:
@param username:
@param real_name:
@param ID:
@param password:
@param tel:
@return:
"""
try:
cursor = conn.cursor()
query = """
INSERT INTO user (username, real_name, ID, password, tel)
VALUES (%s, %s, %s, %s, %s)
"""
cursor.execute(query, (username, real_name, ID, password, tel))
conn.commit()
return cursor.lastrowid
except Error as e:
print("Insert error:", e)
# 搜索是否存在某用户
def select_user_data(conn,username,ID):
"""
@author: 原凯峰
@param conn:
@param username:
@param ID:
@return:
"""
try:
cursor = conn.cursor()
select_query = "SELECT * FROM user"
cursor.execute(select_query)
for row in cursor.fetchall():
print("id: {}, username: {}, email: {}".format(row[0], row[1], row[2]))
except Error as e:
print("Select error:", e)
# 查询用户函数
def select_exist_user(conn, username):
"""
@author: 原凯峰
@param conn:
@param username:
@return:
"""
try:
cursor = conn.cursor()
# 构造查询语句确保使用参数化查询以防止SQL注入
select_query = "SELECT * FROM user WHERE username = %s"
cursor.execute(select_query, (username,))
# 使用fetchall()来获取所有结果
results = cursor.fetchall()
# 如果结果不为空,则表示找到了匹配的用户名
if results:
for row in results:
print("id: {}, username: {}, email: {}".format(row[0], row[1], row[2]))
return True
else:
return False
except Error as e:
print("Select error:", e)
return False
# 更新用户信息函数
def update_user(conn, username, new_email):
"""
@author: 原凯峰
@param conn:
@param username:
@param new_email:
@return:
"""
try:
cursor = conn.cursor()
query = """
UPDATE users SET email = %s WHERE username = %s
"""
cursor.execute(query, (new_email, username))
conn.commit()
return cursor.rowcount
except Error as e:
print("Update error:", e)
# 删除用户函数
def delete_user(conn, username):
"""
@author: 原凯峰
@param conn:
@param username:
@return:
"""
try:
cursor = conn.cursor()
delete_query = """
DELETE FROM users WHERE username = %s
"""
cursor.execute(delete_query, (username,))
conn.commit()
return cursor.rowcount
except Error as e:
print("Delete error:", e)
# ************************server部分***************************** #
# 判断是否存在数据库中已经收录过这个服务器
def is_exist_server(conn, server_name):
"""
@todo 查询该服务器是否已经被servers数据库收录
@param conn:
@param server_name:
@return:
"""
try:
cursor = conn.cursor()
# 构造查询语句确保使用参数化查询以防止SQL注入
select_query = "SELECT * FROM servers WHERE server_name = %s"
cursor.execute(select_query, (server_name,))
# 使用fetchall()来获取所有结果
results = cursor.fetchall()
# 如果结果不为空,则表示找到了匹配的用户名
if results:
for row in results:
print("server_name: {}, status: {}".format(row[0], row[1]))
return True
else:
return False
except Error as e:
print("Select error:", e)
return False
# 将这一次的服务器活跃信息存放到服务器中,是第一次存放
def insert_server_data(conn,server_name,status,upstream,detect_date,responseTime,protocol):
"""
@param conn:
@param server_name:
@param status:
@param upstream:
@param detect_date:
@param responseTime:
@param protocol:
@return:
"""
try:
cursor = conn.cursor()
query = """
INSERT INTO servers (server_name, status, upstream, detect_date, responseTime, protocol)
VALUES (%s, %s, %s, %s, %s, %s)
"""
cursor.execute(query, (server_name, status, upstream, detect_date, responseTime, protocol))
conn.commit()
return cursor.lastrowid
except Error as e:
print("Insert error:", e)
# 在第n次获得这个服务器的信息时更改服务器列表中的信息
def update_server_list(conn,server_name, status, upstream, detect_date, responseTime, protocol):
"""
@todo 将服务器信息填入或者更改在服务器列表中
@return:
"""
# 更新的数据和条件
update_data = {
'status': f'{status}', # 需要更新的列和新值
'responseTime': f'{responseTime}',
'detect_date': f'{detect_date}',
'upstream': f'{upstream}',
'protocol': f'{protocol}',
# 可以添加更多的列和值
}
where_condition = f'server_name = {server_name}' # 定位记录的条件例如主键ID为1的记录
# 连接到数据库
try:
cursor = conn.cursor()
# 构建UPDATE语句
update_columns = ', '.join([f"{key} = %s" for key in update_data.keys()])
update_values = tuple(update_data.values())
# sql_update_query = f"UPDATE servers SET {update_columns} WHERE {where_condition}"
# 采用下面这个方法不会出错
sql_update_query = "UPDATE servers SET status = %s, responseTime = %s, detect_date = %s, upstream = %s, protocol = %s WHERE server_name = %s"
update_values = (status, responseTime, detect_date, upstream, protocol, server_name)
# 执行UPDATE语句
cursor.execute(sql_update_query, update_values)
# 提交事务
conn.commit()
# 打印受影响的行数
print(cursor.rowcount, "record(s) updated.")
except Error as e:
print("Error while updating data to MySQL", e)
# 获取服务器列表中的所有服务器的信息
def select_server_health_list(conn):
"""
TODO 获得目前数据库中所有的服务器名称返回一个数组
@author: 原凯峰
@param conn:
@return: 存放有服务器名称的数组
"""
sql_query ="SELECT * FROM servers"
cursor = conn.cursor()
# 执行查询语句
cursor.execute(sql_query)
# 获取所有查询结果
rows = cursor.fetchall()
# 将查询结果转换为字典列表
columns = [i[0] for i in cursor.description] # 获取列名
data_list = [dict(zip(columns, row)) for row in rows] # 将每行数据转换为字典
return data_list
def insert_serverhealthlog_data(conn,server_name,status,upstream,detect_date,responseTime,protocol):
"""
@param conn:
@param server_name:
@param status:
@param upstream:
@param detect_date:
@param responseTime:
@param protocol:
@return:
"""
try:
cursor = conn.cursor()
query = """
INSERT INTO serverhealthlog (server_name, status, upstream, detect_date, responseTime, protocol)
VALUES (%s, %s, %s, %s, %s, %s)
"""
cursor.execute(query, (server_name, status, upstream, detect_date, responseTime, protocol))
conn.commit()
return cursor.lastrowid
except Error as e:
print("Insert error:", e)
def select_server_health_log(conn,servername):
"""
#TODO 查询某个服务器的历史健康状态的多条数据需要查询ServerHealthLog表,这个查询时间的范围还需要确定
@author: 原凯峰
@param conn:
@param servername:
@return:返回值是一个字典列表
"""
sql_query = f"SELECT * FROM serverhealthlog WHERE server_name = '{servername}'"
cursor = conn.cursor()
# 执行查询语句
cursor.execute(sql_query)
# 获取所有查询结果
rows = cursor.fetchall()
# 将查询结果转换为字典列表
# 假设你的表有列名为'id', 'name', 'age'等
columns = [i[0] for i in cursor.description] # 获取列名
data_list = [dict(zip(columns, row)) for row in rows] # 将每行数据转换为字典
return data_list
def delete_server_health_log(conn,servername,finaldata):
"""
@todo 删除某个服务器某段时间之前的健康信息
@author: 原凯峰
@param conn: 数据库连接
@param servername: 服务器呢名称
@param finaldata: 从哪个日期之前的数据都要删除
@return:
"""
# 断开数据库连接函数
def disconnect_db(conn):
"""
@author: 原凯峰
@param conn:
@return:
"""
if conn.is_connected():
cursor = conn.cursor()
cursor.close()
conn.close()
print("Database connection closed.")
# 示例使用
# if __name__ == "__main__":
# # 连接数据库
# conn = connect_user_db()
# if conn:
# # 插入用户数据
# insert_user_data(conn, 'username1', 'real_name1', 'ID1', 'password1', 'tel1')
#
# # 查询用户
# select_users(conn)
#
# # 更新用户信息
# # update_user(conn, 'username1', 'new_email1')
#
# # 删除用户
# # delete_user(conn, 'username1')
#
# # 断开数据库连接
# disconnect_db(conn)

@ -0,0 +1,131 @@
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: Connector.py
# @time: 2024/6/19 9:20
# @desc:
import socket
import threading
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
from Server import *
# 全局变量
sockets = {}
socket_lock = threading.Lock()
exit_flag = False
# 维护连接的函数
def KeepConnection(sock, ip, port):
"""
@author: 原凯峰
@param sock:
@param ip:
@param port:
@return:
"""
while not exit_flag:
data = sock.recv(1024)
if not data:
print(f"Gateway disconnected from {ip}:{port}")
break
print(f"Received from gateway ({ip}:{port}): {data.decode()}")
with socket_lock:
del sockets[sock]
# 连接到网关的函数
def ConnectToGateway(ip, port):
"""
@author: 原凯峰
@param ip:
@param port:
@return:
"""
global exit_flag
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((ip, port))
with socket_lock:
sockets[sock] = (ip, port)
print(f"Connected to gateway at {ip}:{port}")
threading.Thread(target=KeepConnection, args=(sock, ip, port)).start()
def StartHttpServer():
"""
@author: 原凯峰
@return:
"""
global exit_flag
server_address = ('', 8080)
httpd = HTTPServer(server_address, CORSHTTPRequestHandler)
print(f'Starting httpd server on port 8080...')
try:
while not exit_flag:
httpd.handle_request() # 使用 handle_request 替代 serve_forever 以便可以检查 exit_flag
except KeyboardInterrupt:
pass
finally:
httpd.server_close()
print("HTTP server stopped")
# 发送消息到特定网关的函数
def SendMessageToGateway(ip, port, message):
"""
@author: 原凯峰
@param ip:
@param port:
@param message:
@return:
"""
with socket_lock:
# 查找特定的网关连接
for sock, (stored_ip, stored_port) in sockets.items():
if stored_ip == ip and stored_port == port:
try:
# 发送消息,这里假设消息是字符串格式
sock.sendall(message.encode('utf-8'))
print(f"Message sent to {ip}:{port}: {message}")
except socket.error as e:
print(f"Error sending message to {ip}:{port}: {e}")
return # 找到并发送消息后退出函数
# 如果没有找到连接
print(f"No connection found for {ip}:{port}")
# 示例发送消息到IP地址和端口
# 假设我们已经连接到了这个网关
def main():
global exit_flag
exit_flag = False # 初始化exit_flag
t = threading.Thread(target=StartHttpServer)
t.start()
while not exit_flag:
command = input("Enter command (/connect_gateway/exit): ")
# if command == "start_http_server":
# t = threading.Thread(target=StartHttpServer)
# t.start()
if command == "connect_gateway":
ip = input("Enter the gateway IP address: ")
port = int(input("Enter the gateway port: "))
ConnectToGateway(ip, port)
elif command == "exit":
exit_flag = True
print("Exiting program...")
break
else:
print("Unknown command")
# StartHttpServer函数修改
if __name__ == "__main__":
main()

@ -0,0 +1,53 @@
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: Server.py
# @time: 2024/6/19 9:22
# @desc:
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from MessageHandler.PreDataProcessor import *
import time
class CORSHTTPRequestHandler(BaseHTTPRequestHandler):
def do_OPTIONS(self):
self.send_response(204)
self.send_header('Content-type', 'text/plain')
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.end_headers()
def do_POST(self):
content_length = int(self.headers['Content-Length'])
post_data = self.rfile.read(content_length)
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
print("Request line:", self.requestline)
print("Headers:", self.headers)
print("Body:", post_data.decode('utf-8'))
# 处理程序
MessageProcessor = preDataProcessor(post_data)
# response = {"POST": "request received"}
# self.wfile.write(str(response).encode('utf-8'))
# time.sleep(1)
backMessage =MessageProcessor.returnMessage
response = {"POST": backMessage}
self.wfile.write(str(json.dumps(response)).encode('utf-8'))
# self.wfile.write("{\"POST\": \"request received\"}".encode('utf-8'))
def do_GET(self):
self.send_response(200)
self.send_header('Content-type', 'text/plain')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(b"Hello, world!")

@ -0,0 +1,60 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: aiohttpserver.py
# @time: 2024/6/19 23:11
# @desc:
import asyncio
import aiohttp
from aiohttp import web
async def process_data(data):
# 这里模拟异步数据处理,例如通过网络请求或数据库操作
await asyncio.sleep(1) # 模拟异步操作的延时
return f"Processed: {data}"
# 定义CORS中间件
async def add_cors_headers(request, handler):
# 设置CORS相关的响应头
response = handler(request) # 先调用handler处理请求获取response
response.headers['Access-Control-Allow-Origin'] = '*'
response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS'
response.headers['Access-Control-Allow-Headers'] = 'Content-Type'
# 检查请求方法是否为OPTIONS
if request.method == 'OPTIONS':
# OPTIONS请求返回204状态码不返回body
return web.Response(status=204, headers=response.headers)
return response
# 创建应用
app = web.Application(middlewares=[add_cors_headers])
# 定义路由和视图函数
async def handle_options(request):
return web.Response(status=204)
async def handle_post(request):
data = await request.post()
post_data = await data.text() # 异步读取POST数据
processed_data = await process_data(post_data)
return web.json_response({
"POST": "request received",
"processed_data": processed_data
})
async def process_data(data):
await asyncio.sleep(1) # 模拟异步操作的延时
return f"Processed: {data}"
# 添加路由
app.add_routes([web.route('OPTIONS', '/', handle_options),
web.route('POST', '/', handle_post)])
if __name__ == '__main__':
web.run_app(app, host='127.0.0.1', port=8080)

@ -0,0 +1,74 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: MachineLearningDivider.py
# @time: 2024/6/26 8:21
# @desc:利用随机森林法进行模型训练,能够通过平均响应时间、故障率等数据计算出服务器的健康状态
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import pickle
# 假设我们有以下数据集
X = [
[0.3, 0.005], # 服务器特征:平均响应时间和故障率
[2.5, 0.03],
[0.7, 0.045],
[1.2, 0.002]
]
y = ['良好', '', '', '良好'] # 对应的健康状态标签
# 将健康状态标签转换为数值
label_mapping = {'一般': 0, '良好': 1, '': 2, '极差': 3}
y_encoded = [label_mapping[label] for label in y]
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.25, random_state=42)
# 选择模型,这里使用随机森林分类器
model = RandomForestClassifier(n_estimators=100, random_state=42)
# 训练模型
model.fit(X_train, y_train)
# 预测测试集
y_pred = model.predict(X_test)
# 评估模型
print(classification_report(y_test, y_pred))
print("Accuracy:", accuracy_score(y_test, y_pred))
with open('server_health_model.pkl', 'wb') as file:
pickle.dump(model, file)
# 保存模型
with open('server_health_model.pkl', 'wb') as file:
pickle.dump(model, file)
# 定义一个函数来加载模型并进行预测
def load_model_and_predict(new_data):
with open('server_health_model.pkl', 'rb') as file:
loaded_model = pickle.load(file)
predictions = loaded_model.predict(new_data)
return predictions
# 定义一个函数来将预测结果转换为健康等级
def predict_health_status(new_data):
predictions = load_model_and_predict(new_data)
# 创建逆向映射字典
inverse_label_mapping = {value: key for key, value in label_mapping.items()}
# 使用逆向映射字典转换预测结果
health_status = [inverse_label_mapping[pred] for pred in predictions]
return health_status
# 测试函数
def testcase():
new_data = [[0.4, 0.01]] # 新的服务器数据
health_status = predict_health_status(new_data)
print("预测的健康状态:", health_status)
testcase()

@ -0,0 +1,59 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: ServerHealthLogAnalyzer.py
# @time: 2024/6/20 16:57
# @desc: 用于对服务器健康信息的数据分析和日志信息的数据分析
from DbProcessor.DatabaseOp import *
class ServerHealthLogAnalyzer():
def __init__(self):
self.serverList = select_server_health()
self.analyzeReturn = self.returnMessageProvider()
def returnMessageProvider(self):
"""
@todo 能够将每一个服务器的日志分析结果存放进dict中方便后续使用
@author: 原凯峰
@return: 存放有服务器健康状态分析的字典
"""
serverHealthDict = {}
for serverName in self.serverList:
aba= self.logAnalyzer(serverName)
return serverHealthDict
def logAnalyzer(self,serverName):
"""
@todo 需要处理日志信息并且将日志信息进行处理形成一个统计性的结论似乎可以借助某些在线应用的api来辅助进行日志分析
@author: 原凯峰
@param serverName:
@return: 存有分析结果的字符串
"""
serverHealthLog = select_server_health_log(serverName)
analyzeResult = ""
return analyzeResult
"""
可能可以做的数据分析
故障率
存活率
周期性存活率
平均正常运行时间
平均故障时间
已经正常/故障运行的时间
"""

@ -0,0 +1,60 @@
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# 测试样例数据
data = {
'server_name': ['Server1', 'Server1', 'Server2', 'Server2',
'Server1', 'Server1', 'Server2', 'Server2'],
'check_time': ['2024-03-01 10:00', '2024-03-01 11:00', '2024-03-31 12:00',
'2024-04-15 13:00', '2024-06-01 14:00', '2024-06-30 15:00',
'2024-12-25 16:00', '2024-12-31 17:00'],
'status': ['Normal', 'Error', 'Normal', 'Warning',
'Normal', 'Error', 'Warning', 'Normal']
}
# 创建DataFrame
df = pd.DataFrame(data)
df['check_time'] = pd.to_datetime(df['check_time'])
# 添加季节性、月度、周度、日度和小时特征
df['season'] = df['check_time'].dt.to_period('Q').astype(str)
df['month'] = df['check_time'].dt.month_name()
df['day_of_week'] = df['check_time'].dt.day_name()
df['day'] = df['check_time'].dt.day
df['hour'] = df['check_time'].dt.hour
# 定义服务器存活状态
df['is_failure'] = df['status'].apply(lambda x: 1 if x not in ['Normal'] else 0)
# 按时间粒度分组并计算故障次数和总检查次数
grouped = df.groupby(['server_name', 'season', 'month', 'day_of_week', 'day', 'hour'])['is_failure'].agg(Total='sum').reset_index()
# 计算故障率
grouped['failure_rate'] = grouped['Total'] / grouped.groupby(['server_name', 'season', 'month', 'day_of_week', 'day', 'hour']).cumcount() + 1
# 为了可视化我们使用pivot来重塑数据
pivot_season = grouped.pivot_table(index=['server_name', 'season'], columns='hour', values='failure_rate', fill_value=0)
pivot_month = grouped.pivot_table(index=['server_name', 'month'], columns='day_of_week', values='failure_rate', fill_value=0)
pivot_week = grouped.pivot_table(index=['server_name', 'day_of_week'], columns='day', values='failure_rate', fill_value=0)
pivot_day = grouped.pivot_table(index=['server_name', 'day'], columns='hour', values='failure_rate', fill_value=0)
# 可视化季节性故障率
sns.heatmap(pivot_season, annot=True, cmap='YlGnBu')
plt.title('Seasonal Failure Rates by Server and Hour')
plt.show()
# 可视化月度故障率
sns.heatmap(pivot_month, annot=True, cmap='YlGnBu')
plt.title('Monthly Failure Rates by Server and Day of Week')
plt.show()
# 可视化周度故障率
sns.heatmap(pivot_week, annot=True, cmap='YlGnBu')
plt.title('Weekly Failure Rates by Server and Day')
plt.show()
# 可视化日度故障率
sns.heatmap(pivot_day, annot=True, cmap='YlGnBu')
plt.title('Daily Failure Rates by Server and Hour')
plt.show()

@ -0,0 +1,84 @@
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: BackendProcessor.py
# @time: 2024/6/19 9:22
# @desc:
from DbProcessor.DatabaseOp import *
import LogAnalyze.ServerHealthLogAnalyzer as Analyzer
import json
#TODO 还没写
class BackendProcessor():
def __init__(self,receivedMessage):
self.receivedMessage = receivedMessage
self.messageType = self.getreceivedMessageType()
self.returnMessage = self.dataProcessor()
def getreceivedMessageType(self):
"""
@author: 原凯峰
@return: messageType
"""
try:
# 由于后端json中涉及到“type”字段所以采用kind作为关键词
messageType = self.receivedMessage["kind"]
print(f"this is a {messageType} message")
return messageType
except Exception as e:
print(f"An error occurred: {e}")
# 进行错误处理,比如记录日志或者设置默认值
return None
def dataProcessor(self):
"""
@author: 原凯峰
@return:
"""
# 如果是登录信息
if self.messageType == "server_health_list":
serverNum, serverList = self.serverHealthMessageParser(self.receivedMessage)
conn = connect_user_db()
for serverInfo in serverList:
# 需要填入数据库的消息
server_name =serverInfo["name"]
upstream = serverInfo["upstream"]
status = serverInfo["status"]
detect_date = serverInfo["detect_time"]#TODO 这个字段和下面这个字段都需要后续修改名字。
responseTime = serverInfo["responseTime"]#
protocol = serverInfo["type"]
# 先判断是否该服务器已经存在;
if not is_exist_server(conn, server_name):
insert_server_data(conn,server_name, status, upstream, detect_date, responseTime, protocol)
returnMessage = " success!"
else:
update_server_list(conn,server_name, status, upstream, detect_date, responseTime, protocol)
returnMessage = "Server already exists. "
insert_serverhealthlog_data(conn,server_name,status,upstream,detect_date,responseTime,protocol)
def serverHealthMessageParser(self,serversHealthList):
"""
@param serversHealthList: 包含有服务器健康信息的json
@return:serverList ,存放有服务器健康状态的数组
"""
try:
servers = serversHealthList["servers"]
num = servers["total"]
serverList = servers["server"]
return num, serverList
except Exception as e:
print("find an error while parsing server health list!!")
return 0,[]
def backendTest():
json_str = '{"source": "backend", "kind": "server_health_list", "servers": {"total":2,"generation":1, "server":[{"index":0,"type":"tcp","responseTime":"111","name":"1.195.201.255","status":0,"upstream":"cluster","detect_time":111},{"index":0,"type":"tcp","responseTime":"111","name":"1.205.204.0","status":1,"upstream":"cluster","detect_time":111}]}}'
data_dict = json.loads(json_str)
a = BackendProcessor(data_dict)
backendTest()

@ -0,0 +1,318 @@
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: FrontendProcessor.py
# @time: 2024/6/19 9:22
# @desc:
from DbProcessor.DatabaseOp import *
from LogAnalyze import MachineLearningDivider as mldivider
import json
class FrontendProcessor():
def __init__(self,receivedMessage):
self.receivedMessage = receivedMessage
self.messageType = self.getreceivedMessageType()
self.returnMessage = self.dataProcessor()
def getreceivedMessageType(self):
"""
@author: 原凯峰
@return: messageType
"""
try:
messageType = self.receivedMessage["type"]
print(f"this is a {messageType} message")
return messageType
except Exception as e:
print(f"An error occurred: {e}")
# 进行错误处理,比如记录日志或者设置默认值
return None
def dataProcessor(self):
"""
@disc: 这个函数是这个类的核心处理函数处理来自前端的所有请求并且返回响应信息
@author: 原凯峰
@return:
"""
# 如果是登录信息
if self.messageType == "register_message":
conn = connect_user_db()
# 需要填入数据库的消息
username = self.receivedMessage["username"]
password = self.receivedMessage["password"]
real_name = self.receivedMessage["real_name"]
ID = self.receivedMessage["ID"]
tel = self.receivedMessage["tel"]
# 先判断是否改用户已经存在;
if not select_exist_user(conn,username):
insert_user_data(conn,username,real_name,ID,password,tel)
returnMessage = "RegistOP success!"
return returnMessage
else:
returnMessage = "Username already exists or Database error. "
return returnMessage
elif self.messageType == "getallserverstatus":
# 获取所有服务器的状态信息,主要用于服务器列表界面的展示
return self.getAllServerStatus()
elif self.messageType == "getserverinfo_message":
# 获取某个服务器的某些数据,故障率、平均响应时间等,
serverName = self.receivedMessage["server_name"]
serverInfo = self.getServerInfo(serverName)
return serverInfo
elif self.messageType == "get_overall_message":
overAllMessage = self.caculateOverallMessage()
return overAllMessage
elif self.messageType == "get_net_point_message":
# 前端像我请求网络环境中的所有信息
backMessage=self.getNetPointInfo()
# self.returnMessage == f"{backMessage}"
elif self.messageType == "alt_backend_config":
# 下面这个判断用于接收并且处理前端传输过来的更改后端配置文件的请求。
# TODO这个函数的实现可能可以采用以下这种实现方式向后端发送更改信息 不着急写
"""
@想法 是否可以采用ssh远程操控 或者说仅仅担任一个消息中转的作用
@疑惑是否需要建立一个记录这个更改操作的数据库表名为 altconfighistory
"""
conn = connect_user_db()
# 需要填入数据库的消息
username = self.receivedMessage["username"]
password = self.receivedMessage["password"]
json_example = '{"source":"backend","config":"this is a test config file"} '
def getNetPointInfo(self):
"""
@todo 获取当前网络节点所有的节点信息这个不着急写
@return: 所需要的信息字段
"""
rez = '{"servers":"{[{"name":"one","status":"alive","ip":"192.168.0.0.132:8080"},{"name":"one","status":"alive","ip":"192.168.0.0.132:8080"}]}", "gateways":"{["name":"gateway1",abc]}"}'
return rez
def getServerInfo(self,serverName):
"""
@date6.25 14:58
@param serverName:
@return: 返回统计好的服务器的基本信息
"""
conn = connect_user_db()
serverHealthLog = select_server_health_log(conn,serverName)
ResponseTimeList = []
DisabledTime = 0
AbledTime = 0
logNum = len(serverHealthLog)
DetectTimeList = []
# 遍历每一条数据
for serverlog in serverHealthLog:
# 判断该条数据中该服务器是否存活
if serverlog["status"] == '1':
AbledTime += 1
else:
DisabledTime += 1
serverlog.pop('status')
serverlog.pop('server_name')
serverlog.pop('protocol')
serverlog.pop('health_id')
serverlog.pop('upstream')
ResponseTimeList.append(int(serverlog["responseTime"]))
DetectTimeList.append(serverlog['detect_date'])
print(ResponseTimeList)
# 故障率
DisabledRate = float(DisabledTime)/(DisabledTime + AbledTime)
# 响应时间类的统计数据
AverageResponseTime = sum(ResponseTimeList)/float(logNum)
MinResponseTime = min(ResponseTimeList)
MaxResponseTime = max(ResponseTimeList)
stats = {
"servername": serverName,
"DisabledRate": DisabledRate,
"AverageResponseTime": AverageResponseTime,
"MinResponseTime": MinResponseTime,
"MaxResponseTime": MaxResponseTime,
"ResponseTimeArray": ResponseTimeList,
"Time": DetectTimeList
}
# 使用json.dumps将字典转换为JSON格式的字符串
backMessage = json.dumps(stats)
return backMessage , DisabledRate
def getAllServerStatus(self):
conn = connect_user_db()
allServerStatusList = select_server_health_list(conn)
for ServerStatus in allServerStatusList:
ServerStatus.pop('upstream')
if ServerStatus['status'] == '1':
ServerStatus['status'] = '存活'
else:
ServerStatus['status'] = '死亡'
ServerStatus['address'] = self.queryIpLocation(ServerStatus['server_name'])
return allServerStatusList
def queryIpLocation(self, ip_address):
"""
@disc: 返回IP的所在地
@param ip_address:
@return:
"""
ip_data_file = '../china_ip_address.txt'
result = None
with open(ip_data_file, 'r', encoding='utf-8') as f:
for line in f:
if line.strip(): # 确保行非空
parts = line.split()
if len(parts) >= 3:
start_ip = parts[0]
end_ip = parts[1]
location_info = ' '.join(parts[2:])
if ip_address >= start_ip and ip_address <= end_ip:
result = location_info.split('')[1].strip() # 获取中国的地区信息
break
return result
def caculateOverallMessage(self):
"""
@todo 计算服务器的地区分布并且获得统计性数据 利用机器学习算法计算服务器的存活程度 计算平均存活率
@return:返回包含以上所有数据的json格式数据
"""
conn = connect_user_db()
serverList = select_server_health_list(conn)
print(serverList)
provinceDict = {}
healthDict= {}
healthCnt = 0 # 统计总服务器列表中的健康状态信息,设计所有的服务器,而不是某个服务器的将抗日志统计
for serverInfo in serverList:
province = self.queryIpLocation(serverInfo['server_name'])
disableRate = self.getDisabledRate(serverInfo['server_name'])
avgReponsetime = self.getAvgResponseTime(serverInfo['server_name'])[0]
# 处理健康情况信息
if int(serverInfo['health']):
healthCnt += 1
# 处理省份信息
if province in provinceDict:
tmp = provinceDict[province]
provinceDict[province] = tmp+1 #计数加一
else:
provinceDict[province] = 1
# 存活度信息
testdata = [[avgReponsetime/1000, disableRate]]
health = mldivider.predict_health_status(testdata)
# 处理存活度信息
# @todo fuck liuzuai
if health in healthDict:
tmp = healthDict[health]
healthDict[health] = tmp+1 #计数加一
else:
healthDict[health] = 1
provinceDistribution = self.getDictRate(provinceDict,len(serverList))
healthDistribution = self.getDictRate(healthdict,len(serverList))
overallHealthRate = float(healthCnt)/len(serverList)
overallMessage = {
'provinceDistribution': provinceDistribution,
'healthDistribution': healthDistribution,
'overallHealthRate': overallHealthRate
}
return overallMessage
def getDisabledRate(self,serverName):
"""
@disc 为了更加方便调用把这个功能单独拿出来
@param serverName:
@return: 返回故障率
"""
conn = connect_user_db()
serverHealthLog = select_server_health_log(conn, serverName)
DisabledTime = 0
AbledTime = 0
# 遍历每一条数据
for serverlog in serverHealthLog:
# 判断该条数据中该服务器是否存活
if serverlog["status"] == '1':
AbledTime += 1
else:
DisabledTime += 1
# 故障率
DisabledRate = float(DisabledTime) / (DisabledTime + AbledTime)
return DisabledRate
def getAvgResponseTime(self,serverName):
"""
获取某个服务器的平均响应时间
@param serverName:
@return:
"""
conn = connect_user_db()
serverHealthLog = select_server_health_log(conn, serverName)
ResponseTimeList = []
logNum = len(serverHealthLog)
DetectTimeList = []
# 遍历每一条数据
for serverlog in serverHealthLog:
# 判断该条数据中该服务器是否存活
ResponseTimeList.append(int(serverlog["responseTime"]))
DetectTimeList.append(serverlog['detect_date'])
# 响应时间类的统计数据
AverageResponseTime = sum(ResponseTimeList) / float(logNum)
MinResponseTime = min(ResponseTimeList)
MaxResponseTime = max(ResponseTimeList)
return (AverageResponseTime, MinResponseTime, MaxResponseTime)
def getDictRate(self, Dict, num):
"""
返回比率
@param dict:
@return:
"""
rateDict = {}
for key, value in Dict.keys(),Dict.values():
rateDict[key] = float(value)/num
return rateDict
def test_case_serverinfo():
di = {"source":"frontend" , "type": "getserverinfo_message","server_name":"abb"}
a = FrontendProcessor(di)
print(a.returnMessage)
def test_case_serverlist():
di = {"source":"frontend" , "type": "getallserverstatus"}
a = FrontendProcessor(di)
print(a.returnMessage)
def test_case_overallMessage():
di = {"source":"frontend" , "type": "get_overall_message"}
a = FrontendProcessor(di)
print(a.returnMessage)
# test_case_serverlist()
# test_case_serverinfo()
test_case_overallMessage()

@ -0,0 +1,59 @@
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: PreDataProcessor.py
# @time: 2024/6/19 9:22
# @desc:
import json
from MessageHandler.FrontendProcessor import FrontendProcessor
from MessageHandler.BackendProcessor import BackendProcessor
class preDataProcessor():
def __init__(self,rawData):
self.rawData = rawData
self.processedData = self.preDataProcess()
self.returnMessage = self.todo()
def preDataProcess(self):
"""
@author: 原凯峰
@return:processedData处理过后的数据
@todo 前端可以用这个但是后端不太能用
"""
print("preDataProcessing")
processedData = parseJsonToDict(self.rawData)
return processedData
def todo(self):
"""
@author: 原凯峰
@return:前后端信息处理模块的返回信息
"""
if self.processedData["source"]=="frontend":
print("this is a frontend message")
frontProsece = FrontendProcessor(self.processedData)
return frontProsece.returnMessage
elif self.processedData["source"]=="backend":
#对于网关信息的处理
print("this is a backend message")
backendProcess = BackendProcessor(self.processedData)
#这个信息只是一个能够拿到种类的深度,如果想要活得更都的数据,需要在后端信息处理模块里面进行深度解包
return backendProcess.returnMessage
def parseJsonToDict(json_data):
"""
@author: 原凯峰
@param json_data:
@return: 将json信息解析厚的字典
"""
try:
# 使用json.loads()方法将JSON字符串解析为字典
data_dict = json.loads(json_data)
return data_dict
except json.JSONDecodeError as e:
# 如果解析失败,抛出异常
raise ValueError(f"Invalid JSON data: {e}")

@ -0,0 +1,2 @@
dic = {'a':1,'b':2,'c':3}
for i in dic.keys():

@ -0,0 +1,58 @@
# !/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: tryselect.py
# @time: 2024/6/22 11:32
# @desc:
import mysql.connector
from mysql.connector import Error
import json
# 数据库配置信息
host = ''
database = 'nginxdb'
user = 'rtsw'
password = '123456'
# SQL查询语句
sql_query = "SELECT * FROM tasks" # 替换成你的表名和查询条件
# 连接到数据库
try:
connection = mysql.connector.connect(host=host,
database=database,
user=user,
password=password)
if connection.is_connected():
cursor = connection.cursor()
# 执行查询语句
cursor.execute(sql_query)
# 获取所有查询结果
rows = cursor.fetchall()
# 将查询结果转换为字典列表
# 假设你的表有列名为'id', 'name', 'age'等
columns = [i[0] for i in cursor.description] # 获取列名
data_list = [dict(zip(columns, row)) for row in rows] # 将每行数据转换为字典
# 将字典列表转换为JSON格式
json_data = json.dumps(data_list, ensure_ascii=False)
print(json_data) # 打印JSON格式的数据
print(data_list[0]["subject"])
except Error as e:
print("Error while connecting to MySQL", e)
finally:
# 关闭数据库连接
if connection.is_connected():
cursor.close()
connection.close()
print("MySQL connection is closed")

@ -0,0 +1,21 @@
CREATE TABLE `Servers` (
`server_name` VARCHAR(32) PRIMARY KEY,
`status VARCHAR(32) NOT NULL`,
`upstream` VARCHAR(32),
`detect_date` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
`responseTime` varchar(255) NOT NULL COMMENT '响应时间',
`protocol` varchar(32) NOT NULL COMMENT '通信协议名称',
) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '服务器列表';
CREATE TABLE `ServerHealthLog` (
`health_id` INT AUTO_INCREMENT PRIMARY KEY,
`server_name` VARCHAR(32),
`upstream` VARCHAR(32),
`detect_date` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
`status` VARCHAR(32) NOT NULL,
`responseTime` varchar(255) NOT NULL COMMENT '响应时间',
`protocol` varchar(32) NOT NULL COMMENT '通信协议名称',
FOREIGN KEY (`server_name`) REFERENCES Servers(`server_name`),
INDEX `check_time_index` (`check_time`)
) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '服务器历史信息记录表';

@ -0,0 +1,11 @@
CREATE TABLE IF NOT EXISTS `user` (
`username` varchar(255) NOT NULL,
`real_name` varchar(255) NOT NULL COMMENT '真名',
`ID` varchar(255) NOT NULL COMMENT '身份证号',
`password` varchar(255) NOT NULL COMMENT '密码',
`tel` varchar(255) NOT NULL COMMENT '电话号码',
PRIMARY KEY (`id`),
) DEFAULT CHARACTER SET = utf8mb4 COMMENT = '用户信息列表';

File diff suppressed because it is too large Load Diff

@ -0,0 +1,187 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>注册页面</title>
<!-- 引入格式文件-->
<!-- <link rel="stylesheet" href="html&css.css"> -->
</head>
<body>
<style>
*{
margin: 0px;/*所有的外边距为0*/
padding: 0px;/*所有的内边距为0*/
box-sizing: border-box;/*规定两个并排的带边框的框*/
}
table{
text-align: center;
}
body{
background: url("./assets/images/button.jpg")no-repeat center;
padding-top: 25px;
}
.rg_layout{
width: 900px;
height: 500px;
border: 8px solid #EEEEEE;/*solid 定义实线*/
background-color: white;
margin: auto;
}
.rg_left{
float: none;
text-align: center;
margin: 15px;
}
.rg_left>p:first-child{
color: #FFD026;
font-size: 20px;
}
.rg_left>p:last-child{
color: #A6A6A6;
font-size: 20px;
}
.rg_center{
float: left;
}
.rg_right{
float: none;
margin: 250px;
padding-left: 10px;
white-space:nowrap;
}
.rg_right p{
font-size: 15px;
}
.rg_right p a{
color: coral;
}
.td_left{
padding-left: 250px;
width: 100px;
text-align: center;
height: 45px;
white-space:nowrap;
}
.td_right{
padding-left: 40px;
text-align: center;
white-space:nowrap;
}
.bt_center{
padding-left: 310px;
}
#username,#real_name,#ID,#password,#tel,#birthday,#checkcode{
width: 251px;
height: 32px;
border: 1px solid #A6A6A6;
/*设置边框圆角*/
border-radius: 5px;
padding-left: 10px;
}
#checkcode{
width: 110px;
}
#img_check{
height: 32px;
vertical-align: middle;/*设置图片的位置垂直居中*/
}
#btn_sub{
width: 100px;
height: 40px;
background-color: #FFD026;
border: 1px solid #FFD026;
padding-left: 10px;
}
</style>
<div class="rg_layout">
<div class="rg_left">
<p>新用户注册</p>
<p>USER REGISTER</p>
</div>
<div class="rg_center">
<div class="rg_form">
<form class="ant-form" action="#" method="post">
<table>
<tr><!--label 标签的作用是当点击文字也会跳到文本输出框-->
<!--for属性与ID属性对应规定 label 绑定到哪个表单元素。-->
<td class="td_left"><label for="username">用户名</label> </td>
<td class="td_right"><input type="text" name="username" id="username"> </td>
</tr>
<tr>
<td class="td_left"><label for="real_name">姓名</label> </td>
<td class="td_right"><input type="text" name="real_name" id="real_name"> </td>
</tr>
<tr><!--label 标签的作用是当点击文字也会跳到文本输出框-->
<td class="td_left"><label for="ID">身份证号</label> </td>
<td class="td_right"><input type="text" name="ID" id="ID"> </td>
</tr>
<tr>
<td class="td_left"><label for="password">密码</label> </td>
<td class="td_right"><input type="password" name="password" id="password"> </td>
</tr>
<tr>
<td class="td_left"><label for="tel">再次输入密码</label> </td>
<td class="td_right"><input type="password" name="tel" id="tel"> </td>
</tr>
<tr>
<td colspan="2" align="center" class="bt_center">
<input type="submit" value="注册" id="btn_sub">
</td>
</tr>
</table>
</form>
</div>
</div>
<div class="rg_right">
<p><a href="LoginUI.html">返回登录界面</a></p>
</div>
</div>
</body>
<script>
document.addEventListener('DOMContentLoaded', function() {
// 获取表单元素
var form = document.querySelector('.ant-form');
// 为表单添加提交事件监听器
form.addEventListener('submit', function(e) {
e.preventDefault(); // 阻止表单的默认提交行为
// 收集表单数据
var formData = {
//question_kind: document.getElementsByName('question_kind').value,
ID:document.getElementById('ID').value,
password: document.getElementById('password').value,
real_name:document.getElementById('real_name').value,
source:"frontend",
tel:document.getElementById('tel').value,
type:"register_message",
username: document.getElementById('username').value,
};
console.log(formData);
fetch('http://47.96.136.178:8080', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(formData) // 将表单数据转换为JSON字符串
})
.then(response => response.json()) // 转换响应为JSON
.then(data => {
console.log('Success:', data);
alert('反馈成功提交!');
})
.catch((error) => {
console.error('Error:', error);
alert('提交失败,请稍后重试!');
});
});
});
</script>
</html>

@ -0,0 +1,23 @@
def query_ip_location(ip_address, ip_data_file):
result = None
with open(ip_data_file, 'r', encoding='utf-8') as f:
for line in f:
if line.strip(): # 确保行非空
parts = line.split()
if len(parts) >= 3:
start_ip = parts[0]
end_ip = parts[1]
location_info = ' '.join(parts[2:])
if ip_address >= start_ip and ip_address <= end_ip:
result = location_info.split('')[1].strip() # 获取中国的地区信息
break
return result
# 示例用法
ip_data_file = 'china_ip_address.txt'
ip_address_to_query = '1.173.45.67' # 替换为你想要查询的IP地址
location = query_ip_location(ip_address_to_query, ip_data_file)
if location:
print(f"{ip_address_to_query} 所在的省级地域是:{location}")
else:
print(f"未找到 {ip_address_to_query} 的地理位置信息。")

File diff suppressed because it is too large Load Diff

@ -1,19 +0,0 @@
#ifndef _NGX_HTTP_UPSTREAM_CHECK_MODELE_H_INCLUDED_
#define _NGX_HTTP_UPSTREAM_CHECK_MODELE_H_INCLUDED_
#include <ngx_config.h>
#include <ngx_core.h>
#include <ngx_http.h>
ngx_uint_t ngx_http_upstream_check_add_peer(ngx_conf_t *cf,
ngx_http_upstream_srv_conf_t *us, ngx_addr_t *peer);
ngx_uint_t ngx_http_upstream_check_peer_down(ngx_uint_t index);
void ngx_http_upstream_check_get_peer(ngx_uint_t index);
void ngx_http_upstream_check_free_peer(ngx_uint_t index);
#endif //_NGX_HTTP_UPSTREAM_CHECK_MODELE_H_INCLUDED_
Loading…
Cancel
Save