在本文中,我們將從快速介紹 Resilience4j 開始,然后深入探討其 Retry 模塊。我們將了解何時、如何使用它,以及它提供的功能。在此過程中,我們還將學習實現重試時的一些良好實踐。

代碼示例

本文在 GitHu 上附有工作代碼示例。

什么是 Resilience4j?

當應用程序通過網絡進行通信時,會有很多出錯的情況。由于連接斷開、網絡故障、上游服務不可用等,操作可能會超時或失敗。應用程序可能會相互過載、無響應甚至崩潰。

Resilience4j 是一個 Java 庫,可以幫助我們構建彈性和容錯的應用程序。它提供了一個框架,可編寫代碼以防止和處理此類問題

Resilience4j 為 Java 8 及更高版本編寫,適用于函數接口、lambda 表達式和方法引用等結構。

Resilience4j 模塊

讓我們快速瀏覽一下這些模塊及其用途:

模塊目的
Retry自動重試失敗的遠程操作
RateLimiter限制我們在一定時間內調用遠程操作的次數
TimeLimiter調用遠程操作時設置時間限制
Circuit Breaker當遠程操作持續失敗時,快速失敗或執行默認操作
Bulkhead限制并發遠程操作的數量
Cache存儲昂貴的遠程操作的結果

使用范式

雖然每個模塊都有其抽象,但通常的使用范式如下:

  1. 創建一個 Resilience4j 配置對象
  2. 為此類配置創建一個 Registry 對象
  3. 從注冊表創建或獲取 Resilience4j 對象
  4. 將遠程操作編碼為 lambda 表達式或函數式接口或通常的 Java 方法
  5. 使用提供的輔助方法之一圍繞第 4 步中的代碼創建裝飾器或包裝器
  6. 調用裝飾器方法來調用遠程操作
    步驟 1-5 通常在應用程序啟動時完成一次。讓我們看看重試模塊的這些步驟:
RetryConfig config = RetryConfig.ofDefaults(); // ----> 1RetryRegistry registry = RetryRegistry.of(config); // ----> 2Retry retry = registry.retry("flightSearchService", config); // ----> 3FlightSearchService searchService = new FlightSearchService();SearchRequest request = new SearchRequest("NYC", "LAX", "07/21/2020");Supplier> flightSearchSupplier =  () -> searchService.searchFlights(request); // ----> 4Supplier> retryingFlightSearch =  Retry.decorateSupplier(retry, flightSearchSupplier); // ----> 5System.out.println(retryingFlightSearch.get()); // ----> 6

什么時候使用重試?

遠程操作可以是通過網絡發出的任何請求。通常,它是以下之一:

  1. 向 REST 端點發送 HTTP 請求
  2. 調用遠程過程 (RPC) 或 Web 服務
  3. 從數據存儲(SQL/NoSQL 數據庫、對象存儲等)讀取和寫入數據
  4. 向消息代理(RabbitMQ/ActiveMQ/Kafka 等)發送和接收消息

當遠程操作失敗時,我們有兩種選擇——立即向我們的客戶端返回錯誤,或者重試操作。如果重試成功,這對客戶來說是件好事——他們甚至不必知道這是一個臨時問題。

選擇哪個選項取決于錯誤類型(瞬時或永久)、操作(冪等或非冪等)、客戶端(人或應用程序)和用例。

暫時性錯誤是暫時的,通常,如果重試,操作很可能會成功。請求被上游服務限制、連接斷開或由于某些服務暫時不可用而超時就是例子。

來自 REST API 的硬件故障或 404(未找到)響應是永久性錯誤的示例,重試無濟于事

如果我們想應用重試,操作必須是冪等的。假設遠程服務接收并處理了我們的請求,但在發送響應時出現問題。在這種情況下,當我們重試時,我們不希望服務將請求視為新請求或返回意外錯誤(想想銀行轉賬)。

重試會增加 API 的響應時間。如果客戶端是另一個應用程序,如 cron 作業或守護進程,這可能不是問題。但是,如果是一個人,有時最好做出響應,快速失敗并提供反饋,而不是在我們不斷重試時讓這個人等待。

