ConcurrentHashMap 源码阅读笔记


本文基于 JDK 1.8 分析

HashMap 是非线程安全的类,在多线程并发 put 导致 resize,在 transfer 过程中可能导致死锁或者数据丢失

而 ConcurrentHashMap 则是一个线程安全的 Map 类,在 HashMap 的基础上做了线程安全的处理

基础知识

先大概对 ConcurrentHashMap 的数据结构模型有个大概的了解

跟 HashMap 一样,由一个数组和多个链表/红黑树组成

concurrenthashmap.png

常量

MAXIMUM_CAPACITY = 1 << 30

最大的容量,因为 32 位的 hash 值的前两位为控制位,所以最大只到 1<<30

DEFAULT_CAPACITY = 16

默认初始容量,必须为2的次幂,最低位 1,最大为 MAXIMUM_CAPACITY

DEFAULT_CONCURRENCY_LEVEL = 16

默认的并发级别

LOAD_FACTOR = 0.75

负载因子

TREEIFY_THRESHOLD = 8

MIN_TREEIFY_CAPACITY = 64

当插入时,链表上的节点数量超过 TREEIFY_THRESHOLD ,且 table 的长度也超过了 MIN_TREEIFY_CAPACITY,则会将链表转为树

否则进行扩容操作

sizeCtl

sizeCtl 字段是用来表示 table 的初始化状态或者用来控制 table 数组的容量

如果 sizeCtl 为负数,则表示 table 正在初始化或者在调整容量

如果 sizeCtl 不为负数,如果 table 为 null,则保存初始化 ConcurrentHashMap 时的容量,为0或者默认值

初始化以后,则保存的是要进行扩容的阈值

sizeCtl = -1 表示正在初始化

sizeCtl < 0 && sizeCtl! = -1 表示正在扩容中,且 sizeCtl 的低16位的值表示正在扩容的线程数 + 1,高16位为扩容标识,是由数组长度计算得到的值

sizeCtl ≥ 0 && table == null 表示初始化时计算出的默认容量

sizeCtl ≥ 0 && table != null 表示需要扩容的阈值

Node<K,V>

Node<K,V> 是 ConcurrentHashMap 的一个静态内部类,实现了 Map.Entry<K,V> 接口,用来保存键值对信息,注意这里的 valnext 字段都是用了 volatile 修饰,以保证其线程间的可见性

1
2
3
4
5
6
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}

构造方法

ConcurrentHashMap 有多个构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public ConcurrentHashMap() {

}
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}

public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (initialCapacity < concurrencyLevel) // Use at least as many bins
initialCapacity = concurrencyLevel; // as estimated threads
long size = (long)(1.0 + (long)initialCapacity / loadFactor);
int cap = (size >= (long)MAXIMUM_CAPACITY) ?
MAXIMUM_CAPACITY : tableSizeFor((int)size);
this.sizeCtl = cap;
}

initialCapacity: 传入的初始容量

loadFactor: 负载因子

在构造方法中,实际上,是想通过传入的 initialCapacity 参数,并根据 loadFactor 计算出数组的初始化容量 cap,并且数组的阈值要大于 initialCapacity ,initTable() 方法中会使用这个 sizeCtl 值初始化 table 数组的容量

But,但是

这里的 public ConcurrentHashMap(int initialCapacity) 实际上是有 bug 的,在某些情况下,和下面三个参数的构造方法计算出来的 cap 值并不一致。

1
2
new ConcurrentHashMap(22)
new ConcurrentHashMap(22,0.75,1)

在一个参数的构造方法中,计算出来的 cap 值为 64,而三参数的构造方法计算所得到的 cap 为 32

这个问题直到 JDK 12 才被修复

value of ‘sizeCtl’ in ConcurrentHashMap varies with the constructor called

tableSizeFor(int size)

tableSizeFor(int size) 会返回大于等于 size 的最小的 2^n 的

例如:tableSizeFor(22) = 32 ; tableSizeFor(3) = 4

具体 tableSizeFor(size) 的方法可以参考

HashMap之tableSizeFor方法图解

添加元素

put 方法

final V putVal(K key , V value , boolean onlyIfAbsent)

