diff --git a/src%2Fkernel/CommScheduler.h b/src%2Fkernel/CommScheduler.h new file mode 100644 index 0000000..b8d7b99 --- /dev/null +++ b/src%2Fkernel/CommScheduler.h @@ -0,0 +1,152 @@ +// 版权声明,表明该文件是在Apache License 2.0下授权的。 +/* + Copyright (c) 2019 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#ifndef _COMMSCHEDULER_H_ // 预处理指令,防止头文件内容被重复包含。 +#define _COMMSCHEDULER_H_ + +#include // 包含系统类型定义。 +#include // 包含套接字相关定义。 +#include // 包含POSIX线程库。 +#include // 包含SSL库。 +#include "Communicator.h" // 包含通信器类定义。 + +// 通信调度对象类,用于获取最大和当前负载。 +class CommSchedObject +{ +public: + size_t get_max_load() const { return this->max_load; } // 获取最大负载。 + size_t get_cur_load() const { return this->cur_load; } // 获取当前负载。 + +private: + virtual CommTarget *acquire(int wait_timeout) = 0; // 纯虚函数,用于获取CommTarget对象。 + +protected: + size_t max_load; // 最大负载。 + size_t cur_load; // 当前负载。 + +public: + virtual ~CommSchedObject() { } // 虚析构函数。 + friend class CommScheduler; // CommScheduler类是其友元类。 +}; + +// 通信调度组类,管理一组CommSchedTarget对象。 +class CommSchedGroup; + +class CommSchedTarget : public CommSchedObject, public CommTarget // 继承CommSchedObject和CommTarget。 +{ +public: + int init(const struct sockaddr *addr, socklen_t addrlen, // 初始化函数,用于非SSL连接。 + int connect_timeout, int response_timeout, + size_t max_connections); + void deinit(); // 反初始化函数。 + +public: + int init(const struct sockaddr *addr, socklen_t addrlen, SSL_CTX *ssl_ctx, // 初始化函数,用于SSL连接。 + int connect_timeout, int ssl_connect_timeout, int response_timeout, + size_t max_connections); + { + int ret = this->init(addr, addrlen, connect_timeout, response_timeout, + max_connections); + + if (ret >= 0) + this->set_ssl(ssl_ctx, ssl_connect_timeout); + + return ret; + } + +private: + virtual CommTarget *acquire(int wait_timeout); // 获取CommTarget对象的实现。 + virtual void release(); // 释放CommTarget对象的实现。 + +private: + CommSchedGroup *group; // 所属的CommSchedGroup对象。 + int index; // 在组中的索引。 + int wait_cnt; // 等待计数。 + pthread_mutex_t mutex; // 互斥锁。 + pthread_cond_t cond; // 条件变量。 + friend class CommSchedGroup; // CommSchedGroup类是其友元类。 +}; + +// 通信调度组类,管理一组CommSchedTarget对象。 +class CommSchedGroup : public CommSchedObject +{ +public: + int init(); // 初始化函数。 + void deinit(); // 反初始化函数。 + int add(CommSchedTarget *target); // 添加CommSchedTarget对象。 + int remove(CommSchedTarget *target); // 移除CommSchedTarget对象。 + +private: + virtual CommTarget *acquire(int wait_timeout); // 获取CommTarget对象的实现。 + +private: + CommSchedTarget **tg_heap; // 使用堆管理CommSchedTarget对象。 + int heap_size; // 堆大小。 + int heap_buf_size; // 堆缓冲区大小。 + int wait_cnt; // 等待计数。 + pthread_mutex_t mutex; // 互斥锁。 + pthread_cond_t cond; // 条件变量。 + +private: + static int target_cmp(CommSchedTarget *target1, CommSchedTarget *target2); // 比较两个CommSchedTarget对象。 + void heapify(int top); // 堆化。 + void heap_adjust(int index, int swap_on_equal); // 堆调整。 + int heap_insert(CommSchedTarget *target); // 插入到堆中。 + void heap_remove(int index); // 从堆中移除。 + friend class CommSchedTarget; // CommSchedTarget类是其友元类。 +}; + +// 通信调度器类,用于管理和调度通信会话。 +class CommScheduler +{ +public: + int init(size_t poller_threads, size_t handler_threads); // 初始化函数。 + void deinit(); // 反初始化函数。 + + /* wait_timeout in milliseconds, -1 for no timeout. */ + int request(CommSession *session, CommSchedObject *object, // 请求处理函数。 + int wait_timeout, CommTarget **target); + int reply(CommSession *session); // 回复处理函数。 + int shutdown(CommSession *session); // 关闭会话处理函数。 + int push(const void *buf, size_t size, CommSession *session); // 数据推送处理函数。 + int bind(CommService *service); // 绑定服务处理函数。 + void unbind(CommService *service); // 解绑服务处理函数。 + + /* for sleepers. */ + int sleep(SleepSession *session); // 睡眠处理函数。 + /* Call 'unsleep' only before 'handle()' returns. */ + int unsleep(SleepSession *session); // 唤醒处理函数。 + + /* for file aio services. */ + int io_bind(IOService *service); // 文件AIO服务绑定处理函数。 + void io_unbind(IOService *service); // 文件AIO服务解绑处理函数。 + +public: + int is_handler_thread() const; // 判断是否是处理线程。 + int increase_handler_thread(); // 增加处理线程。 + int decrease_handler_thread(); // 减少处理线程。 + +private: + Communicator comm; // 通信器对象。 + +public: + virtual ~CommScheduler() { } // 虚析构函数。 +}; + +#endif \ No newline at end of file