最终上传

yuankaifeng_branch
lingel 5 months ago
parent d6db0ce71f
commit e37b42b49c

@ -26,7 +26,6 @@ def connect_user_db():
port=3306 port=3306
) )
if conn.is_connected(): if conn.is_connected():
print("Successfully connected to the database")
return conn return conn
except Error as e: except Error as e:
print("Connection error:", e) print("Connection error:", e)
@ -314,6 +313,35 @@ def delete_server_health_log(conn,servername,finaldata):
@return: @return:
""" """
# 通过日期获得当日日志数量
def getLogNumByDate(conn,date):
"""
@param conn:
@param date:
@return:
"""
cursor = conn.cursor()
wherequery = f"detect_date = '{date}'"
query = f"SELECT COUNT(*) FROM serverhealthlog WHERE {wherequery}"
# print(query)
cursor.execute(query)
count = cursor.fetchone()[0]
# 关闭连接
cursor.close()
return count
# 测试用例
def test():
conn = connect_user_db()
date = '2024-6-2'
print(getLogNumByDate(conn,date))
# test()
# 断开数据库连接函数 # 断开数据库连接函数
def disconnect_db(conn): def disconnect_db(conn):
""" """

@ -0,0 +1,167 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: KernelToController.py
# @time: 2024/7/2 15:16
# @desc:
import requests
import socket
import json
import time
from datetime import datetime
class CustomEncoder(json.JSONEncoder):
def __init__(self, *args, **kwargs):
super(CustomEncoder, self).__init__(*args, **kwargs)
self.prefix = {'number': '1', 'source': 'backend', 'kind': 'server_health_list'}
def encode(self, obj):
if not isinstance(obj, dict):
raise TypeError("Object must be a dict.")
# 将prefix添加到原始字典的前面
encoded_dict = {**self.prefix, **obj}
return super(CustomEncoder, self).encode(encoded_dict)
def calResponseTime(time_str1, time_str2, time_format):
# 将字符串时间转换为datetime对象
time1 = datetime.strptime(time_str1, time_format)
time2 = datetime.strptime(time_str2, time_format)
time_difference = time2 - time1
seconds_difference = time_difference.total_seconds()
return seconds_difference
# 定义时间格式
time_format = "%a, %d %b %Y %H:%M:%S GMT"
# 目标URL
url = 'http://127.0.0.1/status?format=json'
# 发送数据的IP地址和端口
target_ip = '47.96.136.178'
target_port = 8080
# 定义一个数组来存储last_check_time的值
last_check_times = []
last_receive_times = []
last_response_times = []
tmp = ''
pos = 0
while True:
# 从URL获取数据
response = requests.get(url)
data = response.json()
tip = 0
pos = pos + 1
# 检查 'servers' 和 'server' 键是否存在
if 'servers' in data and 'server' in data['servers']:
# 遍历服务器数据
for server in data['servers']['server']:
# 将对应的值添加到数组中
if (pos == 1):
# 数组初添加
last_check_times.append(server['last_check_time'])
else:
# 修正处理json数据
server['last_check_time'] = tmp
# 存放到数组中
last_check_times[tip] = tmp
# 修正调整
if (server['last_receive_time'] == ''):
server['last_receive_time'] = 'Null'
if (pos == 1):
last_receive_times.append(server['last_receive_time'])
else:
last_receive_times[tip] = server['last_receive_time']
tip = tip + 1
print("last_check_times:", last_check_times)
print("last_receive_times:", last_receive_times)
##分析计算出服务器响应时间:
for i in range(len(last_check_times)):
# 初次添加
if (pos == 1):
if (last_receive_times[i] == 'Null'):
last_response_times.append('Null')
if (last_receive_times[i] != 'Null'):
responseTime = calResponseTime(last_check_times[i], last_receive_times[i], time_format)
last_response_times.append(responseTime)
else:
if (last_receive_times[i] == 'Null'):
last_response_times[i] = 'Null'
if (last_receive_times[i] != 'Null'):
responseTime = calResponseTime(last_check_times[i], last_receive_times[i], time_format)
last_response_times[i] = responseTime
# 加入到发送数据中:
for server, responseTime in zip(data['servers']['server'], last_response_times):
server['last_response_time'] = responseTime
for i in range(len(last_check_times)):
if (last_receive_times[i] != 'Null'):
tmp = last_receive_times[i]
last_check_times[i] = last_receive_times[i]
# 打印数组内容,查看存储的值
print("last_response_times:", last_response_times)
# 使用自定义的编码器将数据转换为JSON格式的字符串
json_data = json.dumps(data, cls=CustomEncoder)
print(json_data)
# 目标服务器的IP地址和端口
target_url = 'http://47.96.136.178:8080'
# 发送POST请求
# 指定发送的内容类型为JSON
headers = {'Content-Type': 'application/json'}
response = requests.post(target_url, data=json_data, headers=headers)
# 检查请求是否成功
if response.status_code == 200:
print("数据成功发送。")
else:
print("数据发送失败,状态码:", response.status_code)
print("数据已发送到IP{},端口:{}".format(target_ip, target_port))
# 等待2.5秒
time.sleep(2.5)
# 读取文件
def read_log(file_path):
with open(file_path, 'r') as file:
# 读取所有行
log_lines = [line.strip() for line in file]
return log_lines
# # 指定文件路径
# file_path = '/usr/local/nginx/logs/access.log'
#
# # 指定目标IP和端口
# targeturl = 'http://47.96.136.178:8080'
# 构造JSON数据并发送
# def send_json(url, data):
# headers = {'Content-Type': 'application/json'}
# # 将数据构造为JSON格式
# json_data = json.dumps({"logs": data})
# response = requests.post(url, data=json_data, headers=headers)
# return response
# try:
# # 读取日志文件并转换为JSON
# log_data = read_log(file_path)
#
# # 发送数据
# response = send_json(targeturl, log_data)
#
# # 打印响应状态码和内容
# print(f'Status Code: {response.status_code}')
# print(f'Response Content: {response.text}')
# except Exception as e:
# print(f'An error occurred: {e}')

