国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

非阻塞同步算法實戰(二):BoundlessCyclicBarrier

yintaolaowanzi / 2184人閱讀

摘要:如果停止了版本更新,可使用方法來解除所有因而阻塞的線程,包括指定版本號的。如果自己維護版本號,則應該保證遞增。

前言

相比上一篇而言,本文不需要太多的準備知識,但技巧性更強一些。因為分析、設計的過程比較復雜繁瑣,也限于篇幅,所以,主要展示如何解決這些需求,和講解代碼。另外,所講的內容也是后一篇實戰中需要用到的一個工具類。

需求介紹

我需要編寫一個同步工具,它需要提供這樣幾個方法:await、pass、cancel。某個線程調用await時,會被阻塞;當調用pass方法時,之前因為await而阻塞的線程將全部被解除阻塞,之后調用await的線程繼續被阻塞,直到下一次調用pass。

該工具同時還維護一個版本號,await方法可以帶一個目標版本號,如果當前的版本號比目標版本號新或相同,則直接通過,否則,阻塞本線程,直到到達或超過目標版本。調用pass的時候,更新版本號。

如果停止了版本更新,可使用cancel方法來解除所有因await而阻塞的線程,包括指定版本號的。此方法用于避免無謂地等待。若await發生在cancel之后,則仍將被阻塞。

因為CountDownLatch不允許重復使用,CyclicBarrier只支持固定個數的線程,并且都沒有維護一個版本號,所以沒有已有的類能實現上面的需求,需要自己實現。

問題分析

簡單分析可知,應該維護一個隊列,來保存當前被阻塞的線程,用于在pass時對它們一一解除阻塞,pass時應該使用一個新的隊列,否則不方便正確處理pass前和pass后調用await的線程。

至此,問題的關鍵就明了了:如何將隊列的替換和版本號的更新這兩個操作做成原子的。

解決方案

以前在《JAVA并發編程實踐》曾看到過這樣一個小技巧,如果要原子地更新兩個變量,那么可以創建一個新的類將它們封裝起來,將這兩個變量當定義成類成員變量,更新時,用CAS更新這個類的引用即可。

因為較為復雜,下面先給出完整的代碼,再講解其中的關鍵。

注意:上面所說pass,在代碼中的具體實現為nextCycle,有兩個版本,一個自動維護版本號,一個由調用者維護版本號。

/**
 * @author trytocatch@163.com
 * @time 2013-1-31
 */
public class BoundlessCyclicBarrier {
    protected final AtomicReference waitQueueRef;

    public BoundlessCyclicBarrier() {
        this(0);
    }

    public BoundlessCyclicBarrier(int startVersion) {
        waitQueueRef = new AtomicReference(new VersionQueue(startVersion));
    }

    public final void awaitWithAssignedVersion(int myVersion)
            throws InterruptedException {
        awaitImpl(true, myVersion, 0);
    }

    /**
     *
     * @param myVersion
     * @param nanosTimeout
     * @return if timeout, or be canceled and doesn"t reach myVersion, returns false
     * @throws InterruptedException
     */
    public final boolean awaitWithAssignedVersion(int myVersion, long nanosTimeout) throws InterruptedException {
        return awaitImpl(true, myVersion, nanosTimeout);
    }

    public final void await() throws InterruptedException {
        awaitImpl(false, 0, 0);
    }

    /**
     *
     * @param nanosTimeout
     * @return if and only if timeout, returns false
     * @throws InterruptedException
     */
    public final boolean await(long nanosTimeout)
            throws InterruptedException {
        return awaitImpl(false, 0, nanosTimeout);
    }

    /**
     * pass and version++(some threads may not be unparked when awaitImpl is in process, but it"s OK in this Barrier)
     * @return old queue version
     */
    public int nextCycle() {
        VersionQueue oldQueue = waitQueueRef.get();
        VersionQueue newQueue = new VersionQueue(oldQueue.version + 1);
        for(;;){
            if (waitQueueRef.compareAndSet(oldQueue, newQueue)) {
                for (Thread t : oldQueue.queue)
                    LockSupport.unpark(t);
                break;
            }
            oldQueue = waitQueueRef.get();
            newQueue.version = oldQueue.version + 1;
        }
        return oldQueue.version;
    }

