【线程池】的原理分析及源码(C语言版)

2023-12-14 13:53:54

线程池的原理分析及源码(C语言版)

centos8 连接失败 线程已满_张三和你一聊聊线程池
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

线程池

线程池是一种用于管理和复用线程的机制,通过线程池可以减少线程的创建和销毁次数,提高程序的性能和效率。线程池通常包含一个线程队列和一个任务队列,线程队列用于存储可用的线程,而任务队列用于存储待执行的任务。

下面是一个简单的线程池的原理分析和简单的C语言实现:

线程池的基本原理:

  1. 初始化: 创建一定数量的线程,并将它们放入线程队列中,初始化任务队列。

  2. 任务提交: 当有任务需要执行时,将任务添加到任务队列中。

  3. 线程执行: 线程池中的线程从任务队列中取出任务并执行。

  4. 线程复用: 执行完任务后,线程不销毁,而是继续等待新的任务。这样可以避免频繁地创建和销毁线程,提高效率。

  5. 线程阻塞: 如果任务队列为空,线程将被阻塞等待新的任务。

  6. 线程池销毁: 在程序结束时或者不再需要线程池时,销毁线程池,释放资源。

C语言实现:

下面是一个简单的C语言线程池实现,使用了pthread库:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

#define MAX_THREADS 5
#define MAX_TASKS 10

typedef struct {
    void (*function)(void*);  // 任务函数指针
    void *arg;  // 任务参数
} Task;

typedef struct {
    Task tasks[MAX_TASKS];  // 任务队列
    int front, rear;  // 队头和队尾
    pthread_mutex_t mutex;  // 互斥锁
    pthread_cond_t cond;  // 条件变量
} TaskQueue;

typedef struct {
    pthread_t threads[MAX_THREADS];  // 线程队列
    TaskQueue taskQueue;  // 任务队列
    int threadCount;  // 线程数量
} ThreadPool;

void initializeThreadPool(ThreadPool *pool, int threadCount);
void submitTask(ThreadPool *pool, void (*function)(void*), void *arg);
void executeTask(ThreadPool *pool);
void destroyThreadPool(ThreadPool *pool);

void* workerThread(void *arg) {
    ThreadPool *pool = (ThreadPool*)arg;
    while (1) {
        executeTask(pool);
    }
    return NULL;
}

void initializeThreadPool(ThreadPool *pool, int threadCount) {
    pool->threadCount = threadCount;
    pool->taskQueue.front = pool->taskQueue.rear = 0;
    pthread_mutex_init(&pool->taskQueue.mutex, NULL);
    pthread_cond_init(&pool->taskQueue.cond, NULL);

    for (int i = 0; i < threadCount; ++i) {
        pthread_create(&pool->threads[i], NULL, workerThread, pool);
    }
}

void submitTask(ThreadPool *pool, void (*function)(void*), void *arg) {
    pthread_mutex_lock(&pool->taskQueue.mutex);

    while ((pool->taskQueue.rear + 1) % MAX_TASKS == pool->taskQueue.front) {
        // 等待任务队列非满
        pthread_cond_wait(&pool->taskQueue.cond, &pool->taskQueue.mutex);
    }

    // 添加任务到任务队列
    pool->taskQueue.tasks[pool->taskQueue.rear].function = function;
    pool->taskQueue.tasks[pool->taskQueue.rear].arg = arg;
    pool->taskQueue.rear = (pool->taskQueue.rear + 1) % MAX_TASKS;

    // 通知线程有新任务
    pthread_cond_signal(&pool->taskQueue.cond);
    pthread_mutex_unlock(&pool->taskQueue.mutex);
}

void executeTask(ThreadPool *pool) {
    pthread_mutex_lock(&pool->taskQueue.mutex);

    while (pool->taskQueue.front == pool->taskQueue.rear) {
        // 等待任务队列非空
        pthread_cond_wait(&pool->taskQueue.cond, &pool->taskQueue.mutex);
    }

    // 取出任务并执行
    Task task = pool->taskQueue.tasks[pool->taskQueue.front];
    pool->taskQueue.front = (pool->taskQueue.front + 1) % MAX_TASKS;

    // 通知线程有新空位
    pthread_cond_signal(&pool->taskQueue.cond);
    pthread_mutex_unlock(&pool->taskQueue.mutex);

    // 执行任务
    task.function(task.arg);
}

void destroyThreadPool(ThreadPool *pool) {
    for (int i = 0; i < pool->threadCount; ++i) {
        pthread_cancel(pool->threads[i]);
    }

    pthread_mutex_destroy(&pool->taskQueue.mutex);
    pthread_cond_destroy(&pool->taskQueue.cond);
}

