摘要:背景最近發(fā)現(xiàn)的回調方法,在連接創(chuàng)建成功和讀取數(shù)據(jù)后都會被回調。那我也嘗試著從源碼找到答案吧?;卣{流程分析的回調流程和流程沒有什么區(qū)別,可參考上文分析。但是在的方法中會調用這個是讀數(shù)據(jù)的關鍵讀數(shù)據(jù)分析讀數(shù)據(jù)分析
背景
最近發(fā)現(xiàn)ChannelOutboundHandlerAdapter的read()回調方法,在連接創(chuàng)建成功和讀取數(shù)據(jù)后都會被回調。因此就產生了疑問“為什么建立連接和讀取數(shù)據(jù)后read()方法會被調用呢?” 從網上搜索到一片文章https://my.oschina.net/lifany... 可以看出一些端倪,但是具體流程和一些疑問還是沒有解開。
那我也嘗試著從源碼找到答案吧。
我們先寫個小Demo,其中Test1OutboundHandlerAdapter是一個ChannelOutboundHandlerAdapter,里面的read()添加一行打印。 Test1HandlerAdapter 是一個ChannelInboundHandlerAdapter 里面的channelActive(xxx)、
channelRead(xxx)、channelReadComplete(xxx)添加打印。由于很簡單,下面只貼部分代碼
Test1OutboundHandlerAdapter.java
@Override public void read(ChannelHandlerContext ctx) throws Exception { super.read(ctx); System.out.println("Test1OutboundHandlerAdapter------------->read"); }
Test1HandlerAdapter.java
@Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { super.channelRegistered(ctx); System.out.println("Test1HandlerAdapter-------------->channelRegistered"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); System.out.println("Test1HandlerAdapter-------------->channelActive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Test1HandlerAdapter-------------->channelRead"); ctx.writeAndFlush(msg); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { super.channelReadComplete(ctx); }
然后我們建立連接,隨便發(fā)一下數(shù)據(jù),服務器收到數(shù)據(jù),打印如下:
Test1HandlerAdapter-------------->handlerAdded
Test1HandlerAdapter-------------->channelRegistered
Test1HandlerAdapter-------------->channelActive
Test1OutboundHandlerAdapter------------->read
Test1HandlerAdapter-------------->channelRead
Test1HandlerAdapter-------------->channelReadComplete
Test1OutboundHandlerAdapter------------->read
如果把Test1OutboundHandlerAdapter的read(xxx)回調方法注釋掉,會發(fā)現(xiàn)服務器無法接收數(shù)據(jù)了。
源碼分析 1.channelRegistered回調流程分析可以定位到在AbstractChannelHandlerContext invokeChannelRegistered()方法調用了channelRegistered(xxx)方法,然后再查找會發(fā)現(xiàn)是
AbstractChannelHandlerContext的fireChannelRegistered()----->
invokeChannelRegistered(final AbstractChannelHandlerContext next)----->invokeChannelRegistered()
AbstractChannelHandlerContext
@Override public ChannelHandlerContext fireChannelRegistered() { invokeChannelRegistered(findContextInbound()); return this; } static void invokeChannelRegistered(final AbstractChannelHandlerContext next) { EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRegistered(); } else { executor.execute(new Runnable() { @Override public void run() { next.invokeChannelRegistered(); } }); } } private void invokeChannelRegistered() { if (invokeHandler()) { try { /**ChannelInboundHandler的register(xxx)在這里被調用*/ ((ChannelInboundHandler) handler()).channelRegistered(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelRegistered(); } }
順藤fireChannelRegistered()摸瓜,最終定位到AbstractChannel內部類AbstractUnsafe的
register(EventLoop eventLoop, final ChannelPromise promise)----->register0(ChannelPromise promise)
AbstractUnsafe
@Override public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } } private void register0(ChannelPromise promise) { try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the // user may already fire events through the pipeline in the ChannelFutureListener. pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // Only fire a channelActive if the channel has never been registered. This prevents firing // multiple channel actives if the channel is deregistered and re-registered. if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { // This channel was registered before and autoRead() is set. This means we need to begin read // again so that we process inbound data. // // See https://github.com/netty/netty/issues/4805 beginRead(); } } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }
在繼續(xù)找的會找到在EventLooop層面的調用了,我們可以先不用管了。在register0的方法中又調用了pipeline.fireChannelRegistered()和pipeline.fireChannelActive();,這正是我們要找的,也符合打印順序先channelRegistered后channelActive。為了驗證我們可以加上斷點調試,就是這兒了。
至此我們可以總結一下:
channelRegistered流程:
說明
DefaultChannelPipeline 的fireChannelRegistered()
@Override public final ChannelPipeline fireChannelUnregistered() { AbstractChannelHandlerContext.invokeChannelUnregistered(head); return this; }
AbstractChannelHandlerContext.invokeChannelUnregistered(head);傳遞的參數(shù)是DefaultChannelPipeline的head,這樣保證了register事件沿著pipeline從頭流向尾,其對應DefaultChannelPipeline內部類HeadContext。 HeadContext多重身份即是ChannelHandlerContext又是ChannelInboundHandler和ChannelOutboundhandler
DefaultChannelPipeline
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail; ...省略... protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
DefaultChannelPipeline的內部類HeadContext
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler { private final Unsafe unsafe; HeadContext(DefaultChannelPipeline pipeline) { super(pipeline, null, HEAD_NAME, false, true); unsafe = pipeline.channel().unsafe(); setAddComplete(); } @Override public ChannelHandler handler() { return this; } 省略后邊的代碼
2.上圖黃色的部分都是調用的HeadContext中的方法
static void invokeChannelRegistered(final AbstractChannelHandlerContext next)接收的參數(shù)是DefaultChannelPipeline傳遞的head即HeadContext,那么也就是head.invokeChannelRegistered()。
invokeChannelRegistered()方法中會調用
((ChannelInboundHandler) handler()).channelRegistered(this);
HeadContext類中該方法返回的就是自己(可查看上面的代碼),因為HeadContext本身也是ChannelInboundHandler。 同時又將自己作為參數(shù),調用自己的channelRegistered方法
3.HeadContext的ChannelRegister方法中調用AbstractChannelHandlerContext的fireChannelRegistered();
(還是調用的自己)該方法中調用了invokeChannelRegistered(findContextInbound()); findContextInbound()所實現(xiàn)的功能就是查找到下一個ChanelInboundHandler即HeadContext(本身是ChannelInboundHandler)下一個ChanelInboundHandler
上面的步驟不斷重復,自此registered事件可以沿著pipeline在不同的InboundHandler里流動了。
channelActive的回調流程和channelRegister流程沒有什么區(qū)別,可參考上文分析。 但是在HeadContext的channelActive方法中會調用readIfIsAutoRead(); 這個是讀數(shù)據(jù)的關鍵
3.netty讀數(shù)據(jù)分析讀數(shù)據(jù)分析
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69148.html
摘要:用這種方式很好的規(guī)避了多線程所帶來的問題,很值得我們借鑒那么這個怎么來的呢看一下的方法如果為,就返回。 Netty channelRegisteredChannelActive---源碼分析經過下面的分析我們可以了解netty讀數(shù)據(jù)的一個過程,以及為什么channelActive方法、channelReadComplete方法會回調ChannelOutboundHandler的read...
摘要:背景在工作中雖然我經常使用到庫但是很多時候對的一些概念還是處于知其然不知其所以然的狀態(tài)因此就萌生了學習源碼的想法剛開始看源碼的時候自然是比較痛苦的主要原因有兩個第一網上沒有找到讓我滿意的詳盡的源碼分析的教程第二我也是第一次系統(tǒng)地學習這么大代 背景 在工作中, 雖然我經常使用到 Netty 庫, 但是很多時候對 Netty 的一些概念還是處于知其然, 不知其所以然的狀態(tài), 因此就萌生了學...
摘要:目錄此文章屬于源碼之下無秘密做最好的源碼分析教程系列文章之一代碼下載首先到的倉庫中點擊右邊綠色的按鈕拷貝地址然后在終端中輸入如下命令克隆工程工程源碼較大加上國內網絡問題下載源碼可能會比較耗時當有如下輸出時表示克隆成功了如果有朋友實在下載太 目錄 此文章屬于 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 系列文章之一. 代碼下載 首先到 Netty 的 Github 倉庫 中...
摘要:目錄源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端源碼分析之一揭開神秘的紅蓋頭服務器 目錄 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 簡介 Java NIO 的前生今世 ...
摘要:目錄源碼之下無秘密做最好的源碼分析教程源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環(huán)境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端 目錄 源碼之下無秘密 ── 做最好的 Netty 源碼分析教程 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NI...
閱讀 1914·2021-11-25 09:43
閱讀 1415·2021-11-22 14:56
閱讀 3285·2021-11-22 09:34
閱讀 2018·2021-11-15 11:37
閱讀 2267·2021-09-01 10:46
閱讀 1404·2019-08-30 15:44
閱讀 2299·2019-08-30 13:15
閱讀 2400·2019-08-29 13:07