摘要:引言在包中,很好的解決了在多線程中,如何高效安全傳輸數據的問題。同時,也用于自帶線程池的緩沖隊列中,了解也有助于理解線程池的工作模型。
引言
在java.util.Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全“傳輸”數據的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質量的多線程程序帶來極大的便利。同時,BlockingQueue也用于java自帶線程池的緩沖隊列中,了解BlockingQueue也有助于理解線程池的工作模型。
一 BlockingQueue接口該接口屬于隊列,所以繼承了Queue接口,該接口最重要的五個方法分別是offer方法,poll方法,put方法,take方法和drainTo方法。
offer方法和poll方法分別有一個靜態(tài)重載方法,分別是offer(E e, long timeout, TimeUnit unit)和poll(long timeout, TimeUnit unit)方法。其意義是在限定時間內存入或取出對象,如果不能存入取出則返回false。
put方法會在當隊列存儲對象達到限定值時阻塞線程,而在隊列不為空時喚醒被take方法所阻塞的線程。take方法是相反的。
drainTo方法可批量獲取隊列中的元素。
二 常見的BlockingQueue實現 一 LinkedBlockingQueueLinkedBlockingQueue是比較常見的BlockingQueue的實現,他是基于鏈表的阻塞隊列。在創(chuàng)建該對象時如果不指定可存儲對象個數大小時,默認為Integer.MAX_VALUE。當生產者往隊列中放入一個數據時,隊列會從生產者手中獲取數據,并緩存在隊列內部,而生產者立即返回;只有當隊列緩沖區(qū)達到最大值緩存容量時,才會阻塞生產者隊列,直到消費者從隊列中消費掉一份數據,生產者線程會被喚醒,反之對于消費者這端的處理也基于同樣的原理。
LinkedBlockingQueue內部使用了獨立的兩把鎖來控制數據同步,這也意味著在高并發(fā)的情況下生產者和消費者可以并行地操作隊列中的數據,以此來提高整個隊列的并發(fā)性能。
put方法和offer方法:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Nodenode = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); try { while (count.get() == capacity) { notFull.await(); } enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); } public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node node = new Node(e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; } }
這兩個方法的區(qū)別是put方法在容量達到上限時會阻塞,而offer方法則會直接返回false。
二 ArrayBlockingQueueArrayBlockingQueue是基于數組的阻塞隊列,除了有一個定長數組外,ArrayBlockingQueue內部還保存著兩個整形變量,分別標識著隊列的頭部和尾部在數組中的位置。ArrayBlockingQueue在生產者放入數據和消費者獲取數據,都是共用同一個鎖對象,由此也意味著兩者無法真正并行運行,這點尤其不同于LinkedBlockingQueue。
ArrayBlockingQueue和LinkedBlockingQueue間還有一個明顯的不同之處在于,前者在插入或刪除元素時不會產生或銷毀任何額外的對象實例,而后者則會生成一個額外的Node對象。
是一種沒有緩沖的阻塞隊列,在生產者put的同時必須要有一個消費者進行take,否則就會阻塞。聲明一個SynchronousQueue有兩種不同的方式。公平模式和非公平模式的區(qū)別:如果采用公平模式:SynchronousQueue會采用公平鎖,并配合一個FIFO隊列來阻塞多余的生產者和消費者,從而體系整體的公平策略;但如果是非公平模式(SynchronousQueue默認):SynchronousQueue采用非公平鎖,同時配合一個LIFO隊列來管理多余的生產者和消費者,而后一種模式,如果生產者和消費者的處理速度有差距,則很容易出現饑渴的情況,即可能有某些生產者或者是消費者的數據永遠都得不到處理。
四 PriorityBlockingQueue和DelayQueuePriorityBlockingQueue是基于優(yōu)先級的阻塞隊列,該隊列不會阻塞生產者,只會阻塞消費者。
DelayQueue隊列存儲的對象只有指定的延遲時間到了才能被取出,該隊列也不會阻塞生產者。
三 BlockingQueue的使用在處理多線程生產者消費者問題時的演示代碼:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; /** * Created by gavin on 15-8-30. */ public class BlockingQueueTest { public static void main(String[] args) { BlockingQueue四 總結queue = new ArrayBlockingQueue (1000); Thread p1 = new Thread(new Producer(queue),"producer1"); Thread p2 = new Thread(new Producer(queue),"producer2"); Thread c1 = new Thread(new Consumer(queue),"consumer1"); Thread c2 = new Thread(new Consumer(queue),"consumer2"); p1.start(); p2.start(); c1.start(); c2.start(); } } class Producer implements Runnable{ private BlockingQueue queue; public Producer(BlockingQueue queue) { this.queue = queue; } public void run() { int i = 0; while (!Thread.currentThread().isInterrupted()) { try { queue.put(Thread.currentThread().getName()+" product "+i); } catch (InterruptedException e) { System.err.println(Thread.currentThread().getName() + " error"); } i++; try { Thread.sleep(1000); } catch (InterruptedException e) { } } } } class Consumer implements Runnable{ private BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } public void run() { int i = 0; while (!Thread.currentThread().isInterrupted()) { try { String str = queue.take(); System.out.println(str); } catch (InterruptedException e) { } try { Thread.sleep(300); } catch (InterruptedException e) { } } } }
BlockingQueue在并發(fā)編程中扮演著重要的角色,既可以自己用來解決生產者消費者問題,也用于java自帶線程池的緩沖隊列。
參考:
BlockingQueue
更多文章:http://blog.gavinzh.com
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/64465.html
摘要:如果隊列已滿,這個時候寫操作的線程進入到寫線程隊列排隊,等待讀線程將隊列元素移除騰出空間,然后喚醒寫線程隊列的第一個等待線程。數據必須從某個寫線程交給某個讀線程,而不是寫到某個隊列中等待被消費。 前言 本文直接參考 Doug Lea 寫的 Java doc 和注釋,這也是我們在學習 java 并發(fā)包時最好的材料了。希望大家能有所思、有所悟,學習 Doug Lea 的代碼風格,并將其優(yōu)雅...
摘要:當活動線程核心線程非核心線程達到這個數值后,后續(xù)任務將會根據來進行拒絕策略處理。線程池工作原則當線程池中線程數量小于則創(chuàng)建線程,并處理請求。當線程池中的數量等于最大線程數時默默丟棄不能執(zhí)行的新加任務,不報任何異常。 spring-cache使用記錄 spring-cache的使用記錄,坑點記錄以及采用的解決方案 深入分析 java 線程池的實現原理 在這篇文章中,作者有條不紊的將 ja...
摘要:與最大線程池比較。如果加入成功,需要二次檢查線程池的狀態(tài)如果線程池沒有處于,則從移除任務,啟動拒絕策略。如果線程池處于狀態(tài),則檢查工作線程是否為。線程池將如何工作這個問題應該就不難回答了。 原文地址:https://www.xilidou.com/2018/02/09/thread-corepoolsize/ 最近在看《Java并發(fā)編程的藝術》回顧線程池的原理和參數的時候發(fā)現一個問題,...
摘要:和方法會一直阻塞調用線程,直到線程被中斷或隊列狀態(tài)可用和方法會限時阻塞調用線程,直到超時或線程被中斷或隊列狀態(tài)可用。 showImg(https://segmentfault.com/img/bVbgyPy?w=1191&h=670); 本文首發(fā)于一世流云專欄:https://segmentfault.com/blog... 一、引言 從本節(jié)開始,我們將介紹juc-collectio...
摘要:我們將使用單個線程管理任務放入隊列的操作以及從隊列中取出的操作。同時這個線程會持續(xù)的管理隊列。另一個線程將用來創(chuàng)建,它將一直運行知道服務器終止。此線程永遠不會過期,有助于實現持續(xù)監(jiān)控。這些請求將會自動的被獲取,并在線程中繼續(xù)處理。 在Java中,BlockingQueue接口位于java.util.concurrent包下。阻塞隊列主要用來線程安全的實現生產者-消費者模型。他們可以使用...
閱讀 1972·2021-11-25 09:43
閱讀 653·2021-10-11 10:58
閱讀 1730·2019-08-30 15:55
閱讀 1725·2019-08-30 13:13
閱讀 736·2019-08-29 17:01
閱讀 1840·2019-08-29 15:30
閱讀 789·2019-08-29 13:49
閱讀 2172·2019-08-29 12:13