本文將詳細分析< dubbo:service executes=”“/>與< dubbo:reference actives = “”/>的實現機制,深入探討Dubbo自身的保護機制。

1、源碼分析ExecuteLimitFilter

@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY )


過濾器作用

服務調用方并發度控制。

使用場景

對Dubbo服務提供者實現的一種保護機制,控制每個服務的最大并發度。

阻斷條件

當服務調用超過允許的并發度后,直接拋出RpcException異常。

接下來源碼分析ExecuteLimitFilter的實現細節。

ExecuteLimitFilter#invoke


代碼@1:從服務提供者列表中獲取參數executes的值,如果該值小于等于0,表示不啟用并發度控制,直接沿著調用鏈進行調用。

代碼@2:根據服務提供者url和服務調用方法名,獲取RpcStatus。


這里是并發容器ConcurrentHashMap的經典使用,從 這里可以看出ConcurrentMap< String, ConcurrentMap< String, RpcStatus>> METHOD_STATISTICS的存儲結構為 { 服務提供者URL唯一字符串:{方法名:RpcStatus} }。

代碼@3:根據服務提供者配置的最大并發度,創建該服務該方法對應的信號量對象。


使用了雙重檢測來創建executesLimit 信號量。

代碼@4:如果獲取不到鎖,并不會阻塞等待,而是直接拋出RpcException,服務端的策略是快速拋出異常,供服務調用方(消費者)根據集群策略進行執行,例如重試其他服務提供者。

代碼@5:執行真實的服務調用。

代碼@6:如果成功申請到信號量,在服務調用結束后,釋放信號量。

總結:< dubbo:service executes=”“/>的含義是,針對每個服務每個方法的最大并發度。如果超過該值,則直接拋出RpcException。


2、源碼分析ActiveLimitFilter

@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY )


過濾器作用

消費端調用服務的并發控制。

使用場景

控制同一個消費端對服務端某一服務的并發調用度,通常該值應該小于< dubbo:service executes=”“/>

阻斷條件

非阻斷,但如果超過允許的并發度會阻塞,超過超時時間后將不再調用服務,而是直接拋出超時。

源碼分析ActiveLimitFilter的實現原理:

ActiveLimitFilter#invoke

代碼@1:從Invoker中獲取消息端URL中的配置的actives參數,為什么從Invoker中獲取的Url是消費端的Url呢?這是因為在消費端根據服務提供者URL創建調用Invoker時,會用服務提供者URL,然后合并消費端的配置屬性,其優先級 -D > 消費端 > 服務端。其代碼位于:、

RegistryDirectory#toInvokers

URL url = mergeUrl(providerUrl);

代碼@2:根據服務提供者URL和調用服務提供者方法,獲取RpcStatus。

代碼@3:獲取接口調用的超時時間,默認為1s。

代碼@4:獲取當前消費者,針對特定服務,特定方法的并發調用度,active值。

代碼@5:如果當前的并發 調用大于等于允許的最大值,則針對該RpcStatus申請鎖,并調用其wait(timeout)進行等待,也就是在接口調用超時時間內,還是未被喚醒,則直接拋出超時異常。

代碼@6:判斷被喚醒的原因是因為等待超時,還是由于調用結束,釋放了”名額“,如果是超時喚醒,則直接拋出異常。

代碼@7:在一次服務調用前,先將 服務名+方法名對應的RpcStatus的active加一。

代碼@8:執行RPC服務調用。

代碼@9:記錄成功調用或失敗調用,并將active減一。

代碼@10:最終成功執行,如果開啟了actives機制(dubbo:referecnce actives=”“)時,喚醒等待者。

總結:< dubbo:reference actives=”“/> 是控制消費端對 單個服務提供者單個服務允許調用的最大并發度。該值的取值不應該大于< dubbo:service executes=”“/>的值,并且如果消費者機器的配置,如果性能不盡相同,不建議對該值進行設置。