@ -0,0 +1,83 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: MachineLearningDivider.py
# @time: 2024/6/26 8:21
# @desc:利用随机森林法进行模型训练,能够通过平均响应时间、故障率等数据计算出服务器的健康状态
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import pickle
def trainmodel():
# 假设我们有以下数据集
X = [
[0.3, 0.005], # 服务器特征:平均响应时间和故障率
[2.5, 0.03],
[0.7, 0.045],
[1.2, 0.002],
[3.5, 0.1],
[1.3, 0.05],
[0.01, 0.15], # 服务器特征:平均响应时间和故障率
[5, 0.03],
[0.7, 0.015],
[1.4, 0.02],
[0.15, 0.2],
[1.3, 0.005],
]
y = ['良好', '一般', '一般', '良好', '', '一般', '一般', '', '良好', '', '', '良好'] # 对应的健康状态标签
# 将健康状态标签转换为数值
label_mapping = {'一般': 0, '良好': 1, '': 2}
y_encoded = [label_mapping[label] for label in y]
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.4, random_state=42)
# 选择模型,这里使用随机森林分类器
model = RandomForestClassifier(n_estimators=100, random_state=42)
# 训练模型
model.fit(X_train, y_train)
# 预测测试集
y_pred = model.predict(X_test)
# 评估模型
print(classification_report(y_test, y_pred))
print("Accuracy:", accuracy_score(y_test, y_pred))
# 保存模型
with open('server_health_model.pkl', 'wb') as file:
pickle.dump(model, file)
# trainmodel()
# 定义一个函数来加载模型并进行预测
def load_model_and_predict(new_data):
with open('../LogAnalyze/server_health_model.pkl', 'rb') as file:
loaded_model = pickle.load(file)
predictions = loaded_model.predict(new_data)
return predictions
# 定义一个函数来将预测结果转换为健康等级
def predict_health_status(new_data):
label_mapping = {'一般': 0, '良好': 1, '': 2}
predictions = load_model_and_predict(new_data)
# 创建逆向映射字典
inverse_label_mapping = {value: key for key, value in label_mapping.items()}
# 使用逆向映射字典转换预测结果
health_status = [inverse_label_mapping[pred] for pred in predictions]
return health_status
# 测试函数
def testcase():
new_data = [[0.4, 0.01]] # 新的服务器数据
health_status = predict_health_status(new_data)
print("预测的健康状态:", health_status)
# testcase()

