摘要:和方法會一直阻塞調用線程,直到線程被中斷或隊列狀態可用和方法會限時阻塞調用線程,直到超時或線程被中斷或隊列狀態可用。
本文首發于一世流云專欄:https://segmentfault.com/blog...一、引言
從本節開始,我們將介紹juc-collections框架中的“阻塞隊列”部分。阻塞隊列在實際應用中非常廣泛,許多消息中間件中定義的隊列,通常就是一種“阻塞隊列”。
那么“阻塞隊列”和我們之前討論過的ConcurrentLinkedQueue、ConcurrentLinkedDeque有什么不同呢?
ConcurrentLinkedQueue和ConcurrentLinkedDeque是以非阻塞算法實現的高性能隊列,其使用場景一般是在并發環境下,需要“隊列”/“棧”這類數據結構時才會使用;而“阻塞隊列”通常利用了“鎖”來實現,也就是會阻塞調用線程,其使用場景一般是在“生產者-消費者”模式中,用于線程之間的數據交換或系統解耦。
在Java多線程基礎(七)——Producer-Consumer模式中,我們曾簡要的談到過“生產者-消費者”這種模式。在這種模式中,“生產者”和“消費者”是相互獨立的,兩者之間的通信需要依靠一個隊列。這個隊列,其實就是本文中的“阻塞隊列”。
引入“阻塞隊列”的最大好處就是解耦,在軟件工程中,“高內聚,低耦合”是進行模塊設計的準則之一,這樣“生產者”和“消費者”其實是互不影響的,將來任意一方需要升級時,可以保證系統的平滑過渡。
二、BlockingQueue簡介BlockingQueue是在JDK1.5時,隨著J.U.C引入的一個接口:
BlockingQueue繼承了Queue接口,提供了一些阻塞方法,主要作用如下:
當線程向隊列中插入元素時,如果隊列已滿,則阻塞線程,直到隊列有空閑位置(非滿);
當線程從隊列中取元素(刪除隊列元素)時,如果隊列未空,則阻塞線程,直到隊列有元素;
既然BlockingQueue是一種隊列,所以也具備隊列的三種基本方法:插入、刪除、讀取:
操作類型 | 拋出異常 | 返回特殊值 | 阻塞線程 | 超時 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
刪除 | remove() | poll() | take() | poll(time, unit) |
讀取 | element() | peek() | / | / |
可以看到,對于每種基本方法,“拋出異?!焙汀胺祷靥厥庵怠钡姆椒ǘx和Queue是完全一樣的。BlockingQueue只是增加了兩類和阻塞相關的方法:put(e)、take();offer(e, time, unit)、poll(time, unit)。
put(e)和take()方法會一直阻塞調用線程,直到線程被中斷或隊列狀態可用;
offer(e, time, unit)和poll(time, unit)方法會限時阻塞調用線程,直到超時或線程被中斷或隊列狀態可用。
public interface BlockingQueueextends Queue { /** * 插入元素e至隊尾, 如果隊列已滿, 則阻塞調用線程直到隊列有空閑空間. */ void put(E e) throws InterruptedException; /** * 插入元素e至隊列, 如果隊列已滿, 則限時阻塞調用線程,直到隊列有空閑空間或超時. */ boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; /** * 從隊首刪除元素,如果隊列為空, 則阻塞調用線程直到隊列中有元素. */ E take() throws InterruptedException; /** * 從隊首刪除元素,如果隊列為空, 則限時阻塞調用線程,直到隊列中有元素或超時. */ E poll(long timeout, TimeUnit unit) throws InterruptedException; // ... }
除此之外,BlockingQueue還具有以下特點:
BlockingQueue隊列中不能包含null元素;
BlockingQueue接口的實現類都必須是線程安全的,實現類一般通過“鎖”保證線程安全;
BlockingQueue 可以是限定容量的。remainingCapacity()方法用于返回剩余可用容量,對于沒有容量限制的BlockingQueue實現,該方法總是返回Integer.MAX_VALUE 。
三、再談“生產者-消費者”模式最后,我們來看下如何利用BlockingQueue來實現生產者-消費者模式。在生產者-消費者模式中,一共有四類角色:生產者、消費者、消息隊列、消息體。我們利用BlockingQueue來實現消息隊列,其余部分沒有什么變化。
Producer(生產者)生產者生產消息體(Data),并將消息體(Data)傳遞給通道(Channel)。
/** * 生產者 */ public class Producer implements Runnable { private Channel channel; public Producer(Channel channel) { this.channel = channel; } @Override public void run() { while (true) { String v = String.valueOf(ThreadLocalRandom.current().nextInt()); Data data = new Data(v); try { channel.put(data); System.out.println(Thread.currentThread().getName() + " produce :" + data); } catch (InterruptedException e) { e.printStackTrace(); } Thread.yield(); } } }Consumer(消費者)
消費者從通道(Channel)中獲取數據,進行處理。
/** * 消費者 */ public class Consumer implements Runnable { private final Channel channel; public Consumer(Channel channel) { this.channel = channel; } @Override public void run() { while (true) { try { Object obj = channel.take(); System.out.println(Thread.currentThread().getName() + " consume :" + obj.toString()); } catch (InterruptedException e) { e.printStackTrace(); } Thread.yield(); } } }Channel(通道)
相當于消息的隊列,對消息進行排隊,控制消息的傳輸。
/** * 通道類 */ public class Channel { private final BlockingQueue blockingQueue; public Channel(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; } public Object take() throws InterruptedException { return blockingQueue.take(); } public void put(Object o) throws InterruptedException { blockingQueue.put(o); } }Data(消息體/數據)
Data代表了實際生產或消費的數據。
/** * 數據/消息 */ public class Dataimplements Serializable { private T data; public Data(T data) { this.data = data; } public T getData() { return data; } public void setData(T data) { this.data = data; } @Override public String toString() { return "Data{" + "data=" + data + "}"; } }
調用如下:
public class Main { public static void main(String[] args) { BlockingQueue blockingQueue = new SomeQueueImplementation(); Channel channel = new Channel(blockingQueue); Producer p = new Producer(channel); Consumer c1 = new Consumer(channel); Consumer c2 = new Consumer(channel); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77009.html
摘要:接口截止目前為止,我們介紹的阻塞隊列都是實現了接口。該類在構造時一般需要指定容量,如果不指定,則最大容量為。另外,由于內部通過來保證線程安全,所以的整體實現時比較簡單的。另外,雙端隊列相比普通隊列,主要是多了隊尾出隊元素隊首入隊元素的功能。 showImg(https://segmentfault.com/img/bVbgZ7j?w=770&h=514); 本文首發于一世流云專欄:ht...
摘要:整個包,按照功能可以大致劃分如下鎖框架原子類框架同步器框架集合框架執行器框架本系列將按上述順序分析,分析所基于的源碼為。后,根據一系列常見的多線程設計模式,設計了并發包,其中包下提供了一系列基礎的鎖工具,用以對等進行補充增強。 showImg(https://segmentfault.com/img/remote/1460000016012623); 本文首發于一世流云專欄:https...
摘要:在章節中,我們說過,維護了一把全局鎖,無論是出隊還是入隊,都共用這把鎖,這就導致任一時間點只有一個線程能夠執行。入隊鎖對應的是條件隊列,出隊鎖對應的是條件隊列,所以每入隊一個元素,應當立即去喚醒可能阻塞的其它入隊線程。 showImg(https://segmentfault.com/img/bVbgCD9?w=1920&h=1080); 本文首發于一世流云專欄:https://seg...
摘要:在隊尾插入指定元素,如果隊列已滿,則阻塞線程加鎖隊列已滿。這里必須用,防止虛假喚醒在隊列上等待之所以這樣做,是防止線程被意外喚醒,不經再次判斷就直接調用方法。 showImg(https://segmentfault.com/img/bVbgCD0?w=768&h=512); 本文首發于一世流云專欄:https://segmentfault.com/blog... 一、ArrayBl...
摘要:初始狀態對應二叉樹結構將頂點與最后一個結點調換即將頂點與最后一個結點交換,然后將索引為止置。 showImg(https://segmentfault.com/img/bVbgOtL?w=1600&h=800); 本文首發于一世流云專欄:https://segmentfault.com/blog... 一、PriorityBlockingQueue簡介 PriorityBlockin...
閱讀 3296·2021-11-24 09:39
閱讀 2805·2021-10-12 10:20
閱讀 1908·2019-08-30 15:53
閱讀 3076·2019-08-30 14:14
閱讀 2600·2019-08-29 15:36
閱讀 1121·2019-08-29 14:11
閱讀 1956·2019-08-26 13:51
閱讀 3408·2019-08-26 13:23