diff --git a/core/control-plane/contract-core/src/test/java/org/eclipse/edc/connector/contract/negotiation/ContractNegotiationIntegrationTest.java b/core/control-plane/contract-core/src/test/java/org/eclipse/edc/connector/contract/negotiation/ContractNegotiationIntegrationTest.java index 009ab8ebd1d..b0bb0e12df4 100644 --- a/core/control-plane/contract-core/src/test/java/org/eclipse/edc/connector/contract/negotiation/ContractNegotiationIntegrationTest.java +++ b/core/control-plane/contract-core/src/test/java/org/eclipse/edc/connector/contract/negotiation/ContractNegotiationIntegrationTest.java @@ -37,6 +37,8 @@ import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.policy.model.PolicyType; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.monitor.ConsoleMonitor; import org.eclipse.edc.spi.protocol.ProtocolWebhook; @@ -85,7 +87,9 @@ class ContractNegotiationIntegrationTest { private final ContractValidationService validationService = mock(ContractValidationService.class); private final RemoteMessageDispatcherRegistry providerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class); private final RemoteMessageDispatcherRegistry consumerDispatcherRegistry = mock(RemoteMessageDispatcherRegistry.class); + private final IdentityService identityService = mock(); protected ClaimToken token = ClaimToken.Builder.newInstance().build(); + protected TokenRepresentation tokenRepresentation = TokenRepresentation.Builder.newInstance().build(); private final ProtocolWebhook protocolWebhook = () -> "http://dummy"; private String consumerNegotiationId; @@ -119,8 +123,9 @@ void init() { .protocolWebhook(protocolWebhook) .build(); - consumerService = new ContractNegotiationProtocolServiceImpl(consumerStore, new NoopTransactionContext(), validationService, new ContractNegotiationObservableImpl(), monitor, mock(Telemetry.class)); - providerService = new ContractNegotiationProtocolServiceImpl(providerStore, new NoopTransactionContext(), validationService, new ContractNegotiationObservableImpl(), monitor, mock(Telemetry.class)); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(token)); + consumerService = new ContractNegotiationProtocolServiceImpl(consumerStore, new NoopTransactionContext(), validationService, identityService, new ContractNegotiationObservableImpl(), monitor, mock(Telemetry.class)); + providerService = new ContractNegotiationProtocolServiceImpl(providerStore, new NoopTransactionContext(), validationService, identityService, new ContractNegotiationObservableImpl(), monitor, mock(Telemetry.class)); } @AfterEach @@ -261,7 +266,7 @@ private Answer onConsumerSentOfferRequest() { return i -> { ContractRequestMessage request = i.getArgument(1); consumerNegotiationId = request.getProcessId(); - var result = providerService.notifyRequested(request, token); + var result = providerService.notifyRequested(request, tokenRepresentation); return toFuture(result); }; } @@ -270,7 +275,7 @@ private Answer onConsumerSentOfferRequest() { private Answer onConsumerSentRejection() { return i -> { ContractNegotiationTerminationMessage request = i.getArgument(1); - var result = providerService.notifyTerminated(request, token); + var result = providerService.notifyTerminated(request, tokenRepresentation); return toFuture(result); }; } @@ -279,7 +284,7 @@ private Answer onConsumerSentRejection() { private Answer onProviderSentAgreementRequest() { return i -> { ContractAgreementMessage request = i.getArgument(1); - var result = consumerService.notifyAgreed(request, token); + var result = consumerService.notifyAgreed(request, tokenRepresentation); return toFuture(result); }; } @@ -288,7 +293,7 @@ private Answer onProviderSentAgreementRequest() { private Answer onProviderSentNegotiationEventMessage() { return i -> { ContractNegotiationEventMessage request = i.getArgument(1); - var result = consumerService.notifyFinalized(request, token); + var result = consumerService.notifyFinalized(request, tokenRepresentation); return toFuture(result); }; } @@ -297,7 +302,7 @@ private Answer onProviderSentNegotiationEventMessage() { private Answer onConsumerSentAgreementVerification() { return i -> { ContractAgreementVerificationMessage request = i.getArgument(1); - var result = providerService.notifyVerified(request, token); + var result = providerService.notifyVerified(request, tokenRepresentation); return toFuture(result); }; } @@ -306,7 +311,7 @@ private Answer onConsumerSentAgreementVerification() { private Answer onProviderSentRejection() { return i -> { ContractNegotiationTerminationMessage request = i.getArgument(1); - var result = consumerService.notifyTerminated(request, token); + var result = consumerService.notifyTerminated(request, tokenRepresentation); return toFuture(result); }; } diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/ControlPlaneServicesExtension.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/ControlPlaneServicesExtension.java index 714435f4186..b151e50508d 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/ControlPlaneServicesExtension.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/ControlPlaneServicesExtension.java @@ -58,6 +58,7 @@ import org.eclipse.edc.spi.asset.AssetIndex; import org.eclipse.edc.spi.command.CommandHandlerRegistry; import org.eclipse.edc.spi.event.EventRouter; +import org.eclipse.edc.spi.iam.IdentityService; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.system.ServiceExtension; @@ -136,6 +137,9 @@ public class ControlPlaneServicesExtension implements ServiceExtension { @Inject private DataAddressValidatorRegistry dataAddressValidator; + @Inject + private IdentityService identityService; + @Override public String name() { return NAME; @@ -155,7 +159,7 @@ public CatalogService catalogService() { @Provider public CatalogProtocolService catalogProtocolService(ServiceExtensionContext context) { - return new CatalogProtocolServiceImpl(datasetResolver, participantAgentService, dataServiceRegistry, context.getParticipantId()); + return new CatalogProtocolServiceImpl(datasetResolver, participantAgentService, dataServiceRegistry, identityService, monitor, context.getParticipantId()); } @Provider @@ -178,7 +182,7 @@ public ContractNegotiationService contractNegotiationService() { @Provider public ContractNegotiationProtocolService contractNegotiationProtocolService() { return new ContractNegotiationProtocolServiceImpl(contractNegotiationStore, - transactionContext, contractValidationService, contractNegotiationObservable, + transactionContext, contractValidationService, identityService, contractNegotiationObservable, monitor, telemetry); } @@ -198,6 +202,6 @@ public TransferProcessService transferProcessService() { @Provider public TransferProcessProtocolService transferProcessProtocolService() { return new TransferProcessProtocolServiceImpl(transferProcessStore, transactionContext, contractNegotiationStore, - contractValidationService, dataAddressValidator, transferProcessObservable, clock, monitor, telemetry); + contractValidationService, identityService, dataAddressValidator, transferProcessObservable, clock, monitor, telemetry); } } diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImpl.java index fdc76761915..645b649e84b 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImpl.java @@ -19,9 +19,12 @@ import org.eclipse.edc.catalog.spi.DataServiceRegistry; import org.eclipse.edc.catalog.spi.Dataset; import org.eclipse.edc.catalog.spi.DatasetResolver; +import org.eclipse.edc.connector.service.protocol.BaseProtocolService; import org.eclipse.edc.connector.spi.catalog.CatalogProtocolService; import org.eclipse.edc.spi.agent.ParticipantAgentService; -import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; +import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.result.ServiceResult; import org.jetbrains.annotations.NotNull; @@ -29,7 +32,7 @@ import static java.util.stream.Collectors.toList; import static org.eclipse.edc.spi.CoreConstants.EDC_NAMESPACE; -public class CatalogProtocolServiceImpl implements CatalogProtocolService { +public class CatalogProtocolServiceImpl extends BaseProtocolService implements CatalogProtocolService { private static final String PARTICIPANT_ID_PROPERTY_KEY = "participantId"; @@ -40,7 +43,11 @@ public class CatalogProtocolServiceImpl implements CatalogProtocolService { public CatalogProtocolServiceImpl(DatasetResolver datasetResolver, ParticipantAgentService participantAgentService, - DataServiceRegistry dataServiceRegistry, String participantId) { + DataServiceRegistry dataServiceRegistry, + IdentityService identityService, + Monitor monitor, + String participantId) { + super(identityService, monitor); this.datasetResolver = datasetResolver; this.participantAgentService = participantAgentService; this.dataServiceRegistry = dataServiceRegistry; @@ -49,32 +56,34 @@ public CatalogProtocolServiceImpl(DatasetResolver datasetResolver, @Override @NotNull - public ServiceResult getCatalog(CatalogRequestMessage message, ClaimToken claimToken) { - var agent = participantAgentService.createFor(claimToken); + public ServiceResult getCatalog(CatalogRequestMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).map(claimToken -> { + var agent = participantAgentService.createFor(claimToken); - try (var datasets = datasetResolver.query(agent, message.getQuerySpec())) { - var dataServices = dataServiceRegistry.getDataServices(); + try (var datasets = datasetResolver.query(agent, message.getQuerySpec())) { + var dataServices = dataServiceRegistry.getDataServices(); - var catalog = Catalog.Builder.newInstance() - .dataServices(dataServices) - .datasets(datasets.collect(toList())) - .property(EDC_NAMESPACE + PARTICIPANT_ID_PROPERTY_KEY, participantId) - .build(); - - return ServiceResult.success(catalog); - } + return Catalog.Builder.newInstance() + .dataServices(dataServices) + .datasets(datasets.collect(toList())) + .property(EDC_NAMESPACE + PARTICIPANT_ID_PROPERTY_KEY, participantId) + .build(); + } + }); } @Override - public @NotNull ServiceResult getDataset(String datasetId, ClaimToken claimToken) { - var agent = participantAgentService.createFor(claimToken); + public @NotNull ServiceResult getDataset(String datasetId, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> { + var agent = participantAgentService.createFor(claimToken); - var dataset = datasetResolver.getById(agent, datasetId); + var dataset = datasetResolver.getById(agent, datasetId); - if (dataset == null) { - return ServiceResult.notFound(format("Dataset %s does not exist", datasetId)); - } + if (dataset == null) { + return ServiceResult.notFound(format("Dataset %s does not exist", datasetId)); + } - return ServiceResult.success(dataset); + return ServiceResult.success(dataset); + }); } } diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationProtocolServiceImpl.java index 11cf7e55b11..e42bd924eb2 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationProtocolServiceImpl.java @@ -28,8 +28,11 @@ import org.eclipse.edc.connector.contract.spi.types.protocol.ContractRemoteMessage; import org.eclipse.edc.connector.contract.spi.validation.ContractValidationService; import org.eclipse.edc.connector.contract.spi.validation.ValidatedConsumerOffer; +import org.eclipse.edc.connector.service.protocol.BaseProtocolService; import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationProtocolService; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.result.ServiceResult; import org.eclipse.edc.spi.telemetry.Telemetry; @@ -41,7 +44,7 @@ import static org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation.Type.PROVIDER; -public class ContractNegotiationProtocolServiceImpl implements ContractNegotiationProtocolService { +public class ContractNegotiationProtocolServiceImpl extends BaseProtocolService implements ContractNegotiationProtocolService { private final ContractNegotiationStore store; private final TransactionContext transactionContext; @@ -52,8 +55,11 @@ public class ContractNegotiationProtocolServiceImpl implements ContractNegotiati public ContractNegotiationProtocolServiceImpl(ContractNegotiationStore store, TransactionContext transactionContext, - ContractValidationService validationService, ContractNegotiationObservable observable, + ContractValidationService validationService, + IdentityService identityService, + ContractNegotiationObservable observable, Monitor monitor, Telemetry telemetry) { + super(identityService, monitor); this.store = store; this.transactionContext = transactionContext; this.validationService = validationService; @@ -65,106 +71,108 @@ public ContractNegotiationProtocolServiceImpl(ContractNegotiationStore store, @Override @WithSpan @NotNull - public ServiceResult notifyRequested(ContractRequestMessage message, ClaimToken claimToken) { - return transactionContext.execute(() -> validateOffer(message, claimToken) - .compose(validatedOffer -> getNegotiation(message) - .recover(f -> createNegotiation(message, validatedOffer)) - .onSuccess(n -> n.addContractOffer(validatedOffer.getOffer())) - ) - .onSuccess(negotiation -> { - negotiation.transitionRequested(); - update(negotiation); - observable.invokeForEach(l -> l.requested(negotiation)); - })); + public ServiceResult notifyRequested(ContractRequestMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> transactionContext.execute(() -> validateOffer(message, claimToken) + .compose(validatedOffer -> getNegotiation(message) + .recover(f -> createNegotiation(message, validatedOffer)) + .onSuccess(n -> n.addContractOffer(validatedOffer.getOffer())) + ) + .onSuccess(negotiation -> { + negotiation.transitionRequested(); + update(negotiation); + observable.invokeForEach(l -> l.requested(negotiation)); + }))); + } @Override @WithSpan @NotNull - public ServiceResult notifyOffered(ContractOfferMessage message, ClaimToken claimToken) { - return transactionContext.execute(() -> getNegotiation(message) + public ServiceResult notifyOffered(ContractOfferMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> transactionContext.execute(() -> getNegotiation(message) .compose(negotiation -> validateRequest(claimToken, negotiation)) .onSuccess(negotiation -> { negotiation.addContractOffer(message.getContractOffer()); negotiation.transitionOffered(); update(negotiation); observable.invokeForEach(l -> l.offered(negotiation)); - })); + }))); } @Override @WithSpan @NotNull - public ServiceResult notifyAccepted(ContractNegotiationEventMessage message, ClaimToken claimToken) { - return transactionContext.execute(() -> getNegotiation(message) + public ServiceResult notifyAccepted(ContractNegotiationEventMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> transactionContext.execute(() -> getNegotiation(message) .compose(negotiation -> validateRequest(claimToken, negotiation)) .onSuccess(negotiation -> { negotiation.transitionAccepted(); update(negotiation); observable.invokeForEach(l -> l.accepted(negotiation)); - })); + }))); + } @Override @WithSpan @NotNull - public ServiceResult notifyAgreed(ContractAgreementMessage message, ClaimToken claimToken) { - return transactionContext.execute(() -> getNegotiation(message) + public ServiceResult notifyAgreed(ContractAgreementMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> transactionContext.execute(() -> getNegotiation(message) .compose(negotiation -> validateAgreed(message, claimToken, negotiation)) .onSuccess(negotiation -> { negotiation.setContractAgreement(message.getContractAgreement()); negotiation.transitionAgreed(); update(negotiation); observable.invokeForEach(l -> l.agreed(negotiation)); - })); + }))); } @Override @WithSpan @NotNull - public ServiceResult notifyVerified(ContractAgreementVerificationMessage message, ClaimToken claimToken) { - return transactionContext.execute(() -> getNegotiation(message) + public ServiceResult notifyVerified(ContractAgreementVerificationMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> transactionContext.execute(() -> getNegotiation(message) .compose(negotiation -> validateRequest(claimToken, negotiation)) .onSuccess(negotiation -> { negotiation.transitionVerified(); update(negotiation); observable.invokeForEach(l -> l.verified(negotiation)); - })); + }))); } @Override @WithSpan @NotNull - public ServiceResult notifyFinalized(ContractNegotiationEventMessage message, ClaimToken claimToken) { - return transactionContext.execute(() -> getNegotiation(message) + public ServiceResult notifyFinalized(ContractNegotiationEventMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> transactionContext.execute(() -> getNegotiation(message) .compose(negotiation -> validateRequest(claimToken, negotiation)) .onSuccess(negotiation -> { negotiation.transitionFinalized(); update(negotiation); observable.invokeForEach(l -> l.finalized(negotiation)); - })); + }))); } @Override @WithSpan @NotNull - public ServiceResult notifyTerminated(ContractNegotiationTerminationMessage message, ClaimToken claimToken) { - return transactionContext.execute(() -> getNegotiation(message) + public ServiceResult notifyTerminated(ContractNegotiationTerminationMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> transactionContext.execute(() -> getNegotiation(message) .compose(negotiation -> validateRequest(claimToken, negotiation)) .onSuccess(negotiation -> { negotiation.transitionTerminated(); update(negotiation); observable.invokeForEach(l -> l.terminated(negotiation)); - })); + }))); } @Override @WithSpan @NotNull - public ServiceResult findById(String id, ClaimToken claimToken) { - return transactionContext.execute(() -> Optional.ofNullable(store.findById(id)) + public ServiceResult findById(String id, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> transactionContext.execute(() -> Optional.ofNullable(store.findById(id)) .map(negotiation -> validateRequest(claimToken, negotiation)) - .orElse(ServiceResult.notFound("No negotiation with id %s found".formatted(id)))); + .orElse(ServiceResult.notFound("No negotiation with id %s found".formatted(id))))); } @NotNull diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/protocol/BaseProtocolService.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/protocol/BaseProtocolService.java new file mode 100644 index 00000000000..63ee9989749 --- /dev/null +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/protocol/BaseProtocolService.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG) + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation + * + */ + +package org.eclipse.edc.connector.service.protocol; + +import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.spi.result.ServiceResult; + +/** + * Base class for all protocol service implementation. This will contain common logic such as validating the JWT token + * and extracting the {@link ClaimToken} + */ +public abstract class BaseProtocolService { + + private final IdentityService identityService; + + private final Monitor monitor; + + protected BaseProtocolService(IdentityService identityService, Monitor monitor) { + this.identityService = identityService; + this.monitor = monitor; + } + + /** + * Validate and extract the {@link ClaimToken} from the input {@link TokenRepresentation} by using the {@link IdentityService} + * + * @param tokenRepresentation The input {@link TokenRepresentation} + * @return The {@link ClaimToken} if success, failure otherwise + */ + public ServiceResult verifyToken(TokenRepresentation tokenRepresentation) { + // TODO: since we are pushing here the invocation of the IdentityService we don't know the audience here + // The audience removal will be tackle next. IdentityService that relies on this parameter would not work + // for the time being. + var result = identityService.verifyJwtToken(tokenRepresentation, null); + + if (result.failed()) { + monitor.debug(() -> "Unauthorized: %s".formatted(result.getFailureDetail())); + return ServiceResult.unauthorized("Unauthorized"); + } + return ServiceResult.success(result.getContent()); + } +} diff --git a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImpl.java b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImpl.java index 1d8f9608b1a..f69a1ab91ec 100644 --- a/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImpl.java +++ b/core/control-plane/control-plane-aggregate-services/src/main/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImpl.java @@ -18,6 +18,7 @@ import io.opentelemetry.instrumentation.annotations.WithSpan; import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; import org.eclipse.edc.connector.contract.spi.validation.ContractValidationService; +import org.eclipse.edc.connector.service.protocol.BaseProtocolService; import org.eclipse.edc.connector.spi.transferprocess.TransferProcessProtocolService; import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessObservable; import org.eclipse.edc.connector.transfer.spi.observe.TransferProcessStartedData; @@ -31,6 +32,8 @@ import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferTerminationMessage; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceResult; @@ -50,7 +53,7 @@ import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.CONSUMER; import static org.eclipse.edc.connector.transfer.spi.types.TransferProcess.Type.PROVIDER; -public class TransferProcessProtocolServiceImpl implements TransferProcessProtocolService { +public class TransferProcessProtocolServiceImpl extends BaseProtocolService implements TransferProcessProtocolService { private final TransferProcessStore transferProcessStore; private final TransactionContext transactionContext; @@ -65,8 +68,10 @@ public class TransferProcessProtocolServiceImpl implements TransferProcessProtoc public TransferProcessProtocolServiceImpl(TransferProcessStore transferProcessStore, TransactionContext transactionContext, ContractNegotiationStore negotiationStore, ContractValidationService contractValidationService, + IdentityService identityService, DataAddressValidatorRegistry dataAddressValidator, TransferProcessObservable observable, Clock clock, Monitor monitor, Telemetry telemetry) { + super(identityService, monitor); this.transferProcessStore = transferProcessStore; this.transactionContext = transactionContext; this.negotiationStore = negotiationStore; @@ -81,50 +86,53 @@ public TransferProcessProtocolServiceImpl(TransferProcessStore transferProcessSt @Override @WithSpan @NotNull - public ServiceResult notifyRequested(TransferRequestMessage message, ClaimToken claimToken) { - var destination = message.getDataDestination(); - if (destination != null) { - var validDestination = dataAddressValidator.validateDestination(destination); - if (validDestination.failed()) { - return ServiceResult.badRequest(validDestination.getFailureMessages()); + public ServiceResult notifyRequested(TransferRequestMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> { + var destination = message.getDataDestination(); + if (destination != null) { + var validDestination = dataAddressValidator.validateDestination(destination); + if (validDestination.failed()) { + return ServiceResult.badRequest(validDestination.getFailureMessages()); + } } - } - return transactionContext.execute(() -> - Optional.ofNullable(negotiationStore.findContractAgreement(message.getContractId())) - .filter(agreement -> contractValidationService.validateAgreement(claimToken, agreement).succeeded()) - .map(agreement -> requestedAction(message, agreement.getAssetId())) - .orElse(ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "agreement not found or not valid")))); + return transactionContext.execute(() -> + Optional.ofNullable(negotiationStore.findContractAgreement(message.getContractId())) + .filter(agreement -> contractValidationService.validateAgreement(claimToken, agreement).succeeded()) + .map(agreement -> requestedAction(message, agreement.getAssetId())) + .orElse(ServiceResult.conflict(format("Cannot process %s because %s", message.getClass().getSimpleName(), "agreement not found or not valid")))); + }); + } @Override @WithSpan @NotNull - public ServiceResult notifyStarted(TransferStartMessage message, ClaimToken claimToken) { - return onMessageDo(message, claimToken, transferProcess -> startedAction(message, transferProcess)); + public ServiceResult notifyStarted(TransferStartMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> onMessageDo(message, claimToken, transferProcess -> startedAction(message, transferProcess))); } @Override @WithSpan @NotNull - public ServiceResult notifyCompleted(TransferCompletionMessage message, ClaimToken claimToken) { - return onMessageDo(message, claimToken, transferProcess -> completedAction(message, transferProcess)); + public ServiceResult notifyCompleted(TransferCompletionMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> onMessageDo(message, claimToken, transferProcess -> completedAction(message, transferProcess))); } @Override @WithSpan @NotNull - public ServiceResult notifyTerminated(TransferTerminationMessage message, ClaimToken claimToken) { - return onMessageDo(message, claimToken, transferProcess -> terminatedAction(message, transferProcess)); + public ServiceResult notifyTerminated(TransferTerminationMessage message, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> onMessageDo(message, claimToken, transferProcess -> terminatedAction(message, transferProcess))); } @Override @WithSpan @NotNull - public ServiceResult findById(String id, ClaimToken claimToken) { - return transactionContext.execute(() -> Optional.ofNullable(transferProcessStore.findById(id)) + public ServiceResult findById(String id, TokenRepresentation tokenRepresentation) { + return verifyToken(tokenRepresentation).compose(claimToken -> transactionContext.execute(() -> Optional.ofNullable(transferProcessStore.findById(id)) .map(tp -> validateCounterParty(claimToken, tp)) - .orElse(notFound(id))); + .orElse(notFound(id)))); } @NotNull diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/asset/AssetEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/asset/AssetEventDispatchTest.java index 76519407644..567c08764a3 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/asset/AssetEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/asset/AssetEventDispatchTest.java @@ -22,6 +22,7 @@ import org.eclipse.edc.junit.extensions.EdcExtension; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.event.EventSubscriber; +import org.eclipse.edc.spi.iam.IdentityService; import org.eclipse.edc.spi.protocol.ProtocolWebhook; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.asset.Asset; @@ -49,6 +50,7 @@ public class AssetEventDispatchTest { void setUp(EdcExtension extension) { extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class)); extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class)); + extension.registerServiceMock(IdentityService.class, mock()); extension.setConfiguration(Map.of( "web.http.port", String.valueOf(getFreePort()), "web.http.path", "/api" diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImplTest.java index 454b427f400..bb21f62cb00 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/catalog/CatalogProtocolServiceImplTest.java @@ -24,7 +24,10 @@ import org.eclipse.edc.spi.agent.ParticipantAgent; import org.eclipse.edc.spi.agent.ParticipantAgentService; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.query.QuerySpec; +import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceFailure; import org.junit.jupiter.api.Test; @@ -36,6 +39,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.spi.result.ServiceFailure.Reason.NOT_FOUND; +import static org.eclipse.edc.spi.result.ServiceFailure.Reason.UNAUTHORIZED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -47,21 +51,24 @@ class CatalogProtocolServiceImplTest { private final DatasetResolver datasetResolver = mock(DatasetResolver.class); private final ParticipantAgentService participantAgentService = mock(ParticipantAgentService.class); private final DataServiceRegistry dataServiceRegistry = mock(DataServiceRegistry.class); - - private final CatalogProtocolServiceImpl service = new CatalogProtocolServiceImpl(datasetResolver, participantAgentService, dataServiceRegistry, "participantId"); + private final IdentityService identityService = mock(); + private final CatalogProtocolServiceImpl service = new CatalogProtocolServiceImpl(datasetResolver, participantAgentService, dataServiceRegistry, identityService, mock(), "participantId"); @Test void getCatalog_shouldReturnCatalogWithConnectorDataServiceAndItsDataset() { var querySpec = QuerySpec.none(); var message = CatalogRequestMessage.Builder.newInstance().protocol("protocol").querySpec(querySpec).build(); - var token = createToken(); + var token = create(); + var tokenRepresentation = createTokenRepresentation(); var participantAgent = createParticipantAgent(); var dataService = DataService.Builder.newInstance().build(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(token)); when(dataServiceRegistry.getDataServices()).thenReturn(List.of(dataService)); when(datasetResolver.query(any(), any())).thenReturn(Stream.of(createDataset())); when(participantAgentService.createFor(any())).thenReturn(participantAgent); - var result = service.getCatalog(message, token); + var result = service.getCatalog(message, tokenRepresentation); assertThat(result).isSucceeded().satisfies(catalog -> { assertThat(catalog.getDataServices()).hasSize(1).first().isSameAs(dataService); @@ -71,15 +78,32 @@ void getCatalog_shouldReturnCatalogWithConnectorDataServiceAndItsDataset() { verify(participantAgentService).createFor(token); } + @Test + void getCatalog_shouldFail_whenTokenValidationFails() { + var querySpec = QuerySpec.none(); + var message = CatalogRequestMessage.Builder.newInstance().protocol("protocol").querySpec(querySpec).build(); + var tokenRepresentation = createTokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.failure("unauthorized")); + + var result = service.getCatalog(message, tokenRepresentation); + + assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(UNAUTHORIZED); + } + @Test void getDataset_shouldReturnDataset() { - var claimToken = createToken(); + var claimToken = create(); + var tokenRepresentation = createTokenRepresentation(); + var participantAgent = createParticipantAgent(); var dataset = createDataset(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(participantAgentService.createFor(any())).thenReturn(participantAgent); when(datasetResolver.getById(any(), any())).thenReturn(dataset); - var result = service.getDataset("datasetId", claimToken); + var result = service.getDataset("datasetId", tokenRepresentation); assertThat(result).isSucceeded().isEqualTo(dataset); verify(participantAgentService).createFor(claimToken); @@ -88,16 +112,31 @@ void getDataset_shouldReturnDataset() { @Test void getDataset_shouldFail_whenDatasetIsNull() { - var claimToken = createToken(); + var claimToken = create(); + var tokenRepresentation = createTokenRepresentation(); var participantAgent = createParticipantAgent(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(participantAgentService.createFor(any())).thenReturn(participantAgent); when(datasetResolver.getById(any(), any())).thenReturn(null); - var result = service.getDataset("datasetId", claimToken); + var result = service.getDataset("datasetId", tokenRepresentation); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND); } + @Test + void getDataset_shouldFail_whenTokenValidationFails() { + var querySpec = QuerySpec.none(); + var tokenRepresentation = createTokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.failure("unauthorized")); + + var result = service.getDataset("datasetId", tokenRepresentation); + + assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(UNAUTHORIZED); + } + private ParticipantAgent createParticipantAgent() { return new ParticipantAgent(emptyMap(), emptyMap()); } @@ -111,7 +150,11 @@ private Dataset createDataset() { .build(); } - private ClaimToken createToken() { + private ClaimToken create() { return ClaimToken.Builder.newInstance().build(); } + + private TokenRepresentation createTokenRepresentation() { + return TokenRepresentation.Builder.newInstance().build(); + } } diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractdefinition/ContractDefinitionEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractdefinition/ContractDefinitionEventDispatchTest.java index 2ae57a40374..f4f41be707f 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractdefinition/ContractDefinitionEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractdefinition/ContractDefinitionEventDispatchTest.java @@ -23,6 +23,7 @@ import org.eclipse.edc.junit.extensions.EdcExtension; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.event.EventSubscriber; +import org.eclipse.edc.spi.iam.IdentityService; import org.eclipse.edc.spi.protocol.ProtocolWebhook; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -47,6 +48,7 @@ public class ContractDefinitionEventDispatchTest { void setUp(EdcExtension extension) { extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class)); extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class)); + extension.registerServiceMock(IdentityService.class, mock()); extension.setConfiguration(Map.of( "web.http.port", String.valueOf(getFreePort()), "web.http.path", "/api" diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationEventDispatchTest.java index 0d114eb8018..64cff7d1d95 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationEventDispatchTest.java @@ -33,10 +33,13 @@ import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.event.EventSubscriber; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.message.RemoteMessageDispatcher; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.protocol.ProtocolWebhook; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.asset.Asset; import org.eclipse.edc.spi.types.domain.offer.ContractOffer; @@ -46,6 +49,7 @@ import org.junit.jupiter.api.extension.ExtendWith; import java.util.Map; +import java.util.UUID; import static java.util.Collections.emptyList; import static java.util.concurrent.CompletableFuture.completedFuture; @@ -54,6 +58,7 @@ import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -64,8 +69,12 @@ class ContractNegotiationEventDispatchTest { private static final String PROVIDER = "provider"; private final EventSubscriber eventSubscriber = mock(EventSubscriber.class); + private final IdentityService identityService = mock(); private final ClaimToken token = ClaimToken.Builder.newInstance().claim(ParticipantAgentService.DEFAULT_IDENTITY_CLAIM_KEY, CONSUMER).build(); + private final TokenRepresentation tokenRepresentation = TokenRepresentation.Builder.newInstance().token(UUID.randomUUID().toString()).build(); + + @BeforeEach void setUp(EdcExtension extension) { extension.setConfiguration(Map.of( @@ -77,6 +86,7 @@ void setUp(EdcExtension extension) { extension.registerServiceMock(NegotiationWaitStrategy.class, () -> 1); extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class)); extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class)); + extension.registerServiceMock(IdentityService.class, identityService); } @Test @@ -88,6 +98,7 @@ void shouldDispatchEventsOnProviderContractNegotiationStateChanges(EventRouter e AssetIndex assetIndex) { dispatcherRegistry.register(succeedingDispatcher()); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(token)); eventRouter.register(ContractNegotiationEvent.class, eventSubscriber); var policy = Policy.Builder.newInstance().build(); var contractDefinition = ContractDefinition.Builder.newInstance() @@ -100,7 +111,7 @@ void shouldDispatchEventsOnProviderContractNegotiationStateChanges(EventRouter e policyDefinitionStore.create(PolicyDefinition.Builder.newInstance().id("policyId").policy(policy).build()); assetIndex.create(Asset.Builder.newInstance().id("assetId").dataAddress(DataAddress.Builder.newInstance().type("any").build()).build()); - service.notifyRequested(createContractOfferRequest(policy, "assetId"), token); + service.notifyRequested(createContractOfferRequest(policy, "assetId"), tokenRepresentation); await().untilAsserted(() -> { verify(eventSubscriber).on(argThat(isEnvelopeOf(ContractNegotiationRequested.class))); diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationProtocolServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationProtocolServiceImplTest.java index 8750d420c74..b86a6481e77 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationProtocolServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/contractnegotiation/ContractNegotiationProtocolServiceImplTest.java @@ -30,6 +30,8 @@ import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationProtocolService; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceFailure; import org.eclipse.edc.spi.result.ServiceResult; @@ -65,6 +67,7 @@ import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat; import static org.eclipse.edc.spi.result.ServiceFailure.Reason.BAD_REQUEST; import static org.eclipse.edc.spi.result.ServiceFailure.Reason.NOT_FOUND; +import static org.eclipse.edc.spi.result.ServiceFailure.Reason.UNAUTHORIZED; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -84,23 +87,23 @@ class ContractNegotiationProtocolServiceImplTest { private final TransactionContext transactionContext = spy(new NoopTransactionContext()); private final ContractValidationService validationService = mock(); private final ContractNegotiationListener listener = mock(); + private final IdentityService identityService = mock(); private ContractNegotiationProtocolService service; @BeforeEach void setUp() { var observable = new ContractNegotiationObservableImpl(); observable.registerListener(listener); - service = new ContractNegotiationProtocolServiceImpl(store, transactionContext, validationService, observable, + service = new ContractNegotiationProtocolServiceImpl(store, transactionContext, validationService, identityService, observable, mock(), mock()); } @Test void notifyRequested_shouldInitiateNegotiation_whenNegotiationDoesNotExist() { - var token = ClaimToken.Builder.newInstance().build(); + var claimToken = ClaimToken.Builder.newInstance().build(); + var tokenRepresentation = tokenRepresentation(); var contractOffer = contractOffer(); var validatedOffer = new ValidatedConsumerOffer(CONSUMER_ID, contractOffer); - when(store.findByCorrelationIdAndLease(any())).thenReturn(StoreResult.notFound("not found")); - when(validationService.validateInitialOffer(token, contractOffer)).thenReturn(Result.success(validatedOffer)); var message = ContractRequestMessage.Builder.newInstance() .callbackAddress("callbackAddress") .protocol("protocol") @@ -108,7 +111,11 @@ void notifyRequested_shouldInitiateNegotiation_whenNegotiationDoesNotExist() { .processId("processId") .build(); - var result = service.notifyRequested(message, token); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease(any())).thenReturn(StoreResult.notFound("not found")); + when(validationService.validateInitialOffer(claimToken, contractOffer)).thenReturn(Result.success(validatedOffer)); + + var result = service.notifyRequested(message, tokenRepresentation); assertThat(result).isSucceeded(); var calls = ArgumentCaptor.forClass(ContractNegotiation.class); @@ -122,18 +129,17 @@ void notifyRequested_shouldInitiateNegotiation_whenNegotiationDoesNotExist() { assertThat(n.getLastContractOffer()).isEqualTo(contractOffer); }); verify(listener).requested(any()); - verify(validationService).validateInitialOffer(token, contractOffer); + verify(validationService).validateInitialOffer(claimToken, contractOffer); verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } @Test void notifyRequested_shouldTransitionToRequested_whenNegotiationFound() { - var token = ClaimToken.Builder.newInstance().build(); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var contractOffer = contractOffer(); var validatedOffer = new ValidatedConsumerOffer(CONSUMER_ID, contractOffer); var negotiation = createContractNegotiationOffered(); - when(store.findByCorrelationIdAndLease(any())).thenReturn(StoreResult.success(negotiation)); - when(validationService.validateInitialOffer(token, contractOffer)).thenReturn(Result.success(validatedOffer)); var message = ContractRequestMessage.Builder.newInstance() .callbackAddress("callbackAddress") .protocol("protocol") @@ -141,7 +147,12 @@ void notifyRequested_shouldTransitionToRequested_whenNegotiationFound() { .processId("processId") .build(); - var result = service.notifyRequested(message, token); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease(any())).thenReturn(StoreResult.success(negotiation)); + when(validationService.validateInitialOffer(claimToken, contractOffer)).thenReturn(Result.success(validatedOffer)); + + + var result = service.notifyRequested(message, tokenRepresentation); assertThat(result).isSucceeded(); var calls = ArgumentCaptor.forClass(ContractNegotiation.class); @@ -156,14 +167,15 @@ void notifyRequested_shouldTransitionToRequested_whenNegotiationFound() { }); verify(listener).requested(any()); verify(store).findByCorrelationIdAndLease("processId"); - verify(validationService).validateInitialOffer(token, contractOffer); + verify(validationService).validateInitialOffer(claimToken, contractOffer); verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } @Test void notifyOffered_shouldTransitionToOffered_whenNegotiationFound() { var processId = "processId"; - var token = ClaimToken.Builder.newInstance().build(); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var contractOffer = contractOffer(); var message = ContractOfferMessage.Builder.newInstance() .callbackAddress("callbackAddress") @@ -173,10 +185,11 @@ void notifyOffered_shouldTransitionToOffered_whenNegotiationFound() { .build(); var negotiation = createContractNegotiationRequested(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(store.findByCorrelationIdAndLease(processId)).thenReturn(StoreResult.success(negotiation)); - when(validationService.validateRequest(token, negotiation)).thenReturn(Result.success()); + when(validationService.validateRequest(claimToken, negotiation)).thenReturn(Result.success()); - var result = service.notifyOffered(message, token); + var result = service.notifyOffered(message, tokenRepresentation); assertThat(result).isSucceeded(); var updatedNegotiation = result.getContent(); @@ -190,9 +203,8 @@ void notifyOffered_shouldTransitionToOffered_whenNegotiationFound() { @Test void notifyAccepted_shouldTransitionToAccepted() { var contractNegotiation = createContractNegotiationOffered(); - var token = ClaimToken.Builder.newInstance().build(); - when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(contractNegotiation)); - when(validationService.validateRequest(eq(token), any(ContractNegotiation.class))).thenReturn(Result.success()); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var message = ContractNegotiationEventMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") @@ -201,11 +213,15 @@ void notifyAccepted_shouldTransitionToAccepted() { .policy(Policy.Builder.newInstance().build()) .build(); - var result = service.notifyAccepted(message, token); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(contractNegotiation)); + when(validationService.validateRequest(eq(claimToken), any(ContractNegotiation.class))).thenReturn(Result.success()); + + var result = service.notifyAccepted(message, tokenRepresentation); assertThat(result).isSucceeded(); verify(store).save(argThat(negotiation -> negotiation.getState() == ACCEPTED.code())); - verify(validationService).validateRequest(token, contractNegotiation); + verify(validationService).validateRequest(claimToken, contractNegotiation); verify(listener).accepted(any()); verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } @@ -213,10 +229,11 @@ void notifyAccepted_shouldTransitionToAccepted() { @Test void notifyAgreed_shouldTransitionToAgreed() { var negotiationConsumerRequested = createContractNegotiationRequested(); - var token = ClaimToken.Builder.newInstance().build(); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + var contractAgreement = mock(ContractAgreement.class); - when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(negotiationConsumerRequested)); - when(validationService.validateConfirmed(eq(token), eq(contractAgreement), any(ContractOffer.class))).thenReturn(Result.success()); + var message = ContractAgreementMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") @@ -224,30 +241,38 @@ void notifyAgreed_shouldTransitionToAgreed() { .contractAgreement(contractAgreement) .build(); - var result = service.notifyAgreed(message, token); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(negotiationConsumerRequested)); + when(validationService.validateConfirmed(eq(claimToken), eq(contractAgreement), any(ContractOffer.class))).thenReturn(Result.success()); + + var result = service.notifyAgreed(message, tokenRepresentation); assertThat(result).isSucceeded(); verify(store).save(argThat(negotiation -> negotiation.getState() == AGREED.code() && negotiation.getContractAgreement() == contractAgreement )); - verify(validationService).validateConfirmed(eq(token), eq(contractAgreement), any(ContractOffer.class)); + verify(validationService).validateConfirmed(eq(claimToken), eq(contractAgreement), any(ContractOffer.class)); verify(listener).agreed(any()); verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } @Test void notifyVerified_shouldTransitionToVerified() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var negotiation = contractNegotiationBuilder().id("negotiationId").type(PROVIDER).state(AGREED.code()).build(); - when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(negotiation)); - when(validationService.validateRequest(any(), any(ContractNegotiation.class))).thenReturn(Result.success()); var message = ContractAgreementVerificationMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") .processId("processId") .build(); - var result = service.notifyVerified(message, claimToken()); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(negotiation)); + when(validationService.validateRequest(any(), any(ContractNegotiation.class))).thenReturn(Result.success()); + + var result = service.notifyVerified(message, tokenRepresentation); assertThat(result).isSucceeded(); verify(store).save(argThat(n -> n.getState() == VERIFIED.code())); @@ -259,17 +284,20 @@ void notifyVerified_shouldTransitionToVerified() { @Test void notifyFinalized_shouldTransitionToFinalized() { var negotiation = contractNegotiationBuilder().id("negotiationId").type(PROVIDER).state(VERIFIED.code()).build(); - when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(negotiation)); - when(validationService.validateRequest(any(), any(ContractNegotiation.class))).thenReturn(Result.success()); var message = ContractNegotiationEventMessage.Builder.newInstance() .type(ContractNegotiationEventMessage.Type.FINALIZED) .protocol("protocol") .counterPartyAddress("http://any") .processId("processId") .build(); - var token = ClaimToken.Builder.newInstance().build(); + var claimToken = ClaimToken.Builder.newInstance().build(); + var tokenRepresentation = tokenRepresentation(); - var result = service.notifyFinalized(message, token); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(negotiation)); + when(validationService.validateRequest(any(), any(ContractNegotiation.class))).thenReturn(Result.success()); + + var result = service.notifyFinalized(message, tokenRepresentation); assertThat(result).isSucceeded(); verify(store).save(argThat(n -> n.getState() == FINALIZED.code())); @@ -281,17 +309,20 @@ void notifyFinalized_shouldTransitionToFinalized() { @Test void notifyTerminated_shouldTransitionToTerminated() { var negotiation = contractNegotiationBuilder().id("negotiationId").type(PROVIDER).state(VERIFIED.code()).build(); - when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(negotiation)); - when(validationService.validateRequest(any(), any(ContractNegotiation.class))).thenReturn(Result.success()); var message = ContractNegotiationTerminationMessage.Builder.newInstance() .protocol("protocol") .processId("processId") .counterPartyAddress("http://any") .rejectionReason("any") .build(); - var token = ClaimToken.Builder.newInstance().build(); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); - var result = service.notifyTerminated(message, token); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("processId")).thenReturn(StoreResult.success(negotiation)); + when(validationService.validateRequest(any(), any(ContractNegotiation.class))).thenReturn(Result.success()); + + var result = service.notifyTerminated(message, tokenRepresentation); assertThat(result).isSucceeded(); verify(store).save(argThat(n -> n.getState() == TERMINATED.code())); @@ -303,13 +334,15 @@ void notifyTerminated_shouldTransitionToTerminated() { @Test void findById_shouldReturnNegotiation_whenValidCounterParty() { var id = "negotiationId"; - var token = ClaimToken.Builder.newInstance().build(); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var negotiation = contractNegotiationBuilder().id(id).type(PROVIDER).state(VERIFIED.code()).build(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(store.findById(id)).thenReturn(negotiation); - when(validationService.validateRequest(token, negotiation)).thenReturn(Result.success()); + when(validationService.validateRequest(claimToken, negotiation)).thenReturn(Result.success()); - var result = service.findById(id, token); + var result = service.findById(id, tokenRepresentation); assertThat(result) .isSucceeded() @@ -318,9 +351,13 @@ void findById_shouldReturnNegotiation_whenValidCounterParty() { @Test void findById_shouldReturnNotFound_whenNegotiationNotFound() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(store.findById(any())).thenReturn(null); - var result = service.findById("invalidId", ClaimToken.Builder.newInstance().build()); + var result = service.findById("invalidId", tokenRepresentation); assertThat(result) .isFailed() @@ -331,13 +368,16 @@ void findById_shouldReturnNotFound_whenNegotiationNotFound() { @Test void findById_shouldReturnBadRequest_whenCounterPartyUnauthorized() { var id = "negotiationId"; - var token = ClaimToken.Builder.newInstance().build(); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + var negotiation = contractNegotiationBuilder().id(id).type(PROVIDER).state(VERIFIED.code()).build(); when(store.findById(id)).thenReturn(negotiation); - when(validationService.validateRequest(token, negotiation)).thenReturn(Result.failure("validation error")); + when(validationService.validateRequest(claimToken, negotiation)).thenReturn(Result.failure("validation error")); - var result = service.findById(id, token); + var result = service.findById(id, tokenRepresentation); assertThat(result) .isFailed() @@ -348,11 +388,14 @@ void findById_shouldReturnBadRequest_whenCounterPartyUnauthorized() { @ParameterizedTest @ArgumentsSource(NotifyArguments.class) void notify_shouldReturnNotFound_whenNotFound(MethodCall methodCall, M message) { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(store.findByCorrelationIdAndLease(any())).thenReturn(StoreResult.notFound("not found")); // currently ContractRequestMessage cannot happen on an already existing negotiation if (!(message instanceof ContractRequestMessage)) { - var result = methodCall.call(service, message, claimToken()); + var result = methodCall.call(service, message, tokenRepresentation); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(NOT_FOUND); verify(store, never()).save(any()); @@ -363,18 +406,36 @@ void notify_shouldReturnNotFound_whenNotFound(MethodCa @ParameterizedTest @ArgumentsSource(NotifyArguments.class) void notify_shouldReturnBadRequest_whenValidationFails(MethodCall methodCall, M message) { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(store.findByCorrelationIdAndLease(any())).thenReturn(StoreResult.success(createContractNegotiationOffered())); when(validationService.validateRequest(any(), any(ContractNegotiation.class))).thenReturn(Result.failure("validation error")); when(validationService.validateInitialOffer(any(), any(ContractOffer.class))).thenReturn(Result.failure("error")); when(validationService.validateConfirmed(any(), any(), any(ContractOffer.class))).thenReturn(Result.failure("failure")); - var result = methodCall.call(service, message, claimToken()); + var result = methodCall.call(service, message, tokenRepresentation); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(BAD_REQUEST); verify(store, never()).save(any()); verifyNoInteractions(listener); } + @ParameterizedTest + @ArgumentsSource(NotifyArguments.class) + void notify_shouldReturnBadRequest_whenTokenValidationFails(MethodCall methodCall, M message) { + var tokenRepresentation = tokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.failure("unauthorized")); + + var result = methodCall.call(service, message, tokenRepresentation); + + assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(UNAUTHORIZED); + verify(store, never()).save(any()); + verifyNoInteractions(listener); + } + private ClaimToken claimToken() { return ClaimToken.Builder.newInstance() .claim("key", "value") @@ -410,9 +471,15 @@ private ContractNegotiation.Builder contractNegotiationBuilder() { .stateTimestamp(Instant.now().toEpochMilli()); } + private TokenRepresentation tokenRepresentation() { + return TokenRepresentation.Builder.newInstance() + .token(UUID.randomUUID().toString()) + .build(); + } + @FunctionalInterface private interface MethodCall { - ServiceResult call(ContractNegotiationProtocolService service, M message, ClaimToken token); + ServiceResult call(ContractNegotiationProtocolService service, M message, TokenRepresentation token); } private static class NotifyArguments implements ArgumentsProvider { diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/policydefinition/PolicyDefinitionEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/policydefinition/PolicyDefinitionEventDispatchTest.java index 23249963b43..e9d26a192cb 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/policydefinition/PolicyDefinitionEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/policydefinition/PolicyDefinitionEventDispatchTest.java @@ -25,6 +25,7 @@ import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.event.EventSubscriber; +import org.eclipse.edc.spi.iam.IdentityService; import org.eclipse.edc.spi.protocol.ProtocolWebhook; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -48,6 +49,7 @@ public class PolicyDefinitionEventDispatchTest { void setUp(EdcExtension extension) { extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class)); extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class)); + extension.registerServiceMock(IdentityService.class, mock()); extension.setConfiguration(Map.of( "web.http.port", String.valueOf(getFreePort()), "web.http.path", "/api") diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessEventDispatchTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessEventDispatchTest.java index 1475ea2b8e8..82fab9f4614 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessEventDispatchTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessEventDispatchTest.java @@ -43,10 +43,13 @@ import org.eclipse.edc.spi.event.EventRouter; import org.eclipse.edc.spi.event.EventSubscriber; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.message.RemoteMessageDispatcher; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.protocol.ProtocolWebhook; import org.eclipse.edc.spi.response.StatusResult; +import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.domain.DataAddress; import org.eclipse.edc.spi.types.domain.agreement.ContractAgreement; import org.eclipse.edc.validator.spi.DataAddressValidatorRegistry; @@ -59,6 +62,7 @@ import java.time.Duration; import java.util.Map; +import java.util.UUID; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.Executors.newSingleThreadExecutor; @@ -68,6 +72,7 @@ import static org.eclipse.edc.junit.testfixtures.TestUtils.getFreePort; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.matches; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; @@ -80,6 +85,7 @@ public class TransferProcessEventDispatchTest { public static final Duration TIMEOUT = Duration.ofSeconds(30); private final EventSubscriber eventSubscriber = mock(EventSubscriber.class); + private final IdentityService identityService = mock(); @NotNull private static RemoteMessageDispatcher getTestDispatcher() { @@ -101,6 +107,7 @@ void setUp(EdcExtension extension) { extension.setConfiguration(configuration); extension.registerServiceMock(TransferWaitStrategy.class, () -> 1); extension.registerServiceMock(EventExecutorServiceContainer.class, new EventExecutorServiceContainer(newSingleThreadExecutor())); + extension.registerServiceMock(IdentityService.class, identityService); extension.registerServiceMock(ProtocolWebhook.class, () -> "http://dummy"); extension.registerServiceMock(DataPlaneInstanceStore.class, mock(DataPlaneInstanceStore.class)); extension.registerServiceMock(PolicyArchive.class, mock(PolicyArchive.class)); @@ -122,6 +129,10 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se ParticipantAgentService agentService) { var token = ClaimToken.Builder.newInstance().build(); + var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(UUID.randomUUID().toString()).build(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(token)); + var agent = mock(ParticipantAgent.class); var agreement = mock(ContractAgreement.class); var providerId = "ProviderId"; @@ -154,7 +165,7 @@ void shouldDispatchEventsOnTransferProcessStateChanges(TransferProcessService se .dataAddress(dataAddress) .build(); - protocolService.notifyStarted(startMessage, token); + protocolService.notifyStarted(startMessage, tokenRepresentation); await().atMost(TIMEOUT).untilAsserted(() -> { ArgumentCaptor> captor = ArgumentCaptor.forClass(EventEnvelope.class); diff --git a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImplTest.java b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImplTest.java index dc18fd79c18..4b874440449 100644 --- a/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImplTest.java +++ b/core/control-plane/control-plane-aggregate-services/src/test/java/org/eclipse/edc/connector/service/transferprocess/TransferProcessProtocolServiceImplTest.java @@ -31,6 +31,8 @@ import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferTerminationMessage; import org.eclipse.edc.policy.model.Policy; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceFailure; import org.eclipse.edc.spi.result.ServiceResult; @@ -65,9 +67,11 @@ import static org.eclipse.edc.spi.result.ServiceFailure.Reason.BAD_REQUEST; import static org.eclipse.edc.spi.result.ServiceFailure.Reason.CONFLICT; import static org.eclipse.edc.spi.result.ServiceFailure.Reason.NOT_FOUND; +import static org.eclipse.edc.spi.result.ServiceFailure.Reason.UNAUTHORIZED; import static org.eclipse.edc.validator.spi.Violation.violation; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -86,18 +90,21 @@ class TransferProcessProtocolServiceImplTest { private final DataAddressValidatorRegistry dataAddressValidator = mock(); private final TransferProcessListener listener = mock(); + private final IdentityService identityService = mock(); private TransferProcessProtocolService service; @BeforeEach void setUp() { var observable = new TransferProcessObservableImpl(); observable.registerListener(listener); - service = new TransferProcessProtocolServiceImpl(store, transactionContext, negotiationStore, validationService, + service = new TransferProcessProtocolServiceImpl(store, transactionContext, negotiationStore, validationService, identityService, dataAddressValidator, observable, mock(), mock(), mock()); } @Test void notifyRequested_validAgreement_shouldInitiateTransfer() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var message = TransferRequestMessage.Builder.newInstance() .processId("transferProcessId") .contractId("agreementId") @@ -105,11 +112,13 @@ void notifyRequested_validAgreement_shouldInitiateTransfer() { .callbackAddress("http://any") .dataDestination(DataAddress.Builder.newInstance().type("any").build()) .build(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(negotiationStore.findContractAgreement(any())).thenReturn(contractAgreement()); when(validationService.validateAgreement(any(), any())).thenReturn(Result.success(null)); when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.success()); - var result = service.notifyRequested(message, claimToken()); + var result = service.notifyRequested(message, tokenRepresentation); assertThat(result).isSucceeded().satisfies(tp -> { assertThat(tp.getCorrelationId()).isEqualTo("transferProcessId"); @@ -131,12 +140,16 @@ void notifyRequested_doNothingIfProcessAlreadyExist() { .callbackAddress("http://any") .dataDestination(DataAddress.Builder.newInstance().type("any").build()) .build(); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(negotiationStore.findContractAgreement(any())).thenReturn(contractAgreement()); when(validationService.validateAgreement(any(), any())).thenReturn(Result.success(null)); when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.success()); when(store.findForCorrelationId(any())).thenReturn(transferProcess(REQUESTED, "transferProcessId")); - var result = service.notifyRequested(message, claimToken()); + var result = service.notifyRequested(message, tokenRepresentation); assertThat(result).isSucceeded().extracting(TransferProcess::getId).isEqualTo("transferProcessId"); verify(store, never()).save(any()); @@ -151,11 +164,15 @@ void notifyRequested_invalidAgreement_shouldNotInitiateTransfer() { .contractId("agreementId") .dataDestination(DataAddress.Builder.newInstance().type("any").build()) .build(); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(negotiationStore.findContractAgreement(any())).thenReturn(contractAgreement()); when(validationService.validateAgreement(any(), any())).thenReturn(Result.failure("error")); when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.success()); - var result = service.notifyRequested(message, claimToken()); + var result = service.notifyRequested(message, tokenRepresentation); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT); verify(store, never()).save(any()); @@ -164,7 +181,8 @@ void notifyRequested_invalidAgreement_shouldNotInitiateTransfer() { @Test void notifyRequested_invalidDestination_shouldNotInitiateTransfer() { - when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.failure(violation("invalid data address", "path"))); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var message = TransferRequestMessage.Builder.newInstance() .protocol("protocol") .contractId("agreementId") @@ -172,7 +190,11 @@ void notifyRequested_invalidDestination_shouldNotInitiateTransfer() { .dataDestination(DataAddress.Builder.newInstance().type("any").build()) .build(); - var result = service.notifyRequested(message, claimToken()); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(dataAddressValidator.validateDestination(any())).thenReturn(ValidationResult.failure(violation("invalid data address", "path"))); + + + var result = service.notifyRequested(message, tokenRepresentation); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(BAD_REQUEST); verify(store, never()).save(any()); @@ -181,16 +203,20 @@ void notifyRequested_invalidDestination_shouldNotInitiateTransfer() { @Test void notifyRequested_missingDestination_shouldInitiateTransfer() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var message = TransferRequestMessage.Builder.newInstance() .processId("transferProcessId") .protocol("protocol") .contractId("agreementId") .callbackAddress("http://any") .build(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(negotiationStore.findContractAgreement(any())).thenReturn(contractAgreement()); when(validationService.validateAgreement(any(), any())).thenReturn(Result.success(null)); - var result = service.notifyRequested(message, claimToken()); + var result = service.notifyRequested(message, tokenRepresentation); assertThat(result).isSucceeded().satisfies(tp -> { assertThat(tp.getCorrelationId()).isEqualTo("transferProcessId"); @@ -206,21 +232,22 @@ void notifyRequested_missingDestination_shouldInitiateTransfer() { @Test void notifyStarted_shouldTransitionToStarted() { - when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(REQUESTED, "transferProcessId"))); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var message = TransferStartMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") .processId("correlationId") .dataAddress(DataAddress.Builder.newInstance().type("test").build()) .build(); - - var token = claimToken(); var agreement = contractAgreement(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(REQUESTED, "transferProcessId"))); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.success()); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.success()); - var result = service.notifyStarted(message, token); + var result = service.notifyStarted(message, tokenRepresentation); var captor = ArgumentCaptor.forClass(TransferProcessStartedData.class); @@ -235,21 +262,22 @@ void notifyStarted_shouldTransitionToStarted() { @Test void notifyStarted_shouldReturnConflict_whenStatusIsNotValid() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var transferProcess = transferProcess(COMPLETED, UUID.randomUUID().toString()); - when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); var message = TransferStartMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") .processId("correlationId") .build(); - - var token = claimToken(); var agreement = contractAgreement(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.success()); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.success()); - var result = service.notifyStarted(message, token); + var result = service.notifyStarted(message, tokenRepresentation); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT); // state didn't change @@ -259,21 +287,22 @@ void notifyStarted_shouldReturnConflict_whenStatusIsNotValid() { @Test void notifyStarted_shouldReturnNotFound_whenCounterPartyUnauthorized() { - when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(REQUESTED, "transferProcessId"))); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var message = TransferStartMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") .processId("correlationId") .dataAddress(DataAddress.Builder.newInstance().type("test").build()) .build(); - - var token = claimToken(); var agreement = contractAgreement(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(REQUESTED, "transferProcessId"))); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.failure("error")); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.failure("error")); - var result = service.notifyStarted(message, token); + var result = service.notifyStarted(message, tokenRepresentation); assertThat(result) .isFailed() @@ -286,20 +315,21 @@ void notifyStarted_shouldReturnNotFound_whenCounterPartyUnauthorized() { @Test void notifyCompleted_shouldTransitionToCompleted() { - when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(STARTED, "transferProcessId"))); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var message = TransferCompletionMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") .processId("correlationId") .build(); - - var token = claimToken(); var agreement = contractAgreement(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(STARTED, "transferProcessId"))); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.success()); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.success()); - var result = service.notifyCompleted(message, token); + var result = service.notifyCompleted(message, tokenRepresentation); assertThat(result).isSucceeded(); verify(listener).preCompleted(any()); @@ -310,21 +340,22 @@ void notifyCompleted_shouldTransitionToCompleted() { @Test void notifyCompleted_shouldReturnConflict_whenStatusIsNotValid() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var transferProcess = transferProcess(REQUESTED, UUID.randomUUID().toString()); - when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); var message = TransferCompletionMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") .processId("correlationId") .build(); - - var token = claimToken(); var agreement = contractAgreement(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.success()); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.success()); - var result = service.notifyCompleted(message, token); + var result = service.notifyCompleted(message, tokenRepresentation); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT); // state didn't change @@ -334,20 +365,22 @@ void notifyCompleted_shouldReturnConflict_whenStatusIsNotValid() { @Test void notifyCompleted_shouldReturnNotFound_whenCounterPartyUnauthorized() { - when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(STARTED, "transferProcessId"))); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var message = TransferCompletionMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") .processId("correlationId") .build(); - var token = claimToken(); var agreement = contractAgreement(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(STARTED, "transferProcessId"))); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.failure("error")); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.failure("error")); - var result = service.notifyCompleted(message, token); + var result = service.notifyCompleted(message, tokenRepresentation); assertThat(result) .isFailed() @@ -360,7 +393,8 @@ void notifyCompleted_shouldReturnNotFound_whenCounterPartyUnauthorized() { @Test void notifyTerminated_shouldTransitionToTerminated() { - when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(STARTED, "transferProcessId"))); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var message = TransferTerminationMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") @@ -368,13 +402,13 @@ void notifyTerminated_shouldTransitionToTerminated() { .code("TestCode") .reason("TestReason") .build(); - - var token = claimToken(); var agreement = contractAgreement(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess(STARTED, "transferProcessId"))); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.success()); - var result = service.notifyTerminated(message, token); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.success()); + var result = service.notifyTerminated(message, tokenRepresentation); assertThat(result).isSucceeded(); verify(listener).preTerminated(any()); @@ -385,8 +419,10 @@ void notifyTerminated_shouldTransitionToTerminated() { @Test void notifyTerminated_shouldReturnConflict_whenStatusIsNotValid() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var transferProcess = transferProcess(TERMINATED, UUID.randomUUID().toString()); - when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); + var agreement = contractAgreement(); var message = TransferTerminationMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") @@ -395,13 +431,12 @@ void notifyTerminated_shouldReturnConflict_whenStatusIsNotValid() { .reason("TestReason") .build(); - var token = claimToken(); - var agreement = contractAgreement(); - + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.success()); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.success()); - var result = service.notifyTerminated(message, token); + var result = service.notifyTerminated(message, tokenRepresentation); assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(CONFLICT); // state didn't change @@ -411,8 +446,10 @@ void notifyTerminated_shouldReturnConflict_whenStatusIsNotValid() { @Test void notifyTerminated_shouldReturnNotFound_whenCounterPartyUnauthorized() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + var agreement = contractAgreement(); var transferProcess = transferProcess(TERMINATED, UUID.randomUUID().toString()); - when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); var message = TransferTerminationMessage.Builder.newInstance() .protocol("protocol") .counterPartyAddress("http://any") @@ -421,13 +458,12 @@ void notifyTerminated_shouldReturnNotFound_whenCounterPartyUnauthorized() { .reason("TestReason") .build(); - var token = claimToken(); - var agreement = contractAgreement(); - + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); + when(store.findByCorrelationIdAndLease("correlationId")).thenReturn(StoreResult.success(transferProcess)); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.failure("error")); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.failure("error")); - var result = service.notifyTerminated(message, token); + var result = service.notifyTerminated(message, tokenRepresentation); assertThat(result) .isFailed() @@ -440,16 +476,18 @@ void notifyTerminated_shouldReturnNotFound_whenCounterPartyUnauthorized() { @Test void findById_shouldReturnTransferProcess_whenValidCounterParty() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var processId = "transferProcessId"; var transferProcess = transferProcess(INITIAL, processId); - var token = claimToken(); var agreement = contractAgreement(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(store.findById(processId)).thenReturn(transferProcess); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.success()); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.success()); - var result = service.findById(processId, token); + var result = service.findById(processId, tokenRepresentation); assertThat(result) .isSucceeded() @@ -458,9 +496,13 @@ void findById_shouldReturnTransferProcess_whenValidCounterParty() { @Test void findById_shouldReturnNotFound_whenNegotiationNotFound() { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(store.findById(any())).thenReturn(null); - var result = service.findById("invalidId", ClaimToken.Builder.newInstance().build()); + var result = service.findById("invalidId", tokenRepresentation); assertThat(result) .isFailed() @@ -472,14 +514,16 @@ void findById_shouldReturnNotFound_whenNegotiationNotFound() { void findById_shouldReturnNotFound_whenCounterPartyUnauthorized() { var processId = "transferProcessId"; var transferProcess = transferProcess(INITIAL, processId); - var token = claimToken(); + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); var agreement = contractAgreement(); + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(store.findById(processId)).thenReturn(transferProcess); when(negotiationStore.findContractAgreement(any())).thenReturn(agreement); - when(validationService.validateRequest(token, agreement)).thenReturn(Result.failure("error")); + when(validationService.validateRequest(claimToken, agreement)).thenReturn(Result.failure("error")); - var result = service.findById(processId, token); + var result = service.findById(processId, tokenRepresentation); assertThat(result) .isFailed() @@ -490,15 +534,33 @@ void findById_shouldReturnNotFound_whenCounterPartyUnauthorized() { @ParameterizedTest @ArgumentsSource(NotFoundArguments.class) void notify_shouldFail_whenTransferProcessNotFound(MethodCall methodCall, M message) { + var claimToken = claimToken(); + var tokenRepresentation = tokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.success(claimToken)); when(store.findByCorrelationIdAndLease(any())).thenReturn(StoreResult.notFound("not found")); - var result = methodCall.call(service, message, claimToken()); + var result = methodCall.call(service, message, tokenRepresentation); assertThat(result).matches(ServiceResult::failed); verify(store, never()).save(any()); verify(transactionContext, atLeastOnce()).execute(any(TransactionContext.ResultTransactionBlock.class)); } + @ParameterizedTest + @ArgumentsSource(NotFoundArguments.class) + void notify_shouldFail_whenTokenValidationFails(MethodCall methodCall, M message) { + var tokenRepresentation = tokenRepresentation(); + + when(identityService.verifyJwtToken(eq(tokenRepresentation), any())).thenReturn(Result.failure("unauthorized")); + + var result = methodCall.call(service, message, tokenRepresentation); + + assertThat(result).isFailed().extracting(ServiceFailure::getReason).isEqualTo(UNAUTHORIZED); + verify(store, never()).save(any()); + verifyNoInteractions(listener); + } + private TransferProcess transferProcess(TransferProcessStates state, String id) { return TransferProcess.Builder.newInstance() .state(state.code()) @@ -513,6 +575,12 @@ private ClaimToken claimToken() { .build(); } + private TokenRepresentation tokenRepresentation() { + return TokenRepresentation.Builder.newInstance() + .token(UUID.randomUUID().toString()) + .build(); + } + private ContractAgreement contractAgreement() { return ContractAgreement.Builder.newInstance() .id("agreementId") @@ -532,7 +600,7 @@ private DataRequest dataRequest() { @FunctionalInterface private interface MethodCall { - ServiceResult call(TransferProcessProtocolService service, M message, ClaimToken token); + ServiceResult call(TransferProcessProtocolService service, M message, TokenRepresentation token); } private static class NotFoundArguments implements ArgumentsProvider { diff --git a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/DspHttpCoreExtension.java b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/DspHttpCoreExtension.java index ad6eea21e87..89500562822 100644 --- a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/DspHttpCoreExtension.java +++ b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/DspHttpCoreExtension.java @@ -44,7 +44,6 @@ import org.eclipse.edc.spi.iam.TokenDecorator; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.monitor.Monitor; -import org.eclipse.edc.spi.protocol.ProtocolWebhook; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; import org.eclipse.edc.spi.types.TypeManager; @@ -102,9 +101,6 @@ public class DspHttpCoreExtension implements ServiceExtension { @Inject private Monitor monitor; - @Inject - private ProtocolWebhook dspWebhookAddress; - @Inject private JsonObjectValidatorRegistry validatorRegistry; @@ -133,7 +129,7 @@ public DspHttpRemoteMessageDispatcher dspHttpRemoteMessageDispatcher(ServiceExte @Provider public DspRequestHandler dspRequestHandler() { - return new DspRequestHandlerImpl(monitor, dspWebhookAddress.url(), identityService, validatorRegistry, transformerRegistry); + return new DspRequestHandlerImpl(monitor, validatorRegistry, transformerRegistry); } @Provider diff --git a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/message/DspRequestHandlerImpl.java b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/message/DspRequestHandlerImpl.java index e566e6532a6..88972633a55 100644 --- a/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/message/DspRequestHandlerImpl.java +++ b/data-protocols/dsp/dsp-http-core/src/main/java/org/eclipse/edc/protocol/dsp/message/DspRequestHandlerImpl.java @@ -20,7 +20,6 @@ import org.eclipse.edc.protocol.dsp.spi.message.DspRequestHandler; import org.eclipse.edc.protocol.dsp.spi.message.GetDspRequest; import org.eclipse.edc.protocol.dsp.spi.message.PostDspRequest; -import org.eclipse.edc.spi.iam.IdentityService; import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.result.Result; @@ -38,16 +37,11 @@ public class DspRequestHandlerImpl implements DspRequestHandler { private final Monitor monitor; - private final String callbackAddress; - private final IdentityService identityService; private final JsonObjectValidatorRegistry validatorRegistry; private final TypeTransformerRegistry transformerRegistry; - public DspRequestHandlerImpl(Monitor monitor, String callbackAddress, IdentityService identityService, - JsonObjectValidatorRegistry validatorRegistry, TypeTransformerRegistry transformerRegistry) { + public DspRequestHandlerImpl(Monitor monitor, JsonObjectValidatorRegistry validatorRegistry, TypeTransformerRegistry transformerRegistry) { this.monitor = monitor; - this.callbackAddress = callbackAddress; - this.identityService = identityService; this.validatorRegistry = validatorRegistry; this.transformerRegistry = transformerRegistry; } @@ -57,14 +51,8 @@ public Response getResource(GetDspRequest request) { monitor.debug(() -> "DSP: Incoming resource request for %s id %s".formatted(request.getResultClass(), request.getId())); var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(request.getToken()).build(); - var claimTokenResult = identityService.verifyJwtToken(tokenRepresentation, callbackAddress); - if (claimTokenResult.failed()) { - monitor.debug(() -> "DSP: Unauthorized: %s".formatted(claimTokenResult.getFailureDetail())); - return type(request.getErrorType()).unauthorized(); - } - - var serviceResult = request.getServiceCall().apply(request.getId(), claimTokenResult.getContent()); + var serviceResult = request.getServiceCall().apply(request.getId(), tokenRepresentation); if (serviceResult.failed()) { monitor.debug(() -> "DSP: Service call failed: %s".formatted(serviceResult.getFailureDetail())); return type(request.getErrorType()).processId(request.getId()).from(serviceResult.getFailure()); @@ -90,12 +78,6 @@ public Response createResource(PostDspRequest request.getProcessId() != null ? ": " + request.getProcessId() : "")); var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(request.getToken()).build(); - var claimTokenResult = identityService.verifyJwtToken(tokenRepresentation, callbackAddress); - - if (claimTokenResult.failed()) { - monitor.debug(() -> "DSP: Unauthorized: %s".formatted(claimTokenResult.getFailureDetail())); - return type(request.getErrorType()).unauthorized(); - } var validation = validatorRegistry.validate(request.getExpectedMessageType(), request.getMessage()); if (validation.failed()) { @@ -116,7 +98,7 @@ public Response createResource(PostDspRequest return type(request.getErrorType()).badRequest(); } - var serviceResult = request.getServiceCall().apply(inputTransformation.getContent(), claimTokenResult.getContent()); + var serviceResult = request.getServiceCall().apply(inputTransformation.getContent(), tokenRepresentation); if (serviceResult.failed()) { monitor.debug(() -> "DSP: Service call failed: %s".formatted(serviceResult.getFailureDetail())); return type(request.getErrorType()).from(serviceResult.getFailure()); @@ -142,12 +124,6 @@ public Response updateResource(PostDspRequest request.getProcessId() != null ? ": " + request.getProcessId() : "")); var tokenRepresentation = TokenRepresentation.Builder.newInstance().token(request.getToken()).build(); - var claimTokenResult = identityService.verifyJwtToken(tokenRepresentation, callbackAddress); - - if (claimTokenResult.failed()) { - monitor.debug(() -> "DSP: Unauthorized: %s".formatted(claimTokenResult.getFailureDetail())); - return type(request.getErrorType()).processId(request.getProcessId()).unauthorized(); - } var validation = validatorRegistry.validate(request.getExpectedMessageType(), request.getMessage()); if (validation.failed()) { @@ -175,7 +151,7 @@ public Response updateResource(PostDspRequest } return request.getServiceCall() - .apply(inputTransformation.getContent(), claimTokenResult.getContent()) + .apply(inputTransformation.getContent(), tokenRepresentation) .map(it -> Response.ok().type(MediaType.APPLICATION_JSON_TYPE).build()) .orElse(failure -> { monitor.debug(() -> "DSP: Service call failed: %s".formatted(failure.getFailureDetail())); diff --git a/data-protocols/dsp/dsp-http-core/src/test/java/org/eclipse/edc/protocol/dsp/message/DspRequestHandlerImplTest.java b/data-protocols/dsp/dsp-http-core/src/test/java/org/eclipse/edc/protocol/dsp/message/DspRequestHandlerImplTest.java index 3e1811ca151..891f04702ce 100644 --- a/data-protocols/dsp/dsp-http-core/src/test/java/org/eclipse/edc/protocol/dsp/message/DspRequestHandlerImplTest.java +++ b/data-protocols/dsp/dsp-http-core/src/test/java/org/eclipse/edc/protocol/dsp/message/DspRequestHandlerImplTest.java @@ -19,7 +19,7 @@ import org.eclipse.edc.protocol.dsp.spi.message.GetDspRequest; import org.eclipse.edc.protocol.dsp.spi.message.PostDspRequest; import org.eclipse.edc.spi.iam.ClaimToken; -import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.result.ServiceResult; import org.eclipse.edc.spi.types.domain.message.ProcessRemoteMessage; @@ -42,7 +42,6 @@ import static org.eclipse.edc.protocol.dsp.type.DspPropertyAndTypeNames.DSPACE_PROPERTY_REASON; import static org.eclipse.edc.validator.spi.Violation.violation; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -51,23 +50,18 @@ class DspRequestHandlerImplTest { - private final String callbackAddress = "http://any"; - private final IdentityService identityService = mock(); private final JsonObjectValidatorRegistry validatorRegistry = mock(); private final TypeTransformerRegistry transformerRegistry = mock(); - private final DspRequestHandlerImpl handler = new DspRequestHandlerImpl(mock(), callbackAddress, identityService, - validatorRegistry, transformerRegistry); + private final DspRequestHandlerImpl handler = new DspRequestHandlerImpl(mock(), validatorRegistry, transformerRegistry); @Nested class GetResource { @Test void shouldSucceed() { - var claimToken = claimToken(); var content = new Object(); var resourceJson = Json.createObjectBuilder().build(); - BiFunction> serviceCall = (m, t) -> ServiceResult.success(content); - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken)); + BiFunction> serviceCall = (m, t) -> ServiceResult.success(content); when(transformerRegistry.transform(any(), any())).thenReturn(Result.success(resourceJson)); var request = GetDspRequest.Builder.newInstance(Object.class) .token("token") @@ -79,14 +73,12 @@ void shouldSucceed() { var result = handler.getResource(request); assertThat(result.getStatus()).isEqualTo(200); - verify(identityService).verifyJwtToken(argThat(t -> t.getToken().equals("token")), eq(callbackAddress)); verifyNoInteractions(validatorRegistry); } @Test void shouldFail_whenTokenIsNotValid() { - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.failure("error")); - var request = getDspRequestBuilder().errorType("errorType").build(); + var request = getDspRequestBuilder().errorType("errorType").serviceCall((m, t) -> ServiceResult.unauthorized("unauthorized")).build(); var result = handler.getResource(request); @@ -100,9 +92,7 @@ void shouldFail_whenTokenIsNotValid() { @Test void shouldFail_whenServiceCallFails() { - var claimToken = claimToken(); - BiFunction> serviceCall = (m, t) -> ServiceResult.notFound("error"); - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken)); + BiFunction> serviceCall = (m, t) -> ServiceResult.notFound("error"); var request = getDspRequestBuilder() .serviceCall(serviceCall) .build(); @@ -114,8 +104,6 @@ void shouldFail_whenServiceCallFails() { @Test void shouldFail_whenTransformationFails() { - var claimToken = claimToken(); - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken)); when(transformerRegistry.transform(any(), any())).thenReturn(Result.failure("error")); var request = getDspRequestBuilder().build(); @@ -137,13 +125,11 @@ private GetDspRequest.Builder getDspRequestBuilder() { class CreateResource { @Test void shouldSucceed() { - var claimToken = claimToken(); var jsonMessage = Json.createObjectBuilder().build(); var message = mock(TestMessage.class); var content = new Object(); var responseJson = Json.createObjectBuilder().build(); - BiFunction> serviceCall = (m, t) -> ServiceResult.success(content); - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken)); + BiFunction> serviceCall = (m, t) -> ServiceResult.success(content); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); when(transformerRegistry.transform(any(), eq(TestMessage.class))).thenReturn(Result.success(message)); when(transformerRegistry.transform(any(), eq(JsonObject.class))).thenReturn(Result.success(responseJson)); @@ -160,7 +146,6 @@ void shouldSucceed() { assertThat(result.getStatus()).isEqualTo(200); assertThat(result.getEntity()).isEqualTo(responseJson); assertThat(result.getMediaType()).isEqualTo(APPLICATION_JSON_TYPE); - verify(identityService).verifyJwtToken(argThat(t -> t.getToken().equals("token")), eq(callbackAddress)); verify(validatorRegistry).validate("expected-message-type", jsonMessage); verify(transformerRegistry).transform(jsonMessage, TestMessage.class); verify(message).setProtocol(DATASPACE_PROTOCOL_HTTP); @@ -169,8 +154,11 @@ void shouldSucceed() { @Test void shouldFail_whenTokenIsNotValid() { - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.failure("error")); - var request = postDspRequestBuilder().errorType("errorType").build(); + var request = postDspRequestBuilder().errorType("errorType").serviceCall((m, t) -> ServiceResult.unauthorized("unauthorized")).build(); + var message = mock(TestMessage.class); + + when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); + when(transformerRegistry.transform(any(), eq(TestMessage.class))).thenReturn(Result.success(message)); var result = handler.createResource(request); @@ -184,7 +172,6 @@ void shouldFail_whenTokenIsNotValid() { @Test void shouldFail_whenValidationFails() { - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken())); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.failure(violation("error", "path"))); var request = postDspRequestBuilder().build(); @@ -195,7 +182,6 @@ void shouldFail_whenValidationFails() { @Test void shouldFail_whenTransformationFails() { - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken())); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); when(transformerRegistry.transform(any(), any())).thenReturn(Result.failure("error")); var request = postDspRequestBuilder().build(); @@ -207,10 +193,8 @@ void shouldFail_whenTransformationFails() { @Test void shouldFail_whenServiceCallFails() { - var claimToken = claimToken(); var message = mock(TestMessage.class); - BiFunction> serviceCall = (m, t) -> ServiceResult.conflict("error"); - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken)); + BiFunction> serviceCall = (m, t) -> ServiceResult.conflict("error"); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); when(transformerRegistry.transform(any(), any())).thenReturn(Result.success(message)); var request = postDspRequestBuilder().serviceCall(serviceCall).build(); @@ -222,9 +206,7 @@ void shouldFail_whenServiceCallFails() { @Test void shouldReturnInternalServerError_whenOutputTransformationFails() { - var claimToken = claimToken(); var message = mock(TestMessage.class); - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken)); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); when(transformerRegistry.transform(any(), eq(TestMessage.class))).thenReturn(Result.success(message)); when(transformerRegistry.transform(any(), eq(JsonObject.class))).thenReturn(Result.failure("error")); @@ -249,12 +231,10 @@ private PostDspRequest.Builder postDspRequestBuilder() { class UpdateResource { @Test void shouldSucceed() { - var claimToken = claimToken(); var jsonMessage = Json.createObjectBuilder().build(); var message = mock(TestMessage.class); var content = new Object(); - BiFunction> serviceCall = (m, t) -> ServiceResult.success(content); - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken)); + BiFunction> serviceCall = (m, t) -> ServiceResult.success(content); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); when(transformerRegistry.transform(any(), eq(TestMessage.class))).thenReturn(Result.success(message)); var request = PostDspRequest.Builder.newInstance(TestMessage.class, Object.class) @@ -269,7 +249,6 @@ void shouldSucceed() { assertThat(result.getStatus()).isEqualTo(200); assertThat(result.getMediaType()).isEqualTo(APPLICATION_JSON_TYPE); - verify(identityService).verifyJwtToken(argThat(t -> t.getToken().equals("token")), eq(callbackAddress)); verify(validatorRegistry).validate("expected-message-type", jsonMessage); verify(transformerRegistry).transform(jsonMessage, TestMessage.class); verify(message).setProtocol(DATASPACE_PROTOCOL_HTTP); @@ -277,8 +256,18 @@ void shouldSucceed() { @Test void shouldFail_whenTokenIsNotValid() { - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.failure("error")); - var request = postDspRequestBuilder().processId("processId").errorType("errorType").build(); + var jsonMessage = Json.createObjectBuilder().build(); + var request = postDspRequestBuilder() + .processId("processId") + .errorType("errorType") + .message(jsonMessage) + .serviceCall((m, t) -> ServiceResult.unauthorized("unauthorized")) + .build(); + var message = mock(TestMessage.class); + + when(message.getProcessId()).thenReturn("processId"); + when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); + when(transformerRegistry.transform(any(), eq(TestMessage.class))).thenReturn(Result.success(message)); var result = handler.updateResource(request); @@ -293,7 +282,6 @@ void shouldFail_whenTokenIsNotValid() { @Test void shouldFail_whenValidationFails() { - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken())); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.failure(violation("error", "path"))); var request = postDspRequestBuilder().processId("processId").errorType("errorType").build(); @@ -310,7 +298,6 @@ void shouldFail_whenValidationFails() { @Test void shouldFail_whenTransformationFails() { - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken())); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); when(transformerRegistry.transform(any(), any())).thenReturn(Result.failure("error")); var request = postDspRequestBuilder().processId("processId").errorType("errorType").build(); @@ -328,9 +315,7 @@ void shouldFail_whenTransformationFails() { @Test void shouldFail_whenIdIsNotValid() { - var claimToken = claimToken(); var message = mock(TestMessage.class); - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken)); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); when(transformerRegistry.transform(any(), any())).thenReturn(Result.success(message)); when(message.getProcessId()).thenReturn("processId"); @@ -349,10 +334,8 @@ void shouldFail_whenIdIsNotValid() { @Test void shouldFail_whenServiceCallFails() { - var claimToken = claimToken(); var message = mock(TestMessage.class); - BiFunction> serviceCall = (m, t) -> ServiceResult.conflict("error"); - when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(claimToken)); + BiFunction> serviceCall = (m, t) -> ServiceResult.conflict("error"); when(validatorRegistry.validate(any(), any())).thenReturn(ValidationResult.success()); when(transformerRegistry.transform(any(), any())).thenReturn(Result.success(message)); when(message.getProcessId()).thenReturn("processId"); diff --git a/data-protocols/dsp/dsp-http-spi/src/main/java/org/eclipse/edc/protocol/dsp/spi/message/DspRequest.java b/data-protocols/dsp/dsp-http-spi/src/main/java/org/eclipse/edc/protocol/dsp/spi/message/DspRequest.java index 042d50c6368..5339c651843 100644 --- a/data-protocols/dsp/dsp-http-spi/src/main/java/org/eclipse/edc/protocol/dsp/spi/message/DspRequest.java +++ b/data-protocols/dsp/dsp-http-spi/src/main/java/org/eclipse/edc/protocol/dsp/spi/message/DspRequest.java @@ -14,7 +14,7 @@ package org.eclipse.edc.protocol.dsp.spi.message; -import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.result.ServiceResult; import java.util.function.BiFunction; @@ -27,7 +27,7 @@ public class DspRequest { protected final Class inputClass; protected String token; protected String errorType; - protected BiFunction> serviceCall; + protected BiFunction> serviceCall; public DspRequest(Class inputClass, Class resultClass) { this.inputClass = inputClass; @@ -46,7 +46,7 @@ public Class getResultClass() { return resultClass; } - public BiFunction> getServiceCall() { + public BiFunction> getServiceCall() { return serviceCall; } @@ -67,7 +67,7 @@ public B token(String token) { return self(); } - public B serviceCall(BiFunction> serviceCall) { + public B serviceCall(BiFunction> serviceCall) { message.serviceCall = serviceCall; return self(); } diff --git a/data-protocols/dsp/dsp-negotiation/dsp-negotiation-api/src/main/java/org/eclipse/edc/protocol/dsp/negotiation/api/controller/DspNegotiationApiController.java b/data-protocols/dsp/dsp-negotiation/dsp-negotiation-api/src/main/java/org/eclipse/edc/protocol/dsp/negotiation/api/controller/DspNegotiationApiController.java index 7f3a1cf6344..a13bb9e058b 100644 --- a/data-protocols/dsp/dsp-negotiation/dsp-negotiation-api/src/main/java/org/eclipse/edc/protocol/dsp/negotiation/api/controller/DspNegotiationApiController.java +++ b/data-protocols/dsp/dsp-negotiation/dsp-negotiation-api/src/main/java/org/eclipse/edc/protocol/dsp/negotiation/api/controller/DspNegotiationApiController.java @@ -35,7 +35,7 @@ import org.eclipse.edc.protocol.dsp.spi.message.DspRequestHandler; import org.eclipse.edc.protocol.dsp.spi.message.GetDspRequest; import org.eclipse.edc.protocol.dsp.spi.message.PostDspRequest; -import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.result.ServiceResult; import org.eclipse.edc.transform.spi.TypeTransformerRegistry; @@ -279,11 +279,11 @@ public Response createAgreement(@PathParam("id") String id, } @NotNull - private ServiceResult validateAndProcessRequest(ContractRequestMessage message, ClaimToken claimToken) { + private ServiceResult validateAndProcessRequest(ContractRequestMessage message, TokenRepresentation tokenRepresentation) { if (message.getCallbackAddress() == null) { throw new InvalidRequestException(format("ContractRequestMessage must contain a '%s' property", DSPACE_PROPERTY_CALLBACK_ADDRESS)); } - return protocolService.notifyRequested(message, claimToken); + return protocolService.notifyRequested(message, tokenRepresentation); } } diff --git a/extensions/common/iam/iam-mock/src/main/java/org/eclipse/edc/iam/mock/MockIdentityService.java b/extensions/common/iam/iam-mock/src/main/java/org/eclipse/edc/iam/mock/MockIdentityService.java index f9c8ba97a8f..f0f0c793eca 100644 --- a/extensions/common/iam/iam-mock/src/main/java/org/eclipse/edc/iam/mock/MockIdentityService.java +++ b/extensions/common/iam/iam-mock/src/main/java/org/eclipse/edc/iam/mock/MockIdentityService.java @@ -23,10 +23,6 @@ import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.types.TypeManager; -import java.util.Objects; - -import static java.lang.String.format; - public class MockIdentityService implements IdentityService { private final String region; private final TypeManager typeManager; @@ -53,9 +49,6 @@ public Result obtainClientCredentials(TokenParameters param @Override public Result verifyJwtToken(TokenRepresentation tokenRepresentation, String audience) { var token = typeManager.readValue(tokenRepresentation.getToken(), MockToken.class); - if (!Objects.equals(token.audience, audience)) { - return Result.failure(format("Mismatched audience: expected %s, got %s", audience, token.audience)); - } return Result.success(ClaimToken.Builder.newInstance() .claim("region", token.region) .claim("client_id", token.clientId) diff --git a/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java b/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java index 9f64fdb5e60..6943c421a84 100644 --- a/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java +++ b/extensions/control-plane/api/control-plane-api-client/src/test/java/org/eclipse/edc/connector/api/client/transferprocess/TransferProcessHttpClientIntegrationTest.java @@ -30,6 +30,7 @@ import org.eclipse.edc.runtime.metamodel.annotation.Inject; import org.eclipse.edc.spi.EdcException; import org.eclipse.edc.spi.entity.StatefulEntity; +import org.eclipse.edc.spi.iam.IdentityService; import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.protocol.ProtocolWebhook; import org.eclipse.edc.spi.response.StatusResult; @@ -79,6 +80,7 @@ void setUp(EdcExtension extension) { extension.registerSystemExtension(ServiceExtension.class, new TransferServiceMockExtension(service)); extension.registerServiceMock(ProtocolWebhook.class, mock()); + extension.registerServiceMock(IdentityService.class, mock()); var registry = mock(RemoteMessageDispatcherRegistry.class); when(registry.dispatch(any(), any())).thenReturn(completedFuture(StatusResult.success("any"))); extension.registerServiceMock(RemoteMessageDispatcherRegistry.class, registry); diff --git a/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/provision/http/impl/HttpProvisionerExtensionEndToEndTest.java b/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/provision/http/impl/HttpProvisionerExtensionEndToEndTest.java index 7f62bdf773a..af3f32ba4d7 100644 --- a/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/provision/http/impl/HttpProvisionerExtensionEndToEndTest.java +++ b/extensions/control-plane/provision/provision-http/src/test/java/org/eclipse/edc/connector/provision/http/impl/HttpProvisionerExtensionEndToEndTest.java @@ -34,6 +34,8 @@ import org.eclipse.edc.spi.entity.StatefulEntity; import org.eclipse.edc.spi.http.EdcHttpClient; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.IdentityService; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.protocol.ProtocolWebhook; import org.eclipse.edc.spi.result.Result; import org.eclipse.edc.spi.system.ServiceExtension; @@ -78,6 +80,7 @@ public class HttpProvisionerExtensionEndToEndTest { private final int dataPort = getFreePort(); private final Interceptor delegate = mock(Interceptor.class); private final ContractValidationService contractValidationService = mock(); + private final IdentityService identityService = mock(); @BeforeEach void setup(EdcExtension extension) { @@ -92,6 +95,7 @@ void setup(EdcExtension extension) { extension.registerServiceMock(EdcHttpClient.class, testHttpClient(delegate)); extension.registerServiceMock(ContractValidationService.class, contractValidationService); extension.registerServiceMock(ProtocolWebhook.class, mock(ProtocolWebhook.class)); + extension.registerServiceMock(IdentityService.class, identityService); var dataAddressValidatorRegistry = mock(DataAddressValidatorRegistry.class); when(dataAddressValidatorRegistry.validateSource(any())).thenReturn(ValidationResult.success()); when(dataAddressValidatorRegistry.validateDestination(any())).thenReturn(ValidationResult.success()); @@ -121,7 +125,9 @@ void processProviderRequestRetry(TransferProcessProtocolService protocolService, .thenAnswer(invocation -> createResponse(503, invocation)) .thenAnswer(invocation -> createResponse(200, invocation)); - var result = protocolService.notifyRequested(createTransferRequestMessage(), ClaimToken.Builder.newInstance().build()); + when(identityService.verifyJwtToken(any(), any())).thenReturn(Result.success(ClaimToken.Builder.newInstance().build())); + + var result = protocolService.notifyRequested(createTransferRequestMessage(), TokenRepresentation.Builder.newInstance().build()); assertThat(result).isSucceeded(); await().untilAsserted(() -> { diff --git a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/catalog/CatalogProtocolService.java b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/catalog/CatalogProtocolService.java index 368accf99dd..40e82054915 100644 --- a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/catalog/CatalogProtocolService.java +++ b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/catalog/CatalogProtocolService.java @@ -18,6 +18,7 @@ import org.eclipse.edc.catalog.spi.CatalogRequestMessage; import org.eclipse.edc.catalog.spi.Dataset; import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.result.ServiceResult; import org.jetbrains.annotations.NotNull; @@ -29,20 +30,20 @@ public interface CatalogProtocolService { /** * Returns a catalog given a {@link CatalogRequestMessage} and a {@link ClaimToken} * - * @param message the request message. - * @param token the claim token. + * @param message the request message. + * @param tokenRepresentation the claim token. * @return succeeded result with the {@link Catalog}, failed result otherwise. */ @NotNull - ServiceResult getCatalog(CatalogRequestMessage message, ClaimToken token); + ServiceResult getCatalog(CatalogRequestMessage message, TokenRepresentation tokenRepresentation); /** * Returns a dataset given its id and a {@link ClaimToken} * - * @param datasetId the dataset id. - * @param claimToken the claim token. + * @param datasetId the dataset id. + * @param tokenRepresentation the claim token. * @return succeeded result with the {@link Dataset}, failed result otherwise. */ @NotNull - ServiceResult getDataset(String datasetId, ClaimToken claimToken); + ServiceResult getDataset(String datasetId, TokenRepresentation tokenRepresentation); } diff --git a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/contractnegotiation/ContractNegotiationProtocolService.java b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/contractnegotiation/ContractNegotiationProtocolService.java index 4ed0c901333..f696b1b820e 100644 --- a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/contractnegotiation/ContractNegotiationProtocolService.java +++ b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/contractnegotiation/ContractNegotiationProtocolService.java @@ -21,7 +21,7 @@ import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationTerminationMessage; import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractOfferMessage; import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestMessage; -import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.result.ServiceResult; import org.jetbrains.annotations.NotNull; @@ -34,86 +34,86 @@ public interface ContractNegotiationProtocolService { * Notifies the ContractNegotiation that it has been requested by the consumer. * Only callable on provider ContractNegotiation. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyRequested(ContractRequestMessage message, ClaimToken claimToken); + ServiceResult notifyRequested(ContractRequestMessage message, TokenRepresentation tokenRepresentation); /** * Notifies the ContractNegotiation that it has been offered by the provider. * Only callable on consumer ContractNegotiation. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyOffered(ContractOfferMessage message, ClaimToken claimToken); + ServiceResult notifyOffered(ContractOfferMessage message, TokenRepresentation tokenRepresentation); /** * Notifies the ContractNegotiation that it has been agreed by the accepted. * Only callable on provider ContractNegotiation. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyAccepted(ContractNegotiationEventMessage message, ClaimToken claimToken); + ServiceResult notifyAccepted(ContractNegotiationEventMessage message, TokenRepresentation tokenRepresentation); /** * Notifies the ContractNegotiation that it has been agreed by the provider. * Only callable on consumer ContractNegotiation. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyAgreed(ContractAgreementMessage message, ClaimToken claimToken); + ServiceResult notifyAgreed(ContractAgreementMessage message, TokenRepresentation tokenRepresentation); /** * Notifies the ContractNegotiation that it has been verified by the consumer. * Only callable on provider ContractNegotiation. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyVerified(ContractAgreementVerificationMessage message, ClaimToken claimToken); + ServiceResult notifyVerified(ContractAgreementVerificationMessage message, TokenRepresentation tokenRepresentation); /** * Notifies the ContractNegotiation that it has been finalized by the provider. * Only callable on consumer ContractNegotiation. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyFinalized(ContractNegotiationEventMessage message, ClaimToken claimToken); + ServiceResult notifyFinalized(ContractNegotiationEventMessage message, TokenRepresentation tokenRepresentation); /** * Notifies the ContractNegotiation that it has been terminated by the counter-part. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyTerminated(ContractNegotiationTerminationMessage message, ClaimToken claimToken); + ServiceResult notifyTerminated(ContractNegotiationTerminationMessage message, TokenRepresentation tokenRepresentation); /** * Finds a contract negotiation that has been requested by the counter-part. An existing * negotiation, for which the counter-part is not authorized, is treated as non-existent. * - * @param id id of the negotiation - * @param claimToken the counter-party claim token + * @param id id of the negotiation + * @param tokenRepresentation the counter-party claim token * @return a succeeded result containing the negotiation if it was found, a failed one otherwise */ @NotNull - ServiceResult findById(String id, ClaimToken claimToken); + ServiceResult findById(String id, TokenRepresentation tokenRepresentation); } diff --git a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/transferprocess/TransferProcessProtocolService.java b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/transferprocess/TransferProcessProtocolService.java index 5339f90c0b7..6f493b92d72 100644 --- a/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/transferprocess/TransferProcessProtocolService.java +++ b/spi/control-plane/control-plane-spi/src/main/java/org/eclipse/edc/connector/spi/transferprocess/TransferProcessProtocolService.java @@ -19,7 +19,7 @@ import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferRequestMessage; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferStartMessage; import org.eclipse.edc.connector.transfer.spi.types.protocol.TransferTerminationMessage; -import org.eclipse.edc.spi.iam.ClaimToken; +import org.eclipse.edc.spi.iam.TokenRepresentation; import org.eclipse.edc.spi.result.ServiceResult; import org.jetbrains.annotations.NotNull; @@ -31,51 +31,51 @@ public interface TransferProcessProtocolService { /** * Notifies the TransferProcess that it has been requested by the counter-part. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyRequested(TransferRequestMessage message, ClaimToken claimToken); + ServiceResult notifyRequested(TransferRequestMessage message, TokenRepresentation tokenRepresentation); /** * Notifies the TransferProcess that it has been started by the counter-part. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyStarted(TransferStartMessage message, ClaimToken claimToken); + ServiceResult notifyStarted(TransferStartMessage message, TokenRepresentation tokenRepresentation); /** * Notifies the TransferProcess that it has been completed by the counter-part. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyCompleted(TransferCompletionMessage message, ClaimToken claimToken); + ServiceResult notifyCompleted(TransferCompletionMessage message, TokenRepresentation tokenRepresentation); /** * Notifies the TransferProcess that it has been terminated by the counter-part. * - * @param message the incoming message - * @param claimToken the counter-party claim token + * @param message the incoming message + * @param tokenRepresentation the counter-party claim token * @return a succeeded result if the operation was successful, a failed one otherwise */ @NotNull - ServiceResult notifyTerminated(TransferTerminationMessage message, ClaimToken claimToken); + ServiceResult notifyTerminated(TransferTerminationMessage message, TokenRepresentation tokenRepresentation); /** * Finds a transfer process that has been requested by the counter-part. An existing * process, for which the counter-part is not authorized, is treated as non-existent. * - * @param id id of the transfer process - * @param claimToken the counter-party claim token + * @param id id of the transfer process + * @param tokenRepresentation the counter-party claim token * @return a succeeded result containing the transfer process if it was found, a failed one otherwise */ @NotNull - ServiceResult findById(String id, ClaimToken claimToken); + ServiceResult findById(String id, TokenRepresentation tokenRepresentation); }