From f5bef347538c8d1fe127201d62dbdb9a2db93b19 Mon Sep 17 00:00:00 2001 From: davidradl Date: Thu, 15 Aug 2024 15:17:55 +0100 Subject: [PATCH] http91 bearer token support Signed-off-by: davidradl --- CHANGELOG.md | 4 + README.md | 27 +- pom.xml | 8 + .../internal/auth/OidcAccessTokenManager.java | 148 +++++++++++ .../config/HttpConnectorConfigConstants.java | 8 + .../lookup/HttpLookupConnectorOptions.java | 27 +- .../lookup/HttpLookupTableSourceFactory.java | 45 +++- .../lookup/JavaNetHttpPollingClient.java | 4 + .../table/lookup/RequestFactoryBase.java | 44 +++- .../auth/OidcAccessTokenManagerTest.java | 248 ++++++++++++++++++ .../HttpLookupTableSourceFactoryTest.java | 49 +++- 11 files changed, 593 insertions(+), 19 deletions(-) create mode 100644 src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java create mode 100644 src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 2889b9d3..c827f6b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Added + +- Added support for OIDC Bearer tokens. + ## [0.15.0] - 2024-07-30 ### Added diff --git a/README.md b/README.md index 3b3ce57a..a1c7e34d 100644 --- a/README.md +++ b/README.md @@ -410,13 +410,31 @@ In this special case, you can configure connector to trust all certificates with To enable this option use `gid.connector.http.security.cert.server.allowSelfSigned` property setting its value to `true`. ## Basic Authentication -The connector supports Basic Authentication mechanism using HTTP `Authorization` header. +The connector supports Basic Authentication using a HTTP `Authorization` header. The header value can be set via properties, similarly as for other headers. The connector converts the passed value to Base64 and uses it for the request. If the used value starts with the prefix `Basic `, or `gid.connector.http.source.lookup.use-raw-authorization-header` is set to `'true'`, it will be used as header value as is, without any extra modification. +## OIDC Bearer Authentication +The connector supports Bearer Authentication using a HTTP `Authorization` header. The [OAuth 2.0 rcf](https://datatracker.ietf.org/doc/html/rfc6749) mentions [Obtaining Authorization](https://datatracker.ietf.org/doc/html/rfc6749#section-4) +and an authorization grant. OIDC makes use of this [authorisation grant](https://datatracker.ietf.org/doc/html/rfc6749#section-1.3) in a [Token Request](https://openid.net/specs/openid-connect-core-1_0.html#TokenRequest) by including a [OAuth grant type](https://oauth.net/2/grant-types/) and associated properties, the response is the [token response](https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse). + +If you want to use this authorization then you should supply the `Token Request` body in `application/x-www-form-urlencoded` encoding +in configuration property `gid.connector.http.security.oidc.token.request`. See [grant extension](https://datatracker.ietf.org/doc/html/rfc6749#section-4.5) for +an example of a customised grant type token request. The supplied `token request` will be issued to the +[token end point](https://datatracker.ietf.org/doc/html/rfc6749#section-3.2), whose url should be supplied in configuration property +`gid.connector.http.security.oidc.token.endpoint.url`. The returned `access token` is then cached and used for subsequent requests; if the token has expired then + a new one is requested. There is a property `gid.connector.http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new tokens will +be requested if the current time is later than the cached token expiry time minus `gid.connector.http.security.oidc.token.expiry.reduction`. + +### Restrictions at this time +* No authentication is applied to the token request. +* This authentication is only available for HTTP TableLookup Source. +* The processing does not use the refresh token if it present. + ## Table API Connector Options ### HTTP TableLookup Source + | Option | Required | Description/Value | |---------------------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | connector | required | The Value should be set to _rest-lookup_ | @@ -436,19 +454,22 @@ is set to `'true'`, it will be used as header value as is, without any extra mod | gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. | | gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. | +| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding | +| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued | +| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. | | gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. | | gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. | | gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. | | gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. | | gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. | - ### HTTP Sink + | Option | Required | Description/Value | |---------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. | -| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. | | format | required | Specify what format to use. | +| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. | | insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. | | sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. | | sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. | diff --git a/pom.xml b/pom.xml index c160e556..6a3a8bbe 100644 --- a/pom.xml +++ b/pom.xml @@ -146,6 +146,12 @@ under the License. provided + + com.fasterxml.jackson.core + jackson-annotations + 2.17.1 + + org.apache.flink flink-connector-base @@ -297,6 +303,8 @@ under the License. maven-surefire-plugin 3.0.0-M5 + + --add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED diff --git a/src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java b/src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java new file mode 100644 index 00000000..c0db0472 --- /dev/null +++ b/src/main/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManager.java @@ -0,0 +1,148 @@ + +/* + * Copyright 2020 Red Hat + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.getindata.connectors.http.internal.auth; + +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.time.Instant; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; + +/** + * This class is inspired by + * https://github.com/Apicurio/apicurio-common-rest-client/blob/ + * 944ac9eb527c291a6083bd10ee012388e1684d20/rest-client-common/src/main/java/io/ + * apicurio/rest/client/auth/OidcAuth.java. + * + * The OIDC access token manager encapsulates the caching of an OIDC access token, + * which can be short lived, for example an hour. The authenticate method will return an + * un-expired access token, either from the cache or by requesting a new access token. + */ +public class OidcAccessTokenManager { + + private static final Duration DEFAULT_TOKEN_EXPIRATION_REDUCTION = Duration.ofSeconds(1); + private final HttpClient httpClient; + private final String tokenRequest; + + private final String url; + private final Duration tokenExpirationReduction; + + private String cachedAccessToken; + private Instant cachedAccessTokenExp; + + /** + * Construct an Oidc access token manager with the default token expiration reduction + * @param httpClient httpClient to use to call the token endpoint. + * @param tokenRequest token request + * @param url token endpoint url + */ + public OidcAccessTokenManager(HttpClient httpClient, String tokenRequest, String url) { + this(httpClient, tokenRequest, url, DEFAULT_TOKEN_EXPIRATION_REDUCTION); + } + /** + * Construct an Oidc access token manager with the supplied token expiration reduction + * @param httpClient httpClient to use to call the token endpoint. + * @param tokenRequest token request this need to be form urlencoded + * @param url token endpoint url + * @param tokenExpirationReduction token expiry reduction, request a new token if the + * current time is later than the cached access token + * expiry time reduced by this value. This means that + * we will not use the cached token if it is about + * to expire. + */ + public OidcAccessTokenManager(HttpClient httpClient, String tokenRequest, String url, + Duration tokenExpirationReduction) { + this.tokenRequest = tokenRequest; + this.httpClient = httpClient; + this.url = url; + if (null == tokenExpirationReduction) { + this.tokenExpirationReduction = DEFAULT_TOKEN_EXPIRATION_REDUCTION; + } else { + this.tokenExpirationReduction = tokenExpirationReduction; + } + } + + /** + * Request an access token from the token endpoint + */ + private void requestAccessToken() { + try { + HttpRequest httpRequest = + HttpRequest.newBuilder() + .uri(URI.create(url)) + .header("Content-Type", "application/x-www-form-urlencoded") + .method("POST", HttpRequest.BodyPublishers.ofString(tokenRequest)) + .build(); + + HttpResponse response = httpClient.send(httpRequest, + HttpResponse.BodyHandlers.ofByteArray()); + //create ObjectMapper instance + ObjectMapper objectMapper = new ObjectMapper(); + if (200 == response.statusCode()) { + byte[] bytes = response.body(); + JsonNode rootNode = objectMapper.readTree(bytes); + JsonNode tokenNode = rootNode.path("access_token"); + JsonNode expiresInNode = rootNode.path("expires_in"); + this.cachedAccessToken = tokenNode.textValue(); + /* + expiresIn is in seconds + */ + Duration expiresIn = Duration.ofSeconds(expiresInNode.asInt()); + if (expiresIn.compareTo(this.tokenExpirationReduction) > 0) { + //expiresIn is greater than tokenExpirationReduction + expiresIn = expiresIn.minus(this.tokenExpirationReduction); + } + this.cachedAccessTokenExp = Instant.now().plus(expiresIn); + } else { + throw new IllegalStateException("Attempted to get an access token but got http" + + " status code " + response.statusCode()); + } + } catch (JsonProcessingException e) { + throw new IllegalStateException("Error found while trying to request a new token"); + } catch (IOException e) { + throw new IllegalStateException("IO Exception occurred", e); + } catch (InterruptedException e) { + throw new IllegalStateException("Interrupted Exception occurred", e); + } + } + + /** + * Get a valid unexpired access token. + * @return access token. + */ + public String authenticate() { + if (isAccessTokenRequired()) { + requestAccessToken(); + } + return cachedAccessToken; + } + + private boolean isAccessTokenRequired() { + return null == cachedAccessToken || isTokenExpired(); + } + + private boolean isTokenExpired() { + return Instant.now().isAfter(this.cachedAccessTokenExp); + } +} diff --git a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java index 5415a159..b501b29b 100644 --- a/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java +++ b/src/main/java/com/getindata/connectors/http/internal/config/HttpConnectorConfigConstants.java @@ -27,6 +27,14 @@ public final class HttpConnectorConfigConstants { public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP + "source.lookup.header."; + public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP + + "security.oidc.token.request"; + + public static final String OIDC_AUTH_TOKEN_ENDPOINT_URL = GID_CONNECTOR_HTTP + + "security.oidc.token.endpoint.url"; + + public static final String OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = GID_CONNECTOR_HTTP + + "security.oidc.token.expiry.reduction"; /** * Whether to use the raw value of the Authorization header. If set, it prevents * the special treatment of the header for Basic Authentication, thus preserving the passed diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java index 9a7b4320..9947d52d 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupConnectorOptions.java @@ -1,11 +1,11 @@ package com.getindata.connectors.http.internal.table.lookup; +import java.time.Duration; + import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ConfigOptions; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_QUERY_CREATOR_IDENTIFIER; -import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER; +import static com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants.*; public class HttpLookupConnectorOptions { @@ -53,4 +53,25 @@ public class HttpLookupConnectorOptions { ConfigOptions.key(SOURCE_LOOKUP_REQUEST_CALLBACK_IDENTIFIER) .stringType() .defaultValue(Slf4jHttpLookupPostRequestCallbackFactory.IDENTIFIER); + + public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL = + ConfigOptions.key(OIDC_AUTH_TOKEN_ENDPOINT_URL) + .stringType() + .noDefaultValue() + .withDescription("OIDC Token endpoint url."); + + public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST = + ConfigOptions.key(OIDC_AUTH_TOKEN_REQUEST) + .stringType() + .noDefaultValue() + .withDescription("OIDC token request."); + + public static final ConfigOption SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = + ConfigOptions.key(OIDC_AUTH_TOKEN_EXPIRY_REDUCTION) + .durationType() + .defaultValue(Duration.ofSeconds(1)) + .withDescription("OIDC authorization access token expiry" + + " reduction as a Duration." + + " A new access token is obtained if the token" + + " is older than it's expiry time minus this value."); } diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java index c9f8f8c2..0218b37c 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactory.java @@ -1,5 +1,6 @@ package com.getindata.connectors.http.internal.table.lookup; +import java.net.URLDecoder; import java.util.List; import java.util.Properties; import java.util.Set; @@ -10,6 +11,8 @@ import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.ReadableConfig; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.DataTypes.Field; import org.apache.flink.table.catalog.Column; @@ -52,13 +55,14 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, dynamicTableContext); - ReadableConfig readable = helper.getOptions(); + ReadableConfig readableConfig = helper.getOptions(); helper.validateExcept( // properties coming from org.apache.flink.table.api.config.ExecutionConfigOptions "table.", HttpConnectorConfigConstants.GID_CONNECTOR_HTTP, LOOKUP_REQUEST_FORMAT.key() ); + validateHttpSourceOptions(readableConfig); DecodingFormat> decodingFormat = helper.discoverDecodingFormat( @@ -66,7 +70,7 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) FactoryUtil.FORMAT ); - HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readable); + HttpLookupConfig lookupConfig = getHttpLookupOptions(dynamicTableContext, readableConfig); ResolvedSchema resolvedSchema = dynamicTableContext.getCatalogTable().getResolvedSchema(); @@ -78,9 +82,37 @@ public DynamicTableSource createDynamicTableSource(Context dynamicTableContext) lookupConfig, decodingFormat, dynamicTableContext, - getLookupCache(readable) + getLookupCache(readableConfig) ); } + protected void validateHttpSourceOptions(ReadableConfig tableOptions) + throws IllegalArgumentException { + // ensure that there is an OIDC token request if we have an OIDC token endpoint + tableOptions.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL).ifPresent(url -> { + if (tableOptions.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST).isEmpty()) { + throw new IllegalArgumentException("Config option " + + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key() + " is required, if " + + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL.key() + " is configured."); + } + }); + tableOptions.getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST).ifPresent(tokenRequest -> { + try { + String tokenRequestStr = URLDecoder.decode( tokenRequest, "UTF-8" ); + ObjectMapper objectMapper = new ObjectMapper(); + JsonNode rootNode = objectMapper.readTree(tokenRequestStr); + JsonNode grantTypeNode = rootNode.get("grant-type"); + if (grantTypeNode == null) { + throw new IllegalArgumentException("Config option " + + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key() + + " does not contain a grant-type. A grant-type is required."); + } + } catch (Exception e) { + throw new IllegalArgumentException("Config option " + + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key() + + "cannot be decoded into json "); + } + }); + } @Override public String factoryIdentifier() { @@ -94,7 +126,6 @@ public Set> requiredOptions() { @Override public Set> optionalOptions() { - return Set.of( URL_ARGS, ASYNC_POLLING, @@ -105,7 +136,11 @@ public Set> optionalOptions() { LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, LookupOptions.PARTIAL_CACHE_MAX_ROWS, LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY, - LookupOptions.MAX_RETRIES); + LookupOptions.MAX_RETRIES, + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION, + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST, + SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL + ); } private HttpLookupConfig getHttpLookupOptions(Context context, ReadableConfig readableConfig) { diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java index ce3a31cc..77520caa 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/JavaNetHttpPollingClient.java @@ -35,6 +35,8 @@ public class JavaNetHttpPollingClient implements PollingClient { private final HttpRequestFactory requestFactory; + private final HttpLookupConfig options; + private final HttpPostRequestCallback httpPostRequestCallback; public JavaNetHttpPollingClient( @@ -46,6 +48,7 @@ public JavaNetHttpPollingClient( this.httpClient = httpClient; this.responseBodyDecoder = responseBodyDecoder; this.requestFactory = requestFactory; + this.options = options; this.httpPostRequestCallback = options.getHttpPostRequestCallback(); @@ -78,6 +81,7 @@ public Optional pull(RowData lookupRow) { private Optional queryAndProcess(RowData lookupData) throws Exception { HttpLookupSourceRequestEntry request = requestFactory.buildLookupRequest(lookupData); + HttpResponse response = httpClient.send( request.getHttpRequest(), BodyHandlers.ofString() diff --git a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java index f6c19f62..54b3a408 100644 --- a/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java +++ b/src/main/java/com/getindata/connectors/http/internal/table/lookup/RequestFactoryBase.java @@ -1,9 +1,12 @@ package com.getindata.connectors.http.internal.table.lookup; +import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpRequest.Builder; +import java.time.Duration; import java.util.Arrays; import java.util.Map; +import java.util.Optional; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.table.data.RowData; @@ -12,13 +15,15 @@ import com.getindata.connectors.http.LookupQueryCreator; import com.getindata.connectors.http.internal.HeaderPreprocessor; +import com.getindata.connectors.http.internal.auth.OidcAccessTokenManager; import com.getindata.connectors.http.internal.config.HttpConnectorConfigConstants; import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; +import static com.getindata.connectors.http.internal.table.lookup.HttpLookupConnectorOptions.*; /** * Base class for {@link HttpRequest} factories. */ -public abstract class RequestFactoryBase implements HttpRequestFactory { +public abstract class RequestFactoryBase implements HttpRequestFactory{ public static final String DEFAULT_REQUEST_TIMEOUT_SECONDS = "30"; @@ -35,6 +40,7 @@ public abstract class RequestFactoryBase implements HttpRequestFactory { * HTTP headers that should be used for {@link HttpRequest} created by factory. */ private final String[] headersAndValues; + private final HttpLookupConfig options; public RequestFactoryBase( LookupQueryCreator lookupQueryCreator, @@ -43,6 +49,7 @@ public RequestFactoryBase( this.baseUrl = options.getUrl(); this.lookupQueryCreator = lookupQueryCreator; + this.options = options; var headerMap = HttpHeaderUtils .prepareHeaderMap( @@ -71,10 +78,45 @@ public HttpLookupSourceRequestEntry buildLookupRequest(RowData lookupRow) { if (headersAndValues.length != 0) { requestBuilder.headers(headersAndValues); } + Optional oidcAuthURL = this.options.getReadableConfig() + .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_ENDPOINT_URL); + if (oidcAuthURL.isPresent()) { + Optional oidcTokenRequest = this.options.getReadableConfig() + .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST); + + Optional oidcExpiryReduction = this.options.getReadableConfig() + .getOptional(SOURCE_LOOKUP_OIDC_AUTH_TOKEN_EXPIRY_REDUCTION); + + addAccessTokenToRequest(requestBuilder, oidcAuthURL, + oidcTokenRequest, oidcExpiryReduction); + } return new HttpLookupSourceRequestEntry(requestBuilder.build(), lookupQueryInfo); } + /** + * Add the access token to the request using OidcAuth authenticate + * method that gives us a valid access token. + * @param requestBuilder request build to add the header to + * @param oidcAuthURL OIDC token endpoint + * @param oidcTokenRequest OIDC Token Request + * @param oidcExpiryReduction OIDC token expiry reduction + */ + void addAccessTokenToRequest(HttpRequest.Builder requestBuilder, + Optional oidcAuthURL, + Optional oidcTokenRequest, + Optional oidcExpiryReduction + ) { + OidcAccessTokenManager auth = new OidcAccessTokenManager( + HttpClient.newBuilder().build(), + oidcTokenRequest.get(), + oidcAuthURL.get(), + oidcExpiryReduction.get() + ); + // apply the OIDC authentication by adding the correct header. + requestBuilder.header("Authorization", "BEARER " + auth.authenticate()); + } + protected abstract Logger getLogger(); /** diff --git a/src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java b/src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java new file mode 100644 index 00000000..3ae5329f --- /dev/null +++ b/src/test/java/com/getindata/connectors/http/internal/auth/OidcAccessTokenManagerTest.java @@ -0,0 +1,248 @@ +package com.getindata.connectors.http.internal.auth; + +import java.net.*; +import java.net.http.HttpClient; +import java.net.http.HttpHeaders; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; +import javax.net.ssl.SSLSession; + +import net.minidev.json.JSONObject; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.getindata.connectors.http.internal.HeaderPreprocessor; +import com.getindata.connectors.http.internal.table.lookup.*; +import com.getindata.connectors.http.internal.utils.HttpHeaderUtils; + + + +public class OidcAccessTokenManagerTest { + + private HeaderPreprocessor headerPreprocessor; + + private HttpLookupConfig options; + + private static final String BASE_URL = "http://localhost/aaa"; + + @BeforeEach + public void setUp() { + this.headerPreprocessor = HttpHeaderUtils.createDefaultHeaderPreprocessor(); + this.options = HttpLookupConfig.builder().url(BASE_URL).build(); + } + + @Test + public void testAuthenticate() throws InterruptedException { + + MockHttpClient authHttpClient = new MockHttpClient(); + + authHttpClient.setIsExpired(1); + authHttpClient.setAccessToken("Access1"); + String url = "http://localhost"; + OidcAccessTokenManager oidcAuth = new OidcAccessTokenManager(authHttpClient, "abc", url); + + // apply the authorization to the httpRequest + String token1 = oidcAuth.authenticate(); + assertThat(token1).isNotNull(); + String token2 = oidcAuth.authenticate(); + assertThat(token2).isNotNull(); + // check the token is cached + assertThat(token1).isEqualTo(token2); + Thread.sleep(2000); + // check the token is different after first token has expired + String token3 = oidcAuth.authenticate(); + assertThat(token3).isNotNull(); + assertThat(token3).isNotEqualTo(token2); + } + @Test + public void testAuthenticateWithBadStatusCode() throws InterruptedException { + + MockHttpClient authHttpClient = new MockHttpClient(); + + authHttpClient.setIsExpired(1); + authHttpClient.setAccessToken("Access1"); + authHttpClient.setStatus(500); + String url = "http://localhost"; + OidcAccessTokenManager oidcAuth = new OidcAccessTokenManager(authHttpClient, "abc", url); + + try { + oidcAuth.authenticate(); + assertTrue(false, "Bad status code should result in an exception."); + } catch (IllegalStateException e) { + // expected + } + } + @Test + public void testAuthenticateWithExpiryReduction() throws InterruptedException { + + MockHttpClient authHttpClient = new MockHttpClient(); + + authHttpClient.setIsExpired(1); + authHttpClient.setAccessToken("Access1"); + String url = "http://localhost"; + OidcAccessTokenManager oidcAuth = new OidcAccessTokenManager(authHttpClient, + "abc", url, Duration.ofSeconds(5)); + + // apply the authorization to the httpRequest + String token1 = oidcAuth.authenticate(); + assertThat(token1).isNotNull(); + String token2 = oidcAuth.authenticate(); + assertThat(token2).isNotNull(); + } + class MockHttpClient extends HttpClient { + private int isExpired; + private String accessToken; + private int count = 0; + private int status = 200; + + @Override + public Optional cookieHandler() { + return Optional.empty(); + } + + @Override + public Optional connectTimeout() { + return Optional.empty(); + } + + @Override + public Redirect followRedirects() { + return null; + } + + @Override + public Optional proxy() { + return Optional.empty(); + } + + @Override + public SSLContext sslContext() { + return null; + } + + @Override + public SSLParameters sslParameters() { + return null; + } + + @Override + public Optional authenticator() { + return Optional.empty(); + } + + @Override + public Version version() { + return null; + } + + @Override + public Optional executor() { + return Optional.empty(); + } + + @Override + public HttpResponse send(HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler) { + + JSONObject json = new JSONObject(); + + json.put("expires_in", 2); + json.put("access_token", "dummy_token_" + this.count++); + byte[] bytes = json.toJSONString().getBytes(); + + MockHttpResponse mockHttpResponse = new MockHttpResponse(); + mockHttpResponse.setStatusCode(status); + mockHttpResponse.setBody(bytes); + + return (HttpResponse) mockHttpResponse; + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler) { + return null; + } + + @Override + public CompletableFuture> sendAsync( + HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler, + HttpResponse.PushPromiseHandler pushPromiseHandler) { + return null; + } + + public void setIsExpired(int isExpired) { + this.isExpired = isExpired; + } + + public void setAccessToken(String accesstoken) { + this.accessToken = accesstoken; + } + + public void setStatus(int status) { + this.status = status; + } + + class MockHttpResponse implements HttpResponse { + int statusCode = 0; + byte[] body = new byte[0]; + + @Override + public int statusCode() { + return statusCode; + } + + @Override + public HttpRequest request() { + return null; + } + + @Override + public Optional> previousResponse() { + return Optional.empty(); + } + + @Override + public HttpHeaders headers() { + return null; + } + + @Override + public byte[] body() { + return body; + } + + @Override + public Optional sslSession() { + return Optional.empty(); + } + + @Override + public URI uri() { + return null; + } + + @Override + public Version version() { + return null; + } + + public void setStatusCode(int statusCode) { + this.statusCode = statusCode; + } + + public void setBody(byte[] body) { + this.body = body; + } + } + } +} diff --git a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java index 31236460..61f45246 100644 --- a/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/table/lookup/HttpLookupTableSourceFactoryTest.java @@ -1,12 +1,10 @@ package com.getindata.connectors.http.internal.table.lookup; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.net.URLEncoder; +import java.util.*; import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.catalog.Column; import org.apache.flink.table.catalog.ResolvedSchema; @@ -14,8 +12,8 @@ import org.apache.flink.table.connector.source.DynamicTableSource; import org.junit.jupiter.api.Test; import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSource; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.assertj.core.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertFalse; public class HttpLookupTableSourceFactoryTest { @@ -36,6 +34,43 @@ public class HttpLookupTableSourceFactoryTest { Collections.emptyList(), UniqueConstraint.primaryKey("id", List.of("id")) ); + @Test + void validateHttpSourceOptions() { + + HttpLookupTableSourceFactory httpLookupTableSourceFactory + = new HttpLookupTableSourceFactory(); + TableConfig tableConfig = new TableConfig(); + httpLookupTableSourceFactory.validateHttpSourceOptions(tableConfig); + tableConfig.set(HttpLookupConnectorOptions + .SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key(), "bbb"); + try { + httpLookupTableSourceFactory.validateHttpSourceOptions(tableConfig); + assertFalse(true, "Expected an error json processing error"); + } catch (IllegalArgumentException e) { + // expected + } + + String json = "{}"; + String urlencoded = URLEncoder.encode(json); + tableConfig.set(HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key(), + urlencoded); + try { + httpLookupTableSourceFactory.validateHttpSourceOptions(tableConfig); + assertFalse(true, "Expected an error as no grant-type."); + } catch (IllegalArgumentException e) { + // expected + } + json = "{\"grant-type\":\"password\"}"; + urlencoded = URLEncoder.encode(json); + tableConfig.set(HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key(), + urlencoded); + httpLookupTableSourceFactory.validateHttpSourceOptions(tableConfig); + json = "{\"grant-type\":\"test1\",\"parm\":\"testval\"}"; + urlencoded = URLEncoder.encode(json); + tableConfig.set(HttpLookupConnectorOptions.SOURCE_LOOKUP_OIDC_AUTH_TOKEN_REQUEST.key(), + urlencoded); + httpLookupTableSourceFactory.validateHttpSourceOptions(tableConfig); + } @Test void shouldCreateForMandatoryFields() {