摘要:所以,在并發量適中的情況下,一般具有較好的性能。字段指向隊列頭,指向隊列尾,通過來操作字段值以及對象的字段值。單線程的情況下,元素入隊比較好理解,直接線性地在隊首插入元素即可。
本文首發于一世流云專欄:https://segmentfault.com/blog...一、ConcurrentLinkedQueue簡介
ConcurrentLinkedQueue是JDK1.5時隨著J.U.C一起引入的一個支持并發環境的隊列。從名字就可以看出來,ConcurrentLinkedQueue底層是基于鏈表實現的。
Doug Lea在實現ConcurrentLinkedQueue時,并沒有利用鎖或底層同步原語,而是完全基于自旋+CAS的方式實現了該隊列。回想一下AQS,AQS內部的CLH等待隊列也是利用了這種方式。
由于是完全基于無鎖算法實現的,所以當出現多個線程同時進行修改隊列的操作(比如同時入隊),很可能出現CAS修改失敗的情況,那么失敗的線程會進入下一次自旋,再嘗試入隊操作,直到成功。所以,在并發量適中的情況下,ConcurrentLinkedQueue一般具有較好的性能。
二、ConcurrentLinkedQueue原理 隊列結構我們來看下ConcurrentLinkedQueue的內部結構,:
public class ConcurrentLinkedQueueextends AbstractQueue implements Queue , java.io.Serializable { ? /** * 隊列頭指針 */ private transient volatile Node head; ? /** * 隊列尾指針. */ private transient volatile Node tail; ? // Unsafe mechanics ? private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; ? static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = ConcurrentLinkedQueue.class; headOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset (k.getDeclaredField("tail")); } catch (Exception e) { throw new Error(e); } } ? /** * 隊列結點定義 */ private static class Node { volatile E item; // 元素值 volatile Node next; // 后驅指針 ? Node(E item) { UNSAFE.putObject(this, itemOffset, item); } ? boolean casItem(E cmp, E val) { return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } ? void lazySetNext(Node val) { UNSAFE.putOrderedObject(this, nextOffset, val); } ? boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } ? // Unsafe mechanics ? private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; ? static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } } ? //... }
可以看到,ConcurrentLinkedQueue內部就是一個簡單的單鏈表結構,每入隊一個元素就是插入一個Node類型的結點。字段head指向隊列頭,tail指向隊列尾,通過Unsafe來CAS操作字段值以及Node對象的字段值。
構造器定義ConcurrentLinkedQueue包含兩種構造器:
/** * 構建一個空隊列(head,tail均指向一個占位結點). */ public ConcurrentLinkedQueue() { head = tail = new Node(null); }
/** * 根據已有集合,構造隊列 */ public ConcurrentLinkedQueue(Collection extends E> c) { Nodeh = null, t = null; for (E e : c) { checkNotNull(e); Node newNode = new Node (e); if (h == null) h = t = newNode; else { t.lazySetNext(newNode); t = newNode; } } if (h == null) h = t = new Node (null); head = h; tail = t; }
我們重點看下空構造器,通過空構造器建立的ConcurrentLinkedQueue對象,其head和tail指針并非指向null,而是指向一個item值為null的Node結點——哨兵結點,如下圖:
元素的入隊是在隊尾插入元素,關于隊列的操作,如果讀者不熟悉,可以參考《Algorithms 4th》或我的這篇博文:https://www.jianshu.com/p/f9b...
ConcurrentLinkedQueue的入隊代碼很簡單,卻非常精妙:
/** * 入隊一個元素. * * @throws NullPointerException 元素不能為null */ public boolean add(E e) { return offer(e); }
/** * 在隊尾入隊元素e, 直到成功 */ public boolean offer(E e) { checkNotNull(e); final NodenewNode = new Node (e); for (Node t = tail, p = t; ; ) { // 自旋, 直到插入結點成功 Node q = p.next; if (q == null) { // CASE1: 正常情況下, 新結點直接插入到隊尾 if (p.casNext(null, newNode)) { // CAS競爭插入成功 if (p != t) // CAS競爭失敗的線程會在下一次自旋中進入該邏輯 casTail(t, newNode); // 重新設置隊尾指針tail return true; } // CAS競爭插入失敗,則進入下一次自旋 ? } else if (p == q) // CASE2: 發生了出隊操作 p = (t != (t = tail)) ? t : head; else // 將p重新指向隊尾結點 p = (p != t && t != (t = tail)) ? t : q; } }
我們來分析下上面offer方法的實現。單線程的情況下,元素入隊比較好理解,直接線性地在隊首插入元素即可。現在我們假設有兩個線程ThreadA和ThreadB同時進行入隊操作:
①ThreadA先多帶帶入隊兩個元素9、2
此時隊列的結構如下:
②ThreadA入隊元素“10”,ThreadB入隊元素“25”
此時ThreadA和ThreadB若并發執行,我們看下會發生什么:
1、ThreadA和ThreadB同時進入自旋中的以下代碼塊:
if (q == null) { // CASE1: 正常情況下, 新結點直接插入到隊尾 if (p.casNext(null, newNode)) { // CAS競爭插入成功 if (p != t) // CAS競爭失敗的線程會在下一次自旋中進入該邏輯 casTail(t, newNode); // 重新設置隊尾指針tail return true; } // CAS競爭插入失敗,則進入下一次自旋 ? }
2、ThreadA執行cas操作(p.casNext)成功,插入新結點“10”
ThreadA執行完成后,直接返回true,隊列結構如下:
3、ThreadB執行cas操作(p.casNext)失敗
由于CAS操作同時修改隊尾元素,導致ThreadB操作失敗,則ThreadB進入下一次自旋;
在下一次自旋中,進入以下代碼塊:
else // 將p重新指向隊尾結點 p = (p != t && t != (t = tail)) ? t : q;
上述分支的作用就是讓p指針重新定位到隊尾結點,此時隊列結構如下:
然后ThreadB會繼續下一次自旋,并再次進入以下代碼塊:
if (q == null) { // CASE1: 正常情況下, 新結點直接插入到隊尾 if (p.casNext(null, newNode)) { // CAS競爭插入成功 if (p != t) // CAS競爭失敗的線程會在下一次自旋中進入該邏輯 casTail(t, newNode); // 重新設置隊尾指針tail return true; } // CAS競爭插入失敗,則進入下一次自旋 ? }
此時,CAS操作成功,隊列結構如下:
由于此時p!=t ,所以會調用casTail方法重新設置隊尾指針:
casTail(t, newNode); // 重新設置隊尾指針tail
最終隊列如下:
從上面的分析過程可以看到,由于入隊元素一定是要鏈接到隊尾的,但并發情況下隊尾結點可能隨時變化,所以就需要指針定位最新的隊尾結點,并在入隊時判斷隊尾結點是否改變了,如果改變了,就需要重新設置定位指針,然后在下一次自旋中繼續嘗試入隊操作。
上面整個執行步驟有一段分支還沒有覆蓋到:
else if (p == q) // CASE2: 發生了出隊操作 p = (t != (t = tail)) ? t : head;
這個分支只有在元素入隊的同時,針對該元素也發生了“出隊”操作才會執行,我們后面會分析元素的“出隊”,理解了“出隊”操作再回頭來看這個分支就容易理解很多了。
出隊操作隊列中元素的“出隊”是從隊首移除元素,我們來看下ConcurrentLinkedQueue是如何實現出隊的:
/** * 在隊首出隊元素, 直到成功 */ public E poll() { restartFromHead: for (; ; ) { for (Nodeh = head, p = h, q; ; ) { E item = p.item; ? if (item != null && p.casItem(item, null)) { // CASE2: 隊首是非哨兵結點(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; } else if ((q = p.next) == null) { // CASE1: 隊首是一個哨兵結點(item==null) updateHead(h, p); return null; } else if (p == q) continue restartFromHead; else p = q; } } }
還是通過示例來看,假設初始的隊列結構如下:
①ThreadA先多帶帶進行出隊操作
由于head所指的是item==null的結點,所以ThreadA會執行以下分支:
else p = q;
然后進入下一次自旋,在自旋中執行以下分支,如果CAS操作成功,則移除首個有效元素,并重新設置頭指針:
if (item != null && p.casItem(item, null)) { // CASE2: 隊首是非哨兵結點(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
此時隊列的結構如下:
如果ThreadA的CAS操作失敗呢?
CAS操作失敗則會進入以下分支,并重新開始自旋:
else if (p == q) continue restartFromHead;
最終前面兩個null結點會被GC回收,隊列結構如下:
②ThreadA繼續進行出隊操作
ThreadA繼續執行“出隊”操作,還是執行以下分支:
if (item != null && p.casItem(item, null)) { // CASE2: 隊首是非哨兵結點(item!=null) if (p != h) // hop two nodes at a time updateHead(h, ((q = p.next) != null) ? q : p); return item; }
但是此時p==h,所以僅將頭結點置null,這其實是一種“懶刪除”的策略。
出隊元素“2”:
出隊元素“10”:
最終隊列結果如下:
③ThreadA進行出隊,其它線程進行入隊
這是最特殊的一種情況,當隊列中只剩下一個元素時,如果同時發生出隊和入隊操作,會導致隊列出現下面這種結構:(假設ThreadA進行出隊元素“25”,ThreadB進行入隊元素“11”)
此時tail.next=tail自身,所以ThreadB在執行入隊時,會進入到offer方法的以下分支:
else if (p == q) // CASE2: 發生了出隊操作 p = (t != (t = tail)) ? t : head;三、總結
ConcurrentLinkedQueue使用了自旋+CAS的非阻塞算法來保證線程并發訪問時的數據一致性。由于隊列本身是一種鏈表結構,所以雖然算法看起來很簡單,但其實需要考慮各種并發的情況,實現復雜度較高,并且ConcurrentLinkedQueue不具備實時的數據一致性,實際運用中,隊列一般在生產者-消費者的場景下使用得較多,所以ConcurrentLinkedQueue的使用場景并不如阻塞隊列那么多。
另外,關于ConcurrentLinkedQueue還有以下需要注意的幾點:
ConcurrentLinkedQueue的迭代器是弱一致性的,這在并發容器中是比較普遍的現象,主要是指在一個線程在遍歷隊列結點而另一個線程嘗試對某個隊列結點進行修改的話不會拋出ConcurrentModificationException,這也就造成在遍歷某個尚未被修改的結點時,在next方法返回時可以看到該結點的修改,但在遍歷后再對該結點修改時就看不到這種變化。
size方法需要遍歷鏈表,所以在并發情況下,其結果不一定是準確的,只能供參考。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/76960.html
摘要:在之前,除了類外,并沒有其它適合并發環境的棧數據結構。作為雙端隊列,可以當作棧來使用,并且高效地支持并發環境。 showImg(https://segmentfault.com/img/bVbguF7?w=1280&h=853); 本文首發于一世流云專欄:https://segmentfault.com/blog... 一、引言 在開始講ConcurrentLinkedDeque之前...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現了接口,在多線程進階二五之框架中,我們提到過實現了接口,以提供和排序相關的功能,維持元素的有序性,所以就是一種為并發環境設計的有序工具類。唯一的區別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發于一世流云專欄:https://seg...
摘要:僅僅當有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數組上進行的,當創建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發于一世流云專欄:https://...
摘要:我們之前已經介紹過了,底層基于跳表實現,其操作平均時間復雜度均為。事實上,內部引用了一個對象,以組合方式,委托對象實現了所有功能。線程安全內存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發于一世流云專欄:https://segmentfa...
閱讀 2664·2021-11-24 09:38
閱讀 1979·2019-08-30 15:53
閱讀 1234·2019-08-30 15:44
閱讀 3229·2019-08-30 14:10
閱讀 3579·2019-08-29 16:29
閱讀 1800·2019-08-29 16:23
閱讀 1099·2019-08-29 16:20
閱讀 1472·2019-08-29 11:13