Java并发编程深度解析:掌握CompletableFuture的高效异步模式

2023-12-31 15:41:11

引言

在现代Java应用开发中,处理异步操作和并发任务是一个常见且关键的挑战。自Java 8引入CompletableFuture以来,它成为了处理异步编程的强大工具,每个Java开发者都应该理解并会使用,如果没用过或是不会用,那就太不像话了😂😂😂,记得点赞,收藏,1小时即可学会,应用在工作中的任意场景!本文将深入探CompletableFuture的功能和使用方法,以及如何利用它来简化复杂的并发任务

目录

  • CompletableFuture简介
  • 高级功能和方法使用介
  • 异步操作的原理介绍
  • 结合流(Stream)API使用
  • 实际应用场景
  • 性能考虑和最佳实践
  • 写在最后

1.CompletableFuture简介

CompletableFuture是Java 8中引入的一个类,实现了Future接口并提供了更多的灵活性和控制能力。与传统的Future相比,CompletableFuture支持更复杂的操作组合,异常处理,以及函数式编程风格的方法链

2.高级功能和方法使用介绍

CompletableFuture在Java中提供了一种强大的机制来处理异步编程。它不仅使代码更加简洁,还提高了程序的可读性和可维护性。以下是CompletableFuture的一些高级功能和使用方法,涵盖了创建、操作、组合异步任务,以及异常处理等方面

①创建CompletableFuture

CompletableFuture可以通过多种方式创建。最常见的方法是使用supplyAsync,该方法接收一个Supplier接口,并返回一个CompletableFuture对象,该对象将异步执行提供的任务

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟耗时的异步任务
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "异步任务的结果";
});
②异步任务的完成和结果获取

完成异步任务后,需要获取结果时。CompletableFuture提供了get方法来阻塞当前线程,直到结果可用,注意:该方法是同步阻塞的

try {
    String result = future.get(); // 阻塞直到结果可用
    System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
③异步操作的链接

CompletableFuture支持多种方式来链接异步操作。例如,thenApply方法允许你对异步操作的结果进行处理,并返回一个新的CompletableFuture

CompletableFuture<String> greetingFuture = future.thenApply(
				name -> "Hello, " + name);

④组合多个异步操作

可以组合多个异步操作,无论异步任务之间是否相互依赖。thenCombine和thenCompose是实现组合常用的两种方法

CompletableFuture<String> anotherFuture = CompletableFuture.supplyAsync(
				() -> "异步任务2");
CompletableFuture<String> combinedFuture = future.thenCombine(
		anotherFuture, (result1, result2) -> result1 + " 和 " + result2);
⑤异常处理

CompletableFuture提供了exceptionally方法来处理异常情况

CompletableFuture<String> exceptionHandlingFuture = future.exceptionally(
ex -> "发生错误: " + ex.getMessage());
⑥运行后操作

除了处理返回值,还可以在异步操作完成后执行一些操作,如thenAccept和thenRun

future.thenAccept(result -> System.out.println("结果: " + result));
future.thenRun(() -> System.out.println("异步任务完成"));
⑦allOf等待多个CompletableFuture

allOf方法用于等待一组CompletableFuture任务全部完成,该方法同时会返回一个新的CompletableFuture,该CompletableFuture在所有给定的CompletableFuture对象完成后完成

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(
					() -> "任务1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(
					() -> "任务2");
CompletableFuture<Void> allFutures = CompletableFuture.allOf(future1, future2);

// 等待所有Future完成
allFutures.join();
⑧anyOf在多个CompletableFuture中等待任意一个完成

allOf相对的是anyOf方法,它在给定的多个CompletableFuture中任意一个完成时就完成。非常适用于执行多个相似任务并只需要其中一个结果的场景

CompletableFuture<String> future3 = CompletableFuture.supplyAsync(
					() -> "任务3");
CompletableFuture<String> future4 = CompletableFuture.supplyAsync(
					() -> "任务4");
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(future3, future4);

// 获取首个完成任务的结果
String result = (String) anyFuture.join();
⑨超时处理

在CompletableFuture中处理超时是一个非常基本的需求。在Java 9之后引入了orTimeout和completeOnTimeout方法来简化超时处理,很多公司可能还在Java8(有点遗憾)

CompletableFuture<String> timeoutFuture = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(2); // 模拟长时间任务
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "完成任务";
}).orTimeout(1, TimeUnit.SECONDS); // 设置1秒超时

