Skip to content

Commit

Permalink
Bind the termination to the logging house
Browse files Browse the repository at this point in the history
  • Loading branch information
ununhexium committed Aug 27, 2024
1 parent d62eb33 commit fe1432f
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 99 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,32 @@
import de.sovity.edc.extension.contacttermination.query.TerminateContractQuery;
import de.sovity.edc.extension.messenger.SovityMessage;
import de.sovity.edc.extension.messenger.SovityMessenger;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.val;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.observe.Observable;
import org.jetbrains.annotations.NotNull;
import org.eclipse.edc.spi.observe.ObservableImpl;
import org.jetbrains.annotations.Nullable;
import org.jooq.DSLContext;

import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.COUNTERPARTY;
import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.SELF;

@RequiredArgsConstructor
public class ContractAgreementTerminationService implements Observable<ContractTerminationObserver> {
public class ContractAgreementTerminationService {

private final SovityMessenger sovityMessenger;
private final ContractAgreementTerminationDetailsQuery contractAgreementTerminationDetailsQuery;
private final TerminateContractQuery terminateContractQuery;
private final Monitor monitor;
private final String thisParticipantId;
private final List<WeakReference<ContractTerminationObserver>> contractTerminationObservers =
Collections.synchronizedList(new ArrayList<>());
private final ReentrantLock observersLock = new ReentrantLock();
@Getter
private final Observable<ContractTerminationObserver> contractTerminationObservable = new ObservableImpl<>();

/**
* This is to terminate an EDC's own contract.
Expand All @@ -60,7 +53,7 @@ public class ContractAgreementTerminationService implements Observable<ContractT
*/
public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminationParam termination) {

val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now());
val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), thisParticipantId);
notifyObservers(it -> it.contractTerminationStartedFromThisInstance(starterEvent));

val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId());
Expand All @@ -75,7 +68,7 @@ public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminat

val terminatedAt = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, SELF);

val endEvent = ContractTerminationEvent.from(termination, terminatedAt);
val endEvent = ContractTerminationEvent.from(termination, terminatedAt, thisParticipantId);
notifyObservers(it -> it.contractTerminationCompletedOnThisInstance(endEvent));

notifyTerminationToProvider(details.counterpartyAddress(), termination);
Expand All @@ -88,7 +81,7 @@ public OffsetDateTime terminateAgreementAsCounterparty(
@Nullable String identity,
ContractTerminationParam termination
) {
val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now());
val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), null);
notifyObservers(it -> it.contractTerminatedByCounterpartyStarted(starterEvent));

val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId());
Expand All @@ -115,18 +108,18 @@ public OffsetDateTime terminateAgreementAsCounterparty(

val result = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, agent);

val endEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now());
val endEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), details.counterpartyId());
notifyObservers(it -> it.contractTerminatedByCounterparty(endEvent));

return result;
}

public void notifyTerminationToProvider(String counterPartyAddress, ContractTerminationParam termination) {

val notificationEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now());
val notificationEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), null);
notifyObservers(it -> it.contractTerminationOnCounterpartyStarted(notificationEvent));

