cmz 9 months ago
parent 4b2fafbc15
commit 3a1ae8dfdf

@ -0,0 +1,158 @@
// 定义了一个通信调度对象,它包含了最大负载和当前负载的属性。
class CommSchedObject
{
public:
// 返回最大负载
size_t get_max_load() const { return this->max_load; }
// 返回当前负载
size_t get_cur_load() const { return this->cur_load; }
private:
// 纯虚函数用于获取一个CommTarget对象具体的实现由子类完成
virtual CommTarget* acquire(int wait_timeout) = 0;
protected:
// 最大负载
size_t max_load;
// 当前负载
size_t cur_load;
public:
// 析构函数
virtual ~CommSchedObject() { }
// 友元类声明允许CommScheduler访问私有成员
friend class CommScheduler;
};
// 定义了一个通信调度组它包含了一个CommSchedObject的列表。
class CommSchedGroup;
// 定义了一个通信调度目标它是CommSchedObject和CommTarget的子类。
class CommSchedTarget : public CommSchedObject, public CommTarget
{
public:
// 初始化函数,用于设置目标地址、连接超时等参数
int init(const struct sockaddr* addr, socklen_t addrlen,
int connect_timeout, int response_timeout,
size_t max_connections);
// 反初始化函数,用于清理资源
void deinit();
// 使用SSL的初始化函数
int init(const struct sockaddr* addr, socklen_t addrlen, SSL_CTX* ssl_ctx,
int connect_timeout, int ssl_connect_timeout, int response_timeout,
size_t max_connections)
{
// 调用非SSL版本的init函数然后设置SSL上下文
// ...
}
private:
// 最终重写的acquire和release函数
virtual CommTarget* acquire(int wait_timeout); /* final */
virtual void release(); /* final */
private:
// 指向所属的CommSchedGroup
CommSchedGroup* group;
// 索引
int index;
// 等待计数
int wait_cnt;
// 互斥锁和条件变量
pthread_mutex_t mutex;
pthread_cond_t cond;
// 友元类声明允许CommSchedGroup访问私有成员
friend class CommSchedGroup;
};
// 定义了一个通信调度组它是CommSchedObject的子类。
class CommSchedGroup : public CommSchedObject
{
public:
// 初始化和反初始化函数
int init();
void deinit();
// 添加和移除CommSchedTarget
int add(CommSchedTarget* target);
int remove(CommSchedTarget* target);
private:
// 最终重写的acquire函数
virtual CommTarget* acquire(int wait_timeout); /* final */
private:
// 用于存储CommSchedTarget的堆数组
CommSchedTarget** tg_heap;
// 堆的大小和缓冲区大小
int heap_size;
int heap_buf_size;
// 等待计数
int wait_cnt;
// 互斥锁和条件变量
pthread_mutex_t mutex;
pthread_cond_t cond;
// 静态比较函数,用于堆排序
static int target_cmp(CommSchedTarget* target1, CommSchedTarget* target2);
// 堆相关操作函数
void heapify(int top);
void heap_adjust(int index, int swap_on_equal);
int heap_insert(CommSchedTarget* target);
void heap_remove(int index);
// 友元类声明允许CommSchedTarget访问私有成员
friend class CommSchedTarget;
};
// 定义了一个通信调度器它包含了一个Communicator对象。
class CommScheduler
{
public:
// 初始化和反初始化函数
int init(size_t poller_threads, size_t handler_threads)
{
// 调用Communicator的初始化函数
// ...
}
void deinit()
{
// 调用Communicator的反初始化函数
// ...
}
// 请求函数用于获取一个CommTarget并发起请求
int request(CommSession* session, CommSchedObject* object,
int wait_timeout, CommTarget** target)
{
// 实现细节...
}
// 回复、关闭、推送数据、绑定服务等函数
// ...
public:
// 检查当前线程是否是处理线程,增加或减少处理线程的函数
int is_handler_thread() const
{
// 实现细节...
}
int increase_handler_thread()
{
// 实现细节...
}
int decrease_handler_thread()
{
// 实现细节...
}
private:
// 内部的Communicator对象
Communicator comm;
public:
// 析构函数
virtual ~CommScheduler() { }
};

