摘要:今天在群上拋出來(lái)一個(gè)問(wèn)題,如下我以自帶的數(shù)據(jù)結(jié)構(gòu)為例,用源碼的形式說(shuō)明,如何阻塞線程通知線程的。一以可重入鎖和兩個(gè)對(duì)象來(lái)控制并發(fā)。四使用來(lái)控制并發(fā),同時(shí)也使用的對(duì)象來(lái)與線程交互。
今天在QQ群上拋出來(lái)一個(gè)問(wèn)題,如下
我以Java自帶的數(shù)據(jù)結(jié)構(gòu)為例,用源碼的形式說(shuō)明,如何阻塞線程、通知線程的。
一、Lock & Condition
ArrayBlockingQueue以可重入鎖和兩個(gè)Condition對(duì)象來(lái)控制并發(fā)。
/* * Concurrency control uses the classic two-condition algorithm * found in any textbook. */ /** Main lock guarding all access */ private final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
構(gòu)造函數(shù)中初始化了notEmpty和notFull.
/** * Creates an ArrayBlockingQueue with the given (fixed) * capacity and the specified access policy. * @param capacity the capacity of this queue * @param fair if true then queue accesses for threads blocked * on insertion or removal, are processed in FIFO order; * if false the access order is unspecified. * @throws IllegalArgumentException if capacity is less than 1 */ public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
二、線程阻塞
當(dāng)ArrayBlockingQueue存儲(chǔ)的元素是0個(gè)的時(shí)候,take()方法會(huì)阻塞.
public Object take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { try { while (count == 0) notEmpty.await(); } catch (InterruptedException ie) { notEmpty.signal(); // propagate to non-interrupted thread throw ie; } Object x = extract(); return x; } finally { lock.unlock(); } }
這里take方法首先獲得可重入鎖lock,然后判斷如果元素為空就執(zhí)行notEmpty.await(); 這個(gè)時(shí)候線程掛起。
三、通知線程
比如使用put放入一個(gè)新元素,
/** * Inserts the specified element at the tail of this queue, waiting * for space to become available if the queue is full. * * @throws InterruptedException {@inheritDoc} * @throws NullPointerException {@inheritDoc} */ public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } }
在enqueue方法中,
/** * Inserts element at current put position, advances, and signals. * Call only when holding lock. */ private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
對(duì)剛才的notEmptyCondition進(jìn)行通知。
四、ReentrantLock vs AbstractQueuedSynchronizer
ArrayBlockingQueue使用ReentrantLock來(lái)控制并發(fā),同時(shí)也使用ArrayBlockingQueue的Condition對(duì)象來(lái)與線程交互。notEmpty和notFull都是由
ReentrantLock的成員變量sync生成的,
public Condition newCondition() { return sync.newCondition(); }
sync可以認(rèn)為是一個(gè)抽象類(lèi)類(lèi)型,Sync,它是在ReentrantLock內(nèi)部定義的靜態(tài)抽象類(lèi),抽象類(lèi)實(shí)現(xiàn)了newCondition方法,
final ConditionObject newCondition() { return new ConditionObject(); }
返回的類(lèi)型是實(shí)現(xiàn)了Condition接口的ConditionObject類(lèi),這是在AbstractQueuedSynchronizer內(nèi)部定義的類(lèi)。在ArrayBlockingQueue中的notEmpty就是ConditionObject實(shí)例。
阻塞:
當(dāng)ArrayBlockingQueue為空時(shí),notEmpty.await()將自己掛起,如ConditionObject的await方法,
/** * Implements interruptible condition wait. **
*/ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }- If current thread is interrupted, throw InterruptedException. *
- Save lock state returned by {@link #getState}. *
- Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. *
- Block until signalled or interrupted. *
- Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. *
- If interrupted while blocked in step 4, throw InterruptedException. *
addConditionWaiter是將當(dāng)前線程作為一個(gè)node加入到ConditionObject的隊(duì)列中,隊(duì)列是用鏈表實(shí)現(xiàn)的。
如果是初次加入隊(duì)列的情況,node.waitStatus == Node.CONDITION成立,方法isOnSyncQueue返回false,那么就將線程park。
while (!isOnSyncQueue(node)) { LockSupport.park(this); .... }
至此線程被掛起,LockSupport.park(this);這里this是指ConditionObject,是notEmpty.
通知:
當(dāng)新的元素put進(jìn)入ArrayBlockingQueue后,notEmpty.signal()通知在這上面等待的線程,如ConditionObject的signal方法,
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
doSignal方法,
/** * Removes and transfers nodes until hit non-cancelled one or * null. Split out from signal in part to encourage compilers * to inline the case of no waiters. * @param first (non-null) the first node on condition queue */ private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
doSignal一開(kāi)始接收到的參數(shù)就是firstWaiter這個(gè)參數(shù),在內(nèi)部實(shí)現(xiàn)中用了do..while的形式,首先將first的的nextWaiter找出來(lái)保存到firstWaiter此時(shí)(first和firstWaiter不是一回事),在while的比較條件中可調(diào)用了transferForSignal方法,
整個(gè)while比較條件可以看著短路邏輯,如果transferForSignal結(jié)果為true,后面的first = firstWaiter就不執(zhí)行了,整個(gè)while循環(huán)就結(jié)束了。
參照注釋?zhuān)?/p>
transferForSignal方法,
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
首先確保想要被signal的等待node還是處于Node.CONDITION狀態(tài),然后調(diào)整狀態(tài)為Node.SIGNAL,這兩個(gè)都是采用CAS方法,最后調(diào)用的是
LockSupport.unpark(node.thread);
五、LockSupport
至此,我們已經(jīng)知道了線程的掛起和通知都是使用LockSupport來(lái)完成的,并發(fā)數(shù)據(jù)結(jié)構(gòu)與線程直接的交互最終也是需要LockSupport。那么關(guān)于LockSupport,我們又可以了解多少呢?
Ref:
Java中的ReentrantLock和synchronized兩種鎖定機(jī)制的對(duì)比
Java的LockSupport.park()實(shí)現(xiàn)分析
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/68083.html
摘要:線程安全的線程安全的,在讀多寫(xiě)少的場(chǎng)合性能非常好,遠(yuǎn)遠(yuǎn)好于高效的并發(fā)隊(duì)列,使用鏈表實(shí)現(xiàn)。這樣帶來(lái)的好處是在高并發(fā)的情況下,你會(huì)需要一個(gè)全局鎖來(lái)保證整個(gè)平衡樹(shù)的線程安全。 該文已加入開(kāi)源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí)的文檔類(lèi)項(xiàng)目,Star 數(shù)接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...
摘要:本人郵箱歡迎轉(zhuǎn)載轉(zhuǎn)載請(qǐng)注明網(wǎng)址代碼已經(jīng)全部托管有需要的同學(xué)自行下載引言做的同學(xué)們或多或少的接觸過(guò)集合框架在集合框架中大多的集合類(lèi)是線程不安全的比如我們常用的等等我們寫(xiě)一個(gè)例子看為什么說(shuō)是不安全的例子證明是線程不安全的我們開(kāi)啟個(gè)線程每個(gè)線程向 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github...
摘要:如果隊(duì)列已滿,這個(gè)時(shí)候?qū)懖僮鞯木€程進(jìn)入到寫(xiě)線程隊(duì)列排隊(duì),等待讀線程將隊(duì)列元素移除騰出空間,然后喚醒寫(xiě)線程隊(duì)列的第一個(gè)等待線程。數(shù)據(jù)必須從某個(gè)寫(xiě)線程交給某個(gè)讀線程,而不是寫(xiě)到某個(gè)隊(duì)列中等待被消費(fèi)。 前言 本文直接參考 Doug Lea 寫(xiě)的 Java doc 和注釋?zhuān)@也是我們?cè)趯W(xué)習(xí) java 并發(fā)包時(shí)最好的材料了。希望大家能有所思、有所悟,學(xué)習(xí) Doug Lea 的代碼風(fēng)格,并將其優(yōu)雅...
摘要:序本文主要簡(jiǎn)單介紹下與。有界無(wú)界有界,適合已知最大存儲(chǔ)容量的場(chǎng)景可有界可以無(wú)界吞吐量在大多數(shù)并發(fā)的場(chǎng)景下吞吐量比,但是性能不穩(wěn)定。測(cè)試結(jié)果表明,的可伸縮性要高于。 序 本文主要簡(jiǎn)單介紹下ArrayBlockingQueue與LinkedBlockingQueue。 對(duì)比 queue 阻塞與否 是否有界 線程安全保障 適用場(chǎng)景 注意事項(xiàng) ArrayBlockingQueue 阻...
摘要:自己實(shí)現(xiàn)在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn)基本隊(duì)列特性先進(jìn)先出。消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫(xiě)入線程寫(xiě)入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線程。最終的隊(duì)列大小為,可見(jiàn)線程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 較長(zhǎng)一段時(shí)間以來(lái)我都發(fā)現(xiàn)不少開(kāi)發(fā)者對(duì) jdk 中的 J.U.C(java.util.c...
閱讀 2249·2021-11-23 09:51
閱讀 1077·2021-11-22 15:35
閱讀 4859·2021-11-22 09:34
閱讀 1605·2021-10-08 10:13
閱讀 3023·2021-07-22 17:35
閱讀 2539·2019-08-30 15:56
閱讀 3086·2019-08-29 18:44
閱讀 3097·2019-08-29 15:32