Skip to content

Commit

Permalink
feat: add manual interactions to ContractNegotiation and TransferProcess
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt committed Jul 24, 2023
1 parent e8ddc6f commit f3065bd
Show file tree
Hide file tree
Showing 54 changed files with 1,087 additions and 727 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*
*/
@FunctionalInterface
public interface StateProcessor {
public interface Processor {

/**
* Process states
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.statemachine;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.util.function.Predicate.isEqual;

/**
* Describes the processing flow applied by a state machine. The entities are provided by a supplier.
* A process is a function that returns a boolean that indicates if the entity has been processed or not in
* the scope of the function.
* The run method returns the processed state count, this is used by the state machine to decide
* to apply the wait strategy or not.
* <p>
* An {@link Guard} can be registered, if its predicate is verified, the guard processor is executed instead of the standard one.
*
* @param <E> the entity that is processed
*/
public class ProcessorImpl<E> implements Processor {

private final Supplier<Collection<E>> entities;
private Function<E, Boolean> process;
private Guard<E> guard = Guard.noop();

private ProcessorImpl(Supplier<Collection<E>> entitiesSupplier) {
entities = entitiesSupplier;
}

@Override
public Long process() {
return entities.get().stream()
.map(entity -> {
if (guard.predicate().test(entity)) {
return guard.process().apply(entity);
} else {
return process.apply(entity);
}
})
.filter(isEqual(true))
.count();
}

public static class Builder<E> {

private final ProcessorImpl<E> processor;

public Builder(Supplier<Collection<E>> entitiesSupplier) {
processor = new ProcessorImpl<>(entitiesSupplier);
}

public static <E> Builder<E> newInstance(Supplier<Collection<E>> entitiesSupplier) {
return new Builder<>(entitiesSupplier);
}

public Builder<E> process(Function<E, Boolean> process) {
processor.process = process;
return this;
}

public Builder<E> guard(Predicate<E> predicate, Function<E, Boolean> process) {
processor.guard = new Guard<>(predicate, process);
return this;
}

public ProcessorImpl<E> build() {
Objects.requireNonNull(processor.process);

return processor;
}
}

private record Guard<E>(Predicate<E> predicate, Function<E, Boolean> process) {
static <E> Guard<E> noop() {
return new Guard<>(e -> false, e -> false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*/
public class StateMachineManager {

private final List<StateProcessor> processors = new ArrayList<>();
private final List<Processor> processors = new ArrayList<>();
private final ScheduledExecutorService executor;
private final AtomicBoolean active = new AtomicBoolean();
private final WaitStrategy waitStrategy;
Expand All @@ -65,7 +65,7 @@ private StateMachineManager(String name, Monitor monitor, ExecutorInstrumentatio
*/
public Future<?> start() {
active.set(true);
return submit(0L);
return scheduleNextIterationIn(0L);
}

/**
Expand Down Expand Up @@ -95,41 +95,37 @@ public boolean isActive() {
return active.get();
}

@NotNull
private Future<?> submit(long delayMillis) {
return executor.schedule(loop(), delayMillis, MILLISECONDS);
}

private Runnable loop() {
return () -> {
if (active.get()) {
long delay = performLogic();

// Submit next execution after delay
submit(delay);
performLogic();
}
};
}

private long performLogic() {
private void performLogic() {
try {
var processed = processors.stream()
.mapToLong(StateProcessor::process)
.mapToLong(Processor::process)
.sum();

waitStrategy.success();

if (processed == 0) {
return waitStrategy.waitForMillis();
}
var delay = processed == 0 ? waitStrategy.waitForMillis() : 0;

scheduleNextIterationIn(delay);
} catch (Error e) {
active.set(false);
monitor.severe(format("StateMachineManager [%s] unrecoverable error", name), e);
} catch (Throwable e) {
monitor.severe(format("StateMachineManager [%s] error caught", name), e);
return waitStrategy.retryInMillis();
scheduleNextIterationIn(waitStrategy.retryInMillis());
}
return 0;
}

@NotNull
private Future<?> scheduleNextIterationIn(long delayMillis) {
return executor.schedule(loop(), delayMillis, MILLISECONDS);
}

public static class Builder {
Expand All @@ -144,7 +140,7 @@ public static Builder newInstance(String name, Monitor monitor, ExecutorInstrume
return new Builder(name, monitor, instrumentation, waitStrategy);
}

public Builder processor(StateProcessor processor) {
public Builder processor(Processor processor) {
loop.processors.add(processor);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
* to apply the wait strategy or not.
*
* @param <T> the entity that is processed
* @deprecated please use {@link ProcessorImpl}
*/
public class StateProcessorImpl<T> implements StateProcessor {
@Deprecated(since = "0.1.3")
public class StateProcessorImpl<T> implements Processor {

private final Supplier<Collection<T>> entities;
private final Function<T, Boolean> process;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - Initial implementation
*
*/

package org.eclipse.edc.statemachine;

import org.eclipse.edc.statemachine.retry.TestEntity;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

class ProcessorImplTest {

@Test
void shouldReturnTheProcessedCount() {
var entity = TestEntity.Builder.newInstance().id("id").build();
var processor = ProcessorImpl.Builder.newInstance(() -> List.of(entity))
.process(e -> true)
.build();

var count = processor.process();

assertThat(count).isEqualTo(1);
}

@Test
void shouldNotCountUnprocessedEntities() {
var entity = TestEntity.Builder.newInstance().id("id").build();
var processor = ProcessorImpl.Builder.newInstance(() -> List.of(entity))
.process(e -> false)
.build();

var count = processor.process();

assertThat(count).isEqualTo(0);
}

@Test
void shouldExecuteGuard_whenItsPredicateMatches() {
var entity = TestEntity.Builder.newInstance().id("id").build();
Function<TestEntity, Boolean> process = mock();
Function<TestEntity, Boolean> guardProcess = mock();
when(guardProcess.apply(any())).thenReturn(true);
var processor = ProcessorImpl.Builder.newInstance(() -> List.of(entity))
.guard(e -> true, guardProcess)
.process(process)
.build();

var count = processor.process();

assertThat(count).isEqualTo(1);
verify(guardProcess).apply(entity);
verifyNoInteractions(process);
}

@Test
void shouldExecuteDefaultProcessor_whenGuardPredicateDoesNotMatch() {
var entity = TestEntity.Builder.newInstance().id("id").build();
Function<TestEntity, Boolean> process = mock();
Function<TestEntity, Boolean> guardProcess = mock();
when(process.apply(any())).thenReturn(true);
var processor = ProcessorImpl.Builder.newInstance(() -> List.of(entity))
.guard(e -> false, guardProcess)
.process(process)
.build();

var count = processor.process();

assertThat(count).isEqualTo(1);
verify(process).apply(entity);
verifyNoInteractions(guardProcess);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void setUp() {

@Test
void shouldExecuteProcessorsAsyncAndCanBeStopped() throws InterruptedException {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenAnswer(i -> {
Thread.sleep(100L);
return 1L;
Expand All @@ -68,7 +68,7 @@ void shouldExecuteProcessorsAsyncAndCanBeStopped() throws InterruptedException {

@Test
void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() throws InterruptedException {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenReturn(1L);
doAnswer(i -> {
return 1L;
Expand All @@ -87,7 +87,7 @@ void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() throws Interrup

@Test
void shouldWaitForSomeTimeIfNoEntityIsProcessed() throws InterruptedException {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenReturn(0L);
var waitStrategy = mock(WaitStrategy.class);
doAnswer(i -> {
Expand All @@ -107,7 +107,7 @@ void shouldWaitForSomeTimeIfNoEntityIsProcessed() throws InterruptedException {

@Test
void shouldExitWithAnExceptionIfProcessorExitsWithAnUnrecoverableError() {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenThrow(new Error("unrecoverable"));
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
Expand All @@ -119,7 +119,7 @@ void shouldExitWithAnExceptionIfProcessorExitsWithAnUnrecoverableError() {

@Test
void shouldWaitRetryTimeWhenAnExceptionIsThrownByAnProcessor() throws InterruptedException {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenThrow(new EdcException("exception")).thenReturn(0L);
when(waitStrategy.retryInMillis()).thenAnswer(i -> {
return 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import org.eclipse.edc.spi.entity.StatefulEntity;

class TestEntity extends StatefulEntity<TestEntity> {
public class TestEntity extends StatefulEntity<TestEntity> {
@Override
public TestEntity copy() {
return this;
Expand Down Expand Up @@ -47,7 +47,7 @@ public Builder self() {
}

@Override
protected TestEntity build() {
public TestEntity build() {
return super.build();
}
}
Expand Down
Loading

0 comments on commit f3065bd

Please sign in to comment.