国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

dubbo源碼解析(四十八)異步化改造

lijinke666 / 3767人閱讀

摘要:大揭秘異步化改造目標從源碼的角度分析的新特性中對于異步化的改造原理。看源碼解析四十六消費端發送請求過程講到的十四的,在以前的邏輯會直接在方法中根據配置區分同步異步單向調用。改為關于可以參考源碼解析十遠程通信層的六。

2.7大揭秘——異步化改造
目標:從源碼的角度分析2.7的新特性中對于異步化的改造原理。
前言

dubbo中提供了很多類型的協議,關于協議的系列可以查看下面的文章:

dubbo源碼解析(二十四)遠程調用——dubbo協議

dubbo源碼解析(二十五)遠程調用——hessian協議

dubbo源碼解析(二十六)遠程調用——http協議

dubbo源碼解析(二十七)遠程調用——injvm本地調用

dubbo源碼解析(二十八)遠程調用——memcached協議

dubbo源碼解析(二十九)遠程調用——redis協議

dubbo源碼解析(三十)遠程調用——rest協議

dubbo源碼解析(三十一)遠程調用——rmi協議

dubbo源碼解析(三十二)遠程調用——thrift協議

dubbo源碼解析(三十三)遠程調用——webservice協議

官方推薦的是使用dubbo協議,而異步調用的支持也是在dubbo協議中實現的。

看了我之前寫的2.7新特性的文章,應該對于異步化改造有個大致的印象。要弄懂異步在什么時候起作用,先要弄懂dubbo 的服務暴露和引用過程以及消費端發送請求過程和服務端處理請求過程。我在前四篇文章已經講述了相關內容,異步請求只是dubbo的一種請求方式,基于 dubbo 底層的異步 NIO 實現異步調用,對于 Provider 響應時間較長的場景是必須的,它能有效利用 Consumer 端的資源,相對于 Consumer 端使用多線程來說開銷較小。可以讓消費者無需阻塞等待返回結果。

經過改良后,Provider端也支持異步處理請求,引用官網的話就是現在Provider端異步執行和Consumer端異步調用是相互獨立的,你可以任意正交組合兩端配置。

如何開啟和使用異步可以查看以下鏈接:

Provider異步執行:http://dubbo.apache.org/zh-cn/docs/user/demos/async-execute-on-provider.html

Consumer異步調用:http://dubbo.apache.org/zh-cn/docs/user/demos/async-call.html

異步的改造 Listener做為Filter的內部接口

從設計上

廢棄了Filter原先的onResponse()方法

在Filter接口新增了內部接口Listener,相關接口設計如下。

優點:職責劃分更加明確,進行邏輯分組,增強可讀性,Filter本身應僅是傳遞調用的響應,而所有回調都放入Listener。這樣做以后可以把之前回調的邏輯從invoke里面剝離出來,放到Listener的onResponse或者onError中。

interface Listener {

    /**
     * 回調正常的調用結果
     * @param appResponse
     * @param invoker
     * @param invocation
     */
    void onResponse(Result appResponse, Invoker invoker, Invocation invocation);

    /**
     * 回調異常結果
     * @param t
     * @param invoker
     * @param invocation
     */
    void onError(Throwable t, Invoker invoker, Invocation invocation);
}

新增抽象類ListenableFilter,實現了Filter接口,其中只記錄了一個該過濾器的內部Listener實例。

public abstract class ListenableFilter implements Filter {

    protected Listener listener = null;

    public Listener listener() {
        // 提供該過濾器的內部類listener
        return listener;
    }
}
異步轉同步,新增InvokeMode

不變的是配置來決定調用方式,變的是在何時去做同步異步的不同邏輯處理。看《dubbo源碼解析(四十六)消費端發送請求過程》講到的(十四)DubboInvoker的doInvoke,在以前的邏輯會直接在doInvoke方法中根據配置區分同步、異步、單向調用。現在只多帶帶做了單向調用和需要返回結果的區分,統一先使用AsyncRpcResult來表示結果,也就是說一開始統一都是異步調用,然后在調用回到AsyncToSyncInvoker的invoke中時,才對同步異步做區分,這里新增了InvokeMode,InvokeMode現在有三種模式:SYNC, ASYNC, FUTURE。前兩種很顯而易見,后面一種是調用的返回類型是Future類型,代表調用的方法的返回類型是CompletableFuture類型,這種模式專門用來支持服務端異步的。看下面的源碼。

