Skip to content

Commit

Permalink
sUpdating with 2nd patch
Browse files Browse the repository at this point in the history
  • Loading branch information
davidradl committed Mar 25, 2024
1 parent c28db64 commit a1a9460
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 78 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ under the License.

<!-- IMPORTANT: If you update Flink, remember to update link to its docs in maven-javadoc-plugin <links>
section, omitting the patch part (so for 1.15.0 use 1.15). -->
<flink.version>1.18.1</flink.version>
<flink.version>1.16.3</flink.version>

<target.java.version>11</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@
import com.github.tomakehurst.wiremock.stubbing.ServeEvent;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.MetricReporterFactory;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -35,31 +39,42 @@

public class HttpSinkConnectionTest {

@RegisterExtension
static final ContextClassLoaderExtension CONTEXT_CLASS_LOADER_EXTENSION =
ContextClassLoaderExtension.builder()
.withServiceEntry(
MetricReporterFactory.class,
SendErrorsTestReporterFactory.class.getName())
.build();


private static final int SERVER_PORT = 9090;

private static final int HTTPS_SERVER_PORT = 8443;

private static final Set<Integer> messageIds = IntStream.range(0, 50)
.boxed()
.collect(Collectors.toSet());
.boxed()
.collect(Collectors.toSet());

private static final List<String> messages = messageIds.stream()
.map(i -> "{\"http-sink-id\":" + i + "}")
.collect(Collectors.toList());
.map(i -> "{\"http-sink-id\":" + i + "}")
.collect(Collectors.toList());

private StreamExecutionEnvironment env;

private WireMockServer wireMockServer;

@BeforeEach
public void setUp() {
SendErrorsTestReporter.reset();
SendErrorsTestReporterFactory.reset();

env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration() {
{
this.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "sendErrorsTestReporter." +
ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX,
SendErrorsTestReporter.class.getName());
setString(
ConfigConstants.METRICS_REPORTER_PREFIX
+ "test."
+ MetricOptions.REPORTER_FACTORY_CLASS.key(),
SendErrorsTestReporterFactory.class.getName());
}
});

Expand All @@ -85,7 +100,7 @@ public void testConnection_singleRequestMode() throws Exception {
};

List<Map<Object, Object>> responses =
testConnection(SinkRequestSubmitMode.SINGLE, responseMapper);
testConnection(SinkRequestSubmitMode.SINGLE, responseMapper);

var idsSet = new HashSet<>(messageIds);
for (var request : responses) {
Expand All @@ -104,14 +119,14 @@ public void testConnection_batchRequestMode() throws Exception {
Function<ServeEvent, List<Map<Object, Object>>> responseMapper = response -> {
try {
return new ObjectMapper().readValue(response.getRequest().getBody(),
new TypeReference<List<Map<Object, Object>>>(){});
new TypeReference<List<Map<Object, Object>>>(){});
} catch (IOException e) {
throw new RuntimeException(e);
}
};

List<List<Map<Object, Object>>> responses =
testConnection(SinkRequestSubmitMode.BATCH, responseMapper);
testConnection(SinkRequestSubmitMode.BATCH, responseMapper);

var idsSet = new HashSet<>(messageIds);
for (var requests : responses) {
Expand All @@ -134,39 +149,39 @@ public <T> List<T> testConnection(
String contentTypeHeader = "application/json";

wireMockServer.stubFor(any(urlPathEqualTo(endpoint))
.withHeader("Content-Type", equalTo(contentTypeHeader))
.willReturn(
aResponse().withHeader("Content-Type", contentTypeHeader)
.withStatus(200)
.withBody("{}")));
.withHeader("Content-Type", equalTo(contentTypeHeader))
.willReturn(
aResponse().withHeader("Content-Type", contentTypeHeader)
.withStatus(200)
.withBody("{}")));

var source = env.fromCollection(messages);
var httpSink = HttpSink.<String>builder()
.setEndpointUrl("http://localhost:" + SERVER_PORT + endpoint)
.setElementConverter(
(s, _context) ->
.setEndpointUrl("http://localhost:" + SERVER_PORT + endpoint)
.setElementConverter(
(s, _context) ->
new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8)))
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.setProperty(
HttpConnectorConfigConstants.SINK_HEADER_PREFIX + "Content-Type",
contentTypeHeader)
.setProperty(
HttpConnectorConfigConstants.SINK_HTTP_REQUEST_MODE,
mode.getMode()
)
.build();
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.setProperty(
HttpConnectorConfigConstants.SINK_HEADER_PREFIX + "Content-Type",
contentTypeHeader)
.setProperty(
HttpConnectorConfigConstants.SINK_HTTP_REQUEST_MODE,
mode.getMode()
)
.build();
source.sinkTo(httpSink);
env.execute("Http Sink test connection");

var responses = wireMockServer.getAllServeEvents();
assertTrue(responses.stream()
.allMatch(response -> Objects.equals(response.getRequest().getUrl(), endpoint)));
.allMatch(response -> Objects.equals(response.getRequest().getUrl(), endpoint)));
assertTrue(
responses.stream().allMatch(response -> response.getResponse().getStatus() == 200));
responses.stream().allMatch(response -> response.getResponse().getStatus() == 200));
assertTrue(responses.stream()
.allMatch(response -> Objects.equals(response.getRequest().getUrl(), endpoint)));
.allMatch(response -> Objects.equals(response.getRequest().getUrl(), endpoint)));
assertTrue(
responses.stream().allMatch(response -> response.getResponse().getStatus() == 200));
responses.stream().allMatch(response -> response.getResponse().getStatus() == 200));

