cmz 8 months ago
parent b5a866474c
commit 7db2d843b7

@ -0,0 +1,122 @@
// 版权声明表明这段代码是Sogou, Inc.在2019年版权所有的并且这段代码是在Apache License 2.0下授权的。
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#include <stddef.h> // 包含标准定义头文件比如NULL和size_t的定义
#include <stdlib.h> // 包含标准库头文件比如malloc和free函数的定义
#include "poller.h" // 包含poller的头文件定义了单线程poller的接口
#include "mpoller.h" // 包含多线程poller的头文件定义了mpoller的接口
// 外部声明的函数用于创建和销毁poller对象。
extern poller_t* __poller_create(void**, const struct poller_params*);
extern void __poller_destroy(poller_t*);
// 静态函数用于创建多线程poller对象。
static int __mpoller_create(const struct poller_params* params,
mpoller_t* mpoller)
{
void** nodes_buf = (void**)calloc(params->max_open_files, sizeof(void*)); // 分配内存用于保存poller对象
unsigned int i;
if (nodes_buf) // 如果内存分配成功
{
for (i = 0; i < mpoller->nthreads; i++) // 循环创建多个poller对象
{
mpoller->poller[i] = __poller_create(nodes_buf, params); // 创建单个poller
if (!mpoller->poller[i]) // 如果创建失败
break; // 退出循环
}
if (i == mpoller->nthreads) // 如果所有poller对象都创建成功
{
mpoller->nodes_buf = nodes_buf; // 保存nodes_buf指针
return 0; // 返回成功
}
while (i > 0) // 如果创建失败销毁已经创建的poller对象
__poller_destroy(mpoller->poller[--i]);
free(nodes_buf); // 释放nodes_buf内存
}
return -1; // 返回失败
}
// 函数用于创建多线程poller对象。
mpoller_t* mpoller_create(const struct poller_params* params, size_t nthreads)
{
mpoller_t* mpoller; // 声明mpoller对象指针
size_t size; // 声明size变量用于计算所需内存大小
if (nthreads == 0) // 如果线程数为0则默认为1
nthreads = 1;
size = offsetof(mpoller_t, poller) + nthreads * sizeof(void*); // 计算所需内存大小
mpoller = (mpoller_t*)malloc(size); // 分配内存
if (mpoller) // 如果内存分配成功
{
mpoller->nthreads = (unsigned int)nthreads; // 设置线程数
if (__mpoller_create(params, mpoller) >= 0) // 如果创建成功
return mpoller; // 返回mpoller对象
free(mpoller); // 如果创建失败,释放内存
}
return NULL; // 返回NULL
}
// 函数用于启动多线程poller对象。
int mpoller_start(mpoller_t* mpoller)
{
size_t i; // 循环变量
for (i = 0; i < mpoller->nthreads; i++) // 循环启动每个poller线程
{
if (poller_start(mpoller->poller[i]) < 0) // 如果启动失败
break; // 退出循环
}
if (i == mpoller->nthreads) // 如果所有线程都启动成功
return 0; // 返回成功
while (i > 0) // 如果启动失败,停止已经启动的线程
poller_stop(mpoller->poller[--i]);
return -1; // 返回失败
}
// 函数用于停止多线程poller对象。
void mpoller_stop(mpoller_t* mpoller)
{
size_t i; // 循环变量
for (i = 0; i < mpoller->nthreads; i++) // 循环停止每个poller线程
poller_stop(mpoller->poller[i]);
}
// 函数用于销毁多线程poller对象。
void mpoller_destroy(mpoller_t* mpoller)
{
size_t i; // 循环变量
for (i = 0; i < mpoller->nthreads; i++) // 循环销毁每个poller对象
__poller_destroy(mpoller->poller[i]);
free(mpoller->nodes_buf); // 释放nodes_buf内存
free(mpoller); // 释放mpoller对象内存
}

