ElasticSearch公共方法封装

news/2025/2/22 7:01:34

业务场景

1、RestClientBuilder初始化(同时支持单机与集群)
2、发送ES查询请求公共方法封装(支持sql、kql、代理访问、集群访问、鉴权支持)
3、判断ES索引是否存在/_cat/indices/${indexName})
4、判断ES索引别名是否存在 (/_cat/aliases/${indexName})
5、判断ES索引指定字段/属性是否存在(这里字段支持多级,如:logObj.id)
6、判断ES索引指定字段/属性的类型(字段支持多级)
7、阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
8、创建索引别名(可用于支持sql查询,索引名中有特殊字符不能用作表名,可通过创建别名来解决)
9、索引别名创建结果解析( 判断acknowledged)
10、KQL查询ES(Kibana语法查询ElasticSearch)
11、SQL查询ES(标准SQL语法查询ElasticSearch)
12、Java在本地通过代理访问ES(可用于解决网络不能直接的问题)
13、Java 客户端访问ES集群(同时支持单机与集群)
14、Java ES客户端鉴权(安全需求)

软件环境

ElasticSearch 7.17.23 下载地址

ElasticSearch 7.17.23 帮助文档

ElasticSearch 8.17.2 下载地址

ElasticSearch 8.17.2 帮助文档

说明:当前例子中用的7,理论上8也通用

Kibana查询效果

KQL查询ES

SQL查询ES

下面讲java代码实现

Java类方法详解

1、RestClientBuilder初始化

同时支持单机与集群

/**
     * RestClientBuilder 初始化
     *
     * @param host 同时支持单机与集群
     *        单机:host和port各司其职
     *        集群时:port参数无效,host中包含IP和PORT,多个实例用逗号分隔
     *
     *    eg:
     *         10.***.6.247
     *     或: host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200
     * @param port
     * @return
     */
    private RestClientBuilder buildClient(String host, Integer port){
        RestClientBuilder restClientBuilder = null;
        if(host.indexOf(",")==-1)
        {
            // 单机 host:10.***.6.247, 只有单机会使用port参数
            restClientBuilder = RestClient.builder(new HttpHost( host, port, "http" ) );
        }
        else
        {
            // 集群 host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200,10.***.6.9:9200,10.***.6.183:9200
            String[] hostArr = host.split("\\,");
            HttpHost[] httpHosts = new HttpHost[hostArr.length];
            for( int i=0; i<hostArr.length; i++ )
            {
                String[] addrs = hostArr[i].split("\\:");
                HttpHost httpHost = new HttpHost( addrs[0],  Integer.valueOf(addrs[1]), "http" );
                httpHosts[i] = httpHost;
            }
            restClientBuilder = RestClient.builder( httpHosts );
        }
        return restClientBuilder;
    }

2、发送ES查询请求公共方法

  • SQL支持
  • KQL支持
  • 支持代理访问支持
  • 鉴权支持
/**
     * 发送ES 查询请求
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param method
     * @param endpoint ES接口
     *      eg:
     *        1、创建别名: "/_aliases"
     *        2、判断索引是否存在: "/_cat/indices/myIndexName"
     *        2、判断索引别名是否存在: "/_cat/aliases/indexName"
     *
     * @param jsonEntity 查询语句
     *      eg:
     *      1、为索引创建别名(可用于支持sql查询,如果使用sql查询时原索引名中有特殊字符不能用作表名,可通过创建别名来解决)
     *         String kqlJson = "{\"actions\" : [{ \"add\" : { \"index\" : \""+idxName+"\",\"alias\" : \""+idxAliases+"\" } }]}  ";
     *         String kqlJson= "";
     *      2、String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", " +
     *                 "\"params\": ["+cStart+","+cEnd+"]," +
     *                 " \"fetch_size\": 65536 }";
     *
     *  实际业务场景举例:
     *  POST _sql?format=txt
     * {
     *   "query": "SELECT tags.svc_code, sum(iif(tags.response_code.keyword='0000',1,0)) as success_count, count(metric) as total
     *               FROM order_service_****
     *              where create_time between '2025-02-27T11:00:00+0800' and '2025-02-27T13:59:00+0800'
     *              group by tags.svc_code having count(metric)>=50
     *              order by 3 desc",
     *   "fetch_size": 65536
     * }
     *
     * @return
     * @throws IOException
     */
    public String request(String host, Integer port,
                          String username, String password,
                          String method, String endpoint, String jsonEntity) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    /* ******
                    *  能直连ES的不需要;如果本地不能直连ES的则加上,IP根据实际调整
                    *
                    *  */
                    if("dev".equals(profile) && host.indexOf("10.***.137")==0 ) {
                        httpClientBuilder.setProxy(
                                new HttpHost("10.***.248.54", 8443, "http")  //设置代理服务

                        );
                    }else if("dev".equals(profile)  && host.indexOf("10.***.6")==0  ){
                        httpClientBuilder.setProxy(
                                new HttpHost( "192.***.66.30", 8443,"http")  //设置代理服务
                        );
                    }
                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request(method, endpoint );
        request.setJsonEntity( jsonEntity );

        Response response = restClient.performRequest(request);
        HttpEntity entity=response.getEntity();

        restClient.close();

        entity = new BufferedHttpEntity(entity);

        return EntityUtils.toString(entity);
    }

3、判断ES索引是否存在

 /**
     *  判断索引名是否存在
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param indexName 索引名
     * @return
     * @throws IOException
     */
    public boolean isExistsIndex( String host, Integer port,
                                  String username, String password,
                                  String indexName ) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request("GET", "/_cat/indices/"+indexName );
        try{
            Response response = restClient.performRequest(request);
            HttpEntity entity=response.getEntity();
            entity = new BufferedHttpEntity(entity);
            if (!StringUtils.hasLength(EntityUtils.toString(entity))) {
                System.out.println("Index exists.");
                return true;
            } else {
                System.out.println("Index does not exist.");
                return false;
            }
        }catch (Exception e){
            //如果不存在会报404的错误,返回false创建别名
            return false;
        }finally {
            restClient.close();
        }
    }

4、判断ES索引别名是否存在


    /**
     * 判断索引别名是否存在
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param indexName
     * @return
     * @throws IOException
     */
    public boolean isExistsAliases( String host, Integer port,
                                    String username, String password,
                                    String indexName ) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request("GET", "/_cat/aliases/"+indexName );

        Response response = restClient.performRequest(request);
        HttpEntity entity=response.getEntity();

        restClient.close();

        entity = new BufferedHttpEntity(entity);

        if (!StringUtils.isEmpty(EntityUtils.toString(entity))) {
            System.out.println("Index alias exists.");
            return true;
        } else {
            System.out.println("Index alias does not exist.");
            return false;
        }
    }

5、获取ES索引指定字段/属性是否存在

这里字段支持多级,如:logObj.id
/**
     * 判断索引 某个字段/属性是否存在
     *   说明: 这里字段支持多级,如:logObj.id
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param indexName
     * @param property  eg:id、  logObj.id
     * @return
     * @throws IOException
     */
    public boolean isExistsProperty( String host, Integer port,
                                     String username, String password,
                                     String indexName, String property ) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request("GET", "/"+indexName+"/_mapping" );

        Response response = restClient.performRequest(request);
        HttpEntity entity=response.getEntity();

        restClient.close();
        entity = new BufferedHttpEntity(entity);

        JSONObject obj = JSONObject.parseObject(EntityUtils.toString(entity));

        JSONObject properties = obj.getJSONObject( obj.keySet().iterator().next() )
                .getJSONObject("mappings" )
                .getJSONObject("properties" );

        String[] arr = property.split("\\.");

        for( int i=0; i<arr.length; i++ ){
            if(i==arr.length-1){

            }else{
                if(properties.containsKey( arr[i] )){
                    properties = properties.getJSONObject(arr[i]).getJSONObject("properties" );
                }else{
                    return false;
                }
            }
        }

        boolean bool =  properties.containsKey( arr[arr.length-1] );

        log.info("property:{} , isExist:{}", property, bool );
        return bool;
    }

