CompletableFuture原理解析

2023-12-15 19:11:21

一、 Callable、Future介绍

1. 简介

Future 是用于表示异步计算结果的接口。它提供了一种在计算完成时获取结果的机制,以及检查计算是否完成的方法。Callable 是一个代表有返回值任务的接口。与 Runnable 不同,Callable 的 call 方法可以返回一个值,并且可以抛出异常。

import java.util.concurrent.*;

public class FutureExample {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //创建一个单一线程池
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        //将任务提交给线程池
        Future<String> future = executorService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(2000);
                return "Hello, Future!";
            }
        });

        // 在这里可以执行其他操作,不会阻塞
        // 获取未来对象的结果(这是一个阻塞调用,直到结果准备好)
        String result = future.get();
        System.out.println(result);
        // 关闭ExecutorService
        executorService.shutdown();
    }
}

在这里插入图片描述

2. 底层原理

首先我们来看一下Future源码:它是一个接口,定义了泛型V,表示返回值类型

public interface Future<V> {
    //尝试取消此任务的执行。如果任务已完成、已取消或由于某些其他原因无法取消,则此尝试将失败。如果成功,并且在调用 cancel 时此任务尚未启动,则此任务不应运行。
    //如果任务已启动,则 mayInterruptIfRunning 参数确定是否应中断执行此任务的线程以尝试停止任务。此方法返回后,对 isDone 的后续调用将始终返回 true。
    //如果此方法返回 true,则对 isCancelled 的后续调用将始终返回 true。
	boolean cancel(boolean mayInterruptIfRunning);
	//判断任务是否被取消了
	boolean isCancelled();
	//判断任务是否已经完成
	boolean isDone();
	//获取任务执行结果,如果任务还没有执行完成,这个方法会在调用线程处阻塞
	V get() throws InterruptedException, ExecutionException;
	//有时间期限的等待任务结果执行
	V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

然后看一下Callable源码,它是一个函数式接口(可以使用Lamda表达式实现),它只有一个方法call,表示对方法的调用,V表示任务的返回值类型

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Callable 是一种表示带返回值任务的接口。相比之下,Runnable 接口用于表示没有返回值的任务。通过使用 Callable,你可以在任务执行完毕后得到一个结果。Callable 的 call 方法允许抛出异常,这样你可以更好地处理任务执行过程中的异常情况。Future 接口表示一个异步计算的结果。通过 Future,你可以在任务执行过程中执行其他操作,然后在需要结果时检索计算的结果。Future 提供了 cancel 方法,允许取消任务的执行。这在某些情况下是很有用的,比如任务执行时间过长,或者在某些条件下不再需要计算结果。 Future 的 get 方法是一个阻塞调用,它会等待任务执行完毕并返回结果。这允许程序在需要计算结果时等待任务完成。

在这里插入图片描述

二、 FutureTask介绍

1. 简介

FutureTask 是 Java 中 Future 接口的一个具体实现,它同时也是 Runnable 接口的实现。FutureTask 可以用来包装一个 Callable 或 Runnable 对象,使其具备异步获取任务结果的能力。Future实际采用FutureTask实现,该对象相当于是消费者和生产者的桥梁,消费者通过 FutureTask 存储任务的处理结果,更新任务的状态:未开始、正在处理、已完成等。而生产者拿到的 FutureTask 被转型为 Future 接口,可以阻塞式获取任务的处理结果,非阻塞式获取任务处理状态。 FutureTask既可以被当做Runnable来执行,也可以被当做Future来获取Callable的返回结果。