@ -45,9 +45,11 @@ class BackendProcessor():
# 需要填入数据库的消息 # 需要填入数据库的消息
server_name =serverInfo["name"] server_name =serverInfo["name"]
upstream = serverInfo["upstream"] upstream = serverInfo["upstream"]
status = serverInfo["status"] status = 0
detect_date = serverInfo["detect_time"]#TODO 这个字段和下面这个字段都需要后续修改名字。 if serverInfo["status"] == 'up':
responseTime = serverInfo["responseTime"]# status = 1
detect_date = serverInfo["last_check_time"]#TODO 这个字段和下面这个字段都需要后续修改名字。
responseTime = serverInfo["last_response_time"]#
protocol = serverInfo["type"] protocol = serverInfo["type"]
# 先判断是否该服务器已经存在; # 先判断是否该服务器已经存在;
@ -77,8 +79,8 @@ class BackendProcessor():
return 0,[] return 0,[]
def backendTest(): def backendTest():
json_str = '{"source": "backend", "kind": "server_health_list", "servers": {"total":2,"generation":1, "server":[{"index":0,"type":"tcp","responseTime":"111","name":"1.195.201.255","status":0,"upstream":"cluster","detect_time":111},{"index":0,"type":"tcp","responseTime":"111","name":"1.205.204.0","status":1,"upstream":"cluster","detect_time":111}]}}' json_str = '{"source": "backend", "kind": "server_health_list", "servers": {"total":2,"generation":1, "server":[{"index":0,"type":"tcp","last_response_time":"30","name":"1.205.204.0","status":"up","upstream":"cluster","last_check_time":"2024-6-5"}]}}'
data_dict = json.loads(json_str) data_dict = json.loads(json_str)
a = BackendProcessor(data_dict) a = BackendProcessor(data_dict)
backendTest() # backendTest()

