Skip to content

Commit

Permalink
feat: dsp transfer process validation (eclipse-edc#3378)
Browse files Browse the repository at this point in the history
* extract component to handle dsp requests logic

* Implement validators

* update dependencies

* Avoid duplication
  • Loading branch information
ndr-brt authored and ndkrimbacher committed Oct 4, 2023
1 parent 361750b commit 45de516
Show file tree
Hide file tree
Showing 35 changed files with 1,638 additions and 1,752 deletions.
6 changes: 3 additions & 3 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,10 @@ maven/mavencentral/io.netty/netty-tcnative-boringssl-static/2.0.56.Final, Apache
maven/mavencentral/io.netty/netty-tcnative-classes/2.0.56.Final, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.netty/netty-transport-native-unix-common/4.1.86.Final, Apache-2.0 AND BSD-3-Clause AND MIT, approved, CQ20926
maven/mavencentral/io.netty/netty-transport/4.1.86.Final, Apache-2.0 AND BSD-3-Clause AND MIT, approved, CQ20926
maven/mavencentral/io.opentelemetry.instrumentation/opentelemetry-instrumentation-annotations/1.29.0, , restricted, clearlydefined
maven/mavencentral/io.opentelemetry.instrumentation/opentelemetry-instrumentation-annotations/1.29.0, Apache-2.0, approved, #10087
maven/mavencentral/io.opentelemetry.proto/opentelemetry-proto/1.0.0-alpha, Apache-2.0, approved, #10044
maven/mavencentral/io.opentelemetry/opentelemetry-api/1.29.0, , restricted, clearlydefined
maven/mavencentral/io.opentelemetry/opentelemetry-context/1.29.0, , restricted, clearlydefined
maven/mavencentral/io.opentelemetry/opentelemetry-api/1.29.0, Apache-2.0, approved, #10088
maven/mavencentral/io.opentelemetry/opentelemetry-context/1.29.0, Apache-2.0, approved, #10090
maven/mavencentral/io.prometheus/simpleclient/0.16.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.prometheus/simpleclient_common/0.16.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.prometheus/simpleclient_httpserver/0.16.0, Apache-2.0, approved, clearlydefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ public ServiceResult<ContractNegotiation> notifyTerminated(ContractNegotiationTe
@NotNull
public ServiceResult<ContractNegotiation> findById(String id, ClaimToken claimToken) {
return transactionContext.execute(() -> Optional.ofNullable(store.findById(id))
.map(negotiation -> validateGetRequest(claimToken, negotiation))
.map(ServiceResult::success)
.map(negotiation -> validateRequest(claimToken, negotiation))
.orElse(ServiceResult.notFound(format("No negotiation with id %s found", id))));
}

Expand Down Expand Up @@ -215,15 +214,6 @@ private ServiceResult<ContractNegotiation> validateRequest(ClaimToken claimToken
return ServiceResult.success(negotiation);
}
}

private ContractNegotiation validateGetRequest(ClaimToken claimToken, ContractNegotiation negotiation) {
var result = validationService.validateRequest(claimToken, negotiation);
if (result.failed()) {
return null;
} else {
return negotiation;
}
}

private ServiceResult<ContractNegotiation> getNegotiation(ContractRemoteMessage message) {
return store.findByCorrelationIdAndLease(message.getProcessId()).flatMap(ServiceResult::from);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ void findById_shouldReturnNotFound_whenNegotiationNotFound() {
}

@Test
void findById_shouldReturnNotFound_whenCounterPartyUnauthorized() {
void findById_shouldReturnBadRequest_whenCounterPartyUnauthorized() {
var id = "negotiationId";
var token = ClaimToken.Builder.newInstance().build();
var negotiation = contractNegotiationBuilder().id(id).type(PROVIDER).state(VERIFIED.code()).build();
Expand All @@ -423,7 +423,7 @@ void findById_shouldReturnNotFound_whenCounterPartyUnauthorized() {
assertThat(result)
.isFailed()
.extracting(ServiceFailure::getReason)
.isEqualTo(NOT_FOUND);
.isEqualTo(BAD_REQUEST);
}

@ParameterizedTest
Expand Down
1 change: 1 addition & 0 deletions data-protocols/dsp/dsp-api-configuration/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
api(project(":spi:common:catalog-spi"))
api(project(":spi:common:core-spi"))
api(project(":data-protocols:dsp:dsp-spi"))
api(project(":data-protocols:dsp:dsp-http-spi"))
implementation(project(":core:common:jersey-providers"))
implementation(project(":core:common:transform-core"))
implementation(project(":extensions:common:http"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.policy.model.AtomicConstraint;
import org.eclipse.edc.policy.model.LiteralExpression;
import org.eclipse.edc.protocol.dsp.api.configuration.message.DspRequestHandlerImpl;
import org.eclipse.edc.protocol.dsp.spi.message.DspRequestHandler;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.spi.iam.IdentityService;
import org.eclipse.edc.spi.protocol.ProtocolWebhook;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;
import org.eclipse.edc.web.jersey.jsonld.JerseyJsonLdInterceptor;
import org.eclipse.edc.web.jersey.jsonld.ObjectMapperProvider;
import org.eclipse.edc.web.spi.WebServer;
Expand All @@ -63,7 +67,7 @@
* parameters.
*/
@Extension(value = DspApiConfigurationExtension.NAME)
@Provides({ DspApiConfiguration.class, ProtocolWebhook.class })
@Provides({ DspApiConfiguration.class, ProtocolWebhook.class, DspRequestHandler.class })
public class DspApiConfigurationExtension implements ServiceExtension {

public static final String NAME = "Dataspace Protocol API Configuration Extension";
Expand Down Expand Up @@ -102,6 +106,12 @@ public class DspApiConfigurationExtension implements ServiceExtension {
@Inject
private TypeTransformerRegistry transformerRegistry;

@Inject
private IdentityService identityService;

@Inject
private JsonObjectValidatorRegistry validatorRegistry;

@Override
public String name() {
return NAME;
Expand All @@ -112,8 +122,8 @@ public void initialize(ServiceExtensionContext context) {
var config = configurator.configure(context, webServer, SETTINGS);
var dspWebhookAddress = context.getSetting(DSP_CALLBACK_ADDRESS, DEFAULT_DSP_CALLBACK_ADDRESS);
context.registerService(DspApiConfiguration.class, new DspApiConfiguration(config.getContextAlias(), dspWebhookAddress));

context.registerService(ProtocolWebhook.class, () -> dspWebhookAddress);
context.registerService(DspRequestHandler.class, new DspRequestHandlerImpl(context.getMonitor(), dspWebhookAddress, identityService, validatorRegistry, transformerRegistry));

var jsonLdMapper = typeManager.getMapper(JSON_LD);
webService.registerResource(config.getContextAlias(), new ObjectMapperProvider(jsonLdMapper));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.protocol.dsp.api.configuration.message;

import jakarta.json.JsonObject;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.protocol.dsp.spi.message.DspRequestHandler;
import org.eclipse.edc.protocol.dsp.spi.message.GetDspRequest;
import org.eclipse.edc.protocol.dsp.spi.message.PostDspRequest;
import org.eclipse.edc.spi.iam.IdentityService;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.message.ProcessRemoteMessage;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;

import java.util.Objects;
import java.util.UUID;

import static org.eclipse.edc.protocol.dsp.api.configuration.error.DspErrorResponse.type;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;

public class DspRequestHandlerImpl implements DspRequestHandler {

private final Monitor monitor;
private final String callbackAddress;
private final IdentityService identityService;
private final JsonObjectValidatorRegistry validatorRegistry;
private final TypeTransformerRegistry transformerRegistry;

public DspRequestHandlerImpl(Monitor monitor, String callbackAddress, IdentityService identityService,
JsonObjectValidatorRegistry validatorRegistry, TypeTransformerRegistry transformerRegistry) {
this.monitor = monitor;
this.callbackAddress = callbackAddress;
this.identityService = identityService;
this.validatorRegistry = validatorRegistry;
this.transformerRegistry = transformerRegistry;
}

@Override
public <R> Response getResource(GetDspRequest<R> request) {
monitor.debug(() -> "DSP: Incoming resource request for %s id %s".formatted(request.getResultClass(), request.getId()));

var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(request.getToken()).build();
var claimTokenResult = identityService.verifyJwtToken(tokenRepresentation, callbackAddress);

if (claimTokenResult.failed()) {
monitor.debug(() -> "DSP: Unauthorized: %s".formatted(claimTokenResult.getFailureDetail()));
return type(request.getErrorType()).unauthorized();
}

var serviceResult = request.getServiceCall().apply(request.getId(), claimTokenResult.getContent());
if (serviceResult.failed()) {
monitor.debug(() -> "DSP: Service call failed: %s".formatted(serviceResult.getFailureDetail()));
return type(request.getErrorType()).processId(request.getId()).from(serviceResult.getFailure());
}

var resource = serviceResult.getContent();

var transformation = transformerRegistry.transform(resource, JsonObject.class);
if (transformation.failed()) {
var errorCode = UUID.randomUUID();
monitor.warning("Error transforming %s, error id %s: %s".formatted(request.getResultClass().getSimpleName(), errorCode, transformation.getFailureDetail()));
return type(request.getErrorType()).processId(request.getId()).message(String.format("Error code %s", errorCode)).internalServerError();
}

return Response.ok().type(MediaType.APPLICATION_JSON).entity(transformation.getContent()).build();
}

@Override
public <I extends RemoteMessage, R> Response createResource(PostDspRequest<I, R> request) {
monitor.debug(() -> "DSP: Incoming %s for %s process%s".formatted(
request.getInputClass().getSimpleName(),
request.getResultClass(),
request.getProcessId() != null ? ": " + request.getProcessId() : ""));

var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(request.getToken()).build();
var claimTokenResult = identityService.verifyJwtToken(tokenRepresentation, callbackAddress);

if (claimTokenResult.failed()) {
monitor.debug(() -> "DSP: Unauthorized: %s".formatted(claimTokenResult.getFailureDetail()));
return type(request.getErrorType()).unauthorized();
}

var validation = validatorRegistry.validate(request.getExpectedMessageType(), request.getMessage());
if (validation.failed()) {
monitor.debug(() -> "DSP: Validation failed: %s".formatted(validation.getFailureMessages()));
return type(request.getErrorType()).badRequest();
}

var inputTransformation = transformerRegistry.transform(request.getMessage(), request.getInputClass())
.compose(message -> {
if (message instanceof ProcessRemoteMessage processRemoteMessage) {
processRemoteMessage.setProtocol(DATASPACE_PROTOCOL_HTTP);
}
return Result.success(message);
});

if (inputTransformation.failed()) {
monitor.debug(() -> "DSP: Transformation failed: %s".formatted(inputTransformation.getFailureMessages()));
return type(request.getErrorType()).badRequest();
}

var serviceResult = request.getServiceCall().apply(inputTransformation.getContent(), claimTokenResult.getContent());
if (serviceResult.failed()) {
monitor.debug(() -> "DSP: Service call failed: %s".formatted(serviceResult.getFailureDetail()));
return type(request.getErrorType()).from(serviceResult.getFailure());
}

var resource = serviceResult.getContent();

var outputTransformation = transformerRegistry.transform(resource, JsonObject.class);
if (outputTransformation.failed()) {
var errorCode = UUID.randomUUID();
monitor.warning("Error transforming %s, error id %s: %s".formatted(request.getResultClass().getSimpleName(), errorCode, outputTransformation.getFailureDetail()));
return type(request.getErrorType()).message("Error code %s".formatted(errorCode)).internalServerError();
}

return Response.ok().type(MediaType.APPLICATION_JSON).entity(outputTransformation.getContent()).build();
}

@Override
public <I extends RemoteMessage, R> Response updateResource(PostDspRequest<I, R> request) {
monitor.debug(() -> "DSP: Incoming %s for %s process%s".formatted(
request.getInputClass().getSimpleName(),
request.getResultClass(),
request.getProcessId() != null ? ": " + request.getProcessId() : ""));

var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(request.getToken()).build();
var claimTokenResult = identityService.verifyJwtToken(tokenRepresentation, callbackAddress);

if (claimTokenResult.failed()) {
monitor.debug(() -> "DSP: Unauthorized: %s".formatted(claimTokenResult.getFailureDetail()));
return type(request.getErrorType()).processId(request.getProcessId()).unauthorized();
}

var validation = validatorRegistry.validate(request.getExpectedMessageType(), request.getMessage());
if (validation.failed()) {
monitor.debug(() -> "DSP: Validation failed: %s".formatted(validation.getFailureMessages()));
return type(request.getErrorType()).processId(request.getProcessId()).badRequest();
}

var inputTransformation = transformerRegistry.transform(request.getMessage(), request.getInputClass())
.compose(message -> {
if (message instanceof ProcessRemoteMessage processRemoteMessage) {
processRemoteMessage.setProtocol(DATASPACE_PROTOCOL_HTTP);

return Objects.equals(request.getProcessId(), processRemoteMessage.getProcessId())
? Result.success(message)
: Result.failure("DSP: Invalid process ID. Expected: %s, actual: %s"
.formatted(request.getProcessId(), processRemoteMessage.getProcessId()));
} else {
return Result.success(message);
}
});

if (inputTransformation.failed()) {
monitor.debug(() -> "DSP: Transformation failed: %s".formatted(validation.getFailureMessages()));
return type(request.getErrorType()).processId(request.getProcessId()).badRequest();
}

return request.getServiceCall()
.apply(inputTransformation.getContent(), claimTokenResult.getContent())
.map(it -> Response.ok().type(MediaType.APPLICATION_JSON_TYPE).build())
.orElse(failure -> {
monitor.debug(() -> "DSP: Service call failed: %s".formatted(failure.getFailureDetail()));
return type(request.getErrorType()).processId(request.getProcessId()).from(failure);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package org.eclipse.edc.protocol.dsp.api.configuration;

import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.protocol.dsp.api.configuration.message.DspRequestHandlerImpl;
import org.eclipse.edc.protocol.dsp.spi.message.DspRequestHandler;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.edc.spi.types.TypeManager;
Expand Down Expand Up @@ -103,4 +105,11 @@ void initialize_shouldRegisterWebServiceProviders(DspApiConfigurationExtension e
verify(webService).registerResource(eq(CONTEXT_ALIAS), isA(ObjectMapperProvider.class));
verify(webService).registerResource(eq(CONTEXT_ALIAS), isA(JerseyJsonLdInterceptor.class));
}

@Test
void initialize_shouldProvideServices(DspApiConfigurationExtension extension, ServiceExtensionContext context) {
extension.initialize(context);

verify(context).registerService(eq(DspRequestHandler.class), isA(DspRequestHandlerImpl.class));
}
}
Loading

0 comments on commit 45de516

Please sign in to comment.