摘要:本文首發(fā)于一世流云的專欄一模式簡介模式是多線程設(shè)計模式中的一種常見模式,它的主要作用就是異步地執(zhí)行任務(wù),并在需要的時候獲取結(jié)果。二中的模式在多線程基礎(chǔ)之模式中,我們曾經(jīng)給出過模式的通用類關(guān)系圖。
本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog...一、Future模式簡介
Future模式是Java多線程設(shè)計模式中的一種常見模式,它的主要作用就是異步地執(zhí)行任務(wù),并在需要的時候獲取結(jié)果。我們知道,一般調(diào)用一個函數(shù),需要等待函數(shù)執(zhí)行完成,調(diào)用線程才會繼續(xù)往下執(zhí)行,如果是一些計算密集型任務(wù),需要等待的時間可能就會比較長。
筆者在讀書期間曾參與過一個國家電網(wǎng)的復(fù)雜水電系統(tǒng)的聯(lián)合優(yōu)化調(diào)度項目,需要在工業(yè)上可接受的時間內(nèi)計算出整個云南地區(qū)近40座大型水電站的日發(fā)電計劃。
在電力系統(tǒng)中日發(fā)電計劃的制定非常重要,同時又涉及水利學(xué)、經(jīng)濟(jì)學(xué)、電氣工程、政治政策等諸多復(fù)雜約束條件,工業(yè)上基本都是通過混合整數(shù)規(guī)劃、動態(tài)規(guī)劃再結(jié)合其它數(shù)學(xué)規(guī)劃方法建模求解,模型涉及的變量基本都是百萬級以上。
試想一下,這種復(fù)雜的計算模型,假設(shè)我把它封裝到一個函數(shù)中,由調(diào)用方進(jìn)行單線程調(diào)用,需要等待多少時間?如果將模型集成到UI,用戶在界面上點擊一下計算,那可能用戶基本就認(rèn)為應(yīng)用假設(shè)死崩潰了。
在Java中,一種解決辦法是由調(diào)用線程新建一個線程執(zhí)行該任務(wù),比如下面這樣:
public void calculate(){ Thread t = new Thread(new Runnable() { @Override public void run() { model.calculate(); } }); t.start(); }
但是,這樣有一個問題,我拿不到計算結(jié)果,也不知道任務(wù)到底什么時候計算結(jié)束。我們來看下Future模式是如何來解決的。
Future模式,可以讓調(diào)用方立即返回,然后它自己會在后面慢慢處理,此時調(diào)用者拿到的僅僅是一個憑證,調(diào)用者可以先去處理其它任務(wù),在真正需要用到調(diào)用結(jié)果的場合,再使用憑證去獲取調(diào)用結(jié)果。這個憑證就是這里的Future。
我們看下泳道圖來理解下兩者的區(qū)別:
傳統(tǒng)的數(shù)據(jù)獲取方式:
Future模式下的數(shù)據(jù)獲取:
如果讀者對經(jīng)濟(jì)學(xué)有些了解,或是了解金融衍生品的話,對Future這個單詞應(yīng)該不會陌生,Future在經(jīng)濟(jì)學(xué)中出現(xiàn)的頻率相當(dāng)之高,比如關(guān)于現(xiàn)金流的折算,其中的終值,英文就是Future value。常見的金融衍生品,期貨、遠(yuǎn)期的英文分別是Futures、Financial future。
我們之前說了,F(xiàn)uture模式可以理解為一種憑證,拿著該憑證在將來的某個時間點可以取到我想要的東西,這其實就和期貨、遠(yuǎn)期有點類似了,期貨、遠(yuǎn)期也是雙方制定協(xié)議或合同,然后在約定的某個時間點,拿著合同進(jìn)行資金或?qū)嵨锏慕桓睢?梢姡現(xiàn)uture模式的命名是很有深意且很恰當(dāng)?shù)摹?/p> 二、J.U.C中的Future模式
在Java多線程基礎(chǔ)之Future模式中,我們曾經(jīng)給出過Future模式的通用類關(guān)系圖。本章中,我不想教科書般得再貼一遍該圖,而是希望能循序漸進(jìn)地帶領(lǐng)讀者去真正理解Future模式中的各個組件,去思考為什么Future模式的類關(guān)系圖是那樣,為什么一定就是那么幾個組件?
真實的任務(wù)類首先來思考下,我們需要執(zhí)行的是一個任務(wù),那么在Java中,一般需要實現(xiàn)Runnable接口,比如像下面這樣:
public class Task implements Runnable { @Override public void run() { // do something } }
但是,如果我需要任務(wù)的返回結(jié)果呢,從Runnable的接口定義來看,并不能滿足我們的要求,Runnable一般僅僅用于定義一個可以被線程執(zhí)行的任務(wù),它的run方法沒有返回值:
public interface Runnable { public abstract void run(); }
于是,JDK提供了另一個接口——Callable,表示一個具有返回結(jié)果的任務(wù):
public interface Callable{ V call() throws Exception; }
所以,最終我們自定義的任務(wù)類一般都是實現(xiàn)了Callable接口。以下定義了一個具有復(fù)雜計算過程的任務(wù),最終返回一個Double值:
public class ComplexTask implements Callable憑證{ @Override public Double call() { // complex calculating... return ThreadLocalRandom.current().nextDouble(); } }
第一節(jié)講到,F(xiàn)uture模式可以讓調(diào)用方獲取任務(wù)的一個憑證,以便將來拿著憑證去獲取任務(wù)結(jié)果,憑證需要具有以下特點:
在將來某個時間點,可以通過憑證獲取任務(wù)的結(jié)果;
可以支持取消。
從以上兩點來看,我們首先想到的方式就是對Callable任務(wù)進(jìn)行包裝,包裝成一個憑證,然后返回給調(diào)用方。
J.U.C提供了Future接口和它的實現(xiàn)類——FutureTask來滿足我們的需求,我們可以像下面這樣對之前定義的ComplexTask包裝:
ComplexTask task = new ComplexTask(); Futurefuture = new FutureTask (task);
上面的FutureTask就是真實的“憑證”,F(xiàn)uture則是該憑證的接口(從面向?qū)ο蟮慕嵌葋碇v,調(diào)用方應(yīng)面向接口操作)。
Future接口的定義:
public interface Future{ ? boolean cancel(boolean mayInterruptIfRunning); ? boolean isCancelled(); ? boolean isDone(); ? V get() throws InterruptedException, ExecutionException; ? V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
Future接口很簡單,提供了isCancelled和isDone兩個方法監(jiān)控任務(wù)的執(zhí)行狀態(tài),一個cancel方法用于取消任務(wù)的執(zhí)行。兩個get方法用于獲取任務(wù)的執(zhí)行結(jié)果,如果任務(wù)未執(zhí)行完成,除非設(shè)置超時,否則調(diào)用線程將會阻塞。
此外,為了能夠被線程或線程池執(zhí)行任務(wù),憑證還需要實現(xiàn)Runnable接口,所以J.U.C還提供了一個RunnableFuture接口,其實就是組合了Runnable和Future接口:
public interface RunnableFutureextends Runnable, Future { void run(); }
上面提到的FutureTask,其實就是實現(xiàn)了RunnableFuture接口的“憑證”:
public class FutureTaskimplements RunnableFuture { ? public FutureTask(Callable callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; // ... } ? public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); // ... } }
從構(gòu)造函數(shù)可以看到,F(xiàn)utureTask既可以包裝Callable任務(wù),也可以包裝Runnable任務(wù),但最終都是將Runnable轉(zhuǎn)換成Callable任務(wù),其實是一個適配過程。
調(diào)用方最終,調(diào)用方可以以下面這種方式使用Future模式,異步地獲取任務(wù)的執(zhí)行結(jié)果。
public class Client {
public static void main(String[] args) throws ExecutionException, InterruptedException { ComplexTask task = new ComplexTask(); Futurefuture = new FutureTask (task); // time passed... Double result = future.get(); }
}
通過上面的分析,可以看到,整個Future模式其實就三個核心組件:
真實任務(wù)/數(shù)據(jù)類(通常任務(wù)執(zhí)行比較慢,或數(shù)據(jù)構(gòu)造需要較長時間),即示例中的ComplexTask
Future接口(調(diào)用方使用該憑證獲取真實任務(wù)/數(shù)據(jù)的結(jié)果),即Future接口
Future實現(xiàn)類(用于對真實任務(wù)/數(shù)據(jù)進(jìn)行包裝),即FutureTask實現(xiàn)類
三、FutureTask原理在J.U.C提供的Future模式中,最重要的就是FutureTask類,F(xiàn)utureTask是在JDK1.5時,隨著J.U.C一起引入的,它代表著一個異步任務(wù),這個任務(wù)一般提交給Executor執(zhí)行,當(dāng)然也可以由調(diào)用方直接調(diào)用run方法運行。
既然是任務(wù),就有狀態(tài),F(xiàn)utureTask一共給任務(wù)定義了7種狀態(tài):
NEW:表示任務(wù)的初始化狀態(tài);
COMPLETING:表示任務(wù)已執(zhí)行完成(正常完成或異常完成),但任務(wù)結(jié)果或異常原因還未設(shè)置完成,屬于中間狀態(tài);
NORMAL:表示任務(wù)已經(jīng)執(zhí)行完成(正常完成),且任務(wù)結(jié)果已設(shè)置完成,屬于最終狀態(tài);
EXCEPTIONAL:表示任務(wù)已經(jīng)執(zhí)行完成(異常完成),且任務(wù)異常已設(shè)置完成,屬于最終狀態(tài);
CANCELLED:表示任務(wù)還沒開始執(zhí)行就被取消(非中斷方式),屬于最終狀態(tài);
INTERRUPTING:表示任務(wù)還沒開始執(zhí)行就被取消(中斷方式),正式被中斷前的過渡狀態(tài),屬于中間狀態(tài);
INTERRUPTED:表示任務(wù)還沒開始執(zhí)行就被取消(中斷方式),且已被中斷,屬于最終狀態(tài)。
各個狀態(tài)之間的狀態(tài)轉(zhuǎn)換圖如下:
上圖需要注意的是兩點:
FutureTask雖然支持任務(wù)的取消(cancel方法),但是只有當(dāng)任務(wù)是初始化(NEW狀態(tài))時才有效,否則cancel方法直接返回false;
當(dāng)執(zhí)行任務(wù)時(run方法),無論成功或異常,都會先過渡到COMPLETING狀態(tài),直到任務(wù)結(jié)果設(shè)置完成后,才會進(jìn)入響應(yīng)的終態(tài)。
JDK1.7之前,F(xiàn)utureTask通過內(nèi)部類實現(xiàn)了AQS框架來實現(xiàn)功能。 JDK1.7及以后,則改變?yōu)橹苯油ㄟ^Unsafe類CAS操作state狀態(tài)字段來進(jìn)行同步。構(gòu)造
FutureTask在構(gòu)造時可以接受Runnable或Callable任務(wù),如果是Runnable,則最終包裝成Callable:
public FutureTask(Callablecallable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; }
上述的Executors.callable()方法我們在executors框架概述提到過,其實就是對Runnable對象做了適配,返回Callable適配對象——RunnableAdapter:
public staticCallable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter (task, result); }
static final class RunnableAdapterimplements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
FutureTask的字段定義非常簡單,State標(biāo)識任務(wù)的當(dāng)前狀態(tài),狀態(tài)之間的轉(zhuǎn)換通過Unsafe來操作,所有操作都基于自旋+CAS完成:
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; ? private Callablecallable; // 真正的任務(wù) private volatile Thread runner; // 保存正在執(zhí)行任務(wù)的線程 ? /** * 記錄結(jié)果或異常 */ private Object outcome; ? /** * 無鎖棧(Treiber stack) * 保存等待線程 */ private volatile WaitNode waiters;
注意waiters這個字段,waiters指向一個“無鎖棧”,該棧保存著所有等待線程,我們知道當(dāng)調(diào)用FutureTask的get方法時,如果任務(wù)沒有完成,則調(diào)用線程會被阻塞,其實就是將線程包裝成WaitNode結(jié)點保存到waiters指向的棧中:
static final class WaitNode { volatile Thread thread; volatile WaitNode next; ? WaitNode() { thread = Thread.currentThread(); } }任務(wù)的運行
FutureTask的運行就是調(diào)用了run方法:
public void run() { // 僅當(dāng)任務(wù)為NEW狀態(tài)時, 才能執(zhí)行任務(wù) if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callablec = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
上述方法,首先判斷當(dāng)前任務(wù)的state是否等于NEW,如果不為NEW則說明任務(wù)或者已經(jīng)執(zhí)行過,或者已經(jīng)被取消,直接返回。
正常執(zhí)行完成后,會調(diào)用set方法設(shè)置任務(wù)執(zhí)行結(jié)果:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
如果任務(wù)執(zhí)行過程中拋出異常,則調(diào)用setException設(shè)置異常信息:
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }任務(wù)的取消
cancel方法用于取消任務(wù),參數(shù)mayInterruptIfRunning如果為true,表示中斷正在執(zhí)行任務(wù)的線程,否則僅僅是將任務(wù)狀態(tài)置為CANCELLED :
public boolean cancel(boolean mayInterruptIfRunning) { // 僅NEW狀態(tài)下可以取消任務(wù) if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { if (mayInterruptIfRunning) { // 中斷任務(wù) try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
任務(wù)取消后,最終調(diào)用finishCompletion方法,釋放所有在棧上等待的線程:
/** * 喚醒棧上的所有等待線程. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null; ) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (; ; ) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } ? done(); // 鉤子方法 callable = null; // to reduce footprint }結(jié)果獲取
FutureTask可以通過get方法獲取任務(wù)結(jié)果,如果需要限時等待,可以調(diào)用get(long timeout, TimeUnit unit)。
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); // 映射任務(wù)執(zhí)行結(jié)果 }
可以看到,如果當(dāng)前任務(wù)的狀態(tài)是NEW或COMPLETING,會調(diào)用awaitDone阻塞線程。否則會認(rèn)為任務(wù)已經(jīng)完成,直接通過report方法映射結(jié)果:
/** * 將同步狀態(tài)映射為執(zhí)行結(jié)果. */ private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V) x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable) x); }
report會根據(jù)任務(wù)的狀態(tài)進(jìn)行映射,如果任務(wù)是Normal狀態(tài),說明正常執(zhí)行完成,則返回任務(wù)結(jié)果;如果任務(wù)被取消(CANCELLED或INTERRUPTED),則拋出CancellationException;其它情況則拋出ExecutionException。
四、ScheduledFutureTask在ScheduledThreadPoolExecutor一節(jié)中,我們曾經(jīng)介紹過另一種FutureTask——ScheduledFutureTask,ScheduledFutureTask是ScheduledThreadPoolExecutor這個線程池的默認(rèn)調(diào)度任務(wù)類。
ScheduledFutureTask在普通FutureTask的基礎(chǔ)上增加了周期執(zhí)行/延遲執(zhí)行的功能。通過下面的類圖可以看到,它其實是通過繼承FutureTask和Delayed接口來實現(xiàn)周期/延遲功能的。
ScheduledFutureTask(Callablecallable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
ScheduledFutureTask的源碼非常簡單,基本都是委托FutureTask來實現(xiàn)的,關(guān)鍵是看下運行任務(wù)的方法:
public void run() { boolean periodic = isPeriodic(); // 是否是周期任務(wù) if (!canRunInCurrentRunState(periodic)) // 能否運行任務(wù) cancel(false); else if (!periodic) // 非周期任務(wù):調(diào)用FutureTask的run方法運行 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { // 周期任務(wù):調(diào)用FutureTask的runAndReset方法運行 setNextRunTime(); reExecutePeriodic(outerTask); } }
FutureTask的runAndReset方法與run方法的區(qū)別就是當(dāng)任務(wù)正常執(zhí)行完成后,不會設(shè)置任務(wù)的最終狀態(tài)(即保持NEW狀態(tài)),以便任務(wù)重復(fù)執(zhí)行:
protected boolean runAndReset() { // 僅NEW狀態(tài)的任務(wù)可以執(zhí)行 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return false; boolean ran = false; int s = state; try { Callable五、總結(jié)c = callable; if (c != null && s == NEW) { try { c.call(); // don"t set result ran = true; } catch (Throwable ex) { setException(ex); } } } finally { runner = null; s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } return ran && s == NEW; }
本章我們從源頭開始,講解了Future模式的來龍去脈,并以J.U.C中的Future模式為例,分析了Future模式的組件以及核心實現(xiàn)類——FutureTask,最后回顧了ScheduledFutureTask中定義的內(nèi)部異步任務(wù)類——ScheduledFutureTask。
理解Future模式的關(guān)鍵就是理解它的兩個核心組件,可以類比生活中的憑證來理解這一概念,不必拘泥于Java多線程設(shè)計模式中Future模式的類關(guān)系圖。
真實任務(wù)類
憑證
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/71825.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設(shè)計模式,設(shè)計了并發(fā)包,其中包下提供了一系列基礎(chǔ)的鎖工具,用以對等進(jìn)行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:注意線程與本地操作系統(tǒng)的線程是一一映射的。固定線程數(shù)的線程池提供了兩種創(chuàng)建具有固定線程數(shù)的的方法,固定線程池在初始化時確定其中的線程總數(shù),運行過程中會始終維持線程數(shù)量不變。 showImg(https://segmentfault.com/img/bVbhK58?w=1920&h=1080); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... ...
摘要:同時,它會通過的方法將自己注冊到線程池中。線程池中的每個工作線程都有一個自己的任務(wù)隊列,工作線程優(yōu)先處理自身隊列中的任務(wù)或順序,由線程池構(gòu)造時的參數(shù)決定,自身隊列為空時,以的順序隨機(jī)竊取其它隊列中的任務(wù)。 showImg(https://segmentfault.com/img/bVbizJb?w=1802&h=762); 本文首發(fā)于一世流云的專欄:https://segmentfau...
摘要:并不會為每個任務(wù)都創(chuàng)建工作線程,而是根據(jù)實際情況構(gòu)造線程池時的參數(shù)確定是喚醒已有空閑工作線程,還是新建工作線程。 showImg(https://segmentfault.com/img/bVbiYSP?w=1071&h=707); 本文首發(fā)于一世流云的專欄:https://segmentfault.com/blog... 一、引言 前一章——Fork/Join框架(1) 原理,我們...
摘要:無限期等待另一個線程執(zhí)行特定操作。線程安全基本版請說明以及的區(qū)別值都不能為空數(shù)組結(jié)構(gòu)上,通過數(shù)組和鏈表實現(xiàn)。優(yōu)先考慮響應(yīng)中斷,而不是響應(yīng)鎖的普通獲取或重入獲取。只是在最后獲取鎖成功后再把當(dāng)前線程置為狀態(tài)然后再中斷線程。 前段時間在慕課網(wǎng)直播上聽小馬哥面試勸退(面試虐我千百遍,Java 并發(fā)真討厭),發(fā)現(xiàn)講得東西比自己拿到offer還要高興,于是自己在線下做了一點小筆記,供各位參考。 課...
閱讀 2978·2021-11-23 09:51
閱讀 3609·2021-10-13 09:39
閱讀 2493·2021-09-22 15:06
閱讀 881·2019-08-30 15:55
閱讀 3147·2019-08-30 15:44
閱讀 1778·2019-08-30 14:05
閱讀 3434·2019-08-29 15:24
閱讀 2362·2019-08-29 12:44