diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java index fe60d3eda442..166f6f7f61f7 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/hll/HllSketchAggregatorFactory.java @@ -289,10 +289,10 @@ public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggrega return null; } HllSketchAggregatorFactory that = (HllSketchAggregatorFactory) preAggregated; - if (lgK == that.lgK && tgtHllType == that.tgtHllType && stringEncoding == that.stringEncoding && Objects.equals( - fieldName, - that.fieldName - )) { + if (lgK <= that.lgK && + stringEncoding == that.stringEncoding && + Objects.equals(fieldName, that.fieldName) + ) { return getCombiningFactory(); } return null; diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java index df181cc7dd96..04b6d2d23031 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/kll/KllSketchAggregatorFactory.java @@ -226,6 +226,23 @@ public byte[] getCacheKey() return new CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build(); } + @Nullable + @Override + public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated) + { + if (this == preAggregated) { + return getCombiningFactory(); + } 
+ if (getClass() != preAggregated.getClass()) { + return null; + } + KllSketchAggregatorFactory that = (KllSketchAggregatorFactory) preAggregated; + if (Objects.equals(fieldName, that.fieldName) && k == that.k && maxStreamLength <= that.maxStreamLength) { + return getCombiningFactory(); + } + return null; + } + @Override public boolean equals(final Object o) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java index 4778b950a17a..acdef51178fd 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactory.java @@ -424,6 +424,25 @@ public byte[] getCacheKey() return new CacheKeyBuilder(cacheTypeId).appendString(name).appendString(fieldName).appendInt(k).build(); } + @Nullable + @Override + public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated) + { + if (this == preAggregated) { + return getCombiningFactory(); + } + + if (getClass() != preAggregated.getClass()) { + return null; + } + + DoublesSketchAggregatorFactory that = (DoublesSketchAggregatorFactory) preAggregated; + if (k <= that.k && maxStreamLength <= that.getMaxStreamLength() && Objects.equals(fieldName, that.fieldName)) { + return getCombiningFactory(); + } + return null; + } + @Override public boolean equals(Object o) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java index b24e382ec0aa..5a2baec256c5 100644 
--- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactory.java @@ -49,6 +49,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.Objects; public abstract class SketchAggregatorFactory extends AggregatorFactory { @@ -266,6 +267,27 @@ public byte[] getCacheKey() .array(); } + /** + * Returns the combining factory if {@code preAggregated} has the same class and field name with a sketch size of at + * least this one, otherwise null. + */ + @Override + public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated) + { + if (this == preAggregated) { + return getCombiningFactory(); + } + if (getClass() != preAggregated.getClass()) { + return null; + } + /* fieldName and size live on this base class, so no particular subclass needs to be assumed by the cast */ + SketchAggregatorFactory that = (SketchAggregatorFactory) preAggregated; + if (Objects.equals(fieldName, that.fieldName) && size <= that.size) { + return getCombiningFactory(); + } + return null; + } + @Override public String toString() { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java index addf76d38dfb..8a7dec444a9c 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/theta/SketchMergeAggregatorFactory.java @@ -29,6 +29,7 @@ import org.apache.druid.segment.column.ColumnType; import javax.annotation.Nullable; +import java.util.Objects; public class SketchMergeAggregatorFactory extends SketchAggregatorFactory { @@ -165,6 +166,25 @@ public AggregatorFactory withName(String newName) ); } + @Override + public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated) + { + if (this
== preAggregated) { + return getCombiningFactory(); + } + if (getClass() != preAggregated.getClass()) { + return null; + } + SketchMergeAggregatorFactory that = (SketchMergeAggregatorFactory) preAggregated; + if (Objects.equals(fieldName, that.fieldName) && + size <= that.size && + isInputThetaSketch == that.isInputThetaSketch + ) { + return getCombiningFactory(); + } + return null; + } + @Override public boolean equals(Object o) { diff --git a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java index 3fe939b4ca87..8f0adce1ec36 100644 --- a/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java +++ b/extensions-core/datasketches/src/main/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactory.java @@ -307,6 +307,29 @@ public ColumnType getResultType() return ColumnType.DOUBLE; } + @Nullable + @Override + public AggregatorFactory substituteCombiningFactory(AggregatorFactory preAggregated) + { + if (this == preAggregated) { + return getCombiningFactory(); + } + + if (getClass() != preAggregated.getClass()) { + return null; + } + + ArrayOfDoublesSketchAggregatorFactory that = (ArrayOfDoublesSketchAggregatorFactory) preAggregated; + if (nominalEntries <= that.nominalEntries && + numberOfValues == that.numberOfValues && + Objects.equals(fieldName, that.fieldName) && + Objects.equals(metricColumns, that.metricColumns) + ) { + return getCombiningFactory(); + } + return null; + } + @Override public String toString() { diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java index c56598e4a720..8389c81ce51c 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/kll/KllDoublesSketchAggregatorFactoryTest.java @@ -153,4 +153,22 @@ public void testResultArraySignature() new TimeseriesQueryQueryToolChest().resultArraySignature(query) ); } + + @Test + public void testCanSubstitute() + { + AggregatorFactory sketch = new KllDoublesSketchAggregatorFactory("sketch", "x", 200, null); + AggregatorFactory sketch2 = new KllDoublesSketchAggregatorFactory("other", "x", 200, null); + AggregatorFactory sketch3 = new KllDoublesSketchAggregatorFactory("sketch", "x", 200, 1_000L); + AggregatorFactory sketch4 = new KllDoublesSketchAggregatorFactory("sketch", "y", 200, null); + AggregatorFactory sketch5 = new KllDoublesSketchAggregatorFactory("sketch", "x", 300, null); + + Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2)); + Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch2)); + Assert.assertNotNull(sketch3.substituteCombiningFactory(sketch)); + Assert.assertNotNull(sketch2.substituteCombiningFactory(sketch)); + Assert.assertNull(sketch.substituteCombiningFactory(sketch3)); + Assert.assertNull(sketch.substituteCombiningFactory(sketch4)); + Assert.assertNull(sketch.substituteCombiningFactory(sketch5)); + } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java index f5a15cde2428..9af69174eecd 100644 --- 
a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/quantiles/DoublesSketchAggregatorFactoryTest.java @@ -201,4 +201,19 @@ public void testNullSketches() ac.fold(new TestDoublesSketchColumnValueSelector()); Assert.assertNotNull(ac.getObject()); } + + @Test + public void testCanSubstitute() + { + final DoublesSketchAggregatorFactory sketch = new DoublesSketchAggregatorFactory("sketch", "x", 1024, 1000L, null); + final DoublesSketchAggregatorFactory sketch2 = new DoublesSketchAggregatorFactory("other", "x", 1024, 2000L, null); + final DoublesSketchAggregatorFactory sketch3 = new DoublesSketchAggregatorFactory("another", "x", 2048, 1000L, null); + final DoublesSketchAggregatorFactory incompatible = new DoublesSketchAggregatorFactory("incompatible", "y", 1024, 1000L, null); + + Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2)); + Assert.assertNotNull(sketch.substituteCombiningFactory(sketch3)); + Assert.assertNull(sketch2.substituteCombiningFactory(sketch3)); + Assert.assertNull(sketch.substituteCombiningFactory(incompatible)); + Assert.assertNull(sketch3.substituteCombiningFactory(sketch)); + } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java index 23887652a735..1b2565dde298 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/theta/SketchAggregatorFactoryTest.java @@ -24,6 +24,7 @@ import org.apache.druid.java.util.common.granularity.Granularities; import 
org.apache.druid.query.Druids; import org.apache.druid.query.aggregation.AggregatorAndSize; +import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.CountAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchBuildAggregatorFactory; import org.apache.druid.query.aggregation.datasketches.theta.oldapi.OldSketchMergeAggregatorFactory; @@ -213,4 +214,18 @@ public void testFactorizeVectorOnUnsupportedComplexColumn() Throwable exception = Assert.assertThrows(DruidException.class, () -> AGGREGATOR_16384.factorizeVector(vectorFactory)); Assert.assertEquals("Unsupported input [x] of type [COMPLEX] for aggregator [COMPLEX].", exception.getMessage()); } + + @Test + public void testCanSubstitute() + { + AggregatorFactory sketch1 = new SketchMergeAggregatorFactory("sketch", "x", 16, true, false, 2); + AggregatorFactory sketch2 = new SketchMergeAggregatorFactory("other", "x", null, false, false, null); + AggregatorFactory sketch3 = new SketchMergeAggregatorFactory("sketch", "x", null, false, false, 3); + AggregatorFactory sketch4 = new SketchMergeAggregatorFactory("sketch", "y", null, false, false, null); + + Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch2)); + Assert.assertNotNull(sketch1.substituteCombiningFactory(sketch3)); + Assert.assertNull(sketch1.substituteCombiningFactory(sketch4)); + Assert.assertNull(sketch2.substituteCombiningFactory(sketch1)); + } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java index 10f2afe22485..d62c6c61ce7a 100644 --- a/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java +++ 
b/extensions-core/datasketches/src/test/java/org/apache/druid/query/aggregation/datasketches/tuple/ArrayOfDoublesSketchAggregatorFactoryTest.java @@ -118,4 +118,21 @@ public void testWithName() Assert.assertEquals(factory, factory.withName("name")); Assert.assertEquals("newTest", factory.withName("newTest").getName()); } + + @Test + public void testCanSubstitute() + { + AggregatorFactory sketch = new ArrayOfDoublesSketchAggregatorFactory("sketch", "x", null, null, null); + AggregatorFactory sketch2 = new ArrayOfDoublesSketchAggregatorFactory("sketch2", "x", null, null, null); + AggregatorFactory other = new ArrayOfDoublesSketchAggregatorFactory("other", "x", 8192, null, null); + AggregatorFactory incompatible = new ArrayOfDoublesSketchAggregatorFactory("incompatible", "x", 2048, null, null); + AggregatorFactory incompatible2 = new ArrayOfDoublesSketchAggregatorFactory("sketch", "y", null, null, null); + Assert.assertNotNull(sketch.substituteCombiningFactory(other)); + Assert.assertNotNull(sketch.substituteCombiningFactory(sketch2)); + Assert.assertNull(sketch.substituteCombiningFactory(incompatible)); + Assert.assertNotNull(sketch.substituteCombiningFactory(sketch)); + Assert.assertNull(other.substituteCombiningFactory(sketch)); + Assert.assertNull(sketch.substituteCombiningFactory(incompatible2)); + Assert.assertNull(other.substituteCombiningFactory(incompatible2)); + } } diff --git a/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java b/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java new file mode 100644 index 000000000000..9b0af45ef19b --- /dev/null +++ b/extensions-core/datasketches/src/test/java/org/apache/druid/segment/DatasketchesProjectionTest.java @@ -0,0 +1,489 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment; + +import com.google.common.collect.ImmutableMap; +import org.apache.datasketches.common.Family; +import org.apache.datasketches.hll.HllSketch; +import org.apache.datasketches.kll.KllDoublesSketch; +import org.apache.datasketches.quantiles.DoublesSketch; +import org.apache.datasketches.theta.SetOperation; +import org.apache.datasketches.theta.Union; +import org.apache.datasketches.thetacommon.ThetaUtil; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesSketch; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketch; +import org.apache.datasketches.tuple.arrayofdoubles.ArrayOfDoublesUpdatableSketchBuilder; +import org.apache.druid.collections.CloseableDefaultBlockingPool; +import org.apache.druid.collections.CloseableStupidPool; +import org.apache.druid.collections.NonBlockingPool; +import org.apache.druid.data.input.impl.AggregateProjectionSpec; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.DoubleDimensionSchema; +import org.apache.druid.data.input.impl.FloatDimensionSchema; +import org.apache.druid.data.input.impl.LongDimensionSchema; +import 
org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.io.Closer; +import org.apache.druid.query.DruidProcessingConfig; +import org.apache.druid.query.QueryContexts; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchHolder; +import org.apache.druid.query.aggregation.datasketches.hll.HllSketchModule; +import org.apache.druid.query.aggregation.datasketches.kll.KllDoublesSketchAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.kll.KllSketchModule; +import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchModule; +import org.apache.druid.query.aggregation.datasketches.theta.SketchHolder; +import org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.theta.SketchModule; +import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchAggregatorFactory; +import org.apache.druid.query.aggregation.datasketches.tuple.ArrayOfDoublesSketchModule; +import org.apache.druid.query.groupby.GroupByQuery; +import org.apache.druid.query.groupby.GroupByQueryConfig; +import org.apache.druid.query.groupby.GroupByResourcesReservationPool; +import org.apache.druid.query.groupby.GroupingEngine; +import org.apache.druid.query.groupby.ResultRow; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import 
org.apache.druid.segment.incremental.IncrementalIndex; +import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory; +import org.apache.druid.segment.incremental.IncrementalIndexSchema; +import org.apache.druid.testing.InitializedNullHandlingTest; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +/** + * like {@link CursorFactoryProjectionTest} but for sketch aggs + */ +@RunWith(Parameterized.class) +public class DatasketchesProjectionTest extends InitializedNullHandlingTest +{ + private static final Closer CLOSER = Closer.create(); + + private static final List PROJECTIONS = Collections.singletonList( + new AggregateProjectionSpec( + "a_projection", + VirtualColumns.create( + Granularities.toVirtualColumn(Granularities.HOUR, "__gran") + ), + Arrays.asList( + new LongDimensionSchema("__gran"), + new StringDimensionSchema("a") + ), + new AggregatorFactory[]{ + new HllSketchBuildAggregatorFactory("_b_hll", "b", null, null, null, null, false), + new SketchMergeAggregatorFactory("_b_theta", "b", null, null, false, null), + new DoublesSketchAggregatorFactory("_d_doubles", "d", null), + new ArrayOfDoublesSketchAggregatorFactory("_bcd_aod", "b", null, Arrays.asList("c", "d"), null), + new KllDoublesSketchAggregatorFactory("_d_kll", "d", null, null) + } + ) + ); + + private static final List AUTO_PROJECTIONS = PROJECTIONS.stream().map(projection -> { + return new AggregateProjectionSpec( + projection.getName(), + projection.getVirtualColumns(), + projection.getGroupingColumns() + .stream() + .map(x -> new AutoTypeColumnSchema(x.getName(), null)) + 
.collect(Collectors.toList()), + projection.getAggregators() + ); + }).collect(Collectors.toList()); + + @Parameterized.Parameters(name = "name: {0}, sortByDim: {3}, autoSchema: {4}") + public static Collection constructorFeeder() + { + HllSketchModule.registerSerde(); + TestHelper.JSON_MAPPER.registerModules(new HllSketchModule().getJacksonModules()); + SketchModule.registerSerde(); + TestHelper.JSON_MAPPER.registerModules(new SketchModule().getJacksonModules()); + KllSketchModule.registerSerde(); + TestHelper.JSON_MAPPER.registerModules(new KllSketchModule().getJacksonModules()); + DoublesSketchModule.registerSerde(); + TestHelper.JSON_MAPPER.registerModules(new DoublesSketchModule().getJacksonModules()); + ArrayOfDoublesSketchModule.registerSerde(); + TestHelper.JSON_MAPPER.registerModules(new ArrayOfDoublesSketchModule().getJacksonModules()); + + final List constructors = new ArrayList<>(); + final DimensionsSpec.Builder dimensionsBuilder = + DimensionsSpec.builder() + .setDimensions( + Arrays.asList( + new StringDimensionSchema("a"), + new StringDimensionSchema("b"), + new LongDimensionSchema("c"), + new DoubleDimensionSchema("d"), + new FloatDimensionSchema("e") + ) + ); + DimensionsSpec dimsTimeOrdered = dimensionsBuilder.build(); + DimensionsSpec dimsOrdered = dimensionsBuilder.setForceSegmentSortByTime(false).build(); + + + List autoDims = dimsOrdered.getDimensions() + .stream() + .map(x -> new AutoTypeColumnSchema(x.getName(), null)) + .collect(Collectors.toList()); + for (boolean incremental : new boolean[]{true, false}) { + for (boolean sortByDim : new boolean[]{true, false}) { + for (boolean autoSchema : new boolean[]{true, false}) { + final DimensionsSpec dims; + if (sortByDim) { + if (autoSchema) { + dims = dimsOrdered.withDimensions(autoDims); + } else { + dims = dimsOrdered; + } + } else { + if (autoSchema) { + dims = dimsTimeOrdered.withDimensions(autoDims); + } else { + dims = dimsTimeOrdered; + } + } + if (incremental) { + IncrementalIndex index 
= CLOSER.register(makeBuilder(dims, autoSchema).buildIncrementalIndex()); + constructors.add(new Object[]{ + "incrementalIndex", + new IncrementalIndexCursorFactory(index), + new IncrementalIndexTimeBoundaryInspector(index), + sortByDim, + autoSchema + }); + } else { + QueryableIndex index = CLOSER.register(makeBuilder(dims, autoSchema).buildMMappedIndex()); + constructors.add(new Object[]{ + "queryableIndex", + new QueryableIndexCursorFactory(index), + QueryableIndexTimeBoundaryInspector.create(index), + sortByDim, + autoSchema + }); + } + } + } + } + return constructors; + } + + @AfterClass + public static void cleanup() throws IOException + { + CLOSER.close(); + } + + private static IndexBuilder makeBuilder(DimensionsSpec dimensionsSpec, boolean autoSchema) + { + File tmp = FileUtils.createTempDir(); + CLOSER.register(tmp::delete); + return IndexBuilder.create() + .tmpDir(tmp) + .schema( + IncrementalIndexSchema.builder() + .withDimensionsSpec(dimensionsSpec) + .withRollup(false) + .withMinTimestamp(CursorFactoryProjectionTest.TIMESTAMP.getMillis()) + .withProjections(autoSchema ? 
AUTO_PROJECTIONS : PROJECTIONS) + .build() + ) + .rows(CursorFactoryProjectionTest.ROWS); + } + + public final CursorFactory projectionsCursorFactory; + public final TimeBoundaryInspector projectionsTimeBoundaryInspector; + + private final GroupingEngine groupingEngine; + + private final NonBlockingPool nonBlockingPool; + public final boolean sortByDim; + public final boolean autoSchema; + + @Rule + public final CloserRule closer = new CloserRule(false); + + public DatasketchesProjectionTest( + String name, + CursorFactory projectionsCursorFactory, + TimeBoundaryInspector projectionsTimeBoundaryInspector, + boolean sortByDim, + boolean autoSchema + ) + { + this.projectionsCursorFactory = projectionsCursorFactory; + this.projectionsTimeBoundaryInspector = projectionsTimeBoundaryInspector; + this.sortByDim = sortByDim; + this.autoSchema = autoSchema; + this.nonBlockingPool = closer.closeLater( + new CloseableStupidPool<>( + "GroupByQueryEngine-bufferPool", + () -> ByteBuffer.allocate(1 << 24) + ) + ); + this.groupingEngine = new GroupingEngine( + new DruidProcessingConfig(), + GroupByQueryConfig::new, + new GroupByResourcesReservationPool( + closer.closeLater( + new CloseableDefaultBlockingPool<>( + () -> ByteBuffer.allocate(1 << 24), + 5 + ) + ), + new GroupByQueryConfig() + ), + TestHelper.makeJsonMapper(), + TestHelper.makeSmileMapper(), + (query, future) -> { + } + ); + } + + @Test + public void testProjectionSingleDim() + { + // test can use the single dimension projection + final GroupByQuery query = + GroupByQuery.builder() + .setDataSource("test") + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ETERNITY) + .addDimension("a") + .setAggregatorSpecs( + new HllSketchBuildAggregatorFactory("b_distinct", "b", null, null, null, true, true), + new SketchMergeAggregatorFactory("b_distinct_theta", "b", null, null, null, null), + new DoublesSketchAggregatorFactory("d_doubles", "d", null, null, null), + new ArrayOfDoublesSketchAggregatorFactory("b_doubles",
"b", null, Arrays.asList("c", "d"), null), + new KllDoublesSketchAggregatorFactory("d", "d", null, null) + ) + .build(); + final CursorBuildSpec buildSpec = GroupingEngine.makeCursorBuildSpec(query, null); + try (final CursorHolder cursorHolder = projectionsCursorFactory.makeCursorHolder(buildSpec)) { + final Cursor cursor = cursorHolder.asCursor(); + int rowCount = 0; + while (!cursor.isDone()) { + rowCount++; + cursor.advance(); + } + Assert.assertEquals(3, rowCount); + } + final Sequence resultRows = groupingEngine.process( + query, + projectionsCursorFactory, + projectionsTimeBoundaryInspector, + nonBlockingPool, + null + ); + final List results = resultRows.toList(); + Assert.assertEquals(2, results.size()); + List expectedResults = getSingleDimExpected(); + final RowSignature querySignature = query.getResultRowSignature(RowSignature.Finalization.NO); + for (int i = 0; i < expectedResults.size(); i++) { + assertResults( + expectedResults.get(i), + results.get(i).getArray(), + querySignature + ); + } + } + + @Test + public void testProjectionSingleDimNoProjections() + { + // same query as testProjectionSingleDim, but with NO_PROJECTIONS set in the query context so the projection is not used + final GroupByQuery query = + GroupByQuery.builder() + .setDataSource("test") + .setGranularity(Granularities.ALL) + .setInterval(Intervals.ETERNITY) + .addDimension("a") + .setAggregatorSpecs( + new HllSketchBuildAggregatorFactory("b_distinct", "b", null, null, null, true, true), + new SketchMergeAggregatorFactory("b_distinct_theta", "b", null, null, null, null), + new DoublesSketchAggregatorFactory("d_doubles", "d", null, null, null), + new ArrayOfDoublesSketchAggregatorFactory("b_doubles", "b", null, Arrays.asList("c", "d"), null), + new KllDoublesSketchAggregatorFactory("d", "d", null, null) + ) + .setContext(ImmutableMap.of(QueryContexts.NO_PROJECTIONS, true)) + .build(); + final CursorBuildSpec buildSpec = GroupingEngine.makeCursorBuildSpec(query, null); + try (final CursorHolder cursorHolder =
projectionsCursorFactory.makeCursorHolder(buildSpec)) { + final Cursor cursor = cursorHolder.asCursor(); + int rowCount = 0; + while (!cursor.isDone()) { + rowCount++; + cursor.advance(); + } + Assert.assertEquals(8, rowCount); + } + final Sequence resultRows = groupingEngine.process( + query, + projectionsCursorFactory, + projectionsTimeBoundaryInspector, + nonBlockingPool, + null + ); + final List results = resultRows.toList(); + Assert.assertEquals(2, results.size()); + List expectedResults = getSingleDimExpected(); + final RowSignature querySignature = query.getResultRowSignature(RowSignature.Finalization.NO); + for (int i = 0; i < expectedResults.size(); i++) { + assertResults( + expectedResults.get(i), + results.get(i).getArray(), + querySignature + ); + } + } + + private List getSingleDimExpected() + { + HllSketch hll1 = new HllSketch(HllSketch.DEFAULT_LG_K); + Union theta1 = (Union) SetOperation.builder().build(Family.UNION); + DoublesSketch d1 = DoublesSketch.builder().setK(DoublesSketchAggregatorFactory.DEFAULT_K).build(); + ArrayOfDoublesUpdatableSketch ad1 = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(ThetaUtil.DEFAULT_NOMINAL_ENTRIES) + .setNumberOfValues(2) + .build(); + KllDoublesSketch kll1 = KllDoublesSketch.newHeapInstance(); + hll1.update("aa"); + hll1.update("bb"); + hll1.update("cc"); + hll1.update("dd"); + theta1.update("aa"); + theta1.update("bb"); + theta1.update("cc"); + theta1.update("dd"); + d1.update(1.0); + d1.update(1.1); + d1.update(2.2); + d1.update(1.1); + d1.update(2.2); + ad1.update("aa", new double[]{1.0, 1.0}); + ad1.update("bb", new double[]{1.0, 1.1}); + ad1.update("cc", new double[]{2.0, 2.2}); + ad1.update("aa", new double[]{1.0, 1.1}); + ad1.update("dd", new double[]{2.0, 2.2}); + kll1.update(1.0); + kll1.update(1.1); + kll1.update(2.2); + kll1.update(1.1); + kll1.update(2.2); + HllSketch hll2 = new HllSketch(HllSketch.DEFAULT_LG_K); + Union theta2 = (Union) SetOperation.builder().build(Family.UNION); + 
DoublesSketch d2 = DoublesSketch.builder().setK(DoublesSketchAggregatorFactory.DEFAULT_K).build(); + ArrayOfDoublesUpdatableSketch ad2 = new ArrayOfDoublesUpdatableSketchBuilder().setNominalEntries(ThetaUtil.DEFAULT_NOMINAL_ENTRIES) + .setNumberOfValues(2) + .build(); + KllDoublesSketch kll2 = KllDoublesSketch.newHeapInstance(); + hll2.update("aa"); + hll2.update("bb"); + theta2.update("aa"); + theta2.update("bb"); + d2.update(3.3); + d2.update(4.4); + d2.update(5.5); + ad2.update("aa", new double[]{3.0, 3.3}); + ad2.update("aa", new double[]{4.0, 4.4}); + ad2.update("bb", new double[]{5.0, 5.5}); + kll2.update(3.3); + kll2.update(4.4); + kll2.update(5.5); + + return Arrays.asList( + new Object[]{"a", HllSketchHolder.of(hll1), SketchHolder.of(theta1), d1, ad1, kll1}, + new Object[]{"b", HllSketchHolder.of(hll2), SketchHolder.of(theta2), d2, ad2, kll2} + ); + + } + + private void assertResults(Object[] expected, Object[] actual, RowSignature signature) + { + Assert.assertEquals(expected.length, actual.length); + for (int i = 0; i < expected.length; i++) { + if (signature.getColumnType(i).get().equals(ColumnType.ofComplex(HllSketchModule.BUILD_TYPE_NAME))) { + Assert.assertEquals( + ((HllSketchHolder) expected[i]).getEstimate(), + ((HllSketchHolder) actual[i]).getEstimate(), + 0.01 + ); + } else if (signature.getColumnType(i).get().equals(DoublesSketchModule.TYPE)) { + Assert.assertEquals( + ((DoublesSketch) expected[i]).getMinItem(), + ((DoublesSketch) actual[i]).getMinItem(), + 0.01 + ); + Assert.assertEquals( + ((DoublesSketch) expected[i]).getMaxItem(), + ((DoublesSketch) actual[i]).getMaxItem(), + 0.01 + ); + } else if (signature.getColumnType(i).get().equals(ArrayOfDoublesSketchModule.BUILD_TYPE)) { + Assert.assertEquals( + ((ArrayOfDoublesSketch) expected[i]).getEstimate(), + ((ArrayOfDoublesSketch) actual[i]).getEstimate(), + 0.01 + ); + Assert.assertEquals( + ((ArrayOfDoublesSketch) expected[i]).getLowerBound(0), + ((ArrayOfDoublesSketch) 
actual[i]).getLowerBound(0), + 0.01 + ); + Assert.assertEquals( + ((ArrayOfDoublesSketch) expected[i]).getUpperBound(0), + ((ArrayOfDoublesSketch) actual[i]).getUpperBound(0), + 0.01 + ); + } else if (signature.getColumnType(i).get().equals(KllSketchModule.DOUBLES_TYPE)) { + Assert.assertEquals( + ((KllDoublesSketch) expected[i]).getMinItem(), + ((KllDoublesSketch) actual[i]).getMinItem(), + 0.01 + ); + Assert.assertEquals( + ((KllDoublesSketch) expected[i]).getMaxItem(), + ((KllDoublesSketch) actual[i]).getMaxItem(), + 0.01 + ); + } else { + Assert.assertEquals(expected[i], actual[i]); + } + } + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java index 7712cecb3034..1b63d9cb1df8 100644 --- a/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java +++ b/processing/src/main/java/org/apache/druid/segment/AggregateProjectionMetadata.java @@ -332,6 +332,7 @@ public Projections.ProjectionMatch matches( if (combining != null) { matchBuilder.remapColumn(queryAgg.getName(), projectionAgg.getName()).addPreAggregatedAggregator(combining); foundMatch = true; + break; } } allMatch = allMatch && foundMatch; diff --git a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java index 446e714f997a..7eae70cc1f4e 100644 --- a/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java +++ b/processing/src/test/java/org/apache/druid/segment/CursorFactoryProjectionTest.java @@ -99,15 +99,15 @@ public class CursorFactoryProjectionTest extends InitializedNullHandlingTest { private static final Closer CLOSER = Closer.create(); - private static final DateTime TIMESTAMP = Granularities.DAY.bucket(DateTimes.nowUtc()).getStart(); + static final DateTime TIMESTAMP = 
Granularities.DAY.bucket(DateTimes.nowUtc()).getStart(); - private static final RowSignature ROW_SIGNATURE = RowSignature.builder() + static final RowSignature ROW_SIGNATURE = RowSignature.builder() .add("a", ColumnType.STRING) .add("b", ColumnType.STRING) .add("c", ColumnType.LONG) .add("d", ColumnType.DOUBLE) .build(); - private static final List ROWS = Arrays.asList( + static final List ROWS = Arrays.asList( new ListBasedInputRow( ROW_SIGNATURE, TIMESTAMP,