try {
    timeoutFuture.get();
} catch (TimeoutException e) {
    System.out.println("任务超时");
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}

3.异步操作的原理介绍

CompletableFuture在Java中提供了一种强大的异步编程机制。要理解其工作原理,需要深入了解它是如何在Java的并发框架中实现异步操作的

①异步执行机制

CompletableFuture的异步操作依赖于Java的Executor框架。创建一个CompletableFuture时,可以指定一个Executor来执行异步任务。如果没有指定,将使用默认的ForkJoinPool,这是Java 7中引入的一个轻量级并行框架

Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 异步任务代码
}, executor);

在这个示例中,异步任务将在指定的线程池中执行,而不是在默认的ForkJoinPool中

②异步任务的调度

CompletableFuture的异步任务调度是基于事件驱动的。当一个异步任务完成时,它会触发事件,这些事件可以用来触发更多的异步操作,如thenApply, thenAccept等方法定义的操作,其调度的流程图如下:
CompletableFuture异步任务调度流程图
具体的步骤详解:

  • 任务提交:当一个CompletableFuture任务被提交到Executor时,开始异步执行
  • 任务执行:任务在Executor提供的线程中异步执行。执行过程是非阻塞的,并且允许其他任务并行执行
  • 完成事件:任务无论是正常完成还是异常终止,都会触发一个“完成事件”
  • 回调触发:根据CompletableFuture的链式调用,这个完成事件会触发后续的回调操作,比如thenApply, thenAcceptexceptionally
  • 结果传递:每个回调函数都可以接收到前一个任务的结果,并根据这个结果执行操作,然后将新的结果传递到链中的下一个回调
  • 链式处理:这个过程在CompletableFuture链中持续进行,直到所有相关的异步操作和回调都被执行
③完成机制和回调

CompletableFuture提供了一系列方法来显式地完成一个Future或者在Future完成时触发回调。例如,complete方法可以用来手动设置Future的结果,而thenApply等方法则在Future完成时应用一个函数:

CompletableFuture<String> future = new CompletableFuture<>();
future.thenApply(result -> result.toUpperCase()) // 在Future完成时应用函数
      .thenAccept(result -> System.out.println(result)); // 最终处理结果

future.complete("async result"); // 手动完成Future
④异常处理

在异步编程中,异常处理非常重要。CompletableFuture通过exceptionally方法提供了一种优雅的方式来处理异步操作中的异常。这样就可以在链式调用中直接处理异常,而不需要在获取Future结果的时候

4.结合流(Stream)API使用

CompletableFuture与Java 8引入的流(Stream)API结合使用,可以提供强大的数据处理能力,特别是在处理异步操作和集合时。以下是一些如何将CompletableFuture与流API结合使用的示例

①异步操作的流处理

可以使用流API来处理CompletableFuture的集合。例如,List<CompletableFuture<String>>,这样一个list,希望等待所有Future完成,然后处理所有结果

List<CompletableFuture<String>> futuresList = Arrays.asList(
    CompletableFuture.supplyAsync(() -> "Task 1"),
    CompletableFuture.supplyAsync(() -> "Task 2"),
    CompletableFuture.supplyAsync(() -> "Task 3")
);

CompletableFuture<Void> allFutures = CompletableFuture.allOf(
    futuresList.toArray(new CompletableFuture[0])
);

