master
yangkesi 6 months ago
parent 26ae2c6227
commit 2336e079e3

@ -1,15 +1,18 @@
cmake_minimum_required(VERSION 3.6) cmake_minimum_required(VERSION 3.6) # CMake 3.6
project(server) project(server) # "server"
#
set(SRC set(SRC
WFServer.cc WFServer.cc # WFServer.cc
) )
# MySQL "n"
if (NOT MYSQL STREQUAL "n") if (NOT MYSQL STREQUAL "n")
set(SRC set(SRC
${SRC} ${SRC} #
WFMySQLServer.cc WFMySQLServer.cc # WFMySQLServer.cc
) )
endif () endif ()
add_library(${PROJECT_NAME} OBJECT ${SRC}) #
add_library(${PROJECT_NAME} OBJECT ${SRC}) # "server"

@ -19,44 +19,52 @@
#ifndef _WFDNSSERVER_H_ #ifndef _WFDNSSERVER_H_
#define _WFDNSSERVER_H_ #define _WFDNSSERVER_H_
#include "DnsMessage.h" #include "DnsMessage.h" // 引入 DNS 消息相关的头文件
#include "WFServer.h" #include "WFServer.h" // 引入服务器功能的头文件
#include "WFTaskFactory.h" #include "WFTaskFactory.h" // 引入任务工厂以创建 DNS 任务
// 定义一个 dns_process_t 类型,它是一个接受 WFDnsTask 指针并返回 void 的函数对象
using dns_process_t = std::function<void (WFDnsTask *)>; using dns_process_t = std::function<void (WFDnsTask *)>;
using WFDnsServer = WFServer<protocol::DnsRequest,
protocol::DnsResponse>;
// 定义 WFDnsServer 类型,基于 WFServer 模板类,使用 DnsRequest 和 DnsResponse 作为协议
using WFDnsServer = WFServer<protocol::DnsRequest, protocol::DnsResponse>;
// 设置 DNS 服务器的默认参数
static constexpr struct WFServerParams DNS_SERVER_PARAMS_DEFAULT = static constexpr struct WFServerParams DNS_SERVER_PARAMS_DEFAULT =
{ {
.transport_type = TT_UDP, .transport_type = TT_UDP, // 传输类型为 UDP
.max_connections = 2000, .max_connections = 2000, // 最大连接数
.peer_response_timeout = 10 * 1000, .peer_response_timeout = 10 * 1000, // 对端响应超时时间
.receive_timeout = -1, .receive_timeout = -1, // 接收超时时间,-1 表示不超时
.keep_alive_timeout = 300 * 1000, .keep_alive_timeout = 300 * 1000, // 保持连接的超时时间
.request_size_limit = (size_t)-1, .request_size_limit = (size_t)-1, // 请求大小限制,-1 表示无限制
.ssl_accept_timeout = 5000, .ssl_accept_timeout = 5000, // SSL 接受超时时间
}; };
// WFDnsServer 构造函数的特化模板
template<> inline template<> inline
WFDnsServer::WFServer(dns_process_t proc) : WFDnsServer::WFServer(dns_process_t proc) :
WFServerBase(&DNS_SERVER_PARAMS_DEFAULT), WFServerBase(&DNS_SERVER_PARAMS_DEFAULT), // 调用基类构造函数,传入默认参数
process(std::move(proc)) process(std::move(proc)) // 移动构造函数以提高性能
{ {
} }
// 创建新的会话,重写基类方法
template<> inline template<> inline
CommSession *WFDnsServer::new_session(long long seq, CommConnection *conn) CommSession *WFDnsServer::new_session(long long seq, CommConnection *conn)
{ {
WFDnsTask *task; WFDnsTask *task;
// 使用任务工厂创建一个新的 DNS 任务
task = WFServerTaskFactory::create_dns_task(this, this->process); task = WFServerTaskFactory::create_dns_task(this, this->process);
// 设置保持连接的超时时间
task->set_keep_alive(this->params.keep_alive_timeout); task->set_keep_alive(this->params.keep_alive_timeout);
// 设置接收超时时间
task->set_receive_timeout(this->params.receive_timeout); task->set_receive_timeout(this->params.receive_timeout);
// 设置请求大小限制
task->get_req()->set_size_limit(this->params.request_size_limit); task->get_req()->set_size_limit(this->params.request_size_limit);
return task; return task; // 返回创建的任务
} }
#endif #endif // _WFDNSSERVER_H_

