ConcurrentHashMap 是 HashMap 的一个线程安全并且支持高并发的版本,之前的文章《Java 容器之 HashMap》从 HashMap 的实现原理到各个方法的使用,以及每个方法各自的具体实现都分别进行了介绍,并将 JDK1.7 与 JDK1.8 中的 HashMap 进行了对比。

而本文在此基础上首先会介绍 JDK1.7 版本的 java.util.concurrent 包下的 ConcurrentHashMap 类,解释并说明其实现的原理,然后再引入 JDK1.8 版本,通过对比两个不容版本下的 ConcurrentHashMap ,说明其各自的优缺点以及随着版本升级所做的改进。如有错误,还请指正。

概述

我们知道 HashMap 不是线程安全的,在多线程并发的情况下容易发生线程安全问题,在进行扩容时也会出现死循环的问题。例如在并发的情况下,线程 1 进行了 put 操作,由于某种情况随后 sleep 了两秒,在这两秒期间,线程 2 修改了线程 1 之前 put 的值,等到线程 1 结束 sleep 后再次 get 获取原来值的时候,就有可能取到的值已经不是原来的值了,就会存在问题。

如果不用 HashMap 的话,用 HashTable 或者同步包装器 Collections.synchronizedMap(Map<K,V> m) 可以吗

之前的文章中也介绍了 HashTable 这个类,它的底层也是采用数组+链表的形式,只不过是 HashMap 的线程安全版本,即里面的方法是同步的(synchronized),当一个线程访问 HashTable 的同步方法时,其它线程如果要是也访问了该方法,则会出现阻塞或轮询等效率低下的情况。例如线程 1 使用 put 进行添加元素,线程 2 不但不能使用 put 方法,而且也不能使用 get 方法来获取元素,因此如果线程之间竞争越激烈,效率就越低。

而同步包装器也是可以保证线程安全,但是它们都是通过使用一个全局的锁来同步多个线程间的并发访问的,因此就会产生性能问题。所以,还可以使用更好的方式,即 ConcurrentHashMap。在介绍 ConcurrentHashMap 之前,还需要引入锁分段的概念。

在多线程环境下,所有访问 HashTable 的线程都必须竞争同一把锁,所以说效率不高。如果此时有多把锁,每一把锁用于锁住容器中的一部分数据,那么当多线程在访问容器里面的不同数据段的数据时,线程之间就不会存在锁竞争的问题,也就提高了多线程并发效率。

ConcurrentHashMap 就是依靠上述方式实现的,即锁分段技术。在 ConcurrentHashMap 中,首先会将数据以一段一段的方式进行存储,然后给每段的数据分配一把锁,当一个线程占用了锁并访问其中的一段数据时,其它数据段的数据也能被其它线程访问到,多个线程之间在进行访问时互不打扰。

那么 ConcurrentHashMap 是如果保证线程安全、高效并发的呢

  • 首先就是依靠上述所提到的锁分段技术,在进行写操作时,通过锁分段技术只对当前操作的段进行加锁,而不会影响其它段的数据,正是通过这种加锁以及不加锁的方案可以取保其线程的安全性;
  • 其次,是由 ConcurrentHashMap 的底层结构决定的,ConcurrentHashMap 底层是一个 Segment 数组,每个 Segment 数组中又包含多个桶,这些桶又组成了一个数组,每个桶位置上是由若干个 HashEntry 对象连接起来的链表;
  • 最后,利用 HashEntry 的不变性、volatile 变量的内存可见性以及加锁重读机制,确保了 ConcurrentHashMap 能够进行安全、高效的读操作。

整体结构:

内部实现

类的继承关系

如上图所示,ConcurrentHashMap 继承了静态类 AbstractMap,实现了 ConcurrentMap 以及 Serializable 接口:

1
2
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentMap<K,V>, Serializable {}

常量定义

相对于 HashMap,ConcurrentHashMap 中的常量也定义了数组的默认的初始化容量、负载因子等,但比 HashMap 多了许多常量,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 默认的初始化容量,如果没有在构造器中指定的话,则默认为 16.
static final int DEFAULT_INITIAL_CAPACITY = 16;

// 默认的父子啊因子,如果没有在构造器中指定的话,则默认为 0.75.
static final float DEFAULT_LOAD_FACTOR = 0.75f;

