国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

簡易RPC框架:基于 netty 的協(xié)議編解碼

Loong_T / 3462人閱讀

摘要:概述在簡易框架需求與設計這篇文章中已經(jīng)給出了協(xié)議的具體細節(jié),協(xié)議類型為二進制協(xié)議,如下協(xié)議的解碼我們稱為,編碼我們成為,下文我們將直接使用和術語。直接貼代碼,參考前文提到的協(xié)議格式閱讀以下代碼協(xié)議編碼器

概述

在《簡易RPC框架:需求與設計》這篇文章中已經(jīng)給出了協(xié)議的具體細節(jié),協(xié)議類型為二進制協(xié)議,如下:

 ------------------------------------------------------------------------
 | magic (2bytes) | version (1byte) |  type (1byte)  | reserved (7bits) | 
 ------------------------------------------------------------------------
 | status (1byte) |    id (8bytes)    |        body length (4bytes)     |
 ------------------------------------------------------------------------
 |                                                                      |
 |                   body ($body_length bytes)                          |
 |                                                                      |
 ------------------------------------------------------------------------

協(xié)議的解碼我們稱為 decode,編碼我們成為 encode,下文我們將直接使用 decode 和 encode 術語。

decode 的本質(zhì)就是講接收到的一串二進制報文,轉化為具體的消息對象,在 Java 中,就是將這串二進制報文所包含的信息,用某種類型的對象存儲起來。

encode 則是將存儲了信息的對象,轉化為具有相同含義的一串二進制報文,然后網(wǎng)絡收發(fā)模塊再將報文發(fā)出去。

無論是 rpc 客戶端還是服務端,都需要有一個 decode 和 encode 的邏輯。

消息類型

rpc 客戶端與服務端之間的通信,需要通過發(fā)送不同類型的消息來實現(xiàn),例如:client 向 server 端發(fā)送的消息,可能是請求消息,可能是心跳消息,可能是認證消息,而 server 向 client 發(fā)送的消息,一般就是響應消息。

利用 Java 中的枚舉類型,可以將消息類型進行如下定義:

/**
 * 消息類型
 *
 * @author beanlam
 * @version 1.0  
 */ 

public enum MessageType {

    REQUEST((byte) 0x01), HEARTBEAT((byte) 0x02), CHECKIN((byte) 0x03), RESPONSE(
            (byte) 0x04), UNKNOWN((byte) 0xFF);

    private byte code;

    MessageType(byte code) {
        this.code = code;
    }

    public static MessageType valueOf(byte code) {
        for (MessageType instance : values()) {
            if (instance.code == code) {
                return instance;
            }
        }
        return UNKNOWN;
    }

    public byte getCode() {
        return code;
    }
}

在這個類中設計了 valueOf 方法,方便進行具體的 byte 字節(jié)與具體的消息枚舉類型之間的映射和轉換。

調(diào)用狀態(tài)設計

client 主動發(fā)起的一次 rpc 調(diào)用,要么成功,要么失敗,server 端有責任告知 client 此次調(diào)用的結果,client 也有責任去感知調(diào)用失敗的原因,因為不一定是 server 端造成的失敗,可能是因為 client 端在對消息進行預處理的時候,例如序列化,就已經(jīng)出錯了,這種錯誤也應該作為一次調(diào)用的調(diào)用結果返回給 client 調(diào)用者。因此引入一個調(diào)用狀態(tài),與消息類型一樣,它也借助了 Java 語言里的枚舉類型來實現(xiàn),并實現(xiàn)了方便的 valueOf 方法:

/**

 * 調(diào)用狀態(tài)

 *

 * @author beanlam

 * @version 1.0

 */

public enum InvocationStatus {

    OK((byte) 0x01), CLIENT_TIMEOUT((byte) 0x02), SERVER_TIMEOUT(

            (byte) 0x03), BAD_REQUEST((byte) 0x04), BAD_RESPONSE(

            (byte) 0x05), SERVICE_NOT_FOUND((byte) 0x06), SERVER_SERIALIZATION_ERROR(

            (byte) 0x07), CLIENT_SERIALIZATION_ERROR((byte) 0x08), CLIENT_CANCELED(

            (byte) 0x09), SERVER_BUSY((byte) 0x0A), CLIENT_BUSY(

            (byte) 0x0B), SERIALIZATION_ERROR((byte) 0x0C), INTERNAL_ERROR(

            (byte) 0x0D), SERVER_METHOD_INVOKE_ERROR((byte) 0x0E), UNKNOWN((byte) 0xFF);

