摘要:在這種方式中,主線程不會被阻塞,不需要一直等到子線程完成。主線程可以并行的執行其他任務。如果我們不想等待結果返回,我們可以把需要等待完成執行的邏輯寫入到回調函數中。任何立即執行完成那就是執行在主線程中嘗試刪除測試下。可以使用達成目的。
Java 8 有大量的新特性和增強如 Lambda 表達式,Streams,CompletableFuture等。在本篇文章中我將詳細解釋清楚CompletableFuture以及它所有方法的使用。
什么是CompletableFuture?在Java中CompletableFuture用于異步編程,異步編程是編寫非阻塞的代碼,運行的任務在一個多帶帶的線程,與主線程隔離,并且會通知主線程它的進度,成功或者失敗。
在這種方式中,主線程不會被阻塞,不需要一直等到子線程完成。主線程可以并行的執行其他任務。
使用這種并行方式,可以極大的提高程序的性能。
Future vs CompletableFutureCompletableFuture 是 Future API的擴展。
Future 被用于作為一個異步計算結果的引用。提供一個 isDone() 方法來檢查計算任務是否完成。當任務完成時,get() 方法用來接收計算任務的結果。
從 Callbale和 Future 教程可以學習更多關于 Future 知識.
Future API 是非常好的 Java 異步編程進階,但是它缺乏一些非常重要和有用的特性。
Future 的局限性不能手動完成
當你寫了一個函數,用于通過一個遠程API獲取一個電子商務產品最新價格。因為這個 API 太耗時,你把它允許在一個獨立的線程中,并且從你的函數中返回一個 Future。現在假設這個API服務宕機了,這時你想通過該產品的最新緩存價格手工完成這個Future 。你會發現無法這樣做。
Future 的結果在非阻塞的情況下,不能執行更進一步的操作
Future 不會通知你它已經完成了,它提供了一個阻塞的 get() 方法通知你結果。你無法給 Future 植入一個回調函數,當 Future 結果可用的時候,用該回調函數自動的調用 Future 的結果。
多個 Future 不能串聯在一起組成鏈式調用
有時候你需要執行一個長時間運行的計算任務,并且當計算任務完成的時候,你需要把它的計算結果發送給另外一個長時間運行的計算任務等等。你會發現你無法使用 Future 創建這樣的一個工作流。
不能組合多個 Future 的結果
假設你有10個不同的Future,你想并行的運行,然后在它們運行未完成后運行一些函數。你會發現你也無法使用 Future 這樣做。
沒有異常處理
Future API 沒有任務的異常處理結構居然有如此多的限制,幸好我們有CompletableFuture,你可以使用 CompletableFuture 達到以上所有目的。
CompletableFuture 實現了 Future 和 CompletionStage接口,并且提供了許多關于創建,鏈式調用和組合多個 Future 的便利方法集,而且有廣泛的異常處理支持。
創建 CompletableFuture1. 簡單的例子
可以使用如下無參構造函數簡單的創建 CompletableFuture:
CompletableFuturecompletableFuture = new CompletableFuture ();
這是一個最簡單的 CompletableFuture,想獲取CompletableFuture 的結果可以使用 CompletableFuture.get() 方法:
String result = completableFuture.get()
get() 方法會一直阻塞直到 Future 完成。因此,以上的調用將被永遠阻塞,因為該Future一直不會完成。
你可以使用 CompletableFuture.complete() 手工的完成一個 Future:
completableFuture.complete("Future"s Result")
所有等待這個 Future 的客戶端都將得到一個指定的結果,并且 completableFuture.complete() 之后的調用將被忽略。
2. 使用 runAsync() 運行異步計算
如果你想異步的運行一個后臺任務并且不想改任務返回任務東西,這時候可以使用 CompletableFuture.runAsync()方法,它持有一個Runnable 對象,并返回 CompletableFuture
// Run a task specified by a Runnable Object asynchronously. CompletableFuturefuture = CompletableFuture.runAsync(new Runnable() { @Override public void run() { // Simulate a long-running Job try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("I"ll run in a separate thread than the main thread."); } }); // Block and wait for the future to complete future.get()
你也可以以 lambda 表達式的形式傳入 Runnable 對象:
// Using Lambda Expression CompletableFuturefuture = CompletableFuture.runAsync(() -> { // Simulate a long-running Job try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } System.out.println("I"ll run in a separate thread than the main thread."); });
在本文中,我使用lambda表達式會比較頻繁,如果以前你沒有使用過,建議你也多使用lambda 表達式。
3. 使用 supplyAsync() 運行一個異步任務并且返回結果
當任務不需要返回任何東西的時候, CompletableFuture.runAsync() 非常有用。但是如果你的后臺任務需要返回一些結果應該要怎么樣?
CompletableFuture.supplyAsync() 就是你的選擇。它持有supplier
// Run a task specified by a Supplier object asynchronously CompletableFuturefuture = CompletableFuture.supplyAsync(new Supplier () { @Override public String get() { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of the asynchronous computation"; } }); // Block and get the result of the Future String result = future.get(); System.out.println(result);
Supplier
你可以使用lambda表達式使得上面的示例更加簡明:
// Using Lambda Expression CompletableFuturefuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of the asynchronous computation"; });
一個關于Executor 和Thread Pool筆記
你可能想知道,我們知道runAsync() 和supplyAsync()方法在多帶帶的線程中執行他們的任務。但是我們不會永遠只創建一個線程。
CompletableFuture可以從全局的 ForkJoinPool.commonPool()獲得一個線程中執行這些任務。
但是你也可以創建一個線程池并傳給runAsync() 和supplyAsync()方法來讓他們從線程池中獲取一個線程執行它們的任務。
CompletableFuture API 的所有方法都有兩個變體-一個接受Executor作為參數,另一個不這樣:
// Variations of runAsync() and supplyAsync() methods static CompletableFuturerunAsync(Runnable runnable) static CompletableFuture runAsync(Runnable runnable, Executor executor) static CompletableFuture supplyAsync(Supplier supplier) static CompletableFuture supplyAsync(Supplier supplier, Executor executor)
創建一個線程池,并傳遞給其中一個方法:
Executor executor = Executors.newFixedThreadPool(10); CompletableFuture在 CompletableFuture 轉換和運行future = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of the asynchronous computation"; }, executor);
CompletableFuture.get()方法是阻塞的。它會一直等到Future完成并且在完成后返回結果。
但是,這是我們想要的嗎?對于構建異步系統,我們應該附上一個回調給CompletableFuture,當Future完成的時候,自動的獲取結果。
如果我們不想等待結果返回,我們可以把需要等待Future完成執行的邏輯寫入到回調函數中。
可以使用 thenApply(), thenAccept() 和thenRun()方法附上一個回調給CompletableFuture。
1. thenApply()
可以使用 thenApply() 處理和改變CompletableFuture的結果。持有一個Function
// Create a CompletableFuture CompletableFuturewhatsYourNameFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Rajeev"; }); // Attach a callback to the Future using thenApply() CompletableFuture greetingFuture = whatsYourNameFuture.thenApply(name -> { return "Hello " + name; }); // Block and get the result of the future. System.out.println(greetingFuture.get()); // Hello Rajeev
你也可以通過附加一系列的thenApply()在回調方法 在CompletableFuture寫一個連續的轉換。這樣的話,結果中的一個 thenApply方法就會傳遞給該系列的另外一個 thenApply方法。
CompletableFuturewelcomeText = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Rajeev"; }).thenApply(name -> { return "Hello " + name; }).thenApply(greeting -> { return greeting + ", Welcome to the CalliCoder Blog"; }); System.out.println(welcomeText.get()); // Prints - Hello Rajeev, Welcome to the CalliCoder Blog
2. thenAccept() 和 thenRun()
如果你不想從你的回調函數中返回任何東西,僅僅想在Future完成后運行一些代碼片段,你可以使用thenAccept() 和 thenRun()方法,這些方法經常在調用鏈的最末端的最后一個回調函數中使用。
CompletableFuture.thenAccept() 持有一個Consumer
// thenAccept() example CompletableFuture.supplyAsync(() -> { return ProductService.getProductDetail(productId); }).thenAccept(product -> { System.out.println("Got product detail from remote service " + product.getName()) });
雖然thenAccept()可以訪問CompletableFuture的結果,但thenRun()不能訪Future的結果,它持有一個Runnable返回CompletableFuture
// thenRun() example CompletableFuture.supplyAsync(() -> { // Run some computation }).thenRun(() -> { // Computation Finished. });
異步回調方法的筆記
CompletableFuture提供的所有回調方法都有兩個變體:
`// thenApply() variants
CompletableFuture thenApply(Function super T,? extends U> fn)
CompletableFuture thenApplyAsync(Function super T,? extends U> fn)
CompletableFuture thenApplyAsync(Function super T,? extends U> fn, Executor executor)`
這些異步回調變體通過在獨立的線程中執行回調任務幫助你進一步執行并行計算。
以下示例:
CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Some Result" }).thenApply(result -> { /* Executed in the same thread where the supplyAsync() task is executed or in the main thread If the supplyAsync() task completes immediately (Remove sleep() call to verify) */ return "Processed Result" })
在以上示例中,在thenApply()中的任務和在supplyAsync()中的任務執行在相同的線程中。任何supplyAsync()立即執行完成,那就是執行在主線程中(嘗試刪除sleep測試下)。
為了控制執行回調任務的線程,你可以使用異步回調。如果你使用thenApplyAsync()回調,將從ForkJoinPool.commonPool()獲取不同的線程執行。
CompletableFuture.supplyAsync(() -> { return "Some Result" }).thenApplyAsync(result -> { // Executed in a different thread from ForkJoinPool.commonPool() return "Processed Result" })
此外,如果你傳入一個Executor到thenApplyAsync()回調中,,任務將從Executor線程池獲取一個線程執行。
Executor executor = Executors.newFixedThreadPool(2); CompletableFuture.supplyAsync(() -> { return "Some result" }).thenApplyAsync(result -> { // Executed in a thread obtained from the executor return "Processed Result" }, executor);組合兩個CompletableFuture
1. 使用 thenCompose() 組合兩個獨立的future
假設你想從一個遠程API中獲取一個用戶的詳細信息,一旦用戶信息可用,你想從另外一個服務中獲取他的貸方。
考慮下以下兩個方法getUserDetail() 和getCreditRating()的實現:
CompletableFuturegetUsersDetail(String userId) { return CompletableFuture.supplyAsync(() -> { UserService.getUserDetails(userId); }); } CompletableFuture getCreditRating(User user) { return CompletableFuture.supplyAsync(() -> { CreditRatingService.getCreditRating(user); }); }
現在讓我們弄明白當使用了thenApply()后是否會達到我們期望的結果-
CompletableFuture> result = getUserDetail(userId) .thenApply(user -> getCreditRating(user));
在更早的示例中,Supplier函數傳入thenApply將返回一個簡單的值,但是在本例中,將返回一個CompletableFuture。以上示例的最終結果是一個嵌套的CompletableFuture。
如果你想獲取最終的結果給最頂層future,使用 thenCompose()方法代替-
CompletableFutureresult = getUserDetail(userId) .thenCompose(user -> getCreditRating(user));
因此,規則就是-如果你的回調函數返回一個CompletableFuture,但是你想從CompletableFuture鏈中獲取一個直接合并后的結果,這時候你可以使用thenCompose()。
2. 使用thenCombine()組合兩個獨立的 future
雖然thenCompose()被用于當一個future依賴另外一個future的時候用來組合兩個future。thenCombine()被用來當兩個獨立的Future都完成的時候,用來做一些事情。
System.out.println("Retrieving weight."); CompletableFutureweightInKgFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 65.0; }); System.out.println("Retrieving height."); CompletableFuture heightInCmFuture = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return 177.8; }); System.out.println("Calculating BMI."); CompletableFuture combinedFuture = weightInKgFuture .thenCombine(heightInCmFuture, (weightInKg, heightInCm) -> { Double heightInMeter = heightInCm/100; return weightInKg/(heightInMeter*heightInMeter); }); System.out.println("Your BMI is - " + combinedFuture.get());
當兩個Future都完成的時候,傳給``thenCombine()的回調函數將被調用。
組合多個CompletableFuture我們使用thenCompose() 和 thenCombine()把兩個CompletableFuture組合在一起。現在如果你想組合任意數量的CompletableFuture,應該怎么做?我們可以使用以下兩個方法組合任意數量的CompletableFuture。
static CompletableFutureallOf(CompletableFuture>... cfs) static CompletableFuture
1. CompletableFuture.allOf()
CompletableFuture.allOf的使用場景是當你一個列表的獨立future,并且你想在它們都完成后并行的做一些事情。
假設你想下載一個網站的100個不同的頁面。你可以串行的做這個操作,但是這非常消耗時間。因此你想寫一個函數,傳入一個頁面鏈接,返回一個CompletableFuture,異步的下載頁面內容。
CompletableFuturedownloadWebPage(String pageLink) { return CompletableFuture.supplyAsync(() -> { // Code to download and return the web page"s content }); }
現在,當所有的頁面已經下載完畢,你想計算包含關鍵字CompletableFuture頁面的數量。可以使用CompletableFuture.allOf()達成目的。
ListwebPageLinks = Arrays.asList(...) // A list of 100 web page links // Download contents of all the web pages asynchronously List > pageContentFutures = webPageLinks.stream() .map(webPageLink -> downloadWebPage(webPageLink)) .collect(Collectors.toList()); // Create a combined Future using allOf() CompletableFuture allFutures = CompletableFuture.allOf( pageContentFutures.toArray(new CompletableFuture[pageContentFutures.size()]) );
使用CompletableFuture.allOf()的問題是它返回CompletableFuture
// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list - CompletableFuture> allPageContentsFuture = allFutures.thenApply(v -> { return pageContentFutures.stream() .map(pageContentFuture -> pageContentFuture.join()) .collect(Collectors.toList()); });
花一些時間理解下以上代碼片段。當所有future完成的時候,我們調用了future.join(),因此我們不會在任何地方阻塞。
join()方法和get()方法非常類似,這唯一不同的地方是如果最頂層的CompletableFuture完成的時候發生了異常,它會拋出一個未經檢查的異常。
現在讓我們計算包含關鍵字頁面的數量。
// Count the number of web pages having the "CompletableFuture" keyword. CompletableFuturecountFuture = allPageContentsFuture.thenApply(pageContents -> { return pageContents.stream() .filter(pageContent -> pageContent.contains("CompletableFuture")) .count(); }); System.out.println("Number of Web Pages having CompletableFuture keyword - " + countFuture.get());
2. CompletableFuture.anyOf()
CompletableFuture.anyOf()和其名字介紹的一樣,當任何一個CompletableFuture完成的時候【相同的結果類型】,返回一個新的CompletableFuture。以下示例:
CompletableFuturefuture1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 1"; }); CompletableFuture future2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 2"; }); CompletableFuture future3 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new IllegalStateException(e); } return "Result of Future 3"; }); CompletableFuture
在以上示例中,當三個中的任何一個CompletableFuture完成, anyOfFuture就會完成。因為future2的休眠時間最少,因此她最先完成,最終的結果將是future2的結果。
CompletableFuture.anyOf()傳入一個Future可變參數,返回CompletableFuture
CompletableFuture 異常處理我們探尋了怎樣創建CompletableFuture,轉換它們,并組合多個CompletableFuture。現在讓我們弄明白當發生錯誤的時候我們應該怎么做。
首先讓我們明白在一個回調鏈中錯誤是怎么傳遞的。思考下以下回調鏈:
CompletableFuture.supplyAsync(() -> { // Code which might throw an exception return "Some result"; }).thenApply(result -> { return "processed result"; }).thenApply(result -> { return "result after further processing"; }).thenAccept(result -> { // do something with the final result });
如果在原始的supplyAsync()任務中發生一個錯誤,這時候沒有任何thenApply會被調用并且future將以一個異常結束。如果在第一個thenApply發生錯誤,這時候第二個和第三個將不會被調用,同樣的,future將以異常結束。
1. 使用 exceptionally() 回調處理異常
exceptionally()回調給你一個從原始Future中生成的錯誤恢復的機會。你可以在這里記錄這個異常并返回一個默認值。
Integer age = -1; CompletableFuturematurityFuture = CompletableFuture.supplyAsync(() -> { if(age < 0) { throw new IllegalArgumentException("Age can not be negative"); } if(age > 18) { return "Adult"; } else { return "Child"; } }).exceptionally(ex -> { System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; }); System.out.println("Maturity : " + maturityFuture.get());
2. 使用 handle() 方法處理異常
API提供了一個更通用的方法 - handle()從異常恢復,無論一個異常是否發生它都會被調用。
Integer age = -1; CompletableFuturematurityFuture = CompletableFuture.supplyAsync(() -> { if(age < 0) { throw new IllegalArgumentException("Age can not be negative"); } if(age > 18) { return "Adult"; } else { return "Child"; } }).handle((res, ex) -> { if(ex != null) { System.out.println("Oops! We have an exception - " + ex.getMessage()); return "Unknown!"; } return res; }); System.out.println("Maturity : " + maturityFuture.get());
如果異常發生,res參數將是 null,否則,ex將是 null。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69149.html
摘要:方法接收的是的實例,但是它沒有返回值方法是函數式接口,無參數,會返回一個結果這兩個方法是的升級,表示讓任務在指定的線程池中執行,不指定的話,通常任務是在線程池中執行的。該的接口是在線程使用舊的接口,它不允許返回值。 簡介 作為Java 8 Concurrency API改進而引入,本文是CompletableFuture類的功能和用例的介紹。同時在Java 9 也有對Completab...
摘要:中使用了提供的原生接口對自身的異步化做了改進。可以支持和兩種調用方式。實戰通過下面的例子,可以看出的最大好處特性。 showImg(https://segmentfault.com/img/remote/1460000020032427?w=1240&h=655); 前段時間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實戰》的時候,了解到Java 8里已經提供了一個異步...
摘要:方法接受一個生產者作為參數,返回一個對象,該對象完成異步執行后會讀取調用生產者方法的返回值。該方法接收一個對象構成的數組,返回由第一個執行完畢的對象的返回值構成的。 一、Future 接口 在Future中觸發那些潛在耗時的操作把調用線程解放出來,讓它能繼續執行其他有價值的工作,不再需要呆呆等待耗時的操作完成。打個比方,你可以把它想象成這樣的場景:你拿了一袋子衣服到你中意的干洗店去洗。...
摘要:首先想到的是開啟一個新的線程去做某項工作。再進一步,為了讓新線程可以返回一個值,告訴主線程事情做完了,于是乎粉墨登場。然而提供的方式是主線程主動問詢新線程,要是有個回調函數就爽了。極大的提高效率。 showImg(https://segmentfault.com/img/bVbvgBJ?w=1920&h=1200); 引子 為了讓程序更加高效,讓CPU最大效率的工作,我們會采用異步編程...
摘要:這個方法返回與等待所有返回等待多個返回取多個當中最快的一個返回等待多個當中最快的一個返回二詳解終極指南并發編程中的風格 thenApply(等待并轉化future) @Test public void testThen() throws ExecutionException, InterruptedException { CompletableFutur...
閱讀 3548·2021-08-31 09:39
閱讀 1854·2019-08-30 13:14
閱讀 2919·2019-08-30 13:02
閱讀 2769·2019-08-29 13:22
閱讀 2341·2019-08-26 13:54
閱讀 767·2019-08-26 13:45
閱讀 1586·2019-08-26 11:00
閱讀 982·2019-08-26 10:58