6、获取ES索引指定字段/属性的类型

同一个索引的同一字段,不同时间的数据类型可能不一样,从而影响sql语句的写法(sql语法不一样),所以个别场景要做判断

 /**
     * 判断索引某个字段/属性的类型
     *   说明: 这里字段支持多级,如:logObj.id
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param indexName
     * @param property  eg:id、  logObj.id
     * @return
     * @throws IOException
     */
    public String getIndexPropertyType( String host, Integer port,
                                    String username, String password,
                                    String indexName, String property ) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request("GET", "/"+indexName+"/_mapping" );

        Response response = restClient.performRequest(request);
        HttpEntity entity=response.getEntity();

        restClient.close();

        entity = new BufferedHttpEntity(entity);

        JSONObject obj = JSONObject.parseObject(EntityUtils.toString(entity));

        JSONObject properties = obj.getJSONObject( obj.keySet().iterator().next() )
                .getJSONObject("mappings" )
                .getJSONObject("properties" );

        String[] arr = property.split("\\.");

        for( int i=0; i<arr.length; i++ ){
            if(i==arr.length-1){
                properties = properties.getJSONObject(arr[i]);
            }else{
                if(properties.containsKey( arr[i] )){
                    properties = properties.getJSONObject(arr[i]).getJSONObject("properties" );
                }else{
                    log.error( "判断字段类型时,发现字段不存在!property:{}", property );
                    throw new RuntimeException( "判断字段类型时,发现字段不存在!property:"+property );
                }
            }
        }
        String type = properties.getString("type" );
        log.info("property:{} , type:{}", property, type );
        return type;
    }

7、阻塞线程直至索引就绪

为了应对跨日时索引名短时间可能不存在的问题(不处理可能导致程序报错)

/**
     * 阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param indexName
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public void waitIndexReady( String host, Integer port,
                    String username, String password,
                    String indexName ) throws IOException, InterruptedException {

        // 循环一次是10s,6次是1分钟,60次是10分钟
        for( int i=0; i<60 && !this.isExistsIndex(host,   port,
                username,   password,
                indexName) ; i++ ){
            // 共循环10分钟
            Thread.sleep( 10*1000 );
        }
    }

8、创建索引别名

有时候索引名带特殊字符,是sql的关键字,所以创建别名可供sql查询用作表名

    /**
     * 创建索引别名
     *
     * @param username
     * @param password
     * @param host
     * @param port
     * @param idxName 索引名
     * @param idxAliases 索引别名
     * @throws ParseException
     * @return
     */
    public void createAliases(  String username, String password,
                                String host, int port,
                                String idxName, String idxAliases ) throws ParseException {
        log.info("创建索引别名 :{}:{}, {}", host, port, idxAliases );

        String method = "POST";
        String endpoint = "/_aliases";

        // 为索引创建别名,用于支持sql查询
        String kqlJson = "{\"actions\" : [{ \"add\" : { \"index\" : \""+idxName+"\",\"alias\" : \""+idxAliases+"\" } }]}  ";

        try
        {
            String body =null;
            body = this.request(   host,   port,
                    username,   password,
                    method,   endpoint,   kqlJson );
            boolean acknowledged = parseCreateAliasesResult( body );
            if(acknowledged){
                log.info("别名创建成功");
            }else{
                log.error("别名创建失败");
                log.error("别名创建失败 kqlJson:{}", kqlJson );
                throw new RuntimeException("索引别名创建失败!");
            }
        }catch (Exception ex)
        {
            log.error("创建索引别名异常!message:{}",ex.getLocalizedMessage());
            log.error("创建索引别名异常 kqlJson:{}", kqlJson );
            ex.printStackTrace();
        }
    }

9、索引别名创建结果解析

判断创建时返回值中的acknowledged属性值

    /**
     *  判断body 中是否包含 acknowledged
     *
     * @param body
     * @return
     */
    private boolean parseCreateAliasesResult(String body  ){
        JSONObject json = JSONObject.parseObject(body);
        if(json.containsKey("acknowledged") && json.getBoolean("acknowledged")){
            return true;
        }
        return false;
    }

完整代码实现

