Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dsp transfer process validation #3378

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions DEPENDENCIES
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
maven/mavencentral/com.apicatalog/carbon-did/0.0.2, Apache-2.0, approved, #9239

Check warning on line 1 in DEPENDENCIES

View workflow job for this annotation

GitHub Actions / check / Dash-Verify-Licenses

Restricted Dependencies found

Some dependencies are marked 'restricted' - please review them
maven/mavencentral/com.apicatalog/iron-verifiable-credentials/0.8.1, Apache-2.0, approved, #9234
maven/mavencentral/com.apicatalog/titanium-json-ld/1.0.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/com.apicatalog/titanium-json-ld/1.3.1, Apache-2.0, approved, #8912
Expand Down Expand Up @@ -124,10 +124,10 @@
maven/mavencentral/io.netty/netty-tcnative-classes/2.0.56.Final, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.netty/netty-transport-native-unix-common/4.1.86.Final, Apache-2.0 AND BSD-3-Clause AND MIT, approved, CQ20926
maven/mavencentral/io.netty/netty-transport/4.1.86.Final, Apache-2.0 AND BSD-3-Clause AND MIT, approved, CQ20926
maven/mavencentral/io.opentelemetry.instrumentation/opentelemetry-instrumentation-annotations/1.29.0, , restricted, clearlydefined
maven/mavencentral/io.opentelemetry.instrumentation/opentelemetry-instrumentation-annotations/1.29.0, Apache-2.0, approved, #10087
maven/mavencentral/io.opentelemetry.proto/opentelemetry-proto/1.0.0-alpha, Apache-2.0, approved, #10044
maven/mavencentral/io.opentelemetry/opentelemetry-api/1.29.0, , restricted, clearlydefined
maven/mavencentral/io.opentelemetry/opentelemetry-context/1.29.0, , restricted, clearlydefined
maven/mavencentral/io.opentelemetry/opentelemetry-api/1.29.0, Apache-2.0, approved, #10088
maven/mavencentral/io.opentelemetry/opentelemetry-context/1.29.0, Apache-2.0, approved, #10090
maven/mavencentral/io.prometheus/simpleclient/0.16.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.prometheus/simpleclient_common/0.16.0, Apache-2.0, approved, clearlydefined
maven/mavencentral/io.prometheus/simpleclient_httpserver/0.16.0, Apache-2.0, approved, clearlydefined
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,7 @@ public ServiceResult<ContractNegotiation> notifyTerminated(ContractNegotiationTe
@NotNull
public ServiceResult<ContractNegotiation> findById(String id, ClaimToken claimToken) {
return transactionContext.execute(() -> Optional.ofNullable(store.findById(id))
.map(negotiation -> validateGetRequest(claimToken, negotiation))
.map(ServiceResult::success)
.map(negotiation -> validateRequest(claimToken, negotiation))
.orElse(ServiceResult.notFound(format("No negotiation with id %s found", id))));
}

Expand Down Expand Up @@ -215,15 +214,6 @@ private ServiceResult<ContractNegotiation> validateRequest(ClaimToken claimToken
return ServiceResult.success(negotiation);
}
}

private ContractNegotiation validateGetRequest(ClaimToken claimToken, ContractNegotiation negotiation) {
var result = validationService.validateRequest(claimToken, negotiation);
if (result.failed()) {
return null;
} else {
return negotiation;
}
}