    private byte code;

    InvocationStatus(byte code) {

        this.code = code;

    }

    public static InvocationStatus valueOf(byte code) {

        for (InvocationStatus instance : values()) {

            if (code == instance.code) {

                return instance;

            }

        }

        return UNKNOWN;

    }

    public byte getCode() {

        return code;

    }

}
消息實體設計

我們將 client 往 server 端發(fā)送的統(tǒng)一稱為 rpc 請求消息,一個請求對應著一個響應,因此在 client 和 server 端間流動的信息大體上其實就只有兩種,即要么是請求,要么是響應。我們將會定義兩個類,分別是 RpcRequest 和 RpcResponse 來代表請求消息和響應消息。

另外由于無論是請求消息還是響應消息,它們都有一些共同的屬性,例如說“調(diào)用上下文ID”,或者消息類型。因此會再定義一個 RpcMessage 類,作為父類。

RpcMessage
/**

 * rpc消息

 *

 * @author beanlam

 * @version 1.0

 */

public class RpcMessage {

    private MessageType type;

    private long contextId;

    private Object data;

    public long getContextId() {

        return this.contextId;

    }

    public void setContextId(long id) {

        this.contextId = id;

    }

    public Object getData() {

        return this.data;

    }

    public void setData(Object data) {

        this.data = data;

    }

    public void setType(byte code) {

        this.type = MessageType.valueOf(code);

    }

    public MessageType getType() {

        return this.type;

    }

    public void setType(MessageType type) {

        this.type = type;

    }

    @Override

    public String toString() {

        return "[messageType=" + type.name() + ", contextId=" + contextId + ", data="

                + data + "]";

    }

}
RpcRequest
import java.util.concurrent.atomic.AtomicLong;

/**

 * rpc請求消息

 *

 * @author beanlam

 * @version 1.0

 */

public class RpcRequest extends RpcMessage {

    private static final AtomicLong ID_GENERATOR = new AtomicLong(0);

    public RpcRequest() {

        this(ID_GENERATOR.incrementAndGet());

    }

    public RpcRequest(long contextId) {

        setContextId(contextId);

        setType(MessageType.REQUEST);

    }

}
RpcResponse
/**

 *

 * rpc響應消息

 *

 * @author beanlam

 * @version 1.0

 */

public class RpcResponse extends RpcMessage {

    private InvocationStatus status = InvocationStatus.OK;

    public RpcResponse(long contextId) {

        setContextId(contextId);

        setType(MessageType.RESPONSE);

    }

    public InvocationStatus getStatus() {

        return this.status;

    }

    public void setStatus(InvocationStatus status) {

        this.status = status;

    }

    @Override

    public String toString() {

        return "RpcResponse[contextId=" + getContextId() + ", status=" + status.name() + "]";

    }

}
netty 編解碼介紹

netty 是一個 NIO 框架,應該這么說,netty 是一個有良好設計思想的 NIO 框架。一個 NIO 框架必備的要素就是 reactor 線程模型,目前有一些比較優(yōu)秀而且開源的小型 NIO 框架,例如分庫分表中間件 mycat 實現(xiàn)的一個簡易 NIO 框架,可以在這里看到。

netty 的主要特點有:微內(nèi)核設計、責任鏈模式的業(yè)務邏輯處理、內(nèi)存和資源泄露的檢測等。其中編解碼在 netty 中,都被設計成責任鏈上的一個一個 Handler。

decode 對于 netty 來說,它提供了 ByteToMessageDecoder,它也提供了 MessageToByteEncoder。

借助 netty 來實現(xiàn)協(xié)議編解碼,實際上就是去在這兩個handler里面實現(xiàn)編解碼的邏輯。

decode

