A maven project to easily develop new QoOPipelines for the iQAS platform.
In total, 5 Github projects form the iQAS ecosystem:
- iqas-platform
The QoO-aware platform that allows consumers to integrate many observation sources, submit requests with QoO constraints and visualize QoO in real-time. - virtual-sensor-container
A shippable Virtual Sensor Container (VSC) Docker image for the iQAS platform. VSCs allow to generate observations at random, from file, or to retrieve them from the Web. - virtual-app-consumer
A shippable Virtual Application Consumers (VAC) Docker image for the iQAS platform. VACs allow to emulate fake consumers that submit iQAS requests and consume observations while logging the perceived QoO in real-time. - iqas-ontology
Ontological model and examples for the QoOonto ontology, the core ontology used by iQAS. - iqas-pipelines (this project)
An example of a custom-developed QoO Pipeline for the iQAS platform.
- Java (
1.8
) - Apache Maven (
3.3.9
)
- Download, configure and build the iQAS platform. See instructions here.
- Locate the "all-in-one" jar file that has been output when you compiled the iQAS platform.
Generally, it is located at$IQAS_DIR/target/iqas-platform-1.0-SNAPSHOT-allinone.jar
- Run the following command:
mvn install:install-file \ -Dfile=path_to_all_in_one_jar/iqas-platform-1.0-SNAPSHOT-allinone.jar \ -DgroupId=fr.isae.iqas \ -DartifactId=iqas-platform \ -Dversion=1.0-SNAPSHOT \ -Dpackaging=jar \ -DgeneratePom=true
- Now, you should be able to create a new maven project with the following dependency:
<dependencies> <dependency> <groupId>fr.isae.iqas</groupId> <artifactId>iqas-platform</artifactId> <version>1.0-SNAPSHOT</version> </dependency> </dependencies>
- Copy-paste the minimal following template. You can now define the logic for your QoO Pipeline.
If needed, you may have a look to the QoO Pipelines that have already been defined within the iQAS platform (IngestPipeline
,
OutputPipeline`, etc.).import akka.stream.FlowShape; import akka.stream.Graph; import akka.stream.Materializer; import akka.stream.javadsl.Flow; import akka.stream.javadsl.GraphDSL; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import fr.isae.iqas.model.observation.ObservationLevel; import fr.isae.iqas.model.observation.RawData; import fr.isae.iqas.pipelines.AbstractPipeline; import fr.isae.iqas.pipelines.IPipeline; import fr.isae.iqas.pipelines.mechanisms.CloneSameValueGS; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.json.JSONObject; import static fr.isae.iqas.model.request.Operator.NONE; public class MyQoOPipeline extends AbstractPipeline implements IPipeline { public MyQoOPipeline() { super("My QoO Pipeline", "MyQoOPipeline", true); addSupportedOperator(NONE); setParameter("test_param", String.valueOf(1), true); } @Override public Graph<FlowShape<ConsumerRecord<byte[], String>, ProducerRecord<byte[], String>>, Materializer> getPipelineGraph() { final ObservationLevel askedLevelFinal = getAskedLevel(); Graph runnableGraph = GraphDSL .create(builder -> { // ################################# YOUR CODE GOES HERE ################################# final FlowShape<ConsumerRecord, RawData> consumRecordToRawData = builder.add( Flow.of(ConsumerRecord.class).map(r -> { JSONObject sensorDataObject = new JSONObject(r.value().toString()); return new RawData( sensorDataObject.getString("date"), sensorDataObject.getString("value"), sensorDataObject.getString("producer"), sensorDataObject.getString("timestamps")); }) ); final FlowShape<RawData, ProducerRecord> rawDataToProdRecord = builder.add( Flow.of(RawData.class).map(r -> { ObjectMapper mapper = new ObjectMapper(); mapper.enable(SerializationFeature.INDENT_OUTPUT); return new ProducerRecord<byte[], String>(getTopicToPublish(), mapper.writeValueAsString(r)); }) ); builder.from(consumRecordToRawData.out()) .via(builder.add(new CloneSameValueGS<RawData>(Integer.valueOf(getParams().get("nb_copies"))))) .toInlet(rawDataToProdRecord.in()); // ################################# END OF YOUR CODE ################################# return new FlowShape<>(consumRecordToRawData.in(), rawDataToProdRecord.out()); }); return runnableGraph; } @Override public String getPipelineID() { return getClass().getSimpleName(); } }
- Compile your newly-defined QoO Pipeline:
mvn -T C2.0 clean install -DskipTests
- Locate the
.class
file corresponding to your QoO Pipeline. It should be in$PIPELINE_DIR/target/classes/my_pipeline.class
- Copy and paste this
.class
file within the "qoo_pipelines_dir" directory of the iQAS platform. - Register your new QoO Pipeline to the iQAS platform at
http://[api_gateway_endpoint_address]:[api_gateway_endpoint_port]/configuration
The iQAS platform have been developed during the PhD thesis of Antoine Auger at ISAE-SUPAERO (2014-2017).
This research was supported in part by the French Ministry of Defence through financial support of the Direction Générale de l’Armement (DGA).