Skip to content

Commit

Permalink
HTTP-74 Add path parameter support (#84)
Browse files Browse the repository at this point in the history
Signed-off-by: David Radley <[email protected]>
Co-authored-by: David Radley <[email protected]>
  • Loading branch information
OlivierZembri and davidradl authored Apr 3, 2024
1 parent 93b0358 commit 3c57b39
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 2 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

## [Unreleased]

### Added

- Added support for using the result of a lookup join operation in a subsequent select query that adds
or removes columns (project pushdown operation).

### Changed

- Changed [LookupQueryInfo](src/main/java/com/getindata/connectors/http/internal/table/lookup/LookupQueryInfo.java)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.DataTypes.Field;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
Expand Down Expand Up @@ -38,7 +39,7 @@
public class HttpLookupTableSource
implements LookupTableSource, SupportsProjectionPushDown, SupportsLimitPushDown {

private final DataType physicalRowDataType;
private DataType physicalRowDataType;

private final HttpLookupConfig lookupConfig;

Expand All @@ -58,6 +59,11 @@ public HttpLookupTableSource(
this.dynamicTableFactoryContext = dynamicTablecontext;
}

@Override
public void applyProjection(int[][] projectedFields, DataType producedDataType) {
physicalRowDataType = Projection.of(projectedFields).project(physicalRowDataType);
}

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {

Expand Down Expand Up @@ -127,7 +133,7 @@ public void applyLimit(long limit) {

@Override
public boolean supportsNestedProjection() {
return false;
return true;
}

private PollingClientFactory<RowData> createPollingClientFactory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,160 @@ public void testHttpsMTlsLookupJoin() throws Exception {
assertEnrichedRows(rows);
}

@Test
public void testLookupJoinProjectionPushDown() throws Exception {

// GIVEN
setUpServerBodyStub(
"POST",
wireMockServer,
List.of(
matchingJsonPath("$.row.aStringColumn"),
matchingJsonPath("$.row.anIntColumn"),
matchingJsonPath("$.row.aFloatColumn")
)
);

String fields =
"`row` ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>\n";

String sourceTable =
"CREATE TABLE Orders (\n"
+ " proc_time AS PROCTIME(),\n"
+ " id STRING,\n"
+ fields
+ ") WITH ("
+ "'connector' = 'datagen',"
+ "'rows-per-second' = '1',"
+ "'fields.id.kind' = 'sequence',"
+ "'fields.id.start' = '1',"
+ "'fields.id.end' = '5'"
+ ")";

String lookupTable =
"CREATE TABLE Customers (\n" +
" `enrichedInt` INT,\n" +
" `enrichedString` STRING,\n" +
" \n"
+ fields
+ ") WITH ("
+ "'format' = 'json',"
+ "'lookup-request.format' = 'json',"
+ "'lookup-request.format.json.fail-on-missing-field' = 'true',"
+ "'connector' = 'rest-lookup',"
+ "'lookup-method' = 'POST',"
+ "'url' = 'http://localhost:9090/client',"
+ "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',"
+ "'asyncPolling' = 'true'"
+ ")";

tEnv.executeSql(sourceTable);
tEnv.executeSql(lookupTable);

// WHEN
// SQL query that performs JOIN on both tables.
String joinQuery =
"CREATE TEMPORARY VIEW lookupResult AS " +
"SELECT o.id, o.`row`, c.enrichedInt, c.enrichedString FROM Orders AS o"
+ " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c"
+ " ON (\n"
+ " o.`row` = c.`row`\n"
+ ")";

tEnv.executeSql(joinQuery);

// SQL query that performs a projection pushdown to limit the number of columns
String lastQuery =
"SELECT r.id, r.enrichedInt FROM lookupResult r;";

TableResult result = tEnv.executeSql(lastQuery);
result.await(15, TimeUnit.SECONDS);

// THEN
SortedSet<Row> collectedRows = getCollectedRows(result);

collectedRows.stream().forEach(row -> assertThat(row.getArity()).isEqualTo(2));

assertThat(collectedRows.size()).isEqualTo(5);
}

@Test
public void testLookupJoinProjectionPushDownNested() throws Exception {

// GIVEN
setUpServerBodyStub(
"POST",
wireMockServer,
List.of(
matchingJsonPath("$.row.aStringColumn"),
matchingJsonPath("$.row.anIntColumn"),
matchingJsonPath("$.row.aFloatColumn")
)
);

String fields =
"`row` ROW<`aStringColumn` STRING, `anIntColumn` INT, `aFloatColumn` FLOAT>\n";

String sourceTable =
"CREATE TABLE Orders (\n"
+ " proc_time AS PROCTIME(),\n"
+ " id STRING,\n"
+ fields
+ ") WITH ("
+ "'connector' = 'datagen',"
+ "'rows-per-second' = '1',"
+ "'fields.id.kind' = 'sequence',"
+ "'fields.id.start' = '1',"
+ "'fields.id.end' = '5'"
+ ")";

String lookupTable =
"CREATE TABLE Customers (\n" +
" `enrichedInt` INT,\n" +
" `enrichedString` STRING,\n" +
" \n"
+ fields
+ ") WITH ("
+ "'format' = 'json',"
+ "'lookup-request.format' = 'json',"
+ "'lookup-request.format.json.fail-on-missing-field' = 'true',"
+ "'connector' = 'rest-lookup',"
+ "'lookup-method' = 'POST',"
+ "'url' = 'http://localhost:9090/client',"
+ "'gid.connector.http.source.lookup.header.Content-Type' = 'application/json',"
+ "'asyncPolling' = 'true'"
+ ")";

tEnv.executeSql(sourceTable);
tEnv.executeSql(lookupTable);

// WHEN
// SQL query that performs JOIN on both tables.
String joinQuery =
"CREATE TEMPORARY VIEW lookupResult AS " +
"SELECT o.id, o.`row`, c.enrichedInt, c.enrichedString FROM Orders AS o"
+ " JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c"
+ " ON (\n"
+ " o.`row` = c.`row`\n"
+ ")";

tEnv.executeSql(joinQuery);

// SQL query that performs a project pushdown to take a subset of columns with nested value
String lastQuery =
"SELECT r.id, r.enrichedInt, r.`row`.aStringColumn FROM lookupResult r;";

TableResult result = tEnv.executeSql(lastQuery);
result.await(15, TimeUnit.SECONDS);

// THEN
SortedSet<Row> collectedRows = getCollectedRows(result);

collectedRows.stream().forEach(row -> assertThat(row.getArity()).isEqualTo(3));

assertThat(collectedRows.size()).isEqualTo(5);
}

@Test
public void testLookupJoinOnRowType() throws Exception {

Expand Down

0 comments on commit 3c57b39

Please sign in to comment.