国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

高并發異步解耦利器:RocketMQ究竟強在哪里?

tainzhi / 3640人閱讀

摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。

image-20211017192453356

上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發展史:

image-20211006220855697

并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳細探討RocketMQ究竟好在哪里。

RocketMQ是一個分布式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴展性。它是阿里巴巴于2012年開源的第三代分布式消息中間件。

隨著阿里巴巴的電商業務不斷發展,需要一款更高性能的消息中間件,RocketMQ就是這個業務背景的產物。RocketMQ是一個分布式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴展性,它是阿里巴巴于2012年開源的第三代分布式消息中間件。RocketMQ經歷了多年雙十一的洗禮,在可用性、可靠性以及穩定性等方面都有出色的表現。值得一提的是,RocketMQ最初就是借鑒了Kafka進行改造開發而來的,所以熟悉Kafka的朋友,會發現RocketMQ的原理和Kafka有很多相似之處。

RocketMQ前身叫做MetaQ,在MeataQ發布3.0版本的時候改名為RocketMQ,其本質上的設計思路和Kafka類似,因為最初就是基于Kafka改造而來,經過不斷的迭代與版本升級,2016年11月21日,阿里巴巴向Apache軟件基金會捐贈了RocketMQ 。近年來被越來越多的國內企業使用。

本文帶大家從以下幾個方面詳細了解RocketMQ:

  • RocketMQ如何保證消息存儲的可靠性?
  • RocketMQ如何保證消息隊列服務的高可用?
  • 如何構建一個高可用的RocketMQ雙主雙從最小集群?
  • RocketMQ消息是如何存儲的?
  • RocketMQ是如何保證存取消息的效率的?
  • 如何實現基于Message Key的高效查詢?
  • 如何實現基于Message Id的高效查詢?
  • RocketMQ的Topic在集群中是如何存儲的?
  • Broker自動創建Topic會有什么問題?
  • RocketMQ如何保證消息投遞的順序性?
  • RocketMQ如何保證消息消費的順序性?
  • 實現分布式事務的手段有哪些?
  • RocketMQ如何實現事務消息?
  • RocketMQ事務消息是如何存儲的?

1. RocketMQ技術架構

RocketMQ的架構主要分為四部分,如下圖所示:

image-20211017212148402

  • Producer:消息生產者,支持集群方式部署;
  • Consumer:消息消費者,支持集群方式部署,支持pull,push模式獲取消息進行消費,支持集群和廣播方式消費;
  • NameServer:Topic路由注冊中心,類似于Dubbo中的zookeeper,支持Broker的動態注冊與發現;
    • 提供心跳檢測機制,檢查Broker是否存活;
    • 接收Broker集群的注冊信息,作為路由信息的基本數據;
    • NameServier各個實例不相互進行通信,每個NameServer都保存了一份完整的路由信息,這與zookeeper有所區別,不用作復雜的節點數據同步與選主過程;
  • BrokerServer:主要負責消息的存儲、投遞和查詢,以及服務高可用保證。BrokerServer包含以下幾個重要的子模塊:
    • Remoting Module:整個Broker的實體,負責處理來自clients端的請求;
    • Client Manager:負責管理客戶端(Producer/Consumer)和維護Consumer的Topic訂閱信息;
    • StoreService:提供方便簡單的API接口處理消息存儲到物理硬盤和查詢功能;
    • HA Service:高可用服務,提供Master Broker 和 Slave Broker之間的數據同步功能;
    • Index Service:根據特定的Message key對投遞到Broker的消息進行索引服務,以提供消息的快速查詢。

image-20211017212222438

2. RocketMQ執行原理

RocketMQ執行原理如下圖所示:

image-20211017212356716

  • 首先,啟動每個NameServer節點,共同構成一個NameServer Cluster。NameServer啟動后,監聽端口,等待Broker、Producer、Consumer的連接;
  • 然后啟動Broker的主從節點,這個時候Broker會與所有的NameServer建立并保持長連接,定時發送心跳包,把自己的信息(IP+端口號)以及存儲的所有Topic信息注冊到每個NameServer中。這樣NameServer集群中就有Topic和Broker的映射關系了;
  • 收發消息前,先創建Topic,創建Topic時需要指定該Topic要存儲在哪些Broker上,也可以在發送消息時自動創建Topic,每個Topic默認會分配4個Queue;
  • 啟動生產者,這個時候生產者會把信息注冊到NameServer中,并且從NameServer獲取Broker服務器,Queue等信息;
  • 啟動消費者,這個時候消費者會把信息注冊到NameServer中,并且從NameServer獲取Broker服務器,Queue等信息;
  • 生產者發送消息到Broker集群中的時候,會從所有的Master節點的對應Topic中選擇一個Queue,然后與Queue所在的Broker建立長連接從而向Broker投遞消息。消息實際上是存儲在了CommitLog文件中,而Queue文件里面存儲的實際是消息在CommitLog中的存儲位置信息;
  • 消費者從Broker集群中消費消息的時候,會通過特定的負載均衡算法,綁定一個消息隊列進行消費;
  • 消費者會定時(或者kill階段)把Queue的消費進度offset提交到Broker的consumerOffset.json文件中記錄起來;
  • 主節點和從節點之間可以是同步或者異步的進行數據復制,相關配置參數:
    • brokerRole,可選值:
      • ASYNC_MASTER:異步復制方式(異步雙寫),生產者寫入消息到Master之后,無需等到消息復制到Slave即可返回,消息的復制由旁路線程進行異步復制;
      • SYNC_MASTER:同步復制方式(同步雙寫),生產者寫入消息到Master之后,需要等到Slave復制成功才可以返回。如果有多個Slave,只需要有一個Slave復制成功,并成功應答,就算復制成功了。這里是否持久化到磁盤依賴于另一個參數:flushDiskType
      • SLAVE:從節點

