diff --git a/extensions-jvm/flink/deployment/pom.xml b/extensions-jvm/flink/deployment/pom.xml
index 37af3bf25280..897b81db4070 100644
--- a/extensions-jvm/flink/deployment/pom.xml
+++ b/extensions-jvm/flink/deployment/pom.xml
@@ -58,4 +58,4 @@
-
+
\ No newline at end of file
diff --git a/extensions-jvm/flink/runtime/pom.xml b/extensions-jvm/flink/runtime/pom.xml
index 9317233d8a9f..090eb344348d 100644
--- a/extensions-jvm/flink/runtime/pom.xml
+++ b/extensions-jvm/flink/runtime/pom.xml
@@ -50,6 +50,16 @@
io.quarkus
quarkus-extension-maven-plugin
+
+
+
+ org.apache.flink:flink-core
+ org.apache.flink:flink-rpc-core
+ org.apache.flink:flink-runtime
+
+
org.apache.maven.plugins
@@ -66,4 +76,4 @@
-
+
\ No newline at end of file
diff --git a/integration-tests-jvm/flink/pom.xml b/integration-tests-jvm/flink/pom.xml
index 75aa8cb44e85..42236c8c2c82 100644
--- a/integration-tests-jvm/flink/pom.xml
+++ b/integration-tests-jvm/flink/pom.xml
@@ -51,6 +51,11 @@
rest-assured
test
+
+ org.awaitility
+ awaitility
+ test
+
@@ -63,6 +68,9 @@
+
org.apache.camel.quarkus
camel-quarkus-flink-deployment
@@ -80,4 +88,4 @@
-
+
\ No newline at end of file
diff --git a/integration-tests-jvm/flink/src/main/java/org/apache/camel/quarkus/component/flink/it/FlinkResource.java b/integration-tests-jvm/flink/src/main/java/org/apache/camel/quarkus/component/flink/it/FlinkResource.java
index 0992ad6aa199..01fe5346ecb7 100644
--- a/integration-tests-jvm/flink/src/main/java/org/apache/camel/quarkus/component/flink/it/FlinkResource.java
+++ b/integration-tests-jvm/flink/src/main/java/org/apache/camel/quarkus/component/flink/it/FlinkResource.java
@@ -16,14 +16,29 @@
*/
package org.apache.camel.quarkus.component.flink.it;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
-import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Consumes;
+import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.camel.CamelContext;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.component.flink.DataSetCallback;
+import org.apache.camel.component.flink.FlinkConstants;
+import org.apache.camel.component.flink.Flinks;
+import org.apache.camel.component.flink.VoidDataStreamCallback;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.jboss.logging.Logger;
@Path("/flink")
@@ -33,18 +48,72 @@ public class FlinkResource {
private static final Logger LOG = Logger.getLogger(FlinkResource.class);
private static final String COMPONENT_FLINK = "flink";
+
@Inject
CamelContext context;
- @Path("/load/component/flink")
- @GET
+ @Inject
+ ProducerTemplate template;
+
+ String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet";
+ String flinkDataStreamUri = "flink:datastream?datastream=#myDataStream";
+
+ @Path("/dataset/{filePath}")
+ @POST
+ @Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
- public Response loadComponentFlink() throws Exception {
- /* This is an autogenerated test */
- if (context.getComponent(COMPONENT_FLINK) != null) {
- return Response.ok().build();
+ public Response dataSetFromTextFile(@PathParam("filePath") String filePath) {
+
+ if (Files.exists(Paths.get(filePath))) {
+ ExecutionEnvironment env = Flinks.createExecutionEnvironment();
+ DataSet myDataSet = env.readTextFile(filePath);
+ context.getRegistry().bind("myDataSet", myDataSet);
+ context.getRegistry().bind("countTotal", addDataSetCallback());
+ Long totalCount = template.requestBody(
+ flinkDataSetUri + "&dataSetCallback=#countTotal", null, Long.class);
+ return Response.ok(totalCount).build();
}
- LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_FLINK);
- return Response.status(500, COMPONENT_FLINK + " could not be loaded from the Camel context").build();
+
+ return Response.status(Response.Status.NOT_FOUND).build();
}
+
+ @Path("/datastream/{filePath}")
+ @POST
+ @Consumes(MediaType.TEXT_PLAIN)
+ @Produces(MediaType.TEXT_PLAIN)
+ public Response loadStream(@PathParam("filePath") String filePath, String data) throws IOException {
+ java.nio.file.Path path = Paths.get(filePath);
+ if (path != null) {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream datastream = env.fromElements(data);
+ context.getRegistry().bind("myDataStream", datastream);
+ template.sendBodyAndHeader(flinkDataStreamUri, null,
+ FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
+ new VoidDataStreamCallback() {
+ @Override
+ public void doOnDataStream(DataStream dataStream, Object... objects) throws Exception {
+ dataStream.writeAsText(filePath,
+ org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
+ dataStream.getExecutionEnvironment().execute();
+ }
+ });
+ return Response.ok(Files.size(path)).build();
+ }
+ return Response.status(Response.Status.NOT_FOUND).build();
+
+ }
+
+ DataSetCallback addDataSetCallback() {
+ return new DataSetCallback() {
+ @Override
+ public Object onDataSet(DataSet ds, Object... payloads) {
+ try {
+ return ds.count();
+ } catch (Exception e) {
+ return null;
+ }
+ }
+ };
+ }
+
}
diff --git a/integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java b/integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java
index 8849792af142..dead7c77f7e4 100644
--- a/integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java
+++ b/integration-tests-jvm/flink/src/test/java/org/apache/camel/quarkus/component/flink/it/FlinkTest.java
@@ -16,19 +16,87 @@
*/
package org.apache.camel.quarkus.component.flink.it;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+
import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
+import io.restassured.http.ContentType;
+import org.apache.commons.io.FileUtils;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
@QuarkusTest
class FlinkTest {
@Test
- public void loadComponentFlink() {
- /* A simple autogenerated test */
- RestAssured.get("/flink/load/component/flink")
- .then()
- .statusCode(200);
+ public void dataSetCallback() throws IOException {
+ Path path = Files.createTempFile("fileDataSet", ".txt");
+ try {
+ String text = "foo\n"
+ + "bar\n"
+ + "baz\n"
+ + "qux\n"
+ + "quux";
+ Files.writeString(path, text);
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .post("/flink/dataset/{filePath}", path.toAbsolutePath().toString())
+ .then()
+ .statusCode(200)
+ .and()
+ .body(greaterThanOrEqualTo("5"));
+
+ } finally {
+ try {
+ Files.deleteIfExists(path);
+ } catch (Exception e) {
+ // Do nothing
+ }
+ }
}
+ @Test
+ public void dataStreamCallback() throws IOException {
+ Path path = Files.createTempFile("fileDataStream", ".txt");
+ try {
+ String text = "Hello!!Camel flink!";
+ RestAssured.given()
+ .contentType(ContentType.TEXT)
+ .body(text)
+ .post("/flink/datastream/{filePath}", path.toAbsolutePath().toString())
+ .then()
+ .statusCode(200);
+
+ Awaitility.await()
+ .pollInterval(Duration.ofMillis(250))
+ .atMost(10, TimeUnit.SECONDS)
+ .until(() -> {
+ if (Files.isDirectory(path)) {
+ try (Stream walk = Files.walk(path)) {
+ return walk.filter(Files::isRegularFile).anyMatch(filePath -> {
+ try {
+ if (Files.size(filePath) > 0) {
+ String content = Files.readString(filePath);
+ return content.trim().equals(text);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ return false;
+ });
+ }
+ }
+ return false;
+ });
+ } finally {
+ FileUtils.deleteQuietly(path.toFile());
+ }
+ }
}
diff --git a/pom.xml b/pom.xml
index a66ceb0c4838..2a007e51e5c0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,7 @@
${metrics-version}
${eddsa-version}
0.5.0
+ ${flink-version}
2.3.33
0.6.2
1.313