国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo之Zookeeper注冊(cè)中心

Null / 2682人閱讀

摘要:目前支持多種注冊(cè)中心。本編文章是分析使用作為注冊(cè)中心,如何整合進(jìn)行服務(wù)注冊(cè)和訂閱服務(wù)。

目前dubbo支持多種注冊(cè)中心:Zookeeper、Redis、Simple、Multicast、Etcd3。

本編文章是分析使用Zookeeper作為注冊(cè)中心,dubbo如何整合Zookeeper進(jìn)行服務(wù)注冊(cè)和訂閱服務(wù)。

首先dubbo將服務(wù)注冊(cè)到Zookeeper后,目錄結(jié)構(gòu)如下所示:(注冊(cè)接口名:com.bob.dubbo.service.CityDubboService)

在consumer和provider服務(wù)啟動(dòng)的時(shí)候,去把自身URL格式化成字符串,然后注冊(cè)到zookeeper相應(yīng)節(jié)點(diǎn)下,作為臨時(shí)節(jié)點(diǎn),斷開連接后,節(jié)點(diǎn)刪除;consumer啟動(dòng)時(shí),不僅會(huì)訂閱服務(wù),同時(shí)也會(huì)將自己的URL注冊(cè)到zookeeper中;

ZookeeperRegistry

ZookeeperRegistry:dubbo與zookeeper交互主要的類,已下結(jié)合源碼進(jìn)行分析,先來看

doSubcribe()

這個(gè)方法主要是用于訂閱服務(wù),添加監(jiān)聽器,動(dòng)態(tài)監(jiān)聽提供者列表變化:

@Override
    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            // 處理所有service層發(fā)起的訂閱,例如監(jiān)控中心的訂閱
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, (parentPath, currentChilds) -> {
                        for (String child : currentChilds) {
                            child = URL.decode(child);
                            if (!anyServices.contains(child)) {
                                anyServices.add(child);
                                subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                        Constants.CHECK_KEY, String.valueOf(false)), listener);
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List services = zkClient.addChildListener(root, zkListener);
                if (services != null && !services.isEmpty()) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
                // 處理指定service層發(fā)起的訂閱,例如服務(wù)消費(fèi)者的訂閱
            } else {
                List urls = new ArrayList<>();
                // 循環(huán)分類數(shù)組 , router, configurator, provider
                for (String path : toCategoriesPath(url)) {
                    // 獲得 url 對(duì)應(yīng)的監(jiān)聽器集合
                    ConcurrentMap listeners = zkListeners.get(url);
                    if (listeners == null) {// 不存在,進(jìn)行創(chuàng)建
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<>());
                        listeners = zkListeners.get(url);
                    }
                    // 獲得 ChildListener 對(duì)象
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {//  不存在子目錄的監(jiān)聽器,進(jìn)行創(chuàng)建 ChildListener 對(duì)象
                        // 訂閱父級(jí)目錄, 當(dāng)有子節(jié)點(diǎn)發(fā)生變化時(shí),觸發(fā)此回調(diào)函數(shù),回調(diào)listener中的notify()方法
                        listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)));
                        zkListener = listeners.get(listener);
                    }
                    創(chuàng)建Type節(jié)點(diǎn),此節(jié)點(diǎn)為持久節(jié)點(diǎn)
                    zkClient.create(path, false);
                    // 向 Zookeeper ,PATH 節(jié)點(diǎn),發(fā)起訂閱,返回此節(jié)點(diǎn)下的所有子元素 path : /根節(jié)點(diǎn)/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers
                    List children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                // 首次全量數(shù)據(jù)獲取完成時(shí),調(diào)用 `#notify(...)` 方法,回調(diào) NotifyListener, 在這一步從連接Provider,實(shí)例化Invoker
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

register()

ZookeeperRegistry父類FailbackRegistry中的方法,用于將服務(wù)注冊(cè)到zookeeper,具體代碼如下:

    @Override
    public void register(URL url) {
        // 調(diào)用父類AbstractRegistry中的register()方法,將url存儲(chǔ)到注冊(cè)集合中
        super.register(url);
        // 如果之前這個(gè)url注冊(cè)失敗,則會(huì)從注冊(cè)失敗集合中刪除
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // 像注冊(cè)中心發(fā)送注冊(cè)請(qǐng)求
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
            }

            // 將url存入注冊(cè)失敗集合中,進(jìn)行重試try()
            addFailedRegistered(url);
        }
    }

