Skip to content

Commit

Permalink
HTTP-92 Customization of HTTP lookup source logger
Browse files Browse the repository at this point in the history
Customization of HTTP lookup source logger

Signed-off-by: Olivier Zembri <<[email protected]>>
  • Loading branch information
OlivierZembri authored and kristoffSC committed May 10, 2024
1 parent a1e485e commit d75e18d
Show file tree
Hide file tree
Showing 16 changed files with 240 additions and 16 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

## [Unreleased]

### Added

- Added support for optionally using a custom SLF4J logger to trace HTTP lookup queries.
New configuration parameter: `gid.connector.http.source.lookup.request-callback` with default value
`slf4j-lookup-logger`. If this parameter is not provided then the default SLF4J logger
[Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)
is used instead.

## [0.13.0] - 2024-04-03

### Added
Expand Down
34 changes: 30 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -338,18 +338,43 @@ CREATE TABLE http (
```

#### Custom request/response callback
Http Sink processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the

- Http Sink processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
behaviour of the additional stage of processing done by Table API Sink by implementing
[HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) and
[HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java)
interfaces. Custom implementations of `HttpSinkRequestCallbackFactory` can be registered along other factories in
`resources/META-INF.services/org.apache.flink.table.factories.Factory` file and then referenced by their identifiers in
interfaces. Custom implementations of `HttpPostRequestCallbackFactory<HttpRequest>` can be registered along other factories in
`resources/META-INF/services/org.apache.flink.table.factories.Factory` file and then referenced by their identifiers in
the HttpSink DDL property field `gid.connector.http.sink.request-callback`.

A default implementation that logs those pairs as *INFO* level logs using Slf4j
For example, one can create a class `CustomHttpSinkPostRequestCallbackFactory` with a unique identifier, say `rest-sink-logger`,
that implements interface `HttpPostRequestCallbackFactory<HttpRequest>` to create a new instance of a custom callback
`CustomHttpSinkPostRequestCallback`. This factory can be registered along other factories by appending the fully-qualified name
of class `CustomHttpSinkPostRequestCallbackFactory` in `resources/META-INF/services/org.apache.flink.table.factories.Factory` file
and then reference identifier `rest-sink-logger` in the HttpSink DDL property field `gid.connector.http.sink.request-callback`.

A default implementation that logs those pairs as *INFO* level logs using Slf4j
([Slf4jHttpPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/sink/Slf4jHttpPostRequestCallback.java))
is provided.


- Http Lookup Source processes responses that it gets from the HTTP endpoint along their respective requests. One can customize the
behaviour of the additional stage of processing done by Table Function API by implementing
[HttpPostRequestCallback](src/main/java/com/getindata/connectors/http/HttpPostRequestCallback.java) and
[HttpPostRequestCallbackFactory](src/main/java/com/getindata/connectors/http/HttpPostRequestCallbackFactory.java)
interfaces.

For example, one can create a class `CustomHttpLookupPostRequestCallbackFactory` with a unique identifier, say `rest-lookup-logger`,
that implements interface `HttpPostRequestCallbackFactory<HttpLookupSourceRequestEntry>` to create a new instance of a custom callback
`CustomHttpLookupPostRequestCallback`. This factory can be registered along other factories by appending the fully-qualified name
of class `CustomHttpLookupPostRequestCallbackFactory` in `resources/META-INF/services/org.apache.flink.table.factories.Factory` file
and then reference identifier `rest-lookup-logger` in the HTTP lookup DDL property field `gid.connector.http.source.lookup.request-callback`.

A default implementation that logs those pairs as *INFO* level logs using Slf4j
([Slf4JHttpLookupPostRequestCallback](src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java))
is provided.


## HTTP status code handler
Http Sink and Lookup Source connectors allow defining list of HTTP status codes that should be treated as errors.
By default all 400s and 500s response codes will be interpreted as error code.
Expand Down Expand Up @@ -409,6 +434,7 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. |

### HTTP Sink
| Option | Required | Description/Value |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,27 @@

import org.apache.flink.table.factories.Factory;

import com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSource;
import com.getindata.connectors.http.internal.table.sink.HttpDynamicSink;

/**
* The {@link Factory} that dynamically creates and injects {@link HttpPostRequestCallback} to
* {@link HttpDynamicSink}.
* {@link HttpDynamicSink} and {@link HttpLookupTableSource}.
*
* <p>Custom implementations of {@link HttpPostRequestCallbackFactory} can be registered along
* other factories in
* <pre>resources/META-INF.services/org.apache.flink.table.factories.Factory</pre>
* file and then referenced by their identifiers in the HttpSink DDL property field
* <i>gid.connector.http.sink.request-callback</i>.
* <pre>resources/META-INF/services/org.apache.flink.table.factories.Factory</pre>
* file and then referenced by their identifiers in:
* <li>
* The HttpSink DDL property field <i>gid.connector.http.sink.request-callback</i>
* for HTTP sink.
* </li>
* <li>
* The Http lookup DDL property field <i>gid.connector.http.source.lookup.request-callback</i>
* for HTTP lookup.
* </li>
*
* <br />
*
* <p>The following example shows the minimum Table API example to create a {@link HttpDynamicSink}
* that uses a custom callback created by a factory that returns <i>my-callback</i> as its
Expand All @@ -30,8 +40,24 @@
* )
* }</pre>
*
* <p>The following example shows the minimum Table API example to create a
* {@link HttpLookupTableSource} that uses a custom callback created by a factory that
* returns <i>my-callback</i> as its identifier.
*
* <pre>{@code
* CREATE TABLE httplookup (
* id bigint
* ) with (
* 'connector' = 'rest-lookup',
* 'url' = 'http://example.com/myendpoint',
* 'format' = 'json',
* 'gid.connector.http.source.lookup.request-callback' = 'my-callback'
* )
* }</pre>
*
* @param <RequestT> type of the HTTP request wrapper
*/

public interface HttpPostRequestCallbackFactory<RequestT> extends Factory {
/**
* @return {@link HttpPostRequestCallback} custom request callback instance
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ public final class HttpConnectorConfigConstants {
GID_CONNECTOR_HTTP + "source.lookup.error.code";
// -----------------------------------------------------

public static final String SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER =
GID_CONNECTOR_HTTP + "source.lookup.request-callback";

public static final String SINK_REQUEST_CALLBACK_IDENTIFIER =
GID_CONNECTOR_HTTP + "sink.request-callback";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;

import com.getindata.connectors.http.HttpPostRequestCallback;

@Builder
@Data
@RequiredArgsConstructor
Expand All @@ -26,4 +28,6 @@ public class HttpLookupConfig implements Serializable {

@Builder.Default
private final ReadableConfig readableConfig = new Configuration();

private final HttpPostRequestCallback<HttpLookupSourceRequestEntry> httpPostRequestCallback;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER;
import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER;

public class HttpLookupConnectorOptions {

Expand Down Expand Up @@ -47,4 +48,9 @@ public class HttpLookupConnectorOptions {
.booleanType()
.defaultValue(false)
.withDescription("Whether to use the raw value of Authorization header");

public static final ConfigOption<String> REQUEST_CALLBACK_IDENTIFIER =
ConfigOptions.key(SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER)
.stringType()
.defaultValue(Slf4jHttpLookupPostRequestCallbackFactory.IDENTIFIER);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static org.apache.flink.table.api.DataTypes.FIELD;
import static org.apache.flink.table.types.utils.DataTypeUtils.removeTimeAttribute;

import com.getindata.connectors.http.HttpPostRequestCallbackFactory;
import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants;
import com.getindata.connectors.http.internal.utils.ConfigUtils;
import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*;
Expand Down Expand Up @@ -88,20 +89,29 @@ public Set<ConfigOption<?>> requiredOptions() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD);
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD, REQUEST_CALLBACK_IDENTIFIER);
}

private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {

Properties httpConnectorProperties =
ConfigUtils.getHttpConnectorProperties(context.getCatalogTable().getOptions());

final HttpPostRequestCallbackFactory<HttpLookupSourceRequestEntry>
postRequestCallbackFactory =
FactoryUtil.discoverFactory(
context.getClassLoader(),
HttpPostRequestCallbackFactory.class,
readableConfig.get(REQUEST_CALLBACK_IDENTIFIER)
);

return HttpLookupConfig.builder()
.lookupMethod(readableConfig.get(LOOKUP_METHOD))
.url(readableConfig.get(URL))
.useAsync(readableConfig.get(ASYNC_POLLING))
.properties(httpConnectorProperties)
.readableConfig(readableConfig)
.httpPostRequestCallback(postRequestCallbackFactory.createHttpPostRequestCallback())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public JavaNetHttpPollingClient(
this.responseBodyDecoder = responseBodyDecoder;
this.requestFactory = requestFactory;

// TODO inject same way as it is done for Sink
this.httpPostRequestCallback = new Slf4JHttpLookupPostRequestCallback();
this.httpPostRequestCallback = options.getHttpPostRequestCallback();

// TODO Inject this via constructor when implementing a response processor.
// Processor will be injected and it will wrap statusChecker implementation.
Expand Down Expand Up @@ -92,22 +91,21 @@ private Optional<RowData> processHttpResponse(
this.httpPostRequestCallback.call(response, request, "endpoint", Collections.emptyMap());

if (response == null) {
log.warn("Null Http response for request " + request.getHttpRequest().uri().toString());
return Optional.empty();
}

String responseBody = response.body();
int statusCode = response.statusCode();

log.debug("Received {} status code for RestTableSource Request", statusCode);
log.debug("Received status code [%s] for RestTableSource request " +
"with Server response body [%s] ", statusCode, responseBody);

if (notErrorCodeAndNotEmptyBody(responseBody, statusCode)) {
log.trace("Server response body" + responseBody);
return Optional.ofNullable(responseBodyDecoder.deserialize(responseBody.getBytes()));
} else {
log.warn(
String.format("Returned Http status code was invalid or returned body was empty. "
+ "Status Code [%s], "
+ "response body [%s]", statusCode, responseBody)
+ "Status Code [%s]", statusCode)
);

return Optional.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public void call(
}

if (response == null) {
log.warn("Null Http response for request " + httpRequest.uri().toString());

log.info(
"Got response for a request.\n Request:\n URL: {}\n " +
"Method: {}\n Headers: {}\n Params/Body: {}\nResponse: null",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.util.HashSet;
import java.util.Set;

import org.apache.flink.configuration.ConfigOption;

import com.getindata.connectors.http.HttpPostRequestCallback;
import com.getindata.connectors.http.HttpPostRequestCallbackFactory;

/**
* Factory for creating {@link Slf4JHttpLookupPostRequestCallback}.
*/
public class Slf4jHttpLookupPostRequestCallbackFactory
implements HttpPostRequestCallbackFactory<HttpLookupSourceRequestEntry> {

public static final String IDENTIFIER = "slf4j-lookup-logger";

@Override
public HttpPostRequestCallback<HttpLookupSourceRequestEntry> createHttpPostRequestCallback() {
return new Slf4JHttpLookupPostRequestCallback();
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
return new HashSet<>();
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return new HashSet<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory
com.getindata.connectors.http.internal.table.lookup.querycreators.ElasticSearchLiteQueryCreatorFactory
com.getindata.connectors.http.internal.table.lookup.querycreators.GenericGetQueryCreatorFactory
com.getindata.connectors.http.internal.table.lookup.querycreators.GenericJsonQueryCreatorFactory
com.getindata.connectors.http.internal.table.lookup.Slf4jHttpLookupPostRequestCallbackFactory
com.getindata.connectors.http.internal.table.sink.HttpDynamicTableSinkFactory
com.getindata.connectors.http.internal.table.sink.Slf4jHttpPostRequestCallbackFactory
Loading

0 comments on commit d75e18d

Please sign in to comment.