先看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
final V putVal(K key, V value, boolean onlyIfAbsent) {
//如果 key 和 value 都为 null 则抛出空指针异常
if (key == null || value == null) throw new NullPointerException();
//计算 key 的 hash 值
int hash = spread(key.hashCode());
//记录链表中节点的数量,用来控制扩容或者由链表转变为树
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
//死循环,直到满足某个条件后再退出
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
//-------------①--------------------
//如果 tab 为空或者 tab 的长度为 0,则初始化 table
tab = initTable();
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//-------------②--------------------
//如果 hash 对应的索引处 i 的桶为空,则尝试 cas 插入
//这里 i 被赋值为索引
//f 被赋值为索引处的 node 对象
if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))
//插入数据成功,退出循环
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
//-------------③--------------------
// i 处的 Node 不为 null 且其 hash 为 MOVED,即正在扩容中
// 这里将 fh 赋值为 Node f 的 hash 值
tab = helpTransfer(tab, f);
else {
//-------------④--------------------
//否则,即 i 处已经有 Node f 存在,并且不处于扩容中,那就应该插入到链表中
V oldVal = null;
//多 f 加锁,多线程访问
synchronized (f) {
if (tabAt(tab, i) == f) {
//这里再次确认 i 处 Node 对象是否为 f
if (fh >= 0) {
//如果 f 的 hash 大于等于 0
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
//开始遍历链表,
//过程中如果找到 key 相同的,则替换 value 并退出遍历
//否则,遍历完链表,将节点插入链表尾部
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
//如果 key 相等,则替换 value,退出循环
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
else if (f instanceof TreeBin) {
//如果当前节点已经是树了,则将键值对插入树中
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
else if (f instanceof ReservationNode)
throw new IllegalStateException("Recursive update");
}
}
if (binCount != 0) {
//binCount 不为0,链表上的节点数量不为 0 ,即键值对插入了
if (binCount >= TREEIFY_THRESHOLD)
//节点上的数量超过了阈值,转为树
treeifyBin(tab, i);
if (oldVal != null)
//如果旧值不为 null,即替换了某个键值对
//则 return 旧值,结束整个 put 流程
return oldVal;
//否则,即 oldVal == null 的情况下
//退出 for (Node<K,V>[] tab = table;;) 循环
//走 addCount(1L,binCount) 处的逻辑并返回 null
break;
}
}
}
//增加计数,扩容等
addCount(1L, binCount);
//返回 null,代表是新增的键值对,并非 key 相同替换旧的 value
return null;
}

总结一下:

①. 初始化数组

通过 spread 方法计算 hash 值,并开始死循环,直到插入成功后退出

如果 table 还未初始化,则先通过 initTable() 初始化数组

②. index 处为 null

通过 spread() 方法计算出对应的索引 i ,如果数组 i 索引处的值为 null ,则通过 cas 的方式进行插入,如果 cas 失败,说明有别的线程在对该处进行操作,则进行下一次 for 循环,下一次 for 循环则会进入 3、4 步中

③. 帮助扩容

如果 index 处节点不为 null 且 index 处的节点 hash 值为 MOVED ,则进行 helpTransfer() 方法

④. 普通插入

上述几步都不满足,则说明 index 上有个普通的节点,或为链表,或为树,进行更新或者插入操作

这里需要注意的是,会将 f(即 index 处的节点) 进行加锁,并会再次检查此时的 table index 索引处的节点是否被改变了,如果改变了,则进入下一次循环,如果没有改变依旧为 f ,则进行更新或者插入操作

如果是链表,则遍历链表覆盖更新值或者在链表尾部插入新的键值对(binCount = 原链表长度)

如果是树,则将键值对插入树中

添加完成后,判断该节点上的链表长度,如果超过阈值,则调用 treeifyBin 转为树或者扩容数组,最后调用 addCount 添加计数,视机扩容数组等

initTable()

💡 初始化 table 数组

