From 4d5416912b3bd3735f75176b89115b8874c095e4 Mon Sep 17 00:00:00 2001 From: Ignacio Vera Date: Tue, 6 Feb 2024 17:53:41 +0100 Subject: [PATCH] Use an AbstractList to build the AggregationList for reduction (#105200) We are building a list of InternalAggregations from a list of Buckets, therefore we can use an AbstractList to create the actual list and save some allocations. --- .../adjacency/InternalAdjacencyMatrix.java | 7 +++---- .../histogram/InternalAutoDateHistogram.java | 7 +++---- .../bucket/timeseries/InternalTimeSeries.java | 5 ++--- .../InternalMultiBucketAggregation.java | 20 +++++++++++++++++++ .../bucket/composite/InternalComposite.java | 9 ++++----- .../bucket/filter/InternalFilters.java | 7 +++---- .../bucket/geogrid/InternalGeoGrid.java | 7 +++---- .../histogram/InternalDateHistogram.java | 7 +++---- .../bucket/histogram/InternalHistogram.java | 7 +++---- .../InternalVariableWidthHistogram.java | 9 ++++----- .../bucket/prefix/InternalIpPrefix.java | 7 +++---- .../bucket/range/InternalBinaryRange.java | 6 +++--- .../bucket/range/InternalRange.java | 7 +++---- .../bucket/terms/AbstractInternalTerms.java | 7 +++---- .../bucket/terms/InternalRareTerms.java | 8 +++----- .../terms/InternalSignificantTerms.java | 7 +++---- 16 files changed, 66 insertions(+), 61 deletions(-) diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java index c17cc004e25b5..745585901311a 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/adjacency/InternalAdjacencyMatrix.java @@ -205,18 +205,17 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) { @Override protected InternalBucket reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; + assert buckets.isEmpty() == false; InternalBucket reduced = null; - List aggregationsList = new ArrayList<>(buckets.size()); for (InternalBucket bucket : buckets) { if (reduced == null) { reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations); } else { reduced.docCount += bucket.docCount; } - aggregationsList.add(bucket.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, context); + final List aggregations = new BucketAggregationList<>(buckets); + reduced.aggregations = InternalAggregations.reduce(aggregations, context); return reduced; } diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java index de36a9721fe38..78f6d67b0f748 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java @@ -410,14 +410,13 @@ private List mergeBuckets( @Override protected Bucket reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; - List aggregations = new ArrayList<>(buckets.size()); + assert buckets.isEmpty() == false; long docCount = 0; for (Bucket bucket : buckets) { docCount += bucket.docCount; - aggregations.add(bucket.getAggregations()); } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); return new InternalAutoDateHistogram.Bucket(buckets.get(0).key, docCount, format, aggs); } diff --git a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java index 67a7773fd01bb..725bd5673bccf 100644 --- a/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java +++ b/modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/timeseries/InternalTimeSeries.java @@ -255,16 +255,15 @@ public InternalBucket createBucket(InternalAggregations aggregations, InternalBu @Override protected InternalBucket reduceBucket(List buckets, AggregationReduceContext context) { InternalTimeSeries.InternalBucket reduced = null; - List aggregationsList = new ArrayList<>(buckets.size()); for (InternalTimeSeries.InternalBucket bucket : buckets) { if (reduced == null) { reduced = new InternalTimeSeries.InternalBucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed); } else { reduced.docCount += bucket.docCount; } - aggregationsList.add(bucket.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, context); + final List aggregations = new BucketAggregationList<>(buckets); + reduced.aggregations = InternalAggregations.reduce(aggregations, context); return reduced; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java index dda632e7aa020..8f6987dfa6be1 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java @@ -15,6 +15,7 @@ import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree; import java.io.IOException; +import java.util.AbstractList; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -77,6 +78,25 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException { */ protected abstract B reduceBucket(List buckets, AggregationReduceContext context); + /** Helps to lazily construct the aggregation list for reduction */ + protected static class BucketAggregationList extends AbstractList { + private final List buckets; + + public BucketAggregationList(List buckets) { + this.buckets = buckets; + } + + @Override + public InternalAggregations get(int index) { + return buckets.get(index).getAggregations(); + } + + @Override + public int size() { + return buckets.size(); + } + } + @Override public abstract List getBuckets(); diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java index 922baf1f83f83..e9dc079edaf14 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java @@ -283,19 +283,18 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) { @Override protected InternalBucket reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; - List aggregations = new ArrayList<>(buckets.size()); + assert buckets.isEmpty() == false; long docCount = 0; for (InternalBucket bucket : buckets) { docCount += bucket.docCount; - aggregations.add(bucket.aggregations); } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); /* Use the formats from the bucket because they'll be right to format * the key. The formats on the InternalComposite doing the reducing are * just whatever formats make sense for *its* index. This can be real * trouble when the index doing the reducing is unmapped. */ - var reducedFormats = buckets.get(0).formats; + final var reducedFormats = buckets.get(0).formats; return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, missingOrders, docCount, aggs); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java index 726589ca7c1b5..8ae5aed72a3a5 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java @@ -238,18 +238,17 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) { @Override protected InternalBucket reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; + assert buckets.isEmpty() == false; InternalBucket reduced = null; - List aggregationsList = new ArrayList<>(buckets.size()); for (InternalBucket bucket : buckets) { if (reduced == null) { reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed, keyedBucket); } else { reduced.docCount += bucket.docCount; } - aggregationsList.add(bucket.aggregations); } - reduced.aggregations = InternalAggregations.reduce(aggregationsList, context); + final List aggregations = new BucketAggregationList<>(buckets); + reduced.aggregations = InternalAggregations.reduce(aggregations, context); return reduced; } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java index 315eda4793a12..bc12555664575 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java @@ -136,14 +136,13 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) { @Override protected InternalGeoGridBucket reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; - List aggregationsList = new ArrayList<>(buckets.size()); + assert buckets.isEmpty() == false; long docCount = 0; for (InternalGeoGridBucket bucket : buckets) { docCount += bucket.docCount; - aggregationsList.add(bucket.aggregations); } - final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); return createBucket(buckets.get(0).hashAsLong, docCount, aggs); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java index 8a7561aaab574..a6d3627ecda28 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java @@ -393,14 +393,13 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent buckets, AggregationReduceContext context) { - assert buckets.size() > 0; - List aggregations = new ArrayList<>(buckets.size()); + assert buckets.isEmpty() == false; long docCount = 0; for (Bucket bucket : buckets) { docCount += bucket.docCount; - aggregations.add(bucket.getAggregations()); } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); return createBucket(buckets.get(0).key, docCount, aggs); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java index b6d5a705fe0cd..88777d5abde99 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java @@ -348,14 +348,13 @@ protected boolean lessThan(IteratorAndCurrent a, IteratorAndCurrent buckets, AggregationReduceContext context) { - assert buckets.size() > 0; - List aggregations = new ArrayList<>(buckets.size()); + assert buckets.isEmpty() == false; long docCount = 0; for (Bucket bucket : buckets) { docCount += bucket.docCount; - aggregations.add(bucket.getAggregations()); } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); return createBucket(buckets.get(0).key, docCount, aggs); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java index 59bb251368c2e..073621575f292 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java @@ -307,7 +307,6 @@ public Number getKey(MultiBucketsAggregation.Bucket bucket) { @Override protected Bucket reduceBucket(List buckets, AggregationReduceContext context) { - List aggregations = new ArrayList<>(buckets.size()); long docCount = 0; double min = Double.POSITIVE_INFINITY; double max = Double.NEGATIVE_INFINITY; @@ -317,11 +316,11 @@ protected Bucket reduceBucket(List buckets, AggregationReduceContext con min = Math.min(min, bucket.bounds.min); max = Math.max(max, bucket.bounds.max); sum += bucket.docCount * bucket.centroid; - aggregations.add(bucket.getAggregations()); } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); - double centroid = sum / docCount; - Bucket.BucketBounds bounds = new Bucket.BucketBounds(min, max); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + final double centroid = sum / docCount; + final Bucket.BucketBounds bounds = new Bucket.BucketBounds(min, max); return new Bucket(centroid, bounds, docCount, format, aggs); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java index f0104599396dd..33c3122e58967 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/prefix/InternalIpPrefix.java @@ -333,14 +333,13 @@ private Bucket createBucket(Bucket prototype, InternalAggregations aggregations, @Override protected Bucket reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; - List aggregations = new ArrayList<>(buckets.size()); + assert buckets.isEmpty() == false; long docCount = 0; for (InternalIpPrefix.Bucket bucket : buckets) { docCount += bucket.docCount; - aggregations.add(bucket.getAggregations()); } - InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); return createBucket(buckets.get(0), aggs, docCount); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java index 131be36db2956..414af918e837d 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java @@ -292,9 +292,9 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) { @Override protected Bucket reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; - List aggregationsList = buckets.stream().map(bucket -> bucket.aggregations).toList(); - final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + assert buckets.isEmpty() == false; + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); return createBucket(aggs, buckets.get(0)); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java index 046d5efb97ece..ec0ace8f3e011 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java @@ -369,14 +369,13 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) { @Override protected B reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; + assert buckets.isEmpty() == false; long docCount = 0; - List aggregationsList = new ArrayList<>(buckets.size()); for (Bucket bucket : buckets) { docCount += bucket.docCount; - aggregationsList.add(bucket.aggregations); } - final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); Bucket prototype = buckets.get(0); return getFactory().createBucket(prototype.key, prototype.from, prototype.to, docCount, aggs, keyed, format); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java index ca3142a0c0797..ea3762503853e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/AbstractInternalTerms.java @@ -88,14 +88,13 @@ public abstract static class AbstractTermsBucket extends InternalMultiBucketAggr @Override public B reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; + assert buckets.isEmpty() == false; long docCount = 0; // For the per term doc count error we add up the errors from the // shards that did not respond with the term. To do this we add up // the errors from the shards that did respond with the terms and // subtract that from the sum of the error from all shards long docCountError = 0; - List aggregationsList = new ArrayList<>(buckets.size()); for (B bucket : buckets) { docCount += bucket.getDocCount(); if (docCountError != -1) { @@ -105,9 +104,9 @@ public B reduceBucket(List buckets, AggregationReduceContext context) { docCountError += bucket.getDocCountError(); } } - aggregationsList.add(bucket.getAggregations()); } - InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); return createBucket(docCount, aggs, docCountError, buckets.get(0)); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java index f3ce541b1b8b9..b5aa8e3973c3e 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalRareTerms.java @@ -21,7 +21,6 @@ import org.elasticsearch.xcontent.XContentBuilder; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -149,14 +148,13 @@ public InternalAggregation reduce(List aggregations, Aggreg @Override protected B reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; + assert buckets.isEmpty() == false; long docCount = 0; - List aggregationsList = new ArrayList<>(buckets.size()); for (B bucket : buckets) { docCount += bucket.docCount; - aggregationsList.add(bucket.aggregations); } - InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); return createBucket(docCount, aggs, buckets.get(0)); } diff --git a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java index d627be186f8ff..be96683b98915 100644 --- a/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java +++ b/server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/InternalSignificantTerms.java @@ -276,16 +276,15 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) { @Override protected B reduceBucket(List buckets, AggregationReduceContext context) { - assert buckets.size() > 0; + assert buckets.isEmpty() == false; long subsetDf = 0; long supersetDf = 0; - List aggregationsList = new ArrayList<>(buckets.size()); for (B bucket : buckets) { subsetDf += bucket.subsetDf; supersetDf += bucket.supersetDf; - aggregationsList.add(bucket.aggregations); } - InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context); + final List aggregations = new BucketAggregationList<>(buckets); + final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context); return createBucket(subsetDf, buckets.get(0).subsetSize, supersetDf, buckets.get(0).supersetSize, aggs, buckets.get(0)); }