Skip to content

Commit

Permalink
Introduce Flux#using{When} reactor refaster rules
Browse files Browse the repository at this point in the history
  • Loading branch information
mohamedsamehsalah committed Nov 3, 2024
1 parent 33d9bad commit 472962e
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,145 @@ Mono<T> after(
}
}

/**
* Don't unnecessarily transform a {@link Mono#using(Callable, Function)} to a flux, instead use
* the equivalent API provided by {@link Flux}.
*/
static final class FluxUsing<
D extends AutoCloseable, T, P extends Publisher<? extends T>, M extends Mono<? extends T>> {
@BeforeTemplate
Flux<T> before(Callable<D> resourceSupplier, Function<D, M> sourceSupplier) {
return Mono.using(resourceSupplier, sourceSupplier).flux();
}

@AfterTemplate
Flux<T> after(Callable<D> resourceSupplier, Function<D, P> sourceSupplier) {
return Flux.using(resourceSupplier, sourceSupplier);
}
}

/**
* Don't unnecessarily transform a {@link Mono#using(Callable, Function, boolean)} to a flux,
* instead use the equivalent API provided by {@link Flux}.
*/
static final class FluxUsingEager<
D extends AutoCloseable, T, P extends Publisher<? extends T>, M extends Mono<? extends T>> {
@BeforeTemplate
Flux<T> before(Callable<D> resourceSupplier, Function<D, M> sourceSupplier, boolean eager) {
return Mono.using(resourceSupplier, sourceSupplier, eager).flux();
}

@AfterTemplate
Flux<T> after(Callable<D> resourceSupplier, Function<D, P> sourceSupplier, boolean eager) {
return Flux.using(resourceSupplier, sourceSupplier, eager);
}
}

/**
* Don't unnecessarily transform a {@link Mono#using(Callable, Function, Consumer)} to a flux,
* instead use the equivalent API provided by {@link Flux}.
*/
static final class FluxUsing2<
D, T, P extends Publisher<? extends T>, M extends Mono<? extends T>> {
@BeforeTemplate
Flux<T> before(
Callable<D> resourceSupplier, Function<D, M> sourceSupplier, Consumer<D> resourceCleanup) {
return Mono.using(resourceSupplier, sourceSupplier, resourceCleanup).flux();
}

@AfterTemplate
Flux<T> after(
Callable<D> resourceSupplier, Function<D, P> sourceSupplier, Consumer<D> resourceCleanup) {
return Flux.using(resourceSupplier, sourceSupplier, resourceCleanup);
}
}

/**
* Don't unnecessarily transform a {@link Mono#using(Callable, Function, Consumer, boolean)} to a
* flux, instead use the equivalent API provided by {@link Flux}.
*/
static final class FluxUsing2Eager<
D, T, P extends Publisher<? extends T>, M extends Mono<? extends T>> {
@BeforeTemplate
Flux<T> before(
Callable<D> resourceSupplier,
Function<D, M> sourceSupplier,
Consumer<D> resourceCleanup,
boolean eager) {
return Mono.using(resourceSupplier, sourceSupplier, resourceCleanup, eager).flux();
}

@AfterTemplate
Flux<T> after(
Callable<D> resourceSupplier,
Function<D, P> sourceSupplier,
Consumer<D> resourceCleanup,
boolean eager) {
return Flux.using(resourceSupplier, sourceSupplier, resourceCleanup, eager);
}
}

/**
* Don't unnecessarily transform a {@link Mono#usingWhen(Publisher, Function, Function)} to a
* flux, instead use the equivalent API provided by {@link Flux}.
*/
static final class FluxUsingWhen<
D,
T,
P extends Publisher<? extends T>,
P2 extends Publisher<?>,
M extends Mono<? extends T>> {
@BeforeTemplate
Flux<T> before(
Publisher<D> resourceSupplier,
Function<D, M> resourceClosure,
Function<D, P2> asyncCleanup) {
return Mono.usingWhen(resourceSupplier, resourceClosure, asyncCleanup).flux();
}

@AfterTemplate
Flux<T> after(
Publisher<D> resourceSupplier,
Function<D, P> resourceClosure,
Function<D, P2> asyncCleanup) {
return Flux.usingWhen(resourceSupplier, resourceClosure, asyncCleanup);
}
}

