From 3a1ae8dfdfbf2b75e8a6d4c37ebebbc51c044189 Mon Sep 17 00:00:00 2001 From: cmz <3256005191@qq.com> Date: Mon, 16 Dec 2024 22:46:43 +0800 Subject: [PATCH] cmz --- src/kernel/CommScheduler - 改.h | 158 ++++++++++++++++ src/kernel/Communicator - 改.h | 304 +++++++++++++++++++++++++++++++ 2 files changed, 462 insertions(+) create mode 100644 src/kernel/CommScheduler - 改.h create mode 100644 src/kernel/Communicator - 改.h diff --git a/src/kernel/CommScheduler - 改.h b/src/kernel/CommScheduler - 改.h new file mode 100644 index 0000000..42561f2 --- /dev/null +++ b/src/kernel/CommScheduler - 改.h @@ -0,0 +1,158 @@ +// ¶¨ÒåÁËÒ»¸öͨÐŵ÷¶È¶ÔÏó£¬Ëü°üº¬ÁË×î´ó¸ºÔغ͵±Ç°¸ºÔصÄÊôÐÔ¡£ +class CommSchedObject +{ +public: + // ·µ»Ø×î´ó¸ºÔØ + size_t get_max_load() const { return this->max_load; } + // ·µ»Øµ±Ç°¸ºÔØ + size_t get_cur_load() const { return this->cur_load; } + +private: + // ´¿Ð麯Êý£¬ÓÃÓÚ»ñȡһ¸öCommTarget¶ÔÏ󣬾ßÌåµÄʵÏÖÓÉ×ÓÀàÍê³É + virtual CommTarget* acquire(int wait_timeout) = 0; + +protected: + // ×î´ó¸ºÔØ + size_t max_load; + // µ±Ç°¸ºÔØ + size_t cur_load; + +public: + // Îö¹¹º¯Êý + virtual ~CommSchedObject() { } + // ÓÑÔªÀàÉùÃ÷£¬ÔÊÐíCommScheduler·ÃÎÊ˽ÓгÉÔ± + friend class CommScheduler; +}; + +// ¶¨ÒåÁËÒ»¸öͨÐŵ÷¶È×飬Ëü°üº¬ÁËÒ»¸öCommSchedObjectµÄÁÐ±í¡£ +class CommSchedGroup; + +// ¶¨ÒåÁËÒ»¸öͨÐŵ÷¶ÈÄ¿±ê£¬ËüÊÇCommSchedObjectºÍCommTargetµÄ×ÓÀà¡£ +class CommSchedTarget : public CommSchedObject, public CommTarget +{ +public: + // ³õʼ»¯º¯Êý£¬ÓÃÓÚÉèÖÃÄ¿±êµØÖ·¡¢Á¬½Ó³¬Ê±µÈ²ÎÊý + int init(const struct sockaddr* addr, socklen_t addrlen, + int connect_timeout, int response_timeout, + size_t max_connections); + // ·´³õʼ»¯º¯Êý£¬ÓÃÓÚÇåÀí×ÊÔ´ + void deinit(); + + // ʹÓÃSSLµÄ³õʼ»¯º¯Êý + int init(const struct sockaddr* addr, socklen_t addrlen, SSL_CTX* ssl_ctx, + int connect_timeout, int ssl_connect_timeout, int response_timeout, + size_t max_connections) + { + // µ÷Ó÷ÇSSL°æ±¾µÄinitº¯Êý£¬È»ºóÉèÖÃSSLÉÏÏÂÎÄ + // ... + } + +private: + // ×îÖÕÖØÐ´µÄacquireºÍreleaseº¯Êý + virtual CommTarget* acquire(int wait_timeout); /* final */ + virtual void release(); /* final */ + +private: + // Ö¸ÏòËùÊôµÄCommSchedGroup + CommSchedGroup* group; + // Ë÷Òý + int index; + // µÈ´ý¼ÆÊý + int wait_cnt; + // »¥³âËøºÍÌõ¼þ±äÁ¿ + pthread_mutex_t mutex; + pthread_cond_t cond; + // ÓÑÔªÀàÉùÃ÷£¬ÔÊÐíCommSchedGroup·ÃÎÊ˽ÓгÉÔ± + friend class CommSchedGroup; +}; + +// ¶¨ÒåÁËÒ»¸öͨÐŵ÷¶È×飬ËüÊÇCommSchedObjectµÄ×ÓÀà¡£ +class CommSchedGroup : public CommSchedObject +{ +public: + // ³õʼ»¯ºÍ·´³õʼ»¯º¯Êý + int init(); + void deinit(); + // Ìí¼ÓºÍÒÆ³ýCommSchedTarget + int add(CommSchedTarget* target); + int remove(CommSchedTarget* target); + +private: + // ×îÖÕÖØÐ´µÄacquireº¯Êý + virtual CommTarget* acquire(int wait_timeout); /* final */ + +private: + // ÓÃÓÚ´æ´¢CommSchedTargetµÄ¶ÑÊý×é + CommSchedTarget** tg_heap; + // ¶ÑµÄ´óСºÍ»º³åÇø´óС + int heap_size; + int heap_buf_size; + // µÈ´ý¼ÆÊý + int wait_cnt; + // »¥³âËøºÍÌõ¼þ±äÁ¿ + pthread_mutex_t mutex; + pthread_cond_t cond; + + // ¾²Ì¬±È½Ïº¯Êý£¬ÓÃÓÚ¶ÑÅÅÐò + static int target_cmp(CommSchedTarget* target1, CommSchedTarget* target2); + // ¶ÑÏà¹Ø²Ù×÷º¯Êý + void heapify(int top); + void heap_adjust(int index, int swap_on_equal); + int heap_insert(CommSchedTarget* target); + void heap_remove(int index); + // ÓÑÔªÀàÉùÃ÷£¬ÔÊÐíCommSchedTarget·ÃÎÊ˽ÓгÉÔ± + friend class CommSchedTarget; +}; + +// ¶¨ÒåÁËÒ»¸öͨÐŵ÷¶ÈÆ÷£¬Ëü°üº¬ÁËÒ»¸öCommunicator¶ÔÏó¡£ +class CommScheduler +{ +public: + // ³õʼ»¯ºÍ·´³õʼ»¯º¯Êý + int init(size_t poller_threads, size_t handler_threads) + { + // µ÷ÓÃCommunicatorµÄ³õʼ»¯º¯Êý + // ... + } + + void deinit() + { + // µ÷ÓÃCommunicatorµÄ·´³õʼ»¯º¯Êý + // ... + } + + // ÇëÇóº¯Êý£¬ÓÃÓÚ»ñȡһ¸öCommTarget²¢·¢ÆðÇëÇó + int request(CommSession* session, CommSchedObject* object, + int wait_timeout, CommTarget** target) + { + // ʵÏÖϸ½Ú... + } + + // »Ø¸´¡¢¹Ø±Õ¡¢ÍÆËÍÊý¾Ý¡¢°ó¶¨·þÎñµÈº¯Êý + // ... + +public: + // ¼ì²éµ±Ç°Ïß³ÌÊÇ·ñÊÇ´¦ÀíỊ̈߳¬Ôö¼Ó»ò¼õÉÙ´¦ÀíÏ̵߳ĺ¯Êý + int is_handler_thread() const + { + // ʵÏÖϸ½Ú... + } + + int increase_handler_thread() + { + // ʵÏÖϸ½Ú... + } + + int decrease_handler_thread() + { + // ʵÏÖϸ½Ú... + } + +private: + // ÄÚ²¿µÄCommunicator¶ÔÏó + Communicator comm; + +public: + // Îö¹¹º¯Êý + virtual ~CommScheduler() { } +}; \ No newline at end of file diff --git a/src/kernel/Communicator - 改.h b/src/kernel/Communicator - 改.h new file mode 100644 index 0000000..e120b8d --- /dev/null +++ b/src/kernel/Communicator - 改.h @@ -0,0 +1,304 @@ +// ÉùÃ÷ÁËÒ»¸öͨÐÅÁ¬½ÓµÄ»ùÀà +class CommConnection +{ +public: + // ÐéÎö¹¹º¯Êý£¬ÒÔÈ·±£ÅÉÉúÀàµÄÎö¹¹º¯Êý±»ÕýÈ·µ÷Óà + virtual ~CommConnection() { } +}; + +// ÉùÃ÷ÁËÒ»¸öͨÐÅÄ¿±êÀ࣬ÓÃÓÚ±íʾÁ¬½ÓµÄÄ¿±êµØÖ· +class CommTarget +{ +public: + // ³õʼ»¯º¯Êý£¬ÉèÖÃÄ¿±êµØÖ·ºÍ³¬Ê±²ÎÊý + int init(const struct sockaddr* addr, socklen_t addrlen, + int connect_timeout, int response_timeout); + // ·´³õʼ»¯º¯Êý£¬ÇåÀí×ÊÔ´ + void deinit(); + + // »ñȡĿ±êµØÖ· + void get_addr(const struct sockaddr** addr, socklen_t* addrlen) const + { + // ... + } + + // ¼ì²éÊÇ·ñÓпÕÏÐÁ¬½Ó + int has_idle_conn() const { return !list_empty(&this->idle_list); } + +protected: + // ÉèÖÃSSLÉÏÏÂÎĺͳ¬Ê± + void set_ssl(SSL_CTX* ssl_ctx, int ssl_connect_timeout) + { + // ... + } + + // »ñÈ¡SSLÉÏÏÂÎÄ + SSL_CTX* get_ssl_ctx() const { return this->ssl_ctx; } + +private: + // ´´½¨Á¬½ÓÎļþÃèÊö·ûµÄ´¿Ð麯Êý + virtual int create_connect_fd() + { + // ... + } + + // ´´½¨ÐµÄÁ¬½Ó¶ÔÏóµÄ´¿Ð麯Êý + virtual CommConnection* new_connection(int connect_fd) + { + // ... + } + + // ³õʼ»¯SSLµÄÐ麯Êý + virtual int init_ssl(SSL* ssl) { return 0; } + + // Ä¿±êµØÖ·¡¢³¤¶È¡¢³¬Ê±²ÎÊýºÍSSLÉÏÏÂÎÄ + struct sockaddr* addr; + socklen_t addrlen; + int connect_timeout; + int response_timeout; + int ssl_connect_timeout; + SSL_CTX* ssl_ctx; + + // ¿ÕÏÐÁ¬½ÓÁбíºÍ»¥³âËø + struct list_head idle_list; + pthread_mutex_t mutex; + +public: + // ÐéÎö¹¹º¯Êý + virtual ~CommTarget() { } + // ÓÑÔªÀàÉùÃ÷ + friend class CommServiceTarget; + friend class Communicator; +}; + +// ÉùÃ÷ÁËÒ»¸öÏûÏ¢Êä³öµÄ³éÏóÀà +class CommMessageOut +{ +private: + // ±àÂëÏûÏ¢µ½iovecÊý×éµÄ´¿Ð麯Êý + virtual int encode(struct iovec vectors[], int max) = 0; + +public: + // ÐéÎö¹¹º¯Êý + virtual ~CommMessageOut() { } + // ÓÑÔªÀàÉùÃ÷ + friend class Communicator; +}; + +// ÉùÃ÷ÁËÒ»¸öÏûÏ¢ÊäÈëµÄ³éÏóÀ࣬¼Ì³Ð×Ôpoller_message_t +class CommMessageIn : private poller_message_t +{ +private: + // ×·¼ÓÊý¾Ýµ½ÏûÏ¢µÄ´¿Ð麯Êý + virtual int append(const void* buf, size_t* size) = 0; + +protected: + // ·´À¡º¯Êý£¬ÓÃÓÚÔÚ½ÓÊÕʱ·¢ËÍСÊý¾Ý°ü + virtual int feedback(const void* buf, size_t size); + + // ¸üнÓÊÕ¿ªÊ¼Ê±¼ä + virtual void renew(); + + // »ñÈ¡×îÄÚ²ã°ü×°µÄÏûÏ¢ + virtual CommMessageIn* inner() { return this; } + +private: + // Á¬½ÓÈë¿Ú + struct CommConnEntry* entry; + +public: + // ÐéÎö¹¹º¯Êý + virtual ~CommMessageIn() { } + // ÓÑÔªÀàÉùÃ÷ + friend class Communicator; +}; + +// ¶¨ÒåÁ˻Ự״̬µÄºê +#define CS_STATE_SUCCESS 0 +#define CS_STATE_ERROR 1 +#define CS_STATE_STOPPED 2 +#define CS_STATE_TOREPLY 3 /* ½öÓÃÓÚ·þÎñ»á»° */ + +// ÉùÃ÷ÁËÒ»¸öͨÐŻỰµÄ³éÏóÀà +class CommSession +{ +private: + // »ñÈ¡ÏûÏ¢Êä³ö¡¢ÊäÈë¶ÔÏóµÄ´¿Ð麯Êý + virtual CommMessageOut* message_out() = 0; + virtual CommMessageIn* message_in() = 0; + // »ñÈ¡³¬Ê±²ÎÊýµÄÐ麯Êý + virtual int send_timeout() { return -1; } + virtual int receive_timeout() { return -1; } + virtual int keep_alive_timeout() { return 0; } + virtual int first_timeout() { return 0; } + // ´¦Àí»á»°×´Ì¬µÄ´¿Ð麯Êý + virtual void handle(int state, int error) = 0; + +protected: + // »ñȡĿ±ê¡¢Á¬½Ó¡¢ÏûÏ¢¶ÔÏóºÍÐòÁкŠ+ CommTarget* get_target() const { return this->target; } + CommConnection* get_connection() const { return this->conn; } + CommMessageOut* get_message_out() const { return this->out; } + CommMessageIn* get_message_in() const { return this->in; } + long long get_seq() const { return this->seq; } + +private: + // »á»°µÄÄ¿±ê¡¢Á¬½Ó¡¢ÏûÏ¢¶ÔÏó¡¢ÐòÁкš¢¿ªÊ¼Ê±¼äºÍ³¬Ê±²ÎÊý + CommTarget* target; + CommConnection* conn; + CommMessageOut* out; + CommMessageIn* in; + long long seq; + + struct timespec begin_time; + int timeout; + int passive; + +public: + // ¹¹Ô캯Êý + CommSession() { this->passive = 0; } + // ÐéÎö¹¹º¯Êý + virtual ~CommSession(); + // ÓÑÔªÀàÉùÃ÷ + friend class CommMessageIn; + friend class Communicator; +}; + +// ÉùÃ÷ÁËÒ»¸öͨÐÅ·þÎñµÄ³éÏóÀà +class CommService +{ +public: + // ³õʼ»¯ºÍ·´³õʼ»¯º¯Êý + int init(const struct sockaddr* bind_addr, socklen_t addrlen, + int listen_timeout, int response_timeout); + void deinit(); + + // ÇåÀí»î¶¯µÄ»á»° + int drain(int max); + +public: + // »ñÈ¡°ó¶¨µØÖ· + void get_addr(const struct sockaddr** addr, socklen_t* addrlen) const + { + // ... + } + +protected: + // ÉèÖÃSSLÉÏÏÂÎĺͳ¬Ê± + void set_ssl(SSL_CTX* ssl_ctx, int ssl_accept_timeout) + { + // ... + } + + // »ñÈ¡SSLÉÏÏÂÎÄ + SSL_CTX* get_ssl_ctx() const { return this->ssl_ctx; } + +private: + // ´´½¨ÐµĻỰ¶ÔÏóµÄ´¿Ð麯Êý + virtual CommSession* new_session(long long seq, CommConnection* conn) = 0; + // ´¦ÀíÍ£Ö¹ºÍδ°ó¶¨Çé¿öµÄÐ麯Êý + virtual void handle_stop(int error) { } + virtual void handle_unbound() = 0; + +private: + // ´´½¨¼àÌýÎļþÃèÊö·ûµÄÐ麯Êý + virtual int create_listen_fd() + { + // ... + } + + // ´´½¨ÐµÄÁ¬½Ó¶ÔÏóµÄÐ麯Êý + virtual CommConnection* new_connection(int accept_fd) + { + // ... + } + + // ³õʼ»¯SSLµÄÐ麯Êý + virtual int init_ssl(SSL* ssl) { return 0; } + + // °ó¶¨µØÖ·¡¢³¤¶È¡¢³¬Ê±²ÎÊýºÍSSLÉÏÏÂÎÄ + struct sockaddr* bind_addr; + socklen_t addrlen; + int listen_timeout; + int response_timeout; + int ssl_accept_timeout; + SSL_CTX* ssl_ctx; + + // ÒýÓüÆÊýºÍ»î¶¯»á»°Áбí + void incref(); + void decref(); + + int reliable; + int listen_fd; + int ref; + + struct list_head alive_list; + pthread_mutex_t mutex; + +public: + // ÐéÎö¹¹º¯Êý + virtual ~CommService() { } + // ÓÑÔªÀàÉùÃ÷ + friend class CommServiceTarget; + friend class Communicator; +}; + +// ¶¨ÒåÁË˯Ã߻Ự״̬µÄºê +#define SS_STATE_COMPLETE 0 +#define SS_STATE_ERROR 1 +#define SS_STATE_DISRUPTED 2 + +// ÉùÃ÷ÁËÒ»¸ö˯Ã߻ỰµÄ³éÏóÀà +class SleepSession +{ +private: + // »ñÈ¡³ÖÐøÊ±¼äºÍ´¦Àí˯Ã߻Ự״̬µÄ´¿Ð麯Êý + virtual int duration(struct timespec* value) = 0; + virtual void handle(int state, int error) = 0; + +private: + // ¶¨Ê±Æ÷ºÍË÷Òý + void* timer; + int index; + +public: + // ÐéÎö¹¹º¯Êý + virtual ~SleepSession() { } + // ÓÑÔªÀàÉùÃ÷ + friend class Communicator; +}; + +// ¸ù¾Ý²Ù×÷ϵͳ°üº¬²»Í¬µÄIO·þÎñʵÏÖ +#ifdef __linux__ +# include "IOService_linux.h" +#else +# include "IOService_thread.h" +#endif + +// ÉùÃ÷ÁËͨÐÅÆ÷À࣬¸ºÔð¹ÜÀí»á»°¡¢·þÎñºÍIO²Ù×÷ +class Communicator +{ +public: + // ³õʼ»¯ºÍ·´³õʼ»¯º¯Êý + int init(size_t poller_threads, size_t handler_threads); + void deinit(); + + // ÇëÇ󡢻ظ´¡¢ÍÆËÍÊý¾Ý¡¢¹Ø±Õ»á»°¡¢°ó¶¨·þÎñµÈº¯Êý + int request(CommSession* session, CommTarget* target); + int reply(CommSession* session); + int push(const void* buf, size_t size, CommSession* session); + int shutdown(CommSession* session); + int bind(CommService* service); + void unbind(CommService* service); + + // ˯ÃߺÍÈ¡Ïû˯Ã߻ỰµÄº¯Êý + int sleep(SleepSession* session); + int unsleep(SleepSession* session); + + // °ó¶¨ºÍ½â°óIO·þÎñµÄº¯Êý + int io_bind(IOService* service); + void io_unbind(IOService* service); + +public: + // ¼ì²éµ±Ç°Ïß³ÌÊÇ·ñÊÇ´¦ÀíỊ̈߳¬Ôö¼Ó»ò¼õÉÙ´¦ÀíÏ̵߳ĺ¯Êý + int is_handler_thread() const \ No newline at end of file