Skip to content

Commit

Permalink
[FLINK-34323][table-planner] Fix named params in session window tvf
Browse files Browse the repository at this point in the history
This closes #24243
  • Loading branch information
xuyangzhong authored Feb 5, 2024
1 parent a603f2b commit 6be30b1
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public SqlSessionTableFunction() {
/** Operand type checker for SESSION. */
private static class OperandMetadataImpl extends AbstractOperandMetadata {
OperandMetadataImpl() {
super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SIZE), 3);
super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, GAP), 3);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public class SqlWindowTableFunction extends org.apache.calcite.sql.SqlWindowTabl
/** The slide interval, only used for HOP window. */
protected static final String PARAM_STEP = "STEP";

/** The gap interval, only used for SESSION window. */
protected static final String GAP = "GAP";

/**
* Type-inference strategy whereby the row type of a table function call is a ROW, which is
* combined from the row type of operand #0 (which is a TABLE) and two additional fields. The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,18 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
<TestCase name="testCumulateTVFWithNegativeOffset">
<TestCase name="testCumulateTVF">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(
CUMULATE(
TABLE MyTable,
DESCRIPTOR(rowtime),
INTERVAL '1' MINUTE,
INTERVAL '15' MINUTE,
INTERVAL '-5' MINUTE))
CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4), 60000:INTERVAL MINUTE, 900000:INTERVAL MINUTE, -300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
Expand All @@ -42,25 +37,25 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], wind
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[15 min], step=[1 min], offset=[-5 min])])
+- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testCumulateTVF">
<TestCase name="testCumulateTVFProctime">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(
CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))
CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($5), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
Expand All @@ -69,26 +64,59 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], wind
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[1 h], step=[10 min])])
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
+- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[1 h], step=[10 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testCumulateTVFProctime">
<TestCase name="testSessionTVFWithPartitionKeys">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(SESSION(TABLE MyTable PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0), DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[15 min], partition keys=[b, a])])
+- Exchange(distribution=[hash[b, a]])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testCumulateTVFWithNegativeOffset">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(
CUMULATE(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '10' MINUTE, INTERVAL '1' HOUR))
CUMULATE(
TABLE MyTable,
DESCRIPTOR(rowtime),
INTERVAL '1' MINUTE,
INTERVAL '15' MINUTE,
INTERVAL '-5' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($5), 600000:INTERVAL MINUTE, 3600000:INTERVAL HOUR)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+- LogicalTableFunctionScan(invocation=[CUMULATE(DESCRIPTOR($4), 60000:INTERVAL MINUTE, 900000:INTERVAL MINUTE, -300000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
Expand All @@ -97,8 +125,8 @@ LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], wind
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
+- WindowTableFunction(window=[CUMULATE(time_col=[proctime], max_size=[1 h], step=[10 min])])
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[CUMULATE(time_col=[rowtime], max_size=[15 min], step=[1 min], offset=[-5 min])])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
Expand Down Expand Up @@ -288,6 +316,94 @@ Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, wi
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testSessionTVF">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[15 min])])
+- Exchange(distribution=[single])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testSessionTVFProctime">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[SESSION(DESCRIPTOR($5), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP_LTZ(3) *PROCTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, PROCTIME_MATERIALIZE(window_time) AS window_time])
+- WindowTableFunction(window=[SESSION(time_col=[proctime], gap=[15 min])])
+- Exchange(distribution=[single])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testSessionTVFWithNamedParams">
<Resource name="sql">
<![CDATA[
SELECT *
FROM TABLE(
SESSION(
DATA => TABLE MyTable PARTITION BY (b, a),
TIMECOL => DESCRIPTOR(rowtime),
GAP => INTERVAL '15' MINUTE))
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5], window_start=[$6], window_end=[$7], window_time=[$8])
+- LogicalTableFunctionScan(invocation=[SESSION(PARTITION BY($1, $0), DESCRIPTOR($4), 900000:INTERVAL MINUTE)], rowType=[RecordType(INTEGER a, BIGINT b, VARCHAR(2147483647) c, DECIMAL(10, 3) d, TIMESTAMP(3) *ROWTIME* rowtime, TIMESTAMP_LTZ(3) *PROCTIME* proctime, TIMESTAMP(3) window_start, TIMESTAMP(3) window_end, TIMESTAMP(3) *ROWTIME* window_time)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[$5])
+- LogicalWatermarkAssigner(rowtime=[rowtime], watermark=[-($4, 1000:INTERVAL SECOND)])
+- LogicalProject(a=[$0], b=[$1], c=[$2], d=[$3], rowtime=[$4], proctime=[PROCTIME()])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a, b, c, d, rowtime, PROCTIME_MATERIALIZE(proctime) AS proctime, window_start, window_end, window_time])
+- WindowTableFunction(window=[SESSION(time_col=[rowtime], gap=[15 min], partition keys=[b, a])])
+- Exchange(distribution=[hash[b, a]])
+- WatermarkAssigner(rowtime=[rowtime], watermark=[-(rowtime, 1000:INTERVAL SECOND)])
+- Calc(select=[a, b, c, d, rowtime, PROCTIME() AS proctime])
+- TableSourceScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, d, rowtime])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,69 @@ class WindowTableFunctionTest extends TableTestBase {
util.verifyRelPlan(sql)
}

@Test
def testSessionTVF(): Unit = {
  // SESSION window TVF over the rowtime column with a 15 minute interval and
  // no PARTITION BY clause; the query is handed to util.verifyRelPlan.
  util.verifyRelPlan(
    """
      |SELECT *
      |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
      |""".stripMargin)
}

@Test
def testSessionTVFProctime(): Unit = {
  // Same shape as the rowtime SESSION query, but the descriptor names the
  // proctime column instead.
  util.verifyRelPlan(
    """
      |SELECT *
      |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE))
      |""".stripMargin)
}

@Test
def testSessionTVFWithPartitionKeys(): Unit = {
  // SESSION window TVF whose input table is partitioned by (b, a), using
  // positional arguments.
  util.verifyRelPlan(
    """
      |SELECT *
      |FROM TABLE(SESSION(TABLE MyTable PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '15' MINUTE))
      |""".stripMargin)
}

@Test
def testSessionTVFWithNamedParams(): Unit = {
  // SESSION window TVF called with named arguments in the order
  // DATA, TIMECOL, GAP; the input table is partitioned by (b, a).
  util.verifyRelPlan(
    """
      |SELECT *
      |FROM TABLE(
      | SESSION(
      | DATA => TABLE MyTable PARTITION BY (b, a),
      | TIMECOL => DESCRIPTOR(rowtime),
      | GAP => INTERVAL '15' MINUTE))
      |""".stripMargin)
}

@Test
def testWindowTVFWithNamedParamsOrderChange(): Unit = {
// FLIP-145 requires the DATA param to be the first argument, so it stays in place;
// only the order of GAP and TIMECOL is swapped relative to the declared order.
// This test pins the current outcome for that ordering: an AssertionError with the
// message asserted below.
// TODO fix it in FLINK-34338
val sql =
"""
|SELECT *
|FROM TABLE(
| SESSION(
| DATA => TABLE MyTable PARTITION BY (b, a),
| GAP => INTERVAL '15' MINUTE,
| TIMECOL => DESCRIPTOR(rowtime)))
|""".stripMargin

// Both the exact message and the exception type are checked.
assertThatThrownBy(() => util.verifyRelPlan(sql))
.hasMessage("fieldList must not be null, type = INTERVAL MINUTE")
.isInstanceOf[AssertionError]

}

}

0 comments on commit 6be30b1

Please sign in to comment.