Java 线程池的学习总结

2023-12-14 05:00:27

一、线程池的优点

(1)降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

(2)提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。

(3)提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
?

二、线程池的创建

1、线程池的 7 个参数:

①corePoolSize:核心线程数。

②maximumPoolSize:最大线程数。

③keepAliveTime:非核心线程的存活时间。

④unit:存活时间的单位。

⑤workQueue:当核心线程数已满,将新的线程存放在工作队列中,等待核心线程执行完毕。

⑥threadFactory:线程创建工厂。

⑦handler:当线程池已经饱和之后执行的拒绝策略。

2、工作队列的种类

2.1、LinkedBlockingQueue 构造方法分析

通过上述代码可以知道 LinkedBlockingQueue 默认是一个无界限的队列,所以当线程过多时就会导致 OOM(OUT OF MEMORY) 内存不足问题。

2.2、ArrayBlockingQueue 构造方法分析

ArrayBlockingQueue 是需要手动传入队列的长度,默认 ArrayBlockingQueue 的内置锁为非公平锁,也可通过传入 boolean 值来进行指定公平锁。

2.3、SynchronousQueue 构造方法分析

没有容量,直接提交队列,是无缓存等待队列,当任务提交进来,它总是马上将任务提交给线程去执行,如果线程池已满,则执行拒绝策略;所以使用 SynchronousQueue 阻塞队列一般要求maximumPoolSize为无界(无限大),避免线程拒绝执行操作,从源码中可以看到容量为0。

2.4、DelayQueue 延迟队列的源码分析

以下是 DelayQueue 延时获取线程的源码

