摘要:為了讓大家理解線程池的整個設計方案,我會按照的設計思路來多說一些相關的東西。也是因為線程池的需要,所以才有了這個接口。
線程池是非常重要的工具,如果你要成為一個好的工程師,還是得比較好地掌握這個知識。即使你為了謀生,也要知道,這基本上是面試必問的題目,而且面試官很容易從被面試者的回答中捕捉到被面試者的技術水平。
本文略長,建議在 pc 上閱讀,邊看文章邊翻源碼(Java7 和 Java8 都一樣),建議想好好看的讀者抽出至少 15 至 30 分鐘的整塊時間來閱讀。當然,如果讀者僅為面試準備,可以直接滑到最后的總結部分。
總覽開篇來一些廢話。下圖是 java 線程池幾個相關類的繼承結構:
先簡單說說這個繼承結構,Executor 位于最頂層,也是最簡單的,就一個 execute(Runnable runnable) 接口方法定義。
ExecutorService 也是接口,在 Executor 接口的基礎上添加了很多的接口方法,所以一般來說我們會使用這個接口。
然后再下來一層是 AbstractExecutorService,從名字我們就知道,這是抽象類,這里實現了非常有用的一些方法供子類直接使用,之后我們再細說。
然后才到我們的重點部分 ThreadPoolExecutor 類,這個類提供了關于線程池所需的非常豐富的功能。
另外,我們還涉及到下圖中的這些類:
同在并發包中的 Executors 類,類名中帶字母 s,我們猜到這個是工具類,里面的方法都是靜態方法,如以下我們最常用的用于生成 ThreadPoolExecutor 的實例的一些方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
另外,由于線程池支持獲取線程執行的結果,所以,引入了 Future 接口,RunnableFuture 繼承自此接口,然后我們最需要關心的就是它的實現類 FutureTask。到這里,記住這個概念,在線程池的使用過程中,我們是往線程池提交任務(task),使用過線程池的都知道,我們提交的每個任務是實現了 Runnable 接口的,其實就是先將 Runnable 的任務包裝成 FutureTask,然后再提交到線程池。這樣,讀者才能比較容易記住 FutureTask 這個類名:它首先是一個任務(Task),然后具有 Future 接口的語義,即可以在將來(Future)得到執行的結果。
當然,線程池中的 BlockingQueue 也是非常重要的概念,如果線程數達到 corePoolSize,我們的每個任務會提交到等待隊列中,等待線程池中的線程來取任務并執行。這里的 BlockingQueue 通常我們使用其實現類 LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue,每個實現類都有不同的特征,使用場景之后會慢慢分析。想要詳細了解各個 BlockingQueue 的讀者,可以參考我的前面的一篇對 BlockingQueue 的各個實現類進行詳細分析的文章。
把事情說完整:除了上面說的這些類外,還有一個很重要的類,就是定時任務實現類 ScheduledThreadPoolExecutor,它繼承自本文要重點講解的 ThreadPoolExecutor,用于實現定時執行。不過本文不會介紹它的實現,我相信讀者看完本文后可以比較容易地看懂它的源碼。
以上就是本文要介紹的知識,廢話不多說,開始進入正文。
Executor 接口/*
@since 1.5
@author Doug Lea
*/
public interface Executor {
void execute(Runnable command);
}
我們可以看到 Executor 接口非常簡單,就一個 void execute(Runnable command) 方法,代表提交一個任務。為了讓大家理解 java 線程池的整個設計方案,我會按照 Doug Lea 的設計思路來多說一些相關的東西。
我們經常這樣啟動一個線程:
new Thread(new Runnable(){
// do something
}).start();
用了線程池 Executor 后就可以像下面這么使用:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
如果我們希望線程池同步執行每一個任務,我們可以這么實現這個接口:
class DirectExecutor implements Executor {
public void execute(Runnable r) { r.run();// 這里不是用的new Thread(r).start(),也就是說沒有啟動任何一個新的線程。 }
}
我們希望每個任務提交進來后,直接啟動一個新的線程來執行這個任務,我們可以這么實現:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) { new Thread(r).start(); // 每個任務都用一個新的線程來執行 }
}
我們再來看下怎么組合兩個 Executor 來使用,下面這個實現是將所有的任務都加到一個 queue 中,然后從 queue 中取任務,交給真正的執行器執行,這里采用 synchronized 進行并發控制:
class SerialExecutor implements Executor {
// 任務隊列 final Queuetasks = new ArrayDeque (); // 這個才是真正的執行器 final Executor executor; // 當前正在執行的任務 Runnable active; // 初始化的時候,指定執行器 SerialExecutor(Executor executor) { this.executor = executor; } // 添加任務到線程池: 將任務添加到任務隊列,scheduleNext 觸發執行器去任務隊列取任務 public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } } protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { // 具體的執行轉給真正的執行器 executor executor.execute(active); } }
}
當然了,Executor 這個接口只有提交任務的功能,太簡單了,我們想要更豐富的功能,比如我們想知道執行結果、我們想知道當前線程池有多少個線程活著、已經完成了多少任務等等,這些都是這個接口的不足的地方。接下來我們要介紹的是繼承自 Executor 接口的 ExecutorService 接口,這個接口提供了比較豐富的功能,也是我們最常使用到的接口。
一般我們定義一個線程池的時候,往往都是使用這個接口:
ExecutorService executor = Executors.newFixedThreadPool(args...);
ExecutorService executor = Executors.newCachedThreadPool(args...);
因為這個接口中定義的一系列方法大部分情況下已經可以滿足我們的需要了。
那么我們簡單初略地來看一下這個接口中都有哪些方法:
public interface ExecutorService extends Executor {
// 關閉線程池,已提交的任務繼續執行,不接受繼續提交新任務 void shutdown(); // 關閉線程池,嘗試停止正在執行的所有任務,不接受繼續提交新任務 // 它和前面的方法相比,加了一個單詞“now”,區別在于它會去停止當前正在進行的任務 ListshutdownNow(); // 線程池是否已關閉 boolean isShutdown(); // 如果調用了 shutdown() 或 shutdownNow() 方法后,所有任務結束了,那么返回true // 這個方法必須在調用shutdown或shutdownNow方法之后調用才會返回true boolean isTerminated(); // 等待所有任務完成,并設置超時時間 // 我們這么理解,實際應用中是,先調用 shutdown 或 shutdownNow, // 然后再調這個方法等待所有的線程真正地完成,返回值意味著有沒有超時 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // 提交一個 Callable 任務 Future submit(Callable task); // 提交一個 Runnable 任務,第二個參數將會放到 Future 中,作為返回值, // 因為 Runnable 的 run 方法本身并不返回任何東西 Future submit(Runnable task, T result); // 提交一個 Runnable 任務 Future> submit(Runnable task); // 執行所有任務,返回 Future 類型的一個 list List > invokeAll(Collection extends Callable > tasks) throws InterruptedException; // 也是執行所有任務,但是這里設置了超時時間 List > invokeAll(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException; // 只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果 T invokeAny(Collection extends Callable > tasks) throws InterruptedException, ExecutionException; // 同上一個方法,只有其中的一個任務結束了,就可以返回,返回執行完的那個任務的結果, // 不過這個帶超時,超過指定的時間,拋出 TimeoutException 異常 T invokeAny(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
這些方法都很好理解,一個簡單的線程池主要就是這些功能,能提交任務,能獲取結果,能關閉線程池,這也是為什么我們經常用這個接口的原因。
在繼續往下層介紹 ExecutorService 的實現類之前,我們先來說說相關的類 FutureTask。
Future -> RunnableFuture -> FutureTask
Runnable -> RunnableFuture
FutureTask 通過 RunnableFuture 間接實現了 Runnable 接口,
所以每個 Runnable 通常都先包裝成 FutureTask,
然后調用 executor.execute(Runnable command) 將其提交給線程池
我們知道,Runnable 的 void run() 方法是沒有返回值的,所以,通常,如果我們需要的話,會在 submit 中指定第二個參數作為返回值:
其實到時候會通過這兩個參數,將其包裝成 Callable。
Callable 也是因為線程池的需要,所以才有了這個接口。它和 Runnable 的區別在于 run() 沒有返回值,而 Callable 的 call() 方法有返回值,同時,如果運行出現異常,call() 方法會拋出異常。
public interface Callable
V call() throws Exception;
}
在這里,就不展開說 FutureTask 類了,因為本文篇幅本來就夠大了,這里我們需要知道怎么用就行了。
下面,我們來看看 ExecutorService 的抽象實現 AbstractExecutorService 。
AbstractExecutorServiceAbstractExecutorService 抽象類派生自 ExecutorService 接口,然后在其基礎上實現了幾個實用的方法,這些方法提供給子類進行調用。
這個抽象類實現了 invokeAny 方法和 invokeAll 方法,這里的兩個 newTaskFor 方法也比較有用,用于將任務包裝成 FutureTask。定義于最上層接口 Executor中的 void execute(Runnable command) 由于不需要獲取結果,不會進行 FutureTask 的包裝。
需要獲取結果(FutureTask),用 submit 方法,不需要獲取結果,可以用 execute 方法。
下面,我將一行一行源碼地來分析這個類,跟著源碼來看看其實現吧:
Tips: invokeAny 和 invokeAll 方法占了這整個類的絕大多數篇幅,讀者可以選擇適當跳過,因為它們可能在你的實踐中使用的頻次比較低,而且它們不帶有承前啟后的作用,不用擔心會漏掉什么導致看不懂后面的代碼。
public abstract class AbstractExecutorService implements ExecutorService {
// RunnableFuture 是用于獲取執行結果的,我們常用它的子類 FutureTask // 下面兩個 newTaskFor 方法用于將我們的任務包裝成 FutureTask 提交到線程池中執行 protectedRunnableFuture newTaskFor(Runnable runnable, T value) { return new FutureTask (runnable, value); } protected RunnableFuture newTaskFor(Callable callable) { return new FutureTask (callable); } // 提交任務 public Future> submit(Runnable task) { if (task == null) throw new NullPointerException(); // 1. 將任務包裝成 FutureTask RunnableFuture ftask = newTaskFor(task, null); // 2. 交給執行器執行,execute 方法由具體的子類來實現 // 前面也說了,FutureTask 間接實現了Runnable 接口。 execute(ftask); return ftask; } public Future submit(Runnable task, T result) { if (task == null) throw new NullPointerException(); // 1. 將任務包裝成 FutureTask RunnableFuture ftask = newTaskFor(task, result); // 2. 交給執行器執行 execute(ftask); return ftask; } public Future submit(Callable task) { if (task == null) throw new NullPointerException(); // 1. 將任務包裝成 FutureTask RunnableFuture ftask = newTaskFor(task); // 2. 交給執行器執行 execute(ftask); return ftask; } // 此方法目的:將 tasks 集合中的任務提交到線程池執行,任意一個線程執行完后就可以結束了 // 第二個參數 timed 代表是否設置超時機制,超時時間為第三個參數, // 如果 timed 為 true,同時超時了還沒有一個線程返回結果,那么拋出 TimeoutException 異常 private T doInvokeAny(Collection extends Callable > tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { if (tasks == null) throw new NullPointerException(); // 任務數 int ntasks = tasks.size(); if (ntasks == 0) throw new IllegalArgumentException(); // List > futures= new ArrayList >(ntasks); // ExecutorCompletionService 不是一個真正的執行器,參數 this 才是真正的執行器 // 它對執行器進行了包裝,每個任務結束后,將結果保存到內部的一個 completionQueue 隊列中 // 這也是為什么這個類的名字里面有個 Completion 的原因吧。 ExecutorCompletionService ecs = new ExecutorCompletionService (this); try { // 用于保存異常信息,此方法如果沒有得到任何有效的結果,那么我們可以拋出最后得到的一個異常 ExecutionException ee = null; long lastTime = timed ? System.nanoTime() : 0; Iterator extends Callable > it = tasks.iterator(); // 首先先提交一個任務,后面的任務到下面的 for 循環一個個提交 futures.add(ecs.submit(it.next())); // 提交了一個任務,所以任務數量減 1 --ntasks; // 正在執行的任務數(提交的時候 +1,任務結束的時候 -1) int active = 1; for (;;) { // ecs 上面說了,其內部有一個 completionQueue 用于保存執行完成的結果 // BlockingQueue 的 poll 方法不阻塞,返回 null 代表隊列為空 Future f = ecs.poll(); // 為 null,說明剛剛提交的第一個線程還沒有執行完成 // 在前面先提交一個任務,加上這里做一次檢查,也是為了提高性能 if (f == null) { if (ntasks > 0) { --ntasks; futures.add(ecs.submit(it.next())); ++active; } // 這里是 else if,不是 if。這里說明,沒有任務了,同時 active 為 0 說明 // 任務都執行完成了。其實我也沒理解為什么這里做一次 break? // 因為我認為 active 為 0 的情況,必然從下面的 f.get() 返回了 // 2018-02-23 感謝讀者 newmicro 的 comment, // 這里的 active == 0,說明所有的任務都執行失敗,那么這里是 for 循環出口 else if (active == 0) break; // 這里也是 else if。這里說的是,沒有任務了,但是設置了超時時間,這里檢測是否超時 else if (timed) { // 帶等待的 poll 方法 f = ecs.poll(nanos, TimeUnit.NANOSECONDS); // 如果已經超時,拋出 TimeoutException 異常,這整個方法就結束了 if (f == null) throw new TimeoutException(); long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } // 這里是 else。說明,沒有任務需要提交,但是池中的任務沒有完成,還沒有超時(如果設置了超時) // take() 方法會阻塞,直到有元素返回,說明有任務結束了 else f = ecs.take(); } /* * 我感覺上面這一段并不是很好理解,這里簡單說下。 * 1. 首先,這在一個 for 循環中,我們設想每一個任務都沒那么快結束, * 那么,每一次都會進到第一個分支,進行提交任務,直到將所有的任務都提交了 * 2. 任務都提交完成后,如果設置了超時,那么 for 循環其實進入了“一直檢測是否超時” 這件事情上 * 3. 如果沒有設置超時機制,那么不必要檢測超時,那就會阻塞在 ecs.take() 方法上, 等待獲取第一個執行結果 * 4. 如果所有的任務都執行失敗,也就是說 future 都返回了, 但是 f.get() 拋出異常,那么從 active == 0 分支出去(感謝 newmicro 提出) // 當然,這個需要看下面的 if 分支。 */ // 有任務結束了 if (f != null) { --active; try { // 返回執行結果,如果有異常,都包裝成 ExecutionException return f.get(); } catch (ExecutionException eex) { ee = eex; } catch (RuntimeException rex) { ee = new ExecutionException(rex); } } }// 注意看 for 循環的范圍,一直到這里 if (ee == null) ee = new ExecutionException(); throw ee; } finally { // 方法退出之前,取消其他的任務 for (Future f : futures) f.cancel(true); } } public T invokeAny(Collection extends Callable > tasks) throws InterruptedException, ExecutionException { try { return doInvokeAny(tasks, false, 0); } catch (TimeoutException cannotHappen) { assert false; return null; } } public T invokeAny(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { return doInvokeAny(tasks, true, unit.toNanos(timeout)); } // 執行所有的任務,返回任務結果。 // 先不要看這個方法,我們先想想,其實我們自己提交任務到線程池,也是想要線程池執行所有的任務 // 只不過,我們是每次 submit 一個任務,這里以一個集合作為參數提交 public List > invokeAll(Collection extends Callable > tasks) throws InterruptedException { if (tasks == null) throw new NullPointerException(); List > futures = new ArrayList >(tasks.size()); boolean done = false; try { // 這個很簡單 for (Callable t : tasks) { // 包裝成 FutureTask RunnableFuture f = newTaskFor(t); futures.add(f); // 提交任務 execute(f); } for (Future f : futures) { if (!f.isDone()) { try { // 這是一個阻塞方法,直到獲取到值,或拋出了異常 // 這里有個小細節,其實 get 方法簽名上是會拋出 InterruptedException 的 // 可是這里沒有進行處理,而是拋給外層去了。此異常發生于還沒執行完的任務被取消了 f.get(); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } } } done = true; // 這個方法返回,不像其他的場景,返回 List ,其實執行結果還沒出來 // 這個方法返回是真正的返回,任務都結束了 return futures; } finally { // 為什么要這個?就是上面說的有異常的情況 if (!done) for (Future f : futures) f.cancel(true); } } // 帶超時的 invokeAll,我們找不同吧 public List > invokeAll(Collection extends Callable > tasks, long timeout, TimeUnit unit) throws InterruptedException { if (tasks == null || unit == null) throw new NullPointerException(); long nanos = unit.toNanos(timeout); List > futures = new ArrayList >(tasks.size()); boolean done = false; try { for (Callable t : tasks) futures.add(newTaskFor(t)); long lastTime = System.nanoTime(); Iterator > it = futures.iterator(); // 提交一個任務,檢測一次是否超時 while (it.hasNext()) { execute((Runnable)(it.next())); long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; // 超時 if (nanos <= 0) return futures; } for (Future f : futures) { if (!f.isDone()) { if (nanos <= 0) return futures; try { // 調用帶超時的 get 方法,這里的參數 nanos 是剩余的時間, // 因為上面其實已經用掉了一些時間了 f.get(nanos, TimeUnit.NANOSECONDS); } catch (CancellationException ignore) { } catch (ExecutionException ignore) { } catch (TimeoutException toe) { return futures; } long now = System.nanoTime(); nanos -= now - lastTime; lastTime = now; } } done = true; return futures; } finally { if (!done) for (Future f : futures) f.cancel(true); } }
}
到這里,我們發現,這個抽象類包裝了一些基本的方法,可是像 submit、invokeAny、invokeAll 等方法,它們都沒有真正開啟線程來執行任務,它們都只是在方法內部調用了 execute 方法,所以最重要的 execute(Runnable runnable) 方法還沒出現,需要等具體執行器來實現這個最重要的部分,這里我們要說的就是 ThreadPoolExecutor 類了。
鑒于本文的篇幅,我覺得看到這里的讀者應該已經不多了,快餐文化使然啊!我寫的每篇文章都力求讓讀者可以通過我的一篇文章而記住所有的相關知識點,所以篇幅不免長了些。其實,工作了很多年的話,會有一個感覺,比如說線程池,即使看了 20 篇各種總結,也不如一篇長文實實在在講解清楚每一個知識點,有點少即是多,多即是少的意味了。
ThreadPoolExecutorThreadPoolExecutor 是 JDK 中的線程池實現,這個類實現了一個線程池需要的各個方法,它實現了任務提交、線程管理、監控等等方法。
我們可以基于它來進行業務上的擴展,以實現我們需要的其他功能,比如實現定時任務的類 ScheduledThreadPoolExecutor 就繼承自 ThreadPoolExecutor。當然,這不是本文關注的重點,下面,還是趕緊進行源碼分析吧。
首先,我們來看看線程池實現中的幾個概念和處理流程。
我們先回顧下提交任務的幾個方法:
public Future> submit(Runnable task) {
if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, null); execute(ftask); return ftask;
}
public
if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task, result); execute(ftask); return ftask;
}
public
if (task == null) throw new NullPointerException(); RunnableFutureftask = newTaskFor(task); execute(ftask); return ftask;
}
一個最基本的概念是,submit 方法中,參數是 Runnable 類型(也有Callable 類型),這個參數不是用于 new Thread(runnable).start() 中的,此處的這個參數不是用于啟動線程的,這里指的是任務,任務要做的事情是 run() 方法里面定義的或 Callable 中的 call() 方法里面定義的。
初學者往往會搞混這個,因為 Runnable 總是在各個地方出現,經常把一個 Runnable 包到另一個 Runnable 中。請把它想象成有個 Task 接口,這個接口里面有一個 run() 方法(我想作者只是不想因為這個再定義一個完全可以用 Runnable 來代替的接口,Callable 的出現,完全是因為 Runnable 不能滿足需要)。
我們回過神來繼續往下看,我畫了一個簡單的示意圖來描述線程池中的一些主要的構件:
當然,上圖沒有考慮隊列是否有界,提交任務時隊列滿了怎么辦?什么情況下會創建新的線程?提交任務時線程池滿了怎么辦?空閑線程怎么關掉?這些問題下面我們會一一解決。
我們經常會使用 Executors 這個工具類來快速構造一個線程池,對于初學者而言,這種工具類是很有用的,開發者不需要關注太多的細節,只要知道自己需要一個線程池,僅僅提供必需的參數就可以了,其他參數都采用作者提供的默認值。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
這里先不說有什么區別,它們最終都會導向這個構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 這幾個參數都是必須要有的 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
基本上,上面的構造方法中列出了我們最需要關心的幾個屬性了,下面逐個介紹下構造方法中出現的這幾個屬性:
corePoolSize
核心線程數,不要摳字眼,反正先記著有這么個屬性就可以了。
maximumPoolSize
?最大線程數,線程池允許創建的最大線程數。
workQueue
任務隊列,BlockingQueue 接口的某個實現(常使用 ArrayBlockingQueue 和 LinkedBlockingQueue)。
keepAliveTime
空閑線程的?;顣r間,如果某線程的空閑時間超過這個值都沒有任務給它做,那么可以被關閉了。注意這個值并不會對所有線程起作用,如果線程池中的線程數少于等于核心線程數 corePoolSize,那么這些線程不會因為空閑太長時間而被關閉,當然,也可以通過調用 allowCoreThreadTimeOut(true)使核心線程數內的線程也可以被回收。
threadFactory
用于生成線程,一般我們可以用默認的就可以了。通常,我們可以通過它將我們的線程的名字設置得比較可讀一些,如 Message-Thread-1, Message-Thread-2 類似這樣。
handler:
當線程池已經滿了,但是又有新的任務提交的時候,該采取什么策略由這個來指定。有幾種方式可供選擇,像拋出異常、直接拒絕然后返回等,也可以自己實現相應的接口實現自己的邏輯,這個之后再說。
除了上面幾個屬性外,我們再看看其他重要的屬性。
Doug Lea 采用一個 32 位的整數來存放線程池的狀態和當前池中的線程數,其中高 3 位用于存放線程池狀態,低 29 位表示線程數(即使只有 29 位,也已經不小了,大概 5 億多,現在還沒有哪個機器能起這么多線程的吧)。我們知道,java 語言在整數編碼上是統一的,都是采用補碼的形式,下面是簡單的移位操作和布爾操作,都是挺簡單的。
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 這里 COUNT_BITS 設置為 29(32-3),意味著前三位用于存放線程狀態,后29位用于存放線程數
// 很多初學者很喜歡在自己的代碼中寫很多 29 這種數字,或者某個特殊的字符串,然后分布在各個地方,這是非常糟糕的
private static final int COUNT_BITS = Integer.SIZE - 3;
// 000 11111111111111111111111111111
// 這里得到的是 29 個 1,也就是說線程池的最大線程數是 2^29-1=536870911
// 以我們現在計算機的實際情況,這個數量還是夠用的
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 我們說了,線程池的狀態存放在高 3 位中
// 運算結果為 111跟29個0:111 00000000000000000000000000000
private static final int RUNNING = -1 << COUNT_BITS;
// 000 00000000000000000000000000000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 001 00000000000000000000000000000
private static final int STOP = 1 << COUNT_BITS;
// 010 00000000000000000000000000000
private static final int TIDYING = 2 << COUNT_BITS;
// 011 00000000000000000000000000000
private static final int TERMINATED = 3 << COUNT_BITS;
// 將整數 c 的低 29 位修改為 0,就得到了線程池的狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 將整數 c 的高 3 為修改為 0,就得到了線程池中的線程數
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
/*
Bit field accessors that don"t require unpacking ctl.
These depend on the bit layout and on workerCount being never negative.
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
上面就是對一個整數的簡單的位操作,幾個操作方法將會在后面的源碼中一直出現,所以讀者最好把方法名字和其代表的功能記住,看源碼的時候也就不需要來來回回翻了。
在這里,介紹下線程池中的各個狀態和狀態變化的轉換過程:
RUNNING:這個沒什么好說的,這是最正常的狀態:接受新的任務,處理等待隊列中的任務
SHUTDOWN:不接受新的任務提交,但是會繼續處理等待隊列中的任務
STOP:不接受新的任務提交,不再處理等待隊列中的任務,中斷正在執行任務的線程
TIDYING:所有的任務都銷毀了,workCount 為 0。線程池的狀態在轉換為 TIDYING 狀態時,會執行鉤子方法 terminated()
TERMINATED:terminated() 方法結束后,線程池的狀態就會變成這個
RUNNING 定義為 -1,SHUTDOWN 定義為 0,其他的都比 0 大,所以等于 0 的時候不能提交任務,大于 0 的話,連正在執行的任務也需要中斷。
看了這幾種狀態的介紹,讀者大體也可以猜到十之八九的狀態轉換了,各個狀態的轉換過程有以下幾種:
RUNNING -> SHUTDOWN:當調用了 shutdown() 后,會發生這個狀態轉換,這也是最重要的
(RUNNING or SHUTDOWN) -> STOP:當調用 shutdownNow() 后,會發生這個狀態轉換,這下要清楚 shutDown() 和 shutDownNow() 的區別了
SHUTDOWN -> TIDYING:當任務隊列和線程池都清空后,會由 SHUTDOWN 轉換為 TIDYING
STOP -> TIDYING:當任務隊列清空后,發生這個轉換
TIDYING -> TERMINATED:這個前面說了,當 terminated() 方法結束后
上面的幾個記住核心的就可以了,尤其第一個和第二個。
另外,我們還要看看一個內部類 Worker,因為 Doug Lea 把線程池中的線程包裝成了一個個 Worker,翻譯成工人,就是線程池中做任務的線程。所以到這里,我們知道任務是 Runnable(內部叫 task 或 command),線程是 Worker。
Worker 這里又用到了抽象類 AbstractQueuedSynchronizer。題外話,AQS 在并發中真的是到處出現,而且非常容易使用,寫少量的代碼就能實現自己需要的同步方式(對 AQS 源碼感興趣的讀者請參看我之前寫的幾篇文章)。
private final class Worker
extends AbstractQueuedSynchronizer implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L; // 這個是真正的線程,任務靠你啦 final Thread thread; // 前面說了,這里的 Runnable 是任務。為什么叫 firstTask?因為在創建線程的時候,如果同時指定了 // 這個線程起來以后需要執行的第一個任務,那么第一個任務就是存放在這里的(線程可不止執行這一個任務) // 當然了,也可以為 null,這樣線程起來了,自己到任務隊列(BlockingQueue)中取任務(getTask 方法)就行了 Runnable firstTask; // 用于存放此線程完全的任務數,注意了,這里用了 volatile,保證可見性 volatile long completedTasks; // Worker 只有這一個構造方法,傳入 firstTask,也可以傳 null Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; // 調用 ThreadFactory 來創建一個新的線程 this.thread = getThreadFactory().newThread(this); } // 這里調用了外部類的 runWorker 方法 public void run() { runWorker(this); } ...// 其他幾個方法沒什么好看的,就是用 AQS 操作,來獲取這個線程的執行權,用了獨占鎖
}
前面雖然啰嗦,但是簡單。有了上面的這些基礎后,我們終于可以看看 ThreadPoolExecutor 的 execute 方法了,前面源碼分析的時候也說了,各種方法都最終依賴于 execute 方法:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException(); // 前面說的那個表示 “線程池狀態” 和 “線程數” 的整數 int c = ctl.get(); // 如果當前線程數少于核心線程數,那么直接添加一個 worker 來執行任務, // 創建一個新的線程,并把當前任務 command 作為這個線程的第一個任務(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任務成功,那么就結束了。提交任務嘛,線程池已經接受了這個任務,這個方法也就可以返回了 // 至于執行的結果,到時候會包裝到 FutureTask 中。 // 返回 false 代表線程池不允許提交任務 if (addWorker(command, true)) return; c = ctl.get(); } // 到這里說明,要么當前線程數大于等于核心線程數,要么剛剛 addWorker 失敗了 // 如果線程池處于 RUNNING 狀態,把這個任務添加到任務隊列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 這里面說的是,如果任務進入了 workQueue,我們是否需要開啟新的線程 * 因為線程數在 [0, corePoolSize) 是無條件開啟新的線程 * 如果線程數已經大于等于 corePoolSize,那么將任務添加到隊列中,然后進到這里 */ int recheck = ctl.get(); // 如果線程池已不處于 RUNNING 狀態,那么移除已經入隊的這個任務,并且執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果線程池還是 RUNNING 的,并且線程數為 0,那么開啟新的線程 // 到這里,我們知道了,這塊代碼的真正意圖是:擔心任務提交到隊列中了,但是線程都關閉了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果 workQueue 隊列滿了,那么進入到這個分支 // 以 maximumPoolSize 為界創建新的 worker, // 如果失敗,說明當前線程數已經達到 maximumPoolSize,執行拒絕策略 else if (!addWorker(command, false)) reject(command);
}
對創建線程的錯誤理解:如果線程數少于 corePoolSize,創建一個線程,如果線程數在 [corePoolSize, maximumPoolSize] 之間那么可以創建線程或復用空閑線程,keepAliveTime 對這個區間的線程有效。
從上面的幾個分支,我們就可以看出,上面的這段話是錯誤的。
上面這些一時半會也不可能全部消化搞定,我們先繼續往下吧,到時候再回頭看幾遍。
這個方法非常重要 addWorker(Runnable firstTask, boolean core) 方法,我們看看它是怎么創建新的線程的:
// 第一個參數是準備提交給這個線程執行的任務,之前說了,可以為 null
// 第二個參數為 true 代表使用核心線程數 corePoolSize 作為創建線程的界線,也就說創建這個線程的時候,
// 如果線程池中的線程總數已經達到 corePoolSize,那么不能響應這次創建線程的請求
// 如果是 false,代表使用最大線程數 maximumPoolSize 作為界線
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 這個非常不好理解 // 如果線程池已關閉,并滿足以下條件之一,那么不創建新的 worker: // 1. 線程池狀態大于 SHUTDOWN,其實也就是 STOP, TIDYING, 或 TERMINATED // 2. firstTask != null // 3. workQueue.isEmpty() // 簡單分析下: // 還是狀態控制的問題,當線程池處于 SHUTDOWN 的時候,不允許提交任務,但是已有的任務繼續執行 // 當狀態大于 SHUTDOWN 時,不允許提交任務,且中斷正在執行的任務 // 多說一句:如果線程池處于 SHUTDOWN,但是 firstTask 為 null,且 workQueue 非空,那么是允許創建 worker 的 if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 如果成功,那么就是所有創建線程前的條件校驗都滿足了,準備創建線程執行任務了 // 這里失敗的話,說明有其他線程也在嘗試往線程池中創建線程 if (compareAndIncrementWorkerCount(c)) break retry; // 由于有并發,重新再讀取一下 ctl c = ctl.get(); // 正常如果是 CAS 失敗的話,進到下一個里層的for循環就可以了 // 可是如果是因為其他線程的操作,導致線程池的狀態發生了變更,如有其他線程關閉了這個線程池 // 那么需要回到外層的for循環 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } /* * 到這里,我們認為在當前這個時刻,可以開始創建線程來執行任務了, * 因為該校驗的都校驗了,至于以后會發生什么,那是以后的事,至少當前是滿足條件的 */ // worker 是否已經啟動 boolean workerStarted = false; // 是否已將這個 worker 添加到 workers 這個 HashSet 中 boolean workerAdded = false; Worker w = null; try { final ReentrantLock mainLock = this.mainLock; // 把 firstTask 傳給 worker 的構造方法 w = new Worker(firstTask); // 取 worker 中的線程對象,之前說了,Worker的構造方法會調用 ThreadFactory 來創建一個新的線程 final Thread t = w.thread; if (t != null) { // 這個是整個類的全局鎖,持有這個鎖才能讓下面的操作“順理成章”, // 因為關閉一個線程池需要這個鎖,至少我持有鎖的期間,線程池不會被關閉 mainLock.lock(); try { int c = ctl.get(); int rs = runStateOf(c); // 小于 SHUTTDOWN 那就是 RUNNING,這個自不必說,是最正常的情況 // 如果等于 SHUTDOWN,前面說了,不接受新的任務,但是會繼續執行等待隊列中的任務 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // worker 里面的 thread 可不能是已經啟動的 if (t.isAlive()) throw new IllegalThreadStateException(); // 加到 workers 這個 HashSet 中 workers.add(w); int s = workers.size(); // largestPoolSize 用于記錄 workers 中的個數的最大值 // 因為 workers 是不斷增加減少的,通過這個值可以知道線程池的大小曾經達到的最大值 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } // 添加成功的話,啟動這個線程 if (workerAdded) { // 啟動線程 t.start(); workerStarted = true; } } } finally { // 如果線程沒有啟動,需要做一些清理工作,如前面 workCount 加了 1,將其減掉 if (! workerStarted) addWorkerFailed(w); } // 返回線程是否啟動成功 return workerStarted;
}
簡單看下 addWorkFailed 的處理:
// workers 中刪除掉相應的 worker
// workCount 減 1
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) workers.remove(w); decrementWorkerCount(); // rechecks for termination, in case the existence of this worker was holding up termination tryTerminate(); } finally { mainLock.unlock(); }
}
回過頭來,繼續往下走。我們知道,worker 中的線程 start 后,其 run 方法會調用 runWorker 方法:
// Worker 類的 run() 方法
public void run() {
runWorker(this);
}
繼續往下看 runWorker 方法:
// 此方法由 worker 線程啟動后調用,這里用一個 while 循環來不斷地從等待隊列中獲取任務并執行
// 前面說了,worker 在初始化的時候,可以指定 firstTask,那么第一個任務也就可以不需要從隊列中獲取
final void runWorker(Worker w) {
// Thread wt = Thread.currentThread(); // 該線程的第一個任務(如果有的話) Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 循環調用 getTask 獲取任務 while (task != null || (task = getTask()) != null) { w.lock(); // 如果線程池狀態大于等于 STOP,那么意味著該線程也要中斷 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { // 這是一個鉤子方法,留給需要的子類實現 beforeExecute(wt, task); Throwable thrown = null; try { // 到這里終于可以執行任務了 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { // 這里不允許拋出 Throwable,所以轉換為 Error thrown = x; throw new Error(x); } finally { // 也是一個鉤子方法,將 task 和異常作為參數,留給需要的子類實現 afterExecute(task, thrown); } } finally { // 置空 task,準備 getTask 獲取下一個任務 task = null; // 累加完成的任務數 w.completedTasks++; // 釋放掉 worker 的獨占鎖 w.unlock(); } } completedAbruptly = false; } finally { // 如果到這里,需要執行線程關閉: // 1. 說明 getTask 返回 null,也就是說,這個 worker 的使命結束了,執行關閉 // 2. 任務執行過程中發生了異常 // 第一種情況,已經在代碼處理了將 workCount 減 1,這個在 getTask 方法分析中會說 // 第二種情況,workCount 沒有進行處理,所以需要在 processWorkerExit 中處理 // 限于篇幅,我不準備分析這個方法了,感興趣的讀者請自行分析源碼 processWorkerExit(w, completedAbruptly); }
}
我們看看 getTask() 是怎么獲取任務的,這個方法寫得真的很好,每一行都很簡單,組合起來卻所有的情況都想好了:
// 此方法有三種可能:
// 1. 阻塞直到獲取到任務返回。我們知道,默認 corePoolSize 之內的線程是不會被回收的,
// 它們會一直等待任務
// 2. 超時退出。keepAliveTime 起作用的時候,也就是如果這么多時間內都沒有任務,那么應該執行關閉
// 3. 如果發生了以下條件,此方法必須返回 null:
// - 池中有大于 maximumPoolSize 個 workers 存在(通過調用 setMaximumPoolSize 進行設置)
// - 線程池處于 SHUTDOWN,而且 workQueue 是空的,前面說了,這種不再接受新的任務
// - 線程池處于 STOP,不僅不接受新的線程,連 workQueue 中的線程也不再執行
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 兩種可能 // 1. rs == SHUTDOWN && workQueue.isEmpty() // 2. rs >= STOP if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // CAS 操作,減少工作線程數 decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); // 允許核心線程數內的線程回收,或當前線程數超過了核心線程數,那么有可能發生超時關閉 timed = allowCoreThreadTimeOut || wc > corePoolSize; // 這里 break,是為了不往下執行后一個 if (compareAndDecrementWorkerCount(c)) // 兩個 if 一起看:如果當前線程數 wc > maximumPoolSize,或者超時,都返回 null // 那這里的問題來了,wc > maximumPoolSize 的情況,為什么要返回 null? // 換句話說,返回 null 意味著關閉線程。 // 那是因為有可能開發者調用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調小了 if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl // compareAndDecrementWorkerCount(c) 失敗,線程池中的線程數發生了改變 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } // wc <= maximumPoolSize 同時沒有超時 try { // 到 workQueue 中獲取任務 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { // 如果此 worker 發生了中斷,采取的方案是重試 // 解釋下為什么會發生中斷,這個讀者要去看 setMaximumPoolSize 方法, // 如果開發者將 maximumPoolSize 調小了,導致其小于當前的 workers 數量, // 那么意味著超出的部分線程要被關閉。重新進入 for 循環,自然會有部分線程會返回 null timedOut = false; } }
}
到這里,基本上也說完了整個流程,讀者這個時候應該回到 execute(Runnable command) 方法,看看各個分支,我把代碼貼過來一下:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException(); // 前面說的那個表示 “線程池狀態” 和 “線程數” 的整數 int c = ctl.get(); // 如果當前線程數少于核心線程數,那么直接添加一個 worker 來執行任務, // 創建一個新的線程,并把當前任務 command 作為這個線程的第一個任務(firstTask) if (workerCountOf(c) < corePoolSize) { // 添加任務成功,那么就結束了。提交任務嘛,線程池已經接受了這個任務,這個方法也就可以返回了 // 至于執行的結果,到時候會包裝到 FutureTask 中。 // 返回 false 代表線程池不允許提交任務 if (addWorker(command, true)) return; c = ctl.get(); } // 到這里說明,要么當前線程數大于等于核心線程數,要么剛剛 addWorker 失敗了 // 如果線程池處于 RUNNING 狀態,把這個任務添加到任務隊列 workQueue 中 if (isRunning(c) && workQueue.offer(command)) { /* 這里面說的是,如果任務進入了 workQueue,我們是否需要開啟新的線程 * 因為線程數在 [0, corePoolSize) 是無條件開啟新的線程 * 如果線程數已經大于等于 corePoolSize,那么將任務添加到隊列中,然后進到這里 */ int recheck = ctl.get(); // 如果線程池已不處于 RUNNING 狀態,那么移除已經入隊的這個任務,并且執行拒絕策略 if (! isRunning(recheck) && remove(command)) reject(command); // 如果線程池還是 RUNNING 的,并且線程數為 0,那么開啟新的線程 // 到這里,我們知道了,這塊代碼的真正意圖是:擔心任務提交到隊列中了,但是線程都關閉了 else if (workerCountOf(recheck) == 0) addWorker(null, false); } // 如果 workQueue 隊列滿了,那么進入到這個分支 // 以 maximumPoolSize 為界創建新的 worker, // 如果失敗,說明當前線程數已經達到 maximumPoolSize,執行拒絕策略 else if (!addWorker(command, false)) reject(command);
}
上面各個分支中,有兩種情況會調用 reject(command) 來處理任務,因為按照正常的流程,線程池此時不能接受這個任務,所以需要執行我們的拒絕策略。接下來,我們說一說 ThreadPoolExecutor 中的拒絕策略。
final void reject(Runnable command) {
// 執行拒絕策略 handler.rejectedExecution(command, this);
}
此處的 handler 我們需要在構造線程池的時候就傳入這個參數,它是 RejectedExecutionHandler 的實例。
RejectedExecutionHandler 在 ThreadPoolExecutor 中有四個已經定義好的實現類可供我們直接使用,當然,我們也可以實現自己的策略,不過一般也沒有必要。
// 只要線程池沒有被關閉,那么由提交任務的線程自己來執行這個任務。
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
}
// 不管怎樣,直接拋出 RejectedExecutionException 異常
// 這個是默認的策略,如果我們構造線程池的時候不傳相應的 handler 的話,那就會指定使用這個
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
}
// 不做任何處理,直接忽略掉這個任務
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { }
}
// 這個相對霸道一點,如果線程池沒有被關閉的話,
// 把隊列隊頭的任務(也就是等待了最長時間的)直接扔掉,然后提交這個任務到等待隊列中
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } }
}
到這里,ThreadPoolExecutor 的源碼算是分析結束了。單純從源碼的難易程度來說,ThreadPoolExecutor 的源碼還算是比較簡單的,只是需要我們靜下心來好好看看罷了。
這節其實也不是分析 Executors 這個類,因為它僅僅是工具類,它的所有方法都是 static 的。
生成一個固定大小的線程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
最大線程數設置為與核心線程數相等,此時 keepAliveTime 設置為 0(因為這里它是沒用的,即使不為 0,線程池默認也不會回收 corePoolSize 內的線程),任務隊列采用 LinkedBlockingQueue,無界隊列。
過程分析:剛開始,每提交一個任務都創建一個 worker,當 worker 的數量達到 nThreads 后,不再創建新的線程,而是把任務提交到 LinkedBlockingQueue 中,而且之后線程數始終為 nThreads。
生成只有一個線程的固定線程池,這個更簡單,和上面的一樣,只要設置線程數為 1 就可以了:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
生成一個需要的時候就創建新的線程,同時可以復用之前創建的線程(如果這個線程當前沒有任務)的線程池:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
核心線程數為 0,最大線程數為 Integer.MAX_VALUE,keepAliveTime 為 60 秒,任務隊列采用 SynchronousQueue。
這種線程池對于任務可以比較快速地完成的情況有比較好的性能。如果線程空閑了 60 秒都沒有任務,那么將關閉此線程并從線程池中移除。所以如果線程池空閑了很長時間也不會有問題,因為隨著所有的線程都會被關閉,整個線程池不會占用任何的系統資源。
過程分析:我把 execute 方法的主體黏貼過來,讓大家看得明白些。鑒于 corePoolSize 是 0,那么提交任務的時候,直接將任務提交到隊列中,由于采用了 SynchronousQueue,所以如果是第一個任務提交的時候,offer 方法肯定會返回 false,因為此時沒有任何 worker 對這個任務進行接收,那么將進入到最后一個分支來創建第一個 worker。之后再提交任務的話,取決于是否有空閑下來的線程對任務進行接收,如果有,會進入到第二個 if 語句塊中,否則就是和第一個任務一樣,進到最后的 else if 分支。
int c = ctl.get();
// corePoolSize 為 0,所以不會進到這個 if 分支
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) return; c = ctl.get();
}
// offer 如果有空閑線程剛好可以接收此任務,那么返回 true,否則返回 false
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
SynchronousQueue 是一個比較特殊的 BlockingQueue,其本身不儲存任何元素,它有一個虛擬隊列(或虛擬棧),不管讀操作還是寫操作,如果當前隊列中存儲的是與當前操作相同模式的線程,那么當前操作也進入隊列中等待;如果是相反模式,則配對成功,從當前隊列中取隊頭節點。具體的信息,可以看我的另一篇關于 BlockingQueue 的文章。
總結我一向不喜歡寫總結,因為我把所有需要表達的都寫在正文中了,寫小篇幅的總結并不能真正將話說清楚,本文的總結部分為準備面試的讀者而寫,希望能幫到面試者或者沒有足夠的時間看完全文的讀者。
java 線程池有哪些關鍵屬性?
corePoolSize,maximumPoolSize,workQueue,keepAliveTime,rejectedExecutionHandler
corePoolSize 到 maximumPoolSize 之間的線程會被回收,當然 corePoolSize 的線程也可以通過設置而得到回收(allowCoreThreadTimeOut(true))。
workQueue 用于存放任務,添加任務的時候,如果當前線程數超過了 corePoolSize,那么往該隊列中插入任務,線程池中的線程會負責到隊列中拉取任務。
keepAliveTime 用于設置空閑時間,如果線程數超出了 corePoolSize,并且有些線程的空閑時間超過了這個值,會執行關閉這些線程的操作
rejectedExecutionHandler 用于處理當線程池不能執行此任務時的情況,默認有拋出 RejectedExecutionException 異常、忽略任務、使用提交任務的線程來執行此任務和將隊列中等待最久的任務刪除,然后提交此任務這四種策略,默認為拋出異常。
說說線程池中的線程創建時機?
如果當前線程數少于 corePoolSize,那么提交任務的時候創建一個新的線程,并由這個線程執行這個任務;
如果當前線程數已經達到 corePoolSize,那么將提交的任務添加到隊列中,等待線程池中的線程去隊列中取任務;
如果隊列已滿,那么創建新的線程來執行任務,需要保證池中的線程數不會超過 maximumPoolSize,如果此時線程數超過了 maximumPoolSize,那么執行拒絕策略。
注意:如果將隊列設置為無界隊列,那么線程數達到 corePoolSize 后,其實線程數就不會再增長了。
Executors.newFixedThreadPool(…) 和 Executors.newCachedThreadPool() 構造出來的線程池有什么差別?
細說太長,往上滑一點點,在 Executors 的小節進行了詳盡的描述。
任務執行過程中發生異常怎么處理?
如果某個任務執行出現異常,那么執行任務的線程會被關閉,而不是繼續接收其他任務。然后會啟動一個新的線程來代替它。
什么時候會執行拒絕策略?
workers 的數量達到了 corePoolSize(任務此時需要進入任務隊列),任務入隊成功,與此同時線程池被關閉了,而且關閉線程池并沒有將這個任務出隊,那么執行拒絕策略。這里說的是非常邊界的問題,入隊和關閉線程池并發執行,讀者仔細看看 execute 方法是怎么進到第一個 reject(command) 里面的。
workers 的數量大于等于 corePoolSize,將任務加入到任務隊列,可是隊列滿了,任務入隊失敗,那么準備開啟新的線程,可是線程數已經達到 maximumPoolSize,那么執行拒絕策略。
因為本文實在太長了,所以我沒有說執行結果是怎么獲取的,也沒有說關閉線程池相關的部分,這個就留給讀者吧。
本文篇幅是有點長,如果讀者發現什么不對的地方,或者有需要補充的地方,請不吝提出,謝謝。
(全文完)
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72341.html
摘要:線程池為線程生命周期的開銷和資源不足問題提供了解決方案。狀態說明線程池處于狀態,不接收新任務,不處理已提交的任務,并且會中斷正在處理的任務。線程池中允許的最大線程數。線程池的飽和策略。 線程池為線程生命周期的開銷和資源不足問題提供了解決方 案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。 線程實現方式 Thread、Runnable、Callable //實現Runna...
摘要:架構相關概念監控虛擬機監控指標收集緩沖區監控指標。這是一個指標,與類似,可以對指標數據進行采樣。內存區域的最大字節數內存區域的初始化字節數內存池使用情況內存池最大數內存池初始化數線程區域監控。線程峰值總啟動線程數量,指標。 簡介 Prometheus 是一套開源的系統監控報警框架。它啟發于 Google 的 borgmon 監控系統,由工作在 SoundCloud 的 google 前...
引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經在偏離主題的道路上越來越遠。于是乎,我決定:繼續保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....
引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經在偏離主題的道路上越來越遠。于是乎,我決定:繼續保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....
引言 本文是源起netty專欄的第4篇文章,很明顯前3篇文章已經在偏離主題的道路上越來越遠。于是乎,我決定:繼續保持…… 使用 首先看看源碼類注釋中的示例(未改變官方示例邏輯,只是增加了print輸出和注釋) import java.time.LocalTime; import java.util.concurrent.Executors; import java.util.concurrent....
閱讀 3288·2021-09-08 09:45
閱讀 1251·2019-08-30 15:53
閱讀 1522·2019-08-30 14:12
閱讀 981·2019-08-29 17:01
閱讀 2568·2019-08-29 15:35
閱讀 394·2019-08-29 13:09
閱讀 1965·2019-08-29 12:32
閱讀 3083·2019-08-26 18:37