一步一步写线程之四简单线程池

2024-01-07 17:22:01

一、线程池

说起线程池,只要写过几年程序的,基本上各种语言开发的人都听说过。可能小白或者初级程序员觉得这玩意儿很高级,很高大上。其实也没什么。之所以让开发者觉得如此,主要还在于线程池一般在开发者的背后(框架、库甚至是语言本身等)工作,比如Java有专门封装好的线程池;其次是用得少,它不像线程没事儿就可以开一个用一下;最后就是一个高效安全的线程池其中对线程调度、管理确实有极强的要求。
线程池,Thread Pool,按字面理解就可以,它是一个放线程的池子。它就像一池子水,可以打开多个水管放水来用,也可以随时停止让人使用。而放出来的水在使用完成后又可以回到池子重复使用。当需要水多时,多开几个水管放水,反之,少开几个,线程也是这样。
线程池其实是一种对线程应用的模式,为了提高线程的利用率和应对多任务对线程处理要求,它维护着N个线程,通过任务管理者来获取任务并发执行。
线程池有几个特点:
1、可伸缩
这个非常重要,一个好的线程池一定是可以根据任务的数量和并发要求动态调整线程的数量的。这也是伸缩的意思,线程的数量可多可少,看实际需求。
2、良好的适应性
也可以叫做扩展性,即在不同的需求和架构体系下,仍然可以安全高效运行。比如对CPU核心的亲和性,对多CPU或者多核的情况。
3、平台适应性
即在不同的平台(Windows、Linux甚至嵌入式等)都能够良好的运行。当然,这个特点不是重点了,毕竟往往聚集在某个平台上的效果会更好。
4、良好的接口
这个非常重要,它直接影响应用开发者的使用。所以对外接口要尽量简单、方便而且容易理解。

二、线程池的应用

线程池出现的目的就是为了解决一种应用场景,即对线程使用数量和次数比较频繁,但使用时间相对较短。这样,就会有一个问题,开发者在代码中会不断的创建和回收线程。学过操作系统的都知道线程的创建和销毁是一个非常耗费时间和资源的过程(对人来说可能很快,但对CPU来说,非常慢),而且一不小心就会导致资源或者内存泄露,甚至是直接崩溃。所以在程序运行中,不要随意进行线程的创建和销毁。
正是因为线程的创建和销毁有着这种复杂性,所以就可以创建一个线程池,一直在程序运行的整个生命周期运行着。那么,创建一个线程池,初始的大小怎么设置呢?这时估计很多同学会大声说,So Easy,CPU(或核心)数量*2+1(这个加1可选)。嗯,这么说,某种程度上是没有问题的,但如果是从一种应用说明的角度来说,就是不准确的。
回到这个系列的开始,经常提到的就是场景。对,其实数量的设置,和你的场景有关系,如IO密集性、CPU密集型这种整体上的场景,还有在实际应用中线程的应用情况,比如会不会有线程会被较长时间的阻塞,而这种情况下,线程的数量等于是自动减少了。所以,就需要根据实际的情况来确定线程池数量的创建。
但是一般的经验是CPU密集型线程池,以CPU数量+1为好,而IO密集型以上面的回答为好,但这都不是绝对的,可以根据实际情况再做完善。
线程的创建是有上限的,而且线程创建到一定数量后,线程间调度的开销也是一种很大资源耗费。所以一般情况下,线程池中,总的线程的创建数量尽量控制不要突破百的量级(正常的服务器开发)。如果有一台服务器配置非常好,比如有两千个核心,那就另说了。这句话的意思是,所以有的设置都是有前提条件的,也就是应用场景的变化,就会导致线程池的使用的变化,一切从实际出发。
线程的实现有很多种方法,有传统的C风格的,也有C++11风格的。它的实现方法也有很多,因人而异,因情况而异,有用数组管理,有抽象成类的都可以。线程池的具体的应用场景一般有如下几个:
1、较多的定时任务
即会在不特定多数的时间内不断的启动线程来执行一些任务,而执行的时间可能又相对较短,那么就可以创建一个线程池,把任务推送到队列中来定时处理。
2、并发数据处理
即可以将多种数据任务推送到线程池的队列,然后执行完成一个后自动调度到下一个任务。
3、IO密集型
这种场景是前面提到过的,最典型的就是网络通信,高并发,短时间通信。比如HTTP的服务端,IM的服务端等等。
基于此,线程池的优缺点就可以总结出来的:
优点:
1、降低资源消耗,提高线程的响应速度。不需要线程创建和销毁,直接使用线程。
2、方便使用,降低开发和维护成本。
缺点:
1、开发较为困难,特别是开发一个较好的线程池。
2、初始占用资源较多。程序一启动就N个线程跑起来,占用了很多资源。
3、对大多数场景作用不大。上面也提到过,能使用的线程池的场景相对整体应用来说并不多。

