中台最终版上传

master
lingel 5 months ago
parent 2d274e7798
commit 2edd7b08b6

@ -26,7 +26,6 @@ def connect_user_db():
port=3306 port=3306
) )
if conn.is_connected(): if conn.is_connected():
print("Successfully connected to the database")
return conn return conn
except Error as e: except Error as e:
print("Connection error:", e) print("Connection error:", e)
@ -314,6 +313,35 @@ def delete_server_health_log(conn,servername,finaldata):
@return: @return:
""" """
# 通过日期获得当日日志数量
def getLogNumByDate(conn,date):
"""
@param conn:
@param date:
@return:
"""
cursor = conn.cursor()
wherequery = f"detect_date = '{date}'"
query = f"SELECT COUNT(*) FROM serverhealthlog WHERE {wherequery}"
# print(query)
cursor.execute(query)
count = cursor.fetchone()[0]
# 关闭连接
cursor.close()
return count
# 测试用例
def test():
conn = connect_user_db()
date = '2024-6-2'
print(getLogNumByDate(conn,date))
# test()
# 断开数据库连接函数 # 断开数据库连接函数
def disconnect_db(conn): def disconnect_db(conn):
""" """

@ -0,0 +1,83 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: MachineLearningDivider.py
# @time: 2024/6/26 8:21
# @desc:利用随机森林法进行模型训练,能够通过平均响应时间、故障率等数据计算出服务器的健康状态
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import pickle
def trainmodel():
# 假设我们有以下数据集
X = [
[0.3, 0.005], # 服务器特征:平均响应时间和故障率
[2.5, 0.03],
[0.7, 0.045],
[1.2, 0.002],
[3.5, 0.1],
[1.3, 0.05],
[0.01, 0.15], # 服务器特征:平均响应时间和故障率
[5, 0.03],
[0.7, 0.015],
[1.4, 0.02],
[0.15, 0.2],
[1.3, 0.005],
]
y = ['良好', '一般', '一般', '良好', '', '一般', '一般', '', '良好', '', '', '良好'] # 对应的健康状态标签
# 将健康状态标签转换为数值
label_mapping = {'一般': 0, '良好': 1, '': 2}
y_encoded = [label_mapping[label] for label in y]
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.4, random_state=42)
# 选择模型,这里使用随机森林分类器
model = RandomForestClassifier(n_estimators=100, random_state=42)
# 训练模型
model.fit(X_train, y_train)
# 预测测试集
y_pred = model.predict(X_test)
# 评估模型
print(classification_report(y_test, y_pred))
print("Accuracy:", accuracy_score(y_test, y_pred))
# 保存模型
with open('server_health_model.pkl', 'wb') as file:
pickle.dump(model, file)
# trainmodel()
# 定义一个函数来加载模型并进行预测
def load_model_and_predict(new_data):
with open('../LogAnalyze/server_health_model.pkl', 'rb') as file:
loaded_model = pickle.load(file)
predictions = loaded_model.predict(new_data)
return predictions
# 定义一个函数来将预测结果转换为健康等级
def predict_health_status(new_data):
label_mapping = {'一般': 0, '良好': 1, '': 2}
predictions = load_model_and_predict(new_data)
# 创建逆向映射字典
inverse_label_mapping = {value: key for key, value in label_mapping.items()}
# 使用逆向映射字典转换预测结果
health_status = [inverse_label_mapping[pred] for pred in predictions]
return health_status
# 测试函数
def testcase():
new_data = [[0.4, 0.01]] # 新的服务器数据
health_status = predict_health_status(new_data)
print("预测的健康状态:", health_status)
# testcase()

@ -45,9 +45,11 @@ class BackendProcessor():
# 需要填入数据库的消息 # 需要填入数据库的消息
server_name =serverInfo["name"] server_name =serverInfo["name"]
upstream = serverInfo["upstream"] upstream = serverInfo["upstream"]
status = serverInfo["status"] status = 0
detect_date = serverInfo["detect_time"]#TODO 这个字段和下面这个字段都需要后续修改名字。 if serverInfo["status"] == 'up':
responseTime = serverInfo["responseTime"]# status = 1
detect_date = serverInfo["last_check_time"]#TODO 这个字段和下面这个字段都需要后续修改名字。
responseTime = serverInfo["last_response_time"]#
protocol = serverInfo["type"] protocol = serverInfo["type"]
# 先判断是否该服务器已经存在; # 先判断是否该服务器已经存在;
@ -77,8 +79,8 @@ class BackendProcessor():
return 0,[] return 0,[]
def backendTest(): def backendTest():
json_str = '{"source": "backend", "kind": "server_health_list", "servers": {"total":2,"generation":1, "server":[{"index":0,"type":"tcp","responseTime":"111","name":"1.195.201.255","status":0,"upstream":"cluster","detect_time":111},{"index":0,"type":"tcp","responseTime":"111","name":"1.205.204.0","status":1,"upstream":"cluster","detect_time":111}]}}' json_str = '{"source": "backend", "kind": "server_health_list", "servers": {"total":2,"generation":1, "server":[{"index":0,"type":"tcp","last_response_time":"30","name":"1.205.204.0","status":"up","upstream":"cluster","last_check_time":"2024-6-5"}]}}'
data_dict = json.loads(json_str) data_dict = json.loads(json_str)
a = BackendProcessor(data_dict) a = BackendProcessor(data_dict)
backendTest() # backendTest()

