摘要:完全無狀態,可集群部署與集群中的其中一個節點隨機選擇建立長連接,定期從取路由信息,并向提供服務的建立長連接,且定時向發送心跳。既可以從訂閱消息,也可以從訂閱消息,訂閱規則由配置決定。
問題列表:
Name Server 的作用是什么?
Name Server 存儲了Broker的什么信息?
Name Server 為Producer的提供些什么信息?
Name Server 為Consuner的提供些什么信息?
Name Server 作用Name Server在RocketMQ中猶如如它名字一樣,是提供Broker發現服務的.
Producer 與 Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務的 Master 建立長連接,且定時向 Master 發送心跳。Producer 完全無狀態,可集群部署
Consumer 與 Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務的 Master、Slave 建立長連接,且定時向 Master、Slave 發送心跳。Consumer 既可以從 Master 訂閱消息,也可以從 Slave 訂閱消息,訂閱規則由 Broker 配置決定。
Name Server 存儲了Broker的什么信息?RouteInfoManager
//主題信息 private final HashMap> topicQueueTable; //broker信息 private final HashMap brokerAddrTable; //集群信息 private final HashMap > clusterAddrTable; //活躍broker信息 private final HashMap brokerLiveTable; //過濾器信息 private final HashMap /* Filter Server */> filterServerTable;
我們注意到保存broker的Map有兩個,即brokerAddrTable用來保存所有的broker列表和brokerLiveTable用來保存當前活躍的broker列表,而BrokerData用來保存broker的主要新增,而BrokerLiveInfo只用來保存上次更新(心跳)時間,我們可以直接看看RouteInfoManager中掃描非活躍broker的方法:
public void scanNotActiveBroker() { Iterator> it = this.brokerLiveTable.entrySet().iterator(); while (it.hasNext()) { Entry next = it.next(); long last = next.getValue().getLastUpdateTimestamp(); if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) { RemotingUtil.closeChannel(next.getValue().getChannel()); it.remove(); log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME); this.onChannelDestroy(next.getKey(), next.getValue().getChannel()); } } }
這個方法由在initialize的定時線程池加載,每十秒執行一次.可以看出,如果兩分鐘內都沒收到一個broker的心跳數據,則直接將其從brokerLiveTable中移除,注意,這還會導致該broker從brokerAddrTable被刪除,當然,如果該broker是Master,則它的所有Slave的broker都將被刪除。具體細節可以參看RouteInfoManager的onChannelDestroy方法.
Name Server 為Producer的提供些什么信息?HashMap> topicQueueTable;
private String brokerName; // broker的名稱 private int readQueueNums; // 讀隊列數量 private int writeQueueNums; // 寫隊列數量 private int perm; // 讀寫權限 private int topicSynFlag; // 同步復制還是異步復制標記
NameServer 維護了key為topic,List
RouteInfoManager.pickupTopicRouteData
public TopicRouteData pickupTopicRouteData(final String topic) { TopicRouteData topicRouteData = new TopicRouteData(); boolean foundQueueData = false; boolean foundBrokerData = false; SetName Server 為Consuner的提供些什么信息?brokerNameSet = new HashSet (); List brokerDataList = new LinkedList (); topicRouteData.setBrokerDatas(brokerDataList); HashMap > filterServerMap = new HashMap >(); topicRouteData.setFilterServerTable(filterServerMap); try { try { this.lock.readLock().lockInterruptibly(); //根據topic獲取QueueData信息 List queueDataList = this.topicQueueTable.get(topic); if (queueDataList != null) { topicRouteData.setQueueDatas(queueDataList); foundQueueData = true; Iterator it = queueDataList.iterator(); while (it.hasNext()) { QueueData qd = it.next(); brokerNameSet.add(qd.getBrokerName()); } for (String brokerName : brokerNameSet) { BrokerData brokerData = this.brokerAddrTable.get(brokerName); if (null != brokerData) { //根據broker名稱獲取其地址信息 BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap ) brokerData .getBrokerAddrs().clone()); brokerDataList.add(brokerDataClone); foundBrokerData = true; for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) { List filterServerList = this.filterServerTable.get(brokerAddr); filterServerMap.put(brokerAddr, filterServerList); } } } } } finally { this.lock.readLock().unlock(); } } catch (Exception e) { log.error("pickupTopicRouteData Exception", e); } if (log.isDebugEnabled()) { log.debug("pickupTopicRouteData {} {}", topic, topicRouteData); } if (foundBrokerData && foundQueueData) { return topicRouteData; } return null; }
Consumer需要哪些信息?
1. Consumer需要的topic的broker信息 2. 每一個consumer group都有哪些consumer,對應的topic是誰
答
1.如上節所述 2.此信息保存在Broker中總結
Name Server比較簡單,如同一個簡單的web服務,提供配置信息,只不過CRUD的不是數據庫而是json文件.
此次RocketMQ學習就告一段落了,只描述了我比較關心的流程,很多細節沒能涉及到,有時間再寫吧,如有疑問和錯誤請在評論中指出,thx!
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70362.html
摘要:每個與集群中的所有節點建立長連接,定時注冊信息到所有。完全無狀態,可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實現雖然版本不太一致但這也是能找到的最詳細的資料了接下來根據其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個成熟的產品來學習吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...
摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現在也是一直保持著更新狀態。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...
摘要:但是服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。所有的半消息都會寫在為的半消息隊列里,并且每條半消息,在整個鏈路里會被寫多次,如果并發很大且大部分消息都是事務消息的話,可靠性會存在問題。 前言 得益于MQ削峰填谷,系統解耦,操作異步等功能特性,在互聯網行業,可以說有分布式服務的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是...
摘要:但是服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。既然消息的發送不能和本地事務寫在一起,那如何來保證其整體具有原子性的需求呢答案就是今天我們介紹的主角事務消息。 前言 得益于MQ削峰填谷,系統解耦,操作異步等功能特性,在互聯網行業,可以說有分布式服務的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是經歷了多年的雙十一高并發挑戰...
閱讀 667·2021-11-24 09:39
閱讀 2329·2021-11-22 13:54
閱讀 2204·2021-09-23 11:46
閱讀 3250·2019-08-30 15:55
閱讀 2685·2019-08-30 15:54
閱讀 2409·2019-08-30 14:18
閱讀 1550·2019-08-29 14:15
閱讀 2738·2019-08-29 13:49