国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RocketMQ源碼學習(四)-Consumer

周國輝 / 1595人閱讀

摘要:的都是從消息來消費,但是為了能做到實時收消息,使用長輪詢方式,可以保證消息實時性同方式一致。這種情況建議應用,再消費下一條消息,這樣可以減輕重試消息的壓力。邏輯請求按參數返回按照重置消費從而實現回溯消費

這次源碼學習的方法是帶著問題學習源碼實現,問題列表如下

Consumer Group的概念是什么?

Consumer pull過程是怎樣的?

Consumer 支持push嗎?

Consumer 怎么實現單隊列并行消費?

Consumer 怎么過濾消息?

Consumer 怎么保證一條消息只被Group中的一個服務消費?

Consumer 負載均衡怎么實現?

Consumer 消費失敗怎么辦?

Consumer 可以回溯消費嗎?

Consumer

消息消費者,負責消費消息,一般是后臺系統負責異步消費。

Consumer Group的概念是什么?

一類 Consumer 的集合名稱,這類 Consumer 通常消費一類消息,且消費邏輯一致。一般情況下group中Consumer的數量不能超過訂閱的topic中queue的數量,不然會有閑置的Consumer.

Consumer pull過程是怎樣的?

分析過Producer,看Consumer有種似曾相識的感覺

主要邏輯

1. 根據mq信息去找broker路由信息
2. 根據相關參數構建請求頭
3. 委托netty去broker獲取消息

代碼走讀

MQPullConsumer.pull的參數需指定MessageQueue,和offset(位置偏移)的.

    PullResult pull(final MessageQueue mq, final String subExpression, final long offset,
        final int maxNums) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;

再看Pull操作的返回,有本次獲取的數據信息MessageExt,即位置信息offset

public class PullResult {
    //pull狀態
    private final PullStatus pullStatus;
    //下次pull的偏移量
    private final long nextBeginOffset;
    //最小偏移量
    private final long minOffset;
    //最大偏移量
    private final long maxOffset;
    //獲取到的消息
    private List msgFoundList;
}

MQPullConsumer.pull
-> DefaultMQPullConsumer.pull
-> DefaultMQPullConsumerImpl.pull
-> DefaultMQPullConsumerImpl.pullSyncImpl
-> DefaultMQPullConsumerImpl.pullKernelImpl

  public PullResult pullKernelImpl(
        final MessageQueue mq,
        final String subExpression,
        final String expressionType,
        final long subVersion,
        final long offset,
        final int maxNums,
        final int sysFlag,
        final long commitOffset,
        final long brokerSuspendMaxTimeMillis,
        final long timeoutMillis,
        final CommunicationMode communicationMode,
        final PullCallback pullCallback
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        //獲取broker信息
        FindBrokerResult findBrokerResult =
            this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                this.recalculatePullFromWhichNode(mq), false);
        if (null == findBrokerResult) {
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
            findBrokerResult =
                this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(),
                    this.recalculatePullFromWhichNode(mq), false);
        }

        if (findBrokerResult != null) {
            {
                // check version
                if (!ExpressionType.isTagType(expressionType)
                    && findBrokerResult.getBrokerVersion() < MQVersion.Version.V4_1_0_SNAPSHOT.ordinal()) {
                    throw new MQClientException("The broker[" + mq.getBrokerName() + ", "
                        + findBrokerResult.getBrokerVersion() + "] does not upgrade to support for filter message by " + expressionType, null);
                }
            }
            int sysFlagInner = sysFlag;

            if (findBrokerResult.isSlave()) {
                sysFlagInner = PullSysFlag.clearCommitOffsetFlag(sysFlagInner);
            }
            // 構建pull請求頭
            PullMessageRequestHeader requestHeader = new PullMessageRequestHeader();
            requestHeader.setConsumerGroup(this.consumerGroup);
            requestHeader.setTopic(mq.getTopic());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setQueueOffset(offset);
            requestHeader.setMaxMsgNums(maxNums);
            requestHeader.setSysFlag(sysFlagInner);
            requestHeader.setCommitOffset(commitOffset);
            requestHeader.setSuspendTimeoutMillis(brokerSuspendMaxTimeMillis);
            requestHeader.setSubscription(subExpression);
            requestHeader.setSubVersion(subVersion);
            requestHeader.setExpressionType(expressionType);

            String brokerAddr = findBrokerResult.getBrokerAddr();
            if (PullSysFlag.hasClassFilterFlag(sysFlagInner)) {
                brokerAddr = computPullFromWhichFilterServer(mq.getTopic(), brokerAddr);
            }
            //委托Netty去獲取信息
            PullResult pullResult = this.mQClientFactory.getMQClientAPIImpl().pullMessage(
                brokerAddr,
                requestHeader,
                timeoutMillis,
                communicationMode,
                pullCallback);

            return pullResult;
        }

        throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
    }

