摘要:而用于主線程池的屬性都定義在中本篇只是簡單介紹了一下引導類的配置屬性,下一篇我將詳細介紹服務端引導類的過程分析。
從Java1.4開始, Java引入了non-blocking IO,簡稱NIO。NIO與傳統socket最大的不同就是引入了Channel和多路復用selector的概念。傳統的socket是基于stream的,它是單向的,有InputStream表示read和OutputStream表示寫。而Channel是雙工的,既支持讀也支持寫,channel的讀/寫都是面向Buffer。 NIO中引入的多路復用Selector機制(如果是linux系統,則應用的epoll事件通知機制)可使一個線程同時監聽多個Channel上發生的事件。 雖然Java NIO相比于以往確實是一個大的突破,但是如果要真正上手進行開發,且想要開發出好的一個服務端網絡程序,那么你得要花費一點功夫了,畢竟Java NIO只是提供了一大堆的API而已,對于一般的軟件開發人員來說只能呵呵了。因此,社區中就涌現了很多基于Java NIO的網絡應用框架,其中以Apache的Mina,以及Netty最為出名,從本篇開始我們將深入的分析一下Netty的內部實現細節 。
本系列是基于Netty4.1.18這個版本。
在分析源碼之前,我們還是先看看Netty官方的樣例代碼,了解一下Netty一般是如何進行服務端及客戶端開發的。
Netty服務端示例:
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup) // (3) .channel(NioServerSocketChannel.class) // (4) .handler(new LoggingHandler()) // (5) .childHandler(new ChannelInitializer() { // (6) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new DiscardServerHandler()); } }) .option(ChannelOption.SO_BACKLOG, 128) // (7) .childOption(ChannelOption.SO_KEEPALIVE, true); // (8) // Bind and start to accept incoming connections. ChannelFuture f = b.bind(port).sync(); // (9) // Wait until the server socket is closed. // In this example, this does not happen, but you can do that to gracefully // shut down your server. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); }
上面這段代碼展示了服務端的一個基本步驟:
1、 初始化用于Acceptor的主"線程池"以及用于I/O工作的從"線程池";
2、 初始化ServerBootstrap實例, 此實例是netty服務端應用開發的入口,也是本篇介紹的重點, 下面我們會深入分析;
3、 通過ServerBootstrap的group方法,設置(1)中初始化的主從"線程池";
4、 指定通道channel的類型,由于是服務端,故而是NioServerSocketChannel;
5、 設置ServerSocketChannel的處理器(此處不詳述,后面的系列會進行深入分析)
6、 設置子通道也就是SocketChannel的處理器, 其內部是實際業務開發的"主戰場"(此處不詳述,后面的系列會進行深入分析)
7、 配置ServerSocketChannel的選項
8、 配置子通道也就是SocketChannel的選項
9、 綁定并偵聽某個端口
接著,我們再看看客戶端是如何開發的:
Netty客戶端示例:
public class TimeClient { public static void main(String[] args) throws Exception { String host = args[0]; int port = Integer.parseInt(args[1]); EventLoopGroup workerGroup = new NioEventLoopGroup(); // (1) try { Bootstrap b = new Bootstrap(); // (2) b.group(workerGroup); // (3) b.channel(NioSocketChannel.class); // (4) b.option(ChannelOption.SO_KEEPALIVE, true); // (5) b.handler(new ChannelInitializer() { // (6) @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new TimeClientHandler()); } }); // Start the client. ChannelFuture f = b.connect(host, port).sync(); // (7) // Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { workerGroup.shutdownGracefully(); } } }
客戶端的開發步驟和服務端都差不多:
1、 初始化用于連接及I/O工作的"線程池";
2、 初始化Bootstrap實例, 此實例是netty客戶端應用開發的入口,也是本篇介紹的重點, 下面我們會深入分析;
3、 通過Bootstrap的group方法,設置(1)中初始化的"線程池";
4、 指定通道channel的類型,由于是客戶端,故而是NioSocketChannel;
5、 設置SocketChannel的選項(此處不詳述,后面的系列會進行深入分析);
6、 設置SocketChannel的處理器, 其內部是實際業務開發的"主戰場"(此處不詳述,后面的系列會進行深入分析);
7、 連接指定的服務地址;
通過對上面服務端及客戶端代碼分析,Bootstrap是Netty應用開發的入口,如果想要理解Netty內部的實現細節,那么有必要先了解一下Bootstrap內部的實現機制。
首先我們先看一下ServerBootstrap及Bootstrap的類繼承結構圖:
通過類圖我們知道AbstractBootstrap類是ServerBootstrap及Bootstrap的基類,我們先看一下AbstractBootstrap類的主要代碼:
public abstract class AbstractBootstrap, C extends Channel> implements Cloneable { volatile EventLoopGroup group; private volatile ChannelFactory extends C> channelFactory; private final Map, Object> options = new LinkedHashMap , Object>(); private final Map , Object> attrs = new LinkedHashMap , Object>(); private volatile ChannelHandler handler; public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return self(); } private B self() { return (B) this; } public B channel(Class extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory (channelClass)); } @Deprecated public B channelFactory(ChannelFactory extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); } public B channelFactory(io.netty.channel.ChannelFactory extends C> channelFactory) { return channelFactory((ChannelFactory ) channelFactory); } public B option(ChannelOption option, T value) { if (option == null) { throw new NullPointerException("option"); } if (value == null) { synchronized (options) { options.remove(option); } } else { synchronized (options) { options.put(option, value); } } return self(); } public B attr(AttributeKey key, T value) { if (key == null) { throw new NullPointerException("key"); } if (value == null) { synchronized (attrs) { attrs.remove(key); } } else { synchronized (attrs) { attrs.put(key, value); } } return self(); } public B validate() { if (group == null) { throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return self(); } public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); } public ChannelFuture bind(SocketAddress localAddress) { validate(); if (localAddress == null) { throw new NullPointerException("localAddress"); } return doBind(localAddress); } private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; } if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it"s not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered(); doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } } final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } } return regFuture; } abstract void init(Channel channel) throws Exception; private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); } public B handler(ChannelHandler handler) { if (handler == null) { throw new NullPointerException("handler"); } this.handler = handler; return self(); }public abstract AbstractBootstrapConfig config(); }
現在我們以示例代碼為出發點,來詳細分析一下引導類內部實現細節:
1、 首先看看服務端的b.group(bossGroup, workerGroup):
調用ServerBootstrap的group方法,設置react模式的主線程池 以及 IO 操作線程池,ServerBootstrap中的group代碼如下:
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (childGroup == null) { throw new NullPointerException("childGroup"); } if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = childGroup; return this; }
在group方法中,會繼續調用父類的group方法,而通過類繼承圖我們知道,super.group(parentGroup)其實調用的就是AbstractBootstrap的group方法。AbstractBootstrap中group代碼如下:
public B group(EventLoopGroup group) { if (group == null) { throw new NullPointerException("group"); } if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return self(); }
通過以上分析,我們知道了AbstractBootstrap中定義了主線程池group的引用,而子線程池childGroup的引用是定義在ServerBootstrap中。
當我們查看客戶端Bootstrap的group方法時,我們發現,其是直接調用的父類AbstractBoostrap的group方法。
2、示例代碼中的 channel()方法
無論是服務端還是客戶端,channel調用的都是基類的channel方法,其實現細節如下:
public B channel(Class extends C> channelClass) { if (channelClass == null) { throw new NullPointerException("channelClass"); } return channelFactory(new ReflectiveChannelFactory(channelClass)); }
public B channelFactory(ChannelFactory extends C> channelFactory) { if (channelFactory == null) { throw new NullPointerException("channelFactory"); } if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); } this.channelFactory = channelFactory; return self(); }
我們發現,其實channel方法內部,只是初始化了一個用于生產指定channel類型的工廠實例。
3、option / handler / attr 方法
option: 設置通道的選項參數, 對于服務端而言就是ServerSocketChannel, 客戶端而言就是SocketChannel;
handler: 設置主通道的處理器, 對于服務端而言就是ServerSocketChannel,也就是用來處理Acceptor的操作;
對于客戶端的SocketChannel,主要是用來處理 業務操作;
attr: 設置通道的屬性;
option / handler / attr方法都定義在AbstractBootstrap中, 所以服務端和客戶端的引導類方法調用都是調用的父類的對應方法。
4、childHandler / childOption / childAttr 方法(只有服務端ServerBootstrap才有child類型的方法)
對于服務端而言,有兩種通道需要處理, 一種是ServerSocketChannel:用于處理用戶連接的accept操作, 另一種是SocketChannel,表示對應客戶端連接。而對于客戶端,一般都只有一種channel,也就是SocketChannel。
因此以child開頭的方法,都定義在ServerBootstrap中,表示處理或配置服務端接收到的對應客戶端連接的SocketChannel通道。
childHandler / childOption / childAttr 在ServerBootstrap中的對應代碼如下:
public ServerBootstrap childHandler(ChannelHandler childHandler) { if (childHandler == null) { throw new NullPointerException("childHandler"); } this.childHandler = childHandler; return this; }
publicServerBootstrap childOption(ChannelOption childOption, T value) { if (childOption == null) { throw new NullPointerException("childOption"); } if (value == null) { synchronized (childOptions) { childOptions.remove(childOption); } } else { synchronized (childOptions) { childOptions.put(childOption, value); } } return this; }
publicServerBootstrap childAttr(AttributeKey childKey, T value) { if (childKey == null) { throw new NullPointerException("childKey"); } if (value == null) { childAttrs.remove(childKey); } else { childAttrs.put(childKey, value); } return this; }
至此,引導類的屬性配置都設置完畢了。
本篇總結:
1、服務端由兩種線程池,用于Acceptor的React主線程和用于I/O操作的React從線程池; 客戶端只有用于連接及IO操作的React的主線程池;
2、ServerBootstrap中定義了服務端React的"從線程池"對應的相關配置,都是以child開頭的屬性。 而用于"主線程池"channel的屬性都定義在AbstractBootstrap中;
本篇只是簡單介紹了一下引導類的配置屬性, 下一篇我將詳細介紹服務端引導類的Bind過程分析。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/68110.html
摘要:對于,目前大家只知道是個線程組,其內部到底如何實現的,它的作用到底是什么,大家也都不太清楚,由于篇幅原因,這里不作詳細介紹,后面會有文章作專門詳解。 在上一篇《ServerBootstrap 與 Bootstrap 初探》中,我們已經初步的了解了ServerBootstrap是netty進行服務端開發的引導類。 且在上一篇的服務端示例中,我們也看到了,在使用netty進行網絡編程時,我...
摘要:接下來的兩篇文章,我將從源碼角度為大家深入淺出的剖析的線程模型工作機制。我們看一下的源碼通過的代碼發現,實現了接口,其內部會通過指定的默認線程工廠來創建線程,并執行相應的任務。至此,初始化完成了。下一篇我們將詳細介紹,敬請期待。 我們都知道Netty的線程模型是基于React的線程模型,并且我們都知道Netty是一個高性能的NIO框架,那么其線程模型必定是它的重要貢獻之一。 在使用ne...
摘要:一些想法這個系列想開很久了,自己使用也有一段時間了,利用也編寫了一個簡單的框架,并運用到工作中了,感覺還不錯,趁著這段時間工作不是很忙,來分析一波源碼,提升下技術硬實力。 一些想法 這個系列想開很久了,自己使用netty也有一段時間了,利用netty也編寫了一個簡單的框架,并運用到工作中了,感覺還不錯,趁著這段時間工作不是很忙,來分析一波源碼,提升下技術硬實力。 結構 這里先看下net...
摘要:本篇將通過實例化過程,來深入剖析。及初始化完成后,它們會相互連接。我們在回到的構造方法父類構造方法調用完成后,還要初始化一下自己的配置對象是的內部類而又是繼承自,通過代碼分析,此對象就是就會對底層一些配置設置行為的封裝。 根據上一篇《Netty4.x 源碼實戰系列(二):服務端bind流程詳解》所述,在進行服務端開發時,必須通過ServerBootstrap引導類的channel方法來...
摘要:下面無恥的貼點源碼。啟動類我們也學,把啟動類抽象成兩層,方便以后寫客戶端。別著急,我們慢慢來,下一篇我們會了解以及他的成員,然后,完善我們的程序,增加其接收數據的能力。文章的源碼我會同步更新到我的上,歡迎大家,哈哈。 廢話兩句 這次更新拖了很長時間,第一是自己生病了,第二是因為最開始這篇想寫的很大,然后構思了很久,發現不太合適把很多東西寫在一起,所以做了點拆分,準備國慶前完成這篇博客。...
閱讀 2485·2021-10-19 11:45
閱讀 2464·2021-09-30 09:56
閱讀 1432·2021-09-30 09:47
閱讀 591·2019-08-30 15:53
閱讀 1834·2019-08-30 15:44
閱讀 583·2019-08-30 12:52
閱讀 1084·2019-08-30 11:16
閱讀 1605·2019-08-29 16:36