摘要:支持通過調整構造參數來配置不同的處理策略,本文主要介紹常用的策略配置方法以及應用場景。對于這種場景,我們可以設置使用帶有長度限制的隊列以及限定最大線程個數的線程池,同時通過設置處理任務被拒絕的情況。
ThreadPoolExecutor 是用來處理異步任務的一個接口,可以將其理解成為一個線程池和一個任務隊列,提交到 ExecutorService 對象的任務會被放入任務隊或者直接被線程池中的線程執行。ThreadPoolExecutor 支持通過調整構造參數來配置不同的處理策略,本文主要介紹常用的策略配置方法以及應用場景。
ThreadPoolExecutor 的處理邏輯首先看一下 ThreadPoolExecutor 構造函數的定義:
public ThreadPoolExecutor(int corePoolSize, //線程池核心線程數量 int maximumPoolSize, //線程池最大線程數量 long keepAliveTime, //線程KeepAlive時間,當線程池數量超過核心線程數量以后,idle時間超過這個值的線程會被終止 TimeUnit unit, //線程KeepAlive時間單位 BlockingQueueworkQueue, //任務隊列 ThreadFactory threadFactory, //創建線程的工廠對象 RejectedExecutionHandler handler) //任務被拒絕后調用的handler
ThreadPoolExecutor 對線程池和隊列的使用方式如下:
從線程池中獲取可用線程執行任務,如果沒有可用線程則使用ThreadFactory創建新的線程,直到線程數達到corePoolSize限制
線程池線程數達到corePoolSize以后,新的任務將被放入隊列,直到隊列不能再容納更多的任務
當隊列不能再容納更多的任務以后,會創建新的線程,直到線程數達到maxinumPoolSize限制
線程數達到maxinumPoolSize限制以后新任務會被拒絕執行,調用 RejectedExecutionHandler 進行處理
三種常用的 ThreadPoolExecutorExecutors 是提供了一組工廠方法用于創建常用的 ExecutorService ,分別是 FixedThreadPool,CachedThreadPool 以及 SingleThreadExecutor。這三種ThreadPoolExecutor都是調用 ThreadPoolExecutor 構造函數進行創建,區別在于參數不同。
FixedThreadPool - 線程池大小固定,任務隊列無界下面是 Executors 類 newFixedThreadPool 方法的源碼:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); }
可以看到 corePoolSize 和 maximumPoolSize 設置成了相同的值,此時不存在線程數量大于核心線程數量的情況,所以KeepAlive時間設置不會生效。任務隊列使用的是不限制大小的 LinkedBlockingQueue ,由于是無界隊列所以容納的任務數量沒有上限。
因此,FixedThreadPool的行為如下:
從線程池中獲取可用線程執行任務,如果沒有可用線程則使用ThreadFactory創建新的線程,直到線程數達到nThreads
線程池線程數達到nThreads以后,新的任務將被放入隊列
FixedThreadPool的優點是能夠保證所有的任務都被執行,永遠不會拒絕新的任務;同時缺點是隊列數量沒有限制,在任務執行時間無限延長的這種極端情況下會造成內存問題。
SingleThreadExecutor - 線程池大小固定為1,任務隊列無界public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); }
這個工廠方法中使用無界LinkedBlockingQueue,并的將線程數設置成1,除此以外還使用FinalizableDelegatedExecutorService類進行了包裝。這個包裝類的主要目的是為了屏蔽ThreadPoolExecutor中動態修改線程數量的功能,僅保留ExecutorService中提供的方法。雖然是單線程處理,一旦線程因為處理異常等原因終止的時候,ThreadPoolExecutor會自動創建一個新的線程繼續進行工作。
SingleThreadExecutor 適用于在邏輯上需要單線程處理任務的場景,同時無界的LinkedBlockingQueue保證新任務都能夠放入隊列,不會被拒絕;缺點和FixedThreadPool相同,當處理任務無限等待的時候會造成內存問題。
CachedThreadPool - 線程池無限大(MAX INT),等待隊列長度為1public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); }
SynchronousQueue是一個只有1個元素的隊列,入隊的任務需要一直等待直到隊列中的元素被移出。核心線程數是0,意味著所有任務會先入隊列;最大線程數是Integer.MAX_VALUE,可以認為線程數量是沒有限制的。KeepAlive時間被設置成60秒,意味著在沒有任務的時候線程等待60秒以后退出。CachedThreadPool對任務的處理策略是提交的任務會立即分配一個線程進行執行,線程池中線程數量會隨著任務數的變化自動擴張和縮減,在任務執行時間無限延長的極端情況下會創建過多的線程。
三種ExecutorService特性總結類型 | 核心線程數 | 最大線程數 | Keep Alive 時間 | 任務隊列 | 任務處理策略 |
---|---|---|---|---|---|
FixedThreadPool | 固定大小 | 固定大小(與核心線程數相同) | 0 | LinkedBlockingQueue | 線程池大小固定,沒有可用線程的時候任務會放入隊列等待,隊列長度無限制 |
SingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue | 與 FixedThreadPool 相同,區別在于線程池的大小為1,適用于業務邏輯上只允許1個線程進行處理的場景 |
CachedThreadPool | 0 | Integer.MAX_VALUE | 1分鐘 | SynchronousQueue | 線程池的數量無限大,新任務會直接分配或者創建一個線程進行執行 |
我們也可以通過修改 ThreadPoolExecutor 的構造函數來自定義任務處理策略。例如面對的業務是將數據異步寫入HBase,當HBase嚴重超時的時候允許寫入失敗并記錄日志以便事后補寫。對于這種應用場景,如果使用FixedThreadPool,在HBase服務嚴重超時的時候會導致隊列無限增長,引發內存問題;如果使用CachedThreadPool,會導致線程數量無限增長。對于這種場景,我們可以設置ExecutorService使用帶有長度限制的隊列以及限定最大線程個數的線程池,同時通過設置RejectedExecutionHandler處理任務被拒絕的情況。
首先定義 RejectedExecutionHandler:
public class MyRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 處理任務被拒絕的情況,例如記錄日志等 } }
創建 ThreadPoolExecutor:
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 10, //核心線程數設置成10 30, //線程池最大線程數為30 30, TimeUnit.SECONDS, //超過核心線程數量的線程idle 30秒之后會退出 new ArrayBlockingQueue(100), //隊列長度為100 new MyRejectedExecutionHandler() //任務被拒絕以后的處理類 );
這樣設置以后,如果任務處理函數出現長時間掛起的情況,會依次發生下列現象:
線程池線程數量達到核心線程數,向ArrayBlockingQueue中放入任務
ArrayBlockingQueue達到上限,創建新的線程進行處理
線程池中的線程數量達到30個,調用MyRejectedExecutionHandler處理新提交的任務
總結對于需要保證所有提交的任務都要被執行的情況,使用FixedThreadPool
如果限定只能使用一個線程進行任務處理,使用SingleThreadExecutor
如果希望提交的任務盡快分配線程執行,使用CachedThreadPool
如果業務上允許任務執行失敗,或者任務執行過程可能出現執行時間過長進而影響其他業務的應用場景,可以通過使用限定線程數量的線程池以及限定長度的隊列進行容錯處理。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69864.html
摘要:限制線程池運行線程以及等待線程數量的策略對于所提供的,可以保證可以在內存中有固定數量的線程數運行。指的是當線程池拒絕該任務的時候,線程在本地線程直接。由此限制了線程池的等待線程數與執行線程數 限制Java線程池運行線程以及等待線程數量的策略 對于java.util.concurrent.Executors所提供的FixedThreadPool,可以保證可以在內存中有固定數量的線程數運行...
摘要:去美團面試,問到了什么是線程池,如何使用,為什么要用以下做個總結。二線程池線程池的作用線程池作用就是限制系統中執行線程的數量。真正的線程池接口是。創建固定大小的線程池。此線程池支持定時以及周期性執行任務的需求。 去美團面試,問到了什么是線程池,如何使用,為什么要用,以下做個總結。關于線程之前也寫過一篇文章《高級面試題總結—線程池還能這么玩?》 1、什么是線程池:? java.util...
摘要:去美團面試,問到了什么是線程池,如何使用,為什么要用以下做個總結。二線程池線程池的作用線程池作用就是限制系統中執行線程的數量。真正的線程池接口是。創建固定大小的線程池。此線程池支持定時以及周期性執行任務的需求。 去美團面試,問到了什么是線程池,如何使用,為什么要用,以下做個總結。關于線程之前也寫過一篇文章《高級面試題總結—線程池還能這么玩?》 1、什么是線程池:? java.util...
閱讀 2371·2021-11-24 10:31
閱讀 3426·2021-11-23 09:51
閱讀 2239·2021-11-15 18:11
閱讀 2386·2021-09-02 15:15
閱讀 2452·2019-08-29 17:02
閱讀 2284·2019-08-29 15:04
閱讀 830·2019-08-29 12:27
閱讀 2853·2019-08-28 18:15