pull消息比較簡單,一次請求返回,由Consumer管理offset.一般來說一個Consumer Group中Consumer的數量不能大于MessageQueue的數量.

Consumer 支持push嗎?

Push Consumer

Consumer 的一種,應用通常向 Consumer 對象注冊一個 Listener 接口,一旦收到消息,Consumer 對象立

刻回調 Listener 接口方法。JMS標準中為MessageListener類的onMessage方法.

Pull Consumer

Consumer 的一種,應用通常主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由應用控制。

RocketMQ的Consumer都是從Broker pull消息來消費,但是為了能做到實時收消息,RocketMQ 使用長輪詢方式,可以保證消息實時性同Push方式一致。這種長輪詢方式類似于WebQQ收發消息機制。請參考以下信息了解更多Comet:基于 HTTP 長連接的“服務器推”技術

雖然RocketMQ的consumer都是通過pull來實現的但是其封裝了push接口,我們先來看其使用方法

 public static void main(String[] args) throws InterruptedException, MQClientException {  
        /** 
         * 一個應用創建一個Consumer,由應用來維護此對象,可以設置為全局對象或者單例 
         * 注意:ConsumerGroupName需要由應用來保證唯一 
         */  
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testmerchantLeagueConsumerGroup");  
        consumer.setNamesrvAddr("ip:port");  
          
        /** 
         * 訂閱指定topic下tags分別等于TagA或TagB 
         */  
        consumer.subscribe("broker-a", "TagB || TagA");  
  
        /** 
         * 設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費 
         * 如果非第一次啟動,那么按照上次消費的位置繼續消費 
         */  
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
  
        //真正的處理消息邏輯在這里
        consumer.registerMessageListener(new MessageListenerConcurrently() {  
  
            /** 
             * 默認msgs里只有一條消息,可以通過設置consumeMessageBatchMaxSize參數來批量接收消息 
             */  
            @Override  
            public ConsumeConcurrentlyStatus consumeMessage(List msgs,  
                    ConsumeConcurrentlyContext context) {  
                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);  
                MessageExt msg = msgs.get(0);  
                if (msg.getTopic().equals("broker-a")) {  
                    // 執行TopicTest1的消費邏輯  
                    if (msg.getTags() != null && msg.getTags().equals("TagA")) {  
                        // 執行TagA的消費  
                        String message = new String(msg.getBody());  
                        System.out.println(message);  
                    }  
                    else if (msg.getTags() != null && msg.getTags().equals("TagB")) {  
                        // 執行TagB的消費  
                        String message = new String(msg.getBody());  
                        System.out.println(message);  
                    }  
                    
                }  
                //消費者向mq服務器返回消費成功的消息  
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
            }  
        });  
  
       //Consumer對象在使用之前必須要調用start初始化,初始化一次即可
        consumer.start();  
  
    }  

RocketMQ push的實現 :

消息的拉取邏輯

維護一個pullRequestQueue,先放入一個pullRequest,當pullResult為成功時,再構建新的pullRequest放入pullRequestQueue,另起一個線程監測pullRequestQueue,當起不為空時,輪詢pull消息

DefaultMQPushConsumer.start
-> DefaultMQPushConsumerImpl.start
-> MQClientInstance.start
-> PullMessageService.start
我們來看PullMessageService的run方法,

    //請求消息阻塞鏈表
    private final LinkedBlockingQueue pullRequestQueue = new LinkedBlockingQueue();
    @Override
    public void run() {
        log.info(this.getServiceName() + " service started");
        //只要有請求就去pull消息
        while (!this.isStopped()) {
            try {
                PullRequest pullRequest = this.pullRequestQueue.take();
                if (pullRequest != null) {
                    this.pullMessage(pullRequest);
                }
            } catch (InterruptedException e) {
            } catch (Exception e) {
                log.error("Pull Message Service Run Method exception", e);
            }
        }

        log.info(this.getServiceName() + " service end");
    }

