摘要:實現原理是通過基于單鏈表的條件隊列來管理等待線程的。中斷在轉移到同步隊列期間或之后發生,此時表明有線程正在調用轉移節點。在該種中斷模式下,再次設置線程的中斷狀態。
1. 簡介
Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內部類實現了這個接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Object中的wait/notify/notifyAll等方法相似。這兩者相同的地方在于,它們所提供的等待/通知方法均是為了協同線程的運行秩序。只不過,Object 中的方法需要配合 synchronized 關鍵字使用,而 Condition 中的方法則要配合鎖對象使用,并通過newCondition方法獲取實現類對象。除此之外,Condition 接口中聲明的方法功能上更為豐富一些。比如,Condition 聲明了具有不響應中斷和超時功能的等待接口,這些都是 Object wait 方法所不具備的。
本篇文章是上一篇文章AbstractQueuedSynchronizer 原理分析 - 獨占/共享模式的續篇,在學習 Condition 的原理前,建議大家先去了解 AbstractQueuedSynchronizer 同步隊列相關原理。本篇文章會涉及到同步隊列相關知識,這些知識在上一篇文章分析過。
關于Condition的簡介這里先說到這,接下來分析一下Condition實現類ConditionObject的原理。
2. 實現原理ConditionObject是通過基于單鏈表的條件隊列來管理等待線程的。線程在調用await方法進行等待時,會釋放同步狀態。同時線程將會被封裝到一個等待節點中,并將節點置入條件隊列尾部進行等待。當有線程在獲取獨占鎖的情況下調用signal或singalAll方法時,隊列中的等待線程將會被喚醒,重新競爭鎖。另外,需要說明的是,一個鎖對象可同時創建多個 ConditionObject 對象,這意味著多個競爭同一獨占鎖的線程可在不同的條件隊列中進行等待。在喚醒時,可喚醒指定條件隊列中的線程。其大致示意圖如下:
以上就是 ConditionObject 所實現的等待/通知機制的大致原理,并不是很難理解。當然,在具體的實現中,則考慮的更為細致一些。相關細節將會在接下來一章中進行說明,繼續往下看吧。
3. 源碼解析 3.1 等待ConditionObject 中實現了幾種不同的等待方法,每種方法均有它自己的特點。比如await()會響應中斷,而awaitUninterruptibly()則不響應中斷。await(long, TimeUnit)則會在響應中斷的基礎上,新增了超時功能。除此之外,還有一些等待方法,這里就不一一列舉了。
在本節中,我將主要分析await()的方法實現。其他的等待方法大同小異,就不一一分析了,有興趣的朋友可以自己看一下。好了,接下來進入源碼分析階段。
/** * await 是一個響應中斷的等待方法,主要邏輯流程如下: * 1. 如果線程中斷了,拋出 InterruptedException 異常 * 2. 將線程封裝到節點對象里,并將節點添加到條件隊列尾部 * 3. 保存并完全釋放同步狀態,保存下來的同步狀態在重新競爭鎖時會用到 * 4. 線程進入等待狀態,直到被通知或中斷才會恢復運行 * 5. 使用第3步保存的同步狀態去競爭獨占鎖 */ public final void await() throws InterruptedException { // 線程中斷,則拋出中斷異常,對應步驟1 if (Thread.interrupted()) throw new InterruptedException(); // 添加等待節點到條件隊列尾部,對應步驟2 Node node = addConditionWaiter(); // 保存并完全釋放同步狀態,對應步驟3。此方法的意義會在后面詳細說明。 int savedState = fullyRelease(node); int interruptMode = 0; /* * 判斷節點是否在同步隊列上,如果不在則阻塞線程。 * 循環結束的條件: * 1. 其他線程調用 singal/singalAll,node 將會被轉移到同步隊列上。node 對應線程將 * 會在獲取同步狀態的過程中被喚醒,并走出 while 循環。 * 2. 線程在阻塞過程中產生中斷 */ while (!isOnSyncQueue(node)) { // 調用 LockSupport.park 阻塞當前線程,對應步驟4 LockSupport.park(this); /* * 檢測中斷模式,這里有兩種中斷模式,如下: * THROW_IE: * 中斷在 node 轉移到同步隊列“前”發生,需要當前線程自行將 node 轉移到同步隊 * 列中,并在隨后拋出 InterruptedException 異常。 * * REINTERRUPT: * 中斷在 node 轉移到同步隊列“期間”或“之后”發生,此時表明有線程正在調用 * singal/singalAll 轉移節點。在該種中斷模式下,再次設置線程的中斷狀態。 * 向后傳遞中斷標志,由后續代碼去處理中斷。 */ if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } /* * 被轉移到同步隊列的節點 node 將在 acquireQueued 方法中重新獲取同步狀態,注意這里 * 的這里的 savedState 是上面調用 fullyRelease 所返回的值,與此對應,可以把這里的 * acquireQueued 作用理解為 fullyAcquire(并不存在這個方法)。 * * 如果上面的 while 循環沒有產生中斷,則 interruptMode = 0。但 acquireQueued 方法 * 可能會產生中斷,產生中斷時返回 true。這里仍將 interruptMode 設為 REINTERRUPT, * 目的是繼續向后傳遞中斷,acquireQueued 不會處理中斷。 */ if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; /* * 正常通過 singal/singalAll 轉移節點到同步隊列時,nextWaiter 引用會被置空。 * 若發生線程產生中斷(THROW_IE)或 fullyRelease 方法出現錯誤等異常情況, * 該引用則不會被置空 */ if (node.nextWaiter != null) // clean up if cancelled // 清理等待狀態非 CONDITION 的節點 unlinkCancelledWaiters(); if (interruptMode != 0) /* * 根據 interruptMode 覺得中斷的處理方式: * THROW_IE:拋出 InterruptedException 異常 * REINTERRUPT:重新設置線程中斷標志 */ reportInterruptAfterWait(interruptMode); } /** 將當先線程封裝成節點,并將節點添加到條件隊列尾部 */ private Node addConditionWaiter() { Node t = lastWaiter; /* * 清理等待狀態為 CANCELLED 的節點。fullyRelease 內部調用 release 發生異常或釋放同步狀 * 態失敗時,節點的等待狀態會被設置為 CANCELLED。所以這里要清理一下已取消的節點 */ if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } // 創建節點,并將節點置于隊列尾部 Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } /** 清理等待狀態為 CANCELLED 的節點 */ private void unlinkCancelledWaiters() { Node t = firstWaiter; // 指向上一個等待狀態為非 CANCELLED 的節點 Node trail = null; while (t != null) { Node next = t.nextWaiter; if (t.waitStatus != Node.CONDITION) { t.nextWaiter = null; /* * trail 為 null,表明 next 之前的節點等待狀態均為 CANCELLED,此時更新 * firstWaiter 引用的指向。 * trail 不為 null,表明 next 之前有節點的等待狀態為 CONDITION,這時將 * trail.nextWaiter 指向 next 節點。 */ if (trail == null) firstWaiter = next; else trail.nextWaiter = next; // next 為 null,表明遍歷到條件隊列尾部了,此時將 lastWaiter 指向 trail if (next == null) lastWaiter = trail; } else // t.waitStatus = Node.CONDITION,則將 trail 指向 t trail = t; t = next; } } /** * 這個方法用于完全釋放同步狀態。這里解釋一下完全釋放的原因:為了避免死鎖的產生,鎖的實現上 * 一般應該支持重入功能。對應的場景就是一個線程在不釋放鎖的情況下可以多次調用同一把鎖的 * lock 方法進行加鎖,且不會加鎖失敗,如失敗必然導致導致死鎖。鎖的實現類可通過 AQS 中的整型成員 * 變量 state 記錄加鎖次數,每次加鎖,將 state++。每次 unlock 方法釋放鎖時,則將 state--, * 直至 state = 0,線程完全釋放鎖。用這種方式即可實現了鎖的重入功能。 */ final int fullyRelease(Node node) { boolean failed = true; try { // 獲取同步狀態數值 int savedState = getState(); // 調用 release 釋放指定數量的同步狀態 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { // 如果 relase 出現異常或釋放同步狀態失敗,此處將 node 的等待狀態設為 CANCELLED if (failed) node.waitStatus = Node.CANCELLED; } } /** 該方法用于判斷節點 node 是否在同步隊列上 */ final boolean isOnSyncQueue(Node node) { /* * 節點在同步隊列上時,其狀態可能為 0、SIGNAL、PROPAGATE 和 CANCELLED 其中之一, * 但不會為 CONDITION,所以可已通過節點的等待狀態來判斷節點所處的隊列。 * * node.prev 僅會在節點獲取同步狀態后,調用 setHead 方法將自己設為頭結點時被置為 * null,所以只要節點在同步隊列上,node.prev 一定不會為 null */ if (node.waitStatus == Node.CONDITION || node.prev == null) return false; /* * 如果節點后繼被為 null,則表明節點在同步隊列上。因為條件隊列使用的是 nextWaiter 指 * 向后繼節點的,條件隊列上節點的 next 指針均為 null。但僅以 node.next != null 條 * 件斷定節點在同步隊列是不充分的。節點在入隊過程中,是先設置 node.prev,后設置 * node.next。如果設置完 node.prev 后,線程被切換了,此時 node.next 仍然為 * null,但此時 node 確實已經在同步隊列上了,所以這里還需要進行后續的判斷。 */ if (node.next != null) return true; // 在同步隊列上,從后向前查找 node 節點 return findNodeFromTail(node); } /** 由于同步隊列上的的節點 prev 引用不會為空,所以這里從后向前查找 node 節點 */ private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } /** 檢測線程在等待期間是否發生了中斷 */ private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } /** * 判斷中斷發生的時機,分為兩種: * 1. 中斷在節點被轉移到同步隊列前發生,此時返回 true * 2. 中斷在節點被轉移到同步隊列期間或之后發生,此時返回 false */ final boolean transferAfterCancelledWait(Node node) { // 中斷在節點被轉移到同步隊列前發生,此時自行將節點轉移到同步隊列上,并返回 true if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { // 調用 enq 將節點轉移到同步隊列中 enq(node); return true; } /* * 如果上面的條件分支失敗了,則表明已經有線程在調用 signal/signalAll 方法了,這兩個 * 方法會先將節點等待狀態由 CONDITION 設置為 0 后,再調用 enq 方法轉移節點。下面判斷節 * 點是否已經在同步隊列上的原因是,signal/signalAll 方法可能僅設置了等待狀態,還沒 * 來得及轉移節點就被切換走了。所以這里用自旋的方式判斷 signal/signalAll 是否已經完 * 成了轉移操作。這種情況表明了中斷發生在節點被轉移到同步隊列期間。 */ while (!isOnSyncQueue(node)) Thread.yield(); } // 中斷在節點被轉移到同步隊列期間或之后發生,返回 false return false; } /** * 根據中斷類型做出相應的處理: * THROW_IE:拋出 InterruptedException 異常 * REINTERRUPT:重新設置中斷標志,向后傳遞中斷 */ private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); } /** 中斷線程 */ static void selfInterrupt() { Thread.currentThread().interrupt(); }3.2 通知
/** 將條件隊列中的頭結點轉移到同步隊列中 */ public final void signal() { // 檢查線程是否獲取了獨占鎖,未獲取獨占鎖調用 signal 方法是不允許的 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) // 將頭結點轉移到同步隊列中 doSignal(first); } private void doSignal(Node first) { do { /* * 將 firstWaiter 指向 first 節點的 nextWaiter 節點,while 循環將會用到更新后的 * firstWaiter 作為判斷條件。 */ if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; // 將頭結點從條件隊列中移除 first.nextWaiter = null; /* * 調用 transferForSignal 將節點轉移到同步隊列中,如果失敗,且 firstWaiter * 不為 null,則再次進行嘗試。transferForSignal 成功了,while 循環就結束了。 */ } while (!transferForSignal(first) && (first = firstWaiter) != null); } /** 這個方法用于將條件隊列中的節點轉移到同步隊列中 */ final boolean transferForSignal(Node node) { /* * 如果將節點的等待狀態由 CONDITION 設為 0 失敗,則表明節點被取消。 * 因為 transferForSignal 中不存在線程競爭的問題,所以下面的 CAS * 失敗的唯一原因是節點的等待狀態為 CANCELLED。 */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; // 調用 enq 方法將 node 轉移到同步隊列中,并返回 node 的前驅節點 p Node p = enq(node); int ws = p.waitStatus; /* * 如果前驅節點的等待狀態 ws > 0,則表明前驅節點處于取消狀態,此時應喚醒 node 對應的 * 線程去獲取同步狀態。如果 ws <= 0,這里通過 CAS 將節點 p 的等待設為 SIGNAL。 * 這樣,節點 p 在釋放同步狀態后,才會喚醒后繼節點 node。如果 CAS 設置失敗,則應立即 * 喚醒 node 節點對應的線程。以免因 node 沒有被喚醒導致同步隊列掛掉。關于同步隊列的相關的 * 知識,請參考我的另一篇文章“AbstractQueuedSynchronizer 原理分析 - 獨占/共享模式”, * 鏈接為:http://t.cn/RuERpHl */ if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
看完了 signal 方法的分析,下面再來看看 signalAll 的源碼分析,如下:
public final void signalAll() { // 檢查線程是否獲取了獨占鎖 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignalAll(first); } private void doSignalAll(Node first) { lastWaiter = firstWaiter = null; /* * 將條件隊列中所有的節點轉移到同步隊列中。與 doSignal 方法略有不同,主要區別在 * while 循環的循環條件上,下的循環只有在條件隊列中沒節點后才終止。 */ do { Node next = first.nextWaiter; // 將 first 節點從條件隊列中移除 first.nextWaiter = null; // 轉移節點到同步隊列上 transferForSignal(first); first = next; } while (first != null); }4. 其他
在我閱讀 ConditionObject 源碼時發現了一個問題 - await 方法竟然沒有做同步控制。而在 signal 和 signalAll 方法開頭都會調用 isHeldExclusively 檢測線程是否已經獲取了獨占鎖,未獲取獨占鎖調用這兩個方法會拋出異常。但在 await 方法中,卻沒有進行相關的檢測。如果在正確的使用方式下調用 await 方法是不會出現問題的,所謂正確的使用方式指的是在獲取鎖的情況下調用 await 方法。但如果沒獲取鎖就調用該方法,就會產生線程競爭的情況,這將會對條件隊列的結構造成破壞。這里再來看一下新增節點的方法源碼,如下:
private Node addConditionWaiter() { Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); // 存在競爭時將會導致節點入隊出錯 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
假如現在有線程 t1 和 t2,對應節點 node1 和 node2。線程 t1 獲取了鎖,而 t2 未獲取鎖,此時條件隊列為空,即 firstWaiter = lastWaiter = null。演繹一下會導致條件隊列被破壞的場景,如下:
時刻1:線程 t1 和 t2 同時執行到 if (t == null),兩個線程都認為 if 條件滿足
時刻2:線程 t1 初始化 firstWaiter,即將 firstWaiter 指向 node1
時刻3:線程 t2 再次修改 firstWaiter 的指向,此時 firstWaiter 指向 node2
如上,如果線程是按照上面的順序執行,這會導致隊列被破壞。firstWaiter 本應該指向 node1,但結果卻指向了 node2,node1 被排擠出了隊列。這樣會導致什么問題呢?這樣可能會導致線程 t1 一直阻塞下去。因為 signal/signalAll 是從條件隊列頭部轉移節點的,但 node1 不在隊列中,所以 node1 無法被轉移到同步隊列上。在不出現中斷的情況下,node1 對應的線程 t1 會被永久阻塞住。
這里未對 await 方法進行同步控制,導致條件隊列出現問題,應該算 ConditionObject 實現上的一個缺陷了。關于這個缺陷,博客園博主 活在夢裡 在他的文章 AbstractQueuedSynchronizer源碼解讀--續篇之Condition 中也提到了。并向 JDK 開發者提了一個 BUG,BUG 鏈接為 JDK-8187408,有興趣的同學可以去看看。
5. 總結到這里,Condition 的原理就分析完了。分析完 Condition 原理,關于 AbstractQueuedSynchronizer 的分析也就結束了。總體來說,通過分析 AQS 并寫成博客,使我對 AQS 的原理有了更深刻的認識。AQS 是 JDK 中鎖和其他并發組件實現的基礎,弄懂 AQS 原理對后續在分析各種鎖和其他同步組件大有裨益。
AQS 本身實現比較復雜,要處理各種各樣的情況。作為類庫,AQS 要考慮和處理各種可能的情況,實現起來可謂非常復雜。不僅如此,AQS 還很好的封裝了同步隊列的管理,線程的阻塞與喚醒等基礎操作,大大降低了繼承類實現同步控制功能的復雜度。所以,在本文的最后,再次向 AQS 的作者,Java 大師Doug Lea致敬。
好了,本文到此結束,謝謝大家閱讀。
參考AbstractQueuedSynchronizer源碼解讀--續篇之Condition
本文在知識共享許可協議 4.0 下發布,轉載需在明顯位置處注明出處
作者:coolblog
本文同步發布在我的個人博客:http://www.coolblog.xyz
本作品采用知識共享署名-非商業性使用-禁止演繹 4.0 國際許可協議進行許可。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/71116.html
摘要:實現原理是通過基于單鏈表的條件隊列來管理等待線程的。中斷在轉移到同步隊列期間或之后發生,此時表明有線程正在調用轉移節點。在該種中斷模式下,再次設置線程的中斷狀態。 1. 簡介 Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內部類實現了這個接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...
摘要:實現原理是通過基于單鏈表的條件隊列來管理等待線程的。中斷在轉移到同步隊列期間或之后發生,此時表明有線程正在調用轉移節點。在該種中斷模式下,再次設置線程的中斷狀態。 1. 簡介 Condition是一個接口,AbstractQueuedSynchronizer 中的ConditionObject內部類實現了這個接口。Condition聲明了一組等待/通知的方法,這些方法的功能與Objec...
摘要:同步器擁有三個成員變量隊列的頭結點隊列的尾節點和狀態。對于同步器維護的狀態,多個線程對其的獲取將會產生一個鏈式的結構。使用將當前線程,關于后續會詳細介紹。 簡介提供了一個基于FIFO隊列,可以用于構建鎖或者其他相關同步裝置的基礎框架。該同步器(以下簡稱同步器)利用了一個int來表示狀態,期望它能夠成為實現大部分同步需求的基礎。使用的方法是繼承,子類通過繼承同步器并需要實現它的方法來管理...
摘要:總結總的來說,操作順序是進入隊列喚醒,成功獲得鎖將狀態變為并將其從轉到使再次獲得鎖執行余下代碼。當然這是理由狀態下,為了討論及的原理,實際的操作時序也有可能變化。 AQS Condition 最近面試被問到java concurrent包下有哪些熟悉的,用過的工具。因此來回顧一下,這些工具的底層實現,AbstractQueuedSynchronizer。在網上看到了其他人的一些技術博客...
摘要:總結總的來說,操作順序是進入隊列喚醒,成功獲得鎖將狀態變為并將其從轉到使再次獲得鎖執行余下代碼。當然這是理由狀態下,為了討論及的原理,實際的操作時序也有可能變化。 AQS Condition 最近面試被問到java concurrent包下有哪些熟悉的,用過的工具。因此來回顧一下,這些工具的底層實現,AbstractQueuedSynchronizer。在網上看到了其他人的一些技術博客...
閱讀 3084·2021-09-22 15:20
閱讀 2607·2019-08-30 15:54
閱讀 1971·2019-08-30 14:06
閱讀 3120·2019-08-30 13:05
閱讀 2462·2019-08-29 18:36
閱讀 575·2019-08-29 15:10
閱讀 529·2019-08-29 11:17
閱讀 825·2019-08-28 18:11