【Java 并发】Exchanger

2023-12-25 05:57:12

1 简介

Exchanger 是一个用于线程间协作的工具类, 用于两个线程间能够交换。它提供了一个交换的同步点, 在这个同步点两个线程能够交换数据。
具体交换数据是通过 exchange 方法来实现的, 如果一个线程先执行 exchange 方法, 那么它会阻塞等待另一个线程也执行 exchange 方法, 这个时候两个线程就都达到了同步点, 两个线程就可以交换数据。

2 Exchanger 的方法

Exchanger 除了一个无参的构造方法外, 主要方法也很简单

  1. V exchange(V x) throws InterruptedException: 当一个线程执行该方法的时候, 会等待另一个线程也执行该方法, 因此两个线程就都达到了同步点, 将数据交换给另一个线程, 同时返回获取的数据
  2. V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException: 同上一个方法功能基本一样, 只不过这个方法同步等待的时候, 增加了超时时间。

Exchanger 理解起来很容易, 这里用一个简单的例子来看下它的具体使用。
我们来模拟这样一个情景, 在青春洋溢的中学时代, 下课期间, 男生经常会给走廊里为自己喜欢的女孩子送情书, 相信大家都做过这样的事情吧。
男孩会先到女孩教室门口, 然后等女孩出来, 教室那里就是一个同步点, 然后彼此交换信物, 也就是彼此交换了数据。现在, 就来模拟这个情景。


public class ExchangerDemo {

    private static Exchanger<String> exchanger = new Exchanger();

	public static void main(String[] args) {

	    //代表男生和女生
	    ExecutorService service = Executors.newFixedThreadPool(2);

	    service.execute(() -> {
	        try {
	            //男生对女生说的话
                // 这时候线程会被阻塞, 直到得到返回结果
	            String girl = exchanger.exchange("我其实暗恋你很久了......");
	            System.out.println("女孩儿说:" + girl);
	        } catch (InterruptedException e) {
	            e.printStackTrace();
	        }
	    });
	    service.execute(() -> {
	        try {
	            System.out.println("女生慢慢的从教室里走出来......");
	            TimeUnit.SECONDS.sleep(3);
	            //男生对女生说的话
	            String boy = exchanger.exchange("我也很喜欢你......");
	            System.out.println("男孩儿说:" + boy);
	        } catch (InterruptedException e) {
	            e.printStackTrace();
	        }
	    });
	}
}

输出结果:

女生慢慢的从教室里走出来......
男孩儿说:我其实暗恋你很久了......
女孩儿说:我也很喜欢你......

3 Exchanger 的源码实现

在 Exchanger 的源码实现中的实现, 会根据当前的并发情况, 分成 2 种实现单槽位多槽位

  1. 单槽位主要用于同一个时刻只有两个线程交换数据, 这样在竞争比较激烈的时候, 会影响到性能
  2. 多槽位主要用于同一个时刻有多个线程同时进行两个一组的数据交换, 彼此之间不受影响, 这样可以提高吞吐量

在 Exchanger 的源码中有 2 个属性

private volatile Node[] arena;

private volatile Node slot;

这 2 个属性和单槽和多槽实现相关的属性

  1. 单槽的时候, 线程 A 打算交换内容, 会先通过 slot 判断是否已经有其他的线程在等待了, 有和这个 slot 的节点交换数据, 没有就把自己放到这个 slot 节点
  2. 如果在上面的第 1 步的单槽位操作中失败了, 就会进入到多槽操作, 从 arena 寻找可以交互的节点, 找不到就将自己存入到这个节点数组。

概况起来的话, 就如上面的情况, 具体的场景分析, 看后面的源码。

3.1 Exchanger 的内部类


public class Exchanger<V> {

    @sun.misc.Contended 
    static final class Node {

        // 多槽操作用到, 当前节点在 Exchanger 的多槽节点数组 Node[] arena 的数组位置, 默认为 0
        int index;

        // 多槽操作用到, 记录 上一次 Exchanger.bound 的值
        int bound;

        // 在同一个 bound 下 cas 失败的次数
        int collides;

