摘要:在有些情況下,原子操作可以在不使用關鍵字和鎖的情況下解決多線程安全問題。但其內部的結果不是一個單一的值這個類的內部維護了一組變量來減少多線程的爭用。當來自多線程的更新比讀取更頻繁時這個類往往優于其他的原子類。
原文地址: Java 8 Concurrency Tutorial: Atomic Variables and ConcurrentMapAtomicInteger
java.concurrent.atomic 包下有很多原子操作的類。 在有些情況下,原子操作可以在不使用 synchronized 關鍵字和鎖的情況下解決多線程安全問題。
在內部,原子類大量使用 CAS, 這是大多數現在 CPU 支持的原子操作指令, 這些指令通常情況下比鎖同步要快得多。如果需要同時改變一個變量, 使用原子類是極其優雅的。
現在選擇一個原子類 AtomicInteger 作為例子
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(atomicInt::incrementAndGet)); stop(executor); System.out.println(atomicInt.get()); // => 1000
使用 AtomicInteger 代替 Integer 可以在線程安全的環境中增加變量, 而不要同步訪問變量。incrementAndGet() 方法是一個原子操作, 我們可以在多線程中安全的調用。
AtomicInteger 支持多種的原子操作, updateAndGet() 方法接受一個 lambda 表達式,以便對整數做任何的算術運算。
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.updateAndGet(n -> n + 2); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 2000
accumulateAndGet() 方法接受一個 IntBinaryOperator 類型的另一種 lambda 表達式, 我們是用這種方法來計算 1 -- 999 的和:
AtomicInteger atomicInt = new AtomicInteger(0); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> { Runnable task = () -> atomicInt.accumulateAndGet(i, (n, m) -> n + m); executor.submit(task); }); stop(executor); System.out.println(atomicInt.get()); // => 499500
還有一些其他的原子操作類: AtomicBoolean AtomicLong AtomicReference
LongAdder作為 AtomicLong 的替代, LongAdder 類可以用來連續地向數字添加值。
ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 1000) .forEach(i -> executor.submit(adder::increment)); stop(executor); System.out.println(adder.sumThenReset()); // => 1000
LongAdder 類和其他的整數原子操作類一樣提供了 add() 和 increment() 方法, 同時也是線程安全的。但其內部的結果不是一個單一的值, 這個類的內部維護了一組變量來減少多線程的爭用。實際結果可以通過調用 sum() 和 sumThenReset() 來獲取。
當來自多線程的更新比讀取更頻繁時, 這個類往往優于其他的原子類。通常作為統計數據, 比如要統計 web 服務器的請求數量。 LongAdder 的缺點是會消耗更多的內存, 因為有一組變量保存在內存中。
LongAccumulatorLongAccumulator 是 LongAdder 的一個更通用的版本。它不是執行簡單的添加操作, 類 LongAccumulator 圍繞 LongBinaryOperator 類型的lambda表達式構建,如代碼示例中所示:
LongBinaryOperator op = (x, y) -> 2 * x + y; LongAccumulator accumulator = new LongAccumulator(op, 1L); ExecutorService executor = Executors.newFixedThreadPool(2); IntStream.range(0, 10) .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); stop(executor); System.out.println(accumulator.getThenReset()); // => 2539
我們使用函數 2 * x + y 和初始值1創建一個 LongAccumulator。 每次調用 accumulate(i) , 當前結果和值i都作為參數傳遞給`lambda 表達式。
像 LongAdder 一樣, LongAccumulator 在內部維護一組變量以減少對線程的爭用。
ConcurrentMapConcurrentMap 接口擴展了 Map 接口,并定義了最有用的并發集合類型之一。 Java 8 通過向此接口添加新方法引入了函數式編程。
在下面的代碼片段中, 來演示這些新的方法:
ConcurrentMapmap = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0");
forEach() 接受一個類型為 BiConsumer 的 lambda 表達式, 并將 map 的 key 和 value 作為參數傳遞。
map.forEach((key, value) -> System.out.printf("%s = %s ", key, value));
putIfAbsent() 方法只有當給定的 key 不存在時才將數據存入 map 中, 這個方法和 put 一樣是線程安全的, 當多個線程訪問 map 時不要做同步操作。
String value = map.putIfAbsent("c3", "p1"); System.out.println(value); // p0
getOrDefault() 方法返回給定 key 的 value, 當 key 不存在時返回給定的值。
String value = map.getOrDefault("hi", "there"); System.out.println(value); // there
replaceAll() 方法接受一個 BiFunction 類型的 lambda 表達式, 并將 key 和 value 作為參數傳遞,用來更新 value。
map.replaceAll((key, value) -> "r2".equals(key) ? "d3" : value); System.out.println(map.get("r2")); // d3
compute() 方法和 replaceAll() 方法有些相同, 不同的是它多一個參數, 用來更新指定 key 的 value
map.compute("foo", (key, value) -> value + value); System.out.println(map.get("foo")); // barbarConcurrentHashMap
以上所有方法都是 ConcurrentMap 接口的一部分,因此可用于該接口的所有實現。 此外,最重要的實現 ConcurrentHashMap 已經進一步增強了一些新的方法來在 Map 上執行并發操作。
就像并行流一樣,這些方法在 Java 8 中通過 ForkJoinPool.commonPool()提供特殊的 ForkJoinPool 。該池使用預設的并行性, 這取決于可用內核的數量。 我的機器上有四個CPU內核可以實現三種并行性:
System.out.println(ForkJoinPool.getCommonPoolParallelism()); // 3
通過設置以下 JVM 參數可以減少或增加此值:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
我們使用相同的示例來演示, 不過下面使用 ConcurrentHashMap 類型, 這樣可以調用更多的方法。
ConcurrentHashMapmap = new ConcurrentHashMap<>(); map.put("foo", "bar"); map.put("han", "solo"); map.put("r2", "d2"); map.put("c3", "p0");
Java 8 引入了三種并行操作:forEach, search 和 reduce。 每個操作都有四種形式, 分別用 key, value, entries和 key-value 來作為參數。
所有這些方法的第一個參數都是 parallelismThreshold 閥值。 該閾值表示操作并行執行時的最小收集大小。 例如, 如果傳遞的閾值為500,并且 map 的實際大小為499, 則操作將在單個線程上按順序執行。 在下面的例子中,我們使用一個閾值來強制并行操作。
ForEach方法 forEach() 能夠并行地迭代 map 的鍵值對。 BiConsumer 類型的 lambda 表達式接受當前迭代的 key 和 value。 為了可視化并行執行,我們將當前線程名稱打印到控制臺。 請記住,在我的情況下,底層的 ForkJoinPool 最多使用三個線程。
map.forEach(1, (key, value) -> System.out.printf("key: %s; value: %s; thread: %s ", key, value, Thread.currentThread().getName())); // key: r2; value: d2; thread: main // key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1 // key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2 // key: c3; value: p0; thread: mainSearch
search() 方法接受一個 BiFunction 類型的 lambda 表達式, 它能對 map 做搜索操作, 如果當前迭代不符合所需的搜索條件,則返回 null。 請記住,ConcurrentHashMap 是無序的。 搜索功能不應該取決于地圖的實際處理順序。 如果有多個匹配結果, 則結果可能是不確定的。
String result = map.search(1, (key, value) -> { System.out.println(Thread.currentThread().getName()); if ("foo".equals(key)) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // ForkJoinPool.commonPool-worker-3 // Result: bar
下面是對 value 的搜索
String result = map.searchValues(1, value -> { System.out.println(Thread.currentThread().getName()); if (value.length() > 3) { return value; } return null; }); System.out.println("Result: " + result); // ForkJoinPool.commonPool-worker-2 // main // main // ForkJoinPool.commonPool-worker-1 // Result: soloReduce
reduce() 方法接受兩個類型為 BiFunction 的 lambda 表達式。 第一個函數將每個鍵值對轉換為任何類型的單個值。 第二個函數將所有這些轉換后的值組合成一個結果, 其中火忽略 null 值。
String result = map.reduce(1, (key, value) -> { System.out.println("Transform: " + Thread.currentThread().getName()); return key + "=" + value; }, (s1, s2) -> { System.out.println("Reduce: " + Thread.currentThread().getName()); return s1 + ", " + s2; }); System.out.println("Result: " + result); // Transform: ForkJoinPool.commonPool-worker-2 // Transform: main // Transform: ForkJoinPool.commonPool-worker-3 // Reduce: ForkJoinPool.commonPool-worker-3 // Transform: main // Reduce: main // Reduce: main // Result: r2=d2, c3=p0, han=solo, foo=bar
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68434.html
摘要:并發教程原子變量和原文譯者飛龍協議歡迎閱讀我的多線程編程系列教程的第三部分。如果你能夠在多線程中同時且安全地執行某個操作,而不需要關鍵字或上一章中的鎖,那么這個操作就是原子的。當多線程的更新比讀取更頻繁時,這個類通常比原子數值類性能更好。 Java 8 并發教程:原子變量和 ConcurrentMap 原文:Java 8 Concurrency Tutorial: Synchroni...
高級并發對象 到目前為止,本課程重點關注從一開始就是Java平臺一部分的低級別API,這些API適用于非常基礎的任務,但更高級的任務需要更高級別的構建塊,對于充分利用當今多處理器和多核系統的大規模并發應用程序尤其如此。 在本節中,我們將介紹Java平臺5.0版中引入的一些高級并發功能,大多數這些功能都在新的java.util.concurrent包中實現,Java集合框架中還有新的并發數據結構。 ...
摘要:在接下來的分鐘,你將會學會如何通過同步關鍵字,鎖和信號量來同步訪問共享可變變量。所以在使用樂觀鎖時,你需要每次在訪問任何共享可變變量之后都要檢查鎖,來確保讀鎖仍然有效。 原文:Java 8 Concurrency Tutorial: Synchronization and Locks譯者:飛龍 協議:CC BY-NC-SA 4.0 歡迎閱讀我的Java8并發教程的第二部分。這份指南將...
摘要:進程線程與協程它們都是并行機制的解決方案。選擇是任意性的,并在對實現做出決定時發生。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。此線程池支持定時以及周期性執行任務的需求。 并發與并行的概念 并發(Concurrency): 問題域中的概念—— 程序需要被設計成能夠處理多個同時(或者幾乎同時)發生的事件 并行(Parallel...
摘要:在這個示例中我們使用了一個單線程線程池的。在延遲消逝后,任務將會并發執行。這是并發系列教程的第一部分。第一部分線程和執行器第二部分同步和鎖第三部分原子操作和 Java 8 并發教程:線程和執行器 原文:Java 8 Concurrency Tutorial: Threads and Executors 譯者:BlankKelly 來源:Java8并發教程:Threads和Execut...
閱讀 2311·2021-10-11 10:59
閱讀 2602·2021-10-11 10:58
閱讀 3304·2021-09-08 09:35
閱讀 3783·2021-09-02 15:21
閱讀 1455·2019-08-30 15:53
閱讀 2608·2019-08-29 14:16
閱讀 2068·2019-08-26 14:00
閱讀 2942·2019-08-26 13:52