RabbitMQ
erlang開發,對消息堆積的支持并不好,當大量消息積壓的時候,會導致RabbitMQ的性能急劇下降。每秒鐘可以處理幾萬到十幾萬條消息。
RocketMQ
Java開發,面向??互聯網集群化?
?,功能豐富,對在線業務的響應時延做了很多的優化,大多數情況下可以做到毫秒級的響應,每秒鐘大概能處理幾十萬條消息。
Kafka
Scala開發,面向??日志?
?,功能豐富,性能最高。當你的業務場景中,每秒鐘消息數量沒有那么多的時候,Kafka 的時延反而會比較高。所以,Kafka 不太適合在線業務場景。
ActiveMQ
Java開發,簡單,穩定,性能不如前面三個。小項目可以選擇。
RocketMq組成部分有哪些?
Nameserver
無狀態,動態列表;這也是和zookeeper的重要區別之一。zookeeper是有狀態的。
Producer
消息生產者,負責發消息到Broker。
Broker
就是MQ本身,負責收發消息、持久化消息等。
Consumer
消息消費者,負責從Broker上拉取消息進行消費,消費完進行ack。
RocketMq消費模式有幾種?
集群消費
一條消息只會被同Group中的一個Consumer消費。
多個Group同時消費一個Topic時,每個Group都會有一個Consumer消費到數據
廣播消費
消息將對一個Consumer Group下的各個Consumer實例都消費一遍。即使這些Consumber屬于同一個Consumer Group,消息也會被Consumer Group中的每個Consumer都消費一次。
消費重復消費如何解決?
出現原因:正常情況下在在consumer真正消費完消息后應該發送ack,通知broker該消息已正常消費,從queue中刪除。當ack因為網絡原因無法發送到broker,broker會認為詞條消息沒有被消費,此后會開啟消息重投機制把消息再次投遞到consumer。
消費模式:在CLUSTERING模式下,消息在broker中會保證相同group的consumer消費一次,但是針對不同的group的consumer會推送多次。
解決方案
a.數據庫表:處理消息前,使用消息主鍵在表中帶有約束的字段中insert
b.map:單機時可以使用map做限制,消費時查詢當前消息id是不是已經存在
c.redis:使用分布式鎖
rocketmq如何保證消息的順序消費?
首先多個queue只能保證單個queue里的順序,queue是典型的FIFO,天然順序。多個queue同時消費是無法絕對保證消息的有序性的。
可以使用同一個topic,同一個QUEUE,發消息的時候一個線程去發送消息,消費的時候一個線程去消費一個queue里的消息。
RocketMQ如何保證消息不丟失?
producer端
采用send()同步發消息,發送結果是同步感知的。發送失敗后可以重試,設置重試次數。默認3次。
broker端
修改刷盤策略為同步刷盤。默認情況下是異步刷盤的。
集群部署
Consumer端
完全消費正常后在進行手動ack確認。
RocketMq如何實現分布式事務?
1、生產者向MQ服務器發送half消息。
2、half消息發送成功后,MQ服務器返回確認消息給生產者。
3、生產者開始執行本地事務。
4、根據本地事務執行的結果(UNKNOW、commit、rollback)向MQ server發送提交或回滾消息。
5、如果錯過了(可能因為網絡異常、生產者突然宕機等導致的異常情況)提交/回滾消息,則MQ服務器將向同一組中的每個生產者發送回查消息以獲取事務狀態。
6、回查生產者本地事務狀態。
7、生產者根據本地事務狀態發送提交/回滾消息。
8、MQ服務器將丟棄的回滾的消息,但已提交(進行過二次確認的half消息)的消息將投遞給消費者進行消費。
?? ? 超時:如果超過回查次數,默認回滾消息。 也就是他并未真正進入Topic的queue,而是用了臨時queue來放所謂的? ?1、如果可以添加消費者解決,就添加消費者的數據量 2、如果出現了queue,但是消費者多的情況。可以使用準備一個臨時的topic,同時創建一些queue,在臨時創建一個消費者來把這些消息轉移到topic中,讓消費者消費。?Half Message?
?:預處理消息,當broker收到此類消息后,會存儲到??RMQ_SYS_TRANS_HALF_TOPIC?
?的消息消費隊列中?檢查事務狀態?
?:Broker會開啟一個定時任務,消費??RMQ_SYS_TRANS_HALF_TOPIC?
?隊列中的消息,每次執行任務會向消息發送者確認事務執行狀態(提交、回滾、未知),如果是未知,Broker會定時去回調在重新檢查。?half message?
?,等提交事務后才會真正的將half message轉移到topic下的queue。RocketMQ的消息堆積如何處理?