@ -19,45 +19,53 @@
#ifndef _WFHTTPSERVER_H_ #ifndef _WFHTTPSERVER_H_
#define _WFHTTPSERVER_H_ #define _WFHTTPSERVER_H_
#include <utility> #include <utility> // 引入标准库中的工具函数
#include "HttpMessage.h" #include "HttpMessage.h" // 引入 HTTP 消息相关的头文件
#include "WFServer.h" #include "WFServer.h" // 引入服务器功能的头文件
#include "WFTaskFactory.h" #include "WFTaskFactory.h" // 引入任务工厂以创建 HTTP 任务
// 定义一个 http_process_t 类型,它是一个接受 WFHttpTask 指针并返回 void 的函数对象
using http_process_t = std::function<void (WFHttpTask *)>; using http_process_t = std::function<void (WFHttpTask *)>;
using WFHttpServer = WFServer<protocol::HttpRequest,
protocol::HttpResponse>;
// 定义 WFHttpServer 类型,基于 WFServer 模板类,使用 HttpRequest 和 HttpResponse 作为协议
using WFHttpServer = WFServer<protocol::HttpRequest, protocol::HttpResponse>;
// 设置 HTTP 服务器的默认参数
static constexpr struct WFServerParams HTTP_SERVER_PARAMS_DEFAULT = static constexpr struct WFServerParams HTTP_SERVER_PARAMS_DEFAULT =
{ {
.transport_type = TT_TCP, .transport_type = TT_TCP, // 传输类型为 TCP
.max_connections = 2000, .max_connections = 2000, // 最大连接数
.peer_response_timeout = 10 * 1000, .peer_response_timeout = 10 * 1000, // 对端响应超时时间
.receive_timeout = -1, .receive_timeout = -1, // 接收超时时间,-1 表示不超时
.keep_alive_timeout = 60 * 1000, .keep_alive_timeout = 60 * 1000, // 保持连接的超时时间
.request_size_limit = (size_t)-1, .request_size_limit = (size_t)-1, // 请求大小限制,-1 表示无限制
.ssl_accept_timeout = 10 * 1000, .ssl_accept_timeout = 10 * 1000, // SSL 接受超时时间
}; };
// WFHttpServer 构造函数的特化模板
template<> inline template<> inline
WFHttpServer::WFServer(http_process_t proc) : WFHttpServer::WFServer(http_process_t proc) :
WFServerBase(&HTTP_SERVER_PARAMS_DEFAULT), WFServerBase(&HTTP_SERVER_PARAMS_DEFAULT), // 调用基类构造函数,传入默认参数
process(std::move(proc)) process(std::move(proc)) // 移动构造函数以提高性能
{ {
} }
// 创建新的会话,重写基类的方法
template<> inline template<> inline
CommSession *WFHttpServer::new_session(long long seq, CommConnection *conn) CommSession *WFHttpServer::new_session(long long seq, CommConnection *conn)
{ {
WFHttpTask *task; WFHttpTask *task; // 声明一个 HTTP 任务指针
// 使用任务工厂创建一个新的 HTTP 任务
task = WFServerTaskFactory::create_http_task(this, this->process); task = WFServerTaskFactory::create_http_task(this, this->process);
// 设置保持连接的超时时间
task->set_keep_alive(this->params.keep_alive_timeout); task->set_keep_alive(this->params.keep_alive_timeout);
// 设置接收超时时间
task->set_receive_timeout(this->params.receive_timeout); task->set_receive_timeout(this->params.receive_timeout);
// 设置请求大小限制
task->get_req()->set_size_limit(this->params.request_size_limit); task->get_req()->set_size_limit(this->params.request_size_limit);
return task; return task; // 返回创建的任务
} }
#endif #endif // _WFHTTPSERVER_H_

@ -16,45 +16,54 @@
Authors: Wu Jiaxu (wujiaxu@sogou-inc.com) Authors: Wu Jiaxu (wujiaxu@sogou-inc.com)
*/ */
#include <sys/uio.h> #include <sys/uio.h> // 引入系统I/O头文件用于向客户端发送数据
#include "WFMySQLServer.h" #include "WFMySQLServer.h" // 引入 MySQL 服务器的头文件
// 创建新的连接
WFConnection *WFMySQLServer::new_connection(int accept_fd) WFConnection *WFMySQLServer::new_connection(int accept_fd)
{ {
// 调用基类的 new_connection 方法创建连接
WFConnection *conn = this->WFServer::new_connection(accept_fd); WFConnection *conn = this->WFServer::new_connection(accept_fd);
if (conn) if (conn) // 如果连接成功
{ {
protocol::MySQLHandshakeResponse resp; protocol::MySQLHandshakeResponse resp; // 创建 MySQL 握手响应对象
struct iovec vec[8]; struct iovec vec[8]; // 定义 I/O 向量以便于发送数据
int count; int count; // 记录向量中有效数据的数量
resp.server_set(0x0a, "5.5", 1, (const uint8_t *)"12345678901234567890", // 设置服务器的握手响应参数
resp.server_set(0x0a, "5.5", 1, (const uint8_t *)"12345678901234567890",
0, 33, 0); 0, 33, 0);
// 编码握手响应信息到 I/O 向量中
count = resp.encode(vec, 8); count = resp.encode(vec, 8);
if (count >= 0) if (count >= 0) // 如果编码成功
{ {
// 使用 writev 函数发送握手响应数据到客户端
if (writev(accept_fd, vec, count) >= 0) if (writev(accept_fd, vec, count) >= 0)
return conn; return conn; // 返回创建的连接
} }
// 如果发送失败,删除该连接
this->delete_connection(conn); this->delete_connection(conn);
} }
return NULL; return NULL; // 如果连接失败,则返回 NULL
} }
// 创建新的会话,重写基类的方法
CommSession *WFMySQLServer::new_session(long long seq, CommConnection *conn) CommSession *WFMySQLServer::new_session(long long seq, CommConnection *conn)
{ {
static mysql_process_t empty = [](WFMySQLTask *){ }; static mysql_process_t empty = [](WFMySQLTask *){ }; // 定义一个空的 MySQL 处理函数
WFMySQLTask *task; WFMySQLTask *task; // 声明一个 MySQL 任务指针
task = WFServerTaskFactory::create_mysql_task(this, seq ? this->process : // 使用任务工厂创建一个新的 MySQL 任务
empty); task = WFServerTaskFactory::create_mysql_task(this, seq ? this->process : empty);
// 设置保持连接的超时时间
task->set_keep_alive(this->params.keep_alive_timeout); task->set_keep_alive(this->params.keep_alive_timeout);
// 设置接收超时时间
task->set_receive_timeout(this->params.receive_timeout); task->set_receive_timeout(this->params.receive_timeout);
// 设置请求大小限制
task->get_req()->set_size_limit(this->params.request_size_limit); task->get_req()->set_size_limit(this->params.request_size_limit);
return task; return task; // 返回创建的任务
} }

