Skip to content

Commit

Permalink
fix: send correlation id as process id in the transfer messages
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Aug 28, 2023
1 parent 149710a commit 224eaf9
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ private boolean processCompleting(TransferProcess process) {
var message = TransferCompletionMessage.Builder.newInstance()
.protocol(process.getProtocol())
.counterPartyAddress(process.getConnectorAddress())
.processId(process.getId())
.processId(process.getCorrelationId())
.policy(policyArchive.findPolicyForContract(process.getContractId()))
.build();

Expand Down Expand Up @@ -487,7 +487,7 @@ private boolean processTerminating(TransferProcess process) {
var message = TransferTerminationMessage.Builder.newInstance()
.counterPartyAddress(process.getConnectorAddress())
.protocol(process.getProtocol())
.processId(process.getId())
.processId(process.getCorrelationId())
.policy(policyArchive.findPolicyForContract(process.getContractId()))
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,31 +622,38 @@ void started_shouldBreakLeaseIfNotConsumer() {

@Test
void completing_shouldTransitionToCompleted_whenSendingMessageSucceed() {
var process = createTransferProcess(COMPLETING);
var process = createTransferProcessBuilder(COMPLETING).dataRequest(createDataRequestBuilder().id("correlationId").build()).build();
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(COMPLETING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(COMPLETING.code()).build());
when(dispatcherRegistry.dispatch(any(), isA(TransferCompletionMessage.class))).thenReturn(completedFuture(StatusResult.success("any")));

manager.start();

await().untilAsserted(() -> {
verify(dispatcherRegistry).dispatch(any(), isA(TransferCompletionMessage.class));
var captor = ArgumentCaptor.forClass(TransferCompletionMessage.class);
verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture());
var message = captor.getValue();
assertThat(message.getProcessId()).isEqualTo("correlationId");
verify(transferProcessStore, times(RETRY_LIMIT)).save(argThat(p -> p.getState() == COMPLETED.code()));
verify(listener).completed(process);
});
}

@Test
void terminating_shouldTransitionToTerminated_whenMessageSentCorrectly() {
var process = createTransferProcessBuilder(TERMINATING).type(PROVIDER).build();
var process = createTransferProcessBuilder(TERMINATING).type(PROVIDER)
.dataRequest(createDataRequestBuilder().id("correlationId").build()).build();
when(transferProcessStore.nextNotLeased(anyInt(), stateIs(TERMINATING.code()))).thenReturn(List.of(process)).thenReturn(emptyList());
when(transferProcessStore.findById(process.getId())).thenReturn(process, process.toBuilder().state(TERMINATING.code()).build());
when(dispatcherRegistry.dispatch(any(), isA(TransferTerminationMessage.class))).thenReturn(completedFuture(StatusResult.success("any")));

manager.start();

await().untilAsserted(() -> {
verify(dispatcherRegistry).dispatch(any(), isA(TransferTerminationMessage.class));
var captor = ArgumentCaptor.forClass(TransferTerminationMessage.class);
verify(dispatcherRegistry).dispatch(eq(Object.class), captor.capture());
var message = captor.getValue();
assertThat(message.getProcessId()).isEqualTo("correlationId");
verify(transferProcessStore, times(RETRY_LIMIT)).save(argThat(p -> p.getState() == TERMINATED.code()));
verify(listener).terminated(process);
});
Expand Down Expand Up @@ -785,7 +792,6 @@ private TransferProcess.Builder createTransferProcessBuilder(TransferProcessStat
var processId = UUID.randomUUID().toString();
var dataRequest = createDataRequestBuilder()
.processId(processId)
.protocol("protocol")
.connectorAddress("http://an/address")
.build();

Expand All @@ -802,7 +808,8 @@ private DataRequest.Builder createDataRequestBuilder() {
.id(UUID.randomUUID().toString())
.contractId(UUID.randomUUID().toString())
.assetId(UUID.randomUUID().toString())
.destinationType(DESTINATION_TYPE);
.destinationType(DESTINATION_TYPE)
.protocol("protocol");
}

private ProvisionedDataDestinationResource provisionedDataDestinationResource() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dependencies {
implementation(project(":extensions:common:vault:vault-filesystem"))
implementation(project(":extensions:common:http"))
implementation(project(":extensions:common:iam:iam-mock"))
implementation(project(":extensions:control-plane:api:control-plane-api"))
implementation(project(":extensions:control-plane:api:management-api"))
implementation(project(":extensions:control-plane:transfer:transfer-data-plane"))
implementation(project(":extensions:data-plane:data-plane-client"))
Expand Down
1 change: 1 addition & 0 deletions system-tests/e2e-transfer-test/data-plane/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ plugins {

dependencies {
implementation(project(":core:data-plane:data-plane-core"))
implementation(project(":extensions:control-plane:api:control-plane-api-client"))
implementation(project(":extensions:data-plane:data-plane-http"))
implementation(project(":extensions:data-plane:data-plane-kafka"))
implementation(project(":extensions:data-plane:data-plane-http-oauth2"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import static java.time.Duration.ofDays;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.COMPLETED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.STARTED;
import static org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates.TERMINATED;
import static org.eclipse.edc.jsonld.spi.JsonLdKeywords.TYPE;
Expand Down Expand Up @@ -147,7 +148,7 @@ void httpPushDataTransfer() {
var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination);
await().atMost(timeout).untilAsserted(() -> {
var state = CONSUMER.getTransferProcessState(transferProcessId);
assertThat(state).isEqualTo(STARTED.name());
assertThat(state).isEqualTo(COMPLETED.name());

given()
.baseUri(CONSUMER.backendService().toString())
Expand All @@ -170,7 +171,7 @@ void httpPushDataTransfer_oauth2Provisioning() {
var transferProcessId = CONSUMER.requestAsset(PROVIDER, assetId, noPrivateProperty(), destination);
await().atMost(timeout).untilAsserted(() -> {
var state = CONSUMER.getTransferProcessState(transferProcessId);
assertThat(state).isEqualTo(STARTED.name());
assertThat(state).isEqualTo(COMPLETED.name());

given()
.baseUri(CONSUMER.backendService().toString())
Expand Down

0 comments on commit 224eaf9

Please sign in to comment.