 public static void main(String[] args) throws ClassNotFoundException, ExecutionException, InterruptedException {
        // 创建 Callable 对象
        Callable<Integer> callable = () -> {
            // 模拟一个耗时的任务
            Thread.sleep(2000);
            return 42;
        };
        // 创建 FutureTask 对象,将 Callable 对象传入
        FutureTask<Integer> futureTask = new FutureTask<>(callable);
        // 创建线程池
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // 提交 FutureTask 到线程池
        executor.submit(futureTask);
        // 在主线程中进行其他操作
        try {
            System.out.println("主线程正在执行其他任务...");

            // 获取任务的执行结果,如果任务还未完成,则会阻塞主线程
            Integer result = futureTask.get();

            System.out.println("任务执行结果:" + result);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            // 关闭线程池
            executor.shutdown();
        }

    }

在这里插入图片描述

2. 底层原理

首先FutureTask的类继承体系如下图:
在这里插入图片描述

首先查看RunableFuture接口源码

public interface RunnableFuture<V> extends Runnable, Future<V> {
    //将此 Future 设置为其计算结果,除非它已被取消。
    void run();
}

然后仔细分析一下FutureTask的源码

public class FutureTask<V> implements RunnableFuture<V> {
    //用于定义任务的状态
	private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
	//封装任务
	private Callable<V> callable;
	//任务的返回结果
	private Object outcome;
	//执行任务的线程
	private volatile Thread runner;
	//等待执行任务的线程堆栈
	private volatile WaitNode waiters;
    //返回任务的执行结果
	private V report(int s) throws ExecutionException {
	    //获得任务执行的结果
        Object x = outcome;
        if (s == NORMAL)
        	//如果任务的状态NORMAL,则正常返回结果
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }
    //构造函数
    public FutureTask(Runnable runnable, V result) {
        //Executors.callable 是一个静态工厂方法,用于创建一个 Callable 对象,该对象可以执行给定的 Runnable 并返回指定的结果。
        //在这个特定的情况下,runnable 是传入的 Runnable 对象,而 result 是传入的结果值。
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // 设置任务的状态为New
    }
    //判断任务是否被取消
    public boolean isCancelled() {
        return state >= CANCELLED;
    }
    //判断任务是否完成
    public boolean isDone() {
        return state != NEW;
    }
    //取消任务,做了两件事,分别是设置任务的状态为INTERRUPTING或CANCELLED,然后中断执行该任务的线程
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
                  //如果任务的状态不为NEW,然后通过CAS操作根据mayInterruptIfRunning设置状态设置为INTERRUPTING或CANCELLED是否成功
            return false;
        try {   
            if (mayInterruptIfRunning) {
                try {
                    //获取当前执行该任务的线程
                    Thread t = runner;
                    if (t != null)//然后中断该线程
                        t.interrupt();
                } finally {
                	//设置当前任务的状态为中断
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }
    //获取任务的执行结果
    public V get() throws InterruptedException, ExecutionException {
        //获得当前任务的状态
        int s = state;
        if (s <= COMPLETING)
            //等待任务执行完成
            s = awaitDone(false, 0L);
        //返回任务的执行结果
        return report(s);
    }
    //有等待时间的获得任务结果
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }
    //将此 future 的结果设置为给定值,除非此 future 已被设置或已取消。成功完成计算后,run 方法在内部调用此方法。
	 protected void set(V v) {
	     //尝试将任务状态设置为完成状态
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            //目的是将 FutureTask 对象的 state 字段设置为 NORMAL,表示任务正常完成。
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            //完成任务的最终状态
            finishCompletion();
        }
    }
    //设置任务执行异常的方法
     protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    //该方法是实际执行任务的逻辑
    public void run() {
    //这个条件检查确保只有当任务的状态为 NEW(即尚未执行)且runner字段为 null(表示没有其他线程正在执行这个任务)时,当前线程才能进入执行任务的逻辑。
   //如果条件不满足,说明任务已经被其他线程执行过或者正在被执行,当前线程直接返回,不执行任务。
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            //获取要执行的任务
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //调用Callable的call方法
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    //出现异常处理异常
                    setException(ex);
                }
                if (ran)
                //设置返回结果
                    set(result);
            }
        } finally { 
            //当前执行线程置换为空
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    //等待执行任务线程的节点
    static final class WaitNode {
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode() { thread = Thread.currentThread(); }
    }
    private void finishCompletion() {
        // assert state > COMPLETING;
        //使用 waiters 字段作为等待队列的头节点。这里使用循环,以确保在多线程情况下正确地处理等待队列。
        for (WaitNode q; (q = waiters) != null;) {
            //使用 compareAndSwapObject 操作,将 waiters 字段置为 null。这是一个原子性的操作,确保只有一个线程能够成功将 waiters 字段置为 null,从而获取到等待队列的控制权。
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    //获取线程
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        //唤醒线程
                        LockSupport.unpark(t);
                    }
                   //获取下一个线程
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }
        done();
        //任务执行完毕
        callable = null;        // to reduce footprint
    }
    //当有其他线程调用 FutureTask 的 get() 方法等待任务的结果时,如果任务还未完成,该线程会进入等待状态,加入等待队列。等待队列的维护可以确保等待线程能够在任务完成时被正确地唤醒。如果任务在执行过程中被取消,调用 cancel(true) 方法,这会导致等待线程被唤醒。在 finishCompletion() 中,会遍历等待队列,将其中的线程全部唤醒,以确保在任务被取消时等待线程能够被正确唤醒。
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
        //如果当前线程被中断,从阻塞队列中移除
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    //当前执行线程设置为空
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //阻塞当前线程
                LockSupport.parkNanos(this, nanos);
            }
            else
                //阻塞当前线程
                LockSupport.park(this);
        }
    }
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    if (q.thread != null)
                        pred = q;
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null) // check for race
                            continue retry;
                    }
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                          q, s))
                        continue retry;
                }
                break;
            }
        }
    }
}