@ -0,0 +1,304 @@
// 声明了一个通信连接的基类
class CommConnection
{
public:
// 虚析构函数,以确保派生类的析构函数被正确调用
virtual ~CommConnection() { }
};
// 声明了一个通信目标类,用于表示连接的目标地址
class CommTarget
{
public:
// 初始化函数,设置目标地址和超时参数
int init(const struct sockaddr* addr, socklen_t addrlen,
int connect_timeout, int response_timeout);
// 反初始化函数,清理资源
void deinit();
// 获取目标地址
void get_addr(const struct sockaddr** addr, socklen_t* addrlen) const
{
// ...
}
// 检查是否有空闲连接
int has_idle_conn() const { return !list_empty(&this->idle_list); }
protected:
// 设置SSL上下文和超时
void set_ssl(SSL_CTX* ssl_ctx, int ssl_connect_timeout)
{
// ...
}
// 获取SSL上下文
SSL_CTX* get_ssl_ctx() const { return this->ssl_ctx; }
private:
// 创建连接文件描述符的纯虚函数
virtual int create_connect_fd()
{
// ...
}
// 创建新的连接对象的纯虚函数
virtual CommConnection* new_connection(int connect_fd)
{
// ...
}
// 初始化SSL的虚函数
virtual int init_ssl(SSL* ssl) { return 0; }
// 目标地址、长度、超时参数和SSL上下文
struct sockaddr* addr;
socklen_t addrlen;
int connect_timeout;
int response_timeout;
int ssl_connect_timeout;
SSL_CTX* ssl_ctx;
// 空闲连接列表和互斥锁
struct list_head idle_list;
pthread_mutex_t mutex;
public:
// 虚析构函数
virtual ~CommTarget() { }
// 友元类声明
friend class CommServiceTarget;
friend class Communicator;
};
// 声明了一个消息输出的抽象类
class CommMessageOut
{
private:
// 编码消息到iovec数组的纯虚函数
virtual int encode(struct iovec vectors[], int max) = 0;
public:
// 虚析构函数
virtual ~CommMessageOut() { }
// 友元类声明
friend class Communicator;
};
// 声明了一个消息输入的抽象类继承自poller_message_t
class CommMessageIn : private poller_message_t
{
private:
// 追加数据到消息的纯虚函数
virtual int append(const void* buf, size_t* size) = 0;
protected:
// 反馈函数,用于在接收时发送小数据包
virtual int feedback(const void* buf, size_t size);
// 更新接收开始时间
virtual void renew();
// 获取最内层包装的消息
virtual CommMessageIn* inner() { return this; }
private:
// 连接入口
struct CommConnEntry* entry;
public:
// 虚析构函数
virtual ~CommMessageIn() { }
// 友元类声明
friend class Communicator;
};
// 定义了会话状态的宏
#define CS_STATE_SUCCESS 0
#define CS_STATE_ERROR 1
#define CS_STATE_STOPPED 2
#define CS_STATE_TOREPLY 3 /* 仅用于服务会话 */
// 声明了一个通信会话的抽象类
class CommSession
{
private:
// 获取消息输出、输入对象的纯虚函数
virtual CommMessageOut* message_out() = 0;
virtual CommMessageIn* message_in() = 0;
// 获取超时参数的虚函数
virtual int send_timeout() { return -1; }
virtual int receive_timeout() { return -1; }
virtual int keep_alive_timeout() { return 0; }
virtual int first_timeout() { return 0; }
// 处理会话状态的纯虚函数
virtual void handle(int state, int error) = 0;
protected:
// 获取目标、连接、消息对象和序列号
CommTarget* get_target() const { return this->target; }
CommConnection* get_connection() const { return this->conn; }
CommMessageOut* get_message_out() const { return this->out; }
CommMessageIn* get_message_in() const { return this->in; }
long long get_seq() const { return this->seq; }
private:
// 会话的目标、连接、消息对象、序列号、开始时间和超时参数
CommTarget* target;
CommConnection* conn;
CommMessageOut* out;
CommMessageIn* in;
long long seq;
struct timespec begin_time;
int timeout;
int passive;
public:
// 构造函数
CommSession() { this->passive = 0; }
// 虚析构函数
virtual ~CommSession();
// 友元类声明
friend class CommMessageIn;
friend class Communicator;
};
// 声明了一个通信服务的抽象类
class CommService
{
public:
// 初始化和反初始化函数
int init(const struct sockaddr* bind_addr, socklen_t addrlen,
int listen_timeout, int response_timeout);
void deinit();
// 清理活动的会话
int drain(int max);
public:
// 获取绑定地址
void get_addr(const struct sockaddr** addr, socklen_t* addrlen) const
{
// ...
}
protected:
// 设置SSL上下文和超时
void set_ssl(SSL_CTX* ssl_ctx, int ssl_accept_timeout)
{
// ...
}
// 获取SSL上下文
SSL_CTX* get_ssl_ctx() const { return this->ssl_ctx; }
private:
// 创建新的会话对象的纯虚函数
virtual CommSession* new_session(long long seq, CommConnection* conn) = 0;
// 处理停止和未绑定情况的虚函数
virtual void handle_stop(int error) { }
virtual void handle_unbound() = 0;
private:
// 创建监听文件描述符的虚函数
virtual int create_listen_fd()
{
// ...
}
// 创建新的连接对象的虚函数
virtual CommConnection* new_connection(int accept_fd)
{
// ...
}
// 初始化SSL的虚函数
virtual int init_ssl(SSL* ssl) { return 0; }
// 绑定地址、长度、超时参数和SSL上下文
struct sockaddr* bind_addr;
socklen_t addrlen;
int listen_timeout;
int response_timeout;
int ssl_accept_timeout;
SSL_CTX* ssl_ctx;
// 引用计数和活动会话列表
void incref();
void decref();
int reliable;
int listen_fd;
int ref;
struct list_head alive_list;
pthread_mutex_t mutex;
public:
// 虚析构函数
virtual ~CommService() { }
// 友元类声明
friend class CommServiceTarget;
friend class Communicator;
};
// 定义了睡眠会话状态的宏
#define SS_STATE_COMPLETE 0
#define SS_STATE_ERROR 1
#define SS_STATE_DISRUPTED 2
// 声明了一个睡眠会话的抽象类
class SleepSession
{
private:
// 获取持续时间和处理睡眠会话状态的纯虚函数
virtual int duration(struct timespec* value) = 0;
virtual void handle(int state, int error) = 0;
private:
// 定时器和索引
void* timer;
int index;
public:
// 虚析构函数
virtual ~SleepSession() { }
// 友元类声明
friend class Communicator;
};
// 根据操作系统包含不同的IO服务实现
#ifdef __linux__
# include "IOService_linux.h"
#else
# include "IOService_thread.h"
#endif
// 声明了通信器类负责管理会话、服务和IO操作
class Communicator
{
public:
// 初始化和反初始化函数
int init(size_t poller_threads, size_t handler_threads);
void deinit();
// 请求、回复、推送数据、关闭会话、绑定服务等函数
int request(CommSession* session, CommTarget* target);
int reply(CommSession* session);
int push(const void* buf, size_t size, CommSession* session);
int shutdown(CommSession* session);
int bind(CommService* service);
void unbind(CommService* service);
// 睡眠和取消睡眠会话的函数
int sleep(SleepSession* session);
int unsleep(SleepSession* session);
// 绑定和解绑IO服务的函数
int io_bind(IOService* service);
void io_unbind(IOService* service);
public:
// 检查当前线程是否是处理线程,增加或减少处理线程的函数
int is_handler_thread() const
Loading…
Cancel
Save