线程池:一个锁,两个条件变量,循环队列
2023-12-26 19:46:12
为什么要有线程池?
????????以网络编程服务器端为例,作为服务器端支持高并发,可以有多个客户端连接,发出请求,对于多个请求我们每次都去建立线程,这样线程会创建很多,而且线程执行完销毁也会有很大的系统开销,使用上效率很低。
??????线程池是一个抽象概念,可以简单的认为若干线程在一起运行,线程不退出,等待有任务处理。线程池,事先创建几个线程,不停取任务,如果没有任务休眠,省去了不停的创建线程销毁销毁线程的事件和资源。
线程设计思想:一个锁,两个条件变量,循环队列
????????我们也知道创建线程并非多多益善,所以我们的思路是提前创建好若干个线程,不退出,等待任务的产生,去接收任务处理后等待下一个任务。
线程们通过生产者和消费者模型去协调接收任务并且处理。
线程池上的线程利用回调机制,我们同样可以借助结构体的方式,对任务进行封装,比如任务的数据和任务处理回调都封装在结构体上,这样线程池的工作线程拿到任务的同时,也知道该如何执行了。
????????注意:?线程池处理的任务, 所需要处理的时间必须很短
- 1、任务队列需要上锁
????????线程创建后会并发地取任务,有可能造成多个线程争夺一个任务。所以我们需要给任务队列上锁,同一时间只允许一个线程或许任务。(生产者-消费者模式)
- 2、队列为空需要条件变量
? ? ? ? 当任务队列为空时,线程们取不到任务,会频繁访问队列锁(开锁后,没任务,只能退出解锁),浪费大量系统资源。所以需要一个队列为空的条件变量,当队列为空时,自动阻塞一个线程,直到有任务添加到队列时为止?。
- 3、队列为满需要条件变量
? ? ? ? 当任务队列为满时,线程池已经无法接纳新的任务,则需要通知添加任务的线程阻塞等待。
代码:
typedef struct _PoolTask
{
int tasknum;//模拟任务编号
void *arg;//回调函数参数
void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;
typedef struct _ThreadPool
{
int max_job_num;//最大任务个数
int job_num;//实际任务个数
PoolTask *tasks;//任务队列数组
int job_push;//入队位置
int job_pop;// 出队位置
int thr_num;//线程池内线程个数
pthread_t *threads;//线程池内线程数组
int shutdown;//是否关闭线程池
pthread_mutex_t pool_lock;//线程池的锁
pthread_cond_t empty_task;//任务队列为空的条件,(任务队列可以容纳任务)
pthread_cond_t not_empty_task;//任务队列不为空的条件,(任务队列有任务,线程可以提取)
}ThreadPool;
//简易版线程池
#include "threadpoolsimple.h"
ThreadPool *thrPool = NULL;
int beginnum = 1000;//模拟任务编号,初始编号1000
void *thrRun(void *arg)
{
//printf("begin call %s-----\n",__FUNCTION__);
ThreadPool *pool = (ThreadPool*)arg;
int taskpos = 0;//任务位置
PoolTask *task = (PoolTask *)malloc(sizeof(PoolTask));
while(1)
{
//获取任务,先要尝试加锁
pthread_mutex_lock(&thrPool->pool_lock);
//无任务并且线程池不是要摧毁
while(thrPool->job_num <= 0 && !thrPool->shutdown )
{
//如果没有任务,线程会阻塞
pthread_cond_wait(&thrPool->not_empty_task,&thrPool->pool_lock);
}
if(thrPool->job_num)
{
//有任务需要处理
taskpos = (thrPool->job_pop++)%thrPool->max_job_num;
//printf("task out %d...tasknum===%d tid=%lu\n",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self());
//为什么要拷贝?避免任务被修改,生产者会添加任务(循环队列,后来任务可能会覆盖原来位置)
memcpy(task,&thrPool->tasks[taskpos],sizeof(PoolTask));
task->arg = task;
thrPool->job_num--;
//task = &thrPool->tasks[taskpos];不能使用任务队列空间(循环队列,后来任务可能会覆盖原来位置)
pthread_cond_signal(&thrPool->empty_task);//通知生产者
}
if(thrPool->shutdown)
{
//代表要摧毁线程池,此时线程退出即可
//pthread_detach(pthread_self());//临死前分家
pthread_mutex_unlock(&thrPool->pool_lock);
free(task);
pthread_exit(NULL);
}
//释放锁
pthread_mutex_unlock(&thrPool->pool_lock);
task->task_func(task->arg);//执行回调函数
}
//printf("end call %s-----\n",__FUNCTION__);
}
//创建线程池
/*typedef struct _ThreadPool
{
int max_job_num;//最大任务个数
int job_num;//实际任务个数
PoolTask *tasks;//任务队列数组
int job_push;//入队位置
int job_pop;// 出队位置
int thr_num;//线程池内线程个数
pthread_t *threads;//线程池内线程数组
int shutdown;//是否关闭线程池
pthread_mutex_t pool_lock;//线程池的锁
pthread_cond_t empty_task;//任务队列为空的条件
pthread_cond_t not_empty_task;//任务队列不为空的条件
}ThreadPool;
*/
void create_threadpool(int thrnum,int maxtasknum)
{
printf("begin call %s-----\n",__FUNCTION__);
thrPool = (ThreadPool*)malloc(sizeof(ThreadPool));
thrPool->thr_num = thrnum;
thrPool->max_job_num = maxtasknum;
thrPool->shutdown = 0;//是否摧毁线程池,1代表摧毁
thrPool->job_push = 0;//任务队列添加的位置
thrPool->job_pop = 0;//任务队列出队的位置
thrPool->job_num = 0;//初始化的任务个数为0
thrPool->tasks = (PoolTask*)malloc((sizeof(PoolTask)*maxtasknum));//申请最大的任务队列
//初始化锁和条件变量
pthread_mutex_init(&thrPool->pool_lock,NULL);
pthread_cond_init(&thrPool->empty_task,NULL);
pthread_cond_init(&thrPool->not_empty_task,NULL);
int i = 0;
thrPool->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
for(i = 0;i < thrnum;i++)
{
pthread_create(&thrPool->threads[i],&attr,thrRun,(void*)thrPool);//创建多个线程
}
//printf("end call %s-----\n",__FUNCTION__);
}
//摧毁线程池
void destroy_threadpool(ThreadPool *pool)
{
pool->shutdown = 1;//开始自爆
pthread_cond_broadcast(&pool->not_empty_task);//诱杀线程
int i = 0;
for(i = 0; i < pool->thr_num ; i++)
{
pthread_join(pool->threads[i],NULL);
}
pthread_cond_destroy(&pool->not_empty_task);
pthread_cond_destroy(&pool->empty_task);
pthread_mutex_destroy(&pool->pool_lock);
free(pool->tasks);
free(pool->threads);
free(pool);
}
//添加任务到线程池
void addtask(ThreadPool *pool)
{
//printf("begin call %s-----\n",__FUNCTION__);
pthread_mutex_lock(&pool->pool_lock);
//实际任务总数大于最大任务个数则阻塞等待(等待任务被处理),池里没有线程了
while(pool->max_job_num <= pool->job_num)
{
pthread_cond_wait(&pool->empty_task,&pool->pool_lock);
}
int taskpos = (pool->job_push++)%pool->max_job_num; //任务队列使用循环队列
//printf("add task %d tasknum===%d\n",taskpos,beginnum);
pool->tasks[taskpos].tasknum = beginnum//模拟任务编号
pool->tasks[taskpos].arg = (void*)&pool->tasks[taskpos];
pool->tasks[taskpos].task_func = taskRun;
pool->job_num++;
pthread_mutex_unlock(&pool->pool_lock);
pthread_cond_signal(&pool->not_empty_task);//通知包身工
//printf("end call %s-----\n",__FUNCTION__);
}
/*typedef struct _PoolTask
{
int tasknum;//模拟任务编号
void *arg;//回调函数参数
void (*task_func)(void *arg);//任务的回调函数
int fd;
int epfd;
struct epoll_event *evs;
}PoolTask ;
*/
//任务回调函数
void taskRun(void *arg)
{
PoolTask *task = (PoolTask*)arg;
int num = task->tasknum;
printf("task %d is runing %lu\n",num,pthread_self());
sleep(1);
printf("task %d is done %lu\n",num,pthread_self());
}
int main()
{
create_threadpool(3,20);
int i = 0;
for(i = 0;i < 50 ; i++)
{
addtask(thrPool);//模拟添加任务
}
sleep(20);
destroy_threadpool(thrPool);
return 0;
}
#ifndef _THREADPOOL_H
#define _THREADPOOL_H
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#include <pthread.h>
#include "sys/epoll.h"
#include "wrap.h"
typedef struct _PoolTask
{
int tasknum;//模拟任务编号
void *arg;//回调函数参数
void (*task_func)(void *arg);//任务的回调函数
}PoolTask ;
typedef struct _ThreadPool
{
int max_job_num;//最大任务个数
int job_num;//实际任务个数
PoolTask *tasks;//任务队列数组
int job_push;//入队位置
int job_pop;// 出队位置
int thr_num;//线程池内线程个数
pthread_t *threads;//线程池内线程数组
int shutdown;//是否关闭线程池
pthread_mutex_t pool_lock;//线程池的锁
pthread_cond_t empty_task;//任务队列为空的条件,(任务队列可以容纳任务)
pthread_cond_t not_empty_task;//任务队列不为空的条件,(任务队列有任务,线程可以提取)
}ThreadPool;
void create_threadpool(int thrnum,int maxtasknum);//创建线程池--thrnum 代表线程个数,maxtasknum 最大任务个数
void destroy_threadpool(ThreadPool *pool);//摧毁线程池
void addtask(ThreadPool *pool);//添加任务到线程池
void taskRun(void *arg);//任务回调函数
#endif
优化:可以依据任务队列中任务数量,灵活开辟和释放线程池线程。
文章来源:https://blog.csdn.net/Scl_Diligent/article/details/135225828
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!