From 81825752d94b042d6c049294750a7a43c4cb28ac Mon Sep 17 00:00:00 2001 From: cmz <3256005191@qq.com> Date: Mon, 23 Dec 2024 20:25:42 +0800 Subject: [PATCH] 13 --- src/kernel/msgqueue - 改.c | 211 ++++++++++++++++ src/kernel/thrdpool.c | 477 +++++++++++++++++------------------- 2 files changed, 437 insertions(+), 251 deletions(-) create mode 100644 src/kernel/msgqueue - 改.c diff --git a/src/kernel/msgqueue - 改.c b/src/kernel/msgqueue - 改.c new file mode 100644 index 0000000..4776074 --- /dev/null +++ b/src/kernel/msgqueue - 改.c @@ -0,0 +1,211 @@ +/* + Copyright (c) 2020 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +/* + * This message queue originates from the project of Sogou C++ Workflow: + * https://github.com/sogou/workflow + * + * The idea of this implementation is quite simple and obvious. When the + * get_list is not empty, the consumer takes a message. Otherwise the consumer + * waits till put_list is not empty, and swap two lists. This method performs + * well when the queue is very busy, and the number of consumers is big. + */ + +#include // °üº¬´íÎóºÅ¶¨Òå +#include // °üº¬±ê×¼¿âº¯Êý£¬ÈçmallocºÍfree +#include // °üº¬POSIXÏ߳̿â +#include "msgqueue.h" // °üº¬ÏûÏ¢¶ÓÁеÄÉùÃ÷ + + // ÏûÏ¢¶ÓÁнṹÌ嶨Òå +struct __msgqueue +{ + size_t msg_max; // ÏûÏ¢¶ÓÁÐ×î´óÈÝÁ¿ + size_t msg_cnt; // µ±Ç°ÏûÏ¢ÊýÁ¿ + int linkoff; // ÏûÏ¢Á´½ÓÆ«ÒÆÁ¿ + int nonblock; // ÊÇ·ñΪ·Ç×èÈûģʽ + void* head1; // Í·²¿Ö¸Õë + void* head2; // ÁíÒ»¸öÍ·²¿Ö¸Õ룬ÓÃÓÚÓÅ»¯ + void** get_head; // »ñÈ¡²Ù×÷µÄÍ·²¿Ö¸Õë + void** put_head; // ·ÅÈë²Ù×÷µÄÍ·²¿Ö¸Õë + void** put_tail; // ·ÅÈë²Ù×÷µÄβ²¿Ö¸Õë + pthread_mutex_t get_mutex; // »ñÈ¡²Ù×÷µÄ»¥³âËø + pthread_mutex_t put_mutex; // ·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_cond_t get_cond; // »ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_cond_t put_cond; // ·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ +}; + +// ÉèÖÃÏûÏ¢¶ÓÁÐΪ·Ç×èÈûģʽ +void msgqueue_set_nonblock(msgqueue_t* queue) +{ + queue->nonblock = 1; // ÉèÖ÷Ç×èÈûģʽ + pthread_mutex_lock(&queue->put_mutex); // Ëø¶¨·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_cond_signal(&queue->get_cond); // ֪ͨ»ñÈ¡²Ù×÷ + pthread_cond_broadcast(&queue->put_cond); // ¹ã²¥·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø +} + +// ÉèÖÃÏûÏ¢¶ÓÁÐΪ×èÈûģʽ +void msgqueue_set_block(msgqueue_t* queue) +{ + queue->nonblock = 0; // ÉèÖÃ×èÈûģʽ +} + +// ½«ÏûÏ¢·ÅÈëÏûÏ¢¶ÓÁÐĩβ +void msgqueue_put(void* msg, msgqueue_t* queue) +{ + void** link = (void**)((char*)msg + queue->linkoff); // »ñÈ¡ÏûÏ¢µÄÁ´½Ó×Ö¶Î + + *link = NULL; // ÉèÖÃÁ´½Ó×Ö¶ÎΪ¿Õ + pthread_mutex_lock(&queue->put_mutex); // Ëø¶¨·ÅÈë²Ù×÷µÄ»¥³âËø + while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) // µÈ´ý¶ÓÁÐÓпÕλ + pthread_cond_wait(&queue->put_cond, &queue->put_mutex); // µÈ´ýÌõ¼þ±äÁ¿ + + *queue->put_tail = link; // ½«ÏûÏ¢Á´½Óµ½¶ÓÁÐβ²¿ + queue->put_tail = link; // ¸üÐÂβ²¿Ö¸Õë + queue->msg_cnt++; // Ôö¼ÓÏûÏ¢¼ÆÊý + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_cond_signal(&queue->get_cond); // ֪ͨ»ñÈ¡²Ù×÷ +} + +// ½«ÏûÏ¢·ÅÈëÏûÏ¢¶ÓÁÐÍ·²¿ +void msgqueue_put_head(void* msg, msgqueue_t* queue) +{ + void** link = (void**)((char*)msg + queue->linkoff); // »ñÈ¡ÏûÏ¢µÄÁ´½Ó×Ö¶Î + + pthread_mutex_lock(&queue->put_mutex); // Ëø¶¨·ÅÈë²Ù×÷µÄ»¥³âËø + while (*queue->get_head) // Èç¹û¶ÓÁÐ·Ç¿Õ + { + if (pthread_mutex_trylock(&queue->get_mutex) == 0) // ³¢ÊÔËø¶¨»ñÈ¡²Ù×÷µÄ»¥³âËø + { + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø + *link = *queue->get_head; // ½«ÏûÏ¢Á´½Óµ½¶ÓÁÐÍ·²¿ + *queue->get_head = link; + pthread_mutex_unlock(&queue->get_mutex); // ½âËø»ñÈ¡²Ù×÷µÄ»¥³âËø + return; + } + } + + while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) // µÈ´ý¶ÓÁÐÓпÕλ + pthread_cond_wait(&queue->put_cond, &queue->put_mutex); // µÈ´ýÌõ¼þ±äÁ¿ + + *link = *queue->put_head; // ½«ÏûÏ¢Á´½Óµ½¶ÓÁÐÍ·²¿ + if (*link == NULL) // Èç¹ûÔ­Í·²¿Îª¿Õ£¬Ôò¸üÐÂβ²¿Ö¸Õë + queue->put_tail = link; + + *queue->put_head = link; // ¸üÐÂÍ·²¿Ö¸Õë + queue->msg_cnt++; // Ôö¼ÓÏûÏ¢¼ÆÊý + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_cond_signal(&queue->get_cond); // ֪ͨ»ñÈ¡²Ù×÷ +} + +// ÄÚ²¿º¯Êý£¬ÓÃÓÚ½»»»ÏûÏ¢¶ÓÁеÄÍ·²¿ºÍβ²¿£¬ÒÔ±ã´Ó¶ÓÁÐÖлñÈ¡ÏûÏ¢ +static size_t __msgqueue_swap(msgqueue_t* queue) +{ + void** get_head = queue->get_head; + size_t cnt; + + pthread_mutex_lock(&queue->put_mutex); // Ëø¶¨·ÅÈë²Ù×÷µÄ»¥³âËø + while (queue->msg_cnt == 0 && !queue->nonblock) // Èç¹û¶ÓÁÐΪ¿ÕÇÒ²»ÊÇ·Ç×èÈûģʽ£¬ÔòµÈ´ý + pthread_cond_wait(&queue->get_cond, &queue->put_mutex); // µÈ´ý»ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + + cnt = queue->msg_cnt; // »ñÈ¡µ±Ç°ÏûÏ¢ÊýÁ¿ + if (cnt > queue->msg_max - 1) // Èç¹ûÏûÏ¢ÊýÁ¿³¬¹ý×î´óÖµ¼õÒ»£¬Ôò¹ã²¥·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_cond_broadcast(&queue->put_cond); + + queue->get_head = queue->put_head; // ½»»»»ñÈ¡ºÍ·ÅÈëµÄÍ·²¿Ö¸Õë + queue->put_head = get_head; // + queue->put_tail = get_head; // ÖØÖ÷ÅÈë²Ù×÷µÄβ²¿Ö¸Õë + queue->msg_cnt = 0; // ÖØÖÃÏûÏ¢ÊýÁ¿ + pthread_mutex_unlock(&queue->put_mutex); // ½âËø·ÅÈë²Ù×÷µÄ»¥³âËø + return cnt; // ·µ»ØÏûÏ¢ÊýÁ¿ +} + +// ´ÓÏûÏ¢¶ÓÁÐÖлñÈ¡ÏûÏ¢ +void* msgqueue_get(msgqueue_t* queue) +{ + void* msg; + + pthread_mutex_lock(&queue->get_mutex); // Ëø¶¨»ñÈ¡²Ù×÷µÄ»¥³âËø + if (*queue->get_head || __msgqueue_swap(queue) > 0) // Èç¹û¶ÓÁзǿջò½»»»ºóÓÐÏûÏ¢ + { + msg = (char*)*queue->get_head - queue->linkoff; // »ñÈ¡ÏûÏ¢ + *queue->get_head = *(void**)*queue->get_head; // ¸üÐÂÍ·²¿Ö¸Õë + } + else + msg = NULL; // Èç¹û¶ÓÁÐΪ¿Õ£¬Ôò·µ»ØNULL + + pthread_mutex_unlock(&queue->get_mutex); // ½âËø»ñÈ¡²Ù×÷µÄ»¥³âËø + return msg; // ·µ»ØÏûÏ¢ +} + +// ´´½¨ÏûÏ¢¶ÓÁÐ +msgqueue_t* msgqueue_create(size_t maxlen, int linkoff) +{ + msgqueue_t* queue = (msgqueue_t*)malloc(sizeof(msgqueue_t)); // ·ÖÅäÏûÏ¢¶ÓÁнṹÌåÄÚ´æ + int ret; + + if (!queue) + return NULL; + + ret = pthread_mutex_init(&queue->get_mutex, NULL); // ³õʼ»¯»ñÈ¡²Ù×÷µÄ»¥³âËø + if (ret == 0) + { + ret = pthread_mutex_init(&queue->put_mutex, NULL); // ³õʼ»¯·ÅÈë²Ù×÷µÄ»¥³âËø + if (ret == 0) + { + ret = pthread_cond_init(&queue->get_cond, NULL); // ³õʼ»¯»ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + if (ret == 0) + { + ret = pthread_cond_init(&queue->put_cond, NULL); // ³õʼ»¯·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ + if (ret == 0) + { + queue->msg_max = maxlen; // ÉèÖÃ×î´óÏûÏ¢ÊýÁ¿ + queue->linkoff = linkoff; // ÉèÖÃÁ´½ÓÆ«ÒÆÁ¿ + queue->head1 = NULL; + queue->head2 = NULL; + queue->get_head = &queue->head1; // ÉèÖûñÈ¡²Ù×÷µÄÍ·²¿Ö¸Õë + queue->put_head = &queue->head2; // ÉèÖ÷ÅÈë²Ù×÷µÄÍ·²¿Ö¸Õë + queue->put_tail = &queue->head2; // ÉèÖ÷ÅÈë²Ù×÷µÄβ²¿Ö¸Õë + queue->msg_cnt = 0; // ³õʼ»¯ÏûÏ¢ÊýÁ¿ + queue->nonblock = 0; // ³õʼ»¯·Ç×èÈûģʽ + return queue; // ·µ»ØÏûÏ¢¶ÓÁÐ + } + + pthread_cond_destroy(&queue->get_cond); // Ïú»Ù»ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + } + + pthread_mutex_destroy(&queue->put_mutex); // Ïú»Ù·ÅÈë²Ù×÷µÄ»¥³âËø + } + + pthread_mutex_destroy(&queue->get_mutex); // Ïú»Ù»ñÈ¡²Ù×÷µÄ»¥³âËø + } + + errno = ret; // ÉèÖôíÎóºÅ + free(queue); // ÊÍ·ÅÏûÏ¢¶ÓÁнṹÌåÄÚ´æ + return NULL; // ·µ»ØNULL +} + +// Ïú»ÙÏûÏ¢¶ÓÁÐ +void msgqueue_destroy(msgqueue_t* queue) +{ + pthread_cond_destroy(&queue->put_cond); // Ïú»Ù·ÅÈë²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_cond_destroy(&queue->get_cond); // Ïú»Ù»ñÈ¡²Ù×÷µÄÌõ¼þ±äÁ¿ + pthread_mutex_destroy(&queue->put_mutex); // Ïú»Ù·ÅÈë²Ù×÷µÄ»¥³âËø + pthread_mutex_destroy(&queue->get_mutex); // Ïú»Ù»ñÈ¡²Ù×÷µÄ»¥³âËø + free(queue); // ÊÍ·ÅÏûÏ¢¶ÓÁнṹÌåÄÚ´æ +} diff --git a/src/kernel/thrdpool.c b/src/kernel/thrdpool.c index abac1a4..c1be399 100644 --- a/src/kernel/thrdpool.c +++ b/src/kernel/thrdpool.c @@ -16,303 +16,278 @@ Author: Xie Han (xiehan@sogou-inc.com) */ -#include // °üº¬´íÎóºÅ¶¨Òå -#include // °üº¬POSIXÏ߳̿â -#include // °üº¬±ê×¼¿âº¯Êý£¬ÈçmallocºÍfree -#include "msgqueue.h" // °üº¬ÏûÏ¢¶ÓÁеÄʵÏÖ£¬ÓÃÓÚÏ̼߳äͨÐÅ -#include "thrdpool.h" // °üº¬Ï̳߳صÄÉùÃ÷ +#include +#include +#include +#include "msgqueue.h" +#include "thrdpool.h" -// Ï̳߳ؽṹÌ嶨Òå struct __thrdpool { - msgqueue_t* msgqueue; // ÏûÏ¢¶ÓÁУ¬ÓÃÓÚ´æ·Å´ýÖ´ÐеÄÈÎÎñ - size_t nthreads; // Ï̳߳ØÖеÄÏß³ÌÊýÁ¿ - size_t stacksize; // Ïß³ÌÕ»µÄ´óС - pthread_t tid; // Ïß³ÌID - pthread_mutex_t mutex; // »¥³âËø£¬ÓÃÓÚ±£»¤Ï̳߳صĹ²ÏíÊý¾Ý - pthread_key_t key; // Ïֲ߳̾¿´æ´¢µÄ¼ü - pthread_cond_t* terminate; // Ìõ¼þ±äÁ¿£¬ÓÃÓÚ֪ͨÏß³ÌÍ˳ö + msgqueue_t *msgqueue; + size_t nthreads; + size_t stacksize; + pthread_t tid; + pthread_mutex_t mutex; + pthread_key_t key; + pthread_cond_t *terminate; }; -// ÈÎÎñÌõÄ¿½á¹¹Ì嶨Òå struct __thrdpool_task_entry { - void* link; // Á´½Óµ½ÏÂÒ»¸öÈÎÎñÌõÄ¿ - struct thrdpool_task task; // ÈÎÎñ½á¹¹Ì壬°üº¬ÈÎÎñº¯ÊýºÍÉÏÏÂÎÄ + void *link; + struct thrdpool_task task; }; -// ¾²Ì¬±äÁ¿£¬ÓÃÓÚ±íʾһ¸ö¿ÕµÄÏß³ÌID static pthread_t __zero_tid; -// Ïß³ÌÍ˳öʱµÄ»Øµ÷º¯Êý -static void __thrdpool_exit_routine(void* context) +static void __thrdpool_exit_routine(void *context) { - thrdpool_t* pool = (thrdpool_t*)context; - pthread_t tid; - - // Ëø¶¨»¥³âËø - pthread_mutex_lock(&pool->mutex); - tid = pool->tid; - pool->tid = pthread_self(); - if (--pool->nthreads == 0 && pool->terminate) - pthread_cond_signal(pool->terminate); - - pthread_mutex_unlock(&pool->mutex); - // Èç¹û²»ÊÇÁãID£¬ÔòµÈ´ýÏ߳̽áÊø - if (!pthread_equal(tid, __zero_tid)) - pthread_join(tid, NULL); - - pthread_exit(NULL); + thrdpool_t *pool = (thrdpool_t *)context; + pthread_t tid; + + /* One thread joins another. Don't need to keep all thread IDs. */ + pthread_mutex_lock(&pool->mutex); + tid = pool->tid; + pool->tid = pthread_self(); + if (--pool->nthreads == 0 && pool->terminate) + pthread_cond_signal(pool->terminate); + + pthread_mutex_unlock(&pool->mutex); + if (!pthread_equal(tid, __zero_tid)) + pthread_join(tid, NULL); + + pthread_exit(NULL); } -// Ï̳߳صÄÏ̺߳¯Êý -static void* __thrdpool_routine(void* arg) +static void *__thrdpool_routine(void *arg) { - thrdpool_t* pool = (thrdpool_t*)arg; - struct __thrdpool_task_entry* entry; - void (*task_routine)(void*); - void* task_context; - - // ÉèÖÃÏֲ߳̾¿´æ´¢ - pthread_setspecific(pool->key, pool); - while (!pool->terminate) - { - // ´ÓÏûÏ¢¶ÓÁÐÖлñÈ¡ÈÎÎñ - entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); - if (!entry) - break; - - // Ö´ÐÐÈÎÎñ - task_routine = entry->task.routine; - task_context = entry->task.context; - free(entry); - task_routine(task_context); - - // Èç¹ûÏ̳߳ØÖеÄÏß³ÌÊýÁ¿Îª0£¬ÔòÏ̳߳ØÒѱ»Ïú»Ù - if (pool->nthreads == 0) - { - free(pool); - return NULL; - } - } - - // Ö´ÐÐÏß³ÌÍ˳öʱµÄ²Ù×÷ - __thrdpool_exit_routine(pool); - return NULL; + thrdpool_t *pool = (thrdpool_t *)arg; + struct __thrdpool_task_entry *entry; + void (*task_routine)(void *); + void *task_context; + + pthread_setspecific(pool->key, pool); + while (!pool->terminate) + { + entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue); + if (!entry) + break; + + task_routine = entry->task.routine; + task_context = entry->task.context; + free(entry); + task_routine(task_context); + + if (pool->nthreads == 0) + { + /* Thread pool was destroyed by the task. */ + free(pool); + return NULL; + } + } + + __thrdpool_exit_routine(pool); + return NULL; } -// Ï̳߳ØÏú»Ùº¯Êý -static void __thrdpool_terminate(int in_pool, thrdpool_t* pool) +static void __thrdpool_terminate(int in_pool, thrdpool_t *pool) { - pthread_cond_t term = PTHREAD_COND_INITIALIZER; - - // Ëø¶¨»¥³âËø - pthread_mutex_lock(&pool->mutex); - msgqueue_set_nonblock(pool->msgqueue); - pool->terminate = &term; - - if (in_pool) - { - // Èç¹ûÔÚÏ̳߳صÄÏß³ÌÖÐÏú»ÙÏ̳߳أ¬Ôò·ÖÀ뵱ǰÏß³Ì - pthread_detach(pthread_self()); - pool->nthreads--; - } - - // µÈ´ýËùÓÐÏ߳̽áÊø - while (pool->nthreads > 0) - pthread_cond_wait(&term, &pool->mutex); - - pthread_mutex_unlock(&pool->mutex); - // Èç¹û²»ÊÇÁãID£¬ÔòµÈ´ýÏ߳̽áÊø - if (!pthread_equal(pool->tid, __zero_tid)) - pthread_join(pool->tid, NULL); + pthread_cond_t term = PTHREAD_COND_INITIALIZER; + + pthread_mutex_lock(&pool->mutex); + msgqueue_set_nonblock(pool->msgqueue); + pool->terminate = &term; + + if (in_pool) + { + /* Thread pool destroyed in a pool thread is legal. */ + pthread_detach(pthread_self()); + pool->nthreads--; + } + + while (pool->nthreads > 0) + pthread_cond_wait(&term, &pool->mutex); + + pthread_mutex_unlock(&pool->mutex); + if (!pthread_equal(pool->tid, __zero_tid)) + pthread_join(pool->tid, NULL); } -// Ï̳߳ش´½¨Ï̺߳¯Êý -static int __thrdpool_create_threads(size_t nthreads, thrdpool_t* pool) +static int __thrdpool_create_threads(size_t nthreads, thrdpool_t *pool) { - pthread_attr_t attr; - pthread_t tid; - int ret; - - // ³õʼ»¯Ïß³ÌÊôÐÔ - ret = pthread_attr_init(&attr); - if (ret == 0) - { - if (pool->stacksize) - pthread_attr_setstacksize(&attr, pool->stacksize); - - while (pool->nthreads < nthreads) - { - // ´´½¨Ïß³Ì - ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); - if (ret == 0) - pool->nthreads++; - else - break; - } - - pthread_attr_destroy(&attr); - if (pool->nthreads == nthreads) - return 0; - - // Èç¹û´´½¨Ê§°Ü£¬ÔòÏú»ÙÏß³Ì³Ø - __thrdpool_terminate(0, pool); - } - - errno = ret; - return -1; + pthread_attr_t attr; + pthread_t tid; + int ret; + + ret = pthread_attr_init(&attr); + if (ret == 0) + { + if (pool->stacksize) + pthread_attr_setstacksize(&attr, pool->stacksize); + + while (pool->nthreads < nthreads) + { + ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); + if (ret == 0) + pool->nthreads++; + else + break; + } + + pthread_attr_destroy(&attr); + if (pool->nthreads == nthreads) + return 0; + + __thrdpool_terminate(0, pool); + } + + errno = ret; + return -1; } -// ´´½¨Ïß³Ì³Ø -thrdpool_t* thrdpool_create(size_t nthreads, size_t stacksize) +thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize) { - thrdpool_t* pool; - int ret; - - pool = (thrdpool_t*)malloc(sizeof(thrdpool_t)); // ·ÖÅäÏ̳߳ؽṹÌåÄÚ´æ - if (!pool) - return NULL; - - pool->msgqueue = msgqueue_create(0, 0); // ´´½¨ÏûÏ¢¶ÓÁÐ - if (pool->msgqueue) - { - ret = pthread_mutex_init(&pool->mutex, NULL); // ³õʼ»¯»¥³âËø - if (ret == 0) - { - ret = pthread_key_create(&pool->key, NULL); // ´´½¨Ïֲ߳̾¿´æ´¢¼ü - if (ret == 0) - { - pool->stacksize = stacksize; // ÉèÖÃÏß³ÌÕ»´óС - pool->nthreads = 0; // ³õʼ»¯Ïß³ÌÊýÁ¿ - pool->tid = __zero_tid; // ³õʼ»¯Ïß³ÌID - pool->terminate = NULL; // ³õʼ»¯ÖÕÖ¹Ìõ¼þ - if (__thrdpool_create_threads(nthreads, pool) >= 0) // ´´½¨Ïß³Ì - return pool; - - pthread_key_delete(pool->key); // ɾ³ýÏֲ߳̾¿´æ´¢¼ü - } - - pthread_mutex_destroy(&pool->mutex); // Ïú»Ù»¥³âËø - } - - errno = ret; // ÉèÖôíÎóºÅ - msgqueue_destroy(pool->msgqueue); // Ïú»ÙÏûÏ¢¶ÓÁÐ - } - - free(pool); // ÊÍ·ÅÏ̳߳ؽṹÌåÄÚ´æ - return NULL; + thrdpool_t *pool; + int ret; + + pool = (thrdpool_t *)malloc(sizeof (thrdpool_t)); + if (!pool) + return NULL; + + pool->msgqueue = msgqueue_create(0, 0); + if (pool->msgqueue) + { + ret = pthread_mutex_init(&pool->mutex, NULL); + if (ret == 0) + { + ret = pthread_key_create(&pool->key, NULL); + if (ret == 0) + { + pool->stacksize = stacksize; + pool->nthreads = 0; + pool->tid = __zero_tid; + pool->terminate = NULL; + if (__thrdpool_create_threads(nthreads, pool) >= 0) + return pool; + + pthread_key_delete(pool->key); + } + + pthread_mutex_destroy(&pool->mutex); + } + + errno = ret; + msgqueue_destroy(pool->msgqueue); + } + + free(pool); + return NULL; } -// ÄÚ²¿º¯Êý£¬ÓÃÓÚ½«ÈÎÎñµ÷¶Èµ½Ïß³Ì³Ø -inline void __thrdpool_schedule(const struct thrdpool_task* task, void* buf, - thrdpool_t* pool); +inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf, + thrdpool_t *pool); -void __thrdpool_schedule(const struct thrdpool_task* task, void* buf, - thrdpool_t* pool) +void __thrdpool_schedule(const struct thrdpool_task *task, void *buf, + thrdpool_t *pool) { - ((struct __thrdpool_task_entry*)buf)->task = *task; // ÉèÖÃÈÎÎñ - msgqueue_put(buf, pool->msgqueue); // ½«ÈÎÎñ·ÅÈëÏûÏ¢¶ÓÁÐ + ((struct __thrdpool_task_entry *)buf)->task = *task; + msgqueue_put(buf, pool->msgqueue); } -// µ÷¶ÈÈÎÎñµ½Ïß³Ì³Ø -int thrdpool_schedule(const struct thrdpool_task* task, thrdpool_t* pool) +int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool) { - void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // ·ÖÅäÈÎÎñÌõÄ¿ÄÚ´æ + void *buf = malloc(sizeof (struct __thrdpool_task_entry)); - if (buf) - { - __thrdpool_schedule(task, buf, pool); // µ÷¶ÈÈÎÎñ - return 0; - } + if (buf) + { + __thrdpool_schedule(task, buf, pool); + return 0; + } - return -1; + return -1; } -// Åжϵ±Ç°Ïß³ÌÊÇ·ñÔÚÏ̳߳ØÖÐ -inline int thrdpool_in_pool(thrdpool_t* pool); +inline int thrdpool_in_pool(thrdpool_t *pool); -int thrdpool_in_pool(thrdpool_t* pool) +int thrdpool_in_pool(thrdpool_t *pool) { - return pthread_getspecific(pool->key) == pool; // ͨ¹ýÏֲ߳̾¿´æ´¢ÅÐ¶Ï + return pthread_getspecific(pool->key) == pool; } -// Ôö¼ÓÏ̳߳ØÖеÄÏß³ÌÊýÁ¿ -int thrdpool_increase(thrdpool_t* pool) +int thrdpool_increase(thrdpool_t *pool) { - pthread_attr_t attr; - pthread_t tid; - int ret; - - ret = pthread_attr_init(&attr); // ³õʼ»¯Ïß³ÌÊôÐÔ - if (ret == 0) - { - if (pool->stacksize) - pthread_attr_setstacksize(&attr, pool->stacksize); // ÉèÖÃÏß³ÌÕ»´óС - - pthread_mutex_lock(&pool->mutex); // Ëø¶¨»¥³âËø - ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); // ´´½¨Ïß³Ì - if (ret == 0) - pool->nthreads++; // Ôö¼ÓÏß³ÌÊýÁ¿ - - pthread_mutex_unlock(&pool->mutex); // ½âËø»¥³âËø - pthread_attr_destroy(&attr); // Ïú»ÙÏß³ÌÊôÐÔ - if (ret == 0) - return 0; - } - - errno = ret; // ÉèÖôíÎóºÅ - return -1; + pthread_attr_t attr; + pthread_t tid; + int ret; + + ret = pthread_attr_init(&attr); + if (ret == 0) + { + if (pool->stacksize) + pthread_attr_setstacksize(&attr, pool->stacksize); + + pthread_mutex_lock(&pool->mutex); + ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); + if (ret == 0) + pool->nthreads++; + + pthread_mutex_unlock(&pool->mutex); + pthread_attr_destroy(&attr); + if (ret == 0) + return 0; + } + + errno = ret; + return -1; } -// ¼õÉÙÏ̳߳ØÖеÄÏß³ÌÊýÁ¿ -int thrdpool_decrease(thrdpool_t* pool) +int thrdpool_decrease(thrdpool_t *pool) { - void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // ·ÖÅäÈÎÎñÌõÄ¿ÄÚ´æ - struct __thrdpool_task_entry* entry; - - if (buf) - { - entry = (struct __thrdpool_task_entry*)buf; - entry->task.routine = __thrdpool_exit_routine; // ÉèÖÃÍ˳öÈÎÎñ - entry->task.context = pool; - msgqueue_put_head(entry, pool->msgqueue); // ½«Í˳öÈÎÎñ·ÅÈëÏûÏ¢¶ÓÁÐÍ·²¿ - return 0; - } - - return -1; + void *buf = malloc(sizeof (struct __thrdpool_task_entry)); + struct __thrdpool_task_entry *entry; + + if (buf) + { + entry = (struct __thrdpool_task_entry *)buf; + entry->task.routine = __thrdpool_exit_routine; + entry->task.context = pool; + msgqueue_put_head(entry, pool->msgqueue); + return 0; + } + + return -1; } -// Ï̳߳ØÖеÄÏß³ÌÍ˳ö -void thrdpool_exit(thrdpool_t* pool) +void thrdpool_exit(thrdpool_t *pool) { - if (thrdpool_in_pool(pool)) // Èç¹ûµ±Ç°Ïß³ÌÔÚÏ̳߳ØÖÐ - __thrdpool_exit_routine(pool); // Ö´ÐÐÍ˳ö²Ù×÷ + if (thrdpool_in_pool(pool)) + __thrdpool_exit_routine(pool); } -// Ïú»ÙÏß³Ì³Ø -void thrdpool_destroy(void (*pending)(const struct thrdpool_task*), - thrdpool_t* pool) +void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), + thrdpool_t *pool) { - int in_pool = thrdpool_in_pool(pool); // ÅжÏÊÇ·ñÔÚÏ̳߳ØÖÐ - struct __thrdpool_task_entry* entry; - - __thrdpool_terminate(in_pool, pool); // ÖÕÖ¹Ïß³Ì³Ø - while (1) - { - entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); // »ñÈ¡ÈÎÎñ - if (!entry) - break; - - if (pending && entry->task.routine != __thrdpool_exit_routine) // Èç¹ûÓÐ¹ÒÆðÈÎÎñ´¦Àíº¯Êý - pending(&entry->task); // ´¦Àí¹ÒÆðÈÎÎñ - - free(entry); // ÊÍ·ÅÈÎÎñÌõÄ¿ÄÚ´æ - } - - pthread_key_delete(pool->key); // ɾ³ýÏֲ߳̾¿´æ´¢¼ü - pthread_mutex_destroy(&pool->mutex); // Ïú»Ù»¥³âËø - msgqueue_destroy(pool->msgqueue); // Ïú»ÙÏûÏ¢¶ÓÁÐ - if (!in_pool) // Èç¹û²»ÔÚÏ̳߳ØÖУ¬ÊÍ·ÅÏ̳߳ؽṹÌåÄÚ´æ - free(pool); -} \ No newline at end of file + int in_pool = thrdpool_in_pool(pool); + struct __thrdpool_task_entry *entry; + + __thrdpool_terminate(in_pool, pool); + while (1) + { + entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue); + if (!entry) + break; + + if (pending && entry->task.routine != __thrdpool_exit_routine) + pending(&entry->task); + + free(entry); + } + + pthread_key_delete(pool->key); + pthread_mutex_destroy(&pool->mutex); + msgqueue_destroy(pool->msgqueue); + if (!in_pool) + free(pool); +} +