diff --git a/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java b/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java index 7449a4f..cdd33e1 100644 --- a/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java +++ b/src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java @@ -363,14 +363,14 @@ private void handleConnect(Frame frame, StompServerConnection connection) { pingH = pingHandler; } - // Compute heartbeat, and register pinger and ponger + // Stomp server acts as a client to call the computePingPeriod & computePongPeriod method long ping = Frame.Heartbeat.computePingPeriod( - Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)), - Frame.Heartbeat.create(connection.server().options().getHeartbeat())); + Frame.Heartbeat.create(connection.server().options().getHeartbeat()), + Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT))); long pong = Frame.Heartbeat.computePongPeriod( - Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)), - Frame.Heartbeat.create(connection.server().options().getHeartbeat())); + Frame.Heartbeat.create(connection.server().options().getHeartbeat()), + Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT))); connection.configureHeartbeat(ping, pong, pingH); @@ -457,7 +457,7 @@ public StompServerHandler onAuthenticationRequest(StompServerConnection connecti public User getUserBySession(String session) { return this.users.get(session); } - + @Override public List getDestinations() { return new ArrayList<>(destinations.keySet()); diff --git a/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java b/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java index 3b3fa14..91a55da 100644 --- a/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java +++ b/src/main/java/io/vertx/ext/stomp/impl/StompClientConnectionImpl.java @@ -561,12 +561,13 @@ private synchronized void handleConnected(Frame frame) { server = frame.getHeader(Frame.SERVER); // Compute the heartbeat. + // Stomp client acts as a client to call the computePingPeriod & computePongPeriod method long ping = Frame.Heartbeat.computePingPeriod( - Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)), - Frame.Heartbeat.create(client.options().getHeartbeat())); + Frame.Heartbeat.create(client.options().getHeartbeat()), + Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT))); long pong = Frame.Heartbeat.computePongPeriod( - Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)), - Frame.Heartbeat.create(client.options().getHeartbeat())); + Frame.Heartbeat.create(client.options().getHeartbeat()), + Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT))); if (ping > 0) { pinger = client.vertx().setPeriodic(ping, l -> pingHandler.handle(this)); diff --git a/src/test/java/io/vertx/ext/stomp/impl/StompClientImplTest.java b/src/test/java/io/vertx/ext/stomp/impl/StompClientImplTest.java index de542ec..aeb8134 100644 --- a/src/test/java/io/vertx/ext/stomp/impl/StompClientImplTest.java +++ b/src/test/java/io/vertx/ext/stomp/impl/StompClientImplTest.java @@ -391,6 +391,49 @@ public void testServerHeartbeatWhenNoClientActivity() { ); } + @Test + public void testAsymmetricHeartbeatTime() throws InterruptedException { + AtomicReference reference = new AtomicReference<>(); + AsyncLock lock = new AsyncLock<>(); + server.close(lock.handler()); + lock.waitForSuccess(); + lock = new AsyncLock(); + + List serverReceivedPingTimestamps = new ArrayList<>(); + server = StompServer.create(vertx, + new StompServerOptions().setHeartbeat(new JsonObject().put("x", 300).put("y", 200))) + .handler(StompServerHandler.create(vertx).receivedFrameHandler(frame -> { + if(Frame.Command.PING.equals(frame.frame().getCommand())) { + serverReceivedPingTimestamps.add(System.currentTimeMillis()); + } + })).listen(lock.handler()); + + + lock.waitForSuccess(); + + List clientReceivedPingTimestamps = new ArrayList<>(); + StompClient client = StompClient.create(vertx, new StompClientOptions().setHeartbeat(new JsonObject() + .put("x", 100).put("y", 400))) + .receivedFrameHandler(frame -> { + if(Frame.Command.PING.equals(frame.getCommand())) { + clientReceivedPingTimestamps.add(System.currentTimeMillis()); + } + }); + client.connect(ar -> reference.set(ar.result())); + + Thread.sleep(2000); + + // The actual heartbeat of client is 200, assert it greater than 150 considering network delay + serverReceivedPingTimestamps.stream().reduce(0L, (x,y) -> { + assertTrue((y-x) > 150); return y;}); + + // The actual heartbeat of server is 400, assert it greater than 350 considering network delay + clientReceivedPingTimestamps.stream().reduce(0L,(x,y) -> { + assertTrue((y-x) > 350); return y;}); + + assertThat(reference.get().server()).isNotNull(); + } + @Test public void testConnectionDroppedHandler() throws InterruptedException { AtomicBoolean flag = new AtomicBoolean(true);