pullRequestQueue在在哪里put呢?
在class里找到在executePullRequestLater方法內會put

   public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
        this.scheduledExecutorService.schedule(new Runnable() {

            @Override
            public void run() {
                PullMessageService.this.executePullRequestImmediately(pullRequest);
            }
        }, timeDelay, TimeUnit.MILLISECONDS);
    }

  public void executePullRequestImmediately(final PullRequest pullRequest) {
        try {
            this.pullRequestQueue.put(pullRequest);
        } catch (InterruptedException e) {
            log.error("executePullRequestImmediately pullRequestQueue.put", e);
        }
    }

查看此方法的調用關系,發現在run中的pullMessage方法中onSuccess回調中會構建下一次的pullRequestQueue待下次請求

 PullCallback pullCallback = new PullCallback() {
            @Override
            public void onSuccess(PullResult pullResult) {
                if (pullResult != null) {
                    pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            long prevRequestOffset = pullRequest.getNextOffset();
              //請求成功就構建新的pullRequest              pullRequest.setNextOffset(pullResult.getNextBeginOffset());
                            long pullRT = System.currentTimeMillis() - beginTimestamp;
                            DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(),
                                pullRequest.getMessageQueue().getTopic(), pullRT);

                            long firstMsgOffset = Long.MAX_VALUE;
                            if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
                                DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            } else {
                                firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();

                                DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),
                                    pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());

                                boolean dispathToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
                                DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                    pullResult.getMsgFoundList(), //
                                    processQueue, //
                                    pullRequest.getMessageQueue(), //
                                    dispathToConsume);
//放到pullRequestQueue
                                if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
                                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest,
                                        DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
                                } else {
                                    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                                }
                            }

                            if (pullResult.getNextBeginOffset() < prevRequestOffset//
                                || firstMsgOffset < prevRequestOffset) {
                                log.warn(
                                    "[BUG] pull message result maybe data wrong, nextBeginOffset: {} firstMsgOffset: {} prevRequestOffset: {}", //
                                    pullResult.getNextBeginOffset(), //
                                    firstMsgOffset, //
                                    prevRequestOffset);
                            }

                            break;
                        case NO_NEW_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case NO_MATCHED_MSG:
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            DefaultMQPushConsumerImpl.this.correctTagsOffset(pullRequest);

                            DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
                            break;
                        case OFFSET_ILLEGAL:
                            log.warn("the pull request offset illegal, {} {}", //
                                pullRequest.toString(), pullResult.toString());
                            pullRequest.setNextOffset(pullResult.getNextBeginOffset());

                            pullRequest.getProcessQueue().setDropped(true);
                            DefaultMQPushConsumerImpl.this.executeTaskLater(new Runnable() {

                                @Override
                                public void run() {
                                    try {
                                        DefaultMQPushConsumerImpl.this.offsetStore.updateOffset(pullRequest.getMessageQueue(),
                                            pullRequest.getNextOffset(), false);

                                        DefaultMQPushConsumerImpl.this.offsetStore.persist(pullRequest.getMessageQueue());

                                        DefaultMQPushConsumerImpl.this.rebalanceImpl.removeProcessQueue(pullRequest.getMessageQueue());

                                        log.warn("fix the pull request offset, {}", pullRequest);
                                    } catch (Throwable e) {
                                        log.error("executeTaskLater Exception", e);
                                    }
                                }
                            }, 10000);
                            break;
                        default:
                            break;
                    }
                }
            }

至此獲取消息已經搞定,再看怎么觸發MessageListener的消費方法.
還是在DefaultMQPushConsumerImpl.pullMessage方法內的回調,有下列代碼,把消息提供給consumeMessageService處理.

  DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(//
                                    pullResult.getMsgFoundList(), //
                                    processQueue, //
                                    pullRequest.getMessageQueue(), //
                                    dispathToConsume);

