From b3f70738f6a9fb27999b9774a44a92956670e6f4 Mon Sep 17 00:00:00 2001 From: Artem Vysochyn Date: Sat, 23 May 2020 22:37:25 +0300 Subject: [PATCH 1/6] WIP on `Copy message headers to corresp. client transport` issue 131 --- .../transport/GatewayClientSettings.java | 19 +++++++++++++++++-- .../websocket/WebsocketGatewayClient.java | 1 + 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java index 230450a8..14c15544 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java @@ -4,6 +4,9 @@ import io.scalecube.services.exceptions.DefaultErrorMapper; import io.scalecube.services.exceptions.ServiceClientErrorMapper; import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import reactor.netty.tcp.SslProvider; public class GatewayClientSettings { @@ -20,6 +23,7 @@ public class GatewayClientSettings { private final ServiceClientErrorMapper errorMapper; private final Duration keepAliveInterval; private final boolean wiretap; + private final Map headers; private GatewayClientSettings(Builder builder) { this.host = builder.host; @@ -30,6 +34,7 @@ private GatewayClientSettings(Builder builder) { this.errorMapper = builder.errorMapper; this.keepAliveInterval = builder.keepAliveInterval; this.wiretap = builder.wiretap; + this.headers = Collections.unmodifiableMap(builder.headers); } public String host() { @@ -64,6 +69,10 @@ public boolean wiretap() { return this.wiretap; } + public Map headers() { + return headers; + } + public static Builder builder() { return new Builder(); } @@ -96,9 +105,9 @@ public static class Builder { private ServiceClientErrorMapper errorMapper = DefaultErrorMapper.INSTANCE; private Duration keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL; private boolean wiretap = false; + private Map headers = new HashMap<>(); - private Builder() { - } + private Builder() {} private Builder(GatewayClientSettings originalSettings) { this.host = originalSettings.host; @@ -109,6 +118,7 @@ private Builder(GatewayClientSettings originalSettings) { this.errorMapper = originalSettings.errorMapper; this.keepAliveInterval = originalSettings.keepAliveInterval; this.wiretap = originalSettings.wiretap; + this.headers = Collections.unmodifiableMap(new HashMap<>(originalSettings.headers)); } public Builder host(String host) { @@ -191,6 +201,11 @@ public Builder errorMapper(ServiceClientErrorMapper errorMapper) { return this; } + public Builder headers(Map headers) { + this.headers = new HashMap<>(headers); + return this; + } + public GatewayClientSettings build() { return new GatewayClientSettings(this); } diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java index 905792fc..432657de 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java @@ -55,6 +55,7 @@ public WebsocketGatewayClient(GatewayClientSettings settings, GatewayClientCodec httpClient = HttpClient.newConnection() + .headers(headers -> settings.headers().forEach(headers::add)) .followRedirect(settings.followRedirect()) .tcpConfiguration( tcpClient -> { From 426e4f9db33f3ae1252df4b235de608ced1d577a Mon Sep 17 00:00:00 2001 From: segabriel Date: Sun, 24 May 2020 20:59:09 +0300 Subject: [PATCH 2/6] Put client settings headers on connection setup --- .../transport/GatewayClientSettings.java | 7 ++--- .../transport/http/HttpGatewayClient.java | 1 + .../rsocket/RSocketGatewayClient.java | 7 +++++ .../gateway/TestGatewaySessionHandler.java | 9 ++++++ .../rsocket/RsocketClientConnectionTest.java | 27 +++++++++++++++++ .../WebsocketClientConnectionTest.java | 29 +++++++++++++++++++ 6 files changed, 76 insertions(+), 4 deletions(-) diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java index 14c15544..212b3baa 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java @@ -5,7 +5,6 @@ import io.scalecube.services.exceptions.ServiceClientErrorMapper; import java.time.Duration; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import reactor.netty.tcp.SslProvider; @@ -105,7 +104,7 @@ public static class Builder { private ServiceClientErrorMapper errorMapper = DefaultErrorMapper.INSTANCE; private Duration keepAliveInterval = DEFAULT_KEEPALIVE_INTERVAL; private boolean wiretap = false; - private Map headers = new HashMap<>(); + private Map headers = Collections.emptyMap(); private Builder() {} @@ -118,7 +117,7 @@ private Builder(GatewayClientSettings originalSettings) { this.errorMapper = originalSettings.errorMapper; this.keepAliveInterval = originalSettings.keepAliveInterval; this.wiretap = originalSettings.wiretap; - this.headers = Collections.unmodifiableMap(new HashMap<>(originalSettings.headers)); + this.headers = Map.copyOf(originalSettings.headers); } public Builder host(String host) { @@ -202,7 +201,7 @@ public Builder errorMapper(ServiceClientErrorMapper errorMapper) { } public Builder headers(Map headers) { - this.headers = new HashMap<>(headers); + this.headers = Map.copyOf(headers); return this; } diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java index 5bd190eb..0a790e4f 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/http/HttpGatewayClient.java @@ -42,6 +42,7 @@ public HttpGatewayClient(GatewayClientSettings settings, GatewayClientCodec settings.headers().forEach(headers::add)) .followRedirect(settings.followRedirect()) .tcpConfiguration( tcpClient -> { diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java index 107574d0..98e3a63a 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/rsocket/RSocketGatewayClient.java @@ -5,6 +5,7 @@ import io.rsocket.core.RSocketConnector; import io.rsocket.frame.decoder.PayloadDecoder; import io.rsocket.transport.netty.client.WebsocketClientTransport; +import io.rsocket.util.EmptyPayload; import io.scalecube.services.api.ServiceMessage; import io.scalecube.services.exceptions.ConnectionClosedException; import io.scalecube.services.gateway.transport.GatewayClient; @@ -135,8 +136,14 @@ private Mono getOrConnect0(Mono prev) { return prev; } + Payload setupPayload = EmptyPayload.INSTANCE; + if (!settings.headers().isEmpty()) { + setupPayload = codec.encode(ServiceMessage.builder().headers(settings.headers()).build()); + } + return RSocketConnector.create() .payloadDecoder(PayloadDecoder.DEFAULT) + .setupPayload(setupPayload) .metadataMimeType(settings.contentType()) .connect(createRSocketTransport(settings)) .doOnSuccess( diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/TestGatewaySessionHandler.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/TestGatewaySessionHandler.java index 84a30ac7..0ba86bbf 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/TestGatewaySessionHandler.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/TestGatewaySessionHandler.java @@ -2,6 +2,7 @@ import io.scalecube.services.api.ServiceMessage; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicReference; import reactor.util.context.Context; public class TestGatewaySessionHandler implements GatewaySessionHandler { @@ -9,6 +10,7 @@ public class TestGatewaySessionHandler implements GatewaySessionHandler { public final CountDownLatch msgLatch = new CountDownLatch(1); public final CountDownLatch connLatch = new CountDownLatch(1); public final CountDownLatch disconnLatch = new CountDownLatch(1); + private final AtomicReference lastSession = new AtomicReference<>(); @Override public ServiceMessage mapMessage(GatewaySession s, ServiceMessage req, Context context) { @@ -18,11 +20,18 @@ public ServiceMessage mapMessage(GatewaySession s, ServiceMessage req, Context c @Override public void onSessionOpen(GatewaySession s) { + System.err.println(s); + connLatch.countDown(); + lastSession.set(s); } @Override public void onSessionClose(GatewaySession s) { disconnLatch.countDown(); } + + public GatewaySession lastSession() { + return lastSession.get(); + } } diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java index bd2a00db..421ee008 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java @@ -23,11 +23,13 @@ import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import java.io.IOException; import java.time.Duration; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -149,4 +151,29 @@ public void testHandlerEvents() throws InterruptedException { sessionEventHandler.disconnLatch.await(3, TimeUnit.SECONDS); Assertions.assertEquals(0, sessionEventHandler.disconnLatch.getCount()); } + + @Test + @Disabled // todo implement passing headers to rsocket session + void testClintSettingsHeaders() { + String headerKey = "secret-token"; + String headerValue = UUID.randomUUID().toString(); + client = + new RSocketGatewayClient( + GatewayClientSettings.builder().address(gatewayAddress).build(), CLIENT_CODEC); + + TestService service = + new ServiceCall() + .transport(new GatewayClientTransport(client)) + .router(new StaticAddressRouter(gatewayAddress)) + .api(TestService.class); + + StepVerifier.create( + service.one("one").then(Mono.fromCallable(() -> sessionEventHandler.lastSession()))) + .assertNext( + session -> { + assertEquals(headerValue, session.headers().get(headerKey).get(0)); + }) + .expectComplete() + .verify(TIMEOUT); + } } diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java index 4f73e450..73a1b466 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java @@ -31,6 +31,8 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.time.Duration; +import java.util.Map; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -200,4 +202,31 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception assertEquals(0, keepaliveLatch.getCount()); } + + @Test + void testClintSettingsHeaders() { + String headerKey = "secret-token"; + String headerValue = UUID.randomUUID().toString(); + client = + new WebsocketGatewayClient( + GatewayClientSettings.builder() + .address(gatewayAddress) + .headers(Map.of(headerKey, headerValue)) + .build(), + CLIENT_CODEC); + TestService service = + new ServiceCall() + .transport(new GatewayClientTransport(client)) + .router(new StaticAddressRouter(gatewayAddress)) + .api(TestService.class); + + StepVerifier.create( + service.one("one").then(Mono.fromCallable(() -> sessionEventHandler.lastSession()))) + .assertNext( + session -> { + assertEquals(headerValue, session.headers().get(headerKey).get(0)); + }) + .expectComplete() + .verify(TIMEOUT); + } } From dd4fcb3fc27cc30c8f504ec56de5d1f9a5a64e0c Mon Sep 17 00:00:00 2001 From: segabriel Date: Sun, 24 May 2020 21:18:00 +0300 Subject: [PATCH 3/6] Change headers from Map> to Map according to #130 --- .../services/gateway/GatewaySession.java | 5 ++--- .../services/gateway/GatewaySessionHandler.java | 3 +-- .../gateway/rsocket/RSocketGatewaySession.java | 3 +-- .../gateway/ws/WebsocketGatewayAcceptor.java | 15 ++++++--------- .../gateway/ws/WebsocketGatewaySession.java | 8 ++++---- .../gateway/TestGatewaySessionHandler.java | 2 -- .../rsocket/RsocketClientConnectionTest.java | 5 +---- .../websocket/WebsocketClientConnectionTest.java | 5 +---- 8 files changed, 16 insertions(+), 30 deletions(-) diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySession.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySession.java index 871c66f4..aa3b3d95 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySession.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySession.java @@ -1,6 +1,5 @@ package io.scalecube.services.gateway; -import java.util.List; import java.util.Map; public interface GatewaySession { @@ -15,7 +14,7 @@ public interface GatewaySession { /** * Returns headers associated with session. * - * @return heades map + * @return headers map */ - Map> headers(); + Map headers(); } diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySessionHandler.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySessionHandler.java index 71ac8e33..650894d5 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySessionHandler.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/GatewaySessionHandler.java @@ -2,7 +2,6 @@ import io.netty.buffer.ByteBuf; import io.scalecube.services.api.ServiceMessage; -import java.util.List; import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -84,7 +83,7 @@ default void onSessionError(GatewaySession session, Throwable throwable) { * @param headers connection/session headers * @return mono result */ - default Mono onConnectionOpen(long sessionId, Map> headers) { + default Mono onConnectionOpen(long sessionId, Map headers) { return Mono.fromRunnable( () -> LOGGER.debug( diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java index 741c3980..74402f03 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java @@ -10,7 +10,6 @@ import io.scalecube.services.gateway.ReferenceCountUtil; import io.scalecube.services.gateway.ServiceMessageCodec; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -53,7 +52,7 @@ public long sessionId() { } @Override - public Map> headers() { + public Map headers() { return Collections.emptyMap(); } diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java index 32f0f700..83da2c0a 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java @@ -21,14 +21,14 @@ import io.scalecube.services.exceptions.UnauthorizedException; import io.scalecube.services.gateway.GatewaySessionHandler; import io.scalecube.services.gateway.ReferenceCountUtil; -import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; +import java.util.stream.Collectors; import org.reactivestreams.Publisher; import reactor.core.Disposable; import reactor.core.publisher.Flux; @@ -63,7 +63,7 @@ public WebsocketGatewayAcceptor(ServiceCall serviceCall, GatewaySessionHandler g @Override public Publisher apply(HttpServerRequest httpRequest, HttpServerResponse httpResponse) { - final Map> headers = computeHeaders(httpRequest.requestHeaders()); + final Map headers = computeHeaders(httpRequest.requestHeaders()); final long sessionId = SESSION_ID_GENERATOR.incrementAndGet(); return gatewayHandler @@ -85,12 +85,9 @@ public Publisher apply(HttpServerRequest httpRequest, HttpServerResponse h .onErrorResume(throwable -> Mono.empty()); } - private static Map> computeHeaders(HttpHeaders httpHeaders) { - Map> headers = new HashMap<>(); - for (String name : httpHeaders.names()) { - headers.put(name, httpHeaders.getAll(name)); - } - return headers; + private static Map computeHeaders(HttpHeaders httpHeaders) { + // exception will be thrown on duplicate + return httpHeaders.entries().stream().collect(Collectors.toMap(Entry::getKey, Entry::getValue)); } private static int toStatusCode(Throwable throwable) { diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java index ff04690c..969b1413 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java @@ -33,7 +33,7 @@ public final class WebsocketGatewaySession implements GatewaySession { private final WebsocketServiceMessageCodec codec; private final long sessionId; - private final Map> headers; + private final Map headers; /** * Create a new websocket session with given handshake, inbound and outbound channels. @@ -48,14 +48,14 @@ public final class WebsocketGatewaySession implements GatewaySession { public WebsocketGatewaySession( long sessionId, WebsocketServiceMessageCodec codec, - Map> headers, + Map headers, WebsocketInbound inbound, WebsocketOutbound outbound, GatewaySessionHandler gatewayHandler) { this.sessionId = sessionId; this.codec = codec; - this.headers = Collections.unmodifiableMap(new HashMap<>(headers)); + this.headers = Map.copyOf(headers); this.inbound = (WebsocketInbound) inbound.withConnection(c -> c.onDispose(this::clearSubscriptions)); this.outbound = outbound; @@ -68,7 +68,7 @@ public long sessionId() { } @Override - public Map> headers() { + public Map headers() { return headers; } diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/TestGatewaySessionHandler.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/TestGatewaySessionHandler.java index 0ba86bbf..e4562add 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/TestGatewaySessionHandler.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/TestGatewaySessionHandler.java @@ -20,8 +20,6 @@ public ServiceMessage mapMessage(GatewaySession s, ServiceMessage req, Context c @Override public void onSessionOpen(GatewaySession s) { - System.err.println(s); - connLatch.countDown(); lastSession.set(s); } diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java index 421ee008..83064b47 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java @@ -169,10 +169,7 @@ void testClintSettingsHeaders() { StepVerifier.create( service.one("one").then(Mono.fromCallable(() -> sessionEventHandler.lastSession()))) - .assertNext( - session -> { - assertEquals(headerValue, session.headers().get(headerKey).get(0)); - }) + .assertNext(session -> assertEquals(headerValue, session.headers().get(headerKey))) .expectComplete() .verify(TIMEOUT); } diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java index 73a1b466..5e4fffaa 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java @@ -222,10 +222,7 @@ void testClintSettingsHeaders() { StepVerifier.create( service.one("one").then(Mono.fromCallable(() -> sessionEventHandler.lastSession()))) - .assertNext( - session -> { - assertEquals(headerValue, session.headers().get(headerKey).get(0)); - }) + .assertNext(session -> assertEquals(headerValue, session.headers().get(headerKey))) .expectComplete() .verify(TIMEOUT); } From 0c02e53b79f5ca7180165cb1ba15bde694db64e2 Mon Sep 17 00:00:00 2001 From: segabriel Date: Sun, 24 May 2020 21:38:01 +0300 Subject: [PATCH 4/6] Implement decoding rsocket headers on connect setup --- .../gateway/rsocket/RSocketGatewayAcceptor.java | 9 +++++++++ .../gateway/rsocket/RSocketGatewaySession.java | 6 ++++-- .../gateway/rsocket/RsocketClientConnectionTest.java | 11 +++++++---- .../websocket/WebsocketClientConnectionTest.java | 2 +- 4 files changed, 21 insertions(+), 7 deletions(-) diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewayAcceptor.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewayAcceptor.java index f49d246f..a7732f27 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewayAcceptor.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewayAcceptor.java @@ -7,6 +7,7 @@ import io.scalecube.services.gateway.GatewaySessionHandler; import io.scalecube.services.gateway.ServiceMessageCodec; import io.scalecube.services.transport.api.HeadersCodec; +import java.util.Map; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; @@ -41,6 +42,7 @@ public Mono accept(ConnectionSetupPayload setup, RSocket rsocket) { new RSocketGatewaySession( serviceCall, messageCodec, + headers(messageCodec, setup), (session, req) -> sessionHandler.mapMessage(session, req, Context.empty())); sessionHandler.onSessionOpen(gatewaySession); rsocket @@ -54,4 +56,11 @@ public Mono accept(ConnectionSetupPayload setup, RSocket rsocket) { return Mono.just(gatewaySession); } + + private Map headers( + ServiceMessageCodec messageCodec, ConnectionSetupPayload setup) { + return messageCodec + .decode(setup.sliceData().retain(), setup.sliceMetadata().retain()) + .headers(); + } } diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java index 74402f03..5f4845b9 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java @@ -9,7 +9,6 @@ import io.scalecube.services.gateway.GatewaySession; import io.scalecube.services.gateway.ReferenceCountUtil; import io.scalecube.services.gateway.ServiceMessageCodec; -import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -29,6 +28,7 @@ public final class RSocketGatewaySession extends AbstractRSocket implements Gate private final ServiceMessageCodec messageCodec; private final long sessionId; private final BiFunction messageMapper; + private final Map headers; /** * Constructor for gateway rsocket. @@ -39,11 +39,13 @@ public final class RSocketGatewaySession extends AbstractRSocket implements Gate public RSocketGatewaySession( ServiceCall serviceCall, ServiceMessageCodec messageCodec, + Map headers, BiFunction messageMapper) { this.serviceCall = serviceCall; this.messageCodec = messageCodec; this.messageMapper = messageMapper; this.sessionId = SESSION_ID_GENERATOR.incrementAndGet(); + this.headers = Map.copyOf(headers); } @Override @@ -53,7 +55,7 @@ public long sessionId() { @Override public Map headers() { - return Collections.emptyMap(); + return headers; } @Override diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java index 83064b47..ab619973 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/rsocket/RsocketClientConnectionTest.java @@ -23,13 +23,13 @@ import io.scalecube.services.transport.rsocket.RSocketServiceTransport; import java.io.IOException; import java.time.Duration; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -153,13 +153,16 @@ public void testHandlerEvents() throws InterruptedException { } @Test - @Disabled // todo implement passing headers to rsocket session - void testClintSettingsHeaders() { + void testClientSettingsHeaders() { String headerKey = "secret-token"; String headerValue = UUID.randomUUID().toString(); client = new RSocketGatewayClient( - GatewayClientSettings.builder().address(gatewayAddress).build(), CLIENT_CODEC); + GatewayClientSettings.builder() + .headers(Map.of(headerKey, headerValue)) + .address(gatewayAddress) + .build(), + CLIENT_CODEC); TestService service = new ServiceCall() diff --git a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java index 5e4fffaa..00425b0b 100644 --- a/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java +++ b/services-gateway-tests/src/test/java/io/scalecube/services/gateway/websocket/WebsocketClientConnectionTest.java @@ -204,7 +204,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception } @Test - void testClintSettingsHeaders() { + void testClientSettingsHeaders() { String headerKey = "secret-token"; String headerValue = UUID.randomUUID().toString(); client = From 3a9014e2cf4af6876bde728d76646897974d6981 Mon Sep 17 00:00:00 2001 From: segabriel Date: Sun, 24 May 2020 21:41:31 +0300 Subject: [PATCH 5/6] Polishing --- .../services/gateway/transport/GatewayClientSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java index 212b3baa..d655601b 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java @@ -33,7 +33,7 @@ private GatewayClientSettings(Builder builder) { this.errorMapper = builder.errorMapper; this.keepAliveInterval = builder.keepAliveInterval; this.wiretap = builder.wiretap; - this.headers = Collections.unmodifiableMap(builder.headers); + this.headers = builder.headers; } public String host() { From 8fb5ed2c44d5b031e3cb8b6f8287045ca569dc86 Mon Sep 17 00:00:00 2001 From: segabriel Date: Sun, 24 May 2020 23:14:31 +0300 Subject: [PATCH 6/6] Revert usage of Map.copyOf --- .../services/gateway/transport/GatewayClientSettings.java | 5 +++-- .../services/gateway/rsocket/RSocketGatewaySession.java | 4 +++- .../services/gateway/ws/WebsocketGatewaySession.java | 3 +-- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java index d655601b..c02acbc5 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/GatewayClientSettings.java @@ -5,6 +5,7 @@ import io.scalecube.services.exceptions.ServiceClientErrorMapper; import java.time.Duration; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import reactor.netty.tcp.SslProvider; @@ -117,7 +118,7 @@ private Builder(GatewayClientSettings originalSettings) { this.errorMapper = originalSettings.errorMapper; this.keepAliveInterval = originalSettings.keepAliveInterval; this.wiretap = originalSettings.wiretap; - this.headers = Map.copyOf(originalSettings.headers); + this.headers = Collections.unmodifiableMap(new HashMap<>(originalSettings.headers)); } public Builder host(String host) { @@ -201,7 +202,7 @@ public Builder errorMapper(ServiceClientErrorMapper errorMapper) { } public Builder headers(Map headers) { - this.headers = Map.copyOf(headers); + this.headers = Collections.unmodifiableMap(new HashMap<>(headers)); return this; } diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java index db51fc5d..85e713b5 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/rsocket/RSocketGatewaySession.java @@ -9,6 +9,8 @@ import io.scalecube.services.gateway.GatewaySession; import io.scalecube.services.gateway.ReferenceCountUtil; import io.scalecube.services.gateway.ServiceMessageCodec; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -45,7 +47,7 @@ public RSocketGatewaySession( this.messageCodec = messageCodec; this.messageMapper = messageMapper; this.sessionId = SESSION_ID_GENERATOR.incrementAndGet(); - this.headers = Map.copyOf(headers); + this.headers = Collections.unmodifiableMap(new HashMap<>(headers)); } @Override diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java index 969b1413..b21f0994 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java @@ -9,7 +9,6 @@ import io.scalecube.services.gateway.GatewaySessionHandler; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.jctools.maps.NonBlockingHashMapLong; import org.slf4j.Logger; @@ -55,7 +54,7 @@ public WebsocketGatewaySession( this.sessionId = sessionId; this.codec = codec; - this.headers = Map.copyOf(headers); + this.headers = Collections.unmodifiableMap(new HashMap<>(headers)); this.inbound = (WebsocketInbound) inbound.withConnection(c -> c.onDispose(this::clearSubscriptions)); this.outbound = outbound;