From 9f0cf77a54f236bc27b3f11ad758fa842db38586 Mon Sep 17 00:00:00 2001 From: souvik ghosh Date: Thu, 5 Sep 2024 21:10:26 +0530 Subject: [PATCH] Fix and add tests camel flink extension on JVM mode --- extensions-jvm/flink/deployment/pom.xml | 2 +- extensions-jvm/flink/runtime/pom.xml | 12 ++- integration-tests-jvm/flink/pom.xml | 10 ++- .../component/flink/it/FlinkResource.java | 87 +++++++++++++++++-- .../quarkus/component/flink/it/FlinkTest.java | 78 +++++++++++++++-- pom.xml | 1 + 6 files changed, 173 insertions(+), 17 deletions(-) diff --git a/extensions-jvm/flink/deployment/pom.xml b/extensions-jvm/flink/deployment/pom.xml index 37af3bf25280..897b81db4070 100644 --- a/extensions-jvm/flink/deployment/pom.xml +++ b/extensions-jvm/flink/deployment/pom.xml @@ -58,4 +58,4 @@ - + \ No newline at end of file diff --git a/extensions-jvm/flink/runtime/pom.xml b/extensions-jvm/flink/runtime/pom.xml index 9317233d8a9f..090eb344348d 100644 --- a/extensions-jvm/flink/runtime/pom.xml +++ b/extensions-jvm/flink/runtime/pom.xml @@ -50,6 +50,16 @@ io.quarkus quarkus-extension-maven-plugin + + + + org.apache.flink:flink-core + org.apache.flink:flink-rpc-core + org.apache.flink:flink-runtime + + org.apache.maven.plugins @@ -66,4 +76,4 @@ - + \ No newline at end of file diff --git a/integration-tests-jvm/flink/pom.xml b/integration-tests-jvm/flink/pom.xml index 75aa8cb44e85..42236c8c2c82 100644 --- a/integration-tests-jvm/flink/pom.xml +++ b/integration-tests-jvm/flink/pom.xml @@ -51,6 +51,11 @@ rest-assured test + + org.awaitility + awaitility + test + @@ -63,6 +68,9 @@ + org.apache.camel.quarkus camel-quarkus-flink-deployment @@ -80,4 +88,4 @@ - + \ No newline at end of file diff --git a/integration-tests-jvm/flink/src/main/java/org/apache/camel/quarkus/component/flink/it/FlinkResource.java b/integration-tests-jvm/flink/src/main/java/org/apache/camel/quarkus/component/flink/it/FlinkResource.java index 0992ad6aa199..01fe5346ecb7 100644 --- a/integration-tests-jvm/flink/src/main/java/org/apache/camel/quarkus/component/flink/it/FlinkResource.java +++ b/integration-tests-jvm/flink/src/main/java/org/apache/camel/quarkus/component/flink/it/FlinkResource.java @@ -16,14 +16,29 @@ */ package org.apache.camel.quarkus.component.flink.it; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + import jakarta.enterprise.context.ApplicationScoped; import jakarta.inject.Inject; -import jakarta.ws.rs.GET; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; import org.apache.camel.CamelContext; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.component.flink.DataSetCallback; +import org.apache.camel.component.flink.FlinkConstants; +import org.apache.camel.component.flink.Flinks; +import org.apache.camel.component.flink.VoidDataStreamCallback; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.jboss.logging.Logger; @Path("/flink") @@ -33,18 +48,72 @@ public class FlinkResource { private static final Logger LOG = Logger.getLogger(FlinkResource.class); private static final String COMPONENT_FLINK = "flink"; + @Inject CamelContext context; - @Path("/load/component/flink") - @GET + @Inject + ProducerTemplate template; + + String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet"; + String flinkDataStreamUri = "flink:datastream?datastream=#myDataStream"; + + @Path("/dataset/{filePath}") + @POST + @Consumes(MediaType.TEXT_PLAIN) @Produces(MediaType.TEXT_PLAIN) - public Response loadComponentFlink() throws Exception { - /* This is an autogenerated test */ - if (context.getComponent(COMPONENT_FLINK) != null) { - return Response.ok().build(); + public Response dataSetFromTextFile(@PathParam("filePath") String filePath) { + + if (Files.exists(Paths.get(filePath))) { + ExecutionEnvironment env = Flinks.createExecutionEnvironment(); + DataSet myDataSet = env.readTextFile(filePath); + context.getRegistry().bind("myDataSet", myDataSet); + context.getRegistry().bind("countTotal", addDataSetCallback()); + Long totalCount = template.requestBody( + flinkDataSetUri + "&dataSetCallback=#countTotal", null, Long.class); + return Response.ok(totalCount).build(); } - LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_FLINK); - return Response.status(500, COMPONENT_FLINK + " could not be loaded from the Camel context").build(); + + return Response.status(Response.Status.NOT_FOUND).build(); } + + @Path("/datastream/{filePath}") + @POST + @Consumes(MediaType.TEXT_PLAIN) + @Produces(MediaType.TEXT_PLAIN) + public Response loadStream(@PathParam("filePath") String filePath, String data) throws IOException { + java.nio.file.Path path = Paths.get(filePath); + if (path != null) { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + DataStream datastream = env.fromElements(data); + context.getRegistry().bind("myDataStream", datastream); + template.sendBodyAndHeader(flinkDataStreamUri, null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream dataStream, Object... objects) throws Exception { + dataStream.writeAsText(filePath, + org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE); + dataStream.getExecutionEnvironment().execute(); + } + }); + return Response.ok(Files.size(path)).build(); + } + return Response.status(Response.Status.NOT_FOUND).build(); + + } + + DataSetCallback addDataSetCallback() { + return new DataSetCallback() { + @Override + public Object onDataSet(DataSet ds, Object... payloads) { + try { + return ds.count(); + } catch (Exception e) { + return null; + } + } + }; + } + } diff --git a/integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java b/integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java index 8849792af142..dead7c77f7e4 100644 --- a/integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java +++ b/integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java @@ -16,19 +16,87 @@ */ package org.apache.camel.quarkus.component.flink.it; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Duration; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; + import io.quarkus.test.junit.QuarkusTest; import io.restassured.RestAssured; +import io.restassured.http.ContentType; +import org.apache.commons.io.FileUtils; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + @QuarkusTest class FlinkTest { @Test - public void loadComponentFlink() { - /* A simple autogenerated test */ - RestAssured.get("/flink/load/component/flink") - .then() - .statusCode(200); + public void dataSetCallback() throws IOException { + Path path = Files.createTempFile("fileDataSet", ".txt"); + try { + String text = "foo\n" + + "bar\n" + + "baz\n" + + "qux\n" + + "quux"; + Files.writeString(path, text); + RestAssured.given() + .contentType(ContentType.TEXT) + .post("/flink/dataset/{filePath}", path.toAbsolutePath().toString()) + .then() + .statusCode(200) + .and() + .body(greaterThanOrEqualTo("5")); + + } finally { + try { + Files.deleteIfExists(path); + } catch (Exception e) { + // Do nothing + } + } } + @Test + public void dataStreamCallback() throws IOException { + Path path = Files.createTempFile("fileDataStream", ".txt"); + try { + String text = "Hello!!Camel flink!"; + RestAssured.given() + .contentType(ContentType.TEXT) + .body(text) + .post("/flink/datastream/{filePath}", path.toAbsolutePath().toString()) + .then() + .statusCode(200); + + Awaitility.await() + .pollInterval(Duration.ofMillis(250)) + .atMost(10, TimeUnit.SECONDS) + .until(() -> { + if (Files.isDirectory(path)) { + try (Stream walk = Files.walk(path)) { + return walk.filter(Files::isRegularFile).anyMatch(filePath -> { + try { + if (Files.size(filePath) > 0) { + String content = Files.readString(filePath); + return content.trim().equals(text); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return false; + }); + } + } + return false; + }); + } finally { + FileUtils.deleteQuietly(path.toFile()); + } + } } diff --git a/pom.xml b/pom.xml index a66ceb0c4838..2a007e51e5c0 100644 --- a/pom.xml +++ b/pom.xml @@ -95,6 +95,7 @@ ${metrics-version} ${eddsa-version} 0.5.0 + ${flink-version} 2.3.33 0.6.2 1.313