// 默认的并发级别,如果没有在构造器中指定的话,则默认为 16.
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 最大容量,如果任何一个带有参数的构造函数隐式的指定了更高的值,则将会调用它;
// 该值必须满足的范围是 2 的幂到 1<<30.
static final int MAXIMUM_CAPACITY = 1 << 30;

// 每个 segment 数组的最小容量默认是 2;
// 该值必须是 2 的幂,以避免在延迟构造后立即调整下一次使用的大小.
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// segment 数组所允许的最大数量,即 2 的 16 次方;
// 用于绑定构造函数的参数,2 的幂必须小于 1<<24.
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

// 此常量在进行 size 等跨段操作时会用到
// 详见 size() 方法部分的介绍
static final int RETRIES_BEFORE_LOCK = 2;

成员变量

由于引入了锁分段技术,在涉及到若干个由 Segment 实例组成的数组时,这里新增了两个 final 变量,用于定位段,如下所示:

1
2
3
4
5
final int segmentMask;

final int segmentShift;

final Segment<K,V>[] segments;

其中,segmentMask 表示用于定位段的掩码值,该值的计算方式是:先计算 key 的哈希值,然后利用哈希值的高位去定位段。segmentShift 表示段内索引的移位值。可以看到, ConcurrentHashMap 的底层使用的是一个 Segment 实例的数组。

内部类 Segment

被 final 修饰的 Segment 继承了 ReentrantLock 并实现了序列化,由于继承了可重入锁 ReentrantLock,因此 Segment 就具有了锁的能力。下面是部分成员变量及构造函数:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// 表示链表数组,注意:它是被 volatile 修饰的,能够保证其可见性
transient volatile HashEntry<K,V>[] table;

// 表示 Segment 中元素的个数
transient int count;

// 表示在 Segment 中可变操作的总次数
transient int modCount;

// 当 Segment 的大小超过此阈值时,将对其进行重哈希操作
transient int threshold;

// 段的负载因子
final float loadFactor;

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
        this.loadFactor = lf;
        this.threshold = threshold;
        this.table = tab;
    }

通过 Segment 类及其成员变量可以看到,由于采用了分段锁的方式,通过对不同的段分别设置一个锁,然后当某线程对当前段中的数据进行读操作的时候,其它线程可以去操作其它段中的数据,从而不会造成并发问题,但前提是,多个读写操作发生在不同的段上。

内部类 HashEntry

与 JDK1.7 中 HashMap 的结构类似,HashEntry 包含四部分:被 final 修饰的 hash 和 key,以及被 volatile 修饰的 value 和 next。由于引入了 final 关键字,则保证了 HashEntry 对象几乎是不可变的,而引入了 volatile 关键字,则保证了 value 以及 next 的修改对其它线程是可见的,确保了其它线程读到的是最新的值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
static final class HashEntry<K,V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K,V> next;

    HashEntry(int hash, K key, V value, HashEntry<K,V> next) {
        this.hash = hash;
        this.key = key;
        this.value = value;
        this.next = next;
    }

构造函数

ConcurrentHashMap 的构造函数与三个参数初始容量负载因子并发级别密切相关,在 JDK1.7 中有 5 个构造函数,如下所示:

第一个构造函数,该构造器需要给定初始化容量、负载因子以及并发等级这三个参数,这里详解一下代码:

  • 在第一个 while 循环中,用于计算合适的 sshift 和 ssize,如果 concurrencyLevel 为 16,则 sshift 和 ssize 分别为 4 和 16,则 segmentShift 为 28,segmentMask 为 15;
  • 继续往下面走,此时 c = 16 / 16 = 1,cap = 2;
  • 再往下面走,在初始化 Segment 的地方一直到最后,这里会初始化 Segment 数组 segments 并初始化 segments[0]。因此初始化的 s0 数组长度为 2,负载因子为 0.75,最后初始化了一个 Segment 数组 ss,即容量为 ssize = 16。

源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
        throw new IllegalArgumentException();
    if (concurrencyLevel > MAX_SEGMENTS)
        concurrencyLevel = MAX_SEGMENTS;
    // Find power-of-two sizes best matching arguments
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    this.segmentShift = 32 - sshift;
    this.segmentMask = ssize - 1;
    if (initialCapacity > MAXIMUM_CAPACITY)
        initialCapacity = MAXIMUM_CAPACITY;
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity)
        ++c;
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    while (cap < c)
        cap <<= 1;
    // create segments and segments[0]
    Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]);
    Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
    this.segments = ss;
}