構建ConsumeRequest,然后提交至線程池消費

    @Override
    public void submitConsumeRequest(//
        final List msgs, //
        final ProcessQueue processQueue, //
        final MessageQueue messageQueue, //
        final boolean dispatchToConsume) {
        final int consumeBatchSize = this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();
        if (msgs.size() <= consumeBatchSize) {
            ConsumeRequest consumeRequest = new ConsumeRequest(msgs, processQueue, messageQueue);
            try {
                this.consumeExecutor.submit(consumeRequest);
            } catch (RejectedExecutionException e) {
                this.submitConsumeRequestLater(consumeRequest);
            }
        } else {
            for (int total = 0; total < msgs.size(); ) {
                List msgThis = new ArrayList(consumeBatchSize);
                for (int i = 0; i < consumeBatchSize; i++, total++) {
                    if (total < msgs.size()) {
                        msgThis.add(msgs.get(total));
                    } else {
                        break;
                    }
                }

                ConsumeRequest consumeRequest = new ConsumeRequest(msgThis, processQueue, messageQueue);
                try {
                    this.consumeExecutor.submit(consumeRequest);
                } catch (RejectedExecutionException e) {
                    for (; total < msgs.size(); total++) {
                        msgThis.add(msgs.get(total));
                    }

                    this.submitConsumeRequestLater(consumeRequest);
                }
            }
        }
    }

終于在ConsumeRequest的run方法中找到了listner的consumeMessage

 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);

這下整個pull邏輯就完成了.

Consumer 怎么實現單隊列并行消費

上節代碼就是取得并行的例子,簡單來說就是把消息提交給線程池,而不阻塞,就單隊列并行消費了

Consumer 怎么過濾消息?

入口還是在DefaultMQPushConsumerImpl.pullMessage

pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrapper.processPullResult(pullRequest.getMessageQueue(), pullResult,
                        subscriptionData);

往里面看,發現有過濾消息的邏輯

   public PullResult processPullResult(final MessageQueue mq, final PullResult pullResult,
        final SubscriptionData subscriptionData) {
        PullResultExt pullResultExt = (PullResultExt) pullResult;

        this.updatePullFromWhichNode(mq, pullResultExt.getSuggestWhichBrokerId());
        if (PullStatus.FOUND == pullResult.getPullStatus()) {
            ByteBuffer byteBuffer = ByteBuffer.wrap(pullResultExt.getMessageBinary());
            List msgList = MessageDecoder.decodes(byteBuffer);

            List msgListFilterAgain = msgList;
            if (!subscriptionData.getTagsSet().isEmpty() && !subscriptionData.isClassFilterMode()) {
                msgListFilterAgain = new ArrayList(msgList.size());
                for (MessageExt msg : msgList) {
                    if (msg.getTags() != null) {
                        if (subscriptionData.getTagsSet().contains(msg.getTags())) {
                            msgListFilterAgain.add(msg);
                        }
                    }
                }
            }
            //消息過濾
            if (this.hasHook()) {
                FilterMessageContext filterMessageContext = new FilterMessageContext();
                filterMessageContext.setUnitMode(unitMode);
                filterMessageContext.setMsgList(msgListFilterAgain);
                this.executeHook(filterMessageContext);
            }

            for (MessageExt msg : msgListFilterAgain) {
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MIN_OFFSET,
                    Long.toString(pullResult.getMinOffset()));
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_MAX_OFFSET,
                    Long.toString(pullResult.getMaxOffset()));
            }

            pullResultExt.setMsgFoundList(msgListFilterAgain);
        }

        pullResultExt.setMessageBinary(null);

        return pullResult;
    }
Consumer 怎么保證一條消息只被Group中的一個服務消費?

因為topic的MessageQueue只能對應Group中的一個Consumer,所以一條消息只被Group中的一個服務消費

Consumer 負載均衡怎么實現?

概念:

consumer同時消費多個MessageQueue,當topic中的MessageQueue變更時,動態調整消費MessageQueue的數量

 //RebalanceImpl
 public void doRebalance(final boolean isOrder) {
        Map subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic, isOrder);
                } catch (Throwable e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

我們只關心集群模式
主要邏輯:

1. 獲取topic所有MessageQueue
2. 獲取同ConsumerGroup組所有Consumer信息
3. 根據制定策略分配給此Consumer
 private void rebalanceByTopic(final String topic, final boolean isOrder) {
        switch (messageModel) {
            case BROADCASTING: {
                Set mqSet = this.topicSubscribeInfoTable.get(topic);
                if (mqSet != null) {
                    boolean changed = this.updateProcessQueueTableInRebalance(topic, mqSet, isOrder);
                    if (changed) {
                        this.messageQueueChanged(topic, mqSet, mqSet);
                        log.info("messageQueueChanged {} {} {} {}", //
                            consumerGroup, //
                            topic, //
                            mqSet, //
                            mqSet);
                    }
                } else {
                    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                }
                break;
            }
            case CLUSTERING: {
            //獲取該topic所有MessageQueue
                Set mqSet = this.topicSubscribeInfoTable.get(topic);
            //獲取同consumerGroup信息
                List cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                if (null == mqSet) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
                    }
                }

                if (null == cidAll) {
                    log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
                }

                if (mqSet != null && cidAll != null) {
                    List mqAll = new ArrayList();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List allocateResult = null;
                    try {
                    //根據分配策略分配MessageQueue給當前Consumer
                        allocateResult = strategy.allocate(//
                            this.consumerGroup, //
                            this.mQClientFactory.getClientId(), //
                            mqAll, //
                            cidAll);
                    } catch (Throwable e) {
                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
                            e);
                        return;
                    }

                    Set allocateResultSet = new HashSet();
                    if (allocateResult != null) {
                        allocateResultSet.addAll(allocateResult);
                    }

                    boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
                    if (changed) {
                        log.info(
                            "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
                            strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
                            allocateResultSet.size(), allocateResultSet);
                        this.messageQueueChanged(topic, mqSet, allocateResultSet);
                    }
                }
                break;
            }
            default:
                break;
        }
    }

RocketMQ提供了幾種策略供使用

實現類 策略名
AllocateMessageQueueAveragelyByCircle 輪詢平均分配策略
AllocateMessageQueueByMachineRoom 根據機房分配策略
AllocateMessageQueueConsistentHash 一致Hash分配策略

本節編寫參考分布式消息隊列RocketMQ源碼分析之4 -- Consumer負載均衡與Kafka的Consumer負載均衡之不同點

Consumer 消費失敗怎么辦

Consumer 消費消息失敗后,要提供一種重試機制,令消息再消費一次。Consumer 消費消息失敗通常可以認為 有以下幾種情況

由于消息本身的原因,例如反序列化失敗,消息數據本身無法處理(例如話費充值,當前消息的手機號被注銷,無法充值)等。這種錯誤通常需要跳過這條消息,再消費其他消息,而這條失敗的消息即使立刻重試消費,99%也不成功,所以最好提供一種定時重試機制,即過 10s 秒后再重試。

由于依賴的下游應用服務不可用,例如db連接不可用,外系統網絡不可達等。遇到這種錯誤,即使跳過當前失敗的消息,消費其他消息同樣也會報錯。這種情況建議應用 sleep 30s,再 消費下一條消息,這樣可以減輕 Broker 重試消息的壓力。

具體到代碼實現,會根據消費狀態進行處理,當無返回時會重試.

   if (null == status) {
                log.warn("consumeMessage return null, Group: {} Msgs: {} MQ: {}",
                    ConsumeMessageConcurrentlyService.this.consumerGroup,
                    msgs,
                    messageQueue);
                //設置狀態為重試
                status = ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
public void processConsumeResult(//
        final ConsumeConcurrentlyStatus status, //
        final ConsumeConcurrentlyContext context, //
        final ConsumeRequest consumeRequest//
    ) {
        int ackIndex = context.getAckIndex();

        if (consumeRequest.getMsgs().isEmpty())
            return;

        switch (status) {
            case CONSUME_SUCCESS:
                if (ackIndex >= consumeRequest.getMsgs().size()) {
                    ackIndex = consumeRequest.getMsgs().size() - 1;
                }
                int ok = ackIndex + 1;
                int failed = consumeRequest.getMsgs().size() - ok;
                this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
                break;
            case RECONSUME_LATER:
                ackIndex = -1;
                this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
                    consumeRequest.getMsgs().size());
                break;
            default:
                break;
        }

        switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
                }
                break;
            case CLUSTERING:
                List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
                for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
                    MessageExt msg = consumeRequest.getMsgs().get(i);
                    boolean result = this.sendMessageBack(msg, context);
                    if (!result) {
                        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
                        msgBackFailed.add(msg);
                    }
                }

                if (!msgBackFailed.isEmpty()) {
                    consumeRequest.getMsgs().removeAll(msgBackFailed);
                //請求重試消費
                    this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
                }
                break;
            default:
                break;
        }

        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }
