前提介紹

在RocketMQ中一般有兩種獲取消息的方式,一個(gè)是拉(pull,消費(fèi)者主動(dòng)去broker拉取),一個(gè)是推(push,主動(dòng)推送給消費(fèi)者),在上一章節(jié)中已經(jīng)介紹到了相關(guān)的Push操作,接下來的章節(jié)會(huì)介紹Pull操作方式的消費(fèi)機(jī)制體系。

DefaultMQPullConsumer

DefaultMQPullConsumer與DefaultMQPushConsumer相比最大的區(qū)別是,消費(fèi)哪些隊(duì)列的消息,從哪個(gè)位移開始消費(fèi),以及何時(shí)提交消費(fèi)位移都是由程序自己的控制的。下面來介紹一下DefaultMQPullConsumer的內(nèi)部原理。

總體流程執(zhí)行

DefaultMQPullConsumer使用例子

public class MQPullConsumer {    private static final Map OFFSE_TABLE = new HashMap();    public static void main(String[] args) throws MQClientException {        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("groupName");        consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876");        consumer.start();        // 從指定topic中拉取所有消息隊(duì)列        Set mqs = consumer.fetchSubscribeMessageQueues("order-topic");        for(MessageQueue mq:mqs){            try {                // 獲取消息的offset,指定從store中獲取               long offset = consumer.fetchConsumeOffset(mq,true);                while(true){                 PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);                                 putMessageQueueOffset(mq,pullResult.getNextBeginOffset());                    switch(pullResult.getPullStatus()){                    case FOUND:                        List messageExtList = pullResult.getMsgFoundList();                        for (MessageExt m : messageExtList) {                            System.out.println(new String(m.getBody()));                        }                        break;                    case NO_MATCHED_MSG:                        break;                    case NO_NEW_MSG:                        break;                    case OFFSET_ILLEGAL:                        break;                    }                }            } catch (Exception e) {                e.printStackTrace();            }        }        consumer.shutdown();    }    // 保存上次消費(fèi)的消息下標(biāo)    private static void putMessageQueueOffset(MessageQueue mq,            long nextBeginOffset) {        OFFSE_TABLE.put(mq, nextBeginOffset);    }    // 獲取上次消費(fèi)的消息的下標(biāo)    private static Long getMessageQueueOffset(MessageQueue mq) {        Long offset = OFFSE_TABLE.get(mq);        if(offset != null){            return offset;        }        return 0l;    }}
  • 消費(fèi)者啟動(dòng):consumer.start();
  • 獲取主題下所有的消息隊(duì)列:這里是根據(jù)topic從nameserver獲取的這里我們可以修改為從其他位置獲取隊(duì)列信息
    Set mqs = consumer.fetchSubscribeMessageQueues("topicTest");//遍歷隊(duì)列for(MessageQueue mq:mqs){    try {    //獲取當(dāng)前隊(duì)列的消費(fèi)位移,第二個(gè)參數(shù)表示位移是從本地內(nèi)存獲取,還是從broker獲取,true表示從broker獲取        long offset = consumer.fetchConsumeOffset(mq,true);        while(true){            //第二個(gè)參數(shù)表示可以消費(fèi)哪些tag的消息            //第三個(gè)參數(shù)表示從哪個(gè)位移開始消費(fèi)消息            //第四個(gè)參數(shù)表示一次最大拉多少個(gè)消息            PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);}

DefaultMQPullConsumer的總體流程

啟動(dòng)DefaultMQPullConsumer是通過調(diào)用start()方法完成的

DefaultMQPullConsumer拉取源碼分析

分析下DefaultMQPullConsumer拉取消息的流程

consumer.fetchSubscribeMessageQueues("order-topic")

從指定topic中拉取所有消息隊(duì)列

Set mqs = consumer.fetchSubscribeMessageQueues("order-topic");

核心源碼分析

fetchSubscribeMessageQueues()
  • 通過調(diào)用fetchSubscribeMessageQueues()方法可以獲取指定topic(GET_ROUTEINTO_BY_TOPIC)的讀隊(duì)列信息。它通過向nameserver發(fā)送GetRouteInfoRequest請(qǐng)求,請(qǐng)求內(nèi)容為GET_ROUTEINTO_BY_TOPIC,nameserver將主題下的讀隊(duì)列個(gè)數(shù)發(fā)送給消費(fèi)者,然后消費(fèi)者使用如下代碼創(chuàng)建出與讀隊(duì)列個(gè)數(shù)相同的MessageQueue對(duì)象。