        // 在当前线程等待另一个线程进行内容交换的过程中, 会通过 Exchanger 的一个常量值 SPINS 和当前线程的线程 id 等经过几次位运算得到一个 hash 值
        // 在线程交换内容后, 会将计算后的 hash 值存到这里
        int hash;

        // 当前线程调用 exchagne 方法打算传递给其他线程的内容
        Object item;

        // 从其他线程获取到的传递内容
        volatile Object match;

        // 当前节点中挂起的线程
        volatile Thread parked;
    }

    static final class Participant extends ThreadLocal<Node> {
        
        // 每个线程 ThreadLocalMap 中存储的内容
        public Node initialValue() { 
            return new Node(); 
        }
    }
}

3.3 Exchagner 的属性

public class Exchanger<V> {

    // 槽位数组中任意两个使用槽之间的索引距离, 将它们隔开以避免伪共享
    // 将设某个节点的在数组的 index 号位, 那么和他进行内容交换的节点不是直接选择了 index + 1, 需要隔一段距离, 这样可以避免伪共享的影响
    // 那么隔的这段距离应该多大, 就是通过节点的 index 位置 + 这个属性搭配一起计算的, 计算方式在多槽操作中
    private static final int ASHIFT = 5;

    // 等于 255, 二进制表示就是 00000000 00000000 00000000 11111111
    private static final int MMASK = 0xff;

    // 等于 256, 二进制表示就是 00000000 00000000 00000001 00000000
    private static final int SEQ = MMASK + 1;

    // 当前机器中, Java 虚拟机的可用的处理器数量, 简单理解为可用核心数, 我的机器的值为 16 
    private static final int NCPU = Runtime.getRuntime().availableProcessors();

    // NCPU >=  510 (MMASK << 1, 也就是乘以 2, 值为 510) ? 255 : NCPU/2;
    // FULL 的最大值是 255
    // 决定了 arean 的容量, arena 为一个 容量为 (Full + 2) * 32  的 Node 数组
    static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1;

    // 等于 1024, 默认自旋的最大次数
    private static final int SPINS = 1 << 10;

    // 线程 A 调用 exchange 方法, 传递给其他线程的内容为 null 时, 会修改为一个默认值 NULL_ITEM
    private static final Object NULL_ITEM = new Object();

    // 线程 A 调用带超时时间的 exchange 方法, 超时了还是没有得到其他线程给的值, 这时候默认会返回一个默认值 TIMED_OUT
    private static final Object TIMED_OUT = new Object();

    // 就是自定义的 ThreadLocal, 在构造方法中声明
    private final Participant participant;

    // 存储多槽节点的数组
    private volatile Node[] arena;

    // 单槽操作的节点
    private volatile Node slot;

    // 最大有效的竞技场位置的索引
    // 还有一个作用: 初始值为 0, 在单槽操作变为多槽操作中, 通过判断是否为 0, 来决定是否初始 arena 数组, 从而确保 arena 数组只会别初始一次
    private volatile int bound;
}

引用 大飞 中对伪共享讲解:
假设一个类的两个相互独立的属性 a 和 b 在内存地址上是连续的 (比如 FIFO 队列的头尾指针), 那么它们通常会被加载到相同的 cpu cache line 里面。
并发情况下,如果一个线程修改了 a, 会导致整个 cache line 失效(包括 b ), 这时另一个线程来读 b, 就需要从内存里再次加载了。
这种多线程频繁修改 ab 的情况下, 虽然 a 和 b 看似独立, 但它们会互相干扰,非常影响性能。

而数组的声明中, 是需要一段连续的内存的, 所以对 index 位置和 index + 1 的位置的节点频繁的操作, 会影响性能, 那么进行一段距离的隔离操作就行了。

3.2 Exchanger 的单位槽的 exchange 方法


public class Exchanger<V> {