CompletableFuture<List<String>> allFuturesResult = allFutures.thenApply(v ->
    futuresList.stream()
        .map(CompletableFuture::join)
        .collect(Collectors.toList())
);

这个例子中,allOf用于等待所有Future完成,然后使用流API来收集结果

②流中的异步操作

还可以在流的处理过程中使用CompletableFuture来执行异步操作。例如,可以将一个对象的流转换为异步操作的流

Copy code
List<String> messages = Arrays.asList("Hello", "World", "Async", "Stream");
List<CompletableFuture<String>> futures = messages.stream()
    .map(msg -> CompletableFuture.supplyAsync(() -> msg.toUpperCase()))
    .collect(Collectors.toList());

这个例子中,每个字符串都通过一个异步操作转换为大写

③流操作的异步组合

使用CompletableFuture的thenCompose方法,可以在流中异步地组合多个操作

List<String> urls = Arrays.asList("http://example.com", "http://example.org", "http://example.net");
CompletableFuture<List<String>> contentsFuture = urls.stream()
    .map(url -> CompletableFuture.supplyAsync(() -> downloadContent(url)))
    .reduce(CompletableFuture.completedFuture(new ArrayList<>()),
        (acc, future) -> acc.thenCompose(list -> future.thenApply(
        content -> {
            list.add(content);
            return list;
        }))
    );

将CompletableFuture与流API结合使用,可以在处理集合和异步操作时提供更高的效率。这种结合使得编写并行和异步代码更加简洁和直观,特别是在需要处理大量数据的时候

5.实际应用场景

CompletableFuture作为Java中的一个强大工具,可以在多种实际应用场景中发挥重要作用。以下是一些典型的使用场景,CompletableFuture如何解决实际工作当中的问题

①异步API调用

在现代微服务架构中,异步API调用变得越来越普遍。CompletableFuture可以用于优雅地处理这些异步请求

public CompletableFuture<User> getUserAsync(String userId) {
    return CompletableFuture.supplyAsync(() -> userService.getUserById(userId));
}

public CompletableFuture<Order> getOrderAsync(String orderId) {
    return CompletableFuture.supplyAsync(() -> orderService.getOrderById(orderId));
}

// 使用
CompletableFuture<User> userFuture = getUserAsync("user123");
CompletableFuture<Order> orderFuture = getOrderAsync("order456");

CompletableFuture.allOf(userFuture, orderFuture).thenRun(() -> {
    User user = userFuture.join();
    Order order = orderFuture.join();
    // 处理用户和订单信息
});

在这个例子中,并行地调用两个服务,并在两个调用都完成后处理结果

②数据库读写的并行化

在处理数据库操作时,CompletableFuture可以用来并行化查询,从而提高性能

public CompletableFuture<List<Product>> getProductsAsync() {
    return CompletableFuture.supplyAsync(() -> productRepository.findAll());
}

public CompletableFuture<List<Customer>> getCustomersAsync() {
    return CompletableFuture.supplyAsync(() -> customerRepository.findAll());
}

// 使用
CompletableFuture<List<Product>> productsFuture = getProductsAsync();
CompletableFuture<List<Customer>> customersFuture = getCustomersAsync();

productsFuture.thenCombine(customersFuture, (products, customers) -> {
    // 合并产品和客户数据
    return combineData(products, customers);
}).thenAccept(combinedData -> {
    // 处理合并后的数据
});

这个例子展示了如何并行执行两个数据库查询,并在两者都完成后处理数据

③超时降级处理

在执行长时间运行的操作时,超时控制是一个常见的工作情形,比如对方依赖三方服务,服务返回时长不稳定,通常需要超时降级处理。CompletableFuture提供了简单的超时处理机制,对于这种降级处理非常方便(java9)

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // 模拟长时间运行的任务
    try {
        TimeUnit.SECONDS.sleep(5);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "结果";
}).orTimeout(3, TimeUnit.SECONDS);

