摘要:數據結構重要成員變量代表整個哈希表??破?,解決多線程并行情況下使用鎖造成性能損耗的一種機制,操作包含三個操作數內存位置預期原值和新值。
ConcurrenHashMap 。下面分享一下我對ConcurrentHashMap 的理解,主要用于個人備忘。如果有不對,請批評。
HashMap“嚴重”的勾起了我對HashMap家族的好奇心,下面分享一下我對ConcurrentHashMap 的理解,主要用于個人備忘。如果有不對,請批評。
想要解鎖更多新姿勢?請訪問https://blog.tengshe789.tech/
總起HashMap是我們平時開發過程中用的比較多的集合,但它是非線程安全的,在涉及到多線程并發的情況,進行get操作有可能會引起死循環,導致CPU利用率接近100%。
因此需要支持線程安全的并發容器 ConcurrentHashMap 。
數據結構 重要成員變量/** * The array of bins. Lazily initialized upon first insertion. * Size is always a power of two. Accessed directly by iterators. */ transient volatile Node[] table;
table代表整個哈希表。 默認為null,初始化發生在第一次插入操作,默認大小為16的數組,用來存儲Node節點數據,擴容時大小總是2的冪次方。
/** * The next table to use; non-null only while resizing. */ private transient volatile Node[] nextTable;
nextTable是一個連接表,用于哈希表擴容,默認為null,擴容時新生成的數組,其大小為原數組的兩倍。
/** * Base counter value, used mainly when there is no contention, * but also as a fallback during table initialization * races. Updated via CAS. */ private transient volatile long baseCount;
baseCount保存著整個哈希表中存儲的所有的結點的個數總和,有點類似于 HashMap 的 size 屬性。 這個數通過CAS算法更新
/** * Table initialization and resizing control. When negative, the * table is being initialized or resized: -1 for initialization, * else -(1 + the number of active resizing threads). Otherwise, * when table is null, holds the initial table size to use upon * creation, or 0 for default. After initialization, holds the * next element count value upon which to resize the table. */ private transient volatile int sizeCtl;
初始化哈希表和擴容 rehash 的過程,都需要依賴sizeCtl。該屬性有以下幾種取值:
0:默認值
-1:代表哈希表正在進行初始化
大于0:相當于 HashMap 中的 threshold,表示閾值
小于-1:代表有多個線程正在進行擴容。(譬如:-N 表示有N-1個線程正在進行擴容操作 )
構造方法public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));//MAXIMUM_CAPACITY = 1 << 30 this.sizeCtl = cap;//ConcurrentHashMap在構造函數中只會初始化sizeCtl值,并不會直接初始化table,而是延緩到第一次put操作。 } public ConcurrentHashMap(Map extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY;//DEFAULT_CAPACITY = 16 putAll(m); }
構造方法是三個。重點是第二個,帶參的構造方法。這個帶參的構造方法會調用tableSizeFor()方法,確保table的大小總是2的冪次方(假設參數為100,最終會調整成256)。算法如下:
/** * Returns a power of two table size for the given desired capacity. * See Hackers Delight, sec 3.2 */ private static final int tableSizeFor(int c) { int n = c - 1; n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1; }PUT()方法
put()調用putVal()方法,讓我們看看:
final V putVal(K key, V value, boolean onlyIfAbsent) { //對傳入的參數進行合法性判斷 if (key == null || value == null) throw new NullPointerException(); int hash = spread(key.hashCode());//計算鍵所對應的 hash 值 int binCount = 0; for (Node[] tab = table;;) { Node f; int n, i, fh; //如果哈希表還未初始化,那么初始化它 if (tab == null || (n = tab.length) == 0) tab = initTable(); //根據hash值計算出在table里面的位置 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //如果這個位置沒有值 ,那么以CAS無鎖式向該位置添加一個節點 if (casTabAt(tab, i, null, new Node (hash, key, value, null))) break; // no lock when adding to empty bin } //檢測到桶結點是 ForwardingNode 類型,協助擴容(MOVED = -1; // hash for forwarding nodes) else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //桶結點是普通的結點,鎖住該桶頭結點并試圖在該鏈表的尾部添加一個節點 else { V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //向普通的鏈表中添加元素 if (fh >= 0) { binCount = 1; //遍歷鏈表所有的結點 for (Node e = f;; ++binCount) { K ek; //如果hash值和key值相同,則修改對應結點的value值 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } Node pred = e; //如果遍歷到了最后一個結點,那么就證明新的節點需要插入鏈表尾部 if ((e = e.next) == null) { pred.next = new Node (hash, key, value, null); break; } } } //如果這個節點是樹節點,就按照樹的方式插入值 else if (f instanceof TreeBin) { Node p; binCount = 2; if ((p = ((TreeBin )f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果鏈表長度已經達到臨界值8,就需要把鏈表轉換為樹結構(TREEIFY_THRESHOLD = 8) if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //CAS 式更新baseCount,并判斷是否需要擴容 addCount(1L, binCount); return null; }
其實putVal()也多多少少掉用了其他方法,讓我們繼續探究一下。
CAS(compare and swap)科普compare and swap,解決多線程并行情況下使用鎖造成性能損耗的一種機制,CAS操作包含三個操作數——內存位置(V)、預期原值(A)和新值(B)。如果內存位置的值與預期原值相匹配,那么處理器會自動將該位置值更新為新值。否則,處理器不做任何操作。無論哪種情況,它都會在CAS指令之前返回該位置的值。CAS有效地說明了“我認為位置V應該包含值A;如果包含該值,則將B放到這個位置;否則,不要更改該位置,只告訴我這個位置現在的值即可。
spread首先,第四行出現的int hash = spread(key.hashCode());這是傳統的計算hash的方法。key的hash值高16位不變,低16位與高16位異或作為key的最終hash值。(h >>> 16,表示無符號右移16位,高位補0,任何數跟0異或都是其本身,因此key的hash值高16位不變。)
static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; }initTable
第十行, tab = initTable();這個方法的亮點是,可以讓put并發執行,實現table只初始化一次 。
initTable()核心思想就是,只允許一個線程對表進行初始化,如果有其他線程進來了,那么會讓其他線程交出 CPU 等待下次系統調度。這樣,保證了表同時只會被一個線程初始化。
private final Node[] initTable() { Node [] tab; int sc; //如果表為空才進行初始化操作 while ((tab = table) == null || tab.length == 0) { //如果一個線程發現sizeCtl<0,意味著另外的線程執行CAS操作成功,當前線程只需要讓出cpu時間片(放棄 CPU 的使用) if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //否則說明還未有線程對表進行初始化,那么本線程就來做這個工作 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { //sc 大于零說明容量已經初始化了,否則使用默認容量 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n]; table = tab = nt; //計算閾值,等效于 n*0.75 sc = n - (n >>> 2); } } finally { //設置閾值 sizeCtl = sc; } break; } } return tab; }
接下來,第19行。 tab = helpTransfer(tab, f);這句話。要了解這個,首先需要知道ForwardingNode 這個節點類型。它一個用于連接兩個table的節點類。它包含一個nextTable指針,用于指向下一張hash表。而且這個節點的key、value、next指針全部為null,它的hash值為MOVED(static final int MOVED = -1)。
static final class ForwardingNodehelpTransferextends Node { final Node [] nextTable; ForwardingNode(Node [] tab) { super(MOVED, null, null, null); this.nextTable = tab; } //find的方法是從nextTable里進行查詢節點,而不是以自身為頭節點進行查找 Node find(int h, Object k) { // loop to avoid arbitrarily deep recursion on forwarding nodes outer: for (Node [] tab = nextTable;;) { Node e; int n; if (k == null || tab == null || (n = tab.length) == 0 || (e = tabAt(tab, (n - 1) & h)) == null) return null; for (;;) { int eh; K ek; if ((eh = e.hash) == h && ((ek = e.key) == k || (ek != null && k.equals(ek)))) return e; if (eh < 0) { if (e instanceof ForwardingNode) { tab = ((ForwardingNode )e).nextTable; continue outer; } else return e.find(h, k); } if ((e = e.next) == null) return null; } } } }
在擴容操作中,我們需要對每個桶中的結點進行分離和轉移。如果某個桶結點中所有節點都已經遷移完成了(已經被轉移到新表 nextTable 中了),那么會在原 table 表的該位置掛上一個 ForwardingNode 結點,說明此桶已經完成遷移。
helpTransfer什么作用呢?是檢測到當前哈希表正在擴容,然后讓當前線程去協助擴容 !
final Node[] helpTransfer(Node [] tab, Node f) { Node [] nextTab; int sc; if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode )f).nextTable) != null) {//新的table,nextTab已經存在前提下才能幫助擴容 int rs = resizeStamp(tab.length);//返回一個 16 位長度的擴容校驗標識 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {//sizeCtl 如果處于擴容狀態的話 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) //前 16 位是數據校驗標識,后 16 位是當前正在擴容的線程總數 //這里判斷校驗標識是否相等,如果校驗符不等或者擴容操作已經完成了,直接退出循環,不用協助它們擴容了 break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {//sc + 1 標識增加了一個線程進行擴容 transfer(tab, nextTab);//調用擴容方法 break; } } return nextTab; } return table; }
helpTransfer精髓的是可以調用多個工作線程一起幫助進行擴容,這樣的效率就會更高,而不是只有檢查到要擴容的那個線程進行擴容操作,其他線程就要等待擴容操作完成才能工作。
transfer既然這里涉及到擴容的操作,我們也一起來看看擴容方法transfer() :
private final void transfer(Node[] tab, Node [] nextTab) { int n = tab.length, stride; //計算單個線程允許處理的最少table桶首節點個數,不能小于 16 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //剛開始擴容,初始化 nextTab if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node [] nt = (Node [])new Node,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //transferIndex 指向最后一個桶,方便從后向前遍歷 transferIndex = n; } int nextn = nextTab.length; //定義 ForwardingNode 用于標記遷移完成的桶 ForwardingNode fwd = new ForwardingNode (nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab //i 指向當前桶,bound 指向當前線程需要處理的桶結點的區間下限 for (int i = 0, bound = 0;;) { Node f; int fh; //遍歷當前線程所分配到的桶結點 while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; //transferIndex <= 0 說明已經沒有需要遷移的桶了 else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } //更新 transferIndex //為當前線程分配任務,處理的桶結點區間為(nextBound,nextIndex) else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } //當前線程所有任務完成 if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } //待遷移桶為空,那么在此位置 CAS 添加 ForwardingNode 結點標識該桶已經被處理過了 else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); //如果掃描到 ForwardingNode,說明此桶已經被處理過了,跳過即可 else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { if (tabAt(tab, i) == f) { Node ln, hn; //鏈表的遷移操作 if (fh >= 0) { int runBit = fh & n; Node lastRun = f; //整個 for 循環為了找到整個桶中最后連續的 fh & n 不變的結點 for (Node p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node (ph, pk, pv, ln); else hn = new Node (ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } //紅黑樹的復制算法, else if (f instanceof TreeBin) { TreeBin t = (TreeBin )f; TreeNode lo = null, loTail = null; TreeNode hi = null, hiTail = null; int lc = 0, hc = 0; for (Node e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode p = new TreeNode (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin (lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin (hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
至此,put方法講完了
參考資料~參考資料
感謝
結束
此片完了~
想要了解更多精彩新姿勢?請訪問我的個人博客
本篇為原創內容,已經于07-06在個人博客率先發表,隨后CSDN,segmentfault,juejin同步發出。如有雷同,
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/76697.html
摘要:變量提升變量的聲明寫在可以在使用變量之后函數提升函數可以先調用,后聲明上面先解釋了下我理解的這兩個概念的定義。參考前端基礎進階三變量對象詳解關于變量提升的理解 變量提升:變量的聲明寫在可以在使用變量之后;函數提升:函數可以先調用,后聲明; 上面先解釋了下我理解的這兩個概念的定義。要真正理解它們,最好從變量對象的角度出發。引出變量對象的概念,要先理解執行上下文,也就是當控制器執行到可執行...
摘要:當容量超過容量負載因子時,進行擴容操作確定何時將沖突的鏈表轉換成紅黑樹用來確何時將紅黑樹轉換成鏈表當鏈表轉換成紅黑樹時,需要判斷數組容量。桶排序核心思想是根據數據規模劃分,個相同大小的區間每個區間為一個桶,桶可理解為容器。 剛剛看到QQ群有人吹Hashmap,一想我啥都不懂,就趕快補了一波。下面分享一下我對Hashmap的理解,主要用于個人備忘。如果有不對,請批評。想要解鎖更多新姿勢?...
摘要:對于一棵有效的紅黑樹二叉樹而言我們必須增加如下規則每個節點都只能是紅色或者黑色根節點是黑色每個葉節點節點,空節點是黑色的。這些約束強制了紅黑樹的關鍵性質從根到葉子的最長的可能路徑不多于最短的可能路徑的兩倍長。 群里的大哥說了,要想懂紅黑樹的應用,先要看TreeMap。 想要解鎖更多新姿勢?請訪問http://blog.tengshe789.tech/ OK,現在開始: 紅黑樹簡介 紅黑...
摘要:關于變量的值的類型的總結。所以此時指向新的對象還是指向被添加了屬性的老對象, 關于變量的值的類型的總結。 //1.當多個變量的值是非引用類型var a=1;var b=a; //系統復制了a的值并賦值給ba++; //a自身的值被改變,而b的值不受影響 a b的值雖相等但互不影響console.log(a)//2console.log(b)//1 ...
閱讀 3428·2021-11-19 09:40
閱讀 1314·2021-10-11 11:07
閱讀 4846·2021-09-22 15:07
閱讀 2890·2021-09-02 15:15
閱讀 1964·2019-08-30 15:55
閱讀 539·2019-08-30 15:43
閱讀 883·2019-08-30 11:13
閱讀 1449·2019-08-29 15:36