摘要:前文回顧上一篇文章重點嘮叨了中協(xié)調(diào)線程間通信的機(jī)制,它有力的保證了線程間通信的安全性以及便利性。所以同一時刻廚師線程和服務(wù)員線程不會同時在等待隊列中。對于在操作系統(tǒng)中線程的阻塞狀態(tài),語言中用和這三個狀態(tài)分別表示。
前文回顧
上一篇文章重點嘮叨了java中協(xié)調(diào)線程間通信的wait/notify機(jī)制,它有力的保證了線程間通信的安全性以及便利性。本篇將介紹wait/notify機(jī)制的一個應(yīng)用以及更多線程間通信的內(nèi)容。
生產(chǎn)者-消費(fèi)者模式目光從廁所轉(zhuǎn)到飯館,一個飯館里通常都有好多廚師以及好多服務(wù)員,這里我們把廚師稱為生產(chǎn)者,把服務(wù)員稱為消費(fèi)者,廚師和服務(wù)員是不直接打交道的,而是在廚師做好菜之后放到窗口,服務(wù)員從窗口直接把菜端走給客人就好了,這樣會極大的提升工作效率,因為省去了生產(chǎn)者和消費(fèi)者之間的溝通成本。從java的角度看這個事情,每一個廚師就相當(dāng)于一個生產(chǎn)者線程,每一個服務(wù)員都相當(dāng)于一個消費(fèi)者線程,而放菜的窗口就相當(dāng)于一個緩沖隊列,生產(chǎn)者線程不斷把生產(chǎn)好的東西放到緩沖隊列里,消費(fèi)者線程不斷從緩沖隊列里取東西,畫個圖就像是這樣:
現(xiàn)實中放菜的窗口能放的菜數(shù)量是有限的,我們假設(shè)這個窗口只能放5個菜。那么廚師在做完菜之后需要看一下窗口是不是滿了,如果窗口已經(jīng)滿了的話,就在一旁抽根煙等待,直到有服務(wù)員來取菜的時候通知一下廚師窗口有了空閑,可以放菜了,這時廚師再把自己做的菜放到窗口上去炒下一個菜。從服務(wù)員的角度來說,如果窗口是空的,那么也去一旁抽根煙等待,直到有廚師把菜做好了放到窗口上,并且通知他們一下,然后再把菜端走。
我們先用java抽象一下菜:
public class Food { private static int counter = 0; private int i; //代表生產(chǎn)的第幾個菜 public Food() { i = ++counter; } @Override public String toString() { return "第" + i + "個菜"; } }
每次創(chuàng)建Food對象,字段i的值都會加1,代表這是創(chuàng)建的第幾道菜。
為了故事的順利進(jìn)行,我們首先定義一個工具類:
class SleepUtil { private static Random random = new Random(); public static void randomSleep() { try { Thread.sleep(random.nextInt(1000)); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
SleepUtil的靜態(tài)方法randomSleep代表當(dāng)前線程隨機(jī)休眠一秒內(nèi)的時間。
然后我們再用java定義一下廚師:
public class Cook extends Thread { private Queuequeue; public Cook(Queue queue, String name) { super(name); this.queue = queue; } @Override public void run() { while (true) { SleepUtil.randomSleep(); //模擬廚師炒菜時間 Food food = new Food(); System.out.println(getName() + " 生產(chǎn)了" + food); synchronized (queue) { while (queue.size() > 4) { try { System.out.println("隊列元素超過5個,為:" + queue.size() + " " + getName() + "抽根煙等待中"); queue.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } queue.add(food); queue.notifyAll(); } } } }
我們說每一個廚師Cook都是一個線程,內(nèi)部維護(hù)了一個名叫queue的隊列。在run方法中是一個死循環(huán),代表不斷的生產(chǎn)Food。他每生產(chǎn)一個Food后,都要判斷queue隊列中元素的個數(shù)是不是大于4,如果大于4的話,就調(diào)用queue.wait()等待,如果不大于4的話,就把創(chuàng)建號的Food對象放到queue隊列中,由于可能多個線程同時訪問queue的各個方法,所以對這段代碼用queue對象來加鎖保護(hù)。當(dāng)向隊列添加完剛創(chuàng)建的Food對象之后,就可以通知queue這個鎖對象關(guān)聯(lián)的等待隊列中的服務(wù)員線程們可以繼續(xù)端菜了。
然后我們再用java定義一下服務(wù)員:
class Waiter extends Thread { private Queuequeue; public Waiter(Queue queue, String name) { super(name); this.queue = queue; } @Override public void run() { while (true) { Food food; synchronized (queue) { while (queue.size() < 1) { try { System.out.println("隊列元素個數(shù)為: " + queue.size() + "," + getName() + "抽根煙等待中"); queue.wait(); } catch (InterruptedException e) { throw new RuntimeException(e); } } food = queue.remove(); System.out.println(getName() + " 獲取到:" + food); queue.notifyAll(); } SleepUtil.randomSleep(); //模擬服務(wù)員端菜時間 } } }
每個服務(wù)員也是一個線程,和廚師一樣,都在內(nèi)部維護(hù)了一個名叫queue的隊列。在run方法中是一個死循環(huán),代表不斷的從隊列中取走Food。每次在從queue隊列中取Food對象的時候,都需要判斷一下隊列中的元素是否小于1,如果小于1的話,就調(diào)用queue.wait()等待,如果不小于1的話,也就是隊列里有元素,就從隊列里取走一個Food對象,并且通知與queue這個鎖對象關(guān)聯(lián)的等待隊列中的廚師線程們可以繼續(xù)向隊列里放入Food對象了。
在廚師和服務(wù)員線程類都定義好了之后,我們再創(chuàng)建一個Restaurant類,來看看在餐館里真實發(fā)生的事情:
public class Restaurant { public static void main(String[] args) { Queuequeue = new LinkedList<>(); new Cook(queue, "1號廚師").start(); new Cook(queue, "2號廚師").start(); new Cook(queue, "3號廚師").start(); new Waiter(queue, "1號服務(wù)員").start(); new Waiter(queue, "2號服務(wù)員").start(); new Waiter(queue, "3號服務(wù)員").start(); } }
我們在Restaurant中安排了3個廚師和3個服務(wù)員,大家執(zhí)行一下這個程序,會發(fā)現(xiàn)在如果廚師生產(chǎn)的過快,廚師就會等待,如果服務(wù)員端菜速度過快,服務(wù)員就會等待。但是整個過程廚師和服務(wù)員是沒有任何關(guān)系的,它們是通過隊列queue實現(xiàn)了所謂的解耦。
這個過程雖然不是很復(fù)雜,但是使用中還是需要注意一些問題:
我們這里的廚師和服務(wù)員使用同一個鎖queue。
使用同一個鎖是因為對queue的操作只能用同一個鎖來保護(hù),假設(shè)使用不同的鎖,廚師線程調(diào)用queue.add方法,服務(wù)員線程調(diào)用queue.remove方法,這兩個方法都不是原子操作,多線程并發(fā)執(zhí)行的時候會出現(xiàn)不可預(yù)測的結(jié)果,所以我們使用同一個鎖來保護(hù)對queue這個變量的操作,這一點我們在嘮叨設(shè)計線程安全類的時候已經(jīng)強(qiáng)調(diào)過了。
廚師和服務(wù)員線程使用同一個鎖queue的后果就是廚師線程和服務(wù)員線程使用的是同一個等待隊列。
但是同一時刻廚師線程和服務(wù)員線程不會同時在等待隊列中,因為當(dāng)廚師線程在wait的時候,隊列里的元素肯定是5,此時服務(wù)員線程肯定是不會wait的,但是消費(fèi)的過程是被鎖對象queue保護(hù)的,所以在一個服務(wù)員線程消費(fèi)了一個Food之后,就會調(diào)用notifyAll來喚醒等待隊列中的廚師線程們;當(dāng)消費(fèi)者線程在wait的時候,隊列里的元素肯定是0,此時廚師線程肯定是不會wait的,生產(chǎn)的過程是被鎖對象queue保護(hù)的,所以在一個廚師線程生產(chǎn)了一個Food對象之后,就會調(diào)用notifyAll來喚醒等待隊列中的服務(wù)員線程們。所以同一時刻廚師線程和服務(wù)員線程不會同時在等待隊列中。
在生產(chǎn)和消費(fèi)過程,我們都調(diào)用了SleepUtil.randomSleep();。
我們這里的生產(chǎn)者-消費(fèi)者模型是把實際使用的場景進(jìn)行了簡化,真正的實際場景中生產(chǎn)過程和消費(fèi)過程一般都會很耗時,這些耗時的操作最好不要放在同步代碼塊中,這樣會造成別的線程的長時間阻塞。如果把生產(chǎn)過程和消費(fèi)過程都放在同步代碼塊中,也就是說在一個廚師炒菜的同時不允許別的廚師炒菜,在一個服務(wù)員端菜的同時不允許別的程序員端菜,這個顯然是不合理的,大家需要注意這一點。
以上就是wait/notify機(jī)制的一個現(xiàn)實應(yīng)用:生產(chǎn)者-消費(fèi)者模式的一個簡介。
管道輸入/輸出流還記得在嘮叨I/O的時候提到的管道流么,這些管道流就是用于在不同線程之間的數(shù)據(jù)傳輸,一共有四種管道流:
PipedInputStream:管道輸入字節(jié)流
PipedOutputStream:管道輸出字節(jié)流
PipedReader:管道輸入字符流
PipedWriter:管道輸出字符流
字節(jié)流和字符流的用法是差不多的,我們下邊以字節(jié)流為例來嘮叨一下管道流的用法。
一個線程可以持有一個PipedInputStream對象,這個PipedInputStream對象在內(nèi)部維護(hù)了一個字節(jié)數(shù)組,默認(rèn)大小為1024字節(jié)。它并不能多帶帶使用,需要與另一個線程持有的一個PipedOutputStream建立關(guān)聯(lián),PipedOutputStream往該字節(jié)數(shù)組中寫數(shù)據(jù),PipedInputStream從該字節(jié)數(shù)組中讀數(shù)據(jù),從而實現(xiàn)兩個線程的通信。
PipedInputStream
先看一下它的幾個構(gòu)造方法:
它有一個特別重要的方法就是:
PipedOutputStream
看一下它的構(gòu)造方法:
它也有一個連接到管道輸入流的方法:
使用示例
管道流的通常使用場景就是一個線程持有一個PipedInputStream對象,另一個線程持有一個PipedOutputStream對象,然后把這兩個輸入輸出管道流通過connect方法建立連接,此后從管道輸出流寫入的數(shù)據(jù)就可以通過管道輸入流讀出,從而實現(xiàn)了兩個線程間的數(shù)據(jù)交換,也就是實現(xiàn)了線程間的通信:
public class PipedDemo { public static void main(String[] args){ PipedInputStream in = new PipedInputStream(); PipedOutputStream out = new PipedOutputStream(); try { in.connect(out); //將輸入流和輸出流建立關(guān)聯(lián) } catch (IOException e) { throw new RuntimeException(e); } new ReadThread(in).start(); new WriteThread(out).start(); } } class ReadThread extends Thread { private PipedInputStream in; public ReadThread(PipedInputStream in) { this.in = in; } @Override public void run() { int i = 0; try { while ((i=in.read()) != -1) { //從輸入流讀取數(shù)據(jù) System.out.println(i); } } catch (IOException e) { throw new RuntimeException(e); } finally { try { in.close(); } catch (IOException e) { throw new RuntimeException(e); } } } } class WriteThread extends Thread { private PipedOutputStream out; public WriteThread(PipedOutputStream out) { this.out = out; } @Override public void run() { byte[] bytes = {1, 2, 3, 4, 5}; try { out.write(bytes); //向輸出流寫入數(shù)據(jù) out.flush(); } catch (IOException e) { throw new RuntimeException(e); } finally { try { out.close(); } catch (IOException e) { throw new RuntimeException(e); } } } }
執(zhí)行結(jié)果是:
1 2 3 4 5join方法
我們前邊說過這個方法,比如有代碼是這樣:
public static void main(String[] args) { Thread t = new Thread(new Runnable() { @Override public void run() { // ... 線程t執(zhí)行的具體任務(wù) } }, "t"); t.start(); t.join(); System.out.println("t線程執(zhí)行完了,繼續(xù)執(zhí)行main線程"); }
在main線程中調(diào)用t.join(),代表main線程需要等待t線程執(zhí)行完成后才能繼續(xù)執(zhí)行。也就是說,這個join方法可以協(xié)調(diào)各個線程之間的執(zhí)行順序。它的實現(xiàn)其實很簡單:
public final synchronized void join() throws InterruptedException { while (isAlive()) { wait(); } }
需要注意的是,join方法是Thread類的成員方法。上邊例子中在main線程中調(diào)用t.join()的意思就是,使用Thread對象t作為鎖對象,如果t線程還活著,就調(diào)用wait(),把main線程放到與t對象關(guān)聯(lián)的等待隊列里,直到t線程執(zhí)行結(jié)束,系統(tǒng)會主動調(diào)用一下t.notifyAll(),把與t對象關(guān)聯(lián)的等待隊列中的線程全部移出,從而main線程可以繼續(xù)執(zhí)行~
當(dāng)然它還有兩個指定等待時間的重載方法:
java為了方便的管理線程,對底層的操作系統(tǒng)的線程狀態(tài)做了一些抽象封裝,定義了如下的線程狀態(tài):
需要注意的是:
對于在操作系統(tǒng)中線程的運(yùn)行/就緒狀態(tài),java語言中統(tǒng)一用RUNNABLE狀態(tài)來表示。
對于在操作系統(tǒng)中線程的阻塞狀態(tài),java語言中用BLOCKED、WAITING和TIME_WAITING這三個狀態(tài)分別表示。
也就是對阻塞狀態(tài)進(jìn)行了進(jìn)一步細(xì)分。對于因為獲取不到鎖而產(chǎn)生的阻塞稱為BLOCKED狀態(tài),因為調(diào)用wait或者join方法而產(chǎn)生的阻塞稱為WAITING狀態(tài),因為調(diào)用有超時時間的wait、join或者sleep方法而產(chǎn)生的在有限時間內(nèi)阻塞稱為TIME_WAITING狀態(tài)。
大家可以通過這個圖來詳細(xì)的看一下各個狀態(tài)之間的轉(zhuǎn)換過程:
java這么劃分線程的狀態(tài)純屬于方便自己的管理,比如它會給在WAITING和TIMED_WAITING狀態(tài)的線程分別建立不同的隊列,來方便實施不同的恢復(fù)策略~所以大家也不用糾結(jié)為啥和操作系統(tǒng)中定義的不一樣,其實操作系統(tǒng)中對各個狀態(tài)的線程仍然有各種細(xì)分來方便管理,如果是你去設(shè)計一個語言或者一個操作系統(tǒng),你也可以為了自己的方便來定義一下線程的各種狀態(tài)。我們作為語言的使用者,首先還是把這些狀態(tài)記住了再說哈
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/74171.html
摘要:結(jié)構(gòu)型模式適配器模式橋接模式裝飾模式組合模式外觀模式享元模式代理模式。行為型模式模版方法模式命令模式迭代器模式觀察者模式中介者模式備忘錄模式解釋器模式模式狀態(tài)模式策略模式職責(zé)鏈模式責(zé)任鏈模式訪問者模式。 主要版本 更新時間 備注 v1.0 2015-08-01 首次發(fā)布 v1.1 2018-03-12 增加新技術(shù)知識、完善知識體系 v2.0 2019-02-19 結(jié)構(gòu)...
摘要:如果某線程并未使用很多操作,它會在自己的時間片內(nèi)一直占用處理器和。在中使用線程在和等大多數(shù)類系統(tǒng)上運(yùn)行時,支持多線程編程。守護(hù)線程另一個避免使用模塊的原因是,它不支持守護(hù)線程。 這一篇是Python并發(fā)的第四篇,主要介紹進(jìn)程和線程的定義,Python線程和全局解釋器鎖以及Python如何使用thread模塊處理并發(fā) 引言&動機(jī) 考慮一下這個場景,我們有10000條數(shù)據(jù)需要處理,處理每條...
摘要:本文探討并發(fā)中的其它問題線程安全可見性活躍性等等。當(dāng)閉鎖到達(dá)結(jié)束狀態(tài)時,門打開并允許所有線程通過。在從返回時被叫醒時,線程被放入鎖池,與其他線程競爭重新獲得鎖。 本文探討Java并發(fā)中的其它問題:線程安全、可見性、活躍性等等。 在行文之前,我想先推薦以下兩份資料,質(zhì)量很高:極客學(xué)院-Java并發(fā)編程讀書筆記-《Java并發(fā)編程實戰(zhàn)》 線程安全 《Java并發(fā)編程實戰(zhàn)》中提到了太多的術(shù)語...
摘要:微軟的雖然引入了事件機(jī)制,可以在隊列收到消息時觸發(fā)事件,通知訂閱者。由微軟作為主要貢獻(xiàn)者的,則對以及做了進(jìn)一層包裝,并能夠很好地實現(xiàn)這一模式。 在分布式服務(wù)框架中,一個最基礎(chǔ)的問題就是遠(yuǎn)程服務(wù)是怎么通訊的,在Java領(lǐng)域中有很多可實現(xiàn)遠(yuǎn)程通訊的技術(shù),例如:RMI、MINA、ESB、Burlap、Hessian、SOAP、EJB和JMS等,這些名詞之間到底是些什么關(guān)系呢,它們背后到底是基...
摘要:微軟的雖然引入了事件機(jī)制,可以在隊列收到消息時觸發(fā)事件,通知訂閱者。由微軟作為主要貢獻(xiàn)者的,則對以及做了進(jìn)一層包裝,并能夠很好地實現(xiàn)這一模式。 在分布式服務(wù)框架中,一個最基礎(chǔ)的問題就是遠(yuǎn)程服務(wù)是怎么通訊的,在Java領(lǐng)域中有很多可實現(xiàn)遠(yuǎn)程通訊的技術(shù),例如:RMI、MINA、ESB、Burlap、Hessian、SOAP、EJB和JMS等,這些名詞之間到底是些什么關(guān)系呢,它們背后到底是基...
閱讀 3069·2021-11-24 11:14
閱讀 3480·2021-11-22 15:22
閱讀 3200·2021-09-27 13:36
閱讀 712·2021-08-31 14:29
閱讀 1328·2019-08-30 15:55
閱讀 1752·2019-08-29 17:29
閱讀 1143·2019-08-29 16:24
閱讀 2400·2019-08-26 13:48