diff --git a/CHANGELOG.md b/CHANGELOG.md index 4d6a7dcb..4baf3409 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,11 @@ ## [Unreleased] +### Added + +- Added support for using the result of a lookup join operation in a subsequent select query that adds + or removes columns (projection pushdown operation). + ### Changed - Changed [LookupQueryInfo](src/main/java/com/getindata/connectors/http/internal/table/lookup/LookupQueryInfo.java) diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java index 9527f80f..5e688ed5 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java @@ -8,6 +8,7 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes.Field; +import org.apache.flink.table.connector.Projection; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.AsyncTableFunctionProvider; import org.apache.flink.table.connector.source.DynamicTableSource; @@ -38,7 +39,7 @@ public class HttpLookupTableSource implements LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown { - private final DataType physicalRowDataType; + private DataType physicalRowDataType; private final HttpLookupConfig lookupConfig; @@ -58,6 +59,11 @@ public HttpLookupTableSource( this.dynamicTableFactoryContext = dynamicTablecontext; } + @Override + public void applyProjection(int[][] projectedFields, DataType producedDataType) { + physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType); + } + @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) { @@ -127,7 +133,7 @@ public void applyLimit(long 
limit) { @Override public boolean supportsNestedProjection() { - return false; + return true; } private PollingClientFactory createPollingClientFactory( diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java index 2994fbd4..1d7c43e8 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceITCaseTest.java @@ -243,6 +243,160 @@ public void testHttpsMTlsLookupJoin() throws Exception { assertEnrichedRows(rows); } + @Test + public void testLookupJoinProjectionPushDown() throws Exception { + + // GIVEN + setUpServerBodyStub( + "POST", + wireMockServer, + List.of( + matchingJsonPath("$.row.aStringColumn"), + matchingJsonPath("$.row.anIntColumn"), + matchingJsonPath("$.row.aFloatColumn") + ) + ); + + String fields = + "`row` ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>\n"; + + String sourceTable = + "CREATE TABLE Orders (\n" + + " proc_time AS PROCTIME(),\n" + + " id STRING,\n" + + fields + + ") WITH (" + + "'connector' = 'datagen'," + + "'rows-per-second' = '1'," + + "'fields.id.kind' = 'sequence'," + + "'fields.id.start' = '1'," + + "'fields.id.end' = '5'" + + ")"; + + String lookupTable = + "CREATE TABLE Customers (\n" + + " `enrichedInt` INT,\n" + + " `enrichedString` STRING,\n" + + " \n" + + fields + + ") WITH (" + + "'format' = 'json'," + + "'lookup-request.format' = 'json'," + + "'lookup-request.format.json.fail-on-missing-field' = 'true'," + + "'connector' = 'rest-lookup'," + + "'lookup-method' = 'POST'," + + "'url' = 'http://localhost:9090/client'," + + "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json'," + + "'asyncPolling' = 'true'" + + ")"; + + tEnv.executeSql(sourceTable); + 
tEnv.executeSql(lookupTable); + + // WHEN + // SQL query that performs JOIN on both tables. + String joinQuery = + "CREATE TEMPORARY VIEW lookupResult AS " + + "SELECT o.id, o.`row`, c.enrichedInt, c.enrichedString FROM Orders AS o" + + " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c" + + " ON (\n" + + " o.`row` = c.`row`\n" + + ")"; + + tEnv.executeSql(joinQuery); + + // SQL query that performs a projection pushdown to limit the number of columns + String lastQuery = + "SELECT r.id, r.enrichedInt FROM lookupResult r;"; + + TableResult result = tEnv.executeSql(lastQuery); + result.await(15, TimeUnit.SECONDS); + + // THEN + SortedSet collectedRows = getCollectedRows(result); + + collectedRows.stream().forEach(row -> assertThat(row.getArity()).isEqualTo(2)); + + assertThat(collectedRows.size()).isEqualTo(5); + } + + @Test + public void testLookupJoinProjectionPushDownNested() throws Exception { + + // GIVEN + setUpServerBodyStub( + "POST", + wireMockServer, + List.of( + matchingJsonPath("$.row.aStringColumn"), + matchingJsonPath("$.row.anIntColumn"), + matchingJsonPath("$.row.aFloatColumn") + ) + ); + + String fields = + "`row` ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>\n"; + + String sourceTable = + "CREATE TABLE Orders (\n" + + " proc_time AS PROCTIME(),\n" + + " id STRING,\n" + + fields + + ") WITH (" + + "'connector' = 'datagen'," + + "'rows-per-second' = '1'," + + "'fields.id.kind' = 'sequence'," + + "'fields.id.start' = '1'," + + "'fields.id.end' = '5'" + + ")"; + + String lookupTable = + "CREATE TABLE Customers (\n" + + " `enrichedInt` INT,\n" + + " `enrichedString` STRING,\n" + + " \n" + + fields + + ") WITH (" + + "'format' = 'json'," + + "'lookup-request.format' = 'json'," + + "'lookup-request.format.json.fail-on-missing-field' = 'true'," + + "'connector' = 'rest-lookup'," + + "'lookup-method' = 'POST'," + + "'url' = 'http://localhost:9090/client'," + + "'gid.connector.http.source.lookup.header.Content-Type' = 
'application/json'," + + "'asyncPolling' = 'true'" + + ")"; + + tEnv.executeSql(sourceTable); + tEnv.executeSql(lookupTable); + + // WHEN + // SQL query that performs JOIN on both tables. + String joinQuery = + "CREATE TEMPORARY VIEW lookupResult AS " + + "SELECT o.id, o.`row`, c.enrichedInt, c.enrichedString FROM Orders AS o" + + " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c" + + " ON (\n" + + " o.`row` = c.`row`\n" + + ")"; + + tEnv.executeSql(joinQuery); + + // SQL query that performs a project pushdown to take a subset of columns with nested value + String lastQuery = + "SELECT r.id, r.enrichedInt, r.`row`.aStringColumn FROM lookupResult r;"; + + TableResult result = tEnv.executeSql(lastQuery); + result.await(15, TimeUnit.SECONDS); + + // THEN + SortedSet collectedRows = getCollectedRows(result); + + collectedRows.stream().forEach(row -> assertThat(row.getArity()).isEqualTo(3)); + + assertThat(collectedRows.size()).isEqualTo(5); + } + @Test public void testLookupJoinOnRowType() throws Exception {