Compare commits

..

13 Commits

File diff suppressed because it is too large Load Diff

@ -0,0 +1,308 @@
/*
Copyright (c) 2021 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Liu Kai (liukaidx@sogou-inc.com)
*/
#include <string>
#include <vector>
#include <atomic>
#include "URIParser.h"
#include "StringUtil.h"
#include "WFDnsClient.h"
using namespace protocol;
using DnsCtx = std::function<void (WFDnsTask *)>;
using ComplexTask = WFComplexClientTask<DnsRequest, DnsResponse, DnsCtx>;
// 定义一个DnsParams类用于存储DNS查询参数
class DnsParams
{
public:
// 定义一个内部结构体dns_params用于存储具体的DNS参数
struct dns_params
{
std::vector<ParsedURI> uris; // 存储解析后的URI
std::vector<std::string> search_list; // 搜索列表
int ndots; // 域名中允许的最小点数
int attempts; // 最大尝试次数
bool rotate; // 是否轮询
};
public:
// DnsParams类的构造函数
DnsParams()
{
this->ref = new std::atomic<size_t>(1); // 初始化引用计数为1
this->params = new dns_params(); // 初始化参数结构体
}
// DnsParams类的拷贝构造函数
DnsParams(const DnsParams& p)
{
this->ref = p.ref; // 拷贝引用计数指针
this->params = p.params; // 拷贝参数结构体指针
this->incref(); // 增加引用计数
}
// DnsParams类的赋值运算符
DnsParams& operator=(const DnsParams& p)
{
if (this != &p) // 如果不是自赋值
{
this->decref(); // 减少当前对象的引用计数
this->ref = p.ref; // 拷贝引用计数指针
this->params = p.params; // 拷贝参数结构体指针
this->incref(); // 增加引用计数
}
return *this; // 返回当前对象的引用
}
// DnsParams类的析构函数
~DnsParams() { this->decref(); } // 减少引用计数
// 获取const类型的参数指针
const dns_params *get_params() const { return this->params; }
// 获取非const类型的参数指针
dns_params *get_params() { return this->params; }
private:
// 增加引用计数
void incref() { (*this->ref)++; }
// 减少引用计数并在计数为0时释放资源
void decref()
{
if (--*this->ref == 0)
{
delete this->params; // 删除参数结构体
delete this->ref; // 删除引用计数
}
}
private:
dns_params *params; // 指向参数结构体的指针
std::atomic<size_t> *ref; // 指向引用计数的指针
};
// 定义DNS状态枚举
enum
{
DNS_STATUS_TRY_ORIGIN_DONE = 0,
DNS_STATUS_TRY_ORIGIN_FIRST = 1,
DNS_STATUS_TRY_ORIGIN_LAST = 2
};
// 定义DnsStatus结构体用于存储DNS查询状态
struct DnsStatus
{
std::string origin_name; // 原始域名
std::string current_name; // 当前域名
size_t next_server; // 下一个要尝试的服务器
size_t last_server; // 上一个尝试的服务器
size_t next_domain; // 下一个要尝试的搜索域
int attempts_left; // 剩余尝试次数
int try_origin_state; // 尝试原始域名的状态
};
// 计算字符串中点的数量
static int __get_ndots(const std::string& s)
{
int ndots = 0;
for (size_t i = 0; i < s.size(); i++)
ndots += s[i] == '.'; // 统计点的数量
return ndots;
}
// 检查是否有下一个域名要尝试
static bool __has_next_name(const DnsParams::dns_params *p,
struct DnsStatus *s)
{
if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_FIRST)
{
s->current_name = s->origin_name; // 设置当前域名为原始域名
s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; // 更新状态
return true;
}
if (s->next_domain < p->search_list.size()) // 如果还有搜索域要尝试
{
s->current_name = s->origin_name; // 设置当前域名为原始域名
s->current_name.push_back('.'); // 添加点
s->current_name.append(p->search_list[s->next_domain]); // 添加搜索域
s->next_domain++; // 移动到下一个搜索域
return true;
}
if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_LAST)
{
s->current_name = s->origin_name; // 设置当前域名为原始域名
s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; // 更新状态
return true;
}
return false; // 没有下一个域名要尝试
}
// DNS查询回调内部函数
static void __callback_internal(WFDnsTask *task, const DnsParams& params,
struct DnsStatus& s)
{
ComplexTask *ctask = static_cast<ComplexTask *>(task); // 转换任务类型
int state = task->get_state(); // 获取任务状态
DnsRequest *req = task->get_req(); // 获取DNS请求
DnsResponse *resp = task->get_resp(); // 获取DNS响应
const auto *p = params.get_params(); // 获取DNS参数
int rcode = resp->get_rcode(); // 获取响应码
bool try_next_server = state != WFT_STATE_SUCCESS || // 如果状态不是成功
rcode == DNS_RCODE_SERVER_FAILURE || // 或者响应码是服务器失败
rcode == DNS_RCODE_NOT_IMPLEMENTED || // 或者响应码是未实现
rcode == DNS_RCODE_REFUSED; // 或者响应码是拒绝
bool try_next_name = rcode == DNS_RCODE_FORMAT_ERROR || // 如果响应码是格式错误
rcode == DNS_RCODE_NAME_ERROR || // 或者响应码是名字错误
resp->get_ancount() == 0; // 或者响应中没有答案
if (try_next_server)
{
if (s.last_server == s.next_server) // 如果已经是最后一个服务器
s.attempts_left--; // 减少尝试次数
if (s.attempts_left <= 0) // 如果尝试次数用完
return; // 返回
s.next_server = (s.next_server + 1) % p->uris.size(); // 计算下一个服务器
ctask->set_redirect(p->uris[s.next_server]); // 设置重定向
return; // 返回
}
if (try_next_name && __has_next_name(p, &s)) // 如果需要尝试下一个名字
{
req->set_question_name(s.current_name.c_str()); // 设置查询名字
ctask->set_redirect(p->uris[s.next_server]); // 设置重定向
return; // 返回
}
}
// WFDnsClient类的初始化函数带一个URL参数
int WFDnsClient::init(const std::string& url)
{
return this->init(url, "", 1, 2, false); // 调用另一个初始化函数
}
// WFDnsClient类的初始化函数用于配置DNS客户端
int WFDnsClient::init(const std::string& url, const std::string& search_list,
int ndots, int attempts, bool rotate)
{
std::vector<std::string> hosts; // 用于存储分割后的主机名
std::vector<ParsedURI> uris; // 用于存储解析后的URI
std::string host; // 单个主机名字符串
ParsedURI uri; // 用于存储解析后的单个URI
this->id = 0; // 初始化客户端ID为0
hosts = StringUtil::split_filter_empty(url, ','); // 根据逗号分割URL字符串获取主机名列表
// 遍历主机名列表,对每个主机名进行处理
for (size_t i = 0; i < hosts.size(); i++)
{
host = hosts[i]; // 获取当前主机名
// 检查主机名是否以"dns://"或"dnss://"开头,如果不是,则添加"dns://"前缀
if (strncasecmp(host.c_str(), "dns://", 6) != 0 &&
strncasecmp(host.c_str(), "dnss://", 7) != 0)
{
host = "dns://" + host;
}
// 使用URIParser解析当前主机名如果解析失败则返回错误码-1
if (URIParser::parse(host, uri) != 0)
return -1;
// 将解析后的URI添加到uris列表中
uris.emplace_back(std::move(uri));
}
// 如果uris列表为空或者ndots小于0或者attempts小于1则设置errno为EINVAL并返回错误码-1
if (uris.empty() || ndots < 0 || attempts < 1)
{
errno = EINVAL;
return -1;
}
// 创建一个新的DnsParams对象来存储DNS参数
this->params = new DnsParams;
DnsParams::dns_params *q = ((DnsParams *)this->params)->get_params(); // 获取DNS参数的指针
q->uris = std::move(uris); // 将解析后的URI列表移动到DNS参数中
q->search_list = StringUtil::split_filter_empty(search_list, ','); // 根据逗号分割search_list字符串获取搜索域列表
// 设置ndots的值如果大于15则设置为15
q->ndots = ndots > 15 ? 15 : ndots;
// 设置attempts的值如果大于5则设置为5
q->attempts = attempts > 5 ? 5 : attempts;
q->rotate = rotate; // 设置是否轮询
return 0; // 初始化成功返回0
}
// WFDnsClient类的析构函数用于释放资源
void WFDnsClient::deinit()
{
delete (DnsParams *)this->params; // 删除分配的DnsParams对象
this->params = NULL; // 将params指针设置为NULL
}
// WFDnsClient类的方法用于创建一个新的DNS任务
WFDnsTask *WFDnsClient::create_dns_task(const std::string& name,
dns_callback_t callback)
{
DnsParams::dns_params *p = ((DnsParams *)this->params)->get_params(); // 获取DNS参数
struct DnsStatus status; // 创建DNS状态结构体
size_t next_server; // 下一个要尝试的服务器索引
WFDnsTask *task; // 指向新创建的DNS任务的指针
DnsRequest *req; // 指向DNS请求的指针
// 如果启用轮询,则计算下一个服务器索引,否则使用第一个服务器
next_server = p->rotate ? this->id++ % p->uris.size() : 0;
status.origin_name = name; // 设置原始域名
status.next_domain = 0; // 设置下一个要尝试的搜索域索引
status.attempts_left = p->attempts; // 设置剩余尝试次数
status.try_origin_state = DNS_STATUS_TRY_ORIGIN_FIRST; // 设置尝试原始域名的状态
// 如果域名以点结尾,则跳过搜索域
if (!name.empty() && name.back() == '.')
status.next_domain = p->search_list.size();
// 如果域名中的点数少于ndots则设置尝试原始域名的状态为最后尝试
else if (__get_ndots(name) < p->ndots)
status.try_origin_state = DNS_STATUS_TRY_ORIGIN_LAST;
// 检查是否有下一个域名要尝试,并更新状态
__has_next_name(p, &status);
// 创建一个新的DNS任务使用下一个服务器的URI和提供的回调函数
task = WFTaskFactory::create_dns_task(p->uris[next_server], 0, std::move(callback));
status.next_server = next_server; // 设置当前服务器索引
status.last_server = (next_server + p->uris.size() - 1) % p->uris.size(); // 设置最后一个服务器索引
req = task->get_req(); // 获取DNS请求对象
req->set_question(status.current_name.c_str(), DNS_TYPE_A, DNS_CLASS_IN); // 设置DNS请求的问题部分
req->set_rd(1); // 设置DNS请求的递归标志
ComplexTask *ctask = static_cast<ComplexTask *>(task); // 将任务转换为ComplexTask类型
// 设置任务的上下文回调当DNS任务完成时将调用__callback_internal函数
*ctask->get_mutable_ctx() = std::bind(__callback_internal,
std::placeholders::_1,
*(DnsParams *)params, status);
return task; // 返回新创建的DNS任务
}