doRegister()

ZookeeperRegistry類中的方法

    @Override
    public void doRegister(URL url) {
        try {
            // 通過zookeeper客戶端向注冊(cè)中心發(fā)送服務(wù)注冊(cè)請(qǐng)求,在zookeeper下創(chuàng)建服務(wù)對(duì)應(yīng)的節(jié)點(diǎn)
            zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

在介紹注冊(cè)registry()方法的時(shí)候,解析到了FailbackRegistry類,接下來咱們來分析一下這個(gè)類的作用:

FailbackRegistry

這個(gè)類是ZookeeperRegistry的父類,通過分析該類的結(jié)構(gòu),主要是用于服務(wù)的注冊(cè)、訂閱、重試,而服務(wù)具體的注冊(cè)、訂閱又在ZookeeperRegistry子類進(jìn)行了實(shí)現(xiàn),現(xiàn)在我們來分析重試這個(gè)功能,服務(wù)暴露和訂閱的配置文件中一般會(huì)設(shè)置重試這個(gè)屬性,如下所示:

上面是一個(gè)服務(wù)暴露的示例,設(shè)置了retries屬性,表示重試的次數(shù)。接下來咱們就以注冊(cè)重試進(jìn)行分析(服務(wù)訂閱是同樣的原理):在注冊(cè)registry()方法中(代碼上面已提供),在異常catch{}代碼塊中有一個(gè)addFailedRegistered(url)方法,這個(gè)就是將注冊(cè)失敗的url添加到集合中,并創(chuàng)建一個(gè)重試的任務(wù)FailedRegisteredTask(url, this),代碼如下:

    private void addFailedRegistered(URL url) {
        // 先從集合中獲取,如果存在,直接返回
        FailedRegisteredTask oldOne = failedRegistered.get(url);
        if (oldOne != null) {
            return;
        }
        // 本地集合不存在,則創(chuàng)建重試定時(shí)任務(wù),默認(rèn)每隔5s執(zhí)行
        FailedRegisteredTask newTask = new FailedRegisteredTask(url, this);
        oldOne = failedRegistered.putIfAbsent(url, newTask);
        if (oldOne == null) {
            // 將定時(shí)任務(wù)放置在HashedWheelTimer這個(gè)處理定時(shí)任務(wù)的容器,(HashedWheelTimer執(zhí)行原理,可以自行查找資料,這里就不介紹)
            retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);
        }
    }

咱們下來看FailedRegisteredTask這個(gè)定時(shí)任務(wù),有哪些東西,F(xiàn)ailedRegisteredTask是AbstractRetryTask的子類,在執(zhí)行new FailedRegisteredTask(url, this)代碼時(shí),其實(shí)調(diào)用的是父類構(gòu)造函數(shù),其中retryTimes表示重試的次數(shù),在沒有配置的情況下,默認(rèn)重試三次:

    AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) {
        if (url == null || StringUtils.isBlank(taskName)) {
            throw new IllegalArgumentException();
        }
        this.url = url;
        this.registry = registry;
        this.taskName = taskName;
        cancel = false;
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);
        // 重試次數(shù),默認(rèn)情況下重試三次
        this.retryTimes = url.getParameter(Constants.REGISTRY_RETRY_TIMES_KEY, Constants.DEFAULT_REGISTRY_RETRY_TIMES);
    }

在AbstractRetryTask類中有一個(gè)run()方法,在run()方法會(huì)根據(jù)XML配置文件中的retries屬性值進(jìn)行比較來進(jìn)行重試,如果沒有達(dá)到重試次數(shù),則會(huì)調(diào)用doRetry(url, registry, timeout),而這個(gè)方法又在子類具體實(shí)現(xiàn),這里我以注冊(cè)FailedRegisteredTask舉例:

    @Override
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) {
            // other thread cancel this timeout or stop the timer.
            return;
        }
        // 重試次數(shù)與設(shè)置的retries進(jìn)行比較,超過則不在進(jìn)行重試
        if (times > retryTimes) {
            // reach the most times of retry.
            logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times.");
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info(taskName + " : " + url);
        }
        try {
            // 調(diào)用子類實(shí)現(xiàn),進(jìn)行重試
            doRetry(url, registry, timeout);
        } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry
            logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t);
            // reput this task when catch exception.
            reput(timeout, retryPeriod);
        }
    }

