摘要:消息隊列帶來的問題系統(tǒng)可用性降低系統(tǒng)引入的外部依賴越多,系統(tǒng)越容易出問題。系統(tǒng)復(fù)雜性提高加入消息隊列后,需要保證消息沒有被重復(fù)消費,保證消息傳遞的順序性等等。
消息隊列相關(guān)筆記 消息隊列的應(yīng)用場景:
消費者執(zhí)行過程比較長且生產(chǎn)者不需要消費者返回結(jié)果。用于更新索引庫,生成商品詳情頁,發(fā)短信。
為什么要使用消息隊列:通過異步處理提高系統(tǒng)性能(削峰、減少響應(yīng)所需時間);
降低系統(tǒng)耦合性。
削鋒作用:通過異步處理,將短時間高并發(fā)產(chǎn)生的事務(wù)消息存儲在消息隊列中,從而削平高峰期的并發(fā)事務(wù)。
消息隊列帶來的問題:系統(tǒng)可用性降低:系統(tǒng)引入的外部依賴越多,系統(tǒng)越容易出問題。如果MQ出問題,整套系統(tǒng)更容易崩潰。
系統(tǒng)復(fù)雜性提高:加入消息隊列后,需要保證消息沒有被重復(fù)消費,保證消息傳遞的順序性等等。
一致性問題:消息沒有被真正消費者正確消費,會導(dǎo)致數(shù)據(jù)不一致的情況。
JMS:Java Message Service,是一個消息服務(wù)的標(biāo)準(zhǔn)和規(guī)范,允許應(yīng)用程序組件基于JaveEE平臺創(chuàng)建、發(fā)送、接收和讀取消息。
JMS的兩種消息模型:點到點模型(P2P):
使用隊列作為消息通信載體,一條消息只能被一個消費者使用,未消費的消息在隊列中保留直到被消費或者超時。
發(fā)布/訂閱模型(Pub/Sub):
發(fā)布者發(fā)布一條信息,通過主題傳遞給所有的訂閱者,在一條消息廣播后才訂閱的用戶收不到該條消息。
JMS的五種消息正文格式:StreamMessage——Java原始值的數(shù)據(jù)流
MapMessage——一套鍵值對
TextMessage——一個字符串對象
ObjectMessage——一個序列化的Java對象
BytesMessage——一個字節(jié)的數(shù)據(jù)流
AMQP:Advanced Message Queuing Protocol,一個提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級消息隊列協(xié)議(二進(jìn)制應(yīng)用層協(xié)議)。
如何保證消息隊列的高可用?以 RabbitMQ 為例子講解第一種 MQ 的高可用性怎么實現(xiàn)。
RabbitMQ 有三種模式:單機模式、普通集群模式、鏡像集群模式。
單機模式,就是 Demo 級別的,一般就是你本地啟動了玩玩兒的,沒人生產(chǎn)用單機模式。
普通集群模式,無高可用性,在多臺機器上啟動多個 RabbitMQ 實例,每個機器啟動一個。你創(chuàng)建的 queue,只會放在一個 RabbitMQ 實例上,沒做到所謂的分布式,就是個普通集群。因為這導(dǎo)致你要么消費者每次隨機連接一個實例然后拉取數(shù)據(jù),要么固定連接那個 queue 所在實例消費數(shù)據(jù),前者有數(shù)據(jù)拉取的開銷,后者導(dǎo)致單實例性能瓶頸。這個方案提高了吞吐量,但沒有高可用性。
鏡像集群模式,有高可用性,queue里的消息會存在于多個實例上,每個RabbitMQ節(jié)點都有這個queue的一個完整鏡像,包含queue的全部數(shù)據(jù)。該模式的缺點:性能開銷過大,帶寬壓力和消耗很重;不是分布式的,沒有擴(kuò)展性。
Kafka的高可用性:由多個broker組成,每個broker是一個節(jié)點;你創(chuàng)建的一個topic會被劃分為多個partition,每個partition存儲于不同的broker上,每個partition包含部分topic的數(shù)據(jù),是一個天然的分布式消息隊列。
Kafka 0.8 以后,提供了 HA 機制,就是 replica(復(fù)制品) 副本機制。
每個 partition 的數(shù)據(jù)都會同步到其它機器上,形成自己的多個 replica 副本。
所有 replica 會選舉一個 leader 出來,那么生產(chǎn)和消費都跟這個 leader 打交道,然后其他 replica 就是 follower。
這么搞,就有所謂的高可用性了,因為如果某個 broker 宕機了,沒事兒,那個 broker上面的 partition 在其他機器上都有副本的,如果這上面有某個 partition 的 leader,那么此時會從 follower 中重新選舉一個新的 leader 出來,大家繼續(xù)讀寫那個新的 leader 即可。
ack消息延遲:
如果寫入了 RabbitMQ 中,RabbitMQ 會給你回傳一個 ack 消息,告訴你說這個消息 ok 了?!?/p>
持久化可以跟生產(chǎn)者那邊的 confirm 機制配合起來,只有消息被持久化到磁盤之后,才會通知生產(chǎn)者 ack 了?!?/p>
關(guān)閉 RabbitMQ 的自動 ack,可以通過一個 api 來調(diào)用就行,然后每次確保消費端處理完數(shù)據(jù)的時候,再在程序里 ack 一把。
各類MQ對比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
單擊吞吐量 | 萬 級 | 萬級 | 10萬級 | 10萬級 |
topic數(shù)量對吞吐量的影響 | topic可以達(dá)到幾百,幾千個的級別,吞吐量會有較小幅度的下降 | topic從幾十個到幾百個的時候,吞吐量會大幅度下降 | ||
時效性 | ms級 | 微秒級,延遲最低 | ms級 | ms級 |
可用性 | 高 | 高 | 非常高,分布式架構(gòu) | 非常高,kafka是分布式的,一個數(shù)據(jù)多個副本,少數(shù)機器宕機,不會丟失數(shù)據(jù),不會導(dǎo)致不可用 |
消息可靠性 | 低概率丟失數(shù)據(jù) | 經(jīng)過參數(shù)優(yōu)化配置,可以做到0丟失 | 經(jīng)過參數(shù)優(yōu)化配置,可以做到0丟失 | |
功能支持 | 功能極其完備 | 基于erlang開發(fā),所以并發(fā)能力很強,性能極其好,延時很低 | MQ功能較為完善,還是分布式的,擴(kuò)展性好 | 功能較為簡單,主要支持簡單的MQ功能,在大數(shù)據(jù)領(lǐng)域的實時計算以及日志采集被大規(guī)模使用,是事實上的標(biāo)準(zhǔn) |
代碼
org.springframework spring-jms
//生產(chǎn)者 @Autowired private JmsTemplate jmsTemplate; @Autowired private Destination queueSolrDestination; @Autowired private Destination topicPageDestination; //更新索引庫 jmsTemplate.send(queueSolrDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(jsonString); } }); //生成商品詳情頁 jmsTemplate.send(topicPageDestination, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { return session.createTextMessage(id + ""); } });
//消費者(監(jiān)聽) @Component public class PageListener implements MessageListener{ @Autowired private ItemPageService itemPageService; @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { String text = textMessage.getText(); System.out.println("接收到消息:" + text); boolean b = itemPageService.genItemHtml(Long.parseLong(text)); System.out.println("網(wǎng)頁生成結(jié)果:" + b); } catch (JMSException e) { e.printStackTrace(); } } }
?
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/76162.html
摘要:所以基于目前的設(shè)計,建議關(guān)閉自動創(chuàng)建的功能,然后根據(jù)消息量的大小,手動創(chuàng)建。如果發(fā)送消息,返回結(jié)果超時,這種超時不會進(jìn)行重試了如果是方法本身耗時超過,還未來得及調(diào)用發(fā)送消息,此時的超時也不會重試。 先來看下producer核心的類設(shè)計,如下圖: showImg(http://pbdqyl9hh.bkt.clouddn.com/rocketmq/producer%E7%B1%BB%E5%...
摘要:異步如果在函數(shù)返回的時候,調(diào)用者還不能購得到預(yù)期結(jié)果,而是將來通過一定的手段得到例如回調(diào)函數(shù),這就是異步。的意思是,將回調(diào)函數(shù)立刻插入消息隊列,等待執(zhí)行,而不是立即執(zhí)行。 大家好,我是wmingren,小伙伴們都知道JavaScript是單線程的語言,所謂的單線程呢就是指如果有多個任務(wù)就必須去排隊,前面任務(wù)執(zhí)行完成后,后面任務(wù)再執(zhí)行。到這里我們就產(chǎn)生了一個疑問,既然是單線程的,又怎么會...
摘要:第至行從中獲得發(fā)布信息。第至行容錯策略選擇消息隊列邏輯。第至行執(zhí)行發(fā)起網(wǎng)絡(luò)請求。第至行處理消息發(fā)送結(jié)果,設(shè)置響應(yīng)結(jié)果和提示。第至行發(fā)送成功,響應(yīng)。第至行消息格式與大小校驗。 摘要: 原創(chuàng)出處 http://www.iocoder.cn/RocketM... 「芋道源碼」歡迎轉(zhuǎn)載,保留摘要,謝謝! 本文主要基于 RocketMQ 4.0.x 正式版 1、概述 2、Producer 發(fā)...
摘要:比如,服務(wù)數(shù)據(jù)庫的數(shù)據(jù)來源于服務(wù)的數(shù)據(jù)庫服務(wù)的數(shù)據(jù)有變更操作時,需要同步到服務(wù)中。第二種解決方案通過數(shù)據(jù)庫的進(jìn)行同步。并且,我們還用這套架構(gòu)進(jìn)行緩存失效的同步。目前這套同步架構(gòu)正常運行中,后續(xù)有遇到問題再繼續(xù)更新。在微服務(wù)拆分的架構(gòu)中,各服務(wù)擁有自己的數(shù)據(jù)庫,所以常常會遇到服務(wù)之間數(shù)據(jù)通信的問題。比如,B服務(wù)數(shù)據(jù)庫的數(shù)據(jù)來源于A服務(wù)的數(shù)據(jù)庫;A服務(wù)的數(shù)據(jù)有變更操作時,需要同步到B服務(wù)中。第一...
閱讀 3569·2021-11-15 11:36
閱讀 1060·2021-11-11 16:55
閱讀 694·2021-10-20 13:47
閱讀 2993·2021-09-29 09:35
閱讀 3428·2021-09-08 10:45
閱讀 2554·2019-08-30 15:44
閱讀 849·2019-08-30 11:10
閱讀 1428·2019-08-29 13:43