diff --git a/pom.xml b/pom.xml index a9472146f..201476cf0 100644 --- a/pom.xml +++ b/pom.xml @@ -57,18 +57,18 @@ - 2.6.9 + 2.6.10 1.0.17 1.0.19 - 2020.0.6 + 2020.0.10 2.11.0 - 1.0.4 + 1.0.5 1.6.0 1.7.30 2.13.2 3.4.2 - 4.1.63.Final + 4.1.66.Final 1.26 3.0.2 @@ -116,6 +116,11 @@ scalecube-codec-jackson ${scalecube-cluster.version} + + io.scalecube + scalecube-transport-netty + ${scalecube-cluster.version} + diff --git a/services-discovery/pom.xml b/services-discovery/pom.xml index ebc6f03c6..d285bb217 100644 --- a/services-discovery/pom.xml +++ b/services-discovery/pom.xml @@ -38,6 +38,11 @@ scalecube-codec-jackson test + + io.scalecube + scalecube-transport-netty + test + diff --git a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java index d0e70bb33..a0c4daef9 100644 --- a/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java +++ b/services-discovery/src/main/java/io/scalecube/services/discovery/ScalecubeServiceDiscovery.java @@ -18,11 +18,9 @@ import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryContext; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; -import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.lang.management.ManagementFactory; import java.nio.ByteBuffer; import java.util.List; -import java.util.Objects; import java.util.StringJoiner; import java.util.concurrent.CopyOnWriteArrayList; import java.util.function.UnaryOperator; @@ -41,8 +39,6 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery { private static final Logger LOGGER = LoggerFactory.getLogger(ServiceDiscovery.class); - private final ServiceEndpoint serviceEndpoint; - private ClusterConfig clusterConfig; private Cluster cluster; @@ -50,17 +46,9 @@ public final class ScalecubeServiceDiscovery implements ServiceDiscovery { private final Sinks.Many sink = Sinks.many().multicast().directBestEffort(); - /** - * Constructor. - * - * @param serviceEndpoint service endpoint - */ - public ScalecubeServiceDiscovery(ServiceEndpoint serviceEndpoint) { - this.serviceEndpoint = Objects.requireNonNull(serviceEndpoint, "serviceEndpoint"); - this.clusterConfig = - ClusterConfig.defaultLanConfig() - .metadata(serviceEndpoint) - .transport(opts -> opts.transportFactory(new WebsocketTransportFactory())); + /** Constructor. */ + public ScalecubeServiceDiscovery() { + this.clusterConfig = ClusterConfig.defaultLanConfig(); } /** @@ -69,7 +57,6 @@ public ScalecubeServiceDiscovery(ServiceEndpoint serviceEndpoint) { * @param other other instance */ private ScalecubeServiceDiscovery(ScalecubeServiceDiscovery other) { - this.serviceEndpoint = other.serviceEndpoint; this.clusterConfig = other.clusterConfig; this.cluster = other.cluster; } @@ -255,7 +242,7 @@ private static JmxMonitorMBean start(ScalecubeServiceDiscovery instance) throws jmxMBean.init(); ObjectName objectName = new ObjectName( - String.format(OBJECT_NAME_FORMAT, instance.serviceEndpoint.id(), System.nanoTime())); + String.format(OBJECT_NAME_FORMAT, instance.cluster.member().id(), System.nanoTime())); StandardMBean standardMBean = new StandardMBean(jmxMBean, MonitorMBean.class); mbeanServer.registerMBean(standardMBean, objectName); return jmxMBean; diff --git a/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java b/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java index 29a697de6..26f343888 100644 --- a/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java +++ b/services-discovery/src/test/java/io/scalecube/services/discovery/ScalecubeServiceDiscoveryTest.java @@ -18,6 +18,7 @@ import io.scalecube.services.ServiceRegistration; import io.scalecube.services.discovery.api.ServiceDiscovery; import io.scalecube.services.discovery.api.ServiceDiscoveryEvent; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; @@ -224,7 +225,9 @@ private Mono newServiceDiscovery( Address seedAddress, MetadataCodec metadataCodec) { return Mono.fromCallable( () -> - new ScalecubeServiceDiscovery(newServiceEndpoint()) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(newServiceEndpoint())) .options(opts -> opts.metadataCodec(metadataCodec)) .gossip(cfg -> GOSSIP_CONFIG) .membership(cfg -> MEMBERSHIP_CONFIG) @@ -232,8 +235,10 @@ private Mono newServiceDiscovery( } private void startSeed(MetadataCodec metadataCodec) { - new ScalecubeServiceDiscovery(newServiceEndpoint()) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .membership(opts -> opts.seedMembers(SEED_ADDRESS)) + .options(opts -> opts.metadata(newServiceEndpoint())) .options(opts -> opts.metadataCodec(metadataCodec)) .gossip(cfg -> GOSSIP_CONFIG) .membership(cfg -> MEMBERSHIP_CONFIG) diff --git a/services-examples/pom.xml b/services-examples/pom.xml index 823a61d0a..c949e68a9 100644 --- a/services-examples/pom.xml +++ b/services-examples/pom.xml @@ -39,6 +39,10 @@ scalecube-services-discovery ${project.version} + + io.scalecube + scalecube-transport-netty + it.unimi.dsi diff --git a/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java b/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java index b2c991210..84336b3f2 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/auth/PrincipalMapperAuthExample.java @@ -8,6 +8,7 @@ import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.exceptions.UnauthorizedException; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -24,7 +25,12 @@ public class PrincipalMapperAuthExample { public static void main(String[] args) { Microservices service = Microservices.builder() - .discovery("service", ScalecubeServiceDiscovery::new) + .discovery( + "service", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(() -> new RSocketServiceTransport().authenticator(authenticator())) .services( ServiceInfo.fromServiceInstance(new SecuredServiceByApiKeyImpl()) @@ -152,7 +158,9 @@ private static ApiKey apiKeyPrincipalMapper(Map authData) { private static ScalecubeServiceDiscovery discovery( Microservices service, ServiceEndpoint endpoint) { - return new ScalecubeServiceDiscovery(endpoint) + return new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(opts -> opts.seedMembers(service.discovery("service").address())); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java b/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java index 6828495b0..8be3441ff 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/auth/ServiceTransportAuthExample.java @@ -7,6 +7,7 @@ import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.exceptions.UnauthorizedException; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.HashMap; import java.util.UUID; @@ -22,7 +23,12 @@ public class ServiceTransportAuthExample { public static void main(String[] args) { Microservices service = Microservices.builder() - .discovery("service", ScalecubeServiceDiscovery::new) + .discovery( + "service", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(() -> new RSocketServiceTransport().authenticator(authenticator())) .services(new SecuredServiceByUserProfileImpl()) .startAwait(); @@ -71,7 +77,9 @@ private static CredentialsSupplier credsSupplier() { private static ScalecubeServiceDiscovery discovery( Microservices service, ServiceEndpoint endpoint) { - return new ScalecubeServiceDiscovery(endpoint) + return new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(opts -> opts.seedMembers(service.discovery("service").address())); } } diff --git a/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java b/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java index f95898ce4..d5cbddc2d 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/codecs/Example1.java @@ -6,6 +6,7 @@ import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; import io.scalecube.services.examples.helloworld.service.api.GreetingsService; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; public class Example1 { @@ -22,7 +23,12 @@ public static void main(String[] args) { // ScaleCube Node node with no members Microservices seed = Microservices.builder() - .discovery("seed", ScalecubeServiceDiscovery::new) + .discovery( + "seed", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .defaultContentType(PROTOSTUFF) // set explicit default data format .startAwait(); @@ -35,7 +41,9 @@ public static void main(String[] args) { .discovery( "ms", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) diff --git a/services-examples/src/main/java/io/scalecube/services/examples/discovery/CompositeDiscoveryExample.java b/services-examples/src/main/java/io/scalecube/services/examples/discovery/CompositeDiscoveryExample.java index b39cca8e5..919e6e83d 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/discovery/CompositeDiscoveryExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/discovery/CompositeDiscoveryExample.java @@ -9,6 +9,7 @@ import io.scalecube.services.discovery.api.ServiceDiscoveryContext; import io.scalecube.services.examples.helloworld.service.api.Greeting; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import reactor.core.publisher.Mono; public class CompositeDiscoveryExample { @@ -24,7 +25,9 @@ public static void main(String[] args) { .discovery( "seed1", serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint)) .options(opts -> opts.memberAlias("seed1"))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -34,7 +37,9 @@ public static void main(String[] args) { .discovery( "seed2", serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint)) .options(opts -> opts.memberAlias("seed2"))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -47,7 +52,9 @@ public static void main(String[] args) { .discovery( "ms1", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .options(opts -> opts.memberAlias("ms1")) .membership(cfg -> cfg.seedMembers(seed1Address))) .transport(RSocketServiceTransport::new) @@ -59,7 +66,9 @@ public static void main(String[] args) { .discovery( "ms2", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .options(opts -> opts.memberAlias("ms2")) .membership(cfg -> cfg.seedMembers(seed2Address))) .transport(RSocketServiceTransport::new) @@ -71,13 +80,17 @@ public static void main(String[] args) { .discovery( "domain1", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .options(opts -> opts.memberAlias("domain1")) .membership(cfg -> cfg.seedMembers(seed1Address))) .discovery( "domain2", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .options(opts -> opts.memberAlias("domain2")) .membership(cfg -> cfg.seedMembers(seed2Address))) .transport(RSocketServiceTransport::new) diff --git a/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java b/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java index 25ba9e813..efb236a74 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/exceptions/ExceptionMapperExample.java @@ -5,6 +5,7 @@ import io.scalecube.services.ServiceInfo; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.util.Collections; public class ExceptionMapperExample { @@ -18,7 +19,12 @@ public class ExceptionMapperExample { public static void main(String[] args) throws InterruptedException { Microservices ms1 = Microservices.builder() - .discovery("ms1", ScalecubeServiceDiscovery::new) + .discovery( + "ms1", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .defaultErrorMapper(new ServiceAProviderErrorMapper()) // default mapper for whole node .services( @@ -36,7 +42,9 @@ public static void main(String[] args) throws InterruptedException { .discovery( "ms2", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(address1))) .transport(RSocketServiceTransport::new) .services( diff --git a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java index 165c1fabb..10138f97c 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example1.java @@ -6,6 +6,7 @@ import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; import io.scalecube.services.examples.helloworld.service.api.GreetingsService; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import reactor.core.publisher.Mono; /** @@ -27,7 +28,12 @@ public static void main(String[] args) { // ScaleCube Node node with no members Microservices seed = Microservices.builder() - .discovery("seed", ScalecubeServiceDiscovery::new) + .discovery( + "seed", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -39,7 +45,9 @@ public static void main(String[] args) { .discovery( "ms", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) diff --git a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java index 56f17d12d..9ff15c12b 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example2.java @@ -8,6 +8,7 @@ import io.scalecube.services.examples.helloworld.service.GreetingServiceImpl; import io.scalecube.services.examples.helloworld.service.api.Greeting; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; @@ -34,7 +35,12 @@ public static void main(String[] args) { // ScaleCube Node node with no members Microservices seed = Microservices.builder() - .discovery("seed", ScalecubeServiceDiscovery::new) + .discovery( + "seed", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -46,7 +52,9 @@ public static void main(String[] args) { .discovery( "ms", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) diff --git a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java index ff7bd8c20..da15491f4 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/helloworld/Example3.java @@ -6,6 +6,7 @@ import io.scalecube.services.examples.helloworld.service.BidiGreetingImpl; import io.scalecube.services.examples.helloworld.service.api.BidiGreetingService; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -28,7 +29,12 @@ public static void main(String[] args) { // ScaleCube Node node with no members Microservices seed = Microservices.builder() - .discovery("seed", ScalecubeServiceDiscovery::new) + .discovery( + "seed", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -40,7 +46,9 @@ public static void main(String[] args) { .discovery( "ms", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services(new BidiGreetingImpl()) diff --git a/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java b/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java index 0e1b80a54..af34e47bc 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/services/Example1.java @@ -4,6 +4,7 @@ import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -17,7 +18,12 @@ public class Example1 { public static void main(String[] args) { Microservices gateway = Microservices.builder() - .discovery("gateway", ScalecubeServiceDiscovery::new) + .discovery( + "gateway", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -28,7 +34,9 @@ public static void main(String[] args) { .discovery( "service2Node", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new Service2Impl()) @@ -39,7 +47,9 @@ public static void main(String[] args) { .discovery( "service1Node", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new Service1Impl()) diff --git a/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java b/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java index 361ac56eb..6621d5608 100644 --- a/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java +++ b/services-examples/src/main/java/io/scalecube/services/examples/services/Example2.java @@ -4,6 +4,7 @@ import io.scalecube.services.Microservices; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import reactor.core.publisher.Mono; import reactor.core.scheduler.Schedulers; @@ -17,7 +18,12 @@ public class Example2 { public static void main(String[] args) { Microservices gateway = Microservices.builder() - .discovery("gateway", ScalecubeServiceDiscovery::new) + .discovery( + "gateway", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -28,7 +34,9 @@ public static void main(String[] args) { .discovery( "service2Node", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new Service2Impl()) @@ -39,7 +47,9 @@ public static void main(String[] args) { .discovery( "service1Node", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new Service1Impl()) diff --git a/services/pom.xml b/services/pom.xml index e82f0ed22..b5e8f00b8 100644 --- a/services/pom.xml +++ b/services/pom.xml @@ -78,6 +78,11 @@ scalecube-codec-jackson test + + io.scalecube + scalecube-transport-netty + test + diff --git a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java index 91b94a8ba..68d63bf77 100644 --- a/services/src/test/java/io/scalecube/services/ErrorFlowTest.java +++ b/services/src/test/java/io/scalecube/services/ErrorFlowTest.java @@ -13,6 +13,7 @@ import io.scalecube.services.sut.GreetingResponse; import io.scalecube.services.sut.GreetingServiceImpl; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -34,8 +35,10 @@ public static void initNodes() { .discovery( "provider", endpoint -> - new ScalecubeServiceDiscovery(endpoint) - .transport(cfg -> cfg.port(PORT.incrementAndGet()))) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .transport(cfg -> cfg.port(PORT.incrementAndGet())) + .options(opts -> opts.metadata(endpoint))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) .startAwait(); @@ -47,9 +50,11 @@ public static void initNodes() { .discovery( "consumer", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() .membership(cfg -> cfg.seedMembers(seedAddress)) - .transport(cfg -> cfg.port(PORT.incrementAndGet()))) + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .transport(cfg -> cfg.port(PORT.incrementAndGet())) + .options(opts -> opts.metadata(endpoint))) .transport(RSocketServiceTransport::new) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java index 1ef439f1a..1ebe0408c 100644 --- a/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceAuthRemoteTest.java @@ -16,6 +16,7 @@ import io.scalecube.services.sut.security.SecuredServiceImpl; import io.scalecube.services.sut.security.UserProfile; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.Collections; import java.util.HashMap; @@ -75,7 +76,12 @@ static void beforeAll() { service = Microservices.builder() - .discovery("service", ScalecubeServiceDiscovery::new) + .discovery( + "service", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(() -> new RSocketServiceTransport().authenticator(authenticator)) .services( ServiceInfo.fromServiceInstance(new SecuredServiceImpl()) @@ -85,7 +91,12 @@ static void beforeAll() { serviceWithoutAuthenticator = Microservices.builder() - .discovery("service", ScalecubeServiceDiscovery::new) + .discovery( + "service", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .services( ServiceInfo.fromServiceInstance(new AnotherSecuredServiceImpl()) @@ -95,7 +106,12 @@ static void beforeAll() { partiallySecuredService = Microservices.builder() - .discovery("service", ScalecubeServiceDiscovery::new) + .discovery( + "service", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(() -> new RSocketServiceTransport().authenticator(authenticator)) .services( ServiceInfo.fromServiceInstance(new PartiallySecuredServiceImpl()) @@ -317,7 +333,9 @@ private static Mono> invalidCredentialsSupplier( } private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint) { - return new ScalecubeServiceDiscovery(endpoint) + return new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership( opts -> opts.seedMembers( diff --git a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java index a8e859904..3719dcdaf 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallLocalTest.java @@ -27,6 +27,7 @@ import io.scalecube.services.sut.GreetingService; import io.scalecube.services.sut.GreetingServiceImpl; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.Collections; import java.util.Optional; @@ -72,7 +73,12 @@ public void test_local_async_no_params() { private static Microservices serviceProvider() { return Microservices.builder() - .discovery("serviceProvider", ScalecubeServiceDiscovery::new) + .discovery( + "serviceProvider", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .services(new GreetingServiceImpl()) .startAwait(); diff --git a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java index 1ab088713..ec1374933 100644 --- a/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceCallRemoteTest.java @@ -29,6 +29,7 @@ import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.Collections; import java.util.Optional; @@ -76,7 +77,9 @@ private static Microservices serviceProvider(Object service) { .discovery( "serviceProvider", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gateway.discovery("gateway").address()))) .transport(RSocketServiceTransport::new) .services(service) @@ -298,7 +301,12 @@ private static Optional route( private static Microservices gateway() { return Microservices.builder() - .discovery("gateway", ScalecubeServiceDiscovery::new) + .discovery( + "gateway", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); } diff --git a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java index 073757343..02fd79d75 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRegistryTest.java @@ -16,6 +16,7 @@ import io.scalecube.services.sut.AnnotationServiceImpl; import io.scalecube.services.sut.GreetingServiceImpl; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; @@ -203,13 +204,18 @@ public void test_delayed_listen_to_discovery_events(MetadataCodec metadataCodec) private ServiceDiscoveryFactory defServiceDiscovery(MetadataCodec metadataCodec) { return endpoint -> - new ScalecubeServiceDiscovery(endpoint).options(cfg -> cfg.metadataCodec(metadataCodec)); + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) + .options(cfg -> cfg.metadataCodec(metadataCodec)); } private static ServiceDiscoveryFactory defServiceDiscovery( Address address, MetadataCodec metadataCodec) { return endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .options(cfg -> cfg.metadataCodec(metadataCodec)) .membership(cfg -> cfg.seedMembers(address)); } diff --git a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java index 5d62019a0..df13ce517 100644 --- a/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java +++ b/services/src/test/java/io/scalecube/services/ServiceRemoteTest.java @@ -19,6 +19,7 @@ import io.scalecube.services.sut.GreetingService; import io.scalecube.services.sut.GreetingServiceImpl; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.HashMap; import java.util.Map; @@ -68,7 +69,12 @@ public static void tearDown() { private static Microservices gateway() { return Microservices.builder() - .discovery("gateway", ScalecubeServiceDiscovery::new) + .discovery( + "gateway", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); } @@ -501,7 +507,12 @@ public void test_services_contribute_to_cluster_metadata() { Microservices ms = Microservices.builder() - .discovery("ms", ScalecubeServiceDiscovery::new) + .discovery( + "ms", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .tags(tags) .services(new GreetingServiceImpl()) @@ -557,7 +568,9 @@ private GreetingService createProxy() { } private static ServiceDiscovery serviceDiscovery(ServiceEndpoint endpoint) { - return new ScalecubeServiceDiscovery(endpoint) + return new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress)); } } diff --git a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java index f694374be..1a3f46d88 100644 --- a/services/src/test/java/io/scalecube/services/StreamingServiceTest.java +++ b/services/src/test/java/io/scalecube/services/StreamingServiceTest.java @@ -12,6 +12,7 @@ import io.scalecube.services.sut.SimpleQuoteService; import io.scalecube.services.transport.api.ServiceMessageCodec; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -29,7 +30,12 @@ public class StreamingServiceTest extends BaseTest { public static void setup() { gateway = Microservices.builder() - .discovery("gateway", ScalecubeServiceDiscovery::new) + .discovery( + "gateway", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .defaultDataDecoder(ServiceMessageCodec::decodeData) .startAwait(); @@ -41,7 +47,9 @@ public static void setup() { .discovery( "node", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .defaultDataDecoder(ServiceMessageCodec::decodeData) diff --git a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java index 6a138d1ee..146090516 100644 --- a/services/src/test/java/io/scalecube/services/routings/RoutersTest.java +++ b/services/src/test/java/io/scalecube/services/routings/RoutersTest.java @@ -30,6 +30,7 @@ import io.scalecube.services.sut.GreetingResponse; import io.scalecube.services.sut.GreetingServiceImpl; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.HashSet; import java.util.Map; @@ -56,7 +57,12 @@ public class RoutersTest extends BaseTest { public static void setup() { gateway = Microservices.builder() - .discovery("gateway", ScalecubeServiceDiscovery::new) + .discovery( + "gateway", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -68,7 +74,9 @@ public static void setup() { .discovery( "provider1", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services( @@ -87,7 +95,9 @@ public static void setup() { .discovery( "provider2", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services( @@ -106,7 +116,9 @@ public static void setup() { .discovery( "provider3", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services( diff --git a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java index 848e3929a..fec831d3e 100644 --- a/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java +++ b/services/src/test/java/io/scalecube/services/routings/ServiceTagsExample.java @@ -10,6 +10,7 @@ import io.scalecube.services.routings.sut.WeightedRandomRouter; import io.scalecube.services.sut.GreetingRequest; import io.scalecube.services.transport.rsocket.RSocketServiceTransport; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import reactor.core.publisher.Mono; public class ServiceTagsExample { @@ -22,7 +23,12 @@ public class ServiceTagsExample { public static void main(String[] args) { Microservices gateway = Microservices.builder() - .discovery("gateway", ScalecubeServiceDiscovery::new) + .discovery( + "gateway", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -33,7 +39,9 @@ public static void main(String[] args) { .discovery( "services1", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services( @@ -47,7 +55,9 @@ public static void main(String[] args) { .discovery( "services2", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(seedAddress))) .transport(RSocketServiceTransport::new) .services( diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java index 5bd9872ed..f65bce290 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketNettyColocatedEventLoopGroupTest.java @@ -10,6 +10,7 @@ import io.scalecube.services.annotations.Service; import io.scalecube.services.annotations.ServiceMethod; import io.scalecube.services.discovery.ScalecubeServiceDiscovery; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.jupiter.api.AfterEach; @@ -28,7 +29,12 @@ public class RSocketNettyColocatedEventLoopGroupTest extends BaseTest { public void setUp() { this.gateway = Microservices.builder() - .discovery("gateway", ScalecubeServiceDiscovery::new) + .discovery( + "gateway", + serviceEndpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -39,7 +45,9 @@ public void setUp() { .discovery( "facade", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new Facade()) @@ -52,7 +60,9 @@ public void setUp() { .discovery( "ping", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(facadeAddress))) .transport(RSocketServiceTransport::new) .services((PingService) () -> Mono.just(Thread.currentThread().getName())) @@ -63,7 +73,9 @@ public void setUp() { .discovery( "pong", endpoint -> - new ScalecubeServiceDiscovery(endpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint)) .membership(cfg -> cfg.seedMembers(facadeAddress))) .transport(RSocketServiceTransport::new) .services((PongService) () -> Mono.just(Thread.currentThread().getName())) diff --git a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java index e1a31afc2..73d871e24 100644 --- a/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java +++ b/services/src/test/java/io/scalecube/services/transport/rsocket/RSocketServiceTransportTest.java @@ -13,6 +13,7 @@ import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.sut.QuoteService; import io.scalecube.services.sut.SimpleQuoteService; +import io.scalecube.transport.netty.websocket.WebsocketTransportFactory; import java.time.Duration; import java.util.Optional; import java.util.concurrent.CountDownLatch; @@ -40,7 +41,12 @@ public class RSocketServiceTransportTest extends BaseTest { public void setUp() { gateway = Microservices.builder() - .discovery("gateway", ScalecubeServiceDiscovery::new) + .discovery( + "gateway", + endpoint -> + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(endpoint))) .transport(RSocketServiceTransport::new) .startAwait(); @@ -51,7 +57,9 @@ public void setUp() { .discovery( "serviceNode", serviceEndpoint -> - new ScalecubeServiceDiscovery(serviceEndpoint) + new ScalecubeServiceDiscovery() + .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) + .options(opts -> opts.metadata(serviceEndpoint)) .membership(cfg -> cfg.seedMembers(gatewayAddress))) .transport(RSocketServiceTransport::new) .services(new SimpleQuoteService())