摘要:能夠異步的執行任務,并且通常管理一個線程池。這樣我們就不用手動的去創建線程了,線程池中的所有線程都將被重用。在之后不能再提交任務到線程池。它不使用固定大小的線程池,默認情況下是主機的可用內核數。
原文地址: Java 8 Concurrency Tutorial: Threads and Executors
Java 5 初次引入了Concurrency API,并在隨后的發布版本中不斷優化和改進。這篇文章的大部分概念也適用于老的版本。我的代碼示例主要聚焦在Java 8上,并大量適用 lambda 表達式和一些新特性。如果你還不熟悉 lambda 表達式,建議先閱讀 Java 8 Tutorial。
Threads 和 Runnables所有現代操作系統都是通過進程和線程來支持并發的。進程通常是相互獨立運行的程序實例。例如,你啟動一個 Java 程序,操作系統會產生一個新的進程和其他程序并行運行。在這些進程中可以利用線程同時執行代碼。這樣我們就可以充分利用 CPU。
Java 從 JDK 1.0 開始就支持線程。在開始一個新線程之前,必須先指定運行的代碼,通常稱為 Task。下面是通過實現 Runnable 接口來啟動一個新線程的例子:
Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName); }; task.run(); Thread thread = new Thread(task); thread.start(); System.out.println("Done!");
由于 Runnable 是一個函數式接口,我們可以使用 lambda 表達式來打印線程的名字到控制臺。我們直接在主線程上執行Runnable,然后開始一個新線程。在控制臺你將看到這樣的結果:
Hello main Hello Thread-0 Done!
或者:
Hello main Done! Hello Thread-0
由于是并發執行,我們無法預測 Runnable 是在打印 Done 之前還是之后調用,順序不是不確定的,因此并發編程成為大型應用程序開發中一項復雜的任務。
線程也可以休眠一段時間,例如下面的例子:
Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); } }; Thread thread = new Thread(runnable); thread.start();
執行上面的代碼會在兩個打印語句之間停留1秒鐘。TimeUnit 是一個時間單位的枚舉,或者可以通過調用 Thread.sleep(1000) 實現。
使用 Thread 類可能非常繁瑣且容易出錯。由于這個原因,在2004年,Java 5版本引入了 Concurrency API。API 位于 java.util.concurrent 包下,包含了許多有用的有關并發編程的類。從那時起,每個新發布的 Java 版本都增加了并發 API,Java 8 也提供了新的類和方法來處理并發。
現在我們來深入了解一下Concurrency API中最重要的部分 - executor services。
ExecutorsConcurrency API 引入了 ExecutorService 的概念,作為處理線程的高級別方式用來替代 Threads。 Executors 能夠異步的執行任務,并且通常管理一個線程池。這樣我們就不用手動的去創建線程了,線程池中的所有線程都將被重用。從而可以在一個
executor service 的整個應用程序生命周期中運行盡可能多的并發任務。
下面是一個簡單的 executors 例子:
ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName); }); // => Hello pool-1-thread-1
Executors 類提供了方便的工廠方法來創建不同類型的 executor services 。在這個例子中使用了只執行一個線程的 executor。
執行結果看起來和上面的示例類似,但是你會注意到一個重要區別:Java 進程永遠不會停止,執行者必須明確的停止它,否則它會不斷的接受新的任務。
ExecutorService 為此提供了兩種方法:shutdown() 等待當前任務執行完畢,而 shutdownNow() 則中斷所有正在執行的任務,并立即關閉執行程序。在 shudown 之后不能再提交任務到線程池。
下面是我關閉程序的首選方式:
try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { System.err.println("tasks interrupted"); } finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished"); }
執行者調用 shutdown 關閉 executor,在等待 5 秒鐘鐘后,不管任務有沒有執行完畢都調用 shutdownNow 中斷正在執行的任務而關閉。
Callables 和 Futures除了 Runnable 以外,executors 還支持 Callable 任務,和 Runnable 一樣是一個函數式接口,但它是有返回值的。
下面是一個使用 lambda 表達式定義的 Callable ,在睡眠 1 秒后返回一個整形值。
Callabletask = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } };
和 Runnable 一樣,Callable 也可以提交到 executor services,但是執行的結果是什么?由于 submit() 不等待任務執行完成,executor service 不能直接返回調用的結果。相對應的,它返回一個 Future 類型的結果,使用 Future 可以檢索實際執行結果。
ExecutorService executor = Executors.newFixedThreadPool(1); Futurefuture = executor.submit(task); System.out.println("future done? " + future.isDone()); Integer result = future.get(); System.out.println("future done? " + future.isDone()); System.out.print("result: " + result);
在將 Callable 提交給 executor 后,首先通過 isDone() 來檢查 future 是否執行完畢。我敢肯定,情況并非如此,因為上面的調用在返回整數之前睡眠了 1 秒鐘。
調用方法 get() 會阻塞當前線程,直到 callable 執行完成返回結果,現在 future 執行完成,并在控制臺輸出下面的結果:
future done? false future done? true result: 123
Future 與 executor service 緊密結合,如果關閉 executor service, 每個 Future 都會拋出異常。
executor.shutdownNow(); future.get();
這里創建 executor 的方式與前面的示例不同,這里使用 newFixedThreadPool(1) 來創建一個線程數量為 1 的線程池來支持 executor, 這相當于 newSingleThreadExecutor() ,稍后我們我們會通過傳遞一個大于 1 的值來增加線程池的大小。
Timeouts任何對 future.get()的調用都會阻塞并等待 Callable 被終止。 在最壞的情況下,一個可調用函數將永遠運行,從而使應用程序無法響應。可以簡單地通過超時來抵消這些情況:
ExecutorService executor = Executors.newFixedThreadPool(1); Futurefuture = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } }); future.get(1, TimeUnit.SECONDS);
執行上面的代碼會拋出 TimeoutException
Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)
指定了 1 秒鐘的最長等待時間,但是在返回結果之前,可調用事實上需要 2 秒鐘的時間。
InvokeAllExecutors 支持通過 invokeAll() 批量提交多個 Callable 。這個方法接受一個 Callable 類型集合的參數,并返回一個 Future 類型的 List。
ExecutorService executor = Executors.newWorkStealingPool(); List> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3"); executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);
在這個例子中,我們利用 Java 8 的流來處理 invokeAll 調用返回的所有 Future。 我們首先映射每個 Future 的返回值,然后將每個值打印到控制臺。 如果還不熟悉流,請閱讀Java 8 Stream Tutorial。
InvokeAny批量提交可調用的另一種方法是 invokeAny(),它與 invokeAll() 略有不同。 該方法不會返回所有的 Future 對象,它只返回第一個執行完畢任務的結果。
Callablecallable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; }; }
我們使用這種方法來創建一個有三個不同睡眠時間的 Callable。 通過 invokeAny()將這些可調用對象提交給 executor ,返回最快執行完畢結果,在這種情況下,task2:
ExecutorService executor = Executors.newWorkStealingPool(); List> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3)); String result = executor.invokeAny(callables); System.out.println(result); // => task2
上面的例子使用通過 newWorkStealingPool() 創建的另一種類型的 executor。 這個工廠方法是 Java 8 的一部分,并且返回一個類型為 ForkJoinPool的 executor,它與正常的 executor 略有不同。 它不使用固定大小的線程池,默認情況下是主機CPU的可用內核數。
Scheduled Executors我們已經學會了如何在 Executors 上提交和運行任務。 為了多次定期運行任務,我們可以使用 scheduled thread pools。
ScheduledExecutorService 能夠安排任務定期運行或在一段時間過后運行一次。
下面代碼示例一個任務在三秒鐘后運行:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime()); ScheduledFuture> future = executor.schedule(task, 3, TimeUnit.SECONDS); TimeUnit.MILLISECONDS.sleep(1337); long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS); System.out.printf("Remaining Delay: %sms", remainingDelay);
調度任務產生一個類型為 ScheduledFuture的值,除了 Future 之外,它還提供getDelay() 方法來檢索任務執行的剩余時間。
為了定時執行的任務,executor 提供了兩個方法 scheduleAtFixedRate() 和
scheduleWithFixedDelay() 。 第一種方法能夠執行具有固定時間間隔的任務,例如, 每秒一次:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime()); int initialDelay = 0; int period = 1; executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
此外,此方法還可以設置延遲時間,該延遲描述了首次執行任務之前的等待時間。
scheduleWithFixedDelay() 方法與 scheduleAtFixedRate() 略有不同,不同之處是它們的等待時間,scheduleWithFixedDelay() 的等待時間是在上一個任務結束和下一個任務開始之間施加的。
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); } }; executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
本示例在執行結束和下一次執行開始之間延遲 1 秒。 初始延遲為 0,任務持續時間為 2 秒。 所以我們結束了一個0s,3s,6s,9s等的執行間隔。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70831.html
摘要:在這個示例中我們使用了一個單線程線程池的。在延遲消逝后,任務將會并發執行。這是并發系列教程的第一部分。第一部分線程和執行器第二部分同步和鎖第三部分原子操作和 Java 8 并發教程:線程和執行器 原文:Java 8 Concurrency Tutorial: Threads and Executors 譯者:BlankKelly 來源:Java8并發教程:Threads和Execut...
摘要:前言在上一篇文章中多線程奇幻之旅算法實現線程安全,我們介紹了和方式實現線程安全類的方法,兩種方式一個是鎖定阻塞方式,一個是非阻塞方式。 前言 在上一篇文章中《Java多線程奇幻之旅——CAS算法實現線程安全》,我們介紹了Synchronized和CAS方式實現線程安全類的方法,兩種方式一個是鎖定阻塞方式,一個是非阻塞方式。本文專注于兩種實現方式效率問題。本文是上篇文章的延續,會借用到上...
摘要:無限期等待另一個線程執行特定操作。線程安全基本版請說明以及的區別值都不能為空數組結構上,通過數組和鏈表實現。優先考慮響應中斷,而不是響應鎖的普通獲取或重入獲取。只是在最后獲取鎖成功后再把當前線程置為狀態然后再中斷線程。 前段時間在慕課網直播上聽小馬哥面試勸退(面試虐我千百遍,Java 并發真討厭),發現講得東西比自己拿到offer還要高興,于是自己在線下做了一點小筆記,供各位參考。 課...
摘要:序本文主要研究一下的這里如果的為,則會創建這里如果是的話,參數傳遞的是如果是同步的方法,則傳的值是這里創建了一個,然后調用,這里使用了可以看到這里使用的是的方法注意這個方法是才有的,也是在這里使用的由于默認是使用創建的, 序 本文主要研究一下jdk httpclient的executor HttpClientImpl java.net.http/jdk/internal/net/htt...
摘要:用于限定中線程數的最大值。該線程池中的任務隊列維護著等待執行的對象。線程池和消息隊列筆者在實際工程應用中,使用過多線程和消息隊列處理過異步任務。以上是筆者在學習實踐之后對于多線程和消息隊列的粗淺認識,初學者切莫混淆兩者的作用。 多線程編程很難,難點在于多線程代碼的執行不是按照我們直覺上的執行順序。所以多線程編程必須要建立起一個宏觀的認識。 線程池是多線程編程中的一個重要概念。為了能夠更...
閱讀 4028·2021-11-22 13:53
閱讀 3625·2021-11-19 11:29
閱讀 1275·2021-09-08 09:35
閱讀 3171·2020-12-03 17:26
閱讀 517·2019-08-29 16:06
閱讀 2111·2019-08-26 13:50
閱讀 1187·2019-08-23 18:32
閱讀 2155·2019-08-23 18:12