摘要:上一篇源碼解析概要篇中我們了解到中的一些概念及消費(fèi)端總體調(diào)用過程。由于在生成代理實(shí)例的時候,在構(gòu)造函數(shù)中賦值了,因此可以只用該進(jìn)行方法的調(diào)用。
上一篇 dubbo源碼解析——概要篇中我們了解到dubbo中的一些概念及消費(fèi)端總體調(diào)用過程。本文中,將進(jìn)入消費(fèi)端源碼解析(具體邏輯會放到代碼的注釋中)。本文先是對消費(fèi)過程的總體代碼邏輯理一遍,個別需要細(xì)講的點(diǎn),后面會專門的文章進(jìn)行解析。
開頭進(jìn)入InvokerInvocationHandler通過實(shí)現(xiàn)InvocationHandler,我們知道dubbo生成代理使用的是JDK動態(tài)代理。這個類中主要是對特殊方法進(jìn)行處理。由于在生成代理實(shí)例的時候,在構(gòu)造函數(shù)中賦值了invoker,因此可以只用該invoker進(jìn)行invoke方法的調(diào)用。
/** * dubbo使用JDK動態(tài)代理,對接口對象進(jìn)行注入 * InvokerHandler * * 程序啟動的過程中,在構(gòu)造函數(shù)中,賦值下一個需要調(diào)用的invoker,從而形成執(zhí)行鏈 */ public class InvokerInvocationHandler implements InvocationHandler { private final Invoker> invoker; public InvokerInvocationHandler(Invoker> handler) { this.invoker = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 獲取方法名稱 String methodName = method.getName(); // 獲取參數(shù)類型 Class>[] parameterTypes = method.getParameterTypes(); // 方法所處的類 是 Object類,則直接調(diào)用 if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } /* * toString、hashCode、equals方法比較特殊,如果interface里面定義了這幾個方法,并且進(jìn)行實(shí)現(xiàn), * 通過dubbo遠(yuǎn)程調(diào)用是不會執(zhí)行這些代碼實(shí)現(xiàn)的。 */ /* * 方法調(diào)用是toString,依次執(zhí)行MockClusterInvoker、AbstractClusterInvoker的toString方法 */ if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } /* * interface中含有hashCode方法,直接調(diào)用invoker的hashCode */ if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } /* * interface中含有equals方法,直接調(diào)用invoker的equals */ if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } /* * invocationv包含了遠(yuǎn)程調(diào)用的參數(shù)、方法信息 */ RpcInvocation invocation; /* * todo這段代碼在最新的dubbo版本中沒有 */ if (RpcUtils.hasGeneratedFuture(method)) { Class> clazz = method.getDeclaringClass(); String syncMethodName = methodName.substring(0, methodName.length() - Constants.ASYNC_SUFFIX.length()); Method syncMethod = clazz.getMethod(syncMethodName, method.getParameterTypes()); invocation = new RpcInvocation(syncMethod, args); invocation.setAttachment(Constants.FUTURE_GENERATED_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } else { invocation = new RpcInvocation(method, args); if (RpcUtils.hasFutureReturnType(method)) { invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } } // 繼續(xù)invoker鏈?zhǔn)秸{(diào)用 return invoker.invoke(invocation).recreate(); } }進(jìn)入MockClusterInvoker
這段代碼主要是判斷是否需要進(jìn)行mock調(diào)用
@Override public Result invoke(Invocation invocation) throws RpcException { Result result = null; // 獲取mock參數(shù),從而判斷是否需要mock String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { // 不需要mock,繼續(xù)往下調(diào)用 result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { if (logger.isWarnEnabled()) { logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl()); } // 選擇mock的invoker result = doMockInvoke(invocation, null); } else { // 正常調(diào)用失敗,則調(diào)用mock try { result = this.invoker.invoke(invocation); } catch (RpcException e) { if (e.isBiz()) { throw e; } else { if (logger.isWarnEnabled()) { logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e); } result = doMockInvoke(invocation, e); } } } return result; }進(jìn)入AbstractClusterInvoker
進(jìn)入這段代碼,表明開始進(jìn)入到集群
@Override public Result invoke(final Invocation invocation) throws RpcException { // 檢查消費(fèi)端invoker是否銷毀了 checkWhetherDestroyed(); // 將參數(shù)綁定到invocation Map進(jìn)入AbstractDirectorycontextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { ((RpcInvocation) invocation).addAttachments(contextAttachments); } // 獲取滿足條件的invoker(從Directory獲取,并且經(jīng)過router過濾) List > invokers = list(invocation); // 初始化loadBalance LoadBalance loadbalance = initLoadBalance(invokers, invocation); // invocation ID將被添加在異步操作 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); return doInvoke(invocation, invokers, loadbalance); }
@Override public List> list(Invocation invocation) throws RpcException { // 判斷Directory是否銷毀 if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } // 從methodInvokerMap中取出滿足條件的invoker List > invokers = doList(invocation); // 根據(jù)路由列表,篩選出滿足條件的invoker List localRouters = this.routers; // local reference if (localRouters != null && !localRouters.isEmpty()) { for (Router router : localRouters) { try { if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) { invokers = router.route(invokers, getConsumerUrl(), invocation); } } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } } } return invokers; }
這里主要是從Directory中獲取invoker,并且經(jīng)過router路由的篩選,獲得滿足條件的invoker。
在AbstractDirectory中,有一個關(guān)鍵的方法com.alibaba.dubbo.rpc.cluster.directory.AbstractDirectory#doList,這是一個抽象方法,子類RegistryDirectory的有具體實(shí)現(xiàn),并且調(diào)用RegistryDirectory的doList方法。(這里應(yīng)該是用到了模板方法模式)。后面的文字中會詳細(xì)講下doList方法中做了啥。
經(jīng)過從Directory中獲取invoker,然后router篩選出滿足條件的invoker之后,進(jìn)入到FailoverClusterInvoker。為什么會到這里呢?
根據(jù)官網(wǎng)的描述:
在集群調(diào)用失敗時,Dubbo 提供了多種容錯方案,缺省為 failover 重試。
所以這個時候是到了FailoverClusterInvoker類,但是如果你配置的是Failfast Cluster(快速失敗),Failsafe Cluster(失敗安全),Failback Cluster(失敗自動恢復(fù)),Forking Cluster(并行調(diào)用多個服務(wù)器,只要一個成功即返回),Broadcast Cluster(廣播調(diào)用所有提供者,逐個調(diào)用,任意一臺報錯則報錯),他也會到達(dá)相應(yīng)的類。
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List> invokers, LoadBalance loadbalance) throws RpcException { // 局部引用 List > copyinvokers = invokers; // 參數(shù)校驗(yàn)(這種封裝方法我在工作中借鑒,個人感覺比較好) checkInvokers(copyinvokers, invocation); // 獲取方法名稱 String methodName = RpcUtils.getMethodName(invocation); // 獲取重試次數(shù) int len = getUrl().getMethodParameter(methodName, Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1; if (len <= 0) { // 最少要調(diào)用1次 len = 1; } // 局部引用 RpcException le = null; List > invoked = new ArrayList >(copyinvokers.size()); // invoked invokers. Set providers = new HashSet (len); // i < len 作為循環(huán)條件,說明len是多少就循環(huán)多少次 for (int i = 0; i < len; i++) { // 在重試之前,需要重新選擇,以避免候選invoker的改變 if (i > 0) { // 檢查invoker是否被銷毀 checkWhetherDestroyed(); // 重新選擇invoker copyinvokers = list(invocation); // 參數(shù)檢查 checkInvokers(copyinvokers, invocation); } /* * 這一步就是進(jìn)入loadBalance負(fù)載均衡 * 因?yàn)樯鲜霾襟E可能篩選出invoker數(shù)量大于1,所以再次經(jīng)過loadBalance的篩選 */ Invoker invoker = select(loadbalance, invocation, copyinvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { // 遠(yuǎn)程方法調(diào)用 Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le != null ? le.getCode() : 0, "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyinvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + (le != null ? le.getMessage() : ""), le != null && le.getCause() != null ? le.getCause() : le); }
到達(dá)終點(diǎn)站.我們回憶總結(jié)一下,文初提到的三個關(guān)鍵詞,在這個集群容錯的整體架構(gòu)過程中,dubbo究竟做了什么.其實(shí)也就是三件事
(1)在Directory中找出本次集群中的全部invokers
(2)在Router中,將上一步的全部invokers挑選出能正常執(zhí)行的invokers
(3)在LoadBalance中,將上一步的能正常的執(zhí)行invokers中,根據(jù)配置的負(fù)載均衡策略,挑選出需要執(zhí)行的invoker
后面的文章中,對上述的一些細(xì)節(jié)進(jìn)行解析
文章版權(quán)歸作者所有,未經(jīng)允許請勿轉(zhuǎn)載,若此文章存在違規(guī)行為,您可以聯(lián)系管理員刪除。
轉(zhuǎn)載請注明本文地址:http://specialneedsforspecialkids.com/yun/72100.html
摘要:大揭秘異步化改造目標(biāo)從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費(fèi)端發(fā)送請求過程講到的十四的,在以前的邏輯會直接在方法中根據(jù)配置區(qū)分同步異步單向調(diào)用。改為關(guān)于可以參考源碼解析十遠(yuǎn)程通信層的六。 2.7大揭秘——異步化改造 目標(biāo):從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協(xié)議,關(guān)于協(xié)議的系列可以查看下面的文章: du...
摘要:服務(wù)提供者代碼上面這個類會被封裝成為一個實(shí)例,并新生成一個實(shí)例。這樣當(dāng)網(wǎng)絡(luò)通訊層收到一個請求后,會找到對應(yīng)的實(shí)例,并調(diào)用它所對應(yīng)的實(shí)例,從而真正調(diào)用了服務(wù)提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進(jìn)行源碼學(xué)習(xí)。總結(jié)起來就是先總體,后局部.也就是先把需要注意的概念先拋出來,把整體架構(gòu)圖先畫出來.讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現(xiàn)在我們在哪,我們下一...
摘要:源碼分析條件路由規(guī)則有兩個條件組成,分別用于對服務(wù)消費(fèi)者和提供者進(jìn)行匹配。如果服務(wù)提供者匹配條件為空,表示對某些服務(wù)消費(fèi)者禁用服務(wù)。此時第六次循環(huán)分隔符,,。第二個和第三個參數(shù)來自方法的參數(shù)列表,這兩個參數(shù)分別為服務(wù)提供者和服務(wù)消費(fèi)者。 1. 簡介 上一篇文章分析了集群容錯的第一部分 -- 服務(wù)目錄 Directory。服務(wù)目錄在刷新 Invoker 列表的過程中,會通過 Router...
摘要:可以參考源碼解析二十四遠(yuǎn)程調(diào)用協(xié)議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠(yuǎn)程通信層的四。二十的可以參考源碼解析十七遠(yuǎn)程通信的一。 2.7大揭秘——消費(fèi)端發(fā)送請求過程 目標(biāo):從源碼的角度分析一個服務(wù)方法調(diào)用經(jīng)歷怎么樣的磨難以后到達(dá)服務(wù)端。 前言 前一篇文章講到的是引用服務(wù)的過程,引用服務(wù)無非就是創(chuàng)建出一個代理。供消費(fèi)者調(diào)用服務(wù)的相關(guān)方法。...
摘要:服務(wù)引用過程目標(biāo)從源碼的角度分析服務(wù)引用過程。并保留服務(wù)提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設(shè)置為查詢字符串中。的可以參考源碼解析二十三遠(yuǎn)程調(diào)用的一的源碼分析。 dubbo服務(wù)引用過程 目標(biāo):從源碼的角度分析服務(wù)引用過程。 前言 前面服務(wù)暴露過程的文章講解到,服務(wù)引用有兩種方式,一種就是直連,也就是直接指定服務(wù)的地址來進(jìn)行引用,這種方式更多的時候被用來做服務(wù)測試,不...
閱讀 2416·2021-11-25 09:43
閱讀 1195·2021-09-07 10:16
閱讀 2603·2021-08-20 09:38
閱讀 2936·2019-08-30 15:55
閱讀 1449·2019-08-30 13:21
閱讀 883·2019-08-29 15:37
閱讀 1435·2019-08-27 10:56
閱讀 2093·2019-08-26 13:45