From 9d5515f1cbde663be98148a862675c4b9ef6d3e6 Mon Sep 17 00:00:00 2001
From: Enrico Risa
Date: Fri, 23 Aug 2024 18:20:52 +0200
Subject: [PATCH 1/7] docs: section about custom source and sink types

---
 .../for-contributors/contributor-handbook.md  |   6 +
 .../data-plane/data-plane-extensions.md       | 430 ++++++++++++++++++
 .../data-plane-signaling.md                   |   2 +-
 3 files changed, 437 insertions(+), 1 deletion(-)
 create mode 100644 developer/wip/for-contributors/data-plane/data-plane-extensions.md

diff --git a/developer/wip/for-contributors/contributor-handbook.md b/developer/wip/for-contributors/contributor-handbook.md
index 61d2db3f..8d3aaca4 100644
--- a/developer/wip/for-contributors/contributor-handbook.md
+++ b/developer/wip/for-contributors/contributor-handbook.md
@@ -200,6 +200,12 @@ information about it and other topics such as data plane self-registration and p

### 4.2 Writing a custom data plane extension (sink/source)

The EDC Data Plane is built on top of the *Data Plane Framework (DPF)*, which can be used for building custom data planes. The framework has extensibility points for supporting different data sources and sinks (e.g., S3, HTTP, Kafka) and can perform direct streaming between different source and sink types.

Detailed documentation about writing a custom data plane extension can be found [here](./data-plane/data-plane-extensions.md).

### 4.3 Writing a custom data plane (using only DPS)

## 5. Development best practices

diff --git a/developer/wip/for-contributors/data-plane/data-plane-extensions.md b/developer/wip/for-contributors/data-plane/data-plane-extensions.md
new file mode 100644
index 00000000..629cd937
--- /dev/null
+++ b/developer/wip/for-contributors/data-plane/data-plane-extensions.md
@@ -0,0 +1,430 @@
# Data Plane extensions


* [Data Plane extensions](#data-plane-extensions)
  * [1. The `DataPlaneManager`](#the-dataplanemanager)
    * [1.1 Consumer PULL](#consumer-pull-flow)
    * [1.2 Provider PUSH](#provider-push-flow)
  * [2. The Data Plane Framework](#the-data-plane-framework)
    * [2.1 Transfer Service](#transferservice)
    * [2.2 Pipeline Service](#pipelineservice)
  * [3. Writing custom Source/Sink](#writing-custom-sourcesink)
    * [3.1 Custom DataSource](#custom-source)
    * [3.2 Custom DataSink](#custom-datasink)
    * [3.3 Executing the transfer](#executing-the-transfer)


The EDC Data Plane is a component responsible for transmitting data using a wire protocol and can be easily extended using the [DFF](#the-data-plane-framework) for supporting different protocols and transfer types.

The main component of an EDC data plane is the [DataPlaneManager](#the-dataplanemanager).

## The DataPlaneManager

The `DataPlaneManager` manages the execution of data plane requests, using the EDC [State Machine](../control-plane/programming-primitives.md#1-state-machines) pattern for tracking the state of data transmissions.

It receives `DataFlowStartMessage` from the [Control Plane](../contributor-handbook.md#2-the-control-plane) through [data plane signaling](./data-plane-signaling/data-plane-signaling.md) protocol if it's deployed as standalone process, or directly with method call when it's embedded in the same process.
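For orientation, a `DataFlowStartMessage` might look like the following simplified payload. The field names and values here are illustrative, not normative; see the data plane signaling documentation for the exact schema:

```json
{
  "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
  "@type": "DataFlowStartMessage",
  "processId": "transfer-process-id",
  "transferType": "HttpData-PUSH",
  "sourceDataAddress": {
    "type": "HttpData",
    "baseUrl": "https://provider.example.com/data"
  },
  "destinationDataAddress": {
    "type": "HttpData",
    "baseUrl": "https://consumer.example.com/sink"
  }
}
```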
The `DataPlaneManager` supports two flow [types](../control-plane/entities.md#71-transfer-and-data-flows-types):

- [Consumer PULL](#pull-flow)
- [Provider PUSH](#push-flow)

### Consumer PULL Flow

When the flow type of the `DataFlowStartMessage` is `PULL`, the `DataPlaneManager` delegates the creation of the `DataAddress` to the [`DataPlaneAuthorizationService`](./data-plane-signaling/data-plane-signaling.md#323-access-token-generation), and then returns it to the [ControlPlane](../contributor-handbook.md#2-the-control-plane) as part of the response to a `DataFlowStartMessage`.

### Provider PUSH Flow

When the flow type is `PUSH`, the data transmission is handled by the [DPF](#the-data-plane-framework) using the information contained in the `DataFlowStartMessage`, such as `sourceDataAddress` and `destinationDataAddress`.

## The Data Plane Framework

The `DPF` consists of a set of SPIs and default implementations for transferring data from a `sourceDataAddress` to a `destinationDataAddress`. It has built-in support for end-to-end streaming transfers using the [PipelineService](#pipelineservice) and it comes with a more generic [TransferService](#transferservice) that can be extended to satisfy more specialized or optimized transfer cases.

Each `TransferService` is registered in the `TransferServiceRegistry`, which the `DataPlaneManager` uses for validating and initiating a data transfer from a `DataFlowStartMessage`.

### TransferService

Given a `DataFlowStartMessage`, an implementation of a `TransferService` can transfer data from a `sourceDataAddress` to a `destinationDataAddress`.

The `TransferService` does not specify how the transfer should happen. It can be internally in the data plane or it could leverage on external and more performant system.

Relevant methods of the `TransferService` are:

```java
public interface TransferService {

    boolean canHandle(DataFlowStartMessage request);

    Result<Boolean> validate(DataFlowStartMessage request);

    CompletableFuture<StreamResult<Object>> transfer(DataFlowStartMessage request);
}
```

- The `canHandle` method expresses whether the `TransferService` implementation is able to fulfill the transfer request expressed in the `DataFlowStartMessage`.

- The `validate` method performs a validation on the content of a `DataFlowStartMessage`.

- The `transfer` method triggers a data transfer from a `sourceDataAddress` to a `destinationDataAddress`.

An implementation of a `TransferService` bundled with the [DPF](#the-data-plane-framework) is the [PipelineService](#pipelineservice).

### PipelineService

The `PipelineService` is an extension of [TransferService](#transferservice) that leverages on internal Data-Plane transfer mechanism.
It supports end-to-end streaming by connecting a `DataSink` (output) and a `DataSource` (input).

`DataSink` and `DataSource` are created for each data transfer using `DataSinkFactory` and `DataSourceFactory` from the `DataFlowStartMessage`. Custom source and sink factories should be registered in the `PipelineService` for adding support for different data source and sink types (e.g. S3, HTTP, Kafka).
```java
public interface PipelineService extends TransferService {

    void registerFactory(DataSourceFactory factory);

    void registerFactory(DataSinkFactory factory);

}
```

When the `PipelineService` receives a transfer request, it identifies which `DataSourceFactory` and `DataSinkFactory` can satisfy a `DataFlowStartMessage`, then it creates their respective `DataSource` and `DataSink` and ultimately initiates the transfer by calling `DataSink#transfer(DataSource)`.

EDC supports out of the box (with specialized extensions) a variety of data source and sink types such as S3, HTTP, Kafka, AzureStorage, but it can be easily [extended](#writing-custom-sourcesink) with new types.

## Writing custom Source/Sink

The `PipelineService` is the entry point for adding new source and sink types to a data plane runtime.

We will see how to write a [custom data source](#custom-datasource), a [custom data sink](#custom-datasink), and how we can trigger a transfer leveraging those new types.

Just as an example, we will write a custom source type based on the filesystem and a sink type based on [SMTP](https://it.wikipedia.org/wiki/Simple_Mail_Transfer_Protocol).

> Note: these custom extensions are just examples for didactic purposes.

As always when extending the EDC, the starting point is to create an extension:

```java
public class MyDataPlaneExtension implements ServiceExtension {

    @Inject
    PipelineService pipelineService;

    @Override
    public void initialize(ServiceExtensionContext context) {

    }
}
```

where we inject the `PipelineService`.

> the extension module should include `data-plane-spi` as a dependency.

### Custom DataSource

For simplicity, the filesystem-based `DataSource` will only support transferring a single file, not folders.

Here's what an implementation of `FileDataSource` might look like:

```java
public class FileDataSource implements DataSource {

    private final File sourceFile;

    public FileDataSource(File sourceFile) {
        this.sourceFile = sourceFile;
    }

    @Override
    public StreamResult<Stream<Part>> openPartStream() {
        return StreamResult.success(Stream.of(new FileStreamPart(sourceFile)));
    }

    @Override
    public void close() {
    }

    private record FileStreamPart(File file) implements Part {

        @Override
        public String name() {
            return file.getName();
        }

        @Override
        public InputStream openStream() {
            try {
                return new FileInputStream(file);
            } catch (FileNotFoundException e) {
                throw new RuntimeException(e);
            }
        }
    }
}
```

The relevant method is the `openPartStream`, which will be called for plumbing the source and sink. The `openPartStream` returns a `Stream` of `Part`s, as the `DataSource` can be composed by more that one part (e.g. folders). The `openPartStream` does not actually opens a Java `InputStream`, but rather returns a just stream of `Part`s.

How to actually transform a `Part` into an `InputStream` must be implemented. In our case the `FileStreamPart#openStream` just returns a `FileInputStream` from the input `File`.

Now we have a `DataSource` that can be used for transferring the content of a file. The only missing bit is how to create a `DataSource` for a transfer request.
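Before looking at that, the new source can be exercised in isolation as a quick sanity check. A minimal sketch, where the class name and file path are illustrative:

```java
public class FileDataSourceTestDrive {

    public static void main(String[] args) throws Exception {
        // Open the part stream and print the content of the single part.
        try (var source = new FileDataSource(new File("/tmp/sample.txt"))) {
            var result = source.openPartStream();
            if (result.failed()) {
                System.err.println(result.getFailureDetail());
                return;
            }
            try (var parts = result.getContent()) {
                parts.forEach(part -> {
                    try (var stream = part.openStream()) {
                        System.out.printf("%s: %s%n", part.name(), new String(stream.readAllBytes()));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        }
    }
}
```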
Creating the `DataSource` for a transfer request is achieved by implementing a `DataSourceFactory` that creates the `FileDataSource` from a `DataFlowStartMessage`:


```java
public class FileDataSourceFactory implements DataSourceFactory {

    @Override
    public String supportedType() {
        return "File";
    }

    @Override
    public DataSource createSource(DataFlowStartMessage request) {
        return new FileDataSource(getFile(request).orElseThrow(RuntimeException::new));
    }

    @Override
    public @NotNull Result<Void> validateRequest(DataFlowStartMessage request) {
        return getFile(request)
                .map(it -> Result.success())
                .orElseGet(() -> Result.failure("sourceFile is not found or it does not exist"));
    }

    private Optional<File> getFile(DataFlowStartMessage request) {
        return Optional.ofNullable(request.getSourceDataAddress().getStringProperty("sourceFile"))
                .map(File::new)
                .filter(File::exists)
                .filter(File::isFile);
    }
}
```

In our implementation, the `supportedType` method expresses that the `sourceDataAddress` should be of type `File`, and the `validateRequest` method checks that it contains a property `sourceFile` with the path of the file to be transferred.

The `FileDataSourceFactory` should then be registered in the `PipelineService`:

```java
public class MyDataPlaneExtension implements ServiceExtension {

    @Inject
    PipelineService pipelineService;

    @Override
    public void initialize(ServiceExtensionContext context) {
        pipelineService.registerFactory(new FileDataSourceFactory());
    }
}
```

### Custom DataSink

For the `DataSink` we will sketch an implementation of an SMTP-based one using the [javamail](https://javaee.github.io/javamail/) API.
The implementation should send the `Part`s of the input `DataSource` as email attachments to a `recipient`.
The `MailDataSink` may look like this:

```java
public class MailDataSink implements DataSink {

    private final Session session;
    private final String recipient;
    private final String sender;
    private final String subject;

    public MailDataSink(Session session, String recipient, String sender, String subject) {
        this.session = session;
        this.recipient = recipient;
        this.sender = sender;
        this.subject = subject;
    }

    @Override
    public CompletableFuture<StreamResult<Object>> transfer(DataSource source) {
        var msg = new MimeMessage(session);
        try {
            msg.setSentDate(new Date());
            msg.setRecipients(Message.RecipientType.TO, recipient);
            msg.setSubject(subject, "UTF-8");
            msg.setFrom(sender);

            var streamResult = source.openPartStream();
            if (streamResult.failed()) {
                return CompletableFuture.failedFuture(new EdcException(streamResult.getFailureDetail()));
            }

            var multipart = new MimeMultipart();
            streamResult.getContent()
                    .map(this::createBodyPart)
                    .forEach(part -> {
                        try {
                            multipart.addBodyPart(part);
                        } catch (MessagingException e) {
                            throw new EdcException(e);
                        }
                    });

            msg.setContent(multipart);
            Transport.send(msg);
            return CompletableFuture.completedFuture(StreamResult.success());
        } catch (Exception e) {
            return CompletableFuture.failedFuture(e);
        }
    }

    private BodyPart createBodyPart(DataSource.Part part) {
        try {
            var messageBodyPart = new MimeBodyPart();
            messageBodyPart.setFileName(part.name());
            var source = new ByteArrayDataSource(part.openStream(), part.mediaType());
            messageBodyPart.setDataHandler(new DataHandler(source));
            return messageBodyPart;
        } catch (Exception e) {
            throw new EdcException(e);
        }
    }
}
```

The `MailDataSink` receives a `DataSource` as input in the `transfer` method. After setting up the `MimeMessage` with the `recipient`, `sender` and `subject`, the code maps each `DataSource.Part` into a `BodyPart` (attachment), with `Part#name` as the name of each attachment.

The message is finally delivered using the `Transport` API.

> In this case it is not proper streaming, since `javamail` buffers the `InputStream` when using the `ByteArrayDataSource`.

To use the `MailDataSink` as an available sink type, an implementation of the `DataSinkFactory` is required:

```java
public class MailDataSinkFactory implements DataSinkFactory {

    private final Session session;
    private final String sender;

    public MailDataSinkFactory(Session session, String sender) {
        this.session = session;
        this.sender = sender;
    }

    @Override
    public String supportedType() {
        return "Mail";
    }

    @Override
    public DataSink createSink(DataFlowStartMessage request) {
        var recipient = getRecipient(request);
        var subject = "File transfer %s".formatted(request.getProcessId());
        return new MailDataSink(session, recipient, sender, subject);
    }

    @Override
    public @NotNull Result<Void> validateRequest(DataFlowStartMessage request) {
        return Optional.ofNullable(getRecipient(request))
                .map(it -> Result.success())
                .orElseGet(() -> Result.failure("Missing recipient"));
    }

    private String getRecipient(DataFlowStartMessage request) {
        var destination = request.getDestinationDataAddress();
        return destination.getStringProperty("recipient");
    }
}
```

The `MailDataSinkFactory` declares the supported type (`Mail`) and implements validation and creation of the `DataSink` based on the `destinationDataAddress` in the `DataFlowStartMessage`.
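For example, a destination `DataAddress` that this factory accepts could look like the following (the recipient value is a placeholder):

```json
{
  "type": "Mail",
  "recipient": "recipient@example.com"
}
```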
The validation phase only expects the `recipient` as an additional property in the `DataAddress` of the destination.

Ultimately the `MailDataSinkFactory` should be registered in the `PipelineService`:

```java
public class MyDataPlaneExtension implements ServiceExtension {

    @Inject
    PipelineService pipelineService;

    @Override
    public void initialize(ServiceExtensionContext context) {
        pipelineService.registerFactory(new FileDataSourceFactory());

        // fetch the sender from the configuration (the setting key is illustrative)
        var sender = context.getSetting("edc.samples.mail.sender", "no-reply@example.com");
        pipelineService.registerFactory(new MailDataSinkFactory(getSession(context), sender));
    }

    private Session getSession(ServiceExtensionContext context) {
        // configure the java mail Session, e.g. with SMTP host/port from the configuration
        return Session.getInstance(new Properties());
    }
}
```

### Executing the transfer

With the `MyDataPlaneExtension` loaded in the provider data plane, which adds a new `source` type based on the filesystem and a new `sink` type to the runtime, we can now complete a `File` -> `Mail` transfer.

On the provider side we can create an [Asset](../control-plane/entities.md#1-assets) like this:

```json
{
  "@context": { "@vocab": "https://w3id.org/edc/v0.0.1/ns/" },
  "@id": "file-asset",
  "properties": {
  },
  "dataAddress": {
    "type": "File",
    "sourceFile": "{{filePath}}"
  }
}
```

The `Asset` should then be [advertised](../control-plane/entities.md#3-contract-definitions) in the [catalog](../control-plane/entities.md#6-catalog).

When a consumer fetches the provider's [catalog](../control-plane/entities.md#6-catalog), if the access policy conditions are met, it should see the [Dataset](../control-plane/entities.md#6-catalog) with a new distribution available:

```json
{
  "@type": "dcat:Distribution",
  "dct:format": {
    "@id": "Mail-PUSH"
  },
  "dcat:accessService": {
    "@id": "ef9494bb-7000-4bae-9770-6567f451dba5",
    "@type": "dcat:DataService",
    "dcat:endpointDescription": "dspace:connector",
    "dcat:endpointUrl": "http://localhost:18182/protocol",
    "dct:terms": "dspace:connector",
    "dct:endpointUrl": "http://localhost:18182/protocol"
  }
}
```

which indicates that the `Dataset` is also available with the format `Mail-PUSH`.

Once a contract [agreement](../control-plane/entities.md#5-contract-agreements) is reached between the parties, a consumer may send a transfer request:

```json
{
  "@context": {
    "@vocab": "https://w3id.org/edc/v0.0.1/ns/"
  },
  "@type": "TransferRequest",
  "dataDestination": {
    "type": "Mail",
    "recipient": "{{recipientEmail}}"
  },
  "protocol": "dataspace-protocol-http",
  "contractId": "{{agreementId}}",
  "connectorId": "provider",
  "counterPartyAddress": "http://localhost:18182/protocol",
  "transferType": "Mail-PUSH"
}
```

which will deliver the `Dataset` as attachments to the `recipient` email address.

diff --git a/developer/wip/for-contributors/data-plane/data-plane-signaling/data-plane-signaling.md b/developer/wip/for-contributors/data-plane/data-plane-signaling/data-plane-signaling.md
index 6e5a0e7d..a4cf8c6d 100644
--- a/developer/wip/for-contributors/data-plane/data-plane-signaling/data-plane-signaling.md
+++ b/developer/wip/for-contributors/data-plane/data-plane-signaling/data-plane-signaling.md
@@ -176,7 +176,7 @@ consumer can initiate a new process before or after that period.
#### 3.2.3 Access Token Generation

When the `DataPlaneManager` receives a `DataFlowStartMessage` to start the data transmission, it uses the
`DataPlaneAuthorizationService` to generate an access token (in JWT format) and a `DataAddress`, which contains the
following information:

From f5fbcab75674be5f088207415d47c36e88990002 Mon Sep 17 00:00:00 2001
From: Enrico Risa
Date: Tue, 27 Aug 2024 15:22:27 +0200
Subject: [PATCH 2/7] Update developer/wip/for-contributors/data-plane/data-plane-extensions.md

Co-authored-by: Paul Latzelsperger <43503240+paullatzelsperger@users.noreply.github.com>
---
 .../wip/for-contributors/data-plane/data-plane-extensions.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/developer/wip/for-contributors/data-plane/data-plane-extensions.md b/developer/wip/for-contributors/data-plane/data-plane-extensions.md
index 629cd937..964bbe4f 100644
--- a/developer/wip/for-contributors/data-plane/data-plane-extensions.md
+++ b/developer/wip/for-contributors/data-plane/data-plane-extensions.md
@@ -15,7 +15,7 @@

The EDC Data Plane is a component responsible for transmitting data using a wire protocol and can be easily extended using the [Data Plane Framework (DPF)](#the-data-plane-framework) for supporting different protocols and transfer types.

From 07456bf460d01c7b96330d94481c67a112110dd0 Mon Sep 17 00:00:00 2001
From: Enrico Risa
Date: Tue, 27 Aug 2024 15:23:16 +0200
Subject: [PATCH 3/7] Update developer/wip/for-contributors/data-plane/data-plane-extensions.md

Co-authored-by: Paul Latzelsperger <43503240+paullatzelsperger@users.noreply.github.com>
---
 .../wip/for-contributors/data-plane/data-plane-extensions.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/developer/wip/for-contributors/data-plane/data-plane-extensions.md b/developer/wip/for-contributors/data-plane/data-plane-extensions.md
index 964bbe4f..94097139 100644
--- a/developer/wip/for-contributors/data-plane/data-plane-extensions.md
+++ b/developer/wip/for-contributors/data-plane/data-plane-extensions.md
@@ -23,7 +23,7 @@ The main component of an EDC data plane is the [DataPlaneManager](#the-dataplane

The `DataPlaneManager` manages the execution of data plane requests, using the EDC [State Machine](../control-plane/programming-primitives.md#1-state-machines) pattern for tracking the state of data transmissions.

It receives `DataFlowStartMessage` from the [Control Plane](../contributor-handbook.md#2-the-control-plane) through the [data plane signaling](./data-plane-signaling/data-plane-signaling.md) protocol if it's deployed as a standalone process, or directly via method call when it's embedded in the same process.
The `DataPlaneManager` supports two flow [types](../control-plane/entities.md#71-transfer-and-data-flows-types): From 8e1ca7b975d0b628ac1340aa4fc20d46e024f4b9 Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Tue, 27 Aug 2024 15:23:27 +0200 Subject: [PATCH 4/7] Update developer/wip/for-contributors/data-plane/data-plane-extensions.md Co-authored-by: Paul Latzelsperger <43503240+paullatzelsperger@users.noreply.github.com> --- .../wip/for-contributors/data-plane/data-plane-extensions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/developer/wip/for-contributors/data-plane/data-plane-extensions.md b/developer/wip/for-contributors/data-plane/data-plane-extensions.md index 94097139..fad49911 100644 --- a/developer/wip/for-contributors/data-plane/data-plane-extensions.md +++ b/developer/wip/for-contributors/data-plane/data-plane-extensions.md @@ -48,7 +48,7 @@ Each `TransferService` is registered in the `TransferServiceRegistry`, that the Given a `DataFlowStartMessage`, an implementation of a `TransferService` can transfer data from a `sourceDataAddress` to a `destinationDataAddress`. -The `TransferService` does not specify how the transfer should happen. It can be internally in the data plane or it could leverage on external and more performant system. +The `TransferService` does not specify how the transfer should happen. It can be processed internally in the data plane or it could delegate out to external (and more specialized) systems. Relevant methods of the `TransferService` are: From c1fe383064f68103a14e9388bc482506b95802fa Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Tue, 27 Aug 2024 15:23:52 +0200 Subject: [PATCH 5/7] Update developer/wip/for-contributors/data-plane/data-plane-extensions.md Co-authored-by: Paul Latzelsperger <43503240+paullatzelsperger@users.noreply.github.com> --- .../wip/for-contributors/data-plane/data-plane-extensions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/developer/wip/for-contributors/data-plane/data-plane-extensions.md b/developer/wip/for-contributors/data-plane/data-plane-extensions.md index fad49911..0881894a 100644 --- a/developer/wip/for-contributors/data-plane/data-plane-extensions.md +++ b/developer/wip/for-contributors/data-plane/data-plane-extensions.md @@ -164,7 +164,7 @@ public class FileDataSource implements DataSource { } ``` -The relevant method is the `openPartStream`, which will be called for plumbing the source and sink. The `openPartStream` returns a `Stream` of `Part`s, as the `DataSource` can be composed by more that one part (e.g. folders). The `openPartStream` does not actually opens a Java `InputStream`, but rather returns a just stream of `Part`s. +The relevant method is the `openPartStream`, which will be called for connecting the source and sink. The `openPartStream` returns a `Stream` of `Part` objects, as the `DataSource` can be composed by more that one part (e.g. folders, files, etc.). The `openPartStream` does not actually open a Java `InputStream`, but returns a stream of `Part`s. How to actually transform a `Part` into an `InputStream` must be implemented. In our case the `FileStreamPart#openStream` just returns a `FileInputStream` from the input `File`. 
From 3303eae9daabb89a0bf67c969760288fdf59697b Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Tue, 27 Aug 2024 15:24:12 +0200 Subject: [PATCH 6/7] Update developer/wip/for-contributors/data-plane/data-plane-extensions.md Co-authored-by: Paul Latzelsperger <43503240+paullatzelsperger@users.noreply.github.com> --- .../wip/for-contributors/data-plane/data-plane-extensions.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/developer/wip/for-contributors/data-plane/data-plane-extensions.md b/developer/wip/for-contributors/data-plane/data-plane-extensions.md index 0881894a..eae58e0b 100644 --- a/developer/wip/for-contributors/data-plane/data-plane-extensions.md +++ b/developer/wip/for-contributors/data-plane/data-plane-extensions.md @@ -166,7 +166,7 @@ public class FileDataSource implements DataSource { The relevant method is the `openPartStream`, which will be called for connecting the source and sink. The `openPartStream` returns a `Stream` of `Part` objects, as the `DataSource` can be composed by more that one part (e.g. folders, files, etc.). The `openPartStream` does not actually open a Java `InputStream`, but returns a stream of `Part`s. -How to actually transform a `Part` into an `InputStream` must be implemented. In our case the `FileStreamPart#openStream` just returns a `FileInputStream` from the input `File`. +Transforming a `Part` into an `InputStream` is the main task of the `DataSource` implementation. In our case the `FileStreamPart#openStream` just returns a `FileInputStream` from the input `File`. Now we have a `DataSource` that can be used for transferring the content of a file. The only missing bit is how to create a `DataSource` for a transfer request. From fd41bdb5d6120e807cb12bee3408aee11b682d6a Mon Sep 17 00:00:00 2001 From: Enrico Risa Date: Tue, 27 Aug 2024 15:38:15 +0200 Subject: [PATCH 7/7] pr remarks --- .../data-plane/data-plane-extensions.md | 58 +++++++++---------- 1 file changed, 29 insertions(+), 29 deletions(-) diff --git a/developer/wip/for-contributors/data-plane/data-plane-extensions.md b/developer/wip/for-contributors/data-plane/data-plane-extensions.md index eae58e0b..11d8b4bb 100644 --- a/developer/wip/for-contributors/data-plane/data-plane-extensions.md +++ b/developer/wip/for-contributors/data-plane/data-plane-extensions.md @@ -2,24 +2,24 @@ * [Data Plane extensions](#data-plane-extensions) - * [1. The `DataPlaneManager`](#the-dataplanemanager`) - * [1.1 Consumer PULL](#consumer-pull-flow) - * [1.2 Provider PUSH](#provider-push-flow) - * [2. The Data Plane Framework ](#the-data-plane-framework) - * [2.1 Transfer Service](#transferservice) - * [2.2 Pipeline Service](#pipelineservice) - * [3. Writing custom Source/Sink ](#writing-custom-sourcesink) - * [3.1 Custom DataSource ](#custom-source) - * [3.2 Custom DataSink ](#custom-datasink) - * [3.3 Executing the transfer ](#executing-the-transfer) + * [1. The `DataPlaneManager`](#1-the-dataplanemanager) + * [1.1 Consumer PULL](#11-consumer-pull-flow) + * [1.2 Provider PUSH](#12-provider-push-flow) + * [2. The Data Plane Framework ](#2-the-data-plane-framework) + * [2.1 Transfer Service](#21-transferservice) + * [2.2 Pipeline Service](#22-pipelineservice) + * [3. 
Writing custom Source/Sink ](#3-writing-custom-sourcesink) + * [3.1 Custom DataSource ](#31-custom-datasource) + * [3.2 Custom DataSink ](#32-custom-datasink) + * [3.3 Executing the transfer ](#33-executing-the-transfer) -The EDC Data Plane is a component responsible for transmitting data using a wire protocol and can be easily extended using the [Data Plane Framework (DPF)](#the-data-plane-framework) for supporting different protocols and transfer types. +The EDC Data Plane is a component responsible for transmitting data using a wire protocol and can be easily extended using the [Data Plane Framework (DPF)](#2-the-data-plane-framework) for supporting different protocols and transfer types. -The main component of an EDC data plane is the [DataPlaneManager](#the-dataplanemanager). +The main component of an EDC data plane is the [DataPlaneManager](#1-the-dataplanemanager). -## The DataPlaneManager +## 1. The DataPlaneManager The `DataPlaneManager` manages execution of data plane requests, using the EDC [State Machine](../control-plane/programming-primitives.md#1-state-machines) pattern for tracking the state of data transmissions. @@ -27,24 +27,24 @@ It receives `DataFlowStartMessage` from the [Control Plane](../contributor-handb The `DataPlaneManager` supports two flow [types](../control-plane/entities.md#71-transfer-and-data-flows-types): -- [Consumer PULL](#pull-flow) -- [Provider PUSH](#push-flow) +- [Consumer PULL](#11-consumer-pull-flow) +- [Provider PUSH](#12-provider-push-flow) -### Consumer PULL Flow +### 1.1 Consumer PULL Flow When the flow type of the `DataFlowStartMessage` is `PULL` the `DataPlaneManager` delegates the creation of the `DataAddress` to the [`DataPlaneAuthorizationService`](./data-plane-signaling/data-plane-signaling.md#323-access-token-generation), and then returns it to the [ControlPlane](../contributor-handbook.md#2-the-control-plane) as part of the response to a `DataFlowStartMessage`. -### Provider PUSH Flow +### 1.2 Provider PUSH Flow -When the flow type is `PUSH`, the data transmission is handled by the [DPF](#the-data-plane-framework) using the information contained in the `DataFlowStartMessage` such as `sourceDataAddress` and `destinationDataAddress`. +When the flow type is `PUSH`, the data transmission is handled by the [DPF](#2-the-data-plane-framework) using the information contained in the `DataFlowStartMessage` such as `sourceDataAddress` and `destinationDataAddress`. -## The Data Plane Framework +## 2. The Data Plane Framework -The `DPF` consists on a set of SPIs and default implementations for transferring data from a `sourceDataAddress` to a `destinationDataAddress`. It has a built-in support for end-to-end streaming transfers using the [PipelineService](#pipelineservice) and it comes with a more generic [TransferService](#transferservice) that can be extended to satisfy more specialized or optimized transfer case. +The `DPF` consists on a set of SPIs and default implementations for transferring data from a `sourceDataAddress` to a `destinationDataAddress`. It has a built-in support for end-to-end streaming transfers using the [PipelineService](#22-pipelineservice) and it comes with a more generic [TransferService](#21-transferservice) that can be extended to satisfy more specialized or optimized transfer case. Each `TransferService` is registered in the `TransferServiceRegistry`, that the `DataPlaneManager` uses for validating and initiating a data transfer from a `DataFlowStartMessage`. 
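Custom `TransferService` implementations can be plugged in through an extension as well. A minimal sketch, assuming the registry exposes a `registerTransferService` method (check the SPI of your EDC version for the exact signature); `MyOptimizedTransferService` stands in for a hypothetical implementation:

```java
public class MyTransferServiceExtension implements ServiceExtension {

    @Inject
    TransferServiceRegistry transferServiceRegistry;

    @Override
    public void initialize(ServiceExtensionContext context) {
        // Hypothetical TransferService that delegates transfers to an external,
        // more specialized system instead of streaming through the data plane.
        transferServiceRegistry.registerTransferService(new MyOptimizedTransferService());
    }
}
```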
### 2.1 TransferService

Given a `DataFlowStartMessage`, an implementation of a `TransferService` can transfer data from a `sourceDataAddress` to a `destinationDataAddress`.

@@ -69,11 +69,11 @@ public interface TransferService {

- The `transfer` method triggers a data transfer from a `sourceDataAddress` to a `destinationDataAddress`.

An implementation of a `TransferService` bundled with the [DPF](#2-the-data-plane-framework) is the [PipelineService](#22-pipelineservice).

### 2.2 PipelineService

The `PipelineService` is an extension of [TransferService](#21-transferservice) that leverages an internal Data-Plane transfer mechanism.
It supports end-to-end streaming by connecting a `DataSink` (output) and a `DataSource` (input).

`DataSink` and `DataSource` are created for each data transfer using `DataSinkFactory` and `DataSourceFactory` from the `DataFlowStartMessage`. Custom source and sink factories should be registered in the `PipelineService` for adding support for different data source and sink types (e.g. S3, HTTP, Kafka).

@@ -92,11 +92,11 @@ When the `PipelineService` receives a transfer request, it identifies which `Dat

EDC supports out of the box (with specialized extensions) a variety of data source and sink types such as S3, HTTP, Kafka, AzureStorage, but it can be easily [extended](#3-writing-custom-sourcesink) with new types.

## 3. Writing custom Source/Sink

The `PipelineService` is the entry point for adding new source and sink types to a data plane runtime.

We will see how to write a [custom data source](#31-custom-datasource), a [custom data sink](#32-custom-datasink), and how we can trigger a transfer leveraging those new types.

Just as an example, we will write a custom source type based on the filesystem and a sink type based on [SMTP](https://it.wikipedia.org/wiki/Simple_Mail_Transfer_Protocol).

@@ -121,7 +121,7 @@ where we inject the `PipelineService`.

> the extension module should include `data-plane-spi` as a dependency.

### 3.1 Custom DataSource

For simplicity, the filesystem-based `DataSource` will only support transferring a single file, not folders.

@@ -221,7 +221,7 @@ public class MyDataPlaneExtension implements ServiceExtension {
 }
 ```

### 3.2 Custom DataSink

For the `DataSink` we will sketch an implementation of an SMTP-based one using the [javamail](https://javaee.github.io/javamail/) API.
The implementation should send the `Part`s of the input `DataSource` as email attachments to a `recipient`.

@@ -363,7 +363,7 @@ public class MyDataPlaneExtension implements ServiceExtension {
 }
 ```

### 3.3 Executing the transfer

With the `MyDataPlaneExtension` loaded in the provider data plane, which adds
a new `source` type based on the filesystem and a new `sink` type to the runtime, we can now complete a `File` -> `Mail` transfer.