摘要:接上篇源碼分析之三我就是大名鼎鼎的一的處理循環(huán)在中一個需要負責兩個工作第一個是作為線程負責相應的操作第二個是作為任務線程執(zhí)行中的任務接下來我們先從操縱方面入手看一下數(shù)據(jù)是如何從傳遞到我們的中的是模型的一個實現(xiàn)并且是基于的那么從的前生今世之四
接上篇Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(一)
Netty 的 IO 處理循環(huán)在 Netty 中, 一個 EventLoop 需要負責兩個工作, 第一個是作為 IO 線程, 負責相應的 IO 操作; 第二個是作為任務線程, 執(zhí)行 taskQueue 中的任務. 接下來我們先從 IO 操縱方面入手, 看一下 TCP 數(shù)據(jù)是如何從 Java NIO Socket 傳遞到我們的 handler 中的.
Netty 是 Reactor 模型的一個實現(xiàn), 并且是基于 Java NIO 的, 那么從 Java NIO 的前生今世 之四 NIO Selector 詳解 中我們知道, Netty 中必然有一個 Selector 線程, 用于不斷調(diào)用 Java NIO 的 Selector.select 方法, 查詢當前是否有就緒的 IO 事件. 回顧一下在 Java NIO 中所講述的 Selector 的使用流程:
通過 Selector.open() 打開一個 Selector.
將 Channel 注冊到 Selector 中, 并設置需要監(jiān)聽的事件(interest set)
不斷重復:
調(diào)用 select() 方法
調(diào)用 selector.selectedKeys() 獲取 selected keys
迭代每個 selected key:
1) 從 selected key 中獲取 對應的 Channel 和附加信息(如果有的話)
2) 判斷是哪些 IO 事件已經(jīng)就緒了, 然后處理它們. 如果是 OP_ACCEPT 事件, 則調(diào)用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 并將它設置為 非阻塞的, 然后將這個 Channel 注冊到 Selector 中.
3) 根據(jù)需要更改 selected key 的監(jiān)聽事件.
4) 將已經(jīng)處理過的 key 從 selected keys 集合中刪除.
上面的使用流程用代碼來體現(xiàn)就是:
/** * @author xiongyongshun * @Email yongshun1228@gmail.com * @version 1.0 * @created 16/8/1 13:13 */ public class NioEchoServer { private static final int BUF_SIZE = 256; private static final int TIMEOUT = 3000; public static void main(String args[]) throws Exception { // 打開服務端 Socket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 打開 Selector Selector selector = Selector.open(); // 服務端 Socket 監(jiān)聽8080端口, 并配置為非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(8080)); serverSocketChannel.configureBlocking(false); // 將 channel 注冊到 selector 中. // 通常我們都是先注冊一個 OP_ACCEPT 事件, 然后在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ // 注冊到 Selector 中. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { // 通過調(diào)用 select 方法, 阻塞地等待 channel I/O 可操作 if (selector.select(TIMEOUT) == 0) { System.out.print("."); continue; } // 獲取 I/O 操作就緒的 SelectionKey, 通過 SelectionKey 可以知道哪些 Channel 的哪類 I/O 操作已經(jīng)就緒. IteratorkeyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); // 當獲取一個 SelectionKey 后, 就要將它刪除, 表示我們已經(jīng)對這個 IO 事件進行了處理. keyIterator.remove(); if (key.isAcceptable()) { // 當 OP_ACCEPT 事件到來時, 我們就有從 ServerSocketChannel 中獲取一個 SocketChannel, // 代表客戶端的連接 // 注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel. // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel. SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept(); clientChannel.configureBlocking(false); //在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 注冊到 Selector 中. // 注意, 這里我們?nèi)绻麤]有設置 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 那么 select 方法會一直直接返回. clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE)); } if (key.isReadable()) { SocketChannel clientChannel = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clientChannel.read(buf); if (bytesRead == -1) { clientChannel.close(); } else if (bytesRead > 0) { key.interestOps(OP_READ | SelectionKey.OP_WRITE); System.out.println("Get data length: " + bytesRead); } } if (key.isValid() && key.isWritable()) { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clientChannel = (SocketChannel) key.channel(); clientChannel.write(buf); if (!buf.hasRemaining()) { key.interestOps(OP_READ); } buf.compact(); } } } } }
還記得不, 上面操作的第一步 通過 Selector.open() 打開一個 Selector 我們已經(jīng)在第一章的 Channel 實例化 這一小節(jié)中已經(jīng)提到了, Netty 中是通過調(diào)用 SelectorProvider.openSocketChannel() 來打開一個新的 Java NIO SocketChannel:
private static SocketChannel newSocket(SelectorProvider provider) { ... return provider.openSocketChannel(); }
第二步 將 Channel 注冊到 Selector 中, 并設置需要監(jiān)聽的事件(interest set) 的操作我們在第一章 channel 的注冊過程 中也分析過了, 我們在來回顧一下, 在客戶端的 Channel 注冊過程中, 會有如下調(diào)用鏈:
Bootstrap.initAndRegister -> AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister
在 AbstractUnsafe.register 方法中調(diào)用了 register0 方法:
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { // 省略條件判斷和錯誤處理 AbstractChannel.this.eventLoop = eventLoop; register0(promise); }
register0 方法代碼如下:
private void register0(ChannelPromise promise) { boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (firstRegistration && isActive()) { pipeline.fireChannelActive(); } }
register0 又調(diào)用了 AbstractNioChannel.doRegister:
@Override protected void doRegister() throws Exception { // 省略錯誤處理 selectionKey = javaChannel().register(eventLoop().selector, 0, this); }
在這里 javaChannel() 返回的是一個 Java NIO SocketChannel 對象, 我們將此 SocketChannel 注冊到前面第一步獲取的 Selector 中.
那么接下來的第三步的循環(huán)是在哪里實現(xiàn)的呢? 第三步的操作就是我們今天分析的關鍵, 下面我會一步一步向讀者展示出來.
thread 的 run 循環(huán)在 EventLoop 的啟動 一小節(jié)中, 我們已經(jīng)了解到了, 當 EventLoop.execute 第一次被調(diào)用時, 就會觸發(fā) startThread() 的調(diào)用, 進而導致了 EventLoop 所對應的 Java 線程的啟動. 接著我們來更深入一些, 來看一下此線程啟動后都會做什么東東吧.
下面是此線程的 run() 方法, 我已經(jīng)把一些異常處理和收尾工作的代碼都去掉了. 這個 run 方法可以說是十分簡單, 主要就是調(diào)用了 SingleThreadEventExecutor.this.run() 方法. 而 SingleThreadEventExecutor.run() 是一個抽象方法, 它的實現(xiàn)在 NioEventLoop 中.
thread = threadFactory.newThread(new Runnable() { @Override public void run() { boolean success = false; updateLastExecutionTime(); try { SingleThreadEventExecutor.this.run(); success = true; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: ", t); } finally { ... } } });
繼續(xù)跟蹤到 NioEventLoop.run() 方法, 其源碼如下:
@Override protected void run() { for (;;) { boolean oldWakenUp = wakenUp.getAndSet(false); try { if (hasTasks()) { selectNow(); } else { select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break; } } } catch (Throwable t) { ... } } }
啊哈, 看到了上面代碼的 for(;;) 所構(gòu)成的死循環(huán)了沒? 原來 NioEventLoop 事件循環(huán)的核心就是這里!
現(xiàn)在我們把上面所提到的 Selector 使用步驟的第三步的部分也找到了.
這個 run 方法可以說是 Netty NIO 的核心, 屬于重中之重, 把它分析明白了, 那么對 Netty 的事件循環(huán)機制也就了解了大部分了. 讓我們一鼓作氣, 繼續(xù)分析下去吧!
首先, 在 run 方法中, 第一步是調(diào)用 hasTasks() 方法來判斷當前任務隊列中是否有任務:
protected boolean hasTasks() { assert inEventLoop(); return !taskQueue.isEmpty(); }
這個方法很簡單, 僅僅是檢查了一下 taskQueue 是否為空. 至于 taskQueue 是什么呢, 其實它就是存放一系列的需要由此 EventLoop 所執(zhí)行的任務列表. 關于 taskQueue, 我們這里暫時不表, 等到后面再來詳細分析它.
當 taskQueue 不為空時, 就執(zhí)行到了 if 分支中的 selectNow() 方法. 然而當 taskQueue 為空時, 執(zhí)行的是 select(oldWakenUp) 方法. 那么 selectNow() 和 select(oldWakenUp) 之間有什么區(qū)別呢? 來看一下, selectNow() 的源碼如下:
void selectNow() throws IOException { try { selector.selectNow(); } finally { // restore wakup state if needed if (wakenUp.get()) { selector.wakeup(); } } }
首先調(diào)用了 selector.selectNow() 方法, 這里 selector 是什么大家還有印象不? 我們在第一章 Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (客戶端) 時對它有過介紹, 這個 selector 字段正是 Java NIO 中的多路復用器 Selector. 那么這里 selector.selectNow() 就很好理解了, selectNow() 方法會檢查當前是否有就緒的 IO 事件, 如果有, 則返回就緒 IO 事件的個數(shù); 如果沒有, 則返回0. 注意, selectNow() 是立即返回的, 不會阻塞當前線程. 當 selectNow() 調(diào)用后, finally 語句塊中會檢查 wakenUp 變量是否為 true, 當為 true 時, 調(diào)用 selector.wakeup() 喚醒 select() 的阻塞調(diào)用.
看了 if 分支的 selectNow 方法后, 我們再來看一下 else 分支的 select(oldWakenUp) 方法.
其實 else 分支的 select(oldWakenUp) 方法的處理邏輯比較復雜, 而我們這里的目的暫時不是分析這個方法調(diào)用的具體工作, 因此我這里長話短說, 只列出我們我們關注的內(nèi)如:
private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { ... int selectedKeys = selector.select(timeoutMillis); ... } catch (CancelledKeyException e) { ... } }
在這個 select 方法中, 調(diào)用了 selector.select(timeoutMillis), 而這個調(diào)用是會阻塞住當前線程的, timeoutMillis 是阻塞的超時時間.
到來這里, 我們可以看到, 當 hasTasks() 為真時, 調(diào)用的的 selectNow() 方法是不會阻塞當前線程的, 而當 hasTasks() 為假時, 調(diào)用的 select(oldWakenUp) 是會阻塞當前線程的.
這其實也很好理解: 當 taskQueue 中沒有任務時, 那么 Netty 可以阻塞地等待 IO 就緒事件; 而當 taskQueue 中有任務時, 我們自然地希望所提交的任務可以盡快地執(zhí)行, 因此 Netty 會調(diào)用非阻塞的 selectNow() 方法, 以保證 taskQueue 中的任務盡快可以執(zhí)行.
在 NioEventLoop.run() 方法中, 第一步是通過 select/selectNow 調(diào)用查詢當前是否有就緒的 IO 事件. 那么當有 IO 事件就緒時, 第二步自然就是處理這些 IO 事件啦.
首先讓我們來看一下 NioEventLoop.run 中循環(huán)的剩余部分:
final int ioRatio = this.ioRatio; if (ioRatio == 100) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); }
上面列出的代碼中, 有兩個關鍵的調(diào)用, 第一個是 processSelectedKeys() 調(diào)用, 根據(jù)字面意思, 我們可以猜出這個方法肯定是查詢就緒的 IO 事件, 然后處理它; 第二個調(diào)用是 runAllTasks(), 這個方法我們也可以一眼就看出來它的功能就是運行 taskQueue 中的任務.
這里的代碼還有一個十分有意思的地方, 即 ioRatio. 那什么是 ioRatio呢? 它表示的是此線程分配給 IO 操作所占的時間比(即運行 processSelectedKeys 耗時在整個循環(huán)中所占用的時間). 例如 ioRatio 默認是 50, 則表示 IO 操作和執(zhí)行 task 的所占用的線程執(zhí)行時間比是 1 : 1. 當知道了 IO 操作耗時和它所占用的時間比, 那么執(zhí)行 task 的時間就可以很方便的計算出來了:
設 IO 操作耗時為 ioTime, ioTime 占的時間比例為 ioRatio, 則: ioTime / ioRatio = taskTime / taskRatio taskRatio = 100 - ioRatio => taskTime = ioTime * (100 - ioRatio) / ioRatio
根據(jù)上面的公式, 當我們設置 ioRate = 70 時, 則表示 IO 運行耗時占比為70%, 即假設某次循環(huán)一共耗時為 100ms, 那么根據(jù)公式, 我們知道 processSelectedKeys() 方法調(diào)用所耗時大概為70ms(即 IO 耗時), 而 runAllTasks() 耗時大概為 30ms(即執(zhí)行 task 耗時).
當 ioRatio 為 100 時, Netty 就不考慮 IO 耗時的占比, 而是分別調(diào)用 processSelectedKeys()、runAllTasks(); 而當 ioRatio 不為 100時, 則執(zhí)行到 else 分支, 在這個分支中, 首先記錄下 processSelectedKeys() 所執(zhí)行的時間(即 IO 操作的耗時), 然后根據(jù)公式, 計算出執(zhí)行 task 所占用的時間, 然后以此為參數(shù), 調(diào)用 runAllTasks().
我們這里先分析一下 processSelectedKeys() 方法調(diào)用, runAllTasks() 我們留到下一節(jié)再分析.
processSelectedKeys() 方法的源碼如下:
private void processSelectedKeys() { if (selectedKeys != null) { processSelectedKeysOptimized(selectedKeys.flip()); } else { processSelectedKeysPlain(selector.selectedKeys()); } }
這個方法中, 會根據(jù) selectedKeys 字段是否為空, 而分別調(diào)用 processSelectedKeysOptimized 或 processSelectedKeysPlain. selectedKeys 字段是在調(diào)用 openSelector() 方法時, 根據(jù) JVM 平臺的不同, 而有設置不同的值, 在我所調(diào)試這個值是不為 null 的. 其實 processSelectedKeysOptimized 方法 processSelectedKeysPlain 沒有太大的區(qū)別, 為了簡單起見, 我們以 processSelectedKeysOptimized 為例分析一下源碼的工作流程吧.
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) { for (int i = 0;; i ++) { final SelectionKey k = selectedKeys[i]; if (k == null) { break; } selectedKeys[i] = null; final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTasktask = (NioTask ) a; processSelectedKey(k, task); } ... } }
其實你別看它代碼挺多的, 但是關鍵的點就兩個: 迭代 selectedKeys 獲取就緒的 IO 事件, 然后為每個事件都調(diào)用 processSelectedKey 來處理它.
這里正好完美對應上了我們提到的 Selector 的使用流程中的第三步里操作.
還有一點需要注意的是, 我們可以調(diào)用 selectionKey.attach(object) 給一個 selectionKey 設置一個附加的字段, 然后可以通過 Object attachedObj = selectionKey.attachment() 獲取它. 上面代代碼正是通過了 k.attachment() 來獲取一個附加在 selectionKey 中的對象, 那么這個對象是什么呢? 它又是在哪里設置的呢? 我們再來回憶一下 SocketChannel 是如何注冊到 Selector 中的:
在客戶端的 Channel 注冊過程中, 會有如下調(diào)用鏈:
Bootstrap.initAndRegister -> AbstractBootstrap.initAndRegister -> MultithreadEventLoopGroup.register -> SingleThreadEventLoop.register -> AbstractUnsafe.register -> AbstractUnsafe.register0 -> AbstractNioChannel.doRegister
最后的 AbstractNioChannel.doRegister 方法會調(diào)用 SocketChannel.register 方法注冊一個 SocketChannel 到指定的 Selector:
@Override protected void doRegister() throws Exception { // 省略錯誤處理 selectionKey = javaChannel().register(eventLoop().selector, 0, this); }
特別注意一下 register 的第三個參數(shù), 這個參數(shù)是設置 selectionKey 的附加對象的, 和調(diào)用 selectionKey.attach(object) 的效果一樣. 而調(diào)用 register 所傳遞的第三個參數(shù)是 this, 它其實就是一個 NioSocketChannel 的實例. 那么這里就很清楚了, 我們在將 SocketChannel 注冊到 Selector 中時, 將 SocketChannel 所對應的 NioSocketChannel 以附加字段的方式添加到了selectionKey 中.
再回到 processSelectedKeysOptimized 方法中, 當我們獲取到附加的對象后, 我們就調(diào)用 processSelectedKey 來處理這個 IO 事件:
final Object a = k.attachment(); if (a instanceof AbstractNioChannel) { processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTasktask = (NioTask ) a; processSelectedKey(k, task); }
processSelectedKey 方法源碼如下:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) { final NioUnsafe unsafe = ch.unsafe(); ... try { int readyOps = k.readyOps(); // 可讀事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } } // 可寫事件 if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); } // 連接建立事件 if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); } } catch (CancelledKeyException ignored) { unsafe.close(unsafe.voidPromise()); } }
這個代碼是不是很熟悉啊? 完全是 Java NIO 的 Selector 的那一套處理流程嘛!
processSelectedKey 中處理了三個事件, 分別是:
OP_READ, 可讀事件, 即 Channel 中收到了新數(shù)據(jù)可供上層讀取.
OP_WRITE, 可寫事件, 即上層可以向 Channel 寫入數(shù)據(jù).
OP_CONNECT, 連接建立事件, 即 TCP 連接已經(jīng)建立, Channel 處于 active 狀態(tài).
下面我們分別根據(jù)這三個事件來看一下 Netty 是怎么處理的吧.
OP_READ 處理當就緒的 IO 事件是 OP_READ, 代碼會調(diào)用 unsafe.read() 方法, 即:
// 可讀事件 if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); if (!ch.isOpen()) { // Connection already closed - no need to handle write. return; } }
unsafe 這個字段, 我們已經(jīng)和它打了太多的交道了, 在第一章 Netty 源碼分析之 一 揭開 Bootstrap 神秘的紅蓋頭 (客戶端) 中我們已經(jīng)對它進行過濃墨重彩地分析了, 最后我們確定了它是一個 NioSocketChannelUnsafe 實例, 負責的是 Channel 的底層 IO 操作.
我們可以利用 Intellij IDEA 提供的 Go To Implementations 功能, 尋找到這個方法的實現(xiàn). 最后我們發(fā)現(xiàn)這個方法沒有在 NioSocketChannelUnsafe 中實現(xiàn), 而是在它的父類 AbstractNioByteChannel 實現(xiàn)的, 它的實現(xiàn)源碼如下:
@Override public final void read() { ... ByteBuf byteBuf = null; int messages = 0; boolean close = false; try { int totalReadAmount = 0; boolean readPendingReset = false; do { byteBuf = allocHandle.allocate(allocator); int writable = byteBuf.writableBytes(); int localReadAmount = doReadBytes(byteBuf); // 檢查讀取結(jié)果. ... pipeline.fireChannelRead(byteBuf); byteBuf = null; ... totalReadAmount += localReadAmount; // 檢查是否是配置了自動讀取, 如果不是, 則立即退出循環(huán). ... } while (++ messages < maxMessagesPerRead); pipeline.fireChannelReadComplete(); allocHandle.record(totalReadAmount); if (close) { closeOnRead(pipeline); close = false; } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close); } finally { } }
read() 源碼比較長, 我為了篇幅起見, 刪除了部分代碼, 只留下了主干. 不過我建議讀者朋友們自己一定要看一下 read() 源碼, 這對理解 Netty 的 EventLoop 十分有幫助.
上面 read 方法其實歸納起來, 可以認為做了如下工作:
分配 ByteBuf
從 SocketChannel 中讀取數(shù)據(jù)
調(diào)用 pipeline.fireChannelRead 發(fā)送一個 inbound 事件.
前面兩點沒什么好說的, 第三點 pipeline.fireChannelRead 讀者朋友們看到了有沒有會心一笑地感覺呢? 反正我看到這里時是有的. pipeline.fireChannelRead 正好就是我們在第二章 Netty 源碼分析之 二 貫穿Netty 的大動脈 ── ChannelPipeline (二) 中分析的 inbound 事件起點. 當調(diào)用了 pipeline.fireIN_EVT() 后, 那么就產(chǎn)生了一個 inbound 事件, 此事件會以 head -> customContext -> tail 的方向依次流經(jīng) ChannelPipeline 中的各個 handler.
調(diào)用了 pipeline.fireChannelRead 后, 就是 ChannelPipeline 中所需要做的工作了, 這些我們已經(jīng)在第二章中有過詳細討論, 這里就展開了.
OP_WRITE 可寫事件代碼如下. 這里代碼比較簡單, 沒有詳細分析的必要了.
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write ch.unsafe().forceFlush(); }OP_CONNECT 處理
最后一個事件是 OP_CONNECT, 即 TCP 連接已建立事件.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops); unsafe.finishConnect(); }
OP_CONNECT 事件的處理中, 只做了兩件事情:
正如代碼中的注釋所言, 我們需要將 OP_CONNECT 從就緒事件集中清除, 不然會一直有 OP_CONNECT 事件.
調(diào)用 unsafe.finishConnect() 通知上層連接已建立
unsafe.finishConnect() 調(diào)用最后會調(diào)用到 pipeline().fireChannelActive(), 產(chǎn)生一個 inbound 事件, 通知 pipeline 中的各個 handler TCP 通道已建立(即 ChannelInboundHandler.channelActive 方法會被調(diào)用)
到了這里, 我們整個 NioEventLoop 的 IO 操作部分已經(jīng)了解完了, 接下來的一節(jié)我們要重點分析一下 Netty 的任務隊列機制.
Netty 的任務隊列機制我們已經(jīng)提到過, 在Netty 中, 一個 NioEventLoop 通常需要肩負起兩種任務, 第一個是作為 IO 線程, 處理 IO 操作; 第二個就是作為任務線程, 處理 taskQueue 中的任務. 這一節(jié)的重點就是分析一下 NioEventLoop 的任務隊列機制的.
Task 的添加 普通 Runnable 任務NioEventLoop 繼承于 SingleThreadEventExecutor, 而 SingleThreadEventExecutor 中有一個 Queue
例如當我們需要將一個 Runnable 添加到 taskQueue 中時, 我們可以進行如下操作:
EventLoop eventLoop = channel.eventLoop(); eventLoop.execute(new Runnable() { @Override public void run() { System.out.println("Hello, Netty!"); } });
當調(diào)用 execute 后, 實際上是調(diào)用到了 SingleThreadEventExecutor.execute() 方法, 它的實現(xiàn)如下:
@Override public void execute(Runnable task) { if (task == null) { throw new NullPointerException("task"); } boolean inEventLoop = inEventLoop(); if (inEventLoop) { addTask(task); } else { startThread(); addTask(task); if (isShutdown() && removeTask(task)) { reject(); } } if (!addTaskWakesUp && wakesUpForTask(task)) { wakeup(inEventLoop); } }
而添加任務的 addTask 方法的源碼如下:
protected void addTask(Runnable task) { if (task == null) { throw new NullPointerException("task"); } if (isShutdown()) { reject(); } taskQueue.add(task); }
因此實際上, taskQueue 是存放著待執(zhí)行的任務的隊列.
schedule 任務除了通過 execute 添加普通的 Runnable 任務外, 我們還可以通過調(diào)用 eventLoop.scheduleXXX 之類的方法來添加一個定時任務.
EventLoop 中實現(xiàn)任務隊列的功能在超類 SingleThreadEventExecutor 實現(xiàn)的, 而 schedule 功能的實現(xiàn)是在 SingleThreadEventExecutor 的父類, 即 AbstractScheduledEventExecutor 中實現(xiàn)的.
在 AbstractScheduledEventExecutor 中, 有以 scheduledTaskQueue 字段:
Queue> scheduledTaskQueue;
scheduledTaskQueue 是一個隊列(Queue), 其中存放的元素是 ScheduledFutureTask. 而 ScheduledFutureTask 我們很容易猜到, 它是對 Schedule 任務的一個抽象.
我們來看一下 AbstractScheduledEventExecutor 所實現(xiàn)的 schedule 方法吧:
@Override public ScheduledFuture> schedule(Runnable command, long delay, TimeUnit unit) { ObjectUtil.checkNotNull(command, "command"); ObjectUtil.checkNotNull(unit, "unit"); if (delay < 0) { throw new IllegalArgumentException( String.format("delay: %d (expected: >= 0)", delay)); } return schedule(new ScheduledFutureTask( this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay)))); }
這是其中一個重載的 schedule, 當一個 Runnable 傳遞進來后, 會被封裝為一個 ScheduledFutureTask 對象, 這個對象會記錄下這個 Runnable 在何時運行、已何種頻率運行等信息.
當構(gòu)建了 ScheduledFutureTask 后, 會繼續(xù)調(diào)用 另一個重載的 schedule 方法:
ScheduledFuture schedule(final ScheduledFutureTask task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new OneTimeTask() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
在這個方法中, ScheduledFutureTask 對象就會被添加到 scheduledTaskQueue 中了.
任務的執(zhí)行當一個任務被添加到 taskQueue 后, 它是怎么被 EventLoop 執(zhí)行的呢?
讓我們回到 NioEventLoop.run() 方法中, 在這個方法里, 會分別調(diào)用 processSelectedKeys() 和 runAllTasks() 方法, 來進行 IO 事件的處理和 task 的處理. processSelectedKeys() 方法我們已經(jīng)分析過了, 下面我們來看一下 runAllTasks() 中到底有什么名堂吧.
runAllTasks 方法有兩個重載的方法, 一個是無參數(shù)的, 另一個有一個參數(shù)的. 首先來看一下無參數(shù)的 runAllTasks:
protected boolean runAllTasks() { fetchFromScheduledTaskQueue(); Runnable task = pollTask(); if (task == null) { return false; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception.", t); } task = pollTask(); if (task == null) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true; } } }
我們前面已經(jīng)提到過, EventLoop 可以通過調(diào)用 EventLoop.execute 來將一個 Runnable 提交到 taskQueue 中, 也可以通過調(diào)用 EventLoop.schedule 來提交一個 schedule 任務到 scheduledTaskQueue 中. 在此方法的一開始調(diào)用的 fetchFromScheduledTaskQueue() 其實就是將 scheduledTaskQueue 中已經(jīng)可以執(zhí)行的(即定時時間已到的 schedule 任務) 拿出來并添加到 taskQueue 中, 作為可執(zhí)行的 task 等待被調(diào)度執(zhí)行.
它的源碼如下:
private void fetchFromScheduledTaskQueue() { if (hasScheduledTasks()) { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { break; } taskQueue.add(scheduledTask); } } }
接下來 runAllTasks() 方法就會不斷調(diào)用 task = pollTask() 從 taskQueue 中獲取一個可執(zhí)行的 task, 然后調(diào)用它的 run() 方法來運行此 task.
注意, 因為 EventLoop 既需要執(zhí)行 IO 操作, 又需要執(zhí)行 task, 因此我們在調(diào)用 EventLoop.execute 方法提交任務時, 不要提交耗時任務, 更不能提交一些會造成阻塞的任務, 不然會導致我們的 IO 線程得不到調(diào)度, 影響整個程序的并發(fā)量.
本文由 yongshun 發(fā)表于個人博客, 采用 署名-相同方式共享 3.0 中國大陸許可協(xié)議.
Email: yongshun1228@gmail .com
本文標題為: Netty 源碼分析之 三 我就是大名鼎鼎的 EventLoop(二)
本文鏈接為: https://segmentfault.com/a/1190000007403937
文章版權歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/65280.html
摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...
摘要:背景在工作中雖然我經(jīng)常使用到庫但是很多時候?qū)Φ囊恍└拍钸€是處于知其然不知其所以然的狀態(tài)因此就萌生了學習源碼的想法剛開始看源碼的時候自然是比較痛苦的主要原因有兩個第一網(wǎng)上沒有找到讓我滿意的詳盡的源碼分析的教程第二我也是第一次系統(tǒng)地學習這么大代 背景 在工作中, 雖然我經(jīng)常使用到 Netty 庫, 但是很多時候?qū)?Netty 的一些概念還是處于知其然, 不知其所以然的狀態(tài), 因此就萌生了學...
摘要:我想這很好的解釋了中,僅僅一個都這么復雜,在單線程或者說串行的程序中,編程往往是很簡單的,說白了就是調(diào)用,調(diào)用,調(diào)用然后返回。 Netty源碼分析(三) 前提概要 這次停更很久了,原因是中途迷茫了一段時間,不過最近調(diào)整過來了。不過有點要說下,前幾天和業(yè)內(nèi)某個大佬聊天,收獲很多,所以這篇博文和之前也會不太一樣,我們會先從如果是我自己去實現(xiàn)這個功能需要怎么做開始,然后去看netty源碼,與...
摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結(jié)的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...
摘要:原理剖析第篇之服務端啟動工作原理分析下一大致介紹由于篇幅過長難以發(fā)布,所以本章節(jié)接著上一節(jié)來的,上一章節(jié)為原理剖析第篇之服務端啟動工作原理分析上那么本章節(jié)就繼續(xù)分析的服務端啟動,分析的源碼版本為二三四章節(jié)請看上一章節(jié)詳見原理剖析第篇之 原理剖析(第 011 篇)Netty之服務端啟動工作原理分析(下) - 一、大致介紹 1、由于篇幅過長難以發(fā)布,所以本章節(jié)接著上一節(jié)來的,上一章節(jié)為【原...
閱讀 2053·2021-11-22 13:52
閱讀 976·2021-11-17 09:33
閱讀 2708·2021-09-01 10:49
閱讀 2841·2019-08-30 15:53
閱讀 2659·2019-08-29 16:10
閱讀 2432·2019-08-29 11:31
閱讀 1343·2019-08-26 11:40
閱讀 1866·2019-08-26 10:59