国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

一定能看懂的RocketMQ事務消息源碼分析(干貨)

myshell / 962人閱讀

摘要:但是服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。既然消息的發送不能和本地事務寫在一起,那如何來保證其整體具有原子性的需求呢答案就是今天我們介紹的主角事務消息。

前言

得益于MQ削峰填谷,系統解耦,操作異步等功能特性,在互聯網行業,可以說有分布式服務的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是經歷了多年的雙十一高并發挑戰,其中4.3.0版本推出了事務消息的新特性,本文對RocketMQ 4.5.0版本事務消息相關的源碼跟蹤介紹,通過閱讀讀者可以知道:

事務消息解決什么樣的問題

事務消息的實現原理及其設計亮點

解決什么問題

假設我所在的系統現在有這樣一個場景:

本地開啟數據庫事務進行扣款操作,成功后發送MQ消息給庫存中心進行發貨。

有人會想到開啟mybatis事務實現,把本地事務和MQ消息放在一起不就行了嗎?如果MQ發送成功,就提交事務,發送失敗就回滾事務,整套操作一氣呵成。

</>復制代碼

  1. transaction{
  2. 扣款();
  3. boolean success = 發送MQ();
  4. if(success){
  5. commit();
  6. }else{
  7. rollBack();
  8. }
  9. }

看似沒什么問題,但是網絡是不可靠的。

假設MQ返回過來的響應因為網絡原因遲遲沒有收到,所以在面對不確定的MQ返回結果只好進行回滾。但是MQ 服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。

既然MQ消息的發送不能和本地事務寫在一起,那如何來保證其整體具有原子性的需求呢?答案就是今天我們介紹的主角:事務消息

概覽

總體而言RocketMQ事務消息分為兩條主線

定時任務發送流程:發送half message(半消息),執行本地事務,發送事務執行結果

定時任務回查流程:MQ服務器回查本地事務,發送事務執行結果

因此本文也通過這兩條主線對源碼進行分析

源碼分析 半消息發送流程 本地應用(client)

在本地應用發送事務消息的核心類是TransactionMQProducer,該類通過繼承DefaultMQProducer來復用大部分發送消息相關的邏輯,這個類的代碼量非常少只有100來行,下面是這個類的sendMessageTransaction方法

</>復制代碼

  1. @Override
  2. public TransactionSendResult sendMessageInTransaction(final Message msg,
  3. final Object arg) throws MQClientException {
  4. if (null == this.transactionListener) {
  5. throw new MQClientException("TransactionListener is null", null);
  6. }
  7. return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
  8. }

這個方法做了兩件事,

檢查transactionListener是否存在

調用父類執行事務消息發送

TransactionListener在事務消息流程中起到至關重要的作用,一起看看這個接口

</>復制代碼

  1. public interface TransactionListener {
  2. /**
  3. * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
  4. *
  5. * @param msg Half(prepare) message
  6. * @param arg Custom business parameter
  7. * @return Transaction state
  8. */
  9. LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
  10. /**
  11. * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
  12. * method will be invoked to get local transaction status.
  13. *
  14. * @param msg Check message
  15. * @return Transaction state
  16. */
  17. LocalTransactionState checkLocalTransaction(final MessageExt msg);
  18. }

接口注釋說的很明白,配合上面的概覽圖來看就是,executeLocalTransaction方法對應的就是執行本地事務操作,checkLocalTransaction對應的就是回查本地事務操作。

下面是DefaultMQProducer類的sendMessageInTransaction方法源碼

</>復制代碼

  1. public TransactionSendResult sendMessageInTransaction(final Message msg,
  2. final LocalTransactionExecuter localTransactionExecuter, final Object arg)
  3. throws MQClientException {
  4. ...
  5. SendResult sendResult = null;
  6. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
  7. MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
  8. ...
  9. sendResult = this.send(msg);
  10. ...
  11. switch (sendResult.getSendStatus()) {
  12. case SEND_OK: {
  13. ...
  14. localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
  15. ...
  16. break;
  17. case FLUSH_DISK_TIMEOUT:
  18. case FLUSH_SLAVE_TIMEOUT:
  19. case SLAVE_NOT_AVAILABLE:
  20. localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
  21. break;
  22. default:
  23. break;
  24. }
  25. ...
  26. this.endTransaction(sendResult, localTransactionState, localException);
  27. ...
  28. }

為了使源碼的邏輯更加直觀,筆者精簡了核心代碼。sendMessageInTransaction方法主要做了以下事情

給消息打上事務消息相關的標記,用于MQ服務端區分普通消息和事務消息

發送半消息(half message)

發送成功則由transactionListener執行本地事務

執行endTransaction方法,如果半消息發送失敗本地事務執行失敗告訴服務端是刪除半消息,半消息發送成功本地事務執行成功則告訴服務端生效半消息。

發送半消息流程,Client端代碼到這里差不多就結束了,接下來看看RocketMQ Server端是如何處理的

RocketMQ Server

Server在接收到消息過后會進行一些領域對象的轉化和是否支持事務消息的權限校驗,對理解事務消息用處不大,此處就省略對旁枝末節的介紹了。下面是TransactionalMessageBridge類處理half message的源碼

</>復制代碼

  1. public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
  2. return store.putMessage(parseHalfMessageInner(messageInner));
  3. }
  4. private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
  5. MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
  6. MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
  7. String.valueOf(msgInner.getQueueId()));
  8. msgInner.setSysFlag(
  9. MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
  10. msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
  11. msgInner.setQueueId(0);
  12. msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
  13. return msgInner;
  14. }