上面就是Future的核心源码,下面对一些关键点进行总结:

  • 工作原理图:

在这里插入图片描述

  • FutureTask 内部使用 volatile 的 state 字段来表示任务的状态,有多个状态,如 NEW、COMPLETING、NORMAL、EXCEPTIONAL、CANCELLED、INTERRUPTING、INTERRUPTED。状态转换通过 CAS 操作来保证线程安全。
  • 为了支持多线程环境下等待任务完成的线程,FutureTask 使用了 WaitNode 类来构建等待队列。等待队列的头结点是 waiters 字段,它是一个链表结构,每个结点包含一个等待的线程。
  • 在 run() 方法中,通过 CAS 操作将当前线程设置为执行任务的线程,并在任务执行完成后将 runner 字段置为 null,以确保任务执行的线程在最后正确退出。通过 LockSupport.unpark(t) 唤醒等待的线程。
  • finishCompletion() 方法负责唤醒等待队列中的线程。使用 CAS 操作将等待队列头结点置为 null,然后遍历链表唤醒每个等待的线程。
  • cancel(boolean mayInterruptIfRunning) 方法用于取消任务的执行,通过 CAS 操作将状态置为 CANCELLED 或 INTERRUPTING,并中断执行任务的线程。
  • 当任务执行过程中抛出异常时,通过 setException(Throwable t) 方法设置任务的状态为异常,并保存异常信息。

FutureTask 在 Java 并发编程中扮演了重要的角色,提供了异步执行任务并获取执行结果的机制。其内部涉及状态管理、等待队列、线程的安全退出、等待线程的唤醒、取消任务、异常处理等核心概念,这些概念构成了 FutureTask 的工作原理。在实际应用中,FutureTask 常用于多线程环境下的任务执行和结果获取。

三、CompletionService

1. 简介

当 for 循环批量获取 Future 的结果时容易 block,get 方法调用时应使用 timeout 限制。Future 的生命周期不能后退。一旦完成了任务,它就永久停在了“已完成”的状态,不能从头再来。从本质上说,Future表示一个异步计算的结果。它提供了isDone()来检测计算是否已经完成, 并且在计算结束后,可以通过get()方法来获取计算结果。在异步计算中,Future确实是个非常优秀的接口。但是,它的本身也确实存在着许多限制:

  • 并发执行多任务:Future只提供了get()方法来获取结果,并且是阻塞的。所以,除了等待你别无他法;
  • 无法对多个任务进行链式调用:如果你希望在计算任务完成后执行特定动作,比如发邮件,但Future却没有提供这样的能力;
  • 无法组合多个任务:如果你运行了10个任务,并期望在它们全部执行结束后执行特定动作,那么在Future中这是无能为力的;
  • 没有异常处理:Future接口中没有关于异常处理的方法;

Callable+Future 可以实现多个task并行执行,但是如果遇到前面的task执行较慢时需要阻塞等待前面的task执行完后面task才能取得结果。而CompletionService的主要功能就是一边生成任务,一边获取任务的返回值。让两件事分开执行,任务之间不会互相阻塞,可以实现先执行完的先取结果,不再依赖任务顺序了。

2. 原理

CompletionService 是 Java 中 Executor 框架的一部分,用于处理一组异步任务的结果。它允许你提交一组任务,并按照它们完成的顺序获取结果。CompletionService 接口的实现类通常是 ExecutorCompletionService。
CompletionService内部通过阻塞队列+FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后
顺序排序,内部有一个先进先出的阻塞队列,用于保存已经执行完成的Future,通过调用它的 take方法或poll方法可以获取到一个已经执行完成的Future,进而通过调用Future接口实现类的get方法获取最终的结果。

