国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo源碼解析(七)注冊中心——zookeeper

wanglu1209 / 2443人閱讀

摘要:層根據不同的目錄可以有服務提供者服務消費者路由規則配置規則。通過這樣的方式,可以處理類似服務提供者為空的情況。

注冊中心——zookeeper
目標:解釋以為zookeeper實現的注冊中心原理,解讀duubo-registry-zookeeper的源碼

這篇文章是講解注冊中心的最后一篇文章。這篇文章講的是dubbo的注冊中心用zookeeper來實現。這種實現注冊中心的方法也是dubbo推薦的方法。為了能更加理解zookeeper在dubbo中的應用,接下來我先簡單的介紹一下zookeeper。

因為dubbo是一個分布式的RPC開源框架,各個服務之間多帶帶部署,就會出現資源之間不一致的問題。而zookeeper就有保證分布式一致性的特性。ZooKeeper是一種為分布式應用所設計的高可用、高性能且一致的開源協調服務。關于dubbo為什么會推薦使用zookeeper作為它的注冊中心實現,有很多書籍以及博客講解了zookeeper的特性以及優勢,這不是本章的重點,我要講的是zookeeper的數據結構,dubbo服務是如何被zookeeper的數據結構存儲管理的,因為這影響到下面源碼的解讀。zookeeper采用的是樹形結構來組織數據節點,它類似于一個標準的文件系統。先來看看下面這張圖:

該圖是官方文檔里面的一張圖,展示了dubbo在zookeeper中存儲的形式以及節點層級,

dubbo的Root層是根目錄,通過的“group”來設置zookeeper的根節點,缺省值是“dubbo”。

Service層是服務接口的全名。

Type層是分類,一共有四種分類,分別是providers(服務提供者列表)、consumers(服務消費者列表)、routes(路由規則列表)、configurations(配置規則列表)。

URL層:根據不同的Type目錄:可以有服務提供者 URL 、服務消費者 URL 、路由規則 URL 、配置規則 URL 。不同的Type關注的URL不同。

zookeeper以每個斜杠來分割每一層的znode,比如第一層根節點dubbo就是“/dubbo”,而第二層的Service層就是/com.foo.Barservice,zookeeper的每個節點通過路徑來表示以及訪問,例如服務提供者啟動時,向/dubbo/com.foo.Barservice/providers目錄下寫入自己的URL地址。關于流程調用說明,見官方文檔:

文檔地址:http://dubbo.apache.org/zh-cn...

了解了dubbo在zookeeper中的節點層級,就可以看相關的源碼了,下圖是包的結構:

跟前面三種實現方式一樣的目錄,也就兩個類,看起來非常的舒服,接下來就來解析這兩個類。

(一)ZookeeperRegistry

該類繼承了FailbackRegistry類,該類就是針對注冊中心核心的功能注冊、訂閱、取消注冊、取消訂閱,查詢注冊列表進行展開,基于zookeeper來實現。

1.屬性
// 日志記錄
private final static Logger logger = LoggerFactory.getLogger(ZookeeperRegistry.class);

// 默認的zookeeper端口
private final static int DEFAULT_ZOOKEEPER_PORT = 2181;

// 默認zookeeper根節點
private final static String DEFAULT_ROOT = "dubbo";

// zookeeper根節點
private final String root;

// 服務接口集合
private final Set anyServices = new ConcurrentHashSet();

// 監聽器集合
private final ConcurrentMap> zkListeners = new ConcurrentHashMap>();

// zookeeper客戶端實例
private final ZookeeperClient zkClient;

其實你會發現zookeeper雖然是最被推薦的,反而它的實現邏輯相對簡單,因為調用了zookeeper服務組件,很多的邏輯不需要在dubbo中自己去實現。上面的屬性介紹也很簡單,不需要多說,更多的是調用zookeeper客戶端。

