国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

線程間的同步與通信(6)——CountDownLatch源碼分析

longmon / 3290人閱讀

摘要:相較于方法,提供了超時(shí)等待機(jī)制注意,在方法中,我們用到了的返回值,如果該方法因?yàn)槌瑫r(shí)而退出時(shí),則將返回。的這個(gè)返回值有助于我們理解該方法究竟是因?yàn)楂@取到了鎖而返回,還是因?yàn)槌瑫r(shí)時(shí)間到了而返回。

前言

系列文章目錄

CountDownLatch是一個(gè)很有用的工具,latch是門(mén)閂的意思,該工具是為了解決某些操作只能在一組操作全部執(zhí)行完成后才能執(zhí)行的情景。例如,小組早上開(kāi)會(huì),只有等所有人到齊了才能開(kāi);再如,游樂(lè)園里的過(guò)山車(chē),一次可以坐10個(gè)人,為了節(jié)約成本,通常是等夠10個(gè)人了才開(kāi)。CountDown是倒數(shù)計(jì)數(shù),所以CountDownLatch的用法通常是設(shè)定一個(gè)大于0的值,該值即代表需要等待的總?cè)蝿?wù)數(shù),每完成一個(gè)任務(wù)后,將總?cè)蝿?wù)數(shù)減一,直到最后該值為0,說(shuō)明所有等待的任務(wù)都執(zhí)行完了,“門(mén)閂”此時(shí)就被打開(kāi),后面的任務(wù)可以繼續(xù)執(zhí)行。

CountDownLatch本身是基于共享鎖實(shí)現(xiàn)的,如果你還不了解共享鎖,建議先讀一下逐行分析AQS源碼(3)——共享鎖的獲取與釋放,然后再繼續(xù)往下看。

核心屬性

CountDownLatch主要是通過(guò)AQS的共享鎖機(jī)制實(shí)現(xiàn)的,因此它的核心屬性只有一個(gè)sync,它繼承自AQS,同時(shí)覆寫(xiě)了tryAcquireSharedtryReleaseShared,以完成具體的實(shí)現(xiàn)共享鎖的獲取與釋放的邏輯。

private final Sync sync;
/**
 * Synchronization control For CountDownLatch.
 * Uses AQS state to represent count.
 */
private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
        setState(count);
    }

    int getCount() {
        return getState();
    }

    protected int tryAcquireShared(int acquires) {
        return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
        // Decrement count; signal when transition to zero
        for (;;) {
            int c = getState();
            if (c == 0)
                return false;
            int nextc = c-1;
            if (compareAndSetState(c, nextc))
                return nextc == 0;
        }
    }
}
構(gòu)造函數(shù)
public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

在構(gòu)造函數(shù)中,我們就是簡(jiǎn)單傳入了一個(gè)不小于0的任務(wù)數(shù),由上面Sync的構(gòu)造函數(shù)可知,這個(gè)任務(wù)數(shù)就是AQS的state的初始值。

核心方法

CountDownLatch最核心的方法只有兩個(gè),一個(gè)是countDown方法,每調(diào)用一次,就會(huì)將當(dāng)前的count減一,當(dāng)count值為0時(shí),就會(huì)喚醒所有等待中的線程;另一個(gè)是await方法,它有兩種形式,一種是阻塞式,一種是帶超時(shí)機(jī)制的形式,該方法用于將當(dāng)前等待“門(mén)閂”開(kāi)啟的線程掛起,直到count值為0,這一點(diǎn)很類(lèi)似于條件隊(duì)列,相當(dāng)于等待的條件就是count值為0,然而其底層的實(shí)現(xiàn)并不是用條件隊(duì)列,而是共享鎖。

countDown()
public void countDown() {
    sync.releaseShared(1);
}

前面說(shuō)過(guò),countDown()方法的目的就是將count值減一,并且在count值為0時(shí),喚醒所有等待的線程,它內(nèi)部調(diào)用的其實(shí)是釋放共享鎖的操作:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

該方法由AQS實(shí)現(xiàn),但是tryReleaseShared方法由Sync類(lèi)自己實(shí)現(xiàn):

protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