三、简单线程池的实现

一个线程池主要有以下几个部分组成:
1、Worker线程
这个最好理解,线程池中真正执行任务的线程
2、Task队列
这个也好理解,工作线程需要从这个队列中获取任务
3、线程管理器
负责任务管理调度和线程管理调度,比如任务的入队,线程的启动等等
4、任务接口
这个就有点抽象,其实可以理解如何更好的将task中的任务提供给工作线程的一种接口化处理。
5、外部接口
这个就更好理解了,如何提供上层应用者的调用接口。

四、一个简单的线程池

#include <iostream>
#include <vector>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <future>
//#include <boost/thread.hpp>

class ThreadPool {
public:
    explicit ThreadPool(size_t num) : stop(false), threadNum_(num)
    {
        this->workerThreads();
    }

    //注意,c++14以前返回值需要使用拖尾类型
    template<typename F>
    std::future<typename std::result_of<F()>::type> submit(F&& func)
    {
        //和decltype(f(args...))一致,在c++17后使用std::invoke_result,调用函数使用std::invoke
        using return_type = typename std::result_of<F()>::type;

        auto task = std::make_shared<std::packaged_task<return_type()>>(std::forward<F>(func));
        std::future<return_type> result = task->get_future();

        {
            std::unique_lock<std::mutex> lock(qMutex_);

            if (stop)
            {
                throw std::runtime_error("ThreadPool is stopped");
            }

            //c++17后可直接使用[task = std::move(task)]
            //tasksQueue_.emplace(std::move(task));
            //C++17前
            tasksQueue_.emplace([task]() { (*task)(); });
        }

        cv_.notify_one();

        return result;
    }

    ~ThreadPool()
    {
        {
            std::unique_lock<std::mutex> lock(qMutex_);
            stop = true;
        }

        cv_.notify_all();

        for (auto& thread : vecThreads_)
        {
            thread.join();
        }
    }
private:
    void workerThreads() {
        for (int threadcount = 0; threadcount < this->threadNum_; threadcount++)
        {
            static int count = 0;
            std::cout << "create thread is:" <<++count<< std::endl;
            this->vecThreads_.emplace_back([this] {

                while (true) {
                    std::function<void()> task;

                    {
                        std::unique_lock<std::mutex> lock(qMutex_);

                        cv_.wait(lock, [this] { return !tasksQueue_.empty() || stop; });

                        if (!tasksQueue_.empty()) {
                            task = std::move(tasksQueue_.front());
                            tasksQueue_.pop();
                        }
                        else if (stop)
                        {
                            break; //中断并退出
                        }
                    }

                    if (task != nullptr)
                    {
                        task();
                    }
                }
                });
        }
    }

    bool stop;
    size_t threadNum_;
    std::vector<std::thread> vecThreads_;
    std::queue<std::function<void()>> tasksQueue_;
    std::mutex qMutex_;
    std::condition_variable cv_;
};

constexpr int poolsize = 10;
constexpr int threadtasknum = 20;

