摘要:所以基于目前的設(shè)計,建議關(guān)閉自動創(chuàng)建的功能,然后根據(jù)消息量的大小,手動創(chuàng)建。如果發(fā)送消息,返回結(jié)果超時,這種超時不會進行重試了如果是方法本身耗時超過,還未來得及調(diào)用發(fā)送消息,此時的超時也不會重試。
先來看下producer核心的類設(shè)計,如下圖:
1、核心發(fā)布消息的類DefaultMQProducer,繼承自MQProducer接口,此接口定義了一系列發(fā)送消息的方法,如普通消息,順序消息,延時消息等,最終進行網(wǎng)絡(luò)通信會交給MQClientAPIImpl處理。
2、rocketmq從4.1.3版本開始又支持了事務消息,由TransactionMQProducer類提供(之后會有專門的文章進行詳細解讀事務消息)
producer之配置我們看到DefaultMQProducer繼承了一個客戶端的公共配置類ClientConfig(與consumer公用),其實就是一個普通的javaBean,既可以代碼中設(shè)置屬性,也可以集成spring來配置
參數(shù)名 | 默認值 | 說明 |
---|---|---|
namesrvAddr | 無 | nameserver的地址列表,用分號隔開 |
clientIP | 本機ip地址 | 客戶端ip地址,有時候無法識別,需要手動配置 |
instanceName | DEFAULT | 客戶端實例名稱,客戶端創(chuàng)建的多個 Producer、Consumer 實際是共用一個內(nèi)部實例(這個實例包含網(wǎng)絡(luò)連接、線程資源等) |
clientCallbackExecutorThreads | cpu核數(shù) | 通信層客戶端處理請求的線程數(shù) |
pollNameServerInterval | 30000 | 輪詢nameserver的時間間隔,單位ms |
heartbeatBrokerInterval | 30000 | 向broker發(fā)送心跳的時間間隔,單位ms |
persistConsumerOffsetInterval | 5000 | 持久化 Consumer 消費進度間隔時間,單位ms |
producer獨有的配置:
參數(shù)名 | 默認值 | 說明 |
---|---|---|
producerGroup | DEFAULT_PRODUCER | Producer組名,相同分組的producer應該有相同的發(fā)送消息邏輯 |
createTopicKey | AUTO_CREATE_TOPIC_KEY | 自動創(chuàng)建topic時,以此默認topic為模板創(chuàng)建指定topic |
defaultTopicQueueNums | 4 | 自動創(chuàng)建topic隊列數(shù)量 |
sendMsgTimeout | 3000 | 發(fā)送消息的超時時間,單位ms |
compressMsgBodyOverHowmuch | 4098 | 消息體超過多大會進行壓縮,單位字節(jié) |
retryTimesWhenSendFailed | 2 | 同步發(fā)送消息,發(fā)送失敗重試次數(shù) |
retryTimesWhenSendAsyncFailed | 2 | 異步發(fā)送消息,發(fā)送失敗的重試次數(shù) |
retryAnotherBrokerWhenNotStoreOK | false | 同步發(fā)送消息,消息存儲失敗是否重試其他broker |
maxMessageSize | 4194304 | 客戶端限制消息的大小,默認4M |
TransactionListener | 事務消息時,必須設(shè)置的回查監(jiān)聽器 |
我們在創(chuàng)建producer時必須要指定一個group,這里有兩個作用:
生產(chǎn)者一般會是集群部署的,group用來標識一類生產(chǎn)者,相同group的生產(chǎn)者一般要有相同的發(fā)送邏輯。
在發(fā)送事務消息時,當事務消息異常,broker端來回查事務狀態(tài)時,需要知道是由哪類生產(chǎn)者發(fā)送的事務消息,生產(chǎn)端會根據(jù)group名稱來查找對應的producer來執(zhí)行相應的回查邏輯。
producer的啟動流程簡單說明下整個啟動流程:
1、首先在DefaultMQProducerImpl中會做一些參數(shù)校驗,如group是否合法;然后會創(chuàng)建MQClientInstance實例,此實例包含網(wǎng)絡(luò)連接、線程資源等,相同的clientId會共享此實例,所以通過MQClientManager來管理。
2、核心的啟動流程在MQClientInstance類中,如果nameserver地址沒有配置的話,會先通過靜態(tài)的http服務器地址去抓取nameserver的地址;再則啟動netty客戶端。
3、啟動一些定時任務,跟producer有關(guān)的如下幾個:
如果producer沒有配置nameserver地址,啟動定時抓取nameserver的地址的定時任務,任務延時10s開始,每隔2分支執(zhí)行一次。
輪詢nameserver定時任務,主要是定時更新topic的路由信息,任務延時10ms開始,每隔30s執(zhí)行一次。
清除下線的broker和向broker發(fā)送心跳,任務延時1s執(zhí)行,每隔30s執(zhí)行一次
Producer如何尋址RocketMQ 有多種配置方式可以令客戶端找到 NameServer, 然后通過 NameServer 再找到 Broker,分別如下,
優(yōu)先級由高到低,高優(yōu)優(yōu)先級會覆蓋低優(yōu)先級
1、代碼中指定 Name Server 地址
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
2、啟動參數(shù)指定
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
3、環(huán)境變量指定 Name Server 地址
export NAMESRV_ADDR=192.168.0.1:9876;192.168.0.2:9876
4、HTTP 靜態(tài)服務器尋址(默認)
如果以上三種都沒有設(shè)置name server的地址,客戶端啟動后先會訪問一個靜態(tài)http服務器獲取name server的地址,然后會啟動一個定時任務訪問這個靜態(tài) HTTP 服務器,地址如下:
http://jmenv.tbsite.net:8080/rocketmq/nsaddr
這是默認的地址,當然你也可以更改,做如下設(shè)置:
代碼:
System.setProperty("rocketmq.namesrv.domain","localhost"); System.setProperty("rocketmq.namesrv.domain.subgroup","nameServer")
或者啟動參數(shù)指定:
-Drocketmq.namesrv.domain=localhost -Drocketmq.namesrv.domain.subgroup=nameServer
以上設(shè)置后http服務器地址就變成:
http://localhsot:8080/rocketmq/nameServer
這個 URL 的返回內(nèi)容格式如下:
192.168.0.1:9876;192.168.0.2:9876
客戶端每隔 2 分鐘訪問一次這個 HTTP 服務器,并更新本地的 Name Server 地址。
推薦使用 HTTP 靜態(tài)服務器尋址方式,好處是客戶端部署簡單,且 Name Server 集群可以熱升級。
發(fā)送消息時如何獲取路由信息1、broker在啟動的時候通過參數(shù)autoCreateTopicEnable設(shè)置是否自動創(chuàng)建topic,默認為true,此時會創(chuàng)建一個名為TBW102(4.3版本已經(jīng)改名為AUTO_CREATE_TOPIC_KEY)的topic(參見類TopicConfigManager),broker在向namesrv注冊時會把默認的topic注冊上去。如果設(shè)置false,則不會注冊。
2、producer在發(fā)送消息時會在本地獲取路由信息,第一次發(fā)送的話本地肯定沒有,就會去namesrv獲取,如果此時namesrv也沒有,則會獲取TBW102的topic信息(參見DefaultMQProducerImpl.tryToFindTopicPublishInfo),以此為模板創(chuàng)建topic,然后選擇topic下的一臺broker發(fā),broker創(chuàng)建后,會通過心跳注冊到namesrv上。
3、如果autoCreateTopicEnable設(shè)置false的話,producer發(fā)送消息會報找不到路由的異常,此時必須手動創(chuàng)建topic。
建議autoCreateTopicEnable設(shè)置false,基于以上第二步,自動創(chuàng)建topic后,以后所有該TOPIC的消息,都將發(fā)送到剛才選擇的這臺broke上,達不到負載均衡的目的。所以基于目前RocketMQ的設(shè)計,建議關(guān)閉自動創(chuàng)建TOPIC的功能,然后根據(jù)消息量的大小,手動創(chuàng)建TOPIC。
可以通過管理工具mqadmin來手動創(chuàng)建topic
sh mqadmin updateTopic -c [集群名稱] -n [nameserver地址] -t [topic名稱] -w [寫隊列數(shù)] -r [讀隊列數(shù)]
手動創(chuàng)建了Topic后,producer就可以輪詢的發(fā)送到不同的broker了。
topic的隊列數(shù)這里講一下自動創(chuàng)建的topic的隊列數(shù)如何設(shè)置,首先broker創(chuàng)建的模板topic=AUTO_CREATE_TOPIC_KEY的隊列是8,參見類TopicConfigManager:
public TopicConfigManager(BrokerController brokerController) { //省略無關(guān)代碼 if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) { String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC; TopicConfig topicConfig = new TopicConfig(topic); this.systemTopicList.add(topic); topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig() .getDefaultTopicQueueNums()); int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE; topicConfig.setPerm(perm); this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig); } //省略無關(guān)代碼 }
BrokerConfig:
private int defaultTopicQueueNums = 8;
DefaultMQProducer端默認知道要創(chuàng)建的topic的隊列數(shù)是4
private volatile int defaultTopicQueueNums = 4;
在MQClientInstance類的方法updateTopicRouteInfoFromNameServer中有這樣一段邏輯:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault, DefaultMQProducer defaultMQProducer) { //省略無關(guān)代碼 for (QueueData data : topicRouteData.getQueueDatas()) { int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums()); data.setReadQueueNums(queueNums); data.setWriteQueueNums(queueNums); } //省略無關(guān)代碼 }
創(chuàng)建隊列是取兩者最小的一個,也就是4,所以要設(shè)置topic的隊列數(shù)量,很明顯了設(shè)置broker的defaultTopicQueueNums的值和DefaultMQProducer的defaultTopicQueueNums值就可以了。這是自動創(chuàng)建Topic時隊列數(shù)的設(shè)置方法,上面也提到生成環(huán)境一般不會開啟自動創(chuàng)建Topic的功能,可以通過上面的手動創(chuàng)建Topic的指令來設(shè)置讀寫隊列數(shù)。你可能注意到了Topic下有讀寫隊兩個隊列數(shù),分別代表上面意思呢?讀寫隊列其實是個邏輯概念,一個broker下topic的總隊列數(shù)是以寫隊列為準,而讀隊列意思是允許多少隊列可以被消費者消費,也就是說讀多寫少的情況下,沒有問題,隊列都可以被消費掉,如果寫多讀少的話,那么就會存在隊列不會被消費的情況。
消息發(fā)送前面我們講到了如何獲取topic的路由信息,如何創(chuàng)建topic的隊列數(shù),一個topic下有多個隊列,又可以分布在不同的broker上面,所以topic的總隊列數(shù)應該是所有broker上的topic下隊列數(shù)的總和。
備注:如果手動在每個broker上分別創(chuàng)建topic的話,相同topic在不同broker上的隊列數(shù)可以不一樣。
那么問題來了,在發(fā)送消息時根據(jù)怎么樣的策略來選擇一個隊列發(fā)送呢?rocketmq提供了一個MQFaultStrategy策略類來負責選擇隊列,這里會有一個參數(shù)sendLatencyFaultEnable是否開啟延遲故障,
該值默認為false,在不開啟的情況下,相同線程發(fā)送消息是輪詢topic下的所有隊列,不同線程發(fā)送是隨機的,核心代碼如下:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) { if (this.sendLatencyFaultEnable) { //省略不必要的代碼...... } return tpInfo.selectOneMessageQueue(lastBrokerName); } //以上代碼邏輯參見類MQFaultStrategy.selectOneMessageQueue public MessageQueue selectOneMessageQueue(final String lastBrokerName) { if (lastBrokerName == null) { return selectOneMessageQueue(); } else { //省略不必要的代碼...... } } public MessageQueue selectOneMessageQueue() { int index = this.sendWhichQueue.getAndIncrement(); int pos = Math.abs(index) % this.messageQueueList.size(); if (pos < 0) pos = 0; return this.messageQueueList.get(pos); } //以上代碼邏輯參見類TopicPublishInfo public int getAndIncrement() { Integer index = this.threadLocalIndex.get();//ThreadLocal中獲取 if (null == index) {//為空,隨機生成一個 index = Math.abs(random.nextInt()); if (index < 0) index = 0; this.threadLocalIndex.set(index); } index = Math.abs(index + 1); if (index < 0) index = 0; this.threadLocalIndex.set(index); return index; } //以上代碼參見類ThreadLocalIndex
每次獲取index的時候都是從本地線程變量ThreadLocal中獲取,沒有的情況下就是隨機生成一個,加1取絕對值后返回,再對隊列列表的長度取模,所以在同一線程中,會輪訓的從隊列列表獲取隊列。而如果是不同線程的話,index是隨機生成的,所以就是隨機從隊列列表中獲取。如下圖所示:
可以看到選擇隊列方法的入?yún)⒂幸粋€lastBrokerName的入?yún)ⅲ藚?shù)的目的是在發(fā)送消息失敗的情況下,producer會重試再次發(fā)送,而再次發(fā)送選擇的隊列需要另選一個broker,lastBrokerName就是要過濾掉失敗的broker,選擇下一個broker的隊列進行發(fā)送消息。
開啟延遲故障,每當發(fā)送完一次消息,不管成功還是失敗,都會把此次存儲消息的broker給保存下來,記錄故障情況下此broker需要延長多長時間才能再次發(fā)送,目前看到在代碼里面寫死了,故障下30s之內(nèi)是不能再向此broker發(fā)送消息了。
消息重試producer的send方法本身支持內(nèi)部重試,重試邏輯如下:
1、最大重試次數(shù)默認2次,可以通過參數(shù)retryTimesWhenSendFailed設(shè)置
2、發(fā)送失敗,則輪詢到下一個broker,如果此時只有一個broker在線呢?那就會輪訓這個broker下的其他隊列。
3、這個方法的總耗時時間不超過 sendMsgTimeout 設(shè)置的值,默認為3s。
如果發(fā)送消息,broker返回結(jié)果超時,這種超時不會進行重試了;如果是方法本身耗時超過sendMsgTimeout ,還未來得及調(diào)用發(fā)送消息,此時的超時也不會重試。
以上策略其實也很難保證同步發(fā)送消息一定成功,如果應用要保證消息不丟失,最好先把消息存儲到db,后臺啟線程定時重試,確保消息一定存儲到broker。
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/77288.html
摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發(fā)展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳...
摘要:每個與集群中的所有節(jié)點建立長連接,定時注冊信息到所有。完全無狀態(tài),可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實現(xiàn)雖然版本不太一致但這也是能找到的最詳細的資料了接下來根據(jù)其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個成熟的產(chǎn)品來學習吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...
摘要:利用的高級特性特性是一種負載均衡的機制。在一個消息被分發(fā)到之前,首先檢查消息屬性。屬性為某個值的消息單個消息或消息集合在描述,和的對應關(guān)系,以及負載均衡策略時。同樣做到了保證消息的順序情況下,均衡消費的消費消息。 通常mq可以保證先到隊列的消息按照順序分發(fā)給消費者消費來保證順序,但是一個隊列有多個消費者消費的時候,那將失去這個保證,因為這些消息被多個線程并發(fā)的消費。但是有的時候消息按照...
摘要:前提通過前面兩篇文章可以簡單的了解和安裝,今天就將和整合起來使用。然后我運行之前的整合項目,查看監(jiān)控信息如下總結(jié)整篇文章講述了與整合和監(jiān)控平臺的搭建。 showImg(https://segmentfault.com/img/remote/1460000013232432?w=1920&h=1277); 前提 通過前面兩篇文章可以簡單的了解 RocketMQ 和 安裝 RocketMQ...
閱讀 1159·2021-10-15 09:39
閱讀 3062·2021-09-10 10:50
閱讀 3459·2019-08-30 15:53
閱讀 1882·2019-08-30 15:52
閱讀 2573·2019-08-29 15:31
閱讀 1983·2019-08-26 13:43
閱讀 2601·2019-08-26 13:37
閱讀 1448·2019-08-23 18:31