【Java 集合】ConcurrentHashMap (JDK 1.8 版本)

2023-12-21 03:33:16

1 ConcurrentHashMap 简介

Map 一种存储键值对 (key-value) 的数据结构, 可以通过 key 快速地定位到需要的 value, 在 Java 中是一个使用频率很高的一个数据结构。一般情况下, 我们都是可以直接使用它的实现类 HashMap 就能满足需求了。
但是 HashMap 在多线程情况, 并不是一个线程安全的类。
解决的方式有很多, 例如:

  1. 使用在 Java 体系中古老的 Hashtable 作为替代, 该类基本上所有的方法都采用 synchronized 进行线程安全的控制, 虽然保证了线程安全, 但是牺牲了很大的性能。 在高并发的情况下, 每次只有一个线程能够获取对象监视器锁, 这样的并发性能的确不令人满意。
  2. 通过 Collections 的 synchronizedMap(Map<K,V> m) 方法返回一个线程安全的 Map。但是这个的效果实际上和 Hashtable 的一样, 依然是采用 synchronized 独占式锁进行线程安全的并发控制的, 所以这种方案的性能也是令人不太满意的。

那么有没有一种既保证了线程安全性, 性能也不错的 Map 呢? ConcurrentHashMap 就是我们的选择了, 内部利用了锁分段的思想提高了并发度。当然的, 随着 JDK 的升级, ConcurrentHashMap 的实现也有了不同的方式。
大体了分为 2 个版本: 1.6 版本的 和 1.8 版本。

ConcurrentHashMap 在 JDK 1.6 的版本网上资料很多, 有兴趣的可以去看看。 JDK 1.6 版本关键要素:

  1. segment 继承了 ReentrantLock 充当锁的角色, 为每一个 segment 提供了线程安全的保障
  2. segment 维护了哈希散列表的若干个桶, 每个桶由 HashEntry 构成的链表

而到了 JDK 1.8 的 ConcurrentHashMap 就有了很大的变化, 光是代码量就足足增加了很多。1.8 版本舍弃了 segment, 并且大量使用了 synchronized, 以及 CAS 无锁操作以保证 ConcurrentHashMap 操作的线程安全性。
至于为什么不用 ReentrantLock 而是 Synchronzied 呢? 实际上, synchronzied 在 JDK 1.6 做了很多的优化, 包括偏向锁, 轻量级锁, 重量级锁, 可以依次向上升级锁状态。
因此, 使用 synchronized 相较于 ReentrantLock 的性能会持平甚至在某些情况更优, 具体的性能测试可以去网上查阅一些资料。另外, 底层数据结构改变为采用数组 + 链表 + 红黑树的数据形式。

2 ConcurrentHashMap 的关键属性, 类和 CAS 方法

在了解 ConcurrentHashMap 的具体方法实现前, 我们需要系统的来看一下几个关键的地方, 从这几个地方, 从大体上了解 ConcurrentHashMap 的一些特性。

2.1 常量设置 (这些设置基本和 HashMap 的类似)

  1. ConcurrentHashMap 内部是使用一个数组存放数据的, 这个数组的长度必须是 2 的 n 次方, 默认的容量是 16, 最大是 1 << 30, 既 2 的 30 次方
  2. 默认负载因子为 0.75, 当数组的的存数据的项 >= 数组的长度 * 负载因子, 就会进行数组长度扩容
  3. 数组的每一项, 在 JDK 1.8 中, 可以是链表, 也可以是红黑树。
  4. 当数组的长度大于等于 64, 数组的某一项的长度大于等于 8, 会转为红黑树, 小于等于 6, 会重新转为链表
  5. key 和 value 都不允许为 null (HashMap 允许一个 key 为 null 和 不限制的 value 为 null)
  6. 在 ConcurrentHash 中数组中的每个节点的 hash 都有特殊作用

ConcurrentHash 中数组中的节点的 hash 值不同的含义

  1. >= 0, 表明这个节点为 Node, 既链表节点
  2. -1, 表明这个节点为 ForwardingNode, 说明这个位置正在数据迁移
  3. -2, 表明这个节点为 TreeBin, 树节点 TreeNode 的进一步封装
  4. -3, 表明这个节点为 ReservationNode, 通过 JDK 1.8 新增的几个方法给 ConcurrentHashMap 设值时, 会先对应位置的节点变为 ReservationNode, 再做处理

比如 JDK 1.8 中 Map 新提供了 computeIfAbsent(K key, Function<? super K, ? extends V> mappingFunction) 方法, 这个方法的作用, 就是向 Map 获取 key 对应的值不存在时, 将后面的 lambda 表达式的返回值设置到 Map 中, 然后返回。
这样就能保证从 Map 不会获取到 null 值。

Map<Integer, String> map = new ConcurrentHashMap<>();
map.put(1, "1");

// 输出: 2123
System.out.println(map.computeIfAbsent(2, k -> k + "123"));

// 输出: null
System.out.println(map.get(10));

computeIfAbsent 定义在 Map 接口中声明的方法, 同时基于 JDK 1.8 的特性, 用 default 关键字进行了修饰, 提供了默认的实现。

Map 提供的这些 default 方法, 可以发现都是线程不安全的, 所以 ConcurrentHashMap 对他们进行了重写, 在这些方法中, 需要对向已有的数组里面的添加新值,
那么就会向把对应位置的那一项, 设置为 ReservationNode 节点。用于通知其他的线程, 这个位置的数据在修改中。

2.2 关键属性

public class ConcurrentHashMap<K,V> {

    // 装载 Node 的数组, 作为 ConcurrentHashMap 的数据容器, 采用懒加载的方式, 直到第一次插入数据的时候才会进行初始化操作, 数组的大小总是为 2 的幂次方
    transient volatile Node<K,V>[] table;

    // 扩容时使用, 平时为 null, 只有在扩容的时候才为非 null, 扩容时先把数据放在这个对象, 扩容完成后, 才把 table 指向这个对象
    transient volatile Node<K,V>[] nextTable;

    // 在没有发生争用时的元素个数的统计, 即当前的元素个数
    transient volatile long baseCount;

    // 数组大小控制
    // 这个属性非常重要, 取值不同有不同的情况

    // 1. 当 sizeCtl > 0 时
    // 在 ConcurrentHashMap 声明, 但是内部存储数据的数组是延迟加载的, 这个数组的大小临时存放到 sizeCtl 中
    // 初始化后表示扩容的阈值, 默认为当前数组的长度 * 0.75

    // 2. 当 sizeCtl = 0 时
    // 在声明 ConcurrentHashMap, 没有指定容量大小, 这时 sizeCtl 为 0, 也就是表明 数组 table 的长度去默认值: 16

    // 3. 当 sizeCtl = -1 时
    // 表明当前 table 正在初始中

    // 4. 当 sizeCtl < -1 时
    // sizeCtl 的数据类型为 int, 占 32 位。
    // 那么 sizeCtl 的高 16 位存放的是扩容时标识符(具体的解释可以看后面, 现在知道有这个设定就行了), 低 16 位存放的是参与扩容的线程数目 (CocurrentHashMap 在扩容的时候, 有别的线程发现正在扩容中, 会一起参与进来, 一起扩容)
    transient volatile int sizeCtl;

    // 扩容索引值, 表示已经分配给扩容线程的 table 数组索引位置, 主要用来协调多个线程间迁移任务的并发安全
    transient volatile int transferIndex;

}

2.3 关键内部类

public class ConcurrentHashMap<K,V> {

    static class Node<K,V> implements Map.Entry<K,V> {
        
        final int hash;
        final K key;

