摘要:方法接受一個(gè)生產(chǎn)者作為參數(shù),返回一個(gè)對(duì)象,該對(duì)象完成異步執(zhí)行后會(huì)讀取調(diào)用生產(chǎn)者方法的返回值。該方法接收一個(gè)對(duì)象構(gòu)成的數(shù)組,返回由第一個(gè)執(zhí)行完畢的對(duì)象的返回值構(gòu)成的。
一、Future 接口
在Future中觸發(fā)那些潛在耗時(shí)的操作把調(diào)用線程解放出來,讓它能繼續(xù)執(zhí)行其他有價(jià)值的工作,不再需要呆呆等待耗時(shí)的操作完成。打個(gè)比方,你可以把它想象成這樣的場景:你拿了一袋子衣服到你中意的干洗店去洗。干洗店的員工會(huì)給你張發(fā)票,告訴你什么時(shí)候你的衣服會(huì)洗好(這就是一個(gè)Future事件)。衣服干洗的同時(shí),你可以去做其他的事情。Future的另一個(gè)優(yōu)點(diǎn)是它比更底層的Thread更易用。要使用Future,通常你只需要將耗時(shí)的操作封裝在一個(gè)Callable對(duì)象中,再將它提交給ExecutorService,就萬事大吉了。
ExecutorService executor = Executors.newCachedThreadPool(); Futurefuture = executor.submit(new Callable () { public Double call() { return doSomeLongComputation(); }}); doSomethingElse(); //異步操作進(jìn)行的同時(shí),你可以做其他的事情 try { Double result = future.get(1, TimeUnit.SECONDS); //獲取異步操作的結(jié)果,如果最終被阻塞,無法得到結(jié) //果,那么在最多等待1秒鐘之后退出 } catch (ExecutionException ee) { // 計(jì)算拋出一個(gè)異常 } catch (InterruptedException ie) { // 當(dāng)前線程在等待過程中被中斷 } catch (TimeoutException te) { // 在Future對(duì)象完成之前超過已過期 }
同步API與異步API二、實(shí)現(xiàn)異步 API同步API其實(shí)只是對(duì)傳統(tǒng)方法調(diào)用的另一種稱呼:你調(diào)用了某個(gè)方法,調(diào)用方在被調(diào)用方運(yùn)行的過程中會(huì)等待,被調(diào)用方運(yùn)行結(jié)束返回,調(diào)用方取得被調(diào)用方的返回值并繼續(xù)運(yùn)行。即使調(diào)用方和被調(diào)用方在不同的線程中運(yùn)行,調(diào)用方還是需要等待被調(diào)用方結(jié)束運(yùn)行,這就是阻塞式調(diào)用這個(gè)名詞的由來。
與此相反,異步API會(huì)直接返回,或者至少在被調(diào)用方計(jì)算完成之前,將它剩余的計(jì)算任務(wù)交給另一個(gè)線程去做,該線程和調(diào)用方是異步的——這就是非阻塞式調(diào)用的由來。執(zhí)行剩余計(jì)算任務(wù)的線程會(huì)將它的計(jì)算結(jié)果返回給調(diào)用方。返回的方式要么是通過回調(diào)函數(shù),要么是由調(diào)用方再次執(zhí)行一個(gè)“等待,直到計(jì)算完成”的方法調(diào)用。這種方式的計(jì)算在I/O系統(tǒng)程序設(shè)計(jì)中非常常見:你發(fā)起了一次磁盤訪問,這次訪問和你的其他計(jì)算操作是異步的,你完成其他的任務(wù)時(shí),磁盤塊的數(shù)據(jù)可能還沒載入到內(nèi)存,你只需要等待數(shù)據(jù)的載入完成。
使用CompletableFuture后,getPriceAsync方法的實(shí)現(xiàn)
public FuturegetPriceAsync(String product) { CompletableFuture futurePrice = new CompletableFuture<>(); new Thread( () -> { double price = calculatePrice(product); //calculatePrice需長時(shí)間計(jì)算,任務(wù)結(jié)束并得出結(jié)果時(shí)設(shè)置 //Future的返回值 futurePrice.complete(price); }).start(); return futurePrice; //無需等待還沒結(jié)束的計(jì)算,直接返回Future對(duì)象 }
使用異步API:
Shop shop = new Shop("BestShop"); long start = System.nanoTime(); FuturefuturePrice = shop.getPriceAsync("my favorite product"); long invocationTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Invocation returned after " + invocationTime + " msecs"); // 執(zhí)行更多任務(wù),比如查詢其他商店 doSomethingElse(); // 在計(jì)算商品價(jià)格的同時(shí) try { double price = futurePrice.get(); //從Future對(duì)象中讀取價(jià)格,如果價(jià)格未知,會(huì)發(fā)生阻塞 System.out.printf("Price is %.2f%n", price); } catch (Exception e) { throw new RuntimeException(e); } long retrievalTime = ((System.nanoTime() - start) / 1_000_000); System.out.println("Price returned after " + retrievalTime + " msecs");
Stream和CompletableFuture的設(shè)計(jì)都遵循了類似的模式:它們都使用了Lambda表達(dá)式以及流水線的思想。CompletableFuture和Future的關(guān)系就跟Stream和Collection的關(guān)系一樣。
錯(cuò)誤處理
如果計(jì)算商品價(jià)格的方法出現(xiàn)異常,用于提示錯(cuò)誤的異常會(huì)被限制在試圖計(jì)算商品價(jià)格的當(dāng)前線程的范圍內(nèi),最終會(huì)殺死該線程,而這會(huì)導(dǎo)致等待get方法返回結(jié)果的客戶端永久地被阻塞。為了避免這種情況,你需要使用CompletableFuture的completeExceptionally方法將導(dǎo)致CompletableFuture內(nèi)發(fā)生問題的異常拋出。
拋出CompletableFuture內(nèi)的異常:
public FuturegetPriceAsync( String product ) { CompletableFuture futurePrice = new CompletableFuture<>(); new Thread( () - > { try { double price = calculatePrice( product ); futurePrice.complete( price ); } catch ( Exception ex ) { futurePrice.completeExceptionally( ex ); } } ).start(); return(futurePrice); }
使用工廠方法supplyAsync創(chuàng)建CompletableFuture對(duì)象:
public FuturegetPriceAsync(String product) { return CompletableFuture.supplyAsync(() -> calculatePrice(product)); }
此處getPriceAsync方法返回的CompletableFuture對(duì)象和上面你手工創(chuàng)建和完成的CompletableFuture對(duì)象是完全等價(jià)的,這意味著它提供了同樣的錯(cuò)誤管理機(jī)制。supplyAsync方法接受一個(gè)生產(chǎn)者(Supplier)作為參數(shù),返回一個(gè)CompletableFuture對(duì)象,該對(duì)象完成異步執(zhí)行后會(huì)讀取調(diào)用生產(chǎn)者方法的返回值。生產(chǎn)者方法會(huì)交由ForkJoinPool池中的某個(gè)執(zhí)行線程(Executor)運(yùn)行,但是你也可以使用supplyAsync方法的重載版本,傳遞第二個(gè)參數(shù)指定不同的執(zhí)行線程執(zhí)行生產(chǎn)者方法。
三、讓你的代碼免受阻塞之苦在所有店鋪中找出同一商品的價(jià)格,使用CompletableFuture實(shí)現(xiàn)findPrices方法
public ListfindPrices(String product) { List > priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getName() + " price is " + shop.getPrice(product))) .collect(Collectors.toList()); return priceFutures.stream() .map(CompletableFuture::join) .collect(toList()); }
這里使用了兩個(gè)不同的Stream流水線,而不是在同一個(gè)處理流的流水線上一個(gè)接一個(gè)地放置兩個(gè)map操作——這其實(shí)是有緣由的。考慮流操作之間的延遲特性,如果你在單一流水線中處理流,發(fā)向不同商家的請(qǐng)求只能以同步、順序執(zhí)行的方式才會(huì)成功。因此,每個(gè)創(chuàng)建CompletableFuture對(duì)象只能在前一個(gè)操作結(jié)束之后執(zhí)行查詢指定商家的動(dòng)作、通知join方法返回計(jì)算結(jié)果。
CompletableFuture類中的join方法和Future接口中的get有相同的含義,并且也聲明在Future接口中,它們唯一的不同是join不會(huì)拋出任何檢測到的異常。使用它你不再需要使用try/catch語句塊讓你傳遞給第二個(gè)map方法的Lambda表達(dá)式變得過于臃腫。
使用定制的執(zhí)行器:
調(diào)整線程池的大小
Brian Goetz建議,線程池大小與處理器的利用率之比可以使用下面的公式進(jìn)行估算:
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
?NCPU是處理器的核的數(shù)目,可以通過Runtime.getRuntime().availableProcessors()得到
?UCPU是期望的CPU利用率(該值應(yīng)該介于0和1之間)
?W/C是等待時(shí)間與計(jì)算時(shí)間的比率
實(shí)際操作中,如果你創(chuàng)建
的線程數(shù)比商店的數(shù)目更多,反而是一種浪費(fèi),因?yàn)檫@樣做之后,你線程池中的有些線程根本沒有機(jī)會(huì)被使用。出于這種考慮,我們建議你將執(zhí)行器使用的線程數(shù),與你需要查詢的商店數(shù)目設(shè)定為同一個(gè)值,這樣每個(gè)商店都應(yīng)該對(duì)應(yīng)一個(gè)服務(wù)線程。不過,為了避免發(fā)生由于商店的數(shù)目過多導(dǎo)致服務(wù)器超負(fù)荷而崩潰,你還是需要設(shè)置一個(gè)上限,比如100個(gè)線程。代碼清單如下所示。為“最優(yōu)價(jià)格查詢器”應(yīng)用定制的執(zhí)行器:
private final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100), new ThreadFactory() { public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setDaemon(true); return t; } });
并行——使用流還是CompletableFutures?四、對(duì)多個(gè)異步任務(wù)進(jìn)行流水線操作 1.thenCompose
目前為止,你已經(jīng)知道對(duì)集合進(jìn)行并行計(jì)算有兩種方式:要么將其轉(zhuǎn)化為并行流,利用map這樣的操作開展工作,要么枚舉出集合中的每一個(gè)元素,創(chuàng)建新的線程,在CompletableFuture內(nèi)對(duì)其進(jìn)行操作。后者提供了更多的靈活性,你可以調(diào)整線程池的大小,而這能幫助你確保整體的計(jì)算不會(huì)因?yàn)榫€程都在等待I/O而發(fā)生阻塞。
我們對(duì)使用這些API的建議如下。
?如果你進(jìn)行的是計(jì)算密集型的操作,并且沒有I/O,那么推薦使用Stream接口,因?yàn)閷?shí)現(xiàn)簡單,同時(shí)效率也可能是最高的(如果所有的線程都是計(jì)算密集型的,那就沒有必要?jiǎng)?chuàng)建比處理器核數(shù)更多的線程)。
?反之,如果你并行的工作單元還涉及等待I/O的操作(包括網(wǎng)絡(luò)連接等待),那么使用CompletableFuture靈活性更好,你可以像前文討論的那樣,依據(jù)等待/計(jì)算,或者W/C的比率設(shè)定需要使用的線程數(shù)。這種情況不使用并行流的另一個(gè)原因是,處理流的流水線中如果發(fā)生I/O等待,流的延遲特性會(huì)讓我們很難判斷到底什么時(shí)候觸發(fā)了等待。
使用CompletableFuture實(shí)現(xiàn)findPrices方法(獲取商品折扣后價(jià)格):
public ListfindPrices(String product) { List > priceFutures = shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice(product), executor))//getPrice耗時(shí)操作,獲取商品的價(jià)格字符串,使用異步方式 .map(future -> future.thenApply(Quote::parse)) //將價(jià)格字符串解析成Quote對(duì)象(包裝了價(jià)格,折扣率等) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor))) //異步計(jì)算商品最終價(jià)格 .collect(toList()); return priceFutures.stream() .map(CompletableFuture::join) //等待流中的所有Future執(zhí)行完畢,并提取各自的返回值 .collect(toList()); }
thenapply()是返回的是非CompletableFuture類型:它的功能相當(dāng)于將CompletableFuture轉(zhuǎn)換成CompletableFuture。
thenCompose()用來連接兩個(gè)CompletableFuture,返回值是新的CompletableFuture:
thenCompose方法允許你對(duì)兩個(gè)異步操作進(jìn)行流水線,第一個(gè)操作完成時(shí),將其結(jié)果作為參數(shù)傳遞給第二個(gè)操作。
CompletableFuture類中的其他方法一樣,也提供了一個(gè)以Async后綴結(jié)尾的版本thenComposeAsync。通常而言,名稱中不帶Async2.用thenCombine將兩個(gè) CompletableFuture 對(duì)象整合起來,無論它們是否存在依賴
的方法和它的前一個(gè)任務(wù)一樣,在同一個(gè)線程中運(yùn)行;而名稱以Async結(jié)尾的方法會(huì)將后續(xù)的任務(wù)提交到一個(gè)線程池,所以每個(gè)任務(wù)是由不同的線程處理的。
thenCombine方法,它接收名為BiFunction的第二參數(shù),這個(gè)參數(shù)定義了當(dāng)兩個(gè)CompletableFuture對(duì)象完成計(jì)算后,結(jié)果如何合并。同thenCompose方法一樣,thenCombine方法也提供有一個(gè)Async的版本。這里,如果使用thenCombineAsync會(huì)導(dǎo)致BiFunction中定義的合并操作被提交到線程池中,由另一個(gè)任務(wù)以異步的方式執(zhí)行。
eg:有一家商店提供的價(jià)格是以歐元(EUR)計(jì)價(jià)的,但是你希望以美元的方式提供給你的客戶:
Future五、響應(yīng) CompletableFuture 的 completion 事件futurePriceInUSD = CompletableFuture.supplyAsync(() -> shop.getPrice(product)) .thenCombine( CompletableFuture.supplyAsync( () -> exchangeService.getRate(Money.EUR, Money.USD)), (price, rate) -> price * rate );
只要有商店返回商品價(jià)格就在第一時(shí)間顯示返回值,不再等待那些還未返回的商店(有些甚至?xí)l(fā)生超時(shí))。Java 8的CompletableFuture通 過thenAccept方法提供了這一功能,它接收CompletableFuture執(zhí)行完畢后的返回值做參數(shù)。
重構(gòu)findPrices方法返回一個(gè)由Future構(gòu)成的流
public Stream> findPricesStream(String product) { return shops.stream() .map(shop -> CompletableFuture.supplyAsync( () -> shop.getPrice(product), executor)) .map(future -> future.thenApply(Quote::parse)) .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync( () -> Discount.applyDiscount(quote), executor))); } findPricesStream("myPhone").map(f -> f.thenAccept(System.out::println));
由 于thenAccept方法已經(jīng)定義了如何處理CompletableFuture返回的結(jié)果,一旦CompletableFuture計(jì)算得到結(jié)果,它就返回一個(gè)CompletableFuture
你還希望能給最慢的商店一些機(jī)會(huì),讓它有機(jī)會(huì)打印輸出返回的價(jià)格。為了實(shí)現(xiàn)這一目的,你可以把構(gòu)成Stream的所有CompletableFuture
CompletableFuture[] futures = findPricesStream("myPhone") .map(f -> f.thenAccept(System.out::println)) .toArray(size -> new CompletableFuture[size]); CompletableFuture.allOf(futures).join();
allOf工廠方法接收一個(gè)由CompletableFuture構(gòu)成的數(shù)組,數(shù)組中的所有CompletableFuture對(duì)象執(zhí)行完成之后,它返回一個(gè)CompletableFuture
CompletableFuture執(zhí)行join操作是個(gè)不錯(cuò)的主意。
你可能希望只要CompletableFuture對(duì)象數(shù)組中有任何一個(gè)執(zhí)行完畢就不再等待,比如,你正在查詢兩個(gè)匯率服務(wù)器,任何一個(gè)返回了結(jié)果都能滿足你的需求。在這種情況下,你可以使用一個(gè)類似的工廠方法anyOf。該方法接收一個(gè)CompletableFuture對(duì)象構(gòu)成的數(shù)組,返回由第一個(gè)執(zhí)行完畢的CompletableFuture對(duì)象的返回值構(gòu)成的CompletableFuture
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/74370.html
摘要:中使用了提供的原生接口對(duì)自身的異步化做了改進(jìn)。可以支持和兩種調(diào)用方式。實(shí)戰(zhàn)通過下面的例子,可以看出的最大好處特性。 showImg(https://segmentfault.com/img/remote/1460000020032427?w=1240&h=655); 前段時(shí)間工作上比較忙,這篇文章一直沒來得及寫,本文是閱讀《Java8實(shí)戰(zhàn)》的時(shí)候,了解到Java 8里已經(jīng)提供了一個(gè)異步...
摘要:方法接收的是的實(shí)例,但是它沒有返回值方法是函數(shù)式接口,無參數(shù),會(huì)返回一個(gè)結(jié)果這兩個(gè)方法是的升級(jí),表示讓任務(wù)在指定的線程池中執(zhí)行,不指定的話,通常任務(wù)是在線程池中執(zhí)行的。該的接口是在線程使用舊的接口,它不允許返回值。 簡介 作為Java 8 Concurrency API改進(jìn)而引入,本文是CompletableFuture類的功能和用例的介紹。同時(shí)在Java 9 也有對(duì)Completab...
摘要:在這種方式中,主線程不會(huì)被阻塞,不需要一直等到子線程完成。主線程可以并行的執(zhí)行其他任務(wù)。如果我們不想等待結(jié)果返回,我們可以把需要等待完成執(zhí)行的邏輯寫入到回調(diào)函數(shù)中。任何立即執(zhí)行完成那就是執(zhí)行在主線程中嘗試刪除測試下。可以使用達(dá)成目的。 Java 8 有大量的新特性和增強(qiáng)如 Lambda 表達(dá)式,Streams,CompletableFuture等。在本篇文章中我將詳細(xì)解釋清楚Compl...
摘要:并行流與目前,我們對(duì)集合進(jìn)行計(jì)算有兩種方式并行流而更加的靈活,我們可以配置線程池的大小確保整體的計(jì)算不會(huì)因?yàn)榈却l(fā)生阻塞。 【回顧Future接口 Future接口時(shí)java5引入的,設(shè)計(jì)初衷是對(duì)將來某個(gè)時(shí)刻會(huì)發(fā)生的結(jié)果建模。它建模了一種異步計(jì)算,返回了一個(gè)執(zhí)行預(yù)算結(jié)果的引用。比如,你去干洗店洗衣服,店員會(huì)告訴你什么時(shí)候可以來取衣服,而不是讓你一直在干洗店等待。要使用Future只需...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)常可見它的使用,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...
摘要:表示的是兩個(gè),當(dāng)其中任意一個(gè)計(jì)算完并發(fā)編程之是線程安全并且高效的,在并發(fā)編程中經(jīng)常可見它的使用,在開始分析它的高并發(fā)實(shí)現(xiàn)機(jī)制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個(gè)比較典型的互聯(lián)網(wǎng)高并發(fā)場景。 干貨:深度剖析分布式搜索引擎設(shè)計(jì) 分布式,高可用,和機(jī)器學(xué)習(xí)一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個(gè)名詞,今天我們首先來說說分布式。 探究...
閱讀 3095·2021-10-15 09:41
閱讀 3167·2021-09-22 16:05
閱讀 2405·2021-09-22 15:19
閱讀 2873·2021-09-02 15:11
閱讀 2446·2019-08-30 15:52
閱讀 832·2019-08-30 11:06
閱讀 1001·2019-08-29 16:44
閱讀 1240·2019-08-23 18:18