Skip to content

Commit

Permalink
fix merge issue
Browse files Browse the repository at this point in the history
Signed-off-by: David Radley <[email protected]>
  • Loading branch information
davidradl committed Mar 25, 2024
2 parents a1a9460 + b403ff6 commit 03259fb
Show file tree
Hide file tree
Showing 18 changed files with 318 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/prepare_release_branch.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
- name: Open a PR to bump development version to main
id: open_pr
uses: vsoch/pull-request-action@1.0.12
uses: vsoch/pull-request-action@1.1.0
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
PULL_REQUEST_BRANCH: main
Expand Down
13 changes: 12 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

Moved junit support to junit 5, allowing junits to be run against flink 1.17 and 1.18.

## [0.12.0] - 2024-03-22

### Added

- Added support for passing `Authorization` headers for other purposes than Basic Authentication.
Expand All @@ -14,6 +16,13 @@
transformation for Basic Authentication (base64, addition of "Basic " prefix).
If not specified, defaults to `'false'`.

### Changed

- Changed API for `LookupQueryCreator`. The method `createLookupQuery` no longer returns a String but a
[LookupQueryInfo](src/main/java/com/getindata/connectors/http/internal/table/lookup/LookupQueryInfo.java)
Any custom implementation of this interface that aims to provide body-based request is able to provide
the lookup query as the payload and an optional formatted string representing the query parameters.

## [0.11.0] - 2023-11-20

## [0.10.0] - 2023-07-05
Expand Down Expand Up @@ -157,7 +166,9 @@

- Implement basic support for Http connector for Flink SQL

[Unreleased]: https://github.com/getindata/flink-http-connector/compare/0.11.0...HEAD
[Unreleased]: https://github.com/getindata/flink-http-connector/compare/0.12.0...HEAD

[0.12.0]: https://github.com/getindata/flink-http-connector/compare/0.11.0...0.12.0

[0.11.0]: https://github.com/getindata/flink-http-connector/compare/0.10.0...0.11.0

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ under the License.

<groupId>com.getindata</groupId>
<artifactId>flink-http-connector</artifactId>
<version>0.12.0-SNAPSHOT</version>
<version>0.13.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>flink-http-connector</name>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@

import org.apache.flink.table.data.RowData;

import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo;

/**
* An interface for a creator of a lookup query in the Http Lookup Source (e.g., the query that
* gets appended to the URI in GET request).
* gets appended as query parameters to the URI in GET request or supplied as the payload of a
* body-based request along with optional query parameters).
*
* <p>One can customize how those queries are built by implementing {@link LookupQueryCreator} and
* {@link LookupQueryCreatorFactory}.
Expand All @@ -20,5 +23,5 @@ public interface LookupQueryCreator extends Serializable {
* @param lookupDataRow a {@link RowData} containing request parameters.
* @return a lookup query.
*/
String createLookupQuery(RowData lookupDataRow);
LookupQueryInfo createLookupQuery(RowData lookupDataRow);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ public BodyBasedRequestFactory(
* Method for preparing {@link HttpRequest.Builder} for REST request that sends their parameters
* in request body, for example PUT or POST methods
*
* @param lookupQuery lookup query used for request body.
* @param lookupQueryInfo lookup query info used for request body.
* @return {@link HttpRequest.Builder} for given lookupQuery.
*/
@Override
protected Builder setUpRequestMethod(String lookupQuery) {
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
return HttpRequest.newBuilder()
.uri(constructGetUri())
.method(methodName, BodyPublishers.ofString(lookupQuery))
.uri(constructBodyBasedUri(lookupQueryInfo))
.method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()))
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
}

Expand All @@ -53,9 +53,15 @@ protected Logger getLogger() {
return log;
}

