摘要:在隊尾插入指定元素,如果隊列已滿,則阻塞線程加鎖隊列已滿。這里必須用,防止虛假喚醒在隊列上等待之所以這樣做,是防止線程被意外喚醒,不經(jīng)再次判斷就直接調(diào)用方法。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、ArrayBlockingQueue簡介
ArrayBlockingQueue是在JDK1.5時,隨著J.U.C包引入的一種阻塞隊列,它實現(xiàn)了BlockingQueue接口,底層基于數(shù)組實現(xiàn):
ArrayBlockingQueue是一種有界阻塞隊列,在初始構(gòu)造的時候需要指定隊列的容量。具有如下特點:
隊列的容量一旦在構(gòu)造時指定,后續(xù)不能改變;
插入元素時,在隊尾進(jìn)行;刪除元素時,在隊首進(jìn)行;
隊列滿時,調(diào)用特定方法插入元素會阻塞線程;隊列空時,刪除元素也會阻塞線程;
支持公平/非公平策略,默認(rèn)為非公平策略。
這里的公平策略,是指當(dāng)線程從阻塞到喚醒后,以最初請求的順序(FIFO)來添加或刪除元素;非公平策略指線程被喚醒后,誰先搶占到鎖,誰就能往隊列中添加/刪除順序,是隨機(jī)的。二、ArrayBlockingQueue原理 構(gòu)造
ArrayBlockingQueue提供了三種構(gòu)造器:
/** * 指定隊列初始容量的構(gòu)造器. */ public ArrayBlockingQueue(int capacity) { this(capacity, false); }
/** * 指定隊列初始容量和公平/非公平策略的構(gòu)造器. */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); // 利用獨占鎖的策略 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
/** * 根據(jù)已有集合構(gòu)造隊列 */ public ArrayBlockingQueue(int capacity, boolean fair, Collection extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 這里加鎖是用于保證items數(shù)組的可見性 try { int i = 0; try { for (E e : c) { checkNotNull(e); // 不能有null元素 items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; // 如果隊列已滿,則重置puIndex索引為0 } finally { lock.unlock(); } }
核心就是第二種構(gòu)造器,從構(gòu)造器也可以看出,ArrayBlockingQueue在構(gòu)造時就指定了內(nèi)部數(shù)組的大小,并通過ReentrantLock來保證并發(fā)環(huán)境下的線程安全。
ArrayBlockingQueue的公平/非公平策略其實就是內(nèi)部ReentrantLock對象的策略,此外構(gòu)造時還創(chuàng)建了兩個Condition對象。在隊列滿時,插入線程需要在notFull上等待;當(dāng)隊列空時,刪除線程會在notEmpty上等待:
public class ArrayBlockingQueue核心方法extends AbstractQueue implements BlockingQueue , java.io.Serializable { /** * 內(nèi)部數(shù)組 */ final Object[] items; /** * 下一個待刪除位置的索引: take, poll, peek, remove方法使用 */ int takeIndex; /** * 下一個待插入位置的索引: put, offer, add方法使用 */ int putIndex; /** * 隊列中的元素個數(shù) */ int count; /** * 全局鎖 */ final ReentrantLock lock; /** * 非空條件隊列:當(dāng)隊列空時,線程在該隊列等待獲取 */ private final Condition notEmpty; /** * 非滿條件隊列:當(dāng)隊列滿時,線程在該隊列等待插入 */ private final Condition notFull; //... }
ArrayBlockingQueue會阻塞線程的方法一共4個:put(E e)、offer(e, time, unit)和take()、poll(time, unit),我們先來看插入元素的方法。
插入元素——put(E e)
插入元素的邏輯很簡單,用ReentrantLock來保證線程安全,當(dāng)隊列滿時,則調(diào)用線程會在notFull條件隊列上等待,否則就調(diào)用enqueue方法入隊。
/** * 在隊尾插入指定元素,如果隊列已滿,則阻塞線程. */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); // 加鎖 try { while (count == items.length) // 隊列已滿。這里必須用while,防止虛假喚醒 notFull.await(); // 在notFull隊列上等待 enqueue(e); // 隊列未滿, 直接入隊 } finally { lock.unlock(); } }
這里需要注意一點,隊列已滿的時候,是通過while循環(huán)判斷的,這其實是多線程設(shè)計模式中的Guarded Suspension模式:
while (count == items.length) // 隊列已滿。這里必須用while,防止虛假喚醒 notFull.await(); // 在notFull隊列上等待
之所以這樣做,是防止線程被意外喚醒,不經(jīng)再次判斷就直接調(diào)用enqueue方法。
enqueue方法:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 隊列已滿,則重置索引為0 putIndex = 0; count++; // 元素個數(shù)+1 notEmpty.signal(); // 喚醒一個notEmpty上的等待線程(可以來隊列取元素了) }
刪除元素——take()
刪除元素的邏輯和插入元素類似,區(qū)別就是:刪除元素時,如果隊列空了,則線程需要在notEmpty條件隊列上等待。
/** * 從隊首刪除一個元素, 如果隊列為空, 則阻塞線程 */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) // 隊列為空, 則線程在notEmpty條件隊列等待 notEmpty.await(); return dequeue(); // 隊列非空,則出隊一個元素 } finally { lock.unlock(); } }
隊列非空時,調(diào)用dequeue方法出隊一個元素:
private E dequeue() { final Object[] items = this.items; E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) // 如果隊列已空 takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); // 喚醒一個notFull上的等待線程(可以插入元素到隊列了) return x; }環(huán)形隊列
從上面的入隊/出隊操作,可以看出,ArrayBlockingQueue的內(nèi)部數(shù)組其實是一種環(huán)形結(jié)構(gòu)。
假設(shè)ArrayBlockingQueue的容量大小為6,我們來看下整個入隊過程:
①初始時
②插入元素“9”
③插入元素“2”、“10”、“25”、“93”
④插入元素“90”
注意,此時再插入一個元素“90”,則putIndex變成6,等于隊列容量6,由于是循環(huán)隊列,所以會將tableIndex重置為0:
這是隊列已經(jīng)滿了(count==6),如果再有線程嘗試插入元素,并不會覆蓋原有值,而是被阻塞。
我們再來看下出隊過程:
①出隊元素“9”
②出隊元素“2”、“10”、“25”、“93”
③出隊元素“90”
注意,此時再出隊一個元素“90”,則tabeIndex變成6,等于隊列容量6,由于是循環(huán)隊列,所以會將tableIndex重置為0:
這是隊列已經(jīng)空了(count==0),如果再有線程嘗試出隊元素,則會被阻塞。
三、總結(jié)ArrayBlockingQueue利用了ReentrantLock來保證線程的安全性,針對隊列的修改都需要加全局鎖。在一般的應(yīng)用場景下已經(jīng)足夠。對于超高并發(fā)的環(huán)境,由于生產(chǎn)者-消息者共用一把鎖,可能出現(xiàn)性能瓶頸。
另外,由于ArrayBlockingQueue是有界的,且在初始時指定隊列大小,所以如果初始時需要限定消息隊列的大小,則ArrayBlockingQueue 比較合適。后續(xù),我們會介紹另一種基于單鏈表實現(xiàn)的阻塞隊列——LinkedBlockingQueue,該隊列的最大特點是使用了“兩把鎖”,以提升吞吐量。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/77023.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進(jìn)行補(bǔ)充增強(qiáng)。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現(xiàn)了接口,在多線程進(jìn)階二五之框架中,我們提到過實現(xiàn)了接口,以提供和排序相關(guān)的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設(shè)計的有序工具類。唯一的區(qū)別是針對的僅僅是鍵值,針對鍵值對進(jìn)行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當(dāng)有多個線程同時進(jìn)行寫操作時,才會進(jìn)行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數(shù)組上進(jìn)行的,當(dāng)創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發(fā)修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實現(xiàn),其操作平均時間復(fù)雜度均為。事實上,內(nèi)部引用了一個對象,以組合方式,委托對象實現(xiàn)了所有功能。線程安全內(nèi)存的使用較多迭代是對快照進(jìn)行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現(xiàn)了接口。該類在構(gòu)造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內(nèi)部通過來保證線程安全,所以的整體實現(xiàn)時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
閱讀 3303·2021-11-24 09:39
閱讀 2808·2021-10-12 10:20
閱讀 1916·2019-08-30 15:53
閱讀 3080·2019-08-30 14:14
閱讀 2609·2019-08-29 15:36
閱讀 1127·2019-08-29 14:11
閱讀 1959·2019-08-26 13:51
閱讀 3414·2019-08-26 13:23