摘要:那么線程池到底是怎么利用類來實現持續不斷地接收提交的任務并執行的呢接下來,我們通過的源代碼來一步一步抽絲剝繭,揭開線程池運行模型的神秘面紗。
在上一篇文章《從0到1玩轉線程池》中,我們了解了線程池的使用方法,以及向線程池中提交任務的完整流程和ThreadPoolExecutor.execute方法的源代碼。在這篇文章中,我們將會從頭閱讀線程池ThreadPoolExecutor類的源代碼,深入剖析線程池從提交任務到執行任務的完整流程,從而建立起完整的線程池運行模型。
查看JDK源碼的方式在IDE中,例如IDEA里,我們可以點擊我們樣例代碼里的ThreadPoolExecutor類跳轉到JDK中ThreadPoolExecutor類的源代碼。在源代碼中我們可以看到很多java.util.concurrent包的締造者大牛“Doug Lea”所留下的各種注釋,下面的圖片就是該類源代碼的一個截圖。
這些注釋的內容非常有參考價值,建議有能力的讀者朋友可以自己閱讀一遍。下面,我們就開始閱讀ThreadPoolExecutor的源代碼吧。
控制變量與線程池生命周期在ThreadPoolExecutor類定義的開頭,我們可以看到如下的幾行代碼:
// 控制變量,前3位表示狀態,剩下的數據位表示有效的線程數 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // Integer的位數減去3位狀態位就是線程數的位數 private static final int COUNT_BITS = Integer.SIZE - 3; // CAPACITY就是線程數的上限(含),即2^COUNT_BITS - 1個 private static final int CAPACITY = (1 << COUNT_BITS) - 1;
第一行是一個用來作為控制變量的整型值,即一個Integer。之所以要用AtomicInteger類是因為要保證多線程安全,在本系列之后的文章中會對AtomicInteger進行具體介紹。一個整型一般是32位,但是這里的代碼為了保險起見,還是使用了Integer.SIZE來表示整型的總位數。這里的“位”指的是數據位(bit),在計算機中,8bit = 1字節,1024字節 = 1KB,1024KB = 1MB。每一位都是一個0或1的數字,我們如果把整型想象成一個二進制(0或1)的數組,那么一個Integer就是32個數字的數組。其中,前三個被用來表示狀態,那么我們就可以表示2^3 = 8個不同的狀態了。剩下的29位二進制數字都會被用于表示當前線程池中有效線程的數量,上限就是(2^29 - 1)個,即常量CAPACITY。
之后的部分列出了線程池的所有狀態:
private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;
在這里可以忽略數字后面的<< COUNT_BITS,可以把狀態簡單地理解為前面的數字部分,這樣的簡化基本不影響結論。
各個狀態的解釋如下:
RUNNING,正常運行狀態,可以接受新的任務和處理隊列中的任務
SHUTDOWN,關閉中狀態,不能接受新任務,但是可以處理隊列中的任務
STOP,停止中狀態,不能接受新任務,也不處理隊列中的任務,會中斷進行中的任務
TIDYING,待結束狀態,所有任務已經結束,線程數歸0,進入TIDYING狀態后將會運行terminated()方法
TERMINATED,結束狀態,terminated()方法調用完成后進入
這幾個狀態所對應的數字值是按照順序排列的,也就是說線程池的狀態只能從小到大變化,這也方便了通過數字比較來判斷狀態所在的階段,這種通過數字大小來比較狀態值的方法在ThreadPoolExecutor的源碼中會有大量的使用。
下圖是這五個狀態之間的變化過程:
當線程池被創建時會處于RUNNING狀態,正常接受和處理任務;
當shutdown()方法被直接調用,或者在線程池對象被GC回收時通過finalize()方法隱式調用了shutdown()方法時,線程池會進入SHUTDOWN狀態。該狀態下線程池仍然會繼續執行完阻塞隊列中的任務,只是不再接受新的任務了。當隊列中的任務被執行完后,線程池中的線程也會被回收。當隊列和線程都被清空后,線程池將進入TIDYING狀態;
在線程池處于RUNNING或者SHUTDOWN狀態時,如果有代碼調用了shutdownNow()方法,則線程池會進入STOP狀態。在STOP狀態下,線程池會直接清空阻塞隊列中待執行的任務,然后中斷所有正在進行中的任務并回收線程。當線程都被清空以后,線程池就會進入TIDYING狀態;
當線程池進入TIDYING狀態時,將會運行terminated()方法,該方法執行完后,線程池就會進入最終的TERMINATED狀態,徹底結束。
到這里我們就已經清楚地了解了線程從剛被創建時的RUNNING狀態一直到最終的TERMINATED狀態的整個生命周期了。那么當我們要向一個RUNNING狀態的線程池提交任務時會發生些什么呢?
execute方法的實現我們一般會使用execute方法提交我們的任務,那么線程池在這個過程中做了什么呢?在ThreadPoolExecutor類的execute()方法的源代碼中,我們主要做了四件事:
如果當前線程池中的線程數小于核心線程數corePoolSize,則通過threadFactory創建一個新的線程,并把入參中的任務作為第一個任務傳入該線程;
如果當前線程池中的線程數已經達到了核心線程數corePoolSize,那么就會通過阻塞隊列workerQueue的offer方法來將任務添加到隊列中保存,并等待線程空閑后進行執行;
如果線程數已經達到了corePoolSize且阻塞隊列中無法插入該任務(比如已滿),那么線程池就會再增加一個線程來執行該任務,除非線程數已經達到了最大線程數maximumPoolSize;
如果確實已經達到了最大線程數,那么就會通過拒絕策略對象handler拒絕這個任務。
總體上的執行流程如下,下方的黑色同心圓代表流程結束:
這里解釋一下阻塞隊列的定義,方便大家閱讀:
線程池中的阻塞隊列專門用于存放需要等待線程空閑的待執行任務,而阻塞隊列是這樣的一種數據結構,它是一個隊列(類似于一個List),可以存放0到N個元素。我們可以對這個隊列進行插入和彈出元素的操作,彈出操作可以理解為是一個獲取并從隊列中刪除一個元素的操作。當隊列中沒有元素時,對這個隊列的獲取操作將會被阻塞,直到有元素被插入時才會被喚醒;當隊列已滿時,對這個隊列的插入操作將會被阻塞,直到有元素被彈出后才會被喚醒。這樣的一種數據結構非常適合于線程池的場景,當一個工作線程沒有任務可處理時就會進入阻塞狀態,直到有新任務提交后才被喚醒。
線程池中常用的阻塞隊列一般有三種類型:直連隊列、無界隊列、有界隊列。不同的阻塞隊列類型會被線程池的行為產生不同的影響,有興趣的讀者可以在上一篇文章《從0到1玩轉線程池》中找到不同類型阻塞隊列的具體解釋。
下面是帶有注釋的源代碼,大家可以和上面的流程對照起來參考一下:
public void execute(Runnable command) { // 檢查提交的任務是否為空 if (command == null) throw new NullPointerException(); // 獲取控制變量值 int c = ctl.get(); // 檢查當前線程數是否達到了核心線程數 if (workerCountOf(c) < corePoolSize) { // 未達到核心線程數,則創建新線程 // 并將傳入的任務作為該線程的第一個任務 if (addWorker(command, true)) // 添加線程成功則直接返回,否則繼續執行 return; // 因為前面調用了耗時操作addWorker方法 // 所以線程池狀態有可能發生了改變,重新獲取狀態值 c = ctl.get(); } // 判斷線程池當前狀態是否是運行中 // 如果是則調用workQueue.offer方法將任務放入阻塞隊列 if (isRunning(c) && workQueue.offer(command)) { // 因為執行了耗時操作“放入阻塞隊列”,所以重新獲取狀態值 int recheck = ctl.get(); // 如果當前狀態不是運行中,則將剛才放入阻塞隊列的任務拿出,如果拿出成功,則直接拒絕這個任務 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) // 如果線程池中沒有線程了,那就創建一個 addWorker(null, false); } // 如果放入阻塞隊列失敗(如隊列已滿),則添加一個線程 else if (!addWorker(command, false)) // 如果添加線程失敗(如已經達到了最大線程數),則拒絕任務 reject(command); }
從上面的源碼中我們可以知道,當一個任務被通過ThreadPoolExecutor的execute方法提交到線程池中執行時,這個任務有可能以兩種方式被執行:
直接在創建一個新的Worker時被作為第一個任務傳入,由這個新創建的線程來執行;
把任務放入一個阻塞隊列,等待線程池中的工作線程Worker撈取任務進行執行。
這里的這個Worker指的就是ThreadPoolExecutor.Worker類,這是一個ThreadPoolExecutor的內部類,用于對基礎線程類Thread進行包裝和對線程進行管理。那么線程池到底是怎么利用Worker類來實現持續不斷地接收提交的任務并執行的呢?接下來,我們通過ThreadPoolExecutor的源代碼來一步一步抽絲剝繭,揭開線程池運行模型的神秘面紗。
addWorker方法在上文中的execute方法的代碼中我們可以看到線程池是通過addWorker方法來向線程池中添加新線程的,那么新的線程又是如何運行起來的呢?
這里我們暫時跳過addWorker方法的詳細源代碼,因為雖然這個方法的代碼行數較多,但是功能相對比較直接,只是通過new Worker(firstTask)創建了一個代表線程的Worker對象,然后調用了這個對象所包含的Thread對象的start()方法。
我們知道一旦調用了Thread類的start()方法,則這個線程就會開始執行創建線程時傳入的Runnable對象。從下面的Worker類構造器源代碼可以看出,Worker類正是把自己(this引用)傳入了線程的構造器當中,所以這個線程啟動后就會執行Worker類的run()方法了,而在Worker的run()方法中只執行了一行很簡單的代碼runWorker(this)。
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); }runWorker方法的實現
我們看到線程池中的線程在啟動時會調用對應的Worker類的runWorker方法,而這里就是整個線程池任務執行的核心所在了。runWorker方法中包含有一個類似無限循環的while語句,讓worker對象可以一直持續不斷地執行提交到線程池中的新任務或者等待下一個新任務的提交。
大家可以配合代碼上帶有的注釋來理解該方法的具體實現:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; // 將worker的狀態重置為正常狀態,因為state狀態值在構造器中被初始化為-1 w.unlock(); // 通過completedAbruptly變量的值判斷任務是否正常執行完成 boolean completedAbruptly = true; try { // 如果task為null就通過getTask方法獲取阻塞隊列中的下一個任務 // getTask方法一般不會返回null,所以這個while類似于一個無限循環 // worker對象就通過這個方法的持續運行來不斷處理新的任務 while (task != null || (task = getTask()) != null) { // 每一次任務的執行都必須獲取鎖來保證下方臨界區代碼的線程安全 w.lock(); // 如果狀態值大于等于STOP(狀態值是有序的,即STOP、TIDYING、TERMINATED) // 且當前線程還沒有被中斷,則主動中斷線程 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); // 開始 try { // 執行任務前處理操作,默認是一個空實現 // 在子類中可以通過重寫來改變任務執行前的處理行為 beforeExecute(wt, task); // 通過thrown變量保存任務執行過程中拋出的異常 // 提供給下面finally塊中的afterExecute方法使用 Throwable thrown = null; try { // *** 重要:實際執行任務的代碼 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { // 因為Runnable接口的run方法中不能拋出Throwable對象 // 所以要包裝成Error對象拋出 thrown = x; throw new Error(x); } finally { // 執行任務后處理操作,默認是一個空實現 // 在子類中可以通過重寫來改變任務執行后的處理行為 afterExecute(task, thrown); } } finally { // 將循環變量task設置為null,表示已處理完成 task = null; // 累加當前worker已經完成的任務數 w.completedTasks++; // 釋放while體中第一行獲取的鎖 w.unlock(); } } // 將completedAbruptly變量設置為false,表示任務正常處理完成 completedAbruptly = false; } finally { // 銷毀當前的worker對象,并完成一些諸如完成任務數量統計之類的輔助性工作 // 在線程池當前狀態小于STOP的情況下會創建一個新的worker來替換被銷毀的worker processWorkerExit(w, completedAbruptly); } }
在runWorker方法的源代碼中有兩個比較重要的方法調用,一個是while條件中對getTask方法的調用,一個是在方法的最后對processWorkerExit方法的調用。下面是對這兩個方法更詳細的解釋。
getTask方法在阻塞隊列中有待執行的任務時會從隊列中彈出一個任務并返回,如果阻塞隊列為空,那么就會阻塞等待新的任務提交到隊列中直到超時(在一些配置下會一直等待而不超時),如果在超時之前獲取到了新的任務,那么就會將這個任務作為返回值返回。所以一般getTask方法是不會返回null的,只會阻塞等待下一個任務并在之后將這個新任務作為返回值返回。
當getTask方法返回null時會導致當前Worker退出,當前線程被銷毀。在以下情況下getTask方法才會返回null:
當前線程池中的線程數超過了最大線程數。這是因為運行時通過調用setMaximumPoolSize修改了最大線程數而導致的結果;
線程池處于STOP狀態。這種情況下所有線程都應該被立即回收銷毀;
線程池處于SHUTDOWN狀態,且阻塞隊列為空。這種情況下已經不會有新的任務被提交到阻塞隊列中了,所以線程應該被銷毀;
線程可以被超時回收的情況下等待新任務超時。線程被超時回收一般有以下兩種情況:
超出核心線程數部分的線程等待任務超時
允許核心線程超時(線程池配置)的情況下線程等待任務超時
processWorkerExit方法會銷毀當前線程對應的Worker對象,并執行一些累加總處理任務數等輔助操作,但在線程池當前狀態小于STOP的情況下會創建一個新的Worker來替換被銷毀的Worker。
對getTask和processWorkerExit方法源代碼感興趣的讀者可以閱讀下一節來具體了解一下,不過跳過這一節也是完全可以的。
getTask與processWorkerExit方法源代碼以下是getTask與processWorkerExit兩個方法的帶有中文解釋的源代碼:
private Runnable getTask() { // 通過timeOut變量表示線程是否空閑時間超時了 boolean timedOut = false; // 無限循環 for (;;) { // 獲取線程池狀態 int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 如果 線程池狀態>=STOP // 或者 (線程池狀態==SHUTDOWN && 阻塞隊列為空) // 則直接減少一個worker計數并返回null(返回null會導致當前worker被銷毀) if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } // 獲取線程池中的worker計數 int wc = workerCountOf(c); // 判斷當前線程是否會被超時銷毀 // 會被超時銷毀的情況:線程池允許核心線程超時 或 當前線程數大于核心線程數 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 如果 (當前線程數大于最大線程數 或 (允許超時銷毀 且 當前發生了空閑時間超時)) // 且 (當前線程數大于1 或 阻塞隊列為空) —— 該條件在阻塞隊列不為空的情況下保證至少會保留一個線程繼續處理任務 // 則 減少worker計數并返回null(返回null會導致當前worker被銷毀) if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // 從阻塞隊列中取出一個任務(如果隊列為空會進入阻塞等待狀態) // 如果允許空閑超時銷毀線程的話則帶有一個等待的超時時間 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 如果獲取到了任務就直接返回該任務,返回后會開始執行該任務 if (r != null) return r; // 如果任務為null,則說明發生了等待超時,將空閑時間超時標志設置為true timedOut = true; } catch (InterruptedException retry) { // 如果等待被中斷了,那說明空閑時間(等待任務的時間)還沒有超時 timedOut = false; } } }
processWorkerExit方法的源代碼:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // 如果completedAbruptly為true則表示任務執行過程中拋出了未處理的異常 // 所以還沒有正確地減少worker計數,這里需要減少一次worker計數 if (completedAbruptly) decrementWorkerCount(); // 獲取線程池的主鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 把將被銷毀的線程已完成的任務數累計到線程池的完成任務總數上 completedTaskCount += w.completedTasks; // 從worker集合中去掉將會銷毀的worker workers.remove(w); } finally { // 釋放線程池主鎖 mainLock.unlock(); } // 嘗試結束線程池 // 這里是為了在關閉線程池時等到所有worker都被回收后再結束線程池 tryTerminate(); int c = ctl.get(); // 如果線程池狀態 < STOP,即RUNNING或SHUTDOWN // 則需要考慮創建新線程來代替被銷毀的線程 if (runStateLessThan(c, STOP)) { // 如果worker是正常執行完的,則要判斷一下是否已經滿足了最小線程數要求 // 否則直接創建替代線程 if (!completedAbruptly) { // 如果允許核心線程超時則最小線程數是0,否則最小線程數等于核心線程數 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 如果阻塞隊列非空,則至少要有一個線程繼續執行剩下的任務 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 如果當前線程數已經滿足最小線程數要求 // 那么就不創建替代線程了 if (workerCountOf(c) >= min) return; } // 重新創建一個worker來代替被銷毀的線程 addWorker(null, false); } }總結
到這里我們的線程池源代碼之旅就結束了,在這篇文章中我們首先了解了線程池中的控制變量與狀態變換流程,之后我們通過線程池的源代碼深入解析了從提交任務到執行任務的全過程,相信通過這些知識我們已經可以在腦海中建立起一套完整的線程池運行模型了。如果大家有一些細節感覺還不是特別清晰的話,建議不妨再返回到文章的開頭多讀幾遍,相信第二遍的閱讀能給大家帶來不一樣的體驗,因為我自己也是在第三次讀ThreadPoolExecutor類的源代碼時才真正打通了其中的一些重要關節的。
引子在瀏覽ThreadPoolExexutor源碼的過程中,有幾個點我們其實并沒有完全說清楚,比如對鎖的加鎖操作、對控制變量的多次獲取、控制變量的AtomicInteger類型。在下一篇文章中,我將會介紹這些以鎖、volatile變量、CAS操作、AQS抽象類為代表的一系列線程同步方法,歡迎感興趣的讀者繼續關注我后續發布的文章~
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73889.html
摘要:前言本文內容基本摘抄自深入理解虛擬機,以供復習之用,沒有多少參考價值。此區域是唯一一個在虛擬機規范中沒有規定任何情況的區域。堆是所有線程共享的內存區域,在虛擬機啟動時創建。虛擬機上把方法區稱為永久代。 前言 本文內容基本摘抄自《深入理解Java虛擬機》,以供復習之用,沒有多少參考價值。想要更詳細了解請參考原書。 第二章 1.運行時數據區域 showImg(https://segment...
摘要:面向對象的分布式爬蟲框架一簡介概述是一個面向對象的分布式爬蟲框架。分布式集群集群方式維護爬蟲爬蟲運行數據,可通過或定制實現。 《面向對象的分布式爬蟲框架XXL-CRAWLER》 showImg(https://segmentfault.com/img/remote/1460000011842697);showImg(https://segmentfault.com/img/remote...
摘要:為程序員金三銀四精心挑選的余道面試題與答案,歡迎大家向我推薦你在面試過程中遇到的問題我會把大家推薦的問題添加到下面的常用面試題清單中供大家參考。 為Java程序員金三銀四精心挑選的300余道Java面試題與答案,歡迎大家向我推薦你在面試過程中遇到的問題,我會把大家推薦的問題添加到下面的常用面試題清單中供大家參考。 前兩天寫的以下博客,大家比較認可,熱度不錯,希望可以幫到準備或者正在參加...
閱讀 5030·2021-09-07 09:58
閱讀 781·2019-08-30 15:55
閱讀 2909·2019-08-30 15:55
閱讀 915·2019-08-30 15:53
閱讀 1549·2019-08-29 12:57
閱讀 1796·2019-08-26 13:46
閱讀 559·2019-08-26 11:00
閱讀 3658·2019-08-23 15:42