這兩個方法主要做了以下事情:

</>復制代碼

  1. public class Message implements Serializable {
  2. private static final long serialVersionUID = 8445773977080406428L;
  3. private String topic;
  4. private int flag;
  5. private Map properties;
  6. private byte[] body;
  7. private String transactionId;
  8. }

將消息的topic,queueId放進消息體自身的map里進行緩存

將消息的topic 設置為“RMQ_SYS_TRANS_OP_HALF_TOPIC”,queueId設置為0

將消息寫入磁盤持久化

可以看到所有的事務半消息都會被放進同一個topic的同一個queue里面,通過對topic的區分,從而避免了半消息被consumer給消費到

Server將半消息持久化后然后會發送結果給我們本地的應用程序。到了這里Server端對半消息的處理就結束了,緊接著的是定時任務的登場。

定時任務回查流程 RocketMQ Server

定時任務是一個叫TransactionalMessageService類的線程,下面是該類的check方法

</>復制代碼

  1. @Override
  2. public void check(long transactionTimeout, int transactionCheckMax,
  3. AbstractTransactionalMessageCheckListener listener) {
  4. ...
  5. if (!putBackHalfMsgQueue(msgExt, i)) {
  6. continue;
  7. }
  8. listener.resolveHalfMsg(msgExt);
  9. }
  10. ...
  11. }

check方法非常長,省略的代碼大致都是對半消息進行過濾(如超過72小時的事務消息,就被算作過期),只保留符合條件的半消息對其進行回查。

其中很有意思的是putBackHalfMsgQueue方法,因為每次把半消息從磁盤拉到內存里進行處理都會對其屬性進行改變(例如TRANSACTION_CHECK_TIMES,這是是否丟棄事務消息的關鍵信息),所以在發送回查消息之前需要對半消息再次放進磁盤。RocketMQ采取的方法是基于最新的物理偏移量重新寫入,而不是對原有的半消息進行修改,其中的目的就是RocketMQ的存儲設計采用順序寫,如果去修改消息 ,無法做到高性能。

下面是resolveHalfMsg方法,主要就是開啟一個線程然后發送check消息。

</>復制代碼

  1. public void resolveHalfMsg(final MessageExt msgExt) {
  2. executorService.execute(new Runnable() {
  3. @Override
  4. public void run() {
  5. try {
  6. sendCheckMessage(msgExt);
  7. } catch (Exception e) {
  8. LOGGER.error("Send check message error!", e);
  9. }
  10. }
  11. });
  12. }
本地應用(client)

下面是DefaultMQProducerImpl的checkTransactionState方法,是本地應用對回查消息的處理邏輯

</>復制代碼

  1. @Override
  2. public void checkTransactionState(final String addr, final MessageExt msg,
  3. final CheckTransactionStateRequestHeader header) {
  4. Runnable request = new Runnable() {
  5. ...
  6. @Override
  7. public void run() {
  8. ...
  9. TransactionListener transactionListener = getCheckListener();
  10. ...
  11. localTransactionState = transactionListener.checkLocalTransaction(message);
  12. ...
  13. this.processTransactionState(
  14. localTransactionState,
  15. group,
  16. exception);
  17. }
  18. private void processTransactionState(
  19. ...
  20. DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
  21. 3000);
  22. ...
  23. }
  24. };
  25. this.checkExecutor.submit(request);
  26. }

精簡代碼邏輯后可以清晰的看到

開啟一個線程來執行回查的邏輯

執行transactionListener的checkLocalTransaction方法來獲取本地事務執行的結果

RocketMQ Server

RocketMQ 服務器在收到Client發過來的Commit消息后會