3. RocketMQ集群

本節我們來看看一個雙主雙從的RocketMQ是如何搭建的。

集群配置參數說明:

在討論集群前,我們需要了解兩個關鍵的集群配置參數:brokerRoleflushDiskType。brokerRole在前一節已經介紹了,而flushDiskType則是刷盤方式的配置,主要有:

  • ASYNC_FLUSH: 異步刷盤
  • SYNC_FLUSH: 同步刷盤

3.1 如何保證消息存儲的可靠性?

brokerRole確定了主從同步是異步的還是同步的,flushDiskType確定了數據刷盤的方式是同步的還是異步的。

如果業務場景對消息丟失容忍度很低,可以采用SYNC_MASTER + ASYNC_FLUSH的方式,這樣只有master和slave在刷盤前同時掛掉,消息才會丟失,也就是說即使有一臺機器出故障,仍然能保證數據不丟

如果業務場景對消息丟失容忍度比較高,則可以采用ASYNC_MASTER + ASYNC_FLUSH的方式,這樣可以盡可能的提高消息的吞吐量。

3.2 如何保證消息隊列服務的高可用?

消費端的高可用

Master Broker支持讀和寫,Slave Broker只支持讀。

當Master不可用的時候,Consumer會自動切換到Slave進行讀,也就是說,當Master節點的機器出現故障后,Consumer仍然可以從Slave節點讀取消息,不影響消費端的消費程序。

生產端的高可用

集群配置參數說明:

  • brokerName: broker的名稱,需要把Master和Slave節點配置成相同的名稱,表示他們的主從關系,相同的brokerName的一組broker,組成一個broker組;
  • brokerId: broker的id,0表示Master節點的id,大于0表示Slave節點的id。

在RocketMQ中,機器的主從節點關系是提前配置好的,沒有類似Kafka的Master動態選主功能。

如果一個Master宕機了,要讓生產端程序繼續可以生產消息,您需要部署多個Master節點,組成多個broker組。這樣在創建Topic的時候,就可以把Topic的不同消息隊列分布在多個broker組中,即使某一個broker組的Master節點不可用了,其他組的Master節點仍然可用,保證了Producer可以繼續發送消息。

3.3 如何構建一個高可用的RocketMQ雙主雙從最小集群?

為了盡可能的保證消息不丟失,并且保證生產者和消費者的可用性,我們可以構建一個雙主雙從的集群,搭建的架構圖如下所示:

image-20211017212427244

部署架構說明:

  • 兩個Broker組,保證了其中一個Broker組的Master節點掛掉之后,另一個Master節點仍然可以接受某一個Topic的消息投遞;
  • 主從同步采用SYNC_MASTER,保證了生產者寫入消息到Master之后,需要等到Slave也復制成功,才返回消息投遞成功。這樣即使主節點或者從節點掛掉了,也不會導致丟數據;
  • 由于主節點有了從節點做備份,所以,落盤策略可以使用ASYNC_FLUSH,從而盡可能的提高消息的吞吐量;
  • 如果只提供兩臺服務器,要部署這個集群的情況下,可以把Broker Master1和Broker Slave2部署在一臺機器,Broker Master2和Broker Slave1部署在一臺機器。

關鍵配置參數

以下是關鍵的配置參數:

Broker Master1

# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.100# broker通信端口listenPort=10911# broker名稱brokerName=broker‐1# 0表示主節點brokerId=0# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 主從同步復制brokerRole=SYNC_MASTER# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m

Broker Slave1

# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.101# broker通信端口listenPort=10911# broker名稱brokerName=broker‐1 # 非0表示從節點brokerId=1# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 從節點brokerRole=SLAVE# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創建TopicautoCreateTopicEnable=true # 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐s

Broker Master2

# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.102# broker通信端口listenPort=10911# broker名稱brokerName=broker‐2# 0表示主節點brokerId=0# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 主從同步復制brokerRole=SYNC_MASTER# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m

Broker Slave2

# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.103# broker通信端口listenPort=10911# broker名稱brokerName=broker‐2# 非0表示從節點brokerId=1# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 從節點brokerRole=SLAVE# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐s

寫了那么多頂層架構圖,不寫寫底層內幕,就不是IT宅(itzhai.com)的文章風格,接下來,我們就來看看底層存儲架構。

4. RocketMQ存儲架構

我們在broker.conf文件中配置了消息存儲的根目錄:

# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m

