摘要:最后一直調用函數判斷節點是否被轉移到隊列上,也就是中等待獲取鎖的隊列。這樣的話,函數中調用函數就會返回,導致函數進入最后一步重新獲取鎖的狀態。函數其實就做了一件事情,就是不斷嘗試調用函數,將隊首的一個節點轉移到隊列中,直到轉移成功。
?我在前段時間寫了一篇關于AQS源碼解析的文章
AbstractQueuedSynchronizer超詳細原理解析,在文章里邊我說JUC包中的大部分多線程相關的類都和AQS相關,今天我們就學習一下依賴于AQS來實現的阻塞隊列BlockingQueue的實現原理。本文中的源碼未加說明即來自于以ArrayBlockingQueue。
?相信大多數同學在學習線程池時會了解阻塞隊列的概念,熟記各種類型的阻塞隊列對線程池初始化的影響。當從阻塞隊列獲取元素但是隊列為空時,當前線程會阻塞直到另一個線程向阻塞隊列中添加一個元素;類似的,當向一個阻塞隊列加入元素時,如果隊列已經滿了,當前線程也會阻塞直到另外一個線程從隊列中讀取一個元素。阻塞隊列一般都是先進先出的,用來實現生產者和消費者模式。當發生上述兩種情況時,阻塞隊列有四種不同的處理方式,這四種方式分別為拋出異常,返回特殊值(null或在是false),阻塞當前線程直到執行結束,最后一種是只阻塞固定時間,到時后還無法執行成功就放棄操作。這些方法都總結在下邊這種表中了。
?我們就只分析put和take方法。
put和take函數?我們都知道,使用同步隊列可以很輕松的實現生產者-消費者模式,其實,同步隊列就是按照生產者-消費者的模式來實現的,我們可以將put函數看作生產者的操作,take是消費者的操作。
?我們首先看一下ArrayListBlock的構造函數。它初始化了put和take函數中使用到的關鍵成員變量,分別是ReentrantLock和Condition。
public ArrayBlockingQueue(int capacity, boolean fair) { this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
?ReentrantLock是AQS的子類,其newCondition函數返回的Condition接口實例是定義在AQS類內部的ConditionObject實現類。它可以直接調用AQS相關的函數。
?put函數會在隊列末尾添加元素,如果隊列已經滿了,無法添加元素的話,就一直阻塞等待到可以加入為止。函數的源碼如下所示。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //先獲得鎖 try { while (count == items.length) //如果隊列滿了,就NotFull這個Condition對象上進行等待 notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { final Object[] items = this.items; items[putIndex] = x; //這里可以注意的是ArrayBlockingList實際上使用Array實現了一個環形數組, //當putIndex達到最大時,就返回到起點,繼續插入, //當然,如果此時0位置的元素還沒有被取走, //下次put時,就會因為cout == item.length未被阻塞。 if (++putIndex == items.length) putIndex = 0; count++; //因為插入了元素,通知等待notEmpty事件的線程。 notEmpty.signal(); }
?我們會發現put函數使用了wait/notify的機制。與一般生產者-消費者的實現方式不同,同步隊列使用ReentrantLock和Condition相結合的先獲得鎖,再等待的機制;而不是Synchronized和Object.wait的機制。這里的區別我們下一節再詳細講解。
?看完了生產者相關的put函數,我們再來看一下消費者調用的take函數。take函數在隊列為空時會被阻塞,一直到阻塞隊列加入了新的元素。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) //如果隊列為空,那么在notEmpty對象上等待, //當put函數調用時,會調用notEmpty的notify進行通知。 notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { E x = (E) items[takeIndex]; items[takeIndex] = null; //取出takeIndex位置的元素 if (++takeIndex == items.length) //如果到了尾部,將指針重新調整到頭部 takeIndex = 0; count--; .... //通知notFull對象上等待的線程 notFull.signal(); return x; }await操作
?我們發現ArrayBlockingList并沒有使用Object.wait,而是使用的Condition.await,這是為什么呢?其中又有哪些原因呢?
?Condition對象可以提供和Object的wait和notify一樣的行為,但是后者必須先獲取synchronized這個內置的monitor鎖,才能調用;而Condition則必須先獲取ReentrantLock。這兩種方式在阻塞等待時都會將相應的鎖釋放掉,但是Condition的等待可以中斷,這是二者唯一的區別。
?我們先來看一下Condition的wait函數,wait函數的流程大致如下圖所示。
?wait函數主要有三個步驟。一是調用addConditionWaiter 函數,在condition wait queue隊列中添加一個節點,代表當前線程在等待一個消息。然后調用fullyRelease函數,將持有的鎖釋放掉,調用的是AQS的函數,不清楚的同學可以查看本篇開頭的介紹的文章。最后一直調用isOnSyncQueue函數判斷節點是否被轉移到sync queue隊列上,也就是AQS中等待獲取鎖的隊列。如果沒有,則進入阻塞狀態,如果已經在隊列上,則調用acquireQueued函數重新獲取鎖。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); //在condition wait隊列上添加新的節點 Node node = addConditionWaiter(); //釋放當前持有的鎖 int savedState = fullyRelease(node); int interruptMode = 0; //由于node在之前是添加到condition wait queue上的,現在判斷這個node //是否被添加到Sync的獲得鎖的等待隊列上,Sync就是AQS的子類 //node在condition queue上說明還在等待事件的notify, //notify函數會將condition queue 上的node轉化到Sync的隊列上。 while (!isOnSyncQueue(node)) { //node還沒有被添加到Sync Queue上,說明還在等待事件通知 //所以調用park函數來停止線程執行 LockSupport.park(this); //判斷是否被中斷,線程從park函數返回有兩種情況,一種是 //其他線程調用了unpark,另外一種是線程被中斷 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //代碼執行到這里,已經有其他線程調用notify函數,或則被中斷,該線程可以繼續執行,但是必須先 //再次獲得調用await函數時的鎖.acquireQueued函數在AQS文章中做了介紹. if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; .... } final int fullyRelease(Node node) { //AQS的方法,當前已經在鎖中了,所以直接操作 boolean failed = true; try { int savedState = getState(); //獲取state當前的值,然后保存,以待以后恢復 // release函數是AQS的函數,不清楚的同學請看開頭介紹的文章。 if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } private int checkInterruptWhileWaiting(Node node) { //中斷可能發生在兩個階段中,一是在等待signa時,另外一個是在獲得signal之后 return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; } final boolean transferAfterCancelledWait(Node node) { //這里要和下邊的transferForSignal對應著看,這是線程中斷進入的邏輯.那邊是signal的邏輯 //兩邊可能有并發沖突,但是成功的一方必須調用enq來進入acquire lock queue中. if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { enq(node); return true; } //如果失敗了,說明transferForSignal那邊成功了,等待node 進入acquire lock queue while (!isOnSyncQueue(node)) Thread.yield(); return false; }signal操作
?signal函數將condition wait queue隊列中隊首的線程節點轉移等待獲取鎖的sync queue隊列中。這樣的話,wait函數中調用isOnSyncQueue函數就會返回true,導致wait函數進入最后一步重新獲取鎖的狀態。
?我們這里來詳細解析一下condition wait queue和sync queue兩個隊列的設計原理。condition wait queue是等待消息的隊列,因為阻塞隊列為空而進入阻塞狀態的take函數操作就是在等待阻塞隊列不為空的消息。而sync queue隊列則是等待獲取鎖的隊列,take函數獲得了消息,就可以運行了,但是它還必須等待獲取鎖之后才能真正進行運行狀態。
?signal函數的示意圖如下所示。
?signal函數其實就做了一件事情,就是不斷嘗試調用transferForSignal 函數,將condition wait queue隊首的一個節點轉移到sync queue隊列中,直到轉移成功。因為一次轉移成功,就代表這個消息被成功通知到了等待消息的節點。
public final void signal() { if (!isHeldExclusively()) //如果當前線程沒有獲得鎖,拋出異常 throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) //將Condition wait queue中的第一個node轉移到acquire lock queue中. doSignal(first); } private void doSignal(Node first) { do { //由于生產者的signal在有消費者等待的情況下,必須要通知 //一個消費者,所以這里有一個循環,直到隊列為空 //把first 這個node從condition queue中刪除掉 //condition queue的頭指針指向node的后繼節點,如果node后續節點為null,那么也將尾指針也置為null if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); //transferForSignal將node轉而添加到Sync的acquire lock 隊列 } final boolean transferForSignal(Node node) { //如果設置失敗,說明該node已經被取消了,所以返回false,讓doSignal繼續向下通知其他未被取消的node if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; //將node添加到acquire lock queue中. Node p = enq(node); int ws = p.waitStatus; //需要注意的是這里的node進行了轉化 //ws>0代表canceled的含義所以直接unpark線程 //如果compareAndSetWaitStatus失敗,所以直接unpark,讓線程繼續執行await中的 //進行isOnSyncQueue判斷的while循環,然后進入acquireQueue函數. //這里失敗的原因可能是Lock其他線程釋放掉了鎖,同步設置p的waitStatus //如果compareAndSetWaitStatus成功了呢?那么該node就一直在acquire lock queue中 //等待鎖被釋放掉再次搶奪鎖,然后再unpark if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }后記
?后邊一篇文章主要講解如何自己使用AQS來創建符合自己業務需求的鎖,請大家繼續關注我的文章啦.一起進步偶。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73031.html
摘要:線程通信傳統的線程通信方法概述方法導致當前線程等待,直到其他線程調用該同步監視器的方法或方法來喚醒該線程。運行結果如下線程組和未處理的異常表示線程組,可以對一批線程進行分類管理。對線程組的控制相當于同時控制這批線程。 線程通信 傳統的線程通信 方法概述: wait方法:導致當前線程等待,直到其他線程調用該同步監視器的notify()方法或notifyAll()方法來喚醒該線程。 w...
摘要:如果隊列已滿,這個時候寫操作的線程進入到寫線程隊列排隊,等待讀線程將隊列元素移除騰出空間,然后喚醒寫線程隊列的第一個等待線程。數據必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。 前言 本文直接參考 Doug Lea 寫的 Java doc 和注釋,這也是我們在學習 java 并發包時最好的材料了。希望大家能有所思、有所悟,學習 Doug Lea 的代碼風格,并將其優雅...
摘要:創建一個阻塞隊列生產者生產,目前總共有消費者消費,目前總共有原文鏈接更多教程 原文鏈接 更多教程 本文概要 生產者和消費者問題是線程模型中老生常談的問題,也是面試中經常遇到的問題。光在Java中的實現方式多達數十種,更不用說加上其他語言的實現方式了。那么我們該如何學習呢? 本文會通過精講wait()和notify()方法實現生產者-消費者模型,來學習生產者和消費者問題的原理。 目的...
摘要:一和并發包中的和主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當于的使用。寫鎖與讀鎖之間互斥,一個線程在寫時,不允許讀操作。的注意事項不支持重入,即不可反復獲取同一把鎖。沒有返回值,也就是說無法獲取執行結果。 一、Lock 和 Condition Java 并發包中的 Lock 和 Condition 主要解決的是線程的互斥和同步問題,這兩者的配合使用,相當于 synchron...
閱讀 3344·2021-11-10 11:36
閱讀 3244·2021-10-08 10:21
閱讀 2841·2021-09-29 09:35
閱讀 2416·2021-09-22 16:06
閱讀 3959·2021-09-09 09:33
閱讀 1327·2019-08-30 15:44
閱讀 3171·2019-08-30 10:59
閱讀 2982·2019-08-29 15:32