Skip to content

Commit

Permalink
feat: make state transitions sync (#3313)
Browse files Browse the repository at this point in the history
* feat: make state transitions sync

* PR remarks

* Add decision record

* PR remarks
  • Loading branch information
ndr-brt authored Jul 20, 2023
1 parent b691e01 commit b6c5f4e
Show file tree
Hide file tree
Showing 125 changed files with 2,885 additions and 2,557 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,40 @@

package org.eclipse.edc.connector.core.base;

import org.eclipse.edc.spi.command.Command;
import org.eclipse.edc.spi.command.CommandHandler;
import org.eclipse.edc.spi.command.CommandHandlerRegistry;
import org.eclipse.edc.spi.command.CommandResult;
import org.eclipse.edc.spi.command.EntityCommand;

import java.util.HashMap;
import java.util.Map;

import static java.lang.String.format;

/**
* Implementation of the {@link CommandHandlerRegistry} interface.
*/
public class CommandHandlerRegistryImpl implements CommandHandlerRegistry {

private final Map<Class<? extends Command>, CommandHandler<?>> registrations;
private final Map<Class<? extends EntityCommand>, CommandHandler<?>> registrations;

public CommandHandlerRegistryImpl() {
this.registrations = new HashMap<>();
}

@Override
public <C extends Command> void register(CommandHandler<C> handler) {
public <C extends EntityCommand> void register(CommandHandler<C> handler) {
registrations.put(handler.getType(), handler);
}

@Override
public <C extends Command> CommandHandler<C> get(Class<C> commandClass) {
return (CommandHandler<C>) registrations.get(commandClass);
public <C extends EntityCommand> CommandResult execute(C command) {
var commandHandler = (CommandHandler<C>) registrations.get(command.getClass());

if (commandHandler == null) {
return CommandResult.notExecutable(format("Command type %s cannot be executed", command.getClass()));
}

return commandHandler.handle(command);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.connector.core.base;

import org.eclipse.edc.spi.command.CommandFailure;
import org.eclipse.edc.spi.command.CommandHandler;
import org.eclipse.edc.spi.command.CommandResult;
import org.eclipse.edc.spi.command.EntityCommand;
import org.junit.jupiter.api.Test;

import static org.eclipse.edc.junit.assertions.AbstractResultAssert.assertThat;
import static org.eclipse.edc.spi.command.CommandFailure.Reason.NOT_EXECUTABLE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class CommandHandlerRegistryImplTest {

private final CommandHandler<?> handler = mock();
private final CommandHandlerRegistryImpl registry = new CommandHandlerRegistryImpl();

@Test
void execute_shouldExecuteCommand() {
doReturn(TestCommand.class).when(handler).getType();
when(handler.handle(any())).thenReturn(CommandResult.success());
registry.register(handler);
var command = new TestCommand("id");

var result = registry.execute(command);

assertThat(result).isSucceeded();
}

@Test
void execute_shouldReturnFailure_whenCommandHandlerNotFound() {
var command = new TestCommand("id");

var result = registry.execute(command);

assertThat(result).isFailed().extracting(CommandFailure::getReason).isEqualTo(NOT_EXECUTABLE);
}

private static class TestCommand extends EntityCommand {

TestCommand(String entityId) {
super(entityId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ public TestEntity copy() {
return this;
}

@Override
public String stateAsString() {
return "STATE";
}

@JsonPOJOBuilder(withPrefix = "")
public static class Builder extends StatefulEntity.Builder<TestEntity, Builder> {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright (c) 2023 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.validator.jsonobject.validators;

import jakarta.json.JsonString;
import org.eclipse.edc.validator.jsonobject.JsonLdPath;
import org.eclipse.edc.validator.spi.ValidationResult;
import org.eclipse.edc.validator.spi.Validator;

import static java.lang.String.format;
import static org.eclipse.edc.validator.spi.Violation.violation;

/**
* Verify that the @id field is not null or blank.
*/
public class MandatoryIdNotBlank implements Validator<JsonString> {

private final JsonLdPath path;

public MandatoryIdNotBlank(JsonLdPath path) {
this.path = path;
}

@Override
public ValidationResult validate(JsonString id) {
if (id == null || id.getString().isBlank()) {
return ValidationResult.failure(violation(format("%s cannot be null or blank", path), path.toString()));
} else {
return ValidationResult.success();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.offer.ContractDefinitionResolver;
import org.eclipse.edc.connector.contract.spi.offer.store.ContractDefinitionStore;
import org.eclipse.edc.connector.contract.spi.types.command.ContractNegotiationCommand;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.validation.ContractValidationService;
import org.eclipse.edc.connector.contract.validation.ContractExpiryCheckFunction;
Expand All @@ -48,10 +47,6 @@
import org.eclipse.edc.runtime.metamodel.annotation.Setting;
import org.eclipse.edc.spi.agent.ParticipantAgentService;
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.command.BoundedCommandQueue;
import org.eclipse.edc.spi.command.CommandHandlerRegistry;
import org.eclipse.edc.spi.command.CommandQueue;
import org.eclipse.edc.spi.command.CommandRunner;
import org.eclipse.edc.spi.event.EventRouter;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.monitor.Monitor;
Expand Down Expand Up @@ -120,9 +115,6 @@ public class ContractCoreExtension implements ServiceExtension {
@Inject
private RemoteMessageDispatcherRegistry dispatcherRegistry;

@Inject
private CommandHandlerRegistry commandHandlerRegistry;

@Inject
private ContractNegotiationStore store;

Expand Down Expand Up @@ -205,9 +197,6 @@ private void registerServices(ServiceExtensionContext context) {
var iterationWaitMillis = context.getSetting(NEGOTIATION_STATE_MACHINE_ITERATION_WAIT_MILLIS, DEFAULT_ITERATION_WAIT);
var waitStrategy = context.hasService(NegotiationWaitStrategy.class) ? context.getService(NegotiationWaitStrategy.class) : new ExponentialWaitStrategy(iterationWaitMillis);

CommandQueue<ContractNegotiationCommand> commandQueue = new BoundedCommandQueue<>(10);
var commandRunner = new CommandRunner<ContractNegotiationCommand>(commandHandlerRegistry, monitor);

var observable = new ContractNegotiationObservableImpl();
observable.registerListener(new ContractNegotiationEventListener(eventRouter, clock));

Expand All @@ -219,8 +208,6 @@ private void registerServices(ServiceExtensionContext context) {
.waitStrategy(waitStrategy)
.dispatcherRegistry(dispatcherRegistry)
.monitor(monitor)
.commandQueue(commandQueue)
.commandRunner(commandRunner)
.observable(observable)
.clock(clock)
.telemetry(telemetry)
Expand All @@ -237,8 +224,6 @@ private void registerServices(ServiceExtensionContext context) {
.waitStrategy(waitStrategy)
.dispatcherRegistry(dispatcherRegistry)
.monitor(monitor)
.commandQueue(commandQueue)
.commandRunner(commandRunner)
.observable(observable)
.clock(clock)
.telemetry(telemetry)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,38 +14,41 @@

package org.eclipse.edc.connector.contract;

import org.eclipse.edc.connector.contract.negotiation.command.handlers.CancelNegotiationCommandHandler;
import org.eclipse.edc.connector.contract.negotiation.command.handlers.DeclineNegotiationCommandHandler;
import org.eclipse.edc.connector.contract.negotiation.command.handlers.TerminateNegotiationCommandHandler;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.runtime.metamodel.annotation.CoreExtension;
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
import org.eclipse.edc.runtime.metamodel.annotation.Provides;
import org.eclipse.edc.spi.command.CommandHandlerRegistry;
import org.eclipse.edc.spi.system.ServiceExtension;
import org.eclipse.edc.spi.system.ServiceExtensionContext;

import static org.eclipse.edc.connector.contract.ContractNegotiationCommandExtension.NAME;

/**
* Adds a {@link CommandHandlerRegistry} to the context and registers the
* handlers the core provides.
*/
@CoreExtension
@Provides({ CommandHandlerRegistry.class })
@Extension(value = "Contract Negotiation command handler")
@Extension(value = NAME)
public class ContractNegotiationCommandExtension implements ServiceExtension {

public static final String NAME = "Contract Negotiation command handlers";

@Override
public String name() {
return NAME;
}

@Inject
private ContractNegotiationStore store;

@Inject
private CommandHandlerRegistry registry;

@Override
public void initialize(ServiceExtensionContext context) {
CommandHandlerRegistry registry = context.getService(CommandHandlerRegistry.class);
registerDefaultCommandHandlers(registry);
}

private void registerDefaultCommandHandlers(CommandHandlerRegistry registry) {
registry.register(new CancelNegotiationCommandHandler(store));
registry.register(new DeclineNegotiationCommandHandler(store));
registry.register(new TerminateNegotiationCommandHandler(store));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,9 @@
import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationObservable;
import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.contract.spi.types.command.ContractNegotiationCommand;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates;
import org.eclipse.edc.connector.policy.spi.store.PolicyDefinitionStore;
import org.eclipse.edc.spi.command.CommandProcessor;
import org.eclipse.edc.spi.command.CommandQueue;
import org.eclipse.edc.spi.command.CommandRunner;
import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.protocol.ProtocolWebhook;
Expand All @@ -48,9 +44,6 @@ public abstract class AbstractContractNegotiationManager {
protected ContractNegotiationStore negotiationStore;
protected RemoteMessageDispatcherRegistry dispatcherRegistry;
protected ContractNegotiationObservable observable;
protected CommandQueue<ContractNegotiationCommand> commandQueue;
protected CommandRunner<ContractNegotiationCommand> commandRunner;
protected CommandProcessor<ContractNegotiationCommand> commandProcessor;
protected Monitor monitor;
protected Clock clock;
protected Telemetry telemetry;
Expand Down Expand Up @@ -198,16 +191,6 @@ public Builder<T> dispatcherRegistry(RemoteMessageDispatcherRegistry dispatcherR
return this;
}

public Builder<T> commandQueue(CommandQueue<ContractNegotiationCommand> commandQueue) {
manager.commandQueue = commandQueue;
return this;
}

public Builder<T> commandRunner(CommandRunner<ContractNegotiationCommand> commandRunner) {
manager.commandRunner = commandRunner;
return this;
}

public Builder<T> clock(Clock clock) {
manager.clock = clock;
return this;
Expand Down Expand Up @@ -252,16 +235,13 @@ public T build() {
Objects.requireNonNull(manager.participantId, "participantId");
Objects.requireNonNull(manager.monitor, "monitor");
Objects.requireNonNull(manager.dispatcherRegistry, "dispatcherRegistry");
Objects.requireNonNull(manager.commandQueue, "commandQueue");
Objects.requireNonNull(manager.commandRunner, "commandRunner");
Objects.requireNonNull(manager.observable, "observable");
Objects.requireNonNull(manager.clock, "clock");
Objects.requireNonNull(manager.telemetry, "telemetry");
Objects.requireNonNull(manager.executorInstrumentation, "executorInstrumentation");
Objects.requireNonNull(manager.negotiationStore, "store");
Objects.requireNonNull(manager.policyStore, "policyStore");

manager.commandProcessor = new CommandProcessor<>(manager.commandQueue, manager.commandRunner, manager.monitor);
manager.entityRetryProcessFactory = new EntityRetryProcessFactory(manager.monitor, manager.clock, manager.entityRetryProcessConfiguration);

return manager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementMessage;
import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreementVerificationMessage;
import org.eclipse.edc.connector.contract.spi.types.command.ContractNegotiationCommand;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiation;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationStates;
import org.eclipse.edc.connector.contract.spi.types.negotiation.ContractNegotiationTerminationMessage;
Expand Down Expand Up @@ -66,7 +65,6 @@ public void start() {
.processor(processNegotiationsInState(AGREED, this::processAgreed))
.processor(processNegotiationsInState(VERIFYING, this::processVerifying))
.processor(processNegotiationsInState(TERMINATING, this::processTerminating))
.processor(onCommands(this::processCommand))
.build();

stateMachineManager.start();
Expand Down Expand Up @@ -106,11 +104,6 @@ public StatusResult<ContractNegotiation> initiate(ContractRequest request) {
return StatusResult.success(negotiation);
}

@Override
public void enqueueCommand(ContractNegotiationCommand command) {
commandQueue.enqueue(command);
}

/**
* Processes {@link ContractNegotiation} in state INITIAL. Transition ContractNegotiation to REQUESTING.
*
Expand Down Expand Up @@ -266,14 +259,6 @@ private StateProcessorImpl<ContractNegotiation> processNegotiationsInState(Contr
return new StateProcessorImpl<>(() -> negotiationStore.nextNotLeased(batchSize, filter), telemetry.contextPropagationMiddleware(function));
}

private StateProcessorImpl<ContractNegotiationCommand> onCommands(Function<ContractNegotiationCommand, Boolean> process) {
return new StateProcessorImpl<>(() -> commandQueue.dequeue(5), process);
}

private boolean processCommand(ContractNegotiationCommand command) {
return commandProcessor.processCommandQueue(command);
}

/**
* Builder for ConsumerContractNegotiationManagerImpl.
*/
Expand Down
Loading

0 comments on commit b6c5f4e

Please sign in to comment.