摘要:基礎(chǔ)概念消息總線,是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。每一個(gè)都會(huì)接收到消息。概念標(biāo)簽,用于對(duì)消息分類,在的基礎(chǔ)上進(jìn)行更細(xì)的劃分。概念中負(fù)責(zé)接收生產(chǎn)者消息給消費(fèi)者發(fā)送消息的組件。
MQ基礎(chǔ)概念:
MQ:
消息總線(Message Queue),是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。在互聯(lián)網(wǎng)架構(gòu)中,MQ是一種非常常見的上下游“邏輯解耦+物理解耦”的消息通信服務(wù)。使用MQ之后,消息發(fā)送上游只需要依賴MQ,邏輯上和物理上都不用依賴其他服務(wù)。
MQ的不足
(1)系統(tǒng)更加復(fù)雜,多了一個(gè)MQ組件
(2)消息傳遞路徑更長,延時(shí)會(huì)增加
(3)消息可能會(huì)被重復(fù)消費(fèi)
(4)上游無法知道下游的執(zhí)行結(jié)果(因此,調(diào)用方實(shí)時(shí)依賴執(zhí)行結(jié)果的業(yè)務(wù)場(chǎng)景,請(qǐng)使用調(diào)用,而不是MQ)
使用場(chǎng)景
(1)上游不關(guān)注執(zhí)行結(jié)果
(2)上游關(guān)注結(jié)果,但執(zhí)行時(shí)間比較長。舉個(gè)例子,微信支付,跨公網(wǎng)調(diào)用微信的接口,執(zhí)行時(shí)間會(huì)比較長,但調(diào)用方又非常關(guān)注執(zhí)行結(jié)果,此時(shí)一般怎么玩呢?
一般采用“回調(diào)網(wǎng)關(guān)+MQ”方案來解耦:
a、調(diào)用方直接跨公網(wǎng)調(diào)用微信接口 b、微信返回調(diào)用成功,此時(shí)并不代表返回成功 c、微信執(zhí)行完成后,回調(diào)統(tǒng)一網(wǎng)關(guān) d、網(wǎng)關(guān)將返回結(jié)果通知MQ e、請(qǐng)求方收到結(jié)果通知
rocketMQ:
RocketMQ 是什么?
Github 上關(guān)于 RocketMQ 的介紹:
RcoketMQ 是一款低延遲、高可靠、可伸縮、易于使用的消息中間件。具有以下特性:
支持發(fā)布/訂閱(Pub/Sub)和點(diǎn)對(duì)點(diǎn)(P2P)消息模型
在一個(gè)隊(duì)列中可靠的先進(jìn)先出(FIFO)和嚴(yán)格的順序傳遞
支持拉(pull)和推(push)兩種消息模式
單一隊(duì)列百萬消息的堆積能力
支持多種消息協(xié)議,如 JMS、MQTT 等
分布式高可用的部署架構(gòu),滿足至少一次消息傳遞語義
提供 docker 鏡像用于隔離測(cè)試和云集群部署
提供配置、指標(biāo)和監(jiān)控等功能豐富的 Dashboard
consumer group:
1、概念:消費(fèi)者分組,多個(gè)消費(fèi)者在一個(gè)消費(fèi)者分組中。
2、注意點(diǎn):一個(gè)consumer group中的機(jī)器相當(dāng)于一個(gè)集群,consumer group中只有一臺(tái)機(jī)器會(huì)接收到消息,并進(jìn)行消費(fèi)。每一個(gè)consumer group都會(huì)接收到消息。這樣子的設(shè)計(jì)要求消費(fèi)端需要保證冪等性。
topic:
1、概念:Topic 是一種消息的邏輯分類,比如說你有訂單類的消息,也有庫存類的消息,那么就需要進(jìn)行分類,一個(gè)是訂單 Topic 存放訂單相關(guān)的消息,一個(gè)是庫存 Topic 存儲(chǔ)庫存相關(guān)的消息。
2、生產(chǎn)方發(fā)出的消息綁定某個(gè)topic,然后消費(fèi)方監(jiān)聽某個(gè)topic,消費(fèi)方(各個(gè)group)接收到消息,進(jìn)行消費(fèi)
3、topic應(yīng)用級(jí)別:整個(gè)應(yīng)用最好都使用一個(gè)topic,而更加細(xì)的區(qū)分,使用tags來區(qū)分。
tag:
1、概念:標(biāo)簽,用于對(duì)消息分類,在topic的基礎(chǔ)上進(jìn)行更細(xì)的劃分。
nameServer:
1、概念:Name Server 為 producer 和 consumer 提供路由信息。類似rpc中的注冊(cè)中心。當(dāng)producer需要發(fā)送消息首先去詢問nameServer需要請(qǐng)求哪個(gè)broker。而當(dāng)consumer需要拉取消息,也會(huì)先詢問nameServer需要請(qǐng)求哪個(gè)broker。
broker:
1、概念:rocketMQ中負(fù)責(zé)接收生產(chǎn)者消息、給消費(fèi)者發(fā)送消息的組件。
Message:
1、概念:Message 是消息的載體。一個(gè) Message 必須指定 topic。Message 還有一個(gè)可選的 tag 設(shè)置,以便消費(fèi)端可以基于 tag 進(jìn)行過濾消息。也可以添加額外的鍵值對(duì),例如你需要一個(gè)業(yè)務(wù) key 來查找 broker 上的消息,方便在開發(fā)過程中診斷問題。
public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { //聲明并初始化一個(gè)producer //需要一個(gè)producer group名字作為構(gòu)造方法的參數(shù),這里為producer1 DefaultMQProducer producer = new DefaultMQProducer("producer1"); //設(shè)置NameServer地址,此處應(yīng)改為實(shí)際NameServer地址,多個(gè)地址之間用;分隔 //NameServer的地址必須有,但是也可以通過環(huán)境變量的方式設(shè)置,不一定非得寫死在代碼里 producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876"); //調(diào)用start()方法啟動(dòng)一個(gè)producer實(shí)例 producer.start(); //發(fā)送10條消息到Topic為TopicTest,tag為TagA,消息內(nèi)容為“Hello RocketMQ”拼接上i的值 for (int i = 0; i < 10; i++) { try { Message msg = new Message("TopicTest",// topic "TagA",// tag ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body ); //調(diào)用producer的send()方法發(fā)送消息 //這里調(diào)用的是同步的方式,所以會(huì)有返回結(jié)果 SendResult sendResult = producer.send(msg); //打印返回結(jié)果,可以看到消息發(fā)送的狀態(tài)以及一些相關(guān)信息 System.out.println(sendResult); } catch (Exception e) { e.printStackTrace(); Thread.sleep(1000); } } //發(fā)送完消息之后,調(diào)用shutdown()方法關(guān)閉producer producer.shutdown(); } }MQ消費(fèi)者實(shí)例:
在開發(fā)過程中,如果想測(cè)試生產(chǎn)者是否發(fā)出了mq,可以編寫一個(gè)消費(fèi)者進(jìn)行測(cè)試
@Test public void testMqConsumer() throws Exception { String rocketmqAddress="10.113.41.2:9876;10.113.41.4:9876"; int threadNum = 5; String topics = "WechatUnionCoreTemplateNotifyTopic"; String instanceName = "TemplateComsumer"; String groupName = "wechatUnionTemplateNotifyConsumer"; DefaultMQPushConsumer consumer = null; consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(rocketmqAddress);//MQ地址 consumer.setClientCallbackExecutorThreads(threadNum);//消費(fèi)現(xiàn)場(chǎng)數(shù)量 consumer.setInstanceName(instanceName);//實(shí)例名稱 consumer.subscribe(topics, "*"); //注冊(cè)監(jiān)聽 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( Listmsgs, ConsumeConcurrentlyContext context) { for (int i = 0; i < msgs.size(); i++) { MessageExt msgExt = msgs.get(i); String msgId = msgExt.getMsgId(); Integer flag = msgExt.getFlag(); TemplateNotifyItem templateNotifyItem = ProtoBufSerialize.fromProto(msgExt.getBody(), TemplateNotifyItem.class); logger.info("receive new Msg: " + " msgId=" + msgId + " flag=" + flag + " templateNotifyItem=" + templateNotifyItem); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); logger.info("監(jiān)聽執(zhí)行中"); Thread.sleep(1000000); }
參考:
http://blog.csdn.net/manzhizh...
https://www.jianshu.com/p/824...
架構(gòu)師之路
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/68163.html
摘要:前提好幾周沒更新博客了,對(duì)不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時(shí)間比較早,而且堅(jiān)持的時(shí)間也比較久,一直到現(xiàn)在也是一直保持著更新狀態(tài)。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對(duì)不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時(shí)...
摘要:具體可以參考消息隊(duì)列之具體可以參考實(shí)戰(zhàn)之快速入門十分鐘入門阿里中間件團(tuán)隊(duì)博客是一個(gè)分布式的可分區(qū)的可復(fù)制的基于發(fā)布訂閱的消息系統(tǒng)主要用于大數(shù)據(jù)領(lǐng)域當(dāng)然在分布式系統(tǒng)中也有應(yīng)用。目前市面上流行的消息隊(duì)列就是阿里借鑒的原理用開發(fā)而得。 我自己總結(jié)的Java學(xué)習(xí)的系統(tǒng)知識(shí)點(diǎn)以及面試問題,目前已經(jīng)開源,會(huì)一直完善下去,歡迎建議和指導(dǎo)歡迎Star: https://github.com/Snail...
摘要:每個(gè)與集群中的所有節(jié)點(diǎn)建立長連接,定時(shí)注冊(cè)信息到所有。完全無狀態(tài),可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實(shí)現(xiàn)雖然版本不太一致但這也是能找到的最詳細(xì)的資料了接下來根據(jù)其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對(duì)MQ的理解一直不深,上周看了,還是覺得不夠深入,找個(gè)成熟的產(chǎn)品來學(xué)習(xí)吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...
摘要:通過以上分析我們可以得出消息隊(duì)列具有很好的削峰作用的功能即通過異步處理,將短時(shí)間高并發(fā)產(chǎn)生的事務(wù)消息存儲(chǔ)在消息隊(duì)列中,從而削平高峰期的并發(fā)事務(wù)。 該文已加入開源項(xiàng)目:JavaGuide(一份涵蓋大部分Java程序員所需要掌握的核心知識(shí)的文檔類項(xiàng)目,Star 數(shù)接近 16k)。地址:https://github.com/Snailclimb... 本文內(nèi)容思維導(dǎo)圖:showImg(ht...
閱讀 6179·2021-11-22 15:32
閱讀 813·2021-11-11 16:54
閱讀 3157·2021-10-13 09:40
閱讀 2162·2021-09-03 10:35
閱讀 1824·2021-08-09 13:47
閱讀 1865·2019-08-30 15:55
閱讀 1933·2019-08-30 15:43
閱讀 2455·2019-08-29 17:06