comments adding

master
xiangwangbuwang 7 months ago
parent b1bb4c6fc7
commit 26ae2c6227

@ -28,250 +28,281 @@ using namespace protocol;
using DnsCtx = std::function<void (WFDnsTask *)>; using DnsCtx = std::function<void (WFDnsTask *)>;
using ComplexTask = WFComplexClientTask<DnsRequest, DnsResponse, DnsCtx>; using ComplexTask = WFComplexClientTask<DnsRequest, DnsResponse, DnsCtx>;
// 定义一个DnsParams类用于存储DNS查询参数
class DnsParams class DnsParams
{ {
public: public:
struct dns_params // 定义一个内部结构体dns_params用于存储具体的DNS参数
{ struct dns_params
std::vector<ParsedURI> uris; {
std::vector<std::string> search_list; std::vector<ParsedURI> uris; // 存储解析后的URI
int ndots; std::vector<std::string> search_list; // 搜索列表
int attempts; int ndots; // 域名中允许的最小点数
bool rotate; int attempts; // 最大尝试次数
}; bool rotate; // 是否轮询
};
public: public:
DnsParams() // DnsParams类的构造函数
{ DnsParams()
this->ref = new std::atomic<size_t>(1); {
this->params = new dns_params(); this->ref = new std::atomic<size_t>(1); // 初始化引用计数为1
} this->params = new dns_params(); // 初始化参数结构体
}
DnsParams(const DnsParams& p)
{ // DnsParams类的拷贝构造函数
this->ref = p.ref; DnsParams(const DnsParams& p)
this->params = p.params; {
this->incref(); this->ref = p.ref; // 拷贝引用计数指针
} this->params = p.params; // 拷贝参数结构体指针
this->incref(); // 增加引用计数
DnsParams& operator=(const DnsParams& p) }
{
if (this != &p) // DnsParams类的赋值运算符
{ DnsParams& operator=(const DnsParams& p)
this->decref(); {
this->ref = p.ref; if (this != &p) // 如果不是自赋值
this->params = p.params; {
this->incref(); this->decref(); // 减少当前对象的引用计数
} this->ref = p.ref; // 拷贝引用计数指针
return *this; this->params = p.params; // 拷贝参数结构体指针
} this->incref(); // 增加引用计数
}
~DnsParams() { this->decref(); } return *this; // 返回当前对象的引用
}
const dns_params *get_params() const { return this->params; }
dns_params *get_params() { return this->params; } // DnsParams类的析构函数
~DnsParams() { this->decref(); } // 减少引用计数
// 获取const类型的参数指针
const dns_params *get_params() const { return this->params; }
// 获取非const类型的参数指针
dns_params *get_params() { return this->params; }
private: private:
void incref() { (*this->ref)++; } // 增加引用计数
void decref() void incref() { (*this->ref)++; }
{ // 减少引用计数并在计数为0时释放资源
if (--*this->ref == 0) void decref()
{ {
delete this->params; if (--*this->ref == 0)
delete this->ref; {
} delete this->params; // 删除参数结构体
} delete this->ref; // 删除引用计数
}
}
private: private:
dns_params *params; dns_params *params; // 指向参数结构体的指针
std::atomic<size_t> *ref; std::atomic<size_t> *ref; // 指向引用计数的指针
}; };
// 定义DNS状态枚举
enum enum
{ {
DNS_STATUS_TRY_ORIGIN_DONE = 0, DNS_STATUS_TRY_ORIGIN_DONE = 0,
DNS_STATUS_TRY_ORIGIN_FIRST = 1, DNS_STATUS_TRY_ORIGIN_FIRST = 1,
DNS_STATUS_TRY_ORIGIN_LAST = 2 DNS_STATUS_TRY_ORIGIN_LAST = 2
}; };
// 定义DnsStatus结构体用于存储DNS查询状态
struct DnsStatus struct DnsStatus
{ {
std::string origin_name; std::string origin_name; // 原始域名
std::string current_name; std::string current_name; // 当前域名
size_t next_server; // next server to try size_t next_server; // 下一个要尝试的服务器
size_t last_server; // last server to try size_t last_server; // 上一个尝试的服务器
size_t next_domain; // next search domain to try size_t next_domain; // 下一个要尝试的搜索域
int attempts_left; int attempts_left; // 剩余尝试次数
int try_origin_state; int try_origin_state; // 尝试原始域名的状态
}; };
// 计算字符串中点的数量
static int __get_ndots(const std::string& s) static int __get_ndots(const std::string& s)
{ {
int ndots = 0; int ndots = 0;
for (size_t i = 0; i < s.size(); i++) for (size_t i = 0; i < s.size(); i++)
ndots += s[i] == '.'; ndots += s[i] == '.'; // 统计点的数量
return ndots; return ndots;
} }
// 检查是否有下一个域名要尝试
static bool __has_next_name(const DnsParams::dns_params *p, static bool __has_next_name(const DnsParams::dns_params *p,
struct DnsStatus *s) struct DnsStatus *s)
{ {
if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_FIRST) if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_FIRST)
{ {
s->current_name = s->origin_name; s->current_name = s->origin_name; // 设置当前域名为原始域名
s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; // 更新状态
return true; return true;
} }
if (s->next_domain < p->search_list.size()) if (s->next_domain < p->search_list.size()) // 如果还有搜索域要尝试
{ {
s->current_name = s->origin_name; s->current_name = s->origin_name; // 设置当前域名为原始域名
s->current_name.push_back('.'); s->current_name.push_back('.'); // 添加点
s->current_name.append(p->search_list[s->next_domain]); s->current_name.append(p->search_list[s->next_domain]); // 添加搜索域
s->next_domain++; s->next_domain++; // 移动到下一个搜索域
return true; return true;
} }
if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_LAST) if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_LAST)
{ {
s->current_name = s->origin_name; s->current_name = s->origin_name; // 设置当前域名为原始域名
s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; // 更新状态
return true; return true;
} }
return false; return false; // 没有下一个域名要尝试
} }
// DNS查询回调内部函数
static void __callback_internal(WFDnsTask *task, const DnsParams& params, static void __callback_internal(WFDnsTask *task, const DnsParams& params,
struct DnsStatus& s) struct DnsStatus& s)
{ {
ComplexTask *ctask = static_cast<ComplexTask *>(task); ComplexTask *ctask = static_cast<ComplexTask *>(task); // 转换任务类型
int state = task->get_state(); int state = task->get_state(); // 获取任务状态
DnsRequest *req = task->get_req(); DnsRequest *req = task->get_req(); // 获取DNS请求
DnsResponse *resp = task->get_resp(); DnsResponse *resp = task->get_resp(); // 获取DNS响应
const auto *p = params.get_params(); const auto *p = params.get_params(); // 获取DNS参数
int rcode = resp->get_rcode(); int rcode = resp->get_rcode(); // 获取响应码
bool try_next_server = state != WFT_STATE_SUCCESS || bool try_next_server = state != WFT_STATE_SUCCESS || // 如果状态不是成功
rcode == DNS_RCODE_SERVER_FAILURE || rcode == DNS_RCODE_SERVER_FAILURE || // 或者响应码是服务器失败
rcode == DNS_RCODE_NOT_IMPLEMENTED || rcode == DNS_RCODE_NOT_IMPLEMENTED || // 或者响应码是未实现
rcode == DNS_RCODE_REFUSED; rcode == DNS_RCODE_REFUSED; // 或者响应码是拒绝
bool try_next_name = rcode == DNS_RCODE_FORMAT_ERROR || bool try_next_name = rcode == DNS_RCODE_FORMAT_ERROR || // 如果响应码是格式错误
rcode == DNS_RCODE_NAME_ERROR || rcode == DNS_RCODE_NAME_ERROR || // 或者响应码是名字错误
resp->get_ancount() == 0; resp->get_ancount() == 0; // 或者响应中没有答案
if (try_next_server) if (try_next_server)
{ {
if (s.last_server == s.next_server) if (s.last_server == s.next_server) // 如果已经是最后一个服务器
s.attempts_left--; s.attempts_left--; // 减少尝试次数
if (s.attempts_left <= 0) if (s.attempts_left <= 0) // 如果尝试次数用完
return; return; // 返回
s.next_server = (s.next_server + 1) % p->uris.size(); s.next_server = (s.next_server + 1) % p->uris.size(); // 计算下一个服务器
ctask->set_redirect(p->uris[s.next_server]); ctask->set_redirect(p->uris[s.next_server]); // 设置重定向
return; return; // 返回
} }
if (try_next_name && __has_next_name(p, &s)) if (try_next_name && __has_next_name(p, &s)) // 如果需要尝试下一个名字
{ {
req->set_question_name(s.current_name.c_str()); req->set_question_name(s.current_name.c_str()); // 设置查询名字
ctask->set_redirect(p->uris[s.next_server]); ctask->set_redirect(p->uris[s.next_server]); // 设置重定向
return; return; // 返回
} }
} }
// WFDnsClient类的初始化函数带一个URL参数
int WFDnsClient::init(const std::string& url) int WFDnsClient::init(const std::string& url)
{ {
return this->init(url, "", 1, 2, false); return this->init(url, "", 1, 2, false); // 调用另一个初始化函数
} }
// WFDnsClient类的初始化函数用于配置DNS客户端
int WFDnsClient::init(const std::string& url, const std::string& search_list, int WFDnsClient::init(const std::string& url, const std::string& search_list,
int ndots, int attempts, bool rotate) int ndots, int attempts, bool rotate)
{ {
std::vector<std::string> hosts; std::vector<std::string> hosts; // 用于存储分割后的主机名
std::vector<ParsedURI> uris; std::vector<ParsedURI> uris; // 用于存储解析后的URI
std::string host; std::string host; // 单个主机名字符串
ParsedURI uri; ParsedURI uri; // 用于存储解析后的单个URI
this->id = 0; this->id = 0; // 初始化客户端ID为0
hosts = StringUtil::split_filter_empty(url, ','); hosts = StringUtil::split_filter_empty(url, ','); // 根据逗号分割URL字符串获取主机名列表
for (size_t i = 0; i < hosts.size(); i++) // 遍历主机名列表,对每个主机名进行处理
{ for (size_t i = 0; i < hosts.size(); i++)
host = hosts[i]; {
if (strncasecmp(host.c_str(), "dns://", 6) != 0 && host = hosts[i]; // 获取当前主机名
strncasecmp(host.c_str(), "dnss://", 7) != 0) // 检查主机名是否以"dns://"或"dnss://"开头,如果不是,则添加"dns://"前缀
{ if (strncasecmp(host.c_str(), "dns://", 6) != 0 &&
host = "dns://" + host; strncasecmp(host.c_str(), "dnss://", 7) != 0)
} {
host = "dns://" + host;
if (URIParser::parse(host, uri) != 0) }
return -1;
// 使用URIParser解析当前主机名如果解析失败则返回错误码-1
uris.emplace_back(std::move(uri)); if (URIParser::parse(host, uri) != 0)
} return -1;
if (uris.empty() || ndots < 0 || attempts < 1) // 将解析后的URI添加到uris列表中
{ uris.emplace_back(std::move(uri));
errno = EINVAL; }
return -1;
} // 如果uris列表为空或者ndots小于0或者attempts小于1则设置errno为EINVAL并返回错误码-1
if (uris.empty() || ndots < 0 || attempts < 1)
this->params = new DnsParams; {
DnsParams::dns_params *q = ((DnsParams *)this->params)->get_params(); errno = EINVAL;
q->uris = std::move(uris); return -1;
q->search_list = StringUtil::split_filter_empty(search_list, ','); }
q->ndots = ndots > 15 ? 15 : ndots;
q->attempts = attempts > 5 ? 5 : attempts; // 创建一个新的DnsParams对象来存储DNS参数
q->rotate = rotate; this->params = new DnsParams;
DnsParams::dns_params *q = ((DnsParams *)this->params)->get_params(); // 获取DNS参数的指针
return 0; q->uris = std::move(uris); // 将解析后的URI列表移动到DNS参数中
q->search_list = StringUtil::split_filter_empty(search_list, ','); // 根据逗号分割search_list字符串获取搜索域列表
// 设置ndots的值如果大于15则设置为15
q->ndots = ndots > 15 ? 15 : ndots;
// 设置attempts的值如果大于5则设置为5
q->attempts = attempts > 5 ? 5 : attempts;
q->rotate = rotate; // 设置是否轮询
return 0; // 初始化成功返回0
} }
// WFDnsClient类的析构函数用于释放资源
void WFDnsClient::deinit() void WFDnsClient::deinit()
{ {
delete (DnsParams *)this->params; delete (DnsParams *)this->params; // 删除分配的DnsParams对象
this->params = NULL; this->params = NULL; // 将params指针设置为NULL
} }
// WFDnsClient类的方法用于创建一个新的DNS任务
WFDnsTask *WFDnsClient::create_dns_task(const std::string& name, WFDnsTask *WFDnsClient::create_dns_task(const std::string& name,
dns_callback_t callback) dns_callback_t callback)
{ {
DnsParams::dns_params *p = ((DnsParams *)this->params)->get_params(); DnsParams::dns_params *p = ((DnsParams *)this->params)->get_params(); // 获取DNS参数
struct DnsStatus status; struct DnsStatus status; // 创建DNS状态结构体
size_t next_server; size_t next_server; // 下一个要尝试的服务器索引
WFDnsTask *task; WFDnsTask *task; // 指向新创建的DNS任务的指针
DnsRequest *req; DnsRequest *req; // 指向DNS请求的指针
next_server = p->rotate ? this->id++ % p->uris.size() : 0; // 如果启用轮询,则计算下一个服务器索引,否则使用第一个服务器
next_server = p->rotate ? this->id++ % p->uris.size() : 0;
status.origin_name = name;
status.next_domain = 0; status.origin_name = name; // 设置原始域名
status.attempts_left = p->attempts; status.next_domain = 0; // 设置下一个要尝试的搜索域索引
status.try_origin_state = DNS_STATUS_TRY_ORIGIN_FIRST; status.attempts_left = p->attempts; // 设置剩余尝试次数
status.try_origin_state = DNS_STATUS_TRY_ORIGIN_FIRST; // 设置尝试原始域名的状态
if (!name.empty() && name.back() == '.')
status.next_domain = p->search_list.size(); // 如果域名以点结尾,则跳过搜索域
else if (__get_ndots(name) < p->ndots) if (!name.empty() && name.back() == '.')
status.try_origin_state = DNS_STATUS_TRY_ORIGIN_LAST; status.next_domain = p->search_list.size();
// 如果域名中的点数少于ndots则设置尝试原始域名的状态为最后尝试
__has_next_name(p, &status); else if (__get_ndots(name) < p->ndots)
status.try_origin_state = DNS_STATUS_TRY_ORIGIN_LAST;
task = WFTaskFactory::create_dns_task(p->uris[next_server], 0,
std::move(callback)); // 检查是否有下一个域名要尝试,并更新状态
status.next_server = next_server; __has_next_name(p, &status);
status.last_server = (next_server + p->uris.size() - 1) % p->uris.size();
// 创建一个新的DNS任务使用下一个服务器的URI和提供的回调函数
req = task->get_req(); task = WFTaskFactory::create_dns_task(p->uris[next_server], 0, std::move(callback));
req->set_question(status.current_name.c_str(), DNS_TYPE_A, DNS_CLASS_IN); status.next_server = next_server; // 设置当前服务器索引
req->set_rd(1); status.last_server = (next_server + p->uris.size() - 1) % p->uris.size(); // 设置最后一个服务器索引
ComplexTask *ctask = static_cast<ComplexTask *>(task); req = task->get_req(); // 获取DNS请求对象
*ctask->get_mutable_ctx() = std::bind(__callback_internal, req->set_question(status.current_name.c_str(), DNS_TYPE_A, DNS_CLASS_IN); // 设置DNS请求的问题部分
std::placeholders::_1, req->set_rd(1); // 设置DNS请求的递归标志
*(DnsParams *)params, status);
ComplexTask *ctask = static_cast<ComplexTask *>(task); // 将任务转换为ComplexTask类型
return task; // 设置任务的上下文回调当DNS任务完成时将调用__callback_internal函数
*ctask->get_mutable_ctx() = std::bind(__callback_internal,
std::placeholders::_1,
*(DnsParams *)params, status);
return task; // 返回新创建的DNS任务
} }

