Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add observable for termination events #1027

Merged
merged 14 commits into from
Sep 4, 2024
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ please see [changelog_updates.md](docs/dev/changelog_updates.md).

#### Minor Changes

- Add Contract Termination Observer
- MDS only
- Log contract termination events in the logging house

#### Patch Changes

### Deployment Migration Notes
Expand Down
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ subprojects {
tasks.register("printClasspath") {
group = libs.versions.edcGroup.get()
description = "The EdcRuntimeExtension JUnit Extension requires the gradle task 'printClasspath'"
println(sourceSets.main.get().runtimeClasspath.asPath);
println(sourceSets.main.get().runtimeClasspath.asPath)
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,20 @@

import de.sovity.edc.extension.contacttermination.query.ContractAgreementTerminationDetailsQuery;
import de.sovity.edc.extension.contacttermination.query.TerminateContractQuery;
import de.sovity.edc.extension.messenger.SovityMessage;
import de.sovity.edc.extension.messenger.SovityMessenger;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.val;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.observe.Observable;
import org.eclipse.edc.spi.observe.ObservableImpl;
import org.jetbrains.annotations.Nullable;
import org.jooq.DSLContext;

import java.time.OffsetDateTime;
import java.util.function.Consumer;

import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.COUNTERPARTY;
import static de.sovity.edc.ext.db.jooq.enums.ContractTerminatedBy.SELF;
Expand All @@ -37,15 +42,20 @@ public class ContractAgreementTerminationService {
private final TerminateContractQuery terminateContractQuery;
private final Monitor monitor;
private final String thisParticipantId;
@Getter
private final Observable<ContractTerminationObserver> contractTerminationObservable = new ObservableImpl<>();

/**
* This is to terminate an EDC's own contract.
* If the termination comes from an external system, use
* {@link #terminateCounterpartyAgreement(DSLContext, String, ContractTerminationParam)}
* {@link #terminateAgreementAsCounterparty(DSLContext, String, ContractTerminationParam)}
* to validate the counter-party's identity.
*/
public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminationParam termination) {

val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), thisParticipantId);
notifyObservers(it -> it.contractTerminationStartedFromThisInstance(starterEvent));

val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId());

if (details == null) {
Expand All @@ -58,16 +68,22 @@ public OffsetDateTime terminateAgreementOrThrow(DSLContext dsl, ContractTerminat

val terminatedAt = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, SELF);

val endEvent = ContractTerminationEvent.from(termination, terminatedAt, thisParticipantId);
notifyObservers(it -> it.contractTerminationCompletedOnThisInstance(endEvent));

notifyTerminationToProvider(details.counterpartyAddress(), termination);

return terminatedAt;
}

public OffsetDateTime terminateCounterpartyAgreement(
public OffsetDateTime terminateAgreementAsCounterparty(
DSLContext dsl,
@Nullable String identity,
ContractTerminationParam termination
) {
val starterEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), null);
notifyObservers(it -> it.contractTerminatedByCounterpartyStarted(starterEvent));

val details = contractAgreementTerminationDetailsQuery.fetchAgreementDetailsOrThrow(dsl, termination.contractAgreementId());

if (details == null) {
Expand All @@ -90,15 +106,35 @@ public OffsetDateTime terminateCounterpartyAgreement(

val agent = thisParticipantId.equals(details.counterpartyId()) ? SELF : COUNTERPARTY;

return terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, agent);
val result = terminateContractQuery.terminateConsumerAgreementOrThrow(dsl, termination, agent);

val endEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), details.counterpartyId());
notifyObservers(it -> it.contractTerminatedByCounterparty(endEvent));

return result;
}

public void notifyTerminationToProvider(String counterPartyAddress, ContractTerminationParam termination) {

val notificationEvent = ContractTerminationEvent.from(termination, OffsetDateTime.now(), null);
notifyObservers(it -> it.contractTerminationOnCounterpartyStarted(notificationEvent));

sovityMessenger.send(
SovityMessage.class,
counterPartyAddress,
new ContractTerminationMessage(
termination.contractAgreementId(),
termination.detail(),
termination.reason()));
}