future.exceptionally(ex -> {
    // 处理超时,降级
    return "降级结果";
});

在这个例子中,如果任务在指定时间内未完成,将使用默认结果

④异步任务流水线

在一个复杂的业务流程中,我们可能需要将多个异步任务按特定顺序执行,形成一个异步任务流水线

public CompletableFuture<String> processOrder(String orderId) {
    return CompletableFuture.supplyAsync(() -> orderService.validateOrder(orderId))
        .thenCompose(validatedOrder -> CompletableFuture.supplyAsync(() -> paymentService.processPayment(validatedOrder)))
        .thenCompose(paymentConfirmation -> CompletableFuture.supplyAsync(() -> shippingService.scheduleDelivery(paymentConfirmation)));
}
// 使用
processOrder("12345").thenAccept(deliveryDetails -> {
    // 处理配送详情
});

在这个示例中,订单首先经过验证,然后处理支付,最后安排配送。每个步骤都是异步执行的,且后一个步骤依赖前一个步骤的结果

6.性能考虑和最佳实践

虽然CompletableFuture提供了强大的异步编程能力,但在使用时需要考虑性能影响,并遵循一些最佳实践以确保高效和可维护的代码。以下是一些关键的性能考虑和最佳实践

①避免过多的线程创建

CompletableFuture通常依赖于线程池来执行异步任务。过多的线程创建和销毁会增加开销,降低性能

  • 最佳实践:尽可能根据应用需求创建合适大小的自定义线程池,而不要使用默认的线程池,最好线程池也可以支持动态化及监控
②合理使用异步方法

CompletableFuture提供了许多异步方法,如thenApplyAsync。不过,不必要的异步化可能会导致性能下降和资源浪费

  • 最佳实践:仅在确实需要异步执行时使用异步方法。如果后续操作不是计算密集型或不依赖于I/O操作,考虑使用同步即。不要为了异步而异步,因为技术最终的目的都是为了提高性能,解决问题,而不是炫技,引入复杂性,这是程序员必须具备的思维意识
③注意异常处理

异常处理是异步编程中的一个重要方面。不恰当的异常处理可能会导致性能问题,例如,过多的异常创建和捕获

  • 最佳实践:使用exceptionallyhandle方法优雅地处理异常。避免在CompletableFuture的每个阶段都进行异常检查,不需要在外层再捕获异常
④避免阻塞调用

CompletableFuture的回调中使用阻塞调用(如Future.get())会降低性能,并可能导致死锁,因为Future.get()方法是同步阻塞的

  • 最佳实践:避免在CompletableFuture的回调链中使用阻塞调用。可以考虑使用thenCompose来处理依赖于其他CompletableFuture结果
⑤资源和任务管理

合理管理资源和任务对于保持CompletableFuture的性能至关重要,避免应异步任务时间过长导致系统因创建过多的线程而OOM,博主曾经都替组内小伙伴定位、解决个这个线上问题,当时是因为下游系统故障

  • 最佳实践:确保异步任务在完成时释放所有占用的资源。对于长时间运行的任务,必须考虑实现超时逻辑,比如结合RPC接口的超时时长,结合下游TP99的耗时,最大耗时等等

CompletableFuture是Java并发编程的强大工具,但需要谨慎使用。遵循这些最佳实践可以帮助你避免常见的陷阱,确保你的异步代码既高效又可维护

写在最后

CompletableFuture是Java并发编程工具箱中的一个强大工具。它不仅提供了处理异步任务的灵活性,还引入了许多功能,使得并发编程更加简洁和高效。通过本文的介绍,希望你能更好地理解和利用CompletableFuture来处理你的并发编程需求

如果你觉得这篇文章有用,请记得点赞和关注。我是程序员三毛,将继续分享更多编程知识和经验。感谢你的阅读和支持!

  • 程序员三毛

文章来源:https://blog.csdn.net/weixin_38522648/article/details/135208797
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。