摘要:類型是位二進制標示,其中高位用來表示線程池狀態,后面位用來記錄線程池線程個數。創建一個最小線程個數為,最大為,阻塞隊列為的線程池。
一、 前言
線程池主要解決兩個問題:一方面當執行大量異步任務時候線程池能夠提供較好的性能,這是因為使用線程池可以使每個任務的調用開銷減少(因為線程池線程是可以復用的)。另一方面線程池提供了一種資源限制和管理的手段,比如當執行一系列任務時候對線程的管理,每個ThreadPoolExecutor也保留了一些基本的統計數據,比如當前線程池完成的任務數目。
二、 類圖結構Executors其實是個工具類,里面提供了好多靜態方法,根據用戶選擇返回不同的線程池實例。
ThreadPoolExecutor繼承了AbstractExecutorService,成員變量ctl是個Integer的原子變量用來記錄線程池狀態 和 線程池線程個數,類似于ReentrantReadWriteLock使用一個變量存放兩種信息。
Integer類型是32位二進制標示,其中高3位用來表示線程池狀態,后面 29位用來記錄線程池線程個數。
線程池狀態含義:
RUNNING:接受新任務并且處理阻塞隊列里的任務
SHUTDOWN:拒絕新任務但是處理阻塞隊列里的任務
STOP:拒絕新任務并且拋棄阻塞隊列里的任務同時會中斷正在處理的任務
TIDYING:所有任務都執行完(包含阻塞隊列里面任務)當前線程池活動線程為0,將要調用terminated方法
TERMINATED:終止狀態。terminated方法調用完成以后的狀態
線程池狀態轉換:
RUNNING -> SHUTDOWN
顯式調用shutdown()方法,或者隱式調用了finalize(),它里面調用了shutdown()方法。
RUNNING or SHUTDOWN)-> STOP
顯式 shutdownNow()方法
SHUTDOWN -> TIDYING
當線程池和任務隊列都為空的時候
STOP -> TIDYING
當線程池為空的時候
TIDYING -> TERMINATED
當 terminated() hook 方法執行完成時候
線程池參數:
corePoolSize:線程池核心線程個數
workQueue:用于保存等待執行的任務的阻塞隊列。
比如基于數組的有界ArrayBlockingQueue、,基于鏈表的無界LinkedBlockingQueue,最多只有一個元素的同步隊列SynchronousQueue,優先級隊列PriorityBlockingQueue,具體可參考 https://www.atatech.org/artic...
maximunPoolSize:線程池最大線程數量。
ThreadFactory:創建線程的工廠
RejectedExecutionHandler:飽和策略,當隊列滿了并且線程個數達到maximunPoolSize后采取的策略,比如AbortPolicy(拋出異常),CallerRunsPolicy(使用調用者所在線程來運行任務),DiscardOldestPolicy(調用poll丟棄一個任務,執行當前任務),DiscardPolicy(默默丟棄,不拋出異常)
keeyAliveTime:存活時間。如果當前線程池中的線程數量比基本數量要多,并且是閑置狀態的話,這些閑置的線程能存活的最大時間
TimeUnit,存活時間的時間單位
線程池類型:
newFixedThreadPool
創建一個核心線程個數和最大線程個數都為nThreads的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keeyAliveTime=0說明只要線程個數比核心線程個數多并且當前空閑則回收。
newSingleThreadExecutor
創建一個核心線程個數和最大線程個數都為1的線程池,并且阻塞隊列長度為Integer.MAX_VALUE,keeyAliveTime=0說明只要線程個數比核心線程個數多并且當前空閑則回收。
newCachedThreadPool
創建一個按需創建線程的線程池,初始線程個數為0,最多線程個數為Integer.MAX_VALUE,并且阻塞隊列為同步隊列,keeyAliveTime=60說明只要當前線程60s內空閑則回收。這個特殊在于加入到同步隊列的任務會被馬上被執行,同步隊列里面最多只有一個任務,并且存在后馬上會拿出執行。
newSingleThreadScheduledExecutor
創建一個最小線程個數corePoolSize為1,最大為Integer.MAX_VALUE,阻塞隊列為DelayedWorkQueue的線程池。
其中Worker繼承AQS和Runnable是具體承載任務的對象,Worker繼承了AQS自己實現了簡單的不可重入獨占鎖,其中status=0標示鎖未被獲取狀態也就是未被鎖住的狀態,state=1標示鎖已經被獲取的狀態也就是鎖住的狀態。
DefaultThreadFactory是線程工廠,newThread方法是對線程的一個分組包裹,其中poolNumber是個靜態的原子變量,用來統計線程工廠的個數,threadNumber用來記錄每個線程工廠創建了多少線程。
三、 源碼分析3.1 添加任務到線程池exectue方法
如果當前線程池線程個數小于corePoolSize則開啟新線程
否則添加任務到任務隊列
如果任務隊列滿了,則嘗試新開啟線程執行任務,如果線程個數>maximumPoolSize則執行拒絕策略。
重點看addWorkder方法:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 檢查隊列是否只在必要時為空.(1) if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; //循環cas增加線程個數 for (;;) { int wc = workerCountOf(c); //如果線程個數超限則返回false if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; //cas增加線程個數,同時只有一個線程成功 if (compareAndIncrementWorkerCount(c)) break retry; //cas失敗了,則看線程池狀態是否變化了,變化則跳到外層循環重試重新獲取線程池狀態,否者內層循環重新cas。 c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; } } //到這里說明cas成功了,(2) boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { //創建worker final ReentrantLock mainLock = this.mainLock; w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //加獨占鎖,為了workers同步,因為可能多個線程調用了線程池的execute方法。 mainLock.lock(); try { //重新檢查線程池狀態,為了避免在獲取鎖前調用了shutdown接口(3) int c = ctl.get(); int rs = runStateOf(c); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //添加任務 workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } //添加成功則啟動任務 if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted;}
代碼比較長,主要分兩部分,第一部分雙重循環目的是通過cas增加線程池線程個數,第二部分主要是并發安全的把任務添加到workers里面,并且啟動任務執行。
先看第一部分的(1)
展開!運算后等價于
也就是說下面幾種情況下會返回false:
當前線程池狀態為STOP,TIDYING,TERMINATED
當前線程池狀態為SHUTDOWN并且已經有了第一個任務
當前線程池狀態為SHUTDOWN并且任務隊列為空
內層循環作用是使用cas增加線程個數,如果線程個數超限則返回false,否者進行cas,cas成功則退出雙循環,否者cas失敗了,要看當前線程池的狀態是否變化了,如果變了,則重新進入外層循環重新獲取線程池狀態,否者進入內層循環繼續進行cas嘗試。
到了第二部分說明CAS成功了,也就是說線程個數加一了,但是現在任務還沒開始執行,這里使用全局的獨占鎖來控制workers里面添加任務,其實也可以使用并發安全的set,但是性能沒有獨占鎖好(這個從注釋中知道的)。這里需要注意的是要在獲取鎖后重新檢查線程池的狀態,這是因為其他線程可可能在本方法獲取鎖前改變了線程池的狀態,比如調用了shutdown方法。添加成功則啟動任務執行。
3.2 工作線程Worker的執行
先看下構造函數:
這里添加一個新狀態-1是為了避免當前線程worker線程被中斷,比如調用了線程池的shutdownNow,如果當前worker狀態>=0則會設置該線程的中斷標志。這里設置了-1所以條件不滿足就不會中斷該線程了。運行runWorker時候會調用unlock方法,該方法吧status變為了0,所以這時候調用shutdownNow會中斷worker線程。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // status設置為0,允許中斷 boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // 如果線程池當前狀態至少是stop,則設置中斷標志; // 如果線程池當前狀態是RUNNININ,則重置中斷標志,重置后需要重新 //檢查下線程池狀態,因為當重置中斷標志時候,可能調用了線程池的shutdown方法 //改變了線程池狀態。 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //任務執行前干一些事情 beforeExecute(wt, task); Throwable thrown = null; try { task.run();//執行任務 } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任務執行完畢后干一些事情 afterExecute(task, thrown); } } finally { task = null; //統計當前worker完成了多少個任務 w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { //執行清了工作 processWorkerExit(w, completedAbruptly); } } 如果當前task為空,則直接執行,否者調用getTask從任務隊列獲取一個任務執行,如果任務隊列為空,則worker退出。 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // 如果當前線程池狀態>=STOP 或者線程池狀態為shutdown并且工作隊列為空則,減少工作線程個數 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } boolean timed; // Are workers subject to culling? for (;;) { int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc > corePoolSize; if (wc <= maximumPoolSize && ! (timedOut && timed)) break; if (compareAndDecrementWorkerCount(c)) return null; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } try { //根據timed選擇調用poll還是阻塞的take Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } }} private void processWorkerExit(Worker w, boolean completedAbruptly){ if (completedAbruptly) // If abrupt, then workerCount wasn"t adjusted decrementWorkerCount(); //統計整個線程池完成的任務個數 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); } finally { mainLock.unlock(); } //嘗試設置線程池狀態為TERMINATED,如果當前是shutdonw狀態并且工作隊列為空 //或者當前是stop狀態當前線程池里面沒有活動線程 tryTerminate(); //如果當前線程個數小于核心個數,則增加 int c = ctl.get(); if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { int min = allowCoreThreadTimeOut ? 0 : corePoolSize; if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } addWorker(null, false); }}
3.3 shutdown操作
調用shutdown后,線程池就不會在接受新的任務了,但是工作隊列里面的任務還是要執行的,但是該方法立刻返回的,并不等待隊列任務完成在返回。
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //權限檢查 checkShutdownAccess(); //設置當前線程池狀態為SHUTDOWN,如果已經是SHUTDOWN則直接返回 advanceRunState(SHUTDOWN); //設置中斷標志 interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //嘗試狀態變為TERMINATED tryTerminate(); }
如果當前狀態>=targetState則直接返回,否者設置當前狀態為targetState
private void advanceRunState(int targetState) {
for (;;) { int c = ctl.get(); if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } private void interruptIdleWorkers() { interruptIdleWorkers(false); }
設置所有線程的中斷標志,主要這里首先加了全局鎖,同時只有一個線程可以調用shutdown時候設置中斷標志,然后嘗試獲取worker自己的鎖,獲取成功則設置中斷標示
private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) { Thread t = w.thread; if (!t.isInterrupted() && w.tryLock()) { try { t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } if (onlyOne) break; } } finally { mainLock.unlock(); }}
3.4 shutdownNow操作
調用shutdown后,線程池就不會在接受新的任務了,并且丟棄工作隊列里面里面的任務,正在執行的任務會被中斷,但是該方法立刻返回的,并不等待激活的任務執行完成在返回。返回隊列里面的任務列表。
調用隊列的drainTo一次當前隊列的元素到taskList,
可能失敗,如果調用drainTo后隊列海不為空,則循環刪除,并添加到taskList
public List
Listtasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess();//權限檢查 advanceRunState(STOP);// 設置線程池狀態為stop interruptWorkers();//中斷線程 tasks = drainQueue();//移動隊列任務到tasks } finally { mainLock.unlock(); } tryTerminate(); return tasks;
}
調用隊列的drainTo一次當前隊列的元素到taskList,
可能失敗,如果調用drainTo后隊列海不為空,則循環刪除,并添加到taskList
private List
BlockingQueueq = workQueue; List taskList = new ArrayList (); q.drainTo(taskList); if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList;
}
3.5 awaitTermination操作
等待線程池狀態變為TERMINATED則返回,或者時間超時。由于整個過程獨占鎖,所以一般調用shutdown或者shutdownNow后使用。
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }四、總結
線程池巧妙的使用一個Integer類型原子變量來記錄線程池狀態和線程池線程個數,設計時候考慮到未來(2^29)-1個線程可能不夠用,到時只需要把原子變量變為Long類型,然后掩碼位數變下就可以了,但是為啥現在不一勞永逸的定義為Long那,主要是考慮到使用int類型操作時候速度上比Long類型快些。
通過線程池狀態來控制任務的執行,每個worker線程可以處理多個任務,線程池通過線程的復用減少了線程創建和銷毀的開銷,通過使用任務隊列避免了線程的阻塞從而避免了線程調度和線程上下文切換的開銷。
另外需要注意的是調用shutdown方法作用僅僅是修改線程池狀態讓現在任務失敗并中斷當前線程,這個中斷并不是讓正在運行的線程終止,而是僅僅設置下線程的中斷標志,如果線程內沒有使用中斷標志做一些事情,那么這個對線程沒有影響。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/19209.html
摘要:類型是位二進制標示,其中高位用來表示線程池狀態,后面位用來記錄線程池線程個數。創建一個最小線程個數為,最大為,阻塞隊列為的線程池。 一、 前言 線程池主要解決兩個問題:一方面當執行大量異步任務時候線程池能夠提供較好的性能,這是因為使用線程池可以使每個任務的調用開銷減少(因為線程池線程是可以復用的)。另一方面線程池提供了一種資源限制和管理的手段,比如當執行一系列任務時候對線程的管理,每個...
摘要:類型是位二進制標示,其中高位用來表示線程池狀態,后面位用來記錄線程池線程個數。創建一個最小線程個數為,最大為,阻塞隊列為的線程池。 一、 前言 線程池主要解決兩個問題:一方面當執行大量異步任務時候線程池能夠提供較好的性能,這是因為使用線程池可以使每個任務的調用開銷減少(因為線程池線程是可以復用的)。另一方面線程池提供了一種資源限制和管理的手段,比如當執行一系列任務時候對線程的管理,每個...
摘要:最后,我們會通過對源代碼的剖析深入了解線程池的運行過程和具體設計,真正達到知其然而知其所以然的水平。創建線程池既然線程池是一個類,那么最直接的使用方法一定是一個類的對象,例如。單線程線程池單線程線程 我們一般不會選擇直接使用線程類Thread進行多線程編程,而是使用更方便的線程池來進行任務的調度和管理。線程池就像共享單車,我們只要在我們有需要的時候去獲取就可以了。甚至可以說線程池更棒,...
閱讀 2420·2021-11-18 10:02
閱讀 687·2021-10-08 10:04
閱讀 2250·2021-09-03 10:51
閱讀 3540·2019-08-30 15:44
閱讀 2799·2019-08-29 14:09
閱讀 2464·2019-08-29 12:21
閱讀 2064·2019-08-26 13:45
閱讀 1800·2019-08-26 13:25