CompletableFuture,多线程编排利器,使用说明

2023-12-15 23:43:43

前言

熟悉Java多线程并发的童鞋一定知道这些比较实用的工具类:Semaphore,CountDownLatch,CyclicBarrier,Phaser。今天在介绍一款比前面这些更加灵活而且强大的多线程编排利器:CompletableFuture。

什么是多线程编排?例如:线程A执行完了再执行线程B。线程A和B,有1个执行完了就执行线程C。线程A异常了就执行线程B。诸如此类。

CompletableFuture是在Java 8中新增加的,它提供了非常强大的Future的扩展功能(50多个方法),可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合CompletableFuture的方法

CompletableFuture类实现了Future接口,所以你还是可以像以前一样通过get方法阻塞或者轮询的方式获得结果,但是这种方式不推荐使用。CompletableFuture和FutureTask同属于Future接口的实现类,都可以获取线程的执行结果。

常用方法介绍

创建异步任务

  • supplyAsync执行CompletableFuture任务,支持返回值

  • runAsync执行CompletableFuture任务,没有返回值。

supplyAsync

//使用默认内置线程池ForkJoinPool.commonPool(),根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
//使用自定义线程池,根据supplier构建执行任务
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

runAsync

//使用默认内置线程池ForkJoinPool.commonPool(),根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable) 
//自定义线程,根据runnable构建执行任务
public static CompletableFuture<Void> runAsync(Runnable runnable,  Executor executor)

代码案例

public class CompletableFutureDemo {
    public static void main(String[] args) {
        //可以自定义线程池
        ExecutorService executor = Executors.newCachedThreadPool();
        //runAsync的使用
        CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runFuture 自定义线程池"), executor);
        //supplyAsync的使用
        CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("supplyFuture 自定义线程池");
            return "返回值";
        }, executor);
        System.out.println(runFuture.join());
        System.out.println(supplyFuture.join());
        executor.shutdown();
    }
}

任务异步回调

thenRun/thenRunAsync

做完第一个任务后,再做第二个任务。某个任务执行完成后,执行回调方法;但是前后两个任务没有参数传递,第二个任务也没有返回值

public CompletableFuture<Void> thenRun(Runnable action);
public CompletableFuture<Void> thenRunAsync(Runnable action);

案例代码

public class FutureThenRunTest {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.println("先执行第一个任务");
                    return "第一个方法返回值";
                }
        );

        CompletableFuture thenRunFuture = orgFuture.thenRun(() -> {
            System.out.println("接着执行第二个任务");
        });

        System.out.println(thenRunFuture.get());
    }
}

//控制台输出
先执行第一个任务
接着执行第二个任务
null

两个方法的区别

如果你执行第一个任务的时候,传入了一个自定义线程池

调用thenRun方法执行第二个任务时,则第二个任务和第一个任务是共用同一个线程池

调用thenRunAsync执行第二个任务时,则第一个任务使用的是你自己传入的线程池,第二个任务使用的是ForkJoin线程池

后面介绍的thenAccept和thenAcceptAsync,thenApply和thenApplyAsync等,它们之间的区别也是这个

源码

private static final Executor asyncPool = useCommonPool ?
     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
     
 public CompletableFuture<Void> thenRun(Runnable action) {
     return uniRunStage(null, action);
 }
   
 public CompletableFuture<Void> thenRunAsync(Runnable action) {
     return uniRunStage(asyncPool, action);
 }

thenAccept/thenAcceptAsync

thenAccept方法表示:第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值

public class FutureThenAcceptTest {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.println("第一个任务已执行");
                    return "第一个任务返回值";
                }
        );

        CompletableFuture thenAcceptFuture = orgFuture.thenAccept((a) -> {
            if ("第一个任务返回值".equals(a)) {
                System.out.println("满足条件,第二个任务已执行");
            } else {
                System.out.println("不满足条件,第二个任务未执行");
            }
        });
        System.out.println(thenAcceptFuture.get());
    }
}

//控制台输出
第一个任务已执行
满足条件,第二个任务已执行
null

thenApply/thenApplyAsync

thenApply方法表示,第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。