@ -7,6 +7,7 @@
# @desc: # @desc:
from DbProcessor.DatabaseOp import * from DbProcessor.DatabaseOp import *
from LogAnalyze import MachineLearningDivider as mldivider from LogAnalyze import MachineLearningDivider as mldivider
from datetime import datetime, timedelta
import json import json
class FrontendProcessor(): class FrontendProcessor():
@ -150,9 +151,13 @@ class FrontendProcessor():
# 使用json.dumps将字典转换为JSON格式的字符串 # 使用json.dumps将字典转换为JSON格式的字符串
backMessage = json.dumps(stats) backMessage = json.dumps(stats)
return backMessage , DisabledRate return backMessage
def getAllServerStatus(self): def getAllServerStatus(self):
"""
给列表页返回信息
@return:
"""
conn = connect_user_db() conn = connect_user_db()
allServerStatusList = select_server_health_list(conn) allServerStatusList = select_server_health_list(conn)
@ -200,13 +205,30 @@ class FrontendProcessor():
healthDict= {} healthDict= {}
healthCnt = 0 # 统计总服务器列表中的健康状态信息,设计所有的服务器,而不是某个服务器的将抗日志统计 healthCnt = 0 # 统计总服务器列表中的健康状态信息,设计所有的服务器,而不是某个服务器的将抗日志统计
# 前端总揽界面所需要的前三个服务器的七天内的序列数据直接通过重用类中本身就有的getServerInfo函数进行获取
datasets = []#todo
# 以后考虑把这几个字段放在数据库里,入库的时候就算好
cnt = 3
for serverInfo in serverList: for serverInfo in serverList:
province = self.queryIpLocation(serverInfo['server_name']) province = self.queryIpLocation(serverInfo['server_name'])
disableRate = self.getDisabledRate(serverInfo['server_name']) disableRate = self.getDisabledRate(conn,serverInfo['server_name'])
avgReponsetime = self.getAvgResponseTime(serverInfo['server_name'])[0] avgReponsetime = self.getAvgResponseTime(conn,serverInfo['server_name'])[0]
if cnt:
server_dict = json.loads(self.getServerInfo(serverInfo['server_name']))
timeArray = server_dict['ResponseTimeArray']
length = min(6,len(timeArray))
parse_server_dict = {
'label': serverInfo['server_name'],
'data' : timeArray[:length]
}
datasets.append(parse_server_dict)
cnt -= 1
# 处理健康情况信息 # 处理健康情况信息
if int(serverInfo['health']): if int(serverInfo['status']):
healthCnt += 1 healthCnt += 1
# 处理省份信息 # 处理省份信息
@ -218,9 +240,8 @@ class FrontendProcessor():
# 存活度信息 # 存活度信息
testdata = [[avgReponsetime/1000, disableRate]] testdata = [[avgReponsetime/1000, disableRate]]
health = mldivider.predict_health_status(testdata) health = mldivider.predict_health_status(testdata)[0]
# 处理存活度信息 # 处理存活度信息
# @todo fuck liuzuai
if health in healthDict: if health in healthDict:
tmp = healthDict[health] tmp = healthDict[health]
healthDict[health] = tmp+1 #计数加一 healthDict[health] = tmp+1 #计数加一
@ -228,23 +249,58 @@ class FrontendProcessor():
healthDict[health] = 1 healthDict[health] = 1
provinceDistribution = self.getDictRate(provinceDict,len(serverList)) provinceDistribution = self.getDictRate(provinceDict,len(serverList))
healthDistribution = self.getDictRate(healthdict,len(serverList)) healthDistribution = self.getDictRate(healthDict,len(serverList))
overallHealthRate = float(healthCnt)/len(serverList) overallHealthRate = float(healthCnt)/len(serverList)
# 日志统计数据
dateArray, logNumArray = self.getLogNumOnDate(conn)
datadict = {
'labels':['周一','周二','周三','周四','周五','周六','周日'],
'datasets':datasets
}
overallMessage = { overallMessage = {
'provinceDistribution': provinceDistribution, 'AveSurRate': overallHealthRate,
'healthDistribution': healthDistribution, 'TimeArray': dateArray,
'overallHealthRate': overallHealthRate 'DataArray': logNumArray,
'data1': provinceDistribution,
'data2': healthDistribution,
'data': datadict
} }
return overallMessage return overallMessage
def getDisabledRate(self,serverName): def getLogNumOnDate(self,conn):
"""
从数据库中获取关于服务器健康状态的日志信息返回结果为每一天的日志信息数量这里的时间格式需要进行自己的定义
@param conn:
@return: 可以用于建立柱形图的信息
"""
dateArray = []
logNumArray = []
# 定义时间格式
date_format = "%Y-%m-%d"
# 计算当前日期之前15天的日期
for day in range(1,16):
delta = timedelta(days=-day)
previous_date = (datetime.now() + delta).strftime(date_format)
# 去除前导零
previous_date = previous_date.replace("-0", "-")
dateArray.append(previous_date)
logNum = getLogNumByDate(conn,previous_date)
logNumArray.append(logNum)
return dateArray, logNumArray
def getDisabledRate(self,conn,serverName):
""" """
@disc 为了更加方便调用把这个功能单独拿出来 @disc 为了更加方便调用把这个功能单独拿出来
@param serverName: @param serverName:
@return: 返回故障率 @return: 返回故障率
""" """
conn = connect_user_db()
serverHealthLog = select_server_health_log(conn, serverName) serverHealthLog = select_server_health_log(conn, serverName)
DisabledTime = 0 DisabledTime = 0
AbledTime = 0 AbledTime = 0
@ -261,13 +317,13 @@ class FrontendProcessor():
return DisabledRate return DisabledRate
def getAvgResponseTime(self,serverName): def getAvgResponseTime(self,conn,serverName):
""" """
获取某个服务器的平均响应时间 获取某个服务器的平均响应时间
@param serverName: @param serverName:
@return: @return:
""" """
conn = connect_user_db()
serverHealthLog = select_server_health_log(conn, serverName) serverHealthLog = select_server_health_log(conn, serverName)
ResponseTimeList = [] ResponseTimeList = []
@ -293,13 +349,17 @@ class FrontendProcessor():
@param dict: @param dict:
@return: @return:
""" """
ret = []
for key in Dict.keys():
rateDict = {} rateDict = {}
for key, value in Dict.keys(),Dict.values(): rateDict['value'] = Dict[key]/float(num)
rateDict[key] = float(value)/num rateDict['name'] = key
ret.append(rateDict)
return ret
return rateDict
def test_case_serverinfo(): def test_case_serverinfo():
di = {"source":"frontend" , "type": "getserverinfo_message","server_name":"abb"} di = {"source":"frontend" , "type": "getserverinfo_message","server_name":"1.205.204.0"}
a = FrontendProcessor(di) a = FrontendProcessor(di)
print(a.returnMessage) print(a.returnMessage)

