摘要:可以參考源碼解析二十四遠程調用協議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠程通信層的四。二十的可以參考源碼解析十七遠程通信的一。
2.7大揭秘——消費端發送請求過程
目標:從源碼的角度分析一個服務方法調用經歷怎么樣的磨難以后到達服務端。前言
前一篇文章講到的是引用服務的過程,引用服務無非就是創建出一個代理。供消費者調用服務的相關方法。本節將從調用方法開始講解內部的整個調用鏈。我們就拿dubbo內部的例子講。
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/dubbo-consumer.xml"); context.start(); DemoService demoService = context.getBean("demoService", DemoService.class); String hello = demoService.sayHello("world"); System.out.println("result: " + hello);
這是dubbo-demo-xml-consumer內的實例代碼。接下來我們就開始來看調用demoService.sayHello方法的時候,dubbo執行了哪些操作。
執行過程 (一)InvokerInvocationHandler的invokepublic Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 獲得方法名稱 String methodName = method.getName(); // 獲得方法參數類型 Class>[] parameterTypes = method.getParameterTypes(); // 如果該方法所在的類是Object類型,則直接調用invoke。 if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } // 如果這個方法是toString,則直接調用invoker.toString() if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } // 如果這個方法是hashCode直接調用invoker.hashCode() if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } // 如果這個方法是equals,直接調用invoker.equals(args[0]) if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // 調用invoke return invoker.invoke(new RpcInvocation(method, args)).recreate(); }
可以看到上面的源碼,首先對Object的方法進行了處理,如果調用的方法不是這些方法,則先會 創建RpcInvocation,然后再調用invoke。
RpcInvocation的構造方法public RpcInvocation(Method method, Object[] arguments) { this(method.getName(), method.getParameterTypes(), arguments, null, null); }
public RpcInvocation(String methodName, Class>[] parameterTypes, Object[] arguments, Mapattachments, Invoker> invoker) { // 設置方法名 this.methodName = methodName; // 設置參數類型 this.parameterTypes = parameterTypes == null ? new Class>[0] : parameterTypes; // 設置參數 this.arguments = arguments == null ? new Object[0] : arguments; // 設置附加值 this.attachments = attachments == null ? new HashMap () : attachments; // 設置invoker實體 this.invoker = invoker; }
創建完RpcInvocation后,就是調用invoke。先進入的是ListenerInvokerWrapper的invoke。
(二)MockClusterInvoker的invoke可以參考《dubbo源碼解析(四十一)集群——Mock》的(二)MockClusterInvoker,降級后的返回策略的實現,根據配置的不同來決定不用降級還是強制服務降級還是失敗后再服務降級。
(三)AbstractClusterInvoker的invoke可以參考《dubbo源碼解析(三十五)集群——cluster》的(一)AbstractClusterInvoker,該類是一個抽象類,其中封裝了一些公用的方法,AbstractClusterInvoker的invoke也只是做了一些公用操作。主要的邏輯在doInvoke中。
(四)FailoverClusterInvoker的doInvoke可以參考《dubbo源碼解析(三十五)集群——cluster》的(十二)FailoverClusterInvoker,該類實現了失敗重試的容錯策略。
(五)InvokerWrapper的invoke可以參考《dubbo源碼解析(二十二)遠程調用——Protocol》的(五)InvokerWrapper。該類用了裝飾模式,不過并沒有實現實際的功能增強。
(六)ProtocolFilterWrapper的內部類CallbackRegistrationInvoker的invokepublic Result invoke(Invocation invocation) throws RpcException { // 調用攔截器鏈的invoke Result asyncResult = filterInvoker.invoke(invocation); // 把異步返回的結果加入到上下文中 asyncResult.thenApplyWithContext(r -> { // 循環各個過濾器 for (int i = filters.size() - 1; i >= 0; i--) { Filter filter = filters.get(i); // onResponse callback // 如果該過濾器是ListenableFilter類型的 if (filter instanceof ListenableFilter) { // 強制類型轉化 Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { // 如果內部類listener不為空,則調用回調方法onResponse listener.onResponse(r, filterInvoker, invocation); } } else { // 否則,直接調用filter的onResponse,做兼容。 filter.onResponse(r, filterInvoker, invocation); } } // 返回異步結果 return r; }); // 返回異步結果 return asyncResult; }
這里看到先是調用攔截器鏈的invoke方法。下面的邏輯是把異步返回的結果放到上下文中,具體的ListenableFilter以及內部類的設計,還有thenApplyWithContext等方法我會在異步的實現中講到。
(七)ProtocolFilterWrapper的buildInvokerChain方法中的invoker實例的invoke方法。public Result invoke(Invocation invocation) throws RpcException { Result asyncResult; try { // 依次調用各個過濾器,獲得最終的返回結果 asyncResult = filter.invoke(next, invocation); } catch (Exception e) { // onError callback // 捕獲異常,如果該過濾器是ListenableFilter類型的 if (filter instanceof ListenableFilter) { // 獲得內部類Listener Filter.Listener listener = ((ListenableFilter) filter).listener(); if (listener != null) { //調用onError,回調錯誤信息 listener.onError(e, invoker, invocation); } } // 拋出異常 throw e; } // 返回結果 return asyncResult; }
該方法中是對異常的捕獲,調用內部類Listener的onError來回調錯誤信息。接下來看它經過了哪些攔截器。
(八)ConsumerContextFilter的invokepublic Result invoke(Invoker> invoker, Invocation invocation) throws RpcException { // 獲得上下文,設置invoker,會話域,本地地址和原創地址 RpcContext.getContext() .setInvoker(invoker) .setInvocation(invocation) .setLocalAddress(NetUtils.getLocalHost(), 0) .setRemoteAddress(invoker.getUrl().getHost(), invoker.getUrl().getPort()); // 如果會話域是RpcInvocation,則設置invoker if (invocation instanceof RpcInvocation) { ((RpcInvocation) invocation).setInvoker(invoker); } try { // 移除服務端的上下文 RpcContext.removeServerContext(); // 調用下一個過濾器 return invoker.invoke(invocation); } finally { // 清空上下文 RpcContext.removeContext(); } }
static class ConsumerContextListener implements Listener { @Override public void onResponse(Result appResponse, Invoker> invoker, Invocation invocation) { // 把結果中的附加值放入到上下文中 RpcContext.getServerContext().setAttachments(appResponse.getAttachments()); } @Override public void onError(Throwable t, Invoker> invoker, Invocation invocation) { // 不做任何處理 } }
可以參考《dubbo源碼解析(二十)遠程調用——Filter》,不過上面的源碼是最新的,而鏈接內的源碼是2.6.x的,雖然做了一些變化,比如內部類的的設計,后續的過濾器也有同樣的實現,但是ConsumerContextFilter作用沒有變化,它依舊是在當前的RpcContext中記錄本地調用的一次狀態信息。該過濾器執行完成后,會回到ProtocolFilterWrapper的invoke中的
Result result = filter.invoke(next, invocation);
然后繼續調用下一個過濾器FutureFilter。
(九)FutureFilter的invokepublic Result invoke(final Invoker> invoker, final Invocation invocation) throws RpcException { // 該方法是真正的調用方法的執行 fireInvokeCallback(invoker, invocation); // need to configure if there"s return value before the invocation in order to help invoker to judge if it"s // necessary to return future. return invoker.invoke(invocation); }
class FutureListener implements Listener { @Override public void onResponse(Result result, Invoker> invoker, Invocation invocation) { if (result.hasException()) { // 處理異常結果 fireThrowCallback(invoker, invocation, result.getException()); } else { // 處理正常結果 fireReturnCallback(invoker, invocation, result.getValue()); } } @Override public void onError(Throwable t, Invoker> invoker, Invocation invocation) { } }
可以參考《dubbo源碼解析(二十四)遠程調用——dubbo協議》中的(十四)FutureFilter,其中會有部分結構不一樣,跟ConsumerContextFilter一樣,因為后續版本對Filter接口進行了新的設計,增加了onResponse方法,把返回的執行邏輯放到onResponse中去了。其他邏輯沒有很大變化。等該過濾器執行完成后,還是回到ProtocolFilterWrapper的invoke中的,繼續調用下一個過濾器MonitorFilter。
(十)MonitorFilter的invokepublic Result invoke(Invoker> invoker, Invocation invocation) throws RpcException { // 如果開啟監控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 設置監控開始時間 invocation.setAttachment(MONITOR_FILTER_START_TIME, String.valueOf(System.currentTimeMillis())); // 獲得當前的調用數,并且增加 getConcurrent(invoker, invocation).incrementAndGet(); // count up } return invoker.invoke(invocation); // proceed invocation chain }
class MonitorListener implements Listener { @Override public void onResponse(Result result, Invoker> invoker, Invocation invocation) { // 如果開啟監控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 執行監控,搜集數據 collect(invoker, invocation, result, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), false); // 減少當前調用數 getConcurrent(invoker, invocation).decrementAndGet(); // count down } } @Override public void onError(Throwable t, Invoker> invoker, Invocation invocation) { // 如果開啟監控 if (invoker.getUrl().hasParameter(MONITOR_KEY)) { // 執行監控,搜集數據 collect(invoker, invocation, null, RpcContext.getContext().getRemoteHost(), Long.valueOf(invocation.getAttachment(MONITOR_FILTER_START_TIME)), true); // 減少當前調用數 getConcurrent(invoker, invocation).decrementAndGet(); // count down } }
可以看到該過濾器實際用來做監控,監控服務的調用數量等。其中監控的邏輯不是本文重點,所以不細講。接下來調用的是ListenerInvokerWrapper的invoke。
(十一)ListenerInvokerWrapper的invokepublic Result invoke(Invocation invocation) throws RpcException { return invoker.invoke(invocation); }
可以參考《dubbo源碼解析(二十一)遠程調用——Listener》,這里用到了裝飾者模式,直接調用了invoker。該類里面做了服務啟動的監聽器。我們直接關注下一個invoke。
(十二)AsyncToSyncInvoker的invokepublic Result invoke(Invocation invocation) throws RpcException { Result asyncResult = invoker.invoke(invocation); try { // 如果是同步的調用 if (InvokeMode.SYNC == ((RpcInvocation)invocation).getInvokeMode()) { // 從異步結果中get結果 asyncResult.get(); } } catch (InterruptedException e) { throw new RpcException("Interrupted unexpectedly while waiting for remoting result to return! method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (ExecutionException e) { Throwable t = e.getCause(); if (t instanceof TimeoutException) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } else if (t instanceof RemotingException) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } } catch (Throwable e) { throw new RpcException(e.getMessage(), e); } // 返回異步結果 return asyncResult; }
AsyncToSyncInvoker類從名字上就很好理解,它的作用是把異步結果轉化為同步結果。新的改動中每個調用只要不是oneway方式調用都會先以異步調用開始,然后根據配置的情況如果是同步調用,則會在這個類中進行異步結果轉同步的處理。當然,這里先是執行了invoke,然后就進入下一個AbstractInvoker的invoke了。
(十三)AbstractInvoker的invokepublic Result invoke(Invocation inv) throws RpcException { // if invoker is destroyed due to address refresh from registry, let"s allow the current invoke to proceed // 如果服務引用銷毀,則打印告警日志,但是通過 if (destroyed.get()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; // 會話域中加入該調用鏈 invocation.setInvoker(this); // 把附加值放入會話域 if (CollectionUtils.isNotEmptyMap(attachment)) { invocation.addAttachmentsIfAbsent(attachment); } // 把上下文的附加值放入會話域 MapcontextAttachments = RpcContext.getContext().getAttachments(); if (CollectionUtils.isNotEmptyMap(contextAttachments)) { /** * invocation.addAttachmentsIfAbsent(context){@link RpcInvocation#addAttachmentsIfAbsent(Map)}should not be used here, * because the {@link RpcContext#setAttachment(String, String)} is passed in the Filter when the call is triggered * by the built-in retry mechanism of the Dubbo. The attachment to update RpcContext will no longer work, which is * a mistake in most cases (for example, through Filter to RpcContext output traceId and spanId and other information). */ invocation.addAttachments(contextAttachments); } // 從配置中得到是什么模式的調用,一共有FUTURE、ASYNC和SYNC invocation.setInvokeMode(RpcUtils.getInvokeMode(url, invocation)); // 加入編號 RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { // 執行調用鏈 return doInvoke(invocation); } catch (InvocationTargetException e) { // biz exception // 獲得異常 Throwable te = e.getTargetException(); if (te == null) { // 創建默認的異常異步結果 return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { if (te instanceof RpcException) { // 設置異常碼 ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } // 創建默認的異常異步結果 return AsyncRpcResult.newDefaultAsyncResult(null, te, invocation); } } catch (RpcException e) { if (e.isBiz()) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } else { throw e; } } catch (Throwable e) { return AsyncRpcResult.newDefaultAsyncResult(null, e, invocation); } }
可以參考《dubbo源碼解析(二十二)遠程調用——Protocol》的(三)AbstractInvoker。該方法做了一些公共的操作,比如服務引用銷毀的檢測,加入附加值,加入調用鏈實體域到會話域中等。然后執行了doInvoke抽象方法。各協議自己去實現。然后就是執行到doInvoke方法了。使用的協議不一樣,doInvoke的邏輯也有所不同,我這里舉的例子是使用dubbo協議,所以我就介紹DubboInvoker的doInvoke,其他自行查看具體的實現。此次的異步改造加入了InvokeMode,我會在后續中介紹這個。
(十四)DubboInvoker的doInvokeprotected Result doInvoke(final Invocation invocation) throws Throwable { // rpc會話域 RpcInvocation inv = (RpcInvocation) invocation; // 獲得方法名 final String methodName = RpcUtils.getMethodName(invocation); // 把path放入到附加值中 inv.setAttachment(PATH_KEY, getUrl().getPath()); // 把版本號放入到附加值 inv.setAttachment(VERSION_KEY, version); // 當前的客戶端 ExchangeClient currentClient; // 如果數組內就一個客戶端,則直接取出 if (clients.length == 1) { currentClient = clients[0]; } else { // 取模輪詢 從數組中取,當取到最后一個時,從頭開始 currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 是否是單向發送 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 獲得超時時間 int timeout = getUrl().getMethodParameter(methodName, TIMEOUT_KEY, DEFAULT_TIMEOUT); // 如果是單向發送 if (isOneway) { // 是否等待消息發送,默認不等待消息發出,將消息放入 IO 隊列,即刻返回。 boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 單向發送只負責發送消息,不等待服務端應答,所以沒有返回值 currentClient.send(inv, isSent); // 設置future為null,因為單向發送沒有返回值 RpcContext.getContext().setFuture(null); // 創建一個默認的AsyncRpcResult return AsyncRpcResult.newDefaultAsyncResult(invocation); } else { // 否則直接創建AsyncRpcResult AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv); // 異步調用,返回CompletableFuture類型的future CompletableFuture
可以參考《dubbo源碼解析(二十四)遠程調用——dubbo協議》的(一)DubboInvoker,不過鏈接內的文章的源碼是2.6.x版本的,而上述的源碼是最新版本的,其中就有對于異步的改動,比如加入了異步返回結果、除了單向調用,一律都先處理成AsyncRpcResult等。具體的AsyncRpcResult以及其中用到的CompletableFuture我會在下文介紹。
上述源碼中執行currentClient.request或者currentClient.send,代表把請求放入channel中,交給channel來處理請求。最后來看一個currentClient.request,因為這其中涉及到了Future的構建。
(十五)ReferenceCountExchangeClient的requestpublic CompletableFuture
ReferenceCountExchangeClient是一個記錄請求數的類,用了適配器模式,對ExchangeClient做了功能增強。
可以參考《dubbo源碼解析(二十四)遠程調用——dubbo協議》的(八)ReferenceCountExchangeClient。
(十六)HeaderExchangeClient的requestpublic CompletableFuture
該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(四)HeaderExchangeClient。然后進入HeaderExchangeChannel的request。
(十七)HeaderExchangeChannel的request可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(二)HeaderExchangeChannel,在這個request方法中就可以看到
// 創建DefaultFuture對象,可以從future中主動獲得請求對應的響應信息 DefaultFuture future = new DefaultFuture(channel, req, timeout);
生成了需要的future。異步請求結果就是從這個future中獲取。關于DefaultFuture也可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(七)DefaultFuture。
后面channel.send方法就是跟遠程通信有關了,例如使用netty作為通信實現,則會使用netty實現的客戶端進行通信。
(十八)AbstractPeer的send可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(一)AbstractPeer,其中send方法比較簡單,根據sent配置項去做消息發送。接下來看AbstractClient的send
(十九)AbstractClient的send可以參考《dubbo源碼解析(九)遠程通信——Transport層》的(四)AbstractClient。
public void send(Object message, boolean sent) throws RemotingException { // 如果需要重連或者沒有鏈接,則連接 if (needReconnect && !isConnected()) { connect(); } // 獲得通道 Channel channel = getChannel(); //TODO Can the value returned by getChannel() be null? need improvement. if (channel == null || !channel.isConnected()) { throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl()); } // 通過通道發送消息 channel.send(message, sent); }
該方法中做了重連的邏輯,然后就是通過通道發送消息,dubbo有幾種通信的實現,我這里就按照默認的netty4實現來講解,所以下一步走到了NettyChannel的send。
(二十)NettyChannel的send可以參考《dubbo源碼解析(十七)遠程通信——Netty4》的(一)NettyChannel。這里其中先執行了下面父類AbstractChannel的send,檢查了一下通道是否關閉,然后再走下面的邏輯。當執行writeAndFlush方法后,消息就被發送。
dubbo數據包可以查看《dubbo源碼解析(十)遠程通信——Exchange層》的(二十五)ExchangeCodec,后續關于netty發送消息,以及netty出站數據在發出之前還需要進行編碼操作我就先不做介紹,主要是跟netty知識點強相關,只是dubbo做了一些自己的編碼,以及集成了各類序列化方式。
后記該文章講解了dubbo調用服務的方法所經歷的所有步驟,直到調用消息發送到服務端為止,是目前最新代碼的解析。下一篇文將講解服務端收到方法調用的請求后,如何處理以及如何把調用結果返回的過程。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/74796.html
摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費端發送請求過程講到的十四的,在以前的邏輯會直接在方法中根據配置區分同步異步單向調用。改為關于可以參考源碼解析十遠程通信層的六。 2.7大揭秘——異步化改造 目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協議,關于協議的系列可以查看下面的文章: du...
摘要:而存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發到的。 2.7大揭秘——服務端處理請求過程 目標:從源碼的角度分析服務端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費端發送請求的過程,該篇就要將服務端處理請求的過程。也就是當服務端收到請求數據包后的一系列處理以及如何返回最終結果。我們也知道消費端在發送請求的時候已經做了編碼,所以我...
摘要:大揭秘目標了解的新特性,以及版本升級的引導。四元數據改造我們知道以前的版本只有注冊中心,注冊中心的有數十個的鍵值對,包含了一個服務所有的元數據。 DUBBO——2.7大揭秘 目標:了解2.7的新特性,以及版本升級的引導。 前言 我們知道Dubbo在2011年開源,停止更新了一段時間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發布了 2.5.4 版本。隨后,版本...
摘要:服務引用過程目標從源碼的角度分析服務引用過程。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。的可以參考源碼解析二十三遠程調用的一的源碼分析。 dubbo服務引用過程 目標:從源碼的角度分析服務引用過程。 前言 前面服務暴露過程的文章講解到,服務引用有兩種方式,一種就是直連,也就是直接指定服務的地址來進行引用,這種方式更多的時候被用來做服務測試,不...
摘要:服務提供者代碼上面這個類會被封裝成為一個實例,并新生成一個實例。這樣當網絡通訊層收到一個請求后,會找到對應的實例,并調用它所對應的實例,從而真正調用了服務提供者的代碼。 這次源碼解析借鑒《肥朝》前輩的dubbo源碼解析,進行源碼學習。總結起來就是先總體,后局部.也就是先把需要注意的概念先拋出來,把整體架構圖先畫出來.讓讀者拿著地圖跟著我的腳步,并且每一步我都提醒,現在我們在哪,我們下一...
閱讀 2024·2021-09-30 09:47
閱讀 703·2021-09-22 15:43
閱讀 1981·2019-08-30 15:52
閱讀 2431·2019-08-30 15:52
閱讀 2540·2019-08-30 15:44
閱讀 903·2019-08-30 11:10
閱讀 3372·2019-08-29 16:21
閱讀 3296·2019-08-29 12:19