摘要:遠程調用開篇目標介紹之后解讀遠程調用模塊的內容如何編排介紹中的包結構設計以及最外層的的源碼解析。十該類就是遠程調用的上下文,貫穿著整個調用,例如調用,然后調用。十五該類是系統上下文,僅供內部使用。
遠程調用——開篇
目標:介紹之后解讀遠程調用模塊的內容如何編排、介紹dubbo-rpc-api中的包結構設計以及最外層的的源碼解析。前言
最近我面臨著一個選擇,因為dubbo 2.7.0-release出現在了倉庫里,最近一直在進行2.7.0版本的code review,那我之前說這一系列的文章都是講述2.6.x版本的源代碼,我現在要不要選擇直接開始講解2.7.0的版本的源碼呢?我最后還是決定繼續講解2.6.x,因為我覺得還是有很多公司在用著2.6.x的版本,并且對于升級2.7.0的計劃應該還沒那么快,并且在了解2.6.x版本的原理后,再去了解2.7.0新增的特性會更加容易,也能夠品位到設計者的意圖。當然在結束2.6.x的重要模塊講解后,我也會對2.7.0的新特性以及實現原理做一個全面的分析,2.7.0作為dubbo社區的畢業版,更加強大,敬請期待。
前面講了很多的內容,現在開始將遠程調用RPC,好像又回到我第一篇文章 《dubbo源碼解析(一)Hello,Dubbo》,在這篇文章開頭我講到了什么叫做RPC,再通俗一點講,就是我把一個項目的兩部分代碼分開來,分別放到兩臺機器上,當我部署在A服務器上的應用想要調用部署在B服務器上的應用等方法,由于不存在同一個內存空間,不能直接調用。而其實整個dubbo都在做遠程調用的事情,它涉及到很多內容,比如配置、代理、集群、監控等等,那么這次講的內容是只關心一對一的調用,dubbo-rpc遠程調用模塊抽象各種協議,以及動態代理,Proxy層和Protocol層rpc的核心,我將會在本系列中講到。下面我們來看兩張官方文檔的圖:
暴露服務的時序圖:
你會發現其中有我們以前講到的Transporter、Server、Registry,而這次的系列將會講到的就是紅色框框內的部分。
引用服務時序圖
在引用服務時序圖中,對應的也是紅色框框的部分。
當閱讀完該系列后,希望能對這個調用鏈有所感悟。接下來看看dubbo-rpc的包結構:
可以看到有很多包,很規整,其中dubbo-rpc-api是對協議、暴露、引用、代理等的抽象和實現,是rpc整個設計的核心內容。其他的包則是dubbo支持的9種協議,在官方文檔也能查看介紹,并且包括一種本地調用injvm。那么我們再來看看dubbo-rpc-api中包結構:
filter包:在進行服務引用時會進行一系列的過濾。其中包括了很多過濾器。
listener包:看上面兩張服務引用和服務暴露的時序圖,發現有兩個listener,其中的邏輯實現就在這個包內
protocol包:這個包實現了協議的一些公共邏輯
proxy包:實現了代理的邏輯。
service包:其中包含了一個需要調用的方法等封裝抽象。
support包:包括了工具類
最外層的實現。
下面的篇幅設計,本文會講解最外層的源碼和service下的源碼,support包下的源碼我會穿插在其他用到的地方一并講解,filter、listener、protocol、proxy以及各類協議的實現各自用一篇來講。
源碼分析 (一)Invokerpublic interface Invokerextends Node { /** * get service interface. * 獲得服務接口 * @return service interface. */ Class getInterface(); /** * invoke. * 調用下一個會話域 * @param invocation * @return result * @throws RpcException */ Result invoke(Invocation invocation) throws RpcException; }
該接口是實體域,它是dubbo的核心模型,其他模型都向它靠攏,或者轉化成它,它代表了一個可執行體,可以向它發起invoke調用,這個有可能是一個本地的實現,也可能是一個遠程的實現,也可能是一個集群的實現。它代表了一次調用
(二)Invocationpublic interface Invocation { /** * get method name. * 獲得方法名稱 * @return method name. * @serial */ String getMethodName(); /** * get parameter types. * 獲得參數類型 * @return parameter types. * @serial */ Class>[] getParameterTypes(); /** * get arguments. * 獲得參數 * @return arguments. * @serial */ Object[] getArguments(); /** * get attachments. * 獲得附加值集合 * @return attachments. * @serial */ MapgetAttachments(); /** * get attachment by key. * 獲得附加值 * @return attachment value. * @serial */ String getAttachment(String key); /** * get attachment by key with default value. * 獲得附加值 * @return attachment value. * @serial */ String getAttachment(String key, String defaultValue); /** * get the invoker in current context. * 獲得當前上下文的invoker * @return invoker. * @transient */ Invoker> getInvoker(); }
Invocation 是會話域,它持有調用過程中的變量,比如方法名,參數等。
(三)Exporterpublic interface Exporter{ /** * get invoker. * 獲得對應的實體域invoker * @return invoker */ Invoker getInvoker(); /** * unexport. * 取消暴露 * *
* getInvoker().destroy(); *
*/ void unexport(); }
該接口是暴露服務的接口,定義了兩個方法分別是獲得invoker和取消暴露服務。
(四)ExporterListener@SPI public interface ExporterListener { /** * The exporter exported. * 暴露服務 * @param exporter * @throws RpcException * @see com.alibaba.dubbo.rpc.Protocol#export(Invoker) */ void exported(Exporter> exporter) throws RpcException; /** * The exporter unexported. * 取消暴露 * @param exporter * @throws RpcException * @see com.alibaba.dubbo.rpc.Exporter#unexport() */ void unexported(Exporter> exporter); }
該接口是服務暴露的監聽器接口,定義了兩個方法是暴露和取消暴露,參數都是Exporter類型的。
(五)Protocol@SPI("dubbo") public interface Protocol { /** * Get default port when user doesn"t config the port. * 獲得默認的端口 * @return default port */ int getDefaultPort(); /** * Export service for remote invocation:
* 1. Protocol should record request source address after receive a request: * RpcContext.getContext().setRemoteAddress();
* 2. export() must be idempotent, that is, there"s no difference between invoking once and invoking twice when * export the same URL
* 3. Invoker instance is passed in by the framework, protocol needs not to care
* 暴露服務方法, * @paramService type 服務類型 * @param invoker Service invoker 服務的實體域 * @return exporter reference for exported service, useful for unexport the service later * @throws RpcException thrown when error occurs during export the service, for example: port is occupied */ @Adaptive Exporter export(Invoker invoker) throws RpcException; /** * Refer a remote service:
* 1. When user calls `invoke()` method of `Invoker` object which"s returned from `refer()` call, the protocol * needs to correspondingly execute `invoke()` method of `Invoker` object
* 2. It"s protocol"s responsibility to implement `Invoker` which"s returned from `refer()`. Generally speaking, * protocol sends remote request in the `Invoker` implementation.
* 3. When there"s check=false set in URL, the implementation must not throw exception but try to recover when * connection fails. * 引用服務方法 * @paramService type 服務類型 * @param type Service class 服務類名 * @param url URL address for the remote service * @return invoker service"s local proxy * @throws RpcException when there"s any error while connecting to the service provider */ @Adaptive Invoker refer(Class type, URL url) throws RpcException; /** * Destroy protocol:
* 1. Cancel all services this protocol exports and refers
* 2. Release all occupied resources, for example: connection, port, etc.
* 3. Protocol can continue to export and refer new service even after it"s destroyed. */ void destroy(); }
該接口是服務域接口,也是協議接口,它是一個可擴展的接口,默認實現的是dubbo協議。定義了四個方法,關鍵的是服務暴露和引用兩個方法。
(六)Filter@SPI public interface Filter { /** * do invoke filter. **
* // before filter * Result result = invoker.invoke(invocation); * // after filter * return result; *
* * @param invoker service * @param invocation invocation. * @return invoke result. * @throws RpcException * @see com.alibaba.dubbo.rpc.Invoker#invoke(Invocation) */ Result invoke(Invoker> invoker, Invocation invocation) throws RpcException; }
該接口是invoker調用時過濾器接口,其中就只有一個invoke方法。在該方法中對調用進行過濾
(七)InvokerListener@SPI public interface InvokerListener { /** * The invoker referred * 在服務引用的時候進行監聽 * @param invoker * @throws RpcException * @see com.alibaba.dubbo.rpc.Protocol#refer(Class, com.alibaba.dubbo.common.URL) */ void referred(Invoker> invoker) throws RpcException; /** * The invoker destroyed. * 銷毀實體域 * @param invoker * @see com.alibaba.dubbo.rpc.Invoker#destroy() */ void destroyed(Invoker> invoker); }
該接口是實體域的監聽器,定義了兩個方法,分別是服務引用和銷毀的時候執行的方法。
(八)Result該接口是實體域執行invoke的結果接口,里面定義了獲得結果異常以及附加值等方法。比較好理解我就不貼代碼了。
(九)ProxyFactory@SPI("javassist") public interface ProxyFactory { /** * create proxy. * 創建一個代理 * @param invoker * @return proxy */ @Adaptive({Constants.PROXY_KEY})T getProxy(Invoker invoker) throws RpcException; /** * create proxy. * 創建一個代理 * @param invoker * @return proxy */ @Adaptive({Constants.PROXY_KEY}) T getProxy(Invoker invoker, boolean generic) throws RpcException; /** * create invoker. * 創建一個實體域 * @param * @param proxy * @param type * @param url * @return invoker */ @Adaptive({Constants.PROXY_KEY}) Invoker getInvoker(T proxy, Class type, URL url) throws RpcException; }
該接口是代理工廠接口,它也是個可擴展接口,默認實現javassist,dubbo提供兩種動態代理方法分別是javassist/jdk,該接口定義了三個方法,前兩個方法是通過invoker創建代理,最后一個是通過代理來獲得invoker。
(十)RpcContext該類就是遠程調用的上下文,貫穿著整個調用,例如A調用B,然后B調用C。在服務B上,RpcContext在B之前將調用信息從A保存到B。開始調用C,并在B調用C后將調用信息從B保存到C。RpcContext保存了調用信息。
public class RpcContext { /** * use internal thread local to improve performance * 本地上下文 */ private static final InternalThreadLocalLOCAL = new InternalThreadLocal () { @Override protected RpcContext initialValue() { return new RpcContext(); } }; /** * 服務上下文 */ private static final InternalThreadLocal SERVER_LOCAL = new InternalThreadLocal () { @Override protected RpcContext initialValue() { return new RpcContext(); } }; /** * 附加值集合 */ private final Map attachments = new HashMap (); /** * 上下文值 */ private final Map values = new HashMap (); /** * 線程結果 */ private Future> future; /** * url集合 */ private List urls; /** * 當前的url */ private URL url; /** * 方法名稱 */ private String methodName; /** * 參數類型集合 */ private Class>[] parameterTypes; /** * 參數集合 */ private Object[] arguments; /** * 本地地址 */ private InetSocketAddress localAddress; /** * 遠程地址 */ private InetSocketAddress remoteAddress; /** * 實體域集合 */ @Deprecated private List > invokers; /** * 實體域 */ @Deprecated private Invoker> invoker; /** * 會話域 */ @Deprecated private Invocation invocation; // now we don"t use the "values" map to hold these objects // we want these objects to be as generic as possible /** * 請求 */ private Object request; /** * 響應 */ private Object response;
該類中最重要的是它的一些屬性,因為該上下文就是用來保存信息的。方法我就不介紹了,因為比較簡單。
(十一)RpcException/** * 不知道異常 */ public static final int UNKNOWN_EXCEPTION = 0; /** * 網絡異常 */ public static final int NETWORK_EXCEPTION = 1; /** * 超時異常 */ public static final int TIMEOUT_EXCEPTION = 2; /** * 基礎異常 */ public static final int BIZ_EXCEPTION = 3; /** * 禁止訪問異常 */ public static final int FORBIDDEN_EXCEPTION = 4; /** * 序列化異常 */ public static final int SERIALIZATION_EXCEPTION = 5;
該類是rpc調用拋出的異常類,其中封裝了五種通用的錯誤碼。
(十二)RpcInvocation/** * 方法名稱 */ private String methodName; /** * 參數類型集合 */ private Class>[] parameterTypes; /** * 參數集合 */ private Object[] arguments; /** * 附加值 */ private Mapattachments; /** * 實體域 */ private transient Invoker> invoker;
該類實現了Invocation接口,是rpc的會話域,其中的方法比較簡單,主要是封裝了上述的屬性。
(十三)RpcResult/** * 結果 */ private Object result; /** * 異常 */ private Throwable exception; /** * 附加值 */ private Mapattachments = new HashMap ();
該類實現了Result接口,是rpc的結果實現類,其中關鍵是封裝了以上三個屬性。
(十四)RpcStatus該類是rpc的一些狀態監控,其中封裝了許多的計數器,用來記錄rpc調用的狀態。
1.屬性/** * uri對應的狀態集合,key為uri,value為RpcStatus對象 */ private static final ConcurrentMapSERVICE_STATISTICS = new ConcurrentHashMap (); /** * method對應的狀態集合,key是uri,第二個key是方法名methodName */ private static final ConcurrentMap > METHOD_STATISTICS = new ConcurrentHashMap >(); /** * 已經沒用了 */ private final ConcurrentMap values = new ConcurrentHashMap (); /** * 活躍狀態 */ private final AtomicInteger active = new AtomicInteger(); /** * 總的數量 */ private final AtomicLong total = new AtomicLong(); /** * 失敗的個數 */ private final AtomicInteger failed = new AtomicInteger(); /** * 總調用時長 */ private final AtomicLong totalElapsed = new AtomicLong(); /** * 總調用失敗時長 */ private final AtomicLong failedElapsed = new AtomicLong(); /** * 最大調用時長 */ private final AtomicLong maxElapsed = new AtomicLong(); /** * 最大調用失敗時長 */ private final AtomicLong failedMaxElapsed = new AtomicLong(); /** * 最大調用成功時長 */ private final AtomicLong succeededMaxElapsed = new AtomicLong(); /** * Semaphore used to control concurrency limit set by `executes` * 信號量用來控制`execution`設置的并發限制 */ private volatile Semaphore executesLimit; /** * 用來控制`execution`設置的許可證 */ private volatile int executesPermits;
以上是該類的屬性,可以看到保存了很多的計數器,分別用來記錄了失敗調用成功調用等累計數。
2.beginCount/** * 開始計數 * @param url */ public static void beginCount(URL url, String methodName) { // 對該url對應對活躍計數器加一 beginCount(getStatus(url)); // 對該方法對活躍計數器加一 beginCount(getStatus(url, methodName)); } /** * 以原子方式加1 * @param status */ private static void beginCount(RpcStatus status) { status.active.incrementAndGet(); }
該方法是增加計數。
3.endCountpublic static void endCount(URL url, String methodName, long elapsed, boolean succeeded) { // url對應的狀態中計數器減一 endCount(getStatus(url), elapsed, succeeded); // 方法對應的狀態中計數器減一 endCount(getStatus(url, methodName), elapsed, succeeded); } private static void endCount(RpcStatus status, long elapsed, boolean succeeded) { // 活躍計數器減一 status.active.decrementAndGet(); // 總計數器加1 status.total.incrementAndGet(); // 總調用時長加上調用時長 status.totalElapsed.addAndGet(elapsed); // 如果最大調用時長小于elapsed,則設置最大調用時長 if (status.maxElapsed.get() < elapsed) { status.maxElapsed.set(elapsed); } // 如果rpc調用成功 if (succeeded) { // 如果成最大調用成功時長小于elapsed,則設置最大調用成功時長 if (status.succeededMaxElapsed.get() < elapsed) { status.succeededMaxElapsed.set(elapsed); } } else { // 失敗計數器加一 status.failed.incrementAndGet(); // 失敗的過期數加上elapsed status.failedElapsed.addAndGet(elapsed); // 總調用失敗時長小于elapsed,則設置總調用失敗時長 if (status.failedMaxElapsed.get() < elapsed) { status.failedMaxElapsed.set(elapsed); } } }
該方法是計數器減少。
(十五)StaticContext該類是系統上下文,僅供內部使用。
/** * 系統名稱 */ private static final String SYSTEMNAME = "system"; /** * 系統上下文集合,僅供內部使用 */ private static final ConcurrentMapcontext_map = new ConcurrentHashMap (); /** * 系統上下文名稱 */ private String name;
上面是該類的屬性,它還記錄了所有的系統上下文集合。
(十六)EchoServicepublic interface EchoService { /** * echo test. * 回聲測試 * @param message message. * @return message. */ Object $echo(Object message); }
該接口是回聲服務接口,定義了一個一個回聲測試的方法,回聲測試用于檢測服務是否可用,回聲測試按照正常請求流程執行,能夠測試整個調用是否通暢,可用于監控,所有服務自動實現該接口,只需將任意服務強制轉化為EchoService,就可以用了。
(十七)GenericException該方法是通用的異常類。
/** * 異常類名 */ private String exceptionClass; /** * 異常信息 */ private String exceptionMessage;
比較簡單,就封裝了兩個屬性。
(十八)GenericServicepublic interface GenericService { /** * Generic invocation * 通用的會話域 * @param method Method name, e.g. findPerson. If there are overridden methods, parameter info is * required, e.g. findPerson(java.lang.String) * @param parameterTypes Parameter types * @param args Arguments * @return invocation return value * @throws Throwable potential exception thrown from the invocation */ Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException; }
該接口是通用的服務接口,同樣定義了一個類似invoke的方法
后記該部分相關的源碼解析地址:https://github.com/CrazyHZM/i...
該文章講解了遠程調用的開篇,介紹之后解讀遠程調用模塊的內容如何編排、介紹dubbo-rpc-api中的包結構設計以及最外層的的源碼解析,其中的邏輯不負責,要關注的是其中的一些概念和dubbo如何去做暴露服務和引用服務,其中很多的接口定義需要弄清楚。接下來我將開始對rpc模塊的過濾器進行講解。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/72826.html
摘要:源碼分析一該類繼承了類,是協議實現的核心。屬性默認端口號不支持協議的服務暴露,拋出異常可以看到不支持服務暴露。后記該部分相關的源碼解析地址該文章講解了遠程調用中關于協議實現的部分,邏輯比較簡單。 遠程調用——redis協議 目標:介紹redis協議的設計和實現,介紹dubbo-rpc-redis的源碼。 前言 dubbo支持的redis協議是基于Redis的,Redis 是一個高效的 ...
摘要:而編碼器是講應用程序的數據轉化為網絡格式,解碼器則是講網絡格式轉化為應用程序,同時具備這兩種功能的單一組件就叫編解碼器。在中是老的編解碼器接口,而是新的編解碼器接口,并且已經用把適配成了。 遠程通訊——開篇 目標:介紹之后解讀遠程通訊模塊的內容如何編排、介紹dubbo-remoting-api中的包結構設計以及最外層的的源碼解析。 前言 服務治理框架中可以大致分為服務通信和服務管理兩個...
摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費端發送請求過程講到的十四的,在以前的邏輯會直接在方法中根據配置區分同步異步單向調用。改為關于可以參考源碼解析十遠程通信層的六。 2.7大揭秘——異步化改造 目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。 前言 dubbo中提供了很多類型的協議,關于協議的系列可以查看下面的文章: du...
摘要:服務引用過程目標從源碼的角度分析服務引用過程。并保留服務提供者的部分配置,比如版本,,時間戳等最后將合并后的配置設置為查詢字符串中。的可以參考源碼解析二十三遠程調用的一的源碼分析。 dubbo服務引用過程 目標:從源碼的角度分析服務引用過程。 前言 前面服務暴露過程的文章講解到,服務引用有兩種方式,一種就是直連,也就是直接指定服務的地址來進行引用,這種方式更多的時候被用來做服務測試,不...
摘要:服務暴露過程目標從源碼的角度分析服務暴露過程。導出服務,包含暴露服務到本地,和暴露服務到遠程兩個過程。其中服務暴露的第八步已經沒有了。將泛化調用版本號或者等信息加入獲得服務暴露地址和端口號,利用內數據組裝成。 dubbo服務暴露過程 目標:從源碼的角度分析服務暴露過程。 前言 本來這一篇一個寫異步化改造的內容,但是最近我一直在想,某一部分的優化改造該怎么去撰寫才能更加的讓讀者理解。我覺...
閱讀 3242·2021-10-27 14:20
閱讀 2525·2021-10-08 10:05
閱讀 1625·2021-09-09 09:33
閱讀 2902·2019-08-30 13:16
閱讀 1435·2019-08-29 18:34
閱讀 1170·2019-08-29 10:58
閱讀 1228·2019-08-28 18:22
閱讀 1226·2019-08-26 13:33