    public V exchange(V x) throws InterruptedException {

        Object v;
        Node[] a;

        // 需要交给另一个线程的信息为 null 的话,使用内部的一个 Object 作为替代
        Object item = (x == null) ? NULL_ITEM : x;

        // 1. 多槽节点数组为 null, 单槽交换的返回值为 null, 线程中断标识为 true,
        // 2. 多槽节点数组为 null, 单槽交换的返回值为 null, 线程中断标识为 false, 多槽交换的返回值为 null
        // 3. 多槽节点数组不为 null, 线程中断标识为 true
        // 4. 多槽节点数组不为 null, 线程中断标识为 false, 多槽交换的返回值为 null

        // 上面 4 种情况, 都会抛出异常
        if (((a = arena) != null || (v = slotExchange(item, false, 0L)) == null) && ((Thread.interrupted() || (v = arenaExchange(item, false, 0L)) == null)))
            throw new InterruptedException();
            
        return (v == NULL_ITEM) ? null : (V)v;    
    }
}

3.3 Exchanger 的 slotExchange 方法


public class Exchanger<V> {

    private final Object slotExchange(Object item, boolean timed, long ns) {

        // 获取当前线程身上的 Node 节点
        Node p = participant.get();

        Thread t = Thread.currentThread();
        // 线程的中断标识为 true, 返回 null
        if (t.isInterrupted()) 
            return null;

        for (Node q;;) {
        
            // 槽位节点 solt 不为 null, 则说明已经有线程在这里等待交换数据了
            if ((q = slot) != null) {
                
                // 通过 cas 将当前对象的 slot 的值从 q 更新为 null
                if (SLOT.compareAndSet(this, q, null)) {
                    // 更新成功
                    // 获取槽位节点的传递内容
                    Object v = q.item;

                    // 当前线程传递给其他线程的内容
                    q.match = item;

                    // 当前节点中挂起的线程
                    Thread w = q.parked;
                    if (w != null)
                        // 唤醒挂起的线程
                        LockSupport.unpark(w);
                    // 返回槽位节点的传递值    
                    return v;
                }

                // 这里就是从单槽操作切换到多槽操作的一个重点
                // 1. slot 不为 null, 当前线程尝试通过 cas 设置为 null, 失败了
                // 2. 同时当前应用可以的核心数 > 1 (有并发的可能性)
                // 3. 通过 cas 将当前 Exchanger 的 bound 属性从 0 设置为 255 成功
                // 三个条件公共导致当前  Exchanger 的 arena 数组声明

                // 这就满足了从单槽操作切换到多槽操作的一个重点: arena 数组不为空
                
                // 声明完 arena 后, 会重新回到上面的循环, 
                // slot 一旦为空了, 就直接返回 null, 进入到多槽操作
                // slot 不为空的话, 还是会继续通过 cas 尝试进行交换操作

                // 当前的核心数 > 1 并且  同时通过 cas 将 bound 从 0 设置为 256 
                if (NCPU > 1 && bound == 0 && BOUND.compareAndSet(this, 0, SEQ))
                    // 声明 arena 为一个 容量为 (Full + 2) * 32  的 Node 数组
                    // FULL 最大值为 255, 那么可以得出最终 arena 的容量在 8224 在
                    arena = new Node[(FULL + 2) << ASHIFT];

            } else if (arena != null)
                // 多槽位不为空,需要执行多槽位交换, 直接返回 null
                return null;
            else {
                // 多槽位为空, 同时槽位节点也为空

                // 从线程自身获取到的节点 p 的 item (当前线程打算传递给其他线程的内容) 为当前的 item 值
                p.item = item;
                // 通过 cas 将当前的槽位节点从 null 设置为当前线程身上的节点
                if (SLOT.compareAndSet(this, null, p))
                    // 设置成功了, 结束死循环
                    break;
                // 设置失败, 先将线程自身获取到的节点 p 的 item 设置为 null, 重新跑一次循环    
                p.item = null;
            }    
        }    

        // 获取当前 p 节点身上的 hash 值, 默认值为 0
        int h = p.hash;
        // 是否设置了超时时候, 是的话, 过期时间 = 当前的纳秒 + 设置的超时时间的纳秒值, 否则超时时间为 0 
        long end = timed ? System.nanoTime() + ns : 0L;
        // 计算最大的自旋次数 = 核心数 > 1 吗, 是的话, 最大自旋次数为 1024, 否则为 1 次
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;

        // 只有从线程自身获取到的节点 p 里面的 从其他线程获取到的传递内容 为空, 就一直循环
        // 当前线程是 slot 了, 等待另一个线程和自身进行交换内容, 当当前的节点获取到另一个线程交换的内容就可以停止循环了, 即 p.match != null
        while ((v = p.match) == null) {
            
            // 自旋次数大于 0
            // 当前线程为 slot 了, 但是还没有其他的线程和自身交换内容
            // 这是不会立即挂起当前线程, 而是通过自旋 spins 次, 这期间不到判断当前是否获取到了其他线程交换的内容
            // 自旋次数为 0 后, 还是没有获取到, 那么才尝试挂起当前的线程
            if (spins > 0) {
                // 计算默认值, 第一次线程自身的 Node 节点的 h 为 0, 经过下面 3 步后还是为 0 
                h ^= h << 1; 
                h ^= h >>> 3; 
                h ^= h << 10;
                // 计算后的 h 还是为 0
                if (h == 0)

                    // h = 1024 | 当前线程的 id
                    h = SPINS | (int)t.getId();

                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                    // (SPINS >>> 1) - 1 = (1024 / 2) - 1 = 511, 二进制 00000000 00000000 00000001 11111111
                    // 计算后的 h < 0  并且 spins - 1 后 & 511 等于 0,  
                    // 既每自旋 512 次, 同时 h < 0, 线程执行让步, 那么多次的自旋还没有获取到交换内容, 先让出 cpu
                    Thread.yield();
            }
            // 槽位节点不等于线程自身的节点
            // 一开始 slot == p, 现在 slot != p
            // 可能的就是已经有另一个线程通过 cas 将 slot 置为 null, 后面会和当前节点的进行交换内容
            else if (slot != p)
                // 自旋次数重置为 1024
                spins = SPINS;
            
            // 1. 线程中断标识位 false
            // 2. 节点数组为 null
            // 3. 没有设置超时时间 或者 距离超时时间还 > 0   
            // 三个条件都满足  
            else if (!t.isInterrupted() && arena == null && (!timed || (ns = end - System.nanoTime()) > 0L)) {    
                // 线程自身的节点的挂起线程等于当前线程
                p.parked = t;
                // 如果槽位节点 等于当前的线程节点
                if (slot == p) {
                    // 超时时间 = 0
                    if (ns == 0L)
                        // 挂起当前线程
                        LockSupport.park(this);
                    else
                        // 带超时时间的挂起线程
                        LockSupport.parkNanos(this, ns);
                }
                p.parked = null;
            } else if (SLOT.compareAndSet(this, p, null)) {

                // 能进入到这个的条件需要满足
                // 1. 自旋后, 还是没能获取到需要的交换的内容
                // 2. 当前 Exchanger 的 slot 还是等于当前线程的节点 p
                // 3. 当前 Exchagner 的 arena 不为 null
                // 4. 线程中断标识位 true
                // 5. 设置了超时时间, 同时已经超时了
                // 6. 当前的 Exchanger 的 slot 通过 cas 从 p 设置为 null 了

                // 然后根据当前是否超时了或者线程中断了, 判断是否要进入多槽操作
                
                // v = 设置了超时时间 + 距离超时时间 <= 0 + 线程中断标识为 false, 
                // 都为是 返回默认值 TIMED_OUT, 结束流程了,  否则返回 null, 尝试到多槽操作

                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                // 跳出循环
                break;
            }
        }

        // 设置 p 节点的 match 属性为 null
        MATCH.setRelease(p, null);
        // 设置 p 节点的 item 属性为 null
        p.item = null;
        // 设置 p 节点的 hash 值 = 最新的 hash
        p.hash = h;
        // 返回从其他线程获取到的传递内容
        return v;
    }
}

