Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add C2C custom messages #970

Merged
merged 42 commits into from
Jun 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
9ed0d1b
WiP: echo service and foundation for sending messages
ununhexium Jun 11, 2024
5c6a0b9
WiP kinda works
ununhexium Jun 11, 2024
b88d17c
WiP adapt tests
ununhexium Jun 12, 2024
1f23808
WiP: generic custom messgae: cancelled
ununhexium Jun 12, 2024
810d6be
WiP: roundtrip
ununhexium Jun 13, 2024
c827160
Waypoint: Generic echo service
ununhexium Jun 13, 2024
9f94781
Add test for the handlers
ununhexium Jun 13, 2024
23a2db1
Working e2e samples
ununhexium Jun 13, 2024
4ef66b2
Add demo
ununhexium Jun 13, 2024
ada26d5
Refactor demo folder
ununhexium Jun 14, 2024
ec8f734
Remove lombok usage for Kotlin demo
ununhexium Jun 14, 2024
776cd2c
rm obsolete TODO
ununhexium Jun 14, 2024
397365b
rm obsolete TODO
ununhexium Jun 14, 2024
587f010
Ensure message is authorized
ununhexium Jun 14, 2024
e315337
Rename SovityMessageRequest
ununhexium Jun 14, 2024
908a6db
Refactoring: avoid redundancy in message response
ununhexium Jun 14, 2024
e3fb2b8
Refactoring: use real message type directly
ununhexium Jun 14, 2024
8a60741
Report sovity handler misconfiguration as exception
ununhexium Jun 14, 2024
ecfbf73
Throw exception when a handler fault happened
ununhexium Jun 14, 2024
9dc6a62
cleanup
ununhexium Jun 14, 2024
1ac080b
cleanup
ununhexium Jun 14, 2024
364ee63
move package
ununhexium Jun 14, 2024
468b88b
Add doc and demo
ununhexium Jun 14, 2024
5502f5f
Checkstyle
ununhexium Jun 14, 2024
0e8a938
Remove useless code
ununhexium Jun 14, 2024
7ba4195
More doc and cleanup
ununhexium Jun 14, 2024
c2d3abb
Checkstyle
ununhexium Jun 14, 2024
e594632
Increase timeout for GH tests
ununhexium Jun 14, 2024
459e9fb
Headers
ununhexium Jun 14, 2024
73f1455
Code review
ununhexium Jun 17, 2024
d1b034d
cleanup and checkstyle
ununhexium Jun 17, 2024
eb931d5
Fix dead links
ununhexium Jun 17, 2024
3c0b8a9
Fix links
ununhexium Jun 17, 2024
3493fcc
Core review, Add exception handling and feedback
ununhexium Jun 19, 2024
0efc027
Fix space issue
ununhexium Jun 19, 2024
5bcd9ce
Format for checkstyle
ununhexium Jun 19, 2024
a112ec6
Merge branch 'main' into 1108-custom-c2c-messages
ununhexium Jun 19, 2024
327caee
Cleanup
ununhexium Jun 20, 2024
d6e1678
Disable test demo
ununhexium Jun 20, 2024
3984155
Add payload to sovity messenger exception
ununhexium Jun 20, 2024
830bdc7
Remove Kotlin plugin from previous attempt
ununhexium Jun 25, 2024
c5ec552
Merge branch 'main' into 1108-custom-c2c-messages
richardtreier Jun 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true
indent_size = 4
ij_continuation_indent_size = 4

[*.ts]
quote_type = single
Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ please see [changelog_updates.md](docs/dev/changelog_updates.md).

#### Minor Changes

- Add the SovityMessenger extension

#### Patch Changes

- Unified database migration histories
Expand Down
2 changes: 1 addition & 1 deletion docs/dev/checkstyle/checkstyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@
<property name="braceAdjustment" value="4"/>
<property name="caseIndent" value="4"/>
<property name="throwsIndent" value="4"/>
<property name="lineWrappingIndentation" value="8"/>
<property name="lineWrappingIndentation" value="4"/>
<property name="arrayInitIndent" value="4"/>
</module>
<module name="AbbreviationAsWordInName">
Expand Down
72 changes: 72 additions & 0 deletions extensions/sovity-messenger/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<!-- PROJECT LOGO -->
<br />
<div align="center">
<a href="https://github.com/sovity/edc-extensions">
<img src="https://raw.githubusercontent.com/sovity/edc-ui/main/src/assets/images/sovity_logo.svg" alt="Logo" width="300">
</a>

<h3 align="center">EDC-Connector Extension:<br />Sovity Messenger</h3>