@ -0,0 +1 @@
[1217/001811.230:ERROR:registration_protocol_win.cc(108)] CreateFile: 系统找不到指定的文件。 (0x2)

File diff suppressed because it is too large Load Diff

Binary file not shown.

Binary file not shown.

@ -0,0 +1,18 @@
cmake_minimum_required(VERSION 3.6) # CMake 3.6
project(server) # "server"
#
set(SRC
WFServer.cc # WFServer.cc
)
# MySQL "n"
if (NOT MYSQL STREQUAL "n")
set(SRC
${SRC} #
WFMySQLServer.cc # WFMySQLServer.cc
)
endif ()
#
add_library(${PROJECT_NAME} OBJECT ${SRC}) # "server"

@ -0,0 +1,70 @@
/*
Copyright (c) 2021 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Liu Kai (liukaidx@sogou-inc.com)
*/
#ifndef _WFDNSSERVER_H_
#define _WFDNSSERVER_H_
#include "DnsMessage.h" // 引入 DNS 消息相关的头文件
#include "WFServer.h" // 引入服务器功能的头文件
#include "WFTaskFactory.h" // 引入任务工厂以创建 DNS 任务
// 定义一个 dns_process_t 类型,它是一个接受 WFDnsTask 指针并返回 void 的函数对象
using dns_process_t = std::function<void (WFDnsTask *)>;
// 定义 WFDnsServer 类型,基于 WFServer 模板类,使用 DnsRequest 和 DnsResponse 作为协议
using WFDnsServer = WFServer<protocol::DnsRequest, protocol::DnsResponse>;
// 设置 DNS 服务器的默认参数
static constexpr struct WFServerParams DNS_SERVER_PARAMS_DEFAULT =
{
.transport_type = TT_UDP, // 传输类型为 UDP
.max_connections = 2000, // 最大连接数
.peer_response_timeout = 10 * 1000, // 对端响应超时时间
.receive_timeout = -1, // 接收超时时间,-1 表示不超时
.keep_alive_timeout = 300 * 1000, // 保持连接的超时时间
.request_size_limit = (size_t)-1, // 请求大小限制,-1 表示无限制
.ssl_accept_timeout = 5000, // SSL 接受超时时间
};
// WFDnsServer 构造函数的特化模板
template<> inline
WFDnsServer::WFServer(dns_process_t proc) :
WFServerBase(&DNS_SERVER_PARAMS_DEFAULT), // 调用基类构造函数,传入默认参数
process(std::move(proc)) // 移动构造函数以提高性能
{
}
// 创建新的会话,重写基类方法
template<> inline
CommSession *WFDnsServer::new_session(long long seq, CommConnection *conn)
{
WFDnsTask *task;
// 使用任务工厂创建一个新的 DNS 任务
task = WFServerTaskFactory::create_dns_task(this, this->process);
// 设置保持连接的超时时间
task->set_keep_alive(this->params.keep_alive_timeout);
// 设置接收超时时间
task->set_receive_timeout(this->params.receive_timeout);
// 设置请求大小限制
task->get_req()->set_size_limit(this->params.request_size_limit);
return task; // 返回创建的任务
}
#endif // _WFDNSSERVER_H_

