Skip to content

Commit

Permalink
http91 bearer token support
Browse files Browse the repository at this point in the history
Signed-off-by: davidradl <[email protected]>
  • Loading branch information
davidradl committed Aug 15, 2024
1 parent 05d3b47 commit bea7c28
Show file tree
Hide file tree
Showing 11 changed files with 596 additions and 18 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## [Unreleased]

### Added

- Added support for OIDC Bearer tokens.

## [0.15.0] - 2024-07-30

### Added
Expand Down
27 changes: 24 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -410,13 +410,31 @@ In this special case, you can configure connector to trust all certificates with
To enable this option use `gid.connector.http.security.cert.server.allowSelfSigned` property setting its value to `true`.

## Basic Authentication
The connector supports Basic Authentication mechanism using HTTP `Authorization` header.
The connector supports Basic Authentication using a HTTP `Authorization` header.
The header value can be set via properties, similarly as for other headers. The connector converts the passed value to Base64 and uses it for the request.
If the used value starts with the prefix `Basic `, or `gid.connector.http.source.lookup.use-raw-authorization-header`
is set to `'true'`, it will be used as header value as is, without any extra modification.

## OIDC Bearer Authentication
The connector supports Bearer Authentication using a HTTP `Authorization` header. The [OAuth 2.0 rcf](https://datatracker.ietf.org/doc/html/rfc6749) mentions [Obtaining Authorization](https://datatracker.ietf.org/doc/html/rfc6749#section-4)
and an authorization grant. OIDC makes use of this [authorisation grant](https://datatracker.ietf.org/doc/html/rfc6749#section-1.3) in a [Token Request](https://openid.net/specs/openid-connect-core-1_0.html#TokenRequest) by including a [OAuth grant type](https://oauth.net/2/grant-types/) and associated properties, the response is the [token response](https://openid.net/specs/openid-connect-core-1_0.html#TokenResponse).

If you want to use this authorization then you should supply the `Token Request` body in `application/x-www-form-urlencoded` encoding
in configuration property `gid.connector.http.security.oidc.token.request`. See [grant extension](https://datatracker.ietf.org/doc/html/rfc6749#section-4.5) for
an example of a customised grant type token request. The supplied `token request` will be issued to the
[token end point](https://datatracker.ietf.org/doc/html/rfc6749#section-3.2), whose url should be supplied in configuration property
`gid.connector.http.security.oidc.token.endpoint.url`. The returned `access token` is then cached and used for subsequent requests; if the token has expired then
a new one is requested. There is a property `gid.connector.http.security.oidc.token.expiry.reduction`, that defaults to 1 second; new tokens will
be requested if the current time is later than the cached token expiry time minus `gid.connector.http.security.oidc.token.expiry.reduction`.

### Restrictions at this time
* No authentication is applied to the token request.
* This authentication is only available for HTTP TableLookup Source.
* The processing does not use the refresh token if it present.

## Table API Connector Options
### HTTP TableLookup Source

| Option | Required | Description/Value |
|---------------------------------------------------------------|----------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connector | required | The Value should be set to _rest-lookup_ |
Expand All @@ -436,19 +454,22 @@ is set to `'true'`, it will be used as header value as is, without any extra mod
| gid.connector.http.security.cert.client | optional | Path to trusted certificate that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.key.client | optional | Path to trusted private key that should be used by connector's HTTP client for mTLS communication. |
| gid.connector.http.security.cert.server.allowSelfSigned | optional | Accept untrusted certificates for TLS communication. |
| gid.connector.http.security.oidc.token.request | optional | OIDC `Token Request` body in `application/x-www-form-urlencoded` encoding |
| gid.connector.http.security.oidc.token.endpoint.url | optional | OIDC `Token Endpoint` url, to which the token request will be issued |
| gid.connector.http.security.oidc.token.expiry.reduction | optional | OIDC tokens will be requested if the current time is later than the cached token expiry time minus this value. |
| gid.connector.http.source.lookup.request.timeout | optional | Sets HTTP request timeout in seconds. If not specified, the default value of 30 seconds will be used. |
| gid.connector.http.source.lookup.request.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup request processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 8 threads will be used. |
| gid.connector.http.source.lookup.response.thread-pool.size | optional | Sets the size of pool thread for HTTP lookup response processing. Increasing this value would mean that more concurrent requests can be processed in the same time. If not specified, the default value of 4 threads will be used. |
| gid.connector.http.source.lookup.use-raw-authorization-header | optional | If set to `'true'`, uses the raw value set for the `Authorization` header, without transformation for Basic Authentication (base64, addition of "Basic " prefix). If not specified, defaults to `'false'`. |
| gid.connector.http.source.lookup.request-callback | optional | Specify which `HttpLookupPostRequestCallback` implementation to use. By default, it is set to `slf4j-lookup-logger` corresponding to `Slf4jHttpLookupPostRequestCallback`. |


### HTTP Sink

| Option | Required | Description/Value |
|---------------------------------------------------------|----------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connector | required | Specify what connector to use. For HTTP Sink it should be set to _'http-sink'_. |
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
| format | required | Specify what format to use. |
| url | required | The base URL that should be use for HTTP requests. For example _http://localhost:8080/client_. |
| insert-method | optional | Specify which HTTP method to use in the request. The value should be set either to `POST` or `PUT`. |
| sink.batch.max-size | optional | Maximum number of elements that may be passed in a batch to be written downstream. |
| sink.requests.max-inflight | optional | The maximum number of in flight requests that may exist, if any more in flight requests need to be initiated once the maximum has been reached, then it will be blocked until some have completed. |
Expand Down
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@ under the License.
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>2.17.1</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
Expand Down Expand Up @@ -297,6 +303,8 @@ under the License.
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M5</version>
<configuration>
<!-- argLine needed for Flink 1.16 and 1.17 or there are unit test errors-->
<argLine>--add-opens java.base/java.util=ALL-UNNAMED --add-opens java.base/java.lang=ALL-UNNAMED</argLine>
</configuration>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@

/*
* Copyright 2020 Red Hat
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.getindata.connectors.http.internal.auth;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

/**
* This class is inspired by
* https://github.com/Apicurio/apicurio-common-rest-client/blob/
* 944ac9eb527c291a6083bd10ee012388e1684d20/rest-client-common/src/main/java/io/
* apicurio/rest/client/auth/OidcAuth.java.
*
* The OIDC access token manager encapsulates the caching of an OIDC access token,
* which can be short lived, for example an hour. The authenticate method will return an
* un-expired access token, either from the cache or by requesting a new access token.
*/
public class OidcAccessTokenManager {

private static final Duration DEFAULT_TOKEN_EXPIRATION_REDUCTION = Duration.ofSeconds(1);
private final HttpClient httpClient;
private final String tokenRequest;

private final String url;
private final Duration tokenExpirationReduction;

private String cachedAccessToken;
private Instant cachedAccessTokenExp;

/**
* Construct an Oidc access token manager with the default token expiration reduction
* @param httpClient httpClient to use to call the token endpoint.
* @param tokenRequest token request
* @param url token endpoint url
*/
public OidcAccessTokenManager(HttpClient httpClient, String tokenRequest, String url) {
this(httpClient, tokenRequest, url, DEFAULT_TOKEN_EXPIRATION_REDUCTION);
}
/**
* Construct an Oidc access token manager with the supplied token expiration reduction
* @param httpClient httpClient to use to call the token endpoint.
* @param tokenRequest token request this need to be form urlencoded
* @param url token endpoint url
* @param tokenExpirationReduction token expiry reduction, request a new token if the
* current time is later than the cached access token
* expiry time reduced by this value. This means that
* we will not use the cached token if it is about
* to expire.
*/
public OidcAccessTokenManager(HttpClient httpClient, String tokenRequest, String url,
Duration tokenExpirationReduction) {
this.tokenRequest = tokenRequest;
this.httpClient = httpClient;
this.url = url;
if (null == tokenExpirationReduction) {
this.tokenExpirationReduction = DEFAULT_TOKEN_EXPIRATION_REDUCTION;
} else {
this.tokenExpirationReduction = tokenExpirationReduction;
}
}

/**
* Request an access token from the token endpoint
*/
private void requestAccessToken() {
try {
HttpRequest httpRequest =
HttpRequest.newBuilder()
.uri(URI.create(url))
.header("Content-Type", "application/x-www-form-urlencoded")
.method("POST", HttpRequest.BodyPublishers.ofString(tokenRequest))
.build();

HttpResponse<byte[]> response = httpClient.send(httpRequest,
HttpResponse.BodyHandlers.ofByteArray());
//create ObjectMapper instance
ObjectMapper objectMapper = new ObjectMapper();
if (200 == response.statusCode()) {
byte[] bytes = response.body();
JsonNode rootNode = objectMapper.readTree(bytes);
JsonNode tokenNode = rootNode.path("access_token");
JsonNode expiresInNode = rootNode.path("expires_in");
this.cachedAccessToken = tokenNode.textValue();
/*
expiresIn is in seconds
*/
Duration expiresIn = Duration.ofSeconds(expiresInNode.asInt());
if (expiresIn.compareTo(this.tokenExpirationReduction) > 0) {
//expiresIn is greater than tokenExpirationReduction
expiresIn = expiresIn.minus(this.tokenExpirationReduction);
}
this.cachedAccessTokenExp = Instant.now().plus(expiresIn);
} else {
throw new IllegalStateException("Attempted to get an access token but got http" +
" status code " + response.statusCode());
}
} catch (JsonProcessingException e) {
throw new IllegalStateException("Error found while trying to request a new token");
} catch (IOException e) {
throw new IllegalStateException("IO Exception occurred", e);
} catch (InterruptedException e) {
throw new IllegalStateException("Interrupted Exception occurred", e);
}
}

/**
* Get a valid unexpired access token.
* @return access token.
*/
public String authenticate() {
if (isAccessTokenRequired()) {
requestAccessToken();
}
return cachedAccessToken;
}

private boolean isAccessTokenRequired() {
return null == cachedAccessToken || isTokenExpired();
}

private boolean isTokenExpired() {
return Instant.now().isAfter(this.cachedAccessTokenExp);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ public final class HttpConnectorConfigConstants {
public static final String LOOKUP_SOURCE_HEADER_PREFIX = GID_CONNECTOR_HTTP
+ "source.lookup.header.";

public static final String OIDC_AUTH_TOKEN_REQUEST = GID_CONNECTOR_HTTP
+ "security.oidc.token.request";

public static final String OIDC_AUTH_TOKEN_ENDPOINT_URL = GID_CONNECTOR_HTTP
+ "security.oidc.token.endpoint.url";

public static final String OIDC_AUTH_TOKEN_EXPIRY_REDUCTION = GID_CONNECTOR_HTTP
+ "security.oidc.token.expiry.reduction";
/**
* Whether to use the raw value of the Authorization header. If set, it prevents
* the special treatment of the header for Basic Authentication, thus preserving the passed
Expand Down
Loading

0 comments on commit bea7c28

Please sign in to comment.