public class FutureThenApplyTest {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
                    System.out.println("第一个任务");
                    return "第一个任务返回值";
                }
        );

        CompletableFuture<String> thenAcceptFuture = orgFuture.thenApply((a) -> {
            if ("第一个任务返回值".equals(a)) {
                System.out.println("满足条件,第二个任务已执行");
            } else {
                System.out.println("不满足条件,第二个任务未执行");
            }
            return "第二个任务返回值";
        });
        System.out.println(thenAcceptFuture.get());
    }
}

//控制台输出
第一个任务
满足条件,第二个任务已执行
第二个任务返回值

exceptionally

  1. exceptionally方法表示,某个任务执行异常时,执行回调方法;并且会将该任务抛出的异常作为参数,传递到回调方法中。

  2. public class FutureExceptionTest {
        public static void main(String[] args) throws Exception {
            CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
                        System.out.println("第一个任务:"+ Thread.currentThread().getName());
                        throw new RuntimeException();
                    }
            );
    
            CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
                e.printStackTrace();
                System.out.println("异常回调任务:"+ Thread.currentThread().getName());
                return "异常回调任务返回值";
            });
    
            System.out.println(exceptionFuture.get());
        }
    }
    
    //控制台输出
    第一个任务:ForkJoinPool.commonPool-worker-9
    异常回调任务:ForkJoinPool.commonPool-worker-9
    异常回调任务返回值
    java.util.concurrent.CompletionException: java.lang.RuntimeException
    	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1606)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
    	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
    	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
    	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
    Caused by: java.lang.RuntimeException
    	at com.gem.j2se.thread.future.FutureExceptionTest.lambda$main$0(FutureExceptionTest.java:16)
    	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)
    	... 5 more
    

whenComplete

  1. 某个任务执行完成后,执行回调方法,回调方法无返回值;whenComplete方法返回的CompletableFuture的result是上个任务的结果。

  2. public class FutureWhenCompleteTest {
        public static void main(String[] args) throws Exception {
            CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
                        System.out.println("第一个任务:" + Thread.currentThread().getName());
                        return "第一个任务返回值";
                    }
            );
    
            CompletableFuture<String> rstFuture = orgFuture.whenComplete((a, throwable) -> {
                System.out.println("whenComplete任务:" + Thread.currentThread().getName());
                System.out.println("上个任务返回值:" + a);
                System.out.println("上个任务异常:" + throwable);
            });
    
            System.out.println(rstFuture.get());
        }
    }
    
    //控制台输出
    第一个任务:ForkJoinPool.commonPool-worker-9
    whenComplete任务:main
    上个任务返回值:第一个任务返回值
    上个任务异常:null
    

handle

  1. 某个任务执行完成后,执行回调方法,回调方法有返回值;并且handle方法返回的CompletableFuture的result是回调方法执行的结果。

  2. public class FutureHandlerTest {
        public static void main(String[] args) throws Exception {
            CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(() -> {
                        System.out.println("第一个任务:" + Thread.currentThread().getName());
                        return "第一个任务返回值";
                    }
            );
    
            CompletableFuture<String> rstFuture = orgFuture.handle((a, throwable) -> {
                System.out.println("whenComplete任务:" + Thread.currentThread().getName());
                System.out.println("上个任务返回值:" + a);
                System.out.println("上个任务异常:" + throwable);
                return "回调方法返回值";
            });
    
            System.out.println(rstFuture.get());
        }
    }
    
    //控制台输出
    第一个任务:ForkJoinPool.commonPool-worker-9
    whenComplete任务:main
    上个任务返回值:第一个任务返回值
    上个任务异常:null
    回调方法返回值
    

多个任务条件组合

AND组合关系

thenCombine / thenAcceptBoth / runAfterBoth都表示:将两个CompletableFuture组合起来,只有这两个都正常执行完了,才会执行某个任务。

thenCombine

将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值

thenAcceptBoth

将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值

runAfterBoth

不会把执行结果当做方法入参,且没有返回值。

OR组合的关系

applyToEither / acceptEither / runAfterEither 都表示:将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务。

applyToEither

将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值

acceptEither

将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值

runAfterEither

不会把执行结果当做方法入参,且没有返回值。

allOf

多个任务都执行完成后,才执行allOf返回的CompletableFuture。如果任意一个任务异常,allOf的CompletableFuture,执行get方法,会抛出异常

