Skip to content

Es SearchResponse中聚合结果的解析

Egbert-Liu edited this page Dec 24, 2020 · 4 revisions

解析如下

package com.huatai.ueba.engine.es;

import com.alibaba.fastjson.JSON;
import com.huatai.ueba.engine.EngineApp;
import com.huatai.ueba.engine.io.elasticsearch.EsSearchUtil;
import com.huatai.ueba.engine.io.elasticsearch.sql.es4sql.exception.SqlParseException;
import org.elasticsearch.action.search.SearchResponse;

import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.ParsedAggregation;
import org.elasticsearch.search.aggregations.ParsedMultiBucketAggregation;
import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.histogram.ParsedDateHistogram;
import org.elasticsearch.search.aggregations.bucket.terms.*;
import org.elasticsearch.search.aggregations.metrics.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.*;

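/**
 * The SQL statement is parsed into a SearchRequest object, which is then used to run the query.
 * This class only deals with parsing Aggregation results into one uniform row format.
 * An exception is thrown for any aggregation type that is not handled, so when a new
 * unhandled type shows up, simply add another else-if branch that converts it.
 */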
@SpringBootTest(classes = EngineApp.class)
@RunWith(SpringRunner.class)
public class TestSearchResponseParse {

    /**
     * Runs one aggregation SQL statement through {@code EsSearchUtil.getResponse} and prints
     * the flattened rows twice: first via {@link #covenValue(Aggregation)}, then via
     * {@link #aggParse(Aggregation)}, so both outputs can be compared on the console.
     */
    @Test
    public void testParseSearchResponse() throws Exception {
        // Sample statements; only groupWithTimestampSql is executed below, the others are
        // kept as ready-made alternatives to swap in.
        String groupSql = "select operator_no,count(*),client_id  from ueba-event-202011 group by operator_no.keyword,client_id.keyword";

        String groupWithTimestampSql = "select operator_no,count(*),client_id,@timestamp  from ueba-event-202011 group by operator_no.keyword,client_id.keyword,@timestamp";

        String statsSql = "select sum(age),avg(age),min(age),max(age),class,name from ueba-event-test group by class.keyword,name.keyword";

        // The result of a combined "stats" metric aggregation cannot be parsed yet, so it is not exercised for now.
        //String sql_stats_agg = "select stats,class from ueba-event-test group by class.keyword,name.keyword,stats(age)";

        String statsKeywordSql = "select sum(age),avg(age),min(age),max(age),class,name from ueba-event-test group by class.keyword,name.keyword";

        String statsTextSql = "select sum(age),avg(age),min(age),max(age),class,name from ueba-event-test group by class.keyword,name.keyword";

        // Date histogram aggregation combined with grouping by class.
        String groupByDateSql = "select sum(age),avg(age) from ueba-event-test group by class.keyword,date_histogram(field='create_time','time_zone'='+08:00','interval'='1d','alias'='yourAlias','format'='yyyy-MM-dd')";

        SearchResponse response = EsSearchUtil.getResponse(groupWithTimestampSql);
        Aggregations aggs = response.getAggregations();

        // First pass: type-specific parser.
        for (Aggregation topLevel : aggs) {
            dumpRows(covenValue(topLevel));
        }

        System.out.println("==========================在一起==============================");

        // Second pass: interface-based parser.
        for (Aggregation topLevel : aggs) {
            dumpRows(aggParse(topLevel));
        }
    }

    /**
     * Prints every row as {@code key==value|||} cells on one line, followed by a separator line.
     *
     * @param rows flattened aggregation rows to print
     */
    private void dumpRows(List<Map<String,Object>> rows) {
        for (Map<String,Object> row : rows) {
            for (Map.Entry<String,Object> cell : row.entrySet()) {
                System.out.print(cell.getKey() + "==" + cell.getValue() + "|||");
            }
            System.out.println("");
        }
        System.out.println("===================");
    }

    /**
     * Converts one aggregation result into a flat list of rows, each row being a
     * {@code Map<String,Object>}.
     *
     * <p>Metric aggregations produce exactly one row keyed by the aggregation name.
     * Any {@link MultiBucketsAggregation} (terms, histograms, ...) produces the rows that
     * {@link #commonTransform} appends for each of its buckets; sub-aggregations are
     * handled recursively through that method.
     *
     * @param value aggregation result to convert
     * @return the flattened rows; empty when a multi-bucket aggregation has no buckets
     * @throws SqlParseException if {@code value} is {@code null} or of a type that is not handled
     */
    public List<Map<String,Object>> covenValue(Aggregation value) throws SqlParseException {
        if (value == null) {
            // instanceof is false for null, and value.getClass() in the fallback would NPE.
            throw new SqlParseException("unknow this agg type null");
        }

        List<Map<String,Object>> list = new ArrayList<>();

        if (value instanceof MultiBucketsAggregation) {
            // Go through the common multi-bucket interface so that every bucket aggregation
            // (parsed or internal, terms or histogram) is accepted, not just a fixed few.
            for (MultiBucketsAggregation.Bucket bucket : ((MultiBucketsAggregation) value).getBuckets()) {
                commonTransform(bucket, list, value);
            }
            return list;
        }

        Map<String,Object> map = new HashMap<>();
        if (value instanceof InternalValueCount) {
            // Must be tested before SingleValue: InternalValueCount is itself an
            // InternalNumericMetricsAggregation.SingleValue, so the other order makes
            // this branch unreachable.
            map.put(value.getName(), ((InternalValueCount) value).getValue());
            map.put("type", "InternalValueCount");
        } else if (value instanceof InternalNumericMetricsAggregation.SingleValue) {
            map.put(value.getName(), ((InternalNumericMetricsAggregation.SingleValue) value).value());
            map.put("type", "InternalNumericMetricsAggregation.SingleValue");
        } else if (value instanceof InternalTopHits) {
            // top hits: expose the raw SearchHits under a fixed key
            SearchHits hits = ((InternalTopHits) value).getHits();
            map.put(value.getName(), "InternalTopHits");
            map.put("InternalTopHits", hits);
        } else if (value instanceof ParsedValueCount) {
            // value count coming back in parsed form
            map.put(value.getName(), ((ParsedValueCount) value).getValueAsString());
        } else if (value instanceof ParsedSingleValueNumericMetricsAggregation) {
            // single-value metric (sum/avg/min/max ...) coming back in parsed form
            map.put(value.getName(), ((ParsedSingleValueNumericMetricsAggregation) value).getValueAsString());
        } else {
            throw new SqlParseException("unknow this agg type " + value.getClass());
        }
        list.add(map);
        return list;
    }


    /**
     * Flattens one bucket (and, recursively, its sub-aggregations) into rows and appends
     * them to {@code list}.
     *
     * <p>Both Terms and Histogram buckets implement
     * {@code org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket},
     * which is why this shared, recursive part is extracted here.
     *
     * <p>Each sub-aggregation is converted with {@link #covenValue(Aggregation)}. A metric
     * contributes a single row that is merged into every row of this bucket. A nested
     * multi-bucket aggregation contributes one row per sub-bucket, so the bucket is expanded
     * into one output row per sub-bucket combination instead of collapsing them into one map
     * (which would keep only the last sub-bucket).
     *
     * @param bucket bucket to flatten
     * @param list   receives the resulting rows
     * @param agg    aggregation owning {@code bucket}; its name becomes the key for the bucket key
     * @throws SqlParseException propagated from the recursive conversion of sub-aggregations
     */
    public void commonTransform(MultiBucketsAggregation.Bucket bucket, List<Map<String,Object>> list,Aggregation agg) throws SqlParseException {
        // Start with a single empty row; metrics enrich it, nested buckets multiply it.
        List<Map<String,Object>> rows = new ArrayList<>();
        rows.add(new HashMap<>());

        Aggregations aggregations = bucket.getAggregations();
        if (aggregations != null) {
            for (Aggregation aggregation : aggregations) {
                List<Map<String,Object>> subRows = covenValue(aggregation);
                if (subRows.isEmpty()) {
                    // Nested bucket aggregation without buckets: keep the parent row as is.
                    continue;
                }
                List<Map<String,Object>> expanded = new ArrayList<>(rows.size() * subRows.size());
                for (Map<String,Object> row : rows) {
                    for (Map<String,Object> subRow : subRows) {
                        Map<String,Object> merged = new HashMap<>(row);
                        merged.putAll(subRow);
                        expanded.add(merged);
                    }
                }
                rows = expanded;
            }
        }

        for (Map<String,Object> row : rows) {
            row.put(agg.getName(), bucket.getKeyAsString());
            // A row that came from a nested bucket already carries that bucket's own
            // (more specific) docCount; only fill it in when no inner bucket set it.
            row.putIfAbsent("docCount", bucket.getDocCount());
            list.add(row);
        }
    }


    /**
     * Converts one aggregation result into a flat list of rows, each row being a
     * {@code Map<String,Object>}.
     *
     * <p>Metric aggregations produce exactly one row keyed by the aggregation name.
     * Any {@link MultiBucketsAggregation} produces the rows that {@link #commonTransform2}
     * appends for each of its buckets; sub-aggregations are handled recursively through
     * that method.
     *
     * @param value aggregation result to convert
     * @return the flattened rows; empty when a multi-bucket aggregation has no buckets
     * @throws SqlParseException if {@code value} is {@code null} or of a type that is not handled
     */
    public List<Map<String,Object>> aggParse(Aggregation value) throws SqlParseException {
        if (value == null) {
            // instanceof is false for null, and value.getClass() in the fallback would NPE.
            throw new SqlParseException("unknow this agg type null");
        }

        List<Map<String,Object>> list = new ArrayList<>();

        if (value instanceof MultiBucketsAggregation) {
            // Test and cast against the top-level multi-bucket interface: this accepts both
            // the Parsed* and the Internal* bucket aggregations and cannot raise a
            // ClassCastException for any of them.
            for (MultiBucketsAggregation.Bucket bucket : ((MultiBucketsAggregation) value).getBuckets()) {
                commonTransform2(bucket, list, value);
            }
            return list;
        }

        Map<String,Object> map = new HashMap<>();
        if (value instanceof InternalValueCount) {
            // Must be tested before SingleValue: InternalValueCount is itself an
            // InternalNumericMetricsAggregation.SingleValue, so the other order makes
            // this branch unreachable.
            map.put(value.getName(), ((InternalValueCount) value).getValue());
            map.put("type", "InternalValueCount");
        } else if (value instanceof InternalNumericMetricsAggregation.SingleValue) {
            map.put(value.getName(), ((InternalNumericMetricsAggregation.SingleValue) value).value());
            map.put("type", "InternalNumericMetricsAggregation.SingleValue");
        } else if (value instanceof InternalTopHits) {
            // top hits: expose the raw SearchHits under a fixed key
            SearchHits hits = ((InternalTopHits) value).getHits();
            map.put(value.getName(), "InternalTopHits");
            map.put("InternalTopHits", hits);
        } else if (value instanceof ParsedValueCount) {
            // value count coming back in parsed form
            map.put(value.getName(), ((ParsedValueCount) value).getValueAsString());
        } else if (value instanceof ParsedSingleValueNumericMetricsAggregation) {
            // single-value metric (sum/avg/min/max ...) coming back in parsed form
            map.put(value.getName(), ((ParsedSingleValueNumericMetricsAggregation) value).getValueAsString());
        } else {
            throw new SqlParseException("unknow this agg type " + value.getClass());
        }
        list.add(map);
        return list;
    }

    /**
     * Flattens one bucket (and, recursively, its sub-aggregations) into rows and appends
     * them to {@code list}.
     *
     * <p>Both Terms and Histogram buckets implement
     * {@code org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation.Bucket},
     * which is why this shared, recursive part is extracted here.
     *
     * <p>Each sub-aggregation is converted with {@link #aggParse(Aggregation)}. A metric
     * contributes a single row that is merged into every row of this bucket. A nested
     * multi-bucket aggregation contributes one row per sub-bucket, so the bucket is expanded
     * into one output row per sub-bucket combination instead of collapsing them into one map
     * (which would keep only the last sub-bucket).
     *
     * @param bucket bucket to flatten
     * @param list   receives the resulting rows
     * @param agg    aggregation owning {@code bucket}; its name becomes the key for the bucket key
     * @throws SqlParseException propagated from the recursive conversion of sub-aggregations
     */
    public void commonTransform2(MultiBucketsAggregation.Bucket bucket, List<Map<String,Object>> list,Aggregation agg) throws SqlParseException {
        // Start with a single empty row; metrics enrich it, nested buckets multiply it.
        List<Map<String,Object>> rows = new ArrayList<>();
        rows.add(new HashMap<>());

        Aggregations aggregations = bucket.getAggregations();
        if (aggregations != null) {
            for (Aggregation aggregation : aggregations) {
                List<Map<String,Object>> subRows = aggParse(aggregation);
                if (subRows.isEmpty()) {
                    // Nested bucket aggregation without buckets: keep the parent row as is.
                    continue;
                }
                List<Map<String,Object>> expanded = new ArrayList<>(rows.size() * subRows.size());
                for (Map<String,Object> row : rows) {
                    for (Map<String,Object> subRow : subRows) {
                        Map<String,Object> merged = new HashMap<>(row);
                        merged.putAll(subRow);
                        expanded.add(merged);
                    }
                }
                rows = expanded;
            }
        }

        for (Map<String,Object> row : rows) {
            row.put(agg.getName(), bucket.getKeyAsString());
            // A row that came from a nested bucket already carries that bucket's own
            // (more specific) docCount; only fill it in when no inner bucket set it.
            row.putIfAbsent("docCount", bucket.getDocCount());
            list.add(row);
        }
    }


    /**
     * Runs a plain (non-aggregating) query and prints the JSON source of every returned hit.
     */
    @Test
    public void testGet() throws Exception {
        SearchResponse response = EsSearchUtil.getResponse("select * from ueba-event-test limit 30");

        SearchHits hits = response.getHits();
        hits.forEach(hit -> {
            String source = hit.getSourceAsString();
            System.out.println(source);
        });
    }
}