public static InvokeMode getInvokeMode(URL url, Invocation inv) {
    // 如果返回類型是future
    if (isReturnTypeFuture(inv)) {
        return InvokeMode.FUTURE;
    } else if (isAsync(url, inv)) {
        // 如果是異步調用
        return InvokeMode.ASYNC;
    } else {
        // 如果是同步
        return InvokeMode.SYNC;
    }
}

參考《dubbo源碼解析(四十六)消費端發送請求過程》的(十二)AsyncToSyncInvoker的invoke邏輯,如果是同步模式,就會阻塞調用get方法。直到調用成功有結果返回。如果不是同步模式,就直接返回。

ResponseFuture改為CompletableFuture

關于ResponseFuture可以參考《dubbo源碼解析(十)遠程通信——Exchange層》的(六)ResponseFuture。具體的可以看它的兩個實現(七)DefaultFuture和(八)SimpleFuture。

在這次改造中,最小JDK版本從以前的1.6變成了1.8。當然也要用到1.8中新特性,其中就包括CompletableFuture。dubbo的通信主要有兩處,一處是Consumer發送請求消息給Provider,另一處就是Provider把結果發送給Consumer。在Consumer發送請求消息給Provider的時候,Consumer不會一直處于等待,而是生成ResponseFuture會拋給下游去做其他操作,等到Provider把結果返回放入ResponseFuture,Consumer可以通過get方法獲得結果,或者它也支持回調。但是這就暴露了一些問題,也就是為在新特性里提到的缺陷:

Future只支持阻塞式的get()接口獲取結果。因為future.get()會導致線程阻塞。

Future接口無法實現自動回調,而自定義ResponseFuture雖支持callback回調但支持的異步場景有限,如不支持Future間的相互協調或組合等;

針對以上兩個不足,CompletableFuture可以很好的解決它們。

針對第一點不足,因為CompletableFuture實現了CompletionStage和Future接口,所以它還是可以像以前一樣通過阻塞或者輪詢的方式獲得結果。這一點就能保證阻塞式獲得結果,也就是同步調用不會被拋棄。當然本身也不是很建議用get()這樣阻塞的方式來獲取結果。

針對第二點不足,首先是自動回調,CompletableFuture提供了良好的回調方法。比如下面四個方法有關計算結果完成時的處理:

public CompletableFuture     whenComplete(BiConsumer action)
public CompletableFuture     whenCompleteAsync(BiConsumer action)
public CompletableFuture     whenCompleteAsync(BiConsumer action, Executor executor)
public CompletableFuture     exceptionally(Function fn)

當計算完成后,就會執行該方法中的action方法。相比于ResponseFuture,不再需要自己去做回調注冊的編碼,更加易于理解。

還是針對第二點,自定義的ResponseFuture不支持Future間的相互協調或組合,CompletableFuture很好的解決了這個問題,在CompletableFuture中以下三個方法實現了future之間轉化的功能:

public  CompletableFuture     thenApply(Function fn)
public  CompletableFuture     thenApplyAsync(Function fn)
public  CompletableFuture     thenApplyAsync(Function fn, Executor executor)

由于回調風格的實現,我們不必因為等待一個計算完成而阻塞著調用線程,而是告訴CompletableFuture當計算完成的時候請執行某個function。而且我們還可以將這些操作串聯起來,或者將CompletableFuture組合起來。這一組函數的功能是當原來的CompletableFuture計算完后,將結果傳遞給函數fn,將fn的結果作為新的CompletableFuture計算結果。因此它的功能相當于將CompletableFuture轉換成CompletableFuture

除了轉化之外,還有future之間組合的支持,例如以下三個方法:

public  CompletableFuture     thenCompose(Function> fn)
public  CompletableFuture     thenComposeAsync(Function> fn)
public  CompletableFuture     thenComposeAsync(Function> fn, Executor executor)

這一組方法接受一個Function作為參數,這個Function的輸入是當前的CompletableFuture的計算值,返回結果將是一個新的CompletableFuture,這個新的CompletableFuture會組合原來的CompletableFuture和函數返回的CompletableFuture。

現在就能看出CompletableFuture的強大了,它解決了自定義ResponseFuture的許多問題,該類有幾十個方法,感興趣的可以去一一嘗試。

隨處可見的CompletableFuture

