From 7db2d843b73cd1c74331b904eb9a5b4df6cfac33 Mon Sep 17 00:00:00 2001 From: cmz <18570688110@163.com> Date: Tue, 24 Dec 2024 21:42:38 +0800 Subject: [PATCH] 11 --- src/kernel/mpoller - 改.c | 122 ++++++++++++++ src/kernel/msgqueue - 改.c | 211 ++++++++++++++++++++++++ src/kernel/thrdpool - 改.c | 318 ++++++++++++++++++++++++++++++++++++ 3 files changed, 651 insertions(+) create mode 100644 src/kernel/mpoller - 改.c create mode 100644 src/kernel/msgqueue - 改.c create mode 100644 src/kernel/thrdpool - 改.c diff --git a/src/kernel/mpoller - 改.c b/src/kernel/mpoller - 改.c new file mode 100644 index 0000000..139312e --- /dev/null +++ b/src/kernel/mpoller - 改.c @@ -0,0 +1,122 @@ +// °æÈ¨ÉùÃ÷£¬±íÃ÷Õâ¶Î´úÂëÊÇSogou, Inc.ÔÚ2019Äê°æÈ¨ËùÓе쬲¢ÇÒÕâ¶Î´úÂëÊÇÔÚApache License 2.0ÏÂÊÚȨµÄ¡£ +/* + Copyright (c) 2019 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#include // °üº¬±ê×¼¶¨ÒåÍ·Îļþ£¬±ÈÈçNULLºÍsize_tµÄ¶¨Òå +#include // °üº¬±ê×¼¿âÍ·Îļþ£¬±ÈÈçmallocºÍfreeº¯ÊýµÄ¶¨Òå +#include "poller.h" // °üº¬pollerµÄÍ·Îļþ£¬¶¨ÒåÁ˵¥Ïß³ÌpollerµÄ½Ó¿Ú +#include "mpoller.h" // °üº¬¶àÏß³ÌpollerµÄÍ·Îļþ£¬¶¨ÒåÁËmpollerµÄ½Ó¿Ú + +// ÍⲿÉùÃ÷µÄº¯Êý£¬ÓÃÓÚ´´½¨ºÍÏú»Ùpoller¶ÔÏó¡£ +extern poller_t* __poller_create(void**, const struct poller_params*); +extern void __poller_destroy(poller_t*); + +// ¾²Ì¬º¯Êý£¬ÓÃÓÚ´´½¨¶àÏß³Ìpoller¶ÔÏó¡£ +static int __mpoller_create(const struct poller_params* params, + mpoller_t* mpoller) +{ + void** nodes_buf = (void**)calloc(params->max_open_files, sizeof(void*)); // ·ÖÅäÄÚ´æÓÃÓÚ±£´æpoller¶ÔÏó + unsigned int i; + + if (nodes_buf) // Èç¹ûÄÚ´æ·ÖÅä³É¹¦ + { + for (i = 0; i < mpoller->nthreads; i++) // Ñ­»·´´½¨¶à¸öpoller¶ÔÏó + { + mpoller->poller[i] = __poller_create(nodes_buf, params); // ´´½¨µ¥¸öpoller + if (!mpoller->poller[i]) // Èç¹û´´½¨Ê§°Ü + break; // Í˳öÑ­»· + } + + if (i == mpoller->nthreads) // Èç¹ûËùÓÐpoller¶ÔÏó¶¼´´½¨³É¹¦ + { + mpoller->nodes_buf = nodes_buf; // ±£´ænodes_bufÖ¸Õë + return 0; // ·µ»Ø³É¹¦ + } + + while (i > 0) // Èç¹û´´½¨Ê§°Ü£¬Ïú»ÙÒѾ­´´½¨µÄpoller¶ÔÏó + __poller_destroy(mpoller->poller[--i]); + + free(nodes_buf); // ÊÍ·Ånodes_bufÄÚ´æ + } + + return -1; // ·µ»ØÊ§°Ü +} + +// º¯ÊýÓÃÓÚ´´½¨¶àÏß³Ìpoller¶ÔÏó¡£ +mpoller_t* mpoller_create(const struct poller_params* params, size_t nthreads) +{ + mpoller_t* mpoller; // ÉùÃ÷mpoller¶ÔÏóÖ¸Õë + size_t size; // ÉùÃ÷size±äÁ¿£¬ÓÃÓÚ¼ÆËãËùÐèÄÚ´æ´óС + + if (nthreads == 0) // Èç¹ûÏß³ÌÊýΪ0£¬ÔòĬÈÏΪ1 + nthreads = 1; + + size = offsetof(mpoller_t, poller) + nthreads * sizeof(void*); // ¼ÆËãËùÐèÄÚ´æ´óС + mpoller = (mpoller_t*)malloc(size); // ·ÖÅäÄÚ´æ + if (mpoller) // Èç¹ûÄÚ´æ·ÖÅä³É¹¦ + { + mpoller->nthreads = (unsigned int)nthreads; // ÉèÖÃÏß³ÌÊý + if (__mpoller_create(params, mpoller) >= 0) // Èç¹û´´½¨³É¹¦ + return mpoller; // ·µ»Ømpoller¶ÔÏó + + free(mpoller); // Èç¹û´´½¨Ê§°Ü£¬ÊÍ·ÅÄÚ´æ + } + + return NULL; // ·µ»ØNULL +} + +// º¯ÊýÓÃÓÚÆô¶¯¶àÏß³Ìpoller¶ÔÏó¡£ +int mpoller_start(mpoller_t* mpoller) +{ + size_t i; // Ñ­»·±äÁ¿ + + for (i = 0; i < mpoller->nthreads; i++) // Ñ­»·Æô¶¯Ã¿¸öpollerÏß³Ì + { + if (poller_start(mpoller->poller[i]) < 0) // Èç¹ûÆô¶¯Ê§°Ü + break; // Í˳öÑ­»· + } + + if (i == mpoller->nthreads) // Èç¹ûËùÓÐÏ̶߳¼Æô¶¯³É¹¦ + return 0; // ·µ»Ø³É¹¦ + + while (i > 0) // Èç¹ûÆô¶¯Ê§°Ü£¬Í£Ö¹ÒѾ­Æô¶¯µÄÏß³Ì + poller_stop(mpoller->poller[--i]); + + return -1; // ·µ»ØÊ§°Ü +} + +// º¯ÊýÓÃÓÚÍ£Ö¹¶àÏß³Ìpoller¶ÔÏó¡£ +void mpoller_stop(mpoller_t* mpoller) +{ + size_t i; // Ñ­»·±äÁ¿ + + for (i = 0; i < mpoller->nthreads; i++) // Ñ­»·Í£Ö¹Ã¿¸öpollerÏß³Ì + poller_stop(mpoller->poller[i]); +} + +// º¯ÊýÓÃÓÚÏú»Ù¶àÏß³Ìpoller¶ÔÏó¡£ +void mpoller_destroy(mpoller_t* mpoller) +{ + size_t i; // Ñ­»·±äÁ¿ + + for (i = 0; i < mpoller->nthreads; i++) // Ñ­»·Ïú»Ùÿ¸öpoller¶ÔÏó + __poller_destroy(mpoller->poller[i]); + + free(mpoller->nodes_buf); // ÊÍ·Ånodes_bufÄÚ´æ + free(mpoller); // ÊÍ·Åmpoller¶ÔÏóÄÚ´æ +} \ No newline at end of file diff --git a/src/kernel/msgqueue - 改.c b/src/kernel/msgqueue - 改.c new file mode 100644 index 0000000..4776074 --- /dev/null +++ b/src/kernel/msgqueue - 改.c @@ -0,0 +1,211 @@ +/* + Copyright (c) 2020 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +/* + * This message queue originates from the project of Sogou C++ Workflow: + * https://github.com/sogou/workflow + * + * The idea of this implementation is quite simple and obvious. When the + * get_list is not empty, the consumer takes a message. Otherwise the consumer + * waits till put_list is not empty, and swap two lists. This method performs + * well when the queue is very busy, and the number of consumers is big. + */ + +#include // °üº¬´íÎóºÅ¶¨Òå +#include // °üº¬±ê×¼¿âº¯Êý£¬ÈçmallocºÍfree +#include // °üº¬POSIXÏ߳̿â +#include "msgqueue.h" // °üº¬ÏûÏ¢¶ÓÁеÄÉùÃ÷ + + // ÏûÏ¢¶ÓÁнṹÌ嶨Òå +struct __msgqueue +{ + size_t msg_max; // ÏûÏ¢¶ÓÁÐ×î´óÈÝÁ¿ + size_t msg_cnt; // µ±Ç°ÏûÏ¢ÊýÁ¿ + int linkoff; // ÏûÏ¢Á´½ÓÆ«ÒÆÁ¿ + int nonblock; // ÊÇ·ñΪ·Ç×èÈûģʽ + void* head1; // Í·²¿Ö¸Õë + void* head2; // ÁíÒ»¸öÍ·²¿Ö¸Õ룬ÓÃÓÚÓÅ»¯ + void** get_head; // »ñÈ¡²Ù×÷µÄÍ·²¿Ö¸Õë + void** put_head; // ·ÅÈë²Ù×÷µÄÍ·²¿Ö¸Õë + void** put_tail; // ·ÅÈë²Ù×÷µÄβ²¿Ö¸Õë + pthread_mutex_t get_mutex; // »ñÈ¡²Ù×÷µÄ»¥³âËø + pthread_mutex_t put_mutex; // ·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_cond_t get_cond; // »ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_cond_t put_cond; // ·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ +}; + +// ÉèÖÃÏûÏ¢¶ÓÁÐΪ·Ç×èÈûģʽ +void msgqueue_set_nonblock(msgqueue_t* queue) +{ + queue->nonblock = 1; // ÉèÖ÷Ç×èÈûģʽ + pthread_mutex_lock(&queue->put_mutex); // Ëø¶¨·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_cond_signal(&queue->get_cond); // ֪ͨ»ñÈ¡²Ù×÷ + pthread_cond_broadcast(&queue->put_cond); // ¹ã²¥·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø +} + +// ÉèÖÃÏûÏ¢¶ÓÁÐΪ×èÈûģʽ +void msgqueue_set_block(msgqueue_t* queue) +{ + queue->nonblock = 0; // ÉèÖÃ×èÈûģʽ +} + +// ½«ÏûÏ¢·ÅÈëÏûÏ¢¶ÓÁÐĩβ +void msgqueue_put(void* msg, msgqueue_t* queue) +{ + void** link = (void**)((char*)msg + queue->linkoff); // »ñÈ¡ÏûÏ¢µÄÁ´½Ó×Ö¶Î + + *link = NULL; // ÉèÖÃÁ´½Ó×Ö¶ÎΪ¿Õ + pthread_mutex_lock(&queue->put_mutex); // Ëø¶¨·ÅÈë²Ù×÷µÄ»¥³âËø + while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) // µÈ´ý¶ÓÁÐÓпÕλ + pthread_cond_wait(&queue->put_cond, &queue->put_mutex); // µÈ´ýÌõ¼þ±äÁ¿ + + *queue->put_tail = link; // ½«ÏûÏ¢Á´½Óµ½¶ÓÁÐβ²¿ + queue->put_tail = link; // ¸üÐÂβ²¿Ö¸Õë + queue->msg_cnt++; // Ôö¼ÓÏûÏ¢¼ÆÊý + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_cond_signal(&queue->get_cond); // ֪ͨ»ñÈ¡²Ù×÷ +} + +// ½«ÏûÏ¢·ÅÈëÏûÏ¢¶ÓÁÐÍ·²¿ +void msgqueue_put_head(void* msg, msgqueue_t* queue) +{ + void** link = (void**)((char*)msg + queue->linkoff); // »ñÈ¡ÏûÏ¢µÄÁ´½Ó×Ö¶Î + + pthread_mutex_lock(&queue->put_mutex); // Ëø¶¨·ÅÈë²Ù×÷µÄ»¥³âËø + while (*queue->get_head) // Èç¹û¶ÓÁÐ·Ç¿Õ + { + if (pthread_mutex_trylock(&queue->get_mutex) == 0) // ³¢ÊÔËø¶¨»ñÈ¡²Ù×÷µÄ»¥³âËø + { + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø + *link = *queue->get_head; // ½«ÏûÏ¢Á´½Óµ½¶ÓÁÐÍ·²¿ + *queue->get_head = link; + pthread_mutex_unlock(&queue->get_mutex); // ½âËø»ñÈ¡²Ù×÷µÄ»¥³âËø + return; + } + } + + while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) // µÈ´ý¶ÓÁÐÓпÕλ + pthread_cond_wait(&queue->put_cond, &queue->put_mutex); // µÈ´ýÌõ¼þ±äÁ¿ + + *link = *queue->put_head; // ½«ÏûÏ¢Á´½Óµ½¶ÓÁÐÍ·²¿ + if (*link == NULL) // Èç¹ûÔ­Í·²¿Îª¿Õ£¬Ôò¸üÐÂβ²¿Ö¸Õë + queue->put_tail = link; + + *queue->put_head = link; // ¸üÐÂÍ·²¿Ö¸Õë + queue->msg_cnt++; // Ôö¼ÓÏûÏ¢¼ÆÊý + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_cond_signal(&queue->get_cond); // ֪ͨ»ñÈ¡²Ù×÷ +} + +// ÄÚ²¿º¯Êý£¬ÓÃÓÚ½»»»ÏûÏ¢¶ÓÁеÄÍ·²¿ºÍβ²¿£¬ÒÔ±ã´Ó¶ÓÁÐÖлñÈ¡ÏûÏ¢ +static size_t __msgqueue_swap(msgqueue_t* queue) +{ + void** get_head = queue->get_head; + size_t cnt; + + pthread_mutex_lock(&queue->put_mutex); // Ëø¶¨·ÅÈë²Ù×÷µÄ»¥³âËø + while (queue->msg_cnt == 0 && !queue->nonblock) // Èç¹û¶ÓÁÐΪ¿ÕÇÒ²»ÊÇ·Ç×èÈûģʽ£¬ÔòµÈ´ý + pthread_cond_wait(&queue->get_cond, &queue->put_mutex); // µÈ´ý»ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + + cnt = queue->msg_cnt; // »ñÈ¡µ±Ç°ÏûÏ¢ÊýÁ¿ + if (cnt > queue->msg_max - 1) // Èç¹ûÏûÏ¢ÊýÁ¿³¬¹ý×î´óÖµ¼õÒ»£¬Ôò¹ã²¥·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_cond_broadcast(&queue->put_cond); + + queue->get_head = queue->put_head; // ½»»»»ñÈ¡ºÍ·ÅÈëµÄÍ·²¿Ö¸Õë + queue->put_head = get_head; // + queue->put_tail = get_head; // ÖØÖ÷ÅÈë²Ù×÷µÄβ²¿Ö¸Õë + queue->msg_cnt = 0; // ÖØÖÃÏûÏ¢ÊýÁ¿ + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø + return cnt; // ·µ»ØÏûÏ¢ÊýÁ¿ +} + +// ´ÓÏûÏ¢¶ÓÁÐÖлñÈ¡ÏûÏ¢ +void* msgqueue_get(msgqueue_t* queue) +{ + void* msg; + + pthread_mutex_lock(&queue->get_mutex); // Ëø¶¨»ñÈ¡²Ù×÷µÄ»¥³âËø + if (*queue->get_head || __msgqueue_swap(queue) > 0) // Èç¹û¶ÓÁзǿջò½»»»ºóÓÐÏûÏ¢ + { + msg = (char*)*queue->get_head - queue->linkoff; // »ñÈ¡ÏûÏ¢ + *queue->get_head = *(void**)*queue->get_head; // ¸üÐÂÍ·²¿Ö¸Õë + } + else + msg = NULL; // Èç¹û¶ÓÁÐΪ¿Õ£¬Ôò·µ»ØNULL + + pthread_mutex_unlock(&queue->get_mutex); // ½âËø»ñÈ¡²Ù×÷µÄ»¥³âËø + return msg; // ·µ»ØÏûÏ¢ +} + +// ´´½¨ÏûÏ¢¶ÓÁÐ +msgqueue_t* msgqueue_create(size_t maxlen, int linkoff) +{ + msgqueue_t* queue = (msgqueue_t*)malloc(sizeof(msgqueue_t)); // ·ÖÅäÏûÏ¢¶ÓÁнṹÌåÄÚ´æ + int ret; + + if (!queue) + return NULL; + + ret = pthread_mutex_init(&queue->get_mutex, NULL); // ³õʼ»¯»ñÈ¡²Ù×÷µÄ»¥³âËø + if (ret == 0) + { + ret = pthread_mutex_init(&queue->put_mutex, NULL); // ³õʼ»¯·ÅÈë²Ù×÷µÄ»¥³âËø + if (ret == 0) + { + ret = pthread_cond_init(&queue->get_cond, NULL); // ³õʼ»¯»ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + if (ret == 0) + { + ret = pthread_cond_init(&queue->put_cond, NULL); // ³õʼ»¯·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ + if (ret == 0) + { + queue->msg_max = maxlen; // ÉèÖÃ×î´óÏûÏ¢ÊýÁ¿ + queue->linkoff = linkoff; // ÉèÖÃÁ´½ÓÆ«ÒÆÁ¿ + queue->head1 = NULL; + queue->head2 = NULL; + queue->get_head = &queue->head1; // ÉèÖûñÈ¡²Ù×÷µÄÍ·²¿Ö¸Õë + queue->put_head = &queue->head2; // ÉèÖ÷ÅÈë²Ù×÷µÄÍ·²¿Ö¸Õë + queue->put_tail = &queue->head2; // ÉèÖ÷ÅÈë²Ù×÷µÄβ²¿Ö¸Õë + queue->msg_cnt = 0; // ³õʼ»¯ÏûÏ¢ÊýÁ¿ + queue->nonblock = 0; // ³õʼ»¯·Ç×èÈûģʽ + return queue; // ·µ»ØÏûÏ¢¶ÓÁÐ + } + + pthread_cond_destroy(&queue->get_cond); // Ïú»Ù»ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + } + + pthread_mutex_destroy(&queue->put_mutex); // Ïú»Ù·ÅÈë²Ù×÷µÄ»¥³âËø + } + + pthread_mutex_destroy(&queue->get_mutex); // Ïú»Ù»ñÈ¡²Ù×÷µÄ»¥³âËø + } + + errno = ret; // ÉèÖôíÎóºÅ + free(queue); // ÊÍ·ÅÏûÏ¢¶ÓÁнṹÌåÄÚ´æ + return NULL; // ·µ»ØNULL +} + +// Ïú»ÙÏûÏ¢¶ÓÁÐ +void msgqueue_destroy(msgqueue_t* queue) +{ + pthread_cond_destroy(&queue->put_cond); // Ïú»Ù·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_cond_destroy(&queue->get_cond); // Ïú»Ù»ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_mutex_destroy(&queue->put_mutex); // Ïú»Ù·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_mutex_destroy(&queue->get_mutex); // Ïú»Ù»ñÈ¡²Ù×÷µÄ»¥³âËø + free(queue); // ÊÍ·ÅÏûÏ¢¶ÓÁнṹÌåÄÚ´æ +} diff --git a/src/kernel/thrdpool - 改.c b/src/kernel/thrdpool - 改.c new file mode 100644 index 0000000..abac1a4 --- /dev/null +++ b/src/kernel/thrdpool - 改.c @@ -0,0 +1,318 @@ +/* + Copyright (c) 2019 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#include // °üº¬´íÎóºÅ¶¨Òå +#include // °üº¬POSIXÏ߳̿â +#include // °üº¬±ê×¼¿âº¯Êý£¬ÈçmallocºÍfree +#include "msgqueue.h" // °üº¬ÏûÏ¢¶ÓÁеÄʵÏÖ£¬ÓÃÓÚÏ̼߳äͨÐÅ +#include "thrdpool.h" // °üº¬Ï̳߳صÄÉùÃ÷ + +// Ï̳߳ؽṹÌ嶨Òå +struct __thrdpool +{ + msgqueue_t* msgqueue; // ÏûÏ¢¶ÓÁУ¬ÓÃÓÚ´æ·Å´ýÖ´ÐеÄÈÎÎñ + size_t nthreads; // Ï̳߳ØÖеÄÏß³ÌÊýÁ¿ + size_t stacksize; // Ïß³ÌÕ»µÄ´óС + pthread_t tid; // Ïß³ÌID + pthread_mutex_t mutex; // »¥³âËø£¬ÓÃÓÚ±£»¤Ï̳߳صĹ²ÏíÊý¾Ý + pthread_key_t key; // Ïֲ߳̾¿´æ´¢µÄ¼ü + pthread_cond_t* terminate; // Ìõ¼þ±äÁ¿£¬ÓÃÓÚ֪ͨÏß³ÌÍ˳ö +}; + +// ÈÎÎñÌõÄ¿½á¹¹Ì嶨Òå +struct __thrdpool_task_entry +{ + void* link; // Á´½Óµ½ÏÂÒ»¸öÈÎÎñÌõÄ¿ + struct thrdpool_task task; // ÈÎÎñ½á¹¹Ì壬°üº¬ÈÎÎñº¯ÊýºÍÉÏÏÂÎÄ +}; + +// ¾²Ì¬±äÁ¿£¬ÓÃÓÚ±íʾһ¸ö¿ÕµÄÏß³ÌID +static pthread_t __zero_tid; + +// Ïß³ÌÍ˳öʱµÄ»Øµ÷º¯Êý +static void __thrdpool_exit_routine(void* context) +{ + thrdpool_t* pool = (thrdpool_t*)context; + pthread_t tid; + + // Ëø¶¨»¥³âËø + pthread_mutex_lock(&pool->mutex); + tid = pool->tid; + pool->tid = pthread_self(); + if (--pool->nthreads == 0 && pool->terminate) + pthread_cond_signal(pool->terminate); + + pthread_mutex_unlock(&pool->mutex); + // Èç¹û²»ÊÇÁãID£¬ÔòµÈ´ýÏ߳̽áÊø + if (!pthread_equal(tid, __zero_tid)) + pthread_join(tid, NULL); + + pthread_exit(NULL); +} + +// Ï̳߳صÄÏ̺߳¯Êý +static void* __thrdpool_routine(void* arg) +{ + thrdpool_t* pool = (thrdpool_t*)arg; + struct __thrdpool_task_entry* entry; + void (*task_routine)(void*); + void* task_context; + + // ÉèÖÃÏֲ߳̾¿´æ´¢ + pthread_setspecific(pool->key, pool); + while (!pool->terminate) + { + // ´ÓÏûÏ¢¶ÓÁÐÖлñÈ¡ÈÎÎñ + entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); + if (!entry) + break; + + // Ö´ÐÐÈÎÎñ + task_routine = entry->task.routine; + task_context = entry->task.context; + free(entry); + task_routine(task_context); + + // Èç¹ûÏ̳߳ØÖеÄÏß³ÌÊýÁ¿Îª0£¬ÔòÏ̳߳ØÒѱ»Ïú»Ù + if (pool->nthreads == 0) + { + free(pool); + return NULL; + } + } + + // Ö´ÐÐÏß³ÌÍ˳öʱµÄ²Ù×÷ + __thrdpool_exit_routine(pool); + return NULL; +} + +// Ï̳߳ØÏú»Ùº¯Êý +static void __thrdpool_terminate(int in_pool, thrdpool_t* pool) +{ + pthread_cond_t term = PTHREAD_COND_INITIALIZER; + + // Ëø¶¨»¥³âËø + pthread_mutex_lock(&pool->mutex); + msgqueue_set_nonblock(pool->msgqueue); + pool->terminate = &term; + + if (in_pool) + { + // Èç¹ûÔÚÏ̳߳صÄÏß³ÌÖÐÏú»ÙÏ̳߳أ¬Ôò·ÖÀ뵱ǰÏß³Ì + pthread_detach(pthread_self()); + pool->nthreads--; + } + + // µÈ´ýËùÓÐÏ߳̽áÊø + while (pool->nthreads > 0) + pthread_cond_wait(&term, &pool->mutex); + + pthread_mutex_unlock(&pool->mutex); + // Èç¹û²»ÊÇÁãID£¬ÔòµÈ´ýÏ߳̽áÊø + if (!pthread_equal(pool->tid, __zero_tid)) + pthread_join(pool->tid, NULL); +} + +// Ï̳߳ش´½¨Ï̺߳¯Êý +static int __thrdpool_create_threads(size_t nthreads, thrdpool_t* pool) +{ + pthread_attr_t attr; + pthread_t tid; + int ret; + + // ³õʼ»¯Ïß³ÌÊôÐÔ + ret = pthread_attr_init(&attr); + if (ret == 0) + { + if (pool->stacksize) + pthread_attr_setstacksize(&attr, pool->stacksize); + + while (pool->nthreads < nthreads) + { + // ´´½¨Ïß³Ì + ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); + if (ret == 0) + pool->nthreads++; + else + break; + } + + pthread_attr_destroy(&attr); + if (pool->nthreads == nthreads) + return 0; + + // Èç¹û´´½¨Ê§°Ü£¬ÔòÏú»ÙÏß³Ì³Ø + __thrdpool_terminate(0, pool); + } + + errno = ret; + return -1; +} + +// ´´½¨Ïß³Ì³Ø +thrdpool_t* thrdpool_create(size_t nthreads, size_t stacksize) +{ + thrdpool_t* pool; + int ret; + + pool = (thrdpool_t*)malloc(sizeof(thrdpool_t)); // ·ÖÅäÏ̳߳ؽṹÌåÄÚ´æ + if (!pool) + return NULL; + + pool->msgqueue = msgqueue_create(0, 0); // ´´½¨ÏûÏ¢¶ÓÁÐ + if (pool->msgqueue) + { + ret = pthread_mutex_init(&pool->mutex, NULL); // ³õʼ»¯»¥³âËø + if (ret == 0) + { + ret = pthread_key_create(&pool->key, NULL); // ´´½¨Ïֲ߳̾¿´æ´¢¼ü + if (ret == 0) + { + pool->stacksize = stacksize; // ÉèÖÃÏß³ÌÕ»´óС + pool->nthreads = 0; // ³õʼ»¯Ïß³ÌÊýÁ¿ + pool->tid = __zero_tid; // ³õʼ»¯Ïß³ÌID + pool->terminate = NULL; // ³õʼ»¯ÖÕÖ¹Ìõ¼þ + if (__thrdpool_create_threads(nthreads, pool) >= 0) // ´´½¨Ïß³Ì + return pool; + + pthread_key_delete(pool->key); // ɾ³ýÏֲ߳̾¿´æ´¢¼ü + } + + pthread_mutex_destroy(&pool->mutex); // Ïú»Ù»¥³âËø + } + + errno = ret; // ÉèÖôíÎóºÅ + msgqueue_destroy(pool->msgqueue); // Ïú»ÙÏûÏ¢¶ÓÁÐ + } + + free(pool); // ÊÍ·ÅÏ̳߳ؽṹÌåÄÚ´æ + return NULL; +} + +// ÄÚ²¿º¯Êý£¬ÓÃÓÚ½«ÈÎÎñµ÷¶Èµ½Ïß³Ì³Ø +inline void __thrdpool_schedule(const struct thrdpool_task* task, void* buf, + thrdpool_t* pool); + +void __thrdpool_schedule(const struct thrdpool_task* task, void* buf, + thrdpool_t* pool) +{ + ((struct __thrdpool_task_entry*)buf)->task = *task; // ÉèÖÃÈÎÎñ + msgqueue_put(buf, pool->msgqueue); // ½«ÈÎÎñ·ÅÈëÏûÏ¢¶ÓÁÐ +} + +// µ÷¶ÈÈÎÎñµ½Ïß³Ì³Ø +int thrdpool_schedule(const struct thrdpool_task* task, thrdpool_t* pool) +{ + void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // ·ÖÅäÈÎÎñÌõÄ¿ÄÚ´æ + + if (buf) + { + __thrdpool_schedule(task, buf, pool); // µ÷¶ÈÈÎÎñ + return 0; + } + + return -1; +} + +// Åжϵ±Ç°Ïß³ÌÊÇ·ñÔÚÏ̳߳ØÖÐ +inline int thrdpool_in_pool(thrdpool_t* pool); + +int thrdpool_in_pool(thrdpool_t* pool) +{ + return pthread_getspecific(pool->key) == pool; // ͨ¹ýÏֲ߳̾¿´æ´¢ÅÐ¶Ï +} + +// Ôö¼ÓÏ̳߳ØÖеÄÏß³ÌÊýÁ¿ +int thrdpool_increase(thrdpool_t* pool) +{ + pthread_attr_t attr; + pthread_t tid; + int ret; + + ret = pthread_attr_init(&attr); // ³õʼ»¯Ïß³ÌÊôÐÔ + if (ret == 0) + { + if (pool->stacksize) + pthread_attr_setstacksize(&attr, pool->stacksize); // ÉèÖÃÏß³ÌÕ»´óС + + pthread_mutex_lock(&pool->mutex); // Ëø¶¨»¥³âËø + ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); // ´´½¨Ïß³Ì + if (ret == 0) + pool->nthreads++; // Ôö¼ÓÏß³ÌÊýÁ¿ + + pthread_mutex_unlock(&pool->mutex); // ½âËø»¥³âËø + pthread_attr_destroy(&attr); // Ïú»ÙÏß³ÌÊôÐÔ + if (ret == 0) + return 0; + } + + errno = ret; // ÉèÖôíÎóºÅ + return -1; +} + +// ¼õÉÙÏ̳߳ØÖеÄÏß³ÌÊýÁ¿ +int thrdpool_decrease(thrdpool_t* pool) +{ + void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // ·ÖÅäÈÎÎñÌõÄ¿ÄÚ´æ + struct __thrdpool_task_entry* entry; + + if (buf) + { + entry = (struct __thrdpool_task_entry*)buf; + entry->task.routine = __thrdpool_exit_routine; // ÉèÖÃÍ˳öÈÎÎñ + entry->task.context = pool; + msgqueue_put_head(entry, pool->msgqueue); // ½«Í˳öÈÎÎñ·ÅÈëÏûÏ¢¶ÓÁÐÍ·²¿ + return 0; + } + + return -1; +} + +// Ï̳߳ØÖеÄÏß³ÌÍ˳ö +void thrdpool_exit(thrdpool_t* pool) +{ + if (thrdpool_in_pool(pool)) // Èç¹ûµ±Ç°Ïß³ÌÔÚÏ̳߳ØÖÐ + __thrdpool_exit_routine(pool); // Ö´ÐÐÍ˳ö²Ù×÷ +} + +// Ïú»ÙÏß³Ì³Ø +void thrdpool_destroy(void (*pending)(const struct thrdpool_task*), + thrdpool_t* pool) +{ + int in_pool = thrdpool_in_pool(pool); // ÅжÏÊÇ·ñÔÚÏ̳߳ØÖÐ + struct __thrdpool_task_entry* entry; + + __thrdpool_terminate(in_pool, pool); // ÖÕÖ¹Ïß³Ì³Ø + while (1) + { + entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); // »ñÈ¡ÈÎÎñ + if (!entry) + break; + + if (pending && entry->task.routine != __thrdpool_exit_routine) // Èç¹ûÓÐ¹ÒÆðÈÎÎñ´¦Àíº¯Êý + pending(&entry->task); // ´¦Àí¹ÒÆðÈÎÎñ + + free(entry); // ÊÍ·ÅÈÎÎñÌõÄ¿ÄÚ´æ + } + + pthread_key_delete(pool->key); // ɾ³ýÏֲ߳̾¿´æ´¢¼ü + pthread_mutex_destroy(&pool->mutex); // Ïú»Ù»¥³âËø + msgqueue_destroy(pool->msgqueue); // Ïú»ÙÏûÏ¢¶ÓÁÐ + if (!in_pool) // Èç¹û²»ÔÚÏ̳߳ØÖУ¬ÊÍ·ÅÏ̳߳ؽṹÌåÄÚ´æ + free(pool); +} \ No newline at end of file