摘要:而且我們可以看到,在線程數相同的情況下,使用并行流時,用時要比方法更短。所以使用并行流之前,我們要注意到這個細節。
對于斐波那契數的計算,我們都知道最容易理解的就是遞歸的方法:
public long recursiveFibonacci(int n) { if (n < 2) { return 1; } return recursiveFibonacci(n - 1) + recursiveFibonacci(n - 2); }
當然這個遞歸也可以轉化為迭代:
public long iterativeFibonacci(int n) { long n1 = 1, n2 = 1; long fi = 2; // n1 + n2 for (int i = 2; i <= n; i++) { fi = n1 + n2; n1 = n2; n2 = fi; } return fi; }
但是,對于以上兩種方法,并不能并行化,因為后一項的值依賴于前一項,使得算法流程是串行的。所以引出了可以并行的計算斐波那契數的公式:
=>
f0 和 f1 都是 1 —— 很明顯我們可以對 進行并行計算。
首先我們定義一個 Matrix 類,用來表示一個 2*2 的矩陣:
public class Matrix { /** * 左上角的值 */ public final BigInteger a; /** * 右上角的值 */ public final BigInteger b; /** * 左下角的值 */ public final BigInteger c; /** * 右下角的值 */ public final BigInteger d; public Matrix(int a, int b, int c, int d) { this(BigInteger.valueOf(a), BigInteger.valueOf(b), BigInteger.valueOf(c), BigInteger.valueOf(d)); } public Matrix(BigInteger a, BigInteger b, BigInteger c, BigInteger d) { this.a = a; this.b = b; this.c = c; this.d = d; } /** * multiply * * @param m multiplier * @return */ public Matrix mul(Matrix m) { return new Matrix( a.multiply(m.a).add(b.multiply(m.c)), // a*a + b*c a.multiply(m.b).add(b.multiply(m.d)), // a*b + b*d c.multiply(m.a).add(d.multiply(m.c)), // c*a + d*c c.multiply(m.b).add(d.multiply(m.d)));// c*b + d*d } /** * power of exponent * * @param exponent * @return */ public Matrix pow(int exponent) { Matrix matrix = this.copy(); for (int i = 1; i < exponent; i++) { matrix = matrix.mul(this); } return matrix; } public Matrix copy() { return new Matrix(a, b, c, d); } }
然后我們來比較迭代和并行的效率:
我們先設置并行使用的線程數為 1,即單線程。
public static void main(String[] args) throws Exception { final int ITEM_NUM = 500000; // 計算斐波那契數列的第 ITEM_NUM 項 System.out.println("開始迭代計算..."); long begin = System.nanoTime(); BigInteger fi1 = iterativeFibonacci(ITEM_NUM); long end = System.nanoTime(); double time = (end - begin) / 1E9; System.out.printf("迭代計算用時: %.3f ", time); /* ------------------------------ */ System.out.println("開始并行計算..."); begin = System.nanoTime(); BigInteger fi2 = parallelFibonacci(ITEM_NUM, 1); end = System.nanoTime(); time = (end - begin) / 1E9; System.out.printf("并行計算用時: %.3f ", time); System.out.println("fi1 == fi2:" + (fi1.equals(fi2))); } static BigInteger iterativeFibonacci(int n) { BigInteger n1 = BigInteger.ONE; BigInteger n2 = BigInteger.ONE; BigInteger fi = BigInteger.valueOf(2); // n1 + n2 for (int i = 2; i <= n; i++) { fi = n1.add(n2); n1 = n2; n2 = fi; } return fi; } static BigInteger parallelFibonacci(int itemNum, int threadNum) throws Exception { final Matrix matrix = new Matrix(1, 1, 1, 0); final Matrix primary = new Matrix(1, 0, 1, 0); // (f0, 0; f1, 0) final int workload = itemNum / threadNum; // 每個線程要計算的 相乘的項數 // (num / threadNum) 可能存在除不盡的情況,所以最后一個任務計算所有剩下的項數 final int lastWorkload = itemNum - workload * (threadNum - 1); List> tasks = new ArrayList<>(threadNum); for (int i = 0; i < threadNum; i++) { if (i < threadNum - 1) { // 為了簡潔,使用 Lambda 表達式替代要實現 Callable 的匿名內部類 tasks.add(() -> matrix.pow(workload)); } else { tasks.add(() -> matrix.pow(lastWorkload)); } } ExecutorService threadPool = Executors.newFixedThreadPool(threadNum); List > futures = threadPool.invokeAll(tasks); // 執行所有任務,invokeAll 會阻塞直到所有任務執行完畢 Matrix result = primary.copy(); for (Future future : futures) { // (matrix ^ n) * (f0, 0; f1, 0) result = result.mul(future.get()); } threadPool.shutdown(); return result.c; }
可以看到單線程情況下,使用矩陣運算的效率大概只有迭代計算的 1/3 左右 —— 既然如此,那我們耍流氓的把并行的線程數改為 10 線程吧:
BigInteger fi2 = parallelFibonacci(ITEM_NUM, 10); // 10 線程并行計算
可以看到,此時并行計算的用時碾壓了迭代計算 —— 迭代計算委屈的哭了,并行計算這流氓耍的相當漂亮。
好像有點不對勁,我這篇文章的標題似乎是 使用并行流 —— 并行流呢?
其實前面都是鋪墊 :) 在 parallelFibonacci 方法中,我們使用了線程池來并行的執行任務,我們來嘗試將 parallelFibonacci 改為流式(即基于 Stream)風格的代碼:
static BigInteger streamFibonacci(int itemNum, int threadNum) { final Matrix matrix = new Matrix(1, 1, 1, 0); final Matrix primary = new Matrix(1, 0, 1, 0); final int workload = itemNum / threadNum; final int lastWorkload = itemNum - workload * (threadNum - 1); // 流式 API return IntStream.range(0, threadNum) // 產生 [0, threadNum) 區間,用于將任務切分 .parallel() // 使流并行化 .map(i -> i < threadNum - 1 ? workload : lastWorkload) .mapToObj(w -> matrix.pow(w)) // map -> mN = matrix ^ workload .reduce((m1, m2) -> m1.mul(m2)) // reduce -> m = m1 * m2 * ... * mN .map(m -> m.mul(primary)) // map -> m = m * primary .get().c; // get -> m.c }
依舊在 10 線程的環境下運行下看看:
public static void main(String[] args) throws Exception { ... /* ------------------------------ */ System.out.println("開始流式并行計算..."); begin = System.nanoTime(); BigInteger fi3 = streamFibonacci(ITEM_NUM, 10); end = System.nanoTime(); time = (end - begin) / 1E9; System.out.printf("流式并行計算用時: %.3f ", time); System.out.println("fi1 == fi2:" + (fi1.equals(fi2))); System.out.println("fi1 == fi3:" + (fi1.equals(fi3))); }
是的,使用并行流就是這么的簡單,只要你會使用 Stream API —— 給它加上 .parallel() —— 它就并行化了。寫了這么多年的 Java 代碼,從 Java6 到 Java7 再到 Java8,這一刻,我真的感動了(容我擦擦眼淚)。
而且我們可以看到,在線程數相同的情況下,使用 streamFibonacci(并行流)時,用時要比parallelFibonacci 方法更短。為了驗證,我夸張一點,將線程數提高到 32:
BigInteger fi2 = parallelFibonacci(ITEM_NUM, 32); ... BigInteger fi3 = streamFibonacci(ITEM_NUM, 32);
可以看到,此時 parallelFibonacci 的運行時間反而比 10 線程的時候更長了,而 streamFibonacci 使用的時間卻更短了 —— 流式 API 厲害了!
但這是什么原因呢?這個問題留給有興趣的讀者思考和探究吧。
值得注意的是,并行流的底層實現是基于 ForkJoinPool 的,并且使用的是一個共享的 ForkJoinPool —— ForkJoinPool.commonPool()。為了充分利用處理器資源和提升程序性能,我們應該盡量使用并行流來執行 CPU 密集的任務,而不是 IO 密集的任務 —— 因為共享池中的線程數量是有限的,如果共享池中某些線程執行 IO 密集的任務,那么這些線程將長時間處于等待 IO 操作完成的狀態,一旦共享池中的線程耗盡,那么程序中其他想繼續使用并行流的地方就需要等待,直到有空閑的線程可用,這會在很大程度上影響到程序的性能。所以使用并行流之前,我們要注意到這個細節。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/66909.html
摘要:前言系列神秘的系列神奇的函數式接口繼上兩篇之后,本文已經系列的第三篇了。相反,他們會返回一個持有結果的新。操作是延遲執行的。截斷流,使其元素不超過給定數量。返回流中元素總數。返回流中最大值。 前言 「Java8系列」神秘的Lambda「Java8系列」神奇的函數式接口繼上兩篇之后,本文已經java8系列的第三篇了。本篇文章比較長,但我希望大家都能認真讀完。讀不完可以先收藏,在找時間讀。...
摘要:實戰讀書筆記第一章從方法傳遞到接著上次的,繼續來了解一下,如果繼續簡化代碼。去掉并且生成的數字是萬,所消耗的時間循序流并行流至于為什么有時候并行流效率比循序流還低,這個以后的文章會解釋。 《Java8實戰》-讀書筆記第一章(02) 從方法傳遞到Lambda 接著上次的Predicate,繼續來了解一下,如果繼續簡化代碼。 把方法作為值來傳遞雖然很有用,但是要是有很多類似與isHeavy...
摘要:類似的你可以用將并行流變為順序流。中的使用順序求和并行求和將流轉為并行流配置并行流線程池并行流內部使用了默認的,默認的線程數量就是處理器的數量包括虛擬內核通過得到。 【概念 并行流就是一個把內容分成多個數據塊,并用不同的線程分別處理每一個數據塊的流。在java7之前,并行處理數據很麻煩,第一,需要明確的把包含數據的數據結構分成若干子部分。第二,給每一個子部分分配一個獨立的線程。第三,適...
摘要:串行與并行可以分為串行與并行兩種,串行流和并行流差別就是單線程和多線程的執行。返回串行流返回并行流和方法返回的都是類型的對象,說明它們在功能的使用上是沒差別的。唯一的差別就是單線程和多線程的執行。 Stream是什么 Stream是Java8中新加入的api,更準確的說: Java 8 中的 Stream 是對集合(Collection)對象功能的增強,它專注于對集合對象進行各種非常便...
閱讀 3762·2021-09-22 15:17
閱讀 1946·2021-09-22 14:59
閱讀 2346·2020-12-03 17:00
閱讀 3209·2019-08-30 15:55
閱讀 482·2019-08-30 11:23
閱讀 3487·2019-08-29 13:56
閱讀 518·2019-08-29 12:54
閱讀 2257·2019-08-29 12:49