CycliBarrier原理解释

2023-12-16 00:38:25

CyclicBarrier 是 Java 并发编程中的一个同步辅助工具,它允许一组线程相互等待,达到一个公共屏障点(Barrier Point),然后这些线程才继续执行。CyclicBarrier 并没有直接使用 ReentrantLock,但它确实使用了 AQS(AbstractQueuedSynchronizer)中的一些原理和类似的机制。

以下是 CyclicBarrier 的一些关键实现细节:

  1. 内部锁和条件: CyclicBarrier 内部使用了一个简单的锁(并非 ReentrantLock),以及一个条件变量。这个锁用于控制对屏障状态的访问,条件变量用于挂起线程直到所有线程都到达屏障。

  2. 屏障生成(Generation): CyclicBarrier 使用一个内部类 Generation 来表示屏障的每一代。每当屏障被触发时,就会创建一个新的 Generation 实例。

  3. 等待和通知: 当一个线程到达屏障时,它会调用 await() 方法。这个方法会检查当前屏障代是否已经被打破(broken),然后将等待的线程计数器减一。如果计数器没有减到零,线程就会在条件变量上等待。当最后一个线程到达时,它会触发屏障,并通知所有在条件变量上等待的线程。

  4. 重置和异常处理: 如果在所有线程到达之前,屏障被重置或者线程被中断,CyclicBarrier 会将当前屏障代标记为被打破,并通知所有等待的线程。

在实际的源码中,CyclicBarrier 并没有直接继承 AbstractQueuedSynchronizer,而是使用了一个内部锁和条件变量来实现类似的等待/通知机制。下面是一个简化的源码片段来展示这一点:

public class CyclicBarrier {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();
    private Generation generation = new Generation();

    private int count;
    private final int parties;
    // 可选的屏障操作
    private final Runnable barrierCommand;

    // 内部类表示每一代
    private static class Generation {
        boolean broken = false;
    }

    public CyclicBarrier(int parties, Runnable barrierAction) {
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

    public int await() throws InterruptedException, BrokenBarrierException {
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            // 如果线程中断,则打破屏障
            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

            // 每个线程到达后,计数器减一
            int index = --count;
            if (index == 0) {  // 如果是最后一个到达的线程
                boolean ranAction = false;
                try {
                    final Runnable command = barrierCommand;
                    if (command != null)
                        command.run();
                    ranAction = true;
                    nextGeneration(); // 触发下一代屏障并唤醒所有线程
                    return 0;
                } finally {
                    if (!ranAction)
                        breakBarrier();
                }
            }

            // 不是最后一个线程,挂起
            for (;;) {
                try {
                    trip.await();
                } catch (InterruptedException ie) {
                    if (g == generation && !g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;
            }
        } finally {
            lock.unlock();
        }
    }

    private void breakBarrier() {
        generation.broken = true;
        count = parties;
        trip.signalAll();
    }

    private void nextGeneration() {
        trip.signalAll();
        count = parties;
        generation = new Generation();
    }

    // 其他方法省略
}

在这个片段中,可以看到 CyclicBarrier 使用了 ReentrantLockCondition,但这些并不是直接来源于 AQS,而是利用了类似的同步机制。CyclicBarrier 的核心在于如何管理这些线程的协调和等待/通知,而不是直接扩展 AQS。

trip.await() 被用于挂起线程,直到 trip.signalAll() 被调用,这通常发生在最后一个线程到达屏障时或者屏障被重置或打破时。这种使用 Condition 的方式允许 CyclicBarrier 控制线程在特定条件下的等待和唤醒。

文章来源:https://blog.csdn.net/zhanghuyi2022/article/details/134916397
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。