摘要:前言本篇文章是基于線程間的同步與通信和這篇文章寫的,在那篇文章中,我們分析了接口所定義的方法,本篇我們就來看看對于接口的這些接口方法的具體實現。因此,條件隊列在出隊時,線程并不持有鎖。
前言
本篇文章是基于線程間的同步與通信(4)——Lock 和 Condtion 這篇文章寫的,在那篇文章中,我們分析了Condition接口所定義的方法,本篇我們就來看看AQS對于Condition接口的這些接口方法的具體實現。
下文中筆者將假設讀者已經閱讀過那篇文章,或者已經了解了相關的背景知識。
系列文章目錄
概述我們在前面介紹Conditon的時候說過,Condition接口的await/signal機制是設計用來代替監視器鎖的wait/notify機制 的,因此,與監視器鎖的wait/notify機制對照著學習有助于我們更好的理解Conditon接口:
Object 方法 | Condition 方法 | 區別 |
---|---|---|
void wait() | void await() | |
void wait(long timeout) | long awaitNanos(long nanosTimeout) | 時間單位,返回值 |
void wait(long timeout, int nanos) | boolean await(long time, TimeUnit unit) | 時間單位,參數類型,返回值 |
void notify() | void signal() | |
void notifyAll() | void signalAll() | |
- | void awaitUninterruptibly() | Condition獨有 |
- | boolean awaitUntil(Date deadline) | Condition獨有 |
這里先做一下說明,本文說wait方法時,是泛指wait()、wait(long timeout)、wait(long timeout, int nanos) 三個方法,當需要指明某個特定的方法時,會帶上相應的參數。同樣的,說notify方法時,也是泛指notify(),notifyAll()方法,await方法和signal方法以此類推。
首先,我們通過wait/notify機制來類比await/signal機制:
調用wait方法的線程首先必須是已經進入了同步代碼塊,即已經獲取了監視器鎖;與之類似,調用await方法的線程首先必須獲得lock鎖
調用wait方法的線程會釋放已經獲得的監視器鎖,進入當前監視器鎖的等待隊列(wait set)中;與之類似,調用await方法的線程會釋放已經獲得的lock鎖,進入到當前Condtion對應的條件隊列中。
調用監視器鎖的notify方法會喚醒等待在該監視器鎖上的線程,這些線程將開始參與鎖競爭,并在獲得鎖后,從wait方法處恢復執行;與之類似,調用Condtion的signal方法會喚醒對應的條件隊列中的線程,這些線程將開始參與鎖競爭,并在獲得鎖后,從await方法處開始恢復執行。
實戰由于前面我們已經學習過了監視器鎖的wait/notify機制,await/signal的用法基本類似。在正式分析源碼之前,我們先來看一個使用condition的實例:
</>復制代碼
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
// 生產者方法,往數組里面寫數據
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); //數組已滿,沒有空間時,掛起等待,直到數組“非滿”(notFull)
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
// 因為放入了一個數據,數組肯定不是空的了
// 此時喚醒等待這notEmpty條件上的線程
notEmpty.signal();
} finally {
lock.unlock();
}
}
// 消費者方法,從數組里面拿數據
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await(); // 數組是空的,沒有數據可拿時,掛起等待,直到數組非空(notEmpty)
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
// 因為拿出了一個數據,數組肯定不是滿的了
// 此時喚醒等待這notFull條件上的線程
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
這是java官方文檔提供的例子,是一個典型的生產者-消費者模型。這里在同一個lock鎖上,創建了兩個條件隊列notFull, notEmpty。當數組已滿,沒有存儲空間時,put方法在notFull條件上等待,直到數組“not full”;當數組空了,沒有數據可讀時,take方法在notEmpty條件上等待,直到數組“not empty”,而notEmpty.signal()和notFull.signal()則用來喚醒等待在這個條件上的線程。
注意,上面所說的,在notFull notEmpty條件上等待事實上是指線程在條件隊列(condition queue)上等待,當該線程被相應的signal方法喚醒后,將進入到我們前面三篇介紹的sync queue中去爭鎖,爭到鎖后才能能await方法處返回。這里接牽涉到兩種隊列了——condition queue和sync queue,它們都定義在AQS中。
為了防止大家被AQS中的隊列弄暈,這里我們先理理清:
同步隊列 vs 條件隊列 sync queue首先,在逐行分析AQS源碼(1)——獨占鎖的獲取這篇中我們說過,所有等待鎖的線程都會被包裝成Node扔到一個同步隊列中。該同步隊列如下:
sync queue是一個雙向鏈表,我們使用prev、next屬性來串聯節點。但是在這個同步隊列中,我們一直沒有用到nextWaiter屬性,即使是在共享鎖模式下,這一屬性也只作為一個標記,指向了一個空節點,因此,在sync queue中,我們不會用它來串聯節點。
condtion queue每創建一個Condtion對象就會對應一個Condtion隊列,每一個調用了Condtion對象的await方法的線程都會被包裝成Node扔進一個條件隊列中,就像這樣:
可見,每一個Condition對象對應一個Conditon隊列,每個Condtion隊列都是獨立的,互相不影響的。在上圖中,如果我們對當前線程調用了notFull.await(), 則當前線程就會被包裝成Node加到notFull隊列的末尾。
值得注意的是,condition queue是一個單向鏈表,在該鏈表中我們使用nextWaiter屬性來串聯鏈表。但是,就像在sync queue中不會使用nextWaiter屬性來串聯鏈表一樣,在condition queue中,也并不會用到prev, next屬性,它們的值都為null。也就是說,在條件隊列中,Node節點真正用到的屬性只有三個:
thread:代表當前正在等待某個條件的線程
waitStatus:條件的等待狀態
nextWaiter:指向條件隊列中的下一個節點
既然這里又提到了waitStatus,我們這里再回顧一下它的取值范圍:
</>復制代碼
volatile int waitStatus;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
在條件隊列中,我們只需要關注一個值即可——CONDITION。它表示線程處于正常的等待狀態,而只要waitStatus不是CONDITION,我們就認為線程不再等待了,此時就要從條件隊列中出隊。
sync queue 和 conditon queue的聯系一般情況下,等待鎖的sync queue和條件隊列condition queue是相互獨立的,彼此之間并沒有任何關系。但是,當我們調用某個條件隊列的signal方法時,會將某個或所有等待在這個條件隊列中的線程喚醒,被喚醒的線程和普通線程一樣需要去爭鎖,如果沒有搶到,則同樣要被加到等待鎖的sync queue中去,此時節點就從condition queue中被轉移到sync queue中:
但是,這里尤其要注意的是,node是被一個一個轉移過去的,哪怕我們調用的是signalAll()方法也是一個一個轉移過去的,而不是將整個條件隊列接在sync queue的末尾。
同時要注意的是,我們在sync queue中只使用prev、next來串聯鏈表,而不使用nextWaiter;我們在condition queue中只使用nextWaiter來串聯鏈表,而不使用prev、next.事實上,它們就是兩個使用了同樣的Node數據結構的完全獨立的兩種鏈表。因此,將節點從condition queue中轉移到sync queue中時,我們需要斷開原來的鏈接(nextWaiter),建立新的鏈接(prev, next),這某種程度上也是需要將節點一個一個地轉移過去的原因之一。
入隊時和出隊時的鎖狀態sync queue是等待鎖的隊列,當一個線程被包裝成Node加到該隊列中時,必然是沒有獲取到鎖;當處于該隊列中的節點獲取到了鎖,它將從該隊列中移除(事實上移除操作是將獲取到鎖的節點設為新的dummy head,并將thread屬性置為null)。
condition隊列是等待在特定條件下的隊列,因為調用await方法時,必然是已經獲得了lock鎖,所以在進入condtion隊列前線程必然是已經獲取了鎖;在被包裝成Node扔進條件隊列中后,線程將釋放鎖,然后掛起;當處于該隊列中的線程被signal方法喚醒后,由于隊列中的節點在之前掛起的時候已經釋放了鎖,所以必須先去再次的競爭鎖,因此,該節點會被添加到sync queue中。因此,條件隊列在出隊時,線程并不持有鎖。
所以事實上,這兩個隊列的鎖狀態正好相反:
condition queue:入隊時已經持有了鎖 -> 在隊列中釋放鎖 -> 離開隊列時沒有鎖 -> 轉移到sync queue
sync queue:入隊時沒有鎖 -> 在隊列中爭鎖 -> 離開隊列時獲得了鎖
通過上面的介紹,我們對條件隊列已經有了感性的認識,接下來就讓我們進入到本篇的重頭戲——源碼分析:
CondtionObjectAQS對Condition這個接口的實現主要是通過ConditionObject,上面已經說個,它的核心實現就是是一個條件隊列,每一個在某個condition上等待的線程都會被封裝成Node對象扔進這個條件隊列。
核心屬性它的核心屬性只有兩個:
</>復制代碼
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
這兩個屬性分別代表了條件隊列的隊頭和隊尾,每當我們新建一個conditionObject對象,都會對應一個條件隊列。
構造函數</>復制代碼
public ConditionObject() { }
構造函數啥也沒干,可見,條件隊列是延時初始化的,在真正用到的時候才會初始化。
Condition接口方法實現 await()第一部分分析</>復制代碼
public final void await() throws InterruptedException {
// 如果當前線程在調動await()方法前已經被中斷了,則直接拋出InterruptedException
if (Thread.interrupted())
throw new InterruptedException();
// 將當前線程封裝成Node添加到條件隊列
Node node = addConditionWaiter();
// 釋放當前線程所占用的鎖,保存當前的鎖狀態
int savedState = fullyRelease(node);
int interruptMode = 0;
// 如果當前隊列不在同步隊列中,說明剛剛被await, 還沒有人調用signal方法,則直接將當前線程掛起
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 線程將在這里被掛起,停止運行
// 能執行到這里說明要么是signal方法被調用了,要么是線程被中斷了
// 所以檢查下線程被喚醒的原因,如果是因為中斷被喚醒,則跳出while循環
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 第一部分就分析到這里,下面的部分我們到第二部分再看, 先把它注釋起來
/*
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
*/
}
我們已經在上面的代碼注釋中描述了大體的流程,接下來我們詳細來看看await方法中所調用方法的具體實現。
首先是將當前線程封裝成Node扔進條件隊列中的addConditionWaiter方法:
addConditionWaiter</>復制代碼
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private Node addConditionWaiter() {
Node t = lastWaiter;
// 如果尾節點被cancel了,則先遍歷整個鏈表,清除所有被cancel的節點
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
// 將當前線程包裝成Node扔進條件隊列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
/*
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
*/
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
首先我們要思考的是,存在兩個不同的線程同時入隊的情況嗎?不存在。為什么呢?因為前面說過了,能調用await方法的線程必然是已經獲得了鎖,而獲得了鎖的線程只有一個,所以這里不存在并發,因此不需要CAS操作。
在這個方法中,我們就是簡單的將當前線程封裝成Node加到條件隊列的末尾。這和將一個線程封裝成Node加入等待隊列略有不同:
節點加入sync queue時waitStatus的值為0,但節點加入condition queue時waitStatus的值為Node.CONDTION。
sync queue的頭節點為dummy節點,如果隊列為空,則會先創建一個dummy節點,再創建一個代表當前節點的Node添加在dummy節點的后面;而condtion queue 沒有dummy節點,初始化時,直接將firstWaiter和lastWaiter直接指向新建的節點就行了。
sync queue是一個雙向隊列,在節點入隊后,要同時修改當前節點的前驅和前驅節點的后繼;而在condtion queue中,我們只修改了前驅節點的nextWaiter,也就是說,condtion queue是作為單向隊列來使用的。
如果入隊時發現尾節點已經取消等待了,那么我們就不應該接在它的后面,此時需要調用unlinkCancelledWaiters來剔除那些已經取消等待的線程:
</>復制代碼
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
該方法將從頭節點開始遍歷整個隊列,剔除其中waitStatus不為Node.CONDTION的節點,這里使用了兩個指針firstWaiter和trail來分別記錄第一個和最后一個waitStatus不為Node.CONDTION的節點,這些都是基礎的鏈表操作,很容易理解,這里不再贅述了。
fullyRelease在節點被成功添加到隊列的末尾后,我們將調用fullyRelease來釋放當前線程所占用的鎖:
</>復制代碼
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
首先,當我們調用這個方法時,說明當前線程已經被封裝成Node扔進條件隊列了。在該方法中,我們通過release方法釋放鎖,還記得release方法嗎,我們在逐行分析AQS源碼(2)——獨占鎖的釋放中已經詳細講過了,這里不再贅述了。
值得注意的是,這是一次性釋放了所有的鎖,即對于可重入鎖而言,無論重入了幾次,這里是一次性釋放完的,這也就是為什么該方法的名字叫fullyRelease。但這里尤其要注意的是release(savedState)方法是有可能拋出IllegalMonitorStateException的,這是因為當前線程可能并不是持有鎖的線程。但是咱前面不是說,只有持有鎖的線程才能調用await方法嗎?既然fullyRelease方法在await方法中,為啥當前線程還有可能并不是持有鎖的線程呢?
雖然話是這么說,但是在調用await方法時,我們其實并沒有檢測Thread.currentThread() == getExclusiveOwnerThread(),換句話說,也就是執行到fullyRelease這一步,我們才會檢測這一點,而這一點檢測是由AQS子類實現tryRelease方法來保證的,例如,ReentrantLock對tryRelease方法的實現如下:
</>復制代碼
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
當發現當前線程不是持有鎖的線程時,我們就會進入finally塊,將當前Node的狀態設為Node.CANCELLED,這也就是為什么上面的addConditionWaiter在添加新節點前每次都會檢查尾節點是否已經被取消了。
在當前線程的鎖被完全釋放了之后,我們就可以調用LockSupport.park(this)把當前線程掛起,等待被signal了。但是,在掛起當前線程之前我們先用isOnSyncQueue確保了它不在sync queue中,這是為什么呢?當前線程不是在一個和sync queue無關的條件隊列中嗎?怎么可能會出現在sync queue中的情況?
</>復制代碼
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
</>復制代碼
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
為了解釋這一問題,我們先來看看signal方法
signalAll()在看signalAll之前,我們首先要區分調用signalAll方法的線程與signalAll方法要喚醒的線程(等待在對應的條件隊列里的線程):
調用signalAll方法的線程本身是已經持有了鎖,現在準備釋放鎖了;
在條件隊列里的線程是已經在對應的條件上掛起了,等待著被signal喚醒,然后去爭鎖。
首先,與調用notify時線程必須是已經持有了監視器鎖類似,在調用condition的signal方法時,線程也必須是已經持有了lock鎖:
</>復制代碼
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
該方法首先檢查當前調用signal方法的線程是不是持有鎖的線程,這是通過isHeldExclusively方法來實現的,該方法由繼承AQS的子類來實現,例如,ReentrantLock對該方法的實現為:
</>復制代碼
protected final boolean isHeldExclusively() {
return getExclusiveOwnerThread() == Thread.currentThread();
}
因為exclusiveOwnerThread保存了當前持有鎖的線程,這里只要檢測它是不是等于當前線程就行了。
接下來先通過firstWaiter是否為空判斷條件隊列是否為空,如果條件隊列不為空,則調用doSignalAll方法:
</>復制代碼
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
首先我們通過lastWaiter = firstWaiter = null;將整個條件隊列清空,然后通過一個do-while循環,將原先的條件隊列里面的節點一個一個拿出來(令nextWaiter = null),再通過transferForSignal方法一個一個添加到sync queue的末尾:
</>復制代碼
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(Node node) {
// 如果該節點在調用signal方法前已經被取消了,則直接跳過這個節點
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
// 如果該節點在條件隊列中正常等待,則利用enq方法將該節點添加至sync queue隊列的尾部
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
在transferForSignal方法中,我們先使用CAS操作將當前節點的waitStatus狀態由CONDTION設為0,如果修改不成功,則說明該節點已經被CANCEL了,則我們直接返回,操作下一個節點;如果修改成功,則說明我們已經將該節點從等待的條件隊列中成功“喚醒”了,但此時該節點對應的線程并沒有真正被喚醒,它還要和其他普通線程一樣去爭鎖,因此它將被添加到sync queue的末尾等待獲取鎖。
我們這里通過enq方法將該節點添加進sync queue的末尾。關于該方法,我們在逐行分析AQS源碼(1)——獨占鎖的獲取中已經詳細講過了,這里不再贅述。不過這里尤其注意的是,enq方法將node節點添加進隊列時,返回的是node的前驅節點。
在將節點成功添加進sync queue中后,我們得到了該節點在sync queue中的前驅節點。我們前面說過,在sync queque中的節點都要靠前驅節點去喚醒,所以,這里我們要做的就是將前驅節點的waitStatus設為Node.SIGNAL, 這一點和shouldParkAfterFailedAcquire所做的工作類似:
</>復制代碼
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
所不同的是,shouldParkAfterFailedAcquire將會向前查找,跳過那些被cancel的節點,然后將找到的第一個沒有被cancel的節點的waitStatus設成SIGNAL,最后再掛起。而在transferForSignal中,當前Node所代表的線程本身就已經被掛起了,所以這里做的更像是一個復合操作——只要前驅節點處于被取消的狀態或者無法將前驅節點的狀態修成Node.SIGNAL,那我們就將Node所代表的線程喚醒,但這個條件并不意味著當前lock處于可獲取的狀態,有可能線程被喚醒了,但是鎖還是被占有的狀態,不過這樣做至少是無害的,因為我們在線程被喚醒后還要去爭鎖,如果搶不到鎖,則大不了再次被掛起。
值得注意的是,transferForSignal是有返回值的,但是我們在這個方法中并沒有用到,它將在signal()方法中被使用。
在繼續往下看signal()方法之前,這里我們再總結一下signalAll()方法:
將條件隊列清空(只是令lastWaiter = firstWaiter = null,隊列中的節點和連接關系仍然還存在)
將條件隊列中的頭節點取出,使之成為孤立節點(nextWaiter,prev,next屬性都為null)
如果該節點處于被Cancelled了的狀態,則直接跳過該節點(由于是孤立節點,則會被GC回收)
如果該節點處于正常狀態,則通過enq方法將它添加到sync queue的末尾
判斷是否需要將該節點喚醒(包括設置該節點的前驅節點的狀態為SIGNAL),如有必要,直接喚醒該節點
重復2-5,直到整個條件隊列中的節點都被處理完
signal()與signalAll()方法不同,signal()方法只會喚醒一個節點,對于AQS的實現來說,就是喚醒條件隊列中第一個沒有被Cancel的節點,弄懂了signalAll()方法,signal()方法就很容易理解了,因為它們大同小異:
</>復制代碼
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
首先依然是檢查調用該方法的線程(即當前線程)是不是已經持有了鎖,這一點和上面的signalAll()方法一樣,所不一樣的是,接下來調用的是doSignal方法:
</>復制代碼
private void doSignal(Node first) {
do {
// 將firstWaiter指向條件隊列隊頭的下一個節點
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 將條件隊列原來的隊頭從條件隊列中斷開,則此時該節點成為一個孤立的節點
first.nextWaiter = null;
} while (!transferForSignal(first) && (first = firstWaiter) != null);
}
這個方法也是一個do-while循環,目的是遍歷整個條件隊列,找到第一個沒有被cancelled的節點,并將它添加到條件隊列的末尾。如果條件隊列里面已經沒有節點了,則將條件隊列清空(firstWaiter=lasterWaiter=null)。
在這里,我們用的依然用的是transferForSignal方法,但是用到了它的返回值,只要節點被成功添加到sync queue中,transferForSignal就返回true, 此時while循環的條件就不滿足了,整個方法就結束了,即調用signal()方法,只會喚醒一個線程。
總結: 調用signal()方法會從當前條件隊列中取出第一個沒有被cancel的節點添加到sync隊列的末尾。
await()第二部分分析前面我們已經分析了signal方法,它會將節點添加進sync queue隊列中,并要么立即喚醒線程,要么等待前驅節點釋放鎖后將自己喚醒,無論怎樣,被喚醒的線程要從哪里恢復執行呢?當然是被掛起的地方呀,我們在哪里被掛起的呢?還記得嗎?當然是調用了await方法的地方,以await()方法為例:
</>復制代碼
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 我們在這里被掛起了,被喚醒后,將從這里繼續往下運行
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
這里值得注意的是,當我們被喚醒時,其實并不知道是因為什么原因被喚醒,有可能是因為其他線程調用了signal方法,也有可能是因為當前線程被中斷了。
但是,無論是被中斷喚醒還是被signal喚醒,被喚醒的線程最后都將離開condition queue,進入到sync queue中,這一點我們在下面分析源代碼的時候詳細說。
隨后,線程將在sync queue中利用進行acquireQueued方法進行“阻塞式”爭鎖,搶到鎖就返回,搶不到鎖就繼續被掛起。因此,當await()方法返回時,必然是保證了當前線程已經持有了lock鎖。
另外有一點這里我們提前說明一下,這一點對于我們下面理解源碼很重要,那就是:
</>復制代碼
如果從線程被喚醒,到線程獲取到鎖這段過程中發生過中斷,該怎么處理?
我們前面分析中斷的時候說過,中斷對于當前線程只是個建議,由當前線程決定怎么對其做出處理。在acquireQueued方法中,我們對中斷是不響應的,只是簡單的記錄搶鎖過程中的中斷狀態,并在搶到鎖后將這個中斷狀態返回,交于上層調用的函數處理,而這里“上層調用的函數”就是我們的await()方法。
那么await()方法是怎么對待這個中斷的呢?這取決于:
</>復制代碼
中斷發生時,線程是否已經被signal過?
如果中斷發生時,當前線程并沒有被signal過,則說明當前線程還處于條件隊列中,屬于正常在等待中的狀態,此時中斷將導致當前線程的正常等待行為被打斷,進入到sync queue中搶鎖,因此,在我們從await方法返回后,需要拋出InterruptedException,表示當前線程因為中斷而被喚醒。
如果中斷發生時,當前線程已經被signal過了,則說明這個中斷來的太晚了,既然當前線程已經被signal過了,那么就說明在中斷發生前,它就已經正常地被從condition queue中喚醒了,所以隨后即使發生了中斷(注意,這個中斷可以發生在搶鎖之前,也可以發生在搶鎖的過程中),我們都將忽略它,僅僅是在await()方法返回后,再自我中斷一下,補一下這個中斷。就好像這個中斷是在await()方法調用結束之后才發生的一樣。這里之所以要“補一下”這個中斷,是因為我們在用Thread.interrupted()方法檢測是否發生中斷的同時,會將中斷狀態清除,因此如果選擇了忽略中斷,則應該在await()方法退出后將它設成原來的樣子。
關于“這個中斷來的太晚了”這一點如果大家不太容易理解的話,這里打個比方:這就好比我們去飯店吃飯,都快吃完了,有一個菜到現在還沒有上,于是我們常常會把服務員叫來問:這個菜有沒有在做?要是還沒做我們就不要了。然后服務員會跑到廚房去問,之后跑回來說:對不起,這個菜已經下鍋在炒了,請再耐心等待一下。這里,這個“這個菜我們不要了”(發起的中斷)就來的太晚了,因為菜已經下鍋了(已經被signal過了)。
理清了上面的概念,我們再來看看await()方法是怎么做的,它用中斷模式interruptMode這個變量記錄中斷事件,該變量有三個值:
0 : 代表整個過程中一直沒有中斷發生。
THROW_IE : 表示退出await()方法時需要拋出InterruptedException,這種模式對應于中斷發生在signal之前
REINTERRUPT : 表示退出await()方法時只需要再自我中斷以下,這種模式對應于中斷發生在signal之后,即中斷來的太晚了。
</>復制代碼
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
接下來我們就從線程被喚醒的地方繼續往下走,一步步分析源碼:
情況1:中斷發生時,線程還沒有被signal過線程被喚醒后,我們將首先使用checkInterruptWhileWaiting方法檢測中斷的模式:
</>復制代碼
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
這里假設已經發生過中斷,則Thread.interrupted()方法必然返回true,接下來就是用transferAfterCancelledWait進一步判斷是否發生了signal:
</>復制代碼
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
enq(node);
return true;
}
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
上面已經說過,判斷一個node是否被signal過,一個簡單有效的方法就是判斷它是否離開了condition queue, 進入到sync queue中。
換句話說,只要一個節點的waitStatus還是Node.CONDITION,那就說明它還沒有被signal過。
由于現在我們分析情況1,則當前節點的waitStatus必然是Node.CONDITION,則會成功執行compareAndSetWaitStatus(node, Node.CONDITION, 0),將該節點的狀態設置成0,然后調用enq(node)方法將當前節點添加進sync queue中,然后返回true。
這里值得注意的是,我們此時并沒有斷開node的nextWaiter,所以最后一定不要忘記將這個鏈接斷開。
再回到transferAfterCancelledWait調用處,可知,由于transferAfterCancelledWait將返回true,現在checkInterruptWhileWaiting將返回THROW_IE,這表示我們在離開await方法時應當要拋出THROW_IE異常。
再回到checkInterruptWhileWaiting的調用處:
</>復制代碼
public final void await() throws InterruptedException {
/*
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
*/
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 我們現在在這里!!!
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
interruptMode現在為THROW_IE,則我們將執行break,跳出while循環。
接下來我們將執行acquireQueued(node, savedState)進行爭鎖,注意,這里傳入的需要獲取鎖的重入數量是savedState,即之前釋放了多少,這里就需要再次獲取多少:
</>復制代碼
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 如果線程獲取不到鎖,則將在這里被阻塞住
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued我們在前面的文章中已經詳細分析過了,它是一個阻塞式的方法,獲取到鎖則退出,獲取不到鎖則會被掛起。該方法只有在最終獲取到了鎖后,才會退出,并且退出時會返回當前線程的中斷狀態,如果我們在獲取鎖的過程中又被中斷了,則會返回true,否則會返回false。但是其實這里返回true還是false已經不重要了,因為前面已經發生過中斷了,我們就是因為中斷而被喚醒的不是嗎?所以無論如何,我們在退出await()方法時,必然會拋出InterruptedException。
我們這里假設它獲取到了鎖了,則它將回到上面的調用處,由于我們這時的interruptMode = THROW_IE,則會跳過if語句。
接下來我們將執行:
</>復制代碼
if (node.nextWaiter != null)
unlinkCancelledWaiters();
上面我們說過,當前節點的nextWaiter是有值的,它并沒有和原來的condition隊列斷開,這里我們已經獲取到鎖了,根據逐行分析AQS源碼(1)——獨占鎖的獲取中的分析,我們通過setHead方法已經將它的thread屬性置為null,從而將當前線程從sync queue"移除"了,接下來應當將它從condition隊列里面移除。由于condition隊列是一個單向隊列,我們無法獲取到它的前驅節點,所以只能從頭開始遍歷整個條件隊列,然后找到這個節點,再移除它。
然而,事實上呢,我們并沒有這么做。因為既然已經必須從頭開始遍歷鏈表了,我們就干脆一次性把鏈表中所有沒有在等待的節點都拿出去,所以這里調用了unlinkCancelledWaiters方法,該方法我們在前面await()第一部分的分析的時候已經講過了,它就是簡單的遍歷鏈表,找到所有waitStatus不為CONDITION的節點,并把它們從隊列中移除。
節點被移除后,接下來就是最后一步了——匯報中斷狀態:
</>復制代碼
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
這里我們的interruptMode=THROW_IE,說明發生了中斷,則將調用reportInterruptAfterWait:
</>復制代碼
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
可以看出,在interruptMode=THROW_IE時,我們就是簡單的拋出了一個InterruptedException。
至此,情況1(中斷發生于signal之前)我們就分析完了,這里我們簡單總結一下:
線程因為中斷,從掛起的地方被喚醒
隨后,我們通過transferAfterCancelledWait確認了線程的waitStatus值為Node.CONDITION,說明并沒有signal發生過
然后我們修改線程的waitStatus為0,并通過enq(node)方法將其添加到sync queue中
接下來線程將在sync queue中以阻塞的方式獲取,如果獲取不到鎖,將會被再次掛起
線程在sync queue中獲取到鎖后,將調用unlinkCancelledWaiters方法將自己從條件隊列中移除,該方法還會順便移除其他取消等待的鎖
最后我們通過reportInterruptAfterWait拋出了InterruptedException
由此可以看出,一個調用了await方法掛起的線程在被中斷后不會立即拋出InterruptedException,而是會被添加到sync queue中去爭鎖,如果爭不到,還是會被掛起;
只有爭到了鎖之后,該線程才得以從sync queue和condition queue中移除,最后拋出InterruptedException。
所以說,一個調用了await方法的線程,即使被中斷了,它依舊還是會被阻塞住,直到它獲取到鎖之后才能返回,并在返回時拋出InterruptedException。中斷對它意義更多的是體現在將它從condition queue中移除,加入到sync queue中去爭鎖,從這個層面上看,中斷和signal的效果其實很像,所不同的是,在await()方法返回后,如果是因為中斷被喚醒,則await()方法需要拋出InterruptedException異常,表示是它是被非正常喚醒的(正常喚醒是指被signal喚醒)。
情況2:中斷發生時,線程已經被signal過了這種情況對應于“中斷來的太晚了”,即REINTERRUPT模式,我們在拿到鎖退出await()方法后,只需要再自我中斷一下,不需要拋出InterruptedException。
值得注意的是這種情況其實包含了兩個子情況:
被喚醒時,已經發生了中斷,但此時線程已經被signal過了
被喚醒時,并沒有發生中斷,但是在搶鎖的過程中發生了中斷
下面我們分別來分析:
對于這種情況,與前面中斷發生于signal之前的主要差別在于transferAfterCancelledWait方法:
</>復制代碼
final boolean transferAfterCancelledWait(Node node) {
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) { //線程A執行到這里,CAS操作將會失敗
enq(node);
return true;
}
// 由于中斷發生前,線程已經被signal了,則這里只需要等待線程成功進入sync queue即可
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
在這里,由于signal已經發生過了,則由我們之前分析的signal方法可知,此時當前節點的waitStatus必定不為Node.CONDITION,他將跳過if語句。此時當前線程可能已經在sync queue中,或者正在進入到sync queue的路上。
為什么這里會出現“正在進入到sync queue的路上”的情況呢? 這里我們解釋下:
假設當前線程為線程A, 它被喚醒之后檢測到發生了中斷,來到了transferAfterCancelledWait這里,而另一個線程B在這之前已經調用了signal方法,該方法會調用transferForSignal將當前線程添加到sync queue的末尾:
</>復制代碼
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 線程B執行到這里,CAS操作將會成功
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
因為線程A和線程B是并發執行的,而這里我們分析的是“中斷發生在signal之后”,則此時,線程B的compareAndSetWaitStatus先于線程A執行。這時可能出現線程B已經成功修改了node的waitStatus狀態,但是還沒來得及調用enq(node)方法,線程A就執行到了transferAfterCancelledWait方法,此時它發現waitStatus已經不是Condition,但是其實當前節點還沒有被添加到sync node隊列中,因此,它接下來將通過自旋,等待線程B執行完transferForSignal方法。
線程A在自旋過程中會不斷地判斷節點有沒有被成功添加進sync queue,判斷的方法就是isOnSyncQueue:
</>復制代碼
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
該方法很好理解,只要waitStatus的值還為Node.CONDITION,則它一定還在condtion隊列中,自然不可能在sync里面;而每一個調用了enq方法入隊的線程:
</>復制代碼
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { //即使這一步失敗了next.prev一定是有值的
t.next = node; // 如果t.next有值,說明上面的compareAndSetTail方法一定成功了,則當前節點成為了新的尾節點
return t; // 返回了當前節點的前驅節點
}
}
}
}
哪怕在設置compareAndSetTail這一步失敗了,它的prev必然也是有值的,因此這兩個條件只要有一個滿足,就說明節點必然不在sync queue隊列中。
另一方面,如果node.next有值,則說明它不僅在sync queue中,并且在它后面還有別的節點,則只要它有值,該節點必然在sync queue中。
如果以上都不滿足,說明這里出現了尾部分叉(關于尾部分叉,參見這里)的情況,我們就從尾節點向前尋找這個節點:
</>復制代碼
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(Node node) {
Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
這里當然還是有可能出現從尾部反向遍歷找不到的情況,但是不用擔心,我們還在while循環中,無論如何,節點最后總會入隊成功的。最終,transferAfterCancelledWait將返回false。
再回到transferAfterCancelledWait調用處:
</>復制代碼
private int checkInterruptWhileWaiting(Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
則這里,由于transferAfterCancelledWait返回了false,則checkInterruptWhileWaiting方法將返回REINTERRUPT,這說明我們在退出該方法時只需要再次中斷。
再回到checkInterruptWhileWaiting方法的調用處:
</>復制代碼
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //我們在這里!!!
break;
}
//當前interruptMode=REINTERRUPT,無論這里是否進入if體,該值不變
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
此時,interruptMode的值為REINTERRUPT,我們將直接跳出while循環。
接下來就和上面的情況1一樣了,我們依然還是去爭鎖,這一步依然是阻塞式的,獲取到鎖則退出,獲取不到鎖則會被掛起。
另外由于現在interruptMode的值已經為REINTERRUPT,因此無論在爭鎖的過程中是否發生過中斷interruptMode的值都還是REINTERRUPT。
接著就是將節點從condition queue中剔除,與情況1不同的是,在signal方法成功將node加入到sync queue時,該節點的nextWaiter已經是null了,所以這里這一步不需要執行。
再接下來就是報告中斷狀態了:
</>復制代碼
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
注意,這里并沒有拋出中斷異常,而只是將當前線程再中斷一次。
至此,情況2.1(被喚醒時,已經發生了中斷,但此時線程已經被signal過了)我們就分析完了,這里我們簡單總結一下:
線程從掛起的地方被喚醒,此時既發生過中斷,又發生過signal
隨后,我們通過transferAfterCancelledWait確認了線程的waitStatus值已經不為Node.CONDITION,說明signal發生于中斷之前
然后,我們通過自旋的方式,等待signal方法執行完成,確保當前節點已經被成功添加到sync queue中
接下來線程將在sync queue中以阻塞的方式獲取鎖,如果獲取不到,將會被再次掛起
最后我們通過reportInterruptAfterWait將當前線程再次中斷,但是不會拋出InterruptedException
這種情況就比上面的情況簡單一點了,既然被喚醒時沒有發生中斷,那基本可以確信線程是被signal喚醒的,但是不要忘記還存在“假喚醒”這種情況,因此我們依然還是要檢測被喚醒的原因。
那么怎么區分到底是假喚醒還是因為是被signal喚醒了呢?
如果線程是因為signal而被喚醒,則由前面分析的signal方法可知,線程最終都會離開condition queue 進入sync queue中,所以我們只需要判斷被喚醒時,線程是否已經在sync queue中即可:
</>復制代碼
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 我們在這里,線程將在這里被喚醒
// 由于現在沒有發生中斷,所以interruptMode目前為0
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
線程被喚醒時,暫時還沒有發生中斷,所以這里interruptMode = 0, 表示沒有中斷發生,所以我們將繼續while循環,這時我們將通過isOnSyncQueue方法判斷當前線程是否已經在sync queue中了。由于已經發生過signal了,則此時node必然已經在sync queue中了,所以isOnSyncQueue將返回true,我們將退出while循環。
不過這里插一句,如果isOnSyncQueue檢測到當前節點不在sync queue中,則說明既沒有發生中斷,也沒有發生過signal,則當前線程是被“假喚醒”的,那么我們將再次進入循環體,將線程掛起。
退出while循環后接下來還是利用acquireQueued爭鎖,因為前面沒有發生中斷,則interruptMode=0,這時,如果在爭鎖的過程中發生了中斷,則acquireQueued將返回true,則此時interruptMode將變為REINTERRUPT。
接下是判斷node.nextWaiter != null,由于在調用signal方法時已經將節點移出了隊列,所有這個條件也不成立。
最后就是匯報中斷狀態了,此時interruptMode的值為REINTERRUPT,說明線程在被signal后又發生了中斷,這個中斷發生在搶鎖的過程中,這個中斷來的太晚了,因此我們只是再次自我中斷一下。
至此,情況2.2(被喚醒時,并沒有發生中斷,但是在搶鎖的過程中發生了中斷)我們就分析完了,這種情況和2.1很像,區別就是一個是在喚醒后就被發現已經發生了中斷,一個個喚醒后沒有發生中斷,但是在搶鎖的過成中發生了中斷,但無論如何,這兩種情況都會被歸結為“中斷來的太晚了”,中斷模式為REINTERRUPT,情況2.2的總結如下:
線程被signal方法喚醒,此時并沒有發生過中斷
因為沒有發生過中斷,我們將從checkInterruptWhileWaiting處返回,此時interruptMode=0
接下來我們回到while循環中,因為signal方法保證了將節點添加到sync queue中,此時while循環條件不成立,循環退出
接下來線程將在sync queue中以阻塞的方式獲取,如果獲取不到鎖,將會被再次掛起
線程獲取到鎖返回后,我們檢測到在獲取鎖的過程中發生過中斷,并且此時interruptMode=0,這時,我們將interruptMode修改為REINTERRUPT
最后我們通過reportInterruptAfterWait將當前線程再次中斷,但是不會拋出InterruptedException
這里我們再總結以下情況2(中斷發生時,線程已經被signal過了),這種情況對應于中斷發生signal之后,我們不管這個中斷是在搶鎖之前就已經發生了還是搶鎖的過程中發生了,只要它是在signal之后發生的,我們就認為它來的太晚了,我們將忽略這個中斷。因此,從await()方法返回的時候,我們只會將當前線程重新中斷一下,而不會拋出中斷異常。
情況3: 一直沒有中斷發生這種情況就更簡單了,它的大體流程和上面的情況2.2差不多,只是在搶鎖的過程中也沒有發生異常,則interruptMode為0,沒有發生過中斷,因此不需要匯報中斷。則線程就從await()方法處正常返回。
await()總結至此,我們總算把await()方法完整的分析完了,這里我們對整個方法做出總結:
進入await()時必須是已經持有了鎖
離開await()時同樣必須是已經持有了鎖
調用await()會使得當前線程被封裝成Node扔進條件隊列,然后釋放所持有的鎖
釋放鎖后,當前線程將在condition queue中被掛起,等待signal或者中斷
線程被喚醒后會將會離開condition queue進入sync queue中進行搶鎖
若在線程搶到鎖之前發生過中斷,則根據中斷發生在signal之前還是之后記錄中斷模式
線程在搶到鎖后進行善后工作(離開condition queue, 處理中斷異常)
線程已經持有了鎖,從await()方法返回
在這一過程中我們尤其要關注中斷,如前面所說,中斷和signal所起到的作用都是將線程從condition queue中移除,加入到sync queue中去爭鎖,所不同的是,signal方法被認為是正常喚醒線程,中斷方法被認為是非正常喚醒線程,如果中斷發生在signal之前,則我們在最終返回時,應當拋出InterruptedException;如果中斷發生在signal之后,我們就認為線程本身已經被正常喚醒了,這個中斷來的太晚了,我們直接忽略它,并在await()返回時再自我中斷一下,這種做法相當于將中斷推遲至await()返回時再發生。
awaitUninterruptibly()在前面我們分析的await()方法中,中斷起到了和signal同樣的效果,但是中斷屬于將一個等待中的線程非正常喚醒,可能即使線程被喚醒后,也搶到了鎖,但是卻發現當前的等待條件并沒有滿足,則還是得把線程掛起。因此我們有時候并不希望await方法被中斷,awaitUninterruptibly()方法即實現了這個功能:
</>復制代碼
public final void awaitUninterruptibly() {
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if (Thread.interrupted())
interrupted = true; // 發生了中斷后線程依舊留在了condition queue中,將會再次被掛起
}
if (acquireQueued(node, savedState) || interrupted)
selfInterrupt();
}
首先,從方法簽名上就可以看出,這個方法不會拋出中斷異常,我們拿它和await()方法對比一下:
</>復制代碼
public final void await() throws InterruptedException {
if (Thread.interrupted()) // 不同之處
throw new InterruptedException(); // 不同之處
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0; // 不同之處
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) // 不同之處
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 不同之處
interruptMode = REINTERRUPT; // 不同之處
if (node.nextWaiter != null) // 不同之處
unlinkCancelledWaiters(); // 不同之處
if (interruptMode != 0) // 不同之處
reportInterruptAfterWait(interruptMode); // 不同之處
}
由此可見,awaitUninterruptibly()全程忽略中斷,即使是當前線程因為中斷被喚醒,該方法也只是簡單的記錄中斷狀態,然后再次被掛起(因為并沒有并沒有任何操作將它添加到sync queue中)
要使當前線程離開condition queue去爭鎖,則必須是發生了signal事件。
最后,當線程在獲取鎖的過程中發生了中斷,該方法也是不響應,只是在最終獲取到鎖返回時,再自我中斷一下。可以看出,該方法和“中斷發生于signal之后的”REINTERRUPT模式的await()方法很像。
至此,該方法我們就分析完了,如果你之前await()方法已經弄懂了,這個awaitUninterruptibly()方法就很容易理解了。它的核心思想是:
中斷雖然會喚醒線程,但是不會導致線程離開condition queue,如果線程只是因為中斷而被喚醒,則他將再次被掛起
只有signal方法會使得線程離開condition queue
調用該方法時或者調用過程中如果發生了中斷,僅僅會在該方法結束時再自我中斷以下,不會拋出InterruptedException
awaitNanos(long nanosTimeout)前面我們看的方法,無論是await()還是awaitUninterruptibly(),它們在搶鎖的過程中都是阻塞式的,即一直到搶到了鎖才能返回,否則線程還是會被掛起,這樣帶來一個問題就是線程如果長時間搶不到鎖,就會一直被阻塞,因此我們有時候更需要帶超時機制的搶鎖,這一點和帶超時機制的wait(long timeout)是很像的,我們直接來看源碼:
</>復制代碼
public final long awaitNanos(long nanosTimeout) throws InterruptedException {
/*if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);*/
final long deadline = System.nanoTime() + nanosTimeout;
/*int interruptMode = 0;
while (!isOnSyncQueue(node)) */{
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout >= spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);
/*if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;*/
nanosTimeout = deadline - System.nanoTime();
}
/*if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);*/
return deadline - System.nanoTime();
}
該方法幾乎和await()方法一樣,只是多了超時時間的處理,我們上面已經把和await()方法相同的部分注釋起來了,只留下了不同的部分,這樣它們的區別就變得更明顯了。
該方法的主要設計思想是,如果設定的超時時間還沒到,我們就將線程掛起;超過等待的時間了,我們就將線程從condtion queue轉移到sync queue中。注意這里對于超時時間有一個小小的優化——當設定的超時時間很短時(小于spinForTimeoutThreshold的值),我們就是簡單的自旋,而不是將線程掛起,以減少掛起線程和喚醒線程所帶來的時間消耗。
不過這里還有一處值得注意,就是awaitNanos(0)的意義,我們在線程間的同步與通信(2)——wait, notify, notifyAll曾經提到過,wait(0)的含義是無限期等待,而我們在awaitNanos(long nanosTimeout)方法中是怎么處理awaitNanos(0)的呢?
</>復制代碼
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
從這里可以看出,如果設置的等待時間本身就小于等于0,當前線程是會直接從condition queue中轉移到sync queue中的,并不會被掛起,也不需要等待signal,這一點確實是更復合邏輯。如果需要線程只有在signal發生的條件下才會被喚醒,則應該用上面的awaitUninterruptibly()方法。
await(long time, TimeUnit unit)看完awaitNanos(long nanosTimeout)再看await(long time, TimeUnit unit)方法就更簡單了,它就是在awaitNanos(long nanosTimeout)的基礎上多了對于超時時間的時間單位的設置,但是在內部實現上還是會把時間轉成納秒去執行,這里我們直接拿它和上面的awaitNanos(long nano
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77193.html
摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學習經歷。因為寫作的時候發現,為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...
摘要:本篇我們將以的公平鎖為例來詳細看看使用獲取獨占鎖的流程。本文中的源碼基于。由于本篇我們分析的是獨占鎖,同一時刻,鎖只能被一個線程所持有。由于在整個搶鎖過程中,我們都是不響應中斷的。 前言 AQS(AbstractQueuedSynchronizer)是JAVA中眾多鎖以及并發工具的基礎,其底層采用樂觀鎖,大量使用了CAS操作, 并且在沖突時,采用自旋方式重試,以實現輕量級和高效地獲取鎖...
摘要:相較于方法,提供了超時等待機制注意,在方法中,我們用到了的返回值,如果該方法因為超時而退出時,則將返回。的這個返回值有助于我們理解該方法究竟是因為獲取到了鎖而返回,還是因為超時時間到了而返回。 前言 系列文章目錄 CountDownLatch是一個很有用的工具,latch是門閂的意思,該工具是為了解決某些操作只能在一組操作全部執行完成后才能執行的情景。例如,小組早上開會,只有等所有人...
摘要:前言本來準備做源碼閱讀的幾千行看著太累了看了幾篇大神的文章后才基本搞懂附在這里閱讀本文前請先看懂的介紹和原理分析并發包源碼學習之框架四源碼分析接口實現接口一般看一個類實現的接口可以看出它的目的其實也是熟悉的目的主要是替代的方法的它是基于實現 前言 本來準備做AbstractQueuedSynchronizer源碼閱讀的,幾千行看著太累了,看了幾篇大神的文章后才基本搞懂,附在這里,閱讀本...
閱讀 548·2021-10-19 11:45
閱讀 1357·2021-09-30 09:48
閱讀 1471·2021-08-16 10:56
閱讀 737·2021-07-26 23:38
閱讀 3211·2019-08-30 13:15
閱讀 2595·2019-08-30 12:45
閱讀 1829·2019-08-29 12:14
閱讀 2074·2019-08-26 18:42