進入這個目錄,我們可以發現如下的目錄結構:

image-20211017212520062

其中:

  • abort:該文件在broker啟動時創建,關閉時刪除,如果broker異常退出,則文件會存在,在下次啟動時會走修復流程;
  • checkpoint:檢查點,主要存放以下內容:
    • physicMsgTimestamp:commitlog文件最后一次落盤時間;
    • logicsMsgTimestamp:consumequeue最后一次落盤時間;
    • indexMsgTimestamp:索引文件最后一次落盤時間;
  • commitlog:存放消息的完整內容,所有的topic消息都會通過文件追加的形式寫入到該文件中;
  • config:消息隊列的配置文件,包括了topic配置,消費的偏移量等信息。其中consumerOffset.json文件存放消息隊列消費的進度;
  • consumequeue:topic的邏輯隊列,在消息存放到commitlog之后,會把消息的存放位置記錄到這里,只有記錄到這里的消息,才能被消費者消費;
  • index:消息索引文件,通過Message Key查詢消息時,是通過該文件進行檢索查詢的。

4.1 RocketMQ消息是如何存儲的

下面我們來看看關鍵的commitlog以及consumequeue:

image-20211017212554757

消息投遞到Broker之后,是先把實際的消息內容存放到CommitLog中的,然后再把消息寫入到對應主題的ConsumeQueue中。其中:

CommitLog消息的物理存儲文件,存儲實際的消息內容。每個Broker上面的CommitLog被該Broker上所有的ConsumeQueue共享。

單個文件大小默認為1G,文件名長度為20位,左邊補零,剩余為起始偏移量。預分配好空間,消息順序寫入日志文件。當文件滿了,則寫入下一個文件,下一個文件的文件名基于文件第一條消息的偏移量進行命名;

ConsumeQueue消息的邏輯隊列,相當于CommitLog的索引文件。RocketMQ是基于Topic主題訂閱模式實現的,每個Topic下會創建若干個邏輯上的消息隊列ConsumeQueue,在消息寫入到CommitLog之后,通過Broker的后臺服務線程(ReputMessageService)不停地分發請求并異步構建ConsumeQueue和IndexFile(索引文件,后面介紹),然后把每個ConsumeQueue需要的消息記錄到各個ConsumeQueue中

image-20211017212636906

ConsumeQueue主要記錄8個字節的commitLogOffset(消息在CommitLog中的物理偏移量), 4個字節的msgSize(消息大小), 8個字節的TagHashcode,每個元素固定20個字節。

image-20211017212701949

ConsumeQueue相當于CommitLog文件的索引,可以通過ConsumeQueue快速從很大的CommitLog文件中快速定位到需要的消息。

ConsumeQueue的存儲結構

主題消息隊列:在consumequeue目錄下,按照topic的維度存儲消息隊列。

重試消息隊列:如果topic中的消息消費失敗,則會把消息發到重試隊列,重新隊列按照消費端的GroupName來分組,命名規則:%RETRY%ConsumerGroupName

死信消息隊列:如果topic中的消息消費失敗,并且超過了指定重試次數之后,則會把消息發到死信隊列,死信隊列按照消費端的GroupName來分組,命名規則:%DLQ%ConsumerGroupName

假設我們現在有一個topic:itzhai-test,消費分組:itzhai_consumer_group,當消息消費失敗之后,我們查看consumequeue目錄,會發現多處了一個重試隊列:

image-20211017212858807

我們可以在RocketMQ的控制臺看到這個重試消息隊列的主題和消息:

image-20210919111252088

image-20211017113351723

如果一直重試失敗,達到一定次數之后(默認是16次,重試時間:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),就會把消息投遞到死信隊列:

image-20211017212936328

4.2 RocketMQ是如何保證存取消息的效率的

4.2.1 如何保證高效寫

每條消息的長度是不固定的,為了提高寫入的效率,RocketMQ預先分配好1G空間的CommitLog文件,采用順序寫的方式寫入消息,大大的提高寫入的速度。

RocketMQ中消息刷盤主要可以分為同步刷盤和異步刷盤兩種,通過flushDiskType參數進行配置。如果需要提高寫消息的效率,降低延遲,提高MQ的性能和吞吐量,并且不要求消息數據存儲的高可靠性,可以把刷盤策略設置為異步刷盤。

4.2.2 如何保證高效讀

為了提高讀取的效率,RocketMQ使用ConsumeQueue作為消費消息的索引,使用IndexFile作為基于消息key的查詢的索引。下面來詳細介紹下。

4.2.2.1 ConsumeQueue

讀取消息是隨機讀的,為此,RocketMQ專門建立了ConsumeQueue索引文件,每次先從ConsumeQueue中獲取需要的消息的地址,消息大小,然后從CommitLog文件中根據地址直接讀取消息內容。在讀取消息內容的過程中,也盡量利用到了操作系統的頁緩存機制,進一步加速讀取速度。

