ES的Bulk API主要用于在单个请求中,批量执行创建、更新、删除文档操作,避免循环发送大量的ES请求。
创建BulkRequest对象
BulkRequest request = new BulkRequest();
添加操作对象
支持index/update/delete操作。
批量index操作
// 通过add方法,添加IndexRequest对象,创建文档,下面插入3个文档
// ps: IndexRequest对象,以键值对的方式设置文档内容
request.add(new IndexRequest("posts").id("1")
.source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts").id("2")
.source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts").id("3")
.source(XContentType.JSON,"field", "baz"));
混合操作
批量执行文档的删除、更新、创建操作
request.add(new DeleteRequest("posts", "3"));
request.add(new UpdateRequest("posts", "2")
.doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")
.source(XContentType.JSON,"field", "baz"));
其他可选参数
timeout
设置请求超时时间
request.timeout(TimeValue.timeValueMinutes(2)); // 形式1:2分钟
request.timeout("2m"); // 形式2:2分钟
执行请求
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
处理结果
if (bulkResponse.hasFailures()) {
// 至少存在一个错误处理
}
// 循环检测批量操作结果
for (BulkItemResponse bulkItemResponse : bulkResponse) {
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
// 根据操作类型检测执行结果
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
// 处理创建请求
IndexResponse indexResponse = (IndexResponse) itemResponse;
break;
case UPDATE:
// 处理更新请求
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
break;
case DELETE:
// 处理删除请求
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}