本文基于 JDK 1.8 分析
HashMap 是非线程安全的类,在多线程并发 put 导致 resize,在 transfer 过程中可能导致死锁或者数据丢失
而 ConcurrentHashMap 则是一个线程安全的 Map 类,在 HashMap 的基础上做了线程安全的处理
基础知识 先大概对 ConcurrentHashMap 的数据结构模型有个大概的了解
跟 HashMap 一样,由一个数组和多个链表/红黑树组成
常量 MAXIMUM_CAPACITY
= 1 << 30
最大的容量,因为 32 位的 hash 值的前两位为控制位,所以最大只到 1<<30
DEFAULT_CAPACITY
= 16
默认初始容量,必须为2的次幂,最低位 1,最大为 MAXIMUM_CAPACITY
DEFAULT_CONCURRENCY_LEVEL
= 16
默认的并发级别
LOAD_FACTOR
= 0.75
负载因子
TREEIFY_THRESHOLD
= 8
MIN_TREEIFY_CAPACITY
= 64
当插入时,链表上的节点数量超过 TREEIFY_THRESHOLD
,且 table 的长度也超过了 MIN_TREEIFY_CAPACITY
,则会将链表转为树
否则进行扩容操作
sizeCtl sizeCtl 字段是用来表示 table 的初始化状态或者用来控制 table 数组的容量
如果 sizeCtl 为负数,则表示 table 正在初始化或者在调整容量
如果 sizeCtl 不为负数,如果 table 为 null,则保存初始化 ConcurrentHashMap 时的容量,为0或者默认值
初始化以后,则保存的是要进行扩容的阈值
sizeCtl = -1 表示正在初始化
sizeCtl < 0 && sizeCtl! = -1 表示正在扩容中,且 sizeCtl 的低16位的值表示正在扩容的线程数 + 1,高16位为扩容标识,是由数组长度计算得到的值
sizeCtl ≥ 0 && table == null 表示初始化时计算出的默认容量
sizeCtl ≥ 0 && table != null 表示需要扩容的阈值
Node<K,V> Node<K,V>
是 ConcurrentHashMap 的一个静态内部类,实现了 Map.Entry<K,V>
接口,用来保存键值对信息,注意这里的 val
和 next
字段都是用了 volatile
修饰,以保证其线程间的可见性
1 2 3 4 5 6 static class Node <K ,V > implements Map .Entry <K ,V > { final int hash; final K key; volatile V val; volatile Node<K,V> next; }
构造方法 ConcurrentHashMap 有多个构造方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public ConcurrentHashMap () { } public ConcurrentHashMap (int initialCapacity) { if (initialCapacity < 0 ) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1 )) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1 ) + 1 )); this .sizeCtl = cap; } public ConcurrentHashMap (int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0.0f ) || initialCapacity < 0 || concurrencyLevel <= 0 ) throw new IllegalArgumentException(); if (initialCapacity < concurrencyLevel) initialCapacity = concurrencyLevel; long size = (long )(1.0 + (long )initialCapacity / loadFactor); int cap = (size >= (long )MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : tableSizeFor((int )size); this .sizeCtl = cap; }
initialCapacity: 传入的初始容量
loadFactor : 负载因子
在构造方法中,实际上,是想通过传入的 initialCapacity 参数,并根据 loadFactor 计算出数组的初始化容量 cap,并且数组的阈值要大于 initialCapacity , 在 initTable()
方法中会使用这个 sizeCtl 值初始化 table 数组的容量
But,但是
这里的 public ConcurrentHashMap(int initialCapacity)
实际上是有 bug 的,在某些情况下,和下面三个参数的构造方法计算出来的 cap 值并不一致。
1 2 new ConcurrentHashMap(22 )new ConcurrentHashMap(22 ,0.75 ,1 )
在一个参数的构造方法中,计算出来的 cap 值为 64,而三参数的构造方法计算所得到的 cap 为 32
这个问题直到 JDK 12 才被修复
value of ‘sizeCtl’ in ConcurrentHashMap varies with the constructor called
tableSizeFor(int size) tableSizeFor(int size)
会返回大于等于 size 的最小的 2^n 的
例如:tableSizeFor(22) = 32 ; tableSizeFor(3) = 4
具体 tableSizeFor(size)
的方法可以参考
HashMap之tableSizeFor方法图解
添加元素 put 方法 final V putVal(K key , V value , boolean onlyIfAbsent)
💡 添加键值对,会进行初始化 table 数组、扩容、链表转树等操作
先看源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null ,new Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2 ; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } else if (f instanceof ReservationNode) throw new IllegalStateException("Recursive update" ); } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
总结一下:
①. 初始化数组
通过 spread 方法计算 hash 值,并开始死循环,直到插入成功后退出
如果 table 还未初始化,则先通过 initTable() 初始化数组
②. index 处为 null
通过 spread() 方法计算出对应的索引 i ,如果数组 i 索引处的值为 null ,则通过 cas 的方式进行插入,如果 cas 失败,说明有别的线程在对该处进行操作,则进行下一次 for 循环,下一次 for 循环则会进入 3、4 步中
③. 帮助扩容
如果 index 处节点不为 null 且 index 处的节点 hash 值为 MOVED ,则进行 helpTransfer() 方法
④. 普通插入
上述几步都不满足,则说明 index 上有个普通的节点,或为链表,或为树,进行更新或者插入操作
这里需要注意的是,会将 f(即 index 处的节点) 进行加锁,并会再次检查此时的 table index 索引处的节点是否被改变了,如果改变了,则进入下一次循环,如果没有改变依旧为 f ,则进行更新或者插入操作
如果是链表,则遍历链表覆盖更新值或者在链表尾部插入新的键值对(binCount = 原链表长度)
如果是树,则将键值对插入树中
添加完成后,判断该节点上的链表长度,如果超过阈值,则调用 treeifyBin 转为树或者扩容数组,最后调用 addCount 添加计数,视机扩容数组等
initTable() 💡 初始化 table 数组
接着看源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) Thread.yield(); else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; } public final native boolean compareAndSwapInt (Object o, long offset,int expected,int x}; 这个方法的作用是,读取传入的对象 o 在内存中偏移 offset 的值,并和 expected 比较 如果相同,则将 x 赋值给内存中偏移 offset 位置的值,并返回 true 否则不赋值返回 false
总结:
如果 sizeCtl 小于 0 则让出,则暂停自己的线程,以便其他线程去执行初始化操作
如果 sizeCtl 大于等于 0 则初始化一个 sizeCtl 大小的数组,并更新 sizeCtl 为自身的负载因子倍( * 0.75)
这里通过 Unsafe.compareAndSwapInt 保证了线程安全
假如有多个线程同时调用了 initTable 方法,只有一个线程的 U.compareAndSwapInt(this, SIZECTL, sc, -1)
会返回 true 走入该 if 中的代码去执行初始化操作,假如有第三个线程来了,则会走入到 Thread.yield() 这里去让出时间碎片
helpTransfer 如果在某个线程在进行转移节点数据,则进行判断,满足条件就帮助转移,并返回 table 数组,以便在下次循环中进行插入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null ) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) { transfer(tab, nextTab); break ; } } return nextTab; } return table; }
resizeStamp(int n)
由于传入的参数 n 都是 2 的 n 次幂,这个方法实际上是用来将这个 n 通过算法将其记录在低16位中
TL;DR
返回值高16位记录为 0,低16位记录的是「扩容标识」,与数组长度有关
1 2 3 static final int resizeStamp (int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS- 1 )); }
其中 Integer.numberOfLeadingZeros(n)
返回 n 在二进制中最高位的1前面0的个数
1 2 3 4 比方说 1 的二进制为 0000 0000 0000 0000 0000 0000 0000 0001 则 `Integer.numberOfLeadingZeros(1 )` = 31 所以 Integer.numberOfLeadingZeros(int n) 的值的范围为 0 -32
接着看后半段,*RESIZE_STAMP_BITS
值为 16,*所以 (1 << (*RESIZE_STAMP_BITS - 1 )*
即1左移15位,得到 0000 0000 0000 0000 1000 0000 0000 0000
最后进行 |
运算*(有1则为1)*
我们以 resizeStamp(16) 为例
1 2 3 4 5 6 7 8 9 (十进制 16 ) == 0000 0000 0000 0000 0000 0000 0001 0000 Integer.numberOfLeadingZeros(16 ) = 27 == 0000 0000 0000 0000 0000 0000 0001 1001 | 或运算 (1 << (RESIZE_STAMP_BITS- 1 ) = 1 <<15 == 0000 0000 0000 0000 1000 0000 0000 0000 = 等于 0000 0000 0000 0000 1000 0000 0001 1001
可见 resizeStamp 方法会返回一个 高16为都为0,低16位为传入的参数 n 的 「最高位的1前面0的个数」
即高16位记录为 0,第16位为1,低15位记录的是「最高位的1前面0的个数」低16位构成一个「扩容标识」
可见该方法返回的值的取值范围为 [32769,32799]
treeifyBin(Node<K,V>[] tab,int index) 在往 ConcurrentHashMap 中插入元素后,会调用该方法进行扩容或者将链表转为树
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 private final void treeifyBin (Node<K,V>[] tab, int index) { Node<K,V> b; int n; if (tab != null ) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) tryPresize(n << 1 ); else if ((b = tabAt(tab, index)) != null && b.hash >= 0 ) { synchronized (b) { if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null , tl = null ; for (Node<K,V> e = b; e != null ; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null , null ); if ((p.prev = tl) == null ) hd = p; else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd)); } } } } }
总结:
当插入一个元素后,如果链表的长度超过TREEIFY_THRESHOLD == 8
,但整个 table 数组长度小于 MIN_TREEIFY_CAPACITY
==64,则进行扩容操作,如果超过了 MIN_TREEIFY_CAPACITY
则将链表转为树
tryPresize(int size) 💡 尝试预处理 table 数组的容量以容纳给定的数量的元素
这个方法只在 treeifyBin() 以及 putAll() 中被调用
treeifyBin() 方法传入的 size 为 table.length << 1
putAll() 传入的 size 为 m.size()
即原 Map 的节点数
以 table.length == 16 为例,调用该方法时候,size = table.length << 1 即 32,计算出来的 c 为 64,直到 c ≤ sc 时才会退出循环,则 sc 至少需要为 128 * 0.75 = 96 才会退出循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 private final void tryPresize (int size) { int c = (size >= (MAXIMUM_CAPACITY >>> 1 )) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1 ) + 1 ); int sc; while ((sc = sizeCtl) >= 0 ) { Node<K,V>[] tab = table; int n; if (tab == null || (n = tab.length) == 0 ) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } } } else if (c <= sc || n >= MAXIMUM_CAPACITY) break ; else if (tab == table) { int rs = resizeStamp(n); if (sc < 0 ) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); } } }
transfer() transfer(Node<K,V> tab, Node<K,V>[] nextTab) 💡 将旧数组的数据迁移到新的数组中,支持多线程迁移
字段说明 transferIndex
是个全局变量,通过 volatile 修饰,并通过 cas 的方式进行修改,用来表示当前线程应该从哪个地方开始往前遍历,如果 transferIndex <= 0 说明对数组的转移已经到头了,不需要再继续处理了
i
字段表示当前线程在迁移的桶的索引
bound
字段表示当前线程的需要迁移的区间的上边界
总结 即每个线程的迁移区间为 [bound,transferIndex),并通过 i 在该区间从后往前遍历桶进行迁移处理,遍历区间完成后,会继续竞争下一个区间
为了高效的进行迁移数据,这里允许多个线程进入,但是给每个线程分配了旧数组的一段区间进行迁移,避免多个线程同时迁移同个区间,代码中通过 cas 的方式,
让多个线程通过 cas 的操作竞争 transferIndex 字段,并通过 transferIndex 字段和 stride 计算出每个线程的迁移区间,所以线程在处理完竞争到的某个区间后,会不停地往前竞争处理下一个区间,直到处理到数组最前面的区间,比如说[0,16) 的区间后,transferIndex 等于 0,所以 i = -1,
就会将 sizeCtl 中低16位中存储的线程数减1后 return,退出迁移,等到所有的线程对所有的区间处理完毕后,其他线程都退出迁移操作了,最后一个线程会将 finish 设置为 true,
再进行下一次循环后会将新的 nextTab 赋值给 table 并将 nextTable 置为空
从 HashMap 的迁移算法中我们也可以直到,对于一个节点,在扩容迁移后,要么还在原来的桶(假设索引为 index)中,要么就在索引为 2 * index 的桶处
同理在 ConcurrentHashMap 中也是这样子,所以不会造成多个线程同时转移到同一个桶中
首先会计算每个线程需要进行迁移的步长 stride,即迁移的桶的数量,至少为 16
在进行迁移时,会从后往前对 table 数组进行遍历,逐步对桶内的数据(链表或者树)进行迁移
由于篇幅太长,我们假设步长 stride 为4,举例进行说明
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 private final void transfer (Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null ) { try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true ; boolean finishing = false ; for (int i = 0 , bound = 0 ;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false ; else if ((nextIndex = transferIndex) <= 0 ) { i = -1 ; advance = false ; } else if (U.compareAndSwapInt (this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } if (U.compareAndSwapInt(this , SIZECTL, sc = sizeCtl, sc - 1 )) { if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return ; finishing = advance = true ; i = n; } } else if ((f = tabAt(tab, i)) == null ) advance = casTabAt(tab, i, null , fwd); else if ((fh = f.hash) == MOVED) advance = true ; else { synchronized (f) { if (tabAt(tab, i) == f) { } } } } } }
链表/树的转移 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null , loTail = null ; TreeNode<K,V> hi = null , hiTail = null ; int lc = 0 , hc = 0 ; for (Node<K,V> e = t.first; e != null ; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null , null ); if ((h & n) == 0 ) { if ((p.prev = loTail) == null ) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null ) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0 ) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0 ) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } }
元素数量计数 size()
获取 map 中键值对的数量
返回 sumCount
方法的值,如果小于0,则返回0,如果超过了 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE
1 2 3 4 5 6 public int size () { long n = sumCount(); return ((n < 0L ) ? 0 : (n > (long )Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int )n); }
sumCount() 返回 baseCount
和 counterCells 列表中值的总和
的和
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 final long sumCount () { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null ) { for (int i = 0 ; i < as.length; ++i) { if ((a = as[i]) != null ) sum += a.value; } } return sum; } @sun .misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
那这个 baseCount 和 counterCells 是什么呢,我们在前面 putVal 的流程中还没有介绍 addCount(1L,binCount) 这个方法。通过名字我们也能看出来这是个用来增加计数的方法
我们先来看这个方法
addCount(long x,int check) x
表示新增的节点数
check
表示是否需要进行扩容检查,如果 < 0 则不进行扩容
从前面 putVal 中我们可以知道,当一个键值对节点插入到链表中时, check 的值为链表的长度,当插入到红黑树中时,check 为 2
这个方法分为两个部分,前一部分是通过 LongAdder 的方式,进行计数,后半部分中会进行扩容迁移操作后,再进行计数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 private final void addCount (long x, int check) { CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } if (check >= 0 ) { Node<K,V>[] tab, nt; int n, sc; while (s >= (long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0 ) {( if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } } }
fullAddCount private final void fullAddCount(long x, boolean wasUncontended)
x
表示新增的节点数量
wasUncontented
表示是否无竞争
true == 表示没有竞争
false == 表示有竞争
在 ConcurrentHashMap 中,其实是复制了一份 LongAdder 的源码,在 ConcurrentHashMap 中,由于并发情况多,如果使用 AtomicLong 的方式进行记录,如果 N 个线程同时对 AtomicLong 进行修改,只有一个线程能修改成功,其他线程则会处于自旋等待状态,而 LongAdder 的方式是使用一个变量 baseCount+数组 CounterCell[],让每个线程去维护自己的一个变量,减少碰撞冲突,每个线程维护数组中的一个对象,对象中存储一个值;从 CounterCell[] 中获取到对应的值并进行修改,如果修改失败,则尝试修改 baseCount 的值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 private final void fullAddCount (long x, boolean wasUncontended) { int h; if ((h = ThreadLocalRandom.getProbe()) == 0 ) { ThreadLocalRandom.localInit(); h = ThreadLocalRandom.getProbe(); wasUncontended = true ; } boolean collide = false ; for (;;) { CounterCell[] as; CounterCell a; int n; long v; if ((as = counterCells) != null && (n = as.length) > 0 ) { if ((a = as[(n - 1 ) & h]) == null ) { if (cellsBusy == 0 ) { CounterCell r = new CounterCell(x); if (cellsBusy == 0 && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { boolean created = false ; try { CounterCell[] rs; int m, j; if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1 ) & h] == null ) { rs[j] = r; created = true ; } } finally { cellsBusy = 0 ; } if (created) break ; continue ; } } collide = false ; } else if (!wasUncontended) wasUncontended = true ; else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break ; else if (counterCells != as || n >= NCPU) collide = false ; else if (!collide) collide = true ; else if (cellsBusy == 0 && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { try { if (counterCells == as) { CounterCell[] rs = new CounterCell[n << 1 ]; for (int i = 0 ; i < n; ++i) rs[i] = as[i]; counterCells = rs; } } finally { cellsBusy = 0 ; } collide = false ; continue ; } h = ThreadLocalRandom.advanceProbe(h); } else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this , CELLSBUSY, 0 , 1 )) { boolean init = false ; try { if (counterCells == as) { CounterCell[] rs = new CounterCell[2 ]; rs[h & 1 ] = new CounterCell(x); counterCells = rs; init = true ; } } finally { cellsBusy = 0 ; } if (init) break ; } else if (U.compareAndSwapLong(this , BASECOUNT, v = baseCount, v + x)) break ; } }
小结 至此,我们就看完了 ConcurrentHashMap 中的 put 的方法
参考链接