国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Netty(三) 什么是 TCP 拆、粘包?如何解決?

YanceyOfficial / 2060人閱讀

摘要:是一個(gè)面向字節(jié)流的協(xié)議,它是性質(zhì)是流式的,所以它并沒有分段。可基于分隔符解決。編解碼的主要目的就是為了可以編碼成字節(jié)流用于在網(wǎng)絡(luò)中傳輸持久化存儲(chǔ)。

前言

記得前段時(shí)間我們生產(chǎn)上的一個(gè)網(wǎng)關(guān)出現(xiàn)了故障。

這個(gè)網(wǎng)關(guān)邏輯非常簡單,就是接收客戶端的請(qǐng)求然后解析報(bào)文最后發(fā)送短信。

但這個(gè)請(qǐng)求并不是常見的 HTTP ,而是利用 Netty 自定義的協(xié)議。

有個(gè)前提是:網(wǎng)關(guān)是需要讀取一段完整的報(bào)文才能進(jìn)行后面的邏輯。

問題是有天突然發(fā)現(xiàn)網(wǎng)關(guān)解析報(bào)文出錯(cuò),查看了客戶端的發(fā)送日志也沒發(fā)現(xiàn)問題,最后通過日志發(fā)現(xiàn)收到了許多不完整的報(bào)文,有些還多了。

于是想會(huì)不會(huì)是 TCP 拆、粘包帶來的問題,最后利用 Netty 自帶的拆包工具解決了該問題。

這便有了此文。

TCP 協(xié)議

問題雖然解決了,但還是得想想原因,為啥會(huì)這樣?打破砂鍋問到底才是一個(gè)靠譜的程序員。

這就得從 TCP 這個(gè)協(xié)議說起了。

TCP 是一個(gè)面向字節(jié)流的協(xié)議,它是性質(zhì)是流式的,所以它并沒有分段。就像水流一樣,你沒法知道什么時(shí)候開始,什么時(shí)候結(jié)束。

所以他會(huì)根據(jù)當(dāng)前的套接字緩沖區(qū)的情況進(jìn)行拆包或是粘包。

下圖展示了一個(gè) TCP 協(xié)議傳輸?shù)倪^程:

發(fā)送端的字節(jié)流都會(huì)先傳入緩沖區(qū),再通過網(wǎng)絡(luò)傳入到接收端的緩沖區(qū)中,最終由接收端獲取。

當(dāng)我們發(fā)送兩個(gè)完整包到接收端的時(shí)候:

正常情況會(huì)接收到兩個(gè)完整的報(bào)文。

但也有以下的情況:

接收到的是一個(gè)報(bào)文,它是由發(fā)送的兩個(gè)報(bào)文組成的,這樣對(duì)于應(yīng)用程序來說就很難處理了(這樣稱為粘包)。

還有可能出現(xiàn)上面這樣的雖然收到了兩個(gè)包,但是里面的內(nèi)容卻是互相包含,對(duì)于應(yīng)用來說依然無法解析(拆包)。

對(duì)于這樣的問題只能通過上層的應(yīng)用來解決,常見的方式有:

在報(bào)文末尾增加換行符表明一條完整的消息,這樣在接收端可以根據(jù)這個(gè)換行符來判斷消息是否完整。

將消息分為消息頭、消息體。可以在消息頭中聲明消息的長度,根據(jù)這個(gè)長度來獲取報(bào)文(比如 808 協(xié)議)。

規(guī)定好報(bào)文長度,不足的空位補(bǔ)齊,取的時(shí)候按照長度截取即可。

以上的這些方式我們?cè)?Netty 的 pipline 中里加入對(duì)應(yīng)的解碼器都可以手動(dòng)實(shí)現(xiàn)。

但其實(shí) Netty 已經(jīng)幫我們做好了,完全可以開箱即用。

比如:

LineBasedFrameDecoder 可以基于換行符解決。

DelimiterBasedFrameDecoder 可基于分隔符解決。

FixedLengthFrameDecoder 可指定長度解決。

字符串拆、粘包