List<T> collect = responses.stream().map(responseMapper).collect(Collectors.toList());
assertTrue(collect.stream().allMatch(Objects::nonNull));
Expand All @@ -176,30 +191,30 @@ public <T> List<T> testConnection(
@Test
public void testServerErrorConnection() throws Exception {
wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint"))
.withHeader("Content-Type", equalTo("application/json"))
.inScenario("Retry Scenario")
.whenScenarioStateIs(STARTED)
.willReturn(serverError())
.willSetStateTo("Cause Success"));
.withHeader("Content-Type", equalTo("application/json"))
.inScenario("Retry Scenario")
.whenScenarioStateIs(STARTED)
.willReturn(serverError())
.willSetStateTo("Cause Success"));
wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint"))
.withHeader("Content-Type", equalTo("application/json"))
.inScenario("Retry Scenario")
.whenScenarioStateIs("Cause Success")
.willReturn(aResponse().withStatus(200))
.willSetStateTo("Cause Success"));
.withHeader("Content-Type", equalTo("application/json"))
.inScenario("Retry Scenario")
.whenScenarioStateIs("Cause Success")
.willReturn(aResponse().withStatus(200))
.willSetStateTo("Cause Success"));

var source = env.fromCollection(List.of(messages.get(0)));
var httpSink = HttpSink.<String>builder()
.setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint")
.setElementConverter(
(s, _context) ->
.setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint")
.setElementConverter(
(s, _context) ->
new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8)))
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.build();
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.build();
source.sinkTo(httpSink);
env.execute("Http Sink test failed connection");

assertEquals(1, SendErrorsTestReporter.getCount());
assertEquals(1, SendErrorsTestReporterFactory.getCount());
// TODO: reintroduce along with the retries
// var postedRequests = wireMockServer
// .findAll(postRequestedFor(urlPathEqualTo("/myendpoint")));
Expand All @@ -211,31 +226,32 @@ public void testServerErrorConnection() throws Exception {
@Test
public void testFailedConnection() throws Exception {
wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint"))
.withHeader("Content-Type", equalTo("application/json"))
.inScenario("Retry Scenario")
.whenScenarioStateIs(STARTED)
.willReturn(aResponse().withFault(Fault.EMPTY_RESPONSE))
.willSetStateTo("Cause Success"));
.withHeader("Content-Type", equalTo("application/json"))
.inScenario("Retry Scenario")
.whenScenarioStateIs(STARTED)
.willReturn(aResponse().withFault(Fault.EMPTY_RESPONSE))
.willSetStateTo("Cause Success"));

wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint"))
.withHeader("Content-Type", equalTo("application/json"))
.inScenario("Retry Scenario")
.whenScenarioStateIs("Cause Success")
.willReturn(aResponse().withStatus(200))
.willSetStateTo("Cause Success"));
.withHeader("Content-Type", equalTo("application/json"))
.inScenario("Retry Scenario")
.whenScenarioStateIs("Cause Success")
.willReturn(aResponse().withStatus(200))
.willSetStateTo("Cause Success"));

var source = env.fromCollection(List.of(messages.get(0)));
var httpSink = HttpSink.<String>builder()
.setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint")
.setElementConverter(
(s, _context) ->
new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8)))
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.build();
.setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint")
.setElementConverter(
(s, _context) ->
new HttpSinkRequestEntry("POST",
s.getBytes(StandardCharsets.UTF_8)))
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.build();
source.sinkTo(httpSink);
env.execute("Http Sink test failed connection");

assertEquals(1, SendErrorsTestReporter.getCount());
assertEquals(1, SendErrorsTestReporterFactory.getCount());
// var postedRequests = wireMockServer
// .findAll(postRequestedFor(urlPathEqualTo("/myendpoint")));
// assertEquals(2, postedRequests.size());
Expand All @@ -246,28 +262,28 @@ public void testFailedConnection() throws Exception {
@Test
public void testFailedConnection404OnWhiteList() throws Exception {
wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint"))
.withHeader("Content-Type", equalTo("application/json"))
.willReturn(aResponse().withBody("404 body").withStatus(404)));
.withHeader("Content-Type", equalTo("application/json"))
.willReturn(aResponse().withBody("404 body").withStatus(404)));

var source = env.fromCollection(List.of(messages.get(0)));
var httpSink = HttpSink.<String>builder()
.setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint")
.setElementConverter(
(s, _context) ->
.setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint")
.setElementConverter(
(s, _context) ->
new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8)))
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.setProperty("gid.connector.http.sink.error.code.exclude", "404, 405")
.setProperty("gid.connector.http.sink.error.code", "4XX")
.build();
.setSinkHttpClientBuilder(JavaNetSinkHttpClient::new)
.setProperty("gid.connector.http.sink.error.code.exclude", "404, 405")
.setProperty("gid.connector.http.sink.error.code", "4XX")
.build();
source.sinkTo(httpSink);
env.execute("Http Sink test failed connection");

assertEquals(0, SendErrorsTestReporter.getCount());
assertEquals(0, SendErrorsTestReporterFactory.getCount());
}

// must be public because of the reflection
public static class SendErrorsTestReporter implements MetricReporter {

public static class SendErrorsTestReporterFactory
implements MetricReporter, MetricReporterFactory {
static volatile List<Counter> numRecordsSendErrors = null;

public static long getCount() {
Expand Down Expand Up @@ -300,5 +316,10 @@ public void notifyOfAddedMetric(
@Override
public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) {
}

@Override
public MetricReporter createMetricReporter(Properties properties) {
return new SendErrorsTestReporterFactory();
}
}
}

0 comments on commit a1a9460

Please sign in to comment.