int main() {
    ThreadPool pool(poolsize);

    for (int i = 0; i < threadtasknum; ++i)
    {
        int id = i + 1;
         auto ret = pool.submit([id]()->int
            {
            std::cout << "Task id:" << id << ", Get cur thread id:"<<std::this_thread::get_id() << std::endl;
            return id;
            }).get();

            std::cout << "thread return value is:" <<ret<< std::endl;
    }

    return 0;
}

运行结果:

create thread is:1
create thread is:2
create thread is:3
create thread is:4
create thread is:5
create thread is:6
create thread is:7
create thread is:8
create thread is:9
create thread is:10
Task id:1, Get cur thread id:1708
thread return value is:1
Task id:2, Get cur thread id:1176
thread return value is:2
Task id:3, Get cur thread id:39628
thread return value is:3
Task id:4, Get cur thread id:8120
thread return value is:4
Task id:5, Get cur thread id:39728
thread return value is:5
Task id:6, Get cur thread id:4340
thread return value is:6
Task id:7, Get cur thread id:31900
thread return value is:7
Task id:8, Get cur thread id:31804
thread return value is:8
Task id:9, Get cur thread id:7908
thread return value is:9
Task id:10, Get cur thread id:38240
thread return value is:10
Task id:11, Get cur thread id:1708
thread return value is:11
Task id:12, Get cur thread id:1176
thread return value is:12
Task id:13, Get cur thread id:39628
thread return value is:13
Task id:14, Get cur thread id:8120
thread return value is:14
Task id:15, Get cur thread id:39728
thread return value is:15
Task id:16, Get cur thread id:4340
thread return value is:16
Task id:17, Get cur thread id:31900
thread return value is:17
Task id:18, Get cur thread id:31804
thread return value is:18
Task id:19, Get cur thread id:7908
thread return value is:19
Task id:20, Get cur thread id:38240
thread return value is:20

在运行结果中可以看到线程池中的线程创建了10个,每个被使用了两次,符合预期。同时,线程运行的结果,也正确的返回了,达到预期。
在这个入门的线程池类中,可以看到:
1、在ThreadPool的构造函数中通过调用workerThreads增加线程(应用接口)
2、在workerThreads函数中,通过Lambda表达式自动插入到线程管理的容器vecThreads_(管理线程)
3、通过submit模板函数来实现对task任务的提交(应用接口,工作队列,任务接口)
4、在workerThreads函数中,通过循环阻塞来判断是否有任务提交并执行之(工作线程)
5、管理者ThreadPool通过条件变量通知线程启动任务执行(管理者分配任务)
6、通过packaged_task的调用来得到返回值
基本上网上的线程池的类型大抵和这个都差不多,可能由于c++版本的不同,会有一些细节的差异,只要明白就可以了。

通过上面这个简单的线程池,可以了解并知道开发一个线程池,一般会有以下几种情况需要开发者考虑:
1、线程池的初始化状态,即空闲状态
2、线程池中的工作任务小于线程数量,任务队列空闲
3、线程池工作线程全负载,但任务队列未满
4、线程池中工作线程全负载,任务队列满。此时需要根据情况决定是否启动线程的批量创建或者任务队列的扩容。
5、第4种情况消失后,在创建新线程的情况下,如何根据实际情况决定是否回收线程池中的部分线程
6、线程池的销毁
另外需要注意的是,线程池未必就比单线程或者普通多线程更高效,这个还是看场景和实际的线程的数量,仍然是那句话:实践出真知。

五、总结

写线程池不要着急,要把基础打好,然后多看看网上各路大神对线程池的看法和他们的线程池代码。当然,各种框架中的线程池的代码更有参考作用。多看多写,多探寻原因究竟,更要多引入新技术新知识。这样,线程池自然会写好。写好代码后,还要不断的回头完善和修改自己写的代码,这样,最终就会写出一个不错的线程池。
与诸君共勉!

文章来源:https://blog.csdn.net/fpcc/article/details/135438654
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。