【Java 并发】CyclicBarrier 介绍

2023-12-22 00:08:55

1 简介

在多线程编程中, 协调和同步线程的执行是至关重要的。Java 提供了许多并发工具来帮助开发人员有效地管理多线程应用程序。
其中之一是 CyclicBarrier, 它是一个强大的同步辅助类, 可用于在多个线程之间创建同步点, 以便它们可以在同一时间点协调执行某个任务。

CyclicBarrier 和 CountDownLatch 一样具有等待计数的功能, 但是相比于 CountDownLatch 功能更加强大。

其本身具备了以下特点

  1. 同步多个线程: 允许多个线程在达到共同的同步点之前进行等待, 然后同时开始执行
  2. 可重用性: 一旦所有等待线程达到同步点, CyclicBarrier 会重置计数器, 可以被重复使用。这使得它适用于循环的多阶段任务
  3. 自定义回调动作: 可以在所有线程达到同步点后执行自定义的回调动作, 以便在同步完成后执行一些特定的逻辑
  4. 指定等待时间: 提供了一个带有超时参数的 await() 方法, 允许等待一段指定的时间后, 即使没有足够的线程达到同步点, 也会继续执行
  5. 可检测是否被破坏: 提供的 isBroken() 方法允许检测在等待过程中是否有线程被中断, 以及是否有异常发生
  6. 内存一致性效果: 在所有线程达到同步点时, 对于之前的所有写入操作, 对于当前线程都是可以见的

为了理解 CyclicBarrier, 这里举一个通俗的例子。
开运动会时, 会有跑步这一项运动, 我们来模拟下运动员入场时的情况。
假设有 6 条跑道, 在比赛开始时, 就需要 6 个运动员在比赛开始的时候都站在起点了, 裁判员吹哨后才能开始跑步。跑道起点就相当于 “barrier”, 是临界点,
而这 6 个运动员就类比成线程的话, 就是这 6 个线程都必须到达指定点了, 意味着凑齐了一波, 然后才能继续执行, 否则每个线程都得阻塞等待, 直至凑齐一
波即可。cyclic 是循环的意思, 也就是说 CyclicBarrier 当多个线程凑齐了一波之后, 仍然有效, 可以继续凑齐下一波。CyclicBarrier的执行。

示意图如下:
Alt 'CyclicBarrier 效果图'

当多个线程都达到了指定点后, 才能继续往下继续执行。这就有点像报数的感觉, 假设 6 个线程就相当于 6 个运动员, 到赛道起点时会报数进行统计, 如果刚好是 6 的话, 这一波就凑齐了, 才能往下执行。
CyclicBarrier 在使用一次后, 可以进行重置, 继续当做计数器使用, 这是与 CountDownLatch 的区别之一

2 CyclicBarrier 的方法

上面说到的 6 个运动员, 6 个线程, 指的就是计数器的初始值 6, 可以通过 CyclicBarrier 的构造方法传入的

public class CyclicBarrier {

    public CyclicBarrier(int parties) {
        this(parties, null);
    }

    // barrierAction 指定了回调函数, 在维护的个数达到了 0 了, 就会执行这个函数
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0)
            throw new IllegalArgumentException();

        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }
}

CyclicBarrier 的主要方法

  1. await() throws InterruptedException, BrokenBarrierException : 等到所有的线程都到达指定的临界点
  2. await(long timeout, TimeUnit unit) : 与上面的 await 方法功能一致, 只不过这里有了时间限制, 调用该方法的线程等到指定的 timeout 时间后, 不管 N 是否减至为 0, 都会继续往下执行
  3. getNumberWaiting() : 获取当前距离线程执行还差多少值
  4. reset(): CyclicBarrier 状态重置, 重新开始
  5. isBroken(): 判断当前的 CyclicBarrier 是否为一个打破状态, 可以理解为当前的栅栏被破坏了, 无法继续使用了, 线程这时在调用 await 方法会抛出异常

CyclicBarrier 进入打破状态的场景

  1. 线程被设置了中断标识为 true 的情况下, 调用了 await 方法
  2. 调用了 await 带超时时间的方法, 超时了也会进入 break 状态
  3. 线程调用 await, 这时需要等待的线程数为 0 了, 执行回调函数, 这个过程失败了, 也会进入 break 状态
  4. 调用 reset 方法, 也会先设置为 break 状态, 在重试设置为 false

下面用一个具体的例子来说明 CyclicBarrier 的具体用法:

public class CyclicBarrierDemo {
 
    //指定必须有 6 个运动员到达才行, 同时搭配一个线程数达到了, 就执行的回调的函数
    private static CyclicBarrier barrier = new CyclicBarrier(6, () -> {
        System.out.println("所有运动员入场, 裁判员一声令下!!!!!");
    });


