开发时遇到需要连接多个ES的需求,类似于连接多个MySQL数据库一样。

Elasticsearch Java API有四类client连接方式

  • TransportClient
  • RestClient
  • Jest
  • Spring Data Elasticsearch


        其中TransportClient和RestClient是Elasticsearch原生的api。TransportClient可以支持2.x,5.x版本,TransportClient将会在Elasticsearch 7.0弃用并在8.0中完成删除,因此不推荐后续使用;而Jest由于是社区维护,所以更新有一定延迟,目前最新版对接ES6.3.1,近一个月只有四个issue,说明整体活跃度较低,因此也不推荐使用;Spring Data Elasticsearch主要是与Spring生态对接,可以在web系统中整合到Spring中使用。目前比较推荐使用官方的高阶、低阶Rest Client,官方维护,比较值得信赖。

项目中使用后Transport Client连接ES,Transport Client是Elasticsearch原生的api,TransportClient可以支持2.x,5.x版本,TransportClient将会在Elasticsearch 7.0弃用并在8.0中完成删除,因此高版本不推荐此方式。

使用 Spring 注入Bean形式来获取多个bean实例,代码如下。

package cn.org.config;

import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.context.annotation.Scope;
import java.net.InetAddress;
import java.net.UnknownHostException;

@Slf4j
@Configuration
public class EsConfig {

    @Value("${elasticsearch.cluster-nodes.es1}")
    private String es1ClusterNodes;

    @Value("${elasticsearch.cluster-nodes.es2}")
    private String es2ClusterNodes;


    @Bean(name = "es1ElasticsearchClient")
    @Primary
    @Scope("singleton")
    public TransportClient newElasticsearchClient() {
        return clientInit(es1ClusterNodes);
    }

    @Bean(name = "es2ElasticsearchClient")
    @Scope("singleton")
    public TransportClient archiveElasticsearchClient() {
        return clientInit(es2ClusterNodes);
    }

    public TransportClient clientInit(String clusterNodes) {

        Settings settings = Settings.builder()
                .put("cluster.name", "es_cluster")
                .put("client.transport.sniff", true)
                .build();

        TransportClient transportClient = null;
        try {
            transportClient = new PreBuiltTransportClient(settings);
            String[] allEsIpPort = clusterNodes.split(",");
            String esAddress = allEsIpPort[0];
            String InetSocket[] = esAddress.split(":");
            String address = InetSocket[0];
            Integer port = Integer.valueOf(InetSocket[1]);
            transportClient.addTransportAddress(
                    new InetSocketTransportAddress(InetAddress.getByName(address), port));

        } catch (UnknownHostException e) {
            log.error("初始化ES错误:", e);
        }

        return transportClient;
    }

}

 使用时,直接注入多个bean实例进行查询即可。

package cn.org.biz;

import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.ArrayList;

@Service
@Slf4j
public class EsService {

  @Qualifier("es1ElasticsearchClient")
  @Autowired
  private TransportClient es1ElasticsearchClient;

  @Qualifier("es2ElasticsearchClient")
  @Autowired
  private TransportClient es2ElasticsearchClient;

  public List<MsgBean> doQueryMsgFromEs1(String userName, String type) {
    List<MsgBean> list = new ArrayList<>();
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder queryBuilder =  QueryBuilders.boolQuery();
    queryBuilder.must(QueryBuilders.matchQuery("userName", userName));
    sourceBuilder.query(queryBuilder);
    SearchRequestBuilder builder = es1ElasticsearchClient.prepareSearch(type + "-*");
    SearchResponse searchResponse = builder.setQuery(sourceBuilder.query()).get();
    if (searchResponse.getHits().getTotalHits() > 0) {
      for (SearchHit searchHit : searchResponse.getHits()) {
        MsgBean msgBean = JSON.parseObject(searchHit.getSourceAsString(), MsgBean.class);
        list.add(msgBean);
      }
    }
    return list;
  }

  public List<MsgBean> doQueryMsgFromEs2(String userName, String certType) {
    List<MsgBean> list = new ArrayList<>();
    SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
    BoolQueryBuilder queryBuilder =  QueryBuilders.boolQuery();
    queryBuilder.must(QueryBuilders.matchQuery("userName", userName));
    sourceBuilder.query(queryBuilder);
    SearchRequestBuilder builder = es2ElasticsearchClient.prepareSearch(type + "-*");
    SearchResponse searchResponse = builder.setQuery(sourceBuilder.query()).get();
    if (searchResponse.getHits().getTotalHits() > 0) {
      for (SearchHit searchHit : searchResponse.getHits()) {
        MsgBean msgBean = JSON.parseObject(searchHit.getSourceAsString(), MsgBean.class);
        list.add(msgBean);
      }
    }
    return list;
  }


}

In this article, we will discuss about “How to create a Spring Boot + Spring Data + Elasticsearch Example”.

Tools used in this article :

  1. Spring Boot 1.5.1.RELEASE
  2. Spring Boot Starter Data Elasticsearch 1.5.1.RELEASE
  3. Spring Data Elasticsearch 2.10.RELEASE
  4. Elasticsearch 2.4.4
  5. Maven
  6. Java 8

Note
SpringBoot 1.5.1.RELEASE and Spring Data Elasticsearch 2.10.RELEASE supports only ElasticSearch 2.4.0. They don’t support the latest version of ElasticSearch 5.x version. Read this – Spring Data Elasticsearch Spring Boot version matrix

Related – Elasticsearch Basics

更多文章请关注《万象专栏》