第二个构造函数,通过指定初始化容量、负载因子,使用默认的并发级别(16)创建一个空的 ConcurrentHashMap:

1
2
3
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

第三个构造函数,通过指定初始化容量,使用默认的负载因子(0.75)和默认的并发级别(16)来创建一个空的 ConcurrentHashMap:

1
2
3
public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

第四个构造函数,使用默认的初始化容量(16)、默认的负载因子(0.75)以及默认的并发级别(16)来创建一个空的 ConcurrentHashMap:

1
2
3
public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

第五个构造函数,通过构造一个与指定 Map 具有相同映射的 ConcurrentHashMap,使其初始化容量不小于 16,负载因子是 0.75,并发级别是 16,来创建一个 ConcurrentHashMap:

1
2
3
4
5
6
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1,
                    DEFAULT_INITIAL_CAPACITY),
            DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
    putAll(m);
}

通过第一个构造函数可知,为了能通过按位与的哈希算法来定位 segments 数组的索引,必须保证 segments 数组的长度是 2 的 n 次方,所以必须计算出一个大于或等于 concurrencyLevel 的最小的 2 的 n 次方的值作为 segments 数组的长度。这里给出一张图,便于理解。

put() 方法

注意到 put 方法中的 key 和 value 均不能为 null,整体流程是先根据 key 来定位到 Segment,然后在对应的 Segment 进行 put 操作,也就是最后的 return 语句。

也就是说,在进行插入或删除元素的时候,必须先通过哈希算法定位到 Segment,即使用 Wang/Jenkins hash 的变种算法对元素的 hashCode 值再次进行哈希操作。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
public V put(K key, V value) {
    Segment<K,V> s;
    // 这里的 value 不能为空
    if (value == null)
        throw new NullPointerException();
    // 根据 key 的哈希值
    int hash = hash(key.hashCode());
    // 根据哈希值计算待插入对象在 segments 数组中的位置
    int j = (hash >>> segmentShift) & segmentMask;
    // 检查当前数组中指定位置的 Segment 是否为空
    // 如果为空,则先初始化 Segment 在进行 put,
    // 如果不为空,则直接执行 put 操作 
    if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
            (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment
        s = ensureSegment(j);
    return s.put(key, hash, value, false);
}

具体的再哈希操作:

1
2
3
4
5
6
7
8
private static int hash(int h) {
    h += (h <<  15) ^ 0xffffcd7d;
    h ^= (h >>> 10);
    h += (h <<   3);
    h ^= (h >>>  6);
    h += (h <<   2) + (h << 14);
    return h ^ (h >>> 16);
}

如此复杂的再哈希操作其实是为了减少哈希冲突,假如所有的元素都分布在一个 Segment 中,那么势必会存取缓慢。所以上面的计算方式可以使元素均匀的分布在不同的 Segment 上,从而提高容器的存取效率。

其实这里的处理方式的思想与 HashMap 中是一致的,都是为了减少哈希冲突。如果某些元素二进制的底位存在相同的情况的话,在不使用再哈希时,其计算出的结果总是一样的。而实现了上述再哈希函数的话,通过将元素的高位也参与到运算中,这样一来元素的每一位都会参与到哈希值的计算,从而就可以减少哈希冲突

注意 put 方法里面的 ensureSegment() 方法,在当前数组中指定位置的 Segment 为空时,则调用此方法。此方法接收一个给定的索引 j,返回一个 Segment 键值对对象。具体源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private Segment<K,V> ensureSegment(int k) {
    // 首先获取到 segments
    final Segment<K,V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // raw offset
    Segment<K,V> seg;
    if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) {
        // 使用 segment 0 作为原型,也就是拷贝一份与 segment 0 一样的 segment 
        Segment<K,V> proto = ss[0]; // use segment 0 as prototype
        // 容量 cap 为 2
        int cap = proto.table.length;
        // 负载因子为 0.75
        float lf = proto.loadFactor;
        // 阈值是 1,因为进行了强转
        int threshold = (int)(cap * lf);
        // 创建 HashEntry 数组
        HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap];
        if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            Segment<K,V> s = new Segment<K,V>(lf, threshold, tab);
            while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u))
                    == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s))
                    break;
            }
        }
    }
    return seg;
}