下面來模擬一下最簡單的字符串傳輸。

還是在之前的

https://github.com/crossoverJie/netty-action

進(jìn)行演示。

在 Netty 客戶端中加了一個(gè)入口可以循環(huán)發(fā)送 100 條字符串報(bào)文到接收端:

    /**
     * 向服務(wù)端發(fā)消息 字符串
     * @param stringReqVO
     * @return
     */
    @ApiOperation("客戶端發(fā)送消息,字符串")
    @RequestMapping(value = "sendStringMsg", method = RequestMethod.POST)
    @ResponseBody
    public BaseResponse sendStringMsg(@RequestBody StringReqVO stringReqVO){
        BaseResponse res = new BaseResponse();

        for (int i = 0; i < 100; i++) {
            heartbeatClient.sendStringMsg(stringReqVO.getMsg()) ;
        }

        // 利用 actuator 來自增
        counterService.increment(Constants.COUNTER_CLIENT_PUSH_COUNT);

        SendMsgResVO sendMsgResVO = new SendMsgResVO() ;
        sendMsgResVO.setMsg("OK") ;
        res.setCode(StatusEnum.SUCCESS.getCode()) ;
        res.setMessage(StatusEnum.SUCCESS.getMessage()) ;
        return res ;
    }
    
    
    
    /**
     * 發(fā)送消息字符串
     *
     * @param msg
     */
    public void sendStringMsg(String msg) {
        ByteBuf message = Unpooled.buffer(msg.getBytes().length) ;
        message.writeBytes(msg.getBytes()) ;
        ChannelFuture future = channel.writeAndFlush(message);
        future.addListener((ChannelFutureListener) channelFuture ->
                LOGGER.info("客戶端手動(dòng)發(fā)消息成功={}", msg));

    }

服務(wù)端直接打印即可:

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        LOGGER.info("收到msg={}", msg);

    }

順便提一下,這里加的有一個(gè)字符串的解碼器:.addLast(new StringDecoder()) 其實(shí)就是把消息解析為字符串。

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception {
        out.add(msg.toString(charset));
    }

在 Swagger 中調(diào)用了客戶端的接口用于給服務(wù)端發(fā)送了 100 次消息:

正常情況下接收端應(yīng)該打印 100 次 hello 才對(duì),但是查看日志會(huì)發(fā)現(xiàn):

收到的內(nèi)容有完整的、多的、少的、拼接的;這也就對(duì)應(yīng)了上面提到的拆包、粘包。

該怎么解決呢?這便可采用之前提到的 LineBasedFrameDecoder 利用換行符解決。

利用 LineBasedFrameDecoder 解決問題

LineBasedFrameDecoder 解碼器使用非常簡單,只需要在 pipline 鏈條上添加即可。

//字符串解析,換行防拆包
.addLast(new LineBasedFrameDecoder(1024))
.addLast(new StringDecoder())

構(gòu)造函數(shù)中傳入了 1024 是指報(bào)的長度最大不超過這個(gè)值,具體可以看下文的源碼分析。

然后我們?cè)龠M(jìn)行一次測(cè)試看看結(jié)果:

注意,由于 LineBasedFrameDecoder 解碼器是通過換行符來判斷的,所以在發(fā)送時(shí),一條完整的消息需要加上 

最終的結(jié)果:

仔細(xì)觀察日志,發(fā)現(xiàn)確實(shí)沒有一條被拆、粘包。

LineBasedFrameDecoder 的原理

目的達(dá)到了,來看看它的實(shí)現(xiàn)原理:

第一步主要就是 findEndOfLine 方法去找到當(dāng)前報(bào)文中是否存在分隔符,存在就會(huì)返回分隔符所在的位置。

判斷是否需要丟棄,默認(rèn)為 false ,第一次走這個(gè)邏輯(下文會(huì)判斷是否需要改為 true)。

如果報(bào)文中存在換行符,就會(huì)將數(shù)據(jù)截取到那個(gè)位置。