@ -0,0 +1,10 @@
-- 定义一个 xmake 目标,名为 server
target("server")
set_kind("object") -- 设置目标类型为对象文件
add_files("*.cc") -- 添加当前目录下所有的 .cc 文件到目标中
-- 检查是否未定义 MySQL 配置
if not has_config("mysql") then
remove_files("WFMySQLServer.cc") -- 如果未定义 MySQL 配置,则移除 MySQL 服务器相关的源文件
end

@ -28,76 +28,65 @@ using namespace protocol;
using DnsCtx = std::function<void (WFDnsTask *)>;
using ComplexTask = WFComplexClientTask<DnsRequest, DnsResponse, DnsCtx>;
// 定义一个DnsParams类用于存储DNS查询参数
class DnsParams
{
public:
// 定义一个内部结构体dns_params用于存储具体的DNS参数
struct dns_params
{
std::vector<ParsedURI> uris; // 存储解析后的URI
std::vector<std::string> search_list; // 搜索列表
int ndots; // 域名中允许的最小点数
int attempts; // 最大尝试次数
bool rotate; // 是否轮询
std::vector<ParsedURI> uris;
std::vector<std::string> search_list;
int ndots;
int attempts;
bool rotate;
};
public:
// DnsParams类的构造函数
DnsParams()
{
this->ref = new std::atomic<size_t>(1); // 初始化引用计数为1
this->params = new dns_params(); // 初始化参数结构体
this->ref = new std::atomic<size_t>(1);
this->params = new dns_params();
}
// DnsParams类的拷贝构造函数
DnsParams(const DnsParams& p)
{
this->ref = p.ref; // 拷贝引用计数指针
this->params = p.params; // 拷贝参数结构体指针
this->incref(); // 增加引用计数
this->ref = p.ref;
this->params = p.params;
this->incref();
}
// DnsParams类的赋值运算符
DnsParams& operator=(const DnsParams& p)
{
if (this != &p) // 如果不是自赋值
if (this != &p)
{
this->decref(); // 减少当前对象的引用计数
this->ref = p.ref; // 拷贝引用计数指针
this->params = p.params; // 拷贝参数结构体指针
this->incref(); // 增加引用计数
this->decref();
this->ref = p.ref;
this->params = p.params;
this->incref();
}
return *this; // 返回当前对象的引用
return *this;
}
// DnsParams类的析构函数
~DnsParams() { this->decref(); } // 减少引用计数
~DnsParams() { this->decref(); }
// 获取const类型的参数指针
const dns_params *get_params() const { return this->params; }
// 获取非const类型的参数指针
dns_params *get_params() { return this->params; }
private:
// 增加引用计数
void incref() { (*this->ref)++; }
// 减少引用计数并在计数为0时释放资源
void decref()
{
if (--*this->ref == 0)
{
delete this->params; // 删除参数结构体
delete this->ref; // 删除引用计数
delete this->params;
delete this->ref;
}
}
private:
dns_params *params; // 指向参数结构体的指针
std::atomic<size_t> *ref; // 指向引用计数的指针
dns_params *params;
std::atomic<size_t> *ref;
};
// 定义DNS状态枚举
enum
{
DNS_STATUS_TRY_ORIGIN_DONE = 0,
@ -105,204 +94,184 @@ enum
DNS_STATUS_TRY_ORIGIN_LAST = 2
};
// 定义DnsStatus结构体用于存储DNS查询状态
struct DnsStatus
{
std::string origin_name; // 原始域名
std::string current_name; // 当前域名
size_t next_server; // 下一个要尝试的服务器
size_t last_server; // 上一个尝试的服务器
size_t next_domain; // 下一个要尝试的搜索域
int attempts_left; // 剩余尝试次数
int try_origin_state; // 尝试原始域名的状态
std::string origin_name;
std::string current_name;
size_t next_server; // next server to try
size_t last_server; // last server to try
size_t next_domain; // next search domain to try
int attempts_left;
int try_origin_state;
};
// 计算字符串中点的数量
static int __get_ndots(const std::string& s)
{
int ndots = 0;
for (size_t i = 0; i < s.size(); i++)
ndots += s[i] == '.'; // 统计点的数量
ndots += s[i] == '.';
return ndots;
}
// 检查是否有下一个域名要尝试
static bool __has_next_name(const DnsParams::dns_params *p,
struct DnsStatus *s)
{
if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_FIRST)
{
s->current_name = s->origin_name; // 设置当前域名为原始域名
s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; // 更新状态
s->current_name = s->origin_name;
s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE;
return true;
}
if (s->next_domain < p->search_list.size()) // 如果还有搜索域要尝试
if (s->next_domain < p->search_list.size())
{
s->current_name = s->origin_name; // 设置当前域名为原始域名
s->current_name.push_back('.'); // 添加点
s->current_name.append(p->search_list[s->next_domain]); // 添加搜索域
s->current_name = s->origin_name;
s->current_name.push_back('.');
s->current_name.append(p->search_list[s->next_domain]);
s->next_domain++; // 移动到下一个搜索域
s->next_domain++;
return true;
}
if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_LAST)
{
s->current_name = s->origin_name; // 设置当前域名为原始域名
s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; // 更新状态
s->current_name = s->origin_name;
s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE;
return true;
}
return false; // 没有下一个域名要尝试
return false;
}
// DNS查询回调内部函数
static void __callback_internal(WFDnsTask *task, const DnsParams& params,
struct DnsStatus& s)
{
ComplexTask *ctask = static_cast<ComplexTask *>(task); // 转换任务类型
int state = task->get_state(); // 获取任务状态
DnsRequest *req = task->get_req(); // 获取DNS请求
DnsResponse *resp = task->get_resp(); // 获取DNS响应
const auto *p = params.get_params(); // 获取DNS参数
int rcode = resp->get_rcode(); // 获取响应码
bool try_next_server = state != WFT_STATE_SUCCESS || // 如果状态不是成功
rcode == DNS_RCODE_SERVER_FAILURE || // 或者响应码是服务器失败
rcode == DNS_RCODE_NOT_IMPLEMENTED || // 或者响应码是未实现
rcode == DNS_RCODE_REFUSED; // 或者响应码是拒绝
bool try_next_name = rcode == DNS_RCODE_FORMAT_ERROR || // 如果响应码是格式错误
rcode == DNS_RCODE_NAME_ERROR || // 或者响应码是名字错误
resp->get_ancount() == 0; // 或者响应中没有答案
ComplexTask *ctask = static_cast<ComplexTask *>(task);
int state = task->get_state();
DnsRequest *req = task->get_req();
DnsResponse *resp = task->get_resp();
const auto *p = params.get_params();
int rcode = resp->get_rcode();
bool try_next_server = state != WFT_STATE_SUCCESS ||
rcode == DNS_RCODE_SERVER_FAILURE ||
rcode == DNS_RCODE_NOT_IMPLEMENTED ||
rcode == DNS_RCODE_REFUSED;
bool try_next_name = rcode == DNS_RCODE_FORMAT_ERROR ||
rcode == DNS_RCODE_NAME_ERROR ||
resp->get_ancount() == 0;
if (try_next_server)
{
if (s.last_server == s.next_server) // 如果已经是最后一个服务器
s.attempts_left--; // 减少尝试次数
if (s.attempts_left <= 0) // 如果尝试次数用完
return; // 返回
s.next_server = (s.next_server + 1) % p->uris.size(); // 计算下一个服务器
ctask->set_redirect(p->uris[s.next_server]); // 设置重定向
return; // 返回
if (s.last_server == s.next_server)
s.attempts_left--;
if (s.attempts_left <= 0)
return;
s.next_server = (s.next_server + 1) % p->uris.size();
ctask->set_redirect(p->uris[s.next_server]);
return;
}
if (try_next_name && __has_next_name(p, &s)) // 如果需要尝试下一个名字
if (try_next_name && __has_next_name(p, &s))
{
req->set_question_name(s.current_name.c_str()); // 设置查询名字
ctask->set_redirect(p->uris[s.next_server]); // 设置重定向
return; // 返回
req->set_question_name(s.current_name.c_str());
ctask->set_redirect(p->uris[s.next_server]);
return;
}
}
// WFDnsClient类的初始化函数带一个URL参数
int WFDnsClient::init(const std::string& url)
{
return this->init(url, "", 1, 2, false); // 调用另一个初始化函数
return this->init(url, "", 1, 2, false);
}
// WFDnsClient类的初始化函数用于配置DNS客户端
int WFDnsClient::init(const std::string& url, const std::string& search_list,
int ndots, int attempts, bool rotate)
{
std::vector<std::string> hosts; // 用于存储分割后的主机名
std::vector<ParsedURI> uris; // 用于存储解析后的URI
std::string host; // 单个主机名字符串
ParsedURI uri; // 用于存储解析后的单个URI
std::vector<std::string> hosts;
std::vector<ParsedURI> uris;
std::string host;
ParsedURI uri;
this->id = 0; // 初始化客户端ID为0
hosts = StringUtil::split_filter_empty(url, ','); // 根据逗号分割URL字符串获取主机名列表
this->id = 0;
hosts = StringUtil::split_filter_empty(url, ',');
// 遍历主机名列表,对每个主机名进行处理
for (size_t i = 0; i < hosts.size(); i++)
{
host = hosts[i]; // 获取当前主机名
// 检查主机名是否以"dns://"或"dnss://"开头,如果不是,则添加"dns://"前缀
host = hosts[i];
if (strncasecmp(host.c_str(), "dns://", 6) != 0 &&
strncasecmp(host.c_str(), "dnss://", 7) != 0)
{
host = "dns://" + host;
}
// 使用URIParser解析当前主机名如果解析失败则返回错误码-1
if (URIParser::parse(host, uri) != 0)
return -1;
// 将解析后的URI添加到uris列表中
uris.emplace_back(std::move(uri));
}
// 如果uris列表为空或者ndots小于0或者attempts小于1则设置errno为EINVAL并返回错误码-1
if (uris.empty() || ndots < 0 || attempts < 1)
{
errno = EINVAL;
return -1;
}
// 创建一个新的DnsParams对象来存储DNS参数
this->params = new DnsParams;
DnsParams::dns_params *q = ((DnsParams *)this->params)->get_params(); // 获取DNS参数的指针
q->uris = std::move(uris); // 将解析后的URI列表移动到DNS参数中
q->search_list = StringUtil::split_filter_empty(search_list, ','); // 根据逗号分割search_list字符串获取搜索域列表
// 设置ndots的值如果大于15则设置为15
DnsParams::dns_params *q = ((DnsParams *)this->params)->get_params();
q->uris = std::move(uris);
q->search_list = StringUtil::split_filter_empty(search_list, ',');
q->ndots = ndots > 15 ? 15 : ndots;
// 设置attempts的值如果大于5则设置为5
q->attempts = attempts > 5 ? 5 : attempts;
q->rotate = rotate; // 设置是否轮询
q->rotate = rotate;
return 0; // 初始化成功返回0
return 0;
}
// WFDnsClient类的析构函数用于释放资源
void WFDnsClient::deinit()
{
delete (DnsParams *)this->params; // 删除分配的DnsParams对象
this->params = NULL; // 将params指针设置为NULL
delete (DnsParams *)this->params;
this->params = NULL;
}
// WFDnsClient类的方法用于创建一个新的DNS任务
WFDnsTask *WFDnsClient::create_dns_task(const std::string& name,
dns_callback_t callback)
{
DnsParams::dns_params *p = ((DnsParams *)this->params)->get_params(); // 获取DNS参数
struct DnsStatus status; // 创建DNS状态结构体
size_t next_server; // 下一个要尝试的服务器索引
WFDnsTask *task; // 指向新创建的DNS任务的指针
DnsRequest *req; // 指向DNS请求的指针
DnsParams::dns_params *p = ((DnsParams *)this->params)->get_params();
struct DnsStatus status;
size_t next_server;
WFDnsTask *task;
DnsRequest *req;
// 如果启用轮询,则计算下一个服务器索引,否则使用第一个服务器
next_server = p->rotate ? this->id++ % p->uris.size() : 0;
status.origin_name = name; // 设置原始域名
status.next_domain = 0; // 设置下一个要尝试的搜索域索引
status.attempts_left = p->attempts; // 设置剩余尝试次数
status.try_origin_state = DNS_STATUS_TRY_ORIGIN_FIRST; // 设置尝试原始域名的状态
status.origin_name = name;
status.next_domain = 0;
status.attempts_left = p->attempts;
status.try_origin_state = DNS_STATUS_TRY_ORIGIN_FIRST;
// 如果域名以点结尾,则跳过搜索域
if (!name.empty() && name.back() == '.')
status.next_domain = p->search_list.size();
// 如果域名中的点数少于ndots则设置尝试原始域名的状态为最后尝试
else if (__get_ndots(name) < p->ndots)
status.try_origin_state = DNS_STATUS_TRY_ORIGIN_LAST;
// 检查是否有下一个域名要尝试,并更新状态
__has_next_name(p, &status);
// 创建一个新的DNS任务使用下一个服务器的URI和提供的回调函数
task = WFTaskFactory::create_dns_task(p->uris[next_server], 0, std::move(callback));
status.next_server = next_server; // 设置当前服务器索引
status.last_server = (next_server + p->uris.size() - 1) % p->uris.size(); // 设置最后一个服务器索引
task = WFTaskFactory::create_dns_task(p->uris[next_server], 0,
std::move(callback));
status.next_server = next_server;
status.last_server = (next_server + p->uris.size() - 1) % p->uris.size();
req = task->get_req(); // 获取DNS请求对象
req->set_question(status.current_name.c_str(), DNS_TYPE_A, DNS_CLASS_IN); // 设置DNS请求的问题部分
req->set_rd(1); // 设置DNS请求的递归标志
req = task->get_req();
req->set_question(status.current_name.c_str(), DNS_TYPE_A, DNS_CLASS_IN);
req->set_rd(1);
ComplexTask *ctask = static_cast<ComplexTask *>(task); // 将任务转换为ComplexTask类型
// 设置任务的上下文回调当DNS任务完成时将调用__callback_internal函数
ComplexTask *ctask = static_cast<ComplexTask *>(task);
*ctask->get_mutable_ctx() = std::bind(__callback_internal,
std::placeholders::_1,
*(DnsParams *)params, status);
return task; // 返回新创建的DNS任务
return task;
}

