Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement DataPlaneAuthorizationService #3918

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryAccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.framework.store.InMemoryDataPlaneStore;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.store.AccessTokenDataStore;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
Expand Down Expand Up @@ -64,4 +65,9 @@ public PipelineService pipelineService(ServiceExtensionContext context) {
return new PipelineServiceImpl(context.getMonitor());
}

// todo: should this be a default service?
@Provider(isDefault = true)
public PublicEndpointGeneratorService publicEndpointGenerator() {
return new PublicEndpointGeneratorServiceImpl();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,22 @@
package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.api.client.spi.transferprocess.TransferProcessApiClient;
import org.eclipse.edc.connector.dataplane.framework.iam.DataPlaneAuthorizationServiceImpl;
import org.eclipse.edc.connector.dataplane.framework.manager.DataPlaneManagerImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceRegistryImpl;
import org.eclipse.edc.connector.dataplane.framework.registry.TransferServiceSelectionStrategy;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessControlService;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessTokenService;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.connector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer;
import org.eclipse.edc.connector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.edc.connector.dataplane.spi.registry.TransferServiceRegistry;
import org.eclipse.edc.connector.dataplane.spi.store.DataPlaneStore;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.retry.ExponentialWaitStrategy;
Expand Down Expand Up @@ -88,6 +94,12 @@ public class DataPlaneFrameworkExtension implements ServiceExtension {

@Inject
private PipelineService pipelineService;
@Inject
private DataPlaneAccessTokenService accessTokenService;
@Inject
private DataPlaneAccessControlService accessControlService;
@Inject
private PublicEndpointGeneratorService endpointGenerator;

@Override
public String name() {
Expand Down Expand Up @@ -139,6 +151,11 @@ public void shutdown() {
}
}

@Provider
public DataPlaneAuthorizationService authorizationService(ServiceExtensionContext context) {
return new DataPlaneAuthorizationServiceImpl(accessTokenService, endpointGenerator, accessControlService, context.getParticipantId(), clock);
}

@NotNull
private EntityRetryProcessConfiguration getEntityRetryProcessConfiguration(ServiceExtensionContext context) {
var retryLimit = context.getSetting(DATAPLANE_SEND_RETRY_LIMIT, DEFAULT_SEND_RETRY_LIMIT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright (c) 2024 Metaform Systems, Inc.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Metaform Systems, Inc. - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.dataplane.spi.Endpoint;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;

import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class PublicEndpointGeneratorServiceImpl implements PublicEndpointGeneratorService {
private final Map<String, Function<DataAddress, Endpoint>> generatorFunctions = new ConcurrentHashMap<>();

@Override
public Result<Endpoint> generateFor(DataAddress sourceDataAddress) {
Objects.requireNonNull(sourceDataAddress);
Objects.requireNonNull(sourceDataAddress.getType());

return Optional.ofNullable(generatorFunctions.get(sourceDataAddress.getType()))
.map(function -> function.apply(sourceDataAddress))
.map(Result::success)
.orElseGet(() -> Result.failure("No Endpoint generator function registered for source data type '%s'".formatted(sourceDataAddress.getType())));
}

@Override
public void addGeneratorFunction(String destinationType, Function<DataAddress, Endpoint> generatorFunction) {
generatorFunctions.put(destinationType, generatorFunction);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.framework.iam;

import org.eclipse.edc.connector.dataplane.spi.Endpoint;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessControlService;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAccessTokenService;
import org.eclipse.edc.connector.dataplane.spi.iam.DataPlaneAuthorizationService;
import org.eclipse.edc.connector.dataplane.spi.iam.PublicEndpointGeneratorService;
import org.eclipse.edc.jwt.spi.JwtRegisteredClaimNames;
import org.eclipse.edc.spi.iam.TokenParameters;
import org.eclipse.edc.spi.iam.TokenRepresentation;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;

import java.time.Clock;
import java.util.Map;
import java.util.UUID;

import static org.eclipse.edc.spi.result.Result.success;

public class DataPlaneAuthorizationServiceImpl implements DataPlaneAuthorizationService {
public static final String CLAIM_AGREEMENT_ID = "agreement_id";
public static final String CLAIM_ASSET_ID = "asset_id";
public static final String CLAIM_PROCESS_ID = "process_id";
public static final String CLAIM_TRANSFER_TYPE = "transfer_type";
private final DataPlaneAccessTokenService accessTokenService;
private final PublicEndpointGeneratorService endpointGenerator;
private final DataPlaneAccessControlService accessControlService;
private final String ownParticipantId;
private final Clock clock;

public DataPlaneAuthorizationServiceImpl(DataPlaneAccessTokenService accessTokenService,
PublicEndpointGeneratorService endpointGenerator,
DataPlaneAccessControlService accessControlService,
String ownParticipantId,
Clock clock) {
this.accessTokenService = accessTokenService;
this.endpointGenerator = endpointGenerator;
this.accessControlService = accessControlService;
this.ownParticipantId = ownParticipantId;
this.clock = clock;
}

@Override
public Result<DataAddress> createEndpointDataReference(DataFlowStartMessage message) {
var endpoint = endpointGenerator.generateFor(message.getSourceDataAddress());

return endpoint.compose(e -> accessTokenService.obtainToken(createTokenParams(message), message.getSourceDataAddress()))
.compose(tokenRepresentation -> createDataAddress(tokenRepresentation, endpoint.getContent()));
}

@Override
public Result<DataAddress> authorize(String token, Map<String, Object> requestData) {
var accessTokenDataResult = accessTokenService.resolve(token);

return accessTokenDataResult
.compose(atd -> accessControlService.checkAccess(atd.claimToken(), atd.dataAddress(), requestData))
.map(u -> accessTokenDataResult.getContent().dataAddress());
}

private Result<DataAddress> createDataAddress(TokenRepresentation tokenRepresentation, Endpoint publicEndpoint) {
var address = DataAddress.Builder.newInstance()
.type(publicEndpoint.endpointType())
.property("endpoint", publicEndpoint.endpoint())
.property("endpointType", publicEndpoint.endpointType()) //this is duplicated in the type() field, but will make serialization easier
.properties(tokenRepresentation.getAdditional()) // would contain the "authType = bearer" entry
.property("authorization", tokenRepresentation.getToken())
.build();
paullatzelsperger marked this conversation as resolved.
Show resolved Hide resolved

return success(address);
}

private TokenParameters createTokenParams(DataFlowStartMessage message) {
return TokenParameters.Builder.newInstance()
.claims(JwtRegisteredClaimNames.JWT_ID, UUID.randomUUID().toString())
.claims(JwtRegisteredClaimNames.AUDIENCE, message.getParticipantId())
.claims(JwtRegisteredClaimNames.ISSUER, ownParticipantId)
.claims(JwtRegisteredClaimNames.SUBJECT, ownParticipantId)
.claims(JwtRegisteredClaimNames.ISSUED_AT, clock.instant().toEpochMilli()) // todo: milli or second?
.claims(CLAIM_AGREEMENT_ID, message.getAgreementId())
.claims(CLAIM_ASSET_ID, message.getAssetId())
.claims(CLAIM_PROCESS_ID, message.getProcessId())
.claims(CLAIM_TRANSFER_TYPE, message.getTransferType())
.build();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -72,14 +72,14 @@ public DefaultDataPlaneAccessTokenServiceImpl(TokenGenerationService tokenGenera
* Generates JWT tokens based on the {@link TokenParameters}. A representation of the claims and the data address is stored for subsequent use, using the token ID ("jti")
* as correlation id.
*
* @param parameters Headers and claims that are to be included in the token. If the claims do <em>not</em> contain a "jti" claim, one is generated randomly and inserted into the claims.
* @param address Information about the data resource for which the token is to be generated. May contain additional information about the token, such as an {@code authType}
* @param parameters Headers and claims that are to be included in the token. If the claims do <em>not</em> contain a "jti" claim, one is generated randomly and inserted into the claims.
* @param backendDataAddress Information about the data resource for which the token is to be generated. May contain additional information about the token, such as an {@code authType}
* @return A token representation in serialized JWT format (signed). The JWTs "kid" header contains the ID of the public key that can be used to verify the token.
*/
@Override
public Result<TokenRepresentation> obtainToken(TokenParameters parameters, DataAddress address) {
public Result<TokenRepresentation> obtainToken(TokenParameters parameters, DataAddress backendDataAddress) {
Objects.requireNonNull(parameters, "TokenParameters must be non-null.");
Objects.requireNonNull(address, "DataAddress must be non-null.");
Objects.requireNonNull(backendDataAddress, "DataAddress must be non-null.");
var claimDecorators = parameters.getClaims().entrySet().stream().map(e -> (TokenDecorator) claimDecorator -> claimDecorator.claims(e.getKey(), e.getValue()));
var headerDecorators = parameters.getHeaders().entrySet().stream().map(e -> (TokenDecorator) headerDecorator -> headerDecorator.header(e.getKey(), e.getValue()));

Expand All @@ -101,10 +101,12 @@ public Result<TokenRepresentation> obtainToken(TokenParameters parameters, DataA

// store a record of the token for future reference. We'll need that when we resolve the AccessTokenData later.
var claimToken = ClaimToken.Builder.newInstance().claims(parameters.getClaims()).build();
var accessTokenData = new AccessTokenData(id, claimToken, address);
var accessTokenData = new AccessTokenData(id, claimToken, backendDataAddress);

var storeResult = accessTokenDataStore.store(accessTokenData);
return storeResult.succeeded() ? Result.success(tokenResult.getContent()) : Result.failure(storeResult.getFailureMessages());
var content = tokenResult.getContent();
content.getAdditional().put("authType", "bearer");
return storeResult.succeeded() ? Result.success(content) : Result.failure(storeResult.getFailureMessages());
}


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (c) 2024 Metaform Systems, Inc.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Metaform Systems, Inc. - initial API and implementation
*
*/

package org.eclipse.edc.connector.dataplane.framework;

import org.eclipse.edc.connector.dataplane.spi.Endpoint;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.junit.jupiter.api.Test;

import java.util.Map;

import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;


class PublicEndpointGeneratorServiceImplTest {

private final PublicEndpointGeneratorServiceImpl generatorService = new PublicEndpointGeneratorServiceImpl();

@Test
void generateFor() {
var endpoint = new Endpoint(Map.of("fizz", "buzz"), "bar-type");
generatorService.addGeneratorFunction("testtype", dataAddress -> endpoint);

assertThat(generatorService.generateFor(DataAddress.Builder.newInstance().type("testtype").build())).isSucceeded()
.isEqualTo(endpoint);
}

@Test
void generateFor_noFunction() {
assertThat(generatorService.generateFor(DataAddress.Builder.newInstance().type("testtype").build()))
.isFailed()
.detail()
.isEqualTo("No Endpoint generator function registered for source data type 'testtype'");
}

}
Loading
Loading