From dc223f22dbdd15a3a75d95311fb34d63384066b1 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Thu, 26 Sep 2024 22:02:30 -0700 Subject: [PATCH] SQL: Use regular filters for time filtering in subqueries. (#17173) * SQL: Use regular filters for time filtering in subqueries. Using the "intervals" feature on subqueries, or any non-table, should be avoided because it isn't a meaningful optimization in those cases, and it's simpler for runtime implementations if they can assume all filters are located in the regular filter object. Two changes: 1) Fix the logic in DruidQuery.canUseIntervalFiltering. It was intended to return false for QueryDataSource, but actually returned true. 2) Add a validation to ScanQueryFrameProcessor to ensure that when running on an input channel (which would include any subquery), the query has "intervals" set to ONLY_ETERNITY. Prior to this patch, the new test case in testTimeFilterOnSubquery would throw a "Can only handle a single interval" error in the native engine, and "QueryNotSupported" in the MSQ engine. * Mark new case as having extra columns in decoupled mode. * Adjust test. --- .../scan/ScanQueryFrameProcessor.java | 15 +-- .../scan/ScanQueryFrameProcessorTest.java | 18 +++- .../druid/sql/calcite/rel/DruidQuery.java | 4 +- .../druid/sql/calcite/CalciteQueryTest.java | 51 ++++++++++ ...meFilterOnSubquery@NullHandling=default.iq | 92 +++++++++++++++++++ ...stTimeFilterOnSubquery@NullHandling=sql.iq | 88 ++++++++++++++++++ 6 files changed, 255 insertions(+), 13 deletions(-) create mode 100644 sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteQueryTest/testTimeFilterOnSubquery@NullHandling=default.iq create mode 100644 sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteQueryTest/testTimeFilterOnSubquery@NullHandling=sql.iq diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java index 06dce22a1897..05f80b9805d5 100644 --- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java +++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessor.java @@ -25,6 +25,7 @@ import com.google.common.collect.Iterables; import it.unimi.dsi.fastutil.ints.IntSet; import org.apache.druid.collections.ResourceHolder; +import org.apache.druid.error.DruidException; import org.apache.druid.frame.Frame; import org.apache.druid.frame.channel.FrameWithPartition; import org.apache.druid.frame.channel.ReadableFrameChannel; @@ -64,7 +65,6 @@ import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.scan.ScanQueryEngine; import org.apache.druid.query.scan.ScanResultValue; -import org.apache.druid.query.spec.MultipleIntervalSegmentSpec; import org.apache.druid.query.spec.SpecificSegmentSpec; import org.apache.druid.segment.ColumnSelectorFactory; import org.apache.druid.segment.CompleteSegment; @@ -312,13 +312,14 @@ protected ReturnOrAwait runWithInputChannel( ); } + if (!Intervals.ONLY_ETERNITY.equals(query.getIntervals())) { + // runWithInputChannel is for running on subquery results, where we don't expect to see "intervals" set. + // The SQL planner avoids it for subqueries; see DruidQuery#canUseIntervalFiltering.
+ throw DruidException.defensive("Expected eternity intervals, but got[%s]", query.getIntervals()); + } + final CursorHolder nextCursorHolder = - cursorFactory.makeCursorHolder( - ScanQueryEngine.makeCursorBuildSpec( - query.withQuerySegmentSpec(new MultipleIntervalSegmentSpec(Intervals.ONLY_ETERNITY)), - null - ) - ); + cursorFactory.makeCursorHolder(ScanQueryEngine.makeCursorBuildSpec(query, null)); final Cursor nextCursor = nextCursorHolder.asCursor(); if (nextCursor == null) { diff --git a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java index af0a72035702..96957da53213 100644 --- a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java +++ b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/scan/ScanQueryFrameProcessorTest.java @@ -59,8 +59,11 @@ import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.incremental.IncrementalIndexCursorFactory; import org.apache.druid.timeline.SegmentId; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; import org.junit.Assert; import org.junit.Test; +import org.junit.internal.matchers.ThrowableMessageMatcher; import java.io.IOException; import java.util.Collections; @@ -177,7 +180,7 @@ public void test_runWithInputChannel() throws Exception } } - // put funny intervals on query to ensure it is adjusted to the segment interval before building cursor + // put funny intervals on query to ensure it is validated before building cursor final ScanQuery query = Druids.newScanQueryBuilder() .dataSource("test") @@ -240,11 +243,16 @@ public void close() FrameReader.create(signature) ); - FrameTestUtil.assertRowsEqual( - FrameTestUtil.readRowsFromCursorFactory(cursorFactory, signature, false), - rowsFromProcessor + final RuntimeException e = Assert.assertThrows( + RuntimeException.class, + rowsFromProcessor::toList ); - Assert.assertEquals(Unit.instance(), retVal.get()); + MatcherAssert.assertThat( + e, + ThrowableMessageMatcher.hasMessage(CoreMatchers.containsString( + "Expected eternity intervals, but got[[2001-01-01T00:00:00.000Z/2011-01-01T00:00:00.000Z, " + + "2011-01-02T00:00:00.000Z/2021-01-01T00:00:00.000Z]]")) + ); } } diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java index 3ce33e722452..eb85dc83f303 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/rel/DruidQuery.java @@ -76,6 +76,7 @@ import org.apache.druid.query.operator.ScanOperatorFactory; import org.apache.druid.query.operator.WindowOperatorQuery; import org.apache.druid.query.ordering.StringComparator; +import org.apache.druid.query.planning.DataSourceAnalysis; import org.apache.druid.query.scan.ScanQuery; import org.apache.druid.query.spec.LegacySegmentSpec; import org.apache.druid.query.timeboundary.TimeBoundaryQuery; @@ -884,7 +885,8 @@ static Pair getFiltration( */ private static boolean canUseIntervalFiltering(final DataSource dataSource) { - return dataSource.getAnalysis().isTableBased(); + final DataSourceAnalysis analysis = dataSource.getAnalysis(); + return !analysis.getBaseQuery().isPresent() && analysis.isTableBased(); } private static Filtration toFiltration( diff --git 
a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java index 5af5ec3097c5..412f378e8b52 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/CalciteQueryTest.java @@ -116,6 +116,7 @@ import org.apache.druid.query.topn.TopNQueryBuilder; import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnHolder; import org.apache.druid.segment.column.ColumnType; import org.apache.druid.segment.column.RowSignature; import org.apache.druid.segment.join.JoinType; @@ -7764,6 +7765,56 @@ public void testMultipleExactCountDistinctWithGroupingAndOtherAggregatorsUsingJo ); } + @DecoupledTestConfig(quidemReason = QuidemTestCaseReason.EQUIV_PLAN_EXTRA_COLUMNS, separateDefaultModeTest = true) + @Test + public void testTimeFilterOnSubquery() + { + testQuery( + "SELECT __time, m1 FROM (SELECT * FROM \"foo\" LIMIT 100)\n" + + "WHERE TIME_IN_INTERVAL(__time, '2000/P1D') OR TIME_IN_INTERVAL(__time, '2001/P1D')", + ImmutableList.of( + newScanQueryBuilder() + .dataSource( + newScanQueryBuilder() + .dataSource(CalciteTests.DATASOURCE1) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("__time", "m1") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .limit(100) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .filters(or( + range( + ColumnHolder.TIME_COLUMN_NAME, + ColumnType.LONG, + DateTimes.of("2000").getMillis(), + DateTimes.of("2000-01-02").getMillis(), + false, + true + ), + range( + ColumnHolder.TIME_COLUMN_NAME, + ColumnType.LONG, + DateTimes.of("2001").getMillis(), + DateTimes.of("2001-01-02").getMillis(), + false, + true + ) + )) + .columns("__time", "m1") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .context(QUERY_CONTEXT_DEFAULT) + .build() + ), + ImmutableList.of( + new Object[]{DateTimes.of("2000-01-01").getMillis(), 1.0f}, + new Object[]{DateTimes.of("2001-01-01").getMillis(), 4.0f} + ) + ); + } + @SqlTestFrameworkConfig.NumMergeBuffers(4) @Test public void testMultipleExactCountDistinctWithGroupingUsingGroupingSets() diff --git a/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteQueryTest/testTimeFilterOnSubquery@NullHandling=default.iq b/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteQueryTest/testTimeFilterOnSubquery@NullHandling=default.iq new file mode 100644 index 000000000000..9f63bc9a2222 --- /dev/null +++ b/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteQueryTest/testTimeFilterOnSubquery@NullHandling=default.iq @@ -0,0 +1,92 @@ +# testTimeFilterOnSubquery@NullHandling=default case-crc:73448efc +# quidem testcase reason: EQUIV_PLAN_EXTRA_COLUMNS +!set debug true +!set defaultTimeout 300000 +!set maxScatterGatherBytes 9223372036854775807 +!set plannerStrategy DECOUPLED +!set sqlCurrentTimestamp 2000-01-01T00:00:00Z +!set sqlQueryId dummy +!set outputformat mysql +!use druidtest:/// +SELECT __time, m1 FROM (SELECT * FROM "foo" LIMIT 100) +WHERE TIME_IN_INTERVAL(__time, '2000/P1D') OR TIME_IN_INTERVAL(__time, '2001/P1D'); ++-------------------------+-----+ +| __time | m1 | ++-------------------------+-----+ +| 2000-01-01 00:00:00.000 | 1.0 | +| 2001-01-01 00:00:00.000 | 4.0 | ++-------------------------+-----+ +(2 rows) + +!ok +LogicalProject(__time=[$0], m1=[$5]) 
+ LogicalFilter(condition=[SEARCH($0, Sarg[[2000-01-01 00:00:00:TIMESTAMP(3)..2000-01-02 00:00:00:TIMESTAMP(3)), [2001-01-01 00:00:00:TIMESTAMP(3)..2001-01-02 00:00:00:TIMESTAMP(3))]:TIMESTAMP(3))]) + LogicalSort(fetch=[100]) + LogicalTableScan(table=[[druid, foo]]) + +!logicalPlan +DruidProject(__time=[$0], m1=[$5], druid=[logical]) + DruidFilter(condition=[SEARCH($0, Sarg[[2000-01-01 00:00:00:TIMESTAMP(3)..2000-01-02 00:00:00:TIMESTAMP(3)), [2001-01-01 00:00:00:TIMESTAMP(3)..2001-01-02 00:00:00:TIMESTAMP(3))]:TIMESTAMP(3))]) + DruidSort(fetch=[100], druid=[logical]) + DruidTableScan(table=[[druid, foo]], druid=[logical]) + +!druidPlan +{ + "queryType" : "scan", + "dataSource" : { + "type" : "query", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "table", + "name" : "foo" + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "limit" : 100, + "columns" : [ "__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1" ], + "columnTypes" : [ "LONG", "LONG", "STRING", "STRING", "STRING", "FLOAT", "DOUBLE", "COMPLEX" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "filter" : { + "type" : "or", + "fields" : [ { + "type" : "bound", + "dimension" : "__time", + "lower" : "946684800000", + "upper" : "946771200000", + "upperStrict" : true, + "ordering" : { + "type" : "numeric" + } + }, { + "type" : "bound", + "dimension" : "__time", + "lower" : "978307200000", + "upper" : "978393600000", + "upperStrict" : true, + "ordering" : { + "type" : "numeric" + } + } ] + }, + "columns" : [ "__time", "m1" ], + "columnTypes" : [ "LONG", "FLOAT" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false +} +!nativePlan diff --git a/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteQueryTest/testTimeFilterOnSubquery@NullHandling=sql.iq b/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteQueryTest/testTimeFilterOnSubquery@NullHandling=sql.iq new file mode 100644 index 000000000000..40b9d8747773 --- /dev/null +++ b/sql/src/test/quidem/org.apache.druid.sql.calcite.DecoupledPlanningCalciteQueryTest/testTimeFilterOnSubquery@NullHandling=sql.iq @@ -0,0 +1,88 @@ +# testTimeFilterOnSubquery@NullHandling=sql case-crc:73448efc +# quidem testcase reason: EQUIV_PLAN_EXTRA_COLUMNS +!set debug true +!set defaultTimeout 300000 +!set maxScatterGatherBytes 9223372036854775807 +!set plannerStrategy DECOUPLED +!set sqlCurrentTimestamp 2000-01-01T00:00:00Z +!set sqlQueryId dummy +!set outputformat mysql +!use druidtest:/// +SELECT __time, m1 FROM (SELECT * FROM "foo" LIMIT 100) +WHERE TIME_IN_INTERVAL(__time, '2000/P1D') OR TIME_IN_INTERVAL(__time, '2001/P1D'); ++-------------------------+-----+ +| __time | m1 | ++-------------------------+-----+ +| 2000-01-01 00:00:00.000 | 1.0 | +| 2001-01-01 00:00:00.000 | 4.0 | ++-------------------------+-----+ +(2 rows) + +!ok +LogicalProject(__time=[$0], m1=[$5]) + LogicalFilter(condition=[SEARCH($0, Sarg[[2000-01-01 00:00:00:TIMESTAMP(3)..2000-01-02 00:00:00:TIMESTAMP(3)), [2001-01-01 00:00:00:TIMESTAMP(3)..2001-01-02 00:00:00:TIMESTAMP(3))]:TIMESTAMP(3))]) + LogicalSort(fetch=[100]) + LogicalTableScan(table=[[druid, foo]]) + +!logicalPlan +DruidProject(__time=[$0], m1=[$5], druid=[logical]) + 
DruidFilter(condition=[SEARCH($0, Sarg[[2000-01-01 00:00:00:TIMESTAMP(3)..2000-01-02 00:00:00:TIMESTAMP(3)), [2001-01-01 00:00:00:TIMESTAMP(3)..2001-01-02 00:00:00:TIMESTAMP(3))]:TIMESTAMP(3))]) + DruidSort(fetch=[100], druid=[logical]) + DruidTableScan(table=[[druid, foo]], druid=[logical]) + +!druidPlan +{ + "queryType" : "scan", + "dataSource" : { + "type" : "query", + "query" : { + "queryType" : "scan", + "dataSource" : { + "type" : "table", + "name" : "foo" + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "limit" : 100, + "columns" : [ "__time", "cnt", "dim1", "dim2", "dim3", "m1", "m2", "unique_dim1" ], + "columnTypes" : [ "LONG", "LONG", "STRING", "STRING", "STRING", "FLOAT", "DOUBLE", "COMPLEX" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false + } + }, + "intervals" : { + "type" : "intervals", + "intervals" : [ "-146136543-09-08T08:23:32.096Z/146140482-04-24T15:36:27.903Z" ] + }, + "resultFormat" : "compactedList", + "filter" : { + "type" : "or", + "fields" : [ { + "type" : "range", + "column" : "__time", + "matchValueType" : "LONG", + "lower" : 946684800000, + "upper" : 946771200000, + "upperOpen" : true + }, { + "type" : "range", + "column" : "__time", + "matchValueType" : "LONG", + "lower" : 978307200000, + "upper" : 978393600000, + "upperOpen" : true + } ] + }, + "columns" : [ "__time", "m1" ], + "columnTypes" : [ "LONG", "FLOAT" ], + "granularity" : { + "type" : "all" + }, + "legacy" : false +} +!nativePlan
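
Illustrative sketch (not part of the applied diff): the snippet below restates, in isolation, the runtime invariant this patch enforces in ScanQueryFrameProcessor#runWithInputChannel. When subquery results are read from an input channel, time constraints are expected to arrive as regular filters, so the query's "intervals" should be ONLY_ETERNITY. The class and method names (EternityIntervalsCheck, validate) are placeholders introduced here; Intervals.ONLY_ETERNITY, DruidException.defensive, and ScanQuery#getIntervals are the same Druid APIs used in the change above, and the import paths assume their usual Druid package locations.

// Illustrative sketch only; mirrors the guard added to ScanQueryFrameProcessor#runWithInputChannel.
// The enclosing class is hypothetical and exists just to make the snippet self-contained.
import org.apache.druid.error.DruidException;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.query.scan.ScanQuery;

final class EternityIntervalsCheck
{
  static void validate(final ScanQuery query)
  {
    // Subquery inputs must carry time constraints in the regular filter, not "intervals",
    // so the only acceptable interval list here is ONLY_ETERNITY.
    if (!Intervals.ONLY_ETERNITY.equals(query.getIntervals())) {
      throw DruidException.defensive("Expected eternity intervals, but got[%s]", query.getIntervals());
    }
  }
}

On the planner side, the counterpart is the change to DruidQuery#canUseIntervalFiltering: it now returns false whenever the DataSourceAnalysis carries a base query, so subqueries keep eternity "intervals" and the time condition is planned into the regular "filter", as the quidem plans above show.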