完整maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>person.brickman</groupId>
    <artifactId>javaProject</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- 统一管理jar包版本 -->
    <properties>
        <java.version>17</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>

        <spring-boot.version>2.7.18</spring-boot.version>
        <spring-cloud.version>2021.0.9</spring-cloud.version>
        <spring-cloud-starter-bootstrap.version>3.1.9</spring-cloud-starter-bootstrap.version>

        <elasticsearch-client.version>7.17.23</elasticsearch-client.version>
        <commons-lang3.version>3.14.0</commons-lang3.version>
        <fastjson2.version>2.0.53</fastjson2.version>
        <lombok.version>1.18.28</lombok.version>
        <testng.version>6.14.3</testng.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bootstrap</artifactId>
            <version>${spring-cloud-starter-bootstrap.version}</version>
        </dependency>

        <dependency>
            <groupId>org.elasticsearch.client</groupId>
            <artifactId>elasticsearch-rest-high-level-client</artifactId>
            <version>${elasticsearch-client.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.httpcomponents.client5</groupId>
            <artifactId>httpclient5</artifactId>
            <version>5.3</version>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
            <version>${commons-lang3.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>${fastjson2.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <optional>true</optional>
        </dependency>

        <!-- 测试相关 默认集成junit5 作者用testng,所以排除掉junit5  -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.jupiter</groupId>
                    <artifactId>junit-jupiter</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.mockito</groupId>
                    <artifactId>mockito-junit-jupiter</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>${testng.version}</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-configuration-processor</artifactId>
                <optional>true</optional>
            </dependency>

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

完整ES公共组件类

package person.brickman.es;

import com.alibaba.fastjson2.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.BufferedHttpEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.impl.nio.reactor.IOReactorConfig;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;

import java.io.IOException;
import java.text.ParseException;

/**
 * @Description: ES公共方法类(公共组件)
 *   1、RestClientBuilder初始化(同时支持单机与集群)
 *   2、发送ES查询请求公共方法封装(支持sql、kql、代理访问、集群访问、鉴权)
 *   3、判断ES索引是否存在(/_cat/indices/${indexName})
 *   4、判断ES索引别名是否存在 (/_cat/aliases/${indexName})
 *   5、判断ES索引指定字段/属性是否存在(这里字段支持多级,如:logObj.id)
 *   6、判断ES索引指定字段/属性的类型(字段支持多级)
 *   7、阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
 *   8、创建索引别名(可用于支持sql查询,索引名中有特殊字符不能用作表名,可通过创建别名来解决)
 *   9、索引别名创建结果解析( 判断acknowledged)
 *
 * @Author: brickman
 * @CreateDate: 2025/2/20 23:46
 * @Version: 1.0
 */
@Slf4j
@Service
public class ESRestClientService {

    @Value("${spring.profiles.active}")
    private String profile;

    /**
     * RestClientBuilder 初始化
     *
     * @param host 同时支持单机与集群
     *        单机:host和port各司其职
     *        集群时:port参数无效,host中包含IP和PORT,多个实例用逗号分隔
     *
     *    eg:
     *         10.***.6.247
     *     或: host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200
     * @param port
     * @return
     */
    private RestClientBuilder buildClient(String host, Integer port){
        RestClientBuilder restClientBuilder = null;
        if(host.indexOf(",")==-1)
        {
            // 单机 host:10.***.6.247, 只有单机会使用port参数
            restClientBuilder = RestClient.builder(new HttpHost( host, port, "http" ) );
        }
        else
        {
            // 集群 host:10.***.6.247:9200,10.***.6.34:9200,10.***.6.120:9200,10.***.6.9:9200,10.***.6.183:9200
            String[] hostArr = host.split("\\,");
            HttpHost[] httpHosts = new HttpHost[hostArr.length];
            for( int i=0; i<hostArr.length; i++ )
            {
                String[] addrs = hostArr[i].split("\\:");
                HttpHost httpHost = new HttpHost( addrs[0],  Integer.valueOf(addrs[1]), "http" );
                httpHosts[i] = httpHost;
            }
            restClientBuilder = RestClient.builder( httpHosts );
        }
        return restClientBuilder;
    }

    /**
     * 发送ES 查询请求,包含sql、kql、代理访问、鉴权支持
     *
     * @param host 同时支持单机与集群
     *             单机:host和port各司其职
     *             集群时:port参数无效,host中包含IP和PORT,多个实例用逗号分隔
     * @param port
     * @param username
     * @param password
     * @param method
     * @param endpoint ES接口
     *      eg:
     *        1、创建别名: "/_aliases"
     *        2、判断索引是否存在: "/_cat/indices/myIndexName"
     *        2、判断索引别名是否存在: "/_cat/aliases/indexName"
     *
     * @param jsonEntity 查询语句
     *      eg:
     *      1、为索引创建别名(可用于支持sql查询,如果使用sql查询时原索引名中有特殊字符不能用作表名,可通过创建别名来解决)
     *         String kqlJson = "{\"actions\" : [{ \"add\" : { \"index\" : \""+idxName+"\",\"alias\" : \""+idxAliases+"\" } }]}  ";
     *         String kqlJson= "";
     *      2、String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", " +
     *                 "\"params\": ["+cStart+","+cEnd+"]," +
     *                 " \"fetch_size\": 65536 }";
     *
     *  实际业务场景举例:
     *  POST _sql?format=txt
     * {
     *   "query": "SELECT tags.svc_code, sum(iif(tags.response_code.keyword='0000',1,0)) as success_count, count(metric) as total
     *               FROM order_service_****
     *              where create_time between '2025-02-27T11:00:00+0800' and '2025-02-27T13:59:00+0800'
     *              group by tags.svc_code having count(metric)>=50
     *              order by 3 desc",
     *   "fetch_size": 65536
     * }
     *
     * @return
     * @throws IOException
     */
    public String request(String host, Integer port,
                          String username, String password,
                          String method, String endpoint, String jsonEntity) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    /* ******
                    *  本地能直连ES的不需要;如果本地不能直连ES的则加上,IP根据实际调整
                    *
                    *  */
                    if("dev".equals(profile) && host.indexOf("10.***.137")==0 ) {
                        httpClientBuilder.setProxy(
                                new HttpHost("10.***.248.54", 8443, "http")  //设置代理服务

                        );
                    }else if("dev".equals(profile)  && host.indexOf("10.***.6")==0  ){
                        httpClientBuilder.setProxy(
                                new HttpHost( "192.***.66.30", 8443,"http")  //设置代理服务
                        );
                    }
                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request(method, endpoint );
        request.setJsonEntity( jsonEntity );

        Response response = restClient.performRequest(request);
        HttpEntity entity=response.getEntity();

        restClient.close();

        entity = new BufferedHttpEntity(entity);

        return EntityUtils.toString(entity);
    }

    /**
     *  判断索引名是否存在
     *
     * @param host 同时支持单机与集群
     *             单机:host和port各司其职
     *             集群时:port参数无效,host中包含IP和PORT,多个实例用逗号分隔
     * @param port
     * @param username
     * @param password
     * @param indexName 索引名
     * @return
     * @throws IOException
     */
    public boolean isExistsIndex( String host, Integer port,
                                  String username, String password,
                                  String indexName ) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request("GET", "/_cat/indices/"+indexName );
        try{
            Response response = restClient.performRequest(request);
            HttpEntity entity=response.getEntity();
            entity = new BufferedHttpEntity(entity);
            if (!StringUtils.hasLength(EntityUtils.toString(entity))) {
                System.out.println("Index exists.");
                return true;
            } else {
                System.out.println("Index does not exist.");
                return false;
            }
        }catch (Exception e){
            //如果不存在会报404的错误,返回false创建别名
            return false;
        }finally {
            restClient.close();
        }
    }

    /**
     * 判断索引别名是否存在
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param indexName
     * @return
     * @throws IOException
     */
    public boolean isExistsAliases( String host, Integer port,
                                    String username, String password,
                                    String indexName ) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request("GET", "/_cat/aliases/"+indexName );

        Response response = restClient.performRequest(request);
        HttpEntity entity=response.getEntity();

        restClient.close();

        entity = new BufferedHttpEntity(entity);

        if (!StringUtils.isEmpty(EntityUtils.toString(entity))) {
            System.out.println("Index alias exists.");
            return true;
        } else {
            System.out.println("Index alias does not exist.");
            return false;
        }
    }

    /**
     * 判断索引 某个字段/属性是否存在
     *   说明: 这里字段支持多级,如:logObj.id
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param indexName
     * @param property  eg:id、  logObj.id
     * @return
     * @throws IOException
     */
    public boolean isExistsProperty( String host, Integer port,
                                     String username, String password,
                                     String indexName, String property ) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request("GET", "/"+indexName+"/_mapping" );

        Response response = restClient.performRequest(request);
        HttpEntity entity=response.getEntity();

        restClient.close();
        entity = new BufferedHttpEntity(entity);

        JSONObject obj = JSONObject.parseObject(EntityUtils.toString(entity));

        JSONObject properties = obj.getJSONObject( obj.keySet().iterator().next() )
                .getJSONObject("mappings" )
                .getJSONObject("properties" );

        String[] arr = property.split("\\.");

        for( int i=0; i<arr.length; i++ ){
            if(i==arr.length-1){

            }else{
                if(properties.containsKey( arr[i] )){
                    properties = properties.getJSONObject(arr[i]).getJSONObject("properties" );
                }else{
                    return false;
                }
            }
        }

        boolean bool =  properties.containsKey( arr[arr.length-1] );

        log.info("property:{} , isExist:{}", property, bool );
        return bool;
    }

    /**
     * 判断索引某个字段/属性的类型
     *   说明: 这里字段支持多级,如:logObj.id
     *   同一个索引的同一字段,不同时间的数据类型可能不一样,从而影响sql语句的写法(sql语法不一样),所以个别场景要做判断
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param indexName
     * @param property  eg:id、  logObj.id
     * @return
     * @throws IOException
     */
    public String getIndexPropertyType( String host, Integer port,
                                    String username, String password,
                                    String indexName, String property ) throws IOException {

        RestClientBuilder restClientBuilder = buildClient( host, port );

        if(!StringUtils.isEmpty(username)){
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
            restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                @Override
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                    httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    //线程设置
                    httpClientBuilder.setDefaultIOReactorConfig(IOReactorConfig.custom().setIoThreadCount(2).build());

                    return httpClientBuilder;
                }
            });
        }

        RestClient restClient = restClientBuilder.build();

        Request request = new Request("GET", "/"+indexName+"/_mapping" );

        Response response = restClient.performRequest(request);
        HttpEntity entity=response.getEntity();

        restClient.close();

        entity = new BufferedHttpEntity(entity);

        JSONObject obj = JSONObject.parseObject(EntityUtils.toString(entity));

        JSONObject properties = obj.getJSONObject( obj.keySet().iterator().next() )
                .getJSONObject("mappings" )
                .getJSONObject("properties" );

        String[] arr = property.split("\\.");

        for( int i=0; i<arr.length; i++ ){
            if(i==arr.length-1){
                properties = properties.getJSONObject(arr[i]);
            }else{
                if(properties.containsKey( arr[i] )){
                    properties = properties.getJSONObject(arr[i]).getJSONObject("properties" );
                }else{
                    log.error( "判断字段类型时,发现字段不存在!property:{}", property );
                    throw new RuntimeException( "判断字段类型时,发现字段不存在!property:"+property );
                }
            }
        }
        String type = properties.getString("type" );
        log.info("property:{} , type:{}", property, type );
        return type;
    }

    /**
     * 阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
     *
     * @param host
     * @param port
     * @param username
     * @param password
     * @param indexName
     * @throws IOException
     * @throws InterruptedException
     */
    public void waitIndexReady( String host, Integer port,
                    String username, String password,
                    String indexName ) throws IOException, InterruptedException {

        // 循环一次是10s,6次是1分钟,60次是10分钟
        for( int i=0; i<60 && !this.isExistsIndex(host,   port,
                username,   password,
                indexName) ; i++ ){
            // 共循环10分钟
            Thread.sleep( 10*1000 );
        }
    }

    /**
     * 创建索引别名
     *   有时候索引名带特殊字符,是sql的关键字,所以创建别名可供sql查询用作表名
     *
     * @param username
     * @param password
     * @param host
     * @param port
     * @param idxName 索引名
     * @param idxAliases 索引别名
     * @throws ParseException
     * @return
     */
    public void createAliases(  String username, String password,
                                String host, int port,
                                String idxName, String idxAliases ) throws ParseException {
        log.info("创建索引别名 :{}:{}, {}", host, port, idxAliases );

        String method = "POST";
        String endpoint = "/_aliases";

        // 为索引创建别名,用于支持sql查询
        String kqlJson = "{\"actions\" : [{ \"add\" : { \"index\" : \""+idxName+"\",\"alias\" : \""+idxAliases+"\" } }]}  ";

        try
        {
            String body =null;
            body = this.request(   host,   port,
                    username,   password,
                    method,   endpoint,   kqlJson );
            boolean acknowledged = parseCreateAliasesResult( body );
            if(acknowledged){
                log.info("别名创建成功");
            }else{
                log.error("别名创建失败");
                log.error("别名创建失败 kqlJson:{}", kqlJson );
                throw new RuntimeException("索引别名创建失败!");
            }
        }catch (Exception ex)
        {
            log.error("创建索引别名异常!message:{}",ex.getLocalizedMessage());
            log.error("创建索引别名异常 kqlJson:{}", kqlJson );
            ex.printStackTrace();
        }
    }

    /**
     *  判断body 中是否包含 acknowledged
     *
     * @param body
     * @return
     */
    private boolean parseCreateAliasesResult(String body  ){
        JSONObject json = JSONObject.parseObject(body);
        if(json.containsKey("acknowledged") && json.getBoolean("acknowledged")){
            return true;
        }
        return false;
    }
}