@ -19,39 +19,43 @@
#ifndef _WFMYSQLSERVER_H_ #ifndef _WFMYSQLSERVER_H_
#define _WFMYSQLSERVER_H_ #define _WFMYSQLSERVER_H_
#include <utility> #include <utility> // 引入标准库的实用工具
#include "MySQLMessage.h" #include "MySQLMessage.h" // 引入 MySQL 消息相关的头文件
#include "WFServer.h" #include "WFServer.h" // 引入服务器功能的头文件
#include "WFTaskFactory.h" #include "WFTaskFactory.h" // 引入任务工厂以创建 MySQL 任务
#include "WFConnection.h" #include "WFConnection.h" // 引入连接类的头文件
// 定义一个 mysql_process_t 类型,这是一个接受 WFMySQLTask 指针并返回 void 的函数对象类型
using mysql_process_t = std::function<void (WFMySQLTask *)>; using mysql_process_t = std::function<void (WFMySQLTask *)>;
class MySQLServer;
static constexpr struct WFServerParams MYSQL_SERVER_PARAMS_DEFAULT = // 定义 WFMySQLServer 类,继承自 WFServer 模板类
{ class WFMySQLServer : public WFServer<protocol::MySQLRequest, protocol::MySQLResponse>
.transport_type = TT_TCP,
.max_connections = 2000,
.peer_response_timeout = 10 * 1000,
.receive_timeout = -1,
.keep_alive_timeout = 28800 * 1000,
.request_size_limit = (size_t)-1,
.ssl_accept_timeout = 10 * 1000,
};
class WFMySQLServer : public WFServer<protocol::MySQLRequest,
protocol::MySQLResponse>
{ {
public: public:
WFMySQLServer(mysql_process_t proc): // 构造函数,接收一个 MySQL 处理函数
WFServer(&MYSQL_SERVER_PARAMS_DEFAULT, std::move(proc)) WFMySQLServer(mysql_process_t proc) :
{ WFServer(&MYSQL_SERVER_PARAMS_DEFAULT, std::move(proc)) // 调用基类构造函数,使用默认参数
} {
}
protected: protected:
virtual WFConnection *new_connection(int accept_fd); // 重写 new_connection 方法,创建新的连接
virtual CommSession *new_session(long long seq, CommConnection *conn); virtual WFConnection *new_connection(int accept_fd);
// 重写 new_session 方法,创建新的会话
virtual CommSession *new_session(long long seq, CommConnection *conn);
}; };
#endif // 设置 MySQL 服务器的默认参数
static constexpr struct WFServerParams MYSQL_SERVER_PARAMS_DEFAULT =
{
.transport_type = TT_TCP, // 设置传输类型为 TCP
.max_connections = 2000, // 设置最大连接数
.peer_response_timeout = 10 * 1000, // 设置对端响应超时时间
.receive_timeout = -1, // 设置接收超时时间,-1 表示不超时
.keep_alive_timeout = 28800 * 1000, // 设置保持连接的超时时间
.request_size_limit = (size_t)-1, // 设置请求大小限制,-1 表示无限制
.ssl_accept_timeout = 10 * 1000, // 设置 SSL 接受超时时间
};
#endif // _WFMYSQLSERVER_H_

@ -19,31 +19,35 @@
#ifndef _WFREDISSERVER_H_ #ifndef _WFREDISSERVER_H_
#define _WFREDISSERVER_H_ #define _WFREDISSERVER_H_
#include "RedisMessage.h" #include "RedisMessage.h" // 引入 Redis 消息相关的头文件
#include "WFServer.h" #include "WFServer.h" // 引入服务器功能的头文件
#include "WFTaskFactory.h" #include "WFTaskFactory.h" // 引入任务工厂以创建 Redis 任务
// 定义一个 redis_process_t 类型,这是一个接受 WFRedisTask 指针并返回 void 的函数对象类型
using redis_process_t = std::function<void (WFRedisTask *)>; using redis_process_t = std::function<void (WFRedisTask *)>;
using WFRedisServer = WFServer<protocol::RedisRequest,
protocol::RedisResponse>;
// 定义 WFRedisServer 类型,基于 WFServer 模板类,使用 RedisRequest 和 RedisResponse 作为协议
using WFRedisServer = WFServer<protocol::RedisRequest, protocol::RedisResponse>;
// 设置 Redis 服务器的默认参数
static constexpr struct WFServerParams REDIS_SERVER_PARAMS_DEFAULT = static constexpr struct WFServerParams REDIS_SERVER_PARAMS_DEFAULT =
{ {
.transport_type = TT_TCP, .transport_type = TT_TCP, // 传输类型设置为 TCP
.max_connections = 2000, .max_connections = 2000, // 设置最大连接数为 2000
.peer_response_timeout = 10 * 1000, .peer_response_timeout = 10 * 1000, // 对端响应超时时间为 10 秒
.receive_timeout = -1, .receive_timeout = -1, // 接收超时时间,-1 表示不超时
.keep_alive_timeout = 300 * 1000, .keep_alive_timeout = 300 * 1000, // 保持连接的超时时间为 300 秒
.request_size_limit = (size_t)-1, .request_size_limit = (size_t)-1, // 请求大小限制,-1 表示无限制
.ssl_accept_timeout = 5000, .ssl_accept_timeout = 5000, // SSL 接受超时时间为 5 秒
}; };
// WFRedisServer 构造函数的特化模板
template<> inline template<> inline
WFRedisServer::WFServer(redis_process_t proc) : WFRedisServer::WFServer(redis_process_t proc) :
WFServerBase(&REDIS_SERVER_PARAMS_DEFAULT), WFServerBase(&REDIS_SERVER_PARAMS_DEFAULT), // 调用基类构造函数,传入默认参数
process(std::move(proc)) process(std::move(proc)) // 采用移动语义以避免不必要的拷贝
{ {
} }
#endif // 生成新会话的方法,重写基类的方法
#endif // _WFREDISSERVER_H_