/**
* Don't unnecessarily transform a {@link Mono#usingWhen(Publisher, Function, Function,
* BiFunction, Function)} to a flux, instead use the equivalent API provided by {@link Flux}.
*/
static final class FluxUsingWhen2<
D,
T,
P extends Publisher<? extends T>,
P2 extends Publisher<?>,
M extends Mono<? extends T>> {
@BeforeTemplate
Flux<T> before(
Publisher<D> resourceSupplier,
Function<D, M> resourceClosure,
Function<D, P2> asyncComplete,
BiFunction<D, ? super Throwable, P2> asyncError,
Function<D, P2> asyncCancel) {
return Mono.usingWhen(
resourceSupplier, resourceClosure, asyncComplete, asyncError, asyncCancel)
.flux();
}

@AfterTemplate
Flux<T> after(
Publisher<D> resourceSupplier,
Function<D, P> resourceClosure,
Function<D, P2> asyncComplete,
BiFunction<D, ? super Throwable, ? extends Publisher<?>> asyncError,
Function<D, P2> asyncCancel) {
return Flux.usingWhen(
resourceSupplier, resourceClosure, asyncComplete, asyncError, asyncCancel);
}
}

/** Don't unnecessarily pass an empty publisher to {@link Flux#switchIfEmpty(Publisher)}. */
static final class FluxSwitchIfEmptyOfEmptyPublisher<T> {
@BeforeTemplate
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,38 @@ Mono<String> testMonoUsingWhen2() {
.single();
}

Flux<String> testFluxUsing() {
return Mono.using(() -> new ByteArrayInputStream(new byte[] {}), s -> Mono.just("foo")).flux();
}

Flux<String> testFluxUsingEager() {
return Mono.using(() -> new ByteArrayInputStream(new byte[] {}), s -> Mono.just("foo"), false)
.flux();
}

Flux<String> testFluxUsing2() {
return Mono.using(() -> "foo", foo -> Mono.just("bar"), foo -> {}).flux();
}

Flux<String> testFluxUsing2Eager() {
return Mono.using(() -> "foo", foo -> Mono.just("bar"), foo -> {}, false).flux();
}

Flux<String> testFluxUsingWhen() {
return Mono.usingWhen(Mono.just("foo"), foo -> Mono.just("bar"), foo -> Mono.just("baz"))
.flux();
}

Flux<String> testFluxUsingWhen2() {
return Mono.usingWhen(
Mono.just("foo"),
foo -> Mono.just("bar"),
foo -> Mono.just("baz"),
(foo, e) -> Mono.just("qux"),
foo -> Mono.just("thud"))
.flux();
}

ImmutableSet<Flux<Integer>> testFluxSwitchIfEmptyOfEmptyPublisher() {
return ImmutableSet.of(
Flux.just(1).switchIfEmpty(Mono.empty()), Flux.just(2).switchIfEmpty(Flux.empty()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,35 @@ Mono<String> testMonoUsingWhen2() {
foo -> Mono.just("thud"));
}

Flux<String> testFluxUsing() {
return Flux.using(() -> new ByteArrayInputStream(new byte[] {}), s -> Mono.just("foo"));
}

Flux<String> testFluxUsingEager() {
return Flux.using(() -> new ByteArrayInputStream(new byte[] {}), s -> Mono.just("foo"), false);
}

Flux<String> testFluxUsing2() {
return Flux.using(() -> "foo", foo -> Mono.just("bar"), foo -> {});
}

Flux<String> testFluxUsing2Eager() {
return Flux.using(() -> "foo", foo -> Mono.just("bar"), foo -> {}, false);
}

Flux<String> testFluxUsingWhen() {
return Flux.usingWhen(Mono.just("foo"), foo -> Mono.just("bar"), foo -> Mono.just("baz"));
}

Flux<String> testFluxUsingWhen2() {
return Flux.usingWhen(
Mono.just("foo"),
foo -> Mono.just("bar"),
foo -> Mono.just("baz"),
(foo, e) -> Mono.just("qux"),
foo -> Mono.just("thud"));
}

ImmutableSet<Flux<Integer>> testFluxSwitchIfEmptyOfEmptyPublisher() {
return ImmutableSet.of(Flux.just(1), Flux.just(2));
}
Expand Down

0 comments on commit 472962e

Please sign in to comment.