Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http74 add path parameter support #85

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
import java.net.http.HttpRequest.BodyPublishers;
import java.net.http.HttpRequest.Builder;
import java.time.Duration;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;

import com.getindata.connectors.http.LookupQueryCreator;
Expand All @@ -16,14 +18,14 @@

/**
* Implementation of {@link HttpRequestFactory} for REST calls that sends their parameters using
* request body.
* request body or in the path.
*/
@Slf4j
public class BodyBasedRequestFactory extends RequestFactoryBase {
public class BodyAndPathBasedRequestFactory extends RequestFactoryBase {

private final String methodName;

public BodyBasedRequestFactory(
public BodyAndPathBasedRequestFactory(
String methodName,
LookupQueryCreator lookupQueryCreator,
HeaderPreprocessor headerPreprocessor,
Expand All @@ -43,7 +45,7 @@ public BodyBasedRequestFactory(
@Override
protected Builder setUpRequestMethod(LookupQueryInfo lookupQueryInfo) {
return HttpRequest.newBuilder()
.uri(constructBodyBasedUri(lookupQueryInfo))
.uri(lookupQueryInfo.getURI())
.method(methodName, BodyPublishers.ofString(lookupQueryInfo.getLookupQuery()))
.timeout(Duration.ofSeconds(this.httpRequestTimeOutSeconds));
}
Expand All @@ -53,17 +55,30 @@ protected Logger getLogger() {
return log;
}

URI constructBodyBasedUri(LookupQueryInfo lookupQueryInfo) {
StringBuilder resolvedUrl = new StringBuilder(baseUrl);
if (lookupQueryInfo.hasBodyBasedUrlQueryParameters()) {
resolvedUrl.append(baseUrl.contains("?") ? "&" : "?")
.append(lookupQueryInfo.getBodyBasedUrlQueryParameters());
}
// URI constructUri(LookupQueryInfo lookupQueryInfo) {
// StringBuilder resolvedUrl = new StringBuilder(baseUrl);
// if (lookupQueryInfo.hasBodyBasedUrlQueryParameters()) {
// resolvedUrl.append(baseUrl.contains("?") ? "&" : "?")
// .append(lookupQueryInfo.getBodyBasedUrlQueryParameters());
// }
// if (lookupQueryInfo.hasPathBasedUrlParameters()) {
// for (Map.Entry<String, String> entry :
// lookupQueryInfo.getPathBasedUrlParameters().entrySet()) {
// String pathParam = "{" + entry.getKey() + "}";
// int startIndex = resolvedUrl.indexOf(pathParam);
// if (startIndex == -1) {
// throw new FlinkRuntimeException(
// "Unexpected error while parsing the URL for path parameters.");
// }
// int endIndex = startIndex + pathParam.length();
// resolvedUrl = resolvedUrl.replace(startIndex, endIndex, entry.getValue());
// }
// }

try {
return new URIBuilder(resolvedUrl.toString()).build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
// try {
// return new URIBuilder(resolvedUrl.toString()).build();
// } catch (URISyntaxException e) {
// throw new RuntimeException(e);
// }
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private PollingClientFactory<RowData> createPollingClientFactory(
lookupQueryCreator,
headerPreprocessor,
lookupConfig) :
new BodyBasedRequestFactory(
new BodyAndPathBasedRequestFactory(
lookupMethod,
lookupQueryCreator,
headerPreprocessor,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

import com.getindata.connectors.http.internal.utils.uri.URIBuilder;
import lombok.Getter;
import lombok.ToString;

import com.getindata.connectors.http.internal.utils.uri.NameValuePair;
import com.getindata.connectors.http.internal.utils.uri.URLEncodedUtils;
import org.apache.flink.util.FlinkRuntimeException;

/**
* Holds the lookup query for an HTTP request.
Expand All @@ -26,32 +31,71 @@ public class LookupQueryInfo implements Serializable {

private final Map<String, String> bodyBasedUrlQueryParams;

private final Map<String, String> pathBasedUrlParams;

public LookupQueryInfo(String lookupQuery) {
this(lookupQuery, null);
this(lookupQuery, null, null);
}

public LookupQueryInfo(String lookupQuery, Map<String, String> bodyBasedUrlQueryParams) {
public LookupQueryInfo(String lookupQuery, Map<String, String> bodyBasedUrlQueryParams,
Map<String, String> pathBasedUrlParams) {
this.lookupQuery =
lookupQuery == null ? "" : lookupQuery;
this.bodyBasedUrlQueryParams =
bodyBasedUrlQueryParams == null ? Collections.emptyMap() : bodyBasedUrlQueryParams;
this.pathBasedUrlParams =
pathBasedUrlParams == null ? Collections.emptyMap() : pathBasedUrlParams;
}

public String getBodyBasedUrlQueryParameters() {
return URLEncodedUtils.format(
bodyBasedUrlQueryParams
.entrySet()
.stream()
// sort the map by key to ensure there is a reliable order for unit tests
.sorted(Map.Entry.comparingByKey())
.map(entry -> new NameValuePair(entry.getKey(), entry.getValue()))
.collect(Collectors.toList()),
StandardCharsets.UTF_8);
}

public Map<String, String> getPathBasedUrlParameters() {
return pathBasedUrlParams;
}

public boolean hasLookupQuery() {
return !lookupQuery.isBlank();
}
public boolean hasBodyBasedUrlQueryParameters() {
return !bodyBasedUrlQueryParams.isEmpty();
}
public boolean hasPathBasedUrlParameters() {
return !pathBasedUrlParams.isEmpty();
}

public URI getURI() {
StringBuilder resolvedUrl = new StringBuilder(lookupQuery);
if (this.hasBodyBasedUrlQueryParameters()) {
resolvedUrl.append(lookupQuery.contains("?") ? "&" : "?")
.append(this.getBodyBasedUrlQueryParameters());
}
if (this.hasPathBasedUrlParameters()) {
for (Map.Entry<String, String> entry :
this.getPathBasedUrlParameters().entrySet()) {
String pathParam = "{" + entry.getKey() + "}";
int startIndex = resolvedUrl.indexOf(pathParam);
if (startIndex == -1) {
throw new FlinkRuntimeException(
"Unexpected error while parsing the URL for path parameters.");
}
int endIndex = startIndex + pathParam.length();
resolvedUrl = resolvedUrl.replace(startIndex, endIndex, entry.getValue());
}
}
try {
return new URIBuilder(resolvedUrl.toString()).build();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ private GetRequestFactory setUpGetRequestFactory(Properties properties) {
);
}

private BodyBasedRequestFactory setUpBodyRequestFactory(
private BodyAndPathBasedRequestFactory setUpBodyRequestFactory(
String methodName,
Properties properties) {

Expand All @@ -345,7 +345,7 @@ private BodyBasedRequestFactory setUpBodyRequestFactory(
boolean useRawAuthHeader = Boolean.parseBoolean(
(String)properties.get(HttpConnectorConfigConstants.LOOKUP_SOURCE_HEADER_USE_RAW));

return new BodyBasedRequestFactory(
return new BodyAndPathBasedRequestFactory(
methodName,
new GenericJsonQueryCreator(jsonSerializer),
HttpHeaderUtils.createDefaultHeaderPreprocessor(useRawAuthHeader),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void shouldBuildBodyBasedClientUri() {
.createEncodingFormat(dynamicTableFactoryContext, new Configuration())
.createRuntimeEncoder(null, lookupPhysicalDataType);

BodyBasedRequestFactory requestFactory = new BodyBasedRequestFactory(
BodyAndPathBasedRequestFactory requestFactory = new BodyAndPathBasedRequestFactory(
"POST",
new GenericJsonQueryCreator(jsonSerializer),
HttpHeaderUtils.createDefaultHeaderPreprocessor(),
Expand All @@ -165,7 +165,7 @@ public void shouldBuildBodyBasedClientUri() {
urlBodyBasedQueryParameters.put("key1", "value1");
urlBodyBasedQueryParameters.put("key2", "value2");

LookupQueryInfo lookupQueryInfo = new LookupQueryInfo("{}", urlBodyBasedQueryParameters);
LookupQueryInfo lookupQueryInfo = new LookupQueryInfo("{}", urlBodyBasedQueryParameters, null);

// WHEN
HttpRequest httpRequest = requestFactory.setUpRequestMethod(lookupQueryInfo).build();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
package com.getindata.connectors.http.internal.table.lookup;

import java.net.URI;
import java.util.Collection;
import java.util.Map;

import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;

import static org.assertj.core.api.Assertions.assertThat;

class LookupQueryInfoTest {
Expand All @@ -14,7 +21,7 @@ public void testConfiguredLookupQuery() {
String lookupQuery = "{\"param1\": \"value1\"}";
Map<String, String> bodyBasedUrlQueryParameters = Map.of("key1", "value1");

lookupQueryInfo = new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParameters);
lookupQueryInfo = new LookupQueryInfo(lookupQuery, bodyBasedUrlQueryParameters, null);

assertThat(lookupQueryInfo.hasLookupQuery()).isTrue();
assertThat(lookupQueryInfo.getLookupQuery()).isEqualTo(lookupQuery);
Expand All @@ -23,7 +30,7 @@ public void testConfiguredLookupQuery() {
}
@Test
public void testEmptyLookupQueryInfo() {
lookupQueryInfo = new LookupQueryInfo(null, null);
lookupQueryInfo = new LookupQueryInfo(null, null, null);

assertThat(lookupQueryInfo.hasLookupQuery()).isFalse();
assertThat(lookupQueryInfo.hasBodyBasedUrlQueryParameters()).isFalse();
Expand All @@ -35,10 +42,132 @@ public void testEmptyLookupQueryInfo() {
public void testBodyBasedUrlQueryParams() {
Map<String, String> bodyBasedUrlQueryParameters = Map.of("key1", "value1");

lookupQueryInfo = new LookupQueryInfo(null, bodyBasedUrlQueryParameters);
lookupQueryInfo = new LookupQueryInfo(null, bodyBasedUrlQueryParameters, null);

assertThat(lookupQueryInfo.hasLookupQuery()).isFalse();
assertThat(lookupQueryInfo.hasBodyBasedUrlQueryParameters()).isTrue();
assertThat(lookupQueryInfo.getBodyBasedUrlQueryParameters()).isEqualTo("key1=value1");
}

@Test
public void testPathBasedUrlParams1() {
Map<String, String> pathBasedUrlPathParameters = Map.of("key1", "value1");

lookupQueryInfo = new LookupQueryInfo("http://service/{key1}", null, pathBasedUrlPathParameters);

assertThat(lookupQueryInfo.hasLookupQuery()).isTrue();
assertThat(lookupQueryInfo.hasPathBasedUrlParameters()).isTrue();
assertThat(lookupQueryInfo.getPathBasedUrlParameters()).isEqualTo(pathBasedUrlPathParameters);
assertThat(lookupQueryInfo.getURI().toString()).isEqualTo("http://service/value1");
}
@Test
public void testQueryAndPathBasedUrlParams2() {
Map<String, String> pathBasedUrlPathParameters = Map.of("key1", "value1", "key2", "value2");
Map<String, String> bodyBasedQueryParameters = Map.of("key3", "value3", "key4", "value4");

lookupQueryInfo = new LookupQueryInfo(null, bodyBasedQueryParameters, pathBasedUrlPathParameters);

assertThat(lookupQueryInfo.hasLookupQuery()).isFalse();
assertThat(lookupQueryInfo.hasPathBasedUrlParameters()).isTrue();
assertThat(lookupQueryInfo.getPathBasedUrlParameters()).isEqualTo(pathBasedUrlPathParameters);
assertThat(lookupQueryInfo.hasBodyBasedUrlQueryParameters()).isTrue();
assertThat(lookupQueryInfo.getBodyBasedUrlQueryParameters()).isEqualTo("key3=value3&key4=value4");

}

@Test
public void testPathBasedUrlParams2() {
Map<String, String> pathBasedUrlPathParameters = Map.of("key1", "value1", "key2", "value2");

lookupQueryInfo = new LookupQueryInfo(null, null, pathBasedUrlPathParameters);

assertThat(lookupQueryInfo.hasLookupQuery()).isFalse();
assertThat(lookupQueryInfo.hasPathBasedUrlParameters()).isTrue();
assertThat(lookupQueryInfo.getPathBasedUrlParameters()).isEqualTo(pathBasedUrlPathParameters);
}

@ParameterizedTest
@MethodSource("configProviderForGetURI")
void testGetUri(LookupQueryInfoTest.TestSpec testSpec) throws Exception {
LookupQueryInfo lookupQueryInfo = new LookupQueryInfo(testSpec.url, testSpec.bodyBasedUrlQueryParams,
testSpec.pathBasedUrlParams);
URI uri = lookupQueryInfo.getURI();
assertThat(uri.toString()).isEqualTo(testSpec.expected);
}

private static class TestSpec {

Map<String, String> bodyBasedUrlQueryParams;
Map<String, String> pathBasedUrlParams;
String url;
String expected;

private TestSpec(Map<String, String> bodyBasedUrlQueryParams,
Map<String, String> pathBasedUrlParams,
String url,
String expected) {
this.bodyBasedUrlQueryParams = bodyBasedUrlQueryParams;
this.pathBasedUrlParams = pathBasedUrlParams;
this.url = url;
this.expected = expected;
}

@Override
public String toString() {
return "TestSpec{"
+ "bodyBasedUrlQueryParams="
+ bodyBasedUrlQueryParams
+ ", pathBasedUrlParams="
+ pathBasedUrlParams
+ ", url="
+ url
+ ", expected="
+ expected
+ '}';
}
}

static Collection<TestSpec> configProviderForGetURI() {
return ImmutableList.<TestSpec>builder()
.addAll(getTestSpecs())
.build();
}

@NotNull
private static ImmutableList<TestSpec> getTestSpecs() {
return ImmutableList.of(
// 1 path param
new TestSpec(
null,
Map. of("param1", "value1"),
"http://service/{param1}",
"http://service/value1"),
// 2 path param
new TestSpec(
null,
Map. of("param1", "value1", "param2", "value2"),
"http://service/{param1}/param2/{param2}",
"http://service/value1/param2/value2"),
// 1 query param
new TestSpec(
Map. of("param3", "value3"),
null,
"http://service",
"http://service?param3=value3"),
// 2 query params
new TestSpec(
Map. of("param3", "value3", "param4", "value4"),
null,
"http://service",
"http://service?param3=value3&param4=value4"),
// 2 query params and 2 path params
new TestSpec(
Map. of("param3", "value3", "param4", "value4"),
Map. of("param1", "value1", "param2", "value2"),
"http://service/{param1}/param2/{param2}",
"http://service/value1/param2/value2?param3=value3&param4=value4")
);
}


}
Loading