摘要:時,標(biāo)準(zhǔn)類庫添加了,作為對型線程池的實現(xiàn)。類圖用來專門定義型任務(wù)完成將大任務(wù)分割為小任務(wù)以及合并結(jié)果的工作。
JDK 1.7 時,標(biāo)準(zhǔn)類庫添加了 ForkJoinPool,作為對 Fork/Join 型線程池的實現(xiàn)。Fork 在英文中有 分叉 的意思,而 Join 有 合并 的意思。ForkJoinPool 的功能也是如此:Fork 將大任務(wù)分叉為多個小任務(wù),然后讓小任務(wù)執(zhí)行,Join 是獲得小任務(wù)的結(jié)果,然后進行合并,將合并的結(jié)果作為大任務(wù)的結(jié)果 —— 并且這會是一個遞歸的過程 —— 因為任務(wù)如果足夠大,可以將任務(wù)多級分叉直到任務(wù)足夠小。
由此可見,ForkJoinPool 可以滿足 并行 地實現(xiàn) 分治算法(Divide-and-Conquer) 的需要。
ForkJoinPool 的類圖如下:
可以看到 ForkJoinPool 實現(xiàn)了 ExecutorService 接口,所以首先 ForkJoinPool 也是一個 ExecutorService (線程池)。因而 Runnable 和 Callable 類型的任務(wù),ForkJoinPool 也可以通過 submit、invokeAll 和 invokeAny 等方法來執(zhí)行。但是標(biāo)準(zhǔn)類庫還為 ForkJoinPool 定義了一種新的任務(wù),它就是 ForkJoinTask
ForkJoinTask 類圖:
ForkJoinTask
ForkJoinPool 可以使用三種方法用來執(zhí)行 ForkJoinTask:
invoke 方法:
invoke 方法用來執(zhí)行一個帶返回值的任務(wù)(通常繼承自RecursiveTask),并且該方法是阻塞的,直到任務(wù)執(zhí)行完畢,該方法才會停止阻塞并返回任務(wù)的執(zhí)行結(jié)果。
submit 方法:
除了從 ExecutorService 繼承的 submit 方法外,ForkJoinPool 還定義了用來執(zhí)行 ForkJoinTask 的 submit 方法 —— 一般該 submit 方法用來執(zhí)行帶返回值的ForkJoinTask(通常繼承自RecursiveTask)。該方法是非阻塞的,調(diào)用之后將任務(wù)提交給 ForkJoinPool 去執(zhí)行便立即返回,返回的便是已經(jīng)提交到 ForkJoinPool 去執(zhí)行的 task —— 由類圖可知 ForkJoinTask 實現(xiàn)了 Future 接口,所以可以直接通過 task 來和已經(jīng)提交的任務(wù)進行交互。
execute 方法:
除了從 Executor 獲得的 execute 方法外,ForkJoinPool 也定義了用來執(zhí)行ForkJoinTask 的 execute 方法 —— 一般該 execute 方法用來執(zhí)行不帶返回值的ForkJoinTask(通常繼承自RecursiveAction) ,該方法同樣是非阻塞的。
現(xiàn)在讓我們來實踐下 ForkJoinPool 的功能:計算 π 的值。計算 π 的值有一個通過多項式方法,即:π = 4 * (1 - 1/3 + 1/5 - 1/7 + 1/9 - ……),而且多項式的項數(shù)越多,計算出的 π 的值越精確。
首先我們定義用來估算 π 的 PiEstimateTask:
class PiEstimateTask extends RecursiveTask{ private final long begin; private final long end; private final long threshold; // 分割任務(wù)的臨界值 public PiEstimateTask(long begin, long end, long threshold) { this.begin = begin; this.end = end; this.threshold = threshold; } @Override protected Double compute() { // 實現(xiàn) compute 方法 if (end - begin <= threshold) { // 臨界值之下,不再分割,直接計算 int sign; // 符號,多項式中偶數(shù)位取 1,奇數(shù)位取 -1(位置從 0 開始) double result = 0.0; for (long i = begin; i < end; i++) { sign = (i & 1) == 0 ? 1 : -1; result += sign / (i * 2.0 + 1); } return result * 4; } // 分割任務(wù) long middle = (begin + end) / 2; PiEstimateTask leftTask = new PiEstimateTask(begin, middle, threshold); PiEstimateTask rightTask = new PiEstimateTask(middle, end, threshold); leftTask.fork(); // 異步執(zhí)行 leftTask rightTask.fork(); // 異步執(zhí)行 rightTask double leftResult = leftTask.join(); // 阻塞,直到 leftTask 執(zhí)行完畢返回結(jié)果 double rightResult = rightTask.join(); // 阻塞,直到 rightTask 執(zhí)行完畢返回結(jié)果 return leftResult + rightResult; // 合并結(jié)果 } }
然后我們使用 ForkJoinPool 的 invoke 執(zhí)行 PiEstimateTask:
public class ForkJoinPoolTest { public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); // 計算 10 億項,分割任務(wù)的臨界值為 1 千萬 PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); double pi = forkJoinPool.invoke(task); // 阻塞,直到任務(wù)執(zhí)行完畢返回結(jié)果 System.out.println("π 的值:" + pi); forkJoinPool.shutdown(); // 向線程池發(fā)送關(guān)閉的指令 } }
運行結(jié)果:
我們也可以使用 submit 方法異步的執(zhí)行任務(wù)(此處 submit 方法返回的 future 指向的對象即提交任務(wù)時的 task):
public static void main(String[] args) throws Exception { ForkJoinPool forkJoinPool = new ForkJoinPool(4); PiEstimateTask task = new PiEstimateTask(0, 1_000_000_000, 10_000_000); Futurefuture = forkJoinPool.submit(task); // 不阻塞 double pi = future.get(); System.out.println("π 的值:" + pi); System.out.println("future 指向的對象是 task 嗎:" + (future == task)); forkJoinPool.shutdown(); // 向線程池發(fā)送關(guān)閉的指令 }
運行結(jié)果:
值得注意的是,選取一個合適的分割任務(wù)的臨界值,對 ForkJoinPool 執(zhí)行任務(wù)的效率有著至關(guān)重要的影響。臨界值選取過大,任務(wù)分割的不夠細,則不能充分利用 CPU;臨界值選取過小,則任務(wù)分割過多,可能產(chǎn)生過多的子任務(wù),導(dǎo)致過多的線程間的切換和加重 GC 的負擔(dān)從而影響了效率。所以,需要根據(jù)實際的應(yīng)用場景選擇一個合適的分割任務(wù)的臨界值。
ForkJoinPool 相比于 ThreadPoolExecutor,還有一個非常重要的特點(優(yōu)點)在于,ForkJoinPool具有 Work-Stealing (工作竊取)的能力。所謂 Work-Stealing,在 ForkJoinPool 中的實現(xiàn)為:線程池中每個線程都有一個互不影響的任務(wù)隊列(雙端隊列),線程每次都從自己的任務(wù)隊列的隊頭中取出一個任務(wù)來運行;如果某個線程對應(yīng)的隊列已空并且處于空閑狀態(tài),而其他線程的隊列中還有任務(wù)需要處理但是該線程處于工作狀態(tài),那么空閑的線程可以從其他線程的隊列的隊尾取一個任務(wù)來幫忙運行 —— 感覺就像是空閑的線程去偷人家的任務(wù)來運行一樣,所以叫 “工作竊取”。
Work-Stealing 的適用場景是不同的任務(wù)的耗時相差比較大,即某些任務(wù)需要運行較長時間,而某些任務(wù)會很快的運行完成,這種情況下用 Work-Stealing 很合適;但是如果任務(wù)的耗時很平均,則此時 Work-Stealing 并不適合,因為竊取任務(wù)時不同線程需要搶占鎖,這可能會造成額外的時間消耗,而且每個線程維護雙端隊列也會造成更大的內(nèi)存消耗。所以 ForkJoinPool 并不是 ThreadPoolExecutor 的替代品,而是作為對 ThreadPoolExecutor 的補充。
總結(jié):
ForkJoinPool 和 ThreadPoolExecutor 都是 ExecutorService(線程池),但ForkJoinPool 的獨特點在于:
ThreadPoolExecutor 只能執(zhí)行 Runnable 和 Callable 任務(wù),而 ForkJoinPool 不僅可以執(zhí)行 Runnable 和 Callable 任務(wù),還可以執(zhí)行 Fork/Join 型任務(wù) —— ForkJoinTask —— 從而滿足并行地實現(xiàn)分治算法的需要;
ThreadPoolExecutor 中任務(wù)的執(zhí)行順序是按照其在共享隊列中的順序來執(zhí)行的,所以后面的任務(wù)需要等待前面任務(wù)執(zhí)行完畢后才能執(zhí)行,而 ForkJoinPool 每個線程有自己的任務(wù)隊列,并在此基礎(chǔ)上實現(xiàn)了 Work-Stealing 的功能,使得在某些情況下 ForkJoinPool 能更大程度的提高并發(fā)效率。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/66494.html
摘要:第二步執(zhí)行任務(wù)并合并結(jié)果。使用兩個類來完成以上兩件事情我們要使用框架,必須首先創(chuàng)建一個任務(wù)。用于有返回結(jié)果的任務(wù)。如果任務(wù)順利執(zhí)行完成了,則設(shè)置任務(wù)狀態(tài)為,如果出現(xiàn)異常,則紀(jì)錄異常,并將任務(wù)狀態(tài)設(shè)置為。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一個用于并行執(zhí)行任務(wù)的框架, 是一個把大任務(wù)分割成若干個小任務(wù),最終匯總每個小任務(wù)結(jié)果后得到大任務(wù)結(jié)果的...
摘要:對于任務(wù)的分割,要求各個子任務(wù)之間相互獨立,能夠并行獨立地執(zhí)行任務(wù),互相之間不影響。是叉子分叉的意思,即將大任務(wù)分解成并行的小任務(wù),是連接結(jié)合的意思,即將所有并行的小任務(wù)的執(zhí)行結(jié)果匯總起來。使用方法會阻塞并等待子任務(wù)執(zhí)行完并得到其結(jié)果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行執(zhí)行任務(wù)框架,思想是將大任務(wù)分解成小任務(wù),然后小任務(wù)又可以繼續(xù)分解,然后每個小...
摘要:這減輕了手動重復(fù)執(zhí)行相同基準(zhǔn)測試的痛苦,并簡化了獲取結(jié)果的流程。處理項目的代碼并從標(biāo)有注釋的方法處生成基準(zhǔn)測試程序。用和運行該基準(zhǔn)測試得到以下結(jié)果。同時,和的基線測試結(jié)果也有略微的不同。 Java 8 已經(jīng)發(fā)布一段時間了,許多開發(fā)者已經(jīng)開始使用 Java 8。本文也將討論最新發(fā)布在 JDK 中的并發(fā)功能更新。事實上,JDK 中已經(jīng)有多處java.util.concurrent 改動,但...
摘要:分區(qū)函數(shù)返回一個布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當(dāng)遍歷到流中第個元素時,這個函數(shù)執(zhí)行時會有兩個參數(shù)保存歸約結(jié)果的累加器已收集了流中的前個項目,還有第個元素本身。 一、收集器簡介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...
摘要:概述簡介并行流就是把一個內(nèi)容分成多個數(shù)據(jù)塊,并用不同的線程分別處理每個數(shù)據(jù)塊的流中將并行進行了優(yōu)化,我們可以很容易的對數(shù)據(jù)進行并行操作,可以聲明性地通過與在并行流與順序流之間進行切換。 1. 概述 1.1 簡介 并行流就是把一個內(nèi)容分成多個數(shù)據(jù)塊,并用不同的線程分別處理每個數(shù)據(jù)塊的流 Java 8 中將并行進行了優(yōu)化,我們可以很容易的對數(shù)據(jù)進行并行操作,Stream API 可以聲明性...
閱讀 1061·2023-04-26 02:02
閱讀 2401·2021-09-26 10:11
閱讀 3553·2019-08-30 13:10
閱讀 3743·2019-08-29 17:12
閱讀 720·2019-08-29 14:20
閱讀 2187·2019-08-28 18:19
閱讀 2230·2019-08-26 13:52
閱讀 954·2019-08-26 13:43