摘要:這個例子想要說明兩個事情中以為結尾的方法將會異步執行默認情況下即指沒有傳入的情況下,異步執行會使用實現,該線程池使用一個后臺線程來執行任務。這個例子展示了如何使用一個固定大小的線程池來實現大寫操作。
前言
這篇博客回顧JAVA8的CompletionStageAPI以及其在JAVA庫中的標準實現CompletableFuture。將會通過幾個例子來展示API的各種行為。
因為CompletableFuture是CompletionInterface接口的實現,所以我們首先要了解該接口的契約。它代表某個同步或異步計算的一個階段。你可以把它理解為是一個為了產生有價值最終結果的計算的流水線上的一個單元。這意味著多個ComletionStage指令可以鏈接起來從而一個階段的完成可以觸發下一個階段的執行。
除了實現了CompletionStage接口,Completion還繼承了Future,這個接口用于實現一個未開始的異步事件。因為能夠顯式的完成Future,所以取名為CompletableFuture。
1.新建一個完成的CompletableFuture這個簡單的示例中創建了一個已經完成的預先設置好結果的CompletableFuture。通常作為計算的起點階段。
static void completedFutureExample() { CompletableFuture cf = CompletableFuture.completedFuture("message"); assertTrue(cf.isDone()); assertEquals("message", cf.getNow(null)); }
getNow方法會返回完成后的結果(這里就是message),如果還未完成,則返回傳入的默認值null。
2.運行一個簡單的異步stage下面的例子解釋了如何創建一個異步運行Runnable的stage。
static void runAsyncExample() { CompletableFuture cf = CompletableFuture.runAsync(() -> { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); }); assertFalse(cf.isDone()); sleepEnough(); assertTrue(cf.isDone()); }
這個例子想要說明兩個事情:
CompletableFuture中以Async為結尾的方法將會異步執行
默認情況下(即指沒有傳入Executor的情況下),異步執行會使用ForkJoinPool實現,該線程池使用一個后臺線程來執行Runnable任務。注意這只是特定于CompletableFuture實現,其它的CompletableStage實現可以重寫該默認行為。
3.將方法作用于前一個Stage下面的例子引用了第一個例子中已經完成的CompletableFuture,它將引用生成的字符串結果并將該字符串大寫。
static void thenApplyExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> { assertFalse(Thread.currentThread().isDaemon()); return s.toUpperCase(); }); assertEquals("MESSAGE", cf.getNow(null)); }
這里的關鍵詞是thenApply:
then是指在當前階段正常執行完成后(正常執行是指沒有拋出異常)進行的操作。在本例中,當前階段已經完成并得到值message。
Apply是指將一個Function作用于之前階段得出的結果
Function是阻塞的,這意味著只有當大寫操作執行完成之后才會執行getNow()方法。
4.異步的的將方法作用于前一個Stage通過在方法后面添加Async后綴,該CompletableFuture鏈將會異步執行(使用ForkJoinPool.commonPool())
static void thenApplyAsyncExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }使用一個自定義的Executor來異步執行該方法
異步方法的一個好處是可以提供一個Executor來執行CompletableStage。這個例子展示了如何使用一個固定大小的線程池來實現大寫操作。
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() { int count = 1; @Override public Thread newThread(Runnable runnable) { return new Thread(runnable, "custom-executor-" + count++); } }); static void thenApplyAsyncWithExecutorExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> { assertTrue(Thread.currentThread().getName().startsWith("custom-executor-")); assertFalse(Thread.currentThread().isDaemon()); randomSleep(); return s.toUpperCase(); }, executor); assertNull(cf.getNow(null)); assertEquals("MESSAGE", cf.join()); }6.消費(Consume)前一個Stage的結果
如果下一個Stage接收了當前Stage的結果但是在計算中無需返回值(比如其返回值為void),那么它將使用方法thenAccept并傳入一個Consumer接口。
static void thenAcceptExample() { StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture("thenAccept message") .thenAccept(s -> result.append(s)); assertTrue("Result was empty", result.length() > 0); }
Consumer將會同步執行,所以我們無需在返回的CompletableFuture上執行join操作。
7.異步執行Comsume同樣,使用Asyn后綴實現:
static void thenAcceptAsyncExample() { StringBuilder result = new StringBuilder(); CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message") .thenAcceptAsync(s -> result.append(s)); cf.join(); assertTrue("Result was empty", result.length() > 0); }8.計算出現異常時
我們現在來模擬一個出現異常的場景。為了簡潔性,我們還是將一個字符串大寫,但是我們會模擬延時進行該操作。我們會使用thenApplyAsyn(Function, Executor),第一個參數是大寫轉化方法,第二個參數是一個延時executor,它會延時一秒鐘再將操作提交給ForkJoinPool。
static void completeExceptionallyExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture exceptionHandler = cf.handle((s, th) -> { return (th != null) ? "message upon cancel" : ""; }); cf.completeExceptionally(new RuntimeException("completed exceptionally")); assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); try { cf.join(); fail("Should have thrown an exception"); } catch(CompletionException ex) { // just for testing assertEquals("completed exceptionally", ex.getCause().getMessage()); } assertEquals("message upon cancel", exceptionHandler.join()); }
首先,我們新建了一個已經完成并帶有返回值message的CompletableFuture對象。然后我們調用thenApplyAsync方法,該方法會返回一個新的CompletableFuture。這個方法用異步的方式執行大寫操作。這里還展示了如何使用delayedExecutor(timeout, timeUnit)方法來延時異步操作。
然后我們創建了一個handler stage,exceptionHandler,這個階段會處理一切異常并返回另一個消息message upon cancel。
最后,我們顯式的完成第二個階段并拋出異常,它會導致進行大寫操作的階段拋出CompletionException。它還會觸發handler階段。
API補充:9.取消計算
CompletableFuture handle(BiFunction super T,Throwable,? extends U> fn)
返回一個新的CompletionStage,無論之前的Stage是否正常運行完畢。傳入的參數包括上一個階段的結果和拋出異常。
和計算時異常處理很相似,我們可以通過Future接口中的cancel(boolean mayInterruptIfRunning)來取消計算。
static void cancelExample() { CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS)); CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message"); assertTrue("Was not canceled", cf.cancel(true)); assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally()); assertEquals("canceled message", cf2.join()); }
API補充10.將Function作用于兩個已完成Stage的結果之一
public CompletableFutureexceptionally(Function fn)
返回一個新的CompletableFuture,如果出現異常,則為該方法中執行的結果,否則就是正常執行的結果。
下面的例子創建了一個CompletableFuture對象并將Function作用于已完成的兩個Stage中的任意一個(沒有保證哪一個將會傳遞給Function)。這兩個階段分別如下:一個將字符串大寫,另一個小寫。
static void applyToEitherExample() { String original = "Message"; CompletableFuture cf1 = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)); CompletableFuture cf2 = cf1.applyToEither( CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> s + " from applyToEither"); assertTrue(cf2.join().endsWith(" from applyToEither")); }
public CompletableFuture applyToEitherAsync(CompletionStage extends T> other,Function super T,U> fn)11.消費兩個階段的任意一個結果
返回一個全新的CompletableFuture,包含著this或是other操作完成之后,在二者中的任意一個執行fn
和前一個例子類似,將Function替換為Consumer
static void acceptEitherExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), s -> result.append(s).append("acceptEither")); cf.join(); assertTrue("Result was empty", result.toString().endsWith("acceptEither")); }12.在兩個階段都完成后運行Runnable
注意這里的兩個Stage都是同步運行的,第一個stage將字符串轉化為大寫之后,第二個stage將其轉化為小寫。
static void runAfterBothExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).runAfterBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), () -> result.append("done")); assertTrue("Result was empty", result.length() > 0); }13.用Biconsumer接收兩個stage的結果
Biconsumer支持同時對兩個Stage的結果進行操作。
static void thenAcceptBothExample() { String original = "Message"; StringBuilder result = new StringBuilder(); CompletableFuture.completedFuture(original).thenApply(String::toUpperCase).thenAcceptBoth( CompletableFuture.completedFuture(original).thenApply(String::toLowerCase), (s1, s2) -> result.append(s1 + s2)); assertEquals("MESSAGEmessage", result.toString()); }14.將Bifunction同時作用于兩個階段的結果
如果CompletableFuture想要合并兩個階段的結果并且返回值,我們可以使用方法thenCombine。這里的計算流都是同步的,所以最后的getNow()方法會獲得最終結果,即大寫操作和小寫操作的結果的拼接。
static void thenCombineExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals("MESSAGEmessage", cf.getNow(null)); }15.異步將Bifunction同時作用于兩個階段的結果
和之前的例子類似,只是這里用了不同的方法:即兩個階段的操作都是異步的。那么thenCombine也會異步執行,及時它沒有Async后綴。
static void thenCombineAsyncExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original) .thenApplyAsync(s -> delayedUpperCase(s)) .thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)), (s1, s2) -> s1 + s2); assertEquals("MESSAGEmessage", cf.join()); }16.Compose CompletableFuture
我們可以使用thenCompose來完成前兩個例子中的操作。
static void thenComposeExample() { String original = "Message"; CompletableFuture cf = CompletableFuture.completedFuture(original).thenApply(s -> delayedUpperCase(s)) .thenCompose(upper -> CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)) .thenApply(s -> upper + s)); assertEquals("MESSAGEmessage", cf.join()); }17.當多個階段中有有何一個完成,即新建一個完成階段
static void anyOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List18.當所有的階段完成,新建一個完成階段futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()])).whenComplete((res, th) -> { if(th == null) { assertTrue(isUpperCase((String) res)); result.append(res); } }); assertTrue("Result was empty", result.length() > 0); }
static void allOfExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List19.當所有階段完成以后,新建一個異步完成階段futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append("done"); }); assertTrue("Result was empty", result.length() > 0); }
static void allOfAsyncExample() { StringBuilder result = new StringBuilder(); List messages = Arrays.asList("a", "b", "c"); List20.真實場景futures = messages.stream() .map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s))) .collect(Collectors.toList()); CompletableFuture allOf = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) .whenComplete((v, th) -> { futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null)))); result.append("done"); }); allOf.join(); assertTrue("Result was empty", result.length() > 0); }
下面展示了一個實踐CompletableFuture的場景:
先通過調用cars()方法異步獲得Car列表。它將會返回一個CompletionStage>
。cars()方法應當使用一個遠程的REST端點來實現。
我們將該Stage和另一個Stage組合,另一個Stage會通過調用rating(manufactureId)來異步獲取每輛車的評分。
當所有的Car對象都填入評分后,我們調用allOf()來進入最終Stage,它將在這兩個階段完成后執行
在最終Stage上使用whenComplete(),打印出車輛的評分。
cars().thenCompose(cars -> { List參考資料updatedCars = cars.stream() .map(car -> rating(car.manufacturerId).thenApply(r -> { car.setRating(r); return car; })).collect(Collectors.toList()); CompletableFuture done = CompletableFuture .allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()])); return done.thenApply(v -> updatedCars.stream().map(CompletionStage::toCompletableFuture) .map(CompletableFuture::join).collect(Collectors.toList())); }).whenComplete((cars, th) -> { if (th == null) { cars.forEach(System.out::println); } else { throw new RuntimeException(th); } }).toCompletableFuture().join();
Java CompletableFuture 詳解
Guide To CompletableFuture
想要了解更多開發技術,面試教程以及互聯網公司內推,歡迎關注我的微信公眾號!將會不定期的發放福利哦~
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68632.html
摘要:在此基礎上又向前邁進了一步局部變量類型推斷允許開發人員跳過局部變量的類型聲明局部變量是指在方法定義,初始化塊,循環和其它的如代碼塊,會推斷該局部變量的類型。 前言 之前面試的時候問了我是否了解JDK10的變化,一時回答不出來,所以只回答了JDK8中的函數式編程和流編程。今天看到這篇講JAVA10的文章,順便了解一下。 正文 JAVA10的所有新特性請參考這里。在所有的JEP中,JEP-...
摘要:有可能一個線程中的動作相對于另一個線程出現亂序。當實際輸出取決于線程交錯的結果時,這種情況被稱為競爭條件。這里的問題在于代碼塊不是原子性的,而且實例的變化對別的線程不可見。這種不能同時在多個線程上執行的部分被稱為關鍵部分。 為什么要額外寫一篇文章來研究volatile呢?是因為這可能是并發中最令人困惑以及最被誤解的結構。我看過不少解釋volatile的博客,但是大多數要么不完整,要么難...
摘要:前言上一篇文章請參考貓頭鷹的深夜翻譯核心并發一安全發布發布一個對象是指該對象的引用對當前的域之外也可見比如,從方法中獲取一個引用。任務的功能性接口表示一個沒有返回值的任務表示一個包含返回值的計算。 前言 上一篇文章請參考貓頭鷹的深夜翻譯:核心JAVA并發(一) 安全發布 發布一個對象是指該對象的引用對當前的域之外也可見(比如,從getter方法中獲取一個引用)。要確保一個對象被安全的發...
摘要:本文簡介類概覽類構造器總結類構造方法類使用舉例類概覽是一個實現了接口,并且鍵為型的哈希表。中的條目不再被正常使用時,會被自動刪除。它的鍵值均支持。和絕大多數的集合類一樣,這個類不是同步的。 本文簡介 WeakHashMap類概覽 WeakHashMap類構造器總結 WeakHashMap類構造方法 WeakHasjMap類使用舉例 1. WeakHashMap類概覽 Wea...
摘要:什么是為執行字節碼提供一個運行環境。它的實現主要包含三個部分,描述實現規格的文檔,具體實現和滿足要求的計算機程序以及實例具體執行字節碼。該類先被轉化為一組字節碼并放入文件中。字節碼校驗器通過字節碼校驗器檢查格式并找出非法代碼。 什么是Java Development Kit (JDK)? JDK通常用來開發Java應用和插件。基本上可以認為是一個軟件開發環境。JDK包含Java Run...
閱讀 3266·2021-11-18 10:02
閱讀 3443·2021-10-11 10:58
閱讀 3376·2021-09-24 09:47
閱讀 1120·2021-09-22 15:21
閱讀 3915·2021-09-10 11:10
閱讀 3277·2021-09-03 10:28
閱讀 1748·2019-08-30 15:45
閱讀 2136·2019-08-30 14:22