上一篇文章,分析了Netty服務(wù)端啟動的初始化過程,今天我們來分析一下Netty中的Reactor線程模型

在分析源碼之前,我們先分析,哪些地方用到了EventLoop?

  • NioServerSocketChannel的連接監(jiān)聽注冊
  • NioSocketChannel的IO事件注冊

NioServerSocketChannel連接監(jiān)聽

在AbstractBootstrap類的initAndRegister()方法中,當(dāng)NioServerSocketChannel初始化完成后,會調(diào)用case標(biāo)記位置的代碼進(jìn)行注冊。

final ChannelFuture initAndRegister() {    Channel channel = null;    try {        channel = channelFactory.newChannel();        init(channel);    } catch (Throwable t) {    }   //注冊到boss線程的selector上。    ChannelFuture regFuture = config().group().register(channel);    if (regFuture.cause() != null) {        if (channel.isRegistered()) {            channel.close();        } else {            channel.unsafe().closeForcibly();        }    }    return regFuture;}

AbstractNioChannel.doRegister

按照代碼的執(zhí)行邏輯,最終會執(zhí)行到AbstractNioChannel的doRegister()方法中。

@Overrideprotected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            //調(diào)用ServerSocketChannel的register方法,把當(dāng)前服務(wù)端對象注冊到boss線程的selector上            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true;            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}

NioEventLoop的啟動過程

NioEventLoop是一個線程,它的啟動過程如下。

在AbstractBootstrap的doBind0方法中,獲取了NioServerSocketChannel中的NioEventLoop,然后使用它來執(zhí)行綁定端口的任務(wù)。

private static void doBind0(    final ChannelFuture regFuture, final Channel channel,    final SocketAddress localAddress, final ChannelPromise promise) {    //啟動    channel.eventLoop().execute(new Runnable() {        @Override        public void run() {            if (regFuture.isSuccess()) {                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            } else {                promise.setFailure(regFuture.cause());            }        }    });}

SingleThreadEventExecutor.execute

然后一路執(zhí)行到SingleThreadEventExecutor.execute方法中,調(diào)用startThread()方法啟動線程。

private void execute(Runnable task, boolean immediate) {    boolean inEventLoop = inEventLoop();    addTask(task);    if (!inEventLoop) {        startThread(); //啟動線程        if (isShutdown()) {            boolean reject = false;            try {                if (removeTask(task)) {                    reject = true;                }            } catch (UnsupportedOperationException e) {                // The task queue does not support removal so the best thing we can do is to just move on and                // hope we will be able to pick-up the task before its completely terminated.                // In worst case we will log on termination.            }            if (reject) {                reject();            }        }    }    if (!addTaskWakesUp && immediate) {        wakeup(inEventLoop);    }}

startThread

private void startThread() {    if (state == ST_NOT_STARTED) {        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {            boolean success = false;            try {                doStartThread(); //執(zhí)行啟動過程                success = true;            } finally {                if (!success) {                    STATE_UPDATER.compareAndSet(this, ST_STARTED, ST_NOT_STARTED);                }            }        }    }}

接著調(diào)用doStartThread()方法,通過executor.execute執(zhí)行一個任務(wù),在該任務(wù)中啟動了NioEventLoop線程

private void doStartThread() {    assert thread == null;    executor.execute(new Runnable() { //通過線程池執(zhí)行一個任務(wù)        @Override        public void run() {            thread = Thread.currentThread();            if (interrupted) {                thread.interrupt();            }            boolean success = false;            updateLastExecutionTime();            try {                SingleThreadEventExecutor.this.run(); //調(diào)用boss的NioEventLoop的run方法,開啟輪詢            }            //省略....        }    });}

NioEventLoop的輪詢過程

當(dāng)NioEventLoop線程被啟動后,就直接進(jìn)入到NioEventLoop的run方法中。

protected void run() {    int selectCnt = 0;    for (;;) {        try {            int strategy;            try {                strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                switch (strategy) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.BUSY_WAIT:                    case SelectStrategy.SELECT:                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();                        if (curDeadlineNanos == -1L) {                            curDeadlineNanos = NONE; // nothing on the calendar                        }                        nextWakeupNanos.set(curDeadlineNanos);                        try {                            if (!hasTasks()) {                                strategy = select(curDeadlineNanos);                            }                        } finally {                            // This update is just to help block unnecessary selector wakeups                            // so use of lazySet is ok (no race condition)                            nextWakeupNanos.lazySet(AWAKE);                        }                        // fall through                    default:                }            } catch (IOException e) {                // If we receive an IOException here its because the Selector is messed up. Lets rebuild                // the selector and retry. https://github.com/netty/netty/issues/8566                rebuildSelector0();                selectCnt = 0;                handleLoopException(e);                continue;            }            selectCnt++;            cancelledKeys = 0;            needsToSelectAgain = false;            final int ioRatio = this.ioRatio;            boolean ranTasks;            if (ioRatio == 100) {                try {                    if (strategy > 0) {                        processSelectedKeys();                    }                } finally {                    // Ensure we always run tasks.                    ranTasks = runAllTasks();                }            } else if (strategy > 0) {                final long ioStartTime = System.nanoTime();                try {                    processSelectedKeys();                } finally {                    // Ensure we always run tasks.                    final long ioTime = System.nanoTime() - ioStartTime;                    ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);                }            } else {                ranTasks = runAllTasks(0); // This will run the minimum number of tasks            }            if (ranTasks || strategy > 0) {                if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                                 selectCnt - 1, selector);                }                selectCnt = 0;            } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)                selectCnt = 0;            }        } catch (CancelledKeyException e) {            // Harmless exception - log anyway            if (logger.isDebugEnabled()) {                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",                             selector, e);            }        } catch (Error e) {            throw (Error) e;        } catch (Throwable t) {            handleLoopException(t);        } finally {            // Always handle shutdown even if the loop processing threw an exception.            try {                if (isShuttingDown()) {                    closeAll();                    if (confirmShutdown()) {                        return;                    }                }            } catch (Error e) {                throw (Error) e;            } catch (Throwable t) {                handleLoopException(t);            }        }    }}

NioEventLoop的執(zhí)行流程

NioEventLoop中的run方法是一個無限循環(huán)的線程,在該循環(huán)中主要做三件事情,如圖9-1所示。

圖9-1

  • 輪詢處理I/O事件(select),輪詢Selector選擇器中已經(jīng)注冊的所有Channel的I/O就緒事件
  • 處理I/O事件,如果存在已經(jīng)就緒的Channel的I/O事件,則調(diào)用processSelectedKeys進(jìn)行處理
  • 處理異步任務(wù)(runAllTasks),Reactor線程有一個非常重要的職責(zé),就是處理任務(wù)隊(duì)列中的非I/O任務(wù),Netty提供了ioRadio參數(shù)用來調(diào)整I/O時(shí)間和任務(wù)處理的時(shí)間比例。

輪詢I/O就緒事件

我們先來看I/O時(shí)間相關(guān)的代碼片段:

  1. 通過selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())獲取當(dāng)前的執(zhí)行策略
  2. 根據(jù)不同的策略,用來控制每次輪詢時(shí)的執(zhí)行策略。
protected void run() {        int selectCnt = 0;        for (;;) {            try {                int strategy;                try {                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());                    switch (strategy) {                    case SelectStrategy.CONTINUE:                        continue;                    case SelectStrategy.BUSY_WAIT:                        // fall-through to SELECT since the busy-wait is not supported with NIO                    case SelectStrategy.SELECT:                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();                        if (curDeadlineNanos == -1L) {                            curDeadlineNanos = NONE; // nothing on the calendar                        }                        nextWakeupNanos.set(curDeadlineNanos);                        try {                            if (!hasTasks()) {                                strategy = select(curDeadlineNanos);                            }                        } finally {                            // This update is just to help block unnecessary selector wakeups                            // so use of lazySet is ok (no race condition)                            nextWakeupNanos.lazySet(AWAKE);                        }                        // fall through                    default:                    }                }                //省略....            }        }}

selectStrategy處理邏輯

@Overridepublic int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;}

如果hasTasks為true,表示當(dāng)前NioEventLoop線程存在異步任務(wù)的情況下,則調(diào)用selectSupplier.get(),否則直接返回SELECT

其中selectSupplier.get()的定義如下:

private final IntSupplier selectNowSupplier = new IntSupplier() {    @Override    public int get() throws Exception {        return selectNow();    }};

該方法中調(diào)用的是selectNow()方法,這個方法是Selector選擇器中的提供的非阻塞方法,執(zhí)行后會立刻返回。

  • 如果當(dāng)前已經(jīng)有就緒的Channel,則會返回對應(yīng)就緒Channel的數(shù)量
  • 否則,返回0.

分支處理

在上面一個步驟中獲得了strategy之后,會根據(jù)不同的結(jié)果進(jìn)行分支處理。

  • CONTINUE,表示需要重試。
  • BUSY_WAIT,由于在NIO中并不支持BUSY_WAIT,所以BUSY_WAIT和SELECT的執(zhí)行邏輯是一樣的
  • SELECT,表示需要通過select方法獲取就緒的Channel列表,當(dāng)NioEventLoop中不存在異步任務(wù)時(shí),也就是任務(wù)隊(duì)列為空,則返回該策略。
switch (strategy) {    case SelectStrategy.CONTINUE:        continue;    case SelectStrategy.BUSY_WAIT:        // fall-through to SELECT since the busy-wait is not supported with NIO    case SelectStrategy.SELECT:        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();        if (curDeadlineNanos == -1L) {            curDeadlineNanos = NONE; // nothing on the calendar        }        nextWakeupNanos.set(curDeadlineNanos);        try {            if (!hasTasks()) {                strategy = select(curDeadlineNanos);            }        } finally {            // This update is just to help block unnecessary selector wakeups            // so use of lazySet is ok (no race condition)            nextWakeupNanos.lazySet(AWAKE);        }        // fall through    default:}

SelectStrategy.SELECT

當(dāng)NioEventLoop線程中不存在異步任務(wù)時(shí),則開始執(zhí)行SELECT策略

//下一次定時(shí)任務(wù)觸發(fā)截至?xí)r間,默認(rèn)不是定時(shí)任務(wù),返回 -1Llong curDeadlineNanos = nextScheduledTaskDeadlineNanos();if (curDeadlineNanos == -1L) {    curDeadlineNanos = NONE; // nothing on the calendar}nextWakeupNanos.set(curDeadlineNanos);try {    if (!hasTasks()) {        //2. taskQueue中任務(wù)執(zhí)行完,開始執(zhí)行select進(jìn)行阻塞        strategy = select(curDeadlineNanos);    }} finally {    // This update is just to help block unnecessary selector wakeups    // so use of lazySet is ok (no race condition)    nextWakeupNanos.lazySet(AWAKE);}

select方法定義如下,默認(rèn)情況下deadlineNanos=NONE,所以會調(diào)用select()方法阻塞。

private int select(long deadlineNanos) throws IOException {    if (deadlineNanos == NONE) {        return selector.select();    }    //計(jì)算select()方法的阻塞超時(shí)時(shí)間    long timeoutMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;    return timeoutMillis <= 0 ? selector.selectNow() : selector.select(timeoutMillis);}

最終返回就緒的channel個數(shù),后續(xù)的邏輯中會根據(jù)返回的就緒channel個數(shù)來決定執(zhí)行邏輯。

NioEventLoop.run中的業(yè)務(wù)處理

業(yè)務(wù)處理的邏輯相對來說比較容易理解

  • 如果有就緒的channel,則處理就緒channel的IO事件
  • 處理完成后同步執(zhí)行異步隊(duì)列中的任務(wù)。
  • 另外,這里為了解決Java NIO中的空轉(zhuǎn)問題,通過selectCnt記錄了空轉(zhuǎn)次數(shù),一次循環(huán)發(fā)生了空轉(zhuǎn)(既沒有IO需要處理、也沒有執(zhí)行任何任務(wù)),那么記錄下來(selectCnt); ,如果連續(xù)發(fā)生空轉(zhuǎn)(selectCnt達(dá)到一定值),netty認(rèn)為觸發(fā)了NIO的BUG(unexpectedSelectorWakeup處理);

Java Nio中有一個bug,Java nio在Linux系統(tǒng)下的epoll空輪詢問題。也就是在select()方法中,及時(shí)就緒的channel為0,也會從本來應(yīng)該阻塞的操作中被喚醒,從而導(dǎo)致CPU 使用率達(dá)到100%。

@Overrideprotected void run() {    int selectCnt = 0;    for (;;) {        //省略....        selectCnt++;//selectCnt記錄的是無功而返的select次數(shù),即eventLoop空轉(zhuǎn)的次數(shù),為解決NIO BUG        cancelledKeys = 0;        needsToSelectAgain = false;        final int ioRatio = this.ioRatio;        boolean ranTasks;        if (ioRatio == 100) { //ioRadio執(zhí)行時(shí)間占比是100%,默認(rèn)是50%            try {                if (strategy > 0) { //strategy>0表示存在就緒的SocketChannel                    processSelectedKeys(); //執(zhí)行就緒SocketChannel的任務(wù)                }            } finally {             //注意,將ioRatio設(shè)置為100,并不代表任務(wù)不執(zhí)行,反而是每次將任務(wù)隊(duì)列執(zhí)行完                ranTasks = runAllTasks(); //確保總是執(zhí)行隊(duì)列中的任務(wù)            }        } else if (strategy > 0) { //strategy>0表示存在就緒的SocketChannel            final long ioStartTime = System.nanoTime(); //io時(shí)間處理開始時(shí)間            try {                processSelectedKeys(); //開始處理IO就緒事件            } finally {                // io事件執(zhí)行結(jié)束時(shí)間                final long ioTime = System.nanoTime() - ioStartTime;                //基于本次循環(huán)處理IO的時(shí)間,ioRatio,計(jì)算出執(zhí)行任務(wù)耗時(shí)的上限,也就是只允許處理多長時(shí)間異步任務(wù)                ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);            }        } else {            //這個分支代表:strategy=0,ioRatio<100,此時(shí)任務(wù)限時(shí)=0,意為:盡量少地執(zhí)行異步任務(wù)            //這個分支和strategy>0實(shí)際是一碼事,代碼簡化了一下而已            ranTasks = runAllTasks(0); // This will run the minimum number of tasks        }        if (ranTasks || strategy > 0) { //ranTasks=true,或strategy>0,說明eventLoop干活了,沒有空轉(zhuǎn),清空selectCnt            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                             selectCnt - 1, selector);            }            selectCnt = 0;        }          //unexpectedSelectorWakeup處理NIO BUG        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)            selectCnt = 0;        }    }}

processSelectedKeys

通過在select方法中,我們可以獲得就緒的I/O事件數(shù)量,從而觸發(fā)執(zhí)行processSelectedKeys方法。

private void processSelectedKeys() {    if (selectedKeys != null) {        processSelectedKeysOptimized();    } else {        processSelectedKeysPlain(selector.selectedKeys());    }}

處理I/O事件時(shí),有兩個邏輯分支處理:

  • 一種是處理Netty優(yōu)化過的selectedKeys,
  • 另一種是正常的處理邏輯

processSelectedKeys方法中根據(jù)是否設(shè)置了selectedKeys來判斷使用哪種策略,默認(rèn)使用的是Netty優(yōu)化過的selectedKeys,它返回的對象是SelectedSelectionKeySet

processSelectedKeysOptimized

private void processSelectedKeysOptimized() {    for (int i = 0; i < selectedKeys.size; ++i) {        //1. 取出IO事件以及對應(yīng)的channel        final SelectionKey k = selectedKeys.keys[i];        selectedKeys.keys[i] = null;//k的引用置null,便于gc回收,也表示該channel的事件處理完成避免重復(fù)處理        final Object a = k.attachment(); //獲取保存在當(dāng)前channel中的attachment,此時(shí)應(yīng)該是NioServerSocketChannel        //處理當(dāng)前的channel        if (a instanceof AbstractNioChannel) {             //對于boss NioEventLoop,輪詢到的基本是連接事件,后續(xù)的事情就是通過他的pipeline將連接扔給一個worker NioEventLoop處理            //對于worker NioEventLoop來說,輪循道的基本商是IO讀寫事件,后續(xù)的事情就是通過他的pipeline將讀取到的字節(jié)流傳遞給每個channelHandler來處理            processSelectedKey(k, (AbstractNioChannel) a);        } else {            @SuppressWarnings("unchecked")            NioTask task = (NioTask) a;            processSelectedKey(k, task);        }        if (needsToSelectAgain) {            // null out entries in the array to allow to have it GCed once the Channel close            // See https://github.com/netty/netty/issues/2363            selectedKeys.reset(i + 1);            selectAgain();            i = -1;        }    }}

processSelectedKey

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();    if (!k.isValid()) {        final EventLoop eventLoop;        try {            eventLoop = ch.eventLoop();        } catch (Throwable ignored) {        }        if (eventLoop == this) {            // close the channel if the key is not valid anymore            unsafe.close(unsafe.voidPromise());        }        return;    }    try {        int readyOps = k.readyOps(); //獲取當(dāng)前key所屬的操作類型        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//如果是連接類型            int ops = k.interestOps();            ops &= ~SelectionKey.OP_CONNECT;            k.interestOps(ops);            unsafe.finishConnect();        }        if ((readyOps & SelectionKey.OP_WRITE) != 0) { //如果是寫類型            ch.unsafe().forceFlush();        }        //如果是讀類型或者ACCEPT類型。則執(zhí)行unsafe.read()方法,unsafe的實(shí)例對象為 NioMessageUnsafe        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {            unsafe.read();        }    } catch (CancelledKeyException ignored) {        unsafe.close(unsafe.voidPromise());    }}

NioMessageUnsafe.read()

假設(shè)此時(shí)是一個讀操作,或者是客戶端建立連接,那么代碼執(zhí)行邏輯如下,

@Overridepublic void read() {    assert eventLoop().inEventLoop();    final ChannelConfig config = config();    final ChannelPipeline pipeline = pipeline(); //如果是第一次建立連接,此時(shí)的pipeline是ServerBootstrapAcceptor    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    allocHandle.reset(config);    boolean closed = false;    Throwable exception = null;    try {        try {            do {                int localRead = doReadMessages(readBuf);                if (localRead == 0) {                    break;                }                if (localRead < 0) {                    closed = true;                    break;                }                allocHandle.incMessagesRead(localRead);            } while (continueReading(allocHandle));        } catch (Throwable t) {            exception = t;        }        int size = readBuf.size();        for (int i = 0; i < size; i ++) {            readPending = false;            pipeline.fireChannelRead(readBuf.get(i));  //調(diào)用pipeline中的channelRead方法        }        readBuf.clear();        allocHandle.readComplete();        pipeline.fireChannelReadComplete();        if (exception != null) {            closed = closeOnReadError(exception);            pipeline.fireExceptionCaught(exception); //調(diào)用pipeline中的ExceptionCaught方法        }        if (closed) {            inputShutdown = true;            if (isOpen()) {                close(voidPromise());            }        }    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

SelectedSelectionKeySet的優(yōu)化

Netty中自己封裝實(shí)現(xiàn)了一個SelectedSelectionKeySet,用來優(yōu)化原本SelectorKeys的結(jié)構(gòu),它是怎么進(jìn)行優(yōu)化的呢?先來看它的代碼定義

final class SelectedSelectionKeySet extends AbstractSet {    SelectionKey[] keys;    int size;    SelectedSelectionKeySet() {        keys = new SelectionKey[1024];    }    @Override    public boolean add(SelectionKey o) {        if (o == null) {            return false;        }        keys[size++] = o;        if (size == keys.length) {            increaseCapacity();        }        return true;    }}

SelectedSelectionKeySet內(nèi)部使用的是SelectionKey數(shù)組,所有在processSelectedKeysOptimized方法中可以直接通過遍歷數(shù)組來取出就緒的I/O事件。

而原來的Set<SelectionKey>返回的是HashSet類型,兩者相比,SelectionKey[]不需要考慮哈希沖突的問題,所以可以實(shí)現(xiàn)O(1)時(shí)間復(fù)雜度的add操作。

SelectedSelectionKeySet的初始化

netty通過反射的方式,把Selector對象內(nèi)部的selectedKeys和publicSelectedKeys替換為SelectedSelectionKeySet。

原本的selectedKeys和publicSelectedKeys這兩個字段都是HashSet類型,替換之后變成了SelectedSelectionKeySet。當(dāng)有就緒的key時(shí),會直接填充到SelectedSelectionKeySet的數(shù)組中。后續(xù)只需要遍歷即可。

private SelectorTuple openSelector() {    final Class selectorImplClass = (Class) maybeSelectorImplClass;    final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();    //使用反射    Object maybeException = AccessController.doPrivileged(new PrivilegedAction() {        @Override        public Object run() {            try {                //Selector內(nèi)部的selectedKeys字段                Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");                //Selector內(nèi)部的publicSelectedKeys字段                Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");                if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {                    //獲取selectedKeysField字段偏移量                    long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);                    //獲取publicSelectedKeysField字段偏移量                    long publicSelectedKeysFieldOffset =                        PlatformDependent.objectFieldOffset(publicSelectedKeysField);                    if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {                        //替換為selectedKeySet                        PlatformDependent.putObject(                            unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);                        PlatformDependent.putObject(                            unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);                        return null;                    }                    // We could not retrieve the offset, lets try reflection as last-resort.                }                Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true);                if (cause != null) {                    return cause;                }                cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true);                if (cause != null) {                    return cause;                }                selectedKeysField.set(unwrappedSelector, selectedKeySet);                publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);                return null;            } catch (NoSuchFieldException e) {                return e;            } catch (IllegalAccessException e) {                return e;            }        }    });    if (maybeException instanceof Exception) {        selectedKeys = null;        Exception e = (Exception) maybeException;        logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e);        return new SelectorTuple(unwrappedSelector);    }    selectedKeys = selectedKeySet;}

異步任務(wù)的執(zhí)行流程

分析完上面的流程后,我們繼續(xù)來看NioEventLoop中的run方法中,針對異步任務(wù)的處理流程

@Overrideprotected void run() {    int selectCnt = 0;    for (;;) {        ranTasks = runAllTasks();    }}

runAllTask

需要注意,NioEventLoop可以支持定時(shí)任務(wù)的執(zhí)行,通過nioEventLoop.schedule()來完成。

protected boolean runAllTasks() {    assert inEventLoop();    boolean fetchedAll;    boolean ranAtLeastOne = false;    do {        fetchedAll = fetchFromScheduledTaskQueue(); //合并定時(shí)任務(wù)到普通任務(wù)隊(duì)列        if (runAllTasksFrom(taskQueue)) { //循環(huán)執(zhí)行taskQueue中的任務(wù)            ranAtLeastOne = true;        }    } while (!fetchedAll);      if (ranAtLeastOne) { //如果任務(wù)全部執(zhí)行完成,記錄執(zhí)行完完成時(shí)間        lastExecutionTime = ScheduledFutureTask.nanoTime();    }    afterRunningAllTasks();//執(zhí)行收尾任務(wù)    return ranAtLeastOne;}

fetchFromScheduledTaskQueue

遍歷scheduledTaskQueue中的任務(wù),添加到taskQueue中。

private boolean fetchFromScheduledTaskQueue() {    if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) {        return true;    }    long nanoTime = AbstractScheduledEventExecutor.nanoTime();    for (;;) {        Runnable scheduledTask = pollScheduledTask(nanoTime);        if (scheduledTask == null) {            return true;        }        if (!taskQueue.offer(scheduledTask)) {            // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.            scheduledTaskQueue.add((ScheduledFutureTask) scheduledTask);            return false;        }    }}

任務(wù)添加方法execute

NioEventLoop內(nèi)部有兩個非常重要的異步任務(wù)隊(duì)列,分別是普通任務(wù)和定時(shí)任務(wù)隊(duì)列,針對這兩個隊(duì)列提供了兩個方法分別向兩個隊(duì)列中添加任務(wù)。

  • execute()
  • schedule()

其中,execute方法的定義如下。

private void execute(Runnable task, boolean immediate) {    boolean inEventLoop = inEventLoop();    addTask(task); //把當(dāng)前任務(wù)添加到阻塞隊(duì)列中    if (!inEventLoop) { //如果是非NioEventLoop        startThread(); //啟動線程        if (isShutdown()) { //如果當(dāng)前NioEventLoop已經(jīng)是停止?fàn)顟B(tài)            boolean reject = false;            try {                if (removeTask(task)) {                     reject = true;                }            } catch (UnsupportedOperationException e) {                // The task queue does not support removal so the best thing we can do is to just move on and                // hope we will be able to pick-up the task before its completely terminated.                // In worst case we will log on termination.            }            if (reject) {                reject();            }        }    }    if (!addTaskWakesUp && immediate) {        wakeup(inEventLoop);    }}

Nio的空輪轉(zhuǎn)問題

所謂的空輪訓(xùn),是指我們在執(zhí)行selector.select()方法時(shí),如果沒有就緒的SocketChannel時(shí),當(dāng)前線程會被阻塞 。 而空輪詢是指當(dāng)沒有就緒SocketChannel時(shí),會被觸發(fā)喚醒。

而這個喚醒是沒有任何讀寫請求的,從而導(dǎo)致線程在做無效的輪詢,使得CPU占用率較高。

導(dǎo)致這個問題的根本原因是:

在部分Linux的2.6的kernel中,poll和epoll對于突然中斷的連接socket會對返回的eventSet事件集合置為POLLHUP,也可能是POLLERR,eventSet事件集合發(fā)生了變化,這就可能導(dǎo)致Selector會被喚醒。這是與操作系統(tǒng)機(jī)制有關(guān)系的,JDK雖然僅僅是一個兼容各個操作系統(tǒng)平臺的軟件,但很遺憾在JDK5和JDK6最初的版本中(嚴(yán)格意義上來將,JDK部分版本都是),這個問題并沒有解決,而將這個帽子拋給了操作系統(tǒng)方,這也就是這個bug最終一直到2013年才最終修復(fù)的原因,最終影響力太廣。

Netty是如何解決這個問題的呢?我們回到NioEventLoop的run方法中

@Overrideprotected void run() {    int selectCnt = 0;    for (;;) {        //selectCnt記錄的是無功而返的select次數(shù),即eventLoop空轉(zhuǎn)的次數(shù),為解決NIO BUG        selectCnt++;         //ranTasks=true,或strategy>0,說明eventLoop干活了,沒有空轉(zhuǎn),清空selectCnt        if (ranTasks || strategy > 0) {            //如果選擇操作計(jì)數(shù)器的值,大于最小選擇器重構(gòu)閾值,則輸出log            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {                logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",                             selectCnt - 1, selector);            }            selectCnt = 0;        }         //unexpectedSelectorWakeup處理NIO BUG        else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)            selectCnt = 0;        }    }}

unexpectedSelectorWakeup

private boolean unexpectedSelectorWakeup(int selectCnt) {    if (Thread.interrupted()) {        if (logger.isDebugEnabled()) {            logger.debug("Selector.select() returned prematurely because " +                         "Thread.currentThread().interrupt() was called. Use " +                         "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");        }        return true;    }    //如果選擇重構(gòu)的閾值大于0, 默認(rèn)值是512次、 并且當(dāng)前觸發(fā)的空輪詢次數(shù)大于 512次。,則觸發(fā)重構(gòu)    if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {        // The selector returned prematurely many times in a row.        // Rebuild the selector to work around the problem.        logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",                    selectCnt, selector);        rebuildSelector();        return true;    }    return false;}

rebuildSelector()

public void rebuildSelector() {    if (!inEventLoop()) { //如果不是在eventLoop中執(zhí)行,則使用異步線程執(zhí)行        execute(new Runnable() {            @Override            public void run() {                rebuildSelector0();            }        });        return;    }    rebuildSelector0();}

rebuildSelector0

這個方法的主要作用: 重新創(chuàng)建一個選擇器,替代當(dāng)前事件循環(huán)中的選擇器

private void rebuildSelector0() {    final Selector oldSelector = selector; //獲取老的selector選擇器    final SelectorTuple newSelectorTuple; //定義新的選擇器    if (oldSelector == null) { //如果老的選擇器為空,直接返回        return;    }    try {        newSelectorTuple = openSelector(); //創(chuàng)建一個新的選擇器    } catch (Exception e) {        logger.warn("Failed to create a new Selector.", e);        return;    }    // Register all channels to the new Selector.    int nChannels = 0;    for (SelectionKey key: oldSelector.keys()) {//遍歷注冊到選擇器的選擇key集合        Object a = key.attachment();        try {             //如果選擇key無效或選擇關(guān)聯(lián)的通道已經(jīng)注冊到新的選擇器,則跳出當(dāng)前循環(huán)            if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {                continue;            }            //獲取key的選擇關(guān)注事件集            int interestOps = key.interestOps();            key.cancel();//取消選擇key          //注冊選擇key到新的選擇器            SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);            if (a instanceof AbstractNioChannel) {//如果是nio通道,則更新通道的選擇key                // Update SelectionKey                ((AbstractNioChannel) a).selectionKey = newKey;            }            nChannels ++;        } catch (Exception e) {            logger.warn("Failed to re-register a Channel to the new Selector.", e);            if (a instanceof AbstractNioChannel) {                AbstractNioChannel ch = (AbstractNioChannel) a;                ch.unsafe().close(ch.unsafe().voidPromise());            } else {                @SuppressWarnings("unchecked")                NioTask task = (NioTask) a;                invokeChannelUnregistered(task, key, e);            }        }    }    //更新當(dāng)前事件循環(huán)選擇器    selector = newSelectorTuple.selector;    unwrappedSelector = newSelectorTuple.unwrappedSelector;    try {        // time to close the old selector as everything else is registered to the new one        oldSelector.close(); //關(guān)閉原始選擇器    } catch (Throwable t) {        if (logger.isWarnEnabled()) {            logger.warn("Failed to close the old Selector.", t);        }    }    if (logger.isInfoEnabled()) {        logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");    }}

從上述過程中我們發(fā)現(xiàn),Netty解決NIO空輪轉(zhuǎn)問題的方式,是通過重建Selector對象來完成的,在這個重建過程中,核心是把Selector中所有的SelectionKey重新注冊到新的Selector上,從而巧妙的避免了JDK epoll空輪訓(xùn)問題。

連接的建立及處理過程

在9.2.4.3節(jié)中,提到了當(dāng)客戶端有連接或者讀事件發(fā)送到服務(wù)端時(shí),會調(diào)用NioMessageUnsafe類的read()方法。

public void read() {    assert eventLoop().inEventLoop();    final ChannelConfig config = config();    final ChannelPipeline pipeline = pipeline();    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    allocHandle.reset(config);    boolean closed = false;    Throwable exception = null;    try {        try {            do {                //如果有客戶端連接進(jìn)來,則localRead為1,否則返回0                int localRead = doReadMessages(readBuf);                if (localRead == 0) {                    break;                }                if (localRead < 0) {                    closed = true;                    break;                }                allocHandle.incMessagesRead(localRead); //累計(jì)增加read消息數(shù)量            } while (continueReading(allocHandle));        } catch (Throwable t) {            exception = t;        }        int size = readBuf.size(); //遍歷客戶端連接列表        for (int i = 0; i < size; i ++) {            readPending = false;            pipeline.fireChannelRead(readBuf.get(i)); //調(diào)用pipeline中handler的channelRead方法。        }        readBuf.clear(); //清空集合        allocHandle.readComplete();        pipeline.fireChannelReadComplete(); //觸發(fā)pipeline中handler的readComplete方法        if (exception != null) {            closed = closeOnReadError(exception);            pipeline.fireExceptionCaught(exception);        }        if (closed) {            inputShutdown = true;            if (isOpen()) {                close(voidPromise());            }        }    } finally {        if (!readPending && !config.isAutoRead()) {            removeReadOp();        }    }}

pipeline.fireChannelRead(readBuf.get(i))

繼續(xù)來看pipeline的觸發(fā)方法,此時(shí)的pipeline組成,如果當(dāng)前是連接事件,那么pipeline = ServerBootstrap$ServerBootstrapAcceptor。

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);    EventExecutor executor = next.executor();    if (executor.inEventLoop()) {        next.invokeChannelRead(m); //獲取pipeline中的下一個節(jié)點(diǎn),調(diào)用該handler的channelRead方法    } else {        executor.execute(new Runnable() {            @Override            public void run() {                next.invokeChannelRead(m);            }        });    }}

ServerBootstrapAcceptor

ServerBootstrapAcceptor是NioServerSocketChannel中一個特殊的Handler,專門用來處理客戶端連接事件,該方法中核心的目的是把針對SocketChannel的handler鏈表,添加到當(dāng)前NioSocketChannel中的pipeline中。

public void channelRead(ChannelHandlerContext ctx, Object msg) {    final Channel child = (Channel) msg;    child.pipeline().addLast(childHandler);  //把服務(wù)端配置的childHandler,添加到當(dāng)前NioSocketChannel中的pipeline中    setChannelOptions(child, childOptions, logger); //設(shè)置NioSocketChannel的屬性    setAttributes(child, childAttrs);     try {        //把當(dāng)前的NioSocketChannel注冊到Selector上,并且監(jiān)聽一個異步事件。        childGroup.register(child).addListener(new ChannelFutureListener() {            @Override            public void operationComplete(ChannelFuture future) throws Exception {                if (!future.isSuccess()) {                    forceClose(child, future.cause());                }            }        });    } catch (Throwable t) {        forceClose(child, t);    }}

pipeline的構(gòu)建過程

9.6.2節(jié)中,child其實(shí)就是一個NioSocketChannel,它是在NioServerSocketChannel中,當(dāng)接收到一個新的鏈接時(shí),創(chuàng)建對象。

@Overrideprotected int doReadMessages(List buf) throws Exception {    SocketChannel ch = SocketUtils.accept(javaChannel());    try {        if (ch != null) {            buf.add(new NioSocketChannel(this, ch)); //這里            return 1;        }    } catch (Throwable t) {        logger.warn("Failed to create a new channel from an accepted socket.", t);        try {            ch.close();        } catch (Throwable t2) {            logger.warn("Failed to close a socket.", t2);        }    }    return 0;}

而NioSocketChannel在構(gòu)造時(shí),調(diào)用了父類AbstractChannel中的構(gòu)造方法,初始化了一個pipeline.

protected AbstractChannel(Channel parent) {    this.parent = parent;    id = newId();    unsafe = newUnsafe();    pipeline = newChannelPipeline();}

DefaultChannelPipeline

pipeline的默認(rèn)實(shí)例是DefaultChannelPipeline,構(gòu)造方法如下。

protected DefaultChannelPipeline(Channel channel) {    this.channel = ObjectUtil.checkNotNull(channel, "channel");    succeededFuture = new SucceededChannelFuture(channel, null);    voidPromise =  new VoidChannelPromise(channel, true);    tail = new TailContext(this);    head = new HeadContext(this);    head.next = tail;    tail.prev = head;}

初始化了一個頭節(jié)點(diǎn)和尾節(jié)點(diǎn),組成一個雙向鏈表,如圖9-2所示

圖9-2

NioSocketChannel中handler鏈的構(gòu)成

再回到ServerBootstrapAccepter的channelRead方法中,收到客戶端連接時(shí),觸發(fā)了NioSocketChannel中的pipeline的添加

以下代碼是DefaultChannelPipeline的addLast方法。

@Overridepublic final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {    ObjectUtil.checkNotNull(handlers, "handlers");    for (ChannelHandler h: handlers) { //遍歷handlers列表,此時(shí)這里的handler是ChannelInitializer回調(diào)方法        if (h == null) {            break;        }        addLast(executor, null, h);    }    return this;}

addLast

把服務(wù)端配置的ChannelHandler,添加到pipeline中,注意,此時(shí)的pipeline中保存的是ChannelInitializer回調(diào)方法。

@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {    final AbstractChannelHandlerContext newCtx;    synchronized (this) {        checkMultiplicity(handler); //檢查是否有重復(fù)的handler        //創(chuàng)建新的DefaultChannelHandlerContext節(jié)點(diǎn)        newCtx = newContext(group, filterName(name, handler), handler);        addLast0(newCtx);  //添加新的DefaultChannelHandlerContext到ChannelPipeline        if (!registered) {             newCtx.setAddPending();            callHandlerCallbackLater(newCtx, true);            return this;        }        EventExecutor executor = newCtx.executor();        if (!executor.inEventLoop()) {            callHandlerAddedInEventLoop(newCtx, executor);            return this;        }    }    callHandlerAdded0(newCtx);    return this;}

這個回調(diào)方法什么時(shí)候觸發(fā)調(diào)用呢?其實(shí)就是在ServerBootstrapAcceptor這個類的channelRead方法中,注冊當(dāng)前NioSocketChannel時(shí)

childGroup.register(child).addListener(new ChannelFutureListener() {}

最終按照之前我們上一節(jié)課源碼分析的思路,定位到AbstractChannel中的register0方法中。

private void register0(ChannelPromise promise) {            try {                // check if the channel is still open as it could be closed in the mean time when the register                // call was outside of the eventLoop                if (!promise.setUncancellable() || !ensureOpen(promise)) {                    return;                }                boolean firstRegistration = neverRegistered;                doRegister();                neverRegistered = false;                registered = true;                //                pipeline.invokeHandlerAddedIfNeeded();            }}

callHandlerAddedForAllHandlers

pipeline.invokeHandlerAddedIfNeeded()方法,向下執(zhí)行,會進(jìn)入到DefaultChannelPipeline這個類中的callHandlerAddedForAllHandlers方法中

private void callHandlerAddedForAllHandlers() {    final PendingHandlerCallback pendingHandlerCallbackHead;    synchronized (this) {        assert !registered;        // This Channel itself was registered.        registered = true;        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;        // Null out so it can be GCed.        this.pendingHandlerCallbackHead = null;    }    //從等待被調(diào)用的handler 回調(diào)列表中,取出任務(wù)來執(zhí)行。    PendingHandlerCallback task = pendingHandlerCallbackHead;    while (task != null) {        task.execute();        task = task.next;    }}

我們發(fā)現(xiàn),pendingHandlerCallbackHead這個單向鏈表,是在callHandlerCallbackLater方法中被添加的,

而callHandlerCallbackLater又是在addLast方法中添加的,所以構(gòu)成了一個異步完整的閉環(huán)。

ChannelInitializer.handlerAdded

task.execute()方法執(zhí)行路徑是

callHandlerAdded0 -> ctx.callHandlerAdded ->

? -------> AbstractChannelHandlerContext.callHandlerAffffded()

? ---------------> ChannelInitializer.handlerAdded

調(diào)用initChannel方法來初始化NioSocketChannel中的Channel.

@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {    if (ctx.channel().isRegistered()) {        // This should always be true with our current DefaultChannelPipeline implementation.        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers        // will be added in the expected order.        if (initChannel(ctx)) {            // We are done with init the Channel, removing the initializer now.            removeState(ctx);        }    }}

接著,調(diào)用initChannel抽象方法,該方法由具體的實(shí)現(xiàn)類來完成。

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {    if (initMap.add(ctx)) { // Guard against re-entrance.        try {            initChannel((C) ctx.channel());        } catch (Throwable cause) {            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).            // We do so to prevent multiple calls to initChannel(...).            exceptionCaught(ctx, cause);        } finally {            ChannelPipeline pipeline = ctx.pipeline();            if (pipeline.context(this) != null) {                pipeline.remove(this);            }        }        return true;    }    return false;}

ChannelInitializer的實(shí)現(xiàn),是我們自定義Server中的匿名內(nèi)部類,ChannelInitializer。因此通過這個回調(diào)來完成當(dāng)前NioSocketChannel的pipeline的構(gòu)建過程。

public static void main(String[] args){    EventLoopGroup boss = new NioEventLoopGroup();    //2 用于對接受客戶端連接讀寫操作的線程工作組    EventLoopGroup work = new NioEventLoopGroup();    ServerBootstrap b = new ServerBootstrap();    b.group(boss, work) //綁定兩個工作線程組        .channel(NioServerSocketChannel.class)  //設(shè)置NIO的模式        // 初始化綁定服務(wù)通道        .childHandler(new ChannelInitializer() {            @Override            protected void initChannel(SocketChannel sc) throws Exception {                sc.pipeline()                    .addLast(                    new LengthFieldBasedFrameDecoder(1024,                                                     9,4,0,0))                    .addLast(new MessageRecordEncoder())                    .addLast(new MessageRecordDecode())                    .addLast(new ServerHandler());            }        });}

文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/124412.html

相關(guān)文章

  • #yyds干貨盤點(diǎn)#學(xué)不懂Netty?看不懂源碼?不存在的,這篇文章手把手帶你閱讀Netty源碼

    摘要:簡單來說就是把注冊的動作異步化,當(dāng)異步執(zhí)行結(jié)束后會把執(zhí)行結(jié)果回填到中抽象類一般就是公共邏輯的處理,而這里的處理主要就是針對一些參數(shù)的判斷,判斷完了之后再調(diào)用方法。 閱讀這篇文章之前,建議先閱讀和這篇文章關(guān)聯(lián)的內(nèi)容。 1. 詳細(xì)剖析分布式微服務(wù)架構(gòu)下網(wǎng)絡(luò)通信的底層實(shí)現(xiàn)原理(圖解) 2. (年薪60W的技巧)工作了5年,你真的理解Netty以及為什么要用嗎?(深度干貨)...

    zsirfs 評論0 收藏0
  • Netty 源碼分析 三 我就是大名鼎鼎的 EventLoop(一)

    摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...

    livem 評論0 收藏0
  • #yyds干貨盤點(diǎn)# 常用軟件框架,總有一個用的上

    摘要:一界面框架是微軟在其最新桌面操作系統(tǒng)中使用的圖形用戶界面。干貨盤點(diǎn)二服務(wù)在寫后臺代碼的過程中,經(jīng)常會遇到要寫一些多帶帶的服務(wù)。這個傳統(tǒng)的控件開發(fā)起來很不方面,使用也不友好。發(fā)現(xiàn)有用的,這個第三方的框架,集成的很好,用起來也方便。一、Fluent Ribbon界面框架Fluent/Ribbon是微軟在其最新桌面操作系統(tǒng)Windows 7中使用的圖形用戶界面。 Windows平臺的進(jìn)化,伴隨著系...

    番茄西紅柿 評論0 收藏2637
  • #yyds干貨盤點(diǎn)#Apache的三種模式

    1.Apache prefork模型:apache的默認(rèn)的模型預(yù)派 生模式,有 一個主控制進(jìn)程,然后 生成多個 子進(jìn)程,使 用select模型,最 大并發(fā)1024,每個 子進(jìn)程有 一個獨(dú) 立的線程響應(yīng) 用戶請求,相對 比較占 用內(nèi)存,但是 比較穩(wěn)定,可以設(shè)置最 大和最 小進(jìn)程數(shù),是最古 老 的 一種模式,也是最穩(wěn)定的模式,適 用于訪問量 不 是很 大的場景。優(yōu)點(diǎn):穩(wěn)定缺點(diǎn): 大量 用戶訪問慢,占...

    loonggg 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<