Skip to content

Commit

Permalink
refactor: avoid illegal status in state machines (#3438)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt authored Sep 12, 2023
1 parent 8cac269 commit 1d2ca31
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@

package org.eclipse.edc.connector.contract.negotiation;

import io.opentelemetry.instrumentation.annotations.WithSpan;
import org.eclipse.edc.connector.contract.spi.negotiation.ContractNegotiationPendingGuard;
import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationObservable;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationTerminationMessage;
import org.eclipse.edc.connector.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.monitor.Monitor;
Expand All @@ -39,6 +41,7 @@
import java.util.Objects;
import java.util.function.Function;

import static java.lang.String.format;
import static org.eclipse.edc.connector.contract.ContractCoreExtension.DEFAULT_BATCH_SIZE;
import static org.eclipse.edc.connector.contract.ContractCoreExtension.DEFAULT_ITERATION_WAIT;
import static org.eclipse.edc.connector.contract.ContractCoreExtension.DEFAULT_SEND_RETRY_BASE_DELAY;
Expand Down Expand Up @@ -80,6 +83,32 @@ private boolean setPending(ContractNegotiation contractNegotiation) {
return true;
}

/**
* Processes {@link ContractNegotiation} in state TERMINATING. Tries to send a contract termination to the counter
* party. If this succeeds, the ContractNegotiation is transitioned to state TERMINATED. Else, it is transitioned
* to TERMINATING for a retry.
*
* @return true if processed, false elsewhere
*/
@WithSpan
protected boolean processTerminating(ContractNegotiation negotiation) {
var termination = ContractNegotiationTerminationMessage.Builder.newInstance()
.protocol(negotiation.getProtocol())
.counterPartyAddress(negotiation.getCounterPartyAddress())
.processId(negotiation.getCorrelationId())
.rejectionReason(negotiation.getErrorDetail())
.policy(negotiation.getLastContractOffer().getPolicy())
.build();

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(Object.class, termination))
.entityRetrieve(negotiationStore::findById)
.onSuccess((n, result) -> transitionToTerminated(n))
.onFailure((n, throwable) -> transitionToTerminating(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminated(n, format("Failed to send %s to counter party: %s", termination.getClass().getSimpleName(), throwable.getMessage())))
.execute("[%s] send termination".formatted(type().name()));
}

protected void transitionToInitial(ContractNegotiation negotiation) {
negotiation.transitionInitial();
update(negotiation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementMessage;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementVerificationMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationTerminationMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequest;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractRequestMessage;
import org.eclipse.edc.spi.response.StatusResult;
Expand Down Expand Up @@ -129,7 +128,7 @@ private boolean processRequesting(ContractNegotiation negotiation) {
.counterPartyAddress(negotiation.getCounterPartyAddress())
.callbackAddress(protocolWebhook.url())
.protocol(negotiation.getProtocol())
.processId(negotiation.getId())
.processId(negotiation.getCorrelationId())
.type(ContractRequestMessage.Type.INITIAL)
.build();

Expand Down Expand Up @@ -167,7 +166,7 @@ private boolean processAccepting(ContractNegotiation negotiation) {
.protocol(negotiation.getProtocol())
.counterPartyAddress(negotiation.getCounterPartyAddress())
.contractAgreement(agreement)
.processId(negotiation.getId())
.processId(negotiation.getCorrelationId())
.build();

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(Object.class, request))
Expand Down Expand Up @@ -202,7 +201,7 @@ private boolean processVerifying(ContractNegotiation negotiation) {
var message = ContractAgreementVerificationMessage.Builder.newInstance()
.protocol(negotiation.getProtocol())
.counterPartyAddress(negotiation.getCounterPartyAddress())
.processId(negotiation.getId())
.processId(negotiation.getCorrelationId())
.policy(negotiation.getContractAgreement().getPolicy())
.build();

Expand All @@ -215,32 +214,6 @@ private boolean processVerifying(ContractNegotiation negotiation) {
.execute(format("[consumer] send %s", message.getClass().getSimpleName()));
}

/**
* Processes {@link ContractNegotiation} in state TERMINATING. Tries to send a contract rejection to the respective
* provider. If this succeeds, the ContractNegotiation is transitioned to state TERMINATED. Else, it is transitioned
* to TERMINATING for a retry.
*
* @return true if processed, false otherwise
*/
@WithSpan
private boolean processTerminating(ContractNegotiation negotiation) {
var rejection = ContractNegotiationTerminationMessage.Builder.newInstance()
.protocol(negotiation.getProtocol())
.counterPartyAddress(negotiation.getCounterPartyAddress())
.processId(negotiation.getId())
.rejectionReason(negotiation.getErrorDetail())
.policy(negotiation.getLastContractOffer().getPolicy())
.build();

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(Object.class, rejection))
.entityRetrieve(negotiationStore::findById)
.onSuccess((n, result) -> transitionToTerminated(n))
.onFailure((n, throwable) -> transitionToTerminating(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminated(n, format("Failed to send %s to provider: %s", rejection.getClass().getSimpleName(), throwable.getMessage())))
.execute("[Consumer] send rejection");
}

/**
* Builder for ConsumerContractNegotiationManagerImpl.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementMessage;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractNegotiationEventMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationTerminationMessage;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractOfferMessage;
import org.eclipse.edc.statemachine.StateMachineManager;

Expand All @@ -42,6 +41,7 @@
* Implementation of the {@link ProviderContractNegotiationManager}.
*/
public class ProviderContractNegotiationManagerImpl extends AbstractContractNegotiationManager implements ProviderContractNegotiationManager {

private StateMachineManager stateMachineManager;

private ProviderContractNegotiationManagerImpl() {
Expand Down Expand Up @@ -99,32 +99,6 @@ private boolean processOffering(ContractNegotiation negotiation) {
.execute("[Provider] send counter offer");
}

/**
* Processes {@link ContractNegotiation} in state TERMINATING. Tries to send a contract rejection to the respective
* consumer. If this succeeds, the ContractNegotiation is transitioned to state TERMINATED. Else, it is transitioned
* to TERMINATING for a retry.
*
* @return true if processed, false elsewhere
*/
@WithSpan
private boolean processTerminating(ContractNegotiation negotiation) {
var rejection = ContractNegotiationTerminationMessage.Builder.newInstance()
.protocol(negotiation.getProtocol())
.counterPartyAddress(negotiation.getCounterPartyAddress())
.processId(negotiation.getCorrelationId())
.rejectionReason(negotiation.getErrorDetail())
.policy(negotiation.getLastContractOffer().getPolicy())
.build();

return entityRetryProcessFactory.doAsyncStatusResultProcess(negotiation, () -> dispatcherRegistry.dispatch(Object.class, rejection))
.entityRetrieve(negotiationStore::findById)
.onSuccess((n, result) -> transitionToTerminated(n))
.onFailure((n, throwable) -> transitionToTerminating(n))
.onFatalError((n, failure) -> transitionToTerminated(n, failure.getFailureDetail()))
.onRetryExhausted((n, throwable) -> transitionToTerminated(n, format("Failed to send %s to consumer: %s", rejection.getClass().getSimpleName(), throwable.getMessage())))
.execute("[Provider] send rejection");
}

/**
* Processes {@link ContractNegotiation} in state REQUESTED. It transitions to AGREEING, because the automatic agreement.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.edc.connector.policy.spi.store.PolicyArchive;
import org.eclipse.edc.connector.transfer.provision.DeprovisionResponsesHandler;
import org.eclipse.edc.connector.transfer.provision.ProvisionResponsesHandler;
import org.eclipse.edc.connector.transfer.provision.ResponsesHandler;
import org.eclipse.edc.connector.transfer.spi.TransferProcessManager;
import org.eclipse.edc.connector.transfer.spi.TransferProcessPendingGuard;
import org.eclipse.edc.connector.transfer.spi.flow.DataFlowManager;
Expand All @@ -31,8 +32,6 @@
import org.eclipse.edc.connector.transfer.spi.store.TransferProcessStore;
import org.eclipse.edc.connector.transfer.spi.types.DataFlowResponse;
import org.eclipse.edc.connector.transfer.spi.types.DataRequest;
import org.eclipse.edc.connector.transfer.spi.types.DeprovisionedResource;
import org.eclipse.edc.connector.transfer.spi.types.ProvisionResponse;
import org.eclipse.edc.connector.transfer.spi.types.ResourceManifest;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcess;
import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates;
Expand Down Expand Up @@ -144,9 +143,9 @@ public void start() {
.processor(processTransfersInState(INITIAL, this::processInitial))
.processor(processTransfersInState(PROVISIONING, this::processProvisioning))
.processor(processTransfersInState(PROVISIONED, this::processProvisioned))
.processor(processTransfersInState(REQUESTING, this::processRequesting))
.processor(processTransfersInState(STARTING, this::processStarting))
.processor(processTransfersInState(STARTED, this::processStarted))
.processor(processConsumerTransfersInState(REQUESTING, this::processRequesting))
.processor(processProviderTransfersInState(STARTING, this::processStarting))
.processor(processConsumerTransfersInState(STARTED, this::processStarted))
.processor(processTransfersInState(COMPLETING, this::processCompleting))
.processor(processTransfersInState(TERMINATING, this::processTerminating))
.processor(processTransfersInState(DEPROVISIONING, this::processDeprovisioning))
Expand Down Expand Up @@ -202,24 +201,6 @@ public StatusResult<TransferProcess> initiateConsumerRequest(TransferRequest tra
return StatusResult.success(process);
}

private void handleProvisionResult(TransferProcess transferProcess, List<StatusResult<ProvisionResponse>> responses) {
if (provisionResponsesHandler.handle(transferProcess, responses)) {
update(transferProcess);
provisionResponsesHandler.postActions(transferProcess);
} else {
breakLease(transferProcess);
}
}

private void handleDeprovisionResult(TransferProcess transferProcess, List<StatusResult<DeprovisionedResource>> responses) {
if (deprovisionResponsesHandler.handle(transferProcess, responses)) {
update(transferProcess);
deprovisionResponsesHandler.postActions(transferProcess);
} else {
breakLease(transferProcess);
}
}

/**
* Process INITIAL transfer<p> set it to PROVISIONING
*
Expand Down Expand Up @@ -288,7 +269,7 @@ private boolean processProvisioning(TransferProcess process) {

return entityRetryProcessFactory.doAsyncProcess(process, () -> provisionManager.provision(resources, policy))
.entityRetrieve(transferProcessStore::findById)
.onSuccess(this::handleProvisionResult)
.onSuccess((transferProcess, responses) -> handleResult(transferProcess, responses, provisionResponsesHandler))
.onFailure((t, throwable) -> transitionToProvisioning(t))
.onRetryExhausted((t, throwable) -> {
if (t.getType() == PROVIDER) {
Expand Down Expand Up @@ -324,10 +305,6 @@ private boolean processProvisioned(TransferProcess process) {
*/
@WithSpan
private boolean processRequesting(TransferProcess process) {
if (process.getType() == PROVIDER) {
return false; // should never happen: a provider transfer cannot be REQUESTING
}

var originalDestination = process.getDataDestination();
var dataDestination = Optional.ofNullable(originalDestination.getKeyName())
.flatMap(key -> Optional.ofNullable(vault.resolveSecret(key)))
Expand Down Expand Up @@ -362,11 +339,6 @@ private boolean processRequesting(TransferProcess process) {
*/
@WithSpan
private boolean processStarting(TransferProcess process) {
if (CONSUMER == process.getType()) {
// should never happen: a consumer transfer cannot be STARTING
return false;
}

var policy = policyArchive.findPolicyForContract(process.getContractId());

var description = "Initiate data flow";
Expand Down Expand Up @@ -409,10 +381,6 @@ private void sendTransferStartMessage(TransferProcess process, DataFlowResponse
*/
@WithSpan
private boolean processStarted(TransferProcess transferProcess) {
if (transferProcess.getType() != CONSUMER) {
return false;
}

return entityRetryProcessFactory.doSimpleProcess(transferProcess, () -> checkCompletion(transferProcess))
.execute("Check completion");
}
Expand Down Expand Up @@ -508,14 +476,37 @@ private boolean processDeprovisioning(TransferProcess process) {

return entityRetryProcessFactory.doAsyncProcess(process, () -> provisionManager.deprovision(resourcesToDeprovision, policy))
.entityRetrieve(transferProcessStore::findById)
.onSuccess(this::handleDeprovisionResult)
.onSuccess((transferProcess, responses) -> handleResult(transferProcess, responses, deprovisionResponsesHandler))
.onFailure((t, throwable) -> transitionToDeprovisioning(t))
.onRetryExhausted((t, throwable) -> transitionToDeprovisioningError(t, throwable.getMessage()))
.execute("deprovisioning");
}

private <T> void handleResult(TransferProcess transferProcess, List<StatusResult<T>> responses, ResponsesHandler<StatusResult<T>> handler) {
if (handler.handle(transferProcess, responses)) {
update(transferProcess);
handler.postActions(transferProcess);
} else {
breakLease(transferProcess);
}
}

private Processor processConsumerTransfersInState(TransferProcessStates state, Function<TransferProcess, Boolean> function) {
var filter = new Criterion[]{ hasState(state.code()), isNotPending(), Criterion.criterion("type", "=", CONSUMER.name()) };
return createProcessor(function, filter);
}

private Processor processProviderTransfersInState(TransferProcessStates state, Function<TransferProcess, Boolean> function) {
var filter = new Criterion[]{ hasState(state.code()), isNotPending(), Criterion.criterion("type", "=", PROVIDER.name()) };
return createProcessor(function, filter);
}

private Processor processTransfersInState(TransferProcessStates state, Function<TransferProcess, Boolean> function) {
var filter = new Criterion[]{ hasState(state.code()), isNotPending() };
return createProcessor(function, filter);
}

private ProcessorImpl<TransferProcess> createProcessor(Function<TransferProcess, Boolean> function, Criterion[] filter) {
return ProcessorImpl.Builder.newInstance(() -> transferProcessStore.nextNotLeased(batchSize, filter))
.process(telemetry.contextPropagationMiddleware(function))
.guard(pendingGuard, this::setPending)
Expand Down
Loading

0 comments on commit 1d2ca31

Please sign in to comment.