可以看到以前的版本只能在RpcContext中進行獲取。而經過改良后,首先RpcContext一樣能過獲取,其次在過濾器鏈返回的Result中也能獲取,可以從最新的代碼中看到,原先的RpcResult類已經被去除,而在AsyncRpcResult也繼承了CompletableFuture類,也就是說有AsyncRpcResult的地方,就有CompletableFuture。并且在后續的dubbo3.0中,AsyncRpcResult將會內置CompletableFuture類型的變量,CompletableFuture的獲取方式也會大大增加。

AsyncRpcResult全面替代RpcResult

接下來我就來講解一下AsyncRpcResult類。

/**
 * 當相同的線程用于執行另一個RPC調用時,并且回調發生時,原來的RpcContext可能已經被更改。
 * 所以我們應該保留當前RpcContext實例的引用,并在執行回調之前恢復它。
 * 存儲當前的RpcContext
 */
private RpcContext storedContext;
/**
 * 存儲當前的ServerContext
 */
private RpcContext storedServerContext;

/**
 * 會話域
 */
private Invocation invocation;

public AsyncRpcResult(Invocation invocation) {
    // 設置會話域
    this.invocation = invocation;
    // 獲得當前線程內代表消費者端的Context
    this.storedContext = RpcContext.getContext();
    // 獲得當前線程內代表服務端的Context
    this.storedServerContext = RpcContext.getServerContext();
}

/**
 * 轉換成新的AsyncRpcResult
 * @param asyncRpcResult
 */
public AsyncRpcResult(AsyncRpcResult asyncRpcResult) {
    this.invocation = asyncRpcResult.getInvocation();
    this.storedContext = asyncRpcResult.getStoredContext();
    this.storedServerContext = asyncRpcResult.getStoredServerContext();
}

上面的是AsyncRpcResult核心的變量以及構造函數,storedContext和storedServerContext存儲了相關的RpcContext實例的引用,為的就是防止在回調的時候由于相同的線程用于執行另一個RPC調用導致原來的RpcContext可能已經被更改。所以存儲下來后,我們需要在執行回調之前恢復它。具體的可以看下面的thenApplyWithContext方法。

@Override
public Object getValue() {
    // 獲得計算的結果
    return getAppResponse().getValue();
}

@Override
public void setValue(Object value) {
    // 創建一個AppResponse實例
    AppResponse appResponse = new AppResponse();
    // 把結果放入AppResponse
    appResponse.setValue(value);
    // 標志該future完成,并且把攜帶結果的appResponse設置為該future的結果
    this.complete(appResponse);
}

@Override
public Throwable getException() {
    // 獲得拋出的異常信息
    return getAppResponse().getException();
}

@Override
public void setException(Throwable t) {
    // 創建一個AppResponse實例
    AppResponse appResponse = new AppResponse();
    // 把異常放入appResponse
    appResponse.setException(t);
    // 標志該future完成,并且把攜帶異常的appResponse設置為該future的結果
    this.complete(appResponse);
}

@Override
public boolean hasException() {
    // 設置是否有拋出異常
    return getAppResponse().hasException();
}

public Result getAppResponse() {
    // 如果該結果計算完成,則直接調用get方法獲得結果
    try {
        if (this.isDone()) {
            return this.get();
        }
    } catch (Exception e) {
        // This should never happen;
        logger.error("Got exception when trying to fetch the underlying result from AsyncRpcResult.", e);
    }
    // 創建AppResponse
    return new AppResponse();
}

這些實現了Result接口的方法,可以發現其中都是調用了AppResponse的方法,AppResponse跟AsyncRpcResult一樣也繼承了AbstractResult,不過它是作為回調的數據結構。AppResponse我會在異步化過濾器鏈回調中講述。

@Override
public Object recreate() throws Throwable {
    // 強制類型轉化
    RpcInvocation rpcInvocation = (RpcInvocation) invocation;
    // 如果返回的是future類型
    if (InvokeMode.FUTURE == rpcInvocation.getInvokeMode()) {
        // 創建AppResponse實例
        AppResponse appResponse = new AppResponse();
        // 創建future
        CompletableFuture future = new CompletableFuture<>();
        // appResponse設置future值,因為返回的就是CompletableFuture類型
        appResponse.setValue(future);
        // 當該AsyncRpcResult完成的時候,把結果放入future中,這樣返回的就是CompletableFuture包裹的結果
        this.whenComplete((result, t) -> {
            if (t != null) {
                if (t instanceof CompletionException) {
                    t = t.getCause();
                }
                future.completeExceptionally(t);
            } else {
                if (result.hasException()) {
                    future.completeExceptionally(result.getException());
                } else {
                    future.complete(result.getValue());
                }
            }
        });
        // 重置
        return appResponse.recreate();
    } else if (this.isDone()) {
        // 如果完成,則直接重置
        return this.get().recreate();
    }
    // 如果返回類型不是CompletableFuture,則調用AppResponse的重置
    return (new AppResponse()).recreate();
}