import java.util.concurrent.*;

public class CompletionServiceExample {

    public static void main(String[] args) {
        // 创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // 创建 CompletionService
        CompletionService<Integer> completionService = new ExecutorCompletionService<>(executorService);

        // 提交一组任务
        for (int i = 0; i < 10; i++) {
            final int taskId = i;
            completionService.submit(() -> {
                // 模拟耗时任务
                Thread.sleep(1000);
                System.out.println("Task " + taskId + " completed");
                return taskId;
            });
        }

        // 按照任务完成的顺序获取结果
        for (int i = 0; i < 10; i++) {
            try {
                Future<Integer> result = completionService.take(); // 阻塞,直到有任务完成
                int taskId = result.get();
                System.out.println("Result for Task " + taskId + ": " + taskId);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }

        // 关闭线程池
        executorService.shutdown();
    }
}

在这里插入图片描述

3. 源码分析

CompletionService源码如下:

public interface CompletionService<V> {
    //提交一个返回结果的任务给执行者,并返回一个表示任务执行结果的Future对象。
    Future<V> submit(Callable<V> task);
    //提交一个可运行的任务给执行者,并返回一个表示任务执行结果的Future对象。这个方法可以指定任务成功执行后返回的结果。
    Future<V> submit(Runnable task, V result);
    //从 CompletionService 中获取并移除下一个已完成的任务的Future对象,如果没有已完成的任务,则会一直等待。返回一个 Future 对象,表示下一个已完成的任务的执行结果。
    Future<V> take() throws InterruptedException;
    //从 CompletionService 中获取并移除下一个已完成的任务的Future 对象,如果没有已完成的任务,则立即返回 null。
    Future<V> poll();
    //从 CompletionService 中获取并移除下一个已完成的任务的 Future 对象,如果没有已完成的任务,则在指定的等待时间内等待任务完成。
    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

让我们再看看它的实现类ExecutorCompletionService

public class ExecutorCompletionService<V> implements CompletionService<V> {
	//Executor 接口的主要作用是解耦任务提交和任务执行的细节,它抽象了任务的执行环境,将任务的提交和执行解耦开来,使得开发者更加专注于任务本身的逻辑。
	//Executor 接口的实现类通常是线程池,它提供了一种池化线程的机制,可以有效地重用线程,减少线程的创建和销毁的开销,同时可以对并发任务进行有效的管理。
    private final Executor executor;
	//AbstractExecutorService 是 ExecutorService 接口的一个抽象实现,它提供了一些默认的实现,简化了实现 ExecutorService 接口的类的任务。
	//ExecutorService 是 Executor 接口的子接口,它进一步扩展了线程池的能力,提供了更多对任务生命周期的控制和对任务执行结果的获取。
    private final AbstractExecutorService aes;
    //队列用于存放任务的执行结果
    private final BlockingQueue<Future<V>> completionQueue;
    //内部类,它继承了FutureTask任务
    private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        //如果任务完成了就将任务加入到执行完的队列中(任务完成时调用)任务执行完成会调用done,FutureTask源码可以看出来
        protected void done() { completionQueue.add(task); }
        //指的是当前惹怒
        private final Future<V> task;
    }
    //用于根据给定的Callable (Runnable, V) 对象创建一个 RunnableFuture<V> 实例。如果aes不为null,则调用 aes.newTaskFor 方法创建任务
    private RunnableFuture<V> newTaskFor(Callable<V> task) {
        if (aes == null)
            return new FutureTask<V>(task);
        else
            return aes.newTaskFor(task);
    }
    //使用Runnable创建一个任务,result用来保存结果
    private RunnableFuture<V> newTaskFor(Runnable task, V result) {
        //如果aes为空,直接创建一个FutureTask
        if (aes == null)
            return new FutureTask<V>(task, result);
        else
        //否则直接使用aes创建一个任务
            return aes.newTaskFor(task, result);
    }
    //构造函数
    public ExecutorCompletionService(Executor executor) {
        //如果executor为空直接抛出异常
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        //executor类型是否是AbstractExecutorService,如果是就直接赋值给this.aes
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        //创建一个任务结果队列,默认是LinkedBlockingQueue
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }
    //构造函数,提供了阻塞队列
    public ExecutorCompletionService(Executor executor,
                                     BlockingQueue<Future<V>> completionQueue) {
        //如果executor和completionQueue为空抛出异常
        if (executor == null || completionQueue == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = completionQueue;
    }
    //提交任务
    public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        //创建一个新任务
        RunnableFuture<V> f = newTaskFor(task);
        //执行任务
        executor.execute(new QueueingFuture(f));
        return f;
    }
    //提交任务
    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        //创建一个新任务
        RunnableFuture<V> f = newTaskFor(task, result);
        //执行任务
        executor.execute(new QueueingFuture(f));
        return f;
    }
    //获取任务
    public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }
    //取出任务
    public Future<V> poll() {
        return completionQueue.poll();
    }
    //有时间等待任务弹出
    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }

}

