From 4c63481b732ff90d2249e0004f70b325b5210f93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Iv=C3=A1n=20Cea=20Fontenla?= Date: Mon, 23 Dec 2024 13:39:08 +0100 Subject: [PATCH] Fixed ValuesBytesRefGroupingAggregator CB leak (#119121) (#119211) - Fixed a leak if the breaker breaks on `ValuesBytesRefGroupingAggregator` - Improved aggregators Cranky test to show a better error with the failing stacktrace --- .../aggregation/ValuesBytesRefAggregator.java | 16 ++++- .../aggregation/X-ValuesAggregator.java.st | 16 ++++- ...ValuesBytesRefAggregatorFunctionTests.java | 55 ++++++++++++++++ ...tesRefGroupingAggregatorFunctionTests.java | 63 +++++++++++++++++++ .../compute/operator/OperatorTestCase.java | 12 +++- 5 files changed, 156 insertions(+), 6 deletions(-) create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregatorFunctionTests.java create mode 100644 x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunctionTests.java diff --git a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java index 602fd29433193..bd77bd7ff1e46 100644 --- a/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java +++ b/x-pack/plugin/esql/compute/src/main/generated-src/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregator.java @@ -130,8 +130,20 @@ public static class GroupingState implements Releasable { private final BytesRefHash bytes; private GroupingState(BigArrays bigArrays) { - values = new LongLongHash(1, bigArrays); - bytes = new BytesRefHash(1, bigArrays); + LongLongHash _values = null; + BytesRefHash _bytes = null; + try { + _values = new LongLongHash(1, bigArrays); + _bytes = new BytesRefHash(1, bigArrays); + + values = _values; + bytes = _bytes; + + _values = null; + _bytes = null; + } finally { + Releasables.closeExpectNoException(_values, _bytes); + } } void toIntermediate(Block[] blocks, int offset, IntVector selected, DriverContext driverContext) { diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st index a8884c58116f3..1cef234b2238f 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/aggregation/X-ValuesAggregator.java.st @@ -244,8 +244,20 @@ $endif$ $if(long||double)$ values = new LongLongHash(1, bigArrays); $elseif(BytesRef)$ - values = new LongLongHash(1, bigArrays); - bytes = new BytesRefHash(1, bigArrays); + LongLongHash _values = null; + BytesRefHash _bytes = null; + try { + _values = new LongLongHash(1, bigArrays); + _bytes = new BytesRefHash(1, bigArrays); + + values = _values; + bytes = _bytes; + + _values = null; + _bytes = null; + } finally { + Releasables.closeExpectNoException(_values, _bytes); + } $elseif(int||float)$ values = new LongHash(1, bigArrays); $endif$ diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregatorFunctionTests.java new file mode 100644 index 0000000000000..c0a91fe22b87b --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefAggregatorFunctionTests.java @@ -0,0 +1,55 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockUtils; +import org.elasticsearch.compute.operator.SequenceBytesRefBlockSourceOperator; +import org.elasticsearch.compute.operator.SourceOperator; + +import java.util.Arrays; +import java.util.List; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.containsInAnyOrder; + +public class ValuesBytesRefAggregatorFunctionTests extends AggregatorFunctionTestCase { + @Override + protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + return new SequenceBytesRefBlockSourceOperator( + blockFactory, + IntStream.range(0, size).mapToObj(l -> new BytesRef(randomAlphaOfLengthBetween(0, 100))) + ); + } + + @Override + protected AggregatorFunctionSupplier aggregatorFunction(List inputChannels) { + return new ValuesBytesRefAggregatorFunctionSupplier(inputChannels); + } + + @Override + protected String expectedDescriptionOfAggregator() { + return "values of bytes"; + } + + @Override + public void assertSimpleOutput(List input, Block result) { + TreeSet set = new TreeSet<>((List) BlockUtils.toJavaObject(result, 0)); + Object[] values = input.stream() + .flatMap(AggregatorFunctionTestCase::allBytesRefs) + .collect(Collectors.toSet()) + .toArray(Object[]::new); + if (false == set.containsAll(Arrays.asList(values))) { + assertThat(set, containsInAnyOrder(values)); + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunctionTests.java new file mode 100644 index 0000000000000..fc9bc90828df3 --- /dev/null +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/ValuesBytesRefGroupingAggregatorFunctionTests.java @@ -0,0 +1,63 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.compute.aggregation; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.compute.data.Block; +import org.elasticsearch.compute.data.BlockFactory; +import org.elasticsearch.compute.data.BlockUtils; +import org.elasticsearch.compute.data.Page; +import org.elasticsearch.compute.operator.LongBytesRefTupleBlockSourceOperator; +import org.elasticsearch.compute.operator.SourceOperator; +import org.elasticsearch.core.Tuple; + +import java.util.Arrays; +import java.util.List; +import java.util.TreeSet; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +public class ValuesBytesRefGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase { + @Override + protected AggregatorFunctionSupplier aggregatorFunction(List inputChannels) { + return new ValuesBytesRefAggregatorFunctionSupplier(inputChannels); + } + + @Override + protected String expectedDescriptionOfAggregator() { + return "values of bytes"; + } + + @Override + protected SourceOperator simpleInput(BlockFactory blockFactory, int size) { + return new LongBytesRefTupleBlockSourceOperator( + blockFactory, + IntStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), new BytesRef(randomAlphaOfLengthBetween(0, 100)))) + ); + } + + @Override + public void assertSimpleGroup(List input, Block result, int position, Long group) { + Object[] values = input.stream().flatMap(p -> allBytesRefs(p, group)).collect(Collectors.toSet()).toArray(Object[]::new); + Object resultValue = BlockUtils.toJavaObject(result, position); + switch (values.length) { + case 0 -> assertThat(resultValue, nullValue()); + case 1 -> assertThat(resultValue, equalTo(values[0])); + default -> { + TreeSet set = new TreeSet<>((List) resultValue); + if (false == set.containsAll(Arrays.asList(values))) { + assertThat(set, containsInAnyOrder(values)); + } + } + } + } +} diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java index be792a0ef2612..54db0453530bc 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/OperatorTestCase.java @@ -143,6 +143,7 @@ public final void testSimpleWithCranky() { DriverContext driverContext = crankyDriverContext(); + Exception exception = null; boolean driverStarted = false; try { Operator operator = simple().get(driverContext); @@ -150,8 +151,8 @@ public final void testSimpleWithCranky() { drive(operator, input.iterator(), driverContext); // Either we get lucky and cranky doesn't throw and the test completes or we don't and it throws } catch (CircuitBreakingException e) { - logger.info("broken", e); assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE)); + exception = e; } if (driverStarted == false) { // if drive hasn't even started then we need to release the input pages @@ -159,7 +160,14 @@ public final void testSimpleWithCranky() { } // Note the lack of try/finally here - we're asserting that when the driver throws an exception we clear the breakers. - assertThat(inputFactoryContext.breaker().getUsed(), equalTo(0L)); + long inputUsedBytes = inputFactoryContext.breaker().getUsed(); + if (inputUsedBytes != 0L) { + fail(exception, "Expected no used bytes for input, found: " + inputUsedBytes); + } + long driverUsedBytes = driverContext.breaker().getUsed(); + if (driverUsedBytes != 0L) { + fail(exception, "Expected no used bytes for driver, found: " + driverUsedBytes); + } } /**