单元测试方法详解

作者用的testng

1、执行检索

/**
     * 执行检索(这里使用sql查询近一分钟的数据)
     *
     * @throws IOException
     * @throws ParseException
     */
    @Test(groups = "hlog", enabled = true )
    public void testRequest() throws IOException, ParseException {
        String method = "POST";
        String endpoint = "/_sql?format=json";

        RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
//        String time = "2025-02-21 17:48:00";
        String time = TimeUtils.calcWholeMinute();
        rangeTimeUtils.calcAllByTimeAndPeriod(  time, 1);
        //  "yyyy-MM-dd HH:mm:ss"  --> "yyyy-MM-dd'T'HH:mm"  根据实际时间字段格式调整
        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm" );
        Long cStart = rangeTimeUtils.getStartDate().getTime();
        Long cEnd = rangeTimeUtils.getStartDate().getTime();

        // 原索引名是sql关键字,索引需要创建别名
        SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
        SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
        String idxAliases = "hlog_crm3_comm_undef_"+sdf_target.format(rangeTimeUtils.getEndDate());

        // 生产环境
        String sqlQuery2 = "SELECT logObj.province,server, logObj.app, logObj.node_ip, " +
                "SUBSTRING(logObj.uri,0,30) , count(id) as num, avg(logObj.costTime) latency ,   " +
                "sum(iif(logObj.code=200,1,0))   as success_count " +
                " FROM " + idxAliases + " "+
//                "where cTime between '"+cStart+":00+0800' and '"+cEnd+":00+0800' " +
                " where cTime between "+cStart+" and "+cEnd+" " +
                "group by logObj.province,server, logObj.app, logObj.node_ip,  SUBSTRING(logObj.uri,0,30)  "; // 将index_name、field_name和value替换为相应的索引名称、字段名和值

        // max fetch_size is 65536  不同的版本单次查询最大数据限制不一样,这里测试只查 5 条
        String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\",  \"fetch_size\": 5}";

        String ret =   service.request(  host,   port,
                username,   password,
                method,   endpoint,   jsonEntity );
        log.debug("ret:{}",ret);
    }

2、判断索引是否存在

/**
     * 判断索引是否存在
     * @throws IOException
     */
    @Test(groups = "hlog", enabled = true )
    public void testIsExistsIndex() throws IOException {
        /*  interf_dand_hig_c_trans_2024_12_03
            interf_dand_hig_c_trans_2025_02_20

            interf_dand_comm_undef_2024_12_03
            interf_dand_comm_undef_2025_02_20
         */
        boolean ret =   service.isExistsIndex( host, port,
                username,   password,
                "interf_dand_comm_undef_2025_02_21");
        log.info("ret:{}",ret);
    }

3、判断索引别名是否存在

    /**
     * 判断索引别名是否存在
     * @throws IOException
     */
    @Test(groups = "hlog", enabled = true )
    public void testIsExistsAliases() throws IOException {
        /*  interf_dand_hig_c_trans_2025_02_21
            interf_dand_comm_undef_2025_02_21
         */
        boolean ret =   service.isExistsAliases( host, port,
                username,   password,
                "interf_dand_comm_undef_2025_02_21");
        log.info("ret:{}",ret);
    }

4、判断索引字段时否存在

    /**
     * 判断索引字段时否存在
     * @throws IOException
     */
    @Test(groups = "hlog", enabled = true )
    public void testIsExistsProperty() throws IOException {
        // logObj.node_ip     logObj.status
        boolean ret =   service.isExistsProperty( host, port,
                username, password,
                "interf_dand_comm_undef_2025_02_21", "logObj.status");
        log.info("ret:{}",ret);
    }

5、获取索引列类型

    /**
     * 获取索引列类型
     *
     * @throws IOException
     */
    @Test( enabled = true )
    public void testGetIndexPropertyType() throws IOException {
        /*  interf_dand_hig_c_trans_2025_02_21
            interf_dand_comm_undef_2025_02_21
         */
        String ret =   service.getIndexPropertyType(   host,   port,
                username,   password,
                "interf_dand_comm_undef_2025_02_21","logObj.status"  );
        log.info("ret:{}",ret);
    }

6、阻塞线程直至索引就绪

    /**
     * 阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
     *
     * @throws IOException
     */
    @Test(  enabled = true )
    public void testWaitIndexReady() throws IOException, ParseException, InterruptedException {
        RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
        String time = "2025-02-21 20:00:00";
        rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );

        service.waitIndexReady( host,   port,
                username,   password,
                "interf_dand_comm_undef_2025_02_21"
        );
    }

7、创建索引别名

    /**
     * 创建索引别名
     *
     * @throws IOException
     */
    @Test(  enabled = true )
    public void testCreateAliases() throws IOException, ParseException {
        RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
        String time = "2025-02-21 20:00:00";
        rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );

        service.createAliases( username,   password,
                host,   port,
                "interf_dand-comm-undef.2025.02.21","interf_dand_comm_undef_2025_02_21"
        );
    }