// 示例任务函数
void printHello(void *arg) {
    int *num = (int*)arg;
    printf("Hello from task %d\n", *num);
}

int main() {
    ThreadPool pool;
    initializeThreadPool(&pool, MAX_THREADS);

    int taskArgs[MAX_TASKS];
    for (int i = 0; i < MAX_TASKS; ++i) {
        taskArgs[i] = i;
        submitTask(&pool, printHello, &taskArgs[i]);
    }

    // 等待任务执行完成
    sleep(2);

    destroyThreadPool(&pool);

    return 0;
}

请注意,这只是一个简单的示例,实际的线程池实现可能需要更多的功能和错误处理。此外,在生产环境中,建议使用现有的线程池库,而不是自己编写线程池。(谨慎使用)

线程池库

有许多现有的库提供了线程池的实现,这些库在不同的编程语言中都有。以下是一些常用的线程池库:

  1. C语言:

    • POSIX Threads (pthread): 这是C语言中用于多线程编程的标准库,提供了线程创建、同步等功能。虽然不是专门的线程池库,但可以通过合理的设计使用pthread来实现线程池。
  2. C++语言:

    • C++ Standard Library (std::thread): C++11及以后的标准库提供了std::thread,也可以通过其他标准库组件来实现线程池。
    • Boost.Thread: Boost库提供了boost::thread,同样可以通过它来实现线程池。
  3. Java语言:

    • java.util.concurrent: Java标准库提供了Executor框架,包括ThreadPoolExecutor等实现类,用于管理线程池。
    • Guava Library: Google的Guava库提供了ListeningExecutorService等线程池相关的接口和实现。
  4. Python语言:

    • concurrent.futures: Python标准库中的concurrent.futures模块提供了线程池和进程池的实现,如ThreadPoolExecutorProcessPoolExecutor
    • ThreadPoolExecutor: 这是一个基于concurrent.futures的第三方库,提供了更多的功能。
  5. C#语言:

    • System.Threading.Tasks: .NET框架中的TaskThreadPool类提供了线程池的实现。
    • TPL (Task Parallel Library): TPL是.NET框架的一部分,提供了更高级的任务并行编程模型,包括线程池。
  6. JavaScript语言:

    • Web Workers: 在Web开发中,浏览器提供了Web Workers,可以通过它们实现类似线程池的功能。
  7. 其他语言:

    • 对于其他编程语言,通常都有类似的库或框架,提供了线程池或并发编程的支持。

这些库提供了高级抽象,简化了线程池的使用和管理,同时考虑了线程的复用、任务调度等问题。在选择库时,可以根据自己的项目需求和编程语言选择合适的库。

1.线程池工作原理

线程池的组成主要分为3个部分,这三部分配合工作就可以得到一个完整的线程池:

  1. 任务队列,存储需要处理的任务,由工作的线程来处理这些任务
  2. 工作的线程(任务队列任务的消费者),N个
  3. 管理者线程(不处理任务队列中的任务)
    • 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
    • 当任务过多的时候,可以适当地创建一些新的工作线程
    • 当任务过少的时候,可以适当地销毁一些工作的线程

2.定义线程池的结构

/**任务结构体 */
typedef struct Task
{
    void (*function)(void *arg); /// 泛型,兼容各种各样的数据类型
    void *arg;
} Task;

/** 线程池的结构体*/  
struct ThreadPool
{
    /** 任务队列*/
    Task *taskQ;
    int queueCapacity; // 容量
    int queueSize;     // 当前任务个数
    int queueFront;    // 队头
    int queueRear;     // 对尾
    pthread_t managerID;  // 管理者线程ID
    pthread_t *threadIDs; // 工作的线程ID
    int minNum;  // 最小的线程数
    int maxNum;  // 最大的线程数
    int busyNum; // 正在工作的线程
    int liveNum; // 存活的线程个数
    int exitNum; // 要销毁的线程个数
    pthread_mutex_t mutexPool; ///< 锁整个线程池
    pthread_mutex_t mutexBusy; ///< 锁busyNum变量
    pthread_cond_t notFull;    ///< 任务队列是不是满了
    pthread_cond_t notEmpty;   ///< 任务队列是不是空了
    int shutdown; // 是不是要销毁线程池,销毁为1,不销毁为0;
};

3.创建线程池实例