對于某些關鍵用例,可靠性可能比響應時間更重要,即使客戶是個人,我們也可能需要實現重試。銀行轉賬或旅行社預訂航班和旅行酒店的轉賬就是很好的例子 - 用戶期望可靠性,而不是對此類用例的即時響應。我們可以通過立即通知用戶我們已接受他們的請求并在完成后通知他們來做出響應。

使用 Resilience4j 重試模塊

RetryRegistryRetryConfigRetryresilience4j-retry 中的主要抽象。RetryRegistry 是用于創建和管理 Retry 對象的工廠。RetryConfig 封裝了諸如應該嘗試重試多少次、嘗試之間等待多長時間等配置。每個 Retry 對象都與一個 RetryConfig 相關聯。 Retry 提供了輔助方法來為包含遠程調用的函數式接口或 lambda 表達式創建裝飾器。

讓我們看看如何使用 retry 模塊中可用的各種功能。假設我們正在為一家航空公司建立一個網站,以允許其客戶搜索和預訂航班。我們的服務與 FlightSearchService 類封裝的遠程服務通信。

簡單重試

在簡單重試中,如果在遠程調用期間拋出 RuntimeException,則重試該操作。 我們可以配置嘗試次數、嘗試之間等待多長時間等:

RetryConfig config = RetryConfig.custom()  .maxAttempts(3)  .waitDuration(Duration.of(2, SECONDS))  .build();// Registry, Retry creation omittedFlightSearchService service = new FlightSearchService();SearchRequest request = new SearchRequest("NYC", "LAX", "07/31/2020");Supplier> flightSearchSupplier =  () -> service.searchFlights(request);Supplier> retryingFlightSearch =  Retry.decorateSupplier(retry, flightSearchSupplier);System.out.println(retryingFlightSearch.get());

我們創建了一個 RetryConfig,指定我們最多要重試 3 次,并在兩次嘗試之間等待 2 秒。如果我們改用 RetryConfig.ofDefaults() 方法,則將使用 3 次嘗試和 500 毫秒等待持續時間的默認值。

我們將航班搜索調用表示為 lambda 表達式 - List<Flight>SupplierRetry.decorateSupplier() 方法使用重試功能裝飾此 Supplier。最后,我們在裝飾過的 Supplier 上調用 get() 方法來進行遠程調用。

如果我們想創建一個裝飾器并在代碼庫的不同位置重用它,我們將使用 decorateSupplier()。如果我們想創建它并立即執行它,我們可以使用 executeSupplier() 實例方法代替:

List flights = retry.executeSupplier(  () -> service.searchFlights(request));這是顯示第一個請求失敗然后第二次嘗試成功的示例輸出:Searching for flights; current time = 20:51:34 975Operation failedSearching for flights; current time = 20:51:36 985Flight search successful[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX}, ...]

在已檢異常上重試

現在,假設我們要重試已檢查和未檢查的異常。假設我們正在調用
FlightSearchService.searchFlightsThrowingException(),它可以拋出一個已檢查的 Exception。由于 Supplier 不能拋出已檢查的異常,我們會在這一行得到編譯器錯誤:

Supplier> flightSearchSupplier =  () -> service.searchFlightsThrowingException(request);

我們可能會嘗試在 lambda 表達式中處理 Exception 并返回 Collections.emptyList(),但這看起來不太好。更重要的是,由于我們自己捕獲 Exception,重試不再起作用:

ExceptionSupplier> flightSearchSupplier = () -> {    try {            return service.searchFlightsThrowingException(request);    } catch (Exception e) {      // dont do this, this breaks the retry!    }    return Collections.emptyList();  };

那么當我們想要重試遠程調用可能拋出的所有異常時,我們應該怎么做呢?我們可以使用
Retry.decorateCheckedSupplier()(或 executeCheckedSupplier() 實例方法)代替 Retry.decorateSupplier()

CheckedFunction0> retryingFlightSearch =  Retry.decorateCheckedSupplier(retry,    () -> service.searchFlightsThrowingException(request));try {  System.out.println(retryingFlightSearch.apply());} catch (...) {  // handle exception that can occur after retries are exhausted}

Retry.decorateCheckedSupplier() 返回一個 CheckedFunction0,它表示一個沒有參數的函數。請注意對 CheckedFunction0 對象的 apply() 調用以調用遠程操作。

如果我們不想使用 SuppliersRetry 提供了更多的輔助裝飾器方法,如 decorateFunction()decorateCheckedFunction()decorateRunnable()decorateCallable() 等,以與其他語言結構一起使用。decorate*decorateChecked* 版本之間的區別在于,decorate* 版本在 RuntimeExceptions 上重試,而 decorateChecked* 版本在 Exception 上重試。

有條件重試

上面的簡單重試示例展示了如何在調用遠程服務時遇到 RuntimeException 或已檢查 Exception 時重試。在實際應用中,我們可能不想對所有異常都重試。 例如,如果我們得到一個
AuthenticationFailedException 重試相同的請求將無濟于事。當我們進行 HTTP 調用時,我們可能想要檢查 HTTP 響應狀態代碼或在響應中查找特定的應用程序錯誤代碼來決定是否應該重試。讓我們看看如何實現這種有條件的重試。

Predicate-based條件重試

假設航空公司的航班服務定期初始化其數據庫中的航班數據。對于給定日期的飛行數據,此內部操作需要幾秒鐘時間。 如果我們在初始化過程中調用當天的航班搜索,該服務將返回一個特定的錯誤代碼 FS-167。航班搜索文檔說這是一個臨時錯誤,可以在幾秒鐘后重試該操作。

讓我們看看如何創建 RetryConfig

RetryConfig config = RetryConfig.custom()  .maxAttempts(3)  .waitDuration(Duration.of(3, SECONDS))  .retryOnResult(searchResponse -> searchResponse    .getErrorCode()    .equals("FS-167"))  .build();

我們使用 retryOnResult() 方法并傳遞執行此檢查的 Predicate。這個 Predicate 中的邏輯可以像我們想要的那樣復雜——它可以是對一組錯誤代碼的檢查,也可以是一些自定義邏輯來決定是否應該重試搜索。

Exception-based條件重試

假設我們有一個通用異常
FlightServiceBaseException,當在與航空公司的航班服務交互期間發生任何意外時會拋出該異常。作為一般策略,我們希望在拋出此異常時重試。但是我們不想重試 SeatsUnavailableException 的一個子類 - 如果航班上沒有可用座位,重試將無濟于事。我們可以通過像這樣創建 RetryConfig 來做到這一點:

RetryConfig config = RetryConfig.custom()  .maxAttempts(3)  .waitDuration(Duration.of(3, SECONDS))  .retryExceptions(FlightServiceBaseException.class)  .ignoreExceptions(SeatsUnavailableException.class)  .build();

retryExceptions() 中,我們指定了一個異常列表。ignoreExceptions() 將重試與此列表中的異常匹配或繼承的任何異常。我們把我們想忽略而不是重試的那些放入ignoreExceptions()。如果代碼在運行時拋出一些其他異常,比如 IOException,它也不會被重試。

假設即使對于給定的異常,我們也不希望在所有情況下都重試。也許我們只想在異常具有特定錯誤代碼或異常消息中的特定文本時重試。在這種情況下,我們可以使用 retryOnException 方法:

Predicate rateLimitPredicate = rle ->  (rle instanceof  RateLimitExceededException) &&  "RL-101".equals(((RateLimitExceededException) rle).getErrorCode());RetryConfig config = RetryConfig.custom()  .maxAttempts(3)  .waitDuration(Duration.of(1, SECONDS))  .retryOnException(rateLimitPredicate)  build();

與 predicate-based (基于謂詞)的條件重試一樣,謂詞內的檢查可以根據需要復雜化。

退避策略

到目前為止,我們的示例有固定的重試等待時間。通常我們希望在每次嘗試后增加等待時間——這是為了讓遠程服務有足夠的時間在當前過載的情況下進行恢復。我們可以使用 IntervalFunction 來做到這一點。

IntervalFunction 是一個函數式接口——它是一個以嘗試次數為參數并以毫秒為單位返回等待時間的 Function

隨機間隔

這里我們指定嘗試之間的隨機等待時間:

RetryConfig config = RetryConfig.custom().maxAttempts(4).intervalFunction(IntervalFunction.ofRandomized(2000)).build();

IntervalFunction.ofRandomized() 有一個關聯的 randomizationFactor。我們可以將其設置為 ofRandomized() 的第二個參數。如果未設置,則采用默認值 0.5。這個 randomizationFactor 決定了隨機值的分布范圍。因此,對于上面的默認值 0.5,生成的等待時間將介于 1000 毫秒(2000 - 2000 0.5)和 3000 毫秒(2000 + 2000 0.5)之間。

這種行為的示例輸出如下:

Searching for flights; current time = 20:27:08 729Operation failedSearching for flights; current time = 20:27:10 643Operation failedSearching for flights; current time = 20:27:13 204Operation failedSearching for flights; current time = 20:27:15 236Flight search successful[Flight{flightNumber=XY 765, flightDate=07/31/2020, from=NYC, to=LAX},...]

指數間隔

對于指數退避,我們指定兩個值 - 初始等待時間和乘數。在這種方法中,由于乘數,等待時間在嘗試之間呈指數增長。例如,如果我們指定初始等待時間為 1 秒,乘數為 2,則重試將在 1 秒、2 秒、4 秒、8 秒、16 秒等之后進行。當客戶端是后臺作業或守護進程時,此方法是推薦的方法。

以下是我們如何為指數退避創建 RetryConfig

RetryConfig config = RetryConfig.custom().maxAttempts(6).intervalFunction(IntervalFunction.ofExponentialBackoff(1000, 2)).build();

這種行為的示例輸出如下:

Searching for flights; current time = 20:37:02 684Operation failedSearching for flights; current time = 20:37:03 727Operation failedSearching for flights; current time = 20:37:05 731Operation failedSearching for flights; current time = 20:37:09 731Operation failedSearching for flights; current time = 20:37:17 731

IntervalFunction 還提供了一個 exponentialRandomBackoff() 方法,它結合了上述兩種方法。我們還可以提供 IntervalFunction 的自定義實現。

重試異步操作

直到現在我們看到的例子都是同步調用。讓我們看看如何重試異步操作。假設我們像這樣異步搜索航班:

CompletableFuture.supplyAsync(() -> service.searchFlights(request))  .thenAccept(System.out::println);

searchFlight() 調用發生在不同的線程上,當它返回時,返回的 List<Flight> 被傳遞給 thenAccept(),它只是打印它。

我們可以使用 Retry 對象上的 executeCompletionStage() 方法對上述異步操作進行重試。 此方法采用兩個參數 - 一個 ScheduledExecutorService 將在其上安排重試,以及一個 Supplier<CompletionStage> 將被裝飾。它裝飾并執行 CompletionStage,然后返回一個 CompletionStage,我們可以像以前一樣調用 thenAccept

ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();Supplier>> completionStageSupplier =  () -> CompletableFuture.supplyAsync(() -> service.searchFlights(request));retry.executeCompletionStage(scheduler, completionStageSupplier).thenAccept(System.out::println);

在實際應用程序中,我們將使用共享線程池 (
Executors.newScheduledThreadPool()) 來調度重試,而不是此處顯示的單線程調度執行器。

重試事件

在所有這些例子中,裝飾器都是一個黑盒子——我們不知道什么時候嘗試失敗了,框架代碼正在嘗試重試。假設對于給定的請求,我們想要記錄一些詳細信息,例如嘗試計數或下一次嘗試之前的等待時間。 我們可以使用在不同執行點發布的重試事件來做到這一點。Retry 有一個 EventPublisher,它具有 onRetry()onSuccess() 等方法。

我們可以通過實現這些監聽器方法來收集和記錄詳細信息:

Retry.EventPublisher publisher = retry.getEventPublisher();publisher.onRetry(event -> System.out.println(event.toString()));publisher.onSuccess(event -> System.out.println(event.toString()));

類似地,RetryRegistry 也有一個 EventPublisher,它在 Retry 對象被添加或從注冊表中刪除時發布事件。

重試指標

Retry 維護計數器以跟蹤操作的次數

  1. 第一次嘗試成功
  2. 重試后成功
  3. 沒有重試就失敗了
  4. 重試后仍失敗

每次執行裝飾器時,它都會更新這些計數器。

為什么要捕獲指標?

捕獲并定期分析指標可以讓我們深入了解上游服務的行為。它還可以幫助識別瓶頸和其他潛在問題

例如,如果我們發現某個操作通常在第一次嘗試時失敗,我們可以調查其原因。如果我們發現我們的請求在建立連接時受到限制或超時,則可能表明遠程服務需要額外的資源或容量。

如何捕獲指標?

Resilience4j 使用 Micrometer 發布指標。Micrometer 為監控系統(如 Prometheus、Azure Monitor、New Relic 等)提供了儀表客戶端的外觀。因此我們可以將指標發布到這些系統中的任何一個或在它們之間切換,而無需更改我們的代碼。

首先,我們像往常一樣創建 RetryConfigRetryRegistryRetry。然后,我們創建一個 MeterRegistry 并將 etryRegistry 綁定到它:

MeterRegistry meterRegistry = new SimpleMeterRegistry();TaggedRetryMetrics.ofRetryRegistry(retryRegistry).bindTo(meterRegistry);

運行幾次可重試操作后,我們顯示捕獲的指標:

Consumer meterConsumer = meter -> {  String desc = meter.getId().getDescription();  String metricName = meter.getId().getTag("kind");  Double metricValue = StreamSupport.stream(meter.measure().spliterator(), false)    .filter(m -> m.getStatistic().name().equals("COUNT"))    .findFirst()    .map(m -> m.getValue())    .orElse(0.0);  System.out.println(desc + " - " + metricName + ": " + metricValue);};meterRegistry.forEachMeter(meterConsumer);

一些示例輸出如下:

The number of successful calls without a retry attempt - successful_without_retry: 4.0The number of failed calls without a retry attempt - failed_without_retry: 0.0The number of failed calls after a retry attempt - failed_with_retry: 0.0The number of successful calls after a retry attempt - successful_with_retry: 6.0

當然,在實際應用中,我們會將數據導出到監控系統并在儀表板上查看。

重試時的注意事項和良好實踐

服務通常提供具有內置重試機制的客戶端庫或 SDK。對于云服務尤其如此。 例如,Azure CosmosDB 和 Azure 服務總線為客戶端庫提供內置重試工具。 它們允許應用程序設置重試策略來控制重試行為。

在這種情況下,最好使用內置的重試而不是我們自己的編碼。如果我們確實需要自己編寫,我們應該禁用內置的默認重試策略 - 否則,它可能導致嵌套重試,其中應用程序的每次嘗試都會導致客戶端庫的多次嘗試

一些云服務記錄瞬時錯誤代碼。例如,Azure SQL 提供了它期望數據庫客戶端重試的錯誤代碼列表。在決定為特定操作添加重試之前,最好檢查一下服務提供商是否有這樣的列表。

另一個好的做法是將我們在 RetryConfig 中使用的值(例如最大嘗試次數、等待時間和可重試錯誤代碼和異常)作為我們服務之外的配置進行維護。如果我們發現新的暫時性錯誤或者我們需要調整嘗試之間的間隔,我們可以在不構建和重新部署服務的情況下進行更改。

通常在重試時,框架代碼中的某處可能會發生 Thread.sleep()。對于在重試之間有等待時間的同步重試就是這種情況。如果我們的代碼在 Web 應用程序的上下文中運行,則 Thread 很可能是 Web 服務器的請求處理線程。因此,如果我們進行過多的重試,則會降低應用程序的吞吐量

結論

在本文中,我們了解了 Resilience4j 是什么,以及如何使用它的重試模塊使我們的應用程序可以在應對臨時錯誤具備彈性。我們研究了配置重試的不同方法,以及在不同方法之間做出決定的一些示例。我們學習了一些在實施重試時要遵循的良好實踐,以及收集和分析重試指標的重要性。

您可以使用 GitHub 上的代碼嘗試一個完整的應用程序來演示這些想法。


本文譯自: Implementing Retry with Resilience4j - Reflectoring