在子類FailedRegisteredTask中doRetry()方法具體實(shí)現(xiàn):

public final class FailedRegisteredTask extends AbstractRetryTask {

    private static final String NAME = "retry register";

    public FailedRegisteredTask(URL url, FailbackRegistry registry) {
        super(url, registry, NAME);
    }

    @Override
    protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) {
        // 調(diào)用ZookeeperRegistry類中的doRegister()方法進(jìn)行注冊(cè)
        registry.doRegister(url);
        registry.removeFailedRegisteredTask(url);
    }
}

分析到這里,有個(gè)疑問:重試任務(wù)已經(jīng)封裝了,任務(wù)什么時(shí)候去執(zhí)行,怎么執(zhí)行的?其實(shí)在上面咱們就分析到過,就是使用了HashedWheelTimer,這個(gè)類是在ZookeeperRegistry類初始化的時(shí)候就會(huì)去初始化:

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        // 這個(gè)地方進(jìn)行初始化的:初始化父類FailbackRegistry
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(Constants.PATH_SEPARATOR)) {
            group = Constants.PATH_SEPARATOR + group;
        }
        this.root = group;
        zkClient = zookeeperTransporter.connect(url);
        zkClient.addStateListener(state -> {
            if (state == StateListener.RECONNECTED) {
                try {
                    recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            }
        });
    }
    public FailbackRegistry(URL url) {
        super(url);
        this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);

        // 創(chuàng)建HashedWheelTimer對(duì)象
        retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128);
    }

然后在addFailedRegistered()方法中有retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);這樣的一條代碼,這個(gè)就是執(zhí)行任務(wù)的開始點(diǎn):

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }
        // 開啟輪詢?nèi)蝿?wù)
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

調(diào)用start()方法時(shí),開啟一個(gè)線程work去輪詢存儲(chǔ)到HashedWheelTimer容器的任務(wù),然后調(diào)用任務(wù)中的run()方法,

    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    // 開啟work線程,執(zhí)行work線程中的run()方法
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }
        @Override
        public void run() {
            // Initialize the startTime.
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it"s not 0 when initialized.
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            startTimeInitialized.countDown();

            do {
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    HashedWheelBucket bucket =
                            wheel[idx];
                    transferTimeoutsToBuckets();
                    // 執(zhí)行重試任務(wù)
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket : wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (; ; ) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }
       void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // process all timeouts
            while (timeout != null) {
                // 輪詢獲取重試任務(wù)
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {
                    next = remove(timeout);
                    if (timeout.deadline <= deadline) {
                        // 執(zhí)行重試任務(wù)
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds--;
                }
                timeout = next;
            }
        }
        public void expire() {
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                // 調(diào)用任務(wù)中的run()方法,(如:AbstractRetryTask任務(wù)中的run()方法,在去調(diào)用子類FailedRegisteredTask中的doRetry()方法進(jìn)行重試注冊(cè))
                task.run(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", t);
                }
            }
        }

在上面對(duì)于HashedWheelTimer的具體實(shí)現(xiàn)原理,并沒有進(jìn)行詳細(xì)的進(jìn)行分析,如果想了解的和學(xué)習(xí)的話,可以自行查找資料。

文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。

轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/73033.html

