Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the heartbeat issue #40 #42

Merged
merged 1 commit into from
Jul 9, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/main/java/io/vertx/ext/stomp/impl/DefaultStompHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -363,14 +363,14 @@ private void handleConnect(Frame frame, StompServerConnection connection) {
pingH = pingHandler;
}


// Compute heartbeat, and register pinger and ponger
// Stomp server acts as a client to call the computePingPeriod & computePongPeriod method
long ping = Frame.Heartbeat.computePingPeriod(
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)),
Frame.Heartbeat.create(connection.server().options().getHeartbeat()));
Frame.Heartbeat.create(connection.server().options().getHeartbeat()),
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)));
long pong = Frame.Heartbeat.computePongPeriod(
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)),
Frame.Heartbeat.create(connection.server().options().getHeartbeat()));
Frame.Heartbeat.create(connection.server().options().getHeartbeat()),
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)));

connection.configureHeartbeat(ping, pong, pingH);

Expand Down Expand Up @@ -457,7 +457,7 @@ public StompServerHandler onAuthenticationRequest(StompServerConnection connecti
public User getUserBySession(String session) {
return this.users.get(session);
}

@Override
public List<Destination> getDestinations() {
return new ArrayList<>(destinations.keySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,12 +561,13 @@ private synchronized void handleConnected(Frame frame) {
server = frame.getHeader(Frame.SERVER);

// Compute the heartbeat.
// Stomp client acts as a client to call the computePingPeriod & computePongPeriod method
long ping = Frame.Heartbeat.computePingPeriod(
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)),
Frame.Heartbeat.create(client.options().getHeartbeat()));
Frame.Heartbeat.create(client.options().getHeartbeat()),
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)));
long pong = Frame.Heartbeat.computePongPeriod(
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)),
Frame.Heartbeat.create(client.options().getHeartbeat()));
Frame.Heartbeat.create(client.options().getHeartbeat()),
Frame.Heartbeat.parse(frame.getHeader(Frame.HEARTBEAT)));

if (ping > 0) {
pinger = client.vertx().setPeriodic(ping, l -> pingHandler.handle(this));
Expand Down
43 changes: 43 additions & 0 deletions src/test/java/io/vertx/ext/stomp/impl/StompClientImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,49 @@ public void testServerHeartbeatWhenNoClientActivity() {
);
}

@Test
public void testAsymmetricHeartbeatTime() throws InterruptedException {
AtomicReference<StompClientConnection> reference = new AtomicReference<>();
AsyncLock lock = new AsyncLock<>();
server.close(lock.handler());
lock.waitForSuccess();
lock = new AsyncLock();

List<Long> serverReceivedPingTimestamps = new ArrayList<>();
server = StompServer.create(vertx,
new StompServerOptions().setHeartbeat(new JsonObject().put("x", 300).put("y", 200)))
.handler(StompServerHandler.create(vertx).receivedFrameHandler(frame -> {
if(Frame.Command.PING.equals(frame.frame().getCommand())) {
serverReceivedPingTimestamps.add(System.currentTimeMillis());
}
})).listen(lock.handler());


lock.waitForSuccess();

List<Long> clientReceivedPingTimestamps = new ArrayList<>();
StompClient client = StompClient.create(vertx, new StompClientOptions().setHeartbeat(new JsonObject()
.put("x", 100).put("y", 400)))
.receivedFrameHandler(frame -> {
if(Frame.Command.PING.equals(frame.getCommand())) {
clientReceivedPingTimestamps.add(System.currentTimeMillis());
}
});
client.connect(ar -> reference.set(ar.result()));

Thread.sleep(2000);

// The actual heartbeat of client is 200, assert it greater than 150 considering network delay
serverReceivedPingTimestamps.stream().reduce(0L, (x,y) -> {
assertTrue((y-x) > 150); return y;});

// The actual heartbeat of server is 400, assert it greater than 350 considering network delay
clientReceivedPingTimestamps.stream().reduce(0L,(x,y) -> {
assertTrue((y-x) > 350); return y;});

assertThat(reference.get().server()).isNotNull();
}

@Test
public void testConnectionDroppedHandler() throws InterruptedException {
AtomicBoolean flag = new AtomicBoolean(true);
Expand Down