摘要:服務器構成至少一個該組件實現了服務器對從客戶端接受的數據的處理,即它的業務邏輯引導配置服務器的啟動代碼。至少,它會將服務器綁定到它要監聽連接請求的端口上。需要注意的是,由服務器發送的消息可能會被分塊接受。
Netty服務器構成
至少一個ChannelHandler——該組件實現了服務器對從客戶端接受的數據的處理,即它的業務邏輯
引導——配置服務器的啟動代碼。至少,它會將服務器綁定到它要監聽連接請求的端口上。
ChannelHandler和業務邏輯?ChannelHandler是一個接口族的父接口,它的實現負責接受并響應事件通知,在Netty應用程序中,所有的數據處理邏輯都包含在這些核心抽象的實現中。
?Echo服務器會響應傳入的消息,因此需要實現ChannelInboundHandler接口,用來定義響應入站事件的方法。由于Echo服務器的應用程序只需要用到少量的方法,所以只需要繼承ChannelInboundHandlerAdapter類,它提供了ChannelInboundHandler的默認實現。
?在ChannelInboundHandler中,我們感興趣的方法有:
channelRead()——對于每個傳入的消息都要調用
channelReadComplete()——通知ChannelInboundHandler最后一次對channelRead()的調用是當前批量讀取中的最后一條消息
execeptionCaught()——在讀取操作期間,有異常拋出時會調用。
EchoServerHandler實現package cn.sh.demo.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; /** * @author sh * @ChannelHandler.Sharable 標示一個ChannelHandler可以被多個Channel安全地共享 */ @ChannelHandler.Sharable public class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { ByteBuf in = (ByteBuf) msg; //將接受到的消息輸出到客戶端 System.out.println("Server received:" + in.toString(CharsetUtil.UTF_8)); //將接收到的消息寫給發送者,而不沖刷出站消息 ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) { //將消息沖刷到客戶端,并且關閉該Channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { //打印異常堆棧跟蹤 cause.printStackTrace(); //關閉該Channel ctx.close(); } }
備注
ChannelInbounHandlerAdapter每個方法都可以被重寫然后掛鉤到事件生命周期的恰當點上。
重寫exceptionCaught()方法允許你對Throwable的任何子類型作出反應。
每個Channel都擁有一個與之關聯的ChannelPipeline,ChannelPipeline持有一個ChannelHandler的實例鏈。在默認情況下,ChannelHandler會把對方法的調用轉發給鏈中的下一個ChannelHandler。因此,如果exceptionCaught()方法沒有被該鏈中的某處實現,那么異常將會被傳遞到ChannelPipeline的末端進行記錄
引導服務器主要涉及的內容
綁定監聽并接受傳入連接請求的端口
配置Channel,將有關的入站消息通知給EchoServerHandler實例
Echo服務引導示例代碼
package cn.sh.demo.echo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public void startServer() throws InterruptedException { EchoServerHandler serverHandler = new EchoServerHandler(); //創建EventLoopGroup EventLoopGroup group = new NioEventLoopGroup(); //創建ServerBootstrap ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(group) //指定所使用的NIO傳輸Channel .channel(NioServerSocketChannel.class) //使用指定的端口套接字 .localAddress(new InetSocketAddress(port)) //添加一個EchoServerHandler到子Channel的ChannelPipeline .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel channel) throws Exception { //此處由于EchoServerHandler被注解標注為@Shareble,所以我們總是使用相同的實例 channel.pipeline().addLast(serverHandler); } }); try { //異步的綁定服務器,調用sync()方法阻塞等待直到綁定完成 ChannelFuture channelFuture = bootstrap.bind().sync(); //獲取Channel的CloseFuture,并且阻塞當前線程直到它完成 channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //關閉EventLoopGroup,釋放所有的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { if (args.length != 1) { System.err.println("參數類型或者個數不正確"); return; } //設置端口值 int port = Integer.parseInt(args[0]); //啟動Echo服務器 new EchoServer(port).startServer(); } }
備注
此處使用了一個特殊的類——ChannelInitializer。當一個新的連接被接受時,一個新的子Channel會被創建,此時ChannelInitializer就會把一個EchoServerHandler的示例添加到該Channel的ChannelPipeline中,這個ChannelHandler將會收到有關入站消息的通知。
回顧引導服務創建一個ServerBootStrap實例來引導和綁定服務器
創建并分配一個NioEventLoopGroup實例進行事件的處理,如接受新連接以及讀/寫數據
指定服務器綁定的本地InetSocketAddress
使用EchoServerHandler的實例初始化每一個新的Channel
調用ServerBootstrap.bind()方法來綁定服務器
Echo客戶端客戶端主要包括的操作:
連接到服務器
發送一個或多個消息
對于每個消息,等待并接受從服務器發回相同的消息
關閉連接
編寫客戶端主要包括業務邏輯和引導
ChannelHandler實現客戶端邏輯在該示例中,我們使用SimpleChannelInboundHandler類來處理所有的事件,主要的方法有:
channelActive()——在和服務器的連接已經建立之后被調用
channelRead0()——當從服務器接收到一條消息時被調用
exceptionCaught()——在處理過程中引發異常時被調用
示例代碼如下:
package cn.sh.demo.echo; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.util.CharsetUtil; /** * @author sh * @ChannelHandler.Sharable 標記該類的示例可以被多個Channel共享 */ @ChannelHandler.Sharable public class EchoClientHandler extends SimpleChannelInboundHandler{ @Override public void channelActive(ChannelHandlerContext ctx) { //當一個連接被服務器接受并建立后,發送一條消息 ctx.writeAndFlush(Unpooled.copiedBuffer("Hello, Netty", CharsetUtil.UTF_8)); } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) { //記錄客戶端接收到服務器的消息 System.out.println("Client received:" + byteBuf.toString(CharsetUtil.UTF_8)); } /** * 在發生異常時,記錄錯誤并關閉Channel * @param ctx * @param cause */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
備注
?每次在接受數據時,都會調用channelRead0()方法。需要注意的是,由服務器發送的消息可能會被分塊接受。也就是說,如果服務器發送了5字節,那么不能保證這5字節會被一次性接受。即使是對于這么少量的數據,channelRead0()方法也可能會被調用兩次,第一次使用一個持有3字節的ByteBuf(Netty的字節容器),第二次使用一個持有2字節的ByteBuf。作為一個面向流的協議,TCP保證了字節數組會按照服務器發送它們的順序被接受。
主要和業務邏輯如何處理消息以及Netty如何管理資源有關
客戶端中,當channelRead0()方法完成時,已經接受了消息并且處理完畢,當該方法返回時,SimpleChannelInboundHandler負責釋放指向保存該消息的ByteBuf的內存引用。
但是在服務器端,你需要將消息返回給客戶端,write()操作是異步的,直到channelRead()方法返回后有可能仍然沒有完成,ChannelInboundHandlerAdapter在這個時間點上不會釋放消息。
服務端的消息是在channelComplete()方法中,通過writeAndFlush()方法調用時被釋放。
客戶端使用主機和端口參數來連接遠程地址,也就是Echo服務器的地址,而不是綁定到一個一直被監聽的端口。
示例代碼如下:
package cn.sh.demo.echo; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.net.InetSocketAddress; public class EchoClient { private final String host; private final int port; public EchoClient(String host, int port) { this.host = host; this.port = port; } public void start() throws InterruptedException { EventLoopGroup group = new NioEventLoopGroup(); //創建客戶端引導器 Bootstrap bootstrap = new Bootstrap(); //指定使用NioEventLoopGroup來處理客戶端事件 bootstrap.group(group) //指定使用NIO傳輸的Channel類型 .channel(NioSocketChannel.class) //設置服務器的InetSocketAddress .remoteAddress(new InetSocketAddress(host, port)) //在創建Channel時,向ChannelPipeline中添加一個EchoHandler實例 .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new EchoClientHandler()); } }); try { //連接到遠程節點,阻塞等待直到連接完成 ChannelFuture future = bootstrap.connect().sync(); //阻塞直到Channel關閉 future.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { //關閉線程池并且釋放所有的資源 group.shutdownGracefully().sync(); } } public static void main(String[] args) throws InterruptedException { if (args.length != 2) { System.err.println("參數個數不正確"); return; } int port = Integer.parseInt(args[1]); new EchoClient(args[0], port).start(); } }
備注
服務器和客戶端均使用了NIO傳輸,但是,客戶端和服務端可以使用不同的傳輸,例如,在服務器使用NIO傳輸,客戶端可以使用OIO傳輸
回顧引導服務器創建一個Bootstrap實例,引導并創建客戶端
創建一個NioEventLoopGroup實例來進行事件處理,其中事件處理包括創建新的連接以及處理入站和出站數據
為服務器連接創建了一個InetSocketAddress實例
當連接建立時,一個EchoClientHandler實例會被添加到(該Channel)的ChannelPipeline中
設置完成后,調用Bootstrap.connetc()連接到遠程節點
運行程序啟動服務端
再啟動客戶端
服務端的輸出如下:
客戶端的輸出如下:
代碼地址該文章的示例代碼位于cn.sh.demo.echo包下。
示例代碼,點擊查看
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69234.html
摘要:前言本文以自帶的示例工程為例,簡要介紹線程模型示例工程的代碼位于很簡單,僅包含一個方法用于初始化以及,我們來看看其中和線程模型相關的一些代碼在的初始化代碼中實例化了兩個對象和,它們有著公共基類,這個是線程模型的核心類名讓人聯想到組合模式, 前言 本文以 netty 4.1 自帶的示例工程 netty-example 為例,簡要介紹 netty 線程模型 EchoServer echo ...
摘要:目前為止,我們已經完成了一半的工作,剩下的就是在方法中啟動服務器。第一個通常被稱為,負責接收已到達的。這兩個指針恰好標記著數據的起始終止位置。 前言 本篇翻譯自netty官方Get Start教程,一方面能把好的文章分享給各位,另一方面能鞏固所學的知識。若有錯誤和遺漏,歡迎各位指出。 https://netty.io/wiki/user-gu... 面臨的問題 我們一般使用專用軟件或者...
摘要:目錄源碼分析之番外篇的前生今世的前生今世之一簡介的前生今世之二小結的前生今世之三詳解的前生今世之四詳解源碼分析之零磨刀不誤砍柴工源碼分析環境搭建源碼分析之一揭開神秘的紅蓋頭源碼分析之一揭開神秘的紅蓋頭客戶端源碼分析之一揭開神秘的紅蓋頭服務器 目錄 Netty 源碼分析之 番外篇 Java NIO 的前生今世 Java NIO 的前生今世 之一 簡介 Java NIO 的前生今世 ...
摘要:結構作為服務端作為序列化數據的協議前端通訊演示地址服務端實現啟動類長連接示例主線程組從線程組請求的解碼和編碼把多個消息轉換為一個單一的或是,原因是解碼器會在每個消息中生成多個消息對象主要用于處理大數據流,比如一個大小的文件如果你直接傳輸肯定 結構 netty 作為服務端 protobuf 作為序列化數據的協議 websocket 前端通訊 演示 GitHub 地址 showImg(...
摘要:結構作為服務端作為序列化數據的協議前端通訊演示地址服務端實現啟動類長連接示例主線程組從線程組請求的解碼和編碼把多個消息轉換為一個單一的或是,原因是解碼器會在每個消息中生成多個消息對象主要用于處理大數據流,比如一個大小的文件如果你直接傳輸肯定 結構 netty 作為服務端 protobuf 作為序列化數據的協議 websocket 前端通訊 演示 GitHub 地址 showImg(...
閱讀 786·2021-08-23 09:46
閱讀 928·2019-08-30 15:44
閱讀 2586·2019-08-30 13:53
閱讀 3039·2019-08-29 12:48
閱讀 3847·2019-08-26 13:46
閱讀 1780·2019-08-26 13:36
閱讀 3510·2019-08-26 11:46
閱讀 1408·2019-08-26 10:48