You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
workflow/WFConsulClient.cc

1478 lines
39 KiB

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

/*
Copyright (c) 2022 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Authors: Wang Zhenpeng (wangzhenpeng@sogou-inc.com)
*/
#include <string.h>
#include <string>
#include <vector>
#include <utility>
#include <functional>
#include "json_parser.h" // JSON解析工具
#include "StringUtil.h" // 字符串工具
#include "URIParser.h" // URI解析工具
#include "HttpUtil.h" // HTTP相关工具
#include "WFConsulClient.h" // Consul客户端相关定义
using namespace protocol; // 使用协议相关命名空间
// WFConsulTask 构造函数
WFConsulTask::WFConsulTask(const std::string& proxy_url,
const std::string& service_namespace,
const std::string& service_name,
const std::string& service_id,
int retry_max,
consul_callback_t&& cb) :
proxy_url(proxy_url), // 初始化代理地址
callback(std::move(cb)) // 设置回调函数
{
this->service.service_name = service_name; // 初始化服务名称
this->service.service_namespace = service_namespace; // 初始化服务命名空间
this->service.service_id = service_id; // 初始化服务 ID
this->api_type = CONSUL_API_TYPE_UNKNOWN; // 设置 API 类型为未知
this->retry_max = retry_max; // 设置最大重试次数
this->finish = false; // 标记任务未完成
this->consul_index = 0; // 初始化 Consul 索引为 0
}
// 设置服务相关信息
void WFConsulTask::set_service(const struct protocol::ConsulService *service)
{
this->service.tags = service->tags; // 设置服务标签
this->service.meta = service->meta; // 设置服务元信息
this->service.tag_override = service->tag_override; // 设置标签覆盖标志
this->service.service_address = service->service_address; // 设置服务地址
this->service.lan = service->lan; // 设置本地局域网地址
this->service.lan_ipv4 = service->lan_ipv4; // 设置 IPv4 地址
this->service.lan_ipv6 = service->lan_ipv6; // 设置 IPv6 地址
this->service.virtual_address = service->virtual_address; // 设置虚拟地址
this->service.wan = service->wan; // 设置广域网地址
this->service.wan_ipv4 = service->wan_ipv4; // 设置 IPv4 地址
this->service.wan_ipv6 = service->wan_ipv6; // 设置 IPv6 地址
}
// 解析服务发现结果的静态函数
static bool parse_discover_result(const json_value_t *root,
std::vector<struct ConsulServiceInstance>& result);
// 解析服务列表结果的静态函数
static bool parse_list_service_result(const json_value_t *root,
std::vector<struct ConsulServiceTags>& result);
// 获取服务发现结果
bool WFConsulTask::get_discover_result(
std::vector<struct ConsulServiceInstance>& result)
{
json_value_t *root; // JSON 根节点
int errno_bak; // 备份当前 errno 值
bool ret; // 返回结果标志
// 检查当前 API 类型是否为服务发现
if (this->api_type != CONSUL_API_TYPE_DISCOVER)
{
errno = EPERM; // 设置权限错误
return false; // 返回失败
}
errno_bak = errno; // 备份 errno
errno = EBADMSG; // 设置消息错误
std::string body = HttpUtil::decode_chunked_body(&this->http_resp); // 解码 HTTP 响应的分块数据
root = json_value_parse(body.c_str()); // 解析 JSON 数据
if (!root) // 如果解析失败
return false; // 返回失败
ret = parse_discover_result(root, result); // 解析服务发现结果
json_value_destroy(root); // 销毁 JSON 对象
if (ret) // 如果解析成功
errno = errno_bak; // 恢复原始 errno
return ret; // 返回解析结果
}
// 获取服务列表结果
bool WFConsulTask::get_list_service_result(
std::vector<struct ConsulServiceTags>& result)
{
json_value_t *root; // JSON 根节点
int errno_bak; // 备份当前 errno 值
bool ret; // 返回结果标志
// 检查当前 API 类型是否为服务列表
if (this->api_type != CONSUL_API_TYPE_LIST_SERVICE)
{
errno = EPERM; // 设置权限错误
return false; // 返回失败
}
errno_bak = errno; // 备份 errno
errno = EBADMSG; // 设置消息错误
std::string body = HttpUtil::decode_chunked_body(&this->http_resp); // 解码 HTTP 响应的分块数据
root = json_value_parse(body.c_str()); // 解析 JSON 数据
if (!root) // 如果解析失败
return false; // 返回失败
ret = parse_list_service_result(root, result); // 解析服务列表结果
json_value_destroy(root); // 销毁 JSON 对象
if (ret) // 如果解析成功
errno = errno_bak; // 恢复原始 errno
return ret; // 返回解析结果
}
// 分发任务,根据 API 类型选择相应的任务处理逻辑
void WFConsulTask::dispatch()
{
WFHttpTask *task; // 定义 HTTP 任务指针
if (this->finish) // 如果任务已完成
{
this->subtask_done(); // 标记子任务完成
return; // 退出
}
// 根据 API 类型选择处理任务
switch(this->api_type)
{
case CONSUL_API_TYPE_DISCOVER: // 如果是服务发现任务
task = create_discover_task(); // 创建服务发现任务
break;
case CONSUL_API_TYPE_LIST_SERVICE: // 如果是服务列表任务
task = create_list_service_task(); // 创建服务列表任务
break;
case CONSUL_API_TYPE_DEREGISTER: // 如果是注销服务任务
task = create_deregister_task(); // 创建注销服务任务
break;
case CONSUL_API_TYPE_REGISTER: // 如果是注册服务任务
task = create_register_task(); // 创建注册服务任务
if (task)
break;
// 如果任务创建失败,设置错误状态
if (1)
{
this->state = WFT_STATE_SYS_ERROR; // 设置系统错误状态
this->error = errno; // 设置错误码
}
else
{
default:
this->state = WFT_STATE_TASK_ERROR; // 设置任务错误状态
this->error = WFT_ERR_CONSUL_API_UNKNOWN; // 设置未知 API 错误
}
this->finish = true; // 标记任务完成
this->subtask_done(); // 标记子任务完成
return;
}
// 将任务插入到工作队列
series_of(this)->push_front(this); // 将当前任务加入队列
series_of(this)->push_front(task); // 将新任务加入队列
this->subtask_done(); // 标记子任务完成
}
// 标记任务完成,并执行回调或清理逻辑
SubTask *WFConsulTask::done()
{
SeriesWork *series = series_of(this); // 获取当前任务的工作队列
if (finish) // 如果任务已完成
{
if (this->callback) // 如果有回调函数
this->callback(this); // 执行回调函数
delete this; // 删除任务对象
}
return series->pop(); // 从队列中弹出任务
}
// 将毫秒时间转换为字符串形式(如 "5s" 或 "3m"
static std::string convert_time_to_str(int milliseconds)
{
std::string str_time; // 保存时间字符串
int seconds = milliseconds / 1000; // 将毫秒转换为秒
if (seconds >= 180) // 如果时间超过 3 分钟
str_time = std::to_string(seconds / 60) + "m"; // 转换为分钟形式
else
str_time = std::to_string(seconds) + "s"; // 转换为秒形式
return str_time; // 返回时间字符串
}
// 生成服务发现请求的 URL
std::string WFConsulTask::generate_discover_request()
{
std::string url = this->proxy_url; // 初始化 URL 为代理地址
// 添加服务健康检查路径
url += "/v1/health/service/" + this->service.service_name;
// 添加数据中心参数
url += "?dc=" + this->config.get_datacenter();
// 添加服务命名空间参数
url += "&ns=" + this->service.service_namespace;
// 获取 passing 参数值,表示是否仅返回通过健康检查的服务实例
std::string passing = this->config.get_passing() ? "true" : "false";
url += "&passing=" + passing; // 添加 passing 参数
// 添加 token 参数用于鉴权
url += "&token=" + this->config.get_token();
// 添加过滤表达式参数
url += "&filter=" + this->config.get_filter_expr();
// 检查是否启用 Consul 阻塞查询
if (this->config.blocking_query())
{
// 添加索引参数,用于阻塞查询的初始状态
url += "&index=" + std::to_string(this->get_consul_index());
// 添加等待时间参数,表示阻塞查询的超时时间
url += "&wait=" + convert_time_to_str(this->config.get_wait_ttl());
}
return url; // 返回生成的 URL
}
// 创建服务发现任务
WFHttpTask *WFConsulTask::create_discover_task()
{
std::string url = generate_discover_request(); // 调用生成服务发现 URL 的函数
WFHttpTask *task = WFTaskFactory::create_http_task( // 创建 HTTP 任务
url, 0, this->retry_max, discover_callback);
HttpRequest *req = task->get_req(); // 获取 HTTP 请求对象
// 添加 HTTP 请求头,指定内容类型为 JSON
req->add_header_pair("Content-Type", "application/json");
task->user_data = this; // 设置用户数据为当前任务对象
return task; // 返回创建的任务
}
// 创建服务列表任务
WFHttpTask *WFConsulTask::create_list_service_task()
{
std::string url = this->proxy_url; // 初始化 URL 为代理地址
// 添加服务目录路径和鉴权 token
url += "/v1/catalog/services?token=" + this->config.get_token();
// 添加数据中心参数
url += "&dc=" + this->config.get_datacenter();
// 添加服务命名空间参数
url += "&ns=" + this->service.service_namespace;
WFHttpTask *task = WFTaskFactory::create_http_task( // 创建 HTTP 任务
url, 0, this->retry_max, list_service_callback);
HttpRequest *req = task->get_req(); // 获取 HTTP 请求对象
// 添加 HTTP 请求头,指定内容类型为 JSON
req->add_header_pair("Content-Type", "application/json");
task->user_data = this; // 设置用户数据为当前任务对象
return task; // 返回创建的任务
}
static void print_json_value(const json_value_t *val, int depth,
std::string& json_str);
static bool create_register_request(const json_value_t *root,
const struct ConsulService *service,
const ConsulConfig& config);
// 创建服务注册任务
WFHttpTask *WFConsulTask::create_register_task()
{
std::string payload; // 定义用于存储请求体的字符串
std::string url = this->proxy_url; // 初始化 URL 为代理地址
// 添加服务注册路径和替换检查选项
url += "/v1/agent/service/register?replace-existing-checks=";
url += this->config.get_replace_checks() ? "true" : "false";
WFHttpTask *task = WFTaskFactory::create_http_task( // 创建 HTTP 任务
url, 0, this->retry_max, register_callback);
HttpRequest *req = task->get_req(); // 获取 HTTP 请求对象
req->set_method(HttpMethodPut); // 设置 HTTP 方法为 PUT
req->add_header_pair("Content-Type", "application/json"); // 添加内容类型头
// 如果存在鉴权 token则添加到请求头
if (!this->config.get_token().empty())
req->add_header_pair("X-Consul-Token", this->config.get_token());
json_value_t *root = json_value_create(JSON_VALUE_OBJECT); // 创建 JSON 对象根节点
if (root) // 如果创建成功
{
// 构建服务注册请求 JSON 数据
if (create_register_request(root, &this->service, this->config))
print_json_value(root, 0, payload); // 将 JSON 数据转为字符串
json_value_destroy(root); // 销毁 JSON 对象
// 如果请求体不为空且追加到请求成功
if (!payload.empty() && req->append_output_body(payload))
{
task->user_data = this; // 设置用户数据为当前任务对象
return task; // 返回创建的任务
}
}
task->dismiss(); // 如果失败,销毁任务
return NULL; // 返回空指针
}
// 创建服务注销任务
WFHttpTask *WFConsulTask::create_deregister_task()
{
std::string url = this->proxy_url; // 初始化 URL 为代理地址
// 添加服务注销路径,包含服务 ID 和命名空间参数
url += "/v1/agent/service/deregister/" + this->service.service_id;
url += "?ns=" + this->service.service_namespace;
WFHttpTask *task = WFTaskFactory::create_http_task( // 创建 HTTP 任务
url, 0, this->retry_max, register_callback);
HttpRequest *req = task->get_req(); // 获取 HTTP 请求对象
req->set_method(HttpMethodPut); // 设置 HTTP 方法为 PUT
req->add_header_pair("Content-Type", "application/json"); // 添加内容类型头
// 如果存在鉴权 token则添加到请求头
std::string token = this->config.get_token();
if (!token.empty())
req->add_header_pair("X-Consul-Token", token);
task->user_data = this; // 设置用户数据为当前任务对象
return task; // 返回创建的任务
}
// 检查任务执行结果
bool WFConsulTask::check_task_result(WFHttpTask *task, WFConsulTask *consul_task)
{
// 如果任务状态不是成功状态
if (task->get_state() != WFT_STATE_SUCCESS)
{
consul_task->state = task->get_state(); // 设置任务状态
consul_task->error = task->get_error(); // 设置错误信息
return false; // 返回失败
}
// 获取任务的 HTTP 响应
protocol::HttpResponse *resp = task->get_resp();
// 检查 HTTP 响应状态码是否为 "200"
if (strcmp(resp->get_status_code(), "200") != 0)
{
consul_task->state = WFT_STATE_TASK_ERROR; // 设置任务错误状态
consul_task->error = WFT_ERR_CONSUL_CHECK_RESPONSE_FAILED; // 设置响应失败错误
return false; // 返回失败
}
return true; // 返回成功
}
// 从 HTTP 响应头中获取 Consul 索引
long long WFConsulTask::get_consul_index(HttpResponse *resp)
{
long long consul_index = 0; // 初始化 Consul 索引为 0
// 创建 HTTP 头游标以遍历响应头
protocol::HttpHeaderCursor cursor(resp);
std::string consul_index_str; // 用于存储 Consul 索引字符串
// 查找 "X-Consul-Index" 响应头
if (cursor.find("X-Consul-Index", consul_index_str))
{
// 将字符串转换为长整型
consul_index = strtoll(consul_index_str.c_str(), NULL, 10);
if (consul_index < 0) // 如果索引小于 0则重置为 0
consul_index = 0;
}
return consul_index; // 返回 Consul 索引
}
// 服务发现任务的回调函数
void WFConsulTask::discover_callback(WFHttpTask *task)
{
WFConsulTask *t = (WFConsulTask*)task->user_data; // 获取任务的用户数据
// 检查任务执行结果
if (WFConsulTask::check_task_result(task, t))
{
protocol::HttpResponse *resp = task->get_resp(); // 获取 HTTP 响应
long long consul_index = t->get_consul_index(resp); // 获取当前 Consul 索引
long long last_consul_index = t->get_consul_index(); // 获取上一次的 Consul 索引
// 更新 Consul 索引,如果当前索引小于上次索引,则重置为 0
t->set_consul_index(consul_index < last_consul_index ? 0 : consul_index);
t->state = task->get_state(); // 设置任务状态
}
// 将 HTTP 响应转移到任务对象
t->http_resp = std::move(*task->get_resp());
t->finish = true; // 标记任务完成
}
// 服务列表任务的回调函数
void WFConsulTask::list_service_callback(WFHttpTask *task)
{
WFConsulTask *t = (WFConsulTask*)task->user_data; // 获取任务的用户数据
// 检查任务执行结果
if (WFConsulTask::check_task_result(task, t))
t->state = task->get_state(); // 设置任务状态
// 将 HTTP 响应转移到任务对象
t->http_resp = std::move(*task->get_resp());
t->finish = true; // 标记任务完成
}
// 服务注册任务的回调函数
void WFConsulTask::register_callback(WFHttpTask *task)
{
WFConsulTask *t = (WFConsulTask *)task->user_data; // 获取任务的用户数据
// 检查任务执行结果
if (WFConsulTask::check_task_result(task, t))
t->state = task->get_state(); // 设置任务状态
// 将 HTTP 响应转移到任务对象
t->http_resp = std::move(*task->get_resp());
t->finish = true; // 标记任务完成
}
// 初始化 Consul 客户端
int WFConsulClient::init(const std::string& proxy_url, ConsulConfig config)
{
ParsedURI uri; // 定义用于存储解析后的 URI 的对象
// 解析代理 URL
if (URIParser::parse(proxy_url, uri) >= 0)
{
// 构建完整的代理 URL
this->proxy_url = uri.scheme;
this->proxy_url += "://";
this->proxy_url += uri.host;
if (uri.port) // 如果指定了端口,则追加端口信息
{
this->proxy_url += ":";
this->proxy_url += uri.port;
}
this->config = std::move(config); // 将配置赋值给客户端
return 0; // 返回成功
}
else if (uri.state == URI_STATE_INVALID) // 如果 URI 无效
errno = EINVAL; // 设置错误码为无效参数
return -1; // 返回失败
}
// 初始化 Consul 客户端(使用默认配置)
int WFConsulClient::init(const std::string& proxy_url)
{
return this->init(proxy_url, ConsulConfig()); // 调用带配置参数的 init 函数
}
// 创建服务发现任务
WFConsulTask *WFConsulClient::create_discover_task(
const std::string& service_namespace,
const std::string& service_name,
int retry_max,
consul_callback_t cb)
{
// 创建一个新的 Consul 任务
WFConsulTask *task = new WFConsulTask(this->proxy_url, service_namespace,
service_name, "", retry_max,
std::move(cb));
task->set_api_type(CONSUL_API_TYPE_DISCOVER); // 设置任务类型为服务发现
task->set_config(this->config); // 设置任务的配置信息
return task; // 返回创建的任务
}
// 创建服务列表任务
WFConsulTask *WFConsulClient::create_list_service_task(
const std::string& service_namespace,
int retry_max,
consul_callback_t cb)
{
// 创建一个新的 Consul 任务
WFConsulTask *task = new WFConsulTask(this->proxy_url, service_namespace,
"", "", retry_max,
std::move(cb));
task->set_api_type(CONSUL_API_TYPE_LIST_SERVICE); // 设置任务类型为服务列表
task->set_config(this->config); // 设置任务的配置信息
return task; // 返回创建的任务
}
// 创建服务注册任务
WFConsulTask *WFConsulClient::create_register_task(
const std::string& service_namespace,
const std::string& service_name,
const std::string& service_id,
int retry_max,
consul_callback_t cb)
{
// 创建一个新的 Consul 任务
WFConsulTask *task = new WFConsulTask(this->proxy_url, service_namespace,
service_name, service_id, retry_max,
std::move(cb));
task->set_api_type(CONSUL_API_TYPE_REGISTER); // 设置任务类型为服务注册
task->set_config(this->config); // 设置任务的配置信息
return task; // 返回创建的任务
}
// 创建服务注销任务
WFConsulTask *WFConsulClient::create_deregister_task(
const std::string& service_namespace,
const std::string& service_id,
int retry_max,
consul_callback_t cb)
{
// 创建一个新的 Consul 任务
WFConsulTask *task = new WFConsulTask(this->proxy_url, service_namespace,
"", service_id, retry_max,
std::move(cb));
task->set_api_type(CONSUL_API_TYPE_DEREGISTER); // 设置任务类型为服务注销
task->set_config(this->config); // 设置任务的配置信息
return task; // 返回创建的任务
}
// 创建带标签的地址对象
static bool create_tagged_address(const ConsulAddress& consul_address,
const std::string& name,
json_object_t *tagged_obj)
{
// 如果地址为空,直接返回成功
if (consul_address.first.empty())
return true;
// 向 JSON 对象中添加名为 name 的嵌套对象
const json_value_t *val = json_object_append(tagged_obj, name.c_str(),
JSON_VALUE_OBJECT);
if (!val) // 如果添加失败,返回 false
return false;
// 获取新创建的嵌套对象
json_object_t *obj = json_value_object(val);
if (!obj) // 如果对象获取失败,返回 false
return false;
// 向嵌套对象中添加 "Address" 字段
if (!json_object_append(obj, "Address", JSON_VALUE_STRING,
consul_address.first.c_str()))
return false;
// 向嵌套对象中添加 "Port" 字段
if (!json_object_append(obj, "Port", JSON_VALUE_NUMBER,
(double)consul_address.second))
return false;
return true; // 地址对象创建成功
}
// 创建健康检查配置
static bool create_health_check(const ConsulConfig& config, json_object_t *obj)
{
const json_value_t *val; // 存储添加字段的返回值
std::string str; // 临时字符串存储变量
// 如果未启用健康检查,直接返回成功
if (!config.get_health_check())
return true;
// 在 JSON 对象中添加 "Check" 字段
val = json_object_append(obj, "Check", JSON_VALUE_OBJECT);
if (!val) // 如果添加失败,返回 false
return false;
obj = json_value_object(val); // 获取 "Check" 对象
if (!obj) // 如果对象获取失败,返回 false
return false;
// 添加健康检查的名称
str = config.get_check_name();
if (!json_object_append(obj, "Name", JSON_VALUE_STRING, str.c_str()))
return false;
// 添加健康检查的备注
str = config.get_check_notes();
if (!json_object_append(obj, "Notes", JSON_VALUE_STRING, str.c_str()))
return false;
// 添加健康检查的 HTTP URL
str = config.get_check_http_url();
if (!str.empty()) // 如果 URL 不为空
{
if (!json_object_append(obj, "HTTP", JSON_VALUE_STRING, str.c_str()))
return false;
// 添加 HTTP 方法
str = config.get_check_http_method();
if (!json_object_append(obj, "Method", JSON_VALUE_STRING, str.c_str()))
return false;
// 添加 HTTP 请求体
str = config.get_http_body();
if (!json_object_append(obj, "Body", JSON_VALUE_STRING, str.c_str()))
return false;
// 添加 HTTP 头部信息
val = json_object_append(obj, "Header", JSON_VALUE_OBJECT);
if (!val)
return false;
json_object_t *header_obj = json_value_object(val);
if (!header_obj)
return false;
// 遍历并添加所有 HTTP 头部字段
for (const auto& header : *config.get_http_headers())
{
val = json_object_append(header_obj, header.first.c_str(),
JSON_VALUE_ARRAY);
if (!val)
return false;
json_array_t *arr = json_value_array(val);
if (!arr)
return false;
// 添加头部字段对应的多个值
for (const auto& value : header.second)
{
if (!json_array_append(arr, JSON_VALUE_STRING, value.c_str()))
return false;
}
}
}
// 添加 TCP 检查地址
str = config.get_check_tcp();
if (!str.empty())
{
if (!json_object_append(obj, "TCP", JSON_VALUE_STRING, str.c_str()))
return false;
}
// 添加初始状态
str = config.get_initial_status();
if (!json_object_append(obj, "Status", JSON_VALUE_STRING, str.c_str()))
return false;
// 添加自动注销时间
str = convert_time_to_str(config.get_auto_deregister_time());
if (!json_object_append(obj, "DeregisterCriticalServiceAfter",
JSON_VALUE_STRING, str.c_str()))
return false;
// 添加检查间隔时间
str = convert_time_to_str(config.get_check_interval());
if (!json_object_append(obj, "Interval", JSON_VALUE_STRING, str.c_str()))
return false;
// 添加检查超时时间
str = convert_time_to_str(config.get_check_timeout());
if (!json_object_append(obj, "Timeout", JSON_VALUE_STRING, str.c_str()))
return false;
// 添加连续成功次数限制
if (!json_object_append(obj, "SuccessBeforePassing", JSON_VALUE_NUMBER,
(double)config.get_success_times()))
return false;
// 添加连续失败次数限制
if (!json_object_append(obj, "FailuresBeforeCritical", JSON_VALUE_NUMBER,
(double)config.get_failure_times()))
return false;
return true; // 健康检查配置创建成功
}
// 创建服务注册请求
static bool create_register_request(const json_value_t *root,
const struct ConsulService *service,
const ConsulConfig& config)
{
const json_value_t *val; // 临时存储 JSON 值
json_object_t *obj; // 临时存储 JSON 对象
obj = json_value_object(root); // 获取根对象
if (!obj)
return false;
// 添加服务 ID
if (!json_object_append(obj, "ID", JSON_VALUE_STRING,
service->service_id.c_str()))
return false;
// 添加服务名称
if (!json_object_append(obj, "Name", JSON_VALUE_STRING,
service->service_name.c_str()))
return false;
// 如果命名空间非空,则添加命名空间
if (!service->service_namespace.empty())
{
if (!json_object_append(obj, "ns", JSON_VALUE_STRING,
service->service_namespace.c_str()))
return false;
}
// 添加服务标签数组
val = json_object_append(obj, "Tags", JSON_VALUE_ARRAY);
if (!val)
return false;
json_array_t *arr = json_value_array(val);
if (!arr)
return false;
// 遍历并添加所有标签
for (const auto& tag : service->tags)
{
if (!json_array_append(arr, JSON_VALUE_STRING, tag.c_str()))
return false;
}
// 添加服务地址
if (!json_object_append(obj, "Address", JSON_VALUE_STRING,
service->service_address.first.c_str()))
return false;
// 添加服务端口
if (!json_object_append(obj, "Port", JSON_VALUE_NUMBER,
(double)service->service_address.second))
return false;
// 添加元数据对象
val = json_object_append(obj, "Meta", JSON_VALUE_OBJECT);
if (!val)
return false;
json_object_t *meta_obj = json_value_object(val);
if (!meta_obj)
return false;
// 遍历并添加所有元数据
for (const auto& meta_kv : service->meta)
{
if (!json_object_append(meta_obj, meta_kv.first.c_str(),
JSON_VALUE_STRING, meta_kv.second.c_str()))
return false;
}
// 添加标签覆盖选项
int type = service->tag_override ? JSON_VALUE_TRUE : JSON_VALUE_FALSE;
if (!json_object_append(obj, "EnableTagOverride", type))
return false;
// 添加带标签的地址
val = json_object_append(obj, "TaggedAddresses", JSON_VALUE_OBJECT);
if (!val)
return false;
json_object_t *tagged_obj = json_value_object(val);
if (!tagged_obj)
return false;
// 创建所有类型的带标签地址
if (!create_tagged_address(service->lan, "lan", tagged_obj))
return false;
if (!create_tagged_address(service->lan_ipv4, "lan_ipv4", tagged_obj))
return false;
if (!create_tagged_address(service->lan_ipv6, "lan_ipv6", tagged_obj))
return false;
if (!create_tagged_address(service->virtual_address, "virtual", tagged_obj))
return false;
if (!create_tagged_address(service->wan, "wan", tagged_obj))
return false;
if (!create_tagged_address(service->wan_ipv4, "wan_ipv4", tagged_obj))
return false;
if (!create_tagged_address(service->wan_ipv6, "wan_ipv6", tagged_obj))
return false;
// 创建健康检查配置
if (!create_health_check(config, obj))
return false;
return true; // 服务注册请求创建成功
}
// 解析服务列表结果
static bool parse_list_service_result(const json_value_t *root,
std::vector<struct ConsulServiceTags>& result)
{
const json_object_t *obj; // 存储 JSON 对象
const json_value_t *val; // 存储 JSON 值
const json_array_t *arr; // 存储 JSON 数组
const char *key; // 存储键
const char *str; // 存储字符串值
// 将根值转换为 JSON 对象
obj = json_value_object(root);
if (!obj) // 如果转换失败,返回 false
return false;
// 遍历 JSON 对象中的每个键值对
json_object_for_each(key, val, obj)
{
struct ConsulServiceTags instance; // 定义一个服务标签实例
instance.service_name = key; // 将键作为服务名称
arr = json_value_array(val); // 获取值作为数组
if (!arr) // 如果值不是数组,返回 false
return false;
const json_value_t *tag_val; // 遍历数组的每个值
json_array_for_each(tag_val, arr)
{
str = json_value_string(tag_val); // 获取值的字符串形式
if (!str) // 如果值不是字符串,返回 false
return false;
instance.tags.emplace_back(str); // 将标签添加到实例
}
result.emplace_back(std::move(instance)); // 将实例添加到结果列表
}
return true; // 返回成功
}
// 解析发现节点信息
static bool parse_discover_node(const json_object_t *obj,
struct ConsulServiceInstance *instance)
{
const json_value_t *val; // 存储 JSON 值
const char *str; // 存储字符串值
// 查找并解析 "Node" 信息
val = json_object_find("Node", obj);
if (!val)
return false;
obj = json_value_object(val); // 获取 "Node" 对象
if (!obj)
return false;
// 获取 "ID" 字段
val = json_object_find("ID", obj);
if (!val)
return false;
str = json_value_string(val); // 转换为字符串
if (!str)
return false;
instance->node_id = str; // 设置节点 ID
// 获取 "Node" 字段
val = json_object_find("Node", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
instance->node_name = str; // 设置节点名称
// 获取 "Address" 字段
val = json_object_find("Address", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
instance->node_address = str; // 设置节点地址
// 获取 "Datacenter" 字段
val = json_object_find("Datacenter", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
instance->dc = str; // 设置数据中心
// 解析 "Meta" 信息
val = json_object_find("Meta", obj);
if (!val)
return false;
const json_object_t *meta_obj = json_value_object(val); // 获取 "Meta" 对象
if (!meta_obj)
return false;
const char *meta_k; // 存储元数据键
const json_value_t *meta_v; // 存储元数据值
// 遍历 "Meta" 对象
json_object_for_each(meta_k, meta_v, meta_obj)
{
str = json_value_string(meta_v); // 获取元数据值
if (!str)
return false;
instance->node_meta[meta_k] = str; // 添加元数据键值对
}
// 获取 "CreateIndex" 字段
val = json_object_find("CreateIndex", obj);
if (val)
instance->create_index = json_value_number(val); // 设置创建索引
// 获取 "ModifyIndex" 字段
val = json_object_find("ModifyIndex", obj);
if (val)
instance->modify_index = json_value_number(val); // 设置修改索引
return true; // 返回成功
}
// 解析服务发现结果
static bool parse_discover_result(const json_value_t *root,
std::vector<struct ConsulServiceInstance>& result)
{
const json_array_t *arr = json_value_array(root); // 获取 JSON 数组
const json_value_t *val; // 存储数组值
const json_object_t *obj; // 存储 JSON 对象
if (!arr) // 如果不是数组,返回 false
return false;
// 遍历数组
json_array_for_each(val, arr)
{
struct ConsulServiceInstance instance; // 创建服务实例
obj = json_value_object(val); // 获取数组中的对象
if (!obj)
return false;
// 解析节点信息
if (!parse_discover_node(obj, &instance))
return false;
// 解析服务信息
if (!parse_service(obj, &instance.service))
return false;
// 解析健康检查
parse_health_check(obj, &instance);
result.emplace_back(std::move(instance)); // 添加实例到结果列表
}
return true; // 返回成功
}
// 解析服务信息
static bool parse_service(const json_object_t *obj,
struct ConsulService *service)
{
const json_value_t *val; // 用于存储 JSON 值
const char *str; // 用于存储字符串值
// 获取 "Service" 对象
val = json_object_find("Service", obj);
if (!val)
return false;
obj = json_value_object(val); // 将值转换为对象
if (!obj)
return false;
// 解析服务 ID
val = json_object_find("ID", obj);
if (!val)
return false;
str = json_value_string(val); // 将值转换为字符串
if (!str)
return false;
service->service_id = str; // 设置服务 ID
// 解析服务名称
val = json_object_find("Service", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
service->service_name = str; // 设置服务名称
// 解析命名空间
val = json_object_find("Namespace", obj);
if (val)
{
str = json_value_string(val);
if (!str)
return false;
service->service_namespace = str; // 设置命名空间
}
// 解析服务地址
val = json_object_find("Address", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
service->service_address.first = str; // 设置服务地址
// 解析服务端口
val = json_object_find("Port", obj);
if (!val)
return false;
service->service_address.second = json_value_number(val); // 设置服务端口
// 解析带标签地址
val = json_object_find("TaggedAddresses", obj);
if (!val)
return false;
parse_tagged_address("lan", val, service->lan);
parse_tagged_address("lan_ipv4", val, service->lan_ipv4);
parse_tagged_address("lan_ipv6", val, service->lan_ipv6);
parse_tagged_address("virtual", val, service->virtual_address);
parse_tagged_address("wan", val, service->wan);
parse_tagged_address("wan_ipv4", val, service->wan_ipv4);
parse_tagged_address("wan_ipv6", val, service->wan_ipv6);
// 解析标签列表
val = json_object_find("Tags", obj);
if (!val)
return false;
const json_array_t *tags_arr = json_value_array(val); // 获取数组
if (tags_arr)
{
const json_value_t *tags_value;
json_array_for_each(tags_value, tags_arr) // 遍历数组
{
str = json_value_string(tags_value); // 获取标签字符串
if (!str)
return false;
service->tags.emplace_back(str); // 添加标签到服务
}
}
// 解析元数据
val = json_object_find("Meta", obj);
if (!val)
return false;
const json_object_t *meta_obj = json_value_object(val); // 获取元数据对象
if (!meta_obj)
return false;
const char *meta_k;
const json_value_t *meta_v;
json_object_for_each(meta_k, meta_v, meta_obj) // 遍历元数据
{
str = json_value_string(meta_v);
if (!str)
return false;
service->meta[meta_k] = str; // 添加元数据键值对
}
// 检查标签覆盖选项
val = json_object_find("EnableTagOverride", obj);
if (val)
service->tag_override = (json_value_type(val) == JSON_VALUE_TRUE);
return true; // 返回成功
}
// 解析健康检查信息
static bool parse_health_check(const json_object_t *obj,
struct ConsulServiceInstance *instance)
{
const json_value_t *val; // 存储 JSON 值
const char *str; // 存储字符串值
// 获取 "Checks" 数组
val = json_object_find("Checks", obj);
if (!val)
return false;
const json_array_t *check_arr = json_value_array(val); // 转换为数组
if (!check_arr)
return false;
const json_value_t *arr_val;
json_array_for_each(arr_val, check_arr) // 遍历数组
{
obj = json_value_object(arr_val); // 获取数组中的对象
if (!obj)
return false;
// 解析服务名称
val = json_object_find("ServiceName", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
std::string check_service_name = str;
// 解析服务 ID
val = json_object_find("ServiceID", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
std::string check_service_id = str;
if (check_service_id.empty() || check_service_name.empty())
continue;
// 解析检查 ID
val = json_object_find("CheckID", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
instance->check_id = str;
// 解析检查名称
val = json_object_find("Name", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
instance->check_name = str;
// 解析检查状态
val = json_object_find("Status", obj);
if (!val)
return false;
str = json_value_string(val);
if (!str)
return false;
instance->check_status = str;
// 解析备注
val = json_object_find("Notes", obj);
if (val)
{
str = json_value_string(val);
if (!str)
return false;
instance->check_notes = str;
}
// 解析输出
val = json_object_find("Output", obj);
if (val)
{
str = json_value_string(val);
if (!str)
return false;
instance->check_output = str;
}
// 解析检查类型
val = json_object_find("Type", obj);
if (val)
{
str = json_value_string(val);
if (!str)
return false;
instance->check_type = str;
}
break; // 只解析一个有效的健康检查
}
return true; // 返回成功
}
// 解析服务发现结果
static bool parse_discover_result(const json_value_t *root,
std::vector<struct ConsulServiceInstance>& result)
{
const json_array_t *arr = json_value_array(root); // 转换为数组
const json_value_t *val; // 存储数组中的值
const json_object_t *obj; // 存储 JSON 对象
if (!arr)
return false;
// 遍历数组
json_array_for_each(val, arr)
{
struct ConsulServiceInstance instance; // 定义服务实例
obj = json_value_object(val); // 获取数组中的对象
if (!obj)
return false;
// 解析节点信息
if (!parse_discover_node(obj, &instance))
return false;
// 解析服务信息
if (!parse_service(obj, &instance.service))
return false;
// 解析健康检查
parse_health_check(obj, &instance);
result.emplace_back(std::move(instance)); // 添加实例到结果
}
return true; // 返回成功
}
// 打印 JSON 对象,递归地将其转换为字符串格式
static void print_json_object(const json_object_t *obj, int depth,
std::string& json_str)
{
const char *name; // JSON 对象的键
const json_value_t *val; // JSON 对象的值
int n = 0; // 键值对的计数器
int i;
json_str += "{\n"; // 开始对象
// 遍历 JSON 对象中的每个键值对
json_object_for_each(name, val, obj)
{
if (n != 0) // 如果不是第一个键值对,添加逗号换行
json_str += ",\n";
n++; // 键值对计数加 1
for (i = 0; i < depth + 1; i++) // 根据深度添加缩进
json_str += " ";
// 添加键
json_str += "\"";
json_str += name;
json_str += "\": ";
// 递归打印值
print_json_value(val, depth + 1, json_str);
}
json_str += "\n"; // 对象内容结束
for (i = 0; i < depth; i++) // 根据深度添加缩进
json_str += " ";
json_str += "}"; // 结束对象
}
// 打印 JSON 数组,递归地将其转换为字符串格式
static void print_json_array(const json_array_t *arr, int depth,
std::string& json_str)
{
const json_value_t *val; // JSON 数组的值
int n = 0; // 数组值的计数器
int i;
json_str += "[\n"; // 开始数组
// 遍历 JSON 数组中的每个值
json_array_for_each(val, arr)
{
if (n != 0) // 如果不是第一个值,添加逗号换行
json_str += ",\n";
n++; // 值计数加 1
for (i = 0; i < depth + 1; i++) // 根据深度添加缩进
json_str += " ";
// 递归打印值
print_json_value(val, depth + 1, json_str);
}
json_str += "\n"; // 数组内容结束
for (i = 0; i < depth; i++) // 根据深度添加缩进
json_str += " ";
json_str += "]"; // 结束数组
}
// 打印 JSON 字符串,处理特殊字符转义
static void print_json_string(const char *str, std::string& json_str)
{
json_str += "\""; // 开始字符串
// 遍历字符串中的每个字符
while (*str)
{
switch (*str)
{
case '\r': // 回车符转义
json_str += "\\r";
break;
case '\n': // 换行符转义
json_str += "\\n";
break;
case '\f': // 换页符转义
json_str += "\\f";
break;
case '\b': // 退格符转义
json_str += "\\b";
break;
case '\"': // 双引号转义
json_str += "\\\"";
break;
case '\t': // 制表符转义
json_str += "\\t";
break;
case '\\': // 反斜杠转义
json_str += "\\\\";
break;
default: // 其他字符直接添加
json_str += *str;
break;
}
str++; // 指向下一个字符
}
json_str += "\""; // 结束字符串
}
// 打印 JSON 数字
static void print_json_number(double number, std::string& json_str)
{
long long integer = number; // 检查数字是否是整数
if (integer == number) // 如果是整数
json_str += std::to_string(integer); // 转换为整数字符串
else // 如果是浮点数
json_str += std::to_string(number); // 转换为浮点数字符串
}
// 根据值的类型打印 JSON 值
static void print_json_value(const json_value_t *val, int depth,
std::string& json_str)
{
switch (json_value_type(val)) // 获取值的类型
{
case JSON_VALUE_STRING: // 如果是字符串
print_json_string(json_value_string(val), json_str);
break;
case JSON_VALUE_NUMBER: // 如果是数字
print_json_number(json_value_number(val), json_str);
break;
case JSON_VALUE_OBJECT: // 如果是对象
print_json_object(json_value_object(val), depth, json_str);
break;
case JSON_VALUE_ARRAY: // 如果是数组
print_json_array(json_value_array(val), depth, json_str);
break;
case JSON_VALUE_TRUE: // 如果是布尔值 true
json_str += "true";
break;
case JSON_VALUE_FALSE: // 如果是布尔值 false
json_str += "false";
break;
case JSON_VALUE_NULL: // 如果是 null
json_str += "null";
break;
}
}