3.4 Exchanger 的 arenaExchange 方法

public class Exchanger<V> {

    private final Object arenaExchange(Object item, boolean timed, long ns) {

        // 槽位节点数组
        Node[] a = arena;
        // 槽位数组的长度
        int alen = a.length;
        // 获取当前线程身上维护的节点
        Node p = participant.get();


        // 一个死循环, 下面的逻辑全部都在这个循环中
        for (int i = p.index;;) { 
            
            // i 等于当前线程身上维护的节点的数组位置, 默认为 0
            // 这个 i 很重要, 后面会不断的修改 i 的值, 不断重试获取有值的位置

            int b, m, c;

            // 计算当前 index 对应的交换节点在槽位数组的位置

            // ASHIFT = 5, (1 << ASHIFT) - 1 = 30
            // i << ASHIFT = i << 5 = i * 2^5 = i * 32
            // j = i * 32 + 30 , i 的取值在 [0, 255] 之间 (为什么, 后面源码中有)
            // arean 最大长度为 8224, j 计算出来后的结果最大为 8191
            // 这条计算方式几乎能把 255 内的数都能映射到对应的数组上

            // 每个节点之间相隔 62 个位置

            int j = (i << ASHIFT) + ((1 << ASHIFT) - 1);

            // 容错, 计算出来的结果 < 0 或者 >= 数组的长度, 取数组的最后一个
            if (j < 0 || j >= alen)
                j = alen - 1;

            // 获取 arena 数组 j 位的节点信息    
            Node q = (Node)AA.getAcquire(a, j);

            
            // 下面的逻辑分为了 3 大块

            // 1. 当前节点应该在的位置在当前数组有效范围内, 同时对应的位置已经有节点了
            // 2. 当前节点应该在的位置在当前数组有效范围内, 但是对应的位置没有节点
            // 3. 当前节点应该在的位置不在当前数组有效范围内, 或者第一块的 cas 替换失败

            if (q != null && AA.compareAndSet(a, j, q, null)) {

                // 第一部分: 当前节点的应该在的位置在当前数组有效范围内, 同时对应的位置已经有节点了

                // 先通过 cas 将 arena 数组的 j 位置从 q 设置为 null 

                // 2 个节点交换数据
                Object v = q.item;                     
                q.match = item;
                Thread w = q.parked;
                // 唤醒交换的节点里面的线程
                if (w != null)
                    LockSupport.unpark(w);
                // 返回获取的交换值    
                return v;


            } else if (i <= (m = (b = bound) & MMASK) && q == null) {

                // 第二部分: 当前节点应该在的位置在当前数组有效范围内, 但是对应的位置没有节点

                // MMASK 等于 255, 二进制表示就是 00000000 00000000 00000000 11111111
                // 任何数 & 上 MMASK, 最大值也只会是 255, 也就是 m 的取值范围为 [0, 255]
                // 线程上的节点的 index 不在 0 到 255 之间会被修正

                // bound 是数组最大的有效的位置,和 MMASK 相与,可以得到数组真正最大有效位的位置
                // Exchanger 的 bound 默认为 0
                // 线程的 Node 节点初始的 index 的默认值也为 0

                // 更新当前节点交换的内容为最新的 item, 可以去除 p.item == null 的情况
                p.item = item;                         
    
                if (AA.compareAndSet(a, j, null, p)) {

                    // 将当前的节点设置到当前数组的 j 的位置 (即本身打算交换的位置)

                    // 计算超时时间, 设置了超时同时当前的 同时 数组真正最大有效位的位置 为 0, 计算超时的时间, 否则超时时间就是 0,不超时
                    long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;


                    Thread t = Thread.currentThread();
                    
                    // 自旋操作
                    for (int h = p.hash, spins = SPINS;;) {

                        // 先通过自旋, 获取当前节点中其他线程交换的数据, 不为空, 即交换成功, 做一下处理后, 就能返回
                        Object v = p.match;

                        // 当前节点中有其他线程交换的数据了
                        if (v != null) {
                            // 将当前节点的 match 属性设置为 null
                            MATCH.setRelease(p, null);
                            // item 属性设置为 null
                            p.item = null;         
                            // 更新 hash 为最新的 hash    
                            p.hash = h;
                            // 返回获取到的数据
                            return v;
                        }

                        else if (spins > 0) {

                            // 进到这里, 自旋次数还是大于 0, 同时当前节点中还没有其他线程的交换的内容

                            // 这里和 slotExchanger 一样的

                            // 计算一个新的 hash 值
                            h ^= h << 1; 
                            h ^= h >>> 3; 
                            h ^= h << 10; 
                            
                            // 计算出来的 hash 值为 0, 修改为 1024 | 当前的线程 Id
                            if (h == 0)                
                                h = SPINS | (int)t.getId();

                            else if (h < 0 && --spins & ((SPINS >>> 1) - 1)) == 0)
                                // 计算出来的 hash 值小于 0, spins 先自减 1, 然后 & 上 511 == 0
                                // 尝试让出线程执行权, 等待下次 cpu 重新调度
                                Thread.yield(); 
                        }
                        else if (AA.getAcquire(a, j) != p)
                            // arena 数组 j 位置不等于 p
                            // 可能的就是已经有另一个线程通过 cas 将 j 位置设为 null, 后面会和当前节点的进行交换内容
                            spins = SPINS;  

                        else if (!t.isInterrupted() && m == 0 && (!timed || (ns = end - System.nanoTime()) > 0L)) {

                            // 1. 线程中断标识位 false
                            // 2. 计算出来的数组最大有效位的位置 = 0, 已经达到当前数组的最小有效位了, 没法再减了, 进行阻塞
                            // 3. 没有设置超时时间 或者 距离超时时间还大于 0   
                            // 三个条件都满足  

                            // 尝试挂起当前的线程
                            p.parked = t;

                            // arena 数组 j 位置的节点仍然是当前线程的节点
                            if (AA.getAcquire(a, j) == p) {
                                // 挂起当前的线程
                                if (ns == 0L)
                                    LockSupport.park(this);
                                else
                                    LockSupport.parkNanos(this, ns);
                            }
                            p.parked = null;

                        } else if (AA.getAcquire(a, j) == p && AA.compareAndSet(a, j, p, null)) {
                            
                            // 当前这个槽位一直没有线程来交换数据,切换到另一个槽位

                            // arena 数组 j 位置的节点仍然是当前线程的节点 同时通过 cas 将 arena 数组的 j 位置从 p 设置为 null
                            
                            // 计算出来的数组最大有效位的索引 != 0, 也就是 bound 低 8 位存在 1
                            if (m != 0)                
                                // 将 Exchanger 的 bound 属性增加 256 - 1, 也就是 bound + 00000001 00000000 - 00000001
                                // bound 低 8 位存在 1, 那么经过下面的 cas 操作, bound 值增大了, 但是 bound 低 8 位的值减小了
                                // m 只会取 bound 的低 8 位, 也就是变相的使下次的 m 取值 - 1 了。
                                BOUND.compareAndSet(this, b, b + SEQ - 1);

                            // p 的打算和其他线程交换的内容设置为 null    
                            p.item = null;
                            // hash 设置为新的 hash
                            p.hash = h;
                            // 设置 i = 线程当前的节点的位置 / 2;
                            // 通过这个新的索引值, 重新计算得到新的交换位置
                            i = p.index >>>= 1; 
                            
                            // 线程中断标识为 true, 返回 null
                            if (Thread.interrupted())
                                return null;
                            // 设置了超时时间, 超时时间为 0, 同时计算后的数组有效位的最大索引最大值为 0
                            if (timed && m == 0 && ns <= 0L)
                                return TIMED_OUT;
                            break; 
                        }
                    }
                }
                else
                    // cas 失败了, 设置线程自身的节点 p 的打算用来交换的内容 item 为 null, 然后重新循环
                    p.item = null;                     
            } else {

                // 第三部分: 当前节点应该在的位置不在当前数组有效范围内, 或者第一块的 cas 替换失败

                // 尝试重新设置线程节点的 index

                // 线程自身的节点 p 的 bound 不等于 Exchanger 当前的 bound
                if (p.bound != b) {                    
                    // 更新线程自身的节点 p 的 bound 等于当前 Exchanger 的 bound
                    p.bound = b;
                    // 线程自身的节点 p 的 collides 为重置为 0 
                    p.collides = 0;

                    // 线程自身的节点 p 的 bound 不等于当前 Exchanger 的 bound,
                    // 将 p 重置为当前的的 bound, i 下次操作的位置, 还是从本轮计算出来的 m 值开始

                    
                    // m 为 arean 数组有效位的位置的最大值
                    // i == m 并且 m != 0, 那么 i 下次的位置为 m - 1, 上面验证过 i 位置节点的位置不正确, 这里 i == m, 那么从 m - 1 开始吧
                    // 其他情况依旧还是从 m 开始
                    i = (i != m || m == 0) ? m : m - 1;
                }
                else if ((c = p.collides) < m || m == FULL || !BOUND.compareAndSet(this, b, b + SEQ + 1)) {

                    // 在当前的 bound 下, cas 失败的次数 < 数组有效位的位置的最大值, 还能继续试
                    // m 等于最大容量值了达到了最大值了
                    // 通过 cas 给 bound 增大失败

                    // 通过 cas 设置 bound = bound + 256 + 1 = bound + 00000001 00000000 + 00000001 
                    // bound 只看低 8 位的话,结果就是 + 1 了, 后面 bound & MMASK = m 也就是 新的 m = 旧的 m + 1

                    // 线程自身的节点 p 的的 collides + 1, 又进行了一次 cas 操作

                    p.collides = c + 1;
                    // i 等于 0 时, i 重新赋值为计算后的数组有效位的最大索引最大值, 否则 i 减 1

                    // i 已经到 0 了, 那么 i 重新从最大值开始, 否则从 i - 1 继续开始
                    i = (i == 0) ? m : i - 1;          
                }
                else
                    // i 等于 计算后的数组有效位的最大索引最大值 + 1
                    i = m + 1;                         
                // 线程自身的节点 p 的索引值等于 i, 继续从 i 位置开始处理
                p.index = i;
            }
        }
    }

}

