摘要:黑色的線表示,可在任意狀態下發起主動取消,進入該狀態。所以當線程阻塞時,可能處于停止狀態或者主動取消狀態。非阻塞同步相對于鎖同步而言,由代碼塊,轉為了點,是另一種思考方式。
前言
閱讀本文前,需要讀者對happens-before比較熟悉,了解非阻塞同步的一些基本概念。本文主要為happens-before法則的靈活運用,和一些解決問題的小技巧,分析問題的方式。
背景介紹原始需求為:本人當時在編寫一個正則替換工具,里面會動態地顯示所有的匹配結果(包括替換預覽),文本、正則表達式、參數,這些數據的其中一項發生了變化,結果就應該被更新,為了提供友好的交互體驗,數據變化時,應該是發起一個異步請求,由另一個獨立的線程來完成運算,完成后通知UI更新結果。由于是動態顯示,所以提交會非常頻繁。
需求描述需要這樣一個工具類,允許用戶頻繁地提交數據(本文之后以“submit”表示該操作)和更新結果(本文之后以“update”表示該操作),submit時,如果當前有進行中的運算,則應該取消,使用新參數執行新的運算;update時,如果當前沒有進行中的運算(處于阻塞狀態),并且當前結果不是最新的,則喚醒該線程,使用當前的新數據,執行新的運算。此處之所以分為submit和update兩個方法,是為了支持手動更新,即點擊更新按鈕時,才更新結果。
此外,出于練手的原因,也出于編寫一個功能全面,更實用的工具的目的,我還加入了一些額外的需求:
1、引入多線程場景,update和submit均可由多個線程同時發起,該工具類應設計成線程安全的。
2、允許延遲執行運算,如果延時內執行submit,僅重新計算延時。如果運算不方便取消,在短時間頻繁submit的場景下,延時會是一個很好的應對辦法。
3、允許設置一個最大延遲時間,作為延遲開啟運算的補充。當長時間頻繁submit時,會形成這樣的局面,一直未進入運算環節,新結果計算不出來,上一次計算結果卻是很早以前的。如果需要顯示一個較新但不是最新的結果,最大延遲時間將會很有用。
4、提供主動取消方法,主動取消正在進行的運算。
5、update時,允許等待運算完成,同時也可設置超時時間。當主動取消、超時、完成了當前或更(更加的意思)新的數據對應的運算時,結束等待。
需求交待完了,有興趣有精力的讀者,可以先試著思考下怎么實現。
問題分析該工具應該維護一個狀態字段,這樣才能在發起某個操作時,根據所處的狀態作出正確的動作,如:如果當前不處于停止狀態(或者主動取消狀態,原因見下文),執行update就不需要喚醒運算線程。簡單分析可知,至少應該有這樣幾種狀態:
1、停止狀態:當前沒有運算任務,線程進入阻塞狀態,主動取消和運算完成后,進入該狀態
2、延遲狀態:設置了延遲開啟運算時,進入運算前,處于該狀態
3、運算狀態:正在執行運算
4、主動取消狀態:當發起主動取消時,進入該狀態
5、新任務狀態:當時有新的運算任務時,進入該狀態,然后重新進入運算狀態
延遲再來看一下延遲,如果延遲500毫秒,就每次sleep(500),那么期間再submit怎么辦?將它喚醒然后重新sleep(500)嗎?顯然不行,成本太大了。
我有一個小技巧:將500分成多個合適的等份,使用一個計數器,每次sleep一個等份,計數器加1,如果發起submit,僅把計數器置0即可,雖然看起來線程的狀態切換變多了,但應對頻繁重置時,它更穩定。雖然時間上會上下波動一個等份,但此處并不需要多么精確。
現在還面臨這樣一個問題,如何知道當前是處于延遲狀態并計數器置0?取出狀態值進行判斷,然后置0,這方法顯然不行,因為置0的時候,可能狀態已經變了,所以你無法知道該操作是否生效了。
我想到的辦法是,再引入一個延遲重置狀態。如果處于該狀態,則下一次計數器加1時,將計數器重置,狀態變更是可以知道成功與否的。
狀態變更有些狀態的變更是有條件的,比如說當前處于取消狀態,就不能把它轉為運算狀態,運算狀態只能由新任務狀態、延遲狀態(延遲完成后執行運算)或延遲重置狀態轉入。這種場景正好跟CAS一致,所以,使用一個AtomicInteger來表示狀態。
分析下各狀態之間的轉換,可以得出下面的狀態變更圖:
藍色的a(bcd)|(e)f線路為停止狀態下,發起一次update,運算完重新回到停止的過程,開啟延遲時是bcd,否則是e。
紅色的線j表示超過了最大延遲時間,退出延遲,進入運算狀態(也可以是d)。
綠色的線ghi(包括a)表示:如果發起了submit或update,狀態應該怎么改變。如果處于延遲重置、新任務則不需要進行任何操作;如果處于延遲狀態,則轉為延遲重置即可;如果處于運算狀態,則可能使用了舊參數,應該轉為新任務;如果為主動取消或停止狀態,并且是調用update方法,則轉為新任務,并且可能處于阻塞狀態,應該喚醒該線程。
黑色的線l表示,可在任意狀態下發起主動取消,進入該狀態。然后通知等待線程后,轉入停止狀態,對應紫色的k,如果在停止狀態下發起主動取消,則僅轉為主動取消狀態,不會通知等待線程。所以當線程阻塞時,可能處于停止狀態或者主動取消狀態。
順序問題上面已經分析到,當submit時,應該把延遲轉為延遲重置、或運算轉為新任務,這兩個嘗試的順序是不是也有講究呢?
是的,因為正常執行流程a(bcd)|(e)f中,運算狀態在延遲狀態之后,假如先嘗試運算轉為新任務,可能此時為延遲狀態,故失敗,再嘗試延遲轉為延遲重置時,狀態在這期間從剛才的延遲轉為了運算,故兩次嘗試都失敗了,本應該重置延遲的,卻什么也沒干,這是錯誤的。而將兩次嘗試順序調換一下,只要狀態為延遲或運算,那么兩次狀態轉換嘗試中,一定有一次會成功。
之后的代碼中還有多處類似的順序細節。
解決方案下面給出完整的代碼,除去等待運算完成那部分,其它地方均為wait-free級別的實現。
calculateResult是具體執行運算的方法;上文中的submit對應代碼里的updateParametersVersion方法,上文中的update對應剩余幾個update方法。
updateAndWait方法中,使用了上一篇中講到的BoundlessCyclicBarrier,其維護的版本號就是參數的版本號ParametersVersion。
/** * @author trytocatch@163.com * @date 2013-2-2 */ public abstract class LatestResultsProvider { /** update return value */ public static final int UPDATE_FAILED = -1; public static final int UPDATE_NO_NEED_TO_UPDATE = 0; public static final int UPDATE_SUCCESS = 1; public static final int UPDATE_COMMITTED = 2; /** update return value */ /** work states*/ private static final int WS_OFF = 0; private static final int WS_NEW_TASK = 1; private static final int WS_WORKING = 2; private static final int WS_DELAYING = 3; private static final int WS_DELAY_RESET = 4; private static final int WS_CANCELED = 5; /** work states*/ private final AtomicInteger workState; private int sleepPeriod = 30; private final AtomicInteger parametersVersion; private volatile int updateDelay;// updateDelay>=0 private volatile int delayUpperLimit; private final BoundlessCyclicBarrier barrier; private Thread workThread; /** * * @param updateDelay unit: millisecond * @param delayUpperLimit limit the sum of the delay, disabled * while delayUpperLimit<0, unit: millisecond */ public LatestResultsProvider(int updateDelay, int delayUpperLimit) { if (updateDelay < 0) this.updateDelay = 0; else this.updateDelay = updateDelay; this.delayUpperLimit = delayUpperLimit; barrier = new BoundlessCyclicBarrier(0); workState = new AtomicInteger(WS_OFF); parametersVersion = new AtomicInteger(0); initThread(); } private void initThread() { workThread = new Thread("trytocatch"s worker") { @Override public void run() { int sleepCount = 0; for (;;) { try { while (!workState.compareAndSet(WS_NEW_TASK, updateDelay > 0 ? WS_DELAY_RESET : WS_WORKING)) { if (workState.compareAndSet(WS_CANCELED, WS_OFF)) { barrier.cancel(); } LockSupport.park(); interrupted(); } if (workState.get() == WS_DELAY_RESET) { int delaySum = 0; for (;;) { if (workState.compareAndSet(WS_DELAY_RESET, WS_DELAYING)) { sleepCount = (updateDelay + sleepPeriod - 1) / sleepPeriod; } sleep(sleepPeriod); if (--sleepCount <= 0 && workState.compareAndSet(WS_DELAYING, WS_WORKING)) break; if (delayUpperLimit >= 0) { delaySum += sleepPeriod; if (delaySum >= delayUpperLimit) { if (!workState.compareAndSet( WS_DELAYING, WS_WORKING)) workState.compareAndSet( WS_DELAY_RESET, WS_WORKING); break; } } if (workState.get() != WS_DELAYING && workState.get() != WS_DELAY_RESET) break; } } if (isWorking()) { int workingVersion = parametersVersion.get(); try { calculateResult(); if (workState.compareAndSet(WS_WORKING, WS_OFF)) barrier.nextCycle(workingVersion); } catch (Throwable t) { t.printStackTrace(); workState.set(WS_CANCELED); } } } catch (InterruptedException e) { workState.compareAndSet(WS_DELAYING, WS_CANCELED); workState.compareAndSet(WS_DELAY_RESET, WS_CANCELED); } }// for(;;) }// run() }; workThread.setDaemon(true); workThread.start(); } public int getUpdateDelay() { return updateDelay; } /** * @param updateDelay * delay time. unit: millisecond */ public void setUpdateDelay(int updateDelay) { this.updateDelay = updateDelay < 0 ? 0 : updateDelay; } public int getDelayUpperLimit() { return delayUpperLimit; } /** * @param delayUpperLimit limit the sum of the delay, disabled * while delayUpperLimit<0, unit: millisecond */ public void setDelayUpperLimit(int delayUpperLimit) { this.delayUpperLimit = delayUpperLimit; } public final void stopCurrentWorking() { workState.set(WS_CANCELED); } /** * @return NO_NEED_TO_UPDATE, COMMITTED */ public final int update() { if (isResultUptodate()) return UPDATE_NO_NEED_TO_UPDATE; if (workState.compareAndSet(WS_CANCELED, WS_NEW_TASK) || workState.compareAndSet(WS_OFF, WS_NEW_TASK)) LockSupport.unpark(workThread); return UPDATE_COMMITTED; } /** * @param timeout * unit:nanoseconds * @return FAILED, NO_NEED_TO_UPDATE, SUCCESS * @throws InterruptedException */ public final int updateAndWait(long nanosTimeout) throws InterruptedException { int newVersion = parametersVersion.get(); if (update() == UPDATE_NO_NEED_TO_UPDATE) return UPDATE_NO_NEED_TO_UPDATE; barrier.awaitWithAssignedVersion(newVersion, nanosTimeout); return barrier.getVersion() - newVersion >= 0 ? UPDATE_SUCCESS : UPDATE_FAILED; } /** * @return FAILED, NO_NEED_TO_UPDATE, SUCCESS * @throws InterruptedException */ public final int updateAndWait() throws InterruptedException { return updateAndWait(0); } public final boolean isResultUptodate() { return parametersVersion.get() == barrier.getVersion(); } /** * be used in calculateResult() * @return true: the work state is working, worth to calculate the * result absolutely, otherwise you can cancel the current calculation */ protected final boolean isWorking() { return workState.get()==WS_WORKING; } /** * you must call this after update the parameters, and before calling the * update */ protected final void updateParametersVersion() { int pVersion = parametersVersion.get(); //CAS failed means that another thread do the same work already if (parametersVersion.compareAndSet(pVersion, pVersion + 1)) if (!workState.compareAndSet(WS_DELAYING, WS_DELAY_RESET)) workState.compareAndSet(WS_WORKING, WS_NEW_TASK); } /** * implement this to deal with you task */ protected abstract void calculateResult(); }
代碼中,我直接在構造方法里開啟了新的線程,一般來說,是不推薦這樣做的,但在此處,除非在構造還未完成時就執行update方法,否則不會引發什么問題。
最后,附上該正則替換工具的介紹和下載地址:http://www.cnblogs.com/trytocatch/p/RegexReplacer.html
小結狀態變更非常適合使用非阻塞算法,并且還能夠達到wait-free級別。限于篇幅,有些沒講到的細節,請讀者借助代碼來理解吧,如有疑問,歡迎回復討論。
系列總結本實戰系列就到此結束了,簡單總結下。
非阻塞同步相對于鎖同步而言,由代碼塊,轉為了點,是另一種思考方式。
有時,無法做到一步完成,也許可以分成兩步完成,同樣可以解決問題,ConcurrentLinkedQueue就是這么做的。
如果需要維護多個數據之間的某種一致關系,則可以將它們封裝到一個類中,更新時采用更新該類對象的引用的方式。
眾所周知,鎖同步算法是難以測試的,非阻塞同步算法更加難以測試,我個人認為,其正確性主要靠慎密的推敲和論證。
非阻塞同步算法比鎖同步算法要顯得更復雜些,如果對性能要求不高,對非阻塞算法掌握得還不太熟練,建議不要使用非阻塞算法,鎖同步算法要簡潔得多,也更容易維護,如上面所說的,兩條看似沒有順序的語句,調換下順序,可能就會引發BUG。
by trytocatch via ifeve
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/64033.html
摘要:前言學習情況記錄時間子目標多線程記錄在學習線程安全知識點中,關于的有關知識點。對于資源競爭嚴重線程沖突嚴重的情況,自旋的概率會比較大,從而浪費更多的資源,效率低于。 前言 學習情況記錄 時間:week 1 SMART子目標 :Java 多線程 記錄在學習線程安全知識點中,關于CAS的有關知識點。 線程安全是指:多個線程不管以何種方式訪問某個類,并且在主調代碼中不需要進行同步,都能表...
摘要:如果停止了版本更新,可使用方法來解除所有因而阻塞的線程,包括指定版本號的。如果自己維護版本號,則應該保證遞增。 前言 相比上一篇而言,本文不需要太多的準備知識,但技巧性更強一些。因為分析、設計的過程比較復雜繁瑣,也限于篇幅,所以,主要展示如何解決這些需求,和講解代碼。另外,所講的內容也是后一篇實戰中需要用到的一個工具類。 需求介紹 我需要編寫一個同步工具,它需要提供這樣幾個方法:...
摘要:注意這里指的不是當次而是之后,所以如果我們使用隊列的方法返回,就知道隊列是否為空,但是不知道之后是否為空,并且,當關注的操作發生時,在插入或取出操作的返回值里告知此信息,來指導是否繼續注冊寫操作。 前言 本文寫給對ConcurrentLinkedQueue的實現和非阻塞同步算法的實現原理有一定了解,但缺少實踐經驗的朋友,文中包括了實戰中的嘗試、所走的彎路,經驗和教訓。 背景介紹 ...
摘要:如問到是否使用某框架,實際是是問該框架的使用場景,有什么特點,和同類可框架對比一系列的問題。這兩個方向的區分點在于工作方向的側重點不同。 [TOC] 這是一份來自嗶哩嗶哩的Java面試Java面試 32個核心必考點完全解析(完) 課程預習 1.1 課程內容分為三個模塊 基礎模塊: 技術崗位與面試 計算機基礎 JVM原理 多線程 設計模式 數據結構與算法 應用模塊: 常用工具集 ...
摘要:相比與其他操作系統包括其他類系統有很多的優點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協程。所以如何避免死鎖的產生,在我們使用并發編程時至關重要。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Java多線程學習(二)synchronized關鍵字(1) java多線程學習(二)syn...
閱讀 2968·2023-04-25 19:45
閱讀 2690·2021-11-19 09:40
閱讀 690·2021-10-14 09:49
閱讀 2668·2021-09-30 09:47
閱讀 2201·2021-09-26 09:55
閱讀 1220·2021-09-22 16:01
閱讀 2809·2019-08-30 14:19
閱讀 706·2019-08-29 16:44