ConsumeQueue由于每個元素大小是固定的,因此可以像訪問數組一樣訪問每個消息元素。并且占用空間很小,大部分的ConsumeQueue能夠被全部載入內存,所以這個索引查找的速度很快。每個ConsumeQueue文件由30w個元素組成,占用空間在6M以內。每個文件默認大小為600萬個字節,當一個ConsumeQueue類型的文件寫滿之后,則寫入下一個文件。

4.2.2.2 IndexFile為什么按照Message Key查詢效率高?

我們在RocketMQ的store目錄中可以發現有一個index目錄,這個是一個用于輔助提高查詢消息效率的索引文件。通過該索引文件實現基于消息key來查詢消息的功能

物理存儲結構

IndexFile索引文件物理存儲結構如下圖所示:

image-20211017213017099

  • Header:索引頭文件,40 bytes,包含以下信息:
    • beginTimestamp:索引文件中第一個索引消息存入Broker的時間戳;
    • endTimestamp:索引文件中最后一個索引消息存入Broker的時間戳
    • beginPHYOffset:索引文件中第一個索引消息在CommitLog中的偏移量;
    • endPhyOffset:索引文件中最后一個索引消息在CommitLog中的偏移量;
    • hashSlotCount:構建索引使用的slot數量;
    • indexCount:索引的總數;
  • Slot Table:槽位表,類似于Redis的Slot,或者哈希表的key,使用消息的key的hashcode與slotNum取模可以得到具體的槽的位置。每個槽位占4 bytes,一個IndexFile可以存儲500w個slot;
  • Index Linked List:消息的索引內容,如果哈希取模后發生槽位碰撞,則構建成鏈表,一個IndexFile可以存儲2000w個索引:
    • Key Hash:消息的哈希值;
    • Commit Log Offset:消息在CommitLog中的偏移量;
    • Timestamp:消息存儲的時間戳;
    • Next Index Offset:下一個索引的位置,如果消息取模后發生槽位槽位碰撞,則通過此字段把碰撞的消息構成鏈表。

每個IndexFile文件的大小:40b + 4b * 5000000 + 20b * 20000000 = 420000040b,約為400M。

邏輯存儲結構

IndexFile索引文件的邏輯存儲結構如下圖所示:

image-20211017213111748

IndexFile邏輯上是基于哈希表來實現的,Slot Table為哈希鍵,Index Linked List中存儲的為哈希值。

4.2.2.3 為什么按照MessageId查詢效率高?

RocketMQ中的MessageId的長度總共有16字節,其中包含了:消息存儲主機地址(IP地址和端口),消息Commit Log offset。

按照MessageId查詢消息的流程:Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個RPC請求后通過Remoting通信層發送(業務請求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個完整的消息返回

4.3 RocketMQ集群是如何做數據分區的?

我們繼續看看在集群模式下,RocketMQ的Topic數據是如何做分區的。IT宅(itzhai.com)提醒大家,實踐出真知。這里我們部署兩個Master節點:

image-20211017113659072

4.3.1 RocketMQ的Topic在集群中是如何存儲的

我們通過手動配置每個Broker中的Topic,以及ConsumeQueue數量,來實現Topic的數據分片,如,我們到集群中手動配置這樣的Topic:

  • broker-a創建itzhai-com-test-1,4個隊列;
  • broker-b創建itzhai-com-test-1,2個隊列。

創建完成之后,Topic分片集群分布如下:

image-20211017182449434

即:

image-20211017182628084

可以發現,RocketMQ是把Topic分片存儲到各個Broker節點中,然后在把Broker節點中的Topic繼續分片為若干等分的ConsumeQueue,從而提高消息的吞吐量。ConsumeQueue是作為負載均衡資源分配的基本單元

這樣把Topic的消息分區到了不同的Broker上,從而增加了消息隊列的數量,從而能夠支持更塊的并發消費速度(只要有足夠的消費者)。

4.3.2 Broker自動創建Topic會有什么問題?

假設設置為通過Broker自動創建Topic(autoCreateTopicEnable=true),并且Producer端設置Topic消息隊列數量設置為4,也就是默認值:

producer.setDefaultTopicQueueNums(4);

嘗試往一個新的 topic itzhai-test-queue-1連續發送10條消息,發送完畢之后,查看Topic狀態:

image-20211017114900279

我們可以發現,在兩個broker上面都創建了itzhai-test-queue-a,并且每個broker上的消息隊列數量都為4。怎么回事,我配置的明明是期望創建4個隊列,為什么加起來會變成了8個?如下圖所示:

image-20211017121546902

由于時間關系,本文我們不會帶大家從源碼方面去解讀為啥會出現這種情況,接下來我們通過一種更加直觀的方式來驗證下這個問題:繼續做實驗。

我們繼續嘗試往一個新的 topic itzhai-test-queue-10發送1條消息,注意,這一次不做并發發送了,只發送一條,發送完畢之后,查看Topic狀態:

image-20211017183414630

可以發現,這次創建的消息隊列數量又是對的了,并且都是在broker-a上面創建的。接下來,無論怎么并發發送消息,消息隊列的數量都不會繼續增加了。

其實這也是并發請求Broker,觸發自動創建Topic的bug。

為了更加嚴格的管理Topic的創建和分片配置,一般在生產環境都是配置為手動創建Topic,通過提交運維工單申請創建Topic以及Topic的數據分配。

接下來我們來看看RocketMQ的特性。更多其他技術的底層架構內幕分析,請訪問我的博客IT宅(itzhai.com)或者關注Java架構雜談公眾號。

5. RocketMQ特性

5.1 生產端

5.1.1 消息發布

RocketMQ中定義了如下三種消息通信的方式:

public enum CommunicationMode {    SYNC,    ASYNC,    ONEWAY,}
  • SYNC:同步發送,生產端會阻塞等待發送結果;
    • 應用場景:這種方式應用場景非常廣泛,如重要業務事件通知。
  • ASYNC:異步發送,生產端調用發送API之后,立刻返回,在拿到Broker的響應結果后,觸發對應的SendCallback回調;
    • 應用場景:一般用于鏈路耗時較長,對 RT 較為敏感的業務場景;
  • ONEWAY:單向發送,發送方只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答。 此方式發送消息的過程耗時非常短,一般在微秒級別;
    • 應用場景:適用于耗時非常短,對可靠性要求不高的場景,如日志收集。

SYNC和ASYNC關注發送結果,ONEWAY不關注發送結果。發送結果如下:

public enum SendStatus {    SEND_OK,    FLUSH_DISK_TIMEOUT,    FLUSH_SLAVE_TIMEOUT,    SLAVE_NOT_AVAILABLE,}
  • SEND_OK:消息發送成功。SEND_OK并不意味著投遞是可靠的,要確保消息不丟失,需要開啟SYNC_MASTER同步或者SYNC_FLUSH同步寫;
  • FLUSH_DISK_TIMEOUT:消息發送成功,但是刷盤超時。如果Broker的flushDiskType=SYNC_FLUSH,并且5秒內沒有完成消息的刷盤,則會返回這個狀態;
  • FLUSH_SLAVE_TIMEOUT:消息發送成功,但是服務器同步到Slave時超時。如果Broker的brokerRole=SYNC_MASTER,并且5秒內沒有完成同步,則會返回這個狀態;
  • SLAVE_NOT_AVAILABLE:消息發送成功,但是無可用的Slave節點。如果Broker的brokerRole=SYNC_MASTER,但是沒有發現SLAVE節點或者SLAVE節點掛掉了,那么會返回這個狀態。

源碼內容更精彩,歡迎大家進一步閱讀源碼詳細了解消息發送的內幕:

  • 同步發送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
  • 異步發送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.SendCallback)
  • 單向發送:org.apache.rocketmq.client.producer.DefaultMQProducer#sendOneway(org.apache.rocketmq.common.message.Message)

