diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/GatewayMessages.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/GatewayMessages.java index aba8889d..09c3186d 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/GatewayMessages.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/GatewayMessages.java @@ -1,11 +1,9 @@ package io.scalecube.services.gateway.ws; import io.scalecube.services.api.ServiceMessage; -import io.scalecube.services.api.ServiceMessage.Builder; import io.scalecube.services.exceptions.DefaultErrorMapper; -import java.util.Optional; -final class GatewayMessages { +public final class GatewayMessages { static final String QUALIFIER_FIELD = "q"; static final String STREAM_ID_FIELD = "sid"; @@ -18,49 +16,95 @@ private GatewayMessages() { // Do not instantiate } - static ServiceMessage newCancelMessage(long sid) { - Builder builder = ServiceMessage.builder(); - setSid(builder, sid); - setSignal(builder, Signal.CANCEL); - return builder.build(); + /** + * Returns cancel message by given arguments. + * + * @param sid sid + * @param qualifier qualifier + * @return {@link ServiceMessage} instance as the cancel signal + */ + public static ServiceMessage newCancelMessage(long sid, String qualifier) { + return ServiceMessage.builder() + .qualifier(qualifier) + .header(STREAM_ID_FIELD, sid) + .header(SIGNAL_FIELD, Signal.CANCEL.code()) + .build(); } - static ServiceMessage newErrorMessage(ServiceMessage message, Throwable th) { + /** + * Returns error message by given arguments. + * + * @param message request + * @param th cause + * @return {@link ServiceMessage} instance as the error signal + */ + public static ServiceMessage newErrorMessage(ServiceMessage message, Throwable th) { ServiceMessage.Builder builder = - ServiceMessage.from(DefaultErrorMapper.INSTANCE.toMessage(message.qualifier(), th)); - Optional.ofNullable(GatewayMessages.getSidHeader(message)) - .ifPresent(s -> GatewayMessages.setSid(builder, s)); - GatewayMessages.setSignal(builder, Signal.ERROR); + ServiceMessage.from(DefaultErrorMapper.INSTANCE.toMessage(message.qualifier(), th)) + .header(SIGNAL_FIELD, Signal.ERROR.code()); + String sid = message.header(STREAM_ID_FIELD); + if (sid != null) { + builder.header(STREAM_ID_FIELD, sid); + } return builder.build(); } - static ServiceMessage newCompleteMessage(ServiceMessage message) { - ServiceMessage.Builder builder = ServiceMessage.builder(); - Optional.ofNullable(GatewayMessages.getSidHeader(message)) - .ifPresent(s -> GatewayMessages.setSid(builder, s)); - GatewayMessages.setSignal(builder, Signal.COMPLETE); - return builder.build(); + /** + * Returns complete message by given arguments. + * + * @param sid sid + * @param qualifier qualifier + * @return {@link ServiceMessage} instance as the complete signal + */ + public static ServiceMessage newCompleteMessage(long sid, String qualifier) { + return ServiceMessage.builder() + .qualifier(qualifier) + .header(STREAM_ID_FIELD, sid) + .header(SIGNAL_FIELD, Signal.COMPLETE.code()) + .build(); } - static ServiceMessage newResponseMessage( + /** + * Returns response message by given arguments. + * + * @param sid sid + * @param message request + * @param isErrorResponse should the message be marked as an error? + * @return {@link ServiceMessage} instance as the response + */ + public static ServiceMessage newResponseMessage( long sid, ServiceMessage message, boolean isErrorResponse) { - ServiceMessage.Builder builder = ServiceMessage.from(message); - GatewayMessages.setSid(builder, sid); if (isErrorResponse) { - GatewayMessages.setSignal(builder, Signal.ERROR); + return ServiceMessage.from(message) + .header(STREAM_ID_FIELD, sid) + .header(SIGNAL_FIELD, Signal.ERROR.code()) + .build(); } - return builder.build(); + return ServiceMessage.from(message).header(STREAM_ID_FIELD, sid).build(); } - static ServiceMessage validateSid(ServiceMessage message) { - if (getSidHeader(message) == null) { + /** + * Verifies the sid existence in a given message. + * + * @param message message + * @return incoming message + */ + public static ServiceMessage validateSid(ServiceMessage message) { + if (message.header(STREAM_ID_FIELD) == null) { throw WebsocketContextException.badRequest("sid is missing", message); } else { return message; } } - static ServiceMessage validateSidOnSession( + /** + * Verifies the sid is not used in a given session. + * + * @param session session + * @param message message + * @return incoming message + */ + public static ServiceMessage validateSidOnSession( WebsocketGatewaySession session, ServiceMessage message) { long sid = getSid(message); if (session.containsSid(sid)) { @@ -70,35 +114,37 @@ static ServiceMessage validateSidOnSession( } } - static ServiceMessage validateQualifier(ServiceMessage message) { + /** + * Verifies the qualifier existence in a given message. + * + * @param message message + * @return incoming message + */ + public static ServiceMessage validateQualifier(ServiceMessage message) { if (message.qualifier() == null) { throw WebsocketContextException.badRequest("qualifier is missing", message); } return message; } - static long getSid(ServiceMessage message) { - return Long.parseLong(getSidHeader(message)); - } - - static String getSidHeader(ServiceMessage message) { - return message.header(STREAM_ID_FIELD); + /** + * Returns sid from a given message. + * + * @param message message + * @return sid + */ + public static long getSid(ServiceMessage message) { + return Long.parseLong(message.header(STREAM_ID_FIELD)); } - static ServiceMessage.Builder setSid(ServiceMessage.Builder builder, long sid) { - return builder.header(STREAM_ID_FIELD, sid); - } - - static ServiceMessage.Builder setSid(ServiceMessage.Builder builder, String sid) { - return builder.header(STREAM_ID_FIELD, sid); - } - - static Signal getSignal(ServiceMessage message) { + /** + * Returns signal from a given message. + * + * @param message message + * @return signal + */ + public static Signal getSignal(ServiceMessage message) { String header = message.header(SIGNAL_FIELD); return header != null ? Signal.from(Integer.parseInt(header)) : null; } - - static ServiceMessage.Builder setSignal(ServiceMessage.Builder builder, Signal signal) { - return builder.header(SIGNAL_FIELD, signal.code()); - } } diff --git a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java index 83da2c0a..418f1bab 100644 --- a/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java +++ b/services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java @@ -184,7 +184,7 @@ private void onMessage(WebsocketGatewaySession session, ServiceMessage message, () -> { if (!receivedError.get()) { session - .send(newCompleteMessage(message)) + .send(newCompleteMessage(sid, message.qualifier())) .subscriberContext(context) .subscribe(); } @@ -205,6 +205,7 @@ private Mono onCancel(WebsocketGatewaySession session, ServiceMessage message long sid = getSid(message); // dispose by sid (if anything to dispose) session.dispose(sid); - return session.send(newCancelMessage(sid)); // no need to subscribe here since flatMap will do + // no need to subscribe here since flatMap will do + return session.send(newCancelMessage(sid, message.qualifier())); } }