摘要:三總結主要用于線程之間的數據交換,由于采用無鎖算法,其性能一般比單純的其它阻塞隊列要高。它的最大特點時不存儲實際元素,而是在內部通過棧或隊列結構保存阻塞線程。
本文首發于一世流云專欄:https://segmentfault.com/blog...一、SynchronousQueue簡介
SynchronousQueue是JDK1.5時,隨著J.U.C包一起引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基于棧和隊列實現:
沒有看錯,SynchronousQueue的底層實現包含兩種數據結構——棧和隊列。這是一種非常特殊的阻塞隊列,它的特點簡要概括如下:
入隊線程和出隊線程必須一一匹配,否則任意先到達的線程會阻塞。比如ThreadA進行入隊操作,在有其它線程執行出隊操作之前,ThreadA會一直等待,反之亦然;
SynchronousQueue內部不保存任何元素,也就是說它的容量為0,數據直接在配對的生產者和消費者線程之間傳遞,不會將數據緩沖到隊列中。
SynchronousQueue支持公平/非公平策略。其中非公平模式,基于內部數據結構——“棧”來實現,公平模式,基于內部數據結構——“隊列”來實現;
SynchronousQueue基于一種名為“Dual stack and Dual queue”的無鎖算法實現。
注意:上述的特點1,和我們之前介紹的Exchanger其實非常相似,可以類比Exchanger的功能來理解。二、SynchronousQueue原理 構造
之前提到,SynchronousQueue根據公平/非公平訪問策略的不同,內部使用了兩種不同的數據結構:棧和隊列。我們先來看下對象的構造,SynchronousQueue只有2種構造器:
/** * 默認構造器. * 默認使用非公平策略. */ public SynchronousQueue() { this(false); }
/** * 指定策略的構造器. */ public SynchronousQueue(boolean fair) { transferer = fair ? new TransferQueue() : new TransferStack (); }
可以看到,對于公平策略,內部構造了一個TransferQueue對象,而非公平策略則是構造了TransferStack對象。這兩個類都繼承了內部類Transferer,SynchronousQueue中的所有方法,其實都是委托調用了TransferQueue/TransferStack的方法:
public class SynchronousQueue棧結構extends AbstractQueue implements BlockingQueue , java.io.Serializable { ? /** * tranferer對象, 構造時根據策略類型確定. */ private transient volatile Transferer transferer; ? /** * Shared internal API for dual stacks and queues. */ abstract static class Transferer { /** * Performs a put or take. * * @param e 非null表示 生產者 -> 消費者; * null表示, 消費者 -> 生產者. * @return 非null表示傳遞的數據; null表示傳遞失敗(超時或中斷). */ abstract E transfer(E e, boolean timed, long nanos); } ? /** * Dual stack(雙棧結構). * 非公平策略時使用. */ static final class TransferStack extends Transferer { // ... } ? /** * Dual Queue(雙端隊列). * 公平策略時使用. */ static final class TransferQueue extends Transferer { // ... } ? // ... }
非公平策略由TransferStack類實現,既然TransferStack是棧,那就有結點。TransferStack內部定義了名為SNode的結點:
static final class SNode { volatile SNode next; volatile SNode match; // 與當前結點配對的結點 volatile Thread waiter; // 當前結點對應的線程 Object item; // 實際數據或null int mode; // 結點類型 ? SNode(Object item) { this.item = item; } ?? // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long matchOffset; private static final long nextOffset; ? static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = SNode.class; matchOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("match")); nextOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("next")); } catch (Exception e) { throw new Error(e); } } // ... }
上述SNode結點的定義中有個mode字段,表示結點的類型。TransferStack一共定義了三種結點類型,任何線程對TransferStack的操作都會創建下述三種類型的某種結點:
REQUEST:表示未配對的消費者(當線程進行出隊操作時,會創建一個mode值為REQUEST的SNode結點 )
DATA:表示未配對的生產者(當線程進行入隊操作時,會創建一個mode值為DATA的SNode結點 )
FULFILLING:表示配對成功的消費者/生產者
static final class TransferStack核心操作——put/takeextends Transferer { ? /** * 未配對的消費者 */ static final int REQUEST = 0; /** * 未配對的生產者 */ static final int DATA = 1; /** * 配對成功的消費者/生產者 */ static final int FULFILLING = 2; ? volatile SNode head; ? // Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long headOffset; ? static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = TransferStack.class; headOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("head")); } catch (Exception e) { throw new Error(e); } } ? // ... }
SynchronousQueue的入隊操作調用了put方法:
/** * 入隊指定元素e. * 如果沒有另一個線程進行出隊操作, 則阻塞該入隊線程. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); if (transferer.transfer(e, false, 0) == null) { Thread.interrupted(); throw new InterruptedException(); } }
SynchronousQueue的出隊操作調用了take方法:
/** * 出隊一個元素. * 如果沒有另一個線程進行出隊操作, 則阻塞該入隊線程. */ public E take() throws InterruptedException { E e = transferer.transfer(null, false, 0); if (e != null) return e; Thread.interrupted(); throw new InterruptedException(); }
可以看到,SynchronousQueue一樣不支持null元素,實際的入隊/出隊操作都是委托給了transfer方法,該方法返回null表示出/入隊失敗(通常是線程被中斷或超時):
/** * 入隊/出隊一個元素. */ E transfer(E e, boolean timed, long nanos) { SNode s = null; // s表示新創建的結點 // 入參e==null, 說明當前是出隊線程(消費者), 否則是入隊線程(生產者) // 入隊線程創建一個DATA結點, 出隊線程創建一個REQUEST結點 int mode = (e == null) ? REQUEST : DATA; for (; ; ) { // 自旋 SNode h = head; if (h == null || h.mode == mode) { // CASE1: 棧為空 或 棧頂結點類型與當前mode相同 if (timed && nanos <= 0) { // case1.1: 限時等待的情況 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 將當前結點壓入棧 SNode m = awaitFulfill(s, timed, nanos); // 阻塞當前調用線程 if (m == s) { // 阻塞過程中被中斷 clean(s); return null; } // 此時m為配對結點 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入隊線程null, 出隊線程返回配對結點的值 return (E) ((mode == REQUEST) ? m.item : s.item); } // 執行到此處說明入棧失敗(多個線程同時入棧導致CAS操作head失敗),則進入下一次自旋繼續執行 } else if (!isFulfilling(h.mode)) { // CASE2: 棧頂結點還未配對成功 if (h.isCancelled()) // case2.1: 元素取消情況(因中斷或超時)的處理 casHead(h, h.next); else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // case2.2: 將當前結點壓入棧中 for (; ; ) { SNode m = s.next; // s.next指向原棧頂結點(也就是與當前結點匹配的結點) if (m == null) { // m==null說明被其它線程搶先匹配了, 則跳出循環, 重新下一次自旋 casHead(s, null); s = null; break; } SNode mn = m.next; if (m.tryMatch(s)) { // 進行結點匹配 casHead(s, mn); // 匹配成功, 將匹配的兩個結點全部彈出棧 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 } else // 匹配失敗 s.casNext(m, mn); // 移除原待匹配結點 } } } else { // CASE3: 其它線程正在匹配 SNode m = h.next; if (m == null) // 棧頂的next==null, 則直接彈出, 重新進入下一次自旋 casHead(h, null); else { // 嘗試和其它線程競爭匹配 SNode mn = m.next; if (m.tryMatch(h)) casHead(h, mn); // 匹配成功 else h.casNext(m, mn); // 匹配失敗(被其它線程搶先匹配成功了) } } } }
整個transfer方法考慮了限時等待的情況,且入隊/出隊其實都是調用了同一個方法,其主干邏輯就是在一個自旋中完成以下三種情況之一的操作,直到成功,或者被中斷或超時取消:
棧為空,或棧頂結點類型與當前入隊結點相同。這種情況,調用線程會阻塞;
棧頂結點還未配對成功,且與當前入隊結點可以配對。這種情況,直接進行配對操作;
棧頂結點正在配對中。這種情況,直接進行下一個結點的配對。
出/入隊示例講解為了便于理解,我們來看下面這個調用示例(假設不考慮限時等待的情況),假設一共有三個線程ThreadA、ThreadB、ThreadC:
①初始棧結構
初始棧為空,head為棧頂指針,始終指向棧頂結點:
②ThreadA(生產者)執行入隊操作
由于此時棧為空,所以ThreadA會進入CASE1,創建一個類型為DATA的結點:
if (h == null || h.mode == mode) { // CASE1: 棧為空 或 棧頂結點類型與當前mode相同 if (timed && nanos <= 0) { // case1.1: 限時等待的情況 if (h != null && h.isCancelled()) casHead(h, h.next); else return null; } else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 將當前結點壓入棧 SNode m = awaitFulfill(s, timed, nanos); // 阻塞當前調用線程 if (m == s) { // 阻塞過程中被中斷 clean(s); return null; } // 此時m為配對結點 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入隊線程null, 出隊線程返回配對結點的值 return (E) ((mode == REQUEST) ? m.item : s.item); } // 執行到此處說明入棧失敗(多個線程同時入棧導致CAS操作head失敗),則進入下一次自旋繼續執行 }
CASE1分支中,將結點壓入棧后,會調用awaitFulfill方法,該方法會阻塞調用線程:
/** * 阻塞當前調用線程, 并將線程信息記錄在s.waiter字段上. * * @param s 等待的結點 * @return 返回配對的結點 或 當前結點(說明線程被中斷了) */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 性能優化操作(計算自旋次數) int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); /** * s.match保存當前結點的匹配結點. * s.match==null說明還沒有匹配結點 * s.match==s說明當前結點s對應的線程被中斷了 */ SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) // 還沒有匹配結點, 則保存當前線程 s.waiter = w; // s.waiter保存當前阻塞線程 else if (!timed) LockSupport.park(this); // 阻塞當前線程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
此時棧結構如下,結點的waiter字段保存著創建該結點的線程ThreadA,ThreadA等待著被配對消費者線程喚醒:
③ThreadB(生產者)執行入隊操作
此時棧頂結點的類型和ThreadB創建的結點相同(都是DATA類型的結點),所以依然走CASE1分支,直接將結點壓入棧:
④ThreadC(消費者)執行出隊操作
此時棧頂結點的類型和ThreadC創建的結點匹配(棧頂DATA類型,ThreadC創建的是REQUEST類型),所以走CASE2分支,該分支會將匹配的兩個結點彈出棧:
else if (!isFulfilling(h.mode)) { // CASE2: 棧頂結點還未配對成功 if (h.isCancelled()) // case2.1: 元素取消情況(因中斷或超時)的處理 casHead(h, h.next); else if (casHead(h, s = snode(s, e, h, FULFILLING | mode))) { // case2.2: 將當前結點壓入棧中 for (; ; ) { SNode m = s.next; // s.next指向原棧頂結點(也就是與當前結點匹配的結點) if (m == null) { // m==null說明被其它線程搶先匹配了, 則跳出循環, 重新下一次自旋 casHead(s, null); s = null; break; } SNode mn = m.next; if (m.tryMatch(s)) { // 進行結點匹配 casHead(s, mn); // 匹配成功, 將匹配的兩個結點全部彈出棧 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 } else // 匹配失敗 s.casNext(m, mn); // 移除原待匹配結點 } } }
上述isFulfilling方法就是判斷結點是否匹配:
/** * 判斷m是否已經配對成功. */ static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
ThreadC創建結點并壓入棧后,棧的結構如下:
此時,ThreadC會調用tryMatch方法進行匹配,該方法的主要作用有兩點:
將待結點的match字段置為與當前配對的結點(如上圖中,結點m是待配對結點,最終m.math == s)
喚醒待配對結點中的線程(如上圖中,喚醒結點m中ThreadB線程)
/** * 嘗試將當前結點和s結點配對. */ boolean tryMatch(SNode s) { if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) { Thread w = waiter; if (w != null) { // 喚醒當前結點對應的線程 waiter = null; LockSupport.unpark(w); } return true; } return match == s; // 配對成功返回true }
匹配完成后,會將匹配的兩個結點彈出棧,并返回匹配值:
if (m.tryMatch(s)) { // 進行結點匹配 casHead(s, mn); // 匹配成功, 將匹配的兩個結點全部彈出棧 return (E) ((mode == REQUEST) ? m.item : s.item); // 返回匹配值 }
最終,ThreadC拿到了等待配對結點中的數據并返回,此時棧的結構如下:
注意: CASE2分支中ThreadC創建的結點的mode值并不是REQUEST,其mode值為FULFILLING | mode,FULFILLING | mode的主要作用就是給棧頂結點置一個標識(二進制為11或10),表示當前有線程正在對棧頂匹配,這時如果有其它線程進入自旋(并發情況),則CASE2一定失敗,因為isFulfilling的結果必然為true,所以會進入CASE3分支——跳過棧頂結點進行匹配。
casHead(h, s = snode(s, e, h, FULFILLING | mode))
⑤ThreadB(生產者)喚醒后繼續執行
ThreadB被喚醒后,會從原阻塞處繼續執行,并進入下一次自旋,在下一次自旋中,由于結點的match字段已經有了匹配結點,所以直接返回配對結點:
/** * 阻塞當前調用線程, 并將線程信息記錄在s.waiter字段上. * * @param s 等待的結點 * @return 返回配對的結點 或 當前結點(說明線程被中斷了) */ SNode awaitFulfill(SNode s, boolean timed, long nanos) { final long deadline = timed ? System.nanoTime() + nanos : 0L; Thread w = Thread.currentThread(); // 性能優化操作(計算自旋次數) int spins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0); for (; ; ) { if (w.isInterrupted()) s.tryCancel(); /** * s.match保存當前結點的匹配結點. * s.match==null說明還沒有匹配結點 * s.match==s說明當前結點s對應的線程被中斷了 */ SNode m = s.match; if (m != null) return m; if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { s.tryCancel(); continue; } } if (spins > 0) spins = shouldSpin(s) ? (spins - 1) : 0; else if (s.waiter == null) // 還沒有匹配結點, 則保存當前線程 s.waiter = w; // s.waiter保存當前阻塞線程 else if (!timed) LockSupport.park(this); // 阻塞當前線程 else if (nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); } }
最終,在下面分支中返回:
else if (casHead(h, s = snode(s, e, h, mode))) { // case1.2 將當前結點壓入棧 SNode m = awaitFulfill(s, timed, nanos); // 阻塞當前調用線程 if (m == s) { // 阻塞過程中被中斷 clean(s); return null; } // 此時m為配對結點 if ((h = head) != null && h.next == s) casHead(h, s.next); // 入隊線程null, 出隊線程返回配對結點的值 return (E) ((mode == REQUEST) ? m.item : s.item); }
注意:對于入隊線程(生產者),返回的是它入隊時攜帶的原有元素值。隊列結構
SynchronousQueue的公平策略由TransferQueue類實現,TransferQueue內部定義了名為QNode的結點,一個head隊首指針,一個tail隊尾指針:
/** * Dual Queue(雙端隊列). * 公平策略時使用. */ static final class TransferQueueextends Transferer { /** * Head of queue */ transient volatile QNode head; /** * Tail of queue */ transient volatile QNode tail; /** * Reference to a cancelled node that might not yet have been * unlinked from queue because it was the last inserted node * when it was cancelled. */ transient volatile QNode cleanMe; /** * 隊列結點定義. */ static final class QNode { volatile QNode next; // next node in queue volatile Object item; // CAS"ed to or from null volatile Thread waiter; // to control park/unpark final boolean isData; // ... } // ... }
關于TransferQueue的transfer方法就不再贅述了,其思路和TransferStack大致相同,總之就是入隊/出隊必須一一匹配,否則任意一方就會加入隊列并等待匹配線程喚醒。讀者可以自行閱讀TransferQueued的源碼。三、總結
TransferQueue主要用于線程之間的數據交換,由于采用無鎖算法,其性能一般比單純的其它阻塞隊列要高。它的最大特點時不存儲實際元素,而是在內部通過棧或隊列結構保存阻塞線程。后面我們講JUC線程池框架的時候,還會再次看到它的身影。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77078.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現了接口,在多線程進階二五之框架中,我們提到過實現了接口,以提供和排序相關的功能,維持元素的有序性,所以就是一種為并發環境設計的有序工具類。唯一的區別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發于一世流云專欄:https://seg...
摘要:僅僅當有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數組上進行的,當創建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發于一世流云專欄:https://...
摘要:我們之前已經介紹過了,底層基于跳表實現,其操作平均時間復雜度均為。事實上,內部引用了一個對象,以組合方式,委托對象實現了所有功能。線程安全內存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現了接口。該類在構造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內部通過來保證線程安全,所以的整體實現時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發于一世流云專欄:ht...
閱讀 3672·2021-09-22 15:28
閱讀 1296·2021-09-03 10:35
閱讀 878·2021-09-02 15:21
閱讀 3474·2019-08-30 15:53
閱讀 3496·2019-08-29 17:25
閱讀 569·2019-08-29 13:22
閱讀 1555·2019-08-28 18:15
閱讀 2287·2019-08-26 13:57