From 26ae2c6227107db97cb01c608d18c99d1d6996b2 Mon Sep 17 00:00:00 2001 From: xiangwangbuwang <1584615561@qq.com> Date: Wed, 8 Jan 2025 20:00:03 +0800 Subject: [PATCH] comments adding --- src/client/WFDnsClient.cc | 431 ++--- src/client/WFKafkaClient.cc | 2937 ++++++++++++++++---------------- src/client/WFMySQLConnection.h | 37 +- src/client/WFRedisSubscriber.h | 277 ++- 4 files changed, 1892 insertions(+), 1790 deletions(-) diff --git a/src/client/WFDnsClient.cc b/src/client/WFDnsClient.cc index 8ccf6c5..6d1e897 100644 --- a/src/client/WFDnsClient.cc +++ b/src/client/WFDnsClient.cc @@ -28,250 +28,281 @@ using namespace protocol; using DnsCtx = std::function; using ComplexTask = WFComplexClientTask; +// 定义一个DnsParams类,用于存储DNS查询参数 class DnsParams { public: - struct dns_params - { - std::vector uris; - std::vector search_list; - int ndots; - int attempts; - bool rotate; - }; + // 定义一个内部结构体dns_params,用于存储具体的DNS参数 + struct dns_params + { + std::vector uris; // 存储解析后的URI + std::vector search_list; // 搜索列表 + int ndots; // 域名中允许的最小点数 + int attempts; // 最大尝试次数 + bool rotate; // 是否轮询 + }; public: - DnsParams() - { - this->ref = new std::atomic(1); - this->params = new dns_params(); - } - - DnsParams(const DnsParams& p) - { - this->ref = p.ref; - this->params = p.params; - this->incref(); - } - - DnsParams& operator=(const DnsParams& p) - { - if (this != &p) - { - this->decref(); - this->ref = p.ref; - this->params = p.params; - this->incref(); - } - return *this; - } - - ~DnsParams() { this->decref(); } - - const dns_params *get_params() const { return this->params; } - dns_params *get_params() { return this->params; } + // DnsParams类的构造函数 + DnsParams() + { + this->ref = new std::atomic(1); // 初始化引用计数为1 + this->params = new dns_params(); // 初始化参数结构体 + } + + // DnsParams类的拷贝构造函数 + DnsParams(const DnsParams& p) + { + this->ref = p.ref; // 拷贝引用计数指针 + this->params = p.params; // 拷贝参数结构体指针 + this->incref(); // 增加引用计数 + } + + // DnsParams类的赋值运算符 + DnsParams& operator=(const DnsParams& p) + { + if (this != &p) // 如果不是自赋值 + { + this->decref(); // 减少当前对象的引用计数 + this->ref = p.ref; // 拷贝引用计数指针 + this->params = p.params; // 拷贝参数结构体指针 + this->incref(); // 增加引用计数 + } + return *this; // 返回当前对象的引用 + } + + // DnsParams类的析构函数 + ~DnsParams() { this->decref(); } // 减少引用计数 + + // 获取const类型的参数指针 + const dns_params *get_params() const { return this->params; } + // 获取非const类型的参数指针 + dns_params *get_params() { return this->params; } private: - void incref() { (*this->ref)++; } - void decref() - { - if (--*this->ref == 0) - { - delete this->params; - delete this->ref; - } - } + // 增加引用计数 + void incref() { (*this->ref)++; } + // 减少引用计数,并在计数为0时释放资源 + void decref() + { + if (--*this->ref == 0) + { + delete this->params; // 删除参数结构体 + delete this->ref; // 删除引用计数 + } + } private: - dns_params *params; - std::atomic *ref; + dns_params *params; // 指向参数结构体的指针 + std::atomic *ref; // 指向引用计数的指针 }; +// 定义DNS状态枚举 enum { - DNS_STATUS_TRY_ORIGIN_DONE = 0, - DNS_STATUS_TRY_ORIGIN_FIRST = 1, - DNS_STATUS_TRY_ORIGIN_LAST = 2 + DNS_STATUS_TRY_ORIGIN_DONE = 0, + DNS_STATUS_TRY_ORIGIN_FIRST = 1, + DNS_STATUS_TRY_ORIGIN_LAST = 2 }; +// 定义DnsStatus结构体,用于存储DNS查询状态 struct DnsStatus { - std::string origin_name; - std::string current_name; - size_t next_server; // next server to try - size_t last_server; // last server to try - size_t next_domain; // next search domain to try - int attempts_left; - int try_origin_state; + std::string origin_name; // 原始域名 + std::string current_name; // 当前域名 + size_t next_server; // 下一个要尝试的服务器 + size_t last_server; // 
上一个尝试的服务器 + size_t next_domain; // 下一个要尝试的搜索域 + int attempts_left; // 剩余尝试次数 + int try_origin_state; // 尝试原始域名的状态 }; +// 计算字符串中点的数量 static int __get_ndots(const std::string& s) { - int ndots = 0; - for (size_t i = 0; i < s.size(); i++) - ndots += s[i] == '.'; - return ndots; + int ndots = 0; + for (size_t i = 0; i < s.size(); i++) + ndots += s[i] == '.'; // 统计点的数量 + return ndots; } +// 检查是否有下一个域名要尝试 static bool __has_next_name(const DnsParams::dns_params *p, - struct DnsStatus *s) + struct DnsStatus *s) { - if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_FIRST) - { - s->current_name = s->origin_name; - s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; - return true; - } - - if (s->next_domain < p->search_list.size()) - { - s->current_name = s->origin_name; - s->current_name.push_back('.'); - s->current_name.append(p->search_list[s->next_domain]); - - s->next_domain++; - return true; - } - - if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_LAST) - { - s->current_name = s->origin_name; - s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; - return true; - } - - return false; + if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_FIRST) + { + s->current_name = s->origin_name; // 设置当前域名为原始域名 + s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; // 更新状态 + return true; + } + + if (s->next_domain < p->search_list.size()) // 如果还有搜索域要尝试 + { + s->current_name = s->origin_name; // 设置当前域名为原始域名 + s->current_name.push_back('.'); // 添加点 + s->current_name.append(p->search_list[s->next_domain]); // 添加搜索域 + + s->next_domain++; // 移动到下一个搜索域 + return true; + } + + if (s->try_origin_state == DNS_STATUS_TRY_ORIGIN_LAST) + { + s->current_name = s->origin_name; // 设置当前域名为原始域名 + s->try_origin_state = DNS_STATUS_TRY_ORIGIN_DONE; // 更新状态 + return true; + } + + return false; // 没有下一个域名要尝试 } +// DNS查询回调内部函数 static void __callback_internal(WFDnsTask *task, const DnsParams& params, - struct DnsStatus& s) + struct DnsStatus& s) { - ComplexTask *ctask = static_cast(task); - int state = task->get_state(); - DnsRequest *req = task->get_req(); - DnsResponse *resp = task->get_resp(); - const auto *p = params.get_params(); - int rcode = resp->get_rcode(); - - bool try_next_server = state != WFT_STATE_SUCCESS || - rcode == DNS_RCODE_SERVER_FAILURE || - rcode == DNS_RCODE_NOT_IMPLEMENTED || - rcode == DNS_RCODE_REFUSED; - bool try_next_name = rcode == DNS_RCODE_FORMAT_ERROR || - rcode == DNS_RCODE_NAME_ERROR || - resp->get_ancount() == 0; - - if (try_next_server) - { - if (s.last_server == s.next_server) - s.attempts_left--; - if (s.attempts_left <= 0) - return; - - s.next_server = (s.next_server + 1) % p->uris.size(); - ctask->set_redirect(p->uris[s.next_server]); - return; - } - - if (try_next_name && __has_next_name(p, &s)) - { - req->set_question_name(s.current_name.c_str()); - ctask->set_redirect(p->uris[s.next_server]); - return; - } + ComplexTask *ctask = static_cast(task); // 转换任务类型 + int state = task->get_state(); // 获取任务状态 + DnsRequest *req = task->get_req(); // 获取DNS请求 + DnsResponse *resp = task->get_resp(); // 获取DNS响应 + const auto *p = params.get_params(); // 获取DNS参数 + int rcode = resp->get_rcode(); // 获取响应码 + + bool try_next_server = state != WFT_STATE_SUCCESS || // 如果状态不是成功 + rcode == DNS_RCODE_SERVER_FAILURE || // 或者响应码是服务器失败 + rcode == DNS_RCODE_NOT_IMPLEMENTED || // 或者响应码是未实现 + rcode == DNS_RCODE_REFUSED; // 或者响应码是拒绝 + bool try_next_name = rcode == DNS_RCODE_FORMAT_ERROR || // 如果响应码是格式错误 + rcode == DNS_RCODE_NAME_ERROR || // 或者响应码是名字错误 + resp->get_ancount() == 0; // 或者响应中没有答案 + + if (try_next_server) + { + if 
(s.last_server == s.next_server) // 如果已经是最后一个服务器 + s.attempts_left--; // 减少尝试次数 + if (s.attempts_left <= 0) // 如果尝试次数用完 + return; // 返回 + + s.next_server = (s.next_server + 1) % p->uris.size(); // 计算下一个服务器 + ctask->set_redirect(p->uris[s.next_server]); // 设置重定向 + return; // 返回 + } + + if (try_next_name && __has_next_name(p, &s)) // 如果需要尝试下一个名字 + { + req->set_question_name(s.current_name.c_str()); // 设置查询名字 + ctask->set_redirect(p->uris[s.next_server]); // 设置重定向 + return; // 返回 + } } +// WFDnsClient类的初始化函数,带一个URL参数 int WFDnsClient::init(const std::string& url) { - return this->init(url, "", 1, 2, false); + return this->init(url, "", 1, 2, false); // 调用另一个初始化函数 } +// WFDnsClient类的初始化函数,用于配置DNS客户端 int WFDnsClient::init(const std::string& url, const std::string& search_list, - int ndots, int attempts, bool rotate) + int ndots, int attempts, bool rotate) { - std::vector hosts; - std::vector uris; - std::string host; - ParsedURI uri; - - this->id = 0; - hosts = StringUtil::split_filter_empty(url, ','); - - for (size_t i = 0; i < hosts.size(); i++) - { - host = hosts[i]; - if (strncasecmp(host.c_str(), "dns://", 6) != 0 && - strncasecmp(host.c_str(), "dnss://", 7) != 0) - { - host = "dns://" + host; - } - - if (URIParser::parse(host, uri) != 0) - return -1; - - uris.emplace_back(std::move(uri)); - } - - if (uris.empty() || ndots < 0 || attempts < 1) - { - errno = EINVAL; - return -1; - } - - this->params = new DnsParams; - DnsParams::dns_params *q = ((DnsParams *)this->params)->get_params(); - q->uris = std::move(uris); - q->search_list = StringUtil::split_filter_empty(search_list, ','); - q->ndots = ndots > 15 ? 15 : ndots; - q->attempts = attempts > 5 ? 5 : attempts; - q->rotate = rotate; - - return 0; + std::vector hosts; // 用于存储分割后的主机名 + std::vector uris; // 用于存储解析后的URI + std::string host; // 单个主机名字符串 + ParsedURI uri; // 用于存储解析后的单个URI + + this->id = 0; // 初始化客户端ID为0 + hosts = StringUtil::split_filter_empty(url, ','); // 根据逗号分割URL字符串,获取主机名列表 + + // 遍历主机名列表,对每个主机名进行处理 + for (size_t i = 0; i < hosts.size(); i++) + { + host = hosts[i]; // 获取当前主机名 + // 检查主机名是否以"dns://"或"dnss://"开头,如果不是,则添加"dns://"前缀 + if (strncasecmp(host.c_str(), "dns://", 6) != 0 && + strncasecmp(host.c_str(), "dnss://", 7) != 0) + { + host = "dns://" + host; + } + + // 使用URIParser解析当前主机名,如果解析失败则返回错误码-1 + if (URIParser::parse(host, uri) != 0) + return -1; + + // 将解析后的URI添加到uris列表中 + uris.emplace_back(std::move(uri)); + } + + // 如果uris列表为空,或者ndots小于0,或者attempts小于1,则设置errno为EINVAL并返回错误码-1 + if (uris.empty() || ndots < 0 || attempts < 1) + { + errno = EINVAL; + return -1; + } + + // 创建一个新的DnsParams对象来存储DNS参数 + this->params = new DnsParams; + DnsParams::dns_params *q = ((DnsParams *)this->params)->get_params(); // 获取DNS参数的指针 + q->uris = std::move(uris); // 将解析后的URI列表移动到DNS参数中 + q->search_list = StringUtil::split_filter_empty(search_list, ','); // 根据逗号分割search_list字符串,获取搜索域列表 + // 设置ndots的值,如果大于15则设置为15 + q->ndots = ndots > 15 ? 15 : ndots; + // 设置attempts的值,如果大于5则设置为5 + q->attempts = attempts > 5 ? 
5 : attempts; + q->rotate = rotate; // 设置是否轮询 + + return 0; // 初始化成功,返回0 } +// WFDnsClient类的析构函数,用于释放资源 void WFDnsClient::deinit() { - delete (DnsParams *)this->params; - this->params = NULL; + delete (DnsParams *)this->params; // 删除分配的DnsParams对象 + this->params = NULL; // 将params指针设置为NULL } +// WFDnsClient类的方法,用于创建一个新的DNS任务 WFDnsTask *WFDnsClient::create_dns_task(const std::string& name, dns_callback_t callback) { - DnsParams::dns_params *p = ((DnsParams *)this->params)->get_params(); - struct DnsStatus status; - size_t next_server; - WFDnsTask *task; - DnsRequest *req; - - next_server = p->rotate ? this->id++ % p->uris.size() : 0; - - status.origin_name = name; - status.next_domain = 0; - status.attempts_left = p->attempts; - status.try_origin_state = DNS_STATUS_TRY_ORIGIN_FIRST; - - if (!name.empty() && name.back() == '.') - status.next_domain = p->search_list.size(); - else if (__get_ndots(name) < p->ndots) - status.try_origin_state = DNS_STATUS_TRY_ORIGIN_LAST; - - __has_next_name(p, &status); - - task = WFTaskFactory::create_dns_task(p->uris[next_server], 0, - std::move(callback)); - status.next_server = next_server; - status.last_server = (next_server + p->uris.size() - 1) % p->uris.size(); - - req = task->get_req(); - req->set_question(status.current_name.c_str(), DNS_TYPE_A, DNS_CLASS_IN); - req->set_rd(1); - - ComplexTask *ctask = static_cast(task); - *ctask->get_mutable_ctx() = std::bind(__callback_internal, - std::placeholders::_1, - *(DnsParams *)params, status); - - return task; + DnsParams::dns_params *p = ((DnsParams *)this->params)->get_params(); // 获取DNS参数 + struct DnsStatus status; // 创建DNS状态结构体 + size_t next_server; // 下一个要尝试的服务器索引 + WFDnsTask *task; // 指向新创建的DNS任务的指针 + DnsRequest *req; // 指向DNS请求的指针 + + // 如果启用轮询,则计算下一个服务器索引,否则使用第一个服务器 + next_server = p->rotate ? 
this->id++ % p->uris.size() : 0; + + status.origin_name = name; // 设置原始域名 + status.next_domain = 0; // 设置下一个要尝试的搜索域索引 + status.attempts_left = p->attempts; // 设置剩余尝试次数 + status.try_origin_state = DNS_STATUS_TRY_ORIGIN_FIRST; // 设置尝试原始域名的状态 + + // 如果域名以点结尾,则跳过搜索域 + if (!name.empty() && name.back() == '.') + status.next_domain = p->search_list.size(); + // 如果域名中的点数少于ndots,则设置尝试原始域名的状态为最后尝试 + else if (__get_ndots(name) < p->ndots) + status.try_origin_state = DNS_STATUS_TRY_ORIGIN_LAST; + + // 检查是否有下一个域名要尝试,并更新状态 + __has_next_name(p, &status); + + // 创建一个新的DNS任务,使用下一个服务器的URI和提供的回调函数 + task = WFTaskFactory::create_dns_task(p->uris[next_server], 0, std::move(callback)); + status.next_server = next_server; // 设置当前服务器索引 + status.last_server = (next_server + p->uris.size() - 1) % p->uris.size(); // 设置最后一个服务器索引 + + req = task->get_req(); // 获取DNS请求对象 + req->set_question(status.current_name.c_str(), DNS_TYPE_A, DNS_CLASS_IN); // 设置DNS请求的问题部分 + req->set_rd(1); // 设置DNS请求的递归标志 + + ComplexTask *ctask = static_cast(task); // 将任务转换为ComplexTask类型 + // 设置任务的上下文回调,当DNS任务完成时,将调用__callback_internal函数 + *ctask->get_mutable_ctx() = std::bind(__callback_internal, + std::placeholders::_1, + *(DnsParams *)params, status); + + return task; // 返回新创建的DNS任务 } - diff --git a/src/client/WFKafkaClient.cc b/src/client/WFKafkaClient.cc index 8339aed..f5c26f4 100644 --- a/src/client/WFKafkaClient.cc +++ b/src/client/WFKafkaClient.cc @@ -49,1646 +49,1723 @@ using namespace protocol; using ComplexKafkaTask = WFComplexClientTask; +// 定义一个名为KafkaMember的类,用于表示Kafka的成员信息 class KafkaMember { public: - KafkaMember() : scheme("kafka://"), ref(1) - { - this->transport_type = TT_TCP; - this->cgroup_status = KAFKA_CGROUP_NONE; - this->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; - this->meta_doing = false; - this->cgroup_outdated = false; - this->client_deinit = false; - this->heartbeat_series = NULL; - } - - void incref() - { - ++this->ref; - } - - void decref() - { - if (--this->ref == 0) - delete this; - } - - enum TransportType transport_type; - std::string scheme; - std::vector broker_hosts; - SSL_CTX *ssl_ctx; - KafkaCgroup cgroup; - KafkaMetaList meta_list; - KafkaBrokerMap broker_map; - KafkaConfig config; - std::map meta_status; - std::mutex mutex; - char cgroup_status; - char heartbeat_status; - bool meta_doing; - bool cgroup_outdated; - bool client_deinit; - void *heartbeat_series; - size_t cgroup_wait_cnt; - size_t meta_wait_cnt; - std::atomic ref; + // 构造函数,初始化成员变量 + KafkaMember() : scheme("kafka://"), ref(1) + { + this->transport_type = TT_TCP; // 设置传输类型为TCP + this->cgroup_status = KAFKA_CGROUP_NONE; // 设置消费者组状态为无 + this->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; // 设置心跳状态为未初始化 + this->meta_doing = false; // 元数据操作标志设置为false + this->cgroup_outdated = false; // 消费者组信息过时标志设置为false + this->client_deinit = false; // 客户端去初始化标志设置为false + this->heartbeat_series = NULL; // 心跳序列指针设置为NULL + } + + // 增加引用计数 + void incref() + { + ++this->ref; // 引用计数加1 + } + + // 减少引用计数,如果计数为0则删除对象 + void decref() + { + if (--this->ref == 0) // 引用计数减1,如果为0 + delete this; // 删除当前对象 + } + + // 成员变量 + enum TransportType transport_type; // 传输类型 + std::string scheme; // 协议方案 + std::vector broker_hosts; // 代理主机列表 + SSL_CTX *ssl_ctx; // SSL上下文 + KafkaCgroup cgroup; // Kafka消费者组 + KafkaMetaList meta_list; // Kafka元数据列表 + KafkaBrokerMap broker_map; // Kafka代理映射 + KafkaConfig config; // Kafka配置 + std::map meta_status; // 元数据状态映射 + std::mutex mutex; // 互斥锁 + char cgroup_status; // 消费者组状态 + char heartbeat_status; // 心跳状态 + bool meta_doing; // 是否正在处理元数据 + bool 
cgroup_outdated; // 消费者组信息是否过时 + bool client_deinit; // 客户端是否已去初始化 + void *heartbeat_series; // 心跳序列指针 + size_t cgroup_wait_cnt; // 消费者组等待计数 + size_t meta_wait_cnt; // 元数据等待计数 + std::atomic ref; // 引用计数,使用原子操作以保证线程安全 }; +// 定义一个名为KafkaClientTask的类,继承自WFKafkaTask,用于处理Kafka客户端任务 class KafkaClientTask : public WFKafkaTask { public: - KafkaClientTask(const std::string& query, int retry_max, - kafka_callback_t&& callback, - WFKafkaClient *client) : - WFKafkaTask(retry_max, std::move(callback)) - { - this->api_type = Kafka_Unknown; - this->kafka_error = 0; - this->member = client->member; - this->query = query; - - this->member->incref(); - this->member->mutex.lock(); - this->config = client->member->config; - if (!this->member->broker_hosts.empty()) - { - int rpos = rand() % this->member->broker_hosts.size(); - this->url = this->member->broker_hosts.at(rpos); - } - this->member->mutex.unlock(); - - this->info_generated = false; - this->msg = NULL; - } - - virtual ~KafkaClientTask() - { - this->member->decref(); - } - - std::string *get_url() { return &this->url; } + // 构造函数,初始化任务参数 + KafkaClientTask(const std::string& query, int retry_max, + kafka_callback_t&& callback, + WFKafkaClient *client) : + WFKafkaTask(retry_max, std::move(callback)) // 调用基类构造函数,传递重试次数和回调函数 + { + this->api_type = Kafka_Unknown; // 设置API类型为未知 + this->kafka_error = 0; // 初始化Kafka错误码为0 + this->member = client->member; // 获取WFKafkaClient的成员对象 + this->query = query; // 保存查询字符串 + + this->member->incref(); // 增加成员对象的引用计数 + this->member->mutex.lock(); // 锁定成员对象的互斥锁 + this->config = client->member->config; // 复制成员对象的配置 + if (!this->member->broker_hosts.empty()) // 如果代理主机列表不为空 + { + int rpos = rand() % this->member->broker_hosts.size(); // 随机选择一个代理主机 + this->url = this->member->broker_hosts.at(rpos); // 保存选中的代理主机URL + } + this->member->mutex.unlock(); // 解锁成员对象的互斥锁 + + this->info_generated = false; // 信息生成标志设置为false + this->msg = NULL; // 消息指针设置为NULL + } + + // 析构函数,减少成员对象的引用计数 + virtual ~KafkaClientTask() + { + this->member->decref(); // 减少成员对象的引用计数 + } + + // 获取当前任务的URL + std::string *get_url() { return &this->url; } protected: - virtual bool add_topic(const std::string& topic); + // 添加主题 + virtual bool add_topic(const std::string& topic); - virtual bool add_toppar(const KafkaToppar& toppar); + // 添加toppar(topic partition) + virtual bool add_toppar(const KafkaToppar& toppar); - virtual bool add_produce_record(const std::string& topic, int partition, - KafkaRecord record); + // 添加生产记录 + virtual bool add_produce_record(const std::string& topic, int partition, + KafkaRecord record); - virtual bool add_offset_toppar(const KafkaToppar& toppar); + // 添加偏移量toppar + virtual bool add_offset_toppar(const KafkaToppar& toppar); - virtual void dispatch(); + // 分派任务 + virtual void dispatch(); - virtual void parse_query(); - virtual void generate_info(); + // 解析查询字符串 + virtual void parse_query(); + virtual void generate_info(); private: - static void kafka_meta_callback(__WFKafkaTask *task); + // Kafka元数据回调函数 + static void kafka_meta_callback(__WFKafkaTask *task); - static void kafka_merge_meta_list(KafkaMetaList *dst, - KafkaMetaList *src); + // 合并元数据列表 + static void kafka_merge_meta_list(KafkaMetaList *dst, + KafkaMetaList *src); - static void kafka_merge_broker_list(const std::string& scheme, - std::vector *hosts, - KafkaBrokerMap *dst, - KafkaBrokerList *src); + // 合并代理列表 + static void kafka_merge_broker_list(const std::string& scheme, + std::vector *hosts, + KafkaBrokerMap *dst, + KafkaBrokerList *src); - static void 
kafka_cgroup_callback(__WFKafkaTask *task); + // Kafka消费者组回调函数 + static void kafka_cgroup_callback(__WFKafkaTask *task); - static void kafka_offsetcommit_callback(__WFKafkaTask *task); + // Kafka偏移量提交回调函数 + static void kafka_offsetcommit_callback(__WFKafkaTask *task); - static void kafka_parallel_callback(const ParallelWork *pwork); + // Kafka并行工作回调函数 + static void kafka_parallel_callback(const ParallelWork *pwork); - static void kafka_timer_callback(WFTimerTask *task); + // Kafka定时器回调函数 + static void kafka_timer_callback(WFTimerTask *task); - static void kafka_heartbeat_callback(__WFKafkaTask *task); + // Kafka心跳回调函数 + static void kafka_heartbeat_callback(__WFKafkaTask *task); - static void kafka_leavegroup_callback(__WFKafkaTask *task); + // Kafka离开组回调函数 + static void kafka_leavegroup_callback(__WFKafkaTask *task); - static void kafka_rebalance_proc(KafkaMember *member, SeriesWork *series); + // Kafka重新平衡处理函数 + static void kafka_rebalance_proc(KafkaMember *member, SeriesWork *series); - static void kafka_rebalance_callback(__WFKafkaTask *task); + // Kafka重新平衡回调函数 + static void kafka_rebalance_callback(__WFKafkaTask *task); - void kafka_move_task_callback(__WFKafkaTask *task); + // Kafka移动任务回调函数 + void kafka_move_task_callback(__WFKafkaTask *task); - void kafka_process_toppar_offset(KafkaToppar *task_toppar); + // Kafka处理toppar偏移量 + void kafka_process_toppar_offset(KafkaToppar *task_toppar); - bool compare_topics(KafkaClientTask *task); + // 比较主题 + bool compare_topics(KafkaClientTask *task); - bool check_cgroup(); + // 检查消费者组 + bool check_cgroup(); - bool check_meta(); + // 检查元数据 + bool check_meta(); - int arrange_toppar(int api_type); + // 安排toppar + int arrange_toppar(int api_type); - int arrange_produce(); + // 安排生产 + int arrange_produce(); - int arrange_fetch(); + // 安排获取 + int arrange_fetch(); - int arrange_commit(); + // 安排提交 + int arrange_commit(); - int arrange_offset(); + // 安排偏移量 + int arrange_offset(); - int dispatch_locked(); + // 上锁分派 + int dispatch_locked(); - KafkaBroker *get_broker(int node_id) - { - return this->member->broker_map.find_item(node_id); - } + // 获取代理对象 + KafkaBroker *get_broker(int node_id) + { + return this->member->broker_map.find_item(node_id); + } - int get_node_id(const KafkaToppar *toppar); + // 获取节点ID + int get_node_id(const KafkaToppar *toppar); - bool get_meta_status(KafkaMetaList **uninit_meta_list); - void set_meta_status(bool status); + // 获取元数据状态 + bool get_meta_status(KafkaMetaList **uninit_meta_list); + void set_meta_status(bool status); - std::string get_userinfo() { return this->userinfo; } + // 获取用户信息 + std::string get_userinfo() { return this->userinfo; } private: - KafkaMember *member; - KafkaBroker broker; - std::map toppar_list_map; - std::string url; - std::string query; - std::set topic_set; - std::string userinfo; - bool info_generated; - bool wait_cgroup; - void *msg; - - friend class WFKafkaClient; + // 成员变量 + KafkaMember *member; // Kafka成员对象 + KafkaBroker broker; // Kafka代理对象 + std::map toppar_list_map; // toppar列表映射 + std::string url; // URL + std::string query; // 查询字符串 + std::set topic_set; // 主题集合 + std::string userinfo; // 用户信息 + bool info_generated; // 信息是否已生成 + bool wait_cgroup; // 是否等待消费者组 + void *msg; // 消息指针 + + // 友元类 + friend class WFKafkaClient; }; +// 获取KafkaToppar对应的节点ID int KafkaClientTask::get_node_id(const KafkaToppar *toppar) { - int preferred_read_replica = toppar->get_preferred_read_replica(); - if (preferred_read_replica >= 0) - return preferred_read_replica; - - bool flag = false; - 
this->member->meta_list.rewind(); - KafkaMeta *meta; - while ((meta = this->member->meta_list.get_next()) != NULL) - { - if (strcmp(meta->get_topic(), toppar->get_topic()) == 0) - { - flag = true; - break; - } - } - - const kafka_broker_t *broker = NULL; - if (flag) - broker = meta->get_broker(toppar->get_partition()); - - if (!broker) - return -1; - - return broker->node_id; + int preferred_read_replica = toppar->get_preferred_read_replica(); // 获取首选的读取副本 + if (preferred_read_replica >= 0) + return preferred_read_replica; // 如果有首选的读取副本,直接返回其节点ID + + bool flag = false; // 标记是否找到匹配的主题 + this->member->meta_list.rewind(); // 重置元数据列表 + KafkaMeta *meta; // 指向元数据的指针 + while ((meta = this->member->meta_list.get_next()) != NULL) // 遍历元数据列表 + { + if (strcmp(meta->get_topic(), toppar->get_topic()) == 0) // 如果找到匹配的主题 + { + flag = true; // 设置标志为true + break; // 跳出循环 + } + } + + const kafka_broker_t *broker = NULL; // 指向代理的指针 + if (flag) // 如果找到了匹配的主题 + broker = meta->get_broker(toppar->get_partition()); // 获取对应分区的代理 + + if (!broker) // 如果没有找到代理 + return -1; // 返回-1表示失败 + + return broker->node_id; // 返回代理的节点ID } +// Kafka偏移量提交回调函数 void KafkaClientTask::kafka_offsetcommit_callback(__WFKafkaTask *task) { - KafkaClientTask *t = (KafkaClientTask *)task->user_data; - if (task->get_state() == WFT_STATE_SUCCESS) - t->result.set_resp(std::move(*task->get_resp()), 0); - - t->finish = true; - t->state = task->get_state(); - t->error = task->get_error(); - t->kafka_error = *static_cast(task)->get_mutable_ctx(); + KafkaClientTask *t = (KafkaClientTask *)task->user_data; // 将user_data转换为KafkaClientTask对象 + if (task->get_state() == WFT_STATE_SUCCESS) // 如果任务状态为成功 + t->result.set_resp(std::move(*task->get_resp()), 0); // 设置任务响应结果 + + t->finish = true; // 设置任务完成标志为true + t->state = task->get_state(); // 设置任务状态 + t->error = task->get_error(); // 设置任务错误码 + t->kafka_error = *static_cast(task)->get_mutable_ctx(); // 设置Kafka错误码 } +// Kafka离开组回调函数 void KafkaClientTask::kafka_leavegroup_callback(__WFKafkaTask *task) { - KafkaClientTask *t = (KafkaClientTask *)task->user_data; - t->finish = true; - t->state = task->get_state(); - t->error = task->get_error(); - t->kafka_error = *static_cast(task)->get_mutable_ctx(); + KafkaClientTask *t = (KafkaClientTask *)task->user_data; // 将user_data转换为KafkaClientTask对象 + t->finish = true; // 设置任务完成标志为true + t->state = task->get_state(); // 设置任务状态 + t->error = task->get_error(); // 设置任务错误码 + t->kafka_error = *static_cast(task)->get_mutable_ctx(); // 设置Kafka错误码 } +// Kafka重新平衡回调函数 void KafkaClientTask::kafka_rebalance_callback(__WFKafkaTask *task) { - KafkaMember *member = (KafkaMember *)task->user_data; - SeriesWork *series = series_of(task); - size_t max; - - member->mutex.lock(); - if (member->client_deinit) - { - member->mutex.unlock(); - member->decref(); - return; - } - - if (task->get_state() == WFT_STATE_SUCCESS) - { - member->cgroup_status = KAFKA_CGROUP_DONE; - member->cgroup = std::move(*(task->get_resp()->get_cgroup())); - - if (member->heartbeat_status == KAFKA_HEARTBEAT_UNINIT) - { - __WFKafkaTask *kafka_task; - KafkaBroker *coordinator = member->cgroup.get_coordinator(); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - coordinator->get_host(), - coordinator->get_port(), - member->ssl_ctx, "", 0, - kafka_heartbeat_callback); - kafka_task->user_data = member; - kafka_task->get_req()->set_api_type(Kafka_Heartbeat); - kafka_task->get_req()->set_cgroup(member->cgroup); - kafka_task->get_req()->set_broker(*coordinator); - 
series->push_back(kafka_task); - - member->heartbeat_status = KAFKA_HEARTBEAT_DOING; - member->heartbeat_series = series; - } - - max = member->cgroup_wait_cnt; - char name[64]; - snprintf(name, 64, "%p.cgroup", member); - member->mutex.unlock(); - - WFTaskFactory::signal_by_name(name, NULL, max); - } - else - { - kafka_rebalance_proc(member, series); - member->mutex.unlock(); - } + KafkaMember *member = (KafkaMember *)task->user_data; // 将user_data转换为KafkaMember对象 + SeriesWork *series = series_of(task); // 获取与任务关联的SeriesWork对象 + size_t max; + + member->mutex.lock(); // 锁定成员对象的互斥锁 + if (member->client_deinit) // 如果客户端已去初始化 + { + member->mutex.unlock(); // 解锁互斥锁 + member->decref(); // 减少成员对象的引用计数 + return; // 直接返回 + } + + if (task->get_state() == WFT_STATE_SUCCESS) // 如果任务状态为成功 + { + member->cgroup_status = KAFKA_CGROUP_DONE; // 设置消费者组状态为完成 + member->cgroup = std::move(*(task->get_resp()->get_cgroup())); // 获取并保存消费者组信息 + + if (member->heartbeat_status == KAFKA_HEARTBEAT_UNINIT) // 如果心跳状态为未初始化 + { + __WFKafkaTask *kafka_task; // 创建一个新的Kafka任务 + KafkaBroker *coordinator = member->cgroup.get_coordinator(); // 获取协调者代理 + kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + coordinator->get_host(), + coordinator->get_port(), + member->ssl_ctx, "", 0, + kafka_heartbeat_callback); + kafka_task->user_data = member; // 设置任务的用户数据为成员对象 + kafka_task->get_req()->set_api_type(Kafka_Heartbeat); // 设置API类型为心跳 + kafka_task->get_req()->set_cgroup(member->cgroup); // 设置消费者组信息 + kafka_task->get_req()->set_broker(*coordinator); // 设置协调者代理 + series->push_back(kafka_task); // 将心跳任务添加到SeriesWork中 + + member->heartbeat_status = KAFKA_HEARTBEAT_DOING; // 设置心跳状态为进行中 + member->heartbeat_series = series; // 保存心跳任务的SeriesWork对象 + } + + max = member->cgroup_wait_cnt; // 获取等待计数 + char name[64]; // 创建一个名称缓冲区 + snprintf(name, 64, "%p.cgroup", member); // 格式化名称 + member->mutex.unlock(); // 解锁互斥锁 + + WFTaskFactory::signal_by_name(name, NULL, max); // 根据名称发送信号 + } + else // 如果任务状态不为成功 + { + kafka_rebalance_proc(member, series); // 调用重新平衡处理函数 + member->mutex.unlock(); // 解锁互斥锁 + } } +// Kafka重新平衡处理函数 void KafkaClientTask::kafka_rebalance_proc(KafkaMember *member, SeriesWork *series) { - KafkaBroker *coordinator = member->cgroup.get_coordinator(); - __WFKafkaTask *task; - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - coordinator->get_host(), - coordinator->get_port(), - member->ssl_ctx, "", 0, - kafka_rebalance_callback); - task->user_data = member; - task->get_req()->set_config(member->config); - task->get_req()->set_api_type(Kafka_FindCoordinator); - task->get_req()->set_cgroup(member->cgroup); - task->get_req()->set_meta_list(member->meta_list); - - member->cgroup_status = KAFKA_CGROUP_DOING; - member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; - member->cgroup_outdated = false; - - series->push_back(task); + KafkaBroker *coordinator = member->cgroup.get_coordinator(); // 获取消费者组的协调者代理 + __WFKafkaTask *task; + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, // 创建一个新的Kafka任务 + coordinator->get_host(), + coordinator->get_port(), + member->ssl_ctx, "", 0, + kafka_rebalance_callback); + task->user_data = member; // 设置任务的用户数据为成员对象 + task->get_req()->set_config(member->config); // 设置任务的配置 + task->get_req()->set_api_type(Kafka_FindCoordinator); // 设置API类型为查找协调者 + task->get_req()->set_cgroup(member->cgroup); // 设置消费者组信息 + task->get_req()->set_meta_list(member->meta_list); // 设置元数据列表 + + member->cgroup_status = KAFKA_CGROUP_DOING; // 设置消费者组状态为进行中 + 
member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; // 重置心跳状态为未初始化 + member->cgroup_outdated = false; // 重置消费者组信息过时标志为false + + series->push_back(task); // 将任务添加到SeriesWork中 } +// Kafka心跳回调函数 void KafkaClientTask::kafka_heartbeat_callback(__WFKafkaTask *task) { - KafkaMember *member = (KafkaMember *)task->user_data; - SeriesWork *series = series_of(task); - KafkaResponse *resp = task->get_resp(); - - member->mutex.lock(); - - if (member->client_deinit || member->heartbeat_series != series) - { - member->mutex.unlock(); - member->decref(); - return; - } - - if (resp->get_cgroup()->get_error() == 0) - { - member->heartbeat_status = KAFKA_HEARTBEAT_DONE; - WFTimerTask *timer_task; - timer_task = WFTaskFactory::create_timer_task(KAFKA_HEARTBEAT_INTERVAL, - kafka_timer_callback); - timer_task->user_data = member; - series->push_back(timer_task); - } - else - kafka_rebalance_proc(member, series); - - member->mutex.unlock(); + KafkaMember *member = (KafkaMember *)task->user_data; // 将user_data转换为KafkaMember对象 + SeriesWork *series = series_of(task); // 获取与任务关联的SeriesWork对象 + KafkaResponse *resp = task->get_resp(); // 获取任务的响应对象 + + member->mutex.lock(); // 锁定成员对象的互斥锁 + + if (member->client_deinit || member->heartbeat_series != series) // 如果客户端已去初始化或心跳SeriesWork不匹配 + { + member->mutex.unlock(); // 解锁互斥锁 + member->decref(); // 减少成员对象的引用计数 + return; // 直接返回 + } + + if (resp->get_cgroup()->get_error() == 0) // 如果心跳响应没有错误 + { + member->heartbeat_status = KAFKA_HEARTBEAT_DONE; // 设置心跳状态为完成 + WFTimerTask *timer_task; // 创建一个新的定时器任务 + timer_task = WFTaskFactory::create_timer_task(KAFKA_HEARTBEAT_INTERVAL, // 设置心跳间隔 + kafka_timer_callback); + timer_task->user_data = member; // 设置定时器任务的用户数据为成员对象 + series->push_back(timer_task); // 将定时器任务添加到SeriesWork中 + } + else + kafka_rebalance_proc(member, series); // 如果心跳响应有错误,调用重新平衡处理函数 + + member->mutex.unlock(); // 解锁互斥锁 } +// Kafka定时器回调函数 void KafkaClientTask::kafka_timer_callback(WFTimerTask *task) { - KafkaMember *member = (KafkaMember *)task->user_data; - SeriesWork *series = series_of(task); - - member->mutex.lock(); - if (member->client_deinit || member->heartbeat_series != series) - { - member->mutex.unlock(); - member->decref(); - return; - } - - member->heartbeat_status = KAFKA_HEARTBEAT_DOING; - - __WFKafkaTask *kafka_task; - KafkaBroker *coordinator = member->cgroup.get_coordinator(); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - coordinator->get_host(), - coordinator->get_port(), - member->ssl_ctx, "", 0, - kafka_heartbeat_callback); - - kafka_task->user_data = member; - kafka_task->get_req()->set_config(member->config); - kafka_task->get_req()->set_api_type(Kafka_Heartbeat); - kafka_task->get_req()->set_cgroup(member->cgroup); - kafka_task->get_req()->set_broker(*coordinator); - series->push_back(kafka_task); - - member->mutex.unlock(); + KafkaMember *member = (KafkaMember *)task->user_data; // 将user_data转换为KafkaMember对象 + SeriesWork *series = series_of(task); // 获取与任务关联的SeriesWork对象 + + member->mutex.lock(); // 锁定成员对象的互斥锁 + if (member->client_deinit || member->heartbeat_series != series) // 如果客户端已去初始化或心跳SeriesWork不匹配 + { + member->mutex.unlock(); // 解锁互斥锁 + member->decref(); // 减少成员对象的引用计数 + return; // 直接返回 + } + + member->heartbeat_status = KAFKA_HEARTBEAT_DOING; // 设置心跳状态为进行中 + + __WFKafkaTask *kafka_task; // 创建一个新的Kafka任务 + KafkaBroker *coordinator = member->cgroup.get_coordinator(); // 获取消费者组的协调者代理 + kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, // 设置传输类型 + coordinator->get_host(), // 
设置协调者代理的主机 + coordinator->get_port(), // 设置协调者代理的端口 + member->ssl_ctx, "", 0, // 设置SSL上下文 + kafka_heartbeat_callback); // 设置心跳回调函数 + kafka_task->user_data = member; // 设置任务的用户数据为成员对象 + kafka_task->get_req()->set_config(member->config); // 设置任务的配置 + kafka_task->get_req()->set_api_type(Kafka_Heartbeat); // 设置API类型为心跳 + kafka_task->get_req()->set_cgroup(member->cgroup); // 设置消费者组信息 + kafka_task->get_req()->set_broker(*coordinator); // 设置协调者代理 + series->push_back(kafka_task); // 将心跳任务添加到SeriesWork中 + + member->mutex.unlock(); // 解锁互斥锁 } -void KafkaClientTask::kafka_merge_meta_list(KafkaMetaList *dst, - KafkaMetaList *src) +// 合并元数据列表 +void KafkaClientTask::kafka_merge_meta_list(KafkaMetaList *dst, KafkaMetaList *src) { - src->rewind(); - KafkaMeta *src_meta; - while ((src_meta = src->get_next()) != NULL) - { - dst->rewind(); - - KafkaMeta *dst_meta; - while ((dst_meta = dst->get_next()) != NULL) - { - if (strcmp(dst_meta->get_topic(), src_meta->get_topic()) == 0) - { - dst->del_cur(); - delete dst_meta; - break; - } - } - - dst->add_item(*src_meta); - } + src->rewind(); // 重置源元数据列表 + KafkaMeta *src_meta; + while ((src_meta = src->get_next()) != NULL) // 遍历源元数据列表 + { + dst->rewind(); // 重置目标元数据列表 + + KafkaMeta *dst_meta; + while ((dst_meta = dst->get_next()) != NULL) // 遍历目标元数据列表 + { + if (strcmp(dst_meta->get_topic(), src_meta->get_topic()) == 0) // 如果找到相同主题的元数据 + { + dst->del_cur(); // 删除当前元数据 + delete dst_meta; // 释放内存 + break; // 跳出循环 + } + } + + dst->add_item(*src_meta); // 将源元数据添加到目标列表 + } } +// 合并代理列表 void KafkaClientTask::kafka_merge_broker_list(const std::string& scheme, - std::vector *hosts, - KafkaBrokerMap *dst, - KafkaBrokerList *src) + std::vector *hosts, + KafkaBrokerMap *dst, + KafkaBrokerList *src) { - hosts->clear(); - src->rewind(); - KafkaBroker *src_broker; - while ((src_broker = src->get_next()) != NULL) - { - std::string host = scheme + src_broker->get_host() + ":" + - std::to_string(src_broker->get_port()); - hosts->emplace_back(std::move(host)); - - if (!dst->find_item(src_broker->get_node_id())) - dst->add_item(*src_broker, src_broker->get_node_id()); - } + hosts->clear(); // 清空主机列表 + src->rewind(); // 重置源代理列表 + KafkaBroker *src_broker; + while ((src_broker = src->get_next()) != NULL) // 遍历源代理列表 + { + std::string host = scheme + src_broker->get_host() + ":" + std::to_string(src_broker->get_port()); // 构建主机字符串 + hosts->emplace_back(std::move(host)); // 添加到主机列表 + + if (!dst->find_item(src_broker->get_node_id())) // 如果目标代理映射中没有该节点ID + dst->add_item(*src_broker, src_broker->get_node_id()); // 添加代理到目标映射 + } } +// Kafka元数据回调函数 void KafkaClientTask::kafka_meta_callback(__WFKafkaTask *task) { - KafkaClientTask *t = (KafkaClientTask *)task->user_data; - void *msg = NULL; - size_t max; - - t->member->mutex.lock(); - t->state = task->get_state(); - t->error = task->get_error(); - t->kafka_error = *static_cast(task)->get_mutable_ctx(); - if (t->state == WFT_STATE_SUCCESS) - { - kafka_merge_meta_list(&t->member->meta_list, - task->get_resp()->get_meta_list()); - - t->meta_list.rewind(); - KafkaMeta *meta; - while ((meta = t->meta_list.get_next()) != NULL) - (t->member->meta_status)[meta->get_topic()] = true; - - kafka_merge_broker_list(t->member->scheme, - &t->member->broker_hosts, - &t->member->broker_map, - task->get_resp()->get_broker_list()); - } - else - { - t->meta_list.rewind(); - KafkaMeta *meta; - while ((meta = t->meta_list.get_next()) != NULL) - (t->member->meta_status)[meta->get_topic()] = false; - - t->finish = true; - msg = t; - } - - t->member->meta_doing = false; - 
max = t->member->meta_wait_cnt; - char name[64]; - snprintf(name, 64, "%p.meta", t->member); - t->member->mutex.unlock(); - - WFTaskFactory::signal_by_name(name, msg, max); + KafkaClientTask *t = (KafkaClientTask *)task->user_data; // 将user_data转换为KafkaClientTask对象 + void *msg = NULL; // 消息指针 + size_t max; + + t->member->mutex.lock(); // 锁定成员对象的互斥锁 + t->state = task->get_state(); // 获取任务状态 + t->error = task->get_error(); // 获取错误码 + t->kafka_error = *static_cast(task)->get_mutable_ctx(); // 获取Kafka错误码 + if (t->state == WFT_STATE_SUCCESS) // 如果任务成功 + { + kafka_merge_meta_list(&t->member->meta_list, task->get_resp()->get_meta_list()); // 合并元数据列表 + + t->meta_list.rewind(); // 重置元数据列表 + KafkaMeta *meta; + while ((meta = t->meta_list.get_next()) != NULL) // 遍历元数据列表 + (t->member->meta_status)[meta->get_topic()] = true; // 更新元数据状态 + + kafka_merge_broker_list(t->member->scheme, &t->member->broker_hosts, &t->member->broker_map, task->get_resp()->get_broker_list()); // 合并代理列表 + } + else // 如果任务失败 + { + t->meta_list.rewind(); // 重置元数据列表 + KafkaMeta *meta; + while ((meta = t->meta_list.get_next()) != NULL) // 遍历元数据列表 + (t->member->meta_status)[meta->get_topic()] = false; // 更新元数据状态为失败 + + t->finish = true; // 设置任务完成标志 + msg = t; // 设置消息指针为当前任务 + } + + t->member->meta_doing = false; // 设置元数据操作标志为false + max = t->member->meta_wait_cnt; // 获取等待计数 + char name[64]; // 创建名称缓冲区 + snprintf(name, 64, "%p.meta", t->member); // 格式化名称 + t->member->mutex.unlock(); // 解锁互斥锁 + + WFTaskFactory::signal_by_name(name, msg, max); // 根据名称发送信号 } +// Kafka消费者组回调函数 void KafkaClientTask::kafka_cgroup_callback(__WFKafkaTask *task) { - KafkaClientTask *t = (KafkaClientTask *)task->user_data; - KafkaMember *member = t->member; - SeriesWork *heartbeat_series = NULL; - void *msg = NULL; - size_t max; - - member->mutex.lock(); - t->state = task->get_state(); - t->error = task->get_error(); - t->kafka_error = *static_cast(task)->get_mutable_ctx(); - - if (t->state == WFT_STATE_SUCCESS) - { - member->cgroup = std::move(*(task->get_resp()->get_cgroup())); - - kafka_merge_meta_list(&member->meta_list, - task->get_resp()->get_meta_list()); - - t->meta_list.rewind(); - KafkaMeta *meta; - while ((meta = t->meta_list.get_next()) != NULL) - (member->meta_status)[meta->get_topic()] = true; - - kafka_merge_broker_list(member->scheme, - &member->broker_hosts, - &member->broker_map, - task->get_resp()->get_broker_list()); - - member->cgroup_status = KAFKA_CGROUP_DONE; - - if (member->heartbeat_status == KAFKA_HEARTBEAT_UNINIT) - { - __WFKafkaTask *kafka_task; - KafkaBroker *coordinator = member->cgroup.get_coordinator(); - kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - coordinator->get_host(), - coordinator->get_port(), - member->ssl_ctx, "", 0, - kafka_heartbeat_callback); - kafka_task->user_data = member; - member->incref(); - - kafka_task->get_req()->set_config(member->config); - kafka_task->get_req()->set_api_type(Kafka_Heartbeat); - kafka_task->get_req()->set_cgroup(member->cgroup); - kafka_task->get_req()->set_broker(*coordinator); - - heartbeat_series = Workflow::create_series_work(kafka_task, nullptr); - member->heartbeat_status = KAFKA_HEARTBEAT_DOING; - member->heartbeat_series = heartbeat_series; - } - } - else - { - member->cgroup_status = KAFKA_CGROUP_UNINIT; - member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; - member->heartbeat_series = NULL; - t->finish = true; - msg = t; - } - - max = member->cgroup_wait_cnt; - char name[64]; - snprintf(name, 64, "%p.cgroup", member); - member->mutex.unlock(); - 
- WFTaskFactory::signal_by_name(name, msg, max); - - if (heartbeat_series) - heartbeat_series->start(); + KafkaClientTask *t = (KafkaClientTask *)task->user_data; // 将user_data转换为KafkaClientTask对象 + KafkaMember *member = t->member; // 获取Kafka成员对象 + SeriesWork *heartbeat_series = NULL; // 心跳SeriesWork对象 + void *msg = NULL; // 消息指针 + size_t max; + + member->mutex.lock(); // 锁定成员对象的互斥锁 + t->state = task->get_state(); // 获取任务状态 + t->error = task->get_error(); // 获取错误码 + t->kafka_error = *static_cast(task)->get_mutable_ctx(); // 获取Kafka错误码 + + if (t->state == WFT_STATE_SUCCESS) // 如果任务成功 + { + member->cgroup = std::move(*(task->get_resp()->get_cgroup())); // 获取并保存消费者组信息 + + kafka_merge_meta_list(&member->meta_list, task->get_resp()->get_meta_list()); // 合并元数据列表 + + t->meta_list.rewind(); // 重置元数据列表 + KafkaMeta *meta; + while ((meta = t->meta_list.get_next()) != NULL) // 遍历元数据列表 + (member->meta_status)[meta->get_topic()] = true; // 更新元数据状态 + + kafka_merge_broker_list(member->scheme, &member->broker_hosts, &member->broker_map, task->get_resp()->get_broker_list()); // 合并代理列表 + + member->cgroup_status = KAFKA_CGROUP_DONE; // 设置消费者组状态为完成 + + if (member->heartbeat_status == KAFKA_HEARTBEAT_UNINIT) // 如果心跳状态为未初始化 + { + __WFKafkaTask *kafka_task; // 创建一个新的Kafka任务 + KafkaBroker *coordinator = member->cgroup.get_coordinator(); // 获取协调者代理 + kafka_task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, // 设置传输类型 + coordinator->get_host(), // 设置协调者代理的主机 + coordinator->get_port(), // 设置协调者代理的端口 + member->ssl_ctx, "", 0, // 设置SSL上下文 + kafka_heartbeat_callback); // 设置心跳回调函数 + kafka_task->user_data = member; // 设置任务的用户数据为成员对象 + member->incref(); // 增加成员对象的引用计数 + + kafka_task->get_req()->set_config(member->config); // 设置任务的配置 + kafka_task->get_req()->set_api_type(Kafka_Heartbeat); // 设置API类型为心跳 + kafka_task->get_req()->set_cgroup(member->cgroup); // 设置消费者组信息 + kafka_task->get_req()->set_broker(*coordinator); // 设置协调者代理 + + heartbeat_series = Workflow::create_series_work(kafka_task, nullptr); // 创建心跳SeriesWork + member->heartbeat_status = KAFKA_HEARTBEAT_DOING; // 设置心跳状态为进行中 + member->heartbeat_series = heartbeat_series; // 保存心跳SeriesWork对象 + } + } + else // 如果任务失败 + { + member->cgroup_status = KAFKA_CGROUP_UNINIT; // 重置消费者组状态为未初始化 + member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; // 重置心跳状态为未初始化 + member->heartbeat_series = NULL; // 重置心跳SeriesWork对象 + t->finish = true; // 设置任务完成标志 + msg = t; // 设置消息指针为当前任务 + } + + max = member->cgroup_wait_cnt; // 获取等待计数 + char name[64]; // 创建名称缓冲区 + snprintf(name, 64, "%p.cgroup", member); // 格式化名称 + member->mutex.unlock(); // 解锁互斥锁 + + WFTaskFactory::signal_by_name(name, msg, max); // 根据名称发送信号 + + if (heartbeat_series) // 如果心跳SeriesWork对象不为空 + heartbeat_series->start(); // 启动心跳SeriesWork } +// Kafka并行工作回调函数 void KafkaClientTask::kafka_parallel_callback(const ParallelWork *pwork) { - KafkaClientTask *t = (KafkaClientTask *)pwork->get_context(); - t->finish = true; - t->state = WFT_STATE_TASK_ERROR; - t->error = 0; - - std::pair *state_error; - bool flag = false; - int16_t state = WFT_STATE_SUCCESS; - int16_t error = 0; - int kafka_error = 0; - for (size_t i = 0; i < pwork->size(); i++) - { - state_error = (std::pair *)pwork->series_at(i)->get_context(); - if ((state_error->first >> 16) != WFT_STATE_SUCCESS) - { - if (!flag) - { - flag = true; - t->member->mutex.lock(); - t->set_meta_status(false); - t->member->mutex.unlock(); - } - state = state_error->first >> 16; - error = state_error->first & 0xffff; - kafka_error = state_error->second; - } - else - { - 
t->state = WFT_STATE_SUCCESS; - } - - delete state_error; - } - - if (t->state != WFT_STATE_SUCCESS) - { - t->state = state; - t->error = error; - t->kafka_error = kafka_error; - } + KafkaClientTask *t = (KafkaClientTask *)pwork->get_context(); // 获取并行工作的用户上下文 + t->finish = true; // 设置任务完成标志为true + t->state = WFT_STATE_TASK_ERROR; // 设置任务状态为任务错误 + t->error = 0; // 设置错误码为0 + + std::pair *state_error; // 状态错误对 + bool flag = false; // 标记是否已处理错误 + int16_t state = WFT_STATE_SUCCESS; // 状态 + int16_t error = 0; // 错误码 + int kafka_error = 0; // Kafka错误码 + for (size_t i = 0; i < pwork->size(); i++) // 遍历并行工作的所有任务 + { + state_error = (std::pair *)pwork->series_at(i)->get_context(); // 获取任务的状态错误对 + if ((state_error->first >> 16) != WFT_STATE_SUCCESS) // 如果任务状态不为成功 + { + if (!flag) // 如果还未处理错误 + { + flag = true; // 设置已处理错误标志为true + t->member->mutex.lock(); // 锁定成员对象的互斥锁 + t->set_meta_status(false); // 设置元数据状态为false + t->member->mutex.unlock(); // 解锁互斥锁 + } + state = state_error->first >> 16; // 获取状态 + error = state_error->first & 0xffff; // 获取错误码 + kafka_error = state_error->second; // 获取Kafka错误码 + } + else + { + t->state = WFT_STATE_SUCCESS; // 如果任务成功,设置任务状态为成功 + } + + delete state_error; // 释放状态错误对对象 + } + + if (t->state != WFT_STATE_SUCCESS) // 如果任务状态不为成功 + { + t->state = state; // 设置状态 + t->error = error; // 设置错误码 + t->kafka_error = kafka_error; // 设置Kafka错误码 + } } +// Kafka处理toppar偏移量 void KafkaClientTask::kafka_process_toppar_offset(KafkaToppar *task_toppar) { - KafkaToppar *toppar; - - struct list_head *pos; - list_for_each(pos, this->member->cgroup.get_assigned_toppar_list()) - { - toppar = this->member->cgroup.get_assigned_toppar_by_pos(pos); - if (strcmp(toppar->get_topic(), task_toppar->get_topic()) == 0 && - toppar->get_partition() == task_toppar->get_partition()) - { - long long offset = task_toppar->get_offset() - 1; - KafkaRecord *last_record = task_toppar->get_tail_record(); - if (last_record) - offset = last_record->get_offset(); - toppar->set_offset(offset + 1); - toppar->set_low_watermark(task_toppar->get_low_watermark()); - toppar->set_high_watermark(task_toppar->get_high_watermark()); - } - } + KafkaToppar *toppar; // toppar对象 + struct list_head *pos; // 链表节点 + list_for_each(pos, this->member->cgroup.get_assigned_toppar_list()) // 遍历分配的toppar链表 + { + toppar = this->member->cgroup.get_assigned_toppar_by_pos(pos); // 获取toppar对象 + if (strcmp(toppar->get_topic(), task_toppar->get_topic()) == 0 && // 如果主题相同 + toppar->get_partition() == task_toppar->get_partition()) // 并且分区相同 + { + long long offset = task_toppar->get_offset() - 1; // 计算偏移量 + KafkaRecord *last_record = task_toppar->get_tail_record(); // 获取最后一个记录 + if (last_record) // 如果有最后一个记录 + offset = last_record->get_offset(); // 更新偏移量 + toppar->set_offset(offset + 1); // 设置toppar的偏移量 + toppar->set_low_watermark(task_toppar->get_low_watermark()); // 设置低水位 + toppar->set_high_watermark(task_toppar->get_high_watermark()); // 设置高水位 + } + } } +// Kafka移动任务回调函数 void KafkaClientTask::kafka_move_task_callback(__WFKafkaTask *task) { - auto *state_error = new std::pair; - int16_t state = task->get_state(); - int16_t error = task->get_error(); - - /* 'state' is always positive. 
*/ - state_error->first = (state << 16) | error; - state_error->second = *static_cast(task)->get_mutable_ctx(); - series_of(task)->set_context(state_error); - - KafkaTopparList *toppar_list = task->get_resp()->get_toppar_list(); - - if (task->get_state() == WFT_STATE_SUCCESS && - task->get_resp()->get_api_type() == Kafka_Fetch) - { - toppar_list->rewind(); - KafkaToppar *task_toppar; - - while ((task_toppar = toppar_list->get_next()) != NULL) - kafka_process_toppar_offset(task_toppar); - } - - if (task->get_state() == WFT_STATE_SUCCESS) - { - long idx = (long)(task->user_data); - this->result.set_resp(std::move(*task->get_resp()), idx); - } + auto *state_error = new std::pair; // 创建状态错误对对象 + int16_t state = task->get_state(); // 获取任务状态 + int16_t error = task->get_error(); // 获取错误码 + + /* 'state' is always positive. */ + state_error->first = (state << 16) | error; // 设置状态错误对的第一值为状态和错误码的组合 + state_error->second = *static_cast(task)->get_mutable_ctx(); // 设置状态错误对的第二值为Kafka错误码 + series_of(task)->set_context(state_error); // 设置SeriesWork的上下文为状态错误对 + + KafkaTopparList *toppar_list = task->get_resp()->get_toppar_list(); // 获取toppar列表 + + if (task->get_state() == WFT_STATE_SUCCESS && // 如果任务成功 + task->get_resp()->get_api_type() == Kafka_Fetch) // 并且API类型为获取 + { + toppar_list->rewind(); // 重置toppar列表 + KafkaToppar *task_toppar; // task_toppar对象 + + while ((task_toppar = toppar_list->get_next()) != NULL) // 遍历toppar列表 + kafka_process_toppar_offset(task_toppar); // 处理toppar偏移量 + } + + if (task->get_state() == WFT_STATE_SUCCESS) // 如果任务成功 + { + long idx = (long)(task->user_data); // 获取用户数据 + this->result.set_resp(std::move(*task->get_resp()), idx); // 设置响应结果 + } } +// 生成信息 void KafkaClientTask::generate_info() { - if (this->info_generated) - return; - - if (this->config.get_sasl_mech()) - { - const char *username = this->config.get_sasl_username(); - const char *password = this->config.get_sasl_password(); - - this->userinfo.clear(); - if (username) - this->userinfo += StringUtil::url_encode_component(username); - this->userinfo += ":"; - if (password) - this->userinfo += StringUtil::url_encode_component(password); - this->userinfo += ":"; - this->userinfo += this->config.get_sasl_mech(); - this->userinfo += ":"; - this->userinfo += std::to_string((intptr_t)this->member); - } - else - { - char buf[64]; - snprintf(buf, 64, "user:pass:sasl:%p", this->member); - this->userinfo = buf; - } - - const char *hostport = this->url.c_str() + this->member->scheme.size(); - this->url = this->member->scheme + this->userinfo + "@" + hostport; - this->info_generated = true; + if (this->info_generated) // 如果信息已生成 + return; // 直接返回 + + if (this->config.get_sasl_mech()) // 如果配置了SASL机制 + { + const char *username = this->config.get_sasl_username(); // 获取用户名 + const char *password = this->config.get_sasl_password(); // 获取密码 + + this->userinfo.clear(); // 清空用户信息 + if (username) // 如果有用户名 + this->userinfo += StringUtil::url_encode_component(username); // 添加用户名 + this->userinfo += ":"; // 添加冒号分隔符 + if (password) // 如果有密码 + this->userinfo += StringUtil::url_encode_component(password); // 添加密码 + this->userinfo += ":"; // 添加冒号分隔符 + this->userinfo += this->config.get_sasl_mech(); // 添加SASL机制 + this->userinfo += ":"; // 添加冒号分隔符 + this->userinfo += std::to_string((intptr_t)this->member); // 添加成员指针 + } + else // 如果没有配置SASL机制 + { + char buf[64]; // 创建缓冲区 + snprintf(buf, 64, "user:pass:sasl:%p", this->member); // 格式化字符串 + this->userinfo = buf; // 设置用户信息 + } + + const char *hostport = this->url.c_str() + this->member->scheme.size(); // 
获取主机和端口 + this->url = this->member->scheme + this->userinfo + "@" + hostport; // 设置URL + this->info_generated = true; // 设置信息已生成标志为true } +// 解析查询字符串 void KafkaClientTask::parse_query() { - auto query_kv = URIParser::split_query_strict(this->query); - int api_type = this->api_type; - for (const auto &kv : query_kv) - { - if (strcasecmp(kv.first.c_str(), "api") == 0 && - api_type == Kafka_Unknown) - { - for (auto& v : kv.second) - { - if (strcasecmp(v.c_str(), "fetch") == 0) - this->api_type = Kafka_Fetch; - else if (strcasecmp(v.c_str(), "produce") == 0) - this->api_type = Kafka_Produce; - else if (strcasecmp(v.c_str(), "commit") == 0) - this->api_type = Kafka_OffsetCommit; - else if (strcasecmp(v.c_str(), "meta") == 0) - this->api_type = Kafka_Metadata; - else if (strcasecmp(v.c_str(), "leavegroup") == 0) - this->api_type = Kafka_LeaveGroup; - else if (strcasecmp(v.c_str(), "listoffsets") == 0) - this->api_type = Kafka_ListOffsets; - } - } - else if (strcasecmp(kv.first.c_str(), "topic") == 0) - { - for (auto& v : kv.second) - this->add_topic(v); - } - } + auto query_kv = URIParser::split_query_strict(this->query); // 使用URIParser函数分割查询字符串 + int api_type = this->api_type; // 获取当前API类型 + + for (const auto &kv : query_kv) // 遍历查询参数键值对 + { + if (strcasecmp(kv.first.c_str(), "api") == 0 && // 如果键是"api"且API类型为未知 + api_type == Kafka_Unknown) + { + for (auto& v : kv.second) // 遍历API类型值 + { + if (strcasecmp(v.c_str(), "fetch") == 0) + this->api_type = Kafka_Fetch; // 设置API类型为获取 + else if (strcasecmp(v.c_str(), "produce") == 0) + this->api_type = Kafka_Produce; // 设置API类型为生产 + else if (strcasecmp(v.c_str(), "commit") == 0) + this->api_type = Kafka_OffsetCommit; // 设置API类型为提交偏移量 + else if (strcasecmp(v.c_str(), "meta") == 0) + this->api_type = Kafka_Metadata; // 设置API类型为元数据 + else if (strcasecmp(v.c_str(), "leavegroup") == 0) + this->api_type = Kafka_LeaveGroup; // 设置API类型为离开组 + else if (strcasecmp(v.c_str(), "listoffsets") == 0) + this->api_type = Kafka_ListOffsets; // 设置API类型为列出偏移量 + } + } + else if (strcasecmp(kv.first.c_str(), "topic") == 0) // 如果键是"topic" + { + for (auto& v : kv.second) // 遍历主题值 + this->add_topic(v); // 添加主题 + } + } } +// 获取元数据状态 bool KafkaClientTask::get_meta_status(KafkaMetaList **uninit_meta_list) { - this->meta_list.rewind(); - KafkaMeta *meta; - std::set unique; - bool status = true; - - while ((meta = this->meta_list.get_next()) != NULL) - { - if (!unique.insert(meta->get_topic()).second) - continue; - - if (!this->member->meta_status[meta->get_topic()]) - { - if (status) - { - *uninit_meta_list = new KafkaMetaList; - status = false; - } - - (*uninit_meta_list)->add_item(*meta); - } - } - - return status; + this->meta_list.rewind(); // 重置元数据列表 + KafkaMeta *meta; // 指向元数据的指针 + std::set unique; // 用于存储唯一主题的集合 + bool status = true; // 状态标志,默认为true + + while ((meta = this->meta_list.get_next()) != NULL) // 遍历元数据列表 + { + if (!unique.insert(meta->get_topic()).second) // 如果主题已存在集合中,跳过 + continue; + + if (!this->member->meta_status[meta->get_topic()]) // 如果元数据状态为false + { + if (status) // 如果之前所有元数据状态为true + { + *uninit_meta_list = new KafkaMetaList; // 创建新的元数据列表 + status = false; // 更新状态为false + } + + (*uninit_meta_list)->add_item(*meta); // 将未初始化的元数据添加到列表 + } + } + + return status; // 返回状态,true表示所有元数据已初始化,false表示有未初始化的元数据 } +// 设置元数据状态 void KafkaClientTask::set_meta_status(bool status) { - this->member->meta_list.rewind(); - KafkaMeta *meta; - while ((meta = this->member->meta_list.get_next()) != NULL) - this->member->meta_status[meta->get_topic()] = false; + 
this->member->meta_list.rewind(); // 重置成员对象的元数据列表 + KafkaMeta *meta; // 指向元数据的指针 + while ((meta = this->member->meta_list.get_next()) != NULL) // 遍历元数据列表 + this->member->meta_status[meta->get_topic()] = false; // 设置所有主题的元数据状态为false } +// 比较两个任务的主题集合是否相等 bool KafkaClientTask::compare_topics(KafkaClientTask *task) { - auto first1 = topic_set.cbegin(), last1 = topic_set.cend(); - auto first2 = task->topic_set.cbegin(), last2 = task->topic_set.cend(); - int cmp; - - // check whether task->topic_set is a subset of topic_set - while (first1 != last1 && first2 != last2) - { - cmp = first1->compare(*first2); - if (cmp == 0) - { - ++first1; - ++first2; - } - else if (cmp < 0) - ++first1; - else - return false; - } - - return first2 == last2; + auto first1 = topic_set.cbegin(), last1 = topic_set.cend(); // 本任务的主题迭代器 + auto first2 = task->topic_set.cbegin(), last2 = task->topic_set.cend(); // 另一个任务的主题迭代器 + int cmp; + + // 检查task->topic_set是否是topic_set的子集 + while (first1 != last1 && first2 != last2) + { + cmp = first1->compare(*first2); // 比较两个主题 + if (cmp == 0) // 如果主题相同 + { + ++first1; // 移动本任务的主题迭代器 + ++first2; // 移动另一个任务的主题迭代器 + } + else if (cmp < 0) // 如果本任务的主题小于另一个任务的主题 + ++first1; // 移动本任务的主题迭代器 + else // 如果本任务的主题大于另一个任务的主题 + return false; // 返回false,不是子集 + } + + return first2 == last2; // 如果另一个任务的所有主题都已比较过,返回true,表示是子集 } +// 检查消费者组状态 bool KafkaClientTask::check_cgroup() { - KafkaMember *member = this->member; - - if (member->cgroup_outdated && member->cgroup_status != KAFKA_CGROUP_DOING) - { - member->cgroup_outdated = false; - member->cgroup_status = KAFKA_CGROUP_UNINIT; - member->heartbeat_series = NULL; - member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; - } - - if (member->cgroup_status == KAFKA_CGROUP_DOING) - { - WFConditional *cond; - char name[64]; - snprintf(name, 64, "%p.cgroup", this->member); - this->wait_cgroup = true; - cond = WFTaskFactory::create_conditional(name, this, &this->msg); - series_of(this)->push_front(cond); - member->cgroup_wait_cnt++; - return false; - } - - if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) && - (member->cgroup_status == KAFKA_CGROUP_UNINIT)) - { - __WFKafkaTask *task; - - task = __WFKafkaTaskFactory::create_kafka_task(this->url, member->ssl_ctx, - this->retry_max, - kafka_cgroup_callback); - task->user_data = this; - task->get_req()->set_config(this->config); - task->get_req()->set_api_type(Kafka_FindCoordinator); - task->get_req()->set_cgroup(member->cgroup); - task->get_req()->set_meta_list(member->meta_list); - series_of(this)->push_front(this); - series_of(this)->push_front(task); - member->cgroup_status = KAFKA_CGROUP_DOING; - member->cgroup_wait_cnt = 0; - return false; - } - - return true; + KafkaMember *member = this->member; // 获取成员对象 + + if (member->cgroup_outdated && member->cgroup_status != KAFKA_CGROUP_DOING) + { + member->cgroup_outdated = false; // 重置消费者组信息过时标志 + member->cgroup_status = KAFKA_CGROUP_UNINIT; // 重置消费者组状态为未初始化 + member->heartbeat_series = NULL; // 重置心跳SeriesWork对象 + member->heartbeat_status = KAFKA_HEARTBEAT_UNINIT; // 重置心跳状态为未初始化 + } + + if (member->cgroup_status == KAFKA_CGROUP_DOING) // 如果消费者组状态为进行中 + { + WFConditional *cond; // 创建条件对象 + char name[64]; // 创建名称缓冲区 + snprintf(name, 64, "%p.cgroup", this->member); // 格式化名称 + this->wait_cgroup = true; // 设置等待消费者组标志为true + cond = WFTaskFactory::create_conditional(name, this, &this->msg); // 创建条件对象 + series_of(this)->push_front(cond); // 将条件对象添加到SeriesWork的前面 + member->cgroup_wait_cnt++; // 增加消费者组等待计数 + return false; // 返回false,表示需要等待消费者组操作完成 + 
} + + if ((this->api_type == Kafka_Fetch || this->api_type == Kafka_OffsetCommit) && // 如果API类型为获取或提交偏移量 + (member->cgroup_status == KAFKA_CGROUP_UNINIT)) // 并且消费者组状态为未初始化 + { + __WFKafkaTask *task; // 创建一个新的Kafka任务 + + task = __WFKafkaTaskFactory::create_kafka_task(this->url, member->ssl_ctx, // 设置URL和SSL上下文 + this->retry_max, kafka_cgroup_callback); // 设置重试次数和回调函数 + task->user_data = this; // 设置任务的用户数据为当前任务 + task->get_req()->set_config(this->config); // 设置任务的配置 + task->get_req()->set_api_type(Kafka_FindCoordinator); // 设置API类型为查找协调者 + task->get_req()->set_cgroup(member->cgroup); // 设置消费者组信息 + task->get_req()->set_meta_list(member->meta_list); // 设置元数据列表 + series_of(this)->push_front(this); // 将当前任务添加到SeriesWork的前面 + series_of(this)->push_front(task); // 将查找协调者任务添加到SeriesWork的前面 + member->cgroup_status = KAFKA_CGROUP_DOING; // 设置消费者组状态为进行中 + member->cgroup_wait_cnt = 0; // 重置消费者组等待计数 + return false; // 返回false,表示需要执行查找协调者操作 + } + + return true; // 返回true,表示消费者组状态正常,不需要等待或查找协调者 } +// 检查元数据状态 bool KafkaClientTask::check_meta() { - KafkaMember *member = this->member; - KafkaMetaList *uninit_meta_list; - - if (this->get_meta_status(&uninit_meta_list)) - return true; - - if (member->meta_doing) - { - WFConditional *cond; - char name[64]; - snprintf(name, 64, "%p.meta", this->member); - this->wait_cgroup = false; - cond = WFTaskFactory::create_conditional(name, this, &this->msg); - series_of(this)->push_front(cond); - member->meta_wait_cnt++; - } - else - { - __WFKafkaTask *task; - - task = __WFKafkaTaskFactory::create_kafka_task(this->url, member->ssl_ctx, - this->retry_max, - kafka_meta_callback); - task->user_data = this; - task->get_req()->set_config(this->config); - task->get_req()->set_api_type(Kafka_Metadata); - task->get_req()->set_meta_list(*uninit_meta_list); - series_of(this)->push_front(this); - series_of(this)->push_front(task); - member->meta_wait_cnt = 0; - member->meta_doing = true; - } - - delete uninit_meta_list; - return false; + KafkaMember *member = this->member; // 获取成员对象 + KafkaMetaList *uninit_meta_list; // 指向未初始化元数据列表的指针 + + // 检查元数据状态,如果所有元数据都已初始化,则返回true + if (this->get_meta_status(&uninit_meta_list)) + return true; + + // 如果元数据操作正在进行中,则等待 + if (member->meta_doing) + { + WFConditional *cond; // 创建条件对象 + char name[64]; // 创建名称缓冲区 + snprintf(name, 64, "%p.meta", this->member); // 格式化名称 + this->wait_cgroup = false; // 设置等待消费者组标志为false + cond = WFTaskFactory::create_conditional(name, this, &this->msg); // 创建条件对象 + series_of(this)->push_front(cond); // 将条件对象添加到SeriesWork的前面 + member->meta_wait_cnt++; // 增加元数据等待计数 + } + else // 如果元数据操作尚未开始 + { + __WFKafkaTask *task; // 创建一个新的Kafka任务 + + // 创建元数据任务 + task = __WFKafkaTaskFactory::create_kafka_task(this->url, member->ssl_ctx, + this->retry_max, kafka_meta_callback); + task->user_data = this; // 设置任务的用户数据为当前任务 + task->get_req()->set_config(this->config); // 设置任务的配置 + task->get_req()->set_api_type(Kafka_Metadata); // 设置API类型为元数据 + task->get_req()->set_meta_list(*uninit_meta_list); // 设置未初始化的元数据列表 + series_of(this)->push_front(this); // 将当前任务添加到SeriesWork的前面 + series_of(this)->push_front(task); // 将元数据任务添加到SeriesWork的前面 + member->meta_wait_cnt = 0; // 重置元数据等待计数 + member->meta_doing = true; // 设置元数据操作标志为true + } + + delete uninit_meta_list; // 释放未初始化元数据列表对象 + return false; // 返回false,表示需要等待元数据操作完成或开始新的元数据操作 } +// 分派任务,已锁定成员对象的互斥锁 int KafkaClientTask::dispatch_locked() { - KafkaMember *member = this->member; - KafkaBroker *coordinator; - __WFKafkaTask *task; - ParallelWork *parallel; - SeriesWork *series; - - if 
(this->check_cgroup() == false) - return member->cgroup_wait_cnt > 0; - - if (this->check_meta() == false) - return member->meta_wait_cnt > 0; - - if (arrange_toppar(this->api_type) < 0) - { - this->state = WFT_STATE_TASK_ERROR; - this->error = WFT_ERR_KAFKA_ARRANGE_FAILED; - this->finish = true; - return 0; - } - - if (this->member->cgroup_outdated) - { - series_of(this)->push_front(this); - return 0; - } - - switch(this->api_type) - { - case Kafka_Produce: - if (this->toppar_list_map.size() == 0) - { - this->state = WFT_STATE_TASK_ERROR; - this->error = WFT_ERR_KAFKA_PRODUCE_FAILED; - this->finish = true; - break; - } - - parallel = Workflow::create_parallel_work(kafka_parallel_callback); - this->result.create(this->toppar_list_map.size()); - parallel->set_context(this); - for (auto &v : this->toppar_list_map) - { - auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, - std::placeholders::_1); - KafkaBroker *broker = get_broker(v.first); - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - broker->get_host(), - broker->get_port(), - member->ssl_ctx, - this->get_userinfo(), - this->retry_max, - std::move(cb)); - task->get_req()->set_config(this->config); - task->get_req()->set_toppar_list(v.second); - task->get_req()->set_broker(*broker); - task->get_req()->set_api_type(Kafka_Produce); - task->user_data = (void *)parallel->size(); - series = Workflow::create_series_work(task, nullptr); - parallel->add_series(series); - } - series_of(this)->push_front(this); - series_of(this)->push_front(parallel); - break; - - case Kafka_Fetch: - if (this->toppar_list_map.size() == 0) - { - this->state = WFT_STATE_TASK_ERROR; - this->error = WFT_ERR_KAFKA_FETCH_FAILED; - this->finish = true; - break; - } - - parallel = Workflow::create_parallel_work(kafka_parallel_callback); - this->result.create(this->toppar_list_map.size()); - parallel->set_context(this); - for (auto &v : this->toppar_list_map) - { - auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, - std::placeholders::_1); - KafkaBroker *broker = get_broker(v.first); - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - broker->get_host(), - broker->get_port(), - member->ssl_ctx, - this->get_userinfo(), - this->retry_max, - std::move(cb)); - - task->get_req()->set_config(this->config); - task->get_req()->set_toppar_list(v.second); - task->get_req()->set_broker(*broker); - task->get_req()->set_api_type(Kafka_Fetch); - task->user_data = (void *)parallel->size(); - series = Workflow::create_series_work(task, nullptr); - parallel->add_series(series); - } - - series_of(this)->push_front(this); - series_of(this)->push_front(parallel); - break; - - case Kafka_Metadata: - this->finish = true; - break; - - case Kafka_OffsetCommit: - if (!member->cgroup.get_group()) - { - this->state = WFT_STATE_TASK_ERROR; - this->error = WFT_ERR_KAFKA_COMMIT_FAILED; - this->finish = true; - break; - } - - this->result.create(1); - coordinator = member->cgroup.get_coordinator(); - task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, - coordinator->get_host(), - coordinator->get_port(), - member->ssl_ctx, - this->get_userinfo(), - this->retry_max, - kafka_offsetcommit_callback); - task->user_data = this; - task->get_req()->set_config(this->config); - task->get_req()->set_cgroup(member->cgroup); - task->get_req()->set_broker(*coordinator); - task->get_req()->set_toppar_list(this->toppar_list); - task->get_req()->set_api_type(this->api_type); - series_of(this)->push_front(this); - 
series_of(this)->push_front(task);
-		break;
-
-	case Kafka_LeaveGroup:
-		if (!member->cgroup.get_group())
-		{
-			this->state = WFT_STATE_TASK_ERROR;
-			this->error = WFT_ERR_KAFKA_LEAVEGROUP_FAILED;
-			this->finish = true;
-			break;
-		}
-
-		coordinator = member->cgroup.get_coordinator();
-		if (!coordinator->get_host())
-			break;
-
-		task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
-							coordinator->get_host(),
-							coordinator->get_port(),
-							member->ssl_ctx,
-							this->get_userinfo(), 0,
-							kafka_leavegroup_callback);
-		task->user_data = this;
-		task->get_req()->set_config(this->config);
-		task->get_req()->set_api_type(Kafka_LeaveGroup);
-		task->get_req()->set_broker(*coordinator);
-		task->get_req()->set_cgroup(member->cgroup);
-		series_of(this)->push_front(this);
-		series_of(this)->push_front(task);
-		break;
-
-	case Kafka_ListOffsets:
-		if (this->toppar_list_map.size() == 0)
-		{
-			this->state = WFT_STATE_TASK_ERROR;
-			this->error = WFT_ERR_KAFKA_LIST_OFFSETS_FAILED;
-			this->finish = true;
-			break;
-		}
-
-		parallel = Workflow::create_parallel_work(kafka_parallel_callback);
-		this->result.create(this->toppar_list_map.size());
-		parallel->set_context(this);
-		for (auto &v : this->toppar_list_map)
-		{
-			auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this,
-								std::placeholders::_1);
-			KafkaBroker *broker = get_broker(v.first);
-			task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
-								broker->get_host(),
-								broker->get_port(),
-								member->ssl_ctx,
-								this->get_userinfo(),
-								this->retry_max,
-								std::move(cb));
-			task->get_req()->set_config(this->config);
-			task->get_req()->set_toppar_list(v.second);
-			task->get_req()->set_broker(*broker);
-			task->get_req()->set_api_type(Kafka_ListOffsets);
-			task->user_data = (void *)parallel->size();
-			series = Workflow::create_series_work(task, nullptr);
-			parallel->add_series(series);
-		}
-		series_of(this)->push_front(this);
-		series_of(this)->push_front(parallel);
-		break;
-
-	default:
-		this->state = WFT_STATE_TASK_ERROR;
-		this->error = WFT_ERR_KAFKA_API_UNKNOWN;
-		this->finish = true;
-		break;
-	}
-
-	return 0;
+	KafkaMember *member = this->member; // 获取成员对象
+	KafkaBroker *coordinator; // 指向消费者组协调者broker的指针(提交偏移量/离开组时使用)
+	__WFKafkaTask *task; // 声明Kafka任务指针
+	ParallelWork *parallel; // 声明并行工作指针
+	SeriesWork *series; // 声明序列工作指针
+
+	// 检查消费者组状态,如果返回false,则等待消费者组操作完成
+	if (this->check_cgroup() == false)
+		return member->cgroup_wait_cnt > 0;
+
+	// 检查元数据状态,如果返回false,则等待元数据操作完成
+	if (this->check_meta() == false)
+		return member->meta_wait_cnt > 0;
+
+	// 安排toppar,如果返回负值,则表示安排失败
+	if (arrange_toppar(this->api_type) < 0)
+	{
+		this->state = WFT_STATE_TASK_ERROR;
+		this->error = WFT_ERR_KAFKA_ARRANGE_FAILED;
+		this->finish = true;
+		return 0;
+	}
+
+	// 如果消费者组信息过时,则重新分派当前任务
+	if (member->cgroup_outdated)
+	{
+		series_of(this)->push_front(this);
+		return 0;
+	}
+
+	// 根据API类型分派任务
+	switch(this->api_type)
+	{
+	case Kafka_Produce: // 生产者API
+		if (this->toppar_list_map.empty()) // 如果没有toppar
+		{
+			this->state = WFT_STATE_TASK_ERROR;
+			this->error = WFT_ERR_KAFKA_PRODUCE_FAILED;
+			this->finish = true;
+			break;
+		}
+
+		// 创建并行工作以处理多个分区的生产任务
+		parallel = Workflow::create_parallel_work(kafka_parallel_callback);
+		this->result.create(this->toppar_list_map.size());
+		parallel->set_context(this);
+		for (auto &v : this->toppar_list_map)
+		{
+			auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this,
+								std::placeholders::_1);
+			KafkaBroker *broker = get_broker(v.first);
+			task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type,
+								broker->get_host(),
+								broker->get_port(),
+
member->ssl_ctx, + this->get_userinfo(), + this->retry_max, + std::move(cb)); + task->get_req()->set_config(this->config); + task->get_req()->set_toppar_list(v.second); + task->get_req()->set_broker(*broker); + task->get_req()->set_api_type(Kafka_Produce); + task->user_data = (void *)parallel->size(); + series = Workflow::create_series_work(task, nullptr); + parallel->add_series(series); + } + series_of(this)->push_front(this); + series_of(this)->push_front(parallel); + break; + + case Kafka_Fetch: // 消费者API + if (this->toppar_list_map.empty()) // 如果没有toppar + { + this->state = WFT_STATE_TASK_ERROR; + this->error = WFT_ERR_KAFKA_FETCH_FAILED; + this->finish = true; + break; + } + + // 创建并行工作以处理多个分区的获取任务 + parallel = Workflow::create_parallel_work(kafka_parallel_callback); + this->result.create(this->toppar_list_map.size()); + parallel->set_context(this); + for (auto &v : this->toppar_list_map) + { + auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, + std::placeholders::_1); + KafkaBroker *broker = get_broker(v.first); + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), + broker->get_port(), + member->ssl_ctx, + this->get_userinfo(), + this->retry_max, + std::move(cb)); + task->get_req()->set_config(this->config); + task->get_req()->set_toppar_list(v.second); + task->get_req()->set_broker(*broker); + task->get_req()->set_api_type(Kafka_Fetch); + task->user_data = (void *)parallel->size(); + series = Workflow::create_series_work(task, nullptr); + parallel->add_series(series); + } + series_of(this)->push_front(this); + series_of(this)->push_front(parallel); + break; + + case Kafka_Metadata: // 元数据API + this->finish = true; + break; + + case Kafka_OffsetCommit: // 偏移量提交API + if (!member->cgroup.get_group()) // 如果没有消费者组 + { + this->state = WFT_STATE_TASK_ERROR; + this->error = WFT_ERR_KAFKA_COMMIT_FAILED; + this->finish = true; + break; + } + + this->result.create(1); + coordinator = member->cgroup.get_coordinator(); + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + coordinator->get_host(), + coordinator->get_port(), + member->ssl_ctx, + this->get_userinfo(), + this->retry_max, + kafka_offsetcommit_callback); + task->user_data = this; + task->get_req()->set_config(this->config); + task->get_req()->set_cgroup(member->cgroup); + task->get_req()->set_broker(*coordinator); + task->get_req()->set_toppar_list(this->toppar_list); + task->get_req()->set_api_type(this->api_type); + series_of(this)->push_front(this); + series_of(this)->push_front(task); + break; + + case Kafka_LeaveGroup: // 离开组API + if (!member->cgroup.get_group()) // 如果没有消费者组 + { + this->state = WFT_STATE_TASK_ERROR; + this->error = WFT_ERR_KAFKA_LEAVEGROUP_FAILED; + this->finish = true; + break; + } + + coordinator = member->cgroup.get_coordinator(); + if (!coordinator->get_host()) // 如果没有协调者主机 + break; + + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + coordinator->get_host(), + coordinator->get_port(), + member->ssl_ctx, + this->get_userinfo(), 0, + kafka_leavegroup_callback); + task->user_data = this; + task->get_req()->set_config(this->config); + task->get_req()->set_api_type(Kafka_LeaveGroup); + task->get_req()->set_broker(*coordinator); + task->get_req()->set_cgroup(member->cgroup); + series_of(this)->push_front(this); + series_of(this)->push_front(task); + break; + + case Kafka_ListOffsets: // 列出偏移量API + if (this->toppar_list_map.empty()) // 如果没有toppar + { + this->state = WFT_STATE_TASK_ERROR; + this->error = 
WFT_ERR_KAFKA_LIST_OFFSETS_FAILED; + this->finish = true; + break; + } + + // 创建并行工作以处理多个分区的列出偏移量任务 + parallel = Workflow::create_parallel_work(kafka_parallel_callback); + this->result.create(this->toppar_list_map.size()); + parallel->set_context(this); + for (auto &v : this->toppar_list_map) + { + auto cb = std::bind(&KafkaClientTask::kafka_move_task_callback, this, + std::placeholders::_1); + KafkaBroker *broker = get_broker(v.first); + task = __WFKafkaTaskFactory::create_kafka_task(member->transport_type, + broker->get_host(), + broker->get_port(), + member->ssl_ctx, + this->get_userinfo(), + this->retry_max, + std::move(cb)); + task->get_req()->set_config(this->config); + task->get_req()->set_toppar_list(v.second); + task->get_req()->set_broker(*broker); + task->get_req()->set_api_type(Kafka_ListOffsets); + task->user_data = (void *)parallel->size(); + series = Workflow::create_series_work(task, nullptr); + parallel->add_series(series); + } + series_of(this)->push_front(this); + series_of(this)->push_front(parallel); + break; + + default: // 未知API类型 + this->state = WFT_STATE_TASK_ERROR; + this->error = WFT_ERR_KAFKA_API_UNKNOWN; + this->finish = true; + break; + } + + return 0; // 返回0,表示任务已分派 } +// 分派任务 void KafkaClientTask::dispatch() { - if (this->finish) - { - this->subtask_done(); - return; - } - - if (this->msg) - { - KafkaClientTask *task = static_cast(this->msg); - if (this->wait_cgroup || this->compare_topics(task) == true) - { - this->state = task->get_state(); - this->error = task->get_error(); - this->kafka_error = get_kafka_error(); - this->finish = true; - this->subtask_done(); - return; - } - - this->msg = NULL; - } - - if (!this->query.empty()) - this->parse_query(); - - this->generate_info(); - - int flag; - this->member->mutex.lock(); - flag = this->dispatch_locked(); - if (flag) - this->subtask_done(); - this->member->mutex.unlock(); - - if (!flag) - this->subtask_done(); + if (this->finish) // 如果任务已完成 + { + this->subtask_done(); // 执行子任务完成操作 + return; + } + + if (this->msg) // 如果有消息 + { + KafkaClientTask *task = static_cast(this->msg); // 将消息转换为KafkaClientTask对象 + if (this->wait_cgroup || this->compare_topics(task) == true) // 如果等待消费者组或主题比较相等 + { + this->state = task->get_state(); // 设置当前任务状态为子任务状态 + this->error = task->get_error(); // 设置当前任务错误码为子任务错误码 + this->kafka_error = get_kafka_error(); // 获取Kafka错误码 + this->finish = true; // 设置任务完成标志为true + this->subtask_done(); // 执行子任务完成操作 + return; + } + + this->msg = NULL; // 清除消息 + } + + if (!this->query.empty()) // 如果查询字符串不为空 + this->parse_query(); // 解析查询字符串 + + this->generate_info(); // 生成信息 + + int flag; + this->member->mutex.lock(); // 锁定成员对象的互斥锁 + flag = this->dispatch_locked(); // 执行锁定状态下的分派操作 + if (flag) // 如果分派操作返回非零值 + this->subtask_done(); // 执行子任务完成操作 + this->member->mutex.unlock(); // 解锁互斥锁 + + if (!flag) // 如果分派操作返回零值 + this->subtask_done(); // 执行子任务完成操作 } +// 添加主题到任务中 bool KafkaClientTask::add_topic(const std::string& topic) { - bool flag = false; - this->member->mutex.lock(); - - this->topic_set.insert(topic); - this->member->meta_list.rewind(); - KafkaMeta *meta; - while ((meta = this->member->meta_list.get_next()) != NULL) - { - if (meta->get_topic() == topic) - { - flag = true; - break; - } - } - - if (!flag) - { - this->member->meta_status[topic] = false; - - KafkaMeta tmp; - if (!tmp.set_topic(topic)) - { - this->member->mutex.unlock(); - return false; - } - - this->meta_list.add_item(tmp); - this->member->meta_list.add_item(tmp); - - if (this->member->cgroup.get_group()) - 
this->member->cgroup_outdated = true; - } - else - { - this->meta_list.rewind(); - KafkaMeta *exist; - while ((exist = this->meta_list.get_next()) != NULL) - { - if (strcmp(exist->get_topic(), meta->get_topic()) == 0) - { - this->member->mutex.unlock(); - return true; - } - } - - this->meta_list.add_item(*meta); - } - - this->member->mutex.unlock(); - - return true; + bool flag = false; // 标记是否在元数据列表中找到主题 + this->member->mutex.lock(); // 锁定成员对象的互斥锁 + + this->topic_set.insert(topic); // 将主题添加到任务的主题集合中 + this->member->meta_list.rewind(); // 重置成员对象的元数据列表 + KafkaMeta *meta; // 指向元数据的指针 + while ((meta = this->member->meta_list.get_next()) != NULL) // 遍历元数据列表 + { + if (meta->get_topic() == topic) // 如果找到相同的主题 + { + flag = true; // 设置找到标志为true + break; // 跳出循环 + } + } + + if (!flag) // 如果没有在元数据列表中找到主题 + { + this->member->meta_status[topic] = false; // 设置主题的元数据状态为false + + KafkaMeta tmp; // 创建一个临时元数据对象 + if (!tmp.set_topic(topic)) // 如果设置主题失败 + { + this->member->mutex.unlock(); // 解锁互斥锁 + return false; // 返回false + } + + this->meta_list.add_item(tmp); // 将临时元数据添加到任务的元数据列表中 + this->member->meta_list.add_item(tmp); // 将临时元数据添加到成员对象的元数据列表中 + + if (this->member->cgroup.get_group()) // 如果成员对象有消费者组 + this->member->cgroup_outdated = true; // 设置消费者组信息过时标志为true + } + else // 如果在元数据列表中找到主题 + { + this->meta_list.rewind(); // 重置任务的元数据列表 + KafkaMeta *exist; // 指向已存在的元数据的指针 + while ((exist = this->meta_list.get_next()) != NULL) // 遍历任务的元数据列表 + { + if (strcmp(exist->get_topic(), meta->get_topic()) == 0) // 如果找到相同的主题 + { + this->member->mutex.unlock(); // 解锁互斥锁 + return true; // 返回true + } + } + + this->meta_list.add_item(*meta); // 将找到的元数据添加到任务的元数据列表中 + } + + this->member->mutex.unlock(); // 解锁互斥锁 + return true; // 返回true } +// 添加toppar(topic partition)到任务中 bool KafkaClientTask::add_toppar(const KafkaToppar& toppar) { - if (this->member->cgroup.get_group()) - return false; - - bool flag = false; - this->member->mutex.lock(); - - this->member->meta_list.rewind(); - KafkaMeta *meta; - while ((meta = this->member->meta_list.get_next()) != NULL) - { - if (strcmp(meta->get_topic(), toppar.get_topic()) == 0) - { - flag = true; - break; - } - } - - this->topic_set.insert(toppar.get_topic()); - if (!flag) - { - KafkaMeta tmp; - if (!tmp.set_topic(toppar.get_topic())) - { - this->member->mutex.unlock(); - return false; - } - - KafkaToppar new_toppar; - if (!new_toppar.set_topic_partition(toppar.get_topic(), toppar.get_partition())) - { - this->member->mutex.unlock(); - return false; - } - - new_toppar.set_offset(toppar.get_offset()); - new_toppar.set_offset_timestamp(toppar.get_offset_timestamp()); - new_toppar.set_low_watermark(toppar.get_low_watermark()); - new_toppar.set_high_watermark(toppar.get_high_watermark()); - this->toppar_list.add_item(new_toppar); - - this->meta_list.add_item(tmp); - this->member->meta_list.add_item(tmp); - - if (this->member->cgroup.get_group()) - this->member->cgroup_outdated = true; - } - else - { - this->toppar_list.rewind(); - KafkaToppar *exist; - while ((exist = this->toppar_list.get_next()) != NULL) - { - if (strcmp(exist->get_topic(), toppar.get_topic()) == 0 && - exist->get_partition() == toppar.get_partition()) - { - this->member->mutex.unlock(); - return true; - } - } - - KafkaToppar new_toppar; - if (!new_toppar.set_topic_partition(toppar.get_topic(), toppar.get_partition())) - { - this->member->mutex.unlock(); - return true; - } - - new_toppar.set_offset(toppar.get_offset()); - new_toppar.set_offset_timestamp(toppar.get_offset_timestamp()); - 
new_toppar.set_low_watermark(toppar.get_low_watermark()); - new_toppar.set_high_watermark(toppar.get_high_watermark()); - this->toppar_list.add_item(new_toppar); - - this->meta_list.add_item(*meta); - } - - this->member->mutex.unlock(); - - return true; + if (this->member->cgroup.get_group()) // 如果成员对象已经有消费者组 + return false; // 不允许添加toppar + + bool flag = false; // 标记是否在元数据列表中找到主题 + this->member->mutex.lock(); // 锁定成员对象的互斥锁 + + this->member->meta_list.rewind(); // 重置成员对象的元数据列表 + KafkaMeta *meta; // 指向元数据的指针 + while ((meta = this->member->meta_list.get_next()) != NULL) // 遍历元数据列表 + { + if (strcmp(meta->get_topic(), toppar.get_topic()) == 0) // 如果找到相同的主题 + { + flag = true; // 设置找到标志为true + break; // 跳出循环 + } + } + + this->topic_set.insert(toppar.get_topic()); // 将主题添加到任务的主题集合中 + if (!flag) // 如果没有在元数据列表中找到主题 + { + KafkaMeta tmp; // 创建一个临时元数据对象 + if (!tmp.set_topic(toppar.get_topic())) // 如果设置主题失败 + { + this->member->mutex.unlock(); // 解锁互斥锁 + return false; // 返回false + } + + KafkaToppar new_toppar; // 创建一个新的toppar对象 + if (!new_toppar.set_topic_partition(toppar.get_topic(), toppar.get_partition())) // 如果设置主题分区失败 + { + this->member->mutex.unlock(); // 解锁互斥锁 + return false; // 返回false + } + + new_toppar.set_offset(toppar.get_offset()); // 设置偏移量 + new_toppar.set_offset_timestamp(toppar.get_offset_timestamp()); // 设置偏移量时间戳 + new_toppar.set_low_watermark(toppar.get_low_watermark()); // 设置低水位 + new_toppar.set_high_watermark(toppar.get_high_watermark()); // 设置高水位 + this->toppar_list.add_item(new_toppar); // 将新的toppar添加到任务的toppar列表中 + + this->meta_list.add_item(tmp); // 将临时元数据添加到任务的元数据列表中 + this->member->meta_list.add_item(tmp); // 将临时元数据添加到成员对象的元数据列表中 + + if (this->member->cgroup.get_group()) // 如果成员对象有消费者组 + this->member->cgroup_outdated = true; // 设置消费者组信息过时标志为true + } + else // 如果在元数据列表中找到主题 + { + this->toppar_list.rewind(); // 重置任务的toppar列表 + KafkaToppar *exist; // 指向已存在的toppar的指针 + while ((exist = this->toppar_list.get_next()) != NULL) // 遍历任务的toppar列表 + { + if (strcmp(exist->get_topic(), toppar.get_topic()) == 0 && // 如果找到相同的主题 + exist->get_partition() == toppar.get_partition()) // 并且分区相同 + { + this->member->mutex.unlock(); // 解锁互斥锁 + return true; // 返回true + } + } + + KafkaToppar new_toppar; // 创建一个新的toppar对象 + if (!new_toppar.set_topic_partition(toppar.get_topic(), toppar.get_partition())) // 如果设置主题分区失败 + { + this->member->mutex.unlock(); // 解锁互斥锁 + return true; // 返回true + } + + new_toppar.set_offset(toppar.get_offset()); // 设置偏移量 + new_toppar.set_offset_timestamp(toppar.get_offset_timestamp()); // 设置偏移量时间戳 + new_toppar.set_low_watermark(toppar.get_low_watermark()); // 设置低水位 + new_toppar.set_high_watermark(toppar.get_high_watermark()); // 设置高水位 + this->toppar_list.add_item(new_toppar); // 将新的toppar添加到任务的toppar列表中 + + this->meta_list.add_item(*meta); // 将找到的元数据添加到任务的元数据列表中 + } + + this->member->mutex.unlock(); // 解锁互斥锁 + return true; // 返回true } -bool KafkaClientTask::add_produce_record(const std::string& topic, - int partition, - KafkaRecord record) +// 添加生产记录到任务中 +bool KafkaClientTask::add_produce_record(const std::string& topic, int partition, KafkaRecord record) { - if (!add_topic(topic)) - return false; - - bool flag = false; - this->toppar_list.rewind(); - KafkaToppar *toppar; - while ((toppar = this->toppar_list.get_next()) != NULL) - { - if (toppar->get_topic() == topic && - toppar->get_partition() == partition) - { - flag = true; - break; - } - } - - if (!flag) - { - KafkaToppar new_toppar; - if (!new_toppar.set_topic_partition(topic, partition)) - return false; - - 
new_toppar.add_record(std::move(record)); - this->toppar_list.add_item(std::move(new_toppar)); - } - else - toppar->add_record(std::move(record)); - - return true; + if (!add_topic(topic)) // 如果添加主题失败 + return false; + + bool flag = false; // 标记是否找到对应的toppar + this->toppar_list.rewind(); // 重置toppar列表 + KafkaToppar *toppar; // 指向toppar的指针 + while ((toppar = this->toppar_list.get_next()) != NULL) // 遍历toppar列表 + { + if (toppar->get_topic() == topic && // 如果找到相同的主题 + toppar->get_partition() == partition) // 并且分区相同 + { + flag = true; // 设置找到标志为true + break; // 跳出循环 + } + } + + if (!flag) // 如果没有找到对应的toppar + { + KafkaToppar new_toppar; // 创建一个新的toppar对象 + if (!new_toppar.set_topic_partition(topic, partition)) // 如果设置主题分区失败 + return false; + + new_toppar.add_record(std::move(record)); // 添加记录到toppar + this->toppar_list.add_item(std::move(new_toppar)); // 将新的toppar添加到列表 + } + else // 如果找到对应的toppar + toppar->add_record(std::move(record)); // 将记录添加到toppar + + return true; // 返回true } +// 检查是否需要替换toppar static bool check_replace_toppar(KafkaTopparList *toppar_list, KafkaToppar *toppar) { - bool flag = false; - toppar_list->rewind(); - KafkaToppar *exist; - while ((exist = toppar_list->get_next()) != NULL) - { - if (strcmp(exist->get_topic(), toppar->get_topic()) == 0 && - exist->get_partition() == toppar->get_partition()) - { - flag = true; - if (toppar->get_offset() > exist->get_offset()) - { - toppar_list->add_item(std::move(*toppar)); - toppar_list->del_cur(); - delete exist; - return true; - } - } - } - - if (!flag) - { - toppar_list->add_item(std::move(*toppar)); - return true; - } - - return false; + bool flag = false; // 标记是否找到对应的toppar + toppar_list->rewind(); // 重置toppar列表 + KafkaToppar *exist; // 指向已存在的toppar的指针 + while ((exist = toppar_list->get_next()) != NULL) // 遍历toppar列表 + { + if (strcmp(exist->get_topic(), toppar->get_topic()) == 0 && + exist->get_partition() == toppar->get_partition()) // 如果找到相同的主题和分区 + { + flag = true; // 设置找到标志为true + if (toppar->get_offset() > exist->get_offset()) // 如果新toppar的偏移量大于已存在的toppar + { + toppar_list->add_item(std::move(*toppar)); // 将新toppar添加到列表 + toppar_list->del_cur(); // 删除当前toppar + delete exist; // 释放已存在的toppar + return true; // 返回true + } + } + } + + if (!flag) // 如果没有找到对应的toppar + { + toppar_list->add_item(std::move(*toppar)); // 将新toppar添加到列表 + return true; // 返回true + } + + return false; // 返回false } +// 安排toppar int KafkaClientTask::arrange_toppar(int api_type) { - switch(api_type) - { - case Kafka_Produce: - return this->arrange_produce(); + switch(api_type) // 根据API类型 + { + case Kafka_Produce: // 生产者API + return this->arrange_produce(); // 安排生产toppar - case Kafka_Fetch: - return this->arrange_fetch(); + case Kafka_Fetch: // 消费者API + return this->arrange_fetch(); // 安排获取toppar - case Kafka_ListOffsets: - return this->arrange_offset(); + case Kafka_ListOffsets: // 列出偏移量API + return this->arrange_offset(); // 安排列出偏移量toppar - case Kafka_OffsetCommit: - return this->arrange_commit(); + case Kafka_OffsetCommit: // 偏移量提交API + return this->arrange_commit(); // 安排提交toppar - default: - return 0; - } + default: // 未知API类型 + return 0; // 返回0 + } } +// 添加偏移量toppar(topic partition)到任务中 bool KafkaClientTask::add_offset_toppar(const protocol::KafkaToppar& toppar) { - if (!add_topic(toppar.get_topic())) - return false; - - KafkaToppar *exist; - bool found = false; - while ((exist = this->toppar_list.get_next()) != NULL) - { - if (strcmp(exist->get_topic(), toppar.get_topic()) == 0 && - exist->get_partition() == toppar.get_partition()) - { - found = 
true; - break; - } - } - - if (!found) - { - KafkaToppar toppar_t; - toppar_t.set_topic_partition(toppar.get_topic(), toppar.get_partition()); - this->toppar_list.add_item(std::move(toppar_t)); - } - - return true; + if (!add_topic(toppar.get_topic())) // 如果添加主题失败 + return false; // 返回false + + KafkaToppar *exist; // 指向toppar的指针 + bool found = false; // 标记是否找到对应的toppar + while ((exist = this->toppar_list.get_next()) != NULL) // 遍历toppar列表 + { + if (strcmp(exist->get_topic(), toppar.get_topic()) == 0 && // 如果找到相同的主题 + exist->get_partition() == toppar.get_partition()) // 并且分区相同 + { + found = true; // 设置找到标志为true + break; // 跳出循环 + } + } + + if (!found) // 如果没有找到对应的toppar + { + KafkaToppar toppar_t; // 创建一个新的toppar对象 + toppar_t.set_topic_partition(toppar.get_topic(), toppar.get_partition()); // 设置主题和分区 + this->toppar_list.add_item(std::move(toppar_t)); // 将新的toppar添加到列表 + } + + return true; // 返回true } +// 安排列出偏移量的toppar int KafkaClientTask::arrange_offset() { - this->toppar_list.rewind(); - KafkaToppar *toppar; - while ((toppar = this->toppar_list.get_next()) != NULL) - { - int node_id = get_node_id(toppar); - if (node_id < 0) - return -1; - - if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end()) - this->toppar_list_map[node_id] = (KafkaTopparList()); - - KafkaToppar new_toppar; - if (!new_toppar.set_topic_partition(toppar->get_topic(), toppar->get_partition())) - return -1; - - this->toppar_list_map[node_id].add_item(std::move(new_toppar)); - } - - return 0; + this->toppar_list.rewind(); // 重置toppar列表 + KafkaToppar *toppar; // 指向toppar的指针 + while ((toppar = this->toppar_list.get_next()) != NULL) // 遍历toppar列表 + { + int node_id = get_node_id(toppar); // 获取节点ID + if (node_id < 0) // 如果节点ID无效 + return -1; // 返回错误码 + + // 如果toppar_list_map中没有该节点ID的toppar列表,则创建一个 + if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end()) + this->toppar_list_map[node_id] = KafkaTopparList(); + + KafkaToppar new_toppar; // 创建一个新的toppar对象 + if (!new_toppar.set_topic_partition(toppar->get_topic(), toppar->get_partition())) // 如果设置主题分区失败 + return -1; // 返回错误码 + + // 将新的toppar添加到对应节点ID的toppar列表中 + this->toppar_list_map[node_id].add_item(std::move(new_toppar)); + } + + return 0; // 返回成功码 } +// 安排提交偏移量的toppar int KafkaClientTask::arrange_commit() { - this->toppar_list.rewind(); - KafkaTopparList new_toppar_list; - KafkaToppar *toppar; - while ((toppar = this->toppar_list.get_next()) != NULL) - { - check_replace_toppar(&new_toppar_list, toppar); - } - - this->toppar_list = std::move(new_toppar_list); - return 0; + this->toppar_list.rewind(); // 重置toppar列表 + KafkaTopparList new_toppar_list; // 创建一个新的toppar列表 + KafkaToppar *toppar; // 指向toppar的指针 + while ((toppar = this->toppar_list.get_next()) != NULL) // 遍历toppar列表 + { + check_replace_toppar(&new_toppar_list, toppar); // 检查是否需要替换toppar + } + + this->toppar_list = std::move(new_toppar_list); // 将新的toppar列表赋值给当前的toppar列表 + return 0; // 返回成功码 } +// 安排获取操作的toppar int KafkaClientTask::arrange_fetch() { - this->meta_list.rewind(); - for (auto& topic : topic_set) - { - if (this->member->cgroup.get_group()) - { - this->member->cgroup.assigned_toppar_rewind(); - KafkaToppar *toppar; - while ((toppar = this->member->cgroup.get_assigned_toppar_next()) != NULL) - { - if (topic.compare(toppar->get_topic()) == 0) - { - int node_id = get_node_id(toppar); - if (node_id < 0) - return -1; - - if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end()) - this->toppar_list_map[node_id] = (KafkaTopparList()); - - KafkaToppar new_toppar; - if 
(!new_toppar.set_topic_partition(toppar->get_topic(), toppar->get_partition())) - return -1; - - new_toppar.set_offset(toppar->get_offset()); - new_toppar.set_low_watermark(toppar->get_low_watermark()); - new_toppar.set_high_watermark(toppar->get_high_watermark()); - this->toppar_list_map[node_id].add_item(std::move(new_toppar)); - } - } - } - else - { - this->toppar_list.rewind(); - KafkaToppar *toppar; - while ((toppar = this->toppar_list.get_next()) != NULL) - { - if (topic.compare(toppar->get_topic()) == 0) - { - int node_id = get_node_id(toppar); - if (node_id < 0) - return -1; - - if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end()) - this->toppar_list_map[node_id] = KafkaTopparList(); - - KafkaToppar new_toppar; - if (!new_toppar.set_topic_partition(toppar->get_topic(), toppar->get_partition())) - return -1; - - new_toppar.set_offset(toppar->get_offset()); - new_toppar.set_offset_timestamp(toppar->get_offset_timestamp()); - new_toppar.set_low_watermark(toppar->get_low_watermark()); - new_toppar.set_high_watermark(toppar->get_high_watermark()); - this->toppar_list_map[node_id].add_item(std::move(new_toppar)); - } - } - } - } - - return 0; + this->meta_list.rewind(); // 重置元数据列表 + for (auto& topic : topic_set) // 遍历主题集合 + { + if (this->member->cgroup.get_group()) // 如果成员对象有消费者组 + { + this->member->cgroup.assigned_toppar_rewind(); // 重置分配的toppar列表 + KafkaToppar *toppar; // 指向toppar的指针 + while ((toppar = this->member->cgroup.get_assigned_toppar_next()) != NULL) // 遍历分配的toppar列表 + { + if (topic.compare(toppar->get_topic()) == 0) // 如果找到相同的主题 + { + int node_id = get_node_id(toppar); // 获取节点ID + if (node_id < 0) // 如果节点ID无效 + return -1; // 返回错误码 + + // 如果toppar_list_map中没有该节点ID的toppar列表,则创建一个 + if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end()) + this->toppar_list_map[node_id] = KafkaTopparList(); + + KafkaToppar new_toppar; // 创建一个新的toppar对象 + if (!new_toppar.set_topic_partition(toppar->get_topic(), toppar->get_partition())) // 如果设置主题分区失败 + return -1; // 返回错误码 + + new_toppar.set_offset(toppar->get_offset()); // 设置偏移量 + new_toppar.set_low_watermark(toppar->get_low_watermark()); // 设置低水位 + new_toppar.set_high_watermark(toppar->get_high_watermark()); // 设置高水位 + this->toppar_list_map[node_id].add_item(std::move(new_toppar)); // 将新的toppar添加到对应节点ID的toppar列表中 + } + } + } + else // 如果成员对象没有消费者组 + { + this->toppar_list.rewind(); // 重置toppar列表 + KafkaToppar *toppar; // 指向toppar的指针 + while ((toppar = this->toppar_list.get_next()) != NULL) // 遍历toppar列表 + { + if (topic.compare(toppar->get_topic()) == 0) // 如果找到相同的主题 + { + int node_id = get_node_id(toppar); // 获取节点ID + if (node_id < 0) // 如果节点ID无效 + return -1; // 返回错误码 + + // 如果toppar_list_map中没有该节点ID的toppar列表,则创建一个 + if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end()) + this->toppar_list_map[node_id] = KafkaTopparList(); + + KafkaToppar new_toppar; // 创建一个新的toppar对象 + if (!new_toppar.set_topic_partition(toppar->get_topic(), toppar->get_partition())) // 如果设置主题分区失败 + return -1; // 返回错误码 + + new_toppar.set_offset(toppar->get_offset()); // 设置偏移量 + new_toppar.set_offset_timestamp(toppar->get_offset_timestamp()); // 设置偏移量时间戳 + new_toppar.set_low_watermark(toppar->get_low_watermark()); // 设置低水位 + new_toppar.set_high_watermark(toppar->get_high_watermark()); // 设置高水位 + this->toppar_list_map[node_id].add_item(std::move(new_toppar)); // 将新的toppar添加到对应节点ID的toppar列表中 + } + } + } + } + + return 0; // 返回成功码 } +// 安排生产操作的toppar int KafkaClientTask::arrange_produce() { - this->toppar_list.rewind(); - 
KafkaToppar *toppar; - while ((toppar = this->toppar_list.get_next()) != NULL) - { - if (toppar->get_partition() < 0) - { - toppar->record_rewind(); - KafkaRecord *record; - while ((record = toppar->get_record_next()) != NULL) - { - int partition_num; - const KafkaMeta *meta; - meta = get_meta(toppar->get_topic(), &this->member->meta_list); - if (!meta) - return -1; - - partition_num = meta->get_partition_elements(); - if (partition_num <= 0) - return -1; - - int partition = -1; - if (this->partitioner) - { - const void *key; - size_t key_len; - record->get_key(&key, &key_len); - partition = this->partitioner(toppar->get_topic(), key, - key_len, partition_num); - } - else - partition = rand() % partition_num; - - KafkaToppar *new_toppar = get_toppar(toppar->get_topic(), - partition, - &this->toppar_list); - if (!new_toppar) - { - KafkaToppar tmp; - if (!tmp.set_topic_partition(toppar->get_topic(), partition)) - return -1; - - new_toppar = this->toppar_list.add_item(std::move(tmp)); - } - - record->get_raw_ptr()->toppar = new_toppar->get_raw_ptr(); - new_toppar->add_record(std::move(*record)); - toppar->del_record_cur(); - delete record; - } - this->toppar_list.del_cur(); - delete toppar; - } - else - { - KafkaRecord *record; - while ((record = toppar->get_record_next()) != NULL) - record->get_raw_ptr()->toppar = toppar->get_raw_ptr(); - } - } - - this->toppar_list.rewind(); - KafkaTopparList toppar_list; - while ((toppar = this->toppar_list.get_next()) != NULL) - { - int node_id = get_node_id(toppar); - if (node_id < 0) - return -1; - - if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end()) - this->toppar_list_map[node_id] = KafkaTopparList(); - - this->toppar_list_map[node_id].add_item(std::move(*toppar)); - } - - return 0; + this->toppar_list.rewind(); // 重置toppar列表 + KafkaToppar *toppar; // 指向toppar的指针 + while ((toppar = this->toppar_list.get_next()) != NULL) // 遍历toppar列表 + { + if (toppar->get_partition() < 0) // 如果分区号为负数,需要计算分区 + { + toppar->record_rewind(); // 重置记录列表 + KafkaRecord *record; // 指向记录的指针 + while ((record = toppar->get_record_next()) != NULL) // 遍历记录列表 + { + int partition_num; // 分区数量 + const KafkaMeta *meta; // 指向元数据的指针 + meta = get_meta(toppar->get_topic(), &this->member->meta_list); // 获取主题的元数据 + if (!meta) // 如果元数据为空 + return -1; // 返回错误码 + + partition_num = meta->get_partition_elements(); // 获取分区数量 + if (partition_num <= 0) // 如果分区数量不合法 + return -1; // 返回错误码 + + int partition = -1; // 分区号 + if (this->partitioner) // 如果有分区函数 + { + const void *key; // 键值 + size_t key_len; // 键值长度 + record->get_key(&key, &key_len); // 获取记录的键值和长度 + partition = this->partitioner(toppar->get_topic(), key, key_len, partition_num); // 计算分区号 + } + else + partition = rand() % partition_num; // 使用随机分区号 + + KafkaToppar *new_toppar = get_toppar(toppar->get_topic(), partition, &this->toppar_list); // 获取或创建新的toppar + if (!new_toppar) // 如果新的toppar为空 + { + KafkaToppar tmp; // 创建临时toppar + if (!tmp.set_topic_partition(toppar->get_topic(), partition)) // 如果设置主题分区失败 + return -1; // 返回错误码 + + new_toppar = this->toppar_list.add_item(std::move(tmp)); // 将临时toppar添加到列表 + } + + record->get_raw_ptr()->toppar = new_toppar->get_raw_ptr(); // 更新记录的toppar指针 + new_toppar->add_record(std::move(*record)); // 将记录添加到新的toppar + toppar->del_record_cur(); // 删除当前记录 + delete record; // 释放记录 + } + this->toppar_list.del_cur(); // 删除当前toppar + delete toppar; // 释放toppar + } + else // 如果分区号已指定 + { + KafkaRecord *record; // 指向记录的指针 + while ((record = toppar->get_record_next()) != NULL) // 遍历记录列表 + 
record->get_raw_ptr()->toppar = toppar->get_raw_ptr(); // 更新记录的toppar指针 + } + } + + this->toppar_list.rewind(); // 重置toppar列表 + KafkaTopparList toppar_list; // 创建一个新的toppar列表 + while ((toppar = this->toppar_list.get_next()) != NULL) // 遍历toppar列表 + { + int node_id = get_node_id(toppar); // 获取节点ID + if (node_id < 0) // 如果节点ID无效 + return -1; // 返回错误码 + + // 如果toppar_list_map中没有该节点ID的toppar列表,则创建一个 + if (this->toppar_list_map.find(node_id) == this->toppar_list_map.end()) + this->toppar_list_map[node_id] = KafkaTopparList(); + + this->toppar_list_map[node_id].add_item(std::move(*toppar)); // 将toppar添加到对应节点ID的toppar列表中 + } + + return 0; // 返回成功码 } +// 完成WFKafkaTask任务并执行回调 SubTask *WFKafkaTask::done() { - SeriesWork *series = series_of(this); - - auto cb = [] (WFTimerTask *task) - { - WFKafkaTask *kafka_task = (WFKafkaTask *)task->user_data; - if (kafka_task->callback) - kafka_task->callback(kafka_task); - - delete kafka_task; - }; - - if (finish) - { - if (this->state == WFT_STATE_TASK_ERROR) - { - WFTimerTask *timer; - timer = WFTaskFactory::create_timer_task(0, 0, std::move(cb)); - timer->user_data = this; - series->push_front(timer); - } - else - { - if (this->callback) - this->callback(this); - - delete this; - } - } - - return series->pop(); + SeriesWork *series = series_of(this); // 获取与当前任务关联的SeriesWork对象 + + // 定义一个lambda函数作为定时器任务的回调,用于处理任务完成时的回调 + auto cb = [] (WFTimerTask *task) + { + WFKafkaTask *kafka_task = (WFKafkaTask *)task->user_data; // 将定时器任务的用户数据转换为WFKafkaTask对象 + if (kafka_task->callback) // 如果有回调函数 + kafka_task->callback(kafka_task); // 执行回调函数 + + delete kafka_task; // 释放WFKafkaTask对象 + }; + + if (finish) // 如果任务已完成 + { + if (this->state == WFT_STATE_TASK_ERROR) // 如果任务状态为错误 + { + WFTimerTask *timer; // 创建一个定时器任务 + timer = WFTaskFactory::create_timer_task(0, 0, std::move(cb)); // 设置定时器任务的回调为上面定义的lambda函数 + timer->user_data = this; // 设置定时器任务的用户数据为当前任务 + series->push_front(timer); // 将定时器任务添加到SeriesWork的前面 + } + else // 如果任务状态不是错误 + { + if (this->callback) // 如果有回调函数 + this->callback(this); // 执行回调函数 + + delete this; // 释放当前任务 + } + } + + return series->pop(); // 返回SeriesWork中弹出的任务 } +// 初始化WFKafkaClient int WFKafkaClient::init(const std::string& broker, SSL_CTX *ssl_ctx) { - std::vector broker_hosts; - std::string::size_type ppos = 0; - std::string::size_type pos; - bool use_ssl; - - use_ssl = (strncasecmp(broker.c_str(), "kafkas://", 9) == 0); - while (1) - { - pos = broker.find(',', ppos); - std::string host = broker.substr(ppos, pos - ppos); - if (use_ssl) - { - if (strncasecmp(host.c_str(), "kafkas://", 9) != 0) - { - errno = EINVAL; - return -1; - } - } - else if (strncasecmp(host.c_str(), "kafka://", 8) != 0) - { - if (strncasecmp(host.c_str(), "kafkas://", 9) == 0) - { - errno = EINVAL; - return -1; - } - - host = "kafka://" + host; - } - - broker_hosts.emplace_back(host); - if (pos == std::string::npos) - break; - - ppos = pos + 1; - } - - this->member = new KafkaMember; - this->member->broker_hosts = std::move(broker_hosts); - this->member->ssl_ctx = ssl_ctx; - if (use_ssl) - { - this->member->transport_type = TT_TCP_SSL; - this->member->scheme = "kafkas://"; - } - - return 0; + std::vector broker_hosts; // 存储代理主机列表 + std::string::size_type ppos = 0; // 字符串查找的起始位置 + std::string::size_type pos; // 字符串查找的位置 + bool use_ssl; // 是否使用SSL + + // 检查broker URL是否使用SSL + use_ssl = (strncasecmp(broker.c_str(), "kafkas://", 9) == 0); + while (1) + { + pos = broker.find(',', ppos); // 查找逗号分隔符 + std::string host = broker.substr(ppos, pos - ppos); // 提取主机字符串 + if (use_ssl) // 
如果使用SSL + { + if (strncasecmp(host.c_str(), "kafkas://", 9) != 0) // 检查主机字符串是否以"kafkas://"开头 + { + errno = EINVAL; // 设置错误码为EINVAL + return -1; // 返回错误 + } + } + else // 如果不使用SSL + { + if (strncasecmp(host.c_str(), "kafka://", 8) != 0) // 检查主机字符串是否以"kafka://"开头 + { + if (strncasecmp(host.c_str(), "kafkas://", 9) == 0) // 如果主机字符串以"kafkas://"开头,但不应使用SSL + { + errno = EINVAL; // 设置错误码为EINVAL + return -1; // 返回错误 + } + + host = "kafka://" + host; // 为主机字符串添加"kafka://"前缀 + } + } + + broker_hosts.emplace_back(host); // 将主机字符串添加到列表 + if (pos == std::string::npos) // 如果没有找到逗号分隔符 + break; // 跳出循环 + + ppos = pos + 1; // 更新查找的起始位置 + } + + this->member = new KafkaMember; // 创建KafkaMember对象 + this->member->broker_hosts = std::move(broker_hosts); // 设置代理主机列表 + this->member->ssl_ctx = ssl_ctx; // 设置SSL上下文 + if (use_ssl) // 如果使用SSL + { + this->member->transport_type = TT_TCP_SSL; // 设置传输类型为SSL + this->member->scheme = "kafkas://"; // 设置协议方案为"kafkas://" + } + + return 0; // 返回成功 } -int WFKafkaClient::init(const std::string& broker, const std::string& group, - SSL_CTX *ssl_ctx) +// 初始化WFKafkaClient,包括代理服务器、消费者组和SSL上下文 +int WFKafkaClient::init(const std::string& broker, const std::string& group, SSL_CTX *ssl_ctx) { - if (this->init(broker, ssl_ctx) < 0) - return -1; + if (this->init(broker, ssl_ctx) < 0) // 调用init函数初始化代理服务器和SSL上下文 + return -1; // 如果初始化失败,返回错误码 - this->member->cgroup.set_group(group); - this->member->cgroup_status = KAFKA_CGROUP_UNINIT; - return 0; + this->member->cgroup.set_group(group); // 设置消费者组名称 + this->member->cgroup_status = KAFKA_CGROUP_UNINIT; // 设置消费者组状态为未初始化 + return 0; // 返回成功码 } +// 反初始化WFKafkaClient int WFKafkaClient::deinit() { - this->member->mutex.lock(); - this->member->client_deinit = true; - this->member->mutex.unlock(); - this->member->decref(); - return 0; + this->member->mutex.lock(); // 锁定成员对象的互斥锁 + this->member->client_deinit = true; // 设置客户端去初始化标志为true + this->member->mutex.unlock(); // 解锁互斥锁 + this->member->decref(); // 减少成员对象的引用计数 + return 0; // 返回成功码 } -WFKafkaTask *WFKafkaClient::create_kafka_task(const std::string& query, - int retry_max, - kafka_callback_t cb) +// 创建Kafka任务 +WFKafkaTask *WFKafkaClient::create_kafka_task(const std::string& query, int retry_max, kafka_callback_t cb) { - WFKafkaTask *task = new KafkaClientTask(query, retry_max, std::move(cb), this); - return task; + WFKafkaTask *task = new KafkaClientTask(query, retry_max, std::move(cb), this); // 创建KafkaClientTask对象 + return task; // 返回任务对象 } -WFKafkaTask *WFKafkaClient::create_kafka_task(int retry_max, - kafka_callback_t cb) +// 创建Kafka任务,无查询字符串 +WFKafkaTask *WFKafkaClient::create_kafka_task(int retry_max, kafka_callback_t cb) { - WFKafkaTask *task = new KafkaClientTask("", retry_max, std::move(cb), this); - return task; + WFKafkaTask *task = new KafkaClientTask("", retry_max, std::move(cb), this); // 创建KafkaClientTask对象 + return task; // 返回任务对象 } -WFKafkaTask *WFKafkaClient::create_leavegroup_task(int retry_max, - kafka_callback_t cb) +// 创建离开组任务 +WFKafkaTask *WFKafkaClient::create_leavegroup_task(int retry_max, kafka_callback_t cb) { - WFKafkaTask *task = new KafkaClientTask("api=leavegroup", retry_max, - std::move(cb), this); - return task; + WFKafkaTask *task = new KafkaClientTask("api=leavegroup", retry_max, std::move(cb), this); // 创建KafkaClientTask对象,用于离开组操作 + return task; // 返回任务对象 } +// 设置Kafka配置 void WFKafkaClient::set_config(protocol::KafkaConfig conf) { - this->member->config = std::move(conf); + this->member->config = std::move(conf); // 将配置移动给成员对象 } +// 获取元数据列表 KafkaMetaList 
*WFKafkaClient::get_meta_list() { - return &this->member->meta_list; + return &this->member->meta_list; // 返回成员对象的元数据列表 } diff --git a/src/client/WFMySQLConnection.h b/src/client/WFMySQLConnection.h index 3f45228..271874d 100644 --- a/src/client/WFMySQLConnection.h +++ b/src/client/WFMySQLConnection.h @@ -26,64 +26,81 @@ #include "URIParser.h" #include "WFTaskFactory.h" +// 定义WFMySQLConnection类,用于管理MySQL数据库连接 class WFMySQLConnection { public: - /* example: mysql://username:passwd@127.0.0.1/dbname?character_set=utf8 - * IP string is recommmended in url. When using a domain name, the first - * address resovled will be used. Don't use upstream name as a host. */ + // 初始化函数,接受一个包含连接信息的URL字符串 + // 例如:mysql://username:passwd@127.0.0.1/dbname?character_set=utf8 + // 推荐使用IP地址而不是域名,因为域名会被解析为第一个地址,并且不建议使用上游名称作为主机名 int init(const std::string& url) { + // 调用另一个init函数,传入URL和一个NULL的SSL上下文 return this->init(url, NULL); } + // 重载的init函数,接受URL和一个可选的SSL上下文 int init(const std::string& url, SSL_CTX *ssl_ctx); + // 反初始化函数,当前为空实现 void deinit() { } public: + // 创建一个查询任务,接受一个SQL查询字符串和一个回调函数 WFMySQLTask *create_query_task(const std::string& query, mysql_callback_t callback) { + // 使用WFTaskFactory创建一个新的MySQL任务,传入连接URI、超时时间和回调函数 WFMySQLTask *task = WFTaskFactory::create_mysql_task(this->uri, 0, std::move(callback)); + // 设置任务的SSL上下文 this->set_ssl_ctx(task); + // 设置任务的查询字符串 task->get_req()->set_query(query); + // 返回创建的任务 return task; } - /* If you don't disconnect manually, the TCP connection will be - * kept alive after this object is deleted, and maybe reused by - * another WFMySQLConnection object with same id and url. */ + // 创建一个断开连接的任务,接受一个回调函数 + // 如果不手动断开连接,TCP连接将在对象删除后保持活动,并可能被具有相同ID和URL的另一个WFMySQLConnection对象重用 WFMySQLTask *create_disconnect_task(mysql_callback_t callback) { + // 创建一个空查询的任务,以此作为断开连接的任务 WFMySQLTask *task = this->create_query_task("", std::move(callback)); + // 设置任务的SSL上下文 this->set_ssl_ctx(task); + // 设置任务不保持活动状态,暗示这是一个断开连接的任务 task->set_keep_alive(0); + // 返回创建的任务 return task; } protected: + // 设置任务的SSL上下文 void set_ssl_ctx(WFMySQLTask *task) const { + // 使用别名简化类型名称 using MySQLRequest = protocol::MySQLRequest; using MySQLResponse = protocol::MySQLResponse; + // 将任务强制转换为WFComplexClientTask类型 auto *t = (WFComplexClientTask *)task; - /* 'ssl_ctx' can be NULL and will use default. */ + // 设置任务的SSL上下文,如果为NULL则使用默认 t->set_ssl_ctx(this->ssl_ctx); } protected: + // 解析后的URI,用于存储连接信息 ParsedURI uri; + // SSL上下文,用于加密通信 SSL_CTX *ssl_ctx; + // 连接ID,确保并发连接具有不同的ID int id; public: - /* Make sure that concurrent connections have different id. - * When a connection object is deleted, id can be reused. */ + // 确保并发连接的ID不同,当连接对象被删除时,ID可以被重用 WFMySQLConnection(int id) { this->id = id; } + // 析构函数,当前为空实现 virtual ~WFMySQLConnection() { } }; #endif - diff --git a/src/client/WFRedisSubscriber.h b/src/client/WFRedisSubscriber.h index c16ca60..a8cb8fb 100644 --- a/src/client/WFRedisSubscriber.h +++ b/src/client/WFRedisSubscriber.h @@ -30,211 +30,188 @@ #include "WFTask.h" #include "WFTaskFactory.h" +// 定义WFRedisSubscribeTask类,它继承自WFGenericTask class WFRedisSubscribeTask : public WFGenericTask { public: - /* Note: Call 'get_resp()' only in the 'extract' function or - before the task is started to set response size limit. 
*/
-	protocol::RedisResponse *get_resp()
-	{
-		return this->task->get_resp();
-	}
+	// 获取与任务关联的Redis响应对象
+	// 注意:只能在'extract'函数中或在任务开始之前调用'get_resp()'来设置响应大小限制。
+	protocol::RedisResponse *get_resp()
+	{
+		return this->task->get_resp();
+	}

public:
-	/* User needs to call 'release()' exactly once, anywhere. */
-	void release()
-	{
-		if (this->flag.exchange(true))
-			delete this;
-	}
+	// 用户需要恰好调用一次'release()'函数,可以在任何地方调用。
+	void release()
+	{
+		// 使用原子操作来检查并设置标志,如果标志已经是true,则不执行删除操作
+		// 这样可以防止多次释放同一个对象
+		if (this->flag.exchange(true))
+			delete this;
+	}
+
+	// 注意:调用'release()'之后,除了在'extract'中,不应再调用下面这些请求函数,
+	// 因为'callback'结束后任务对象可能已经被删除。
+	// 以下函数通过发送Redis命令来执行订阅、取消订阅等操作
+
+	// 订阅一组频道
+	int subscribe(const std::vector<std::string>& channels)
+	{
+		return this->sync_send("SUBSCRIBE", channels);
+	}
+
+	// 取消订阅指定的频道
+	int unsubscribe(const std::vector<std::string>& channels)
+	{
+		return this->sync_send("UNSUBSCRIBE", channels);
+	}
+
+	// 取消订阅所有频道
+	int unsubscribe()
+	{
+		return this->sync_send("UNSUBSCRIBE", { });
+	}
+
+	// 按模式订阅
+	int psubscribe(const std::vector<std::string>& patterns)
+	{
+		return this->sync_send("PSUBSCRIBE", patterns);
+	}
+
+	// 取消指定模式的订阅
+	int punsubscribe(const std::vector<std::string>& patterns)
+	{
+		return this->sync_send("PUNSUBSCRIBE", patterns);
+	}
+
+	// 取消所有模式的订阅
+	int punsubscribe()
+	{
+		return this->sync_send("PUNSUBSCRIBE", { });
+	}
+
+	// 发送带消息的PING
+	int ping(const std::string& message)
+	{
+		return this->sync_send("PING", { message });
+	}
+
+	// 发送不带消息的PING
+	int ping()
+	{
+		return this->sync_send("PING", { });
+	}
+
+	// 发送QUIT,结束订阅连接
+	int quit()
+	{
+		return this->sync_send("QUIT", { });
+	}

public:
-	/* Note: After 'release()' is called, all the requesting functions
-	   should not be called except in 'extract', because the task
-	   point may have been deleted because 'callback' finished. */
-
-	int subscribe(const std::vector<std::string>& channels)
-	{
-		return this->sync_send("SUBSCRIBE", channels);
-	}
-
-	int unsubscribe(const std::vector<std::string>& channels)
-	{
-		return this->sync_send("UNSUBSCRIBE", channels);
-	}
-
-	int unsubscribe()
-	{
-		return this->sync_send("UNSUBSCRIBE", { });
-	}
-
-	int psubscribe(const std::vector<std::string>& patterns)
-	{
-		return this->sync_send("PSUBSCRIBE", patterns);
-	}
-
-	int punsubscribe(const std::vector<std::string>& patterns)
-	{
-		return this->sync_send("PUNSUBSCRIBE", patterns);
-	}
-
-	int punsubscribe()
-	{
-		return this->sync_send("PUNSUBSCRIBE", { });
-	}
-
-	int ping(const std::string& message)
-	{
-		return this->sync_send("PING", { message });
-	}
-
-	int ping()
-	{
-		return this->sync_send("PING", { });
-	}
-
-	int quit()
-	{
-		return this->sync_send("QUIT", { });
-	}
+	// 设置与消息接收相关的超时时间
+	// 这些函数只能在任务开始之前或在'extract'中调用
+
+	// 设置等待每个消息的超时时间。非常有用。如果不设置,则最大等待时间将是全局的'response_timeout'
+	void set_watch_timeout(int timeout)
+	{
+		this->task->set_watch_timeout(timeout);
+	}
+
+	// 设置接收完整消息的超时时间。
+	void set_recv_timeout(int timeout)
+	{
+		this->task->set_receive_timeout(timeout);
+	}
+
+	// 设置发送第一个订阅请求的超时时间。
+	void set_send_timeout(int timeout)
+	{
+		this->task->set_send_timeout(timeout);
+	}
+
+	// 设置保持连接活跃的超时时间。默认值为0。如果你想保持连接活跃,确保在所有频道/模式取消订阅后不再发送任何请求。
+	void set_keep_alive(int timeout)
+	{
+		this->task->set_keep_alive(timeout);
+	}

public:
-	/* All 'timeout' proxy functions can only be called only before
-	   the task is started or in 'extract'. */
-
-	/* Timeout of waiting for each message. Very useful. If not set,
-	   the max waiting time will be the global 'response_timeout'*/
-	void set_watch_timeout(int timeout)
-	{
-		this->task->set_watch_timeout(timeout);
-	}
-
-	/* Timeout of receiving a complete message. */
-	void set_recv_timeout(int timeout)
-	{
-		this->task->set_receive_timeout(timeout);
-	}
-
-	/* Timeout of sending the first subscribe request. */
-	void set_send_timeout(int timeout)
-	{
-		this->task->set_send_timeout(timeout);
-	}
-
-	/* The default keep alive timeout is 0. If you want to keep
-	   the connection alive, make sure not to send any request
-	   after all channels/patterns were unsubscribed. */
-	void set_keep_alive(int timeout)
-	{
-		this->task->set_keep_alive(timeout);
-	}
-
-public:
-	/* Call 'set_extract' or 'set_callback' only before the task
-	   is started, or in 'extract'.
*/ + // 设置提取函数,当任务完成时,这个函数将被调用以处理结果 + void set_extract(std::function ex) + { + this->extract = std::move(ex); + } - void set_extract(std::function ex) - { - this->extract = std::move(ex); - } - - void set_callback(std::function cb) - { - this->callback = std::move(cb); - } + // 设置回调函数,当任务完成时(可能是在提取函数之后),这个函数将被调用 + void set_callback(std::function cb) + { + this->callback = std::move(cb); + } protected: - virtual void dispatch() - { - series_of(this)->push_front(this->task); - this->subtask_done(); - } - - virtual SubTask *done() - { - return series_of(this)->pop(); - } + // 虚拟的dispatch函数,用于将任务添加到执行序列中 + virtual void dispatch() + { + series_of(this)->push_front(this->task); + this->subtask_done(); + } + + // 虚拟的done函数,用于从执行序列中弹出任务并返回 + virtual SubTask *done() + { + return series_of(this)->pop(); + } protected: - int sync_send(const std::string& command, - const std::vector& params); - static void task_extract(WFRedisTask *task); - static void task_callback(WFRedisTask *task); + // 同步发送Redis命令的函数,它封装了与Redis服务器的通信细节 + int sync_send(const std::string& command, + const std::vector& params); -protected: - WFRedisTask *task; - std::mutex mutex; - std::atomic flag; - std::function extract; - std::function callback; + // 静态的提取函数和回调函数,它们被设计为与Redis任务交互的接口 + static void task_extract(WFRedisTask *task); + static void task_callback(WFRedisTask *task); protected: - WFRedisSubscribeTask(WFRedisTask *task, - std::function&& ex, - std::function&& cb) : - flag(false), - extract(std::move(ex)), - callback(std::move(cb)) - { - task->user_data = this; - this->task = task; - } - - virtual ~WFRedisSubscribeTask() - { - if (this->task) - this->task->dismiss(); - } + // 指向底层Redis任务的指针 + WFRedisTask *task; + // 用于同步访问的互斥锁 + std::mutex mutex; + // 原子标志,用于跟踪对象是否已被释放 + std::atomic flag; + // 提取函数和回调函数,当任务完成时它们将被调用 + std::function extract; + std::function callback; - friend class WFRedisSubscriber; +protected: + // 构造函数,接受一个Redis任务、提取函数和回调函数 + WFRedisSubscribeTask(WFRedisTask *task, + std::function&& ex, + std::function&& cb) : + flag(false), + extract(std::move(ex)), + callback(std::move(cb)) + { + // 将用户数据指针设置为当前对象,以便在回调中可以访问它 + task->user_data = this; + this->task = task; + } + + // 析构函数,释放与任务关联的资源 + virtual ~WFRedisSubscribeTask() + { + if (this->task) + this->task->dismiss(); // 可能是取消任务或释放相关资源 + } + + // 允许WFRedisSubscriber类访问受保护的成员 + friend class WFRedisSubscriber; }; +// 定义一个名为 WFRedisSubscriber 的类 class WFRedisSubscriber { public: + // 重载的 init 方法,只接受一个 URL 参数,内部调用另一个 init 方法并传入 NULL 作为 SSL 上下文 int init(const std::string& url) { - return this->init(url, NULL); + return this->init(url, NULL); // 调用重载的 init 方法,传入 URL 和 NULL 作为 SSL 上下文 } - int init(const std::string& url, SSL_CTX *ssl_ctx); + // init 方法,接受一个 URL 和一个可选的 SSL 上下文参数 + int init(const std::string& url, SSL_CTX *ssl_ctx); // 此方法的具体实现在代码段之外 + // 释放资源的 deinit 方法,当前为空实现 void deinit() { } public: + // 定义提取任务数据的函数类型 using extract_t = std::function; + // 定义任务回调的函数类型 using callback_t = std::function; public: - WFRedisSubscribeTask * - create_subscribe_task(const std::vector& channels, - extract_t extract, callback_t callback); + // 创建一个订阅任务,接受频道列表、提取函数和回调函数 + WFRedisSubscribeTask *create_subscribe_task(const std::vector& channels, + extract_t extract, callback_t callback); // 方法的具体实现在代码段之外 - WFRedisSubscribeTask * - create_psubscribe_task(const std::vector& patterns, - extract_t extract, callback_t callback); + // 创建一个模式订阅任务,接受模式列表、提取函数和回调函数 + WFRedisSubscribeTask *create_psubscribe_task(const std::vector& patterns, + extract_t extract, callback_t callback); // 
方法的具体实现在代码段之外 protected: + // 为任务设置 SSL 上下文 void set_ssl_ctx(WFRedisTask *task) const { + // 使用别名简化类型名称 using RedisRequest = protocol::RedisRequest; using RedisResponse = protocol::RedisResponse; + // 将传入的 WFRedisTask 强制转换为 WFComplexClientTask 类型 auto *t = (WFComplexClientTask *)task; - /* 'ssl_ctx' can be NULL and will use default. */ + // 为任务设置 SSL 上下文,如果 ssl_ctx 为 NULL,则使用默认设置 t->set_ssl_ctx(this->ssl_ctx); } protected: + // 创建一个 Redis 任务,接受命令和参数列表 WFRedisTask *create_redis_task(const std::string& command, - const std::vector& params); + const std::vector& params); // 方法的具体实现在代码段之外 protected: + // 存储解析后的 URI,用于连接 Redis 服务器 ParsedURI uri; + // 存储 SSL 上下文,用于加密通信(如果需要) SSL_CTX *ssl_ctx; public: virtual ~WFRedisSubscriber() { } }; - #endif -
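A minimal sketch of how the WFKafkaClient interfaces commented in this patch are typically driven, assuming a local broker at kafka://127.0.0.1:9092, a topic named test_topic, and the KafkaRecord set_key/set_value setters; the query string, record contents and callback body are illustrative only, not part of the patch.

    #include <stdio.h>
    #include <utility>
    #include "workflow/WFKafkaClient.h"
    #include "workflow/WFFacilities.h"

    static WFFacilities::WaitGroup wait_group(1);

    int main()
    {
        WFKafkaClient client;
        if (client.init("kafka://127.0.0.1:9092") < 0)  /* broker address is a placeholder */
            return 1;

        /* "api=produce&topic=test_topic" follows the query format parsed by parse_query();
           retry_max = 3, callback reports the task state */
        WFKafkaTask *task = client.create_kafka_task("api=produce&topic=test_topic", 3,
            [](WFKafkaTask *t) {
                fprintf(stderr, "produce done: state=%d error=%d\n",
                        t->get_state(), t->get_error());
                wait_group.done();
            });

        protocol::KafkaRecord record;
        record.set_key("k1", 2);                 /* assumed KafkaRecord setters */
        record.set_value("hello workflow", 14);
        /* partition -1 lets arrange_produce() choose one via the partitioner or rand() */
        task->add_produce_record("test_topic", -1, std::move(record));

        task->start();
        wait_group.wait();
        client.deinit();
        return 0;
    }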
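A minimal sketch for the WFMySQLConnection interface commented above, assuming the connection URL and the query are placeholders; it mirrors the header's pattern of creating a query task on an initialized connection and waiting for its callback.

    #include <stdio.h>
    #include "workflow/WFMySQLConnection.h"
    #include "workflow/WFFacilities.h"

    static WFFacilities::WaitGroup wait_group(1);

    int main()
    {
        /* concurrent connections must use different ids; 1 is arbitrary here */
        WFMySQLConnection conn(1);
        /* user, password, host and database name are placeholders */
        if (conn.init("mysql://user:passwd@127.0.0.1/test_db?character_set=utf8") < 0)
            return 1;

        WFMySQLTask *task = conn.create_query_task("SHOW DATABASES",
            [](WFMySQLTask *t) {
                fprintf(stderr, "query done: state=%d error=%d\n",
                        t->get_state(), t->get_error());
                wait_group.done();
            });
        task->start();
        wait_group.wait();
        conn.deinit();
        return 0;
    }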
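A minimal sketch for the WFRedisSubscriber interface commented above, assuming a local server at redis://127.0.0.1:6379, a channel named test_channel, and that RedisResponse::get_result() is available in 'extract'; the timeouts and callback bodies are illustrative only.

    #include <stdio.h>
    #include "workflow/WFRedisSubscriber.h"
    #include "workflow/WFFacilities.h"

    static WFFacilities::WaitGroup wait_group(1);

    int main()
    {
        WFRedisSubscriber suber;
        if (suber.init("redis://127.0.0.1:6379") < 0)  /* address is a placeholder */
            return 1;

        WFRedisSubscribeTask *task = suber.create_subscribe_task({"test_channel"},
            [](WFRedisSubscribeTask *t) {
                /* 'extract' runs for every pushed message; get_resp() is valid here */
                protocol::RedisValue value;
                t->get_resp()->get_result(value);
                fprintf(stderr, "message received\n");
            },
            [](const WFRedisSubscribeTask *t) {
                /* 'callback' runs once, when the watching connection finishes */
                wait_group.done();
            });

        task->set_watch_timeout(60 * 1000);  /* wait up to 60s for each message */
        task->start();
        wait_group.wait();
        task->release();                     /* must be called exactly once */
        suber.deinit();
        return 0;
    }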