一架梯子,一头程序猿,仰望星空!

Java Elasticsearch Bulk API 批量操作


ES的Bulk API主要用于在单个请求中,批量执行创建、更新、删除文档操作,避免循环发送大量的ES请求。

创建BulkRequest对象

BulkRequest request = new BulkRequest(); 

添加操作对象

支持index/update/delete操作。

批量index操作

// 通过add方法,添加IndexRequest对象,创建文档,下面插入3个文档
// ps: IndexRequest对象,以键值对的方式设置文档内容
request.add(new IndexRequest("posts").id("1")  
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts").id("2")  
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts").id("3")  
        .source(XContentType.JSON,"field", "baz"));

混合操作

批量执行文档的删除、更新、创建操作

request.add(new DeleteRequest("posts", "3")); 
request.add(new UpdateRequest("posts", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")  
        .source(XContentType.JSON,"field", "baz"));

其他可选参数

timeout

设置请求超时时间

request.timeout(TimeValue.timeValueMinutes(2)); // 形式1:2分钟
request.timeout("2m"); // 形式2:2分钟

执行请求

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

处理结果

if (bulkResponse.hasFailures()) { 
   // 至少存在一个错误处理
}

// 循环检测批量操作结果
for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 
    // 根据操作类型检测执行结果
    switch (bulkItemResponse.getOpType()) {
    case INDEX:    
    case CREATE:
        // 处理创建请求
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:   
        // 处理更新请求
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:  
        // 处理删除请求 
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}