該方法的實(shí)現(xiàn)很簡(jiǎn)單,就是獲取當(dāng)前的state值,如果已經(jīng)為0了,直接返回false;否則通過(guò)CAS操作將state值減一,之后返回的是nextc == 0,由此可見(jiàn),該方法只有在count值原來(lái)不為0,但是調(diào)用后變?yōu)?時(shí),才會(huì)返回true,否則返回false,并且也可以看出,該方法在返回true之后,后面如果再次調(diào)用,還是會(huì)返回false。也就是說(shuō),調(diào)用該方法只有一種情況會(huì)返回true,那就是state值從大于0變?yōu)?值時(shí),這時(shí)也是所有在門(mén)閂前的任務(wù)都完成了。

tryReleaseShared返回true以后,將調(diào)用doReleaseShared方法喚醒所有等待中的線程,該方法我們?cè)谇懊娴奈恼轮幸呀?jīng)詳細(xì)分析過(guò)了,這里就不再贅述了。

值得一提的是,我們其實(shí)并不關(guān)心releaseShared的返回值,而只關(guān)心tryReleaseShared的返回值,或者只關(guān)心count到0了沒(méi)有,這里更像是借了共享鎖的“殼”,來(lái)完成我們的目的,事實(shí)上我們完全可以自己設(shè)一個(gè)全局變量count來(lái)實(shí)現(xiàn)相同的效果,只不過(guò)對(duì)這個(gè)全局變量的操作也必須使用CAS。

await()

與Condition的await()方法的語(yǔ)義相同,該方法是阻塞式地等待,并且是響應(yīng)中斷的,只不過(guò)它不是在等待signal操作,而是在等待count值為0:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

可見(jiàn),await方法內(nèi)部調(diào)用的是acquireSharedInterruptibly方法,相當(dāng)于借用了獲取共享鎖的“殼”:

public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

我們來(lái)回憶一下獨(dú)占模式下對(duì)應(yīng)的方法:

public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

可見(jiàn),兩者用的是同一個(gè)框架,只是這里:

tryAcquire(arg) 換成了 tryAcquireShared(arg) (子類(lèi)實(shí)現(xiàn))

doAcquireInterruptibly(arg) 換成了 doAcquireSharedInterruptibly(arg) (AQS提供)

我們先來(lái)看看Sync子類(lèi)對(duì)于tryAcquireShared的實(shí)現(xiàn):

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

該方法似乎有點(diǎn)掛羊頭賣(mài)狗肉的感覺(jué)——所謂的獲取共享鎖,事實(shí)上并不是什么搶鎖的行為,沒(méi)有任何CAS操作,它就是判斷當(dāng)前的state值是不是0,是就返回1,不是就返回-1。

值得注意的是,在逐行分析AQS源碼(3)——共享鎖的獲取與釋放中我們特別提到過(guò)tryAcquireShared返回值的含義:

如果該值小于0,則代表當(dāng)前線程獲取共享鎖失敗

如果該值大于0,則代表當(dāng)前線程獲取共享鎖成功,并且接下來(lái)其他線程嘗試獲取共享鎖的行為很可能成功

如果該值等于0,則代表當(dāng)前線程獲取共享鎖成功,但是接下來(lái)其他線程嘗試獲取共享鎖的行為會(huì)失敗

所以,當(dāng)該方法的返回值不小于0時(shí),就說(shuō)明搶鎖成功,可以直接退出了,所對(duì)應(yīng)的就是count值已經(jīng)為0,所有等待的事件都滿足了。否則,我們調(diào)用doAcquireSharedInterruptibly(arg)將當(dāng)前線程封裝成Node,丟到sync queue中去阻塞等待:

private void doAcquireSharedInterruptibly(int arg) throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

在前面我們介紹共享鎖的獲取時(shí),已經(jīng)分析過(guò)了doAcquireShared方法,只是它是不拋出InterruptedException的,doAcquireSharedInterruptibly(arg)是它的可中斷版本,我們可以直接對(duì)比一下:

可見(jiàn),它們僅僅是在對(duì)待中斷的處理方式上有所不同,其他部分都是一樣的,由于doAcquireShared在前面的文章中我們已經(jīng)詳細(xì)分析過(guò)了,這里就不再贅述了。

await(long timeout, TimeUnit unit)

相較于await()方法,await(long timeout, TimeUnit unit)提供了超時(shí)等待機(jī)制:

public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
    return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    return tryAcquireShared(arg) >= 0 || doAcquireSharedNanos(arg, nanosTimeout);
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout) throws InterruptedException {
    if (nanosTimeout <= 0L)
        return false;
    final long deadline = System.nanoTime() + nanosTimeout;
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
            }
            nanosTimeout = deadline - System.nanoTime();
            if (nanosTimeout <= 0L)
                return false;
            if (shouldParkAfterFailedAcquire(p, node) &&
                nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

注意,在tryAcquireSharedNanos方法中,我們用到了doAcquireSharedNanos的返回值,如果該方法因?yàn)槌瑫r(shí)而退出時(shí),則將返回false。由于await()方法是阻塞式的,也就是說(shuō)沒(méi)有獲取到鎖是不會(huì)退出的,因此它沒(méi)有返回值,換句話說(shuō),如果它正常返回了,則一定是因?yàn)楂@取到了鎖而返回; 而await(long timeout, TimeUnit unit)由于有了超時(shí)機(jī)制,它是有返回值的,返回值為true則表示獲取鎖成功,為false則表示獲取鎖失敗。doAcquireSharedNanos的這個(gè)返回值有助于我們理解該方法究竟是因?yàn)楂@取到了鎖而返回,還是因?yàn)槌瑫r(shí)時(shí)間到了而返回。

至于doAcquireSharedNanos的實(shí)現(xiàn)細(xì)節(jié),由于他和doAcquireSharedInterruptibly相比只是多了一個(gè)超時(shí)機(jī)制:

代碼本身很簡(jiǎn)單,就不贅述了。

實(shí)戰(zhàn)

接下來(lái)我們來(lái)學(xué)習(xí)一個(gè)使用CountDownLatch的實(shí)際例子,Java的官方源碼已經(jīng)為我們提供了一個(gè)使用的示例代碼:

class Driver { // ...
    void main() throws InterruptedException {
        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch doneSignal = new CountDownLatch(N);

        for (int i = 0; i < N; ++i) // create and start threads
            new Thread(new Worker(startSignal, doneSignal)).start();

        doSomethingElse();            // don"t let run yet
        startSignal.countDown();      // let all threads proceed
        doSomethingElse();
        doneSignal.await();           // wait for all to finish
    }
}

class Worker implements Runnable {
    private final CountDownLatch startSignal;
    private final CountDownLatch doneSignal;

    Worker(CountDownLatch startSignal, CountDownLatch doneSignal) {
        this.startSignal = startSignal;
        this.doneSignal = doneSignal;
    }

    public void run() {
        try {
            startSignal.await();
            doWork();
            doneSignal.countDown();
        } catch (InterruptedException ex) {
        } // return;
    }

    void doWork() { ...}
}

在這個(gè)例子中,有兩個(gè)“閘門(mén)”,一個(gè)是CountDownLatch startSignal = new CountDownLatch(1),它開(kāi)啟后,等待在這個(gè)“閘門(mén)”上的任務(wù)才能開(kāi)始運(yùn)行;另一個(gè)“閘門(mén)”是CountDownLatch doneSignal = new CountDownLatch(N), 它表示等待N個(gè)任務(wù)都執(zhí)行完成后,才能繼續(xù)往下。

Worker實(shí)現(xiàn)了Runnable接口,代表了要執(zhí)行的任務(wù),在它的run方法中,我們先調(diào)用了startSignal.await(),等待startSignal這一“閘門(mén)”開(kāi)啟,閘門(mén)開(kāi)啟后,我們就執(zhí)行自己的任務(wù),任務(wù)完成后再執(zhí)行doneSignal.countDown(),將等待的總?cè)蝿?wù)數(shù)減一。

代碼本身的邏輯非常簡(jiǎn)單好懂,這里不贅述了。

總結(jié)

CountDownLatch相當(dāng)于一個(gè)“門(mén)栓”,一個(gè)“閘門(mén)”,只有它開(kāi)啟了,代碼才能繼續(xù)往下執(zhí)行。通常情況下,如果當(dāng)前線程需要等其他線程執(zhí)行完成后才能執(zhí)行,我們就可以使用CountDownLatch。

使用CountDownLatch#await方法阻塞等待一個(gè)“閘門(mén)”的開(kāi)啟。

使用CountDownLatch#countDown方法減少閘門(mén)所等待的任務(wù)數(shù)。

CountDownLatch基于共享鎖實(shí)現(xiàn)。

CountDownLatch是一次性的,“閘門(mén)”開(kāi)啟后,無(wú)法再重復(fù)使用,如果想重復(fù)使用,應(yīng)該用[CyclicBarrier]()

