摘要:線程池主要解決兩個問題一是當執行大量異步任務時線程池能夠提供很好的性能。二是線程池提供了一種資源限制和管理的手段,比如可以限制現成的個數,動態新增線程等。該方法返回一個對象,可指定線程池線程數量。
什么是線程池?
為了避免頻繁重復的創建和銷毀線程,我們可以讓這些線程進行復用,在線程池中,總會有活躍的線程在占用,但是線程池中也會存在沒有占用的線程,這些線程處于空閑狀態,當有任務的時候會從池子里面拿去一個線程來進行使用,當完成工作后,并沒有銷毀線程,而是將將線程放回到池子中去。
線程池主要解決兩個問題:一是當執行大量異步任務時線程池能夠提供很好的性能。
二是線程池提供了一種資源限制和管理的手段,比如可以限制現成的個數,動態新增線程等。
? -《Java并發編程之美》
上面內容出自《Java并發編程之美》這本書,第一個問題上面已經提到過,線程的頻繁創建和銷毀是很損耗性能的,但是線程池中的線程是可以復用的,可以較好的提升性能問題,線程池內部是采用了阻塞隊列來維護Runnable對象。
原理分析JDK為我們封裝了一套操作多線程的框架Executors,幫助我們可以更好的控制線程池,Executors下提供了一些線程池的工廠方法:
newFixedThreadPool:返回固定長度的線程池,線程池中的線程數量是固定的。
newCacheThreadPool:該方法返回一個根據實際情況來進行調整線程數量的線程池,空余線程存活時間是60s
newSingleThreadExecutor:該方法返回一個只有一個線程的線程池。
newSingleThreadScheduledExecutor:該方法返回一個SchemeExecutorService對象,線程池大小為1,SchemeExecutorService接口在ThreadPoolExecutor類和 ExecutorService接口之上的擴展,在給定時間執行某任務。
newSchemeThreadPool:該方法返回一個SchemeExecutorService對象,可指定線程池線程數量。
對于核心的線程池來說,它內部都是使用了ThreadPoolExecutor對象來實現的,只不過內部參數信息不一樣,我們先來看兩個例子:nexFixedThreadPool和newSingleThreadExecutor如下所示:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue ())); }
由上面的線程池的創建過程可以看到它們都是ThreadPoolExecutor的封裝,接下來我們來看一下ThreadPoolExecutor的參數說明:
參數名稱 | 參數描述 |
---|---|
corePoolSize | 指定線程池線程的數量 |
maximumPoolSize | 指定線程池中線程的最大數量 |
keepAliveTime | 當線程池線程的數量超過corePoolSize的時候,多余的空閑線程存活的時間,如果超過了corePoolSize,在keepAliveTime的時間之后,銷毀線程 |
unit | keepAliveTime的單位 |
workQueue | 工作隊列,將被提交但尚未執行的任務緩存起來 |
threadFactory | 線程工廠,用于創建線程,不指定為默認線程工廠DefaultThreadFactory |
handler | 拒絕策略 |
其中workQueue代表的是提交但未執行的隊列,它是BlockingQueue接口的對象,用于存放Runable對象,主要分為以下幾種類型:
直接提交的隊列:SynchronousQueue隊列,它是一個沒有容量的隊列,前面我有對其進行講解,當線程池進行入隊offer操作的時候,本身是無容量的,所以直接返回false,并沒有保存下來,而是直接提交給線程來進行執行,如果沒有空余的線程則執行拒絕策略。
有界的任務隊列:可以使用ArrayBlockingQueue隊列,因為它內部是基于數組來進行實現的,初始化時必須指定容量參數,當使用有界任務隊列時,當有任務進行提交時,線程池的線程數量小于corePoolSize則創建新的線程來執行任務,當線程池的線程數量大于corePoolSize的時候,則將提交的任務放入到隊列中,當提交的任務塞滿隊列后,如果線程池的線程數量沒有超過maximumPoolSize,則創建新的線程執行任務,如果超過了maximumPoolSize則執行拒絕策略。
無界的任務隊列:可以使用LinkedBlockingQueue隊列,它內部是基于鏈表的形式,默認隊列的長度是Integer.MAX_VALUE,也可以指定隊列的長度,當隊列滿時進行阻塞操作,當然線程池中采用的是offer方法并不會阻塞線程,當隊列滿時則返回false,入隊成功則則返回true,當使用LinkedBlockingQueue隊列時,有任務提交到線程池時,如果線程池的數量小于corePoolSize,線程池會產生新的線程來執行任務,當線程池的線程數量大于corePoolSize時,則將提交的任務放入到隊列中,等待執行任務的線程執行完之后進行消費隊列中的任務,若后續仍有新的任務提交,而沒有空閑的線程時,它會不斷往隊列中入隊提交的任務,直到資源耗盡。
優先任務隊列:t有限任務隊列是帶有執行優先級的隊列,他可以使用PriorityBlockingQueue隊列,可以控制任務的執行先后順序,它是一個無界隊列,該隊列可以根據任務自身的優先級順序先后執行,在確保性能的同時,也能有很好的質量保證。
上面講解了關于線程池內部都是通過ThreadPoolExecutor來進行實現的,那么下面我以一個例子來進行源碼分析:
public class ThreadPoolDemo1 { public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(5), new CustomThreadFactory()); for (int i = 0; i < 15; i++) { executorService.execute(() -> { try { Thread.sleep(50000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("由線程:" + Thread.currentThread().getName() + "執行任務完成"); }); } } }
上面定義了一個線程池,線程池初始化的corePoolSize為5,也就是線程池中線程的數量為5,最大線程maximumThreadPoolSize為10,空余的線程存活的時間是60s,使用LinkedBlockingQueue來作為阻塞隊列,這里還發現我自定義了ThreadFactory線程池工廠,這里我真是針對線程創建的時候輸出線程池的名稱,源碼如下所示:
/** * 自定義的線程池構造工廠 */ public class CustomThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; public CustomThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } @Override public Thread newThread(Runnable r) { String name = namePrefix + threadNumber.getAndIncrement(); Thread t = new Thread(group, r, name, 0); System.out.println("線程池創建,線程名稱為:" + name); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
代碼和DefaultThreadFactory一樣,只是在newThread新建線程的動作的時候輸出了線程池的名稱,方便查看線程創建的時機,上面main方法中提交了15個任務,調用了execute方法來進行提交任務,在分析execute方法之前我們先了解一下線程的狀態:
//假設Integer類型是32位的二進制表示。 //高3位代表線程池的狀態,低29位代表的是線程池的數量 //默認是RUNNING狀態,線程池的數量為0 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //線程個數位數,表示的Integer中除去最高的3位之后剩下的位數表示線程池的個數 private static final int COUNT_BITS = Integer.SIZE - 3; //線程池的線程的最大數量 //這里舉例是32為機器,表示為00011111111111111111111111111111 private static final int CAPACITY = (1 << COUNT_BITS) - 1; //線程池的狀態 // runState is stored in the high-order bits //11100000000000000000000000000000 //接受新任務并且處理阻塞隊列里面任務 private static final int RUNNING = -1 << COUNT_BITS; //00000000000000000000000000000000 //拒絕新任務但是處理阻塞隊列的任務 private static final int SHUTDOWN = 0 << COUNT_BITS; //00100000000000000000000000000000 //拒接新任務并且拋棄阻塞隊列里面的任務,同時會中斷正在處理的任務 private static final int STOP = 1 << COUNT_BITS; //01000000000000000000000000000000 //所有任務都執行完(包括阻塞隊列中的任務)后當線程池活動線程數為0,將要調用terminated方法。 private static final int TIDYING = 2 << COUNT_BITS; //01100000000000000000000000000000 //終止狀態,terminated方法調用完成以后的狀態 private static final int TERMINATED = 3 << COUNT_BITS;
通過上面內容可以看到ctl其實存放的是線程池的狀態和線程數量的變量,默認是RUNNING,也就是11100000000000000000000000000000,這里我們來假設運行的機器上的Integer的是32位的,因為有些機器上可能Integer并不是32位,下面COUNT_BITS來控制位數,也就是先獲取Integer在該平臺上的位數,比如說是32位,然后32位-3位=29位,也就是低29位代表的是現成的數量,高3位代表線程的狀態,可以清晰看到下面的線程池的狀態都是通過低位來進行向左位移的操作的,除了上面的變量,還提供了操作線程池狀態的方法:
// 操作ctl變量,主要是進行分解或組合線程數量和線程池狀態。 // 獲取高3位,獲取線程池狀態 private static int runStateOf(int c) { return c & ~CAPACITY; } // 獲取低29位,獲取線程池中線程的數量 private static int workerCountOf(int c) { return c & CAPACITY; } // 組合ctl變量,rs=runStatue代表的是線程池的狀態,wc=workCount代表的是線程池線程的數量 private static int ctlOf(int rs, int wc) { return rs | wc; } /* * Bit field accessors that don"t require unpacking ctl. * These depend on the bit layout and on workerCount being never negative. */ //指定的線程池狀態c小于狀態s private static boolean runStateLessThan(int c, int s) { return c < s; } //指定的線程池狀態c至少是狀態s private static boolean runStateAtLeast(int c, int s) { return c >= s; } // 判斷線程池是否運行狀態 private static boolean isRunning(int c) { return c < SHUTDOWN; } /** * CAS增加線程池線程數量. */ private boolean compareAndIncrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect + 1); } /** * CAS減少線程池線程數量 */ private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } /** * 將線程池的線程數量進行較少操作,如果競爭失敗直到競爭成功為止。 */ private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); }
下來我們看一下ThreadPoolExecutor對象下的execute方法:
public void execute(Runnable command) { // 判斷提交的任務是不是為空,如果為空則拋出NullPointException異常 if (command == null) throw new NullPointerException(); // 獲取線程池的狀態和線程池的數量 int c = ctl.get(); // 如果線程池的數量小于corePoolSize,則進行添加線程執行任務 if (workerCountOf(c) < corePoolSize) { //添加線程修改線程數量并且將command作為第一個任務進行處理 if (addWorker(command, true)) return; // 獲取最新的狀態 c = ctl.get(); } // 如果線程池的狀態是RUNNING,將命令添加到隊列中 if (isRunning(c) && workQueue.offer(command)) { //二次檢查線程池狀態和線程數量 int recheck = ctl.get(); //線程不是RUNNING狀態,從隊列中移除當前任務,并且執行拒絕策略。 //這里說明一點,只有RUNNING狀態的線程池才會接受新的任務,其余狀態全部拒絕。 if (! isRunning(recheck) && remove(command)) reject(command); //如果線程池的線程數量為空時,代表線程池是空的,添加一個新的線程。 else if (workerCountOf(recheck) == 0) addWorker(null, false); } //如果隊列是滿的,或者是SynchronousQueue隊列時,則直接添加新的線程執行任務,如果添加失敗則進行拒絕 //可能線程池的線程數量大于maximumPoolSize則采取拒絕策略。 else if (!addWorker(command, false)) reject(command); }
通過分析execute方法總結以下幾點:
當線程池中線程的數量小于corePoolSize時,直接添加線程到線程池并且將當前任務做為第一個任務執行。
如果線程池的狀態的是RUNNING,則可以接受任務,將任務放入到阻塞隊列中,內部進行二次檢查,有可能在運行下面內容時線程池狀態已經發生了變化,在這個時候如果線程池狀態變成不是RUNNING,則將當前任務從隊列中移除,并且進行拒絕策略。
如果阻塞隊列已經滿了或者SynchronousQueue這種特殊隊列無空間的時候,直接添加新的線程執行任務,當線程池的線程數量大于maximumPoolSize時相應拒絕策略。
入隊操作用的是offer方法,該方法不會阻塞隊列,如果隊列已經滿時或超時導致入隊失敗,返回false,如果入隊成功返回true。
針對上面例子源碼我們來做一下分析,我們源碼中阻塞隊列采用的是ArrayBlockingQueue隊列,并且指定隊列的長度是5,我們看下面提交的線程池的任務是15個,而且corePoolSize設置的是5個核心線程,最大線程數(maximumPoolSzie)是10個(包括核心線程數),假設所有任務都同時提交到了線程池中,其中有5個任務會被提交到線程中作為第一個任務進行執行,會有5個任務被添加到阻塞隊列中,還有5個任務提交到到線程池中的時候發現阻塞隊列已經滿了,這時候會直接提交任務,發現當前線程數是5小于最大線程數,可以進行新建線程來執行任務。
1.png部提交,因為我們在任務中添加了Thread.sleep睡眠一會,在for循環結束提交任務之后可能才會結束掉任務的睡眠執行任務后面內容,所以可以看做是全部提交任務,但是沒有任務完成,如果有任務完成的話,可能就不會是觸發最大的線程數,有可能就是一個任務完成后從隊列取出來,然后另一個任務來的時候可以添加到隊列中,上圖中可以看到,有5個核心core線程在執行任務,任務隊列中有5個任務在等待空余線程執行,而還有5個正在執行的線程,核心線程是指在corePoolSize范圍的線程,而非核心線程指的是大于corePoolSize但是小于等于MaximumPoolSize的線程,就是這些非核心線程并不是一直存活的線程,它會跟隨線程池指定的參數來進行銷毀,我們這里指定了60s后如果沒有任務提交,則會進行銷毀操作,當然工作線程并不指定那些線程必須回收那些線程就必須保留,是根據從隊列中獲取任務來決定,如果線程獲取任務時發現線程池中的線程數量大于corePoolSize,并且阻塞隊列中為空時,則阻塞隊列會阻塞60s后如果還有沒有任務就返回false,這時候會釋放線程,調用processWorkerExit來處理線程的退出,接下來我們來分析下addWorker都做了什么內容:
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { //獲取線程池的狀態和線程池線程的數量 int c = ctl.get(); //多帶帶獲取線程池的狀態 int rs = runStateOf(c); //檢查隊列是否只在必要時為空 if (rs >= SHUTDOWN && //線程池的狀態是SHUTDOWN、STOP、TIDYING、TERMINATED ! (rs == SHUTDOWN && //可以看做是rs!=SHUTDOWN,線程池狀態為STOP、TIDYING、TERMINATED firstTask == null && //可以看做firstTask!=null,并且rs=SHUTDOWN ! workQueue.isEmpty())) //可以看做rs=SHUTDOWN,并且workQueue.isEmpty()隊列為空 return false; //循環CAS增加線程池中線程的個數 for (;;) { //獲取線程池中線程個數 int wc = workerCountOf(c); //如果線程池線程數量超過最大線程池數量,則直接返回 if (wc >= CAPACITY || //如果指定使用corePoolSize作為限制則使用corePoolSize,反之使用maximumPoolSize,最為工作線程最大線程線程數量,如果工作線程大于相應的線程數量則直接返回。 wc >= (core ? corePoolSize : maximumPoolSize)) return false; //CAS增加線程池中線程的數量 if (compareAndIncrementWorkerCount(c)) //跳出增加線程池數量。 break retry; //如果修改失敗,則重新獲取線程池的狀態和線程數量 c = ctl.get(); // Re-read ctl //如果最新的線程池狀態和原有縣城出狀態不一樣時,則跳轉到外層retry中,否則在內層循環重新進行CAS if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //工作線程是否開始啟動標志 boolean workerStarted = false; //工作線程添加到線程池成功與否標志 boolean workerAdded = false; Worker w = null; try { //創建一個Worker對象 w = new Worker(firstTask); //獲取worker中的線程,這里線程是通過ThreadFactory線程工廠創建出來的,詳細看下面源碼信息。 final Thread t = w.thread; //判斷線程是否為空 if (t != null) { //添加獨占鎖,為添加worker進行同步操作,防止其他線程同時進行execute方法。 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //獲取線程池的狀態 int rs = runStateOf(ctl.get()); //如果線程池狀態為RUNNING或者是線程池狀態為SHUTDOWN并且第一個任務為空時,當線程池狀態為SHUTDOWN時,是不允許添加新任務的,所以他會從隊列中獲取任務。 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); //添加worker到集合中 workers.add(w); int s = workers.size(); //跟蹤最大的線程池數量 if (s > largestPoolSize) largestPoolSize = s; //添加worker成功 workerAdded = true; } } finally { mainLock.unlock(); } //如果添加worker成功就啟動任務 if (workerAdded) { t.start(); workerStarted = true; } } } finally { //如果沒有啟動,w不為空就已出worker,并且線程池數量進行減少。 if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
通過上面addWorker方法可以分為兩個部分來進行講解,第一部分是對線程池中線程數量的通過CAS的方式進行增加,其中第一部分中上面有個if語句,這個地方著重分析下:
if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false;
可以看成下面的樣子,將!放到括號里面,變成下面的樣子:
if (rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null || workQueue.isEmpty())) return false;
線程池的狀態是SHUTDOWN、STOP、TIDYING、TERMINATED
當線程池狀態是STOP、TIDYING、TERMINATED時,這些狀態的時候不需要進行線程的添加和啟動操作,因為如果是上面的狀態,其實線程池的線程正在進行銷毀操作,意味著線程調用了shutdownNow等方法。
如果線程池狀態為SHUTDOWN并且第一個任務不為空時,不接受新的任務,直接返回false,也就是說SHUTDOWN的狀態,不會接受新任務,只會針對隊列中未完成的任務進行操作。
當線線程池狀態為SHUTDOWN并且隊列為空時,直接返回不進行任務添加。
上半部分分為內外兩個循環,外循環對線程池狀態的判斷,用于判斷是否需要添加工作任務線程,通過上面講的內容進行判斷,后面內循環則是通過CAS操作增加線程數,如果指定了core參數為true,代表線程池中線程的數量沒有超過corePoolSize,當指定為false時,代表線程池中線程數量達到了corePoolSize,并且隊列已經滿了,或者是SynchronousQueue這種無空間的隊列,但是還沒有達到最大的線程池maximumPoolSize,所以它內部會根據指定的core參數來判斷是否已經超過了最大的限制,如果超過了就不能進行添加線程了,并且進行拒絕策略,如果沒有超過就增加線程數量。
第二部分主要是把任務添加到worker中,并啟動線程,這里我們先來看一下Worker對象。
// 這里發現它是實現了AQS,是一個不可重入的獨占鎖模式 // 并且它還集成了Runable接口,實現了run方法。 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** 執行任務的線程,通過ThreadFactory創建 */ final Thread thread; /** 初始化第一個任務*/ Runnable firstTask; /** 每個線程完成任務的數量 */ volatile long completedTasks; /** * 首先現將state值設置為-1,因為在AQS中state=0代表的是鎖沒有被占用,而且在線程池中shutdown方法會判斷能否爭搶到鎖,如果可以獲得鎖則對線程進行中斷操作,如果調用了shutdownNow它會判斷state>=0會被中斷。 * firstTask第一個任務,如果為空則會從隊列中獲取任務,后面runWorker中。 */ Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** 委托調用外部的runWorker方法 */ public void run() { runWorker(this); } //是否獨占鎖 protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } //這里就是上面shutdownNow中調用的線程中斷的方法,getState()>=0 void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
可以看到Worker是一個實現了AQS的鎖,它是一個不可重入的獨占鎖,并且他也實現了Runnable接口,實現了run方法,在構造函數中將AQS的state設置為-1,為了避免線程還沒有進入runWorker方法前,就調用了shutdown或shutdownNow方法,會被中斷,設置為-1則不會被中斷。后面我們看到run方法,它調用的是ThreadPoolExecutor的runWorker方法,我們這里回想一下,在addWorker方法中,添加worker到HashSet
if (workerAdded) { t.start(); workerStarted = true; }
這個t代表的就是在Worker構造函數中的使用ThreadFactory創建的線程,并且將自己(Worker自己)傳遞了當前線程,創建的線程就是任務線程,任務線程啟動的時候會調用Worker下的run方法,run方法內部又委托給外部方法runWorker來進行操作,它的參數傳遞的是調用者自己,Worker中的run方法如下所示:
public void run() { runWorker(this); //this指Worker對象本身 }
這里簡單畫一張圖來表示下調用的邏輯。
2.png
整體的邏輯是先進行創建線程,線程將Worker設置為執行程序,并將線程塞到Worker中,然后再addWorker中將Worker中的線程取出來,進行啟動操作,啟動后他會調用Worker中的run方法,然后run方法中將調用ThreadPoolExecutor的runWorker,然后runWorker又會調用Worker中的任務firstTask,這個fistTask是要真正執行的任務,也是用戶自己實現的代碼邏輯。
接下來我們就要看一下runWorker方法里面具體內容:
final void runWorker(Worker w) { //調用者也就是Worker中的線程 Thread wt = Thread.currentThread(); //獲取Worker中的第一個任務 Runnable task = w.firstTask; //將Worker中的任務清除代表執行了第一個任務了,后面如果再有任務就從隊列中獲取。 w.firstTask = null; //這里還記的我們在new Worker的時候將AQS的state狀態設置為-1,這里先進行解鎖操作,將state設置為0 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { //循環進行獲取任務,如果第一個任務不為空,或者是如果第一個任務為空,從任務隊列中獲取任務,如果有任務則返回獲取的任務信息,如果沒有任務可以獲取則進行阻塞,阻塞也分兩種第一種是阻塞直到任務隊列中有內容,第二種是阻塞隊列一定時間之后還是沒有任務就直接返回null。 while (task != null || (task = getTask()) != null) { //先獲取worker的獨占鎖,防止其他線程調用了shutdown方法。 w.lock(); // 如果線程池正在停止,確保線程是被中斷的,如果沒有則確保線程不被中斷操作。 if ((runStateAtLeast(ctl.get(), STOP) || //如果線程池狀態為STOP、TIDYING、TERMINATED直接拒絕任務中斷當前線程 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { //執行任務之前做一些操作,可進行自定義 beforeExecute(wt, task); Throwable thrown = null; try { //運行任務在這里嘍。 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //執行任務之后做一些操作,可進行自定義 afterExecute(task, thrown); } } finally { //將任務清空為了下次任務獲取 task = null; //統計當前Worker完成了多少任務 w.completedTasks++; //獨占鎖釋放 w.unlock(); } } completedAbruptly = false; } finally { //處理Worker的退出操作,執行清理工作。 processWorkerExit(w, completedAbruptly); } }
我們看到如果Worker是第一次被啟動,它會從Worker中獲取firstTask任務來執行,然后執行成功后,它會getTask()來從隊列中獲取任務,這個地方比較有意思,它是分情況進行獲取任務的,我們都直到BlockingQueue中提供了幾種從隊列中獲取的方法,這個getTask中使用了兩種方式,第一種是使用poll進行獲取隊列中的信息,它采用的是過一點時間如果隊列中仍沒有任務時直接返回null,然后還有一個就是take方法,take方法是如果隊列中沒有任務則將當前線程進行阻塞,等待隊列中有任務后,會通知等待的隊列線程進行消費任務,讓我們看一下getTask方法:
private Runnable getTask() { boolean timedOut = false; //poll獲取超時 for (;;) { //獲取線程池的狀態和線程數量 int c = ctl.get(); //獲取線程池的狀態 int rs = runStateOf(c); //線程池狀態大于等于SHUTDOWN //1.線程池如果是大于STOP的話減少工作線程池數量 //2.如果線程池狀態為SHUTDOW并且隊列為空時,代表隊列任務已經執行完,返回null,線程數量減少1 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } //獲取線程池數量。 int wc = workerCountOf(c); //如果allowCoreThreadTimeOut為true,則空閑線程在一定時間未獲得任務會清除 //或者如果線程數量大于corePoolSize的時候會進行清除空閑線程 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; //1.如果線程池數量大于最大的線程池數量或者對(空余線程進行清除操作并且poll超時了,意思是隊列中沒有內容了,導致poll間隔一段時間后沒有獲取內容超時了。 //2.如果線程池的數量大于1或者是隊列已經是空的 //總之意思就是當線程池的線程池數量大于corePoolSize,或指定了allowCoreThreadTimeOut為true,當隊列中沒有數據或者線程池數量大于1的情況下,嘗試對線程池的數量進行減少操作,然后返回null,用于上一個方法進行清除操作。 if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { //如果timed代表的是清除空閑線程的意思 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : //等待一段時間如果沒有獲取到返回null。 workQueue.take(); //阻塞當前線程 //如果隊列中獲取到內容則返回 if (r != null) return r; //如果沒有獲取到超時了則設置timeOut狀態 timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
工作線程調用getTask從隊列中進行獲取任務。
如果指定了allowCoreThreadTimeOut或線程池線程數量大于corePoolSize則進行清除空閑多余的線程,調用阻塞隊列的poll方法,在指定時間內如果沒有獲取到任務直接返回false。
如果線程池中線程池數量小于corePoolSize或者allowCoreThreadTimeOut為false默認值,則進行阻塞線程從隊列中獲取任務,直到隊列有任務喚醒線程。
我們還記得第一張圖中有標記出來是core線程和普通線程,其實這樣標記不是很準確,準確的意思是如果線程池的數量超過了corePoolSize并且沒有特別指定allowCoreThreadTimeOut的情況下,它會清除掉大于corePoolSize并且小于等于maximumPoolSize的一些線程,標記出core線程的意思是有corePoolSize不會被清除,但是會清除大于corePoolSize的線程,也就是線程池中的線程對獲取任務的時候進行判斷,也就是getTask中進行判斷,如果當前線程池的線程數量大于corePoolSize就使用poll方式獲取隊列中的任務,當過一段時間還沒有任務就會返回null,返回null之后設置timeOut=true,并且獲取getTask也會返回null,到此會跳到調用者runWorker方法中,一直在while (task != null || (task = getTask()) != null)此時的getTask返回null跳出while循環語句,設置completedAbruptly = false,表示不是突然完成的而是正常完成,退出后它會執行finally的processWorkerExit(w, completedAbruptly),執行清理工作。我們來看下源碼:
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 如果突然完成則調整線程數量 decrementWorkerCount(); // 減少線程數量1 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //獲取鎖,同時只有一個線程獲得鎖 try { completedTaskCount += w.completedTasks; //統計整個線程池完成的數量 workers.remove(w); //將完成任務的worker從HashSet中移除 } finally { mainLock.unlock(); //釋放鎖 } //嘗試設置線程池狀態為TERMINATED //1.如果線程池狀態為SHUTDOWN并且線程池線程數量與工作隊列為空時,修改狀態。 //2.如果線程池狀態為STOP并且線程池線程數量為空時,修改狀態。 tryTerminate(); // 獲取線程池的狀態和線程池的數量 int c = ctl.get(); // 如果線程池的狀態小于STOP,也就是SHUTDOWN或RUNNING狀態 if (runStateLessThan(c, STOP)) { //如果不是突然完成,也就是正常結束 if (!completedAbruptly) { //如果指定allowCoreThreadTimeOut=true(默認false)則代表線程池中有空余線程時需要進行清理操作,否則線程池中的線程應該保持corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; //這里判斷如果線程池中隊列為空并且線程數量最小為0時,將最小值調整為1,因為隊列中還有任務沒有完成需要增加隊列,所以這里增加了一個線程。 if (min == 0 && ! workQueue.isEmpty()) min = 1; if (workerCountOf(c) >= min) return; // replacement not needed } //如果當前線程數效益核心個數,就增加一個Worker addWorker(null, false); }
通過上面的源碼可以得出,如果線程數超過核心線程數后,在runWorker中就不會等待隊列中的消息,而是會進行清除操作,上面的清除代碼首先是先對線程池的數量進行較少操作,其次是統計整個線程池中完成任務的數量,然后就是嘗試修改線程池的狀態由SHUTDOWN->TIDYING->TERMINATED或者是由STOP->TIDYING->TERMINATED,修改線程池狀態為TERMINATED,需要有兩個條件:
當線程池線程數量和工作隊列為空,并且線程池的狀態為SHUTDOWN時,才會將狀態進行修改,修改的過程是SHUTDOWN->TIDYING->TERMINATED
當線程池的狀態為STOP并且線程池數量為空時,才會嘗試修改狀態,修改過程是STOP->TIDYING->TERMINATED
如果設置為TERMINATED狀態,還需要調用條件變量termination的signalAll()方法來喚醒所有因為調用awaitTermination方法而被阻塞的線程,換句話說當調用awaitTermination后,只有線程池狀態變成TERMINATED才會被喚醒。
接下來我們就來分析一下這個tryTerminate方法,看一下他到底符不符合我們上述說的內容:
final void tryTerminate() { for (;;) { // 獲取線程池的狀態和線程池的數量組合狀態 int c = ctl.get(); //這里多帶帶下面進行分析,這里說明兩個問題,需要反向來想這個問題。 //1.如果線程池狀態STOP則不進入if語句 //2.如果線程池狀態為SHUTDOWN并且工作隊列為空時,不進入if語句 if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) return; //如果線程池數量不為空時,進行中斷操作。 if (workerCountOf(c) != 0) { // Eligible to terminate interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //修改狀態為TIDYING,并且將線程池的數量進行清空 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { //執行一些邏輯,默認是空的 terminated(); } finally { //修改狀態為TERMINATED ctl.set(ctlOf(TERMINATED, 0)); //喚醒調用awaitTermination方法的線程 termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS }
我們多帶帶將上面的if語句摘出來進行分析,將上面的第一個if判斷進行修改如下,可以看到return在else里面,這時候內部if判斷進行轉換,轉換成如下所示:
if (!isRunning(c) && !runStateAtLeast(c, TIDYING) && //只能是SHUTDOWN和STOP (runStateOf(c) != SHUTDOWN || workQueue.isEmpty())){ //這里執行邏輯 }else { return; }
逐一分析分析內容如下:
!isRunning(c)代表不是RUNNING,則可能的是SHUTDOWN,STOP,TIDYING,TERMINATED這四種狀態
中間的連接符是并且的意思,跟著runStateAtLeast(c, TIDYING)這句話的意思是至少是TIDYING,TERMINATED這兩個,反過來就是可能是RUNNING,SHUTDOWN,STOP,但是前面已經判斷了不能是RUNINNG狀態,所以前面兩個連在一起就是只能是狀態為SHUTDOWN,STOP
runStateOf(c) != SHUTDOWN || workQueue.isEmpty()當前面的狀態是SHUTDOWN時,則會出發workQueue.isEmpty(),連在一起就是狀態是SHUTDOWN并工作隊列為空,當線程池狀態為STOP時,則會進入到runStateOf(c) != SHUTDOWN,直接返回true,就代表線程池狀態為STOP
后面還有一個語句一個if語句將其轉換一下邏輯就是下面的內容:
if (workerCountOf(c) == 0) { //執行下面的邏輯 }else{ interruptIdleWorkers(ONLY_ONE); return; }
這里我們也進行轉換下,就可以看出來當線程池的數量為空時,才會進行下面的邏輯,下面的邏輯就是修改線程池狀態為TERMINATED,兩個連在一起就是上面分析的修改狀態為TERMINATED的條件,這里畫一張圖來表示線程池狀態的信息:
3.png
其實上面圖中我們介紹了關于從SHUTDOWN或STOP到TERMINATED的變化,沒有講解關于如何從RUNNING狀態轉變成SHUTDOWN或STOP狀態,其實是調用了shutdown()或shutdownNow方法對其進行狀態的變換,下面來看一下shutdown方法源碼:
public void shutdown() { //獲取全局鎖 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //權限檢查 checkShutdownAccess(); //設置線程池狀態為SHUTDOWN,如果狀態已經是大于等于SHUTDOWN則直接返回 advanceRunState(SHUTDOWN); //如果線程沒有設置中斷標識并且線程沒有運行則設置中斷標識 interruptIdleWorkers(); //空的可以實現的內容 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } //嘗試修改線程池狀態為TERMINATED tryTerminate(); }
首先對當前線程進行權限檢測,查看是否設置了安全管理器,如果設置了則要看當前調用shutdown的線程有沒有權限都關閉線程的權限,如果有權限還要看是否有中斷工作現成的權限,如果沒有權限則拋出SecurityException或NullPointException異常。
設置線程池狀態為SHUTDOWN,如果狀態已經是大于等于SHUTDOWN則直接返回
如果線程沒有設置中斷標識并且線程沒有運行則設置中斷標識
嘗試修改線程池狀態為TERMINATED
接下來我們來看一下advanceRunState內容如下所示:
private void advanceRunState(int targetState) { for (;;) { //獲取線程池狀態和線程池的線程數量 int c = ctl.get(); if (runStateAtLeast(c, targetState) || //如果線程池的狀態>=SHUTDOWN ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) //設置線程池狀態為SHUTDOWN //返回 break; } }
當線程池的狀態>=SHUTDOWN,直接返回
如果線程池狀態為RUNNING,設置線程池狀態為SHUTDOWN,設置成功則返回
interruptIdleWorkers代碼如下所示:
private void interruptIdleWorkers() { interruptIdleWorkers(false); }
private void interruptIdleWorkers(boolean onlyOne) { //獲取全局鎖,同時只能有一個線程能夠調用shutdown方法 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //遍歷工作線程 for (Worker w : workers) { Thread t = w.thread; //如果當前線程沒有設置中斷標志并且可以獲取Worker自己的鎖 if (!t.isInterrupted() && w.tryLock()) { try { //設置中斷標志 t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } //執行一次,清理空閑線程。 if (onlyOne) break; } } finally { mainLock.unlock(); } }
我們看到當我們調用shutdown方法的時候,只是將空閑的線程給設置了中斷標識,也就是活躍正在執行任務的線程并沒有設置中斷標識,直到將任務全部執行完后才會逐步清理線程操作,我們還記的在getTask中的方法里面有這樣一段代碼:
// Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; }
判斷是否是狀態>=SHUTDOWN,并且隊列為空時,將線程池數量進行減少操作,內部進行CAS操作,直到CAS操作成功為止,并且返回null,返回null后,會調用processWorkerExit(w, false);清理Workers線程信息,并且嘗試將線程設置為TERMINATED狀態,上面是對所有shutdown方法的分析,下面來看一下shutdownNow方法并且比較兩個之間的區別:
public ListshutdownNow() { List tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { //權限檢查 checkShutdownAccess(); //設置線程池狀態為STOP,如果狀態已經是大于等于STOP則直接返回 advanceRunState(STOP); //這里是和SHUTDOWN區別的地方,這里是強制進行中斷操作 interruptWorkers(); //將為完成任務復制到list集合中 tasks = drainQueue(); } finally { mainLock.unlock(); } //嘗試修改線程池狀態為TERMINATED tryTerminate(); return tasks; }
shutdownNow方法返回了未完成的任務信息列表tasks = drainQueue();,其實該方法和shutdown方法主要的區別在于一下幾點內容:
shutdownNow方法將線程池狀態設置為STOP,而shutdown則將狀態修改為SHUTDOWN
shutdownNow方法將工作任務進行中斷操作,也就是說如果工作線程在工作也會被中斷,而shutdown則是先嘗試獲取鎖如果獲得鎖成功則進行中斷標志設置,也就是中斷操作,如果沒有獲取到鎖則等待進行完成后自動退出。
shutdownNow方法返回未完成的任務列表。
下面代碼是shutDownNow的interruptWorkers方法:
private void interruptWorkers() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (Worker w : workers) //直接進行中斷操作。 w.interruptIfStarted(); } finally { mainLock.unlock(); } }
內部調用了Worker的interruptIfStarted方法,方法內部是針對線程進行中斷操作,但是中斷的前提條件是AQS的state狀態必須大于等于0,如果狀態為-1的則不會被中斷,但是如果任務運行起來的時候在runWorker中則不會執行任務,因為線程池狀態為STOP,如果線程池狀態為STOP則會中斷線程,下面代碼是Worker中的interruptIfStarted:
void interruptIfStarted() { Thread t; //當前Worker鎖狀態大于等于0并且線程沒有被中斷 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } }拒絕策略
JDK內置的拒絕策略如下:
AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作
CallerRunsPolicy策略:只要線程池沒有關閉線程池狀態是RUNNING狀態,該略略直接調用線程中運行當前被丟棄的任務
DiscardOledestPolicy策略:該策略將丟棄最老的一個請求,也就是即將被執行的第一個任務,并嘗試再次提交任務
DiscardPolicy策略:該策略默默丟棄無法處理的任務,不予任何處理。
5.png
首先先上一張圖,針對這張圖來進行總結:
4.png
主線程進行線程池的調用,線程池執行execute方法
線程池通過addWorker進行創建線程,并將線程放入到線程池中,這里我們看到第二步是將線程添加到核心線程中,其實線程池內部不分核心線程和非核心線程,只是根據corePoolSize和maximumPoolSize設置的大小來進行區分,因為超過corePoolSize的線程會被回收,至于回收那些線程,是根據線程獲取任務的時候進行判斷,當前線程池數量大于corePoolSize,或者指定了allowCoreThreadTimeOut為true,則他等待一定時間后會返回,不會一直等待
當線程池的數量達到corePoolSize時,線程池首先會將任務添加到隊列中
當隊列中任務也達到了隊列設置的最大值時,它會創建新的線程,注意的是此時的線程數量已經超過了corePoolSize,但是沒有達到maximumPoolSize最大值。
當線程池的線程數量達到了maximumPoolSize,則會相應拒絕策略。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77783.html
摘要:創建一個定長線程池,可控制線程最大并發數,超出的線程會在隊列中等待。創建一個定長線程池,支持定時及周期性任務執行。 ExecutorService是Java中對線程池定義的一個接口,它java.util.concurrent包中. 創建一個什么樣的ExecutorService的實例(即線程池)需要g根據具體應用場景而定,不過Java給我們提供了一個Executors工廠類,它可以幫助...
摘要:本文是作者自己對中線程的狀態線程間協作相關使用的理解與總結,不對之處,望指出,共勉。當中的的數目而不是已占用的位置數大于集合番一文通版集合番一文通版垃圾回收機制講得很透徹,深入淺出。 一小時搞明白自定義注解 Annotation(注解)就是 Java 提供了一種元程序中的元素關聯任何信息和著任何元數據(metadata)的途徑和方法。Annotion(注解) 是一個接口,程序可以通過...
摘要:從使用到原理學習線程池關于線程池的使用,及原理分析分析角度新穎面向切面編程的基本用法基于注解的實現在軟件開發中,分散于應用中多出的功能被稱為橫切關注點如事務安全緩存等。 Java 程序媛手把手教你設計模式中的撩妹神技 -- 上篇 遇一人白首,擇一城終老,是多么美好的人生境界,她和他歷經風雨慢慢變老,回首走過的點點滴滴,依然清楚的記得當初愛情萌芽的模樣…… Java 進階面試問題列表 -...
摘要:內存分配解析四方法執行完畢,立即釋放局部變量所占用的棧空間。內存分配解析五調用對象的方法,以實例為參數。堆和棧的小結以上就是程序運行時內存分配的大致情況。 前言 java中有很多類型的變量、靜態變量、全局變量及對象等,這些變量在java運行的時候到底是如何分配內存的呢?接下來有必要對此進行一些探究。 基本知識概念: (1)寄存器:最快的存儲區, 由編譯器根據需求進行分配,我們在程序...
閱讀 536·2021-10-19 11:45
閱讀 1345·2021-09-30 09:48
閱讀 1463·2021-08-16 10:56
閱讀 727·2021-07-26 23:38
閱讀 3206·2019-08-30 13:15
閱讀 2589·2019-08-30 12:45
閱讀 1823·2019-08-29 12:14
閱讀 2059·2019-08-26 18:42