cmz 8 months ago
parent d7d663c971
commit eb76b44d1b

@ -0,0 +1,318 @@
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#include <errno.h> // 包含错误号定义
#include <pthread.h> // 包含POSIX线程库
#include <stdlib.h> // 包含标准库函数如malloc和free
#include "msgqueue.h" // 包含消息队列的实现,用于线程间通信
#include "thrdpool.h" // 包含线程池的声明
// 线程池结构体定义
struct __thrdpool
{
msgqueue_t* msgqueue; // 消息队列,用于存放待执行的任务
size_t nthreads; // 线程池中的线程数量
size_t stacksize; // 线程栈的大小
pthread_t tid; // 线程ID
pthread_mutex_t mutex; // 互斥锁,用于保护线程池的共享数据
pthread_key_t key; // 线程局部存储的键
pthread_cond_t* terminate; // 条件变量,用于通知线程退出
};
// 任务条目结构体定义
struct __thrdpool_task_entry
{
void* link; // 链接到下一个任务条目
struct thrdpool_task task; // 任务结构体,包含任务函数和上下文
};
// 静态变量用于表示一个空的线程ID
static pthread_t __zero_tid;
// 线程退出时的回调函数
static void __thrdpool_exit_routine(void* context)
{
thrdpool_t* pool = (thrdpool_t*)context;
pthread_t tid;
// 锁定互斥锁
pthread_mutex_lock(&pool->mutex);
tid = pool->tid;
pool->tid = pthread_self();
if (--pool->nthreads == 0 && pool->terminate)
pthread_cond_signal(pool->terminate);
pthread_mutex_unlock(&pool->mutex);
// 如果不是零ID则等待线程结束
if (!pthread_equal(tid, __zero_tid))
pthread_join(tid, NULL);
pthread_exit(NULL);
}
// 线程池的线程函数
static void* __thrdpool_routine(void* arg)
{
thrdpool_t* pool = (thrdpool_t*)arg;
struct __thrdpool_task_entry* entry;
void (*task_routine)(void*);
void* task_context;
// 设置线程局部存储
pthread_setspecific(pool->key, pool);
while (!pool->terminate)
{
// 从消息队列中获取任务
entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue);
if (!entry)
break;
// 执行任务
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
task_routine(task_context);
// 如果线程池中的线程数量为0则线程池已被销毁
if (pool->nthreads == 0)
{
free(pool);
return NULL;
}
}
// 执行线程退出时的操作
__thrdpool_exit_routine(pool);
return NULL;
}
// 线程池销毁函数
static void __thrdpool_terminate(int in_pool, thrdpool_t* pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
// 锁定互斥锁
pthread_mutex_lock(&pool->mutex);
msgqueue_set_nonblock(pool->msgqueue);
pool->terminate = &term;
if (in_pool)
{
// 如果在线程池的线程中销毁线程池,则分离当前线程
pthread_detach(pthread_self());
pool->nthreads--;
}
// 等待所有线程结束
while (pool->nthreads > 0)
pthread_cond_wait(&term, &pool->mutex);
pthread_mutex_unlock(&pool->mutex);
// 如果不是零ID则等待线程结束
if (!pthread_equal(pool->tid, __zero_tid))
pthread_join(pool->tid, NULL);
}
// 线程池创建线程函数
static int __thrdpool_create_threads(size_t nthreads, thrdpool_t* pool)
{
pthread_attr_t attr;
pthread_t tid;
int ret;
// 初始化线程属性
ret = pthread_attr_init(&attr);
if (ret == 0)
{
if (pool->stacksize)
pthread_attr_setstacksize(&attr, pool->stacksize);
while (pool->nthreads < nthreads)
{
// 创建线程
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
if (ret == 0)
pool->nthreads++;
else
break;
}
pthread_attr_destroy(&attr);
if (pool->nthreads == nthreads)
return 0;
// 如果创建失败,则销毁线程池
__thrdpool_terminate(0, pool);
}
errno = ret;
return -1;
}
// 创建线程池
thrdpool_t* thrdpool_create(size_t nthreads, size_t stacksize)
{
thrdpool_t* pool;
int ret;
pool = (thrdpool_t*)malloc(sizeof(thrdpool_t)); // 分配线程池结构体内存
if (!pool)
return NULL;
pool->msgqueue = msgqueue_create(0, 0); // 创建消息队列
if (pool->msgqueue)
{
ret = pthread_mutex_init(&pool->mutex, NULL); // 初始化互斥锁
if (ret == 0)
{
ret = pthread_key_create(&pool->key, NULL); // 创建线程局部存储键
if (ret == 0)
{
pool->stacksize = stacksize; // 设置线程栈大小
pool->nthreads = 0; // 初始化线程数量
pool->tid = __zero_tid; // 初始化线程ID
pool->terminate = NULL; // 初始化终止条件
if (__thrdpool_create_threads(nthreads, pool) >= 0) // 创建线程
return pool;
pthread_key_delete(pool->key); // 删除线程局部存储键
}
pthread_mutex_destroy(&pool->mutex); // 销毁互斥锁
}
errno = ret; // 设置错误号
msgqueue_destroy(pool->msgqueue); // 销毁消息队列
}
free(pool); // 释放线程池结构体内存
return NULL;
}
// 内部函数,用于将任务调度到线程池
inline void __thrdpool_schedule(const struct thrdpool_task* task, void* buf,
thrdpool_t* pool);
void __thrdpool_schedule(const struct thrdpool_task* task, void* buf,
thrdpool_t* pool)
{
((struct __thrdpool_task_entry*)buf)->task = *task; // 设置任务
msgqueue_put(buf, pool->msgqueue); // 将任务放入消息队列
}
// 调度任务到线程池
int thrdpool_schedule(const struct thrdpool_task* task, thrdpool_t* pool)
{
void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // 分配任务条目内存
if (buf)
{
__thrdpool_schedule(task, buf, pool); // 调度任务
return 0;
}
return -1;
}
// 判断当前线程是否在线程池中
inline int thrdpool_in_pool(thrdpool_t* pool);
int thrdpool_in_pool(thrdpool_t* pool)
{
return pthread_getspecific(pool->key) == pool; // 通过线程局部存储判断
}
// 增加线程池中的线程数量
int thrdpool_increase(thrdpool_t* pool)
{
pthread_attr_t attr;
pthread_t tid;
int ret;
ret = pthread_attr_init(&attr); // 初始化线程属性
if (ret == 0)
{
if (pool->stacksize)
pthread_attr_setstacksize(&attr, pool->stacksize); // 设置线程栈大小
pthread_mutex_lock(&pool->mutex); // 锁定互斥锁
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); // 创建线程
if (ret == 0)
pool->nthreads++; // 增加线程数量
pthread_mutex_unlock(&pool->mutex); // 解锁互斥锁
pthread_attr_destroy(&attr); // 销毁线程属性
if (ret == 0)
return 0;
}
errno = ret; // 设置错误号
return -1;
}
// 减少线程池中的线程数量
int thrdpool_decrease(thrdpool_t* pool)
{
void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // 分配任务条目内存
struct __thrdpool_task_entry* entry;
if (buf)
{
entry = (struct __thrdpool_task_entry*)buf;
entry->task.routine = __thrdpool_exit_routine; // 设置退出任务
entry->task.context = pool;
msgqueue_put_head(entry, pool->msgqueue); // 将退出任务放入消息队列头部
return 0;
}
return -1;
}
// 线程池中的线程退出
void thrdpool_exit(thrdpool_t* pool)
{
if (thrdpool_in_pool(pool)) // 如果当前线程在线程池中
__thrdpool_exit_routine(pool); // 执行退出操作
}
// 销毁线程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task*),
thrdpool_t* pool)
{
int in_pool = thrdpool_in_pool(pool); // 判断是否在线程池中
struct __thrdpool_task_entry* entry;
__thrdpool_terminate(in_pool, pool); // 终止线程池
while (1)
{
entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); // 获取任务
if (!entry)
break;
if (pending && entry->task.routine != __thrdpool_exit_routine) // 如果有挂起任务处理函数
pending(&entry->task); // 处理挂起任务
free(entry); // 释放任务条目内存
}
pthread_key_delete(pool->key); // 删除线程局部存储键
pthread_mutex_destroy(&pool->mutex); // 销毁互斥锁
msgqueue_destroy(pool->msgqueue); // 销毁消息队列
if (!in_pool) // 如果不在线程池中,释放线程池结构体内存
free(pool);
}