    /**
     * pass and assign the next cycle version(caller should make sure that the newAssignVersion is right)
     * @param newAssignVersion
     */
    public void nextCycle(int newAssignVersion) {
        VersionQueue oldQueue = waitQueueRef.getAndSet(new VersionQueue(newAssignVersion));
        for (Thread t : oldQueue.queue)
            LockSupport.unpark(t);
    }

    /**
     * if version update has stopped, invoke this to awake all threads
     */
    public void cancel(){
        VersionQueue oldQueue = waitQueueRef.get();
        if (waitQueueRef.compareAndSet(oldQueue, new VersionQueue(oldQueue.version, true))) {
            for (Thread t : oldQueue.queue)
                LockSupport.unpark(t);
    }

    public final int getVersion() {
        return waitQueueRef.get().version;
    }

    private static final class VersionQueue {
        final private ConcurrentLinkedQueue queue;
        int version;
        final boolean isCancelQueue;

        VersionQueue(int curVersion){
            this(curVersion, false);
        }

        VersionQueue(int curVersion, boolean isCancelQueue) {
            this.version = curVersion;
            this.isCancelQueue = isCancelQueue;
            queue = new ConcurrentLinkedQueue();
        }
    }

    /**
     *
     * @param assignVersion is myVersion available
     * @param myVersion wait for this version
     * @param nanosTimeout wait time(nanosTimeout <=0 means that nanosTimeout is invalid)      * @return if timeout, or be canceled and doesn"t reach myVersion, returns false      * @throws InterruptedException      */     protected boolean awaitImpl(boolean assignVersion, int myVersion,             long nanosTimeout) throws InterruptedException {         boolean timeOutEnable = nanosTimeout > 0;
        long lastTime = System.nanoTime();
        VersionQueue newQueue = waitQueueRef.get();//A
        if (assignVersion && newQueue.version - myVersion >= 0)
            return true;
        while (true) {
            VersionQueue submitQueue = newQueue;//B
            submitQueue.queue.add(Thread.currentThread());//C
            while (true) {
                newQueue = waitQueueRef.get();//D
                if (newQueue != submitQueue){//E: it"s a new cycle
                    if(assignVersion == false)
                        return true;
                    else if(newQueue.version - myVersion >= 0)
                        return true;
                    else if (newQueue.isCancelQueue)//F: be canceled
                        return false;
                    else//just like invoking awaitImpl again
                        break;
                }
                if (timeOutEnable) {
                    if (nanosTimeout <= 0)
                        return false;
                    LockSupport.parkNanos(this, nanosTimeout);
                    long now = System.nanoTime();
                    nanosTimeout -= now - lastTime;
                    lastTime = now;
                } else
                    LockSupport.park(this);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        }
    }
}
代碼分析

先分析一下awaitImpl方法,A和D是該方法的關鍵點,決定著它屬于哪一個批次,對應哪一個版本。這里有個小細節,在nexeCycle,cancel解除阻塞時,該線程可能并不在隊列中,因為插入隊列發生在C處,這在A和D之后(雖然看起來C在D之前,但D取到的queue要在下一次循環時才被當作submitQueue),所以,在E處再進行了一次判斷,開始解除阻塞時,舊隊列肯定被新隊列所替換,newQueue != submitQueue一定為真,就會不調用park進行阻塞了,也就不需要解除阻塞,所以即使解除阻塞時,該線程不在隊列中也是沒問題的。

再看E處,當進入一個新的cycle時(當前隊列與提交的隊列不同),a)如果沒指定版本,或者到達或超過了指定版本,則返回true;b)如果當前調用了cancel,則當前隊列的isCancelQueue將為true,則不繼續傻等,返回false;c)或者還未到達指定版本,break,插入到當前隊列中,繼續等待指定版本的到達。

如果沒有進入E處的IF內,則當前線程會被阻塞,直到超時,然后返回false;或被中斷,然后拋出InterruptedException;或被解除阻塞,重新進行E處的判定。

這里還有個小細節,既然cancel時,把當前的隊列設置了isCancelQueue,那么之后指定版本的await會不會也直接返回了呢?其實不會的,因為它若要執行F處的判斷,則先必需通過E處的判定,這意味著,當前隊列已經不是提交時的那個設置了isCancelQueue的隊列了。

