摘要:方法接收的是的實例,但是它沒有返回值方法是函數式接口,無參數,會返回一個結果這兩個方法是的升級,表示讓任務在指定的線程池中執行,不指定的話,通常任務是在線程池中執行的。該的接口是在線程使用舊的接口,它不允許返回值。
簡介
作為Java 8 Concurrency API改進而引入,本文是CompletableFuture類的功能和用例的介紹。同時在Java 9 也有對CompletableFuture有一些改進,之后再進入講解。
Future計算Future異步計算很難操作,通常我們希望將任何計算邏輯視為一系列步驟。但是在異步計算的情況下,表示為回調的方法往往分散在代碼中或者深深地嵌套在彼此內部。但是當我們需要處理其中一個步驟中可能發生的錯誤時,情況可能會變得更復雜。
Futrue接口是Java 5中作為異步計算而新增的,但它沒有任何方法去進行計算組合或者處理可能出現的錯誤。
在Java 8中,引入了CompletableFuture類。與Future接口一起,它還實現了CompletionStage接口。此接口定義了可與其他Future組合成異步計算契約。
CompletableFuture同時是一個組合和一個框架,具有大約50種不同的構成,結合,執行異步計算步驟和處理錯誤。
如此龐大的API可能會令人難以招架,下文將調一些重要的做重點介紹。
使用CompletableFuture作為Future實現首先,CompletableFuture類實現Future接口,因此你可以將其用作Future實現,但需要額外的完成實現邏輯。
例如,你可以使用無構參構造函數創建此類的實例,然后使用complete方法完成。消費者可以使用get方法來阻塞當前線程,直到get()結果。
在下面的示例中,我們有一個創建CompletableFuture實例的方法,然后在另一個線程中計算并立即返回Future。
計算完成后,該方法通過將結果提供給完整方法來完成Future:
public FuturecalculateAsync() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.complete("Hello"); return null; }); return completableFuture; }
為了分離計算,我們使用了Executor?API?,這種創建和完成CompletableFuture的方法可以與任何并發包(包括原始線程)一起使用。
請注意,該calculateAsync方法返回一個Future實例。
我們只是調用方法,接收Future實例并在我們準備阻塞結果時調用它的get方法。
另請注意,get方法拋出一些已檢查的異常,即ExecutionException(封裝計算期間發生的異常)和InterruptedException(表示執行方法的線程被中斷的異常):
FuturecompletableFuture = calculateAsync(); // ... String result = completableFuture.get(); assertEquals("Hello", result);
如果你已經知道計算的結果,也可以用變成同步的方式來返回結果。
FuturecompletableFuture = CompletableFuture.completedFuture("Hello"); // ... String result = completableFuture.get(); assertEquals("Hello", result);
作為在某些場景中,你可能希望取消Future任務的執行。
假設我們沒有找到結果并決定完全取消異步執行任務。這可以通過Future的取消方法完成。此方法mayInterruptIfRunning,但在CompletableFuture的情況下,它沒有任何效果,因為中斷不用于控制CompletableFuture的處理。
這是異步方法的修改版本:
public FuturecalculateAsyncWithCancellation() throws InterruptedException { CompletableFuture completableFuture = new CompletableFuture<>(); Executors.newCachedThreadPool().submit(() -> { Thread.sleep(500); completableFuture.cancel(false); return null; }); return completableFuture; }
當我們使用Future.get()方法阻塞結果時,cancel()表示取消執行,它將拋出CancellationException:
FutureAPI介紹 static方法說明future = calculateAsyncWithCancellation(); future.get(); // CancellationException
上面的代碼很簡單,下面介紹幾個 static 方法,它們使用任務來實例化一個 CompletableFuture 實例。
CompletableFuture.runAsync(Runnable runnable); CompletableFuture.runAsync(Runnable runnable, Executor executor); CompletableFuture.supplyAsync(Supplier supplier); CompletableFuture.supplyAsync(Supplier supplier, Executor executor)
runAsync 方法接收的是 Runnable 的實例,但是它沒有返回值
supplyAsync 方法是JDK8函數式接口,無參數,會返回一個結果
這兩個方法是 executor 的升級,表示讓任務在指定的線程池中執行,不指定的話,通常任務是在 ForkJoinPool.commonPool() 線程池中執行的。
supplyAsync()使用靜態方法runAsync和supplyAsync允許我們相應地從Runnable和Supplier功能類型中創建CompletableFuture實例。
該Runnable的接口是在線程使用舊的接口,它不允許返回值。
Supplier接口是一個不具有參數,并返回參數化類型的一個值的單個方法的通用功能接口。
這允許將Supplier的實例作為lambda表達式提供,該表達式執行計算并返回結果:
CompletableFuturethenRun()使用future = CompletableFuture.supplyAsync(() -> "Hello"); // ... assertEquals("Hello", future.get());
在兩個任務任務A,任務B中,如果既不需要任務A的值也不想在任務B中引用,那么你可以將Runnable lambda 傳遞給thenRun()方法。在下面的示例中,在調用future.get()方法之后,我們只需在控制臺中打印一行:
模板
CompletableFuture.runAsync(() -> {}).thenRun(() -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
第一行用的是 thenRun(Runnable runnable),任務 A 執行完執行 B,并且 B 不需要 A 的結果。
第二行用的是 thenRun(Runnable runnable),任務 A 執行完執行 B,會返回resultA,但是 B 不需要 A 的結果。
實戰
CompletableFuturethenAccept()使用completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenRun(() -> System.out.println("Computation finished.")); future.get();
在兩個任務任務A,任務B中,如果你不需要在Future中有返回值,則可以用 thenAccept方法接收將計算結果傳遞給它。最后的future.get()調用返回Void類型的實例。
模板
CompletableFuture.runAsync(() -> {}).thenAccept(resultA -> {}); CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
第一行中,runAsync不會有返回值,第二個方法thenAccept,接收到的resultA值為null,同時任務B也不會有返回結果
第二行中,supplyAsync有返回值,同時任務B不會有返回結果。
實戰
CompletableFuturethenApply()使用completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();
在兩個任務任務A,任務B中,任務B想要任務A計算的結果,可以用thenApply方法來接受一個函數實例,用它來處理結果,并返回一個Future函數的返回值:
模板
CompletableFuture.runAsync(() -> {}).thenApply(resultA -> "resultB"); CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
第二行用的是 thenApply(Function fn),任務 A 執行完執行 B,B 需要 A 的結果,同時任務 B 有返回值。
實戰
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApply(s -> s + " World"); assertEquals("Hello World", future.get());
當然,多個任務的情況下,如果任務 B 后面還有任務 C,往下繼續調用 .thenXxx() 即可。
thenCompose()使用接下來會有一個很有趣的設計模式;
CompletableFuture API 的最佳場景是能夠在一系列計算步驟中組合CompletableFuture實例。
這種組合結果本身就是CompletableFuture,允許進一步再續組合。這種方法在函數式語言中無處不在,通常被稱為monadic設計模式。
簡單說,Monad就是一種設計模式,表示將一個運算過程,通過函數拆解成互相連接的多個步驟。你只要提供下一步運算所需的函數,整個運算就會自動進行下去。
在下面的示例中,我們使用thenCompose方法按順序組合兩個Futures。
請注意,此方法采用返回CompletableFuture實例的函數。該函數的參數是先前計算步驟的結果。這允許我們在下一個CompletableFuture的lambda中使用這個值:
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", completableFuture.get());
該thenCompose方法連同thenApply一樣實現了結果的合并計算。但是他們的內部形式是不一樣的,它們與Java 8中可用的Stream和Optional類的map和flatMap方法是有著類似的設計思路在里面的。
兩個方法都接收一個CompletableFuture并將其應用于計算結果,但thenCompose(flatMap)方法接收一個函數,該函數返回相同類型的另一個CompletableFuture對象。此功能結構允許將這些類的實例繼續進行組合計算。
thenCombine()取兩個任務的結果
如果要執行兩個獨立的任務,并對其結果執行某些操作,可以用Future的thenCombine方法:
模板
CompletableFuturecfA = CompletableFuture.supplyAsync(() -> "resultA"); CompletableFuture cfB = CompletableFuture.supplyAsync(() -> "resultB"); cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {}); cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
實戰
CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> "Hello") .thenCombine(CompletableFuture.supplyAsync( () -> " World"), (s1, s2) -> s1 + s2)); assertEquals("Hello World", completableFuture.get());
更簡單的情況是,當你想要使用兩個Future結果時,但不需要將任何結果值進行返回時,可以用thenAcceptBoth,它表示后續的處理不需要返回值,而 thenCombine 表示需要返回值:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello") .thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"), (s1, s2) -> System.out.println(s1 + s2));thenApply()和thenCompose()之間的區別
在前面的部分中,我們展示了關于thenApply()和thenCompose()的示例。這兩個API都是使用的CompletableFuture調用,但這兩個API的使用是不同的。
thenApply()此方法用于處理先前調用的結果。但是,要記住的一個關鍵點是返回類型是轉換泛型中的類型,是同一個CompletableFuture。
因此,當我們想要轉換CompletableFuture 調用的結果時,效果是這樣的 :
CompletableFuturethenCompose()finalResult = compute().thenApply(s-> s + 1);
該thenCompose()方法類似于thenApply()在都返回一個新的計算結果。但是,thenCompose()使用前一個Future作為參數。它會直接使結果變新的Future,而不是我們在thenApply()中到的嵌套Future,而是用來連接兩個CompletableFuture,是生成一個新的CompletableFuture:
CompletableFuturecomputeAnother(Integer i){ return CompletableFuture.supplyAsync(() -> 10 + i); } CompletableFuture finalResult = compute().thenCompose(this::computeAnother);
因此,如果想要繼續嵌套鏈接CompletableFuture??方法,那么最好使用thenCompose()。
并行運行多個任務當我們需要并行執行多個任務時,我們通常希望等待所有它們執行,然后處理它們的組合結果。
該CompletableFuture.allOf靜態方法允許等待所有的完成任務:
API
public static CompletableFutureallOf(CompletableFuture>... cfs){...}
實戰
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> "Beautiful"); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> "World"); CompletableFuture combinedFuture = CompletableFuture.allOf(future1, future2, future3); // ... combinedFuture.get(); assertTrue(future1.isDone()); assertTrue(future2.isDone()); assertTrue(future3.isDone());
請注意,CompletableFuture.allOf()的返回類型是CompletableFuture
String combined = Stream.of(future1, future2, future3) .map(CompletableFuture::join) .collect(Collectors.joining(" ")); assertEquals("Hello Beautiful World", combined);
CompletableFuture 提供了 join() 方法,它的功能和 get() 方法是一樣的,都是阻塞獲取值,它們的區別在于 join() 拋出的是 unchecked Exception。這使得它可以在Stream.map()方法中用作方法引用。
異常處理說到這里,我們順便來說下 CompletableFuture 的異常處理。這里我們要介紹兩個方法:
public CompletableFutureexceptionally(Function fn); public CompletionStage handle(BiFunction super T, Throwable, ? extends U> fn);
看下代碼
CompletableFuture.supplyAsync(() -> "resultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD");
上面的代碼中,任務 A、B、C、D 依次執行,如果任務 A 拋出異常(當然上面的代碼不會拋出異常),那么后面的任務都得不到執行。如果任務 C 拋出異常,那么任務 D 得不到執行。
那么我們怎么處理異常呢?看下面的代碼,我們在任務 A 中拋出異常,并對其進行處理:
CompletableFuturefuture = CompletableFuture.supplyAsync(() -> { throw new RuntimeException(); }) .exceptionally(ex -> "errorResultA") .thenApply(resultA -> resultA + " resultB") .thenApply(resultB -> resultB + " resultC") .thenApply(resultC -> resultC + " resultD"); System.out.println(future.join());
上面的代碼中,任務 A 拋出異常,然后通過 .exceptionally() 方法處理了異常,并返回新的結果,這個新的結果將傳遞給任務 B。所以最終的輸出結果是:
errorResultA resultB resultC resultD
String name = null; // ... CompletableFuturecompletableFuture = CompletableFuture.supplyAsync(() -> { if (name == null) { throw new RuntimeException("Computation error!"); } return "Hello, " + name; })}).handle((s, t) -> s != null ? s : "Hello, Stranger!"); assertEquals("Hello, Stranger!", completableFuture.get());
當然,它們也可以都為 null,因為如果它作用的那個 CompletableFuture 實例沒有返回值的時候,s 就是 null。
Async后綴方法CompletableFuture類中的API的大多數方法都有兩個帶有Async后綴的附加修飾。這些方法表示用于異步線程。
沒有Async后綴的方法使用調用線程運行下一個執行線程階段。不帶Async方法使用ForkJoinPool.commonPool()線程池的fork / join實現運算任務。帶有Async方法使用傳遞式的Executor任務去運行。
下面附帶一個案例,可以看到有thenApplyAsync方法。在程序內部,線程被包裝到ForkJoinTask實例中。這樣可以進一步并行化你的計算并更有效地使用系統資源。
CompletableFutureJDK 9 CompletableFuture APIcompletableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture future = completableFuture .thenApplyAsync(s -> s + " World"); assertEquals("Hello World", future.get());
在Java 9中, CompletableFuture API通過以下更改得到了進一步增強:
新工廠方法增加了
支持延遲和超時
改進了對子類化的支持。
引入了新的實例API:
Executor defaultExecutor()
CompletableFuture newIncompleteFuture()
CompletableFuture
CompletionStage
CompletableFuture
CompletableFuture
CompletableFuture
CompletableFuture
還有一些靜態實用方法:
Executor delayedExecutor(long delay, TimeUnit unit, Executor executor)
Executor delayedExecutor(long delay, TimeUnit unit)
CompletionStage completedStage(U value)
CompletionStage failedStage(Throwable ex)
CompletableFuture failedFuture(Throwable ex)
最后,為了解決超時問題,Java 9又引入了兩個新功能:
orTimeout()
completeOnTimeout()
結論在本文中,我們描述了CompletableFuture類的方法和典型用例。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74065.html
摘要:這個方法返回與等待所有返回等待多個返回取多個當中最快的一個返回等待多個當中最快的一個返回二詳解終極指南并發編程中的風格 thenApply(等待并轉化future) @Test public void testThen() throws ExecutionException, InterruptedException { CompletableFutur...
摘要:首先想到的是開啟一個新的線程去做某項工作。再進一步,為了讓新線程可以返回一個值,告訴主線程事情做完了,于是乎粉墨登場。然而提供的方式是主線程主動問詢新線程,要是有個回調函數就爽了。極大的提高效率。 showImg(https://segmentfault.com/img/bVbvgBJ?w=1920&h=1200); 引子 為了讓程序更加高效,讓CPU最大效率的工作,我們會采用異步編程...
摘要:組合式異步編程最近這些年,兩種趨勢不斷地推動我們反思我們設計軟件的方式。第章中介紹的分支合并框架以及并行流是實現并行處理的寶貴工具它們將一個操作切分為多個子操作,在多個不同的核甚至是機器上并行地執行這些子操作。 CompletableFuture:組合式異步編程 最近這些年,兩種趨勢不斷地推動我們反思我們設計軟件的方式。第一種趨勢和應用運行的硬件平臺相關,第二種趨勢與應用程序的架構相關...
摘要:項目需求項目中需要優化一個接口,這個接口需要拉取個第三方接口,需求延遲時間小于技術選型是提出的一個支持非阻塞的多功能的,同樣也是實現了接口,是添加的類,用來描述一個異步計算的結果。對進一步完善,擴展了諸多功能形成了。 項目需求: 項目中需要優化一個接口,這個接口需要拉取23個第三方接口,需求延遲時間小于200ms; 技術選型: CompletableFuture是JDK8提出的一個支持...
摘要:中使用了提供的原生接口對自身的異步化做了改進。可以支持和兩種調用方式。實戰通過下面的例子,可以看出的最大好處特性。 showImg(https://segmentfault.com/img/remote/1460000020032427?w=1240&h=655); 前段時間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實戰》的時候,了解到Java 8里已經提供了一個異步...
閱讀 2431·2021-09-22 15:41
閱讀 1448·2021-08-19 10:54
閱讀 1755·2019-08-23 15:11
閱讀 3402·2019-08-23 10:23
閱讀 1428·2019-08-22 16:28
閱讀 799·2019-08-22 15:11
閱讀 739·2019-08-22 14:53
閱讀 710·2019-08-22 13:49