摘要:類似的你可以用將并行流變為順序流。中的使用順序求和并行求和將流轉為并行流配置并行流線程池并行流內部使用了默認的,默認的線程數量就是處理器的數量包括虛擬內核通過得到。
【概念
并行流就是一個把內容分成多個數據塊,并用不同的線程分別處理每一個數據塊的流。在java7之前,并行處理數據很麻煩,第一,需要明確的把包含數據的數據結構分成若干子部分。第二,給每一個子部分分配一個獨立的線程。第三,適當的時候進行同步,避免出現數據競爭帶來的問題,最后將每一個子部分的結果合并。在java7中引入了forkjoin框架來完成這些步驟,而java8中的stream接口可以讓你不費吹灰之力就對數據執行并行處理,而stream接口幕后正是使用的forkjoin框架。不過,對順序流調用parallel()并不意味著流本身有任何的變化。它在內部實際上就是設了一個boolean標志,表示你想讓parallel()之后的操作都并行執行。類似的你可以用sequential()將并行流變為順序流。這兩個方法可以讓我們更細化的控制流。
eg.java8中stream的使用:
//順序求和 public static long sum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) .reduce(0l,Long::sum); } //并行求和 public static long parallelSum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) //將流轉為并行流 .parallel() .reduce(0l,Long::sum); }【配置并行流線程池
并行流內部使用了默認的forkjoinPool,默認的線程數量就是處理器的數量(包括虛擬內核),
通過:Runtime.getRuntime().availableProcessors() 得到。
通過:System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12")來改變線程池大小。
我們不應該理所當然的任認為多線程比順序執行的效率更高,來看下面的例子:
public class Exercise { public static void main(String[] args) { long num = 1000_000_0; long st = System.currentTimeMillis(); System.out.println("iterate順序" + sum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("iterate并行" + parallelSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("迭代" + forSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("LongStream并行" + longStreamParallelSum(num) + ":" +(System.currentTimeMillis() - st)); st = System.currentTimeMillis(); System.out.println("LongStream順序" + longStreamSum(num) + ":" +(System.currentTimeMillis() - st)); } //順序求和 public static long sum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) .reduce(0l,Long::sum); } //并行求和 public static long parallelSum(long n){ return Stream.iterate(1l,i -> i + 1) .limit(n) //將流轉為并行流 .parallel() .reduce(0l,Long::sum); } //迭代求和 public static long forSum(long n){ long result = 0; for(long i = 0 ;i <= n ; i++){ result += i; } return result; } //longStream并行 public static long longStreamParallelSum(long n){ return LongStream.rangeClosed(1,n) .parallel() .reduce(0l,Long::sum); } //longStream順序執行 public static long longStreamSum(long n){ return LongStream.rangeClosed(1,n) .reduce(0l,Long::sum); } }
并行流執行的時間比順序流和迭代執行的要長很多,兩個原因:
iterate()生成的是裝箱對象,必須要拆箱才能求和;
iterate()很難分成多個獨立的塊并行運行,因為每次應用這個函數都要依賴前一次的應用的結果。數字列表在歸納的過程開始時沒有準備好,因而無法有效的把流劃分成小塊來并行處理。但是我們又標記流為并行執行,這就給順序執行增加了開銷,每一次的求和操作都新開啟了一個線程。
【使用更有針對性的的方法LongStream.rangeClosed():
1. 直接產生long類型數據,沒有開箱操作 2. 生成數字范圍,容易拆分成獨立的小塊
由此可見,選擇適當的數據結構往往比并行化算法更重要。并行是有代價的。并行過程需要對流做遞歸劃分,把每個子流的操作分配到不同的線程,然后把這些操作的結果合并成一個值。但是多核之間移動數據的代價比我們想象的要大,所以很重要的一點是保證再內核中并行執行的工作時間比內核之間傳輸數據的時間要長。
【正確的使用并行流錯誤使用并行流的首要原因就是使用的算法改變了共享變量的狀態,因為修改共享變量意味著同步,而使用同步方法就會使的并行毫無意義。以下是一些建議:
1. 測試,并行還是順序執行最重要的基準就是不停的測試性能。 2. 留意裝箱,自動裝箱,拆箱會大大降低性能,java8提供了LongStream,IntStream,DoubleStream來避免這兩個操作。 3. 有些操作本身就是順序執行要率高,例如:limit,findFirst等依賴元素順序的操作。 4. 當執行單個任務的成本高時使用并行,如果單個操作的成本很低,并行執行反而會因為開啟線程,標記狀態等操作使得效率下降。 5. 小量數據不適用并行。 6. 考慮流中背后的數據結構是否易于分解。ArrayList的拆分效率比LinkedList高得多,因為前者用不著便利就可以平均拆分。另外,range工廠方法的原始類型數據流也可以快速分解。以下時流數據源的可分解性: - ArrayList:極佳 - LinkedList:差 - IntStream等:極佳 - Stream.iterate:差 - HashSet:好 - TreeSet:好 7. 中間操作改變流的方法,涉及到排序就不適用并行。 8. 終端操作合并流的代價,涉及到排序就不適用并行。【正確的使用并行
高并發、任務執行時間短的業務,線程池線程數可以設置為CPU核數+1,減少線程上下文的切換
并發不高、任務執行時間長的業務要區分開看:
假如是業務時間長集中在IO操作上,也就是IO密集型的任務,因為IO操作并不占用CPU,所以不要讓所有的CPU閑下來,可以加大線程池中的線程數目,讓CPU處理更多的業務
假如是業務時間長集中在計算操作上,也就是計算密集型任務,這個就沒辦法了,和(1)一樣吧,線程池中的線程數設置得少一些,減少線程上下文的切換
并發高、業務執行時間長,解決這種類型任務的關鍵不在于線程池而在于整體架構的設計,看看這些業務里面某些數據是否能做緩存是第一步,增加服務器是第二步,至于線程池的設置,設置參考(2)。最后,業務執行時間長的問題,也可能需要分析一下,看看能不能使用中間件對任務進行拆分和解耦。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68095.html
摘要:并行流與目前,我們對集合進行計算有兩種方式并行流而更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待而發生阻塞。 【回顧Future接口 Future接口時java5引入的,設計初衷是對將來某個時刻會發生的結果建模。它建模了一種異步計算,返回了一個執行預算結果的引用。比如,你去干洗店洗衣服,店員會告訴你什么時候可以來取衣服,而不是讓你一直在干洗店等待。要使用Future只需...
摘要:進程線程與協程它們都是并行機制的解決方案。選擇是任意性的,并在對實現做出決定時發生。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。此線程池支持定時以及周期性執行任務的需求。 并發與并行的概念 并發(Concurrency): 問題域中的概念—— 程序需要被設計成能夠處理多個同時(或者幾乎同時)發生的事件 并行(Parallel...
摘要:關于三者的一些概括總結離線分析框架,適合離線的復雜的大數據處理內存計算框架,適合在線離線快速的大數據處理流式計算框架,適合在線的實時的大數據處理我是一個以架構師為年之內目標的小小白。 整理自《架構解密從分布式到微服務》第七章——聊聊分布式計算.做了相應補充和修改。 [TOC] 前言 不管是網絡、內存、還是存儲的分布式,它們最終目的都是為了實現計算的分布式:數據在各個計算機節點上流動,同...
摘要:限制編寫并行流,存在一些與非并行流不一樣的約定。底層框架并行流在底層沿用的框架,遞歸式的分解問題,然后每段并行執行,最終由合并結果,返回最后的值。 本書第六章的讀書筆記,也是我這個系列的最后一篇讀書筆記。后面7、8、9章分別講的測試、調試與重構、設計和架構的原則以及使用Lambda表達式編寫并發程序,因為筆記不好整理,就不寫了,感興趣的同學自己買書來看吧。 并行化流操作 關于并行與并發...
閱讀 3517·2021-09-27 13:35
閱讀 3557·2019-08-29 17:09
閱讀 2426·2019-08-26 11:30
閱讀 698·2019-08-26 10:32
閱讀 532·2019-08-26 10:23
閱讀 1194·2019-08-26 10:20
閱讀 3150·2019-08-23 15:26
閱讀 3551·2019-08-23 14:33