摘要:失敗安全,出現(xiàn)異常時(shí),直接忽略。失敗自動(dòng)恢復(fù),在調(diào)用失敗后,返回一個(gè)空結(jié)果給服務(wù)提供者。源碼分析一該類(lèi)實(shí)現(xiàn)了接口,是集群的抽象類(lèi)。
集群——cluster
目標(biāo):介紹dubbo中集群容錯(cuò)的幾種模式,介紹dubbo-cluster下support包的源碼。前言
集群容錯(cuò)還是很好理解的,就是當(dāng)你調(diào)用失敗的時(shí)候所作出的措施。先來(lái)看看有哪些模式:
圖有點(diǎn)小,見(jiàn)諒,不過(guò)可以瞇著眼睛看稍微能看出來(lái)一點(diǎn),每一個(gè)Cluster實(shí)現(xiàn)類(lèi)都對(duì)應(yīng)著一個(gè)invoker,因?yàn)檫@個(gè)模式啟用的時(shí)間點(diǎn)就是在調(diào)用的時(shí)候,而我在之前的文章里面講過(guò),invoker貫穿來(lái)整個(gè)服務(wù)的調(diào)用。不過(guò)這里除了調(diào)用失敗的一些模式外,還有幾個(gè)特別的模式,他們應(yīng)該說(shuō)成是失敗的措施,而已調(diào)用的方式。
Failsafe Cluster:失敗安全,出現(xiàn)異常時(shí),直接忽略。失敗安全就是當(dāng)調(diào)用過(guò)程中出現(xiàn)異常時(shí),F(xiàn)ailsafeClusterInvoker 僅會(huì)打印異常,而不會(huì)拋出異常。適用于寫(xiě)入審計(jì)日志等操作
Failover Cluster:失敗自動(dòng)切換,當(dāng)調(diào)用出現(xiàn)失敗的時(shí)候,會(huì)自動(dòng)切換集群中其他服務(wù)器,來(lái)獲得invoker重試,通常用于讀操作,但重試會(huì)帶來(lái)更長(zhǎng)延遲。一般都會(huì)設(shè)置重試次數(shù)。
Failfast Cluster:只會(huì)進(jìn)行一次調(diào)用,失敗后立即拋出異常。適用于冪等操作,比如新增記錄。
Failback Cluster:失敗自動(dòng)恢復(fù),在調(diào)用失敗后,返回一個(gè)空結(jié)果給服務(wù)提供者。并通過(guò)定時(shí)任務(wù)對(duì)失敗的調(diào)用記錄并且重傳,適合執(zhí)行消息通知等操作。
Forking Cluster:會(huì)在線程池中運(yùn)行多個(gè)線程,來(lái)調(diào)用多個(gè)服務(wù)器,只要一個(gè)成功即返回。通常用于實(shí)時(shí)性要求較高的讀操作,但需要浪費(fèi)更多服務(wù)資源。一般會(huì)設(shè)置最大并行數(shù)。
Available Cluster:調(diào)用第一個(gè)可用的服務(wù)器,僅僅應(yīng)用于多注冊(cè)中心。
Broadcast Cluster:廣播調(diào)用所有提供者,逐個(gè)調(diào)用,在循環(huán)調(diào)用結(jié)束后,只要任意一臺(tái)報(bào)錯(cuò)就報(bào)錯(cuò)。通常用于通知所有提供者更新緩存或日志等本地資源信息
Mergeable Cluster:該部分在分組聚合講述。
MockClusterWrapper:該部分在本地偽裝講述。
源碼分析 (一)AbstractClusterInvoker該類(lèi)實(shí)現(xiàn)了Invoker接口,是集群Invoker的抽象類(lèi)。
1.屬性private static final Logger logger = LoggerFactory .getLogger(AbstractClusterInvoker.class); /** * 目錄,包含多個(gè)invoker */ protected final Directory2.selectdirectory; /** * 是否需要核對(duì)可用 */ protected final boolean availablecheck; /** * 是否銷(xiāo)毀 */ private AtomicBoolean destroyed = new AtomicBoolean(false); /** * 粘滯連接的Invoker */ private volatile Invoker stickyInvoker = null;
protected Invokerselect(LoadBalance loadbalance, Invocation invocation, List > invokers, List > selected) throws RpcException { // 如果invokers為空,則返回null if (invokers == null || invokers.isEmpty()) return null; // 獲得方法名 String methodName = invocation == null ? "" : invocation.getMethodName(); // 是否啟動(dòng)了粘滯連接 boolean sticky = invokers.get(0).getUrl().getMethodParameter(methodName, Constants.CLUSTER_STICKY_KEY, Constants.DEFAULT_CLUSTER_STICKY); { //ignore overloaded method // 如果上一次粘滯連接的調(diào)用不在可選的提供者列合內(nèi),則直接設(shè)置為空 if (stickyInvoker != null && !invokers.contains(stickyInvoker)) { stickyInvoker = null; } //ignore concurrency problem // stickyInvoker不為null,并且沒(méi)在已選列表中,返回上次的服務(wù)提供者stickyInvoker,但之前強(qiáng)制校驗(yàn)可達(dá)性。 // 由于stickyInvoker不能包含在selected列表中,通過(guò)代碼看,可以得知forking和failover集群策略,用不了sticky屬性 if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) { if (availablecheck && stickyInvoker.isAvailable()) { return stickyInvoker; } } } // 利用負(fù)載均衡選一個(gè)提供者 Invoker invoker = doSelect(loadbalance, invocation, invokers, selected); // 如果啟動(dòng)粘滯連接,則記錄這一次的調(diào)用 if (sticky) { stickyInvoker = invoker; } return invoker; }
該方法實(shí)現(xiàn)了使用負(fù)載均衡策略選擇一個(gè)調(diào)用者。首先,使用loadbalance選擇一個(gè)調(diào)用者。如果此調(diào)用者位于先前選擇的列表中,或者如果此調(diào)用者不可用,則重新選擇,否則返回第一個(gè)選定的調(diào)用者。重新選擇,重選的驗(yàn)證規(guī)則:選擇>可用。這條規(guī)則可以保證所選的調(diào)用者最少有機(jī)會(huì)成為之前選擇的列表中的一個(gè),也是保證這個(gè)調(diào)用程序可用。
3.doSelectprivate InvokerdoSelect(LoadBalance loadbalance, Invocation invocation, List > invokers, List > selected) throws RpcException { if (invokers == null || invokers.isEmpty()) return null; // 如果只有一個(gè) ,就直接返回這個(gè) if (invokers.size() == 1) return invokers.get(0); // 如果沒(méi)有指定用哪個(gè)負(fù)載均衡策略,則默認(rèn)用隨機(jī)負(fù)載均衡策略 if (loadbalance == null) { loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE); } // 調(diào)用負(fù)載均衡選擇 Invoker invoker = loadbalance.select(invokers, getUrl(), invocation); //If the `invoker` is in the `selected` or invoker is unavailable && availablecheck is true, reselect. // 如果選擇的提供者,已在selected中或者不可用則重新選擇 if ((selected != null && selected.contains(invoker)) || (!invoker.isAvailable() && getUrl() != null && availablecheck)) { try { // 重新選擇 Invoker rinvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck); if (rinvoker != null) { invoker = rinvoker; } else { //Check the index of current selected invoker, if it"s not the last one, choose the one at index+1. // 如果重新選擇失敗,看下第一次選的位置,如果不是最后,選+1位置. int index = invokers.indexOf(invoker); try { //Avoid collision // 最后再避免選擇到同一個(gè)invoker invoker = index < invokers.size() - 1 ? invokers.get(index + 1) : invokers.get(0); } catch (Exception e) { logger.warn(e.getMessage() + " may because invokers list dynamic change, ignore.", e); } } } catch (Throwable t) { logger.error("cluster reselect fail reason is :" + t.getMessage() + " if can not solve, you can set cluster.availablecheck=false in url", t); } } return invoker; }
該方法是用負(fù)載均衡選擇一個(gè)invoker的主要邏輯。
4.reselectprivate Invokerreselect(LoadBalance loadbalance, Invocation invocation, List > invokers, List > selected, boolean availablecheck) throws RpcException { //Allocating one in advance, this list is certain to be used. //預(yù)先分配一個(gè)重選列表,這個(gè)列表是一定會(huì)用到的. List > reselectInvokers = new ArrayList >(invokers.size() > 1 ? (invokers.size() - 1) : invokers.size()); //First, try picking a invoker not in `selected`. //先從非select中選 //把不包含在selected中的提供者,放入重選列表reselectInvokers,讓負(fù)載均衡器選擇 if (availablecheck) { // invoker.isAvailable() should be checked for (Invoker invoker : invokers) { if (invoker.isAvailable()) { if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } } // 在重選列表中用負(fù)載均衡器選擇 if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } else { // do not check invoker.isAvailable() // 不核對(duì)服務(wù)是否可以,把不包含在selected中的提供者,放入重選列表reselectInvokers,讓負(fù)載均衡器選擇 for (Invoker invoker : invokers) { if (selected == null || !selected.contains(invoker)) { reselectInvokers.add(invoker); } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } // Just pick an available invoker using loadbalance policy { // 如果非selected的列表中沒(méi)有選擇到,則從selected中選擇 if (selected != null) { for (Invoker invoker : selected) { if ((invoker.isAvailable()) // available first && !reselectInvokers.contains(invoker)) { reselectInvokers.add(invoker); } } } if (!reselectInvokers.isEmpty()) { return loadbalance.select(reselectInvokers, getUrl(), invocation); } } return null; }
該方法是是重新選擇的邏輯實(shí)現(xiàn)。
5.invoke@Override public Result invoke(final Invocation invocation) throws RpcException { // 核對(duì)是否已經(jīng)銷(xiāo)毀 checkWhetherDestroyed(); LoadBalance loadbalance = null; // binding attachments into invocation. // 獲得上下文的附加值 MapcontextAttachments = RpcContext.getContext().getAttachments(); // 把附加值放入到會(huì)話域中 if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 生成服務(wù)提供者集合 List > invokers = list(invocation); if (invokers != null && !invokers.isEmpty()) { // 獲得負(fù)載均衡器 loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl() .getMethodParameter(RpcUtils.getMethodName(invocation), Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE)); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
該方法是invoker接口必備的方法,調(diào)用鏈的邏輯,不過(guò)主要的邏輯在doInvoke方法中,該方法是該類(lèi)的抽象方法,讓子類(lèi)只關(guān)注doInvoke方法。
6.listprotected List> list(Invocation invocation) throws RpcException { // 把會(huì)話域中的invoker加入集合 List > invokers = directory.list(invocation); return invokers; }
該方法是調(diào)用了directory的list方法,從會(huì)話域中獲得所有的Invoker集合。關(guān)于directory我會(huì)在后續(xù)文章講解。
(二)AvailableClusterpublic class AvailableCluster implements Cluster { public static final String NAME = "available"; @Override publicInvoker join(Directory directory) throws RpcException { // 創(chuàng)建一個(gè)AbstractClusterInvoker return new AbstractClusterInvoker (directory) { @Override public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { // 遍歷所有的involer,只要有一個(gè)可用就直接調(diào)用。 for (Invoker invoker : invokers) { if (invoker.isAvailable()) { return invoker.invoke(invocation); } } throw new RpcException("No provider available in " + invokers); } }; } }
Available Cluster我在上面已經(jīng)講過(guò)了,只要找到一個(gè)可用的,則直接調(diào)用。
(三)BroadcastClusterpublic class BroadcastCluster implements Cluster { @Override publicInvoker join(Directory directory) throws RpcException { // 創(chuàng)建一個(gè)BroadcastClusterInvoker return new BroadcastClusterInvoker (directory); } }
關(guān)鍵實(shí)現(xiàn)在于BroadcastClusterInvoker。
(四)BroadcastClusterInvokerpublic class BroadcastClusterInvoker(五)ForkingClusterextends AbstractClusterInvoker { private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class); public BroadcastClusterInvoker(Directory directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { // 檢測(cè)invokers是否為空 checkInvokers(invokers, invocation); // 把invokers放到上下文 RpcContext.getContext().setInvokers((List) invokers); RpcException exception = null; Result result = null; // 遍歷invokers,逐個(gè)調(diào)用,在循環(huán)調(diào)用結(jié)束后,只要任意一臺(tái)報(bào)錯(cuò)就報(bào)錯(cuò) for (Invoker invoker : invokers) { try { result = invoker.invoke(invocation); } catch (RpcException e) { exception = e; logger.warn(e.getMessage(), e); } catch (Throwable e) { exception = new RpcException(e.getMessage(), e); logger.warn(e.getMessage(), e); } } if (exception != null) { throw exception; } return result; } }
public class ForkingCluster implements Cluster { public final static String NAME = "forking"; @Override public(六)ForkingClusterInvokerInvoker join(Directory directory) throws RpcException { // 創(chuàng)建ForkingClusterInvoker return new ForkingClusterInvoker (directory); } }
public class ForkingClusterInvoker(七)FailbackClusterextends AbstractClusterInvoker { /** * 線程池 * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread} * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}. */ private final ExecutorService executor = Executors.newCachedThreadPool( new NamedInternalThreadFactory("forking-cluster-timer", true)); public ForkingClusterInvoker(Directory directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(final Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { try { // 檢測(cè)invokers是否為空 checkInvokers(invokers, invocation); final List > selected; // 獲取 forks 配置 final int forks = getUrl().getParameter(Constants.FORKS_KEY, Constants.DEFAULT_FORKS); // 獲取超時(shí)配置 final int timeout = getUrl().getParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 如果 forks 配置不合理,則直接將 invokers 賦值給 selected if (forks <= 0 || forks >= invokers.size()) { selected = invokers; } else { selected = new ArrayList >(); // 循環(huán)選出 forks 個(gè) Invoker,并添加到 selected 中 for (int i = 0; i < forks; i++) { // TODO. Add some comment here, refer chinese version for more details. // 選擇 Invoker Invoker invoker = select(loadbalance, invocation, invokers, selected); if (!selected.contains(invoker)) {//Avoid add the same invoker several times. // 加入到selected集合 selected.add(invoker); } } } // 放入上下文 RpcContext.getContext().setInvokers((List) selected); final AtomicInteger count = new AtomicInteger(); final BlockingQueue
public class FailbackCluster implements Cluster { public final static String NAME = "failback"; @Override public(八)FailbackClusterInvoker 1.屬性Invoker join(Directory directory) throws RpcException { // 創(chuàng)建一個(gè)FailbackClusterInvoker return new FailbackClusterInvoker (directory); } }
private static final Logger logger = LoggerFactory.getLogger(FailbackClusterInvoker.class); // 重試間隔 private static final long RETRY_FAILED_PERIOD = 5 * 1000; /** * 定時(shí)器 * Use {@link NamedInternalThreadFactory} to produce {@link com.alibaba.dubbo.common.threadlocal.InternalThread} * which with the use of {@link com.alibaba.dubbo.common.threadlocal.InternalThreadLocal} in {@link RpcContext}. */ private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedInternalThreadFactory("failback-cluster-timer", true)); /** * 失敗集合 */ private final ConcurrentMap2.doInvoke> failed = new ConcurrentHashMap >(); /** * future */ private volatile ScheduledFuture> retryFuture;
@Override protected Result doInvoke(Invocation invocation, List> invokers, LoadBalance loadbalance) throws RpcException { try { // 檢測(cè)invokers是否為空 checkInvokers(invokers, invocation); // 選擇出invoker Invoker invoker = select(loadbalance, invocation, invokers, null); // 調(diào)用 return invoker.invoke(invocation); } catch (Throwable e) { logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: " + e.getMessage() + ", ", e); // 如果失敗,則加入到失敗隊(duì)列,等待重試 addFailed(invocation, this); return new RpcResult(); // ignore } }
該方法是選擇invoker調(diào)用的邏輯,在拋出異常的時(shí)候,做了失敗重試的機(jī)制,主要實(shí)現(xiàn)在addFailed。
3.addFailedprivate void addFailed(Invocation invocation, AbstractClusterInvoker> router) { if (retryFuture == null) { // 鎖住 synchronized (this) { if (retryFuture == null) { // 創(chuàng)建定時(shí)任務(wù),每隔5秒執(zhí)行一次 retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() { @Override public void run() { // collect retry statistics try { // 對(duì)失敗的調(diào)用進(jìn)行重試 retryFailed(); } catch (Throwable t) { // Defensive fault tolerance logger.error("Unexpected error occur at collect statistic", t); } } }, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS); } } } // 添加 invocation 和 invoker 到 failed 中 failed.put(invocation, router); }
該方法做的事創(chuàng)建了定時(shí)器,然后把失敗的調(diào)用放入到集合中。
4.retryFailedvoid retryFailed() { // 如果失敗隊(duì)列為0,返回 if (failed.size() == 0) { return; } // 遍歷失敗隊(duì)列 for (Map.Entry> entry : new HashMap >( failed).entrySet()) { // 獲得會(huì)話域 Invocation invocation = entry.getKey(); // 獲得invoker Invoker> invoker = entry.getValue(); try { // 重新調(diào)用 invoker.invoke(invocation); // 從失敗隊(duì)列中移除 failed.remove(invocation); } catch (Throwable e) { logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e); } } }
這個(gè)方法是調(diào)用失敗的invoker重新調(diào)用的機(jī)制。
(九)FailfastClusterpublic class FailfastCluster implements Cluster { public final static String NAME = "failfast"; @Override public(十)FailfastClusterInvokerInvoker join(Directory directory) throws RpcException { // 創(chuàng)建FailfastClusterInvoker return new FailfastClusterInvoker (directory); } }
public class FailfastClusterInvokerextends AbstractClusterInvoker { public FailfastClusterInvoker(Directory directory) { super(directory); } @Override public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { // 檢測(cè)invokers是否為空 checkInvokers(invokers, invocation); // 選擇一個(gè)invoker Invoker invoker = select(loadbalance, invocation, invokers, null); try { // 調(diào)用 return invoker.invoke(invocation); } catch (Throwable e) { if (e instanceof RpcException && ((RpcException) e).isBiz()) { // biz exception. // 拋出異常 throw (RpcException) e; } // 拋出異常 throw new RpcException(e instanceof RpcException ? ((RpcException) e).getCode() : 0, "Failfast invoke providers " + invoker.getUrl() + " " + loadbalance.getClass().getSimpleName() + " select from all providers " + invokers + " for service " + getInterface().getName() + " method " + invocation.getMethodName() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", but no luck to perform the invocation. Last error is: " + e.getMessage(), e.getCause() != null ? e.getCause() : e); } } }
邏輯比較簡(jiǎn)單,調(diào)用拋出異常就直接拋出。
(十一)FailoverClusterpublic class FailoverCluster implements Cluster { public final static String NAME = "failover"; @Override public(十二)FailoverClusterInvokerInvoker join(Directory directory) throws RpcException { // 創(chuàng)建FailoverClusterInvoker return new FailoverClusterInvoker (directory); } }
public class FailoverClusterInvokerextends AbstractClusterInvoker { private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class); public FailoverClusterInvoker(Directory directory) { super(directory); } @Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List > invokers, LoadBalance loadbalance) throws RpcException { // 復(fù)制一個(gè)invoker集合 List > copyinvokers = invokers; // 檢測(cè)是否為空 checkInvokers(copyinvokers, invocation); // 獲取重試次數(shù) int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. // 記錄最后一個(gè)異常 RpcException le = null; // last exception. List > invoked = new ArrayList >(copyinvokers.size()); // invoked invokers. Set providers = new HashSet (len); // 循環(huán)調(diào)用,失敗重試 for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. // 在進(jìn)行重試前重新列舉 Invoker,這樣做的好處是,如果某個(gè)服務(wù)掛了, // 通過(guò)調(diào)用 list 可得到最新可用的 Invoker 列表 if (i > 0) { checkWhetherDestroyed(); copyinvokers = list(invocation); // check again // 檢測(cè)copyinvokers 是否為空 checkInvokers(copyinvokers, invocation); } // 通過(guò)負(fù)載均衡選擇invoker Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked); // 添加到 invoker 到 invoked 列表中 invoked.add(invoker); // 設(shè)置 invoked 到 RPC 上下文中 RpcContext.getContext().setInvokers((List) invoked); try { // 調(diào)用目標(biāo) Invoker 的 invoke 方法 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } // 若重試失敗,則拋出異常 throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + invocation.getMethodName() + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); } }
該類(lèi)實(shí)現(xiàn)了失敗重試的容錯(cuò)策略,當(dāng)調(diào)用失敗的時(shí)候,記錄下異常,然后循環(huán)調(diào)用下一個(gè)選擇出來(lái)的invoker,直到重試次數(shù)用完,拋出最后一次的異常。
(十三)FailsafeClusterpublic class FailsafeCluster implements Cluster { public final static String NAME = "failsafe"; @Override public(十四)FailsafeClusterInvokerInvoker join(Directory directory) throws RpcException { // 創(chuàng)建FailsafeClusterInvoker return new FailsafeClusterInvoker (directory); } }
public class FailsafeClusterInvokerextends AbstractClusterInvoker { private static final Logger logger = LoggerFactory.getLogger(FailsafeClusterInvoker.class); public FailsafeClusterInvoker(Directory directory) { super(directory); } @Override public Result doInvoke(Invocation invocation, List > invokers, LoadBalance loadbalance) throws RpcException { try { // 檢測(cè)invokers是否為空 checkInvokers(invokers, invocation); // 選擇一個(gè)invoker Invoker invoker = select(loadbalance, invocation, invokers, null); // 調(diào)用 return invoker.invoke(invocation); } catch (Throwable e) { // 如果失敗打印異常,返回一個(gè)空結(jié)果 logger.error("Failsafe ignore exception: " + e.getMessage(), e); return new RpcResult(); // ignore } } }
邏輯比較簡(jiǎn)單,就是不拋出異常,只是打印異常。
后記該部分相關(guān)的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了集群中關(guān)于cluster實(shí)現(xiàn)的部分,講了幾種調(diào)用方式和容錯(cuò)策略。接下來(lái)我將開(kāi)始對(duì)集群模塊關(guān)于配置規(guī)則部分進(jìn)行講解。
文章版權(quán)歸作者所有,未經(jīng)允許請(qǐng)勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請(qǐng)注明本文地址:http://specialneedsforspecialkids.com/yun/77383.html
摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類(lèi)也是用了適配器模式,該類(lèi)主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請(qǐng)求過(guò)程 目標(biāo):從源碼的角度分析一個(gè)服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過(guò)程,引用服務(wù)無(wú)非就是創(chuàng)建出一個(gè)代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...
摘要:源碼分析一創(chuàng)建該類(lèi)實(shí)現(xiàn)了接口,是分組集合的集群實(shí)現(xiàn)。三工廠類(lèi),獲得指定類(lèi)型的對(duì)象。后記該部分相關(guān)的源碼解析地址該文章講解了集群中關(guān)于分組聚合實(shí)現(xiàn)的部分。接下來(lái)我將開(kāi)始對(duì)集群模塊關(guān)于路由部分進(jìn)行講解。 集群——merger 目標(biāo):介紹dubbo中集群的分組聚合,介紹dubbo-cluster下merger包的源碼。 前言 按組合并返回結(jié)果 ,比如菜單服務(wù),接口一樣,但有多種實(shí)現(xiàn),用gro...
摘要:服務(wù)引用過(guò)程目標(biāo)從源碼的角度分析服務(wù)引用過(guò)程。并保留服務(wù)提供者的部分配置,比如版本,,時(shí)間戳等最后將合并后的配置設(shè)置為查詢(xún)字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過(guò)程 目標(biāo):從源碼的角度分析服務(wù)引用過(guò)程。 前言 前面服務(wù)暴露過(guò)程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來(lái)進(jìn)行引用,這種方式更多的時(shí)候被用來(lái)做服務(wù)測(cè)試,不...
摘要:集群目標(biāo)介紹中集群的負(fù)載均衡,介紹下包的源碼。源碼分析一該類(lèi)實(shí)現(xiàn)了接口,是負(fù)載均衡的抽象類(lèi),提供了權(quán)重計(jì)算的功能。四該類(lèi)是負(fù)載均衡基于一致性的邏輯實(shí)現(xiàn)。 集群——LoadBalance 目標(biāo):介紹dubbo中集群的負(fù)載均衡,介紹dubbo-cluster下loadBalance包的源碼。 前言 負(fù)載均衡,說(shuō)的通俗點(diǎn)就是要一碗水端平。在這個(gè)時(shí)代,公平是很重要的,在網(wǎng)絡(luò)請(qǐng)求的時(shí)候同樣是這個(gè)...
摘要:首先將根據(jù)路由規(guī)則服務(wù)提供者和配置規(guī)則三種類(lèi)型分開(kāi),分別放入三個(gè)集合,然后對(duì)每個(gè)集合進(jìn)行修改或者通知設(shè)置禁止訪問(wèn)置空關(guān)閉所有的關(guān)閉禁止訪問(wèn)引用老的傳入的為空,說(shuō)明是路由規(guī)則或配置規(guī)則發(fā)生改變,此時(shí)是空的,直接使用。 集群——directory 目標(biāo):介紹dubbo中集群的目錄,介紹dubbo-cluster下directory包的源碼。 前言 我在前面的文章中也提到了Directory...
閱讀 3456·2023-04-26 00:39
閱讀 4059·2021-09-22 10:02
閱讀 2544·2021-08-09 13:46
閱讀 1102·2019-08-29 18:40
閱讀 1447·2019-08-29 18:33
閱讀 775·2019-08-29 17:14
閱讀 1517·2019-08-29 12:40
閱讀 2979·2019-08-28 18:07