国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

【Netty】學(xué)習(xí)NioEventLoop

tulayang / 1828人閱讀

摘要:主從模型主從多線程多個的線程池用于接受客戶端的連接。負(fù)責(zé)多路分離已連接的,讀寫網(wǎng)絡(luò)數(shù)據(jù),將業(yè)務(wù)處理功能扔給線程池完成。比如在線程內(nèi)部進(jìn)行串行操作,避免多線程競爭造成的性能問題。

歡迎關(guān)注公眾號:【愛編程
如果有需要后臺回復(fù)2019贈送1T的學(xué)習(xí)資料哦!!

簡介

Netty框架的主要線程就是I/O線程,線程模型的設(shè)計決定了系統(tǒng)的吞吐量、并發(fā)性和安全性等架構(gòu)質(zhì)量屬性。所以了解一下NioEventLoop。

Reactor線程模型

基本上所有的網(wǎng)絡(luò)處理程序都有以下基本的處理過程:
Read request
Decode request
Process service
Encode reply
Send reply

Reactor單線程模型

這是最簡單的單Reactor線程模型,它負(fù)責(zé)多路分離套接字,Accept新連接,并分派請求到處理器鏈中。該模型適用于處理器鏈中業(yè)務(wù)處理組件能快速完成的場景。但這種模型并不能充分利用多核資源,實際使用少。

Reactor多線程模型

相比上一種模型,該模型在處理器鏈部分采用了多線程(線程池),也就是后端程序常見的模型。但Reactor仍為單個線程。

Reactor主從模型

主從Reactor多線程:多個acceptor的NIO線程池用于接受客戶端的連接。將Reactor分成兩部分,mainReactor負(fù)責(zé)監(jiān)聽Server socket,accpet新連接,并將簡歷的socket分派給subReactor。subReactor負(fù)責(zé)多路分離已連接的socket,讀寫網(wǎng)絡(luò)數(shù)據(jù),將業(yè)務(wù)處理功能扔給worker線程池完成。通常subReactor個數(shù)上與CPU個數(shù)等同。

以上就是對Reactor線程模型的學(xué)習(xí)。更加詳細(xì)可以參考Doug Lea大神的PPT
http://gee.cs.oswego.edu/dl/c...

Netty的線程模型

netty的線程模型是可以通過設(shè)置啟動類的參數(shù)來配置的,設(shè)置不同的啟動參數(shù),netty支持Reactor單線程模型、多線程模型和主從Reactor多線程模型。

Boss線程池職責(zé)如下:
(1)接收客戶端的連接,初始化Channel參數(shù)
(2)將鏈路狀態(tài)變更時間通知給ChannelPipeline

worker線程池作用是:
(1)異步讀取通信對端的數(shù)據(jù)報,發(fā)送讀事件到ChannelPipeline
(2)異步發(fā)送消息到通信對端,調(diào)用ChannelPipeline的消息發(fā)送接口
(3)執(zhí)行系統(tǒng)調(diào)用Task;
(4)執(zhí)行定時任務(wù)Task;

通過配置boss和worker線程池的線程個數(shù)以及是否共享線程池等方式,netty的線程模型可以在單線程、多線程、主從線程之間切換。

為了提升性能,netty在很多地方都進(jìn)行了無鎖設(shè)計。比如在IO線程內(nèi)部進(jìn)行串行操作,避免多線程競爭造成的性能問題。表面上似乎串行化設(shè)計似乎CPU利用率不高,但是通過調(diào)整NIO線程池的線程參數(shù),可以同時啟動多個串行化的線程并行運行,這種局部無鎖串行線程設(shè)計性能更優(yōu)。

NioEventLoop源碼分析

基于Netty4.1.36

問題:
1.默認(rèn)情況下,netty服務(wù)端起多少線程?何時啟動?
2.Netty是如何解決jdk空輪詢bug的?
3.Netty如何保證異步串行無鎖化?

NioEventLoop創(chuàng)建流程

