国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

RocketMQ源碼學習(六)-Name Server

Joyven / 2435人閱讀

摘要:完全無狀態,可集群部署與集群中的其中一個節點隨機選擇建立長連接,定期從取路由信息,并向提供服務的建立長連接,且定時向發送心跳。既可以從訂閱消息,也可以從訂閱消息,訂閱規則由配置決定。

問題列表:

Name Server 的作用是什么?

Name Server 存儲了Broker的什么信息?

Name Server 為Producer的提供些什么信息?

Name Server 為Consuner的提供些什么信息?

Name Server 作用

Name Server在RocketMQ中猶如如它名字一樣,是提供Broker發現服務的.

Producer 與 Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務的 Master 建立長連接,且定時向 Master 發送心跳。Producer 完全無狀態,可集群部署

Consumer 與 Name Server 集群中的其中一個節點(隨機選擇)建立長連接,定期從 Name Server 取 Topic 路由信息,并向提供 Topic 服務的 Master、Slave 建立長連接,且定時向 Master、Slave 發送心跳。Consumer 既可以從 Master 訂閱消息,也可以從 Slave 訂閱消息,訂閱規則由 Broker 配置決定。

Name Server 存儲了Broker的什么信息?

RouteInfoManager

//主題信息
 private final HashMap> topicQueueTable;
//broker信息
    private final HashMap brokerAddrTable;
//集群信息
    private final HashMap> clusterAddrTable;
//活躍broker信息
    private final HashMap brokerLiveTable;
//過濾器信息
    private final HashMap/* Filter Server */> filterServerTable;

我們注意到保存broker的Map有兩個,即brokerAddrTable用來保存所有的broker列表和brokerLiveTable用來保存當前活躍的broker列表,而BrokerData用來保存broker的主要新增,而BrokerLiveInfo只用來保存上次更新(心跳)時間,我們可以直接看看RouteInfoManager中掃描非活躍broker的方法:

    public void scanNotActiveBroker() {
        Iterator> it = this.brokerLiveTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry next = it.next();
            long last = next.getValue().getLastUpdateTimestamp();
            if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
                RemotingUtil.closeChannel(next.getValue().getChannel());
                it.remove();
                log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
                this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
            }
        }
    }

這個方法由在initialize的定時線程池加載,每十秒執行一次.可以看出,如果兩分鐘內都沒收到一個broker的心跳數據,則直接將其從brokerLiveTable中移除,注意,這還會導致該broker從brokerAddrTable被刪除,當然,如果該broker是Master,則它的所有Slave的broker都將被刪除。具體細節可以參看RouteInfoManager的onChannelDestroy方法.

Name Server 為Producer的提供些什么信息?
    HashMap> topicQueueTable;
    private String brokerName;  // broker的名稱
    private int readQueueNums;  // 讀隊列數量
    private int writeQueueNums; // 寫隊列數量
    private int perm;           // 讀寫權限
    private int topicSynFlag;   // 同步復制還是異步復制標記

NameServer 維護了key為topic,List的數據為Producer提供服務.返回TopicRouteData信息
RouteInfoManager.pickupTopicRouteData