該方法是重置,本來也是直接調用了AppResponse的方法,不過因為支持了以CompletableFuture為返回類型的服務方法調用,所以這里做了一些額外的邏輯,也就是把結果用CompletableFuture包裹,作為返回的結果放入AppResponse實例中。可以對標使用了CompletableFuture簽名的服務。

@Override
public Result thenApplyWithContext(Function fn) {
    // 當該AsyncRpcResult完成后,結果作為參數先執行beforeContext,再執行fn,最后執行andThen
    this.thenApply(fn.compose(beforeContext).andThen(afterContext));
    // You may need to return a new Result instance representing the next async stage,
    // like thenApply will return a new CompletableFuture.
    return this;
}


/**
 * tmp context to use when the thread switch to Dubbo thread.
 * 臨時的RpcContext,當用戶線程切換為Dubbo線程時候使用
 */
/**
 * 臨時的RpcContext
 */
private RpcContext tmpContext;
private RpcContext tmpServerContext;

private Function beforeContext = (appResponse) -> {
    // 獲得當前線程消費者端的RpcContext
    tmpContext = RpcContext.getContext();
    // 獲得當前線程服務端的RpcContext
    tmpServerContext = RpcContext.getServerContext();
    // 重新設置消費者端的RpcContext
    RpcContext.restoreContext(storedContext);
    // 重新設置服務端的RpcContext
    RpcContext.restoreServerContext(storedServerContext);
    return appResponse;
};

private Function afterContext = (appResponse) -> {
    // 重新把臨時的RpcContext設置回去
    RpcContext.restoreContext(tmpContext);
    RpcContext.restoreServerContext(tmpServerContext);
    return appResponse;
};

把這幾部分代碼放在一起時因為當用戶線程切換為Dubbo線程時候需要用到臨時的RpcContext來記錄,如何使用該thenApplyWithContext方法,我也會在異步化過濾器鏈回調中講到。

其他的方法比較好理解,我就不一一講解。

異步化過濾器鏈回調

如果看過前兩篇關于發送請求和處理請求的過程,應該就知道在整個調用鏈中有許多的過濾器,而Consumer和Provider分別都有各自的過濾器來做一些功能增強。過濾器有執行鏈,也有回調鏈,如果整一個鏈路都是同步的,那么過濾器一旦增多,鏈路增長,就會帶來請求響應時間的增加,這當然是最不想看到的事情。那如果把過濾器的調用鏈異步化,那么我們就可以用一個future來代替結果拋給下游,讓下游不再阻塞。這樣就大大降低了響應時間,節省資源,提升RPC響應性能。而這里的future就是下面要介紹的AppResponse。那我先來介紹一下如何實現異步化過濾器鏈回調。我就拿消費端發送請求過程來舉例子說明。

參考《dubbo源碼解析(四十六)消費端發送請求過程》的(六)ProtocolFilterWrapper的內部類CallbackRegistrationInvoker的invoke,可以看到當所有的過濾器執行完后,會遍歷每一個過濾器鏈,獲得上面所說的內部接口Listener實現類,進行異步回調,因為請求已經在(十四)DubboInvoker的doInvoke中進行了發送,返回給下游一個AsyncRpcResult,而AsyncRpcResult內包裹的是AppResponse,可以看《dubbo源碼解析(四十七)服務端處理請求過程》的(二十三)AbstractProxyInvoker的invoke,當代理類執行相關方法后,會創建一個AppResponse,把結果放入AppResponse中。所以AsyncRpcResult中包裹的是AppResponse,然后調用回調方法onResponse。并且會執行thenApplyWithContext把回調結果放入上下文中。而這個上下文如何避免相同的線程用于執行另一個RPC調用導致原來的RpcContext可能已經被更改的情況,我也在上面已經說明。

