摘要:內部類,用于對和異常進行包裝,從而保證對進行只有一次成功。是取消異常,轉換后拋出。判斷是否使用的線程池,在中持有該線程池的引用。
前言
近期作者對響應式編程越發感興趣,在內部分享"JAVA9-12"新特性過程中,有兩處特性讓作者深感興趣:
1.JAVA9中的JEP266對并發編程工具的更新,包含發布訂閱框架Flow和CompletableFuture加強,其中發布訂閱框架以java.base模塊下的java.util.concurrent.Flow及其中的幾個內部類/接口為組成部分,它們的名稱和作用如下,摘自JAVA12的Flow api文檔。
2.JAVA9中孵化,JAVA11中標準化的HttpClient,在之前分享的JAVA9-12新特性一文中曾引用摘自網絡的HttpClient代碼片段:
片段1:
HttpClient client = HttpClient.newHttpClient(); HttpRequest request = HttpRequest.newBuilder() .uri(URI.create(uri)) .build(); return client.sendAsync(request, HttpResponse.BodyHandlers.ofString()) .thenApply(HttpResponse::body); }
片段2:
HttpClient client = HttpClient.newHttpClient(); Listurls = List.of("http://www.baidu.com","http://www.alibaba.com/","http://www.tencent.com"); List requests = urls.stream() .map(url -> HttpRequest.newBuilder(URI.create(url))) .map(reqBuilder -> reqBuilder.build()) .collect(Collectors.toList()); List >> futures = requests.stream() .map(request -> client.sendAsync(request, HttpResponse.BodyHandlers.ofString())) .collect(Collectors.toList()); futures.stream() .forEach(e -> e.whenComplete((resp,err) -> { if(err != null){ err.printStackTrace(); }else{ System.out.println(resp.body()); System.out.println(resp.statusCode()); } })); CompletableFuture.allOf(futures .toArray(CompletableFuture>[]::new)) .join(); }
在片段1中,thenApply方法是CompletableFuture的成員,client.sendAsync返回的是一個CompletableFuture。這兩段代碼很好閱讀,甚至說猜出其中的意義。片段2可以說對于作者目前的書寫習慣是一個全面的顛覆,顯然我們可以預先定義響應行為,而行為的執行時間則由前一個階段的實際完成時間決定。片段2中的whenComplete方法很好理解,最后一行用allOf生成一個類似樹的依賴結構,在當前方法中等待所有CompletableFuture執行完成。
簡單看這兩段代碼,響應式編程的魅力可見一斑,甚至可以說是美妙不可言。
那么,作為JAVA9中額外照顧增強,HttpClient賴以實現的CompletableFuture,它是何方神圣呢?
CompletionStage是什么?不妨賣個關子先。
作者目前使用的JDK版本為8,盡管它不包含9之后的增強,萬幸CompletionStage是從JDK8引入,因此足以用以了解這一偉大神器了。近期作者在公司使用的一些開源框架中,發現至處間接對它的使用:
1.持久化框架Redission。它內部使用一個RedissonExecutorService(實現ScheduledExecutorService)和PromiseDelegator(實現CompletionStage,而CompletableFuture同樣也實現了CompletionStage)來異步地執行task。
2.apollo配置中心。它提供了配置變更的異步通知機制,而這依賴于spring web-mvc提供的DeferredResult,而在異步處理return value時,DeferredResult的setResult同樣也是相應的CompletionStage執行。
//例:阿波羅NotificationControllerV2拉取通知接口 @GetMapping public DeferredResult>> pollNotification( @RequestParam(value = "appId") String appId, @RequestParam(value = "cluster") String cluster, @RequestParam(value = "notifications") String notificationsAsString, @RequestParam(value = "dataCenter", required = false) String dataCenter, @RequestParam(value = "ip", required = false) String clientIp) { List notifications = null; //省略無關代碼 //DeferredResultWrapper是apollo作者包裝的spring DeferredResult DeferredResultWrapper deferredResultWrapper = new DeferredResultWrapper(); //省略無關代碼 if (!CollectionUtils.isEmpty(newNotifications)) { deferredResultWrapper.setResult(newNotifications); } else { deferredResultWrapper .onTimeout(() -> logWatchedKeys(watchedKeys, "Apollo.LongPoll.TimeOutKeys")); deferredResultWrapper.onCompletion(() -> { //unregister all keys for (String key : watchedKeys) { deferredResults.remove(key, deferredResultWrapper); } logWatchedKeys(watchedKeys, "Apollo.LongPoll.CompletedKeys"); }); //省略 return deferredResultWrapper.getResult(); }
在spring的CompletionStageReturnValueHandler的handleReturnValue()方法中,如下異步地處理響應結果:
@Override public void handleReturnValue(Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { if (returnValue == null) { mavContainer.setRequestHandled(true); return; } final DeferredResult
以上代碼的future.thenAccept與future.exceptionally只是規定了兩種情況下程序接下來的運行行為,相應的代碼不是立即執行,而是等到相應的行為發生了才去執行。很明顯,同步式編程寫流程,響應式編程似乎就是在寫行為。
顯然,只要熟悉了CompletionStage的api,以上的代碼就絕對簡單了,好了,開胃菜已上完,接下來介紹CompletionStage。
CompletionStage其實很好理解,按照官方定義,它表示一個可能異步運行的“階段”,在該階段內要執行相應的行為,而這些運算會在另一個CompletionStage完成后開始,它自身完成后又可觸發另一個依賴的CompletionStage。
在CompletionStage中這些方法均可用來定義一個行為,行為的執行方式可參考方法名和入參,這與java8中的stream api持同樣的風格。行為參數可以是Consumer,Function,Runnable。包含accept的方法,參數會有一個Consumer,它會消費上一或多個CompletionStage的結果;包含run的方法,參數會有一個Runnable,它的運行不需要前面CompletionStage的執行結果;包含apply的方法,參數會包含Function,該function一般以前一或幾階段的返回值為入參,以自身的執行結果作為當前CompletionStage的結果。
CompletionStage和實現類ComletableFuture的方法名中也會包含either/all/any等簡單的單詞,和上述的含義相組合,不難理解。
以以下三個接口為例說明:
1.public CompletionStagerunAfterBoth(CompletionStage> other,Runnable action);
接口會返回一個CompletionStage,該stage僅在當前stage和參數中的other正常完成后才會執行參數中的action。
2.public CompletionStage applyToEitherAsync(CompletionStage extends T> other,Function super T, U> fn);
接口會返回一個CompletionStage,該stage會在當前stage或參數中的other正常執行完畢后異步執行參數中的函數fn,而fn的參數就是前面執行完畢的stage的結果,fn的返回值將是被返回的stage的結果。
3.public CompletionStagethenAcceptBothAsync(CompletionStage extends U> other,BiConsumer super T, ? super U> action, Executor executor);
接口會返回一個CompletionStage,它會在當前stage和參數中的other正常執行完畢后執行,以這兩個stage的結果作為參數,在參數executor線程池中執行action函數,因為它是一個消費者,因此沒有返回值。
接口的其他方法邏輯類似,不再綴述。
上一節簡述了CompletionStage接口的函數定義,作為官方提供的實現類,CompletableFuture實現了有關的所有接口,它的作者依舊是我等膜拜的道格大神,下面來具體分析CompletableFuture的實現。
類簽名:
public class CompletableFuture
從簽名信息來看,CompletableFuture實現了Future和CompletionStage接口,這意味著它即滿足CompletableStage的階段執行,也提供了Future中獲取該執行結果的方法。
首先來看成員變量和核心函數:
volatile Object result; // 當前對象的結果或者一個異常包裝對象AltResult,關于AltResult下面再看 volatile Completion stack; // 任務棧,Completion后面再述。 final boolean internalComplete(Object r) { //使用cas原子操作,將原本為null的result置為r,所有調用者都保證r不是null,因此只有第一次才能返回true。 return UNSAFE.compareAndSwapObject(this, RESULT, null, r); } final boolean casStack(Completion cmp, Completion val) { //嘗試用cas原子操作將當前stack的值從cmp換為val。 return UNSAFE.compareAndSwapObject(this, STACK, cmp, val); } //其中STACK,RESULT就是上面stack和result的句柄,這點和其他juc中的工具慣例相同 private static final sun.misc.Unsafe UNSAFE; private static final long RESULT; private static final long STACK; private static final long NEXT; static { try { final sun.misc.Unsafe u; UNSAFE = u = sun.misc.Unsafe.getUnsafe(); Class> k = CompletableFuture.class; RESULT = u.objectFieldOffset(k.getDeclaredField("result")); STACK = u.objectFieldOffset(k.getDeclaredField("stack")); NEXT = u.objectFieldOffset (Completion.class.getDeclaredField("next")); } catch (Exception x) { throw new Error(x); } }
stack的類型為Completion,為了方便理解,在介紹Completion類之前,先看幾個聲明在CompletableFuture的常量
static final int SYNC = 0;//同步 static final int ASYNC = 1;//異步 static final int NESTED = -1;//嵌套
再來看Completion類的結構
//繼承ForkJoinTask,實現Runnable,以及簽名接口AsynchronousCompletionTask abstract static class Completion extends ForkJoinTaskimplements Runnable, AsynchronousCompletionTask { volatile Completion next; // 指向下一個Completion //當被觸發時,執行completion動作,如果存在需要傳遞的行為, //返回一個代表該行為的CompletableFuture //參數只能是上面提到的SYNC,ASYNC,NESTED,后面留意它的正負。 abstract CompletableFuture> tryFire(int mode); //如果當前completion依舊是可觸發的,則返回true,這會在清理任務棧時使用. abstract boolean isLive(); //繼承自Runnable,直接調用tryFile,參數為1 public final void run() { tryFire(ASYNC); } //繼承自ForkJoinTask,直接調用tryFile,參數為1,返回true public final boolean exec() { tryFire(ASYNC); return true; } //繼承自ForkJoinTask,直接返回null public final Void getRawResult() { return null; } //繼承自ForkJoinTask,空方法。 public final void setRawResult(Void v) {} }
上面列舉了內部類Completion的全部代碼,它繼承并實現了ForkJoinTask和Runnable中的抽象方法,同時聲明了tryFire這個抽象方法供子類實現。因為繼承了ForkJoinTask,這意味著Completion也是一個任務,且它可能在ForkJoinPool中執行。關于Completion和它的子類后面詳述。先來繼續看核心函數和成員實現。
/** 嘗試將一個任務壓棧,成功返回true */ final boolean tryPushStack(Completion c) { Completion h = stack; lazySetNext(c, h);//把當前的棧設置為c的next //嘗試把當前棧(h)更新為新值(c) return UNSAFE.compareAndSwapObject(this, STACK, h, c); } //lazySetNext定義 static void lazySetNext(Completion c, Completion next) { UNSAFE.putOrderedObject(c, NEXT, next); }
方法tryPushStack的流程很簡單,先調用lazySetNext將當前棧設置為參數的next,這樣達到了棧的后入為頂層的目的,然后試圖將頂部元素設置為新壓入棧的c。
/** 不加鎖將任務壓棧,使用cas加自旋的方式,這也是道格大神的經典. */ final void pushStack(Completion c) { do {} while (!tryPushStack(c)); }
接下來是一些對輸出結果編碼的代碼。
//內部類,用于對null和異常進行包裝,從而保證對result進行cas只有一次成功。 static final class AltResult { // See above final Throwable ex; // null only for NIL AltResult(Throwable x) { this.ex = x; } } /** 空值用一個ex為null的AltResult表示 */ static final AltResult NIL = new AltResult(null); /** 使用上面的NIL完成任務,若任務已經被完成過,返回false */ final boolean completeNull() { return UNSAFE.compareAndSwapObject(this, RESULT, null, NIL); } /** 對空值進行編碼,使用NIL */ final Object encodeValue(T t) { return (t == null) ? NIL : t; } /** 使用t完成當前任務,t是null時使用NIL作為結果,否則使用t */ final boolean completeValue(T t) { return UNSAFE.compareAndSwapObject(this, RESULT, null, (t == null) ? NIL : t); } //對異常進行編碼,返回一個AltResult,其值ex取決于參數x, //若x為CompletionException則直接用x賦值ex, //否則用CoimpletionException包一層。 static AltResult encodeThrowable(Throwable x) { return new AltResult((x instanceof CompletionException) ? x : new CompletionException(x)); } /** 使用參數提供的異常的編碼結果完成任務,若result已非空,返回false */ final boolean completeThrowable(Throwable x) { return UNSAFE.compareAndSwapObject(this, RESULT, null, encodeThrowable(x)); } // 如果x非CompletionException,將它包裹成CompletionException返回。 //如果不是,則判斷,若r是AltResult且其ex就是參數x的值,則將r返回。 // 否則將x包裹成AltResult返回。 static Object encodeThrowable(Throwable x, Object r) { if (!(x instanceof CompletionException)) x = new CompletionException(x); else if (r instanceof AltResult && x == ((AltResult)r).ex) return r; return new AltResult(x); } // 給定一個Throwble x,一個Object r,使用上面的方法編碼的結果來嘗試完成。 final boolean completeThrowable(Throwable x, Object r) { return UNSAFE.compareAndSwapObject(this, RESULT, null, encodeThrowable(x, r)); } //如果x不是null,使用上面的encodeThrowable對x編碼的結果返回,否則若t是空, // 返回NIL,否則返回t。 Object encodeOutcome(T t, Throwable x) { return (x == null) ? (t == null) ? NIL : t : encodeThrowable(x); } static Object encodeRelay(Object r) { Throwable x; //對非空參數r進行判斷。 //若r是AltResult且具備非空的ex,且ex并不是CompletionException類型, //將ex包裝成CompletionException,并包裹成AltResult返回。 //其他情況直接返回r。 return (((r instanceof AltResult) && (x = ((AltResult)r).ex) != null && !(x instanceof CompletionException)) ? new AltResult(new CompletionException(x)) : r); } final boolean completeRelay(Object r) { //這段代碼的邏輯和上一個方法聯合去看,當前未完成的情況下,嘗試使用參數r完成。 //如果r是異常,嘗試將它包裝成CompletionException并外包一層AltResult。 //用這個AltResult完成。 return UNSAFE.compareAndSwapObject(this, RESULT, null, encodeRelay(r)); }
CompletableFuture本質也是一個Future,因此也會支持異步的阻塞的result獲取。因為在完成這個future時,為了便于處理和維護,使用了編碼的結果,固在讀取結果時,也要對結果進行解碼。
/** * 供future.get()使用。 */ private staticT reportGet(Object r) throws InterruptedException, ExecutionException { if (r == null) //參數r代表一個CompletableFuture的result,因為它會對異常和null進行編碼。 //故null可以視為get的中間被擾動的結果。 throw new InterruptedException(); if (r instanceof AltResult) { Throwable x, cause; //這一段很簡單,是AltResult,ex是空返回空。 if ((x = ((AltResult)r).ex) == null) return null; if (x instanceof CancellationException) //ex是取消異常,轉換后拋出。 throw (CancellationException)x; if ((x instanceof CompletionException) && (cause = x.getCause()) != null) //異常是包裝異常CompletionException,取出被包裝的異常拋出。 x = cause; throw new ExecutionException(x); } //result不是null也不能體現異常,強轉返回。 @SuppressWarnings("unchecked") T t = (T) r; return t; } //reportJoin方法相對簡單,因為join操作會一直等待,r能保證非空。 //對于非AltResult類型的r直接強轉返回,AltResult類型的處理與 //reportGet類似,但是不解CompletionException,直接拋出。 //此方法拋出的異常均不受檢。 private static T reportJoin(Object r) { if (r instanceof AltResult) { Throwable x; if ((x = ((AltResult)r).ex) == null) return null; if (x instanceof CancellationException) throw (CancellationException)x; if (x instanceof CompletionException) throw (CompletionException)x; throw new CompletionException(x); } @SuppressWarnings("unchecked") T t = (T) r; return t; }
相應的get和join方法實現。
public T get() throws InterruptedException, ExecutionException { Object r; return reportGet((r = result) == null ? waitingGet(true) : r); } public T join() { Object r; return reportJoin((r = result) == null ? waitingGet(false) : r); }
可以看出,get和join方法分別先調用reportGet,reportJoin,若得到的空結果,會繼續調用waitingGet方法,只是參數分別為true和false,waitingGet方法的實現需要先了解剩余的核心函數以及Completion子類,稍后再看。
一些與異步操作的準備:
/** * 標識是異步方法產生的任務的接口,對于異步行為的監控,debug,追蹤會很有用。 * 在jdk8的CompletableFuture實現中,它有三個直接實現類,AsyncRun, * AsyncSupply以及前面提到過的Completion。 */ public static interface AsynchronousCompletionTask { } //判斷是否使用ForkJoinPool的common線程池,在ForkJoinTask中持有該線程池的引用。 //判斷規則是可用cpu核數大于1. private static final boolean useCommonPool = (ForkJoinPool.getCommonPoolParallelism() > 1); //異步線程池,根據上述判斷,決定使用commonPool還是ThreadPerTaskExecutor, // 后者是一個對每一個任務都新建一個線程的low逼線程池。 private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); /** low逼線程池源碼,沒什么可說的 */ static final class ThreadPerTaskExecutor implements Executor { public void execute(Runnable r) { new Thread(r).start(); } } static Executor screenExecutor(Executor e) { if (!useCommonPool && e == ForkJoinPool.commonPool()) //判斷參數執行器(線程池的父接口,一般會傳入線程池)是否需要屏蔽, //如果參數就是ForkJoinPool.commonPool()并且經前面的系統判斷 //useCommonPool為false,則強制使用asyncPool。 return asyncPool; if (e == null) throw new NullPointerException(); //非空且通過驗證,返回參數e return e;
}
為異步做的這些準備很好理解,屏蔽不合理的線程池使用,在用戶提供的線程池,commonPool和ThreadPerTaskExecutor之中擇一,在后續的操作中需要使用它們。
還有兩個重要的核心函數,是道格大神的神作。
final void postComplete() { CompletableFuture> f = this; Completion h;//初始化f為this while ((h = f.stack) != null ||//1,f的棧非空 (f != this && (h = (f = this).stack) != null)) {//2 f的棧為空且不是this,重置 CompletableFuture> d; Completion t; if (f.casStack(h, t = h.next)) {//3 h出棧 if (t != null) {//4 出棧的h不是最后一個元素,最后一個元素直接執行7即可,減少一次循環cas競態 if (f != this) {//f不是this pushStack(h);//5 將f剛出棧的h(頂)入this的棧(頂) continue; } h.next = null; //6 detach 幫助gc } //tryFire參數為NESTED,即-1,這是它唯一一次使用。 f = (d = h.tryFire(NESTED)) == null ? this : d;//7 f棧的最后一個元素或者就是this棧中的元素 } } }
這寥寥數行代碼的含金量不可小覷。它應在將要完成時調用,很明顯,它會將當前CompletableFuture的棧以及傳遞依賴的其他CompletableFuture的棧清空。為了便于解釋,在相應的代碼上打出了編號,下面詳細分析。
調用該方法,首先進入1,此時f是當前CompletableFuture,h是它的stack,滿足不為空的判斷,進入3.
到達3時,將棧頂Completion h出棧,一般除非并發多個線程對同一個CompletableFuture調用postComplete,否則一定會成功并到達4。若出現多個線程調用,cas失敗,則重新循環。
到達4后,若發現f的棧已空,則直接進入7,否則判斷f是否為當前CompletableFuture,若是,則進行6,取消h和t的關聯,若不是則進入5,將h(f中剛剛移除的棧頂)壓入當前Completable的棧并重新循環。
顯然,只要處理當前CompletableFuture的棧,就一定會執行7,只要處理的是另一個CompletableFuture的棧,就會將其出棧,然后壓入當前CompletableFuture的棧。
在7處,會嘗試執行棧頂的Completion的tryFile方法,它會返回一個可能為null的CompletableFuture,若非空,則賦給f,否則將this賦給f。
所以這段方法的真實執行流程:當前CompletableFuture的棧中元素逐個出棧并tryFile,發現新的CompletableFuture,將它的元素反向壓入本CompletableFuture的棧,壓入結束后,繼續對棧中元素逐個出棧并tryFire,發現非空CompletableFuture則繼續上述過程。直到本CompletableFuture的棧中不再有元素(此時tryFire返回的CompletableFuture棧也是空的)為止。
膜拜道格大神的同時,順便點一下,這似乎是一種避免遞歸的方式。只不過tryFire返回的CompletableFuture中的棧元素將會反向執行。
/* 遍歷棧并去除死亡任務/
final void cleanStack() { for (Completion p = null, q = stack; q != null;) {//初始條件,q指向null時終止。 Completion s = q.next;//循環內第一行,q永遠指向棧頂,s永遠指向棧頂第二個元素或者null if (q.isLive()) {//a只要q存活,就將p指向q,并將q指向s p = q; q = s; } else if (p == null) {//b q不存活,p是null,兩種可能,從未見到存活的節點,或執行過最后的重啟 casStack(q, s);/將q出棧 q = stack;//變量q重新指向新的棧頂。 } else { p.next = s;//q已死亡,且當前已經找到過存活的元素。p指向q的下一個元素s,從而將q出棧 if (p.isLive())//c判斷p是否存活,而p只能是null或者最近一個存活的Completion q = s;//6.q前進 else {//4 p = null; //d 重新將p置null并將q指向當前的棧,重啟循環。 q = stack; } } } }
為了讓這段代碼的說明更加清晰,不妨舉個簡單的例子說明。
假定當前CompletableFuture的棧中有1-9個元素,其中14568在調用cleanStack方法時已死亡,在執行過程中,也出現方法執行過程中出現死亡的狀態。
進入循環,p為null,q指向1,滿足循環條件,開始第一輪循環。
第一輪循環進入后,s指向2,p為null,q指向1,是個死亡對象,因此在第一個判斷條件a處未能通過,b判斷條件為真,q被移除,循環結束,此時p為null,q指向2,棧變為2-9.
第二輪循環進入,s指向3,p為null,q指向2,是個存活對象,進入a,循環結束,p指向2,q指向3。棧依舊為2-9.
第三輪循環進入,s指向4,p為2,q指向3,是存活對象,進入a,循環結束,p指向3,q指向4,棧保持2-9不變。
第四輪循環進入,s指向5,p為3,q指向4,是個死亡對象,p非空且存活,進入c,則p保持為3,3的next指向5,q指向5.循環結束,棧變為2356789.
第五輪循環進入,s指向6,p指向3,q指向5,是個死亡對象,p非空且存活,進入c,p保持為3,3的next指向6,q指向6,循環結束,棧變為236789.
第六輪循環進入,s指向7,p指向3,q指向6,是個死亡對象,假定此時3死亡,則3的next指向7,進入d分支,p為null,q為2,棧為23789.
第七輪循環進入,s指向3,p為null,q指向2,是個存活對象,p指向2,q指向3,棧依舊為23789.
第八輪循環進入,s指向4,p指向2,q指向3,是個死亡對象,p非空且存活,進入c,則p保持為2,q指向7,3的next指向7,棧變2789.
第九輪進入,s指向8,p指向2,q指向7,是個存活對象,進入a分支,p變為7,q變為8,棧保持2789.假定此步之后2死亡,但此時p已經指向7.
第十輪進入,s指向9,p指向7,q指向8,是個死亡對象,p當前指向7且存活,所以盡管2不存活,仍舊進入分支c,p保持為7,q指向9,7的next指向9.棧為279.
第十一輪,s為null,p指向7,q指向9,是個存活對象,則進入a分支,p變為9,q變為null,棧保持279.
因q為null,循環終止。棧經過清理只剩下279三個元素,其中2因為巧合而死亡且未被清理。
下面回到Completion,Completion是一個抽象類,前面已經簡單展示它的源碼,它的子類如下:
可以看到有三個直接子類,CoCompletion,Signaller和UniCompletion。UniCompletion又有若干子類,它們分別作為一些CompletionStage中聲明方法的實現工具,很明顯,道格大神在此處大量使用了策略模式。
先來簡單看一下CoCompletion的實現:
static final class CoCompletion extends Completion { //CoCompletion完全委托給base執行。 BiCompletion,?,?> base; CoCompletion(BiCompletion,?,?> base) { this.base = base; } final CompletableFuture> tryFire(int mode) { BiCompletion,?,?> c; CompletableFuture> d; if ((c = base) == null || (d = c.tryFire(mode)) == null) //base未指定,或base的tryFire返回null,則返回null。 return null; base = null; // 解除關聯,再isLive判斷為死亡。 //返回的d就是base的tryFire返回的非空CompletableFuture return d; } final boolean isLive() { BiCompletion,?,?> c; //存活標準,base非空且base的dep非空。 return (c = base) != null && c.dep != null; } }
CoCompletion雖然是Completion的直接子類,但它依賴了BiCompletion,且BiCompletion是UniCompletion的直接子類,先來看UniCompletion.
abstract static class UniCompletionextends Completion { Executor executor;//用來執行任務的執行器 CompletableFuture dep; //要完成的依賴CompletableFuture CompletableFuture src; //作為行為源的CompletableFuture UniCompletion(Executor executor, CompletableFuture dep, CompletableFuture src) { this.executor = executor; this.dep = dep; this.src = src; } final boolean claim() { Executor e = executor; if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {//1 //compareAndSetForkJoinTaskTag是ForkJoinTask的方法,利用cas,保證任何一種情況下,該行為只能執行一次。 if (e == null) //不精確的說法是同步調用返回true,異步調用返回false,然后在線程池中執行。 //有e代表要在執行器中執行,盡管大多數情況下e都是線程池實例,會異步運行任務。但對于Executor來說,完全可以實現在同一個線程執行。 return true;//2. //對于3這行代碼,道格大神注釋就寫了個disable,為此我翻了大量代碼,發現根本過不了上面cas這一關,所以個人有兩個理解: //1.對于當前Completion而言,它的線程池只能用來做一次事,在claim之后立即置空,盡管此時還沒有執行action,也不允許當前Completion使用它做別的事了。 //2.減少了一個指向該線程池的引用,線程池也有被gc的時候吧。就算不gc,關閉虛擬機或者dump的時候也能少做點事。 executor = null; // 3. e.execute(this);//使用該線程池異步執行,回憶上面Completion的聲明,它實現了runnable,在run方法中tryFire(ASYNC),參數ASYNC是正數。 } return false; } final boolean isLive() { return dep != null; } }
盡管UniCompletion本身代碼不多,但是有關代碼卻很繞,后面會從CompletableFuture調用開始說明一個完整的工作流,作者本來有幾次都已經十分艱難的“確定發現問題”,寫出了“問題”,但最終還是在描述過程中啟動大腦自我否定,不得不佩服道格大神強大的邏輯和大腦。
很明顯,UniCompletion是一個可以擁有執行器的Completion,它是兩個操作的結合,dep為要最終執行的依賴操作,src為來源CompletableFuture,tryFire沒有默認實現,它的子類分別根據不同情況實現了該方法,實現的方式依舊是優雅的策略模式。
claim方法要在執行action前調用,若claim方法返回false,則不能調用action,原則上要保證action只執行一次。
claim的意思是聲稱,開個玩笑,在美劇行尸走肉第四季,有一伙武裝分子解決為了解決內部分配問題的提出了一個辦法,對任何事物只看誰先喊一句”claimed“,代表”我要了“。調用claim方法和稍后運行action的動作發生在一個線程,因此需要該線程嘗試去claim這個action,claim成功則執行,claim不成功則不執行。
但在提供Executor的前提下,claim除了聲明以外,還會直接在方法內使用該executor執行tryFire,間接地執行action,并返回false,避免調用者也執行action,因為有cas的效果,多次claim只有第一次可能返回true。
接下來看BiCompletion,它也是一個抽象類,不同在于它有兩個源,也就是它的成員dep要等到另外兩個成員CompletableFuture(src,snd)完成,具體的依賴關系要看子類實現。
abstract static class BiCompletionextends UniCompletion { CompletableFuture snd; // 第二個源action BiCompletion(Executor executor, CompletableFuture dep, CompletableFuture src, CompletableFuture snd) { super(executor, dep, src); this.snd = snd; } }
BiCompletion有多個實現類,看名稱可以看到Apply,Accept,Run等字眼,前面已經討論過相應的語義。
以OrApply為例
static final class OrApplyextends BiCompletion { Function super T,? extends V> fn; OrApply(Executor executor, CompletableFuture dep, CompletableFuture src, CompletableFuture snd, Function super T,? extends V> fn) { //構造函數,多傳一個函數,該函數就是dep對應的action。 super(executor, dep, src, snd); this.fn = fn; } //tryFire父類沒有實現 final CompletableFuture tryFire(int mode) { CompletableFuture d; CompletableFuture a; CompletableFuture b; if ((d = dep) == null ||//沒有dep,則沒有相應的依賴行為,已經執行過的dep會是null。 //執行orApply返回false,則返回null。最后一個參數僅當mode是ASYNC(只有它大于1)時會是this !d.orApply(a = src, b = snd, fn, mode > 0 ? null : this)) //到此可能是運行過,或者不滿執行fn的條件,返回null。 return null; //前面dep不是null,執行orApply也成功了,則解除引用關聯,下次運行會直接返回null,也不影響gc。 //回憶前面看過的核心函數postComplete,會對CompletableFuture中棧上的所有Completion進行tryFire, //返回非null則進行類似遞歸的操作,很明顯,在調用postComplete //方法時,dep為null會返回一個null,避免了再次tryFire。 dep = null; src = null; snd = null; fn = null; //正常運行結束,調用dep的postFire并返回。 return d.postFire(a, b, mode); } }
orApply方法定義在CompletionFuture。前面沒有敘述。它不是作者稱為的”核心函數“(即各種Completion都能使用到)。
finalboolean orApply(CompletableFuture a, CompletableFuture b, Function super R, ? extends T> f, OrApplyc) { Object r; Throwable x; if (a == null || b == null || //為r賦值用于后續的計算,因為是or,r優先取第一個,第一個源action未完成的情況下再取第二個。 ((r = a.result) == null && (r = b.result) == null) || f == null) //首先檢測兩個源action,若a和b均未完成,則說明依賴dep不可被執行,返回false。 return false; //僅當當前(dep)未完成(result為null)時,可進行完成工作。 tryComplete: if (result == null) { try { //前面說過,c不為null說明是異步執行,需要先去嘗試claim這個action。 if (c != null && !c.claim()) //異步且claim不成功,返回false。 return false; if (r instanceof AltResult) { if ((x = ((AltResult)r).ex) != null) { //如果r表示異常,調用completeThrowable核心函數并結束代碼塊,直接返回true。 completeThrowable(x, r); break tryComplete; } //第一個非空的action(a或b)結果代表異常,但ex是null,則將r置為null并返回true。 r = null; } //r不代表異常結果,直接強轉,用該結果作為action的參數,執行action,用結果作為當前的result。出現異常則進入catch塊。 @SuppressWarnings("unchecked") R rr = (R) r; completeValue(f.apply(rr)); } catch (Throwable ex) { //上述代碼出現異常,調用completeThrowable完成dep(this) completeThrowable(ex); } } return true; }
正常運行結束還會調用dep的postFire,它也位于CompletableFuture中,但它只供 BiCompletion在tryFire成功之后才可使用,該方法源碼如下:
final CompletableFuturepostFire(CompletableFuture> a, CompletableFuture> b, int mode) { //對于ab兩個源,先處理b,后處理a if (b != null && b.stack != null) { //b存在且b的棧還有元素 if (mode < 0 || b.result == null) //當為NESTED(只有它的值是-1)時,或者b沒有結果時,對b進行清棧。調用postFire意味著d執行tryFire成功, //即d獲得了結果,而這前提是ab之一已執行成功(orApply的含義),所以ab可能是其一完成。 b.cleanStack(); else //非NESTED,則對b進行postComplete,該方法內部又會對b的棧上的每一個Completion執行tryFire,而且用NESTED模式。 b.postComplete(); } //接下來對a直接進行postFire,并沿用mode。 return postFire(a, mode); }
對a進行postComplete的方法如下:
final CompletableFuturepostFire(CompletableFuture> a, int mode) { if (a != null && a.stack != null) { //棧非空 if (mode < 0 || a.result == null) //類似上面的邏輯,是NESTED模式時或者a未完成時,對a進行清棧,否則對a執行postComplete. a.cleanStack(); else a.postComplete(); } //處理a之后,處理當前(即dep) if (result != null && stack != null) { //有結果且棧非空 if (mode < 0) //NESTED模式,直接返回this。 return this; else //非NESTED模式,執行postComplete,其中會對d的棧中所有Completion進行tryFire(NESTED), //并在每一個tryFire返回的CompletableFuture逆棧執行同一樣操作,參見上面的源碼。 postComplete(); } return null; }
以上是全部與OrApply的實現有關的源碼,下面來看一看OrApply的應用,再簡單梳理一下流程。
在CompletableFuture中有三個有關的方法:
可以看到三個方法的簽名和調用信息,這三個方法均是實現自CompletionStage。關于方法的字意和大致邏輯的推測方法前面已分析。
public CompletableFuture applyToEither( CompletionStage extends T> other, Function super T, U> fn) { //直接調用orApplyStage,不指定線程池。 return orApplyStage(null, other, fn); } public CompletableFuture applyToEitherAsync( CompletionStage extends T> other, Function super T, U> fn) { //調用orApplyStage方法,外部不提供線程池,使用asyncPool,關于asyncPool前面已分析。 return orApplyStage(asyncPool, other, fn); } public CompletableFuture applyToEitherAsync( CompletionStage extends T> other, Function super T, U> fn, Executor executor) { //調用orApplyStage方法,但對外面傳入的線程池進行屏蔽,條件符合則使用,不符合則更換,屏蔽原則前面已分析。 return orApplyStage(screenExecutor(executor), other, fn); }
可見三個方法均使用了orApplyStage方法,只是在參數上有所不同。再來看orApplyStage方法。
private CompletableFutureorApplyStage( Executor e, CompletionStage o, Function super T, ? extends V> f) { CompletableFuture b; if (f == null || (b = o.toCompletableFuture()) == null) //要執行的函數未提供,或者參數o轉換的CompletableFuture也是null,則拋出空指針。 throw new NullPointerException(); //新建了一個dep,后面將它返回,故直接調用實現自CompletionStage的方法不用考慮返回空的問題,可以鏈式調用。 CompletableFuture d = new CompletableFuture (); //如果指定了線程池,直接進入if。未指定線程池,首先嘗試調用orApply方法,并以this和b作參數。 //前面分析過,若條件滿足,即this和b有一個是完成態,則會立即執行f,結果或異常作為d的結果。 //d.orApply的最后一個參數是null(c),說明是同步操作,不會進行c.claim操作。 if (e != null || !d.orApply(this, b, f, null)) { //指定了線程池,或者嘗試d.orApply條件不滿足,轉為異步。 //構建OrApply對象壓入Completion棧。 OrApply c = new OrApply (e, d, this, b, f); orpush(b, c); //壓棧后再次嘗試同步調用一次tryFire,前面分析過,tryFire成功會最終調用相應的cleanStack,postComplete等操作, //將死亡的Completion(各子類有不同的判定,CoCompletion判定base是null,有些判斷dep是null,而完成一般會把dep置null) //從棧上移除。 c.tryFire(SYNC); } return d; } public CompletableFuture toCompletableFuture() { //直接返回this return this; } final void orpush(CompletableFuture> b, BiCompletion,?,?> c) { if (c != null) { //循環條件,b不存在或未完成且同時當前CompletableFuture未完成。有任何一個完成則終止,若無完成,則執行下面的代碼將任務入this和b的棧。 while ((b == null || b.result == null) && result == null) { //將c壓入當前CompletableFuture棧并退出循環。 if (tryPushStack(c)) { if (b != null && b != this && b.result == null) { //存在b,b不是當前,b未完成時。嘗試將c封裝成CoCompletion并壓入b的棧,前面說過 //這個壓入b棧的q完全依賴于c,并使用c的運行結果。 Completion q = new CoCompletion(c); //內循環,參數外循環說明。 while (result == null && b.result == null && !b.tryPushStack(q)) lazySetNext(q, null); // clear on failure } break; } //到此說明c壓入當前棧失敗,則將c的next恢復為null。 lazySetNext(c, null); // clear on failure } } }
簡單梳理OrApply這一條線的流程,其他線邏輯類似。
當使用Completable的applyToEitherAsync/applyToEither時,將進入這一條線的代碼執行,CompletableFuture在初步驗參后,會封裝一個d用于表示結果的CompletableFuture,稍后將會用它作為返回值。隨后根據入參不同而進入不停的邏輯。
同步的情況,即未提供Executor,首先就嘗試調用它的d.uniApply方法,若此時當前CompletableFuture或參數中的另一個stage已完成,則用完成的結果直接執行用戶指定的action并對d的結果進行賦值,并進一步完成d的后續清棧和postComplete(1);若此時當前的Completable或另一個stage未完成,則不滿足執行action的條件,將當前Completable作為第一個source,另一個stage作為第二個source,封裝成一個OrApply并壓當前CompletableFuture和另一個stage的棧(2),隨后立即以同步方式調用它的tryFire(1)。
異步的情況,直接封裝OrApply對象,將由線程池間接調用tryFire(3),進一步調用orApply方法,因為是異步,即使滿足了前面的條件(ab之一正?;虍惓M瓿桑?,依舊需要進行claim,claim失敗則不會執行action。claim成功,執行action出現異常,則用異常來完成這個action。
以上三種情況最終都會執行action,標注了(1)和(3)是很明確的兩種情況。
任何一個CompletableFuture完成后,都會根據mode進行后續處理,其實盡管每個Completion都具備一個next指針,但每一個Completion的完成均不依賴于棧中的其他Completion,僅在cleanStack,壓棧,postComplete使用了該棧的結構?,F在來回答前面分析時發現的兩個問題。
1.當前CompletableFuture在完成后,執行postComplete,會將它自身的棧中completion出棧并執行action,若要產生新的CompletableFuture,則將它的棧反向壓入自身的棧,然后重復執行出棧-執行的操作。反向壓棧有問題嗎?答案是沒有。因為棧中的每一個Completion在執行上互不影響,它們的順序只影響到cleanStack和postComplete的處理順序。CompletableFuture和它的棧元素產生的CompletableFuture彼此間有順序要求,但對同一個CompletableFuture的棧內的Completion元素彼此間沒有順序要求,決定他們順序的是對源CompletionFuture調用orApply,thenApply等等方法的順序,后續運行也完全獨立。只不過在源CompletableFuture進行postComplete時,執行的順序將會與原本的”先來后到“相反。
2.cleanStack到一半,p指向的Completion依舊存活,位于p以上的Completion已執行完畢,那么不會重新開始循環,p之前的死亡Completion會留在棧中。這也是為什么前面使用OrApply來解釋這個問題的原因,因為很可能就不存在這個問題。根據前面的源碼,僅有postComplete觸發的tryFire會使用NESTED(-1)模式,只有NESTED模式下,或者源CompletableFuture的result為null(未完成)的情況下執行postFire才會進入到cleanStack,否則會進入postComplete,后者會將所有元素出棧并執行存活元素,顯然不存在要考慮存活的問題。而只有or且為BiCompletion的情況下,才可能出現兩個源之一實際并未完成,這樣在非NESTED模式下調用cleanStack方法。
可見2的問題是存在的。但它對于整體的運行結果是無影響的,后續該source執行完畢,調用自身的postComplete時,將已死亡的Completion出棧并tryFire,會發現諸如”dep=null"等情況,直接返回null,則postComplete方法中的f會保持指向this并繼續迭代下一個棧元素。
目前關于2中提到的cleanStack的調用只出現在UniCompletion成功后調用postFire時依賴模式和result運行。其實還有一種情況,就是前面提了一次的,屬于future接口的get方法,以及類似的join方法。
前面提到,get和join方法都會在獲取不到結果是按條件輪循watingGet方法,下面來看waitingGet方法。
private Object waitingGet(boolean interruptible) { Signaller q = null;//信號器 boolean queued = false;//是否入隊 int spins = -1;//自旋次數 Object r;//結果引用 //循環條件是只等待result,內部有根據擾動決定的break while ((r = result) == null) { //自旋次數只有第一次進來是負值,后續只能是0或其他正數。 if (spins < 0) //自旋次數,多處理器下初始化為16,否則為0,即不自旋。設置值后此次循環結束。 spins = (Runtime.getRuntime().availableProcessors() > 1) ? 1 << 8 : 0; //第二次循環時才會判斷自旋次數。只要spins大于0就繼續循環,直到達到0為止再執行下面的else代碼。 else if (spins > 0) { //僅當下一個種子數不小于0時,減小一次自旋次數。nextSecondarySeed是Thread類中使用@Contended注解標識的變量, //這與傳說中的偽共享有關。 if (ThreadLocalRandom.nextSecondarySeed() >= 0) --spins; } //停止自旋后的第一輪循環,result依舊是null,則對q進行初始化,關于Signaller后續再講。 else if (q == null) q = new Signaller(interruptible, 0L, 0L); //初始化q后的下一輪循環(停止自旋后的第二輪),queued是false,將上一輪循環初始化的q壓入棧。 else if (!queued) queued = tryPushStack(q); //停止自旋后的若干次循環(上一步可能壓棧失敗,則下一輪自旋會再次壓棧,直到成功)后,判斷是否可擾動。 else if (interruptible && q.interruptControl < 0) { //擾動信號匹配,將q的有關字段全部置空,順帶清一下棧,返回null。 q.thread = null; //這個清棧的過程,細看上面的解釋還有有關的源碼,可能會發出一個疑問,cleanStack只能清除isLive判斷false的Completion, //但目前的實現,基本上都只能在dep為null,base為null等僅當dep執行完成的情況發生,而dep完成的情況是當前CompletableFuture的 //result不是null,而方法運行到此,很明顯result必然是null,那么還有必要清棧嗎? //答案是必要的,首先將來也許能出現存活或死亡狀態與source的result無關的Completion,那么此處清一下棧也是幫助后面的工作。 //其次,剛才壓入棧的q在thread指向null時即已死亡,它也必須要進行清除。 cleanStack(); return null; } else if (q.thread != null && result == null) { //q關聯的線程存在,即q存活,且依舊沒有執行完畢,使用ForkJoinPool的阻塞管理機制,q的策略進行阻塞。 try { ForkJoinPool.managedBlock(q); } catch (InterruptedException ie) { //阻塞是可以擾動的,此時會將q的擾動控制信號設置為-1,則下一次循環時將可能進入上一個else if。 q.interruptControl = -1; } } } //前面的循環沒有break,能執行到此,只有result獲得非null值的情況。 if (q != null) { //若q不是null,說明沒有在自旋階段獲取到result,需要對它進行禁用。 q.thread = null; if (q.interruptControl < 0) { if (interruptible) //可擾動且有擾動信號,則說明擾動后未能進入上面帶有cleanStack的那個else if, //可能是恰好在這次循環開始時獲取到了非空result,從而退出循環,也可能是參數interruptible為假, //在外部擾動了當前線程后,依舊等到了result。 //只要發生了擾動,就將結果置null,外面調用者如果是join,可以報出擾動。 r = null; // report interruption else //如果不可擾動,則中斷當前線程(創建q的線程)。 Thread.currentThread().interrupt(); } } //當前future已經有結果,進行postComplete邏輯并返回r。 postComplete(); return r; }
根據該方法的注釋,waitingGet方法只會有兩個結果,null(可擾動并且擾動了)和原始的result。而get方法可擾動,也即可返回null,join方法不可擾動,只能等待結束或拋出異常。
waitingGet方法中出現了第三個也是最后一個Completion的直接子類Signaller,前面沒有對它進行介紹,不過它也只使用在此處,因此可以一并介紹。
static final class Signaller extends Completion implements ForkJoinPool.ManagedBlocker { long nanos; // 計時的情況下,要等待的時間。 final long deadline; // 計時的情況下指定不為0的值 volatile int interruptControl; // 大于0代表可擾動,小于0代表已擾動。 volatile Thread thread;//持有的線程 Signaller(boolean interruptible, long nanos, long deadline) { this.thread = Thread.currentThread(); this.interruptControl = interruptible ? 1 : 0;//不可擾動,賦0 this.nanos = nanos; this.deadline = deadline; } final CompletableFuture> tryFire(int ignore) {//ignore無用 Thread w; //Signaller自持有創建者線程,tryFire只是單純喚醒創建它的線程。 if ((w = thread) != null) { thread = null;//釋放引用 LockSupport.unpark(w);//解除停頓。 } //返回null,當action已執行并進行postComlete調用時,f依舊指向當前CompletableFuture引用并解除停頓。 return null; } public boolean isReleasable() { //線程是空,允許釋放。這可能是某一次調用本方法或tryFire方法造成。 if (thread == null) return true; if (Thread.interrupted()) { //如果調用isReleasable方法的線程被擾動了,則置擾動信號為-1 int i = interruptControl; interruptControl = -1; if (i > 0) //原擾動信號是”可擾動“,則是本次調用置為”已擾動“,返回true。 return true; } //未定時(deadline是0)的情況只能在上面釋放,定時的情況,本次計算nanos(deadline-System.nanoTime()) //或上次計算的nanos不大于0時,說明可以釋放。 if (deadline != 0L && (nanos <= 0L || (nanos = deadline - System.nanoTime()) <= 0L)) { //只要可釋放,將創建者線程的引用釋放。下次調用直接返回true,線程運行結束銷毀后可被gc回收。 thread = null; return true; } //仍持有創建者線程,調用此方法的線程未擾動或當前擾動不是第一次,未定時或不滿足定時設置的一律返回false。 return false; } public boolean block() { //block方法 if (isReleasable()) //判斷可釋放,直接return true。 return true; //判斷deadline是0,說明不計時,默認park。 else if (deadline == 0L) LockSupport.park(this); else if (nanos > 0L) //計時情況,park指定nanos。 LockSupport.parkNanos(this, nanos); //睡醒后再次返回isReleasable的結果。 return isReleasable(); } //創建者線程引用被釋放即代表死亡。 final boolean isLive() { return thread != null; } }
Signaller是一個Completion的直接子類,同時實現了ForkJoinPool的內部接口ManagedBlocker,這使得它可以在當ForkJoinPool出現大量線程阻塞堆積時避免饑餓。
Signaller的作用是持有和釋放一個線程,并提供相應的阻塞策略。
前面提到的waitingGet方法創建了一個Signaller(interruptible, 0L, 0L),類似的,可以看到timedGet方法使用Signaller(true, nanos, d == 0L ? 1L : d)來進行阻塞的管理,管理的方法依賴ForkJoinPool內部的
ForkJoinPool.managedBlock(q)來實現,而這用到了被Signaller實現的ForkJoinPool.ManagedBlocker,managedBlock方法源碼如下。
//ForkJoinPool的managedBlock方法。 public static void managedBlock(ManagedBlocker blocker) throws InterruptedException { ForkJoinPool p; ForkJoinWorkerThread wt; Thread t = Thread.currentThread();//調用此方法的線程,即前面的Signaller的創建者線程。 if ((t instanceof ForkJoinWorkerThread) && (p = (wt = (ForkJoinWorkerThread)t).pool) != null) { //調用managedBlock方法的線程是ForkJoinWorkerThread,則它可運行在ForkJoinPool中。此處要求內部持有pool的引用。 WorkQueue w = wt.workQueue; //循環,只要判斷blocker(即Signaller)不可釋放。 while (!blocker.isReleasable()) { //嘗試用ForkJoinPool對當前線程的工作隊列進行補償。 //tryCompensate方法會嘗試減少活躍數并可能創建或釋放一個準備阻塞的worker線程, //它會在發生競態,臟數據,松弛或池終止時返回false。 //關于ForkJoinPool的詳情多帶帶準備文章。 if (p.tryCompensate(w)) { try { //補償成功,不停地對線程池嘗試先isReleasable再block,任何一個方法返回true則終止循環。 do {} while (!blocker.isReleasable() && !blocker.block()); } finally { //出現任何異常,或循環終止時,控制信號加上一個活躍數單元,因為前面通過補償才會進入循環,已減少了一個單元。 U.getAndAddLong(p, CTL, AC_UNIT); } break; } } } else { //當前線程不是ForkJoinWorkerThread或不持有ForkJoinPool的引用。連續先嘗試isReleasable再嘗試block,直到有一者返回true為止。 do {} while (!blocker.isReleasable() && !blocker.block()); } }
關于ForkJoinPool本文不做額外介紹,只列舉這一個方法,到此為止,對于CompletableFuture的主要接口(繼承自CompletionStage)和實現已經描述完畢(其實只過了一個特殊案例的接口,但是前面提到過,其他接口的邏輯和實現方式類似,無非就是run,active,apply的更換,或either,both,then,when等,有上面的基礎,再憑借規則推測語義,源碼并不難理解。
CompletableFuture還有一些獨立聲明的公有方法,源碼也有些非常值得借鑒的地方,如allOf,anyOf兩個方法。
//anyOf方法,返回一個CompletableFuture對象,任何一個cfs列表中的成員進入完成態(正常完成或異常),則它也一并完成,結果一致。 public static CompletableFutureanyOf(CompletableFuture>... cfs) { //直接調用orTree return orTree(cfs, 0, cfs.length - 1); } //allOf方法,當所有cfs列表中的成員進入完成態后完成(使用空結果),或有任何一個列表成員異常完成時完成(使用同一個異常)。 public static CompletableFuture allOf(CompletableFuture>... cfs) { //直接調用andTree return andTree(cfs, 0, cfs.length - 1); } static CompletableFuture andTree(CompletableFuture>[] cfs, int lo, int hi) { //聲明一個后續返回的dep CompletableFuture d = new CompletableFuture (); if (lo > hi) //驗參 d.result = NIL; else { CompletableFuture> a, b; //折半驗證參數并歸并。每相鄰的兩個成員會在一個遞歸中生成另一個"d", //總量奇數的最后一個多帶帶表示這個d。 int mid = (lo + hi) >>> 1; if ((a = (lo == mid ? cfs[lo] : andTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : andTree(cfs, mid+1, hi))) == null) throw new NullPointerException(); //調用d.biRelay的中繼方法嘗試完成。 if (!d.biRelay(a, b)) { //不滿足完成條件,生成一個中繼并壓棧,再次嘗試同步完成。若不滿足條件,ab任何一個完成后都會再間接調用它的tryFire。 BiRelay,?> c = new BiRelay<>(d, a, b); a.bipush(b, c);//除非ab均完成,否則bipush要進ab兩者的棧。 c.tryFire(SYNC); } } return d; } //biRelay方法,有前面的基礎,很簡單,只要ab之一任何一個未完成則返回false,都完成且dep未完成則進入相應的正常異常完成策略, //不論dep是否已完成,只要ab均已完成,則返回true boolean biRelay(CompletableFuture> a, CompletableFuture> b) { Object r, s; Throwable x; if (a == null || (r = a.result) == null || b == null || (s = b.result) == null) return false; //biRelay是嘗試根據兩個CompletableFuture完成dep,因為三個complete*方法均已做到原子性,也沒有action要執行,因此它不需要claim。 if (result == null) { if (r instanceof AltResult && (x = ((AltResult)r).ex) != null) completeThrowable(x, r); else if (s instanceof AltResult && (x = ((AltResult)s).ex) != null) completeThrowable(x, s); else //正常情況,用null完成。 completeNull(); } return true; } //壓入棧的BiRelay static final class BiRelay extends BiCompletion { // for And BiRelay(CompletableFuture dep, CompletableFuture src, CompletableFuture snd) { super(null, dep, src, snd); } final CompletableFuture tryFire(int mode) { CompletableFuture d; CompletableFuture a; CompletableFuture b; if ((d = dep) == null || !d.biRelay(a = src, b = snd)) //已經完成過,或者未完成,本次也不能完成,返回一個null return null; //BiRelay通過BiCompletion間接繼承了UniCompletion,因此dep取null代表死亡。 //這樣也能規避錯誤的tryFire,如當它已被完成,持有的dep引用置null,當d進行postFire的postComplete時會保持f=this并持續出棧 //dep未完成時清棧也能有效移除已完成的任務。 src = null; snd = null; dep = null; return d.postFire(a, b, mode); } } //orTree類似上面的andTree,有一個完成或異常,就用它的結果或異常作為返回的CompletableFuture的結果或異常。 static CompletableFuture orTree(CompletableFuture>[] cfs, int lo, int hi) { CompletableFuture d = new CompletableFuture (); if (lo <= hi) { CompletableFuture> a, b; int mid = (lo + hi) >>> 1; //同上 if ((a = (lo == mid ? cfs[lo] : orTree(cfs, lo, mid))) == null || (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] : orTree(cfs, mid+1, hi))) == null) throw new NullPointerException(); //同上,下面簡述orRelay和OrRelay if (!d.orRelay(a, b)) { OrRelay,?> c = new OrRelay<>(d, a, b); //除非ab任何一個已完成,否則orpush要進棧,且只進一個棧。 a.orpush(b, c); c.tryFire(SYNC); } } return d; } //很明顯,orRelay就是兩個CompletableFuture的或關系中繼者。 final boolean o
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77760.html
摘要:前言在前面的文章和響應式編程中提到了和后者毫無疑問是一個線程池前者則是一個類似經典定義的概念官方有一個非常無語的解釋就是運行在的一個任務抽象就是運行的線程池框架包含和若干的子類它的核心在于分治和工作竅取最大程度利用線程池中的工作線程避免忙的 前言 在前面的文章CompletableFuture和響應式編程中提到了ForkJoinTask和ForkJoinPool,后者毫無疑問是一個線程...
摘要:一和并發包中的和主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當于的使用。寫鎖與讀鎖之間互斥,一個線程在寫時,不允許讀操作。的注意事項不支持重入,即不可反復獲取同一把鎖。沒有返回值,也就是說無法獲取執行結果。 一、Lock 和 Condition Java 并發包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當于 synchron...
摘要:前言在前面的三篇文章中先后介紹了框架的任務組件體系體系源碼并簡單介紹了目前的并行流應用場景框架本質上是對的擴展它依舊支持經典的使用方式即任務池的配合向池中提交任務并異步地等待結果毫無疑問前面的文章已經解釋了框架的新穎性初步了解了工作竊取 前言 在前面的三篇文章中先后介紹了ForkJoin框架的任務組件(ForkJoinTask體系,CountedCompleter體系)源碼,并簡單介紹...
摘要:前言在前面的文章框架之中梳理了框架的簡要運行格架和異常處理流程顯然要理解框架的調度包含工作竊取等思想需要去中了解而對于的拓展和使用則需要了解它的一些子類前文中偶爾會提到的一個子類直譯為計數的完成器前文也說過的并行流其實就是基于了框架實現因此 前言 在前面的文章ForkJoin框架之ForkJoinTask中梳理了ForkJoin框架的簡要運行格架和異常處理流程,顯然要理解ForkJoi...
摘要:方法接收的是的實例,但是它沒有返回值方法是函數式接口,無參數,會返回一個結果這兩個方法是的升級,表示讓任務在指定的線程池中執行,不指定的話,通常任務是在線程池中執行的。該的接口是在線程使用舊的接口,它不允許返回值。 簡介 作為Java 8 Concurrency API改進而引入,本文是CompletableFuture類的功能和用例的介紹。同時在Java 9 也有對Completab...
閱讀 1378·2023-04-25 18:34
閱讀 3437·2021-11-19 09:40
閱讀 2823·2021-11-17 09:33
閱讀 2934·2021-11-12 10:36
閱讀 2823·2021-09-26 09:55
閱讀 2652·2021-08-05 10:03
閱讀 2511·2019-08-30 15:54
閱讀 2860·2019-08-30 15:54