(完)

系列文章目錄

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/77262.html

相關(guān)文章

  • 線程間的同步通信(7)——CyclicBarrier源碼分析

    摘要:例如,線程需要互相等待,保證所有線程都執(zhí)行完了之后才能一起通過(guò)。獲取正在等待中的線程數(shù)注意,這里加了鎖,因?yàn)榉椒赡軙?huì)被多個(gè)線程同時(shí)修改。只要有一行沒(méi)有處理完,所有的線程都會(huì)在處等待,最后一個(gè)執(zhí)行完的線程將會(huì)負(fù)責(zé)喚醒所有等待的線程 前言 系列文章目錄 上一篇 我們學(xué)習(xí)了基于AQS共享鎖實(shí)現(xiàn)的CountDownLatch,本篇我們來(lái)看看另一個(gè)和它比較像的并發(fā)工具CyclicBarrier...

    freewolf 評(píng)論0 收藏0
  • 系列文章目錄

    摘要:為了避免一篇文章的篇幅過(guò)長(zhǎng),于是一些比較大的主題就都分成幾篇來(lái)講了,這篇文章是筆者所有文章的目錄,將會(huì)持續(xù)更新,以給大家一個(gè)查看系列文章的入口。 前言 大家好,筆者是今年才開(kāi)始寫(xiě)博客的,寫(xiě)作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷?xiě)作的時(shí)候發(fā)現(xiàn),為了弄懂一個(gè)知識(shí),不得不先去了解另外一些知識(shí),這樣以來(lái),為了說(shuō)明一個(gè)問(wèn)題,就要把一系列知識(shí)都了解一遍,寫(xiě)出來(lái)的文章就特別長(zhǎng)。 為了避免一篇...

    lijy91 評(píng)論0 收藏0
  • 系列文章目錄

    摘要:為了避免一篇文章的篇幅過(guò)長(zhǎng),于是一些比較大的主題就都分成幾篇來(lái)講了,這篇文章是筆者所有文章的目錄,將會(huì)持續(xù)更新,以給大家一個(gè)查看系列文章的入口。 前言 大家好,筆者是今年才開(kāi)始寫(xiě)博客的,寫(xiě)作的初衷主要是想記錄和分享自己的學(xué)習(xí)經(jīng)歷。因?yàn)閷?xiě)作的時(shí)候發(fā)現(xiàn),為了弄懂一個(gè)知識(shí),不得不先去了解另外一些知識(shí),這樣以來(lái),為了說(shuō)明一個(gè)問(wèn)題,就要把一系列知識(shí)都了解一遍,寫(xiě)出來(lái)的文章就特別長(zhǎng)。 為了避免一篇...

    Yumenokanata 評(píng)論0 收藏0
  • Java 多線程并發(fā)編程面試筆錄一覽

    摘要:創(chuàng)建線程的方式方式一將類(lèi)聲明為的子類(lèi)。將該線程標(biāo)記為守護(hù)線程或用戶(hù)線程。其中方法隱含的線程為父線程。恢復(fù)線程,已過(guò)時(shí)。等待該線程銷(xiāo)毀終止。更多的使當(dāng)前線程在鎖存器倒計(jì)數(shù)至零之前一直等待,除非線 知識(shí)體系圖: showImg(https://segmentfault.com/img/bVbef6v?w=1280&h=960); 1、線程是什么? 線程是進(jìn)程中獨(dú)立運(yùn)行的子任務(wù)。 2、創(chuàng)建線...

    bitkylin 評(píng)論0 收藏0
  • Java 線程同步組件 CountDownLatch CyclicBarrier 原理分析

    摘要:在創(chuàng)建對(duì)象時(shí),需要轉(zhuǎn)入一個(gè)值,用于初始化的成員變量,該成員變量表示屏障攔截的線程數(shù)。當(dāng)?shù)竭_(dá)屏障的線程數(shù)小于時(shí),這些線程都會(huì)被阻塞住。當(dāng)所有線程到達(dá)屏障后,將會(huì)被更新,表示進(jìn)入新一輪的運(yùn)行輪次中。 1.簡(jiǎn)介 在分析完AbstractQueuedSynchronizer(以下簡(jiǎn)稱(chēng) AQS)和ReentrantLock的原理后,本文將分析 java.util.concurrent 包下的兩個(gè)...

    Anonymous1 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<