ThreadPoolExecutor 方法详解

2023-12-13 04:20:56

目录

前言

1.方法 shutdown() 和 shutdownNow()

2.方法 List shutdownNow() 返回值的作用

3.方法?shutdown() 和 shutdownNow() 与中断

4.方法 isShutdown()?

5.方法 isTerminating() 和 isTerminated()

6.方法 awaitTermination(long timeout,TimeUnit unit)?

7.工厂 ThreadFactory + Thread + UncaughtExceptionHandler 处理异常?

8.方法 set/getRejectedExcutionHandler()

9.线程池的拒绝策略?

10.方法 afterExecute() 和 beforeExecute()

11.其他方法介绍

总结


前言

上一篇文章中,对线程池的创建,各种参数进行了详细的分析,接下来对其他的常用方法和异常处理展开介绍。


1.方法 shutdown() 和 shutdownNow()

public void shutdown() 方法的作用是使当前未执行完的任务继续执行,而队列中未执行的任务会继续执行,不删除队列中的任务,不再允许添加新的任务,同时 shutdown 方法不会阻塞。

List<Runnable> shutdownNow() 方法的作用是使当前未执行完的任务继续执行,而队列中未执行的任务不再执行,删除队列中的任务,不再允许添加新的任务,同时 shutdownNow() 方法不会阻塞。

创建测试用例 1:

