cmz 8 months ago
parent eb76b44d1b
commit 81825752d9

@ -0,0 +1,211 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
/*
* This message queue originates from the project of Sogou C++ Workflow:
* https://github.com/sogou/workflow
*
* The idea of this implementation is quite simple and obvious. When the
* get_list is not empty, the consumer takes a message. Otherwise the consumer
* waits till put_list is not empty, and swap two lists. This method performs
* well when the queue is very busy, and the number of consumers is big.
*/
#include <errno.h> // 包含错误号定义
#include <stdlib.h> // 包含标准库函数如malloc和free
#include <pthread.h> // 包含POSIX线程库
#include "msgqueue.h" // 包含消息队列的声明
// 消息队列结构体定义
struct __msgqueue
{
size_t msg_max; // 消息队列最大容量
size_t msg_cnt; // 当前消息数量
int linkoff; // 消息链接偏移量
int nonblock; // 是否为非阻塞模式
void* head1; // 头部指针
void* head2; // 另一个头部指针,用于优化
void** get_head; // 获取操作的头部指针
void** put_head; // 放入操作的头部指针
void** put_tail; // 放入操作的尾部指针
pthread_mutex_t get_mutex; // 获取操作的互斥锁
pthread_mutex_t put_mutex; // 放入操作的互斥锁
pthread_cond_t get_cond; // 获取操作的条件变量
pthread_cond_t put_cond; // 放入操作的条件变量
};
// 设置消息队列为非阻塞模式
void msgqueue_set_nonblock(msgqueue_t* queue)
{
queue->nonblock = 1; // 设置非阻塞模式
pthread_mutex_lock(&queue->put_mutex); // 锁定放入操作的互斥锁
pthread_cond_signal(&queue->get_cond); // 通知获取操作
pthread_cond_broadcast(&queue->put_cond); // 广播放入操作的条件变量
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
}
// 设置消息队列为阻塞模式
void msgqueue_set_block(msgqueue_t* queue)
{
queue->nonblock = 0; // 设置阻塞模式
}
// 将消息放入消息队列末尾
void msgqueue_put(void* msg, msgqueue_t* queue)
{
void** link = (void**)((char*)msg + queue->linkoff); // 获取消息的链接字段
*link = NULL; // 设置链接字段为空
pthread_mutex_lock(&queue->put_mutex); // 锁定放入操作的互斥锁
while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) // 等待队列有空位
pthread_cond_wait(&queue->put_cond, &queue->put_mutex); // 等待条件变量
*queue->put_tail = link; // 将消息链接到队列尾部
queue->put_tail = link; // 更新尾部指针
queue->msg_cnt++; // 增加消息计数
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
pthread_cond_signal(&queue->get_cond); // 通知获取操作
}
// 将消息放入消息队列头部
void msgqueue_put_head(void* msg, msgqueue_t* queue)
{
void** link = (void**)((char*)msg + queue->linkoff); // 获取消息的链接字段
pthread_mutex_lock(&queue->put_mutex); // 锁定放入操作的互斥锁
while (*queue->get_head) // 如果队列非空
{
if (pthread_mutex_trylock(&queue->get_mutex) == 0) // 尝试锁定获取操作的互斥锁
{
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
*link = *queue->get_head; // 将消息链接到队列头部
*queue->get_head = link;
pthread_mutex_unlock(&queue->get_mutex); // 解锁获取操作的互斥锁
return;
}
}
while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) // 等待队列有空位
pthread_cond_wait(&queue->put_cond, &queue->put_mutex); // 等待条件变量
*link = *queue->put_head; // 将消息链接到队列头部
if (*link == NULL) // 如果原头部为空,则更新尾部指针
queue->put_tail = link;
*queue->put_head = link; // 更新头部指针
queue->msg_cnt++; // 增加消息计数
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
pthread_cond_signal(&queue->get_cond); // 通知获取操作
}
// 内部函数,用于交换消息队列的头部和尾部,以便从队列中获取消息
static size_t __msgqueue_swap(msgqueue_t* queue)
{
void** get_head = queue->get_head;
size_t cnt;
pthread_mutex_lock(&queue->put_mutex); // 锁定放入操作的互斥锁
while (queue->msg_cnt == 0 && !queue->nonblock) // 如果队列为空且不是非阻塞模式,则等待
pthread_cond_wait(&queue->get_cond, &queue->put_mutex); // 等待获取操作的条件变量
cnt = queue->msg_cnt; // 获取当前消息数量
if (cnt > queue->msg_max - 1) // 如果消息数量超过最大值减一,则广播放入操作的条件变量
pthread_cond_broadcast(&queue->put_cond);
queue->get_head = queue->put_head; // 交换获取和放入的头部指针
queue->put_head = get_head; //
queue->put_tail = get_head; // 重置放入操作的尾部指针
queue->msg_cnt = 0; // 重置消息数量
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
return cnt; // 返回消息数量
}
// 从消息队列中获取消息
void* msgqueue_get(msgqueue_t* queue)
{
void* msg;
pthread_mutex_lock(&queue->get_mutex); // 锁定获取操作的互斥锁
if (*queue->get_head || __msgqueue_swap(queue) > 0) // 如果队列非空或交换后有消息
{
msg = (char*)*queue->get_head - queue->linkoff; // 获取消息
*queue->get_head = *(void**)*queue->get_head; // 更新头部指针
}
else
msg = NULL; // 如果队列为空则返回NULL
pthread_mutex_unlock(&queue->get_mutex); // 解锁获取操作的互斥锁
return msg; // 返回消息
}
// 创建消息队列
msgqueue_t* msgqueue_create(size_t maxlen, int linkoff)
{
msgqueue_t* queue = (msgqueue_t*)malloc(sizeof(msgqueue_t)); // 分配消息队列结构体内存
int ret;
if (!queue)
return NULL;
ret = pthread_mutex_init(&queue->get_mutex, NULL); // 初始化获取操作的互斥锁
if (ret == 0)
{
ret = pthread_mutex_init(&queue->put_mutex, NULL); // 初始化放入操作的互斥锁
if (ret == 0)
{
ret = pthread_cond_init(&queue->get_cond, NULL); // 初始化获取操作的条件变量
if (ret == 0)
{
ret = pthread_cond_init(&queue->put_cond, NULL); // 初始化放入操作的条件变量
if (ret == 0)
{
queue->msg_max = maxlen; // 设置最大消息数量
queue->linkoff = linkoff; // 设置链接偏移量
queue->head1 = NULL;
queue->head2 = NULL;
queue->get_head = &queue->head1; // 设置获取操作的头部指针
queue->put_head = &queue->head2; // 设置放入操作的头部指针
queue->put_tail = &queue->head2; // 设置放入操作的尾部指针
queue->msg_cnt = 0; // 初始化消息数量
queue->nonblock = 0; // 初始化非阻塞模式
return queue; // 返回消息队列
}
pthread_cond_destroy(&queue->get_cond); // 销毁获取操作的条件变量
}
pthread_mutex_destroy(&queue->put_mutex); // 销毁放入操作的互斥锁
}
pthread_mutex_destroy(&queue->get_mutex); // 销毁获取操作的互斥锁
}
errno = ret; // 设置错误号
free(queue); // 释放消息队列结构体内存
return NULL; // 返回NULL
}
// 销毁消息队列
void msgqueue_destroy(msgqueue_t* queue)
{
pthread_cond_destroy(&queue->put_cond); // 销毁放入操作的条件变量
pthread_cond_destroy(&queue->get_cond); // 销毁获取操作的条件变量
pthread_mutex_destroy(&queue->put_mutex); // 销毁放入操作的互斥锁
pthread_mutex_destroy(&queue->get_mutex); // 销毁获取操作的互斥锁
free(queue); // 释放消息队列结构体内存
}

