CompletableFuture,多线程编排利器,使用说明
前言
熟悉Java多线程并发的童鞋一定知道这些比较实用的工具类:Semaphore,CountDownLatch,CyclicBarrier,Phaser。今天在介绍一款比前面这些更加灵活而且强大的多线程编排利器:CompletableFuture。
什么是多线程编排?例如:线程A执行完了再执行线程B。线程A和B,有1个执行完了就执行线程C。线程A异常了就执行线程B。诸如此类。
CompletableFuture是在Java 8中新增加的,它提供了非常强大的Future的扩展功能(50多个方法),可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法
CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。
常用方法介绍
创建异步任务
-
supplyAsync执行CompletableFuture任务,支持返回值
-
runAsync执行CompletableFuture任务,没有返回值。
supplyAsync
//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//使用自定义线程池,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
runAsync
//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable)
//自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
代码案例
public class CompletableFutureDemo {
public static void main(String[] args) {
//可以自定义线程池
ExecutorService executor = Executors.newCachedThreadPool();
//runAsync的使用
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runFuture 自定义线程池"), executor);
//supplyAsync的使用
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyFuture 自定义线程池");
return "返回值";
}, executor);
System.out.println(runFuture.join());
System.out.println(supplyFuture.join());
executor.shutdown();
}
}
任务异步回调
thenRun/thenRunAsync
做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值
public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);
案例代码
public class FutureThenRunTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("先执行第一个任务");
return "第一个方法返回值";
}
);
CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {
System.out.println("接着执行第二个任务");
});
System.out.println(thenRunFuture.get());
}
}
//控制台输出
先执行第一个任务
接着执行第二个任务
null
两个方法的区别
如果你执行第一个任务的时候,传入了一个自定义线程池
调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池。
调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池
后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个
源码
private static final Executor asyncPool = useCommonPool ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public CompletableFuture<Void> thenRun(Runnable action) {
return uniRunStage(null, action);
}
public CompletableFuture<Void> thenRunAsync(Runnable action) {
return uniRunStage(asyncPool, action);
}
thenAccept/thenAcceptAsync
thenAccept方法表示:第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的
public class FutureThenAcceptTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务已执行");
return "第一个任务返回值";
}
);
CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {
if ("第一个任务返回值".equals(a)) {
System.out.println("满足条件,第二个任务已执行");
} else {
System.out.println("不满足条件,第二个任务未执行");
}
});
System.out.println(thenAcceptFuture.get());
}
}
//控制台输出
第一个任务已执行
满足条件,第二个任务已执行
null
thenApply/thenApplyAsync
thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
public class FutureThenApplyTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务");
return "第一个任务返回值";
}
);
CompletableFuture<String> thenAcceptFuture = orgFuture.thenApply((a) -> {
if ("第一个任务返回值".equals(a)) {
System.out.println("满足条件,第二个任务已执行");
} else {
System.out.println("不满足条件,第二个任务未执行");
}
return "第二个任务返回值";
});
System.out.println(thenAcceptFuture.get());
}
}
//控制台输出
第一个任务
满足条件,第二个任务已执行
第二个任务返回值
exceptionally
-
exceptionally方法表示,某个任务执行异常时,执行回调方法;并且会将该任务抛出的异常作为参数,传递到回调方法中。
-
public class FutureExceptionTest { public static void main(String[] args) throws Exception { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("第一个任务:"+ Thread.currentThread().getName()); throw new RuntimeException(); } ); CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> { e.printStackTrace(); System.out.println("异常回调任务:"+ Thread.currentThread().getName()); return "异常回调任务返回值"; }); System.out.println(exceptionFuture.get()); } } //控制台输出 第一个任务:ForkJoinPool.commonPool-worker-9 异常回调任务:ForkJoinPool.commonPool-worker-9 异常回调任务返回值 java.util.concurrent.CompletionException: java.lang.RuntimeException at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606) at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596) at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) Caused by: java.lang.RuntimeException at com.gem.j2se.thread.future.FutureExceptionTest.lambda$main$0(FutureExceptionTest.java:16) at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ... 5 more
whenComplete
-
某个任务执行完成后,执行回调方法,回调方法无返回值;whenComplete方法返回的CompletableFuture的result是上个任务的结果。
-
public class FutureWhenCompleteTest { public static void main(String[] args) throws Exception { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("第一个任务:" + Thread.currentThread().getName()); return "第一个任务返回值"; } ); CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> { System.out.println("whenComplete任务:" + Thread.currentThread().getName()); System.out.println("上个任务返回值:" + a); System.out.println("上个任务异常:" + throwable); }); System.out.println(rstFuture.get()); } } //控制台输出 第一个任务:ForkJoinPool.commonPool-worker-9 whenComplete任务:main 上个任务返回值:第一个任务返回值 上个任务异常:null
handle
-
某个任务执行完成后,执行回调方法,回调方法有返回值;并且handle方法返回的CompletableFuture的result是回调方法执行的结果。
-
public class FutureHandlerTest { public static void main(String[] args) throws Exception { CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> { System.out.println("第一个任务:" + Thread.currentThread().getName()); return "第一个任务返回值"; } ); CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> { System.out.println("whenComplete任务:" + Thread.currentThread().getName()); System.out.println("上个任务返回值:" + a); System.out.println("上个任务异常:" + throwable); return "回调方法返回值"; }); System.out.println(rstFuture.get()); } } //控制台输出 第一个任务:ForkJoinPool.commonPool-worker-9 whenComplete任务:main 上个任务返回值:第一个任务返回值 上个任务异常:null 回调方法返回值
多个任务条件组合
AND组合关系
thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。
thenCombine
将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
thenAcceptBoth
将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
runAfterBoth
不会把执行结果当做方法入参,且没有返回值。
OR组合的关系
applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。
applyToEither
将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
acceptEither
将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
runAfterEither
不会把执行结果当做方法入参,且没有返回值。
allOf
多个任务都执行完成后,才执行allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常
public class FutureAllOfTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务:" + Thread.currentThread().getName());
return "第一个任务返回值";
});
CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> {
System.out.println("第二个任务:" + Thread.currentThread().getName());
return "第二个任务返回值";
});
CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> {
System.out.println("第三个任务:" + Thread.currentThread().getName());
return "第三个任务返回值";
});
CompletableFuture<Void> future = CompletableFuture.allOf(a, b, c).whenComplete((result, ex) -> {
System.out.println("回调任务入参:" + result + "," + ex);
});
}
}
//控制台输出
第一个任务:ForkJoinPool.commonPool-worker-9
第二个任务:ForkJoinPool.commonPool-worker-2
第三个任务:ForkJoinPool.commonPool-worker-2
回调任务入参:null,null
anyOf
任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常
public class FutureAnyOfTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务:" + Thread.currentThread().getName());
return "第一个任务返回值";
});
CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> {
System.out.println("第二个任务:" + Thread.currentThread().getName());
return "第二个任务返回值";
});
CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((result, ex) -> {
System.out.println("回调任务入参:" + result + "," + ex);
});
anyOfFuture.join();
}
}
//控制台输出
第一个任务:ForkJoinPool.commonPool-worker-9
第二个任务:ForkJoinPool.commonPool-worker-9
回调任务入参:第一个任务返回值,null
thenCompose
thenCompose 可以用于组合多个CompletableFuture的结果,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着业务逻辑上的先后顺序。
thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例。
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
案例
public class FutureThenComposeTest {
public static void main(String[] args) throws Exception {
CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
System.out.println("第一个任务:" + Thread.currentThread().getName());
return "第一个任务返回值";
});
CompletableFuture<String> b = a.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
System.out.println("第二个任务:" + Thread.currentThread().getName());
return result+", 第二个任务返回值";
}));
CompletableFuture<String> c = b.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
System.out.println("第三个任务:" + Thread.currentThread().getName());
return result+", 第三个任务返回值";
}));
System.out.println(c.join());
}
}
//控制台输出
第一个任务:ForkJoinPool.commonPool-worker-9
第二个任务:ForkJoinPool.commonPool-worker-9
第三个任务:ForkJoinPool.commonPool-worker-9
第一个任务返回值, 第二个任务返回值, 第三个任务返回值
使用注意事项
Future需要获取返回值才能获取异常
Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。大家使用的时候,最好注意一下。
CompletableFuture的get()是阻塞的
CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,可以考虑添加超时时间
CompletableFuture.get();
//超时时间和时间单位
CompletableFuture.get(5, TimeUnit.SECONDS);
注意默认线程池
CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应可能会很慢。建议使用自定义线程池并优化线程池配置参数。
注意自定义线程池的饱和策略
CompletableFuture的get()方法是阻塞的,一般建议使用带超时时间的get方法。并且建议使用自定义线程池。
但是如果线程池拒绝策略是DiscardPolicy
或者DiscardOldestPolicy
,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,建议做好线程池隔离。
文章参考
https://juejin.cn/post/6970558076642394142
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。 如若内容造成侵权/违法违规/事实不符,请联系我的编程经验分享网邮箱:veading@qq.com进行投诉反馈,一经查实,立即删除!