        // 很多属性都是用 volatile 进行修饰的, 也就是为了保证内存可见性
        volatile V val;
        volatile Node<K,V> next;
        ......
    }

    static final class TreeNode<K,V> extends Node<K,V> {

        TreeNode<K,V> parent;  // red-black tree links
        TreeNode<K,V> left;
        TreeNode<K,V> right;
        TreeNode<K,V> prev;    // needed to unlink next upon deletion
        boolean red;
        ......
    }

    static final class TreeBin<K,V> extends Node<K,V> {
        
        // 这个类并不负责包装用户的 key、value 信息, 而是包装的很多 TreeNode 节点。 
        // 实际的 ConcurrentHashMap "数组" 中, 存放的是 TreeBin 对象, 而不是 TreeNode 对象
        TreeNode<K,V> root;
        volatile TreeNode<K,V> first;
        volatile Thread waiter;
        volatile int lockState;

        // values for lockState
        static final int WRITER = 1; // 持有写锁时的标志
        static final int WAITER = 2; // 等待写锁的标志
        static final int READER = 4; // 读锁的增加值
        ......
    }

    static final class ForwardingNode<K,V> extends Node<K,V> {
        // 在扩容时才会出现的特殊节点, 作为一个标记节点放在桶的首位, 
        // 其 key, value, next 全部为 null, hash 为 MOVED (-1), 并拥有 nextTable 指针指向新的 table 数组

        final Node<K,V>[] nextTable;

        ForwardingNode(Node<K,V>[] tab) {
            // hash, key, value, next
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
    }

    static final class ReservationNode<K,V> extends Node<K,V> {

        // 调用 Map default 方法时, 会把节点修改为这个特殊节点
        // 其 key, value, next 全部为 null, hash 为 RESERVED (-3)

        ReservationNode() {
            super(RESERVED, null, null);
        }

        Node<K,V> find(int h, Object k) {
            return null;
        }
    }

}

结合上面几个内部类, 基本可以推理出 ConcurrentHashMap 的存储数据的方式和 HashMap 一样, 都是数组, 数组的数据类型可以是链表, 也可以是红黑树。

大体的结构如下:
Alt 'ConcurrentHashMap 的数据结构'

2.4 一些高频的 CAS 方法

public class ConcurrentHashMap<K,V> {

    /**
     * 该方法用来获取 tab 数组中索引为 i 的 Node 元素
     */
    static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
        return (Node<K,V>)U.getObjectAcquire(tab, ((long)i << ASHIFT) + ABASE);
    }

    /**
     * 利用 cas 操作将 tab 数组中索引为 i 的元素从 c 替换为 v
     */
    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) {
        return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

    /**
     * 将 tab 数组中索引为 i 的元素设置为 v
     */
    static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
        U.putObjectRelease(tab, ((long)i << ASHIFT) + ABASE, v);
    }
}

3 ConcurrentHashMap 的源码实现

3.1 实例构造器方法

public class ConcurrentHashMap<K,V> {

    // 1. 构造一个空的 map, 即 table 数组还未初始化, 初始化放在第一次插入数据时, 默认大小为 16
    public ConcurrentHashMap(){}

    // 2. 给定 map 的大小
    public ConcurrentHashMap(int initialCapacity){}

    // 3. 给定一个 map
    public ConcurrentHashMap(Map<? extends K, ? extends V> m){}

    // 4. 给定 map 的大小以及负载因子
    public ConcurrentHashMap(int initialCapacity, float loadFactor){}

    // 5. 给定 map 大小, 加载因子以及最少多少个桶(数组的最小长度)
    public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel){}

}

ConcurrentHashMap 一共给我们提供了 5 种构造器方法, 具体使用请看注释, 我们来看看第 2 种构造器, 传入指定大小时的情况 (其他的构造方式都是类似的), 该构造器源码为:

public class ConcurrentHashMap<K,V> {

    // 存储数据的数组的最大容量 
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    public ConcurrentHashMap(int initialCapacity) {

        //1. 小于 0 直接抛异常
        if (initialCapacity < 0)
            throw new IllegalArgumentException();

        //2. 判断指定的容量是否超过了允许的最大值, 超过了话则取最大值, 否则再对该值进一步处理, 使其为 2 的 n 次方

        // 在 initialCapacity 的值大于等于 MAXIMUM_CAPACITY / 2 的情况下, 直接取最大值 MAXIMUM_CAPACITY, 否则重试计算出第一个大于 initialCapacity 的 2 的 n 次方的数

        // initialCapacity >>> 1 相当于除以2 取整, 经过这样处理可以得到第一个大于 initialCapacity 的 2 的 n 次方的数
        int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?  MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));    

        //3. 赋值给 sizeCtl, 这时候 sizeCtl 作用是存放初始时数组的容量
        this.sizeCtl = cap;
    }

    // 经过这个方法的处理后, 可以得到第一个大于等于 c 的 2 的 n 次方的数
    private static final int tableSizeFor(int c) {
        int n = c - 1;
        n |= n >>> 1;
        n |= n >>> 2;
        n |= n >>> 4;
        n |= n >>> 8;
        n |= n >>> 16;
        // MAXIMUM_CAPACITY == 1 << 30
        return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
    }
}

这段代码的逻辑请看注释, 很容易理解。
如果小于 0 就直接抛出异常, 如果指定值大于了所允许的最大值的话就取最大值。
否则, 在对指定值做进一步处理。最后将计算出来的容量 cap 赋值给 sizeCtl, 关于 sizeCtl 的说明请看上面的说明, 当调用构造器方法之后, sizeCtl 的大小就代表了 ConcurrentHashMap 的大小, 即 table 数组长度

在指定容量的处理方法 tableSizeFor 中, 根据入参的值计算出第一个大于当前入参的 2 的 n 次方数, 这个值就是 ConcurrentHashMap 中数组进行声明时的容量了。
比如, 当指定大小为 18 时, 经过这个方法处理, 会得到一个 32 的值。

需要注意的是, 调用构造器方法的时候并未构造出 table 数组 (可以理解为 ConcurrentHashMap 的数据容器) , 只是算出 table 数组的长度, 当第一次向 ConcurrentHashMap 插入数据的时候才真正的完成初始化创建 table 数组的工作

3.2 ConcurrentHashMap 中数组的初始方法 initTable()

在上面的指定容量的构造函数中, 可以看到, 只是对入参的容量参数进行了处理, 然后赋值给 sizeCtl, 就结束了, 而真正存储数据的 table 数组还是为空的。
这是一种懒加载的方式, 而只要第一次向里面放数据时, 就会进行数组的初始化, initTable 就是初始化的方法。

public class ConcurrentHashMap<K,V> {

    private final Node<K,V>[] initTable() {

        Node<K,V>[] tab; 
        int sc;

        // table 为空 或者长度为 0, 进入循环, 否则进入后面的逻辑
        while ((tab = table) == null || tab.length == 0) {

            // 当前的 sizeCtl 小于 0, 表示 ConcurrentHashMap 正在扩容中
            if ((sc = sizeCtl) < 0)
                // 让当前线程让出行时间段, 使正在运行中的线程重新变成就绪状态, 后面重新竞争 CPU 的调度权
                // 确保当前只有一个线程在初始化
                Thread.yield();

            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                // 通过 CAS 将 sizeCtl 设置为 -1, 成功了, 进行初始化
                // 在初始时, 会先将 sizeCtl CAS 为 -1, 也就是其他的线程进入到 while 里面, 也会在上面的判断时, 判断为 true 然后让出执行权

                try {
                    // 再次判断 table 为 null 或者长度为 0 
                    if ((tab = table) == null || tab.length == 0) {
                        
                        // 如果指定的容量小于 0, 使用默认值 16, 进行声明, 否则就用指定的大小进行声明
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;

                        // 初始数组
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;

                        // 计算数组中可用的大小: 实际大小 n * 0.75 (加载因子) 
                        // n >>> 2 相等于 n /2 /2 = n/4 = 0.25
                        // sc = n - 0.25 * n = 0.75 * n
                        sc = n - (n >>> 2);

                    }

                } finally {
                    // 这时候 sizeCtl 存放的是阈值, 不是数组的长度, 也不是 -1 了
                    // 如果上面异常了, 可以把 sc 重新赋值过来, 也就是还是维护的是数组的长度
                    sizeCtl = sc;
                }
            }    
        }
    }
}