package org.example.Executor;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutor_2 {
    static class MyRunnable implements Runnable {

        @Override
        public void run() {
            try {
                System.out.println("begin " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
                Thread.sleep(4000);
                System.out.println("    end" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(7,10,0L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
        pool.execute(myRunnable);
        System.out.println("main end!");
    }
}

运行结果如图,任务执行完成后,线程池继续等待新的任务,程序没有结束:

创建测试用例 2(修改main方法):

    public static void main(String[] args) {
        MyRunnable myRunnable = new MyRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(7,10,0L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
        pool.execute(myRunnable);
        pool.shutdown();
        System.out.println("main end!");
    }

程序运行的效果是 main 线程输出“main end”后 main 线程立即销毁,线程池在 4 s 后销毁进程结束。

创建测试用例 3(修改main方法):

    public static void main(String[] args) throws InterruptedException {
        MyRunnable myRunnable = new MyRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(7,10,0L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
        pool.execute(myRunnable);
        pool.execute(myRunnable);
        pool.execute(myRunnable);
        pool.execute(myRunnable);
        Thread.sleep(1000);
        pool.shutdown();
        pool.execute(myRunnable);
        System.out.println("main end!");
    }

运行结果如图,程序执行了 4 个任务,最后一个抛出异常,因为执行了 shutdown() 方法后不能添加新的任务,这个实验也证明执行 shutdown() 方法后未将队列中的任务删除,直到全部任务运行结束。

下面继续学习 shutdownNow() 方法。

新建测试用例1:?

package org.example.Executor;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutor_2 {
    static class MyRunnable implements Runnable {

        @Override
        public void run() {
                System.out.println("begin " + Thread.currentThread().getName() + " " + System.currentTimeMillis());
                for (int i = 0; i < Integer.MAX_VALUE / 50; i++) {
                    String string = new String();
                    Math.random();
                    Math.random();
                    Math.random();
                    Math.random();
                    Math.random();
                    Math.random();
                }
                System.out.println("    end" + Thread.currentThread().getName() + " " + System.currentTimeMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyRunnable myRunnable = new MyRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,9999,9999L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
        pool.execute(myRunnable);
        pool.execute(myRunnable);
        pool.execute(myRunnable);
        pool.execute(myRunnable);
        Thread.sleep(1000);
        pool.shutdownNow();
        System.out.println("main end!");
    }
}

程序运行结果如图,控制台信息代表两个任务被成功执行,其余两个任务被取消运行,并且进程销毁。

创建测试用例2,修改main方法如下:

    public static void main(String[] args) throws InterruptedException {
        MyRunnable myRunnable = new MyRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,9999,9999L, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>());
        pool.execute(myRunnable);
        pool.execute(myRunnable);
        pool.execute(myRunnable);
        pool.execute(myRunnable);
        Thread.sleep(1000);
        pool.shutdownNow();
        pool.execute(myRunnable);
        System.out.println("main end!");
    }

运行结果如图,两个任务被成功执行,两个任务被取消运行,最后一个任务被拒绝运行,抛出异常,进程最后被销毁。

2.方法 List<Runnable> shutdownNow() 返回值的作用

在调用?List<Runnable> shutdownNow()?方法后,队列中的任务被取消运行,shutdownNow()方法的返回值是 List<Runnable>,List 对象存储的是还未运行的任务,也就是被取消掉的任务,为了验证存储的是未运行的任务,创建测试用例,代码如下:

package org.example.Executor;

import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class test27 {
    static class MyRunnable implements Runnable{
        private String username;

        public MyRunnable(String username) {
            this.username = username;
        }

        public String getUsername() {
            return username;
        }

        @Override
        public void run() {
            for (int i = 0; i < Integer.MAX_VALUE / 500; i++) {
                String newString1 = new String();
                String newString5 = new String();
                String newString6 = new String();
                String newString7 = new String();
                Math.random();
                Math.random();
                Math.random();
            }
            System.out.println(Thread.currentThread().getName()+" 任务完成!");
        }
    }

    public static void main(String[] args) {
        try {
            MyRunnable a1 = new MyRunnable("A1");
            MyRunnable a2 = new MyRunnable("A2");
            MyRunnable a3 = new MyRunnable("A3");
            MyRunnable a4 = new MyRunnable("A4");
            ThreadPoolExecutor pool = new ThreadPoolExecutor(2,10,30, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
            pool.execute(a1);
            pool.execute(a2);
            pool.execute(a3);
            pool.execute(a4);
            Thread.sleep(1000);
            List<Runnable> list = pool.shutdownNow();
            for (int i = 0; i < list.size(); i++) {
                MyRunnable myRunnable = (MyRunnable) list.get(i);
                System.out.println(myRunnable.getUsername()+" 任务被取消!");
            }
            System.out.println("main end!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

程序运行结果如图 :

3.方法?shutdown() 和 shutdownNow() 与中断

如果正在执行的任务中使用 if(Thread.currentThread().isInterrupted() == true) 和 throw new InterruptedExeception() 判断任务是否中断,那么在调用 shutdown() 后任务并不会被中断而是继续运行,当调用 shutdownNow() 方法后会将任务立即中断。?

任务类 MyRunnable.java 代码如下:

    static class MyRunnable implements Runnable{
        private String username;

        public MyRunnable(String username) {
            this.username = username;
        }

        public String getUsername() {
            return username;
        }

        @Override
        public void run() {
            try {
                while(true){
                    if (Thread.currentThread().isInterrupted() == true){
                        throw new InterruptedException();
                    }
                }
            }catch (InterruptedException e){
                e.printStackTrace();
                System.out.println("---任务名称:"+username+" 被中断!");
            }
        }
    }

shutdown 测试代码如下 :

    public static void main(String[] args) {
        try{
            MyRunnable a1 = new MyRunnable("A1");
            ThreadPoolExecutor pool = new ThreadPoolExecutor(2,10,30, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
            pool.execute(a1);
            Thread.sleep(2000);
            pool.shutdown();
            System.out.println("main end!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

程序运行后,线程池中的任务未中断,而是会继续运行。

shutdownNow 测试代码如下:

    public static void main(String[] args) {
        try{
            MyRunnable a1 = new MyRunnable("A1");
            ThreadPoolExecutor pool = new ThreadPoolExecutor(2,10,30, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
            pool.execute(a1);
            Thread.sleep(2000);
            pool.shutdownNow();
            System.out.println("main end!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

程序运行后,中断任务停止,可见内部执行了 Thread t ,t.interrupt()方法。

4.方法 isShutdown()?

public boolean isShutdown() 方法的作用是判断线程池是否已经关闭。

package org.example.Executor;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutor_3 {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println(" 打印了 !begin "
                            + Thread.currentThread().getName());
                    Thread.sleep(1000);
                    System.out.println(" 打印了!   end "
                            + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, Integer.MAX_VALUE, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        executor.execute(runnable);
        System.out.println("A=" + executor.isShutdown());
        executor.shutdown();
        System.out.println("B=" + executor.isShutdown());
    }
}

程序运行结果如图,由运行结果可知,只要调用了 shutdown() 方法,isShutdown() 方法的返回结果就是 true:

5.方法 isTerminating() 和 isTerminated()

public boolean isTerminating() 方法:如果此执行程序处在 shutdown 或 shutdownNow 之后且正在终止但尚未完全终止的过程中,也就是还有任务在执行,则返回 true。此方法可以比喻成门是否正在关闭。

public boolean isTerminated() 方法:如果关闭后所有任务都已完成,则返回 true。此方法可以比喻成门是否已经关闭。

shutdown() 或 shutdownNow() 方法的功能是发出一个关闭线程池的命令,isShutdown() 方法用于判断关闭线程池的命令发出或者未发出。isTerminating() 方法代表线程池是否正在关闭中,而 isTerminated() 方法判断线程池是否已经关闭了。

新建测试用例:

package org.example.Executor;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutor_4 {
    static class MyRunnable implements Runnable {

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " begin " + System.currentTimeMillis());
                Thread.sleep(2000);
                System.out.println(Thread.currentThread().getName() + "   end " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyRunnable runnable = new MyRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 9999, 9999, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        pool.execute(runnable);
        pool.execute(runnable);
        pool.execute(runnable);
        pool.execute(runnable);
        System.out.println(pool.isTerminating() + " " + pool.isTerminated());
        pool.shutdown();
        Thread.sleep(1000);
        System.out.println(pool.isTerminating() + " " + pool.isTerminated());
        Thread.sleep(1000);
        System.out.println(pool.isTerminating() + " " + pool.isTerminated());
        Thread.sleep(1000);
        System.out.println(pool.isTerminating() + " " + pool.isTerminated());
        Thread.sleep(1000);
        System.out.println(pool.isTerminating() + " " + pool.isTerminated());
        Thread.sleep(1000);
        System.out.println(pool.isTerminating() + " " + pool.isTerminated());
    }
}

运行结果如图:

6.方法 awaitTermination(long timeout,TimeUnit unit)?

public boolean awaitTermination(long timeout,TimeUnit unit) 方法的作用是查看在指定的时间内,线程池是否已经终止工作,也就是 “最多” 等待多少时间后去判断线程池是否已经终止工作。如果在指定的时间之内,线程池销毁会导致该方法不再阻塞,而超过 timeout 时间也会导致该方法不再阻塞。此方法的使用需要 shutdown() 方法的配合。

package org.example.Executor;

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutor_5 {
    static class MuRunnable implements Runnable {

        @Override
        public void run() {

            try {
                System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());
                Thread.sleep(4000);
                System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MuRunnable run1 = new MuRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,9999,999L, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
        pool.execute(run1);
        System.out.println("main begin !"+System.currentTimeMillis());
        System.out.println(pool.awaitTermination(10,TimeUnit.SECONDS));
        System.out.println("main    end !"+System.currentTimeMillis());
        //此实验说明 awaitTermination 方法具有阻塞特性
    }
}

运行结果如图,从 main begin 到 main?end 耗时需要 10秒,因为 main 线程并未销毁,所以 awaitTimination 方法需要阻塞 10 秒。打印 false 的原因是因为未对线程池执行 shutdown 方法:

接下来执行 shutdown() 方法进行测试,更改 main 方法。?

    public static void main(String[] args) throws InterruptedException {
        MuRunnable run1 = new MuRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,9999,999L, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
        pool.execute(run1);
        pool.shutdown();
        System.out.println("main begin !"+System.currentTimeMillis());
        System.out.println(pool.awaitTermination(10,TimeUnit.SECONDS));
        System.out.println("main    end !"+System.currentTimeMillis());
        //代码 awaitTermination(10,TimeUnit.SECONDS)
        // 的作用是最多等待 10s,也就是阻塞 10s
    }

运行结果如图,打印 begin 和 end 之间耗时了 4s,不再是 10s。因为 4s 后线程池销毁了,导致 awaitTermination 方法取消阻塞。?

7.工厂 ThreadFactory + Thread + UncaughtExceptionHandler 处理异常?

有时需要对线程池中创建的线程属性进行定制化,这时就需要配置?ThreadFactory 线程工厂。如果线程出现异常,可以结合 UncaughtExceptionHandler 处理。

新建测试用例。

package org.example.Executor;


import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutor_6 {
    static class MyRunnable implements Runnable{

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
                Thread.sleep(4000);
                System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    static class MyThreadFactory implements ThreadFactory{

        @Override
        public Thread newThread(Runnable r) {
            Thread newThread = new Thread(r);
            newThread.setName("shenyang: "+new Date());
            return newThread;
        }
    }

    public static void main(String[] args) {
        MyRunnable runnable = new MyRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,9999,9999L
                , TimeUnit.SECONDS,new LinkedBlockingQueue<>(),new MyThreadFactory());
        //pool.setThreadFactory(new MyThreadFactory());
        pool.execute(runnable);
    }
}

运行结果如图:

除了使用构造方法传递自定义 ThreadFactory 外,还可以使用 setThreadFactory() 方法来设置自定义 ThreadFactory。?

当线程运行出现异常时,则 JDK 会抛出异常,测试用例如下:

package org.example.Executor;

import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Test3 {
    static class MyRunnable implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());
            String abc = null;
            abc.indexOf(0);
            System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());
        }
    }
    static class MyThreadFactory implements ThreadFactory {

        @Override
        public Thread newThread(Runnable r) {
            Thread newThread = new Thread(r);
            newThread.setName("shenyang: "+new Date());
            return newThread;
        }
    }
    public static void main(String[] args) {
        MyRunnable runnable = new MyRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,9999,9999L, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
        pool.setThreadFactory(new MyThreadFactory());
        pool.execute(runnable);
    }
}

运行结果如下,程序员无法自行处理异常,所以控制台直接输出了该异常:

在使用自定义线程工厂时,线程如果出现异常完全可以自定义处理的。

package org.example.Executor;

import java.util.Date;
import java.util.concurrent.ThreadFactory;

public class MyThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable r) {
        Thread newThread = new Thread(r);
        newThread.setName("我的新名:"+new Date());
        newThread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {
                System.out.println("自定义处理异常启用:"+t.getName()+" "+e.getMessage());
            }
        });
        return newThread;
    }
}

在 ThreadFactory 中,newThread() 方法创建出来的 Thread 线程对象调用 setUncaughtExceptionHandler() 方法的作用是使这些线程具有集中、统一处理异常的能力。

运行类如下:

class Run{
    static class MyRunnable implements Runnable {

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());
            String abc = null;
            abc.indexOf(0);
            System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());
        }
    }
    public static void main(String[] args) {
        MyRunnable runnable = new MyRunnable();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,9999,9999L, TimeUnit.SECONDS,new LinkedBlockingQueue<>());
        pool.setThreadFactory(new MyThreadFactory());
        pool.execute(runnable);
    }
}

运行结果如图:

8.方法 set/getRejectedExcutionHandler()

public void setRejectedExcutionHandler(RejectedExcutionHandler handler) 和?public RejectedExcutionHandler getRejectedExcutionHandler() 方法的作用是可以处理任务被拒绝执行时的行为。

package org.example.Executor;


import java.util.concurrent.*;

public class ThreadPoolExecutor_7 {
    static class MyRunnable implements Runnable{
        private String username;

        public MyRunnable(String username) {
            this.username = username;
        }

        public String getUsername() {
            return username;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
                Thread.sleep(4000);
                System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        MyRunnable runnable1 = new MyRunnable("测试1");
        MyRunnable runnable2 = new MyRunnable("测试2");
        MyRunnable runnable3 = new MyRunnable("测试3");
        MyRunnable runnable4 = new MyRunnable("测试4");
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,9999L
                , TimeUnit.SECONDS,new SynchronousQueue<>());
        //pool.setThreadFactory(new MyThreadFactory());
        pool.execute(runnable1);
        pool.execute(runnable2);
        pool.execute(runnable3);
        pool.execute(runnable4);
    }
}

运行结果如图,拒绝运行多余的任务:

控制台打印的信息说明 MyRunnable myRunnbale = new MyRunnbale("测试4")?;任务被拒绝执行,在出现这样的异常时可以自定义拒绝执行任务的行为,新建测试用例,创建代码如下:

package org.example.Executor;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RunTest {
    static class MyRunnable implements Runnable{
        private String username;

        public MyRunnable(String username) {
            this.username = username;
        }

        public String getUsername() {
            return username;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
                Thread.sleep(4000);
                System.out.println(Thread.currentThread().getName()+" "+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    static class MyRejectedExcutionHandler implements RejectedExecutionHandler{

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println(((MyRunnable) r).getUsername()+" 被拒绝执行 ");
        }
    }

    public static void main(String[] args) {
        MyRunnable runnable1 = new MyRunnable("测试1");
        MyRunnable runnable2 = new MyRunnable("测试2");
        MyRunnable runnable3 = new MyRunnable("测试3");
        MyRunnable runnable4 = new MyRunnable("测试4");
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2,3,9999L
                , TimeUnit.SECONDS,new SynchronousQueue<>());
        //pool.setThreadFactory(new MyThreadFactory());
        pool.setRejectedExecutionHandler(new MyRejectedExcutionHandler());
        pool.execute(runnable1);
        pool.execute(runnable2);
        pool.execute(runnable3);
        pool.execute(runnable4);
    }
}

程序运行结果如图,此实验可以将被拒绝的任务日志化:

9.线程池的拒绝策略?

线程池中的资源全部被占用的时候,对新添加的 Task 任务有不同的处理策略,在默认的情况下,ThreadPoolExecutor 中有 4 种不同的处理方式。

1.AbortPolicy 策略?

AbortPolicy 策略是当任务添加到线程池中被拒绝时,将抛出 RejectedExecutionException 异常,这是线程池默认使用的拒绝策略。

package org.example.Executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Run_8 {
    public static void main(String[] args) {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    System.out.println(Thread.currentThread().getName() + " run end!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 5
                , TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)
                , new ThreadPoolExecutor.AbortPolicy());
        executor.execute(runnable);//不报错
        executor.execute(runnable);//不报错
        executor.execute(runnable);//不报错
        executor.execute(runnable);//不报错
        //executor.execute(runnable);//报错
    }

}

程序运行后不出现异常,如图:

将代码注释去掉之后,将出现异常,从而超出线程池容量。

使用 AbortPolicy 策略后,当线程数量超过 max 值时,线程池将抛出 java.util.concurrent.RejectedExecutionException 异常。

2.CallerRunsPolicy 策略?

CallerRunsPolicy 策略是当任务添加到线程池中被拒绝时,会调用线程池的 Thread 线程对象处理被拒绝的任务。

package org.example.Executor;

import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Policy_CallerRunsPolicy_1 {
    static class MyThread extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(5000);
                System.out.println("    end " + Thread.currentThread().getName()
                        + " " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyThread thread = new MyThread();
        LinkedBlockingDeque deque = new LinkedBlockingDeque(2);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 5
                , TimeUnit.SECONDS, deque
                , new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println("a begin " + Thread.currentThread().getName() + " "
                + System.currentTimeMillis());
        pool.execute(thread);
        pool.execute(thread);
        pool.execute(thread);
        pool.execute(thread);
        pool.execute(thread);
        pool.execute(thread);
        System.out.println("a     end  " + Thread.currentThread().getName() + " "
                + System.currentTimeMillis());
    }
}

程序运行结果如图所示,任务被交给 main 线程执行。

在上面的实验中,线程 main 被阻塞(任务中有 sleep(5000),由于拒绝策略任务交由 main 线程所以? main 线程会被睡眠),严重影响程序的运行效率,所以并不建议这样做,通过改变代码结构可以改善这种情况。?

3.DiscardOldsetPolicy 策略?

DiscardOldsetPolicy 策略是当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。

新建测试用例:

package org.example.Executor;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Policy_DiscardOldestPolicy {
    static class MyRunnable implements Runnable{
        private String username;

        public MyRunnable(String username) {
            this.username = username;
        }

        public String getUsername() {
            return username;
        }

        @Override
        public void run() {
            try {
                System.out.println(username+" run");
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue deque = new ArrayBlockingQueue(2);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 3, 5
                , TimeUnit.SECONDS, deque
                , new ThreadPoolExecutor.DiscardOldestPolicy());

        for (int i = 0; i < 5; i++) {
            MyRunnable runnable = new MyRunnable("Runnable"+(i+1));
            pool.execute(runnable);
        }
        Thread.sleep(50);
        Iterator iterator = deque.iterator();
        while (iterator.hasNext()){
            Object o = iterator.next();
            System.out.println(((MyRunnable)o).getUsername() );
        }
        pool.execute(new MyRunnable("Runable6"));
        pool.execute(new MyRunnable("Runable7"));
        iterator = deque.iterator();
        while(iterator.hasNext()){
            Object o = iterator.next();
            System.out.println(((MyRunnable)o).getUsername() );
        }
    }
}

运行结果如图,早期任务 3 和任务 4 被取消:

4.DiscardPolicy策略

DiscardPolicy 策略是当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。

package org.example.Executor;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Policy_DiscardPolicy {
    public static void main(String[] args) throws InterruptedException {
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(5000);
                    System.out.println(Thread.currentThread().getName()
                            + " run end!");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        ArrayBlockingQueue queue = new ArrayBlockingQueue(2);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(2,3,5,
                TimeUnit.SECONDS,queue,new ThreadPoolExecutor.DiscardPolicy());
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        executor.execute(runnable);
        Thread.sleep(8000);
        System.out.println(executor.getPoolSize()+" "+queue.size());
    }
}

运行结果如图,多余的任务被取消执行:

10.方法 afterExecute() 和 beforeExecute()

在线程池 ThreadPoolExecutor 类中重写这 2 个方法可以对线程池中执行的线程对象实现监控。

package org.example.Executor;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutor_after_before {
    static class MyRunnable implements Runnable {
        private String username;

        public MyRunnable(String username) {
            this.username = username;
        }

        public String getUsername() {
            return username;
        }

        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());
                Thread.sleep(4000);
                System.out.println(Thread.currentThread().getName() + " " + System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static class MyThreadPoolExecutor extends ThreadPoolExecutor {

        public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);

        }

        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            System.out.println("准备执行:" + ((MyRunnable) r).getUsername());
        }

        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            System.out.println(((MyRunnable) r).getUsername() + "    执行完了:");
        }
    }

    public static void main(String[] args) {
        MyThreadPoolExecutor executor = new MyThreadPoolExecutor(2, 2
                , Integer.MAX_VALUE, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>());
        executor.execute(new MyRunnable("A1"));
        executor.execute(new MyRunnable("A2"));
        executor.execute(new MyRunnable("A3"));
        executor.execute(new MyRunnable("A4"));
    }

}

运行结果如图所示:

11.其他方法介绍

还有一些其他方法这里就不一一举例了,简单介绍下。

方法?allowsCoreThreadTimeOut 和?allowCoreThreadTimeOut(bool)

方法 allowCoreThreadTimeOut(true) 可以使核心池中的线程具有超时销毁的特性。public boolean allowsCoreThreadTimeOut()? 方法的作用是判断是否具有这个特性,public void allowCoreThreadTimeOut(boolean value) 方法的作用是设置是否有这个特性。

方法 prestartCoreThread() 和?prestartAllCoreThreads()

在实例化 ThreadPoolExecutor 类后,线程池中并没有核心线程,除非执行 execute() 方法,但是在不执行 execute() 方法时也可以通过执行?prestartCoreThread() 和?prestartAllCoreThreads() 方法来创建出核心线程。

public boolean prestartCoreThread() 方法的作用是每调用一次就创建一个核心线程并使它变成启动状态,返回值类型为 boolean,代表是否创建成功。?

public int prestartAllCoreThreads() 方法的作用是启动全部核心线程,返回值是启动核心线程的数量。

方法 remove(Runnable) 的使用?

public boolean remove(Runnable task) 方法可以删除尚未被执行的 Runnable 任务。?


总结

加油!!!

文章来源:https://blog.csdn.net/m0_62943934/article/details/134931559
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。