@ -0,0 +1,211 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
/*
* This message queue originates from the project of Sogou C++ Workflow:
* https://github.com/sogou/workflow
*
* The idea of this implementation is quite simple and obvious. When the
* get_list is not empty, the consumer takes a message. Otherwise the consumer
* waits till put_list is not empty, and swap two lists. This method performs
* well when the queue is very busy, and the number of consumers is big.
*/
#include <errno.h> // 包含错误号定义
#include <stdlib.h> // 包含标准库函数如malloc和free
#include <pthread.h> // 包含POSIX线程库
#include "msgqueue.h" // 包含消息队列的声明
// 消息队列结构体定义
struct __msgqueue
{
size_t msg_max; // 消息队列最大容量
size_t msg_cnt; // 当前消息数量
int linkoff; // 消息链接偏移量
int nonblock; // 是否为非阻塞模式
void* head1; // 头部指针
void* head2; // 另一个头部指针,用于优化
void** get_head; // 获取操作的头部指针
void** put_head; // 放入操作的头部指针
void** put_tail; // 放入操作的尾部指针
pthread_mutex_t get_mutex; // 获取操作的互斥锁
pthread_mutex_t put_mutex; // 放入操作的互斥锁
pthread_cond_t get_cond; // 获取操作的条件变量
pthread_cond_t put_cond; // 放入操作的条件变量
};
// 设置消息队列为非阻塞模式
void msgqueue_set_nonblock(msgqueue_t* queue)
{
queue->nonblock = 1; // 设置非阻塞模式
pthread_mutex_lock(&queue->put_mutex); // 锁定放入操作的互斥锁
pthread_cond_signal(&queue->get_cond); // 通知获取操作
pthread_cond_broadcast(&queue->put_cond); // 广播放入操作的条件变量
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
}
// 设置消息队列为阻塞模式
void msgqueue_set_block(msgqueue_t* queue)
{
queue->nonblock = 0; // 设置阻塞模式
}
// 将消息放入消息队列末尾
void msgqueue_put(void* msg, msgqueue_t* queue)
{
void** link = (void**)((char*)msg + queue->linkoff); // 获取消息的链接字段
*link = NULL; // 设置链接字段为空
pthread_mutex_lock(&queue->put_mutex); // 锁定放入操作的互斥锁
while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) // 等待队列有空位
pthread_cond_wait(&queue->put_cond, &queue->put_mutex); // 等待条件变量
*queue->put_tail = link; // 将消息链接到队列尾部
queue->put_tail = link; // 更新尾部指针
queue->msg_cnt++; // 增加消息计数
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
pthread_cond_signal(&queue->get_cond); // 通知获取操作
}
// 将消息放入消息队列头部
void msgqueue_put_head(void* msg, msgqueue_t* queue)
{
void** link = (void**)((char*)msg + queue->linkoff); // 获取消息的链接字段
pthread_mutex_lock(&queue->put_mutex); // 锁定放入操作的互斥锁
while (*queue->get_head) // 如果队列非空
{
if (pthread_mutex_trylock(&queue->get_mutex) == 0) // 尝试锁定获取操作的互斥锁
{
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
*link = *queue->get_head; // 将消息链接到队列头部
*queue->get_head = link;
pthread_mutex_unlock(&queue->get_mutex); // 解锁获取操作的互斥锁
return;
}
}
while (queue->msg_cnt > queue->msg_max - 1 && !queue->nonblock) // 等待队列有空位
pthread_cond_wait(&queue->put_cond, &queue->put_mutex); // 等待条件变量
*link = *queue->put_head; // 将消息链接到队列头部
if (*link == NULL) // 如果原头部为空,则更新尾部指针
queue->put_tail = link;
*queue->put_head = link; // 更新头部指针
queue->msg_cnt++; // 增加消息计数
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
pthread_cond_signal(&queue->get_cond); // 通知获取操作
}
// 内部函数,用于交换消息队列的头部和尾部,以便从队列中获取消息
static size_t __msgqueue_swap(msgqueue_t* queue)
{
void** get_head = queue->get_head;
size_t cnt;
pthread_mutex_lock(&queue->put_mutex); // 锁定放入操作的互斥锁
while (queue->msg_cnt == 0 && !queue->nonblock) // 如果队列为空且不是非阻塞模式,则等待
pthread_cond_wait(&queue->get_cond, &queue->put_mutex); // 等待获取操作的条件变量
cnt = queue->msg_cnt; // 获取当前消息数量
if (cnt > queue->msg_max - 1) // 如果消息数量超过最大值减一,则广播放入操作的条件变量
pthread_cond_broadcast(&queue->put_cond);
queue->get_head = queue->put_head; // 交换获取和放入的头部指针
queue->put_head = get_head; //
queue->put_tail = get_head; // 重置放入操作的尾部指针
queue->msg_cnt = 0; // 重置消息数量
pthread_mutex_unlock(&queue->put_mutex); // 解锁放入操作的互斥锁
return cnt; // 返回消息数量
}
// 从消息队列中获取消息
void* msgqueue_get(msgqueue_t* queue)
{
void* msg;
pthread_mutex_lock(&queue->get_mutex); // 锁定获取操作的互斥锁
if (*queue->get_head || __msgqueue_swap(queue) > 0) // 如果队列非空或交换后有消息
{
msg = (char*)*queue->get_head - queue->linkoff; // 获取消息
*queue->get_head = *(void**)*queue->get_head; // 更新头部指针
}
else
msg = NULL; // 如果队列为空则返回NULL
pthread_mutex_unlock(&queue->get_mutex); // 解锁获取操作的互斥锁
return msg; // 返回消息
}
// 创建消息队列
msgqueue_t* msgqueue_create(size_t maxlen, int linkoff)
{
msgqueue_t* queue = (msgqueue_t*)malloc(sizeof(msgqueue_t)); // 分配消息队列结构体内存
int ret;
if (!queue)
return NULL;
ret = pthread_mutex_init(&queue->get_mutex, NULL); // 初始化获取操作的互斥锁
if (ret == 0)
{
ret = pthread_mutex_init(&queue->put_mutex, NULL); // 初始化放入操作的互斥锁
if (ret == 0)
{
ret = pthread_cond_init(&queue->get_cond, NULL); // 初始化获取操作的条件变量
if (ret == 0)
{
ret = pthread_cond_init(&queue->put_cond, NULL); // 初始化放入操作的条件变量
if (ret == 0)
{
queue->msg_max = maxlen; // 设置最大消息数量
queue->linkoff = linkoff; // 设置链接偏移量
queue->head1 = NULL;
queue->head2 = NULL;
queue->get_head = &queue->head1; // 设置获取操作的头部指针
queue->put_head = &queue->head2; // 设置放入操作的头部指针
queue->put_tail = &queue->head2; // 设置放入操作的尾部指针
queue->msg_cnt = 0; // 初始化消息数量
queue->nonblock = 0; // 初始化非阻塞模式
return queue; // 返回消息队列
}
pthread_cond_destroy(&queue->get_cond); // 销毁获取操作的条件变量
}
pthread_mutex_destroy(&queue->put_mutex); // 销毁放入操作的互斥锁
}
pthread_mutex_destroy(&queue->get_mutex); // 销毁获取操作的互斥锁
}
errno = ret; // 设置错误号
free(queue); // 释放消息队列结构体内存
return NULL; // 返回NULL
}
// 销毁消息队列
void msgqueue_destroy(msgqueue_t* queue)
{
pthread_cond_destroy(&queue->put_cond); // 销毁放入操作的条件变量
pthread_cond_destroy(&queue->get_cond); // 销毁获取操作的条件变量
pthread_mutex_destroy(&queue->put_mutex); // 销毁放入操作的互斥锁
pthread_mutex_destroy(&queue->get_mutex); // 销毁获取操作的互斥锁
free(queue); // 释放消息队列结构体内存
}

