Skip to content

Commit

Permalink
Use an AbstractList to build the AggregationList for reduction (elast…
Browse files Browse the repository at this point in the history
…ic#105200)

We are building a list of InternalAggregations from a list of Buckets, therefore we can use an AbstractList to create the actual list and save some allocations.
  • Loading branch information
iverase authored Feb 6, 2024
1 parent e8288fb commit 4d54169
Show file tree
Hide file tree
Showing 16 changed files with 66 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,17 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {

@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
assert buckets.isEmpty() == false;
InternalBucket reduced = null;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (InternalBucket bucket : buckets) {
if (reduced == null) {
reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations);
} else {
reduced.docCount += bucket.docCount;
}
aggregationsList.add(bucket.aggregations);
}
reduced.aggregations = InternalAggregations.reduce(aggregationsList, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
reduced.aggregations = InternalAggregations.reduce(aggregations, context);
return reduced;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,14 +410,13 @@ private List<Bucket> mergeBuckets(

@Override
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
assert buckets.isEmpty() == false;
long docCount = 0;
for (Bucket bucket : buckets) {
docCount += bucket.docCount;
aggregations.add(bucket.getAggregations());
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return new InternalAutoDateHistogram.Bucket(buckets.get(0).key, docCount, format, aggs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,15 @@ public InternalBucket createBucket(InternalAggregations aggregations, InternalBu
@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
InternalTimeSeries.InternalBucket reduced = null;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (InternalTimeSeries.InternalBucket bucket : buckets) {
if (reduced == null) {
reduced = new InternalTimeSeries.InternalBucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed);
} else {
reduced.docCount += bucket.docCount;
}
aggregationsList.add(bucket.aggregations);
}
reduced.aggregations = InternalAggregations.reduce(aggregationsList, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
reduced.aggregations = InternalAggregations.reduce(aggregations, context);
return reduced;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator.PipelineTree;

import java.io.IOException;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -77,6 +78,25 @@ protected InternalMultiBucketAggregation(StreamInput in) throws IOException {
*/
protected abstract B reduceBucket(List<B> buckets, AggregationReduceContext context);

/** Helps to lazily construct the aggregation list for reduction */
protected static class BucketAggregationList<B extends Bucket> extends AbstractList<InternalAggregations> {
private final List<B> buckets;

public BucketAggregationList(List<B> buckets) {
this.buckets = buckets;
}

@Override
public InternalAggregations get(int index) {
return buckets.get(index).getAggregations();
}

@Override
public int size() {
return buckets.size();
}
}

@Override
public abstract List<B> getBuckets();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,19 +283,18 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {

@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
assert buckets.isEmpty() == false;
long docCount = 0;
for (InternalBucket bucket : buckets) {
docCount += bucket.docCount;
aggregations.add(bucket.aggregations);
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
/* Use the formats from the bucket because they'll be right to format
* the key. The formats on the InternalComposite doing the reducing are
* just whatever formats make sense for *its* index. This can be real
* trouble when the index doing the reducing is unmapped. */
var reducedFormats = buckets.get(0).formats;
final var reducedFormats = buckets.get(0).formats;
return new InternalBucket(sourceNames, reducedFormats, buckets.get(0).key, reverseMuls, missingOrders, docCount, aggs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,18 +238,17 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {

@Override
protected InternalBucket reduceBucket(List<InternalBucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
assert buckets.isEmpty() == false;
InternalBucket reduced = null;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (InternalBucket bucket : buckets) {
if (reduced == null) {
reduced = new InternalBucket(bucket.key, bucket.docCount, bucket.aggregations, bucket.keyed, keyedBucket);
} else {
reduced.docCount += bucket.docCount;
}
aggregationsList.add(bucket.aggregations);
}
reduced.aggregations = InternalAggregations.reduce(aggregationsList, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
reduced.aggregations = InternalAggregations.reduce(aggregations, context);
return reduced;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,13 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {

@Override
protected InternalGeoGridBucket reduceBucket(List<InternalGeoGridBucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
assert buckets.isEmpty() == false;
long docCount = 0;
for (InternalGeoGridBucket bucket : buckets) {
docCount += bucket.docCount;
aggregationsList.add(bucket.aggregations);
}
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return createBucket(buckets.get(0).hashAsLong, docCount, aggs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,13 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck
*/
@Override
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
assert buckets.isEmpty() == false;
long docCount = 0;
for (Bucket bucket : buckets) {
docCount += bucket.docCount;
aggregations.add(bucket.getAggregations());
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return createBucket(buckets.get(0).key, docCount, aggs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,14 +348,13 @@ protected boolean lessThan(IteratorAndCurrent<Bucket> a, IteratorAndCurrent<Buck

@Override
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
assert buckets.isEmpty() == false;
long docCount = 0;
for (Bucket bucket : buckets) {
docCount += bucket.docCount;
aggregations.add(bucket.getAggregations());
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return createBucket(buckets.get(0).key, docCount, aggs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,6 @@ public Number getKey(MultiBucketsAggregation.Bucket bucket) {

@Override
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
long docCount = 0;
double min = Double.POSITIVE_INFINITY;
double max = Double.NEGATIVE_INFINITY;
Expand All @@ -317,11 +316,11 @@ protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext con
min = Math.min(min, bucket.bounds.min);
max = Math.max(max, bucket.bounds.max);
sum += bucket.docCount * bucket.centroid;
aggregations.add(bucket.getAggregations());
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
double centroid = sum / docCount;
Bucket.BucketBounds bounds = new Bucket.BucketBounds(min, max);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
final double centroid = sum / docCount;
final Bucket.BucketBounds bounds = new Bucket.BucketBounds(min, max);
return new Bucket(centroid, bounds, docCount, format, aggs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,14 +333,13 @@ private Bucket createBucket(Bucket prototype, InternalAggregations aggregations,

@Override
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregations = new ArrayList<>(buckets.size());
assert buckets.isEmpty() == false;
long docCount = 0;
for (InternalIpPrefix.Bucket bucket : buckets) {
docCount += bucket.docCount;
aggregations.add(bucket.getAggregations());
}
InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return createBucket(buckets.get(0), aggs, docCount);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {

@Override
protected Bucket reduceBucket(List<Bucket> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
List<InternalAggregations> aggregationsList = buckets.stream().map(bucket -> bucket.aggregations).toList();
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
assert buckets.isEmpty() == false;
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return createBucket(aggs, buckets.get(0));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,14 +369,13 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {

@Override
protected B reduceBucket(List<B> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
assert buckets.isEmpty() == false;
long docCount = 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (Bucket bucket : buckets) {
docCount += bucket.docCount;
aggregationsList.add(bucket.aggregations);
}
final InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
Bucket prototype = buckets.get(0);
return getFactory().createBucket(prototype.key, prototype.from, prototype.to, docCount, aggs, keyed, format);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,13 @@ public abstract static class AbstractTermsBucket extends InternalMultiBucketAggr

@Override
public B reduceBucket(List<B> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
assert buckets.isEmpty() == false;
long docCount = 0;
// For the per term doc count error we add up the errors from the
// shards that did not respond with the term. To do this we add up
// the errors from the shards that did respond with the terms and
// subtract that from the sum of the error from all shards
long docCountError = 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (B bucket : buckets) {
docCount += bucket.getDocCount();
if (docCountError != -1) {
Expand All @@ -105,9 +104,9 @@ public B reduceBucket(List<B> buckets, AggregationReduceContext context) {
docCountError += bucket.getDocCountError();
}
}
aggregationsList.add(bucket.getAggregations());
}
InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return createBucket(docCount, aggs, docCountError, buckets.get(0));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.elasticsearch.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -149,14 +148,13 @@ public InternalAggregation reduce(List<InternalAggregation> aggregations, Aggreg

@Override
protected B reduceBucket(List<B> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
assert buckets.isEmpty() == false;
long docCount = 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (B bucket : buckets) {
docCount += bucket.docCount;
aggregationsList.add(bucket.aggregations);
}
InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return createBucket(docCount, aggs, buckets.get(0));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,16 +276,15 @@ public InternalAggregation finalizeSampling(SamplingContext samplingContext) {

@Override
protected B reduceBucket(List<B> buckets, AggregationReduceContext context) {
assert buckets.size() > 0;
assert buckets.isEmpty() == false;
long subsetDf = 0;
long supersetDf = 0;
List<InternalAggregations> aggregationsList = new ArrayList<>(buckets.size());
for (B bucket : buckets) {
subsetDf += bucket.subsetDf;
supersetDf += bucket.supersetDf;
aggregationsList.add(bucket.aggregations);
}
InternalAggregations aggs = InternalAggregations.reduce(aggregationsList, context);
final List<InternalAggregations> aggregations = new BucketAggregationList<>(buckets);
final InternalAggregations aggs = InternalAggregations.reduce(aggregations, context);
return createBucket(subsetDf, buckets.get(0).subsetSize, supersetDf, buckets.get(0).supersetSize, aggs, buckets.get(0));
}

Expand Down

0 comments on commit 4d54169

Please sign in to comment.