@ -16,37 +16,41 @@
Author: Xie Han (xiehan@sogou-inc.com)
*/
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include "msgqueue.h"
#include "thrdpool.h"
#include <errno.h> // 包含错误号定义
#include <pthread.h> // 包含POSIX线程库
#include <stdlib.h> // 包含标准库函数如malloc和free
#include "msgqueue.h" // 包含消息队列的实现,用于线程间通信
#include "thrdpool.h" // 包含线程池的声明
// 线程池结构体定义
struct __thrdpool
{
msgqueue_t *msgqueue;
size_t nthreads;
size_t stacksize;
pthread_t tid;
pthread_mutex_t mutex;
pthread_key_t key;
pthread_cond_t *terminate;
msgqueue_t* msgqueue; // 消息队列,用于存放待执行的任务
size_t nthreads; // 线程池中的线程数量
size_t stacksize; // 线程栈的大小
pthread_t tid; // 线程ID
pthread_mutex_t mutex; // 互斥锁,用于保护线程池的共享数据
pthread_key_t key; // 线程局部存储的键
pthread_cond_t* terminate; // 条件变量,用于通知线程退出
};
// 任务条目结构体定义
struct __thrdpool_task_entry
{
void *link;
struct thrdpool_task task;
void* link; // 链接到下一个任务条目
struct thrdpool_task task; // 任务结构体,包含任务函数和上下文
};
// 静态变量用于表示一个空的线程ID
static pthread_t __zero_tid;
// 线程退出时的回调函数
static void __thrdpool_exit_routine(void* context)
{
thrdpool_t* pool = (thrdpool_t*)context;
pthread_t tid;
/* One thread joins another. Don't need to keep all thread IDs. */
// 锁定互斥锁
pthread_mutex_lock(&pool->mutex);
tid = pool->tid;
pool->tid = pthread_self();
@ -54,12 +58,14 @@ static void __thrdpool_exit_routine(void *context)
pthread_cond_signal(pool->terminate);
pthread_mutex_unlock(&pool->mutex);
// 如果不是零ID则等待线程结束
if (!pthread_equal(tid, __zero_tid))
pthread_join(tid, NULL);
pthread_exit(NULL);
}
// 线程池的线程函数
static void* __thrdpool_routine(void* arg)
{
thrdpool_t* pool = (thrdpool_t*)arg;
@ -67,59 +73,69 @@ static void *__thrdpool_routine(void *arg)
void (*task_routine)(void*);
void* task_context;
// 设置线程局部存储
pthread_setspecific(pool->key, pool);
while (!pool->terminate)
{
// 从消息队列中获取任务
entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue);
if (!entry)
break;
// 执行任务
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
task_routine(task_context);
// 如果线程池中的线程数量为0则线程池已被销毁
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
}
// 执行线程退出时的操作
__thrdpool_exit_routine(pool);
return NULL;
}
// 线程池销毁函数
static void __thrdpool_terminate(int in_pool, thrdpool_t* pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
// 锁定互斥锁
pthread_mutex_lock(&pool->mutex);
msgqueue_set_nonblock(pool->msgqueue);
pool->terminate = &term;
if (in_pool)
{
/* Thread pool destroyed in a pool thread is legal. */
// 如果在线程池的线程中销毁线程池,则分离当前线程
pthread_detach(pthread_self());
pool->nthreads--;
}
// 等待所有线程结束
while (pool->nthreads > 0)
pthread_cond_wait(&term, &pool->mutex);
pthread_mutex_unlock(&pool->mutex);
// 如果不是零ID则等待线程结束
if (!pthread_equal(pool->tid, __zero_tid))
pthread_join(pool->tid, NULL);
}
// 线程池创建线程函数
static int __thrdpool_create_threads(size_t nthreads, thrdpool_t* pool)
{
pthread_attr_t attr;
pthread_t tid;
int ret;
// 初始化线程属性
ret = pthread_attr_init(&attr);
if (ret == 0)
{
@ -128,6 +144,7 @@ static int __thrdpool_create_threads(size_t nthreads, thrdpool_t *pool)
while (pool->nthreads < nthreads)
{
// 创建线程
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
if (ret == 0)
pool->nthreads++;
@ -139,6 +156,7 @@ static int __thrdpool_create_threads(size_t nthreads, thrdpool_t *pool)
if (pool->nthreads == nthreads)
return 0;
// 如果创建失败,则销毁线程池
__thrdpool_terminate(0, pool);
}
@ -146,148 +164,155 @@ static int __thrdpool_create_threads(size_t nthreads, thrdpool_t *pool)
return -1;
}
// 创建线程池
thrdpool_t* thrdpool_create(size_t nthreads, size_t stacksize)
{
thrdpool_t* pool;
int ret;
pool = (thrdpool_t *)malloc(sizeof (thrdpool_t));
pool = (thrdpool_t*)malloc(sizeof(thrdpool_t)); // 分配线程池结构体内存
if (!pool)
return NULL;
pool->msgqueue = msgqueue_create(0, 0);
pool->msgqueue = msgqueue_create(0, 0); // 创建消息队列
if (pool->msgqueue)
{
ret = pthread_mutex_init(&pool->mutex, NULL);
ret = pthread_mutex_init(&pool->mutex, NULL); // 初始化互斥锁
if (ret == 0)
{
ret = pthread_key_create(&pool->key, NULL);
ret = pthread_key_create(&pool->key, NULL); // 创建线程局部存储键
if (ret == 0)
{
pool->stacksize = stacksize;
pool->nthreads = 0;
pool->tid = __zero_tid;
pool->terminate = NULL;
if (__thrdpool_create_threads(nthreads, pool) >= 0)
pool->stacksize = stacksize; // 设置线程栈大小
pool->nthreads = 0; // 初始化线程数量
pool->tid = __zero_tid; // 初始化线程ID
pool->terminate = NULL; // 初始化终止条件
if (__thrdpool_create_threads(nthreads, pool) >= 0) // 创建线程
return pool;
pthread_key_delete(pool->key);
pthread_key_delete(pool->key); // 删除线程局部存储键
}
pthread_mutex_destroy(&pool->mutex);
pthread_mutex_destroy(&pool->mutex); // 销毁互斥锁
}
errno = ret;
msgqueue_destroy(pool->msgqueue);
errno = ret; // 设置错误号
msgqueue_destroy(pool->msgqueue); // 销毁消息队列
}
free(pool);
free(pool); // 释放线程池结构体内存
return NULL;
}
// 内部函数,用于将任务调度到线程池
inline void __thrdpool_schedule(const struct thrdpool_task* task, void* buf,
thrdpool_t* pool);
void __thrdpool_schedule(const struct thrdpool_task* task, void* buf,
thrdpool_t* pool)
{
((struct __thrdpool_task_entry *)buf)->task = *task;
msgqueue_put(buf, pool->msgqueue);
((struct __thrdpool_task_entry*)buf)->task = *task; // 设置任务
msgqueue_put(buf, pool->msgqueue); // 将任务放入消息队列
}
// 调度任务到线程池
int thrdpool_schedule(const struct thrdpool_task* task, thrdpool_t* pool)
{
void *buf = malloc(sizeof (struct __thrdpool_task_entry));
void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // 分配任务条目内存
if (buf)
{
__thrdpool_schedule(task, buf, pool);
__thrdpool_schedule(task, buf, pool); // 调度任务
return 0;
}
return -1;
}
// 判断当前线程是否在线程池中
inline int thrdpool_in_pool(thrdpool_t* pool);
int thrdpool_in_pool(thrdpool_t* pool)
{
return pthread_getspecific(pool->key) == pool;
return pthread_getspecific(pool->key) == pool; // 通过线程局部存储判断
}
// 增加线程池中的线程数量
int thrdpool_increase(thrdpool_t* pool)
{
pthread_attr_t attr;
pthread_t tid;
int ret;
ret = pthread_attr_init(&attr);
ret = pthread_attr_init(&attr); // 初始化线程属性
if (ret == 0)
{
if (pool->stacksize)
pthread_attr_setstacksize(&attr, pool->stacksize);
pthread_attr_setstacksize(&attr, pool->stacksize); // 设置线程栈大小
pthread_mutex_lock(&pool->mutex);
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
pthread_mutex_lock(&pool->mutex); // 锁定互斥锁
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); // 创建线程
if (ret == 0)
pool->nthreads++;
pool->nthreads++; // 增加线程数量
pthread_mutex_unlock(&pool->mutex);
pthread_attr_destroy(&attr);
pthread_mutex_unlock(&pool->mutex); // 解锁互斥锁
pthread_attr_destroy(&attr); // 销毁线程属性
if (ret == 0)
return 0;
}
errno = ret;
errno = ret; // 设置错误号
return -1;
}
// 减少线程池中的线程数量
int thrdpool_decrease(thrdpool_t* pool)
{
void *buf = malloc(sizeof (struct __thrdpool_task_entry));
void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // 分配任务条目内存
struct __thrdpool_task_entry* entry;
if (buf)
{
entry = (struct __thrdpool_task_entry*)buf;
entry->task.routine = __thrdpool_exit_routine;
entry->task.routine = __thrdpool_exit_routine; // 设置退出任务
entry->task.context = pool;
msgqueue_put_head(entry, pool->msgqueue);
msgqueue_put_head(entry, pool->msgqueue); // 将退出任务放入消息队列头部
return 0;
}
return -1;
}
// 线程池中的线程退出
void thrdpool_exit(thrdpool_t* pool)
{
if (thrdpool_in_pool(pool))
__thrdpool_exit_routine(pool);
if (thrdpool_in_pool(pool)) // 如果当前线程在线程池中
__thrdpool_exit_routine(pool); // 执行退出操作
}
// 销毁线程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task*),
thrdpool_t* pool)
{
int in_pool = thrdpool_in_pool(pool);
int in_pool = thrdpool_in_pool(pool); // 判断是否在线程池中
struct __thrdpool_task_entry* entry;
__thrdpool_terminate(in_pool, pool);
__thrdpool_terminate(in_pool, pool); // 终止线程池
while (1)
{
entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue);
entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); // 获取任务
if (!entry)
break;
if (pending && entry->task.routine != __thrdpool_exit_routine)
pending(&entry->task);
if (pending && entry->task.routine != __thrdpool_exit_routine) // 如果有挂起任务处理函数
pending(&entry->task); // 处理挂起任务
free(entry);
free(entry); // 释放任务条目内存
}
pthread_key_delete(pool->key);
pthread_mutex_destroy(&pool->mutex);
msgqueue_destroy(pool->msgqueue);
if (!in_pool)
pthread_key_delete(pool->key); // 删除线程局部存储键
pthread_mutex_destroy(&pool->mutex); // 销毁互斥锁
msgqueue_destroy(pool->msgqueue); // 销毁消息队列
if (!in_pool) // 如果不在线程池中,释放线程池结构体内存
free(pool);
}

Loading…
Cancel
Save