代码的逻辑请见注释, 有可能存在一个情况是多个线程同时走到这个方法中, 为了保证能够正确初始化, 会在数组初始时做了一个判断 sizeCtl < 0 的判断,
若当前已经有一个线程正在初始化即 sizeCtl 值变为 -1, 这个时候其他线程在 if 判断为 true 从而调用 Thread.yield() 让出 CPU 时间片。
正在进行初始化的线程会调用 U.compareAndSwapInt 方法将 sizeCtl 改为 -1 即正在初始化的状态。

构造方法 + initTable 基本就构成了 ConcurrentHashMap 的初始的全部逻辑了。

3.3 ConcurrentHashMap 新增数据 - put() 方法

public class ConcurrentHashMap<K,V> {

    static final int spread(int h) {
        // ConcurrentHashMap 中 hash 值的计算方法
        // h 异或 (h 无符号右移 16 位) 后, 再与上 HASH_BITS
        // HASH_BITS = 0x7fffffff = 2147483647, 二进制表示为  01111111 11111111 11111111 11111111
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {

        // 注意: 这里和 HashMap 的区别, HashMap 允许一个 key 为 0 和 多个 value 为 null
        // ConcurrentHashMap 则 key 和 value 都不允许, 直接抛出异常
        if (key == null || value == null) 
            throw new NullPointerException();

        // 第一步, 计算 key 对应的 hash 值
        int hash = spread(key.hashCode());   

        // 用来计算对应位置的链表的长度
        int binCount = 0;

        // 注意: 这里做了一个死循环, 自旋操作
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; 
            int n, i, fh;

            // 第二步, 当前的数据结构为 null 或者 长度为 0, 进行初始化
            if (tab == null || (n = tab.length) == 0)
                // 继续数组的声明, 数组创建成功后, 回到循环的头部, 重新循环
                tab = initTable();
            // 第三步, 将当前 key 对应的 hash 值 & 上 数组的长度 - 1 (这一步的操作, 在数组的长度为 2 的 n 次方的情况下《等同于 hash % 数组的长度) 
            // 得到当前 value 存放在数组的哪个位置, 并赋值给 i    
            // 通过 CAS 得到 i 位置的值, 如果为空, 进入判断里面的操作
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                // 把当前的数据封装为节点, 通过 cas 把这个节点放到 i 位置
                if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
                    // cas 操作成功, 跳出循环, 但是还是会执行到下面的 addCount 方法, 这时候 binCount = 0
                    break;
            // 第四步, MOVED = -1; 对应位置的节点的 hash 为 -1, 既当前节点为 ForwardingNode 节点, 表示当前正在扩容, 当前线程进行协助扩容        
            } else if ((fh = f.hash) == MOVED) {
                // 协助扩容
                // 协助扩容后, 又会回到循环的头部, 重新处理
                tab = helpTransfer(tab, f);
            } else {
                // 当前 value 应该存放到位置已经有节点了

                V oldVal = null;

                // 如果指定的位置不为 null, 同时不是 ForwardingNode, 对这个对象上锁
                // 因为这里对这个节点上锁了, 所以扩容和添加新数据, 只有一个线程进入
                synchronized (f) {
                    // 做多一次判断, 当前位置的节点还是指定的节点
                    if (tabAt(tab, i) == f) {
                        // 第五步, 当前为链表, 在链表中插入新的键值对, 或者存在 key 相同的, 进行值替换
                        // fh = 定位到的位置的 hash, > 0 为 链表, -2 表示的是树节点
                        if (fh >= 0) {

                            // 链表当前有节点个数
                            binCount = 1;
                            // 循环遍历 链表
                            for (Node<K,V> e = f;; ++binCount) {

                                K ek;
                                // 找到 hash 和 key 完成相同的节点, 覆盖旧值即可
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    // 旧值
                                    oldVal = e.val;
                                    // 是否替换旧值, 这里是 !false
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    // 这里旧值不为 null 并且 binCount >= 1, 走到下面的判断后, 不会走到 addCount 方法
                                    break;
                                }

                                Node<K,V> pred = e;
                                // 如果当前节点的下一个节点为 null, 把当前数据封装为新节点放到链表的尾部, 跳出循环
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key, value, null);
                                    // 这时候旧值为 null, binCount = 链表的长度, 会走到 addCount 方法
                                    break;
                                }

                            }
                        }
                        // 第六步, 当前为红黑树节点的话, 将新的键值对插入到红黑树中
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            // 直接把 binCount 设置为 2, 用于后面判断, 跳出循环
                            binCount = 2;
                            // 将节点转为 TreeBin 红黑树, 调用 TreeBin 的 putTreeVal 方法, 尝试将当前节点放到树内
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }                     
                }

                // 第七步, 插入完键值对后再根据实际大小看是否需要转换成红黑树
                if (binCount != 0) {
                    // 链表的长度大于等于 8 了, 红黑树化
                    if (binCount >= TREEIFY_THRESHOLD)
                        // 在 treeifyBin 中会判断当前的数组长度, 如果长度小于 MIN_TREEIFY_CAPACITY = 64
                        // 不会红黑树化, 而是直接扩容
                        treeifyBin(tab, i);

                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }    
        }

        // 第八步, 新增元素节点 1 个, 当前新增元素所在位置的节点个数 binCount 元素计数, 
        // 在 addCount 中检查是否需要扩容
        addCount(1L, binCount);
        return null;
    }
}

从整体而言, 为了解决线程安全的问题, ConcurrentHashMap 使用了 synchronizedCAS 的方式。

ConcurrentHashMap 的数据存储方式和 HashMap 没有多大的区别。ConcurrentHashMap 是一个哈希桶数组, 如果不出现哈希冲突的时候, 每个元素均匀的分布在哈希桶数组中。
当出现哈希冲突的时候, 是标准的链地址的解决方式, 将 hash 值相同的节点构成链表的形式, 称为 “拉链法”, 另外, 在 1.8 版本中为了防止拉链过长, 当链表的长度大于
8 (同时还需要满足数组的长度达到了 64, 否则只会进行数组的扩容, 进行重新调整) 的时候会将链表转换成红黑树。table 数组中的每个元素实际上是单链表的头结点或者红黑树的根节点。

上面的新增数据的流程大体如下:

第一步: 计算 key 的 hash 值

当插入键值对时, 首先应该定位到要插入的桶, 即插入 table 数组的索引 i 处。那么, 怎样计算得出索引 i 呢? 在 ConcurrentHash 通过 spread 方法就能得到 key 对应的 hash 值。

spread() 方法 主要是将 key 的 hashCode 和 key 的高 16 位进行异或运算, 这样不仅能够使得 hash 值能够分散, 均匀减小 hash 冲突的概率, 另外只用到了异或运算,
在性能开销上也能兼顾, 做到权衡。

第二步: table 的初始化
判断当前的存放数据的 table 是否为空或者长度为 0, 如果是的话, 调用 initTable() 方法进行初始化, 方法的介绍可以看上面的。

第三步: 能否直接将新值插入到 table 数组中