接着看源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//table 还没初始化,则进行初始化
if ((sc = sizeCtl) < 0)
// sizeCtl 小于 0 说明有别的线程在初始化,则让出 CPU 时间碎片
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
//否则,通过 cas 将 sc 和 -1 进行 cas 操作
//SIZECTL 是 sizeCtl 传入的对象 this
//(即 sizeCtl 在 ConcurrentHashMap 这个对象中内存的偏移量)
// sc 为期望值,此时值为 sizeCtl
// -1 为要替换的值
//总结来说就是将 sizeCtl 和 sc 进行比较,如果相等就将 -1 赋值给 sizeCtl,返回 true
//否则不赋值返回 false

//走到这里说明 cas 成功了
try {
if ((tab = table) == null || tab.length == 0) {
//如果设置了容量大小则使用设置的容量,否则使用默认的容量
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//new 一个数组并赋值
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//计算扩容的阈值 n - (n >>> 2) 等同于 n * LOAD_FACTOR
//只不过使用位运算更快
//即 n * 0.75 = n * 3/4 = n * (1 - 1/4) = n - n/4
sc = n - (n >>> 2);
}
} finally {
//将 sc 的值赋值给 sizeCtl
sizeCtl = sc;
}
//退出循环
break;
}
}
//返回新的 table 数组
return tab;
}

public final native boolean compareAndSwapInt(Object o, long offset,int expected,int x};
这个方法的作用是,读取传入的对象 o 在内存中偏移 offset 的值,并和 expected 比较
如果相同,则将 x 赋值给内存中偏移 offset 位置的值,并返回 true
否则不赋值返回 false

总结:

如果 sizeCtl 小于 0 则让出,则暂停自己的线程,以便其他线程去执行初始化操作

如果 sizeCtl 大于等于 0 则初始化一个 sizeCtl 大小的数组,并更新 sizeCtl 为自身的负载因子倍( * 0.75)

这里通过 Unsafe.compareAndSwapInt 保证了线程安全

假如有多个线程同时调用了 initTable 方法,只有一个线程的 U.compareAndSwapInt(this, SIZECTL, sc, -1) 会返回 true 走入该 if 中的代码去执行初始化操作,假如有第三个线程来了,则会走入到 Thread.yield() 这里去让出时间碎片

helpTransfer

如果在某个线程在进行转移节点数据,则进行判断,满足条件就帮助转移,并返回 table 数组,以便在下次循环中进行插入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//如果 tab 不为空,且其头节点为 fwd 节点,说明正在扩容迁移
int rs = resizeStamp(tab.length);
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//满足 while 中的条件这里说明正在扩容
//则循环,直到扩容结束
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//修改 sizeCtl 成功,修改扩容的线程数 +1,参与扩容
transfer(tab, nextTab);
break;
}
}
//扩容结束,返回新 table
return nextTab;
}
//头节点非 fwd 节点,则返回旧 table
return table;
}

resizeStamp(int n)

由于传入的参数 n 都是 2 的 n 次幂,这个方法实际上是用来将这个 n 通过算法将其记录在低16位中

TL;DR

返回值高16位记录为 0,低16位记录的是「扩容标识」,与数组长度有关

1
2
3
static final int resizeStamp(int n) {
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS- 1));
}

其中 Integer.numberOfLeadingZeros(n) 返回 n 在二进制中最高位的1前面0的个数

1
2
3
4
比方说 1 的二进制为 
0000 0000 0000 0000 0000 0000 0000 0001
则 `Integer.numberOfLeadingZeros(1)` = 31
所以 Integer.numberOfLeadingZeros(int n) 的值的范围为 0-32

