摘要:設(shè)置每個(gè)數(shù)據(jù)包的大小如個(gè)字節(jié),如果某個(gè)數(shù)據(jù)包不足個(gè)字節(jié)可能會(huì)出現(xiàn)丟包的情況,即該數(shù)據(jù)包未從一個(gè)端到另一個(gè)端,此時(shí)需要用空格或者既定的符號(hào)補(bǔ)充在數(shù)據(jù)包之間使用一些字符進(jìn)行分割如號(hào)之類(lèi)的,解析的時(shí)候先處理掉分隔符再拿到各個(gè)數(shù)據(jù)包就好了。
netty
概念: Netty是由JBOSS提供的一個(gè)java開(kāi)源框架。Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開(kāi)發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶(hù)端程序。
也就是說(shuō),Netty 是一個(gè)基于NIO的客戶(hù)、服務(wù)器端編程框架,使用Netty 可以確保你快速和簡(jiǎn)單的開(kāi)發(fā)出一個(gè)網(wǎng)絡(luò)應(yīng)用,例如實(shí)現(xiàn)了某種協(xié)議的客戶(hù),服務(wù)端應(yīng)用。Netty相當(dāng)簡(jiǎn)化和流線(xiàn)化了網(wǎng)絡(luò)應(yīng)用的編程開(kāi)發(fā)過(guò)程,例如,TCP和UDP的socket服務(wù)開(kāi)發(fā)。
處理大容量數(shù)據(jù)流更簡(jiǎn)單
處理協(xié)議編碼和單元測(cè)試更簡(jiǎn)單
I/O超時(shí)和idle狀態(tài)檢測(cè)
應(yīng)用程序的關(guān)閉更簡(jiǎn)單,更安全
更可靠的OutOfMemoryError預(yù)防
性能更好的吞吐量,更低的延遲
更少的資源消耗
最小化不必要的內(nèi)存拷貝
具體使用見(jiàn)代碼及注釋Helloword版
服務(wù)端這邊綁定了兩個(gè)端口,可以根據(jù)業(yè)務(wù)區(qū)別對(duì)待如端口1是做A業(yè)務(wù),端2做B業(yè)務(wù).
public class Server { public static void main(String[] args) throws InterruptedException { //1.創(chuàng)建兩個(gè)線(xiàn)程組 (只有服務(wù)器端需要 ) //一個(gè)線(xiàn)程組專(zhuān)門(mén)用來(lái)管理接收客戶(hù)端的請(qǐng)求連接的 //一個(gè)線(xiàn)程組進(jìn)行網(wǎng)絡(luò)通信(讀寫(xiě)) EventLoopGroup receiveGroup = new NioEventLoopGroup(); EventLoopGroup dealGroup = new NioEventLoopGroup(); //創(chuàng)建輔助工具類(lèi),用于設(shè)置服務(wù)器通道的一系列配置 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(receiveGroup, dealGroup)//綁定兩個(gè)線(xiàn)程組 .channel(NioServerSocketChannel.class) //指定NIO的模式 .option(ChannelOption.SO_BACKLOG, 1024) //設(shè)置tcp緩沖區(qū) .option(ChannelOption.SO_SNDBUF, 32*1024) //設(shè)置發(fā)送緩沖區(qū)大小 .option(ChannelOption.SO_RCVBUF, 32*1024) //設(shè)置接收緩沖大小 .option(ChannelOption.SO_KEEPALIVE, true) //保持連接 .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { //3 在這里配置具體數(shù)據(jù)接收方法的處理 sc.pipeline().addLast(new ServerHandler()); } }); //4 進(jìn)行綁定 ChannelFuture cf1 = serverBootstrap.bind(8765).sync(); ChannelFuture cf2 = serverBootstrap.bind(8764).sync(); //5 等待關(guān)閉 cf1.channel().closeFuture().sync(); cf2.channel().closeFuture().sync(); receiveGroup.shutdownGracefully(); dealGroup.shutdownGracefully(); } }
服務(wù)端處理器:
public class ServerHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "gbk"); System.out.println("Server :" + body ); String response = "進(jìn)行返回給客戶(hù)端的響應(yīng):" + body ; //注意使用了writeAndFlush的話(huà)就可以不釋放ReferenceCountUtil.release(msg); 否則需要釋放ByteBuf容器的數(shù)據(jù)。 ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); //.addListener(ChannelFutureListener.CLOSE);//監(jiān)聽(tīng),內(nèi)容傳輸完畢后就關(guān)閉管道 } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("讀完了"); ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { ctx.close(); } }
客戶(hù)端:
public class Client { public static void main(String[] args) throws Exception{ EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf1 = b.connect("127.0.0.1", 8765).sync(); ChannelFuture cf2 = b.connect("127.0.0.1", 8764).sync(); //發(fā)送消息 cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:777".getBytes())); Thread.sleep(1000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:666".getBytes())); cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:888".getBytes())); Thread.sleep(2000); cf1.channel().writeAndFlush(Unpooled.copiedBuffer("C1:888".getBytes())); cf2.channel().writeAndFlush(Unpooled.copiedBuffer("C2:666".getBytes())); cf1.channel().closeFuture().sync(); cf2.channel().closeFuture().sync(); group.shutdownGracefully(); } }
客戶(hù)端處理器:
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶(hù)端的channelActive()方法"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { ByteBuf buf = (ByteBuf) msg; byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); String body = new String(req, "gbk"); System.out.println("Client :" + body ); } finally { ReferenceCountUtil.release(msg); } }TCP拆包粘包問(wèn)題
TCP是個(gè)“流”協(xié)議,所謂流,就是沒(méi)有界限的一串?dāng)?shù)據(jù)。大家可以想想河里的流水,是連成一片的,其間并沒(méi)有分界線(xiàn)。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行包的劃分,所以在業(yè)務(wù)上認(rèn)為,一個(gè)完整的包可能會(huì)被TCP拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包封裝成一個(gè)大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問(wèn)題。
通俗意義來(lái)說(shuō)可能是三個(gè)數(shù)據(jù)如"A","B","C" 但經(jīng)過(guò)TCP協(xié)議流式傳輸后成了"AB","C"兩個(gè)數(shù)據(jù)了,這種就是粘包了數(shù)據(jù)包之間粘一起了。那么拆包的話(huà)有三種方式。
設(shè)置每個(gè)數(shù)據(jù)包的大小如200個(gè)字節(jié),如果某個(gè)數(shù)據(jù)包不足200個(gè)字節(jié)可能會(huì)出現(xiàn)丟包的情況,即該數(shù)據(jù)包未從一個(gè)端到另一個(gè)端,此時(shí)需要用空格或者既定的符號(hào)補(bǔ)充.
在數(shù)據(jù)包之間使用一些字符進(jìn)行分割如$號(hào)之類(lèi)的,解析的時(shí)候先處理掉分隔符再拿到各個(gè)數(shù)據(jù)包就好了。(一般用的比較多)
細(xì)粒化數(shù)據(jù)包分為頭和尾(將消息分為消息頭和消息尾)
其他
兩根水管(服務(wù)器與客戶(hù)端)需要相互流通水(數(shù)據(jù)),那么需要一個(gè)轉(zhuǎn)接頭(套接字)連接,水流式無(wú)法區(qū)分一段段的數(shù)據(jù),一種方式在流通的過(guò)程中設(shè)置些標(biāo)志性物品如記號(hào)筆勾一下(分隔符),另一種方式則是設(shè)定每一段都是多少容量的水來(lái)區(qū)分.
使用分隔符解決TCP粘包可以理解管道流里流的都是ByteBuffer類(lèi)型的數(shù)據(jù),那么使用分隔符(非ByteBuffer類(lèi)型)的話(huà)可能就意味著一個(gè)轉(zhuǎn)碼與解碼的過(guò)程。
服務(wù)端:
public class Server { public static void main(String[] args) throws Exception{ //1 創(chuàng)建2個(gè)線(xiàn)程,一個(gè)是負(fù)責(zé)接收客戶(hù)端的連接。一個(gè)是負(fù)責(zé)進(jìn)行數(shù)據(jù)傳輸?shù)? EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); //2 創(chuàng)建服務(wù)器輔助類(lèi) ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { //設(shè)置特殊分隔符 解決TCP拆包黏包問(wèn)題, ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); //設(shè)置字符串形式的解碼 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 綁定連接 ChannelFuture cf = b.bind(8765).sync(); //等待服務(wù)器監(jiān)聽(tīng)端口關(guān)閉 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
服務(wù)端處理器:
public class ServerHandler extends ChannelHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println(" server channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String request = (String)msg; System.out.println("Server channelRead:" + request); String response = "服務(wù)器響應(yīng):" + msg + "$"; ctx.writeAndFlush(Unpooled.copiedBuffer(response.getBytes())); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable t) throws Exception { System.out.println("exceptionCaught"); ctx.close(); } }
客戶(hù)端:
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { // ByteBuf buf = Unpooled.copiedBuffer("$".getBytes()); sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buf)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("數(shù)據(jù)A$".getBytes())); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("數(shù)據(jù)B$".getBytes())); //等待客戶(hù)端端口關(guān)閉 cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
客戶(hù)端處理器:
public class ClientHandler extends ChannelHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("client channel active... "); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { String response = (String)msg; System.out.println("Client: " + response); } finally { ReferenceCountUtil.release(msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { System.out.println("channelReadComplete"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("exceptionCaught"); ctx.close(); } }設(shè)置長(zhǎng)度大小解決TCP拆包黏包問(wèn)題
服務(wù)端:
public class Server { public static void main(String[] args) throws Exception{ //1 創(chuàng)建2個(gè)線(xiàn)程,一個(gè)是負(fù)責(zé)接收客戶(hù)端的連接。一個(gè)是負(fù)責(zé)進(jìn)行數(shù)據(jù)傳輸?shù)? EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); //2 創(chuàng)建服務(wù)器輔助類(lèi) ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32*1024) .option(ChannelOption.SO_RCVBUF, 32*1024) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { //設(shè)置定長(zhǎng)字符串接收 sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); //設(shè)置字符串形式的解碼 sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ServerHandler()); } }); //4 綁定連接 ChannelFuture cf = b.bind(8765).sync(); //等待服務(wù)器監(jiān)聽(tīng)端口關(guān)閉 cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
客戶(hù)端:
public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(new FixedLengthFrameDecoder(5)); sc.pipeline().addLast(new StringDecoder()); sc.pipeline().addLast(new ClientHandler()); } }); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().writeAndFlush(Unpooled.wrappedBuffer("aaaaabbbbb".getBytes())); cf.channel().writeAndFlush(Unpooled.copiedBuffer("ccccccc".getBytes())); //等待客戶(hù)端端口關(guān)閉 cf.channel().closeFuture().sync(); group.shutdownGracefully(); } }
服務(wù)端與客戶(hù)端的處理器參照上例以字符串分割的.
新手上路,多多關(guān)注...
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/68178.html
摘要:提供異步的事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開(kāi)發(fā)高性能高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶(hù)端程序。總結(jié)我們完成了服務(wù)端的簡(jiǎn)單搭建,模擬了聊天會(huì)話(huà)場(chǎng)景。 之前一直在搞前端的東西,都快忘了自己是個(gè)java開(kāi)發(fā)。其實(shí)還有好多java方面的東西沒(méi)搞過(guò),突然了解到netty,覺(jué)得有必要學(xué)一學(xué)。 介紹 Netty是由JBOSS提供的一個(gè)java開(kāi)源框架。Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框...
時(shí)間:2018年04月11日星期三 說(shuō)明:本文部分內(nèi)容均來(lái)自慕課網(wǎng)。@慕課網(wǎng):https://www.imooc.com 教學(xué)源碼:https://github.com/zccodere/s... 學(xué)習(xí)源碼:https://github.com/zccodere/s... 第一章:課程介紹 1-1 課程介紹 什么是Netty 高性能、事件驅(qū)動(dòng)、異步非阻塞的IO Java開(kāi)源框架 基于NIO的客戶(hù)...
摘要:目錄此文章屬于源碼之下無(wú)秘密做最好的源碼分析教程系列文章之一代碼下載首先到的倉(cāng)庫(kù)中點(diǎn)擊右邊綠色的按鈕拷貝地址然后在終端中輸入如下命令克隆工程工程源碼較大加上國(guó)內(nèi)網(wǎng)絡(luò)問(wèn)題下載源碼可能會(huì)比較耗時(shí)當(dāng)有如下輸出時(shí)表示克隆成功了如果有朋友實(shí)在下載太 目錄 此文章屬于 源碼之下無(wú)秘密 ── 做最好的 Netty 源碼分析教程 系列文章之一. 代碼下載 首先到 Netty 的 Github 倉(cāng)庫(kù) 中...
摘要:背景在工作中雖然我經(jīng)常使用到庫(kù)但是很多時(shí)候?qū)Φ囊恍└拍钸€是處于知其然不知其所以然的狀態(tài)因此就萌生了學(xué)習(xí)源碼的想法剛開(kāi)始看源碼的時(shí)候自然是比較痛苦的主要原因有兩個(gè)第一網(wǎng)上沒(méi)有找到讓我滿(mǎn)意的詳盡的源碼分析的教程第二我也是第一次系統(tǒng)地學(xué)習(xí)這么大代 背景 在工作中, 雖然我經(jīng)常使用到 Netty 庫(kù), 但是很多時(shí)候?qū)?Netty 的一些概念還是處于知其然, 不知其所以然的狀態(tài), 因此就萌生了學(xué)...
摘要:結(jié)構(gòu)作為服務(wù)端作為序列化數(shù)據(jù)的協(xié)議前端通訊演示地址服務(wù)端實(shí)現(xiàn)啟動(dòng)類(lèi)長(zhǎng)連接示例主線(xiàn)程組從線(xiàn)程組請(qǐng)求的解碼和編碼把多個(gè)消息轉(zhuǎn)換為一個(gè)單一的或是,原因是解碼器會(huì)在每個(gè)消息中生成多個(gè)消息對(duì)象主要用于處理大數(shù)據(jù)流,比如一個(gè)大小的文件如果你直接傳輸肯定 結(jié)構(gòu) netty 作為服務(wù)端 protobuf 作為序列化數(shù)據(jù)的協(xié)議 websocket 前端通訊 演示 GitHub 地址 showImg(...
閱讀 1993·2021-11-24 10:45
閱讀 1849·2021-10-09 09:43
閱讀 1291·2021-09-22 15:38
閱讀 1219·2021-08-18 10:19
閱讀 2837·2019-08-30 15:55
閱讀 3056·2019-08-30 12:45
閱讀 2960·2019-08-30 11:25
閱讀 356·2019-08-29 11:30