摘要:當生產(chǎn)者線程調(diào)用方法時,如果沒有消費者等待接收元素,則會立即返回。方法方法,用于將指定元素傳遞給消費者線程調(diào)用方法。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、LinkedTransferQueue簡介
LinkedTransferQueue是在JDK1.7時,J.U.C包新增的一種比較特殊的阻塞隊列,它除了具備阻塞隊列的常用功能外,還有一個比較特殊的transfer方法。
我們知道,在普通阻塞隊列中,當隊列為空時,消費者線程(調(diào)用take或poll方法的線程)一般會阻塞等待生產(chǎn)者線程往隊列中存入元素。而LinkedTransferQueue的transfer方法則比較特殊:
當有消費者線程阻塞等待時,調(diào)用transfer方法的生產(chǎn)者線程不會將元素存入隊列,而是直接將元素傳遞給消費者;
如果調(diào)用transfer方法的生產(chǎn)者線程發(fā)現(xiàn)沒有正在等待的消費者線程,則會將元素入隊,然后會阻塞等待,直到有一個消費者線程來獲取該元素。
TransferQueue接口可以看到,LinkedTransferQueue實現(xiàn)了一個名為TransferQueue的接口,TransferQueue也是JDK1.7時J.U.C包新增的接口,正是該接口提供了上述的transfer方法:
除了transfer方法外,TransferQueue還提供了兩個變種方法:tryTransfer(E e)、tryTransfer(E e, long timeout, TimeUnit unit)。
tryTransfer(E e)
當生產(chǎn)者線程調(diào)用tryTransfer方法時,如果沒有消費者等待接收元素,則會立即返回false。該方法和transfer方法的區(qū)別就是tryTransfer方法無論消費者是否接收,方法立即返回,而transfer方法必須等到消費者消費后才返回。
tryTransfer(E e, long timeout, TimeUnit unit)
tryTransfer(E e,long timeout,TimeUnit unit)方法則是加上了限時等待功能,如果沒有消費者消費該元素,則等待指定的時間再返回;如果超時還沒消費元素,則返回false,如果在超時時間內(nèi)消費了元素,則返回true。
TransferQueue接口定義:
LinkedTransferQueue的特點簡要概括如下:
LinkedTransferQueue是一種無界阻塞隊列,底層基于單鏈表實現(xiàn);
LinkedTransferQueue中的結(jié)點有兩種類型:數(shù)據(jù)結(jié)點、請求結(jié)點;
LinkedTransferQueue基于無鎖算法實現(xiàn)。
二、LinkedTransferQueue原理 內(nèi)部結(jié)構(gòu)LinkedTransferQueue提供了兩種構(gòu)造器,也沒有參數(shù)設(shè)置隊列初始容量,所以是一種無界隊列:
/** * 隊列結(jié)點定義. */ static final class Node { final boolean isData; // true: 數(shù)據(jù)結(jié)點; false: 請求結(jié)點 volatile Object item; // 結(jié)點值 volatile Node next; // 后驅(qū)結(jié)點指針 volatile Thread waiter; // 等待線程 // 設(shè)置當前結(jié)點的后驅(qū)結(jié)點為val final boolean casNext(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val); } // 設(shè)置當前結(jié)點的值為val final boolean casItem(Object cmp, Object val) { // assert cmp == null || cmp.getClass() != Node.class; return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val); } Node(Object item, boolean isData) { UNSAFE.putObject(this, itemOffset, item); // relaxed write this.isData = isData; } // 設(shè)置當前結(jié)點的后驅(qū)結(jié)點為自身 final void forgetNext() { UNSAFE.putObject(this, nextOffset, this); } /** * 設(shè)置當前結(jié)點的值為自身. * 設(shè)置當前結(jié)點的等待線程為null. */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); } /** * 判斷當前結(jié)點是否匹配成功. * Node.item == this || (Node.isData == true && Node.item == null) */ final boolean isMatched() { Object x = item; return (x == this) || ((x == null) == isData); } /** * 判斷是否為未匹配的請求結(jié)點. * Node.isData == false && Node.item == null */ final boolean isUnmatchedRequest() { return !isData && item == null; } /** * 當該結(jié)點(havaData)是未匹配結(jié)點, 且與當前的結(jié)點類型不同時, 返回true. */ final boolean cannotPrecede(boolean haveData) { boolean d = isData; Object x; return d != haveData && (x = item) != this && (x != null) == d; } /** * 嘗試匹配數(shù)據(jù)結(jié)點. */ final boolean tryMatchData() { // assert isData; 當前結(jié)點必須為數(shù)據(jù)結(jié)點 Object x = item; if (x != null && x != this && casItem(x, null)) { LockSupport.unpark(waiter); // 喚醒等待線程 return true; } return false; } // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long itemOffset; private static final long nextOffset; private static final long waiterOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = Node.class; itemOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("item")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); waiterOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiter")); } catch (Exception e) { throw new Error(e); } } }
關(guān)于Node結(jié)點,有以下幾點需要特別注意:
Node結(jié)點有兩種類型:數(shù)據(jù)結(jié)點、請求結(jié)點,通過字段isData區(qū)分,只有不同類型的結(jié)點才能相互匹配;
Node結(jié)點的值保存在item字段,匹配前后值會發(fā)生變化;
Node結(jié)點的狀態(tài)變化如下表:
結(jié)點/狀態(tài) | 數(shù)據(jù)結(jié)點 | 請求結(jié)點 |
---|---|---|
匹配前 | isData = true; item = 數(shù)據(jù)結(jié)點值 | isData = false; item = null |
匹配后 | isData = true; item = null | isData = false; item = this |
從上表也可以看出,對于一個數(shù)據(jù)結(jié)點,當item == null表示匹配成功;對于一個請求結(jié)點,當item == this表示匹配成功。歸納起來,匹配成功的結(jié)點Node就是滿足(Node.item == this) || ((Node.item == null) == Node.isData)。
LinkedTransferQueue內(nèi)部的其余字段定義如下,主要就是通過Unsafe類操作字段值,內(nèi)部定義了很多常量字段,比如自旋,這些都是為了非阻塞算法的鎖優(yōu)化而定義的:
public class LinkedTransferQueueextends AbstractQueue implements TransferQueue , java.io.Serializable { /** * True如果是多核CPU */ private static final boolean MP = Runtime.getRuntime().availableProcessors() > 1; /** * 線程自旋次數(shù)(僅多核CPU時用到). */ private static final int FRONT_SPINS = 1 << 7; /** * 線程自旋次數(shù)(僅多核CPU時用到). */ private static final int CHAINED_SPINS = FRONT_SPINS >>> 1; /** * The maximum number of estimated removal failures (sweepVotes) * to tolerate before sweeping through the queue unlinking * cancelled nodes that were not unlinked upon initial * removal. See above for explanation. The value must be at least * two to avoid useless sweeps when removing trailing nodes. */ static final int SWEEP_THRESHOLD = 32; /** * 隊首結(jié)點指針. */ transient volatile Node head; /** * 隊尾結(jié)點指針. */ private transient volatile Node tail; /** * The number of apparent failures to unsplice removed nodes */ private transient volatile int sweepVotes; // CAS設(shè)置隊尾tail指針為val private boolean casTail(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val); } // CAS設(shè)置隊首head指針為val private boolean casHead(Node cmp, Node val) { return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val); } private boolean casSweepVotes(int cmp, int val) { return UNSAFE.compareAndSwapInt(this, sweepVotesOffset, cmp, val); } /* * xfer方法的入?yún)? 不同類型的方法內(nèi)部調(diào)用xfer方法時入?yún)⒉煌? */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; private static final long tailOffset; private static final long sweepVotesOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = LinkedTransferQueue.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); tailOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("tail")); sweepVotesOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("sweepVotes")); } catch (Exception e) { throw new Error(e); } } //... }
上述比較重要的就是4個常量值的定義:
/* * xfer方法的入?yún)? 不同類型的方法內(nèi)部調(diào)用xfer方法時入?yún)⒉煌? */ private static final int NOW = 0; // for untimed poll, tryTransfer private static final int ASYNC = 1; // for offer, put, add private static final int SYNC = 2; // for transfer, take private static final int TIMED = 3; // for timed poll, tryTransfer
這四個常量值,作為xfer方法的入?yún)ⅲ糜跇俗R不同操作類型。其實從常量的命名也可以看出它們對應的操作含義:
NOW表示即時操作(可能失敗),即不會阻塞調(diào)用線程:
poll(獲取并移除隊首元素,如果隊列為空,直接返回null);tryTransfer(嘗試將元素傳遞給消費者,如果沒有等待的消費者,則立即返回false,也不會將元素入隊)
ASYNC表示異步操作(必然成功):
offer(插入指定元素至隊尾,由于是無界隊列,所以會立即返回true);put(插入指定元素至隊尾,由于是無界隊列,所以會立即返回);add(插入指定元素至隊尾,由于是無界隊列,所以會立即返回true)
SYNC表示同步操作(阻塞調(diào)用線程):
transfer(阻塞直到出現(xiàn)一個消費者線程);take(從隊首移除一個元素,如果隊列為空,則阻塞線程)
TIMED表示限時同步操作(限時阻塞調(diào)用線程):
poll(long timeout, TimeUnit unit);tryTransfer(E e, long timeout, TimeUnit unit)
關(guān)于xfer方法,它是LinkedTransferQueued的核心內(nèi)部方法,我們后面會詳細介紹。
transfer方法transfer方法,用于將指定元素e傳遞給消費者線程(調(diào)用take/poll方法)。如果有消費者線程正在阻塞等待,則調(diào)用transfer方法的線程會直接將元素傳遞給它;如果沒有消費者線程等待獲取元素,則調(diào)用transfer方法的線程會將元素插入到隊尾,然后阻塞等待,直到出現(xiàn)一個消費者線程獲取元素:
/** * 將指定元素e傳遞給消費者線程(調(diào)用take/poll方法). */ public void transfer(E e) throws InterruptedException { if (xfer(e, true, SYNC, 0) != null) { // 進入到此處, 說明調(diào)用線程被中斷了 Thread.interrupted(); // 清除中斷狀態(tài), 然后拋出中斷異常 throw new InterruptedException(); } }
transfer方法的內(nèi)部實際是調(diào)用了xfer方法,入?yún)?b>SYNC=2:
/** * 入隊/出隊元素的真正實現(xiàn). * * @param e 入隊操作, e非null; 出隊操作, e為null * @param haveData true表示入隊元素, false表示出隊元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 否則返回e本身 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊操作, 元素e不能為null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結(jié)點 boolean isData = p.isData; // 結(jié)點類型 Object item = p.item; // 結(jié)點值 if (item != p && (item != null) == isData) { // 如果結(jié)點還未匹配過 if (isData == haveData) // 同種類型結(jié)點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向后的第一個匹配結(jié)點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結(jié)點上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結(jié)點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個入隊結(jié)點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點或s(隊列中只有一個結(jié)點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 } return e; } }
我們通過示例看下xfer方法到底做了哪些事:
①隊列初始狀態(tài)
②ThreadA線程調(diào)用transfer入隊元素“9”
注意,此時入隊一個數(shù)據(jù)結(jié)點,且隊列為空,所以會直接進入xfer中的下述代碼:
if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個入隊結(jié)點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點或s(隊列中只有一個結(jié)點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 }
上述代碼會插入一個結(jié)點至隊尾,然后線程進入阻塞,等待一個出隊線程(消費者)的到來。
隊尾插入結(jié)點的方法是tryAppend,由于此時隊列為空,會進入CASE1分支,設(shè)置隊首指針head指向新結(jié)點,tryAppend方法的返回值有三種情況:
入隊失敗,返回null;
入隊成功且隊列只有一個結(jié)點,返回該結(jié)點自身;
入隊成功且隊列不止一個結(jié)點,返回該入隊結(jié)點的前驅(qū)結(jié)點。
/** * 嘗試將結(jié)點s添加到隊尾. * * @param s 待添加的結(jié)點 * @param haveData true: 數(shù)據(jù)結(jié)點 * @return 返回null表示失敗; 否則返回s的前驅(qū)結(jié)點(沒有前驅(qū)則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊列為空 if (casHead(null, s)) // 設(shè)置隊首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結(jié)點s不能鏈接到結(jié)點p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結(jié)點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結(jié)點s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進行松弛操作 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
等待出隊線程方法awaitMatch,該方法核心作用就是進行結(jié)點匹配:
匹配成功,返回匹配值;
匹配失敗(中斷或限時等待的超時情況),返回原匹配結(jié)點的值;
阻塞線程,等待與之匹配的結(jié)點的到來。
從awaitMatch方法其實可以看到一種經(jīng)典的“鎖優(yōu)化”思路,就是 自旋 -> yield -> 阻塞,線程不會立即進入阻塞,因為線程上下文切換的開銷往往比較大,所以會先自旋一定次數(shù),中途可能伴隨隨機的yield操作,讓出cpu時間片,如果自旋次數(shù)用完后,還是沒有匹配線程出現(xiàn),再真正阻塞線程。
經(jīng)過上述步驟,ThreadA最終會進入CASE4分支中等待,此時的隊列結(jié)構(gòu)如下:
注意,此時的隊列中tail隊尾指針并不指向結(jié)點“9”,這是一種“松弛”策略,后面會講到。
③ThreadB線程調(diào)用transfer入隊元素“2”
由于此時隊首head指針不為null,所以會進入transfer方法中的以下循環(huán):
for (Node h = head, p = h; p != null; ) { boolean isData = p.isData; // 結(jié)點類型 Object item = p.item; // 結(jié)點值 if (item != p && (item != null) == isData) { // 如果結(jié)點還未匹配過 if (isData == haveData) // 同種類型結(jié)點不能匹配 break; if (p.casItem(item, e)) { // match for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); return LinkedTransferQueue.cast(item); } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist }
上述方法會讀取隊首結(jié)點,判斷該結(jié)點有沒被匹配過(item != p && (item != null) == isData):
如果已經(jīng)被其它線程匹配過了,則繼續(xù)判斷下一個結(jié)點(p.next);
如果還沒有被匹配,則判斷下當前的入隊結(jié)點類型是否和隊首中的一致;如果一致(isData == haveData)就匹配失敗,跳出循環(huán),否則進行匹配操作。
顯然,目前隊首結(jié)點是“數(shù)據(jù)結(jié)點”,ThreadB線程的入隊結(jié)點也是“數(shù)據(jù)結(jié)點”,結(jié)點類型一致,所以匹配失敗,直接跳過循環(huán),也進入以下代碼塊:
if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個入隊結(jié)點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點或s(隊列中只有一個結(jié)點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 }
再次調(diào)用tryAppend方法, 會在CASE4分支中將元素“2”插入隊尾,然后在CASE5分支中重新設(shè)置隊尾指針tail:
/** * 嘗試將結(jié)點s添加到隊尾. * * @param s 待添加的結(jié)點 * @param haveData true: 數(shù)據(jù)結(jié)點 * @return 返回null表示失敗; 否則返回s的前驅(qū)結(jié)點(沒有前驅(qū)則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊列為空 if (casHead(null, s)) // 設(shè)置隊首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結(jié)點s不能鏈接到結(jié)點p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結(jié)點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結(jié)點s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進行松弛操作 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
此時隊列結(jié)構(gòu)如下:
最終,ThreadB也會在awaitMatch方法中進入阻塞,最終隊列結(jié)構(gòu)如下:
④ThreadC線程調(diào)用transfer入隊元素“93”
過程和前幾步幾乎相同,不再贅述,最終隊列結(jié)構(gòu)如下:
可以看到,隊尾指針tail的設(shè)置實際是滯后的,這是一種“松弛”策略,用以提升無鎖算法并發(fā)修改過程中的性能。
take方法再來看下消費者線程調(diào)用的take方法,該方法會從隊首取出一個元素,如果隊列為空,則線程會阻塞:
/** * 從隊首出隊一個元素. */ public E take() throws InterruptedException { E e = xfer(null, false, SYNC, 0); // (e == null && isData=false)表示一個請求結(jié)點 if (e != null) // 如果e!=null, 則表示匹配成功, 此時e為與之匹配的數(shù)據(jù)結(jié)點的值 return e; Thread.interrupted(); throw new InterruptedException(); }
內(nèi)部依然調(diào)用了xfer方法,不過此時入?yún)⒂兴煌捎谑窍M線程調(diào)用,所以入?yún)?b>e == null && hasData == false,表示一個“請求結(jié)點”:
/** * 入隊/出隊元素的真正實現(xiàn). * * @param e 入隊操作, e非null; 出隊操作, e為null * @param haveData true表示入隊元素, false表示出隊元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 否則返回e本身 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊操作, 元素e不能為null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結(jié)點 boolean isData = p.isData; // 結(jié)點類型 Object item = p.item; // 結(jié)點值 if (item != p && (item != null) == isData) { // 如果結(jié)點還未匹配過 if (isData == haveData) // 同種類型結(jié)點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向后的第一個匹配結(jié)點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結(jié)點上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結(jié)點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個入隊結(jié)點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點或s(隊列中只有一個結(jié)點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 } return e; } }
還是通過示例看:
①隊列初始狀態(tài)
②ThreadD調(diào)用take方法,消費元素
此時,在xfer方法中,會從隊首開始,向后找到第一個匹配結(jié)點,并交換元素值,然后喚醒隊列中匹配結(jié)點上的等待線程:
/** * 入隊/出隊元素的真正實現(xiàn). * * @param e 入隊操作, e非null; 出隊操作, e為null * @param haveData true表示入隊元素, false表示出隊元素 * @param how NOW, ASYNC, SYNC, TIMED 四種常量定義 * @param nanos 限時模式下使用(納秒) * @return 匹配成功則返回匹配的元素, 否則返回e本身 */ private E xfer(E e, boolean haveData, int how, long nanos) { if (haveData && (e == null)) // 入隊操作, 元素e不能為null throw new NullPointerException(); Node s = null; retry: for (; ; ) { for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結(jié)點 boolean isData = p.isData; // 結(jié)點類型 Object item = p.item; // 結(jié)點值 if (item != p && (item != null) == isData) { // 如果結(jié)點還未匹配過 if (isData == haveData) // 同種類型結(jié)點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向后的第一個匹配結(jié)點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結(jié)點上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結(jié)點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist } if (how != NOW) { if (s == null) s = new Node(e, haveData); // 創(chuàng)建一個入隊結(jié)點, 添加到隊尾 Node pred = tryAppend(s, haveData); // pred指向s的前驅(qū)結(jié)點或s(隊列中只有一個結(jié)點)或null(tryAppend失敗) if (pred == null) continue retry; // 入隊失敗,則重試 if (how != ASYNC) return awaitMatch(s, pred, e, (how == TIMED), nanos); // 等待出隊線程 } return e; } }
最終隊列結(jié)構(gòu)如下,匹配結(jié)點的值被置換為null,ThreadA被喚醒,ThreadD拿到匹配結(jié)點上的元素值“9”并返回:
③ThreadA被喚醒后繼續(xù)執(zhí)行
ThreadA被喚醒后,從原阻塞處——繼續(xù)向下執(zhí)行,然后進入下一次自旋,進入CASE1分支:
/** * 自旋/yield/阻塞,直到結(jié)點s被匹配. * * @param s 等待被匹配的結(jié)點s * @param pred s的前驅(qū)結(jié)點或s自身(隊列中只有一個結(jié)點的情況) * @param e 結(jié)點s的值 * @return 匹配值, 或e本身(中斷或超時情況) */ private E awaitMatch(Node s, Node pred, E e, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; // 限時等待情況下使用 Thread w = Thread.currentThread(); int spins = -1; // 自旋次數(shù), 鎖優(yōu)化操作 ThreadLocalRandom randomYields = null; // bound if needed for (; ; ) { Object item = s.item; if (item != e) { // CASE1: 匹配成功 // assert item != s; s.forgetContents(); // avoid garbage return LinkedTransferQueue.cast(item); } if ((w.isInterrupted() || (timed && nanos <= 0)) && s.casItem(e, s)) { // CASE2: 取消(線程被中斷或超時) unsplice(pred, s); return e; } // CASE3: 設(shè)置輕量級鎖(自旋 -> yield) if (spins < 0) { // 初始化自旋次數(shù) if ((spins = spinsFor(pred, s.isData)) > 0) randomYields = ThreadLocalRandom.current(); } else if (spins > 0) { // 自選次數(shù)減1 --spins; if (randomYields.nextInt(CHAINED_SPINS) == 0) Thread.yield(); // 隨機yield線程 } else if (s.waiter == null) { // waiter保存待阻塞線程 s.waiter = w; } else if (timed) { // 限時等待情況, 計算剩余有效時間 nanos = deadline - System.nanoTime(); if (nanos > 0L) LockSupport.parkNanos(this, nanos); } else { // CASE4: 阻塞線程 LockSupport.park(this); } } }
在CASE1分支中,由于結(jié)點的item項已經(jīng)被替換成了null,所以調(diào)用s.forgetContents(),并返回null
/** * 設(shè)置當前結(jié)點的值為自身. * 設(shè)置當前結(jié)點的等待線程為null. */ final void forgetContents() { UNSAFE.putObject(this, itemOffset, this); UNSAFE.putObject(this, waiterOffset, null); }
最終隊列結(jié)構(gòu)如下:
④ThreadE調(diào)用take方法出隊元素
ThreadE調(diào)用take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false),由于head指針指向的元素已經(jīng)匹配過了,所以
向后繼續(xù)查找,找到第一個未匹配過的結(jié)點“2”,然后置換結(jié)點“2”中的元素值為null,喚醒線程ThreadB,返回匹配結(jié)點的元素值“2”:
for (Node h = head, p = h; p != null; ) { // 嘗試匹配p指向的結(jié)點 boolean isData = p.isData; // 結(jié)點類型 Object item = p.item; // 結(jié)點值 if (item != p && (item != null) == isData) { // 如果結(jié)點還未匹配過 if (isData == haveData) // 同種類型結(jié)點不能匹配 break; if (p.casItem(item, e)) { // p指向從隊首開始向后的第一個匹配結(jié)點 for (Node q = p; q != h; ) { Node n = q.next; // update by 2 unless singleton if (head == h && casHead(h, n == null ? q : n)) { h.forgetNext(); break; } // advance and retry if ((h = head) == null || (q = h.next) == null || !q.isMatched()) break; // unless slack < 2 } LockSupport.unpark(p.waiter); // 喚醒匹配結(jié)點上的等待線程 return LinkedTransferQueue.cast(item); // 返回匹配結(jié)點的值 } } Node n = p.next; p = (p != n) ? n : (h = head); // Use head if p offlist }
此時隊列狀態(tài)如下,可以看到,隊首指針head一次性向后跳了2個位置,原來已經(jīng)匹配過的元素的next指針指向自身,等待被GC回收,這其實就是LinkedTransferQueue的“松弛”策略:
⑤ThreadB被喚醒后繼續(xù)執(zhí)行
過程和步驟③完全相同,在awaitMatch方法中,將結(jié)點的item置為this,然后返回匹配結(jié)點值——null,最終隊列結(jié)構(gòu)如下:
⑥ThreadF調(diào)用take方法出隊元素
ThreadF調(diào)用take方法出隊元素,過程和步驟②相同,進入xfer方法(e == null,hasData == false),由于head指針指向的元素此時沒有匹配,所以不用像步驟②那樣向后查找,而是直接置換匹配結(jié)點的元素值“93”,然后喚醒ThreadC,返回匹配值“93”。最終隊列結(jié)構(gòu)如下:
⑦ThreadC被喚醒后繼續(xù)執(zhí)行
過程和步驟③完全相同,在awaitMatch方法中,將結(jié)點的item置為this,然后返回匹配結(jié)點值——null,最終隊列結(jié)構(gòu)如下:
此時的隊列結(jié)構(gòu),讀者移一定感到非常奇怪,并不嚴格遵守隊列的定義,這其實就是“Dual Queue”算法的實現(xiàn),為了對自旋優(yōu)化,做了很多看似別扭的操作,不必奇怪。
假設(shè)此時再有一個線程ThreadH調(diào)用take方法出隊元素會怎么樣?其實這是隊列已經(jīng)空了,ThreadH會被阻塞,但是會創(chuàng)建一個“請求結(jié)點”入隊:
/** * 嘗試將結(jié)點s添加到隊尾. * * @param s 待添加的結(jié)點 * @param haveData true: 數(shù)據(jù)結(jié)點 * @return 返回null表示失敗; 否則返回s的前驅(qū)結(jié)點(沒有前驅(qū)則返回s自身) */ private Node tryAppend(Node s, boolean haveData) { for (Node t = tail, p = t; ; ) { Node n, u; if (p == null && (p = head) == null) { // CASE1: 隊列為空 if (casHead(null, s)) // 設(shè)置隊首指針head return s; } else if (p.cannotPrecede(haveData)) // CASE2: 結(jié)點s不能鏈接到結(jié)點p return null; else if ((n = p.next) != null) // CASE3: 遍歷至隊尾結(jié)點 p = p != t && t != (u = tail) ? (t = u) : // stale tail (p != n) ? n : null; // restart if off list else if (!p.casNext(null, s)) // CASE4: 插入結(jié)點s p = p.next; // re-read on CAS failure else { // CASE5: 嘗試進行松弛操作 if (p != t) { // update if slack now >= 2 while ((tail != t || !casTail(t, s)) && (t = tail) != null && (s = t.next) != null && // advance and retry (s = s.next) != null && s != t) ; } return p; } } }
調(diào)用完tryAppend方法后,隊列結(jié)構(gòu)如下,橙色的為“請求結(jié)點”—— item==null && isData==false:
然后ThreadH也會進入在awaitMatch方法后進入阻塞,并等待一個入隊線程的到來。最終隊列結(jié)構(gòu)如下:
三、總結(jié)截止本篇為止,我們已經(jīng)學習完了juc-collection框架中的所有阻塞隊列,如下表所示:
隊列特性 | 有界隊列 | 近似無界隊列 | 無界隊列 | 特殊隊列 |
---|---|---|---|---|
有鎖算法 | ArrayBlockingQueue | LinkedBlockingQueue、LinkedBlockingDeque | / | PriorityBlockingQueue、DelayQueue |
無鎖算法 | / | / | LinkedTransferQueue | SynchronousQueue |
可以看到,LinkedTransferQueue其實兼具了SynchronousQueue的特性以及無鎖算法的性能,并且是一種無界隊列:
和SynchronousQueue相比,LinkedTransferQueue可以存儲實際的數(shù)據(jù);
和其它阻塞隊列相比,LinkedTransferQueue直接用無鎖算法實現(xiàn),性能有所提升。
另外,由于LinkedTransferQueue可以存放兩種不同類型的結(jié)點,所以稱之為“Dual Queue”:
內(nèi)部Node結(jié)點定義了一個 boolean 型字段——isData,表示該結(jié)點是“數(shù)據(jù)結(jié)點”還是“請求結(jié)點”。
為了節(jié)省 CAS 操作的開銷,LinkedTransferQueue使用了松弛(slack)操作:
在結(jié)點被匹配(被刪除)之后,不會立即更新隊列的head、tail,而是當 head、tail結(jié)點與最近一個未匹配的結(jié)點之間的距離超過“松弛閥值”后才會更新(默認為 2)。這個“松弛閥值”一般為1到3,如果太大會增加沿鏈表查找未匹配結(jié)點的時間,太小會增加 CAS 的開銷。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/77196.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現(xiàn)了接口,在多線程進階二五之框架中,我們提到過實現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計的有序工具類。唯一的區(qū)別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數(shù)組上進行的,當創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發(fā)修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實現(xiàn),其操作平均時間復雜度均為。事實上,內(nèi)部引用了一個對象,以組合方式,委托對象實現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現(xiàn)了接口。該類在構(gòu)造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過來保證線程安全,所以的整體實現(xiàn)時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
閱讀 1438·2023-04-25 16:31
閱讀 2040·2021-11-24 10:33
閱讀 2746·2021-09-23 11:33
閱讀 2528·2021-09-23 11:31
閱讀 2900·2021-09-08 09:45
閱讀 2336·2021-09-06 15:02
閱讀 2647·2019-08-30 14:21
閱讀 2313·2019-08-30 12:56