接着看后半段,*RESIZE_STAMP_BITS 值为 16,*所以 (1 << (*RESIZE_STAMP_BITS - 1 )* 即1左移15位,得到 0000 0000 0000 0000 1000 0000 0000 0000

最后进行 | 运算*(有1则为1)*

我们以 resizeStamp(16) 为例

1
2
3
4
5
6
7
8
9

(十进制 16) == 0000 0000 0000 0000 0000 0000 0001 0000

Integer.numberOfLeadingZeros(16) = 27 == 0000 0000 0000 0000 0000 0000 0001 1001
| 或运算
(1 << (RESIZE_STAMP_BITS- 1) = 1<<15 == 0000 0000 0000 0000 1000 0000 0000 0000
= 等于
0000 0000 0000 0000 1000 0000 0001 1001

可见 resizeStamp 方法会返回一个 高16为都为0,低16位为传入的参数 n 的 「最高位的1前面0的个数」

即高16位记录为 0,第16位为1,低15位记录的是「最高位的1前面0的个数」低16位构成一个「扩容标识」

可见该方法返回的值的取值范围为 [32769,32799]

treeifyBin(Node<K,V>[] tab,int index)

在往 ConcurrentHashMap 中插入元素后,会调用该方法进行扩容或者将链表转为树

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
//如果 tab 的长度小于 MIN_TREEIFY_CAPACITY(64),则进行扩容操作
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
//否则,如果 index 处的节点 b 不为空且 hash 值大于0
synchronized (b) {
//加锁
//再次判断,tab 的 index 处是否为 b
if (tabAt(tab, index) == b) {
//转为树,略过不表
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}

总结:

当插入一个元素后,如果链表的长度超过TREEIFY_THRESHOLD == 8,但整个 table 数组长度小于 MIN_TREEIFY_CAPACITY==64,则进行扩容操作,如果超过了 MIN_TREEIFY_CAPACITY 则将链表转为树

tryPresize(int size)

💡 尝试预处理 table 数组的容量以容纳给定的数量的元素

这个方法只在 treeifyBin() 以及 putAll() 中被调用

treeifyBin() 方法传入的 size 为 table.length << 1

putAll() 传入的 size 为 m.size() 即原 Map 的节点数

以 table.length == 16 为例,调用该方法时候,size = table.length << 1 即 32,计算出来的 c 为 64,直到 c ≤ sc 时才会退出循环,则 sc 至少需要为 128 * 0.75 = 96 才会退出循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
/**
* 尝试调整 table 数组的容量
* @param size 需要调整到的容量
* 这个方法只在 treeifyBin() 以及 putAll() 中被调用
*/
private final void tryPresize(int size) {
//根据 size 的值计算约束出为一个正确的 2的次幂值
//为 MAXIMUM_CAPACITY 或者是大于等于 (size * 1.5 + 1) 的最小的 2次幂
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1);
// sc 为 sizeCtl 在本方法中的临时变量
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
if (tab == null || (n = tab.length) == 0) {
//-------①--------
//如果 table 还未初始化,则进行初始化操作
// n 为新的数组容量
n = (sc > c) ? sc : c;
//同理,if 中的语句会通过 cas 将 sizeCtl 更新为 -1,表示正在初始化
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
//计算 sc,即 n - 0.25 * n = 0.75 * n
sc = n - (n >>> 2);
}
} finally {
// 设置新的阈值
sizeCtl = sc;
}
}
}
else if (c <= sc || n >= MAXIMUM_CAPACITY)
//-------②--------
// c <= sc 说明数组的容量已经足够了
// n >= MAXIMUM_CAPACITY 说明已经超过最大容量了
//则退出循环,不处理
break;
else if (tab == table) {
//-------③--------
//进行扩容操作
//走到这里说明①和②的情况已经被排除掉了,即已经 table 数组已经初始化过了
//并且c <= sc不成立,即 sc < c ,说明还没扩容完成

//n 为 table 的容量
int rs = resizeStamp(n);
if (sc < 0) {
//sc < 0 因为数组已经初始化了,说明有其他线程正在扩容
Node<K,V>[] nt;
//
//sc 右移16位后,高16位为0,低16位为数组长度计算出的扩容标识
//(sc >>> RESIZE_STAMP_SHIFT) != rs:扩容标识不同
//说明数组长度发生了变化,可能是别的线程触发了第二次扩容
//sc == rs + MAX_RESIZERS:已经达到最大的扩容线程数量了
//(nt = nextTable) == null :nextTable 还没创建
//transferIndex <= 0 需要迁移的区间已经被别的线程分完了

//实际上这里有两个 bug
//1. sc == rs + 1
//sc 现在是个负数,rs 高16位都为 0,无论如何 rs + 1 都不会等于 sc
//fix:
//sc == (rs << RESIZE_STAMP_SHIFT) + 1
//
//
//
//2. sc == rs + MAX_RESIZERS
//MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1 = (1 << 16) -1 = 65535
//rs 的取值范围为 [32769,32799] 而 rs + MAX_RESIZERS > 0 并且还没溢出为负数
//也无论如何都不会相等
//fix:
//sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
//
//将 rs 左移 16位,其高16位才是扩容标识,低16为存储扩容的线程数 + 1
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
//下面这里对 sizeCtl 加一,表示参与迁移的线程数 +1,并进行迁移
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//说明自己是第一个进行扩容的线程,则通过 cas 的方式,将 rs 左移 16位并加2,存储在 sizeCtl 中
//通过前面的 resizeStamp 方法我们直到 rs 的高16位为0,这里左移16位,则低16位都为0,再加2
//则用低16为表示正在扩容的线程数,并且用 2 表示第一个正在扩容的线程
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
}
}
}