@ -0,0 +1,22 @@
from datetime import datetime, timedelta
# 定义时间格式
date_format = "%Y-%m-%d"
# 获取当前日期,并按照定义的格式转换
current_date = datetime.now().strftime(date_format)
# 去除前导零
current_date = current_date.replace("-0", "-")
# 打印当前日期
print("当前日期:", current_date)
for day in range(1,16):
# 计算当前日期之前15天的日期
delta = timedelta(days=-day)
previous_date = (datetime.now() + delta).strftime(date_format)
# 去除前导零
previous_date = previous_date.replace("-0", "-")
print(previous_date)
# 打印之前15天的日期
print("之前15天的日期:", previous_date)

@ -1,288 +0,0 @@
#include <iostream>
#include <thread>
#include <vector>
#include <map>
#include <mutex>
#include <atomic>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <cstring>
#define BUFFER_SIZE 1024
#define HTTP_SERVER_PORT 8080
std::vector<std::thread> threads; // 存储所有连接线程
std::map<int, int> sockets; // 存储套接字
std::mutex sockets_mutex; // 用于同步访问 sockets
std::atomic<bool> exit_flag(false); // 原子退出标志
// 维护连接的函数
void KeepConnection(int sock) {
char buffer[BUFFER_SIZE];
int nbytes;
while (!exit_flag) {
nbytes = recv(sock, buffer, BUFFER_SIZE, 0);
if (nbytes <= 0) {
std::cout << "Gateway disconnected" << std::endl;
break;
}
std::cout << "Received from gateway (Socket " << sock << "): " << buffer << std::endl;
}
{
std::lock_guard<std::mutex> lock(sockets_mutex);
sockets.erase(sock); // 从集合中移除套接字
}
close(sock);
}
// 连接到网关的函数
void ConnectToGateway(const std::string& ip, int port) {
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
perror("Could not create socket");
return;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
if (inet_pton(AF_INET, ip.c_str(), &server_addr.sin_addr) <= 0) {
perror("Invalid address");
close(sock);
return;
}
if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("Connection to the server failed");
close(sock);
return;
}
{
std::lock_guard<std::mutex> lock(sockets_mutex);
sockets[sock] = 1; // 将套接字添加到集合中
}
std::cout << "Connected to gateway at " << ip << ":" << port << " (Socket " << sock << ")" << std::endl;
// 在新线程中维护连接
threads.emplace_back(KeepConnection, sock);
}
// 处理HTTP请求的函数
void HandleHttpRequest(int client_socket) {
char buffer[BUFFER_SIZE];
int bytes_read = recv(client_socket, buffer, sizeof(buffer), 0);
if (bytes_read <= 0) {
close(client_socket);
return;
}
// 简单解析HTTP请求
std::string http_request(buffer, bytes_read);
std::string ip;
int port;
// 寻找IP和端口号
auto host_pos = http_request.find("Host: ");
if (host_pos != std::string::npos) {
auto start = host_pos + std::string("Host: ").size();
ip = http_request.substr(start, http_request.find("\r\n", start) - start);
}
auto content_length_pos = http_request.find("Content-Length:");
if (content_length_pos != std::string::npos) {
auto start = content_length_pos + std::string("Content-Length: ").size();
int length = std::stoi(http_request.substr(start, http_request.find("\r\n", start) - start));
if (length > 0) {
char* content = new char[length];
bytes_read = recv(client_socket, content, length, 0);
if (bytes_read > 0) {
// 解析JSON内容假设前端发送的是JSON格式
// 这里需要添加JSON解析逻辑来提取IP和端口
// 示例:{"ip":"192.168.1.1","port":8080}
// 可以使用第三方库如nlohmann/json来解析JSON
// 假设解析后得到ip和port
// ConnectToGateway(ip, port);
delete[] content;
}
}
}
// 发送HTTP响应
std::string response = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n";
send(client_socket, response.c_str(), response.size(), 0);
close(client_socket);
}
// 启动HTTP服务器的函数
void StartHttpServer() {
int http_server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (http_server_socket < 0) {
perror("Could not create HTTP server socket");
return;
}
int opt = 1;
if (setsockopt(http_server_socket, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("Setsockopt failed");
close(http_server_socket);
return;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(HTTP_SERVER_PORT);
if (bind(http_server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("Bind failed");
close(http_server_socket);
return;
}
if (listen(http_server_socket, 5) < 0) {
perror("Listen failed");
close(http_server_socket);
return;
}
std::cout << "HTTP server listening on port " << HTTP_SERVER_PORT << std::endl;
while (!exit_flag) {
int client_socket = accept(http_server_socket, NULL, NULL);
if (client_socket < 0) {
perror("Accept failed");
continue;
}
threads.emplace_back(HandleHttpRequest, client_socket);
}
close(http_server_socket);
}
int main() {
// threads.emplace_back(StartHttpServer);
// std::string command, ip;
// int port;
// bool in_connect_mode = false;
// std::cout << "Enter command (connect/exit): ";
// while (true) {
// if (in_connect_mode)
// {
// std::cout << "continue connect or not ? (y or n) ";
// std::cin >> command;
// if(command=="y"){
// std::cout << "Enter the gateway IP address: ";
// std::cin >> ip;
// std::cout << "Enter the gateway port: ";
// std::cin >> port;
// ConnectToGateway(ip, port);
// }
// else {
// in_connect_mode = false; // 退出连接模式
// std::cout << "Exiting connect mode." << std::endl;
// }
// }
// else{
// std::cout << "Enter command (connect/exit): ";
// std::cin>>command;
// if (command == "connect") {
// if (in_connect_mode) {
// std::cout << "Already in connect mode." << std::endl;
// continue;
// }
// in_connect_mode = true; // 进入连接模式
// std::cout << "Enter the gateway IP address: ";
// std::cin >> ip;
// std::cout << "Enter the gateway port: ";
// std::cin >> port;
// ConnectToGateway(ip, port);
// } else if (command == "exit") {
// exit_flag = true; // 设置退出标志
// std::cout << "Exiting program." << std::endl;
// // 关闭所有套接字
// for (auto& sock_pair : sockets) {
// shutdown(sock_pair.first, SHUT_RDWR); // 关闭套接字的发送和接收
// close(sock_pair.first);
// }
// sockets.clear();
// std::cout << "sockets.clear" << std::endl;
// // 等待所有线程结束
// for (auto& thread : threads) {
// if (thread.joinable()) {
// thread.join();
// }
// }
// threads.clear();
// break; // 退出主循环
// } else {
// std::cout << "Unknown command" << std::endl;
// }
// }
// }
// std::cout<<"out"<<std::endl;
// return 0;
std::thread http_server_thread(StartHttpServer);
std::string command, ip;
int port;
bool in_connect_mode = false;
while (true) {
std::cout << "Enter command (connect/exit): ";
std::cin >> command;
if (command == "connect") {
if (in_connect_mode) {
std::cout << "Already in connect mode." << std::endl;
continue;
}
in_connect_mode = true;
std::cout << "Enter the gateway IP address: ";
std::cin >> ip;
std::cout << "Enter the gateway port: ";
std::cin >> port;
ConnectToGateway(ip, port);
} else if (command == "exit") {
break; // 接收到退出命令,退出主循环
} else {
std::cout << "Unknown command" << std::endl;
}
}
// 设置退出标志
exit_flag = true;
// 强制结束HTTP服务器线程
if (http_server_thread.joinable()) {
http_server_thread.join();
}
// 关闭所有套接字
for (auto& sock_pair : sockets) {
shutdown(sock_pair.first, SHUT_RDWR);
close(sock_pair.first);
}
sockets.clear();
// 等待所有工作线程结束
for (auto& thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
threads.clear();
std::cout << "Exited program." << std::endl;
return 0;
}

@ -1,31 +0,0 @@
#include <iostream>
#include <mysql/mysql.h>
int main() {
MYSQL *conn;
conn = mysql_init(NULL);
if (conn == NULL) {
std::cerr << "MySQL init failed" << std::endl;
return 1;
}
const char *server = "localhost";
const char *user = "rtsw";
const char *password = "123456";
const char *database = "nginxdb";
unsigned int port = 3306; // 使用云数据库提供的端口
if (mysql_real_connect(conn, server, user, password, database, port, NULL, 0) == NULL) {
std::cerr << "Connection error: " << mysql_error(conn) << std::endl;
mysql_close(conn);
return 1;
}
std::cout << "Successfully connected to the database" << std::endl;
// ... 执行数据库操作 ...
mysql_close(conn);
return 0;
}

@ -1,86 +0,0 @@
#include <iostream>
#include <thread>
#include <cstring>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <arpa/inet.h>
#define PORT 8080
#define MAX_CLIENTS 5
#define BUFFER_SIZE 1024
// 声明 handle_client 函数
void handle_client(int client_socket);
int main() {
int server_fd, new_socket;
struct sockaddr_in server_addr, client_addr;
socklen_t client_len = sizeof(client_addr);
char buffer[BUFFER_SIZE];
int opt = 1;
// 创建套接字
server_fd = socket(AF_INET, SOCK_STREAM, 0);
if (server_fd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}
// 设置选项,允许重用地址
if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("setsockopt failed");
exit(EXIT_FAILURE);
}
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);
// 绑定
if (bind(server_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
// 监听
if (listen(server_fd, MAX_CLIENTS) < 0) {
perror("listen failed");
exit(EXIT_FAILURE);
}
std::cout << "Server listening on port " << PORT << std::endl;
while (true) {
client_len = sizeof(client_addr);
new_socket = accept(server_fd, (struct sockaddr *)&client_addr, &client_len);
if (new_socket < 0) {
perror("accept failed");
exit(EXIT_FAILURE);
}
std::cout << "New client connected from " << inet_ntoa(client_addr.sin_addr) << " port " << ntohs(client_addr.sin_port) << std::endl;
// 创建并启动线程处理客户端
std::thread(handle_client, new_socket).detach();
}
close(server_fd);
return 0;
}
// 定义 handle_client 函数
void handle_client(int client_socket) {
while (true) {
char buffer[BUFFER_SIZE];
memset(buffer, 0, BUFFER_SIZE); // 清空缓冲区
int nbytes = read(client_socket, buffer, BUFFER_SIZE);
if (nbytes <= 0) {
std::cout << "Client disconnected." << std::endl;
close(client_socket);
break;
}
std::cout << "Received message from client: " << buffer << std::endl;
// 这里可以添加更多的处理逻辑
}
}

@ -1,66 +0,0 @@
#include <iostream>
#include <mysql/mysql.h>
int main() {
MYSQL *conn;
conn = mysql_init(NULL);
if (conn == NULL) {
std::cerr << "MySQL init failed" << std::endl;
return 1;
}
const char *server = "localhost";
const char *user = "rtsw";
const char *password = "123456";
const char *database = "nginxdb";
unsigned int port = 3306;
if (mysql_real_connect(conn, server, user, password, database, port, NULL, 0) == NULL) {
std::cerr << "Connection error: " << mysql_error(conn) << std::endl;
mysql_close(conn);
return 1;
}
std::cout << "Successfully connected to the database" << std::endl;
// 插入数据
const char *insert_query = "INSERT INTO users (username, email) VALUES ('newuser', 'newuser@example.com')";
if (mysql_query(conn, insert_query)) {
std::cerr << "Insert error: " << mysql_error(conn) << std::endl;
} else {
std::cout << "Insert successful" << std::endl;
}
// 查询数据
const char *select_query = "SELECT * FROM users";
MYSQL_RES *result = mysql_store_result(conn);
if (result) {
MYSQL_ROW row;
while ((row = mysql_fetch_row(result))) {
std::cout << "id: " << row[0] << ", username: " << row[1] << ", email: " << row[2] << std::endl;
}
mysql_free_result(result);
} else {
std::cerr << "Select error: " << mysql_error(conn) << std::endl;
}
// 更新数据
const char *update_query = "UPDATE users SET email = 'newuser_updated@example.com' WHERE username = 'newuser'";
if (mysql_query(conn, update_query)) {
std::cerr << "Update error: " << mysql_error(conn) << std::endl;
} else {
std::cout << "Update successful" << std::endl;
}
// 删除数据
const char *delete_query = "DELETE FROM users WHERE username = 'newuser'";
if (mysql_query(conn, delete_query)) {
std::cerr << "Delete error: " << mysql_error(conn) << std::endl;
} else {
std::cout << "Delete successful" << std::endl;
}
mysql_close(conn);
return 0;
}

@ -0,0 +1,167 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: KernelToController.py
# @time: 2024/7/2 15:16
# @desc:
import requests
import socket
import json
import time
from datetime import datetime
class CustomEncoder(json.JSONEncoder):
def __init__(self, *args, **kwargs):
super(CustomEncoder, self).__init__(*args, **kwargs)
self.prefix = {'number': '1', 'source': 'backend', 'kind': 'server_health_list'}
def encode(self, obj):
if not isinstance(obj, dict):
raise TypeError("Object must be a dict.")
# 将prefix添加到原始字典的前面
encoded_dict = {**self.prefix, **obj}
return super(CustomEncoder, self).encode(encoded_dict)
def calResponseTime(time_str1, time_str2, time_format):
# 将字符串时间转换为datetime对象
time1 = datetime.strptime(time_str1, time_format)
time2 = datetime.strptime(time_str2, time_format)
time_difference = time2 - time1
seconds_difference = time_difference.total_seconds()
return seconds_difference
# 定义时间格式
time_format = "%a, %d %b %Y %H:%M:%S GMT"
# 目标URL
url = 'http://127.0.0.1/status?format=json'
# 发送数据的IP地址和端口
target_ip = '47.96.136.178'
target_port = 8080
# 定义一个数组来存储last_check_time的值
last_check_times = []
last_receive_times = []
last_response_times = []
tmp = ''
pos = 0
while True:
# 从URL获取数据
response = requests.get(url)
data = response.json()
tip = 0
pos = pos + 1
# 检查 'servers' 和 'server' 键是否存在
if 'servers' in data and 'server' in data['servers']:
# 遍历服务器数据
for server in data['servers']['server']:
# 将对应的值添加到数组中
if (pos == 1):
# 数组初添加
last_check_times.append(server['last_check_time'])
else:
# 修正处理json数据
server['last_check_time'] = tmp
# 存放到数组中
last_check_times[tip] = tmp
# 修正调整
if (server['last_receive_time'] == ''):
server['last_receive_time'] = 'Null'
if (pos == 1):
last_receive_times.append(server['last_receive_time'])
else:
last_receive_times[tip] = server['last_receive_time']
tip = tip + 1
print("last_check_times:", last_check_times)
print("last_receive_times:", last_receive_times)
##分析计算出服务器响应时间:
for i in range(len(last_check_times)):
# 初次添加
if (pos == 1):
if (last_receive_times[i] == 'Null'):
last_response_times.append('Null')
if (last_receive_times[i] != 'Null'):
responseTime = calResponseTime(last_check_times[i], last_receive_times[i], time_format)
last_response_times.append(responseTime)
else:
if (last_receive_times[i] == 'Null'):
last_response_times[i] = 'Null'
if (last_receive_times[i] != 'Null'):
responseTime = calResponseTime(last_check_times[i], last_receive_times[i], time_format)
last_response_times[i] = responseTime
# 加入到发送数据中:
for server, responseTime in zip(data['servers']['server'], last_response_times):
server['last_response_time'] = responseTime
for i in range(len(last_check_times)):
if (last_receive_times[i] != 'Null'):
tmp = last_receive_times[i]
last_check_times[i] = last_receive_times[i]
# 打印数组内容,查看存储的值
print("last_response_times:", last_response_times)
# 使用自定义的编码器将数据转换为JSON格式的字符串
json_data = json.dumps(data, cls=CustomEncoder)
print(json_data)
# 目标服务器的IP地址和端口
target_url = 'http://47.96.136.178:8080'
# 发送POST请求
# 指定发送的内容类型为JSON
headers = {'Content-Type': 'application/json'}
response = requests.post(target_url, data=json_data, headers=headers)
# 检查请求是否成功
if response.status_code == 200:
print("数据成功发送。")
else:
print("数据发送失败,状态码:", response.status_code)
print("数据已发送到IP{},端口:{}".format(target_ip, target_port))
# 等待2.5秒
time.sleep(2.5)
# 读取文件
def read_log(file_path):
with open(file_path, 'r') as file:
# 读取所有行
log_lines = [line.strip() for line in file]
return log_lines
# # 指定文件路径
# file_path = '/usr/local/nginx/logs/access.log'
#
# # 指定目标IP和端口
# targeturl = 'http://47.96.136.178:8080'
# 构造JSON数据并发送
# def send_json(url, data):
# headers = {'Content-Type': 'application/json'}
# # 将数据构造为JSON格式
# json_data = json.dumps({"logs": data})
# response = requests.post(url, data=json_data, headers=headers)
# return response
# try:
# # 读取日志文件并转换为JSON
# log_data = read_log(file_path)
#
# # 发送数据
# response = send_json(targeturl, log_data)
#
# # 打印响应状态码和内容
# print(f'Status Code: {response.status_code}')
# print(f'Response Content: {response.text}')
# except Exception as e:
# print(f'An error occurred: {e}')

@ -1,280 +0,0 @@
#include "connector_controller.h"
#define BUFFER_SIZE 1024
#define HTTP_SERVER_PORT 8080
std::vector<std::thread> threads; // 存储所有连接线程
std::map<int, int> sockets; // 存储套接字
std::mutex sockets_mutex; // 用于同步访问 sockets
std::atomic<bool> exit_flag(false); // 原子退出标志
// 维护连接的函数
void KeepConnection(int sock) {
char buffer[BUFFER_SIZE];
int nbytes;
while (!exit_flag) {
nbytes = recv(sock, buffer, BUFFER_SIZE, 0);
if (nbytes <= 0) {
std::cout << "Gateway disconnected" << std::endl;
break;
}
std::cout << "Received from gateway (Socket " << sock << "): " << buffer << std::endl;
}
{
std::lock_guard<std::mutex> lock(sockets_mutex);
sockets.erase(sock); // 从集合中移除套接字
}
close(sock);
}
// 连接到网关的函数
void ConnectToGateway(const std::string& ip, int port) {
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
perror("Could not create socket");
return;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
if (inet_pton(AF_INET, ip.c_str(), &server_addr.sin_addr) <= 0) {
perror("Invalid address");
close(sock);
return;
}
if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("Connection to the server failed");
close(sock);
return;
}
{
std::lock_guard<std::mutex> lock(sockets_mutex);
sockets[sock] = 1; // 将套接字添加到集合中
}
std::cout << "Connected to gateway at " << ip << ":" << port << " (Socket " << sock << ")" << std::endl;
// 在新线程中维护连接
threads.emplace_back(KeepConnection, sock);
}
// 处理HTTP请求的函数
void HandleHttpRequest(int client_socket) {
char buffer[BUFFER_SIZE];
int bytes_read = recv(client_socket, buffer, sizeof(buffer), 0);
if (bytes_read <= 0) {
close(client_socket);
return;
}
// 简单解析HTTP请求
std::string http_request(buffer, bytes_read);
std::string ip;
int port;
// 寻找IP和端口号
auto host_pos = http_request.find("Host: ");
if (host_pos != std::string::npos) {
auto start = host_pos + std::string("Host: ").size();
ip = http_request.substr(start, http_request.find("\r\n", start) - start);
}
auto content_length_pos = http_request.find("Content-Length:");
if (content_length_pos != std::string::npos) {
auto start = content_length_pos + std::string("Content-Length: ").size();
int length = std::stoi(http_request.substr(start, http_request.find("\r\n", start) - start));
if (length > 0) {
char* content = new char[length];
bytes_read = recv(client_socket, content, length, 0);
if (bytes_read > 0) {
// 解析JSON内容假设前端发送的是JSON格式
// 这里需要添加JSON解析逻辑来提取IP和端口
// 示例:{"ip":"192.168.1.1","port":8080}
// 可以使用第三方库如nlohmann/json来解析JSON
// 假设解析后得到ip和port
// ConnectToGateway(ip, port);
delete[] content;
}
}
}
// 发送HTTP响应
std::string response = "HTTP/1.1 200 OK\r\nContent-Type: text/plain\r\n\r\n";
send(client_socket, response.c_str(), response.size(), 0);
close(client_socket);
}
// 启动HTTP服务器的函数
void StartHttpServer() {
int http_server_socket = socket(AF_INET, SOCK_STREAM, 0);
if (http_server_socket < 0) {
perror("Could not create HTTP server socket");
return;
}
int opt = 1;
if (setsockopt(http_server_socket, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
perror("Setsockopt failed");
close(http_server_socket);
return;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(HTTP_SERVER_PORT);
if (bind(http_server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("Bind failed");
close(http_server_socket);
return;
}
if (listen(http_server_socket, 5) < 0) {
perror("Listen failed");
close(http_server_socket);
return;
}
std::cout << "HTTP server listening on port " << HTTP_SERVER_PORT << std::endl;
while (!exit_flag) {
int client_socket = accept(http_server_socket, NULL, NULL);
if (client_socket < 0) {
perror("Accept failed");
continue;
}
threads.emplace_back(HandleHttpRequest, client_socket);
}
close(http_server_socket);
}
/*
int main() {
// threads.emplace_back(StartHttpServer);
// std::string command, ip;
// int port;
// bool in_connect_mode = false;
// std::cout << "Enter command (connect/exit): ";
// while (true) {
// if (in_connect_mode)
// {
// std::cout << "continue connect or not ? (y or n) ";
// std::cin >> command;
// if(command=="y"){
// std::cout << "Enter the gateway IP address: ";
// std::cin >> ip;
// std::cout << "Enter the gateway port: ";
// std::cin >> port;
// ConnectToGateway(ip, port);
// }
// else {
// in_connect_mode = false; // 退出连接模式
// std::cout << "Exiting connect mode." << std::endl;
// }
// }
// else{
// std::cout << "Enter command (connect/exit): ";
// std::cin>>command;
// if (command == "connect") {
// if (in_connect_mode) {
// std::cout << "Already in connect mode." << std::endl;
// continue;
// }
// in_connect_mode = true; // 进入连接模式
// std::cout << "Enter the gateway IP address: ";
// std::cin >> ip;
// std::cout << "Enter the gateway port: ";
// std::cin >> port;
// ConnectToGateway(ip, port);
// } else if (command == "exit") {
// exit_flag = true; // 设置退出标志
// std::cout << "Exiting program." << std::endl;
// // 关闭所有套接字
// for (auto& sock_pair : sockets) {
// shutdown(sock_pair.first, SHUT_RDWR); // 关闭套接字的发送和接收
// close(sock_pair.first);
// }
// sockets.clear();
// std::cout << "sockets.clear" << std::endl;
// // 等待所有线程结束
// for (auto& thread : threads) {
// if (thread.joinable()) {
// thread.join();
// }
// }
// threads.clear();
// break; // 退出主循环
// } else {
// std::cout << "Unknown command" << std::endl;
// }
// }
// }
// std::cout<<"out"<<std::endl;
// return 0;
std::thread http_server_thread(StartHttpServer);
std::string command, ip;
int port;
bool in_connect_mode = false;
while (true) {
std::cout << "Enter command (connect/exit): ";
std::cin >> command;
if (command == "connect") {
if (in_connect_mode) {
std::cout << "Already in connect mode." << std::endl;
continue;
}
in_connect_mode = true;
std::cout << "Enter the gateway IP address: ";
std::cin >> ip;
std::cout << "Enter the gateway port: ";
std::cin >> port;
ConnectToGateway(ip, port);
} else if (command == "exit") {
break; // 接收到退出命令,退出主循环
} else {
std::cout << "Unknown command" << std::endl;
}
}
// 设置退出标志
exit_flag = true;
// 强制结束HTTP服务器线程
if (http_server_thread.joinable()) {
http_server_thread.join();
}
// 关闭所有套接字
for (auto& sock_pair : sockets) {
shutdown(sock_pair.first, SHUT_RDWR);
close(sock_pair.first);
}
sockets.clear();
// 等待所有工作线程结束
for (auto& thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
threads.clear();
std::cout << "Exited program." << std::endl;
return 0;
}
*/

@ -1,22 +0,0 @@
#ifndef CONNECTOR_CONTROLLER_H
#define CONNECTOR_CONTROLLER_H
#include <iostream>
#include <thread>
#include <vector>
#include <map>
#include <mutex>
#include <atomic>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <cstring>
// 函数声明
void StartHttpServer();
void ConnectToGateway(const std::string& ip, int port);
void KeepConnection(int sock);
void HandleHttpRequest(int client_socket);
#endif // CONNECTOR_CONTROLLER_H

@ -1,68 +0,0 @@
/*
*/
/*
#include <iostream>
#include <mysql/mysql.h>
int main() {
MYSQL *conn;
conn = mysql_init(NULL);
if (conn == NULL) {
std::cerr << "MySQL init failed" << std::endl;
return 1;
}
const char *server = "localhost";
const char *user = "rtsw";
const char *password = "123456";
const char *database = "nginxdb";
unsigned int port = 3306; // 使用云数据库提供的端口
if (mysql_real_connect(conn, server, user, password, database, port, NULL, 0) == NULL) {
std::cerr << "Connection error: " << mysql_error(conn) << std::endl;
mysql_close(conn);
return 1;
}
std::cout << "Successfully connected to the database" << std::endl;
// ... 执行数据库操作 ...
mysql_close(conn);
return 0;
}
*/
#include "connector_database.h"
// DatabaseOperation函数定义
void DatabaseOperation() {
MYSQL *conn = mysql_init(NULL);
if (conn == NULL) {
std::cerr << "MySQL init failed" << std::endl;
return;
}
const char *server = "localhost";
const char *user = "rtsw";
const char *password = "123456";
const char *database = "nginxdb";
unsigned int port = 3306;
if (mysql_real_connect(conn, server, user, password, database, port, NULL, 0) == NULL) {
std::cerr << "Connection error: " << mysql_error(conn) << std::endl;
mysql_close(conn);
return;
}
std::cout << "Successfully connected to the database" << std::endl;
// ... 执行数据库操作 ...
mysql_close(conn);
}

@ -1,10 +0,0 @@
#ifndef CONNECTOR_DATABASE_H
#define CONNECTOR_DATABASE_H
#include <iostream>
#include <mysql/mysql.h>
// 函数声明
void DatabaseOperation();
#endif // CONNECTOR_DATABASE_H

@ -1,35 +0,0 @@
#include <iostream>
#include <string>
#include "connector_controller.h"
#include "connector_database.h"
int main() {
std::string command;
bool running = true;
while (running) {
std::cout << "Enter command (start_http_server/connect_gateway/database_operation/exit): ";
std::cin >> command;
if (command == "start_http_server") {
StartHttpServer();
} else if (command == "connect_gateway") {
std::string ip;
int port;
std::cout << "Enter the gateway IP address: "<<std::endl;
std::cin>>ip; // 使用getline以获取包含空格的IP地址
std::cout << "Enter the gateway port: "<<std::endl;
std::cin >> port;
ConnectToGateway(ip, port);
} else if (command == "database_operation") {
DatabaseOperation();
} else if (command == "exit") {
running = false;
} else {
std::cout << "Unknown command" << std::endl;
}
}
std::cout << "Exiting program." << std::endl;
return 0;
}

@ -1,125 +0,0 @@
#include <iostream>
#include <thread>
#include <vector>
#include <map>
#include <mutex>
#include <atomic>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <arpa/inet.h>
#define BUFFER_SIZE 1024
std::vector<std::thread> threads; // 存储所有连接线程
std::map<int, int> sockets; // 存储套接字
std::mutex sockets_mutex; // 用于同步访问 sockets
std::atomic<bool> exit_flag(false); // 原子退出标志
// 维护连接的函数
void KeepConnection(int sock) {
char buffer[BUFFER_SIZE];
int nbytes;
while (!exit_flag) {
nbytes = recv(sock, buffer, BUFFER_SIZE, 0);
if (nbytes <= 0) {
std::cout << "Gateway disconnected" << std::endl;
break;
}
std::cout << "Received from gateway (Socket " << sock << "): " << buffer << std::endl;
}
{
std::lock_guard<std::mutex> lock(sockets_mutex);
sockets.erase(sock); // 从集合中移除套接字
}
close(sock);
}
// 连接到网关的函数
void ConnectToGateway(const std::string& ip, int port) {
int sock = socket(AF_INET, SOCK_STREAM, 0);
if (sock < 0) {
perror("Could not create socket");
return;
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
if (inet_pton(AF_INET, ip.c_str(), &server_addr.sin_addr) <= 0) {
perror("Invalid address");
close(sock);
return;
}
if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("Connection to the server failed");
close(sock);
return;
}
{
std::lock_guard<std::mutex> lock(sockets_mutex);
sockets[sock] = 1; // 将套接字添加到集合中
}
std::cout << "Connected to gateway at " << ip << ":" << port << " (Socket " << sock << ")" << std::endl;
// 在新线程中维护连接
threads.emplace_back(KeepConnection, sock);
}
int main() {
std::string command, ip;
int port;
bool in_connect_mode = false;
while (true) {
std::cout << "Enter command (connect/exitconnect/exit): ";
std::cin >> command;
if (command == "connect") {
if (in_connect_mode) {
std::cout << "Already in connect mode." << std::endl;
continue;
}
in_connect_mode = true; // 进入连接模式
std::cout << "Enter the gateway IP address: ";
std::cin >> ip;
std::cout << "Enter the gateway port: ";
std::cin >> port;
ConnectToGateway(ip, port);
} else if (command == "exitconnect") {
if (!in_connect_mode) {
std::cout << "Not in connect mode." << std::endl;
continue;
}
in_connect_mode = false; // 退出连接模式
std::cout << "Exiting connect mode." << std::endl;
} else if (command == "exit") {
exit_flag = true; // 设置退出标志
std::cout << "Exiting program." << std::endl;
// 关闭所有套接字
for (auto& sock_pair : sockets) {
shutdown(sock_pair.first, SHUT_RDWR); // 关闭套接字的发送和接收
close(sock_pair.first);
}
sockets.clear();
// 等待所有线程结束
for (auto& thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
threads.clear();
break; // 退出主循环
} else {
std::cout << "Unknown command" << std::endl;
}
}
return 0;
}

@ -1,74 +0,0 @@
#!/usr/bin/env python
# encoding: utf-8
# @author: 原凯峰
# @contact: 2894340009@qq.com
# @software: pycharm
# @file: MachineLearningDivider.py
# @time: 2024/6/26 8:21
# @desc:利用随机森林法进行模型训练,能够通过平均响应时间、故障率等数据计算出服务器的健康状态
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
import pickle
# 假设我们有以下数据集
X = [
[0.3, 0.005], # 服务器特征:平均响应时间和故障率
[2.5, 0.03],
[0.7, 0.045],
[1.2, 0.002]
]
y = ['良好', '', '', '良好'] # 对应的健康状态标签
# 将健康状态标签转换为数值
label_mapping = {'一般': 0, '良好': 1, '': 2, '极差': 3}
y_encoded = [label_mapping[label] for label in y]
# 划分训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(X, y_encoded, test_size=0.25, random_state=42)
# 选择模型,这里使用随机森林分类器
model = RandomForestClassifier(n_estimators=100, random_state=42)
# 训练模型
model.fit(X_train, y_train)
# 预测测试集
y_pred = model.predict(X_test)
# 评估模型
print(classification_report(y_test, y_pred))
print("Accuracy:", accuracy_score(y_test, y_pred))
with open('server_health_model.pkl', 'wb') as file:
pickle.dump(model, file)
# 保存模型
with open('server_health_model.pkl', 'wb') as file:
pickle.dump(model, file)
# 定义一个函数来加载模型并进行预测
def load_model_and_predict(new_data):
with open('server_health_model.pkl', 'rb') as file:
loaded_model = pickle.load(file)
predictions = loaded_model.predict(new_data)
return predictions
# 定义一个函数来将预测结果转换为健康等级
def predict_health_status(new_data):
predictions = load_model_and_predict(new_data)
# 创建逆向映射字典
inverse_label_mapping = {value: key for key, value in label_mapping.items()}
# 使用逆向映射字典转换预测结果
health_status = [inverse_label_mapping[pred] for pred in predictions]
return health_status
# 测试函数
def testcase():
new_data = [[0.4, 0.01]] # 新的服务器数据
health_status = predict_health_status(new_data)
print("预测的健康状态:", health_status)
testcase()

@ -1,2 +0,0 @@
dic = {'a':1,'b':2,'c':3}
for i in dic.keys():
Loading…
Cancel
Save