@ -7,6 +7,7 @@
# @desc: # @desc:
from DbProcessor.DatabaseOp import * from DbProcessor.DatabaseOp import *
from LogAnalyze import MachineLearningDivider as mldivider from LogAnalyze import MachineLearningDivider as mldivider
from datetime import datetime, timedelta
import json import json
class FrontendProcessor(): class FrontendProcessor():
@ -150,9 +151,13 @@ class FrontendProcessor():
# 使用json.dumps将字典转换为JSON格式的字符串 # 使用json.dumps将字典转换为JSON格式的字符串
backMessage = json.dumps(stats) backMessage = json.dumps(stats)
return backMessage , DisabledRate return backMessage
def getAllServerStatus(self): def getAllServerStatus(self):
"""
给列表页返回信息
@return:
"""
conn = connect_user_db() conn = connect_user_db()
allServerStatusList = select_server_health_list(conn) allServerStatusList = select_server_health_list(conn)
@ -200,13 +205,30 @@ class FrontendProcessor():
healthDict= {} healthDict= {}
healthCnt = 0 # 统计总服务器列表中的健康状态信息,设计所有的服务器,而不是某个服务器的将抗日志统计 healthCnt = 0 # 统计总服务器列表中的健康状态信息,设计所有的服务器,而不是某个服务器的将抗日志统计
# 前端总揽界面所需要的前三个服务器的七天内的序列数据直接通过重用类中本身就有的getServerInfo函数进行获取
datasets = []#todo
# 以后考虑把这几个字段放在数据库里,入库的时候就算好
cnt = 3
for serverInfo in serverList: for serverInfo in serverList:
province = self.queryIpLocation(serverInfo['server_name']) province = self.queryIpLocation(serverInfo['server_name'])
disableRate = self.getDisabledRate(serverInfo['server_name']) disableRate = self.getDisabledRate(conn,serverInfo['server_name'])
avgReponsetime = self.getAvgResponseTime(serverInfo['server_name'])[0] avgReponsetime = self.getAvgResponseTime(conn,serverInfo['server_name'])[0]
if cnt:
server_dict = json.loads(self.getServerInfo(serverInfo['server_name']))
timeArray = server_dict['ResponseTimeArray']
length = min(6,len(timeArray))
parse_server_dict = {
'label': serverInfo['server_name'],
'data' : timeArray[:length]
}
datasets.append(parse_server_dict)
cnt -= 1
# 处理健康情况信息 # 处理健康情况信息
if int(serverInfo['health']): if int(serverInfo['status']):
healthCnt += 1 healthCnt += 1
# 处理省份信息 # 处理省份信息
@ -218,9 +240,8 @@ class FrontendProcessor():
# 存活度信息 # 存活度信息
testdata = [[avgReponsetime/1000, disableRate]] testdata = [[avgReponsetime/1000, disableRate]]
health = mldivider.predict_health_status(testdata) health = mldivider.predict_health_status(testdata)[0]
# 处理存活度信息 # 处理存活度信息
# @todo fuck liuzuai
if health in healthDict: if health in healthDict:
tmp = healthDict[health] tmp = healthDict[health]
healthDict[health] = tmp+1 #计数加一 healthDict[health] = tmp+1 #计数加一
@ -228,23 +249,58 @@ class FrontendProcessor():
healthDict[health] = 1 healthDict[health] = 1
provinceDistribution = self.getDictRate(provinceDict,len(serverList)) provinceDistribution = self.getDictRate(provinceDict,len(serverList))
healthDistribution = self.getDictRate(healthdict,len(serverList)) healthDistribution = self.getDictRate(healthDict,len(serverList))
overallHealthRate = float(healthCnt)/len(serverList) overallHealthRate = float(healthCnt)/len(serverList)
# 日志统计数据
dateArray, logNumArray = self.getLogNumOnDate(conn)
datadict = {
'labels':['周一','周二','周三','周四','周五','周六','周日'],
'datasets':datasets
}
overallMessage = { overallMessage = {
'provinceDistribution': provinceDistribution, 'AveSurRate': overallHealthRate,
'healthDistribution': healthDistribution, 'TimeArray': dateArray,
'overallHealthRate': overallHealthRate 'DataArray': logNumArray,
'data1': provinceDistribution,
'data2': healthDistribution,
'data': datadict
} }
return overallMessage return overallMessage
def getDisabledRate(self,serverName): def getLogNumOnDate(self,conn):
"""
从数据库中获取关于服务器健康状态的日志信息返回结果为每一天的日志信息数量这里的时间格式需要进行自己的定义
@param conn:
@return: 可以用于建立柱形图的信息
"""
dateArray = []
logNumArray = []
# 定义时间格式
date_format = "%Y-%m-%d"
# 计算当前日期之前15天的日期
for day in range(1,16):
delta = timedelta(days=-day)
previous_date = (datetime.now() + delta).strftime(date_format)
# 去除前导零
previous_date = previous_date.replace("-0", "-")
dateArray.append(previous_date)
logNum = getLogNumByDate(conn,previous_date)
logNumArray.append(logNum)
return dateArray, logNumArray
def getDisabledRate(self,conn,serverName):
""" """
@disc 为了更加方便调用把这个功能单独拿出来 @disc 为了更加方便调用把这个功能单独拿出来
@param serverName: @param serverName:
@return: 返回故障率 @return: 返回故障率
""" """
conn = connect_user_db()
serverHealthLog = select_server_health_log(conn, serverName) serverHealthLog = select_server_health_log(conn, serverName)
DisabledTime = 0 DisabledTime = 0
AbledTime = 0 AbledTime = 0
@ -261,13 +317,13 @@ class FrontendProcessor():
return DisabledRate return DisabledRate
def getAvgResponseTime(self,serverName): def getAvgResponseTime(self,conn,serverName):
""" """
获取某个服务器的平均响应时间 获取某个服务器的平均响应时间
@param serverName: @param serverName:
@return: @return:
""" """
conn = connect_user_db()
serverHealthLog = select_server_health_log(conn, serverName) serverHealthLog = select_server_health_log(conn, serverName)
ResponseTimeList = [] ResponseTimeList = []
@ -293,13 +349,17 @@ class FrontendProcessor():
@param dict: @param dict:
@return: @return:
""" """
ret = []
for key in Dict.keys():
rateDict = {} rateDict = {}
for key, value in Dict.keys(),Dict.values(): rateDict['value'] = Dict[key]/float(num)
rateDict[key] = float(value)/num rateDict['name'] = key
ret.append(rateDict)
return ret
return rateDict
def test_case_serverinfo(): def test_case_serverinfo():
di = {"source":"frontend" , "type": "getserverinfo_message","server_name":"abb"} di = {"source":"frontend" , "type": "getserverinfo_message","server_name":"1.205.204.0"}
a = FrontendProcessor(di) a = FrontendProcessor(di)
print(a.returnMessage) print(a.returnMessage)