transfer()

transfer(Node<K,V> tab, Node<K,V>[] nextTab)
💡 将旧数组的数据迁移到新的数组中,支持多线程迁移

字段说明

transferIndex 是个全局变量,通过 volatile 修饰,并通过 cas 的方式进行修改,用来表示当前线程应该从哪个地方开始往前遍历,如果 transferIndex <= 0 说明对数组的转移已经到头了,不需要再继续处理了

i 字段表示当前线程在迁移的桶的索引

bound 字段表示当前线程的需要迁移的区间的上边界

总结

即每个线程的迁移区间为 [bound,transferIndex),并通过 i 在该区间从后往前遍历桶进行迁移处理,遍历区间完成后,会继续竞争下一个区间

为了高效的进行迁移数据,这里允许多个线程进入,但是给每个线程分配了旧数组的一段区间进行迁移,避免多个线程同时迁移同个区间,代码中通过 cas 的方式,

让多个线程通过 cas 的操作竞争 transferIndex 字段,并通过 transferIndex 字段和 stride 计算出每个线程的迁移区间,所以线程在处理完竞争到的某个区间后,会不停地往前竞争处理下一个区间,直到处理到数组最前面的区间,比如说[0,16) 的区间后,transferIndex 等于 0,所以 i = -1,

就会将 sizeCtl 中低16位中存储的线程数减1后 return,退出迁移,等到所有的线程对所有的区间处理完毕后,其他线程都退出迁移操作了,最后一个线程会将 finish 设置为 true,

再进行下一次循环后会将新的 nextTab 赋值给 table 并将 nextTable 置为空

从 HashMap 的迁移算法中我们也可以直到,对于一个节点,在扩容迁移后,要么还在原来的桶(假设索引为 index)中,要么就在索引为 2 * index 的桶处

同理在 ConcurrentHashMap 中也是这样子,所以不会造成多个线程同时转移到同一个桶中

首先会计算每个线程需要进行迁移的步长 stride,即迁移的桶的数量,至少为 16

在进行迁移时,会从后往前对 table 数组进行遍历,逐步对桶内的数据(链表或者树)进行迁移

由于篇幅太长,我们假设步长 stride 为4,举例进行说明

transfer_concurrenthashmap.png

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
//当第一个线程进行转移数据时,nextTab == null,否则为全局变量 nextTable
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {

int n = tab.length, stride;
//计算步长 stride,至少为 16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
//nextTab 为 null 说明是第一个线程进行转移
try {
//创建一个容量为旧数组容量两倍的新数组,并赋值给 nextTab
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
//如果产生 OOM 等异常,则直接退出,不转移了
sizeCtl = Integer.MAX_VALUE;
return;
}
//将新数组赋值给 nextTable ,此时数组为一个没有数据的空数组
nextTable = nextTab;
//transferIndex 记录旧数组的长度,表示转移开始的索引值
transferIndex = n;
}
//nextn 为新数组的容量
int nextn = nextTab.length;
//fwd 是用来标识某个桶处正在转移的,存储了新数组
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//advance 是否前进,用来标记是否需要在区间内继续往前前进处理
boolean advance = true;
//finishing 是否扫描结束了,用来标记是否将所有的区间都迁移完成了
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
//这里将 i 和 bound 初始化为 0,并开始死循环,直到数据转移成功后再退出循环
Node<K,V> f; int fh;