Consumer 可以回溯消費嗎?

回溯消費是指Consumer已經消費成功的消息,由于業務上需求需要重新消費,要支持此功能,Broker在向Consumer投遞成功消息后,消息仍然需要保留。并且重新消費一般是按照時間維度,例如由于Consumer系統故障,恢復后需要重新消費 1 小時前的數據,那么Broker要提供一種機制,可以按照時間維度來回退消費進度。RocketMQ支持按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向后回溯。

邏輯: 請求broker按參數返回offset,按照offset重置消費offset,從而實現回溯消費

    public Map invokeBrokerToResetOffset(final String addr, final String topic, final String group,
        final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC)
        throws RemotingException, MQClientException, InterruptedException {
        ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
        requestHeader.setTopic(topic);
        requestHeader.setGroup(group);
        requestHeader.setTimestamp(timestamp);
        requestHeader.setForce(isForce);

        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
        if (isC) {
            request.setLanguage(LanguageCode.CPP);
        }

        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            case ResponseCode.SUCCESS: {
                if (response.getBody() != null) {
                    ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
                    return body.getOffsetTable();
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70344.html

相關文章

  • 關于 RocketMQ ClientID 相同引發的消息堆積的問題

    摘要:本篇著重聊聊為什么會消息堆積。默認的策略很好理解,將平均的分配給。那么最后時,本來不同的,會取到相同的舉個例子,和都取到了前個,從而造成有些如果有的話沒有對其消費,而沒有被消費,消息也在不停的投遞進來,就會造成消息的大量堆積。 首先,造成這個問題的 BUG RocketMQ 官方已經在 3月16號 的這個提交中...

    psychola 評論0 收藏0
  • RocketMQ源碼學習(一)-概述

    摘要:每個與集群中的所有節點建立長連接,定時注冊信息到所有。完全無狀態,可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實現雖然版本不太一致但這也是能找到的最詳細的資料了接下來根據其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個成熟的產品來學習吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...

    godlong_X 評論0 收藏0
  • RocketMQ源碼學習(六)-Name Server

    摘要:完全無狀態,可集群部署與集群中的其中一個節點隨機選擇建立長連接,定期從取路由信息,并向提供服務的建立長連接,且定時向發送心跳。既可以從訂閱消息,也可以從訂閱消息,訂閱規則由配置決定。 問題列表: Name Server 的作用是什么? Name Server 存儲了Broker的什么信息? Name Server 為Producer的提供些什么信息? Name Server 為Co...

    Joyven 評論0 收藏0
  • RocketMQ源碼學習(五)-Broker(與Consumer交互部分)

    摘要:發送消息階段,不允許發送重復的消息。雖然不能嚴格保證不重復,但是正常情況下很少會出現重復發送消費情況,只有網絡異常,啟停等異常情況下會出現消息重復。 問題列表 Broker 怎么響應Consumer請求? Broker 怎么維護ConsumeQueue? Broker 怎么處理事務消息的 ConsumeQueue ? Broker 怎么處理定時消息的 ConsumeQueue? B...

    paulli3 評論0 收藏0
  • SpringBoot RocketMQ 整合使用和監控

    摘要:前提通過前面兩篇文章可以簡單的了解和安裝,今天就將和整合起來使用。然后我運行之前的整合項目,查看監控信息如下總結整篇文章講述了與整合和監控平臺的搭建。 showImg(https://segmentfault.com/img/remote/1460000013232432?w=1920&h=1277); 前提 通過前面兩篇文章可以簡單的了解 RocketMQ 和 安裝 RocketMQ...

    Jacendfeng 評論0 收藏0

發表評論

0條評論

周國輝

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<