新增AppResponse

AppResponse繼承了AbstractResult,同樣也是CompletableFuture類型,但是AppResponse跟AsyncRpcResult職能不一樣,AsyncRpcResult作為一個future,而AppResponse可以說是作為rpc調用結果的一個數據結構,它的實現很簡單,就是封裝了以下三個屬性和對應的一些方法。

/**
 * 調用結果
 */
private Object result;

/**
 * rpc調用時的異常
 */
private Throwable exception;

/**
 * 附加值
 */
private Map attachments = new HashMap();

前面我也講了,Provider處理請求完成后,會把結果放在AppResponse內,在整個鏈路調用過程中AsyncRpcResult內部必然會有一個AppResponse存在,而為上文提到的過濾器內置接口Listener的onResponse方法中的appResponse就是AppResponse類型的,它作為一個回調的數據類型。

后記

該文章講解了dubbo 2.7.x版本對于異步化改造的介紹,上面只是羅列了所有改動的點,沒有具體講述在哪些新增功能上的應用,如果感興趣,可以參考前幾篇的調用過程文章,來看看新增的功能點如何運用上述的設計的,比如Provider異步,有一種實現方式就用到了上述的InvokeMode。接下來一篇我會講述元數據的改造。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75781.html

相關文章

  • dubbo源碼解析四十七)服務端處理請求過程

    摘要:而存在的意義就是保證請求或響應對象可在線程池中被解碼,解碼完成后,就會分發到的。 2.7大揭秘——服務端處理請求過程 目標:從源碼的角度分析服務端接收到請求后的一系列操作,最終把客戶端需要的值返回。 前言 上一篇講到了消費端發送請求的過程,該篇就要將服務端處理請求的過程。也就是當服務端收到請求數據包后的一系列處理以及如何返回最終結果。我們也知道消費端在發送請求的時候已經做了編碼,所以我...

    yzzz 評論0 收藏0
  • dubbo源碼解析四十三)2.7新特性

    摘要:大揭秘目標了解的新特性,以及版本升級的引導。四元數據改造我們知道以前的版本只有注冊中心,注冊中心的有數十個的鍵值對,包含了一個服務所有的元數據。 DUBBO——2.7大揭秘 目標:了解2.7的新特性,以及版本升級的引導。 前言 我們知道Dubbo在2011年開源,停止更新了一段時間。在2017 年 9 月 7 日,Dubbo 悄悄的在 GitHub 發布了 2.5.4 版本。隨后,版本...

    qqlcbb 評論0 收藏0
  • dubbo源碼解析四十六)消費端發送請求過程

    摘要:可以參考源碼解析二十四遠程調用協議的八。十六的該類也是用了適配器模式,該類主要的作用就是增加了心跳功能,可以參考源碼解析十遠程通信層的四。二十的可以參考源碼解析十七遠程通信的一。 2.7大揭秘——消費端發送請求過程 目標:從源碼的角度分析一個服務方法調用經歷怎么樣的磨難以后到達服務端。 前言 前一篇文章講到的是引用服務的過程,引用服務無非就是創建出一個代理。供消費者調用服務的相關方法。...

    fish 評論0 收藏0
  • dubbo源碼解析四十四)服務暴露過程

    摘要:服務暴露過程目標從源碼的角度分析服務暴露過程。導出服務,包含暴露服務到本地,和暴露服務到遠程兩個過程。其中服務暴露的第八步已經沒有了。將泛化調用版本號或者等信息加入獲得服務暴露地址和端口號,利用內數據組裝成。 dubbo服務暴露過程 目標:從源碼的角度分析服務暴露過程。 前言 本來這一篇一個寫異步化改造的內容,但是最近我一直在想,某一部分的優化改造該怎么去撰寫才能更加的讓讀者理解。我覺...

    light 評論0 收藏0
  • dubbo源碼解析四十二)序列——開篇

    摘要:在版本中,支持五種序列化方式,分別是依賴阿里的庫,功能強大支持普通類包括任意或完全兼容序列化協議的系列化框架,序列化速度大概是的倍,大小是大小的左右。但這里實際不是原生的序列化,而是阿里修改過的,它是默認啟用的序列化方式自帶的序列化實現。 序列化——開篇 目標:介紹dubbo中序列化的內容,對dubbo中支持的序列化方式做對比,介紹dubbo-serialization-api下的源碼...

    keke 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<