<p align="center">
<a href="https://github.com/sovity/edc-extensions/issues/new?template=bug_report.md">Report Bug</a>
·
<a href="https://github.com/sovity/edc-extensions/issues/new?template=feature_request.md">Request Feature</a>
</p>
</div>


## About this Extension

To provide a simpler way to exchange messages between EDCs while re-using the Dataspace's Connector-to-Connector authentication mechanisms, we created our own extension with a much simpler API surface omitting JSON-LD.

## Why does this extension exist?

Adding custom DSP messages to a vanilla EDC is verbose and requires the handling of JSON-LD and implementing your own Transformers. Since we do not care about JSON-LD we wanted a simpler API surface.

## Architecture

The sovity Messenger is implemented on top of the DSP messaging protocol and re-uses its exchange and authentication.

It is abstracted from the internals of the DSP protocol such that changing the underlying implementation remains an option.


```mermaid
---
title: Registering a handler
---
sequenceDiagram
Caller ->> SovityMessengerRegistry: register(inputClass, intputType, handler)
```

```mermaid
---
title: Sending a message
---
sequenceDiagram
Caller ->>+SovityMessenger: send(resultClass, counterPartyAddress, payload)
SovityMessenger -->> RemoteMessageDispatcherRegistry: dispatch(genericMessage)
SovityMessenger -->> -Caller: Future<resultClass>
RemoteMessageDispatcherRegistry ->> +CustomMessageReceiverController: <<sending via DSP>>
CustomMessageReceiverController ->> CustomMessageReceiverController: processMessage(handler, payload)
CustomMessageReceiverController -->> -RemoteMessageDispatcherRegistry: <<result via DSP>>
RemoteMessageDispatcherRegistry -->> SovityMessenger: <<result via DSP>>
SovityMessenger ->> +Caller: Future<resultClass>
Caller ->> -Caller: future.get()
```

## Demo
ununhexium marked this conversation as resolved.
Show resolved Hide resolved

You can find a demo in the [demo](src/test/java/de/sovity/edc/extension/messenger/demo).

The 2 key entry points are:

- Register your message receiving by talking to the SovityMessageRegistry as demonstrated [here](src/test/java/de/sovity/edc/extension/messenger/demo/SovityMessengerDemo.java).
- Send messages by calling the SovityMessenger as shown [here](src/test/java/de/sovity/edc/extension/messenger/demo/SovityMessengerDemoTest.java).

## License

Apache License 2.0 - see [LICENSE](../../LICENSE)

## Contact

sovity GmbH - [email protected]
71 changes: 71 additions & 0 deletions extensions/sovity-messenger/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
plugins {
`java-library`
`maven-publish`
}

dependencies {
annotationProcessor(libs.lombok)

compileOnly(libs.lombok)

implementation(project(":utils:json-and-jsonld-utils"))

implementation(libs.edc.controlPlaneCore)
implementation(libs.edc.dspApiConfiguration)
implementation(libs.edc.dspHttpSpi)
implementation(libs.edc.httpSpi)
implementation(libs.edc.managementApiConfiguration)
implementation(libs.edc.transformCore)


testAnnotationProcessor(libs.lombok)

testCompileOnly(libs.lombok)

testImplementation(project(":utils:test-connector-remote"))
testImplementation(project(":utils:test-utils"))

testImplementation(libs.edc.junit)
testImplementation(libs.edc.dataPlaneSelectorCore)
testImplementation(libs.edc.dspApiConfiguration)
testImplementation(libs.edc.dspHttpCore)
testImplementation(libs.edc.iamMock)
testImplementation(libs.edc.jsonLd)

testImplementation(libs.edc.http) {
exclude(group = "org.eclipse.jetty", module = "jetty-client")
exclude(group = "org.eclipse.jetty", module = "jetty-http")
exclude(group = "org.eclipse.jetty", module = "jetty-io")
exclude(group = "org.eclipse.jetty", module = "jetty-server")
exclude(group = "org.eclipse.jetty", module = "jetty-util")
exclude(group = "org.eclipse.jetty", module = "jetty-webapp")
}

// Updated jetty versions for e.g. CVE-2023-26048
testImplementation(libs.bundles.jetty.cve2023)

testImplementation(libs.assertj.core)
testImplementation(libs.junit.api)
testImplementation(libs.jsonAssert)
testImplementation(libs.mockito.core)
testImplementation(libs.mockito.inline)
testImplementation(libs.mockserver.netty)
testImplementation(libs.restAssured.restAssured)
testImplementation(libs.testcontainers.testcontainers)
testImplementation(libs.testcontainers.junitJupiter)
testImplementation(libs.testcontainers.postgresql)

testRuntimeOnly(libs.junit.engine)
}

tasks.getByName<Test>("test") {
useJUnitPlatform()
}

