摘要:在前面介紹了的多線程的基本原理信息線程池架構原理和源碼解析,本文對這個本身的線程池的調度器做一個簡單擴展,如果還沒讀過上一篇文章,建議讀一下,因為這是調度器的核心組件部分。
在前面介紹了java的多線程的基本原理信息:《Java線程池架構原理和源碼解析》,本文對這個java本身的線程池的調度器做一個簡單擴展,如果還沒讀過上一篇文章,建議讀一下,因為這是調度器的核心組件部分。
我們如果要用java默認的線程池來做調度器,一種選擇就是Timer和TimerTask的結合,在以前的文章:《Timer與TimerTask的真正原理&使用介紹》中有明確的說明:一個Timer為一個多帶帶的線程,雖然一個Timer可以調度多個TimerTask,但是對于一個Timer來講是串行的,至于細節請參看對應的那篇文章的內容,本文介紹的多線程調度器,也就是定時任務,基于多線程調度完成,當然你可以為了完成多線程使用多個Timer,只是這些Timer的管理需要你來完成,不是一個框架體系,而ScheduleThreadPoolExecutor提供了這個功能,所以我們第一要搞清楚是如何使用調度器的,其次是需要知道它的內部原理是什么,也就是知其然,再知其所以然!
首先如果我們要創建一個基于java本身的調度池通常的方法是:
Executors.newScheduledThreadPool(int);
當有重載方法,我們最常用的是這個就從這個,看下定義:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }
其實內部是new了一個實例化對象出來,并傳入大小,此時就跟蹤到ScheduledThreadPoolExecutor的構造方法中:
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }
你會發現調用了super,而super你跟蹤進去會發現,是ThreadPoolExecutor中,那么ScheduledThreadPoolExecutor和ThreadPoolExecutor有何區別,就是本文要說得重點了,首先我們留下個引子,你發現在定義隊列的時候,不再是上文中提到的LinkedBlockingQueue,而是DelayedWorkQueue,那么細節上我們接下來就是要講解的重點,既然他們又繼承關系,其實搞懂了不同點,就搞懂了共同點,而且有這樣的關系大多數應當是共同點,不同點的猜測:這個是要實現任務調度,任務調度不是立即的,需要延遲和定期做等情況,那么是如何實現的呢?
這就是我們需要思考的了,通過源碼考察,我們發現,他們都有execute方法,只是ScheduledThreadPoolExecutor將源碼進行了重寫,并且還有以下四個調度器的方法:
public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit); public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit); public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit);
那么這四個方法有什么區別呢?其實第一個和第二個區別不大,一個是Runnable、一個是Callable,內部包裝后是一樣的效果;所以把頭兩個方法幾乎當成一種調度,那么三種情況分別是:
1、 進行一次延遲調度:延遲delay這么長時間,單位為:TimeUnit傳入的的一個基本單位,例如:TimeUnit.SECONDS屬于提供好的枚舉信息;(適合于方法1和方法2)。
2、 多次調度,每次依照上一次預計調度時間進行調度,例如:延遲2s開始,5s一次,那么就是2、7、12、17,如果中間由于某種原因導致線程不夠用,沒有得到調度機會,那么接下來計算的時間會優先計算進去,因為他的排序會被排在前面,有點類似Timer中的:scheduleAtFixedRate方法,只是這里是多線程的,它的方法名也叫:scheduleAtFixedRate,所以這個是比較好記憶的(適合方法3)
3、 多次調度,每次按照上一次實際執行的時間進行計算下一次時間,同上,如果在第7秒沒有被得到調度,而是第9s才得到調度,那么計算下一次調度時間就不是12秒,而是9+5=14s,如果再次延遲,就會延遲一個周期以上,也就會出現少調用的情況(適合于方法3);
4、 最后補充execute方法是一次調度,期望被立即調度,時間為空:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
schedule(command, 0, TimeUnit.NANOSECONDS);
}
我們簡單看看scheduleAtFixedRate、scheduleWithFixedDelay對下面的分析會更加有用途:
public ScheduledFuture> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period <= 0) throw new IllegalArgumentException(); RunnableScheduledFuture> t = decorateTask(command, new ScheduledFutureTask
兩段源碼唯一的區別就是在unit.toNanos(int)這唯一一個地方,scheduleAtFixedRate里面是直接傳入值,而scheduleWithFixedDelay里面是取了相反數,也就是假如我們都傳入正數,scheduleWithFixedDelay其實就取反了,沒有任何區別,你是否聯想到前面文章介紹Timer中類似的處理手段通過正負數區分時間間隔方法,為0代表僅僅調度一次,其實在這里同樣是這樣的,他們也同樣有一個問題就是,如果你傳遞負數,方法的功能正好是相反的。
而你會發現,不論是那個schedule方法里頭,都會創建一個ScheduledFutureTask類的實例,此類究竟是何方神圣呢,我們來看看。
ScheduledFutureTask的類(ScheduleThreadPoolExecutor的私有的內部類)來進行調度,那么可以看看內部做了什么操作,如下:
ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a periodic action with given nano time and period. */ ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } /** * Creates a one-shot action with given nanoTime-based trigger. */ ScheduledFutureTask(Callable callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); }
最核心的幾個參數正好對應了調度的延遲的構造方法,這些參數如何用起來的?那么它還提供了什么方法呢?
public long getDelay(TimeUnit unit) { return unit.convert(time - now(), TimeUnit.NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero ONLY if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask> x = (ScheduledFutureTask>)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long d = (getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS)); return (d == 0)? 0 : ((d < 0)? -1 : 1); }
這里發現了,他們可以運行,且判定時間的方法是getDelay方法我們知道了。 對比時間的方法是:compareTo,傳入了參數類型為:Delayed類型,不難猜測出,ScheduledFutureTask和Delayed有某種繼承關系,沒錯,ScheduledFutureTask實現了Delayed的接口,只是它是間接實現的;并且Delayed接口繼承了Comparable接口,這個接口可用來干什么?看過我前面寫的一篇文章關于中文和對象排序的應該知道,這個是用來自定義對比和排序的,我們的調度任務是一個對象,所以需要排序才行,接下來我們回溯到開始定義的代碼中,找一個實際調用的代碼來看看它是如何啟動到run方法的?如何排序的?如何調用延遲的?就是我們下文中會提到的,而這里我們先提出問題,后文我們再來說明這些問題。 我們先來看下run方法的一些定義。
/** * 時間片類型任務執行 */ private void runPeriodic() { //運行對應的程序,這個是具體的程序 boolean ok = ScheduledFutureTask.super.runAndReset(); boolean down = isShutdown(); // Reschedule if not cancelled and not shutdown or policy allows if (ok && (!down || (getContinueExistingPeriodicTasksAfterShutdownPolicy() && !isStopped()))) { long p = period; if (p > 0)//規定時間間隔算出下一次時間 time += p; else//用當前時間算出下一次時間,負負得正 time = triggerTime(-p); //計算下一次時間,并資深再次放入等待隊列中 ScheduledThreadPoolExecutor.super.getQueue().add(this); } else if (down) interruptIdleWorkers(); } /** * 是否為逐片段執行,如果不是,則調用父親類的run方法 */ public void run() { if (isPeriodic())//周期任務 runPeriodic(); else//只執行一次的任務 ScheduledFutureTask.super.run(); }
可以看到run方法首先通過isPeriod()判定是否為時間片,判定的依據就是我們說的時間片是否“不為零”,如果不是周期任務,就直接運行一次,如果是周期任務,則除了運行還會計算下一次執行的時間,并將其再次放入等待隊列,這里對應到scheduleAtFixedRate、scheduleWithFixedDelay這兩個方法一正一負,在這里得到判定,并且將為負數的取反回來,負負得正,java就是這么干的,呵呵,所以不要認為什么是不可能的,只要好用什么都是可以的,然后計算的時間一個是基于標準的time加上一個時間片,一個是根據當前時間計算一個時間片,在上文中我們已經明確說明了兩者的區別。
以:schedule方法為例:
public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture t = decorateTask(callable, new ScheduledFutureTask(callable, triggerTime(delay, unit))); delayedExecute(t); return t; }
其實這個方法內部創建的就是一個我們剛才提到的:ScheduledFutureTask,外面又包裝了下叫做RunnableScheduledFuture,也就是適配了下而已,呵呵,代碼里面就是一個return操作,java這樣做的目的是方便子類去擴展。
關鍵是delayedExecute(t)方法中做了什么?看名稱是延遲執行的意思,難道java的線程可以延遲執行,那所有的任務線程都在運行狀態?
它的源碼是這樣的:
private void delayedExecute(Runnable command) { if (isShutdown()) { reject(command); return; } if (getPoolSize() < getCorePoolSize()) prestartCoreThread(); super.getQueue().add(command); }
我們主要關心prestartCoreThread()和super.getQueue().add(command),因為如果系統關閉,這些討論都沒有意義的,我們分別叫他們第二小段代碼和第三小段代碼。
第二個部分如果線程數小于核心線程數設置,那么就調用一個prestartCoreThread(),看方法名應該是:預先啟動一個核心線程的意思,先看完第三個部分,再跟蹤進去看源碼。
第三個部分很明了,就是調用super.getQueue().add(command);也就是說直接將任務放入一個隊列中,其實super是什么?super就是我們上一篇文章所提到的ThreadPoolExecutor,那么這個Queue就是上一篇文章中提到的等待隊列,也就是任何schedule任務首先放入等待隊列,然后等待被調度的。
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); } private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); return true; }
這個代碼是否似曾相似,沒錯,這個你在上一篇文章介紹ThreadPoolExecutor的時候就見到過,說明不論是ThreadPoolExecutor還是ScheduleThreadPoolExecutor他們的Thread都是由一個Worker來處理的(上一篇文章有介紹),而這個Worker處理的基本機制就是將當前任務執行后,不斷從線程等待隊列中獲取數據,然后用以執行,直到隊列為空為止。 那么他們的區別在哪里呢?延遲是如何實現的呢?和我們上面介紹的ScheduledFutureTask又有何關系呢? 那么我們回過頭來看看ScheduleThreadPool的定義是如何的。
public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS, new DelayedWorkQueue()); }
發現它和ThreadPoolExecutor有個定義上很大的區別就是,ThreadPoolExecutor用的是LinkedBlockingQueue(當然可以修改),它用的是DelayedWeorkQueue,而這個DelayedWorkQueue里面你會發現它僅僅是對java.util.concurrent.DelayedQueue類一個簡單訪問包裝,這個隊列就是等待隊列,可以看到任務是被直接放到等待隊列中的,所以取數據必然從這里獲取,而這個延遲的隊列有何神奇之處呢,它又是如何實現的呢,我們從什么地方下手去看這個DelayWorkQueue? 我們還是回頭看看Worker里面的run方法(上一篇文章中已經講過):
public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); } }
這里面要調用等待隊列就是getTask()方法:
Runnable getTask() { for (;;) { try { int state = runState; if (state > SHUTDOWN) return null; Runnable r; if (state == SHUTDOWN) // Help drain queue r = workQueue.poll(); else if (poolSize > corePoolSize || allowCoreThreadTimeOut) r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS); else r = workQueue.take(); if (r != null) return r; if (workerCanExit()) { if (runState >= SHUTDOWN) // Wake up others interruptIdleWorkers(); return null; } } catch (InterruptedException ie) { } } }
發現沒有,如果沒有設置超時,默認只會通過workQueue.take()方法獲取數據,那么我們就看take方法,而增加到隊列里面的方法自然看offer相關的方法。接下來我們來看下DelayQueue這個隊列的take方法:
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { available.await();//等待信號,線程一直掛在哪里 } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay > 0) { long tl = available.awaitNanos(delay);//最左等delay的時間段 } else { E x = q.poll();//可以運行,取出一個 assert x != null; if (q.size() != 0) available.signalAll(); return x; } } } } finally { lock.unlock(); } }
這里的for就是要找到數據為止,否則就等著,而這個“q”和“available”是什么呢?
private transient final Condition available = lock.newCondition();
private final PriorityQueue q = new PriorityQueue();
怎么里面還有一層隊列,不用怕,從這里你貌似看出點名稱意味了,就是它是優先級隊列,而對于任務調度來講,優先級的方式就是時間,我們用這中猜測來繼續深入源碼。
上面首先獲取這個隊列的第一個元素,若為空,就等待一個“available”發出的信號,我們可以猜測到這個offer的時候會發出的信號,一會來驗證即可;若不為空,則通過getDelay方法來獲取時間信息,這個getDelay方法就用上了我們開始說的ScheduledFutureTask了,如果是時間大于0,則也進入等待,因為還沒開始執行,等待也是“available”發出信號,但是有一個最長時間,為什么還要等這個信號,是因為有可能進來一個新的任務,比這個等待的任務還要先執行,所以要等這個信號;而最多等這么長時間,就是因為如果這段時間沒任務進來肯定就是它執行了。然后就返回的這個值,被Worker(上面有提到)拿到后調用其run()方法進行運行。
那么寫入隊列在那里?他們是如何排序的?
我們看看隊列的寫入方法是這樣的:
public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); q.offer(e); if (first == null || e.compareTo(first) < 0) available.signalAll(); return true; } finally { lock.unlock(); } }
隊列也是首先取出第一個(后面會用來和當前任務做比較),而這里“q”是上面提到的“PriorityQueue”,看來offer的關鍵還在它的里面,我們看看調用過程:
public boolean offer(E e) { if (e == null) throw new NullPointerException(); modCount++; int i = size; if (i >= queue.length) grow(i + 1); size = i + 1; if (i == 0) queue[0] = e; else siftUp(i, e);//主要是這條代碼很關鍵 return true; } private void siftUp(int k, E x) { if (comparator != null) siftUpUsingComparator(k, x); else //我們默認走這里,因為DelayQueue定義它的時候默認沒有給定義comparator siftUpComparable(k, x); } /* 可以發現這個方法是將任務按照compareTo對比后,放在隊列的合適位置,但是它肯定不是絕對順序的,這一點和Timer的內部排序機制類似。 */ private void siftUpComparable(int k, E x) { Comparable super E> key = (Comparable super E>) x; while (k > 0) { int parent = (k - 1) >>> 1; Object e = queue[parent]; if (key.compareTo((E) e) >= 0) break; queue[k] = e; k = parent; } queue[k] = key; }
你是否發現,compareTo也用上了,就是我們前面描述一大堆的:ScheduledFutureTask類中的一個方法,那么run方法也用上了,這個過程貌似完整了。
我們再來理一下思路:
1、調用的Thread的包裝,由在ThreadPoolExecutor中的Worker調用你傳入的Runnable的run方法,變成了Worker調用Runnable的run方法,由它來處理時間片的信息調用你傳入的線程。
2、ScheduledFutureTask類在整個過程中提供了基礎參考的方法,其中最為關鍵的就是實現了接口Comparable,實現內部的compareTo方法,也實現了Delayed接口中的getDelay方法用以判定時間(當然Delayed接口本身也是繼承于Comparable,我們不要糾結于細節概念就好)。
3、等待隊列由在ThreadPoolExecutor中默認使用的LinkedBlockingQueue換成了DelayQueue(它是被DelayWorkQueue包裝了一下子,沒多大區別),而DelayQueue主要提供了一個信號量“available”來作為寫入和讀取的信號控制開關,通過另一個優先級隊列“PriorityQueue”來控制實際的隊列順序,他們的順序就是基于上面提到的ScheduledFutureTask類中的compareTo方法,而是否運行也是基于getDelay方法來實現的。
4、ScheduledFutureTask類的run方法會判定是否為時間片信息,如果為時間片,在執行完對應的方法后,開始計算下一次執行時間(注意判定時間片大于0,小于0,分別代表的是以當前執行完的時間為準計算下一次時間還是以當前時間為準),這個在前面有提到。
5、它是支持多線程的,和Timer的機制最大的區別就在于多個線程會最征用這個隊列,隊里的排序方式和Timer有很多相似之處,并非完全有序,而是通過位移動來盡量找到合適的位置,有點類似貪心的算法,呵呵。
via ifeve
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69762.html
摘要:如問到是否使用某框架,實際是是問該框架的使用場景,有什么特點,和同類可框架對比一系列的問題。這兩個方向的區分點在于工作方向的側重點不同。 [TOC] 這是一份來自嗶哩嗶哩的Java面試Java面試 32個核心必考點完全解析(完) 課程預習 1.1 課程內容分為三個模塊 基礎模塊: 技術崗位與面試 計算機基礎 JVM原理 多線程 設計模式 數據結構與算法 應用模塊: 常用工具集 ...
摘要:也是自帶的一個基于線程池設計的定時任務類。其每個調度任務都會分配到線程池中的一個線程執行,所以其任務是并發執行的,互不影響。 原創不易,如需轉載,請注明出處https://www.cnblogs.com/baixianlong/p/10659045.html,否則將追究法律責任!!! 一、在JAVA開發領域,目前可以通過以下幾種方式進行定時任務 1、單機部署模式 Timer:jdk中...
摘要:創建線程的方式方式一將類聲明為的子類。將該線程標記為守護線程或用戶線程。其中方法隱含的線程為父線程。恢復線程,已過時。等待該線程銷毀終止。更多的使當前線程在鎖存器倒計數至零之前一直等待,除非線 知識體系圖: showImg(https://segmentfault.com/img/bVbef6v?w=1280&h=960); 1、線程是什么? 線程是進程中獨立運行的子任務。 2、創建線...
摘要:多線程和并發問題是技術面試中面試官比較喜歡問的問題之一。線程可以被稱為輕量級進程。一個守護線程是在后臺執行并且不會阻止終止的線程。其他的線程狀態還有,和。上下文切換是多任務操作系統和多線程環境的基本特征。 多線程和并發問題是 Java 技術面試中面試官比較喜歡問的問題之一。在這里,從面試的角度列出了大部分重要的問題,但是你仍然應該牢固的掌握Java多線程基礎知識來對應日后碰到的問題。(...
摘要:線程可以被稱為輕量級進程。一個守護線程是在后臺執行并且不會阻止終止的線程。其他的線程狀態還有,和。上下文切換是多任務操作系統和多線程環境的基本特征。在的線程中并沒有可供任何對象使用的鎖和同步器。 原文:Java Multi-Threading and Concurrency Interview Questions with Answers 翻譯:并發編程網 - 鄭旭東 校對:方騰飛 多...
閱讀 3539·2021-11-18 13:22
閱讀 2556·2021-09-23 11:53
閱讀 725·2019-08-30 13:17
閱讀 1346·2019-08-30 13:12
閱讀 895·2019-08-29 15:43
閱讀 1099·2019-08-29 12:53
閱讀 2828·2019-08-26 18:27
閱讀 1499·2019-08-26 11:52