ThreadPool *threadpoolCreate(int min, int max, int queueSize)
{
    /// 初始化线程池
    ThreadPool *pool = (ThreadPool *)malloc(sizeof(ThreadPool));

    do
    {
        if (pool == NULL) {
            printf("malloc threadpool fail ...\n");
            break;
        }

        /// 初始化工作的线程
        pool->threadIDs = (pthread_t *)malloc(sizeof(pthread_t) * max);
        if (pool->threadIDs == NULL) {
            printf("malloc threadIDs fail ... \n");
            break;
        }

        // 初始化线程池的相关参数
        memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
        pool->minNum = min;
        pool->maxNum = max;
        pool->busyNum = 0;
        pool->liveNum = min;
        pool->exitNum = 0;

        // 初始化相关锁和条件变量
        if (pthread_mutex_init(&pool->mutexPool, NULL) != 0 ||
            pthread_mutex_init(&pool->mutexBusy, NULL) != 0 ||
            pthread_cond_init(&pool->notEmpty, NULL) != 0 ||
            pthread_cond_init(&pool->notFull, NULL) != 0)
        {
            printf("mutex or condition init fail...\n");
            break;
        }

        // 初始化任务队列
        pool->taskQ = (Task *)malloc(sizeof(Task) * queueSize);
        pool->queueCapacity = queueSize;
        pool->queueSize = 0;
        pool->queueFront = 0;
        pool->queueRear = 0;
        
        // 线程池不销毁
        pool->shutdown = 0;
        
        // 创建线程
        pthread_create(&pool->managerID, NULL, manager, pool);
        for (int i = 0; i < min; ++i) {
            pthread_create(&pool->threadIDs[i], NULL, worker, pool);
        }
        return pool;
    } while (0);
    
    /// 释放资源
    if (pool && pool->threadIDs) {
        free(pool->threadIDs);
    }
    if (pool && pool->TaskQ) {
        free(pool->TaskQ);
    }
    if (pool) {
        free(pool);
    }
    return NULL;
}

4.工作线程的函数

void *worker(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;

    while (1) {
        pthread_mutex_lock(&pool->mutexPool);
        // 当前任务队列是否为空
        while (pool->queueSize == 0 && !pool->shutdown) {
            // 阻塞工作线程
            pthread_cond_wait(&pool->notEmpty, &pool->mutexPool);

            // 判断是不是要销毁线程
            if (pool->exitNum > 0) {
                pool->exitNum--;
				if (pool->liveNum > pool->minNum) {
                    pool->liveNum--;
                    pthread_mutex_unlock(&pool->mutexPool);
                    threadExit(pool);
                }
            }
        }

        // 判断线程池是否被关闭了
        if (pool->shutdown) {
            pthread_mutex_unlock(&pool->mutexPool);
            threadExit(pool);
        }

        // 从任务队列中取出一个任务
        Task task;
        task.function = pool->taskQ[pool->queueFront].function;
        task.arg = pool->taskQ[pool->queueFront].arg;

        // 移动头结点
        pool->queueFront = (pool->queueFront + 1) % pool->queueCapacity;
        pool->queueSize--;

        // 解锁
        pthread_cond_signal(&pool->notFull);
        pthread_mutex_unlock(&pool->mutexPool);

        printf("thread %ld start working...\n", pthread_self());
        
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum++;
        pthread_mutex_unlock(&pool->mutexBusy);
        
        task.function(task.arg);
        free(task.arg);
        task.arg = NULL;
        printf("thread %ld end working...\n", pthread_self());
        
        pthread_mutex_lock(&pool->mutexBusy);
        pool->busyNum--;
        pthread_mutex_unlock(&pool->mutexBusy);
    }
    return NULL;
}

5.管理者线程的函数

