国产xxxx99真实实拍_久久不雅视频_高清韩国a级特黄毛片_嗯老师别我我受不了了小说

資訊專欄INFORMATION COLUMN

Elasticsearch Java API 6.2(文檔API)

lykops / 1623人閱讀

摘要:注意當一個文檔在快照的時間和索引請求過程之間發生變化時,會發生版本沖突。當版本匹配時,更新文檔并增加版本號。在正在運行的更新中,使用更改的值使用查找的值。值加快進程立即生效,減慢查詢的值在完成當前批處理后生效,以防止滾動超時。

文檔API

本節描述以下CRUD API:

單文檔的API

Index API

Get API

Delete API

Update API

多文檔API

Multi Get API

Bulk API

Reindex API

Update By Query API

Delete By Query API

注意
所有CRUD API都是單索引API,索引參數接受單個索引名,或指向單個索引的別名
Index API

index API允許將類型化的JSON文檔索引到特定的索引中,并使其可搜索。

生成JSON文檔

生成JSON文檔有幾種不同的方法:

手動的(也就是你自己)使用原生byte[]或作為String

使用一個Map,該Map將自動轉換為它的JSON等效項

使用第三方庫對bean(如Jackson)進行序列化

使用內置的助手XContentFactory.jsonBuilder()

在內部,每個類型被轉換為byte[](像String被轉換為byte[]),因此,如果對象已經以這種形式存在,那么就使用它,jsonBuilder是高度優化的JSON生成器,它直接構造一個byte[]

自己動手

這里沒有什么困難,但是請注意,您必須根據日期格式對日期進行編碼。

String json = "{" +
        ""user":"kimchy"," +
        ""postDate":"2013-01-30"," +
        ""message":"trying out Elasticsearch"" +
    "}";
使用Map

Map是一個鍵:值對集合,它表示一個JSON結構:

Map json = new HashMap();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
bean序列化

可以使用Jacksonbean序列化為JSON,請將Jackson Databind添加到您的項目中,然后,您可以使用ObjectMapper來序列化您的bean:

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
使用Elasticsearch助手

Elasticsearch提供了內置的助手來生成JSON內容。

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意,您還可以使用startArray(String)endArray()方法添加數組,順便說一下,field方法接受許多對象類型,您可以直接傳遞數字、日期甚至其他XContentBuilder對象。

如果需要查看生成的JSON內容,可以使用string()方法。

String json = builder.string();
索引文檔

下面的示例將JSON文檔索引為一個名為twitter的索引,其類型為tweet, id值為1:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

注意,您還可以將文檔索引為JSON字符串,并且不需要提供ID:

String json = "{" +
        ""user":"kimchy"," +
        ""postDate":"2013-01-30"," +
        ""message":"trying out Elasticsearch"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json, XContentType.JSON)
        .get();

IndexResponse對象會給你一個響應:

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it"s the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();

有關索引操作的更多信息,請查看REST索引文檔

Get API

get API允許根據索引的id從索引中獲取類型化的JSON文檔,下面的示例從一個名為twitter的索引中獲取JSON文檔,該索引的類型名為tweet, id值為1:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

有關get操作的更多信息,請查看REST get文檔。

Delete API

delete API允許基于id從特定索引中刪除類型化的JSON文檔,下面的示例從名為twitter的索引中刪除JSON文檔,該索引的類型名為tweet, id值為1:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
Delete By Query API

通過查詢刪除的API可以根據查詢結果刪除給定的一組文檔:

BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male")) 
    .source("persons")                                  
    .get();                                             
long deleted = response.getDeleted();

QueryBuilders.matchQuery("gender", "male")(查詢)

source("persons") (索引)

get()(執行操作)

response.getDeleted()(被刪除的文檔數)

由于這是一個長時間運行的操作,如果您希望異步執行,可以調用execute而不是get,并提供如下監聽器:

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))     
    .source("persons")                                      
    .execute(new ActionListener() {   
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();           
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });
Update API

您可以創建一個UpdateRequest并將其發送給客戶端:

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

也可以使用prepareUpdate()方法:

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = "male""  , ScriptService.ScriptType.INLINE, null, null))
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()               
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();

Script()(你的腳本,它也可以是本地存儲的腳本名)

setDoc()(將合并到現有的文檔)

注意,您不能同時提供腳本和doc

使用腳本更新

update API允許基于提供的腳本更新文檔:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = "male""));
client.update(updateRequest).get();
通過合并文檔更新

update API還支持傳遞一個部分文檔合并到現有文檔中(簡單的遞歸合并,內部合并對象,取代核心的“鍵/值”和數組),例如:

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();
Upsert

也有對Upsert的支持,如果文檔不存在,則使用upsert元素的內容索引新的doc:

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);              
client.update(updateRequest).get();

