摘要:對于任務的分割,要求各個子任務之間相互獨立,能夠并行獨立地執行任務,互相之間不影響。是叉子分叉的意思,即將大任務分解成并行的小任務,是連接結合的意思,即將所有并行的小任務的執行結果匯總起來。使用方法會阻塞并等待子任務執行完并得到其結果。
Fork/Join是什么?
Fork/Join框架是Java7提供的并行執行任務框架,思想是將大任務分解成小任務,然后小任務又可以繼續分解,然后每個小任務分別計算出結果再合并起來,最后將匯總的結果作為大任務結果。其思想和MapReduce的思想非常類似。對于任務的分割,要求各個子任務之間相互獨立,能夠并行獨立地執行任務,互相之間不影響。
Fork/Join的運行流程圖如下:
[站外圖片上傳中...(image-938a4-1529976385943)]
我們可以通過Fork/Join單詞字面上的意思去理解這個框架。Fork是叉子分叉的意思,即將大任務分解成并行的小任務,Join是連接結合的意思,即將所有并行的小任務的執行結果匯總起來。
工作竊取算法ForkJoin采用了工作竊取(work-stealing)算法,若一個工作線程的任務隊列為空沒有任務執行時,便從其他工作線程中獲取任務主動執行。為了實現工作竊取,在工作線程中維護了雙端隊列,竊取任務線程從隊尾獲取任務,被竊取任務線程從隊頭獲取任務。這種機制充分利用線程進行并行計算,減少了線程競爭。但是當隊列中只存在一個任務了時,兩個線程去取反而會造成資源浪費。
工作竊取的運行流程圖如下:
[站外圖片上傳中...(image-17ddfc-1529976385943)]
Fork/Join核心類Fork/Join框架主要由子任務、任務調度兩部分組成,類層次圖如下。
ForkJoinPool
ForkJoinPool是ForkJoin框架中的任務調度器,和ThreadPoolExecutor一樣實現了自己的線程池,提供了三種調度子任務的方法:
execute:異步執行指定任務,無返回結果;
invoke、invokeAll:異步執行指定任務,等待完成才返回結果;
submit:異步執行指定任務,并立即返回一個Future對象;
ForkJoinTask
Fork/Join框架中的實際的執行任務類,有以下兩種實現,一般繼承這兩種實現類即可。
RecursiveAction:用于無結果返回的子任務;
RecursiveTask:用于有結果返回的子任務;
Fork/Join框架實戰下面實現一個Fork/Join小例子,從1+2+...10億,每個任務只能處理1000個數相加,超過1000個的自動分解成小任務并行處理;并展示了通過不使用Fork/Join和使用時的時間損耗對比。
import java.util.concurrent.ForkJoinPool; import java.util.concurrent.RecursiveTask; public class ForkJoinTask extends RecursiveTask{ private static final long MAX = 1000000000L; private static final long THRESHOLD = 1000L; private long start; private long end; public ForkJoinTask(long start, long end) { this.start = start; this.end = end; } public static void main(String[] args) { test(); System.out.println("--------------------"); testForkJoin(); } private static void test() { System.out.println("test"); long start = System.currentTimeMillis(); Long sum = 0L; for (long i = 0L; i <= MAX; i++) { sum += i; } System.out.println(sum); System.out.println(System.currentTimeMillis() - start + "ms"); } private static void testForkJoin() { System.out.println("testForkJoin"); long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); Long sum = forkJoinPool.invoke(new ForkJoinTask(1, MAX)); System.out.println(sum); System.out.println(System.currentTimeMillis() - start + "ms"); } @Override protected Long compute() { long sum = 0; if (end - start <= THRESHOLD) { for (long i = start; i <= end; i++) { sum += i; } return sum; } else { long mid = (start + end) / 2; ForkJoinTask task1 = new ForkJoinTask(start, mid); task1.fork(); ForkJoinTask task2 = new ForkJoinTask(mid + 1, end); task2.fork(); return task1.join() + task2.join(); } } }
這里需要計算結果,所以任務繼承的是RecursiveTask類。ForkJoinTask需要實現compute方法,在這個方法里首先需要判斷任務是否小于等于閾值1000,如果是就直接執行任務。否則分割成兩個子任務,每個子任務在調用fork方法時,又會進入compute方法,看看當前子任務是否需要繼續分割成孫任務,如果不需要繼續分割,則執行當前子任務并返回結果。使用join方法會阻塞并等待子任務執行完并得到其結果。
程序輸出:
test 500000000500000000 4992ms -------------------- testForkJoin 500000000500000000 508ms
從結果看出,并行的時間損耗明顯要少于串行的,這就是并行任務的好處。
盡管如此,在使用Fork/Join時也得注意,不要盲目使用。
如果任務拆解的很深,系統內的線程數量堆積,導致系統性能性能嚴重下降;
如果函數的調用棧很深,會導致棧內存溢出;
推薦閱讀干貨:Spring Boot & Cloud 最強技術教程
工具:推薦一款在線創作流程圖、思維導圖軟件
分享Java干貨,高并發編程,熱門技術教程,微服務及分布式技術,架構設計,區塊鏈技術,人工智能,大數據,Java面試題,以及前沿熱門資訊等。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/71384.html
摘要:第二步執行任務并合并結果。使用兩個類來完成以上兩件事情我們要使用框架,必須首先創建一個任務。用于有返回結果的任務。如果任務順利執行完成了,則設置任務狀態為,如果出現異常,則紀錄異常,并將任務狀態設置為。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一個用于并行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的...
摘要:框架框架簡介框架是提供的一個用于并行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果得到大任務結果的框架。框架實例需求計算的結果。 Fork/Join框架 1. Fork/Join框架簡介 Fork/Join框架是java7提供的一個用于并行執行任務的框架,是一個把大任務分割成若干個小任務,最終匯總每個小任務結果得到大任務結果的框架。Fork指的就是把一個大任務...
摘要:分區函數返回一個布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當遍歷到流中第個元素時,這個函數執行時會有兩個參數保存歸約結果的累加器已收集了流中的前個項目,還有第個元素本身。 一、收集器簡介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...
摘要:使用方式要把任務提交到線程池,必須創建的一個子類,其中是并行化任務產生的結果如果沒有結果使用類型。對一個子任務調用的話,可以使一個子任務重用當前線程,避免線程池中多分配一個任務帶來的開銷。 【概念 分支和并框架的目的是以遞歸的方式將可以并行的任務拆分成更小的任務,然后將每個子任務的結果合并起來生成整體的結果,它是ExecutorService的一個實現,它把子任務分配給線程池(Fork...
摘要:概述簡介并行流就是把一個內容分成多個數據塊,并用不同的線程分別處理每個數據塊的流中將并行進行了優化,我們可以很容易的對數據進行并行操作,可以聲明性地通過與在并行流與順序流之間進行切換。 1. 概述 1.1 簡介 并行流就是把一個內容分成多個數據塊,并用不同的線程分別處理每個數據塊的流 Java 8 中將并行進行了優化,我們可以很容易的對數據進行并行操作,Stream API 可以聲明性...
閱讀 2849·2021-08-20 09:37
閱讀 1607·2019-08-30 12:47
閱讀 1090·2019-08-29 13:27
閱讀 1685·2019-08-28 18:02
閱讀 749·2019-08-23 18:15
閱讀 3084·2019-08-23 16:51
閱讀 931·2019-08-23 14:13
閱讀 2125·2019-08-23 13:05