摘要:本文的源碼基于。人如其名,包含了和兩部分。而將一個任務的狀態設置成終止態只有三種方法我們將在下文的源碼解析中分析這三個方法。將棧中所有掛起的線程都喚醒后,下面就是執行方法這個方法是一個空方
前言
系列文章目錄
有了上一篇對預備知識的了解之后,分析源碼就容易多了,本篇我們就直接來看看FutureTask的源碼。
本文的源碼基于JDK1.8。
Future和Task在深入分析源碼之前,我們再來拎一下FutureTask到底是干嘛的。人如其名,FutureTask包含了Future和Task兩部分。
我們上一篇說過,FutureTask實現了RunnableFuture接口,即Runnable接口和Future接口。
其中Runnable接口對應了FutureTask名字中的Task,代表FutureTask本質上也是表征了一個任務。而Future接口就對應了FutureTask名字中的Future,表示了我們對于這個任務可以執行某些操作,例如,判斷任務是否執行完畢,獲取任務的執行結果,取消任務的執行等。
所以簡單來說,FutureTask本質上就是一個“Task”,我們可以把它當做簡單的Runnable對象來使用。但是它又同時實現了Future接口,因此我們可以對它所代表的“Task”進行額外的控制操作。
Java并發工具類的三板斧關于Java并發工具類的三板斧,我們在分析AQS源碼的時候已經說過了,即:
狀態,隊列,CAS
以這三個方面為切入點來看源碼,有助于我們快速的看清FutureTask的概貌:
狀態首先是找狀態。
在FutureTask中,狀態是由state屬性來表示的,不出所料,它是volatile類型的,確保了不同線程對它修改的可見性:
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
state屬性是貫穿整個FutureTask的最核心的屬性,該屬性的值代表了任務在運行過程中的狀態,隨著任務的執行,狀態將不斷地進行轉變,從上面的定義中可以看出,總共有7種狀態:包括了1個初始態,2個中間態和4個終止態。
雖說狀態有這么多,但是狀態的轉換路徑卻只有四種:
任務的初始狀態都是NEW, 這一點是構造函數保證的,我們后面分析構造函數的時候再講;
任務的終止狀態有4種:
NORMAL:任務正常執行完畢
EXCEPTIONAL:任務執行過程中發生異常
CANCELLED:任務被取消
INTERRUPTED:任務被中斷
任務的中間狀態有2種:
COMPLETING 正在設置任務結果
INTERRUPTING 正在中斷運行任務的線程
值得一提的是,任務的中間狀態是一個瞬態,它非常的短暫。而且任務的中間態并不代表任務正在執行,而是任務已經執行完了,正在設置最終的返回結果,所以可以這么說:
只要state不處于 NEW 狀態,就說明任務已經執行完畢
注意,這里的執行完畢是指傳入的Callable對象的call方法執行完畢,或者拋出了異常。所以這里的COMPLETING的名字顯得有點迷惑性,它并不意味著任務正在執行中,而意味著call方法已經執行完畢,正在設置任務執行的結果。
而將一個任務的狀態設置成終止態只有三種方法:
set
setException
cancel
我們將在下文的源碼解析中分析這三個方法。
隊列接著我們來看隊列,在FutureTask中,隊列的實現是一個單向鏈表,它表示所有等待任務執行完畢的線程的集合。我們知道,FutureTask實現了Future接口,可以獲取“Task”的執行結果,那么如果獲取結果時,任務還沒有執行完畢怎么辦呢?那么獲取結果的線程就會在一個等待隊列中掛起,直到任務執行完畢被喚醒。這一點有點類似于我們之前學習的AQS中的sync queue,在下文的分析中,大家可以自己對照它們的異同點。
我們前面說過,在并發編程中使用隊列通常是將當前線程包裝成某種類型的數據結構扔到等待隊列中,我們先來看看隊列中的每一個節點是怎么個結構:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } }
可見,相比于AQS的sync queue所使用的雙向鏈表中的Node,這個WaitNode要簡單多了,它只包含了一個記錄線程的thread屬性和指向下一個節點的next屬性。
值得一提的是,FutureTask中的這個單向鏈表是當做棧來使用的,確切來說是當做Treiber棧來使用的,不了解Treiber棧是個啥的可以簡單的把它當做是一個線程安全的棧,它使用CAS來完成入棧出棧操作(想進一步了解的話可以看這篇文章)。為啥要使用一個線程安全的棧呢,因為同一時刻可能有多個線程都在獲取任務的執行結果,如果任務還在執行過程中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操作,這樣就有可能出現多個線程同時入棧的情況,因此需要使用CAS操作保證入棧的線程安全,對于出棧的情況也是同理。
由于FutureTask中的隊列本質上是一個Treiber棧,那么使用這個隊列就只需要一個指向棧頂節點的指針就行了,在FutureTask中,就是waiters屬性:
/** Treiber stack of waiting threads */ private volatile WaitNode waiters;
事實上,它就是整個單向鏈表的頭節點。
綜上,FutureTask中所使用的隊列的結構如下:
CAS操作大多數是用來改變狀態的,在FutureTask中也不例外。我們一般在靜態代碼塊中初始化需要CAS操作的屬性的偏移量:
// Unsafe mechanics private static final sun.misc.Unsafe UNSAFE; private static final long stateOffset; private static final long runnerOffset; private static final long waitersOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class> k = FutureTask.class; stateOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("state")); runnerOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("runner")); waitersOffset = UNSAFE.objectFieldOffset(k.getDeclaredField("waiters")); } catch (Exception e) { throw new Error(e); } }
從這個靜態代碼塊中我們也可以看出,CAS操作主要針對3個屬性,包括state、runner和waiters,說明這3個屬性基本是會被多個線程同時訪問的。其中state屬性代表了任務的狀態,waiters屬性代表了指向棧頂節點的指針,這兩個我們上面已經分析過了。runner屬性代表了執行FutureTask中的“Task”的線程。為什么需要一個屬性來記錄執行任務的線程呢?這是為了中斷或者取消任務做準備的,只有知道了執行任務的線程是誰,我們才能去中斷它。
定義完屬性的偏移量之后,接下來就是CAS操作本身了。在FutureTask,CAS操作最終調用的還是Unsafe類的compareAndSwapXXX方法,關于這一點,我們上一篇預備知識中已經講過了,這里不再贅述。
核心屬性前面我們以java并發編程工具類的“三板斧”為切入點分析了FutureTask的狀態,隊列和CAS操作,對這個工具類有了初步的認識。接下來,我們就要開始進入源碼分析了。首先我們先來看看FutureTask的幾個核心屬性:
/** * The run state of this task, initially NEW. The run state * transitions to a terminal state only in methods set, * setException, and cancel. During completion, state may take on * transient values of COMPLETING (while outcome is being set) or * INTERRUPTING (only while interrupting the runner to satisfy a * cancel(true)). Transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * Possible state transitions: * NEW -> COMPLETING -> NORMAL * NEW -> COMPLETING -> EXCEPTIONAL * NEW -> CANCELLED * NEW -> INTERRUPTING -> INTERRUPTED */ private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; /** The underlying callable; nulled out after running */ private Callablecallable; /** The result to return or exception to throw from get() */ private Object outcome; // non-volatile, protected by state reads/writes /** The thread running the callable; CASed during run() */ private volatile Thread runner; /** Treiber stack of waiting threads */ private volatile WaitNode waiters;
可以看出,FutureTask的核心屬性只有5個:
state
callable
outcome
runner
waiters
關于 state waiters runner三個屬性我們上面已經解釋過了。剩下的callable屬性代表了要執行的任務本身,即FutureTask中的“Task”部分,為Callable類型,這里之所以用Callable而不用Runnable是因為FutureTask實現了Future接口,需要獲取任務的執行結果。outcome屬性代表了任務的執行結果或者拋出的異常,為Object類型,也就是說outcome可以是任意類型的對象,所以當我們將正常的執行結果返回給調用者時,需要進行強制類型轉換,返回由Callable定義的V類型。這5個屬性綜合起來就完成了整個FutureTask的工作,使用關系如下:
任務本尊:callable
任務的執行者:runner
任務的結果:outcome
獲取任務的結果:state + outcome + waiters
中斷或者取消任務:state + runner + waiters
構造函數介紹完核心屬性之后,我們來看看FutureTask的構造函數:
public FutureTask(Callablecallable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
FutureTask共有2個構造函數,這2個構造函數一個是直接傳入Callable對象, 一個是傳入一個Runnable對象和一個指定的result, 然后通過Executors工具類將它適配成callable對象, 所以這兩個構造函數的本質是一樣的:
用傳入的參數初始化callable成員變量
將FutureTask的狀態設為NEW
(關于將Runnable對象適配成Callable對象的方法Executors.callable(runnable, result)我們在上一篇預備知識中已經講過了,不記得的同學可以倒回去再看一下)
接口實現上一篇我們提過,FutureTask實現了RunnableFuture接口:
public class FutureTaskimplements RunnableFuture { ... }
因此,它必須實現Runnable和Future接口的所有方法。
Runnable接口實現要實現Runnable接口, 就得覆寫run方法, 我們看看FutureTask的run方法干了點啥:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callablec = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
首先我們看到,在run方法的一開始,就檢查當前狀態是不是New, 并且使用CAS操作將runner屬性設置位當前線程,即記錄執行任務的線程。compareAndSwapObject的用法在上一篇預備知識中已經介紹過了,這里不再贅述。可見,runner屬性是在運行時被初始化的。
接下來,我們就調用Callable對象的call方法來執行任務,如果任務執行成功,就使用set(result)設置結果,否則,用setException(ex)設置拋出的異常。
我們先來看看set(result)方法:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
這個方法一開始通過CAS操作將state屬性由原來的NEW狀態修改為COMPLETING狀態,我們在一開始介紹state狀態的時候說過,COMPLETING是一個非常短暫的中間態,表示正在設置執行的結果。
狀態設置成功后,我們就把任務執行結果賦值給outcome, 然后直接把state狀態設置成NORMAL,注意,這里是直接設置,沒有先比較再設置的操作,由于state屬性被設置成volatile, 結合我們上一篇預備知識的介紹,這里putOrderedInt應當和putIntVolatile是等價的,保證了state狀態對其他線程的可見性。
在這之后,我們調用了 finishCompletion()來完成執行結果的設置。
接下來我們再來看看發生了異常的版本setException(ex):
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
可見,除了將outcome屬性賦值為異常對象,以及將state的終止狀態修改為EXCEPTIONAL,其余都和set方法類似。在方法的最后,都調用了 finishCompletion()來完成執行結果的設置。那么我們就來看看 finishCompletion()干了點啥:
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
這個方法事實上完成了一個“善后”工作。我們先來看看if條件語句中的CAS操作:
UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)
該方法是將waiters屬性的值由原值設置為null, 我們知道,waiters屬性指向了Treiber棧的棧頂節點,可以說是代表了整個Treiber棧,將該值設為null的目的就是清空整個棧。如果設置不成功,則if語句塊不會被執行,又進行下一輪for循環,而下一輪for循環的判斷條件又是waiters!=null ,由此我們知道,雖然最外層的for循環乍一看好像是什么遍歷節點的操作,其實只是為了確保waiters屬性被成功設置成null,本質上相當于一個自旋操作。
將waiters屬性設置成null以后,接下了 for (;;)死循環才是真正的遍歷節點,可以看出,循環內部就是一個普通的遍歷鏈表的操作,我們前面講屬性的時候說過,Treiber棧里面存放的WaitNode代表了當前等待任務執行結束的線程,這個循環的作用也正是遍歷鏈表中所有等待的線程,并喚醒他們。
將Treiber棧中所有掛起的線程都喚醒后,下面就是執行done方法:
/** * Protected method invoked when this task transitions to state * {@code isDone} (whether normally or via cancellation). The * default implementation does nothing. Subclasses may override * this method to invoke completion callbacks or perform * bookkeeping. Note that you can query status inside the * implementation of this method to determine whether this task * has been cancelled. */ protected void done() { }
這個方法是一個空方法,從注釋上看,它是提供給子類覆寫的,以實現一些任務執行結束前的額外操作。
done方法之后就是callable屬性的清理了(callable = null)。
至此,整個run方法分析完了。
真的嗎???
并沒有!別忘了run方法最后還有一個finally塊呢:
finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); }
在finally塊中,我們將runner屬性置為null,并且檢查有沒有遺漏的中斷,如果發現s >= INTERRUPTING, 說明執行任務的線程有可能被中斷了,因為s >= INTERRUPTING 只有兩種可能,state狀態為INTERRUPTING和INTERRUPTED。
有的同學可能就要問了,咱前面已經執行過的set方法或者setException方法不是已經將state狀態設置成NORMAL或者EXCEPTIONAL了嗎?怎么會出現INTERRUPTING或者INTERRUPTED狀態呢?別忘了,咱們在多線程的環境中,在當前線程執行run方法的同時,有可能其他線程取消了任務的執行,此時其他線程就可能對state狀態進行改寫,這也就是我們在設置終止狀態的時候用putOrderedInt方法,而沒有用CAS操作的原因——我們無法確信在設置state前是處于COMPLETING中間態還是INTERRUPTING中間態。
關于任務取消的操作,我們后面講Future接口的實現的時候再講,回到現在的問題,我們來看看handlePossibleCancellationInterrupt方法干了點啥:
/** * Ensures that any interrupt from a possible cancel(true) is only * delivered to a task while in run or runAndReset. */ private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us. Let"s spin-wait patiently. if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt }
可見該方法是一個自旋操作,如果當前的state狀態是INTERRUPTING,我們在原地自旋,直到state狀態轉換成終止態。
至此,run方法的分析就真的結束了。我們來總結一下:
run方法重點做了以下幾件事:
將runner屬性設置成當前正在執行run方法的線程
調用callable成員變量的call方法來執行任務
設置執行結果outcome, 如果執行成功, 則outcome保存的就是執行結果;如果執行過程中發生了異常, 則outcome中保存的就是異常,設置結果之前,先將state狀態設為中間態
對outcome的賦值完成后,設置state狀態為終止態(NORMAL或者EXCEPTIONAL)
喚醒Treiber棧中所有等待的線程
善后清理(waiters, callable,runner設為null)
檢查是否有遺漏的中斷,如果有,等待中斷狀態完成。
這里再插一句,我們前面說“state只要不是NEW狀態,就說明任務已經執行完成了”就體現在這里,因為run方法中,我們是在c.call()執行完畢或者拋出了異常之后才開始設置中間態和終止態的。
Future接口的實現Future接口一共定義了5個方法,我們一個個來看:
cancel(boolean mayInterruptIfRunning)既然上面在分析run方法的最后,我們提到了任務可能被別的線程取消,那我們就趁熱打鐵,看看怎么取消一個任務的執行:
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
還記得我們上一篇在介紹Future接口的時候對cancel方法的說明嗎?
關于cancel方法,這里要補充說幾點:
首先有以下三種情況之一的,cancel操作一定是失敗的:任務已經執行完成了
任務已經被取消過了
任務因為某種原因不能被取消
其它情況下,cancel操作將返回true。值得注意的是,cancel操作返回true并不代表任務真的就是被取消了,這取決于發動cancel狀態時,任務所處的狀態:
如果發起cancel時任務還沒有開始運行,則隨后任務就不會被執行;
如果發起cancel時任務已經在運行了,則這時就需要看mayInterruptIfRunning參數了:
如果mayInterruptIfRunning 為true, 則當前在執行的任務會被中斷
如果mayInterruptIfRunning 為false, 則可以允許正在執行的任務繼續運行,直到它執行完
我們來看看FutureTask是怎么實現cancel方法的這幾個規范的:
首先,對于“任務已經執行完成了或者任務已經被取消過了,則cancel操作一定是失敗的(返回false)”這兩條,是通過簡單的判斷state值是否為NEW實現的,因為我們前面說過了,只要state不為NEW,說明任務已經執行完畢了。從代碼中可以看出,只要state不為NEW,則直接返回false。
如果state還是NEW狀態,我們再往下看:
UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)
這一段是根據mayInterruptIfRunning的值將state的狀態由NEW設置成INTERRUPTING或者CANCELLED,當這一操作也成功之后,就可以執行后面的try語句了,但無論怎么,該方法最后都返回了true。
我們再接著看try塊干了點啥:
try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); }
我們知道,runner屬性中存放的是當前正在執行任務的線程,因此,這個try塊的目的就是中斷當前正在執行任務的線程,最后將state的狀態設為INTERRUPTED,當然,中斷操作完成后,還需要通過finishCompletion()來喚醒所有在Treiber棧中等待的線程。
我們現在總結一下,cancel方法實際上完成以下兩種狀態轉換之一:
NEW -> CANCELLED (對應于mayInterruptIfRunning=false)
NEW -> INTERRUPTING -> INTERRUPTED (對應于mayInterruptIfRunning=true)
對于第一條路徑,雖說cancel方法最終返回了true,但它只是簡單的把state狀態設為CANCELLED,并不會中斷線程的執行。但是這樣帶來的后果是,任務即使執行完畢了,也無法設置任務的執行結果,因為前面分析run方法的時候我們知道,設置任務結果有一個中間態,而這個中間態的設置,是以當前state狀態為NEW為前提的。
對于第二條路徑,則會中斷執行任務的線程,我們在倒回上面的run方法看看:
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callablec = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
雖然第二條路徑中斷了當前正在執行的線程,但是,響不響應這個中斷是由執行任務的線程自己決定的,更具體的說,這取決于c.call()方法內部是否對中斷進行了響應,是否將中斷異常拋出。
那call方法中是怎么處理中斷的呢?從上面的代碼中可以看出,catch語句處理了所有的Throwable的異常,這自然也包括了中斷異常。
然而,值得一提的是,即使這里進入了catch (Throwable ex){}代碼塊,setException(ex)的操作一定是失敗的,因為在我們取消任務執行的線程中,我們已經先把state狀態設為INTERRUPTING了,而setException(ex)的操作要求設置前線程的狀態為NEW。所以這里響應cancel方法所造成的中斷最大的意義不是為了對中斷進行處理,而是簡單的停止任務線程的執行,節省CPU資源。
那讀者可能會問了,既然這個setException(ex)的操作一定是失敗的,那放在這里有什么用呢?事實上,這個setException(ex)是用來處理任務自己在正常執行過程中產生的異常的,在我們沒有主動去cancel任務時,任務的state狀態在執行過程中就會始終是NEW,如果任務此時自己發生了異常,則這個異常就會被setException(ex)方法成功的記錄到outcome中。
反正無論如何,run方法最終都會進入finally塊,而這時候它會發現s >= INTERRUPTING,如果檢測發現s = INTERRUPTING,說明cancel方法還沒有執行到中斷當前線程的地方,那就等待它將state狀態設置成INTERRUPTED。到這里,對cancel方法的分析就和上面對run方法的分析對接上了。
cancel方法到這里就分析完了,如果你一條條的去對照Future接口對于cancel方法的規范,它每一條都是實現了的,而它實現的核心機理,就是對state的當前狀態的判斷和設置。由此可見,state屬性是貫穿整個FutureTask的最核心的屬性。
isCancelled()說完了cancel,我們再來看看 isCancelled()方法,相較而言,它就簡單多了:
public boolean isCancelled() { return state >= CANCELLED; }
那么state >= CANCELLED 包含了那些狀態呢,它包括了: CANCELLED INTERRUPTING INTERRUPTED
我們再來回憶下上一篇講的Future接口對于isCancelled()方法的規范:
該方法用于判斷任務是否被取消了。如果一個任務在正常執行完成之前被Cancel掉了, 則返回true
再對比state的狀態圖:
可見選取這三個狀態作為判斷依據是很合理的, 因為只有調用了cancel方法,才會使state狀態進入這三種狀態。
與 isCancelled方法類似,isDone方法也是簡單地通過state狀態來判斷。
public boolean isDone() { return state != NEW; }
關于這一點,其實我們之前已經說過了,只要state狀態不是NEW,則任務已經執行完畢了,因為state狀態不存在類似“任務正在執行中”這種狀態,即使是短暫的中間態,也是發生在任務已經執行完畢,正在設置任務結果的時候。
get()最后我們來看看獲取執行結果的get方法,先來看看無參的版本:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
該方法其實很簡單,當任務還沒有執行完畢或者正在設置執行結果時,我們就使用awaitDone方法等待任務進入終止態,注意,awaitDone的返回值是任務的狀態,而不是任務的結果。任務進入終止態之后,我們就根據任務的執行結果來返回計算結果或者拋出異常。
我們先來看看等待任務完成的awaitDone方法,該方法是獲取任務結果最核心的方法,它完成了獲取結果,掛起線程,響應中斷等諸多操作:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
在具體分析它的源碼之前,有一點我們先特別說明一下,FutureTask中會涉及到兩類線程,一類是執行任務的線程,它只有一個,FutureTask的run方法就由該線程來執行;一類是獲取任務執行結果的線程,它可以有多個,這些線程可以并發執行,每一個線程都是獨立的,都可以調用get方法來獲取任務的執行結果。如果任務還沒有執行完,則這些線程就需要進入Treiber棧中掛起,直到任務執行結束,或者等待的線程自身被中斷。
理清了這一點后,我們再來詳細看看awaitDone方法。可以看出,該方法的大框架是一個自旋操作,我們一段一段來看:
for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } // ... }
首先一開始,我們先檢測當前線程是否被中斷了,這是因為get方法是阻塞式的,如果等待的任務還沒有執行完,則調用get方法的線程會被扔到Treiber棧中掛起等待,直到任務執行完畢。但是,如果任務遲遲沒有執行完畢,則我們也有可能直接中斷在Treiber棧中的線程,以停止等待。
當檢測到線程被中斷后,我們調用了removeWaiter:
private void removeWaiter(WaitNode node) { if (node != null) { ... } }
removeWaiter的作用是將參數中的node從等待隊列(即Treiber棧)中移除。如果此時線程還沒有進入Treiber棧,則 q=null,那么removeWaiter(q)啥也不干。在這之后,我們就直接拋出了InterruptedException異常。
接著往下看:
for (;;) { /*if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); }*/ int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); }
如果任務已經進入終止態(s > COMPLETING),我們就直接返回任務的狀態;
否則,如果任務正在設置執行結果(s == COMPLETING),我們就讓出當前線程的CPU資源繼續等待
否則,就說明任務還沒有執行,或者任務正在執行過程中,那么這時,如果q現在還為null, 說明當前線程還沒有進入等待隊列,于是我們新建了一個WaitNode, WaitNode的構造函數我們之前已經看過了,就是生成了一個記錄了當前線程的節點;
如果q不為null,說明代表當前線程的WaitNode已經被創建出來了,則接下來如果queued=false,表示當前線程還沒有入隊,所以我們執行了:
queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);
這行代碼的作用是通過CAS操作將新建的q節點添加到waiters鏈表的頭節點之前,其實就是Treiber棧的入棧操作,寫的還是很簡潔的,一行代碼就搞定了,如果大家還是覺得暈乎,下面是它等價的偽代碼:
q.next = waiters; //當前節點的next指向目前的棧頂元素 //如果棧頂節點在這個過程中沒有變,即沒有發生并發入棧的情況 if(waiters的值還是上面q.next所使用的waiters值){ waiters = q; //修改棧頂的指針,指向剛剛入棧的節點 }
這個CAS操作就是為了保證同一時刻如果有多個線程在同時入棧,則只有一個能夠操作成功,也即Treiber棧的規范。
如果以上的條件都不滿足,則再接下來因為現在是不帶超時機制的get,timed為false,則else if代碼塊跳過,然后來到最后一個else, 把當前線程掛起,此時線程就處于阻塞等待的狀態。
至此,在任務沒有執行完畢的情況下,獲取任務執行結果的線程就會在Treiber棧中被LockSupport.park(this)掛起了。
那么這個掛起的線程什么時候會被喚醒呢?有兩種情況:
任務執行完畢了,在finishCompletion方法中會喚醒所有在Treiber棧中等待的線程
等待的線程自身因為被中斷等原因而被喚醒。
我們接下來就繼續看看線程被喚醒后的情況,此時,線程將回到for(;;)循環的開頭,繼續下一輪循環:
for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); // 掛起的線程從這里被喚醒 }
首先自然還是檢測中斷,所不同的是,此時q已經不為null了,因此在有中斷發生的情況下,在拋出中斷之前,多了一步removeWaiter(q)操作,該操作是將當前線程從等待的Treiber棧中移除,相比入棧操作,這個出棧操作要復雜一點,這取決于節點是否位于棧頂。下面我們來仔細分析這個出棧操作:
private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } }
首先,我們把要出棧的WaitNode的thread屬性設置為null, 這相當于一個標記,是我們后面在waiters鏈表中定位該節點的依據。
(1) 要移除的節點就在棧頂
我們先來看看該節點就位于棧頂的情況,這說明在該節點入棧后,并沒有別的線程再入棧了。由于一開始我們就將該節點的thread屬性設為了null,因此,前面的q.thread != null 和 pred != null都不滿足,我們直接進入到最后一個else if 分支:
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry;
這一段是棧頂節點出棧的操作,和入棧類似,采用了CAS比較,將棧頂元素設置成原棧頂節點的下一個節點。
值得注意的是,當CAS操作不成功時,程序會回到retry處重來,但即使CAS操作成功了,程序依舊會遍歷完整個鏈表,找尋node.thread == null 的節點,并將它們一并從鏈表中剔除。
(2) 要移除的節點不在棧頂
當要移除的節點不在棧頂時,我們會一直遍歷整個鏈表,直到找到q.thread == null的節點,找到之后,我們將進入
else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; }
這是因為節點不在棧頂,則其必然是有前驅節點pred的,這時,我們只是簡單的讓前驅節點指向當前節點的下一個節點,從而將目標節點從鏈表中剔除。
注意,后面多加的那個if判斷是很有必要的,因為removeWaiter方法并沒有加鎖,所以可能有多個線程在同時執行,WaitNode的兩個成員變量thread和next都被設置成volatile,這保證了它們的可見性,如果我們在這時發現了pred.thread == null,那就意味著它已經被另一個線程標記了,將在另一個線程中被拿出waiters鏈表,而我們當前目標節點的原后繼節點現在是接在這個pred節點上的,因此,如果pred已經被其他線程標記為要拿出去的節點,我們現在這個線程再繼續往后遍歷就沒有什么意義了,所以這時就調到retry處,從頭再遍歷。
如果pred節點沒有被其他線程標記,那我們就接著往下遍歷,直到整個鏈表遍歷完。
至此,將節點從waiters鏈表中移除的removeWaiter操作我們就分析完了,我們總結一下該方法:
在該方法中,會傳入一個需要移除的節點,我們會將這個節點的thread屬性設置成null,以標記該節點。然后無論如何,我們會遍歷整個鏈表,清除那些被標記的節點(只是簡單的將節點從鏈表中剔除)。如果要清除的節點就位于棧頂,則還需要注意重新設置waiters的值,指向新的棧頂節點。所以可以看出,雖說removeWaiter方法傳入了需要剔除的節點,但是事實上它可能剔除的不止是傳入的節點,而是所有已經被標記了的節點,這樣不僅清除操作容易了些(不需要專門去定位傳入的node在哪里),而且提升了效率(可以同時清除所有已經被標記的節點)。
我們再回到awaitDone方法里:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); // 剛剛分析到這里了,我們接著往下看 throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
如果線程不是因為中斷被喚醒,則會繼續往下執行,此時會再次獲取當前的state狀態。所不同的是,此時q已經不為null, queued已經為true了,所以已經不需要將當前節點再入waiters棧了。
至此我們知道,除非被中斷,否則get方法會在原地自旋等待(用的是Thread.yield,對應于s == COMPLETING)或者直接掛起(對應任務還沒有執行完的情況),直到任務執行完成。而我們前面分析run方法和cancel方法的時候知道,在run方法結束后,或者cancel方法取消完成后,都會調用finishCompletion()來喚醒掛起的線程,使它們得以進入下一輪循環,獲取任務執行結果。
最后,等awaitDone函數返回后,get方法返回了report(s),以根據任務的狀態,匯報執行結果:
@SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
可見,report方法非常簡單,它根據當前state狀態,返回正常執行的結果,或者拋出指定的異常。
至此,get方法就分析結束了。
值得注意的是,awaitDone方法和get方法都沒有加鎖,這在多個線程同時執行get方法的時候會不會產生線程安全問題呢?通過查看方法內部的參數我們知道,整個方法內部用的大多數是局部變量,因此不會產生線程安全問題,對于全局的共享變量waiters的修改時,也使用了CAS操作,保證了線程安全,而state變量本身是volatile的,保證了讀取時的可見性,因此整個方法調用雖然沒有加鎖,它仍然是線程安全的。
get(long timeout, TimeUnit unit)最后我們來看看帶超時版本的get方法:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
它和上面不帶超時時間的get方法很類似,只是在awaitDone方法中多了超時檢測:
else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); }
即,如果指定的超時時間到了,則直接返回,如果返回時,任務還沒有進入終止狀態,則直接拋出TimeoutException異常,否則就像get()方法一樣,正常的返回執行結果。
總結FutureTask實現了Runnable和Future接口,它表示了一個帶有任務狀態和任務結果的任務,它的各種操作都是圍繞著任務的狀態展開的,值得注意的是,在所有的7個任務狀態中,只要不是NEW狀態,就表示任務已經執行完畢或者不再執行了,并沒有表示“任務正在執行中”的狀態。
除了代表了任務的Callable對象、代表任務執行結果的outcome屬性,FutureTask還包含了一個代表所有等待任務結束的線程的Treiber棧,這一點其實和各種鎖的等待隊列特別像,即如果拿不到鎖,則當前線程就會被扔進等待隊列中;這里則是如果任務還沒有執行結束,則所有等待任務執行完畢的線程就會被扔進Treiber棧中,直到任務執行完畢了,才會被喚醒。
FutureTask雖然為我們提供了獲取任務執行結果的途徑,遺憾的是,在獲取任務結果時,如果任務還沒有執行完成,則當前線程會自旋或者掛起等待,這和我們實現異步的初衷是相違背的,我們后面將繼續介紹另一個同步工具類CompletableFuture, 它解決了這個問題。
(完)
系列文章目錄
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77337.html
摘要:為了避免一篇文章的篇幅過長,于是一些比較大的主題就都分成幾篇來講了,這篇文章是筆者所有文章的目錄,將會持續更新,以給大家一個查看系列文章的入口。 前言 大家好,筆者是今年才開始寫博客的,寫作的初衷主要是想記錄和分享自己的學習經歷。因為寫作的時候發現,為了弄懂一個知識,不得不先去了解另外一些知識,這樣以來,為了說明一個問題,就要把一系列知識都了解一遍,寫出來的文章就特別長。 為了避免一篇...
摘要:在分析它的源碼之前我們需要先了解一些預備知識。因為接口沒有返回值所以為了與兼容我們額外傳入了一個參數使得返回的對象的方法直接執行的方法然后返回傳入的參數。 前言 系列文章目錄 FutureTask 是一個同步工具類,它實現了Future語義,表示了一種抽象的可生成結果的計算。在包括線程池在內的許多工具類中都會用到,弄懂它的實現將有利于我們更加深入地理解Java異步操作實現。 在分析...
摘要:零前期準備文章異常啰嗦且繞彎。版本版本簡介是中默認的實現類,常與結合進行多線程并發操作。所以方法的主體其實就是去喚醒被阻塞的線程。本文僅為個人的學習筆記,可能存在錯誤或者表述不清的地方,有緣補充 零 前期準備 0 FBI WARNING 文章異常啰嗦且繞彎。 1 版本 JDK 版本 : OpenJDK 11.0.1 IDE : idea 2018.3 2 ThreadLocal 簡介 ...
閱讀 3486·2021-10-18 13:30
閱讀 2941·2021-10-09 09:44
閱讀 1964·2019-08-30 11:26
閱讀 2287·2019-08-29 13:17
閱讀 757·2019-08-29 12:17
閱讀 2246·2019-08-26 18:42
閱讀 471·2019-08-26 13:24
閱讀 2951·2019-08-26 11:39