From 224eaf93108a108e6281f1c87ad834c3161a36fb Mon Sep 17 00:00:00 2001 From: ndr_brt Date: Mon, 28 Aug 2023 11:51:42 +0200 Subject: [PATCH] fix: send correlation id as process id in the transfer messages --- .../process/TransferProcessManagerImpl.java | 4 ++-- .../TransferProcessManagerImplTest.java | 19 +++++++++++++------ .../control-plane/build.gradle.kts | 1 + .../data-plane/build.gradle.kts | 1 + .../test/e2e/AbstractEndToEndTransfer.java | 5 +++-- 5 files changed, 20 insertions(+), 10 deletions(-) diff --git a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java index 63bbd983174..12c36802323 100644 --- a/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java +++ b/core/control-plane/transfer-core/src/main/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImpl.java @@ -455,7 +455,7 @@ private boolean processCompleting(TransferProcess process) { var message = TransferCompletionMessage.Builder.newInstance() .protocol(process.getProtocol()) .counterPartyAddress(process.getConnectorAddress()) - .processId(process.getId()) + .processId(process.getCorrelationId()) .policy(policyArchive.findPolicyForContract(process.getContractId())) .build(); @@ -487,7 +487,7 @@ private boolean processTerminating(TransferProcess process) { var message = TransferTerminationMessage.Builder.newInstance() .counterPartyAddress(process.getConnectorAddress()) .protocol(process.getProtocol()) - .processId(process.getId()) + .processId(process.getCorrelationId()) .policy(policyArchive.findPolicyForContract(process.getContractId())) .build(); diff --git a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java index ec01f1e2de4..97fcf8b71ae 100644 --- a/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java +++ b/core/control-plane/transfer-core/src/test/java/org/eclipse/edc/connector/transfer/process/TransferProcessManagerImplTest.java @@ -622,7 +622,7 @@ void started_shouldBreakLeaseIfNotConsumer() { @Test void completing_shouldTransitionToCompleted_whenSendingMessageSucceed() { - var process = createTransferProcess(COMPLETING); + var process = createTransferProcessBuilder(COMPLETING).dataRequest(createDataRequestBuilder().id("correlationId").build()).build(); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build()); when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); @@ -630,7 +630,10 @@ void completing_shouldTransitionToCompleted_whenSendingMessageSucceed() { manager.start(); await().untilAsserted(() -> { - verify(dispatcherRegistry).dispatch(any(), isA(TransferCompletionMessage.class)); + var captor = ArgumentCaptor.forClass(TransferCompletionMessage.class); + verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); + var message = captor.getValue(); + assertThat(message.getProcessId()).isEqualTo("correlationId"); verify(transferProcessStore, times(RETRY_LIMIT)).save(argThat(p -> p.getState() == COMPLETED.code())); verify(listener).completed(process); }); @@ -638,7 +641,8 @@ void completing_shouldTransitionToCompleted_whenSendingMessageSucceed() { @Test void terminating_shouldTransitionToTerminated_whenMessageSentCorrectly() { - var process = createTransferProcessBuilder(TERMINATING).type(PROVIDER).build(); + var process = createTransferProcessBuilder(TERMINATING).type(PROVIDER) + .dataRequest(createDataRequestBuilder().id("correlationId").build()).build(); when(transferProcessStore.nextNotLeased(anyInt(), stateIs(TERMINATING.code()))).thenReturn(List.of(process)).thenReturn(emptyList()); when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(TERMINATING.code()).build()); when(dispatcherRegistry.dispatch(any(), isA(TransferTerminationMessage.class))).thenReturn(completedFuture(StatusResult.success("any"))); @@ -646,7 +650,10 @@ void terminating_shouldTransitionToTerminated_whenMessageSentCorrectly() { manager.start(); await().untilAsserted(() -> { - verify(dispatcherRegistry).dispatch(any(), isA(TransferTerminationMessage.class)); + var captor = ArgumentCaptor.forClass(TransferTerminationMessage.class); + verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture()); + var message = captor.getValue(); + assertThat(message.getProcessId()).isEqualTo("correlationId"); verify(transferProcessStore, times(RETRY_LIMIT)).save(argThat(p -> p.getState() == TERMINATED.code())); verify(listener).terminated(process); }); @@ -785,7 +792,6 @@ private TransferProcess.Builder createTransferProcessBuilder(TransferProcessStat var processId = UUID.randomUUID().toString(); var dataRequest = createDataRequestBuilder() .processId(processId) - .protocol("protocol") .connectorAddress("http://an/address") .build(); @@ -802,7 +808,8 @@ private DataRequest.Builder createDataRequestBuilder() { .id(UUID.randomUUID().toString()) .contractId(UUID.randomUUID().toString()) .assetId(UUID.randomUUID().toString()) - .destinationType(DESTINATION_TYPE); + .destinationType(DESTINATION_TYPE) + .protocol("protocol"); } private ProvisionedDataDestinationResource provisionedDataDestinationResource() { diff --git a/system-tests/e2e-transfer-test/control-plane/build.gradle.kts b/system-tests/e2e-transfer-test/control-plane/build.gradle.kts index ec80a1c7723..f3dc4636f51 100644 --- a/system-tests/e2e-transfer-test/control-plane/build.gradle.kts +++ b/system-tests/e2e-transfer-test/control-plane/build.gradle.kts @@ -23,6 +23,7 @@ dependencies { implementation(project(":extensions:common:vault:vault-filesystem")) implementation(project(":extensions:common:http")) implementation(project(":extensions:common:iam:iam-mock")) + implementation(project(":extensions:control-plane:api:control-plane-api")) implementation(project(":extensions:control-plane:api:management-api")) implementation(project(":extensions:control-plane:transfer:transfer-data-plane")) implementation(project(":extensions:data-plane:data-plane-client")) diff --git a/system-tests/e2e-transfer-test/data-plane/build.gradle.kts b/system-tests/e2e-transfer-test/data-plane/build.gradle.kts index ffae0b69611..335a1d16c7b 100644 --- a/system-tests/e2e-transfer-test/data-plane/build.gradle.kts +++ b/system-tests/e2e-transfer-test/data-plane/build.gradle.kts @@ -18,6 +18,7 @@ plugins { dependencies { implementation(project(":core:data-plane:data-plane-core")) + implementation(project(":extensions:control-plane:api:control-plane-api-client")) implementation(project(":extensions:data-plane:data-plane-http")) implementation(project(":extensions:data-plane:data-plane-kafka")) implementation(project(":extensions:data-plane:data-plane-http-oauth2")) diff --git a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/AbstractEndToEndTransfer.java b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/AbstractEndToEndTransfer.java index 30e7c716786..4cc18ffd467 100644 --- a/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/AbstractEndToEndTransfer.java +++ b/system-tests/e2e-transfer-test/runner/src/test/java/org/eclipse/edc/test/e2e/AbstractEndToEndTransfer.java @@ -32,6 +32,7 @@ import static java.time.Duration.ofDays; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED; import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE; @@ -147,7 +148,7 @@ void httpPushDataTransfer() { var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination); await().atMost(timeout).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); + assertThat(state).isEqualTo(COMPLETED.name()); given() .baseUri(CONSUMER.backendService().toString()) @@ -170,7 +171,7 @@ void httpPushDataTransfer_oauth2Provisioning() { var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination); await().atMost(timeout).untilAsserted(() -> { var state = CONSUMER.getTransferProcessState(transferProcessId); - assertThat(state).isEqualTo(STARTED.name()); + assertThat(state).isEqualTo(COMPLETED.name()); given() .baseUri(CONSUMER.backendService().toString())