來(lái)源:占小狼, www.jianshu.com/p/e694f1e868ec 如有好文章投稿,請(qǐng)點(diǎn)擊 → 這里了解詳情
知止而后有定,定而后能靜,靜而后能安,安而后能慮,慮而后能得。
ConcurrentHashMap
在多線程環(huán)境下,使用HashMap進(jìn)行put操作時(shí)存在丟失數(shù)據(jù)的情況,為了避免這種bug的隱患,強(qiáng)烈建議使用ConcurrentHashMap代替HashMap,為了對(duì)ConcurrentHashMap有更深入的了解,本文將對(duì)ConcurrentHashMap1.7和1.8的不同實(shí)現(xiàn)進(jìn)行分析。
1.7實(shí)現(xiàn)
數(shù)據(jù)結(jié)構(gòu)
jdk1.7中采用Segment + HashEntry的方式進(jìn)行實(shí)現(xiàn),結(jié)構(gòu)如下:
ConcurrentHashMap初始化時(shí),計(jì)算出Segment數(shù)組的大小ssize和每個(gè)Segment中HashEntry數(shù)組的大小cap,并初始化Segment數(shù)組的第一個(gè)元素;其中ssize大小為2的冪次方,默認(rèn)為16,cap大小也是2的冪次方,最小值為2,最終結(jié)果根據(jù)根據(jù)初始化容量initialCapacity進(jìn)行計(jì)算,計(jì)算過(guò)程如下:
if (c * ssize <> ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap <> cap <=>=>
其中Segment在實(shí)現(xiàn)上繼承了ReentrantLock,這樣就自帶了鎖的功能。
put實(shí)現(xiàn)
當(dāng)執(zhí)行put方法插入數(shù)據(jù)時(shí),根據(jù)key的hash值,在Segment數(shù)組中找到相應(yīng)的位置,如果相應(yīng)位置的Segment還未初始化,則通過(guò)CAS進(jìn)行賦值,接著執(zhí)行Segment對(duì)象的put方法通過(guò)加鎖機(jī)制插入數(shù)據(jù),實(shí)現(xiàn)如下:
場(chǎng)景:線程A和線程B同時(shí)執(zhí)行相同Segment對(duì)象的put方法
線程A執(zhí)行tryLock()方法成功獲取鎖,則把HashEntry對(duì)象插入到相應(yīng)的位置;
線程B獲取鎖失敗,則執(zhí)行scanAndLockForPut()方法,在scanAndLockForPut方法中,會(huì)通過(guò)重復(fù)執(zhí)行tryLock()方法嘗試獲取鎖,在多處理器環(huán)境下,重復(fù)次數(shù)為64,單處理器重復(fù)次數(shù)為1,當(dāng)執(zhí)行tryLock()方法的次數(shù)超過(guò)上限時(shí),則執(zhí)行l(wèi)ock()方法掛起線程B;
當(dāng)線程A執(zhí)行完插入操作時(shí),會(huì)通過(guò)unlock()方法釋放鎖,接著喚醒線程B繼續(xù)執(zhí)行;
size實(shí)現(xiàn)
因?yàn)镃oncurrentHashMap是可以并發(fā)插入數(shù)據(jù)的,所以在準(zhǔn)確計(jì)算元素時(shí)存在一定的難度,一般的思路是統(tǒng)計(jì)每個(gè)Segment對(duì)象中的元素個(gè)數(shù),然后進(jìn)行累加,但是這種方式計(jì)算出來(lái)的結(jié)果并不一樣的準(zhǔn)確的,因?yàn)樵谟?jì)算后面幾個(gè)Segment的元素個(gè)數(shù)時(shí),已經(jīng)計(jì)算過(guò)的Segment同時(shí)可能有數(shù)據(jù)的插入或則刪除,在1.7的實(shí)現(xiàn)中,采用了如下方式:
try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length;=""> ensureSegment(j).lock(); // force creation } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length;="" ++j)=""> Segment seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; int c = seg.count; if (c < 0="" ||="" (size="" +="c)"><> overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length;=""> segmentAt(segments, j).unlock(); } }
先采用不加鎖的方式,連續(xù)計(jì)算元素的個(gè)數(shù),最多計(jì)算3次:
如果前后兩次計(jì)算結(jié)果相同,則說(shuō)明計(jì)算出來(lái)的元素個(gè)數(shù)是準(zhǔn)確的;
如果前后兩次計(jì)算結(jié)果都不同,則給每個(gè)Segment進(jìn)行加鎖,再計(jì)算一次元素的個(gè)數(shù);
1.8實(shí)現(xiàn)
數(shù)據(jù)結(jié)構(gòu)
1.8中放棄了Segment臃腫的設(shè)計(jì),取而代之的是采用Node + CAS + Synchronized來(lái)保證并發(fā)安全進(jìn)行實(shí)現(xiàn),結(jié)構(gòu)如下:
只有在執(zhí)行第一次put方法時(shí)才會(huì)調(diào)用initTable()初始化Node數(shù)組,實(shí)現(xiàn)如下:
private final Node[] initTable() { Node[] tab; int sc; while ((tab = table) == null || tab.length == 0) { if ((sc = sizeCtl) <> Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings('unchecked') Node[] nt = (Node[])new Node,??>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
put實(shí)現(xiàn)
當(dāng)執(zhí)行put方法插入數(shù)據(jù)時(shí),根據(jù)key的hash值,在Node數(shù)組中找到相應(yīng)的位置,實(shí)現(xiàn)如下:
1、如果相應(yīng)位置的Node還未初始化,則通過(guò)CAS插入相應(yīng)的數(shù)據(jù);
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { if (casTabAt(tab, i, null, new Node(hash, key, value, null))) break; // no lock when adding to empty bin }
2、如果相應(yīng)位置的Node不為空,且當(dāng)前該節(jié)點(diǎn)不處于移動(dòng)狀態(tài),則對(duì)該節(jié)點(diǎn)加synchronized鎖,如果該節(jié)點(diǎn)的hash不小于0,則遍歷鏈表更新節(jié)點(diǎn)或插入新節(jié)點(diǎn);
if (fh >= 0) { binCount = 1; for (Node e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; if ((e = e.next) == null) { pred.next = new Node(hash, key, value, null); break; } } }
3、如果該節(jié)點(diǎn)是TreeBin類型的節(jié)點(diǎn),說(shuō)明是紅黑樹(shù)結(jié)構(gòu),則通過(guò)putTreeVal方法往紅黑樹(shù)中插入節(jié)點(diǎn);
else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } }
4、如果binCount不為0,說(shuō)明put操作對(duì)數(shù)據(jù)產(chǎn)生了影響,如果當(dāng)前鏈表的個(gè)數(shù)達(dá)到8個(gè),則通過(guò)treeifyBin方法轉(zhuǎn)化為紅黑樹(shù),如果oldVal不為空,說(shuō)明是一次更新操作,沒(méi)有對(duì)元素個(gè)數(shù)產(chǎn)生影響,則直接返回舊值;
if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; }
5、如果插入的是一個(gè)新節(jié)點(diǎn),則執(zhí)行addCount()方法嘗試更新元素個(gè)數(shù)baseCount;
size實(shí)現(xiàn)
1.8中使用一個(gè)volatile類型的變量baseCount記錄元素的個(gè)數(shù),當(dāng)插入新數(shù)據(jù)或則刪除數(shù)據(jù)時(shí),會(huì)通過(guò)addCount()方法更新baseCount,實(shí)現(xiàn)如下:
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; boolean uncontended = true; if (as == null || (m = as.length - 1) < 0=""> (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return; } if (check <=>=> return; s = sumCount(); }
1、初始化時(shí)counterCells為空,在并發(fā)量很高時(shí),如果存在兩個(gè)線程同時(shí)執(zhí)行CAS修改baseCount值,則失敗的線程會(huì)繼續(xù)執(zhí)行方法體中的邏輯,使用CounterCell記錄元素個(gè)數(shù)的變化;
2、如果CounterCell數(shù)組counterCells為空,調(diào)用fullAddCount()方法進(jìn)行初始化,并插入對(duì)應(yīng)的記錄數(shù),通過(guò)CAS設(shè)置cellsBusy字段,只有設(shè)置成功的線程才能初始化CounterCell數(shù)組,實(shí)現(xiàn)如下:
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) { CounterCell[] rs = new CounterCell[2]; rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
3、如果通過(guò)CAS設(shè)置cellsBusy字段失敗的話,則繼續(xù)嘗試通過(guò)CAS修改baseCount字段,如果修改baseCount字段成功的話,就退出循環(huán),否則繼續(xù)循環(huán)插入CounterCell對(duì)象;
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break;
所以在1.8中的size實(shí)現(xiàn)比1.7簡(jiǎn)單多,因?yàn)樵貍€(gè)數(shù)保存baseCount中,部分元素的變化個(gè)數(shù)保存在CounterCell數(shù)組中,實(shí)現(xiàn)如下:
public int size() { long n = sumCount(); return ((n < 0l)="" 0=""> (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); } final long sumCount() { CounterCell[] as = counterCells; CounterCell a; long sum = baseCount; if (as != null) { for (int i = 0; i < as.length;="" ++i)=""> if ((a = as[i]) != null) sum += a.value; } } return sum; }
通過(guò)累加baseCount和CounterCell數(shù)組中的數(shù)量,即可得到元素的總個(gè)數(shù)。
看完本文有收獲?請(qǐng)轉(zhuǎn)發(fā)分享給更多人 關(guān)注「ImportNew」,提升Java技能
|