摘要:首先想到的是開啟一個(gè)新的線程去做某項(xiàng)工作。再進(jìn)一步,為了讓新線程可以返回一個(gè)值,告訴主線程事情做完了,于是乎粉墨登場。然而提供的方式是主線程主動(dòng)問詢新線程,要是有個(gè)回調(diào)函數(shù)就爽了。極大的提高效率。
引子
為了讓程序更加高效,讓CPU最大效率的工作,我們會(huì)采用異步編程。首先想到的是開啟一個(gè)新的線程去做某項(xiàng)工作。再進(jìn)一步,為了讓新線程可以返回一個(gè)值,告訴主線程事情做完了,于是乎Future粉墨登場。然而Future提供的方式是主線程主動(dòng)問詢新線程,要是有個(gè)回調(diào)函數(shù)就爽了。所以,為了滿足Future的某些遺憾,強(qiáng)大的CompletableFuture隨著Java8一起來了。
Future傳統(tǒng)多線程的卻讓程序更加高效,畢竟是異步,可以讓CPU充分工作,但這僅限于新開的線程無需你的主線程再費(fèi)心了。比如你開啟的新線程僅僅是為了計(jì)算1+...+n再打印結(jié)果。有時(shí)候你需要子線程返回計(jì)算結(jié)果,在主線程中進(jìn)行進(jìn)一步計(jì)算,就需要Future了。
看下面這個(gè)例子,主線程計(jì)算2+4+6+8+10;子線程計(jì)算1+3+5+7+9;最后需要在主線程中將兩部分結(jié)果再相加。
public class OddNumber implements Callable{ @Override public Integer call() throws Exception { Thread.sleep(3000); int result = 1 + 3 + 5 + 7 + 9; return result; } }
public class FutureTest { public static void main(String[] args) { ExecutorService executor = Executors.newCachedThreadPool(); OddNumber oddNumber = new OddNumber(); Futurefuture = executor.submit(oddNumber); long startTime = System.currentTimeMillis(); int evenNumber = 2 + 4 + 6 + 8 + 10; try { Thread.sleep(1000); System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒"); int oddNumberResult = future.get();//這時(shí)間會(huì)被阻塞 System.out.println("1+2+...+9+10="+(evenNumber+oddNumberResult)); System.out.println("1.開始了:"+ (System.currentTimeMillis()-startTime) +"秒"); } catch (Exception e) { System.out.println(e); } } } 輸出結(jié)果: 0.開始了:1001秒 1+2+...+9+10=55 1.開始了:3002秒
看一下Future接口,只有五個(gè)方法比較簡單
//取消任務(wù),如果已經(jīng)完成或者已經(jīng)取消,就返回失敗 boolean cancel(boolean mayInterruptIfRunning); //查看任務(wù)是否取消 boolean isCancelled(); //查看任務(wù)是否完成 boolean isDone(); //剛才用到了,查看結(jié)果,任務(wù)未完成就一直阻塞 V get() throws InterruptedException, ExecutionException; //同上,但是加了一個(gè)過期時(shí)間,防止長時(shí)間阻塞,主線程也做不了事情 V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;CompletableFuture
上面的看到Future的五個(gè)方法,不是很豐富,既然我們的主線程叫做main,就應(yīng)該以我為主,我更希望子線程做完了事情主動(dòng)通知我。為此,Java8帶來了CompletableFuture,一個(gè)Future的實(shí)現(xiàn)類。其實(shí)CompletableFuture最迷人的地方并不是極大豐富了Future的功能,而是完美結(jié)合了Java8流的新特性。
實(shí)現(xiàn)回調(diào),自動(dòng)后續(xù)操作提前說一下CompletableFuture實(shí)現(xiàn)回調(diào)的方法(之一):thenAccept()
public CompletableFuturethenAccept(Consumer super T> action) { return uniAcceptStage(null, action); }
參數(shù)有個(gè)Consumer,用到了Java8新特性,行為參數(shù)化,就是參數(shù)不一定是基本類型或者類,也可使是函數(shù)(行為),或者說一個(gè)方法(接口)。
public class OddNumberPlus implements Supplier{ @Override public Integer get() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return 1+3+5+7+9; } }
public class CompletableFutureTest { public static void main(String[] args) { long startTime = System.currentTimeMillis(); final int evenNumber = 2 + 4 + 6 + 8 + 10; CompletableFutureoddNumber = CompletableFuture.supplyAsync(new OddNumberPlus()); try { Thread.sleep(1000); System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒"); //看這里,實(shí)現(xiàn)回調(diào) oddNumber.thenAccept(oddNumberResult-> { System.out.println("1.開始了:"+ (System.currentTimeMillis()-startTime) +"秒"); System.out.println("此時(shí)計(jì)算結(jié)果為:"+(evenNumber+oddNumberResult)); }); oddNumber.get(); } catch (Exception e) { System.out.println(e); } } } 輸出結(jié)果: 0.開始了:1006秒 1.開始了:3006秒 此時(shí)計(jì)算結(jié)果為:55
值得一提的是,本例中并沒有顯示的創(chuàng)建任務(wù)連接池,程序會(huì)默認(rèn)選擇一個(gè)任務(wù)連接池ForkJoinPool.commonPool()
private static final Executor asyncPool = useCommonPool ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
ForkJoinPool始自JDK7,叫做分支/合并框架。可以通過將一個(gè)任務(wù)遞歸分成很多分子任務(wù),形成不同的流,進(jìn)行并行執(zhí)行,同時(shí)還伴隨著強(qiáng)大的工作竊取算法。極大的提高效率。當(dāng)然,你也可以自己指定連接池。
CompletableFuture合并Java8的確豐富了Future實(shí)現(xiàn),CompletableFuture有很多方法可供大家使用,但是但從上面的例子來看,其實(shí)CompletableFuture能做的功能,貌似Future。畢竟你CompletableFuture用get()這個(gè)方法的時(shí)候還不是阻塞了,我Future蠻可以自己拿到返回值,再手動(dòng)執(zhí)行一些操作嘛(雖說這樣main方法一定很不爽)。那么接下來的事情,F(xiàn)uture做起來就十分麻煩了。假設(shè)我們main方法只做奇數(shù)合集加上偶數(shù)合集這一個(gè)操作,提前算這兩個(gè)合集的操作異步交給兩個(gè)子線程,我們需要怎么做呢?沒錯(cuò),開啟兩個(gè)線程,等到兩個(gè)線程都計(jì)算結(jié)束的時(shí)候,我們進(jìn)行最后的相加,問題在于,你怎么知道那個(gè)子線程最后結(jié)束的呢?(貌似可以做個(gè)輪詢,不定的調(diào)用isDone()這個(gè)方法...)豐富的CompletableFuture功能為我們提供了一個(gè)方法,用于等待兩個(gè)子線程都結(jié)束了,再進(jìn)行相加操作:
//asyncPool就是上面提到的默認(rèn)線程池ForkJoinPool public CompletableFuturethenCombineAsync( CompletionStage extends U> other, BiFunction super T,? super U,? extends V> fn) { return biApplyStage(asyncPool, other, fn); }
看個(gè)例子:
public class OddCombine implements Supplier{ @Override public Integer get() { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return 1+3+5+7+9; } }
public class EvenCombine implements Supplier{ @Override public Integer get() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return 2+4+6+8+10; } }
public class CompletableCombineTest { public static void main(String[] args) throws Exception{ CompletableFutureoddNumber = CompletableFuture.supplyAsync(new OddCombine()); CompletableFuture evenNumber = CompletableFuture.supplyAsync(new EvenCombine()); long startTime = System.currentTimeMillis(); CompletableFuture resultFuturn = oddNumber.thenCombine(evenNumber,(odd,even)->{ return odd + even; }); System.out.println(resultFuturn.get()); System.out.println("0.開始了:"+ (System.currentTimeMillis()-startTime) +"秒"); } } 輸出結(jié)果: 55 0.開始了:3000秒
這邊模擬一個(gè)睡1秒,一個(gè)睡3秒,但是真正的網(wǎng)絡(luò)請求時(shí)間是不定的。是不是很爽,最爽的還不是現(xiàn)象,而是以上操作已經(jīng)利用了Java8流的概念。
兩個(gè)子線程還不夠,那么還有anyOff()函數(shù),可以承受多個(gè)CompletableFuture,會(huì)等待所有任務(wù)都完成。
public static CompletableFutureallOf(CompletableFuture>... cfs) { return andTree(cfs, 0, cfs.length - 1); }
與它長的很像的,有個(gè)方法,是當(dāng)?shù)谝粋€(gè)執(zhí)行結(jié)束的時(shí)候,就結(jié)束,后面任務(wù)不再等了,可以看作充分條件。
public static CompletableFuture
在上面那個(gè)例子的基礎(chǔ)上,把OddNumberPlus類時(shí)間調(diào)長一點(diǎn):
public class OddNumberPlus implements Supplier{ @Override public Integer get() { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return 1+3+5+7+9; } }
public class CompletableCombineTest { public static void main(String[] args) throws Exception{ CompletableFuture小結(jié)oddNumber = CompletableFuture.supplyAsync(new OddCombine()); CompletableFuture evenNumber = CompletableFuture.supplyAsync(new EvenCombine()); CompletableFuture testNumber = CompletableFuture.supplyAsync(new OddNumberPlus()); long startTime = System.currentTimeMillis(); CompletableFuture
CompletableFuture的方法其實(shí)還有很多,常用的比如說runAsync(),類似于supplyAsync(),只是沒有返回值;除了thenApply()可以加回調(diào)函數(shù)以外,還有thenApply();還有注入runAfterBoth()、runAfterEither(),這些見名知意。還有很多,可以點(diǎn)開CompletableFuture這個(gè)類的源碼仔細(xì)看一看。見微知著,透過CompletableFuture,更加感覺到Java8的強(qiáng)大,強(qiáng)大的流概念、行為參數(shù)化、高效的并行理念等等,不僅讓Java寫起來更爽,還不斷豐富Java整個(gè)生態(tài)。Java一直在進(jìn)步,所以沒有被時(shí)代淘汰,我們Javaer也可以繼續(xù)職業(yè)生涯,感謝Java,一起進(jìn)步。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/75459.html
摘要:內(nèi)部類,用于對和異常進(jìn)行包裝,從而保證對進(jìn)行只有一次成功。是取消異常,轉(zhuǎn)換后拋出。判斷是否使用的線程池,在中持有該線程池的引用。 前言 近期作者對響應(yīng)式編程越發(fā)感興趣,在內(nèi)部分享JAVA9-12新特性過程中,有兩處特性讓作者深感興趣:1.JAVA9中的JEP266對并發(fā)編程工具的更新,包含發(fā)布訂閱框架Flow和CompletableFuture加強(qiáng),其中發(fā)布訂閱框架以java.base...
摘要:組合式異步編程最近這些年,兩種趨勢不斷地推動(dòng)我們反思我們設(shè)計(jì)軟件的方式。第章中介紹的分支合并框架以及并行流是實(shí)現(xiàn)并行處理的寶貴工具它們將一個(gè)操作切分為多個(gè)子操作,在多個(gè)不同的核甚至是機(jī)器上并行地執(zhí)行這些子操作。 CompletableFuture:組合式異步編程 最近這些年,兩種趨勢不斷地推動(dòng)我們反思我們設(shè)計(jì)軟件的方式。第一種趨勢和應(yīng)用運(yùn)行的硬件平臺相關(guān),第二種趨勢與應(yīng)用程序的架構(gòu)相關(guān)...
摘要:方法接收的是的實(shí)例,但是它沒有返回值方法是函數(shù)式接口,無參數(shù),會(huì)返回一個(gè)結(jié)果這兩個(gè)方法是的升級,表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在線程池中執(zhí)行的。該的接口是在線程使用舊的接口,它不允許返回值。 簡介 作為Java 8 Concurrency API改進(jìn)而引入,本文是CompletableFuture類的功能和用例的介紹。同時(shí)在Java 9 也有對Completab...
摘要:在這種方式中,主線程不會(huì)被阻塞,不需要一直等到子線程完成。主線程可以并行的執(zhí)行其他任務(wù)。如果我們不想等待結(jié)果返回,我們可以把需要等待完成執(zhí)行的邏輯寫入到回調(diào)函數(shù)中。任何立即執(zhí)行完成那就是執(zhí)行在主線程中嘗試刪除測試下。可以使用達(dá)成目的。 Java 8 有大量的新特性和增強(qiáng)如 Lambda 表達(dá)式,Streams,CompletableFuture等。在本篇文章中我將詳細(xì)解釋清楚Compl...
摘要:這個(gè)方法返回與等待所有返回等待多個(gè)返回取多個(gè)當(dāng)中最快的一個(gè)返回等待多個(gè)當(dāng)中最快的一個(gè)返回二詳解終極指南并發(fā)編程中的風(fēng)格 thenApply(等待并轉(zhuǎn)化future) @Test public void testThen() throws ExecutionException, InterruptedException { CompletableFutur...
閱讀 845·2019-08-30 15:54
閱讀 3316·2019-08-29 15:33
閱讀 2701·2019-08-29 13:48
閱讀 1213·2019-08-26 18:26
閱讀 3333·2019-08-26 13:55
閱讀 1476·2019-08-26 10:45
閱讀 1164·2019-08-26 10:19
閱讀 305·2019-08-26 10:16