private URI constructGetUri() {
URI constructBodyBasedUri(LookupQueryInfo lookupQueryInfo) {
StringBuilder resolvedUrl = new StringBuilder(baseUrl);
if (lookupQueryInfo.hasBodyBasedUrlQueryParameters()) {
resolvedUrl.append(baseUrl.contains("?") ? "&" : "?")
.append(lookupQueryInfo.getBodyBasedUrlQueryParameters());
}

try {
return new URIBuilder(baseUrl).build();
return new URIBuilder(resolvedUrl.toString()).build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,36 @@ protected Logger getLogger() {
}

/**
* Method for preparing {@link HttpRequest.Builder} for REST GET request, where lookupQuery
* is used as query parameters for example:
* Method for preparing {@link HttpRequest.Builder} for REST GET request, where lookupQueryInfo
* is used as query parameters for GET requests, for example:
* <pre>
* http:localhost:8080/service?id=1
* </pre>
* @param lookupQuery lookup query used for request query parameters.
* or as payload for body-based requests with optional parameters, for example:
* <pre>
* http:localhost:8080/service?id=1
* body payload: { "uid": 2 }
* </pre>
* @param lookupQueryInfo lookup query info used for request query parameters.
* @return {@link HttpRequest.Builder} for given GET lookupQuery
*/
@Override
protected Builder setUpRequestMethod(String lookupQuery) {
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
return HttpRequest.newBuilder()
.uri(constructGetUri(lookupQuery))
.uri(constructGetUri(lookupQueryInfo))
.GET()
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
}

private URI constructGetUri(String lookupQuery) {
URI constructGetUri(LookupQueryInfo lookupQueryInfo) {
StringBuilder resolvedUrl = new StringBuilder(baseUrl);
if (lookupQueryInfo.hasLookupQuery()) {
resolvedUrl.append(baseUrl.contains("?") ? "&" : "?")
.append(lookupQueryInfo.getLookupQuery());
}

try {
return new URIBuilder(baseUrl + "?" + lookupQuery).build();
return new URIBuilder(resolvedUrl.toString()).build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ public class HttpLookupSourceRequestEntry {
* represent a request body, for example a Json string when PUT/POST requests method was used,
* or it can represent a query parameters if GET method was used.
*/
private final String lookupQuery;

public HttpLookupSourceRequestEntry(HttpRequest httpRequest, String lookupQuery) {
private final LookupQueryInfo lookupQueryInfo;

public HttpLookupSourceRequestEntry(HttpRequest httpRequest, LookupQueryInfo lookupQueryInfo) {
this.httpRequest = httpRequest;
this.lookupQuery = lookupQuery;
this.lookupQueryInfo = lookupQueryInfo;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import lombok.Getter;
import lombok.ToString;

import com.getindata.connectors.http.internal.utils.uri.NameValuePair;
import com.getindata.connectors.http.internal.utils.uri.URLEncodedUtils;

/**
* Holds the lookup query for an HTTP request.
* The {@code lookupQuery} either contain the query parameters for a GET operation
* or the payload of a body-based request.
* The {@code bodyBasedUrlQueryParams} contains the optional query parameters of a
* body-based request in addition to its payload supplied with {@code lookupQuery}.
*/
@ToString
public class LookupQueryInfo implements Serializable {
@Getter
private final String lookupQuery;

private final Map<String, String> bodyBasedUrlQueryParams;

public LookupQueryInfo(String lookupQuery) {
this(lookupQuery, null);
}

public LookupQueryInfo(String lookupQuery, Map<String, String> bodyBasedUrlQueryParams) {
this.lookupQuery =
lookupQuery == null ? "" : lookupQuery;
this.bodyBasedUrlQueryParams =
bodyBasedUrlQueryParams == null ? Collections.emptyMap() : bodyBasedUrlQueryParams;
}

public String getBodyBasedUrlQueryParameters() {
return URLEncodedUtils.format(
bodyBasedUrlQueryParams
.entrySet()
.stream()
.map(entry -> new NameValuePair(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()),
StandardCharsets.UTF_8);
}

public boolean hasLookupQuery() {
return !lookupQuery.isBlank();
}
public boolean hasBodyBasedUrlQueryParameters() {
return !bodyBasedUrlQueryParams.isEmpty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -61,16 +61,16 @@ public RequestFactoryBase(
@Override
public HttpLookupSourceRequestEntry buildLookupRequest(RowData lookupRow) {

String lookupQuery = lookupQueryCreator.createLookupQuery(lookupRow);
getLogger().debug("Created Http lookup query: " + lookupQuery);
LookupQueryInfo lookupQueryInfo = lookupQueryCreator.createLookupQuery(lookupRow);
getLogger().debug("Created Http lookup query: " + lookupQueryInfo);

Builder requestBuilder = setUpRequestMethod(lookupQuery);
Builder requestBuilder = setUpRequestMethod(lookupQueryInfo);

if (headersAndValues.length != 0) {
requestBuilder.headers(headersAndValues);
}

return new HttpLookupSourceRequestEntry(requestBuilder.build(), lookupQuery);
return new HttpLookupSourceRequestEntry(requestBuilder.build(), lookupQueryInfo);
}

protected abstract Logger getLogger();
Expand All @@ -80,7 +80,7 @@ public HttpLookupSourceRequestEntry buildLookupRequest(RowData lookupRow) {
* @param lookupQuery lookup query used for request query parameters or body.
* @return {@link HttpRequest.Builder} for given lookupQuery.
*/
protected abstract Builder setUpRequestMethod(String lookupQuery);
protected abstract Builder setUpRequestMethod(LookupQueryInfo lookupQuery);

@VisibleForTesting
String[] getHeadersAndValues() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public void call(
httpRequest.uri().toString(),
httpRequest.method(),
headers,
requestEntry.getLookupQuery()
requestEntry.getLookupQueryInfo()
);
} else {
log.info(
Expand All @@ -58,7 +58,7 @@ public void call(
httpRequest.uri().toString(),
httpRequest.method(),
headers,
requestEntry.getLookupQuery(),
requestEntry.getLookupQueryInfo(),
response,
response.body().replaceAll(ConfigUtils.UNIVERSAL_NEW_LINE_REGEXP, "")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.getindata.connectors.http.LookupArg;
import com.getindata.connectors.http.LookupQueryCreator;
import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo;
import com.getindata.connectors.http.internal.table.lookup.LookupRow;

/**
Expand Down Expand Up @@ -35,14 +36,15 @@ private static String processLookupArg(LookupArg arg) {
}

@Override
public String createLookupQuery(RowData lookupDataRow) {

public LookupQueryInfo createLookupQuery(RowData lookupDataRow) {
Collection<LookupArg> lookupArgs = lookupRow.convertToLookupArgs(lookupDataRow);

var luceneQuery = lookupArgs.stream()
.map(ElasticSearchLiteQueryCreator::processLookupArg)
.collect(Collectors.joining(ENCODED_SPACE + "AND" + ENCODED_SPACE));

return luceneQuery.isEmpty() ? "" : ("q=" + luceneQuery);
String lookupQuery = luceneQuery.isEmpty() ? "" : ("q=" + luceneQuery);

return new LookupQueryInfo(lookupQuery);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.getindata.connectors.http.LookupArg;
import com.getindata.connectors.http.LookupQueryCreator;
import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo;
import com.getindata.connectors.http.internal.table.lookup.LookupRow;
import com.getindata.connectors.http.internal.utils.uri.NameValuePair;
import com.getindata.connectors.http.internal.utils.uri.URLEncodedUtils;
Expand All @@ -25,14 +26,17 @@ public GenericGetQueryCreator(LookupRow lookupRow) {
}

@Override
public String createLookupQuery(RowData lookupDataRow) {
public LookupQueryInfo createLookupQuery(RowData lookupDataRow) {

Collection<LookupArg> lookupArgs = lookupRow.convertToLookupArgs(lookupDataRow);

return URLEncodedUtils.format(
lookupArgs.stream()
.map(arg -> new NameValuePair(arg.getArgName(), arg.getArgValue()))
.collect(Collectors.toList()),
StandardCharsets.UTF_8);
String lookupQuery =
URLEncodedUtils.format(
lookupArgs.stream()
.map(arg -> new NameValuePair(arg.getArgName(), arg.getArgValue()))
.collect(Collectors.toList()),
StandardCharsets.UTF_8);

return new LookupQueryInfo(lookupQuery);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.flink.util.FlinkRuntimeException;

import com.getindata.connectors.http.LookupQueryCreator;
import com.getindata.connectors.http.internal.table.lookup.LookupQueryInfo;
import com.getindata.connectors.http.internal.utils.SerializationSchemaUtils;

/**
Expand All @@ -33,9 +34,12 @@ public GenericJsonQueryCreator(SerializationSchema<RowData> jsonSerialization) {
* @return Json string created from lookupDataRow argument.
*/
@Override
public String createLookupQuery(RowData lookupDataRow) {
public LookupQueryInfo createLookupQuery(RowData lookupDataRow) {
checkOpened();
return new String(jsonSerialization.serialize(lookupDataRow), StandardCharsets.UTF_8);
String lookupQuery =
new String(jsonSerialization.serialize(lookupDataRow), StandardCharsets.UTF_8);

return new LookupQueryInfo(lookupQuery);
}

private void checkOpened() {
Expand Down
Loading

0 comments on commit 03259fb

Please sign in to comment.