摘要:目前支持多種注冊(cè)中心。本編文章是分析使用作為注冊(cè)中心,如何整合進(jìn)行服務(wù)注冊(cè)和訂閱服務(wù)。
目前dubbo支持多種注冊(cè)中心:Zookeeper、Redis、Simple、Multicast、Etcd3。
本編文章是分析使用Zookeeper作為注冊(cè)中心,dubbo如何整合Zookeeper進(jìn)行服務(wù)注冊(cè)和訂閱服務(wù)。
首先dubbo將服務(wù)注冊(cè)到Zookeeper后,目錄結(jié)構(gòu)如下所示:(注冊(cè)接口名:com.bob.dubbo.service.CityDubboService)
在consumer和provider服務(wù)啟動(dòng)的時(shí)候,去把自身URL格式化成字符串,然后注冊(cè)到zookeeper相應(yīng)節(jié)點(diǎn)下,作為臨時(shí)節(jié)點(diǎn),斷開連接后,節(jié)點(diǎn)刪除;consumer啟動(dòng)時(shí),不僅會(huì)訂閱服務(wù),同時(shí)也會(huì)將自己的URL注冊(cè)到zookeeper中;
ZookeeperRegistryZookeeperRegistry:dubbo與zookeeper交互主要的類,已下結(jié)合源碼進(jìn)行分析,先來看
doSubcribe()
這個(gè)方法主要是用于訂閱服務(wù),添加監(jiān)聽器,動(dòng)態(tài)監(jiān)聽提供者列表變化:
@Override public void doSubscribe(final URL url, final NotifyListener listener) { try { // 處理所有service層發(fā)起的訂閱,例如監(jiān)控中心的訂閱 if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { String root = toRootPath(); ConcurrentMaplisteners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, (parentPath, currentChilds) -> { for (String child : currentChilds) { child = URL.decode(child); if (!anyServices.contains(child)) { anyServices.add(child); subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child, Constants.CHECK_KEY, String.valueOf(false)), listener); } } }); zkListener = listeners.get(listener); } zkClient.create(root, false); List services = zkClient.addChildListener(root, zkListener); if (services != null && !services.isEmpty()) { for (String service : services) { service = URL.decode(service); anyServices.add(service); subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service, Constants.CHECK_KEY, String.valueOf(false)), listener); } } // 處理指定service層發(fā)起的訂閱,例如服務(wù)消費(fèi)者的訂閱 } else { List urls = new ArrayList<>(); // 循環(huán)分類數(shù)組 , router, configurator, provider for (String path : toCategoriesPath(url)) { // 獲得 url 對(duì)應(yīng)的監(jiān)聽器集合 ConcurrentMap listeners = zkListeners.get(url); if (listeners == null) {// 不存在,進(jìn)行創(chuàng)建 zkListeners.putIfAbsent(url, new ConcurrentHashMap<>()); listeners = zkListeners.get(url); } // 獲得 ChildListener 對(duì)象 ChildListener zkListener = listeners.get(listener); if (zkListener == null) {// 不存在子目錄的監(jiān)聽器,進(jìn)行創(chuàng)建 ChildListener 對(duì)象 // 訂閱父級(jí)目錄, 當(dāng)有子節(jié)點(diǎn)發(fā)生變化時(shí),觸發(fā)此回調(diào)函數(shù),回調(diào)listener中的notify()方法 listeners.putIfAbsent(listener, (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds))); zkListener = listeners.get(listener); } 創(chuàng)建Type節(jié)點(diǎn),此節(jié)點(diǎn)為持久節(jié)點(diǎn) zkClient.create(path, false); // 向 Zookeeper ,PATH 節(jié)點(diǎn),發(fā)起訂閱,返回此節(jié)點(diǎn)下的所有子元素 path : /根節(jié)點(diǎn)/接口全名/providers, 比如 : /dubbo/com.bob.service.CityService/providers List children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } // 首次全量數(shù)據(jù)獲取完成時(shí),調(diào)用 `#notify(...)` 方法,回調(diào) NotifyListener, 在這一步從連接Provider,實(shí)例化Invoker notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
register()
ZookeeperRegistry父類FailbackRegistry中的方法,用于將服務(wù)注冊(cè)到zookeeper,具體代碼如下:
@Override public void register(URL url) { // 調(diào)用父類AbstractRegistry中的register()方法,將url存儲(chǔ)到注冊(cè)集合中 super.register(url); // 如果之前這個(gè)url注冊(cè)失敗,則會(huì)從注冊(cè)失敗集合中刪除 removeFailedRegistered(url); removeFailedUnregistered(url); try { // 像注冊(cè)中心發(fā)送注冊(cè)請(qǐng)求 doRegister(url); } catch (Exception e) { Throwable t = e; // If the startup detection is opened, the Exception is thrown directly. boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if (skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // 將url存入注冊(cè)失敗集合中,進(jìn)行重試try() addFailedRegistered(url); } }
doRegister()
ZookeeperRegistry類中的方法
@Override public void doRegister(URL url) { try { // 通過zookeeper客戶端向注冊(cè)中心發(fā)送服務(wù)注冊(cè)請(qǐng)求,在zookeeper下創(chuàng)建服務(wù)對(duì)應(yīng)的節(jié)點(diǎn) zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
在介紹注冊(cè)registry()方法的時(shí)候,解析到了FailbackRegistry類,接下來咱們來分析一下這個(gè)類的作用:
FailbackRegistry這個(gè)類是ZookeeperRegistry的父類,通過分析該類的結(jié)構(gòu),主要是用于服務(wù)的注冊(cè)、訂閱、重試,而服務(wù)具體的注冊(cè)、訂閱又在ZookeeperRegistry子類進(jìn)行了實(shí)現(xiàn),現(xiàn)在我們來分析重試這個(gè)功能,服務(wù)暴露和訂閱的配置文件中一般會(huì)設(shè)置重試這個(gè)屬性,如下所示:
上面是一個(gè)服務(wù)暴露的示例,設(shè)置了retries屬性,表示重試的次數(shù)。接下來咱們就以注冊(cè)重試進(jìn)行分析(服務(wù)訂閱是同樣的原理):在注冊(cè)registry()方法中(代碼上面已提供),在異常catch{}代碼塊中有一個(gè)addFailedRegistered(url)方法,這個(gè)就是將注冊(cè)失敗的url添加到集合中,并創(chuàng)建一個(gè)重試的任務(wù)FailedRegisteredTask(url, this),代碼如下:
private void addFailedRegistered(URL url) { // 先從集合中獲取,如果存在,直接返回 FailedRegisteredTask oldOne = failedRegistered.get(url); if (oldOne != null) { return; } // 本地集合不存在,則創(chuàng)建重試定時(shí)任務(wù),默認(rèn)每隔5s執(zhí)行 FailedRegisteredTask newTask = new FailedRegisteredTask(url, this); oldOne = failedRegistered.putIfAbsent(url, newTask); if (oldOne == null) { // 將定時(shí)任務(wù)放置在HashedWheelTimer這個(gè)處理定時(shí)任務(wù)的容器,(HashedWheelTimer執(zhí)行原理,可以自行查找資料,這里就不介紹) retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS); } }
咱們下來看FailedRegisteredTask這個(gè)定時(shí)任務(wù),有哪些東西,F(xiàn)ailedRegisteredTask是AbstractRetryTask的子類,在執(zhí)行new FailedRegisteredTask(url, this)代碼時(shí),其實(shí)調(diào)用的是父類構(gòu)造函數(shù),其中retryTimes表示重試的次數(shù),在沒有配置的情況下,默認(rèn)重試三次:
AbstractRetryTask(URL url, FailbackRegistry registry, String taskName) { if (url == null || StringUtils.isBlank(taskName)) { throw new IllegalArgumentException(); } this.url = url; this.registry = registry; this.taskName = taskName; cancel = false; this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // 重試次數(shù),默認(rèn)情況下重試三次 this.retryTimes = url.getParameter(Constants.REGISTRY_RETRY_TIMES_KEY, Constants.DEFAULT_REGISTRY_RETRY_TIMES); }
在AbstractRetryTask類中有一個(gè)run()方法,在run()方法會(huì)根據(jù)XML配置文件中的retries屬性值進(jìn)行比較來進(jìn)行重試,如果沒有達(dá)到重試次數(shù),則會(huì)調(diào)用doRetry(url, registry, timeout),而這個(gè)方法又在子類具體實(shí)現(xiàn),這里我以注冊(cè)FailedRegisteredTask舉例:
@Override public void run(Timeout timeout) throws Exception { if (timeout.isCancelled() || timeout.timer().isStop() || isCancel()) { // other thread cancel this timeout or stop the timer. return; } // 重試次數(shù)與設(shè)置的retries進(jìn)行比較,超過則不在進(jìn)行重試 if (times > retryTimes) { // reach the most times of retry. logger.warn("Final failed to execute task " + taskName + ", url: " + url + ", retry " + retryTimes + " times."); return; } if (logger.isInfoEnabled()) { logger.info(taskName + " : " + url); } try { // 調(diào)用子類實(shí)現(xiàn),進(jìn)行重試 doRetry(url, registry, timeout); } catch (Throwable t) { // Ignore all the exceptions and wait for the next retry logger.warn("Failed to execute task " + taskName + ", url: " + url + ", waiting for again, cause:" + t.getMessage(), t); // reput this task when catch exception. reput(timeout, retryPeriod); } }
在子類FailedRegisteredTask中doRetry()方法具體實(shí)現(xiàn):
public final class FailedRegisteredTask extends AbstractRetryTask { private static final String NAME = "retry register"; public FailedRegisteredTask(URL url, FailbackRegistry registry) { super(url, registry, NAME); } @Override protected void doRetry(URL url, FailbackRegistry registry, Timeout timeout) { // 調(diào)用ZookeeperRegistry類中的doRegister()方法進(jìn)行注冊(cè) registry.doRegister(url); registry.removeFailedRegisteredTask(url); } }
分析到這里,有個(gè)疑問:重試任務(wù)已經(jīng)封裝了,任務(wù)什么時(shí)候去執(zhí)行,怎么執(zhí)行的?其實(shí)在上面咱們就分析到過,就是使用了HashedWheelTimer,這個(gè)類是在ZookeeperRegistry類初始化的時(shí)候就會(huì)去初始化:
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { // 這個(gè)地方進(jìn)行初始化的:初始化父類FailbackRegistry super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (!group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } this.root = group; zkClient = zookeeperTransporter.connect(url); zkClient.addStateListener(state -> { if (state == StateListener.RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } }); }
public FailbackRegistry(URL url) { super(url); this.retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); // 創(chuàng)建HashedWheelTimer對(duì)象 retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboRegistryRetryTimer", true), retryPeriod, TimeUnit.MILLISECONDS, 128); }
然后在addFailedRegistered()方法中有retryTimer.newTimeout(newTask, retryPeriod, TimeUnit.MILLISECONDS);這樣的一條代碼,這個(gè)就是執(zhí)行任務(wù)的開始點(diǎn):
@Override public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 開啟輪詢?nèi)蝿?wù) start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
調(diào)用start()方法時(shí),開啟一個(gè)線程work去輪詢存儲(chǔ)到HashedWheelTimer容器的任務(wù),然后調(diào)用任務(wù)中的run()方法,
public void start() { switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { // 開啟work線程,執(zhí)行work線程中的run()方法 workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
@Override public void run() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it"s not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); do { final long deadline = waitForNextTick(); if (deadline > 0) { int idx = (int) (tick & mask); processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; transferTimeoutsToBuckets(); // 執(zhí)行重試任務(wù) bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket : wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (; ; ) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } processCancelledTasks(); }
void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // process all timeouts while (timeout != null) { // 輪詢獲取重試任務(wù) HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { next = remove(timeout); if (timeout.deadline <= deadline) { // 執(zhí)行重試任務(wù) timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { timeout.remainingRounds--; } timeout = next; } }
public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { // 調(diào)用任務(wù)中的run()方法,(如:AbstractRetryTask任務(wù)中的run()方法,在去調(diào)用子類FailedRegisteredTask中的doRetry()方法進(jìn)行重試注冊(cè)) task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + ".", t); } } }
在上面對(duì)于HashedWheelTimer的具體實(shí)現(xiàn)原理,并沒有進(jìn)行詳細(xì)的進(jìn)行分析,如果想了解的和學(xué)習(xí)的話,可以自行查找資料。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/73033.html
摘要:是目前非常流行的分布式服務(wù)技術(shù),很多公司都在用。空閑之余,搭了個(gè),分享給大家。本文下載下載是服務(wù)的注冊(cè)中心,下載后進(jìn)入到安裝目錄。雙擊即可啟動(dòng)注冊(cè)中心服務(wù)。 dubbo是目前非常流行的分布式服務(wù)技術(shù),很多公司都在用。空閑之余,搭了個(gè)helloworld,分享給大家。本文demo下載1.下載 zookeeperzookeeper是服務(wù)的注冊(cè)中心,下載后進(jìn)入到安裝目錄G:bakCenter...
摘要:?jiǎn)?dòng)容器,加載,運(yùn)行服務(wù)提供者。服務(wù)提供者在啟動(dòng)時(shí),在注冊(cè)中心發(fā)布注冊(cè)自己提供的服務(wù)。注冊(cè)中心返回服務(wù)提供者地址列表給消費(fèi)者,如果有變更,注冊(cè)中心將基于長(zhǎng)連接推送變更數(shù)據(jù)給消費(fèi)者。 一 為什么需要 dubbo 很多時(shí)候,其實(shí)我們使用這個(gè)技術(shù)的時(shí)候,可能都是因?yàn)轫?xiàng)目需要,所以,我們就用了,但是,至于為什么我們需要用到這個(gè)技術(shù),可能自身并不是很了解的,但是,其實(shí)了解技術(shù)的來由及背景知識(shí),對(duì)...
Github 地址:https://github.com/Snailclimb/springboot-integration-examples ,歡迎各位 Star。 目錄: 使用 SpringBoot+Dubbo 搭建一個(gè)簡(jiǎn)單分布式服務(wù) 實(shí)戰(zhàn)之前,先來看幾個(gè)重要的概念 什么是分布式? 什么是 Duboo? Dubbo 架構(gòu) 什么是 RPC? 為什么要用 Dubbo? 開始實(shí)戰(zhàn) 1 ...
摘要:構(gòu)建服務(wù)接口創(chuàng)建一個(gè)簡(jiǎn)單的項(xiàng)目,并在下面定義一個(gè)抽象接口,比如構(gòu)建服務(wù)接口提供方第一步創(chuàng)建一個(gè)項(xiàng)目,在中引入第一步中構(gòu)建的包以及對(duì)和的依賴,比如第一步中構(gòu)建的包這里需要注意兩點(diǎn)必須包含包,不然啟動(dòng)會(huì)報(bào)錯(cuò)。 很早以前,在剛開始搞Spring Cloud基礎(chǔ)教程的時(shí)候,寫過這樣一篇文章:《微服務(wù)架構(gòu)的基礎(chǔ)框架選擇:Spring Cloud還是Dubbo?》,可能不少讀者也都看過。之后也就一...
摘要:在微服務(wù)架構(gòu)中,注冊(cè)中心是核心的基礎(chǔ)服務(wù)之一。在微服務(wù)架構(gòu)流行之前,注冊(cè)中心就已經(jīng)開始出現(xiàn)在分布式架構(gòu)的系統(tǒng)中。服務(wù)提供者注冊(cè)到注冊(cè)中心,服務(wù)消費(fèi)者到注冊(cè)中心訂閱,同時(shí),注冊(cè)中心中的變更也會(huì)通知服務(wù)消費(fèi)者。 在微服務(wù)架構(gòu)中,注冊(cè)中心是核心的基礎(chǔ)服務(wù)之一。在微服務(wù)架構(gòu)流行之前,注冊(cè)中心就已經(jīng)開始出現(xiàn)在分布式架構(gòu)的系統(tǒng)中。Dubbo是一個(gè)在國內(nèi)比較流行的分布式框架,被大量的中小型互聯(lián)網(wǎng)公司...
閱讀 2632·2019-08-30 15:53
閱讀 2870·2019-08-29 16:20
閱讀 1081·2019-08-29 15:10
閱讀 1018·2019-08-26 10:58
閱讀 2188·2019-08-26 10:49
閱讀 630·2019-08-26 10:21
閱讀 700·2019-08-23 18:30
閱讀 1635·2019-08-23 15:58