摘要:序本文主要簡單介紹下與。有界無界有界,適合已知最大存儲容量的場景可有界可以無界吞吐量在大多數并發的場景下吞吐量比,但是性能不穩定。測試結果表明,的可伸縮性要高于。
序
本文主要簡單介紹下ArrayBlockingQueue與LinkedBlockingQueue。
對比queue | 阻塞與否 | 是否有界 | 線程安全保障 | 適用場景 | 注意事項 |
---|---|---|---|---|---|
ArrayBlockingQueue | 阻塞 | 有界 | 一把全局鎖 | 生產消費模型,平衡兩邊處理速度 | 用于存儲隊列元素的存儲空間是預先分配的,使用過程中內存開銷較小(無須動態申請存儲空間) |
LinkedBlockingQueue | 阻塞 | 可配置 | 存取采用2把鎖 | 生產消費模型,平衡兩邊處理速度 | 無界的時候注意內存溢出問題,用于存儲隊列元素的存儲空間是在其使用過程中動態分配的,因此它可能會增加JVM垃圾回收的負擔。 |
ConcurrentLinkedQueue | 非阻塞 | 無界 | CAS | 對全局的集合進行操作的場景 | size() 是要遍歷一遍集合,慎用 |
ArrayBlockingQueue
用于存儲隊列元素的存儲空間是預先分配的,使用過程中內存開銷較小(無須動態申請存儲空間)
LinkedBlockingQueue
用于存儲隊列元素的存儲空間是在其使用過程中動態分配的,因此它可能會增加JVM垃圾回收的負擔。
ArrayBlockingQueue
有界,適合已知最大存儲容量的場景
LinkedBlockingQueue
可有界可以無界
LinkedBlockingQueue在大多數并發的場景下吞吐量比ArrayBlockingQueue,但是性能不穩定。
Linked queues typically have higher throughput than array-based queues but less predictable performance in most concurrent applications.
測試結果表明,LinkedBlockingQueue的可伸縮性要高于ArrayBlockingQueue。初看起來,這個結果有些奇怪:鏈表隊列在每次插入元素時,都必須分配一個鏈表節點對象,這似乎比基于數組的隊列執行了更多的工作。然而,雖然它擁有更好的內存分配與GC等開銷,但與基于數組的隊列相比,鏈表隊列的put和take等方法支持并發性更高的訪問,因為一些優化后的鏈接隊列算法能將隊列頭節點的更新操作與尾節點的更新操作分離開來。由于內存分配操作通常是線程本地的,因此如果算法能通過多執行一些內存分配操作來降低競爭程度,那么這種算法通常具有更高的可伸縮性。
并發方面ArrayBlockingQueue
采用一把鎖,兩個condition
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; /** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); } /** * Extracts element at current take position, advances, and signals. * Call only when holding lock. */ private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
此外還支持公平鎖
/** * Creates an {@code ArrayBlockingQueue} with the given (fixed) * capacity and the specified access policy. * * @param capacity the capacity of this queue * @param fair if {@code true} then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if {@code false} the access order is unspecified. * @throws IllegalArgumentException if {@code capacity < 1} */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
LinkedBlockingQueue
頭尾各1把鎖
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition(); /** * Inserts the specified element at the tail of this queue if it is * possible to do so immediately without exceeding the queue"s capacity, * returning {@code true} upon success and {@code false} if this queue * is full. * When using a capacity-restricted queue, this method is generally * preferable to method {@link BlockingQueue#add add}, which can fail to * insert an element only by throwing an exception. * * @throws NullPointerException if the specified element is null */ public boolean offer(E e) { if (e == null) throw new NullPointerException(); final AtomicInteger count = this.count; if (count.get() == capacity) return false; int c = -1; Node應用實例 Executorsnode = new Node (e); final ReentrantLock putLock = this.putLock; putLock.lock(); try { if (count.get() < capacity) { enqueue(node); c = count.getAndIncrement(); if (c + 1 < capacity) notFull.signal(); } } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); return c >= 0; } public E poll(long timeout, TimeUnit unit) throws InterruptedException { E x = null; int c = -1; long nanos = unit.toNanos(timeout); final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { if (nanos <= 0) return null; nanos = notEmpty.awaitNanos(nanos); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
里頭用了LinkedBlockingQueue
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue (), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue ())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue (), threadFactory)); }
使用LinkedBlockingQueue實現logger
public class BungeeLogger extends Logger { private final ColouredWriter writer; private final Formatter formatter = new ConciseFormatter(); // private final LogDispatcher dispatcher = new LogDispatcher(this); private final BlockingQueuedocqueue = new LinkedBlockingQueue<>(); volatile boolean running = true; Thread recvThread = new Thread(){ @Override public void run() { while (!isInterrupted() && running) { LogRecord record; try { record = queue.take(); } catch (InterruptedException ex) { continue; } doLog(record); } for (LogRecord record : queue) { doLog(record); } } }; public BungeeLogger() throws IOException { super("BungeeCord", null); this.writer = new ColouredWriter(new ConsoleReader()); try { FileHandler handler = new FileHandler("proxy.log", 1 << 24, 8, true); handler.setFormatter(formatter); addHandler(handler); } catch (IOException ex) { System.err.println("Could not register logger!"); ex.printStackTrace(); } recvThread.start(); Runtime.getRuntime().addShutdownHook(new Thread(){ @Override public void run() { running = false; } }); } @Override public void log(LogRecord record) { if (running) { queue.add(record); } } void doLog(LogRecord record) { super.log(record); writer.print(formatter.format(record)); } }
BungeeCord
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70386.html
摘要:線程安全的線程安全的,在讀多寫少的場合性能非常好,遠遠好于高效的并發隊列,使用鏈表實現。這樣帶來的好處是在高并發的情況下,你會需要一個全局鎖來保證整個平衡樹的線程安全。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...
摘要:本人郵箱歡迎轉載轉載請注明網址代碼已經全部托管有需要的同學自行下載引言做的同學們或多或少的接觸過集合框架在集合框架中大多的集合類是線程不安全的比如我們常用的等等我們寫一個例子看為什么說是不安全的例子證明是線程不安全的我們開啟個線程每個線程向 本人郵箱: 歡迎轉載,轉載請注明網址 http://blog.csdn.net/tianshi_kcogithub: https://github...
摘要:在章節中,我們說過,維護了一把全局鎖,無論是出隊還是入隊,都共用這把鎖,這就導致任一時間點只有一個線程能夠執行。入隊鎖對應的是條件隊列,出隊鎖對應的是條件隊列,所以每入隊一個元素,應當立即去喚醒可能阻塞的其它入隊線程。 showImg(https://segmentfault.com/img/bVbgCD9?w=1920&h=1080); 本文首發于一世流云專欄:https://seg...
摘要:引言在包中,很好的解決了在多線程中,如何高效安全傳輸數據的問題。同時,也用于自帶線程池的緩沖隊列中,了解也有助于理解線程池的工作模型。 引言 在java.util.Concurrent包中,BlockingQueue很好的解決了在多線程中,如何高效安全傳輸數據的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質量的多線程程序帶來極大的便利。同時,BlockingQueue也用于...
摘要:源碼分析創建可緩沖的線程池。源碼分析使用創建線程池源碼分析的構造函數構造函數參數核心線程數大小,當線程數,會創建線程執行最大線程數,當線程數的時候,會把放入中保持存活時間,當線程數大于的空閑線程能保持的最大時間。 之前創建線程的時候都是用的 newCachedThreadPoo,newFixedThreadPool,newScheduledThreadPool,newSingleThr...
閱讀 1792·2021-09-03 10:50
閱讀 1327·2019-08-30 15:55
閱讀 3369·2019-08-30 15:52
閱讀 1231·2019-08-30 15:44
閱讀 935·2019-08-30 15:44
閱讀 3319·2019-08-30 14:23
閱讀 3551·2019-08-28 17:51
閱讀 2291·2019-08-26 13:52