5.1.2 順序消費

消息的有序性指的是一類消息消費的時候,可以按照發送順序來消費,比如:在Java架構雜談茶餐廳吃飯產生的消息:進入餐廳、點餐、下單、上菜、付款,消息要按照這個順序消費才有意義,但是多個顧客產生的消息是可以并行消費的。順序消費又分為全局順序消費和分區順序消費:

  • 全局順序:同一個Topic下的消息,所有消息按照嚴格的FIFO順序進行發布和消費。適用于:性能要求不高,所有消息嚴格按照FIFO進行發布和消費的場景;
  • 分區順序:同一個Topic下,根據消息的特定業務ID進行sharding key分區,同一個分區內的消息按照嚴格的FIFO順序進行發布和消費。適用于:性能要求高,在同一個分區中嚴格按照FIFO進行發布和消費的場景。

一般情況下,生產者是會以輪訓的方式把消息發送到Topic的消息隊列中的:

image-20211017213242909

在同一個Queue里面,消息的順序性是可以得到保證的,但是如果一個Topic有多個Queue,以輪訓的方式投遞消息,那么就會導致消息亂序了。

為了保證消息的順序性,需要把保持順序性的消息投遞到同一個Queue中。

5.1.2.1 如何保證消息投遞的順序性

RocketMQ提供了MessageQueueSelector接口,可以用來實現自定義的選擇投遞的消息隊列的算法:

for (int i = 0; i < orderList.size(); i++) {    String content = "Hello itzhai.com. Java架構雜談," + new Date();    Message msg = new Message("topic-itzhai-com", tags[i % tags.length], "KEY" + i,            content.getBytes(RemotingHelper.DEFAULT_CHARSET));    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {        @Override        public MessageQueue select(List mqs, Message msg, Object arg) {            Long orderId = (Long) arg;            // 訂單號與消息隊列個數取模,保證讓同一個訂單號的消息落入同一個消息隊列            long index = orderId % mqs.size();            return mqs.get((int) index);        }    }, orderList.get(i).getOrderId());    System.out.printf("content: %s, sendResult: %s%n", content, sendResult);}

如上圖,我們實現了MessageQueueSelector接口,并在實現的select方法里面,指定了選擇消息隊列的算法:訂單號與消息隊列個數取模,保證讓同一個訂單號的消息落入同一個消息隊列

image-20211017213318790

有個異常場景需要考慮:假設某一個Master節點掛掉了,導致Topic的消息隊列數量發生了變化,那么繼續使用以上的選擇算法,就會導致在這個過程中同一個訂單的消息會分散到不同的消息隊列里面,最終導致消息不能順序消費。

為了避免這種情況,只能選擇犧牲failover特性了。

現在投遞到消息隊列中的消息保證了順序,那如何保證消費也是順序的呢?

5.1.2.2 如何保證消息消費的順序性?

RocketMQ中提供了MessageListenerOrderly,該對象用于有順序收異步傳遞的消息,一個隊列對應一個消費線程,使用方法如下:

consumer.registerMessageListener(new MessageListenerOrderly() {    // 消費次數,用于輔助模擬各種消費結果    AtomicLong consumeTimes = new AtomicLong(0);    @Override    public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) {        context.setAutoCommit(true);        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);        this.consumeTimes.incrementAndGet();        if ((this.consumeTimes.get() % 2) == 0) {            return ConsumeOrderlyStatus.SUCCESS;        } else if ((this.consumeTimes.get() % 3) == 0) {            return ConsumeOrderlyStatus.ROLLBACK;        } else if ((this.consumeTimes.get() % 4) == 0) {            return ConsumeOrderlyStatus.COMMIT;        } else if ((this.consumeTimes.get() % 5) == 0) {            context.setSuspendCurrentQueueTimeMillis(3000);            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;        }        return ConsumeOrderlyStatus.SUCCESS;    }});

如果您使用的是MessageListenerConcurrently,表示并發消費,為了保證消息消費的順序性,需要設置為單線程模式。

使用MessageListenerOrderly的問題:如果遇到某條消息消費失敗,并且無法跳過,那么消息隊列的消費進度就會停滯。

5.1.3 延遲隊列(定時消息)

定時消費是指消息發送到Broker之后不會立即被消費,而是等待特定的時間之后才投遞到Topic中。定時消息會暫存在名為SCHEDULE_TOPIC_XXXX的topic中,并根據delayTimeLevel存入特定的queue,queueId=delayTimeLevel-1,一個queue只存相同延遲的消息,保證具有相同延遲的消息能夠順序消費。比如,我們設置1秒后把消息投遞到topic-itzhai-comtopic,則存儲的文件目錄如下所示:

image-20211017213559746

Broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。

定時消息的副作用:定時消息會在第一次寫入Topic和調度寫入實際的topic都會進行計數,因此發送數量,tps都會變高。

使用延遲隊列的場景:提交了訂單之后,如果等待超過約定的時間還未支付,則把訂單設置為超時狀態。

RocketMQ提供了以下幾個固定的延遲級別:

public class MessageStoreConfig {    ...    // 10個level,level:1~18    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";    ...}

level = 0 表示不使用延遲消息。

另外,消息消費失敗也會進入延遲隊列,消息發送時間與設置的延遲級別和重試次數有關

以下是發送延遲消息的代碼:

public class ScheduledMessageProducer {    public static void main(String[] args) throws Exception {        DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup");        producer.start();        int totalMessagesToSend = 100;        for (int i = 0; i < totalMessagesToSend; i++) {            Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());            // 指定該消息在10秒后被消費者消費            message.setDelayTimeLevel(3);            producer.send(message);        }        producer.shutdown();    }}

5.1.4 數據完整性與事務消息

通過消息對系統進行解耦之后,勢必會遇到分布式系統數據完整性的問題。

5.1.4.1 實現分布式事務的手段有哪些?

我們可以通過以下手段解決分布式系統數據最終一致性問題:

  • 數據庫層面的2PC(Two-phase commit protocol),二階段提交,同步阻塞,效率低下,存在協調者單點故障問題,極端情況下存在數據不一致的風險。對應技術上的XA、JTA/JTS。這是分布式環境下事務處理的典型模式;
  • 數據庫層面的3PC,三階段提交,引入了參與者超時機制,增加了預提交階段,使得故障恢復之后協調者的決策復雜度降低,但整體的交互過程變得更長了,性能有所下降,仍舊會存在數據不一致的問題;
  • 業務層面的TCC ,Try - Confirm - Cancel。對業務的侵入較大,和業務緊耦合,對于每一個操作都需要定義三個動作分別對應:Try - Confirm - Cancel,將資源層的兩階段提交協議轉換到業務層,成為業務模型中的一部分;
  • 本地消息表;
  • 事務消息;

RocketMQ事務消息(Transactional Message)則是通過事務消息來實現分布式事務的最終一致性。下面看看RocketMQ是如何實現事務消息的。

5.1.4.2 RocketMQ如何實現事務消息?

如下圖:

image-20211017213817767

事務消息有兩個流程:

  1. 事務消息發送及提交:
    1. 發送half消息;
    2. 服務端響應half消息寫入結果;
    3. 根據half消息的發送結果執行本地事務。如果發送失敗,此時half消息對業務不可見,本地事務不執行;
    4. 根據本地事務狀態執行Commit或者Rollback。Commit操作會觸發生成ConsumeQueue索引,此時消息對消費者可見
  2. 補償流程:
    5. 對于沒有Commit/Rollback的事務消息,會處于pending狀態,這對這些消息,MQ Server發起一次回查;
    6. Producer收到回查消息,檢查回查消息對應的本地事務的轉塔體;
    7. 根據本地事務狀態,重新執行Commit或者Rollback。

補償階段主要用于解決消息的Commit或者Rollback發生超時或者失敗的情況。

half消息:并不是發送了一半的消息,而是指消息已經發送到了MQ Server,但是該消息未收到生產者的二次確認,此時該消息暫時不能投遞到具體的ConsumeQueue中,這種狀態的消息稱為half消息。

5.1.4.3 RocketMQ事務消息是如何存儲的?

發送到MQ Server的half消息對消費者是不可見的,為此,RocketMQ會先把half消息的Topic和Queue信息存儲到消息的屬性中,然后把該half消息投遞到一個專門的處理事務消息的隊列中:RMQ_SYS_TRANS_HALF_TOPIC,由于消費者沒有訂閱該Topic,所以無法消息half類型的消息。

image-20211017213932431

生產者執行Commit half消息的時候,會存儲一條專門的Op消息,用于標識事務消息已確定的狀態,如果一條事務消息還沒有對應的Op消息,說明這個事務的狀態還無法確定。RocketMQ會開啟一個定時任務,對于pending狀態的消息,會先向生產者發送回查事務狀態請求,根據事務狀態來決定是否提交或者回滾消息。

當消息被標記為Commit狀態之后,會把half消息的Topic和Queue相關屬性還原為原來的值,最終構建實際的消費索引(ConsumeQueue)。

RocketMQ并不會無休止的嘗試消息事務狀態回查,默認查找15次,超過了15次還是無法獲取事務狀態,RocketMQ默認回滾該消息。并打印錯誤日志,可以通過重寫AbstractTransactionalMessageCheckListener類修改這個行為。

可以通過Broker的配置參數:transactionCheckMax來修改此值。

5.1.5 消息重投

如果消息發布方式是同步發送會重投,如果是異步發送會重試。

消息重投可以盡可能保證消息投遞成功,但是可能會造成消息重復。

什么情況會造成重復消費消息?

  • 出現消息量大,網絡抖動的時候;
  • 生產者主動重發;
  • 消費負載發生變化。

可以使用的消息重試策略:

  • retryTimesWhenSendFailed:設置同步發送失敗的重投次數,默認為2。所以生產者最多會嘗試發送retryTimesWhenSendFailed+1次。
    • 為了最大程度保證消息不丟失,重投的時候會嘗試向其他broker發送消息;
    • 超過重投次數,拋出異常,讓客戶端自行處理;
    • 觸發重投的異常:RemotingException、MQClientException和部分MQBrokerException;
  • retryTimesWhenSendAsyncFailed:設置異步發送失敗重試次數,異步重試不會選擇其他Broker,不保證消息不丟失;
  • retryAnotherBrokerWhenNotStoreOK:消息刷盤(主或備)超時或slave不可用(返回狀態非SEND_OK),是否嘗試發送到其他broker,默認false。重要的消息可以開啟此選項。

oneway發布方式不支持重投。

5.1.6 批量消息

為了提高系統的吞吐量,提高發送效率,可以使用批量發送消息。

批量發送消息的限制:

  • 同一批批量消息的topic,waitStoreMsgOK屬性必須保持一致;
  • 批量消息不支持延遲隊列;
  • 批量消息一次課發送的上限是4MB。

發送批量消息的例子:

String topic = "itzhai-test-topic";List messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world itzhai.com 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world itzhai.com 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world itzhai.com 2".getBytes()));producer.send(messages);

如果發送的消息比較多,會增加復雜性,為此,可以對大消息進行拆分。以下是拆分的例子:

public class ListSplitter implements Iterator> {     // 限制最大大小    private final int SIZE_LIMIT = 1024 * 1024 * 4;    private final List messages;    private int currIndex;    public ListSplitter(List messages) {         this.messages = messages;    }    @Override public boolean hasNext() {        return currIndex < messages.size();     }    @Override public List next() {         int startIndex = getStartIndex();        int nextIndex = startIndex;        int totalSize = 0;        for (; nextIndex < messages.size(); nextIndex++) {            Message message = messages.get(nextIndex);             int tmpSize = calcMessageSize(message);            if (tmpSize + totalSize > SIZE_LIMIT) {                break;             } else {                totalSize += tmpSize;             }        }        List subList = messages.subList(startIndex, nextIndex);         currIndex = nextIndex;        return subList;    }    private int getStartIndex() {        Message currMessage = messages.get(currIndex);         int tmpSize = calcMessageSize(currMessage);         while(tmpSize > SIZE_LIMIT) {            currIndex += 1;            Message message = messages.get(curIndex);             tmpSize = calcMessageSize(message);        }        return currIndex;     }    private int calcMessageSize(Message message) {        int tmpSize = message.getTopic().length() + message.getBody().length();         Map properties = message.getProperties();        for (Map.Entry entry : properties.entrySet()) {            tmpSize += entry.getKey().length() + entry.getValue().length();         }        tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes        return tmpSize;     }}// then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {   try {       List  listItem = splitter.next();       producer.send(listItem);   } catch (Exception e) {       e.printStackTrace();       // handle the error   }}

5.1.7 消息過濾

RocketMQ的消費者可以根據Tag進行消息過濾來獲取自己感興趣的消息,也支持自定義屬性過濾。

Tags是Topic下的次級消息類型/二級類型(注:Tags也支持TagA || TagB這樣的表達式),可以在同一個Topic下基于Tags進行消息過濾。

消息過濾是在Broker端實現的,減少了對Consumer無用消息的網絡傳輸,缺點是增加了Broker負擔,實現相對復雜。

5.2 消費端

5.2.1 消費模型

消費端有兩周消費模型:集群消費和廣播消費。

集群消費

集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。

廣播消費

廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。

5.2.2 消息重試

RocketMQ會為每個消費組都設置一個Topic名稱為%RETRY%consumerGroupName的重試隊列(這里需要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設置的),用于暫時保存因為各種異常而導致Consumer端無法消費的消息。

考慮到異常恢復起來需要一些時間,會為重試隊列設置多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。

RocketMQ對于重試消息的處理是先保存至Topic名稱為SCHEDULE_TOPIC_XXXX的延遲隊列中,后臺定時任務按照對應的時間進行Delay后重新保存至%RETRY%consumerGroupName的重試隊列中。

比如,我們設置1秒后把消息投遞到topic-itzhai-comtopic,則存儲的文件目錄如下所示:

image-20211017213559746

5.2.3 死信隊列

當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。

RocketMQ將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message),將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)

在RocketMQ中,可以通過使用console控制臺對死信隊列中的消息進行重發來使得消費者實例再次進行消費


由于RocketMQ是使用Java寫的,所以它的代碼特別適合拿來閱讀消遣,我們繼續來看看RocketMQ的源碼結構...

不不,還是算了,一下子又到周末晚上了,時間差不多了,今天就寫到這里了。有空再聊。


我精心整理了一份Redis寶典給大家,涵蓋了Redis的方方面面,面試官懂的里面有,面試官不懂的里面也有,有了它,不怕面試官連環問,就怕面試官一上來就問你Redis的Redo Log是干啥的?畢竟這種問題我也不會。

image-20211007142531823

Java架構雜談公眾號發送Redis關鍵字獲取pdf文件:

image-20211010220323135

本文作者: arthinking

博客鏈接: https://www.itzhai.com/articles/deep-understanding-of-rocketmq.html

高并發異步解耦利器:RocketMQ究竟強在哪里?

版權聲明: 版權歸作者所有,未經許可不得轉載,侵權必究!聯系作者請加公眾號。

References

apache/rocketmq. Retrieved from https://github.com/apache/rocketmq

Java架構雜談

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/124531.html

相關文章

  • 上周熱點回顧(11.22-11.28)

    摘要:熱點隨筆入門實現跨框架組件復用葡萄城技術團隊二工作三年的一些感悟百萬級大數據插入更新,支持多種數據庫果糖大數據科技被下屬罵,記一次矛盾升級有心無心,蝴蝶效應葉小釵中的鑒權授權正確方式包子推薦一款顏值逆天且功能齊全的開源工具鉑賽東開源免費圖熱點隨筆:·?Svelte入門——Web Components實現跨框架組件復用?(葡萄城技術團隊)·?(二)工作三年的一些感悟?(Craftsman-L)...

    不知名網友 評論0 收藏0
  • RocketMQ我們學到了什么之NameServer

    摘要:故事中的下屬們,就是消息生產者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調度中心,而你就是消息消費者角色。下屬們匯報的消息,應該疊放在哪里,這個消息又應該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準確的被投放以及消費。 微信公眾號:IT一刻鐘大型現實非嚴肅主義現場一刻鐘與你分享優質技術架構與見聞,做一個有劇情的程序員關注可了解更多精彩內容。問題或建議,請公眾號留言...

    wangbjun 評論0 收藏0
  • RocketMQ我們學到了什么之NameServer

    摘要:故事中的下屬們,就是消息生產者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調度中心,而你就是消息消費者角色。下屬們匯報的消息,應該疊放在哪里,這個消息又應該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準確的被投放以及消費。 微信公眾號:IT一刻鐘大型現實非嚴肅主義現場一刻鐘與你分享優質技術架構與見聞,做一個有劇情的程序員關注可了解更多精彩內容。問題或建議,請公眾號留言...

    Arno 評論0 收藏0
  • 關于MQ的幾件小事(一)消息隊列的用途、優缺點、技術選型

    摘要:數量對吞吐量的影響可以達到幾百幾千個的級別,吞吐量會有小幅度的下降。這是的一大優勢,可在同等數量機器下支撐大量的從幾十個到幾百個的時候,吞吐量會大幅下降。下一篇如何保證消息隊列的高可用 1.為什么使用消息隊列? (1)解耦:可以在多個系統之間進行解耦,將原本通過網絡之間的調用的方式改為使用MQ進行消息的異步通訊,只要該操作不是需要同步的,就可以改為使用MQ進行不同系統之間的聯系,這樣項目之間...

    xialong 評論0 收藏0

發表評論

0條評論

tainzhi

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<