Skip to content

Commit

Permalink
add e2e tests, add generator service impl
Browse files Browse the repository at this point in the history
  • Loading branch information
paullatzelsperger committed Feb 27, 2024
1 parent 293845d commit b32fb30
Show file tree
Hide file tree
Showing 13 changed files with 160 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryAccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
Expand Down Expand Up @@ -63,4 +64,10 @@ public AccessTokenDataStore defaultAccessTokenDataStore() {
public PipelineService pipelineService(ServiceExtensionContext context) {
return new PipelineServiceImpl(context.getMonitor());
}

// todo: should this be a default service?
@Provider(isDefault = true)
public PublicEndpointGeneratorService publicEndpointGenerator() {
return new PublicEndpointGeneratorServiceImpl();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
import org.eclipse.edc.connector.dataplane.framework.manager.DataPlaneManagerImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceRegistryImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.spi.Endpoint;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessControlService;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessTokenService;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGenerator;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
Expand All @@ -34,13 +33,11 @@
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
import org.eclipse.edc.spi.system.ExecutorInstrumentation;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.eclipse.edc.spi.telemetry.Telemetry;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.statemachine.retry.EntityRetryProcessConfiguration;
import org.jetbrains.annotations.NotNull;

Expand Down Expand Up @@ -102,7 +99,7 @@ public class DataPlaneFrameworkExtension implements ServiceExtension {
@Inject
private DataPlaneAccessControlService accessControlService;
@Inject
private DataPlaneAuthorizationService dataPlaneAuthService;
private PublicEndpointGeneratorService endpointGenerator;

@Override
public String name() {
Expand Down Expand Up @@ -135,7 +132,6 @@ public void initialize(ServiceExtensionContext context) {
.transferServiceRegistry(transferServiceRegistry)
.store(store)
.transferProcessClient(transferProcessApiClient)
.authorizationService(dataPlaneAuthService)
.monitor(monitor)
.telemetry(telemetry)
.build();
Expand All @@ -157,12 +153,6 @@ public void shutdown() {

@Provider
public DataPlaneAuthorizationService authorizationService(ServiceExtensionContext context) {
var endpointGenerator = new PublicEndpointGenerator() {
@Override
public Result<Endpoint> generateFor(DataAddress sourceDataAddress) {
return Result.success(null);
}
};
return new DataPlaneAuthorizationServiceImpl(accessTokenService, endpointGenerator, accessControlService, context.getParticipantId(), clock);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2024 Metaform Systems, Inc.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Metaform Systems, Inc. - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.dataplane.spi.Endpoint;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class PublicEndpointGeneratorServiceImpl implements PublicEndpointGeneratorService {
private final Map<String, Function<DataAddress, Endpoint>> generatorFunctions = new ConcurrentHashMap<>();

@Override
public Result<Endpoint> generateFor(DataAddress sourceDataAddress) {
Objects.requireNonNull(sourceDataAddress);
Objects.requireNonNull(sourceDataAddress.getType());

return Optional.ofNullable(generatorFunctions.get(sourceDataAddress.getType()))
.map(function -> function.apply(sourceDataAddress))
.map(Result::success)
.orElseGet(() -> Result.failure("No Endpoint generator function registered for source data type '%s'".formatted(sourceDataAddress.getType())));
}

@Override
public void addGeneratorFunction(String destinationType, Function<DataAddress, Endpoint> generatorFunction) {
generatorFunctions.put(destinationType, generatorFunction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessControlService;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessTokenService;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGenerator;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.jwt.spi.JwtRegisteredClaimNames;
import org.eclipse.edc.spi.iam.TokenParameters;
import org.eclipse.edc.spi.iam.TokenRepresentation;
Expand All @@ -38,13 +38,13 @@ public class DataPlaneAuthorizationServiceImpl implements DataPlaneAuthorization
public static final String CLAIM_PROCESS_ID = "process_id";
public static final String CLAIM_TRANSFER_TYPE = "transfer_type";
private final DataPlaneAccessTokenService accessTokenService;
private final PublicEndpointGenerator endpointGenerator;
private final PublicEndpointGeneratorService endpointGenerator;
private final DataPlaneAccessControlService accessControlService;
private final String ownParticipantId;
private final Clock clock;

public DataPlaneAuthorizationServiceImpl(DataPlaneAccessTokenService accessTokenService,
PublicEndpointGenerator endpointGenerator,
PublicEndpointGeneratorService endpointGenerator,
DataPlaneAccessControlService accessControlService,
String ownParticipantId,
Clock clock) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public DefaultDataPlaneAccessTokenServiceImpl(TokenGenerationService tokenGenera
* Generates JWT tokens based on the {@link TokenParameters}. A representation of the claims and the data address is stored for subsequent use, using the token ID ("jti")
* as correlation id.
*
* @param parameters Headers and claims that are to be included in the token. If the claims do <em>not</em> contain a "jti" claim, one is generated randomly and inserted into the claims.
* @param backendDataAddress Information about the data resource for which the token is to be generated. May contain additional information about the token, such as an {@code authType}
* @param parameters Headers and claims that are to be included in the token. If the claims do <em>not</em> contain a "jti" claim, one is generated randomly and inserted into the claims.
* @param backendDataAddress Information about the data resource for which the token is to be generated. May contain additional information about the token, such as an {@code authType}
* @return A token representation in serialized JWT format (signed). The JWTs "kid" header contains the ID of the public key that can be used to verify the token.
*/
@Override
Expand Down Expand Up @@ -104,7 +104,9 @@ public Result<TokenRepresentation> obtainToken(TokenParameters parameters, DataA
var accessTokenData = new AccessTokenData(id, claimToken, backendDataAddress);

var storeResult = accessTokenDataStore.store(accessTokenData);
return storeResult.succeeded() ? Result.success(tokenResult.getContent()) : Result.failure(storeResult.getFailureMessages());
var content = tokenResult.getContent();
content.getAdditional().put("authType", "bearer");
return storeResult.succeeded() ? Result.success(content) : Result.failure(storeResult.getFailureMessages());
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import org.eclipse.edc.connector.core.entity.AbstractStateEntityManager;
import org.eclipse.edc.connector.dataplane.spi.DataFlow;
import org.eclipse.edc.connector.dataplane.spi.DataFlowStates;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
Expand Down Expand Up @@ -50,7 +49,6 @@ public class DataPlaneManagerImpl extends AbstractStateEntityManager<DataFlow, D

private TransferServiceRegistry transferServiceRegistry;
private TransferProcessApiClient transferProcessClient;
private DataPlaneAuthorizationService authorizationService;

private DataPlaneManagerImpl() {

Expand All @@ -67,7 +65,6 @@ public Result<Boolean> validate(DataFlowStartMessage dataRequest) {

@Override
public void initiate(DataFlowStartMessage dataRequest) {
var result = authorizationService.createEndpointDataReference(dataRequest);
var dataFlow = DataFlow.Builder.newInstance()
.id(dataRequest.getProcessId())
.source(dataRequest.getSourceDataAddress())
Expand Down Expand Up @@ -217,11 +214,6 @@ public Builder transferProcessClient(TransferProcessApiClient transferProcessCli
manager.transferProcessClient = transferProcessClient;
return this;
}

public Builder authorizationService(DataPlaneAuthorizationService service) {
manager.authorizationService = service;
return this;
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 Metaform Systems, Inc.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Metaform Systems, Inc. - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.dataplane.spi.Endpoint;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;


class PublicEndpointGeneratorServiceImplTest {

private final PublicEndpointGeneratorServiceImpl generatorService = new PublicEndpointGeneratorServiceImpl();

@Test
void generateFor() {
var endpoint = new Endpoint(Map.of("fizz", "buzz"), "bar-type");
generatorService.addGeneratorFunction("testtype", dataAddress -> endpoint);

assertThat(generatorService.generateFor(DataAddress.Builder.newInstance().type("testtype").build())).isSucceeded()
.isEqualTo(endpoint);
}

@Test
void generateFor_noFunction() {
assertThat(generatorService.generateFor(DataAddress.Builder.newInstance().type("testtype").build()))
.isFailed()
.detail()
.isEqualTo("No Endpoint generator function registered for source data type 'testtype'");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.eclipse.edc.connector.dataplane.spi.Endpoint;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessControlService;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessTokenService;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGenerator;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.spi.iam.ClaimToken;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.result.Result;
Expand Down Expand Up @@ -50,7 +50,7 @@ class DataPlaneAuthorizationServiceImplTest {

public static final String OWN_PARTICIPANT_ID = "test-ownParticipantId";
private final DataPlaneAccessTokenService accessTokenService = mock();
private final PublicEndpointGenerator endpointGenerator = mock();
private final PublicEndpointGeneratorService endpointGenerator = mock();
private final DataPlaneAccessControlService accessControlService = mock();
private final DataPlaneAuthorizationServiceImpl authorizationService = new DataPlaneAuthorizationServiceImpl(accessTokenService, endpointGenerator, accessControlService, OWN_PARTICIPANT_ID, Clock.systemUTC());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,25 @@

import java.util.Map;

/**
* Representation of a publicly accessible data egress point. This indicates to consumers the type of data (denoted by the {@code endpointType})
* and an object containing fields describing the endpoint.
* For HTTP this could be as simple as a {@code "url" -> "http://foo.bar"} entry, where for other protocols there could be
* formatted and structured data.
*
* @param endpoint An object describing the endpoint
* @param endpointType A string uniquely identifying the type of endpoint
*/
public record Endpoint(Map<String, Object> endpoint, String endpointType) {

/**
* Convenience factory method to create a HTTP endpoint.
*
* @param url A URL containing the HTTP endpoint
* @return the endpoint.
*/
public static Endpoint url(String url) {
return new Endpoint(Map.of("url", url), "https://w3id.org/idsa/v4.1/HTTP");
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,27 @@
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;

import java.util.function.Function;

/**
* Determines the public endpoint at which the data for a particular data transfer is exposed.
* For example, for HTTP transfers this would likely return the internet-facing HTTP URL of the data plane ("public url").
*/
public interface PublicEndpointGenerator {
public interface PublicEndpointGeneratorService {
/**
* Generates an endpoint for a particular resource (={@link DataAddress}).
*
* @param sourceDataAddress The (private) resource identified by an internal {@link DataAddress}.
* @return The public {@link Endpoint} where the data is made available, or a failure if the endpoint could not be generated.
*/
Result<Endpoint> generateFor(DataAddress sourceDataAddress);

/**
* Adds a function that can generate a {@link Endpoint} for particular source data address. Typically, the source data address
* is <strong>not</strong> directly exposed publicly.
*
* @param destinationType The type of the source {@link DataAddress}
* @param generatorFunction the generator function
*/
void addGeneratorFunction(String destinationType, Function<DataAddress, Endpoint> generatorFunction);
}
10 changes: 2 additions & 8 deletions system-tests/e2e-dataplane-tests/tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,21 @@ plugins {
}

dependencies {
testImplementation(project(":spi:control-plane:transfer-spi"))
testImplementation(project(":extensions:common:sql:sql-core"))

testImplementation(project(":spi:data-plane:data-plane-spi"))

testImplementation(project(":core:common:junit"))
testImplementation(testFixtures(project(":extensions:common:sql:sql-core")))
testImplementation(testFixtures(project(":extensions:control-plane:api:management-api:management-api-test-fixtures")))
testImplementation(project(":core:common:junit"))
testImplementation(project(":extensions:common:json-ld"))
testImplementation(libs.jakartaJson)

testImplementation(libs.postgres)
testImplementation(libs.restAssured)
testImplementation(libs.assertj)
testImplementation(libs.awaitility)
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.mockserver.netty)
testImplementation(libs.mockserver.client)
testImplementation(libs.kafkaClients)

testCompileOnly(project(":system-tests:e2e-transfer-test:backend-service"))
testCompileOnly(project(":system-tests:e2e-transfer-test:control-plane"))
testCompileOnly(project(":system-tests:e2e-transfer-test:data-plane"))
}

Expand Down
Loading

0 comments on commit b32fb30

Please sign in to comment.