异步回调模式

2023-12-14 03:47:00

异步回调

所谓异步回调,本质上就是多线程中线程的通信,如今很多业务系统中,某个业务或者功能调用多个外部接口,通常这种调用就是异步的调用。如何得到这些异步调用的结果自然也就很重要了。
Callable、Future、FutureTask

public class test implements Callable<Boolean>{
    public static void main(String[] args) {
        test a=new test();
        FutureTask futureTask=new FutureTask<>(a);
        new Thread(futureTask).start();
        Object su=null;
        try {
            su=futureTask.get();
        }catch (Exception e){
            e.printStackTrace();
        }
        System.out.println(su);
    }
    @Override
    public Boolean call() throws Exception {
        return null;
    }
}

FutureTask和Callable都是泛型类,泛型参数表示返回结果的类型。通过FutureTask获取异步线程的执行结果,但是其调用get()方法获取异步结果时,主线程也会被阻塞。属于异步阻塞模式。异步阻塞模式属于主动模式的异步调用,异步回调属于被动模式的异步调用。Java中回调模式的标准实现类为CompletableFuture。由于此类出现时间比较晚,期间Guava和Netty等都提出了自己的异步回调模式API来使用。这里主要介绍CompletableFuture,其他的有时间后面再学习。

CompletableFuture

在这里插入图片描述
CompletableFuture实现Future和CompletionStage两个接口。此类的实例作为一个异步任务,可以在自己异步执行完成之后触发一些其他的异步任务,从而达到异步回调的效果。主要方法如下所示:

runAsync和supplyAsync创建子任务

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) {
        return asyncSupplyStage(screenExecutor(executor), supplier);
}
public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
}
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor) {
        return asyncRunStage(screenExecutor(executor), runnable);
}

可以看出runAsync没有返回值,supplyAsync有返回值,此处用supplyAsync举例:

ExecutorService executor= Executors.newFixedThreadPool(10);
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
        return "你好,周先生";
},executor);
System.out.println(completableFuture.get());//输出你好,周先生
executor.shutdown();

上例中的线程池可以自己构造,如若不指定使用CompletableFuture中默认的线程池ForkJoinPool。
handle()方法统一处理异常和结果

//在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}
//可能不在执行任务的同一个线程中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(asyncPool, fn);
}
//在指定线程池executor中处理异常和结果
public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
    return uniHandleStage(screenExecutor(executor), fn);
}

案例:

CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
     throw new RuntimeException("你好");
});
completableFuture.handle(new BiFunction<String,Throwable,String>(){
     @Override
     public String apply(String s, Throwable throwable) {
          if(throwable==null){
               System.out.println("mei");;
          }else {
               System.out.println("出错了");
          }
          return "ok";
     }
});

异步任务的串行执行

主要方法为以下几种:thenApply()、thenAccept()、thenRun()和 thenCompose()。

thenApply()
此方法实现异步任务的串行化执行,前一个任务结果作为下一个任务的入参。

	后一个任务与前一个任务在同一个线程中执行
	public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }
	//后一个任务与前一个任务不在同一个线程中执行
    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }
	//后一个任务在指定的executor线程池中执行
    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn, Executor executor) {
        return uniApplyStage(screenExecutor(executor), fn);
    }

其中泛型参数T:上一个任务所返回结果的类型。泛型参数U:当前任务的返回类型。
案例:

		ExecutorService executor= Executors.newFixedThreadPool(10);
        CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,周先生";
        },executor).thenApplyAsync(new Function<String,String>() {
            @Override
            public String apply(String s) {
                System.out.println(Thread.currentThread().getId());//13
                return "你好,毛先生";
            }
        });

        System.out.println(completableFuture.get());//输出你好,毛先生
        executor.shutdown();

thenRun()
此方法不关心任务的处理结果。只要前一个任务执行完成,就开始执行后一个串行任务。而且没有返回值。

	//后一个任务与前一个任务在同一个线程中执行
	public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }
	//后一个任务与前一个任务可以不在同一个线程中执行
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }
	//后一个任务在executor线程池中执行
    public CompletableFuture<Void> thenRunAsync(Runnable action,
                                                Executor executor) {
        return uniRunStage(screenExecutor(executor), action);
    }