public class FutureAllOfTest {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务:" + Thread.currentThread().getName());
            return "第一个任务返回值";
        });
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务:" + Thread.currentThread().getName());
            return "第二个任务返回值";
        });
        CompletableFuture<String> c = CompletableFuture.supplyAsync(() -> {
            System.out.println("第三个任务:" + Thread.currentThread().getName());
            return "第三个任务返回值";
        });
        CompletableFuture<Void> future = CompletableFuture.allOf(a, b, c).whenComplete((result, ex) -> {
            System.out.println("回调任务入参:" + result + "," + ex);
        });
    }
}

//控制台输出
第一个任务:ForkJoinPool.commonPool-worker-9
第二个任务:ForkJoinPool.commonPool-worker-2
第三个任务:ForkJoinPool.commonPool-worker-2
回调任务入参:null,null

anyOf

任意一个任务执行完,就执行anyOf返回的CompletableFuture。如果执行的任务异常,anyOf的CompletableFuture,执行get方法,会抛出异常

public class FutureAnyOfTest {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务:" + Thread.currentThread().getName());
            return "第一个任务返回值";
        });
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务:" + Thread.currentThread().getName());
            return "第二个任务返回值";
        });
        CompletableFuture<Object> anyOfFuture = CompletableFuture.anyOf(a, b).whenComplete((result, ex) -> {
            System.out.println("回调任务入参:" + result + "," + ex);
        });
        anyOfFuture.join();
    }
}

//控制台输出
第一个任务:ForkJoinPool.commonPool-worker-9
第二个任务:ForkJoinPool.commonPool-worker-9
回调任务入参:第一个任务返回值,null

thenCompose

thenCompose 可以用于组合多个CompletableFuture的结果,将前一个任务的返回结果作为下一个任务的参数,它们之间存在着业务逻辑上的先后顺序。

thenCompose方法会在某个任务执行完成后,将该任务的执行结果作为方法入参然后执行指定的方法,该方法会返回一个新的CompletableFuture实例。

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

案例

public class FutureThenComposeTest {
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> {
            System.out.println("第一个任务:" + Thread.currentThread().getName());
            return "第一个任务返回值";
        });
        CompletableFuture<String> b = a.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
            System.out.println("第二个任务:" + Thread.currentThread().getName());
            return result+", 第二个任务返回值";
        }));
        CompletableFuture<String> c = b.thenCompose(result -> CompletableFuture.supplyAsync(() -> {
            System.out.println("第三个任务:" + Thread.currentThread().getName());
            return result+", 第三个任务返回值";
        }));
        System.out.println(c.join());
    }
}

//控制台输出
第一个任务:ForkJoinPool.commonPool-worker-9
第二个任务:ForkJoinPool.commonPool-worker-9
第三个任务:ForkJoinPool.commonPool-worker-9
第一个任务返回值, 第二个任务返回值, 第三个任务返回值

使用注意事项

Future需要获取返回值才能获取异常

Future需要获取返回值,才能获取到异常信息。如果不加 get()/join()方法,看不到异常信息。大家使用的时候,最好注意一下。

CompletableFuture的get()是阻塞的

CompletableFuture的get()方法是阻塞的,如果使用它来获取异步调用的返回值,可以考虑添加超时时间

CompletableFuture.get();
//超时时间和时间单位
CompletableFuture.get(5, TimeUnit.SECONDS);

注意默认线程池

CompletableFuture代码中又使用了默认的线程池,处理的线程个数是电脑CPU核数-1。在大量请求过来的时候,处理逻辑复杂的话,响应可能会很慢。建议使用自定义线程池并优化线程池配置参数。

注意自定义线程池的饱和策略

CompletableFuture的get()方法是阻塞的,一般建议使用带超时时间的get方法。并且建议使用自定义线程池。

但是如果线程池拒绝策略是DiscardPolicy或者DiscardOldestPolicy,当线程池饱和时,会直接丢弃任务,不会抛弃异常。因此建议,CompletableFuture线程池策略最好使用AbortPolicy,然后耗时的异步线程,建议做好线程池隔离。

文章参考

https://juejin.cn/post/6970558076642394142

文章来源:https://blog.csdn.net/namelessmyth/article/details/135026527
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。