Fixed ValuesBytesRefGroupingAggregator CB leak (#119121) (#119211)
- Fixed a memory leak in `ValuesBytesRefGroupingAggregator` when the circuit breaker breaks during its state allocation
- Improved the aggregators' Cranky test to report a better error that includes the failing stack trace
ivancea authored Dec 23, 2024
1 parent 2f865a6 commit 4c63481
Showing 5 changed files with 156 additions and 6 deletions.
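
For context: the leaked memory came from the aggregator's grouping state, which allocates two breaker-tracked hashes back to back; if the second allocation trips the circuit breaker, the first hash is already live and nothing ever releases it. Below is a small, self-contained sketch of that failure mode and of the fix pattern used in the template diff further down. `TrackedHash`, `closeQuietly`, and the open-resource counter are stand-ins invented for this illustration; the real code uses `LongLongHash`/`BytesRefHash` backed by `BigArrays` and releases them with `Releasables.closeExpectNoException`.

import java.util.concurrent.atomic.AtomicInteger;

// Standalone sketch of the bug and the fix. TrackedHash stands in for
// breaker-tracked structures such as LongLongHash/BytesRefHash.
class ValuesStateInitSketch {
    static final AtomicInteger OPEN = new AtomicInteger();

    static class TrackedHash implements AutoCloseable {
        TrackedHash(boolean breakerTrips) {
            if (breakerTrips) {
                // Simulates the circuit breaker rejecting the allocation.
                throw new IllegalStateException("circuit breaker tripped");
            }
            OPEN.incrementAndGet();
        }

        @Override
        public void close() {
            OPEN.decrementAndGet();
        }
    }

    TrackedHash values;
    TrackedHash bytes;

    // Mirrors the old template: if the second allocation trips the breaker,
    // the first one is already allocated and nobody ever closes it.
    void leakyInit(boolean secondFails) {
        values = new TrackedHash(false);
        bytes = new TrackedHash(secondFails); // throws -> `values` leaks
    }

    // Mirrors the fixed template: allocate into locals, publish to the fields
    // on success, then null the locals so the finally block closes nothing;
    // on failure the finally block closes whatever was already allocated.
    void safeInit(boolean secondFails) {
        TrackedHash _values = null;
        TrackedHash _bytes = null;
        try {
            _values = new TrackedHash(false);
            _bytes = new TrackedHash(secondFails);

            values = _values;
            bytes = _bytes;

            _values = null;
            _bytes = null;
        } finally {
            closeQuietly(_values, _bytes);
        }
    }

    // Simplified stand-in for Releasables.closeExpectNoException.
    static void closeQuietly(AutoCloseable... resources) {
        for (AutoCloseable resource : resources) {
            if (resource != null) {
                try {
                    resource.close();
                } catch (Exception e) {
                    throw new AssertionError("close failed", e);
                }
            }
        }
    }

    public static void main(String[] args) {
        ValuesStateInitSketch leaky = new ValuesStateInitSketch();
        try {
            leaky.leakyInit(true);
        } catch (IllegalStateException expected) {
        }
        System.out.println("open after leaky init: " + OPEN.get()); // 1 -> leaked

        OPEN.set(0);
        ValuesStateInitSketch safe = new ValuesStateInitSketch();
        try {
            safe.safeInit(true);
        } catch (IllegalStateException expected) {
        }
        System.out.println("open after safe init: " + OPEN.get()); // 0 -> cleaned up
    }
}

Nulling the locals after publishing them to the fields is what makes the finally block a no-op on the success path, while still releasing partially built state when either constructor throws.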


@@ -244,8 +244,20 @@ $endif$
$if(long||double)$
values = new LongLongHash(1, bigArrays);
$elseif(BytesRef)$
values = new LongLongHash(1, bigArrays);
bytes = new BytesRefHash(1, bigArrays);
LongLongHash _values = null;
BytesRefHash _bytes = null;
try {
_values = new LongLongHash(1, bigArrays);
_bytes = new BytesRefHash(1, bigArrays);

values = _values;
bytes = _bytes;

_values = null;
_bytes = null;
} finally {
Releasables.closeExpectNoException(_values, _bytes);
}
$elseif(int||float)$
values = new LongHash(1, bigArrays);
$endif$
@@ -0,0 +1,55 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.aggregation;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.operator.SequenceBytesRefBlockSourceOperator;
import org.elasticsearch.compute.operator.SourceOperator;

import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsInAnyOrder;

public class ValuesBytesRefAggregatorFunctionTests extends AggregatorFunctionTestCase {
@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
return new SequenceBytesRefBlockSourceOperator(
blockFactory,
IntStream.range(0, size).mapToObj(l -> new BytesRef(randomAlphaOfLengthBetween(0, 100)))
);
}

@Override
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
return new ValuesBytesRefAggregatorFunctionSupplier(inputChannels);
}

@Override
protected String expectedDescriptionOfAggregator() {
return "values of bytes";
}

@Override
public void assertSimpleOutput(List<Block> input, Block result) {
TreeSet<?> set = new TreeSet<>((List<?>) BlockUtils.toJavaObject(result, 0));
Object[] values = input.stream()
.flatMap(AggregatorFunctionTestCase::allBytesRefs)
.collect(Collectors.toSet())
.toArray(Object[]::new);
if (false == set.containsAll(Arrays.asList(values))) {
assertThat(set, containsInAnyOrder(values));
}
}
}
@@ -0,0 +1,63 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.compute.aggregation;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.compute.data.Block;
import org.elasticsearch.compute.data.BlockFactory;
import org.elasticsearch.compute.data.BlockUtils;
import org.elasticsearch.compute.data.Page;
import org.elasticsearch.compute.operator.LongBytesRefTupleBlockSourceOperator;
import org.elasticsearch.compute.operator.SourceOperator;
import org.elasticsearch.core.Tuple;

import java.util.Arrays;
import java.util.List;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.nullValue;

public class ValuesBytesRefGroupingAggregatorFunctionTests extends GroupingAggregatorFunctionTestCase {
@Override
protected AggregatorFunctionSupplier aggregatorFunction(List<Integer> inputChannels) {
return new ValuesBytesRefAggregatorFunctionSupplier(inputChannels);
}

@Override
protected String expectedDescriptionOfAggregator() {
return "values of bytes";
}

@Override
protected SourceOperator simpleInput(BlockFactory blockFactory, int size) {
return new LongBytesRefTupleBlockSourceOperator(
blockFactory,
IntStream.range(0, size).mapToObj(l -> Tuple.tuple(randomLongBetween(0, 4), new BytesRef(randomAlphaOfLengthBetween(0, 100))))
);
}

@Override
public void assertSimpleGroup(List<Page> input, Block result, int position, Long group) {
Object[] values = input.stream().flatMap(p -> allBytesRefs(p, group)).collect(Collectors.toSet()).toArray(Object[]::new);
Object resultValue = BlockUtils.toJavaObject(result, position);
switch (values.length) {
case 0 -> assertThat(resultValue, nullValue());
case 1 -> assertThat(resultValue, equalTo(values[0]));
default -> {
TreeSet<?> set = new TreeSet<>((List<?>) resultValue);
if (false == set.containsAll(Arrays.asList(values))) {
assertThat(set, containsInAnyOrder(values));
}
}
}
}
}
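
The last hunk below hardens the shared Cranky test that both of these test classes inherit. The idea of the cranky breaker (implemented by `CrankyCircuitBreakerService` in the test framework) is to make allocations fail at random so that every failure path, like the one fixed above, gets exercised. A rough, standalone sketch of that idea follows; the class and method names here are illustrative, not the real API:

import java.util.Random;

// Rough sketch of a "cranky" breaker: it randomly rejects allocations so that
// operators are forced through their allocation-failure and cleanup paths.
class CrankyBreakerSketch {
    static final String ERROR_MESSAGE = "cranky breaker";

    private final Random random;
    private long used = 0;

    CrankyBreakerSketch(Random random) {
        this.random = random;
    }

    // Called before an allocation; randomly refuses so cleanup paths are exercised.
    void maybeBreak(long bytes) {
        if (random.nextInt(20) == 0) {
            throw new IllegalStateException(ERROR_MESSAGE);
        }
        used += bytes;
    }

    // Called when memory is released; a leak shows up as used != 0 after the test.
    void release(long bytes) {
        used -= bytes;
    }

    long getUsed() {
        return used;
    }
}

With a breaker like this, a leak such as the one fixed in this commit shows up as a non-zero `getUsed()` once the operator has been driven and closed, which is exactly what the updated test asserts.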
@@ -143,23 +143,31 @@ public final void testSimpleWithCranky() {

DriverContext driverContext = crankyDriverContext();

Exception exception = null;
boolean driverStarted = false;
try {
Operator operator = simple().get(driverContext);
driverStarted = true;
drive(operator, input.iterator(), driverContext);
// Either we get lucky and cranky doesn't throw and the test completes or we don't and it throws
} catch (CircuitBreakingException e) {
logger.info("broken", e);
assertThat(e.getMessage(), equalTo(CrankyCircuitBreakerService.ERROR_MESSAGE));
exception = e;
}
if (driverStarted == false) {
// if drive hasn't even started then we need to release the input pages
Releasables.closeExpectNoException(Releasables.wrap(() -> Iterators.map(input.iterator(), p -> p::releaseBlocks)));
}

// Note the lack of try/finally here - we're asserting that when the driver throws an exception we clear the breakers.
assertThat(inputFactoryContext.breaker().getUsed(), equalTo(0L));
long inputUsedBytes = inputFactoryContext.breaker().getUsed();
if (inputUsedBytes != 0L) {
fail(exception, "Expected no used bytes for input, found: " + inputUsedBytes);
}
long driverUsedBytes = driverContext.breaker().getUsed();
if (driverUsedBytes != 0L) {
fail(exception, "Expected no used bytes for driver, found: " + driverUsedBytes);
}
}

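
The reason the updated test captures the `CircuitBreakingException` is so that, when the breaker still reports used bytes afterwards, the failure carries the original exception and its stack trace instead of a bare `expected 0L but was ...` assertion. A minimal sketch of that reporting pattern; `failWithCause` is a hypothetical stand-in for the `fail(exception, message)` overload used in the diff above:

// Illustration only: attach the captured breaker exception as the cause of the
// assertion failure so its stack trace shows where the breaking allocation came from.
final class FailWithCauseSketch {
    static void failWithCause(Throwable cause, String message) {
        throw new AssertionError(message, cause); // cause may be null if the driver never broke
    }

    public static void main(String[] args) {
        Exception captured = new IllegalStateException("cranky breaker"); // stands in for the CircuitBreakingException
        long usedBytes = 128;                                             // pretend the breaker still holds bytes
        if (usedBytes != 0L) {
            failWithCause(captured, "Expected no used bytes for input, found: " + usedBytes);
        }
    }
}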
