摘要:提交任務當創建了一個線程池之后我們就可以將任務提交到線程池中執行了。提交任務到線程池中相當簡單,我們只要把原來傳入類構造器的對象傳入線程池的方法或者方法就可以了。
我們一般不會選擇直接使用線程類Thread進行多線程編程,而是使用更方便的線程池來進行任務的調度和管理。線程池就像共享單車,我們只要在我們有需要的時候去獲取就可以了。甚至可以說線程池更棒,我們只需要把任務提交給它,它就會在合適的時候運行了。但是如果直接使用Thread類,我們就需要在每次執行任務時自己創建、運行、等待線程了,而且很難對線程進行整體的管理,這可不是一件輕松的事情。既然我們已經有了線程池,那還是把這些麻煩事交給線程池來處理吧。
之前一篇介紹線程池使用及其源碼的文章篇幅太長了、跨度太大了一些,感覺不是很好理解。所以我把內容重新組織了一下,拆為了兩篇文章,并且補充了一些內容,希望能讓大家更容易地理解相關內容。
這篇文章將從線程池的概念與一般使用入手,首先介紹線程池的一般使用。然后詳細介紹線程池中常用的可配置項,例如任務隊列、拒絕策略等,最后會介紹四種常用的線程池配置。通過這篇文章,大家可以熟練掌握線程池的使用方式,在實踐中游刃有余地使用線程池對線程進行靈活的調度。
閱讀本文需要對多線程編程有基本的認識,例如什么是線程、多線程解決的是什么問題等。不了解的讀者可以參考一下我之前發布的一篇文章《這一次,讓我們完全掌握Java多線程(2/10)》
一般我們最常用的線程池實現類是ThreadPoolExecutor,我們接下來會介紹這個類的基本使用方法。JDK已經對線程池做了比較好的封裝,相信這個過程會非常輕松。
線程池的基本使用 創建線程池既然線程池是一個Java類,那么最直接的使用方法一定是new一個ThreadPoolExecutor類的對象,例如ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue
當創建了一個線程池之后我們就可以將任務提交到線程池中執行了。提交任務到線程池中相當簡單,我們只要把原來傳入Thread類構造器的Runnable對象傳入線程池的execute方法或者submit方法就可以了。execute方法和submit方法基本沒有區別,兩者的區別只是submit方法會返回一個Future對象,用于檢查異步任務的執行情況和獲取執行結果(異步任務完成后)。
我們可以先試試如何使用比較簡單的execute方法,代碼例子如下:
public class ThreadPoolTest { private static int count = 0; public static void main(String[] args) throws Exception { Runnable task = new Runnable() { public void run() { for (int i = 0; i < 1000000; ++i) { synchronized (ThreadPoolTest.class) { count += 1; } } } }; // 重要:創建線程池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); // 重要:向線程池提交兩個任務 threadPool.execute(task); threadPool.execute(task); // 等待線程池中的所有任務完成 threadPool.shutdown(); while (!threadPool.awaitTermination(1L, TimeUnit.MINUTES)) { System.out.println("Not yet. Still waiting for termination"); } System.out.println("count = " + count); } }
運行之后得到的結果是兩百萬,我們成功實現了第一個使用線程池的程序。那么回到剛才的問題,創建線程池時傳入的那些參數有什么作用的呢?
深入解析線程池 創建線程池的參數下面是ThreadPoolExecutor的構造器定義:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueueworkQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
各個參數分別表示下面的含義:
corePoolSize,核心線程池大小,一般線程池會至少保持這么多的線程數量;
maximumPoolSize,最大線程池大小,也就是線程池最大的線程數量;
keepAliveTime和unit共同組成了一個超時時間,keepAliveTime是時間數量,unit是時間單位,單位加數量組成了最終的超時時間。這個超時時間表示如果線程池中包含了超過corePoolSize數量的線程,則在有線程空閑的時間超過了超時時間時該線程就會被銷毀;
workQueue是任務的阻塞隊列,在沒有線程池中沒有足夠的線程可用的情況下會將任務先放入到這個阻塞隊列中等待執行。這里傳入的隊列類型就決定了線程池在處理這些任務時的策略,具體類型會在下文中介紹;
threadFactory,線程的工廠對象,線程池通過該對象創建線程。我們可以通過傳入自定義的實現了ThreadFactory接口的類來修改線程的創建邏輯,可以不傳,默認使用Executors.defaultThreadFactory()作為默認的線程工廠;
handler,拒絕策略,在線程池無法執行或保存新提交的任務時進行處理的對象,常用的有以下幾種策略類:
ThreadPoolExecutor.AbortPolicy,默認策略,行為是直接拋出RejectedExecutionException異常
ThreadPoolExecutor.CallerRunsPolicy,用調用者所在的線程來執行任務
ThreadPoolExecutor.DiscardOldestPolicy,丟棄阻塞隊列中最早提交的任務,并重試execute方法
ThreadPoolExecutor.DiscardPolicy,靜默地直接丟棄任務,不返回任何錯誤
看到這里可能大部分讀者并不能理解每個參數具體的作用,接下來我們就通過線程池源代碼中使用了這些參數配置的代碼來深入理解每一個參數的意義。
execute方法的實現我們一般會使用execute方法提交我們的任務,那么線程池在這個過程中做了什么呢?在ThreadPoolExecutor類的execute()方法的源代碼中,我們主要做了四件事:
如果當前線程池中的線程數小于核心線程數corePoolSize,則通過threadFactory創建一個新的線程,并把入參中的任務作為第一個任務傳入該線程;
如果當前線程池中的線程數已經達到了核心線程數corePoolSize,那么就會通過阻塞隊列workerQueue的offer方法來將任務添加到隊列中保存,并等待線程空閑后進行執行;
如果線程數已經達到了corePoolSize且阻塞隊列中無法插入該任務(比如已滿),那么線程池就會再增加一個線程來執行該任務,除非線程數已經達到了最大線程數maximumPoolSize;
如果確實已經達到了最大線程數,那么就會通過拒絕策略對象handler拒絕這個任務。
總體上的執行流程如下,左側的實心黑點代表流程開始,下方的黑色同心圓代表流程結束:
上面提到了線程池構造器參數中除了超時時間之外的所有參數的作用,相信大家根據上面的流程已經可以理解每個參數的意義了。但是有一個名詞我們還一直沒有深入講解,那就是阻塞隊列的含義。
線程池中的阻塞隊列線程池中的阻塞隊列專門用于存放需要等待線程空閑的待執行任務,而阻塞隊列是這樣的一種數據結構,它是一個隊列(類似于一個List),可以存放0到N個元素。我們可以對這個隊列進行插入和彈出元素的操作,彈出操作可以理解為是一個獲取并從隊列中刪除一個元素的操作。當隊列中沒有元素時,對這個隊列的獲取操作將會被阻塞,直到有元素被插入時才會被喚醒;當隊列已滿時,對這個隊列的插入操作將會被阻塞,直到有元素被彈出后才會被喚醒。
這樣的一種數據結構非常適合于線程池的場景,當一個工作線程沒有任務可處理時就會進入阻塞狀態,直到有新任務提交后才被喚醒。
在線程池中,不同的阻塞隊列類型會被線程池的行為產生不同的影響,下面是三種我們最常用的阻塞隊列類型:
直連隊列,以SynchronousQueue類為代表,隊列不會存儲任何任務。當有任務提交線程試圖向隊列中添加待執行任務時會被阻塞,直到有任務處理線程試圖從隊列中獲取待執行任務時會與阻塞狀態中的任務提交線程發生直接聯系,由任務提交線程把任務直接交給任務執行線程;
無界隊列,以LinkedBlockingQueue類為代表,隊列中可以存儲無限數量的任務。這種隊列永遠不會因為隊列已滿導致任務放入隊列失敗,所以結合前面介紹的流程我們可以發現,當使用無界隊列時,線程池中的線程最多只能達到核心線程數就不會再增長了,最大線程數maximumPoolSize參數不會產生作用;
有界隊列,以ArrayBlockingQueue類為代表,可以保存固定數量的任務。這種隊列在實踐中比較常用,因為它既不會因為保存太多任務導致資源消耗過多(無界隊列),又不會因為任務提交線程被阻塞而影響到系統的性能(直連隊列)。總體上來說,有界隊列在實際效果上比較均衡。
閱讀execute方法的源碼在IDE中,例如IDEA里,我們可以點擊我們樣例代碼里的ThreadPoolExecutor類跳轉到JDK中ThreadPoolExecutor類的源代碼。在源代碼中我們可以看到很多java.util.concurrent包的締造者大牛“Doug Lea”所留下的各種注釋,下面的圖片就是該類源代碼的一個截圖。
這些注釋的內容非常有參考價值,建議有能力的讀者朋友可以自己閱讀一遍。下面,我們就一步步地抽絲剝繭,來揭開線程池類ThreadPoolExecutor源代碼的神秘面紗。不過這一步并不是必須的,可以跳過。
下面是ThreadPoolExecutor中execute方法帶有中文解釋的源代碼,有興趣的朋友可以和上面的流程對照起來參考一下:
public void execute(Runnable command) { // 檢查提交的任務是否為空 if (command == null) throw new NullPointerException(); // 獲取控制變量值 int c = ctl.get(); // 檢查當前線程數是否達到了核心線程數 if (workerCountOf(c) < corePoolSize) { // 未達到核心線程數,則創建新線程 // 并將傳入的任務作為該線程的第一個任務 if (addWorker(command, true)) // 添加線程成功則直接返回,否則繼續執行 return; // 因為前面調用了耗時操作addWorker方法 // 所以線程池狀態有可能發生了改變,重新獲取狀態值 c = ctl.get(); } // 判斷線程池當前狀態是否是運行中 // 如果是則調用workQueue.offer方法將任務放入阻塞隊列 if (isRunning(c) && workQueue.offer(command)) { // 因為執行了耗時操作“放入阻塞隊列”,所以重新獲取狀態值 int recheck = ctl.get(); // 如果當前狀態不是運行中,則將剛才放入阻塞隊列的任務拿出,如果拿出成功,則直接拒絕這個任務 if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) // 如果線程池中沒有線程了,那就創建一個 addWorker(null, false); } // 如果放入阻塞隊列失敗(如隊列已滿),則添加一個線程 else if (!addWorker(command, false)) // 如果添加線程失敗(如已經達到了最大線程數),則拒絕任務 reject(command); }
在這段源代碼中,我們可以看到,線程池是通過addWorker方法來創建線程的,這里的這個Worker指的就是ThreadPoolExecutor類中用來對線程進行包裝和管理的Worker類對象。如果想了解Worker類的具體執行流程可以閱讀一下下一篇深入剖析線程池的任務執行流程的文章。
超時時間那么還有一個我們沒有提到的超時時間在這個過程中發揮了什么作用呢?從前面我們可以看出,線程數量被劃分為了核心線程數和最大線程數。當線程沒有任務可執行時會阻塞在從隊列中獲取新任務這個操作上,這時我們稱這個線程為空閑線程,一旦有新任務被提交,則該線程就會退出阻塞狀態并開始執行這個新任務。
如果當前線程池中的線程總數大于核心線程數,那么只要有線程的空閑時間超過了超時時間,那么這個線程就會被銷毀;如果線程池中的線程總數小于等于核心線程數,那么超時線程就不會被銷毀了(除了一些特殊情況外)。這也就是超時時間參數所發揮的作用了。
其他線程池操作 關閉線程池在之前使用線程池執行任務的代碼中為了等待線程池中的所有任務執行完已經使用了shutdown()方法,這是關閉線程池的一種方法。對于ThreadPoolExecutor,關閉線程池的方法主要有兩個:
shutdown(),有序關閉線程池,調用后線程池會讓已經提交的任務完成執行,但是不會再接受新任務。
shutdownNow(),直接關閉線程池,線程池中正在運行的任務會被中斷,正在等待執行的任務不會再被執行,但是這些還在阻塞隊列中等待的任務會被作為返回值返回。
監控線程池運行狀態我們可以通過調用線程池對象上的一些方法來獲取線程池當前的運行信息,常用的方法有:
getTaskCount,線程池中已完成、執行中、等待執行的任務總數估計值。因為在統計過程中任務會發生動態變化,所以最后的結果并不是一個準確值;
getCompletedTaskCount,線程池中已完成的任務總數,這同樣是一個估計值;
getLargestPoolSize,線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否充滿過,也就是達到過maximumPoolSize;
getPoolSize,線程池當前的線程數量;
getActiveCount,當前線程池中正在執行任務的線程數量估計值。
四種常用線程池很多情況下我們也不會直接創建ThreadPoolExecutor類的對象,而是根據需要通過Executors的幾個靜態方法來創建特定用途的線程池。目前常用的線程池有四種:
可緩存線程池,使用Executors.newCachedThreadPool方法創建
定長線程池,使用Executors.newFixedThreadPool方法創建
延時任務線程池,使用Executors.newScheduledThreadPool方法創建
單線程線程池,使用Executors.newSingleThreadExecutor方法創建
下面通過這些靜態方法的源碼來具體了解一下不同類型線程池的特性與適用場景。
可緩存線程池JDK中的源碼我們通過在IDE中進行跳轉可以很方便地進行查看,下面就是Executors.newCachedThreadPool方法中的源代碼。從代碼中我們可以看到,可緩存線程池其實也是通過直接創建ThreadPoolExecutor類的構造器創建的,只是其中的參數都已經被設置好了,我們可以不用做具體的設置。所以我們要觀察的重點就是在這個方法中具體產生了一個怎樣配置的ThreadPoolExecutor對象,以及這樣的線程池適用于怎樣的場景。
從下面的代碼中,我們可以看到,傳入ThreadPoolExecutor構造器的值有:
- corePoolSize核心線程數為0,代表線程池中的線程數可以為0 - maximumPoolSize最大線程數為Integer.MAX_VALUE,代表線程池中最多可以有無限多個線程 - 超時時間設置為60秒,表示線程池中的線程在空閑60秒后會被回收 - 最后傳入的是一個`SynchronousQueue`類型的阻塞隊列,代表每一個新添加的任務都要馬上有一個工作線程進行處理
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
所以可緩存線程池在添加任務時會優先使用空閑的線程,如果沒有就創建一個新線程,線程數沒有上限,所以每一個任務都會馬上被分配到一個工作線程進行執行,不需要在阻塞隊列中等待;如果線程池長期閑置,那么其中的所有線程都會被銷毀,節約系統資源。
優點
任務在添加后可以馬上執行,不需要進入阻塞隊列等待
在閑置時不會保留線程,可以節約系統資源
缺點
對線程數沒有限制,可能會過量消耗系統資源
適用場景
適用于大量短耗時任務和對響應時間要求較高的場景
定長線程池傳入ThreadPoolExecutor構造器的值有:
corePoolSize核心線程數和maximumPoolSize最大線程數都為固定值nThreads,即線程池中的線程數量會保持在nThreads,所以被稱為“定長線程池”
超時時間被設置為0毫秒,因為線程池中只有核心線程,所以不需要考慮超時釋放
最后一個參數使用了無界隊列,所以在所有線程都在處理任務的情況下,可以無限添加任務到阻塞隊列中等待執行
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
定長線程池中的線程數會逐步增長到nThreads個,并且在之后空閑線程不會被釋放,線程數會一直保持在nThreads個。如果添加任務時所有線程都處于忙碌狀態,那么就會把任務添加到阻塞隊列中等待執行,阻塞隊列中任務的總數沒有上限。
優點
線程數固定,對系統資源的消耗可控
缺點
在任務量暴增的情況下線程池不會彈性增長,會導致任務完成時間延遲
使用了無界隊列,在線程數設置過小的情況下可能會導致過多的任務積壓,引起任務完成時間過晚和資源被過度消耗的問題
適用場景
任務量峰值不會過高,且任務對響應時間要求不高的場景
延時任務線程池與之前的兩個方法不同,Executors.newScheduledThreadPool返回的是ScheduledExecutorService接口對象,可以提供延時執行、定時執行等功能。在線程池配置上有如下特點:
maximumPoolSize最大線程數為無限,在任務量較大時可以創建大量新線程執行任務
超時時間為0,線程空閑后會被立即銷毀
使用了延時工作隊列,延時工作隊列中的元素都有對應的過期時間,只有過期的元素才會被彈出
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
延時任務線程池實現了ScheduledExecutorService接口,主要用于需要延時執行和定時執行的情況。
單線程線程池單線程線程池中只有一個工作線程,可以保證添加的任務都以指定順序執行(先進先出、后進先出、優先級)。但是如果線程池里只有一個線程,為什么我們還要用線程池而不直接用Thread呢?這種情況下主要有兩種優點:一是我們可以通過共享的線程池很方便地提交任務進行異步執行,而不用自己管理線程的生命周期;二是我們可以使用任務隊列并指定任務的執行順序,很容易做到任務管理的功能。
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue總結())); }
在這篇文章中我們從線程池的概念和基本使用方法說起,通過execute方法的源碼深入剖析了任務提交的全過程和各個線程池構造器參數在線程池實際運行過程中所發揮的作用,還真正閱讀了線程池類ThreadPoolExecutor的execute方法的源代碼。最后,我們介紹了線程池的其他常用操作和四種常用的線程池。
到這里我們的線程池源代碼之旅就結束了,希望大家在看完這篇文章之后能對線程池的使用和運行流程有了一個大概的印象。為什么說只是有了一個大概的印象呢?因為我覺得很多沒有相關基礎的讀者讀到這里可能還只是對線程池有了一個自己的認識,對其中的一些細節可能還沒有完全捕捉到。所以我建議大家在看完這篇文章后不妨再返回到文章的開頭多讀幾遍,相信第二遍的閱讀能給大家帶來不一樣的體驗,因為我自己也是在第三次讀ThreadPoolExecutor類的源代碼時才真正打通了其中的一些重要關節的。
引子在這篇文章中,我們還只是探究了線程池的基本使用方法,以及提交任務方法execute的源代碼。那么在任務提交以后是怎么被線程池所執行的呢?在下一篇文章中我們就可以找到答案,在下一篇文章中,我們會深入剖析線程池的任務執行流程。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/73860.html
摘要:高并發系列第篇文章。簡單的說,在使用了線程池之后,創建線程變成了從線程池中獲取一個空閑的線程,然后使用,關閉線程變成了將線程歸還到線程池。如果調用了線程池的方法,線程池會提前把核心線程都創造好,并啟動線程池允許創建的最大線程數。 java高并發系列第18篇文章。 本文主要內容 什么是線程池 線程池實現原理 線程池中常見的各種隊列 自定義線程創建的工廠 常見的飽和策略 自定義飽和策略 ...
摘要:那么線程池到底是怎么利用類來實現持續不斷地接收提交的任務并執行的呢接下來,我們通過的源代碼來一步一步抽絲剝繭,揭開線程池運行模型的神秘面紗。 在上一篇文章《從0到1玩轉線程池》中,我們了解了線程池的使用方法,以及向線程池中提交任務的完整流程和ThreadPoolExecutor.execute方法的源代碼。在這篇文章中,我們將會從頭閱讀線程池ThreadPoolExecutor類的源代...
摘要:從使用到原理學習線程池關于線程池的使用,及原理分析分析角度新穎面向切面編程的基本用法基于注解的實現在軟件開發中,分散于應用中多出的功能被稱為橫切關注點如事務安全緩存等。 Java 程序媛手把手教你設計模式中的撩妹神技 -- 上篇 遇一人白首,擇一城終老,是多么美好的人生境界,她和他歷經風雨慢慢變老,回首走過的點點滴滴,依然清楚的記得當初愛情萌芽的模樣…… Java 進階面試問題列表 -...
閱讀 826·2021-11-22 11:59
閱讀 3230·2021-11-17 09:33
閱讀 2308·2021-09-29 09:34
閱讀 1941·2021-09-22 15:25
閱讀 1955·2019-08-30 15:55
閱讀 1321·2019-08-30 15:55
閱讀 530·2019-08-30 15:53
閱讀 3346·2019-08-29 13:55