通过公式 通过当前 key 计算出来的 hash 值 & (当前存储数据的数组的长度 - 1), 得到当前的键值对应该存储在数组的哪个位置, 这个计算公式在数组长度确保为 2 的 n 次方下,
相当于 hash 对数组的长度取模, 既 hash % n

得到当前键值对待插入的位置, 再通过 tabAt() 方法获取该位置的节点, 如果节点为 null 的话, 就可以直接用 casTabAt() 方法将新值插入即可。

第四步: 当前是否正在扩容, 协助扩容
如果键值对待插入的位置的节点不为 null, 且该节点为特殊节点 (ForwardingNode) 的话, 就说明当前 ConcurrentHashMap 正在进行扩容操作。
怎样确定当前的这个 Node 是不是特殊的节点了? 是通过判断该节点的 hash 值是不是等于 -1(MOVED)。如果是说明当前正在扩容中, 则会协助扩容。

helpTransfer 协助扩容这里涉及到扩容的一些知识, 所以将其放到后面的扩容一起讲解。

第五步: 当前为链表, 在链表中插入新的键值对, 或者存在 key 相同的, 进行值替换
在 table[i] 不为 null 并且不为 ForwardingNode 时, 并且当前 Node f 的 hash 值大于 0 (fh >= 0) 的话说明当前节点 f 为当前桶的所有的节点组成的链表的头结点。
那么接下来, 要想向 ConcurrentHashMap 插入新值的话就是向这个链表插入新值就行了。通过 synchronized(f) 的方式进行加锁以实现线程安全性。
只对数组中这个位置的链表进行加锁, 而不是整个数组加锁, 提高了并发性。

第六步: 当 table[i] 为红黑树的根节点, 在红黑树中插入新值
按照之前的数组 + 链表的设计方案, 这里存在一个问题, 即使负载因子和 Hash 算法设计的再合理, 也免不了会出现拉链过长的情况, 一旦出现拉链过长, 甚至在极端情况下,
查找一个节点会出现时间复杂度为 O(n) 的情况, 则会严重影响 ConcurrentHashMap 的性能, 于是, 在 JDK 1.8 版本中, 对数据结构做了进一步的优化, 引入了红黑树。
而当链表长度太长 (默认超过 8) 时, 链表就转换为红黑树, 利用红黑树快速增删改查的特点提高 ConcurrentHashMap 的性能。

通过 f instanceof TreeBin 判断当前 table[i] 是否是树节点, 这下也正好验证了我们在最上面介绍时说的 TreeBin 会对 TreeNode 做进一步封装,
对红黑树进行操作的时候针对的是 TreeBin 而不是 TreeNode。
如果在红黑树中存在于待插入键值对的 Key 相同 (hash 值相等并且 equals 方法判断为 true) 的节点的话, 就覆盖旧值, 否则就向红黑树追加新节点

第七步: 插入完键值对后再根据实际大小看是否需要转换成红黑树
如果当前链表节点个数大于等于 8 (TREEIFY_THRESHOLD) 的时候, 就会调用 treeifyBin 方法将 tabel[i] (第 i 个散列桶) 拉链转换成红黑树。

第八步 添加元素计数, 并在 binCount 大于 0 时检查是否需要扩容
每次新增的时候都会对当前的数组的长度做一个检查, 超过了临界值, 就会进行扩容, 相关的方法 addCount

同样的, addCount 涉及到了扩容方面的知识, 将其也放到扩容的时候一起讲。

整体的流程是这样的:

  1. 首先对于每一个放入的值, 首先利用 spread 方法对 key 的 hashcode 进行一次 hash 计算, 由此来确定这个 key 应该存放在 table 中的位置
  2. 如果当前 table 数组还未初始化, 先将 table 数组进行初始化操作
  3. 如果要存放的位置是 null 的, 那么使用 cas 操作直接放入
  4. 如果这个位置存在结点, 说明发生了 hash 碰撞, 首先判断这个节点的类型。如果该节点 fh == MOVED (代表 forwardingNode, 数组正在进行扩容) 的话, 说明正在进行扩容, 协助扩容, 扩容结束后, 重新判断
  5. 如果是链表节点 (fh > 0), 则得到的结点就是 hash 值相同的节点组成的链表的头节点。需要依次向后遍历确定这个新加入的值所在位置。如果遇到 key 相同的节点, 则只需要覆盖该结点的 value 值即可。否则依次向后遍历, 直到链表尾插入这个结点。
  6. 如果这个节点的类型是 TreeBin 的话, 直接调用红黑树的插入方法进行插入新的节点
  7. 插入完节点之后再次检查链表长度, 如果长度大于 8, 就把这个链表转换成红黑树
  8. 对当前容量大小进行检查, 判断是否需要扩容

3.4 ConcurrentHashMap 获取数据 - get() 方法

看完了 put 方法再来看 get 方法就很容易了, 用逆向思维去看就好, 怎么存, 反过来这么取就好了, get 方法的源码:

public class ConcurrentHashMap<K,V> {

    public V get(Object key) {

        Node<K,V>[] tab; 
        Node<K,V> e, p; 
        int n, eh; K ek;

        // 得到 key 的 hash 值
        int h = spread(key.hashCode());

        // table 数组不为空同时长度大于 0, 计算得到的位置不为 null
        if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {

            // 指定位置的 hash key 都相同, 直接在头节点找到了, 直接返回这个节点值
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }        
            // 节点的 hash 为 0 说明为树节点, 在红黑树中查找即可
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;

            // 依次遍历链表的每一个节点
            while ((e = e.next) != null) {
                // hash 和 key 都相同, 返回这个节点 value 
                if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    // 找到了直接返回
                    return e.val;
            }    
        }
        return null;
    }
}

整理后的流程大体如下:

  1. 计算出 key 的 hash
  2. 通过 hash 得到对应位置的节点, 如果为 null, 直接返回
  3. 判断对应位置的节点的 hash 和 key 是否一致, 如果是的话, 返回这个节点的值
  4. 判断这个节点的 hash 小于 0, 说明这个为树节点, 在树中进行查找
  5. 到了这一步, 说明这个节点为链表, 依次遍历这个链表的节点, 进行查找
  6. 如果都没有的的, 直接返回 null

3.5 ConcurrentHashMap 的扩容

当 ConcurrentHashMap 容量不足的时候, 需要对 table 进行扩容。这个方法的基本思想跟 HashMap 是很像的, 但是由于它是支持并发扩容的, 所以要复杂的多。
原因是它支持多线程进行扩容操作, 而并没有加锁。这样做的目的不仅仅是为了满足 concurrent 的要求, 而是希望利用并发处理去减少扩容带来的时间影响。

3.5.1 扩容标识的获取 - resizeStamp() 方法
public class ConcurrentHashMap<K,V> {

    /**
     * 返回一个标志位用于调整 table 的大小
     * 这个数左移 RESIZE_STAMP_SHIFT, 也就是 16 位, 必须后能得到一个负数
     */
    static final int resizeStamp(int n) {

        // Interger.numberOfLeadingZero 的作用: 返回整型 n 非零最高位前面 0 的个数
        // 比如: 13 的二进制为: 00000000 00000000 00000000 00001101 , 从右往左, 最后一个非零的前面有 28 个 0, 所以这里等于 28

        // RESIZE_STAMP_BITS = 16 , 所以 1 << (RESIZE_STAMP_BITS - 1) 固定为 32768, 二进制表达为 00000000 00000000 10000000 00000000

        // 这里的 |, 或操作, 2 位只要有一个是 1 就是 1 了, 在这里可以简单的看出是 2 个数相加
        return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
    }
}