//获取延时线程
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        /*为以下代码块上锁,此为一个阻塞式方法,
        会一直等待线程延时时间结束*/
        lock.lockInterruptibly();
        try {
            for (;;) {
                //死循环,一直获取队首线程
                E first = q.peek();
                if (first == null)
                    //线程等待
                    available.await();
                else {
                    //获取过期时间
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        //如果过期时间结束,返回队首线程,并将该线程出队
                        return q.poll();
                    first = null; // don't retain ref while waiting
                    //判断当前有无线程
                    if (leader != null)
                        //线程等待
                        available.await();
                    else {
                        Thread thisThread = Thread.currentThread();
                        //如果不存在,将当前线程指向 leader
                        leader = thisThread;
                        try {
                            /*在过期时间范围内等待,
                            直到接收到其他线程的通知、被中断或超时*/
                            available.awaitNanos(delay);
                        } finally {
                            if (leader == thisThread)
                                //最终将当前线程执行完置为空
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                /*如果当前线程为空并且队列队列首元素不为空,
                此时唤醒一个等待中的线程*/
                available.signal();
            //解锁
            lock.unlock();
        }
    }

DelayQueue 使用了?ReentrantLock?和?Condition?来实现线程的阻塞和唤醒。当调用?take()?方法时,如果队列为空或者队列中的第一个元素的延时时间还未到期,线程将进入阻塞状态,等待被唤醒。每当有一个元素的延时时间到期时,DelayQueue?会使用?Condition?的?signal()?方法唤醒一个等待中的线程。

3、handler 四种拒绝策略

3.1、DiscardPolicy(丢弃策略) 源码分析

在 rejectedExecution 方法中,DiscardPolicy 没有做任何操作,直接丢弃线程

3.2、AbortPolicy(中止策略) 源码分析

在 rejectedExecution 方法中,AbortPolicy 抛出了异常,但是也没有对线程操作,和 DiscardPolicy 一样都是直接丢弃,但是会抛出异常。

3.3、DiscardOldestPolicy(丢弃最老的策略) 源码分析

在 rejectedExecution 方法中先判断了当前线程池有没有执行 shutdown 方法,如果没有执行,则将线程池工作队列中的首元素出队列,将该线程直接加入到工作队列中。

3.4、CallerRunsPolicy(运行新的策略) 源码分析

在 rejectedExecution 方法中先判断线程池是否执行 shutdown 方法,如果未执行直接将该线程加入到主线程中,即在调用execute或者submit的方法中执行,不再使用线程池来处理此任务。

三、线程池的任务处理机制

以下是线程池处理任务的流程图:

1、首先,会先判断 corePoolSize 是否已满,如果还有空间则直接创建线程执行任务。

2、当 corePoolSize 已满,会将新的的任务放在 woreQueue 中,等待核心线程执行完,当一个线程完成任务会从队列中取下一个任务来执行;工作队列中的任务是由核心线程来执行的,而不是非核心线程。

3、当 workQueue 已满,会去判断 maximumPoolSize 是否已满,如果没有满,会尝试创建非核心线程去执行该任务。

4、当 maximumPoolSize 已满,则会执行 handler 拒绝策略。

5、如果线程池中线程数量大于 corePoolSize,则当空闲时间超过 keepAliveTime 的非核心线程,将会被终止,直到线程池数量不大于 corePoolSize 为止。核心线程不受 keepAliveTime 的影响,它们始终保持存活状态,不会被终止。

以下是线程池的任务执行的源码分析:

public void execute(Runnable command) {
        //判断当前执行任务是否为空,为空抛出异常
        if (command == null)
            throw new NullPointerException();
        //AtomicInteger ctl,原子类,用于记录线程池中的线程数
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            /*如果当前线程数小于核心线程数,直接创建线程执行任务,
            通过 addWorker 方法中的
            wc >= (core ? corePoolSize : maximumPoolSize)
            来判断是创建核心线程还是非核心线程*/
            if (addWorker(command, true))
                return;
            //如果创建失败,则重新获取当前线程数
            c = ctl.get();
        }
        //判断线程池是否运行,并且新线程成功加入到工作队列
        if (isRunning(c) && workQueue.offer(command)) {
            //获取当前的线程数
            int recheck = ctl.get();
            //如果线程池已经关闭,将工作队列中的任务移出
            if (! isRunning(recheck) && remove(command))
                //执行 handler 拒绝策略
                reject(command);
            else if (workerCountOf(recheck) == 0)
                //如果当前运行线程数为0,则不在创建线程,传 null 值
                addWorker(null, false);
        }
        //当工作队列已满尝试创建新的线程,传入 false,创建非核心线程
        else if (!addWorker(command, false))
            //如果创建失败,则执行相应的拒绝策略
            reject(command);
    }

线程池是在使用?executor()?方法时就已经开始执行任务了,而不是等到所有线程都创建完毕后再开始执行。

线程池的原理是将需要执行的任务提交给线程池,线程池会将这些任务分发给线程来执行。在任务提交后,线程池会立即开始执行任务,并根据线程池的配置情况来管理和调度线程的执行。

四、Java 中 Executors 的常用线程池

1、FixedThreadPool (固定大小的线程池) 源码分析

创建该线程池需要传入线程池的大小,从方法创建出来的的线程池可以看出该线程池的 corePoolSize 和 maximumPoolSize 的值一致,所以非核心线程的存活时间为 0 ,工作队列的类型为 LinkedBlockingQueue,所以当队列中任务过多会导致 OOM(Out Of Memory) 问题,因为 LinkedingBlockingQueue 队列是无界的。

2、SingleThreadExecutor(单线程线程池) 源码分析

单线程的线程池的核心线程数和最大线程数都为 1,因为每次只会创建一个线程来完成任务,所以非核心线程的存活时间为 0,工作队列使用的是 LinkedBlockingQueue,所以当任务过多,回导致工作队列存放很多任务,出现 OOM 的问题。

3、CachedThreadPool (缓存线程池) 源码分析

CachedThreadPool 是一个可以缓存的线程池,它的 corePoolSize 为 0,这意味着任务到达就直接放入工作队列中,工作队列使用的是 SynchronousQueue,既无缓存队列,线程到达就直接创建一个非核心线程立即执行任务,keepAliveTime 为一分钟,maximumPoolSize 为无界。

可能出现的问题:由于工作队列为 SynchronousQueue ,所以这个线程池对任务来着不拒,线程不够用就创建一个, 如果同一时刻应用的来了大量的任务, 很容易就创建过多的线程, 就容易导致应用卡顿或者直接 OOM。

4、ScheduledThreadPool (定时任务线程池) 源码分析

ScheduledThreadPool 是用来存放定时任务的线程池,创建方法的参数为 corePoolSize,它的maximumPoolSize 无界限,工作队列使用的是 DelayBlockingQueue,加入任务的时候,会把任务和定时时间构建一个RunnableScheduledFuture对象,再把这个对象放入DelayedWorkQueue队列中,DelayedWorkQueue是一个有序队列, 他会根据内部的RunnableScheduledFuture的运行时间排序内部对象。任务加入后就会启动一个线程。 这个线程会从DelayedWorkQueue中获取一个任务。

可能出现的问题:最大线程数和任务队列没有上限,可能发生前一次定时任务还没有完成, 后一个定时任务的运行时间到了, 它也会运行, 线程不够就创建,如果定时任务运行的时间过长, 就会导致前后两个定时任务同时执行,但如果他们之间有锁,就可能出现死锁。

五、如何判断线程池中的任务都已经完成

public static void executorsTest(ExecutorService executorService) {
        try {
            int length = 10;
            // 创建一个计数器,大小和执行任务的个数一致。
            CountDownLatch countDownLatch = new CountDownLatch(length);

            for (int i = 0; i < length; i++) {
                Runnable runnable = new Runnable() {
                    @Override
                    public void run() {
                        try {
                            System.out.println("当前线程:" + Thread.currentThread().getName() + "正在运行");
                            Thread.sleep(1000);
                            //执行一个任务,计数器 -1
                            countDownLatch.countDown();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                };
                executorService.execute(runnable);
            }
            //关闭线程池,会等待未完成的任务完成才会关闭。
            executorService.shutdown();
            //判断线程池是否已经调用 shutdown 方法
            if (executorService.isShutdown()) {
                System.out.println("shutdown方法已经被调用");
            }
            /**
             * 如何在线程任务都执行完毕之后做出打印操作:
             * 1、使用 isTerminated 方法俩判断线程池是否完全终止
             * 2、使用 awaitTermination 方法
             * 3、通过创建 CountDownLatch 计数器来实现
             *    等待一段时间,以确保所有任务都已经完成。
             */

            //第一种:使用 isTerminated 方法来判断
            /**
             * 如果没有添加 while 循环,那么在执行判断之前,
             * 线程池可能还没完全终止,还有任务在执行。
             * 因此,判断结果可能为 false,导致无法打印。
             *
             * 通过添加 while 循环,程序会一直等待线程池的终止,
             * 直到线程池中的所有任务都完成。
             * 这样,在执行判断时,线程池已经完全终止,判断结果为 true,从而进行打印。
             *
             * 需要注意的是,这种方式会阻塞线程池中的任务全部主线程,直到完成。
             * 如果任务执行时间较长,可能会导致程序长时间等待。
             * 因此,需要根据实际情况来决定是否使用 while 循环等待线程池的终止。
             */
            //isTerminated 方法用于判断线程池是否完全终止,
            // 如果没有终止就一直 cas 自旋等待
            while (!executorService.isTerminated()) {

            }
            //如果没有加 while 语句阻塞就会一直判断为false
            if (executorService.isTerminated()) {
                System.out.println("所有线程都已执行完毕");
            }

           /* 第二种:使用 awaitTermination 方法
            设定指定时间,如果在这个时间范围线程池的任务都已执行完毕,结果为true,否则为false*/
            if (executorService.awaitTermination(10, TimeUnit.SECONDS)) {
                System.out.println("所有的线程都已执行完毕");
            } else {
                System.out.println("时间结束,还有线程未执行完毕");
            }


           /* 第三种,使用计数器来记录线程的执行情况。
            阻塞等待,当计数器的值为 0 时,才会放行*/
            countDownLatch.await();
            System.out.println("所有线程都已执行完毕");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

六、线程池的常用方法总结

1、execute(Runnable command):用于向线程池提交一个Runnable任务以异步执行。

2、submit(Callable<T> task):用于向线程池提交一个Callable任务以异步执行,并返回一个表示任务待处理结果的Future对象。

3、shutdown():用于平缓关闭线程池,不再接受新的任务,但会等待已提交的任务执行完成。

4、shutdownNow():用于立即关闭线程池,并尝试中断正在执行的任务。

5、awaitTermination(long timeout, TimeUnit unit):用于等待线程池中所有的任务执行完毕,或者等待超时。

6、invokeAll(Collection<? extends Callable<T>> tasks):用于执行给定的任务集合,并返回表示任务待处理结果的Future列表。

7、invokeAny(Collection<? extends Callable<T>> tasks):用于执行给定的任务集合中的一个任务,返回首个成功执行的任务的结果,并取消其他任务。

8、isShutdown():用于判断线程池是否已经调用了shutdown()方法。

9、isTerminated():用于判断线程池是否已经完全终止。

文章来源:https://blog.csdn.net/qq_58417838/article/details/134952669
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。