//在第一个线程第一次执行到这里时
//--i == -1, bound == 0,第一个判断为 false
//注意这里每次进入 while 循环都会做一次 --i 的操作,使得 i 不断的自减
//nextIndex = transferIndex == n == tab.length 即旧数组的长度,大于0,第二个判断为 false
//则走到第三个判断中
//nextBound 为下一个线程需要进行转移的数组的下边界
//计算 nextBound 的值,如果 nextIndex(即就数组的长度)大于步长 stride,
//则 nextBound = nextIndex - stride,并通过 cas 的方式赋值
//给全局变量 transferIndex,即下个线程需要转移的区间的下边界
//下面的代码主要就是在第一次循环时,计算出来 [bound,transferIndex) 这个区间
//在下一次 for 循环则会走到第一个判断中,直到 i < bound 或者 finishing
//当 i < bound 时,即当前线程已经把分配给自己的区间里的桶都转移完毕了
//如果此时别的线程还没有将整个数组转移完毕(transferIndex <= 0 不成立),
//则当前线程会继续往前竞争下一个区间,这也就是如果只有一个线程的话,也可以迁移完成

while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
//某个线程的第二次循环时候才会走到这里来,说明 i 还在该线程需要转移的区间内
//[bound <--> i <--> transferIndex)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
//transferIndex <= 0 说明将已经到最前面了,将 i 赋值为 -1
//则会走到 ③ 处执行,简单来说 ③ 处会做一些判断是否该线程要退出迁移操作
i = -1;
advance = false;
}
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//走到这里说明 i < bound 且 transferIndex > 0 (即整个 table 的转移还没处理完成)
//通过 cas 继续认领一段区间进行转移
//计算出当前线程所需要处理的区间
//nextBound 就是下一个线程的下边界,也就是当先线程的上边界
bound = nextBound;
//i 为索引值,所以需要 - 1
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
//-----③----
//如果 i < 0 或者 i >= n 说明已经超出了所需要处理的区间了
int sc;
if (finishing) {
//------①-----
//如果 finishing 为 true ,说明所有的桶都处理完了
//则将新的数组赋值给 table 并将 nextTable 置为空
nextTable = null;
table = nextTab;
//并且设置新的阈值,新数组的长度为 2n,所以阈值应该为 0.75 * 2n,即 2n - 0.5n
sizeCtl = (n << 1) - (n >>> 1);
//返回,结束迁移
return;
}
//走到这里说明 finishing 为 false,即还未处理完所有的桶
//但当前线程需要处理的区间内的桶已经处理完了,所以将 sizeCtl 减一表示正在迁移的线程数 -1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
//if 中为 true 说明当前线程不是最后一个在进行迁移的线程,返回,结束本线程的迁移
return;
//走到这里说明当前线程是第一个参与迁移的线程
//将 finishing 和 advance 均设为 true
//并把 i 设置为 n(即旧数组 table 的长度)
//这样下一次循环就会走入 ① 处的逻辑结束迁移
finishing = advance = true;
i = n; // recheck before commit

}
}
else if ((f = tabAt(tab, i)) == null)
//如果索引为 i 的桶处为 null,则通过 cas 方式将 fwd 放在该桶处
//接着将 advance 设置为 true,退出循环继续往前
advance = casTabAt(tab, i, null, fwd);
else if ((fh = f.hash) == MOVED)
//如果索引为 i 处的桶上的数据不为空,且其 hash 值为 MOVED
//代表该处的数据已经迁移过了,将 advance 设置为 true,退出循环继续往前
advance = true; // already processed
else {
//否则,索引为 i 处的桶上有数据,则对桶处的头节点加锁
//进行迁移,迁移完成后会将 advance 设置为 true
synchronized (f) {
if (tabAt(tab, i) == f) {
//二次确认索引为 i 处的桶的节点为 f
//进行数据迁移,见后文细说
}
}
}
}
}
}