private void notifyObservers(Consumer<ContractTerminationObserver> call) {
for (val listener : contractTerminationObservable.getListeners()) {
try {
call.accept(listener);
} catch (Exception e) {
monitor.warning("Failure when notifying the contract termination listener.", e);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*
*/

package de.sovity.edc.extension.contacttermination;

import java.time.OffsetDateTime;

public record ContractTerminationEvent(
String contractAgreementId,
String detail,
String reason,
OffsetDateTime timestamp,
String origin
) {
public static ContractTerminationEvent from(ContractTerminationParam contractTerminationParam, OffsetDateTime dateTime, String origin) {
return new ContractTerminationEvent(
contractTerminationParam.contractAgreementId(),
contractTerminationParam.detail(),
contractTerminationParam.reason(),
dateTime,
origin
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private void setupMessenger(ContractAgreementTerminationService terminationServi
ContractTerminationMessage.class,
(claims, termination) ->
dslContextFactory.transactionResult(dsl ->
terminationService.terminateCounterpartyAgreement(
terminationService.terminateAgreementAsCounterparty(
dsl,
participantAgentService.createFor(claims).getIdentity(),
buildTerminationRequest(termination))));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*
*/

package de.sovity.edc.extension.contacttermination;

public interface ContractTerminationObserver {

/**
* Indicates that a contract termination was started by this EDC.
*/
default void contractTerminationStartedFromThisInstance(ContractTerminationEvent contractTerminationEvent) {
}

/**
* Indicates that the first step to terminate a contract, terminating a contract on this EDC instance itself, was successful.
* The contract is now marked as terminated on this EDC's side.
*/
default void contractTerminationCompletedOnThisInstance(ContractTerminationEvent contractTerminationEvent) {
}

/**
* Indicates that a contract termination on the counterparty EDC was started.
*/
default void contractTerminationOnCounterpartyStarted(ContractTerminationEvent contractTerminationEvent) {
}

/**
* Indicates that a contract termination was started by a counterparty EDC terminated successfully
*/
default void contractTerminatedByCounterpartyStarted(ContractTerminationEvent contractTerminationEvent) {
}

/**
* Indicates that a contract termination initiated by a counterparty EDC terminated successfully
* The contract is now marked as terminated on this EDC.
*/
default void contractTerminatedByCounterparty(ContractTerminationEvent contractTerminationEvent) {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class TerminateContractQuery {
public OffsetDateTime terminateConsumerAgreementOrThrow(
DSLContext dsl,
ContractTerminationParam termination,
ContractTerminatedBy terminatedBy) {

ContractTerminatedBy terminatedBy
) {
val tooAccurate = OffsetDateTime.now();
val now = tooAccurate.truncatedTo(ChronoUnit.MICROS);

Expand Down
44 changes: 44 additions & 0 deletions extensions/mds-logginghouse-binder/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
<!-- PROJECT LOGO -->
<br />
<div align="center">
<a href="https://github.com/sovity/edc-ce">
<img src="https://raw.githubusercontent.com/sovity/edc-ui/main/src/assets/images/sovity_logo.svg" alt="Logo" width="300">
</a>

<h3 align="center">EDC-Connector Extension:<br />MDS Contract Termination - LoggingHouse binder</h3>

<p align="center">
<a href="https://github.com/sovity/edc-ce/issues/new?template=bug_report.md">Report Bug</a>
·
<a href="https://github.com/sovity/edc-ce/issues/new?template=feature_request.md">Request Feature</a>
</p>
</div>


## About this Extension

It links the Contract Termination events with the LoggingHouse.

## Why does this extension exist?

MDS needs to log the events generated when terminating a contract with their Logging House extension.
The Logging House is an external dependency and the linkage must only happen for the MDS variant.

This extension implements this specific task.

## Architecture

```mermaid
flowchart TD
Binder(MDS LoggingHouse Binder) --> LoggingHouse(Logging House Extension)
Binder(MDS LoggingHouse Binder) --> ContractTermination(Contract Termination Extension)
MDS(MDS CE) --> Binder
```

## License

Apache License 2.0 - see [LICENSE](../../LICENSE)

## Contact

sovity GmbH - [email protected]
62 changes: 62 additions & 0 deletions extensions/mds-logginghouse-binder/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@

ununhexium marked this conversation as resolved.
Show resolved Hide resolved
plugins {
`java-library`
`maven-publish`
}

dependencies {
annotationProcessor(libs.lombok)
compileOnly(libs.lombok)

implementation(project(":extensions:contract-termination"))

implementation(libs.edc.coreSpi)
implementation(libs.edc.dspNegotiationTransform)
implementation(libs.edc.transferSpi)

implementation(libs.loggingHouse.client)
implementation(libs.jakarta.rsApi)


testAnnotationProcessor(libs.lombok)
testCompileOnly(libs.lombok)

testImplementation(project(":extensions:postgres-flyway"))
testImplementation(project(":utils:test-utils"))
testImplementation(project(":utils:versions"))

testImplementation(libs.edc.monitorJdkLogger)
testImplementation(libs.edc.http) {
exclude(group = "org.eclipse.jetty", module = "jetty-client")
exclude(group = "org.eclipse.jetty", module = "jetty-http")
exclude(group = "org.eclipse.jetty", module = "jetty-io")
exclude(group = "org.eclipse.jetty", module = "jetty-server")
exclude(group = "org.eclipse.jetty", module = "jetty-util")
exclude(group = "org.eclipse.jetty", module = "jetty-webapp")
}

// Updated jetty versions for e.g. CVE-2023-26048
testImplementation(libs.bundles.jetty.cve2023)

testImplementation(libs.assertj.core)
testImplementation(libs.flyway.core)
testImplementation(libs.junit.api)
testImplementation(libs.hibernate.validation)
testImplementation(libs.jakarta.el)
testImplementation(libs.mockito.core)
testImplementation(libs.restAssured.restAssured)
testImplementation(libs.testcontainers.testcontainers)
testImplementation(libs.testcontainers.postgresql)

testRuntimeOnly(libs.junit.engine)
}

group = libs.versions.sovityEdcExtensionGroup.get()

publishing {
publications {
create<MavenPublication>(project.name) {
from(components["java"])
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*
*/

package de.sovity.edc.extension.mdslogginhousebinder;

import com.fasterxml.jackson.annotation.JsonProperty;
import de.sovity.edc.extension.contacttermination.ContractTerminationEvent;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.time.OffsetDateTime;

@AllArgsConstructor
@Builder
@Data
@NoArgsConstructor
public class LogEntry {

@JsonProperty("contractAgreementId")
private String contractAgreementId;

@JsonProperty("event")
private String event;

@JsonProperty("detail")
private String detail;

@JsonProperty("reason")
private String reason;

@JsonProperty("timestamp")
private OffsetDateTime timestamp;
richardtreier marked this conversation as resolved.
Show resolved Hide resolved

public static LogEntry from(String event, ContractTerminationEvent contractTerminationEvent) {
return LogEntry.builder()
.event(event)
.contractAgreementId(contractTerminationEvent.contractAgreementId())
.detail(contractTerminationEvent.detail())
.reason(contractTerminationEvent.reason())
.timestamp(contractTerminationEvent.timestamp())
.build();
}
}
Loading
Loading