摘要:例如,線程需要互相等待,保證所有線程都執行完了之后才能一起通過。獲取正在等待中的線程數注意,這里加了鎖,因為方法可能會被多個線程同時修改。只要有一行沒有處理完,所有的線程都會在處等待,最后一個執行完的線程將會負責喚醒所有等待的線程
前言
系列文章目錄
上一篇 我們學習了基于AQS共享鎖實現的CountDownLatch,本篇我們來看看另一個和它比較像的并發工具CyclicBarrier。
與CountdownLatch的區別 將count值遞減的線程在CountDownLatch中,執行countDown方法的線程和執行await方法的線程不是一類線程。例如,線程M,N需要等待線程A,B,C,D,E執行完成后才能繼續往下執行,則線程A,B,C,D,E執行完成后都將調用countDown方法,使得最后count變為了0,最后一個將count值減為0的線程調用的tryReleaseShared方法會成功返回true,從而調用doReleaseShared()喚醒所有在sync queue中等待共享鎖的線程,這里對應的就是M,N。所以,在CountDownLatch中,執行countDown的線程不會被掛起,調用await方法的線程會阻塞等待共享鎖。
而在CyclicBarrier中,將count值遞減的線程和執行await方法的線程是一類線程,它們在執行完遞減count的操作后,如果count值不為0,則可能同時被掛起。例如,線程A,B,C,D,E需要互相等待,保證所有線程都執行完了之后才能一起通過。
這就好像同一個班級出去春游,到一個景區后先自由活動,一段時間后在指定的地點集合,然后去下一個景點。這里這個指定集合的地點就是CyclicBarrier中的barrier,每一個人到達后都會執行await方法先將需要繼續等待的人數(count)減1,然后(在條件隊列上)掛起等待,當最后一個人到了之后,發現人已經到到齊了,則他負責執行barrierCommand(例如向班主任匯報人已經到齊),接著就喚醒所有還在等待中的線程,開啟新一代。
是否能重復使用CountDownLatch是一次性的,當count值被減為0后,不會被重置;
而CyclicBarrier在線程通過柵欄后,會開啟新的一代,count值會被重置。
CountDownLatch使用的是共享鎖,count值不為0時,線程在sync queue中等待,自始至終只牽涉到sync queue,由于使用共享鎖,喚醒操作不必等待鎖釋放后再進行,喚醒操作很迅速。
CyclicBarrier使用的是獨占鎖,count值不為0時,線程進入condition queue中等待,當count值降為0后,將被signalAll()方法喚醒到sync queue中去,然后挨個去爭鎖(因為是獨占鎖),在前驅節點釋放鎖以后,才能繼續喚醒后繼節點。
private static class Generation { boolean broken = false; } /** The lock for guarding barrier entry */ private final ReentrantLock lock = new ReentrantLock(); /** Condition to wait on until tripped */ private final Condition trip = lock.newCondition(); /** The number of parties */ private final int parties; /* The command to run when tripped */ private final Runnable barrierCommand; /** The current generation */ private Generation generation = new Generation(); /** * Number of parties still waiting. Counts down from parties to 0 * on each generation. It is reset to parties on each new * generation or when broken. */ private int count;
CyclicBarrier的核心屬性共有6個,我們將它分為三組。
第一組:
private final int parties; private int count;
注意,這兩個屬性都是用來表征線程的數量,parties代表了參與線程的總數,即需要一同通過barrier的線程數,它是final類型的,由構造函數初始化,在類被創建后就一直不變了;count屬性和CountDownLatch中的count一樣,代表還需要等待的線程數,初始值為parties,每當一個線程到來就減一,如果該值為0,則說明所有的線程都到齊了,大家可以一起通過barrier了。
第二組:
private final ReentrantLock lock = new ReentrantLock(); private final Condition trip = lock.newCondition(); private Generation generation = new Generation();
這一組代表了CyclicBarrier的基礎實現,即CyclicBarrier是基于獨占鎖ReentrantLock和條件隊列實現的,而不是共享鎖,所有相互等待的線程都會在同樣的條件隊列trip上掛起,被喚醒后將會被添加到sync queue中去爭取獨占鎖lock,獲得鎖的線程將繼續往下執行。
這里還有一個Generation對象,從定義上可以看出,它只有一個boolean類型的broken屬性,關于這個Generation,我們下面分析源碼的時候再詳細講。
第三組:
private final Runnable barrierCommand;
這是一個Runnable對象,代表了一個任務。當所有線程都到齊后,在它們一同通過barrier之前,就會執行這個對象的run方法,因此,它有點類似于一個鉤子方法。當然這個參數不是必須的,如果線程在通過barrier之前沒有什么特別需要處理的事情,該值可以為null。
構造函數CyclicBarrier有兩個構造函數:
public CyclicBarrier(int parties) { this(parties, null); }
public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }
其中,第一個構造函數本質上也是調用了第二個,即如果不傳入Runnable對象,則barrierCommand的值默認為null。
我們可以看出,構造函數就是初始化了parties,count,barrierCommand 三個變量。
輔助方法要理解CyclicBarrier,首先我們需要弄明白它的幾個輔助方法。
首先需要理解的是“代”(Generation)的概念,由于CyclicBarrier是可重復使用的,我們把每一個新的barrier稱為一“代”。這個怎么理解呢,打個比方:一個過山車有10個座位,景區常常需要等夠10個人了,才會去開動過山車。于是我們常常在欄桿(barrier)外面等,等湊夠了10個人,工作人員就把欄桿打開,讓10個人通過;然后再將欄桿歸位,后面新來的人還是要在欄桿外等待。這里,前面已經通過的人就是一“代”,后面再繼續等待的一波人就是另外一“代”,欄桿每打開關閉一次,就產生新一的“代”。
在CyclicBarrier,開啟新的一代使用的是nextGeneration方法:
nextGeneration()private void nextGeneration() { // 喚醒當前這一代中所有等待在條件隊列里的線程 trip.signalAll(); // 恢復count值,開啟新的一代 count = parties; generation = new Generation(); }
該方法用于開啟新的“一代”,通常是被最后一個調用await方法的線程調用。在該方法中,我們的主要工作就是喚醒當前這一代中所有等待在條件隊列里的線程,將count的值恢復為parties,以及開啟新的一代。
breakBarrier()breakBarrier即打破現有的柵欄,讓所有線程通過:
private void breakBarrier() { // 標記broken狀態 generation.broken = true; // 恢復count值 count = parties; // 喚醒當前這一代中所有等待在條件隊列里的線程(因為柵欄已經打破了) trip.signalAll(); }
這個breakBarrier怎么理解呢,繼續拿上面過上車的例子打比方,有時候某個時間段,景區的人比較少,等待過山車的人數湊不夠10個人,眼看后面遲遲沒有人再來,這個時候有的工作人員也會打開柵欄,讓正在等待的人進來坐過山車。這里工作人員的行為就是breakBarrier,由于并不是在湊夠10個人的情況下就開啟了柵欄,我們就把這一代的broken狀態標記為true。
reset()reset方法用于將barrier恢復成初始的狀態,它的內部就是簡單地調用了breakBarrier方法和nextGeneration方法。
public void reset() { final ReentrantLock lock = this.lock; lock.lock(); try { breakBarrier(); // break the current generation nextGeneration(); // start a new generation } finally { lock.unlock(); } }
這里要注意的是,如果在我們執行該方法時有線程正等待在barrier上,則它將立即返回并拋出BrokenBarrierException異常。
另外一點值得注意的是,該方法執行前需要先獲得鎖。
看完前面的輔助方法之后,接下來我們就來看CyclicBarrier最核心的await方法,可以說整個CyclicBarrier最關鍵的只有它了。它也是一個集“countDown”和“阻塞等待”于一體的方法。
await方法有兩種版本,一種帶超時機制,一種不帶,然而從源碼上看,它們最終調用的都是帶超時機制的dowait方法:
public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException { return dowait(true, unit.toNanos(timeout)); }
其中,dowait方法定義如下,它就是整個CyclicBarrier的核心了,我們直接在代碼中以注釋的形式分析:
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 所有執行await方法的線程必須是已經持有了鎖,所以這里必須先獲取鎖 lock.lock(); try { final Generation g = generation; // 前面說過,調用breakBarrier會將當前“代”的broken屬性設為true // 如果一個正在await的線程發現barrier已經被break了,則將直接拋出BrokenBarrierException異常 if (g.broken) throw new BrokenBarrierException(); // 如果當前線程被中斷了,則先將柵欄打破,再拋出InterruptedException // 這么做的原因是,所以等待在barrier的線程都是相互等待的,如果其中一個被中斷了,那其他的就不用等了。 if (Thread.interrupted()) { breakBarrier(); throw new InterruptedException(); } // 當前線程已經來到了柵欄前,先將等待的線程數減一 int index = --count; // 如果等待的線程數為0了,說明所有的parties都到齊了 // 則可以喚醒所有等待的線程,讓大家一起通過柵欄,并重置柵欄 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; if (command != null) // 如果創建CyclicBarrier時傳入了barrierCommand // 說明通過柵欄前有一些額外的工作要做 command.run(); ranAction = true; // 喚醒所有線程,開啟新一代 nextGeneration(); return 0; } finally { if (!ranAction) breakBarrier(); } } // 如果count數不為0,就將當前線程掛起,直到所有的線程到齊,或者超時,或者中斷發生 for (;;) { try { // 如果沒有設定超時機制,則直接調用condition的await方法 if (!timed) trip.await(); // 當前線程在這里被掛起 else if (nanos > 0L) // 如果設了超時,則等待指定的時間 nanos = trip.awaitNanos(nanos); // 當前線程在這里被掛起,超時時間到了就會自動喚醒 } catch (InterruptedException ie) { // 執行到這里說明線程被中斷了 // 如果線程被中斷時還處于當前這一“代”,并且當前這一代還沒有被broken,則先打破柵欄 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 注意來到這里有兩種情況 // 一種是g!=generation,說明新的一代已經產生了,所以我們沒有必要處理這個中斷,只要再自我中斷一下就好,交給后續的人處理 // 一種是g.broken = true, 說明中斷前柵欄已經被打破了,既然中斷發生時柵欄已經被打破了,也沒有必要再處理這個中斷了 Thread.currentThread().interrupt(); } } // 注意,執行到這里是對應于線程從await狀態被喚醒了 // 這里先檢測broken狀態,能使broken狀態變為true的,只有breakBarrier()方法,到這里對應的場景是 // 1. 其他執行await方法的線程在掛起前就被中斷了 // 2. 其他執行await方法的線程在還處于等待中時被中斷了 // 2. 最后一個到達的線程在執行barrierCommand的時候發生了錯誤 // 4. reset()方法被調用 if (g.broken) throw new BrokenBarrierException(); // 如果線程被喚醒時,新一代已經被開啟了,說明一切正常,直接返回 if (g != generation) return index; // 如果是因為超時時間到了被喚醒,則打破柵欄,返回TimeoutException if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); } }
這個await方法雖然包攬了countDown、阻塞線程、喚醒線程、執行barrierCommand任務、開啟新一代,處理中斷等諸多任務,但是代碼本身還是比較好懂的。
值得注意的是,await方法是有返回值的,代表了線程到達的順序,第一個到達的線程的index為parties - 1,最后一個到達的線程的index為0
工具方法除了重頭戲await方法和它的一些輔助方法,CyclicBarrier還為我們提供了一些工具方法:
(1)獲取參與的線程數parties
public int getParties() { return parties; }
parties 在構造完成后就不會被修改了,因此對它的訪問不需要加鎖。
(2)獲取正在等待中的線程數
public int getNumberWaiting() { final ReentrantLock lock = this.lock; lock.lock(); try { return parties - count; } finally { lock.unlock(); } }
注意,這里加了鎖,因為count方法可能會被多個線程同時修改。
(3)判斷當前barrier是否已經broken
public boolean isBroken() { final ReentrantLock lock = this.lock; lock.lock(); try { return generation.broken; } finally { lock.unlock(); } }
注意,這里同樣要加鎖,因為broken屬性可能被多個線程同時訪問或修改。
實戰為了學以致用,接下來我們就來看看怎么使用這個并發工具,java官方文檔為我們提供了一個使用的范例:
class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; Runnable barrierAction = new Runnable() { public void run() { mergeRows(...); } }; barrier = new CyclicBarrier(N, barrierAction); Listthreads = new ArrayList (N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads) thread.join(); } }
在這個例子中,我們為傳入的matrix數組的每一行都創建了一個線程進行處理,使用了CyclicBarrier來保證只有所有的線程都處理完之后,才會調用mergeRows(...)方法來合并結果。只要有一行沒有處理完,所有的線程都會在barrier.await()處等待,最后一個執行完的線程將會負責喚醒所有等待的線程。
總結CyclicBarrier實現了類似CountDownLatch的邏輯,它可以使得一組線程之間相互等待,直到所有的線程都到齊了之后再繼續往下執行。
CyclicBarrier基于條件隊列和獨占鎖來實現,而非共享鎖。
CyclicBarrier可重復使用,在所有線程都到齊了一起通過后,將會開啟新的一代。
CyclicBarrier使用了“all-or-none breakage model”,所有互相等待的線程,要么一起通過barrier,要么一個都不要通過,如果有一個線程因為中斷,失敗或者超時而過早的離開了barrier,則該barrier會被broken掉,所有等待在該barrier上的線程都會拋出BrokenBarrierException(或者InterruptedException)。
(完)
系列文章目錄
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77278.html
摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學習經歷。因為寫作的時候發現,為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...
摘要:創建線程的方式方式一將類聲明為的子類。將該線程標記為守護線程或用戶線程。其中方法隱含的線程為父線程。恢復線程,已過時。等待該線程銷毀終止。更多的使當前線程在鎖存器倒計數至零之前一直等待,除非線 知識體系圖: showImg(https://segmentfault.com/img/bVbef6v?w=1280&h=960); 1、線程是什么? 線程是進程中獨立運行的子任務。 2、創建線...
摘要:相較于方法,提供了超時等待機制注意,在方法中,我們用到了的返回值,如果該方法因為超時而退出時,則將返回。的這個返回值有助于我們理解該方法究竟是因為獲取到了鎖而返回,還是因為超時時間到了而返回。 前言 系列文章目錄 CountDownLatch是一個很有用的工具,latch是門閂的意思,該工具是為了解決某些操作只能在一組操作全部執行完成后才能執行的情景。例如,小組早上開會,只有等所有人...
摘要:在創建對象時,需要轉入一個值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數。當到達屏障的線程數小于時,這些線程都會被阻塞住。當所有線程到達屏障后,將會被更新,表示進入新一輪的運行輪次中。 1.簡介 在分析完AbstractQueuedSynchronizer(以下簡稱 AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個...
閱讀 2365·2023-04-25 20:07
閱讀 3302·2021-11-25 09:43
閱讀 3661·2021-11-16 11:44
閱讀 2529·2021-11-08 13:14
閱讀 3177·2021-10-19 11:46
閱讀 894·2021-09-28 09:36
閱讀 2975·2021-09-22 10:56
閱讀 2374·2021-09-10 10:51