在本系列的上一篇文章中,我們了解了 Resilience4j 以及如何使用其 Retry 模塊。現在讓我們了解 RateLimiter - 它是什么,何時以及如何使用它,以及在實施速率限制(或者也稱為“節流”)時要注意什么。
代碼示例
本文附有GitHub 上的工作代碼示例。
什么是 Resilience4j?
請參閱上一篇文章中的描述,快速了解 Resilience4j 的一般工作原理。
什么是限速?
我們可以從兩個角度來看待速率限制——作為服務提供者和作為服務消費者。
服務端限速
作為服務提供商,我們實施速率限制以保護我們的資源免受過載和拒絕服務 (DoS) 攻-擊。
為了滿足我們與所有消費者的服務水平協議 (SLA),我們希望確保一個導致流量激增的消費者不會影響我們對他人的服務質量。
我們通過設置在給定時間單位內允許消費者發出多少請求的限制來做到這一點。我們通過適當的響應拒絕任何超出限制的請求,例如 HTTP 狀態 429(請求過多)。這稱為服務器端速率限制。
速率限制以每秒請求數 (rps)、每分鐘請求數 (rpm) 或類似形式指定。某些服務在不同的持續時間(例如 50 rpm 且不超過 2500 rph)和一天中的不同時間(例如,白天 100 rps 和晚上 150 rps)有多個速率限制。該限制可能適用于單個用戶(由用戶 ID、IP 地址、API 訪問密鑰等標識)或多租戶應用程序中的租戶。
客戶端限速
作為服務的消費者,我們希望確保我們不會使服務提供者過載。此外,我們不想招致意外的成本——無論是金錢上的還是服務質量方面的。
如果我們消費的服務是有彈性的,就會發生這種情況。服務提供商可能不會限制我們的請求,而是會因額外負載而向我們收取額外費用。有些甚至在短時間內禁止行為不端的客戶。消費者為防止此類問題而實施的速率限制稱為客戶端速率限制。
何時使用 RateLimiter?
resilience4j-ratelimiter 用于客戶端速率限制。
服務器端速率限制需要諸如緩存和多個服務器實例之間的協調之類的東西,這是 resilience4j 不支持的。對于服務器端的速率限制,有 API 網關和 API 過濾器,例如 Kong API Gateway 和 Repose API Filter。Resilience4j 的 RateLimiter 模塊并不打算取代它們。
Resilience4j RateLimiter 概念
想要調用遠程服務的線程首先向 RateLimiter 請求許可。如果 RateLimiter 允許,則線程繼續。 否則,RateLimiter 會停放線程或將其置于等待狀態。
RateLimiter 定期創建新權限。當權限可用時,線程會收到通知,然后可以繼續。
一段時間內允許的調用次數稱為 limitForPeriod
。RateLimiter 刷新權限的頻率由 limitRefreshPeriod
指定。timeoutDuration
指定線程可以等待多長時間獲取權限。如果在等待時間結束時沒有可用的權限,RateLimiter 將拋出 RequestNotPermitted
運行時異常。
使用Resilience4j RateLimiter 模塊
RateLimiterRegistry
、RateLimiterConfig
和 RateLimiter
是 resilience4j-ratelimiter 的主要抽象。
RateLimiterRegistry
是一個用于創建和管理 RateLimiter
對象的工廠。
RateLimiterConfig
封裝了 limitForPeriod
、limitRefreshPeriod
和 timeoutDuration
配置。每個 RateLimiter
對象都與一個 RateLimiterConfig
相關聯。
RateLimiter
提供輔助方法來為包含遠程調用的函數式接口或 lambda 表達式創建裝飾器。
讓我們看看如何使用 RateLimiter 模塊中可用的各種功能。假設我們正在為一家航空公司建立一個網站,以允許其客戶搜索和預訂航班。我們的服務與 FlightSearchService
類封裝的遠程服務對話。
基本示例
第一步是創建一個 RateLimiterConfig
:
RateLimiterConfig config = RateLimiterConfig.ofDefaults();
這將創建一個 RateLimiterConfig
,其默認值為 limitForPeriod
(50)、limitRefreshPeriod
(500ns) 和 timeoutDuration
(5s)。
假設我們與航空公司服務的合同規定我們可以以 1 rps 調用他們的搜索 API。然后我們將像這樣創建 RateLimiterConfig
:
RateLimiterConfig config = RateLimiterConfig.custom() .limitForPeriod(1) .limitRefreshPeriod(Duration.ofSeconds(1)) .timeoutDuration(Duration.ofSeconds(1)) .build();
如果線程無法在指定的 1 秒 timeoutDuration
內獲取權限,則會出錯。
然后我們創建一個 RateLimiter
并裝飾 searchFlights()
調用:
RateLimiterRegistry registry = RateLimiterRegistry.of(config);RateLimiter limiter = registry.rateLimiter("flightSearchService");// FlightSearchService and SearchRequest creation omittedSupplier> flightsSupplier = RateLimiter.decorateSupplier(limiter, () -> service.searchFlights(request));
最后,我們多次使用裝飾過的 Supplier<List<Flight>>
:
for (int i=0; i<3; i++) { System.out.println(flightsSupplier.get());}
示例輸出中的時間戳顯示每秒發出一個請求:
Searching for flights; current time = 15:29:40 786...[Flight{flightNumber=XY 765, ... }, ... ]Searching for flights; current time = 15:29:41 791...[Flight{flightNumber=XY 765, ... }, ... ]
如果超出限制,我們會收到 RequestNotPermitted
異常:
Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter flightSearchService does not permit further calls at io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43) at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)... other lines omitted ...
裝飾方法拋出已檢異常
假設我們正在調用FlightSearchService.searchFlightsThrowingException()
,它可以拋出一個已檢 Exception
。那么我們就不能使用RateLimiter.decorateSupplier()
。我們將使用RateLimiter.decorateCheckedSupplier()
代替:
CheckedFunction0> flights = RateLimiter.decorateCheckedSupplier(limiter, () -> service.searchFlightsThrowingException(request));try { System.out.println(flights.apply());} catch (...) { // exception handling}
RateLimiter.decorateCheckedSupplier()
返回一個 CheckedFunction0
,它表示一個沒有參數的函數。請注意對 CheckedFunction0
對象的 apply()
調用以調用遠程操作。
如果我們不想使用 Suppliers
,RateLimiter
提供了更多的輔助裝飾器方法,如 decorateFunction()
、decorateCheckedFunction()
、decorateRunnable()
、decorateCallable()
等,以與其他語言結構一起使用。decorateChecked*
方法用于裝飾拋出已檢查異常的方法。
應用多個速率限制
假設航空公司的航班搜索有多個速率限制:2 rps 和 40 rpm。 我們可以通過創建多個 RateLimiters
在客戶端應用多個限制:
RateLimiterConfig rpsConfig = RateLimiterConfig.custom(). limitForPeriod(2). limitRefreshPeriod(Duration.ofSeconds(1)). timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterConfig rpmConfig = RateLimiterConfig.custom(). limitForPeriod(40). limitRefreshPeriod(Duration.ofMinutes(1)). timeoutDuration(Duration.ofMillis(2000)).build();RateLimiterRegistry registry = RateLimiterRegistry.of(rpsConfig);RateLimiter rpsLimiter = registry.rateLimiter("flightSearchService_rps", rpsConfig);RateLimiter rpmLimiter = registry.rateLimiter("flightSearchService_rpm", rpmConfig); 然后我們使用兩個 RateLimiters 裝飾 searchFlights() 方法:Supplier> rpsLimitedSupplier = RateLimiter.decorateSupplier(rpsLimiter, () -> service.searchFlights(request));Supplier> flightsSupplier = RateLimiter.decorateSupplier(rpmLimiter, rpsLimitedSupplier);
示例輸出顯示每秒發出 2 個請求,并且限制為 40 個請求:
Searching for flights; current time = 15:13:21 246...Searching for flights; current time = 15:13:21 249...Searching for flights; current time = 15:13:22 212...Searching for flights; current time = 15:13:40 215...Exception in thread "main" io.github.resilience4j.ratelimiter.RequestNotPermitted:RateLimiter flightSearchService_rpm does not permit further callsat io.github.resilience4j.ratelimiter.RequestNotPermitted.createRequestNotPermitted(RequestNotPermitted.java:43)at io.github.resilience4j.ratelimiter.RateLimiter.waitForPermission(RateLimiter.java:580)
在運行時更改限制
如果需要,我們可以在運行時更改 limitForPeriod
和 timeoutDuration
的值:
limiter.changeLimitForPeriod(2);limiter.changeTimeoutDuration(Duration.ofSeconds(2));
例如,如果我們的速率限制根據一天中的時間而變化,則此功能很有用 - 我們可以有一個計劃線程來更改這些值。新值不會影響當前正在等待權限的線程。
RateLimiter和 Retry一起使用
假設我們想在收到 RequestNotPermitted
異常時重試,因為它是一個暫時性錯誤。我們會像往常一樣創建 RateLimiter
和 Retry
對象。然后我們裝飾一個 Supplier
的供應商并用 Retry
包裝它:
Supplier> rateLimitedFlightsSupplier = RateLimiter.decorateSupplier(rateLimiter, () -> service.searchFlights(request));Supplier> retryingFlightsSupplier = Retry.decorateSupplier(retry, rateLimitedFlightsSupplier);
示例輸出顯示為 RequestNotPermitted
異常重試請求:
Searching for flights; current time = 15:29:39 847Flight search successful[Flight{flightNumber=XY 765, ... }, ... ]Searching for flights; current time = 17:10:09 218...[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX}, ...]2020-07-27T17:10:09.484: Retry rateLimitedFlightSearch, waiting PT1S until attempt 1. Last attempt failed with exception io.github.resilience4j.ratelimiter.RequestNotPermitted: RateLimiter flightSearchService does not permit further calls.Searching for flights; current time = 17:10:10 492...2020-07-27T17:10:10.494: Retry rateLimitedFlightSearch recorded a successful retry attempt...[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX}, ...]
我們創建裝飾器的順序很重要。如果我們將 Retry
與 RateLimiter
包裝在一起,它將不起作用。
RateLimiter 事件
RateLimiter
有一個 EventPublisher
,它在調用遠程操作時生成 RateLimiterOnSuccessEvent
和 RateLimiterOnFailureEvent
類型的事件,以指示獲取權限是否成功。我們可以監聽這些事件并記錄它們,例如:
RateLimiter limiter = registry.rateLimiter("flightSearchService");limiter.getEventPublisher().onSuccess(e -> System.out.println(e.toString()));limiter.getEventPublisher().onFailure(e -> System.out.println(e.toString()));
日志輸出示例如下:
RateLimiterEvent{type=SUCCESSFUL_ACQUIRE, rateLimiterName=flightSearchService, creationTime=2020-07-21T19:14:33.127+05:30}... other lines omitted ...RateLimiterEvent{type=FAILED_ACQUIRE, rateLimiterName=flightSearchService, creationTime=2020-07-21T19:14:33.186+05:30}
RateLimiter 指標
假設在實施客戶端節流后,我們發現 API 的響應時間增加了。這是可能的 - 正如我們所見,如果在線程調用遠程操作時權限不可用,RateLimiter
會將線程置于等待狀態。
如果我們的請求處理線程經常等待獲得許可,則可能意味著我們的 limitForPeriod
太低。也許我們需要與我們的服務提供商合作并首先獲得額外的配額。
監控 RateLimiter
指標可幫助我們識別此類容量問題,并確保我們在 RateLimiterConfig
上設置的值運行良好。
RateLimiter
跟蹤兩個指標:可用權限的數量(resilience4j.ratelimiter.available.permissions
)和等待權限的線程數量(resilience4j.ratelimiter.waiting.threads
)。
首先,我們像往常一樣創建 RateLimiterConfig
、RateLimiterRegistry
和 RateLimiter
。然后,我們創建一個 MeterRegistry
并將 RateLimiterRegistry
綁定到它:
MeterRegistry meterRegistry = new SimpleMeterRegistry();TaggedRateLimiterMetrics.ofRateLimiterRegistry(registry) .bindTo(meterRegistry);
運行幾次限速操作后,我們顯示捕獲的指標:
Consumer meterConsumer = meter -> { String desc = meter.getId().getDescription(); String metricName = meter.getId().getName(); Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false) .filter(m -> m.getStatistic().name().equals("VALUE")) .findFirst() .map(m -> m.getValue()) .orElse(0.0); System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);
這是一些示例輸出:
The number of available permissions - resilience4j.ratelimiter.available.permissions: -6.0The number of waiting threads - resilience4j.ratelimiter.waiting_threads: 7.0
resilience4j.ratelimiter.available.permissions
的負值顯示為請求線程保留的權限數。在實際應用中,我們會定期將數據導出到監控系統,并在儀表板上進行分析。
實施客戶端速率限制時的陷阱和良好實踐
使速率限制器成為單例
對給定遠程服務的所有調用都應通過相同的 RateLimiter
實例。對于給定的遠程服務,RateLimiter
必須是單例。
如果我們不強制執行此操作,我們代碼庫的某些區域可能會繞過 RateLimiter
直接調用遠程服務。為了防止這種情況,對遠程服務的實際調用應該在核心、內部層和其他區域應該使用內部層暴露的限速裝飾器。
我們如何確保未來的新開發人員理解這一意圖?查看 Tom 的文章,其中揭示了一種解決此類問題的方法,即通過組織包結構來明確此類意圖。此外,它還展示了如何通過在 ArchUnit 測試中編碼意圖來強制執行此操作。
為多個服務器實例配置速率限制器
為配置找出正確的值可能很棘手。如果我們在集群中運行多個服務實例,limitForPeriod
的值必須考慮到這一點。
例如,如果上游服務的速率限制為 100 rps,而我們的服務有 4 個實例,那么我們將配置 25 rps 作為每個實例的限制。
然而,這假設我們每個實例上的負載大致相同。 如果情況并非如此,或者如果我們的服務本身具有彈性并且實例數量可能會有所不同,那么 Resilience4j 的 RateLimiter
可能不適合。
在這種情況下,我們需要一個速率限制器,將其數據保存在分布式緩存中,而不是像 Resilience4j RateLimiter
那樣保存在內存中。但這會影響我們服務的響應時間。另一種選擇是實現某種自適應速率限制。盡管 Resilience4j 可能會支持它,但尚不清楚何時可用。
選擇正確的超時時間
對于 timeoutDuration
配置值,我們應該牢記 API 的預期響應時間。
如果我們將 timeoutDuration 設置得太高,響應時間和吞吐量就會受到影響。如果它太低,我們的錯誤率可能會增加。
由于此處可能涉及一些反復試驗,因此一個好的做法是將我們在 RateLimiterConfig
中使用的值(如 timeoutDuration
、limitForPeriod
和 limitRefreshPeriod
)作為我們服務之外的配置進行維護。然后我們可以在不更改代碼的情況下更改它們。
調優客戶端和服務器端速率限制器
實現客戶端速率限制并不能保證我們永遠不會受到上游服務的速率限制。
假設我們有來自上游服務的 2 rps 的限制,并且我們將 limitForPeriod
配置為 2,將 limitRefreshPeriod
配置為 1s。如果我們在第二秒的最后幾毫秒發出兩個請求,在此之前沒有其他調用,RateLimiter
將允許它們。如果我們在下一秒的前幾毫秒內再進行兩次調用,RateLimiter
也會允許它們,因為有兩個新權限可用。但是上游服務可能會拒絕這兩個請求,因為服務器通常會實現基于滑動窗口的速率限制。
為了保證我們永遠不會從上游服務中獲得超過速率,我們需要將客戶端中的固定窗口配置為短于服務中的滑動窗口。因此,如果我們在前面的示例中將 limitForPeriod
配置為 1 并將 limitRefreshPeriod
配置為 500ms,我們就不會出現超出速率限制的錯誤。但是,第一個請求之后的所有三個請求都會等待,從而增加響應時間并降低吞吐量。
結論
在本文中,我們學習了如何使用 Resilience4j 的 RateLimiter 模塊來實現客戶端速率限制。 我們通過實際示例研究了配置它的不同方法。我們學習了一些在實施速率限制時要記住的良好做法和注意事項。
您可以使用 GitHub 上的代碼演示一個完整的應用程序來說明這些想法。
本文譯自: Implementing Rate Limiting with Resilience4j - Reflectoring