2.構造方法
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
    super(url);
    if (url.isAnyHost()) {
        throw new IllegalStateException("registry address == null");
    }
    // 獲得url攜帶的分組配置,并且作為zookeeper的根節點
    String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
    if (!group.startsWith(Constants.PATH_SEPARATOR)) {
        group = Constants.PATH_SEPARATOR + group;
    }
    this.root = group;
    // 創建zookeeper client
    zkClient = zookeeperTransporter.connect(url);
    // 添加狀態監聽器,當狀態為重連的時候調用恢復方法
    zkClient.addStateListener(new StateListener() {
        @Override
        public void stateChanged(int state) {
            if (state == RECONNECTED) {
                try {
                    // 恢復
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    });
}

這里有以下幾個關注點:

參數中ZookeeperTransporter是一個接口,并且在dubbo中有ZkclientZookeeperTransporter和CuratorZookeeperTransporter兩個實現類,ZookeeperTransporter還是一個可擴展的接口,基于 Dubbo SPI Adaptive 機制,會根據url中攜帶的參數去選擇用哪個實現類。

上面我說明了dubbo在zookeeper節點層級有一層是root層,該層是通過group屬性來設置的。

給客戶端添加一個監聽器,當狀態為重連的時候調用FailbackRegistry的恢復方法

3.appendDefaultPort
static String appendDefaultPort(String address) {
    if (address != null && address.length() > 0) {
        int i = address.indexOf(":");
        // 如果地址本身沒有端口,則使用默認端口2181
        if (i < 0) {
            return address + ":" + DEFAULT_ZOOKEEPER_PORT;
        } else if (Integer.parseInt(address.substring(i + 1)) == 0) {
            return address.substring(0, i + 1) + DEFAULT_ZOOKEEPER_PORT;
        }
    }
    return address;
}

該方法是拼接使用默認的zookeeper端口,就是方地址本身沒有端口的時候才使用默認端口。

4.isAvailable && destroy
@Override
public boolean isAvailable() {
    return zkClient.isConnected();
}

@Override
public void destroy() {
    super.destroy();
    try {
        zkClient.close();
    } catch (Exception e) {
        logger.warn("Failed to close zookeeper client " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

這里兩個方法分別是檢測zookeeper是否連接以及銷毀連接,很簡單,都是調用了zookeeper客戶端封裝好的方法。

5.doRegister && doUnregister
@Override
protected void doRegister(URL url) {
    try {
        // 創建URL節點,也就是URL層的節點
        zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
        throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

@Override
protected void doUnregister(URL url) {
    try {
        // 刪除節點
        zkClient.delete(toUrlPath(url));
    } catch (Throwable e) {
        throw new RpcException("Failed to unregister " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

這兩個方法分別是注冊和取消注冊,也很簡單,調用都是客戶端create和delete方法,一個是創建一個節點,另一個是刪除節點,該操作都在URL層。

6.doSubscribe
@Override
protected void doSubscribe(final URL url, final NotifyListener listener) {
    try {
        // 處理所有Service層發起的訂閱,例如監控中心的訂閱
        if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            // 獲得根目錄
            String root = toRootPath();
            // 獲得url對應的監聽器集合
            ConcurrentMap listeners = zkListeners.get(url);
            // 不存在就創建監聽器集合
            if (listeners == null) {
                zkListeners.putIfAbsent(url, new ConcurrentHashMap());
                listeners = zkListeners.get(url);
            }
            // 獲得節點監聽器
            ChildListener zkListener = listeners.get(listener);
            // 如果該節點監聽器為空,則創建
            if (zkListener == null) {
                listeners.putIfAbsent(listener, new ChildListener() {
                    @Override
                    public void childChanged(String parentPath, List currentChilds) {
                        // 遍歷現有的節點,如果現有的服務集合中沒有該節點,則加入該節點,然后訂閱該節點
                        for (String child : currentChilds) {
                            // 解碼
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    }
                });
                // 重新獲取,為了保證一致性
                zkListener = listeners.get(listener);
            }
            // 創建service節點,該節點為持久節點
            zkClient.create(root, false);
            // 向zookeeper的service節點發起訂閱,獲得Service接口全名數組
            List services = zkClient.addChildListener(root, zkListener);
            if (services != null && !services.isEmpty()) {
                // 遍歷Service接口全名數組
                for (String service : services) {
                    service = URL.decode(service);
                    anyServices.add(service);
                    // 發起該service層的訂閱
                    subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                }
            }
        } else {
            // 處理指定 Service 層的發起訂閱,例如服務消費者的訂閱
            List urls = new ArrayList();
            // 遍歷分類數組
            for (String path : toCategoriesPath(url)) {
                // 獲得監聽器集合
                ConcurrentMap listeners = zkListeners.get(url);
                // 如果沒有則創建
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap());
                    listeners = zkListeners.get(url);
                }
                // 獲得節點監聽器
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        @Override
                        public void childChanged(String parentPath, List currentChilds) {
                            // 通知服務變化 回調NotifyListener
                            ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                        }
                    });
                    // 重新獲取節點監聽器,保證一致性
                    zkListener = listeners.get(listener);
                }
                // 創建type節點,該節點為持久節點
                zkClient.create(path, false);
                // 向zookeeper的type節點發起訂閱
                List children = zkClient.addChildListener(path, zkListener);
                if (children != null) {
                    // 加入到自子節點數據數組
                    urls.addAll(toUrlsWithEmpty(url, path, children));
                }
            }
            // 通知數據變化
            notify(url, listener, urls);
        }
    } catch (Throwable e) {
        throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

這個方法是訂閱,邏輯實現比較多,可以分兩段來看,這里的實現把所有Service層發起的訂閱以及指定的Service層發起的訂閱分開處理。所有Service層類似于監控中心發起的訂閱。指定的Service層發起的訂閱可以看作是服務消費者的訂閱。訂閱的大致邏輯類似,不過還是有幾個區別:

所有Service層發起的訂閱中的ChildListener是在在 Service 層發生變更時,才會做出解碼,用anyServices屬性判斷是否是新增的服務,最后調用父類的subscribe訂閱。而指定的Service層發起的訂閱是在URL層發生變更的時候,調用notify,回調回調NotifyListener的邏輯,做到通知服務變更。

所有Service層發起的訂閱中客戶端創建的節點是Service節點,該節點為持久節點,而指定的Service層發起的訂閱中創建的節點是Type節點,該節點也是持久節點。這里補充一下zookeeper的持久節點是節點創建后,就一直存在,直到有刪除操作來主動清除這個節點,不會因為創建該節點的客戶端會話失效而消失。而臨時節點的生命周期和客戶端會話綁定。也就是說,如果客戶端會話失效,那么這個節點就會自動被清除掉。注意,這里提到的是會話失效,而非連接斷開。另外,在臨時節點下面不能創建子節點。

指定的Service層發起的訂閱中調用了兩次notify,第一次是增量的通知,也就是只是通知這次增加的服務節點,而第二個是全量的通知。

7.doUnsubscribe
@Override
protected void doUnsubscribe(URL url, NotifyListener listener) {
    // 獲得監聽器集合
    ConcurrentMap listeners = zkListeners.get(url);
    if (listeners != null) {
        // 獲得子節點的監聽器
        ChildListener zkListener = listeners.get(listener);
        if (zkListener != null) {
            // 如果為全部的服務接口,例如監控中心
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                // 獲得根目錄
                String root = toRootPath();
                // 移除監聽器
                zkClient.removeChildListener(root, zkListener);
            } else {
                // 遍歷分類數組進行移除監聽器
                for (String path : toCategoriesPath(url)) {
                    zkClient.removeChildListener(path, zkListener);
                }
            }
        }
    }
}

該方法是取消訂閱,也是分為兩種情況,所有的Service發起的取消訂閱還是指定的Service發起的取消訂閱。可以看到所有的Service發起的取消訂閱就直接移除了根目錄下所有的監聽器,而指定的Service發起的取消訂閱是移除了該Service層下面的所有Type節點監聽器。如果不太明白再回去看看前面的那個節點層級圖。

8.lookup
@Override
public List lookup(URL url) {
    if (url == null) {
        throw new IllegalArgumentException("lookup url == null");
    }
    try {
        List providers = new ArrayList();
        // 遍歷分組類別
        for (String path : toCategoriesPath(url)) {
            // 獲得子節點
            List children = zkClient.getChildren(path);
            if (children != null) {
                providers.addAll(children);
            }
        }
        // 獲得 providers 中,和 consumer 匹配的 URL 數組
        return toUrlsWithoutEmpty(url, providers);
    } catch (Throwable e) {
        throw new RpcException("Failed to lookup " + url + " from zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
}

該方法就是查詢符合條件的已經注冊的服務。調用了toUrlsWithoutEmpty方法,在后面會講到。

9.toServicePath
private String toServicePath(URL url) {
    String name = url.getServiceInterface();
    // 如果是包括所有服務,則返回根節點
    if (Constants.ANY_VALUE.equals(name)) {
        return toRootPath();
    }
    return toRootDir() + URL.encode(name);
}

該方法是獲得服務路徑,拼接規則:Root + Type。

10.toCategoriesPath
private String[] toCategoriesPath(URL url) {
    String[] categories;
    // 如果url攜帶的分類配置為*,則創建包括所有分類的數組
    if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
        categories = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
                Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
    } else {
        // 返回url攜帶的分類配置
        categories = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
    }
    String[] paths = new String[categories.length];
    for (int i = 0; i < categories.length; i++) {
        // 加上服務路徑
        paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categories[i];
    }
    return paths;
}

private String toCategoryPath(URL url) {
    return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
}

第一個方法是獲得分類數組,也就是url攜帶的服務下的所有Type節點數組。第二個是獲得分類路徑,分類路徑拼接規則:Root + Service + Type

11.toUrlPath
private String toUrlPath(URL url) {
    return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
}

該方法是獲得URL路徑,拼接規則是Root + Service + Type + URL

12.toUrlsWithoutEmpty && toUrlsWithEmpty
private List toUrlsWithoutEmpty(URL consumer, List providers) {
    List urls = new ArrayList();
    if (providers != null && !providers.isEmpty()) {
        // 遍歷服務提供者
        for (String provider : providers) {
            // 解碼
            provider = URL.decode(provider);
            if (provider.contains("://")) {
                // 把服務轉化成url的形式
                URL url = URL.valueOf(provider);
                // 判斷是否匹配,如果匹配, 則加入到集合中
                if (UrlUtils.isMatch(consumer, url)) {
                    urls.add(url);
                }
            }
        }
    }
    return urls;
}

private List toUrlsWithEmpty(URL consumer, String path, List providers) {
    // 返回和服務消費者匹配的服務提供者url
    List urls = toUrlsWithoutEmpty(consumer, providers);
    // 如果不存在,則創建`empty://` 的 URL返回
    if (urls == null || urls.isEmpty()) {
        int i = path.lastIndexOf("/");
        String category = i < 0 ? path : path.substring(i + 1);
        URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
        urls.add(empty);
    }
    return urls;
}

第一個toUrlsWithoutEmpty方法是獲得 providers 中,和 consumer 匹配的 URL 數組,第二個toUrlsWithEmpty方法是調用了第一個方法后增加了若不存在匹配,則創建 empty:// 的 URL返回。通過這樣的方式,可以處理類似服務提供者為空的情況。

(二)ZookeeperRegistryFactory

該類繼承了AbstractRegistryFactory類,實現了AbstractRegistryFactory抽象出來的createRegistry方法,看一下原代碼:

public class ZookeeperRegistryFactory extends AbstractRegistryFactory {

    private ZookeeperTransporter zookeeperTransporter;

    public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
        this.zookeeperTransporter = zookeeperTransporter;
    }

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

}

可以看到就是實例化了ZookeeperRegistry而已,所有這里就不解釋了。

后記
該部分相關的源碼解析地址:https://github.com/CrazyHZM/i...

該文章講解了dubbo利用zookeeper來實現注冊中心,其中關鍵的是需要弄明白dubbo在zookeeper中存儲的節點層級意義,也就是root層、service層、type層以及url層分別代表什么,其他的邏輯并不復雜大多數調用了zookeeper客戶端的能力,有興趣的同學也可以深入的去了解zookeeper。如果我在哪一部分寫的不夠到位或者寫錯了,歡迎給我提意見,我的私人微信號碼:HUA799695226。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72402.html

相關文章

  • dubbo源碼解析(一)Hello,Dubbo

    摘要:英文全名為,也叫遠程過程調用,其實就是一個計算機通信協議,它是一種通過網絡從遠程計算機程序上請求服務而不需要了解底層網絡技術的協議。 Hello,Dubbo 你好,dubbo,初次見面,我想和你交個朋友。 Dubbo你到底是什么? 先給出一套官方的說法:Apache Dubbo是一款高性能、輕量級基于Java的RPC開源框架。 那么什么是RPC? 文檔地址:http://dubbo.a...

    evin2016 評論0 收藏0
  • dubbo源碼解析(四十四)服務暴露過程

    摘要:服務暴露過程目標從源碼的角度分析服務暴露過程。導出服務,包含暴露服務到本地,和暴露服務到遠程兩個過程。其中服務暴露的第八步已經沒有了。將泛化調用版本號或者等信息加入獲得服務暴露地址和端口號,利用內數據組裝成。 dubbo服務暴露過程 目標:從源碼的角度分析服務暴露過程。 前言 本來這一篇一個寫異步化改造的內容,但是最近我一直在想,某一部分的優化改造該怎么去撰寫才能更加的讓讀者理解。我覺...

    light 評論0 收藏0
  • dubbo源碼解析(四十三)2.7新特性

    摘要:大揭秘目標了解的新特性,以及版本升級的引導。四元數據改造我們知道以前的版本只有注冊中心,注冊中心的有數十個的鍵值對,包含了一個服務所有的元數據。 DUBBO——2.7大揭秘 目標:了解2.7的新特性,以及版本升級的引導。 前言 我們知道Dubbo在2011年開源,停止更新了一段時間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發布了 2.5.4 版本。隨后,版本...

    qqlcbb 評論0 收藏0
  • Dubbo 2.7.1 踩坑記

    摘要:面試題服務提供者能實現失效踢出是什么原理高頻題服務宕機的時候,該節點由于是持久節點會永遠存在,而且當服務再次重啟的時候會將重新注冊一個新節點。 Dubbo 2.7 版本增加新特性,新系統開始使用 Dubbo 2.7.1 嘗鮮新功能。使用過程中不慎踩到這個版本的 Bug。 系統架構 Spring Boot 2.14-Release + Dubbo 2.7.1 現象 Dubbo 服務者啟動...

    wudengzan 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<