Skip to content

Commit

Permalink
Support source lookup array results
Browse files Browse the repository at this point in the history
  • Loading branch information
Grzegorz Kołakowski committed Nov 14, 2024
1 parent b95e7fb commit 12eba1e
Show file tree
Hide file tree
Showing 12 changed files with 189 additions and 38 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

### Added

- Allow fetching multiple results from a REST API endpoint (`gid.connector.http.source.lookup.result-type`).

## [0.16.0] - 2024-10-18

### Added
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ Because of that, if AsyncIO timer passes, Flink will throw TimeoutException whic
HTTP request timeouts, on the other hand, will not cause a job restart. In that case, the exception will be logged in the application logs.
To avoid job restarts on timeouts caused by lookup queries, the value of `gid.connector.http.source.lookup.request.timeout` should be smaller than `table.exec.async-lookup.timeout`.

#### Lookup multiple results

Typically, a join can return zero, one, or more results. Moreover, there are many possible REST API designs and
pagination methods. Currently, the connector supports only two simple approaches (`gid.connector.http.source.lookup.result-type`):

- `single-value` - the REST API returns a single object.
- `array` - the REST API returns an array of objects. Pagination is not supported yet.

Please note that this mechanism will be enhanced in the future. See [HTTP-118](https://github.com/getindata/flink-http-connector/issues/118).

### HTTP Sink
The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed:

Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ under the License.
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
<lombok.version>1.18.22</lombok.version>
<jackson.version>2.18.1</jackson.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.10.1</junit5.version>
<junit.jupiter.version>${junit5.version}</junit.jupiter.version>
Expand Down Expand Up @@ -153,6 +154,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.getindata.connectors.http.internal;

import java.util.Optional;
import java.util.Collection;

import org.apache.flink.table.data.RowData;

Expand All @@ -14,5 +14,5 @@ public interface PollingClient<T> {
* @param lookupRow A {@link RowData} containing request parameters.
* @return a collection of data lookup results.
*/
Optional<T> pull(RowData lookupRow);
Collection<T> pull(RowData lookupRow);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public final class HttpConnectorConfigConstants {
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = GID_CONNECTOR_HTTP
+ "source.lookup.use-raw-authorization-header";

public static final String RESULT_TYPE = GID_CONNECTOR_HTTP
+ "source.lookup.result-type";

// --------- Error code handling configuration ---------
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "sink.error.code.exclude";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.AccessLevel;
Expand Down Expand Up @@ -66,7 +64,6 @@ public void open(FunctionContext context) throws Exception {
@Override
public Collection<RowData> lookup(RowData keyRow) {
localHttpCallCounter.incrementAndGet();
Optional<RowData> result = client.pull(keyRow);
return result.map(Collections::singletonList).orElse(Collections.emptyList());
return client.pull(keyRow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,14 @@
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.List;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
Expand All @@ -19,6 +24,7 @@
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE;

/**
* An implementation of {@link PollingClient} that uses Java 11's {@link HttpClient}.
Expand All @@ -33,18 +39,24 @@ public class JavaNetHttpPollingClient implements PollingClient<RowData> {

private final DeserializationSchema<RowData> responseBodyDecoder;

private final ObjectMapper objectMapper;

private final HttpRequestFactory requestFactory;

private final HttpPostRequestCallback<HttpLookupSourceRequestEntry> httpPostRequestCallback;

private final HttpLookupConfig options;

public JavaNetHttpPollingClient(
HttpClient httpClient,
DeserializationSchema<RowData> responseBodyDecoder,
ObjectMapper objectMapper,
HttpLookupConfig options,
HttpRequestFactory requestFactory) {

this.httpClient = httpClient;
this.responseBodyDecoder = responseBodyDecoder;
this.objectMapper = objectMapper;
this.requestFactory = requestFactory;

this.httpPostRequestCallback = options.getHttpPostRequestCallback();
Expand All @@ -61,21 +73,22 @@ public JavaNetHttpPollingClient(
.build();

this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
this.options = options;
}

@Override
public Optional<RowData> pull(RowData lookupRow) {
public Collection<RowData> pull(RowData lookupRow) {
try {
log.debug("Optional<RowData> pull with Rowdata={}.", lookupRow);
log.debug("Collection<RowData> pull with Rowdata={}.", lookupRow);
return queryAndProcess(lookupRow);
} catch (Exception e) {
log.error("Exception during HTTP request.", e);
return Optional.empty();
return Collections.emptyList();
}
}

// TODO Add Retry Policy And configure TimeOut from properties
private Optional<RowData> queryAndProcess(RowData lookupData) throws Exception {
private Collection<RowData> queryAndProcess(RowData lookupData) throws Exception {

HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData);
HttpResponse<String> response = httpClient.send(
Expand All @@ -85,14 +98,14 @@ private Optional<RowData> queryAndProcess(RowData lookupData) throws Exception {
return processHttpResponse(response, request);
}

private Optional<RowData> processHttpResponse(
private Collection<RowData> processHttpResponse(
HttpResponse<String> response,
HttpLookupSourceRequestEntry request) throws IOException {

this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());

if (response == null) {
return Optional.empty();
return Collections.emptyList();
}

String responseBody = response.body();
Expand All @@ -102,14 +115,14 @@ private Optional<RowData> processHttpResponse(
"with Server response body [%s] ", statusCode, responseBody));

if (notErrorCodeAndNotEmptyBody(responseBody, statusCode)) {
return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes()));
return deserialize(responseBody);
} else {
log.warn(
String.format("Returned Http status code was invalid or returned body was empty. "
+ "Status Code [%s]", statusCode)
);

return Optional.empty();
return Collections.emptyList();
}
}

Expand All @@ -122,4 +135,24 @@ private boolean notErrorCodeAndNotEmptyBody(String body, int statusCode) {
HttpRequestFactory getRequestFactory() {
return this.requestFactory;
}

/**
 * Converts an HTTP response body into lookup rows according to the configured result type.
 *
 * <p>The result type is read from the {@code RESULT_TYPE} connector property and defaults
 * to {@code "single-value"}:
 * <ul>
 *   <li>{@code single-value} - the whole body is passed to the response body decoder;</li>
 *   <li>{@code array} - the body is parsed as a JSON array and every element is passed to
 *       the response body decoder separately.</li>
 * </ul>
 *
 * @param responseBody non-empty body of the HTTP response.
 * @return decoded rows; rows for which the decoder returned {@code null} are omitted.
 * @throws IOException if the body cannot be parsed or decoded.
 * @throws IllegalStateException if the configured result type is not supported.
 */
private Collection<RowData> deserialize(String responseBody) throws IOException {
    String resultType = options.getProperties().getProperty(RESULT_TYPE, "single-value");
    if (resultType.equals("single-value")) {
        // Encode explicitly as UTF-8 so the result does not depend on the platform charset.
        RowData row = responseBodyDecoder.deserialize(
            responseBody.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        // The decoder result may be null (the Optional-based version of this client wrapped
        // it with Optional.ofNullable), so never hand a list containing null to the caller.
        return row == null ? Collections.emptyList() : Collections.singletonList(row);
    } else if (resultType.equals("array")) {
        List<JsonNode> rawObjects = objectMapper.readValue(
            responseBody.getBytes(java.nio.charset.StandardCharsets.UTF_8),
            new TypeReference<List<JsonNode>>() {
            });
        List<RowData> result = new ArrayList<>(rawObjects.size());
        for (JsonNode rawObject : rawObjects) {
            // Each array element is re-serialized so it can be decoded by the row decoder.
            RowData row = responseBodyDecoder.deserialize(
                rawObject.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
            if (row != null) {
                result.add(row);
            }
        }
        return result;
    } else {
        // '%s' (not a bare '%') is required, otherwise String.format itself throws
        // UnknownFormatConversionException and the intended message is never produced.
        throw new IllegalStateException(
            String.format("Unknown lookup source result type '%s'.", resultType));
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.net.http.HttpClient;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.data.RowData;

Expand All @@ -26,6 +27,7 @@ public JavaNetHttpPollingClient createPollClient(
return new JavaNetHttpPollingClient(
httpClient,
schemaDecoder,
new ObjectMapper(),
options,
requestFactory
);
Expand Down
Loading

0 comments on commit 12eba1e

Please sign in to comment.