@ -0,0 +1,318 @@
/*
Copyright (c) 2019 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Xie Han (xiehan@sogou-inc.com)
*/
#include <errno.h> // 包含错误号定义
#include <pthread.h> // 包含POSIX线程库
#include <stdlib.h> // 包含标准库函数如malloc和free
#include "msgqueue.h" // 包含消息队列的实现,用于线程间通信
#include "thrdpool.h" // 包含线程池的声明
// 线程池结构体定义
struct __thrdpool
{
msgqueue_t* msgqueue; // 消息队列,用于存放待执行的任务
size_t nthreads; // 线程池中的线程数量
size_t stacksize; // 线程栈的大小
pthread_t tid; // 线程ID
pthread_mutex_t mutex; // 互斥锁,用于保护线程池的共享数据
pthread_key_t key; // 线程局部存储的键
pthread_cond_t* terminate; // 条件变量,用于通知线程退出
};
// 任务条目结构体定义
struct __thrdpool_task_entry
{
void* link; // 链接到下一个任务条目
struct thrdpool_task task; // 任务结构体,包含任务函数和上下文
};
// 静态变量用于表示一个空的线程ID
static pthread_t __zero_tid;
// 线程退出时的回调函数
static void __thrdpool_exit_routine(void* context)
{
thrdpool_t* pool = (thrdpool_t*)context;
pthread_t tid;
// 锁定互斥锁
pthread_mutex_lock(&pool->mutex);
tid = pool->tid;
pool->tid = pthread_self();
if (--pool->nthreads == 0 && pool->terminate)
pthread_cond_signal(pool->terminate);
pthread_mutex_unlock(&pool->mutex);
// 如果不是零ID则等待线程结束
if (!pthread_equal(tid, __zero_tid))
pthread_join(tid, NULL);
pthread_exit(NULL);
}
// 线程池的线程函数
static void* __thrdpool_routine(void* arg)
{
thrdpool_t* pool = (thrdpool_t*)arg;
struct __thrdpool_task_entry* entry;
void (*task_routine)(void*);
void* task_context;
// 设置线程局部存储
pthread_setspecific(pool->key, pool);
while (!pool->terminate)
{
// 从消息队列中获取任务
entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue);
if (!entry)
break;
// 执行任务
task_routine = entry->task.routine;
task_context = entry->task.context;
free(entry);
task_routine(task_context);
// 如果线程池中的线程数量为0则线程池已被销毁
if (pool->nthreads == 0)
{
free(pool);
return NULL;
}
}
// 执行线程退出时的操作
__thrdpool_exit_routine(pool);
return NULL;
}
// 线程池销毁函数
static void __thrdpool_terminate(int in_pool, thrdpool_t* pool)
{
pthread_cond_t term = PTHREAD_COND_INITIALIZER;
// 锁定互斥锁
pthread_mutex_lock(&pool->mutex);
msgqueue_set_nonblock(pool->msgqueue);
pool->terminate = &term;
if (in_pool)
{
// 如果在线程池的线程中销毁线程池,则分离当前线程
pthread_detach(pthread_self());
pool->nthreads--;
}
// 等待所有线程结束
while (pool->nthreads > 0)
pthread_cond_wait(&term, &pool->mutex);
pthread_mutex_unlock(&pool->mutex);
// 如果不是零ID则等待线程结束
if (!pthread_equal(pool->tid, __zero_tid))
pthread_join(pool->tid, NULL);
}
// 线程池创建线程函数
static int __thrdpool_create_threads(size_t nthreads, thrdpool_t* pool)
{
pthread_attr_t attr;
pthread_t tid;
int ret;
// 初始化线程属性
ret = pthread_attr_init(&attr);
if (ret == 0)
{
if (pool->stacksize)
pthread_attr_setstacksize(&attr, pool->stacksize);
while (pool->nthreads < nthreads)
{
// 创建线程
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool);
if (ret == 0)
pool->nthreads++;
else
break;
}
pthread_attr_destroy(&attr);
if (pool->nthreads == nthreads)
return 0;
// 如果创建失败,则销毁线程池
__thrdpool_terminate(0, pool);
}
errno = ret;
return -1;
}
// 创建线程池
thrdpool_t* thrdpool_create(size_t nthreads, size_t stacksize)
{
thrdpool_t* pool;
int ret;
pool = (thrdpool_t*)malloc(sizeof(thrdpool_t)); // 分配线程池结构体内存
if (!pool)
return NULL;
pool->msgqueue = msgqueue_create(0, 0); // 创建消息队列
if (pool->msgqueue)
{
ret = pthread_mutex_init(&pool->mutex, NULL); // 初始化互斥锁
if (ret == 0)
{
ret = pthread_key_create(&pool->key, NULL); // 创建线程局部存储键
if (ret == 0)
{
pool->stacksize = stacksize; // 设置线程栈大小
pool->nthreads = 0; // 初始化线程数量
pool->tid = __zero_tid; // 初始化线程ID
pool->terminate = NULL; // 初始化终止条件
if (__thrdpool_create_threads(nthreads, pool) >= 0) // 创建线程
return pool;
pthread_key_delete(pool->key); // 删除线程局部存储键
}
pthread_mutex_destroy(&pool->mutex); // 销毁互斥锁
}
errno = ret; // 设置错误号
msgqueue_destroy(pool->msgqueue); // 销毁消息队列
}
free(pool); // 释放线程池结构体内存
return NULL;
}
// 内部函数,用于将任务调度到线程池
inline void __thrdpool_schedule(const struct thrdpool_task* task, void* buf,
thrdpool_t* pool);
void __thrdpool_schedule(const struct thrdpool_task* task, void* buf,
thrdpool_t* pool)
{
((struct __thrdpool_task_entry*)buf)->task = *task; // 设置任务
msgqueue_put(buf, pool->msgqueue); // 将任务放入消息队列
}
// 调度任务到线程池
int thrdpool_schedule(const struct thrdpool_task* task, thrdpool_t* pool)
{
void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // 分配任务条目内存
if (buf)
{
__thrdpool_schedule(task, buf, pool); // 调度任务
return 0;
}
return -1;
}
// 判断当前线程是否在线程池中
inline int thrdpool_in_pool(thrdpool_t* pool);
int thrdpool_in_pool(thrdpool_t* pool)
{
return pthread_getspecific(pool->key) == pool; // 通过线程局部存储判断
}
// 增加线程池中的线程数量
int thrdpool_increase(thrdpool_t* pool)
{
pthread_attr_t attr;
pthread_t tid;
int ret;
ret = pthread_attr_init(&attr); // 初始化线程属性
if (ret == 0)
{
if (pool->stacksize)
pthread_attr_setstacksize(&attr, pool->stacksize); // 设置线程栈大小
pthread_mutex_lock(&pool->mutex); // 锁定互斥锁
ret = pthread_create(&tid, &attr, __thrdpool_routine, pool); // 创建线程
if (ret == 0)
pool->nthreads++; // 增加线程数量
pthread_mutex_unlock(&pool->mutex); // 解锁互斥锁
pthread_attr_destroy(&attr); // 销毁线程属性
if (ret == 0)
return 0;
}
errno = ret; // 设置错误号
return -1;
}
// 减少线程池中的线程数量
int thrdpool_decrease(thrdpool_t* pool)
{
void* buf = malloc(sizeof(struct __thrdpool_task_entry)); // 分配任务条目内存
struct __thrdpool_task_entry* entry;
if (buf)
{
entry = (struct __thrdpool_task_entry*)buf;
entry->task.routine = __thrdpool_exit_routine; // 设置退出任务
entry->task.context = pool;
msgqueue_put_head(entry, pool->msgqueue); // 将退出任务放入消息队列头部
return 0;
}
return -1;
}
// 线程池中的线程退出
void thrdpool_exit(thrdpool_t* pool)
{
if (thrdpool_in_pool(pool)) // 如果当前线程在线程池中
__thrdpool_exit_routine(pool); // 执行退出操作
}
// 销毁线程池
void thrdpool_destroy(void (*pending)(const struct thrdpool_task*),
thrdpool_t* pool)
{
int in_pool = thrdpool_in_pool(pool); // 判断是否在线程池中
struct __thrdpool_task_entry* entry;
__thrdpool_terminate(in_pool, pool); // 终止线程池
while (1)
{
entry = (struct __thrdpool_task_entry*)msgqueue_get(pool->msgqueue); // 获取任务
if (!entry)
break;
if (pending && entry->task.routine != __thrdpool_exit_routine) // 如果有挂起任务处理函数
pending(&entry->task); // 处理挂起任务
free(entry); // 释放任务条目内存
}
pthread_key_delete(pool->key); // 删除线程局部存储键
pthread_mutex_destroy(&pool->mutex); // 销毁互斥锁
msgqueue_destroy(pool->msgqueue); // 销毁消息队列
if (!in_pool) // 如果不在线程池中,释放线程池结构体内存
free(pool);
}
Loading…
Cancel
Save