File diff suppressed because it is too large Load Diff

@ -26,64 +26,81 @@
#include "URIParser.h" #include "URIParser.h"
#include "WFTaskFactory.h" #include "WFTaskFactory.h"
// 定义WFMySQLConnection类用于管理MySQL数据库连接
class WFMySQLConnection class WFMySQLConnection
{ {
public: public:
/* example: mysql://username:passwd@127.0.0.1/dbname?character_set=utf8 // 初始化函数接受一个包含连接信息的URL字符串
* IP string is recommmended in url. When using a domain name, the first // 例如mysql://username:passwd@127.0.0.1/dbname?character_set=utf8
* address resovled will be used. Don't use upstream name as a host. */ // 推荐使用IP地址而不是域名因为域名会被解析为第一个地址并且不建议使用上游名称作为主机名
int init(const std::string& url) int init(const std::string& url)
{ {
// 调用另一个init函数传入URL和一个NULL的SSL上下文
return this->init(url, NULL); return this->init(url, NULL);
} }
// 重载的init函数接受URL和一个可选的SSL上下文
int init(const std::string& url, SSL_CTX *ssl_ctx); int init(const std::string& url, SSL_CTX *ssl_ctx);
// 反初始化函数,当前为空实现
void deinit() { } void deinit() { }
public: public:
// 创建一个查询任务接受一个SQL查询字符串和一个回调函数
WFMySQLTask *create_query_task(const std::string& query, WFMySQLTask *create_query_task(const std::string& query,
mysql_callback_t callback) mysql_callback_t callback)
{ {
// 使用WFTaskFactory创建一个新的MySQL任务传入连接URI、超时时间和回调函数
WFMySQLTask *task = WFTaskFactory::create_mysql_task(this->uri, 0, WFMySQLTask *task = WFTaskFactory::create_mysql_task(this->uri, 0,
std::move(callback)); std::move(callback));
// 设置任务的SSL上下文
this->set_ssl_ctx(task); this->set_ssl_ctx(task);
// 设置任务的查询字符串
task->get_req()->set_query(query); task->get_req()->set_query(query);
// 返回创建的任务
return task; return task;
} }
/* If you don't disconnect manually, the TCP connection will be // 创建一个断开连接的任务,接受一个回调函数
* kept alive after this object is deleted, and maybe reused by // 如果不手动断开连接TCP连接将在对象删除后保持活动并可能被具有相同ID和URL的另一个WFMySQLConnection对象重用
* another WFMySQLConnection object with same id and url. */
WFMySQLTask *create_disconnect_task(mysql_callback_t callback) WFMySQLTask *create_disconnect_task(mysql_callback_t callback)
{ {
// 创建一个空查询的任务,以此作为断开连接的任务
WFMySQLTask *task = this->create_query_task("", std::move(callback)); WFMySQLTask *task = this->create_query_task("", std::move(callback));
// 设置任务的SSL上下文
this->set_ssl_ctx(task); this->set_ssl_ctx(task);
// 设置任务不保持活动状态,暗示这是一个断开连接的任务
task->set_keep_alive(0); task->set_keep_alive(0);
// 返回创建的任务
return task; return task;
} }
protected: protected:
// 设置任务的SSL上下文
void set_ssl_ctx(WFMySQLTask *task) const void set_ssl_ctx(WFMySQLTask *task) const
{ {
// 使用别名简化类型名称
using MySQLRequest = protocol::MySQLRequest; using MySQLRequest = protocol::MySQLRequest;
using MySQLResponse = protocol::MySQLResponse; using MySQLResponse = protocol::MySQLResponse;
// 将任务强制转换为WFComplexClientTask类型
auto *t = (WFComplexClientTask<MySQLRequest, MySQLResponse> *)task; auto *t = (WFComplexClientTask<MySQLRequest, MySQLResponse> *)task;
/* 'ssl_ctx' can be NULL and will use default. */ // 设置任务的SSL上下文如果为NULL则使用默认
t->set_ssl_ctx(this->ssl_ctx); t->set_ssl_ctx(this->ssl_ctx);
} }
protected: protected:
// 解析后的URI用于存储连接信息
ParsedURI uri; ParsedURI uri;
// SSL上下文用于加密通信
SSL_CTX *ssl_ctx; SSL_CTX *ssl_ctx;
// 连接ID确保并发连接具有不同的ID
int id; int id;
public: public:
/* Make sure that concurrent connections have different id. // 确保并发连接的ID不同当连接对象被删除时ID可以被重用
* When a connection object is deleted, id can be reused. */
WFMySQLConnection(int id) { this->id = id; } WFMySQLConnection(int id) { this->id = id; }
// 析构函数,当前为空实现
virtual ~WFMySQLConnection() { } virtual ~WFMySQLConnection() { }
}; };
#endif #endif