这个方法看起来就一行代码, 但是拆成三部分来看。

Integer.numberOfLeadingZeros(n)
这个方法的作用: 返回一个整形, 从左往右到第一个非 0 位的位数, 比如 13 的二进制为 00000000 00000000 00000000 00001101, 从左往右算到第一个 1 时, 总共有 28 位, 所以这个返回 28。

因为 int 为 32 位, 那么经过这个方法计算最终得到的结果最大为 32 (输入了 0), 最小当然是 0 (输入了 -1), 所以这个方法返回值在 0 - 32 之间。返回值的二进制为 00000000 00000000 00000000 00XXXXXX

1 << (RESIZE_STAMP_BITS - 1)

RESIZE_STAMP_BITS 是一个常量, 等于 16, 1 << (16 - 1) = 32768, 二进制为: 00000000 00000000 10000000 00000000, 刚好得到了一个第 16 位为 1 的数

| (或操作)
或运算, 这里可以看做是 2 个数相加, 将上面 2 步的结果进行或运算后, 可以得到一个二进制: 00000000 00000000 10000000 00XXXXXX

这个方法的作用是返回一个扩容标志符, 这个标志符经过左移 16 位后, 必须是一个负数。这个标志位主要用于后面的扩容。

通过这个方法得到的数字, 左移 16 位后, 为 10000000 00XXXXXX 00000000 00000000, 将这个数分成 2 部分。

高 16 位表示当前在扩容中, 而且保存了当前 table 的长度。
调用 resizeStamp() 方法, 传过来的 n = 当前 table 的长度, 而 table 的长度是 2^n 次方, 2^n 在二进制的形式为从 01 {n 个 0} 的形式, 结合 Interge.numberOfLeadingZeros 就可以知道当前 table 的长度了。
而低 16 位用在后面的记录当前参与扩容的线程数。每有一个线程参加了扩容就加 1, 所以最多参与扩容的线程个数就是 00000000 00000000 11111111 11111111 既 2^17 - 1 个线程。

先记住他的含义, 后面扩容的时候, 就能了解他的神奇之处。

3.5.2 扩容的判断 - addCount() 方法

在每次 put 新值的时候, 存放成功后, 都会执行一次是否需要扩容的检测, 如果需要就会调用扩容, 扩容的检测的方法就是 addCount() 方法

public class ConcurrentHashMap<K,V> {

    /**
     * 这个方法的作用: 更新当前的键值对的个数, 同时根据个数判断是否需要扩容
     * 不存在并发的情况下, 键值对的个数存放在 baseCount 下
     * 但是当出现了并发的时候, 键值对的个数的统计是通过数组 counterCells 每一项的 value 的累加
     *
     * @param x  增加的个数
     * @param check 检查模式,   check < 0 不进行扩容的的检测, check >= 0 进行扩容检测, 但是出现了并发时,  0<= check <= 1, 不进行扩容检测
     */
    private final void addCount(long x, int check) {

        // CounterCell 是 ConcurrentHashMap 中的一个内部类
        // 整个类只有一个 long value 的属性
        // 主要用于并发时, ConcurrentHashMap 的数据个数的计算
        CounterCell[] as; 

        long b, s;

        if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            
            // 1. 用于统计数组个数的 counterCells 数组已经不为 null 了
            // 2. 通过 cas 将当前直接存储 ConcurrentHashMap 元素个数的 bseCount + x, 失败 (baseCount 存放的是当前 ConcurrentHashMap 中存放的数据个数)
            
            // 这 2 个情景都可以确定当前的增加元素个数进入了并发情景了

            CounterCell a; 
            long v; 
            int m;

            // 线程争用的状态标记, 默认为无竞争
            boolean uncontended = true;

            // ThreadLocalRandom.getProbe() 随机产生一个 int 的数字, 
            // 下面的 m = ConcurrentHashMap 的 CounterCell[] counterCells 数组的长度, 默认声明是为 2, 后面扩容也是在原来的基础上 * 2, 所以 counterCell 的长度也是 2 的 n 次方
            // 所以通过 ThreadLocalRandom.getProbe() & m, 就是从  CounterCell[] counterCells 中随机去一个位置

            if (as == null || (m = as.length - 1) < 0 || (a = cs[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)))) {
                
                // 1. ConcurrentHashMap 的 CounterCell[] counterCells 为空或者长度小于 0
                // 2. 从 CounterCell[] counterCells 随机取出一个位置为 null
                // 3. 从 CounterCell[] counterCells 随机取出一个位置不为 null, 通过 cas 将这个位置的 CounterCell 的 value + x 失败了

                // 这里只有是给当前的 ConcurrentHashMap 的元素个数加上 x, 通过 fullAddCount() 方法实现的
                // 这个方法的复杂度很高, 就不展开了, 说一下大体的功能

                // 先通过 ThreadLocalRandom.getProbe() 当前线程的用于产生随机数的随机种子
                // 同一个线程一直调用这个方法会返回通过一个值, 调用 advanceProbe() 方法进行刷新, 再次调用 getProbe() 可以得到另一个新的随机种子

                // 下面的整个操作的过程都是自旋的

                // 1. 当前的 counterCells 有数据
                // 1.1 通过获取到随机种子, 算出在 counterCells 中的一个位置
                // 1.2 获取到的位置不为 null, 通过 cas 给这个位置的 CounterCell 的 value + x
                // 1.2.1 cas 设置成功了, 结束整个方法
                // 1.2.2 cas 设置失败了, 给 counterCells 进行扩容, 变为原来的 2 倍, 调用 ThreadLocalRandom.advanceProbe() 获取新的随机种子, 回到自旋循环的开头

                // 1.3 获取到的位置为 null, 创建一个新的 CounterCell, value 为 x
                // 1.3.1 在判断这个位置为 null, 将新的 CounterCell 放到这个位置, 结束
                // 1.3.3 在判断这个位置不为 null, 回到自旋循环的开头

                // 2. counterCells 没有数据, 通过 cas 将 baseCount 加 x

                // 在 fullAddCount 中实际的实现比上面的复杂, 会通过 cas cellsBusy 从 0 设置为 1, 设置成功, 才能进行上面的操作, 设置失败都只能自旋
                fullAddCount(x, uncontended);

                // 注意这里的 return 有个问题, 通过 fullAddCount() 给当前的 ConcurrentHashMap 的元素个数 + 1, 为什么不检测是否需要扩容呢?

                // 这里其实需要从多线程的角度考虑了, 能够进到这里, 有哪些情况? 结合上面 2 个 if

                // 1. counterCells 不为 null,  并且通过 cas 更新 counterCells 的计算出来的位置的值失败, 那么就是有另一个线程更新了这个值, 扩容的检测交给那个线程就行了
                // 2. counterCells 为 null, 通过 cas 更新 baseCount 失败了, 进到这里很大概率 as 为空, 同样的是交给另一个线程进行扩容的检测
                return;
            }

