RocketMQ的前提回顧
RocketMQ是一款分布式、隊列模型的消息中間件,具有以下特點:
- 能夠保證嚴格的消息順序
- 提供豐富的消息拉取模式
- 高效的訂閱者水平擴展能力
- 實時的消息訂閱機制
- 億級消息堆積能力
為什么使用RocketMQ
- 強調集群無單點,可擴展,任意一點高可用、水平可擴展
- 海量消息堆積能力,消息堆積后寫入低延遲
- 支持上萬個隊列
- 消息失敗重試機制
- 消息可查詢
- 開源社區活躍
- 成熟度已經經過淘寶雙十一的考驗
RocketMQ的發展變化
RocketMQ開源是使用文件作為持久化工具,阿里內部未開源的性能會更高,使用oceanBase作為持久化工具。
在RocketMQ1.x和2.x使用zookeeper管理集群,3.x開始使用nameserver代替zk,更輕量級,此外RocketMQ的客戶端擁有兩種的操作方式:DefaultMQPushConsumer和DefaultMQPullConsumer。
DefaultMQPushConsumer的Maven配置
org.apache.rocketmq rocketmq-client 4.3.0
DefaultMQPushConsumer使用示例
- CONSUME_FROM_LAST_OFFSET:第一次啟動從隊列最后位置消費,后續再啟動接著上次消費的進度開始消費
- CONSUME_FROM_FIRST_OFFSET:第一次啟動從隊列初始位置消費,后續再啟動接著上次消費的進度開始消費
- CONSUME_FROM_TIMESTAMP:第一次啟動從指定時間點位置消費,后續再啟動接著上次消費的進度開始消費
以上所說的第一次啟動是指從來沒有消費過的消費者,如果該消費者消費過,那么會在broker端記錄該消費者的消費位置,如果該消費者掛了再啟動,那么自動從上次消費的進度開始
public class MQPushConsumer { public static void main(String[] args) throws MQClientException { String groupName = "rocketMqGroup1"; // 用于把多個Consumer組織到一起,提高并發處理能力 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); // 設置nameServer地址,多個以;分隔 consumer.setNamesrvAddr("name-serverl-ip:9876;name-server2-ip:9876"); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.setMessageModel(MessageModel.BROADCASTING); // 訂閱topic,可以對指定消息進行過濾,例如:"TopicTest","tagl||tag2||tag3",*或null表示topic所有消息 consumer.subscribe("order-topic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List mgs, ConsumeConcurrentlyContext consumeconcurrentlycontext) { System.out.println(Thread.currentThread().getName()+"Receive New Messages:"+mgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); }}
- CLUSTERING:默認模式,同一個ConsumerGroup(groupName相同)每個consumer只消費所訂閱消息的一部分內容,同一個ConsumerGroup里所有的Consumer消息加起來才是所
- 訂閱topic整體,從而達到負載均衡的目的
- BROADCASTING:同一個ConsumerGroup每個consumer都消費到所訂閱topic所有消息,也就是一個消費會被多次分發,被多個consumer消費。
ConsumeConcurrentlyStatus.RECONSUME_LATER boker會根據設置的messageDelayLevel發起重試,默認16次。
DefaultMQPushConsumerImpl中各個對象的主要功能如下:
RebalancePushImpl:主要負責決定,當前的consumer應該從哪些Queue中消費消息;
- 1)PullAPIWrapper:長連接,負責從broker處拉取消息,然后利用ConsumeMessageService回調用戶的Listener執行消息消費邏輯;
- 2)ConsumeMessageService:實現所謂的"Push-被動"消費機制;從Broker拉取的消息后,封裝成ConsumeRequest提交給ConsumeMessageSerivce,此service負責回調用戶的Listener消費消息;
- 3)OffsetStore:維護當前consumer的消費記錄(offset);有兩種實現,Local和Rmote,Local存儲在本地磁盤上,適用于BROADCASTING廣播消費模式;而Remote則將消費進度存儲在Broker上,適用于CLUSTERING集群消費模式;
- 4)MQClientFactory:負責管理client(consumer、producer),并提供多中功能接口供各個Service(Rebalance、PullMessage等)調用;大部分邏輯均在這個類中完成;
consumer.registerMessageListener執行過程:
/** * Register a callback to execute on message arrival for concurrent consuming. * @param messageListener message handling callback. */ @Override public void registerMessageListener(MessageListenerConcurrently messageListener) { this.messageListener = messageListener; this.defaultMQPushConsumerImpl.registerMessageListener(messageListener); }
通過源碼可以看出主要實現過程在DefaultMQPushConsumerImpl類中consumer.start后調用DefaultMQPushConsumerImpl的同步start方法
public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(), this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode()); this.serviceState = ServiceState.START_FAILED; this.checkConfig(); this.copySubscription(); if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) { this.defaultMQPushConsumer.changeInstanceNameToPID(); } this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); this.pullAPIWrapper = new PullAPIWrapper( mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode()); this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList); if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load(); if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start(); boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); if (!registerOK) { this.serviceState = ServiceState.CREATE_JUST; this.consumeMessageService.shutdown(); throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup() + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL), null); } mQClientFactory.start(); log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup()); this.serviceState = ServiceState.RUNNING; break; case RUNNING: case START_FAILED: case SHUTDOWN_ALREADY: throw new MQClientException("The PushConsumer service state not OK, maybe started once, " + this.serviceState + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK), null); default: break; } this.updateTopicSubscribeInfoWhenSubscriptionChanged(); this.mQClientFactory.checkClientInBroker(); this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); this.mQClientFactory.rebalanceImmediately(); }
通過mQClientFactory.start();發我們發現他調用
public void start() throws MQClientException { synchronized (this) { switch (this.serviceState) { case CREATE_JUST: this.serviceState = ServiceState.START_FAILED; // If not specified,looking address from name server if (null == this.clientConfig.getNamesrvAddr()) { this.mQClientAPIImpl.fetchNameServerAddr(); } // Start request-response channel this.mQClientAPIImpl.start(); // Start various schedule tasks this.startScheduledTask(); // Start pull service this.pullMessageService.start(); // Start rebalance service this.rebalanceService.start(); // Start push service this.defaultMQProducer.getDefaultMQProducerImpl().start(false); log.info("the client factory [{}] start OK", this.clientId); this.serviceState = ServiceState.RUNNING; break; case RUNNING: break; case SHUTDOWN_ALREADY: break; case START_FAILED: throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null); default: break; } } }
在這個方法中有多個start,我們主要看pullMessageService.start();通過這里我們發現RocketMQ的Push模式底層其實也是通過pull實現的,下面我們來看下pullMessageService處理了哪些邏輯:
private void pullMessage(final PullRequest pullRequest) { final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup()); if (consumer != null) { DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer; impl.pullMessage(pullRequest); } else { log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest); } }
我們發現其實他還是通過DefaultMQPushConsumerImpl類的pullMessage方法來進行消息的邏輯處理.
pullRequest拉取方式
PullRequest這里說明一下,上面我們已經提了一下rocketmq的push模式其實是通過pull模式封裝實現的,pullrequest這里是通過長輪詢的方式達到push效果。
長輪詢方式既有pull的優點又有push模式的實時性有點。
push方式是server端接收到消息后,主動把消息推送給client端,實時性高。弊端是server端工作量大,影響性能,其次是client端處理能力不同且client端的狀態不受server端的控制,如果client端不能及時處理消息容易導致消息堆積已經影響正常業務等。
- pull方式是client循環從server端拉取消息,主動權在client端,自己處理完一個消息再去拉取下一個,缺點是循環的時間不好設定,時間太短容易忙等,浪費CPU資源,時間間隔太長client的處理能力會下降,有時候有些消息會處理不及時。
長輪詢的方式可以結合兩者優點
- 檢查PullRequest對象中的ProcessQueue對象的dropped是否為true(在RebalanceService線程中為topic下的MessageQueue創建拉取消息請求時要維護對應的ProcessQueue對象,若Consumer不再訂閱該topic則會將該對象的dropped置為true);若是則認為該請求是已經取消的,則直接跳出該方法;
- 更新PullRequest對象中的ProcessQueue對象的時間戳(ProcessQueue.lastPullTimestamp)為當前時間戳;
- 檢查該Consumer是否運行中,即DefaultMQPushConsumerImpl.serviceState是否為RUNNING;若不是運行狀態或者是暫停狀態(DefaultMQPushConsumerImpl.pause=true),則調用PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)方法延遲再拉取消息,其中timeDelay=3000;該方法的目的是在3秒之后再次將該PullRequest對象放入PullMessageService. pullRequestQueue隊列中;并跳出該方法;
- 進行流控。若ProcessQueue對象的msgCount大于了消費端的流控閾值(DefaultMQPushConsumer.pullThresholdForQueue,默認值為1000),則調用PullMessageService.executePullRequestLater方法,在50毫秒之后重新該PullRequest請求放入PullMessageService.pullRequestQueue隊列中;并跳出該方法;
- 若不是順序消費(即DefaultMQPushConsumerImpl.consumeOrderly等于false),則檢查ProcessQueue對象的msgTreeMap:TreeMap
變量的第一個key值與最后一個key值之間的差額,該key值表示查詢的隊列偏移量queueoffset;若差額大于閾值(由DefaultMQPushConsumer. consumeConcurrentlyMaxSpan指定,默認是2000),則調用PullMessageService.executePullRequestLater方法,在50毫秒之后重新將該PullRequest請求放入PullMessageService.pullRequestQueue隊列中;并跳出該方法; - 以PullRequest.messageQueue對象的topic值為參數從RebalanceImpl.subscriptionInner: ConcurrentHashMap, SubscriptionData>中獲取對應的SubscriptionData對象,若該對象為null,考慮到并發的關系,調用executePullRequestLater方法,稍后重試;并跳出該方法;
- 若消息模型為集群模式(RebalanceImpl.messageModel等于CLUSTERING),則以PullRequest對象的MessageQueue變量值、type =READ_FROM_MEMORY(從內存中獲取消費進度offset值)為參數調用DefaultMQPushConsumerImpl. offsetStore對象(初始化為RemoteBrokerOffsetStore對象)的readOffset(MessageQueue mq, ReadOffsetType type)方法從本地內存中獲取消費進度offset值。若該offset值大于0 則置臨時變量commitOffsetEnable等于true否則為false;該offset值作為pullKernelImpl方法中的commitOffset參數,在Broker端拉取消息之后根據commitOffsetEnable參數值決定是否用該offset更新消息進度。該readOffset方法的邏輯是:以入參MessageQueue對象從RemoteBrokerOffsetStore.offsetTable:ConcurrentHashMap
變量中獲取消費進度偏移量;若該偏移量不為null則返回該值,否則返回-1; - 當每次拉取消息之后需要更新訂閱關系(由DefaultMQPushConsumer. postSubscriptionWhenPull參數表示,默認為false)并且以topic值參數從RebalanceImpl.subscriptionInner獲取的SubscriptionData對象的classFilterMode等于false(默認為false),則將sysFlag標記的第3個字節置為1,否則該字節置為0;
- 該sysFlag標記的第1個字節置為commitOffsetEnable的值;第2個字節(suspend標記)置為1;第4個字節置為classFilterMode的值;
- 初始化匿名內部類PullCallback,實現了onSucess/onException方法; 該方法只有在異步請求的情況下才會回調;
- 調用底層的拉取消息API接口:
PullAPIWrapper.pullKernelImpl
PullAPIWrapper.pullKernelImpl(MessageQueue mq, String subExpression, long subVersion,long offset, int maxNums, int sysFlag,long commitOffset,long brokerSuspendMaxTimeMillis, long timeoutMillis, CommunicationMode communicationMode, PullCallback pullCallback)方法進行消息拉取操作。
將回調類PullCallback傳入該方法中,當采用異步方式拉取消息時,在收到響應之后會回調該回調類的方法。
public void pullMessage(final PullRequest pullRequest) { final ProcessQueue processQueue = pullRequest.getProcessQueue(); if (processQueue.isDropped()) { log.info("the pull request[{}] is dropped.", pullRequest.toString()); return; } pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); try { this.makeSureStateOK(); } catch (MQClientException e) { log.warn("pullMessage exception, consumer state not ok", e); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); return; } if (this.isPause()) { log.warn("consumer was paused, execute pull request later. instanceName={}, group={}", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup()); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; } long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { log.warn( "the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}", this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes); } return; } if (!this.consumeOrderly) { if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { log.warn( "the queues messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}", processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(), pullRequest, queueMaxSpanFlowControlTimes); } return; } } else { if (processQueue.isLocked()) { if (!pullRequest.isLockedFirst()) { final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); log.info("the first time to pull message, so fix offset from broker. pullRequest: {} NewOffset: {} brokerBusy: {}", pullRequest, offset, brokerBusy); if (brokerBusy) { log.info("[NOTIFYME]the first time to pull message, but pull request offset larger than broker consume offset. pullRequest: {} NewOffset: {}", pullRequest, offset); } pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } } else { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.info("pull message later because not locked in broker, {}", pullRequest); return; } } final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); log.warn("find the consumers subscription failed, {}", pullRequest); return; } final long beginTimestamp = System.currentTimeMillis(); PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) { if (pullResult != null) { pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult, subscriptionData); switch (pullResult.getPullStatus()) { case FOUND: long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } } if (pullResult.getNextBeginOffset() < prevRequestOffset || firstMsgOffset < prevRequestOffset) { log.warn( "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", pullResult.getNextBeginOffset(), firstMsgOffset, prevRequestOffset); } break; case NO_NEW_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case NO_MATCHED_MSG: pullRequest.setNextOffset(pullResult.getNextBeginOffset()); DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest); DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); break; case OFFSET_ILLEGAL: log.warn("the pull request offset illegal, {} {}", pullRequest.toString(), pullResult.toString()); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); pullRequest.getProcessQueue().setDropped(true); DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() { @Override public void run() { try { DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), false); DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue()); DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue()); log.warn("fix the pull request offset, {}", pullRequest); } catch (Throwable e) { log.error("executeTaskLater Exception", e); } } }, 10000); break; default: break; } } } @Override public void onException(Throwable e) { if (!pullRequest.getMessageQueue().getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("execute the pull request exception", e); } DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); } }; boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } } String subExpression = null; boolean classFilter = false; SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null) { if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { subExpression = sd.getSubString(); } classFilter = sd.isClassFilterMode(); } int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter ); try { // 下面我們看繼續跟進這個方法,這個方法已經就是客戶端如何拉取消息 this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), subExpression, subscriptionData.getExpressionType(), subscriptionData.getSubVersion(), pullRequest.getNextOffset(), this.defaultMQPushConsumer.getPullBatchSize(), sysFlag, commitOffsetValue, BROKER_SUSPEND_MAX_TIME_MILLIS, CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 消息的通信方式為異步 CommunicationMode.ASYNC, pullCallback ); } catch (Exception e) { log.error("pullKernelImpl exception", e); this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION); } }
發送遠程請求拉取消息
在MQClientAPIImpl.pullMessage方法中,根據入參communicationMode的值分為異步拉取和同步拉取方式兩種。
無論是異步方式拉取還是同步方式拉取,在發送拉取請求之前都會構造一個ResponseFuture對象,以請求消息的序列號為key值,存入NettyRemotingAbstract.responseTable:ConcurrentHashMap, ResponseFuture>變量中,對該變量有幾種情況會處理:
- 發送失敗后直接刪掉responseTable變量中的相應記錄;
- 收到響應消息之后,會以響應消息中的序列號(由服務端根據請求消息的序列號原樣返回)從responseTable中查找ResponseFuture對象,并設置該對象的responseCommand變量。若是同步發送會喚醒等待響應的ResponseFuture.waitResponse方法;若是異步發送會調用ResponseFuture.executeInvokeCallback()方法完成回調邏輯處理;
- 在NettyRemotingClient.start()啟動時,也會初始化定時任務,該定時任務每隔1秒定期掃描responseTable列表,遍歷該列表中的ResponseFuture對象,檢查等待響應是否超時,若超時,則調用ResponseFuture. executeInvokeCallback()方法,并將該對象從responseTable列表中刪除;
public PullResult pullMessage( final String addr, final PullMessageRequestHeader requestHeader, final long timeoutMillis, final CommunicationMode communicationMode, final PullCallback pullCallback ) throws RemotingException, MQBrokerException, InterruptedException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.PULL_MESSAGE, requestHeader); switch (communicationMode) { case ONEWAY: assert false; return null; case ASYNC: this.pullMessageAsync(addr, request, timeoutMillis, pullCallback); return null; case SYNC: return this.pullMessageSync(addr, request, timeoutMillis); default: assert false; break; } return null; }
同步拉取
對于同步發送方式,調用MQClientAPIImpl.pullMessageSync(String addr, RemotingCommand request, long timeoutMillis)方法,大致步驟如下:
- 調用RemotingClient.invokeSync(String addr, RemotingCommand request, long timeoutMillis)方法:
- 獲取Broker地址的Channel信息。根據broker地址從RemotingClient.channelTables:ConcurrentHashMap, ChannelWrapper>變量中獲取ChannelWrapper對象并返回該對象的Channel變量;若沒有ChannelWrapper對象則與broker地址建立新的連接并將連接信息存入channelTables變量中,便于下次使用;
- 若NettyRemotingClient.rpcHook:RPCHook變量不為空(該變量在應用層初始化DefaultMQPushConsumer或者DefaultMQPullConsumer對象傳入該值),則調用RPCHook.doBeforeRequest(String remoteAddr, RemotingCommand request)方法;
- 調用NettyRemotingAbstract.invokeSyncImpl(Channel channel, RemotingCommand request, long timeoutMillis)方法,該方法的邏輯如下:
- A)使用請求的序列號(opaue)、超時時間初始化ResponseFuture對象;并將該ResponseFuture對象存入NettyRemotingAbstract.responseTable: ConcurrentHashMap變量中;
- B)調用Channel.writeAndFlush(Object msg)方法將請求對象RemotingCommand發送給Broker;然后調用addListener(GenericFutureListener extends Future super Void>> listener)方法添加內部匿名類:該內部匿名類實現了ChannelFutureListener接口的operationComplete方法,在發送完成之后回調該監聽類的operationComplete方法,在該方法中,首先調用ChannelFuture. isSuccess()方法檢查是否發送成功,若成功則置ResponseFuture對象的sendRequestOK等于true并退出此回調方法等待響應結果;若不成功則置ResponseFuture對象的sendRequestOK等于false,然后從NettyRemotingAbstract.responseTable中刪除此請求序列號(opaue)的記錄,置ResponseFuture對象的responseCommand等于null,并喚醒ResponseFuture.waitResponse(long timeoutMillis)方法的等待;
- C)調用ResponseFuture.waitResponse(long timeoutMillis)方法等待響應結果;在發送失敗或者收到響應消息(詳見5.10.3小節)或者超時的情況下會喚醒該方法返回ResponseFuture.responseCommand變量值;
- D)若上一步返回的responseCommand值為null,則拋出異常:若ResponseFuture.sendRequestOK為true,則拋出RemotingTimeoutException異常,否則拋出RemotingSendRequestException異常;
- E)若上一步返回的responseCommand值不為null,則返回responseCommand變量值;
- 若NettyRemotingClient.rpcHook: RPCHook變量不為空,則調用RPCHook.doAfterResponse(String remoteAddr, RemotingCommand request)方法;
- 以上一步的返回值RemotingCommand對象為參數調用MQClientAPIImpl. processPullResponse (RemotingCommand response)方法將返回對象解析并封裝成PullResultExt對象然后返回給調用者,響應消息的結果狀態轉換如下:
- 若RemotingCommand對象的Code等于SUCCESS,則PullResultExt.pullStatus=FOUND;
- 若RemotingCommand對象的Code等于PULL_NOT_FOUND,則PullResultExt.pullStatus= NO_NEW_MSG;
- 若RemotingCommand對象的Code等于PULL_RETRY_IMMEDIATELY,則PullResultExt.pullStatus= NO_MATCHED_MSG;
- 若RemotingCommand對象的Code等于PULL_OFFSET_MOVED,則PullResultExt.pullStatus= OFFSET_ILLEGAL;
@Override public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException { long beginStartTime = System.currentTimeMillis(); final Channel channel = this.getAndCreateChannel(addr); if (channel != null && channel.isActive()) { try { if (this.rpcHook != null) { this.rpcHook.doBeforeRequest(addr, request); } long costTime = System.currentTimeMillis() - beginStartTime; if (timeoutMillis < costTime) { throw new RemotingTimeoutException("invokeSync call timeout"); } RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime); if (this.rpcHook != null) { this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response); } return response; } catch (RemotingSendRequestException e) { log.warn("invokeSync: send request exception, so close the channel[{}]", addr); this.closeChannel(addr, channel); throw e; } catch (RemotingTimeoutException e) { if (nettyClientConfig.isClientCloseSocketIfTimeout()) { this.closeChannel(addr, channel); log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr); } log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr); throw e; } } else { this.closeChannel(addr, channel); throw new RemotingConnectException(addr); } }
getMQClientAPIImpl().pullMessage最終通過channel寫入并刷新隊列中。然后在消息服務端大體的處理邏輯是服務端收到新消息請求后,如果隊列中沒有消息不急于返回,通過一個循環狀態,每次waitForRunning一段時間默認5秒,然后再check,如果broker一直沒有新新消息,第三次check的時間等到時間超過SuspendMaxTimeMills就返回空,如果在等待過程中收到了新消息直接調用notifyMessageArriving函數返回請求結果。“長輪詢”的核心是,Broker端HOLD住客戶端過來的請求一小段時間,在這個時間內有新消息到達,就利用現有的連接立刻返回消息給 Consumer 。長輪詢的主動權掌握在consumer中,即使broker有大量的消息堆積也不會主動推送給consumer。