链表/树的转移

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
//如果头节点 f 的 hash 值大于0,说明为链表
//计算出 fh 和 n 的与运算的结果,由于 n 为数组的长度,是2的次幂,所以只有最高位为1,其他都为0
//与运算计算出来后 runBit 要么为 0 ,要么为 n
//如果 runBid == 0 说明扩容后 f 依旧在原来的 index 处,否则在 index + n 处
//
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
//遍历链表,找到第一个后面全部为相同的 runBit 的节点
//举个例子
//链表 : A->B->C->D->E->F->G
//runBit 值: n->0->n->0->0->0->0
//则这个 for 循环跑完以后,lastRun 则为D,runBit = 0
//后续则可以将这个 D 后面的所有节点都一起挪动过去,就不需要再一个一个处理
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
//如果 runBit == 0 说明这些节点还要放在相同 index 的桶里,赋值给 ln
ln = lastRun;
hn = null;
}
else {
//否则 runBit == n 说明这些节点还要放在相同 index + n 的桶里,赋值给 hn
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
//循环,构建两个链表,将节点插入对应的链表的尾部
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//将 ln 插入新的 table 的 i 处
setTabAt(nextTab, i, ln);
//将 ln 插入新的 table 的 i + n 处
setTabAt(nextTab, i + n, hn);
//将旧的 table i 处置为 fwd
setTabAt(tab, i, fwd);
//更新 advance 为 true,准备下次 for 循环
advance = true;
}
else if (f instanceof TreeBin) {
//同理对 TreeBin 进行迁移
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}

元素数量计数

size()

获取 map 中键值对的数量

返回 sumCount 方法的值,如果小于0,则返回0,如果超过了 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE

1
2
3
4
5
6
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

sumCount()

返回 baseCountcounterCells 列表中值的总和 的和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}

那这个 baseCount 和 counterCells 是什么呢,我们在前面 putVal 的流程中还没有介绍 addCount(1L,binCount) 这个方法。通过名字我们也能看出来这是个用来增加计数的方法

我们先来看这个方法

addCount(long x,int check)

x 表示新增的节点数

check 表示是否需要进行扩容检查,如果 < 0 则不进行扩容

从前面 putVal 中我们可以知道,当一个键值对节点插入到链表中时, check 的值为链表的长度,当插入到红黑树中时,check 为 2

这个方法分为两个部分,前一部分是通过 LongAdder 的方式,进行计数,后半部分中会进行扩容迁移操作后,再进行计数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//第一个判断:(as = counterCells) != null
//表示 counterCells 已经被初始化了
//第二个判断:将 baseCount 通过 cas 的方式修改为 baseCount + x
//如果修改成功说明没有竞争
//如果修改失败,说明存在竞争,则进入下面的逻辑对 counterCells 进行处理
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
//前两个判断 as == null || (m = as.length - 1) < 0
//判断 counterCells 是否初始化了,并且数组中是否有内容
//如果为还未初始化或者数组中没有内容,则调用 fullAddCount(x, true)
//第三个判断是在前两个判断都为 false 的基础上
//进一步获取 counterCells 中当前线程所对应的 CounterCell 元素
//如果为空,则调用 fullAddCount(x, true)
//如果不为空,则尝试对 CounterCell 通过 cas 进行累加,累加成功则进行下一步
//累加不成功说明存在竞争,则调用 fullAddCount(x, false)
//uncontended 代表是否无竞争
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//详见后面的小节
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
//当 check >= 0,则尝试是否需要扩容
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//当下面的三个条件都满足时,才会进入这里
//1. 当 s (节点数量) >= 阈值 sizeCtl 了
//2. table 不为 null
//3. table 的长度小于最大容量
//

//rs 的高16位为0,低16为存储数组长度 n 计算出来的值
int rs = resizeStamp(n);
if (sc < 0) {(
//
//sc 右移16位后,高16位为0,低16位为数组长度计算出的扩容标识
//(sc >>> RESIZE_STAMP_SHIFT) != rs:扩容标识不同,
// 说明数组长度发生了变化,可能是别的线程触发了第二次扩容
//sc == rs + MAX_RESIZERS:已经达到最大的扩容线程数量了
//(nt = nextTable) == null :nextTable 还没创建
//transferIndex <= 0 需要迁移的区间已经被别的线程分完了

//实际上这里有两个 bug
//1. sc == rs + 1
//sc 现在是个负数,rs 高16位都为 0,无论如何 rs + 1 都不会等于 sc
//fix:
//sc == (rs << RESIZE_STAMP_SHIFT) + 1
//
//
//
//2. sc == rs + MAX_RESIZERS
//MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1 = (1 << 16) -1 = 65535
//rs 的取值范围为 [32769,32799] 而 rs + MAX_RESIZERS > 0 且还没溢出为负数
//也无论如何都不会相等
//fix:
//sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
//
//将 rs 左移 16位,其高16位才是扩容标识,低16为存储扩容的线程数 + 1
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
//不需要扩容
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
//此时有别的线程在扩容了,则帮助进行扩容
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
//进行扩容,这里是第一个进行扩容的线程
transfer(tab, null);
//进行计数
s = sumCount();
}
}
}

