摘要:如果停止了版本更新,可使用方法來解除所有因而阻塞的線程,包括指定版本號的。如果自己維護版本號,則應該保證遞增。
前言
相比上一篇而言,本文不需要太多的準備知識,但技巧性更強一些。因為分析、設計的過程比較復雜繁瑣,也限于篇幅,所以,主要展示如何解決這些需求,和講解代碼。另外,所講的內容也是后一篇實戰中需要用到的一個工具類。
需求介紹我需要編寫一個同步工具,它需要提供這樣幾個方法:await、pass、cancel。某個線程調用await時,會被阻塞;當調用pass方法時,之前因為await而阻塞的線程將全部被解除阻塞,之后調用await的線程繼續被阻塞,直到下一次調用pass。
該工具同時還維護一個版本號,await方法可以帶一個目標版本號,如果當前的版本號比目標版本號新或相同,則直接通過,否則,阻塞本線程,直到到達或超過目標版本。調用pass的時候,更新版本號。
如果停止了版本更新,可使用cancel方法來解除所有因await而阻塞的線程,包括指定版本號的。此方法用于避免無謂地等待。若await發生在cancel之后,則仍將被阻塞。
因為CountDownLatch不允許重復使用,CyclicBarrier只支持固定個數的線程,并且都沒有維護一個版本號,所以沒有已有的類能實現上面的需求,需要自己實現。
問題分析簡單分析可知,應該維護一個隊列,來保存當前被阻塞的線程,用于在pass時對它們一一解除阻塞,pass時應該使用一個新的隊列,否則不方便正確處理pass前和pass后調用await的線程。
至此,問題的關鍵就明了了:如何將隊列的替換和版本號的更新這兩個操作做成原子的。
解決方案以前在《JAVA并發編程實踐》曾看到過這樣一個小技巧,如果要原子地更新兩個變量,那么可以創建一個新的類將它們封裝起來,將這兩個變量當定義成類成員變量,更新時,用CAS更新這個類的引用即可。
因為較為復雜,下面先給出完整的代碼,再講解其中的關鍵。
注意:上面所說pass,在代碼中的具體實現為nextCycle,有兩個版本,一個自動維護版本號,一個由調用者維護版本號。
/** * @author trytocatch@163.com * @time 2013-1-31 */ public class BoundlessCyclicBarrier { protected final AtomicReference代碼分析waitQueueRef; public BoundlessCyclicBarrier() { this(0); } public BoundlessCyclicBarrier(int startVersion) { waitQueueRef = new AtomicReference (new VersionQueue(startVersion)); } public final void awaitWithAssignedVersion(int myVersion) throws InterruptedException { awaitImpl(true, myVersion, 0); } /** * * @param myVersion * @param nanosTimeout * @return if timeout, or be canceled and doesn"t reach myVersion, returns false * @throws InterruptedException */ public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException { return awaitImpl(true, myVersion, nanosTimeout); } public final void await() throws InterruptedException { awaitImpl(false, 0, 0); } /** * * @param nanosTimeout * @return if and only if timeout, returns false * @throws InterruptedException */ public final boolean await(long nanosTimeout) throws InterruptedException { return awaitImpl(false, 0, nanosTimeout); } /** * pass and version++(some threads may not be unparked when awaitImpl is in process, but it"s OK in this Barrier) * @return old queue version */ public int nextCycle() { VersionQueue oldQueue = waitQueueRef.get(); VersionQueue newQueue = new VersionQueue(oldQueue.version + 1); for(;;){ if (waitQueueRef.compareAndSet(oldQueue, newQueue)) { for (Thread t : oldQueue.queue) LockSupport.unpark(t); break; } oldQueue = waitQueueRef.get(); newQueue.version = oldQueue.version + 1; } return oldQueue.version; } /** * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right) * @param newAssignVersion */ public void nextCycle(int newAssignVersion) { VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion)); for (Thread t : oldQueue.queue) LockSupport.unpark(t); } /** * if version update has stopped, invoke this to awake all threads */ public void cancel(){ VersionQueue oldQueue = waitQueueRef.get(); if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) { for (Thread t : oldQueue.queue) LockSupport.unpark(t); } public final int getVersion() { return waitQueueRef.get().version; } private static final class VersionQueue { final private ConcurrentLinkedQueue queue; int version; final boolean isCancelQueue; VersionQueue(int curVersion){ this(curVersion, false); } VersionQueue(int curVersion, boolean isCancelQueue) { this.version = curVersion; this.isCancelQueue = isCancelQueue; queue = new ConcurrentLinkedQueue(); } } /** * * @param assignVersion is myVersion available * @param myVersion wait for this version * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid) * @return if timeout, or be canceled and doesn"t reach myVersion, returns false * @throws InterruptedException */ protected boolean awaitImpl(boolean assignVersion, int myVersion, long nanosTimeout) throws InterruptedException { boolean timeOutEnable = nanosTimeout > 0; long lastTime = System.nanoTime(); VersionQueue newQueue = waitQueueRef.get();//A if (assignVersion && newQueue.version - myVersion >= 0) return true; while (true) { VersionQueue submitQueue = newQueue;//B submitQueue.queue.add(Thread.currentThread());//C while (true) { newQueue = waitQueueRef.get();//D if (newQueue != submitQueue){//E: it"s a new cycle if(assignVersion == false) return true; else if(newQueue.version - myVersion >= 0) return true; else if (newQueue.isCancelQueue)//F: be canceled return false; else//just like invoking awaitImpl again break; } if (timeOutEnable) { if (nanosTimeout <= 0) return false; LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; } else LockSupport.park(this); if (Thread.interrupted()) throw new InterruptedException(); } } } }
先分析一下awaitImpl方法,A和D是該方法的關鍵點,決定著它屬于哪一個批次,對應哪一個版本。這里有個小細節,在nexeCycle,cancel解除阻塞時,該線程可能并不在隊列中,因為插入隊列發生在C處,這在A和D之后(雖然看起來C在D之前,但D取到的queue要在下一次循環時才被當作submitQueue),所以,在E處再進行了一次判斷,開始解除阻塞時,舊隊列肯定被新隊列所替換,newQueue != submitQueue一定為真,就會不調用park進行阻塞了,也就不需要解除阻塞,所以即使解除阻塞時,該線程不在隊列中也是沒問題的。
再看E處,當進入一個新的cycle時(當前隊列與提交的隊列不同),a)如果沒指定版本,或者到達或超過了指定版本,則返回true;b)如果當前調用了cancel,則當前隊列的isCancelQueue將為true,則不繼續傻等,返回false;c)或者還未到達指定版本,break,插入到當前隊列中,繼續等待指定版本的到達。
如果沒有進入E處的IF內,則當前線程會被阻塞,直到超時,然后返回false;或被中斷,然后拋出InterruptedException;或被解除阻塞,重新進行E處的判定。
這里還有個小細節,既然cancel時,把當前的隊列設置了isCancelQueue,那么之后指定版本的await會不會也直接返回了呢?其實不會的,因為它若要執行F處的判斷,則先必需通過E處的判定,這意味著,當前隊列已經不是提交時的那個設置了isCancelQueue的隊列了。
代碼中對于cancel的處理,其實并不保證cancel后,之前的await都會被解除阻塞并返回,如果cancel后,緊接著又調用了nextCycle,那么可能某線程感知不到cancel的調用,喚醒后又繼續等待指定的版本。cancel的目的是在于不讓線程傻等,既然恢復版本更新了,那就繼續等待吧。
如果自己維護版本號,則應該保證遞增。另外,版本號的設計,考慮到了int溢出的情況,版本的前后判斷,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,這樣,版本號就相當于循環使用了,只要兩個比較的版本號的差不超過int的最大值,那么都是正確的,int的最大值可是20多億,幾乎不可能出現跨度這么大的兩個版本號的比較,所以,認為它是正確的。
小結本文講到了一個非阻塞同步算法設計時的小技巧,如果多個變量之間要維護某種特定關系,那么可以將它們封裝到一個類中,再用CAS更新這個類的引用,這樣就達到了:要么都被更新,要么都沒被更新,保持了多個變量之間的一致性。同時需要注意的是,每次更新都必需創建新的包裝對象,假如有其它更好的辦法,應該避免使用該方法。
via ifeve.com
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/64034.html
摘要:黑色的線表示,可在任意狀態下發起主動取消,進入該狀態。所以當線程阻塞時,可能處于停止狀態或者主動取消狀態。非阻塞同步相對于鎖同步而言,由代碼塊,轉為了點,是另一種思考方式。 前言 閱讀本文前,需要讀者對happens-before比較熟悉,了解非阻塞同步的一些基本概念。本文主要為happens-before法則的靈活運用,和一些解決問題的小技巧,分析問題的方式。 背景介紹 原始需...
摘要:前言學習情況記錄時間子目標多線程記錄在學習線程安全知識點中,關于的有關知識點。對于資源競爭嚴重線程沖突嚴重的情況,自旋的概率會比較大,從而浪費更多的資源,效率低于。 前言 學習情況記錄 時間:week 1 SMART子目標 :Java 多線程 記錄在學習線程安全知識點中,關于CAS的有關知識點。 線程安全是指:多個線程不管以何種方式訪問某個類,并且在主調代碼中不需要進行同步,都能表...
摘要:注意這里指的不是當次而是之后,所以如果我們使用隊列的方法返回,就知道隊列是否為空,但是不知道之后是否為空,并且,當關注的操作發生時,在插入或取出操作的返回值里告知此信息,來指導是否繼續注冊寫操作。 前言 本文寫給對ConcurrentLinkedQueue的實現和非阻塞同步算法的實現原理有一定了解,但缺少實踐經驗的朋友,文中包括了實戰中的嘗試、所走的彎路,經驗和教訓。 背景介紹 ...
摘要:如問到是否使用某框架,實際是是問該框架的使用場景,有什么特點,和同類可框架對比一系列的問題。這兩個方向的區分點在于工作方向的側重點不同。 [TOC] 這是一份來自嗶哩嗶哩的Java面試Java面試 32個核心必考點完全解析(完) 課程預習 1.1 課程內容分為三個模塊 基礎模塊: 技術崗位與面試 計算機基礎 JVM原理 多線程 設計模式 數據結構與算法 應用模塊: 常用工具集 ...
摘要:相比與其他操作系統包括其他類系統有很多的優點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協程。所以如何避免死鎖的產生,在我們使用并發編程時至關重要。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Java多線程學習(二)synchronized關鍵字(1) java多線程學習(二)syn...
閱讀 1915·2023-04-26 01:56
閱讀 3112·2021-11-18 10:02
閱讀 3049·2021-09-09 11:35
閱讀 1284·2021-09-03 10:28
閱讀 3408·2019-08-29 18:36
閱讀 2846·2019-08-29 17:14
閱讀 833·2019-08-29 16:10
閱讀 1616·2019-08-26 13:45