摘要:相較于方法,提供了超時(shí)等待機(jī)制注意,在方法中,我們用到了的返回值,如果該方法因?yàn)槌瑫r(shí)而退出時(shí),則將返回。的這個(gè)返回值有助于我們理解該方法究竟是因?yàn)楂@取到了鎖而返回,還是因?yàn)槌瑫r(shí)時(shí)間到了而返回。
前言
系列文章目錄
CountDownLatch是一個(gè)很有用的工具,latch是門(mén)閂的意思,該工具是為了解決某些操作只能在一組操作全部執(zhí)行完成后才能執(zhí)行的情景。例如,小組早上開(kāi)會(huì),只有等所有人到齊了才能開(kāi);再如,游樂(lè)園里的過(guò)山車(chē),一次可以坐10個(gè)人,為了節(jié)約成本,通常是等夠10個(gè)人了才開(kāi)。CountDown是倒數(shù)計(jì)數(shù),所以CountDownLatch的用法通常是設(shè)定一個(gè)大于0的值,該值即代表需要等待的總?cè)蝿?wù)數(shù),每完成一個(gè)任務(wù)后,將總?cè)蝿?wù)數(shù)減一,直到最后該值為0,說(shuō)明所有等待的任務(wù)都執(zhí)行完了,“門(mén)閂”此時(shí)就被打開(kāi),后面的任務(wù)可以繼續(xù)執(zhí)行。
CountDownLatch本身是基于共享鎖實(shí)現(xiàn)的,如果你還不了解共享鎖,建議先讀一下逐行分析AQS源碼(3)——共享鎖的獲取與釋放,然后再繼續(xù)往下看。
核心屬性CountDownLatch主要是通過(guò)AQS的共享鎖機(jī)制實(shí)現(xiàn)的,因此它的核心屬性只有一個(gè)sync,它繼承自AQS,同時(shí)覆寫(xiě)了tryAcquireShared和tryReleaseShared,以完成具體的實(shí)現(xiàn)共享鎖的獲取與釋放的邏輯。
private final Sync sync;
/** * Synchronization control For CountDownLatch. * Uses AQS state to represent count. */ private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }構(gòu)造函數(shù)
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
在構(gòu)造函數(shù)中,我們就是簡(jiǎn)單傳入了一個(gè)不小于0的任務(wù)數(shù),由上面Sync的構(gòu)造函數(shù)可知,這個(gè)任務(wù)數(shù)就是AQS的state的初始值。
核心方法CountDownLatch最核心的方法只有兩個(gè),一個(gè)是countDown方法,每調(diào)用一次,就會(huì)將當(dāng)前的count減一,當(dāng)count值為0時(shí),就會(huì)喚醒所有等待中的線程;另一個(gè)是await方法,它有兩種形式,一種是阻塞式,一種是帶超時(shí)機(jī)制的形式,該方法用于將當(dāng)前等待“門(mén)閂”開(kāi)啟的線程掛起,直到count值為0,這一點(diǎn)很類(lèi)似于條件隊(duì)列,相當(dāng)于等待的條件就是count值為0,然而其底層的實(shí)現(xiàn)并不是用條件隊(duì)列,而是共享鎖。
countDown()public void countDown() { sync.releaseShared(1); }
前面說(shuō)過(guò),countDown()方法的目的就是將count值減一,并且在count值為0時(shí),喚醒所有等待的線程,它內(nèi)部調(diào)用的其實(shí)是釋放共享鎖的操作:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
該方法由AQS實(shí)現(xiàn),但是tryReleaseShared方法由Sync類(lèi)自己實(shí)現(xiàn):
protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }
該方法的實(shí)現(xiàn)很簡(jiǎn)單,就是獲取當(dāng)前的state值,如果已經(jīng)為0了,直接返回false;否則通過(guò)CAS操作將state值減一,之后返回的是nextc == 0,由此可見(jiàn),該方法只有在count值原來(lái)不為0,但是調(diào)用后變?yōu)?時(shí),才會(huì)返回true,否則返回false,并且也可以看出,該方法在返回true之后,后面如果再次調(diào)用,還是會(huì)返回false。也就是說(shuō),調(diào)用該方法只有一種情況會(huì)返回true,那就是state值從大于0變?yōu)?值時(shí),這時(shí)也是所有在門(mén)閂前的任務(wù)都完成了。
在tryReleaseShared返回true以后,將調(diào)用doReleaseShared方法喚醒所有等待中的線程,該方法我們?cè)谇懊娴奈恼轮幸呀?jīng)詳細(xì)分析過(guò)了,這里就不再贅述了。
值得一提的是,我們其實(shí)并不關(guān)心releaseShared的返回值,而只關(guān)心tryReleaseShared的返回值,或者只關(guān)心count到0了沒(méi)有,這里更像是借了共享鎖的“殼”,來(lái)完成我們的目的,事實(shí)上我們完全可以自己設(shè)一個(gè)全局變量count來(lái)實(shí)現(xiàn)相同的效果,只不過(guò)對(duì)這個(gè)全局變量的操作也必須使用CAS。
await()與Condition的await()方法的語(yǔ)義相同,該方法是阻塞式地等待,并且是響應(yīng)中斷的,只不過(guò)它不是在等待signal操作,而是在等待count值為0:
public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
可見(jiàn),await方法內(nèi)部調(diào)用的是acquireSharedInterruptibly方法,相當(dāng)于借用了獲取共享鎖的“殼”:
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
我們來(lái)回憶一下獨(dú)占模式下對(duì)應(yīng)的方法:
public final void acquireInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
可見(jiàn),兩者用的是同一個(gè)框架,只是這里:
tryAcquire(arg) 換成了 tryAcquireShared(arg) (子類(lèi)實(shí)現(xiàn))
doAcquireInterruptibly(arg) 換成了 doAcquireSharedInterruptibly(arg) (AQS提供)
我們先來(lái)看看Sync子類(lèi)對(duì)于tryAcquireShared的實(shí)現(xiàn):
protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; }
該方法似乎有點(diǎn)掛羊頭賣(mài)狗肉的感覺(jué)——所謂的獲取共享鎖,事實(shí)上并不是什么搶鎖的行為,沒(méi)有任何CAS操作,它就是判斷當(dāng)前的state值是不是0,是就返回1,不是就返回-1。
值得注意的是,在逐行分析AQS源碼(3)——共享鎖的獲取與釋放中我們特別提到過(guò)tryAcquireShared返回值的含義:
如果該值小于0,則代表當(dāng)前線程獲取共享鎖失敗
如果該值大于0,則代表當(dāng)前線程獲取共享鎖成功,并且接下來(lái)其他線程嘗試獲取共享鎖的行為很可能成功
如果該值等于0,則代表當(dāng)前線程獲取共享鎖成功,但是接下來(lái)其他線程嘗試獲取共享鎖的行為會(huì)失敗
所以,當(dāng)該方法的返回值不小于0時(shí),就說(shuō)明搶鎖成功,可以直接退出了,所對(duì)應(yīng)的就是count值已經(jīng)為0,所有等待的事件都滿足了。否則,我們調(diào)用doAcquireSharedInterruptibly(arg)將當(dāng)前線程封裝成Node,丟到sync queue中去阻塞等待:
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
在前面我們介紹共享鎖的獲取時(shí),已經(jīng)分析過(guò)了doAcquireShared方法,只是它是不拋出InterruptedException的,doAcquireSharedInterruptibly(arg)是它的可中斷版本,我們可以直接對(duì)比一下:
可見(jiàn),它們僅僅是在對(duì)待中斷的處理方式上有所不同,其他部分都是一樣的,由于doAcquireShared在前面的文章中我們已經(jīng)詳細(xì)分析過(guò)了,這里就不再贅述了。
await(long timeout, TimeUnit unit)相較于await()方法,await(long timeout, TimeUnit unit)提供了超時(shí)等待機(jī)制:
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout); }
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException { if (nanosTimeout <= 0L) return false; final long deadline = System.nanoTime() + nanosTimeout; final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return true; } } nanosTimeout = deadline - System.nanoTime(); if (nanosTimeout <= 0L) return false; if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
注意,在tryAcquireSharedNanos方法中,我們用到了doAcquireSharedNanos的返回值,如果該方法因?yàn)槌瑫r(shí)而退出時(shí),則將返回false。由于await()方法是阻塞式的,也就是說(shuō)沒(méi)有獲取到鎖是不會(huì)退出的,因此它沒(méi)有返回值,換句話說(shuō),如果它正常返回了,則一定是因?yàn)楂@取到了鎖而返回; 而await(long timeout, TimeUnit unit)由于有了超時(shí)機(jī)制,它是有返回值的,返回值為true則表示獲取鎖成功,為false則表示獲取鎖失敗。doAcquireSharedNanos的這個(gè)返回值有助于我們理解該方法究竟是因?yàn)楂@取到了鎖而返回,還是因?yàn)槌瑫r(shí)時(shí)間到了而返回。
至于doAcquireSharedNanos的實(shí)現(xiàn)細(xì)節(jié),由于他和doAcquireSharedInterruptibly相比只是多了一個(gè)超時(shí)機(jī)制:
代碼本身很簡(jiǎn)單,就不贅述了。
實(shí)戰(zhàn)接下來(lái)我們來(lái)學(xué)習(xí)一個(gè)使用CountDownLatch的實(shí)際例子,Java的官方源碼已經(jīng)為我們提供了一個(gè)使用的示例代碼:
class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don"t let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) { } // return; } void doWork() { ...} }
在這個(gè)例子中,有兩個(gè)“閘門(mén)”,一個(gè)是CountDownLatch startSignal = new CountDownLatch(1),它開(kāi)啟后,等待在這個(gè)“閘門(mén)”上的任務(wù)才能開(kāi)始運(yùn)行;另一個(gè)“閘門(mén)”是CountDownLatch doneSignal = new CountDownLatch(N), 它表示等待N個(gè)任務(wù)都執(zhí)行完成后,才能繼續(xù)往下。
Worker實(shí)現(xiàn)了Runnable接口,代表了要執(zhí)行的任務(wù),在它的run方法中,我們先調(diào)用了startSignal.await(),等待startSignal這一“閘門(mén)”開(kāi)啟,閘門(mén)開(kāi)啟后,我們就執(zhí)行自己的任務(wù),任務(wù)完成后再執(zhí)行doneSignal.countDown(),將等待的總?cè)蝿?wù)數(shù)減一。
代碼本身的邏輯非常簡(jiǎn)單好懂,這里不贅述了。
總結(jié)CountDownLatch相當(dāng)于一個(gè)“門(mén)栓”,一個(gè)“閘門(mén)”,只有它開(kāi)啟了,代碼才能繼續(xù)往下執(zhí)行。通常情況下,如果當(dāng)前線程需要等其他線程執(zhí)行完成后才能執(zhí)行,我們就可以使用CountDownLatch。
使用CountDownLatch#await方法阻塞等待一個(gè)“閘門(mén)”的開(kāi)啟。
使用CountDownLatch#countDown方法減少閘門(mén)所等待的任務(wù)數(shù)。
CountDownLatch基于共享鎖實(shí)現(xiàn)。
CountDownLatch是一次性的,“閘門(mén)”開(kāi)啟后,無(wú)法再重復(fù)使用,如果想重復(fù)使用,應(yīng)該用[CyclicBarrier]()
(完)
系列文章目錄
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/77262.html
摘要:例如,線程需要互相等待,保證所有線程都執(zhí)行完了之后才能一起通過(guò)。獲取正在等待中的線程數(shù)注意,這里加了鎖,因?yàn)榉椒赡軙?huì)被多個(gè)線程同時(shí)修改。只要有一行沒(méi)有處理完,所有的線程都會(huì)在處等待,最后一個(gè)執(zhí)行完的線程將會(huì)負(fù)責(zé)喚醒所有等待的線程 前言 系列文章目錄 上一篇 我們學(xué)習(xí)了基于AQS共享鎖實(shí)現(xiàn)的CountDownLatch,本篇我們來(lái)看看另一個(gè)和它比較像的并發(fā)工具CyclicBarrier...
摘要:為了避免一篇文章的篇幅過(guò)長(zhǎng),于是一些比較大的主題就都分成幾篇來(lái)講了,這篇文章是筆者所有文章的目錄,將會(huì)持續(xù)更新,以給大家一個(gè)查看系列文章的入口。 前言 大家好,筆者是今年才開(kāi)始寫(xiě)博客的,寫(xiě)作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷?xiě)作的時(shí)候發(fā)現(xiàn),為了弄懂一個(gè)知識(shí),不得不先去了解另外一些知識(shí),這樣以來(lái),為了說(shuō)明一個(gè)問(wèn)題,就要把一系列知識(shí)都了解一遍,寫(xiě)出來(lái)的文章就特別長(zhǎng)。 為了避免一篇...
摘要:為了避免一篇文章的篇幅過(guò)長(zhǎng),于是一些比較大的主題就都分成幾篇來(lái)講了,這篇文章是筆者所有文章的目錄,將會(huì)持續(xù)更新,以給大家一個(gè)查看系列文章的入口。 前言 大家好,筆者是今年才開(kāi)始寫(xiě)博客的,寫(xiě)作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷?xiě)作的時(shí)候發(fā)現(xiàn),為了弄懂一個(gè)知識(shí),不得不先去了解另外一些知識(shí),這樣以來(lái),為了說(shuō)明一個(gè)問(wèn)題,就要把一系列知識(shí)都了解一遍,寫(xiě)出來(lái)的文章就特別長(zhǎng)。 為了避免一篇...
摘要:創(chuàng)建線程的方式方式一將類(lèi)聲明為的子類(lèi)。將該線程標(biāo)記為守護(hù)線程或用戶(hù)線程。其中方法隱含的線程為父線程。恢復(fù)線程,已過(guò)時(shí)。等待該線程銷(xiāo)毀終止。更多的使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線 知識(shí)體系圖: showImg(https://segmentfault.com/img/bVbef6v?w=1280&h=960); 1、線程是什么? 線程是進(jìn)程中獨(dú)立運(yùn)行的子任務(wù)。 2、創(chuàng)建線...
摘要:在創(chuàng)建對(duì)象時(shí),需要轉(zhuǎn)入一個(gè)值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數(shù)。當(dāng)?shù)竭_(dá)屏障的線程數(shù)小于時(shí),這些線程都會(huì)被阻塞住。當(dāng)所有線程到達(dá)屏障后,將會(huì)被更新,表示進(jìn)入新一輪的運(yùn)行輪次中。 1.簡(jiǎn)介 在分析完AbstractQueuedSynchronizer(以下簡(jiǎn)稱(chēng) AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個(gè)...
閱讀 3609·2021-11-15 11:37
閱讀 2974·2021-11-12 10:36
閱讀 4403·2021-09-22 15:51
閱讀 2381·2021-08-27 16:18
閱讀 881·2019-08-30 15:44
閱讀 2163·2019-08-30 10:58
閱讀 1769·2019-08-29 17:18
閱讀 3269·2019-08-28 18:25