From c9684a4926e3d76788a76dfb935d177c1c407255 Mon Sep 17 00:00:00 2001 From: idris52 Date: Fri, 25 Aug 2023 15:56:20 +0200 Subject: [PATCH] doc: added comments --- .../ds/catalog/DatasetEntityIngestor.java | 12 +++++++++-- .../common/DataSpaceCatalogIngestorBase.java | 20 +++++++++++++++++++ 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java index ef30bcc1a08..ac6135a450a 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/DatasetEntityIngestor.java @@ -33,11 +33,11 @@ public class DatasetEntityIngestor extends DataSpaceCatalogIngestorBase { Logger log = LoggerFactory.getLogger(this.getClass().getName()); - RestEmitter emitter = RestEmitter.createWithDefaults(); + final String entityType = "dataset"; /*** - * editableDatasetProperties aspect of dataset entity + * editableDatasetProperties aspect of dataset entity - see details on datahub documentation for dataset entity aspects * */ private DatasetProperties _datasetProperties(Asset asset) { var createdAt=new com.linkedin.common.TimeStamp(); @@ -63,12 +63,20 @@ public SchemaMetadata _schemaMetadata(Asset asset) { //todo: This should not be return new SchemaMetadata().setFields(fields); } + /*** + * Returns datahub style urn for an asset - includes `test` as platform, and includes EDC asset name and id within the urn. + * The FabricType is the environment type such as Dev, Prod, etc. 
+ * */ public Urn _urn(Asset asset) throws URISyntaxException { return new DatasetUrn(_platformUrn(entityType), asset.getName()+asset.getId(), FabricType.DEV); } /*** * This method emits whole dataset, with all aspects (defined within) to dataspace catalog. To only ingest/emit a single aspect, see specs. + * In this method, we first create a dataset with a single aspect - datasetProperties Aspect. Then, we create other aspects such as + * schemaMetadata and editableProperties aspects, and ingest them in parallel. + * Usually it can be done sequentially, but this is to show that, if an entity already exists, then aspects can be pushed in parallel as well. + * Since the calls are asynchronous, the datahub api at the receiving end will respond asynchronously. * */ public Urn emitMetadataChangeProposal(Asset asset) throws URISyntaxException, IOException, ExecutionException, InterruptedException { diff --git a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java index 4ef68bfc8a9..e7a7e301b36 100644 --- a/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java +++ b/extensions/control-plane/api/management-api/dataspace-catalog-api/src/main/java/be/imec/edit/ds/catalog/common/DataSpaceCatalogIngestorBase.java @@ -3,9 +3,11 @@ import com.linkedin.common.urn.DataPlatformUrn; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; +import datahub.client.rest.RestEmitter; import datahub.event.MetadataChangeProposalWrapper; import java.io.IOException; import java.net.URISyntaxException; +import java.util.Collections; import java.util.concurrent.ExecutionException; import org.eclipse.edc.spi.types.domain.asset.Asset; import 
org.slf4j.Logger; @@ -14,6 +16,16 @@ abstract public class DataSpaceCatalogIngestorBase { Logger log = LoggerFactory.getLogger(this.getClass().getName()); + /*** + * To create an emitter that pushes to a remote datahub instance (through an IP or url), see the RestEmitter class. It has examples for creating an emitter with external urls. An example is shown below, commented out. + * + * */ + protected RestEmitter emitter = RestEmitter.createWithDefaults(); + //protected RestEmitter emitter = RestEmitter.create(b -> b.server("http://localhost:8080")); // todo: replace the `localhost:8080` with ip address or address of the datahub gms. + /*** + * Method to build a change proposal for any entity. aspect represents an aspect, e.g. dataSetProperties or Ownership Aspect of dataset, entityType is e.g. the datahub entity `dataset` etc. + * And the urn is a `datahub` style urn. + * */ public MetadataChangeProposalWrapper _metadataChangeProposalWrapper(RecordTemplate aspect, String entityType, Urn urn) { return MetadataChangeProposalWrapper.builder() .entityType(entityType) @@ -22,9 +34,17 @@ public MetadataChangeProposalWrapper _metadataChangeProposalWrapper(RecordTempla .aspect(aspect) .build(); } + /** + * At the moment, edc connectors are experimental. The data platforms (bigquery, snowflake, hudi etc.) are unknown and not provided, so we use `test`. But we can + * use `conf` files to configure this. + * */ public DataPlatformUrn _platformUrn(String entityType) throws URISyntaxException { return DataPlatformUrn.createFromUrn(DataPlatformUrn.createFromTuple(entityType, "test")); } + + /*** + * A method used by subclasses to implement entity specific change proposals and emit to datahub (data space catalog) + * */ public abstract Urn emitMetadataChangeProposal(Asset asset) throws URISyntaxException, IOException, ExecutionException, InterruptedException;