From b53c26f5c5972d51f712fa4a2c4b906006fbb392 Mon Sep 17 00:00:00 2001 From: Akshat Jain Date: Thu, 18 Jul 2024 07:35:09 +0530 Subject: [PATCH] Fix issues with partitioning boundaries for MSQ window functions (#16729) * Fix issues with partitioning boundaries for MSQ window functions * Address review comments * Address review comments * Add test for coverage check failure * Address review comment * Remove DruidWindowQueryTest and WindowQueryTestBase, move those tests to DrillWindowQueryTest * Update extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java * Address review comments * Add test for equals and hashcode for WindowOperatorQueryFrameProcessorFactory * Address review comment * Fix checkstyle --------- Co-authored-by: Benedict Jin --- .../WindowOperatorQueryFrameProcessor.java | 61 ++++--- ...dowOperatorQueryFrameProcessorFactory.java | 18 +- .../msq/querykit/WindowOperatorQueryKit.java | 157 +++++++++++++----- ...peratorQueryFrameProcessorFactoryTest.java | 35 ++++ .../druid/msq/test/CalciteMSQTestsHelper.java | 16 ++ .../apache/druid/query/operator/Operator.java | 2 +- .../operator/window/ComposingProcessor.java | 12 ++ .../query/operator/window/Processor.java | 7 + .../WindowFramedAggregateProcessor.java | 12 ++ .../ranking/WindowPercentileProcessor.java | 8 + .../ranking/WindowRankingProcessorBase.java | 6 + .../ranking/WindowRowNumberProcessor.java | 9 + .../value/WindowValueProcessorBase.java | 8 + .../operator/WindowProcessorOperatorTest.java | 9 + .../window/ComposingProcessorTest.java | 10 ++ .../WindowFramedAggregateProcessorTest.java | 2 + .../ranking/WindowCumeDistProcessorTest.java | 2 + .../ranking/WindowDenseRankProcessorTest.java | 2 + .../WindowPercentileProcessorTest.java | 7 + .../ranking/WindowRankProcessorTest.java | 4 + .../ranking/WindowRowNumberProcessorTest.java | 3 + .../value/WindowFirstProcessorTest.java | 7 + .../window/value/WindowLastProcessorTest.java | 6 + 
.../sql/calcite/DrillWindowQueryTest.java | 74 +++++++++ .../multiple_windows/wikipedia_query_1.e | 13 ++ .../multiple_windows/wikipedia_query_1.q | 6 + .../wikipedia_query_1_named_windows.e | 13 ++ .../wikipedia_query_1_named_windows.q | 9 + .../wikipedia_query_1.e | 15 ++ .../wikipedia_query_1.q | 7 + .../wikipedia_query_2.e | 15 ++ .../wikipedia_query_2.q | 9 + .../wikipedia_query_1.e | 15 ++ .../wikipedia_query_1.q | 6 + .../wikipedia_query_1_named_window.e | 15 ++ .../wikipedia_query_1_named_window.q | 7 + .../shuffle_columns/wikipedia_query_1.e | 15 ++ .../shuffle_columns/wikipedia_query_1.q | 5 + .../wikipedia_query_1_shuffle_1.e | 15 ++ .../wikipedia_query_1_shuffle_1.q | 5 + .../shuffle_columns/wikipedia_query_2.e | 16 ++ .../shuffle_columns/wikipedia_query_2.q | 9 + .../wikipedia_query_2_shuffle_1.e | 16 ++ .../wikipedia_query_2_shuffle_1.q | 9 + 44 files changed, 614 insertions(+), 83 deletions(-) create mode 100644 extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e create mode 100644 
sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e create mode 100644 sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java index 6d8cfdfd2773..2bf21397ffb1 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java +++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessor.java @@ -38,7 +38,6 @@ import org.apache.druid.msq.indexing.error.MSQException; import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault; import org.apache.druid.query.groupby.ResultRow; -import org.apache.druid.query.operator.NaivePartitioningOperatorFactory; import org.apache.druid.query.operator.OffsetLimit; import org.apache.druid.query.operator.Operator; import org.apache.druid.query.operator.OperatorFactory; @@ -70,6 +69,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private final WindowOperatorQuery query; private final List operatorFactoryList; + private final List partitionColumnNames; private final ObjectMapper jsonMapper; private final ArrayList frameRowsAndCols; private final ArrayList resultRowAndCols; @@ -79,7 +79,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor private final FrameReader frameReader; private final ArrayList objectsOfASingleRac; private final int maxRowsMaterialized; - List partitionColsIndex; private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed private Cursor frameCursor = null; private Supplier rowSupplierFromFrameCursor; @@ -97,7 +96,8 @@ public WindowOperatorQueryFrameProcessor( final List operatorFactoryList, final RowSignature rowSignature, final boolean isOverEmpty, - final int maxRowsMaterializedInWindow + final int maxRowsMaterializedInWindow, + final List partitionColumnNames ) { this.inputChannel = inputChannel; @@ -110,9 +110,9 @@ public WindowOperatorQueryFrameProcessor( this.frameRowsAndCols = new ArrayList<>(); this.resultRowAndCols = new ArrayList<>(); this.objectsOfASingleRac = new ArrayList<>(); - this.partitionColsIndex = new ArrayList<>(); this.isOverEmpty = isOverEmpty; this.maxRowsMaterialized = maxRowsMaterializedInWindow; + this.partitionColumnNames = partitionColumnNames; } @Override 
@@ -177,12 +177,12 @@ public ReturnOrAwait runIncrementally(IntSet readableInputs) * * Future thoughts: {@link https://github.com/apache/druid/issues/16126} * - * 1. We are writing 1 partition to each frame in this way. In case of low cardinality data - * we will me making a large number of small frames. We can have a check to keep size of frame to a value + * 1. We are writing 1 partition to each frame in this way. In case of high cardinality data + * we will be making a large number of small frames. We can have a check to keep size of frame to a value * say 20k rows and keep on adding to the same pending frame and not create a new frame * * 2. Current approach with R&C and operators materialize a single R&C for processing. In case of data - * with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause + * with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause * Most of the window operations like SUM(), RANK(), RANGE() etc. can be made with 2 passes of the data. 
* We might think to reimplement them in the MSQ way so that we do not have to materialize so much data */ @@ -218,7 +218,6 @@ public ReturnOrAwait runIncrementally(IntSet readableInputs) final Frame frame = inputChannel.read(); frameCursor = FrameProcessors.makeCursor(frame, frameReader); final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory(); - partitionColsIndex = findPartitionColumns(frameReader.signature()); final Supplier[] fieldSuppliers = new Supplier[frameReader.signature().size()]; for (int i = 0; i < fieldSuppliers.length; i++) { final ColumnValueSelector selector = @@ -259,18 +258,17 @@ public ReturnOrAwait runIncrementally(IntSet readableInputs) if (outputRow == null) { outputRow = currentRow; objectsOfASingleRac.add(currentRow); - } else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) { + } else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) { // if they have the same partition key // keep adding them after checking // guardrails + objectsOfASingleRac.add(currentRow); if (objectsOfASingleRac.size() > maxRowsMaterialized) { throw new MSQException(new TooManyRowsInAWindowFault( objectsOfASingleRac.size(), maxRowsMaterialized )); } - objectsOfASingleRac.add(currentRow); - } else { // key change noted // create rac from the rows seen before @@ -484,37 +482,36 @@ private void convertRowFrameToRowsAndColumns(Frame frame) frameRowsAndCols.add(ldrc); } - private List findPartitionColumns(RowSignature rowSignature) - { - List indexList = new ArrayList<>(); - for (OperatorFactory of : operatorFactoryList) { - if (of instanceof NaivePartitioningOperatorFactory) { - for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) { - indexList.add(rowSignature.indexOf(s)); - } - } - } - return indexList; - } - /** - * - * Compare two rows based only the columns in the partitionIndices - * In case the parition indices is empty or null compare entire row - * + * 
Compare two rows based on the columns in partitionColumnNames. + * If the partitionColumnNames is empty or null, compare entire row. + *

<p>
+ * For example, say:
+ * <ul>
+ *   <li>partitionColumnNames = ["d1", "d2"]</li>
+ *   <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
+ *   <li>frameReader.signature.indexOf("d1") = 0</li>
+ *   <li>frameReader.signature.indexOf("d2") = 1</li>
+ *   <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
+ *   <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
+ * </ul>
+ * <p>

+ * Then this method will return true if d1_row1==d1_row2 && d2_row1==d2_row2, false otherwise. + * Returning true would indicate that these 2 rows can be put into the same partition for window function processing. */ - private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List partitionIndices) + private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List partitionColumnNames) { - if (partitionIndices == null || partitionIndices.isEmpty()) { + if (partitionColumnNames == null || partitionColumnNames.isEmpty()) { return row1.equals(row2); } else { int match = 0; - for (int i : partitionIndices) { + for (String columnName : partitionColumnNames) { + int i = frameReader.signature().indexOf(columnName); if (Objects.equals(row1.get(i), row2.get(i))) { match++; } } - return match == partitionIndices.size(); + return match == partitionColumnNames.size(); } } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java index fbbc0a0fc3e7..d9c14390736f 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactory.java @@ -61,6 +61,7 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor private final RowSignature stageRowSignature; private final boolean isEmptyOver; private final int maxRowsMaterializedInWindow; + private final List partitionColumnNames; @JsonCreator public WindowOperatorQueryFrameProcessorFactory( @@ -68,7 +69,8 @@ public WindowOperatorQueryFrameProcessorFactory( @JsonProperty("operatorList") List operatorFactoryList, @JsonProperty("stageRowSignature") RowSignature stageRowSignature, 
@JsonProperty("emptyOver") boolean emptyOver, - @JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow + @JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow, + @JsonProperty("partitionColumnNames") List partitionColumnNames ) { this.query = Preconditions.checkNotNull(query, "query"); @@ -76,6 +78,7 @@ public WindowOperatorQueryFrameProcessorFactory( this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature"); this.isEmptyOver = emptyOver; this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow; + this.partitionColumnNames = partitionColumnNames; } @JsonProperty("query") @@ -90,6 +93,12 @@ public List getOperators() return operatorList; } + @JsonProperty("partitionColumnNames") + public List getPartitionColumnNames() + { + return partitionColumnNames; + } + @JsonProperty("stageRowSignature") public RowSignature getSignature() { @@ -148,7 +157,6 @@ public ProcessorsAndChannels makeProcessors( readableInput -> { final OutputChannel outputChannel = outputChannels.get(readableInput.getStagePartition().getPartitionNumber()); - return new WindowOperatorQueryFrameProcessor( query, readableInput.getChannel(), @@ -159,7 +167,8 @@ public ProcessorsAndChannels makeProcessors( operatorList, stageRowSignature, isEmptyOver, - maxRowsMaterializedInWindow + maxRowsMaterializedInWindow, + partitionColumnNames ); } ); @@ -185,12 +194,13 @@ public boolean equals(Object o) && maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow && Objects.equals(query, that.query) && Objects.equals(operatorList, that.operatorList) + && Objects.equals(partitionColumnNames, that.partitionColumnNames) && Objects.equals(stageRowSignature, that.stageRowSignature); } @Override public int hashCode() { - return Objects.hash(query, operatorList, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow); + return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, isEmptyOver, 
maxRowsMaterializedInWindow); } } diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java index d08d78ef791f..3754f081a27a 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java @@ -24,9 +24,12 @@ import org.apache.druid.frame.key.ClusterBy; import org.apache.druid.frame.key.KeyColumn; import org.apache.druid.frame.key.KeyOrder; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.msq.exec.Limits; import org.apache.druid.msq.input.stage.StageInputSpec; import org.apache.druid.msq.kernel.HashShuffleSpec; +import org.apache.druid.msq.kernel.MixShuffleSpec; import org.apache.druid.msq.kernel.QueryDefinition; import org.apache.druid.msq.kernel.QueryDefinitionBuilder; import org.apache.druid.msq.kernel.ShuffleSpec; @@ -39,6 +42,7 @@ import org.apache.druid.query.operator.OperatorFactory; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.operator.window.WindowOperatorFactory; +import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import java.util.ArrayList; @@ -48,6 +52,7 @@ public class WindowOperatorQueryKit implements QueryKit { + private static final Logger log = new Logger(WindowOperatorQueryKit.class); private final ObjectMapper jsonMapper; public WindowOperatorQueryKit(ObjectMapper jsonMapper) @@ -65,13 +70,22 @@ public QueryDefinition makeQueryDefinition( int minStageNumber ) { - // need to validate query first - // populate the group of operators to be processed as each stage - // the size of the operators is the number of serialized stages - // later we 
should also check if these can be parallelized - // check there is an empty over clause or not - List> operatorList = new ArrayList<>(); - boolean isEmptyOverFound = ifEmptyOverPresentInWindowOperstors(originalQuery, operatorList); + // Need to validate query first. + // Populate the group of operators to be processed at each stage. + // The size of the operators is the number of serialized stages. + // Later we should also check if these can be parallelized. + // Check if there is an empty OVER() clause or not. + RowSignature rowSignature = originalQuery.getRowSignature(); + log.info("Row signature received for query is [%s].", rowSignature); + + boolean isEmptyOverPresent = originalQuery.getOperators() + .stream() + .filter(of -> of instanceof NaivePartitioningOperatorFactory) + .map(of -> (NaivePartitioningOperatorFactory) of) + .anyMatch(of -> of.getPartitionColumns().isEmpty()); + + List> operatorList = getOperatorListFromQuery(originalQuery); + log.info("Created operatorList with operator factories: [%s]", operatorList); ShuffleSpec nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(0), maxWorkerCount); // add this shuffle spec to the last stage of the inner query @@ -102,16 +116,14 @@ public QueryDefinition makeQueryDefinition( final int firstStageNumber = Math.max(minStageNumber, queryDefBuilder.getNextStageNumber()); final WindowOperatorQuery queryToRun = (WindowOperatorQuery) originalQuery.withDataSource(dataSourcePlan.getNewDataSource()); final int maxRowsMaterialized; - RowSignature rowSignature = queryToRun.getRowSignature(); + if (originalQuery.context() != null && originalQuery.context().containsKey(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW)) { - maxRowsMaterialized = (int) originalQuery.context() - .get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); + maxRowsMaterialized = (int) originalQuery.context().get(MultiStageQueryContext.MAX_ROWS_MATERIALIZED_IN_WINDOW); } else { maxRowsMaterialized = 
Limits.MAX_ROWS_MATERIALIZED_IN_WINDOW; } - - if (isEmptyOverFound) { + if (isEmptyOverPresent) { // empty over clause found // moving everything to a single partition queryDefBuilder.add( @@ -125,28 +137,59 @@ public QueryDefinition makeQueryDefinition( queryToRun.getOperators(), rowSignature, true, - maxRowsMaterialized + maxRowsMaterialized, + new ArrayList<>() )) ); } else { - // there are multiple windows present in the query - // Create stages for each window in the query - // These stages will be serialized - // the partition by clause of the next window will be the shuffle key for the previous window + // There are multiple windows present in the query. + // Create stages for each window in the query. + // These stages will be serialized. + // The partition by clause of the next window will be the shuffle key for the previous window. RowSignature.Builder bob = RowSignature.builder(); - final int numberOfWindows = operatorList.size(); - final int baseSize = rowSignature.size() - numberOfWindows; - for (int i = 0; i < baseSize; i++) { - bob.add(rowSignature.getColumnName(i), rowSignature.getColumnType(i).get()); + RowSignature signatureFromInput = dataSourcePlan.getSubQueryDefBuilder().get().build().getFinalStageDefinition().getSignature(); + log.info("Row signature received from last stage is [%s].", signatureFromInput); + + for (int i = 0; i < signatureFromInput.getColumnNames().size(); i++) { + bob.add(signatureFromInput.getColumnName(i), signatureFromInput.getColumnType(i).get()); } - for (int i = 0; i < numberOfWindows; i++) { - bob.add(rowSignature.getColumnName(baseSize + i), rowSignature.getColumnType(baseSize + i).get()).build(); + List partitionColumnNames = new ArrayList<>(); + + /* + operatorList is a List>, where each List corresponds to the operator factories + to be used for a different window stage. + + We iterate over operatorList, and add the definition for a window stage to QueryDefinitionBuilder. 
+ */ + for (int i = 0; i < operatorList.size(); i++) { + for (OperatorFactory operatorFactory : operatorList.get(i)) { + if (operatorFactory instanceof WindowOperatorFactory) { + List outputColumnNames = ((WindowOperatorFactory) operatorFactory).getProcessor().getOutputColumnNames(); + + // Need to add column names which are present in outputColumnNames and rowSignature but not in bob, + // since they need to be present in the row signature for this window stage. + for (String columnName : outputColumnNames) { + int indexInRowSignature = rowSignature.indexOf(columnName); + if (indexInRowSignature != -1 && bob.build().indexOf(columnName) == -1) { + ColumnType columnType = rowSignature.getColumnType(indexInRowSignature).get(); + bob.add(columnName, columnType); + log.info("Added column [%s] of type [%s] to row signature for window stage.", columnName, columnType); + } else { + throw new ISE( + "Found unexpected column [%s] already present in row signature [%s].", + columnName, + rowSignature + ); + } + } + } + } + // find the shuffle spec of the next stage // if it is the last stage set the next shuffle spec to single partition - if (i + 1 == numberOfWindows) { - nextShuffleSpec = ShuffleSpecFactories.singlePartition() - .build(ClusterBy.none(), false); + if (i + 1 == operatorList.size()) { + nextShuffleSpec = MixShuffleSpec.instance(); } else { nextShuffleSpec = findShuffleSpecForNextWindow(operatorList.get(i + 1), maxWorkerCount); } @@ -162,6 +205,28 @@ public QueryDefinition makeQueryDefinition( ); } + log.info("Using row signature [%s] for window stage.", stageRowSignature); + + boolean partitionOperatorExists = false; + List currentPartitionColumns = new ArrayList<>(); + for (OperatorFactory of : operatorList.get(i)) { + if (of instanceof NaivePartitioningOperatorFactory) { + for (String s : ((NaivePartitioningOperatorFactory) of).getPartitionColumns()) { + currentPartitionColumns.add(s); + partitionOperatorExists = true; + } + } + } + + if 
(partitionOperatorExists) { + partitionColumnNames = currentPartitionColumns; + } + + log.info( + "Columns which would be used to define partitioning boundaries for this window stage are [%s]", + partitionColumnNames + ); + queryDefBuilder.add( StageDefinition.builder(firstStageNumber + i) .inputs(new StageInputSpec(firstStageNumber + i - 1)) @@ -173,7 +238,8 @@ public QueryDefinition makeQueryDefinition( operatorList.get(i), stageRowSignature, false, - maxRowsMaterialized + maxRowsMaterialized, + partitionColumnNames )) ); } @@ -184,14 +250,12 @@ public QueryDefinition makeQueryDefinition( /** * * @param originalQuery - * @param operatorList - * @return true if the operator List has a partitioning operator with an empty OVER clause, false otherwise + * @return A list of list of operator factories, where each list represents the operator factories for a particular + * window stage. */ - private boolean ifEmptyOverPresentInWindowOperstors( - WindowOperatorQuery originalQuery, - List> operatorList - ) + private List> getOperatorListFromQuery(WindowOperatorQuery originalQuery) { + List> operatorList = new ArrayList<>(); final List operators = originalQuery.getOperators(); List operatorFactoryList = new ArrayList<>(); for (OperatorFactory of : operators) { @@ -203,18 +267,17 @@ private boolean ifEmptyOverPresentInWindowOperstors( if (((NaivePartitioningOperatorFactory) of).getPartitionColumns().isEmpty()) { operatorList.clear(); operatorList.add(originalQuery.getOperators()); - return true; + return operatorList; } } } - return false; + return operatorList; } private ShuffleSpec findShuffleSpecForNextWindow(List operatorFactories, int maxWorkerCount) { NaivePartitioningOperatorFactory partition = null; NaiveSortOperatorFactory sort = null; - List keyColsOfWindow = new ArrayList<>(); for (OperatorFactory of : operatorFactories) { if (of instanceof NaivePartitioningOperatorFactory) { partition = (NaivePartitioningOperatorFactory) of; @@ -222,29 +285,31 @@ private 
ShuffleSpec findShuffleSpecForNextWindow(List operatorF sort = (NaiveSortOperatorFactory) of; } } - Map colMap = new HashMap<>(); + + Map sortColumnsMap = new HashMap<>(); if (sort != null) { for (ColumnWithDirection sortColumn : sort.getSortColumns()) { - colMap.put(sortColumn.getColumn(), sortColumn.getDirection()); + sortColumnsMap.put(sortColumn.getColumn(), sortColumn.getDirection()); } } - assert partition != null; - if (partition.getPartitionColumns().isEmpty()) { + + if (partition == null || partition.getPartitionColumns().isEmpty()) { + // If operatorFactories doesn't have any partitioning factory, then we should keep the shuffle spec from previous stage. + // This indicates that we already have the data partitioned correctly, and hence we don't need to do any shuffling. return null; } + + List keyColsOfWindow = new ArrayList<>(); for (String partitionColumn : partition.getPartitionColumns()) { KeyColumn kc; - if (colMap.containsKey(partitionColumn)) { - if (colMap.get(partitionColumn) == ColumnWithDirection.Direction.ASC) { - kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING); - } else { - kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING); - } + if (sortColumnsMap.get(partitionColumn) == ColumnWithDirection.Direction.DESC) { + kc = new KeyColumn(partitionColumn, KeyOrder.DESCENDING); } else { kc = new KeyColumn(partitionColumn, KeyOrder.ASCENDING); } keyColsOfWindow.add(kc); } + return new HashShuffleSpec(new ClusterBy(keyColsOfWindow, 0), maxWorkerCount); } } diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java new file mode 100644 index 000000000000..2049c0194ed1 --- /dev/null +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/WindowOperatorQueryFrameProcessorFactoryTest.java @@ -0,0 +1,35 @@ 
+/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.junit.Test; + +public class WindowOperatorQueryFrameProcessorFactoryTest +{ + @Test + public void testEqualsAndHashcode() + { + EqualsVerifier.forClass(WindowOperatorQueryFrameProcessorFactory.class) + .withNonnullFields("query", "operatorList", "stageRowSignature", "isEmptyOver", "maxRowsMaterializedInWindow", "partitionColumnNames") + .usingGetClass() + .verify(); + } +} diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java index 54552e5d5b0b..eaa2a9efe5ae 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/test/CalciteMSQTestsHelper.java @@ -39,6 +39,7 @@ import org.apache.druid.guice.annotations.Self; import org.apache.druid.indexing.common.SegmentCacheManagerFactory; import org.apache.druid.java.util.common.ISE; +import 
org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.math.expr.ExprMacroTable; @@ -62,12 +63,14 @@ import org.apache.druid.query.groupby.TestGroupByBuffers; import org.apache.druid.segment.IndexBuilder; import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.IndexSpec; import org.apache.druid.segment.QueryableIndex; import org.apache.druid.segment.QueryableIndexStorageAdapter; import org.apache.druid.segment.Segment; import org.apache.druid.segment.StorageAdapter; import org.apache.druid.segment.TestIndex; import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.incremental.IncrementalIndex; import org.apache.druid.segment.incremental.IncrementalIndexSchema; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.LocalDataSegmentPusher; @@ -91,6 +94,7 @@ import java.io.File; import java.util.List; import java.util.Set; +import java.util.UUID; import java.util.function.Function; import java.util.function.Supplier; @@ -99,6 +103,7 @@ import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE2; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE3; import static org.apache.druid.sql.calcite.util.CalciteTests.DATASOURCE5; +import static org.apache.druid.sql.calcite.util.CalciteTests.WIKIPEDIA; import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_LOTS_O_COLUMNS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.INDEX_SCHEMA_NUMERIC_DIMS; import static org.apache.druid.sql.calcite.util.TestDataBuilder.ROWS1; @@ -205,6 +210,17 @@ private static Supplier> getSupplierForSegment(Function< { final QueryableIndex index; switch (segmentId.getDataSource()) { + case WIKIPEDIA: + try { + final File directory = new File(tempFolderProducer.apply("tmpDir"), StringUtils.format("wikipedia-index-%s", 
UUID.randomUUID())); + final IncrementalIndex incrementalIndex = TestIndex.makeWikipediaIncrementalIndex(); + TestIndex.INDEX_MERGER.persist(incrementalIndex, directory, IndexSpec.DEFAULT, null); + index = TestIndex.INDEX_IO.loadIndex(directory); + } + catch (Exception e) { + throw new RuntimeException(e); + } + break; case DATASOURCE1: IncrementalIndexSchema foo1Schema = new IncrementalIndexSchema.Builder() .withMetrics( diff --git a/processing/src/main/java/org/apache/druid/query/operator/Operator.java b/processing/src/main/java/org/apache/druid/query/operator/Operator.java index a9a18c36d547..57bc1013fc44 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/Operator.java +++ b/processing/src/main/java/org/apache/druid/query/operator/Operator.java @@ -126,7 +126,7 @@ enum Signal */ STOP, /** - * Inidcates that the downstream processing should pause its pushing of results and instead return a + * Indicates that the downstream processing should pause its pushing of results and instead return a * continuation object that encapsulates whatever state is required to resume processing. When this signal is * received, Operators that are generating data might choose to exert backpressure or otherwise pause their * processing efforts until called again with the returned continuation object. 
diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java index a4fa74967f61..0e0fc59498c0 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/ComposingProcessor.java @@ -23,7 +23,9 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; public class ComposingProcessor implements Processor { @@ -37,6 +39,16 @@ public ComposingProcessor( this.processors = processors; } + @Override + public List getOutputColumnNames() + { + List outputColumnNames = new ArrayList<>(); + for (Processor processor : processors) { + outputColumnNames.addAll(processor.getOutputColumnNames()); + } + return outputColumnNames; + } + @JsonProperty("processors") public Processor[] getProcessors() { diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java b/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java index fe8d125cbdf3..b271d3064efa 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/Processor.java @@ -31,6 +31,8 @@ import org.apache.druid.query.operator.window.value.WindowOffsetProcessor; import org.apache.druid.query.rowsandcols.RowsAndColumns; +import java.util.List; + /** * A Processor is a bit of logic that processes a single RowsAndColumns object to produce a new RowsAndColumns * object. Generally speaking, it is used to add or alter columns in a batch-oriented fashion. @@ -80,4 +82,9 @@ public interface Processor * @return boolean identifying if these processors should be considered equivalent to each other. 
*/ boolean validateEquivalent(Processor otherProcessor); + + /** + * @return List of output column names for the Processor. + */ + List getOutputColumnNames(); } diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java index 3545c3740f40..41baced4e611 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessor.java @@ -27,7 +27,9 @@ import org.apache.druid.query.rowsandcols.semantic.FramedOnHeapAggregatable; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Objects; public class WindowFramedAggregateProcessor implements Processor @@ -45,6 +47,16 @@ private static T[] emptyToNull(T[] arr) private final WindowFrame frame; private final AggregatorFactory[] aggregations; + @Override + public List getOutputColumnNames() + { + List outputColumnNames = new ArrayList<>(); + for (AggregatorFactory aggregation : aggregations) { + outputColumnNames.add(aggregation.getName()); + } + return outputColumnNames; + } + @JsonCreator public WindowFramedAggregateProcessor( @JsonProperty("frame") WindowFrame frame, diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java index 541c1399e36e..b7f77d509694 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessor.java @@ -28,12 +28,20 @@ import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import 
java.util.Arrays; +import java.util.Collections; +import java.util.List; public class WindowPercentileProcessor implements Processor { private final int numBuckets; private final String outputColumn; + @Override + public List getOutputColumnNames() + { + return Collections.singletonList(outputColumn); + } + @JsonCreator public WindowPercentileProcessor( @JsonProperty("outputColumn") String outputColumn, diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java index fb5bedf9519f..4e026cbdd3db 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRankingProcessorBase.java @@ -27,6 +27,7 @@ import org.apache.druid.query.rowsandcols.semantic.ClusteredGroupPartitioner; import org.apache.druid.query.rowsandcols.semantic.DefaultClusteredGroupPartitioner; +import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.function.Function; @@ -124,4 +125,9 @@ public boolean equals(Object obj) return Objects.equals(groupingCols, other.groupingCols) && Objects.equals(outputColumn, other.outputColumn); } + @Override + public List getOutputColumnNames() + { + return Collections.singletonList(outputColumn); + } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java index 7821e3fd53b4..98b09b6f80d1 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessor.java @@ -28,6 +28,9 @@ import 
org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; import org.apache.druid.segment.column.ColumnType; +import java.util.Collections; +import java.util.List; + public class WindowRowNumberProcessor implements Processor { private final String outputColumn; @@ -128,4 +131,10 @@ public String toString() "outputColumn='" + outputColumn + '\'' + '}'; } + + @Override + public List getOutputColumnNames() + { + return Collections.singletonList(outputColumn); + } } diff --git a/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java b/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java index 2e084ae983a8..93a7ccd9a5bb 100644 --- a/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java +++ b/processing/src/main/java/org/apache/druid/query/operator/window/value/WindowValueProcessorBase.java @@ -26,6 +26,8 @@ import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.semantic.AppendableRowsAndColumns; +import java.util.Collections; +import java.util.List; import java.util.function.Function; public abstract class WindowValueProcessorBase implements Processor @@ -100,4 +102,10 @@ protected String internalToString() return "inputColumn=" + inputColumn + ", outputColumn='" + outputColumn + '\''; } + + @Override + public List getOutputColumnNames() + { + return Collections.singletonList(outputColumn); + } } diff --git a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java index 9cce74cb98cc..c11a50cf5cb0 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/WindowProcessorOperatorTest.java @@ -27,6 +27,9 @@ import org.junit.Assert; import org.junit.Test; 
+import java.util.Collections; +import java.util.List; + public class WindowProcessorOperatorTest { @Test @@ -53,6 +56,12 @@ public boolean validateEquivalent(Processor otherProcessor) { return true; } + + @Override + public List getOutputColumnNames() + { + return Collections.emptyList(); + } }, InlineScanOperator.make(rac) ); diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java index 570cba65d92c..d8f4599eb1ac 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ComposingProcessorTest.java @@ -23,6 +23,9 @@ import org.junit.Assert; import org.junit.Test; +import java.util.Collections; +import java.util.List; + public class ComposingProcessorTest { @Test @@ -32,6 +35,7 @@ public void testSanity() final ProcessorForTesting secondProcessor = new ProcessorForTesting(); ComposingProcessor proc = new ComposingProcessor(firstProcessor, secondProcessor); + Assert.assertTrue(proc.getOutputColumnNames().isEmpty()); proc.process(null); Assert.assertEquals(1, firstProcessor.processCounter); @@ -70,5 +74,11 @@ public boolean validateEquivalent(Processor otherProcessor) ++validateCounter; return validationResult; } + + @Override + public List getOutputColumnNames() + { + return Collections.emptyList(); + } } } diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java index 88d79c87cdbc..5af321b53c88 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/WindowFramedAggregateProcessorTest.java @@ -19,6 +19,7 @@ package 
org.apache.druid.query.operator.window; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import nl.jqno.equalsverifier.EqualsVerifier; import org.apache.druid.common.config.NullHandling; @@ -51,6 +52,7 @@ public void testIsPassThruWhenRACReturnsSemanticInterface() new DoubleSumAggregatorFactory("cummSum", "doubleCol") }; WindowFramedAggregateProcessor proc = new WindowFramedAggregateProcessor(theFrame, theAggs); + Assert.assertEquals(ImmutableList.of("cummMax", "cummSum"), proc.getOutputColumnNames()); final MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(ImmutableMap.of( "yay", new IntArrayColumn(new int[]{1, 2, 3}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java index f5914e4f5dbe..877c78415496 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowCumeDistProcessorTest.java @@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -42,6 +43,7 @@ public void testCumeDistProcessing() MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map); Processor processor = new WindowCumeDistProcessor(Collections.singletonList("vals"), "CumeDist"); + Assert.assertEquals(Collections.singletonList("CumeDist"), processor.getOutputColumnNames()); final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290}) diff --git 
a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java index e165f46f0746..86580e5bd2fa 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowDenseRankProcessorTest.java @@ -25,6 +25,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -42,6 +43,7 @@ public void testDenseRankProcessing() MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map); Processor processor = new WindowDenseRankProcessor(Collections.singletonList("vals"), "DenseRank"); + Assert.assertEquals(Collections.singletonList("DenseRank"), processor.getOutputColumnNames()); final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java index c38cd2a245c1..bf5bb727b0a0 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowPercentileProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator.window.ranking; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.operator.window.ComposingProcessor; import org.apache.druid.query.operator.window.Processor; import 
org.apache.druid.query.operator.window.RowsAndColumnsHelper; @@ -29,6 +30,7 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; import org.junit.Test; import java.util.LinkedHashMap; @@ -63,6 +65,11 @@ public void testPercentileProcessing() new WindowPercentileProcessor("10292", 10292) ); + Assert.assertEquals( + ImmutableList.of("1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "10292"), + processor.getOutputColumnNames() + ); + final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9}) .expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java index 59c7dd6df363..b7f281c423eb 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRankProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator.window.ranking; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.operator.window.ComposingProcessor; import org.apache.druid.query.operator.window.Processor; import org.apache.druid.query.operator.window.RowsAndColumnsHelper; @@ -26,6 +27,7 @@ import org.apache.druid.query.rowsandcols.RowsAndColumns; import org.apache.druid.query.rowsandcols.column.Column; import org.apache.druid.query.rowsandcols.column.IntArrayColumn; +import org.junit.Assert; import org.junit.Test; import java.util.Collections; @@ -49,6 +51,8 @@ public void testRankProcessing() new WindowRankProcessor(orderingCols, "rankAsPercent", true) ); + 
Assert.assertEquals(ImmutableList.of("rank", "rankAsPercent"), processor.getOutputColumnNames()); + final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("vals", new int[]{7, 18, 18, 30, 120, 121, 122, 122, 8290, 8290}) .expectColumn("rank", new int[]{1, 2, 2, 4, 5, 6, 7, 7, 9, 9}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java index 937fea7c3605..f4f9b5bfeee4 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/ranking/WindowRowNumberProcessorTest.java @@ -28,8 +28,10 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.Map; @@ -49,6 +51,7 @@ public void testRowNumberProcessing() MapOfColumnsRowsAndColumns rac = MapOfColumnsRowsAndColumns.fromMap(map); Processor processor = new WindowRowNumberProcessor("rowRow"); + Assert.assertEquals(Collections.singletonList("rowRow"), processor.getOutputColumnNames()); final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java index 67242f055033..eb6caa10a0b5 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java +++ 
b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowFirstProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator.window.value; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.operator.window.ComposingProcessor; import org.apache.druid.query.operator.window.RowsAndColumnsHelper; import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; @@ -28,6 +29,7 @@ import org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; import org.junit.Test; import java.util.LinkedHashMap; @@ -59,6 +61,11 @@ public void testFirstProcessing() new WindowFirstProcessor("nullFirstCol", "NullFirstCol") ); + Assert.assertEquals( + ImmutableList.of("FirstIntCol", "FirstDoubleCol", "FirstObjectCol", "NullFirstCol"), + processor.getOutputColumnNames() + ); + final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() .expectColumn("intCol", new int[]{88, 1, 2, 3, 4, 5, 6, 7, 8, 9}) .expectColumn("doubleCol", new double[]{0.4728, 1, 2, 3, 4, 5, 6, 7, 8, 9}) diff --git a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java index 5aa212b6acb2..1910401f34a7 100644 --- a/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java +++ b/processing/src/test/java/org/apache/druid/query/operator/window/value/WindowLastProcessorTest.java @@ -19,6 +19,7 @@ package org.apache.druid.query.operator.window.value; +import com.google.common.collect.ImmutableList; import org.apache.druid.query.operator.window.ComposingProcessor; import org.apache.druid.query.operator.window.RowsAndColumnsHelper; import org.apache.druid.query.rowsandcols.MapOfColumnsRowsAndColumns; @@ -28,6 +29,7 @@ import 
org.apache.druid.query.rowsandcols.column.IntArrayColumn; import org.apache.druid.query.rowsandcols.column.ObjectArrayColumn; import org.apache.druid.segment.column.ColumnType; +import org.junit.Assert; import org.junit.Test; import java.util.LinkedHashMap; @@ -58,6 +60,10 @@ public void testLastProcessing() new WindowLastProcessor("objectCol", "LastObjectCol"), new WindowLastProcessor("nullLastCol", "NullLastCol") ); + Assert.assertEquals( + ImmutableList.of("LastIntCol", "LastDoubleCol", "LastObjectCol", "NullLastCol"), + processor.getOutputColumnNames() + ); final RowsAndColumnsHelper expectations = new RowsAndColumnsHelper() diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java index cb7bed7e0416..4e958383945d 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/DrillWindowQueryTest.java @@ -7533,4 +7533,78 @@ public void test_nestedAggs_multiWin_8() { windowQueryTest(); } + + /* + Druid query tests + */ + + @DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1") + @Test + public void test_same_window_wikipedia_query_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/same_window_across_columns/wikipedia_query_1_named_window") + @Test + public void test_same_window_wikipedia_query_1_named_window() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/multiple_windows/wikipedia_query_1") + @Test + public void test_multiple_windows_wikipedia_query_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/multiple_windows/wikipedia_query_1_named_windows") + @Test + public void test_multiple_windows_wikipedia_query_1_named_windows() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/shuffle_columns/wikipedia_query_1") + @Test + public void test_shuffle_columns_wikipedia_query_1() + { + windowQueryTest(); + } + + 
@DrillTest("druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1") + @Test + public void test_shuffle_columns_wikipedia_query_1_shuffle_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/shuffle_columns/wikipedia_query_2") + @Test + public void test_shuffle_columns_wikipedia_query_2() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1") + @Test + public void test_shuffle_columns_wikipedia_query_2_shuffle_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_1") + @Test + public void test_partition_by_multiple_columns_wikipedia_query_1() + { + windowQueryTest(); + } + + @DrillTest("druid_queries/partition_by_multiple_columns/wikipedia_query_2") + @Test + public void test_partition_by_multiple_columns_wikipedia_query_2() + { + windowQueryTest(); + } } diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e new file mode 100644 index 000000000000..3625be892e2d --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.e @@ -0,0 +1,13 @@ +null Austria 1 1 +null Republic of Korea 1 2 +null Republic of Korea 2 3 +null Republic of Korea 3 4 +Horsching Austria 2 1 +Jeonju Republic of Korea 4 1 +Seongnam-si Republic of Korea 5 1 +Seoul Republic of Korea 6 1 +Suwon-si Republic of Korea 7 1 +Vienna Austria 3 1 +Vienna Austria 4 2 +Vienna Austria 5 3 +Yongsan-dong Republic of Korea 8 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q new file mode 100644 index 000000000000..d61a33e401f4 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1.q @@ 
-0,0 +1,6 @@ +select cityName, countryName, +row_number() over (partition by countryName order by countryName, cityName, channel) as c1, +count(channel) over (partition by cityName order by countryName, cityName, channel) as c2 +from wikipedia +where countryName in ('Austria', 'Republic of Korea') +group by countryName, cityName, channel diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e new file mode 100644 index 000000000000..3625be892e2d --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.e @@ -0,0 +1,13 @@ +null Austria 1 1 +null Republic of Korea 1 2 +null Republic of Korea 2 3 +null Republic of Korea 3 4 +Horsching Austria 2 1 +Jeonju Republic of Korea 4 1 +Seongnam-si Republic of Korea 5 1 +Seoul Republic of Korea 6 1 +Suwon-si Republic of Korea 7 1 +Vienna Austria 3 1 +Vienna Austria 4 2 +Vienna Austria 5 3 +Yongsan-dong Republic of Korea 8 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q new file mode 100644 index 000000000000..12739d58ceb3 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/multiple_windows/wikipedia_query_1_named_windows.q @@ -0,0 +1,9 @@ +select cityName, countryName, +row_number() over w1 as c1, +count(channel) over w2 as c2 +from wikipedia +where countryName in ('Austria', 'Republic of Korea') +group by countryName, cityName, channel +WINDOW + w1 AS (partition by countryName order by countryName, cityName, channel), + w2 AS (partition by cityName order by countryName, cityName, channel) diff --git 
a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e new file mode 100644 index 000000000000..36812a418aeb --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.e @@ -0,0 +1,15 @@ +Austria null 94 7 +Austria null 4685 7 +Austria null 14 7 +Austria null 0 7 +Austria null 272 7 +Austria null 0 7 +Austria null 6979 7 +Guatemala null 0 1 +Guatemala El Salvador 1 1 +Guatemala Guatemala City 173 1 +Austria Horsching 0 1 +Austria Vienna 93 4 +Austria Vienna 72 4 +Austria Vienna 0 4 +Austria Vienna 0 4 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q new file mode 100644 index 000000000000..5d0dd0756784 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_1.q @@ -0,0 +1,7 @@ +SELECT +countryName, +cityName, +added, +count(added) OVER (PARTITION BY countryName, cityName) +FROM "wikipedia" +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e new file mode 100644 index 000000000000..a1b94f5a865d --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.e @@ -0,0 +1,15 @@ +Austria null 0 7 12044 1 +Austria null 0 7 12044 2 +Austria null 14 7 12044 1 +Austria null 94 7 12044 1 +Austria null 272 7 12044 1 +Austria null 4685 7 12044 1 +Austria null 6979 7 12044 1 +Guatemala null 0 1 0 1 +Guatemala El Salvador 
1 1 1 1 +Guatemala Guatemala City 173 1 173 1 +Austria Horsching 0 1 0 1 +Austria Vienna 0 4 165 1 +Austria Vienna 0 4 165 2 +Austria Vienna 72 4 165 1 +Austria Vienna 93 4 165 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q new file mode 100644 index 000000000000..b1a594beedaf --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/partition_by_multiple_columns/wikipedia_query_2.q @@ -0,0 +1,9 @@ +SELECT +countryName, +cityName, +added, +count(added) OVER (PARTITION BY countryName, cityName), +sum(added) OVER (PARTITION BY countryName, cityName), +ROW_NUMBER() OVER (PARTITION BY countryName, cityName, added) +FROM "wikipedia" +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e new file mode 100644 index 000000000000..0dfb6a832b8a --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.e @@ -0,0 +1,15 @@ +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Guatemala 167 7 174 +Guatemala 167 7 174 +Guatemala 167 7 174 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q new file mode 100644 index 000000000000..dcb83c09c231 --- /dev/null +++ 
b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1.q @@ -0,0 +1,6 @@ +SELECT countryName, +sum("deleted") OVER (PARTITION BY countryName) as count_c3, +sum(delta) OVER (PARTITION BY countryName) as count_c1, +sum(added) OVER (PARTITION BY countryName) as count_c2 +FROM "wikipedia" +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e new file mode 100644 index 000000000000..0dfb6a832b8a --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.e @@ -0,0 +1,15 @@ +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Austria 162 12047 12209 +Guatemala 167 7 174 +Guatemala 167 7 174 +Guatemala 167 7 174 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q new file mode 100644 index 000000000000..adb9287d3788 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/same_window_across_columns/wikipedia_query_1_named_window.q @@ -0,0 +1,7 @@ +SELECT countryName, +sum("deleted") OVER w as count_c3, +sum(delta) OVER w as count_c1, +sum(added) OVER w as count_c2 +FROM "wikipedia" +where countryName in ('Guatemala', 'Austria') +WINDOW w AS (PARTITION BY countryName) diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e 
b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e new file mode 100644 index 000000000000..e934bc8fc276 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.e @@ -0,0 +1,15 @@ +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Austria 1017.4166666666666 +Guatemala 58 +Guatemala 58 +Guatemala 58 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q new file mode 100644 index 000000000000..f1a7bcb09b17 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1.q @@ -0,0 +1,5 @@ +SELECT +countryName, +AVG(added) OVER(PARTITION BY countryName) +FROM wikipedia +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e new file mode 100644 index 000000000000..e74706be0098 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.e @@ -0,0 +1,15 @@ +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +1017.4166666666666 Austria +58 Guatemala +58 Guatemala +58 Guatemala diff --git 
a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q new file mode 100644 index 000000000000..c2dc11546a94 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_1_shuffle_1.q @@ -0,0 +1,5 @@ +SELECT +AVG(added) OVER(PARTITION BY countryName), +countryName +FROM wikipedia +where countryName in ('Guatemala', 'Austria') diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e new file mode 100644 index 000000000000..daf6eff61ba1 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.e @@ -0,0 +1,16 @@ +Austria null 1 #de.wikipedia 1 +Guatemala null 1 #es.wikipedia 2 +Republic of Korea null 1 #en.wikipedia 3 +Republic of Korea null 2 #ja.wikipedia 4 +Republic of Korea null 3 #ko.wikipedia 5 +Guatemala El Salvador 2 #es.wikipedia 1 +Guatemala Guatemala City 3 #es.wikipedia 1 +Austria Horsching 2 #de.wikipedia 1 +Republic of Korea Jeonju 4 #ko.wikipedia 1 +Republic of Korea Seongnam-si 5 #ko.wikipedia 1 +Republic of Korea Seoul 6 #ko.wikipedia 1 +Republic of Korea Suwon-si 7 #ko.wikipedia 1 +Austria Vienna 3 #de.wikipedia 1 +Austria Vienna 4 #es.wikipedia 2 +Austria Vienna 5 #tr.wikipedia 3 +Republic of Korea Yongsan-dong 8 #ko.wikipedia 1 diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q new file mode 100644 index 000000000000..d3ea2dfc729a --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2.q @@ -0,0 +1,9 @@ +SELECT +countryName, +cityName, 
+ROW_NUMBER() OVER(PARTITION BY countryName), +channel, +COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel) +FROM wikipedia +where countryName in ('Guatemala', 'Austria', 'Republic of Korea') +group by countryName, cityName, channel diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e new file mode 100644 index 000000000000..813ccdbf6aaf --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.e @@ -0,0 +1,16 @@ +1 Austria null 1 #de.wikipedia +1 Guatemala null 2 #es.wikipedia +1 Republic of Korea null 3 #en.wikipedia +2 Republic of Korea null 4 #ja.wikipedia +3 Republic of Korea null 5 #ko.wikipedia +2 Guatemala El Salvador 1 #es.wikipedia +3 Guatemala Guatemala City 1 #es.wikipedia +2 Austria Horsching 1 #de.wikipedia +4 Republic of Korea Jeonju 1 #ko.wikipedia +5 Republic of Korea Seongnam-si 1 #ko.wikipedia +6 Republic of Korea Seoul 1 #ko.wikipedia +7 Republic of Korea Suwon-si 1 #ko.wikipedia +3 Austria Vienna 1 #de.wikipedia +4 Austria Vienna 2 #es.wikipedia +5 Austria Vienna 3 #tr.wikipedia +8 Republic of Korea Yongsan-dong 1 #ko.wikipedia diff --git a/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q new file mode 100644 index 000000000000..779aaf3a86f3 --- /dev/null +++ b/sql/src/test/resources/drill/window/queries/druid_queries/shuffle_columns/wikipedia_query_2_shuffle_1.q @@ -0,0 +1,9 @@ +SELECT +ROW_NUMBER() OVER(PARTITION BY countryName), +countryName, +cityName, +COUNT(channel) over (PARTITION BY cityName order by countryName, cityName, channel), +channel +FROM wikipedia +where countryName in ('Guatemala', 'Austria', 'Republic of 
Korea') +group by countryName, cityName, channel