Skip to content

Commit

Permalink
Merge pull request #782 from scalecube/cleanup
Browse files Browse the repository at this point in the history
Cleanup (removed ConnectionClosedException and jackson-smile)
  • Loading branch information
artem-v authored Jul 3, 2020
2 parents c14dbed + 7ebfc04 commit c44986d
Show file tree
Hide file tree
Showing 8 changed files with 11 additions and 72 deletions.
5 changes: 0 additions & 5 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@
<artifactId>scalecube-codec-jackson</artifactId>
<version>${scalecube-cluster.version}</version>
</dependency>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-codec-jackson-smile</artifactId>
<version>${scalecube-cluster.version}</version>
</dependency>

<!-- Scalecube Config -->
<dependency>
Expand Down

This file was deleted.

5 changes: 0 additions & 5 deletions services-discovery/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,6 @@
<artifactId>scalecube-codec-jackson</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-codec-jackson-smile</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import io.scalecube.cluster.ClusterMath;
import io.scalecube.cluster.codec.jackson.JacksonMetadataCodec;
import io.scalecube.cluster.codec.jackson.smile.JacksonSmileMetadataCodec;
import io.scalecube.cluster.fdetector.FailureDetectorConfig;
import io.scalecube.cluster.gossip.GossipConfig;
import io.scalecube.cluster.membership.MembershipConfig;
Expand Down Expand Up @@ -56,9 +55,7 @@ public static void setUp() {

private static Stream<Arguments> metadataCodecSource() {
return Stream.of(
Arguments.of(new JdkMetadataCodec()),
Arguments.of(new JacksonMetadataCodec()),
Arguments.of(new JacksonSmileMetadataCodec()));
Arguments.of(new JdkMetadataCodec()), Arguments.of(new JacksonMetadataCodec()));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,17 @@
import io.rsocket.RSocket;
import io.rsocket.util.ByteBufPayload;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.transport.api.ClientChannel;
import io.scalecube.services.transport.api.ServiceMessageCodec;
import java.lang.reflect.Type;
import java.nio.channels.ClosedChannelException;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RSocketClientChannel implements ClientChannel {

private Mono<RSocket> rsocket;
private ServiceMessageCodec messageCodec;
private final Mono<RSocket> rsocket;
private final ServiceMessageCodec messageCodec;

public RSocketClientChannel(Mono<RSocket> rsocket, ServiceMessageCodec codec) {
this.rsocket = rsocket;
Expand All @@ -26,27 +24,15 @@ public RSocketClientChannel(Mono<RSocket> rsocket, ServiceMessageCodec codec) {
@Override
public Mono<ServiceMessage> requestResponse(ServiceMessage message, Type responseType) {
return rsocket
.flatMap(
rsocket ->
rsocket
.requestResponse(toPayload(message))
.onErrorMap(
ClosedChannelException.class,
e -> new ConnectionClosedException("Connection closed")))
.flatMap(rsocket -> rsocket.requestResponse(toPayload(message)))
.map(this::toMessage)
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType));
}

@Override
public Flux<ServiceMessage> requestStream(ServiceMessage message, Type responseType) {
return rsocket
.flatMapMany(
rsocket ->
rsocket
.requestStream(toPayload(message))
.onErrorMap(
ClosedChannelException.class,
e -> new ConnectionClosedException("Connection closed")))
.flatMapMany(rsocket -> rsocket.requestStream(toPayload(message)))
.map(this::toMessage)
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType));
}
Expand All @@ -55,13 +41,7 @@ public Flux<ServiceMessage> requestStream(ServiceMessage message, Type responseT
public Flux<ServiceMessage> requestChannel(
Publisher<ServiceMessage> publisher, Type responseType) {
return rsocket
.flatMapMany(
rsocket ->
rsocket
.requestChannel(Flux.from(publisher).map(this::toPayload))
.onErrorMap(
ClosedChannelException.class,
e -> new ConnectionClosedException("Connection closed")))
.flatMapMany(rsocket -> rsocket.requestChannel(Flux.from(publisher).map(this::toPayload)))
.map(this::toMessage)
.map(msg -> ServiceMessageCodec.decodeData(msg, responseType));
}
Expand Down
5 changes: 0 additions & 5 deletions services/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@
<artifactId>scalecube-codec-jackson</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.scalecube</groupId>
<artifactId>scalecube-codec-jackson-smile</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

import io.scalecube.cluster.codec.jackson.JacksonMetadataCodec;
import io.scalecube.cluster.codec.jackson.smile.JacksonSmileMetadataCodec;
import io.scalecube.cluster.metadata.JdkMetadataCodec;
import io.scalecube.cluster.metadata.MetadataCodec;
import io.scalecube.net.Address;
Expand Down Expand Up @@ -34,9 +33,7 @@ public class ServiceRegistryTest extends BaseTest {

private static Stream<Arguments> metadataCodecSource() {
return Stream.of(
Arguments.of(new JdkMetadataCodec()),
Arguments.of(new JacksonMetadataCodec()),
Arguments.of(new JacksonSmileMetadataCodec()));
Arguments.of(new JdkMetadataCodec()), Arguments.of(new JacksonMetadataCodec()));
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.discovery.ScalecubeServiceDiscovery;
import io.scalecube.services.discovery.api.ServiceDiscoveryEvent;
import io.scalecube.services.exceptions.ConnectionClosedException;
import io.scalecube.services.sut.QuoteService;
import io.scalecube.services.sut.SimpleQuoteService;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -94,7 +94,7 @@ public void test_remote_node_died_mono_never() throws Exception {
TimeUnit.MILLISECONDS.sleep(100);

assertEquals(0, latch1.getCount());
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass());
assertTrue(sub1.get().isDisposed());
}

Expand Down Expand Up @@ -122,7 +122,7 @@ public void test_remote_node_died_many_never() throws Exception {
TimeUnit.MILLISECONDS.sleep(100);

assertEquals(0, latch1.getCount());
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass());
assertTrue(sub1.get().isDisposed());
}

Expand Down Expand Up @@ -154,7 +154,7 @@ public void test_remote_node_died_many_then_never() throws Exception {
TimeUnit.MILLISECONDS.sleep(100);

assertEquals(0, latch1.getCount());
assertEquals(ConnectionClosedException.class, exceptionHolder.get().getClass());
assertEquals(ClosedChannelException.class, exceptionHolder.get().getClass());
assertTrue(sub1.get().isDisposed());
}
}

0 comments on commit c44986d

Please sign in to comment.