摘要:在前面介紹的文章中,提到了關于線程池的創建介紹,在文章之系列外部中第一部分有詳細的說明,請參閱文章中其實說明了外部的使用方式,但是沒有說內部是如何實現的,為了加深對實現的理解,在使用中可以放心,我們這里將做源碼解析以及反饋到原理上,工具可
在前面介紹JUC的文章中,提到了關于線程池Execotors的創建介紹,在文章:《java之JUC系列-外部Tools》中第一部分有詳細的說明,請參閱;
文章中其實說明了外部的使用方式,但是沒有說內部是如何實現的,為了加深對實現的理解,在使用中可以放心,我們這里將做源碼解析以及反饋到原理上,Executors工具可以創建普通的線程池以及schedule調度任務的調度池,其實兩者實現上還是有一些區別,但是理解了ThreadPoolExecutor,在看ScheduledThreadPoolExecutor就非常輕松了,后面的文章中也會專門介紹這塊,但是需要先看這篇文章。
使用Executors最常用的莫過于是使用:Executors.newFixedThreadPool(int)這個方法,因為它既可以限制數量,而且線程用完后不會一直被cache住;那么就通過它來看看源碼,回過頭來再看其他構造方法的區別:
在《java之JUC系列-外部Tools》文章中提到了構造方法,為了和本文對接,再貼下代碼。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
其實你可以自己new一個ThreadPoolExecutor,來達到自己的參數可控的程度,例如,可以將LinkedBlockingQueue換成其它的(如:SynchronousQueue),只是可讀性會降低,這里只是使用了一種設計模式。
我們現在來看看ThreadPoolExecutor的源碼是怎么樣的,也許你剛開始看他的源碼會很痛苦,因為你不知道作者為什么是這樣設計的,所以本文就我看到的思想會給你做一個介紹,此時也許你通過知道了一些作者的思想,你也許就知道應該該如何去操作了。
這里來看下構造方法中對那些屬性做了賦值:
源碼段1:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
這里你可以看到最終賦值的過程,可以先大概知道下參數的意思:
corePoolSize:核心運行的poolSize,也就是當超過這個范圍的時候,就需要將新的Runnable放入到等待隊列workQueue中了,我們把這些Runnable就叫做要去執行的任務吧。
maximumPoolSize:一般你用不到,當大于了這個值就會將任務由一個丟棄處理機制來處理,但是當你發生:newFixedThreadPool的時候,corePoolSize和maximumPoolSize是一樣的,而corePoolSize是先執行的,所以他會先被放入等待隊列,而不會執行到下面的丟棄處理中,看了后面的代碼你就知道了。
workQueue:等待隊列,當達到corePoolSize的時候,就向該等待隊列放入線程信息(默認為一個LinkedBlockingQueue),運行中的線程屬性為:workers,為一個HashSet;我們的Runnable內部被包裝了一層,后面會看到這部分代碼;這個隊列默認是一個無界隊列(你也可以設定一個有界隊列),所以在生產者瘋狂生產的時候,考慮如何控制的問題。
keepAliveTime:默認都是0,當線程沒有任務處理后,保持多長時間,當你使用:newCachedThreadPool(),它將是60s的時間。這個參數在運行中的線程從workQueue獲取任務時,當(poolSize >corePoolSize || allowCoreThreadTimeOut)會用到,當然allowCoreThreadTimeOut要設置為true,也會先判定keepAliveTime是大于0的,不過由于它在corePoolSize上采用了Integer.MAX_VALUE,當遇到系統遇到瞬間沖擊,workers就會迅速膨脹,所以這個地方就不要去設置allowCoreThreadTimeOut=true,否則結果是這些運行中的線程會持續60s以上;另外,如果corePoolSize的值還沒到Integer.MAX_VALUE,當超過那個值以后,這些運行中的線程,也是
threadFactory:是構造Thread的方法,你可以自己去包裝和傳遞,主要實現newThread方法即可;
handler:也就是參數maximumPoolSize達到后丟棄處理的方法,java提供了5種丟棄處理的方法,當然你也可以自己根據實際情況去重寫,主要是要實現接口:RejectedExecutionHandler中的方法: public void rejectedExecution(Runnabler, ThreadPoolExecutor e) java默認的是使用:AbortPolicy,他的作用是當出現這中情況的時候會拋出一個異常;
其余的還包含:
1、CallerRunsPolicy:如果發現線程池還在運行,就直接運行這個線程
2、DiscardOldestPolicy:在線程池的等待隊列中,將頭取出一個拋棄,然后將當前線程放進去。
3、DiscardPolicy:什么也不做
4、AbortPolicy:java默認,拋出一個異常:RejectedExecutionException。
你可以自己寫一個,例如我們想在這個處理中,既不是完全丟棄,也不是完全啟動,也不是拋異常,而是控制生產者的線程,那么你就可以嘗試某種方式將生產者的線程blocking住,其實就有點類似提到的Semaphor的功能了。
通常你得到線程池后,會調用其中的:submit方法或execute方法去操作;其實你會發現,submit方法最終會調用execute方法來進行操作,只是他提供了一個Future來托管返回值的處理而已,當你調用需要有返回值的信息時,你用它來處理是比較好的;這個Future會包裝對Callable信息,并定義一個Sync對象(),當你發生讀取返回值的操作的時候,會通過Sync對象進入鎖,直到有返回值的數據通知,具體細節先不要看太多。
繼續向下,來看看execute最為核心的方法吧: 源碼段2:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated } }
這段代碼看似簡單,其實有點難懂,很多人也是這里沒看懂,沒事,我一個if一個if說:
首先第一個判定空操作就不用說了,下面判定的poolSize >= corePoolSize成立時候會進入if的區域,當然它不成立也有可能會進入,他會判定addIfUnderCorePoolSize是否返回false,如果返回false就會進去;
我們先來看下addIfUnderCorePoolSize方法的源碼是什么:
源碼段3:
private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
可以發現,這段源碼是如果發現小雨corePoolSize就會創建一個新的線程,并且調用線程的start()方法將線程運行起來:這個addThread()方法,我們先不考慮細節,因為我們還要先看到前面是怎么進去的,這里可以發信啊,只有沒有創建成功Thread才會返回false,也就是當當前的poolSize > corePoolSize的時候,或線程池已經不是在running狀態的時候才會出現;
注意:這里在外部判定一次poolSize和corePoolSize只是初步判定,內部是加鎖后判定的,以得到更為準確的結果,而外部初步判定如果是大于了,就沒有必要進入這段有鎖的代碼了。
此時我們知道了,當前線程數量大于corePoolSize的時候,就會進入【代碼段2】的第一個if語句中,回到【源碼段2】,繼續看if語句中的內容:
這里標記為
源碼段4:
if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command)) reject(command); // is shutdown or saturated
第一個if,也就是當當前狀態為running的時候,就會去執行workQueue.offer(command),這個workQueue其實就是一個BlockingQueue,offer()操作就是在隊列的尾部寫入一個對象,此時寫入的對象為線程的對象而已;所以你可以認為只有線程池在RUNNING狀態,才會在隊列尾部插入數據,否則就執行else if,其實else if可以看出是要做一個是否大于MaximumPoolSize的判定,如果大于這個值,就會做reject的操作,關于reject的說明,我們在【源碼段1】的解釋中已經非常明確的說明,這里可以簡單看下源碼,以應征結果:
源碼段5:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < maximumPoolSize && runState == RUNNING) //在corePoolSize = maximumPoolSize下,該代碼幾乎不可能運行 t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; } void reject(Runnable command) { handler.rejectedExecution(command, this); }
也就是如果線程池滿了,而且線程池調用了shutdown后,還在調用execute方法時,就會拋出上面說明的異常:RejectedExecutionException 再回頭來看下【代碼段4】中進入到等待隊列后的操作:
if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command);
這段代碼是要在線程池運行狀態不是RUNNING或poolSize == 0才會調用,他是干啥呢? 他為什么會不等于RUNNING呢?外面那一層不是判定了他== RUNNING了么,其實有時間差就是了,如果是poolSize == 0也會執行這段代碼,但是里面的判定條件是如果不是RUNNING,就做reject操作,在第一個線程進去的時候,會將第一個線程直接啟動起來;很多人也是看這段代碼很繞,因為不斷的循環判定類似的判定條件,你主要記住他們之間有時間差,要取最新的就好了。 此時貌似代碼看完了?咦,此時有問題了: 1、 等待中的線程在后來是如何跑起來的呢?線程池是不是有類似Timer一樣的守護進程不斷掃描線程隊列和等待隊列?還是利用某種鎖機制,實現類似wait和notify實現的? 2、 線程池的運行隊列和等待隊列是如何管理的呢?這里還沒看出影子呢! NO,NO,NO! Java在實現這部分的時候,使用了怪異的手段,神馬手段呢,還要再看一部分代碼才曉得。 在前面【源碼段3】中,我們看到了一個方法叫:addThread(),也許很少有人會想到關鍵在這里,其實關鍵就是在這里: 我們看看addThread()方法到底做了什么。 源碼段6:
private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); if (t != null) { w.thread = t; workers.add(w); int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; }
這里創建了一個Worker,其余的操作,就是將poolSize++的操作,然后將將其放入workers的運行的HashSet中等操作;
我們主要關心Worker是干什么的,因為這個threadFactory對我們用途不大,只是做了Thread的命名處理;而Worker你會發現它的定義也是一個Runnable,外部開始在代碼段中發現了調用哪個這個Worker的start()方法,也就是線程的啟動方法,其實也就是調用了Worker的run()方法,那么我們重點要關心run方法是如何處理的
源碼段7:
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
FirstTask其實就是開始在創建work的時候,由外部傳入的Runnable對象,也就是你自己的Thread,你會發現它如果發現task為空,就會調用getTask()方法再判定,直到兩者為空,并且是一個while循環體。
那么看看getTask()方法的實現為:
源碼段8:
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } // Else retry } catch (InterruptedException ie) { // On interruption, re-check runState } } }
你會發現它是從workQueue隊列中,也就是等待隊列中獲取一個元素出來并返回!
回過頭來根據代碼段6理解下:
當前線程運行完后,在到workQueue中去獲取一個task出來,繼續運行,這樣就保證了線程池中有一定的線程一直在運行;此時若跳出了while循環,只有workQueue隊列為空才會出現或出現了類似于shutdown的操作,自然運行隊列會減少1,當再有新的線程進來的時候,就又開始向worker里面放數據了,這樣以此類推,實現了線程池的功能。
這里可以看下run方法的finally中調用的workerDone方法為:
源碼段9:
void workerDone(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { completedTaskCount += w.completedTasks; workers.remove(w); if (--poolSize == 0) tryTerminate(); } finally { mainLock.unlock(); } }
注意這里將workers.remove(w)掉,并且調用了—poolSize來做操作。
至于tryTerminate是做了更多關于回收方面的操作。
最后我們還要看一段代碼就是在【源碼段6】中出現的代碼調用為:runTask(task);這個方法也是運行的關鍵。
源碼段10:
private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) thread.interrupt(); boolean ran = false; beforeExecute(thread, task); try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } }
你可以看到,這里面的task為傳入的task信息,調用的不是start方法,而是run方法,因為run方法直接調用不會啟動新的線程,也是因為這樣,導致了你無法獲取到你自己的線程的狀態,因為線程池是直接調用的run方法,而不是start方法來運行。
這里有個beforeExecute和afterExecute方法,分別代表在執行前和執行后,你可以做一段操作,在這個類中,這兩個方法都是【空body】的,因為普通線程池無需做更多的操作。
如果你要實現類似暫停等待通知的或其他的操作,可以自己extends后進行重寫構造;
本文沒有介紹關于ScheduledThreadPoolExecutor調用的細節,下一篇文章會詳細說明,因為大部分代碼和本文一致,區別在于一些細節,在介紹:ScheduledThreadPoolExecutor的時候,會明確的介紹它與Timer和TimerTask的巨大區別,區別不在于使用,而是在于本身內在的處理細節。
via ifeve
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69763.html
摘要:表示的是兩個,當其中任意一個計算完并發編程之是線程安全并且高效的,在并發編程中經常可見它的使用,在開始分析它的高并發實現機制前,先講講廢話,看看它是如何被引入的。電商秒殺和搶購,是兩個比較典型的互聯網高并發場景。 干貨:深度剖析分布式搜索引擎設計 分布式,高可用,和機器學習一樣,最近幾年被提及得最多的名詞,聽名字多牛逼,來,我們一步一步來擊破前兩個名詞,今天我們首先來說說分布式。 探究...
摘要:在結構上引入了頭結點和尾節點,他們分別指向隊列的頭和尾,嘗試獲取鎖入隊服務教程在它提出十多年后的今天,已經成為最重要的應用技術之一。隨著編程經驗的日積月累,越來越感覺到了解虛擬機相關要領的重要性。 JVM 源碼分析之 Jstat 工具原理完全解讀 http://click.aliyun.com/m/8315/ JVM 源碼分析之 Jstat 工具原理完全解讀 http:...
摘要:在中一般來說通過來創建所需要的線程池,如高并發原理初探后端掘金閱前熱身為了更加形象的說明同步異步阻塞非阻塞,我們以小明去買奶茶為例。 AbstractQueuedSynchronizer 超詳細原理解析 - 后端 - 掘金今天我們來研究學習一下AbstractQueuedSynchronizer類的相關原理,java.util.concurrent包中很多類都依賴于這個類所提供的隊列式...
閱讀 2404·2021-10-14 09:43
閱讀 2435·2021-09-09 09:34
閱讀 1601·2019-08-30 12:57
閱讀 1198·2019-08-29 14:16
閱讀 718·2019-08-26 12:13
閱讀 3201·2019-08-26 11:45
閱讀 2282·2019-08-23 16:18
閱讀 2652·2019-08-23 15:27