摘要:前言在前面的文章和響應式編程中提到了和后者毫無疑問是一個線程池前者則是一個類似經典定義的概念官方有一個非常無語的解釋就是運行在的一個任務抽象就是運行的線程池框架包含和若干的子類它的核心在于分治和工作竅取最大程度利用線程池中的工作線程避免忙的
前言
在前面的文章"CompletableFuture和響應式編程"中提到了ForkJoinTask和ForkJoinPool,后者毫無疑問是一個線程池,前者則是一個類似FutureTask經典定義的概念.
官方有一個非常無語的解釋:ForkJoinTask就是運行在ForkJoinPool的一個任務抽象,ForkJoinPool就是運行ForkJoinTask的線程池.
ForkJoin框架包含ForkJoinTask,ForkJoinWorkerThread,ForkJoinPool和若干ForkJoinTask的子類,它的核心在于分治和工作竅取,最大程度利用線程池中的工作線程,避免忙的忙死,餓的餓死.
ForkJoinTask可以理解為類線程但比線程輕量的實體,在ForkJoinPool中運行的少量ForkJoinWorkerThread可以持有大量的ForkJoinTask和它的子任務.ForkJoinTask同時也是一個輕量的Future,使用時應避免較長阻塞和io.
ForkJoinTask在JAVA8中應用廣泛,但它是一個抽象類,它的子類派生了各種用途,如后續計劃多帶帶介紹的CountedCompleter,以及若干JAVA8中stream?api定義的與并行流有關的各種操作(ops).
源碼首先看ForkJoinTask的簽名.
public abstract class ForkJoinTaskimplements Future , Serializable
從簽名上看,ForkJoinTask實現了future,也可以序列化,但它不是一個Runnable或Callable.
ForkJoinTask雖然可以序列化,但它只對運行前和后敏感,對于執行過程中不敏感.
先來看task的運行字段:
//volatie修飾的任務狀態值,由ForkJoinPool或工作線程修改. volatile int status; static final int DONE_MASK = 0xf0000000;//用于屏蔽完成狀態位. static final int NORMAL = 0xf0000000;//表示正常完成,是負值. static final int CANCELLED = 0xc0000000;//表示被取消,負值,且小于NORMAL static final int EXCEPTIONAL = 0x80000000;//異常完成,負值,且小于CANCELLED static final int SIGNAL = 0x00010000;//用于signal,必須不小于1<<16,默認為1<<16. static final int SMASK = 0x0000ffff;//后十六位的task標簽
很顯然,DONE_MASK能夠過濾掉所有非NORMAL,非CANCELLED,非EXCEPTIONAL的狀態,字段的含義也很直白,后面的SIGNAL和SMASK還不明確,后面再看.
//標記當前task的completion狀態,同時根據情況喚醒等待該task的線程. private int setCompletion(int completion) { for (int s;;) { //開啟一個循環,如果當前task的status已經是各種完成(小于0),則直接返回status,這個status可能是某一次循環前被其他線程完成. if ((s = status) < 0) return s; //嘗試將原來的status設置為它與completion按位或的結果. if (U.compareAndSwapInt(this, STATUS, s, s | completion)) { if ((s >>> 16) != 0) //此處體現了SIGNAL的標記作用,很明顯,只要task完成(包含取消或異常),或completion傳入的值不小于1<<16, //就可以起到喚醒其他線程的作用. synchronized (this) { notifyAll(); } //cas成功,返回參數中的completion. return completion; } } }
前面用注釋解釋了這個方法的邏輯,顯然該方法是阻塞的,如果傳入的參數不能將status設置為負值會如何?
顯然,可能會有至多一次的成功cas,并且若滿足喚醒的條件,會嘗試去喚醒線程,甚至可能因為為了喚醒其他線程而被阻塞在synchonized代碼塊外;也可能沒有一次成功的cas,直到其他線程成功將status置為完成.
//final修飾,運行ForkJoinTask的核心方法. final int doExec() { int s; boolean completed; //僅未完成的任務會運行,其他情況會忽略. if ((s = status) >= 0) { try { //調用exec completed = exec(); } catch (Throwable rex) { //發生異常,用setExceptionalCompletion設置結果 return setExceptionalCompletion(rex); } if (completed) //正常完成,調用前面說過的setCompletion,參數為normal,并將返回值作為結果s. s = setCompletion(NORMAL); } //返回s return s; } //記錄異常并且在符合條件時傳播異常行為 private int setExceptionalCompletion(Throwable ex) { //首先記錄異常信息到結果 int s = recordExceptionalCompletion(ex); if ((s & DONE_MASK) == EXCEPTIONAL) //status去除非完成態標志位(只保留前4位),等于EXCEPTIONAL.內部傳播異常 internalPropagateException(ex); return s; } //internalPropagateException方法是一個空方法,留給子類實現,可用于completer之間的異常傳遞 void internalPropagateException(Throwable ex) {} //記錄異常完成 final int recordExceptionalCompletion(Throwable ex) { int s; if ((s = status) >= 0) { //只能是異常態的status可以記錄. //hash值禁止重寫,不使用子類的hashcode函數. int h = System.identityHashCode(this); final ReentrantLock lock = exceptionTableLock; //異常鎖,加鎖 lock.lock(); try { //抹除臟異常,后面敘述 expungeStaleExceptions(); //異常表數組.ExceptionNode后面敘述. ExceptionNode[] t = exceptionTable;//exceptionTable是一個全局的靜態常量,后面敘述 //用hash值和數組長度進行與運算求一個初始的索引 int i = h & (t.length - 1); for (ExceptionNode e = t[i]; ; e = e.next) { //找到空的索引位,就創建一個新的ExceptionNode,保存this,異常對象并退出循環 if (e == null) { t[i] = new ExceptionNode(this, ex, t[i]);//(1) break; } if (e.get() == this) //已設置在相同的索引位置的鏈表中,退出循環.//2 break; //否則e指向t[i]的next,進入下個循環,直到發現判斷包裝this這個ForkJoinTask的ExceptionNode已經出現在t[i]這個鏈表并break(2), //或者直到e是null,意味著t[i]出發開始的鏈表并無包裝this的ExceptionNode,則將構建一個新的ExceptionNode并置換t[i], //將原t[i]置為它的next(1).整個遍歷判斷和置換過程處在鎖中進行. } } finally { lock.unlock(); } //記錄成功,將當前task設置為異常完成. s = setCompletion(EXCEPTIONAL); } return s; } //exceptionTable聲明 private static final ExceptionNode[] exceptionTable;//全局異常node表 private static final ReentrantLock exceptionTableLock;//上面用到的鎖,就是一個普通的可重入鎖. private static final ReferenceQueue
到此doExec(也是每個ForkJoinTask的執行核心過程)就此結束.
很明顯,ForkJoinTask的doExec負責了核心的執行,它留下了exec方法給子類實現,而重點負責了后面出現異常情況的處理.處理的邏輯前面已論述,在產生異常時嘗試將異常存放在全局的execptionTable中,存放的結構為數組+鏈表,按哈希值指定索引,每次存放新的異常時,順便清理上一次已被gc回收的ExceptionNode.所有ForkJoinTask共享了一個exceptionTable,因此必然在有關的幾個環節要進行及時的清理.除了剛剛論述的過程,還有如下的幾處:
前面論述了recordExceptionalCompletion,一共有四處使用了expungeStaleException,將已回收的ExceptionNode從引用隊列中清除.
clearExceptionalCompletion在對一個ForkJoinTask重新初始化時使用,我們在前面提到序列化時說過,ForkJoinTask的序列化結果只保留了兩種情況:運行前,運行結束.重新初始化一個ForkJoinTask,就要去除任何中間狀態,包含自身產出的已被回收的異常node,而expungeStaleExceptions顯然也順便幫助其他task清除.
getThrowableException是查詢task運行結果時調用,如一些get/join方法,很明顯,記錄這個異常的作用就在于返回給get/join,在這一塊順便清理已被回收的node,尤其是將自己運行時生成的node清除.
helpExpungeStaleExceptions是提供給ForkJoinPool在卸載worker時使用,順便幫助清理全局異常表.
使用它們的方法稍后再論述,先來繼續看ForkJoinTask的源碼.
//內部等待任務完成,直到完成或超時. final void internalWait(long timeout) { int s; //status小于0代表已完成,直接忽略wait. //未完成,則試著加上SIGNAL的標記,令完成任務的線程喚醒這個等待. if ((s = status) >= 0 && U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { //加鎖,只有一個線程可以進入. synchronized (this) { //再次判斷未完成.等待timeout,且忽略擾動異常. if (status >= 0) try { wait(timeout); } catch (InterruptedException ie) { } else //已完成則響醒其他等待者. notifyAll(); } } }
internalWait方法邏輯很簡單,首先判斷是否未完成,滿足未完成,則將標記位加上SIGNAL(可能已有別的線程做過),隨后加鎖double?check?status,還未完成則等待并釋放鎖,若發現已完成,或在后續被喚醒后發現已完成,則喚醒其他等待線程.通過notifyAll的方式避免了通知丟失.
同時,它的使用方法目前只有一個ForkJoinPool::awaitJoin,在該方法中使用循環的方式進行internalWait,滿足了每次按截止時間或周期進行等待,同時也順便解決了虛假喚醒.
繼續看externalAwaitDone函數.它體現了ForkJoin框架的一個核心:外部幫助.
//外部線程等待一個common池中的任務完成. private int externalAwaitDone() { int s = ((this instanceof CountedCompleter) ? //當前task是一個CountedCompleter,嘗試使用common ForkJoinPool去外部幫助完成,并將完成狀態返回. ForkJoinPool.common.externalHelpComplete( (CountedCompleter>)this, 0) : //當前task不是CountedCompleter,則調用common pool嘗試外部彈出該任務并進行執行, //status賦值doExec函數的結果,若彈出失敗(其他線程先行彈出)賦0. ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { //檢查上一步的結果,即外部使用common池彈出并執行的結果(不是CountedCompleter的情況),或外部嘗試幫助CountedCompleter完成的結果 //status大于0表示嘗試幫助完成失敗. //擾動標識,初值false boolean interrupted = false; do { //循環嘗試,先給status標記SIGNAL標識,便于后續喚醒操作. if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { //CAS成功,進同步塊發現double check未完成,則等待. wait(0L); } catch (InterruptedException ie) { //若在等待過程中發生了擾動,不停止等待,標記擾動. interrupted = true; } } else //進同步塊發現已完成,則喚醒所有等待線程. notifyAll(); } } } while ((s = status) >= 0);//循環條件,task未完成. if (interrupted) //循環結束,若循環中間曾有擾動,則中斷當前線程. Thread.currentThread().interrupt(); } //返回status return s; }
externalAwaitDone的邏輯不復雜,在當前task為ForkJoinPool.common的情況下可以在外部進行等待和嘗試幫助完成.方法會首先根據ForkJoinTask的類型進行嘗試幫助,并返回當前的status,若發現未完成,則進入下面的等待喚醒邏輯.該方法的調用者為非worker線程.
相似的方法:externalInterruptibleAwaitDone
private int externalInterruptibleAwaitDone() throws InterruptedException { int s; //不同于externalAwaitDone,入口處發現當前線程已中斷,則立即拋出中斷異常. if (Thread.interrupted()) throw new InterruptedException(); if ((s = status) >= 0 && (s = ((this instanceof CountedCompleter) ? ForkJoinPool.common.externalHelpComplete( (CountedCompleter>)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0)) >= 0) { while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) //wait時也不catch中斷異常,發生即拋出. wait(0L); else notifyAll(); } } } } return s; }
externalInterruptibleAwaitDone的邏輯與externalAwaitDone相似,只是對中斷異常的態度為拋,后者為catch.
它們的使用點,externalAwaitDone為doJoin或doInvoke方法調用,externalInterruptibleAwaitDone為get方法調用,很明顯,join操作不可擾動,get則可以擾動.
下面來看看doJoin和doInvoke
//join的核心方法 private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; //已完成,返回status,未完成再嘗試后續 return (s = status) < 0 ? s : //未完成,當前線程是ForkJoinWorkerThread,從該線程中取出workQueue,并嘗試將 //當前task出隊然后執行,執行的結果是完成則返回狀態,否則使用當線程池所在的ForkJoinPool的awaitJoin方法等待. ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : //當前線程不是ForkJoinWorkerThread,調用前面說的externalAwaitDone方法. externalAwaitDone(); } //invoke的核心方法 private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; //先嘗試本線程執行,不成功才走后續流程 return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? //與上一個方法基本相同,但在當前線程是ForkJoinWorkerThread時不嘗試將該task移除棧并執行,而是等 (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); }
到此終于可以看一些公有對外方法了.有了前面的基礎,再看get,join,invoke等方法非常簡單.
//get方法還有get(long time)的變種. public final V get() throws InterruptedException, ExecutionException { int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ? //當前線程是ForkJoinWorkerThread則調用前面提過的doJoin方法. //否則調用前述externalInterruptibleAwaitDone doJoin() : externalInterruptibleAwaitDone(); Throwable ex; if ((s &= DONE_MASK) == CANCELLED) //異常處理,取消的任務,拋出CancellationException. throw new CancellationException(); if (s == EXCEPTIONAL && (ex = getThrowableException()) != null) //異常處理,調用getThrowableException獲取異常,封進ExecutionException. throw new ExecutionException(ex); //無異常處理,返回原始結果. return getRawResult(); } //getRawResult默認為一個抽象實現,在ForkJoinTask中,并未保存該結果的字段. public abstract V getRawResult(); //getThrowableException方法 private Throwable getThrowableException() { //不是異常標識,直接返回null,從方法名的字面意思看,要返回一個可拋出的異常. if ((status & DONE_MASK) != EXCEPTIONAL) return null; //系統哈希碼來定位ExceptionNode int h = System.identityHashCode(this); ExceptionNode e; final ReentrantLock lock = exceptionTableLock; //加異常表全局鎖 lock.lock(); try { //先清理已被回收的異常node,前面已述. expungeStaleExceptions(); ExceptionNode[] t = exceptionTable; e = t[h & (t.length - 1)]; //循環找出this匹配的異常node while (e != null && e.get() != this) e = e.next; } finally { lock.unlock(); } Throwable ex; //前面找不出異常node或異常node中存放的異常為null,則返回null if (e == null || (ex = e.ex) == null) return null; if (e.thrower != Thread.currentThread().getId()) { //不是當前線程拋出的異常. Class extends Throwable> ec = ex.getClass(); try { Constructor> noArgCtor = null;//該異常的無參構造器 Constructor>[] cs = ec.getConstructors();//該異常類公有構造器 for (int i = 0; i < cs.length; ++i) { Constructor> c = cs[i]; Class>[] ps = c.getParameterTypes(); if (ps.length == 0) //構建器參數列表長度0說明存在無參構造器,存放. noArgCtor = c; else if (ps.length == 1 && ps[0] == Throwable.class) { //發現有參構造器且參數長度1且第一個參數類型是Throwable,說明可以存放cause. //反射將前面取出的ex作為參數,反射調用該構造器創建一個要拋出的Throwable. Throwable wx = (Throwable)c.newInstance(ex); //反射失敗,異常會被catch,返回ex,否則返回wx. return (wx == null) ? ex : wx; } } if (noArgCtor != null) { //在嘗試了尋找有參無參構造器,并發現只存在無參構造器的情況,用無參構造器初始化異常. Throwable wx = (Throwable)(noArgCtor.newInstance()); if (wx != null) { //將ex設置為它的cause并返回它的實例. wx.initCause(ex); return wx; } } } catch (Exception ignore) { //此方法不可拋出異常,一定要成功返回. } } //有參無參均未成功,返回找到的異常. return ex; } //join公有方法 public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) //調用doJoin方法阻塞等待的結果不是NORMAL,說明有異常或取消.報告異常. reportException(s); //等于NORMAL,正常執行完畢,返回原始結果. return getRawResult(); } //報告異常,可在前一步判斷執行status是否為異常態,然后獲取并重拋異常. private void reportException(int s) { //參數s必須用DONE_MASK處理掉前4位以后的位. if (s == CANCELLED) //傳入的狀態碼等于取消,拋出取消異常. throw new CancellationException(); if (s == EXCEPTIONAL) //使用前面的getThrowableException方法獲取異常并重新拋出. rethrow(getThrowableException()); } //invoke公有方法. public final V invoke() { int s; //先嘗試執行 if ((s = doInvoke() & DONE_MASK) != NORMAL) //doInvoke方法的結果status只保留完成態位表示非NORMAL,則報告異常. reportException(s); //正常完成,返回原始結果. return getRawResult(); }
終于,讀到此處的讀者將關鍵的方法線串了起來,前述的所有內部方法,常量和變量與公有接口的關系已經明了.
很顯然,ForkJoinTask是個抽象類,且它并未保存任務的完成結果,也不負責這個結果的處理,但聲明并約束了返回結果的抽象方法getRawResult供子類實現.
因此,ForkJoinTask的自身關注任務的完成/異常/未完成,子類關注這個結果的處理.
每當獲取到任務的執行狀態時,ForkJoinTask可根據status來判斷是否是異常/正常完成,并進入相應的處理邏輯,最終使用子類實現的方法完成一個閉環.
如果理解為將ForkJoinTask和子類的有關代碼合并起來,在結果/完成狀態/異常信息這一塊,相當于同時有三個part在合作.
第一個part:status字段,它同時表示了未完成/正常完成/取消/異常完成等狀態,也同時告訴有關等待線程是否要喚醒其他線程(每個線程等待前會設置SIGNAL),同時留出了后面16位對付其他情況.
第二個part:result,在ForkJoinTask見不到它,也沒有相應的字段,子類也未必需要提供這個result字段,前面提到的CountedCompleter就沒有提供這個result,它的getRawResult會固定返回null.但是CountedCompleter可以繼承子類并實現這個result的保存與返回(道格大神在注釋中舉出了若干典型代碼例子),在JAVA8中,stream?api中的并行流也會保存每一步的計算結果,并對結果進行合并.
第三個part:異常.在ForkJoinTask中已經完成了所有異常處理流程和執行流程的定義,重點在于異常的存放,它是由ForkJoinTask的類變量進行存放的,結構為數組+鏈表,且元素利用了弱引用,借gc幫助清除掉已經被回收的ExceptionNode,顯然在gc之前必須得到使用.而異常隨時可以發生并進行record入列,但相應的能消費掉這個異常的只有相應的外部的get,join,invoke等方法或者內部擴展了exec()等方式,得到其他線程執行的task異常結果的情況.巧妙的是,只有外部調用者調用(get,invoke,join)時,這個異常信息才足夠重要,需要rethrow出去并保存關鍵的堆棧信息;而內部線程在訪問一些非自身執行的任務時,往往只需要status判斷是否異常即可,在exec()中fork新任務的,也往往必須立即join這些新的子任務,這就保證了能夠及時得到子任務中的異常堆棧(即使拿不到堆棧也知道它失敗了).
經過前面的論述,ForkJoinTask的執行和異常處理已經基本論結,但是,一個ForkJoinTask在創建之后是如何運行的?顯然,它不是一個Runnable,也不是Callable,不能直接submit或execute到普通的線程池.
臨時切換到ForkJoinPool的代碼,前面提到過,ForkJoinTask的官方定義就是可以運行在ForkJoinPool中的task.
//ForkJoinPool代碼,submit一個ForkJoinTask到ForkJoinPool,并將該task自身返回. //拿到返回的task,我們就可以進行前述的get方法了. publicForkJoinTask submit(ForkJoinTask task) { if (task == null) throw new NullPointerException(); externalPush(task); return task; } //execute,不返回.類似普通線程池提交一個runnable的行為. public void execute(ForkJoinTask> task) { if (task == null) throw new NullPointerException(); externalPush(task); }
顯然,若要使用一個自建的ForkJoinPool,可以使用execute或submit函數提交入池,然后用前述的get方法和變種方法進行.這是一種運行task的方式.
前面論述過的invoke方法會先去先去嘗試本地執行,然后才去等待,故我們自己new一個ForkJoinTask,一樣可以通過invoke直接執行,這是第二種運行task的方式.
前面論述的join方法在某種情況下也是一種task的運行方式,在當前線程是ForkJoinWorkerThread時,會去嘗試將task出隊并doExec,也就是會先用本線程執行一次,不成功才干等,非ForkJoinWorkerThread則直接干等了.顯然我們可以自己構建一個ForkJoinWorkerThread并去join,這時會將任務出隊并執行(但存在一個問題:什么時候入隊).且出隊后若未執行成功,則awaitJoin(參考ForkJoinPool::awaitJoin),此時因任務已出隊,不會被竊取或幫助(在awaitJoin中會有helpStealer,但其實任務是當前線程自己"偷走"了),似乎完全要靠自己了.但并不表示ForkJoinTask子類無法獲取這個已出隊的任務,比如CountedCompleter使用時,可以在compute中新生成的Completer時,將源CountedCompleter(ForkJoinTask的子類)作為新生成的CountedCountedCompleter的completer(該子類中的一個字段),這樣,若有一個ForkJoinWorkerThread竊取了這個新生成的CountedCompleter,可以通過completer鏈表找到先前被出隊的CountedCompleter(ForkJoinTask).關于CountedCompleter多帶帶文章詳述.
除此之外呢?包含前面提到的使用join操作不是ForkJoinWorkerThread調用的情況,不使用ForkJoinPool的submit?execute入池,如何能讓一個ForkJoinTask在將來執行?我們來看后面的方法.
//fork方法,將當前任務入池. public final ForkJoinTaskfork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) //如果當前線程是ForkJoinWorkerThread,將任務壓入該線程的任務隊列. ((ForkJoinWorkerThread)t).workQueue.push(this); else //否則調用common池的externalPush方法入隊. ForkJoinPool.common.externalPush(this); return this; }
顯然,我們還可以通過對一個ForkJoinTask進行fork方法入池,入哪個池完全取決于當前線程的類型.這是第四種讓任務能被運行的方式.
同樣,我們也看到了第五種方式,ForkJoinPool.common其實就是一個常量保存的ForkJoinPool,它能夠調用externalPush,我們自然也可以直接new一個ForkJoinPool,然后將當前task進行externalPush,字面意思外部壓入.這種辦法,非ForkJoinWorkerThread也能將任務提交到非common的ForkJoinPool.
從名字來看,ForkJoinTask似乎已經說明了一切,按照官方的注釋也是如此.對一個task,先Fork壓隊,再Join等待執行結果,這是一個ForkJoinTask的執行周期閉環(但不要簡單理解為生命周期,前面提到過,任務可以被重新初始化,而且重新初始化時還會清空ExceptionNode數組上的已回收成員).
到此為止,ForkJoinTask的核心函數和api已經基本了然,其它同類型的方法以及周邊的方法均不難理解,如invokeAll的各種變種.下面來看一些"周邊"類型的函數.有前述的基礎,它們很好理解.
//取消一個任務的執行,直接將status設置成CANCELLED,設置后判斷該status 是否為CANCELLED,是則true否則false. public boolean cancel(boolean mayInterruptIfRunning) { return (setCompletion(CANCELLED) & DONE_MASK) == CANCELLED; } //判斷是否完成,status小于0代表正常完成/異常完成/取消,很好理解. public final boolean isDone() { return status < 0; } //判斷當前任務是否取消. public final boolean isCancelled() { //status前4位 return (status & DONE_MASK) == CANCELLED; } public final boolean isCompletedAbnormally() { //是否為異常完成,前面說過,CANCELLED和EXCEPTIONAL均小于NORMAL return status < NORMAL; } //是否正常完成. public final boolean isCompletedNormally() { //完成態位等于NORMAL return (status & DONE_MASK) == NORMAL; } //獲取異常. public final Throwable getException() { int s = status & DONE_MASK; //當為正常完成或未完成時,返回null. return ((s >= NORMAL) ? null : //是取消時,新建一個取消異常. (s == CANCELLED) ? new CancellationException() : //不是取消,參考前面提到的getThrowableException. getThrowableException()); } //使用異常完成任務. public void completeExceptionally(Throwable ex) { //參考前述的setExceptionalCompletion, //ex已經是運行時異常或者Error,直接使用ex完成,若是受檢異常,包裝成運行時異常. setExceptionalCompletion((ex instanceof RuntimeException) || (ex instanceof Error) ? ex : new RuntimeException(ex)); } //使用value完成任務. public void complete(V value) { try { //設置原始結果,它是一個空方法.前面說過ForkJoinTask沒有維護result之類的結果字段,子類可自行發揮. setRawResult(value); } catch (Throwable rex) { //前述步驟出現異常,就用異常方式完成. setExceptionalCompletion(rex); return; } //前面的結果執行完,標記當前為完成. setCompletion(NORMAL); } //安靜完成任務.直接用NORMAL setCompletion,沒什么好說的. public final void quietlyComplete() { setCompletion(NORMAL); } //安靜join,它不會返回result也不會拋出異常.處理集合任務時,如果需要所有任務都被執行而不是一個執行出錯(取消)其他也跟著出錯的情況下, //很明顯適用,這不同于invokeAll,靜態方法invokeAll或invoke(ForkJoinTask,ForkJoinTask)會在任何一個任務出現異常后取消執行并拋出. public final void quietlyJoin() { doJoin(); } //安靜執行一次,不返回結果不拋出異常,沒什么好說的. public final void quietlyInvoke() { doInvoke(); } //重新初臺化當前task public void reinitialize() { if ((status & DONE_MASK) == EXCEPTIONAL) //如果當前任務是異常完成的,清除異常.該方法參考前面的論述. clearExceptionalCompletion(); else //否則重置status為0. status = 0; } //反fork. public boolean tryUnfork() { Thread t; //當前線程是ForkJoinWorkerThread,從它的隊列嘗試移除. return (((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread)t).workQueue.tryUnpush(this) : //當前線程不是ForkJoinWorkerThread,用common池外部移除. ForkJoinPool.common.tryExternalUnpush(this)); }
上面是一些簡單的周邊方法,大多并不需要再論述了,unfork方法很明顯在某些場景下不會成功,顯然,當一個任務剛剛入隊并未進行后續操作時,很可能成功.按前面所述,當對一個任務進行join時,可能會成功的彈出當前任務并執行,此時不可能再次彈出;當一個任務被其他線程竊取或被它本身執行的也不會彈出.
再來看一些老朋友,在前面的文章"CompletableFuture和響應式編程"一文中,作者曾著重強調過它將每個要執行的動作進行壓棧(未能立即執行的情況),而棧中的元素Completion即是ForkJoinTask的子類,而標記該Completion是否被claim的方法和周邊方法如下:
//獲取ForkJoinTask的標記,返回結果為short型 public final short getForkJoinTaskTag() { //status的后16位 return (short)status; } //原子設置任務的標記位. public final short setForkJoinTaskTag(short tag) { for (int s;;) { //不停循環地嘗試將status的后16位設置為tag. if (U.compareAndSwapInt(this, STATUS, s = status, //替換的結果,前16位為原status的前16位,后16位為tag. (s & ~SMASK) | (tag & SMASK))) //返回被換掉的status的后16位. return (short)s; } } //循環嘗試原子設置標記位為tag,前提是原來的標記位等于e,成功true失敗false public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { for (int s;;) { if ((short)(s = status) != e) //如果某一次循環的原標記位不是e,則返回false. return false; //同上個方法 if (U.compareAndSwapInt(this, STATUS, s, (s & ~SMASK) | (tag & SMASK))) return true; } }
還記得CompletableFuture在異步執行Completion時要先claim嗎?claim方法中,會嘗試設置這個標記位.這是截止jdk8中CompletableFuture使用到ForkJoinTask的功能.
目前來看,在CompletableFuture的內部實現Completion還沒有使用到ForkJoinTask的其他屬性,比如放入一個ForkJoinPool執行(沒有任何前面總結的調用,比如用ForkJoinPool的push,execute,submit等,也沒有fork到common池).但是很明顯,道格大神令它繼承自ForkJoinTask不可能純粹只為了使用區區一個標記位,試想一下,在如此友好支持響應式編程的CompletableFuture中傳入的每一個action都可以生成若干新的action,那么CompletableFuture負責將這些action封裝成Completion放入ForkJoinPool執行,將最大化利用到ForkJoin框架的工作竊取和外部幫助的功效,強力結合分治思想,這將是多么優雅的設計.或者在jdk9-12中已經出現了相應的Completion實現(盡管作者寫過JAVA9-12,遺憾的是也沒有去翻它們的源碼).
另外,盡管Completion的眾多子類也沒有result之類的表示結果的字段,但它的一些子類通過封裝,實際上間接地將這個Completion所引用的dep的result作為了自己的"result",當然,getRawResult依舊是null,但是理念卻是相通的.
以上是ForkJoinTask的部分核心源碼,除了上述的源碼外,還有一些同屬于ForkJoinTask的核心源碼部分,比如其他的public方法(參考join?fork?invoke?即可),一些利用ForkJoinPool的實現,要深入了解ForkJoinPool才能了解的方法,一些不太難的靜態方法等,這些沒有必要論述了.
除了核心源碼外,ForkJoinTask也提供了對Runnable,Callable的適配器實現,這塊很好理解,簡單看一看.
//對Runnable的實現,如果在ForkJoinPool中提交一個runnable,會用它封裝成ForkJoinTask static final class AdaptedRunnableextends ForkJoinTask implements RunnableFuture { final Runnable runnable; T result; AdaptedRunnable(Runnable runnable, T result) { //不能沒有runnable if (runnable == null) throw new NullPointerException(); this.runnable = runnable; //對runnable做適配器時,可以提交將結果傳入,并設置為當前ForkJoinTask子類的result. //前面說過,ForkJoinTask不以result作為完成標記,判斷一個任務是否完成或異常,使用status足以, //返回的結果才使用result. this.result = result; } public final T getRawResult() { return result; } public final void setRawResult(T v) { result = v; } //前面說過提交入池的ForkJoinTask最終會運行doExec,而它會調用exec,此處會調用run. public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L;//序列化用 } //無結果的runnable適配器 static final class AdaptedRunnableAction extends ForkJoinTask implements RunnableFuture { final Runnable runnable; AdaptedRunnableAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } //區別就是result固定為null,也不能set public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } //對runnable的適配器,但強制池中的工作線程在執行任務發現異常時拋出 static final class RunnableExecuteAction extends ForkJoinTask { final Runnable runnable; RunnableExecuteAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } //默認null結果,set也是空實現 public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } void internalPropagateException(Throwable ex) { //前面說過doExec會被執行,它會調exec并catch,在catch塊中設置當前任務為異常完成態, //然后調用internalPropagateException方法,而在ForkJoinTask中默認為空實現. //此處將異常重新拋出,將造成worker線程拋出異常. rethrow(ex); } private static final long serialVersionUID = 5232453952276885070L; } //對callable的適配器,當將callable提交至ForkJoinPool時使用. static final class AdaptedCallable extends ForkJoinTask implements RunnableFuture { final Callable extends T> callable; T result; AdaptedCallable(Callable extends T> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; } //字段中有一個result,直接使用它返回. public final T getRawResult() { return result; } //result可外部直接設置. public final void setRawResult(T v) { result = v; } public final boolean exec() { try { //默認的result用call函數設置. result = callable.call(); return true; } catch (Error err) { //catch住Error,拋出 throw err; } catch (RuntimeException rex) { //catch住運行時異常,拋出 throw rex; } catch (Exception ex) { //catch住受檢異常,包裝成運行時異常拋出. throw new RuntimeException(ex); } } //run方法一樣只是調用invoke,進而調用doExec. public final void run() { invoke(); } private static final long serialVersionUID = 2838392045355241008L; } //runnable生成適配器的工具方法 public static ForkJoinTask> adapt(Runnable runnable) { return new AdaptedRunnableAction(runnable); } //指定結果設置runnable的適配器工具方法 public static ForkJoinTask adapt(Runnable runnable, T result) { return new AdaptedRunnable (runnable, result); } //對callable生成適配器的方法. public static ForkJoinTask adapt(Callable extends T> callable) { return new AdaptedCallable (callable); }
以上的代碼都不復雜,只要熟悉了ForkJoinTask的本身代碼結構,對于這一塊了解非常容易,這也間接說明了ForkJoinPool中是如何處理Runnable和Callable的(因為ForkJoinPool本身也是一種線程池,可以接受提交Callable和Runnable).
將runnable提交到pool時,可以指定result,也可以不指定,也可以用submit或execute方法區分異常處理行為,ForkJoinPool會自行選擇相應的適配器.
將callable?提交到pool時,pool會選擇對callable的適配器,它的結果將為task的結果,它的異常將為task的異常.
到此為止,ForkJoinTask的源碼分析完成.
后語本文詳細分析了ForkJoinTask的源碼,并解釋了前文CompletableFuture中Completion與它的關聯,以及分析了Completion繼承自ForkJoinTask目前已帶來的功能利用(tag)和將來可能增加的功用(一個Completion產生若干多個Completion并在ForkJoinPool中運行,還支持工作竊取).
同時本文也對ForkJoinPool和ForkJoinWorkerThread,以及CountedCompleter和Stream?api中的并行流進行了略微的描述.
在文章的最后,或許有一些新手讀者會好奇,我們究竟什么時候會使用ForkJoinTask?
首先,如果你在項目中大肆使用了流式計算,并使用了并行流,那么你已經在使用了.
前面提過,官方解釋ForkJoinTask可以視作比線程輕量許多的實體,也是輕量的Future.結合在源碼中時不時出來秀存在感的ForkJoinWorkerThread,顯然它就是據說比普通線程輕量一些的線程,在前面的源碼中可以看出,它維護了一組任務的隊列,每個線程負責完成隊列中的任務,也可以偷其他線程的任務,甚至池外的線程都可以時不時地來個join,順便幫助出隊執行任務.
顯然,對于重計算,輕io,輕阻塞的任務,適合使用ForkJoinPool,也就使用了ForkJoinTask,你不會認為它可以提交runnable和callable,就可以不用ForkJoinTask了吧?前面的適配器ForkJoinPool在這種情況下必用的,可以去翻相應的源碼.
本章沒有去詳述CountedCompleter,但前面論述時說過,你可以在exec()中將一個計算復雜的任務拆解為小的子任務,然后將子任務入池執行,父任務合并子任務的結果.這種分治的算法此前基本是在單線程模式下運行,使用ForkJoinTask,則可以將這種計算交給一個ForkJoinPool中的所有線程并行執行.
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77869.html
摘要:前言在前面的三篇文章中先后介紹了框架的任務組件體系體系源碼并簡單介紹了目前的并行流應用場景框架本質上是對的擴展它依舊支持經典的使用方式即任務池的配合向池中提交任務并異步地等待結果毫無疑問前面的文章已經解釋了框架的新穎性初步了解了工作竊取 前言 在前面的三篇文章中先后介紹了ForkJoin框架的任務組件(ForkJoinTask體系,CountedCompleter體系)源碼,并簡單介紹...
摘要:前言在前面的文章框架之中梳理了框架的簡要運行格架和異常處理流程顯然要理解框架的調度包含工作竊取等思想需要去中了解而對于的拓展和使用則需要了解它的一些子類前文中偶爾會提到的一個子類直譯為計數的完成器前文也說過的并行流其實就是基于了框架實現因此 前言 在前面的文章ForkJoin框架之ForkJoinTask中梳理了ForkJoin框架的簡要運行格架和異常處理流程,顯然要理解ForkJoi...
摘要:對于任務的分割,要求各個子任務之間相互獨立,能夠并行獨立地執行任務,互相之間不影響。是叉子分叉的意思,即將大任務分解成并行的小任務,是連接結合的意思,即將所有并行的小任務的執行結果匯總起來。使用方法會阻塞并等待子任務執行完并得到其結果。 Fork/Join是什么? Fork/Join框架是Java7提供的并行執行任務框架,思想是將大任務分解成小任務,然后小任務又可以繼續分解,然后每個小...
摘要:第二步執行任務并合并結果。使用兩個類來完成以上兩件事情我們要使用框架,必須首先創建一個任務。用于有返回結果的任務。如果任務順利執行完成了,則設置任務狀態為,如果出現異常,則紀錄異常,并將任務狀態設置為。 1. 什么是Fork/Join框架 Fork/Join框架是Java7提供了的一個用于并行執行任務的框架, 是一個把大任務分割成若干個小任務,最終匯總每個小任務結果后得到大任務結果的...
摘要:內部類,用于對和異常進行包裝,從而保證對進行只有一次成功。是取消異常,轉換后拋出。判斷是否使用的線程池,在中持有該線程池的引用。 前言 近期作者對響應式編程越發感興趣,在內部分享JAVA9-12新特性過程中,有兩處特性讓作者深感興趣:1.JAVA9中的JEP266對并發編程工具的更新,包含發布訂閱框架Flow和CompletableFuture加強,其中發布訂閱框架以java.base...
閱讀 847·2021-11-25 09:43
閱讀 3681·2021-11-19 09:40
閱讀 882·2021-09-29 09:34
閱讀 1783·2021-09-26 10:21
閱讀 870·2021-09-22 15:24
閱讀 4187·2021-09-22 15:08
閱讀 3265·2021-09-07 09:58
閱讀 2656·2019-08-30 15:55