void *manager(void *arg)
{
    ThreadPool *pool = (ThreadPool *)arg;
    while (!pool->shutdown) {
        // 每隔3s检测一次
        sleep(3);

        // 取出线程池中任务的数量和当前线程的数量
        pthread_mutex_lock(&pool->mutexPool);
        int queueSize = pool->queueSize;
        int liveNum = pool->liveNum;
        pthread_mutex_unlock(&pool->mutexPool);

        // 取出忙的线程的数量
        pthread_mutex_lock(&pool->mutexBusy);
        int busyNum = pool->busyNum;
        pthread_mutex_unlock(&pool->mutexBusy);

        // 添加线程
        // 任务的个数>存活的线程个数 && 存活的线程数<最大线程数
        if (queueSize > liveNum && liveNum < pool->maxNum) {
            pthread_mutex_lock(&pool->mutexPool);
            int counter = 0;
            
            /**                  最大线程数                 ?              存活的线程数    最大线程数  */
            for (int i = 0; i < pool->maxNum && counter < NUMBER && pool->liveNum < pool->maxNum; ++i) {
                if (pool->threadIDs[i] == 0) {
                    pthread_create(&pool->threadIDs[i], NULL, worker, pool);
                    counter++;
                    pool->liveNum++;
                }
            }
            pthread_mutex_unlock(&pool->mutexPool);
        }

        // 销毁线程
        // 忙的线程*2 < 存活的线程数 && 存活的线程>最小线程数
        if (busyNum * 2 < liveNum && liveNum > pool->minNum) {
            pthread_mutex_lock(&pool->mutexPool);
            pool->exitNum = NUMBER;
            pthread_mutex_unlock(&pool->mutexPool);
            
            // 让工作的线程自杀
            for (int i = 0; i < NUMBER; ++i) {
                pthread_cond_signal(&pool->notEmpty);
            }
        }
    }
    return NULL;
}

6.线程退出函数

void threadExit(ThreadPool *pool)
{
    pthread_t tid = pthread_self();
    
    for (int i = 0; i < pool->maxNum; ++i) {
        if (pool->threadIDs[i] == tid) {
            pool->threadIDs[i] = 0;
            printf("threadExit() called, %ld exiting...\n", tid);
            break;
        }
    }
    pthread_exit(NULL);
}

7.往线程池中添加任务

void threadPoolAdd(ThreadPool *pool, void (*func)(void *), void *arg)
{
    pthread_mutex_lock(&pool->mutexPool);
    
    while (pool->queueSize == pool->queueCapacity && !pool->shutdown) {
        // 阻塞生产者线程
        pthread_cond_wait(&pool->notFull, &pool->mutexPool);
    }
    if (pool->shutdown) {
        pthread_mutex_unlock(&pool->mutexPool);
        return;
    }

    // 添加任务
    pool->taskQ[pool->queueRear].function = func;
    pool->taskQ[pool->queueRear].arg = arg;
    pool->queueRear = (pool->queueRear + 1) % pool->queueCapacity;
    pool->queueSize++;

    pthread_cond_signal(&pool->notEmpty);
    pthread_mutex_unlock(&pool->mutexPool);
}

8.获取线程池中工作的线程和活着的线程数量

获取线程池中活着的线程数

int threadPoolAliveNum(ThreadPool *pool)
{
    pthread_mutex_lock(&pool->mutexPool);
    int aliveNum = pool->liveNum;
    pthread_mutex_unlock(&pool->mutexPool);
    
    return aliveNum;
}

获取线程池中工作的线程数

int threadPoolBusyNum(ThreadPool *pool)
{
    pthread_mutex_lock(&pool->mutexBusy);
    int busyNum = pool->busyNum;
    pthread_mutex_unlock(&pool->mutexBusy);
    
    return busyNum;
}

9.线程池的销毁

int threadPoolDestroy(ThreadPool *pool)
{
    if (pool == NULL) {
        return -1;
    }

    // 关闭线程池
    pool->shutdown = 1;

    // 阻塞回收管理者线程
    pthread_join(pool->managerID, NULL);

    // 唤醒阻塞的消费者线程
    for (int i = 0; i < pool->liveNum; ++i) {
        pthread_cond_signal(&pool->notEmpty);
    }

    // 释放堆内存
    if (pool->taskQ) {
        free(pool->taskQ);
    }
    if (pool->threadIDs) {
        free(pool->threadIDs);
    }

    pthread_mutex_destroy(&pool->mutexPool);
    pthread_mutex_destroy(&pool->mutexBusy);
    pthread_cond_destroy(&pool->notEmpty);
    pthread_cond_destroy(&pool->notFull);
    
    free(pool);
    pool = NULL;
    
    return 0;
}

10.使用测试

void taskFunc(void *arg)
{
    int num = *(int *)arg;
    printf("thread %ld is working, number = %d\n",
           pthread_self(), num);

    sleep(1);
}

int main()
{
    /** 创建线程池*/
    ThreadPool *pool = threadPoolCreate(3, 10, 100);
    for (int i = 0; i < 100; ++i) {
        int *num = (int *)malloc(sizeof(int));
        *num = i + 100;
        threadPoolAdd(pool, taskFunc, num);
    }
    sleep(30);
    threadPoolDestroy(pool);
    
    return 0;
}

文章来源:https://blog.csdn.net/m0_62140641/article/details/134919272
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。