摘要:概述在簡易框架需求與設計這篇文章中已經(jīng)給出了協(xié)議的具體細節(jié),協(xié)議類型為二進制協(xié)議,如下協(xié)議的解碼我們稱為,編碼我們成為,下文我們將直接使用和術語。直接貼代碼,參考前文提到的協(xié)議格式閱讀以下代碼協(xié)議編碼器
概述
在《簡易RPC框架:需求與設計》這篇文章中已經(jīng)給出了協(xié)議的具體細節(jié),協(xié)議類型為二進制協(xié)議,如下:
------------------------------------------------------------------------ | magic (2bytes) | version (1byte) | type (1byte) | reserved (7bits) | ------------------------------------------------------------------------ | status (1byte) | id (8bytes) | body length (4bytes) | ------------------------------------------------------------------------ | | | body ($body_length bytes) | | | ------------------------------------------------------------------------
協(xié)議的解碼我們稱為 decode,編碼我們成為 encode,下文我們將直接使用 decode 和 encode 術語。
decode 的本質(zhì)就是講接收到的一串二進制報文,轉化為具體的消息對象,在 Java 中,就是將這串二進制報文所包含的信息,用某種類型的對象存儲起來。
encode 則是將存儲了信息的對象,轉化為具有相同含義的一串二進制報文,然后網(wǎng)絡收發(fā)模塊再將報文發(fā)出去。
無論是 rpc 客戶端還是服務端,都需要有一個 decode 和 encode 的邏輯。
消息類型rpc 客戶端與服務端之間的通信,需要通過發(fā)送不同類型的消息來實現(xiàn),例如:client 向 server 端發(fā)送的消息,可能是請求消息,可能是心跳消息,可能是認證消息,而 server 向 client 發(fā)送的消息,一般就是響應消息。
利用 Java 中的枚舉類型,可以將消息類型進行如下定義:
/** * 消息類型 * * @author beanlam * @version 1.0 */ public enum MessageType { REQUEST((byte) 0x01), HEARTBEAT((byte) 0x02), CHECKIN((byte) 0x03), RESPONSE( (byte) 0x04), UNKNOWN((byte) 0xFF); private byte code; MessageType(byte code) { this.code = code; } public static MessageType valueOf(byte code) { for (MessageType instance : values()) { if (instance.code == code) { return instance; } } return UNKNOWN; } public byte getCode() { return code; } }
在這個類中設計了 valueOf 方法,方便進行具體的 byte 字節(jié)與具體的消息枚舉類型之間的映射和轉換。
調(diào)用狀態(tài)設計client 主動發(fā)起的一次 rpc 調(diào)用,要么成功,要么失敗,server 端有責任告知 client 此次調(diào)用的結果,client 也有責任去感知調(diào)用失敗的原因,因為不一定是 server 端造成的失敗,可能是因為 client 端在對消息進行預處理的時候,例如序列化,就已經(jīng)出錯了,這種錯誤也應該作為一次調(diào)用的調(diào)用結果返回給 client 調(diào)用者。因此引入一個調(diào)用狀態(tài),與消息類型一樣,它也借助了 Java 語言里的枚舉類型來實現(xiàn),并實現(xiàn)了方便的 valueOf 方法:
/** * 調(diào)用狀態(tài) * * @author beanlam * @version 1.0 */ public enum InvocationStatus { OK((byte) 0x01), CLIENT_TIMEOUT((byte) 0x02), SERVER_TIMEOUT( (byte) 0x03), BAD_REQUEST((byte) 0x04), BAD_RESPONSE( (byte) 0x05), SERVICE_NOT_FOUND((byte) 0x06), SERVER_SERIALIZATION_ERROR( (byte) 0x07), CLIENT_SERIALIZATION_ERROR((byte) 0x08), CLIENT_CANCELED( (byte) 0x09), SERVER_BUSY((byte) 0x0A), CLIENT_BUSY( (byte) 0x0B), SERIALIZATION_ERROR((byte) 0x0C), INTERNAL_ERROR( (byte) 0x0D), SERVER_METHOD_INVOKE_ERROR((byte) 0x0E), UNKNOWN((byte) 0xFF); private byte code; InvocationStatus(byte code) { this.code = code; } public static InvocationStatus valueOf(byte code) { for (InvocationStatus instance : values()) { if (code == instance.code) { return instance; } } return UNKNOWN; } public byte getCode() { return code; } }消息實體設計
我們將 client 往 server 端發(fā)送的統(tǒng)一稱為 rpc 請求消息,一個請求對應著一個響應,因此在 client 和 server 端間流動的信息大體上其實就只有兩種,即要么是請求,要么是響應。我們將會定義兩個類,分別是 RpcRequest 和 RpcResponse 來代表請求消息和響應消息。
另外由于無論是請求消息還是響應消息,它們都有一些共同的屬性,例如說“調(diào)用上下文ID”,或者消息類型。因此會再定義一個 RpcMessage 類,作為父類。
RpcMessage/** * rpc消息 * * @author beanlam * @version 1.0 */ public class RpcMessage { private MessageType type; private long contextId; private Object data; public long getContextId() { return this.contextId; } public void setContextId(long id) { this.contextId = id; } public Object getData() { return this.data; } public void setData(Object data) { this.data = data; } public void setType(byte code) { this.type = MessageType.valueOf(code); } public MessageType getType() { return this.type; } public void setType(MessageType type) { this.type = type; } @Override public String toString() { return "[messageType=" + type.name() + ", contextId=" + contextId + ", data=" + data + "]"; } }RpcRequest
import java.util.concurrent.atomic.AtomicLong; /** * rpc請求消息 * * @author beanlam * @version 1.0 */ public class RpcRequest extends RpcMessage { private static final AtomicLong ID_GENERATOR = new AtomicLong(0); public RpcRequest() { this(ID_GENERATOR.incrementAndGet()); } public RpcRequest(long contextId) { setContextId(contextId); setType(MessageType.REQUEST); } }RpcResponse
/** * * rpc響應消息 * * @author beanlam * @version 1.0 */ public class RpcResponse extends RpcMessage { private InvocationStatus status = InvocationStatus.OK; public RpcResponse(long contextId) { setContextId(contextId); setType(MessageType.RESPONSE); } public InvocationStatus getStatus() { return this.status; } public void setStatus(InvocationStatus status) { this.status = status; } @Override public String toString() { return "RpcResponse[contextId=" + getContextId() + ", status=" + status.name() + "]"; } }netty 編解碼介紹
netty 是一個 NIO 框架,應該這么說,netty 是一個有良好設計思想的 NIO 框架。一個 NIO 框架必備的要素就是 reactor 線程模型,目前有一些比較優(yōu)秀而且開源的小型 NIO 框架,例如分庫分表中間件 mycat 實現(xiàn)的一個簡易 NIO 框架,可以在這里看到。
netty 的主要特點有:微內(nèi)核設計、責任鏈模式的業(yè)務邏輯處理、內(nèi)存和資源泄露的檢測等。其中編解碼在 netty 中,都被設計成責任鏈上的一個一個 Handler。
decode 對于 netty 來說,它提供了 ByteToMessageDecoder,它也提供了 MessageToByteEncoder。
借助 netty 來實現(xiàn)協(xié)議編解碼,實際上就是去在這兩個handler里面實現(xiàn)編解碼的邏輯。
decode在實現(xiàn) decode 邏輯時需要注意的一個問題是,由于二進制報文是在網(wǎng)絡上發(fā)送的,因此一個完整的報文可能經(jīng)過多個分組來發(fā)送的,什么意思呢,就是當有報文進來后,要確認報文是否完整,decode邏輯代碼不能假設收到的報文就是一個完整報文,一般稱這為“TCP半包問題”。同樣,報文是連著報文發(fā)送的,意味著decode代碼邏輯還要負責在一長串二進制序列中,分割出一個一個獨立的報文,這稱之為“TCP粘包問題”。
netty 本身有提供一些方便的 decoder handler 來處理 TCP 半包和粘包的問題。不過一般情況下我們不會直接去用它,因為我們的協(xié)議比較簡單,自己在代碼里處理一下就可以了。
完整的 decode 代碼邏輯如下所示:
import cn.com.agree.ats.rpc.message.*; import cn.com.agree.ats.util.logfacade.AbstractPuppetLoggerFactory; import cn.com.agree.ats.util.logfacade.IPuppetLogger; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 協(xié)議解碼器 * * @author beanlam * @version 1.0 */ public class ProtocolDecoder extends ByteToMessageDecoder { private static final IPuppetLogger logger = AbstractPuppetLoggerFactory .getInstance(ProtocolDecoder.class); private boolean magicChecked = false; @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List
可以看到,我們解決半包問題的時候,是判斷有沒有收到我們期望收到的報文,如果沒有,直接在 decode 方法里面 return,等有更多的報文被收到的時候,netty 會自動幫我們調(diào)起 decode 方法。而我們解決粘包問題的思路也很清晰,那就是一次只處理一個報文,不去動后面的報文內(nèi)容。
還需要注意的是,在 netty 中,對于 ByteBuf 的 get 是不會消費掉報文的,而 read 是會消費掉報文的。當不確定報文是否收完整的時候,我們都是用 get開頭的方法去試探性地驗證報文是否接收完全,當確定報文接收完全后,我們才用 read 開頭的方法去消費這段報文。
encode直接貼代碼,參考前文提到的協(xié)議格式閱讀以下代碼:
/** * * 協(xié)議編碼器 * * @author beanlam * @version 1.0 */ public class ProtocolEncoder extends MessageToByteEncoder{ @Override protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out) throws Exception { byte status; byte[] data = (byte[]) rpcMessage.getData(); if (rpcMessage instanceof RpcRequest) { RpcRequest request = (RpcRequest) rpcMessage; status = InvocationStatus.OK.getCode(); } else { RpcResponse response = (RpcResponse) rpcMessage; status = response.getStatus().getCode(); } out.writeShort(ProtocolMetaData.MAGIC); out.writeByte(ProtocolMetaData.VERSION); out.writeByte(rpcMessage.getType().getCode()); out.writeByte(status); out.writeLong(rpcMessage.getContextId()); out.writeInt(data.length); out.writeBytes(data); } }
文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72065.html
摘要:由于我們還未談到具體的調(diào)用機制,因此暫且認為就是把一個包含了調(diào)用信息的對象,從經(jīng)過序列化,變成一串二進制流,發(fā)送到了端。 概述 在上一篇文章《簡易RPC框架:基于 netty 的協(xié)議編解碼》中談到對于協(xié)議的 decode 和 encode,在談 decode 之前,必須先要知道 encode 的過程是什么,它把什么東西轉化成了二進制協(xié)議。由于我們還未談到具體的 RPC 調(diào)用機制,因此暫...
摘要:是一個分布式服務框架,以及治理方案。手寫注意要點手寫注意要點基于上文中對于協(xié)議的理解,如果我們自己去實現(xiàn),需要考慮哪些技術呢其實基于圖的整個流程應該有一個大概的理解。基于手寫實現(xiàn)基于手寫實現(xiàn)理解了協(xié)議后,我們基于來實現(xiàn)一個通信框架。閱讀這篇文章之前,建議先閱讀和這篇文章關聯(lián)的內(nèi)容。[1]詳細剖析分布式微服務架構下網(wǎng)絡通信的底層實現(xiàn)原理(圖解)[2][年薪60W的技巧]工作了5年,你真的理解N...
摘要:是一個面向字節(jié)流的協(xié)議,它是性質(zhì)是流式的,所以它并沒有分段。可基于分隔符解決。編解碼的主要目的就是為了可以編碼成字節(jié)流用于在網(wǎng)絡中傳輸持久化存儲。 showImg(https://segmentfault.com/img/remote/1460000015895049); 前言 記得前段時間我們生產(chǎn)上的一個網(wǎng)關出現(xiàn)了故障。 這個網(wǎng)關邏輯非常簡單,就是接收客戶端的請求然后解析報文最后發(fā)送...
摘要:等之所以支持跨語言,是因為他們自己定義了一套結構化數(shù)據(jù)存儲格式,如的,用于編解碼對象,作為各個語言通信的中間協(xié)議。 前段時間覺得自己一直用別人的框架,站在巨人的肩膀上,也該自己造造輪子了 一時興起 就著手寫起了RPC框架 這里寫了系列博客拿給大家分享下 這篇是開篇的思路篇 項目最終的代碼放在了我的github上https://github.com/wephone/Me... 歡迎sta...
摘要:豐富的緩存數(shù)據(jù)結構使用它自己的緩存來表示字節(jié)序列而不是的。針對有一個定義良好的事件模型。有一些協(xié)議是多層的建立在其他低級協(xié)議基礎上。此外,甚至不是完全線程安全的。協(xié)議由標準化為。協(xié)議緩存整合是一個高效二進制協(xié)議的快速實現(xiàn)。 Chapter 2、結構概覽 這一節(jié)我們將確認Netty提供的核心功能是什么,以及它們怎么構成一個完整的網(wǎng)絡應用開發(fā)堆棧。 1、豐富的緩存數(shù)據(jù)結構 Netty使用它...
閱讀 1924·2021-11-19 09:40
閱讀 2132·2021-10-09 09:43
閱讀 3294·2021-09-06 15:00
閱讀 2810·2019-08-29 13:04
閱讀 2766·2019-08-26 11:53
閱讀 3512·2019-08-26 11:46
閱讀 2320·2019-08-26 11:38
閱讀 390·2019-08-26 11:27