publishing {
publications {
create<MavenPublication>(project.name) {
from(components["java"])
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*/

package de.sovity.edc.extension.messenger;

import com.fasterxml.jackson.annotation.JsonIgnore;

/**
* The interface to implement when sending a message via the {@link SovityMessenger}.
* <br>
* The classes extending this interface must annotate the private fields to be sent with Jackson's
* {@link com.fasterxml.jackson.annotation.JsonProperty}.
* {@code public} fields are serialized automatically.
* <br>
* It is recommended to have a no-args constructor.
* <br>
* See <a href="https://www.baeldung.com/jackson-field-serializable-deserializable-or-not">this doc</a>
* for more detailed info about Jackson's serialization.
*/
public interface SovityMessage {
/**
* To avoid conflicts, it is recommended to use Java package-like naming convention.
*
* @return A unique string across all possible messages to identify the type of message.
*/
@JsonIgnore
String getType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*/

package de.sovity.edc.extension.messenger;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.sovity.edc.extension.messenger.impl.SovityMessageRequest;
import de.sovity.edc.extension.messenger.impl.SovityMessengerStatus;
import de.sovity.edc.utils.JsonUtils;
import jakarta.json.Json;
import lombok.RequiredArgsConstructor;
import lombok.val;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.response.StatusResult;
import org.jetbrains.annotations.NotNull;

import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/**
* The service to send {@link SovityMessage}s.
*/
@RequiredArgsConstructor
public class SovityMessenger {

private final RemoteMessageDispatcherRegistry registry;

private final ObjectMapper serializer;

/**
* Sends a message to the counterparty address and returns a future result.
*
* @param resultType The result's class.
* @param counterPartyAddress The base DSP URL where to send the message. e.g. https://server:port/api/dsp
* @param payload The message to send.
* @param <T> The outgoing message type.
* @param <R> The incoming message type.
* @return A future result.
* @throws SovityMessengerException If a problem related to the message processing happened.
*/
public <T extends SovityMessage, R extends SovityMessage> CompletableFuture<StatusResult<R>> send(
Class<R> resultType, String counterPartyAddress, T payload) {
try {
val message = buildMessage(counterPartyAddress, payload);
val future = registry.dispatch(SovityMessageRequest.class, message);
return future.thenApply(processResponse(resultType, payload));
} catch (URISyntaxException | MalformedURLException | JsonProcessingException e) {
throw new EdcException("Failed to build a custom sovity message", e);
}
}

static class Discarded implements SovityMessage {
@Override
public String getType() {
return "de.sovity.edc.extension.messenger.impl.SovityMessengerImpl.Discarded";
}
}

/**
* Fire-and-forget messaging where you don't care about the response.
*/
public <T extends SovityMessage> void send(String counterPartyAddress, T payload) {
send(Discarded.class, counterPartyAddress, payload);
}

@NotNull
private <R extends SovityMessage, T> Function<StatusResult<SovityMessageRequest>, StatusResult<R>> processResponse(
Class<R> resultType, T payload) {
return statusResult -> statusResult.map(content -> {
try {
val headerStr = content.header();
val header = JsonUtils.parseJsonObj(headerStr);
if (header.getString("status").equals(SovityMessengerStatus.OK.getCode())) {
val resultBody = content.body();
return serializer.readValue(resultBody, resultType);
} else if (header.getString("status").equals(SovityMessengerStatus.HANDLER_EXCEPTION.getCode())) {
throw new SovityMessengerException(
header.getString("message"),
header.getString(SovityMessengerStatus.HANDLER_EXCEPTION.getCode(), "No outgoing body."),
payload);
} else {
throw new SovityMessengerException(header.getString("message"));
}
} catch (JsonProcessingException e) {
throw new EdcException(e);
}
});
}

@NotNull
private <T extends SovityMessage> SovityMessageRequest buildMessage(String counterPartyAddress, T payload)
throws MalformedURLException, URISyntaxException, JsonProcessingException {
val url = new URI(counterPartyAddress).toURL();
val header1 = Json.createObjectBuilder()
.add("type", payload.getType())
.build();
val header = JsonUtils.toJson(header1);
val serialized = serializer.writeValueAsString(payload);
return new SovityMessageRequest(url, header, serialized);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2024 sovity GmbH
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* sovity GmbH - initial API and implementation
*/

package de.sovity.edc.extension.messenger;

import lombok.Getter;

public class SovityMessengerException extends RuntimeException {

@Getter
private String body;
private Object payload;

public SovityMessengerException(String message) {
super(message);
}

public SovityMessengerException(String message, String body, Object payload) {
super(message);
this.body = body;
}
}
Loading
Loading