File diff suppressed because it is too large Load Diff

@ -26,81 +26,64 @@
#include "URIParser.h"
#include "WFTaskFactory.h"
// 定义WFMySQLConnection类用于管理MySQL数据库连接
class WFMySQLConnection
{
public:
// 初始化函数接受一个包含连接信息的URL字符串
// 例如mysql://username:passwd@127.0.0.1/dbname?character_set=utf8
// 推荐使用IP地址而不是域名因为域名会被解析为第一个地址并且不建议使用上游名称作为主机名
/* example: mysql://username:passwd@127.0.0.1/dbname?character_set=utf8
* IP string is recommmended in url. When using a domain name, the first
* address resovled will be used. Don't use upstream name as a host. */
int init(const std::string& url)
{
// 调用另一个init函数传入URL和一个NULL的SSL上下文
return this->init(url, NULL);
}
// 重载的init函数接受URL和一个可选的SSL上下文
int init(const std::string& url, SSL_CTX *ssl_ctx);
// 反初始化函数,当前为空实现
void deinit() { }
public:
// 创建一个查询任务接受一个SQL查询字符串和一个回调函数
WFMySQLTask *create_query_task(const std::string& query,
mysql_callback_t callback)
{
// 使用WFTaskFactory创建一个新的MySQL任务传入连接URI、超时时间和回调函数
WFMySQLTask *task = WFTaskFactory::create_mysql_task(this->uri, 0,
std::move(callback));
// 设置任务的SSL上下文
this->set_ssl_ctx(task);
// 设置任务的查询字符串
task->get_req()->set_query(query);
// 返回创建的任务
return task;
}
// 创建一个断开连接的任务,接受一个回调函数
// 如果不手动断开连接TCP连接将在对象删除后保持活动并可能被具有相同ID和URL的另一个WFMySQLConnection对象重用
/* If you don't disconnect manually, the TCP connection will be
* kept alive after this object is deleted, and maybe reused by
* another WFMySQLConnection object with same id and url. */
WFMySQLTask *create_disconnect_task(mysql_callback_t callback)
{
// 创建一个空查询的任务,以此作为断开连接的任务
WFMySQLTask *task = this->create_query_task("", std::move(callback));
// 设置任务的SSL上下文
this->set_ssl_ctx(task);
// 设置任务不保持活动状态,暗示这是一个断开连接的任务
task->set_keep_alive(0);
// 返回创建的任务
return task;
}
protected:
// 设置任务的SSL上下文
void set_ssl_ctx(WFMySQLTask *task) const
{
// 使用别名简化类型名称
using MySQLRequest = protocol::MySQLRequest;
using MySQLResponse = protocol::MySQLResponse;
// 将任务强制转换为WFComplexClientTask类型
auto *t = (WFComplexClientTask<MySQLRequest, MySQLResponse> *)task;
// 设置任务的SSL上下文如果为NULL则使用默认
/* 'ssl_ctx' can be NULL and will use default. */
t->set_ssl_ctx(this->ssl_ctx);
}
protected:
// 解析后的URI用于存储连接信息
ParsedURI uri;
// SSL上下文用于加密通信
SSL_CTX *ssl_ctx;
// 连接ID确保并发连接具有不同的ID
int id;
public:
// 确保并发连接的ID不同当连接对象被删除时ID可以被重用
/* Make sure that concurrent connections have different id.
* When a connection object is deleted, id can be reused. */
WFMySQLConnection(int id) { this->id = id; }
// 析构函数,当前为空实现
virtual ~WFMySQLConnection() { }
};
#endif