            // 增加元素个数进入了并发情景了
            // 不需要进行扩容的检测
            if (check <= 1)
                return;
            // 统计总个数
            // baseCount + CounterCell[] counterCells 中每个不为 null 的元素的 value  
            s = sumCount();
        }

        // 进行扩容的检测
        if (check >= 0) {

            Node<K,V>[] tab, nt; int n, sc;

            // 当前的元素个数 > 阈值 并且 table 不为空, 同时 table 的长度小于最大容量 2^30, 还有扩容空间
            while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {

                //根据数组长度获取一个扩容标志, 格式 16 个 0 + 1 + 15 位 (表示当前 table 容量的二进制表示从左往右 0 的个数)
                int rs = resizeStamp(n);

                // 在上面我们说过了 sizeCtl 的不同取值有不同的含义, 
                // 小于 0, 初始中或者有线程在扩容中, 但是在 addCount 方法可以断定为扩容中
                if (sc < 0) {

                    // 1. sc 无符号右移 16 位和扩容标识 rs 不相同, 将不做处理, 
                    // 出现不同的原因就是 table 的长度改变了, 导致计算出来的 rs 变了 也就是上次的扩容完成了, 不进行处理
                    // 当第一个线程进行扩容的时候, 会把 sizeCtl 设置为计算出来的 rs 右移 16 位后 + 2, 
                    // 所以当改变后的 sizeCtl 左移 16 位和当前的 rs 不一致了, 说明 table 的长度已经变了
                    // 如果 rs 从 + 1, 是无法区分 sizeCtl 低 16 位都是 0 时, 是扩容为开始还是扩容已经结束
                    // 如果直接 + 2, 后续减少时 -1, 那么低 16 位都是 0 表示扩容未开始, 低 16 位为 1, 扩容结束


                    // 2. sc == rs + 1 这里是一个 bug, 应该改为 sc == (rs <<< RESIZE_STAMP_SHIFT) + 1
                    // sc 在经过条件 1 后, 值是不会变的, 那么 sc 依旧是小于 0, 而 rs 在我们分析后必定是一个大于 0 的数, 所以 sc 一定不会等于 rs + 1, 应该是 sc == (rs <<< RESIZE_STAMP_SHIFT) + 1
                    // 这个 bug 可以查看 这里 https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427, 预计在 JDK 12 修复, 在 14 的时候已经看到修复了

                    // 如果修改为上面的, 我们知道 sc 的低位存放的是当前线程参与扩容的线程数, 默认是从 2 开始累积的, 在扩容 transfer 中,
                    // 每有一个线程扩容完成时会 - 1, 当 sc 的低位减到了 1 时, 说明这时候扩容已经完成了, 而 rs 无符号左移 16 位后, 低 16 位都是 0, 
                    // 如果 sc 的低位 = 右移后的 rs 的低位 + 1, 说明扩容完成了, 正在进行末尾的处理了, 不进行处理

                    // 3. 同上面的条件 2 一样, 应该修改为 sc == (rs <<< RESIZE_STAMP_SHIFT) + MAX_RESIZERS, 这里的 MAX_RESIZERS = 1 << 16 - 1; 
                    // 也就是 00000000 00000000 11111111 11111111, 也就是我们扩容能支持的最大线程数, 这里也就是我们当前扩容的线程数达到了上限, 不进行处理

                    // 4. 在扩容的时候, 我们会先声明 nextTable, 逐渐将数据放到这个对象, 如果这个对象为 null, 说明没有线程在进行扩容, 不进行处理, 在上面的 size < 0, 我们可以得知线程进来是有线程在扩容, 现在 nextTable 为空, 说明扩容完成了

                    // 5. 转移索引标识 transferIndex <= 0, 不进行处理, 这个变量用于扩容时控制, 可以查看下面的扩容方法, <= 0 说明我们的扩容已经完成了

                    if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0)
                        break;

                    // 在 sizeCtl 添加多一个线程, 开始协助扩容, 这时候扩容的新数组就是 nextTable
                    if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
                        transfer(tab, nt);    

                } 
                // 尝试将 sizeCtl 设置为 扩容标志 rs 左移 16 位 + 2, 2 表示线程数默认是从 2 开始累积的
                else if (U.compareAndSetInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
                    // cas 将 sizeCtl 设置成功, 开始扩容
                    //  进行扩容, 存放数据的新数组为 null,
                    transfer(tab, null);
                
                // 重新计算元素个数
                s = sumCount();
            }
        }
    }
}

总的来说: addCount 就做了 2 件事

  1. 更新当前的元素个数
  2. 检查是否需要扩容或者协助扩容
3.5.3 扩容 - transfer() 方法
public class ConcurrentHashMap<K,V> {

