摘要:該方法傳入表示如果當(dāng)前任務(wù)正在執(zhí)行,那么立即終止其執(zhí)行傳入表示如果當(dāng)前方法正在執(zhí)行,那么等待其執(zhí)行完成之后再取消當(dāng)前任務(wù)。
???????本文主要分為兩個部分,第一部分首先會對ScheduledThreadPoolExecutor進(jìn)行簡單的介紹,并且會介紹其主要API的使用方式,然后介紹了其使用時的注意點,第二部分則主要對ScheduledThreadPoolExecutor的實現(xiàn)細(xì)節(jié)進(jìn)行介紹。
1. 使用簡介???????ScheduledThreadPoolExecutor是一個使用線程池執(zhí)行定時任務(wù)的類,相較于Java中提供的另一個執(zhí)行定時任務(wù)的類Timer,其主要有如下兩個優(yōu)點:
使用多線程執(zhí)行任務(wù),不用擔(dān)心任務(wù)執(zhí)行時間過長而導(dǎo)致任務(wù)相互阻塞的情況,Timer是單線程執(zhí)行的,因而會出現(xiàn)這個問題;
不用擔(dān)心任務(wù)執(zhí)行過程中,如果線程失活,其會新建線程執(zhí)行任務(wù),Timer類的單線程掛掉之后是不會重新創(chuàng)建線程執(zhí)行后續(xù)任務(wù)的。
???????除去上述兩個優(yōu)點外,ScheduledThreadPoolExecutor還提供了非常靈活的API,用于執(zhí)行任務(wù)。其任務(wù)的執(zhí)行策略主要分為兩大類:①在一定延遲之后只執(zhí)行一次某個任務(wù);②在一定延遲之后周期性的執(zhí)行某個任務(wù)。如下是其主要API:
public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit); publicScheduledFuture schedule(Callable callable, long delay, TimeUnit unit); public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);
???????上述四個方法中,第一個和第二個方法屬于第一類,即在delay指定的延遲之后執(zhí)行第一個參數(shù)所指定的任務(wù),區(qū)別在于,第二個方法執(zhí)行之后會有返回值,而第一個方法執(zhí)行之后是沒有返回值的。第三個和第四個方法則屬于第二類,即在第二個參數(shù)(initialDelay)指定的時間之后開始周期性的執(zhí)行任務(wù),執(zhí)行周期間隔為第三個參數(shù)指定的時間,但是這兩個方法的區(qū)別在于第三個方法執(zhí)行任務(wù)的間隔是固定的,無論上一個任務(wù)是否執(zhí)行完成,而第四個方法的執(zhí)行時間間隔是不固定的,其會在周期任務(wù)的上一個任務(wù)執(zhí)行完成之后才開始計時,并在指定時間間隔之后才開始執(zhí)行任務(wù)。如下是使用scheduleWithFixedDelay()和scheduleAtFixedRate()方法編寫的測試用例:
public class ScheduledThreadPoolExecutorTest { private ScheduledThreadPoolExecutor executor; private Runnable task; @Before public void before() { executor = initExecutor(); task = initTask(); } private ScheduledThreadPoolExecutor initExecutor() { return new ScheduledThreadPoolExecutor(2);; } private Runnable initTask() { long start = System.currentTimeMillis(); return () -> { print("start task: " + getPeriod(start, System.currentTimeMillis())); sleep(SECONDS, 10); print("end task: " + getPeriod(start, System.currentTimeMillis())); }; } @Test public void testFixedTask() { print("start main thread"); executor.scheduleAtFixedRate(task, 15, 30, SECONDS); sleep(SECONDS, 120); print("end main thread"); } @Test public void testDelayedTask() { print("start main thread"); executor.scheduleWithFixedDelay(task, 15, 30, SECONDS); sleep(SECONDS, 120); print("end main thread"); } private void sleep(TimeUnit unit, long time) { try { unit.sleep(time); } catch (InterruptedException e) { e.printStackTrace(); } } private int getPeriod(long start, long end) { return (int)(end - start) / 1000; } private void print(String msg) { System.out.println(msg); } }
???????可以看到,上述兩個測試用例代碼塊基本是一致的,區(qū)別在于第一個用例調(diào)用的是scheduleAtFixedRate()方法,而第二個用例調(diào)用的是scheduleWithFixedDelay()。這里兩個用例都是設(shè)置的在延遲15s后每個30s執(zhí)行一次指定的任務(wù),而該任務(wù)執(zhí)行時長為10s。如下分別是這兩個測試用例的執(zhí)行結(jié)果:
start main thread start task: 15 end task: 25 start task: 45 end task: 55 start task: 75 end task: 85 start task: 105 end task: 115 end main thread
start main thread start task: 15 end task: 25 start task: 55 end task: 65 start task: 95 end task: 105 end main thread
??????對比上述執(zhí)行結(jié)果可以看出,對于scheduleAtFixedRate()方法,其每次執(zhí)行任務(wù)的開始時間間隔都為固定不變的30s,與任務(wù)執(zhí)行時長無關(guān),而對于scheduleWithFixedDelay()方法,其每次執(zhí)行任務(wù)的開始時間間隔都為上次任務(wù)執(zhí)行時間加上指定的時間間隔。
???????這里關(guān)于ScheduledThreadPoolExecutor的使用有三點需要說明如下:
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor(ThreadPoolExecutor詳解),因而也有繼承而來的execute()和submit()方法,但是ScheduledThreadPoolExecutor重寫了這兩個方法,重寫的方式是直接創(chuàng)建兩個立即執(zhí)行并且只執(zhí)行一次的任務(wù);
ScheduledThreadPoolExecutor使用ScheduledFutureTask封裝每個需要執(zhí)行的任務(wù),而任務(wù)都是放入DelayedWorkQueue隊列中的,該隊列是一個使用數(shù)組實現(xiàn)的優(yōu)先隊列,在調(diào)用ScheduledFutureTask::cancel()方法時,其會根據(jù)removeOnCancel變量的設(shè)置來確認(rèn)是否需要將當(dāng)前任務(wù)真正的從隊列中移除,而不只是標(biāo)識其為已刪除狀態(tài);
ScheduledThreadPoolExecutor提供了一個鉤子方法decorateTask(Runnable, RunnableScheduledFuture)用于對執(zhí)行的任務(wù)進(jìn)行裝飾,該方法第一個參數(shù)是調(diào)用方傳入的任務(wù)實例,第二個參數(shù)則是使用ScheduledFutureTask對用戶傳入任務(wù)實例進(jìn)行封裝之后的實例。這里需要注意的是,在ScheduledFutureTask對象中有一個heapIndex變量,該變量用于記錄當(dāng)前實例處于隊列數(shù)組中的下標(biāo)位置,該變量可以將諸如contains(),remove()等方法的時間復(fù)雜度從O(N)降低到O(logN),因而效率提升是比較高的,但是如果這里用戶重寫decorateTask()方法封裝了隊列中的任務(wù)實例,那么heapIndex的優(yōu)化就不存在了,因而這里強烈建議是盡量不要重寫該方法,或者重寫時也還是復(fù)用ScheduledFutureTask類。
2. 源碼詳解 2.1 主要屬性???????ScheduledThreadPoolExecutor主要有四個屬性,分別如下:
private volatile boolean continueExistingPeriodicTasksAfterShutdown; private volatile boolean executeExistingDelayedTasksAfterShutdown = true; private volatile boolean removeOnCancel = false; private static final AtomicLong sequencer = new AtomicLong();
continueExistingPeriodicTasksAfterShutdown:用于標(biāo)識當(dāng)前Executor對象shutdown時,是否繼續(xù)執(zhí)行已經(jīng)存在于任務(wù)隊列中的定時任務(wù)(調(diào)用scheduleAtFixedRate()方法生成的任務(wù));
executeExistingDelayedTasksAfterShutdown:用于標(biāo)識當(dāng)前Executor對象shutdown時,是否繼續(xù)執(zhí)行已經(jīng)存在于任務(wù)隊列中的定時任務(wù)(調(diào)用scheduleWithFixedDelay()方法生成的任務(wù));
removeOnCancel:用于標(biāo)識如果當(dāng)前任務(wù)已經(jīng)取消了,是否將其從任務(wù)隊列中真正的移除,而不只是標(biāo)識其為刪除狀態(tài);
sequencer:其為一個AtomicLong類型的變量,該變量記錄了當(dāng)前任務(wù)被創(chuàng)建時是第幾個任務(wù)的一個序號,這個序號的主要用于確認(rèn)當(dāng)兩個任務(wù)開始執(zhí)行時間相同時具體哪個任務(wù)先執(zhí)行,比如兩個任務(wù)的開始執(zhí)行時間都為1515847881158,那么序號小的任務(wù)將先執(zhí)行。
2.2 ScheduledFutureTask???????在ScheduledThreadPoolExecutor中,主要使用ScheduledFutureTask封裝需要執(zhí)行的任務(wù),該類的主要聲明如下:
private class ScheduledFutureTaskextends FutureTask implements RunnableScheduledFuture { private final long sequenceNumber; // 記錄當(dāng)前實例的序列號 private long time; // 記錄當(dāng)前任務(wù)下次開始執(zhí)行的時間 // 記錄當(dāng)前任務(wù)執(zhí)行時間間隔,等于0則表示當(dāng)前任務(wù)只執(zhí)行一次,大于0表示當(dāng)前任務(wù)為fixedRate類型的任務(wù), // 小于0則表示其為fixedDelay類型的任務(wù) private final long period; RunnableScheduledFuture outerTask = this; // 記錄需要周期性執(zhí)行的任務(wù)的實例 int heapIndex; // 記錄當(dāng)前任務(wù)在隊列數(shù)組中位置的下標(biāo) ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); // 序號在創(chuàng)建任務(wù)實例時指定,且后續(xù)不會變化 } public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } // 各個任務(wù)在隊列中的存儲方式是一個基于時間和序號進(jìn)行比較的優(yōu)先隊列,當(dāng)前方法定義了優(yōu)先隊列中兩個 // 任務(wù)執(zhí)行的先后順序。這里先對兩個任務(wù)開始執(zhí)行時間進(jìn)行比較,時間較小者優(yōu)先執(zhí)行,若開始時間相同, // 則比較兩個任務(wù)的序號,序號小的任務(wù)先執(zhí)行 public int compareTo(Delayed other) { if (other == this) return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask> x = (ScheduledFutureTask>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } public boolean isPeriodic() { // 判斷是否為周期性任務(wù) return period != 0; } // 當(dāng)前任務(wù)執(zhí)行之后,會判斷當(dāng)前任務(wù)是否為周期性任務(wù),如果為周期性任務(wù),那么就調(diào)用當(dāng)前方法計算 // 當(dāng)前任務(wù)下次開始執(zhí)行的時間。這里如果當(dāng)前任務(wù)是fixedRate類型的任務(wù)(p > 0),那么下次執(zhí)行時間 // 就是此次執(zhí)行的開始時間加上時間間隔,如果當(dāng)前任務(wù)是fixedDelay類型的任務(wù)(p < 0),那么下次執(zhí)行 // 時間就是當(dāng)前時間(triggerTime()方法會獲取系統(tǒng)當(dāng)前時間)加上任務(wù)執(zhí)行時間間隔。可以看到,定頻率 // 和定延遲的任務(wù)的執(zhí)行時間區(qū)別就在當(dāng)前方法中進(jìn)行了指定,因為調(diào)用當(dāng)前方法時任務(wù)已經(jīng)執(zhí)行完成了, // 因而triggerTime()方法中獲取的時間就是任務(wù)執(zhí)行完成之后的時間點 private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); } // 取消當(dāng)前任務(wù)的執(zhí)行,super.cancel(boolean)方法也即FutureTask.cancel(boolean)方法。該方法傳入 // true表示如果當(dāng)前任務(wù)正在執(zhí)行,那么立即終止其執(zhí)行;傳入false表示如果當(dāng)前方法正在執(zhí)行,那么等待其 // 執(zhí)行完成之后再取消當(dāng)前任務(wù)。 public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = super.cancel(mayInterruptIfRunning); // 判斷是否設(shè)置了取消后移除隊列中當(dāng)前任務(wù),是則移除當(dāng)前任務(wù) if (cancelled && removeOnCancel && heapIndex >= 0) remove(this); return cancelled; } public void run() { boolean periodic = isPeriodic(); // 判斷是否為周期性任務(wù) if (!canRunInCurrentRunState(periodic)) // 判斷是否能夠在當(dāng)前狀態(tài)下執(zhí)行該任務(wù) cancel(false); else if (!periodic) // 如果能執(zhí)行當(dāng)前任務(wù),但是任務(wù)不是周期性的,那么就立即執(zhí)行該任務(wù)一次 ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { // 是周期性任務(wù),則立即執(zhí)行當(dāng)前任務(wù)并且重置 setNextRunTime(); // 在當(dāng)前任務(wù)執(zhí)行完成后調(diào)用該方法計算當(dāng)前任務(wù)下次執(zhí)行的時間 reExecutePeriodic(outerTask); // 將當(dāng)前任務(wù)放入任務(wù)隊列中以便下次執(zhí)行 } } }
???????在ScheduledFutureTask中,主要有三個點需要強調(diào):
對于run()方法的第一個分支,canRunInCurrentRunState()方法的聲明如下所示,可以看到,該方法是用于判斷當(dāng)前任務(wù)如果為周期性任務(wù),那么其是否允許在shutdown狀態(tài)下繼續(xù)執(zhí)行已經(jīng)存在的周期性任務(wù),是則表示當(dāng)前狀態(tài)下是可以執(zhí)行當(dāng)前任務(wù)的,這里isRunningOrShutdown()方法繼承自ThreadPoolExecutor;
boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); }
在run()方法的最后一個if分支中,其首先會執(zhí)行當(dāng)前任務(wù),在執(zhí)行完成時才會調(diào)用setNextRunTime()方法設(shè)置下次任務(wù)執(zhí)行時間,也就是說對于fixedRate和fixedDelay類型的任務(wù)都是在這個時間點才設(shè)置的,因而雖然fixedRate類型的任務(wù),即使該任務(wù)下次執(zhí)行時間比當(dāng)前時間要早,其也只會在當(dāng)前任務(wù)執(zhí)行完成后立即執(zhí)行,而不會與當(dāng)前任務(wù)還未執(zhí)行完時就執(zhí)行;對于fixedDelay任務(wù)則不會存在該問題,因為其是以任務(wù)完成后的時間點為基礎(chǔ)計算下次執(zhí)行的時間點;
對于run()方法的最后一個分支中的reExecutePeriodic()方法,其會將當(dāng)前任務(wù)加入到任務(wù)隊列中,并且調(diào)用父類的ensurePrestart()方法確保有可用的線程來執(zhí)行當(dāng)前任務(wù),如下是該方法的具體實現(xiàn):
void reExecutePeriodic(RunnableScheduledFuture> task) { if (canRunInCurrentRunState(true)) { // 判斷當(dāng)前任務(wù)是否可以繼續(xù)執(zhí)行 super.getQueue().add(task); // 將當(dāng)前任務(wù)加入到任務(wù)隊列中 if (!canRunInCurrentRunState(true) && remove(task)) // 雙檢查法判斷任務(wù)在加入過程中是否取消了 task.cancel(false); else ensurePrestart(); // 初始化核心線程等確保任務(wù)可以被執(zhí)行 } }
???????從ScheduledFutureTask的實現(xiàn)總結(jié)來看,當(dāng)每創(chuàng)建一個該類實例時,會初始化該類的一些主要屬性,如下次開始執(zhí)行的時間和執(zhí)行的周期。當(dāng)某個線程調(diào)用該任務(wù),即執(zhí)行該任務(wù)的run()方法時,如果該任務(wù)不為周期性任務(wù),那么執(zhí)行該任務(wù)之后就不會有其余的動作,如果該任務(wù)為周期性任務(wù),那么在將當(dāng)前任務(wù)執(zhí)行完畢之后,還會重置當(dāng)前任務(wù)的狀態(tài),并且計算下次執(zhí)行當(dāng)前任務(wù)的時間,然后將其放入隊列中以便下次執(zhí)行。
2.3 DelayedWorkQueue???????DelayedWorkQueue的實現(xiàn)與DelayQueue以及PriorityQueue的實現(xiàn)基本相似,形式都為一個優(yōu)先隊列,并且底層是使用堆結(jié)構(gòu)來實現(xiàn)優(yōu)先隊列的功能,在數(shù)據(jù)存儲方式上,其使用的是數(shù)組來實現(xiàn)。這里DelayedWorkQueue與DelayQueue以及PriorityQueue不同的點在于DelayedWorkQueue中主要存儲ScheduledFutureTask類型的任務(wù),該任務(wù)中有一個heapIndex屬性保存了當(dāng)前任務(wù)在當(dāng)前隊列數(shù)組中的位置下標(biāo),其主要提升的是對隊列的諸如contains()和remove()等需要定位當(dāng)前任務(wù)位置的方法的效率,時間復(fù)雜度可以從O(N)提升到O(logN)。如下是DelayedWorkQueue的實現(xiàn)代碼(這里只列出了該類的主要屬性和與實現(xiàn)ScheduledThreadPoolExecutor功能相關(guān)的方法,關(guān)于如何使用數(shù)組實現(xiàn)優(yōu)先隊列請讀者查閱相關(guān)文檔):
static class DelayedWorkQueue extends AbstractQueueimplements BlockingQueue { private static final int INITIAL_CAPACITY = 16; // 數(shù)組初始化大小 private RunnableScheduledFuture>[] queue = new RunnableScheduledFuture>[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); // 對添加和刪除元素所使用的鎖 private int size = 0; // 當(dāng)前隊列中有效任務(wù)的個數(shù) private Thread leader = null; // 執(zhí)行隊列頭部任務(wù)的線程 private final Condition available = lock.newCondition(); // 除leader線程外其余線程的等待隊列 // 在對任務(wù)進(jìn)行移動時,判斷其是否為ScheduledFutureTask實例,如果是則維護(hù)其heapIndex屬性 private void setIndex(RunnableScheduledFuture> f, int idx) { if (f instanceof ScheduledFutureTask) ((ScheduledFutureTask)f).heapIndex = idx; } private void siftUp(int k, RunnableScheduledFuture> key) {/* 省略 */} private void siftDown(int k, RunnableScheduledFuture> key) {/* 省略 */} private int indexOf(Object x) { if (x != null) { if (x instanceof ScheduledFutureTask) { // 如果為ScheduledFutureTask則可返回其heapIndex屬性 int i = ((ScheduledFutureTask) x).heapIndex; if (i >= 0 && i < size && queue[i] == x) return i; } else { // 如果不為ScheduledFutureTask實例,則需要遍歷隊列查詢當(dāng)前元素的位置 for (int i = 0; i < size; i++) if (x.equals(queue[i])) return i; } } return -1; } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture> e = (RunnableScheduledFuture>)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); // 隊列容量不足,對其進(jìn)行擴(kuò)容 size = i + 1; if (i == 0) { // 如果其為隊列第一個元素,則將其放入隊列頭部 queue[0] = e; setIndex(e, 0); } else { //如果不為第一個元素,則通過堆的上移元素操作移動當(dāng)前元素至合適的位置 siftUp(i, e); } if (queue[0] == e) { // 如果被更新的是隊列頭部元素,則更新記錄的執(zhí)行頭部任務(wù)的線程 leader = null; available.signal(); } } finally { lock.unlock(); } return true; } // 完成從隊列拉取元素操作,并且將其從隊列中移除 private RunnableScheduledFuture> finishPoll(RunnableScheduledFuture> f) { int s = --size; RunnableScheduledFuture> x = queue[s]; queue[s] = null; // 將隊列最尾部的元素置空 if (s != 0) // 將最后一個元素放入第一個位置,并且將其下推至合適的位置 siftDown(0, x); // 這里idx置為0是因為當(dāng)前方法的入?yún)都為隊列的第一個元素 setIndex(f, -1); return f; } // 嘗試從隊列(堆)中獲取元素,如果沒有元素或者元素的延遲時間還未到則返回空 public RunnableScheduledFuture> poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture> first = queue[0]; // 在此處代碼控制了當(dāng)從堆頂拉取元素時,如果元素的延遲時間還未達(dá)到,則不返回當(dāng)前元素 if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return finishPoll(first); // 返回堆頂元素 } finally { lock.unlock(); } } // 通過無限for循環(huán)獲取堆頂?shù)脑兀@里take()方法會阻塞當(dāng)前線程,直至獲取到了可執(zhí)行的任務(wù)。 // 可以看到,在第一次for循環(huán)中,如果堆頂不存在任務(wù),則其會加入阻塞隊列中,如果存在任務(wù),但是 // 其延遲時間還未到,那么當(dāng)前線程會等待該延遲時間長的時間,然后查看任務(wù)是否可用,當(dāng)獲取到任務(wù) // 之后,其會將其從隊列中移除,并且喚醒等待隊列中其余等待的線程執(zhí)行下一個任務(wù) public RunnableScheduledFuture> take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture> first = queue[0]; if (first == null) available.await(); // 堆內(nèi)沒有元素,當(dāng)前線程進(jìn)入等待隊列中 else { long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // 堆頂元素延遲時間小于0,可立即獲取任務(wù) return finishPoll(first); first = null; if (leader != null) available.await(); // 已經(jīng)有線程在等待堆頂元素,則當(dāng)前線程進(jìn)入等待隊列中 else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { available.awaitNanos(delay); // 當(dāng)前線程等待一定時長后獲取任務(wù)并執(zhí)行 } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && queue[0] != null) available.signal(); // 當(dāng)前線程獲取完任務(wù)之后喚醒等待隊列中的下一個線程執(zhí)行下一個任務(wù) lock.unlock(); } } }
???????從DelayedWorkQueue的take()和poll()方法可以看出來,對于隊列中任務(wù)的等待時間的限制主要是在這兩個方法中實現(xiàn)的,如果任務(wù)的等待時間還未到,那么該方法就會阻塞線程池中的線程,直至任務(wù)可以執(zhí)行。
2.4 scheduleAtFixedRate()和scheduleWithFixedDelay()方法???????前面我們對ScheduledThreadPoolExecutor的主要屬性和主要內(nèi)部類都進(jìn)行了詳細(xì)的講解,基本上已經(jīng)可以看出其是如何實現(xiàn)定時執(zhí)行任務(wù)的功能的,接下來我們主要對客戶端可以調(diào)用的主要方法進(jìn)行簡要介紹,這里scheduleAtFixedRate()和scheduleWithFixedDelay()方法的實現(xiàn)基本是一致的,兩個方法最細(xì)微的區(qū)別在于ScheduledFutureTask的setNextRunTime()方法的實現(xiàn),該方法的實現(xiàn)前面已經(jīng)進(jìn)行了講解,我們這里則以scheduleAtFixedRate()方法的實現(xiàn)為例對該方法進(jìn)行講解。如下是該方法的具體實現(xiàn):
public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); ScheduledFutureTasksft = // 封裝客戶端的任務(wù)實例 new ScheduledFutureTask (command, null, triggerTime(initialDelay, unit),unit.toNanos(period)); RunnableScheduledFuture t = decorateTask(command, sft); // 對客戶端任務(wù)實例進(jìn)行裝飾 sft.outerTask = t; // 初始化周期任務(wù)屬性outerTask delayedExecute(t); // 執(zhí)行該任務(wù) return t; }
???????從上述代碼可以看出來,scheduleAtFixedRate()首先對客戶端任務(wù)實例進(jìn)行了封裝,裝飾,并且初始化了封裝后的任務(wù)實例的outerTask屬性,最后調(diào)用delayedExecute()方法執(zhí)行任務(wù)。如下是delayedExecute()方法的實現(xiàn):
private void delayedExecute(RunnableScheduledFuture> task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); // 添加當(dāng)前任務(wù)到任務(wù)隊列中 if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); // 雙檢查法再次判斷當(dāng)前線程池是否處于可用狀態(tài),不是則移除當(dāng)前任務(wù) else ensurePrestart(); // 若線程池沒有初始化,則進(jìn)行一些初始化工作 } }
???????上述方法為主要的執(zhí)行任務(wù)的方法,該方法首先會將任務(wù)加入到任務(wù)隊列中,如果線程池已經(jīng)初始化過,那么該任務(wù)就會有等待的線程執(zhí)行該任務(wù)。在加入到任務(wù)隊列之后通過雙檢查法檢查線程池是否已經(jīng)shutdown了,如果是則將該任務(wù)從任務(wù)隊列中移除。如果當(dāng)前線程池沒有shutdown,就調(diào)用繼承自ThreadPoolExecutor的ensurePrestart()方法,該方法會對線程池進(jìn)行一些初始化工作,如初始化核心線程,然后各個線程會調(diào)用上述等待隊列的take()方法獲取任務(wù)執(zhí)行。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/76304.html
摘要:該方法傳入表示如果當(dāng)前任務(wù)正在執(zhí)行,那么立即終止其執(zhí)行傳入表示如果當(dāng)前方法正在執(zhí)行,那么等待其執(zhí)行完成之后再取消當(dāng)前任務(wù)。 ???????本文主要分為兩個部分,第一部分首先會對ScheduledThreadPoolExecutor進(jìn)行簡單的介紹,并且會介紹其主要API的使用方式,然后介紹了其使用時的注意點,第二部分則主要對ScheduledThreadPoolExecutor的實現(xiàn)細(xì)節(jié)...
摘要:該方法傳入表示如果當(dāng)前任務(wù)正在執(zhí)行,那么立即終止其執(zhí)行傳入表示如果當(dāng)前方法正在執(zhí)行,那么等待其執(zhí)行完成之后再取消當(dāng)前任務(wù)。 ???????本文主要分為兩個部分,第一部分首先會對ScheduledThreadPoolExecutor進(jìn)行簡單的介紹,并且會介紹其主要API的使用方式,然后介紹了其使用時的注意點,第二部分則主要對ScheduledThreadPoolExecutor的實現(xiàn)細(xì)節(jié)...
摘要:一使用線程池的好處線程池提供了一種限制和管理資源包括執(zhí)行一個任務(wù)。每個線程池還維護(hù)一些基本統(tǒng)計信息,例如已完成任務(wù)的數(shù)量。通過重復(fù)利用已創(chuàng)建的線程降低線程創(chuàng)建和銷毀造成的消耗。使用無界隊列作為線程池的工作隊列會對線程池帶來的影響與相同。 歷史優(yōu)質(zhì)文章推薦: Java并發(fā)編程指南專欄 分布式系統(tǒng)的經(jīng)典基礎(chǔ)理論 可能是最漂亮的Spring事務(wù)管理詳解 面試中關(guān)于Java虛擬機(jī)(jvm)的問...
摘要:去美團(tuán)面試,問到了什么是線程池,如何使用,為什么要用以下做個總結(jié)。二線程池線程池的作用線程池作用就是限制系統(tǒng)中執(zhí)行線程的數(shù)量。真正的線程池接口是。創(chuàng)建固定大小的線程池。此線程池支持定時以及周期性執(zhí)行任務(wù)的需求。 去美團(tuán)面試,問到了什么是線程池,如何使用,為什么要用,以下做個總結(jié)。關(guān)于線程之前也寫過一篇文章《高級面試題總結(jié)—線程池還能這么玩?》 1、什么是線程池:? java.util...
閱讀 5265·2021-09-22 15:59
閱讀 1856·2021-08-23 09:42
閱讀 2561·2019-08-29 18:42
閱讀 3444·2019-08-29 10:55
閱讀 2058·2019-08-27 10:57
閱讀 1759·2019-08-26 18:27
閱讀 2722·2019-08-23 18:26
閱讀 2912·2019-08-23 14:40