摘要:框架使用的是工作竊取算法。由于此時它們訪問同一個隊列,為了減小競爭,通常會使用雙端隊列。方法返回對象,如果任務被取消了則返回,如果任務沒有完成或者沒有拋出異常則返回。
概述
Fork 就是把一個大任務切分為若干個子任務并行地執行,Join 就是合并這些子任務的執行結果,最后得到這個大任務的結果。Fork/Join 框架使用的是工作竊取算法。
工作竊取算法工作竊取算法是指某個線程從其他隊列里竊取任務來執行。對于一個比較大的任務,可以把它分割為若干個互不依賴的子任務,為了減少線程間的競爭,把這些子任務分別放到不同的隊列里,并為每個隊列創建一個多帶帶的線程來執行隊列里的任務,線程和隊列一一對應。但是,有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務需要處理,于是它就去其他線程的隊列里竊取一個任務來執行。由于此時它們訪問同一個隊列,為了減小競爭,通常會使用雙端隊列。被竊取任務的線程永遠從雙端隊列的頭部獲取任務,竊取任務的線程永遠從雙端隊列的尾部獲取任務。
工作竊取算法的優缺點優點:充分利用線程進行并行計算,減少了線程間的競爭。
缺點:雙端隊列只存在一個任務時會導致競爭,會消耗更多的系統資源,因為需要創建多個線程和多個雙端隊列。
ForkJoinTask 在執行的時候可能拋出異常,但沒有辦法在主線程中直接捕獲異常,所以 ForkJoinTask 提供了 isCompletedAbnormally() 方法檢查任務是否已經拋出異常或已經被取消。getException() 方法返回 Throwable 對象,如果任務被取消了則返回 CancellationException,如果任務沒有完成或者沒有拋出異常則返回 null。
Fork/Join 框架的實現原理 fork() 方法的實現原理當調用 ForkJoinTask 的 fork() 方法時,程序會調用 ForkJoinPool.WorkQueue 的 push() 方法異步地執行這個任務,然后立即返回結果。代碼如下:
public final ForkJoinTaskfork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; }
push() 方法把當前任務存放在一個 ForkJoinTask 數組隊列里,然后再調用 ForkJoinPool 的 signalWork() 方法喚醒或創建一個工作線程來執行任務。代碼如下:
final void push(ForkJoinTask> task) { ForkJoinTask>[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task); U.putOrderedInt(this, QTOP, s + 1); if ((n = s - b) <= 1) { if ((p = pool) != null) p.signalWork(p.workQueues, this); } else if (n >= m) growArray(); } }join() 方法的實現原理
當調用 ForkJoinTask 的 join() 方法時,程序會調用 doJoin() 方法,通過 doJoin() 方法來判斷返回什么結果
public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } private void reportException(int s) { if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL) rethrow(getThrowableException()); } public abstract V getRawResult();
實例代碼:
import java.util.concurrent.ExecutionException; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.Future; import java.util.concurrent.RecursiveTask; public class CountTask extends RecursiveTask{ private static final int THRESHOLD = 2; private int start; private int end; public CountTask(int start, int end) { this.start = start; this.end = end; } @Override protected Integer compute() { int sum = 0; boolean canCompute = (end - start) <= THRESHOLD; if (canCompute) { // 如果任務足夠小,就計算任務 for (int i = start; i <= end; i++) { sum += i; } } else { // 如果任務大于閾值,分裂成兩個子任務執行 int middle = (start + end) / 2; CountTask leftTask = new CountTask(start, middle); CountTask rightTask = new CountTask(middle + 1, end); // 執行子任務 leftTask.fork(); rightTask.fork(); // 等待子任務執行完,并得到其結果 int leftResult = leftTask.join(); int rightResult = rightTask.join(); // 合并子任務 sum = leftResult + rightResult; } return sum; } public static void main(String[] args) { ForkJoinPool forkJoinPool = new ForkJoinPool(); CountTask countTask = new CountTask(1, 100); peekNextLocalTask(); Future result = forkJoinPool.submit(countTask); try { if (countTask.isCompletedAbnormally()) { System.out.println(countTask.getException()); } System.out.println(result.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
參考資料
Java 并發編程的藝術
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75169.html
以下是Java技術棧微信公眾號發布的關于 Java 的技術干貨,從以下幾個方面匯總。 Java 基礎篇 Java 集合篇 Java 多線程篇 Java JVM篇 Java 進階篇 Java 新特性篇 Java 工具篇 Java 書籍篇 Java基礎篇 8張圖帶你輕松溫習 Java 知識 Java父類強制轉換子類原則 一張圖搞清楚 Java 異常機制 通用唯一標識碼UUID的介紹及使用 字符串...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:這減輕了手動重復執行相同基準測試的痛苦,并簡化了獲取結果的流程。處理項目的代碼并從標有注釋的方法處生成基準測試程序。用和運行該基準測試得到以下結果。同時,和的基線測試結果也有略微的不同。 Java 8 已經發布一段時間了,許多開發者已經開始使用 Java 8。本文也將討論最新發布在 JDK 中的并發功能更新。事實上,JDK 中已經有多處java.util.concurrent 改動,但...
摘要:同時,它會通過的方法將自己注冊到線程池中。線程池中的每個工作線程都有一個自己的任務隊列,工作線程優先處理自身隊列中的任務或順序,由線程池構造時的參數決定,自身隊列為空時,以的順序隨機竊取其它隊列中的任務。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發于一世流云的專欄:https://segmentfau...
摘要:方法返回對象,如果任務被取消了則返回。如果任務沒有完成或者沒有拋出異常則返回。 一. Fork/Join 1 . 簡單介紹 a . Fork/Join為JKD1.7引入,適用于對大量數據進行拆分成多個小任務進行計算的框架,最后把所有小任務的結果匯總合并得到最終的結果 b . 相關類 public abstract class RecursiveTask extends Fork...
閱讀 3035·2023-04-25 20:09
閱讀 3318·2021-11-23 09:51
閱讀 1971·2021-11-22 15:25
閱讀 3348·2021-11-18 10:02
閱讀 2747·2021-09-27 13:56
閱讀 1304·2019-08-30 15:44
閱讀 1149·2019-08-30 13:21
閱讀 3322·2019-08-30 11:05