@ -17,272 +17,300 @@
Wu Jiaxu (wujiaxu@sogou-inc.com) Wu Jiaxu (wujiaxu@sogou-inc.com)
*/ */
#include <sys/types.h> #include <sys/types.h> // 引入数据类型定义
#include <sys/socket.h> #include <sys/socket.h> // 引入套接字相关的函数
#include <errno.h> #include <errno.h> // 引入错误码定义
#include <unistd.h> #include <unistd.h> // 引入 UNIX 标准函数
#include <stdio.h> #include <stdio.h> // 引入标准输入输出库
#include <atomic> #include <atomic> // 引入原子操作库
#include <mutex> #include <mutex> // 引入互斥锁库
#include <condition_variable> #include <condition_variable> // 引入条件变量库
#include <openssl/ssl.h> #include <openssl/ssl.h> // 引入 OpenSSL 库以支持 SSL
#include "CommScheduler.h" #include "CommScheduler.h" // 引入调度器相关的头文件
#include "EndpointParams.h" #include "EndpointParams.h" // 引入端点参数定义
#include "WFConnection.h" #include "WFConnection.h" // 引入连接类的定义
#include "WFGlobal.h" #include "WFGlobal.h" // 引入全局配置相关的定义
#include "WFServer.h" #include "WFServer.h" // 引入服务器功能的头文件
#define PORT_STR_MAX 5 #define PORT_STR_MAX 5 // 定义最大端口字符串长度
// 定义 WFServerConnection 类,继承自 WFConnection
class WFServerConnection : public WFConnection class WFServerConnection : public WFConnection
{ {
public: public:
// 构造函数,接受连接计数的原子指针
WFServerConnection(std::atomic<size_t> *conn_count) WFServerConnection(std::atomic<size_t> *conn_count)
{ {
this->conn_count = conn_count; this->conn_count = conn_count; // 初始化连接计数指针
} }
// 析构函数
virtual ~WFServerConnection() virtual ~WFServerConnection()
{ {
(*this->conn_count)--; (*this->conn_count)--; // 每当一个连接被销毁,减少连接计数
} }
private: private:
std::atomic<size_t> *conn_count; std::atomic<size_t> *conn_count; // 存储连接计数的原子变量指针
}; };
// SSL 上下文回调函数,用于处理 SSL 连接
int WFServerBase::ssl_ctx_callback(SSL *ssl, int *al, void *arg) int WFServerBase::ssl_ctx_callback(SSL *ssl, int *al, void *arg)
{ {
WFServerBase *server = (WFServerBase *)arg; WFServerBase *server = (WFServerBase *)arg; // 将参数转换为 WFServerBase 类型
const char *servername = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name); const char *servername = SSL_get_servername(ssl, TLSEXT_NAMETYPE_host_name); // 获取服务器名称
SSL_CTX *ssl_ctx = server->get_server_ssl_ctx(servername); SSL_CTX *ssl_ctx = server->get_server_ssl_ctx(servername); // 根据服务器名称获取 SSL 上下文
if (!ssl_ctx) if (!ssl_ctx) // 如果未找到 SSL 上下文
return SSL_TLSEXT_ERR_NOACK; return SSL_TLSEXT_ERR_NOACK; // 返回错误
// 如果找到的 SSL 上下文与当前不一致,则设置为找到的上下文
if (ssl_ctx != server->get_ssl_ctx()) if (ssl_ctx != server->get_ssl_ctx())
SSL_set_SSL_CTX(ssl, ssl_ctx); SSL_set_SSL_CTX(ssl, ssl_ctx);
return SSL_TLSEXT_ERR_OK; return SSL_TLSEXT_ERR_OK; // 返回成功
} }
// 创建新的 SSL 上下文
SSL_CTX *WFServerBase::new_ssl_ctx(const char *cert_file, const char *key_file) SSL_CTX *WFServerBase::new_ssl_ctx(const char *cert_file, const char *key_file)
{ {
SSL_CTX *ssl_ctx = WFGlobal::new_ssl_server_ctx(); SSL_CTX *ssl_ctx = WFGlobal::new_ssl_server_ctx(); // 创建新的 SSL 服务器上下文
if (!ssl_ctx) if (!ssl_ctx) // 如果创建失败
return NULL; return NULL; // 返回 NULL
// 加载证书链、私钥,并检查私钥有效性
if (SSL_CTX_use_certificate_chain_file(ssl_ctx, cert_file) > 0 && if (SSL_CTX_use_certificate_chain_file(ssl_ctx, cert_file) > 0 &&
SSL_CTX_use_PrivateKey_file(ssl_ctx, key_file, SSL_FILETYPE_PEM) > 0 && SSL_CTX_use_PrivateKey_file(ssl_ctx, key_file, SSL_FILETYPE_PEM) > 0 &&
SSL_CTX_check_private_key(ssl_ctx) > 0 && SSL_CTX_check_private_key(ssl_ctx) > 0 &&
SSL_CTX_set_tlsext_servername_callback(ssl_ctx, ssl_ctx_callback) > 0 && SSL_CTX_set_tlsext_servername_callback(ssl_ctx, ssl_ctx_callback) > 0 &&
SSL_CTX_set_tlsext_servername_arg(ssl_ctx, this) > 0) SSL_CTX_set_tlsext_servername_arg(ssl_ctx, this) > 0)
{ {
return ssl_ctx; return ssl_ctx; // 返回创建的 SSL 上下文
} }
SSL_CTX_free(ssl_ctx); SSL_CTX_free(ssl_ctx); // 释放 SSL 上下文
return NULL; return NULL; // 返回 NULL
} }
// 初始化服务器
int WFServerBase::init(const struct sockaddr *bind_addr, socklen_t addrlen, int WFServerBase::init(const struct sockaddr *bind_addr, socklen_t addrlen,
const char *cert_file, const char *key_file) const char *cert_file, const char *key_file)
{ {
int timeout = this->params.peer_response_timeout; int timeout = this->params.peer_response_timeout; // 将超时时间初始化为对端响应超时时间
if (this->params.receive_timeout >= 0) if (this->params.receive_timeout >= 0) // 如果接收超时时间有效
{ {
if ((unsigned int)timeout > (unsigned int)this->params.receive_timeout) if ((unsigned int)timeout > (unsigned int)this->params.receive_timeout)
timeout = this->params.receive_timeout; timeout = this->params.receive_timeout; // 将超时时间更新为接收超时时间
} }
// 检查是否需要 SSL/TLS 连接
if (this->params.transport_type == TT_TCP_SSL || if (this->params.transport_type == TT_TCP_SSL ||
this->params.transport_type == TT_SCTP_SSL) this->params.transport_type == TT_SCTP_SSL)
{ {
if (!cert_file || !key_file) if (!cert_file || !key_file) // 如果没有提供证书或密钥文件
{ {
errno = EINVAL; errno = EINVAL; // 设置错误号为无效参数
return -1; return -1; // 返回错误
} }
} }
// 初始化基本通信服务
if (this->CommService::init(bind_addr, addrlen, -1, timeout) < 0) if (this->CommService::init(bind_addr, addrlen, -1, timeout) < 0)
return -1; return -1; // 初始化失败,返回错误
// 如果提供了证书和密钥,设置 SSL 上下文
if (cert_file && key_file && this->params.transport_type != TT_UDP) if (cert_file && key_file && this->params.transport_type != TT_UDP)
{ {
SSL_CTX *ssl_ctx = this->new_ssl_ctx(cert_file, key_file); SSL_CTX *ssl_ctx = this->new_ssl_ctx(cert_file, key_file); // 创建新的 SSL 上下文
if (!ssl_ctx) if (!ssl_ctx) // 如果创建失败
{ {
this->deinit(); this->deinit(); // 反初始化
return -1; return -1; // 返回错误
} }
this->set_ssl(ssl_ctx, this->params.ssl_accept_timeout); this->set_ssl(ssl_ctx, this->params.ssl_accept_timeout); // 设置 SSL 上下文
} }
this->scheduler = WFGlobal::get_scheduler(); this->scheduler = WFGlobal::get_scheduler(); // 获取全局调度器
return 0; return 0; // 返回成功
} }
// 创建监听文件描述符
int WFServerBase::create_listen_fd() int WFServerBase::create_listen_fd()
{ {
if (this->listen_fd < 0) if (this->listen_fd < 0) // 如果尚未创建监听文件描述符
{ {
const struct sockaddr *bind_addr; const struct sockaddr *bind_addr; // 定义绑定地址
socklen_t addrlen; socklen_t addrlen; // 定义地址长度
int type, protocol; int type, protocol; // 定义套接字类型和协议
int reuse = 1; int reuse = 1; // 允许地址重用
// 根据传输类型设置套接字类型和协议
switch (this->params.transport_type) switch (this->params.transport_type)
{ {
case TT_TCP: case TT_TCP:
case TT_TCP_SSL: case TT_TCP_SSL:
type = SOCK_STREAM; type = SOCK_STREAM; // TCP 使用流式套接字
protocol = 0; protocol = 0; // 默认协议
break; break;
case TT_UDP: case TT_UDP:
type = SOCK_DGRAM; type = SOCK_DGRAM; // UDP 使用数据报套接字
protocol = 0; protocol = 0; // 默认协议
break; break;
#ifdef IPPROTO_SCTP #ifdef IPPROTO_SCTP
case TT_SCTP: case TT_SCTP:
case TT_SCTP_SSL: case TT_SCTP_SSL:
type = SOCK_STREAM; type = SOCK_STREAM; // SCTP 使用流式套接字
protocol = IPPROTO_SCTP; protocol = IPPROTO_SCTP; // SCTP 协议
break; break;
#endif #endif
default: default:
errno = EPROTONOSUPPORT; errno = EPROTONOSUPPORT; // 不支持的协议类型
return -1; return -1; // 返回错误
} }
this->get_addr(&bind_addr, &addrlen); this->get_addr(&bind_addr, &addrlen); // 获取绑定地址和长度
this->listen_fd = socket(bind_addr->sa_family, type, protocol); this->listen_fd = socket(bind_addr->sa_family, type, protocol); // 创建套接字
if (this->listen_fd >= 0) if (this->listen_fd >= 0) // 如果套接字创建成功
{ {
setsockopt(this->listen_fd, SOL_SOCKET, SO_REUSEADDR, setsockopt(this->listen_fd, SOL_SOCKET, SO_REUSEADDR, // 设置选项以重用地址
&reuse, sizeof (int)); &reuse, sizeof (int));
} }
} }
else else
this->listen_fd = dup(this->listen_fd); this->listen_fd = dup(this->listen_fd); // 复制已存在的监听文件描述符
return this->listen_fd; return this->listen_fd; // 返回监听文件描述符
} }
// 创建新的连接
WFConnection *WFServerBase::new_connection(int accept_fd) WFConnection *WFServerBase::new_connection(int accept_fd)
{ {
// 检查当前连接数是否在最大连接数限制内
if (++this->conn_count <= this->params.max_connections || if (++this->conn_count <= this->params.max_connections ||
this->drain(1) == 1) this->drain(1) == 1) // 如果连接数未超过最大值
{ {
int reuse = 1; int reuse = 1; // 允许地址重用
setsockopt(accept_fd, SOL_SOCKET, SO_REUSEADDR, setsockopt(accept_fd, SOL_SOCKET, SO_REUSEADDR, // 设置选项以重用地址
&reuse, sizeof (int)); &reuse, sizeof (int));
return new WFServerConnection(&this->conn_count); return new WFServerConnection(&this->conn_count); // 创建新的连接对象
} }
this->conn_count--; this->conn_count--; // 如果连接数超过最大值,减少连接计数
errno = EMFILE; errno = EMFILE; // 设置错误号为文件描述符过多
return NULL; return NULL; // 返回 NULL
} }
// 删除连接
void WFServerBase::delete_connection(WFConnection *conn) void WFServerBase::delete_connection(WFConnection *conn)
{ {
delete (WFServerConnection *)conn; delete (WFServerConnection *)conn; // 删除连接对象
} }
// 处理未绑定的情况
void WFServerBase::handle_unbound() void WFServerBase::handle_unbound()
{ {
this->mutex.lock(); this->mutex.lock(); // 加锁
this->unbind_finish = true; this->unbind_finish = true; // 设置解除绑定状态
this->cond.notify_one(); this->cond.notify_one(); // 通知等待的线程
this->mutex.unlock(); this->mutex.unlock(); // 解锁
} }
// 启动服务器
int WFServerBase::start(const struct sockaddr *bind_addr, socklen_t addrlen, int WFServerBase::start(const struct sockaddr *bind_addr, socklen_t addrlen,
const char *cert_file, const char *key_file) const char *cert_file, const char *key_file)
{ {
SSL_CTX *ssl_ctx; SSL_CTX *ssl_ctx; // 声明 SSL 上下文指针
// 初始化服务器
if (this->init(bind_addr, addrlen, cert_file, key_file) >= 0) if (this->init(bind_addr, addrlen, cert_file, key_file) >= 0)
{ {
// 如果调度器绑定成功,返回 0
if (this->scheduler->bind(this) >= 0) if (this->scheduler->bind(this) >= 0)
return 0; return 0;
ssl_ctx = this->get_ssl_ctx(); ssl_ctx = this->get_ssl_ctx(); // 获取当前 SSL 上下文
this->deinit(); this->deinit(); // 反初始化
if (ssl_ctx) if (ssl_ctx) // 如果 SSL 上下文存在
SSL_CTX_free(ssl_ctx); SSL_CTX_free(ssl_ctx); // 释放 SSL 上下文
} }
this->listen_fd = -1; this->listen_fd = -1; // 设置监听文件描述符为无效
return -1; return -1; // 返回错误
} }
// 启动服务器,使用主机和端口号
int WFServerBase::start(int family, const char *host, unsigned short port, int WFServerBase::start(int family, const char *host, unsigned short port,
const char *cert_file, const char *key_file) const char *cert_file, const char *key_file)
{ {
struct addrinfo hints = { struct addrinfo hints = { // 设置地址信息结构
.ai_flags = AI_PASSIVE, .ai_flags = AI_PASSIVE, // 使套接字可用于绑定
.ai_family = family, .ai_family = family, // 设置地址族
.ai_socktype = SOCK_STREAM, .ai_socktype = SOCK_STREAM, // 设置为流套接字
}; };
struct addrinfo *addrinfo; struct addrinfo *addrinfo; // 用于存储返回的地址信息
char port_str[PORT_STR_MAX + 1]; char port_str[PORT_STR_MAX + 1]; // 存储端口字符串
int ret; int ret; // 存储返回值
// 将端口号转换为字符串
snprintf(port_str, PORT_STR_MAX + 1, "%d", port); snprintf(port_str, PORT_STR_MAX + 1, "%d", port);
ret = getaddrinfo(host, port_str, &hints, &addrinfo); ret = getaddrinfo(host, port_str, &hints, &addrinfo); // 获取地址信息
if (ret == 0) if (ret == 0) // 如果成功获取地址信息
{ {
// 启动服务器
ret = start(addrinfo->ai_addr, (socklen_t)addrinfo->ai_addrlen, ret = start(addrinfo->ai_addr, (socklen_t)addrinfo->ai_addrlen,
cert_file, key_file); cert_file, key_file);
freeaddrinfo(addrinfo); freeaddrinfo(addrinfo); // 释放地址信息
} }
else else // 获取地址信息失败
{ {
if (ret != EAI_SYSTEM) if (ret != EAI_SYSTEM) // 如果不是系统错误
errno = EINVAL; errno = EINVAL; // 设置错误号为无效参数
ret = -1; ret = -1; // 返回错误
} }
return ret; return ret; // 返回结果
} }
// 启动服务器,使用已存在的监听文件描述符
int WFServerBase::serve(int listen_fd, int WFServerBase::serve(int listen_fd,
const char *cert_file, const char *key_file) const char *cert_file, const char *key_file)
{ {
struct sockaddr_storage ss; struct sockaddr_storage ss; // 存储 socket 地址
socklen_t len = sizeof ss; socklen_t len = sizeof ss; // 定义地址长度
// 获取当前 socket 名称
if (getsockname(listen_fd, (struct sockaddr *)&ss, &len) < 0) if (getsockname(listen_fd, (struct sockaddr *)&ss, &len) < 0)
return -1; return -1; // 返回错误
this->listen_fd = listen_fd; this->listen_fd = listen_fd; // 设置监听文件描述符
return start((struct sockaddr *)&ss, len, cert_file, key_file); return start((struct sockaddr *)&ss, len, cert_file, key_file); // 启动服务器
} }
// 关闭服务器
void WFServerBase::shutdown() void WFServerBase::shutdown()
{ {
this->listen_fd = -1; this->listen_fd = -1; // 设置监听文件描述符为无效
this->scheduler->unbind(this); this->scheduler->unbind(this); // 解除调度器的绑定
} }
// 等待服务器完成
void WFServerBase::wait_finish() void WFServerBase::wait_finish()
{ {
SSL_CTX *ssl_ctx = this->get_ssl_ctx(); SSL_CTX *ssl_ctx = this->get_ssl_ctx(); // 获取当前 SSL 上下文
std::unique_lock<std::mutex> lock(this->mutex); std::unique_lock<std::mutex> lock(this->mutex); // 使用唯一锁管理 mutex
// 等待解除绑定完成
while (!this->unbind_finish) while (!this->unbind_finish)
this->cond.wait(lock); this->cond.wait(lock); // 条件等待
this->deinit(); this->deinit(); // 反初始化
this->unbind_finish = false; this->unbind_finish = false; // 重置解除绑定状态
lock.unlock(); lock.unlock(); // 解锁
if (ssl_ctx)
SSL_CTX_free(ssl_ctx);
}
if (ssl_ctx) // 如果 SSL 上下文存在
SSL_CTX_free(ssl_ctx); // 释放 SSL 上下文
}

