摘要:在上文的代碼中,多線程累加的程序之所以會比單線程還慢得多就是因為在類型的靜態變量上有兩個線程同時調用方法進行累加,這就會導致在這個靜態變量上存在很嚴重的沖突。使用進行任務中引入了一個新的多線程任務執行框架,被稱為。
雖然對于一個計算機程序來說最重要的是正確性,如果一個程序沒辦法產出正確的結果,那么這個程序的價值就大打折扣了。但程序性能也是很重要的一個方面,如果程序運行得太慢,那也會影響到程序的適用范圍和硬件配置的成本。
在之前的文章《4.多線程中那些看不到的陷阱》中,我們了解了線程間的同步機制,這主要是為了保證程序在多線程環境下的正確性。在這篇文章中我們將會深入探究多線程程序的性能瓶頸和多種不同的優化方式,那么我們首先就從對程序性能的測量與分析開始吧。
分析多線程程序的性能我們先來看一個使用AtomicLong進行多線程計數的程序,下面的程序中會啟動兩個線程,每個線程會對靜態變量count進行一億次(10的8次方)的累加操作,這段代碼在開始和結束的時候都獲取了當前時間,然后通過這兩個時間值計算程序的運行耗時。
public class AtomicIntegerTest { private static AtomicLong count = new AtomicLong(0); public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); Runnable task = new Runnable() { public void run() { for (int i = 0; i < 1e8; ++i) { count.incrementAndGet(); } } }; Thread t1 = new Thread(task); Thread t2 = new Thread(task); t1.start(); t2.start(); t1.join(); t2.join(); System.out.println("count = " + count); long endTime = System.currentTimeMillis(); System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3)); } }
在我的電腦上運行這段程序最后輸出的結果是2.44s,看起來有點長了,那么我們再看一下如果直接在單個線程中對一個整型變量累加兩億次會是什么結果。下面是一個在單個線程中累加兩億次的程序代碼:
public class SingleThreadTest { public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); long count = 0; for (int i = 0; i < 2e8; ++i) { count += 1; } System.out.println("count = " + count); long endTime = System.currentTimeMillis(); System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3)); } }
這段代碼運行耗時只有0.33s,我們通過兩個線程進行累加的代碼竟然比單線程的代碼還要慢得多,這不是就和我們使用多線程加快程序運行的初衷相違背了嗎?
多線程程序的線程間同步是影響多線程程序性能的關鍵所在,一方面程序中必須串行化的部分會使系統整體的耗時顯著增加,另一方面同步行為本身的開銷也比較大,特別是在發生沖突的情況下。在上文的代碼中,多線程累加的程序之所以會比單線程還慢得多就是因為在AtomicLong類型的靜態變量count上有兩個線程同時調用incrementAndGet方法進行累加,這就會導致在這個靜態變量上存在很嚴重的沖突。
當一個線程成功修改了變量count的值后,另外一個正在修改的線程就會修改失敗并且會再次重試累加操作。并且因為AtomicLong類型的對象中是用一個volatile變量來保存實際的整型值的,而我們在之前的文章《多線程中那些看不到的陷阱》中可以了解到,對volatile變量的修改操作一定要把修改后的數據從高速緩存寫回內存當中,這也是用AtomicLong進行累加的耗時比單線程累加版本還要多這么多的主要原因。
那么我們有沒有更好的方法可以解決這個問題呢?
使用任務拆分進行優化在上面的例子中,我們需要的只是最終累加的結果,所以為了減小線程間同步的開銷,我們可以將累加任務拆分到不同的線程中執行,到最后再把每個線程的結果加在一起就可以得到最終的結果了。在下面的代碼中我們就使用了這種方法,t1在count1上累加一億次,t2在count2上累加一億次,最后把count1和count2相加得到最終的結果,我們來一起運行一下,看看效果如何。
public class TwoThreadTest { private static long count1 = 0; private static long count2 = 0; public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); Thread t1 = new Thread(new Runnable() { public void run() { for (int i = 0; i < 1e8; ++i) { count1 += 1; } } }); Thread t2 = new Thread(new Runnable() { public void run() { for (int i = 0; i < 1e8; ++i) { count2 += 1; } } }); t1.start(); t2.start(); t1.join(); t2.join(); long count = count1 + count2; System.out.println("count = " + count); long endTime = System.currentTimeMillis(); System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3)); } }
這段程序在我的電腦上的耗時是0.20s,比之前單線程的0.33s有了不小的提高,更是遙遙領先原先用兩個線程累加AtomicLong類型變量版本的2.44s。這說明我們之前的分析是正確的,CAS重試和volatile寫回內存兩個操作所引起的開銷是AtomicLong版本程序性能低下的罪魁禍首。
但是這個版本的結構還是略顯原始了,在應付累加這種簡單的需求時可能還比較容易,但是一旦面臨復雜的并發任務,那可能就要寫很多復雜的代碼,并且很容易出現錯誤了。例如我們如果想把任務拆分到10個線程中運行,那么我們就要首先把兩億次的累加任務拆分為10份,然后還要創建一個包含10個Thread對象的數組讓他們分別的對不同范圍進行累加,最后還要通過join方法等待這10個線程都執行完成,這個任務聽起來就不太容易。沒關系,下面我們將會介紹一種目前比較常用的任務拆分與運行框架來解決這個問題,通過這個框架我們可以很容易地寫出易于編寫和擴展的任務拆分式程序。
使用ForkJoinPool進行任務JDK 1.7中引入了一個新的多線程任務執行框架,被稱為ForkJoinPool。ForkJoinPool是一個Java類,它實現了代表線程池功能的ExecutorService接口,所以它在使用方法上和常用的線程池類ThreadPoolExecutor相似,但在本節中我們并不需要了解線程池的詳細用法,不過感興趣的讀者可以參考這篇文章從0到1玩轉線程池來了解一下。
線程池就是一個線程的集合,其中的線程會一直等待執行任務,所以我們可以把任務以任務對象的形式提交到線程池,然后線程池就會利用其中的線程來執行任務。在ForkJoinPool的使用中,線程池指的就是ForkJoinPool類型的對象,而任務對象指的就是繼承自ForkJoinTask的類的對象。在下面的示例代碼中,我們使用了自定義的RecursiveTask的子類來作為任務類,RecursiveTask類就繼承自ForkJoinTask類。
Recursive的意思是遞歸,也就是說我們在這個任務類的執行過程中可能會創建新的任務類對象來代表當前任務的子任務,然后通過結合多個子任務的結果來返回當前任務的結果。比如一開始的任務是累加兩億次,但那么我們就可以把它分為兩個分別累加一億次的子任務的結果之和,同樣的道理,累加一億次的子任務也可以再被分為兩個累加五千萬次的子子任務。這樣的拆分會一直持續到我們認為任務規模已經足夠小的時候,這時子任務的結果就會被計算,然后再返回給上層任務進行處理之后就得到上層任務的結果了。
如果前面這一段文字描述看不明白也沒關系,我們在代碼中找一找答案:
public class ForkJoinTest { private static class AccumulateTask extends RecursiveTask{ private long start; private long end; private long threshold; /** * 任務的構造函數 * * @param start 任務處理范圍的起始點(包含) * @param end 任務處理范圍的結束點(不包含) * @param threshold 任務拆分的閾值 */ public AccumulateTask(long start, long end, long threshold) { this.start = start; this.end = end; this.threshold = threshold; } @Override protected Long compute() { long left = start; long right = end; // 終止條件:如果當前處理的范圍小于等于閾值(threshold), // 那么就直接通過循環執行累加操作 if (right - left <= (int) threshold) { long result = 0; for (long i = left; i < right; ++i) { result += 1; } return result; } // 獲取當前處理范圍的中心點 long mid = (start + end) / 2; // 拆分出兩個子任務,一個從start到mid,一個從mid到end ForkJoinTask leftTask = new AccumulateTask(start, mid, threshold); ForkJoinTask rightTask = new AccumulateTask(mid, end, threshold); // 通過當前線程池運行兩個子任務 leftTask.fork(); rightTask.fork(); try { // 獲取兩個子任務的結果并返回 return leftTask.get() + rightTask.get(); } catch (Exception e) { return 0L; } } } public static void main(String[] args) throws Exception { long startTime = System.currentTimeMillis(); // 創建總任務,范圍是從1到兩億(包含),閾值為10的7次方,所以最終至少會有10個任務進行for循環的累加 AccumulateTask forkJoinTask = new AccumulateTask(1, (int) 2e8+1, (long) 1e7); // 使用一個新創建的ForkJoinPool任務池運行ForkJoin任務 new ForkJoinPool().submit(forkJoinTask); // 打印任務結果 System.out.println("count = " + forkJoinTask.get()); // 計算程序耗時并打印 long endTime = System.currentTimeMillis(); System.out.println(String.format("總耗時:%.2fs", (endTime - startTime) / 1e3)); } }
在上面的代碼中,我們會不斷創建在指定范圍內累加的子任務,直到任務范圍小于閾值threshold(在代碼中是10的7次方)時才不再拆分子任務,而是通過循環來得到累加結果。之后子任務的返回結果在上層任務中相加并作為上層任務的結果返回。到最后我們就可以得到累加兩億次的結果了。
在這個程序中,最重要的是對任務對象的三個操作:
創建任務對象,代碼中使用的是new AccumulateTask(start, mid, threshold)和new AccumulateTask(mid, end, threshold),這兩段代碼會創建除了范圍不同其他邏輯與父任務完全一致的子任務,每個子任務會負責執行父任務范圍中的一半;
執行子任務,通過調用任務對象的fork()方法可以讓子任務被提交到當前ForkJoinPool中執行;
等待子任務返回結果,通過調用子任務任務對象的get()方法,父任務將會等待子任務執行完成并返回結果,然后將兩個子任務的結果相加得到父任務的執行結果。
為什么同樣都是線程池,但是ThreadPoolExecutor類就很難實現這樣的執行方式呢?細心的讀者可能已經發現了,我們在一個任務中會拆分出兩個子任務,并且要等待這兩個子任務都執行完成才能返回父任務的結果。如果是在ThreadPoolExecutor中,在等待子任務運行完成得到結果時,父任務會一直阻塞并且占用一個線程,這樣的話如果父任務太多就會導致子任務沒有線程可供使用了,這個運行流程就沒辦法繼續執行下去了。而ForkJoinPool這個特殊的線程池就解決了這個問題,父任務在等待子任務執行時可以讓出線程給其他任務,這樣就不會導致線程都被阻塞狀態的父任務所阻塞了。
這種將任務拆分為互不依賴的子任務,然后分別在不同的線程上執行,最后再將結果進行逐步合并的方法就被稱為Map-Reduce。這種方法在離線大數據技術中被廣泛應用,甚至可以說大數據相關技術就是在Map-Reduce思想基礎上發展起來的也不為過。
線程內變量通過上面的幾個例子,我們可以看到,對于多線程程序來說,共享數據就是最大的問題。共享數據不但可能引起數據競爭問題,導致程序出現問題;而且隨著引入的線程同步操作又會拉低程序的性能,甚至可能使多線程程序的執行時間比單線程程序還長得多。在上面的例子中,我們通過使用ForkJoinPool拆分并執行了一個累加任務,各個子任務之間基本完全獨立,做到了最大程度的并行化。但是在一些情況下,我們可能沒辦法做到如此理想的方案,在一些情況下還是會留有一定的線程同步操作和對應的代碼臨界區。
那么在這些情況下我們如何處理能讓程序的性能盡可能高呢?
假設我們現在要統計一個方法的調用次數,如果可能有多個線程同時調用該方法,那么就需要對多個線程的調用同時計數。這種情況下我們可以考慮在每個線程里各自保留一個整型變量用于保存每個線程內的調用次數,然后在獲取總數時只需要把每個線程中的數量加在一起就能計算出來了。而在累加計數時我們只需要修改當前線程對應的變量就可以了,自然就沒有了數據競爭問題。java.util.concurrent.atomic包中的累加器LongAdder類采用的就是這樣的思路,這種思路也有自己專門的專業術語,被稱為“線程封閉”,線程封閉指的就是這種通過線程內變量來避免線程間共享數據的優化方式。
Java中也有專門的ThreadLocal類可以處理線程內變量,只是因為性能和線程銷毀時的數據保存之類的原因一般不會用于多線程累加這樣的數據聚合場景,但是在保存和獲取數據方面非常的便利,有興趣的讀者可以了解一下。
ConcurrentHashMapjava.util.concurrent包為我們提供了鎖、原子類、線程池、ForkJoinPool等一大批并發編程工具。最后,我們來了解一下java.util.concurrent包中為我們提供的一種線程安全數據結構。
在Java中,我們常用的Map類是HashMap,但是這個類并不是線程安全的,如果我們在多個線程中同時對HashMap對象進行讀寫,那么就有可能引發一些程序問題。還有一個從JDK 1.0起就已經存在的Hashtable類可以保證線程安全,但是我們打開這個類的源代碼可以看到,這個類中的大部分方法上都加上了synchronized標記,其中包括了最常用的get和put方法,這意味著Hashtable類的對象同一時間幾乎只能被一個線程所使用,這樣的效率相對是比較低的。
但是其實熟悉HashMap結構的朋友可能會知道,HashMap內部的結構是分為很多的桶的,每個鍵值對都會根據key值的hashCode值被放到不同的桶中。其實在做修改操作時我們只需要對對應的一個桶加鎖就可以了,而在執行讀操作時,在大多數情況下是不用加鎖的。JDK 1.5中引入的ConcurrentHashMap基本能達到這兩點。
在這個類中,我們通過兩種方式來優化了并發的性能:
通過限制鎖保護的代碼范圍來減少了鎖沖突發生的可能性,而且也減少了需要的鎖數量,減少了同步產生的開銷;
另一方面因為讀取的時候并不需要加鎖,而只有寫操作才需要加鎖,在一些讀操作較多但是寫操作較少的情況下,我們就能大大降低讀操作的成本,從而提高了程序的性能。
而且ConcurrentHashMap中的大多數方法不僅優化了同步機制的效率,而且提供了很多原子性的類似于CAS的操作方法,下面是ConcurrentHashMap類常用的操作:
V putIfAbsent(K key, V value),原子性操作,如果map中不包含key,則執行map.put(key, value)并將put方法的返回值返回,否則直接返回map.get(key)的值,即當前值;
boolean remove(Object key, Object value),原子性操作,如果滿足條件 (map中包含key對應的鍵值對 && value參數等于鍵值對中當前的value值) 則移除key對應的鍵值對并返回true,否則返回false;
boolean replace(K key, V oldValue, V newValue),原子性操作,當滿足 (map中包含key對應的鍵值對 && oldValue參數等于鍵值對中當前的value值)時,將key對應的值改為newValue并返回true,否則返回false。
總結我們在這篇文章中從對一個使用AtomicLong進行多線程累加的程序的性能測試開始,通過Map-Reduce思想大大優化了這個程序的性能,在此過程中還涉及到了ForkJoinPool類的使用。之后我們通過線程內變量正式提出了“線程封閉”的概念,如果我們能做到線程封閉,那么因為減少了線程間同步的開銷,所以線程的性能一定會有很大的提高。最后我們介紹了java.util.concurrent包中為我們提供的并發安全數據類ConcurrentHashMap。相信通過這篇文章大家能夠了解到多種多線程性能優化方法,但最重要的還是要找出多線程程序性能的瓶頸所在,這樣才能在實際的實踐場景中根據不同的情況因時制宜,使用合適的方法解決不同的瓶頸問題。本文中的觀點是多線程程序的瓶頸主要在于共享數據引起的數據競爭問題,如果能夠讓不同的線程間不存在或者盡可能少地存在共享數據與臨界區代碼,就能夠對多線程程序的性能起到正面的影響。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77883.html
摘要:限制同時運行線程數使用類就行,在內部管理著一個計數器。當計數器到時,再調用就會阻塞,直到其他線程來調用,這樣就限制了同時運行線程的數量。 事前最好了解一下什么是進程,什么是線程,什么是GIL,本文不再贅述,直接介紹模塊的使用: 推薦1,推薦2,推薦3,更多自尋 普通的python爬蟲是單進程單線程的,這樣在遇到大量重復的操作時就只能逐個進行,我們就很難過了。舉個栗子:你有1000個...
摘要:在回調隊列中,函數等待調用棧為空,因為每個語句都執行一次。最后一個運行,并且從調用棧中彈出。它將回調以先進先出順序移動到調用棧并執行。 翻譯:瘋狂的技術宅原文: https://medium.freecodecamp.o... 本文首發微信公眾號:前端先鋒歡迎關注,每天都給你推送新鮮的前端技術文章 Node.js 是一個 JavaScript 運行時環境。聽起來還不錯,不過這究竟...
閱讀 683·2021-11-22 09:34
閱讀 3821·2021-09-22 15:42
閱讀 1327·2021-09-03 10:28
閱讀 1072·2021-08-26 14:13
閱讀 1901·2019-08-29 15:41
閱讀 1423·2019-08-29 14:12
閱讀 3364·2019-08-26 18:36
閱讀 3307·2019-08-26 13:47