Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add pending state to ContractNegotiation and TransferProcess #3321

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
*
*/
@FunctionalInterface
public interface StateProcessor {
public interface Processor {

/**
* Process states
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.statemachine;

import java.util.Collection;
import java.util.Objects;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

import static java.util.function.Predicate.isEqual;

/**
* Describes the processing flow applied by a state machine. The entities are provided by a supplier.
* A process is a function that returns a boolean that indicates if the entity has been processed or not in
* the scope of the function.
* The run method returns the processed state count, this is used by the state machine to decide
* to apply the wait strategy or not.
* <p>
* An {@link Guard} can be registered, if its predicate is verified, the guard processor is executed instead of the standard one.
*
* @param <E> the entity that is processed
*/
public class ProcessorImpl<E> implements Processor {

private final Supplier<Collection<E>> entities;
private Function<E, Boolean> process;
private Guard<E> guard = Guard.noop();

private ProcessorImpl(Supplier<Collection<E>> entitiesSupplier) {
entities = entitiesSupplier;
}

@Override
public Long process() {
return entities.get().stream()
.map(entity -> guard.predicate().test(entity)
? guard.process().apply(entity) :
process.apply(entity))
.filter(isEqual(true))
.count();
}

public static class Builder<E> {

private final ProcessorImpl<E> processor;

public Builder(Supplier<Collection<E>> entitiesSupplier) {
processor = new ProcessorImpl<>(entitiesSupplier);
}

public static <E> Builder<E> newInstance(Supplier<Collection<E>> entitiesSupplier) {
return new Builder<>(entitiesSupplier);
}

public Builder<E> process(Function<E, Boolean> process) {
processor.process = process;
return this;
}

public Builder<E> guard(Predicate<E> predicate, Function<E, Boolean> process) {
processor.guard = new Guard<>(predicate, process);
return this;
}

public ProcessorImpl<E> build() {
Objects.requireNonNull(processor.process);

return processor;
}
}

private record Guard<E>(Predicate<E> predicate, Function<E, Boolean> process) {
static <E> Guard<E> noop() {
return new Guard<>(e -> false, e -> false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
*/
public class StateMachineManager {

private final List<StateProcessor> processors = new ArrayList<>();
private final List<Processor> processors = new ArrayList<>();
private final ScheduledExecutorService executor;
private final AtomicBoolean active = new AtomicBoolean();
private final WaitStrategy waitStrategy;
Expand All @@ -65,7 +65,7 @@ private StateMachineManager(String name, Monitor monitor, ExecutorInstrumentatio
*/
public Future<?> start() {
active.set(true);
return submit(0L);
return scheduleNextIterationIn(0L);
}

/**
Expand Down Expand Up @@ -95,41 +95,37 @@ public boolean isActive() {
return active.get();
}

@NotNull
private Future<?> submit(long delayMillis) {
return executor.schedule(loop(), delayMillis, MILLISECONDS);
}

private Runnable loop() {
return () -> {
if (active.get()) {
long delay = performLogic();

// Submit next execution after delay
submit(delay);
performLogic();
}
};
}

private long performLogic() {
private void performLogic() {
try {
var processed = processors.stream()
.mapToLong(StateProcessor::process)
.mapToLong(Processor::process)
.sum();

waitStrategy.success();

if (processed == 0) {
return waitStrategy.waitForMillis();
}
var delay = processed == 0 ? waitStrategy.waitForMillis() : 0;

scheduleNextIterationIn(delay);
} catch (Error e) {
active.set(false);
monitor.severe(format("StateMachineManager [%s] unrecoverable error", name), e);
} catch (Throwable e) {
monitor.severe(format("StateMachineManager [%s] error caught", name), e);
return waitStrategy.retryInMillis();
scheduleNextIterationIn(waitStrategy.retryInMillis());
}
return 0;
}

@NotNull
private Future<?> scheduleNextIterationIn(long delayMillis) {
return executor.schedule(loop(), delayMillis, MILLISECONDS);
}

public static class Builder {
Expand All @@ -144,7 +140,7 @@ public static Builder newInstance(String name, Monitor monitor, ExecutorInstrume
return new Builder(name, monitor, instrumentation, waitStrategy);
}

public Builder processor(StateProcessor processor) {
public Builder processor(Processor processor) {
loop.processors.add(processor);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
* to apply the wait strategy or not.
*
* @param <T> the entity that is processed
* @deprecated please use {@link ProcessorImpl}
*/
public class StateProcessorImpl<T> implements StateProcessor {
@Deprecated(since = "0.1.3")
public class StateProcessorImpl<T> implements Processor {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there really any point to deprecating this? It's an internal class, not intended for public use

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At least tractus-x is using this for the EDR state machine, I think this deprecation work helps a lot migrating from version to version (because the @deprecated tag in the javadoc text)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this begs a broader discussion about SPIs, what we should and should not deprecate etc.


private final Supplier<Collection<T>> entities;
private final Function<T, Boolean> process;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - Initial implementation
*
*/

package org.eclipse.edc.statemachine;

import org.eclipse.edc.statemachine.retry.TestEntity;
import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.function.Function;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;

class ProcessorImplTest {

@Test
void shouldReturnTheProcessedCount() {
var entity = TestEntity.Builder.newInstance().id("id").build();
var processor = ProcessorImpl.Builder.newInstance(() -> List.of(entity))
.process(e -> true)
.build();

var count = processor.process();

assertThat(count).isEqualTo(1);
}

@Test
void shouldNotCountUnprocessedEntities() {
var entity = TestEntity.Builder.newInstance().id("id").build();
var processor = ProcessorImpl.Builder.newInstance(() -> List.of(entity))
.process(e -> false)
.build();

var count = processor.process();

assertThat(count).isEqualTo(0);
}

@Test
void shouldExecuteGuard_whenItsPredicateMatches() {
var entity = TestEntity.Builder.newInstance().id("id").build();
Function<TestEntity, Boolean> process = mock();
Function<TestEntity, Boolean> guardProcess = mock();
when(guardProcess.apply(any())).thenReturn(true);
var processor = ProcessorImpl.Builder.newInstance(() -> List.of(entity))
.guard(e -> true, guardProcess)
.process(process)
.build();

var count = processor.process();

assertThat(count).isEqualTo(1);
verify(guardProcess).apply(entity);
verifyNoInteractions(process);
}

@Test
void shouldExecuteDefaultProcessor_whenGuardPredicateDoesNotMatch() {
var entity = TestEntity.Builder.newInstance().id("id").build();
Function<TestEntity, Boolean> process = mock();
Function<TestEntity, Boolean> guardProcess = mock();
when(process.apply(any())).thenReturn(true);
var processor = ProcessorImpl.Builder.newInstance(() -> List.of(entity))
.guard(e -> false, guardProcess)
.process(process)
.build();

var count = processor.process();

assertThat(count).isEqualTo(1);
verify(process).apply(entity);
verifyNoInteractions(guardProcess);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void setUp() {

@Test
void shouldExecuteProcessorsAsyncAndCanBeStopped() throws InterruptedException {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenAnswer(i -> {
Thread.sleep(100L);
return 1L;
Expand All @@ -68,7 +68,7 @@ void shouldExecuteProcessorsAsyncAndCanBeStopped() throws InterruptedException {

@Test
void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() throws InterruptedException {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenReturn(1L);
doAnswer(i -> {
return 1L;
Expand All @@ -87,7 +87,7 @@ void shouldNotWaitForSomeTimeIfTheresAtLeastOneProcessedEntity() throws Interrup

@Test
void shouldWaitForSomeTimeIfNoEntityIsProcessed() throws InterruptedException {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenReturn(0L);
var waitStrategy = mock(WaitStrategy.class);
doAnswer(i -> {
Expand All @@ -107,7 +107,7 @@ void shouldWaitForSomeTimeIfNoEntityIsProcessed() throws InterruptedException {

@Test
void shouldExitWithAnExceptionIfProcessorExitsWithAnUnrecoverableError() {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenThrow(new Error("unrecoverable"));
var stateMachine = StateMachineManager.Builder.newInstance("test", monitor, instrumentation, waitStrategy)
.processor(processor)
Expand All @@ -119,7 +119,7 @@ void shouldExitWithAnExceptionIfProcessorExitsWithAnUnrecoverableError() {

@Test
void shouldWaitRetryTimeWhenAnExceptionIsThrownByAnProcessor() throws InterruptedException {
var processor = mock(StateProcessor.class);
var processor = mock(Processor.class);
when(processor.process()).thenThrow(new EdcException("exception")).thenReturn(0L);
when(waitStrategy.retryInMillis()).thenAnswer(i -> {
return 1L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import org.eclipse.edc.spi.entity.StatefulEntity;

class TestEntity extends StatefulEntity<TestEntity> {
public class TestEntity extends StatefulEntity<TestEntity> {
@Override
public TestEntity copy() {
return this;
Expand Down Expand Up @@ -47,7 +47,7 @@ public Builder self() {
}

@Override
protected TestEntity build() {
public TestEntity build() {
return super.build();
}
}
Expand Down
Loading