Java ElasticSearch客户端使用示例

es的引入及配置

要想使用Java版的es客户端,首先需要在pom.xml引入如下依赖

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.4.0</version>
</dependency>

新建配置文件,这样可以使用自动注入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.haofly.net.common.elasticsearch.config;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EsConfig {
private String host = "127.0.0.1";
private Integer port = 9202;

@Bean
public HttpHost getHttpHost(){
return new HttpHost(host,port);
}
@Bean(destroyMethod = "close")
public RestHighLevelClient getHlvClient(){
return new RestHighLevelClient(RestClient.builder(getHttpHost()));
}
public String getHost() {
return host;
}
public void setHost(String host) {
this.host = host;
}
public Integer getPort() {
return port;
}
public void setPort(Integer port) {
this.port = port;
}
}

在代码层,只需要直接注入即可,例如在Servic种可以这样引入:

1
2
3
4
5
6
7
8
9
10
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class EsService {
@Autowired
RestHighLevelClient restHighLevelClient;
}

java es的增删改查

基本数据定义,下面的示例种出现的数据原型如下

1
2
3
4
5
6
7
8
9
10
String index = "索引"
String type = "类型";
String primaryId = "主键ID";
String data = "{\"field\":\"JSON格式字符串\"}";
HashMap<String, String> datas = new HashMap<>();
datas.put("4", "{\"bbbbbbbbbbbbbb\":\"cccccccccc\"}");
String gteTime = "1234567890000";
String lteTime = "1234567890000";
Integer from = 12345; // 偏移量
Integer size = 1000; // 每页数量
查询搜索
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
SearchRequest request = new SearchRequest(index);

// 构造查询请求
SearchSourceBuilder ssb = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

// 时间范围参数
RangeQueryBuilder rangeQueryBuilder = new RangeQueryBuilder("@timestamp");
rangeQueryBuilder.gte(gteTime).lte(lteTime);
boolQueryBuilder.must(rangeQueryBuilder);

// 查询语句参数
QueryStringQueryBuilder queryStringQueryBuilder = new QueryStringQueryBuilder(query);
boolQueryBuilder.must(queryStringQueryBuilder);

ssb.query(boolQueryBuilder);
ssb.from(from); // 设置偏移量
ssb.size(size); // 设置每页数量

HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.field("*"); // 获取所有字段
ssb.highlighter(highlightBuilder);
request.source(ssb);

// 发送查询请求
SearchResponse searchResponse = restHighLevelClient.search(
request, RequestOptions.DEFAULT);

以上的查询会解析成类似于这样的查询参数(索引是单独的参数)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
"from": 9000,
"size": 1000,
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"from": "1546272000000",
"to": "1577808000000",
"include_lower": true,
"include_upper": true,
"boost": 1
}
}
}
],
"adjust_pure_negative": true,
"boost": 1
}
},
"highlight": {
"fields": {
"*": {}
}
}
}
单条文档操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 单条插入
IndexRequest request = new IndexRequest(index, type, primaryId);
request.source(data, XContentType.JSON);
restHighLevelClient.index(request, RequestOptions.DEFAULT);

// 更新或插入单条doc,有就更新,没有就创建
UpdateRequest request = new UpdateRequest(index, type, primaryId)
.doc(data, XContentType.JSON)
.upsert(data, XContentType.JSON);
restHighLevelClient.update(request, RequestOptions.DEFAULT);

// 单条更新
UpdateRequest request = new UpdateRequest(index, type, primaryId)
.doc(data, XContentType.JSON);
restHighLevelClient.update(request, RequestOptions.DEFAULT);

// 单条删除
DeleteRequest request = new DeleteRequest(index, type, primaryId);
restHighLevelClient.delete(request, RequestOptions.DEFAULT);
批量操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// 批量插入doc
BulkRequest request = new BulkRequest();
for (Map.Entry<String, String> data : datas.entrySet()) {
request.add(new IndexRequest(index, type, data.getKey())
.source(data.getValue(), XContentType.JSON));
}
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
if (bulkResponse == null || !bulkResponse.hasFailures()) {
return bulkResponse == null ? 0 : datas.size();
}
Integer success = 0; // 插入成功的数量
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
success++;
}
}

// 批量更新doc
BulkRequest request = new BulkRequest();
for (Map.Entry<String, String> data : datas.entrySet()) {
request.add(new UpdateRequest(index, type, data.getKey())
.doc(data.getValue(), XContentType.JSON));
}
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
if (bulkResponse == null || !bulkResponse.hasFailures()) {
return bulkResponse == null ? 0 : datas.size();
}
Integer success = 0; // 更新成功的数量
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
success++;
}
}

// 批量删除,根据ID进行删除
BulkRequest request = new BulkRequest();
for (String id : ids) {
request.add(new DeleteRequest(index, type, id));
}
BulkResponse bulkResponse = restHighLevelClient.bulk(request, RequestOptions.DEFAULT);
if (bulkResponse == null || !bulkResponse.hasFailures()) {
return bulkResponse == null ? 0 : ids.size();
}
Integer success = 0; // 删除成功的数量
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
success++;
}
}