Skip to content

Commit

Permalink
feat: custom C2C messages (#970)
Browse files Browse the repository at this point in the history
  • Loading branch information
ununhexium authored Jun 25, 2024
1 parent b991f83 commit f6c8a5a
Show file tree
Hide file tree
Showing 48 changed files with 2,233 additions and 9 deletions.
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
indent_size = 4
ij_continuation_indent_size = 4

[*.ts]
quote_type = single
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ please see [changelog_updates.md](docs/dev/changelog_updates.md).

#### Minor Changes

- Add the SovityMessenger extension

#### Patch Changes

- Unified database migration histories
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/checkstyle/checkstyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@
<property name="braceAdjustment" value="4"/>
<property name="caseIndent" value="4"/>
<property name="throwsIndent" value="4"/>
<property name="lineWrappingIndentation" value="8"/>
<property name="lineWrappingIndentation" value="4"/>
<property name="arrayInitIndent" value="4"/>
</module>
<module name="AbbreviationAsWordInName">
Expand Down
72 changes: 72 additions & 0 deletions extensions/sovity-messenger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<!-- PROJECT LOGO -->
<br />
<div align="center">
<a href="https://github.com/sovity/edc-extensions">
<img src="https://raw.githubusercontent.com/sovity/edc-ui/main/src/assets/images/sovity_logo.svg" alt="Logo" width="300">
</a>

<h3 align="center">EDC-Connector Extension:<br />Sovity Messenger</h3>

<p align="center">
<a href="https://github.com/sovity/edc-extensions/issues/new?template=bug_report.md">Report Bug</a>
·
<a href="https://github.com/sovity/edc-extensions/issues/new?template=feature_request.md">Request Feature</a>
</p>
</div>


## About this Extension

To provide a simpler way to exchange messages between EDCs while re-using the Dataspace's Connector-to-Connector authentication mechanisms, we created our own extension with a much simpler API surface omitting JSON-LD.

## Why does this extension exist?

Adding custom DSP messages to a vanilla EDC is verbose and requires the handling of JSON-LD and implementing your own Transformers. Since we do not care about JSON-LD we wanted a simpler API surface.

## Architecture

The sovity Messenger is implemented on top of the DSP messaging protocol and re-uses its exchange and authentication.

It is abstracted from the internals of the DSP protocol such that changing the underlying implementation remains an option.


```mermaid
---
title: Registering a handler
---
sequenceDiagram
Caller ->> SovityMessengerRegistry: register(inputClass, intputType, handler)
```

```mermaid
---
title: Sending a message
---
sequenceDiagram
Caller ->>+SovityMessenger: send(resultClass, counterPartyAddress, payload)
SovityMessenger -->> RemoteMessageDispatcherRegistry: dispatch(genericMessage)
SovityMessenger -->> -Caller: Future<resultClass>
RemoteMessageDispatcherRegistry ->> +CustomMessageReceiverController: <<sending via DSP>>
CustomMessageReceiverController ->> CustomMessageReceiverController: processMessage(handler, payload)
CustomMessageReceiverController -->> -RemoteMessageDispatcherRegistry: <<result via DSP>>
RemoteMessageDispatcherRegistry -->> SovityMessenger: <<result via DSP>>
SovityMessenger ->> +Caller: Future<resultClass>
Caller ->> -Caller: future.get()
```

## Demo

You can find a demo in the [demo](src/test/java/de/sovity/edc/extension/messenger/demo).

The 2 key entry points are:

- Register your message receiving by talking to the SovityMessageRegistry as demonstrated [here](src/test/java/de/sovity/edc/extension/messenger/demo/SovityMessengerDemo.java).
- Send messages by calling the SovityMessenger as shown [here](src/test/java/de/sovity/edc/extension/messenger/demo/SovityMessengerDemoTest.java).

## License

Apache License 2.0 - see [LICENSE](../../LICENSE)

## Contact

sovity GmbH - [email protected]
71 changes: 71 additions & 0 deletions extensions/sovity-messenger/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
plugins {
`java-library`
`maven-publish`
}

dependencies {
annotationProcessor(libs.lombok)

compileOnly(libs.lombok)

implementation(project(":utils:json-and-jsonld-utils"))

implementation(libs.edc.controlPlaneCore)
implementation(libs.edc.dspApiConfiguration)
implementation(libs.edc.dspHttpSpi)
implementation(libs.edc.httpSpi)
implementation(libs.edc.managementApiConfiguration)
implementation(libs.edc.transformCore)


testAnnotationProcessor(libs.lombok)

testCompileOnly(libs.lombok)

testImplementation(project(":utils:test-connector-remote"))
testImplementation(project(":utils:test-utils"))

testImplementation(libs.edc.junit)
testImplementation(libs.edc.dataPlaneSelectorCore)
testImplementation(libs.edc.dspApiConfiguration)
testImplementation(libs.edc.dspHttpCore)
testImplementation(libs.edc.iamMock)
testImplementation(libs.edc.jsonLd)

testImplementation(libs.edc.http) {
exclude(group = "org.eclipse.jetty", module = "jetty-client")
exclude(group = "org.eclipse.jetty", module = "jetty-http")
exclude(group = "org.eclipse.jetty", module = "jetty-io")
exclude(group = "org.eclipse.jetty", module = "jetty-server")
exclude(group = "org.eclipse.jetty", module = "jetty-util")
exclude(group = "org.eclipse.jetty", module = "jetty-webapp")
}

// Updated jetty versions for e.g. CVE-2023-26048
testImplementation(libs.bundles.jetty.cve2023)

testImplementation(libs.assertj.core)
testImplementation(libs.junit.api)
testImplementation(libs.jsonAssert)
testImplementation(libs.mockito.core)
testImplementation(libs.mockito.inline)
testImplementation(libs.mockserver.netty)
testImplementation(libs.restAssured.restAssured)
testImplementation(libs.testcontainers.testcontainers)
testImplementation(libs.testcontainers.junitJupiter)
testImplementation(libs.testcontainers.postgresql)

testRuntimeOnly(libs.junit.engine)
}

tasks.getByName<Test>("test") {
useJUnitPlatform()
}

publishing {
publications {
create<MavenPublication>(project.name) {
from(components["java"])
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*/

package de.sovity.edc.extension.messenger;

import com.fasterxml.jackson.annotation.JsonIgnore;

/**
* The interface to implement when sending a message via the {@link SovityMessenger}.
* <br>
* The classes extending this interface must annotate the private fields to be sent with Jackson's
* {@link com.fasterxml.jackson.annotation.JsonProperty}.
* {@code public} fields are serialized automatically.
* <br>
* It is recommended to have a no-args constructor.
* <br>
* See <a href="https://www.baeldung.com/jackson-field-serializable-deserializable-or-not">this doc</a>
* for more detailed info about Jackson's serialization.
*/
public interface SovityMessage {
/**
* To avoid conflicts, it is recommended to use Java package-like naming convention.
*
* @return A unique string across all possible messages to identify the type of message.
*/
@JsonIgnore
String getType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*/

package de.sovity.edc.extension.messenger;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.sovity.edc.extension.messenger.impl.SovityMessageRequest;
import de.sovity.edc.extension.messenger.impl.SovityMessengerStatus;
import de.sovity.edc.utils.JsonUtils;
import jakarta.json.Json;
import lombok.RequiredArgsConstructor;
import lombok.val;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.response.StatusResult;
import org.jetbrains.annotations.NotNull;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
* The service to send {@link SovityMessage}s.
*/
@RequiredArgsConstructor
public class SovityMessenger {

private final RemoteMessageDispatcherRegistry registry;

private final ObjectMapper serializer;

/**
* Sends a message to the counterparty address and returns a future result.
*
* @param resultType The result's class.
* @param counterPartyAddress The base DSP URL where to send the message. e.g. https://server:port/api/dsp
* @param payload The message to send.
* @param <T> The outgoing message type.
* @param <R> The incoming message type.
* @return A future result.
* @throws SovityMessengerException If a problem related to the message processing happened.
*/
public <T extends SovityMessage, R extends SovityMessage> CompletableFuture<StatusResult<R>> send(
Class<R> resultType, String counterPartyAddress, T payload) {
try {
val message = buildMessage(counterPartyAddress, payload);
val future = registry.dispatch(SovityMessageRequest.class, message);
return future.thenApply(processResponse(resultType, payload));
} catch (URISyntaxException | MalformedURLException | JsonProcessingException e) {
throw new EdcException("Failed to build a custom sovity message", e);
}
}

static class Discarded implements SovityMessage {
@Override
public String getType() {
return "de.sovity.edc.extension.messenger.impl.SovityMessengerImpl.Discarded";
}
}

/**
* Fire-and-forget messaging where you don't care about the response.
*/
public <T extends SovityMessage> void send(String counterPartyAddress, T payload) {
send(Discarded.class, counterPartyAddress, payload);
}

@NotNull
private <R extends SovityMessage, T> Function<StatusResult<SovityMessageRequest>, StatusResult<R>> processResponse(
Class<R> resultType, T payload) {
return statusResult -> statusResult.map(content -> {
try {
val headerStr = content.header();
val header = JsonUtils.parseJsonObj(headerStr);
if (header.getString("status").equals(SovityMessengerStatus.OK.getCode())) {
val resultBody = content.body();
return serializer.readValue(resultBody, resultType);
} else if (header.getString("status").equals(SovityMessengerStatus.HANDLER_EXCEPTION.getCode())) {
throw new SovityMessengerException(
header.getString("message"),
header.getString(SovityMessengerStatus.HANDLER_EXCEPTION.getCode(), "No outgoing body."),
payload);
} else {
throw new SovityMessengerException(header.getString("message"));
}
} catch (JsonProcessingException e) {
throw new EdcException(e);
}
});
}

@NotNull
private <T extends SovityMessage> SovityMessageRequest buildMessage(String counterPartyAddress, T payload)
throws MalformedURLException, URISyntaxException, JsonProcessingException {
val url = new URI(counterPartyAddress).toURL();
val header1 = Json.createObjectBuilder()
.add("type", payload.getType())
.build();
val header = JsonUtils.toJson(header1);
val serialized = serializer.writeValueAsString(payload);
return new SovityMessageRequest(url, header, serialized);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*/

package de.sovity.edc.extension.messenger;

import lombok.Getter;

public class SovityMessengerException extends RuntimeException {

@Getter
private String body;
private Object payload;

public SovityMessengerException(String message) {
super(message);
}

public SovityMessengerException(String message, String body, Object payload) {
super(message);
this.body = body;
}
}
Loading

0 comments on commit f6c8a5a

Please sign in to comment.