@ -30,211 +30,188 @@
#include "WFTask.h" #include "WFTask.h"
#include "WFTaskFactory.h" #include "WFTaskFactory.h"
// 定义WFRedisSubscribeTask类它继承自WFGenericTask
class WFRedisSubscribeTask : public WFGenericTask class WFRedisSubscribeTask : public WFGenericTask
{ {
public: public:
/* Note: Call 'get_resp()' only in the 'extract' function or // 获取与任务关联的Redis响应对象
before the task is started to set response size limit. */ // 注意:只能在'extract'函数中或在任务开始之前调用'get_resp()'来设置响应大小限制。
protocol::RedisResponse *get_resp() protocol::RedisResponse *get_resp()
{ {
return this->task->get_resp(); return this->task->get_resp();
} }
public: public:
/* User needs to call 'release()' exactly once, anywhere. */ // 用户需要恰好调用一次'release()'函数,可以在任何地方调用。
void release() void release()
{ {
if (this->flag.exchange(true)) // 使用原子操作来检查并设置标志如果标志已经是true则不执行删除操作
delete this; // 这样可以防止多次释放同一个对象
} if (this->flag.exchange(true))
delete this;
}
// ... (省略了其他Redis命令的函数如subscribe, unsubscribe等)
// 这些函数通过发送Redis命令来执行订阅、取消订阅等操作
public: public:
/* Note: After 'release()' is called, all the requesting functions // 设置与消息接收相关的超时时间
should not be called except in 'extract', because the task // 这些函数只能在任务开始之前或在'extract'中调用
point may have been deleted because 'callback' finished. */
// 设置等待每个消息的超时时间。非常有用。如果不设置,则最大等待时间将是全局的'response_timeout'
int subscribe(const std::vector<std::string>& channels) void set_watch_timeout(int timeout)
{ {
return this->sync_send("SUBSCRIBE", channels); this->task->set_watch_timeout(timeout);
} }
int unsubscribe(const std::vector<std::string>& channels) // 设置接收完整消息的超时时间。
{ void set_recv_timeout(int timeout)
return this->sync_send("UNSUBSCRIBE", channels); {
} this->task->set_receive_timeout(timeout);
}
int unsubscribe()
{ // 设置发送第一个订阅请求的超时时间。
return this->sync_send("UNSUBSCRIBE", { }); void set_send_timeout(int timeout)
} {
this->task->set_send_timeout(timeout);
int psubscribe(const std::vector<std::string>& patterns) }
{
return this->sync_send("PSUBSCRIBE", patterns); // 设置保持连接活跃的超时时间。默认值为0。如果你想保持连接活跃确保在所有频道/模式取消订阅后不再发送任何请求。
} void set_keep_alive(int timeout)
{
int punsubscribe(const std::vector<std::string>& patterns) this->task->set_keep_alive(timeout);
{ }
return this->sync_send("PUNSUBSCRIBE", patterns);
}
int punsubscribe()
{
return this->sync_send("PUNSUBSCRIBE", { });
}
int ping(const std::string& message)
{
return this->sync_send("PING", { message });
}
int ping()
{
return this->sync_send("PING", { });
}
int quit()
{
return this->sync_send("QUIT", { });
}
public: public:
/* All 'timeout' proxy functions can only be called only before // 设置提取函数或回调函数,这些函数只能在任务开始之前或在'extract'中调用
the task is started or in 'extract'. */
/* Timeout of waiting for each message. Very useful. If not set, // 设置提取函数,当任务完成时,这个函数将被调用以处理结果
the max waiting time will be the global 'response_timeout'*/ void set_extract(std::function<void (WFRedisSubscribeTask *)> ex)
void set_watch_timeout(int timeout) {
{ this->extract = std::move(ex);
this->task->set_watch_timeout(timeout); }
}
/* Timeout of receiving a complete message. */
void set_recv_timeout(int timeout)
{
this->task->set_receive_timeout(timeout);
}
/* Timeout of sending the first subscribe request. */
void set_send_timeout(int timeout)
{
this->task->set_send_timeout(timeout);
}
/* The default keep alive timeout is 0. If you want to keep
the connection alive, make sure not to send any request
after all channels/patterns were unsubscribed. */
void set_keep_alive(int timeout)
{
this->task->set_keep_alive(timeout);
}
public:
/* Call 'set_extract' or 'set_callback' only before the task
is started, or in 'extract'. */
void set_extract(std::function<void (WFRedisSubscribeTask *)> ex) // 设置回调函数,当任务完成时(可能是在提取函数之后),这个函数将被调用
{ void set_callback(std::function<void (WFRedisSubscribeTask *)> cb)
this->extract = std::move(ex); {
} this->callback = std::move(cb);
}
void set_callback(std::function<void (WFRedisSubscribeTask *)> cb)
{
this->callback = std::move(cb);
}
protected: protected:
virtual void dispatch() // 虚拟的dispatch函数用于将任务添加到执行序列中
{ virtual void dispatch()
series_of(this)->push_front(this->task); {
this->subtask_done(); series_of(this)->push_front(this->task);
} this->subtask_done();
}
virtual SubTask *done()
{ // 虚拟的done函数用于从执行序列中弹出任务并返回
return series_of(this)->pop(); virtual SubTask *done()
} {
return series_of(this)->pop();
}
protected: protected:
int sync_send(const std::string& command, // 同步发送Redis命令的函数它封装了与Redis服务器的通信细节
const std::vector<std::string>& params); int sync_send(const std::string& command,
static void task_extract(WFRedisTask *task); const std::vector<std::string>& params);
static void task_callback(WFRedisTask *task);
protected: // 静态的提取函数和回调函数它们被设计为与Redis任务交互的接口
WFRedisTask *task; static void task_extract(WFRedisTask *task);
std::mutex mutex; static void task_callback(WFRedisTask *task);
std::atomic<bool> flag;
std::function<void (WFRedisSubscribeTask *)> extract;
std::function<void (WFRedisSubscribeTask *)> callback;
protected: protected:
WFRedisSubscribeTask(WFRedisTask *task, // 指向底层Redis任务的指针
std::function<void (WFRedisSubscribeTask *)>&& ex, WFRedisTask *task;
std::function<void (WFRedisSubscribeTask *)>&& cb) : // 用于同步访问的互斥锁
flag(false), std::mutex mutex;
extract(std::move(ex)), // 原子标志,用于跟踪对象是否已被释放
callback(std::move(cb)) std::atomic<bool> flag;
{ // 提取函数和回调函数,当任务完成时它们将被调用
task->user_data = this; std::function<void (WFRedisSubscribeTask *)> extract;
this->task = task; std::function<void (WFRedisSubscribeTask *)> callback;
}
virtual ~WFRedisSubscribeTask()
{
if (this->task)
this->task->dismiss();
}
friend class WFRedisSubscriber; protected:
// 构造函数接受一个Redis任务、提取函数和回调函数
WFRedisSubscribeTask(WFRedisTask *task,
std::function<void (WFRedisSubscribeTask *)>&& ex,
std::function<void (WFRedisSubscribeTask *)>&& cb) :
flag(false),
extract(std::move(ex)),
callback(std::move(cb))
{
// 将用户数据指针设置为当前对象,以便在回调中可以访问它
task->user_data = this;
this->task = task;
}
// 析构函数,释放与任务关联的资源
virtual ~WFRedisSubscribeTask()
{
if (this->task)
this->task->dismiss(); // 可能是取消任务或释放相关资源
}
// 允许WFRedisSubscriber类访问受保护的成员
friend class WFRedisSubscriber;
}; };
// 定义一个名为 WFRedisSubscriber 的类
class WFRedisSubscriber class WFRedisSubscriber
{ {
public: public:
// 重载的 init 方法,只接受一个 URL 参数,内部调用另一个 init 方法并传入 NULL 作为 SSL 上下文
int init(const std::string& url) int init(const std::string& url)
{ {
return this->init(url, NULL); return this->init(url, NULL); // 调用重载的 init 方法,传入 URL 和 NULL 作为 SSL 上下文
} }
int init(const std::string& url, SSL_CTX *ssl_ctx); // init 方法,接受一个 URL 和一个可选的 SSL 上下文参数
int init(const std::string& url, SSL_CTX *ssl_ctx); // 此方法的具体实现在代码段之外
// 释放资源的 deinit 方法,当前为空实现
void deinit() { } void deinit() { }
public: public:
// 定义提取任务数据的函数类型
using extract_t = std::function<void (WFRedisSubscribeTask *)>; using extract_t = std::function<void (WFRedisSubscribeTask *)>;
// 定义任务回调的函数类型
using callback_t = std::function<void (WFRedisSubscribeTask *)>; using callback_t = std::function<void (WFRedisSubscribeTask *)>;
public: public:
WFRedisSubscribeTask * // 创建一个订阅任务,接受频道列表、提取函数和回调函数
create_subscribe_task(const std::vector<std::string>& channels, WFRedisSubscribeTask *create_subscribe_task(const std::vector<std::string>& channels,
extract_t extract, callback_t callback); extract_t extract, callback_t callback); // 方法的具体实现在代码段之外
WFRedisSubscribeTask * // 创建一个模式订阅任务,接受模式列表、提取函数和回调函数
create_psubscribe_task(const std::vector<std::string>& patterns, WFRedisSubscribeTask *create_psubscribe_task(const std::vector<std::string>& patterns,
extract_t extract, callback_t callback); extract_t extract, callback_t callback); // 方法的具体实现在代码段之外
protected: protected:
// 为任务设置 SSL 上下文
void set_ssl_ctx(WFRedisTask *task) const void set_ssl_ctx(WFRedisTask *task) const
{ {
// 使用别名简化类型名称
using RedisRequest = protocol::RedisRequest; using RedisRequest = protocol::RedisRequest;
using RedisResponse = protocol::RedisResponse; using RedisResponse = protocol::RedisResponse;
// 将传入的 WFRedisTask 强制转换为 WFComplexClientTask<RedisRequest, RedisResponse> 类型
auto *t = (WFComplexClientTask<RedisRequest, RedisResponse> *)task; auto *t = (WFComplexClientTask<RedisRequest, RedisResponse> *)task;
/* 'ssl_ctx' can be NULL and will use default. */ // 为任务设置 SSL 上下文,如果 ssl_ctx 为 NULL则使用默认设置
t->set_ssl_ctx(this->ssl_ctx); t->set_ssl_ctx(this->ssl_ctx);
} }
protected: protected:
// 创建一个 Redis 任务,接受命令和参数列表
WFRedisTask *create_redis_task(const std::string& command, WFRedisTask *create_redis_task(const std::string& command,
const std::vector<std::string>& params); const std::vector<std::string>& params); // 方法的具体实现在代码段之外
protected: protected:
// 存储解析后的 URI用于连接 Redis 服务器
ParsedURI uri; ParsedURI uri;
// 存储 SSL 上下文,用于加密通信(如果需要)
SSL_CTX *ssl_ctx; SSL_CTX *ssl_ctx;
public: public:
virtual ~WFRedisSubscriber() { } virtual ~WFRedisSubscriber() { }
}; };
#endif #endif

Loading…
Cancel
Save