fullAddCount

private final void fullAddCount(long x, boolean wasUncontended)

x表示新增的节点数量

wasUncontented 表示是否无竞争

true == 表示没有竞争
false == 表示有竞争

在 ConcurrentHashMap 中,其实是复制了一份 LongAdder 的源码,在 ConcurrentHashMap 中,由于并发情况多,如果使用 AtomicLong 的方式进行记录,如果 N 个线程同时对 AtomicLong 进行修改,只有一个线程能修改成功,其他线程则会处于自旋等待状态,而 LongAdder 的方式是使用一个变量 baseCount+数组 CounterCell[],让每个线程去维护自己的一个变量,减少碰撞冲突,每个线程维护数组中的一个对象,对象中存储一个值;从 CounterCell[] 中获取到对应的值并进行修改,如果修改失败,则尝试修改 baseCount 的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
if ((h = ThreadLocalRandom.getProbe()) == 0) {
//如果还没有初始化,则进行初始化
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
//还没初始化,所以不存在竞争,将 wasUncontended 设置为 true
wasUncontended = true;
}
//collide 表示是否多个线程 hash 到同一个 CounterCell 产生碰撞了
boolean collide = false; // True if last slot nonempty
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
if ((as = counterCells) != null && (n = as.length) > 0) {
//如果 counterCells 不为 null 且 counterCells 长度大于 0
if ((a = as[(n - 1) & h]) == null) {
//取到 counterCells 中对应线程的 CounterCell 为空
//cellsBusy 字段为 1 说明 counterCells 正在初始化或者扩容
if (cellsBusy == 0) { // Try to attach new Cell
//下面为尝试创建一个 CounterCell 对象并存到 counterCells 数组对应的索引处
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//counterCells 为正常状态且修改 cellsBusy 为1 成功
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
//j 是当前线程的 hash 值计算得出的索引
//对 counterCells 中 j 处的元素进行替换
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
//创建成功,退出循环
break;
//创建不成功,则进行下一次循环
//可能是因为 rs[j] != null,即该处有值了
continue; // Slot is now non-empty
}
}
collide = false;
}
else if (!wasUncontended)
// cas 失败了,则设置为 true,在下次循环的时候重新计算 hash 再进行分配
wasUncontended = true; // Continue after rehash
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
//修改当前 CounterCell 中的 value 成功,则退出
break;
else if (counterCells != as || n >= NCPU)
//counterCells != as 说明 counterCells 被扩容了
//或者 as 大于等于 CPU 的数量了,等待下一轮重试
collide = false; // At max size or stale
else if (!collide)
//上述条件都不满足,说明产生了碰撞,且竞争失败了
//将值 collide 修改为 true,下一次循环就可以走到下个 if 中
collide = true;
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//counterCells 处于正常状态,且修改 cellsBusy 值为1成功
//则对 counterCells 进行扩容并迁移数据,
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//重新计算 hash 值
h = ThreadLocalRandom.advanceProbe(h);
}//这里是 (as = counterCells) != null && (n = as.length) > 0 这个条件结束的地方
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
//走到这里说明 counterCells 为空,则进行初始化,容量为2,并将 CounterCell 放进数组中
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//走到这里说明 counterCells 为空,且竞争初始化失败了,则尝试将 x 加到 baseCount 上
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break;// Fall back on using base
}
}

小结

至此,我们就看完了 ConcurrentHashMap 中的 put 的方法

参考链接

作者

PPTing

发布于

2021-12-23

更新于

2022-02-12

许可协议

评论