国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專(zhuān)欄INFORMATION COLUMN

并發(fā)數(shù)據(jù)結(jié)構(gòu)與線程(ArrayBlockingQueue)

yck / 414人閱讀

摘要:今天在群上拋出來(lái)一個(gè)問(wèn)題,如下我以自帶的數(shù)據(jù)結(jié)構(gòu)為例,用源碼的形式說(shuō)明,如何阻塞線程通知線程的。一以可重入鎖和兩個(gè)對(duì)象來(lái)控制并發(fā)。四使用來(lái)控制并發(fā),同時(shí)也使用的對(duì)象來(lái)與線程交互。

今天在QQ群上拋出來(lái)一個(gè)問(wèn)題,如下

我以Java自帶的數(shù)據(jù)結(jié)構(gòu)為例,用源碼的形式說(shuō)明,如何阻塞線程、通知線程的。

一、Lock & Condition
ArrayBlockingQueue以可重入鎖和兩個(gè)Condition對(duì)象來(lái)控制并發(fā)。

    /*
     * Concurrency control uses the classic two-condition algorithm
     * found in any textbook.
     */

    /** Main lock guarding all access */
    private final ReentrantLock lock;
    /** Condition for waiting takes */
    private final Condition notEmpty;
    /** Condition for waiting puts */
    private final Condition notFull;

構(gòu)造函數(shù)中初始化了notEmpty和notFull.

    /**
     * Creates an ArrayBlockingQueue with the given (fixed)
     * capacity and the specified access policy.
     * @param capacity the capacity of this queue
     * @param fair if true then queue accesses for threads blocked
     *        on insertion or removal, are processed in FIFO order;
     *        if false the access order is unspecified.
     * @throws IllegalArgumentException if capacity is less than 1
     */
    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

二、線程阻塞
當(dāng)ArrayBlockingQueue存儲(chǔ)的元素是0個(gè)的時(shí)候,take()方法會(huì)阻塞.

    public Object take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            try {
                while (count == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to non-interrupted thread
                throw ie;
            }
            Object x = extract();
            return x;
        } finally {
            lock.unlock();
        }
    }

這里take方法首先獲得可重入鎖lock,然后判斷如果元素為空就執(zhí)行notEmpty.await(); 這個(gè)時(shí)候線程掛起。

三、通知線程
比如使用put放入一個(gè)新元素,

    /**
     * Inserts the specified element at the tail of this queue, waiting
     * for space to become available if the queue is full.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

enqueue方法中,

    /**
     * Inserts element at current put position, advances, and signals.
     * Call only when holding lock.
     */
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        notEmpty.signal();
    }

對(duì)剛才的notEmptyCondition進(jìn)行通知。
四、ReentrantLock vs AbstractQueuedSynchronizer
ArrayBlockingQueue使用ReentrantLock來(lái)控制并發(fā),同時(shí)也使用ArrayBlockingQueue的Condition對(duì)象來(lái)與線程交互。notEmptynotFull都是由
ReentrantLock的成員變量sync生成的,

public Condition newCondition() {
        return sync.newCondition();
    }

sync可以認(rèn)為是一個(gè)抽象類(lèi)類(lèi)型,Sync,它是在ReentrantLock內(nèi)部定義的靜態(tài)抽象類(lèi),抽象類(lèi)實(shí)現(xiàn)了newCondition方法,

final ConditionObject newCondition() {
            return new ConditionObject();
        }

返回的類(lèi)型是實(shí)現(xiàn)了Condition接口的ConditionObject類(lèi),這是在AbstractQueuedSynchronizer內(nèi)部定義的類(lèi)。在ArrayBlockingQueue中的notEmpty就是ConditionObject實(shí)例。

阻塞:
當(dāng)ArrayBlockingQueue為空時(shí),notEmpty.await()將自己掛起,如ConditionObject的await方法,

        /**
         * Implements interruptible condition wait.
         * 
    *
  1. If current thread is interrupted, throw InterruptedException. *
  2. Save lock state returned by {@link #getState}. *
  3. Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. *
  4. Block until signalled or interrupted. *
  5. Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. *
  6. If interrupted while blocked in step 4, throw InterruptedException. *
*/ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }

addConditionWaiter是將當(dāng)前線程作為一個(gè)node加入到ConditionObject的隊(duì)列中,隊(duì)列是用鏈表實(shí)現(xiàn)的。
如果是初次加入隊(duì)列的情況,node.waitStatus == Node.CONDITION成立,方法isOnSyncQueue返回false,那么就將線程park。

while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                ....
}

至此線程被掛起,LockSupport.park(this);這里this是指ConditionObject,是notEmpty.
通知:
當(dāng)新的元素put進(jìn)入ArrayBlockingQueue后,notEmpty.signal()通知在這上面等待的線程,如ConditionObject的signal方法,

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

doSignal方法,

        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

doSignal一開(kāi)始接收到的參數(shù)就是firstWaiter這個(gè)參數(shù),在內(nèi)部實(shí)現(xiàn)中用了do..while的形式,首先將first的的nextWaiter找出來(lái)保存到firstWaiter此時(shí)(first和firstWaiter不是一回事),在while的比較條件中可調(diào)用了transferForSignal方法,
整個(gè)while比較條件可以看著短路邏輯,如果transferForSignal結(jié)果為true,后面的first = firstWaiter就不執(zhí)行了,整個(gè)while循環(huán)就結(jié)束了。

