摘要:所以接下來,我們需要簡單的介紹下多線程中的并發通信模型。比如中,以及各種鎖機制,均為了解決線程間公共狀態的串行訪問問題。
并發的學習門檻較高,相較單純的羅列并發編程 API 的枯燥被動學習方式,本系列文章試圖用一個簡單的栗子,一步步結合并發編程的相關知識分析舊有實現的不足,再實現邏輯進行分析改進,試圖展示例子背后的并發工具與實現原理。
本文是本系列的第一篇文章,提出了一個簡單的業務場景,給出了一個簡單的串行實現以及基于原子變量的并發實現,同時詳細分析了 Java多線程通信、 Java 內存模型、 happy before 等基本概念。
寫在前面文中所有的代碼筆者均全部實現了一遍,并上傳到了我的 github 上,多線程這部分源碼位于java-multithread模塊中 ,歡迎感興趣的讀者訪問并給出建議^_^
倉庫地址:java-learning串行實現
git-clone:git@github.com:The-Hope/java-learning.git
假定有這樣一個需求,給定一個目錄和一個關鍵字,要求統計指定的目錄中各文件內指定關鍵字出現的總次數。
先來看看串行狀態下該怎么實現:
/** * Description: * 掃描指定目錄下指定關鍵字的出現次數——串行版本實現 * * @author The hope * @date 2018/5/20. */ public class KeywordCount1 implements KeywordCount { private String keyword; private File directory; public KeywordCount1(File directory, String keyword) { this.keyword = keyword; this.directory = directory; } public int search() { return search(directory); } private int search(File directory) { int result = 0; for (File file : directory.listFiles()) if (file.isDirectory()) result += search(file); else result += count(file); return result; } private int count(File file) { int result = 0; try (Scanner in = new Scanner(file)) { while (in.hasNextLine()) { String line = in.nextLine(); if (line.contains(keyword)) result++; } } catch (FileNotFoundException e) { e.printStackTrace(); } return result; } @Override public void shutDown() {} }
代碼很簡單,核心實現是search(File directory) 函數:
private int search(File directory) { int result = 0; for (File file : directory.listFiles()) if (file.isDirectory()) result += search(file); else result += count(file); return result; }
邏輯很簡單,判斷當前 file 對象如果是文件夾就遞歸調用自己,否則統計關鍵字出現次數。(注,為了方便測試函數的調用,我抽象了接口 KeywordCount 以規范暴露出的方法)
為了看看它的執行效果我們再來寫個簡單的測試函數:
/** * Description: * 掃描指定目錄下指定關鍵字的出現次數——測試函數 * @author The hope * @date 2018/5/20. */ public class KeywordCountTest { public static void main(String... args) throws Exception{ Scanner in = new Scanner(System.in); System.out.println("Enter base directory (e.g. C:Program FilesJavajdk1.6.0_45src): "); String directory = in.nextLine(); System.out.println("Enter keyword (e.g. java): "); String keyword = in.nextLine(); int execTimes = 5;// 設定執行次數 long start = System.currentTimeMillis();//開始計時 int totalCount = 0; KeywordCount counter = new KeywordCount1(new File(directory), keyword); for (int i = 0; i < execTimes; i++) { int count = counter.search(); totalCount += count; } long end = System.currentTimeMillis();//結束計時 System.out.println("Statistics: " + totalCount/ execTimes); System.out.println("used time: " + (end-start)/ execTimes); counter.shutDown(); } }
(為了消除單次運行的波動影響,這里故意寫了個循環來做平均)
執行效果如下:
Enter base directory (e.g. C:Program FilesJavajdk1.6.0_45src): C:Program FilesJavajdk1.6.0_45src Enter keyword (e.g. java): java Statistics: 43781 used time: 5152 Process finished with exit code 0
可以看到用時大概在5秒左右
拓展思考我們可以簡單的分析下整個功能的邏輯,大體上可以分為兩個部分:
從給定目錄尋找下級文件
從給定文件中統計指定關鍵字出現次數
其中第二步明顯是相互獨立、互不依賴且耗時較多的任務,假使我們能夠引入多線程并發的去執行那么就能合理的提升系統的吞吐量進而提高系統響應時間。
注意,在分析是否值得利用多線程改進一個需求實現時,自什么維度來進行任務的拆分是一件比較重要的考慮因素。如果任務之間存在執行順序依賴或者數據依賴,那么就很難簡單的對任務進行拆分,而應該從更高的維度重新思考任務的邊界并設計相應的實現。比如,針對有執行順序依賴的任務,可以從更高維度來對任務進行分組,并將一組任務放入一個線程中順序執行,并通過 ThreadLocal 來傳遞變量,這樣可以有效減少數據爭用的競態條件。
引入并發在開始動筆實現之前,我們先來思考這么兩個問題:
1. 線程何時執行不受我們控制,我們怎么知道線程何時能夠執行完畢
2. 即便我們知道線程什么時候執行完畢,可是 Java 并沒有提供線程之間顯示的通信方法,那么我們怎么獲取需要的結果。
其實這兩個問題,都是典型的線程間通信問題。比如第一個問題,換種角度看就是主線程如何接收子線程執行完畢的信息。第二個問題更是一種典型的主線程如何接受子線程計算結果的問題。
所以接下來,我們需要簡單的介紹下多線程中的并發通信模型。
多線程間的并發通信對于多線程編程來說,最根本的就是解決兩個問題:
線程之間如何進行通信(以何種信息來交換信息)
線程之間如何進行同步
我們先來說說如何通信,大體上有這么兩種方式:
基于消息傳遞
基于共享內存
消息傳遞的并發模型基于消息傳遞的并發模型中,線程之間沒有公共狀態,通信基于顯式消息傳遞實現,由于消息的接收一定存在于消息的發送之后,此時同步是隱式進行的。
結合并發模型的介紹,我們可以很容易的知道,Thread.join() 方法就是一種很典型的線程間消息傳遞機制。他傳遞的消息就是目標線程何時執行完畢的信息,并兼具阻塞代碼執行的功能。類似的消息傳遞機制還有 wait(),notifyAll() 等方法。
舉個栗子:
針對前文的第一個問題:
線程何時執行不受我們控制,我們怎么知道線程何時能夠執行完畢?
如下方代碼所示。通過使用 Thread.join()方法,保證代碼阻塞,直到子線程執行完畢再繼續執行:
class ThreadA{ public static void main(String... args){ ThreadB b = new Thread(new Runnable{ void run(){...}}); b.start(); b.join(); // join() 方法會等待線程執行完畢。如果不加這一行將會繼續運行下去 // do something } }共享內存的并發模型
基于共享內存的并發模型中,線程之間共享程序的公共狀態,通信是通過線程之間串行的對公共狀態進行讀寫來實現的,因此總是需要(程序員)顯示的指定同步來實現隱式的通信。
比如 Java 中 volatile,synchronized 以及各種鎖機制,均為了解決線程間公共狀態的串行訪問問題。
講到這里,我們還可以再宕開一筆,簡單聊聊為什么基于共享內存的并發模型一定要花大力氣保證線程之間的串行執行。
Java 內存模型的抽象(JMM)類似現代多核處理器會給每個核心設計自己的 CPU 寄存器緩存主內存中的目標數據,以方便處理器的快速存取。當多個處理器的任務涉及同一塊主內存時,就需要利用 MSI、MESI、MOSI 等緩存一致性協議來協調各個處理器之間的對特定內存或者高速緩存的訪問規則。如下圖:
針對一個線程對共享變量的寫入何時對另一個線程可見問題,Java 利用 JMM 抽象了線程與主內存之間的關系。
我們先來看看Java內存模型(JMM)的示意圖:
注意:這里的工作內存并不實際存在,而是涵蓋了緩存,寫緩沖區,寄存器以及其他的硬件和編譯器優化等概念的一種抽象
從圖中就可以很清晰的歸納出,如果線程A想要和線程B之間想要通過共享內存進行通信,那么必須經過以下步驟:
線程A將工作內存中更新的工作內存副本寫回至主內存中
線程B從根據主內存中的值重新更新刷新自己的工作內存副本
上述兩步必須有序進行,否則將會導致通信錯誤。
例如考慮以下時序:
變量X初始值為100
線程 A 將 X 值寫入工作內存中,此時工作內存與主內存 X 值均為100
線程 A 給 X + 50 然后寫入工作內存中,此時 A 的時間片用完。 X 在工作內存值為 150,X在主內存中值為100
線程 B 將 X 的值寫入自己的工作內存中。此時線程 B 的工作內存值為 100,主內存值仍為 100。
線程 B 給 X + 30 然后寫入工作內存中,此時 B 的工作內存值為 130,主內存值為 100。
線程 B 將工作內存的值寫回主內存,線程 B 運行結束。此時主內存值為 130。
線程 A 從休眠中醒來,將工作內存中的 150 同步回主內存,此時主內存值為 150。
從上述時序中,我們可以看到,由于線程 A & B 針對共享狀態 X 寫入并不是串行的,導致中間出現了數據覆蓋的錯誤情況。同理,讀者可以再繼續分析思考下寫讀模型中的同步問題。
重排序值得注意的是,除了上述例子中,線程間錯誤的時序會導致并發錯誤,重排序也同樣會導致意想不到的并發錯誤。
重排序的原因大體分為這三種:
編譯期優化的重排序(編譯器僅保證不更改單線程運行語義)
指令級并行的重排序(處理器僅保證不破壞存在數據依賴的指令)
內存系統的重排序(讀/寫緩沖區到主內存同步機制)
關于這部分的介紹,前人珠玉在前,列舉了大量簡明易懂的例子。這里援引并發編程網的程曉明在《深入理解Java內存模型》系列文章中的一個例子來給大家做個簡單介紹:
處理器重排序與內存屏障指令現代的處理器使用寫緩沖區來臨時保存向內存寫入的數據。寫緩沖區可以保證指令流水線持續運行,它可以避免由于處理器停頓下來等待向內存寫入數據而產生的延遲。同時,通過以批處理的方式刷新寫緩沖區,以及合并寫緩沖區中對同一內存地址的多次寫,可以減少對內存總線的占用。雖然寫緩沖區有這么多好處,但每個處理器上的寫緩沖區,僅僅對它所在的處理器可見。這個特性會對內存操作的執行順序產生重要的影響:處理器對內存的讀/寫操作的執行順序,不一定與內存實際發生的讀/寫操作順序一致!為了具體說明,請看下面示例:
Processor A | Processor B |
---|---|
a = 1; //A1 x = b; //A2 |
b = 2; //B1 y = a; //B2 |
初始狀態:a = b = 0 處理器允許執行后得到結果:x = y = 0 |
假設處理器A和處理器B按程序的順序并行執行內存訪問,最終卻可能得到x = y = 0的結果。具體的原因如下圖所示:
這里處理器A和處理器B可以同時把共享變量寫入自己的寫緩沖區(A1,B1),然后從內存中讀取另一個共享變量(A2,B2),最后才把自己寫緩存區中保存的臟數據刷新到內存中(A3,B3)。當以這種時序執行時,程序就可以得到x = y = 0的結果。
從內存操作實際發生的順序來看,直到處理器A執行A3來刷新自己的寫緩存區,寫操作A1才算真正執行了。雖然處理器A執行內存操作的順序為:A1->A2,但內存操作實際發生的順序卻是:A2->A1。此時,處理器A的內存操作順序被重排序了(處理器B的情況和處理器A一樣,這里就不贅述了)。
這里的關鍵是,由于寫緩沖區僅對自己的處理器可見,它會導致處理器執行內存操作的順序可能會與內存實際的操作執行順序不一致。由于現代的處理器都會使用寫緩沖區,因此現代的處理器都會允許對寫-讀操作重排序。
上述引文介紹了一個簡單的小栗子,說明了重排序問題導致的一個并發錯誤。既然重排序問題可能導致程序在并發執行時導致意想不到的錯誤發生,作為程序員我們又該怎么分析定位問題呢?
先行發生(happens before)原則雖然重排序問題會導致并發程序的可見性錯誤,不過 Java 通過先行發生的概念重新約定了操作之間的可見性。
換句話說如果一個操作的執行結果需要對另一個線程可見,那么這兩個操作之間一定要存在 happens before 關系。這里的兩個操作可以是在一個線程也可以是兩個線程。
與我們日常開發聯系最緊密的先行發生原則如下:
程序順序規則:一個線程中的每個操作,happens- before 于該線程中的任意后續操作。
監視器鎖規則:對一個監視器鎖的解鎖,happens- before 于隨后對這個監視器鎖的加鎖。
volatile變量規則:對一個volatile域的寫,happens- before 于任意后續對這個volatile域的讀。
傳遞性:如果A happens- before B,且B happens- before C,那么A happens- before C。
注:我們常說的 synchronized,volatile,ReentrantLock 等顯示同步的原理,就是依托于這里的監視器鎖規則實現的。
小結這里我們介紹了基于共享狀態的并發模型,指出了由于線程工作內存與主內存的同步,代碼執行的重排序等問題,可能導致線程共享狀態的可見性及原子性錯誤。因此,當線程之間存在公共狀態時,需要利用先行發生原則針對共享狀態的訪問進行合理性分析,確保共享狀態的訪問/修改操作兩兩符合先行發生原則。換句話說,需要保證對多線程之間共享狀態的操作進行合理同步。
拓展思考學了這么多,回到我們最開始的問題:
即便我們知道線程什么時候執行完畢,可是 Java 并沒有提供線程之間顯示的通信方法,那么我們怎么獲取需要的結果?
在進行分析之前,我們回過頭來看看之前版本的核心代碼實現:
int totalCount = 0; KeywordCount counter = new KeywordCount1(new File(directory), keyword); for (int i = 0; i < execTimes; i++) { int count = counter.search(); totalCount += count; }
可以看到,我們最終的結果是通過 totalCount 變量記錄的,也就是說,如果我們依舊依賴這個變量作為我們的最重結果,因為每個線程都會統計自己的關鍵詞,累加到該變量。那么這就是一種典型的共享數據的競態問題,這時依據先行發生原則進行分析,我們發現:
因為不是單線程環境,所以程序順序規則失效
因為沒有用任何鎖,也沒有用 synchronized 關鍵字,所以監視器規則失效
因為沒有用 volatile 關鍵字,所以volatile規則失效
因為上述規則都失效,所以傳遞性規則也失效
綜上,通過利用先行發生原則對競態條件進行分析,我們發現這部分代碼不做改變那么多線程環境下鐵定會出錯,那么我們接下來該怎么辦呢?
解決方法我們可以新建一個 Counter 類,將這個 Counter 類傳遞給各個線程去運行計算相應的任務。同時在 Counter 類中設置一個原子的計數器域(AtomicInteger),利用 AtomicInteger 的 incrementAndGet() 來實現原子的自增操作。等主線程判斷計算任務執行完畢時,再從 Counter 類獲取計算結果即可。核心代碼如下:
class Counter{ private AtomicInteger count = new AtomicInteger(0); counter(File file){ ··· count.incrementAndGet(); ··· } int getCounterNum(){ count.get(); } }
注:這里由于計數器的實現需要依賴變量自身的舊狀態,所以不能使用 volatile 變量。反之,如果業務場景只需要共享狀態的單一更新(不依賴舊狀態),那么使用 volatile 關鍵字效率會更高。
拓展來看,如果業務操作再復雜一些,需要確保多個變量的組合操作的并發原子性時,更建議使用 ReentrantLock 以及 synchronized 關鍵字來對方法或者代碼塊進行鎖定以保證正確性。
基于上文對并發編程模型的思考,我們解決了擺在我們面前的兩尊攔路虎,線程何時結束 & 變量在線程中如何傳遞。
現在我們終于可以再來看看并發版本的關鍵字統計功能該如何實現了。代碼實現如下:
/** * Description: * 掃描指定目錄下指定關鍵字的出現次數——多線程+原子變量版本實現 * @author The hope * @date 2018/5/20. */ public class KeywordCount2 implements KeywordCount { private final File directory; private final String keyword; KeywordCount2(File directory, String keyword) { this.keyword = keyword; this.directory = directory; } public int search() throws InterruptedException { Counter counter = new Counter(keyword); FileSearch fileSearch = new FileSearch(directory, counter); Thread t = new Thread(fileSearch); t.start(); t.join(); return counter.getCountNum(); } @Override public void shutDown() {} private static class FileSearch implements Runnable { private File directory; private Counter counter; FileSearch(File file, Counter counter) { this.directory = file; this.counter = counter; } @Override public void run() { ListsubThreads = new ArrayList<>(); for (File file : directory.listFiles()) if (file.isDirectory()) { FileSearch fileSearch = new FileSearch(file, counter); Thread t = new Thread(fileSearch); subThreads.add(t); t.start(); } else { counter.search(file); } for (Thread subThread : subThreads) try { subThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Counter { String keyword; AtomicInteger count = new AtomicInteger(0); Counter(String keyword) { this.keyword = keyword; } int getCountNum() { return count.get(); } void search(File file) { try (Scanner in = new Scanner(file)) { while (in.hasNextLine()) { String line = in.nextLine(); if (line.contains(keyword)) count.incrementAndGet(); } } catch (FileNotFoundException e) { e.printStackTrace(); } } } }
這里我們新創建了兩個類 FileSearch與Counter。
利用FileSearch來進行線程的創建與子計算的分發問題:
@Override public void run() { ListsubThreads = new ArrayList<>(); for (File file : directory.listFiles()) if (file.isDirectory()) { FileSearch fileSearch = new FileSearch(file, counter); Thread t = new Thread(fileSearch); subThreads.add(t); t.start(); } else { counter.search(file); } for (Thread subThread : subThreads) try { subThread.join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
利用Counter來解決計算結果在線程間的傳遞問題:
··· AtomicInteger count = new AtomicInteger(0); ··· void search(File file) { try (Scanner in = new Scanner(file)) { while (in.hasNextLine()) { String line = in.nextLine(); if (line.contains(keyword)) count.incrementAndGet(); } } catch (FileNotFoundException e) { e.printStackTrace(); } }
執行結果如下:
Enter base directory (e.g. C:Program FilesJavajdk1.6.0_45src): C:Program FilesJavajdk1.6.0_45src Enter keyword (e.g. java): java Statistics: 43781 used time: 2418 Process finished with exit code 0
可以看到時間降低至2秒半左右,提高了50%,的確是極大的提高了響應速度
小結本文通過提出一個簡單的業務場景(統計指定目錄下關鍵字出現數量),并設計了一個簡單的串行實現。
針對串行版本響應緩慢的問題,筆者以提出問題-解決問題的模式,引入Java多線程通信以及 Java 內存模型的相關知識,一步步解決改造過程中的痛點并最終完成了一個基于原子變量的并發版本實現。
通過測試驗證,本輪改造成功解決了串行版本的業務痛點 :)
拓展思考雖然上述實現極大的提高了程序的執行速度,將執行時間縮短了一半。但是仍然存在下面幾個問題。
代碼變得更為復雜: 串行版本50行不到解決問題并發版本,卻暴增至100行,客觀上增加了復雜度。
創建線程的數量不可確定: 本版本的實現中,線程的創建數量僅取決于文件數目,衍生出執行效率問題。
多了些額外的對象,比如 Counter:本問題實際上是問題 1 的具體版本,為了并發而引入新的類本就客觀增加了復雜度。
Counter 面臨多個線程的競態條件,必須進行同步:由于使用Counter來解決線程間的通信問題,因而勢必引出同步問題。
上述問題該如何解決與避免,請看下文:深入理解 Java多線程系列(2)——執行器框架
未完待續~
參考文獻Java 并發編程實戰
Java 核心技術——卷Ⅰ
深入理解 Jvm 虛擬機——周志明
深入理解 Java 內存模型——程曉明
zhihu.com
segmentfault.com
oschina.net
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69493.html
摘要:虛擬機所處的區域,則表示它是屬于新生代收集器還是老年代收集器。虛擬機總共運行了分鐘,其中垃圾收集花掉分鐘,那么吞吐量就是。收集器線程所占用的數量為。 本文主要從GC(垃圾回收)的角度試著對jvm中的內存分配策略與相應的垃圾收集器做一個介紹。 注:還是老規矩,本著能畫圖就不BB原則,盡量將各知識點通過思維導圖或者其他模型圖的方式進行說明。文字僅記錄額外的思考與心得,以及其他特殊情況 內存...
摘要:設備改造上傳結果數據的技術實現一項目需求及分析按照領導的要求,要改造一臺儀器,添加點功能,將測量數據上傳到服務器。所以選擇用提交,的通信可以多線程調度。考慮到新增的上傳功能不能影響之前的測量節拍,所以要多線程實現。 **設備改造——上傳結果數據的技術實現 一、項目需求及分析 按照領導的要求,要改造一臺儀器,添加點功能,將測量數據上傳到服務器。儀器測量節拍大概是20s,數據量目前不大,...
摘要:線程之間的通信由內存模型本文簡稱為控制,決定一個線程對共享變量的寫入何時對另一個線程可見。為了保證內存可見性,編譯器在生成指令序列的適當位置會插入內存屏障指令來禁止特定類型的處理器重排序。 并發編程模型的分類 在并發編程中,我們需要處理兩個關鍵問題:線程之間如何通信及線程之間如何同步(這里的線程是指并發執行的活動實體)。通信是指線程之間以何種機制來交換信息。在命令式編程中,線程之間的...
摘要:后端好書閱讀與推薦系列文章后端好書閱讀與推薦后端好書閱讀與推薦續后端好書閱讀與推薦續二后端好書閱讀與推薦續三這里依然記錄一下每本書的亮點與自己讀書心得和體會,分享并求拍磚。然后又請求封鎖,當釋放了上的封鎖之后,系統又批準了的請求一直等待。 后端好書閱讀與推薦系列文章:后端好書閱讀與推薦后端好書閱讀與推薦(續)后端好書閱讀與推薦(續二)后端好書閱讀與推薦(續三) 這里依然記錄一下每本書的...
閱讀 1579·2021-10-18 13:35
閱讀 2359·2021-10-09 09:44
閱讀 813·2021-10-08 10:05
閱讀 2707·2021-09-26 09:47
閱讀 3560·2021-09-22 15:22
閱讀 427·2019-08-29 12:24
閱讀 1992·2019-08-29 11:06
閱讀 2852·2019-08-26 12:23