diff --git a/pom.xml b/pom.xml index ee69853..58c100d 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 @@ -33,13 +35,14 @@ - 2.6.13 - 1.0.21 - 2.10.24 + 2.6.15 + 1.0.22 + 2.10.25 - 2020.0.23 + 2022.0.7 1.1.3 - 2.13.3 + 2.15.1 + 4.1.92.Final 1.7.36 2.17.2 3.4.2 @@ -104,6 +107,15 @@ import + + + io.netty + netty-bom + ${netty.version} + pom + import + + org.slf4j diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewayTemplate.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewayTemplate.java index 8277f2f..dbc5af2 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewayTemplate.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewayTemplate.java @@ -15,7 +15,12 @@ public abstract class GatewayTemplate implements Gateway { protected final GatewayOptions options; protected GatewayTemplate(GatewayOptions options) { - this.options = new GatewayOptions(options); + this.options = + new GatewayOptions() + .id(options.id()) + .port(options.port()) + .workerPool(options.workerPool()) + .call(options.call()); } @Override diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/AbstractGatewayExtension.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/AbstractGatewayExtension.java index e6b30be..835a0e4 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/AbstractGatewayExtension.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/AbstractGatewayExtension.java @@ -108,7 +108,7 @@ private ServiceDiscovery serviceDiscovery(ServiceEndpoint serviceEndpoint) { return new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(opts -> opts.seedMembers(gateway.discovery().address())); + .membership(opts -> opts.seedMembers(gateway.discoveryAddress())); } public void shutdownServices() { diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java index 6011732..52c0ec3 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/http/HttpClientConnectionTest.java @@ -61,7 +61,7 @@ void beforEach() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(opts -> opts.seedMembers(gateway.discovery().address()))) + .membership(opts -> opts.seedMembers(gateway.discoveryAddress()))) .transport(RSocketServiceTransport::new) .services(new TestServiceImpl()) .startAwait(); @@ -91,7 +91,7 @@ void testCloseServiceStreamAfterLostConnection() { .router(new StaticAddressRouter(gatewayAddress)); StepVerifier.create(serviceCall.api(TestService.class).oneNever("body").log("<<< ")) - .thenAwait(Duration.ofSeconds(1)) + .thenAwait(Duration.ofSeconds(5)) .then(() -> client.close()) .then(() -> client.onClose().block()) .expectError(IOException.class) diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientConnectionTest.java index 4e165ad..57a8144 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RSocketClientConnectionTest.java @@ -70,7 +70,7 @@ void beforEach() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(opts -> opts.seedMembers(gateway.discovery().address()))) + .membership(opts -> opts.seedMembers(gateway.discoveryAddress()))) .transport(RSocketServiceTransport::new) .services(new TestServiceImpl(onCloseCounter::incrementAndGet)) .startAwait(); @@ -100,7 +100,7 @@ void testCloseServiceStreamAfterLostConnection() { .router(new StaticAddressRouter(gatewayAddress)); StepVerifier.create(serviceCall.api(TestService.class).manyNever().log("<<< ")) - .thenAwait(Duration.ofSeconds(1)) + .thenAwait(Duration.ofSeconds(5)) .then(() -> client.close()) .then(() -> client.onClose().block()) .expectError(IOException.class) diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java index b595742..2732ebb 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java @@ -80,7 +80,7 @@ void beforEach() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(opts -> opts.seedMembers(gateway.discovery().address()))) + .membership(opts -> opts.seedMembers(gateway.discoveryAddress()))) .transport(RSocketServiceTransport::new) .services(new TestServiceImpl(onCloseCounter::incrementAndGet)) .startAwait(); @@ -110,7 +110,7 @@ void testCloseServiceStreamAfterLostConnection() { .router(new StaticAddressRouter(gatewayAddress)); StepVerifier.create(serviceCall.api(TestService.class).manyNever().log("<<< ")) - .thenAwait(Duration.ofSeconds(1)) + .thenAwait(Duration.ofSeconds(5)) .then(() -> client.close()) .then(() -> client.onClose().block()) .expectError(IOException.class) @@ -168,8 +168,11 @@ public void testHandlerEvents() throws InterruptedException { @Test void testKeepalive() - throws InterruptedException, NoSuchFieldException, IllegalAccessException, - NoSuchMethodException, InvocationTargetException { + throws InterruptedException, + NoSuchFieldException, + IllegalAccessException, + NoSuchMethodException, + InvocationTargetException { int expectedKeepalives = 3; Duration keepAliveInterval = Duration.ofSeconds(1); diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java index dcae4b6..82933c6 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.RepeatedTest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.resources.LoopResources; import reactor.test.StepVerifier; class WebsocketClientTest extends BaseTest { @@ -39,9 +40,12 @@ class WebsocketClientTest extends BaseTest { private static Address gatewayAddress; private static Microservices service; private static GatewayClient client; + private static LoopResources loopResources; @BeforeAll static void beforeAll() { + loopResources = LoopResources.create("websocket-gateway-client"); + gateway = Microservices.builder() .discovery( @@ -62,7 +66,7 @@ static void beforeAll() { new ScalecubeServiceDiscovery() .transport(cfg -> cfg.transportFactory(new WebsocketTransportFactory())) .options(opts -> opts.metadata(serviceEndpoint)) - .membership(opts -> opts.seedMembers(gateway.discovery().address()))) + .membership(opts -> opts.seedMembers(gateway.discoveryAddress()))) .transport(RSocketServiceTransport::new) .services(new TestServiceImpl()) .startAwait(); @@ -82,26 +86,33 @@ static void afterAll() { if (client != null) { client.close(); } + Flux.concat( Mono.justOrEmpty(gateway).map(Microservices::shutdown), Mono.justOrEmpty(service).map(Microservices::shutdown)) .then() .block(); + + if (loopResources != null) { + loopResources.disposeLater().block(); + } } - @RepeatedTest(300) + @RepeatedTest(100) void testMessageSequence() { client = new WebsocketGatewayClient( - GatewayClientSettings.builder().address(gatewayAddress).build(), CLIENT_CODEC); + GatewayClientSettings.builder().address(gatewayAddress).build(), + CLIENT_CODEC, + loopResources); ServiceCall serviceCall = new ServiceCall() .transport(new GatewayClientTransport(client)) .router(new StaticAddressRouter(gatewayAddress)); - int count = (int) 1e3; + int count = 100; StepVerifier.create(serviceCall.api(TestService.class).many(count) /*.log("<<< ")*/) .expectNextSequence(IntStream.range(0, count).boxed().collect(Collectors.toList())) diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java index 02c1dbd..ffd9f3b 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketServerTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.RepeatedTest; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.netty.resources.LoopResources; import reactor.test.StepVerifier; class WebsocketServerTest extends BaseTest { @@ -38,9 +39,12 @@ class WebsocketServerTest extends BaseTest { private static Microservices gateway; private static Address gatewayAddress; private static GatewayClient client; + private static LoopResources loopResources; @BeforeAll static void beforeAll() { + loopResources = LoopResources.create("websocket-gateway-client"); + gateway = Microservices.builder() .discovery( @@ -71,22 +75,29 @@ static void afterAll() { if (client != null) { client.close(); } + Mono.justOrEmpty(gateway).map(Microservices::shutdown).then().block(); + + if (loopResources != null) { + loopResources.disposeLater().block(); + } } - @RepeatedTest(300) + @RepeatedTest(100) void testMessageSequence() { client = new WebsocketGatewayClient( - GatewayClientSettings.builder().address(gatewayAddress).build(), CLIENT_CODEC); + GatewayClientSettings.builder().address(gatewayAddress).build(), + CLIENT_CODEC, + loopResources); ServiceCall serviceCall = new ServiceCall() .transport(new GatewayClientTransport(client)) .router(new StaticAddressRouter(gatewayAddress)); - int count = (int) 1e3; + int count = 100; StepVerifier.create(serviceCall.api(TestService.class).many(count) /*.log("<<< ")*/) .expectNextSequence(IntStream.range(0, count).boxed().collect(Collectors.toList()))