Skip to content

Commit

Permalink
Support source lookup array results (#135)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Grzegorz Kołakowski <[email protected]>
  • Loading branch information
grzegorz8 and Grzegorz Kołakowski authored Nov 28, 2024
1 parent b95e7fb commit 1a140ea
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 37 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

### Added

- Allow to fetch multiple results from REST API endpoint (`gid.connector.http.source.lookup.result-type`).

## [0.16.0] - 2024-10-18

### Added
Expand Down
10 changes: 10 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ Because of that, if AsyncIO timer passes, Flink will throw TimeoutException whic
The HTTP request timeouts on the other hand will not cause Job restart. In that case, exception will be logged into application logs.
To avoid job restart on timeouts caused by Lookup queries, the value of `gid.connector.http.source.lookup.request.timeout` should be smaller than `table.exec.async-lookup.timeout`.

#### Lookup multiple results

Typically, join can return zero, one or more results. What is more, there are lots of possible REST API designs and
pagination methods. Currently, the connector supports only two simple approaches (`gid.connector.http.source.lookup.result-type`):

- `single-value` - REST API returns single object.
- `array` - REST API returns array of objects. Pagination is not supported yet.

Please be informed that the mechanism will be enhanced in the future. See [HTTP-118](https://github.com/getindata/flink-http-connector/issues/118).

### HTTP Sink
The following example shows the minimum Table API example to create a [HttpDynamicSink](src/main/java/com/getindata/connectors/http/internal/table/HttpDynamicSink.java) that writes JSON values to an HTTP endpoint using POST method, assuming Flink has JAR of [JSON serializer](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/json/) installed:

Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ under the License.
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.2</log4j.version>
<lombok.version>1.18.22</lombok.version>
<jackson.version>2.18.1</jackson.version>
<junit4.version>4.13.2</junit4.version>
<junit5.version>5.10.1</junit5.version>
<junit.jupiter.version>${junit5.version}</junit.jupiter.version>
Expand Down Expand Up @@ -153,6 +154,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.getindata.connectors.http.internal;

import java.util.Optional;
import java.util.Collection;

import org.apache.flink.table.data.RowData;

Expand All @@ -14,5 +14,5 @@ public interface PollingClient<T> {
* @param lookupRow A {@link RowData} containing request parameters.
* @return an optional result of data lookup.
*/
Optional<T> pull(RowData lookupRow);
Collection<T> pull(RowData lookupRow);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public final class HttpConnectorConfigConstants {
public static final String LOOKUP_SOURCE_HEADER_USE_RAW = GID_CONNECTOR_HTTP
+ "source.lookup.use-raw-authorization-header";

public static final String RESULT_TYPE = GID_CONNECTOR_HTTP
+ "source.lookup.result-type";

// --------- Error code handling configuration ---------
public static final String HTTP_ERROR_SINK_CODE_WHITE_LIST =
GID_CONNECTOR_HTTP + "sink.error.code.exclude";
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.AccessLevel;
Expand Down Expand Up @@ -66,7 +64,6 @@ public void open(FunctionContext context) throws Exception {
@Override
public Collection<RowData> lookup(RowData keyRow) {
localHttpCallCounter.incrementAndGet();
Optional<RowData> result = client.pull(keyRow);
return result.map(Collections::singletonList).orElse(Collections.emptyList());
return client.pull(keyRow);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@
import java.net.http.HttpClient;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
Expand All @@ -19,6 +26,7 @@
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker;
import com.getindata.connectors.http.internal.status.ComposeHttpStatusCodeChecker.ComposeHttpStatusCodeCheckerConfig;
import com.getindata.connectors.http.internal.status.HttpStatusCodeChecker;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.RESULT_TYPE;

/**
* An implementation of {@link PollingClient} that uses Java 11's {@link HttpClient}.
Expand All @@ -27,6 +35,9 @@
@Slf4j
public class JavaNetHttpPollingClient implements PollingClient<RowData> {

private static final String RESULT_TYPE_SINGLE_VALUE = "single-value";
private static final String RESULT_TYPE_ARRAY = "array";

private final HttpClient httpClient;

private final HttpStatusCodeChecker statusCodeChecker;
Expand All @@ -35,8 +46,12 @@ public class JavaNetHttpPollingClient implements PollingClient<RowData> {

private final HttpRequestFactory requestFactory;

private final ObjectMapper objectMapper;

private final HttpPostRequestCallback<HttpLookupSourceRequestEntry> httpPostRequestCallback;

private final HttpLookupConfig options;

public JavaNetHttpPollingClient(
HttpClient httpClient,
DeserializationSchema<RowData> responseBodyDecoder,
Expand All @@ -47,6 +62,7 @@ public JavaNetHttpPollingClient(
this.responseBodyDecoder = responseBodyDecoder;
this.requestFactory = requestFactory;

this.objectMapper = new ObjectMapper();
this.httpPostRequestCallback = options.getHttpPostRequestCallback();

// TODO Inject this via constructor when implementing a response processor.
Expand All @@ -61,21 +77,22 @@ public JavaNetHttpPollingClient(
.build();

this.statusCodeChecker = new ComposeHttpStatusCodeChecker(checkerConfig);
this.options = options;
}

@Override
public Optional<RowData> pull(RowData lookupRow) {
public Collection<RowData> pull(RowData lookupRow) {
try {
log.debug("Optional<RowData> pull with Rowdata={}.", lookupRow);
log.debug("Collection<RowData> pull with Rowdata={}.", lookupRow);
return queryAndProcess(lookupRow);
} catch (Exception e) {
log.error("Exception during HTTP request.", e);
return Optional.empty();
return Collections.emptyList();
}
}

// TODO Add Retry Policy And configure TimeOut from properties
private Optional<RowData> queryAndProcess(RowData lookupData) throws Exception {
private Collection<RowData> queryAndProcess(RowData lookupData) throws Exception {

HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData);
HttpResponse<String> response = httpClient.send(
Expand All @@ -85,14 +102,14 @@ private Optional<RowData> queryAndProcess(RowData lookupData) throws Exception {
return processHttpResponse(response, request);
}

private Optional<RowData> processHttpResponse(
private Collection<RowData> processHttpResponse(
HttpResponse<String> response,
HttpLookupSourceRequestEntry request) throws IOException {

this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());

if (response == null) {
return Optional.empty();
return Collections.emptyList();
}

String responseBody = response.body();
Expand All @@ -102,14 +119,14 @@ private Optional<RowData> processHttpResponse(
"with Server response body [%s] ", statusCode, responseBody));

if (notErrorCodeAndNotEmptyBody(responseBody, statusCode)) {
return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes()));
return deserialize(responseBody);
} else {
log.warn(
String.format("Returned Http status code was invalid or returned body was empty. "
+ "Status Code [%s]", statusCode)
);

return Optional.empty();
return Collections.emptyList();
}
}

Expand All @@ -122,4 +139,42 @@ private boolean notErrorCodeAndNotEmptyBody(String body, int statusCode) {
HttpRequestFactory getRequestFactory() {
return this.requestFactory;
}

private Collection<RowData> deserialize(String responseBody) throws IOException {
byte[] rawBytes = responseBody.getBytes();
String resultType =
options.getProperties().getProperty(RESULT_TYPE, RESULT_TYPE_SINGLE_VALUE);
if (resultType.equals(RESULT_TYPE_SINGLE_VALUE)) {
return deserializeSingleValue(rawBytes);
} else if (resultType.equals(RESULT_TYPE_ARRAY)) {
return deserializeArray(rawBytes);
} else {
throw new IllegalStateException(
String.format("Unknown lookup source result type '%s'.", resultType));
}
}

private List<RowData> deserializeSingleValue(byte[] rawBytes) throws IOException {
return Optional.ofNullable(responseBodyDecoder.deserialize(rawBytes))
.map(Collections::singletonList)
.orElse(Collections.emptyList());
}

private List<RowData> deserializeArray(byte[] rawBytes) throws IOException {
List<JsonNode> rawObjects =
objectMapper.readValue(rawBytes, new TypeReference<>() {
});
List<RowData> result = new ArrayList<>();
for (JsonNode rawObject : rawObjects) {
if (!(rawObject instanceof NullNode)) {
RowData deserialized =
responseBodyDecoder.deserialize(rawObject.toString().getBytes());
// deserialize() returns null if deserialization fails
if (deserialized != null) {
result.add(deserialized);
}
}
}
return result;
}
}
Loading

0 comments on commit 1a140ea

Please sign in to comment.