8、SQL查询ES

    /**
     * sql查询
     *
     * @throws IOException
     */
    @Test(  enabled = true )
    public void testSQLRequest() throws IOException, ParseException, InterruptedException {
        log.info("实时从ES统计接口请求数量、成功率、延迟指标  (所有接口/不区分接口)服务级 :{}:{}", host, port );

        RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
        String time = "2025-02-21 20:00:00";
        rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );

        List<Object> ret=null;
        String method = "POST";
        String endpoint = "/_sql?format=json";

        //  根据实际时间字段格式调整
        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
        long cStart = rangeTimeUtils.getStartDate().getTime();
        long cEnd = rangeTimeUtils.getEndDate().getTime();

        // 原索引名是sql关键字,索引需要创建别名
        SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
        SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
        // interf_dand-comm-undef.2024.01.06
        String idxName = ESConsts.INTERFLOG_INDEX_NAME_PREFIX + sdf_source.format(rangeTimeUtils.getStartDate());
        // interf_dand_comm_undef_2024_01_06
        String idxAliases = ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX + sdf_target.format(rangeTimeUtils.getStartDate());

        //等待索引就绪
        service.waitIndexReady(host,   port,
                username,   password,
                idxName);

        // 判断别名是否存在,不存在则创建
        if(!service.isExistsAliases(  host,   port,
                username,   password,
                idxAliases)){
            service.createAliases(  username,   password,  host,   port,
                   idxName, idxAliases);
        }

        /*   logObj.status.keyword(对应String) 、还是 logObj.status(对应int ) 根据类型来  */
        String statusFieldName = getStatusFieldName(host,   port,
                username,   password,
                idxAliases );

        String sqlQuery2 = "SELECT logObj.province,server, logObj.app, '' node_ip, " +
                "  '' uri, count(id) as num, avg(logObj.cost) latency ,   " +
                "sum(iif("+statusFieldName+"=200,1,0))   as success_count " +
                " FROM " + idxAliases + " " +
                // 算头不算尾
                "where cTime >= ? and cTime < ? " +
                "group by logObj.province,server, logObj.app ";

        // max fetch_size is 65536 , 已验证一次查询结果不会超过这个数,故不用做滚动查询
        String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", " +
                "\"params\": ["+cStart+","+cEnd+"]," +
                " \"fetch_size\": 10 }"; // 65536 -- 不同的版本单次查询最大数据限制不一样,这里测试只查10条

        String body =null;
        try
        {
            body = service.request(    host,   port,
                    username,   password,
                    method,   endpoint,   jsonEntity );
            // 组装成实际需要的业务类型集合
//            ret = ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);
        }catch (Exception ex)
        {
            log.error("实时从ES统计接口请求数量、成功率、延迟指标  (所有接口)服务级 指标异常!message:{}",ex.getLocalizedMessage());
            log.error("sqlQuery2:{}", sqlQuery2 );
            log.error("jsonEntity:{}", jsonEntity);
            ex.printStackTrace();
        }
        log.debug(" body:{}", body );
    }

9、KQL查询ES

   /**
     * KQL查询
     *
     * @throws IOException
     */
    @Test(  enabled = true )
    public void testKQLRequest() throws IOException, ParseException, InterruptedException {
        log.info("实时从ES统计接口请求数量、成功率、延迟指标  (所有接口/不区分接口)服务级 :{}:{}", host, port );

        RangeTimeUtils rangeTimeUtils = new RangeTimeUtils();
        String time = "2025-02-21 20:00:00";
        rangeTimeUtils.calcAllByTimeAndPeriod( time, 1 );

        List<Object> ret=null;
        String method = "GET";
        String endpoint = "/interf_dand_comm_undef_2025_02_21/_search";

        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
        // 这里cStart、cEnd两者相关1分钟,在上面做了初始化
        long cStart = rangeTimeUtils.getStartDate().getTime();
        long cEnd = rangeTimeUtils.getEndDate().getTime();

        // 原索引名是sql关键字,索引需要创建别名
        SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
        SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
        // interf_dand-comm-undef.2024.01.06
        String idxName = ESConsts.INTERFLOG_INDEX_NAME_PREFIX + sdf_source.format(rangeTimeUtils.getStartDate());
        // interf_dand_comm_undef_2024_01_06
        String idxAliases = ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX + sdf_target.format(rangeTimeUtils.getStartDate());

        //等待索引就绪
        service.waitIndexReady(host,   port,
                username,   password,
                idxName);

        // 判断别名是否存在,不存在则创建
        if(!service.isExistsAliases(  host,   port,
                username,   password,
                idxAliases)){
            service.createAliases(  username,   password,  host,   port,
                    idxName, idxAliases);
        }

        String kqlQuery2 = "{ " +
                "    \"bool\": { " +
                "      \"must\": [ " +
                "        { \"match_phrase\": { \"logObj.app\":\"order-service\" } }, " +
                "        { \"range\": { " +
                "          \"cTime\": {  " +
                "            \"gte\": ?, " +
                "            \"lt\": ? " +
                "          } " +
                "        }} " +
                "      ] " +
                "    } " +
                "  }"; // 将index_name、field_name和value替换为相应的索引名称、字段名和值

        // max fetch_size is 65536 , 已验证一次查询结果不会超过这个数,故不用做滚动查询
        String jsonEntity = "{\"query\": \"" + kqlQuery2 + "\", " +
                "\"params\": ["+cStart+","+cEnd+"]," +
                " \"size\": 0,\" +\n" +
                " \"size\": 2 }"; // 65536 -- 不同的版本单次查询最大数据限制不一样, 这里测试只查两条

        String body =null;
        try
        {
            body = service.request(    host,   port,
                    username,   password,
                    method,   endpoint,   jsonEntity );
            // 组装成实际需要的业务类型集合
//            ret = ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);
        }catch (Exception ex)
        {
            log.error("实时从ES统计接口请求数量、成功率、延迟指标  (所有接口)服务级 指标异常!message:{}",ex.getLocalizedMessage());
            log.error("kqlQuery2:{}", kqlQuery2 );
            log.error("jsonEntity:{}", jsonEntity);
            ex.printStackTrace();
        }
        log.debug(" body:{}", body );
    }

完整单元测试实现

完整单元测试类

package person.brickman.es;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.testng.AbstractTestNGSpringContextTests;
import org.testng.annotations.Test;
import person.brickman.MainApplication;
import person.brickman.constant.ESConsts;
import person.brickman.util.RangeTime;
import person.brickman.util.TimeUtils;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.List;

/**
 *   单元测试类
 *
 *   1、RestClientBuilder初始化(同时支持单机与集群),因每个单元测试方法都会调,所以没有写独立的单元测试方法
 *   2、发送ES查询请求公共方法封装(支持sql、kql、代理访问、集群访问、鉴权),单元测试以SQL查询举例,参数直接拼的(使用占位符示例见:9、10)
 *   3、判断ES索引是否存在(/_cat/indices/${indexName})
 *   4、判断ES索引别名是否存在 (/_cat/aliases/${indexName})
 *   5、判断ES索引指定字段/属性是否存在(这里字段支持多级,如:logObj.id)
 *   6、判断ES索引指定字段/属性的类型(字段支持多级)
 *   7、阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
 *   8、创建索引别名(可用于支持sql查询,索引名中有特殊字符不能用作表名,可通过创建别名来解决)
 *   9、SQL查询ES
 *   10、KQL查询ES
 *
 * @Author:         brickman
 * @CreateDate:     2025/2/21 22:14
 * @Version:        1.0
 */
@Slf4j
@SpringBootTest(classes = MainApplication.class)
public class ESRestClientServiceImplTest extends AbstractTestNGSpringContextTests {

    @Value("${elasticsearch.dand.interf.hosts}")
    private String host;
    @Value("${elasticsearch.dand.interf.port}")
    private int port;
    @Value("${elasticsearch.dand.interf.username}")
    private String username;
    @Value("${elasticsearch.dand.interf.password}")
    private String password;

    @Autowired
    private ESRestClientService service;

