From dacf1082289d51a19a091e61e669215b1a776e83 Mon Sep 17 00:00:00 2001 From: David Radley <39792797+davidradl@users.noreply.github.com> Date: Wed, 27 Mar 2024 08:41:01 +0000 Subject: [PATCH] [HTTP-64] move to junit 5 and fix junit failures (#72) Signed-off-by: David Radley --- .github/workflows/build.yml | 2 +- CHANGELOG.md | 4 + pom.xml | 29 ++- .../internal/sink/HttpSinkConnectionTest.java | 175 ++++++++++-------- 4 files changed, 123 insertions(+), 87 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 23f0aec1..d209b75d 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -18,7 +18,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - flink: [ "1.15.0", "1.15.3", "1.16.1" ] + flink: [ "1.16.3", "1.17.2", "1.18.1"] steps: - uses: actions/checkout@v3 diff --git a/CHANGELOG.md b/CHANGELOG.md index 60e7db16..a8b555b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## [Unreleased] +### Fixed + + Moved junit support to junit 5, allowing junits to be run against flink 1.17 and 1.18. + ## [0.12.0] - 2024-03-22 ### Added diff --git a/pom.xml b/pom.xml index e0b98497..c4cab477 100644 --- a/pom.xml +++ b/pom.xml @@ -68,7 +68,7 @@ under the License. - 1.15.0 + 1.16.3 11 2.12 @@ -76,7 +76,9 @@ under the License. ${target.java.version} 2.17.2 1.18.22 - 5.8.1 + 4.13.2 + 5.10.1 + ${junit5.version} 3.21.0 4.0.0 2.27.2 @@ -167,15 +169,21 @@ under the License. org.apache.flink - flink-table-common + flink-test-utils ${flink.version} - test-jar test org.apache.flink - flink-connector-base + flink-table-test-utils + ${flink.version} + test + + + + org.apache.flink + flink-table-common ${flink.version} test-jar test @@ -183,8 +191,9 @@ under the License. org.apache.flink - flink-table-planner_${scala.binary.version} + flink-connector-base ${flink.version} + test-jar test @@ -233,9 +242,9 @@ under the License. - org.junit.jupiter - junit-jupiter-params - ${junit.jupiter.version} + org.junit.vintage + junit-vintage-engine + ${junit5.version} test @@ -323,6 +332,8 @@ under the License. **/HttpLookupConnectorOptions.class **/Slf4jHttpPostRequestCallback.class **/SelfSignedTrustManager.class + org/apache/calcite** + org/apache/flink** diff --git a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java index cac72fe0..cbf33c2f 100644 --- a/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java +++ b/src/test/java/com/getindata/connectors/http/internal/sink/HttpSinkConnectionTest.java @@ -14,15 +14,19 @@ import com.github.tomakehurst.wiremock.stubbing.ServeEvent; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.reporter.MetricReporterFactory; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.testutils.junit.extensions.ContextClassLoaderExtension; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; import static com.github.tomakehurst.wiremock.client.WireMock.*; import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -35,17 +39,26 @@ public class HttpSinkConnectionTest { + @RegisterExtension + static final ContextClassLoaderExtension CONTEXT_CLASS_LOADER_EXTENSION = + ContextClassLoaderExtension.builder() + .withServiceEntry( + MetricReporterFactory.class, + SendErrorsTestReporterFactory.class.getName()) + .build(); + + private static final int SERVER_PORT = 9090; private static final int HTTPS_SERVER_PORT = 8443; private static final Set messageIds = IntStream.range(0, 50) - .boxed() - .collect(Collectors.toSet()); + .boxed() + .collect(Collectors.toSet()); private static final List messages = messageIds.stream() - .map(i -> "{\"http-sink-id\":" + i + "}") - .collect(Collectors.toList()); + .map(i -> "{\"http-sink-id\":" + i + "}") + .collect(Collectors.toList()); private StreamExecutionEnvironment env; @@ -53,13 +66,15 @@ public class HttpSinkConnectionTest { @BeforeEach public void setUp() { - SendErrorsTestReporter.reset(); + SendErrorsTestReporterFactory.reset(); env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration() { { - this.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "sendErrorsTestReporter." + - ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, - SendErrorsTestReporter.class.getName()); + setString( + ConfigConstants.METRICS_REPORTER_PREFIX + + "test." + + MetricOptions.REPORTER_FACTORY_CLASS.key(), + SendErrorsTestReporterFactory.class.getName()); } }); @@ -85,7 +100,7 @@ public void testConnection_singleRequestMode() throws Exception { }; List> responses = - testConnection(SinkRequestSubmitMode.SINGLE, responseMapper); + testConnection(SinkRequestSubmitMode.SINGLE, responseMapper); var idsSet = new HashSet<>(messageIds); for (var request : responses) { @@ -104,14 +119,14 @@ public void testConnection_batchRequestMode() throws Exception { Function>> responseMapper = response -> { try { return new ObjectMapper().readValue(response.getRequest().getBody(), - new TypeReference>>(){}); + new TypeReference>>(){}); } catch (IOException e) { throw new RuntimeException(e); } }; List>> responses = - testConnection(SinkRequestSubmitMode.BATCH, responseMapper); + testConnection(SinkRequestSubmitMode.BATCH, responseMapper); var idsSet = new HashSet<>(messageIds); for (var requests : responses) { @@ -134,39 +149,39 @@ public List testConnection( String contentTypeHeader = "application/json"; wireMockServer.stubFor(any(urlPathEqualTo(endpoint)) - .withHeader("Content-Type", equalTo(contentTypeHeader)) - .willReturn( - aResponse().withHeader("Content-Type", contentTypeHeader) - .withStatus(200) - .withBody("{}"))); + .withHeader("Content-Type", equalTo(contentTypeHeader)) + .willReturn( + aResponse().withHeader("Content-Type", contentTypeHeader) + .withStatus(200) + .withBody("{}"))); var source = env.fromCollection(messages); var httpSink = HttpSink.builder() - .setEndpointUrl("http://localhost:" + SERVER_PORT + endpoint) - .setElementConverter( - (s, _context) -> + .setEndpointUrl("http://localhost:" + SERVER_PORT + endpoint) + .setElementConverter( + (s, _context) -> new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) - .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) - .setProperty( - HttpConnectorConfigConstants.SINK_HEADER_PREFIX + "Content-Type", - contentTypeHeader) - .setProperty( - HttpConnectorConfigConstants.SINK_HTTP_REQUEST_MODE, - mode.getMode() - ) - .build(); + .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .setProperty( + HttpConnectorConfigConstants.SINK_HEADER_PREFIX + "Content-Type", + contentTypeHeader) + .setProperty( + HttpConnectorConfigConstants.SINK_HTTP_REQUEST_MODE, + mode.getMode() + ) + .build(); source.sinkTo(httpSink); env.execute("Http Sink test connection"); var responses = wireMockServer.getAllServeEvents(); assertTrue(responses.stream() - .allMatch(response -> Objects.equals(response.getRequest().getUrl(), endpoint))); + .allMatch(response -> Objects.equals(response.getRequest().getUrl(), endpoint))); assertTrue( - responses.stream().allMatch(response -> response.getResponse().getStatus() == 200)); + responses.stream().allMatch(response -> response.getResponse().getStatus() == 200)); assertTrue(responses.stream() - .allMatch(response -> Objects.equals(response.getRequest().getUrl(), endpoint))); + .allMatch(response -> Objects.equals(response.getRequest().getUrl(), endpoint))); assertTrue( - responses.stream().allMatch(response -> response.getResponse().getStatus() == 200)); + responses.stream().allMatch(response -> response.getResponse().getStatus() == 200)); List collect = responses.stream().map(responseMapper).collect(Collectors.toList()); assertTrue(collect.stream().allMatch(Objects::nonNull)); @@ -176,30 +191,30 @@ public List testConnection( @Test public void testServerErrorConnection() throws Exception { wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) - .inScenario("Retry Scenario") - .whenScenarioStateIs(STARTED) - .willReturn(serverError()) - .willSetStateTo("Cause Success")); + .withHeader("Content-Type", equalTo("application/json")) + .inScenario("Retry Scenario") + .whenScenarioStateIs(STARTED) + .willReturn(serverError()) + .willSetStateTo("Cause Success")); wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) - .inScenario("Retry Scenario") - .whenScenarioStateIs("Cause Success") - .willReturn(aResponse().withStatus(200)) - .willSetStateTo("Cause Success")); + .withHeader("Content-Type", equalTo("application/json")) + .inScenario("Retry Scenario") + .whenScenarioStateIs("Cause Success") + .willReturn(aResponse().withStatus(200)) + .willSetStateTo("Cause Success")); var source = env.fromCollection(List.of(messages.get(0))); var httpSink = HttpSink.builder() - .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") - .setElementConverter( - (s, _context) -> + .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") + .setElementConverter( + (s, _context) -> new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) - .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) - .build(); + .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .build(); source.sinkTo(httpSink); env.execute("Http Sink test failed connection"); - assertEquals(1, SendErrorsTestReporter.getCount()); + assertEquals(1, SendErrorsTestReporterFactory.getCount()); // TODO: reintroduce along with the retries // var postedRequests = wireMockServer // .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); @@ -211,31 +226,32 @@ public void testServerErrorConnection() throws Exception { @Test public void testFailedConnection() throws Exception { wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) - .inScenario("Retry Scenario") - .whenScenarioStateIs(STARTED) - .willReturn(aResponse().withFault(Fault.EMPTY_RESPONSE)) - .willSetStateTo("Cause Success")); + .withHeader("Content-Type", equalTo("application/json")) + .inScenario("Retry Scenario") + .whenScenarioStateIs(STARTED) + .willReturn(aResponse().withFault(Fault.EMPTY_RESPONSE)) + .willSetStateTo("Cause Success")); wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) - .inScenario("Retry Scenario") - .whenScenarioStateIs("Cause Success") - .willReturn(aResponse().withStatus(200)) - .willSetStateTo("Cause Success")); + .withHeader("Content-Type", equalTo("application/json")) + .inScenario("Retry Scenario") + .whenScenarioStateIs("Cause Success") + .willReturn(aResponse().withStatus(200)) + .willSetStateTo("Cause Success")); var source = env.fromCollection(List.of(messages.get(0))); var httpSink = HttpSink.builder() - .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") - .setElementConverter( - (s, _context) -> - new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) - .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) - .build(); + .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") + .setElementConverter( + (s, _context) -> + new HttpSinkRequestEntry("POST", + s.getBytes(StandardCharsets.UTF_8))) + .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .build(); source.sinkTo(httpSink); env.execute("Http Sink test failed connection"); - assertEquals(1, SendErrorsTestReporter.getCount()); + assertEquals(1, SendErrorsTestReporterFactory.getCount()); // var postedRequests = wireMockServer // .findAll(postRequestedFor(urlPathEqualTo("/myendpoint"))); // assertEquals(2, postedRequests.size()); @@ -246,28 +262,28 @@ public void testFailedConnection() throws Exception { @Test public void testFailedConnection404OnWhiteList() throws Exception { wireMockServer.stubFor(any(urlPathEqualTo("/myendpoint")) - .withHeader("Content-Type", equalTo("application/json")) - .willReturn(aResponse().withBody("404 body").withStatus(404))); + .withHeader("Content-Type", equalTo("application/json")) + .willReturn(aResponse().withBody("404 body").withStatus(404))); var source = env.fromCollection(List.of(messages.get(0))); var httpSink = HttpSink.builder() - .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") - .setElementConverter( - (s, _context) -> + .setEndpointUrl("http://localhost:" + SERVER_PORT + "/myendpoint") + .setElementConverter( + (s, _context) -> new HttpSinkRequestEntry("POST", s.getBytes(StandardCharsets.UTF_8))) - .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) - .setProperty("gid.connector.http.sink.error.code.exclude", "404, 405") - .setProperty("gid.connector.http.sink.error.code", "4XX") - .build(); + .setSinkHttpClientBuilder(JavaNetSinkHttpClient::new) + .setProperty("gid.connector.http.sink.error.code.exclude", "404, 405") + .setProperty("gid.connector.http.sink.error.code", "4XX") + .build(); source.sinkTo(httpSink); env.execute("Http Sink test failed connection"); - assertEquals(0, SendErrorsTestReporter.getCount()); + assertEquals(0, SendErrorsTestReporterFactory.getCount()); } // must be public because of the reflection - public static class SendErrorsTestReporter implements MetricReporter { - + public static class SendErrorsTestReporterFactory + implements MetricReporter, MetricReporterFactory { static volatile List numRecordsSendErrors = null; public static long getCount() { @@ -300,5 +316,10 @@ public void notifyOfAddedMetric( @Override public void notifyOfRemovedMetric(Metric metric, String s, MetricGroup metricGroup) { } + + @Override + public MetricReporter createMetricReporter(Properties properties) { + return new SendErrorsTestReporterFactory(); + } } }