Skip to content

Commit

Permalink
Fix issues with partitioning boundaries for MSQ window functions (#16729)
Browse files Browse the repository at this point in the history

* Fix issues with partitioning boundaries for MSQ window functions

* Address review comments

* Address review comments

* Add test for coverage check failure

* Address review comment

* Remove DruidWindowQueryTest and WindowQueryTestBase, move those tests to DrillWindowQueryTest

* Update extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/WindowOperatorQueryKit.java

* Address review comments

* Add test for equals and hashcode for WindowOperatorQueryFrameProcessorFactory

* Address review comment

* Fix checkstyle

---------

Co-authored-by: Benedict Jin <[email protected]>
  • Loading branch information
Akshat-Jain and asdf2014 authored Jul 18, 2024
1 parent 44b3f8e commit b53c26f
Show file tree
Hide file tree
Showing 44 changed files with 614 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.druid.msq.indexing.error.MSQException;
import org.apache.druid.msq.indexing.error.TooManyRowsInAWindowFault;
import org.apache.druid.query.groupby.ResultRow;
import org.apache.druid.query.operator.NaivePartitioningOperatorFactory;
import org.apache.druid.query.operator.OffsetLimit;
import org.apache.druid.query.operator.Operator;
import org.apache.druid.query.operator.OperatorFactory;
Expand Down Expand Up @@ -70,6 +69,7 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final WindowOperatorQuery query;

private final List<OperatorFactory> operatorFactoryList;
private final List<String> partitionColumnNames;
private final ObjectMapper jsonMapper;
private final ArrayList<RowsAndColumns> frameRowsAndCols;
private final ArrayList<RowsAndColumns> resultRowAndCols;
Expand All @@ -79,7 +79,6 @@ public class WindowOperatorQueryFrameProcessor implements FrameProcessor<Object>
private final FrameReader frameReader;
private final ArrayList<ResultRow> objectsOfASingleRac;
private final int maxRowsMaterialized;
List<Integer> partitionColsIndex;
private long currentAllocatorCapacity; // Used for generating FrameRowTooLargeException if needed
private Cursor frameCursor = null;
private Supplier<ResultRow> rowSupplierFromFrameCursor;
Expand All @@ -97,7 +96,8 @@ public WindowOperatorQueryFrameProcessor(
final List<OperatorFactory> operatorFactoryList,
final RowSignature rowSignature,
final boolean isOverEmpty,
final int maxRowsMaterializedInWindow
final int maxRowsMaterializedInWindow,
final List<String> partitionColumnNames
)
{
this.inputChannel = inputChannel;
Expand All @@ -110,9 +110,9 @@ public WindowOperatorQueryFrameProcessor(
this.frameRowsAndCols = new ArrayList<>();
this.resultRowAndCols = new ArrayList<>();
this.objectsOfASingleRac = new ArrayList<>();
this.partitionColsIndex = new ArrayList<>();
this.isOverEmpty = isOverEmpty;
this.maxRowsMaterialized = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
}

@Override
Expand Down Expand Up @@ -177,12 +177,12 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
*
* Future thoughts: <a href="https://github.com/apache/druid/issues/16126">https://github.com/apache/druid/issues/16126</a>
*
* 1. We are writing 1 partition to each frame in this way. In case of low cardinality data
* we will me making a large number of small frames. We can have a check to keep size of frame to a value
* 1. We are writing 1 partition to each frame in this way. In case of high cardinality data
* we will be making a large number of small frames. We can have a check to keep size of frame to a value
* say 20k rows and keep on adding to the same pending frame and not create a new frame
*
* 2. The current approach with R&C and operators materializes a single R&C for processing. In case of data
* with high cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
* with low cardinality a single R&C might be too big to consume. Same for the case of empty OVER() clause
* Most window operations like SUM(), RANK(), RANGE() etc. can be computed in 2 passes over the data.
* We might consider reimplementing them in the MSQ way so that we do not have to materialize so much data
*/
Expand Down Expand Up @@ -218,7 +218,6 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
final Frame frame = inputChannel.read();
frameCursor = FrameProcessors.makeCursor(frame, frameReader);
final ColumnSelectorFactory frameColumnSelectorFactory = frameCursor.getColumnSelectorFactory();
partitionColsIndex = findPartitionColumns(frameReader.signature());
final Supplier<Object>[] fieldSuppliers = new Supplier[frameReader.signature().size()];
for (int i = 0; i < fieldSuppliers.length; i++) {
final ColumnValueSelector<?> selector =
Expand Down Expand Up @@ -259,18 +258,17 @@ public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs)
if (outputRow == null) {
outputRow = currentRow;
objectsOfASingleRac.add(currentRow);
} else if (comparePartitionKeys(outputRow, currentRow, partitionColsIndex)) {
} else if (comparePartitionKeys(outputRow, currentRow, partitionColumnNames)) {
// if they have the same partition key
// keep adding them after checking
// guardrails
objectsOfASingleRac.add(currentRow);
if (objectsOfASingleRac.size() > maxRowsMaterialized) {
throw new MSQException(new TooManyRowsInAWindowFault(
objectsOfASingleRac.size(),
maxRowsMaterialized
));
}
objectsOfASingleRac.add(currentRow);

} else {
// key change noted
// create rac from the rows seen before
Expand Down Expand Up @@ -484,37 +482,36 @@ private void convertRowFrameToRowsAndColumns(Frame frame)
frameRowsAndCols.add(ldrc);
}

/**
 * Gathers, for every {@link NaivePartitioningOperatorFactory} found in {@code operatorFactoryList},
 * the position of each of its partition columns within the given row signature.
 *
 * @param rowSignature signature used to translate partition column names into positions
 * @return the positions in encounter order; whatever {@code rowSignature.indexOf} yields is
 *         added unchanged, and the list is empty when no partitioning factory is present
 */
private List<Integer> findPartitionColumns(RowSignature rowSignature)
{
  final List<Integer> partitionPositions = new ArrayList<>();
  for (OperatorFactory factory : operatorFactoryList) {
    // Only naive-partitioning factories carry partition columns; ignore everything else.
    if (!(factory instanceof NaivePartitioningOperatorFactory)) {
      continue;
    }
    final NaivePartitioningOperatorFactory partitioningFactory = (NaivePartitioningOperatorFactory) factory;
    for (String columnName : partitioningFactory.getPartitionColumns()) {
      partitionPositions.add(rowSignature.indexOf(columnName));
    }
  }
  return partitionPositions;
}

/**
*
* Compare two rows based only on the columns in the partitionIndices.
* In case the partition indices are empty or null, compare the entire row.
*
* Compare two rows based on the columns in partitionColumnNames.
* If partitionColumnNames is empty or null, compare the entire row.
* <p>
* For example, say:
* <ul>
* <li>partitionColumnNames = ["d1", "d2"]</li>
* <li>frameReader's row signature = {d1:STRING, d2:STRING, p0:STRING}</li>
* <li>frameReader.signature.indexOf("d1") = 0</li>
* <li>frameReader.signature.indexOf("d2") = 1</li>
* <li>row1 = [d1_row1, d2_row1, p0_row1]</li>
* <li>row2 = [d1_row2, d2_row2, p0_row2]</li>
* </ul>
* <p>
* Then this method will return true if {@code d1_row1==d1_row2 && d2_row1==d2_row2}, false otherwise.
* Returning true would indicate that these 2 rows can be put into the same partition for window function processing.
*
* @param row1 first row to compare
* @param row2 second row to compare
* @param partitionColumnNames names of the partition columns; each name is resolved to a position
*                             through {@code frameReader.signature()}
* @return true when the rows are equal on every partition column, or, when no partition columns
*         are given, when {@code row1.equals(row2)}
*/
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<Integer> partitionIndices)
private boolean comparePartitionKeys(ResultRow row1, ResultRow row2, List<String> partitionColumnNames)
{
if (partitionIndices == null || partitionIndices.isEmpty()) {
if (partitionColumnNames == null || partitionColumnNames.isEmpty()) {
// No partition columns: fall back to whole-row equality.
return row1.equals(row2);
} else {
// Counts how many partition columns hold equal values in both rows.
int match = 0;
for (int i : partitionIndices) {
for (String columnName : partitionColumnNames) {
// Resolve the column name to its position in the frame reader's row signature.
// NOTE(review): this lookup is repeated on every call; the positions could presumably be
// resolved once up front — confirm the signature is fixed for the processor's lifetime.
// NOTE(review): presumably indexOf yields a negative value for an unknown column, which
// would make row.get(i) fail — verify that every partition column is in the signature.
int i = frameReader.signature().indexOf(columnName);
// Objects.equals is null-safe, so two null values count as a match.
if (Objects.equals(row1.get(i), row2.get(i))) {
match++;
}
}
// The rows belong to the same partition only when every partition column matched.
return match == partitionIndices.size();
return match == partitionColumnNames.size();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,24 @@ public class WindowOperatorQueryFrameProcessorFactory extends BaseFrameProcessor
private final RowSignature stageRowSignature;
private final boolean isEmptyOver;
private final int maxRowsMaterializedInWindow;
private final List<String> partitionColumnNames;

/**
* Creates the factory. The constructor is annotated with {@code @JsonCreator}, and each argument
* is bound to the JSON property named in its {@code @JsonProperty} annotation.
* <p>
* {@code query}, {@code operatorFactoryList} and {@code stageRowSignature} are passed through
* {@code Preconditions.checkNotNull}; the remaining arguments are stored as given.
* <p>
* NOTE(review): {@code partitionColumnNames} is stored without a null check, unlike the other
* reference-typed arguments — confirm that null is an intended value here.
*/
@JsonCreator
public WindowOperatorQueryFrameProcessorFactory(
@JsonProperty("query") WindowOperatorQuery query,
@JsonProperty("operatorList") List<OperatorFactory> operatorFactoryList,
@JsonProperty("stageRowSignature") RowSignature stageRowSignature,
@JsonProperty("emptyOver") boolean emptyOver,
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow
@JsonProperty("maxRowsMaterializedInWindow") int maxRowsMaterializedInWindow,
@JsonProperty("partitionColumnNames") List<String> partitionColumnNames
)
{
this.query = Preconditions.checkNotNull(query, "query");
this.operatorList = Preconditions.checkNotNull(operatorFactoryList, "bad operator");
this.stageRowSignature = Preconditions.checkNotNull(stageRowSignature, "stageSignature");
this.isEmptyOver = emptyOver;
this.maxRowsMaterializedInWindow = maxRowsMaterializedInWindow;
this.partitionColumnNames = partitionColumnNames;
}

@JsonProperty("query")
Expand All @@ -90,6 +93,12 @@ public List<OperatorFactory> getOperators()
return operatorList;
}

/**
 * Exposes the partition column names held by this factory under the JSON property
 * {@code partitionColumnNames}.
 *
 * @return the list supplied at construction time, returned as-is (no copy is made)
 */
@JsonProperty("partitionColumnNames")
public List<String> getPartitionColumnNames()
{
  return this.partitionColumnNames;
}

@JsonProperty("stageRowSignature")
public RowSignature getSignature()
{
Expand Down Expand Up @@ -148,7 +157,6 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
readableInput -> {
final OutputChannel outputChannel =
outputChannels.get(readableInput.getStagePartition().getPartitionNumber());

return new WindowOperatorQueryFrameProcessor(
query,
readableInput.getChannel(),
Expand All @@ -159,7 +167,8 @@ public ProcessorsAndChannels<Object, Long> makeProcessors(
operatorList,
stageRowSignature,
isEmptyOver,
maxRowsMaterializedInWindow
maxRowsMaterializedInWindow,
partitionColumnNames
);
}
);
Expand All @@ -185,12 +194,13 @@ public boolean equals(Object o)
&& maxRowsMaterializedInWindow == that.maxRowsMaterializedInWindow
&& Objects.equals(query, that.query)
&& Objects.equals(operatorList, that.operatorList)
&& Objects.equals(partitionColumnNames, that.partitionColumnNames)
&& Objects.equals(stageRowSignature, that.stageRowSignature);
}

/**
* Builds the hash code from this factory's fields using {@link Objects#hash}.
*/
@Override
public int hashCode()
{
return Objects.hash(query, operatorList, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
return Objects.hash(query, operatorList, partitionColumnNames, stageRowSignature, isEmptyOver, maxRowsMaterializedInWindow);
}
}
Loading

0 comments on commit b53c26f

Please sign in to comment.