public TopicRouteData pickupTopicRouteData(final String topic) {
        TopicRouteData topicRouteData = new TopicRouteData();
        boolean foundQueueData = false;
        boolean foundBrokerData = false;
        Set brokerNameSet = new HashSet();
        List brokerDataList = new LinkedList();
        topicRouteData.setBrokerDatas(brokerDataList);

        HashMap> filterServerMap = new HashMap>();
        topicRouteData.setFilterServerTable(filterServerMap);

        try {
            try {
                this.lock.readLock().lockInterruptibly();
                //根據topic獲取QueueData信息
                List queueDataList = this.topicQueueTable.get(topic);
                if (queueDataList != null) {
                    topicRouteData.setQueueDatas(queueDataList);
                    foundQueueData = true;

                    Iterator it = queueDataList.iterator();
                    while (it.hasNext()) {
                        QueueData qd = it.next();
                        brokerNameSet.add(qd.getBrokerName());
                    }

                    for (String brokerName : brokerNameSet) {
                        BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                        if (null != brokerData) {
                        //根據broker名稱獲取其地址信息
                            BrokerData brokerDataClone = new BrokerData(brokerData.getCluster(), brokerData.getBrokerName(), (HashMap) brokerData
                                .getBrokerAddrs().clone());
                            brokerDataList.add(brokerDataClone);
                            foundBrokerData = true;
                            for (final String brokerAddr : brokerDataClone.getBrokerAddrs().values()) {
                                List filterServerList = this.filterServerTable.get(brokerAddr);
                                filterServerMap.put(brokerAddr, filterServerList);
                            }
                        }
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("pickupTopicRouteData Exception", e);
        }

        if (log.isDebugEnabled()) {
            log.debug("pickupTopicRouteData {} {}", topic, topicRouteData);
        }

        if (foundBrokerData && foundQueueData) {
            return topicRouteData;
        }

        return null;
    }
Name Server 為Consuner的提供些什么信息?

Consumer需要哪些信息?

   
1. Consumer需要的topic的broker信息
2. 每一個consumer group都有哪些consumer,對應的topic是誰

1.如上節所述
2.此信息保存在Broker中

總結

Name Server比較簡單,如同一個簡單的web服務,提供配置信息,只不過CRUD的不是數據庫而是json文件.

此次RocketMQ學習就告一段落了,只描述了我比較關心的流程,很多細節沒能涉及到,有時間再寫吧,如有疑問和錯誤請在評論中指出,thx!

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/70362.html

相關文章

  • RocketMQ源碼學習(一)-概述

    摘要:每個與集群中的所有節點建立長連接,定時注冊信息到所有。完全無狀態,可集群部署。本系列源碼解析主要參照原理簡介來追尋其代碼實現雖然版本不太一致但這也是能找到的最詳細的資料了接下來根據其模塊來源碼閱讀目錄如下 為什么選擇讀RocketMQ? 對MQ的理解一直不深,上周看了,還是覺得不夠深入,找個成熟的產品來學習吧,RabbitMQ是erLang寫的,Kafka是Scala寫的,非Java寫...

    godlong_X 評論0 收藏0
  • 寫這么多系列博客,怪不得找不到女朋友

    摘要:前提好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲抱歉了。熟悉我的人都知道我寫博客的時間比較早,而且堅持的時間也比較久,一直到現在也是一直保持著更新狀態。 showImg(https://segmentfault.com/img/remote/1460000014076586?w=1920&h=1080); 前提 好幾周沒更新博客了,對不斷支持我博客的童鞋們說聲:抱歉了!。自己這段時...

    JerryWangSAP 評論0 收藏0
  • 讓你看懂的RocketMQ事務消息源碼分析(干貨)

    摘要:但是服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。所有的半消息都會寫在為的半消息隊列里,并且每條半消息,在整個鏈路里會被寫多次,如果并發很大且大部分消息都是事務消息的話,可靠性會存在問題。 前言 得益于MQ削峰填谷,系統解耦,操作異步等功能特性,在互聯網行業,可以說有分布式服務的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是...

    zsirfs 評論0 收藏0
  • 一定能看懂的RocketMQ事務消息源碼分析(干貨)

    摘要:但是服務器又確實是收到了這條消息的,只是給客戶端的響應丟失了,所以導致的結果就是扣款失敗,成功發貨。既然消息的發送不能和本地事務寫在一起,那如何來保證其整體具有原子性的需求呢答案就是今天我們介紹的主角事務消息。 前言 得益于MQ削峰填谷,系統解耦,操作異步等功能特性,在互聯網行業,可以說有分布式服務的地方,MQ都往往不會缺席。由阿里自研的RocketMQ更是經歷了多年的雙十一高并發挑戰...

    myshell 評論0 收藏0

發表評論

0條評論

Joyven

|高級講師

TA的文章

閱讀更多
最新活動
閱讀需要支付1元查看
<