在實現(xiàn) decode 邏輯時需要注意的一個問題是,由于二進制報文是在網(wǎng)絡上發(fā)送的,因此一個完整的報文可能經(jīng)過多個分組來發(fā)送的,什么意思呢,就是當有報文進來后,要確認報文是否完整,decode邏輯代碼不能假設收到的報文就是一個完整報文,一般稱這為“TCP半包問題”。同樣,報文是連著報文發(fā)送的,意味著decode代碼邏輯還要負責在一長串二進制序列中,分割出一個一個獨立的報文,這稱之為“TCP粘包問題”。

netty 本身有提供一些方便的 decoder handler 來處理 TCP 半包和粘包的問題。不過一般情況下我們不會直接去用它,因為我們的協(xié)議比較簡單,自己在代碼里處理一下就可以了。

完整的 decode 代碼邏輯如下所示:

import cn.com.agree.ats.rpc.message.*;

import cn.com.agree.ats.util.logfacade.AbstractPuppetLoggerFactory;

import cn.com.agree.ats.util.logfacade.IPuppetLogger;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.ByteToMessageDecoder;

import java.util.List;

/**

 * 協(xié)議解碼器

 *

 * @author beanlam

 * @version 1.0

 */

public class ProtocolDecoder extends ByteToMessageDecoder {

    private static final IPuppetLogger logger = AbstractPuppetLoggerFactory

            .getInstance(ProtocolDecoder.class);

    private boolean magicChecked = false;

    @Override

    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List list)

            throws Exception {

        if (!magicChecked) {

            if (in.readableBytes() < ProtocolMetaData.MAGIC_LENGTH_IN_BYTES) {

                return;

            }

            magicChecked = true;

            if (!(in.getShort(in.readerIndex()) == ProtocolMetaData.MAGIC)) {

                logger.warn(

                        "illegal data received without correct magic number, channel will be close");

                ctx.close();

                magicChecked = false; //this line of code makes no any sense, but it"s good for a warning

                return;

            }

        }

        if (in.readableBytes() < ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {

            return;

        }

        int bodyLength = in

                .getInt(in.readerIndex() + ProtocolMetaData.BODY_LENGTH_OFFSET);

        if (in.readableBytes() < bodyLength + ProtocolMetaData.HEADER_LENGTH_IN_BYTES) {

            return;

        }

        magicChecked = false;// so far the whole packet was received

        in.readShort(); // skip the magic

        in.readByte(); // dont care about the protocol version so far

        byte type = in.readByte();

        byte status = in.readByte();

        long contextId = in.readLong();

        byte[] body = new byte[in.readInt()];

        in.readBytes(body);

        RpcMessage message = null;

        MessageType messageType = MessageType.valueOf(type);

        if (messageType == MessageType.RESPONSE) {

            message = new RpcResponse(contextId);

            ((RpcResponse) message).setStatus(InvocationStatus.valueOf(status));

        } else {

            message = new RpcRequest(contextId);

        }

        message.setType(messageType);

        message.setData(body);

        list.add(message);

    }

}

可以看到,我們解決半包問題的時候,是判斷有沒有收到我們期望收到的報文,如果沒有,直接在 decode 方法里面 return,等有更多的報文被收到的時候,netty 會自動幫我們調(diào)起 decode 方法。而我們解決粘包問題的思路也很清晰,那就是一次只處理一個報文,不去動后面的報文內(nèi)容。

還需要注意的是,在 netty 中,對于 ByteBuf 的 get 是不會消費掉報文的,而 read 是會消費掉報文的。當不確定報文是否收完整的時候,我們都是用 get開頭的方法去試探性地驗證報文是否接收完全,當確定報文接收完全后,我們才用 read 開頭的方法去消費這段報文。

encode

直接貼代碼,參考前文提到的協(xié)議格式閱讀以下代碼:

/**

 *

 * 協(xié)議編碼器

 *

 * @author beanlam

 * @version 1.0

 */

public class ProtocolEncoder extends MessageToByteEncoder {

    @Override

    protected void encode(ChannelHandlerContext ctx, RpcMessage rpcMessage, ByteBuf out)

