摘要:它使用了事件通知以確定在一組非阻塞套接字中有哪些已經就緒能夠進行相關的操作。目前,可以把看作是傳入入站或者傳出出站數據的載體。出站事件是未來將會觸發的某個動作的操作結果,這些動作包括打開或者關閉到遠程節點的連接將數據寫到或者沖刷到套接字。
netty的概念 定義
Netty 是一款異步的事件驅動的網絡應用程序框架,支持快速地開發可維護的高性能的面向協議的服務器和客戶端。我們可以很簡單的使用Netty 構建應用程序,你不必是一名網絡編程專家;而且Netty 比直接使用底層的Java API 容易得多,它推崇良好的設計實踐,可以將你的應用程序邏輯和網絡層解耦。
Netty的特性總結在我們開始首次深入地了解Netty 之前,請仔細審視表1-1 中所總結的關鍵特性。有些是技術性的,而其他的更多的則是關于架構或設計哲學的。在本書的學習過程中,我們將不止一次地重新審視它們。
在深入netty之前,我們先來簡單說說NIO;我們都知道,它和以前的普通I/O相比的最大優勢在于它是非阻塞的。
阻塞I/O因為普通I/O的阻塞,以前我們設計并發只能像下圖這樣,為每個I/O分配一個線程:
這顯然帶來了一些問題:
在任何時候都可能有大量的線程處于休眠狀態,只是等待輸入或者輸出數據就緒,這可能算是一種資源浪費。
需要為每個線程的調用棧都分配內存,其默認值大小區間為64 KB 到1 MB,具體取決于操作系統。
即使Java 虛擬機(JVM)在物理上可以支持非常大數量的線程,但是遠在到達該極限之前,上下文切換所帶來的開銷就會帶來麻煩,例如,在達到10 000 個連接的時候。
非阻塞I/OJava 對于非阻塞I/O 的支持是在2002 年引入的,位于JDK 1.4 的java.nio 包中。下圖展示了一個非阻塞設計,其實際上消除了普通I/O的那些弊端。選擇器使得我們能夠通過較少的線程便可監視許多連接上的事件。
class java.nio.channels.Selector 是Java 的非阻塞I/O 實現的關鍵。它使用了事件通知API以確定在一組非阻塞套接字中有哪些已經就緒能夠進行I/O 相關的操作。因為可以在任何的時間檢查任意的讀操作或者寫操作的完成狀態,所以如上圖 所示,一個單一的線程便可以處理多個并發的連接。
總體來看,與阻塞I/O 模型相比,這種模型提供了更好的資源管理:
使用較少的線程便可以處理許多連接,因此也減少了內存管理和上下文切換所帶來開銷;
當沒有I/O 操作需要處理的時候,線程也可以被用于其他任務。盡管已經有許多直接使用Java NIO API 的應用程序被構建了,但是要做到如此正確和安全并不容易。特別是,在高負載下可靠和高效地處理和調度I/O 操作是一項繁瑣而且容易出錯的任務,最好留給高性能的網絡編程專家——Netty。
netty核心組件在本節中我將要討論Netty 的主要構件塊:
Channel —— 可以看做是Socket的抽象;
回調;
ChannelFuture—— 異步通知;
事件和ChannelHandler。
EventLoop —— 控制流、多線程處理、并發;
ChannelPipeline —— 提供了ChannelHandler 鏈的容器
引導 —— Bootstrap和ServerBootstrap
這些構建塊代表了不同類型的構造:資源、邏輯以及通知。你的應用程序將使用它們來訪問網絡以及流經網絡的數據。
對于每個組件來說,我們都將提供一個基本的定義,并且在適當的情況下,還會提供一個簡單的示例代碼來說明它的用法。
Channel基本的I/O 操作(bind()、connect()、read()和write())依賴于底層網絡傳輸所提供的原語。在基于Java 的網絡編程中,其基本的構造是class Socket。Netty 的Channel 接口所提供的API,大大地降低了直接使用Socket 類的復雜性。
Channel 是Java NIO 的一個基本構造。它代表一個到實體(如一個硬件設備、一個文件、一個網絡套接字或者一個能夠執行一個或者多個不同的I/O操作的程序組件)的開放連接,如讀操作和寫操作。
目前,可以把Channel 看作是傳入(入站)或者傳出(出站)數據的載體。因此,它可以被打開或者被關閉,連接或者斷開連接。
回調一個回調其實就是一個方法,一個指向已經被提供給另外一個方法的方法的引用。這使得后者可以在適當的時候調用前者。回調在廣泛的編程場景中都有應用,而且也是在操作完成后通知相關方最常見的方式之一。
ChannelFutureFuture 提供了另一種在操作完成時通知應用程序的方式。這個對象可以看作是一個異步操作的結果的占位符;它將在未來的某個時刻完成,并提供對其結果的訪問。
JDK 預置了interface java.util.concurrent.Future,但是其所提供的實現,只允許手動檢查對應的操作是否已經完成,或者一直阻塞直到它完成。這是非常繁瑣的,所以Netty提供了它自己的實現——ChannelFuture,用于在執行異步操作的時候使用。
ChannelFuture提供了幾種額外的方法,這些方法使得我們能夠注冊一個或者多個ChannelFutureListener實例。監聽器的回調方法operationComplete(),將會在對應的操作完成時被調用。然后監聽器可以判斷該操作是成功地完成了還是出錯了。如果是后者,我們可以檢索產生的Throwable。簡而言之,由ChannelFutureListener提供的通知機制消除了手動檢查對應的操作是否完成的必要。
下面展示了一個異步地連接到遠程節點,ChannelFuture 作為一個I/O 操作的一部分返回的例子。這里,connect()方法將會直接返回,而不會阻塞。
Channel channel = ...; ChannelFuture future = channel.connect( new InetSocketAddress("192.168.0.1", 25));
下面的代碼顯示了如何利用ChannelFutureListener。首先,要連接到遠程節點上。然后,要注冊一個新的ChannelFutureListener 到對connect()方法的調用所返回的ChannelFuture 上。當該監聽器被通知連接已經建立的時候,要檢查對應的狀態。如果該操作是成功的,那么將數據寫到該Channel。否則,要從ChannelFuture 中檢索對應的Throwable。
Channel channel = ...; // 連接遠程節點 ChannelFuture future = channel.connect( new InetSocketAddress("192.168.0.1", 25)); //注冊一個ChannelFutureListener,以便在操作完成時獲得通知 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) { //狀態判斷 if (future.isSuccess()){ //如果操作是成功的,則創建一個ByteBuf 以持有數據 ByteBuf buffer = Unpooled.copiedBuffer( "Hello",Charset.defaultCharset()); //將數據異步地發送到遠程節點。返回一個ChannelFuture ChannelFuture wf = future.channel() .writeAndFlush(buffer); .... } else { //如果發生錯誤,則訪問描述原因的Throwable Throwable cause = future.cause(); cause.printStackTrace(); } } });
如果你把ChannelFutureListener 看作是回調的一個更加精細的版本,那么你是對的。事實上,回調和Future 是相互補充的機制;它們相互結合,構成了Netty 本身的關鍵構件塊之一。
事件和ChannelHandler 事件Netty 使用不同的事件來通知我們狀態的改變或者是操作的狀態。這使得我們能夠基于已經發生的事件來觸發適當的動作。這些動作可能是:
記錄日志;
數據轉換;
流控制;
應用程序邏輯。
Netty 是一個網絡編程框架,所以事件是按照它們與入站或出站數據流的相關性進行分類的。可能由入站數據或者相關的狀態更改而觸發的事件包括:
連接已被激活或者連接失活;
數據讀取;
用戶事件;
錯誤事件。
出站事件是未來將會觸發的某個動作的操作結果,這些動作包括:
打開或者關閉到遠程節點的連接;
將數據寫到或者沖刷到套接字。
ChannelHandler從應用程序開發人員的角度來看,Netty 的主要組件是ChannelHandler,它充當了所有處理入站和出站數據的應用程序邏輯的容器。該組件實現了服務器對從客戶端接收的數據的處理。每個事件都可以被分發給ChannelHandler 類中的某個用戶實現的方法。這是一個很好的將事件驅動范式直接轉換為應用程序構件塊的例子。圖1-3 展示了一個事件是如何被一個這樣的ChannelHandler 鏈處理的。
Netty 的ChannelHandler 為處理器提供了基本的抽象,如圖1-3 所示的那些。你可以認為每個ChannelHandler 的實例都類似于一種為了響應特定事件而被執行的回調。
下列代碼就是一個handler的示例:
@ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler{ //重寫了channelActive()方法,其將在一個連接建立時被調用 @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } //重寫了channelRead0()方法。每當接收數據時,都會調用這個方法。 //需要注意的是,由服務器發送的消息可能會被分塊接收。 // 也就是說,如果服務器發送了5 字節,那么不能保證這5 字節會被一次性接收。 //即使是對于這么少量的數據,channelRead0()方法也可能 // 會被調用兩次,第一次使用一個持有3 字節的ByteBuf(Netty 的字節容器) // 第二次使用一個持有2 字節的ByteBuf。 @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println( "Client received: " + in.toString(CharsetUtil.UTF_8)); } //發生異常時被調用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
channelHandler的主要抽象方法都定義于ChannelHandlerAdapter類中,我們通過重寫適當的方法,來控制整個生命周期的重要節點的邏輯。
Netty 提供了大量預定義的可以開箱即用的ChannelHandler 實現,包括用于各種協議(如HTTP 和SSL/TLS)的ChannelHandler。在內部,ChannelHandler 自己也使用了事件和Future,使得它們也成為了你的應用程序將使用的相同抽象的消費者。
ChannelPipelineChannelPipeline 提供了ChannelHandler 鏈的容器,并定義了用于在該鏈上傳播入站和出站事件流的API。當Channel 被創建時,它會被自動地分配到它專屬的ChannelPipeline。
ChannelHandler 安裝到ChannelPipeline 中的過程如下所示:
ServerBootstrap b = new ServerBootstrap(); //一個ChannelInitializer的實現被注冊到了ServerBootstrap中①; b.group(group) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializerEventLoop() { //當ChannelInitializer.initChannel()方法被調用時ChannelInitializer將在 //ChannelPipeline 中安裝一組自定義的ChannelHandler serverHandler; @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(serverHandler); } });
EventLoop 定義了Netty 的核心抽象,用于處理連接的生命周期中所發生的事件。圖3-1
下圖在高層次上說明了Channel、EventLoop、Thread 以及EventLoopGroup 之間的關系。
這些關系是:
一個EventLoopGroup 包含一個或者多個EventLoop;
一個EventLoop 在它的生命周期內只和一個Thread 綁定;所有由EventLoop 處理的I/O 事件都將在它專有的Thread 上被處理;
一個Channel 在它的生命周期內只注冊于一個EventLoop;一個EventLoop 可能會被分配給一個或多個Channel。
注意,在這種設計中,一個給定Channel 的I/O 操作都是由相同的Thread 執行的,實際
上消除了不同線程間對于同步的需要。
Netty 的引導類為應用程序的網絡層配置提供了容器,這涉及將一個進程綁定到某個指定的端口(ServerBootstrap),或者將一個進程連接到另一個運行在某個指定主機的指定端口上的進程(Bootstrap)。Netty提供兩種類型的引導,一種用于客戶端(簡單地稱為Bootstrap),而另一種(ServerBootstrap)用于服務器。無論你的應用程序使用哪種協議或者處理哪種類型的數據,唯一決定它使用哪種引導類的是它是作為一個客戶端還是作為一個服務器。表3-1 比較了這兩種類型的引導類。
這兩種類型的引導類之間的第一個區別已經討論過了:ServerBootstrap 將綁定到一個端口,因為服務器必須要監聽連接,而Bootstrap 則是由想要連接到遠程節點的客戶端應用程序所使用的。
第二個區別可能更加明顯。引導一個客戶端只需要一個EventLoopGroup,但是一個ServerBootstrap 則需要兩個(也可以是同一個實例)。為什么呢?
因為服務器需要兩組不同的Channel。第一組將只包含一個ServerChannel,代表服務器自身的已綁定到某個本地端口的正在監聽的套接字。而第二組將包含所有已創建的用來處理傳入客戶端連接(對于每個服務器已經接受的連接都有一個)的Channel。圖3-4 說明了這個模型,并且展示了為何需要兩個不同的EventLoopGroup。
與ServerChannel 相關聯的EventLoopGroup 將分配一個負責為 傳入連接請求 創建Channel 的EventLoop。一旦連接被接受,第二個EventLoopGroup 就會給它的Channel分配一個EventLoop。
一個簡單的Netty服務端和客戶端交互demo 客戶端 自定義channelHandlerimport io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler客戶端實例{ //重寫了channelActive()方法,其將在一個連接建立時被調用 @Override public void channelActive(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8)); } //重寫了channelRead0()方法。每當接收數據時,都會調用這個方法。 //需要注意的是,由服務器發送的消息可能會被分塊接收。 // 也就是說,如果服務器發送了5 字節,那么不能保證這5 字節會被一次性接收。 //即使是對于這么少量的數據,channelRead0()方法也可能 // 會被調用兩次,第一次使用一個持有3 字節的ByteBuf(Netty 的字節容器) // 第二次使用一個持有2 字節的ByteBuf。 @Override public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) { System.out.println( "Client received: " + in.toString(CharsetUtil.UTF_8)); } //發生異常時被調用 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws Exception { //定義EventLoop EventLoopGroup group = new NioEventLoopGroup(); try { //Bootstrap類包提供包含豐富API的幫助類,能夠非常方便的實現典型的服務器端和客戶端通道初始化功能。 Bootstrap b = new Bootstrap(); //綁定EventLoop b.group(group) //使用默認的channelFactory創建一個channel .channel(NioSocketChannel.class) //定義遠程地址 .remoteAddress(new InetSocketAddress(host, port)) //綁定自定義的EchoClientHandler到ChannelPipeline上 .handler(new ChannelInitializer服務端 自定義channelHandler() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new EchoClientHandler()); } }); //同步式的鏈接 ChannelFuture f = b.connect().sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } } public static void main(String[] args) throws Exception { new EchoClient("localhost", 8155).start(); } }
import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.util.CharsetUtil; /** * 因為你的Echo 服務器會響應傳入的消息,所以它需要實現ChannelInboundHandler 接口,用 * 來定義響應入站事件的方法。 */ //標示一個ChannelHandler 可以被多個Channel 安全地共享 @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { //對于每個傳入的消息都會被調用; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { //將消息記錄到控制臺 ByteBuf in = (ByteBuf) msg; System.out.println( "Server received: " + in.toString(CharsetUtil.UTF_8)); ctx.write(in); } //通知ChannelInboundHandler最后一次對channelRead() //的調用是當前批量讀取中的最后一條消息; @Override public void channelReadComplete(ChannelHandlerContext ctx) { ctx.writeAndFlush(Unpooled.EMPTY_BUFFER) .addListener(ChannelFutureListener.CLOSE); } //在讀取操作期間,有異常拋出時會調用。 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }服務端實例
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.*; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { //設置端口值(如果端口參數的格式不正確,則拋出一個NumberFormatException) int port = 8155; new EchoServer(port).start(); } public void start() throws Exception { //定義EventLoop EventLoopGroup group = new NioEventLoopGroup(); try { //與Bootstrap類包包含豐富的客戶端API一樣,ServerBootstrap能夠非常方便的實現典型的服務端。 ServerBootstrap b = new ServerBootstrap(); b.group(group) //指定所使用的NIO傳輸Channel .channel(NioServerSocketChannel.class) //使用指定的端口設置套接字地址 .localAddress(new InetSocketAddress(port)) //添加一個EchoServerHandler 到子Channel的ChannelPipeline .childHandler(new ChannelInitializer小結() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoServerHandler()); } }); //新建一個future實例,異步地綁定服務器;調用sync()方法阻塞等待直到綁定完成 ChannelFuture f = b.bind().sync(); //獲取Channel 的CloseFuture,并且阻塞當前線程直到它完成 //該應用程序將會阻塞等待直到服務器的Channel關閉(因為你在Channel 的CloseFuture 上調用了sync()方法)。 f.channel().closeFuture().sync(); } finally { //關閉EventLoopGroup,釋放所有的資源 group.shutdownGracefully().sync(); } } }
在本章中,我們從技術和體系結構這兩個角度探討了理解Netty 的重要性。我們也更加詳細地重新審視了之前引入的一些概念和組件,特別是ChannelHandler、ChannelPipeline和引導。
特別地,我們討論了ChannelHandler 類的層次結構,并介紹了編碼器和解碼器,描述了它們在數據和網絡字節格式之間來回轉換的互補功能。下面的許多章節都將致力于深入研究這些組件,而這里所呈現的概覽應該有助于你對整體
的把控。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69480.html
摘要:當你從讀取時,它的將會被遞增已經被讀取的字節數。達到和位于同一位置,表示我們到達可以讀取的數據的末尾。該應用程序可以選擇為多個消息重用相同的消息主體。 ByteBuffer 當我們進行數據傳輸的時候,往往需要使用到緩沖區,常用的緩沖區就是JDK NIO類庫提供的java.nio.Buffer。 showImg(https://segmentfault.com/img/bVbbz8p?w...
摘要:簡單來說就是把注冊的動作異步化,當異步執行結束后會把執行結果回填到中抽象類一般就是公共邏輯的處理,而這里的處理主要就是針對一些參數的判斷,判斷完了之后再調用方法。 閱讀這篇文章之前,建議先閱讀和這篇文章關聯的內容。 1. 詳細剖析分布式微服務架構下網絡通信的底層實現原理(圖解) 2. (年薪60W的技巧)工作了5年,你真的理解Netty以及為什么要用嗎?(深度干貨)...
摘要:最后五年工作經驗以上。所以我希望大家也可以深入某項核心技術去好好研究。公眾號貓說現架構設計碼農兼創業技術顧問,不羈平庸,熱愛開源,雜談程序人生與不定期干貨。 本博客 貓叔的博客,轉載請申明出處 前言 感謝粉絲提問,由于我最近工作較忙,所以今晚抽空回復問題,可能有點倉促,回答的不是很全,不過希望能對你有所幫助。 整理自己的技術棧 首先,我需要你整理出自己一份滿意的簡歷,我希望程序員可以每...
摘要:是一個分布式服務框架,以及治理方案。手寫注意要點手寫注意要點基于上文中對于協議的理解,如果我們自己去實現,需要考慮哪些技術呢其實基于圖的整個流程應該有一個大概的理解。基于手寫實現基于手寫實現理解了協議后,我們基于來實現一個通信框架。閱讀這篇文章之前,建議先閱讀和這篇文章關聯的內容。[1]詳細剖析分布式微服務架構下網絡通信的底層實現原理(圖解)[2][年薪60W的技巧]工作了5年,你真的理解N...
閱讀 2472·2021-11-24 09:39
閱讀 3518·2019-08-30 15:53
閱讀 594·2019-08-29 15:15
閱讀 2903·2019-08-26 13:23
閱讀 3212·2019-08-26 10:48
閱讀 643·2019-08-26 10:31
閱讀 748·2019-08-26 10:30
閱讀 2359·2019-08-23 18:32