本文將詳細分析< dubbo:service executes=”“/>與< dubbo:reference actives = “”/>的實現機制,深入探討Dubbo自身的保護機制。
1、源碼分析ExecuteLimitFilter
@Activate(group = Constants.PROVIDER, value = Constants.EXECUTES_KEY )
過濾器作用
服務調用方并發度控制。
使用場景
對Dubbo服務提供者實現的一種保護機制,控制每個服務的最大并發度。
阻斷條件
當服務調用超過允許的并發度后,直接拋出RpcException異常。
接下來源碼分析ExecuteLimitFilter的實現細節。
ExecuteLimitFilter#invoke
代碼@1:從服務提供者列表中獲取參數executes的值,如果該值小于等于0,表示不啟用并發度控制,直接沿著調用鏈進行調用。
代碼@2:根據服務提供者url和服務調用方法名,獲取RpcStatus。
這里是并發容器ConcurrentHashMap的經典使用,從 這里可以看出ConcurrentMap< String, ConcurrentMap< String, RpcStatus>> METHOD_STATISTICS的存儲結構為 { 服務提供者URL唯一字符串:{方法名:RpcStatus} }。
代碼@3:根據服務提供者配置的最大并發度,創建該服務該方法對應的信號量對象。
使用了雙重檢測來創建executesLimit 信號量。
代碼@4:如果獲取不到鎖,并不會阻塞等待,而是直接拋出RpcException,服務端的策略是快速拋出異常,供服務調用方(消費者)根據集群策略進行執行,例如重試其他服務提供者。
代碼@5:執行真實的服務調用。
代碼@6:如果成功申請到信號量,在服務調用結束后,釋放信號量。
總結:< dubbo:service executes=”“/>的含義是,針對每個服務每個方法的最大并發度。如果超過該值,則直接拋出RpcException。
2、源碼分析ActiveLimitFilter
@Activate(group = Constants.CONSUMER, value = Constants.ACTIVES_KEY )
過濾器作用
消費端調用服務的并發控制。
使用場景
控制同一個消費端對服務端某一服務的并發調用度,通常該值應該小于< dubbo:service executes=”“/>
阻斷條件
非阻斷,但如果超過允許的并發度會阻塞,超過超時時間后將不再調用服務,而是直接拋出超時。
源碼分析ActiveLimitFilter的實現原理:
ActiveLimitFilter#invoke
代碼@1:從Invoker中獲取消息端URL中的配置的actives參數,為什么從Invoker中獲取的Url是消費端的Url呢?這是因為在消費端根據服務提供者URL創建調用Invoker時,會用服務提供者URL,然后合并消費端的配置屬性,其優先級 -D > 消費端 > 服務端。其代碼位于:、
RegistryDirectory#toInvokers
URL url = mergeUrl(providerUrl);
代碼@2:根據服務提供者URL和調用服務提供者方法,獲取RpcStatus。
代碼@3:獲取接口調用的超時時間,默認為1s。
代碼@4:獲取當前消費者,針對特定服務,特定方法的并發調用度,active值。
代碼@5:如果當前的并發 調用大于等于允許的最大值,則針對該RpcStatus申請鎖,并調用其wait(timeout)進行等待,也就是在接口調用超時時間內,還是未被喚醒,則直接拋出超時異常。
代碼@6:判斷被喚醒的原因是因為等待超時,還是由于調用結束,釋放了”名額“,如果是超時喚醒,則直接拋出異常。
代碼@7:在一次服務調用前,先將 服務名+方法名對應的RpcStatus的active加一。
代碼@8:執行RPC服務調用。
代碼@9:記錄成功調用或失敗調用,并將active減一。
代碼@10:最終成功執行,如果開啟了actives機制(dubbo:referecnce actives=”“)時,喚醒等待者。
總結:< dubbo:reference actives=”“/> 是控制消費端對 單個服務提供者單個服務允許調用的最大并發度。該值的取值不應該大于< dubbo:service executes=”“/>的值,并且如果消費者機器的配置,如果性能不盡相同,不建議對該值進行設置。