Java 多线程之线程池基础
一、概述
-
线程池是一种管理和复用线程的机制,它包含一组预先创建的线程,用于执行各种任务。线程池的主要作用是提高多线程应用程序的性能和效率,并提供对线程的生命周期和资源的管理,包括线程的创建、销毁和复用。主要作用如下:
- 降低资源消耗:线程的创建和销毁是一项开销较大的操作。通过使用线程池,可以避免频繁地创建和销毁线程,从而降低了系统的资源消耗。
- 提高响应速度:线程池中的线程可以立即执行任务,而不需要等待线程的创建。这可以显著提高任务的响应速度,尤其是在大量并发任务的情况下。
- 控制并发线程数量:线程池可以限制并发线程的数量,防止系统因为创建过多的线程而导致资源耗尽或性能下降。通过设置适当的线程池大小,可以在充分利用系统资源的同时,避免线程过多带来的负面影响。
- 提供线程管理和监控:线程池提供了对线程的生命周期和状态的管理。可以通过线程池来监控线程的执行情况、统计任务执行时间等,方便调试和性能优化。
- 支持任务队列:线程池通常与任务队列结合使用,将待执行的任务放入队列中,由线程池中的线程按照一定的调度策略进行执行。任务队列可以缓冲任务,避免任务提交过快导致系统负荷过大,同时提供了一种异步执行任务的机制。
-
线程池相关类关系图
二、Executors 说明
-
使用线程池,最简单的方法就是使用 Executors 类。Executors 有以下核心方法
- newSingleThreadExecutor 创建一个只有一个线程的线程池对象,使用 SynchronousQueue 保存任务队列。
- newFixedThreadPool 创建一个固定核心线程的线程池对象,使用 LinkedBlockingQueue 保存任务队列。
- newCachedThreadPool 创建一个没有核心线程的线程池对象,使用 LinkedBlockingQueue 保存任务队列。没有线程空闲,就新起一个线程来处理任务。
- newScheduledThreadPool 创建一个定时任务线程池对象,使用 DelayedWorkQueue 保存任务队列。
- newWorkStealingPool 创建一个任务分叉线程池对象,每个线程都单独的任务队列,当自己没有任务的时候再去别人任务队列拿任务。
-
Executors 使用方法
- 使用 Executors 创建线程池对象 executorService 后可以执行任务。
- 可以使用 ExecutorService 的 execute 方法异步执行无返回值的 Runnable 接口任务。
- 可以使用 ExecutorService 的 submit 方法异步执行有返回值的 Runnable、Callable 接口任务。
- 可以使用 ExecutorService 的 invokeXXX 方法同步执行 Runnable、Callable 接口任务。
private static void testExecutors() throws ExecutionException, InterruptedException { // 创建一个线程池,核心线程数为2(默认运行的线程数量为2) ExecutorService executorService = Executors.newFixedThreadPool(2); //ExecutorService executorService = Executors.newCachedThreadPool(); //ExecutorService executorService = Executors.newSingleThreadExecutor(); // 方法一:使用线程池执行任务,没有返回值 executorService.execute(new Runnable() { @Override public void run() { someTask(); } }); // 方法二:使用线程池提交任务,有返回值 Future<Boolean> future = executorService.submit(new Callable<Boolean>() { @Override public Boolean call() throws Exception { someTask(); return true; } }); // 方法三:执行任务多个,有返回值 List<Callable<Boolean>> callables = new ArrayList<>(); callables.add(()->{ // 这是一个 Lambda 表达式任务 return true; }); List<Future<Boolean>> futures = executorService.invokeAll(callables); // 等待任务执行完成 executorService.shutdown(); } private static void someTask() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("这里是一个复杂的业务逻辑(耗时任务)"); }
二、线程池 Hello World
-
下面是使用 Executors 创建线程池并执行线程的示例。
- Executors 的静态方法 newFixedThreadPool 可以创建线程池,并返回 ExecutorService 对象。
- ExecutorService 对象的 submit 方法运行任务,任务可以是 Runnable 和 Callable。
-
下面创建一个线程池,包括2个核心线程,执行一个 Lambda 表达式定义的任务
- 注意:我这里只添了一个任务,实际使用时可以使用 submit 添加多个任务到线程池。
private static void test1() throws ExecutionException, InterruptedException { // 创建一个线程池,核心线程数为2(默认运行的线程数量为2) ExecutorService executorService = Executors.newFixedThreadPool(2); // submit 是异步的 Future<Integer> future = executorService.submit(() -> { // 这里是一个复杂的业务逻辑(耗时任务) return 1; }); // get 是同步的,阻塞的 int result = future.get(); System.out.println("result="+result); executorService.shutdown(); }
-
为了方便理解,我将上面的 Lambda 表达式任务改成 Callable 接口任务,如下:
private static void test2() throws ExecutionException, InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2); Callable callable = new Callable<Integer>() { @Override public Integer call() throws Exception { // 这里是一个复杂的业务逻辑(耗时任务) return 2; } }; // submit 是异步的 Future<Integer> future = executorService.submit(callable); // get 是同步的,阻塞的 int result = future.get(); System.out.println("result="+result); executorService.shutdown(); }
三、FutureTask 的作用
-
从上面线程池的例子中可以看出当向线程池中提交任务(submit)时返回了一个 Future 对象。
-
FutureTask是Java中的一个实现了Future和Runnable接口的类,用于表示一个异步计算任务。用于获取计算结果、取消任务、判断任务是否完成等功能。主要方法如下:
- get() 获取由FutureTask执行的计算任务的结果。如果计算任务尚未完成,get()方法会阻塞当前线程,直到计算任务完成并返回结果。
- cancel() 取消尚未开始或正在执行的任务。取消任务后,如果任务尚未开始执行,则可以阻止任务的执行;如果任务正在执行,则可以尝试中断任务的执行。
- isDone() 判断FutureTask封装的任务是否已经完成。当任务执行完成或被取消时,isDone()方法会返回true。
-
下面使用FutureTask包装 Lambda 任务,然后新创建一个线程来执行,再使用 FutureTask 的 get 方法获取返回值。
- 注意:任务类型如果是 Runnable 类型是不能获取返回值,而任务是 Callable 类型才获取返回值。因为 Runnable 的 run 无返回值,而 Callable 的 call 方法有返回值。
private static void test3() throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<>(() -> { // 这里是一个复杂的业务逻辑(耗时任务) return 3; }); new Thread(futureTask).start(); System.out.println("result="+futureTask.get()); }
四、CompletableFuture 的作用
-
Future 还有一个很好用的子类叫 CompletableFuture 。他简化了 Future 类型任务执行,主要功能如下:
- 异步执行:CompletableFuture可以用于执行异步操作,例如网络请求、数据库查询等,而不会阻塞主线程。通过supplyAsync()或runAsync()等方法,可以将任务提交给线程池执行,并返回一个CompletableFuture对象,表示异步任务的执行结果。
- 链式操作:CompletableFuture支持链式操作,可以通过方法链的方式组合多个操作,形成一个任务流水线。通过调用thenApply()、thenAccept()、thenCompose()等方法,可以在任务完成后触发回调函数,并对任务的结果进行处理或执行其他操作。
- 合并多个任务:CompletableFuture提供了一些静态方法,如allOf()、anyOf()等,用于合并多个CompletableFuture对象的结果。allOf()方法在所有任务完成后返回一个CompletableFuture,它的结果是一个包含所有任务结果的CompletableFuture数组。anyOf()方法在任意一个任务完成后返回一个CompletableFuture,它的结果是第一个完成的任务的结果。
- 异常处理:CompletableFuture提供了一些方法,如exceptionally()和handle(),用于处理任务执行过程中的异常情况。可以在任务执行出现异常时触发回调函数,并对异常进行处理或返回一个默认值。
- 等待任务完成:可以使用join()方法来等待CompletableFuture的任务执行完成,并获取任务的结果。与get()方法不同的是,join()方法不会抛出已检查异常,使得它更适合在函数式编程中使用。
-
CompletableFuture 使用示例
private static void test4() throws ExecutionException, InterruptedException { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { // 这里是一个复杂的业务逻辑(耗时任务) return 1; }); CompletableFuture<Integer> completableFuture2 = CompletableFuture.supplyAsync(() -> { // 这里是一个复杂的业务逻辑(耗时任务) return 2; }); CompletableFuture<Integer> completableFuture3 = CompletableFuture.supplyAsync(() -> { // 这里是一个复杂的业务逻辑(耗时任务) return 3; }); // 等待所有任务完成 CompletableFuture.allOf(completableFuture1, completableFuture2, completableFuture3); System.out.println("r1="+completableFuture1.get()+",r2="+completableFuture2.get()+",r3="+completableFuture3.get()); // 链式调用 CompletableFuture completableFuture4 = CompletableFuture.supplyAsync(() -> { // 这里是一个复杂的业务逻辑(耗时任务) return 4; }).thenApply(String::valueOf).thenApply(text->"hello "+ text).thenAccept(System.out::println); }
五、定时任务线程池使用
-
使用 Executors.newScheduledThreadPool 可以创建定时任务线程池,定时任务线池可以使任务在某个时间点上执行.
-
如下,创建的线程池执行任务时,任务将在30分钟以后执行。
private static void test5() throws ExecutionException, InterruptedException { // 如下,每隔30分钟执行一次任务 ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2); scheduledExecutorService.scheduleWithFixedDelay(() -> { // 这里是一个复杂的业务逻辑(耗时任务) }, 0, 30, TimeUnit.MINUTES); // 30 表示延后30分钟 // 等待任务执行 scheduledExecutorService.shutdown(); }
六、任务分叉线程池
-
任务分叉合并线程池内部使用 ForkJoinPool 实现,并且每个线程都有自己的工作队列。当某个线程的队列为空时,它可以从其他线程的队列中“窃取”任务来执行。
任务分叉合并线程池 ForkJoinPool 可以执以下两种任务
-
RecursiveAction 不带返回值的递归任务
-
RecursiveTask 带返回值的递归任务
RecursiveAction action = new RecursiveAction() { @Override protected void compute() { // 复杂计算任务 } }; RecursiveTask task = new RecursiveTask() { @Override protected Object compute() { // 复杂计算任务 return null; } }; ForkJoinPool p = new ForkJoinPool(); p.execute(action); p.execute(task);
-
-
这种线程池适用于执行大量的独立、耗时较长的任务,可以更好地利用多核处理器的性能。
请注意,工作窃取线程池在不同的操作系统和Java版本上可能会有一些差异,因此在具体的应用场景中,你可能需要根据性能和需求进行调优。
private static void test6() throws ExecutionException, InterruptedException { // 分叉任务线程池 ExecutorService executorService = Executors.newWorkStealingPool(); executorService.execute(() -> { // 这里是一个复杂的业务逻辑(耗时任务) }); // 等待任务执行 executorService.shutdown(); }
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!