From fe1432fc40181dd4b7b3ae73a38b10c5f02305ea Mon Sep 17 00:00:00 2001 From: Christophe Loiseau Date: Tue, 27 Aug 2024 09:08:08 +0200 Subject: [PATCH] Bind the termination to the logging house --- .../ContractAgreementTerminationService.java | 82 ++++--------------- .../ContractTerminationEvent.java | 8 +- .../mdslogginhousebinder/LogEntry.java | 57 +++++++++++++ .../MdsLoggingHouseBinder.java | 75 ++++++++++++----- launchers/common/base-mds/build.gradle.kts | 1 + .../edc/e2e/ContractTerminationTest.java | 16 ++-- 6 files changed, 140 insertions(+), 99 deletions(-) create mode 100644 extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java index 2b224706e..0be5589e2 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractAgreementTerminationService.java @@ -18,39 +18,32 @@ import de.sovity.edc.extension.contacttermination.query.TerminateContractQuery; import de.sovity.edc.extension.messenger.SovityMessage; import de.sovity.edc.extension.messenger.SovityMessenger; +import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.val; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.observe.Observable; -import org.jetbrains.annotations.NotNull; +import org.eclipse.edc.spi.observe.ObservableImpl; import org.jetbrains.annotations.Nullable; import org.jooq.DSLContext; -import java.lang.ref.Reference; -import java.lang.ref.WeakReference; import java.time.OffsetDateTime; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.COUNTERPARTY; import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.SELF; @RequiredArgsConstructor -public class ContractAgreementTerminationService implements Observable { +public class ContractAgreementTerminationService { private final SovityMessenger sovityMessenger; private final ContractAgreementTerminationDetailsQuery contractAgreementTerminationDetailsQuery; private final TerminateContractQuery terminateContractQuery; private final Monitor monitor; private final String thisParticipantId; - private final List> contractTerminationObservers = - Collections.synchronizedList(new ArrayList<>()); - private final ReentrantLock observersLock = new ReentrantLock(); + @Getter + private final Observable contractTerminationObservable = new ObservableImpl<>(); /** * This is to terminate an EDC's own contract. @@ -60,7 +53,7 @@ public class ContractAgreementTerminationService implements Observable it.contractTerminationStartedFromThisInstance(starterEvent)); val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId()); @@ -75,7 +68,7 @@ public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminat val terminatedAt = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, SELF); - val endEvent = ContractTerminationEvent.from(termination, terminatedAt); + val endEvent = ContractTerminationEvent.from(termination, terminatedAt, thisParticipantId); notifyObservers(it -> it.contractTerminationCompletedOnThisInstance(endEvent)); notifyTerminationToProvider(details.counterpartyAddress(), termination); @@ -88,7 +81,7 @@ public OffsetDateTime terminateAgreementAsCounterparty( @Nullable String identity, ContractTerminationParam termination ) { - val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now()); + val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), null); notifyObservers(it -> it.contractTerminatedByCounterpartyStarted(starterEvent)); val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId()); @@ -115,7 +108,7 @@ public OffsetDateTime terminateAgreementAsCounterparty( val result = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, agent); - val endEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now()); + val endEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), details.counterpartyId()); notifyObservers(it -> it.contractTerminatedByCounterparty(endEvent)); return result; @@ -123,10 +116,10 @@ public OffsetDateTime terminateAgreementAsCounterparty( public void notifyTerminationToProvider(String counterPartyAddress, ContractTerminationParam termination) { - val notificationEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now()); + val notificationEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), null); notifyObservers(it -> it.contractTerminationOnCounterpartyStarted(notificationEvent)); - val future = sovityMessenger.send( + sovityMessenger.send( SovityMessage.class, counterPartyAddress, new ContractTerminationMessage( @@ -135,59 +128,12 @@ public void notifyTerminationToProvider(String counterPartyAddress, ContractTerm termination.reason())); } - @Override - public Collection getListeners() { - return contractTerminationObservers.stream().filter(it -> it.get() != null).toList().stream().map(Reference::get).toList(); - } - - @Override - public void registerListener(ContractTerminationObserver listener) { - try { - observersLock.lock(); - - final var refreshed = getFilteredWeakReferences(null); - - contractTerminationObservers.clear(); - contractTerminationObservers.addAll(refreshed); - contractTerminationObservers.add(new WeakReference<>(listener)); - } finally { - observersLock.unlock(); - } - } - - @Override - public void unregisterListener(ContractTerminationObserver listener) { - try { - observersLock.lock(); - - final var refreshed = getFilteredWeakReferences(listener); - - contractTerminationObservers.clear(); - contractTerminationObservers.addAll(refreshed); - } finally { - observersLock.unlock(); - } - } - - private @NotNull List> getFilteredWeakReferences(ContractTerminationObserver listener) { - return contractTerminationObservers.stream().filter(it -> { - val obs = it.get(); - if (obs == null) { - return false; - } - return obs != listener; - }).toList(); - } - private void notifyObservers(Consumer call) { - for (val weakRef : contractTerminationObservers) { + for (val listener : contractTerminationObservable.getListeners()) { try { - val observer = weakRef.get(); - if (observer != null) { - call.accept(observer); - } + call.accept(listener); } catch (Exception e) { - monitor.warning("Failure when notifying contract termination observer."); + monitor.warning("Failure when notifying the contract termination listener."); } } } diff --git a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java index fb5baadae..4def044dc 100644 --- a/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java +++ b/extensions/contract-termination/src/main/java/de/sovity/edc/extension/contacttermination/ContractTerminationEvent.java @@ -20,14 +20,16 @@ public record ContractTerminationEvent( String contractAgreementId, String detail, String reason, - OffsetDateTime timestamp + OffsetDateTime timestamp, + String origin ) { - public static ContractTerminationEvent from(ContractTerminationParam contractTerminationParam, OffsetDateTime dateTime) { + public static ContractTerminationEvent from(ContractTerminationParam contractTerminationParam, OffsetDateTime dateTime, String origin) { return new ContractTerminationEvent( contractTerminationParam.contractAgreementId(), contractTerminationParam.detail(), contractTerminationParam.reason(), - dateTime + dateTime, + origin ); } } diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java new file mode 100644 index 000000000..cabbb627d --- /dev/null +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/LogEntry.java @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2024 sovity GmbH + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * sovity GmbH - initial API and implementation + * + */ + +package de.sovity.edc.extension.mdslogginhousebinder; + +import com.fasterxml.jackson.annotation.JsonProperty; +import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +import java.time.OffsetDateTime; + +@AllArgsConstructor +@Builder +@Data +@NoArgsConstructor +public class LogEntry { + + @JsonProperty("contractAgreementId") + private String contractAgreementId; + + @JsonProperty("event") + private String event; + + @JsonProperty("detail") + private String detail; + + @JsonProperty("reason") + private String reason; + + @JsonProperty("timestamp") + private OffsetDateTime timestamp; + + public static LogEntry from(String event, ContractTerminationEvent contractTerminationEvent) { + return LogEntry.builder() + .event(event) + .contractAgreementId(contractTerminationEvent.contractAgreementId()) + .detail(contractTerminationEvent.detail()) + .reason(contractTerminationEvent.reason()) + .timestamp(contractTerminationEvent.timestamp()) + .build(); + } +} diff --git a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java index 8fee2b145..d752c830f 100644 --- a/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java +++ b/extensions/mds-logginghouse-binder/src/main/java/de/sovity/edc/extension/mdslogginhousebinder/MdsLoggingHouseBinder.java @@ -14,12 +14,15 @@ package de.sovity.edc.extension.mdslogginhousebinder; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import de.sovity.edc.extension.contacttermination.ContractAgreementTerminationService; import de.sovity.edc.extension.contacttermination.ContractTerminationEvent; import de.sovity.edc.extension.contacttermination.ContractTerminationObserver; import lombok.val; -import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.runtime.metamodel.annotation.Inject; +import org.eclipse.edc.spi.event.EventEnvelope; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.system.ServiceExtension; @@ -34,29 +37,61 @@ public class MdsLoggingHouseBinder implements ServiceExtension { @Inject private Monitor monitor; - @Inject - private TransferProcessObservable observable; - @Inject private ContractAgreementTerminationService contractAgreementTerminationService; + private ObjectMapper objectMapper; + @Override public void initialize(ServiceExtensionContext context) { - contractAgreementTerminationService.registerListener(new ContractTerminationObserver() { - @Override - public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) { - val message = new MdsContractTerminationEvent( - UuidGenerator.INSTANCE.generate().toString(), - contractTerminationEvent.contractAgreementId(), - "Contract termination event: terminated contract %s at %s from this EDC. Reason: %s Detail: %s".formatted(contractTerminationEvent.contractAgreementId(), contractTerminationEvent.timestamp(), contractTerminationEvent.reason(), - contractTerminationEvent.detail()) - ); - } - - @Override - public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) { - ContractTerminationObserver.super.contractTerminatedByCounterparty(contractTerminationEvent); - } - }); + + objectMapper = new ObjectMapper(); + objectMapper.registerModule(new JavaTimeModule()); + + contractAgreementTerminationService.getContractTerminationObservable() + .registerListener(new ContractTerminationObserver() { + @Override + public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) { + val logEntry = LogEntry.from("contractTerminatedByThisInstance", contractTerminationEvent); + + try { + val message = objectMapper.writeValueAsString(logEntry); + val event = new MdsContractTerminationEvent( + UuidGenerator.INSTANCE.generate().toString(), + contractTerminationEvent.contractAgreementId(), + message + ); + + @SuppressWarnings("unchecked") + EventEnvelope.Builder builder = EventEnvelope.Builder.newInstance(); + + val eventEnvelope = builder + .at(System.currentTimeMillis()) + .payload(event) + .build(); + + eventRouter.publish(eventEnvelope); + monitor.debug("Published event for " + logEntry); + } catch (JsonProcessingException e) { + monitor.warning("Failed to serialize the event for the logging house " + logEntry); + } + } + + @Override + public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) { + val logEntry = LogEntry.from("contractTerminatedByCounterparty", contractTerminationEvent); + + try { + val message = objectMapper.writeValueAsString(logEntry); + new MdsContractTerminationEvent( + UuidGenerator.INSTANCE.generate().toString(), + contractTerminationEvent.contractAgreementId(), + message + ); + } catch (JsonProcessingException e) { + monitor.warning("Failed to serialize the event for the logging house " + logEntry); + } + } + }); } } diff --git a/launchers/common/base-mds/build.gradle.kts b/launchers/common/base-mds/build.gradle.kts index 0faceaa8d..89d1a8bd8 100644 --- a/launchers/common/base-mds/build.gradle.kts +++ b/launchers/common/base-mds/build.gradle.kts @@ -4,6 +4,7 @@ plugins { dependencies { implementation(libs.loggingHouse.client) + implementation(project(":extensions:mds-logginghouse-binder")) } group = libs.versions.sovityEdcGroup.get() diff --git a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java index ba5179917..81f06a375 100644 --- a/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java +++ b/tests/src/test/java/de/sovity/edc/e2e/ContractTerminationTest.java @@ -473,8 +473,8 @@ void canListenToTerminationEvents( val providerObserver = Mockito.spy(new ContractTerminationObserver() { }); - consumerService.registerListener(consumerObserver); - providerService.registerListener(providerObserver); + consumerService.getContractTerminationObservable().registerListener(consumerObserver); + providerService.getContractTerminationObservable().registerListener(providerObserver); // act @@ -487,8 +487,8 @@ void canListenToTerminationEvents( // assert - assertThat(consumerService.getListeners()).hasSize(1); - assertThat(providerService.getListeners()).hasSize(1); + assertThat(consumerService.getContractTerminationObservable().getListeners()).hasSize(1); + assertThat(providerService.getContractTerminationObservable().getListeners()).hasSize(1); ArgumentCaptor argument = ArgumentCaptor.forClass(ContractTerminationEvent.class); @@ -509,13 +509,13 @@ void canListenToTerminationEvents( // act - consumerService.unregisterListener(consumerObserver); - providerService.unregisterListener(providerObserver); + consumerService.getContractTerminationObservable().unregisterListener(consumerObserver); + providerService.getContractTerminationObservable().unregisterListener(providerObserver); // assert - assertThat(consumerService.getListeners()).hasSize(0); - assertThat(providerService.getListeners()).hasSize(0); + assertThat(consumerService.getContractTerminationObservable().getListeners()).hasSize(0); + assertThat(providerService.getContractTerminationObservable().getListeners()).hasSize(0); } private static void assertTerminationEvent(ArgumentCaptor argument, String contractAgreementId,