如果不存在換行符(有可能是拆包、粘包),就看當(dāng)前報(bào)文的長度是否大于預(yù)設(shè)的長度。大于則需要緩存這個(gè)報(bào)文長度,并將 discarding 設(shè)為 true。

如果是需要丟棄時(shí),判斷是否找到了換行符,存在則需要丟棄掉之前記錄的長度然后截取數(shù)據(jù)。

如果沒有找到換行符,則將之前緩存的報(bào)文長度進(jìn)行累加,用于下次拋棄。

從這個(gè)邏輯中可以看出就是尋找報(bào)文中是否包含換行符,并進(jìn)行相應(yīng)的截取。

由于是通過緩沖區(qū)讀取的,所以即使這次沒有換行符的數(shù)據(jù),只要下一次的報(bào)文存在換行符,上一輪的數(shù)據(jù)也不會(huì)丟。

高效的編碼方式 Google Protocol

上面提到的其實(shí)就是在解碼中進(jìn)行操作,我們也可以自定義自己的拆、粘包工具。

編解碼的主要目的就是為了可以編碼成字節(jié)流用于在網(wǎng)絡(luò)中傳輸、持久化存儲(chǔ)。

Java 中也可以實(shí)現(xiàn) Serializable 接口來實(shí)現(xiàn)序列化,但由于它性能等原因在一些 RPC 調(diào)用中用的很少。

Google Protocol 則是一個(gè)高效的序列化框架,下面來演示在 Netty 中如何使用。

安裝

首先第一步自然是安裝:

在官網(wǎng)下載對(duì)應(yīng)的包。

本地配置環(huán)境變量:

當(dāng)執(zhí)行 protoc --version 出現(xiàn)以下結(jié)果表明安裝成功:

定義自己的協(xié)議格式

接著是需要按照官方要求的語法定義自己的協(xié)議格式。

比如我這里需要定義一個(gè)輸入輸出的報(bào)文格式:

BaseRequestProto.proto:

syntax = "proto2";

package protocol;

option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseRequestProto";

message RequestProtocol {
  required int32 requestId = 2;
  required string reqMsg = 1;
  

}

BaseResponseProto.proto:

syntax = "proto2";

package protocol;

option java_package = "com.crossoverjie.netty.action.protocol";
option java_outer_classname = "BaseResponseProto";

message ResponseProtocol {
  required int32 responseId = 2;
  required string resMsg = 1;
  

}

再通過

protoc --java_out=/dev BaseRequestProto.proto BaseResponseProto.proto

protoc 命令將剛才定義的協(xié)議格式轉(zhuǎn)換為 Java 代碼,并生成在 /dev 目錄。

只需要將生成的代碼拷貝到我們的項(xiàng)目中,同時(shí)引入依賴:


    com.google.protobuf
    protobuf-java
    3.4.0

利用 Protocol 的編解碼也非常簡單:

public class ProtocolUtil {

    public static void main(String[] args) throws InvalidProtocolBufferException {
        BaseRequestProto.RequestProtocol protocol = BaseRequestProto.RequestProtocol.newBuilder()
                .setRequestId(123)
                .setReqMsg("你好啊")
                .build();

        byte[] encode = encode(protocol);

        BaseRequestProto.RequestProtocol parseFrom = decode(encode);

        System.out.println(protocol.toString());
        System.out.println(protocol.toString().equals(parseFrom.toString()));
    }

    /**
     * 編碼
     * @param protocol
     * @return
     */
    public static byte[] encode(BaseRequestProto.RequestProtocol protocol){
        return protocol.toByteArray() ;
    }

    /**
     * 解碼
     * @param bytes
     * @return
     * @throws InvalidProtocolBufferException
     */
    public static BaseRequestProto.RequestProtocol decode(byte[] bytes) throws InvalidProtocolBufferException {
        return BaseRequestProto.RequestProtocol.parseFrom(bytes);
    }
}

利用 BaseRequestProto 來做一個(gè)演示,先編碼再解碼最后比較最終的結(jié)果是否相同。答案肯定是一致的。

