Skip to content

Commit

Permalink
Merge pull request #818 from scalecube/update-commons-get-rid-of-code…
Browse files Browse the repository at this point in the history
…-duplications

Bump scalecube-commons version to .17, get to use RetryNonSerializedE…
  • Loading branch information
artem-v authored Aug 4, 2021
2 parents 2ebdbac + cc528d2 commit f0cda59
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@

<properties>
<scalecube-cluster.version>2.6.9</scalecube-cluster.version>
<scalecube-commons.version>1.0.13</scalecube-commons.version>
<scalecube-commons.version>1.0.17</scalecube-commons.version>
<scalecube-security.version>1.0.19</scalecube-security.version>

<reactor.version>2020.0.6</reactor.version>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package io.scalecube.services.discovery;

import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointAdded;
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointLeaving;
import static io.scalecube.services.discovery.api.ServiceDiscoveryEvent.newEndpointRemoved;
import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;

import io.scalecube.cluster.Cluster;
import io.scalecube.cluster.ClusterConfig;
Expand Down Expand Up @@ -35,9 +35,7 @@
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;

public final class ScalecubeServiceDiscovery implements ServiceDiscovery {

Expand Down Expand Up @@ -172,13 +170,11 @@ public Mono<Void> shutdown() {
return Mono.defer(
() -> {
if (cluster == null) {
sink.emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED);
sink.emitComplete(RETRY_NON_SERIALIZED);
return Mono.empty();
}
cluster.shutdown();
return cluster
.onShutdown()
.doFinally(s -> sink.emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED));
return cluster.onShutdown().doFinally(s -> sink.emitComplete(RETRY_NON_SERIALIZED));
});
}

Expand All @@ -195,7 +191,7 @@ private void onMembershipEvent(MembershipEvent membershipEvent) {

if (discoveryEvent != null) {
LOGGER.debug("Publish discoveryEvent: {}", discoveryEvent);
sink.emitNext(discoveryEvent, EmitFailureHandler.RETRY_NOT_SERIALIZED);
sink.emitNext(discoveryEvent, RETRY_NON_SERIALIZED);
}
}

Expand Down Expand Up @@ -288,14 +284,4 @@ private void onDiscoveryEvent(ServiceDiscoveryEvent event) {
}
}
}

private static class EmitFailureHandler implements Sinks.EmitFailureHandler {

private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}
22 changes: 5 additions & 17 deletions services/src/main/java/io/scalecube/services/Microservices.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.scalecube.services;

import static reactor.core.publisher.Sinks.EmitResult.FAIL_NON_SERIALIZED;
import static io.scalecube.reactor.RetryNonSerializedEmitFailureHandler.RETRY_NON_SERIALIZED;

import io.scalecube.net.Address;
import io.scalecube.services.api.ServiceMessage;
Expand Down Expand Up @@ -59,9 +59,7 @@
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -171,7 +169,7 @@ private Microservices(Builder builder) {
shutdown
.asMono()
.then(doShutdown())
.doFinally(s -> onShutdown.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED))
.doFinally(s -> onShutdown.emitEmpty(RETRY_NON_SERIALIZED))
.subscribe(
null, ex -> LOGGER.warn("[{}][doShutdown] Exception occurred: {}", id, ex.toString()));
}
Expand Down Expand Up @@ -350,7 +348,7 @@ public Flux<ServiceDiscoveryEvent> listenDiscovery() {
public Mono<Void> shutdown() {
return Mono.defer(
() -> {
shutdown.emitEmpty(EmitFailureHandler.RETRY_NOT_SERIALIZED);
shutdown.emitEmpty(RETRY_NON_SERIALIZED);
return onShutdown.asMono();
});
}
Expand Down Expand Up @@ -649,7 +647,7 @@ private Mono<? extends Void> start0(String id, ServiceDiscovery discovery) {
.subscribeOn(scheduler)
.publishOn(scheduler)
.doOnNext(event -> onDiscoveryEvent(microservices, event))
.doOnNext(event -> sink.emitNext(event, EmitFailureHandler.RETRY_NOT_SERIALIZED))
.doOnNext(event -> sink.emitNext(event, RETRY_NON_SERIALIZED))
.subscribe());

return Mono.deferContextual(context -> discovery.start())
Expand All @@ -673,7 +671,7 @@ public Mono<Void> shutdown() {
return Mono.defer(
() -> {
disposables.dispose();
sink.emitComplete(EmitFailureHandler.RETRY_NOT_SERIALIZED);
sink.emitComplete(RETRY_NON_SERIALIZED);
return Mono.whenDelayError(
discoveryInstances.values().stream()
.map(ServiceDiscovery::shutdown)
Expand Down Expand Up @@ -909,14 +907,4 @@ private static String asString(ServiceInfo serviceInfo) {
.toString();
}
}

private static class EmitFailureHandler implements Sinks.EmitFailureHandler {

private static final EmitFailureHandler RETRY_NOT_SERIALIZED = new EmitFailureHandler();

@Override
public boolean onEmitFailure(SignalType signalType, EmitResult emitResult) {
return emitResult == FAIL_NON_SERIALIZED;
}
}
}

0 comments on commit f0cda59

Please sign in to comment.