From 912441f7d36adfb84f06595cecd8895b7518d8c9 Mon Sep 17 00:00:00 2001 From: Andreas Huber Date: Wed, 18 Jan 2023 13:07:08 +0100 Subject: [PATCH] Add .singleOptional() to Mono (#3317) Introducing a new operator for the `Mono` stack. It converts source `Mono` signals of type `T` to `Optional`. In case of a value, it is wrapped. When the source completes without a value, an empty `Optional` is delivered. Errors are simply propagated downstream. Having a dedicated operator for this case improves performance greatly over a combination of operators: `.map(Optional::ofNullable).defaultIfEmpty(Optional.empty())`. --- .../java/reactor/core/publisher/Mono.java | 33 +- .../core/publisher/MonoSingleOptional.java | 124 ++ .../publisher/MonoSingleOptionalCallable.java | 98 ++ .../doc-files/marbles/singleOptional.svg | 1220 +++++++++++++++++ .../core/publisher/MonoSingleMonoTest.java | 15 +- .../MonoSingleOptionalCallableTest.java | 141 ++ .../publisher/MonoSingleOptionalTest.java | 186 +++ 7 files changed, 1814 insertions(+), 3 deletions(-) create mode 100644 reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java create mode 100644 reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java create mode 100644 reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/singleOptional.svg create mode 100644 reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalCallableTest.java create mode 100644 reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java diff --git a/reactor-core/src/main/java/reactor/core/publisher/Mono.java b/reactor-core/src/main/java/reactor/core/publisher/Mono.java index 77cf41a2d8..b715cef9de 100644 --- a/reactor-core/src/main/java/reactor/core/publisher/Mono.java +++ b/reactor-core/src/main/java/reactor/core/publisher/Mono.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2016-2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -4245,6 +4245,37 @@ public final Mono single() { return Mono.onAssembly(new MonoSingleMono<>(this)); } + /** + * Wrap the item produced by this {@link Mono} source into an Optional + * or emit an empty Optional for an empty source. + *

+ * + *

+ * + * @return a {@link Mono} with an Optional containing the item, an empty optional or an error signal + */ + public final Mono> singleOptional() { + if (this instanceof Callable) { + if (this instanceof Fuseable.ScalarCallable) { + @SuppressWarnings("unchecked") + Fuseable.ScalarCallable scalarCallable = (Fuseable.ScalarCallable) this; + + T v; + try { + v = scalarCallable.call(); + } + catch (Exception e) { + return Mono.error(Exceptions.unwrap(e)); + } + return Mono.just(Optional.ofNullable(v)); + } + @SuppressWarnings("unchecked") + Callable thiz = (Callable)this; + return Mono.onAssembly(new MonoSingleOptionalCallable<>(thiz)); + } + return Mono.onAssembly(new MonoSingleOptional<>(this)); + } + /** * Subscribe to this {@link Mono} and request unbounded demand. *

diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java new file mode 100644 index 0000000000..dd0d43331a --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptional.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import java.util.Optional; + +import org.reactivestreams.Subscription; + +import reactor.core.CoreSubscriber; +import reactor.util.annotation.Nullable; +import reactor.util.context.Context; + +/** + * Emits a single item from the source wrapped into an Optional, emits + * an empty Optional instead for empty source. + * + * @param the value type + * @see Reactive-Streams-Commons + */ +final class MonoSingleOptional extends InternalMonoOperator> { + + MonoSingleOptional(Mono source) { + super(source); + } + + @Override + public CoreSubscriber subscribeOrReturn(CoreSubscriber> actual) { + return new MonoSingleOptional.SingleOptionalSubscriber<>(actual); + } + + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; + return super.scanUnsafe(key); + } + + static final class SingleOptionalSubscriber extends Operators.MonoInnerProducerBase> implements InnerConsumer { + + Subscription s; + + boolean done; + + @Override + @Nullable + public Object scanUnsafe(Attr key) { + if (key == Attr.TERMINATED) return done; + if (key == Attr.PARENT) return s; + if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; + + return super.scanUnsafe(key); + } + + @Override + public Context currentContext() { + return actual().currentContext(); + } + + SingleOptionalSubscriber(CoreSubscriber> actual) { + super(actual); + } + + @Override + public void doOnRequest(long n) { + s.request(Long.MAX_VALUE); + } + + @Override + public void doOnCancel() { + s.cancel(); + } + + @Override + public void onSubscribe(Subscription s) { + if (Operators.validate(this.s, s)) { + this.s = s; + actual().onSubscribe(this); + } + } + + @Override + public void onNext(T t) { + if (done) { + Operators.onNextDropped(t, actual().currentContext()); + return; + } + done = true; + complete(Optional.of(t)); + } + + @Override + public void onError(Throwable t) { + if (done) { + Operators.onErrorDropped(t, actual().currentContext()); + return; + } + done = true; + actual().onError(t); + } + + @Override + public void onComplete() { + if (done) { + return; + } + done = true; + complete(Optional.empty()); + } + + } +} diff --git a/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java new file mode 100644 index 0000000000..360e53ce45 --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/publisher/MonoSingleOptionalCallable.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2016-2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import reactor.core.CoreSubscriber; +import reactor.core.Exceptions; +import reactor.util.annotation.Nullable; + +import java.time.Duration; +import java.util.NoSuchElementException; +import java.util.Objects; +import java.util.Optional; +import java.util.concurrent.Callable; + +/** + * Emits a single item from the source wrapped into an Optional, emits + * an empty Optional instead for empty source. + * + * @param the value type + * @see Reactive-Streams-Commons + */ +final class MonoSingleOptionalCallable extends Mono> + implements Callable>, SourceProducer> { + + final Callable callable; + + MonoSingleOptionalCallable(Callable source) { + this.callable = Objects.requireNonNull(source, "source"); + } + + @Override + public void subscribe(CoreSubscriber> actual) { + Operators.MonoInnerProducerBase> + sds = new Operators.MonoInnerProducerBase<>(actual); + + actual.onSubscribe(sds); + + if (sds.isCancelled()) { + return; + } + + try { + T t = callable.call(); + sds.complete(Optional.ofNullable(t)); + } + catch (Throwable e) { + actual.onError(Operators.onOperatorError(e, actual.currentContext())); + } + + } + + @Override + public Optional block() { + //duration is ignored below + return block(Duration.ZERO); + } + + @Override + public Optional block(Duration m) { + final T v; + + try { + v = callable.call(); + } + catch (Throwable e) { + throw Exceptions.propagate(e); + } + + return Optional.ofNullable(v); + } + + @Override + public Optional call() throws Exception { + final T v = callable.call(); + + return Optional.ofNullable(v); + } + + @Override + public Object scanUnsafe(Attr key) { + if (key == Attr.RUN_STYLE) return Attr.RunStyle.SYNC; + return null; + } +} diff --git a/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/singleOptional.svg b/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/singleOptional.svg new file mode 100644 index 0000000000..bf5aaa56c4 --- /dev/null +++ b/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/singleOptional.svg @@ -0,0 +1,1220 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + singleOptional + + + + + + + + + + + + + + + + + + + + + Optional.of(     ) + + Optional.empty() + diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleMonoTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleMonoTest.java index d443972a6a..f9787c8264 100644 --- a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleMonoTest.java +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleMonoTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2017-2021 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -44,6 +44,12 @@ public void callableValued() { .expectNext("foo") .verifyComplete(); } + + @Test + public void callableError() { + StepVerifier.create(Mono.error(new IllegalStateException("failed")).single()) + .expectErrorMessage("failed"); + } @Test public void normalEmpty() { @@ -59,7 +65,12 @@ public void normalValued() { .expectNext("foo") .verifyComplete(); } - + + @Test + public void normalError() { + StepVerifier.create(Mono.error(new IllegalStateException("failed")).hide().single()) + .expectErrorMessage("failed"); + } // see https://github.com/reactor/reactor-core/issues/2663 @Test void fusionMonoSingleMonoDoesntTriggerFusion() { diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalCallableTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalCallableTest.java new file mode 100644 index 0000000000..73c4ac1d40 --- /dev/null +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalCallableTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2021-2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.Optional; +import java.util.concurrent.Callable; + +import org.junit.jupiter.api.Test; + +import reactor.core.Fuseable; +import reactor.core.Scannable; +import reactor.test.StepVerifier; + +class MonoSingleOptionalCallableTest { + + @Test + void testCallableFusedEmptySource() { + Mono> mono = Mono + .fromSupplier(() -> null) + .singleOptional(); + + StepVerifier.create(mono) + .expectNext(Optional.empty()) + .verifyComplete(); + } + + @Test + void testCallableFusedSingleEmptySourceOnBlock() { + Mono> mono = Mono + .fromSupplier(() -> null) + .singleOptional(); + + assertEquals(Optional.empty(), mono.block()); + } + + @Test + void testCallableFusedSingleEmptySourceOnCall() throws Exception { + Mono> mono = Mono + .fromSupplier(() -> null) + .singleOptional(); + + assertThat(mono).isInstanceOf(MonoSingleOptionalCallable.class); + + assertEquals(Optional.empty(), ((Callable) mono).call()); + } + + @Test + void sourceNull() { + assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { + new MonoSingleOptionalCallable<>(null); + }); + } + + @Test + void normal() { + StepVerifier.create(new MonoSingleOptionalCallable<>(() -> 1)) + .expectNext(Optional.of(1)) + .verifyComplete(); + } + + @Test + void normalBackpressured() { + StepVerifier.create(new MonoSingleOptionalCallable<>(() -> 1), 0) + .expectSubscription() + .expectNoEvent(Duration.ofMillis(50)) + .thenRequest(1) + .expectNext(Optional.of(1)) + .verifyComplete(); + } + + //scalarCallable empty/error/just are not instantiating MonoSingleOptionalCallable and are covered in MonoSingleTest + //we still cover the case where a callable source throws + + @Test + void failingCallable() { + StepVerifier.create(new MonoSingleOptionalCallable<>(() -> { throw new IllegalStateException("test"); } )) + .verifyErrorMessage("test"); + } + + @Test + void emptyCallable() { + StepVerifier.create(new MonoSingleOptionalCallable<>(() -> null)) + .expectNext(Optional.empty()) + .verifyComplete(); + } + + @Test + void valuedCallable() { + @SuppressWarnings("unchecked") + Callable fluxCallable = (Callable) Mono.fromCallable(() -> 1).flux(); + + + StepVerifier.create(new MonoSingleOptionalCallable<>(fluxCallable)) + .expectNext(Optional.of(1)) + .verifyComplete(); + } + + @Test + void fusionMonoSingleOptionalCallableDoesntTriggerFusion() { + Mono> fusedCase = Mono + .fromCallable(() -> 1) + .singleOptional(); + + assertThat(fusedCase) + .as("fusedCase assembly check") + .isInstanceOf(MonoSingleOptionalCallable.class) + .isNotInstanceOf(Fuseable.class); + + assertThatCode(() -> fusedCase.filter(v -> true).block()) + .as("fusedCase fused") + .doesNotThrowAnyException(); + } + + @Test + void scanOperator(){ + MonoSingleOptionalCallable test = new MonoSingleOptionalCallable<>(() -> "foo"); + + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + } + +} diff --git a/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java new file mode 100644 index 0000000000..44b5b93116 --- /dev/null +++ b/reactor-core/src/test/java/reactor/core/publisher/MonoSingleOptionalTest.java @@ -0,0 +1,186 @@ +/* + * Copyright (c) 2017-2023 VMware Inc. or its affiliates, All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package reactor.core.publisher; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; + +import java.util.Optional; +import java.util.concurrent.Callable; +import java.util.function.Function; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.reactivestreams.Subscription; + +import reactor.core.CoreSubscriber; +import reactor.core.Fuseable; +import reactor.core.Scannable; +import reactor.test.StepVerifier; + +public class MonoSingleOptionalTest { + + @Nested + class ConcreteClassConsistency { + // tests Mono.singleOptional returned classes + + @Test + void monoWithScalarEmpty() { + Mono source = Mono.empty(); + Mono> singleOptional = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(singleOptional).as("singleOptional") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithScalarError() { + Mono source = Mono.error(new IllegalStateException("test")); + Mono> singleOptional = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(singleOptional).as("singleOptional") + .isInstanceOf(MonoError.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithScalarValue() { + Mono source = Mono.just(1); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("singleOptional") + .isInstanceOf(MonoJust.class) + .isInstanceOf(Fuseable.ScalarCallable.class); + } + + @Test + void monoWithCallable() { + Mono source = Mono.fromSupplier(() -> 1); + Mono> single = source.singleOptional(); + + assertThat(source).as("source") + .isInstanceOf(Callable.class) + .isNotInstanceOf(Fuseable.ScalarCallable.class); + assertThat(single).as("singleOptional").isInstanceOf(MonoSingleOptionalCallable.class); + } + + @Test + void monoWithNormal() { + Mono source = Mono.just(1).hide(); + Mono> single = source.singleOptional(); + + assertThat(source).as("source").isNotInstanceOf(Callable.class); // excludes + // ScalarCallable + // too + assertThat(single).as("singleOptional").isInstanceOf(MonoSingleOptional.class); + } + } + + @Test + void source1Null() { + assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> { + new MonoSingleOptional<>(null); + }); + } + + @Test + public void callableEmpty() { + StepVerifier.create(Mono.empty().singleOptional()) + .expectNext(Optional.empty()) + .verifyComplete(); + } + + @Test + public void callableValued() { + StepVerifier.create(Mono.just("foo").singleOptional()) + .expectNext(Optional.of("foo")) + .verifyComplete(); + } + + @Test + public void callableError() { + StepVerifier.create(Mono.error(new IllegalStateException("failed")).singleOptional()) + .expectErrorMessage("failed"); + } + + @Test + public void normalEmpty() { + StepVerifier.create(Mono.empty().hide().singleOptional()) + .expectNext(Optional.empty()) + .verifyComplete(); + } + + @Test + public void normalValued() { + StepVerifier.create(Mono.just("foo").hide().singleOptional()) + .expectNext(Optional.of("foo")) + .verifyComplete(); + } + + @Test + public void normalError() { + StepVerifier.create(Mono.error(new IllegalStateException("failed")).hide().singleOptional()) + .expectErrorMessage("failed"); + } + + @Test + void fusionMonoSingleFusion() { + Mono> fusedCase = Mono.just(1).map(Function.identity()).singleOptional(); + + assertThat(fusedCase).as("fusedCase assembly check") + .isInstanceOf(MonoSingleOptional.class) + .isNotInstanceOf(Fuseable.class); + + assertThatCode(() -> fusedCase.filter(v -> true).block()).as("fusedCase fused") + .doesNotThrowAnyException(); + } + + @Test + public void scanOperator() { + MonoSingleOptional test = new MonoSingleOptional<>(Mono.just("foo")); + + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + } + + @Test + public void scanSubscriber() { + CoreSubscriber> actual = new LambdaMonoSubscriber<>(null, e -> {}, null, null); + MonoSingleOptional.SingleOptionalSubscriber test = new MonoSingleOptional.SingleOptionalSubscriber<>( + actual); + Subscription parent = Operators.emptySubscription(); + test.onSubscribe(parent); + + assertThat(test.scan(Scannable.Attr.PREFETCH)).isEqualTo(Integer.MAX_VALUE); + + assertThat(test.scan(Scannable.Attr.PARENT)).isSameAs(parent); + assertThat(test.scan(Scannable.Attr.ACTUAL)).isSameAs(actual); + assertThat(test.scan(Scannable.Attr.RUN_STYLE)).isSameAs(Scannable.Attr.RunStyle.SYNC); + + assertThat(test.scan(Scannable.Attr.TERMINATED)).isFalse(); + test.onError(new IllegalStateException("boom")); + assertThat(test.scan(Scannable.Attr.TERMINATED)).isTrue(); + + assertThat(test.scan(Scannable.Attr.CANCELLED)).isFalse(); + test.cancel(); + assertThat(test.scan(Scannable.Attr.CANCELLED)).isTrue(); + } +}