
Statistical analysis with multi-search (Java)

elasticsearch dewbay (2019-05-08)

1. Overall flow

    public Map<String, Long> statOneField(String aggrField) {
        long sBuildQuery = System.currentTimeMillis();
        List<List<String>> stationGroups = splitGroup();
        // Build one search request per station group
        MultiSearchRequestBuilder requestBuilder = client.prepareMultiSearch();
        for (List<String> stationGroup : stationGroups) {
            SearchRequestBuilder srb = buildQuery(stationGroup, baseStationField, aggrField);
            requestBuilder.add(srb);
        }
        long eBuildQuery = System.currentTimeMillis();
        System.out.println("Query build time (ms): " + (eBuildQuery - sBuildQuery));

        // Execute all searches in a single multi-search request
        MultiSearchResponse sr = requestBuilder.execute().actionGet();
        long eExecuteQuery = System.currentTimeMillis();
        System.out.println("Query execution time (ms): " + (eExecuteQuery - eBuildQuery));

        // Merge the per-group aggregation results.
        // nbHits and nTook are assumed to be fields declared elsewhere in the class.
        Map<String, Long> totalAggr = new TreeMap<>();
        for (MultiSearchResponse.Item item : sr.getResponses()) {
            if (item.isFailure()) {
                // A failed item carries no response; skip it to avoid a NullPointerException
                System.out.println(item.getFailureMessage());
                continue;
            }
            SearchResponse response = item.getResponse();
            nbHits += response.getHits().getTotalHits();
            nTook += response.getTook().getMillis();
            Map<String, Long> perAggr = parseResponse(response);
            add(totalAggr, perAggr);
        }
        long eAggrQuery = System.currentTimeMillis();
        System.out.println("Result merge time (ms): " + (eAggrQuery - eExecuteQuery));

        return totalAggr;
    }
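
The `add(totalAggr, perAggr)` helper is called above but not shown in the post. A minimal sketch of what it might look like, assuming it simply sums the counts of keys that appear in both maps (the class name `CountMerger` is hypothetical, and `Map.of` needs Java 9 or later):

```java
import java.util.Map;
import java.util.TreeMap;

final class CountMerger {
    /** Adds every count in part to total, summing the counts of keys present in both maps. */
    static void add(Map<String, Long> total, Map<String, Long> part) {
        for (Map.Entry<String, Long> e : part.entrySet()) {
            // merge() inserts the value if the key is absent, otherwise applies Long::sum
            total.merge(e.getKey(), e.getValue(), Long::sum);
        }
    }

    public static void main(String[] args) {
        Map<String, Long> total = new TreeMap<>();
        add(total, Map.of("a", 2L, "b", 1L));
        add(total, Map.of("a", 3L, "c", 5L));
        System.out.println(total); // prints {a=5, b=1, c=5}
    }
}
```

Note that each partial map only holds the top `TelephoneConf.TOP_SIZE` terms of its group, so a term that falls outside the top list in some groups may be undercounted in the merged total.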

2. Build a single query; each query contains an aggregation

    SearchRequestBuilder buildQuery(List<String> stationGroup, String field, String aggrField) {
        assert stationGroup != null && !stationGroup.isEmpty();

        // Size 0: return no hits, only the aggregation result
        final int SEARCH_SIZE = 0;
        SearchRequestBuilder srb = client
            .prepareSearch()
            .setQuery(QueryBuilders.termsQuery(field, stationGroup))
            .setSize(SEARCH_SIZE)
            // Terms aggregation: the top TelephoneConf.TOP_SIZE values of aggrField
            .addAggregation(AggregationBuilders.terms(TOP_NAME).field(aggrField).size(TelephoneConf.TOP_SIZE));

        return srb;
    }

3. Process the aggregation result

    Map<String, Long> parseResponse(SearchResponse response) {
        Map<String, Long> ir = new TreeMap<>();
        // Look up the terms aggregation by the name used when building the query
        Terms agg = response.getAggregations().get(TOP_NAME);
        for (Terms.Bucket entry : agg.getBuckets()) {
            // One bucket per distinct term: key -> document count
            Object key = entry.getKey();
            long count = entry.getDocCount();
            ir.put(key.toString(), count);
        }

        return ir;
    }

4. Sort a map by value

    public static <K, V extends Comparable<? super V>> Map<K, V> sortMapByValue( Map<K, V> map, int topN )
    {
        // ArrayList rather than LinkedList: list.get(i) below is O(1) instead of O(n)
        List<Map.Entry<K, V>> list = new ArrayList<Map.Entry<K, V>>( map.entrySet() );
        // Sort in descending order of value
        Collections.sort( list, new Comparator<Map.Entry<K, V>>()
        {
            public int compare( Map.Entry<K, V> o1, Map.Entry<K, V> o2 )
            {
                return (o2.getValue()).compareTo( o1.getValue() );
            }
        } );

        int size = Math.min( topN, list.size() );

        // LinkedHashMap preserves the sorted insertion order
        Map<K, V> result = new LinkedHashMap<K, V>();
        for (int i = 0; i < size; i++) {
            Map.Entry<K, V> tmpEntry = list.get(i);
            result.put(tmpEntry.getKey(), tmpEntry.getValue());
        }

        return result;
    }
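
On Java 8 or later, the same top-N-by-value selection can also be written with streams. This is an alternative sketch, not the post's original method; the class name `TopNByValue` and method name `topN` are hypothetical, and `Map.of` in the demo needs Java 9 or later:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

final class TopNByValue {
    /** Returns the topN entries with the largest values, in descending order of value. */
    static <K, V extends Comparable<? super V>> Map<K, V> topN(Map<K, V> map, int topN) {
        return map.entrySet().stream()
                // Natural order of values, reversed to get largest first
                .sorted(Map.Entry.<K, V>comparingByValue().reversed())
                .limit(topN)
                // Collect into a LinkedHashMap so the sorted order is preserved
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = Map.of("a", 3L, "b", 7L, "c", 5L);
        System.out.println(topN(counts, 2)); // prints {b=7, c=5}
    }
}
```

Unlike the loop version, `limit` throws an `IllegalArgumentException` for a negative `topN`, and `Collectors.toMap` rejects null values, which should not matter for document counts.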

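Problem:
If too many searches are passed in one multi-search request, the search thread pool's queue can fill up and requests are rejected with an EsRejectedExecutionException ("EsRejectedExecutionException in elasticsearch for parallel search").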

            // Fragment from inside the loop over sr.getResponses()
            SearchResponse response = item.getResponse();
            if (response == null) {
                // The item failed: print the cause, then skip it so the check below cannot throw
                System.out.println(item.getFailure().getCause());
                System.out.println("null *************************");
                System.out.println(item.getFailureMessage());
                continue;
            }

            if (response.getHits() == null) {
                System.out.println("null 2 *************************");
                System.out.println(item.getFailureMessage());
            }

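Once the exception occurs, the response of the affected item is null, and the totalHits of some search requests is inaccurate (the cause still needs further investigation).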

        // To avoid EsRejectedExecutionException, execute the multi-search once every 100 requests
        List<String> stationList = new ArrayList<>();
        MultiSearchRequestBuilder requestBuilder = client.prepareMultiSearch();
        int searchCount = 1;
        for (Set<String> stations: stationSets) {
            for (String station: stations) {
                SearchRequestBuilder srb = buildQuery(station, client);
                stationList.add(station);
                requestBuilder.add(srb);
                if (searchCount % 100 == 0) {
                    MultiSearchResponse msr = requestBuilder.execute().actionGet();
                    handleResponse(msr, stationList);
                    stationList.clear();
                    requestBuilder = client.prepareMultiSearch();
                }
                searchCount ++;
            }
        }

        if (!stationList.isEmpty()) {
            MultiSearchResponse msr = requestBuilder.execute().actionGet();
            handleResponse(msr, stationList);
            stationList.clear();
        }
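
The batching idea above (flush every 100 requests, then flush the remainder) can also be expressed by splitting the inputs into fixed-size batches first and issuing one multi-search per batch. A minimal sketch of such a helper using only the standard library; the class name `Batches` and method name `partition` are hypothetical, and the batch size of 100 is taken from the code above rather than from any Elasticsearch recommendation:

```java
import java.util.ArrayList;
import java.util.List;

final class Batches {
    /** Splits items into consecutive batches of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            // Copy the sublist so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> stations = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            stations.add("station-" + i);
        }
        for (List<String> batch : partition(stations, 100)) {
            // In the real code, one multi-search request would be built and executed per batch
            System.out.println(batch.size()); // prints 100, 100, 50
        }
    }
}
```

With this shape the trailing "flush what is left" branch disappears, because the last batch is simply shorter than the others.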

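Author: 愚公 300 代
Link: https://www.jianshu.com/p/d449161e79f2
Source: 简书 (Jianshu)
Copyright on Jianshu belongs to the author. For any form of reproduction, please contact the author for permission and cite the source.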

