摘要:在這個示例中我們使用了一個單線程線程池的。在延遲消逝后,任務將會并發執行。這是并發系列教程的第一部分。第一部分線程和執行器第二部分同步和鎖第三部分原子操作和
Java 8 并發教程:線程和執行器
原文:Java 8 Concurrency Tutorial: Threads and Executors
譯者:BlankKelly
來源:Java8并發教程:Threads和Executors
歡迎閱讀我的Java8并發教程的第一部分。這份指南將會以簡單易懂的代碼示例來教給你如何在Java8中進行并發編程。這是一系列教程中的第一部分。在接下來的15分鐘,你將會學會如何通過線程,任務(tasks)和 exector services來并行執行代碼。
第一部分:線程和執行器
第二部分:同步和鎖
第三部分:原子操作和 ConcurrentMap
并發在Java5中首次被引入并在后續的版本中不斷得到增強。在這篇文章中介紹的大部分概念同樣適用于以前的Java版本。不過我的代碼示例聚焦于Java8,大量使用lambda表達式和其他新特性。如果你對lambda表達式不屬性,我推薦你首先閱讀我的Java 8 教程。
Thread 和 Runnable所有的現代操作系統都通過進程和線程來支持并發。進程是通常彼此獨立運行的程序的實例,比如,如果你啟動了一個Java程序,操作系統產生一個新的進程,與其他程序一起并行執行。在這些進程的內部,我們使用線程并發執行代碼,因此,我們可以最大限度的利用CPU可用的核心(core)。
Java從JDK1.0開始執行線程。在開始一個新的線程之前,你必須指定由這個線程執行的代碼,通常稱為task。這可以通過實現Runnable——一個定義了一個無返回值無參數的run()方法的函數接口,如下面的代碼所示:
Runnable task = () -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName); }; task.run(); Thread thread = new Thread(task); thread.start(); System.out.println("Done!");
因為Runnable是一個函數接口,所以我們利用lambda表達式將當前的線程名打印到控制臺。首先,在開始一個線程前我們在主線程中直接運行runnable。
控制臺輸出的結果可能像下面這樣:
Hello main Hello Thread-0 Done!
或者這樣:
Hello main Done! Hello Thread-0
由于我們不能預測這個runnable是在打印"done"前執行還是在之后執行。順序是不確定的,因此在大的程序中編寫并發程序是一個復雜的任務。
我們可以將線程休眠確定的時間。在這篇文章接下來的代碼示例中我們可以通過這種方法來模擬長時間運行的任務。
Runnable runnable = () -> { try { String name = Thread.currentThread().getName(); System.out.println("Foo " + name); TimeUnit.SECONDS.sleep(1); System.out.println("Bar " + name); } catch (InterruptedException e) { e.printStackTrace(); } }; Thread thread = new Thread(runnable); thread.start();
當你運行上面的代碼時,你會注意到在第一條打印語句和第二條打印語句之間存在一分鐘的延遲。TimeUnit在處理單位時間時一個有用的枚舉類。你可以通過調用Thread.sleep(1000)來達到同樣的目的。
使用Thread類是很單調的且容易出錯。由于并發API在2004年Java5發布的時候才被引入。這些API位于java.util.concurrent包下,包含很多處理并發編程的有用的類。自從這些并發API引入以來,在隨后的新的Java版本發布過程中得到不斷的增強,甚至Java8提供了新的類和方法來處理并發。
接下來,讓我們走進并發API中最重要的一部——executor services。
Executor并發API引入了ExecutorService作為一個在程序中直接使用Thread的高層次的替換方案。Executos支持運行異步任務,通常管理一個線程池,這樣一來我們就不需要手動去創建新的線程。在不斷地處理任務的過程中,線程池內部線程將會得到復用,因此,在我們可以使用一個executor service來運行和我們想在我們整個程序中執行的一樣多的并發任務。
下面是使用executors的第一個代碼示例:
ExecutorService executor = Executors.newSingleThreadExecutor(); executor.submit(() -> { String threadName = Thread.currentThread().getName(); System.out.println("Hello " + threadName); }); // => Hello pool-1-thread-1
Executors類提供了便利的工廠方法來創建不同類型的 executor services。在這個示例中我們使用了一個單線程線程池的 executor。
代碼運行的結果類似于上一個示例,但是當運行代碼時,你會注意到一個很大的差別:Java進程從沒有停止!Executors必須顯式的停止-否則它們將持續監聽新的任務。
ExecutorService提供了兩個方法來達到這個目的——shutdwon()會等待正在執行的任務執行完而shutdownNow()會終止所有正在執行的任務并立即關閉execuotr。
這是我喜歡的通常關閉executors的方式:
try { System.out.println("attempt to shutdown executor"); executor.shutdown(); executor.awaitTermination(5, TimeUnit.SECONDS); } catch (InterruptedException e) { System.err.println("tasks interrupted"); } finally { if (!executor.isTerminated()) { System.err.println("cancel non-finished tasks"); } executor.shutdownNow(); System.out.println("shutdown finished"); }
executor通過等待指定的時間讓當前執行的任務終止來“溫柔的”關閉executor。在等待最長5分鐘的時間后,execuote最終會通過中斷所有的正在執行的任務關閉。
Callable 和 Future除了Runnable,executor還支持另一種類型的任務——Callable。Callables也是類似于runnables的函數接口,不同之處在于,Callable返回一個值。
下面的lambda表達式定義了一個callable:在休眠一分鐘后返回一個整數。
Callabletask = () -> { try { TimeUnit.SECONDS.sleep(1); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } };
Callbale也可以像runnbales一樣提交給 executor services。但是callables的結果怎么辦?因為submit()不會等待任務完成,executor service不能直接返回callable的結果。不過,executor 可以返回一個Future類型的結果,它可以用來在稍后某個時間取出實際的結果。
ExecutorService executor = Executors.newFixedThreadPool(1); Futurefuture = executor.submit(task); System.out.println("future done? " + future.isDone()); Integer result = future.get(); System.out.println("future done? " + future.isDone()); System.out.print("result: " + result);
在將callable提交給exector之后,我們先通過調用isDone()來檢查這個future是否已經完成執行。我十分確定這會發生什么,因為在返回那個整數之前callable會休眠一分鐘、
在調用get()方法時,當前線程會阻塞等待,直到callable在返回實際的結果123之前執行完成?,F在future執行完畢,我們可以在控制臺看到如下的結果:
future done? false future done? true result: 123
Future與底層的executor service緊密的結合在一起。記住,如果你關閉executor,所有的未中止的future都會拋出異常。
executor.shutdownNow(); future.get();
你可能注意到我們這次創建executor的方式與上一個例子稍有不同。我們使用newFixedThreadPool(1)來創建一個單線程線程池的 execuot service。
這等同于使用newSingleThreadExecutor不過使用第二種方式我們可以稍后通過簡單的傳入一個比1大的值來增加線程池的大小。
任何future.get()調用都會阻塞,然后等待直到callable中止。在最糟糕的情況下,一個callable持續運行——因此使你的程序將沒有響應。我們可以簡單的傳入一個時長來避免這種情況。
ExecutorService executor = Executors.newFixedThreadPool(1); Futurefuture = executor.submit(() -> { try { TimeUnit.SECONDS.sleep(2); return 123; } catch (InterruptedException e) { throw new IllegalStateException("task interrupted", e); } }); future.get(1, TimeUnit.SECONDS);
運行上面的代碼將會產生一個TimeoutException:
Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205)
你可能已經猜到俄為什么會排除這個異常。我們指定的最長等待時間為1分鐘,而這個callable在返回結果之前實際需要兩分鐘。
invokeAllExecutors支持通過invokeAll()一次批量提交多個callable。這個方法結果一個callable的集合,然后返回一個future的列表。
ExecutorService executor = Executors.newWorkStealingPool(); List> callables = Arrays.asList( () -> "task1", () -> "task2", () -> "task3"); executor.invokeAll(callables) .stream() .map(future -> { try { return future.get(); } catch (Exception e) { throw new IllegalStateException(e); } }) .forEach(System.out::println);
在這個例子中,我們利用Java8中的函數流(stream)來處理invokeAll()調用返回的所有future。我們首先將每一個future映射到它的返回值,然后將每個值打印到控制臺。如果你還不屬性stream,可以閱讀我的Java8 Stream 教程。
invokeAny批量提交callable的另一種方式就是invokeAny(),它的工作方式與invokeAll()稍有不同。在等待future對象的過程中,這個方法將會阻塞直到第一個callable中止然后返回這一個callable的結果。
為了測試這種行為,我們利用這個幫助方法來模擬不同執行時間的callable。這個方法返回一個callable,這個callable休眠指定 的時間直到返回給定的結果。
Callablecallable(String result, long sleepSeconds) { return () -> { TimeUnit.SECONDS.sleep(sleepSeconds); return result; }; }
我們利用這個方法創建一組callable,這些callable擁有不同的執行時間,從1分鐘到3分鐘。通過invokeAny()將這些callable提交給一個executor,返回最快的callable的字符串結果-在這個例子中為任務2:
ExecutorService executor = Executors.newWorkStealingPool(); List> callables = Arrays.asList( callable("task1", 2), callable("task2", 1), callable("task3", 3)); String result = executor.invokeAny(callables); System.out.println(result); // => task2
上面這個例子又使用了另一種方式來創建executor——調用newWorkStealingPool()。這個工廠方法是Java8引入的,返回一個ForkJoinPool類型的 executor,它的工作方法與其他常見的execuotr稍有不同。與使用一個固定大小的線程池不同,ForkJoinPools使用一個并行因子數來創建,默認值為主機CPU的可用核心數。
ForkJoinPools 在Java7時引入,將會在這個系列后面的教程中詳細講解。讓我們深入了解一下 scheduled executors 來結束本次教程。
ScheduledExecutor我們已經學習了如何在一個 executor 中提交和運行一次任務。為了持續的多次執行常見的任務,我們可以利用調度線程池。
ScheduledExecutorService支持任務調度,持續執行或者延遲一段時間后執行。
下面的實例,調度一個任務在延遲3分鐘后執行:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime()); ScheduledFuture> future = executor.schedule(task, 3, TimeUnit.SECONDS); TimeUnit.MILLISECONDS.sleep(1337); long remainingDelay = future.getDelay(TimeUnit.MILLISECONDS); System.out.printf("Remaining Delay: %sms", remainingDelay);
調度一個任務將會產生一個專門的future類型——ScheduleFuture,它除了提供了Future的所有方法之外,他還提供了getDelay()方法來獲得剩余的延遲。在延遲消逝后,任務將會并發執行。
為了調度任務持續的執行,executors 提供了兩個方法scheduleAtFixedRate()和scheduleWithFixedDelay()。第一個方法用來以固定頻率來執行一個任務,比如,下面這個示例中,每分鐘一次:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> System.out.println("Scheduling: " + System.nanoTime()); int initialDelay = 0; int period = 1; executor.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.SECONDS);
另外,這個方法還接收一個初始化延遲,用來指定這個任務首次被執行等待的時長。
請記住:scheduleAtFixedRate()并不考慮任務的實際用時。所以,如果你指定了一個period為1分鐘而任務需要執行2分鐘,那么線程池為了性能會更快的執行。
在這種情況下,你應該考慮使用scheduleWithFixedDelay()。這個方法的工作方式與上我們上面描述的類似。不同之處在于等待時間 period 的應用是在一次任務的結束和下一個任務的開始之間。例如:
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); Runnable task = () -> { try { TimeUnit.SECONDS.sleep(2); System.out.println("Scheduling: " + System.nanoTime()); } catch (InterruptedException e) { System.err.println("task interrupted"); } }; executor.scheduleWithFixedDelay(task, 0, 1, TimeUnit.SECONDS);
這個例子調度了一個任務,并在一次執行的結束和下一次執行的開始之間設置了一個1分鐘的固定延遲。初始化延遲為0,任務執行時間為0。所以我們分別在0s,3s,6s,9s等間隔處結束一次執行。如你所見,scheduleWithFixedDelay()在你不能預測調度任務的執行時長時是很有用的。
這是并發系列教程的第一部分。我推薦你親手實踐一下上面的代碼示例。你可以從 Github 上找到這篇文章中所有的代碼示例,所以歡迎你fork這個倉庫,并收藏它。
我希望你會喜歡這篇文章。如果你有任何的問題都可以在下面評論或者通過 Twitter 向我反饋。
第一部分:線程和執行器
第二部分:同步和鎖
第三部分:原子操作和 ConcurrentMap
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/64965.html
摘要:在接下來的分鐘,你將會學會如何通過同步關鍵字,鎖和信號量來同步訪問共享可變變量。所以在使用樂觀鎖時,你需要每次在訪問任何共享可變變量之后都要檢查鎖,來確保讀鎖仍然有效。 原文:Java 8 Concurrency Tutorial: Synchronization and Locks譯者:飛龍 協議:CC BY-NC-SA 4.0 歡迎閱讀我的Java8并發教程的第二部分。這份指南將...
摘要:并發教程原子變量和原文譯者飛龍協議歡迎閱讀我的多線程編程系列教程的第三部分。如果你能夠在多線程中同時且安全地執行某個操作,而不需要關鍵字或上一章中的鎖,那么這個操作就是原子的。當多線程的更新比讀取更頻繁時,這個類通常比原子數值類性能更好。 Java 8 并發教程:原子變量和 ConcurrentMap 原文:Java 8 Concurrency Tutorial: Synchroni...
以下是Java技術棧微信公眾號發布的關于 Java 的技術干貨,從以下幾個方面匯總。 Java 基礎篇 Java 集合篇 Java 多線程篇 Java JVM篇 Java 進階篇 Java 新特性篇 Java 工具篇 Java 書籍篇 Java基礎篇 8張圖帶你輕松溫習 Java 知識 Java父類強制轉換子類原則 一張圖搞清楚 Java 異常機制 通用唯一標識碼UUID的介紹及使用 字符串...
并發 計算機用戶想當然地認為他們的系統一次可以做不止一件事,他們設想他們可以繼續在文字處理器中工作,而其他應用程序則下載文件、管理打印隊列和流音頻,即使是單個應用程序通常也希望一次完成多個任務。例如,流式音頻應用程序必須同時從網絡上讀取數字音頻、解壓縮、管理回放并更新其顯示,甚至文字處理器應始終準備好響應鍵盤和鼠標事件,無論重新格式化文本或更新顯示有多繁忙,可以執行此類操作的軟件稱為并發軟件。 J...
閱讀 3066·2023-04-25 18:54
閱讀 2591·2021-11-02 14:40
閱讀 3176·2021-09-23 11:58
閱讀 2424·2019-08-30 13:50
閱讀 1231·2019-08-29 12:46
閱讀 3117·2019-08-28 17:51
閱讀 678·2019-08-26 11:47
閱讀 897·2019-08-23 16:17