摘要:異步執行還可以以異步方式執行,以便客戶端可以直接返回,用戶需要指定如何通過將請求和偵聽器傳遞給異步塊方法來處理響應或潛在故障要執行的和在執行完成時使用的。在每次執行之前和之后,或者當一個失敗時,都會調用這個偵聽器。
Bulk API
Java High Level REST Client提供了Bulk處理器來幫助處理批量請求。Bulk請求
BulkRequest可以使用一個請求執行多個索引、更新和/或刪除操作。
它需要在批量請求中添加至少一個操作:
BulkRequest request = new BulkRequest(); request.add(new IndexRequest("posts").id("1") .source(XContentType.JSON,"field", "foo")); request.add(new IndexRequest("posts").id("2") .source(XContentType.JSON,"field", "bar")); request.add(new IndexRequest("posts").id("3") .source(XContentType.JSON,"field", "baz"));
創建BulkRequest。
將IndexRequest添加到Bulk請求。
Bulk API只支持JSON或SMILE編碼的文檔,提供任何其他格式的文檔都會導致錯誤。
不同的操作類型可以添加到同一個BulkRequest:
BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("posts", "3")); request.add(new UpdateRequest("posts", "2") .doc(XContentType.JSON,"other", "test")); request.add(new IndexRequest("posts").id("4") .source(XContentType.JSON,"field", "baz"));
向BulkRequest添加DeleteRequest。
向BulkRequest添加UpdateRequest。
使用JSON格式添加IndexRequest。
可選參數可以選擇提供以下參數:
request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m");
作為TimeValue等待bulk請求執行的超時。
作為String等待bulk請求執行的超時。
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
作為WriteRequest.RefreshPolicy實例的刷新策略。
作為String的刷新策略。
request.waitForActiveShards(2); request.waitForActiveShards(ActiveShardCount.ALL);
設置在繼續執行索引/更新/刪除操作之前必須活動的碎片副本的數量。
作為ActiveShardCount提供的碎片副本的數量:可以是ActiveShardCount.ALL、ActiveShardCount.ONE、ActiveShardCount.DEFAULT(默認)。
request.pipeline("pipelineId");
全局pipelineId用于所有子請求,除非在子請求上重寫。
request.routing("routingId");
全局routingId用于所有子請求,除非在子請求上重寫。
BulkRequest defaulted = new BulkRequest("posts");
在所有子請求上使用全局索引的bulk請求,除非在子請求上重寫,這個參數是@Nullable,并只能在創建BulkRequest時設置。
同步執行當以以下方式執行BulkRequest時,客戶端等待BulkResponse返回,然后繼續執行代碼:
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
在高級別REST客戶端中解析REST響應失敗、請求超時或類似的情況,其中沒有來自服務器的響應的情況下,同步調用可能引發IOException。
在服務器返回4xx或5xx錯誤代碼的情況下,高級別客戶端嘗試解析響應體錯誤細節,然后拋出一個通用的ElasticsearchException并將原始的ResponseException作為一個被抑制的異常添加到它。
異步執行還可以以異步方式執行BulkRequest,以便客戶端可以直接返回,用戶需要指定如何通過將請求和偵聽器傳遞給異步塊方法來處理響應或潛在故障:
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
要執行的BulkRequest和在執行完成時使用的ActionListener。
異步方法不會阻塞并立即返回,一旦執行完成,ActionListener將使用onResponse方法(如果執行成功)被調用,或者使用onFailure方法(如果執行失敗)被調用,失敗情況和預期的異常與同步執行情況相同。
一個典型的bulk監聽器是這樣的:
ActionListenerlistener = new ActionListener () { @Override public void onResponse(BulkResponse bulkResponse) { } @Override public void onFailure(Exception e) { } };
onResponse當執行成功完成時調用。
onFailure當整個BulkRequest失敗時調用。
Bulk響應返回的BulkResponse包含執行操作的信息,允許對每個結果進行如下迭代:
for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); switch (bulkItemResponse.getOpType()) { case INDEX: case CREATE: IndexResponse indexResponse = (IndexResponse) itemResponse; break; case UPDATE: UpdateResponse updateResponse = (UpdateResponse) itemResponse; break; case DELETE: DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } }
遍歷所有操作的結果。
檢索操作的響應(成功與否),可以是IndexResponse、UpdateResponse或DeleteResponse,它們都可以看作DocWriteResponse實例。
處理索引操作的響應。
處理更新操作的響應。
處理刪除操作的響應。
Bulk響應提供了一種方法來快速檢查一個或多個操作是否失敗:
if (bulkResponse.hasFailures()) { }
如果至少有一個操作失敗,此方法將返回true。
在這種情況下,需要對所有的操作結果進行迭代,以檢查操作是否失敗,如果失敗,則檢索相應的失敗:
for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); } }
指示給定操作是否失敗。
檢索失敗操作的失敗。
Bulk處理器BulkProcessor提供了一個實用程序類,允許索引/更新/刪除操作在添加到處理器時透明地執行,從而簡化了Bulk API的使用。
為了執行請求,BulkProcessor需要以下組件:
RestHighLevelClient
此客戶端用于執行BulkRequest并檢索BulkResponse。
BulkProcessor.Listener
在每次執行BulkRequest之前和之后,或者當一個BulkRequest失敗時,都會調用這個偵聽器。
然后BulkProcessor.builder方法可以用來構建一個新的BulkProcessor:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { } }; BulkProcessor bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener).build();
創建BulkProcessor.Listener。
beforeBulk方法在每次執行BulkRequest之前調用。
afterBulk方法在每次執行BulkRequest之后調用。
帶failure參數的afterBulk方法在BulkRequest失敗時調用。
通過從BulkProcessor.builder調用build()方法創建BulkProcessor,RestHighLevelClient.bulkAsync()方法將用于在后臺執行BulkRequest。
BulkProcessor.Builder提供了一些方法來配置BulkProcessor應該如何處理請求執行:
BulkProcessor.Builder builder = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); builder.setBulkActions(500); builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); builder.setConcurrentRequests(0); builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); builder.setBackoffPolicy(BackoffPolicy .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
根據當前添加的操作數量設置刷新新bulk請求的時間(默認為1000,使用-1禁用它)。
根據當前添加的操作大小設置刷新新bulk請求的時間(默認為5Mb,使用-1禁用)。
設置允許執行的并發請求數量(默認為1,使用0只允許執行單個請求)。
設置刷新間隔,如果間隔通過,則刷新任何掛起的BulkRequest(默認為未設置)。
設置一個常量后退策略,該策略最初等待1秒并重試最多3次,有關更多選項,請參見BackoffPolicy.noBackoff()、BackoffPolicy.constantBackoff()和BackoffPolicy.exponentialBackoff()。
一旦創建了BulkProcessor,就可以向它添加請求:
IndexRequest one = new IndexRequest("posts").id("1") .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts").id("2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts").id("3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);
請求將由BulkProcessor執行,它負責為每個bulk請求調用BulkProcessor.Listener。
監聽器提供訪問BulkRequest和BulkResponse的方法:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { int numberOfActions = request.numberOfActions(); logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Failed to execute bulk", failure); } };
beforeBulk在執行BulkRequest的每次執行之前調用,這個方法允許知道將要在BulkRequest中執行的操作的數量。
afterBulk在每次執行BulkRequest之后調用,這個方法允許知道BulkResponse是否包含錯誤。
如果BulkRequest失敗,則調用帶failure參數的afterBulk方法,該方法允許知道失敗。
將所有請求添加到BulkProcessor之后,需要使用兩種可用的關閉方法之一關閉它的實例。
awaitClose()方法可以用來等待,直到所有的請求都被處理完畢或者指定的等待時間過去:
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
如果所有bulk請求都已完成,則該方法返回true,如果在所有bulk請求完成之前的等待時間已經過去,則返回false。
close()方法可用于立即關閉BulkProcessor:
這兩種方法都在關閉處理器之前刷新添加到處理器的請求,并且禁止向處理器添加任何新請求。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/75436.html
摘要:用于的官方高級別客戶端,基于低級別客戶端,它公開特定的方法,并負責請求編組和響應反編組。入門初始化執行請求讀取響應日志記錄通用配置嗅探器在中被添加。依賴于核心項目,它接受與相同的請求參數,并返回相同的響應對象。 Elasticsearch Java REST Client Java REST Client有兩種類型: Java Low Level REST Client:用于Elast...
摘要:入門本節描述從獲取工件到在應用程序中使用它如何開始使用高級別客戶端。保證能夠與運行在相同主版本和大于或等于的次要版本上的任何節點通信。與具有相同的發布周期,將版本替換為想要的客戶端版本。 Java High Level REST Client 入門 本節描述從獲取工件到在應用程序中使用它如何開始使用高級別REST客戶端。 兼容性 Java High Level REST Client需...
摘要:如果文檔存在,則返回,否則返回。禁用提取存儲的字段。異步方法不會阻塞并立即返回,完成后,如果執行成功完成,則使用方法回調,如果失敗則使用方法。的典型偵聽器如下所示執行成功完成時調用。 Exists API 如果文檔存在,則existsAPI返回true,否則返回false。 Exists請求 它就像Get API一樣使用GetRequest,支持所有可選參數,由于exists()只返回...
閱讀 3110·2021-11-24 09:39
閱讀 968·2021-09-07 10:20
閱讀 2389·2021-08-23 09:45
閱讀 2254·2021-08-05 10:00
閱讀 566·2019-08-29 16:36
閱讀 833·2019-08-29 11:12
閱讀 2813·2019-08-26 11:34
閱讀 1839·2019-08-26 10:56