    /**
     * 扩容
     * 大体的步骤:
     * 通过当前机器的 cpu 和 当前数组的容量算出一个迁移跨度  stride
     * 每次一个线程进来扩容的话,就会分配当前数组的 stride 个位置给它, 另一线程进来或者当前线程 stride 个位置已经迁移完了, 
     * 就再分配另外 stride 个位置给他, 直到当前数组所有的位置都分配完成
     * ConcurrentHashMap 的  transferIndex 扩容时默认为当前数组的容量, 类似于一个游标, 每次分配是从这个游标开始往前 stride 个位置就是分配个
     * 这个线程的区间了。
     * 里面声明的 2 个变量 i 和 boud, 就是当前线程可以处理的区间的位置 [bound, i], [transferIndex - 1 - stride, transferIndex - 1]
     * 同时把 transferIndex 更新为 transferIndex - stride
     * 
     * 遍历中的操作
     * 1. 获取到的位置为空, 填充为 ForwardingNode 节点, 处理下一个节点
     * 2. 获取到的位置的节点为 ForwardingNode, 跳过处理下一个节点
     * 3. 判断当前位置的节点是否为链表或者树
     * 3.1 链表, 新找到尾节点, 尾节点的 hash & 当前数组的容量 == 0 ? 放到低纬链表的头部, 放到高纬链表的头部, 使用同 HashMap 的高低位链表的方式进行迁移
     * 3.2 遍历当前位置的链表直到倒数第二个, 处理的节点的 hash & 数组的长度 = 0? 应该放到低纬, 应该放到高纬度
     * 3.3 把当前的节点封装为新的节点, 这个节点的下一个节点为当前的高纬/低纬节点, 然后把当前节点放到低纬/高纬的位置
     * 3.4 最后把临时数组 nextTab 的 i 位置设置为当前的低纬链表, i + 当前数组的容量的位置设置为当期的高纬链表
     * 3.5 把当前数组的 i 位置设置为 ForwardingNode 节点, 下一个遍历
     *
     * 迁移完成
     * 把临时数据 nextTable 赋值给真正的存储数组 table
     * 把 nextTable 设置为 null 
     * 把 sizeCtl 设置为 旧数组容量的 * 1.5 
     * 
     * 没法迁移了, 就给当前的 sizeCtl 减  1
     *
     * @param tab 扩容前的数组
     * @param nextTab 新的数组, 扩容的话, 这里为 null, 协助扩容的为, 为新的数组
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {

        // 这里的 tab 就是成员变量 table
        int n = tab.length, stride;

        // MIN_TRANSFER_STRIDE = 16   NCPU = 当前服务器的 CPU 核数
        // n 为数组的长度, 默认为 2 的 n 次方, 所以这里的 >>> 3, 相当于除以 8
        // 当前为多核服务器的话 stride = 当前的容量 / 8 / CPU 的数量
        // 如果为单核的话的话, stride = 当前数组容量
        // 最终计算出来的 stride 如果小于 16, 将其变为 16
        if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
            stride = MIN_TRANSFER_STRIDE;

        
        // 如果用于存数据的下一个数组为 null, 进行初始, 然后把这个数组赋值给变量 nextTable,
        if (nextTab == null) {
            try {

                // 扩容为原来的 2 倍
                @SuppressWarnings("unchecked")
                Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
                nextTab = nt;

            } catch (Throwable ex) {
                // 扩容失败, 直接将阈值变为 int 的最大值, 然后返回
                sizeCtl = Integer.MAX_VALUE;
                return;
            }

            // 将创建出来的扩容数组赋给给 nextTable, 可以让其他线程看见
            nextTable = nextTab;

            // 扩容索引变为扩容前数组的长度
            transferIndex = n;
        }   

        // 记录过渡数组的长度
        int nextn = nextTab.length; 

        // 创建一个 ForwardingNode 节点, 用于占位。当别的线程发现这个槽位中是 ForwardingNode 类型的节点, 则跳过这个节点
        ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

        // 该变量控制迁移的进行 true 说明可以再次迁移一个下标 (i--) , 
        // 反之, 如果是 false, 那么就不能推进下标, 需要将当前的下标处理完毕才能继续推进
        boolean advance = true;

        // 完成状态, 如果是 true, 说明迁移完成, 可以结束此方法
        boolean finishing = false;

        // 下面的循环主要是
        // 1. 为了计算出当前线程需要处理的区间 [transferIndex - stride, transferIndex], 即 [bound (transferIndex - stride), i(transferIndex - 1)], 
        // 同时更新 transferIndex = transferIndex - stride, 方便后续的线程或者线程重新获取的迁移的区间
        // 2. 遍历中, 负责进行 i - 1 操作, 然后决定是继续遍历还是进行下一次分配

        // i 表示处理的当前桶区间最大下标, bound 表示当前线程可以处理的当前桶区间最小下标
        for (int i = 0, bound = 0;;) {
            
            Node<K,V> f; 
            int fh;
            // 当前线程是否可以向后推进, 这个循环就是控制 i 递减, 同时, 每个线程都会进入这里取得自己需要转移的桶的区间
            while (advance) {
                int nextIndex, nextBound;

                // 对 i 减一, 判断是否大于等于 bound (正常情况下, 如果大于等于 bound 不成立, 说明该线程上次领取的任务已经完成了那么, 需要在下面继续领取任务)
                // 如果对 i 减一, 大于等于 bound (还需要继续做任务) , 或者迁移完成了, 修改推进状态为 false, 不能推进了。 每迁移完成一个位置, 推进状态会被修改为 true
                // 通常, 第一次进入循环, i-- 这个判断会无法通过, 从而走下面的 nextIndex 赋值操作 (获取最新的转移下标) 

                //其余情况都是: 如果可以推进, 将 i 减一, 然后修改成不可推进。如果 i 对应的位置处理成功了, 又把推进状态修改成可以推进。
                if (--i >= bound || finishing)
                    advance = false;
                else if ((nextIndex = transferIndex) <= 0) {
                    // 如果 transferIndex 小于等于 0, 说明没有区间了, i 改成 -1, 推进状态变成 false, 不再推进, 表示, 扩容结束了, 当前线程可以退出了
                    i = -1;
                    advance = false;
                } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { 
                    
                    // 通过 cas 减少 transferIndex (transferIndex - stride 跨度) 的值成功了, 为当前线程重新分配新的扩容区间

                    // 这里的目的是: 
                    // 1. 当一个线程进入时, 使其可以获取最新的转移下标
                    // 2. 当一个线程处理完自己的区间时, 如果还有剩余区间的没有别的线程处理, 再次获取区间

                    // 这个值就是当前线程可以处理的最小当前区间最小下标
                    bound = nextBound;
                    // 初次对 i 赋值, 这个就是当前线程可以处理的当前区间的最大下标
                    i = nextIndex - 1;
                    advance = false;
                }
                
            }

            // i < 0 (当 transferIndex <= 0 时, 数组已经分配完成了, 线程进入到上面的 while, 使得 i = -1, 然后走到这个方法, 让线程结束扩容 )
            // i >= 旧数组的长度
            // i + n >= 新数组的长度
            if (i < 0 || i >= n || i + n >= nextn) {

                int sc;
                // 如果完成了扩容
                if (finishing) {
                    // 成员变量设置为 null
                    nextTable = null;
                    // 将迁移完成的数组赋值给 table
                    table = nextTab;
                    // sizeCtl 变为 1.5 倍 (2n - 0.5n)
                    sizeCtl = (n << 1) - (n >>> 1);
                    return;
                }

                // 通过 cas 设置 sizeCtl 的扩容线程数减 1
                if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
                    // 一开始进入扩容时 sc + 2, 这里 - 2 等于扩容标识, 说明当前扩容完成了, 只剩一个线程了
                    // 让这个线程进行扩容完成后的处理
                    if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
                        return;
                    // 更新结束标志为 true, 推进标志为 true
                    finishing = advance = true;
                    // 将 i 重新设置为 n, 让上面的 --i >= bound 为 true, 可以进入后面 finish 的判断
                    i = n;
                }  

            }                  
            // 扩容时发现负责的区域有空的桶直接使用 ForwardingNode 填充
            else if ((f = tabAt(tab, i)) == null)
                advance = casTabAt(tab, i, null, fwd);
            // 对应位置的节点为 -1 (MOVED) 已经处理了, 继续下一个位置的处理   
            else if ((fh = f.hash) == MOVED)
                advance = true;
            else {    
                // 对 table 的 i 位置的节点上锁, 防止 putVal 的时候向链表插入新的数据
                synchronized (f) {
                    // 做一次判断, 确保对应位置的对象为需要处理的对象
                    if (tabAt(tab, i) == f) {
                        
                        // 此处扩容和 HashMap 有点像, 在原数组的 pos 位置的节点, 在扩容后, 会在新数组的 pos 位置 或者 pos + 原数组长度的位置
                        // 前者为低纬  lowNode, 后者为高纬 highNode
                        Node<K,V> ln, hn;

                        
                        // 如果 f 的 hash 值大于 0, 表明这个节点为链表
                        if (fh >= 0) {
                            // 节点的 hash  & 旧数组的长度
                            int runBit = fh & n;

                            // 存储的是链表的尾结点
                            Node<K,V> lastRun = f;

                            // 遍历链表, 找到最后链表中最后的一个
                            for (Node<K,V> p = f.next; p != null; p = p.next) {

                                int b = p.hash & n;
                                // hash 和 runBit 不一样了, 进行替换
                                if (b != runBit) {
                                    runBit = b;
                                    lastRun = p;
                                }
                            }
                            // 尾结点的 hash 处理后为 0, 将尾节点放在低纬链表
                            if (runBit == 0) {
                                ln = lastRun;
                                hn = null;
                            }
                            // 尾结点的 hash 处理后不为 0, 将尾结点放在高纬链表
                            else {
                                hn = lastRun;
                                ln = null;
                            }

                            // 没有上面的操作, 直接遍历链表也完成功能, 只是上面的都是做了一些优化

                            // 比如 原来数组的链表为 1(低纬) - 2(高纬) - 3(高纬) - 4(低纬) - 5(低纬) - 6(低纬) - 7(低纬)
                            // 那么经过上面的处理, lastRun = 4(低纬)
                            // ln = 4(低纬) - 5(低纬) - 6(低纬) - 7(低纬)

                            // 因为 lastRun 后面的节点必定和 lastRun 在同一个位置, 那么我们在迁移时, 只需要遍历到 lastRun 这个位置就行了
                            // 后面的都是位置和 lastRun 一样, 直接把 lastRun 移过来就行了, 后面的节点都在 lastRun 后面, 不用处理了

                            // 注意了, 这里是头插法, 也就是我们直接遍历链表, 把节点直接放到lastRun 的前面就行了

                            // 重新遍历链表, 以 lastRun 为结束标志
                            for (Node<K,V> p = f; p != lastRun; p = p.next) {
                                int ph = p.hash; K pk = p.key; V pv = p.val;

                                // 可以看出这里是头插法
                                if ((ph & n) == 0)
                                    ln = new Node<K,V>(ph, pk, pv, ln);
                                else
                                    hn = new Node<K,V>(ph, pk, pv, hn);
                            }

                            // 在 nextTable 的 i 位置上插入一个链表
                            setTabAt(nextTab, i, ln);

                            // 在 nextTable 的 i + 旧数组长度的位置上插入一个链表
                            setTabAt(nextTab, i + n, hn);

                            // 在 table 的 i 位置上插入 forwardNode 节点
                            setTabAt(tab, i, fwd);

                            // 设置推进标志为 true
                            advance = true;

                        } else if (f instanceof TreeBin) { 
                            // 树节点, 处理
                            TreeBin<K,V> t = (TreeBin<K,V>)f;
                            TreeNode<K,V> lo = null, loTail = null;
                            TreeNode<K,V> hi = null, hiTail = null;
                            int lc = 0, hc = 0;

                            // 遍历树节点
                            for (Node<K,V> e = t.first; e != null; e = e.next) {
                                int h = e.hash;
                                TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);

                                // 和链表相同的判断, 与运算 == 0 的放在低位
                                if ((h & n) == 0) {
                                    if ((p.prev = loTail) == null)
                                        lo = p;
                                    else
                                        loTail.next = p;
                                    loTail = p;
                                    ++lc;
                                } else {
                                    // 不是 0 的放在高位
                                    if ((p.prev = hiTail) == null)
                                        hi = p;
                                    else
                                        hiTail.next = p;
                                    hiTail = p;
                                    ++hc;
                                }
                            }

                            // 如果树的节点数小于等于 6, 那么转成链表, 反之, 创建一个新的树
                            ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
                                (hc != 0) ? new TreeBin<K,V>(lo) : t;
                            hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
                                (lc != 0) ? new TreeBin<K,V>(hi) : t;

                            // 低位树
                            setTabAt(nextTab, i, ln);
                            // 高位数
                            setTabAt(nextTab, i + n, hn);
                            // 旧的设置成占位符
                            setTabAt(tab, i, fwd);
                            // 继续向后推进
                            advance = true;
                        }
                    }
                }

            }

        }
    } 
}

第一部分 构建一个 nextTable, 它的容量是原来的两倍, 这个操作是单线程完成的。新建 table 数组的代码为: Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1], 在原容量大小的基础上右移一位

第二部分就是将原来 table 中的元素复制到 nextTable 中, 主要是遍历复制的过程。 根据运算得到当前遍历的数组的位置 i, 然后利用 tabAt 方法获得i位置的元素再进行判断

  1. 如果这个位置为空, 就在原 table 中的 i 位置放入 forwardNode 节点, 这个也是触发并发扩容的关键点
  2. 如果这个位置是 Node 节点 (fh >= 0) , 如果它是一个链表的头节点, 就构造一个反序链表, 把他们分别放在 nextTable 的 i 和 i + n 的位置上
  3. 如果这个位置是 TreeBin 节点 (fh < 0) , 也做一个反序处理, 并且判断是否需要 untreefi, 把处理的结果分别放在 nextTable 的 i 和 i + n 的位置上
  4. 遍历过所有的节点以后就完成了复制工作, 这时让 nextTable 作为新的 table, 并且更新 sizeCtl 为新容量的 0.75 倍, 完成扩容
3.5.4 协助扩容 - helpTransfer() 方法

当 ConcurrentHashMap 正在扩容中, 有其他的线程计划向里面添加数据时, 因为正在扩容中, 无法添加。
但是 ConcurrentHashMap 的设计是不会让这个线程阻塞, 而是让这个线程帮忙扩容, 既执行 helpTransfer() 方法。

public class ConcurrentHashMap<K,V> {

    /**
     * @param tab 扩容的数组, 这里看成存放数据的 table 就对了
     * @param f 当前处于扩容中的节点
     */
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {

        Node<K,V>[] nextTab; int sc;


        // 1. 当前的 tab(也就是 table) 不为 null,
        // 2. 当前的节点为 ForwardingNode
        // 3. 当前 ForwardingNode 中用于临时存放扩容时, 数据的 nextTable 不为空
        // 三个条件都满足了, 会进入协助扩容
        if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {


            int rs = resizeStamp(tab.length);

            while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {

                // 这里的逻辑和 addCount 里面的差不多, 就不做解释了
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;

                // 通过 CAS 操作, 设置当前的扩容线程 + 1, 
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    // 设置成功了, 进入协助扩容
                    transfer(tab, nextTab);
                    break;
                }       

            }
            return nextTab;
        }
        return table;
    }
}