    /**
     * 执行检索(这里使用sql查询近一分钟的数据)
     *
     * @throws IOException
     * @throws ParseException
     */
    @Test(groups = "hlog", enabled = true )
    public void testRequest() throws IOException, ParseException {
        String method = "POST";
        String endpoint = "/_sql?format=json";

        RangeTime rangeTime = new RangeTime();
//        String time = "2025-02-21 17:48:00";
        String time = TimeUtils.calcWholeMinute();
        rangeTime.calcAllByTimeAndPeriod(  time, 1);
        //  "yyyy-MM-dd HH:mm:ss"  --> "yyyy-MM-dd'T'HH:mm"  根据实际时间字段格式调整
        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm" );
//        String cStart = sdf2.format(rangeTime.getStartDate());
//        String cEnd = sdf2.format(rangeTime.getEndDate());
        Long cStart = rangeTime.getStartDate().getTime();
        Long cEnd = rangeTime.getStartDate().getTime();

        // 原索引名是sql关键字,索引需要创建别名
        SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
        SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
//        // hlog-crm3-hig-c-trans-2025.02.21 --》 hlog-crm3-comm-undef.2025.02.21
//        String idxName = "hlog-crm3-hig-c-trans-"+sdf_source.format(rangeTime.getStartDate());
        // hlog_crm3_hig_c_trans_2025_02_21 --》 hlog_crm3_comm_undef_2025_02_21
        String idxAliases = "hlog_crm3_comm_undef_"+sdf_target.format(rangeTime.getEndDate());

        // 生产环境
        String sqlQuery2 = "SELECT logObj.province,server, logObj.app, logObj.node_ip, " +
                "SUBSTRING(logObj.uri,0,30) , count(id) as num, avg(logObj.costTime) latency ,   " +
                "sum(iif(logObj.code=200,1,0))   as success_count " +
                " FROM " + idxAliases + " "+
//                "where cTime between '"+cStart+":00+0800' and '"+cEnd+":00+0800' " +
                " where cTime between "+cStart+" and "+cEnd+" " +
                "group by logObj.province,server, logObj.app, logObj.node_ip,  SUBSTRING(logObj.uri,0,30)  "; // 将index_name、field_name和value替换为相应的索引名称、字段名和值

        // max fetch_size is 65536  不同的版本单次查询最大数据限制不一样,这里测试只查 5 条
        String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\",  \"fetch_size\": 5}";

        String ret =   service.request(  host,   port,
                username,   password,
                method,   endpoint,   jsonEntity );
        log.debug("ret:{}",ret);
    }

    /**
     * 判断索引是否存在
     * @throws IOException
     */
    @Test(groups = "hlog", enabled = true )
    public void testIsExistsIndex() throws IOException {
        /*  hlog_crm3_hig_c_trans_2024_12_03
            hlog_crm3_hig_c_trans_2025_02_20

            hlog_crm3_comm_undef_2024_12_03
            hlog_crm3_comm_undef_2025_02_20
         */
        boolean ret =   service.isExistsIndex( host, port,
                username,   password,
                "hlog_crm3_comm_undef_2025_02_21");
        log.info("ret:{}",ret);
    }

    /**
     * 判断索引别名是否存在
     * @throws IOException
     */
    @Test(groups = "hlog", enabled = true )
    public void testIsExistsAliases() throws IOException {
        /*  hlog_crm3_hig_c_trans_2025_02_21
            hlog_crm3_comm_undef_2025_02_21
         */
        boolean ret =   service.isExistsAliases( host, port,
                username,   password,
                "hlog_crm3_comm_undef_2025_02_21");
        log.info("ret:{}",ret);
    }

    /**
     * 判断索引字段时否存在
     * @throws IOException
     */
    @Test(groups = "hlog", enabled = true )
    public void testIsExistsProperty() throws IOException {
        // logObj.node_ip     logObj.status
        boolean ret =   service.isExistsProperty( host, port,
                username, password,
                "hlog_crm3_comm_undef_2025_02_21", "logObj.status");
        log.info("ret:{}",ret);
    }

    /**
     * 获取索引列类型
     *
     * @throws IOException
     */
    @Test( enabled = true )
    public void testGetIndexPropertyType() throws IOException {
        /*  hlog_crm3_hig_c_trans_2025_02_21
            hlog_crm3_comm_undef_2025_02_21
         */
        String ret =   service.getIndexPropertyType(   host,   port,
                username,   password,
                "hlog_crm3_comm_undef_2025_02_21","logObj.status"  );
        log.info("ret:{}",ret);
    }

    /**
     * 阻塞线程直至索引就绪(为了应对跨日时索引名短时间可能不存在的问题)
     *
     * @throws IOException
     */
    @Test(  enabled = true )
    public void testWaitIndexReady() throws IOException, ParseException, InterruptedException {
        RangeTime rangeTime = new RangeTime();
        String time = "2025-02-21 20:00:00";
        rangeTime.calcAllByTimeAndPeriod( time, 1 );

        service.waitIndexReady( host,   port,
                username,   password,
                "hlog_crm3_comm_undef_2025_02_21"
        );
    }

    /**
     * 创建索引别名
     *
     * @throws IOException
     */
    @Test(  enabled = true )
    public void testCreateAliases() throws IOException, ParseException {
        RangeTime rangeTime = new RangeTime();
        String time = "2025-02-21 20:00:00";
        rangeTime.calcAllByTimeAndPeriod( time, 1 );

        service.createAliases( username,   password,
                host,   port,
                "hlog-crm3-comm-undef.2025.02.21","hlog_crm3_comm_undef_2025_02_21"
        );
    }

