摘要:并行流與目前,我們對集合進行計算有兩種方式并行流而更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待而發生阻塞。
【回顧Future接口
Future接口時java5引入的,設計初衷是對將來某個時刻會發生的結果建模。它建模了一種異步計算,返回了一個執行預算結果的引用。比如,你去干洗店洗衣服,店員會告訴你什么時候可以來取衣服,而不是讓你一直在干洗店等待。要使用Future只需要將耗時操作封裝在一個Callable對象中,再將其提交給ExecutorService就可以了。
ExecutorService executor = Executors.newFixedThreadPool(10); Futurefuture = executor.submit(new Callable () { @Override public Double call() throws Exception { return doSomeLongComputation(); } }); doSomethingElse(); try { //最多等待1秒 Double result = future.get(1,TimeUnit.SECONDS); } catch (InterruptedException e) { //當前線程等待過程中被打斷 e.printStackTrace(); } catch (ExecutionException e) { //計算時出現異常 e.printStackTrace(); } catch (TimeoutException e) { //完成計算前就超時 e.printStackTrace(); }
但是Future依然有一些局限性:
無法將兩個異步計算的結果合并為一個。
等待Future集合中所有任務完成。
等待Future集合中最快任務完成(選擇最優的執行方案)。
通過編程的方式完成一個Future任務的執行(手工設定異步結果處理)。
應對Future的完成事件,當Future的完成事件發生時會收到通知,并可以使用Future的結果進行下一步操作,不只是簡單的阻塞等待。
而CompletableFuture類實現了Future接口,可以將上述的問題全部解決。CompletableFuture與Stream的設計都遵循了類似的設計模式:使用Lambda表達式以及流水線的思想,從這個角度可以說CompletableFuture與Future的關系類似于Stream與Collection的關系。
【構建一個異步應用最佳價格查詢器:查詢多個線上商店對同一商品的價格。
首先構建商店對象:
package BestPriceFinder; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; public class Shop { private String name; public Shop(String name){ this.name = name; } public String getName(){ return name; } /** * 異步api:使用創建CompletableFuture類提供的工廠方法與getPriceAsync()效果完全一致 * 可以更輕易的完成這個流程,并且不用擔心實現細節 * @param product * @return */ public FuturegetPriceAsyncByFactory(String product){ return CompletableFuture.supplyAsync(() -> calculatePrice(product)); } /** * 異步api: * @param product * @return */ public Future getPriceAsync(String product){ //創建CompletableFuture對象,它將包含計算結果 CompletableFuture futurePrice = new CompletableFuture<>(); //在新線程中異步計算結果 new Thread(() -> { try { double price = calculatePrice(product); //需要長時間計算的任務結束時,設置future的返回值 futurePrice.complete(price); }catch (Exception e){ //如這里沒有使用completeExceptionally,線程不會結束,調用方會永遠的執行下去 futurePrice.completeExceptionally(e); } }).start(); //無需等待計算結果,直接返回future對象 return futurePrice; } /** * 同步api: * 每個商店都需要提供的查詢api:根據名稱返回價格; * 模擬查詢數據庫等一些耗時操作:使用delay()模擬這些耗時操作。 * @param product * @return */ public double getPrice(String product){ return calculatePrice(product); } private double calculatePrice(String product){ delay(); return random.nextDouble() * product.charAt(0) + product.charAt(1); } private Random random = new Random(); /** * 模擬耗時操作:延遲一秒 */ private static void delay(){ try { Thread.sleep(1000L); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
下面我們針對Shop.java提供的同步方法與異步方法來進行測試:
package BestPriceFinder; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.stream.Collectors; /** * 最佳價格查詢器 */ public class BestFinder { Listshops = Arrays.asList( new Shop("A"), new Shop("B"), new Shop("C"), new Shop("D"), new Shop("E"), new Shop("F"), new Shop("G"), new Shop("H"), new Shop("I"), new Shop("J") ); /** * 順序查詢 */ public List findPrices(String product){ return shops.stream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 并行流查詢 */ public List findPricesParallel(String product){ return shops.parallelStream() .map(shop -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product))) .collect(Collectors.toList()); } /** * 異步查詢 * 相比并行流的話CompletableFuture更有優勢:可以對執行器配置,設置線程池大小 */ @SuppressWarnings("all") private final Executor myExecutor = Executors.newFixedThreadPool(Math.min(shops.size(), 800), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); //使用守護線程保證不會阻止程序的關停 t.setDaemon(true); return t; } }); @SuppressWarnings("all") public List findPricesAsync(String product){ List > priceFuctures = shops.stream() .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)),myExecutor)) .collect(Collectors.toList()); /** 這里需要使用新的stream來等待所有的子線程執行完, * 因為:如果在一個stream中使用兩個map: * List > priceFuctures = shops.parallelStream() * .map(shop -> CompletableFuture.supplyAsync(() -> String.format("%s price is %.2f",shop.getName(),shop.getPrice(product)))) * .map(c -> c.join()).collect(Collectors.toList()) * .collect(Collectors.toList()); * 考慮到流操作之間的延遲特性。如果你在單一的流水線中處理流,發向不同商家的請求只能以同步順序的方式執行才會成功。因此每個創建CompletableFuture * 對象只能在前一個操作結束之后執行查詢商家動作。 */ return priceFuctures.stream().map(c -> c.join()).collect(Collectors.toList()); } }
@Test public void findPrices(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesParallel(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPrices("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); } @Test public void findPricesAsync(){ BestFinder bestFinder = new BestFinder(); long st = System.currentTimeMillis(); System.out.println(bestFinder.findPricesAsync("iPhonX")); System.out.println("done : " + (System.currentTimeMillis() - st) + "msecs"); }
同步api測試結果:毫無疑問是10秒之上
并行流獲取同步api測試結果:也是10秒之上,但是并行流不是很高效嗎?怎么會如此凄慘呢?因為這與并行流可以調用的系統核數相關,我的計算機是8核,最多8個線程同時運行。而商店有10個,也就是說,我們的兩個線程會一直等待前面的某一個線程釋放出空閑才能繼續運行。
異步獲取api測試結果:一秒左右
為何差距如此大呢?
明智的選擇是創建了一個配有線程池的執行器,線程池中線程的數目取決于你的應用需要處理的負擔,但是你該如何選擇合適的線程數目呢?
《Java并發編程實戰》中給出如下公式:
Number = NCpu * Ucpu * ( 1 + W/C) Number : 線程數量 NCpu : 處理器核數 UCpu : 期望cpu利用率 W/C : 等待時間與計算時間比
我們這里:99%d的時間是等待商店響應 W/C = 99 ,cpu利用率期望 100% ,NCpu = 9,推斷出 number = 800。但是為了避免過多的線程搞死計算機,我們選擇商店數與計算值中較小的一個。
【并行流與CompletableFuture目前,我們對集合進行計算有兩種方式:1.并行流 2.CompletableFuture;而CompletableFuture更加的靈活,我們可以配置線程池的大小確保整體的計算不會因為等待IO而發生阻塞。
書上給出的建議如下:
如果是計算密集型的操作并且沒有IO推薦stream接口,因為實現簡單效率也高,如果所有的線程都是計算密集型的也就沒有必要創建比核數更多的線程。
反之,如果任務涉及到IO,網絡等操作:CompletableFuture靈活性更好,因為大部分線程處于等待狀態,需要讓他們更加忙碌,并且再邏輯中加入異常處理可以更有效的監控是什么原因觸發了等待。
現在我們知道了如何用CompletableFuture提供異步的api,后面的文章會學習如何利用CompletableFuture高效的操作同步api。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68219.html
摘要:相比與其他操作系統包括其他類系統有很多的優點,其中有一項就是,其上下文切換和模式切換的時間消耗非常少。因為多線程競爭鎖時會引起上下文切換。減少線程的使用。很多編程語言中都有協程。所以如何避免死鎖的產生,在我們使用并發編程時至關重要。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Java多線程學習(二)synchronized關鍵字(1) java多線程學習(二)syn...
摘要:因為多線程競爭鎖時會引起上下文切換。減少線程的使用。舉個例子如果說服務器的帶寬只有,某個資源的下載速度是,系統啟動個線程下載該資源并不會導致下載速度編程,所以在并發編程時,需要考慮這些資源的限制。 最近私下做一項目,一bug幾日未解決,總惶恐。一日頓悟,bug不可怕,怕的是項目不存在bug,與其懼怕,何不與其剛正面。 系列文章傳送門: Java多線程學習(一)Java多線程入門 Jav...
摘要:學習編程的本最佳書籍這些書涵蓋了各個領域,包括核心基礎知識,集合框架,多線程和并發,內部和性能調優,設計模式等。擅長解釋錯誤及錯誤的原因以及如何解決簡而言之,這是學習中并發和多線程的最佳書籍之一。 showImg(https://segmentfault.com/img/remote/1460000018913016); 來源 | 愿碼(ChainDesk.CN)內容編輯 愿碼Slo...
摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
閱讀 2805·2019-08-30 15:55
閱讀 2853·2019-08-30 15:53
閱讀 2289·2019-08-26 13:47
閱讀 2551·2019-08-26 13:43
閱讀 3153·2019-08-26 13:33
閱讀 2794·2019-08-26 11:53
閱讀 1789·2019-08-23 18:35
閱讀 795·2019-08-23 17:16