大致來說,從new NioEventLoopGroup()入手,然后到MultithreadEventLoopGroup的構(gòu)造中明確的寫明了默認(rèn)為CPU的2倍的線程,接著new ThreadPerTaskExecutor()[線程創(chuàng)建器],然后就是一個死循環(huán)newChild()構(gòu)造NioEventLoop,最后就是newChooser()[線程選擇器]為后面的啟動和執(zhí)行做準(zhǔn)備。

NioEventLoop啟動流程和執(zhí)行邏輯

NioEventLoop啟動從客戶端bind()入手,然后跟蹤到doBind0(),接著到SingleThreadEventExecutor中execute(),該方法主要是添加任務(wù)addTask(task)和運行線程startThread(),然后在startThread()-->doStartThread()-->SingleThreadEventExecutor.this.run();開始執(zhí)行NioEventLoop運行邏輯。

NioEventLoop啟動后主要的工作

1.select() -- 檢測IO事件,輪詢注冊到selector上面的io事件
2.processSelectedKeys() -- 處理io事件
3.runAllTasks() -- 處理外部線程扔到TaskQueue里面的任務(wù)

1.select() -- 檢測IO事件

檢測IO事件主要有三個部分:

deadline以及任務(wù)穿插邏輯處理:
計算本次執(zhí)行select截止時間(根據(jù)NioEventLoop當(dāng)時是否有定時任務(wù)處理)以及判斷在select的時候是否有任務(wù)要處理。

阻塞式select:
未到截止時間或者任務(wù)隊列為空進(jìn)行一次阻塞式select操作

避免JDK空輪詢的Bug:
判斷這次select操作是否阻塞timeoutMillis時間,未阻塞timeoutMillis時間表示觸發(fā)JDK空輪詢;判斷觸發(fā)JDK空輪詢的次數(shù)是否超過閾值,達(dá)到閾值調(diào)用rebuildSelector()方法替換原來的selector操作方式避免下次JDK空輪詢繼續(xù)發(fā)生

private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                
                /** 1.deadline以及任務(wù)穿插邏輯處理-- 開始**/
                long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
                if (timeoutMillis <= 0) {
                    if (selectCnt == 0) {
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
               

                // If a task was submitted when wakenUp value was true, the task didn"t get a chance to call
                // Selector#wakeup. So we need to check task queue again before executing select operation.
                // If we don"t, the task might be pended until select operation was timed out.
                // It might be pended until idle timeout if IdleStateHandler existed in pipeline.
                if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                    selector.selectNow();
                    selectCnt = 1;
                    break;
                }
                  /** 1.deadline以及任務(wù)穿插邏輯處理-- 結(jié)束**/
                  /**2.阻塞select--開始**/
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;
               /**2.阻塞select--結(jié)束**/
                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    // - Selected something,
                    // - waken up by user, or
                    // - the task queue has a pending task.
                    // - a scheduled task is ready for processing
                    break;
                }
                if (Thread.interrupted()) {
                    // Thread was interrupted so reset selected keys and break so we not run into a busy loop.
                    // As this is most likely a bug in the handler of the user or it"s client library we will
                    // also log it.
                    //
                    // See https://github.com/netty/netty/issues/2426
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }
                 /**3.避免jdk空輪詢的bug -- 開始 **/
                long time = System.nanoTime();
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    // timeoutMillis elapsed without anything selected.
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }

                currentTimeNanos = time;
            }
            /**3.避免jdk空輪詢的bug -- 結(jié)束**/
            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

2. processSelectedKeys()-- 處理IO事件

selected keySet優(yōu)化

select操作每次把已就緒狀態(tài)的io事件添加到底層HashSet(時間復(fù)雜度為O(n))數(shù)據(jù)結(jié)構(gòu),通過反射方式將HashSet替換成數(shù)組的實現(xiàn).