private ServiceResult<ContractNegotiation> getNegotiation(ContractRemoteMessage message) {
return store.findByCorrelationIdAndLease(message.getProcessId()).flatMap(ServiceResult::from);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ void findById_shouldReturnNotFound_whenNegotiationNotFound() {
}

@Test
void findById_shouldReturnNotFound_whenCounterPartyUnauthorized() {
void findById_shouldReturnBadRequest_whenCounterPartyUnauthorized() {
var id = "negotiationId";
var token = ClaimToken.Builder.newInstance().build();
var negotiation = contractNegotiationBuilder().id(id).type(PROVIDER).state(VERIFIED.code()).build();
Expand All @@ -423,7 +423,7 @@ void findById_shouldReturnNotFound_whenCounterPartyUnauthorized() {
assertThat(result)
.isFailed()
.extracting(ServiceFailure::getReason)
.isEqualTo(NOT_FOUND);
.isEqualTo(BAD_REQUEST);
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
api(project(":spi:common:catalog-spi"))
api(project(":spi:common:core-spi"))
api(project(":data-protocols:dsp:dsp-spi"))
api(project(":data-protocols:dsp:dsp-http-spi"))
implementation(project(":core:common:jersey-providers"))
implementation(project(":core:common:transform-core"))
implementation(project(":extensions:common:http"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,18 @@
import org.eclipse.edc.jsonld.spi.JsonLd;
import org.eclipse.edc.policy.model.AtomicConstraint;
import org.eclipse.edc.policy.model.LiteralExpression;
import org.eclipse.edc.protocol.dsp.api.configuration.message.DspRequestHandlerImpl;
import org.eclipse.edc.protocol.dsp.spi.message.DspRequestHandler;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.spi.iam.IdentityService;
import org.eclipse.edc.spi.protocol.ProtocolWebhook;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;
import org.eclipse.edc.web.jersey.jsonld.JerseyJsonLdInterceptor;
import org.eclipse.edc.web.jersey.jsonld.ObjectMapperProvider;
import org.eclipse.edc.web.spi.WebServer;
Expand All @@ -63,7 +67,7 @@
* parameters.
*/
@Extension(value = DspApiConfigurationExtension.NAME)
@Provides({ DspApiConfiguration.class, ProtocolWebhook.class })
@Provides({ DspApiConfiguration.class, ProtocolWebhook.class, DspRequestHandler.class })
public class DspApiConfigurationExtension implements ServiceExtension {

public static final String NAME = "Dataspace Protocol API Configuration Extension";
Expand Down Expand Up @@ -102,6 +106,12 @@ public class DspApiConfigurationExtension implements ServiceExtension {
@Inject
private TypeTransformerRegistry transformerRegistry;

@Inject
private IdentityService identityService;

@Inject
private JsonObjectValidatorRegistry validatorRegistry;

@Override
public String name() {
return NAME;
Expand All @@ -112,8 +122,8 @@ public void initialize(ServiceExtensionContext context) {
var config = configurator.configure(context, webServer, SETTINGS);
var dspWebhookAddress = context.getSetting(DSP_CALLBACK_ADDRESS, DEFAULT_DSP_CALLBACK_ADDRESS);
context.registerService(DspApiConfiguration.class, new DspApiConfiguration(config.getContextAlias(), dspWebhookAddress));

context.registerService(ProtocolWebhook.class, () -> dspWebhookAddress);
context.registerService(DspRequestHandler.class, new DspRequestHandlerImpl(context.getMonitor(), dspWebhookAddress, identityService, validatorRegistry, transformerRegistry));

var jsonLdMapper = typeManager.getMapper(JSON_LD);
webService.registerResource(config.getContextAlias(), new ObjectMapperProvider(jsonLdMapper));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.protocol.dsp.api.configuration.message;

import jakarta.json.JsonObject;
import jakarta.ws.rs.core.MediaType;
import jakarta.ws.rs.core.Response;
import org.eclipse.edc.protocol.dsp.spi.message.DspRequestHandler;
import org.eclipse.edc.protocol.dsp.spi.message.GetDspRequest;
import org.eclipse.edc.protocol.dsp.spi.message.PostDspRequest;
import org.eclipse.edc.spi.iam.IdentityService;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.message.ProcessRemoteMessage;
import org.eclipse.edc.spi.types.domain.message.RemoteMessage;
import org.eclipse.edc.transform.spi.TypeTransformerRegistry;
import org.eclipse.edc.validator.spi.JsonObjectValidatorRegistry;

import java.util.Objects;
import java.util.UUID;

import static org.eclipse.edc.protocol.dsp.api.configuration.error.DspErrorResponse.type;
import static org.eclipse.edc.protocol.dsp.spi.types.HttpMessageProtocol.DATASPACE_PROTOCOL_HTTP;

public class DspRequestHandlerImpl implements DspRequestHandler {

private final Monitor monitor;
private final String callbackAddress;
private final IdentityService identityService;
private final JsonObjectValidatorRegistry validatorRegistry;
private final TypeTransformerRegistry transformerRegistry;

public DspRequestHandlerImpl(Monitor monitor, String callbackAddress, IdentityService identityService,
JsonObjectValidatorRegistry validatorRegistry, TypeTransformerRegistry transformerRegistry) {
this.monitor = monitor;
this.callbackAddress = callbackAddress;
this.identityService = identityService;
this.validatorRegistry = validatorRegistry;
this.transformerRegistry = transformerRegistry;
}

@Override
public <R> Response getResource(GetDspRequest<R> request) {
monitor.debug(() -> "DSP: Incoming resource request for %s id %s".formatted(request.getResultClass(), request.getId()));

var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(request.getToken()).build();
var claimTokenResult = identityService.verifyJwtToken(tokenRepresentation, callbackAddress);

if (claimTokenResult.failed()) {
monitor.debug(() -> "DSP: Unauthorized: %s".formatted(claimTokenResult.getFailureDetail()));
return type(request.getErrorType()).unauthorized();
}

var serviceResult = request.getServiceCall().apply(request.getId(), claimTokenResult.getContent());
if (serviceResult.failed()) {
monitor.debug(() -> "DSP: Service call failed: %s".formatted(serviceResult.getFailureDetail()));
return type(request.getErrorType()).processId(request.getId()).from(serviceResult.getFailure());
}

var resource = serviceResult.getContent();

var transformation = transformerRegistry.transform(resource, JsonObject.class);
if (transformation.failed()) {
var errorCode = UUID.randomUUID();
monitor.warning("Error transforming %s, error id %s: %s".formatted(request.getResultClass().getSimpleName(), errorCode, transformation.getFailureDetail()));
return type(request.getErrorType()).processId(request.getId()).message(String.format("Error code %s", errorCode)).internalServerError();
}

return Response.ok().type(MediaType.APPLICATION_JSON).entity(transformation.getContent()).build();
}

@Override
public <I extends RemoteMessage, R> Response createResource(PostDspRequest<I, R> request) {
monitor.debug(() -> "DSP: Incoming %s for %s process%s".formatted(
request.getInputClass().getSimpleName(),
request.getResultClass(),
request.getProcessId() != null ? ": " + request.getProcessId() : ""));

var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(request.getToken()).build();
var claimTokenResult = identityService.verifyJwtToken(tokenRepresentation, callbackAddress);

if (claimTokenResult.failed()) {
monitor.debug(() -> "DSP: Unauthorized: %s".formatted(claimTokenResult.getFailureDetail()));
return type(request.getErrorType()).unauthorized();
}

var validation = validatorRegistry.validate(request.getExpectedMessageType(), request.getMessage());
if (validation.failed()) {
monitor.debug(() -> "DSP: Validation failed: %s".formatted(validation.getFailureMessages()));
return type(request.getErrorType()).badRequest();
}

var inputTransformation = transformerRegistry.transform(request.getMessage(), request.getInputClass())
.compose(message -> {
if (message instanceof ProcessRemoteMessage processRemoteMessage) {
processRemoteMessage.setProtocol(DATASPACE_PROTOCOL_HTTP);
}
return Result.success(message);
});

if (inputTransformation.failed()) {
monitor.debug(() -> "DSP: Transformation failed: %s".formatted(inputTransformation.getFailureMessages()));
return type(request.getErrorType()).badRequest();
}

var serviceResult = request.getServiceCall().apply(inputTransformation.getContent(), claimTokenResult.getContent());
if (serviceResult.failed()) {
monitor.debug(() -> "DSP: Service call failed: %s".formatted(serviceResult.getFailureDetail()));
return type(request.getErrorType()).from(serviceResult.getFailure());
}

var resource = serviceResult.getContent();

var outputTransformation = transformerRegistry.transform(resource, JsonObject.class);
if (outputTransformation.failed()) {
var errorCode = UUID.randomUUID();
monitor.warning("Error transforming %s, error id %s: %s".formatted(request.getResultClass().getSimpleName(), errorCode, outputTransformation.getFailureDetail()));
return type(request.getErrorType()).message("Error code %s".formatted(errorCode)).internalServerError();
}

return Response.ok().type(MediaType.APPLICATION_JSON).entity(outputTransformation.getContent()).build();
}

@Override
public <I extends RemoteMessage, R> Response updateResource(PostDspRequest<I, R> request) {
monitor.debug(() -> "DSP: Incoming %s for %s process%s".formatted(
request.getInputClass().getSimpleName(),
request.getResultClass(),
request.getProcessId() != null ? ": " + request.getProcessId() : ""));

var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(request.getToken()).build();
var claimTokenResult = identityService.verifyJwtToken(tokenRepresentation, callbackAddress);

if (claimTokenResult.failed()) {
monitor.debug(() -> "DSP: Unauthorized: %s".formatted(claimTokenResult.getFailureDetail()));
return type(request.getErrorType()).processId(request.getProcessId()).unauthorized();
}

var validation = validatorRegistry.validate(request.getExpectedMessageType(), request.getMessage());
if (validation.failed()) {
monitor.debug(() -> "DSP: Validation failed: %s".formatted(validation.getFailureMessages()));
return type(request.getErrorType()).processId(request.getProcessId()).badRequest();
}

var inputTransformation = transformerRegistry.transform(request.getMessage(), request.getInputClass())
.compose(message -> {
if (message instanceof ProcessRemoteMessage processRemoteMessage) {
processRemoteMessage.setProtocol(DATASPACE_PROTOCOL_HTTP);

return Objects.equals(request.getProcessId(), processRemoteMessage.getProcessId())
? Result.success(message)
: Result.failure("DSP: Invalid process ID. Expected: %s, actual: %s"
.formatted(request.getProcessId(), processRemoteMessage.getProcessId()));
} else {
return Result.success(message);
}
});

if (inputTransformation.failed()) {
monitor.debug(() -> "DSP: Transformation failed: %s".formatted(validation.getFailureMessages()));
return type(request.getErrorType()).processId(request.getProcessId()).badRequest();
}

return request.getServiceCall()
.apply(inputTransformation.getContent(), claimTokenResult.getContent())
.map(it -> Response.ok().type(MediaType.APPLICATION_JSON_TYPE).build())
.orElse(failure -> {
monitor.debug(() -> "DSP: Service call failed: %s".formatted(failure.getFailureDetail()));
return type(request.getErrorType()).processId(request.getProcessId()).from(failure);
});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package org.eclipse.edc.protocol.dsp.api.configuration;

import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.protocol.dsp.api.configuration.message.DspRequestHandlerImpl;
import org.eclipse.edc.protocol.dsp.spi.message.DspRequestHandler;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.system.configuration.ConfigFactory;
import org.eclipse.edc.spi.types.TypeManager;
Expand Down Expand Up @@ -103,4 +105,11 @@ void initialize_shouldRegisterWebServiceProviders(DspApiConfigurationExtension e
verify(webService).registerResource(eq(CONTEXT_ALIAS), isA(ObjectMapperProvider.class));
verify(webService).registerResource(eq(CONTEXT_ALIAS), isA(JerseyJsonLdInterceptor.class));
}

@Test
void initialize_shouldProvideServices(DspApiConfigurationExtension extension, ServiceExtensionContext context) {
extension.initialize(context);

verify(context).registerService(eq(DspRequestHandler.class), isA(DspRequestHandlerImpl.class));
}
}
Loading
Loading