摘要:之后,會重復上一步,新喚醒的線程可能取代成為新的線程。這其實是一種名為的多線程設計模式。我們之前說了,線程作用之一就是用來喚醒其它無限等待的線程,所以必須要有這個判斷。線程池框架中的就是一種延時阻塞隊列。
本文首發(fā)于一世流云專欄:https://segmentfault.com/blog...一、DelayQueue簡介
DelayQueue是JDK1.5時,隨著J.U.C包一起引入的一種阻塞隊列,它實現(xiàn)了BlockingQueue接口,底層基于已有的PriorityBlockingQueue實現(xiàn):
DelayQueue也是一種比較特殊的阻塞隊列,從類聲明也可以看出,DelayQueue中的所有元素必須實現(xiàn)Delayed接口:
/** * 一種混合風格的接口,用來標記那些應該在給定延遲時間之后執(zhí)行的對象。 ** 此接口的實現(xiàn)必須定義一個 compareTo 方法,該方法提供與此接口的 getDelay 方法一致的排序。 */ public interface Delayed extends Comparable
{ /** * 返回與此對象相關的剩余有效時間,以給定的時間單位表示. */ long getDelay(TimeUnit unit); }
可以看到,Delayed接口除了自身的getDelay方法外,還實現(xiàn)了Comparable接口。getDelay方法用于返回對象的剩余有效時間,實現(xiàn)Comparable接口則是為了能夠比較兩個對象,以便排序。
也就是說,如果一個類實現(xiàn)了Delayed接口,當創(chuàng)建該類的對象并添加到DelayQueue中后,只有當該對象的getDalay方法返回的剩余時間≤0時才會出隊。
另外,由于DelayQueue內部委托了PriorityBlockingQueue對象來實現(xiàn)所有方法,所以能以堆的結構維護元素順序,這樣剩余時間最小的元素就在堆頂,每次出隊其實就是刪除剩余時間≤0的最小元素。
DelayQueue的特點簡要概括如下:
DelayQueue是無界阻塞隊列;
隊列中的元素必須實現(xiàn)Delayed接口,元素過期后才會從隊列中取走;
二、DelayQueue示例為了便于理解DelayQueue的功能,我們先來看一個使用DelayQueue的示例。
隊列元素第一節(jié)說了,隊列元素必須實現(xiàn)Delayed接口,我們先來定義一個Data類,作為隊列元素:
public class Data implements Delayed { private static final AtomicLong atomic = new AtomicLong(0); private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("HH:mm:ss-n"); // 數(shù)據(jù)的失效時間點 private final long time; // 序號 private final long seqno; /** * @param deadline 數(shù)據(jù)失效時間點 */ public Data(long deadline) { this.time = deadline; this.seqno = atomic.getAndIncrement(); } /** * 返回剩余有效時間 * * @param unit 時間單位 */ @Override public long getDelay(TimeUnit unit) { return unit.convert(this.time - System.nanoTime(), TimeUnit.NANOSECONDS); } /** * 比較兩個Delayed對象的大小, 比較順序如下: * 1. 如果是對象本身, 返回0; * 2. 比較失效時間點, 先失效的返回-1,后失效的返回1; * 3. 比較元素序號, 序號小的返回-1, 否則返回1. * 4. 非Data類型元素, 比較剩余有效時間, 剩余有效時間小的返回-1,大的返回1,相同返回0 */ @Override public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof Data) { Data x = (Data) other; // 優(yōu)先比較失效時間 long diff = this.time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (this.seqno < x.seqno) // 剩余時間相同則比較序號 return -1; else return 1; } // 一般不會執(zhí)行到此處,除非元素不是Data類型 long diff = this.getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } @Override public String toString() { return "Data{" + "time=" + time + ", seqno=" + seqno + "}, isValid=" + isValid(); } private boolean isValid() { return this.getDelay(TimeUnit.NANOSECONDS) > 0; } }
關于隊列元素Data類,需要注意以下幾點:
每個元素的time字段保存失效時間點)的納秒形式(構造時指定,比如當前時間+60s);
seqno字段表示元素序號,每個元素唯一,僅用于失效時間點一致的元素之間的比較。
getDelay方法返回元素的剩余有效時間,可以根據(jù)入?yún)⒌?strong>TimeUnit選擇時間的表示形式(秒、微妙、納秒等),一般選擇納秒以提高精度;
compareTo方法用于比較兩個元素的大小,以便在隊列中排序。由于DelayQueue基于優(yōu)先級隊列實現(xiàn),所以內部是“堆”的形式,我們定義的規(guī)則是先失效的元素將先出隊,所以先失效元素應該在堆頂,即compareTo方法返回結果<0的元素優(yōu)先出隊;
生產(chǎn)者-消費者還是以“生產(chǎn)者-消費者”模式來作為DelayQueued的示例:
生產(chǎn)者
public class Producer implements Runnable { private final DelayQueue queue; public Producer(DelayQueue queue) { this.queue = queue; } @Override public void run() { while (true) { long currentTime = System.nanoTime(); long validTime = ThreadLocalRandom.current().nextLong(1000000000L, 7000000000L); Data data = new Data(currentTime + validTime); queue.put(data); System.out.println(Thread.currentThread().getName() + ": put " + data); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消費者
public class Consumer implements Runnable { private final DelayQueue queue; public Consumer(DelayQueue queue) { this.queue = queue; } @Override public void run() { while (true) { try { Data data = queue.take(); System.out.println(Thread.currentThread().getName() + ": take " + data); Thread.yield(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
調用
public class Main { public static void main(String[] args) { DelayQueue queue = new DelayQueue<>(); Thread c1 = new Thread(new Consumer(queue), "consumer-1"); Thread p1 = new Thread(new Producer(queue), "producer-1"); c1.start(); p1.start(); } }
執(zhí)行結果:
producer-1: put Data{time=73262562161592, seqno=0}, isValid=true
producer-1: put Data{time=73262787192726, seqno=1}, isValid=true
producer-1: put Data{time=73265591291171, seqno=2}, isValid=true
producer-1: put Data{time=73266850330909, seqno=3}, isValid=true
consumer-1: take Data{time=73262562161592, seqno=0}, isValid=false
consumer-1: take Data{time=73262787192726, seqno=1}, isValid=false
producer-1: put Data{time=73267928737184, seqno=4}, isValid=true
producer-1: put Data{time=73265083111776, seqno=5}, isValid=true
producer-1: put Data{time=73268729942809, seqno=6}, isValid=true
consumer-1: take Data{time=73265083111776, seqno=5}, isValid=false
上面示例中,我們創(chuàng)建了一個生產(chǎn)者,一個消費者,生產(chǎn)者不斷得入隊元素,每個元素都會有個截止有效期;消費者不斷得從隊列者獲取元素。從輸出可以看出,消費者每次獲取到的元素都是有效期最小的,且都是已經(jīng)失效了的。(因為DelayQueue每次出隊只會刪除有效期最小且已經(jīng)過期的元素)
三、DelayQueue原理介紹完了DelayQueued的基本使用,讀者應該對該阻塞隊列的功能有了基本了解,接下來我們看下Doug Lea是如何實現(xiàn)DelayQueued的。
構造DelayQueued提供了兩種構造器,都非常簡單:
/** * 默認構造器. */ public DelayQueue() { }
/** * 從已有集合構造隊列. */ public DelayQueue(Collection extends E> c) { this.addAll(c); }
可以看到,內部的PriorityQueue并非在構造時創(chuàng)建,而是對象創(chuàng)建時生成:
public class DelayQueueextends AbstractQueue implements BlockingQueue { private final transient ReentrantLock lock = new ReentrantLock(); private final PriorityQueue q = new PriorityQueue (); /** * leader線程是首個嘗試出隊元素(隊列不為空)但被阻塞的線程. * 該線程會限時等待(隊首元素的剩余有效時間),用于喚醒其它等待線程 */ private Thread leader = null; /** * 出隊線程條件隊列, 當有多個線程, 會在此條件隊列上等待. */ private final Condition available = lock.newCondition(); //... }
上述比較特殊的是leader字段,我們之前已經(jīng)說過,DelayQueue每次只會出隊一個過期的元素,如果隊首元素沒有過期,就會阻塞出隊線程,讓線程在available這個條件隊列上無限等待。
為了提升性能,DelayQueue并不會讓所有出隊線程都無限等待,而是用leader保存了第一個嘗試出隊的線程,該線程的等待時間是隊首元素的剩余有效期。這樣,一旦leader線程被喚醒(此時隊首元素也失效了),就可以出隊成功,然后喚醒一個其它在available條件隊列上等待的線程。之后,會重復上一步,新喚醒的線程可能取代成為新的leader線程。這樣,就避免了無效的等待,提升了性能。這其實是一種名為“Leader-Follower pattern”的多線程設計模式。
入隊——putput方法沒有什么特別,由于是無界隊列,所以也不會阻塞線程。
/** * 入隊一個指定元素e. * 由于是無界隊列, 所以該方法并不會阻塞線程. */ public void put(E e) { offer(e); } public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 調用PriorityQueue的offer方法 if (q.peek() == e) { // 如果入隊元素在隊首, 則喚醒一個出隊線程 leader = null; available.signal(); } return true; } finally { lock.unlock(); } }
需要注意的是當首次入隊元素時,需要喚醒一個出隊線程,因為此時可能已有出隊線程在空隊列上等待了,如果不喚醒,會導致出隊線程永遠無法執(zhí)行。
if (q.peek() == e) { // 如果入隊元素在隊首, 則喚醒一個出隊線程 leader = null; available.signal(); }出隊——take
整個take方法在一個自旋中完成,其實就分為兩種情況:
1.隊列為空
這種情況直接阻塞出隊線程。(在available條件隊列等待)
2.隊列非空
隊列非空時,還要看隊首元素的狀態(tài)(有效期),如果隊首元素過期了,那直接出隊就行了;如果隊首元素未過期,就要看當前線程是否是第一個到達的出隊線程(即判斷leader是否為空),如果不是,就無限等待,如果是,則限時等待。
/** * 隊首出隊元素. * 如果隊首元素(堆頂)未到期或隊列為空, 則阻塞線程. */ public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (; ; ) { E first = q.peek(); // 讀取隊首元素 if (first == null) // CASE1: 隊列為空, 直接阻塞 available.await(); else { // CASE2: 隊列非空 long delay = first.getDelay(NANOSECONDS); if (delay <= 0) // CASE2.0: 隊首元素已過期 return q.poll(); // 執(zhí)行到此處說明隊列非空, 且隊首元素未過期 first = null; if (leader != null) // CASE2.1: 已存在leader線程 available.await(); // 無限期阻塞當前線程 else { // CASE2.2: 不存在leader線程 Thread thisThread = Thread.currentThread(); leader = thisThread; // 將當前線程置為leader線程 try { available.awaitNanos(delay); // 阻塞當前線程(限時等待剩余有效時間) } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) // 不存在leader線程, 則喚醒一個其它出隊線程 available.signal(); lock.unlock(); } }
需要注意,自旋結束后如果leader == null && q.peek() != null,需要喚醒一個等待中的出隊線程。四、總結
leader == null && q.peek() != null的含義就是——沒有leader線程但隊列中存在元素。我們之前說了,leader線程作用之一就是用來喚醒其它無限等待的線程,所以必須要有這個判斷。
DelayQueue是阻塞隊列中非常有用的一種隊列,經(jīng)常被用于緩存或定時任務等的設計。
考慮一種使用場景:
異步通知的重試,在很多系統(tǒng)中,當用戶完成服務調用后,系統(tǒng)有時需要將結果異步通知到用戶的某個URI。由于網(wǎng)絡等原因,很多時候會通知失敗,這個時候就需要一種重試機制。
這時可以用DelayQueue保存通知失敗的請求,失效時間可以根據(jù)已通知的次數(shù)來設定(比如:2s、5s、10s、20s),這樣每次從隊列中take獲取的就是剩余時間最短的請求,如果已重復通知次數(shù)超過一定閾值,則可以把消息拋棄。
后面,我們在講J.U.C之executors框架的時候,還會再次看到DelayQueue的身影。JUC線程池框架中的ScheduledThreadPoolExecutor.DelayedWorkQueue就是一種延時阻塞隊列。
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77112.html
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執(zhí)行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據(jù)一系列常見的多線程設計模式,設計了并發(fā)包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發(fā)于一世流云專欄:https...
摘要:我們來看下的類繼承圖可以看到,實現(xiàn)了接口,在多線程進階二五之框架中,我們提到過實現(xiàn)了接口,以提供和排序相關的功能,維持元素的有序性,所以就是一種為并發(fā)環(huán)境設計的有序工具類。唯一的區(qū)別是針對的僅僅是鍵值,針對鍵值對進行操作。 showImg(https://segmentfault.com/img/bVbggic?w=960&h=600); 本文首發(fā)于一世流云專欄:https://seg...
摘要:僅僅當有多個線程同時進行寫操作時,才會進行同步。可以看到,上述方法返回一個迭代器對象,的迭代是在舊數(shù)組上進行的,當創(chuàng)建迭代器的那一刻就確定了,所以迭代過程中不會拋出并發(fā)修改異常。另外,迭代器對象也不支持修改方法,全部會拋出異常。 showImg(https://segmentfault.com/img/bVbggij?w=960&h=600); 本文首發(fā)于一世流云專欄:https://...
摘要:我們之前已經(jīng)介紹過了,底層基于跳表實現(xiàn),其操作平均時間復雜度均為。事實上,內部引用了一個對象,以組合方式,委托對象實現(xiàn)了所有功能。線程安全內存的使用較多迭代是對快照進行的,不會拋出,且迭代過程中不支持修改操作。 showImg(https://segmentfault.com/img/bVbggjf?w=600&h=377); 本文首發(fā)于一世流云專欄:https://segmentfa...
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現(xiàn)了接口。該類在構造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內部通過來保證線程安全,所以的整體實現(xiàn)時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發(fā)于一世流云專欄:ht...
閱讀 1822·2023-04-26 02:32
閱讀 571·2021-11-18 13:12
閱讀 2455·2021-10-20 13:48
閱讀 2521·2021-10-14 09:43
閱讀 3832·2021-10-11 10:58
閱讀 3496·2021-09-30 10:00
閱讀 2936·2019-08-30 15:53
閱讀 3493·2019-08-30 15:53