利用 protoc 命令生成的 Java 文件里已經(jīng)幫我們把編解碼全部都封裝好了,只需要簡單調(diào)用就行了。

可以看出 Protocol 創(chuàng)建對(duì)象使用的是構(gòu)建者模式,對(duì)使用者來說清晰易讀,更多關(guān)于構(gòu)建器的內(nèi)容可以參考這里。

更多關(guān)于 Google Protocol 內(nèi)容請(qǐng)查看官方開發(fā)文檔。

結(jié)合 Netty

Netty 已經(jīng)自帶了對(duì) Google protobuf 的編解碼器,也是只需要在 pipline 中添加即可。

server 端:

// google Protobuf 編解碼
.addLast(new ProtobufDecoder(BaseRequestProto.RequestProtocol.getDefaultInstance()))
.addLast(new ProtobufEncoder())

客戶端:

// google Protobuf 編解碼

.addLast(new ProtobufDecoder(BaseResponseProto.ResponseProtocol.getDefaultInstance()))

.addLast(new ProtobufEncoder())
稍微注意的是,在構(gòu)建 ProtobufDecoder 時(shí)需要顯式指定解碼器需要解碼成什么類型。

我這里服務(wù)端接收的是 BaseRequestProto,客戶端收到的是服務(wù)端響應(yīng)的 BaseResponseProto 所以就設(shè)置了對(duì)應(yīng)的實(shí)例。

同樣的提供了一個(gè)接口向服務(wù)端發(fā)送消息,當(dāng)服務(wù)端收到了一個(gè)特殊指令時(shí)也會(huì)向客戶端返回內(nèi)容:

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, BaseRequestProto.RequestProtocol msg) throws Exception {
        LOGGER.info("收到msg={}", msg.getReqMsg());

        if (999 == msg.getRequestId()){
            BaseResponseProto.ResponseProtocol responseProtocol = BaseResponseProto.ResponseProtocol.newBuilder()
                    .setResponseId(1000)
                    .setResMsg("服務(wù)端響應(yīng)")
                    .build();
            ctx.writeAndFlush(responseProtocol) ;
        }

    }

在 swagger 中調(diào)用相關(guān)接口:

在日志可以看到服務(wù)端收到了消息,同時(shí)客戶端也收到了返回:

雖說 Netty 封裝了 Google Protobuf 相關(guān)的編解碼工具,其實(shí)查看它的編碼工具就會(huì)發(fā)現(xiàn)也是利用上文提到的 api 實(shí)現(xiàn)的。

Protocol 拆、粘包

Google Protocol 的使用確實(shí)非常簡單,但還是有值的注意的地方,比如它依然會(huì)有拆、粘包問題。

不妨模擬一下:

連續(xù)發(fā)送 100 次消息看服務(wù)端收到的怎么樣:

會(huì)發(fā)現(xiàn)服務(wù)端在解碼的時(shí)候報(bào)錯(cuò),其實(shí)就是被拆、粘包了。

這點(diǎn) Netty 自然也考慮到了,所以已經(jīng)提供了相關(guān)的工具。

//拆包解碼
.addLast(new ProtobufVarint32FrameDecoder())
.addLast(new ProtobufVarint32LengthFieldPrepender())

只需要在服務(wù)端和客戶端加上這兩個(gè)編解碼工具即可,再來發(fā)送一百次試試。

查看日志發(fā)現(xiàn)沒有出現(xiàn)一次異常,100 條信息全部都接收到了。

這個(gè)編解碼工具可以簡單理解為是在消息體中加了一個(gè) 32 位長度的整形字段,用于表明當(dāng)前消息長度。

總結(jié)

網(wǎng)絡(luò)這塊同樣是計(jì)算機(jī)的基礎(chǔ),由于近期在做相關(guān)的工作所以接觸的比較多,也算是給大學(xué)補(bǔ)課了。

后面會(huì)接著更新 Netty 相關(guān)的內(nèi)容,最后會(huì)產(chǎn)出一個(gè)高性能的 HTTP 以及 RPC 框架,敬請(qǐng)期待。