@ -17,113 +17,117 @@
Wu Jiaxu (wujiaxu@sogou-inc.com) Wu Jiaxu (wujiaxu@sogou-inc.com)
*/ */
#ifndef _WFSERVER_H_ #ifndef _WFSERVER_H_ // 防止头文件被多次包含
#define _WFSERVER_H_ #define _WFSERVER_H_
#include <sys/types.h> #include <sys/types.h> // 引入基本数据类型定义
#include <sys/socket.h> #include <sys/socket.h> // 引入 socket 相关的函数
#include <errno.h> #include <errno.h> // 引入错误码定义
#include <functional> #include <functional> // 引入函数对象
#include <atomic> #include <atomic> // 引入原子操作的支持
#include <mutex> #include <mutex> // 引入互斥量支持
#include <condition_variable> #include <condition_variable> // 引入条件变量支持
#include <openssl/ssl.h> #include <openssl/ssl.h> // 引入 OpenSSL 以支持 SSL/TLS
#include "EndpointParams.h" #include "EndpointParams.h" // 引入端点参数定义
#include "WFTaskFactory.h" #include "WFTaskFactory.h" // 引入任务工厂以创建任务
// 定义服务器参数结构体
struct WFServerParams struct WFServerParams
{ {
enum TransportType transport_type; enum TransportType transport_type; // 传输类型TCP, UDP, SSL等
size_t max_connections; size_t max_connections; // 最大并发连接数
int peer_response_timeout; /* timeout of each read or write operation */ int peer_response_timeout; // 每次读取或写入操作的超时
int receive_timeout; /* timeout of receiving the whole message */ int receive_timeout; // 接收整个消息的超时
int keep_alive_timeout; int keep_alive_timeout; // 连接保持存活的超时
size_t request_size_limit; size_t request_size_limit; // 请求大小限制
int ssl_accept_timeout; /* if not ssl, this will be ignored */ int ssl_accept_timeout; // SSL 接受超时时间
}; };
// 设置服务器参数的默认值
static constexpr struct WFServerParams SERVER_PARAMS_DEFAULT = static constexpr struct WFServerParams SERVER_PARAMS_DEFAULT =
{ {
.transport_type = TT_TCP, .transport_type = TT_TCP, // 默认使用 TCP
.max_connections = 2000, .max_connections = 2000, // 默认最多 2000 个连接
.peer_response_timeout = 10 * 1000, .peer_response_timeout = 10 * 1000, // 默认对端响应超时 10 秒
.receive_timeout = -1, .receive_timeout = -1, // 默认不超时
.keep_alive_timeout = 60 * 1000, .keep_alive_timeout = 60 * 1000, // 默认连接保持存活 60 秒
.request_size_limit = (size_t)-1, .request_size_limit = (size_t)-1, // 请求大小限制为无限制
.ssl_accept_timeout = 10 * 1000, .ssl_accept_timeout = 10 * 1000, // SSL 接受超时设置为 10 秒
}; };
// 定义 WFServerBase 基类,用于建立服务器
class WFServerBase : protected CommService class WFServerBase : protected CommService
{ {
public: public:
// 构造函数,接收服务器参数并初始化
WFServerBase(const struct WFServerParams *params) : WFServerBase(const struct WFServerParams *params) :
conn_count(0) conn_count(0) // 初始化连接计数为 0
{ {
this->params = *params; this->params = *params; // 保存服务器参数
this->unbind_finish = false; this->unbind_finish = false; // 初始化解除绑定状态
this->listen_fd = -1; this->listen_fd = -1; // 初始化监听文件描述符为无效值
} }
public: public:
/* To start a TCP server */ // 启动一个 TCP 服务器函数
/* Start on port with IPv4. */ /* 在指定端口上启动服务器IPv4 */
int start(unsigned short port) int start(unsigned short port)
{ {
return start(AF_INET, NULL, port, NULL, NULL); return start(AF_INET, NULL, port, NULL, NULL);
} }
/* Start with family. AF_INET or AF_INET6. */ /* 根据家庭类型启动服务器IPv4 或 IPv6 */
int start(int family, unsigned short port) int start(int family, unsigned short port)
{ {
return start(family, NULL, port, NULL, NULL); return start(family, NULL, port, NULL, NULL);
} }
/* Start with hostname and port. */ /* 使用主机名和端口启动服务器 */
int start(const char *host, unsigned short port) int start(const char *host, unsigned short port)
{ {
return start(AF_INET, host, port, NULL, NULL); return start(AF_INET, host, port, NULL, NULL);
} }
/* Start with family, hostname and port. */ /* 使用家庭类型、主机名和端口启动服务器 */
int start(int family, const char *host, unsigned short port) int start(int family, const char *host, unsigned short port)
{ {
return start(family, host, port, NULL, NULL); return start(family, host, port, NULL, NULL);
} }
/* Start with binding address. */ /* 使用指定的地址绑定启动服务器 */
int start(const struct sockaddr *bind_addr, socklen_t addrlen) int start(const struct sockaddr *bind_addr, socklen_t addrlen)
{ {
return start(bind_addr, addrlen, NULL, NULL); return start(bind_addr, addrlen, NULL, NULL);
} }
/* To start an SSL server. */ // 启动一个 SSL 服务器
/* 在指定端口启动 SSL 服务器 */
int start(unsigned short port, const char *cert_file, const char *key_file) int start(unsigned short port, const char *cert_file, const char *key_file)
{ {
return start(AF_INET, NULL, port, cert_file, key_file); return start(AF_INET, NULL, port, cert_file, key_file);
} }
/* 使用家庭类型和端口启动 SSL 服务器 */
int start(int family, unsigned short port, int start(int family, unsigned short port,
const char *cert_file, const char *key_file) const char *cert_file, const char *key_file)
{ {
return start(family, NULL, port, cert_file, key_file); return start(family, NULL, port, cert_file, key_file);
} }
/* 使用主机名和端口启动 SSL 服务器 */
int start(const char *host, unsigned short port, int start(const char *host, unsigned short port,
const char *cert_file, const char *key_file) const char *cert_file, const char *key_file)
{ {
return start(AF_INET, host, port, cert_file, key_file); return start(AF_INET, host, port, cert_file, key_file);
} }
/* 使用家庭类型、主机名和端口启动 SSL 服务器 */
int start(int family, const char *host, unsigned short port, int start(int family, const char *host, unsigned short port,
const char *cert_file, const char *key_file); const char *cert_file, const char *key_file);
/* This is the only necessary start function. */ /* 使用指定文件描述符启动服务器,用于优雅重启或 SCTP 服务器 */
int start(const struct sockaddr *bind_addr, socklen_t addrlen,
const char *cert_file, const char *key_file);
/* To start with a specified fd. For graceful restart or SCTP server. */
int serve(int listen_fd) int serve(int listen_fd)
{ {
return serve(listen_fd, NULL, NULL); return serve(listen_fd, NULL, NULL);
@ -131,114 +135,119 @@ public:
int serve(int listen_fd, const char *cert_file, const char *key_file); int serve(int listen_fd, const char *cert_file, const char *key_file);
/* stop() is a blocking operation. */ /* 停止服务器是一个阻塞操作 */
void stop() void stop()
{ {
this->shutdown(); this->shutdown(); // 关闭服务器
this->wait_finish(); this->wait_finish(); // 等待完成
} }
/* Nonblocking terminating the server. For stopping multiple servers. /* 非阻塞地终止服务器。用于停止多个服务器。
* Typically, call shutdown() and then wait_finish(). * shutdown() wait_finish()
* But indeed wait_finish() can be called before shutdown(), even before * 线 shutdown() start() wait_finish() */
* start() in another thread. */ void shutdown(); // 停止服务器
void shutdown(); void wait_finish(); // 等待所有连接和任务完成
void wait_finish();
public: public:
// 获取当前连接数
size_t get_conn_count() const { return this->conn_count; } size_t get_conn_count() const { return this->conn_count; }
/* Get the listening address. This is often used after starting /* 获取监听地址。这通常在服务器在随机端口上启动后使用start() 端口为 0。 */
* server on a random port (start() with port == 0). */
int get_listen_addr(struct sockaddr *addr, socklen_t *addrlen) const int get_listen_addr(struct sockaddr *addr, socklen_t *addrlen) const
{ {
if (this->listen_fd >= 0) if (this->listen_fd >= 0)
return getsockname(this->listen_fd, addr, addrlen); return getsockname(this->listen_fd, addr, addrlen); // 获取当前的 socket 地址
errno = ENOTCONN; errno = ENOTCONN; // 如果没有连接,设置错误号为没有连接
return -1; return -1; // 返回错误
} }
// 获取服务器参数
const struct WFServerParams *get_params() const { return &this->params; } const struct WFServerParams *get_params() const { return &this->params; }
protected: protected:
/* Override this function to create the initial SSL CTX of the server */ /* 重写此函数以创建服务器的初始 SSL CTX */
virtual SSL_CTX *new_ssl_ctx(const char *cert_file, const char *key_file); virtual SSL_CTX *new_ssl_ctx(const char *cert_file, const char *key_file);
/* Override this function to implement server that supports TLS SNI. /* 重写此函数以实现支持 TLS SNI 的服务器。
* "servername" will be NULL if client does not set a host name. * "servername" NULL
* Returning NULL to indicate that servername is not supported. */ * NULL */
virtual SSL_CTX *get_server_ssl_ctx(const char *servername) virtual SSL_CTX *get_server_ssl_ctx(const char *servername)
{ {
return this->get_ssl_ctx(); return this->get_ssl_ctx(); // 返回当前的 SSL 上下文
} }
/* This can be used by the implementation of 'new_ssl_ctx'. */ /* 这个函数可以在 'new_ssl_ctx' 的实现中使用。 */
static int ssl_ctx_callback(SSL *ssl, int *al, void *arg); static int ssl_ctx_callback(SSL *ssl, int *al, void *arg);
protected: protected:
WFServerParams params; WFServerParams params; // 服务器参数结构体
protected: protected:
// 创建新连接的方法
virtual int create_listen_fd(); virtual int create_listen_fd();
virtual WFConnection *new_connection(int accept_fd); virtual WFConnection *new_connection(int accept_fd); // 创建新连接
void delete_connection(WFConnection *conn); void delete_connection(WFConnection *conn); // 删除连接
private: private:
// 初始化服务器的方法
int init(const struct sockaddr *bind_addr, socklen_t addrlen, int init(const struct sockaddr *bind_addr, socklen_t addrlen,
const char *cert_file, const char *key_file); const char *cert_file, const char *key_file);
virtual void handle_unbound(); virtual void handle_unbound(); // 处理解除绑定
protected: protected:
std::atomic<size_t> conn_count; std::atomic<size_t> conn_count; // 当前连接计数
private: private:
int listen_fd; int listen_fd; // 监听文件描述符
bool unbind_finish; bool unbind_finish; // 标记解除绑定是否完成
std::mutex mutex; std::mutex mutex; // 互斥锁,用于线程安全
std::condition_variable cond; std::condition_variable cond; // 条件变量,用于线程间通知
class CommScheduler *scheduler; class CommScheduler *scheduler; // 调度器
}; };
// 模板类,用于定义特定协议的服务器
template<class REQ, class RESP> template<class REQ, class RESP>
class WFServer : public WFServerBase class WFServer : public WFServerBase
{ {
public: public:
// 构造函数,接收参数和处理函数
WFServer(const struct WFServerParams *params, WFServer(const struct WFServerParams *params,
std::function<void (WFNetworkTask<REQ, RESP> *)> proc) : std::function<void (WFNetworkTask<REQ, RESP> *)> proc) :
WFServerBase(params), WFServerBase(params), // 调用基类构造函数
process(std::move(proc)) process(std::move(proc)) // 移动处理函数,避免不必要的拷贝
{ {
} }
// 构造函数,使用默认参数
WFServer(std::function<void (WFNetworkTask<REQ, RESP> *)> proc) : WFServer(std::function<void (WFNetworkTask<REQ, RESP> *)> proc) :
WFServerBase(&SERVER_PARAMS_DEFAULT), WFServerBase(&SERVER_PARAMS_DEFAULT), // 调用基类构造函数,传入默认参数
process(std::move(proc)) process(std::move(proc)) // 移动处理函数
{ {
} }
protected: protected:
virtual CommSession *new_session(long long seq, CommConnection *conn); virtual CommSession *new_session(long long seq, CommConnection *conn); // 创建新会话
protected: protected:
std::function<void (WFNetworkTask<REQ, RESP> *)> process; std::function<void (WFNetworkTask<REQ, RESP> *)> process; // 处理函数
}; };
// 创建新会话的方法,实现具体的会话处理
template<class REQ, class RESP> template<class REQ, class RESP>
CommSession *WFServer<REQ, RESP>::new_session(long long seq, CommConnection *conn) CommSession *WFServer<REQ, RESP>::new_session(long long seq, CommConnection *conn)
{ {
using factory = WFNetworkTaskFactory<REQ, RESP>; using factory = WFNetworkTaskFactory<REQ, RESP>; // 使用工厂来创建任务
WFNetworkTask<REQ, RESP> *task; WFNetworkTask<REQ, RESP> *task; // 声明任务指针
task = factory::create_server_task(this, this->process); task = factory::create_server_task(this, this->process); // 使用工厂创建任务
task->set_keep_alive(this->params.keep_alive_timeout); task->set_keep_alive(this->params.keep_alive_timeout); // 设置保持连接的超时
task->set_receive_timeout(this->params.receive_timeout); task->set_receive_timeout(this->params.receive_timeout); // 设置接收超时
task->get_req()->set_size_limit(this->params.request_size_limit); task->get_req()->set_size_limit(this->params.request_size_limit); // 设置请求大小限制
return task; return task; // 返回创建的会话任务
} }
#endif #endif // _WFSERVER_H_

@ -1,6 +1,10 @@
-- 定义一个 xmake 目标,名为 server
target("server") target("server")
set_kind("object") set_kind("object") -- 设置目标类型为对象文件
add_files("*.cc")
add_files("*.cc") -- 添加当前目录下所有的 .cc 文件到目标中
-- 检查是否未定义 MySQL 配置
if not has_config("mysql") then if not has_config("mysql") then
remove_files("WFMySQLServer.cc") remove_files("WFMySQLServer.cc") -- 如果未定义 MySQL 配置,则移除 MySQL 服务器相关的源文件
end end

Loading…
Cancel
Save