摘要:使用實現心跳機制代碼環境和具體思路如下使用提供的來檢測讀寫操作的空閑時間使用序列化客戶端空閑后向服務端發送一個心跳包服務端空閑后心跳丟失計數器丟失的心跳包數量當丟失的心跳包數量超過個時,主動斷開該客戶端的斷開連接后,客戶端之后重新連接代碼已
使用Netty實現心跳機制 代碼環境:JDK1.8和Netty4.x 具體思路如下:
使用Netty提供的IdleStateHandler來檢測讀寫操作的空閑時間
使用Protocol Buffer序列化
客戶端write空閑5s后向服務端發送一個心跳包
服務端read空閑6s后心跳丟失計數器+1(丟失的心跳包數量)
當丟失的心跳包數量超過3個時,主動斷開該客戶端的channel
斷開連接后,客戶端10s之后重新連接
代碼已上傳至GitHub:完整代碼地址 代碼實現: 數據包結構(proto文件)option java_outer_classname = "PacketProto"; message Packet { // 包的類型 enum PacketType { // 心跳包 HEARTBEAT = 1; // 非心跳包 DATA = 2; } // 包類型 required PacketType packetType = 1; // 數據部分(可選,心跳包不包含數據部分) optional string data = 2; }ClientHeartbeatHandler類
public class ClientHeartbeatHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Server is active ---"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Server is inactive ---"); // 10s 之后嘗試重新連接服務器 System.out.println("10s 之后嘗試重新連接服務器..."); Thread.sleep(10 * 1000); Client.doConnect(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // 不管是讀事件空閑還是寫事件空閑都向服務器發送心跳包 sendHeartbeatPacket(ctx); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("連接出現異常"); } /** * 發送心跳包 * * @param ctx */ private void sendHeartbeatPacket(ChannelHandlerContext ctx) { Packet.Builder builder = newBuilder(); builder.setPacketType(Packet.PacketType.HEARTBEAT); Packet packet = builder.build(); ctx.writeAndFlush(packet); } }Client類
public class Client { private static Channel ch; private static Bootstrap bootstrap; public static void main(String[] args) { NioEventLoopGroup workGroup = new NioEventLoopGroup(); try { bootstrap = new Bootstrap(); bootstrap .group(workGroup) .channel(NioSocketChannel.class) .handler(new ChannelInitializerServerHeartbeatHandler類() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32LengthFieldPrepender()); pipeline.addLast(new ProtobufEncoder()); pipeline.addLast(new IdleStateHandler(0, 5, 0)); pipeline.addLast(new ClientHeartbeatHandler()); } }); // 連接服務器 doConnect(); // 模擬不定時發送向服務器發送數據的過程 Random random = new Random(); while (true) { int num = random.nextInt(21); Thread.sleep(num * 1000); PacketProto.Packet.Builder builder = newBuilder(); builder.setPacketType(PacketProto.Packet.PacketType.DATA); builder.setData("我是數據包(非心跳包) " + num); PacketProto.Packet packet = builder.build(); ch.writeAndFlush(packet); } } catch (InterruptedException e) { e.printStackTrace(); } finally { workGroup.shutdownGracefully(); } } /** * 抽取出該方法 (斷線重連時使用) * * @throws InterruptedException */ public static void doConnect() throws InterruptedException { ch = bootstrap.connect("127.0.0.1", 20000).sync().channel(); } }
public class ServerHeartbeatHandler extends ChannelInboundHandlerAdapter { // 心跳丟失計數器 private int counter; @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Client is active ---"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("--- Client is inactive ---"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 判斷接收到的包類型 if (msg instanceof Packet) { Packet packet = (Packet) msg; switch (packet.getPacketType()) { case HEARTBEAT: handleHeartbreat(ctx, packet); break; case DATA: handleData(ctx, packet); break; default: break; } } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { // 空閑6s之后觸發 (心跳包丟失) if (counter >= 3) { // 連續丟失3個心跳包 (斷開連接) ctx.channel().close().sync(); System.out.println("已與Client斷開連接"); } else { counter++; System.out.println("丟失了第 " + counter + " 個心跳包"); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("連接出現異常"); } /** * 處理心跳包 * * @param ctx * @param packet */ private void handleHeartbreat(ChannelHandlerContext ctx, Packet packet) { // 將心跳丟失計數器置為0 counter = 0; System.out.println("收到心跳包"); ReferenceCountUtil.release(packet); } /** * 處理數據包 * * @param ctx * @param packet */ private void handleData(ChannelHandlerContext ctx, Packet packet) { // 將心跳丟失計數器置為0 counter = 0; String data = packet.getData(); System.out.println(data); ReferenceCountUtil.release(packet); } }Server類
public class Server { public static void main(String[] args) { NioEventLoopGroup acceptorGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .group(acceptorGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new ProtobufVarint32FrameDecoder()); pipeline.addLast(new ProtobufDecoder(PacketProto.Packet.getDefaultInstance())); pipeline.addLast(new IdleStateHandler(6, 0, 0)); pipeline.addLast(new ServerHeartbeatHandler()); } }); Channel ch = bootstrap.bind(20000).sync().channel(); System.out.println("Server has started..."); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { acceptorGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/65289.html
摘要:基礎何為心跳顧名思義所謂心跳即在長連接中客戶端和服務器之間定期發送的一種特殊的數據包通知對方自己還在線以確保連接的有效性為什么需要心跳因為網絡的不可靠性有可能在保持長連接的過程中由于某些突發情況例如網線被拔出突然掉電等會造成服務器和客戶端的 基礎 何為心跳 顧名思義, 所謂 心跳, 即在 TCP 長連接中, 客戶端和服務器之間定期發送的一種特殊的數據包, 通知對方自己還在線, 以確保 ...
摘要:比如面向連接的功能包發送接收數量包發送接收速率錯誤計數連接重連次數調用延遲連接狀態等。你要處理的,就是心跳超時的邏輯,比如延遲重連。發生異常后,可以根據不同的類型選擇斷線重連比如一些二進制協議的編解碼紊亂問題,或者調度到其他節點。 在java界,netty無疑是開發網絡應用的拿手菜。你不需要太多關注復雜的nio模型和底層網絡的細節,使用其豐富的接口,可以很容易的實現復雜的通訊功能。 和...
摘要:比如面向連接的功能包發送接收數量包發送接收速率錯誤計數連接重連次數調用延遲連接狀態等。你要處理的,就是心跳超時的邏輯,比如延遲重連。發生異常后,可以根據不同的類型選擇斷線重連比如一些二進制協議的編解碼紊亂問題,或者調度到其他節點。 在java界,netty無疑是開發網絡應用的拿手菜。你不需要太多關注復雜的nio模型和底層網絡的細節,使用其豐富的接口,可以很容易的實現復雜的通訊功能。 和...
摘要:超過后則認為服務端出現故障,需要重連。同時在每次心跳時候都用當前時間和之前服務端響應綁定到上的時間相減判斷是否需要重連即可。客戶端檢測到某個服務端遲遲沒有響應心跳也能重連獲取一個新的連接。 showImg(https://segmentfault.com/img/remote/1460000017987884?w=800&h=536); 前言 說道心跳這個詞大家都不陌生,當然不是指男女...
閱讀 3920·2021-11-24 09:38
閱讀 3096·2021-11-17 09:33
閱讀 3871·2021-11-10 11:48
閱讀 1241·2021-10-14 09:48
閱讀 3130·2019-08-30 13:14
閱讀 2551·2019-08-29 18:37
閱讀 3393·2019-08-29 12:38
閱讀 1418·2019-08-29 12:30