摘要:啟動一個線程,獲取阻塞隊列的元素,當通道發生事件時,隊列會被放入事件對象啟動一個定時器,每個執行一次,掃描,超時沒有獲取結果的會被移除掉客戶端跟服務器端差不多。而這個對象會在傳輸之前進行編碼,消息接收到進行解碼。
rocketMQ通信模塊
Rocketmq的通信層是基于通信框架netty 4.0.21.Final之上做了簡單的協議封裝,基本的類圖如下:
通訊模塊是怎么進行的消息傳輸的先來看看服務器端啟動做了什么:
netty服務器啟動,監聽在8888;netty設置了一個心跳檢測器IdleStateHandler,讀寫超時時間為120s,在120s后都沒有讀寫操作將會觸發相應事件。
啟動一個線程,獲取阻塞隊列eventQueue的元素,當netty channel通道發生CONNECT, CLOSE,IDLE,EXCEPTION事件時,隊列會被放入事件對象
啟動一個定時器Timer,每個1s執行一次,掃描ResponseFuture,超時沒有獲取結果的會被移除掉
客戶端跟服務器端差不多。
rocketmq提供了三種通信方式:
一、invokeSyncImpl 同步調用(主要實現參見NettyRemotingAbstract.invokeSyncImpl)
同步調用是指客戶端發起遠程調用后,當前線程會被阻塞,直到服務器端返回結果或發生超時異常,我們在發送消息時需要同步知道消息發送成功還是失敗,一般使用這種方式。
我們知道,netty是異步基于事件驅動的,當我們使用netty向遠程服務器發送消息是通過channel.writeAndFlush方法,此方法是異步的,那我們如何同步的獲取服務器的返回結果呢?這里的做法是在向服務器發送消息時設置一個唯一的序列號,本地會通過上下文保存一個ResponseFuture對象在Map中,key就是這個唯一的序列號,value就是這個ResponseFuture對象,ResponseFuture對象會設置一個CountDownLatch,每當發送完消息后,就會調用CountDownLatch的await方法掛起當前線程;當服務器返回結果時也會攜帶之前客戶端傳遞過去的唯一序列號,這樣就可以找到ResponseFuture對象,再調用CountDownLatch的countDown方法,此時客戶端之前掛起的線程就會蘇醒過來,完成一次同步調用。
二、invokeAsyncImpl異步調用(主要實現參見NettyRemotingAbstract.invokeAsyncImpl)
客戶端發起遠程調用前會先設置一個InvokeCallback類,當然也是設置在ResponseFuture對象中,調用結束后不會等待結果,當服務器返回時也是跟同步調用一樣會在新的線程里面先找到ResponseFuture,然后執行回調接口也就是InvokeCallback的operationComplete方法。如果服務器返回結果超時,也會進行回調,客戶端可以根據相關的狀態來執行相關邏輯。
異步調用不會阻塞線程,調用后會立即返回,調用結果會在異步線程里面執行回調來獲取,使用Async需要控制好節奏,不能發送的太快以防止壓垮服務器端。所以在invokeAsyncImpl方法里面設置了一個信號量,默認是64個,只有獲取到許可的請求才能真正發起遠程調用。
三、invokeOnewayImpl 單向調用(主要實現參見NettyRemotingAbstract.invokeOnewayImpl)
客戶端發送請求后不會等待服務端返回的結果,并且會忽略服務端的處理結果;當前線程調用完畢,調用方并不關心服務器端的處理結果,也不會被阻塞,跟異步調用一樣需要控制好節奏以防壓垮服務器端。在invokeOnewayImpl方法里面也設置了一個信號量,默認是256個,只有獲取到許可的請求才能真正發起遠程調用。
三種通信方式的對比
調用方式 | 特點 | 使用場景 |
---|---|---|
Sync | 同步阻塞 | 需要同步獲取結果的場景 |
Async | 異步不阻塞 | 當前不需要結果,但是當服務器處理完后,需要做一些其他事情 |
Oneway | 異步不阻塞 | 不要需要結果,不保證消息一定發送成功 |
RemotingCommand是rocketMQ消息傳輸的媒介,所有的消息都會包裝成RemotingCommand來進行傳輸。而這個對象會在netty傳輸之前進行編碼,消息接收到進行解碼。
RemotingCommand是由頭部(header)和消息體(body)組成,消息發送的時候,頭部和消息體會分開進行編碼。那么RemotingCommand是如何組成的呢?
RemotingCommand的核心字段:
public class RemotingCommand{ private int code; private LanguageCode language = LanguageCode.JAVA; private int version = 0; private int opaque = requestId.getAndIncrement(); private int flag = 0; private String remark; private HashMap頭部(header)extFields; private transient CommandCustomHeader customHeader; private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer; private transient byte[] body; }
請求頭接收方和發起方的含義略有不同,下面的表格詳細的說明:
字段名 | 類型 | Request | Resposne |
---|---|---|---|
code | int | 請求操作代碼,接收方根據不同的代碼做不同的操作 | 應答結果代碼,0表示成功,非0表示各種錯誤代碼 |
language | 枚舉 | 請求方實現的語言,默認Java | 接收方實現的語言 |
version | int | 請求方版本 | 接收方版本 |
opaque | int | 請求方在同一連接上不同的請求標識代碼,多線程連接服用使用 | 接收方不做修改,直接返回 |
flag | int | 通信層的標志位 | 通信層的標志位 |
remark | String | 傳輸自定義文本信息 | 錯誤詳細描述 |
extFields | Map | 自定義字段 | 自定義字段 |
頭信息里面還包括了CommandCustomHeader的自定義的一些頭信息,會被通過反射的方式放在extFields字段里面
消息體消息體是直接變為byte數組,由客戶端自己序列化,這兩部分后一起放入netty傳輸的ByteBuffer中,一起傳輸到接收端
報文格式與序列化length | header length | headerData | bodyData |
---|---|---|---|
4個字節 | 4個字節(高一位字節表示序列化類型,低三位字節表示長度) |
length:表示整個數據包的長度 占4個字節
header length:表示header的長度(高一位字節表示序列化類型,低三位字節表示長度)
headerData的序列化有兩種方式:
json:使用fastjson進行序列化
自定義:使用bytebuffer自定義序列化
Netty服務器端在啟動時設置了TCP參數的含義SO_BACKLOG:1024
指定全連接隊列數,linux系統在文件/proc/sys/net/core/somaxconn指定,默認128;
還有一個半連接隊列數,linux在文件/proc/sys/net/ipv4/tcp_max_syn_backlog指定
SO_REUSEADDR:true
重用處于time_wait狀態下的連接
SO_KEEPALIVE:false
?;顧C制
TCP_NODELAY:true
關閉Nagle算法,Nagle算法可以降低網絡里小包的數量,從而提升網絡性能,關閉可以提高實時性
SO_SNDBUF:65535
發送緩存區大小
SO_RCVBUF:65535
接受緩存區大小
SO_RCVLOWAT:接收緩存水位線
SO_SNDLOWAT:發送緩存水位線
它們一般被I/O復用系統調用用來判斷socket是否可讀或可寫。當TCP接收緩沖區中可讀數據的總數大于其低水位標記時,I/O復用系統調用將通知應用程序可以從對應的socket上讀取數據;當TCP發送緩沖區中的空閑空間(可以寫入數據的空間)大于其低水位標記時,I/O復用系統調用將通知應用程序可以往對應的socket上寫入數據
在netty中好像沒有看到有設置這兩個參數
CONNECT_TIMEOUT_MILLIS:3000
連接超時時間
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/77012.html
摘要:具體可以參考消息隊列之具體可以參考實戰之快速入門十分鐘入門阿里中間件團隊博客是一個分布式的可分區的可復制的基于發布訂閱的消息系統主要用于大數據領域當然在分布式系統中也有應用。目前市面上流行的消息隊列就是阿里借鑒的原理用開發而得。 我自己總結的Java學習的系統知識點以及面試問題,目前已經開源,會一直完善下去,歡迎建議和指導歡迎Star: https://github.com/Snail...
摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳...
摘要:分布式高并發微服務問阿里京東螞蟻等大廠面試真題解析道跳槽漲薪必備精選面試題最新版大廠面試真題集點擊這里免費領取點擊這里免費領取 估計很多Java程序員平時主要的工作就是一些Web系統的業務開發,對于服務端IO程序以及網絡通信編程做得并不多,但是對于高級或者資深程序員來說,IO通信以及服務端編...
閱讀 1814·2021-10-20 13:49
閱讀 1356·2019-08-30 15:52
閱讀 2863·2019-08-29 16:37
閱讀 1033·2019-08-29 10:55
閱讀 3064·2019-08-26 12:14
閱讀 1649·2019-08-23 17:06
閱讀 3235·2019-08-23 16:59
閱讀 2544·2019-08-23 15:42