NioEventLoop.openSelector()

  private SelectorTuple openSelector() {
        final Selector unwrappedSelector;
        try {
            unwrappedSelector = provider.openSelector();
        } catch (IOException e) {
            throw new ChannelException("failed to open a new selector", e);
        }

        if (DISABLE_KEY_SET_OPTIMIZATION) {
            return new SelectorTuple(unwrappedSelector);
        }

        Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction() {
            @Override
            public Object run() {
                try {
                    return Class.forName(
                            "sun.nio.ch.SelectorImpl",
                            false,
                            PlatformDependent.getSystemClassLoader());
                } catch (Throwable cause) {
                    return cause;
                }
            }
        });

        if (!(maybeSelectorImplClass instanceof Class) ||
            // ensure the current selector implementation is what we can instrument.
            !((Class) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) {
            if (maybeSelectorImplClass instanceof Throwable) {
                Throwable t = (Throwable) maybeSelectorImplClass;
                logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t);
            }
            return new SelectorTuple(unwrappedSelector);
        }

        final Class selectorImplClass = (Class) maybeSelectorImplClass;
        final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

        Object maybeException = AccessController.doPrivileged(new PrivilegedAction() {
            @Override
            public Object run() {
                try {
                    Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
                    Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");

                    if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
                        // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
                        // This allows us to also do this in Java9+ without any extra flags.
                        long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
                        long publicSelectedKeysFieldOffset =
                                PlatformDependent.objectFieldOffset(publicSelectedKeysField);

                        if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
                            PlatformDependent.putObject(
                                    unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
                            PlatformDependent.putObject(
                                    unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
                            return null;
                        }
                        // We could not retrieve the offset, lets try reflection as last-resort.
                    }

                    Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }
                    cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);
                    if (cause != null) {
                        return cause;
                    }

                    selectedKeysField.set(unwrappedSelector, selectedKeySet);
                    publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
                    return null;
                } catch (NoSuchFieldException e) {
                    return e;
                } catch (IllegalAccessException e) {
                    return e;
                }
            }
        });

        if (maybeException instanceof Exception) {
            selectedKeys = null;
            Exception e = (Exception) maybeException;
            logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);
            return new SelectorTuple(unwrappedSelector);
        }
        selectedKeys = selectedKeySet;
        logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector);
        return new SelectorTuple(unwrappedSelector,
                                 new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
    }

processSelectedKeysOptimized()

遍歷SelectionKey數(shù)組獲取SelectionKey的attachment即NioChannel;
SelectionKey合法獲取SelectionKey的io事件進(jìn)行事件處理

NioEventLoop.processSelectedKeysOptimized()

private void processSelectedKeysOptimized() {
        for (int i = 0; i < selectedKeys.size; ++i) {
            final SelectionKey k = selectedKeys.keys[i];
            // null out entry in the array to allow to have it GC"ed once the Channel close
            // See https://github.com/netty/netty/issues/2363
            selectedKeys.keys[i] = null;

            final Object a = k.attachment();

            if (a instanceof AbstractNioChannel) {
                processSelectedKey(k, (AbstractNioChannel) a);
            } else {
                @SuppressWarnings("unchecked")
                NioTask task = (NioTask) a;
                processSelectedKey(k, task);
            }

            if (needsToSelectAgain) {
                // null out entries in the array to allow to have it GC"ed once the Channel close
                // See https://github.com/netty/netty/issues/2363
                selectedKeys.reset(i + 1);

                selectAgain();
                i = -1;
            }
        } 
    }

3. runAllTasks()

Task的分類和添加

MpscQueue創(chuàng)建NioEventLoop構(gòu)造,外部線程使用addTask()方法添加task;
ScheduledTaskQueue調(diào)用schedule()封裝ScheduledFutureTask添加到普通任務(wù)隊列

普通任務(wù)Task

SingleThreadEventExecutor.execute()-->addTask()

   protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

定時任務(wù)Task

將線程外的任務(wù)是通過加入隊列實現(xiàn),從而保證了線程安全。

AbstractScheduledEventExecutor.schedule() -->ScheduledFuture

    ScheduledFuture schedule(final ScheduledFutureTask task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }

任務(wù)的聚合

將定時任務(wù)隊列任務(wù)聚合到普通任務(wù)隊列