上文相關(guān)的代碼:

https://github.com/crossoverJie/netty-action

號(hào)外

最近在總結(jié)一些 Java 相關(guān)的知識(shí)點(diǎn),感興趣的朋友可以一起維護(hù)。

地址: https://github.com/crossoverJie/Java-Interview

歡迎關(guān)注公眾號(hào)一起交流:

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/76612.html

相關(guān)文章

  • 徹底理解Netty,這一篇文章就夠了

    摘要:如果什么事都沒得做,它也不會(huì)死循環(huán),它會(huì)將線程休眠起來,直到下一個(gè)事件來了再繼續(xù)干活,這樣的一個(gè)線程稱之為線程。而請(qǐng)求處理邏輯既可以使用單獨(dú)的線程池進(jìn)行處理,也可以跟放在讀寫線程一塊處理。 Netty到底是什么 從HTTP說起 有了Netty,你可以實(shí)現(xiàn)自己的HTTP服務(wù)器,F(xiàn)TP服務(wù)器,UDP服務(wù)器,RPC服務(wù)器,WebSocket服務(wù)器,Redis的Proxy服務(wù)器,MySQL的P...

    yy13818512006 評(píng)論0 收藏0
  • 【Java】幾道讓你拿offer的面試題

    摘要:的方法,的默認(rèn)實(shí)現(xiàn)會(huì)判斷是否是類型注意自動(dòng)拆箱,自動(dòng)裝箱問題。適應(yīng)自旋鎖鎖競爭是下的,會(huì)經(jīng)過用戶態(tài)到內(nèi)核態(tài)的切換,是比較花時(shí)間的。在中引入了自適應(yīng)的自旋鎖,說明自旋的時(shí)間不固定,要不要自旋變得越來越聰明。 前言 只有光頭才能變強(qiáng) 之前在刷博客的時(shí)候,發(fā)現(xiàn)一些寫得比較好的博客都會(huì)默默收藏起來。最近在查閱補(bǔ)漏,有的知識(shí)點(diǎn)比較重要的,但是在之前的博客中還沒有寫到,于是趁著閑整理一下。 文本的...

    張春雷 評(píng)論0 收藏0
  • 如何實(shí)現(xiàn)一個(gè)分布式RPC框架

    摘要:趁實(shí)習(xí)前的這段業(yè)余時(shí)間,我實(shí)現(xiàn)了一個(gè)輕量級(jí)的分布式框架,名字叫做,代碼量不大,但是麻雀雖小卻五臟俱全。目前支持和兩種序列化框架。使用實(shí)現(xiàn)服務(wù)注冊(cè)與發(fā)現(xiàn)功能。代碼實(shí)現(xiàn)是我學(xué)習(xí)驗(yàn)證過程中誕生的一個(gè)輕量級(jí)分布式框架,代碼放在了。 遠(yuǎn)程過程調(diào)用(Remote Procedure Call,RPC)是一個(gè)計(jì)算機(jī)通信協(xié)議。該協(xié)議允許運(yùn)行于一臺(tái)計(jì)算機(jī)的程序調(diào)用另一臺(tái)計(jì)算機(jī)的子程序,而程序員無需額外地...

    Vultr 評(píng)論0 收藏0
  • netty

    摘要:設(shè)置每個(gè)數(shù)據(jù)包的大小如個(gè)字節(jié),如果某個(gè)數(shù)據(jù)包不足個(gè)字節(jié)可能會(huì)出現(xiàn)丟包的情況,即該數(shù)據(jù)包未從一個(gè)端到另一個(gè)端,此時(shí)需要用空格或者既定的符號(hào)補(bǔ)充在數(shù)據(jù)包之間使用一些字符進(jìn)行分割如號(hào)之類的,解析的時(shí)候先處理掉分隔符再拿到各個(gè)數(shù)據(jù)包就好了。 netty 概念: Netty是由JBOSS提供的一個(gè)java開源框架。Netty提供異步的、事件驅(qū)動(dòng)的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠...

    cfanr 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<