摘要:前言開發中不免會遇到需要所有子線程執行完畢通知主線程處理某些邏輯的場景。可以采用中斷線程的方式來通信,調用了方法其實就是將中的一個標志屬性置為了。實際開發中可以靈活根據需求選擇最適合的線程通信方式。
前言
開發中不免會遇到需要所有子線程執行完畢通知主線程處理某些邏輯的場景。
或者是線程 A 在執行到某個條件通知線程 B 執行某個操作。
可以通過以下幾種方式實現:
等待通知機制等待通知模式是 Java 中比較經典的線程通信方式。
兩個線程通過對同一對象調用等待 wait() 和通知 notify() 方法來進行通訊。
如兩個線程交替打印奇偶數:
public class TwoThreadWaitNotify { private int start = 1; private boolean flag = false; public static void main(String[] args) { TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify(); Thread t1 = new Thread(new OuNum(twoThread)); t1.setName("A"); Thread t2 = new Thread(new JiNum(twoThread)); t2.setName("B"); t1.start(); t2.start(); } /** * 偶數線程 */ public static class OuNum implements Runnable { private TwoThreadWaitNotify number; public OuNum(TwoThreadWaitNotify number) { this.number = number; } @Override public void run() { while (number.start <= 100) { synchronized (TwoThreadWaitNotify.class) { System.out.println("偶數線程搶到鎖了"); if (number.flag) { System.out.println(Thread.currentThread().getName() + "+-+偶數" + number.start); number.start++; number.flag = false; TwoThreadWaitNotify.class.notify(); }else { try { TwoThreadWaitNotify.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } /** * 奇數線程 */ public static class JiNum implements Runnable { private TwoThreadWaitNotify number; public JiNum(TwoThreadWaitNotify number) { this.number = number; } @Override public void run() { while (number.start <= 100) { synchronized (TwoThreadWaitNotify.class) { System.out.println("奇數線程搶到鎖了"); if (!number.flag) { System.out.println(Thread.currentThread().getName() + "+-+奇數" + number.start); number.start++; number.flag = true; TwoThreadWaitNotify.class.notify(); }else { try { TwoThreadWaitNotify.class.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } }
輸出結果:
t2+-+奇數93 t1+-+偶數94 t2+-+奇數95 t1+-+偶數96 t2+-+奇數97 t1+-+偶數98 t2+-+奇數99 t1+-+偶數100
這里的線程 A 和線程 B 都對同一個對象 TwoThreadWaitNotify.class 獲取鎖,A 線程調用了同步對象的 wait() 方法釋放了鎖并進入 WAITING 狀態。
B 線程調用了 notify() 方法,這樣 A 線程收到通知之后就可以從 wait() 方法中返回。
這里利用了 TwoThreadWaitNotify.class 對象完成了通信。
有一些需要注意:
wait() 、nofify() 、nofityAll() 調用的前提都是獲得了對象的鎖(也可稱為對象監視器)。
調用 wait() 方法后線程會釋放鎖,進入 WAITING 狀態,該線程也會被移動到等待隊列中。
調用 notify() 方法會將等待隊列中的線程移動到同步隊列中,線程狀態也會更新為 BLOCKED
從 wait() 方法返回的前提是調用 notify() 方法的線程釋放鎖,wait() 方法的線程獲得鎖。
等待通知有著一個經典范式:
線程 A 作為消費者:
獲取對象的鎖。
進入 while(判斷條件),并調用 wait() 方法。
當條件滿足跳出循環執行具體處理邏輯。
線程 B 作為生產者:
獲取對象鎖。
更改與線程 A 共用的判斷條件。
調用 notify() 方法。
偽代碼如下:
//Thread A synchronized(Object){ while(條件){ Object.wait(); } //do something } //Thread B synchronized(Object){ 條件=false;//改變條件 Object.notify(); }join() 方法
private static void join() throws InterruptedException { Thread t1 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }) ; Thread t2 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running2"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } }) ; t1.start(); t2.start(); //等待線程1終止 t1.join(); //等待線程2終止 t2.join(); LOGGER.info("main over"); }
輸出結果:
2018-03-16 20:21:30.967 [Thread-1] INFO c.c.actual.ThreadCommunication - running2 2018-03-16 20:21:30.967 [Thread-0] INFO c.c.actual.ThreadCommunication - running 2018-03-16 20:21:34.972 [main] INFO c.c.actual.ThreadCommunication - main over
在 t1.join() 時會一直阻塞到 t1 執行完畢,所以最終主線程會等待 t1 和 t2 線程執行完畢。
其實從源碼可以看出,join() 也是利用的等待通知機制:
核心邏輯:
while (isAlive()) { wait(0); }
在 join 線程完成后會調用 notifyAll() 方法,是在 JVM 實現中調用,所以這里看不出來。
volatile 共享內存因為 Java 是采用共享內存的方式進行線程通信的,所以可以采用以下方式用主線程關閉 A 線程:
public class Volatile implements Runnable{ private static volatile boolean flag = true ; @Override public void run() { while (flag){ System.out.println(Thread.currentThread().getName() + "正在運行。。。"); } System.out.println(Thread.currentThread().getName() +"執行完畢"); } public static void main(String[] args) throws InterruptedException { Volatile aVolatile = new Volatile(); new Thread(aVolatile,"thread A").start(); System.out.println("main 線程正在運行") ; TimeUnit.MILLISECONDS.sleep(100) ; aVolatile.stopThread(); } private void stopThread(){ flag = false ; } }
輸出結果:
thread A正在運行。。。 thread A正在運行。。。 thread A正在運行。。。 thread A正在運行。。。 thread A執行完畢
這里的 flag 存放于主內存中,所以主線程和線程 A 都可以看到。
flag 采用 volatile 修飾主要是為了內存可見性,更多內容可以查看這里。
CountDownLatch 并發工具CountDownLatch 可以實現 join 相同的功能,但是更加的靈活。
private static void countDownLatch() throws Exception{ int thread = 3 ; long start = System.currentTimeMillis(); final CountDownLatch countDown = new CountDownLatch(thread); for (int i= 0 ;i輸出結果:
2018-03-16 20:19:44.126 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:44.126 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:44.126 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run 2018-03-16 20:19:46.136 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end 2018-03-16 20:19:46.136 [main] INFO c.c.actual.ThreadCommunication - main over total time=2012CountDownLatch 也是基于 AQS(AbstractQueuedSynchronizer) 實現的,更多實現參考 ReentrantLock 實現原理
初始化一個 CountDownLatch 時告訴并發的線程,然后在每個線程處理完畢之后調用 countDown() 方法。
該方法會將 AQS 內置的一個 state 狀態 -1 。
最終在主線程調用 await() 方法,它會阻塞直到 state == 0 的時候返回。
CyclicBarrier 并發工具private static void cyclicBarrier() throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ; new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { Thread.sleep(5000); cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); LOGGER.info("main thread"); }CyclicBarrier 中文名叫做屏障或者是柵欄,也可以用于線程間通信。
它可以等待 N 個線程都達到某個狀態后繼續運行的效果。
首先初始化線程參與者。
調用 await() 將會在所有參與者線程都調用之前等待。
直到所有參與者都調用了 await() 后,所有線程從 await() 返回繼續后續邏輯。
運行結果:
2018-03-18 22:40:00.731 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run 2018-03-18 22:40:00.731 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run 2018-03-18 22:40:00.731 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run 2018-03-18 22:40:00.731 [main] INFO c.c.actual.ThreadCommunication - main thread 2018-03-18 22:40:05.741 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end do something 2018-03-18 22:40:05.741 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end do something 2018-03-18 22:40:05.741 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end do something可以看出由于其中一個線程休眠了五秒,所有其余所有的線程都得等待這個線程調用 await() 。
該工具可以實現 CountDownLatch 同樣的功能,但是要更加靈活。甚至可以調用 reset() 方法重置 CyclicBarrier (需要自行捕獲 BrokenBarrierException 處理) 然后重新執行。
線程響應中斷public class StopThread implements Runnable { @Override public void run() { while ( !Thread.currentThread().isInterrupted()) { // 線程執行具體邏輯 System.out.println(Thread.currentThread().getName() + "運行中。。"); } System.out.println(Thread.currentThread().getName() + "退出。。"); } public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new StopThread(), "thread A"); thread.start(); System.out.println("main 線程正在運行") ; TimeUnit.MILLISECONDS.sleep(10) ; thread.interrupt(); } }輸出結果:
thread A運行中。。 thread A運行中。。 thread A退出。。可以采用中斷線程的方式來通信,調用了 thread.interrupt() 方法其實就是將 thread 中的一個標志屬性置為了 true。
并不是說調用了該方法就可以中斷線程,如果不對這個標志進行響應其實是沒有什么作用(這里對這個標志進行了判斷)。
但是如果拋出了 InterruptedException 異常,該標志就會被 JVM 重置為 false。
線程池 awaitTermination() 方法如果是用線程池來管理線程,可以使用以下方式來讓主線程等待線程池中所有任務執行完畢:
private static void executorService() throws Exception{ BlockingQueuequeue = new LinkedBlockingQueue<>(10) ; ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ; poolExecutor.execute(new Runnable() { @Override public void run() { LOGGER.info("running"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }); poolExecutor.execute(new Runnable() { @Override public void run() { LOGGER.info("running2"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); poolExecutor.shutdown(); while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){ LOGGER.info("線程還在執行。。。"); } LOGGER.info("main over"); } 輸出結果:
2018-03-16 20:18:01.273 [pool-1-thread-2] INFO c.c.actual.ThreadCommunication - running2 2018-03-16 20:18:01.273 [pool-1-thread-1] INFO c.c.actual.ThreadCommunication - running 2018-03-16 20:18:02.273 [main] INFO c.c.actual.ThreadCommunication - 線程還在執行。。。 2018-03-16 20:18:03.278 [main] INFO c.c.actual.ThreadCommunication - 線程還在執行。。。 2018-03-16 20:18:04.278 [main] INFO c.c.actual.ThreadCommunication - main over使用這個 awaitTermination() 方法的前提需要關閉線程池,如調用了 shutdown() 方法。
調用了 shutdown() 之后線程池會停止接受新任務,并且會平滑的關閉線程池中現有的任務。
管道通信public static void piped() throws IOException { //面向于字符 PipedInputStream 面向于字節 PipedWriter writer = new PipedWriter(); PipedReader reader = new PipedReader(); //輸入輸出流建立連接 writer.connect(reader); Thread t1 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running"); try { for (int i = 0; i < 10; i++) { writer.write(i+""); Thread.sleep(10); } } catch (Exception e) { } finally { try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running2"); int msg = 0; try { while ((msg = reader.read()) != -1) { LOGGER.info("msg={}", (char) msg); } } catch (Exception e) { } } }); t1.start(); t2.start(); }輸出結果:
2018-03-16 19:56:43.014 [Thread-0] INFO c.c.actual.ThreadCommunication - running 2018-03-16 19:56:43.014 [Thread-1] INFO c.c.actual.ThreadCommunication - running2 2018-03-16 19:56:43.130 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=0 2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=1 2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=2 2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=3 2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=4 2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=5 2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=6 2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=7 2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=8 2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=9Java 雖說是基于內存通信的,但也可以使用管道通信。
需要注意的是,輸入流和輸出流需要首先建立連接。這樣線程 B 就可以收到線程 A 發出的消息了。
實際開發中可以靈活根據需求選擇最適合的線程通信方式。
號外最近在總結一些 Java 相關的知識點,感興趣的朋友可以一起維護。
地址: https://github.com/crossoverJie/Java-Interview
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68869.html
摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學習經歷。因為寫作的時候發現,為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...
摘要:本文從內存模型角度,探討的實現原理。通過共享內存或者消息通知這兩種方法,可以實現通信或同步。基于共享內存的線程通信是隱式的,線程同步是顯式的而基于消息通知的線程通信是顯式的,線程同步是隱式的。鎖規則鎖的解鎖,于于鎖的獲取或加鎖。 一、前言 在java多線程編程中,volatile可以用來定義輕量級的共享變量,它比synchronized的使用成本更低,因為它不會引起線程上下文的切換和調...
摘要:所以接下來,我們需要簡單的介紹下多線程中的并發通信模型。比如中,以及各種鎖機制,均為了解決線程間公共狀態的串行訪問問題。 并發的學習門檻較高,相較單純的羅列并發編程 API 的枯燥被動學習方式,本系列文章試圖用一個簡單的栗子,一步步結合并發編程的相關知識分析舊有實現的不足,再實現邏輯進行分析改進,試圖展示例子背后的并發工具與實現原理。 本文是本系列的第一篇文章,提出了一個簡單的業務場景...
摘要:線程之間的通信由內存模型本文簡稱為控制,決定一個線程對共享變量的寫入何時對另一個線程可見。為了保證內存可見性,編譯器在生成指令序列的適當位置會插入內存屏障指令來禁止特定類型的處理器重排序。 并發編程模型的分類 在并發編程中,我們需要處理兩個關鍵問題:線程之間如何通信及線程之間如何同步(這里的線程是指并發執行的活動實體)。通信是指線程之間以何種機制來交換信息。在命令式編程中,線程之間的...
閱讀 1507·2021-11-25 09:43
閱讀 4057·2021-11-15 11:37
閱讀 3192·2021-08-17 10:13
閱讀 3503·2019-08-30 14:16
閱讀 3534·2019-08-26 18:37
閱讀 2488·2019-08-26 11:56
閱讀 1128·2019-08-26 10:42
閱讀 609·2019-08-26 10:39