讀出半消息——>恢復topic等原消息體的信息——>和普通消息一樣再次寫入磁盤——>刪除之前的半消息

如果是Rollback消息則直接刪除之前的半消息

到此,整條RocketMQ 事務消息的調用鏈就結束了

思考

1. 分布式事務等于事務消息嗎?

兩者并沒有關系,事務消息僅僅保證本地事務和MQ消息發送形成整體的原子性,而投遞到MQ服務器后,消費者是否能一定消費成功是無法保證的。

2. 源碼設計上有什么亮點嗎?

通過對整條鏈路源碼的學習理解發現還是有不少亮點的

server端回查消息的發送,client端回查消息邏輯的處理,client端commit/rollback消息的提交都是用了異步進行,可以說能異步的地方都用了異步,通過異步+重試的方式保證了在分布式環境中即使短暫的網絡狀況不良好,也不會影響整體邏輯。

引入TransactionListener,真正做到了開閉原則以及依賴倒置原則,面向接口編程。整體擴展性做得非常好,使用者只需要編寫自己的Listener就可以做到事務消息的發送,非常方便

TransactionMQProducer通過繼承DefaultMQProducer極大地復用了關于發送消息相關的邏輯

3. 源碼設計上有什么不足嗎?

RocketMQ作為一款極其成功的消息中間件,要發現不足不是那么容易了,筆者談幾點看法

sendMessageIntransaction等事務相關的方法被劃分在了DefaultMQProducer里面,從內聚的角度來說這是跟事務相關的發送消息方法應該被劃分在TransactionMQProducer。

所有topic的半消息都會寫在topic為RMQ_SYS_TRANS_OP_HALF_TOPIC的半消息隊列里,并且每條半消息,在整個鏈路里會被寫多次,如果并發很大且大部分消息都是事務消息的話,可靠性會存在問題。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75328.html

相關文章

  • 讓你看懂的RocketMQ事務消息源碼分析(干貨)

    摘要:但是服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。所有的半消息都會寫在為的半消息隊列里,并且每條半消息,在整個鏈路里會被寫多次,如果并發很大且大部分消息都是事務消息的話,可靠性會存在問題。 前言 得益于MQ削峰填谷,系統解耦,操作異步等功能特性,在互聯網行業,可以說有分布式服務的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是...

    zsirfs 評論0 收藏0
  • 新手也看懂消息隊列其實很簡單

    摘要:通過以上分析我們可以得出消息隊列具有很好的削峰作用的功能即通過異步處理,將短時間高并發產生的事務消息存儲在消息隊列中,從而削平高峰期的并發事務。 該文已加入開源項目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識的文檔類項目,Star 數接近 16k)。地址:https://github.com/Snailclimb... 本文內容思維導圖:showImg(ht...

    Clect 評論0 收藏0
  • 后端經驗

    摘要:在結構上引入了頭結點和尾節點,他們分別指向隊列的頭和尾,嘗試獲取鎖入隊服務教程在它提出十多年后的今天,已經成為最重要的應用技術之一。隨著編程經驗的日積月累,越來越感覺到了解虛擬機相關要領的重要性。 JVM 源碼分析之 Jstat 工具原理完全解讀 http://click.aliyun.com/m/8315/ JVM 源碼分析之 Jstat 工具原理完全解讀 http:...

    i_garfileo 評論0 收藏0
  • 高并發異步解耦利器:RocketMQ究竟強在哪里?

    摘要:它是阿里巴巴于年開源的第三代分布式消息中間件。是一個分布式消息中間件,具有低延遲高性能和可靠性萬億級別的容量和靈活的可擴展性,它是阿里巴巴于年開源的第三代分布式消息中間件。上篇文章消息隊列那么多,為什么建議深入了解下RabbitMQ?我們講到了消息隊列的發展史:并且詳細介紹了RabbitMQ,其功能也是挺強大的,那么,為啥又要搞一個RocketMQ出來呢?是重復造輪子嗎?本文我們就帶大家來詳...

    tainzhi 評論0 收藏0
  • 【備戰春招/秋招系列】美團Java面經總結進階篇 (附詳解答案)

    摘要:我在前面的文章中也提到了應該怎么做自我介紹與項目介紹,詳情可以查看這篇文章備戰春招秋招系列初出茅廬的程序員該如何準備面試。因此基于事件消息對象驅動的業務架構可以是一系列流程。 showImg(https://user-gold-cdn.xitu.io/2018/11/14/16711ac29c2ae52c?w=928&h=531&f=png&s=798562); 一 消息隊列MQ的...

    chengjianhua 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<