摘要:注意當一個文檔在快照的時間和索引請求過程之間發生變化時,會發生版本沖突。當版本匹配時,更新文檔并增加版本號。在正在運行的更新中,使用更改的值使用查找的值。值加快進程立即生效,減慢查詢的值在完成當前批處理后生效,以防止滾動超時。
文檔API
本節描述以下CRUD API:
單文檔的APIIndex API
Get API
Delete API
Update API
多文檔APIMulti Get API
Bulk API
Reindex API
Update By Query API
Delete By Query API
注意Index API
所有CRUD API都是單索引API,索引參數接受單個索引名,或指向單個索引的別名
index API允許將類型化的JSON文檔索引到特定的索引中,并使其可搜索。
生成JSON文檔生成JSON文檔有幾種不同的方法:
手動的(也就是你自己)使用原生byte[]或作為String
使用一個Map,該Map將自動轉換為它的JSON等效項
使用第三方庫對bean(如Jackson)進行序列化
使用內置的助手XContentFactory.jsonBuilder()
在內部,每個類型被轉換為byte[](像String被轉換為byte[]),因此,如果對象已經以這種形式存在,那么就使用它,jsonBuilder是高度優化的JSON生成器,它直接構造一個byte[]。
自己動手這里沒有什么困難,但是請注意,您必須根據日期格式對日期進行編碼。
String json = "{" + ""user":"kimchy"," + ""postDate":"2013-01-30"," + ""message":"trying out Elasticsearch"" + "}";使用Map
Map是一個鍵:值對集合,它表示一個JSON結構:
Map將bean序列化json = new HashMap (); json.put("user","kimchy"); json.put("postDate",new Date()); json.put("message","trying out Elasticsearch");
可以使用Jackson將bean序列化為JSON,請將Jackson Databind添加到您的項目中,然后,您可以使用ObjectMapper來序列化您的bean:
import com.fasterxml.jackson.databind.*; // instance a json mapper ObjectMapper mapper = new ObjectMapper(); // create once, reuse // generate json byte[] json = mapper.writeValueAsBytes(yourbeaninstance);使用Elasticsearch助手
Elasticsearch提供了內置的助手來生成JSON內容。
import static org.elasticsearch.common.xcontent.XContentFactory.*; XContentBuilder builder = jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject()
注意,您還可以使用startArray(String)和endArray()方法添加數組,順便說一下,field方法接受許多對象類型,您可以直接傳遞數字、日期甚至其他XContentBuilder對象。
如果需要查看生成的JSON內容,可以使用string()方法。
String json = builder.string();索引文檔
下面的示例將JSON文檔索引為一個名為twitter的索引,其類型為tweet, id值為1:
import static org.elasticsearch.common.xcontent.XContentFactory.*; IndexResponse response = client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) .get();
注意,您還可以將文檔索引為JSON字符串,并且不需要提供ID:
String json = "{" + ""user":"kimchy"," + ""postDate":"2013-01-30"," + ""message":"trying out Elasticsearch"" + "}"; IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json, XContentType.JSON) .get();
IndexResponse對象會給你一個響應:
// Index name String _index = response.getIndex(); // Type name String _type = response.getType(); // Document ID (generated or not) String _id = response.getId(); // Version (if it"s the first time you index this document, you will get: 1) long _version = response.getVersion(); // status has stored current instance statement. RestStatus status = response.status();
有關索引操作的更多信息,請查看REST索引文檔
Get APIget API允許根據索引的id從索引中獲取類型化的JSON文檔,下面的示例從一個名為twitter的索引中獲取JSON文檔,該索引的類型名為tweet, id值為1:
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
有關get操作的更多信息,請查看REST get文檔。
Delete APIdelete API允許基于id從特定索引中刪除類型化的JSON文檔,下面的示例從名為twitter的索引中刪除JSON文檔,該索引的類型名為tweet, id值為1:
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();Delete By Query API
通過查詢刪除的API可以根據查詢結果刪除給定的一組文檔:
BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("persons") .get(); long deleted = response.getDeleted();
QueryBuilders.matchQuery("gender", "male")(查詢)
source("persons") (索引)
get()(執行操作)
response.getDeleted()(被刪除的文檔數)
由于這是一個長時間運行的操作,如果您希望異步執行,可以調用execute而不是get,并提供如下監聽器:
DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("persons") .execute(new ActionListenerUpdate API() { @Override public void onResponse(BulkByScrollResponse response) { long deleted = response.getDeleted(); } @Override public void onFailure(Exception e) { // Handle the exception } });
您可以創建一個UpdateRequest并將其發送給客戶端:
UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("index"); updateRequest.type("type"); updateRequest.id("1"); updateRequest.doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();
也可以使用prepareUpdate()方法:
client.prepareUpdate("ttl", "doc", "1") .setScript(new Script("ctx._source.gender = "male"" , ScriptService.ScriptType.INLINE, null, null)) .get(); client.prepareUpdate("ttl", "doc", "1") .setDoc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .get();
Script()(你的腳本,它也可以是本地存儲的腳本名)
setDoc()(將合并到現有的文檔)
注意,您不能同時提供腳本和doc
使用腳本更新update API允許基于提供的腳本更新文檔:
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1") .script(new Script("ctx._source.gender = "male"")); client.update(updateRequest).get();通過合并文檔更新
update API還支持傳遞一個部分文檔合并到現有文檔中(簡單的遞歸合并,內部合并對象,取代核心的“鍵/值”和數組),例如:
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();Upsert
也有對Upsert的支持,如果文檔不存在,則使用upsert元素的內容索引新的doc:
IndexRequest indexRequest = new IndexRequest("index", "type", "1") .source(jsonBuilder() .startObject() .field("name", "Joe Smith") .field("gender", "male") .endObject()); UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .upsert(indexRequest); client.update(updateRequest).get();
如果文檔不存在,將添加indexRequest中的文檔。
如果文件index/type/1已經存在,我們將在此操作后獲得如下文件:
{ "name" : "Joe Dalton", "gender": "male" }
"gender": "male"(此字段由更新請求添加)
如果不存在,我們將有一份新文件:
{ "name" : "Joe Smith", "gender": "male" }Multi Get API
multi get API允許根據文檔的index、type和id獲取文檔列表:
MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("another", "type", "foo") .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String json = response.getSourceAsString(); } }
add("twitter", "tweet", "1")(通過單一id)
add("twitter", "tweet", "2", "3", "4")(或以相同index/type的id列表)
add("another", "type", "foo")(你也可以從另一個索引中得到)
MultiGetItemResponse itemResponse : multiGetItemResponses(迭代結果集)
response.isExists()(您可以檢查文檔是否存在)
response.getSourceAsString()(訪問_source字段)
有關multi get操作的更多信息,請查看剩余的multi get文檔
Bulk APIbulk API允許在一個請求中索引和刪除多個文檔,這里有一個示例用法:
import static org.elasticsearch.common.xcontent.XContentFactory.*; BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item }使用Bulk處理器
BulkProcessor類提供了一個簡單的接口,可以根據請求的數量或大小自動刷新bulk操作,或者在給定的時間之后。
要使用它,首先創建一個BulkProcessor實例:
import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build();
beforeBulk()
此方法在執行bulk之前被調用,例如,您可以通過request.numberOfActions()查看numberOfActions
afterBulk(...BulkResponse response)
此方法在執行bulk之后被調用,例如,您可以通過response.hasFailures()檢查是否存在失敗請求
afterBulk(...Throwable failure)
當bulk失敗并引發一個可拋出對象時,將調用此方法
setBulkActions(10000)
我們希望每10,000個請求就執行一次bulk
setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
我們希望每5MB就flush一次
setFlushInterval(TimeValue.timeValueSeconds(5))
無論請求的數量是多少,我們都希望每5秒flush一次
setConcurrentRequests(1)
設置并發請求的數量,值為0意味著只允許執行一個請求,在積累新的bulk請求時,允許執行一個值為1的并發請求
setBackoffPolicy()
設置一個自定義的備份策略,該策略最初將等待100ms,以指數形式增加并重試三次,當一個或多個bulk項目請求以EsRejectedExecutionException失敗時,將嘗試重試,該異常表明用于處理請求的計算資源太少,要禁用backoff,請傳遞BackoffPolicy.noBackoff()
默認情況下,BulkProcessor:
bulkActions設置為1000
bulkSize設置為5mb
不設置flushInterval
將concurrentrequest設置為1,這意味著flush操作的異步執行
將backoffPolicy設置為一個指數備份,8次重試,啟動延時為50ms,總等待時間約為5.1秒
添加請求然后您可以簡單地將您的請求添加到BulkProcessor:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */)); bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));關閉Bulk Processor
當所有的文檔都被加載到BulkProcessor,可以使用awaitClose或close方法進行關閉:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或
bulkProcessor.close();
如果通過設置flushInterval來調度其他計劃的flush,這兩種方法都將flush所有剩余的文檔,并禁用所有其他計劃flush。如果并發請求被啟用,那么awaitClose方法等待指定的超時以完成所有bulk請求,然后返回true,如果在所有bulk請求完成之前指定的等待時間已經過去,則返回false,close方法不等待任何剩余的批量請求完成并立即退出。
在測試中使用Bulk Processor如果您正在使用Elasticsearch運行測試,并且正在使用BulkProcessor來填充數據集,那么您最好將并發請求的數量設置為0,以便以同步方式執行批量的flush操作:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ }) .setBulkActions(10000) .setConcurrentRequests(0) .build(); // Add your requests bulkProcessor.add(/* Your requests */); // Flush any remaining requests bulkProcessor.flush(); // Or close the bulkProcessor if you don"t need it anymore bulkProcessor.close(); // Refresh your indices client.admin().indices().prepareRefresh().get(); // Now you can start searching! client.prepareSearch().get();Update By Query API
updateByQuery最簡單的用法是在不更改源的情況下更新索引中的每個文檔,這種用法允許獲取一個新屬性或另一個在線映射更改。
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index").abortOnVersionConflict(false); BulkByScrollResponse response = updateByQuery.get();
對updateByQuery API的調用從獲取索引快照開始,索引使用內部版本控制找到任何文檔。
注意
當一個文檔在快照的時間和索引請求過程之間發生變化時,會發生版本沖突。
當版本匹配時,updateByQuery更新文檔并增加版本號。
所有更新和查詢失敗都會導致updateByQuery中止,這些故障可以從BulkByScrollResponse#getIndexingFailures方法中獲得,任何成功的更新仍然存在,并且不會回滾,當第一次失敗導致中止時,響應包含由失敗的bulk請求生成的所有失敗。
為了防止版本沖突導致updateByQuery中止,請設置abortOnVersionConflict(false),第一個示例之所以這樣做,是因為它試圖獲取在線映射更改,而版本沖突意味著在相同時間開始updateByQuery和試圖更新文檔的沖突文檔。這很好,因為該更新將獲取在線映射更新。
UpdateByQueryRequestBuilder API支持過濾更新的文檔,限制要更新的文檔總數,并使用腳本更新文檔:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index") .filter(QueryBuilders.termQuery("level", "awesome")) .size(1000) .script(new Script(ScriptType.INLINE, "ctx._source.awesome = "absolutely"", "painless", Collections.emptyMap())); BulkByScrollResponse response = updateByQuery.get();
UpdateByQueryRequestBuilder還允許直接訪問用于選擇文檔的查詢,您可以使用此訪問來更改默認的滾動大小,或者以其他方式修改對匹配文檔的請求。
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index") .source().setSize(500); BulkByScrollResponse response = updateByQuery.get();
您還可以將大小與排序相結合以限制文檔的更新:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index").size(100) .source().addSort("cat", SortOrder.DESC); BulkByScrollResponse response = updateByQuery.get();
除了更改文檔的_source字段外,還可以使用腳本更改操作,類似于Update API:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index") .script(new Script( ScriptType.INLINE, "if (ctx._source.awesome == "absolutely) {" + " ctx.op="noop"" + "} else if (ctx._source.awesome == "lame") {" + " ctx.op="delete"" + "} else {" + "ctx._source.awesome = "absolutely"}", "painless", Collections.emptyMap())); BulkByScrollResponse response = updateByQuery.get();
在Update API中,可以設置ctx.op的值來更改執行的操作:
noop
如果您的腳本沒有做任何更改,設置ctx.op = "noop",updateByQuery操作將從更新中省略該文檔,這種行為增加了響應主體中的noop計數器。
delete
如果您的腳本決定必須刪除該文檔,設置ctx.op = "delete",刪除將在響應主體中已刪除的計數器中報告。
將ctx.op設置為任何其他值都會產生錯誤,在ctx中設置任何其他字段都會產生錯誤。
這個API不允許您移動它所接觸的文檔,只是修改它們的源,這是故意的!我們沒有規定要把文件從原來的位置移走。
您也可以同時對多個索引和類型執行這些操作,類似于search API:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("foo", "bar").source().setTypes("a", "b"); BulkByScrollResponse response = updateByQuery.get();
如果提供路由值,則進程將路由值復制到滾動查詢,將進程限制為與路由值匹配的碎片:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source().setRouting("cat"); BulkByScrollResponse response = updateByQuery.get();
updateByQuery也可以通過指定這樣的pipeline來使用ingest節點:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.setPipeline("hurray"); BulkByScrollResponse response = updateByQuery.get();使用Task API
您可以使用Task API獲取所有正在運行的update-by-query請求的狀態:
ListTasksResponse tasksList = client.admin().cluster().prepareListTasks() .setActions(UpdateByQueryAction.NAME).setDetailed(true).get(); for (TaskInfo info: tasksList.getTasks()) { TaskId taskId = info.getTaskId(); BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus(); // do stuff }
使用上面所示的TaskId,您可以直接查找任務:
GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();
使用Cancel Task API
任何查詢更新都可以使用Task Cancel API取消:
// Cancel all update-by-query requests client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks(); // Cancel a specific update-by-query request client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();
使用list tasks API查找taskId的值。
取消請求通常是一個非常快速的過程,但可能要花費幾秒鐘的時間,task status API繼續列出任務,直到取消完成。
Rethrottling在正在運行的更新中,使用_rethrottle API更改requests_per_second的值:
RethrottleAction.INSTANCE.newRequestBuilder(client) .setTaskId(taskId) .setRequestsPerSecond(2.0f) .get();
使用list tasks API查找taskId的值。
與updateByQuery API一樣,requests_per_second的值可以是任何正值的浮點值來設置節流的級別,或者Float.POSITIVE_INFINITY禁用節流。requests_per_second值加快進程立即生效,減慢查詢的requests_per_second值在完成當前批處理后生效,以防止滾動超時。
Reindex API詳情見reindex API
BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client) .destination("target_index") .filter(QueryBuilders.matchQuery("category", "xzy")) .get();
還可以提供查詢來篩選應該從源索引到目標索引的哪些文檔。
文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。
轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69697.html
摘要:高級客戶端目前支持更常用的,但還有很多東西需要補充,您可以通過告訴我們您的應用程序需要哪些缺失的來幫助我們優化優先級,通過向這個添加注釋高級客戶端完整性。傳輸客戶端排除非數據節點的原因是為了避免將搜索流量發送給主節點。 前言 本節描述了Elasticsearch提供的Java API,所有的Elasticsearch操作都使用客戶端對象執行,所有操作本質上都是完全異步的(要么接收監聽器...
摘要:入門本節描述從獲取工件到在應用程序中使用它如何開始使用高級別客戶端。保證能夠與運行在相同主版本和大于或等于的次要版本上的任何節點通信。與具有相同的發布周期,將版本替換為想要的客戶端版本。 Java High Level REST Client 入門 本節描述從獲取工件到在應用程序中使用它如何開始使用高級別REST客戶端。 兼容性 Java High Level REST Client需...
閱讀 2161·2021-11-12 10:36
閱讀 2155·2021-09-03 10:41
閱讀 2769·2021-08-19 10:57
閱讀 1238·2021-08-17 10:14
閱讀 1496·2019-08-30 15:53
閱讀 1216·2019-08-30 15:43
閱讀 979·2019-08-30 13:16
閱讀 2989·2019-08-29 16:56