From 534df6490e0fc179173efabd882ff10a749d508f Mon Sep 17 00:00:00 2001 From: bvarghese1 Date: Thu, 28 Dec 2023 19:00:59 -0800 Subject: [PATCH] [FLINK-33958] Remove IntervalJoin Json Plan & IT tests - The json tests are covered by the newly added restore tests --- .../exec/stream/IntervalJoinJsonPlanTest.java | 99 --- .../jsonplan/IntervalJoinJsonPlanITCase.java | 117 --- ...estProcessingTimeInnerJoinWithOnClause.out | 784 ------------------ .../testRowTimeInnerJoinWithOnClause.out | 594 ------------- 4 files changed, 1594 deletions(-) delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest.java delete mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java delete mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest_jsonplan/testProcessingTimeInnerJoinWithOnClause.out delete mode 100644 flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest_jsonplan/testRowTimeInnerJoinWithOnClause.out diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest.java deleted file mode 100644 index 71a6f52ea0b0d..0000000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.plan.nodes.exec.stream; - -import org.apache.flink.table.api.TableConfig; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.planner.utils.StreamTableTestUtil; -import org.apache.flink.table.planner.utils.TableTestBase; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -/** Test json serialization/deserialization for IntervalJoin. */ -class IntervalJoinJsonPlanTest extends TableTestBase { - - private StreamTableTestUtil util; - private TableEnvironment tEnv; - - @BeforeEach - void setup() { - util = streamTestUtil(TableConfig.getDefault()); - tEnv = util.getTableEnv(); - - String srcTableA = - "CREATE TABLE A (\n" - + " a int,\n" - + " b varchar,\n" - + " c bigint,\n" - + " proctime as PROCTIME(),\n" - + " rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n" - + " watermark for rowtime as rowtime - INTERVAL '1' second \n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - String srcTableB = - "CREATE TABLE B (\n" - + " a int,\n" - + " b varchar,\n" - + " c bigint,\n" - + " proctime as PROCTIME(),\n" - + " rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n" - + " watermark for rowtime as rowtime - INTERVAL '1' second \n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'bounded' = 'false')"; - tEnv.executeSql(srcTableA); - tEnv.executeSql(srcTableB); - } - - @Test - void testProcessingTimeInnerJoinWithOnClause() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a int,\n" - + " b varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "INSERT INTO MySink " - + " SELECT t1.a, t2.b FROM A t1 JOIN B t2 ON\n" - + " t1.a = t2.a AND \n" - + " t1.proctime BETWEEN t2.proctime - INTERVAL '1' HOUR AND t2.proctime + INTERVAL '1' HOUR"); - } - - @Test - void testRowTimeInnerJoinWithOnClause() { - String sinkTableDdl = - "CREATE TABLE MySink (\n" - + " a int,\n" - + " b varchar\n" - + ") with (\n" - + " 'connector' = 'values',\n" - + " 'table-sink-class' = 'DEFAULT')"; - tEnv.executeSql(sinkTableDdl); - util.verifyJsonPlan( - "INSERT INTO MySink " - + "SELECT t1.a, t2.b FROM A t1 JOIN B t2 ON\n" - + " t1.a = t2.a AND\n" - + " t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR"); - } -} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java deleted file mode 100644 index d4569058eaac4..0000000000000 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/jsonplan/IntervalJoinJsonPlanITCase.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.table.planner.runtime.stream.jsonplan; - -import org.apache.flink.table.planner.factories.TestValuesTableFactory; -import org.apache.flink.table.planner.utils.JsonPlanTestBase; -import org.apache.flink.types.Row; - -import org.junit.jupiter.api.Test; - -import java.util.Arrays; -import java.util.List; - -/** Test for IntervalJoin json plan. */ -class IntervalJoinJsonPlanITCase extends JsonPlanTestBase { - - /** test process time inner join. * */ - @Test - void testProcessTimeInnerJoin() throws Exception { - List rowT1 = - Arrays.asList( - Row.of(1, 1L, "Hi1"), - Row.of(1, 2L, "Hi2"), - Row.of(1, 5L, "Hi3"), - Row.of(2, 7L, "Hi5"), - Row.of(1, 9L, "Hi6"), - Row.of(1, 8L, "Hi8")); - - List rowT2 = Arrays.asList(Row.of(1, 1L, "HiHi"), Row.of(2, 2L, "HeHe")); - createTestValuesSourceTable( - "T1", rowT1, "a int", "b bigint", "c varchar", "proctime as PROCTIME()"); - createTestValuesSourceTable( - "T2", rowT2, "a int", "b bigint", "c varchar", "proctime as PROCTIME()"); - createTestValuesSinkTable("MySink", "a int", "c1 varchar", "c2 varchar"); - - compileSqlAndExecutePlan( - "insert into MySink " - + "SELECT t2.a, t2.c, t1.c\n" - + "FROM T1 as t1 join T2 as t2 ON\n" - + " t1.a = t2.a AND\n" - + " t1.proctime BETWEEN t2.proctime - INTERVAL '5' SECOND AND\n" - + " t2.proctime + INTERVAL '5' SECOND") - .await(); - List expected = - Arrays.asList( - "+I[1, HiHi, Hi1]", - "+I[1, HiHi, Hi2]", - "+I[1, HiHi, Hi3]", - "+I[1, HiHi, Hi6]", - "+I[1, HiHi, Hi8]", - "+I[2, HeHe, Hi5]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } - - @Test - void testRowTimeInnerJoin() throws Exception { - List rowT1 = - Arrays.asList( - Row.of(1, 1L, "Hi1"), - Row.of(1, 2L, "Hi2"), - Row.of(1, 5L, "Hi3"), - Row.of(2, 7L, "Hi5"), - Row.of(1, 9L, "Hi6"), - Row.of(1, 8L, "Hi8")); - - List rowT2 = Arrays.asList(Row.of(1, 1L, "HiHi"), Row.of(2, 2L, "HeHe")); - createTestValuesSourceTable( - "T1", - rowT1, - "a int", - "b bigint", - "c varchar", - "rowtime as TO_TIMESTAMP (FROM_UNIXTIME(b))", - "watermark for rowtime as rowtime - INTERVAL '5' second"); - createTestValuesSourceTable( - "T2", - rowT2, - "a int", - "b bigint", - "c varchar", - "rowtime as TO_TIMESTAMP (FROM_UNIXTIME(b))", - "watermark for rowtime as rowtime - INTERVAL '5' second"); - createTestValuesSinkTable("MySink", "a int", "c1 varchar", "c2 varchar"); - - compileSqlAndExecutePlan( - "insert into MySink \n" - + "SELECT t2.a, t2.c, t1.c\n" - + "FROM T1 as t1 join T2 as t2 ON\n" - + " t1.a = t2.a AND\n" - + " t1.rowtime BETWEEN t2.rowtime - INTERVAL '5' SECOND AND\n" - + " t2.rowtime + INTERVAL '6' SECOND") - .await(); - List expected = - Arrays.asList( - "+I[1, HiHi, Hi1]", - "+I[1, HiHi, Hi2]", - "+I[1, HiHi, Hi3]", - "+I[2, HeHe, Hi5]"); - assertResult(expected, TestValuesTableFactory.getResultsAsStrings("MySink")); - } -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest_jsonplan/testProcessingTimeInnerJoinWithOnClause.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest_jsonplan/testProcessingTimeInnerJoinWithOnClause.out deleted file mode 100644 index 08846eee7452d..0000000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest_jsonplan/testProcessingTimeInnerJoinWithOnClause.out +++ /dev/null @@ -1,784 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`A`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "c", - "dataType" : "BIGINT" - }, { - "name" : "proctime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, - "serializableString" : "PROCTIME()" - } - }, { - "name" : "rowtime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FROM_UNIXTIME$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - } ], - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "TO_TIMESTAMP(FROM_UNIXTIME(`c`))" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime` - INTERVAL '1' SECOND" - } - } ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 2 ] ], - "producedType" : "ROW<`a` INT, `c` BIGINT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` INT, `c` BIGINT> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` INT, `c` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, A, project=[a, c], metadata=[]]], fields=[a, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FROM_UNIXTIME$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT" - } ], - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, { - "name" : "rowtime", - "fieldType" : "TIMESTAMP(3)" - } ] - }, - "description" : "Calc(select=[a, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])" - }, { - "id" : 3, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" - }, { - "id" : 4, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - } ] - }, - "description" : "Calc(select=[a, proctime])" - }, { - "id" : 5, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[a]])" - }, { - "id" : 6, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`B`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "c", - "dataType" : "BIGINT" - }, { - "name" : "proctime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, - "serializableString" : "PROCTIME()" - } - }, { - "name" : "rowtime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FROM_UNIXTIME$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - } ], - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "TO_TIMESTAMP(FROM_UNIXTIME(`c`))" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime` - INTERVAL '1' SECOND" - } - } ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c])", - "inputProperties" : [ ] - }, { - "id" : 7, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - }, { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FROM_UNIXTIME$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - } ], - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "b", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "c", - "fieldType" : "BIGINT" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, { - "name" : "rowtime", - "fieldType" : "TIMESTAMP(3)" - } ] - }, - "description" : "Calc(select=[a, b, c, PROCTIME() AS proctime, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])" - }, { - "id" : 8, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 4, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "b", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "c", - "fieldType" : "BIGINT" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" - }, { - "id" : 9, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "b", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - } ] - }, - "description" : "Calc(select=[a, b, proctime])" - }, { - "id" : 10, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "b", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[a]])" - }, { - "id" : 11, - "type" : "stream-exec-interval-join_1", - "intervalJoinSpec" : { - "joinSpec" : { - "joinType" : "INNER", - "leftKeys" : [ 0 ], - "rightKeys" : [ 0 ], - "filterNulls" : [ true ], - "nonEquiCondition" : null - }, - "windowBounds" : { - "isEventTime" : false, - "leftLowerBound" : -3600000, - "leftUpperBound" : 3600000, - "leftTimeIndex" : 1, - "rightTimeIndex" : 2 - } - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - }, { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "proctime", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, { - "name" : "a0", - "fieldType" : "INT" - }, { - "name" : "b", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "proctime0", - "fieldType" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - } ] - }, - "description" : "IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=false, leftLowerBound=-3600000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (proctime >= (proctime0 - 3600000:INTERVAL HOUR)) AND (proctime <= (proctime0 + 3600000:INTERVAL HOUR)))], select=[a, proctime, a0, b, proctime0])" - }, { - "id" : 12, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647)>", - "description" : "Calc(select=[a, b])" - }, { - "id" : 13, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 5, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 9, - "target" : 10, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 11, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 10, - "target" : 11, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 11, - "target" : 12, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 12, - "target" : 13, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -} diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest_jsonplan/testRowTimeInnerJoinWithOnClause.out b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest_jsonplan/testRowTimeInnerJoinWithOnClause.out deleted file mode 100644 index ab5aa86939d24..0000000000000 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/nodes/exec/stream/IntervalJoinJsonPlanTest_jsonplan/testRowTimeInnerJoinWithOnClause.out +++ /dev/null @@ -1,594 +0,0 @@ -{ - "flinkVersion" : "", - "nodes" : [ { - "id" : 1, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`A`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "c", - "dataType" : "BIGINT" - }, { - "name" : "proctime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, - "serializableString" : "PROCTIME()" - } - }, { - "name" : "rowtime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FROM_UNIXTIME$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - } ], - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "TO_TIMESTAMP(FROM_UNIXTIME(`c`))" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime` - INTERVAL '1' SECOND" - } - } ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - }, - "abilities" : [ { - "type" : "ProjectPushDown", - "projectedFields" : [ [ 0 ], [ 2 ] ], - "producedType" : "ROW<`a` INT, `c` BIGINT> NOT NULL" - }, { - "type" : "ReadingMetadata", - "metadataKeys" : [ ], - "producedType" : "ROW<`a` INT, `c` BIGINT> NOT NULL" - } ] - }, - "outputType" : "ROW<`a` INT, `c` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, A, project=[a, c], metadata=[]]], fields=[a, c])", - "inputProperties" : [ ] - }, { - "id" : 2, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FROM_UNIXTIME$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "BIGINT" - } ], - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `rowtime` TIMESTAMP(3)>", - "description" : "Calc(select=[a, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])" - }, { - "id" : 3, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 1, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" - }, { - "id" : 4, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[a]])" - }, { - "id" : 5, - "type" : "stream-exec-table-source-scan_1", - "scanTableSource" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`B`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - }, { - "name" : "c", - "dataType" : "BIGINT" - }, { - "name" : "proctime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$PROCTIME$1", - "operands" : [ ], - "type" : { - "type" : "TIMESTAMP_WITH_LOCAL_TIME_ZONE", - "nullable" : false, - "precision" : 3, - "kind" : "PROCTIME" - } - }, - "serializableString" : "PROCTIME()" - } - }, { - "name" : "rowtime", - "kind" : "COMPUTED", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FROM_UNIXTIME$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - } ], - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "TO_TIMESTAMP(FROM_UNIXTIME(`c`))" - } - } ], - "watermarkSpecs" : [ { - "rowtimeAttribute" : "rowtime", - "expression" : { - "rexNode" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 4, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "serializableString" : "`rowtime` - INTERVAL '1' SECOND" - } - } ] - }, - "partitionKeys" : [ ], - "options" : { - "bounded" : "false", - "connector" : "values" - } - } - } - }, - "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `c` BIGINT>", - "description" : "TableSourceScan(table=[[default_catalog, default_database, B]], fields=[a, b, c])", - "inputProperties" : [ ] - }, { - "id" : 6, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 1, - "type" : "VARCHAR(2147483647)" - }, { - "kind" : "CALL", - "internalName" : "$TO_TIMESTAMP$1", - "operands" : [ { - "kind" : "CALL", - "internalName" : "$FROM_UNIXTIME$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "BIGINT" - } ], - "type" : "VARCHAR(2147483647)" - } ], - "type" : "TIMESTAMP(3)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647), `rowtime` TIMESTAMP(3)>", - "description" : "Calc(select=[a, b, TO_TIMESTAMP(FROM_UNIXTIME(c)) AS rowtime])" - }, { - "id" : 7, - "type" : "stream-exec-watermark-assigner_1", - "watermarkExpr" : { - "kind" : "CALL", - "syntax" : "SPECIAL", - "internalName" : "$-$1", - "operands" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 2, - "type" : "TIMESTAMP(3)" - }, { - "kind" : "LITERAL", - "value" : "1000", - "type" : "INTERVAL SECOND(6) NOT NULL" - } ], - "type" : "TIMESTAMP(3)" - }, - "rowtimeFieldIndex" : 2, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "b", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "WatermarkAssigner(rowtime=[rowtime], watermark=[(rowtime - 1000:INTERVAL SECOND)])" - }, { - "id" : 8, - "type" : "stream-exec-exchange_1", - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "HASH", - "keys" : [ 0 ] - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "b", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "Exchange(distribution=[hash[a]])" - }, { - "id" : 9, - "type" : "stream-exec-interval-join_1", - "intervalJoinSpec" : { - "joinSpec" : { - "joinType" : "INNER", - "leftKeys" : [ 0 ], - "rightKeys" : [ 0 ], - "filterNulls" : [ true ], - "nonEquiCondition" : null - }, - "windowBounds" : { - "isEventTime" : true, - "leftLowerBound" : -10000, - "leftUpperBound" : 3600000, - "leftTimeIndex" : 1, - "rightTimeIndex" : 2 - } - }, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - }, { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : { - "type" : "ROW", - "fields" : [ { - "name" : "a", - "fieldType" : "INT" - }, { - "name" : "rowtime", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - }, { - "name" : "a0", - "fieldType" : "INT" - }, { - "name" : "b", - "fieldType" : "VARCHAR(2147483647)" - }, { - "name" : "rowtime0", - "fieldType" : { - "type" : "TIMESTAMP_WITHOUT_TIME_ZONE", - "precision" : 3, - "kind" : "ROWTIME" - } - } ] - }, - "description" : "IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 10000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])" - }, { - "id" : 10, - "type" : "stream-exec-calc_1", - "projection" : [ { - "kind" : "INPUT_REF", - "inputIndex" : 0, - "type" : "INT" - }, { - "kind" : "INPUT_REF", - "inputIndex" : 3, - "type" : "VARCHAR(2147483647)" - } ], - "condition" : null, - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647)>", - "description" : "Calc(select=[a, b])" - }, { - "id" : 11, - "type" : "stream-exec-sink_1", - "configuration" : { - "table.exec.sink.keyed-shuffle" : "AUTO", - "table.exec.sink.not-null-enforcer" : "ERROR", - "table.exec.sink.rowtime-inserter" : "ENABLED", - "table.exec.sink.type-length-enforcer" : "IGNORE", - "table.exec.sink.upsert-materialize" : "AUTO" - }, - "dynamicTableSink" : { - "table" : { - "identifier" : "`default_catalog`.`default_database`.`MySink`", - "resolvedTable" : { - "schema" : { - "columns" : [ { - "name" : "a", - "dataType" : "INT" - }, { - "name" : "b", - "dataType" : "VARCHAR(2147483647)" - } ], - "watermarkSpecs" : [ ] - }, - "partitionKeys" : [ ], - "options" : { - "connector" : "values", - "table-sink-class" : "DEFAULT" - } - } - } - }, - "inputChangelogMode" : [ "INSERT" ], - "inputProperties" : [ { - "requiredDistribution" : { - "type" : "UNKNOWN" - }, - "damBehavior" : "PIPELINED", - "priority" : 0 - } ], - "outputType" : "ROW<`a` INT, `b` VARCHAR(2147483647)>", - "description" : "Sink(table=[default_catalog.default_database.MySink], fields=[a, b])" - } ], - "edges" : [ { - "source" : 1, - "target" : 2, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 2, - "target" : 3, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 3, - "target" : 4, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 5, - "target" : 6, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 6, - "target" : 7, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 7, - "target" : 8, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 4, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 8, - "target" : 9, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 9, - "target" : 10, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - }, { - "source" : 10, - "target" : 11, - "shuffle" : { - "type" : "FORWARD" - }, - "shuffleMode" : "PIPELINED" - } ] -}