相關(guān)文章

  • dubbo zookeeper spring整合helloworld

    摘要:是目前非常流行的分布式服務(wù)技術(shù),很多公司都在用。空閑之余,搭了個(gè),分享給大家。本文下載下載是服務(wù)的注冊(cè)中心,下載后進(jìn)入到安裝目錄。雙擊即可啟動(dòng)注冊(cè)中心服務(wù)。 dubbo是目前非常流行的分布式服務(wù)技術(shù),很多公司都在用。空閑之余,搭了個(gè)helloworld,分享給大家。本文demo下載1.下載 zookeeperzookeeper是服務(wù)的注冊(cè)中心,下載后進(jìn)入到安裝目錄G:bakCenter...

    sugarmo 評(píng)論0 收藏0
  • Dubbo 一篇文章就夠了:從入門到實(shí)戰(zhàn)

    摘要:?jiǎn)?dòng)容器,加載,運(yùn)行服務(wù)提供者。服務(wù)提供者在啟動(dòng)時(shí),在注冊(cè)中心發(fā)布注冊(cè)自己提供的服務(wù)。注冊(cè)中心返回服務(wù)提供者地址列表給消費(fèi)者,如果有變更,注冊(cè)中心將基于長(zhǎng)連接推送變更數(shù)據(jù)給消費(fèi)者。 一 為什么需要 dubbo 很多時(shí)候,其實(shí)我們使用這個(gè)技術(shù)的時(shí)候,可能都是因?yàn)轫?xiàng)目需要,所以,我們就用了,但是,至于為什么我們需要用到這個(gè)技術(shù),可能自身并不是很了解的,但是,其實(shí)了解技術(shù)的來由及背景知識(shí),對(duì)...

    tomener 評(píng)論0 收藏0
  • 超詳細(xì),新手都能看懂 !使用SpringBoot+Dubbo 搭建一個(gè)簡(jiǎn)單的分布式服務(wù)

    Github 地址:https://github.com/Snailclimb/springboot-integration-examples ,歡迎各位 Star。 目錄: 使用 SpringBoot+Dubbo 搭建一個(gè)簡(jiǎn)單分布式服務(wù) 實(shí)戰(zhàn)之前,先來看幾個(gè)重要的概念 什么是分布式? 什么是 Duboo? Dubbo 架構(gòu) 什么是 RPC? 為什么要用 Dubbo? 開始實(shí)戰(zhàn) 1 ...

    chengtao1633 評(píng)論0 收藏0
  • Spring Cloud與Dubbo的完美融合手「Spring Cloud Alibaba」

    摘要:構(gòu)建服務(wù)接口創(chuàng)建一個(gè)簡(jiǎn)單的項(xiàng)目,并在下面定義一個(gè)抽象接口,比如構(gòu)建服務(wù)接口提供方第一步創(chuàng)建一個(gè)項(xiàng)目,在中引入第一步中構(gòu)建的包以及對(duì)和的依賴,比如第一步中構(gòu)建的包這里需要注意兩點(diǎn)必須包含包,不然啟動(dòng)會(huì)報(bào)錯(cuò)。 很早以前,在剛開始搞Spring Cloud基礎(chǔ)教程的時(shí)候,寫過這樣一篇文章:《微服務(wù)架構(gòu)的基礎(chǔ)框架選擇:Spring Cloud還是Dubbo?》,可能不少讀者也都看過。之后也就一...

    wpw 評(píng)論0 收藏0
  • 微服務(wù)架構(gòu)基礎(chǔ)注冊(cè)中心

    摘要:在微服務(wù)架構(gòu)中,注冊(cè)中心是核心的基礎(chǔ)服務(wù)之一。在微服務(wù)架構(gòu)流行之前,注冊(cè)中心就已經(jīng)開始出現(xiàn)在分布式架構(gòu)的系統(tǒng)中。服務(wù)提供者注冊(cè)到注冊(cè)中心,服務(wù)消費(fèi)者到注冊(cè)中心訂閱,同時(shí),注冊(cè)中心中的變更也會(huì)通知服務(wù)消費(fèi)者。 在微服務(wù)架構(gòu)中,注冊(cè)中心是核心的基礎(chǔ)服務(wù)之一。在微服務(wù)架構(gòu)流行之前,注冊(cè)中心就已經(jīng)開始出現(xiàn)在分布式架構(gòu)的系統(tǒng)中。Dubbo是一個(gè)在國內(nèi)比較流行的分布式框架,被大量的中小型互聯(lián)網(wǎng)公司...

    JayChen 評(píng)論0 收藏0

發(fā)表評(píng)論

0條評(píng)論

最新活動(dòng)
閱讀需要支付1元查看
<