摘要:當面試官問線程池時,你應該知道些什么一執行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創建出一個調度線程池以后,就可以開始提交任務了。
最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。
原文地址:http://www.jianshu.com/p/18f4...
該類主要還是基于ThreadPoolExecutor類進行二次開發,所以對Java線程池執行過程還不了解的同學建議先看看我之前的文章。
當面試官問線程池時,你應該知道些什么?
與ThreadPoolExecutor不同,向ScheduledThreadPoolExecutor中提交任務的時候,任務被包裝成ScheduledFutureTask對象加入延遲隊列并啟動一個woker線程。
用戶提交的任務加入延遲隊列時,會按照執行時間進行排列,也就是說隊列頭的任務是需要最早執行的。而woker線程會從延遲隊列中獲取任務,如果已經到了任務的執行時間,則開始執行。否則阻塞等待剩余延遲時間后再嘗試獲取任務。
任務執行完成以后,如果該任務是一個需要周期性反復執行的任務,則計算好下次執行的時間后會重新加入到延遲隊列中。
二、源碼深入分析首先看下ScheduledThreadPoolExecutor類的幾個構造函數:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); }
注:這里構造函數都是使用super,其實就是ThreadPoolExecutor的構造函數
這里有三點需要注意:
使用DelayedWorkQueue作為阻塞隊列,并沒有像ThreadPoolExecutor類一樣開放給用戶進行自定義設置。該隊列是ScheduledThreadPoolExecutor類的核心組件,后面詳細介紹。
這里沒有向用戶開放maximumPoolSize的設置,原因是DelayedWorkQueue中的元素在大于初始容量16時,會進行擴容,也就是說隊列不會裝滿,maximumPoolSize參數即使設置了也不會生效。
worker線程沒有回收時間,原因跟第2點一樣,因為不會觸發回收操作。所以這里的線程存活時間都設置為0。
再次說明:上面三點的理解需要先了解ThreadPoolExecutor的知識點。
當我們創建出一個調度線程池以后,就可以開始提交任務了。這里依次分析一下三個常用API的源碼:
首先是schedule方法,該方法是指任務在指定延遲時間到達后觸發,只會執行一次。
public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) { //參數校驗 if (command == null || unit == null) throw new NullPointerException(); //這里是一個嵌套結構,首先把用戶提交的任務包裝成ScheduledFutureTask //然后在調用decorateTask進行包裝,該方法是留給用戶去擴展的,默認是個空方法 RunnableScheduledFuture> t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime(delay, unit))); //包裝好任務以后,就進行提交了 delayedExecute(t); return t; }
重點看一下提交任務的源碼:
private void delayedExecute(RunnableScheduledFuture> task) { //如果線程池已經關閉,則使用拒絕策略把提交任務拒絕掉 if (isShutdown()) reject(task); else { //與ThreadPoolExecutor不同,這里直接把任務加入延遲隊列 super.getQueue().add(task); //如果當前狀態無法執行任務,則取消 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else //這里是增加一個worker線程,避免提交的任務沒有worker去執行 //原因就是該類沒有像ThreadPoolExecutor一樣,woker滿了才放入隊列 ensurePrestart(); } }
這里的關鍵點其實就是super.getQueue().add(task)行代碼,ScheduledThreadPoolExecutor類在內部自己實現了一個基于堆數據結構的延遲隊列。add方法最終會落到offer方法中,一起看下:
public boolean offer(Runnable x) { //參數校驗 if (x == null) throw new NullPointerException(); RunnableScheduledFuture> e = (RunnableScheduledFuture>)x; final ReentrantLock lock = this.lock; lock.lock(); try { //查看當前元素數量,如果大于隊列長度則進行擴容 int i = size; if (i >= queue.length) grow(); //元素數量加1 size = i + 1; //如果當前隊列還沒有元素,則直接加入頭部 if (i == 0) { queue[0] = e; //記錄索引 setIndex(e, 0); } else { //把任務加入堆中,并調整堆結構,這里就會根據任務的觸發時間排列 //把需要最早執行的任務放在前面 siftUp(i, e); } //如果新加入的元素就是隊列頭,這里有兩種情況 //1.這是用戶提交的第一個任務 //2.新任務進行堆調整以后,排在隊列頭 if (queue[0] == e) { //這個變量起優化作用,后面說 leader = null; //加入元素以后,喚醒worker線程 available.signal(); } } finally { lock.unlock(); } return true; }
通過上面的邏輯,我們把提交的任務成功加入到了延遲隊列中,前面說了加入任務以后會開啟一個woker線程,該線程的任務就是從延遲隊列中不斷取出任務執行。這些都是跟ThreadPoolExecutor相同的,我們看下從該延遲隊列中獲取元素的源碼:
public RunnableScheduledFuture> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { //取出隊列中第一個元素,即最早需要執行的任務 RunnableScheduledFuture> first = queue[0]; //如果隊列為空,則阻塞等待加入元素時喚醒 if (first == null) available.await(); else { //計算任務執行時間,這個delay是當前時間減去任務觸發時間 long delay = first.getDelay(NANOSECONDS); //如果到了觸發時間,則執行出隊操作 if (delay <= 0) return finishPoll(first); first = null; //這里表示該任務已經分配給了其他線程,當前線程等待喚醒就可以 if (leader != null) available.await(); else { //否則把給任務分配給當前線程 Thread thisThread = Thread.currentThread(); leader = thisThread; try { //當前線程等待任務剩余延遲時間 available.awaitNanos(delay); } finally { //這里線程醒來以后,什么時候leader會發生變化呢? //就是上面的添加任務的時候 if (leader == thisThread) leader = null; } } } } } finally { //如果隊列不為空,則喚醒其他woker線程 if (leader == null && queue[0] != null) available.signal(); lock.unlock(); } }
這里為什么會加入一個leader變量來分配阻塞隊列中的任務呢?原因是要減少不必要的時間等待。比如說現在隊列中的第一個任務1分鐘后執行,那么用戶提交新的任務時會不斷的加入woker線程,如果新提交的任務都排在隊列后面,也就是說新的woker現在都會取出這第一個任務進行執行延遲時間的等待,當該任務到觸發時間時,會喚醒很多woker線程,這顯然是沒有必要的。
當任務被woker線程取出以后,會執行run方法,由于此時任務已經被包裝成了ScheduledFutureTask對象,那我們來看下該類的run方法:
public void run() { boolean periodic = isPeriodic(); //如果當前線程池已經不支持執行任務,則取消 if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) //如果不需要周期性執行,則直接執行run方法然后結束 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { //如果需要周期執行,則在執行完任務以后,設置下一次執行時間 setNextRunTime(); //把任務重新加入延遲隊列 reExecutePeriodic(outerTask); } }
上面就是schedule方法完整的執行過程。
ScheduledThreadPoolExecutor類中關于周期性執行的任務提供了兩個方法scheduleAtFixedRate跟scheduleWithFixedDelay,一起看下區別。
public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { //刪除不必要的邏輯,重點看區別 ScheduledFutureTasksft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), //二者唯一區別 unit.toNanos(period)); //... } public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { //... ScheduledFutureTask sft = new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit), //二者唯一區別 unit.toNanos(-delay)); //.. }
前者把周期延遲時間傳入ScheduledFutureTask中,而后者卻設置成負數傳入,區別在哪里呢?看下當任務執行完成以后的收尾工作中設置任務下次執行時間的方法setNextRunTime源碼:
private void setNextRunTime() { long p = period; //大于0是scheduleAtFixedRate方法,表示執行時間是根據初始化參數計算的 if (p > 0) time += p; else //小于0是scheduleWithFixedDelay方法,表示執行時間是根據當前時間重新計算的 time = triggerTime(-p); }
也就是說當使用scheduleAtFixedRate方法提交任務時,任務后續執行的延遲時間都已經確定好了,分別是initialDelay,initialDelay + period,initialDelay + 2 * period以此類推。
而調用scheduleWithFixedDelay方法提交任務時,第一次執行的延遲時間為initialDelay,后面的每次執行時間都是在前一次任務執行完成以后的時間點上面加上period延遲執行。
ScheduledThreadPoolExecutor可以說是在ThreadPoolExecutor上面進行了一些擴展操作,它只是重新包裝了任務以及阻塞隊列。該類的阻塞隊列DelayedWorkQueue是基于堆去實現的,本文沒有太詳細介紹堆結構插入跟刪除數據的調整工作,感興趣的同學可以私信或者評論交流。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/76303.html
摘要:當面試官問線程池時,你應該知道些什么一執行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....
摘要:當面試官問線程池時,你應該知道些什么一執行流程與不同,向中提交任務的時候,任務被包裝成對象加入延遲隊列并啟動一個線程。當我們創建出一個調度線程池以后,就可以開始提交任務了。 最近新接手的項目里大量使用了ScheduledThreadPoolExecutor類去執行一些定時任務,之前一直沒有機會研究這個類的源碼,這次趁著機會好好研讀一下。 原文地址:http://www.jianshu....
摘要:在前面介紹了的多線程的基本原理信息線程池架構原理和源碼解析,本文對這個本身的線程池的調度器做一個簡單擴展,如果還沒讀過上一篇文章,建議讀一下,因為這是調度器的核心組件部分。 在前面介紹了java的多線程的基本原理信息:《Java線程池架構原理和源碼解析》,本文對這個java本身的線程池的調度器做一個簡單擴展,如果還沒讀過上一篇文章,建議讀一下,因為這是調度器的核心組件部分。 我們如果...
摘要:深入理解線程池線程池初探所謂線程池,就是將多個線程放在一個池子里面所謂池化技術,然后需要線程的時候不是創建一個線程,而是從線程池里面獲取一個可用的線程,然后執行我們的任務。最后的的意思是需要確保線程池已經被啟動起來了。 深入理解Java線程池 線程池初探 ?所謂線程池,就是將多個線程放在一個池子里面(所謂池化技術),然后需要線程的時候不是創建一個線程,而是從線程池里面獲取一個可用的線程...
摘要:也是自帶的一個基于線程池設計的定時任務類。其每個調度任務都會分配到線程池中的一個線程執行,所以其任務是并發執行的,互不影響。 原創不易,如需轉載,請注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責任!!! 一、在JAVA開發領域,目前可以通過以下幾種方式進行定時任務 1、單機部署模式 Timer:jdk中...
閱讀 1981·2021-11-24 09:38
閱讀 3343·2021-11-22 12:07
閱讀 1909·2021-09-22 16:03
閱讀 1968·2021-09-02 15:41
閱讀 2625·2021-07-24 23:28
閱讀 2218·2019-08-29 13:17
閱讀 1558·2019-08-29 12:25
閱讀 2671·2019-08-29 11:10