摘要:前言在前面的文章框架之中梳理了框架的簡要運行格架和異常處理流程顯然要理解框架的調度包含工作竊取等思想需要去中了解而對于的拓展和使用則需要了解它的一些子類前文中偶爾會提到的一個子類直譯為計數的完成器前文也說過的并行流其實就是基于了框架實現因此
前言
在前面的文章"ForkJoin框架之ForkJoinTask"中梳理了ForkJoin框架的簡要運行格架和異常處理流程,顯然要理解ForkJoin框架的調度,包含工作竊取等思想,需要去ForkJoinPool中了解,而對于ForkJoinTask的拓展和使用則需要了解它的一些子類,前文中偶爾會提到ForkJoinTask的一個子類:CountedCompleter,直譯為計數的完成器.
前文也說過,JAVA8的并行流其實就是基于了ForkJoin框架實現,因此并行流其實就在使用我們前面提到的工作竊取和分治思想.為了方便對于ForkJoinTask的理解,本文將詳述CountedCompleter(同時在ForkJoinPool中也需要了解它),以及前文提到的工作線程ForkJoinWorkerThread,并簡單看一看并行流.
CountedCompleter源碼根據doug的注釋,CoutedCompleter是一個特殊的ForkJoinTask,它會在觸發完成動作時,檢查有沒有掛起action,若沒有則執行一個完成動作.這個概念有些抽象,必須結合源碼和源碼作者給出的示例加以理解,同樣的,理解了它,也就理解了CountedCompleter的擴展類的實現方式,從而能閱讀懂有關的源碼(如并行流中涉及到運行集拆分,結果合并,運算調度等源碼).
它也是一個抽象類,基于ForkJoinTask的exec函數進行了若干擴展.
public abstract class CountedCompleterextends ForkJoinTask //任務的完成者,很明顯這是一個全局的棧結構(暫時這么理解吧,其實也不太嚴格). final CountedCompleter> completer; //重要字段,代表完成前掛起的任務數量,用volatile修飾. volatile int pending; //帶有completer的構造器. protected CountedCompleter(CountedCompleter> completer) { this.completer = completer; } //不帶completer的構造器 protected CountedCompleter() { this.completer = null; } //抽象的compute方法,它是類似ForkJoinTask的擴展方式. public abstract void compute(); //重寫的exec方法 protected final boolean exec() { //直接調用compute方法并返回false.回到ForkJoinTask類中的doExec方法,可以看到 //調用了exec后若得到true值,將會執行setCompletion(NORMAL)動作.且該動作將在首次喚醒等待結果的線程. //此處return了false,將不去執行上述操作.詳情參考上篇文章. compute(); return false; }
以上是CountedCompleter的簽名,字段,構造器和核心的抽象方法compute,其實整個CountedCompleter就是在圍著這點東西轉,首先看一看與ForkJoinTask的結合.
顯然,CountedCompleter簡單重寫了ForkJoinTask的exec方法簡單調用抽象的compute方法并返回false,當出現異常時,流程不變,但當compute方式正常完成的情況,將不可能進行父類后續的設置完成和喚醒操作.因此它必須由CountedCompleter自定義的完成.
而CountedCompleter也確實暴露了一些公有函數,但是調用的時機卻要用戶繼承它之后決定.我們先來繼續一些輔助源碼并理解Completer的設計理念,稍后再來看它的完成方法.
//onCompletion勾子方法,默認空實現. //CountedCompleter在tryComplete方法中會在符合完成的第一個條件(無掛起任務)的情況下執行它. //complete方法也會對它有無條件地調用. //關于這兩個方法稍后詳述. //它的實現取決于要實現的操作,并行流中的一些ops會在此處進行一些中間結果處理,比如結果集的合并(reduce操作). public void onCompletion(CountedCompleter> caller) { } //重寫ForkJoinTask中的方法.上篇源碼分享文章中提過,在ForkJoinTask的setExceptionalCompletion會調用internalPropagateException //傳遞異常,而且是個空實現,而在CountedCompleter中實現了該方法,并在內部調用onExceptionalCompletion void internalPropagateException(Throwable ex) { CountedCompleter> a = this, s = a; //循環判斷每一個task是否要傳遞異常給它的completer //無方法體的while循環.道格大神的代碼神跡. while (a.onExceptionalCompletion(ex, s) && //要傳遞給completer且具備completer且completer還不是完成態(正常或非正常) (a = (s = a).completer) != null && a.status >= 0 && //則令completer去記錄異常完成,若記錄成功則進入下一輪循環. a.recordExceptionalCompletion(ex) == EXCEPTIONAL) ; //因為onExceptionalCompletion固定返回true,若沒有中間完成的任務,直到最后一個completer,也就是root, //root不具備completer,將中斷循環. } //異常完成勾子方法. //按上一節的概念,當ForkJoinTask執行出錯,即exec->compute出錯時,最終會調到此勾子.或當手動completeExceptionally或cancel時. public boolean onExceptionalCompletion(Throwable ex, CountedCompleter> caller) { //直接返回true,顯然也是一個供擴展的方法.返回true代表異常應該傳遞給this的completer. return true; } //返回completer public final CountedCompleter> getCompleter() { return completer; } //返回掛起任務數量. public final int getPendingCount() { return pending; } //設置掛起任務數量 public final void setPendingCount(int count) { pending = count; } //原子地為掛起任務數量添加delta public final void addToPendingCount(int delta) { U.getAndAddInt(this, PENDING, delta); } //原子地將當前掛起任務數量從expected更改到count public final boolean compareAndSetPendingCount(int expected, int count) { return U.compareAndSwapInt(this, PENDING, expected, count); } //將當前任務的掛起數量原子減至0. public final int decrementPendingCountUnlessZero() { int c; do {} while ((c = pending) != 0 && !U.compareAndSwapInt(this, PENDING, c, c - 1)); return c; } //返回root completer.邏輯很簡單. public final CountedCompleter> getRoot() { CountedCompleter> a = this, p; while ((p = a.completer) != null) a = p; return a; }
以上是幾個工具函數,邏輯也很簡單,僅有一處可能留有疑問:完成態/異常態是如何傳遞的.
現在大家應該理解為什么ForkJoinTask要將internalPropagateException置為空實現了,顯然,對于不同方式的實現,確實需要不同的傳遞行為.CountedCompleter保存了一個類似"棧結構"的任務鏈,雖然提前講到棧底即為root任務(當然root在底部還是頂部本身不重要),顯然任何一個子任務出現了問題,與它關聯的父任務的行為顯然要有一個明確的由子類定義的規則.
我們看到在重寫的internalPropagateException方法中,不停地判斷當前任務是否要將異常信號傳遞給鏈上的下一個任務(on方法始終返回true,沒關系我們可以在子類中重寫),然后讓未完成的completer去記錄同一個異常ex.
那么問題來了,只要completer已完成過(正常完成過異常完成或取消),顯然while循環中斷,completer和它的后續completer將不會被處理(1).同樣,若傳遞異常的任務本身就是另一個或幾個任務的completer,它的異常信息顯然不會反向傳遞(2).
對于問題(1),顯然如果后續的completer已出現過異常,必然也會走一遍同樣的邏輯,傳遞給后面的completer,如果它正常完成,也必然要有相應向后傳遞的行為,否則無法解決(1),我們接下來即論述相關方法.
對于問題(2),顯然問題(1)中描述的情況與此有所交集,如果我們建立了一個CountedCompleter任務,并在compute方法中大肆fork子任務入隊,fork之后不等子任務完成,也不獲取子任務的執行結果,直接將父任務setCompletion或者setExceptionalCompletion,子任務還是會繼續執行的.
為了便于理解,我們繼續來看與任務的完成有關的方法.
//嘗試完成根任務或減少棧鏈下游的某一個completer的掛起數(包含它自身). public final void tryComplete() { //1.初始用a保存this,后續為當前操作任務,用s保存a. CountedCompleter> a = this, s = a; for (int c;;) { //2.第一次進入或在6造成競態的某一次循環中,a(this或this的completer鏈中的某一個)的的掛起任務數為0,代表它掛起的任務都完成了. if ((c = a.pending) == 0) { //3.a的勾子方法,若已經運行過4,且判斷條件為假未能到5并在下一次循環重新回到3的情況,a!=s且a是s的completer, //在對onCompletion重寫時,可以根據this與參數是否相等進行判斷,如并行流聚合時可以根據這個條件進行結果集的合并. a.onCompletion(s); //4.將a指向自己的completer,s指向原來的a. if ((a = (s = a).completer) == null) { //5.原來a的completer不存在,即a不是root,不需要再傳遞了,讓root進行quietlyComplete并返回. //此時說明整條鏈上的competer掛起任務全部是0. s.quietlyComplete(); return; } //隱藏的7.當原a的completer存在(a不是root)的情況,繼續對該complter判斷掛起任務數或嘗試減1,對下一個元素開啟下一輪循環. } //6.對this的completer棧的某一次循環時發現了掛起任務數不為0的,則對該completer的掛起數減1, //表示它掛起的任務完成了一個,并返回.若在此時恰好出現了競態,另一條鏈上的任務搶先減一,則當前 //的a要進入下一循環,它可能會在2處判斷通過,進入到鏈上的下一個completer的傳播邏輯. else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) return; } } //基本等效于tryComplete,只是不執行onCompletion,tryComplete會在判斷鏈上某個completer的掛起任務數是0立即執行onCompletion. public final void propagateCompletion() { CountedCompleter> a = this, s = a; for (int c;;) { if ((c = a.pending) == 0) { if ((a = (s = a).completer) == null) { s.quietlyComplete(); return; } } else if (U.compareAndSwapInt(a, PENDING, c, c - 1)) return; } } //complete方法,邏輯簡單,絲毫不考慮掛起數,直接執行當前task的幾個完成方法,并嘗試對completer進行tryComplete. //它不改變自己的掛起任務數,但會讓completer對棧上的其他completer或自身嘗試減少掛起數或完成root. public void complete(T rawResult) { CountedCompleter> p; setRawResult(rawResult);//使用參數設置為當前任務的結果,盡管它為空方法. onCompletion(this);//直接調用onCompletion勾子. quietlyComplete();//安靜地將status置為NORMAL. if ((p = completer) != null) //自己不改變自身掛起數,也不嘗試完成root,但讓completer嘗試去向下執行這些操作. p.tryComplete(); } //沒辦法多帶帶理解這個方法名.官方注釋是和nextComplete放置在循環中使用. public final CountedCompleter> firstComplete() { for (int c;;) { if ((c = pending) == 0) //1.當前task沒有掛起任務數,則返回它. return this; else if (U.compareAndSwapInt(this, PENDING, c, c - 1)) //2.否則嘗試減少一個掛起任務數并返回null.但當出現競態時,可能導致未能進入2而在下一次循環進入1. return null; } } //結合前面的firstComplete互相理解,它會對當前任務判斷是否有completer,有則對該completer進行firstComplete, //否則將當前任務安靜完成并返回null. //故結果只能返回null或completer public final CountedCompleter> nextComplete() { CountedCompleter> p; if ((p = completer) != null) //有completer且completer已無掛起任務數,則返回completer, //有completer且completer有掛起任務數,則嘗試對該任務數減一并返回null.出現競態則可能返回該completer. return p.firstComplete(); else { //無completer,安靜完成當前任務并返回null. quietlyComplete(); return null; } } //等同于getRoot().quietlyComplete() public final void quietlyCompleteRoot() { for (CountedCompleter> a = this, p;;) { if ((p = a.completer) == null) { a.quietlyComplete(); return; } a = p; } } //如果當前任務未完成,嘗試去出棧執行,并處理至多給定數量的其他未處理任務,且對這些未處理任務 //來說,當前任務處于它們的完成路徑上(即這些任務是completer棧鏈的前置任務),實現特殊的工作竊取. public final void helpComplete(int maxTasks) { Thread t; ForkJoinWorkerThread wt; if (maxTasks > 0 && status >= 0) { if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) //當前線程是ForkJoinWorkerThread,嘗試執行當前任務并嘗試從線程的工作隊列中嘗試幫助前置任務執行. (wt = (ForkJoinWorkerThread)t).pool. helpComplete(wt.workQueue, this, maxTasks); else //使用common池的externalHelpComplete方法. ForkJoinPool.common.externalHelpComplete(this, maxTasks); } }
上一段代碼總體邏輯不難,有以下幾點總結:
1.顯然tryComplete方法在調用后的最終結果只有兩個:自己或completer鏈前方的某一個completer的掛起任務數減1(1),自己或completer鏈前方某一個completer(root)的quietlyComplete被執行(2).簡單來說,就是讓root進行quietlyComplete(鏈上每一個掛起任務數都是0)或讓鏈上的某一個completer減少一個掛起任務.
2.tryComplete方法只會對root進行quietlyComplete,進而setComplete(NORMAL),對于鏈上的其他任務,最多會幫助掛起數減一,而不會把它們置為完成態,但是線程池在執行任務時,或者直接對一個鏈上的completer進行invoke,doExec甚至get等操作時,這些方法會將該中間completer進行setComplete.
3.每一個CountedCompleter都可能有自己的completer棧鏈,每一個CountedCompleter也可以位于其他CountedCompleter的棧鏈上且上游不唯一而下游唯一一(倒樹形),任何一條棧鏈只能有一個root,root的completer為null.
4.從tryComplete方法來看正常運行情況下的規則,每一個CountedCompleter的tryComplete只能向前影響到鏈上的另一個completer,因為實現數量的增加方法有好幾處,用戶在實現時,隨時可能將一些completer的數量設置成任意的數,故可以出現前面tryComplete注釋中隱藏的7的情況,即存在一個completer,它的下一個completer的掛起數是0,它卻能將下下個completer安靜完成或將其掛起數減一,即跨無掛起數節點傳遞.
5.前面列出的helpComplete方法是CountedCompleter的特殊工作竊取方法(或者也不能叫作竊取,因為非common池情況竊取的是自己線程的任務,common池則依賴于一個探測值),具體的竊取細節在ForkJoinPool中,將在后面的文章中論述,但簡單的邏輯已經在注釋中描述清楚,把它歸到這一塊,也是因為它與前面描述的邏輯有所糾葛.124提到了tryComplete的向前影響結果,而在實際的應用中,我們可能會有各種各樣的情景,ForkJoin框架無法阻止我們對ForkJoinTask的exec函數進行任意式的擴展,也無法阻止我們對CountedCompleter的compute任意擴展,那么如何在我們任意拓展的情景下保持效率和健壯?比如下面這個使用場景:
a.建立一種ForkJoinTask,直接繼承CountedCompleter并重寫compute方法,則它可以運行在ForkJoinPool中.
b.我們接下來在compute方法中多次根據計算結果集的大小進行拆分并遞歸fork子任務入池,父任務成為子任務的completer,同時compute方法自身也負責不可拆分的計算邏輯,并在自身這一塊計算結束后,可能等待所有fork入池的子任務結束,也可能不等待子任務,直接結束父任務,讓線程空出來做其他的事.
c.所有子任務結束后,使用一個合并函數合并子任務的結果集和自身的結果,并作為最終的結果.然后tryComplete(如果b中使用了join,或者判斷當前任務是root).
顯然,b中fork出的子任務,也同樣要執行bc的邏輯.那么可能出現這樣的情況:
不同的父任務子任務在ForkJoinPool最初始壓入當前工作線程的隊列中,但隨時可能被其他工作線程甚至外部線程偷去執行.
父任務搶先搶得運行資源,運行完自己計算的部分,而入池的子任務及子孫任務有大量未完成.
難道父任務的執行線程就這樣干等?在前一篇文章中說過,ForkJoin框架適宜多計算,輕io,輕阻塞的情況,且本身就是為了避免線程忙的忙死餓的餓死,因此每個任務等待子任務執行結束是不可取的,這或許也是為什么有了ForkJoinTask,卻還要有CountedCompleter的原因之一吧.
若我們在任何每一個任務中只是單純地將該分出去的子任務fork入池并執行自己那一部分,并不讓當前線程join子任務呢?(事實上不join子任務恰好可以將當前線程的資源騰出來做其他的事)
所以,除了前面5中提到的若干種(124)向前影響completer棧鏈的掛起數或root的完成態,還需要一個能向棧鏈后方有所影響的操作,比如幫助子任務的完成,畢竟子任務也是b中fork出來且由自己入隊的.
helpComplete方法就可以做到這一點,它在ForkJoinPool中,它僅應在當前任務未完成時使用,首先它會嘗試將當前任務從出隊列并執行(ForkJoinPool::popCC及成功后續doExec,LIFO),出隊失敗則表示正在被執行甚至被偷去執行.出隊這一步之后,再嘗試自己的線程工作隊列中找出自己的子孫任務(FIFO)并進行執行(ForkJoinPool::pollAndExecCC).
而若執行完某個父任務的工作線程必然會調用tryComplete等有關方法,將自身或棧鏈后方的某一個completer的掛起數減一,甚至因為一些不合理的api使用(如直接更改了后方某個任務的掛起數量)而直接終止了root,將root任務標記成完成態.(注意前面強調的"運行完自己計算的部分",這就是否定本句話的關鍵了,前面也說明"helpComplete僅在當前任務未完成時使用",顯然,完成了自己負責的計算內容并不代表當前任務完成了,因為它的子任務還沒有完成,因此它不會調用tryComplete,并且可以去幫助子任務)
同時,執行完父任務負責的計算內容的任務線程也會去找它棧鏈后方的其他任務,按照b的邏輯,這將是它的子任務,幫助它們完成,每完成一個子任務(子任務無子任務,不再help的情況),會進行tryComplete傳遞一次.
余下的方法很簡單.
//重寫自ForkJoinTask的結果,前文也說過CountedCompleter也不維護result,返回null. //但并行流或者一些其他并行操作可以實現此結果,比如ConcurrentHashMap中支持的map reduce操作. public T getRawResult() { return null; } //同上,默認空,一些子類會有特別的實現. protected void setRawResult(T t) { }
顯然,completer棧鏈上的所有任務是可以并行執行的,且每一個完成都可以向后tryComplete一次,并在其后可以幫助前面的任務完成,而我們若實現上述兩個方法,完全可以將自身運算的結果設置進去,在root被安靜完成后,ForkJoinTask將可以get到結果(或join也將返回結果),可在此時合并計算結果,有些結果顯然是可以并行的.
一些操作,比如find類型,任何一個子任務完成了find,就可以直接讓root結束,然后直接讓整條棧鏈上的任務cancelIgnoringExceptions.
一些需要聚合每一個任務結果的操作,比如reduce類型,需要每個父任務根據子任務的結果去reduce,它的父任務再根據他和兄弟任務的結果reduce,最終合并到root.顯然,mapper由子任務實現,reducer由父任務實現.
一些接近find或reduce類型(或者說find的變種),比如filter,每一個任務都會有結果,這個結果可能是自己負責的原集中的一部分子集,也可能就是個空集,父任務合并每個子任務的結果集,直到root.
排序類型的操作,如使用歸并排序,顯然每個父任務即是divider也是merger,分解出的每個子集交給子任務去計算,父任務再去負責merge.
......
以上是ForkJoinTask的抽象子類CountedCompleter的源碼分析,接下來我們繼續分析工作線程.
ForkJoinWorkerThread源碼只要對java的線程結構稍有了解,ForkJoinWorkerThread的源碼十分簡單,且前面提過,ForkJoinTask被聲稱是一個輕量于普通線程和Future的實體,而它在ForkJoinPool中的運行載體便是ForkJoinWorkerThread,這個輕量究竟體現在何處?
//類簽名,直接繼承自Thread public class ForkJoinWorkerThread extends Thread { //每個ForkJoinWorkerThread都只能屬于一個線程池,且保存該池的引用. final ForkJoinPool pool; //每個ForkJoinWorkerThread都有一個工作隊列, 顯然隊列中的任務就是該線程干活的最小單位了.它也是工作竊取機制的核心. final ForkJoinPool.WorkQueue workQueue; //構造函數,創建時指定線程池. protected ForkJoinWorkerThread(ForkJoinPool pool) { // 線程名稱 super("aForkJoinWorkerThread"); this.pool = pool; //將工作線程注冊到ForkJoinPool后會返回一個工作隊列,供當前線程使用和供其他線程偷取. this.workQueue = pool.registerWorker(this); } //帶線程組的構造器 ForkJoinWorkerThread(ForkJoinPool pool, ThreadGroup threadGroup, AccessControlContext acc) { super(threadGroup, null, "aForkJoinWorkerThread"); //inheritedAccessControlContext是從Thread繼承下來的,字面意思是繼承的訪問控制上下文,設置為acc. U.putOrderedObject(this, INHERITEDACCESSCONTROLCONTEXT, acc); //注冊入池之前,清除掉本地化信息 eraseThreadLocals(); this.pool = pool; this.workQueue = pool.registerWorker(this); }
//返回注冊的池.
public ForkJoinPool getPool() { return pool; } //返回當前線程工作隊列在池中的索引,每個隊列都會維護一個在池中的索引. public int getPoolIndex() { return workQueue.getPoolIndex(); } //空函數,可交給子類實現,按照官方注釋,它的作用是在構造之后(這個構造不是指new出線程對象, //而是在run方法已進入的時候,說明"構造"是指線程已經完成了創建能夠正常運行),處理任務之前. protected void onStart() { } //工作線程終止時的勾子方法,負責執行一些有關的清理操作.但是若要重寫它,必須在方法的 //最后調用super.onTermination.參數exception是造成該線程終止的異常.若是正常結束, //則它是null. protected void onTermination(Throwable exception) { } //核心方法. public void run() { //doug在這一塊標注"只運行一次",查看ForkJoinPool的源碼, //ForkJoinPool中會有一個WorkQueue的數組,在取消線程的注冊后, //本線程關聯的WorkQueue會從該數組移除,但WorkQueue中的array不會置空. if (workQueue.array == null) { Throwable exception = null; try { //前面說過的預先操作 onStart(); //用線程池的runWorker方法執行,傳入隊列. pool.runWorker(workQueue); } catch (Throwable ex) { //發生異常,中斷前記錄下來 exception = ex; } finally { try { //將記錄下來的異常調用勾子方法. onTermination(exception); } catch (Throwable ex) { if (exception == null) //執行勾子方法本身出現了異常,記錄下來 exception = ex; } finally { //調用線程池的解除注冊方法,會將本線程的WorkQueue從數組中移除,同時使用上述異常. pool.deregisterWorker(this, exception); } } } } //擦除本地變量.把當前線程的兩個ThreadLocalMap全部置空 final void eraseThreadLocals() { U.putObject(this, THREADLOCALS, null); U.putObject(this, INHERITABLETHREADLOCALS, null); } //每正常運行完一次頂級task,就調用一次它.這個頂級任務自帶易誤解天性,其實可以理解為每一次從隊列取出的任務. void afterTopLevelExec() { } //自帶子類.它不具備任何特殊權限,也不是用戶定義的任何線程組的成員,每次運行完一個頂級任務, //則擦除本地化變量. static final class InnocuousForkJoinWorkerThread extends ForkJoinWorkerThread { //自已創建默認線程組. private static final ThreadGroup innocuousThreadGroup = createThreadGroup(); //訪問控制上下文支持權限. private static final AccessControlContext INNOCUOUS_ACC = new AccessControlContext( new ProtectionDomain[] { new ProtectionDomain(null, null) }); //構造函數. InnocuousForkJoinWorkerThread(ForkJoinPool pool) { super(pool, innocuousThreadGroup, INNOCUOUS_ACC); } @Override void afterTopLevelExec() { //在每一次從隊列取出的"頂級"任務運行后即擦除本地化變量. eraseThreadLocals(); } @Override public ClassLoader getContextClassLoader() { //如果獲取線程上下文類加載器,永遠直接返回系統類加載器. return ClassLoader.getSystemClassLoader(); } //嘗試對未捕獲異常處理器的設置,忽略. @Override public void setUncaughtExceptionHandler(UncaughtExceptionHandler x) { } //禁止直接設置線程的上下文類加載器. @Override public void setContextClassLoader(ClassLoader cl) { throw new SecurityException("setContextClassLoader"); } //創建一個以頂級線程組為父的線程組. private static ThreadGroup createThreadGroup() { try { sun.misc.Unsafe u = sun.misc.Unsafe.getUnsafe(); Class> tk = Thread.class; Class> gk = ThreadGroup.class; long tg = u.objectFieldOffset(tk.getDeclaredField("group")); long gp = u.objectFieldOffset(gk.getDeclaredField("parent")); //當前線程的所屬組. ThreadGroup group = (ThreadGroup) u.getObject(Thread.currentThread(), tg); //循環條件,當前線程的所屬組不是null while (group != null) { //不停地循環向上取parent ThreadGroup parent = (ThreadGroup)u.getObject(group, gp); if (parent == null) //發現無parent的線程組,說明是系統頂級線程組,用它當parent創建一個"無害"線程組返回. return new ThreadGroup(group, "InnocuousForkJoinWorkerThreadGroup"); //有parent,把它賦給group開啟下一輪循環. group = parent; } } catch (Exception e) { //有異常用Error包裝拋出. throw new Error(e); } //不能return就拋出Error. throw new Error("Cannot create ThreadGroup"); } }
以上是工作線程的代碼,粗略總結一下它和普通線程的區別.
首先,它內部會維護一個工作隊列,用它來實現任務調度和竊取.
其次,它提供了一些擴展,如每次頂層任務運行結束,清理ThreadLocal,這也是一種保護機制,避免同線程的本地化數據隨之污染.但粗略去看ForkJoinPool的代碼,發現它只是在每次從隊列取出并運行完一個任務后清除,并稱這個為"頂級循環",這倒也沒錯,但這個任務并不能稱之為頂級任務,因為這里的任務類型是ForkJoinTask,不一定是CountedCompleter等明顯標識了依賴關系的子類,所以父任務和子任務被塞進一個隊列,即使未被竊取,只由當前線程執行,兩次的本地化數據也是不同的.
不過如果我們在ForkJoinTask的exec方法中加入本地化,或在CountedCompleter中加入本地化,顯然每一個在此生成的子任務都會在相應的線程執行doExec時設置這些屬性,并在執行結束后清除.
最后官方提供的默認子類,以及一些線程組,優先級,權限等作者也未深入研究,但是我們構建線程池的時候有一個參數就是"線程工廠",了解下它或許能對后續的ForkJoinPool源碼閱讀有所幫助.
接下來簡述一個官方提供的案例,并以此聊一聊并行流.
官方案例第一節論述了CountedCompleter,顯然它作為一個抽象類,只是定義了某一些環節,以及一些環節的子環節的組合過程,而具體的實現與使用它定義的api則由用戶實現,它的源碼中并無使用(當然也可以看一些子類,但比較復雜),在CountedCompleter的源碼注釋中,道格大神提供了若干案例,這里舉出兩個來簡要說明一下前面論述過的使用方式,也可以為下一節論述官方提供的子類(并行流api中)提供閱讀基礎.
第一個是并行的可竊取的分治查找算法.
@Test public void testDivideSearch(){ Integer[] array = new Integer[10000000]; for(int i = 0; i < array.length; i++){ array[i] = i+1; } AtomicReferenceresult = new AtomicReference<>(); Integer find = new Searcher<>(null, array, result, 0, array.length - 1,this::match).invoke(); LOGGER.info("查找結束,任務返回:{},result:{}",find,result.get()); } static class Searcher extends CountedCompleter { final E[] array; final AtomicReference result; final int lo, hi; final Function matcher; Searcher(CountedCompleter> p, E[] array, AtomicReference result, int lo, int hi,Function matcher){ super(p); this.array = array; this.result = result; this.lo = lo; this.hi = hi; this.matcher = matcher; } @Override public void compute() { int l = this.lo;int h = this.hi; while(result.get() == null && h >= l){ if(h - l >=2){ int mid = (l + h)>>>1; //添加掛起任務數量,這樣當出現tryComplete時可以觸發root的結束(未查到) addToPendingCount(1); new Searcher (this,array,result,mid,h,matcher).fork(); h = mid; }else{ E x = array[l]; if(matcher.apply(x) && result.compareAndSet(null,x)){ super.quietlyCompleteRoot(); } break; } } //當前未有任何一個線程查到結果,當前任務也完成了子集查找,減少一個掛起數量,若掛起數已減至0則終止. if(null == result.get()) tryComplete(); } } private boolean match(Integer x) { return x > 2000000 && x%2 ==0 && x%3 == 0 && x%5 ==0 && x %7 ==0; }
該案例的邏輯很簡單,給定一個非常大的數組,充分利用本機的資源去查找滿足一個條件的元素.為了方便,在具體的查找數據上選定了整型,查找的條件也非常簡單.
在該案例中,會對結果進行分治,首先分治出足夠多的子任務,剩下的不需再分的父任務由當前線程完成,子任務則壓入工作隊列,其他空閑的線程就會來偷取子任務并執行.當有任務一個子任務查找到相應的數字后,即將它存放到result,并安靜地完成根任務.
此時整個任務鏈處在一個非常尷尬的情況:查找到結果的子任務將root設置為完成,而整條鏈上的非root任務均未完成.但因循環條件不滿足,退出了循環.此時查到result已有值,并不執行最后的tryComplete,執行結束,任務的status依舊為未完成,是否有重復執行的問題?
答案是沒有問題,因為ForkJoinTask絕對會在ForkJoinPool中調度(哪怕是common池),在common池中,任務執行前必須出隊,盡管compute方法在本例中沒有將這些任務設置為完成,但任務不會被二次執行.可見,上一章中費大力介紹的status字段也有無用的時候.
但是除了root任務需要使用到獲取結果的功能,需要保證status是負數,它產生的子孫任務還有什么用呢?所有compute方法會因為循環中止而結束,此后的這些任務不存在任何外部引用,會被gc清理,即使存在外部引用,用它去獲取子孫任務的執行情況或result也沒有任何意義.
顯然這個案例解決了至少兩個疑問,一是怎么實現一個保存result的ForkJoinTask,二是ForkJoin框架如何在查找方面大幅提升性能,很明顯,相比單線程遍歷的辦法,此例多線程查詢,且任何一個子任務在并行條件下完成了查詢,整個大任務均可以終止.
第二個是傳說中的map?reduce.大數據中常使用此概念(跨節點).
在并行流中,map可以代表非阻斷操作,reduce可以代表阻斷操作,但是reduce同樣可以并行地執行.
道格在注釋上給出了兩個map?reduce案例,我們只看第一個,它也是后續并行流一節我們要看的例子比較相近的解法.方法二有些繞,較難理解,但也優雅.
@Test public void testMapReduce() { Integer[] array = {1, 2, 3}; //方法一. Integer result = new MapRed<>(null, array, (a)->a+2, (a,b)->a+b, 0,array.length).invoke(); LOGGER.info("方法一result:{}",result); //方法二我就不抄了,就在官方注釋上. result = new MapReducer<>(null, array, (a) -> a + 1 , (a, b) -> a + b, 0, array.length, null).invoke(); LOGGER.info("方法二result:{}", result); } /** * 第一種map reduce方式,很好理解. * @param*/ private class MapRed extends CountedCompleter { final E[] array; final MyMapper mapper; final MyReducer reducer; final int lo, hi; MapRed sibling;//兄弟節點的引用 E result; MapRed(CountedCompleter> p, E[] array, MyMapper mapper, MyReducer reducer, int lo, int hi) { super(p); this.array = array; this.mapper = mapper; this.reducer = reducer; this.lo = lo; this.hi = hi; } public void compute() { if (hi - lo >= 2) { int mid = (lo + hi) >>> 1; MapRed left = new MapRed(this, array, mapper, reducer, lo, mid); MapRed right = new MapRed(this, array, mapper, reducer, mid, hi); left.sibling = right; right.sibling = left; //只掛起右任務 setPendingCount(1); right.fork(); //直接運算左任務. left.compute(); } else { if (hi > lo) result = mapper.apply(array[lo]); //它會依次調用onCompletion.并且是自己調自己或completer調子, //且只有左右兩個子后完成的能調成功(父任務的掛起數達到0). tryComplete(); } } public void onCompletion(CountedCompleter> caller) { //忽略自己調自己. if (caller != this) { //參數是子任務. MapRed child = (MapRed ) caller; MapRed sib = child.sibling; //設置父的result. if (sib == null || sib.result == null) result = child.result; else result = reducer.apply(child.result, sib.result); } } public E getRawResult() { return result; } } //mapper和reducer簡單的不能再簡單. @FunctionalInterface private static interface MyMapper { E apply(E e); } @FunctionalInterface private static interface MyReducer { E apply(E a, E b); }
上面的邏輯也很簡單,首先就是對任務的分解,簡單的將任務分為左和右,左直接由父任務執行(可能再分),右則入池,所有子任務直到不能再分(葉子任務)以map為result,每個葉子任務完成后會調用tryComplete.
這個動作會觸發一系列的completer棧元素的掛起數下降或完成,顯然,如果把completer理解為一個普通樹(這是作者很少見到的非二叉樹的情況,盡管這個例子寫成了二叉樹,我們完全可以在compute中將父任務一分為多,而不是限2個),從葉子節點開始,每個葉子節點完成(result是mapper的結果)會嘗試onCompletion并減少父節點的掛起任務數,但只有同父節點的最后一個兄弟節點可以進入onCompletion設置父節點的結果,并且由于這個設置過程的前提是父節點符合掛起任務數為0,因此符合循環繼續的條件,葉子節點的動作會繼續向上判斷父節點的父節點,直到root為止.假設線程數量足夠,保證每個子任務都有一個線程處理,那么深度每上一層,就會有一半(非二叉樹的情況每個父節點只能有一個通過)的執行葉子節點任務的線程因不符合某個任務的掛起數量為0的條件而退出,這樣逐級傳導,最后到root調用它最后一個子節點的onCompletion,使用reducer進行合并.
本例中進行結果合并的寫法(onCompletion)只適合二叉樹,有興趣的讀者可以看看道格在注釋中給出的第二種寫法,幾叉都可以.而且該實現很優雅,并未寫onCompletion函數,但是寫法真心夠繞的.
并行流簡述在JAVA8中支持了lamda表達式的同時,也支持了函數式編程,由此出現了一種新型的計算方式:流式計算,也出現了一種讓包括作者在內很多人興奮不已的編程方式:響應式編程.
流式計算的核心在于Stream?api,流有很多分類,比如并行流和串行流,這點可以顧名思義,同樣的,流中的每一個操作都可以劃分類型,比如阻斷操作和非阻斷操作.
java中實現并行流就是基于這些操作,CountedCompleter的一些子類就是這些操作的類型,顯然,如在前一篇文章所說,使用了并行流,就是使用了ForkJoin框架.
當我們使用下面的代碼,會發生什么操作?
Stream.of(1,2,3,4,5).parallel().map(x -> x + 1).reduce((a, b) -> a + b).get(); //map只是將動作簡單地記了下來,包裝起來,等到阻斷操作時才會真正執行. 位于ReferencePipeline public finalStream map(Function super P_OUT, ? extends R> mapper) { Objects.requireNonNull(mapper);//非空檢查 //返回一個無狀態操作. return new StatelessOp (this, StreamShape.REFERENCE, StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) { @Override Sink opWrapSink(int flags, Sink sink) { //典型的適配器模式.將action一律封裝為Sink. return new Sink.ChainedReference (sink) { @Override public void accept(P_OUT u) { downstream.accept(mapper.apply(u)); } }; } }; } //阻斷操作reduce位于 ReferencePipeline public final Optional reduce(BinaryOperator accumulator) { return evaluate(ReduceOps.makeRef(accumulator)); } //AbstractPipeline final R evaluate(TerminalOp terminalOp) { assert getOutputShape() == terminalOp.inputShape(); if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; return isParallel() ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags())); } //TerminalOp阻斷操作接口的默認方法 default R evaluateParallel(PipelineHelper helper, Spliterator spliterator) { if (Tripwire.ENABLED) Tripwire.trip(getClass(), "{0} triggering TerminalOp.evaluateParallel serial default"); return evaluateSequential(helper, spliterator); } //看ReduceOps 它返回了一內部類ReduceTask public R evaluateParallel(PipelineHelper helper, Spliterator spliterator) { return new ReduceTask<>(this, helper, spliterator).invoke().get(); } //內部類ReduceTask間接繼承自CountedCompleter private static final class ReduceTask > extends AbstractTask > { private final ReduceOp op; ReduceTask(ReduceOp op, PipelineHelper helper, Spliterator spliterator) { super(helper, spliterator); this.op = op; } ReduceTask(ReduceTask parent, Spliterator spliterator) { super(parent, spliterator); this.op = parent.op; } //老外起的名子,造小孩. @Override protected ReduceTask makeChild(Spliterator spliterator) { //和上面的例子非常相似的代碼,只是封裝更好. return new ReduceTask<>(this, spliterator); } @Override protected S doLeaf() { //葉子節點做這個. return helper.wrapAndCopyInto(op.makeSink(), spliterator); } //重寫了前面提過的onCompletion函數 @Override public void onCompletion(CountedCompleter> caller) { if (!isLeaf()) { //不是葉子節點.這條件,和前面咱們分析的多么匹配. //計算左結果 S leftResult = leftChild.getLocalResult(); //聯合右結果. leftResult.combine(rightChild.getLocalResult()); //聯合完的結果就是當前completer的結果. setLocalResult(leftResult); } // 直接父類是AbstractTask,它會對父,左右子幫助gc. super.onCompletion(caller); } } //AbstractTask幫助gc public void onCompletion(CountedCompleter> caller) { spliterator = null; leftChild = rightChild = null; } //更多實現細節自閱...
顯然,并行流(至少我舉的這個例子)是基于ForkJoin框架的.分治的思想與前面道格的例子相似,只是更加優雅和封裝更好.有了前面的基礎,若要詳細熟悉并行流原理,需要進一步了解的只有他們的繼承樹,分割聚合組件等邊角料,核心的調度思想已經不再是困難.
回到問題,當我們使用并行流時發生了什么?首先是非阻斷操作時,與串行流情況同樣,也是先將action封裝成適配器,僅在阻斷操作發生時的調度不同,并行流在阻斷操作下使用ForkJoin框架進行調度,任務的分割則使用它的Splitor,結果的合并也有它的Combiner.其他的流程與上面的案例無異.
后語1.CountedCompleter使用普通樹的結構存放動作,但是它又是另類的樹,因為子節點能找到父節點,父節點卻找不到子節點,而只知道子節點代表的動作未執行的數量,因此或許從訪問方式的角度來看還是用棧來理解更好.在這里樹既是數據結構,也是一個另類的操作棧.只從一個completer往下看,它是個棧,但從父節點的角度來講,它是一個訪問不到子節點的普通樹(或許我們不應該強行為它套上一個數據結構,不然總覺得不倫不類,但是用樹這個形狀便于理解).每個節點會存放掛起任務數量,每個節點的任務完成未必會設置它自己的完成態,但會嘗試將completer父元素棧(或者樹的一條線)上的每個任務掛起數量減一或將根節點安靜置為完成態.關于具體的理解和代碼實現,以及如何保存一個任務的運行結果,可以參考前面案例的章節,也可以以此為基礎去看并行流的源碼,但也要相應的理解并行流為了便捷實現而提供的各種分割合并組件.
2.ForkJoinWorkerThread是運行在ForkJoinPool中的主要線程,它內部維護了一個工作任務隊列,并存放了該隊列在線程池中的間接索引.借此實現任務的竊取,避免過于空閑等待,任務fork會直接push到該隊列,第一次擴容時,才給該隊列初始化任務數組,當線程從池中卸載時,不會清除掉該數組,這樣線程無法再次啟動.線程的啟動有一些勾子,官方提供的線程工廠有兩個,一個直接創建ForkJoinWorkerThread,另一個創建它的子類
InnocuousForkJoinWorkerThread,它除了一些安全策略外,最大的區別在于ForkJoinWorkerThread在注冊入池前進行本地化數據的清理,而它則每次完成一個主任務處理就清理一次.
3.并行流是ForkJoin框架的一個典型應用,JAVA8?Stream?api中的并行流定義了大量的以CountedCompleter為基礎的操作.利用分割/合并和周邊組件實現了基于ForkJoin框架的并行計算調度.
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77888.html
摘要:前言在前面的三篇文章中先后介紹了框架的任務組件體系體系源碼并簡單介紹了目前的并行流應用場景框架本質上是對的擴展它依舊支持經典的使用方式即任務池的配合向池中提交任務并異步地等待結果毫無疑問前面的文章已經解釋了框架的新穎性初步了解了工作竊取 前言 在前面的三篇文章中先后介紹了ForkJoin框架的任務組件(ForkJoinTask體系,CountedCompleter體系)源碼,并簡單介紹...
摘要:前言在前面的文章和響應式編程中提到了和后者毫無疑問是一個線程池前者則是一個類似經典定義的概念官方有一個非常無語的解釋就是運行在的一個任務抽象就是運行的線程池框架包含和若干的子類它的核心在于分治和工作竅取最大程度利用線程池中的工作線程避免忙的 前言 在前面的文章CompletableFuture和響應式編程中提到了ForkJoinTask和ForkJoinPool,后者毫無疑問是一個線程...
摘要:分區函數返回一個布爾值,這意味著得到的分組的鍵類型是,于是它最多可以分為兩組是一組,是一組。當遍歷到流中第個元素時,這個函數執行時會有兩個參數保存歸約結果的累加器已收集了流中的前個項目,還有第個元素本身。 一、收集器簡介 把列表中的交易按貨幣分組: Map transactionsByCurrencies = transactions.stream().collect(groupi...
摘要:類似的你可以用將并行流變為順序流。中的使用順序求和并行求和將流轉為并行流配置并行流線程池并行流內部使用了默認的,默認的線程數量就是處理器的數量包括虛擬內核通過得到。 【概念 并行流就是一個把內容分成多個數據塊,并用不同的線程分別處理每一個數據塊的流。在java7之前,并行處理數據很麻煩,第一,需要明確的把包含數據的數據結構分成若干子部分。第二,給每一個子部分分配一個獨立的線程。第三,適...
摘要:這減輕了手動重復執行相同基準測試的痛苦,并簡化了獲取結果的流程。處理項目的代碼并從標有注釋的方法處生成基準測試程序。用和運行該基準測試得到以下結果。同時,和的基線測試結果也有略微的不同。 Java 8 已經發布一段時間了,許多開發者已經開始使用 Java 8。本文也將討論最新發布在 JDK 中的并發功能更新。事實上,JDK 中已經有多處java.util.concurrent 改動,但...
閱讀 1104·2021-09-22 15:37
閱讀 1131·2021-09-13 10:27
閱讀 2465·2021-08-25 09:38
閱讀 2444·2019-08-26 11:42
閱讀 1524·2019-08-26 11:39
閱讀 1554·2019-08-26 10:58
閱讀 2316·2019-08-26 10:56
閱讀 2568·2019-08-23 18:08