@ -30,109 +30,145 @@
#include "WFTask.h"
#include "WFTaskFactory.h"
// 定义WFRedisSubscribeTask类它继承自WFGenericTask
class WFRedisSubscribeTask : public WFGenericTask
{
public:
// 获取与任务关联的Redis响应对象
// 注意:只能在'extract'函数中或在任务开始之前调用'get_resp()'来设置响应大小限制。
/* Note: Call 'get_resp()' only in the 'extract' function or
before the task is started to set response size limit. */
protocol::RedisResponse *get_resp()
{
return this->task->get_resp();
}
public:
// 用户需要恰好调用一次'release()'函数,可以在任何地方调用。
/* User needs to call 'release()' exactly once, anywhere. */
void release()
{
// 使用原子操作来检查并设置标志如果标志已经是true则不执行删除操作
// 这样可以防止多次释放同一个对象
if (this->flag.exchange(true))
delete this;
}
// ... (省略了其他Redis命令的函数如subscribe, unsubscribe等)
// 这些函数通过发送Redis命令来执行订阅、取消订阅等操作
public:
/* Note: After 'release()' is called, all the requesting functions
should not be called except in 'extract', because the task
point may have been deleted because 'callback' finished. */
int subscribe(const std::vector<std::string>& channels)
{
return this->sync_send("SUBSCRIBE", channels);
}
int unsubscribe(const std::vector<std::string>& channels)
{
return this->sync_send("UNSUBSCRIBE", channels);
}
int unsubscribe()
{
return this->sync_send("UNSUBSCRIBE", { });
}
int psubscribe(const std::vector<std::string>& patterns)
{
return this->sync_send("PSUBSCRIBE", patterns);
}
int punsubscribe(const std::vector<std::string>& patterns)
{
return this->sync_send("PUNSUBSCRIBE", patterns);
}
int punsubscribe()
{
return this->sync_send("PUNSUBSCRIBE", { });
}
int ping(const std::string& message)
{
return this->sync_send("PING", { message });
}
int ping()
{
return this->sync_send("PING", { });
}
int quit()
{
return this->sync_send("QUIT", { });
}
public:
// 设置与消息接收相关的超时时间
// 这些函数只能在任务开始之前或在'extract'中调用
/* All 'timeout' proxy functions can only be called only before
the task is started or in 'extract'. */
// 设置等待每个消息的超时时间。非常有用。如果不设置,则最大等待时间将是全局的'response_timeout'
/* Timeout of waiting for each message. Very useful. If not set,
the max waiting time will be the global 'response_timeout'*/
void set_watch_timeout(int timeout)
{
this->task->set_watch_timeout(timeout);
}
// 设置接收完整消息的超时时间。
/* Timeout of receiving a complete message. */
void set_recv_timeout(int timeout)
{
this->task->set_receive_timeout(timeout);
}
// 设置发送第一个订阅请求的超时时间。
/* Timeout of sending the first subscribe request. */
void set_send_timeout(int timeout)
{
this->task->set_send_timeout(timeout);
}
// 设置保持连接活跃的超时时间。默认值为0。如果你想保持连接活跃确保在所有频道/模式取消订阅后不再发送任何请求。
/* The default keep alive timeout is 0. If you want to keep
the connection alive, make sure not to send any request
after all channels/patterns were unsubscribed. */
void set_keep_alive(int timeout)
{
this->task->set_keep_alive(timeout);
}
public:
// 设置提取函数或回调函数,这些函数只能在任务开始之前或在'extract'中调用
/* Call 'set_extract' or 'set_callback' only before the task
is started, or in 'extract'. */
// 设置提取函数,当任务完成时,这个函数将被调用以处理结果
void set_extract(std::function<void (WFRedisSubscribeTask *)> ex)
{
this->extract = std::move(ex);
}
// 设置回调函数,当任务完成时(可能是在提取函数之后),这个函数将被调用
void set_callback(std::function<void (WFRedisSubscribeTask *)> cb)
{
this->callback = std::move(cb);
}
protected:
// 虚拟的dispatch函数用于将任务添加到执行序列中
virtual void dispatch()
{
series_of(this)->push_front(this->task);
this->subtask_done();
}
// 虚拟的done函数用于从执行序列中弹出任务并返回
virtual SubTask *done()
{
return series_of(this)->pop();
}
protected:
// 同步发送Redis命令的函数它封装了与Redis服务器的通信细节
int sync_send(const std::string& command,
const std::vector<std::string>& params);
// 静态的提取函数和回调函数它们被设计为与Redis任务交互的接口
static void task_extract(WFRedisTask *task);
static void task_callback(WFRedisTask *task);
protected:
// 指向底层Redis任务的指针
WFRedisTask *task;
// 用于同步访问的互斥锁
std::mutex mutex;
// 原子标志,用于跟踪对象是否已被释放
std::atomic<bool> flag;
// 提取函数和回调函数,当任务完成时它们将被调用
std::function<void (WFRedisSubscribeTask *)> extract;
std::function<void (WFRedisSubscribeTask *)> callback;
protected:
// 构造函数接受一个Redis任务、提取函数和回调函数
WFRedisSubscribeTask(WFRedisTask *task,
std::function<void (WFRedisSubscribeTask *)>&& ex,
std::function<void (WFRedisSubscribeTask *)>&& cb) :
@ -140,78 +176,65 @@ protected:
extract(std::move(ex)),
callback(std::move(cb))
{
// 将用户数据指针设置为当前对象,以便在回调中可以访问它
task->user_data = this;
this->task = task;
}
// 析构函数,释放与任务关联的资源
virtual ~WFRedisSubscribeTask()
{
if (this->task)
this->task->dismiss(); // 可能是取消任务或释放相关资源
this->task->dismiss();
}
// 允许WFRedisSubscriber类访问受保护的成员
friend class WFRedisSubscriber;
};
// 定义一个名为 WFRedisSubscriber 的类
class WFRedisSubscriber
{
public:
// 重载的 init 方法,只接受一个 URL 参数,内部调用另一个 init 方法并传入 NULL 作为 SSL 上下文
int init(const std::string& url)
{
return this->init(url, NULL); // 调用重载的 init 方法,传入 URL 和 NULL 作为 SSL 上下文
return this->init(url, NULL);
}
// init 方法,接受一个 URL 和一个可选的 SSL 上下文参数
int init(const std::string& url, SSL_CTX *ssl_ctx); // 此方法的具体实现在代码段之外
int init(const std::string& url, SSL_CTX *ssl_ctx);
// 释放资源的 deinit 方法,当前为空实现
void deinit() { }
public:
// 定义提取任务数据的函数类型
using extract_t = std::function<void (WFRedisSubscribeTask *)>;
// 定义任务回调的函数类型
using callback_t = std::function<void (WFRedisSubscribeTask *)>;
public:
// 创建一个订阅任务,接受频道列表、提取函数和回调函数
WFRedisSubscribeTask *create_subscribe_task(const std::vector<std::string>& channels,
extract_t extract, callback_t callback); // 方法的具体实现在代码段之外
WFRedisSubscribeTask *
create_subscribe_task(const std::vector<std::string>& channels,
extract_t extract, callback_t callback);
// 创建一个模式订阅任务,接受模式列表、提取函数和回调函数
WFRedisSubscribeTask *create_psubscribe_task(const std::vector<std::string>& patterns,
extract_t extract, callback_t callback); // 方法的具体实现在代码段之外
WFRedisSubscribeTask *
create_psubscribe_task(const std::vector<std::string>& patterns,
extract_t extract, callback_t callback);
protected:
// 为任务设置 SSL 上下文
void set_ssl_ctx(WFRedisTask *task) const
{
// 使用别名简化类型名称
using RedisRequest = protocol::RedisRequest;
using RedisResponse = protocol::RedisResponse;
// 将传入的 WFRedisTask 强制转换为 WFComplexClientTask<RedisRequest, RedisResponse> 类型
auto *t = (WFComplexClientTask<RedisRequest, RedisResponse> *)task;
// 为任务设置 SSL 上下文,如果 ssl_ctx 为 NULL则使用默认设置
/* 'ssl_ctx' can be NULL and will use default. */
t->set_ssl_ctx(this->ssl_ctx);
}
protected:
// 创建一个 Redis 任务,接受命令和参数列表
WFRedisTask *create_redis_task(const std::string& command,
const std::vector<std::string>& params); // 方法的具体实现在代码段之外
const std::vector<std::string>& params);
protected:
// 存储解析后的 URI用于连接 Redis 服务器
ParsedURI uri;
// 存储 SSL 上下文,用于加密通信(如果需要)
SSL_CTX *ssl_ctx;
public:
virtual ~WFRedisSubscriber() { }
};
#endif

