摘要:在上一篇文章從到實(shí)現(xiàn)自己的阻塞隊(duì)列上中,我們已經(jīng)實(shí)現(xiàn)了一個(gè)可以使用的阻塞隊(duì)列版本。插入鎖隊(duì)列未滿的條件變量彈出鎖隊(duì)列非空的條件變量最后我們要對(duì)和方法中的調(diào)用做出一些調(diào)整。
在上一篇文章《從0到1實(shí)現(xiàn)自己的阻塞隊(duì)列(上)》中,我們已經(jīng)實(shí)現(xiàn)了一個(gè)可以使用的阻塞隊(duì)列版本。在這篇文章中,我們可以繼續(xù)我們的冒險(xiǎn)之旅,將我們的阻塞隊(duì)列提升到接近JDK版本的水平上。
更進(jìn)一步優(yōu)化效率我們一直使用的都是Object.notifyAll()或者condition.signalAll()這樣會(huì)喚醒所有線程的方法,那么如果只有一個(gè)線程能夠順利執(zhí)行,但是其他線程都要再次回到等待狀態(tài)繼續(xù)休眠,那不是非常的浪費(fèi)嗎?比如如果有N個(gè)消費(fèi)者線程在等待隊(duì)列中出現(xiàn)元素,那么當(dāng)一個(gè)元素被插入以后所有N個(gè)消費(fèi)者線程都被全部喚醒,最后也只會(huì)有一個(gè)消費(fèi)者線程能夠真正拿到元素并執(zhí)行完成,其他線程不是都被白白喚醒了嗎?我們?yōu)槭裁床挥弥粫?huì)喚醒一個(gè)線程的Object.notify()和condition.signal()方法呢?
拆分條件變量在阻塞隊(duì)列中,我們可以使用Object.notify()或者condition.signal()這樣只喚醒一個(gè)線程的方法,但是會(huì)有一些前提條件:
首先,在一個(gè)條件變量上等待的線程必須是同一類型的。比如一個(gè)條件變量上只有消費(fèi)者線程在等待,另一個(gè)條件變量上只有生產(chǎn)者線程在等待。這么做的目的就是防止發(fā)生我們?cè)诓迦霑r(shí)想喚醒的是消費(fèi)者線程,但是喚醒了一個(gè)生產(chǎn)者線程,這個(gè)生產(chǎn)者線程又因?yàn)殛?duì)列已滿又進(jìn)入了等待狀態(tài),這樣我們需要喚醒的消費(fèi)者線程就永遠(yuǎn)不會(huì)被喚醒了。
另外還有一點(diǎn)就是這個(gè)條件變量上等待的線程只能互斥執(zhí)行,如果N個(gè)生產(chǎn)者線程可以同時(shí)執(zhí)行,我們也就不需要一個(gè)一個(gè)喚醒了,這樣反而會(huì)讓效率降低。當(dāng)然,在我們的阻塞隊(duì)列當(dāng)中,不管是插入還是彈出操作同一時(shí)間都只能有一個(gè)線程在執(zhí)行,所以自然就滿足這個(gè)要求了。
所以,我們只需要滿足第一個(gè)要求讓不同類型的線程在不同的條件變量上等待就可以了。那么具體要怎么做呢?
首先,我們自然是要把原來的一個(gè)條件變量condition給拆分成兩個(gè)實(shí)例變量notFull和notEmpty,這兩個(gè)條件變量雖然對(duì)應(yīng)于同一互斥鎖,但是兩個(gè)條件變量的等待和喚醒操作是完全隔離的。這兩個(gè)條件變量分別代表隊(duì)列未滿和隊(duì)列非空兩個(gè)條件,消費(fèi)者線程因?yàn)槭潜魂?duì)列為空的情況所阻塞的,所以就應(yīng)該等待隊(duì)列非空條件得到滿足;而生產(chǎn)者線程因?yàn)槭潜魂?duì)列已滿的情況所阻塞的,自然就要等待隊(duì)列未滿條件的成立。
/** 隊(duì)列未滿的條件變量 */ private final Condition notFull = lock.newCondition(); /** 隊(duì)列非空的條件變量 */ private final Condition notEmpty = lock.newCondition();
所以在put()和take()方法中,我們就需要把take()方法中原來的condition.await()修改為等待隊(duì)列非空條件,即notEmpty.await();而put()方法中的condition.await()自然是要修改為等待隊(duì)列未滿條件成立,即notFull.await()。既然我們把等待條件變量的語句都改了,那么喚醒的語句也要做同樣的修改,put()操作要喚醒等待的消費(fèi)者線程,所以是notEmpty.signal();take()操作要喚醒的生產(chǎn)者線程,所以是notFull.signal()。修改完成后的代碼如下,大家可以參考一下:
/** * 將指定元素插入隊(duì)列 * * @param e 待插入的對(duì)象 */ public void put(Object e) throws InterruptedException { lock.lockInterruptibly(); try { while (count == items.length) { // 隊(duì)列已滿時(shí)進(jìn)入休眠 // 等待隊(duì)列未滿條件得到滿足 notFull.await(); } // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中 enqueue(e); // 插入元素后喚醒一個(gè)等待隊(duì)列非空條件成立的線程 notEmpty.signal(); } finally { lock.unlock(); } } /** * 從隊(duì)列中彈出一個(gè)元素 * * @return 被彈出的元素 */ public Object take() throws InterruptedException { lock.lockInterruptibly(); try { while (count == 0) { // 隊(duì)列為空時(shí)進(jìn)入休眠 // 等待隊(duì)列非空條件得到滿足 notEmpty.await(); } // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出 Object e = dequeue(); // 彈出元素后喚醒一個(gè)等待隊(duì)列未滿條件成立的線程 notFull.signal(); return e; } finally { lock.unlock(); } }驗(yàn)證程序的效率
既然我們對(duì)阻塞隊(duì)列做了效率上的改進(jìn),那么就讓我們來實(shí)際檢驗(yàn)一下吧。我們還是之前已經(jīng)提供的檢驗(yàn)程序,但是不同的是,為了明顯地看出效率上的變化,我們需要修改一下程序中的兩個(gè)變量。首先,我們需要把檢驗(yàn)程序中運(yùn)行的線程數(shù)threads增加到400,然后我們需要把每個(gè)線程執(zhí)行的次數(shù)改為100次,就像下面這樣:
// 創(chuàng)建400個(gè)線程 final int threads = 400; // 每個(gè)線程執(zhí)行100次 final int times = 100;
最后我們分別使用改進(jìn)前和改進(jìn)后的版本來執(zhí)行這個(gè)這個(gè)阻塞隊(duì)列,在我的電腦上,改進(jìn)前的版本耗時(shí)為7.80s,改進(jìn)后的版本耗時(shí)為1.35s。看起來我們對(duì)阻塞隊(duì)列的效率做了一個(gè)非常大的提升,非常好,那我們還有沒有辦法再加快一點(diǎn)呢?
還能不能更快?在上面的阻塞隊(duì)列實(shí)現(xiàn)中,我們主要使用的就是put()和take()兩個(gè)操作。而因?yàn)橛谢コ怄iReentrantLock的保護(hù),所以這兩個(gè)方法在同一時(shí)間只能有一個(gè)線程調(diào)用。也就是說,生產(chǎn)者線程在操作隊(duì)列時(shí)同樣會(huì)阻塞消費(fèi)者線程。不過從我們的代碼中看,實(shí)際上put()方法和take()方法之間需要有互斥鎖保護(hù)的共享數(shù)據(jù)訪問只發(fā)生在入隊(duì)操作enqueue方法和出隊(duì)操作dequeue方法之中。在這兩個(gè)方法里,對(duì)于putIndex和takeIndex的訪問是完全隔離的,enqueue只使用putIndex,而dequeue只使用takeIndex,那么線程間的競(jìng)爭(zhēng)性數(shù)據(jù)就只剩下count了。這樣的話,如果我們能解決count的更新問題是不是就可以把鎖lock拆分為兩個(gè)互斥鎖,分別讓生產(chǎn)者線程和消費(fèi)者線程使用了呢?這樣的話生產(chǎn)者線程在操作時(shí)就只會(huì)阻塞生產(chǎn)者線程而不會(huì)阻塞消費(fèi)者線程了,消費(fèi)者線程也是一樣的道理。
拆分鎖這時(shí)候就要請(qǐng)出我們很熟悉的一種同步工具CAS了,CAS是一個(gè)原子操作,它會(huì)接收兩個(gè)參數(shù),一個(gè)是當(dāng)前值,一個(gè)是目標(biāo)值,如果當(dāng)前值已經(jīng)發(fā)生了改變,那么就會(huì)返回失敗,而如果當(dāng)前值沒有變化,就會(huì)將這個(gè)變量修改為目標(biāo)值。在Java中,我們一般會(huì)通過java.util.concurrent中的AtomicInteger來執(zhí)行CAS操作。在AtomicInteger類上有原子性的增加與減少方法,每次調(diào)用都可以保證對(duì)指定的對(duì)象進(jìn)行增加或減少,并且即使有多個(gè)線程同時(shí)執(zhí)行這些操作,它們的結(jié)果也仍然是正確的。
首先,為了保證入隊(duì)和出隊(duì)操作之間的互斥特性移除后兩個(gè)方法能夠并發(fā)執(zhí)行,那么我們就要保證對(duì)count的更新是線程安全的。因此,我們首先需要把實(shí)例變量count的類型從int修改為AtomicInteger,而AtomicInteger類就提供了我們需要的原子性的增加與減少接口。
/** 隊(duì)列中的元素總數(shù) */ private AtomicInteger count = new AtomicInteger(0);
然后對(duì)應(yīng)地,我們需要將入隊(duì)方法中的count++和出隊(duì)方法中的count--分別改為Atomic原子性的加1方法getAndIncrement與減1方法getAndDecrement。
/** * 入隊(duì)操作 * * @param e 待插入的對(duì)象 */ private void enqueue(Object e) { // 將對(duì)象e放入putIndex指向的位置 items[putIndex] = e; // putIndex向后移一位,如果已到末尾則返回隊(duì)列開頭(位置0) if (++putIndex == items.length) putIndex = 0; // 增加元素總數(shù) count.getAndIncrement(); } /** * 出隊(duì)操作 * * @return 被彈出的元素 */ private Object dequeue() { // 取出takeIndex指向位置中的元素 // 并將該位置清空 Object e = items[takeIndex]; items[takeIndex] = null; // takeIndex向后移一位,如果已到末尾則返回隊(duì)列開頭(位置0) if (++takeIndex == items.length) takeIndex = 0; // 減少元素總數(shù) count.getAndDecrement(); // 返回之前代碼中取出的元素e return e; }
到這里,我們就已經(jīng)解決了put()和take()方法之間的數(shù)據(jù)競(jìng)爭(zhēng)問題,兩個(gè)方法現(xiàn)在就可以分別用兩個(gè)鎖來控制了。雖然相同類型的線程仍然是互斥的,例如生產(chǎn)者和生產(chǎn)者之間同一時(shí)間只能有一個(gè)生產(chǎn)者線程在操作隊(duì)列。但是在生產(chǎn)者線程和消費(fèi)者線程之間將不用再繼續(xù)互斥,一個(gè)生產(chǎn)者線程和一個(gè)消費(fèi)者線程可以在同一時(shí)間操作同一阻塞隊(duì)列了。所以,我們?cè)谶@里可以將互斥鎖lock拆為兩個(gè),分別保證生產(chǎn)者線程和消費(fèi)者線程的互斥性,我們將它們命名為插入鎖putLock和彈出鎖takeLock。同時(shí),原來的條件變量也要分別對(duì)應(yīng)于不同的互斥鎖了,notFull要對(duì)應(yīng)于putLock,因?yàn)椴迦朐氐纳a(chǎn)者線程需要等待隊(duì)列未滿條件,那么notEmpyt自然就要對(duì)應(yīng)于takeLock了。
/** 插入鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 隊(duì)列未滿的條件變量 */ private final Condition notFull = putLock.newCondition(); /** 彈出鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 隊(duì)列非空的條件變量 */ private final Condition notEmpty = takeLock.newCondition();
最后我們要對(duì)put()和take()方法中的signal()調(diào)用做出一些調(diào)整。因?yàn)樵谏衔闹刑岬降模谑褂脳l件變量時(shí)一定要先持有條件變量所對(duì)應(yīng)的互斥鎖,而在put()和take()方法中,使用signal()方法喚醒的都是另一種類型的線程,例如生產(chǎn)者線程喚醒消費(fèi)者,消費(fèi)者線程喚醒生產(chǎn)者。這樣我們調(diào)用signal()方法的條件變量就和try語句中持有的鎖不一致了,所以我們必須將直接的xxx.signal()調(diào)用替換為一個(gè)私有方法調(diào)用。而在私有方法中,我們會(huì)先獲取與條件變量對(duì)應(yīng)的鎖,然后再調(diào)用條件變量的signal()方法。比如在下面的signalNotEmpty()方法中,我們就要先獲取takeLock才能調(diào)用notEmpty.signal();而在signalNotFull()方法中,我們就要先獲取putLock才能調(diào)用notFull.signal()。
/** * 喚醒等待隊(duì)列非空條件的線程 */ private void signalNotEmpty() { // 為了喚醒等待隊(duì)列非空條件的線程,需要先獲取對(duì)應(yīng)的takeLock takeLock.lock(); try { // 喚醒一個(gè)等待非空條件的線程 notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 喚醒等待隊(duì)列未滿條件的線程 */ private void signalNotFull() { // 為了喚醒等待隊(duì)列未滿條件的線程,需要先獲取對(duì)應(yīng)的putLock putLock.lock(); try { // 喚醒一個(gè)等待隊(duì)列未滿條件的線程 notFull.signal(); } finally { putLock.unlock(); } }解決死鎖問題
但直接把notFull.signal()換成signalNotFull(),把notEmpty.signal()換成signalNotEmpty()還不夠,因?yàn)樵谖覀兊拇a中,原來的notFull.signal()和notEmpty.signal()都是在持有鎖的try語句塊當(dāng)中的。一旦我們做了調(diào)用私有方法的替換,那么put()和take()方法就會(huì)以相反的順序同時(shí)獲取putLock和takeLock兩個(gè)鎖。有一些讀者可能已經(jīng)意識(shí)到這樣會(huì)產(chǎn)生死鎖問題了,那么我們應(yīng)該怎么解決它呢?
最好的方法就是不要同時(shí)加兩個(gè)鎖,我們完全可以在釋放前一個(gè)之后再使用signal()方法來喚醒另一種類型的線程。就像下面的put()與take()方法中所做的一樣,我們可以在執(zhí)行完入隊(duì)操作之后就釋放插入鎖putLock,然后才運(yùn)行signalNotEmpty()方法去獲取takeLock并調(diào)用與其對(duì)應(yīng)的條件變量notEmpty的signal()方法,在take()方法中也是一樣的道理。
/** * 將指定元素插入隊(duì)列 * * @param e 待插入的對(duì)象 */ public void put(Object e) throws InterruptedException { putLock.lockInterruptibly(); try { while (count.get() == items.length) { // 隊(duì)列已滿時(shí)進(jìn)入休眠 // 等待隊(duì)列未滿條件得到滿足 notFull.await(); } // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中 enqueue(e); } finally { putLock.unlock(); } // 喚醒等待隊(duì)列非空條件的線程 // 為了防止死鎖,不能在釋放putLock之前獲取takeLock signalNotEmpty(); } /** * 從隊(duì)列中彈出一個(gè)元素 * * @return 被彈出的元素 */ public Object take() throws InterruptedException { Object e; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊(duì)列為空時(shí)進(jìn)入休眠 // 等待隊(duì)列非空條件得到滿足 notEmpty.await(); } // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出 e = dequeue(); } finally { takeLock.unlock(); } // 喚醒等待隊(duì)列未滿條件的線程 // 為了防止死鎖,不能在釋放takeLock之前獲取putLock signalNotFull(); return e; }
到了這里我們就順利地把原來單一的一個(gè)lock鎖拆分為了插入鎖putLock和takeLock,這樣生產(chǎn)者線程和消費(fèi)者線程就可以同時(shí)運(yùn)行了。
最后的細(xì)節(jié)優(yōu)化——優(yōu)化喚醒其他線程的效率啊?我們的阻塞隊(duì)列到了這里還能再繼續(xù)優(yōu)化嗎?其實(shí)我們做的優(yōu)化已經(jīng)足夠多了,基本上影響比較大的優(yōu)化我們都做了,但是還有一些細(xì)節(jié)是可以最后完善一下的。比如說如果隊(duì)列并沒有為空或者已滿時(shí),我們插入或者彈出了元素其實(shí)都是不需要喚醒任何線程的,多余的喚醒操作需要先獲取ReentrantLock鎖才能調(diào)用對(duì)應(yīng)的條件變量的signal()方法,而獲取鎖是一個(gè)成本比較大的操作。所以我們最好是能在隊(duì)列真的為空或者已滿以后,成功插入或彈出元素時(shí),再去獲取鎖并喚醒等待的線程。
也就是說我們會(huì)將signalNotEmpty();修改為if (c == 0) signalNotEmpty();,而把signalNotFull();修改為if (c == items.length) signalNotFull();,也就是只有在必要的時(shí)候才去喚醒另一種類型的線程。但是這種修改又會(huì)引入另外一種問題,例如有N個(gè)消費(fèi)者線程在等待隊(duì)列非空,這時(shí)有兩個(gè)生產(chǎn)者線程插入了兩個(gè)元素,但是這兩個(gè)插入操作是連續(xù)發(fā)生的,也就是說只有第一個(gè)生產(chǎn)者線程在插入元素之后調(diào)用了signalNotEmpty(),第二個(gè)線程看到隊(duì)列原本是非空的就不會(huì)調(diào)用喚醒方法。在這種情況下,實(shí)際就只有一個(gè)消費(fèi)者線程被喚醒了,而實(shí)際上隊(duì)列中還有一個(gè)元素可供消費(fèi)。那么我們?nèi)绾谓鉀Q這個(gè)問題呢?
比較簡(jiǎn)單的一種方法就是,生產(chǎn)者線程和消費(fèi)者線程不止會(huì)喚醒另一種類型的線程,而且也會(huì)喚醒同類型的線程。比如在生產(chǎn)者線程中如果插入元素之后發(fā)現(xiàn)隊(duì)列還未滿,那么就可以調(diào)用notFull.signal()方法來喚醒其他可能存在的等待狀態(tài)的生產(chǎn)者線程,對(duì)于消費(fèi)者線程所使用的take()方法也是類似的處理方式。相對(duì)來說signal方法較低,而互斥鎖的lock方法成本較高,而且會(huì)影響到另一種類型線程的運(yùn)行。所以通過這種方式盡可能地少調(diào)用signalNotEmpty()和signalNotFull()方法會(huì)是一種還不錯(cuò)的優(yōu)化手段。
優(yōu)化后的put()和take()方法如下:
/** * 將指定元素插入隊(duì)列 * * @param e 待插入的對(duì)象 */ public void put(Object e) throws InterruptedException { int c = -1; putLock.lockInterruptibly(); try { while (count.get() == items.length) { // 隊(duì)列已滿時(shí)進(jìn)入休眠 // 等待隊(duì)列未滿條件得到滿足 notFull.await(); } // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中 enqueue(e); // 增加元素總數(shù) c = count.getAndIncrement(); // 如果在插入后隊(duì)列仍然沒滿,則喚醒其他等待插入的線程 if (c + 1 < items.length) notFull.signal(); } finally { putLock.unlock(); } // 如果插入之前隊(duì)列為空,才喚醒等待彈出元素的線程 // 為了防止死鎖,不能在釋放putLock之前獲取takeLock if (c == 0) signalNotEmpty(); } /** * 從隊(duì)列中彈出一個(gè)元素 * * @return 被彈出的元素 */ public Object take() throws InterruptedException { Object e; int c = -1; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊(duì)列為空時(shí)進(jìn)入休眠 // 等待隊(duì)列非空條件得到滿足 notEmpty.await(); } // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出 e = dequeue(); // 減少元素總數(shù) c = count.getAndDecrement(); // 如果隊(duì)列在彈出一個(gè)元素后仍然非空,則喚醒其他等待隊(duì)列非空的線程 if (c - 1 > 0) notEmpty.signal(); } finally { takeLock.unlock(); } // 只有在彈出之前隊(duì)列已滿的情況下才喚醒等待插入元素的線程 // 為了防止死鎖,不能在釋放takeLock之前獲取putLock if (c == items.length) signalNotFull(); return e; }成品出爐!
恭喜大家,經(jīng)過一番漫長的探索,我們終于徹底完成了我們的阻塞隊(duì)列實(shí)現(xiàn)之旅。如果你能堅(jiān)持到這里,我相信你已經(jīng)對(duì)多線程編程的實(shí)踐方法有了非常深刻的理解。最后讓我們來看一看我們最終完成的成品代碼——一個(gè)完整的阻塞隊(duì)列實(shí)現(xiàn)吧。
完整的阻塞隊(duì)列代碼public class BlockingQueue { /** 存放元素的數(shù)組 */ private final Object[] items; /** 彈出元素的位置 */ private int takeIndex; /** 插入元素的位置 */ private int putIndex; /** 隊(duì)列中的元素總數(shù) */ private AtomicInteger count = new AtomicInteger(0); /** 插入鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** 隊(duì)列未滿的條件變量 */ private final Condition notFull = putLock.newCondition(); /** 彈出鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** 隊(duì)列非空的條件變量 */ private final Condition notEmpty = takeLock.newCondition(); /** * 指定隊(duì)列大小的構(gòu)造器 * * @param capacity 隊(duì)列大小 */ public BlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); items = new Object[capacity]; } /** * 入隊(duì)操作 * * @param e 待插入的對(duì)象 */ private void enqueue(Object e) { // 將對(duì)象e放入putIndex指向的位置 items[putIndex] = e; // putIndex向后移一位,如果已到末尾則返回隊(duì)列開頭(位置0) if (++putIndex == items.length) putIndex = 0; } /** * 出隊(duì)操作 * * @return 被彈出的元素 */ private Object dequeue() { // 取出takeIndex指向位置中的元素 // 并將該位置清空 Object e = items[takeIndex]; items[takeIndex] = null; // takeIndex向后移一位,如果已到末尾則返回隊(duì)列開頭(位置0) if (++takeIndex == items.length) takeIndex = 0; // 返回之前代碼中取出的元素e return e; } /** * 將指定元素插入隊(duì)列 * * @param e 待插入的對(duì)象 */ public void put(Object e) throws InterruptedException { int c = -1; putLock.lockInterruptibly(); try { while (count.get() == items.length) { // 隊(duì)列已滿時(shí)進(jìn)入休眠 // 等待隊(duì)列未滿條件得到滿足 notFull.await(); } // 執(zhí)行入隊(duì)操作,將對(duì)象e實(shí)際放入隊(duì)列中 enqueue(e); // 增加元素總數(shù) c = count.getAndIncrement(); // 如果在插入后隊(duì)列仍然沒滿,則喚醒其他等待插入的線程 if (c + 1 < items.length) notFull.signal(); } finally { putLock.unlock(); } // 如果插入之前隊(duì)列為空,才喚醒等待彈出元素的線程 // 為了防止死鎖,不能在釋放putLock之前獲取takeLock if (c == 0) signalNotEmpty(); } /** * 喚醒等待隊(duì)列非空條件的線程 */ private void signalNotEmpty() { // 為了喚醒等待隊(duì)列非空條件的線程,需要先獲取對(duì)應(yīng)的takeLock takeLock.lock(); try { // 喚醒一個(gè)等待非空條件的線程 notEmpty.signal(); } finally { takeLock.unlock(); } } /** * 從隊(duì)列中彈出一個(gè)元素 * * @return 被彈出的元素 */ public Object take() throws InterruptedException { Object e; int c = -1; takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊(duì)列為空時(shí)進(jìn)入休眠 // 等待隊(duì)列非空條件得到滿足 notEmpty.await(); } // 執(zhí)行出隊(duì)操作,將隊(duì)列中的第一個(gè)元素彈出 e = dequeue(); // 減少元素總數(shù) c = count.getAndDecrement(); // 如果隊(duì)列在彈出一個(gè)元素后仍然非空,則喚醒其他等待隊(duì)列非空的線程 if (c - 1 > 0) notEmpty.signal(); } finally { takeLock.unlock(); } // 只有在彈出之前隊(duì)列已滿的情況下才喚醒等待插入元素的線程 // 為了防止死鎖,不能在釋放takeLock之前獲取putLock if (c == items.length) signalNotFull(); return e; } /** * 喚醒等待隊(duì)列未滿條件的線程 */ private void signalNotFull() { // 為了喚醒等待隊(duì)列未滿條件的線程,需要先獲取對(duì)應(yīng)的putLock putLock.lock(); try { // 喚醒一個(gè)等待隊(duì)列未滿條件的線程 notFull.signal(); } finally { putLock.unlock(); } } }
有興趣的讀者可以把我們完成的這個(gè)阻塞隊(duì)列類和JDK中的java.util.concurrent.LinkedBlockingQueue類做一個(gè)比較,相信大家可以發(fā)現(xiàn)這兩個(gè)類非常的相似,這足以說明我們費(fèi)勁千辛萬苦實(shí)現(xiàn)的這個(gè)阻塞隊(duì)列類已經(jīng)非常接近JDK中的阻塞隊(duì)列類的質(zhì)量了。
總結(jié)恭喜大家終于完整地讀完了這篇文章!在這篇文章中,我們從一個(gè)最簡(jiǎn)單的阻塞隊(duì)列版本開始,一路解決了各種問題,最終得到了一個(gè)完整、高質(zhì)量的阻塞隊(duì)列實(shí)現(xiàn)。我們一起來回憶一下我們解決的問題吧。從最簡(jiǎn)單的阻塞隊(duì)列開始,我們首先用互斥鎖synchronized關(guān)鍵字解決了并發(fā)控制問題,保證了隊(duì)列在多線程訪問情況下的正確性。然后我們用條件變量Object.wati()、Object.notifyAll()解決了休眠喚醒問題,使隊(duì)列的效率得到了飛躍性地提高。為了保障隊(duì)列的安全性,不讓外部代碼可以訪問到我們所使用的對(duì)象鎖和條件變量,所以我們使用了顯式鎖ReentrantLock,并通過鎖對(duì)象lock的newCondition()方法創(chuàng)建了與其相對(duì)應(yīng)的條件變量對(duì)象。最后,我們對(duì)隊(duì)列中的條件變量和互斥鎖分別做了拆分,使隊(duì)列的效率得到了進(jìn)一步的提高。當(dāng)然,最后我們還加上了一點(diǎn)對(duì)喚醒操作的有條件調(diào)用優(yōu)化,使整個(gè)阻塞隊(duì)列的實(shí)現(xiàn)變得更加完善。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/74358.html
摘要:而且在大多數(shù)經(jīng)典的多線程編程資料中,阻塞隊(duì)列都是其中非常重要的一個(gè)實(shí)踐案例。甚至可以說只有自己動(dòng)手實(shí)現(xiàn)了一個(gè)阻塞隊(duì)列才能真正掌握多線程相關(guān)的。為什么會(huì)發(fā)生這種情況呢原因就是在我們實(shí)現(xiàn)的這個(gè)阻塞隊(duì)列中完全沒有線程同步機(jī)制,所以同時(shí)并發(fā)進(jìn)行的個(gè) 阻塞隊(duì)列不止是一道熱門的面試題,同時(shí)也是許多并發(fā)處理模型的基礎(chǔ),比如常用的線程池類ThreadPoolExecutor內(nèi)部就使用了阻塞隊(duì)列來保存等...
摘要:實(shí)現(xiàn)阻塞隊(duì)列在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn)基本隊(duì)列特性先進(jìn)先出。消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫入線程寫入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線程。 實(shí)現(xiàn)Java 阻塞隊(duì)列 在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn):基本隊(duì)列特性:先進(jìn)先出。寫入隊(duì)列空間不可用時(shí)會(huì)阻塞。獲取隊(duì)列數(shù)據(jù)時(shí)當(dāng)隊(duì)列為空時(shí)將阻塞。 實(shí)現(xiàn)隊(duì)列的方式多種,總的來說就是數(shù)組和鏈表;其實(shí)我們只需要搞清楚其中一個(gè)即可,不同的特性主要表現(xiàn)為...
摘要:提交任務(wù)當(dāng)創(chuàng)建了一個(gè)線程池之后我們就可以將任務(wù)提交到線程池中執(zhí)行了。提交任務(wù)到線程池中相當(dāng)簡(jiǎn)單,我們只要把原來傳入類構(gòu)造器的對(duì)象傳入線程池的方法或者方法就可以了。 我們一般不會(huì)選擇直接使用線程類Thread進(jìn)行多線程編程,而是使用更方便的線程池來進(jìn)行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時(shí)候去獲取就可以了。甚至可以說線程池更棒,我們只需要把任務(wù)提交給它,它就會(huì)在合...
摘要:自己實(shí)現(xiàn)在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn)基本隊(duì)列特性先進(jìn)先出。消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫入線程寫入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線程。最終的隊(duì)列大小為,可見線程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 較長一段時(shí)間以來我都發(fā)現(xiàn)不少開發(fā)者對(duì) jdk 中的 J.U.C(java.util.c...
摘要:最后,我們會(huì)通過對(duì)源代碼的剖析深入了解線程池的運(yùn)行過程和具體設(shè)計(jì),真正達(dá)到知其然而知其所以然的水平。創(chuàng)建線程池既然線程池是一個(gè)類,那么最直接的使用方法一定是一個(gè)類的對(duì)象,例如。單線程線程池單線程線程 我們一般不會(huì)選擇直接使用線程類Thread進(jìn)行多線程編程,而是使用更方便的線程池來進(jìn)行任務(wù)的調(diào)度和管理。線程池就像共享單車,我們只要在我們有需要的時(shí)候去獲取就可以了。甚至可以說線程池更棒,...
閱讀 1303·2021-11-22 09:34
閱讀 2167·2021-10-08 10:18
閱讀 1729·2021-09-29 09:35
閱讀 2460·2019-08-29 17:20
閱讀 2141·2019-08-29 15:36
閱讀 3407·2019-08-29 13:52
閱讀 783·2019-08-29 12:29
閱讀 1187·2019-08-28 18:10