摘要:在之前,除了類(lèi)外,并沒(méi)有其它適合并發(fā)環(huán)境的棧數(shù)據(jù)結(jié)構(gòu)。作為雙端隊(duì)列,可以當(dāng)作棧來(lái)使用,并且高效地支持并發(fā)環(huán)境。
本文首發(fā)于一世流云專(zhuān)欄:https://segmentfault.com/blog...一、引言
在開(kāi)始講ConcurrentLinkedDeque之前,我們先來(lái)了解下Deque這種數(shù)據(jù)結(jié)構(gòu),我們知道Queue是一種具有FIFO特點(diǎn)的數(shù)據(jù)結(jié)構(gòu),元素只能在隊(duì)首進(jìn)行“入隊(duì)”操作,在隊(duì)尾進(jìn)行“出隊(duì)”操作。
而Deque(double-ended queue)是一種雙端隊(duì)列,也就是說(shuō)可以在任意一端進(jìn)行“入隊(duì)”,也可以在任意一端進(jìn)行“出隊(duì)”:
Deque的數(shù)據(jù)結(jié)構(gòu)示意圖如下:
我們?cè)賮?lái)看下JDK中Queue和Deque這兩種數(shù)據(jù)結(jié)構(gòu)的接口定義,看看Deque和Queue相比有哪些增強(qiáng):
Queue接口定義Queue的接口非常簡(jiǎn)單,一共只有三種類(lèi)型的操作:入隊(duì)、出隊(duì)、讀取。
上述方法,可以劃分如下:
操作類(lèi)型 | 拋出異常 | 返回特殊值 |
---|---|---|
入隊(duì) | add(e) | offer(e) |
出隊(duì) | remove() | poll() |
讀取 | element() | peek() |
每種操作類(lèi)型,都給出了兩種方法,區(qū)別就是其中一種操作在隊(duì)列的狀態(tài)不滿(mǎn)足某些要求時(shí),會(huì)拋出異常;另一種,則直接返回特殊值(如null)。
Deque接口定義Queue接口的所有方法Deque都具備,只不過(guò)隊(duì)首/隊(duì)尾都可以進(jìn)行“出隊(duì)”和“入隊(duì)”操作:
操作類(lèi)型 | 拋出異常 | 返回特殊值 |
---|---|---|
隊(duì)首入隊(duì) | addFirst(e) | offerFirst(e) |
隊(duì)首出隊(duì) | removeFirst() | pollFirst() |
隊(duì)首讀取 | getFirst() | peekFirst() |
隊(duì)尾入隊(duì) | addLast(e) | offerLast(e) |
隊(duì)尾出隊(duì) | removeLast() | pollLast() |
隊(duì)尾讀取 | getLast() | peekLast() |
除此之外,Deque還可以當(dāng)作“棧”來(lái)使用,我們知道“棧”是一種具有“LIFO”特點(diǎn)的數(shù)據(jù)結(jié)構(gòu)(關(guān)于棧,可以參考我的這篇博文:棧),Deque提供了push、pop、peek這三個(gè)棧方法,一般實(shí)現(xiàn)這三個(gè)方法時(shí),可以利用已有方法,即有如下映射關(guān)系:
棧方法 | Deque方法 |
---|---|
push | addFirst(e) |
pop | removeFirst() |
peek | peekFirst() |
關(guān)于Deque接口的更多細(xì)節(jié),讀者可以參考Oracle的官方文檔:https://docs.oracle.com/javas...
二、ConcurrentLinkedDeque簡(jiǎn)介ConcurrentLinkedDeque是JDK1.7時(shí),J.U.C包引入的一個(gè)集合工具類(lèi)。在JDK1.7之前,除了Stack類(lèi)外,并沒(méi)有其它適合并發(fā)環(huán)境的“棧”數(shù)據(jù)結(jié)構(gòu)。ConcurrentLinkedDeque作為雙端隊(duì)列,可以當(dāng)作“棧”來(lái)使用,并且高效地支持并發(fā)環(huán)境。
ConcurrentLinkedDeque和ConcurrentLinkedQueue一樣,采用了無(wú)鎖算法,底層基于自旋+CAS的方式實(shí)現(xiàn)。
三、ConcurrentLinkedDeque原理 隊(duì)列結(jié)構(gòu)我們先來(lái)看下ConcurrentLinkedDeque的內(nèi)部結(jié)構(gòu):
public class ConcurrentLinkedDequeextends AbstractCollection implements Deque , java.io.Serializable { /** * 頭指針 */ private transient volatile Node head; /** * 尾指針 */ private transient volatile Node tail; private static final Node
可以看到,ConcurrentLinkedDeque的內(nèi)部和ConcurrentLinkedQueue類(lèi)似,不過(guò)是一個(gè)雙鏈表結(jié)構(gòu),每入隊(duì)一個(gè)元素就是插入一個(gè)Node類(lèi)型的結(jié)點(diǎn)。字段head指向隊(duì)列頭,tail指向隊(duì)列尾,通過(guò)Unsafe來(lái)CAS操作字段值以及Node對(duì)象的字段值。
需要特別注意的是ConcurrentLinkedDeque包含兩個(gè)特殊字段:PREV_TERMINATOR、NEXT_TERMINATOR。
這兩個(gè)字段初始時(shí)都指向一個(gè)值為null的空結(jié)點(diǎn),這兩個(gè)字段在結(jié)點(diǎn)刪除時(shí)使用,后面會(huì)詳細(xì)介紹:
ConcurrentLinkedDeque包含兩種構(gòu)造器:
/** * 空構(gòu)造器. */ public ConcurrentLinkedDeque() { head = tail = new Node(null); }
/** * 從已有集合,構(gòu)造隊(duì)列 */ public ConcurrentLinkedDeque(Collection extends E> c) { Nodeh = null, t = null; for (E e : c) { checkNotNull(e); Node newNode = new Node (e); if (h == null) h = t = newNode; else { // 在隊(duì)尾插入元素 t.lazySetNext(newNode); newNode.lazySetPrev(t); t = newNode; } } initHeadTail(h, t); }
我們重點(diǎn)看下空構(gòu)造器,通過(guò)空構(gòu)造器建立的ConcurrentLinkedDeque對(duì)象,其head和tail指針并非指向null,而是指向一個(gè)item值為null的Node結(jié)點(diǎn)——哨兵結(jié)點(diǎn),如下圖:
入隊(duì)操作雙端隊(duì)列與普通隊(duì)列的入隊(duì)區(qū)別是:雙端隊(duì)列既可以在“隊(duì)尾”插入元素,也可以在“隊(duì)首”插入元素。ConcurrentLinkedDeque的入隊(duì)方法有很多:addFirst(e)、addLast(e)、offerFirst(e)、offerLast(e):
public void addFirst(E e) { linkFirst(e); } public void addLast(E e) { linkLast(e); } public boolean offerFirst(E e) { linkFirst(e); return true; } public boolean offerLast(E e) { linkLast(e); return true; }
可以看到,隊(duì)首“入隊(duì)”其實(shí)就是調(diào)用了linkFirst(e)方法,而隊(duì)尾“入隊(duì)”是調(diào)用了 linkLast(e)方法。我們先來(lái)看下隊(duì)首“入隊(duì)”——linkFirst(e):
/** * 在隊(duì)首插入一個(gè)元素. */ private void linkFirst(E e) { checkNotNull(e); final NodenewNode = new Node (e); // 創(chuàng)建待插入的結(jié)點(diǎn) restartFromHead: for (; ; ) for (Node h = head, p = h, q; ; ) { if ((q = p.prev) != null && (q = (p = q).prev) != null) // Check for head updates every other hop. // If p == q, we are sure to follow head instead. p = (h != (h = head)) ? h : q; else if (p.next == p) // PREV_TERMINATOR continue restartFromHead; else { // p is first node newNode.lazySetNext(p); // CAS piggyback if (p.casPrev(null, newNode)) { // Successful CAS is the linearization point // for e to become an element of this deque, // and for newNode to become "live". if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // Lost CAS race to another thread; re-read prev } } }
為了便于理解,我們以示例來(lái)看:假設(shè)有兩個(gè)線程ThreadA和ThreadB同時(shí)進(jìn)行入隊(duì)操作。
①ThreadA先多帶帶入隊(duì)一個(gè)元素9
此時(shí),ThreadA會(huì)執(zhí)行CASE3分支:
else { // CASE3: p是隊(duì)首結(jié)點(diǎn) newNode.lazySetNext(p); // “新結(jié)點(diǎn)”的next指向隊(duì)首結(jié)點(diǎn) if (p.casPrev(null, newNode)) { // 隊(duì)首結(jié)點(diǎn)的prev指針指向“新結(jié)點(diǎn)” if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // 執(zhí)行到此處說(shuō)明CAS操作失敗,有其它線程也在隊(duì)首插入元素 }
隊(duì)列的結(jié)構(gòu)如下:
②ThreadA入隊(duì)一個(gè)元素2,同時(shí)ThreadB入隊(duì)一個(gè)元素10
此時(shí),依然執(zhí)行CASE3分支,我們假設(shè)ThreadA操作成功,ThreadB操作失敗:
else { // CASE3: p是隊(duì)首結(jié)點(diǎn) newNode.lazySetNext(p); // “新結(jié)點(diǎn)”的next指向隊(duì)首結(jié)點(diǎn) if (p.casPrev(null, newNode)) { // 隊(duì)首結(jié)點(diǎn)的prev指針指向“新結(jié)點(diǎn)” if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK. return; } // 執(zhí)行到此處說(shuō)明CAS操作失敗,有其它線程也在隊(duì)首插入元素 }
ThreadA的CAS操作成功后,會(huì)進(jìn)入以下判斷:
if (p != h) // hop two nodes at a time casHead(h, newNode); // Failure is OK.
上述判斷的作用就是重置head頭指針,可以看到,ConcurrentLinkedDeque其實(shí)是以每次跳2個(gè)結(jié)點(diǎn)的方式移動(dòng)指針,這主要考慮到并發(fā)環(huán)境以這種hop跳的方式可以提升效率。
此時(shí)隊(duì)列的機(jī)構(gòu)如下:
注意,此時(shí)ThreadB的p.casPrev(null, newNode)操作失敗了,所以會(huì)進(jìn)入下一次自旋,在下一次自旋中繼續(xù)進(jìn)入CASE3。如果ThreadA的casHead操作沒(méi)有完成,ThreadB就進(jìn)入了下一次自旋,則會(huì)進(jìn)入分支1,重置指針p指向隊(duì)首。最終隊(duì)列結(jié)構(gòu)如下:
在隊(duì)尾插入元素和隊(duì)首類(lèi)似,不再贅述,讀者可以自己閱讀源碼。出隊(duì)操作
ConcurrentLinkedDeque的出隊(duì)一樣分為隊(duì)首、隊(duì)尾兩種情況:removeFirst()、pollFirst()、removeLast()、pollLast()。
public E removeFirst() { return screenNullResult(pollFirst()); } public E removeLast() { return screenNullResult(pollLast()); } public E pollFirst() { for (Nodep = first(); p != null; p = succ(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; } public E pollLast() { for (Node p = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
可以看到,兩個(gè)remove方法其實(shí)內(nèi)部都調(diào)用了對(duì)應(yīng)的poll方法,我們重點(diǎn)看下隊(duì)尾的“出隊(duì)”——pollLast方法:
public E pollLast() { for (Nodep = last(); p != null; p = pred(p)) { E item = p.item; if (item != null && p.casItem(item, null)) { unlink(p); return item; } } return null; }
last方法用于尋找隊(duì)尾結(jié)點(diǎn),即滿(mǎn)足p.next == null && p.prev != p的結(jié)點(diǎn):
Nodelast() { restartFromTail: for (; ; ) for (Node t = tail, p = t, q; ; ) { if ((q = p.next) != null && (q = (p = q).next) != null) // Check for tail updates every other hop. // If p == q, we are sure to follow tail instead. p = (t != (t = tail)) ? t : q; else if (p == t // It is possible that p is NEXT_TERMINATOR, // but if so, the CAS is guaranteed to fail. || casTail(t, p)) return p; else continue restartFromTail; } }
pred方法用于尋找當(dāng)前結(jié)點(diǎn)的前驅(qū)結(jié)點(diǎn)(如果前驅(qū)是自身,則返回隊(duì)尾結(jié)點(diǎn)):
final Nodepred(Node p) { Node q = p.prev; return (p == q) ? last() : q; }
unlink方法斷開(kāi)結(jié)點(diǎn)的鏈接:
/** * Unlinks non-null node x. */ void unlink(Nodex) { // assert x != null; // assert x.item == null; // assert x != PREV_TERMINATOR; // assert x != NEXT_TERMINATOR; final Node prev = x.prev; final Node next = x.next; if (prev == null) { unlinkFirst(x, next); } else if (next == null) { unlinkLast(x, prev); } else { Node activePred, activeSucc; boolean isFirst, isLast; int hops = 1; // Find active predecessor for (Node p = prev; ; ++hops) { if (p.item != null) { activePred = p; isFirst = false; break; } Node q = p.prev; if (q == null) { if (p.next == p) return; activePred = p; isFirst = true; break; } else if (p == q) return; else p = q; } // Find active successor for (Node p = next; ; ++hops) { if (p.item != null) { activeSucc = p; isLast = false; break; } Node q = p.next; if (q == null) { if (p.prev == p) return; activeSucc = p; isLast = true; break; } else if (p == q) return; else p = q; } // TODO: better HOP heuristics if (hops < HOPS // always squeeze out interior deleted nodes && (isFirst | isLast)) return; // Squeeze out deleted nodes between activePred and // activeSucc, including x. skipDeletedSuccessors(activePred); skipDeletedPredecessors(activeSucc); // Try to gc-unlink, if possible if ((isFirst | isLast) && // Recheck expected state of predecessor and successor (activePred.next == activeSucc) && (activeSucc.prev == activePred) && (isFirst ? activePred.prev == null : activePred.item != null) && (isLast ? activeSucc.next == null : activeSucc.item != null)) { updateHead(); // Ensure x is not reachable from head updateTail(); // Ensure x is not reachable from tail // Finally, actually gc-unlink x.lazySetPrev(isFirst ? prevTerminator() : x); x.lazySetNext(isLast ? nextTerminator() : x); } } }
ConcurrentLinkedDeque相比ConcurrentLinkedQueue,功能更豐富,但是由于底層結(jié)構(gòu)是雙鏈表,且完全采用CAS+自旋的無(wú)鎖算法保證線程安全性,所以需要考慮各種并發(fā)情況,源碼比ConcurrentLinkedQueue更加難懂,留待有精力作進(jìn)一步分析。四、總結(jié)
ConcurrentLinkedDeque使用了自旋+CAS的非阻塞算法來(lái)保證線程并發(fā)訪問(wèn)時(shí)的數(shù)據(jù)一致性。由于隊(duì)列本身是一種雙鏈表結(jié)構(gòu),所以雖然算法看起來(lái)很簡(jiǎn)單,但其實(shí)需要考慮各種并發(fā)的情況,實(shí)現(xiàn)復(fù)雜度較高,并且ConcurrentLinkedDeque不具備實(shí)時(shí)的數(shù)據(jù)一致性,實(shí)際運(yùn)用中,如果需要一種線程安全的棧結(jié)構(gòu),可以使用ConcurrentLinkedDeque。
另外,關(guān)于ConcurrentLinkedDeque還有以下需要注意的幾點(diǎn):
ConcurrentLinkedDeque的迭代器是弱一致性的,這在并發(fā)容器中是比較普遍的現(xiàn)象,主要是指在一個(gè)線程在遍歷隊(duì)列結(jié)點(diǎn)而另一個(gè)線程嘗試對(duì)某個(gè)隊(duì)列結(jié)點(diǎn)進(jìn)行修改的話不會(huì)拋出ConcurrentModificationException,這也就造成在遍歷某個(gè)尚未被修改的結(jié)點(diǎn)時(shí),在next方法返回時(shí)可以看到該結(jié)點(diǎn)的修改,但在遍歷后再對(duì)該結(jié)點(diǎn)修改時(shí)就看不到這種變化。
size方法需要遍歷鏈表,所以在并發(fā)情況下,其結(jié)果不一定是準(zhǔn)確的,只能供參考。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/77014.html
摘要:所以,在并發(fā)量適中的情況下,一般具有較好的性能。字段指向隊(duì)列頭,指向隊(duì)列尾,通過(guò)來(lái)操作字段值以及對(duì)象的字段值。單線程的情況下,元素入隊(duì)比較好理解,直接線性地在隊(duì)首插入元素即可。 showImg(https://segmentfault.com/img/bVbguGd?w=1200&h=800); 本文首發(fā)于一世流云專(zhuān)欄:https://segmentfault.com/blog... ...
摘要:整個(gè)包,按照功能可以大致劃分如下鎖框架原子類(lèi)框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見(jiàn)的多線程設(shè)計(jì)模式,設(shè)計(jì)了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對(duì)等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專(zhuān)欄:https...
摘要:我們來(lái)看下的類(lèi)繼承圖可以看到,實(shí)現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過(guò)實(shí)現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計(jì)的有序工具類(lèi)。唯一的區(qū)別是針對(duì)的僅僅是鍵值,針對(duì)鍵值對(duì)進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專(zhuān)欄:https://seg...
摘要:僅僅當(dāng)有多個(gè)線程同時(shí)進(jìn)行寫(xiě)操作時(shí),才會(huì)進(jìn)行同步。可以看到,上述方法返回一個(gè)迭代器對(duì)象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過(guò)程中不會(huì)拋出并發(fā)修改異常。另外,迭代器對(duì)象也不支持修改方法,全部會(huì)拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專(zhuān)欄:https://...
摘要:我們之前已經(jīng)介紹過(guò)了,底層基于跳表實(shí)現(xiàn),其操作平均時(shí)間復(fù)雜度均為。事實(shí)上,內(nèi)部引用了一個(gè)對(duì)象,以組合方式,委托對(duì)象實(shí)現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對(duì)快照進(jìn)行的,不會(huì)拋出,且迭代過(guò)程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專(zhuān)欄:https://segmentfa...
閱讀 1634·2023-04-26 02:11
閱讀 2979·2023-04-25 16:18
閱讀 3711·2021-09-06 15:00
閱讀 2630·2019-08-30 15:55
閱讀 1934·2019-08-30 13:20
閱讀 2051·2019-08-26 18:36
閱讀 3121·2019-08-26 11:40
閱讀 2538·2019-08-26 10:11