diff --git a/src/kernel/thrdpool - 改.c b/src/kernel/thrdpool - 改.c new file mode 100644 index 0000000..abac1a4 --- /dev/null +++ b/src/kernel/thrdpool - 改.c @@ -0,0 +1,318 @@ +/* + Copyright (c) 2019 Sogou, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + Author: Xie Han (xiehan@sogou-inc.com) +*/ + +#include // °üº¬´íÎóºÅ¶¨Òå +#include // °üº¬POSIXÏ߳̿â +#include // °üº¬±ê×¼¿âº¯Êý£¬ÈçmallocºÍfree +#include "msgqueue.h" // °üº¬ÏûÏ¢¶ÓÁеÄʵÏÖ£¬ÓÃÓÚÏ̼߳äͨÐÅ +#include "thrdpool.h" // °üº¬Ï̳߳صÄÉùÃ÷ + +// Ï̳߳ؽṹÌ嶨Òå +struct __thrdpool +{ + msgqueue_t* msgqueue; // ÏûÏ¢¶ÓÁУ¬ÓÃÓÚ´æ·Å´ýÖ´ÐеÄÈÎÎñ + size_t nthreads; // Ï̳߳ØÖеÄÏß³ÌÊýÁ¿ + size_t stacksize; // Ïß³ÌÕ»µÄ´óС + pthread_t tid; // Ïß³ÌID + pthread_mutex_t mutex; // »¥³âËø£¬ÓÃÓÚ±£»¤Ï̳߳صĹ²ÏíÊý¾Ý + pthread_key_t key; // Ïֲ߳̾¿´æ´¢µÄ¼ü + pthread_cond_t* terminate; // Ìõ¼þ±äÁ¿£¬ÓÃÓÚ֪ͨÏß³ÌÍ˳ö +}; + +// ÈÎÎñÌõÄ¿½á¹¹Ì嶨Òå +struct __thrdpool_task_entry +{ + void* link; // Á´½Óµ½ÏÂÒ»¸öÈÎÎñÌõÄ¿ + struct thrdpool_task task; // ÈÎÎñ½á¹¹Ì壬°üº¬ÈÎÎñº¯ÊýºÍÉÏÏÂÎÄ +}; + +// ¾²Ì¬±äÁ¿£¬ÓÃÓÚ±íʾһ¸ö¿ÕµÄÏß³ÌID +static pthread_t __zero_tid; + +// Ïß³ÌÍ˳öʱµÄ»Øµ÷º¯Êý +static void __thrdpool_exit_routine(void* context) +{ + thrdpool_t* pool = (thrdpool_t*)context; + pthread_t tid; + + // Ëø¶¨»¥³âËø + pthread_mutex_lock(&pool->mutex); + tid = pool->tid; + pool->tid = pthread_self(); + if (--pool->nthreads == 0 && pool->terminate) + pthread_cond_signal(pool->terminate); + + pthread_mutex_unlock(&pool->mutex); + // Èç¹û²»ÊÇÁãID£¬ÔòµÈ´ýÏ߳̽áÊø + if (!pthread_equal(tid, __zero_tid)) + pthread_join(tid, NULL); + + pthread_exit(NULL); +} + +// Ï̳߳صÄÏ̺߳¯Êý +static void* __thrdpool_routine(void* arg) +{ + thrdpool_t* pool = (thrdpool_t*)arg; + struct __thrdpool_task_entry* entry; + void (*task_routine)(void*); + void* task_context; + + // ÉèÖÃÏֲ߳̾¿´æ´¢ + pthread_setspecific(pool->key, pool); + while (!pool->terminate) + { + // ´ÓÏûÏ¢¶ÓÁÐÖлñÈ¡ÈÎÎñ + entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); + if (!entry) + break; + + // Ö´ÐÐÈÎÎñ + task_routine = entry->task.routine; + task_context = entry->task.context; + free(entry); + task_routine(task_context); + + // Èç¹ûÏ̳߳ØÖеÄÏß³ÌÊýÁ¿Îª0£¬ÔòÏ̳߳ØÒѱ»Ïú»Ù + if (pool->nthreads == 0) + { + free(pool); + return NULL; + } + } + + // Ö´ÐÐÏß³ÌÍ˳öʱµÄ²Ù×÷ + __thrdpool_exit_routine(pool); + return NULL; +} + +// Ï̳߳ØÏú»Ùº¯Êý +static void __thrdpool_terminate(int in_pool, thrdpool_t* pool) +{ + pthread_cond_t term = PTHREAD_COND_INITIALIZER; + + // Ëø¶¨»¥³âËø + pthread_mutex_lock(&pool->mutex); + msgqueue_set_nonblock(pool->msgqueue); + pool->terminate = &term; + + if (in_pool) + { + // Èç¹ûÔÚÏ̳߳صÄÏß³ÌÖÐÏú»ÙÏ̳߳أ¬Ôò·ÖÀ뵱ǰÏß³Ì + pthread_detach(pthread_self()); + pool->nthreads--; + } + + // µÈ´ýËùÓÐÏ߳̽áÊø + while (pool->nthreads > 0) + pthread_cond_wait(&term, &pool->mutex); + + pthread_mutex_unlock(&pool->mutex); + // Èç¹û²»ÊÇÁãID£¬ÔòµÈ´ýÏ߳̽áÊø + if (!pthread_equal(pool->tid, __zero_tid)) + pthread_join(pool->tid, NULL); +} + +// Ï̳߳ش´½¨Ï̺߳¯Êý +static int __thrdpool_create_threads(size_t nthreads, thrdpool_t* pool) +{ + pthread_attr_t attr; + pthread_t tid; + int ret; + + // ³õʼ»¯Ïß³ÌÊôÐÔ + ret = pthread_attr_init(&attr); + if (ret == 0) + { + if (pool->stacksize) + pthread_attr_setstacksize(&attr, pool->stacksize); + + while (pool->nthreads < nthreads) + { + // ´´½¨Ïß³Ì + ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); + if (ret == 0) + pool->nthreads++; + else + break; + } + + pthread_attr_destroy(&attr); + if (pool->nthreads == nthreads) + return 0; + + // Èç¹û´´½¨Ê§°Ü£¬ÔòÏú»ÙÏß³Ì³Ø + __thrdpool_terminate(0, pool); + } + + errno = ret; + return -1; +} + +// ´´½¨Ïß³Ì³Ø +thrdpool_t* thrdpool_create(size_t nthreads, size_t stacksize) +{ + thrdpool_t* pool; + int ret; + + pool = (thrdpool_t*)malloc(sizeof(thrdpool_t)); // ·ÖÅäÏ̳߳ؽṹÌåÄÚ´æ + if (!pool) + return NULL; + + pool->msgqueue = msgqueue_create(0, 0); // ´´½¨ÏûÏ¢¶ÓÁÐ + if (pool->msgqueue) + { + ret = pthread_mutex_init(&pool->mutex, NULL); // ³õʼ»¯»¥³âËø + if (ret == 0) + { + ret = pthread_key_create(&pool->key, NULL); // ´´½¨Ïֲ߳̾¿´æ´¢¼ü + if (ret == 0) + { + pool->stacksize = stacksize; // ÉèÖÃÏß³ÌÕ»´óС + pool->nthreads = 0; // ³õʼ»¯Ïß³ÌÊýÁ¿ + pool->tid = __zero_tid; // ³õʼ»¯Ïß³ÌID + pool->terminate = NULL; // ³õʼ»¯ÖÕÖ¹Ìõ¼þ + if (__thrdpool_create_threads(nthreads, pool) >= 0) // ´´½¨Ïß³Ì + return pool; + + pthread_key_delete(pool->key); // ɾ³ýÏֲ߳̾¿´æ´¢¼ü + } + + pthread_mutex_destroy(&pool->mutex); // Ïú»Ù»¥³âËø + } + + errno = ret; // ÉèÖôíÎóºÅ + msgqueue_destroy(pool->msgqueue); // Ïú»ÙÏûÏ¢¶ÓÁÐ + } + + free(pool); // ÊÍ·ÅÏ̳߳ؽṹÌåÄÚ´æ + return NULL; +} + +// ÄÚ²¿º¯Êý£¬ÓÃÓÚ½«ÈÎÎñµ÷¶Èµ½Ïß³Ì³Ø +inline void __thrdpool_schedule(const struct thrdpool_task* task, void* buf, + thrdpool_t* pool); + +void __thrdpool_schedule(const struct thrdpool_task* task, void* buf, + thrdpool_t* pool) +{ + ((struct __thrdpool_task_entry*)buf)->task = *task; // ÉèÖÃÈÎÎñ + msgqueue_put(buf, pool->msgqueue); // ½«ÈÎÎñ·ÅÈëÏûÏ¢¶ÓÁÐ +} + +// µ÷¶ÈÈÎÎñµ½Ïß³Ì³Ø +int thrdpool_schedule(const struct thrdpool_task* task, thrdpool_t* pool) +{ + void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // ·ÖÅäÈÎÎñÌõÄ¿ÄÚ´æ + + if (buf) + { + __thrdpool_schedule(task, buf, pool); // µ÷¶ÈÈÎÎñ + return 0; + } + + return -1; +} + +// Åжϵ±Ç°Ïß³ÌÊÇ·ñÔÚÏ̳߳ØÖÐ +inline int thrdpool_in_pool(thrdpool_t* pool); + +int thrdpool_in_pool(thrdpool_t* pool) +{ + return pthread_getspecific(pool->key) == pool; // ͨ¹ýÏֲ߳̾¿´æ´¢ÅÐ¶Ï +} + +// Ôö¼ÓÏ̳߳ØÖеÄÏß³ÌÊýÁ¿ +int thrdpool_increase(thrdpool_t* pool) +{ + pthread_attr_t attr; + pthread_t tid; + int ret; + + ret = pthread_attr_init(&attr); // ³õʼ»¯Ïß³ÌÊôÐÔ + if (ret == 0) + { + if (pool->stacksize) + pthread_attr_setstacksize(&attr, pool->stacksize); // ÉèÖÃÏß³ÌÕ»´óС + + pthread_mutex_lock(&pool->mutex); // Ëø¶¨»¥³âËø + ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); // ´´½¨Ïß³Ì + if (ret == 0) + pool->nthreads++; // Ôö¼ÓÏß³ÌÊýÁ¿ + + pthread_mutex_unlock(&pool->mutex); // ½âËø»¥³âËø + pthread_attr_destroy(&attr); // Ïú»ÙÏß³ÌÊôÐÔ + if (ret == 0) + return 0; + } + + errno = ret; // ÉèÖôíÎóºÅ + return -1; +} + +// ¼õÉÙÏ̳߳ØÖеÄÏß³ÌÊýÁ¿ +int thrdpool_decrease(thrdpool_t* pool) +{ + void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // ·ÖÅäÈÎÎñÌõÄ¿ÄÚ´æ + struct __thrdpool_task_entry* entry; + + if (buf) + { + entry = (struct __thrdpool_task_entry*)buf; + entry->task.routine = __thrdpool_exit_routine; // ÉèÖÃÍ˳öÈÎÎñ + entry->task.context = pool; + msgqueue_put_head(entry, pool->msgqueue); // ½«Í˳öÈÎÎñ·ÅÈëÏûÏ¢¶ÓÁÐÍ·²¿ + return 0; + } + + return -1; +} + +// Ï̳߳ØÖеÄÏß³ÌÍ˳ö +void thrdpool_exit(thrdpool_t* pool) +{ + if (thrdpool_in_pool(pool)) // Èç¹ûµ±Ç°Ïß³ÌÔÚÏ̳߳ØÖÐ + __thrdpool_exit_routine(pool); // Ö´ÐÐÍ˳ö²Ù×÷ +} + +// Ïú»ÙÏß³Ì³Ø +void thrdpool_destroy(void (*pending)(const struct thrdpool_task*), + thrdpool_t* pool) +{ + int in_pool = thrdpool_in_pool(pool); // ÅжÏÊÇ·ñÔÚÏ̳߳ØÖÐ + struct __thrdpool_task_entry* entry; + + __thrdpool_terminate(in_pool, pool); // ÖÕÖ¹Ïß³Ì³Ø + while (1) + { + entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); // »ñÈ¡ÈÎÎñ + if (!entry) + break; + + if (pending && entry->task.routine != __thrdpool_exit_routine) // Èç¹ûÓÐ¹ÒÆðÈÎÎñ´¦Àíº¯Êý + pending(&entry->task); // ´¦Àí¹ÒÆðÈÎÎñ + + free(entry); // ÊÍ·ÅÈÎÎñÌõÄ¿ÄÚ´æ + } + + pthread_key_delete(pool->key); // ɾ³ýÏֲ߳̾¿´æ´¢¼ü + pthread_mutex_destroy(&pool->mutex); // Ïú»Ù»¥³âËø + msgqueue_destroy(pool->msgqueue); // Ïú»ÙÏûÏ¢¶ÓÁÐ + if (!in_pool) // Èç¹û²»ÔÚÏ̳߳ØÖУ¬ÊÍ·ÅÏ̳߳ؽṹÌåÄÚ´æ + free(pool); +} \ No newline at end of file diff --git a/src/kernel/thrdpool.c b/src/kernel/thrdpool.c index c1be399..abac1a4 100644 --- a/src/kernel/thrdpool.c +++ b/src/kernel/thrdpool.c @@ -16,278 +16,303 @@ Author: Xie Han (xiehan@sogou-inc.com) */ -#include -#include -#include -#include "msgqueue.h" -#include "thrdpool.h" +#include // °üº¬´íÎóºÅ¶¨Òå +#include // °üº¬POSIXÏ߳̿â +#include // °üº¬±ê×¼¿âº¯Êý£¬ÈçmallocºÍfree +#include "msgqueue.h" // °üº¬ÏûÏ¢¶ÓÁеÄʵÏÖ£¬ÓÃÓÚÏ̼߳äͨÐÅ +#include "thrdpool.h" // °üº¬Ï̳߳صÄÉùÃ÷ +// Ï̳߳ؽṹÌ嶨Òå struct __thrdpool { - msgqueue_t *msgqueue; - size_t nthreads; - size_t stacksize; - pthread_t tid; - pthread_mutex_t mutex; - pthread_key_t key; - pthread_cond_t *terminate; + msgqueue_t* msgqueue; // ÏûÏ¢¶ÓÁУ¬ÓÃÓÚ´æ·Å´ýÖ´ÐеÄÈÎÎñ + size_t nthreads; // Ï̳߳ØÖеÄÏß³ÌÊýÁ¿ + size_t stacksize; // Ïß³ÌÕ»µÄ´óС + pthread_t tid; // Ïß³ÌID + pthread_mutex_t mutex; // »¥³âËø£¬ÓÃÓÚ±£»¤Ï̳߳صĹ²ÏíÊý¾Ý + pthread_key_t key; // Ïֲ߳̾¿´æ´¢µÄ¼ü + pthread_cond_t* terminate; // Ìõ¼þ±äÁ¿£¬ÓÃÓÚ֪ͨÏß³ÌÍ˳ö }; +// ÈÎÎñÌõÄ¿½á¹¹Ì嶨Òå struct __thrdpool_task_entry { - void *link; - struct thrdpool_task task; + void* link; // Á´½Óµ½ÏÂÒ»¸öÈÎÎñÌõÄ¿ + struct thrdpool_task task; // ÈÎÎñ½á¹¹Ì壬°üº¬ÈÎÎñº¯ÊýºÍÉÏÏÂÎÄ }; +// ¾²Ì¬±äÁ¿£¬ÓÃÓÚ±íʾһ¸ö¿ÕµÄÏß³ÌID static pthread_t __zero_tid; -static void __thrdpool_exit_routine(void *context) +// Ïß³ÌÍ˳öʱµÄ»Øµ÷º¯Êý +static void __thrdpool_exit_routine(void* context) { - thrdpool_t *pool = (thrdpool_t *)context; - pthread_t tid; - - /* One thread joins another. Don't need to keep all thread IDs. */ - pthread_mutex_lock(&pool->mutex); - tid = pool->tid; - pool->tid = pthread_self(); - if (--pool->nthreads == 0 && pool->terminate) - pthread_cond_signal(pool->terminate); - - pthread_mutex_unlock(&pool->mutex); - if (!pthread_equal(tid, __zero_tid)) - pthread_join(tid, NULL); - - pthread_exit(NULL); + thrdpool_t* pool = (thrdpool_t*)context; + pthread_t tid; + + // Ëø¶¨»¥³âËø + pthread_mutex_lock(&pool->mutex); + tid = pool->tid; + pool->tid = pthread_self(); + if (--pool->nthreads == 0 && pool->terminate) + pthread_cond_signal(pool->terminate); + + pthread_mutex_unlock(&pool->mutex); + // Èç¹û²»ÊÇÁãID£¬ÔòµÈ´ýÏ߳̽áÊø + if (!pthread_equal(tid, __zero_tid)) + pthread_join(tid, NULL); + + pthread_exit(NULL); } -static void *__thrdpool_routine(void *arg) +// Ï̳߳صÄÏ̺߳¯Êý +static void* __thrdpool_routine(void* arg) { - thrdpool_t *pool = (thrdpool_t *)arg; - struct __thrdpool_task_entry *entry; - void (*task_routine)(void *); - void *task_context; - - pthread_setspecific(pool->key, pool); - while (!pool->terminate) - { - entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue); - if (!entry) - break; - - task_routine = entry->task.routine; - task_context = entry->task.context; - free(entry); - task_routine(task_context); - - if (pool->nthreads == 0) - { - /* Thread pool was destroyed by the task. */ - free(pool); - return NULL; - } - } - - __thrdpool_exit_routine(pool); - return NULL; + thrdpool_t* pool = (thrdpool_t*)arg; + struct __thrdpool_task_entry* entry; + void (*task_routine)(void*); + void* task_context; + + // ÉèÖÃÏֲ߳̾¿´æ´¢ + pthread_setspecific(pool->key, pool); + while (!pool->terminate) + { + // ´ÓÏûÏ¢¶ÓÁÐÖлñÈ¡ÈÎÎñ + entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); + if (!entry) + break; + + // Ö´ÐÐÈÎÎñ + task_routine = entry->task.routine; + task_context = entry->task.context; + free(entry); + task_routine(task_context); + + // Èç¹ûÏ̳߳ØÖеÄÏß³ÌÊýÁ¿Îª0£¬ÔòÏ̳߳ØÒѱ»Ïú»Ù + if (pool->nthreads == 0) + { + free(pool); + return NULL; + } + } + + // Ö´ÐÐÏß³ÌÍ˳öʱµÄ²Ù×÷ + __thrdpool_exit_routine(pool); + return NULL; } -static void __thrdpool_terminate(int in_pool, thrdpool_t *pool) +// Ï̳߳ØÏú»Ùº¯Êý +static void __thrdpool_terminate(int in_pool, thrdpool_t* pool) { - pthread_cond_t term = PTHREAD_COND_INITIALIZER; - - pthread_mutex_lock(&pool->mutex); - msgqueue_set_nonblock(pool->msgqueue); - pool->terminate = &term; - - if (in_pool) - { - /* Thread pool destroyed in a pool thread is legal. */ - pthread_detach(pthread_self()); - pool->nthreads--; - } - - while (pool->nthreads > 0) - pthread_cond_wait(&term, &pool->mutex); - - pthread_mutex_unlock(&pool->mutex); - if (!pthread_equal(pool->tid, __zero_tid)) - pthread_join(pool->tid, NULL); + pthread_cond_t term = PTHREAD_COND_INITIALIZER; + + // Ëø¶¨»¥³âËø + pthread_mutex_lock(&pool->mutex); + msgqueue_set_nonblock(pool->msgqueue); + pool->terminate = &term; + + if (in_pool) + { + // Èç¹ûÔÚÏ̳߳صÄÏß³ÌÖÐÏú»ÙÏ̳߳أ¬Ôò·ÖÀ뵱ǰÏß³Ì + pthread_detach(pthread_self()); + pool->nthreads--; + } + + // µÈ´ýËùÓÐÏ߳̽áÊø + while (pool->nthreads > 0) + pthread_cond_wait(&term, &pool->mutex); + + pthread_mutex_unlock(&pool->mutex); + // Èç¹û²»ÊÇÁãID£¬ÔòµÈ´ýÏ߳̽áÊø + if (!pthread_equal(pool->tid, __zero_tid)) + pthread_join(pool->tid, NULL); } -static int __thrdpool_create_threads(size_t nthreads, thrdpool_t *pool) +// Ï̳߳ش´½¨Ï̺߳¯Êý +static int __thrdpool_create_threads(size_t nthreads, thrdpool_t* pool) { - pthread_attr_t attr; - pthread_t tid; - int ret; - - ret = pthread_attr_init(&attr); - if (ret == 0) - { - if (pool->stacksize) - pthread_attr_setstacksize(&attr, pool->stacksize); - - while (pool->nthreads < nthreads) - { - ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); - if (ret == 0) - pool->nthreads++; - else - break; - } - - pthread_attr_destroy(&attr); - if (pool->nthreads == nthreads) - return 0; - - __thrdpool_terminate(0, pool); - } - - errno = ret; - return -1; + pthread_attr_t attr; + pthread_t tid; + int ret; + + // ³õʼ»¯Ïß³ÌÊôÐÔ + ret = pthread_attr_init(&attr); + if (ret == 0) + { + if (pool->stacksize) + pthread_attr_setstacksize(&attr, pool->stacksize); + + while (pool->nthreads < nthreads) + { + // ´´½¨Ïß³Ì + ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); + if (ret == 0) + pool->nthreads++; + else + break; + } + + pthread_attr_destroy(&attr); + if (pool->nthreads == nthreads) + return 0; + + // Èç¹û´´½¨Ê§°Ü£¬ÔòÏú»ÙÏß³Ì³Ø + __thrdpool_terminate(0, pool); + } + + errno = ret; + return -1; } -thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize) +// ´´½¨Ïß³Ì³Ø +thrdpool_t* thrdpool_create(size_t nthreads, size_t stacksize) { - thrdpool_t *pool; - int ret; - - pool = (thrdpool_t *)malloc(sizeof (thrdpool_t)); - if (!pool) - return NULL; - - pool->msgqueue = msgqueue_create(0, 0); - if (pool->msgqueue) - { - ret = pthread_mutex_init(&pool->mutex, NULL); - if (ret == 0) - { - ret = pthread_key_create(&pool->key, NULL); - if (ret == 0) - { - pool->stacksize = stacksize; - pool->nthreads = 0; - pool->tid = __zero_tid; - pool->terminate = NULL; - if (__thrdpool_create_threads(nthreads, pool) >= 0) - return pool; - - pthread_key_delete(pool->key); - } - - pthread_mutex_destroy(&pool->mutex); - } - - errno = ret; - msgqueue_destroy(pool->msgqueue); - } - - free(pool); - return NULL; + thrdpool_t* pool; + int ret; + + pool = (thrdpool_t*)malloc(sizeof(thrdpool_t)); // ·ÖÅäÏ̳߳ؽṹÌåÄÚ´æ + if (!pool) + return NULL; + + pool->msgqueue = msgqueue_create(0, 0); // ´´½¨ÏûÏ¢¶ÓÁÐ + if (pool->msgqueue) + { + ret = pthread_mutex_init(&pool->mutex, NULL); // ³õʼ»¯»¥³âËø + if (ret == 0) + { + ret = pthread_key_create(&pool->key, NULL); // ´´½¨Ïֲ߳̾¿´æ´¢¼ü + if (ret == 0) + { + pool->stacksize = stacksize; // ÉèÖÃÏß³ÌÕ»´óС + pool->nthreads = 0; // ³õʼ»¯Ïß³ÌÊýÁ¿ + pool->tid = __zero_tid; // ³õʼ»¯Ïß³ÌID + pool->terminate = NULL; // ³õʼ»¯ÖÕÖ¹Ìõ¼þ + if (__thrdpool_create_threads(nthreads, pool) >= 0) // ´´½¨Ïß³Ì + return pool; + + pthread_key_delete(pool->key); // ɾ³ýÏֲ߳̾¿´æ´¢¼ü + } + + pthread_mutex_destroy(&pool->mutex); // Ïú»Ù»¥³âËø + } + + errno = ret; // ÉèÖôíÎóºÅ + msgqueue_destroy(pool->msgqueue); // Ïú»ÙÏûÏ¢¶ÓÁÐ + } + + free(pool); // ÊÍ·ÅÏ̳߳ؽṹÌåÄÚ´æ + return NULL; } -inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf, - thrdpool_t *pool); +// ÄÚ²¿º¯Êý£¬ÓÃÓÚ½«ÈÎÎñµ÷¶Èµ½Ïß³Ì³Ø +inline void __thrdpool_schedule(const struct thrdpool_task* task, void* buf, + thrdpool_t* pool); -void __thrdpool_schedule(const struct thrdpool_task *task, void *buf, - thrdpool_t *pool) +void __thrdpool_schedule(const struct thrdpool_task* task, void* buf, + thrdpool_t* pool) { - ((struct __thrdpool_task_entry *)buf)->task = *task; - msgqueue_put(buf, pool->msgqueue); + ((struct __thrdpool_task_entry*)buf)->task = *task; // ÉèÖÃÈÎÎñ + msgqueue_put(buf, pool->msgqueue); // ½«ÈÎÎñ·ÅÈëÏûÏ¢¶ÓÁÐ } -int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool) +// µ÷¶ÈÈÎÎñµ½Ïß³Ì³Ø +int thrdpool_schedule(const struct thrdpool_task* task, thrdpool_t* pool) { - void *buf = malloc(sizeof (struct __thrdpool_task_entry)); + void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // ·ÖÅäÈÎÎñÌõÄ¿ÄÚ´æ - if (buf) - { - __thrdpool_schedule(task, buf, pool); - return 0; - } + if (buf) + { + __thrdpool_schedule(task, buf, pool); // µ÷¶ÈÈÎÎñ + return 0; + } - return -1; + return -1; } -inline int thrdpool_in_pool(thrdpool_t *pool); +// Åжϵ±Ç°Ïß³ÌÊÇ·ñÔÚÏ̳߳ØÖÐ +inline int thrdpool_in_pool(thrdpool_t* pool); -int thrdpool_in_pool(thrdpool_t *pool) +int thrdpool_in_pool(thrdpool_t* pool) { - return pthread_getspecific(pool->key) == pool; + return pthread_getspecific(pool->key) == pool; // ͨ¹ýÏֲ߳̾¿´æ´¢ÅÐ¶Ï } -int thrdpool_increase(thrdpool_t *pool) +// Ôö¼ÓÏ̳߳ØÖеÄÏß³ÌÊýÁ¿ +int thrdpool_increase(thrdpool_t* pool) { - pthread_attr_t attr; - pthread_t tid; - int ret; - - ret = pthread_attr_init(&attr); - if (ret == 0) - { - if (pool->stacksize) - pthread_attr_setstacksize(&attr, pool->stacksize); - - pthread_mutex_lock(&pool->mutex); - ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); - if (ret == 0) - pool->nthreads++; - - pthread_mutex_unlock(&pool->mutex); - pthread_attr_destroy(&attr); - if (ret == 0) - return 0; - } - - errno = ret; - return -1; + pthread_attr_t attr; + pthread_t tid; + int ret; + + ret = pthread_attr_init(&attr); // ³õʼ»¯Ïß³ÌÊôÐÔ + if (ret == 0) + { + if (pool->stacksize) + pthread_attr_setstacksize(&attr, pool->stacksize); // ÉèÖÃÏß³ÌÕ»´óС + + pthread_mutex_lock(&pool->mutex); // Ëø¶¨»¥³âËø + ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); // ´´½¨Ïß³Ì + if (ret == 0) + pool->nthreads++; // Ôö¼ÓÏß³ÌÊýÁ¿ + + pthread_mutex_unlock(&pool->mutex); // ½âËø»¥³âËø + pthread_attr_destroy(&attr); // Ïú»ÙÏß³ÌÊôÐÔ + if (ret == 0) + return 0; + } + + errno = ret; // ÉèÖôíÎóºÅ + return -1; } -int thrdpool_decrease(thrdpool_t *pool) +// ¼õÉÙÏ̳߳ØÖеÄÏß³ÌÊýÁ¿ +int thrdpool_decrease(thrdpool_t* pool) { - void *buf = malloc(sizeof (struct __thrdpool_task_entry)); - struct __thrdpool_task_entry *entry; - - if (buf) - { - entry = (struct __thrdpool_task_entry *)buf; - entry->task.routine = __thrdpool_exit_routine; - entry->task.context = pool; - msgqueue_put_head(entry, pool->msgqueue); - return 0; - } - - return -1; + void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // ·ÖÅäÈÎÎñÌõÄ¿ÄÚ´æ + struct __thrdpool_task_entry* entry; + + if (buf) + { + entry = (struct __thrdpool_task_entry*)buf; + entry->task.routine = __thrdpool_exit_routine; // ÉèÖÃÍ˳öÈÎÎñ + entry->task.context = pool; + msgqueue_put_head(entry, pool->msgqueue); // ½«Í˳öÈÎÎñ·ÅÈëÏûÏ¢¶ÓÁÐÍ·²¿ + return 0; + } + + return -1; } -void thrdpool_exit(thrdpool_t *pool) +// Ï̳߳ØÖеÄÏß³ÌÍ˳ö +void thrdpool_exit(thrdpool_t* pool) { - if (thrdpool_in_pool(pool)) - __thrdpool_exit_routine(pool); + if (thrdpool_in_pool(pool)) // Èç¹ûµ±Ç°Ïß³ÌÔÚÏ̳߳ØÖÐ + __thrdpool_exit_routine(pool); // Ö´ÐÐÍ˳ö²Ù×÷ } -void thrdpool_destroy(void (*pending)(const struct thrdpool_task *), - thrdpool_t *pool) +// Ïú»ÙÏß³Ì³Ø +void thrdpool_destroy(void (*pending)(const struct thrdpool_task*), + thrdpool_t* pool) { - int in_pool = thrdpool_in_pool(pool); - struct __thrdpool_task_entry *entry; - - __thrdpool_terminate(in_pool, pool); - while (1) - { - entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue); - if (!entry) - break; - - if (pending && entry->task.routine != __thrdpool_exit_routine) - pending(&entry->task); - - free(entry); - } - - pthread_key_delete(pool->key); - pthread_mutex_destroy(&pool->mutex); - msgqueue_destroy(pool->msgqueue); - if (!in_pool) - free(pool); -} - + int in_pool = thrdpool_in_pool(pool); // ÅжÏÊÇ·ñÔÚÏ̳߳ØÖÐ + struct __thrdpool_task_entry* entry; + + __thrdpool_terminate(in_pool, pool); // ÖÕÖ¹Ïß³Ì³Ø + while (1) + { + entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); // »ñÈ¡ÈÎÎñ + if (!entry) + break; + + if (pending && entry->task.routine != __thrdpool_exit_routine) // Èç¹ûÓÐ¹ÒÆðÈÎÎñ´¦Àíº¯Êý + pending(&entry->task); // ´¦Àí¹ÒÆðÈÎÎñ + + free(entry); // ÊÍ·ÅÈÎÎñÌõÄ¿ÄÚ´æ + } + + pthread_key_delete(pool->key); // ɾ³ýÏֲ߳̾¿´æ´¢¼ü + pthread_mutex_destroy(&pool->mutex); // Ïú»Ù»¥³âËø + msgqueue_destroy(pool->msgqueue); // Ïú»ÙÏûÏ¢¶ÓÁÐ + if (!in_pool) // Èç¹û²»ÔÚÏ̳߳ØÖУ¬ÊÍ·ÅÏ̳߳ؽṹÌåÄÚ´æ + free(pool); +} \ No newline at end of file