@ -0,0 +1,22 @@
from datetime import datetime, timedelta
# 定义时间格式
date_format = "%Y-%m-%d"
# 获取当前日期,并按照定义的格式转换
current_date = datetime.now().strftime(date_format)
# 去除前导零
current_date = current_date.replace("-0", "-")
# 打印当前日期
print("当前日期:", current_date)
for day in range(1,16):
# 计算当前日期之前15天的日期
delta = timedelta(days=-day)
previous_date = (datetime.now() + delta).strftime(date_format)
# 去除前导零
previous_date = previous_date.replace("-0", "-")
print(previous_date)
# 打印之前15天的日期
print("之前15天的日期:", previous_date)

@ -1,74 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: MachineLearningDivider.py
# @time: 2024/6/26 8:21
# @desc:利用随机森林法进行模型训练,能够通过平均响应时间、故障率等数据计算出服务器的健康状态
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import pickle
# 假设我们有以下数据集
X = [
[0.3, 0.005], # 服务器特征:平均响应时间和故障率
[2.5, 0.03],
[0.7, 0.045],
[1.2, 0.002]
]
y = ['良好', '', '', '良好'] # 对应的健康状态标签
# 将健康状态标签转换为数值
label_mapping = {'一般': 0, '良好': 1, '': 2, '极差': 3}
y_encoded = [label_mapping[label] for label in y]
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.25, random_state=42)
# 选择模型,这里使用随机森林分类器
model = RandomForestClassifier(n_estimators=100, random_state=42)
# 训练模型
model.fit(X_train, y_train)
# 预测测试集
y_pred = model.predict(X_test)
# 评估模型
print(classification_report(y_test, y_pred))
print("Accuracy:", accuracy_score(y_test, y_pred))
with open('server_health_model.pkl', 'wb') as file:
pickle.dump(model, file)
# 保存模型
with open('server_health_model.pkl', 'wb') as file:
pickle.dump(model, file)
# 定义一个函数来加载模型并进行预测
def load_model_and_predict(new_data):
with open('server_health_model.pkl', 'rb') as file:
loaded_model = pickle.load(file)
predictions = loaded_model.predict(new_data)
return predictions
# 定义一个函数来将预测结果转换为健康等级
def predict_health_status(new_data):
predictions = load_model_and_predict(new_data)
# 创建逆向映射字典
inverse_label_mapping = {value: key for key, value in label_mapping.items()}
# 使用逆向映射字典转换预测结果
health_status = [inverse_label_mapping[pred] for pred in predictions]
return health_status
# 测试函数
def testcase():
new_data = [[0.4, 0.01]] # 新的服务器数据
health_status = predict_health_status(new_data)
print("预测的健康状态:", health_status)
testcase()

@ -1,2 +0,0 @@
dic = {'a':1,'b':2,'c':3}
for i in dic.keys():
Loading…
Cancel
Save