4. 总结

  • 当需要批量提交异步任务的时候建议你使用CompletionService。CompletionService将线程池Executor和阻塞队列BlockingQueue的功能融合在了一 起,能够让批量异步任务的管理更简单。
  • CompletionService能够让异步任务的执行结果有序化。先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以 快速实现诸如Forking Cluster这样的需求。
  • 线程池隔离,CompletionService支持自己创建线程池,这种隔离性能避免几个特 别耗时的任务拖垮整个应用的风险。

四、CompletableFuture

1. 简介

CompletableFuture 是 Java 8 引入的一个类,位于 java.util.concurrent 包下,用于支持异步编程和构建异步操作的工具类。它的出现主要是为了简化异步编程的复杂性,提供更直观、灵活和强大的方式来处理异步操作,以及更好地支持函数式编程风格。CompletableFuture是Future接口的扩展和增强。CompletableFuture实现了Future接口,并在此基础上进行了丰富地扩展,完美地弥补了Future上述的种种问题。更为重要的是, CompletableFuture实现了对任务的编排能力。借助这项能力,我们可以轻松地组织不同任务的运行顺序、规则以及方式。从某种程度上说,这项能力是它的核心能力。而在以往,虽然通过CountDownLatch等工具类也可以实现任务的编排,但需要复杂的逻辑处理,不仅耗费精力且难以维护。该类有如下特点:

  • 异步操作和组合:CompletableFuture 提供了一系列方法,支持异步操作的执行和组合。这使得你可以轻松地创建异步任务链,通过组合不同的操作,构建复杂的异步流程。
  • 非阻塞的回调风格:通过 thenApply, thenAccept, thenRun 等方法,可以以非阻塞的方式定义回调,以处理异步操作的结果。
  • 异常处理:提供了异常处理的机制,可以通过 exceptionally 或 handle 方法来处理异步任务中的异常。
  • 组合多个 CompletableFuture:通过 thenCombine, thenCompose, allOf, anyOf 等方法,可以将多个 CompletableFuture 组合在一起,实现更复杂的异步操作。
  • 超时处理:支持通过 completeOnTimeout 方法设置超时,当异步操作超时时,可以提供一个默认值
  • 取消操作:提供 cancel 方法,用于取消异步操作。
  • 异步执行器支持:可以指定异步操作使用的执行器(Executor),使得可以在指定的线程池中执行异步任务。

总的来说,CompletableFuture 的目的是为了提供一种更简单、更灵活、更符合函数式编程风格的方式来处理异步编程,解决传统异步编程模型中的一些问题,使得 Java 中的异步操作更加便利和强。

2. 案例

下面例子演示了 CompletableFuture 的链式操作、异常处理以及异步执行的特性。在实际应用中,CompletableFuture 还提供了许多其他方法,如 thenCompose、thenCombine、allOf、anyOf 等,以支持更复杂的异步编程场景。