之前的 put 方法是根据 key 定位到 Segment,而下面的方法是在确定 Segment 以后,在 Segment 内部进行 put 的方法。因此,下面的 put 方法是类 Segment 中的方法,首先会尝试获取锁,如果获取失败则很定有其它线程存在竞争,则利用 scanAndLockForPut() 自旋获取锁。如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先尝试获取锁,获取到则返回 null,否则执行 scanAndLockForPut 方法
    HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        // 此 table 被 volatile 修饰 
        HashEntry<K,V>[] tab = table;
        // 计算在 HashEntry[] 中的位置
        int index = (tab.length - 1) & hash;
        // 找到 HashEntry[] 中指定位置的第一个节点,即 first 指向桶中链表的第一个节点
        HashEntry<K,V> first = entryAt(tab, index);
        for (HashEntry<K,V> e = first;;) {
            // 如果当前节点不为空,则遍历该链表
            if (e != null) {
                K k;
                // 如果之前已经存在了该 key,则用新值替换旧值
                if ((k = e.key) == key ||
                    (e.hash == hash && key.equals(k))) {
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                // 来到下一节点
                e = e.next;
            }
            // 如果当前节点为空,则进入 else
            else {
                if (node != null)
                    node.setNext(first);
                else
                    // 采用链表的头插法新建一个节点
                    node = new HashEntry<K,V>(hash, key, value, first);
                // 键值对数量加 1
                int c = count + 1;
                // 如果超过阈值,则扩容
                if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                    rehash(node);
                else
                    // 没有超过阈值的话,则放在指定的位置
                    setEntryAt(tab, index, node);
                ++modCount;
                count = c;
                // 桶中不存在相同 key 的节点,所以返回 null
                oldValue = null;
                break;
            }
        }
    } finally {
        // 解锁操作
        unlock();
    }
    // put 成功则返回旧值
    return oldValue;
}

ConcurrentHashMap 的 put 操作相对于 HashMap 中的 put 操作新增了加锁和解锁的操作,这样才能保证同一时刻只有一个线程拥有修改的权限。这里应该注意一下 put 操作中一开始获取锁的操作,即

1
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);

假如当前环境下没有任何线程进行 put 操作,此时如果线程 1 进行了 put 操作,它首先会去尝试获得锁,由于之前没有任何一个线程持有锁,所以线程 1 是可以执行到 tryLock() 并返回 null 的,即线程 1 成功的拿到了锁,然后根据计算找到对应桶的位置,新添加一个键值对。如果此时在线程 1 还没有释放锁的情况下,线程 2 又执行了 put 操作,则假如线程 2 恰好也定位到了和线程 1 同一个段,然后尝试去进行 put 操作,即尝试获取锁。但是线程 1 还没有释放锁,所以线程 2 在一开始的时候会执行scanAndLockForPut 方法,如下所示:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
    // 通过 Segment 和 hash 寻找匹配的 HashEntry
    HashEntry<K,V> first = entryForHash(this, hash);
    HashEntry<K,V> e = first;
    HashEntry<K,V> node = null;
    // 重试次数
    int retries = -1; // negative while locating node
    // 不断循环,尝试自旋获取锁
    while (!tryLock()) {
        HashEntry<K,V> f; // to recheck first below
        if (retries < 0) {
            // 之前表中不存在
            if (e == null) {
                if (node == null) // speculatively create node
                    // 新增一个节点
                    node = new HashEntry<K,V>(hash, key, value, null);
                // 将重试次数置为 0
                retries = 0;
            }
            // 找到了该节点,并且第一个节点就是该节点,重试次数置为 0
            else if (key.equals(e.key))
                retries = 0;
            // 第一个节点也不是,则继续来到下一个节点
            else
                e = e.next;
        }
        // 尝试次数大于了最大的次数(64)的话,则改为阻塞锁获取,保证能获取成功
        else if (++retries > MAX_SCAN_RETRIES) {
            lock();
            break;
        }
        // 在 MAX_SCAN_RETRIES 次过程中,key 对应的 entry 发生了变化,则从头开始
        else if ((retries & 1) == 0 &&
                    (f = entryForHash(this, hash)) != first) {
            e = first = f; // re-traverse if entry changed
            retries = -1;
        }
    }
    return node;
}

