Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Commit

Permalink
Merge pull request #143 from scalecube/refactor-websocket-receive-part
Browse files Browse the repository at this point in the history
Refactored websocket sessions
  • Loading branch information
artem-v authored Jun 6, 2020
2 parents 2bf2b9b + ebd2ef9 commit 7b566f1
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,13 @@ public GatewayClientCodec<ByteBuf> getCodec() {
return codec;
}

private Mono<WebsocketSession> getOrConnect() {
private Mono<WebsocketGatewayClientSession> getOrConnect() {
// noinspection unchecked
return Mono.defer(() -> websocketMonoUpdater.updateAndGet(this, this::getOrConnect0));
}

private Mono<WebsocketSession> getOrConnect0(Mono<WebsocketSession> prev) {
private Mono<WebsocketGatewayClientSession> getOrConnect0(
Mono<WebsocketGatewayClientSession> prev) {
if (prev != null) {
return prev;
}
Expand All @@ -162,7 +163,8 @@ private Mono<WebsocketSession> getOrConnect0(Mono<WebsocketSession> prev) {
: connection)
.map(
connection -> {
WebsocketSession session = new WebsocketSession(codec, connection);
WebsocketGatewayClientSession session =
new WebsocketGatewayClientSession(codec, connection);
LOGGER.info("Created {} on {}:{}", session, settings.host(), settings.port());
// setup shutdown hook
session
Expand Down Expand Up @@ -209,7 +211,7 @@ private void onReadIdle(Connection connection) {
.subscribe(null, ex -> LOGGER.warn("Can't send keepalive on readIdle: " + ex));
}

private void handleCancel(long sid, WebsocketSession session) {
private void handleCancel(long sid, WebsocketGatewayClientSession session) {
ByteBuf byteBuf =
codec.encode(
ServiceMessage.builder()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.scalecube.services.gateway.transport.websocket;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.services.api.ErrorData;
import io.scalecube.services.api.ServiceMessage;
Expand All @@ -25,9 +23,9 @@
import reactor.netty.http.websocket.WebsocketInbound;
import reactor.netty.http.websocket.WebsocketOutbound;

public final class WebsocketSession {
public final class WebsocketGatewayClientSession {

private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketSession.class);
private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketGatewayClientSession.class);

private static final String STREAM_ID = "sid";
private static final String SIGNAL = "sig";
Expand All @@ -40,19 +38,21 @@ public final class WebsocketSession {
private final Map<Long, Processor<ServiceMessage, ServiceMessage>> inboundProcessors =
new NonBlockingHashMapLong<>(1024);

WebsocketSession(GatewayClientCodec<ByteBuf> codec, Connection connection) {
WebsocketGatewayClientSession(GatewayClientCodec<ByteBuf> codec, Connection connection) {
this.id = Integer.toHexString(System.identityHashCode(this));
this.codec = codec;
this.connection = connection;

WebsocketInbound inbound = (WebsocketInbound) connection.inbound();
inbound
.aggregateFrames()
.receiveFrames()
.filter(f -> !(f instanceof PongWebSocketFrame || f instanceof PingWebSocketFrame))
.map(f -> f.retain().content())
.receive()
.retain()
.subscribe(
byteBuf -> {
if (!byteBuf.isReadable()) {
ReferenceCountUtil.safestRelease(byteBuf);
return;
}
// decode message
ServiceMessage message;
try {
Expand Down Expand Up @@ -186,7 +186,7 @@ private void handleResponse(

@Override
public String toString() {
return new StringJoiner(", ", WebsocketSession.class.getSimpleName() + "[", "]")
return new StringJoiner(", ", WebsocketGatewayClientSession.class.getSimpleName() + "[", "]")
.add("id=" + id)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,15 @@ private Mono<Void> onConnect(WebsocketGatewaySession session) {
.receive()
.doOnError(th -> gatewayHandler.onSessionError(session, th))
.subscribe(
byteBuf ->
Mono.deferWithContext(context -> onRequest(session, byteBuf, context))
.subscriberContext(
context -> gatewayHandler.onRequest(session, byteBuf, context))
.subscribe());
byteBuf -> {
if (!byteBuf.isReadable()) {
ReferenceCountUtil.safestRelease(byteBuf);
return;
}
Mono.deferWithContext(context -> onRequest(session, byteBuf, context))
.subscriberContext(context -> gatewayHandler.onRequest(session, byteBuf, context))
.subscribe();
});

return session.onClose(() -> gatewayHandler.onSessionClose(session));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package io.scalecube.services.gateway.ws;

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.scalecube.services.api.ServiceMessage;
import io.scalecube.services.gateway.GatewaySession;
Expand Down Expand Up @@ -77,11 +75,7 @@ public Map<String, String> headers() {
* @return flux websocket {@link ByteBuf}
*/
public Flux<ByteBuf> receive() {
return inbound
.aggregateFrames()
.receiveFrames()
.filter(f -> !(f instanceof PongWebSocketFrame || f instanceof PingWebSocketFrame))
.map(f -> f.retain().content());
return inbound.receive().retain();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import io.scalecube.services.gateway.transport.GatewayClientTransports;
import io.scalecube.services.gateway.transport.StaticAddressRouter;
import io.scalecube.services.gateway.transport.websocket.WebsocketGatewayClient;
import io.scalecube.services.gateway.transport.websocket.WebsocketSession;
import io.scalecube.services.gateway.transport.websocket.WebsocketGatewayClientSession;
import io.scalecube.services.gateway.ws.WebsocketGateway;
import io.scalecube.services.transport.rsocket.RSocketServiceTransport;
import java.io.IOException;
Expand Down Expand Up @@ -59,7 +59,6 @@ class WebsocketClientConnectionTest extends BaseTest {
@BeforeEach
void beforEach() {
this.sessionEventHandler = new TestGatewaySessionHandler();
//noinspection unchecked
gateway =
Microservices.builder()
.discovery(ScalecubeServiceDiscovery::new)
Expand Down Expand Up @@ -179,11 +178,12 @@ void testKeepalive()
.build(),
CLIENT_CODEC);

Method getorConn = WebsocketGatewayClient.class.getDeclaredMethod("getOrConnect");
getorConn.setAccessible(true);
Method getOrConnect = WebsocketGatewayClient.class.getDeclaredMethod("getOrConnect");
getOrConnect.setAccessible(true);
//noinspection unchecked
WebsocketSession session = ((Mono<WebsocketSession>) getorConn.invoke(client)).block(TIMEOUT);
Field connectionField = WebsocketSession.class.getDeclaredField("connection");
WebsocketGatewayClientSession session =
((Mono<WebsocketGatewayClientSession>) getOrConnect.invoke(client)).block(TIMEOUT);
Field connectionField = WebsocketGatewayClientSession.class.getDeclaredField("connection");
connectionField.setAccessible(true);
Connection connection = (Connection) connectionField.get(session);
connection.addHandler(
Expand All @@ -192,8 +192,9 @@ void testKeepalive()
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof PongWebSocketFrame) {
keepaliveLatch.countDown();
} else {
super.channelRead(ctx, msg);
}
super.channelRead(ctx, msg);
}
});

Expand Down

0 comments on commit 7b566f1

Please sign in to comment.