SingleThreadEventExecutor.fetchFromScheduledTaskQueue()

 private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask  = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
            if (!taskQueue.offer(scheduledTask)) {
                // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                scheduledTaskQueue().add((ScheduledFutureTask) scheduledTask);
                return false;
            }
            scheduledTask  = pollScheduledTask(nanoTime);
        }
        return true;
    }

ScheduledFutureTask中可以看到任務(wù)Task是先按照截止時間排序,然后按照id進(jìn)行排序的。

  public int compareTo(Delayed o) {
        if (this == o) {
            return 0;
        }

        ScheduledFutureTask that = (ScheduledFutureTask) o;
        long d = deadlineNanos() - that.deadlineNanos();
        if (d < 0) {
            return -1;
        } else if (d > 0) {
            return 1;
        } else if (id < that.id) {
            return -1;
        } else if (id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
    }

任務(wù)的執(zhí)行

獲取普通任務(wù)隊列待執(zhí)行任務(wù),使用safeExecute()方法執(zhí)行任務(wù),每次當(dāng)累計任務(wù)數(shù)量達(dá)到64判斷當(dāng)前時間是否超過截止時間中斷執(zhí)行后續(xù)任務(wù)

NioEventLoop.runAllTasks()

  protected boolean runAllTasks(long timeoutNanos) {
        fetchFromScheduledTaskQueue();
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }
總結(jié)

主要學(xué)習(xí)了NioEventLoop的基本知識,如果有更多知識歡迎各位分享,我還是個小菜鳥。

最后

如果對 Java、大數(shù)據(jù)感興趣請長按二維碼關(guān)注一波,我會努力帶給你們價值。覺得對你哪怕有一丁點幫助的請幫忙點個贊或者轉(zhuǎn)發(fā)哦。
關(guān)注公眾號【愛編碼】,回復(fù)2019有相關(guān)資料哦。

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/74825.html

相關(guān)文章

  • Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(一)

    摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...

    livem 評論0 收藏0
  • Netty ByteBuf 誰負(fù)責(zé)誰釋放

    摘要:轉(zhuǎn)發(fā)自 轉(zhuǎn)發(fā)自 http://netty.io/wiki/referenc... Since Netty version 4, the life cycle of certain objects are managed by their reference counts, so that Netty can return them (or their shared resources)...

    Lyux 評論0 收藏0
  • IDEA開發(fā)工具報錯----啟動項目報錯

    摘要:使用工具啟動服務(wù)器報錯打開防火墻設(shè)置,找到在的允許應(yīng)用通過防火墻中勾選,保存,重啟 使用idea工具啟動tomcat服務(wù)器報錯: Error:Abnormal build process termination: Build process started. Classpath: /C:/Program Files (x86)/JetBrains/IntelliJ IDEA 15.0...

    Lin_R 評論0 收藏0
  • IDEA開發(fā)工具報錯----啟動項目報錯

    摘要:使用工具啟動服務(wù)器報錯打開防火墻設(shè)置,找到在的允許應(yīng)用通過防火墻中勾選,保存,重啟 使用idea工具啟動tomcat服務(wù)器報錯: Error:Abnormal build process termination: Build process started. Classpath: /C:/Program Files (x86)/JetBrains/IntelliJ IDEA 15.0...

    blair 評論0 收藏0
  • Netty4.x 源碼實戰(zhàn)系列(五):深入淺出學(xué)NioEventLoopGroup

    摘要:接下來的兩篇文章,我將從源碼角度為大家深入淺出的剖析的線程模型工作機(jī)制。我們看一下的源碼通過的代碼發(fā)現(xiàn),實現(xiàn)了接口,其內(nèi)部會通過指定的默認(rèn)線程工廠來創(chuàng)建線程,并執(zhí)行相應(yīng)的任務(wù)。至此,初始化完成了。下一篇我們將詳細(xì)介紹,敬請期待。 我們都知道Netty的線程模型是基于React的線程模型,并且我們都知道Netty是一個高性能的NIO框架,那么其線程模型必定是它的重要貢獻(xiàn)之一。 在使用ne...

    MSchumi 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<