线程 2 虽然没有得到锁,但是也没有闲着,而是将准备存放的键值对在对应数组中相应的位置给计算了出来,一旦线程 2 获取到了锁,那么就可以利用等待获取锁的这段时间所做的工作,直接定位到具体的位置,从而节省了时间,提高了执行效率。

最后的else if所做的工作就是:如果线程 2 在等待锁的期间,线程 1 将 key 对应的 entry 进行了修改,则线程 2 需要重新确定接下来要定位的位置。

put 小结

  • 将当前的 Segment 中的 table 通过 key 的 hashcode 定位到 HashEntry;
  • 遍历该 HashEntry,如果不为空,则判断传入的 key 和当前遍历到的 key 是否相等,相等则覆盖旧的 value;
  • 如果 node 为空,则新建一个 HashEntry 并加入到 Segment 中,再判断是否需要扩容;
  • 最后会在 finally 中解除之前获得到的锁。

rehash() 方法

在 Segment 类的 put 方法中,如果键值对的数量超过了阈值,则会进行扩容操作,源码如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
private void rehash(HashEntry<K,V> node) {
    // 旧数组
    HashEntry<K,V>[] oldTable = table;
    int oldCapacity = oldTable.length;

    // 新数组的长度为旧数组的两倍
    int newCapacity = oldCapacity << 1;
    // 根据新数组的长度计算新的阈值
    threshold = (int)(newCapacity * loadFactor);
    // 创建新的表
    HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity];
    // 用于定位桶
    int sizeMask = newCapacity - 1;

    // 遍历旧的表
    for (int i = 0; i < oldCapacity ; i++) {
        // 依次指向旧表中的每个桶的链表表头
        HashEntry<K,V> e = oldTable[i];
        // 当前旧表的桶中的链表不为空
        if (e != null) {
            HashEntry<K,V> next = e.next;
            // 重新哈希,以便于定位新桶
            int idx = e.hash & sizeMask;
            // 如果旧桶中只有一个节点
            if (next == null)   //  Single node on list
                newTable[idx] = e;
            else { // Reuse consecutive sequence at same slot
                HashEntry<K,V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K,V> last = next;
                        last != null;
                        last = last.next) {
                    int k = last.hash & sizeMask;
                    // 扩容前后位置发生了变化
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                // 将改变的键值对放到新表的对应位置
                newTable[lastIdx] = lastRun;
                // Clone remaining nodes
                // 将链表中剩下的节点拷贝到新表中
                for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K,V> n = newTable[k];
                    newTable[k] = new HashEntry<K,V>(h, p.key, v, n);
                }
            }
        }
    }
    // 链表头插新的节点
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

这里的扩容是按照 2 的幂次方进行的,所以在扩容之前处于同一个桶中的元素,则在扩容之后要么还在原来的序号的桶里,要么就是原来的需要再加上一个 2 的幂次方。

这里给出参考文章中的一张清晰的 put 操作的流程图,涵盖了以上所有的情况,如扩容、不断自旋获取锁等等一些相关的操作。

size() 方法

size 方法用于统计整个 ConcurrentHashMap 中元素的数量,由于所有的元素都分布在不同的段中,因此该操作设计到跨段操作,统计所有 Segment 里元素的数量后进行求和。Segment 中的全局变量 count 用于统计对应 Segment 的元素的数量,那在多线程环境下,我们是不是可以直接将所有的 count 加起来就可以得到 ConcurrentHashMap 的大小了呢?并不能,因为虽然相加时可以获取每个 Segment 中 count 的最新值,但拿到之后累加之前可能 count 的值就发生了变化,计算结果也就不准确了。所以安全的做法是:在调用 size 方法的时候,将 Segment 的 put、remove 以及 clean 方法全部锁住,但这样的做法显然非常低效。

JDK1.7 中的做法是,由于在累加 count 的过程中,之前累加过的 count 发生变化的几率是非常小的,所以 ConcurrentHashMap 先尝试RETRIES_BEFORE_LOCK次(默认为 2)通过不锁住 Segment 的方式来统计各个 Segment 的大小。如果在统计的过程中 count 发生了变化,则再采用加锁的方式来统计所有 Segment 的大小。

至于如何判断 count 前后发生了变化,可以利用 Segment 中的 modCount 变量,在 put、remove 和 clean 方法里操作元素时都会将 modCount 进行加 1 操作。因此只需要统计在 size 前后比较 modCount 是否发生了变化就可以知道容器大小是否发生了变化