@ -16,303 +16,278 @@
Author: Xie Han (xiehan@sogou-inc.com)
*/
#include <errno.h> // 包含错误号定义
#include <pthread.h> // 包含POSIX线程库
#include <stdlib.h> // 包含标准库函数如malloc和free
#include "msgqueue.h" // 包含消息队列的实现,用于线程间通信
#include "thrdpool.h" // 包含线程池的声明
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include "msgqueue.h"
#include "thrdpool.h"
// 线程池结构体定义
struct __thrdpool
{
msgqueue_t* msgqueue; // 消息队列,用于存放待执行的任务
size_t nthreads; // 线程池中的线程数量
size_t stacksize; // 线程栈的大小
pthread_t tid; // 线程ID
pthread_mutex_t mutex; // 互斥锁,用于保护线程池的共享数据
pthread_key_t key; // 线程局部存储的键
pthread_cond_t* terminate; // 条件变量,用于通知线程退出
msgqueue_t *msgqueue;
size_t nthreads;
size_t stacksize;
pthread_t tid;
pthread_mutex_t mutex;
pthread_key_t key;
pthread_cond_t *terminate;
};
// 任务条目结构体定义
struct __thrdpool_task_entry
{
void* link; // 链接到下一个任务条目
struct thrdpool_task task; // 任务结构体,包含任务函数和上下文
void *link;
struct thrdpool_task task;
};
// 静态变量用于表示一个空的线程ID
static pthread_t __zero_tid;
// 线程退出时的回调函数
static void __thrdpool_exit_routine(void* context)
static void __thrdpool_exit_routine(void *context)
{
thrdpool_t* pool = (thrdpool_t*)context;
pthread_t tid;
// 锁定互斥锁
pthread_mutex_lock(&pool->mutex);
tid = pool->tid;
pool->tid = pthread_self();
if (--pool->nthreads == 0 && pool->terminate)
pthread_cond_signal(pool->terminate);
pthread_mutex_unlock(&pool->mutex);
// 如果不是零ID则等待线程结束
if (!pthread_equal(tid, __zero_tid))
pthread_join(tid, NULL);
pthread_exit(NULL);
thrdpool_t *pool = (thrdpool_t *)context;
pthread_t tid;
/* One thread joins another. Don't need to keep all thread IDs. */
pthread_mutex_lock(&pool->mutex);
tid = pool->tid;
pool->tid = pthread_self();
if (--pool->nthreads == 0 && pool->terminate)
pthread_cond_signal(pool->terminate);
pthread_mutex_unlock(&pool->mutex);
if (!pthread_equal(tid, __zero_tid))
pthread_join(tid, NULL);
pthread_exit(NULL);
}
// 线程池的线程函数
static void* __thrdpool_routine(void* arg)
static void *__thrdpool_routine(void *arg)
{
thrdpool_t* pool = (thrdpool_t*)arg;
struct __thrdpool_task_entry* entry;
void (*task_routine)(void*);
void* task_context;
// 设置线程局部存储
pthread_setspecific(pool->key, pool);
while (!pool->terminate)
{
// 从消息队列中获取任务
entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue);
if (!entry)
break;
// 执行任务
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
task_routine(task_context);
// 如果线程池中的线程数量为0则线程池已被销毁
if (pool->nthreads == 0)
{
free(pool);
return NULL;
}
}
// 执行线程退出时的操作
__thrdpool_exit_routine(pool);
return NULL;
thrdpool_t *pool = (thrdpool_t *)arg;
struct __thrdpool_task_entry *entry;
void (*task_routine)(void *);
void *task_context;
pthread_setspecific(pool->key, pool);
while (!pool->terminate)
{
entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue);
if (!entry)
break;
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
task_routine(task_context);
if (pool->nthreads == 0)
{
/* Thread pool was destroyed by the task. */
free(pool);
return NULL;
}
}
__thrdpool_exit_routine(pool);
return NULL;
}
// 线程池销毁函数
static void __thrdpool_terminate(int in_pool, thrdpool_t* pool)
static void __thrdpool_terminate(int in_pool, thrdpool_t *pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
// 锁定互斥锁
pthread_mutex_lock(&pool->mutex);
msgqueue_set_nonblock(pool->msgqueue);
pool->terminate = &term;
if (in_pool)
{
// 如果在线程池的线程中销毁线程池,则分离当前线程
pthread_detach(pthread_self());
pool->nthreads--;
}
// 等待所有线程结束
while (pool->nthreads > 0)
pthread_cond_wait(&term, &pool->mutex);
pthread_mutex_unlock(&pool->mutex);
// 如果不是零ID则等待线程结束
if (!pthread_equal(pool->tid, __zero_tid))
pthread_join(pool->tid, NULL);
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
pthread_mutex_lock(&pool->mutex);
msgqueue_set_nonblock(pool->msgqueue);
pool->terminate = &term;
if (in_pool)
{
/* Thread pool destroyed in a pool thread is legal. */
pthread_detach(pthread_self());
pool->nthreads--;
}
while (pool->nthreads > 0)
pthread_cond_wait(&term, &pool->mutex);
pthread_mutex_unlock(&pool->mutex);
if (!pthread_equal(pool->tid, __zero_tid))
pthread_join(pool->tid, NULL);
}
// 线程池创建线程函数
static int __thrdpool_create_threads(size_t nthreads, thrdpool_t* pool)
static int __thrdpool_create_threads(size_t nthreads, thrdpool_t *pool)
{
pthread_attr_t attr;
pthread_t tid;
int ret;
// 初始化线程属性
ret = pthread_attr_init(&attr);
if (ret == 0)
{
if (pool->stacksize)
pthread_attr_setstacksize(&attr, pool->stacksize);
while (pool->nthreads < nthreads)
{
// 创建线程
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
if (ret == 0)
pool->nthreads++;
else
break;
}
pthread_attr_destroy(&attr);
if (pool->nthreads == nthreads)
return 0;
// 如果创建失败,则销毁线程池
__thrdpool_terminate(0, pool);
}
errno = ret;
return -1;
pthread_attr_t attr;
pthread_t tid;
int ret;
ret = pthread_attr_init(&attr);
if (ret == 0)
{
if (pool->stacksize)
pthread_attr_setstacksize(&attr, pool->stacksize);
while (pool->nthreads < nthreads)
{
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
if (ret == 0)
pool->nthreads++;
else
break;
}
pthread_attr_destroy(&attr);
if (pool->nthreads == nthreads)
return 0;
__thrdpool_terminate(0, pool);
}
errno = ret;
return -1;
}
// 创建线程池
thrdpool_t* thrdpool_create(size_t nthreads, size_t stacksize)
thrdpool_t *thrdpool_create(size_t nthreads, size_t stacksize)
{
thrdpool_t* pool;
int ret;
pool = (thrdpool_t*)malloc(sizeof(thrdpool_t)); // 分配线程池结构体内存
if (!pool)
return NULL;
pool->msgqueue = msgqueue_create(0, 0); // 创建消息队列
if (pool->msgqueue)
{
ret = pthread_mutex_init(&pool->mutex, NULL); // 初始化互斥锁
if (ret == 0)
{
ret = pthread_key_create(&pool->key, NULL); // 创建线程局部存储键
if (ret == 0)
{
pool->stacksize = stacksize; // 设置线程栈大小
pool->nthreads = 0; // 初始化线程数量
pool->tid = __zero_tid; // 初始化线程ID
pool->terminate = NULL; // 初始化终止条件
if (__thrdpool_create_threads(nthreads, pool) >= 0) // 创建线程
return pool;
pthread_key_delete(pool->key); // 删除线程局部存储键
}
pthread_mutex_destroy(&pool->mutex); // 销毁互斥锁
}
errno = ret; // 设置错误号
msgqueue_destroy(pool->msgqueue); // 销毁消息队列
}
free(pool); // 释放线程池结构体内存
return NULL;
thrdpool_t *pool;
int ret;
pool = (thrdpool_t *)malloc(sizeof (thrdpool_t));
if (!pool)
return NULL;
pool->msgqueue = msgqueue_create(0, 0);
if (pool->msgqueue)
{
ret = pthread_mutex_init(&pool->mutex, NULL);
if (ret == 0)
{
ret = pthread_key_create(&pool->key, NULL);
if (ret == 0)
{
pool->stacksize = stacksize;
pool->nthreads = 0;
pool->tid = __zero_tid;
pool->terminate = NULL;
if (__thrdpool_create_threads(nthreads, pool) >= 0)
return pool;
pthread_key_delete(pool->key);
}
pthread_mutex_destroy(&pool->mutex);
}
errno = ret;
msgqueue_destroy(pool->msgqueue);
}
free(pool);
return NULL;
}
// 内部函数,用于将任务调度到线程池
inline void __thrdpool_schedule(const struct thrdpool_task* task, void* buf,
thrdpool_t* pool);
inline void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
thrdpool_t *pool);
void __thrdpool_schedule(const struct thrdpool_task* task, void* buf,
thrdpool_t* pool)
void __thrdpool_schedule(const struct thrdpool_task *task, void *buf,
thrdpool_t *pool)
{
((struct __thrdpool_task_entry*)buf)->task = *task; // 设置任务
msgqueue_put(buf, pool->msgqueue); // 将任务放入消息队列
((struct __thrdpool_task_entry *)buf)->task = *task;
msgqueue_put(buf, pool->msgqueue);
}
// 调度任务到线程池
int thrdpool_schedule(const struct thrdpool_task* task, thrdpool_t* pool)
int thrdpool_schedule(const struct thrdpool_task *task, thrdpool_t *pool)
{
void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // 分配任务条目内存
void *buf = malloc(sizeof (struct __thrdpool_task_entry));
if (buf)
{
__thrdpool_schedule(task, buf, pool); // 调度任务
return 0;
}
if (buf)
{
__thrdpool_schedule(task, buf, pool);
return 0;
}
return -1;
return -1;
}
// 判断当前线程是否在线程池中
inline int thrdpool_in_pool(thrdpool_t* pool);
inline int thrdpool_in_pool(thrdpool_t *pool);
int thrdpool_in_pool(thrdpool_t* pool)
int thrdpool_in_pool(thrdpool_t *pool)
{
return pthread_getspecific(pool->key) == pool; // 通过线程局部存储判断
return pthread_getspecific(pool->key) == pool;
}
// 增加线程池中的线程数量
int thrdpool_increase(thrdpool_t* pool)
int thrdpool_increase(thrdpool_t *pool)
{
pthread_attr_t attr;
pthread_t tid;
int ret;
ret = pthread_attr_init(&attr); // 初始化线程属性
if (ret == 0)
{
if (pool->stacksize)
pthread_attr_setstacksize(&attr, pool->stacksize); // 设置线程栈大小
pthread_mutex_lock(&pool->mutex); // 锁定互斥锁
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); // 创建线程
if (ret == 0)
pool->nthreads++; // 增加线程数量
pthread_mutex_unlock(&pool->mutex); // 解锁互斥锁
pthread_attr_destroy(&attr); // 销毁线程属性
if (ret == 0)
return 0;
}
errno = ret; // 设置错误号
return -1;
pthread_attr_t attr;
pthread_t tid;
int ret;
ret = pthread_attr_init(&attr);
if (ret == 0)
{
if (pool->stacksize)
pthread_attr_setstacksize(&attr, pool->stacksize);
pthread_mutex_lock(&pool->mutex);
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
if (ret == 0)
pool->nthreads++;
pthread_mutex_unlock(&pool->mutex);
pthread_attr_destroy(&attr);
if (ret == 0)
return 0;
}
errno = ret;
return -1;
}
// 减少线程池中的线程数量
int thrdpool_decrease(thrdpool_t* pool)
int thrdpool_decrease(thrdpool_t *pool)
{
void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // 分配任务条目内存
struct __thrdpool_task_entry* entry;
if (buf)
{
entry = (struct __thrdpool_task_entry*)buf;
entry->task.routine = __thrdpool_exit_routine; // 设置退出任务
entry->task.context = pool;
msgqueue_put_head(entry, pool->msgqueue); // 将退出任务放入消息队列头部
return 0;
}
return -1;
void *buf = malloc(sizeof (struct __thrdpool_task_entry));
struct __thrdpool_task_entry *entry;
if (buf)
{
entry = (struct __thrdpool_task_entry *)buf;
entry->task.routine = __thrdpool_exit_routine;
entry->task.context = pool;
msgqueue_put_head(entry, pool->msgqueue);
return 0;
}
return -1;
}
// 线程池中的线程退出
void thrdpool_exit(thrdpool_t* pool)
void thrdpool_exit(thrdpool_t *pool)
{
if (thrdpool_in_pool(pool)) // 如果当前线程在线程池中
__thrdpool_exit_routine(pool); // 执行退出操作
if (thrdpool_in_pool(pool))
__thrdpool_exit_routine(pool);
}
// 销毁线程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task*),
thrdpool_t* pool)
void thrdpool_destroy(void (*pending)(const struct thrdpool_task *),
thrdpool_t *pool)
{
int in_pool = thrdpool_in_pool(pool); // 判断是否在线程池中
struct __thrdpool_task_entry* entry;
__thrdpool_terminate(in_pool, pool); // 终止线程池
while (1)
{
entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); // 获取任务
if (!entry)
break;
if (pending && entry->task.routine != __thrdpool_exit_routine) // 如果有挂起任务处理函数
pending(&entry->task); // 处理挂起任务
free(entry); // 释放任务条目内存
}
pthread_key_delete(pool->key); // 删除线程局部存储键
pthread_mutex_destroy(&pool->mutex); // 销毁互斥锁
msgqueue_destroy(pool->msgqueue); // 销毁消息队列
if (!in_pool) // 如果不在线程池中,释放线程池结构体内存
free(pool);
}
int in_pool = thrdpool_in_pool(pool);
struct __thrdpool_task_entry *entry;
__thrdpool_terminate(in_pool, pool);
while (1)
{
entry = (struct __thrdpool_task_entry *)msgqueue_get(pool->msgqueue);
if (!entry)
break;
if (pending && entry->task.routine != __thrdpool_exit_routine)
pending(&entry->task);
free(entry);
}
pthread_key_delete(pool->key);
pthread_mutex_destroy(&pool->mutex);
msgqueue_destroy(pool->msgqueue);
if (!in_pool)
free(pool);
}

Loading…
Cancel
Save