From 143159a3ebbc0e92a3e61e60b41109e5f45c0971 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Wed, 14 Aug 2024 09:04:56 +0200 Subject: [PATCH 01/13] Almost there --- .../ContractAgreementTerminationService.java | 96 ++++++++++++++++++- .../ContractTerminationObserver.java | 66 +++++++++++++ .../query/TerminateContractQuery.java | 4 +- .../edc/e2e/ContractTerminationTest.java | 96 +++++++++++++++---- 4 files changed, 237 insertions(+), 25 deletions(-) create mode 100644 extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java index 36a151b5b..1dca09cc5 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java @@ -16,27 +16,41 @@ import de.sovity.edc.extension.contacttermination.query.ContractAgreementTerminationDetailsQuery; import de.sovity.edc.extension.contacttermination.query.TerminateContractQuery; +import de.sovity.edc.extension.messenger.SovityMessage; import de.sovity.edc.extension.messenger.SovityMessenger; import lombok.RequiredArgsConstructor; import lombok.val; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.observe.Observable; +import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import org.jooq.DSLContext; +import java.lang.ref.Reference; +import java.lang.ref.WeakReference; import java.time.OffsetDateTime; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.COUNTERPARTY; import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.SELF; @RequiredArgsConstructor -public class ContractAgreementTerminationService { +public class ContractAgreementTerminationService implements Observable { private final SovityMessenger sovityMessenger; private final ContractAgreementTerminationDetailsQuery contractAgreementTerminationDetailsQuery; private final TerminateContractQuery terminateContractQuery; private final Monitor monitor; private final String thisParticipantId; + private final List> contractTerminationObservers = + Collections.synchronizedList(new ArrayList<>()); + private final ReentrantLock observersLock = new ReentrantLock(); /** * This is to terminate an EDC's own contract. @@ -46,6 +60,8 @@ public class ContractAgreementTerminationService { */ public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminationParam termination) { + notifyObservers(ContractTerminationObserver::contractTerminationStartedFromThisInstance); + val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId()); if (details == null) { @@ -60,6 +76,9 @@ public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminat notifyTerminationToProvider(details.counterpartyAddress(), termination); + + notifyObservers(it -> it.contractTerminationCompletedOnThisInstance(terminatedAt)); + return terminatedAt; } @@ -68,6 +87,8 @@ public OffsetDateTime terminateCounterpartyAgreement( @Nullable String identity, ContractTerminationParam termination ) { + notifyObservers(ContractTerminationObserver::contractTerminatedByCounterpartyStarted); + val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId()); if (details == null) { @@ -90,15 +111,84 @@ public OffsetDateTime terminateCounterpartyAgreement( val agent = thisParticipantId.equals(details.counterpartyId()) ? SELF : COUNTERPARTY; - return terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, agent); + val result = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, agent); + + notifyObservers(ContractTerminationObserver::contractTerminatedByCounterparty); + + return result; } public void notifyTerminationToProvider(String counterPartyAddress, ContractTerminationParam termination) { - sovityMessenger.send( + + notifyObservers(ContractTerminationObserver::contractTerminationOnCounterpartyStarted); + + val future = sovityMessenger.send( + SovityMessage.class, counterPartyAddress, new ContractTerminationMessage( termination.contractAgreementId(), termination.detail(), termination.reason())); + + future.thenAccept(it -> { + notifyObservers(ContractTerminationObserver::contractTerminationCompletedOnCounterpartyInstance); + }); + } + + @Override + public Collection getListeners() { + return contractTerminationObservers.stream().filter(it -> it.get() != null).toList().stream().map(Reference::get).toList(); + } + + @Override + public void registerListener(ContractTerminationObserver listener) { + try { + observersLock.lock(); + + final var refreshed = getFilteredWeakReferences(null); + + contractTerminationObservers.clear(); + contractTerminationObservers.addAll(refreshed); + contractTerminationObservers.add(new WeakReference<>(listener)); + } finally { + observersLock.unlock(); + } + } + + @Override + public void unregisterListener(ContractTerminationObserver listener) { + try { + observersLock.lock(); + + final var refreshed = getFilteredWeakReferences(listener); + + contractTerminationObservers.clear(); + contractTerminationObservers.addAll(refreshed); + } finally { + observersLock.unlock(); + } + } + + private @NotNull List> getFilteredWeakReferences(ContractTerminationObserver listener) { + return contractTerminationObservers.stream().filter(it -> { + val obs = it.get(); + if (obs == null) { + return false; + } + return obs != listener; + }).toList(); + } + + private void notifyObservers(Consumer call) { + for (val weakRef : contractTerminationObservers) { + try { + val observer = weakRef.get(); + if (observer != null) { + call.accept(observer); + } + } catch (Exception e) { + monitor.warning("Failure when notifying contract termination observer."); + } + } } } diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java new file mode 100644 index 000000000..ca4ed3d42 --- /dev/null +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2024 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package de.sovity.edc.extension.contacttermination; + +import de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy; +import org.eclipse.edc.spi.observe.Observable; + +import java.time.OffsetDateTime; + +import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.SELF; + +public interface ContractTerminationObserver { + + /** + * Indicates that a contract termination was started by this EDC. + */ + default void contractTerminationStartedFromThisInstance() { + } + + /** + * Indicates that the first step to terminate a contract, terminating a contract on this EDC instance itself, was successful. + * The contract is now marked as terminated on this EDC's side. + * + * @param terminatedAt The termination time, as saved in the database. + */ + default void contractTerminationCompletedOnThisInstance(OffsetDateTime terminatedAt) { + } + + /** + * Indicates that a contract termination on the counterparty EDC was started. + */ + default void contractTerminationOnCounterpartyStarted() { + } + + /** + * Indicates that the second step to terminate a contract, terminating a contract on the counterparty EDC, was successful. + * The contract is now marked as terminated on the counterparty EDC's side. + */ + default void contractTerminationCompletedOnCounterpartyInstance() { + } + + /** + * Indicates that a contract termination was started by a counterparty EDC terminated successfully + */ + default void contractTerminatedByCounterpartyStarted() { + } + + /** + * Indicates that a contract termination initiated by a counterparty EDC terminated successfully + * The contract is now marked as terminated on this EDC. + */ + default void contractTerminatedByCounterparty() { + } +} diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/query/TerminateContractQuery.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/query/TerminateContractQuery.java index f404eb92c..5873784b8 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/query/TerminateContractQuery.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/query/TerminateContractQuery.java @@ -31,8 +31,8 @@ public class TerminateContractQuery { public OffsetDateTime terminateConsumerAgreementOrThrow( DSLContext dsl, ContractTerminationParam termination, - ContractTerminatedBy terminatedBy) { - + ContractTerminatedBy terminatedBy + ) { val tooAccurate = OffsetDateTime.now(); val now = tooAccurate.truncatedTo(ChronoUnit.MICROS); diff --git a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java index bc4d8bb10..f86340633 100644 --- a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java +++ b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java @@ -22,6 +22,8 @@ import de.sovity.edc.client.gen.model.ContractTerminationRequest; import de.sovity.edc.client.gen.model.InitiateTransferRequest; import de.sovity.edc.client.gen.model.TransferHistoryEntry; +import de.sovity.edc.extension.contacttermination.ContractAgreementTerminationService; +import de.sovity.edc.extension.contacttermination.ContractTerminationObserver; import de.sovity.edc.extension.e2e.extension.Consumer; import de.sovity.edc.extension.e2e.extension.E2eScenario; import de.sovity.edc.extension.e2e.extension.E2eTestExtension; @@ -34,10 +36,12 @@ import org.awaitility.Awaitility; import org.eclipse.edc.connector.contract.spi.ContractId; import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; +import org.eclipse.edc.junit.extensions.EdcExtension; import org.junit.jupiter.api.DynamicTest; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestFactory; import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.Mockito; import org.mockserver.integration.ClientAndServer; import org.mockserver.model.HttpRequest; import org.mockserver.model.HttpResponse; @@ -59,6 +63,8 @@ import static org.assertj.core.api.Assertions.within; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.DynamicTest.dynamicTest; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.verify; public class ContractTerminationTest { @@ -70,8 +76,8 @@ public class ContractTerminationTest { void canGetAgreementPageForNonTerminatedContract( E2eScenario scenario, @Consumer EdcClient consumerClient, - @Provider EdcClient providerClient) { - + @Provider EdcClient providerClient + ) { val assets = IntStream.range(0, 3).mapToObj((it) -> scenario.createAsset()); val agreements = assets @@ -118,8 +124,8 @@ void canGetAgreementPageForNonTerminatedContract( void canGetAgreementPageForTerminatedContract( E2eScenario scenario, @Consumer EdcClient consumerClient, - @Provider EdcClient providerClient) { - + @Provider EdcClient providerClient + ) { val assetId = scenario.createAsset(); scenario.createContractDefinition(assetId); scenario.negotiateAssetAndAwait(assetId); @@ -151,7 +157,6 @@ void canTerminateFromConsumer( @Consumer EdcClient consumerClient, @Provider EdcClient providerClient ) { - val assetId = scenario.createAsset(); scenario.createContractDefinition(assetId); val negotiation = scenario.negotiateAssetAndAwait(assetId); @@ -180,8 +185,8 @@ void canTerminateFromConsumer( void limitTheReasonSizeAt100Chars( E2eScenario scenario, @Consumer EdcClient consumerClient, - @Provider EdcClient providerClient) { - + @Provider EdcClient providerClient + ) { val assetId = scenario.createAsset(); scenario.createContractDefinition(assetId); val negotiation = scenario.negotiateAssetAndAwait(assetId); @@ -221,8 +226,8 @@ void limitTheReasonSizeAt100Chars( void limitTheDetailSizeAt1000Chars( E2eScenario scenario, @Consumer EdcClient consumerClient, - @Provider EdcClient providerClient) { - + @Provider EdcClient providerClient + ) { val assetId = scenario.createAsset(); scenario.createContractDefinition(assetId); val negotiation = scenario.negotiateAssetAndAwait(assetId); @@ -264,8 +269,7 @@ void limitTheDetailSizeAt1000Chars( @TestFactory List theDetailsAreMandatory( E2eScenario scenario, - @Consumer EdcClient consumerClient, - @Provider EdcClient providerClient + @Consumer EdcClient consumerClient ) { val invalidDetails = List.of( "", @@ -305,8 +309,8 @@ List theDetailsAreMandatory( void canTerminateFromProvider( E2eScenario scenario, @Consumer EdcClient consumerClient, - @Provider EdcClient providerClient) { - + @Provider EdcClient providerClient + ) { val assetId = scenario.createAsset(); scenario.createContractDefinition(assetId); val negotiation = scenario.negotiateAssetAndAwait(assetId); @@ -337,7 +341,8 @@ void canTerminateFromProvider( @Test void doesntCrashWhenAgreementDoesntExist( - @Consumer EdcClient consumerClient) { + @Consumer EdcClient consumerClient + ) { // act assertThrows( ApiException.class, @@ -353,8 +358,8 @@ void cantTransferDataAfterTerminated( E2eScenario scenario, ClientAndServer mockServer, @Consumer EdcClient consumerClient, - @Provider EdcClient providerClient) { - + @Provider EdcClient providerClient + ) { val assetId = "asset-1"; val mockedAsset = scenario.createAssetWithMockResource(assetId); scenario.createContractDefinition(assetId); @@ -417,8 +422,8 @@ void cantTransferDataAfterTerminated( void canTerminateOnlyOnce( E2eScenario scenario, @Consumer EdcClient consumerClient, - @Provider EdcClient providerClient) { - + @Provider EdcClient providerClient + ) { val assetId = scenario.createAsset(); scenario.createContractDefinition(assetId); val negotiation = scenario.negotiateAssetAndAwait(assetId); @@ -439,12 +444,63 @@ void canTerminateOnlyOnce( assertThat(alreadyExists.getLastUpdatedDate()).isEqualTo(firstTermination.getLastUpdatedDate()); } + @SneakyThrows + @Test + void canListenToTerminationEvents( + E2eScenario scenario, + @Consumer EdcClient consumerClient, + @Consumer EdcExtension consumerExtension, + @Provider EdcClient providerClient, + @Provider EdcExtension providerExtension + ) { + // arrange + val assetId = scenario.createAsset(); + scenario.createContractDefinition(assetId); + val negotiation = scenario.negotiateAssetAndAwait(assetId); + + val detail = "Some detail"; + val reason = "Some reason"; + val contractTerminationRequest = ContractTerminationRequest.builder().detail(detail).reason(reason).build(); + val contractAgreementId = negotiation.getContractAgreementId(); + + val consumerService = consumerExtension.getContext().getService(ContractAgreementTerminationService.class); + val providerService = providerExtension.getContext().getService(ContractAgreementTerminationService.class); + + val consumerObserver = Mockito.spy(new ContractTerminationObserver() { + }); + val providerObserver = Mockito.spy(new ContractTerminationObserver() { + }); + + consumerService.registerListener(consumerObserver); + providerService.registerListener(providerObserver); + + // act + + val termination = consumerClient.uiApi().terminateContractAgreement(contractAgreementId, contractTerminationRequest); + + awaitTerminationCount(consumerClient, 1); + awaitTerminationCount(providerClient, 1); + + Thread.sleep(2000); + + // assert + verify(consumerObserver).contractTerminationStartedFromThisInstance(); + verify(consumerObserver).contractTerminationCompletedOnThisInstance(any()); + verify(consumerObserver).contractTerminationOnCounterpartyStarted(); + + verify(providerObserver).contractTerminatedByCounterpartyStarted(); + verify(providerObserver).contractTerminatedByCounterparty(); + + // TODO: why is the future not recieved..?! +// verify(consumerObserver).contractTerminationCompletedOnCounterpartyInstance(); + } + private static void assertTermination( ContractAgreementPage consumerSideAgreements, String detail, String reason, - ContractTerminatedBy terminatedBy) { - + ContractTerminatedBy terminatedBy + ) { val contractAgreements = consumerSideAgreements.getContractAgreements(); assertThat(contractAgreements).hasSize(1); assertThat(contractAgreements.get(0).getTerminationStatus()).isEqualTo(TERMINATED); From f29c6b2577d4c9af3aade233555d477ef3e23443 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Wed, 14 Aug 2024 09:21:21 +0200 Subject: [PATCH 02/13] fixes --- .../ContractAgreementTerminationService.java | 9 ++------- .../ContractTerminationObserver.java | 7 ------- .../sovity/edc/e2e/ContractTerminationTest.java | 15 +++++++++++++-- 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java index 1dca09cc5..cdee839f5 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java @@ -74,11 +74,10 @@ public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminat val terminatedAt = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, SELF); - notifyTerminationToProvider(details.counterpartyAddress(), termination); - - notifyObservers(it -> it.contractTerminationCompletedOnThisInstance(terminatedAt)); + notifyTerminationToProvider(details.counterpartyAddress(), termination); + return terminatedAt; } @@ -129,10 +128,6 @@ public void notifyTerminationToProvider(String counterPartyAddress, ContractTerm termination.contractAgreementId(), termination.detail(), termination.reason())); - - future.thenAccept(it -> { - notifyObservers(ContractTerminationObserver::contractTerminationCompletedOnCounterpartyInstance); - }); } @Override diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java index ca4ed3d42..eac7ea53b 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java @@ -44,13 +44,6 @@ default void contractTerminationCompletedOnThisInstance(OffsetDateTime terminate default void contractTerminationOnCounterpartyStarted() { } - /** - * Indicates that the second step to terminate a contract, terminating a contract on the counterparty EDC, was successful. - * The contract is now marked as terminated on the counterparty EDC's side. - */ - default void contractTerminationCompletedOnCounterpartyInstance() { - } - /** * Indicates that a contract termination was started by a counterparty EDC terminated successfully */ diff --git a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java index f86340633..8522d9a0a 100644 --- a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java +++ b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java @@ -484,6 +484,10 @@ void canListenToTerminationEvents( Thread.sleep(2000); // assert + + assertThat(consumerService.getListeners()).hasSize(1); + assertThat(providerService.getListeners()).hasSize(1); + verify(consumerObserver).contractTerminationStartedFromThisInstance(); verify(consumerObserver).contractTerminationCompletedOnThisInstance(any()); verify(consumerObserver).contractTerminationOnCounterpartyStarted(); @@ -491,8 +495,15 @@ void canListenToTerminationEvents( verify(providerObserver).contractTerminatedByCounterpartyStarted(); verify(providerObserver).contractTerminatedByCounterparty(); - // TODO: why is the future not recieved..?! -// verify(consumerObserver).contractTerminationCompletedOnCounterpartyInstance(); + // act + + consumerService.unregisterListener(consumerObserver); + providerService.unregisterListener(providerObserver); + + // assert + + assertThat(consumerService.getListeners()).hasSize(0); + assertThat(providerService.getListeners()).hasSize(0); } private static void assertTermination( From bc3a7222e5d58b4b45e1227e9a01a1c3e4790924 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Mon, 19 Aug 2024 09:45:00 +0200 Subject: [PATCH 03/13] Add event content --- build.gradle.kts | 2 +- .../ContractAgreementTerminationService.java | 15 ++++++--- .../ContractTerminationEvent.java | 33 +++++++++++++++++++ .../ContractTerminationObserver.java | 17 +++------- .../e2e/AlwaysTrueMigrationReversedTest.java | 14 ++++++++ .../edc/e2e/ContractTerminationTest.java | 30 ++++++++++++++--- 6 files changed, 88 insertions(+), 23 deletions(-) create mode 100644 extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java diff --git a/build.gradle.kts b/build.gradle.kts index 9ee6fa69e..d8cb980a8 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -129,7 +129,7 @@ subprojects { tasks.register("printClasspath") { group = libs.versions.edcGroup.get() description = "The EdcRuntimeExtension JUnit Extension requires the gradle task 'printClasspath'" - println(sourceSets.main.get().runtimeClasspath.asPath); + println(sourceSets.main.get().runtimeClasspath.asPath) } java { diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java index cdee839f5..b34c3984d 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java @@ -60,7 +60,8 @@ public class ContractAgreementTerminationService implements Observable it.contractTerminationStartedFromThisInstance(starterEvent)); val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId()); @@ -74,7 +75,8 @@ public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminat val terminatedAt = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, SELF); - notifyObservers(it -> it.contractTerminationCompletedOnThisInstance(terminatedAt)); + val endEvent = ContractTerminationEvent.from(termination, terminatedAt); + notifyObservers(it -> it.contractTerminationCompletedOnThisInstance(endEvent)); notifyTerminationToProvider(details.counterpartyAddress(), termination); @@ -86,7 +88,8 @@ public OffsetDateTime terminateCounterpartyAgreement( @Nullable String identity, ContractTerminationParam termination ) { - notifyObservers(ContractTerminationObserver::contractTerminatedByCounterpartyStarted); + val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now()); + notifyObservers(it -> it.contractTerminatedByCounterpartyStarted(starterEvent)); val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId()); @@ -112,14 +115,16 @@ public OffsetDateTime terminateCounterpartyAgreement( val result = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, agent); - notifyObservers(ContractTerminationObserver::contractTerminatedByCounterparty); + val endEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now()); + notifyObservers(it -> it.contractTerminatedByCounterparty(endEvent)); return result; } public void notifyTerminationToProvider(String counterPartyAddress, ContractTerminationParam termination) { - notifyObservers(ContractTerminationObserver::contractTerminationOnCounterpartyStarted); + val notificationEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now()); + notifyObservers(it -> it.contractTerminationOnCounterpartyStarted(notificationEvent)); val future = sovityMessenger.send( SovityMessage.class, diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java new file mode 100644 index 000000000..fb5baadae --- /dev/null +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2024 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package de.sovity.edc.extension.contacttermination; + +import java.time.OffsetDateTime; + +public record ContractTerminationEvent( + String contractAgreementId, + String detail, + String reason, + OffsetDateTime timestamp +) { + public static ContractTerminationEvent from(ContractTerminationParam contractTerminationParam, OffsetDateTime dateTime) { + return new ContractTerminationEvent( + contractTerminationParam.contractAgreementId(), + contractTerminationParam.detail(), + contractTerminationParam.reason(), + dateTime + ); + } +} diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java index eac7ea53b..aae938f51 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java @@ -14,46 +14,39 @@ package de.sovity.edc.extension.contacttermination; -import de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy; -import org.eclipse.edc.spi.observe.Observable; - import java.time.OffsetDateTime; -import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.SELF; - public interface ContractTerminationObserver { /** * Indicates that a contract termination was started by this EDC. */ - default void contractTerminationStartedFromThisInstance() { + default void contractTerminationStartedFromThisInstance(ContractTerminationEvent contractTerminationEvent) { } /** * Indicates that the first step to terminate a contract, terminating a contract on this EDC instance itself, was successful. * The contract is now marked as terminated on this EDC's side. - * - * @param terminatedAt The termination time, as saved in the database. */ - default void contractTerminationCompletedOnThisInstance(OffsetDateTime terminatedAt) { + default void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) { } /** * Indicates that a contract termination on the counterparty EDC was started. */ - default void contractTerminationOnCounterpartyStarted() { + default void contractTerminationOnCounterpartyStarted(ContractTerminationEvent contractTerminationEvent) { } /** * Indicates that a contract termination was started by a counterparty EDC terminated successfully */ - default void contractTerminatedByCounterpartyStarted() { + default void contractTerminatedByCounterpartyStarted(ContractTerminationEvent contractTerminationEvent) { } /** * Indicates that a contract termination initiated by a counterparty EDC terminated successfully * The contract is now marked as terminated on this EDC. */ - default void contractTerminatedByCounterparty() { + default void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) { } } diff --git a/tests/src/test/java/de/sovity/edc/e2e/AlwaysTrueMigrationReversedTest.java b/tests/src/test/java/de/sovity/edc/e2e/AlwaysTrueMigrationReversedTest.java index 525e5669d..e9347079d 100644 --- a/tests/src/test/java/de/sovity/edc/e2e/AlwaysTrueMigrationReversedTest.java +++ b/tests/src/test/java/de/sovity/edc/e2e/AlwaysTrueMigrationReversedTest.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2024 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + package de.sovity.edc.e2e; import de.sovity.edc.client.EdcClient; diff --git a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java index 8522d9a0a..ba5179917 100644 --- a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java +++ b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java @@ -23,6 +23,7 @@ import de.sovity.edc.client.gen.model.InitiateTransferRequest; import de.sovity.edc.client.gen.model.TransferHistoryEntry; import de.sovity.edc.extension.contacttermination.ContractAgreementTerminationService; +import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; import de.sovity.edc.extension.contacttermination.ContractTerminationObserver; import de.sovity.edc.extension.e2e.extension.Consumer; import de.sovity.edc.extension.e2e.extension.E2eScenario; @@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestFactory; import org.junit.jupiter.api.extension.RegisterExtension; +import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import org.mockserver.integration.ClientAndServer; import org.mockserver.model.HttpRequest; @@ -476,7 +478,7 @@ void canListenToTerminationEvents( // act - val termination = consumerClient.uiApi().terminateContractAgreement(contractAgreementId, contractTerminationRequest); + consumerClient.uiApi().terminateContractAgreement(contractAgreementId, contractTerminationRequest); awaitTerminationCount(consumerClient, 1); awaitTerminationCount(providerClient, 1); @@ -488,12 +490,22 @@ void canListenToTerminationEvents( assertThat(consumerService.getListeners()).hasSize(1); assertThat(providerService.getListeners()).hasSize(1); - verify(consumerObserver).contractTerminationStartedFromThisInstance(); + ArgumentCaptor argument = ArgumentCaptor.forClass(ContractTerminationEvent.class); + + verify(consumerObserver).contractTerminationStartedFromThisInstance(argument.capture()); + assertTerminationEvent(argument, contractAgreementId, contractTerminationRequest); + verify(consumerObserver).contractTerminationCompletedOnThisInstance(any()); - verify(consumerObserver).contractTerminationOnCounterpartyStarted(); + assertTerminationEvent(argument, contractAgreementId, contractTerminationRequest); + + verify(consumerObserver).contractTerminationOnCounterpartyStarted(any()); + assertTerminationEvent(argument, contractAgreementId, contractTerminationRequest); - verify(providerObserver).contractTerminatedByCounterpartyStarted(); - verify(providerObserver).contractTerminatedByCounterparty(); + verify(providerObserver).contractTerminatedByCounterpartyStarted(any()); + assertTerminationEvent(argument, contractAgreementId, contractTerminationRequest); + + verify(providerObserver).contractTerminatedByCounterparty(any()); + assertTerminationEvent(argument, contractAgreementId, contractTerminationRequest); // act @@ -506,6 +518,14 @@ void canListenToTerminationEvents( assertThat(providerService.getListeners()).hasSize(0); } + private static void assertTerminationEvent(ArgumentCaptor argument, String contractAgreementId, + ContractTerminationRequest contractTerminationRequest) { + assertThat(argument.getValue().contractAgreementId()).isEqualTo(contractAgreementId); + assertThat(argument.getValue().detail()).isEqualTo(contractTerminationRequest.getDetail()); + assertThat(argument.getValue().reason()).isEqualTo(contractTerminationRequest.getReason()); + assertThat(argument.getValue().timestamp()).isNotNull(); + } + private static void assertTermination( ContractAgreementPage consumerSideAgreements, String detail, From 93c9411154cc5cff40f0de8175261887ae6d8334 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Mon, 19 Aug 2024 09:50:38 +0200 Subject: [PATCH 04/13] Checkstyle --- .../contacttermination/ContractTerminationObserver.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java index aae938f51..ef9cee608 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationObserver.java @@ -14,8 +14,6 @@ package de.sovity.edc.extension.contacttermination; -import java.time.OffsetDateTime; - public interface ContractTerminationObserver { /** From d62eb339a7ae1815c2b6c66a644fd1a65c303446 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Mon, 26 Aug 2024 13:30:56 +0200 Subject: [PATCH 05/13] WiP contract termination and logging house --- .../ContractAgreementTerminationService.java | 4 +- .../ContractTerminationExtension.java | 2 +- .../mds-logginghouse-binder/build.gradle.kts | 62 +++++++++++++++++++ .../MdsContractTerminationEvent.java | 27 ++++++++ .../MdsLoggingHouseBinder.java | 62 +++++++++++++++++++ ...rg.eclipse.edc.spi.system.ServiceExtension | 1 + gradle/libs.versions.toml | 2 +- settings.gradle.kts | 1 + 8 files changed, 157 insertions(+), 4 deletions(-) create mode 100644 extensions/mds-logginghouse-binder/build.gradle.kts create mode 100644 extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationEvent.java create mode 100644 extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java create mode 100644 extensions/mds-logginghouse-binder/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java index b34c3984d..2b224706e 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java @@ -55,7 +55,7 @@ public class ContractAgreementTerminationService implements Observable dslContextFactory.transactionResult(dsl -> - terminationService.terminateCounterpartyAgreement( + terminationService.terminateAgreementAsCounterparty( dsl, participantAgentService.createFor(claims).getIdentity(), buildTerminationRequest(termination)))); diff --git a/extensions/mds-logginghouse-binder/build.gradle.kts b/extensions/mds-logginghouse-binder/build.gradle.kts new file mode 100644 index 000000000..bd378ea58 --- /dev/null +++ b/extensions/mds-logginghouse-binder/build.gradle.kts @@ -0,0 +1,62 @@ + +plugins { + `java-library` + `maven-publish` +} + +dependencies { + annotationProcessor(libs.lombok) + compileOnly(libs.lombok) + + implementation(project(":extensions:contract-termination")) + + implementation(libs.edc.coreSpi) + implementation(libs.edc.dspNegotiationTransform) + implementation(libs.edc.transferSpi) + + implementation(libs.loggingHouse.client) + implementation(libs.jakarta.rsApi) + + + testAnnotationProcessor(libs.lombok) + testCompileOnly(libs.lombok) + + testImplementation(project(":extensions:postgres-flyway")) + testImplementation(project(":utils:test-utils")) + testImplementation(project(":utils:versions")) + + testImplementation(libs.edc.monitorJdkLogger) + testImplementation(libs.edc.http) { + exclude(group = "org.eclipse.jetty", module = "jetty-client") + exclude(group = "org.eclipse.jetty", module = "jetty-http") + exclude(group = "org.eclipse.jetty", module = "jetty-io") + exclude(group = "org.eclipse.jetty", module = "jetty-server") + exclude(group = "org.eclipse.jetty", module = "jetty-util") + exclude(group = "org.eclipse.jetty", module = "jetty-webapp") + } + + // Updated jetty versions for e.g. CVE-2023-26048 + testImplementation(libs.bundles.jetty.cve2023) + + testImplementation(libs.assertj.core) + testImplementation(libs.flyway.core) + testImplementation(libs.junit.api) + testImplementation(libs.hibernate.validation) + testImplementation(libs.jakarta.el) + testImplementation(libs.mockito.core) + testImplementation(libs.restAssured.restAssured) + testImplementation(libs.testcontainers.testcontainers) + testImplementation(libs.testcontainers.postgresql) + + testRuntimeOnly(libs.junit.engine) +} + +group = libs.versions.sovityEdcExtensionGroup.get() + +publishing { + publications { + create(project.name) { + from(components["java"]) + } + } +} diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationEvent.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationEvent.java new file mode 100644 index 000000000..0237f5fa2 --- /dev/null +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationEvent.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2024 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package de.sovity.edc.extension.mdslogginhousebinder; + +import com.truzzt.extension.logginghouse.client.events.CustomLoggingHouseEvent; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@RequiredArgsConstructor +@Getter +public class MdsContractTerminationEvent extends CustomLoggingHouseEvent { + private final String eventId; + private final String processId; + private final String messageBody; +} diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java new file mode 100644 index 000000000..8fee2b145 --- /dev/null +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2024 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package de.sovity.edc.extension.mdslogginhousebinder; + +import de.sovity.edc.extension.contacttermination.ContractAgreementTerminationService; +import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; +import de.sovity.edc.extension.contacttermination.ContractTerminationObserver; +import lombok.val; +import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; +import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.spi.event.EventRouter; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.system.ServiceExtension; +import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.spi.uuid.UuidGenerator; + +public class MdsLoggingHouseBinder implements ServiceExtension { + + @Inject + private EventRouter eventRouter; + + @Inject + private Monitor monitor; + + @Inject + private TransferProcessObservable observable; + + @Inject + private ContractAgreementTerminationService contractAgreementTerminationService; + + @Override + public void initialize(ServiceExtensionContext context) { + contractAgreementTerminationService.registerListener(new ContractTerminationObserver() { + @Override + public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) { + val message = new MdsContractTerminationEvent( + UuidGenerator.INSTANCE.generate().toString(), + contractTerminationEvent.contractAgreementId(), + "Contract termination event: terminated contract %s at %s from this EDC. Reason: %s Detail: %s".formatted(contractTerminationEvent.contractAgreementId(), contractTerminationEvent.timestamp(), contractTerminationEvent.reason(), + contractTerminationEvent.detail()) + ); + } + + @Override + public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) { + ContractTerminationObserver.super.contractTerminatedByCounterparty(contractTerminationEvent); + } + }); + } +} diff --git a/extensions/mds-logginghouse-binder/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/extensions/mds-logginghouse-binder/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension new file mode 100644 index 000000000..6ac768693 --- /dev/null +++ b/extensions/mds-logginghouse-binder/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -0,0 +1 @@ +de.sovity.edc.extension.mdslogginhousebinder.MdsLoggingHouseBinder diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index f5a766691..b33b3facf 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -43,7 +43,7 @@ json = "20220924" jsonAssert = "1.5.1" jsonUnit = "3.2.7" junit = "5.10.0" -loggingHouse = "v1.1.0" +loggingHouse = "v1.2.0-alpha.1" lombok = "1.18.30" mockito = "5.12.0" mockserver = "5.15.0" diff --git a/settings.gradle.kts b/settings.gradle.kts index 78a0903b2..003fd30ef 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -8,6 +8,7 @@ include(":extensions:contract-termination") include(":extensions:database-direct-access") include(":extensions:edc-ui-config") include(":extensions:last-commit-info") +include(":extensions:mds-logginghouse-binder") include(":extensions:policy-always-true") include(":extensions:policy-referring-connector") include(":extensions:policy-time-interval") From fe1432fc40181dd4b7b3ae73a38b10c5f02305ea Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Tue, 27 Aug 2024 09:08:08 +0200 Subject: [PATCH 06/13] Bind the termination to the logging house --- .../ContractAgreementTerminationService.java | 82 ++++--------------- .../ContractTerminationEvent.java | 8 +- .../mdslogginhousebinder/LogEntry.java | 57 +++++++++++++ .../MdsLoggingHouseBinder.java | 75 ++++++++++++----- launchers/common/base-mds/build.gradle.kts | 1 + .../edc/e2e/ContractTerminationTest.java | 16 ++-- 6 files changed, 140 insertions(+), 99 deletions(-) create mode 100644 extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java index 2b224706e..0be5589e2 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java @@ -18,39 +18,32 @@ import de.sovity.edc.extension.contacttermination.query.TerminateContractQuery; import de.sovity.edc.extension.messenger.SovityMessage; import de.sovity.edc.extension.messenger.SovityMessenger; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.val; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.observe.Observable; -import org.jetbrains.annotations.NotNull; +import org.eclipse.edc.spi.observe.ObservableImpl; import org.jetbrains.annotations.Nullable; import org.jooq.DSLContext; -import java.lang.ref.Reference; -import java.lang.ref.WeakReference; import java.time.OffsetDateTime; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.COUNTERPARTY; import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.SELF; @RequiredArgsConstructor -public class ContractAgreementTerminationService implements Observable { +public class ContractAgreementTerminationService { private final SovityMessenger sovityMessenger; private final ContractAgreementTerminationDetailsQuery contractAgreementTerminationDetailsQuery; private final TerminateContractQuery terminateContractQuery; private final Monitor monitor; private final String thisParticipantId; - private final List> contractTerminationObservers = - Collections.synchronizedList(new ArrayList<>()); - private final ReentrantLock observersLock = new ReentrantLock(); + @Getter + private final Observable contractTerminationObservable = new ObservableImpl<>(); /** * This is to terminate an EDC's own contract. @@ -60,7 +53,7 @@ public class ContractAgreementTerminationService implements Observable it.contractTerminationStartedFromThisInstance(starterEvent)); val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId()); @@ -75,7 +68,7 @@ public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminat val terminatedAt = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, SELF); - val endEvent = ContractTerminationEvent.from(termination, terminatedAt); + val endEvent = ContractTerminationEvent.from(termination, terminatedAt, thisParticipantId); notifyObservers(it -> it.contractTerminationCompletedOnThisInstance(endEvent)); notifyTerminationToProvider(details.counterpartyAddress(), termination); @@ -88,7 +81,7 @@ public OffsetDateTime terminateAgreementAsCounterparty( @Nullable String identity, ContractTerminationParam termination ) { - val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now()); + val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), null); notifyObservers(it -> it.contractTerminatedByCounterpartyStarted(starterEvent)); val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId()); @@ -115,7 +108,7 @@ public OffsetDateTime terminateAgreementAsCounterparty( val result = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, agent); - val endEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now()); + val endEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), details.counterpartyId()); notifyObservers(it -> it.contractTerminatedByCounterparty(endEvent)); return result; @@ -123,10 +116,10 @@ public OffsetDateTime terminateAgreementAsCounterparty( public void notifyTerminationToProvider(String counterPartyAddress, ContractTerminationParam termination) { - val notificationEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now()); + val notificationEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), null); notifyObservers(it -> it.contractTerminationOnCounterpartyStarted(notificationEvent)); - val future = sovityMessenger.send( + sovityMessenger.send( SovityMessage.class, counterPartyAddress, new ContractTerminationMessage( @@ -135,59 +128,12 @@ public void notifyTerminationToProvider(String counterPartyAddress, ContractTerm termination.reason())); } - @Override - public Collection getListeners() { - return contractTerminationObservers.stream().filter(it -> it.get() != null).toList().stream().map(Reference::get).toList(); - } - - @Override - public void registerListener(ContractTerminationObserver listener) { - try { - observersLock.lock(); - - final var refreshed = getFilteredWeakReferences(null); - - contractTerminationObservers.clear(); - contractTerminationObservers.addAll(refreshed); - contractTerminationObservers.add(new WeakReference<>(listener)); - } finally { - observersLock.unlock(); - } - } - - @Override - public void unregisterListener(ContractTerminationObserver listener) { - try { - observersLock.lock(); - - final var refreshed = getFilteredWeakReferences(listener); - - contractTerminationObservers.clear(); - contractTerminationObservers.addAll(refreshed); - } finally { - observersLock.unlock(); - } - } - - private @NotNull List> getFilteredWeakReferences(ContractTerminationObserver listener) { - return contractTerminationObservers.stream().filter(it -> { - val obs = it.get(); - if (obs == null) { - return false; - } - return obs != listener; - }).toList(); - } - private void notifyObservers(Consumer call) { - for (val weakRef : contractTerminationObservers) { + for (val listener : contractTerminationObservable.getListeners()) { try { - val observer = weakRef.get(); - if (observer != null) { - call.accept(observer); - } + call.accept(listener); } catch (Exception e) { - monitor.warning("Failure when notifying contract termination observer."); + monitor.warning("Failure when notifying the contract termination listener."); } } } diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java index fb5baadae..4def044dc 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java @@ -20,14 +20,16 @@ public record ContractTerminationEvent( String contractAgreementId, String detail, String reason, - OffsetDateTime timestamp + OffsetDateTime timestamp, + String origin ) { - public static ContractTerminationEvent from(ContractTerminationParam contractTerminationParam, OffsetDateTime dateTime) { + public static ContractTerminationEvent from(ContractTerminationParam contractTerminationParam, OffsetDateTime dateTime, String origin) { return new ContractTerminationEvent( contractTerminationParam.contractAgreementId(), contractTerminationParam.detail(), contractTerminationParam.reason(), - dateTime + dateTime, + origin ); } } diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java new file mode 100644 index 000000000..cabbb627d --- /dev/null +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2024 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package de.sovity.edc.extension.mdslogginhousebinder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +import java.time.OffsetDateTime; + +@AllArgsConstructor +@Builder +@Data +@NoArgsConstructor +public class LogEntry { + + @JsonProperty("contractAgreementId") + private String contractAgreementId; + + @JsonProperty("event") + private String event; + + @JsonProperty("detail") + private String detail; + + @JsonProperty("reason") + private String reason; + + @JsonProperty("timestamp") + private OffsetDateTime timestamp; + + public static LogEntry from(String event, ContractTerminationEvent contractTerminationEvent) { + return LogEntry.builder() + .event(event) + .contractAgreementId(contractTerminationEvent.contractAgreementId()) + .detail(contractTerminationEvent.detail()) + .reason(contractTerminationEvent.reason()) + .timestamp(contractTerminationEvent.timestamp()) + .build(); + } +} diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java index 8fee2b145..d752c830f 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java @@ -14,12 +14,15 @@ package de.sovity.edc.extension.mdslogginhousebinder; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import de.sovity.edc.extension.contacttermination.ContractAgreementTerminationService; import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; import de.sovity.edc.extension.contacttermination.ContractTerminationObserver; import lombok.val; -import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.system.ServiceExtension; @@ -34,29 +37,61 @@ public class MdsLoggingHouseBinder implements ServiceExtension { @Inject private Monitor monitor; - @Inject - private TransferProcessObservable observable; - @Inject private ContractAgreementTerminationService contractAgreementTerminationService; + private ObjectMapper objectMapper; + @Override public void initialize(ServiceExtensionContext context) { - contractAgreementTerminationService.registerListener(new ContractTerminationObserver() { - @Override - public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) { - val message = new MdsContractTerminationEvent( - UuidGenerator.INSTANCE.generate().toString(), - contractTerminationEvent.contractAgreementId(), - "Contract termination event: terminated contract %s at %s from this EDC. Reason: %s Detail: %s".formatted(contractTerminationEvent.contractAgreementId(), contractTerminationEvent.timestamp(), contractTerminationEvent.reason(), - contractTerminationEvent.detail()) - ); - } - - @Override - public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) { - ContractTerminationObserver.super.contractTerminatedByCounterparty(contractTerminationEvent); - } - }); + + objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + + contractAgreementTerminationService.getContractTerminationObservable() + .registerListener(new ContractTerminationObserver() { + @Override + public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) { + val logEntry = LogEntry.from("contractTerminatedByThisInstance", contractTerminationEvent); + + try { + val message = objectMapper.writeValueAsString(logEntry); + val event = new MdsContractTerminationEvent( + UuidGenerator.INSTANCE.generate().toString(), + contractTerminationEvent.contractAgreementId(), + message + ); + + @SuppressWarnings("unchecked") + EventEnvelope.Builder builder = EventEnvelope.Builder.newInstance(); + + val eventEnvelope = builder + .at(System.currentTimeMillis()) + .payload(event) + .build(); + + eventRouter.publish(eventEnvelope); + monitor.debug("Published event for " + logEntry); + } catch (JsonProcessingException e) { + monitor.warning("Failed to serialize the event for the logging house " + logEntry); + } + } + + @Override + public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) { + val logEntry = LogEntry.from("contractTerminatedByCounterparty", contractTerminationEvent); + + try { + val message = objectMapper.writeValueAsString(logEntry); + new MdsContractTerminationEvent( + UuidGenerator.INSTANCE.generate().toString(), + contractTerminationEvent.contractAgreementId(), + message + ); + } catch (JsonProcessingException e) { + monitor.warning("Failed to serialize the event for the logging house " + logEntry); + } + } + }); } } diff --git a/launchers/common/base-mds/build.gradle.kts b/launchers/common/base-mds/build.gradle.kts index 0faceaa8d..89d1a8bd8 100644 --- a/launchers/common/base-mds/build.gradle.kts +++ b/launchers/common/base-mds/build.gradle.kts @@ -4,6 +4,7 @@ plugins { dependencies { implementation(libs.loggingHouse.client) + implementation(project(":extensions:mds-logginghouse-binder")) } group = libs.versions.sovityEdcGroup.get() diff --git a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java index ba5179917..81f06a375 100644 --- a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java +++ b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java @@ -473,8 +473,8 @@ void canListenToTerminationEvents( val providerObserver = Mockito.spy(new ContractTerminationObserver() { }); - consumerService.registerListener(consumerObserver); - providerService.registerListener(providerObserver); + consumerService.getContractTerminationObservable().registerListener(consumerObserver); + providerService.getContractTerminationObservable().registerListener(providerObserver); // act @@ -487,8 +487,8 @@ void canListenToTerminationEvents( // assert - assertThat(consumerService.getListeners()).hasSize(1); - assertThat(providerService.getListeners()).hasSize(1); + assertThat(consumerService.getContractTerminationObservable().getListeners()).hasSize(1); + assertThat(providerService.getContractTerminationObservable().getListeners()).hasSize(1); ArgumentCaptor argument = ArgumentCaptor.forClass(ContractTerminationEvent.class); @@ -509,13 +509,13 @@ void canListenToTerminationEvents( // act - consumerService.unregisterListener(consumerObserver); - providerService.unregisterListener(providerObserver); + consumerService.getContractTerminationObservable().unregisterListener(consumerObserver); + providerService.getContractTerminationObservable().unregisterListener(providerObserver); // assert - assertThat(consumerService.getListeners()).hasSize(0); - assertThat(providerService.getListeners()).hasSize(0); + assertThat(consumerService.getContractTerminationObservable().getListeners()).hasSize(0); + assertThat(providerService.getContractTerminationObservable().getListeners()).hasSize(0); } private static void assertTerminationEvent(ArgumentCaptor argument, String contractAgreementId, From f42f35407da32c6e7ed768df7b85ca928ed4b291 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Tue, 27 Aug 2024 09:49:00 +0200 Subject: [PATCH 07/13] Self review --- CHANGELOG.md | 4 ++++ .../sovity/edc/extension/mdslogginhousebinder/LogEntry.java | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 202c865d3..6ff6d81d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,10 @@ please see [changelog_updates.md](docs/dev/changelog_updates.md). #### Minor Changes +- Add Contract Termination Observer +- MDS only + - Log contract termination events in the logging house + #### Patch Changes ### Deployment Migration Notes diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java index cabbb627d..0ec722beb 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java @@ -20,7 +20,6 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import lombok.ToString; import java.time.OffsetDateTime; From 389acc601a6fb8c5f1428f386a36d5cbcae65ff7 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Tue, 27 Aug 2024 10:12:42 +0200 Subject: [PATCH 08/13] Forgot counterparty --- .../MdsLoggingHouseBinder.java | 59 +++++++++---------- 1 file changed, 27 insertions(+), 32 deletions(-) diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java index d752c830f..b5528c2bf 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java @@ -54,44 +54,39 @@ public void initialize(ServiceExtensionContext context) { public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) { val logEntry = LogEntry.from("contractTerminatedByThisInstance", contractTerminationEvent); - try { - val message = objectMapper.writeValueAsString(logEntry); - val event = new MdsContractTerminationEvent( - UuidGenerator.INSTANCE.generate().toString(), - contractTerminationEvent.contractAgreementId(), - message - ); - - @SuppressWarnings("unchecked") - EventEnvelope.Builder builder = EventEnvelope.Builder.newInstance(); - - val eventEnvelope = builder - .at(System.currentTimeMillis()) - .payload(event) - .build(); - - eventRouter.publish(eventEnvelope); - monitor.debug("Published event for " + logEntry); - } catch (JsonProcessingException e) { - monitor.warning("Failed to serialize the event for the logging house " + logEntry); - } + sendMessage(contractTerminationEvent, logEntry); } @Override public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) { val logEntry = LogEntry.from("contractTerminatedByCounterparty", contractTerminationEvent); - - try { - val message = objectMapper.writeValueAsString(logEntry); - new MdsContractTerminationEvent( - UuidGenerator.INSTANCE.generate().toString(), - contractTerminationEvent.contractAgreementId(), - message - ); - } catch (JsonProcessingException e) { - monitor.warning("Failed to serialize the event for the logging house " + logEntry); - } + sendMessage(contractTerminationEvent, logEntry); } }); } + + private void sendMessage(ContractTerminationEvent contractTerminationEvent, LogEntry logEntry) { + try { + val message = objectMapper.writeValueAsString(logEntry); + val event = new MdsContractTerminationEvent( + UuidGenerator.INSTANCE.generate().toString(), + contractTerminationEvent.contractAgreementId(), + message + ); + + @SuppressWarnings("unchecked") + EventEnvelope.Builder builder = EventEnvelope.Builder.newInstance(); + + val eventEnvelope = builder + .at(System.currentTimeMillis()) + .payload(event) + .build(); + + eventRouter.publish(eventEnvelope); + + monitor.debug("Published event for " + logEntry); + } catch (JsonProcessingException e) { + monitor.warning("Failed to serialize the event for the logging house " + logEntry); + } + } } From 6a00ab165aa4527a7cef3032bbeb0199c19cf9fd Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Tue, 3 Sep 2024 16:35:14 +0200 Subject: [PATCH 09/13] Code review --- extensions/mds-logginghouse-binder/README.md | 44 +++++++++++ .../MdsContractTerminationObserver.java | 73 +++++++++++++++++++ .../MdsLoggingHouseBinder.java | 52 ++----------- 3 files changed, 122 insertions(+), 47 deletions(-) create mode 100644 extensions/mds-logginghouse-binder/README.md create mode 100644 extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java diff --git a/extensions/mds-logginghouse-binder/README.md b/extensions/mds-logginghouse-binder/README.md new file mode 100644 index 000000000..6330791e8 --- /dev/null +++ b/extensions/mds-logginghouse-binder/README.md @@ -0,0 +1,44 @@ + +
+
+ + Logo + + +

EDC-Connector Extension:
MDS Contract Termination - LoggingHouse binder

+ +

+ Report Bug + ยท + Request Feature +

+
+ + +## About this Extension + +It links the Contract Termination events with the LoggingHouse. + +## Why does this extension exist? + +MDS needs to log the events generated when terminating a contract with their Logging House extension. +The Logging House is an external dependency and the linkage must only happen for the MDS variant. + +This extension implements this specific task. + +## Architecture + +```mermaid +flowchart TD + Binder(MDS LoggingHouse Binder) --> LoggingHouse(Logging House Extension) + Binder(MDS LoggingHouse Binder) --> ContractTermination(Contract Termination Extension) + MDS(MDS CE) --> Binder +``` + +## License + +Apache License 2.0 - see [LICENSE](../../LICENSE) + +## Contact + +sovity GmbH - contact@sovity.de diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java new file mode 100644 index 000000000..a15cb785d --- /dev/null +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2024 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package de.sovity.edc.extension.mdslogginhousebinder; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; +import de.sovity.edc.extension.contacttermination.ContractTerminationObserver; +import lombok.RequiredArgsConstructor; +import lombok.val; +import org.eclipse.edc.spi.event.EventEnvelope; +import org.eclipse.edc.spi.event.EventRouter; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.uuid.UuidGenerator; + +@RequiredArgsConstructor +public class MdsContractTerminationObserver implements ContractTerminationObserver { + + private final EventRouter eventRouter; + + private final Monitor monitor; + + private final ObjectMapper objectMapper; + + @Override + public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) { + sendMessage("contractTerminatedByThisInstance", contractTerminationEvent); + } + + @Override + public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) { + sendMessage("contractTerminatedByCounterparty", contractTerminationEvent); + } + + private void sendMessage(String logEvent, ContractTerminationEvent contractTerminationEvent) { + val logEntry = LogEntry.from(logEvent, contractTerminationEvent); + + try { + val message = objectMapper.writeValueAsString(logEntry); + val event = new MdsContractTerminationEvent( + UuidGenerator.INSTANCE.generate().toString(), + contractTerminationEvent.contractAgreementId(), + message + ); + + @SuppressWarnings("unchecked") + EventEnvelope.Builder builder = EventEnvelope.Builder.newInstance(); + + val eventEnvelope = builder + .at(System.currentTimeMillis()) + .payload(event) + .build(); + + eventRouter.publish(eventEnvelope); + + monitor.debug("Published event for " + logEntry); + } catch (JsonProcessingException e) { + monitor.warning("Failed to serialize the event for the logging house " + logEntry); + } + } +} diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java index b5528c2bf..8fd10a763 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java @@ -14,20 +14,15 @@ package de.sovity.edc.extension.mdslogginhousebinder; -import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import de.sovity.edc.extension.contacttermination.ContractAgreementTerminationService; -import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; -import de.sovity.edc.extension.contacttermination.ContractTerminationObserver; import lombok.val; import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; -import org.eclipse.edc.spi.uuid.UuidGenerator; public class MdsLoggingHouseBinder implements ServiceExtension { @@ -40,53 +35,16 @@ public class MdsLoggingHouseBinder implements ServiceExtension { @Inject private ContractAgreementTerminationService contractAgreementTerminationService; - private ObjectMapper objectMapper; - @Override public void initialize(ServiceExtensionContext context) { + setupLoggingHouseTerminationEventsLogging(); + } - objectMapper = new ObjectMapper(); + private void setupLoggingHouseTerminationEventsLogging() { + val objectMapper = new ObjectMapper(); objectMapper.registerModule(new JavaTimeModule()); contractAgreementTerminationService.getContractTerminationObservable() - .registerListener(new ContractTerminationObserver() { - @Override - public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) { - val logEntry = LogEntry.from("contractTerminatedByThisInstance", contractTerminationEvent); - - sendMessage(contractTerminationEvent, logEntry); - } - - @Override - public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) { - val logEntry = LogEntry.from("contractTerminatedByCounterparty", contractTerminationEvent); - sendMessage(contractTerminationEvent, logEntry); - } - }); - } - - private void sendMessage(ContractTerminationEvent contractTerminationEvent, LogEntry logEntry) { - try { - val message = objectMapper.writeValueAsString(logEntry); - val event = new MdsContractTerminationEvent( - UuidGenerator.INSTANCE.generate().toString(), - contractTerminationEvent.contractAgreementId(), - message - ); - - @SuppressWarnings("unchecked") - EventEnvelope.Builder builder = EventEnvelope.Builder.newInstance(); - - val eventEnvelope = builder - .at(System.currentTimeMillis()) - .payload(event) - .build(); - - eventRouter.publish(eventEnvelope); - - monitor.debug("Published event for " + logEntry); - } catch (JsonProcessingException e) { - monitor.warning("Failed to serialize the event for the logging house " + logEntry); - } + .registerListener(new MdsContractTerminationObserver(eventRouter, monitor, objectMapper)); } } From 40be438c761bf4f4fae4231552b824779c474178 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Tue, 3 Sep 2024 16:51:57 +0200 Subject: [PATCH 10/13] Refactor message sending --- .../MdsContractTerminationObserver.java | 37 +++++++++++-------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java index a15cb785d..cf7a84796 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java @@ -19,11 +19,13 @@ import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; import de.sovity.edc.extension.contacttermination.ContractTerminationObserver; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.val; import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.uuid.UuidGenerator; +import org.jetbrains.annotations.NotNull; @RequiredArgsConstructor public class MdsContractTerminationObserver implements ContractTerminationObserver { @@ -46,28 +48,33 @@ public void contractTerminatedByCounterparty(ContractTerminationEvent contractTe private void sendMessage(String logEvent, ContractTerminationEvent contractTerminationEvent) { val logEntry = LogEntry.from(logEvent, contractTerminationEvent); - try { + val event = buildLogEvent(contractTerminationEvent.contractAgreementId(), logEntry); + publishEvent(event); + monitor.debug("Published event for " + logEntry); + } catch (JsonProcessingException e) { + monitor.warning("Failed to serialize the event for the logging house " + logEntry); + } + } + + private @NotNull MdsContractTerminationEvent buildLogEvent(String contractAgreementId, LogEntry logEntry) throws JsonProcessingException { val message = objectMapper.writeValueAsString(logEntry); - val event = new MdsContractTerminationEvent( + return new MdsContractTerminationEvent( UuidGenerator.INSTANCE.generate().toString(), - contractTerminationEvent.contractAgreementId(), + contractAgreementId, message ); + } - @SuppressWarnings("unchecked") - EventEnvelope.Builder builder = EventEnvelope.Builder.newInstance(); - - val eventEnvelope = builder - .at(System.currentTimeMillis()) - .payload(event) - .build(); + private void publishEvent(MdsContractTerminationEvent event) { + @SuppressWarnings("unchecked") + EventEnvelope.Builder builder = EventEnvelope.Builder.newInstance(); - eventRouter.publish(eventEnvelope); + val eventEnvelope = builder + .at(System.currentTimeMillis()) + .payload(event) + .build(); - monitor.debug("Published event for " + logEntry); - } catch (JsonProcessingException e) { - monitor.warning("Failed to serialize the event for the logging house " + logEntry); - } + eventRouter.publish(eventEnvelope); } } From d1bf475002ab1ab326bde39263b835ad3e366c30 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Tue, 3 Sep 2024 16:58:51 +0200 Subject: [PATCH 11/13] checkstyle --- .../MdsContractTerminationObserver.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java index cf7a84796..e00f7f5ca 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java @@ -19,7 +19,6 @@ import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; import de.sovity.edc.extension.contacttermination.ContractTerminationObserver; import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; import lombok.val; import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.edc.spi.event.EventRouter; @@ -57,13 +56,14 @@ private void sendMessage(String logEvent, ContractTerminationEvent contractTermi } } - private @NotNull MdsContractTerminationEvent buildLogEvent(String contractAgreementId, LogEntry logEntry) throws JsonProcessingException { - val message = objectMapper.writeValueAsString(logEntry); - return new MdsContractTerminationEvent( - UuidGenerator.INSTANCE.generate().toString(), - contractAgreementId, - message - ); + private @NotNull MdsContractTerminationEvent buildLogEvent(String contractAgreementId, LogEntry logEntry) + throws JsonProcessingException { + val message = objectMapper.writeValueAsString(logEntry); + return new MdsContractTerminationEvent( + UuidGenerator.INSTANCE.generate().toString(), + contractAgreementId, + message + ); } private void publishEvent(MdsContractTerminationEvent event) { From 00c2a27604e46a9183b2a0ad300c6e0045d9e925 Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Wed, 4 Sep 2024 10:02:01 +0200 Subject: [PATCH 12/13] Fix exception handling --- .../ContractAgreementTerminationService.java | 2 +- .../LoggingHouseException.java | 21 +++++++++++++++++++ .../MdsContractTerminationObserver.java | 4 +++- 3 files changed, 25 insertions(+), 2 deletions(-) create mode 100644 extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LoggingHouseException.java diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java index 0be5589e2..a8c8f7bf7 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java @@ -133,7 +133,7 @@ private void notifyObservers(Consumer call) { try { call.accept(listener); } catch (Exception e) { - monitor.warning("Failure when notifying the contract termination listener."); + monitor.warning("Failure when notifying the contract termination listener.", e); } } } diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LoggingHouseException.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LoggingHouseException.java new file mode 100644 index 000000000..3625ef6eb --- /dev/null +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LoggingHouseException.java @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2024 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package de.sovity.edc.extension.mdslogginhousebinder; + +public class LoggingHouseException extends RuntimeException { + public LoggingHouseException(String message) { + super(message); + } +} diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java index e00f7f5ca..445fc340a 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java @@ -52,7 +52,9 @@ private void sendMessage(String logEvent, ContractTerminationEvent contractTermi publishEvent(event); monitor.debug("Published event for " + logEntry); } catch (JsonProcessingException e) { - monitor.warning("Failed to serialize the event for the logging house " + logEntry); + val message = "Failed to serialize the event for the LoggingHouse " + logEntry; + monitor.debug(message); + throw new LoggingHouseException(message); } } From 98eb76c3d2a76dc063b983b9eff7c7aafe8213ed Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Wed, 4 Sep 2024 11:00:17 +0200 Subject: [PATCH 13/13] code review --- .../extension/mdslogginhousebinder/LoggingHouseException.java | 4 ++-- .../mdslogginhousebinder/MdsContractTerminationObserver.java | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LoggingHouseException.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LoggingHouseException.java index 3625ef6eb..6ca7c24a1 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LoggingHouseException.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LoggingHouseException.java @@ -15,7 +15,7 @@ package de.sovity.edc.extension.mdslogginhousebinder; public class LoggingHouseException extends RuntimeException { - public LoggingHouseException(String message) { - super(message); + public LoggingHouseException(String message, Throwable cause) { + super(message, cause); } } diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java index 445fc340a..3ba115efb 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsContractTerminationObserver.java @@ -53,8 +53,7 @@ private void sendMessage(String logEvent, ContractTerminationEvent contractTermi monitor.debug("Published event for " + logEntry); } catch (JsonProcessingException e) { val message = "Failed to serialize the event for the LoggingHouse " + logEntry; - monitor.debug(message); - throw new LoggingHouseException(message); + throw new LoggingHouseException(message, e); } }