摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。
上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發展史:
并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳細探討RocketMQ究竟好在哪里。
RocketMQ是一個分布式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴展性。它是阿里巴巴于2012年開源的第三代分布式消息中間件。
隨著阿里巴巴的電商業務不斷發展,需要一款更高性能的消息中間件,RocketMQ就是這個業務背景的產物。RocketMQ是一個分布式消息中間件,具有低延遲、高性能和可靠性、萬億級別的容量和靈活的可擴展性,它是阿里巴巴于2012年開源的第三代分布式消息中間件。RocketMQ經歷了多年雙十一的洗禮,在可用性、可靠性以及穩定性等方面都有出色的表現。值得一提的是,RocketMQ最初就是借鑒了Kafka進行改造開發而來的,所以熟悉Kafka的朋友,會發現RocketMQ的原理和Kafka有很多相似之處。
RocketMQ前身叫做MetaQ,在MeataQ發布3.0版本的時候改名為RocketMQ,其本質上的設計思路和Kafka類似,因為最初就是基于Kafka改造而來,經過不斷的迭代與版本升級,2016年11月21日,阿里巴巴向Apache軟件基金會捐贈了RocketMQ 。近年來被越來越多的國內企業使用。
本文帶大家從以下幾個方面詳細了解RocketMQ:
RocketMQ的架構主要分為四部分,如下圖所示:
Producer
:消息生產者,支持集群方式部署;Consumer
:消息消費者,支持集群方式部署,支持pull,push模式獲取消息進行消費,支持集群和廣播方式消費;NameServer
:Topic路由注冊中心,類似于Dubbo中的zookeeper,支持Broker的動態注冊與發現;BrokerServer
:主要負責消息的存儲、投遞和查詢,以及服務高可用保證。BrokerServer包含以下幾個重要的子模塊:RocketMQ執行原理如下圖所示:
brokerRole
,可選值:ASYNC_MASTER
:異步復制方式(異步雙寫),生產者寫入消息到Master之后,無需等到消息復制到Slave即可返回,消息的復制由旁路線程進行異步復制;SYNC_MASTER
:同步復制方式(同步雙寫),生產者寫入消息到Master之后,需要等到Slave復制成功才可以返回。如果有多個Slave,只需要有一個Slave復制成功,并成功應答,就算復制成功了。這里是否持久化到磁盤依賴于另一個參數:flushDiskType
;SLAVE
:從節點本節我們來看看一個雙主雙從的RocketMQ是如何搭建的。
集群配置參數說明:
在討論集群前,我們需要了解兩個關鍵的集群配置參數:brokerRole,flushDiskType。brokerRole在前一節已經介紹了,而flushDiskType則是刷盤方式的配置,主要有:
- ASYNC_FLUSH: 異步刷盤
- SYNC_FLUSH: 同步刷盤
brokerRole確定了主從同步是異步的還是同步的,flushDiskType確定了數據刷盤的方式是同步的還是異步的。
如果業務場景對消息丟失容忍度很低,可以采用SYNC_MASTER + ASYNC_FLUSH的方式,這樣只有master和slave在刷盤前同時掛掉,消息才會丟失,也就是說即使有一臺機器出故障,仍然能保證數據不丟;
如果業務場景對消息丟失容忍度比較高,則可以采用ASYNC_MASTER + ASYNC_FLUSH的方式,這樣可以盡可能的提高消息的吞吐量。
Master Broker支持讀和寫,Slave Broker只支持讀。
當Master不可用的時候,Consumer會自動切換到Slave進行讀,也就是說,當Master節點的機器出現故障后,Consumer仍然可以從Slave節點讀取消息,不影響消費端的消費程序。
集群配置參數說明:
- brokerName: broker的名稱,需要把Master和Slave節點配置成相同的名稱,表示他們的主從關系,相同的brokerName的一組broker,組成一個broker組;
- brokerId: broker的id,0表示Master節點的id,大于0表示Slave節點的id。
在RocketMQ中,機器的主從節點關系是提前配置好的,沒有類似Kafka的Master動態選主功能。
如果一個Master宕機了,要讓生產端程序繼續可以生產消息,您需要部署多個Master節點,組成多個broker組。這樣在創建Topic的時候,就可以把Topic的不同消息隊列分布在多個broker組中,即使某一個broker組的Master節點不可用了,其他組的Master節點仍然可用,保證了Producer可以繼續發送消息。
為了盡可能的保證消息不丟失
,并且保證生產者和消費者的可用性
,我們可以構建一個雙主雙從的集群,搭建的架構圖如下所示:
部署架構說明:
以下是關鍵的配置參數:
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.100# broker通信端口listenPort=10911# broker名稱brokerName=broker‐1# 0表示主節點brokerId=0# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 主從同步復制brokerRole=SYNC_MASTER# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.101# broker通信端口listenPort=10911# broker名稱brokerName=broker‐1 # 非0表示從節點brokerId=1# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 從節點brokerRole=SLAVE# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創建TopicautoCreateTopicEnable=true # 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐s
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.102# broker通信端口listenPort=10911# broker名稱brokerName=broker‐2# 0表示主節點brokerId=0# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 主從同步復制brokerRole=SYNC_MASTER# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m
# NameServer地址namesrvAddr=192.168.1.100:9876;192.168.1.101:9876# 集群名稱brokerClusterName=itzhai-com-cluster# brokerIP地址brokerIP1=192.168.1.103# broker通信端口listenPort=10911# broker名稱brokerName=broker‐2# 非0表示從節點brokerId=1# 2點進行消息刪除deleteWhen=02# 消息在磁盤上保留48小時fileReservedTime=48# 從節點brokerRole=SLAVE# 異步刷盤flushDiskType=ASYNC_FLUSH# 自動創建TopicautoCreateTopicEnable=true# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐s
寫了那么多頂層架構圖,不寫寫底層內幕,就不是IT宅(itzhai.com)的文章風格,接下來,我們就來看看底層存儲架構。
我們在broker.conf
文件中配置了消息存儲的根目錄:
# 消息存儲根目錄storePathRootDir=/data/rocketmq/store‐m
進入這個目錄,我們可以發現如下的目錄結構:
其中:
下面我們來看看關鍵的commitlog以及consumequeue:
消息投遞到Broker之后,是先把實際的消息內容存放到CommitLog中的,然后再把消息寫入到對應主題的ConsumeQueue中。其中:
CommitLog:消息的物理存儲文件,存儲實際的消息內容。每個Broker上面的CommitLog被該Broker上所有的ConsumeQueue共享。
單個文件大小默認為1G,文件名長度為20位,左邊補零,剩余為起始偏移量。預分配好空間,消息順序寫入日志文件。當文件滿了,則寫入下一個文件,下一個文件的文件名基于文件第一條消息的偏移量進行命名;
ConsumeQueue:消息的邏輯隊列,相當于CommitLog的索引文件。RocketMQ是基于Topic主題訂閱模式實現的,每個Topic下會創建若干個邏輯上的消息隊列ConsumeQueue,在消息寫入到CommitLog之后,通過Broker的后臺服務線程(ReputMessageService)不停地分發請求并異步構建ConsumeQueue和IndexFile(索引文件,后面介紹),然后把每個ConsumeQueue需要的消息記錄到各個ConsumeQueue中。
ConsumeQueue主要記錄8個字節的commitLogOffset(消息在CommitLog中的物理偏移量), 4個字節的msgSize(消息大小), 8個字節的TagHashcode,每個元素固定20個字節。
ConsumeQueue相當于CommitLog文件的索引,可以通過ConsumeQueue快速從很大的CommitLog文件中快速定位到需要的消息。
主題消息隊列:在consumequeue目錄下,按照topic的維度存儲消息隊列。
重試消息隊列:如果topic中的消息消費失敗,則會把消息發到重試隊列,重新隊列按照消費端的GroupName來分組,命名規則:%RETRY%ConsumerGroupName
死信消息隊列:如果topic中的消息消費失敗,并且超過了指定重試次數之后,則會把消息發到死信隊列,死信隊列按照消費端的GroupName來分組,命名規則:%DLQ%ConsumerGroupName
假設我們現在有一個topic:itzhai-test
,消費分組:itzhai_consumer_group
,當消息消費失敗之后,我們查看consumequeue目錄,會發現多處了一個重試隊列:
我們可以在RocketMQ的控制臺看到這個重試消息隊列的主題和消息:
如果一直重試失敗,達到一定次數之后(默認是16次,重試時間:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h),就會把消息投遞到死信隊列:
每條消息的長度是不固定的,為了提高寫入的效率,RocketMQ預先分配好1G空間的CommitLog文件,采用順序寫的方式寫入消息,大大的提高寫入的速度。
RocketMQ中消息刷盤主要可以分為同步刷盤和異步刷盤兩種,通過flushDiskType參數進行配置。如果需要提高寫消息的效率,降低延遲,提高MQ的性能和吞吐量,并且不要求消息數據存儲的高可靠性,可以把刷盤策略設置為異步刷盤。
為了提高讀取的效率,RocketMQ使用ConsumeQueue作為消費消息的索引,使用IndexFile作為基于消息key的查詢的索引。下面來詳細介紹下。
讀取消息是隨機讀的,為此,RocketMQ專門建立了ConsumeQueue索引文件,每次先從ConsumeQueue中獲取需要的消息的地址,消息大小,然后從CommitLog文件中根據地址直接讀取消息內容。在讀取消息內容的過程中,也盡量利用到了操作系統的頁緩存機制,進一步加速讀取速度。
ConsumeQueue由于每個元素大小是固定的,因此可以像訪問數組一樣訪問每個消息元素。并且占用空間很小,大部分的ConsumeQueue能夠被全部載入內存,所以這個索引查找的速度很快。每個ConsumeQueue文件由30w個元素組成,占用空間在6M以內。每個文件默認大小為600萬個字節,當一個ConsumeQueue類型的文件寫滿之后,則寫入下一個文件。
我們在RocketMQ的store目錄中可以發現有一個index目錄,這個是一個用于輔助提高查詢消息效率的索引文件。通過該索引文件實現基于消息key來查詢消息的功能。
IndexFile索引文件物理存儲結構如下圖所示:
beginTimestamp
:索引文件中第一個索引消息存入Broker的時間戳;endTimestamp
:索引文件中最后一個索引消息存入Broker的時間戳beginPHYOffset
:索引文件中第一個索引消息在CommitLog中的偏移量;endPhyOffset
:索引文件中最后一個索引消息在CommitLog中的偏移量;hashSlotCount
:構建索引使用的slot數量;indexCount
:索引的總數;Key Hash
:消息的哈希值;Commit Log Offset
:消息在CommitLog中的偏移量;Timestamp
:消息存儲的時間戳;Next Index Offset
:下一個索引的位置,如果消息取模后發生槽位槽位碰撞,則通過此字段把碰撞的消息構成鏈表。每個IndexFile文件的大小:40b + 4b * 5000000 + 20b * 20000000 = 420000040b,約為400M。
IndexFile索引文件的邏輯存儲結構如下圖所示:
IndexFile邏輯上是基于哈希表來實現的,Slot Table為哈希鍵,Index Linked List中存儲的為哈希值。
RocketMQ中的MessageId的長度總共有16字節,其中包含了:消息存儲主機地址(IP地址和端口),消息Commit Log offset。“
按照MessageId查詢消息的流程:Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個RPC請求后通過Remoting通信層發送(業務請求碼:VIEW_MESSAGE_BY_ID)。Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個完整的消息返回。
我們繼續看看在集群模式下,RocketMQ的Topic數據是如何做分區的。IT宅(itzhai.com)提醒大家,實踐出真知。這里我們部署兩個Master節點:
我們通過手動配置每個Broker中的Topic,以及ConsumeQueue數量,來實現Topic的數據分片,如,我們到集群中手動配置這樣的Topic:
broker-a
創建itzhai-com-test-1
,4個隊列;broker-b
創建itzhai-com-test-1
,2個隊列。創建完成之后,Topic分片集群分布如下:
即:
可以發現,RocketMQ是把Topic分片存儲到各個Broker節點中,然后在把Broker節點中的Topic繼續分片為若干等分的ConsumeQueue,從而提高消息的吞吐量。ConsumeQueue是作為負載均衡資源分配的基本單元。
這樣把Topic的消息分區到了不同的Broker上,從而增加了消息隊列的數量,從而能夠支持更塊的并發消費速度(只要有足夠的消費者)。
假設設置為通過Broker自動創建Topic(autoCreateTopicEnable=true),并且Producer端設置Topic消息隊列數量設置為4,也就是默認值:
producer.setDefaultTopicQueueNums(4);
嘗試往一個新的 topic itzhai-test-queue-1
連續發送10條消息,發送完畢之后,查看Topic狀態:
我們可以發現,在兩個broker上面都創建了itzhai-test-queue-a
,并且每個broker上的消息隊列數量都為4。怎么回事,我配置的明明是期望創建4個隊列,為什么加起來會變成了8個?如下圖所示:
由于時間關系,本文我們不會帶大家從源碼方面去解讀為啥會出現這種情況,接下來我們通過一種更加直觀的方式來驗證下這個問題:繼續做實驗。
我們繼續嘗試往一個新的 topic itzhai-test-queue-10
發送1條消息,注意,這一次不做并發發送了,只發送一條,發送完畢之后,查看Topic狀態:
可以發現,這次創建的消息隊列數量又是對的了,并且都是在broker-a上面創建的。接下來,無論怎么并發發送消息,消息隊列的數量都不會繼續增加了。
其實這也是并發請求Broker,觸發自動創建Topic的bug。
為了更加嚴格的管理Topic的創建和分片配置,一般在生產環境都是配置為手動創建Topic,通過提交運維工單申請創建Topic以及Topic的數據分配。
接下來我們來看看RocketMQ的特性。更多其他技術的底層架構內幕分析,請訪問我的博客IT宅(itzhai.com)或者關注Java架構雜談公眾號。
RocketMQ中定義了如下三種消息通信的方式:
public enum CommunicationMode { SYNC, ASYNC, ONEWAY,}
SYNC
:同步發送,生產端會阻塞等待發送結果;ASYNC
:異步發送,生產端調用發送API之后,立刻返回,在拿到Broker的響應結果后,觸發對應的SendCallback回調;ONEWAY
:單向發送,發送方只負責發送消息,不等待服務器回應且沒有回調函數觸發,即只發送請求不等待應答。 此方式發送消息的過程耗時非常短,一般在微秒級別;SYNC和ASYNC關注發送結果,ONEWAY不關注發送結果。發送結果如下:
public enum SendStatus { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE,}
SEND_OK
:消息發送成功。SEND_OK并不意味著投遞是可靠的,要確保消息不丟失,需要開啟SYNC_MASTER同步或者SYNC_FLUSH同步寫;FLUSH_DISK_TIMEOUT
:消息發送成功,但是刷盤超時。如果Broker的flushDiskType=SYNC_FLUSH,并且5秒內沒有完成消息的刷盤,則會返回這個狀態;FLUSH_SLAVE_TIMEOUT
:消息發送成功,但是服務器同步到Slave時超時。如果Broker的brokerRole=SYNC_MASTER,并且5秒內沒有完成同步,則會返回這個狀態;SLAVE_NOT_AVAILABLE
:消息發送成功,但是無可用的Slave節點。如果Broker的brokerRole=SYNC_MASTER,但是沒有發現SLAVE節點或者SLAVE節點掛掉了,那么會返回這個狀態。源碼內容更精彩,歡迎大家進一步閱讀源碼詳細了解消息發送的內幕:
- 同步發送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message)
- 異步發送:org.apache.rocketmq.client.producer.DefaultMQProducer#send(org.apache.rocketmq.common.message.Message, org.apache.rocketmq.client.producer.SendCallback)
- 單向發送:org.apache.rocketmq.client.producer.DefaultMQProducer#sendOneway(org.apache.rocketmq.common.message.Message)
消息的有序性指的是一類消息消費的時候,可以按照發送順序來消費,比如:在Java架構雜談
茶餐廳吃飯產生的消息:進入餐廳、點餐、下單、上菜、付款,消息要按照這個順序消費才有意義,但是多個顧客產生的消息是可以并行消費的。順序消費又分為全局順序消費和分區順序消費:
全局順序
:同一個Topic下的消息,所有消息按照嚴格的FIFO順序進行發布和消費。適用于:性能要求不高,所有消息嚴格按照FIFO進行發布和消費的場景;分區順序
:同一個Topic下,根據消息的特定業務ID進行sharding key分區,同一個分區內的消息按照嚴格的FIFO順序進行發布和消費。適用于:性能要求高,在同一個分區中嚴格按照FIFO進行發布和消費的場景。一般情況下,生產者是會以輪訓的方式把消息發送到Topic的消息隊列中的:
在同一個Queue里面,消息的順序性是可以得到保證的,但是如果一個Topic有多個Queue,以輪訓的方式投遞消息,那么就會導致消息亂序了。
為了保證消息的順序性,需要把保持順序性的消息投遞到同一個Queue中。
RocketMQ提供了MessageQueueSelector
接口,可以用來實現自定義的選擇投遞的消息隊列的算法:
for (int i = 0; i < orderList.size(); i++) { String content = "Hello itzhai.com. Java架構雜談," + new Date(); Message msg = new Message("topic-itzhai-com", tags[i % tags.length], "KEY" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET)); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List mqs, Message msg, Object arg) { Long orderId = (Long) arg; // 訂單號與消息隊列個數取模,保證讓同一個訂單號的消息落入同一個消息隊列 long index = orderId % mqs.size(); return mqs.get((int) index); } }, orderList.get(i).getOrderId()); System.out.printf("content: %s, sendResult: %s%n", content, sendResult);}
如上圖,我們實現了MessageQueueSelector
接口,并在實現的select方法里面,指定了選擇消息隊列的算法:訂單號與消息隊列個數取模,保證讓同一個訂單號的消息落入同一個消息隊列:
有個異常場景需要考慮:假設某一個Master節點掛掉了,導致Topic的消息隊列數量發生了變化,那么繼續使用以上的選擇算法,就會導致在這個過程中同一個訂單的消息會分散到不同的消息隊列里面,最終導致消息不能順序消費。
為了避免這種情況,只能選擇犧牲failover特性了。
現在投遞到消息隊列中的消息保證了順序,那如何保證消費也是順序的呢?
RocketMQ中提供了MessageListenerOrderly
,該對象用于有順序收異步傳遞的消息,一個隊列對應一個消費線程,使用方法如下:
consumer.registerMessageListener(new MessageListenerOrderly() { // 消費次數,用于輔助模擬各種消費結果 AtomicLong consumeTimes = new AtomicLong(0); @Override public ConsumeOrderlyStatus consumeMessage(List msgs, ConsumeOrderlyContext context) { context.setAutoCommit(true); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs); this.consumeTimes.incrementAndGet(); if ((this.consumeTimes.get() % 2) == 0) { return ConsumeOrderlyStatus.SUCCESS; } else if ((this.consumeTimes.get() % 3) == 0) { return ConsumeOrderlyStatus.ROLLBACK; } else if ((this.consumeTimes.get() % 4) == 0) { return ConsumeOrderlyStatus.COMMIT; } else if ((this.consumeTimes.get() % 5) == 0) { context.setSuspendCurrentQueueTimeMillis(3000); return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT; } return ConsumeOrderlyStatus.SUCCESS; }});
如果您使用的是MessageListenerConcurrently
,表示并發消費,為了保證消息消費的順序性,需要設置為單線程模式。
使用
MessageListenerOrderly
的問題:如果遇到某條消息消費失敗,并且無法跳過,那么消息隊列的消費進度就會停滯。
定時消費是指消息發送到Broker之后不會立即被消費,而是等待特定的時間之后才投遞到Topic中。定時消息會暫存在名為SCHEDULE_TOPIC_XXXX
的topic中,并根據delayTimeLevel存入特定的queue,queueId=delayTimeLevel-1,一個queue只存相同延遲的消息,保證具有相同延遲的消息能夠順序消費。比如,我們設置1秒后把消息投遞到topic-itzhai-com
topic,則存儲的文件目錄如下所示:
Broker會調度地消費SCHEDULE_TOPIC_XXXX,將消息寫入真實的topic。
定時消息的副作用:定時消息會在第一次寫入Topic和調度寫入實際的topic都會進行計數,因此發送數量,tps都會變高。
使用延遲隊列的場景:提交了訂單之后,如果等待超過約定的時間還未支付,則把訂單設置為超時狀態。
RocketMQ提供了以下幾個固定的延遲級別:
public class MessageStoreConfig { ... // 10個level,level:1~18 private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"; ...}
level = 0 表示不使用延遲消息。
另外,消息消費失敗也會進入延遲隊列,消息發送時間與設置的延遲級別和重試次數有關。
以下是發送延遲消息的代碼:
public class ScheduledMessageProducer { public static void main(String[] args) throws Exception { DefaultMQProducer producer = new DefaultMQProducer("TestProducerGroup"); producer.start(); int totalMessagesToSend = 100; for (int i = 0; i < totalMessagesToSend; i++) { Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes()); // 指定該消息在10秒后被消費者消費 message.setDelayTimeLevel(3); producer.send(message); } producer.shutdown(); }}
通過消息對系統進行解耦之后,勢必會遇到分布式系統數據完整性的問題。
我們可以通過以下手段解決分布式系統數據最終一致性問題:
2PC(Two-phase commit protocol)
,二階段提交,同步阻塞,效率低下,存在協調者單點故障問題,極端情況下存在數據不一致的風險。對應技術上的XA、JTA/JTS。這是分布式環境下事務處理的典型模式;3PC
,三階段提交,引入了參與者超時機制,增加了預提交階段,使得故障恢復之后協調者的決策復雜度降低,但整體的交互過程變得更長了,性能有所下降,仍舊會存在數據不一致的問題;Try - Confirm - Cancel
。對業務的侵入較大,和業務緊耦合,對于每一個操作都需要定義三個動作分別對應:Try - Confirm - Cancel
,將資源層的兩階段提交協議轉換到業務層,成為業務模型中的一部分;RocketMQ事務消息(Transactional Message)則是通過事務消息來實現分布式事務的最終一致性。下面看看RocketMQ是如何實現事務消息的。
如下圖:
事務消息有兩個流程:
補償階段主要用于解決消息的Commit或者Rollback發生超時或者失敗的情況。
half消息:并不是發送了一半的消息,而是指消息已經發送到了MQ Server,但是該消息未收到生產者的二次確認,此時該消息暫時不能投遞到具體的ConsumeQueue中,這種狀態的消息稱為half消息。
發送到MQ Server的half消息對消費者是不可見的,為此,RocketMQ會先把half消息的Topic和Queue信息存儲到消息的屬性中,然后把該half消息投遞到一個專門的處理事務消息的隊列中:RMQ_SYS_TRANS_HALF_TOPIC
,由于消費者沒有訂閱該Topic,所以無法消息half類型的消息。
生產者執行Commit half消息的時候,會存儲一條專門的Op消息,用于標識事務消息已確定的狀態,如果一條事務消息還沒有對應的Op消息,說明這個事務的狀態還無法確定。RocketMQ會開啟一個定時任務,對于pending狀態的消息,會先向生產者發送回查事務狀態請求,根據事務狀態來決定是否提交或者回滾消息。
當消息被標記為Commit狀態之后,會把half消息的Topic和Queue相關屬性還原為原來的值,最終構建實際的消費索引(ConsumeQueue)。
RocketMQ并不會無休止的嘗試消息事務狀態回查,默認查找15次,超過了15次還是無法獲取事務狀態,RocketMQ默認回滾該消息。并打印錯誤日志,可以通過重寫AbstractTransactionalMessageCheckListener類修改這個行為。
可以通過Broker的配置參數:transactionCheckMax來修改此值。
如果消息發布方式是同步發送會重投,如果是異步發送會重試。
消息重投可以盡可能保證消息投遞成功,但是可能會造成消息重復。
什么情況會造成重復消費消息?
可以使用的消息重試策略:
retryTimesWhenSendFailed
:設置同步發送失敗的重投次數,默認為2。所以生產者最多會嘗試發送retryTimesWhenSendFailed+1次。retryTimesWhenSendAsyncFailed
:設置異步發送失敗重試次數,異步重試不會選擇其他Broker,不保證消息不丟失;retryAnotherBrokerWhenNotStoreOK
:消息刷盤(主或備)超時或slave不可用(返回狀態非SEND_OK),是否嘗試發送到其他broker,默認false。重要的消息可以開啟此選項。oneway發布方式不支持重投。
為了提高系統的吞吐量,提高發送效率,可以使用批量發送消息。
批量發送消息的限制:
發送批量消息的例子:
String topic = "itzhai-test-topic";List messages = new ArrayList<>();messages.add(new Message(topic, "TagA", "OrderID001", "Hello world itzhai.com 0".getBytes()));messages.add(new Message(topic, "TagA", "OrderID002", "Hello world itzhai.com 1".getBytes()));messages.add(new Message(topic, "TagA", "OrderID003", "Hello world itzhai.com 2".getBytes()));producer.send(messages);
如果發送的消息比較多,會增加復雜性,為此,可以對大消息進行拆分。以下是拆分的例子:
public class ListSplitter implements Iterator> { // 限制最大大小 private final int SIZE_LIMIT = 1024 * 1024 * 4; private final List messages; private int currIndex; public ListSplitter(List messages) { this.messages = messages; } @Override public boolean hasNext() { return currIndex < messages.size(); } @Override public List next() { int startIndex = getStartIndex(); int nextIndex = startIndex; int totalSize = 0; for (; nextIndex < messages.size(); nextIndex++) { Message message = messages.get(nextIndex); int tmpSize = calcMessageSize(message); if (tmpSize + totalSize > SIZE_LIMIT) { break; } else { totalSize += tmpSize; } } List subList = messages.subList(startIndex, nextIndex); currIndex = nextIndex; return subList; } private int getStartIndex() { Message currMessage = messages.get(currIndex); int tmpSize = calcMessageSize(currMessage); while(tmpSize > SIZE_LIMIT) { currIndex += 1; Message message = messages.get(curIndex); tmpSize = calcMessageSize(message); } return currIndex; } private int calcMessageSize(Message message) { int tmpSize = message.getTopic().length() + message.getBody().length(); Map properties = message.getProperties(); for (Map.Entry entry : properties.entrySet()) { tmpSize += entry.getKey().length() + entry.getValue().length(); } tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes return tmpSize; }}// then you could split the large list into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) { try { List listItem = splitter.next(); producer.send(listItem); } catch (Exception e) { e.printStackTrace(); // handle the error }}
RocketMQ的消費者可以根據Tag進行消息過濾來獲取自己感興趣的消息,也支持自定義屬性過濾。
Tags是Topic下的次級消息類型/二級類型(注:Tags也支持TagA || TagB
這樣的表達式),可以在同一個Topic下基于Tags進行消息過濾。
消息過濾是在Broker端實現的,減少了對Consumer無用消息的網絡傳輸,缺點是增加了Broker負擔,實現相對復雜。
消費端有兩周消費模型:集群消費和廣播消費。
集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。
廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
RocketMQ會為每個消費組都設置一個Topic名稱為%RETRY%consumerGroupName
的重試隊列(這里需要注意的是,這個Topic的重試隊列是針對消費組,而不是針對每個Topic設置的),用于暫時保存因為各種異常而導致Consumer端無法消費的消息。
考慮到異常恢復起來需要一些時間,會為重試隊列設置多個重試級別,每個重試級別都有與之對應的重新投遞延時,重試次數越多投遞延時就越大。
RocketMQ對于重試消息的處理是先保存至Topic名稱為SCHEDULE_TOPIC_XXXX
的延遲隊列中,后臺定時任務按照對應的時間進行Delay后重新保存至%RETRY%consumerGroupName
的重試隊列中。
比如,我們設置1秒后把消息投遞到topic-itzhai-com
topic,則存儲的文件目錄如下所示:
當一條消息初次消費失敗,消息隊列會自動進行消息重試;達到最大重試次數后,若消費依然失敗,則表明消費者在正常情況下無法正確地消費該消息,此時,消息隊列不會立刻將消息丟棄,而是將其發送到該消費者對應的特殊隊列中。
RocketMQ將這種正常情況下無法被消費的消息稱為死信消息(Dead-Letter Message)
,將存儲死信消息的特殊隊列稱為死信隊列(Dead-Letter Queue)
。
在RocketMQ中,可以通過使用console控制臺對死信隊列中的消息進行重發來使得消費者實例再次進行消費。
由于RocketMQ是使用Java寫的,所以它的代碼特別適合拿來閱讀消遣,我們繼續來看看RocketMQ的源碼結構...
不不,還是算了,一下子又到周末晚上了,時間差不多了,今天就寫到這里了。有空再聊。
我精心整理了一份Redis寶典給大家,涵蓋了Redis的方方面面,面試官懂的里面有,面試官不懂的里面也有,有了它,不怕面試官連環問,就怕面試官一上來就問你Redis的Redo Log是干啥的?畢竟這種問題我也不會。
在Java架構雜談
公眾號發送Redis
關鍵字獲取pdf文件:
本文作者: arthinking
博客鏈接: https://www.itzhai.com/articles/deep-understanding-of-rocketmq.html
高并發異步解耦利器:RocketMQ究竟強在哪里?
版權聲明: 版權歸作者所有,未經許可不得轉載,侵權必究!聯系作者請加公眾號。
apache/rocketmq. Retrieved from https://github.com/apache/rocketmq
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/124531.html
摘要:熱點隨筆入門實現跨框架組件復用葡萄城技術團隊二工作三年的一些感悟百萬級大數據插入更新,支持多種數據庫果糖大數據科技被下屬罵,記一次矛盾升級有心無心,蝴蝶效應葉小釵中的鑒權授權正確方式包子推薦一款顏值逆天且功能齊全的開源工具鉑賽東開源免費圖熱點隨筆:·?Svelte入門——Web Components實現跨框架組件復用?(葡萄城技術團隊)·?(二)工作三年的一些感悟?(Craftsman-L)...
摘要:故事中的下屬們,就是消息生產者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調度中心,而你就是消息消費者角色。下屬們匯報的消息,應該疊放在哪里,這個消息又應該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準確的被投放以及消費。 微信公眾號:IT一刻鐘大型現實非嚴肅主義現場一刻鐘與你分享優質技術架構與見聞,做一個有劇情的程序員關注可了解更多精彩內容。問題或建議,請公眾號留言...
摘要:故事中的下屬們,就是消息生產者角色,屋子右面墻根那塊地就是消息持久化,呂秀才就是消息調度中心,而你就是消息消費者角色。下屬們匯報的消息,應該疊放在哪里,這個消息又應該在哪里才能找到,全靠呂秀才的驚人記憶力,才可以讓消息準確的被投放以及消費。 微信公眾號:IT一刻鐘大型現實非嚴肅主義現場一刻鐘與你分享優質技術架構與見聞,做一個有劇情的程序員關注可了解更多精彩內容。問題或建議,請公眾號留言...
摘要:數量對吞吐量的影響可以達到幾百幾千個的級別,吞吐量會有小幅度的下降。這是的一大優勢,可在同等數量機器下支撐大量的從幾十個到幾百個的時候,吞吐量會大幅下降。下一篇如何保證消息隊列的高可用 1.為什么使用消息隊列? (1)解耦:可以在多個系統之間進行解耦,將原本通過網絡之間的調用的方式改為使用MQ進行消息的異步通訊,只要該操作不是需要同步的,就可以改為使用MQ進行不同系統之間的聯系,這樣項目之間...
閱讀 3641·2021-11-23 09:51
閱讀 1990·2021-11-16 11:42
閱讀 3234·2021-11-08 13:20
閱讀 1097·2019-08-30 15:55
閱讀 2205·2019-08-30 10:59
閱讀 1239·2019-08-29 14:04
閱讀 1017·2019-08-29 12:41
閱讀 2006·2019-08-26 12:22