4 ConcurrentHashMap 与 size 相关的一些方法

对于 ConcurrentHashMap 来说, 这个 table 里到底装了多少东西其实是个不确定的数量, 因为不可能在调用 size() 方法的时候像 GC 的 “stop the world” 一样让其他线程都停下来让你去统计,
因此只能说这个数量是个估计值。对于这个估计值, ConcurrentHashMap 也是大费周章才计算出来的。

为了统计元素个数, ConcurrentHashMap 定义了一些变量和一个内部类


public class ConcurrentHashMap<K,V> {

    /**
     *  Contended 注解可以用来解决伪共享
     */
    @sun.misc.Contended 
    static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

    /** 当出现了并发的时候, 整个 ConcurrentHashMap 的元素个数, 是分散存在这个数组内*/
    private transient volatile CounterCell[] counterCells;

    /** 创建 CounterCells 是充当自旋锁使用 */
    private transient volatile int cellsBusy;

    /** 实际保存的是 Map 中的元素个数  利用 CAS 锁进行更新, 但是当出现了并发情况, 元素的个数就不从这个进行获取了*/
    private transient volatile long baseCount;
}

size() 方法

public class ConcurrentHashMap<K,V> {

    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
    }

    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        long sum = baseCount;
        
        /** counterCells 不为 null, 就是出现了并发情况, 这个对象被声明了 */
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;//所有counter的值求和
            }
        }
        return sum;
    }

    /**
     * mappingCount 与 size 方法的类似 
     * 官方的给出的注释来看, 应该使用 mappingCount 代替 size 方法
     */
    public long mappingCount() {
        long n = sumCount();
        return (n < 0L) ? 0L : n; 
    }
}

5 总结

JDK 1.6, 1.7 中的 ConcurrentHashmap 主要使用 Segment 来实现减小锁粒度。 分割成若干个 Segment, 在 put 的时候需要锁住 Segment, get 时候不加锁, 使用 volatile 来保证可见性。
当要统计全局时 (比如 size) , 首先会尝试多次计算 modcount 来确定, 这几次尝试中, 是否有其他线程进行了修改操作, 如果没有, 则直接返回 size。如果有, 则需要依次锁住所有的 Segment 来计算。

1.8 之前 put 定位节点时要先定位到具体的 segment, 然后再在 segment 中定位到具体的桶。而在 1.8 的时候摒弃了 segment 臃肿的设计, 直接针对的是 Node[] table 数组中的每一个桶, 进一步减小了锁粒度。
并且防止拉链过长导致性能下降, 当链表长度大于 8 的时候采用红黑树的设计。

主要设计上的变化有以下几点:

  1. 放弃采用 segment 而采用 node, 锁住 node 来实现减小锁粒度
  2. 设计了 MOVED 状态, 当 resize 的过程中, 线程 2 在 put 数据, 线程 2 会帮助 resize
  3. 使用 3 个 CAS 操作来确保 node 的一些操作的原子性, 这种方式代替了锁。
  4. sizeCtl 的不同值来代表不同含义, 起到了控制的作用
  5. 采用 synchronized 而不是 ReentrantLock

更多关于 1.7 版本与 1.8 版本的 ConcurrentHashMap 的实现对比, 可以参考这篇文章

6 参考

ConcurrentHashmap简介
ConcurrentHashMap源码阅读
How does ConcurrentHashMap resizeStamp method work

文章来源:https://blog.csdn.net/LCN29/article/details/135108797
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。