摘要:同時,它會通過的方法將自己注冊到線程池中。線程池中的每個工作線程都有一個自己的任務隊列,工作線程優先處理自身隊列中的任務或順序,由線程池構造時的參數決定,自身隊列為空時,以的順序隨機竊取其它隊列中的任務。
本文首發于一世流云的專欄:https://segmentfault.com/blog...一、引言
算法領域有一種基本思想叫做“分治”,所謂“分治”就是將一個難以直接解決的大問題,分割成一些規模較小的子問題,以便各個擊破,分而治之。
比如:對于一個規模為N的問題,若該問題可以容易地解決,則直接解決;否則將其分解為K個規模較小的子問題,這些子問題互相獨立且與原問題性質相同,遞歸地解這些子問題,然后將各子問題的解合并得到原問題的解,這種算法設計策略叫做分治法。
許多基礎算法都運用了“分治”的思想,比如二分查找、快速排序等等。
基于“分治”的思想,J.U.C在JDK1.7時引入了一套Fork/Join框架。Fork/Join框架的基本思想就是將一個大任務分解(Fork)成一系列子任務,子任務可以繼續往下分解,當多個不同的子任務都執行完成后,可以將它們各自的結果合并(Join)成一個大結果,最終合并成大任務的結果:
二、工作竊取算法從上述Fork/Join框架的描述可以看出,我們需要一些線程來執行Fork出的任務,在實際中,如果每次都創建新的線程執行任務,對系統資源的開銷會很大,所以Fork/Join框架利用了線程池來調度任務。
另外,這里可以思考一個問題,既然由線程池調度,根據我們之前學習普通/計劃線程池的經驗,必然存在兩個要素:
工作線程
任務隊列
一般的線程池只有一個任務隊列,但是對于Fork/Join框架來說,由于Fork出的各個子任務其實是平行關系,為了提高效率,減少線程競爭,應該將這些平行的任務放到不同的隊列中去,如上圖中,大任務分解成三個子任務:子任務1、子任務2、子任務3,那么就創建三個任務隊列,然后再創建3個工作線程與隊列一一對應。
由于線程處理不同任務的速度不同,這樣就可能存在某個線程先執行完了自己隊列中的任務的情況,這時為了提升效率,我們可以讓該線程去“竊取”其它任務隊列中的任務,這就是所謂的工作竊取算法。
“工作竊取”的示意圖如下,當線程1執行完自身任務隊列中的任務后,嘗試從線程2的任務隊列中“竊取”任務:
對于一般的隊列來說,入隊元素都是在“隊尾”,出隊元素在“隊首”,要滿足“工作竊取”的需求,任務隊列應該支持從“隊尾”出隊元素,這樣可以減少與其它工作線程的沖突(因為正常情況下,其它工作線程從“隊首”獲取自己任務隊列中的任務),滿足這一需求的任務隊列其實就是我們在juc-collections框架中介紹過的雙端阻塞隊列——LinkedBlockingDeque。三、使用示例
當然,出于性能考慮,J.U.C中的Fork/Join框架并沒有直接利用LinkedBlockingDeque作為任務隊列,而是自己重新實現了一個。
為了給接下來的分析F/J框架組件做鋪墊,我們先通過一個簡單示例看下Fork/Join框架的基本使用。
假設有個非常大的long[]數組,通過FJ框架求解數組所有元素的和。
任務類定義,因為需要返回結果,所以繼承RecursiveTask,并覆寫compute方法。任務的fork通過ForkJoinTask的fork方法執行,join方法方法用于等待任務執行后返回:
public class ArraySumTask extends RecursiveTask{ ? private final int[] array; private final int begin; private final int end; ? private static final int THRESHOLD = 100; ? public ArraySumTask(int[] array, int begin, int end) { this.array = array; this.begin = begin; this.end = end; } ? @Override protected Long compute() { long sum = 0; ? if (end - begin + 1 < THRESHOLD) { // 小于閾值, 直接計算 for (int i = begin; i <= end; i++) { sum += array[i]; } } else { int middle = (end + begin) / 2; ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle); ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end); ? subtask1.fork(); subtask2.fork(); ? long sum1 = subtask1.join(); long sum2 = subtask2.join(); ? sum = sum1 + sum2; } return sum; } }
調用方如下:
public class Main { public static void main(String[] args) { ForkJoinPool executor = new ForkJoinPool(); ArraySumTask task = new ArraySumTask(new int[10000], 0, 9999); ? ForkJoinTask future = executor.submit(task); ? // some time passed... ? if (future.isCompletedAbnormally()) { System.out.println(future.getException()); } ? try { System.out.println("result: " + future.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } ? } }
注意:ForkJoinTask在執行的時候可能會拋出異常,但是沒辦法在主線程里直接捕獲異常,所以ForkJoinTask提供了isCompletedAbnormally()方法來檢查任務是否已經拋出異常或已經被取消了,并且可以通過ForkJoinTask的getException方法獲取異常.四、核心組件
在前幾小節中,我們簡要介紹了Fork/Join框架和它的使用。本節我們將更進一步,深入F/J框架,了解它的各個組件的關系和核心設計思想,本節不會涉及太多的源碼分析,而是參考 Doug Lea的這篇論文《A Java Fork/Join Framework》,從宏觀上分析F/J框架,然后分析整個框架的調度流程,閱讀完本節后,在下一節——Fork/Join框架(2) 實現中,我們再去深入源碼會輕松很多。
F/J框架的實現非常復雜,內部大量運用了位操作和無鎖算法,撇開這些實現細節不談,該框架主要涉及三大核心組件:ForkJoinPool(線程池)、ForkJoinTask(任務)、ForkJoinWorkerThread(工作線程),外加WorkQueue(任務隊列):
ForkJoinPool:ExecutorService的實現類,負責工作線程的管理、任務隊列的維護,以及控制整個任務調度流程;
ForkJoinTask:Future接口的實現類,fork是其核心方法,用于分解任務并異步執行;而join方法在任務結果計算完畢之后才會運行,用來合并或返回計算結果;
ForkJoinWorkerThread:Thread的子類,作為線程池中的工作線程(Worker)執行任務;
WorkQueue:任務隊列,用于保存任務;
ForkJoinPoolForkJoinPool作為Executors框架的一員,從外部看與其它線程池并沒有什么區別,僅僅是ExecutorService的一個實現類:
ForkJoinPool的主要工作如下:
接受外部任務的提交(外部調用ForkJoinPool的invoke/execute/submit方法提交任務);
接受ForkJoinTask自身fork出的子任務的提交;
任務隊列數組(WorkQueue[])的初始化和管理;
工作線程(Worker)的創建/管理。
注意:ForkJoinPool提供了3類外部提交任務的方法:invoke、execute、submit,它們的主要區別在于任務的執行方式上。
通過invoke方法提交的任務,調用線程直到任務執行完成才會返回,也就是說這是一個同步方法,且有返回結果;
通過execute方法提交的任務,調用線程會立即返回,也就是說這是一個異步方法,且沒有返回結果;
通過submit方法提交的任務,調用線程會立即返回,也就是說這是一個異步方法,且有返回結果(返回Future實現類,可以通過get獲取結果)。
ForkJoinPool對象的構建有兩種方式:
通過3種構造器的任意一種進行構造;
通過ForkJoinPool.commonPool()靜態方法構造。
JDK8以后,ForkJoinPool又提供了一個靜態方法commonPool(),這個方法返回一個ForkJoinPool內部聲明的靜態ForkJoinPool實例,主要是為了簡化線程池的構建,這個ForkJoinPool實例可以滿足大多數的使用場景:
public static ForkJoinPool commonPool() { // assert common != null : "static init error"; return common; }
ForkJoinPool對外提供的3種構造器,其實最終都調用了下面這個構造器:
/** * @param parallelism 并行級別, 默認為CPU核心數 * @param factory 工作線程工廠 * @param handler 異常處理器 * @param mode 調度模式: true表示FIFO_QUEUE; false表示LIFO_QUEUE * @param workerNamePrefix 工作線程的名稱前綴 */ private ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, UncaughtExceptionHandler handler, int mode, String workerNamePrefix) { this.workerNamePrefix = workerNamePrefix; this.factory = factory; this.ueh = handler; this.config = (parallelism & SMASK) | mode; long np = (long) (-parallelism); // offset ctl counts this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK); }
parallelism:默認值為CPU核心數,ForkJoinPool里工作線程數量與該參數有關,但它不表示最大線程數;
factory:工作線程工廠,默認是DefaultForkJoinWorkerThreadFactory,其實就是用來創建工作線程對象——ForkJoinWorkerThread;
handler:異常處理器;
config:保存parallelism和mode信息,供后續讀取;
ctl:線程池的核心控制字段
這些入參目前不用關注,我們重點是mode這個字段,ForkJoinPool支持兩種模式:
同步模式(默認方式)
異步模式
mode = asyncMode ? FIFO_QUEUE : LIFO_QUEUE
注意:這里的同步/異步并不是指F/J框架本身是采用同步模式還是采用異步模式工作,而是指其中的工作線程的工作方式。在F/J框架中,每個工作線程(Worker)都有一個屬于自己的任務隊列(WorkQueue),這是一個底層采用數組實現的雙向隊列。ForkJoinTask
同步是指:對于工作線程(Worker)自身隊列中的任務,采用后進先出(LIFO)的方式執行;異步是指:對于工作線程(Worker)自身隊列中的任務,采用先進先出(FIFO)的方式執行。
從Fork/Join框架的描述上來看,“任務”必須要滿足一定的條件:
支持Fork,即任務自身的分解
支持Join,即任務結果的合并
因此,J.U.C提供了一個抽象類——ForkJoinTask,來作為該類Fork/Join任務的抽象定義:
ForkJoinTask實現了Future接口,是一個異步任務,我們在使用Fork/Join框架時,一般需要使用線程池來調度任務,線程池內部調度的其實都是ForkJoinTask任務(即使提交的是一個Runnable或Callable任務,也會被適配成ForkJoinTask)。
除了ForkJoinTask,Fork/Join框架還提供了兩個它的抽象實現,我們在自定義ForkJoin任務時,一般繼承這兩個類:
RecursiveAction:表示具有返回結果的ForkJoin任務
RecursiveTask:表示沒有返回結果的ForkJoin任務
public abstract class RecursiveAction extends ForkJoinTask{ /** * 該任務的執行,子類覆寫該方法 */ protected abstract void compute(); ? public final Void getRawResult() { return null; } ? protected final void setRawResult(Void mustBeNull) { } ? protected final boolean exec() { compute(); return true; } }
public abstract class RecursiveTaskextends ForkJoinTask { ? /** * 該任務的執行結果. */ V result; ? /** * 該任務的執行,子類覆寫該方法 */ protected abstract V compute(); ? public final V getRawResult() { return result; } ? protected final void setRawResult(V value) { result = value; } ? protected final boolean exec() { result = compute(); return true; } }
ForkJoinTask除了和ForkJoinPool 結合使用外,也可以多帶帶使用,當我們調用ForkJoinTask的fork方法時,其內部會通過ForkJoinPool.commonPool()方法創建線程池,然后將自己作為任務提交給線程池。ForkJoinWorkerThread
Fork/Join框架中,每個工作線程(Worker)都有一個自己的任務隊列(WorkerQueue), 所以需要對一般的Thread做些特性化處理,J.U.C提供了ForkJoinWorkerThread類作為ForkJoinPool中的工作線程:
public class ForkJoinWorkerThread extends Thread { final ForkJoinPool pool; // 該工作線程歸屬的線程池 final ForkJoinPool.WorkQueue workQueue; // 對應的任務隊列 ? protected ForkJoinWorkerThread(ForkJoinPool pool) { super("aForkJoinWorkerThread"); // 指定工作線程名稱 this.pool = pool; this.workQueue = pool.registerWorker(this); } ?? // ... }
ForkJoinWorkerThread 在構造過程中,會保存所屬線程池信息和與自己綁定的任務隊列信息。同時,它會通過ForkJoinPool的registerWorker方法將自己注冊到線程池中。
線程池中的每個工作線程(ForkJoinWorkerThread)都有一個自己的任務隊列(WorkQueue),工作線程優先處理自身隊列中的任務(LIFO或FIFO順序,由線程池構造時的參數 mode 決定),自身隊列為空時,以FIFO的順序隨機竊取其它隊列中的任務。WorkQueue
任務隊列(WorkQueue)是ForkJoinPool與其它線程池區別最大的地方,在ForkJoinPool內部,維護著一個WorkQueue[]數組,它會在外部首次提交任務)時進行初始化:
volatile WorkQueue[] workQueues; // main registry
?
當通過線程池的外部方法(submit、invoke、execute)提交任務時,如果WorkQueue[]沒有初始化,則會進行初始化;然后根據數組大小和線程隨機數(ThreadLocalRandom.probe)等信息,計算出任務隊列所在的數組索引(這個索引一定是偶數),如果索引處沒有任務隊列,則初始化一個,再將任務入隊。也就是說,通過外部方法提交的任務一定是在偶數隊列,沒有綁定工作線程。
WorkQueue作為ForkJoinPool的內部類,表示一個雙端隊列。雙端隊列既可以作為棧使用(LIFO),也可以作為隊列使用(FIFO)。ForkJoinPool的“工作竊取”正是利用了這個特點,當工作線程從自己的隊列中獲取任務時,默認總是以棧操作(LIFO)的方式從棧頂取任務;當工作線程嘗試竊取其它任務隊列中的任務時,則是FIFO的方式。
我們在ForkJoinPool一節中曾講過,可以指定線程池的同步/異步模式(mode參數),其作用就在于此。同步模式就是“棧操作”,異步模式就是“隊列操作”,影響的就是工作線程從自己隊列中取任務的方式。
ForkJoinPool中的工作隊列可以分為兩類:
有工作線程(Worker)綁定的任務隊列:數組下標始終是奇數,稱為task queue,該隊列中的任務均由工作線程調用產生(工作線程調用FutureTask.fork方法);
沒有工作線程(Worker)綁定的任務隊列:數組下標始終是偶數,稱為submissions queue,該隊列中的任務全部由其它線程提交(也就是非工作線程調用execute/submit/invoke或者FutureTask.fork方法)。
五、線程池調度示例文字描述不太好理解,我們通過示意圖來看下任務入隊和“工作竊取”的整個過程:
假設現在通過ForkJoinPool的submit方法提交了一個FuturetTask任務,參考使用示例。初始
初始狀態下,線程池中的任務隊列為空,workQueues == null,也沒有工作線程:
外部提交FutureTask任務此時會初始化任務隊列數組WorkQueue[],大小為2的冪次,然后在某個槽位(偶數位)初始化一個任務隊列(WorkQueue),并插入任務:
注意,由于是非工作線程通過外部方法提交的任務,所以這個任務隊列并沒有綁定工作線程。
之所以是2的冪次,是由于ForkJoinPool采用了一種隨機算法(類似ConcurrentHashMap的隨機算法),該算法通過線程池隨機數(ThreadLocalRandom的probe值)和數組的大小計算出工作線程所映射的數組槽位,這種算法要求數組大小為2的冪次。創建工作線程
首次提交任務后,由于沒有工作線程,所以會創建一個工作線程,同時在某個奇數槽的位置創建一個與它綁定的任務隊列,如下圖:
竊取任務ForkJoinWorkThread_1會隨機掃描workQueues中的隊列,直到找到一個可以竊取的隊列——workQueues[2],然后從該隊列的base端獲取任務并執行,并將base加1:
竊取到的任務是FutureTask,ForkJoinWorkThread_1最終會調用它的compute方法(子類繼承ForkJoinTask,覆寫compute,參考本文的使用示例),該方法中會新建兩個子任務,并執行它們的fork方法:
@Override protected Long compute() { long sum = 0; ? if (end - begin + 1 < THRESHOLD) { // 小于閾值, 直接計算 for (int i = begin; i <= end; i++) { sum += array[i]; } } else { int middle = (end + begin) / 2; ArraySumTask subtask1 = new ArraySumTask(this.array, begin, middle); ArraySumTask subtask2 = new ArraySumTask(this.array, middle + 1, end); ? subtask1.fork(); subtask2.fork(); ? long sum1 = subtask1.join(); long sum2 = subtask2.join(); ? sum = sum1 + sum2; } return sum; }
之前說過,由于是由工作線程ForkJoinWorkThread_1來調用FutureTask的fork方法,所以會將這兩個子任務放入ForkJoinWorkThread_1自身隊列中:
然后,ForkJoinWorkThread_1會阻塞等待任務1和任務2的結果(先在subtask1.join等待):
long sum1 = subtask1.join(); long sum2 = subtask2.join();
從這里也可以看出,任務放到哪個隊列,其實是由調用線程來決定的(根據線程探針值probe計算隊列索引)。如果調用線程是工作線程,則必然有自己的隊列(task queue),則任務都會放到自己的隊列中;如果調用線程是其它線程(如主線程),則創建沒有工作線程綁定的任務隊列(submissions queue),然后存入任務。新的工作線程
ForkJoinWorkThread_1調用兩個子任務1和2的fork方法,除了將它們放入自己的任務隊列外,還會導致新增一個工作線程ForkJoinWorkThread_2:
ForkJoinWorkThread_2運行后會像ForkJoinWorkThread_1那樣從其它隊列竊取任務,如下圖,從ForkJoinWorkThread_1隊列的base端竊取一個任務(直接執行,并不會放入自己隊列):
竊取完成后,ForkJoinWorkThread_2會直接執行任務1,又回到了FutureTask子類的compute方法,假設此時又fork出兩個任務——任務3、任務4,則ForkJoinWorkThread_2最終會在任務3的join方法上等待:
如果此時還有其它工作線程,則重復上述步驟:竊取、執行、入隊、join阻塞、返回。ForkJoinTask的join方法內部邏輯非常復雜,上述ForkJoinWorkThread_1和ForkJoinWorkThread_2目前都在等待任務的完成,但事實上,ForkJoinTask存在一種互助機制,即工作線程之間可以互相幫助執行任務,這里不詳細展開,只需要知道,ForkJoinWorkThread_1和ForkJoinWorkThread_2可能會被其它工作線程喚醒。
我們這里假設ForkJoinWorkThread_2被其它某個工作線程喚醒,任務3和任務4的join方法依次返回了結果,那么任務1的結果也會返回,于是ForkJoinWorkThread_1也被喚醒(它在任務1的join上等待),然后ForkJoinWorkThread_1會繼續執行任務2的join方法,如果任務2不再分解,則最終返回任務1和任務2的合并結果,計算結束。
自身隊列的任務執行ForkJoinWorkThread_1和ForkJoinWorkThread_2喚醒執行完竊取到的任務后,還沒有結束,它們還會去看看自身隊列中有無任務可以執行。
/** * Executes the given task and any remaining local tasks. */ final void runTask(ForkJoinTask> task) { if (task != null) { scanState &= ~SCANNING; // mark as busy (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } }
上述ForkJoinPool.WorkQueue.runTask方法中,doExec()就是執行竊取的任務,而execLocalTasks用于執行隊列本身的任務。
我們假設此時的線程池是下面這種狀態:
工作線程ForkJoinWorkThread_1調用execLocalTasks方法一次性執行自己隊列中的所有任務,這時分成兩種情況:
1.異步模式(asyncMode==true)
如果構造線程池時,asyncMode為true,表示以異步模式執行工作線程自身隊列中的任務,此時會從 base -> top遍歷并執行所有任務。
2.同步模式(asyncMode==false)
如果構造線程池時,asyncMode為false(默認情況),表示以同步模式執行工作線程自身隊列中的任務,此時會從 top -> base 遍歷并執行所有任務。
任務的入隊總是在top端,所以當以同步模式遍歷時,其實相當于棧操作(從棧頂pop元素);
如果是異步模式,相當于隊列的出隊操作(從base端poll元素)。
異步模式比較適合于那些不需要返回結果的任務。其實如果將隊列中的任務看成一棵樹(無環連通圖)的話,異步模式類似于圖的廣度優先遍歷,同步模式類似于圖的深度優先遍歷
假設此處以默認的同步模式遍歷,ForkJoinWorkThread_1從棧頂開始執行并移除任務,先執行任務2并移除,再執行任務1并:
六、總結本章簡要概述了Fork/Join框架的思想、主要組件及基本使用,Fork/Join框架的核心包含四大組件:ForkJoinTask任務類、ForkJoinPool線程池、ForkJoinWorkerThread工作線程、WorkQueue任務隊列。
本章通過示例,描述了各個組件的關系以及ForkJoin線程池的整個調度流程,F/J框架的核心來自于它的工作竊取及調度策略,可以總結為以下幾點:
每個Worker線程利用它自己的任務隊列維護可執行任務;
任務隊列是一種雙端隊列,支持LIFO的push和pop操作,也支持FIFO的take操作;
任務fork的子任務,只會push到它所在線程(調用fork方法的線程)的隊列;
工作線程既可以使用LIFO通過pop處理自己隊列中的任務,也可以FIFO通過poll處理自己隊列中的任務,具體取決于構造線程池時的asyncMode參數;
當工作線程自己隊列中沒有待處理任務時,它嘗試去隨機讀取(竊取)其它任務隊列的base端的任務;
當線程進入join操作,它也會去處理其它工作線程的隊列中的任務(自己的已經處理完了),直到目標任務完成(通過isDone方法);
當一個工作線程沒有任務了,并且嘗試從其它隊列竊取也失敗了,它讓出資源(通過使用yields, sleeps或者其它優先級調整)并且隨后會再次激活,直到所有工作線程都空閑了——此時,它們都阻塞在等待另一個頂層線程的調用。
下一章將通過源碼分析更深入的理解Fork/Join調度過程。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/71835.html
摘要:并不會為每個任務都創建工作線程,而是根據實際情況構造線程池時的參數確定是喚醒已有空閑工作線程,還是新建工作線程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首發于一世流云的專欄:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我們...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:注意線程與本地操作系統的線程是一一映射的。固定線程數的線程池提供了兩種創建具有固定線程數的的方法,固定線程池在初始化時確定其中的線程總數,運行過程中會始終維持線程數量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發于一世流云專欄:https://segmentfault.com/blog... ...
摘要:本文首發于一世流云的專欄一模式簡介模式是多線程設計模式中的一種常見模式,它的主要作用就是異步地執行任務,并在需要的時候獲取結果。二中的模式在多線程基礎之模式中,我們曾經給出過模式的通用類關系圖。 showImg(https://segmentfault.com/img/bVbiwcx?w=1000&h=667); 本文首發于一世流云的專欄:https://segmentfault.co...
摘要:好了,繼續向下執行,嘗試獲取鎖失敗后,會調用首先通過方法,將包裝成共享結點,插入等待隊列,插入完成后隊列結構如下然后會進入自旋操作,先嘗試獲取一次鎖,顯然此時是獲取失敗的主線程還未調用,同步狀態還是。 showImg(https://segmentfault.com/img/remote/1460000016012541); 本文首發于一世流云的專欄:https://segmentfa...
閱讀 3638·2021-11-25 09:43
閱讀 636·2021-09-22 15:59
閱讀 1744·2021-09-06 15:00
閱讀 1769·2021-09-02 09:54
閱讀 689·2019-08-30 15:56
閱讀 1176·2019-08-29 17:14
閱讀 1839·2019-08-29 13:15
閱讀 880·2019-08-28 18:28