Skip to content

Commit

Permalink
Fix and add tests camel flink extension on JVM mode
Browse files Browse the repository at this point in the history
  • Loading branch information
svkcemk committed Sep 5, 2024
1 parent 6e3a51d commit 9f0cf77
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 17 deletions.
2 changes: 1 addition & 1 deletion extensions-jvm/flink/deployment/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,4 @@
</plugins>
</build>

</project>
</project>
12 changes: 11 additions & 1 deletion extensions-jvm/flink/runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,16 @@
<plugin>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-extension-maven-plugin</artifactId>
<configuration>
<!--Flink
uses Inverted Class Loading which causing issue for the Quarkus classloading
mechanism -->
<parentFirstArtifacts>
<parentFirstArtifact>org.apache.flink:flink-core</parentFirstArtifact>
<parentFirstArtifact>org.apache.flink:flink-rpc-core</parentFirstArtifact>
<parentFirstArtifact>org.apache.flink:flink-runtime</parentFirstArtifact>
</parentFirstArtifacts>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
Expand All @@ -66,4 +76,4 @@
</plugin>
</plugins>
</build>
</project>
</project>
10 changes: 9 additions & 1 deletion integration-tests-jvm/flink/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand All @@ -63,6 +68,9 @@
</activation>
<dependencies>
<!-- The following dependencies guarantee that this module is built after them. You can update them by running `mvn process-resources -Pformat -N` from the source tree root directory -->
<!-- The following dependencies guarantee that this module is built after them. You
can update them by running `mvn process-resources -Pformat -N` from the source tree
root directory -->
<dependency>
<groupId>org.apache.camel.quarkus</groupId>
<artifactId>camel-quarkus-flink-deployment</artifactId>
Expand All @@ -80,4 +88,4 @@
</profile>
</profiles>

</project>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,29 @@
*/
package org.apache.camel.quarkus.component.flink.it;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.component.flink.DataSetCallback;
import org.apache.camel.component.flink.FlinkConstants;
import org.apache.camel.component.flink.Flinks;
import org.apache.camel.component.flink.VoidDataStreamCallback;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.jboss.logging.Logger;

@Path("/flink")
Expand All @@ -33,18 +48,72 @@ public class FlinkResource {
private static final Logger LOG = Logger.getLogger(FlinkResource.class);

private static final String COMPONENT_FLINK = "flink";

@Inject
CamelContext context;

@Path("/load/component/flink")
@GET
@Inject
ProducerTemplate template;

String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet";
String flinkDataStreamUri = "flink:datastream?datastream=#myDataStream";

@Path("/dataset/{filePath}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response loadComponentFlink() throws Exception {
/* This is an autogenerated test */
if (context.getComponent(COMPONENT_FLINK) != null) {
return Response.ok().build();
public Response dataSetFromTextFile(@PathParam("filePath") String filePath) {

if (Files.exists(Paths.get(filePath))) {
ExecutionEnvironment env = Flinks.createExecutionEnvironment();
DataSet<String> myDataSet = env.readTextFile(filePath);
context.getRegistry().bind("myDataSet", myDataSet);
context.getRegistry().bind("countTotal", addDataSetCallback());
Long totalCount = template.requestBody(
flinkDataSetUri + "&dataSetCallback=#countTotal", null, Long.class);
return Response.ok(totalCount).build();
}
LOG.warnf("Could not load [%s] from the Camel context", COMPONENT_FLINK);
return Response.status(500, COMPONENT_FLINK + " could not be loaded from the Camel context").build();

return Response.status(Response.Status.NOT_FOUND).build();
}

@Path("/datastream/{filePath}")
@POST
@Consumes(MediaType.TEXT_PLAIN)
@Produces(MediaType.TEXT_PLAIN)
public Response loadStream(@PathParam("filePath") String filePath, String data) throws IOException {
java.nio.file.Path path = Paths.get(filePath);
if (path != null) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> datastream = env.fromElements(data);
context.getRegistry().bind("myDataStream", datastream);
template.sendBodyAndHeader(flinkDataStreamUri, null,
FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER,
new VoidDataStreamCallback() {
@Override
public void doOnDataStream(DataStream dataStream, Object... objects) throws Exception {
dataStream.writeAsText(filePath,
org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
dataStream.getExecutionEnvironment().execute();
}
});
return Response.ok(Files.size(path)).build();
}
return Response.status(Response.Status.NOT_FOUND).build();

}

DataSetCallback addDataSetCallback() {
return new DataSetCallback() {
@Override
public Object onDataSet(DataSet ds, Object... payloads) {
try {
return ds.count();
} catch (Exception e) {
return null;
}
}
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,87 @@
*/
package org.apache.camel.quarkus.component.flink.it;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

import io.quarkus.test.junit.QuarkusTest;
import io.restassured.RestAssured;
import io.restassured.http.ContentType;
import org.apache.commons.io.FileUtils;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import static org.hamcrest.Matchers.greaterThanOrEqualTo;

@QuarkusTest
class FlinkTest {

@Test
public void loadComponentFlink() {
/* A simple autogenerated test */
RestAssured.get("/flink/load/component/flink")
.then()
.statusCode(200);
public void dataSetCallback() throws IOException {
Path path = Files.createTempFile("fileDataSet", ".txt");
try {
String text = "foo\n"
+ "bar\n"
+ "baz\n"
+ "qux\n"
+ "quux";
Files.writeString(path, text);
RestAssured.given()
.contentType(ContentType.TEXT)
.post("/flink/dataset/{filePath}", path.toAbsolutePath().toString())
.then()
.statusCode(200)
.and()
.body(greaterThanOrEqualTo("5"));

} finally {
try {
Files.deleteIfExists(path);
} catch (Exception e) {
// Do nothing
}
}
}

@Test
public void dataStreamCallback() throws IOException {
Path path = Files.createTempFile("fileDataStream", ".txt");
try {
String text = "Hello!!Camel flink!";
RestAssured.given()
.contentType(ContentType.TEXT)
.body(text)
.post("/flink/datastream/{filePath}", path.toAbsolutePath().toString())
.then()
.statusCode(200);

Awaitility.await()
.pollInterval(Duration.ofMillis(250))
.atMost(10, TimeUnit.SECONDS)
.until(() -> {
if (Files.isDirectory(path)) {
try (Stream<Path> walk = Files.walk(path)) {
return walk.filter(Files::isRegularFile).anyMatch(filePath -> {
try {
if (Files.size(filePath) > 0) {
String content = Files.readString(filePath);
return content.trim().equals(text);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return false;
});
}
}
return false;
});
} finally {
FileUtils.deleteQuietly(path.toFile());
}
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
<dropwizard-metrics.version>${metrics-version}</dropwizard-metrics.version>
<eddsa.version>${eddsa-version}</eddsa.version>
<eclipse-transformer.version>0.5.0</eclipse-transformer.version>
<flink.version>${flink-version}</flink.version>
<freemarker.version>2.3.33</freemarker.version><!-- @sync io.quarkiverse.freemarker:quarkus-freemarker-parent:${quarkiverse-freemarker.version} prop:freemarker.version -->
<geny.version>0.6.2</geny.version>
<github-api.version>1.313</github-api.version><!-- Used in a Groovy script bellow -->
Expand Down

0 comments on commit 9f0cf77

Please sign in to comment.