前提介紹
在RocketMQ中一般有兩種獲取消息的方式,一個(gè)是拉(pull,消費(fèi)者主動(dòng)去broker拉取),一個(gè)是推(push,主動(dòng)推送給消費(fèi)者),在上一章節(jié)中已經(jīng)介紹到了相關(guān)的Push操作,接下來的章節(jié)會(huì)介紹Pull操作方式的消費(fèi)機(jī)制體系。
DefaultMQPullConsumer
DefaultMQPullConsumer與DefaultMQPushConsumer相比最大的區(qū)別是,消費(fèi)哪些隊(duì)列的消息,從哪個(gè)位移開始消費(fèi),以及何時(shí)提交消費(fèi)位移都是由程序自己的控制的。下面來介紹一下DefaultMQPullConsumer的內(nèi)部原理。
總體流程執(zhí)行
DefaultMQPullConsumer使用例子
public class MQPullConsumer { private static final Map OFFSE_TABLE = new HashMap(); public static void main(String[] args) throws MQClientException { DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName"); consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876"); consumer.start(); // 從指定topic中拉取所有消息隊(duì)列 Set mqs = consumer.fetchSubscribeMessageQueues("order-topic"); for(MessageQueue mq:mqs){ try { // 獲取消息的offset,指定從store中獲取 long offset = consumer.fetchConsumeOffset(mq,true); while(true){ PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32); putMessageQueueOffset(mq,pullResult.getNextBeginOffset()); switch(pullResult.getPullStatus()){ case FOUND: List messageExtList = pullResult.getMsgFoundList(); for (MessageExt m : messageExtList) { System.out.println(new String(m.getBody())); } break; case NO_MATCHED_MSG: break; case NO_NEW_MSG: break; case OFFSET_ILLEGAL: break; } } } catch (Exception e) { e.printStackTrace(); } } consumer.shutdown(); } // 保存上次消費(fèi)的消息下標(biāo) private static void putMessageQueueOffset(MessageQueue mq, long nextBeginOffset) { OFFSE_TABLE.put(mq, nextBeginOffset); } // 獲取上次消費(fèi)的消息的下標(biāo) private static Long getMessageQueueOffset(MessageQueue mq) { Long offset = OFFSE_TABLE.get(mq); if(offset != null){ return offset; } return 0l; }}
- 消費(fèi)者啟動(dòng):consumer.start();
- 獲取主題下所有的消息隊(duì)列:這里是根據(jù)topic從nameserver獲取的這里我們可以修改為從其他位置獲取隊(duì)列信息
Set
mqs = consumer.fetchSubscribeMessageQueues("topicTest");//遍歷隊(duì)列for(MessageQueue mq:mqs){ try { //獲取當(dāng)前隊(duì)列的消費(fèi)位移,第二個(gè)參數(shù)表示位移是從本地內(nèi)存獲取,還是從broker獲取,true表示從broker獲取 long offset = consumer.fetchConsumeOffset(mq,true); while(true){ //第二個(gè)參數(shù)表示可以消費(fèi)哪些tag的消息 //第三個(gè)參數(shù)表示從哪個(gè)位移開始消費(fèi)消息 //第四個(gè)參數(shù)表示一次最大拉多少個(gè)消息 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);}
DefaultMQPullConsumer的總體流程
啟動(dòng)DefaultMQPullConsumer是通過調(diào)用start()方法完成的
DefaultMQPullConsumer拉取源碼分析
分析下DefaultMQPullConsumer拉取消息的流程
consumer.fetchSubscribeMessageQueues("order-topic")
從指定topic中拉取所有消息隊(duì)列
Set mqs = consumer.fetchSubscribeMessageQueues("order-topic");
核心源碼分析
fetchSubscribeMessageQueues()
通過調(diào)用fetchSubscribeMessageQueues()方法可以獲取指定topic(GET_ROUTEINTO_BY_TOPIC)的讀隊(duì)列信息。它通過向nameserver發(fā)送GetRouteInfoRequest請(qǐng)求,請(qǐng)求內(nèi)容為GET_ROUTEINTO_BY_TOPIC,nameserver將主題下的讀隊(duì)列個(gè)數(shù)發(fā)送給消費(fèi)者,然后消費(fèi)者使用如下代碼創(chuàng)建出與讀隊(duì)列個(gè)數(shù)相同的MessageQueue對(duì)象。
每個(gè)MessageQueue對(duì)象里面記錄了topic、broker名和讀隊(duì)列號(hào)。最后fetchSubscribeMessageQueues()將MessageQueue對(duì)象集合返回給調(diào)用者。
- 向NameServer發(fā)送請(qǐng)求獲取topic參數(shù)對(duì)應(yīng)的Broker信息和topic配置信息,即TopicRouteData對(duì)象。
public Set fetchSubscribeMessageQueues(String topic) throws MQClientException { try { TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis); if (topicRouteData != null) { // 2、遍歷topicRouteData Set mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData); if (!mqList.isEmpty()) { return mqList; } else { throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null); } } } catch (Exception e) { throw new MQClientException( "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST), e); } throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null); }
遍歷過程TopicRouteData
遍歷TopicRouteData對(duì)象的QueueData列表中每個(gè)QueueData對(duì)象,首先判斷該QueueData對(duì)象是否具有讀權(quán)限,
若有則根據(jù)該QueueData對(duì)象的readQueueNums值,創(chuàng)建readQueueNums個(gè)MessageQueue對(duì)象,并構(gòu)成MessageQueue集合;
最后返回給MessageQueue集合
public static Set topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) { Set mqList = new HashSet(); List qds = route.getQueueDatas(); for (QueueData qd : qds) { if (PermName.isReadable(qd.getPerm())) { for (int i = 0; i < qd.getReadQueueNums(); i++) { MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i); mqList.add(mq); } } } return mqList; }
consumer.fetchConsumeOffset
通過該方法獲取該MessageQueue隊(duì)列下面從offset位置開始的消息內(nèi)容,其中maxNums=32即表示獲取的最大消息個(gè)數(shù),offset為該MessageQueue對(duì)象的開始消費(fèi)位置。
DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)
fetchConsumeOffset()有兩個(gè)入?yún)ⅲ谝粋€(gè)參數(shù)表示隊(duì)列,第二個(gè)參數(shù)表示是否從broker獲取該隊(duì)列的消費(fèi)位移,true表示從broker獲取,false表示從本地記錄獲取,如果本地獲取不到再從broker獲取。
這里說的從本地獲取是指從RemoteBrokerOffsetStore.offsetTable屬性中獲取,該屬性記錄了每個(gè)隊(duì)列的消費(fèi)位移。當(dāng)從broker獲取位移后會(huì)更新offsetTable。
pullBlockIfNotFound拉取信息
rocketmq提供了多個(gè)拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。兩者的區(qū)別是如果隊(duì)列中沒有消息,兩個(gè)方法的超時(shí)時(shí)間是不同的,pullBlockIfNotFound會(huì)等待30s返回一個(gè)空結(jié)果,pull是等待10s返回空結(jié)果。
不過pull方法的入?yún)⒖梢哉{(diào)整超時(shí)時(shí)間,而pullBlockIfNotFound則需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis參數(shù)。不過兩個(gè)方法調(diào)用的底層邏輯都是一樣的,都是調(diào)用DefaultMQPullConsumerImpl.pullSyncImpl()方法獲取消息。下面分析一下pullSyncImpl()方法。
public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis()); }
獲取該MessageQueue隊(duì)列的消費(fèi)進(jìn)度來設(shè)定參數(shù)offset值該方法最終調(diào)用pullSyncImpl,可以獲取相關(guān)的結(jié)果數(shù)據(jù)。
參數(shù)1:消息隊(duì)列(通過調(diào)用消費(fèi)者的fetchSubscibeMessageQueue(topic)可以得到相應(yīng)topic的所需要消息隊(duì)列) ;
參數(shù)2:需要過濾用的表達(dá)式 ;
參數(shù)3:偏移量即消費(fèi)隊(duì)列的進(jìn)度 ;
- 參數(shù)4:一次取消息的最大值 ;
DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
DefaultMQPullConsumerImpl.pullSyncImpl的實(shí)現(xiàn)過程
private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException { this.isRunning(); //檢查入?yún)⑹欠窈戏? if (null == mq) { throw new MQClientException("mq is null", null); } if (offset < 0) { throw new MQClientException("offset < 0", null); } if (maxNums <= 0) { throw new MQClientException("maxNums <= 0", null); } //更新再平衡服務(wù)的數(shù)據(jù),因?yàn)樵倨胶夥?wù)不起作用,所以更新數(shù)據(jù)沒有效果 this.subscriptionAutomatically(mq.getTopic()); int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false); //計(jì)算超時(shí)時(shí)間,如果調(diào)用的是pullBlockIfNotFound方法,block參數(shù)就是true,否則就是false long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType()); //調(diào)用PullAPIWrapper從broker拉取消息, //pullKernelImpl方法里面構(gòu)建PullMessageRequest請(qǐng)求對(duì)象 PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq,//隊(duì)列 subscriptionData.getSubString(),//消息的過濾規(guī)則 subscriptionData.getExpressionType(), isTagType ? 0L : subscriptionData.getSubVersion(), offset,//拉取消息的位移 maxNums,//建議broker一次性返回最大消息個(gè)數(shù),默認(rèn)是32個(gè) sysFlag, 0,//設(shè)置的提交位移,可以看到永遠(yuǎn)都是0,所以broker無法記錄有效位移,需要程序自己記錄控制提交位移 this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis,//超時(shí)時(shí)間 CommunicationMode.SYNC, null//回調(diào)邏輯為null ); this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); //If namespace is not null , reset Topic without namespace. this.resetTopic(pullResult.getMsgFoundList()); if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = null; consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace()); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(mq); consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); consumeMessageContext.setSuccess(false); this.executeHookBefore(consumeMessageContext); consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); consumeMessageContext.setSuccess(true); this.executeHookAfter(consumeMessageContext); } return pullResult; }
檢查MessageQueue對(duì)象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap
this.subscriptionAutomatically(mq.getTopic());
// 構(gòu)建標(biāo)志位,邏輯或運(yùn)算|=
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);
SubscriptionData subscriptionData; try { //以請(qǐng)求參數(shù)subExpression以及consumerGroup、topic為參數(shù)調(diào)用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法構(gòu)造SubscriptionData對(duì)象并返回 subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(), mq.getTopic(), subExpression); } catch (Exception e) { throw new MQClientException("parse subscription error", e); } long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout; // 從broker中拉取消息 PullResult pullResult = this.pullAPIWrapper.pullKernelImpl( mq, subscriptionData.getSubString(), 0L, offset, maxNums, sysFlag, 0, this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(), timeoutMillis, CommunicationMode.SYNC, null ); // 對(duì)拉取到的消息進(jìn)行解碼,過濾并執(zhí)行回調(diào),并把解析的message列表放到MsgFoundList中 this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData); if (!this.consumeMessageHookList.isEmpty()) { ConsumeMessageContext consumeMessageContext = null; consumeMessageContext = new ConsumeMessageContext(); consumeMessageContext.setConsumerGroup(this.groupName()); consumeMessageContext.setMq(mq); consumeMessageContext.setMsgList(pullResult.getMsgFoundList()); consumeMessageContext.setSuccess(false); this.executeHookBefore(consumeMessageContext); consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString()); consumeMessageContext.setSuccess(true); this.executeHookAfter(consumeMessageContext); } return pullResult;}
Push和Pull的操作對(duì)比
- push-優(yōu)點(diǎn):及時(shí)性、服務(wù)端統(tǒng)一處理實(shí)現(xiàn)方便
- push-缺點(diǎn):容易造成堆積、負(fù)載性能不可控
- pull-優(yōu)點(diǎn):獲得消息狀態(tài)方便、負(fù)載均衡性能可控
- pull-缺點(diǎn):及時(shí)性差
使用DefaultMQPullConsumer拉取消息,發(fā)送到broker的提交位移永遠(yuǎn)都是0,所以broker無法記錄有效位移,需要程序自己記錄和控制提交位移。