            throws Exception {

        byte status;

        byte[] data = (byte[]) rpcMessage.getData();

        if (rpcMessage instanceof RpcRequest) {

            RpcRequest request = (RpcRequest) rpcMessage;

            status = InvocationStatus.OK.getCode();

        } else {

            RpcResponse response = (RpcResponse) rpcMessage;

            status = response.getStatus().getCode();

        }

        out.writeShort(ProtocolMetaData.MAGIC);

        out.writeByte(ProtocolMetaData.VERSION);

        out.writeByte(rpcMessage.getType().getCode());

        out.writeByte(status);

        out.writeLong(rpcMessage.getContextId());

        out.writeInt(data.length);

        out.writeBytes(data);

    }

}

文章版權歸作者所有,未經(jīng)允許請勿轉載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72065.html

相關文章

  • 簡易RPC框架:序列化機制

    摘要:由于我們還未談到具體的調(diào)用機制,因此暫且認為就是把一個包含了調(diào)用信息的對象,從經(jīng)過序列化,變成一串二進制流,發(fā)送到了端。 概述 在上一篇文章《簡易RPC框架:基于 netty 的協(xié)議編解碼》中談到對于協(xié)議的 decode 和 encode,在談 decode 之前,必須先要知道 encode 的過程是什么,它把什么東西轉化成了二進制協(xié)議。由于我們還未談到具體的 RPC 調(diào)用機制,因此暫...

    walterrwu 評論0 收藏0
  • 手把手教你基于Netty實現(xiàn)一個基礎RPC框架(通俗易懂)

    摘要:是一個分布式服務框架,以及治理方案。手寫注意要點手寫注意要點基于上文中對于協(xié)議的理解,如果我們自己去實現(xiàn),需要考慮哪些技術呢其實基于圖的整個流程應該有一個大概的理解。基于手寫實現(xiàn)基于手寫實現(xiàn)理解了協(xié)議后,我們基于來實現(xiàn)一個通信框架。閱讀這篇文章之前,建議先閱讀和這篇文章關聯(lián)的內(nèi)容。[1]詳細剖析分布式微服務架構下網(wǎng)絡通信的底層實現(xiàn)原理(圖解)[2][年薪60W的技巧]工作了5年,你真的理解N...

    番茄西紅柿 評論0 收藏2637
  • Netty(三) 什么是 TCP 拆、粘包?如何解決?

    摘要:是一個面向字節(jié)流的協(xié)議,它是性質(zhì)是流式的,所以它并沒有分段。可基于分隔符解決。編解碼的主要目的就是為了可以編碼成字節(jié)流用于在網(wǎng)絡中傳輸持久化存儲。 showImg(https://segmentfault.com/img/remote/1460000015895049); 前言 記得前段時間我們生產(chǎn)上的一個網(wǎng)關出現(xiàn)了故障。 這個網(wǎng)關邏輯非常簡單,就是接收客戶端的請求然后解析報文最后發(fā)送...

    YanceyOfficial 評論0 收藏0
  • RPC框架原理及從零實現(xiàn)系列博客(一):思路篇

    摘要:等之所以支持跨語言,是因為他們自己定義了一套結構化數(shù)據(jù)存儲格式,如的,用于編解碼對象,作為各個語言通信的中間協(xié)議。 前段時間覺得自己一直用別人的框架,站在巨人的肩膀上,也該自己造造輪子了 一時興起 就著手寫起了RPC框架 這里寫了系列博客拿給大家分享下 這篇是開篇的思路篇 項目最終的代碼放在了我的github上https://github.com/wephone/Me... 歡迎sta...

    tracy 評論0 收藏0
  • Netty3文檔翻譯(二)

    摘要:豐富的緩存數(shù)據(jù)結構使用它自己的緩存來表示字節(jié)序列而不是的。針對有一個定義良好的事件模型。有一些協(xié)議是多層的建立在其他低級協(xié)議基礎上。此外,甚至不是完全線程安全的。協(xié)議由標準化為。協(xié)議緩存整合是一個高效二進制協(xié)議的快速實現(xiàn)。 Chapter 2、結構概覽 這一節(jié)我們將確認Netty提供的核心功能是什么,以及它們怎么構成一個完整的網(wǎng)絡應用開發(fā)堆棧。 1、豐富的緩存數(shù)據(jù)結構 Netty使用它...

    Zhuxy 評論0 收藏0

發(fā)表評論

0條評論

最新活動
閱讀需要支付1元查看
<