參照注釋?zhuān)?/p>

transferForSignal方法,

    /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

首先確保想要被signal的等待node還是處于Node.CONDITION狀態(tài),然后調(diào)整狀態(tài)為Node.SIGNAL,這兩個(gè)都是采用CAS方法,最后調(diào)用的是

LockSupport.unpark(node.thread);

五、LockSupport
至此,我們已經(jīng)知道了線程的掛起和通知都是使用LockSupport來(lái)完成的,并發(fā)數(shù)據(jù)結(jié)構(gòu)與線程直接的交互最終也是需要LockSupport。那么關(guān)于LockSupport,我們又可以了解多少呢?

Ref:
Java中的ReentrantLock和synchronized兩種鎖定機(jī)制的對(duì)比
Java的LockSupport.park()實(shí)現(xiàn)分析

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/68083.html

相關(guān)文章

  • 通俗易懂,JDK 并發(fā)容器總結(jié)

    摘要:線程安全的線程安全的,在讀多寫(xiě)少的場(chǎng)合性能非常好,遠(yuǎn)遠(yuǎn)好于高效的并發(fā)隊(duì)列,使用鏈表實(shí)現(xiàn)。這樣帶來(lái)的好處是在高并發(fā)的情況下,你會(huì)需要一個(gè)全局鎖來(lái)保證整個(gè)平衡樹(shù)的線程安全。 該文已加入開(kāi)源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí)的文檔類(lèi)項(xiàng)目,Star 數(shù)接近 14 k)。地址:https://github.com/Snailclimb... 一 JDK ...

    curlyCheng 評(píng)論0 收藏0
  • (十五)java多線程并發(fā)集合ArrayBlockingQueue

    摘要:本人郵箱歡迎轉(zhuǎn)載轉(zhuǎn)載請(qǐng)注明網(wǎng)址代碼已經(jīng)全部托管有需要的同學(xué)自行下載引言做的同學(xué)們或多或少的接觸過(guò)集合框架在集合框架中大多的集合類(lèi)是線程不安全的比如我們常用的等等我們寫(xiě)一個(gè)例子看為什么說(shuō)是不安全的例子證明是線程不安全的我們開(kāi)啟個(gè)線程每個(gè)線程向 本人郵箱: 歡迎轉(zhuǎn)載,轉(zhuǎn)載請(qǐng)注明網(wǎng)址 http://blog.csdn.net/tianshi_kcogithub: https://github...

    stefan 評(píng)論0 收藏0
  • 解讀 Java 并發(fā)隊(duì)列 BlockingQueue

    摘要:如果隊(duì)列已滿,這個(gè)時(shí)候?qū)懖僮鞯木€程進(jìn)入到寫(xiě)線程隊(duì)列排隊(duì),等待讀線程將隊(duì)列元素移除騰出空間,然后喚醒寫(xiě)線程隊(duì)列的第一個(gè)等待線程。數(shù)據(jù)必須從某個(gè)寫(xiě)線程交給某個(gè)讀線程,而不是寫(xiě)到某個(gè)隊(duì)列中等待被消費(fèi)。 前言 本文直接參考 Doug Lea 寫(xiě)的 Java doc 和注釋?zhuān)@也是我們?cè)趯W(xué)習(xí) java 并發(fā)包時(shí)最好的材料了。希望大家能有所思、有所悟,學(xué)習(xí) Doug Lea 的代碼風(fēng)格,并將其優(yōu)雅...

    maochunguang 評(píng)論0 收藏0
  • ArrayBlockingQueueLinkedBlockingQueue

    摘要:序本文主要簡(jiǎn)單介紹下與。有界無(wú)界有界,適合已知最大存儲(chǔ)容量的場(chǎng)景可有界可以無(wú)界吞吐量在大多數(shù)并發(fā)的場(chǎng)景下吞吐量比,但是性能不穩(wěn)定。測(cè)試結(jié)果表明,的可伸縮性要高于。 序 本文主要簡(jiǎn)單介紹下ArrayBlockingQueue與LinkedBlockingQueue。 對(duì)比 queue 阻塞與否 是否有界 線程安全保障 適用場(chǎng)景 注意事項(xiàng) ArrayBlockingQueue 阻...

    jackwang 評(píng)論0 收藏0
  • 并發(fā)包入坑指北』之阻塞隊(duì)列

    摘要:自己實(shí)現(xiàn)在自己實(shí)現(xiàn)之前先搞清楚阻塞隊(duì)列的幾個(gè)特點(diǎn)基本隊(duì)列特性先進(jìn)先出。消費(fèi)隊(duì)列空時(shí)會(huì)阻塞直到寫(xiě)入線程寫(xiě)入了隊(duì)列數(shù)據(jù)后喚醒消費(fèi)線程。最終的隊(duì)列大小為,可見(jiàn)線程也是安全的。 showImg(https://segmentfault.com/img/remote/1460000018811340); 前言 較長(zhǎng)一段時(shí)間以來(lái)我都發(fā)現(xiàn)不少開(kāi)發(fā)者對(duì) jdk 中的 J.U.C(java.util.c...

    nicercode 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<