From 6be30b167990c22765c244a703ab0424e7c3b4d9 Mon Sep 17 00:00:00 2001 From: Xuyang Date: Mon, 5 Feb 2024 10:06:03 +0800 Subject: [PATCH] [FLINK-34323][table-planner] Fix named params in session window tvf This close #24243 --- .../sql/SqlSessionTableFunction.java | 2 +- .../functions/sql/SqlWindowTableFunction.java | 3 + .../stream/sql/WindowTableFunctionTest.xml | 154 +++++++++++++++--- .../stream/sql/WindowTableFunctionTest.scala | 65 ++++++++ 4 files changed, 204 insertions(+), 20 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java index 454d1af689861..895dbeefe1a83 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlSessionTableFunction.java @@ -68,7 +68,7 @@ public SqlSessionTableFunction() { /** Operand type checker for SESSION. */ private static class OperandMetadataImpl extends AbstractOperandMetadata { OperandMetadataImpl() { - super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, PARAM_SIZE), 3); + super(ImmutableList.of(PARAM_DATA, PARAM_TIMECOL, GAP), 3); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java index 0e7879a6dcce7..3f22bed190751 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/SqlWindowTableFunction.java @@ -63,6 +63,9 @@ public class SqlWindowTableFunction extends org.apache.calcite.sql.SqlWindowTabl /** The slide interval, only used for HOP window. */ protected static final String PARAM_STEP = "STEP"; + /** The gap interval, only used for SESSION window. */ + protected static final String GAP = "GAP"; + /** * Type-inference strategy whereby the row type of a table function call is a ROW, which is * combined from the row type of operand #0 (which is a TABLE) and two additional fields. The diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml index fabcb77e269e9..bf45b5f6de5ee 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.xml @@ -16,23 +16,18 @@ See the License for the specific language governing permissions and limitations under the License. --> - + - + - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + TABLE MyTable PARTITION BY (b, a), + TIMECOL => DESCRIPTOR(rowtime), + GAP => INTERVAL '15' MINUTE)) +]]> + + + + + + diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala index ff64550e35d65..a364882fb9331 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/WindowTableFunctionTest.scala @@ -257,4 +257,69 @@ class WindowTableFunctionTest extends TableTestBase { util.verifyRelPlan(sql) } + @Test + def testSessionTVF(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + |""".stripMargin + util.verifyRelPlan(sql) + } + + @Test + def testSessionTVFProctime(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(proctime), INTERVAL '15' MINUTE)) + |""".stripMargin + util.verifyRelPlan(sql) + } + + @Test + def testSessionTVFWithPartitionKeys(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE(SESSION(TABLE MyTable PARTITION BY (b, a), DESCRIPTOR(rowtime), INTERVAL '15' MINUTE)) + |""".stripMargin + util.verifyRelPlan(sql) + } + + @Test + def testSessionTVFWithNamedParams(): Unit = { + val sql = + """ + |SELECT * + |FROM TABLE( + | SESSION( + | DATA => TABLE MyTable PARTITION BY (b, a), + | TIMECOL => DESCRIPTOR(rowtime), + | GAP => INTERVAL '15' MINUTE)) + |""".stripMargin + util.verifyRelPlan(sql) + } + + @Test + def testWindowTVFWithNamedParamsOrderChange(): Unit = { + // the DATA param must be the first in FLIP-145 + // change the order about GAP and TIMECOL + // TODO fix it in FLINK-34338 + val sql = + """ + |SELECT * + |FROM TABLE( + | SESSION( + | DATA => TABLE MyTable PARTITION BY (b, a), + | GAP => INTERVAL '15' MINUTE, + | TIMECOL => DESCRIPTOR(rowtime))) + |""".stripMargin + + assertThatThrownBy(() => util.verifyRelPlan(sql)) + .hasMessage("fieldList must not be null, type = INTERVAL MINUTE") + .isInstanceOf[AssertionError] + + } + }