Skip to content

Commit

Permalink
HTTP63 HTTP63 caching for synchronous lookups
Browse files Browse the repository at this point in the history
Signed-off-by: David Radley <[email protected]>
  • Loading branch information
davidradl committed Jun 12, 2024
1 parent c1119da commit 5506cb3
Show file tree
Hide file tree
Showing 9 changed files with 355 additions and 63 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
[Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java)
is used instead.

- Added support for caching of synchronous lookup joins.

## [0.13.0] - 2024-04-03

### Added
Expand Down
61 changes: 42 additions & 19 deletions README.md

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ under the License.

<!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
section, omitting the patch part (so for 1.15.0 use 1.15). -->

<flink.version>1.16.3</flink.version>

<target.java.version>11</target.java.version>
Expand Down Expand Up @@ -296,7 +297,8 @@ under the License.
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>

<!-- argLine needed for Flink 1.16 and 1.17 or there are unit test errors-->
<argLine>--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED</argLine>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;

import com.getindata.connectors.http.internal.PollingClient;
import com.getindata.connectors.http.internal.PollingClientFactory;
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

@Slf4j
public class CachingHttpTableLookupFunction extends LookupFunction {
private final PollingClientFactory<RowData> pollingClientFactory;

private final DeserializationSchema<RowData> responseSchemaDecoder;

@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
private final LookupRow lookupRow;

@VisibleForTesting
@Getter(AccessLevel.PACKAGE)
private final HttpLookupConfig options;

private transient AtomicInteger localHttpCallCounter;

private transient PollingClient<RowData> client;

private LookupCache cache;

public CachingHttpTableLookupFunction(
PollingClientFactory<RowData> pollingClientFactory,
DeserializationSchema<RowData> responseSchemaDecoder,
LookupRow lookupRow,
HttpLookupConfig options,
LookupCache cache) {

this.pollingClientFactory = pollingClientFactory;
this.responseSchemaDecoder = responseSchemaDecoder;
this.lookupRow = lookupRow;
this.options = options;
this.cache = cache;
}

@Override
public void open(FunctionContext context) throws Exception {
super.open(context);

this.responseSchemaDecoder.open(
SerializationSchemaUtils
.createDeserializationInitContext(CachingHttpTableLookupFunction.class));

this.localHttpCallCounter = new AtomicInteger(0);
this.client = pollingClientFactory
.createPollClient(options, responseSchemaDecoder);

context
.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}

/**
* This is a lookup method which is called by Flink framework at runtime.
*/
@Override
public Collection<RowData> lookup(RowData keyRow) throws IOException {
log.debug("lookup=" + lookupRow);
localHttpCallCounter.incrementAndGet();
Optional<RowData> rowData= client.pull(keyRow);
List<RowData> result = new ArrayList<>();
rowData.ifPresent(row -> { result.add(row); });
log.debug("lookup result=" + result);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import org.apache.flink.table.connector.source.AsyncTableFunctionProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider;
import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.FactoryUtil;
Expand Down Expand Up @@ -46,17 +48,20 @@ public class HttpLookupTableSource
private final DynamicTableFactory.Context dynamicTableFactoryContext;

private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
private final LookupCache cache;

public HttpLookupTableSource(
DataType physicalRowDataType,
HttpLookupConfig lookupConfig,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
DynamicTableFactory.Context dynamicTablecontext) {
DynamicTableFactory.Context dynamicTablecontext,
LookupCache cache) {

this.physicalRowDataType = physicalRowDataType;
this.lookupConfig = lookupConfig;
this.decodingFormat = decodingFormat;
this.dynamicTableFactoryContext = dynamicTablecontext;
this.cache = cache;
}

@Override
Expand All @@ -66,6 +71,7 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType)

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
log.debug("getLookupRuntimeProvider Entry");

LookupRow lookupRow = extractLookupRow(lookupContext.getKeys());

Expand Down Expand Up @@ -94,21 +100,41 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex
PollingClientFactory<RowData> pollingClientFactory =
createPollingClientFactory(lookupQueryCreator, lookupConfig);

HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
// In line with the JDBC implementation and current requirements, we are only
// supporting Partial Caching for synchronous operations.
return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory);
}

protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow,
DeserializationSchema<RowData> responseSchemaDecoder,
PollingClientFactory<RowData> pollingClientFactory) {
if (lookupConfig.isUseAsync()) {
HttpTableLookupFunction dataLookupFunction =
new HttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig
);
log.info("Using Async version of HttpLookupTable.");
return AsyncTableFunctionProvider.of(
new AsyncHttpTableLookupFunction(dataLookupFunction));
} else {
log.info("Using blocking version of HttpLookupTable.");
return TableFunctionProvider.of(dataLookupFunction);
CachingHttpTableLookupFunction dataLookupFunction =
new CachingHttpTableLookupFunction(
pollingClientFactory,
responseSchemaDecoder,
lookupRow,
lookupConfig,
cache
);
if (cache != null) {
log.debug("PartialCachingLookupProvider; cache = " + cache);
return PartialCachingLookupProvider.of(dataLookupFunction, cache);
} else {
log.debug("Using LookupFunctionProvider.");
return LookupFunctionProvider.of(dataLookupFunction);
}
}
}

Expand All @@ -118,7 +144,8 @@ public DynamicTableSource copy() {
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableFactoryContext
dynamicTableFactoryContext,
cache
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
Expand All @@ -15,6 +16,9 @@
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
Expand Down Expand Up @@ -48,7 +52,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, dynamicTableContext);

ReadableConfig readableConfig = helper.getOptions();
ReadableConfig readable = helper.getOptions();
helper.validateExcept(
// properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions
"table.",
Expand All @@ -62,7 +66,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
FactoryUtil.FORMAT
);

HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig);
HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable);

ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema();

Expand All @@ -73,7 +77,8 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext)
physicalRowDataType,
lookupConfig,
decodingFormat,
dynamicTableContext
dynamicTableContext,
getLookupCache(readable)
);
}

Expand All @@ -89,7 +94,18 @@ public Set<ConfigOption<?>> requiredOptions() {

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD, REQUEST_CALLBACK_IDENTIFIER);

return Set.of(
URL_ARGS,
ASYNC_POLLING,
LOOKUP_METHOD,
REQUEST_CALLBACK_IDENTIFIER,
LookupOptions.CACHE_TYPE,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS,
LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE,
LookupOptions.PARTIAL_CACHE_MAX_ROWS,
LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY,
LookupOptions.MAX_RETRIES);
}

private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) {
Expand All @@ -115,6 +131,18 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re
.build();
}

@Nullable
private LookupCache getLookupCache(ReadableConfig tableOptions) {
LookupCache cache = null;
// Do not support legacy cache options
if (tableOptions
.get(LookupOptions.CACHE_TYPE)
.equals(LookupOptions.LookupCacheType.PARTIAL)) {
cache = DefaultLookupCache.fromConfig(tableOptions);
}
return cache;
}

// TODO verify this since we are on 1.15 now.
// Backport from Flink 1.15-Master
private DataType toRowDataType(List<Column> columns, Predicate<Column> columnPredicate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,23 @@ public void open(FunctionContext context) throws Exception {
super.open(context);

this.responseSchemaDecoder.open(
SerializationSchemaUtils
.createDeserializationInitContext(HttpTableLookupFunction.class));
SerializationSchemaUtils
.createDeserializationInitContext(HttpTableLookupFunction.class));

this.localHttpCallCounter = new AtomicInteger(0);
this.client = pollingClientFactory
.createPollClient(options, responseSchemaDecoder);
.createPollClient(options, responseSchemaDecoder);

context
.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
context.getMetricGroup()
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue());
}

/**
* This is a lookup method which is called by Flink framework in a runtime.
*/
public void eval(Object... keys) {
lookupByKeys(keys)
.ifPresent(this::collect);
.ifPresent(this::collect);
}

public Optional<RowData> lookupByKeys(Object[] keys) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public JavaNetHttpPollingClient(
@Override
public Optional<RowData> pull(RowData lookupRow) {
try {
log.debug("Optional<RowData> pull with Rowdata=" + lookupRow);
return queryAndProcess(lookupRow);
} catch (Exception e) {
log.error("Exception during HTTP request.", e);
Expand Down
Loading

0 comments on commit 5506cb3

Please sign in to comment.