摘要:所以,在時執行也是為了保證線程池在狀態下必須要有一個線程來執行任務。
這篇文章對ThreadPoolExecutor創建的線程池如何操作線程的生命周期通過源碼的方式進行詳細解析。通過對execute方法、addWorker方法、Worker類、runWorker方法、getTask方法、processWorkerExit從源碼角度詳細闡述,文末有彩蛋。exexcte方法
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); /** * workerCountOf方法取出低29位的值,表示當前活動的線程數; * 如果當前活動的線程數小于corePoolSize,則新建一個線程放入線程池中,并把該任務放到線程中 */ if (workerCountOf(c) < corePoolSize) { /** * addWorker中的第二個參數表示限制添加線程的數量 是根據據corePoolSize 來判斷還是maximumPoolSize來判斷; * 如果是ture,根據corePoolSize判斷 * 如果是false,根據maximumPoolSize判斷 */ if (addWorker(command, true)) return; /** * 如果添加失敗,則重新獲取ctl值 */ c = ctl.get(); } /** * 如果線程池是Running狀態,并且任務添加到隊列中 */ if (isRunning(c) && workQueue.offer(command)) { //double-check,重新獲取ctl的值 int recheck = ctl.get(); /** * 再次判斷線程池的狀態,如果不是運行狀態,由于之前已經把command添加到阻塞隊列中,這時候需要從隊列中移除command; * 通過handler使用拒絕策略對該任務進行處理,整個方法返回 */ if (!isRunning(recheck) && remove(command)) reject(command); /** * 獲取線程池中的有效線程數,如果數量是0,則執行addWorker方法; * 第一個參數為null,表示在線程池中創建一個線程,但不去啟動 * 第二個參數為false,將線程池的線程數量的上限設置為maximumPoolSize,添加線程時根據maximumPoolSize來判斷 */ else if (workerCountOf(recheck) == 0) addWorker(null, false); /** * 執行到這里,有兩種情況: * 1、線程池的狀態不是RUNNING; * 2、線程池狀態是RUNNING,但是workerCount >= corePoolSize, workerQueue已滿 * 這個時候,再次調用addWorker方法,第二個參數傳false,將線程池的有限線程數量的上限設置為maximumPoolSize; * 如果失敗則執行拒絕策略; */ } else if (!addWorker(command, false)) reject(command); }
簡單來說,在執行execute()方法時如果狀態一直是RUNNING時,的執行過程如下:
如果workerCount < corePoolSize,則創建并啟動一個線程來執行新提交的任
務;
如果workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添
加到該阻塞隊列中;
如 果 workerCount >= corePoolSize && workerCount <
maximumPoolSize,且線程池內的阻塞隊列已滿,則創建并啟動一個線程來執行新
提交的任務;
如果workerCount >= maximumPoolSize,并且線程池內的阻塞隊列已滿, 則根
據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。
這里要注意一下addWorker(null, false);,也就是創建一個線程,但并沒有傳入任務,因為
任務已經被添加到workQueue中了,所以worker在執行的時候,會直接從workQueue中
獲取任務。所以,在workerCountOf(recheck) == 0時執行addWorker(null, false);也是
為了保證線程池在RUNNING狀態下必須要有一個線程來執行任務。
addWorker方法的主要作用是在線程池中創建一個新的線程并執行,firstTask參數用于指定新增的線程執行的第一個任務,core參數為true表示在新增線程時會判斷當前活動線程數是否少于corePoolSize ,false表示新增線程前需要判斷當前活動的線程數是否少于maximumPoolSize
private boolean addWorker(Runnable firstTask, boolean core) { retry: /** * 由于線程執行過程中,各種情況都有可能處于,通過自旋的方式來保證worker的增加; */ for (; ; ) { int c = ctl.get(); //獲取線程池運行狀態 int rs = runStateOf(c); /** * * 如果rs >= SHUTDOWN, 則表示此時不再接收新任務; * 接下來是三個條件 通過 && 連接,只要有一個任務不滿足,就返回false; * 1.rs == SHUTDOWN,表示關閉狀態,不再接收提交的任務,但卻可以繼續處理阻塞隊列中已經保存的任務; * 2.fisrtTask為空 * 3.Check if queue empty only if necessary. */ if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; for (; ; ) { //獲取線程池的線程數 int wc = workerCountOf(c); /** * 如果線程數 >= CAPACITY, 也就是ctl的低29位的最大值,則返回false; * 這里的core用來判斷 限制線程數量的上限是corePoolSize還是maximumPoolSize; * 如果core是ture表示根據corePoolSize來比較; * 如果core是false表示根據maximumPoolSize來比較; */ if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; /** * 通過CAS原子的方式來增加線程數量; * 如果成功,則跳出第一個for循環; */ if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl //如果當前運行的狀態不等于rs,說明線程池的狀態已經改變了,則返回第一個for循環繼續執行 if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //根據firstTask來創建Worker對象 w = new Worker(firstTask); //每一個Worker對象都會創建一個線程 final Thread t = w.thread; if (t != null) { //創建可重入鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 獲取線程池的狀態 int rs = runStateOf(ctl.get()); /** * 線程池的狀態小于SHUTDOWN,表示線程池處于RUNNING狀態; * 如果rs是RUNNING狀態或rs是SHUTDOWN狀態并且firstTask為null,向線程池中添加線程; * 因為在SHUTDOWN狀態時不會再添加新的任務,但還是處理workQueue中的任務; */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //workers是一個hashSet workers.add(w); int s = workers.size(); //largestPoolSize記錄線程池中出現的最大的線程數量 if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { //啟動線程,Worker實現了Running方法,此時會調用Worker的run方法 t.start(); workerStarted = true; } } } finally { if (!workerStarted) addWorkerFailed(w); } return workerStarted; }Worker類
線程池中的每一個對象被封裝成一個Worker對象,ThreadPool維護的就是一組Worker對象。
Worker類繼承了AQS,并實現了Runnable接口,其中包含了兩個重要屬性:firstTask用來保存傳入的任務,thread是在調用構造方法是通過ThreadFactory來創建的線程,是用來處理任務的線程。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { final Thread thread; Runnable firstTask; volatile long completedTasks; Worker(Runnable firstTask) { /** * 把state設置為-1,,阻止中斷直到調用runWorker方法; * 因為AQS默認state是0,如果剛創建一個Worker對象,還沒有執行任務時,這時候不應該被中斷 */ setState(-1); this.firstTask = firstTask; /** * 創建一個線程,newThread方法傳入的參數是this,因為Worker本身繼承了Runnable接口,也就是一個線程; * 所以一個Worker對象在啟動的時候會調用Worker類中run方法 */ this.thread = getThreadFactory().newThread(this); } }
Worker類繼承了AQS,使用AQS來實現獨占鎖的功能。為什么不使用ReentrantLock來實現?
可以看到tryAcquire方法,他是不允許重入的,而ReentrantLock是允許可重入的:
lock方法一旦獲取獨占鎖,表示當前線程正在執行任務中;
如果正在執行任務,則不應該中斷線程;
如果該線程現在不是獨占鎖的狀態,也就是空閑狀態,說明它沒有處理任務,這時可以對該線程進行中斷;
線程池中執行shutdown方法或tryTerminate方法時會調用interruptIdleWorkers方法來中斷空閑線程,interruptIdleWorkers方法會使用tryLock方法來判斷線程池中的線程是否是空閑狀態;
之所以設置為不可重入的,是因為在任務調用setCorePoolSize這類線程池控制的方法時,不會中斷正在運行的線程
所以,Worker繼承自AQS,用于判斷線程是否空閑以及是否處于被中斷。
protected boolean tryAcquire(int unused) { /** * cas修改state,不可重入; * state根據0來判斷,所以Worker構造方法中講state置為-1是為了禁止在執行任務前對線程進行中斷; * 因此,在runWorker方法中會先調用Worker對象的unlock方法將state設置為0 */ if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }runWorker方法
在Worker類中的run方法調用了runWorker方法來執行任務
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); //獲取第一個任務 Runnable task = w.firstTask; w.firstTask = null; //允許中斷 w.unlock(); //是否因異常退出循環 boolean completedAbruptly = true; try { //如果task為空,則通過getTask來獲取任務 while (task != null || (task = getTask()) != null) { w.lock(); /** * 如果線程池正在停止,那么要保證當前線程時中斷狀態; * 如果不是的話,則要保證當前線程不是中斷狀態 */ if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //beforeExecute和afterExecute是留給子類來實現的 beforeExecute(wt, task); Throwable thrown = null; try { //通過任務方式執行,不是線程方式 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //processWorkerExit會對completedAbruptly進行判斷,表示在執行過程中是否出現異常 processWorkerExit(w, completedAbruptly); } }
總結一下runWorker方法的執行過程:
while循環不斷地通過getTask方法來獲取任務;
getTask方法從阻塞隊列中獲取任務;
如果線程池正在停止,那么要保證當前線程處于中斷狀態, 否則要保證當前線程不是中斷狀態;
調用task.run()執行任務;
如果task為null則會跳出循環,執行processWorkerExit方法;
runWorker方法執行完畢,也代表著Worker中的run方法執行完畢,銷毀線程。
getTask方法getTask方法用于從阻塞隊列中獲取任務
private Runnable getTask() { //timeout變量的值表示上次從阻塞隊列中獲取任務是否超時 boolean timedOut = false; for (; ; ) { int c = ctl.get(); int rs = runStateOf(c); /** * 如果rs >= SHUTDOWN,表示線程池非RUNNING狀態,需要再次判斷: * 1、rs >= STOP ,線程池是否正在STOP * 2、阻塞隊列是否為空 * 滿足上述條件之一,則將workCount減一,并返回null; * 因為如果當前線程池的狀態處于STOP及以上或隊列為空,不能從阻塞隊列中獲取任務; */ if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); /** * timed變量用于判斷是否需要進行超時控制; * allowCoreThreadTimeOut默認是false,也就是核心線程不允許進行超時; * wc > corePoolSize,表示當前線程數大于核心線程數量; * 對于超過核心線程數量的這些線程,需要進行超時控制; */ boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /** * wc > maximumPoolSize的情況是因為可能在此方法執行階段同時執行了 setMaximumPoolSize方法; * timed && timedOut 如果為true,表示當前操作需要進行超時控制,并且上次從阻塞隊列中獲取任務發生了超時; * 接下來判斷,如果有效咸亨數量大于1,或者workQueue為空,那么將嘗試workCount減1; * 如果減1失敗,則返回重試; * 如果wc==1時,也就說明當前線程是線程池中的唯一線程了; */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } /** * timed為trure,則通過workQueue的poll方法進行超時控制,如果在keepAliveTime時間內沒有獲取任務,則返回null; * 否則通過take方法,如果隊列為空,則take方法會阻塞直到隊列中不為空; */ try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; //如果r==null,說明已經超時了,timedOut = true; timedOut = true; } catch (InterruptedException retry) { //如果獲取任務時當前線程發生了中斷,則將timedOut = false; timedOut = false; } } }
注意:第二個if判斷,目的是為了控制線程池的有效線程數量。
有上文分析得到,在execute方法時,如果當前線程池的線程數量超過coolPoolSize且小于maxmumPoolSize,并且阻塞隊列已滿時,則可以通過增加工作線程。但是如果工作線程在超時時間內沒有獲取到任務,timeOut=true,說明workQueue為空,也就說當前線程池不需要那么多線程來執行任務了,可以把多于的corePoolSize數量的線程銷毀掉,保證線程數量在corePoolSize即可。
什么時候會銷毀線程?
當然是runWorker方法執行完后,也就是Worker中的run方法執行完后,由JVM自動回收。
private void processWorkerExit(Worker w, boolean completedAbruptly) { /** * 如果completedAbruptly為true,則說明線程執行時出現異常,需要將workerCount數量減一 * 如果completedAbruptly為false,說明在getTask方法中已經對workerCount進行減一,這里不用再減 */ if (completedAbruptly) decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //統計完成的任務數 completedTaskCount += w.completedTasks; //從workers中移除,也就表示從線程池中移除一個工作線程 workers.remove(w); } finally { mainLock.unlock(); } //鉤子函數,根據線程池的狀態來判斷是否結束線程池 tryTerminate(); int c = ctl.get(); /** * 當前線程是RUNNING或SHUTDOWN時,如果worker是異常結束,那么會直接addWorker; * 如果allowCoreThreadTimeOut=true,那么等待隊列有任務,至少保留一個worker; * 如果allowCoreThreadTimeOut=false,workerCount少于coolPoolSize */ if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && !workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); } }
至此,processWorkerExit執行完之后,工作線程被銷毀。
工作執行流程工作線程的生命周期,從execute方法開始,Worker使用ThreadFactory創建新的工作線程,runWorker通過getTask獲取任務,然后執行任務,如果getTask返回null,進入processWorkerExit,整個線程結束。
還沒關注我的公眾號?
掃文末二維碼關注公眾號【小強的進階之路】可領取如下:
學習資料: 1T視頻教程:涵蓋Javaweb前后端教學視頻、機器學習/人工智能教學視頻、Linux系統教程視頻、雅思考試視頻教程;
100多本書:包含C/C++、Java、Python三門編程語言的經典必看圖書、LeetCode題解大全;
軟件工具:幾乎包括你在編程道路上的可能會用到的大部分軟件;
項目源碼:20個JavaWeb項目源碼。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/76287.html
摘要:創建方法最大線程數即源碼單線程化的線程池有且僅有一個工作線程執行任務所有任務按照指定順序執行,即遵循隊列的入隊出隊規則創建方法源碼還有一個結合了和,就不介紹了,基本不用。 *本篇文章已授權微信公眾號 guolin_blog (郭霖)獨家發布 為什么用線程池 創建/銷毀線程伴隨著系統開銷,過于頻繁的創建/銷毀線程,會很大程度上影響處理效率 >例如: > >記創建線程消耗時間T1,執行...
摘要:源碼分析創建可緩沖的線程池。源碼分析使用創建線程池源碼分析的構造函數構造函數參數核心線程數大小,當線程數,會創建線程執行最大線程數,當線程數的時候,會把放入中保持存活時間,當線程數大于的空閑線程能保持的最大時間。 之前創建線程的時候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...
摘要:最后,我們會通過對源代碼的剖析深入了解線程池的運行過程和具體設計,真正達到知其然而知其所以然的水平。創建線程池既然線程池是一個類,那么最直接的使用方法一定是一個類的對象,例如。單線程線程池單線程線程 我們一般不會選擇直接使用線程類Thread進行多線程編程,而是使用更方便的線程池來進行任務的調度和管理。線程池就像共享單車,我們只要在我們有需要的時候去獲取就可以了。甚至可以說線程池更棒,...
摘要:當活動線程核心線程非核心線程達到這個數值后,后續任務將會根據來進行拒絕策略處理。線程池工作原則當線程池中線程數量小于則創建線程,并處理請求。當線程池中的數量等于最大線程數時默默丟棄不能執行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現原理 在這篇文章中,作者有條不紊的將 ja...
閱讀 2566·2023-04-25 18:13
閱讀 783·2021-11-22 12:10
閱讀 2978·2021-11-22 11:57
閱讀 2142·2021-11-19 11:26
閱讀 2176·2021-09-22 15:40
閱讀 1464·2021-09-03 10:28
閱讀 2707·2019-08-30 15:53
閱讀 1954·2019-08-30 15:44