diff --git a/DEPENDENCIES b/DEPENDENCIES index 315a36a6936..55562f0ccff 100644 --- a/DEPENDENCIES +++ b/DEPENDENCIES @@ -81,7 +81,7 @@ maven/mavencentral/com.lmax/disruptor/3.4.4, Apache-2.0, approved, clearlydefine maven/mavencentral/com.networknt/json-schema-validator/1.0.76, Apache-2.0, approved, CQ22638 maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.28, Apache-2.0, approved, clearlydefined maven/mavencentral/com.nimbusds/nimbus-jose-jwt/9.37.3, Apache-2.0, approved, #11701 -maven/mavencentral/com.puppycrawl.tools/checkstyle/10.14.0, , restricted, clearlydefined +maven/mavencentral/com.puppycrawl.tools/checkstyle/10.14.0, LGPL-2.1-or-later AND (Apache-2.0 AND LGPL-2.1-or-later) AND Apache-2.0, approved, #13562 maven/mavencentral/com.samskivert/jmustache/1.15, BSD-2-Clause, approved, clearlydefined maven/mavencentral/com.squareup.okhttp3/okhttp-dnsoverhttps/4.12.0, Apache-2.0, approved, #11159 maven/mavencentral/com.squareup.okhttp3/okhttp/4.12.0, Apache-2.0, approved, #11156 diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/build.gradle.kts b/extensions/control-plane/transfer/transfer-data-plane-signaling/build.gradle.kts new file mode 100644 index 00000000000..2f8660e3361 --- /dev/null +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/build.gradle.kts @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +plugins { + `java-library` +} + +dependencies { + api(project(":spi:common:core-spi")) + api(project(":spi:control-plane:contract-spi")) + api(project(":spi:control-plane:transfer-spi")) + + api(project(":spi:control-plane:transfer-data-plane-spi")) + api(project(":spi:data-plane:data-plane-spi")) + api(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-client")) + api(project(":spi:data-plane-selector:data-plane-selector-spi")) + + testImplementation(project(":core:common:junit")) +} + +edcBuild { + swagger { + apiGroup.set("control-api") + } +} + + + diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtension.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtension.java new file mode 100644 index 00000000000..8d2ab138cff --- /dev/null +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtension.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.dataplane; + +import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; +import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.transfer.dataplane.flow.DataPlaneSignalingFlowController; +import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl; +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager; +import org.eclipse.edc.runtime.metamodel.annotation.Extension; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.runtime.metamodel.annotation.Setting; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; + +import static org.eclipse.edc.connector.transfer.dataplane.TransferDataPlaneSignalingExtension.NAME; + +@Extension(NAME) +public class TransferDataPlaneSignalingExtension implements ServiceExtension { + + protected static final String NAME = "Transfer Data Plane Signaling Extension"; + + private static final String DEFAULT_DATAPLANE_SELECTOR_STRATEGY = "random"; + + @Setting(value = "Defines strategy for Data Plane instance selection in case Data Plane is not embedded in current runtime", defaultValue = DEFAULT_DATAPLANE_SELECTOR_STRATEGY) + private static final String DPF_SELECTOR_STRATEGY = "edc.dataplane.client.selector.strategy"; + @Inject + private DataFlowManager dataFlowManager; + @Inject(required = false) + private ControlApiUrl callbackUrl; + + @Inject + private DataPlaneSelectorService selectorService; + + @Inject + private DataPlaneClientFactory clientFactory; + + + @Override + public void initialize(ServiceExtensionContext context) { + var selectionStrategy = context.getSetting(DPF_SELECTOR_STRATEGY, DEFAULT_DATAPLANE_SELECTOR_STRATEGY); + dataFlowManager.register(new DataPlaneSignalingFlowController(callbackUrl, selectorService, clientFactory, selectionStrategy)); + } +} diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowController.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowController.java new file mode 100644 index 00000000000..203e581b892 --- /dev/null +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowController.java @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.dataplane.flow; + +import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; +import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl; +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController; +import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.response.ResponseStatus; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; +import org.jetbrains.annotations.NotNull; + +import java.util.Collection; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.function.Predicate; + +import static java.util.stream.Collectors.toSet; + +public class DataPlaneSignalingFlowController implements DataFlowController { + + private final ControlApiUrl callbackUrl; + private final DataPlaneSelectorService selectorClient; + private final DataPlaneClientFactory clientFactory; + + private final String selectionStrategy; + + public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, DataPlaneClientFactory clientFactory, String selectionStrategy) { + this.callbackUrl = callbackUrl; + this.selectorClient = selectorClient; + this.clientFactory = clientFactory; + this.selectionStrategy = selectionStrategy; + } + + @Override + public boolean canHandle(TransferProcess transferProcess) { + return extractFlowType(transferProcess).succeeded(); + } + + @Override + public @NotNull StatusResult start(TransferProcess transferProcess, Policy policy) { + var flowType = extractFlowType(transferProcess); + if (flowType.failed()) { + return StatusResult.failure(ResponseStatus.FATAL_ERROR, flowType.getFailureDetail()); + } + + var dataPlaneInstance = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getDataDestination(), selectionStrategy, transferProcess.getTransferType()); + var dataFlowRequest = DataFlowStartMessage.Builder.newInstance() + .id(UUID.randomUUID().toString()) + .processId(transferProcess.getId()) + .sourceDataAddress(transferProcess.getContentDataAddress()) + .destinationDataAddress(transferProcess.getDataDestination()) + .participantId(policy.getAssignee()) + .agreementId(transferProcess.getContractId()) + .assetId(transferProcess.getAssetId()) + .flowType(flowType.getContent()) + .callbackAddress(callbackUrl != null ? callbackUrl.get() : null) + .build(); + + var dataPlaneInstanceId = dataPlaneInstance != null ? dataPlaneInstance.getId() : null; + + return clientFactory.createClient(dataPlaneInstance) + .start(dataFlowRequest) + .map(it -> DataFlowResponse.Builder.newInstance() + .dataAddress(it.getDataAddress()) + .dataPlaneId(dataPlaneInstanceId) + .build() + ); + } + + @Override + public StatusResult terminate(TransferProcess transferProcess) { + return selectorClient.getAll().stream() + .filter(dataPlaneInstanceFilter(transferProcess)) + .map(clientFactory::createClient) + .map(client -> client.terminate(transferProcess.getId())) + .reduce(StatusResult::merge) + .orElse(StatusResult.failure(ResponseStatus.FATAL_ERROR, "Failed to select the data plane for terminating the transfer process %s".formatted(transferProcess.getId()))); + } + + @Override + public Set transferTypesFor(Asset asset) { + return selectorClient.getAll().stream() + .filter(it -> it.getAllowedSourceTypes().contains(asset.getDataAddress().getType())) + .map(DataPlaneInstance::getAllowedTransferTypes) + .flatMap(Collection::stream) + .collect(toSet()); + } + + private StatusResult extractFlowType(TransferProcess transferProcess) { + return Optional.ofNullable(transferProcess.getTransferType()) + .map(transferType -> transferType.split("-")) + .filter(tokens -> tokens.length == 2) + .map(tokens -> parseFlowType(tokens[1])) + .orElse(StatusResult.failure(ResponseStatus.FATAL_ERROR, "Failed to extract flow type from transferType %s".formatted(transferProcess.getTransferType()))); + + } + + private StatusResult parseFlowType(String flowType) { + try { + return StatusResult.success(FlowType.valueOf(flowType)); + } catch (Exception e) { + return StatusResult.failure(ResponseStatus.FATAL_ERROR, "Unknown flow type %s".formatted(flowType)); + } + } + + private Predicate dataPlaneInstanceFilter(TransferProcess transferProcess) { + if (transferProcess.getDataPlaneId() != null) { + return (dataPlaneInstance -> dataPlaneInstance.getId().equals(transferProcess.getDataPlaneId())); + } else { + return (d) -> true; + } + } +} diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 00000000000..aea37faa375 --- /dev/null +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1,15 @@ +# +# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) +# +# This program and the accompanying materials are made available under the +# terms of the Apache License, Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# +# Contributors: +# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation +# +# + +org.eclipse.edc.connector.transfer.dataplane.TransferDataPlaneSignalingExtension diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtensionTest.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtensionTest.java new file mode 100644 index 00000000000..c9742495742 --- /dev/null +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/TransferDataPlaneSignalingExtensionTest.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.dataplane; + +import org.eclipse.edc.connector.transfer.dataplane.flow.DataPlaneSignalingFlowController; +import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager; +import org.eclipse.edc.junit.extensions.DependencyInjectionExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +import static org.mockito.ArgumentMatchers.isA; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +@ExtendWith(DependencyInjectionExtension.class) +class TransferDataPlaneSignalingExtensionTest { + + private final DataFlowManager dataFlowManager = mock(); + + @BeforeEach + void setup(ServiceExtensionContext context) { + context.registerService(DataFlowManager.class, dataFlowManager); + } + + @Test + void initialize(ServiceExtensionContext context, TransferDataPlaneSignalingExtension extension) { + extension.initialize(context); + verify(dataFlowManager).register(isA(DataPlaneSignalingFlowController.class)); + } +} diff --git a/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java new file mode 100644 index 00000000000..74bd689e35c --- /dev/null +++ b/extensions/control-plane/transfer/transfer-data-plane-signaling/src/test/java/org/eclipse/edc/connector/transfer/dataplane/flow/DataPlaneSignalingFlowControllerTest.java @@ -0,0 +1,311 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.transfer.dataplane.flow; + +import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService; +import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClient; +import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory; +import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance; +import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse; +import org.eclipse.edc.connector.transfer.spi.types.DataRequest; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcess; +import org.eclipse.edc.junit.assertions.AbstractResultAssert; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.spi.response.ResponseStatus; +import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.asset.Asset; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; + +import java.net.URI; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class DataPlaneSignalingFlowControllerTest { + + private static final String HTTP_DATA_PULL = "HttpData-PULL"; + private static final String CUSTOM_PUSH = "Custom-PUSH"; + private final DataPlaneClient dataPlaneClient = mock(); + private final DataPlaneClientFactory dataPlaneClientFactory = mock(); + private final DataPlaneSelectorService selectorService = mock(); + private final DataPlaneSignalingFlowController flowController = + new DataPlaneSignalingFlowController(() -> URI.create("http://localhost"), selectorService, dataPlaneClientFactory, "random"); + + @NotNull + private static DataPlaneInstance.Builder dataPlaneInstanceBuilder() { + return DataPlaneInstance.Builder.newInstance().url("http://any"); + } + + @Test + void canHandle() { + var transferProcess = transferProcess("HttpData", HTTP_DATA_PULL); + var transferProcess1 = transferProcess("Custom", "notHandledFormat"); + var transferProcess2 = transferProcess("Custom", "Custom-INVALID"); + + assertThat(flowController.canHandle(transferProcess)).isTrue(); + assertThat(flowController.canHandle(transferProcess1)).isFalse(); + assertThat(flowController.canHandle(transferProcess2)).isFalse(); + } + + @ParameterizedTest + @ValueSource(strings = { + HTTP_DATA_PULL, + CUSTOM_PUSH, + }) + void initiateFlow_transferSuccess(String transferType) { + var request = createDataRequest(); + var source = testDataAddress(); + var policy = Policy.Builder.newInstance().assignee("participantId").build(); + var transferProcess = TransferProcess.Builder.newInstance() + .dataRequest(createDataRequest()) + .transferType(transferType) + .contentDataAddress(testDataAddress()) + .build(); + + when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class))); + var dataPlaneInstance = createDataPlaneInstance(); + when(selectorService.select(any(), any(), any(), eq(transferType))).thenReturn(dataPlaneInstance); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + + var result = flowController.start(transferProcess, policy); + + assertThat(result).isSucceeded().extracting(DataFlowResponse::getDataPlaneId).isEqualTo(dataPlaneInstance.getId()); + var captor = ArgumentCaptor.forClass(DataFlowStartMessage.class); + verify(dataPlaneClient).start(captor.capture()); + var captured = captor.getValue(); + assertThat(captured.getProcessId()).isEqualTo(transferProcess.getId()); + assertThat(captured.getSourceDataAddress()).usingRecursiveComparison().isEqualTo(source); + assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(request.getDataDestination()); + assertThat(captured.getParticipantId()).isEqualTo(policy.getAssignee()); + assertThat(captured.getAgreementId()).isEqualTo(transferProcess.getContractId()); + assertThat(captured.getAssetId()).isEqualTo(transferProcess.getAssetId()); + assertThat(transferType).contains(captured.getFlowType().toString()); + assertThat(captured.getProperties()).isEmpty(); + assertThat(captured.getCallbackAddress()).isNotNull(); + } + + @Test + void initiateFlow_transferSuccess_withReturnedDataAddress() { + var policy = Policy.Builder.newInstance().assignee("participantId").build(); + var transferProcess = TransferProcess.Builder.newInstance() + .dataRequest(createDataRequest()) + .transferType(HTTP_DATA_PULL) + .contentDataAddress(testDataAddress()) + .build(); + + var response = mock(DataFlowResponseMessage.class); + when(response.getDataAddress()).thenReturn(DataAddress.Builder.newInstance().type("type").build()); + when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(response)); + var dataPlaneInstance = createDataPlaneInstance(); + when(selectorService.select(any(), any(), any(), eq(HTTP_DATA_PULL))).thenReturn(dataPlaneInstance); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + + var result = flowController.start(transferProcess, policy); + + assertThat(result).isSucceeded() + .satisfies(dataFlowResponse -> { + assertThat(dataFlowResponse.getDataPlaneId()).isEqualTo(dataPlaneInstance.getId()); + assertThat(dataFlowResponse.getDataAddress()).isNotNull(); + }); + } + + @Test + void initiateFlow_transferSuccess_withoutDataPlane() { + var request = createDataRequest(); + var source = testDataAddress(); + var transferProcess = TransferProcess.Builder.newInstance() + .dataRequest(createDataRequest()) + .contentDataAddress(testDataAddress()) + .transferType(HTTP_DATA_PULL) + .build(); + + when(dataPlaneClient.start(any(DataFlowStartMessage.class))).thenReturn(StatusResult.success(mock(DataFlowResponseMessage.class))); + when(selectorService.select(any(), any())).thenReturn(null); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + AbstractResultAssert.assertThat(result).isSucceeded().extracting(DataFlowResponse::getDataPlaneId).isNull(); + var captor = ArgumentCaptor.forClass(DataFlowStartMessage.class); + verify(dataPlaneClient).start(captor.capture()); + var captured = captor.getValue(); + assertThat(captured.getProcessId()).isEqualTo(transferProcess.getId()); + assertThat(captured.getSourceDataAddress()).usingRecursiveComparison().isEqualTo(source); + assertThat(captured.getDestinationDataAddress()).usingRecursiveComparison().isEqualTo(request.getDataDestination()); + assertThat(captured.getProperties()).isEmpty(); + assertThat(captured.getCallbackAddress()).isNotNull(); + } + + + @ParameterizedTest + @ValueSource(strings = { + "httppull", + "http-", + "", + }) + void initiateFlow_invalidTransferType(String transferType) { + var transferProcess = TransferProcess.Builder.newInstance() + .dataRequest(createDataRequest()) + .contentDataAddress(testDataAddress()) + .transferType(transferType) + .build(); + + + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains("Failed to extract flow type from transferType %s".formatted(transferType))); + } + + @Test + void initiateFlow_returnFailedResultIfTransferFails() { + var errorMsg = "error"; + var transferProcess = TransferProcess.Builder.newInstance() + .dataRequest(createDataRequest()) + .contentDataAddress(testDataAddress()) + .transferType(HTTP_DATA_PULL) + .build(); + + when(dataPlaneClient.start(any())).thenReturn(StatusResult.failure(ResponseStatus.FATAL_ERROR, errorMsg)); + var dataPlaneInstance = createDataPlaneInstance(); + when(selectorService.select(any(), any())).thenReturn(dataPlaneInstance); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + + var result = flowController.start(transferProcess, Policy.Builder.newInstance().build()); + + verify(dataPlaneClient).start(any()); + + assertThat(result.failed()).isTrue(); + assertThat(result.getFailureMessages()).allSatisfy(s -> assertThat(s).contains(errorMsg)); + } + + @Test + void terminate_shouldCallTerminate() { + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .dataRequest(createDataRequest()) + .contentDataAddress(testDataAddress()) + .build(); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + var dataPlaneInstance = createDataPlaneInstance(); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance)); + + var result = flowController.terminate(transferProcess); + + AbstractResultAssert.assertThat(result).isSucceeded(); + verify(dataPlaneClient).terminate("transferProcessId"); + } + + @Test + void terminate_shouldCallTerminateOnTheRightDataPlane() { + var dataPlaneInstance = createDataPlaneInstance(); + var mockedDataPlane = mock(DataPlaneInstance.class); + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .dataRequest(createDataRequest()) + .contentDataAddress(testDataAddress()) + .dataPlaneId(dataPlaneInstance.getId()) + .build(); + when(mockedDataPlane.getId()).thenReturn("notValidId"); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance, mockedDataPlane)); + + var result = flowController.terminate(transferProcess); + + AbstractResultAssert.assertThat(result).isSucceeded(); + verify(dataPlaneClient).terminate("transferProcessId"); + verify(mockedDataPlane).getId(); + } + + @Test + void terminate_shouldFail_withInvalidDataPlaneId() { + var dataPlaneInstance = createDataPlaneInstance(); + var transferProcess = TransferProcess.Builder.newInstance() + .id("transferProcessId") + .dataRequest(createDataRequest()) + .contentDataAddress(testDataAddress()) + .dataPlaneId("invalid") + .build(); + when(dataPlaneClient.terminate(any())).thenReturn(StatusResult.success()); + when(dataPlaneClientFactory.createClient(any())).thenReturn(dataPlaneClient); + when(selectorService.getAll()).thenReturn(List.of(dataPlaneInstance)); + + var result = flowController.terminate(transferProcess); + + AbstractResultAssert.assertThat(result).isFailed().detail().contains("Failed to select the data plane for terminating the transfer process"); + } + + @Test + void transferTypes_shouldReturnTypesForSpecifiedAsset() { + when(selectorService.getAll()).thenReturn(List.of( + dataPlaneInstanceBuilder().allowedTransferType("Custom-PUSH").allowedSourceType("TargetSrc").allowedDestType("TargetDest").build(), + dataPlaneInstanceBuilder().allowedTransferType("Custom-PULL").allowedSourceType("TargetSrc").allowedDestType("AnotherTargetDest").build(), + dataPlaneInstanceBuilder().allowedSourceType("AnotherSrc").allowedDestType("ThisWontBeListed").build() + )); + var asset = Asset.Builder.newInstance().dataAddress(DataAddress.Builder.newInstance().type("TargetSrc").build()).build(); + + var transferTypes = flowController.transferTypesFor(asset); + + assertThat(transferTypes).containsExactly("Custom-PUSH", "Custom-PULL"); + } + + private DataPlaneInstance createDataPlaneInstance() { + return dataPlaneInstanceBuilder().build(); + } + + private DataAddress testDataAddress() { + return DataAddress.Builder.newInstance().type("test-type").build(); + } + + private DataRequest createDataRequest() { + return createDataRequest("test"); + } + + private DataRequest createDataRequest(String destinationType) { + return DataRequest.Builder.newInstance() + .id(UUID.randomUUID().toString()) + .protocol("test-protocol") + .contractId(UUID.randomUUID().toString()) + .assetId(UUID.randomUUID().toString()) + .connectorAddress("test.connector.address") + .processId(UUID.randomUUID().toString()) + .destinationType(destinationType) + .build(); + } + + + private TransferProcess transferProcess(String destinationType, String transferType) { + return TransferProcess.Builder.newInstance() + .transferType(transferType) + .dataRequest(DataRequest.Builder.newInstance().destinationType(destinationType).build()) + .build(); + } +} diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java index 7d69fad84b8..142ce2ea17b 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ConsumerPullTransferDataFlowController.java @@ -35,6 +35,7 @@ import static org.eclipse.edc.spi.response.StatusResult.failure; import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL; +@Deprecated public class ConsumerPullTransferDataFlowController implements DataFlowController { private final DataPlaneSelectorService selectorService; diff --git a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java index ff69e6f1f17..dffbb00d6f8 100644 --- a/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java +++ b/extensions/control-plane/transfer/transfer-data-plane/src/main/java/org/eclipse/edc/connector/transfer/dataplane/flow/ProviderPushTransferDataFlowController.java @@ -39,6 +39,7 @@ import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PULL; import static org.eclipse.edc.spi.types.domain.transfer.FlowType.PUSH; +@Deprecated public class ProviderPushTransferDataFlowController implements DataFlowController { private final ControlApiUrl callbackUrl; diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java index 369c00009dc..da0a86c3a55 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api-configuration/src/main/java/org/eclipse/edc/connector/api/signaling/configuration/SignalingApiConfigurationExtension.java @@ -20,10 +20,11 @@ import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistryImpl; import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataAddressTransformer; import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowResponseMessageTransformer; -import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataAddressTransformer; import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowStartMessageTransformer; import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowSuspendMessageTransformer; import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowTerminateMessageTransformer; +import org.eclipse.edc.core.transform.transformer.to.JsonObjectToDataAddressTransformer; +import org.eclipse.edc.core.transform.transformer.to.JsonValueToGenericTypeTransformer; import org.eclipse.edc.jsonld.spi.JsonLd; import org.eclipse.edc.runtime.metamodel.annotation.Extension; import org.eclipse.edc.runtime.metamodel.annotation.Inject; @@ -107,6 +108,8 @@ public SignalingApiTransformerRegistry managementApiTypeTransformerRegistry() { registry.register(new JsonObjectToDataAddressTransformer()); registry.register(new JsonObjectFromDataFlowResponseMessageTransformer(factory)); registry.register(new JsonObjectFromDataAddressTransformer(factory, getJsonLdMapper())); + registry.register(new JsonValueToGenericTypeTransformer(getJsonLdMapper())); + return registry; } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java index a34ab0c1b4e..688b8106fbf 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/main/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiController.java @@ -27,9 +27,11 @@ import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowSuspendMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; +import org.eclipse.edc.spi.types.domain.transfer.FlowType; import org.eclipse.edc.transform.spi.TypeTransformerRegistry; import org.eclipse.edc.web.spi.exception.InvalidRequestException; @@ -66,15 +68,19 @@ public JsonObject start(JsonObject dataFlowStartMessage) { new InvalidRequestException("Failed to validate request: %s".formatted(startMsg.getId())) : new InvalidRequestException(f.getMessages())); - monitor.debug("Create EDR"); - var dataAddress = dataPlaneAuthorizationService.createEndpointDataReference(startMsg) - .onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail()))) - .orElseThrow(InvalidRequestException::new); + var flowResponse = DataFlowResponseMessage.Builder.newInstance(); + if (startMsg.getFlowType().equals(FlowType.PULL)) { + monitor.debug("Create EDR"); + var dataAddress = dataPlaneAuthorizationService.createEndpointDataReference(startMsg) + .onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail()))) + .orElseThrow(InvalidRequestException::new); + + flowResponse.dataAddress(dataAddress); + } dataPlaneManager.initiate(startMsg); - return typeTransformerRegistry.transform(dataAddress, JsonObject.class) - .onFailure(f -> monitor.warning("Error obtaining EDR DataAddress: %s".formatted(f.getFailureDetail()))) + return typeTransformerRegistry.transform(flowResponse.build(), JsonObject.class) .orElseThrow(f -> new EdcException(f.getFailureDetail())); } diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java index 077550c4ab5..c81ce87615c 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-api/src/test/java/org/eclipse/edc/connector/dataplane/api/controller/v1/DataPlaneSignalingApiControllerTest.java @@ -26,6 +26,7 @@ import org.eclipse.edc.spi.response.StatusResult; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowSuspendMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; @@ -67,6 +68,9 @@ void start() { when(authService.createEndpointDataReference(any())) .thenReturn(success(DataAddress.Builder.newInstance().type("test-edr").build())); + when(transformerRegistry.transform(isA(DataFlowResponseMessage.class), eq(JsonObject.class))) + .thenReturn(success(Json.createObjectBuilder().add("foo", "bar").build())); + when(transformerRegistry.transform(isA(DataAddress.class), eq(JsonObject.class))) .thenReturn(success(Json.createObjectBuilder().add("foo", "bar").build())); @@ -246,7 +250,7 @@ private DataFlowStartMessage createFlowStartMessage() { .assetId("assetId") .agreementId("agreementId") .participantId("participantId") - .flowType(FlowType.PUSH) + .flowType(FlowType.PULL) .callbackAddress(URI.create("http://localhost")) .sourceDataAddress(DataAddress.Builder.newInstance().type("sourceType").build()) .destinationDataAddress(DataAddress.Builder.newInstance().type("destType").build()) diff --git a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtension.java b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtension.java index 66d35f23f2f..e6194fdcbc3 100644 --- a/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtension.java +++ b/extensions/data-plane/data-plane-signaling/data-plane-signaling-client/src/main/java/org/eclipse/edc/connector/dataplane/client/DataPlaneSignalingClientExtension.java @@ -44,6 +44,7 @@ public class DataPlaneSignalingClientExtension implements ServiceExtension { @Inject private SignalingApiTransformerRegistry transformerRegistry; + @Inject private JsonLd jsonLd; @Override diff --git a/settings.gradle.kts b/settings.gradle.kts index b12dea2c7df..87b3b59c74a 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -153,6 +153,7 @@ include(":extensions:control-plane:api:management-api:management-api-test-fixtur include(":extensions:control-plane:api:management-api:policy-definition-api") include(":extensions:control-plane:api:management-api:transfer-process-api") include(":extensions:control-plane:transfer:transfer-data-plane") +include(":extensions:control-plane:transfer:transfer-data-plane-signaling") include(":extensions:control-plane:transfer:transfer-pull-http-receiver") include(":extensions:control-plane:transfer:transfer-pull-http-dynamic-receiver") include(":extensions:control-plane:provision:provision-http") diff --git a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java index 5d8584e2706..8ca84694b0c 100644 --- a/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java +++ b/system-tests/e2e-dataplane-tests/tests/src/test/java/org/eclipse/edc/test/e2e/DataPlaneSignalingApiEndToEndTest.java @@ -21,19 +21,22 @@ import jakarta.json.JsonObject; import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistry; import org.eclipse.edc.connector.api.signaling.transform.SignalingApiTransformerRegistryImpl; -import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataAddressTransformer; import org.eclipse.edc.connector.api.signaling.transform.from.JsonObjectFromDataFlowStartMessageTransformer; import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataAddressTransformer; +import org.eclipse.edc.connector.api.signaling.transform.to.JsonObjectToDataFlowResponseMessageTransformer; import org.eclipse.edc.connector.dataplane.spi.DataFlow; import org.eclipse.edc.connector.dataplane.spi.DataFlowStates; import org.eclipse.edc.connector.dataplane.spi.Endpoint; import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService; import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore; import org.eclipse.edc.core.transform.TypeTransformerRegistryImpl; +import org.eclipse.edc.core.transform.transformer.from.JsonObjectFromDataAddressTransformer; +import org.eclipse.edc.jsonld.spi.JsonLd; import org.eclipse.edc.jsonld.util.JacksonJsonLd; import org.eclipse.edc.junit.annotations.EndToEndTest; import org.eclipse.edc.spi.result.Failure; import org.eclipse.edc.spi.types.domain.DataAddress; +import org.eclipse.edc.spi.types.domain.transfer.DataFlowResponseMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage; import org.eclipse.edc.spi.types.domain.transfer.DataFlowTerminateMessage; import org.eclipse.edc.spi.types.domain.transfer.FlowType; @@ -72,8 +75,9 @@ void setup() { var builderFactory = Json.createBuilderFactory(Map.of()); mapper = JacksonJsonLd.createObjectMapper(); registry.register(new JsonObjectFromDataFlowStartMessageTransformer(builderFactory, mapper)); - registry.register(new JsonObjectFromDataAddressTransformer(builderFactory, mapper)); + registry.register(new JsonObjectFromDataAddressTransformer(builderFactory)); registry.register(new JsonObjectToDataAddressTransformer()); + registry.register(new JsonObjectToDataFlowResponseMessageTransformer()); } @DisplayName("Verify the POST /v1/dataflows endpoint returns the correct EDR") @@ -81,6 +85,7 @@ void setup() { void startTransfer() throws JsonProcessingException { var generator = runtime.getContext().getService(PublicEndpointGeneratorService.class); generator.addGeneratorFunction("HttpData", dataAddress -> Endpoint.url(DATAPLANE_PUBLIC_ENDPOINT_URL)); + var jsonLd = runtime.getContext().getService(JsonLd.class); var processId = "test-processId"; var flowMessage = createStartMessage(processId); @@ -95,9 +100,14 @@ void startTransfer() throws JsonProcessingException { .body(Matchers.notNullValue()) .statusCode(200) .extract().body().asString(); - var dataAddress = registry.transform(mapper.readValue(resultJson, JsonObject.class), DataAddress.class) + + var dataFlowResponseMessage = jsonLd.expand(mapper.readValue(resultJson, JsonObject.class)) + .compose(json -> registry.transform(json, DataFlowResponseMessage.class)) .orElseThrow(failTest()); + + var dataAddress = dataFlowResponseMessage.getDataAddress(); + // verify basic shape of the DSPACE data address (=EDR token) assertThat(dataAddress).isNotNull(); assertThat(dataAddress.getType()).isEqualTo("https://w3id.org/idsa/v4.1/HTTP"); diff --git a/system-tests/e2e-transfer-test/control-plane/build.gradle.kts b/system-tests/e2e-transfer-test/control-plane/build.gradle.kts index a41b7e6d17e..24884e095bb 100644 --- a/system-tests/e2e-transfer-test/control-plane/build.gradle.kts +++ b/system-tests/e2e-transfer-test/control-plane/build.gradle.kts @@ -25,8 +25,6 @@ dependencies { implementation(project(":extensions:common:iam:iam-mock")) implementation(project(":extensions:control-plane:api:control-plane-api")) implementation(project(":extensions:control-plane:api:management-api")) - implementation(project(":extensions:control-plane:transfer:transfer-data-plane")) - implementation(project(":extensions:data-plane:data-plane-client")) implementation(project(":core:data-plane-selector:data-plane-selector-core")) implementation(project(":extensions:data-plane-selector:data-plane-selector-api")) diff --git a/system-tests/e2e-transfer-test/data-plane/build.gradle.kts b/system-tests/e2e-transfer-test/data-plane/build.gradle.kts index 5cfef9f0d75..deaab78072f 100644 --- a/system-tests/e2e-transfer-test/data-plane/build.gradle.kts +++ b/system-tests/e2e-transfer-test/data-plane/build.gradle.kts @@ -24,6 +24,8 @@ dependencies { implementation(project(":extensions:data-plane:data-plane-http-oauth2")) implementation(project(":extensions:data-plane:data-plane-control-api")) implementation(project(":extensions:data-plane:data-plane-public-api")) + implementation(project(":extensions:data-plane:data-plane-public-api-v2")) + implementation(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-api")) implementation(project(":extensions:common:vault:vault-filesystem")) } diff --git a/system-tests/e2e-transfer-test/runner/build.gradle.kts b/system-tests/e2e-transfer-test/runner/build.gradle.kts index d3979d56e70..d35ce738b95 100644 --- a/system-tests/e2e-transfer-test/runner/build.gradle.kts +++ b/system-tests/e2e-transfer-test/runner/build.gradle.kts @@ -19,6 +19,7 @@ plugins { dependencies { testImplementation(project(":spi:control-plane:transfer-spi")) + testImplementation(project(":spi:data-plane:data-plane-spi")) testImplementation(project(":extensions:common:sql:sql-core")) testImplementation(project(":core:common:junit")) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java index 30af209d84c..13f6e4092c9 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndKafkaTransferTest.java @@ -73,14 +73,12 @@ class EndToEndKafkaTransferTest { private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private static final String KAFKA_SERVER = "localhost:9092"; private static final Duration TIMEOUT = Duration.ofSeconds(60); - private static final String SINK_HTTP_PATH = "/api/service"; private static final String SOURCE_TOPIC = "source_topic"; private static final String SINK_TOPIC = "sink_topic"; private static final int EVENT_DESTINATION_PORT = getFreePort(); private static final JsonNode JSON_MESSAGE = sampleMessage(); private static final AtomicInteger MESSAGE_COUNTER = new AtomicInteger(); - private static final EndToEndTransferParticipant CONSUMER = EndToEndTransferParticipant.Builder.newInstance() .name("consumer") .id("urn:connector:consumer") @@ -89,12 +87,16 @@ class EndToEndKafkaTransferTest { .name("provider") .id("urn:connector:provider") .build(); - + static String[] controlPlaneModules = new String[]{ + ":system-tests:e2e-transfer-test:control-plane", + ":extensions:control-plane:transfer:transfer-data-plane", + ":extensions:data-plane:data-plane-client" + }; @RegisterExtension static EdcRuntimeExtension consumerControlPlane = new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:control-plane", "consumer-control-plane", - CONSUMER.controlPlaneConfiguration() + CONSUMER.controlPlaneConfiguration(), + controlPlaneModules ); @RegisterExtension @@ -106,9 +108,9 @@ class EndToEndKafkaTransferTest { @RegisterExtension static EdcRuntimeExtension providerControlPlane = new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:control-plane", "provider-control-plane", - PROVIDER.controlPlaneConfiguration() + PROVIDER.controlPlaneConfiguration(), + controlPlaneModules ); @BeforeAll @@ -116,6 +118,79 @@ public static void setUp() { startKafkaProducer(); } + private static Consumer createKafkaConsumer() { + var props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "runner"); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); + return new KafkaConsumer<>(props); + } + + private static Producer createKafkaProducer() { + var props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); + return new KafkaProducer<>(props); + } + + private static void startKafkaProducer() { + var producer = createKafkaProducer(); + Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( + () -> producer.send(new ProducerRecord<>(SOURCE_TOPIC, String.valueOf(MESSAGE_COUNTER.getAndIncrement()), JSON_MESSAGE)), + 0, 100, MILLISECONDS); + } + + private static JsonObject httpSink() { + return Json.createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "HttpData") + .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() + .add(EDC_NAMESPACE + "name", "data") + .add(EDC_NAMESPACE + "baseUrl", format("http://localhost:%s", EVENT_DESTINATION_PORT)) + .add(EDC_NAMESPACE + "path", SINK_HTTP_PATH) + .build()) + .build(); + } + + @NotNull + private static JsonObject kafkaSink() { + return Json.createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "Kafka") + .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() + .add(EDC_NAMESPACE + "topic", SINK_TOPIC) + .add(EDC_NAMESPACE + kafkaProperty("bootstrap.servers"), KAFKA_SERVER) + .build()) + .build(); + } + + @NotNull + private static Map kafkaSourceProperty() { + return Map.of( + "name", "data", + "type", "Kafka", + "topic", SOURCE_TOPIC, + kafkaProperty("bootstrap.servers"), KAFKA_SERVER, + kafkaProperty("max.poll.records"), "100" + ); + } + + private static JsonNode sampleMessage() { + var node = OBJECT_MAPPER.createObjectNode(); + node.put("foo", "bar"); + return node; + } + + private static String kafkaProperty(String property) { + return "kafka." + property; + } + + private static JsonObject noPrivateProperty() { + return Json.createObjectBuilder().build(); + } + @Test void kafkaToHttpTransfer() throws JsonProcessingException { var destinationServer = startClientAndServer(EVENT_DESTINATION_PORT); @@ -185,83 +260,10 @@ void kafkaToKafkaTransfer() { } } - private static Consumer createKafkaConsumer() { - var props = new Properties(); - props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); - props.put(ConsumerConfig.GROUP_ID_CONFIG, "runner"); - props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonDeserializer.class.getName()); - return new KafkaConsumer<>(props); - } - - private static Producer createKafkaProducer() { - var props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JacksonSerializer.class.getName()); - return new KafkaProducer<>(props); - } - - private static void startKafkaProducer() { - var producer = createKafkaProducer(); - Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate( - () -> producer.send(new ProducerRecord<>(SOURCE_TOPIC, String.valueOf(MESSAGE_COUNTER.getAndIncrement()), JSON_MESSAGE)), - 0, 100, MILLISECONDS); - } - private void createResourcesOnProvider(String assetId, Map dataAddressProperties) { PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); var policy = inForceDatePolicy("gteq", "contractAgreement+0s", "lteq", "contractAgreement+10s"); var policyDefinition = PROVIDER.createPolicyDefinition(policy); PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), policyDefinition, policyDefinition); } - - private static JsonObject httpSink() { - return Json.createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "HttpData") - .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() - .add(EDC_NAMESPACE + "name", "data") - .add(EDC_NAMESPACE + "baseUrl", format("http://localhost:%s", EVENT_DESTINATION_PORT)) - .add(EDC_NAMESPACE + "path", SINK_HTTP_PATH) - .build()) - .build(); - } - - @NotNull - private static JsonObject kafkaSink() { - return Json.createObjectBuilder() - .add(TYPE, EDC_NAMESPACE + "DataAddress") - .add(EDC_NAMESPACE + "type", "Kafka") - .add(EDC_NAMESPACE + "properties", Json.createObjectBuilder() - .add(EDC_NAMESPACE + "topic", SINK_TOPIC) - .add(EDC_NAMESPACE + kafkaProperty("bootstrap.servers"), KAFKA_SERVER) - .build()) - .build(); - } - - @NotNull - private static Map kafkaSourceProperty() { - return Map.of( - "name", "data", - "type", "Kafka", - "topic", SOURCE_TOPIC, - kafkaProperty("bootstrap.servers"), KAFKA_SERVER, - kafkaProperty("max.poll.records"), "100" - ); - } - - private static JsonNode sampleMessage() { - var node = OBJECT_MAPPER.createObjectNode(); - node.put("foo", "bar"); - return node; - } - - private static String kafkaProperty(String property) { - return "kafka." + property; - } - - private static JsonObject noPrivateProperty() { - return Json.createObjectBuilder().build(); - } } diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferInMemoryTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferInMemoryTest.java index 384ecb18e0c..6327ac7d39c 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferInMemoryTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferInMemoryTest.java @@ -24,12 +24,18 @@ @EndToEndTest class EndToEndTransferInMemoryTest extends AbstractEndToEndTransfer { + static String[] controlPlaneModules = new String[]{ + ":system-tests:e2e-transfer-test:control-plane", + ":extensions:control-plane:transfer:transfer-data-plane", + ":extensions:data-plane:data-plane-client" + }; + @RegisterExtension static EdcClassRuntimesExtension runtimes = new EdcClassRuntimesExtension( new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:control-plane", "consumer-control-plane", - CONSUMER.controlPlaneConfiguration() + CONSUMER.controlPlaneConfiguration(), + controlPlaneModules ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", @@ -46,9 +52,9 @@ class EndToEndTransferInMemoryTest extends AbstractEndToEndTransfer { PROVIDER.dataPlaneConfiguration() ), new EdcRuntimeExtension( - ":system-tests:e2e-transfer-test:control-plane", "provider-control-plane", - PROVIDER.controlPlaneConfiguration() + PROVIDER.controlPlaneConfiguration(), + controlPlaneModules ), new EdcRuntimeExtension( ":system-tests:e2e-transfer-test:backend-service", diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferPostgresqlTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferPostgresqlTest.java index 7b3ca21de9a..ad277a83c51 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferPostgresqlTest.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/EndToEndTransferPostgresqlTest.java @@ -33,8 +33,10 @@ class EndToEndTransferPostgresqlTest extends AbstractEndToEndTransfer { createDatabase(PROVIDER.getName()); }; - static String[] controlPlanePostgresqlModules = new String[] { + static String[] controlPlanePostgresqlModules = new String[]{ ":system-tests:e2e-transfer-test:control-plane", + ":extensions:control-plane:transfer:transfer-data-plane", + ":extensions:data-plane:data-plane-client", ":extensions:control-plane:store:sql:control-plane-sql", ":extensions:common:sql:sql-pool:sql-pool-apache-commons", ":extensions:common:transaction:transaction-local", @@ -42,7 +44,7 @@ class EndToEndTransferPostgresqlTest extends AbstractEndToEndTransfer { ":extensions:policy-monitor:store:sql:policy-monitor-store-sql" }; - static String[] dataPlanePostgresqlModules = new String[] { + static String[] dataPlanePostgresqlModules = new String[]{ ":system-tests:e2e-transfer-test:data-plane", ":extensions:data-plane:store:sql:data-plane-store-sql", ":extensions:common:sql:sql-pool:sql-pool-apache-commons", diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java index a9f6ba8ce85..e0e39e0014a 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/participant/EndToEndTransferParticipant.java @@ -54,6 +54,7 @@ public class EndToEndTransferParticipant extends Participant { private final URI controlPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); private final URI dataPlaneDefault = URI.create("http://localhost:" + getFreePort()); private final URI dataPlaneControl = URI.create("http://localhost:" + getFreePort() + "/control"); + private final URI dataPlaneSignaling = URI.create("http://localhost:" + getFreePort() + "/signaling"); private final URI dataPlanePublic = URI.create("http://localhost:" + getFreePort() + "/public"); private final URI backendService = URI.create("http://localhost:" + getFreePort()); @@ -149,17 +150,60 @@ public URI backendService() { return backendService; } + public URI publicDataPlane() { + return dataPlanePublic; + } + + /** + * Register a data plane using the old data plane control API URL and no transfer types + */ public void registerDataPlane() { - registerDataPlane(Set.of("HttpData", "HttpProvision", "Kafka"), Set.of("HttpData", "HttpProvision", "HttpProxy", "Kafka")); + registerDataPlane(dataPlaneControl + "/transfer", Set.of()); + } + + /** + * Register a data plane using with input transfer type using the data plane signaling API url + */ + public void registerDataPlane(Set transferTypes) { + registerDataPlane(dataPlaneSignaling + "/v1/dataflows", Set.of("HttpData", "HttpProvision", "Kafka"), Set.of("HttpData", "HttpProvision", "HttpProxy", "Kafka"), transferTypes); + } + + /** + * Register a data plane + * + * @param url The data plane url + * @param transferTypes supported transfer types + */ + public void registerDataPlane(String url, Set transferTypes) { + registerDataPlane(url, Set.of("HttpData", "HttpProvision", "Kafka"), Set.of("HttpData", "HttpProvision", "HttpProxy", "Kafka"), transferTypes); } + /** + * Register a data plane with the old data plane control API url + * + * @param sources The allowed source types + * @param destinations The allowed destination types + */ public void registerDataPlane(Set sources, Set destinations) { + registerDataPlane(dataPlaneControl + "/transfer", sources, destinations, Set.of()); + } + + /** + * Register a data plane + * + * @param url The url of the data plane + * @param sources The allowed source types + * @param destinations The allowed destination types + * @param transferTypes The allowed transfer types + */ + public void registerDataPlane(String url, Set sources, Set destinations, Set transferTypes) { var jsonObject = Json.createObjectBuilder() .add(CONTEXT, createObjectBuilder().add(EDC_PREFIX, EDC_NAMESPACE)) .add(ID, UUID.randomUUID().toString()) - .add(EDC_NAMESPACE + "url", dataPlaneControl + "/transfer") + .add(EDC_NAMESPACE + "url", url) .add(EDC_NAMESPACE + "allowedSourceTypes", createArrayBuilder(sources)) .add(EDC_NAMESPACE + "allowedDestTypes", createArrayBuilder(destinations)) + .add(EDC_NAMESPACE + "allowedTransferTypes", createArrayBuilder(transferTypes)) .add(EDC_NAMESPACE + "properties", createObjectBuilder().add("publicApiUrl", dataPlanePublic.toString())) .build(); @@ -221,10 +265,14 @@ public Map dataPlaneConfiguration() { put("web.http.public.path", "/public"); put("web.http.control.port", String.valueOf(dataPlaneControl.getPort())); put("web.http.control.path", dataPlaneControl.getPath()); + put("web.http.signaling.port", String.valueOf(dataPlaneSignaling.getPort())); + put("web.http.signaling.path", dataPlaneSignaling.getPath()); put("edc.vault", resourceAbsolutePath(getName() + "-vault.properties")); put("edc.keystore", resourceAbsolutePath("certs/cert.pfx")); put("edc.keystore.password", "123456"); put("edc.dataplane.token.validation.endpoint", controlPlaneControl + "/token"); + put("edc.transfer.proxy.token.signer.privatekey.alias", "1"); + put("edc.transfer.proxy.token.verifier.publickey.alias", "public-key"); put("edc.dataplane.http.sink.partition.size", "1"); } }; diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/AbstractSignalingTransfer.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/AbstractSignalingTransfer.java new file mode 100644 index 00000000000..f7bd48eab78 --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/AbstractSignalingTransfer.java @@ -0,0 +1,159 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e.signaling; + +import jakarta.json.Json; +import jakarta.json.JsonObject; +import org.eclipse.edc.test.e2e.participant.EndToEndTransferParticipant; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import java.util.UUID; + +import static io.restassured.RestAssured.given; +import static jakarta.json.Json.createObjectBuilder; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; +import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; +import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; +import static org.eclipse.edc.test.system.utils.PolicyFixtures.noConstraintPolicy; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.anyOf; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; + +public abstract class AbstractSignalingTransfer { + + protected static final EndToEndTransferParticipant CONSUMER = EndToEndTransferParticipant.Builder.newInstance() + .name("consumer") + .id("urn:connector:consumer") + .build(); + protected static final EndToEndTransferParticipant PROVIDER = EndToEndTransferParticipant.Builder.newInstance() + .name("provider") + .id("urn:connector:provider") + .build(); + protected final Duration timeout = Duration.ofSeconds(60); + + @Test + @Disabled + void httpPull_dataTransfer() { + registerDataPlanes(); + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); + var dynamicReceiverProps = CONSUMER.dynamicReceiverPrivateProperties(); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, dynamicReceiverProps, syncDataAddress(), "HttpData-PULL"); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(STARTED.name()); + }); + + // retrieve the data reference + var edr = CONSUMER.getDataReference(transferProcessId); + + // pull the data without query parameter + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of(), equalTo("some information"))); + + // pull the data with additional query parameter + var msg = UUID.randomUUID().toString(); + await().atMost(timeout).untilAsserted(() -> CONSUMER.pullData(edr, Map.of("message", msg), equalTo(msg))); + + // We expect two EDR because in the test runtime we have two receiver extensions static and dynamic one + assertThat(CONSUMER.getAllDataReferences(transferProcessId)).hasSize(2); + } + + @Test + void httpPushDataTransfer() { + registerDataPlanes(); + var assetId = UUID.randomUUID().toString(); + createResourcesOnProvider(assetId, noConstraintPolicy(), httpDataAddressProperties()); + var destination = httpDataAddress(CONSUMER.backendService() + "/api/consumer/store"); + + var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination, "HttpData-PUSH"); + await().atMost(timeout).untilAsserted(() -> { + var state = CONSUMER.getTransferProcessState(transferProcessId); + assertThat(state).isEqualTo(COMPLETED.name()); + + given() + .baseUri(CONSUMER.backendService().toString()) + .when() + .get("/api/consumer/data") + .then() + .statusCode(anyOf(is(200), is(204))) + .body(is(notNullValue())); + }); + } + + private JsonObject httpDataAddress(String baseUrl) { + return createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "HttpData") + .add(EDC_NAMESPACE + "properties", createObjectBuilder() + .add(EDC_NAMESPACE + "baseUrl", baseUrl) + .build()) + .build(); + } + + private JsonObject syncDataAddress() { + return createObjectBuilder() + .add(TYPE, EDC_NAMESPACE + "DataAddress") + .add(EDC_NAMESPACE + "type", "HttpProxy") + .build(); + } + + @NotNull + private Map httpDataAddressOauth2Properties() { + return Map.of( + "name", "transfer-test", + "baseUrl", PROVIDER.backendService() + "/api/provider/oauth2data", + "type", "HttpData", + "proxyQueryParams", "true", + "oauth2:clientId", "clientId", + "oauth2:clientSecretKey", "provision-oauth-secret", + "oauth2:tokenUrl", PROVIDER.backendService() + "/api/oauth2/token" + ); + } + + @NotNull + private Map httpDataAddressProperties() { + return Map.of( + "name", "transfer-test", + "baseUrl", PROVIDER.backendService() + "/api/provider/data", + "type", "HttpData", + "proxyQueryParams", "true" + ); + } + + private void registerDataPlanes() { + PROVIDER.registerDataPlane(Set.of("HttpData-PUSH", "HttpData-PULL")); + } + + private void createResourcesOnProvider(String assetId, JsonObject contractPolicy, Map dataAddressProperties) { + PROVIDER.createAsset(assetId, Map.of("description", "description"), dataAddressProperties); + var accessPolicyId = PROVIDER.createPolicyDefinition(noConstraintPolicy()); + var contractPolicyId = PROVIDER.createPolicyDefinition(contractPolicy); + PROVIDER.createContractDefinition(assetId, UUID.randomUUID().toString(), accessPolicyId, contractPolicyId); + } + + private JsonObject noPrivateProperty() { + return Json.createObjectBuilder().build(); + } +} diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java new file mode 100644 index 00000000000..fcb64aacaea --- /dev/null +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/signaling/SignalingTransferInMemoryTest.java @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.test.e2e.signaling; + +import org.eclipse.edc.connector.dataplane.spi.Endpoint; +import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService; +import org.eclipse.edc.junit.annotations.EndToEndTest; +import org.eclipse.edc.junit.extensions.EdcClassRuntimesExtension; +import org.eclipse.edc.junit.extensions.EdcRuntimeExtension; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.HashMap; + +@EndToEndTest +class SignalingTransferInMemoryTest extends AbstractSignalingTransfer { + + static String[] controlPlaneModules = new String[]{ + ":system-tests:e2e-transfer-test:control-plane", + ":extensions:control-plane:transfer:transfer-data-plane-signaling", + ":extensions:data-plane:data-plane-signaling:data-plane-signaling-client" + }; + + static String[] dataPlanePostgresqlModules = new String[]{ + ":system-tests:e2e-transfer-test:data-plane", + }; + + + static EdcRuntimeExtension dataPlane = new EdcRuntimeExtension( + "provider-data-plane", + PROVIDER.dataPlaneConfiguration(), + dataPlanePostgresqlModules + ); + + @RegisterExtension + static EdcClassRuntimesExtension runtimes = new EdcClassRuntimesExtension( + new EdcRuntimeExtension( + "consumer-control-plane", + CONSUMER.controlPlaneConfiguration(), + controlPlaneModules + ), + new EdcRuntimeExtension( + ":system-tests:e2e-transfer-test:backend-service", + "consumer-backend-service", + new HashMap<>() { + { + put("web.http.port", String.valueOf(CONSUMER.backendService().getPort())); + } + } + ), + dataPlane, + new EdcRuntimeExtension( + "provider-control-plane", + PROVIDER.controlPlaneConfiguration(), + controlPlaneModules + ), + new EdcRuntimeExtension( + ":system-tests:e2e-transfer-test:backend-service", + "provider-backend-service", + new HashMap<>() { + { + put("web.http.port", String.valueOf(PROVIDER.backendService().getPort())); + } + } + ) + ); + + @BeforeAll + static void setup() { + var generator = dataPlane.getContext().getService(PublicEndpointGeneratorService.class); + generator.addGeneratorFunction("HttpData", dataAddress -> Endpoint.url(PROVIDER.publicDataPlane() + "/v2")); + } +}