public static void main(String[] args) {
        // 异步执行任务,并返回 CompletableFuture 对象,异步执行一个任务,该任务返回一个字符串 "Hello, "。
        CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task is running asynchronously");
            // 模拟耗时操作
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "Hello, ";
        });

        // 定义链式操作,对异步任务的结果进行处理。使用 thenApplyAsync 方法定义链式操作,将异步任务的结果与 "world!" 拼接。
        CompletableFuture<String> resultFuture = completableFuture.thenApplyAsync(result -> {
            System.out.println("Continue processing asynchronously");
            return result + "world!";
        });

        // 异常处理,使用 exceptionally 方法对异常进行处理,输出异常信息并返回一个默认值。
        CompletableFuture<String> exceptionHandledFuture = resultFuture.exceptionally(throwable -> {
            System.out.println("Exception occurred: " + throwable.getMessage());
            return "Handled Exception";
        });

        // 阻塞等待异步任务完成,并获取结果。使用 get 方法阻塞等待异步任务的完成,并获取最终的结果。
        try {
            String finalResult = exceptionHandledFuture.get();
            System.out.println(finalResult);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

在这里插入图片描述

3. 源码分析

CompletableFutur的类继承体系如下:
在这里插入图片描述
Future接口我们前面学习过,下面分析一下CompletionStage。CompletionStage 是 Java 8 中引入的接口,位于 java.util.concurrent 包下。它表示异步计算的阶段,提供了一系列用于组合异步操作的方法。CompletionStage 是 Future 的扩展,更加强大和灵活,支持更丰富的异步编程模型。

public interface CompletionStage<T> {
    //该方法接受一个函数 Function<? super T, ? extends U>,表示当前阶段计算完成后,将计算结果应用到该函数上,得到新的计算阶段。参数为一个函数式接口,返回值为CompletionStage
	public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
	//thenApplyAsync 方法表示在当前阶段计算完成后,异步地将计算结果应用到指定的函数上,生成一个新的 CompletionStage<U>,其中包含了函数应用后的结果。
	//这种异步执行的特性可以提高并发性能,避免阻塞当前线程。这对于处理需要长时间计算的任务或依赖外部资源的任务是非常有用的。
	 public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn);
     //允许在异步计算中使用指定的执行器来执行函数。通过指定执行器,可以更加灵活地控制异步计算的线程池,适应不同的并发场景,避免资源争用。这在需要更精细地控制异步操作执行环境的情况下非常有用。
     public <U> CompletionStage<U> thenApplyAsync
        (Function<? super T,? extends U> fn,
         Executor executor);
    //该方法接受一个消费者 Consumer<? super T>,表示当前阶段计算完成后,对计算结果执行消费操作。它返回一个 CompletionStage<Void>,表示没有计算结果的新阶段。
     public CompletionStage<Void> thenAccept(Consumer<? super T> action);
     //thenAcceptAsync 方法表示在当前阶段计算完成后,在指定的异步线程中执行操作,并且该操作不会返回有意义的结果值。这适用于需要执行副作用的异步操作,例如异步地更新状态或进行日志记录。
     public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
     public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,
                                                 Executor executor);
   //该方法表示在当前阶段计算完成后执行指定的 Runnable 操作。操作是在当前线程中同步执行的,不会启动新的线程。返回的 CompletionStage 表示不包含有意义的结果值,仅表示操作完成。
     public CompletionStage<Void> thenRun(Runnable action);
   //该方法表示在当前阶段计算完成后异步执行指定的 Runnable 操作。操作将在默认的异步执行器中执行,通常会使用线程池中的一个线程。返回的 CompletionStage 也表示不包含有意义的结果值,仅表示异步操作完成。
     public CompletionStage<Void> thenRunAsync(Runnable action);
     public CompletionStage<Void> thenRunAsync(Runnable action,
                                              Executor executor);
     
     public <U,V> CompletionStage<V> thenCombine
        (CompletionStage<? extends U> other,
         BiFunction<? super T,? super U,? extends V> fn);
     ......
}

上面源码还是有些晦涩难懂,下面以通俗的语言讲解各个函数的作用:

  • thenApply 方法:

作用: 当前阶段计算完成后,将计算结果应用到指定的函数上,生成一个新的 CompletionStage。 就像是你拿到了一份数据报告,然后你想对这份报告进行一些处理,thenApply 就像是给你一个处理报告的工具,处理完成后生成新的报告。

  • thenAccept 方法:

作用: 当前阶段计算完成后,执行指定的消费操作,不返回结果。就像你拿到了一份数据报告,但你并不需要生成新的报告,只是想看一下或者做一些简单的处理,thenAccept 就像是你在看报告的过程中执行了一些操作。

  • thenRun 方法:

作用: 当前阶段计算完成后,执行指定的操作,不返回结果。就像你拿到了一份数据报告,但不需要对报告进行处理,只是想在处理完成后执行一些额外的操作,比如通知某人,thenRun 就像是在处理报告的过程中执行了一些附加的操作。

  • thenCompose 方法:

作用: 将两个 CompletionStage 进行组合,当前阶段计算完成后,将计算结果传递给指定函数,生成一个新的 CompletionStage。就像你有两份报告,thenCompose 就像是帮你整合这两份报告,整合后生成一份新的报告。

  • exceptionally 方法:

作用: 处理当前阶段计算过程中的异常,返回一个新的 CompletionStage。就像在处理报告的过程中可能会出现一些问题,exceptionally 就像是为你准备了一个备选方案,当处理出错时执行这个备选方案。

  • handle 方法:

作用: 无论计算是否正常完成,都会执行指定函数处理结果,返回一个新的 CompletionStage。就像无论你处理报告的过程中是否遇到问题,handle 都会给你一个机会,在处理完成后执行一些额外的操作。

如果方法名中带有 Async 后缀,通常表示该方法会异步执行操作,即在一个新的线程或线程池中执行相应的计算。这样的方法允许在异步环境中执行操作,提高并发性能,避免阻塞当前线程。例如thenApplyAsync,就像你拿到了一份数据报告,但想在另外一个线程中处理这份报告,thenApplyAsync 就像是给你一个异步处理报告的工具。

分析完CompletionStage源码后,下面就开始分析CompletableFuture的源码:

public class CompletableFuture<T> implements Future<T>, CompletionStage<T> {
    //用于存放任务异步执行结果
	volatile Object result;
	//该字段用于维护异步计算的依赖关系栈      
    volatile Completion stack;   
    //将异步执行结果result指定为值r
    final boolean internalComplete(Object r) { // CAS from null to r
        return UNSAFE.compareAndSwapObject(this, RESULT, null, r);
    } 
    //cas更新依赖关系栈
    final boolean casStack(Completion cmp, Completion val) {
        return UNSAFE.compareAndSwapObject(this, STACK, cmp, val);
    }
    //用于尝试将一个 Completion 对象推入依赖关系栈。如果栈顶元素为h,cas更新h为栈顶
	 final boolean tryPushStack(Completion c) {
        Completion h = stack;
        lazySetNext(c, h);
        return UNSAFE.compareAndSwapObject(this, STACK, h, c);
    }
    //通过无参构造函数创建一个空白的CompletableFuture实例。
     public CompletableFuture() {
    }
    //用于完成计算,将结果设置为指定的值。
    public boolean complete(T value) {
        boolean triggered = completeValue(value);
        postComplete();
        return triggered;
    }
    //用于完成计算,将结果设置为异常。允许将一个异常传递给等待此计算完成的所有线程。
    public boolean completeExceptionally(Throwable ex) {
        if (ex == null) throw new NullPointerException();
        boolean triggered = internalComplete(new AltResult(ex));
        postComplete();
        return triggered;
    }
    //下面两个操作是异步计算操作
    //返回一个新的 CompletableFuture,在异步线程中应用指定的函数。
    public CompletableFuture<T> thenApplyAsync(Function<? super T, ? extends U> fn) { /* ... */ }
    //返回一个新的 CompletableFuture,在异步线程中执行指定的操作。
    public CompletableFuture<Void> thenRunAsync(Runnable action) { /* ... */ }
    //下面两个操作是组合操作
    //返回一个新的 CompletableFuture,在异步线程中将计算结果传递给指定的函数,生成新的 CompletionStage。
    public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) { /* ... */ }
    //返回一个新的 CompletableFuture,用于处理异常情况,并提供替代的计算结果
    public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn) { /* ... */ }
    //组合多个 CompletableFuture
    //返回一个新的 CompletableFuture,它在所有输入的 CompletableFuture 完成后才完成。
    public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) { /* ... */ }
    //返回一个新的 CompletableFuture,它在任何一个输入的 CompletableFuture 完成后就完成
    public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) { /* ... */ }

}

CompletableFuture 的实现基于 java.util.concurrent.ConcurrentLinkedQueue,使用了一些原子操作和 CAS 操作来保证线程安全性。内部维护了一个 volatile 的 result 字段,用于存储计算的结果或异常。使用了一些 synchronized 块和 Lock 接口,以及 Unsafe 类来进行底层的操作。总体来说,CompletableFuture 类提供了强大的异步编程工具,可以方便地处理异步计算、结果组合、异常处理等场景,帮助开发者更灵活地进行并发编程。

文章来源:https://blog.csdn.net/qq_43456605/article/details/135008043
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。