多槽交换方法 arenaExchange 的整体流程和 slotExchange 类似,主要区别在于它会根据当前线程的数据携带结点 Node 中的 index 字段计算出命中的槽位。

  1. 为了解决缓存行的问题, Exchanger 中存节点的数组不会连续存储的, 而是存一个, 下一个会隔一段距离后才存储
  2. 基于这个可以想象成 2 个数组, 一个连续的逻辑数组, 节点之间没有间隔, 一个就是实际存储的数组, 节点直接由间隔的, 得到逻辑数组的位置 i, 可以通过公式推导出他在实际数组的位置 j
  3. Exchagner 中的数组是一开始就声明好的, 但是为了性能, 整个数组一开始不会就全部都使用, 而是设置了一个最高有效位, 也就是上面的 bound 属性, 数组可用的部分为 [0, bound] 部分
  4. 在使用中, 节点到了, 先通过节点的逻辑数组位置 index, 计算出实际的位置, 判断实际的位置有没有节点在等待, 有, 交换数据返回
  5. 对应的位置没有数据, 同时这个逻辑位置 index 还在当前数组的有效最高位内 (bound 可以通过公式逆推逻辑位置), 那么就和单槽操作的一样, 把当前节点放到对应的位置
  6. 在第 5 步中, 会通过自旋, 自旋次数达到上限, 通过换一个槽位 (等于当前的位置 index / 2), 继续自旋, 直到当前逻辑数组的第一位, 在没有超时的限制下, 挂起当前线程, 等待其他线程交换
  7. 在上面的 4,5 中, 当前节点的逻辑数组位置 index 对应的节点没有节点等待, 而且这个 index 又不符合有效最高位的情况下,
  8. 当前节点的 bound 值和当前 Exchanger 的不一致, 先重置为一样的, 那么设置当前的当前节点为 m 或者 m - 1 (流程走下来, 确定 m 对应的位置没有节点) 的位置重新循环处理, 从第 4 步开始
  9. 当前节点的逻辑数组位置 index 对应的节点没有节点等待, 而且这个 index 又不符合有效最高为的情况下, bound 还一样, 那么从有效最高为的 当前节点的上一个节点 或者 有效最高位节点开始处理
  10. 最后这一步了, 没办法设置从 有效最高位 + 1 的位置开始处理 (这时候位置为 null, 会将当前节点放到这个位置)

4 参考

大白话说Java并发工具类-Semaphore, Exchanger
Java 并发 — Exchanger源码分析

文章来源:https://blog.csdn.net/LCN29/article/details/135189251
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。