摘要:在章節中,我們說過,維護了一把全局鎖,無論是出隊還是入隊,都共用這把鎖,這就導致任一時間點只有一個線程能夠執行。入隊鎖對應的是條件隊列,出隊鎖對應的是條件隊列,所以每入隊一個元素,應當立即去喚醒可能阻塞的其它入隊線程。
本文首發于一世流云專欄:https://segmentfault.com/blog...一、LinkedBlockingQueue簡介
LinkedBlockingQueue是在JDK1.5時,隨著J.U.C包引入的一種阻塞隊列,它實現了BlockingQueue接口,底層基于單鏈表實現:
LinkedBlockingQueue是一種近似有界阻塞隊列,為什么說近似?因為LinkedBlockingQueue既可以在初始構造時就指定隊列的容量,也可以不指定,如果不指定,那么它的容量大小默認為Integer.MAX_VALUE。
LinkedBlockingQueue除了底層數據結構(單鏈表)與ArrayBlockingQueue不同外,另外一個特點就是:
它維護了兩把鎖——takeLock和putLock。
takeLock用于控制出隊的并發,putLock用于入隊的并發。這也就意味著,同一時刻,只能只有一個線程能執行入隊/出隊操作,其余入隊/出隊線程會被阻塞;但是,入隊和出隊之間可以并發執行,即同一時刻,可以同時有一個線程進行入隊,另一個線程進行出隊,這樣就可以提升吞吐量。
在ArrayBlockingQueue章節中,我們說過,ArrayBlockingQueue維護了一把全局鎖,無論是出隊還是入隊,都共用這把鎖,這就導致任一時間點只有一個線程能夠執行。那么對于“生產者-消費者”模式來說,意味著生產者和消費者不能并發執行。二、LinkedBlockingQueue原理 構造
LinkedBlockingQueue提供了三種構造器:
/** * 默認構造器. * 隊列容量為Integer.MAX_VALUE. */ public LinkedBlockingQueue() { this(Integer.MAX_VALUE); }
/** * 顯示指定隊列容量的構造器 */ public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node(null); }
/** * 從已有集合構造隊列. * 隊列容量為Integer.MAX_VALUE */ public LinkedBlockingQueue(Collection extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // 這里加鎖僅僅是為了保證可見性 try { int n = 0; for (E e : c) { if (e == null) // 隊列不能包含null元素 throw new NullPointerException(); if (n == capacity) // 隊列已滿 throw new IllegalStateException("Queue full"); enqueue(new Node(e)); // 隊尾插入元素 ++n; } count.set(n); // 設置元素個數 } finally { putLock.unlock(); } }
可以看到,如果不指定容量,那么它的容量大小默認為Integer.MAX_VALUE。另外,LinkedBlockingQueue使用了一個原子變量AtomicInteger記錄隊列中元素的個數,以保證入隊/出隊并發修改元素時的數據一致性。
public class LinkedBlockingQueueextends AbstractQueue implements BlockingQueue , java.io.Serializable { /** * 隊列容量. * 如果不指定, 則為Integer.MAX_VALUE */ private final int capacity; /** * 隊列中的元素個數 */ private final AtomicInteger count = new AtomicInteger(); /** * 隊首指針. * head.item == null */ transient Node head; /** * 隊尾指針. * last.next == null */ private transient Node last; /** * 出隊鎖 */ private final ReentrantLock takeLock = new ReentrantLock(); /** * 隊列空時,出隊線程在該條件隊列等待 */ private final Condition notEmpty = takeLock.newCondition(); /** * 入隊鎖 */ private final ReentrantLock putLock = new ReentrantLock(); /** * 隊列滿時,入隊線程在該條件隊列等待 */ private final Condition notFull = putLock.newCondition(); /** * 鏈表結點定義 */ static class Node { E item; Node next; // 后驅指針 Node(E x) { item = x; } } //... }
構造完成后,LinkedBlockingQueue的初始結構如下:
插入部分元素后的LinkedBlockingQueue結構:
核心方法由于接口和ArrayBlockingQueue完全一樣,所以LinkedBlockingQueue會阻塞線程的方法也一共有4個:put(E e)、offer(e, time, unit)和take()、poll(time, unit),我們先來看插入元素的方法。
插入元素——put(E e)
/** * 在隊尾插入指定的元素. * 如果隊列已滿,則阻塞線程. */ public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Nodenode = new Node (e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 獲取“入隊鎖” try { while (count.get() == capacity) { // 隊列已滿, 則線程在notFull上等待 notFull.await(); } enqueue(node); // 將新結點鏈接到“隊尾” /** * c+1 表示的元素個數. * 如果,則喚醒一個“入隊線程” */ c = count.getAndIncrement(); // c表示入隊前的隊列元素個數 if (c + 1 < capacity) // 入隊后隊列未滿, 則喚醒一個“入隊線程” notFull.signal(); } finally { putLock.unlock(); } if (c == 0) // 隊列初始為空, 則喚醒一個“出隊線程” signalNotEmpty(); }
插入元素時,首先需要獲得“入隊鎖”,如果隊列滿了,則當前線程需要在notFull條件隊列等待;否則,將新元素鏈接到隊列尾部。
這里需要注意的是兩個地方:
①每入隊一個元素后,如果隊列還沒滿,則需要喚醒其它可能正在等待的“入隊線程”:
/** * c+1 表示的元素個數. * 如果,則喚醒一個“入隊線程” */ c = count.getAndIncrement(); // c表示入隊前的隊列元素個數 if (c + 1 < capacity) // 入隊后隊列未滿, 則喚醒一個“入隊線程” notFull.signal();
② 每入隊一個元素,都要判斷下隊列是否空了,如果空了,說明可能存在正在等待的“出隊線程”,需要喚醒它:
if (c == 0) // 隊列為空, 則喚醒一個“出隊線程” signalNotEmpty();
這里為什么不像ArrayBlockingQueue那樣,入隊完成后,直接喚醒一個在notEmpty上等待的出隊線程?
因為ArrayBlockingQueue中,入隊/出隊用的是同一把鎖,兩者不會并發執行,所以每入隊一個元素(拿到鎖),就可以通知可能正在等待的“出隊線程”。(同一個鎖的兩個條件隊列:notEmpty、notFull)
ArrayBlockingQueue中的enqueue方法:
private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) // 隊列已滿,則重置索引為0 putIndex = 0; count++; // 元素個數+1 notEmpty.signal(); // 喚醒一個notEmpty上的等待線程(可以來隊列取元素了) }
而LinkedBlockingQueue中,入隊/出隊用的是兩把鎖,入隊/出隊是會并發執行的。入隊鎖對應的是notFull條件隊列,出隊鎖對應的是notEmpty條件隊列,所以每入隊一個元素,應當立即去喚醒可能阻塞的其它入隊線程。當隊列為空時,說明后面再來“出隊線程”,一定都會阻塞,所以此時可以去喚醒一個出隊線程,以提升性能。
試想以下,如果去掉上面的①和②,當入隊線程拿到“入隊鎖”,入隊元素后,直接嘗試喚醒出隊線程,會要求去拿出隊鎖,這樣持有鎖A的同時,再去嘗試獲取鎖B,很可能引起死鎖,就算通過打破死鎖的條件避免死鎖,每次操作同時獲取兩把鎖也會降低性能。
刪除元素——table()
刪除元素的邏輯和插入元素類似。刪除元素時,首先需要獲得“出隊鎖”,如果隊列為空,則當前線程需要在notEmpty條件隊列等待;否則,從隊首出隊一個元素:
/** * 從隊首出隊一個元素 */ public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; // 獲取“出隊鎖” takeLock.lockInterruptibly(); try { while (count.get() == 0) { // 隊列為空, 則阻塞線程 notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); // c表示出隊前的元素個數 if (c > 1) // 出隊前隊列非空, 則喚醒一個出隊線程 notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) // 隊列初始為滿,則喚醒一個入隊線程 signalNotFull(); return x; }
/** * 隊首出隊一個元素. */ private E dequeue() { Nodeh = head; Node first = h.next; h.next = h; // help GC head = first; E x = first.item; first.item = null; return x; }
上面需要的注意點和插入元素一樣:
①每出隊一個元素前,如果隊列非空,則需要喚醒其它可能正在等待的“出隊線程”:
c = count.getAndDecrement(); // c表示出隊前的元素個數 if (c > 1) // 出隊前隊列非空, 則喚醒一個出隊線程 notEmpty.signal();
② 每入隊一個元素,都要判斷下隊列是否滿,如果是滿的,說明可能存在正在等待的“入隊線程”,需要喚醒它:
if (c == capacity) // 隊列初始為滿,則喚醒一個入隊線程 signalNotFull();三、總結
歸納一下,LinkedBlockingQueue和ArrayBlockingQueue比較主要有以下區別:
隊列大小不同。ArrayBlockingQueue初始構造時必須指定大小,而LinkedBlockingQueue構造時既可以指定大小,也可以不指定(默認為Integer.MAX_VALUE,近似于無界);
底層數據結構不同。ArrayBlockingQueue底層采用數組作為數據存儲容器,而LinkedBlockingQueue底層采用單鏈表作為數據存儲容器;
兩者的加鎖機制不同。ArrayBlockingQueue使用一把全局鎖,即入隊和出隊使用同一個ReentrantLock鎖;而LinkedBlockingQueue進行了鎖分離,入隊使用一個ReentrantLock鎖(putLock),出隊使用另一個ReentrantLock鎖(takeLock);
LinkedBlockingQueue不能指定公平/非公平策略(默認都是非公平),而ArrayBlockingQueue可以指定策略。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77045.html
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現了接口。該類在構造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內部通過來保證線程安全,所以的整體實現時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發于一世流云專欄:ht...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現了接口,在多線程進階二五之框架中,我們提到過實現了接口,以提供和排序相關的功能,維持元素的有序性,所以就是一種為并發環境設計的有序工具類。唯一的區別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發于一世流云專欄:https://seg...
摘要:僅僅當有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數組上進行的,當創建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發于一世流云專欄:https://...
摘要:我們之前已經介紹過了,底層基于跳表實現,其操作平均時間復雜度均為。事實上,內部引用了一個對象,以組合方式,委托對象實現了所有功能。線程安全內存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發于一世流云專欄:https://segmentfa...
閱讀 3326·2021-11-19 11:36
閱讀 2927·2021-09-27 13:34
閱讀 1990·2021-09-22 15:17
閱讀 2404·2019-08-30 13:49
閱讀 754·2019-08-26 13:58
閱讀 1359·2019-08-26 10:47
閱讀 2538·2019-08-23 18:05
閱讀 600·2019-08-23 14:25