如果文檔不存在,將添加indexRequest中的文檔。

如果文件index/type/1已經存在,我們將在此操作后獲得如下文件:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}

"gender": "male"(此字段由更新請求添加)

如果不存在,我們將有一份新文件:

{
    "name" : "Joe Smith",
    "gender": "male"
}
Multi Get API

multi get API允許根據文檔的indextypeid獲取文檔列表:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           
    .add("twitter", "tweet", "2", "3", "4") 
    .add("another", "type", "foo")          
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      
        String json = response.getSourceAsString(); 
    }
}

add("twitter", "tweet", "1")(通過單一id)

add("twitter", "tweet", "2", "3", "4")(或以相同index/type的id列表)

add("another", "type", "foo")(你也可以從另一個索引中得到)

MultiGetItemResponse itemResponse : multiGetItemResponses(迭代結果集)

response.isExists()(您可以檢查文檔是否存在)

response.getSourceAsString()(訪問_source字段)

有關multi get操作的更多信息,請查看剩余的multi get文檔

Bulk API

bulk API允許在一個請求中索引和刪除多個文檔,這里有一個示例用法:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}
使用Bulk處理器

BulkProcessor類提供了一個簡單的接口,可以根據請求的數量或大小自動刷新bulk操作,或者在給定的時間之后。

要使用它,首先創建一個BulkProcessor實例:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
        .build();

beforeBulk()

此方法在執行bulk之前被調用,例如,您可以通過request.numberOfActions()查看numberOfActions

afterBulk(...BulkResponse response)

此方法在執行bulk之后被調用,例如,您可以通過response.hasFailures()檢查是否存在失敗請求

afterBulk(...Throwable failure)

bulk失敗并引發一個可拋出對象時,將調用此方法

setBulkActions(10000)

我們希望每10,000個請求就執行一次bulk

setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))

我們希望每5MB就flush一次

setFlushInterval(TimeValue.timeValueSeconds(5))

無論請求的數量是多少,我們都希望每5秒flush一次

setConcurrentRequests(1)

設置并發請求的數量,值為0意味著只允許執行一個請求,在積累新的bulk請求時,允許執行一個值為1的并發請求

setBackoffPolicy()

設置一個自定義的備份策略,該策略最初將等待100ms,以指數形式增加并重試三次,當一個或多個bulk項目請求以EsRejectedExecutionException失敗時,將嘗試重試,該異常表明用于處理請求的計算資源太少,要禁用backoff,請傳遞BackoffPolicy.noBackoff()

默認情況下,BulkProcessor:

bulkActions設置為1000

bulkSize設置為5mb

不設置flushInterval

concurrentrequest設置為1,這意味著flush操作的異步執行

backoffPolicy設置為一個指數備份,8次重試,啟動延時為50ms,總等待時間約為5.1秒

添加請求

然后您可以簡單地將您的請求添加到BulkProcessor:

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
關閉Bulk Processor

當所有的文檔都被加載到BulkProcessor,可以使用awaitCloseclose方法進行關閉:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

如果通過設置flushInterval來調度其他計劃的flush,這兩種方法都將flush所有剩余的文檔,并禁用所有其他計劃flush。如果并發請求被啟用,那么awaitClose方法等待指定的超時以完成所有bulk請求,然后返回true,如果在所有bulk請求完成之前指定的等待時間已經過去,則返回falseclose方法不等待任何剩余的批量請求完成并立即退出。

在測試中使用Bulk Processor

如果您正在使用Elasticsearch運行測試,并且正在使用BulkProcessor來填充數據集,那么您最好將并發請求的數量設置為0,以便以同步方式執行批量的flush操作:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don"t need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();
Update By Query API

updateByQuery最簡單的用法是在不更改源的情況下更新索引中的每個文檔,這種用法允許獲取一個新屬性或另一個在線映射更改。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();

updateByQuery API的調用從獲取索引快照開始,索引使用內部版本控制找到任何文檔。

注意
當一個文檔在快照的時間和索引請求過程之間發生變化時,會發生版本沖突。

當版本匹配時,updateByQuery更新文檔并增加版本號。

所有更新和查詢失敗都會導致updateByQuery中止,這些故障可以從BulkByScrollResponse#getIndexingFailures方法中獲得,任何成功的更新仍然存在,并且不會回滾,當第一次失敗導致中止時,響應包含由失敗的bulk請求生成的所有失敗。

為了防止版本沖突導致updateByQuery中止,請設置abortOnVersionConflict(false),第一個示例之所以這樣做,是因為它試圖獲取在線映射更改,而版本沖突意味著在相同時間開始updateByQuery和試圖更新文檔的沖突文檔。這很好,因為該更新將獲取在線映射更新。

