Skip to content

Commit

Permalink
feat: implements Signaling Data Flow Controller (#3970)
Browse files Browse the repository at this point in the history
feat: implements Signaling Data Flow Controller + first E2E test with push
  • Loading branch information
wolf4ood authored Mar 8, 2024
1 parent 2887028 commit 3b1e083
Show file tree
Hide file tree
Showing 24 changed files with 1,038 additions and 101 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

plugins {
`java-library`
}

dependencies {
api(project(":spi:common:core-spi"))
api(project(":spi:control-plane:contract-spi"))
api(project(":spi:control-plane:transfer-spi"))

api(project(":spi:control-plane:transfer-data-plane-spi"))
api(project(":spi:data-plane:data-plane-spi"))
api(project(":extensions:data-plane:data-plane-signaling:data-plane-signaling-client"))
api(project(":spi:data-plane-selector:data-plane-selector-spi"))

testImplementation(project(":core:common:junit"))
}

edcBuild {
swagger {
apiGroup.set("control-api")
}
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.transfer.dataplane;

import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.transfer.dataplane.flow.DataPlaneSignalingFlowController;
import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;

import static org.eclipse.edc.connector.transfer.dataplane.TransferDataPlaneSignalingExtension.NAME;

@Extension(NAME)
public class TransferDataPlaneSignalingExtension implements ServiceExtension {

protected static final String NAME = "Transfer Data Plane Signaling Extension";

private static final String DEFAULT_DATAPLANE_SELECTOR_STRATEGY = "random";

@Setting(value = "Defines strategy for Data Plane instance selection in case Data Plane is not embedded in current runtime", defaultValue = DEFAULT_DATAPLANE_SELECTOR_STRATEGY)
private static final String DPF_SELECTOR_STRATEGY = "edc.dataplane.client.selector.strategy";
@Inject
private DataFlowManager dataFlowManager;
@Inject(required = false)
private ControlApiUrl callbackUrl;

@Inject
private DataPlaneSelectorService selectorService;

@Inject
private DataPlaneClientFactory clientFactory;


@Override
public void initialize(ServiceExtensionContext context) {
var selectionStrategy = context.getSetting(DPF_SELECTOR_STRATEGY, DEFAULT_DATAPLANE_SELECTOR_STRATEGY);
dataFlowManager.register(new DataPlaneSignalingFlowController(callbackUrl, selectorService, clientFactory, selectionStrategy));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.transfer.dataplane.flow;

import org.eclipse.edc.connector.dataplane.selector.spi.DataPlaneSelectorService;
import org.eclipse.edc.connector.dataplane.selector.spi.client.DataPlaneClientFactory;
import org.eclipse.edc.connector.dataplane.selector.spi.instance.DataPlaneInstance;
import org.eclipse.edc.connector.transfer.spi.callback.ControlApiUrl;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowController;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.policy.model.Policy;
import org.eclipse.edc.spi.response.ResponseStatus;
import org.eclipse.edc.spi.response.StatusResult;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.spi.types.domain.transfer.FlowType;
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toSet;

/**
* Implementation of {@link DataFlowController} that is compliant with the data plane signaling.
* <p>
* It handles all the transfer process where the transferType met the criteria defined in the format mapping of the
* signaling spec
*
* @see <a href="https://github.com/eclipse-edc/Connector/blob/main/docs/developer/data-plane-signaling/data-plane-signaling.md">Data plane signaling</a>
* @see <a href="https://github.com/eclipse-edc/Connector/blob/main/docs/developer/data-plane-signaling/data-plane-signaling-mapping.md">Data plane signaling transfer type mapping</a>
*/
public class DataPlaneSignalingFlowController implements DataFlowController {

private final ControlApiUrl callbackUrl;
private final DataPlaneSelectorService selectorClient;
private final DataPlaneClientFactory clientFactory;

private final String selectionStrategy;

public DataPlaneSignalingFlowController(ControlApiUrl callbackUrl, DataPlaneSelectorService selectorClient, DataPlaneClientFactory clientFactory, String selectionStrategy) {
this.callbackUrl = callbackUrl;
this.selectorClient = selectorClient;
this.clientFactory = clientFactory;
this.selectionStrategy = selectionStrategy;
}

@Override
public boolean canHandle(TransferProcess transferProcess) {
return extractFlowType(transferProcess).succeeded();
}

@Override
public @NotNull StatusResult<DataFlowResponse> start(TransferProcess transferProcess, Policy policy) {
var flowType = extractFlowType(transferProcess);
if (flowType.failed()) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, flowType.getFailureDetail());
}

var dataPlaneInstance = selectorClient.select(transferProcess.getContentDataAddress(), transferProcess.getDataDestination(), selectionStrategy, transferProcess.getTransferType());
var dataFlowRequest = DataFlowStartMessage.Builder.newInstance()
.id(UUID.randomUUID().toString())
.processId(transferProcess.getId())
.sourceDataAddress(transferProcess.getContentDataAddress())
.destinationDataAddress(transferProcess.getDataDestination())
.participantId(policy.getAssignee())
.agreementId(transferProcess.getContractId())
.assetId(transferProcess.getAssetId())
.flowType(flowType.getContent())
.callbackAddress(callbackUrl != null ? callbackUrl.get() : null)
.build();

var dataPlaneInstanceId = dataPlaneInstance != null ? dataPlaneInstance.getId() : null;

return clientFactory.createClient(dataPlaneInstance)
.start(dataFlowRequest)
.map(it -> DataFlowResponse.Builder.newInstance()
.dataAddress(it.getDataAddress())
.dataPlaneId(dataPlaneInstanceId)
.build()
);
}

@Override
public StatusResult<Void> terminate(TransferProcess transferProcess) {
return selectorClient.getAll().stream()
.filter(dataPlaneInstanceFilter(transferProcess))
.map(clientFactory::createClient)
.map(client -> client.terminate(transferProcess.getId()))
.reduce(StatusResult::merge)
.orElse(StatusResult.failure(ResponseStatus.FATAL_ERROR, "Failed to select the data plane for terminating the transfer process %s".formatted(transferProcess.getId())));
}

@Override
public Set<String> transferTypesFor(Asset asset) {
return selectorClient.getAll().stream()
.filter(it -> it.getAllowedSourceTypes().contains(asset.getDataAddress().getType()))
.map(DataPlaneInstance::getAllowedTransferTypes)
.flatMap(Collection::stream)
.collect(toSet());
}

private StatusResult<FlowType> extractFlowType(TransferProcess transferProcess) {
return Optional.ofNullable(transferProcess.getTransferType())
.map(transferType -> transferType.split("-"))
.filter(tokens -> tokens.length == 2)
.map(tokens -> parseFlowType(tokens[1]))
.orElse(StatusResult.failure(ResponseStatus.FATAL_ERROR, "Failed to extract flow type from transferType %s".formatted(transferProcess.getTransferType())));

}

private StatusResult<FlowType> parseFlowType(String flowType) {
try {
return StatusResult.success(FlowType.valueOf(flowType));
} catch (Exception e) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR, "Unknown flow type %s".formatted(flowType));
}
}

private Predicate<DataPlaneInstance> dataPlaneInstanceFilter(TransferProcess transferProcess) {
if (transferProcess.getDataPlaneId() != null) {
return (dataPlaneInstance -> dataPlaneInstance.getId().equals(transferProcess.getDataPlaneId()));
} else {
return (d) -> true;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
#
# This program and the accompanying materials are made available under the
# terms of the Apache License, Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0
#
# SPDX-License-Identifier: Apache-2.0
#
# Contributors:
# Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
#
#

org.eclipse.edc.connector.transfer.dataplane.TransferDataPlaneSignalingExtension
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.transfer.dataplane;

import org.eclipse.edc.connector.transfer.dataplane.flow.DataPlaneSignalingFlowController;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager;
import org.eclipse.edc.junit.extensions.DependencyInjectionExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

@ExtendWith(DependencyInjectionExtension.class)
class TransferDataPlaneSignalingExtensionTest {

private final DataFlowManager dataFlowManager = mock();

@BeforeEach
void setup(ServiceExtensionContext context) {
context.registerService(DataFlowManager.class, dataFlowManager);
}

@Test
void initialize(ServiceExtensionContext context, TransferDataPlaneSignalingExtension extension) {
extension.initialize(context);
verify(dataFlowManager).register(isA(DataPlaneSignalingFlowController.class));
}
}
Loading

0 comments on commit 3b1e083

Please sign in to comment.