    public static void main(String[] args) {

        System.out.println("运动员准备进场, 全场欢呼............");

        ExecutorService service = Executors.newFixedThreadPool(6);

        for (int i = 0; i < 6; i++) {
            service.execute(() -> {

                try {

                    System.out.println(Thread.currentThread().getName() + " 运动员, 进场");
                    // 线程挂起, 得到计数器达到了 0 
                    barrier.await();

                    // 线程又被唤醒, 继续执行
                    System.out.println(Thread.currentThread().getName() + "  运动员出发");

                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

输出结果:

运动员准备进场, 全场欢呼............
pool-1-thread-2 运动员, 进场
pool-1-thread-1 运动员, 进场
pool-1-thread-3 运动员, 进场
pool-1-thread-4 运动员, 进场
pool-1-thread-5 运动员, 进场
pool-1-thread-6 运动员, 进场
所有运动员入场, 裁判员一声令下!!!!!
pool-1-thread-6  运动员出发
pool-1-thread-1  运动员出发
pool-1-thread-5  运动员出发
pool-1-thread-4  运动员出发
pool-1-thread-3  运动员出发
pool-1-thread-2  运动员出发

3 CyclicBarrier 的源码实现

CyclicBarrier 内部是通过 ReentrantLock 和 Condition 实现的
而 ReentrantLock 和 Condition 是基于 AQS 实现的, 所以还有 AQS 的基础, 就能很容易理解。

3.1 CyclicBarrier 中的 Generation

CyclicBarrier 内部定义了一个很简单的内部类 Generation

private static class Generation {
    
    Generation() {} 
    
    // 默认值为 false, 表示当前的栅栏是否为破坏状态, 也就是当前的 CyclicBarrier 已经遭到了破坏, 不适用了
    boolean broken;
}

3.2 CyclicBarrier 的 await 方法

CyclicBarrier 的 await 方法可以让线程进行等待所有线程的到齐, 一起执行, 没到齐的情况进行挂起的操作。

public class CyclicBarrier {

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            // 参数 1: 是否会超时
            // 参数 2: 0L 超时时间 0 纳秒, 即不超时
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe);
        }
    }

    private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException {

        final ReentrantLock lock = this.lock;
        // 获取可重入锁, 加锁, 加锁失败会被阻塞
        lock.lock();

        try {
        
            final Generation g = generation;

            // 当前的栅栏状态已经为破坏状态, 直接抛出异常
            if (g.broken)
                throw new BrokenBarrierException();

            // 线程是中断状态
            if (Thread.interrupted()) {
                // 直接将栅栏设置为破坏状态, 通过 condition 将所有在等待的队列的线程移到同步队列, 等待获取锁唤醒
                breakBarrier();
                throw new InterruptedException();
            }    

            // 还需要等待的线程数 - 1
            int index = --count;

            // 还需要等待的线程数为 0 了
            if (index == 0) {
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    // 有设置了回调函数, 执行回调函数
                    if (command != null)
                        command.run();

                    ranAction = true;
                    nextGeneration();
                    return 0;

                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            for (;;) {
                try {

                    // 把当前的线程放到等待队列, 等待唤醒
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);    

                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }   

                // 线程唤醒了, 继续执行其他的逻辑
                if (g.broken)
                    throw new BrokenBarrierException();     

                // 唤醒后, 保留的 Generation 引用和最新的 Generation 不一样了, 直接返回当前的 index, 既还需要等待多少个线程
                if (g != generation)
                    return index; 

                // 设置了超时, 同时超时时间小于等于 0 了
                if (timed && nanos <= 0L) {
                    // 直接将栅栏设置为爆破状态, 通过 condition 将所有在等待的队列的线程移到同步队列, 等待获取锁唤醒
                    breakBarrier();
                    throw new TimeoutException();
                }       
            }
        } finally {
            lock.unlock();
        }

    }

    private void breakBarrier() {
        // 设置 Generation 的 broken 为 true,
        generation.broken = true;
        count = parties;
        // 将在等待队列中的线程移到同步队列
        trip.signalAll();
    }

    private void nextGeneration() {
        // 将在等待队列中的线程移到同步队列
        trip.signalAll();
        // 更新新的计算值
        count = parties;
        // 重试设置一个新的  Generation, broken 默认为 false
        generation = new Generation();
    }

}

3.3 CyclicBarrier 的 isBroken 方法

public class CyclicBarrier {

    public boolean isBroken() {

        // 加锁
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 返回当前的 Generation 的 broken 状态
            return generation.broken;
        } finally {
            lock.unlock();
        }
    }
}

3.3 CyclicBarrier 的 reset 方法

public class CyclicBarrier {

    public void reset() {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            // 设置为 break 状态, 更新新的等待线程个数
            breakBarrier();
            // 设置一个新的 Generation
            nextGeneration();
        } finally {
            lock.unlock();
        }
    }
}

4 CountDownLatch 与 CyclicBarrier 的比较

CountDownLatch 与 CyclicBarrier 都是用于控制并发的工具类, 都可以理解成维护的就是一个计数器, 但是这两者还是各有不同侧重点的:

  1. CountDownLatch 一般用于某个线程 A 等待若干个其他线程执行完任务之后, 它才执行; 而 CyclicBarrier 一般用于一组线程互相等待至某个状态,
    然后这一组线程再同时执行; CountDownLatch 强调一个线程等多个线程完成某件事情。CyclicBarrier 是多个线程互等, 等大家都完成, 再携手共进。
  2. 调用 CountDownLatch 的 countDown 方法后, 当前线程并不会阻塞, 会继续往下执行; 而调用 CyclicBarrier 的 await 方法, 会阻塞当前
    线程, 直到 CyclicBarrier 指定的线程全部都到达了指定点的时候, 才能继续往下执行
  3. CountDownLatch 方法比较少, 操作比较简单, 而 CyclicBarrier 提供的方法更多, 比如能够通过 getNumberWaiting(), isBroken() 这些
    方法获取当前多个线程的状态, 并且 CyclicBarrier 的构造方法可以传入 barrierAction, 指定当所有线程都到达时执行的业务功能
  4. CountDownLatch 是不能复用的, 而 CyclicLatch 是可以复用的, 可以通过 reset 方法进行重置

5 参考

大白话说java并发工具类-CountDownLatch, CyclicBarrier

文章来源:https://blog.csdn.net/LCN29/article/details/135141690
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。