Binary file not shown.

@ -0,0 +1,27 @@
cmake_minimum_required(VERSION 3.6)
project(kernel)
if (CMAKE_SYSTEM_NAME STREQUAL "Linux" OR CMAKE_SYSTEM_NAME STREQUAL "Android")
set(IOSERVICE_FILE IOService_linux.cc)
elseif (UNIX)
set(IOSERVICE_FILE IOService_thread.cc)
else ()
message(FATAL_ERROR "IOService unsupported.")
endif ()
set(SRC
${IOSERVICE_FILE}
mpoller.c
poller.c
rbtree.c
msgqueue.c
thrdpool.c
CommRequest.cc
CommScheduler.cc
Communicator.cc
Executor.cc
SubTask.cc
)
add_library(${PROJECT_NAME} OBJECT ${SRC})

@ -703,6 +703,7 @@ static int __dns_parser_parse_question(dns_parser_t *parser)
void dns_parser_init(dns_parser_t *parser)
{
// 初始化解析器结构体,包括分配内存、设置指针、初始化计数器和列表头
parser->msgbuf = NULL;
parser->msgbase = NULL;
parser->cur = NULL;
@ -724,6 +725,7 @@ int dns_parser_set_question(const char *name,
{
int ret;
// 设置DNS查询的问题部分包括域名、查询类型和查询类
ret = dns_parser_set_question_name(name, parser);
if (ret < 0)
return ret;

@ -0,0 +1,71 @@
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Xie Han (xiehan@sogou-inc.com)
*/
#ifndef _WFHTTPSERVER_H_
#define _WFHTTPSERVER_H_
#include <utility> // 引入标准库中的工具函数
#include "HttpMessage.h" // 引入 HTTP 消息相关的头文件
#include "WFServer.h" // 引入服务器功能的头文件
#include "WFTaskFactory.h" // 引入任务工厂以创建 HTTP 任务
// 定义一个 http_process_t 类型,它是一个接受 WFHttpTask 指针并返回 void 的函数对象
using http_process_t = std::function<void (WFHttpTask *)>;
// 定义 WFHttpServer 类型,基于 WFServer 模板类,使用 HttpRequest 和 HttpResponse 作为协议
using WFHttpServer = WFServer<protocol::HttpRequest, protocol::HttpResponse>;
// 设置 HTTP 服务器的默认参数
static constexpr struct WFServerParams HTTP_SERVER_PARAMS_DEFAULT =
{
.transport_type = TT_TCP, // 传输类型为 TCP
.max_connections = 2000, // 最大连接数
.peer_response_timeout = 10 * 1000, // 对端响应超时时间
.receive_timeout = -1, // 接收超时时间,-1 表示不超时
.keep_alive_timeout = 60 * 1000, // 保持连接的超时时间
.request_size_limit = (size_t)-1, // 请求大小限制,-1 表示无限制
.ssl_accept_timeout = 10 * 1000, // SSL 接受超时时间
};
// WFHttpServer 构造函数的特化模板
template<> inline
WFHttpServer::WFServer(http_process_t proc) :
WFServerBase(&HTTP_SERVER_PARAMS_DEFAULT), // 调用基类构造函数,传入默认参数
process(std::move(proc)) // 移动构造函数以提高性能
{
}
// 创建新的会话,重写基类的方法
template<> inline
CommSession *WFHttpServer::new_session(long long seq, CommConnection *conn)
{
WFHttpTask *task; // 声明一个 HTTP 任务指针
// 使用任务工厂创建一个新的 HTTP 任务
task = WFServerTaskFactory::create_http_task(this, this->process);
// 设置保持连接的超时时间
task->set_keep_alive(this->params.keep_alive_timeout);
// 设置接收超时时间
task->set_receive_timeout(this->params.receive_timeout);
// 设置请求大小限制
task->get_req()->set_size_limit(this->params.request_size_limit);
return task; // 返回创建的任务
}
#endif // _WFHTTPSERVER_H_
Loading…
Cancel
Save