    /**
     * sql查询
     *
     * @throws IOException
     */
    @Test(  enabled = true )
    public void testSQLRequest() throws IOException, ParseException, InterruptedException {
        log.info("实时从ES统计接口请求数量、成功率、延迟指标  (所有接口/不区分接口)服务级 :{}:{}", host, port );

        RangeTime rangeTime = new RangeTime();
        String time = "2025-02-21 20:00:00";
        rangeTime.calcAllByTimeAndPeriod( time, 1 );

        List<Object> ret=null;
        String method = "POST";
        String endpoint = "/_sql?format=json";

        //  根据实际时间字段格式调整
        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
        long cStart = rangeTime.getStartDate().getTime();
        long cEnd = rangeTime.getEndDate().getTime();

        // 原索引名是sql关键字,索引需要创建别名
        SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
        SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
        // hlog-crm3-comm-undef.2024.01.06
        String idxName = ESConsts.INTERFLOG_INDEX_NAME_PREFIX + sdf_source.format(rangeTime.getStartDate());
        // hlog_crm3_comm_undef_2024_01_06
        String idxAliases = ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX + sdf_target.format(rangeTime.getStartDate());

        //等待索引就绪
        service.waitIndexReady(host,   port,
                username,   password,
                idxName);

        // 判断别名是否存在,不存在则创建
        if(!service.isExistsAliases(  host,   port,
                username,   password,
                idxAliases)){
            service.createAliases(  username,   password,  host,   port,
                   idxName, idxAliases);
        }

        /*   logObj.status.keyword(对应String) 、还是 logObj.status(对应int ) 根据类型来  */
        String statusFieldName = getStatusFieldName(host,   port,
                username,   password,
                idxAliases );

        String sqlQuery2 = "SELECT logObj.province,server, logObj.app, '' node_ip, " +
                "  '' uri, count(id) as num, avg(logObj.cost) latency ,   " +
                "sum(iif("+statusFieldName+"=200,1,0))   as success_count " +
                " FROM " + idxAliases + " " +
                // 算头不算尾
                "where cTime >= ? and cTime < ? " +
                "group by logObj.province,server, logObj.app ";

        // max fetch_size is 65536 , 已验证一次查询结果不会超过这个数,故不用做滚动查询
        String jsonEntity = "{\"query\": \"" + sqlQuery2 + "\", " +
                "\"params\": ["+cStart+","+cEnd+"]," +
                " \"fetch_size\": 10 }"; // 65536 -- 不同的版本单次查询最大数据限制不一样,这里测试只查10条

        String body =null;
        try
        {
            body = service.request(    host,   port,
                    username,   password,
                    method,   endpoint,   jsonEntity );
            // 组装成实际需要的业务类型集合
//            ret = ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);
        }catch (Exception ex)
        {
            log.error("实时从ES统计接口请求数量、成功率、延迟指标  (所有接口)服务级 指标异常!message:{}",ex.getLocalizedMessage());
            log.error("sqlQuery2:{}", sqlQuery2 );
            log.error("jsonEntity:{}", jsonEntity);
            ex.printStackTrace();
        }
        log.debug(" body:{}", body );
    }
    /**
     * kql查询
     *
     * @throws IOException
     */
    @Test(  enabled = true )
    public void testKQLRequest() throws IOException, ParseException, InterruptedException {
        log.info("实时从ES统计接口请求数量、成功率、延迟指标  (所有接口/不区分接口)服务级 :{}:{}", host, port );

        RangeTime rangeTime = new RangeTime();
        String time = "2025-02-21 20:00:00";
        rangeTime.calcAllByTimeAndPeriod( time, 1 );

        List<Object> ret=null;
        String method = "GET";
        String endpoint = "/hlog_crm3_comm_undef_2025_02_21/_search";

        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
        // 这里cStart、cEnd两者相关1分钟,在上面做了初始化
        long cStart = rangeTime.getStartDate().getTime();
        long cEnd = rangeTime.getEndDate().getTime();

        // 原索引名是sql关键字,索引需要创建别名
        SimpleDateFormat sdf_source = new SimpleDateFormat("yyyy.MM.dd" );
        SimpleDateFormat sdf_target = new SimpleDateFormat("yyyy_MM_dd" );
        // hlog-crm3-comm-undef.2024.01.06
        String idxName = ESConsts.INTERFLOG_INDEX_NAME_PREFIX + sdf_source.format(rangeTime.getStartDate());
        // hlog_crm3_comm_undef_2024_01_06
        String idxAliases = ESConsts.INTERFLOG_INDEX_ALIASES_PREFIX + sdf_target.format(rangeTime.getStartDate());

        //等待索引就绪
        service.waitIndexReady(host,   port,
                username,   password,
                idxName);

        // 判断别名是否存在,不存在则创建
        if(!service.isExistsAliases(  host,   port,
                username,   password,
                idxAliases)){
            service.createAliases(  username,   password,  host,   port,
                    idxName, idxAliases);
        }

        String kqlQuery2 = "{ " +
                "    \"bool\": { " +
                "      \"must\": [ " +
                "        { \"match_phrase\": { \"logObj.app\":\"inst-service\" } }, " +
                "        { \"range\": { " +
                "          \"cTime\": {  " +
                "            \"gte\": ?, " +
                "            \"lt\": ? " +
                "          } " +
                "        }} " +
                "      ] " +
                "    } " +
                "  }"; // 将index_name、field_name和value替换为相应的索引名称、字段名和值

        // max fetch_size is 65536 , 已验证一次查询结果不会超过这个数,故不用做滚动查询
        String jsonEntity = "{\"query\": \"" + kqlQuery2 + "\", " +
                "\"params\": ["+cStart+","+cEnd+"]," +
                " \"size\": 0,\" +\n" +
                " \"size\": 2 }"; // 65536 -- 不同的版本单次查询最大数据限制不一样, 这里测试只查两条

        String body =null;
        try
        {
            body = service.request(    host,   port,
                    username,   password,
                    method,   endpoint,   jsonEntity );
            // 组装成实际需要的业务类型集合
//            ret = ESIntfLoad3Parser.parseESresult4SvcLoad( body, IDC, host ,port, rangeTime);
        }catch (Exception ex)
        {
            log.error("实时从ES统计接口请求数量、成功率、延迟指标  (所有接口)服务级 指标异常!message:{}",ex.getLocalizedMessage());
            log.error("kqlQuery2:{}", kqlQuery2 );
            log.error("jsonEntity:{}", jsonEntity);
            ex.printStackTrace();
        }
        log.debug(" body:{}", body );
    }

    /**
     *  查询索引 logObj.status 字段类型
     *    如果是 text 则使用 logObj.status.keyword
     *    如果是 long 则使用 logObj.status
     *
     * @Author:         brickman
     * @CreateDate:     2025-02-21 17:48:00
     * @Version:        1.0
     */
    private String getStatusFieldName(String host,   int port,
                                      String username,   String password,
                                      String idxAliases ) throws IOException {
        // logObj.status 字段 default: long
        String statusFieldName = "logObj.status";

        // 查询索引 logObj.status 字段类型
        if("text".equalsIgnoreCase( service.getIndexPropertyType(  host,   port,
                username,   password,
                idxAliases, "logObj.status" ) )){
            statusFieldName = "logObj.status.keyword";
        }
        return statusFieldName;
    }
}

单元测试效果

 作者使用的testng,配置中心nacos

常量类

package person.brickman.constant;

/**
 * @Description:   ES 常量类
 * @Author:         brickman
 * @CreateDate:     2025/2/21 22:05
 * @Version:        1.0
 */
public class ESConsts {

    /**  interf 索引名常量  */
    public static final String INTERFLOG_INDEX_NAME_PREFIX = "interf-log-dand-comm-undef.";
    /**  interf 索引别名常量  */
    public static final String INTERFLOG_INDEX_ALIASES_PREFIX = "interf_log_dand_comm_undef_";

    /**  csb 索引名常量  */
    public static final String CSB_INDEX_NAME_PREFIX = "csb-service.csb.";
    /**  csb 索引别名常量  */
    public static final String CSB_INDEX_ALIASES_PREFIX = "csb_service_csb_";

    /**  skywalking 索引名常量  */
    public static final String SW_INDEX_NAME_PREFIX = "sw_metrics-all-";
    /**  skywalking 索引别名常量  */
    public static final String SW_INDEX_ALIASES_PREFIX = "sw_metrics_all_";
}

工具类

RangeTimeUtils

用于通过指定时间(如当前时间)生成开始时间、截止时间、采集周期、采集时间(跨度)范围、指标采集时间

package person.brickman.util;

import lombok.Data;
import org.apache.commons.lang3.time.DateUtils;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Description:
 *
 * s_start    String(19)	开始时间	因为是时间段的数据,所以增加4个字段
 * s_end	String(19)	截止时间
 * period	smallint	采集周期	多久采一次。默认1,单位分钟,与范围不同
 * n_range	smallint	采集时间(跨度)范围	采多长时间的数据。默认1,单位分钟 eg:1min
 * time	String(19)	指标采集时间	冗余字段,yyyy-MM-dd HH:mm:ss
 * 部分时序数据库以long形式显示timestamp,此字段便于查看
 *
 * @Author: brickman
 * @CreateDate: 2025/02/21 9:03 PM
 * @Version: 1.0
 */
@Data
public class RangeTimeUtils {

    /** 开始时间 yyyy-MM-dd HH:mm:ss   */
    private String cStart;
    /** 截止时间 yyyy-MM-dd HH:mm:ss */
    private String cEnd;
    /** 采集周期	多久采一次。默认1,单位分钟,与范围不同  */
    private int period;

    /**
     * 采集时间(跨度)范围
     * @deprecated  一般与采集周期一致
     * */
    private int nRange;

    /**
     * 采集时间  yyyy-MM-dd HH:mm:ss
     * eg: 2025-02-21 17:48:00
     */
    private String time;

    /**
     * 通过时间和周期计算所有字段值
     *   默认偏移1分钟,因之前调的默认方法且需要偏移,所有遵循开闭原则
     * @param time eg: 2025-02-21 17:48:00
     * @param period 单位:分钟
     * @return void
     **/
    public void calcAllByTimeAndPeriod( String time, int period ) throws ParseException {
        calcAllByTimeAndPeriod( time, period, -1 );
    }