thenAccept()
使用此方法时一个任务可以接收(或消费)前一个任务的处理结果,但是后一个任务没有结果输出。

	//后一个任务与前一个任务在同一个线程中执行
	public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }
	//后一个任务与前一个任务不在同一个线程中执行
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }
	//后一个任务在指定的executor线程池中执行
    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action, Executor executor) {
        return biAcceptStage(screenExecutor(executor), other, action);
    }

thenCompose()
对两个任务进行串行的调度操作,第一个任务操作完成时,将其结果作为参数传递给第二个任务。

	//后一个任务与前一个任务在同一个线程中执行
	public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }
	//后一个任务与前一个任务不在同一个线程中执行
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }
	//后一个任务在指定的executor线程池中执行
    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn,
        Executor executor) {
        return uniComposeStage(screenExecutor(executor), fn);
    }

thenCompose()方法第二个任务的返回值是一个CompletionStage异步实例。

		ExecutorService executor= Executors.newFixedThreadPool(10);
        CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,周先生";
        },executor).thenComposeAsync(new Function<String,CompletableFuture<String>>(){
            @Override
            public CompletableFuture<String> apply(String s) {
                return CompletableFuture.supplyAsync(()->{
                    System.out.println(Thread.currentThread().getId());//12
                    return "你好,毛先生";
                });
            }
        });
        System.out.println(completableFuture.get());//输出你好,毛先生
        executor.shutdown();

异步任务的合并执行

主要实现为以下几个方法:thenCombine()、runAfterBoth()、
thenAcceptBoth()。

thenCombine()
thenCombine()会在两个CompletionStage任务都执行完成后,一块来处理两个任务的执行结果。如果要合并多个任务,可以使用allOf()。

	//合并第二步任务的CompletionStage实例,返回第三步任务的CompletionStage
	public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }
	//不一定在同一个线程中执行第三步任务的CompletionStage实例
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }
	//第三步任务的CompletionStage实例在指定的executor线程池中执行
    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn, Executor executor) {
        return biApplyStage(screenExecutor(executor), other, fn);
    }

其中泛型参数T:表示第一个任务所返回结果的类型。泛型参数U:表示第二个任务所返回结果的类型。泛型参数V:表示第三个任务所返回结果的类型。
案例:

		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,周先生";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,毛先生";
        });
        CompletableFuture<String> future3 = future1.thenCombine(future2, new BiFunction<String, String, String>(){
            @Override
            public String apply(String s, String s2) {
                return s+"-----"+s2;
            }
        });
        String s = future3.get();
        System.out.println(s);//你好,周先生-----你好,毛先生

而runAfterBoth()方法不关注每一步任务的输入参数和输出参数,thenAcceptBoth()中第三个任务接收第一个和第二个任务的结果,但是不返回结果。

异步任务的选择执行

若异步任务的选择执行不是按照某种条件进行选择的,而按照执行速度进行选择的:前面两并行任务,谁的结果返回速度快,其结果将作为第三步任务的输入。对两个异步任务的选择可以通过CompletionStage接口的applyToEither()、acceptEither()等方法来实现。
applyToEither()

	//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数
	public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }
	//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,不一定在同一个线程中执行fn回调函数
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }
	//和其他任务进行速度比较,最快返回的结果用于执行fn回调函数,在指定线程池执行fn回调函数
    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn,
        Executor executor) {
        return orApplyStage(screenExecutor(executor), other, fn);
    }

案例:

		CompletableFuture<String> future1 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "你好,周先生";
        });
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(()->{
            System.out.println(Thread.currentThread().getId());//12
            return "你好,毛先生";
        });
        CompletableFuture<String> future3 = future1.applyToEither(future2, new Function<String, String>(){
            @Override
            public String apply(String s) {
                return s;
            }
        });
        String s = future3.get();
        System.out.println(s);//你好,毛先生

文章来源:https://blog.csdn.net/weixin_42592415/article/details/134897319
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。