  • 每個(gè)MessageQueue對(duì)象里面記錄了topic、broker名和讀隊(duì)列號(hào)。最后fetchSubscribeMessageQueues()將MessageQueue對(duì)象集合返回給調(diào)用者。

  • 向NameServer發(fā)送請(qǐng)求獲取topic參數(shù)對(duì)應(yīng)的Broker信息和topic配置信息,即TopicRouteData對(duì)象。
 public Set fetchSubscribeMessageQueues(String topic) throws MQClientException {        try {            TopicRouteData topicRouteData = this.mQClientFactory.getMQClientAPIImpl().getTopicRouteInfoFromNameServer(topic, timeoutMillis);            if (topicRouteData != null) {                // 2、遍歷topicRouteData                Set mqList = MQClientInstance.topicRouteData2TopicSubscribeInfo(topic, topicRouteData);                if (!mqList.isEmpty()) {                    return mqList;                } else {                    throw new MQClientException("Can not find Message Queue for this topic, " + topic + " Namesrv return empty", null);                }            }        } catch (Exception e) {            throw new MQClientException(                "Can not find Message Queue for this topic, " + topic + FAQUrl.suggestTodo(FAQUrl.MQLIST_NOT_EXIST),                e);        }        throw new MQClientException("Unknow why, Can not find Message Queue for this topic, " + topic, null);    }

遍歷過程TopicRouteData

遍歷TopicRouteData對(duì)象的QueueData列表中每個(gè)QueueData對(duì)象,首先判斷該QueueData對(duì)象是否具有讀權(quán)限,
若有則根據(jù)該QueueData對(duì)象的readQueueNums值,創(chuàng)建readQueueNums個(gè)MessageQueue對(duì)象,并構(gòu)成MessageQueue集合;
最后返回給MessageQueue集合

public static Set topicRouteData2TopicSubscribeInfo(final String topic, final TopicRouteData route) {        Set mqList = new HashSet();        List qds = route.getQueueDatas();        for (QueueData qd : qds) {            if (PermName.isReadable(qd.getPerm())) {                for (int i = 0; i < qd.getReadQueueNums(); i++) {                    MessageQueue mq = new MessageQueue(topic, qd.getBrokerName(), i);                    mqList.add(mq);                }            }        }        return mqList;    }
consumer.fetchConsumeOffset

通過該方法獲取該MessageQueue隊(duì)列下面從offset位置開始的消息內(nèi)容,其中maxNums=32即表示獲取的最大消息個(gè)數(shù),offset為該MessageQueue對(duì)象的開始消費(fèi)位置。

DefaultMQPullConsumer.fetchConsumeOffset(MessageQueue mq, boolean fromStore)

fetchConsumeOffset()有兩個(gè)入?yún)ⅲ谝粋€(gè)參數(shù)表示隊(duì)列,第二個(gè)參數(shù)表示是否從broker獲取該隊(duì)列的消費(fèi)位移,true表示從broker獲取,false表示從本地記錄獲取,如果本地獲取不到再從broker獲取。
這里說的從本地獲取是指從RemoteBrokerOffsetStore.offsetTable屬性中獲取,該屬性記錄了每個(gè)隊(duì)列的消費(fèi)位移。當(dāng)從broker獲取位移后會(huì)更新offsetTable。

pullBlockIfNotFound拉取信息

rocketmq提供了多個(gè)拉取方法,可以使用pullBlockIfNotFound()方法也可以使用pull()方法。兩者的區(qū)別是如果隊(duì)列中沒有消息,兩個(gè)方法的超時(shí)時(shí)間是不同的,pullBlockIfNotFound會(huì)等待30s返回一個(gè)空結(jié)果,pull是等待10s返回空結(jié)果。

不過pull方法的入?yún)⒖梢哉{(diào)整超時(shí)時(shí)間,而pullBlockIfNotFound則需要修改DefaultMQPullConsumer.consumerPullTimeoutMillis參數(shù)。不過兩個(gè)方法調(diào)用的底層邏輯都是一樣的,都是調(diào)用DefaultMQPullConsumerImpl.pullSyncImpl()方法獲取消息。下面分析一下pullSyncImpl()方法。

public PullResult pullBlockIfNotFound(MessageQueue mq, String subExpression, long offset, int maxNums)        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        return this.pullSyncImpl(mq, subExpression, offset, maxNums, true, this.getDefaultMQPullConsumer().getConsumerPullTimeoutMillis());    }

獲取該MessageQueue隊(duì)列的消費(fèi)進(jìn)度來設(shè)定參數(shù)offset值該方法最終調(diào)用pullSyncImpl,可以獲取相關(guān)的結(jié)果數(shù)據(jù)。

  • 參數(shù)1:消息隊(duì)列(通過調(diào)用消費(fèi)者的fetchSubscibeMessageQueue(topic)可以得到相應(yīng)topic的所需要消息隊(duì)列) ;

  • 參數(shù)2:需要過濾用的表達(dá)式 ;

  • 參數(shù)3:偏移量即消費(fèi)隊(duì)列的進(jìn)度 ;

  • 參數(shù)4:一次取消息的最大值 ;
DefaultMQPullConsumerImpl.pullSyncImpl(MessageQueue mq, String subExpression, long offset, int maxNums, boolean block)
DefaultMQPullConsumerImpl.pullSyncImpl的實(shí)現(xiàn)過程
      private PullResult pullSyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, boolean block,        long timeout)        throws MQClientException, RemotingException, MQBrokerException, InterruptedException {        this.isRunning();        //檢查入?yún)⑹欠窈戏?       if (null == mq) {            throw new MQClientException("mq is null", null);        }        if (offset < 0) {            throw new MQClientException("offset < 0", null);        }        if (maxNums <= 0) {            throw new MQClientException("maxNums <= 0", null);        }        //更新再平衡服務(wù)的數(shù)據(jù),因?yàn)樵倨胶夥?wù)不起作用,所以更新數(shù)據(jù)沒有效果        this.subscriptionAutomatically(mq.getTopic());        int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);        //計(jì)算超時(shí)時(shí)間,如果調(diào)用的是pullBlockIfNotFound方法,block參數(shù)就是true,否則就是false        long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;        boolean isTagType = ExpressionType.isTagType(subscriptionData.getExpressionType());        //調(diào)用PullAPIWrapper從broker拉取消息,        //pullKernelImpl方法里面構(gòu)建PullMessageRequest請(qǐng)求對(duì)象        PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(            mq,//隊(duì)列            subscriptionData.getSubString(),//消息的過濾規(guī)則            subscriptionData.getExpressionType(),            isTagType ? 0L : subscriptionData.getSubVersion(),            offset,//拉取消息的位移            maxNums,//建議broker一次性返回最大消息個(gè)數(shù),默認(rèn)是32個(gè)            sysFlag,            0,//設(shè)置的提交位移,可以看到永遠(yuǎn)都是0,所以broker無法記錄有效位移,需要程序自己記錄控制提交位移            this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),            timeoutMillis,//超時(shí)時(shí)間            CommunicationMode.SYNC,            null//回調(diào)邏輯為null        );        this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);        //If namespace is not null , reset Topic without namespace.        this.resetTopic(pullResult.getMsgFoundList());        if (!this.consumeMessageHookList.isEmpty()) {            ConsumeMessageContext consumeMessageContext = null;            consumeMessageContext = new ConsumeMessageContext();            consumeMessageContext.setNamespace(defaultMQPullConsumer.getNamespace());            consumeMessageContext.setConsumerGroup(this.groupName());            consumeMessageContext.setMq(mq);            consumeMessageContext.setMsgList(pullResult.getMsgFoundList());            consumeMessageContext.setSuccess(false);            this.executeHookBefore(consumeMessageContext);            consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());            consumeMessageContext.setSuccess(true);            this.executeHookAfter(consumeMessageContext);        }        return pullResult;    }

檢查MessageQueue對(duì)象的topic是否在RebalanceImpl.subscriptionInner:ConcurrentHashMap變量中,若不在則以consumerGroup、topic、subExpression為參數(shù)調(diào)用FilterAPI.buildSubscriptionData(String consumerGroup, String topic, String subExpression)方法構(gòu)造SubscriptionData對(duì)象保存到RebalanceImpl.subscriptionInner變量中,其中 subExpression="*"
this.subscriptionAutomatically(mq.getTopic());
// 構(gòu)建標(biāo)志位,邏輯或運(yùn)算|=
int sysFlag = PullSysFlag.buildSysFlag(false, block, true, false);

    SubscriptionData subscriptionData;    try {        //以請(qǐng)求參數(shù)subExpression以及consumerGroup、topic為參數(shù)調(diào)用FilterAPI.buildSubscriptionData(String consumerGroup,Stringtopic, String subExpression)方法構(gòu)造SubscriptionData對(duì)象并返回        subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPullConsumer.getConsumerGroup(),            mq.getTopic(), subExpression);    } catch (Exception e) {        throw new MQClientException("parse subscription error", e);    }    long timeoutMillis = block ? this.defaultMQPullConsumer.getConsumerTimeoutMillisWhenSuspend() : timeout;    // 從broker中拉取消息    PullResult pullResult = this.pullAPIWrapper.pullKernelImpl(        mq,        subscriptionData.getSubString(),        0L,        offset,        maxNums,        sysFlag,        0,        this.defaultMQPullConsumer.getBrokerSuspendMaxTimeMillis(),        timeoutMillis,        CommunicationMode.SYNC,        null    );    // 對(duì)拉取到的消息進(jìn)行解碼,過濾并執(zhí)行回調(diào),并把解析的message列表放到MsgFoundList中    this.pullAPIWrapper.processPullResult(mq, pullResult, subscriptionData);    if (!this.consumeMessageHookList.isEmpty()) {        ConsumeMessageContext consumeMessageContext = null;        consumeMessageContext = new ConsumeMessageContext();        consumeMessageContext.setConsumerGroup(this.groupName());        consumeMessageContext.setMq(mq);        consumeMessageContext.setMsgList(pullResult.getMsgFoundList());        consumeMessageContext.setSuccess(false);        this.executeHookBefore(consumeMessageContext);        consumeMessageContext.setStatus(ConsumeConcurrentlyStatus.CONSUME_SUCCESS.toString());        consumeMessageContext.setSuccess(true);        this.executeHookAfter(consumeMessageContext);    }    return pullResult;}

Push和Pull的操作對(duì)比

  • push-優(yōu)點(diǎn):及時(shí)性、服務(wù)端統(tǒng)一處理實(shí)現(xiàn)方便
  • push-缺點(diǎn):容易造成堆積、負(fù)載性能不可控
  • pull-優(yōu)點(diǎn):獲得消息狀態(tài)方便、負(fù)載均衡性能可控
  • pull-缺點(diǎn):及時(shí)性差

使用DefaultMQPullConsumer拉取消息,發(fā)送到broker的提交位移永遠(yuǎn)都是0,所以broker無法記錄有效位移,需要程序自己記錄和控制提交位移。

資料參考