diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java index e54bb1f..8f93d64 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClient.java @@ -108,36 +108,32 @@ private WebsocketGatewayClient( @Override public Mono requestResponse(ServiceMessage request) { - return Mono.defer( - () -> { - long sid = sidCounter.incrementAndGet(); - return getOrConnect() - .flatMap( - session -> - session - .send(encodeRequest(request, sid)) - .doOnSubscribe(s -> LOGGER.debug("Sending request {}", request)) - .then(session.newMonoProcessor(sid).asMono()) - .doOnCancel(() -> session.cancel(sid, request.qualifier())) - .doFinally(s -> session.removeProcessor(sid))); - }); + return getOrConnect() + .flatMap( + session -> { + long sid = sidCounter.incrementAndGet(); + return session + .send(encodeRequest(request, sid)) + .doOnSubscribe(s -> LOGGER.debug("Sending request {}", request)) + .then(session.newMonoProcessor(sid).asMono()) + .doOnCancel(() -> session.cancel(sid, request.qualifier())) + .doFinally(s -> session.removeProcessor(sid)); + }); } @Override public Flux requestStream(ServiceMessage request) { - return Flux.defer( - () -> { - long sid = sidCounter.incrementAndGet(); - return getOrConnect() - .flatMapMany( - session -> - session - .send(encodeRequest(request, sid)) - .doOnSubscribe(s -> LOGGER.debug("Sending request {}", request)) - .thenMany(session.newUnicastProcessor(sid).asFlux()) - .doOnCancel(() -> session.cancel(sid, request.qualifier())) - .doFinally(s -> session.removeProcessor(sid))); - }); + return getOrConnect() + .flatMapMany( + session -> { + long sid = sidCounter.incrementAndGet(); + return session + .send(encodeRequest(request, sid)) + .doOnSubscribe(s -> LOGGER.debug("Sending request {}", request)) + .thenMany(session.newUnicastProcessor(sid).asFlux()) + .doOnCancel(() -> session.cancel(sid, request.qualifier())) + .doFinally(s -> session.removeProcessor(sid)); + }); } @Override @@ -161,7 +157,7 @@ private Mono doClose() { private Mono getOrConnect() { // noinspection unchecked - return Mono.defer(() -> websocketMonoUpdater.updateAndGet(this, this::getOrConnect0)); + return websocketMonoUpdater.updateAndGet(this, this::getOrConnect0); } private Mono getOrConnect0( diff --git a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java index 662e291..6827ca0 100644 --- a/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java +++ b/services-gateway-client-transport/src/main/java/io/scalecube/services/gateway/transport/websocket/WebsocketGatewayClientSession.java @@ -10,13 +10,14 @@ import io.scalecube.services.transport.api.ReferenceCountUtil; import java.nio.channels.ClosedChannelException; import java.util.Map; -import java.util.Optional; import java.util.StringJoiner; import org.jctools.maps.NonBlockingHashMapLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.publisher.Sinks.Many; +import reactor.core.publisher.Sinks.One; import reactor.netty.Connection; import reactor.netty.http.websocket.WebsocketInbound; import reactor.netty.http.websocket.WebsocketOutbound; @@ -59,14 +60,16 @@ public final class WebsocketGatewayClientSession { try { message = codec.decode(byteBuf); } catch (Exception ex) { - LOGGER.error("Response decoder failed: " + ex); + LOGGER.error("Response decoder failed:", ex); return; } // ignore messages w/o sid if (!message.headers().containsKey(STREAM_ID)) { LOGGER.error("Ignore response: {} with null sid, session={}", message, id); - Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease); + if (message.data() != null) { + ReferenceCountUtil.safestRelease(message.data()); + } return; } @@ -74,7 +77,9 @@ public final class WebsocketGatewayClientSession { long sid = Long.parseLong(message.header(STREAM_ID)); Object processor = inboundProcessors.get(sid); if (processor == null) { - Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease); + if (message.data() != null) { + ReferenceCountUtil.safestRelease(message.data()); + } return; } @@ -88,24 +93,22 @@ public final class WebsocketGatewayClientSession { @SuppressWarnings({"rawtypes", "unchecked"}) Sinks.One newMonoProcessor(long sid) { - return (Sinks.One) - inboundProcessors.computeIfAbsent( - sid, - key -> { - LOGGER.debug("Put sid={}, session={}", sid, id); - return Sinks.one(); - }); + return (Sinks.One) inboundProcessors.computeIfAbsent(sid, this::newMonoProcessor0); } @SuppressWarnings({"rawtypes", "unchecked"}) Sinks.Many newUnicastProcessor(long sid) { - return (Sinks.Many) - inboundProcessors.computeIfAbsent( - sid, - key -> { - LOGGER.debug("Put sid={}, session={}", sid, id); - return Sinks.many().unicast().onBackpressureBuffer(); - }); + return (Sinks.Many) inboundProcessors.computeIfAbsent(sid, this::newUnicastProcessor0); + } + + private One newMonoProcessor0(long sid) { + LOGGER.debug("Put sid={}, session={}", sid, id); + return Sinks.one(); + } + + private Many newUnicastProcessor0(long sid) { + LOGGER.debug("Put sid={}, session={}", sid, id); + return Sinks.many().unicast().onBackpressureBuffer(); } void removeProcessor(long sid) { @@ -115,14 +118,7 @@ void removeProcessor(long sid) { } Mono send(ByteBuf byteBuf) { - return Mono.defer( - () -> { - // send with publisher (defer buffer cleanup to netty) - return connection - .outbound() - .sendObject(Mono.just(byteBuf).map(TextWebSocketFrame::new), f -> true) - .then(); - }); + return connection.outbound().sendObject(new TextWebSocketFrame(byteBuf)).then(); } void cancel(long sid, String qualifier) { @@ -158,25 +154,29 @@ public Mono onClose() { } private void handleResponse(ServiceMessage response, Object processor) { - LOGGER.debug("Handle response: {}, session={}", response, id); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Handle response: {}, session={}", response, id); + } try { - Optional signalOptional = - Optional.ofNullable(response.header(SIGNAL)).map(Signal::from); + Signal signal = null; + final String header = response.header(SIGNAL); + + if (header != null) { + signal = Signal.from(header); + } - if (!signalOptional.isPresent()) { + if (signal == null) { // handle normal response emitNext(processor, response); } else { // handle completion signal - Signal signal = signalOptional.get(); if (signal == Signal.COMPLETE) { emitComplete(processor); } if (signal == Signal.ERROR) { // decode error data to retrieve real error cause - ServiceMessage errorMessage = codec.decodeData(response, ErrorData.class); - emitNext(processor, errorMessage); + emitNext(processor, codec.decodeData(response, ErrorData.class)); } } } catch (Exception e) { diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java index b4e7483..910d0c8 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; -import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -185,23 +184,22 @@ private void onRequest(WebsocketGatewaySession session, ServiceMessage request, final long sid = getSid(request); final AtomicBoolean receivedError = new AtomicBoolean(false); - final Flux serviceStream = serviceCall.requestMany(request); + Flux serviceStream = serviceCall.requestMany(request); + final String limitRate = request.header(RATE_LIMIT_FIELD); + serviceStream = + limitRate != null ? serviceStream.limitRate(Integer.parseInt(limitRate)) : serviceStream; Disposable disposable = session .send( - Optional.ofNullable(request.header(RATE_LIMIT_FIELD)) - .map(Integer::valueOf) - .map(serviceStream::limitRate) - .orElse(serviceStream) - .map( - response -> { - boolean isErrorResponse = response.isError(); - if (isErrorResponse) { - receivedError.set(true); - } - return newResponseMessage(sid, response, isErrorResponse); - })) + serviceStream.map( + response -> { + boolean isErrorResponse = response.isError(); + if (isErrorResponse) { + receivedError.set(true); + } + return newResponseMessage(sid, response, isErrorResponse); + })) .doOnError( th -> { ReferenceCountUtil.safestRelease(request.data()); diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java index 2aaddf0..33967eb 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewaySession.java @@ -8,6 +8,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.function.Predicate; import org.jctools.maps.NonBlockingHashMapLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +23,8 @@ public final class WebsocketGatewaySession implements GatewaySession { private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewaySession.class); + private static final Predicate SEND_PREDICATE = f -> true; + private final Map subscriptions = new NonBlockingHashMapLong<>(1024); private final GatewaySessionHandler gatewayHandler; @@ -118,7 +121,7 @@ public Mono send(Flux messages) { this, frame.content(), response, (Context) context); return frame; }), - f -> true) + SEND_PREDICATE) .then() .doOnError(th -> gatewayHandler.onError(this, th, (Context) context)); }); @@ -171,7 +174,9 @@ public boolean dispose(Long streamId) { Disposable disposable = subscriptions.remove(streamId); result = disposable != null; if (result) { - LOGGER.debug("Dispose subscription by sid={}, session={}", streamId, sessionId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Dispose subscription by sid={}, session={}", streamId, sessionId); + } disposable.dispose(); } } @@ -188,24 +193,28 @@ public boolean containsSid(Long streamId) { * * @param streamId stream id * @param disposable service subscription - * @return true if disposable subscription was stored */ - public boolean register(Long streamId, Disposable disposable) { + public void register(Long streamId, Disposable disposable) { boolean result = false; if (!disposable.isDisposed()) { result = subscriptions.putIfAbsent(streamId, disposable) == null; } if (result) { - LOGGER.debug("Registered subscription with sid={}, session={}", streamId, sessionId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Registered subscription with sid={}, session={}", streamId, sessionId); + } } - return result; } private void clearSubscriptions() { if (subscriptions.size() > 1) { - LOGGER.debug("Clear all {} subscriptions on session={}", subscriptions.size(), sessionId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Clear all {} subscriptions on session={}", subscriptions.size(), sessionId); + } } else if (subscriptions.size() == 1) { - LOGGER.debug("Clear 1 subscription on session={}", sessionId); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Clear 1 subscription on session={}", sessionId); + } } subscriptions.forEach((sid, disposable) -> disposable.dispose()); subscriptions.clear(); diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketServiceMessageCodec.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketServiceMessageCodec.java index 17b99d2..48615a5 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketServiceMessageCodec.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketServiceMessageCodec.java @@ -24,7 +24,6 @@ import java.io.InputStream; import java.io.OutputStream; import java.util.Map.Entry; -import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -100,7 +99,9 @@ public ByteBuf encode(ServiceMessage message) throws MessageCodecException { generator.writeEndObject(); } catch (Throwable ex) { ReferenceCountUtil.safestRelease(byteBuf); - Optional.ofNullable(message.data()).ifPresent(ReferenceCountUtil::safestRelease); + if (message.data() != null) { + ReferenceCountUtil.safestRelease(message.data()); + } LOGGER.error("Failed to encode gateway service message: {}", message, ex); throw new MessageCodecException("Failed to encode gateway service message", ex); }