代碼中對于cancel的處理,其實并不保證cancel后,之前的await都會被解除阻塞并返回,如果cancel后,緊接著又調用了nextCycle,那么可能某線程感知不到cancel的調用,喚醒后又繼續等待指定的版本。cancel的目的是在于不讓線程傻等,既然恢復版本更新了,那就繼續等待吧。

如果自己維護版本號,則應該保證遞增。另外,版本號的設計,考慮到了int溢出的情況,版本的前后判斷,我不是使用newVersion>=oldVersion,而是newVersion-oldVersion>=0,這樣,版本號就相當于循環使用了,只要兩個比較的版本號的差不超過int的最大值,那么都是正確的,int的最大值可是20多億,幾乎不可能出現跨度這么大的兩個版本號的比較,所以,認為它是正確的。

小結

本文講到了一個非阻塞同步算法設計時的小技巧,如果多個變量之間要維護某種特定關系,那么可以將它們封裝到一個類中,再用CAS更新這個類的引用,這樣就達到了:要么都被更新,要么都沒被更新,保持了多個變量之間的一致性。同時需要注意的是,每次更新都必需創建新的包裝對象,假如有其它更好的辦法,應該避免使用該方法。

via ifeve.com

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/64034.html

相關文章

  • 阻塞同步算法實戰(三)-LatestResultsProvider

    摘要:黑色的線表示,可在任意狀態下發起主動取消,進入該狀態。所以當線程阻塞時,可能處于停止狀態或者主動取消狀態。非阻塞同步相對于鎖同步而言,由代碼塊,轉為了點,是另一種思考方式。 前言 閱讀本文前,需要讀者對happens-before比較熟悉,了解非阻塞同步的一些基本概念。本文主要為happens-before法則的靈活運用,和一些解決問題的小技巧,分析問題的方式。 背景介紹 原始需...

    CrazyCodes 評論0 收藏0
  • Week 1 - Java 多線程 - CAS

    摘要:前言學習情況記錄時間子目標多線程記錄在學習線程安全知識點中,關于的有關知識點。對于資源競爭嚴重線程沖突嚴重的情況,自旋的概率會比較大,從而浪費更多的資源,效率低于。 前言 學習情況記錄 時間:week 1 SMART子目標 :Java 多線程 記錄在學習線程安全知識點中,關于CAS的有關知識點。 線程安全是指:多個線程不管以何種方式訪問某個類,并且在主調代碼中不需要進行同步,都能表...

    ZweiZhao 評論0 收藏0
  • 阻塞同步算法實戰(一):ConcurrentLinkedQueue

    摘要:注意這里指的不是當次而是之后,所以如果我們使用隊列的方法返回,就知道隊列是否為空,但是不知道之后是否為空,并且,當關注的操作發生時,在插入或取出操作的返回值里告知此信息,來指導是否繼續注冊寫操作。 前言 本文寫給對ConcurrentLinkedQueue的實現和非阻塞同步算法的實現原理有一定了解,但缺少實踐經驗的朋友,文中包括了實戰中的嘗試、所走的彎路,經驗和教訓。 背景介紹 ...

    EscapedDog 評論0 收藏0
  • Java面試 32個核心必考點完全解析

    摘要:如問到是否使用某框架,實際是是問該框架的使用場景,有什么特點,和同類可框架對比一系列的問題。這兩個方向的區分點在于工作方向的側重點不同。 [TOC] 這是一份來自嗶哩嗶哩的Java面試Java面試 32個核心必考點完全解析(完) 課程預習 1.1 課程內容分為三個模塊 基礎模塊: 技術崗位與面試 計算機基礎 JVM原理 多線程 設計模式 數據結構與算法 應用模塊: 常用工具集 ...

    JiaXinYi 評論0 收藏0
  • Java多線程學習(七)并發編程中一些問題

    摘要:相比與其他操作系統包括其他類系統有很多的優點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協程。所以如何避免死鎖的產生,在我們使用并發編程時至關重要。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Java多線程學習(二)synchronized關鍵字(1) java多線程學習(二)syn...

    dingding199389 評論0 收藏0

發表評論

0條評論

yintaolaowanzi

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<