JDK1.8 中的 ConcurrentHashMap

以上介绍的都是 JDK1.7 中的 ConcurrentHashMap,但 JDK1.7 内部的每个小版本之间还有些不同,不过大体的处理流程是相似的,不会有太大的区别。

JDK1.8 中的 ConcurrentHashMap 与 JDK1.7 的不同点在于:

  • 抛弃了 JDK1.7 中 Segment 锁分段技术,而是采用 CAS + synchronized 的方式来保证并发的安全性,体现在 put 操作的不同,在 JDK1.8 中则是对数组中单个位置加锁;
  • 将 JDK1.7 中用于存放数据的内部类 HashEntry 替换为内部类 Node,但作用相同。
  • JDK1.8 的结构转换为数组+链表+红黑树。

除此之外,JDK1.8 的 ConcurrentHashMap 也多了一个空参构造,首先看一下 put 方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
public V put(K key, V value) {
        return putVal(key, value, false);
    }


final V putVal(K key, V value, boolean onlyIfAbsent) {
    // 不允许存储空 key 和空 value
    if (key == null || value == null) throw new NullPointerException();
    int hash = spread(key.hashCode());
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
        Node<K,V> f; 
        int n, i, fh;
        // 判断是否要进行初始化工作,如果表为 null,则需要初始化表
        if (tab == null || (n = tab.length) == 0)
            tab = initTable();
        // 在不需要进行初始化的情况下,通过 key 定位出 Node,也就是 f,
        // 如果 f 为 null,则说明当前位置可以写入数据,则利用 CAS 尝试写入,失败则自旋
        else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
            if (casTabAt(tab, i, null,
                            new Node<K,V>(hash, key, value, null)))
                break;                   // no lock when adding to empty bin
        }
        // 如果当前不需要初始化表,之前通过 key 定位的 Node 也不为 null,
        // 则再判断该位置的 Node 是否正在扩容,如果正在扩容,则调用 helpTransfer
        else if ((fh = f.hash) == MOVED)
            tab = helpTransfer(tab, f);
        // 当前位置不为空
        else {
            V oldVal = null;
            // 对当前节点加锁
            synchronized (f) {
                if (tabAt(tab, i) == f) {
                    // 节点是链表的情况
                    if (fh >= 0) {
                        binCount = 1;
                        // 遍历整个链表
                        for (Node<K,V> e = f;; ++binCount) {
                            K ek;
                            // 已存在,则替换旧值
                            if (e.hash == hash &&
                                ((ek = e.key) == key ||
                                    (ek != null && key.equals(ek)))) {
                                oldVal = e.val;
                                if (!onlyIfAbsent)
                                    e.val = value;
                                break;
                            }
                            Node<K,V> pred = e;
                            // 如果是新加入的节点,则通过尾插法将其插入链表
                            if ((e = e.next) == null) {
                                pred.next = new Node<K,V>(hash, key,
                                                            value, null);
                                break;
                            }
                        }
                    }
                    // 节点是红黑树的情况
                    else if (f instanceof TreeBin) {
                        Node<K,V> p;
                        binCount = 2;
                        // 遍历红黑树
                        if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                        value)) != null) {
                            oldVal = p.val;
                            if (!onlyIfAbsent)
                                p.val = value;
                        }
                    }
                }
            }
            if (binCount != 0) {
                // 链表中的节点个数超过 8,则转成红黑树
                if (binCount >= TREEIFY_THRESHOLD)
                    treeifyBin(tab, i);
                if (oldVal != null)
                    return oldVal;
                break;
            }
        }
    }
    // 总结点数加一
    addCount(1L, binCount);
    return null;
}

这里再看一眼 spread() 方法,用于根据 key 的 hashCode 计算 hash 值:

1
2
3
4
5
static final int HASH_BITS = 0x7fffffff; 

static final int spread(int h) {
    return (h ^ (h >>> 16)) & HASH_BITS;
}

put 方法中的注释应该很容易就能看清楚,需要注意一点的是,在 put 方法中的最后 addCount 添加节点时可能会引起扩容(transfer),JDK1.8 的 ConcurrentHashMap 支持并发扩容,之前扩容是由一个线程将旧数组中的键值对转移到新的数组中,支持并发以后,转移的时间就缩短了,但处理逻辑也变复杂了。

参考