diff --git a/pom.xml b/pom.xml index 5f7f39883..a9472146f 100644 --- a/pom.xml +++ b/pom.xml @@ -58,7 +58,7 @@ 2.6.9 - 1.0.13 + 1.0.17 1.0.19 2020.0.6 diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index decf2fbf0..d0e70bb33 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -1,9 +1,9 @@ package io.scalecube.services.discovery; +import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointAdded; import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointLeaving; import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointRemoved; -import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; import io.scalecube.cluster.Cluster; import io.scalecube.cluster.ClusterConfig; @@ -35,9 +35,7 @@ import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.EmitResult; public final class ScalecubeServiceDiscovery implements ServiceDiscovery { @@ -172,13 +170,11 @@ public Mono shutdown() { return Mono.defer( () -> { if (cluster == null) { - sink.emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED); + sink.emitComplete(RETRY_NON_SERIALIZED); return Mono.empty(); } cluster.shutdown(); - return cluster - .onShutdown() - .doFinally(s -> sink.emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED)); + return cluster.onShutdown().doFinally(s -> sink.emitComplete(RETRY_NON_SERIALIZED)); }); } @@ -195,7 +191,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) { if (discoveryEvent != null) { LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent); - sink.emitNext(discoveryEvent, EmitFailureHandler.RETRY_NOT_SERIALIZED); + sink.emitNext(discoveryEvent, RETRY_NON_SERIALIZED); } } @@ -288,14 +284,4 @@ private void onDiscoveryEvent(ServiceDiscoveryEvent event) { } } } - - private static class EmitFailureHandler implements Sinks.EmitFailureHandler { - - private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler(); - - @Override - public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { - return emitResult == FAIL_NON_SERIALIZED; - } - } } diff --git a/services/src/main/java/io/scalecube/services/Microservices.java b/services/src/main/java/io/scalecube/services/Microservices.java index 9f1335b2e..3446def6a 100644 --- a/services/src/main/java/io/scalecube/services/Microservices.java +++ b/services/src/main/java/io/scalecube/services/Microservices.java @@ -1,6 +1,6 @@ package io.scalecube.services; -import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED; +import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED; import io.scalecube.net.Address; import io.scalecube.services.api.ServiceMessage; @@ -59,9 +59,7 @@ import reactor.core.Exceptions; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.core.publisher.SignalType; import reactor.core.publisher.Sinks; -import reactor.core.publisher.Sinks.EmitResult; import reactor.core.scheduler.Scheduler; import reactor.core.scheduler.Schedulers; @@ -171,7 +169,7 @@ private Microservices(Builder builder) { shutdown .asMono() .then(doShutdown()) - .doFinally(s -> onShutdown.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED)) + .doFinally(s -> onShutdown.emitEmpty(RETRY_NON_SERIALIZED)) .subscribe( null, ex -> LOGGER.warn("[{}][doShutdown] Exception occurred: {}", id, ex.toString())); } @@ -350,7 +348,7 @@ public Flux listenDiscovery() { public Mono shutdown() { return Mono.defer( () -> { - shutdown.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED); + shutdown.emitEmpty(RETRY_NON_SERIALIZED); return onShutdown.asMono(); }); } @@ -649,7 +647,7 @@ private Mono start0(String id, ServiceDiscovery discovery) { .subscribeOn(scheduler) .publishOn(scheduler) .doOnNext(event -> onDiscoveryEvent(microservices, event)) - .doOnNext(event -> sink.emitNext(event, EmitFailureHandler.RETRY_NOT_SERIALIZED)) + .doOnNext(event -> sink.emitNext(event, RETRY_NON_SERIALIZED)) .subscribe()); return Mono.deferContextual(context -> discovery.start()) @@ -673,7 +671,7 @@ public Mono shutdown() { return Mono.defer( () -> { disposables.dispose(); - sink.emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED); + sink.emitComplete(RETRY_NON_SERIALIZED); return Mono.whenDelayError( discoveryInstances.values().stream() .map(ServiceDiscovery::shutdown) @@ -909,14 +907,4 @@ private static String asString(ServiceInfo serviceInfo) { .toString(); } } - - private static class EmitFailureHandler implements Sinks.EmitFailureHandler { - - private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler(); - - @Override - public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) { - return emitResult == FAIL_NON_SERIALIZED; - } - } }