diff --git a/CHANGELOG.md b/CHANGELOG.md index 9a514f3a..21b8211d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,8 @@ [Slf4JHttpLookupPostRequestCallback](https://github.com/getindata/flink-http-connector/blob/main/src/main/java/com/getindata/connectors/http/internal/table/lookup/Slf4JHttpLookupPostRequestCallback.java) is used instead. +- Added support for caching of synchronous lookup joins. + ## [0.13.0] - 2024-04-03 ### Added diff --git a/README.md b/README.md index 2fb28f5f..016de876 100644 --- a/README.md +++ b/README.md @@ -417,24 +417,31 @@ is set to `'true'`, it will be used as header value as is, without any extra mod ## Table API Connector Options ### HTTP TableLookup Source -| Option | Required | Description/Value | -|---------------------------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| connector | required | The Value should be set to _rest-lookup_ | -| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. | -| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ | -| asyncPolling | optional | true/false - determines whether Async Pooling should be used. Mechanism is based on Flink's Async I/O. | -| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. | -| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. 
| -| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. | -| gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be add to connectors key store. More than one path can be specified using `,` as path delimiter. | -| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | -| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | -| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. | -| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | -| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | -| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | -| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. 
| -| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. | +| Option | Required | Description/Value | +|---------------------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| connector | required | The Value should be set to _rest-lookup_ | +| format | required | Flink's format name that should be used to decode REST response, Use `json` for a typical REST endpoint. | +| url | required | The base URL that should be use for GET requests. For example _http://localhost:8080/client_ | +| asyncPolling | optional | true/false - determines whether Async Polling should be used. Mechanism is based on Flink's Async I/O. | +| lookup-method | optional | GET/POST/PUT (and any other) - determines what REST method should be used for lookup REST query. If not specified, `GET` method will be used. | +| lookup.cache | optional | Enum possible values: `NONE`, `PARTIAL`. The cache strategy for the lookup table. Currently supports `NONE` (no caching) and `PARTIAL` (caching entries on lookup operation in external API). | +| lookup.partial-cache.max-rows | optional | The max number of rows of lookup cache, over this value, the oldest rows will be expired. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | +| lookup.partial-cache.expire-after-write | optional | The max time to live for each rows in lookup cache after writing into the cache. 
Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | +| lookup.partial-cache.expire-after-access | optional | The maximum time to live for each row in the lookup cache after the entry is accessed. Specify as a [Duration](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#duration). `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | +| lookup.partial-cache.cache-missing-key | optional | Boolean, defaults to `true`. Determines whether an empty value is stored in the cache when the lookup key doesn't match any rows in the table. `lookup.cache` must be set to `PARTIAL` to use this option. See the following Lookup Cache section for more details. | +| lookup.max-retries | optional | The maximum number of retries if the lookup fails; the default is 3. See the following Lookup Cache section for more details. | +| gid.connector.http.lookup.error.code | optional | List of HTTP status codes that should be treated as errors by HTTP Source, separated with comma. | +| gid.connector.http.lookup.error.code.exclude | optional | List of HTTP status codes that should be excluded from the `gid.connector.http.lookup.error.code` list, separated with comma. | +| gid.connector.http.security.cert.server | optional | Path to trusted HTTP server certificate that should be added to the connector's key store. More than one path can be specified using `,` as path delimiter. | +| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | +| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. 
| +| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. | +| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | +| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | +| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | +| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. | +| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. | + ### HTTP Sink | Option | Required | Description/Value | @@ -460,6 +467,22 @@ is set to `'true'`, it will be used as header value as is, without any extra mod | gid.connector.http.sink.writer.request.mode | optional | Sets Http Sink request submission mode. Two modes are available to select, `single` and `batch` which is the default mode if option is not specified. | | gid.connector.http.sink.request.batch.size | optional | Applicable only for `gid.connector.http.sink.writer.request.mode = batch`. 
Sets number of individual events/requests that will be submitted as one HTTP request by HTTP sink. The default value is 500 which is same as HTTP Sink `maxBatchSize` | + +## Lookup Cache +The HTTP Client connector can be used in a temporal join as a lookup source (also known as a dimension table). + +By default, the lookup cache is not enabled. You can enable it by setting `lookup.cache` to `PARTIAL`. Caching is only supported when `asyncPolling` is `false`. +The scope of the cache is per job, so long-running jobs can benefit from this caching. + +The lookup cache is used to improve the performance of temporal joins. When the lookup cache is disabled (the default), every API request is sent over the network. When the lookup cache is enabled, Flink looks in the cache first and only sends requests +over the network when there is no cached value; the cache is then updated with the returned rows. The oldest rows in this cache are expired when the cache reaches the maximum number of cached rows set by `lookup.partial-cache.max-rows`, or when a row exceeds the maximum time to live specified by `lookup.partial-cache.expire-after-write` or `lookup.partial-cache.expire-after-access`. +The cached rows might not be the latest. Users can set the expiration options to smaller values to get fresher data, but this may increase the number of API requests sent, so this is a trade-off between throughput and correctness. +A good use case for enabling this cache is when the API responses change very slowly, for example master or reference data. +There are many cases where caching is not appropriate, for example calling an API to get the latest stock price. + +By default, Flink caches the empty query result for a lookup key. You can disable this behaviour by setting `lookup.partial-cache.cache-missing-key` to `false`. + + ## Build and deployment To build the project locally you need to have `maven 3` and Java 11+.
@@ -545,7 +568,7 @@ The mapping from Http Json Response to SQL table schema is done via Flink's Json ## TODO ### HTTP TableLookup Source -- Implement caches. +- Implement caches for async. - Think about Retry Policy for Http Request - Check other `//TODO`'s. diff --git a/pom.xml b/pom.xml index 4170de3e..6e1b6441 100644 --- a/pom.xml +++ b/pom.xml @@ -68,6 +68,7 @@ under the License. + 1.16.3 11 @@ -296,7 +297,8 @@ under the License. maven-surefire-plugin 3.0.0-M5 - + + --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java new file mode 100644 index 00000000..1f1462be --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/CachingHttpTableLookupFunction.java @@ -0,0 +1,88 @@ +package com.getindata.connectors.http.internal.table.lookup; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import lombok.AccessLevel; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.functions.FunctionContext; +import org.apache.flink.table.functions.LookupFunction; + +import com.getindata.connectors.http.internal.PollingClient; +import com.getindata.connectors.http.internal.PollingClientFactory; +import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils; + +@Slf4j +public class CachingHttpTableLookupFunction extends LookupFunction { + private final 
PollingClientFactory pollingClientFactory; + + private final DeserializationSchema responseSchemaDecoder; + + @VisibleForTesting + @Getter(AccessLevel.PACKAGE) + private final LookupRow lookupRow; + + @VisibleForTesting + @Getter(AccessLevel.PACKAGE) + private final HttpLookupConfig options; + + private transient AtomicInteger localHttpCallCounter; + + private transient PollingClient client; + + private LookupCache cache; + + public CachingHttpTableLookupFunction( + PollingClientFactory pollingClientFactory, + DeserializationSchema responseSchemaDecoder, + LookupRow lookupRow, + HttpLookupConfig options, + LookupCache cache) { + + this.pollingClientFactory = pollingClientFactory; + this.responseSchemaDecoder = responseSchemaDecoder; + this.lookupRow = lookupRow; + this.options = options; + this.cache = cache; + } + + @Override + public void open(FunctionContext context) throws Exception { + super.open(context); + + this.responseSchemaDecoder.open( + SerializationSchemaUtils + .createDeserializationInitContext(CachingHttpTableLookupFunction.class)); + + this.localHttpCallCounter = new AtomicInteger(0); + this.client = pollingClientFactory + .createPollClient(options, responseSchemaDecoder); + + context + .getMetricGroup() + .gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue()); + } + + /** + * This is a lookup method which is called by Flink framework at runtime. 
+ */ + @Override + public Collection lookup(RowData keyRow) throws IOException { + log.debug("lookup=" + lookupRow); + localHttpCallCounter.incrementAndGet(); + Optional rowData= client.pull(keyRow); + List result = new ArrayList<>(); + rowData.ifPresent(row -> { result.add(row); }); + log.debug("lookup result=" + result); + return result; + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java index 5e688ed5..67bcc1aa 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSource.java @@ -13,9 +13,11 @@ import org.apache.flink.table.connector.source.AsyncTableFunctionProvider; import org.apache.flink.table.connector.source.DynamicTableSource; import org.apache.flink.table.connector.source.LookupTableSource; -import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.connector.source.abilities.SupportsLimitPushDown; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; +import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider; +import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.DynamicTableFactory; import org.apache.flink.table.factories.FactoryUtil; @@ -46,17 +48,20 @@ public class HttpLookupTableSource private final DynamicTableFactory.Context dynamicTableFactoryContext; private final DecodingFormat> decodingFormat; + private final LookupCache cache; public HttpLookupTableSource( DataType physicalRowDataType, HttpLookupConfig lookupConfig, DecodingFormat> decodingFormat, - DynamicTableFactory.Context 
dynamicTablecontext) { + DynamicTableFactory.Context dynamicTablecontext, + LookupCache cache) { this.physicalRowDataType = physicalRowDataType; this.lookupConfig = lookupConfig; this.decodingFormat = decodingFormat; this.dynamicTableFactoryContext = dynamicTablecontext; + this.cache = cache; } @Override @@ -66,6 +71,7 @@ public void applyProjection(int[][] projectedFields, DataType producedDataType) @Override public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) { + log.debug("getLookupRuntimeProvider Entry"); LookupRow lookupRow = extractLookupRow(lookupContext.getKeys()); @@ -94,21 +100,41 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContex PollingClientFactory pollingClientFactory = createPollingClientFactory(lookupQueryCreator, lookupConfig); - HttpTableLookupFunction dataLookupFunction = - new HttpTableLookupFunction( - pollingClientFactory, - responseSchemaDecoder, - lookupRow, - lookupConfig - ); + // In line with the JDBC implementation and current requirements, we are only + // supporting Partial Caching for synchronous operations. 
+ return getLookupRuntimeProvider(lookupRow, responseSchemaDecoder, pollingClientFactory); + } + protected LookupRuntimeProvider getLookupRuntimeProvider(LookupRow lookupRow, + DeserializationSchema responseSchemaDecoder, + PollingClientFactory pollingClientFactory) { if (lookupConfig.isUseAsync()) { + HttpTableLookupFunction dataLookupFunction = + new HttpTableLookupFunction( + pollingClientFactory, + responseSchemaDecoder, + lookupRow, + lookupConfig + ); log.info("Using Async version of HttpLookupTable."); return AsyncTableFunctionProvider.of( new AsyncHttpTableLookupFunction(dataLookupFunction)); } else { - log.info("Using blocking version of HttpLookupTable."); - return TableFunctionProvider.of(dataLookupFunction); + CachingHttpTableLookupFunction dataLookupFunction = + new CachingHttpTableLookupFunction( + pollingClientFactory, + responseSchemaDecoder, + lookupRow, + lookupConfig, + cache + ); + if (cache != null) { + log.debug("PartialCachingLookupProvider; cache = " + cache); + return PartialCachingLookupProvider.of(dataLookupFunction, cache); + } else { + log.debug("Using LookupFunctionProvider."); + return LookupFunctionProvider.of(dataLookupFunction); + } } } @@ -118,7 +144,8 @@ public DynamicTableSource copy() { physicalRowDataType, lookupConfig, decodingFormat, - dynamicTableFactoryContext + dynamicTableFactoryContext, + cache ); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java index 6822e91b..c9f8f8c2 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java @@ -5,6 +5,7 @@ import java.util.Set; import java.util.function.Predicate; import java.util.stream.Collectors; +import javax.annotation.Nullable; import 
org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.ConfigOption; @@ -15,6 +16,9 @@ import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.connector.format.DecodingFormat; import org.apache.flink.table.connector.source.DynamicTableSource; +import org.apache.flink.table.connector.source.lookup.LookupOptions; +import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; import org.apache.flink.table.data.RowData; import org.apache.flink.table.factories.DeserializationFormatFactory; import org.apache.flink.table.factories.DynamicTableSourceFactory; @@ -48,7 +52,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, dynamicTableContext); - ReadableConfig readableConfig = helper.getOptions(); + ReadableConfig readable = helper.getOptions(); helper.validateExcept( // properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions "table.", @@ -62,7 +66,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) FactoryUtil.FORMAT ); - HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig); + HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable); ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema(); @@ -73,7 +77,8 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) physicalRowDataType, lookupConfig, decodingFormat, - dynamicTableContext + dynamicTableContext, + getLookupCache(readable) ); } @@ -89,7 +94,18 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { - return Set.of(URL_ARGS, ASYNC_POLLING, LOOKUP_METHOD, REQUEST_CALLBACK_IDENTIFIER); + + return Set.of( + URL_ARGS, + ASYNC_POLLING, + 
LOOKUP_METHOD, + REQUEST_CALLBACK_IDENTIFIER, + LookupOptions.CACHE_TYPE, + LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, + LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, + LookupOptions.PARTIAL_CACHE_MAX_ROWS, + LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY, + LookupOptions.MAX_RETRIES); } private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) { @@ -115,6 +131,18 @@ private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig re .build(); } + @Nullable + private LookupCache getLookupCache(ReadableConfig tableOptions) { + LookupCache cache = null; + // Do not support legacy cache options + if (tableOptions + .get(LookupOptions.CACHE_TYPE) + .equals(LookupOptions.LookupCacheType.PARTIAL)) { + cache = DefaultLookupCache.fromConfig(tableOptions); + } + return cache; + } + // TODO verify this since we are on 1.15 now. // Backport from Flink 1.15-Master private DataType toRowDataType(List columns, Predicate columnPredicate) { diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java index 655669cd..562b8e75 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpTableLookupFunction.java @@ -53,16 +53,15 @@ public void open(FunctionContext context) throws Exception { super.open(context); this.responseSchemaDecoder.open( - SerializationSchemaUtils - .createDeserializationInitContext(HttpTableLookupFunction.class)); + SerializationSchemaUtils + .createDeserializationInitContext(HttpTableLookupFunction.class)); this.localHttpCallCounter = new AtomicInteger(0); this.client = pollingClientFactory - .createPollClient(options, responseSchemaDecoder); + .createPollClient(options, responseSchemaDecoder); - context - .getMetricGroup() - 
.gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue()); + context.getMetricGroup() + .gauge("http-table-lookup-call-counter", () -> localHttpCallCounter.intValue()); } /** @@ -70,7 +69,7 @@ public void open(FunctionContext context) throws Exception { */ public void eval(Object... keys) { lookupByKeys(keys) - .ifPresent(this::collect); + .ifPresent(this::collect); } public Optional lookupByKeys(Object[] keys) { diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index 44202341..0b0f3040 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -66,6 +66,7 @@ public JavaNetHttpPollingClient( @Override public Optional pull(RowData lookupRow) { try { + log.debug("Optional pull with Rowdata=" + lookupRow); return queryAndProcess(lookupRow); } catch (Exception e) { log.error("Exception during HTTP request.", e); diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java index 2861491f..8de98b25 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceTest.java @@ -1,28 +1,35 @@ package com.getindata.connectors.http.internal.table.lookup; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import org.apache.flink.configuration.ConfigOptions; +import org.apache.flink.metrics.groups.CacheMetricGroup; import org.apache.flink.table.api.DataTypes; import 
org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; import org.apache.flink.table.catalog.UniqueConstraint; import org.apache.flink.table.connector.source.AsyncTableFunctionProvider; -import org.apache.flink.table.connector.source.TableFunctionProvider; +import org.apache.flink.table.connector.source.LookupTableSource; +import org.apache.flink.table.connector.source.lookup.LookupFunctionProvider; +import org.apache.flink.table.connector.source.lookup.PartialCachingLookupProvider; +import org.apache.flink.table.connector.source.lookup.cache.LookupCache; import org.apache.flink.table.data.RowData; import org.apache.flink.table.runtime.connector.source.LookupRuntimeProviderContext; import org.apache.flink.table.types.DataType; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; import static com.getindata.connectors.http.internal.table.lookup.HttpLookupTableSourceFactory.row; + class HttpLookupTableSourceTest { public static final DataType PHYSICAL_ROW_DATA_TYPE = @@ -68,12 +75,10 @@ void shouldCreateTableSourceWithParams() { HttpLookupTableSource tableSource = (HttpLookupTableSource) createTableSource(SCHEMA, getOptions()); - TableFunctionProvider lookupProvider = - (TableFunctionProvider) + LookupTableSource.LookupRuntimeProvider lookupProvider = tableSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey)); - - HttpTableLookupFunction tableFunction = - (HttpTableLookupFunction) lookupProvider.createTableFunction(); + 
CachingHttpTableLookupFunction tableFunction = (CachingHttpTableLookupFunction) + ((LookupFunctionProvider) lookupProvider).createLookupFunction(); LookupRow actualLookupRow = tableFunction.getLookupRow(); assertThat(actualLookupRow).isNotNull(); @@ -98,32 +103,149 @@ void shouldCreateAsyncTableSourceWithParams() { Map options = getOptionsWithAsync(); HttpLookupTableSource tableSource = - (HttpLookupTableSource) createTableSource(SCHEMA, options); + (HttpLookupTableSource) createTableSource(SCHEMA, options); AsyncTableFunctionProvider lookupProvider = - (AsyncTableFunctionProvider) - tableSource.getLookupRuntimeProvider(new LookupRuntimeProviderContext(lookupKey)); + (AsyncTableFunctionProvider) + tableSource.getLookupRuntimeProvider( + new LookupRuntimeProviderContext(lookupKey)); AsyncHttpTableLookupFunction tableFunction = - (AsyncHttpTableLookupFunction) lookupProvider.createAsyncTableFunction(); + (AsyncHttpTableLookupFunction) lookupProvider.createAsyncTableFunction(); LookupRow actualLookupRow = tableFunction.getLookupRow(); assertThat(actualLookupRow).isNotNull(); assertThat(actualLookupRow.getLookupEntries()).isNotEmpty(); assertThat(actualLookupRow.getLookupPhysicalRowDataType()) - .isEqualTo(PHYSICAL_ROW_DATA_TYPE); + .isEqualTo(PHYSICAL_ROW_DATA_TYPE); HttpLookupConfig actualLookupConfig = tableFunction.getOptions(); assertThat(actualLookupConfig).isNotNull(); assertThat(actualLookupConfig.isUseAsync()).isTrue(); assertThat( - actualLookupConfig.getReadableConfig().get(HttpLookupConnectorOptions.ASYNC_POLLING) + actualLookupConfig.getReadableConfig().get(HttpLookupConnectorOptions.ASYNC_POLLING) ) - .withFailMessage( - "Readable config probably was not passed from Table Factory or it is empty.") - .isTrue(); + .withFailMessage( + "Readable config probably was not passed" + + " from Table Factory or it is empty.") + .isTrue(); + } + @ParameterizedTest + @MethodSource("configProvider") + void testgetLookupRuntimeProvider(TestSpec testSpec) throws 
Exception { + LookupCache cache = new LookupCache() { + @Override + public void open(CacheMetricGroup cacheMetricGroup) { + + } + + @Nullable + @Override + public Collection getIfPresent(RowData rowData) { + return null; + } + + @Override + public Collection put(RowData rowData, Collection collection) { + return null; + } + + @Override + public void invalidate(RowData rowData) { + + } + + @Override + public long size() { + return 0; + } + + @Override + public void close() throws Exception { + + } + }; + + HttpLookupConfig options = HttpLookupConfig.builder() + .useAsync(testSpec.isAsync) + .build(); + LookupTableSource.LookupRuntimeProvider lookupRuntimeProvider = + getLookupRuntimeProvider(testSpec.hasCache?cache:null, options); + assertTrue(testSpec.expected.isInstance(lookupRuntimeProvider)); + + } + + private static class TestSpec { + + boolean hasCache; + boolean isAsync; + + Class expected; + + private TestSpec(boolean hasCache, + boolean isAsync, + Class expected) { + this.hasCache = hasCache; + this.isAsync = isAsync; + this.expected = expected; + } + + @Override + public String toString() { + return "TestSpec{" + + "hasCache=" + +hasCache + + ", isAsync=" + + isAsync + + ", expected=" + + expected + + '}'; + } } + static Collection configProvider() { + return ImmutableList.builder() + .addAll(getTestSpecs()) + .build(); + } + + @NotNull + private static ImmutableList getTestSpecs() { + return ImmutableList.of( + + new TestSpec( + false, + false, + LookupFunctionProvider.class + ), + new TestSpec( + true, + false, + PartialCachingLookupProvider.class + ), + new TestSpec( + false, + true, + AsyncTableFunctionProvider.class + ), + new TestSpec( + true, + true, + AsyncTableFunctionProvider.class + ) + ); + } + + private static LookupTableSource.LookupRuntimeProvider + getLookupRuntimeProvider(LookupCache cache, HttpLookupConfig options) { + HttpLookupTableSource tableSource = new HttpLookupTableSource( + null, options, + null, null, cache); + int[][] 
lookupKeys = {{1, 2}}; + LookupTableSource.LookupContext lookupContext = + new LookupRuntimeProviderContext(lookupKeys); + return tableSource.getLookupRuntimeProvider(null, null, null); + } private Map getOptionsWithAsync() { Map options = getOptions(); options = new HashMap<>(options);