UpdateByQueryRequestBuilder API支持過濾更新的文檔,限制要更新的文檔總數,并使用腳本更新文檔:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .filter(QueryBuilders.termQuery("level", "awesome"))
    .size(1000)
    .script(new Script(ScriptType.INLINE, "ctx._source.awesome = "absolutely"", "painless", Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

UpdateByQueryRequestBuilder還允許直接訪問用于選擇文檔的查詢,您可以使用此訪問來更改默認的滾動大小,或者以其他方式修改對匹配文檔的請求。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .source().setSize(500);
BulkByScrollResponse response = updateByQuery.get();

您還可以將大小與排序相結合以限制文檔的更新:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").size(100)
    .source().addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();

除了更改文檔的_source字段外,還可以使用腳本更改操作,類似于Update API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .script(new Script(
        ScriptType.INLINE,
        "if (ctx._source.awesome == "absolutely) {"
            + "  ctx.op="noop""
            + "} else if (ctx._source.awesome == "lame") {"
            + "  ctx.op="delete""
            + "} else {"
            + "ctx._source.awesome = "absolutely"}",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

Update API中,可以設置ctx.op的值來更改執行的操作:

noop

如果您的腳本沒有做任何更改,設置ctx.op = "noop"updateByQuery操作將從更新中省略該文檔,這種行為增加了響應主體中的noop計數器。

delete

如果您的腳本決定必須刪除該文檔,設置ctx.op = "delete",刪除將在響應主體中已刪除的計數器中報告。

ctx.op設置為任何其他值都會產生錯誤,在ctx中設置任何其他字段都會產生錯誤。

這個API不允許您移動它所接觸的文檔,只是修改它們的源,這是故意的!我們沒有規定要把文件從原來的位置移走。

您也可以同時對多個索引和類型執行這些操作,類似于search API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();

如果提供路由值,則進程將路由值復制到滾動查詢,將進程限制為與路由值匹配的碎片:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();

updateByQuery也可以通過指定這樣的pipeline來使用ingest節點:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();
使用Task API

您可以使用Task API獲取所有正在運行的update-by-query請求的狀態:

ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
    .setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
    TaskId taskId = info.getTaskId();
    BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
    // do stuff
}

使用上面所示的TaskId,您可以直接查找任務:

GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();

使用Cancel Task API

任何查詢更新都可以使用Task Cancel API取消:

// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();

使用list tasks API查找taskId的值。

取消請求通常是一個非常快速的過程,但可能要花費幾秒鐘的時間,task status API繼續列出任務,直到取消完成。

Rethrottling

在正在運行的更新中,使用_rethrottle API更改requests_per_second的值:

RethrottleAction.INSTANCE.newRequestBuilder(client)
    .setTaskId(taskId)
    .setRequestsPerSecond(2.0f)
    .get();

使用list tasks API查找taskId的值。

updateByQuery API一樣,requests_per_second的值可以是任何正值的浮點值來設置節流的級別,或者Float.POSITIVE_INFINITY禁用節流。requests_per_second值加快進程立即生效,減慢查詢的requests_per_second值在完成當前批處理后生效,以防止滾動超時。

Reindex API

詳情見reindex API

BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client)
    .destination("target_index")
    .filter(QueryBuilders.matchQuery("category", "xzy")) 
    .get();

還可以提供查詢來篩選應該從源索引到目標索引的哪些文檔。

文章版權歸作者所有,未經允許請勿轉載,若此文章存在違規行為,您可以聯系管理員刪除。

轉載請注明本文地址:http://specialneedsforspecialkids.com/yun/69697.html

相關文章

  • Elasticsearch Java API 6.2java client)

    摘要:高級客戶端目前支持更常用的,但還有很多東西需要補充,您可以通過告訴我們您的應用程序需要哪些缺失的來幫助我們優化優先級,通過向這個添加注釋高級客戶端完整性。傳輸客戶端排除非數據節點的原因是為了避免將搜索流量發送給主節點。 前言 本節描述了Elasticsearch提供的Java API,所有的Elasticsearch操作都使用客戶端對象執行,所有操作本質上都是完全異步的(要么接收監聽器...

    Gu_Yan 評論0 收藏0
  • Elasticsearch Java High Level REST Client(入門)

    摘要:入門本節描述從獲取工件到在應用程序中使用它如何開始使用高級別客戶端。保證能夠與運行在相同主版本和大于或等于的次要版本上的任何節點通信。與具有相同的發布周期,將版本替換為想要的客戶端版本。 Java High Level REST Client 入門 本節描述從獲取工件到在應用程序中使用它如何開始使用高級別REST客戶端。 兼容性 Java High Level REST Client需...

    honmaple 評論0 收藏0

發表評論

0條評論

最新活動
閱讀需要支付1元查看
<