diff --git a/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DatasetResolverImpl.java b/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DatasetResolverImpl.java index 5bb4bd0e84d..895400b415f 100644 --- a/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DatasetResolverImpl.java +++ b/core/control-plane/catalog-core/src/main/java/org/eclipse/edc/connector/catalog/DatasetResolverImpl.java @@ -21,7 +21,6 @@ import org.eclipse.edc.connector.contract.spi.offer.ContractDefinitionResolver; import org.eclipse.edc.connector.contract.spi.types.offer.ContractDefinition; import org.eclipse.edc.connector.policy.spi.store.PolicyDefinitionStore; -import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.agent.ParticipantAgent; import org.eclipse.edc.spi.asset.AssetIndex; import org.eclipse.edc.spi.asset.AssetPredicateConverter; @@ -30,12 +29,11 @@ import org.jetbrains.annotations.NotNull; import java.util.List; -import java.util.Objects; +import java.util.Optional; import java.util.function.Predicate; import java.util.stream.Stream; import static java.lang.Integer.MAX_VALUE; -import static java.util.stream.Collectors.toList; public class DatasetResolverImpl implements DatasetResolver { @@ -56,68 +54,47 @@ public DatasetResolverImpl(ContractDefinitionResolver contractDefinitionResolver @Override @NotNull public Stream query(ParticipantAgent agent, QuerySpec querySpec) { - var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).collect(toList()); + var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).toList(); var assetsQuery = QuerySpec.Builder.newInstance().offset(0).limit(MAX_VALUE).filter(querySpec.getFilterExpression()).build(); return assetIndex.queryAssets(assetsQuery) - .map(asset -> { - var offers = contractDefinitions.stream() - .filter(definition -> definition.getAssetsSelector().stream() - .map(predicateConverter::convert) - .reduce(x -> true, Predicate::and) - .test(asset)) - .map(contractDefinition -> createOffer(contractDefinition, asset.getId())) - .filter(Objects::nonNull) - .collect(toList()); - return new ProtoDataset(asset, offers); - }) - .filter(ProtoDataset::hasOffers) + .map(asset -> toDataset(contractDefinitions, asset)) + .filter(Dataset::hasOffers) .skip(querySpec.getOffset()) - .limit(querySpec.getLimit()) - .map(proto -> { - var asset = proto.asset; - var offers = proto.offers; - var distributions = distributionResolver.getDistributions(asset, null); // TODO: data addresses should be retrieved - var datasetBuilder = Dataset.Builder.newInstance() - .distributions(distributions) - .properties(asset.getProperties()); - - offers.forEach(offer -> datasetBuilder.offer(offer.contractId, offer.policy.withTarget(asset.getId()))); - - return datasetBuilder.build(); - }); - } - - private Offer createOffer(ContractDefinition definition, String assetId) { - var policyDefinition = policyDefinitionStore.findById(definition.getContractPolicyId()); - if (policyDefinition == null) { - return null; - } - var contractId = ContractId.createContractId(definition.getId(), assetId); - return new Offer(contractId, policyDefinition.getPolicy()); + .limit(querySpec.getLimit()); } - private static class Offer { - private final String contractId; - private final Policy policy; - - Offer(String contractId, Policy policy) { - this.contractId = contractId; - this.policy = policy; - } + @Override + public Dataset getById(ParticipantAgent agent, String id) { + var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).toList(); + return Optional.of(id) + .map(assetIndex::findById) + .map(asset -> toDataset(contractDefinitions, asset)) + .orElse(null); } - private static class ProtoDataset { - private final Asset asset; - private final List offers; - - private ProtoDataset(Asset asset, List offers) { - this.asset = asset; - this.offers = offers; - } + private Dataset toDataset(List contractDefinitions, Asset asset) { + + var distributions = distributionResolver.getDistributions(asset, null); // TODO: data addresses should be retrieved + var datasetBuilder = Dataset.Builder.newInstance() + .id(asset.getId()) + .distributions(distributions) + .properties(asset.getProperties()); + + contractDefinitions.stream() + .filter(definition -> definition.getAssetsSelector().stream() + .map(predicateConverter::convert) + .reduce(x -> true, Predicate::and) + .test(asset) + ) + .forEach(contractDefinition -> { + var policyDefinition = policyDefinitionStore.findById(contractDefinition.getContractPolicyId()); + if (policyDefinition != null) { + var contractId = ContractId.createContractId(contractDefinition.getId(), asset.getId()); + datasetBuilder.offer(contractId, policyDefinition.getPolicy().withTarget(asset.getId())); + } + }); - boolean hasOffers() { - return offers.size() > 0; - } + return datasetBuilder.build(); } } diff --git a/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplPerformanceTest.java b/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplPerformanceTest.java index 45dcb693d9a..4c35f3abb6c 100644 --- a/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplPerformanceTest.java +++ b/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplPerformanceTest.java @@ -51,11 +51,6 @@ class DatasetResolverImplPerformanceTest { private final Clock clock = Clock.systemUTC(); - @NotNull - private static PolicyDefinition.Builder createPolicyDefinition(String id) { - return PolicyDefinition.Builder.newInstance().id(id).policy(Policy.Builder.newInstance().build()); - } - @BeforeEach void setUp(EdcExtension extension) { extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class)); @@ -100,6 +95,11 @@ void fewDefinitionsSelectAllAssets(DatasetResolver datasetResolver, ContractDefi assertThat(lastPageDatasets).hasSize(100); } + @NotNull + private PolicyDefinition.Builder createPolicyDefinition(String id) { + return PolicyDefinition.Builder.newInstance().id(id).policy(Policy.Builder.newInstance().build()); + } + private Stream queryDatasetsIn(DatasetResolver datasetResolver, QuerySpec querySpec, Duration duration) { var start = clock.instant(); var datasets = datasetResolver.query(new ParticipantAgent(emptyMap(), emptyMap()), querySpec); diff --git a/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplTest.java b/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplTest.java index 406ac218d97..eb593763530 100644 --- a/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplTest.java +++ b/core/control-plane/catalog-core/src/test/java/org/eclipse/edc/connector/catalog/DatasetResolverImplTest.java @@ -38,12 +38,9 @@ import org.junit.jupiter.api.Test; import java.util.List; -import java.util.UUID; -import java.util.function.Predicate; import java.util.stream.Stream; import static java.util.Collections.emptyMap; -import static java.util.stream.Collectors.toList; import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.entry; @@ -86,7 +83,7 @@ void query_shouldReturnOneDatasetPerAsset() { var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none()); assertThat(datasets).isNotNull().hasSize(1).first().satisfies(dataset -> { - assertThat(dataset.getId()).matches(isUuid()); + assertThat(dataset.getId()).isEqualTo("assetId"); assertThat(dataset.getDistributions()).hasSize(1).first().isEqualTo(distribution); assertThat(dataset.getOffers()).hasSize(1).allSatisfy((id, policy) -> { assertThat(id).startsWith("definitionId"); @@ -117,14 +114,14 @@ void query_shouldReturnOneDataset_whenMultipleDefinitionsOnSameAsset() { contractDefinitionBuilder("definition1").contractPolicyId("policy1").build(), contractDefinitionBuilder("definition2").contractPolicyId("policy2").build() )); - when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> Stream.of(createAsset("id").build())); + when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> Stream.of(createAsset("assetId").build())); when(policyStore.findById("policy1")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy1).build()); when(policyStore.findById("policy2")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy2).build()); var datasets = datasetResolver.query(createParticipantAgent(), QuerySpec.none()); assertThat(datasets).hasSize(1).first().satisfies(dataset -> { - assertThat(dataset.getId()).matches(isUuid()); + assertThat(dataset.getId()).isEqualTo("assetId"); assertThat(dataset.getOffers()).hasSize(2) .anySatisfy((id, policy) -> { assertThat(id).startsWith("definition1"); @@ -162,7 +159,7 @@ void query_shouldFilterAssetsByPassedCriteria() { void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_contained() { var contractDefinition = contractDefinitionBuilder("definitionId").contractPolicyId("contractPolicyId").build(); var contractPolicy = Policy.Builder.newInstance().build(); - var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList()); + var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList(); when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition)); when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream()); when(policyStore.findById("contractPolicyId")).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); @@ -177,7 +174,7 @@ void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_contained() void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_overflowing() { var contractDefinition = contractDefinitionBuilder("definitionId").contractPolicyId("contractPolicyId").build(); var contractPolicy = Policy.Builder.newInstance().build(); - var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList()); + var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList(); when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of(contractDefinition)); when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream()); when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); @@ -190,9 +187,9 @@ void query_shouldLimitDataset_whenSingleDefinitionAndMultipleAssets_overflowing( @Test void query_shouldLimitDataset_whenMultipleDefinitionAndMultipleAssets_across() { - var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).collect(toList()); + var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).toList(); var contractPolicy = Policy.Builder.newInstance().build(); - var assets = range(0, 20).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList()); + var assets = range(0, 20).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList(); when(contractDefinitionResolver.definitionsFor(any())).thenAnswer(it -> contractDefinitions.stream()); when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream()); when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); @@ -205,9 +202,9 @@ void query_shouldLimitDataset_whenMultipleDefinitionAndMultipleAssets_across() { @Test void query_shouldLimitDataset_whenMultipleDefinitionsWithSameAssets() { - var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).collect(toList()); + var contractDefinitions = range(0, 2).mapToObj(it -> contractDefinitionBuilder(String.valueOf(it)).build()).toList(); var contractPolicy = Policy.Builder.newInstance().build(); - var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).collect(toList()); + var assets = range(0, 10).mapToObj(it -> createAsset(String.valueOf(it)).build()).toList(); when(contractDefinitionResolver.definitionsFor(any())).thenAnswer(it -> contractDefinitions.stream()); when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(i -> assets.stream()); when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(contractPolicy).build()); @@ -216,12 +213,52 @@ void query_shouldLimitDataset_whenMultipleDefinitionsWithSameAssets() { var datasets = datasetResolver.query(createParticipantAgent(), querySpec); assertThat(datasets).hasSize(2) - .allSatisfy(dataset -> { - assertThat(dataset.getOffers()).hasSize(2); - }) + .allSatisfy(dataset -> assertThat(dataset.getOffers()).hasSize(2)) .map(getId()).containsExactly("6", "7"); } + @Test + void getById_shouldReturnDataset() { + var policy1 = Policy.Builder.newInstance().type(SET).build(); + var policy2 = Policy.Builder.newInstance().type(OFFER).build(); + when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of( + contractDefinitionBuilder("definition1").contractPolicyId("policy1").build(), + contractDefinitionBuilder("definition2").contractPolicyId("policy2").build() + )); + when(assetIndex.findById(any())).thenReturn(createAsset("datasetId").build()); + when(policyStore.findById("policy1")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy1).build()); + when(policyStore.findById("policy2")).thenReturn(PolicyDefinition.Builder.newInstance().policy(policy2).build()); + var participantAgent = createParticipantAgent(); + + var dataset = datasetResolver.getById(participantAgent, "datasetId"); + + assertThat(dataset).isNotNull(); + assertThat(dataset.getId()).isEqualTo("datasetId"); + assertThat(dataset.getOffers()).hasSize(2) + .anySatisfy((id, policy) -> { + assertThat(id).startsWith("definition1"); + assertThat(policy.getType()).isEqualTo(SET); + }) + .anySatisfy((id, policy) -> { + assertThat(id).startsWith("definition2"); + assertThat(policy.getType()).isEqualTo(OFFER); + }); + verify(assetIndex).findById("datasetId"); + verify(contractDefinitionResolver).definitionsFor(participantAgent); + } + + @Test + void getById_shouldReturnNull_whenAssetNotFound() { + when(contractDefinitionResolver.definitionsFor(any())).thenReturn(Stream.of( + contractDefinitionBuilder("definition1").contractPolicyId("policy1").build() + )); + when(assetIndex.findById(any())).thenReturn(null); + var participantAgent = createParticipantAgent(); + + var dataset = datasetResolver.getById(participantAgent, "datasetId"); + + assertThat(dataset).isNull(); + } private ContractDefinition.Builder contractDefinitionBuilder(String id) { return ContractDefinition.Builder.newInstance() @@ -247,16 +284,4 @@ private ThrowingExtractor getId() { return it -> it.getProperty(Asset.PROPERTY_ID); } - @NotNull - private Predicate isUuid() { - return it -> { - try { - UUID.fromString(it); - return true; - } catch (Exception e) { - return false; - } - }; - } - } diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImpl.java index 652fd031edf..df74cea89f9 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImpl.java @@ -17,6 +17,7 @@ import org.eclipse.edc.catalog.spi.Catalog; import org.eclipse.edc.catalog.spi.CatalogRequestMessage; import org.eclipse.edc.catalog.spi.DataServiceRegistry; +import org.eclipse.edc.catalog.spi.Dataset; import org.eclipse.edc.catalog.spi.DatasetResolver; import org.eclipse.edc.connector.spi.catalog.CatalogProtocolService; import org.eclipse.edc.service.spi.result.ServiceResult; @@ -24,6 +25,7 @@ import org.eclipse.edc.spi.iam.ClaimToken; import org.jetbrains.annotations.NotNull; +import static java.lang.String.format; import static java.util.stream.Collectors.toList; import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; @@ -47,8 +49,8 @@ public CatalogProtocolServiceImpl(DatasetResolver datasetResolver, @Override @NotNull - public ServiceResult getCatalog(CatalogRequestMessage message, ClaimToken token) { - var agent = participantAgentService.createFor(token); + public ServiceResult getCatalog(CatalogRequestMessage message, ClaimToken claimToken) { + var agent = participantAgentService.createFor(claimToken); try (var datasets = datasetResolver.query(agent, message.getQuerySpec())) { var dataServices = dataServiceRegistry.getDataServices(); @@ -62,4 +64,17 @@ public ServiceResult getCatalog(CatalogRequestMessage message, ClaimTok return ServiceResult.success(catalog); } } + + @Override + public @NotNull ServiceResult getDataset(String datasetId, ClaimToken claimToken) { + var agent = participantAgentService.createFor(claimToken); + + var dataset = datasetResolver.getById(agent, datasetId); + + if (dataset == null) { + return ServiceResult.notFound(format("Dataset %s does not exist", datasetId)); + } + + return ServiceResult.success(dataset); + } } diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImplTest.java index cc463cb9494..3c816067b3c 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImplTest.java @@ -21,6 +21,7 @@ import org.eclipse.edc.catalog.spi.DatasetResolver; import org.eclipse.edc.catalog.spi.Distribution; import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.service.spi.result.ServiceFailure; import org.eclipse.edc.spi.agent.ParticipantAgent; import org.eclipse.edc.spi.agent.ParticipantAgentService; import org.eclipse.edc.spi.iam.ClaimToken; @@ -34,6 +35,7 @@ import static java.util.Collections.emptyMap; import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; +import static org.eclipse.edc.service.spi.result.ServiceFailure.Reason.NOT_FOUND; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -52,8 +54,8 @@ class CatalogProtocolServiceImplTest { void getCatalog_shouldReturnCatalogWithConnectorDataServiceAndItsDataset() { var querySpec = QuerySpec.none(); var message = CatalogRequestMessage.Builder.newInstance().protocol("protocol").querySpec(querySpec).build(); - var token = ClaimToken.Builder.newInstance().build(); - var participantAgent = new ParticipantAgent(emptyMap(), emptyMap()); + var token = createToken(); + var participantAgent = createParticipantAgent(); var dataService = DataService.Builder.newInstance().build(); when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService)); when(datasetResolver.query(any(), any())).thenReturn(Stream.of(createDataset())); @@ -69,7 +71,38 @@ void getCatalog_shouldReturnCatalogWithConnectorDataServiceAndItsDataset() { verify(participantAgentService).createFor(token); } - private static Dataset createDataset() { + @Test + void getDataset_shouldReturnDataset() { + var claimToken = createToken(); + var participantAgent = createParticipantAgent(); + var dataset = createDataset(); + when(participantAgentService.createFor(any())).thenReturn(participantAgent); + when(datasetResolver.getById(any(), any())).thenReturn(dataset); + + var result = service.getDataset("datasetId", claimToken); + + assertThat(result).isSucceeded().isEqualTo(dataset); + verify(participantAgentService).createFor(claimToken); + verify(datasetResolver).getById(participantAgent, "datasetId"); + } + + @Test + void getDataset_shouldFail_whenDatasetIsNull() { + var claimToken = createToken(); + var participantAgent = createParticipantAgent(); + when(participantAgentService.createFor(any())).thenReturn(participantAgent); + when(datasetResolver.getById(any(), any())).thenReturn(null); + + var result = service.getDataset("datasetId", claimToken); + + assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND); + } + + private ParticipantAgent createParticipantAgent() { + return new ParticipantAgent(emptyMap(), emptyMap()); + } + + private Dataset createDataset() { var dataService = DataService.Builder.newInstance().build(); var distribution = Distribution.Builder.newInstance().dataService(dataService).format("any").build(); return Dataset.Builder.newInstance() @@ -77,4 +110,8 @@ private static Dataset createDataset() { .distribution(distribution) .build(); } + + private ClaimToken createToken() { + return ClaimToken.Builder.newInstance().build(); + } } diff --git a/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/api/CatalogApiPaths.java b/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/api/CatalogApiPaths.java index c67683149ce..c1494e3ba8b 100644 --- a/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/api/CatalogApiPaths.java +++ b/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/api/CatalogApiPaths.java @@ -21,5 +21,6 @@ public interface CatalogApiPaths { String BASE_PATH = "/catalog"; String CATALOG_REQUEST = "/request"; - + String DATASET_REQUEST = "/catalog/datasets"; + } diff --git a/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/api/controller/DspCatalogApiController.java b/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/api/controller/DspCatalogApiController.java index 73594baafd4..6045c2505de 100644 --- a/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/api/controller/DspCatalogApiController.java +++ b/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/main/java/org/eclipse/edc/protocol/dsp/catalog/api/controller/DspCatalogApiController.java @@ -16,9 +16,11 @@ import jakarta.json.JsonObject; import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; import jakarta.ws.rs.HeaderParam; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; import jakarta.ws.rs.Produces; import jakarta.ws.rs.core.Response; import org.eclipse.edc.catalog.spi.CatalogRequestMessage; @@ -39,6 +41,7 @@ import static org.eclipse.edc.jsonld.spi.TypeUtil.isOfExpectedType; import static org.eclipse.edc.protocol.dsp.catalog.api.CatalogApiPaths.BASE_PATH; import static org.eclipse.edc.protocol.dsp.catalog.api.CatalogApiPaths.CATALOG_REQUEST; +import static org.eclipse.edc.protocol.dsp.catalog.api.CatalogApiPaths.DATASET_REQUEST; import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP; import static org.eclipse.edc.protocol.dsp.type.DspCatalogPropertyAndTypeNames.DSPACE_TYPE_CATALOG_ERROR; import static org.eclipse.edc.protocol.dsp.type.DspCatalogPropertyAndTypeNames.DSPACE_TYPE_CATALOG_REQUEST_MESSAGE; @@ -69,7 +72,7 @@ public DspCatalogApiController(Monitor monitor, IdentityService identityService, @POST @Path(CATALOG_REQUEST) - public Response getCatalog(JsonObject jsonObject, @HeaderParam(AUTHORIZATION) String token) { + public Response requestCatalog(JsonObject jsonObject, @HeaderParam(AUTHORIZATION) String token) { monitor.debug(() -> "DSP: Incoming catalog request."); var tokenRepresentation = TokenRepresentation.Builder.newInstance() @@ -119,6 +122,39 @@ public Response getCatalog(JsonObject jsonObject, @HeaderParam(AUTHORIZATION) St .build(); } + @GET + @Path(DATASET_REQUEST + "/{id}") + public Response getDataset(@PathParam("id") String id, @HeaderParam(AUTHORIZATION) String token) { + var tokenRepresentation = TokenRepresentation.Builder.newInstance() + .token(token) + .build(); + + var verificationResult = identityService.verifyJwtToken(tokenRepresentation, dspCallbackAddress); + if (verificationResult.failed()) { + monitor.debug(format("Unauthorized, %s", verificationResult.getFailureMessages())); + return error().unauthorized(); + } + + var datasetResult = service.getDataset(id, verificationResult.getContent()); + if (datasetResult.failed()) { + var errorCode = UUID.randomUUID(); + monitor.warning(format("Error returning dataset, error id %s: %s", errorCode, datasetResult.getFailureMessages())); + return error().message(format("Error code %s", errorCode)).from(datasetResult.getFailure()); + } + + var datasetJson = transformerRegistry.transform(datasetResult.getContent(), JsonObject.class); + if (datasetJson.failed()) { + var errorCode = UUID.randomUUID(); + monitor.warning(format("Error transforming dataset, error id %s: %s", errorCode, datasetJson.getFailureMessages())); + return error().message(format("Error code %s", errorCode)).internalServerError(); + } + + return status(Response.Status.OK) + .type(APPLICATION_JSON) + .entity(datasetJson.getContent()) + .build(); + } + @NotNull private static DspErrorResponse error() { return DspErrorResponse.type(DSPACE_TYPE_CATALOG_ERROR); diff --git a/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/test/java/org/eclipse/edc/protocol/dsp/catalog/api/controller/DspCatalogApiControllerTest.java b/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/test/java/org/eclipse/edc/protocol/dsp/catalog/api/controller/DspCatalogApiControllerTest.java index fa2188d3784..d31fdd57e21 100644 --- a/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/test/java/org/eclipse/edc/protocol/dsp/catalog/api/controller/DspCatalogApiControllerTest.java +++ b/data-protocols/dsp/dsp-catalog/dsp-catalog-api/src/test/java/org/eclipse/edc/protocol/dsp/catalog/api/controller/DspCatalogApiControllerTest.java @@ -15,14 +15,17 @@ package org.eclipse.edc.protocol.dsp.catalog.api.controller; import io.restassured.specification.RequestSpecification; -import jakarta.json.Json; import jakarta.json.JsonObject; import jakarta.ws.rs.core.HttpHeaders; import org.eclipse.edc.catalog.spi.Catalog; import org.eclipse.edc.catalog.spi.CatalogRequestMessage; +import org.eclipse.edc.catalog.spi.DataService; +import org.eclipse.edc.catalog.spi.Dataset; +import org.eclipse.edc.catalog.spi.Distribution; import org.eclipse.edc.connector.spi.catalog.CatalogProtocolService; import org.eclipse.edc.jsonld.spi.JsonLdKeywords; import org.eclipse.edc.junit.annotations.ApiTest; +import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.service.spi.result.ServiceResult; import org.eclipse.edc.spi.iam.ClaimToken; import org.eclipse.edc.spi.iam.IdentityService; @@ -35,11 +38,14 @@ import static io.restassured.RestAssured.given; import static io.restassured.http.ContentType.JSON; +import static jakarta.json.Json.createObjectBuilder; import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.jsonld.spi.PropertyAndTypeNames.DCAT_DATASET_TYPE; import static org.eclipse.edc.protocol.dsp.catalog.api.CatalogApiPaths.BASE_PATH; import static org.eclipse.edc.protocol.dsp.catalog.api.CatalogApiPaths.CATALOG_REQUEST; +import static org.eclipse.edc.protocol.dsp.catalog.api.CatalogApiPaths.DATASET_REQUEST; import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP; import static org.eclipse.edc.protocol.dsp.type.DspCatalogPropertyAndTypeNames.DSPACE_TYPE_CATALOG_ERROR; import static org.eclipse.edc.protocol.dsp.type.DspCatalogPropertyAndTypeNames.DSPACE_TYPE_CATALOG_REQUEST_MESSAGE; @@ -49,10 +55,12 @@ import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; @ApiTest @@ -63,7 +71,7 @@ class DspCatalogApiControllerTest extends RestControllerTestBase { private final TypeTransformerRegistry transformerRegistry = mock(TypeTransformerRegistry.class); private final CatalogProtocolService service = mock(CatalogProtocolService.class); private final String callbackAddress = "http://callback"; - private final JsonObject request = Json.createObjectBuilder() + private final JsonObject request = createObjectBuilder() .add(TYPE, DSPACE_TYPE_CATALOG_REQUEST_MESSAGE) .build(); private final CatalogRequestMessage requestMessage = CatalogRequestMessage.Builder.newInstance() @@ -71,8 +79,8 @@ class DspCatalogApiControllerTest extends RestControllerTestBase { .build(); @Test - void getCatalog_returnCatalog() { - var catalog = Json.createObjectBuilder().add(JsonLdKeywords.TYPE, "catalog").build(); + void requestCatalog_returnCatalog() { + var catalog = createObjectBuilder().add(JsonLdKeywords.TYPE, "catalog").build(); var token = createToken(); when(transformerRegistry.transform(isA(JsonObject.class), eq(CatalogRequestMessage.class))).thenReturn(Result.success(requestMessage)); @@ -96,11 +104,11 @@ void getCatalog_returnCatalog() { } @Test - void getCatalog_invalidTypeInRequest_throwException() { + void requestCatalog_invalidTypeInRequest_throwException() { when(identityService.verifyJwtToken(any(TokenRepresentation.class), eq(callbackAddress))) .thenReturn(Result.success(createToken())); - var invalidRequest = Json.createObjectBuilder() + var invalidRequest = createObjectBuilder() .add(TYPE, "not-a-catalog-request") .build(); @@ -117,7 +125,7 @@ void getCatalog_invalidTypeInRequest_throwException() { } @Test - void getCatalog_transformingRequestFails_throwException() { + void requestCatalog_transformingRequestFails_throwException() { when(identityService.verifyJwtToken(any(TokenRepresentation.class), eq(callbackAddress))) .thenReturn(Result.success(createToken())); when(transformerRegistry.transform(isA(JsonObject.class), eq(CatalogRequestMessage.class))) @@ -136,7 +144,7 @@ void getCatalog_transformingRequestFails_throwException() { } @Test - void getCatalog_authenticationFails_throwException() { + void requestCatalog_authenticationFails_throwException() { when(identityService.verifyJwtToken(any(TokenRepresentation.class), eq(callbackAddress))) .thenReturn(Result.failure("error")); @@ -153,7 +161,7 @@ void getCatalog_authenticationFails_throwException() { } @Test - void getCatalog_shouldForwardServiceError_whenServiceCallFails() { + void requestCatalog_shouldForwardServiceError_whenServiceCallFails() { when(service.getCatalog(any(), any())).thenReturn(badRequest("error")); when(identityService.verifyJwtToken(any(TokenRepresentation.class), eq(callbackAddress))) .thenReturn(Result.success(createToken())); @@ -174,6 +182,81 @@ void getCatalog_shouldForwardServiceError_whenServiceCallFails() { verify(service).getCatalog(any(), any()); } + @Test + void getDataset_shouldGetDataset() { + var claimToken = createToken(); + when(identityService.verifyJwtToken(any(TokenRepresentation.class), any())) + .thenReturn(Result.success(claimToken)); + var dataset = createDataset(); + when(service.getDataset(any(), any())).thenReturn(ServiceResult.success(dataset)); + var responseBody = createObjectBuilder().add(TYPE, DCAT_DATASET_TYPE).build(); + when(transformerRegistry.transform(any(), any())).thenReturn(Result.success(responseBody)); + + baseRequest() + .get(DATASET_REQUEST + "/datasetId") + .then() + .statusCode(200) + .contentType(JSON) + .body(TYPE, is(DCAT_DATASET_TYPE)); + + verify(identityService).verifyJwtToken(argThat(it -> it.getToken().equals("auth")), eq(callbackAddress)); + verify(service).getDataset("datasetId", claimToken); + verify(transformerRegistry).transform(dataset, JsonObject.class); + } + + @Test + void getDataset_shouldReturnUnauthorized_whenAuthorizationFails() { + when(identityService.verifyJwtToken(any(TokenRepresentation.class), any())).thenReturn(Result.failure("unauthorized")); + + baseRequest() + .get(DATASET_REQUEST + "/datasetId") + .then() + .statusCode(401) + .contentType(JSON) + .body(TYPE, is(DSPACE_TYPE_CATALOG_ERROR)) + .body(format("'%s'", DSPACE_PROPERTY_CODE), is("401")) + .body(format("'%s'", DSPACE_PROPERTY_REASON), notNullValue()); + + verifyNoInteractions(service, transformerRegistry); + } + + @Test + void getDataset_shouldReturnNotFound_whenServiceReturnsNotFound() { + when(identityService.verifyJwtToken(any(TokenRepresentation.class), any())) + .thenReturn(Result.success(createToken())); + when(service.getDataset(any(), any())).thenReturn(ServiceResult.notFound("not found")); + + baseRequest() + .get(DATASET_REQUEST + "/datasetId") + .then() + .statusCode(404) + .contentType(JSON) + .body(TYPE, is(DSPACE_TYPE_CATALOG_ERROR)) + .body(format("'%s'", DSPACE_PROPERTY_CODE), is("404")) + .body(format("'%s'", DSPACE_PROPERTY_REASON), notNullValue()); + + verifyNoInteractions(transformerRegistry); + } + + @Test + void getDataset_shouldReturnInternalServerError_whenTransformationFails() { + var claimToken = createToken(); + when(identityService.verifyJwtToken(any(TokenRepresentation.class), any())) + .thenReturn(Result.success(claimToken)); + var dataset = createDataset(); + when(service.getDataset(any(), any())).thenReturn(ServiceResult.success(dataset)); + when(transformerRegistry.transform(any(), any())).thenReturn(Result.failure("error")); + + baseRequest() + .get(DATASET_REQUEST + "/datasetId") + .then() + .statusCode(500) + .contentType(JSON) + .body(TYPE, is(DSPACE_TYPE_CATALOG_ERROR)) + .body(format("'%s'", DSPACE_PROPERTY_CODE), is("500")) + .body(format("'%s'", DSPACE_PROPERTY_REASON), notNullValue()); + } + @Override protected Object controller() { return new DspCatalogApiController(monitor, identityService, transformerRegistry, callbackAddress, service); @@ -187,6 +270,12 @@ private RequestSpecification baseRequest() { .when(); } + private Dataset createDataset() { + var dataService = DataService.Builder.newInstance().build(); + var distribution = Distribution.Builder.newInstance().dataService(dataService).format("format").build(); + return Dataset.Builder.newInstance().distribution(distribution).offer("offerId", Policy.Builder.newInstance().build()).build(); + } + private ClaimToken createToken() { return ClaimToken.Builder.newInstance().build(); } diff --git a/extensions/common/json-ld/src/main/java/org/eclipse/edc/jsonld/transformer/to/JsonObjectToDatasetTransformer.java b/extensions/common/json-ld/src/main/java/org/eclipse/edc/jsonld/transformer/to/JsonObjectToDatasetTransformer.java index c88a67b50c6..eecd5d4bf8e 100644 --- a/extensions/common/json-ld/src/main/java/org/eclipse/edc/jsonld/transformer/to/JsonObjectToDatasetTransformer.java +++ b/extensions/common/json-ld/src/main/java/org/eclipse/edc/jsonld/transformer/to/JsonObjectToDatasetTransformer.java @@ -51,23 +51,20 @@ public JsonObjectToDatasetTransformer() { } private void transformProperties(String key, JsonValue value, Dataset.Builder builder, TransformerContext context) { - if (ODRL_POLICY_ATTRIBUTE.equals(key)) { - transformPolicies(value, builder, context); - } else if (DCAT_DISTRIBUTION_ATTRIBUTE.equals(key)) { - transformArrayOrObject(value, Distribution.class, builder::distribution, context); - } else { - builder.property(key, transformGenericProperty(value, context)); + switch (key) { + case ODRL_POLICY_ATTRIBUTE -> transformPolicies(value, builder, context); + case DCAT_DISTRIBUTION_ATTRIBUTE -> + transformArrayOrObject(value, Distribution.class, builder::distribution, context); + default -> builder.property(key, transformGenericProperty(value, context)); } } private void transformPolicies(JsonValue value, Dataset.Builder builder, TransformerContext context) { - if (value instanceof JsonObject) { - var object = (JsonObject) value; + if (value instanceof JsonObject object) { var id = nodeId(object); var policy = context.transform(object, Policy.class); builder.offer(id, policy); - } else if (value instanceof JsonArray) { - var array = (JsonArray) value; + } else if (value instanceof JsonArray array) { array.forEach(entry -> transformPolicies(entry, builder, context)); } else { context.problem() diff --git a/extensions/common/json-ld/src/test/java/org/eclipse/edc/jsonld/transformer/to/JsonObjectToDatasetTransformerTest.java b/extensions/common/json-ld/src/test/java/org/eclipse/edc/jsonld/transformer/to/JsonObjectToDatasetTransformerTest.java index 38f440e7679..3bebb6b076c 100644 --- a/extensions/common/json-ld/src/test/java/org/eclipse/edc/jsonld/transformer/to/JsonObjectToDatasetTransformerTest.java +++ b/extensions/common/json-ld/src/test/java/org/eclipse/edc/jsonld/transformer/to/JsonObjectToDatasetTransformerTest.java @@ -48,8 +48,8 @@ class JsonObjectToDatasetTransformerTest { private static final String DATASET_ID = "datasetId"; - private JsonBuilderFactory jsonFactory = Json.createBuilderFactory(Map.of()); - private TransformerContext context = mock(TransformerContext.class); + private final JsonBuilderFactory jsonFactory = Json.createBuilderFactory(Map.of()); + private final TransformerContext context = mock(); private JsonObjectToDatasetTransformer transformer; @@ -125,30 +125,6 @@ void transform_datasetWithAdditionalProperty_returnDataset() { verify(context, times(1)).transform(any(JsonValue.class), eq(Object.class)); } - @Test - void transform_invalidType_reportProblem() { - var dataset = jsonFactory.createObjectBuilder() - .add(TYPE, "not-a-dataset") - .build(); - - transformer.transform(getExpanded(dataset), context); - - verify(context, times(1)).reportProblem(anyString()); - } - - @Test - void transform_requiredAttributesMissing_reportProblem() { - var dataset = jsonFactory.createObjectBuilder() - .add(ID, DATASET_ID) - .add(TYPE, DCAT_DATASET_TYPE) - .build(); - - var result = transformer.transform(getExpanded(dataset), context); - - assertThat(result).isNull(); - verify(context, times(1)).reportProblem(anyString()); - } - private JsonObject getJsonObject(String id, String type) { return jsonFactory.createObjectBuilder() .add(ID, id) diff --git a/spi/common/aggregate-service-spi/src/main/java/org/eclipse/edc/service/spi/result/ServiceResult.java b/spi/common/aggregate-service-spi/src/main/java/org/eclipse/edc/service/spi/result/ServiceResult.java index 7492f69efdb..de38d83a306 100644 --- a/spi/common/aggregate-service-spi/src/main/java/org/eclipse/edc/service/spi/result/ServiceResult.java +++ b/spi/common/aggregate-service-spi/src/main/java/org/eclipse/edc/service/spi/result/ServiceResult.java @@ -60,28 +60,24 @@ public static ServiceResult from(StoreResult storeResult) { if (storeResult.succeeded()) { return success(storeResult.getContent()); } - switch (storeResult.reason()) { - case NOT_FOUND: - return notFound(storeResult.getFailureDetail()); - case ALREADY_EXISTS: - return conflict(storeResult.getFailureDetail()); - default: - return badRequest(storeResult.getFailureDetail()); - } + + return switch (storeResult.reason()) { + case NOT_FOUND -> notFound(storeResult.getFailureDetail()); + case ALREADY_EXISTS -> conflict(storeResult.getFailureDetail()); + default -> badRequest(storeResult.getFailureDetail()); + }; } public static ServiceResult fromFailure(StoreResult storeResult) { if (storeResult.succeeded()) { throw new IllegalArgumentException("Can only use this method when the argument is a failed result!"); } - switch (storeResult.reason()) { - case NOT_FOUND: - return notFound(storeResult.getFailureDetail()); - case ALREADY_EXISTS: - return conflict(storeResult.getFailureDetail()); - default: - return badRequest(storeResult.getFailureDetail()); - } + + return switch (storeResult.reason()) { + case NOT_FOUND -> notFound(storeResult.getFailureDetail()); + case ALREADY_EXISTS -> conflict(storeResult.getFailureDetail()); + default -> badRequest(storeResult.getFailureDetail()); + }; } public static ServiceResult unauthorized(List failureMessages) { diff --git a/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/Dataset.java b/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/Dataset.java index 60f1abafcdd..5361c1e4ca4 100644 --- a/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/Dataset.java +++ b/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/Dataset.java @@ -24,7 +24,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import static java.util.UUID.randomUUID; @@ -41,7 +40,7 @@ public class Dataset { /** * Policies under which this Dataset is available. */ - private Map offers; + private final Map offers = new HashMap<>(); /** * Representations of this Dataset. @@ -74,6 +73,10 @@ public Object getProperty(String key) { return properties.get(key); } + public boolean hasOffers() { + return !offers.isEmpty(); + } + @JsonPOJOBuilder(withPrefix = "") public static class Builder { private final Dataset dataset; @@ -92,15 +95,7 @@ public Builder id(String id) { return this; } - public Builder offers(Map offers) { - this.dataset.offers = offers; - return this; - } - public Builder offer(String offerId, Policy policy) { - if (dataset.offers == null) { - dataset.offers = new HashMap<>(); - } dataset.offers.put(offerId, policy); return this; } @@ -136,9 +131,6 @@ public Dataset build() { dataset.id = randomUUID().toString(); } - Objects.requireNonNull(dataset.offers, "At least one offer required for Dataset."); - Objects.requireNonNull(dataset.distributions, "At least one Distribution required for Dataset."); - return dataset; } } diff --git a/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/DatasetResolver.java b/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/DatasetResolver.java index e5c6d687e01..92baf3f459a 100644 --- a/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/DatasetResolver.java +++ b/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/DatasetResolver.java @@ -26,7 +26,7 @@ public interface DatasetResolver { /** - * Resolves {@link Dataset}s given the {@link ParticipantAgent}, a {@link QuerySpec} and a {@link DataService} + * Resolves {@link Dataset}s given the {@link ParticipantAgent} and a {@link QuerySpec} * * @param agent the participant agent that requested the dataset. * @param querySpec the query spec for filtering and pagination. @@ -34,4 +34,13 @@ public interface DatasetResolver { */ @NotNull Stream query(ParticipantAgent agent, QuerySpec querySpec); + + /** + * Resolves a {@link Dataset} given its id + * + * @param participantAgent the participant agent that requested the dataset. + * @param id the dataset id. + * @return the {@link Dataset} if found, null otherwise. + */ + Dataset getById(ParticipantAgent participantAgent, String id); } diff --git a/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/Distribution.java b/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/Distribution.java index 0a9d9c75bfc..91ac2a8fd4d 100644 --- a/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/Distribution.java +++ b/spi/common/catalog-spi/src/main/java/org/eclipse/edc/catalog/spi/Distribution.java @@ -44,7 +44,7 @@ public DataService getDataService() { @JsonPOJOBuilder(withPrefix = "") public static class Builder { - private Distribution distribution; + private final Distribution distribution; private Builder() { distribution = new Distribution(); diff --git a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/catalog/CatalogProtocolService.java b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/catalog/CatalogProtocolService.java index e3d28acd2a8..d031d3a5088 100644 --- a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/catalog/CatalogProtocolService.java +++ b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/catalog/CatalogProtocolService.java @@ -16,6 +16,7 @@ import org.eclipse.edc.catalog.spi.Catalog; import org.eclipse.edc.catalog.spi.CatalogRequestMessage; +import org.eclipse.edc.catalog.spi.Dataset; import org.eclipse.edc.service.spi.result.ServiceResult; import org.eclipse.edc.spi.iam.ClaimToken; import org.jetbrains.annotations.NotNull; @@ -34,4 +35,14 @@ public interface CatalogProtocolService { */ @NotNull ServiceResult getCatalog(CatalogRequestMessage message, ClaimToken token); + + /** + * Returns a dataset given its id and a {@link ClaimToken} + * + * @param datasetId the dataset id. + * @param claimToken the claim token. + * @return succeeded result with the {@link Dataset}, failed result otherwise. + */ + @NotNull + ServiceResult getDataset(String datasetId, ClaimToken claimToken); }