From 51075b0d1e54fe5c3cc237c2a747c827e730f68f Mon Sep 17 00:00:00 2001 From: Guillaume L Date: Fri, 21 Oct 2022 14:45:08 +0200 Subject: [PATCH] Update RxJava from unmaintained 3.0.x to latest 3.1.x (#287) Update rxjava3 version to 3.1.5 and update imports. 3.0.x is not maintained since 3.1.0 was released in August 2021. Fixes #283. --- gradle/libs.versions.toml | 2 +- .../adapter/rxjava/RxJava3Adapter.java | 32 +++++++++---------- .../adapter/rxjava/RxJava3AdapterTest.java | 2 +- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 6cafd3e85..c1ff15b72 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -19,7 +19,7 @@ mockito = "org.mockito:mockito-core:4.3.1" quickTheories = "org.quicktheories:quicktheories:0.25" reactiveStreams-tck = { module = "org.reactivestreams:reactive-streams-tck", version.ref = "reactiveStreams" } rxJava2 = "io.reactivex.rxjava2:rxjava:2.2.8" -rxJava3 = "io.reactivex.rxjava3:rxjava:3.0.0" +rxJava3 = "io.reactivex.rxjava3:rxjava:3.1.5" slf4j = "org.slf4j:slf4j-api:1.7.36" swt-windows = { module = "org.eclipse.swt:org.eclipse.swt.win32.win32.x86", version.ref = "swt" } swt-windows64 = { module = "org.eclipse.swt:org.eclipse.swt.win32.win32.x86_64", version.ref = "swt" } diff --git a/reactor-adapter/src/main/java/reactor/adapter/rxjava/RxJava3Adapter.java b/reactor-adapter/src/main/java/reactor/adapter/rxjava/RxJava3Adapter.java index db45800ee..1bb0a97ad 100644 --- a/reactor-adapter/src/main/java/reactor/adapter/rxjava/RxJava3Adapter.java +++ b/reactor-adapter/src/main/java/reactor/adapter/rxjava/RxJava3Adapter.java @@ -223,7 +223,7 @@ static final class FlowableAsFluxSubscriber implements FlowableSubscriber, Subscription s; - io.reactivex.rxjava3.internal.fuseable.QueueSubscription qs; + io.reactivex.rxjava3.operators.QueueSubscription qs; public FlowableAsFluxSubscriber(Subscriber actual) { this.actual = actual; @@ -234,8 +234,8 @@ public FlowableAsFluxSubscriber(Subscriber actual) { public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; - if (s instanceof io.reactivex.rxjava3.internal.fuseable.QueueSubscription) { - this.qs = (io.reactivex.rxjava3.internal.fuseable.QueueSubscription)s; + if (s instanceof io.reactivex.rxjava3.operators.QueueSubscription) { + this.qs = (io.reactivex.rxjava3.operators.QueueSubscription)s; } actual.onSubscribe(this); @@ -301,13 +301,13 @@ public int requestFusion(int requestedMode) { } static final class FlowableAsFluxConditionalSubscriber implements - io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber, QueueSubscription { + io.reactivex.rxjava3.operators.ConditionalSubscriber, QueueSubscription { final ConditionalSubscriber actual; Subscription s; - io.reactivex.rxjava3.internal.fuseable.QueueSubscription qs; + io.reactivex.rxjava3.operators.QueueSubscription qs; public FlowableAsFluxConditionalSubscriber(ConditionalSubscriber actual) { this.actual = actual; @@ -318,8 +318,8 @@ public FlowableAsFluxConditionalSubscriber(ConditionalSubscriber actu public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; - if (s instanceof io.reactivex.rxjava3.internal.fuseable.QueueSubscription) { - this.qs = (io.reactivex.rxjava3.internal.fuseable.QueueSubscription)s; + if (s instanceof io.reactivex.rxjava3.operators.QueueSubscription) { + this.qs = (io.reactivex.rxjava3.operators.QueueSubscription)s; } actual.onSubscribe(this); @@ -400,15 +400,15 @@ public FluxAsFlowable(Publisher source) { @Override public void subscribeActual(Subscriber s) { - if (s instanceof io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber) { - source.subscribe(new FluxAsFlowableConditionalSubscriber<>((io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber)s)); + if (s instanceof io.reactivex.rxjava3.operators.ConditionalSubscriber) { + source.subscribe(new FluxAsFlowableConditionalSubscriber<>((io.reactivex.rxjava3.operators.ConditionalSubscriber)s)); } else { source.subscribe(new FluxAsFlowableSubscriber<>(s)); } } static final class FluxAsFlowableSubscriber implements CoreSubscriber, - io.reactivex.rxjava3.internal.fuseable.QueueSubscription { + io.reactivex.rxjava3.operators.QueueSubscription { final Subscriber actual; @@ -493,15 +493,15 @@ public boolean offer(T v1, T v2) { } static final class FluxAsFlowableConditionalSubscriber implements - Fuseable.ConditionalSubscriber, io.reactivex.rxjava3.internal.fuseable.QueueSubscription { + Fuseable.ConditionalSubscriber, io.reactivex.rxjava3.operators.QueueSubscription { - final io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber actual; + final io.reactivex.rxjava3.operators.ConditionalSubscriber actual; Subscription s; - io.reactivex.rxjava3.internal.fuseable.QueueSubscription qs; + io.reactivex.rxjava3.operators.QueueSubscription qs; - public FluxAsFlowableConditionalSubscriber(io.reactivex.rxjava3.internal.fuseable.ConditionalSubscriber actual) { + public FluxAsFlowableConditionalSubscriber(io.reactivex.rxjava3.operators.ConditionalSubscriber actual) { this.actual = actual; } @@ -510,8 +510,8 @@ public FluxAsFlowableConditionalSubscriber(io.reactivex.rxjava3.internal.fuseabl public void onSubscribe(Subscription s) { if (Operators.validate(this.s, s)) { this.s = s; - if (s instanceof io.reactivex.rxjava3.internal.fuseable.QueueSubscription) { - this.qs = (io.reactivex.rxjava3.internal.fuseable.QueueSubscription)s; + if (s instanceof io.reactivex.rxjava3.operators.QueueSubscription) { + this.qs = (io.reactivex.rxjava3.operators.QueueSubscription)s; } actual.onSubscribe(this); diff --git a/reactor-adapter/src/test/java/reactor/adapter/rxjava/RxJava3AdapterTest.java b/reactor-adapter/src/test/java/reactor/adapter/rxjava/RxJava3AdapterTest.java index 71daa2d63..31356e0e3 100644 --- a/reactor-adapter/src/test/java/reactor/adapter/rxjava/RxJava3AdapterTest.java +++ b/reactor-adapter/src/test/java/reactor/adapter/rxjava/RxJava3AdapterTest.java @@ -33,7 +33,7 @@ import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.core.Maybe; import io.reactivex.rxjava3.core.Single; -import io.reactivex.rxjava3.internal.fuseable.QueueSubscription; +import io.reactivex.rxjava3.operators.QueueSubscription; import io.reactivex.rxjava3.internal.subscribers.BasicFuseableSubscriber; import io.reactivex.rxjava3.observers.BaseTestConsumer; import io.reactivex.rxjava3.schedulers.Schedulers;