摘要:在對事實性要求沒有那么高的情況下,可以用基于最大努力交付消息隊列以及消息存儲來解決最終一致性。可靠消息服務和消息組件,協調上下游消息的傳遞,并確保上下游數據的一致性。下游應用通知可靠消息服務該消息已經成功消費。
本文對比 二階段事務、最大努力交付以及消息最終一致性,并給出部分解決方案,最終一致性方案參考阿里RockMQ事務消息:http://blog.csdn.net/chunlong...)
項目git地址:https://github.com/vvsuperman...
一 2階段事務分布式系統最終一致性有N種方案,比如2PC(2階段事務) ,以及三段提交等等,但開銷較大,實現起來復雜,比如2階段事務為例,需要引入一個協調者(Coordinator)來統一掌控所有參與者(Participant)的操作結果
以開會為例:
甲乙丙丁四人要組織一個會議,需要確定會議時間,不妨設甲是協調者,乙丙丁是參與者。
投票階段:
(1)甲發郵件給乙丙丁,周二十點開會是否有時間;
(2)甲回復有時間;
(3)乙回復有時間;
(4)丙遲遲不回復,此時對于這個活動,甲乙丙均處于阻塞狀態,算法無法繼續進行;
(5)丙回復有時間(或者沒有時間);
提交階段:
(1)協調者甲將收集到的結果反饋給乙丙丁(什么時候反饋,以及反饋結果如何,在此例中取決與丙的時間與決定);
(2)乙收到;
(3)丙收到;
(4)丁收到;
不僅要鎖住參與者的所有資源,而且要鎖住協調者資源,開銷大。一句話總結就是:2PC效率很低,分布式事務很難做。
在對事實性要求沒有那么高的情況下,可以用基于最大努力交付 && 消息隊列以及消息存儲來解決最終一致性。
二 消息最大努力交付所謂最大努力交付,就是俺反正用最大努力做,能不能成功,不做完全保證
會涉及到三個模塊
上游應用,發消息到 MQ 隊列。
下游應用(例如短信服務、郵件服務),接受請求,并返回通知結果。
最大努力通知服務,監聽消息隊列,將消息存儲到數據庫中,并按照通知規則調用下游應用的發送通知接口。
具體流程如下
上游應用發送 MQ 消息到 MQ 組件內,消息內包含通知規則和通知地址
最大努力通知服務監聽到 MQ 內的消息,解析通知規則并放入延時隊列等待觸發通知
最大努力通知服務調用下游的通知地址,如果調用成功,則該消息標記為通知成功,如果失敗則在滿足通知規則(例如 5 分鐘發一次,共發送 10 次)的情況下重新放入延時隊列等待下次觸發。
最大努力通知服務表示在不影響主業務的情況下,盡可能地確保數據的一致性。它需要開發人員根據業務來指定通知規則,在滿足通知規則的前提下,盡可能的確保數據的一致,以達到最大努力的目的。
實現上也比較簡單,目前主流消息隊列都有ack機制,當沒收到ack的時候用規則做定時重發即可。
優點:實現簡單
缺點:無補償機制,不保證能夠送達
實現要點: 保證消息發送失敗之后能夠和業務一起回滾;消息接受方保證冥等性;定時重發機制,采用一定的重發策略,例如說指數增長,據說阿里采用redis的zset來完成,參考https://zhuanlan.zhihu.com/p/...
消息進到zset后,DelayQ會通過timer觸發(比如秒級),fork相應的消費線程去處理zset里ExecuteTime大于當前時間的消息。DelayQ拿到一條消息后,解析其中的callbackurl,并組裝參數,push業務消息給Consumer.
Consumer返回處理成功,那么zrem Codis里的消息。如果處理失敗,則計算其下次嘗試時間,并更新其ExecuteTime.
此方案涉及 3 個模塊:
上游應用,執行業務并發送 MQ 消息。
可靠消息服務和 MQ 消息組件,協調上下游消息的傳遞,并確保上下游數據的一致性。
下游應用,監聽 MQ 的消息并執行自身業務。
第一階段:上游應用執行業務并發送 MQ 消息上游應用將本地業務執行和消息發送綁定在同一個本地事務中,保證要么本地操作成功并發送 MQ 消息,要么兩步操作都失敗并回滾。
上游應用和可靠消息之間的業務交互圖如下:
上游應用發送待確認消息到可靠消息系統
可靠消息系統保存待確認消息并返回
上游應用執行本地業務
上游應用通知可靠消息系統確認業務已執行并發送消息。
可靠消息系統修改消息狀態為發送狀態并將消息投遞到 MQ 中間件。
以上每一步都可能出現失敗情況,分析一下這 5 步出現異常后上游業務和消息發送是否一致:
上游應用執行完成,下游應用尚未執行或執行失敗時,此事務即處于 BASE 理論的 Soft State 狀態。
第二階段:下游應用監聽 MQ 消息并執行業務下游應用監聽 MQ 消息并執行業務,并且將消息的消費結果通知可靠消息服務。
可靠消息的狀態需要和下游應用的業務執行保持一致,可靠消息狀態不是已完成時,確保下游應用未執行,可靠消息狀態是已完成時,確保下游應用已執行。
下游應用和可靠消息服務之間的交互圖如下:
下游應用監聽 MQ 消息組件并獲取消息
下游應用根據 MQ 消息體信息處理本地業務
下游應用向 MQ 組件自動發送 ACK 確認消息被消費
下游應用通知可靠消息系統消息被成功消費,可靠消息將該消息狀態更改為已完成。
以上每一步都可能出現失敗情況,分析一下這 4 步出現異常后下游業務和消息狀態是否一致:
通過分析以上兩個階段可能失敗的情況,為了確保上下游數據的最終一致性,在可靠消息系統中,需要開發 消息狀態確認 和 消息重發 兩個功能以實現 BASE 理論的 Eventually Consistent 特性。
異常處理一:消息狀態確認可靠消息服務定時監聽消息的狀態,如果存在狀態為待確認并且超時的消息,則表示上游應用和可靠消息交互中的步驟 4 或者 5 出現異常。
可靠消息則攜帶消息體內的信息向上游應用發起請求查詢該業務是否已執行。上游應用提供一個可查詢接口供可靠消息追溯業務執行狀態,如果業務執行成功則更改消息狀態為已發送,否則刪除此消息確保數據一致。具體流程如下:
可靠消息查詢超時的待確認狀態的消息
向上游應用查詢業務執行的情況
業務未執行,則刪除該消息,保證業務和可靠消息服務的一致性。業務已執行,則修改消息狀態為已發送,并發送消息到 MQ 組件。
異常處理二:消息重發消息已發送則表示上游應用已經執行,接下來則確保下游應用也能正常執行。
可靠消息服務發現可靠消息服務中存在消息狀態為已發送并且超時的消息,則表示可靠消息服務和下游應用中存在異常的步驟,無論哪個步驟出現異常,可靠消息服務都將此消息重新投遞到 MQ 組件中供下游應用監聽。
下游應用監聽到此消息后,在保證冪等性的情況下重新執行業務并通知可靠消息服務此消息已經成功消費,最終確保上游應用、下游應用的數據最終一致性。具體流程如下:
可靠消息服務定時查詢狀態為已發送并超時的消息
可靠消息將消息重新投遞到 MQ 組件中
下游應用監聽消息,在滿足冪等性的條件下,重新執行業務。
下游應用通知可靠消息服務該消息已經成功消費。
通過消息狀態確認和消息重發兩個功能,可以確保上游應用、可靠消息服務和下游應用數據的最終一致性。
四 肉身實戰Rabbitmq我們在rabbitmq上肉身實戰了一下可靠消息,rabbitmq的發送過程如下
發送消息到消息服務
消息隊列將消息發送給監聽
消息監聽接受并處理消息
我們來看看可能發送異常的四種
1 直接無法到達消息服務網絡斷了,拋出異常,業務直接回滾即可。如果出現connection closed錯誤,直接增加 connection數即可
connectionFactory.setChannelCacheSize(100);2 消息已經到達服務器,但返回的時候出現異常
rabbitmq提供了確認ack機制,可以用來確認消息是否有返回。因此我們可以在發送前在db中(內存或關系型數據庫)先存一下消息,如果ack異常則進行重發
/**confirmcallback用來確認消息是否有送達消息隊列*/ rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (!ack) { //try to resend msg } else { //delete msg in db } }); /**若消息找不到對應的Exchange會先觸發returncallback */ rabbitTemplate.setReturnCallback((message, replyCode, replyText, tmpExchange, tmpRoutingKey) -> { try { Thread.sleep(Constants.ONE_SECOND); } catch (InterruptedException e) { e.printStackTrace(); } log.info("send message failed: " + replyCode + " " + replyText); rabbitTemplate.send(message); });
如果消息沒有到exchange,則confirm回調,ack=false
如果消息到達exchange,則confirm回調,ack=true
但如果是找不到exchange,則會先觸發returncallback
如果設置了消息持久化,那么ack= true是在消息持久化完成后,就是存到硬盤上之后再發送的,確保消息已經存在硬盤上,萬一消息服務掛了,消息服務恢復是能夠再重發消息
4 未送達消費者消息服務收到消息后,消息會處于"UNACK"的狀態,直到客戶端確認消息
channel.basicQos(1); // accept only one unack-ed message at a time (see below) final Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, "UTF-8"); System.out.println(" [x] Received "" + message + """); try { doWork(message); } finally { //確認收到消息 channel.basicAck(envelope.getDeliveryTag(), false); } } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);5 確認消息丟失
消息返回時假設確認消息丟失了,那么消息服務會重發消息。注意,如果你設置了autoAck= false,但又沒應答 channel.baskAck也沒有應答 channel.baskNack,那么會導致非常嚴重的錯誤:消息隊列會被堵塞住,可參考http://blog.sina.com.cn/s/blo...,所以,無論如何都必須應答
6 消費者業務處理異常消息監聽接受消息并處理,假設拋異常了,第一階段事物已經完成,如果要配置回滾則過于麻煩,即使做事務補償也可能事務補償失效的情況,所以這里可以做一個重復執行,比如guava的retry,設置一個指數時間來循環執行,如果n次后依然失敗,發郵件、短信,用人肉來兜底。
參考:http://blog.csdn.net/reviveds...
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/67717.html
本文以一個實際業務問題來談談事務該如何處理。對接外部系統是是不可避免的,從廣泛意義上來說,外部系統范圍很大,中間件(數據庫)也屬于外部系統。當我們討論事務時,通常我們將那些沒有支持事務的系統稱為外部系統,業務系統基本上都是外部系統。問題有這樣一套系統,以gitlab為底層系統, 在gitlab project的基礎上封裝了代碼倉,系統對其中一些與gitlab關聯的數據進行了落表。創建代碼倉的邏輯過...
摘要:針對這樣的客戶,靈雀云除了提供容器云,還會基于容器云提供工具鏈和咨詢服務。第三階段,是上云原生。靈雀云建議,先做邊緣應用系統的微服務化,或者單體直接應用上云。靈雀云會幫助客戶成立專家組,實踐敏捷活動和工具鏈一整套的解決方案。 今天很榮幸能在這里跟大家一起分享下靈雀云在金融行業的云原生解決方案。 CNCF的云原生核心理念是快速交付業務價值,而云原生時代,主要由三駕馬車驅動:容器、DevO...
摘要:也就是說在分布式系統下對多個數據庫進行事務的統一管控,保證數據的一致性。真實系統應當是與的混合體總結分布式系統中,最重要的是滿足業務需求,而不是追求抽象絕對的系統特性 分布式事務是指事務的參與者、支持事務的服務器、資源服務器以及事務管理器分別位于不同的分布式系統的不同節點之上。——百度百科如是說。也就是說在分布式系統下對多個數據庫進行事務的統一管控,保證數據的一致性。 當數據庫單表數據...
摘要:本次演講將介紹蘑菇街微服務治理體系經歷的架構演進歷程,面臨的技術難點和解決思路。年加入蘑菇街,目前負責蘑菇街內部中間件平臺,包括分布式服務通信框架配置中心服務發現消息隊列等其他服務基礎設施等項目。文章來源網易云社區 微服務的概念最早由Martin Fowler與James Lewis于2014年共同提出,核心思想是圍繞業務能力組織服務,各個微服務可被獨立部署,服務間是松耦合的關系,以及...
閱讀 1517·2021-11-18 10:02
閱讀 1657·2021-09-04 16:40
閱讀 3171·2021-09-01 10:48
閱讀 874·2019-08-30 15:55
閱讀 1853·2019-08-30 15:55
閱讀 1365·2019-08-30 13:05
閱讀 3013·2019-08-30 12:52
閱讀 1624·2019-08-30 11:24