    /**
     * @param shifting 波动时间、偏移时间
     * @return null
     **/
    public void calcAllByTimeAndPeriod( String time, int period, int shifting ) throws ParseException {
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date dt = sdf1.parse(time);// dt:date time

        SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm");
        Date de = sdf2.parse(sdf2.format(dt));//de:date end
        Date ds = DateUtils.addMinutes(de, -period ); // ds: date start

        /*   */
        ds = DateUtils.addMinutes(ds, shifting);
        de = DateUtils.addMinutes(de, shifting);

        this.cStart = sdf1.format(ds);
        this.cEnd = sdf1.format(de);
        this.period=period;
        // 采集时间(跨度)范围 与 采集周期一致
        this.nRange = this.period;
        this.time=time;
    }

    public Date getStartDate() throws ParseException {
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf1.parse(this.cStart);// dt:date time
    }

    public Date getEndDate() throws ParseException {
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf1.parse(this.cEnd);// dt:date time
    }

    public Date getGraphQLStartDate() throws ParseException {
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf1.parse(this.cStart);// dt:date time
    }

    public Date getGraphQLEndDate() throws ParseException {
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf1.parse(this.cEnd);// dt:date time
    }

    public String getGraphQLStart() throws ParseException {
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HHmm");
        return sdf1.format(getGraphQLStartDate());// dt:date time
    }

    public String getGraphQLEnd() throws ParseException {
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd HHmm");
        return sdf1.format(getGraphQLEndDate());// dt:date time
    }
}

TimeUtils

 取当前时间精确到分(取整分)

package person.brickman.util;

import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @Description:    时间工具类
 *     取当前时间精确到分(取整分)
 *
 * @Author:         brickman
 * @CreateDate:     2025/2/21 22:09
 * @Version:        1.0
 */
public class TimeUtils {
    /**  取当前时间精确到分(取整分),可优化为取数据库时间   **/
    public static String calcWholeMinute(){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
        String time = sdf.format(new Date());
        System.out.println("#### WholeMinuteTime:"+time+":00");
        return time+":00";
    }

    /**  根椐入参取整分   **/
    public static String calcWholeMinute( Date date ){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm");
        String time = sdf.format( date );
        System.out.println("#### WholeMinuteTime:"+time+":00");
        return time+":00";
    }
}

总结

1、ElasticSearch公共方法封装能降低开发成本、提高开发效率

2、非常通用的方法,如查Skywalking的索引数据、查自研的接口调用日志数据等

3、说本文将ES的日常开发代码一网代尽都不为过

附件一:ElasticSearch介绍

ElasticSearch介绍

在Elasticsearch(简称ES)中,它是一个基于Apache Lucene构建的开源、分布式、RESTful搜索引擎,旨在实时地存储、搜索和分析大量数据。Elasticsearch广泛应用于日志分析、全文搜索、实时分析等场景。下面我将介绍Elasticsearch的一些基本概念和功能:

1. 基本概念

  • 索引(Index):在Elasticsearch中,索引类似于传统关系数据库中的“数据库”。它是存储相关文档(数据)的地方。

  • 类型(Type):在Elasticsearch 7.x及以前版本中,每个索引可以包含多个类型。但从Elasticsearch 7.x开始,一个索引中只能有一个类型(默认为_doc),这一改动主要是因为Elasticsearch 8.x将完全废弃类型功能。

  • 文档(Document):文档是Elasticsearch中最小的数据单元,可以是JSON格式的数据。每个文档都有一个唯一的ID。

  • 字段(Field):文档由一个或多个字段组成,每个字段都有一个名称和一个值。

  • 映射(Mapping):映射定义了索引中文档的结构,包括字段的类型、是否索引、是否存储等属性。

2. 核心功能

  • 全文搜索:Elasticsearch提供了强大的全文搜索能力,支持模糊搜索、范围查询等。

  • 实时性:数据输入Elasticsearch后即可被搜索,具有很高的实时性。

  • 分布式特性:Elasticsearch可以分布式部署在多台服务器上,实现数据的分布式存储和查询,提高了系统的可扩展性和可靠性。

  • RESTful API:通过RESTful API可以方便地对Elasticsearch进行索引的创建、文档的增删改查等操作。

  • 聚合(Aggregations):聚合允许你对数据进行复杂的分析,如分组统计、计算平均值、求和等。

  • 多租户(Multi-tenancy):支持多租户模式,可以轻松地管理和隔离不同客户或项目的数据。

结语

Elasticsearch是一个功能强大且灵活的搜索引擎,适用于各种需要快速检索大量数据的场景。通过了解其基本概念和核心功能,你可以开始构建自己的搜索解决方案。随着不断学习和实践,你将能够充分利用Elasticsearch的潜力。


http://www.niftyadmin.cn/n/5861848.html

相关文章

AI基础:数据可视化简易入门(Matplotlib和Seaborn)

Matplotlib是一个Python的2D绘图库&#xff0c;它以各种硬拷贝和跨平台的交互式环境生成出版质量级别的图形。 Seaborn是基于Python且非常受欢迎的图形可视化库&#xff0c;在Matplotlib的基础上进行了更高级别的封装&#xff0c;使作图更加方便快捷。 1 Matplotlib 1.1 通过…

微信问题总结(onpageshow ,popstate事件)

此坑描述 订单详情某按钮点击&#xff0c;通过window.location.href跳转到&#xff08;外部&#xff09;第三方链接后&#xff0c;回退后&#xff0c;在ios中生命周期和路由导航钩子都失效了&#xff0c;无法触发。 在安卓中无视此坑&#xff0c; 回退没有问题 解决 原因&am…

MySQL MHA 部署全攻略:从零搭建高可用数据库架构

文章目录 1.MHA介绍2.MHA组件介绍3.集群规划4.服务器初始化5.MySQL集群部署5.1 安装MySQL集群5.2 配置一主两从5.3 测试MySQL主从5.4 赋予MHA用户连接权限 6.安装MHA环境6.1 安装MHA Node6.2 安装MHA Manager 7.配置MHA环境8.MySQL MHA高可用集群测试8.1 通过VIP连接MySQL8.2模…

精准测量PMD:OCI-V光矢量分析系统赋能光纤通信性能优化

在光纤通信技术飞速发展的今天&#xff0c;偏振模色散&#xff08;PMD&#xff09;已成为制约系统性能的核心瓶颈之一。PMD会导致信号失真、码间串扰&#xff0c;并限制传输距离&#xff0c;严重影响系统的带宽容量和传输可靠性。因此&#xff0c;精准测量PMD对于优化光纤通信系…

kafka-集群缩容

一. 简述&#xff1a; 当业务增加时&#xff0c;服务瓶颈&#xff0c;我们需要进行扩容。当业务量下降时&#xff0c;为成本考虑。自然也会涉及到缩容。假设集群有 15 台机器&#xff0c;预计缩到 10 台机器&#xff0c;那么需要做 5 次缩容操作&#xff0c;每次将一个节点下线…

Ubuntu 22.04 Install deepseek

前言 deepseekAI助手。它具有聊天机器人功能&#xff0c;可以与用户进行自然语言交互&#xff0c;回答问题、提供建议和帮助解决问题。DeepSeek 的特点包括&#xff1a; 强大的语言理解能力&#xff1a;能够理解和生成自然语言&#xff0c;与用户进行流畅的对话。多领域知识&…

使用Open WebUI下载的模型文件(Model)默认存放在哪里?

&#x1f3e1;作者主页&#xff1a;点击&#xff01; &#x1f916;Ollama部署LLM专栏&#xff1a;点击&#xff01; ⏰️创作时间&#xff1a;2025年2月21日21点21分 &#x1f004;️文章质量&#xff1a;95分 文章目录 使用CMD安装存放位置 默认存放路径 Open WebUI下…

Android Studio SVN下载安装

原文链接&#xff1a;SVN简介和安装 步骤1&#xff1a;安装 SVN 插件 首先&#xff0c;你需要在 Android Studio 中安装 SVN 插件。你可以按照以下步骤进行操作&#xff1a; 打开 Android Studio点击 “File” 菜单&#xff0c;选择 “Settings”在 “Settings” 窗口中&…