val future = sovityMessenger.send(
sovityMessenger.send(
SovityMessage.class,
counterPartyAddress,
new ContractTerminationMessage(
Expand All @@ -135,59 +128,12 @@ public void notifyTerminationToProvider(String counterPartyAddress, ContractTerm
termination.reason()));
}

@Override
public Collection<ContractTerminationObserver> getListeners() {
return contractTerminationObservers.stream().filter(it -> it.get() != null).toList().stream().map(Reference::get).toList();
}

@Override
public void registerListener(ContractTerminationObserver listener) {
try {
observersLock.lock();

final var refreshed = getFilteredWeakReferences(null);

contractTerminationObservers.clear();
contractTerminationObservers.addAll(refreshed);
contractTerminationObservers.add(new WeakReference<>(listener));
} finally {
observersLock.unlock();
}
}

@Override
public void unregisterListener(ContractTerminationObserver listener) {
try {
observersLock.lock();

final var refreshed = getFilteredWeakReferences(listener);

contractTerminationObservers.clear();
contractTerminationObservers.addAll(refreshed);
} finally {
observersLock.unlock();
}
}

private @NotNull List<WeakReference<ContractTerminationObserver>> getFilteredWeakReferences(ContractTerminationObserver listener) {
return contractTerminationObservers.stream().filter(it -> {
val obs = it.get();
if (obs == null) {
return false;
}
return obs != listener;
}).toList();
}

private void notifyObservers(Consumer<ContractTerminationObserver> call) {
for (val weakRef : contractTerminationObservers) {
for (val listener : contractTerminationObservable.getListeners()) {
try {
val observer = weakRef.get();
if (observer != null) {
call.accept(observer);
}
call.accept(listener);
} catch (Exception e) {
monitor.warning("Failure when notifying contract termination observer.");
monitor.warning("Failure when notifying the contract termination listener.");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ public record ContractTerminationEvent(
String contractAgreementId,
String detail,
String reason,
OffsetDateTime timestamp
OffsetDateTime timestamp,
String origin
) {
public static ContractTerminationEvent from(ContractTerminationParam contractTerminationParam, OffsetDateTime dateTime) {
public static ContractTerminationEvent from(ContractTerminationParam contractTerminationParam, OffsetDateTime dateTime, String origin) {
return new ContractTerminationEvent(
contractTerminationParam.contractAgreementId(),
contractTerminationParam.detail(),
contractTerminationParam.reason(),
dateTime
dateTime,
origin
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*
*/

package de.sovity.edc.extension.mdslogginhousebinder;

import com.fasterxml.jackson.annotation.JsonProperty;
import de.sovity.edc.extension.contacttermination.ContractTerminationEvent;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.time.OffsetDateTime;

@AllArgsConstructor
@Builder
@Data
@NoArgsConstructor
public class LogEntry {

@JsonProperty("contractAgreementId")
private String contractAgreementId;

@JsonProperty("event")
private String event;

@JsonProperty("detail")
private String detail;

@JsonProperty("reason")
private String reason;

@JsonProperty("timestamp")
private OffsetDateTime timestamp;

public static LogEntry from(String event, ContractTerminationEvent contractTerminationEvent) {
return LogEntry.builder()
.event(event)
.contractAgreementId(contractTerminationEvent.contractAgreementId())
.detail(contractTerminationEvent.detail())
.reason(contractTerminationEvent.reason())
.timestamp(contractTerminationEvent.timestamp())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@

package de.sovity.edc.extension.mdslogginhousebinder;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import de.sovity.edc.extension.contacttermination.ContractAgreementTerminationService;
import de.sovity.edc.extension.contacttermination.ContractTerminationEvent;
import de.sovity.edc.extension.contacttermination.ContractTerminationObserver;
import lombok.val;
import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.spi.event.EventEnvelope;
import org.eclipse.edc.spi.event.EventRouter;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.ServiceExtension;
Expand All @@ -34,29 +37,61 @@ public class MdsLoggingHouseBinder implements ServiceExtension {
@Inject
private Monitor monitor;

@Inject
private TransferProcessObservable observable;

@Inject
private ContractAgreementTerminationService contractAgreementTerminationService;

private ObjectMapper objectMapper;

@Override
public void initialize(ServiceExtensionContext context) {
contractAgreementTerminationService.registerListener(new ContractTerminationObserver() {
@Override
public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) {
val message = new MdsContractTerminationEvent(
UuidGenerator.INSTANCE.generate().toString(),
contractTerminationEvent.contractAgreementId(),
"Contract termination event: terminated contract %s at %s from this EDC. Reason: %s Detail: %s".formatted(contractTerminationEvent.contractAgreementId(), contractTerminationEvent.timestamp(), contractTerminationEvent.reason(),
contractTerminationEvent.detail())
);
}

@Override
public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) {
ContractTerminationObserver.super.contractTerminatedByCounterparty(contractTerminationEvent);
}
});

objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());

contractAgreementTerminationService.getContractTerminationObservable()
.registerListener(new ContractTerminationObserver() {
@Override
public void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) {
val logEntry = LogEntry.from("contractTerminatedByThisInstance", contractTerminationEvent);

try {
val message = objectMapper.writeValueAsString(logEntry);
val event = new MdsContractTerminationEvent(
UuidGenerator.INSTANCE.generate().toString(),
contractTerminationEvent.contractAgreementId(),
message
);

@SuppressWarnings("unchecked")
EventEnvelope.Builder<MdsContractTerminationEvent> builder = EventEnvelope.Builder.newInstance();

val eventEnvelope = builder
.at(System.currentTimeMillis())
.payload(event)
.build();

eventRouter.publish(eventEnvelope);
monitor.debug("Published event for " + logEntry);
} catch (JsonProcessingException e) {
monitor.warning("Failed to serialize the event for the logging house " + logEntry);
}
}

@Override
public void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) {
val logEntry = LogEntry.from("contractTerminatedByCounterparty", contractTerminationEvent);

try {
val message = objectMapper.writeValueAsString(logEntry);
new MdsContractTerminationEvent(
UuidGenerator.INSTANCE.generate().toString(),
contractTerminationEvent.contractAgreementId(),
message
);
} catch (JsonProcessingException e) {
monitor.warning("Failed to serialize the event for the logging house " + logEntry);
}
}
});
}
}
1 change: 1 addition & 0 deletions launchers/common/base-mds/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ plugins {

dependencies {
implementation(libs.loggingHouse.client)
implementation(project(":extensions:mds-logginghouse-binder"))
}

group = libs.versions.sovityEdcGroup.get()
Original file line number Diff line number Diff line change
Expand Up @@ -473,8 +473,8 @@ void canListenToTerminationEvents(
val providerObserver = Mockito.spy(new ContractTerminationObserver() {
});

consumerService.registerListener(consumerObserver);
providerService.registerListener(providerObserver);
consumerService.getContractTerminationObservable().registerListener(consumerObserver);
providerService.getContractTerminationObservable().registerListener(providerObserver);

// act

Expand All @@ -487,8 +487,8 @@ void canListenToTerminationEvents(

// assert

assertThat(consumerService.getListeners()).hasSize(1);
assertThat(providerService.getListeners()).hasSize(1);
assertThat(consumerService.getContractTerminationObservable().getListeners()).hasSize(1);
assertThat(providerService.getContractTerminationObservable().getListeners()).hasSize(1);

ArgumentCaptor<ContractTerminationEvent> argument = ArgumentCaptor.forClass(ContractTerminationEvent.class);

Expand All @@ -509,13 +509,13 @@ void canListenToTerminationEvents(

// act

consumerService.unregisterListener(consumerObserver);
providerService.unregisterListener(providerObserver);
consumerService.getContractTerminationObservable().unregisterListener(consumerObserver);
providerService.getContractTerminationObservable().unregisterListener(providerObserver);

// assert

assertThat(consumerService.getListeners()).hasSize(0);
assertThat(providerService.getListeners()).hasSize(0);
assertThat(consumerService.getContractTerminationObservable().getListeners()).hasSize(0);
assertThat(providerService.getContractTerminationObservable().getListeners()).hasSize(0);
}

private static void assertTerminationEvent(ArgumentCaptor<ContractTerminationEvent> argument, String contractAgreementId,
Expand Down

0 comments on commit fe1432f

Please sign in to comment.