Skip to content

Commit

Permalink
Allowing application to request logoutDisconnect in a session and spe…
Browse files Browse the repository at this point in the history
…cify a custom text (#488)
  • Loading branch information
lucianoviana authored Jul 20, 2023
1 parent c79a3eb commit 6b9e298
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,13 @@ private boolean notNullOrEmpty(final String string)
return string != null && string.length() > 0;
}

public long sendLogout(final int msgSeqNo, final int sequenceIndex, final int lastMsgSeqNumProcessed)
public long sendLogout(
final int msgSeqNo,
final int sequenceIndex,
final int lastMsgSeqNumProcessed,
final byte[] text)
{
return sendLogout(msgSeqNo, null, 0, sequenceIndex, lastMsgSeqNumProcessed);
return sendLogout(msgSeqNo, text, text == null ? 0 : text.length, sequenceIndex, lastMsgSeqNumProcessed);
}

public long sendLogout(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ public class Session

private DisconnectReason pendingDisconnectReason;

private byte[] logoutText;

Session(
final int heartbeatIntervalInS,
final long connectionId,
Expand Down Expand Up @@ -587,6 +589,17 @@ public Reply<ThrottleConfigurationStatus> throttleMessagesAt(
id, throttleWindowInMs, throttleLimitOfMessages);
}

/**
* Default version of {@link Session#logoutAndDisconnect(byte[])}
* The result logout message does not have any value in tag 58
*
* @return the position within the Aeron stream where the disconnect is encoded.
*/
public long logoutAndDisconnect()
{
return logoutAndDisconnect(APPLICATION_DISCONNECT);
}

/**
* Send a logout message and immediately disconnect the session. You should normally use
* the <code>startLogout</code> method and not this one.
Expand All @@ -596,20 +609,28 @@ public Reply<ThrottleConfigurationStatus> throttleMessagesAt(
* message. This should only be used when you want to rapidly disconnect the session and are willing
* to take the risk that the logout message is not received.
*
* @param text value to be assigned to tag 58 of logout message.
*
* @return the position within the Aeron stream where the disconnect is encoded.
* @see Session#startLogout()
*/
public long logoutAndDisconnect()
public long logoutAndDisconnect(final byte[] text)
{
return logoutAndDisconnect(APPLICATION_DISCONNECT);
return logoutAndDisconnect(APPLICATION_DISCONNECT, text);
}

long logoutAndDisconnect(final DisconnectReason reason)
{
return logoutAndDisconnect(reason, null);
}

long logoutAndDisconnect(final DisconnectReason reason, final byte[] text)
{
long position = NO_OPERATION;
if (state() != DISCONNECTED)
{
position = trySendLogout();
text(text);
position = trySendLogout(text);
if (position < 0)
{
state(LOGGING_OUT_AND_DISCONNECTING);
Expand Down Expand Up @@ -2237,10 +2258,15 @@ private void incNextHeartbeatTime()
}

private long trySendLogout()
{
return trySendLogout(null);
}

private long trySendLogout(final byte[] text)
{
final int sentSeqNum = newSentSeqNum();
final long position = (logoutRejectReason == NO_LOGOUT_REJECT_REASON) ?
proxy.sendLogout(sentSeqNum, sequenceIndex(), lastMsgSeqNumProcessed) :
proxy.sendLogout(sentSeqNum, sequenceIndex(), lastMsgSeqNumProcessed, text) :
proxy.sendLogout(sentSeqNum, sequenceIndex(), logoutRejectReason, lastMsgSeqNumProcessed);
if (position >= 0)
{
Expand Down Expand Up @@ -2398,7 +2424,7 @@ int poll(final long timeInNs)

case LOGGING_OUT_AND_DISCONNECTING_VALUE:
{
logoutAndDisconnect(APPLICATION_DISCONNECT);
logoutAndDisconnect(APPLICATION_DISCONNECT, this.logoutText);

return actions + 1;
}
Expand Down Expand Up @@ -2725,4 +2751,9 @@ void disconnectOnFirstMessageNotLogon(final boolean disconnectOnFirstMessageNotL
{
this.disconnectOnFirstMessageNotLogon = disconnectOnFirstMessageNotLogon;
}

private void text(final byte[] logoutText)
{
this.logoutText = logoutText;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ long sendLogon(
CancelOnDisconnectOption cancelOnDisconnectOption,
int cancelOnDisconnectTimeoutWindowInMs);

long sendLogout(int msgSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed);
long sendLogout(int msgSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed, byte[] text);

long sendLogout(
int msgSeqNo, int sequenceIndex, int rejectReason, int lastMsgSeqNumProcessed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,10 @@ public void shouldReplyToValidLogoutWhenBackPressured()

private void backPressureLogout()
{
when(sessionProxy.sendLogout(anyInt(), eq(SEQUENCE_INDEX), anyInt())).thenReturn(BACK_PRESSURED, POSITION);
when(sessionProxy.sendLogout(anyInt(),
eq(SEQUENCE_INDEX),
anyInt(),
any())).thenReturn(BACK_PRESSURED, POSITION);

backPressureDisconnect();
}
Expand Down Expand Up @@ -659,7 +662,7 @@ public void shouldLogoutOnLogonWithLowSequenceNumberAndWithAsyncProxy()
private void verifyLogoutOnlyOnce()
{
verify(sessionProxy, times(1)).sendLogout(
anyInt(), eq(SEQUENCE_INDEX), eq(NO_LAST_MSG_SEQ_NUM_PROCESSED));
anyInt(), eq(SEQUENCE_INDEX), eq(NO_LAST_MSG_SEQ_NUM_PROCESSED), any());
}

@Test
Expand All @@ -680,6 +683,27 @@ public void shouldLogoutAndDisconnectUponTimeoutWhenBackPressured()
verifyLogout(9, times(2));
}

@Test
public void shouldKeepCustomLogoutTextWhenBackPressured()
{
shouldSendTestRequestUponTimeout();

fakeClock.advanceSeconds(1);

backPressureLogout();

final byte[] logoutTextBytes = "customText".getBytes();
session().logoutAndDisconnect(logoutTextBytes);

assertState(LOGGING_OUT_AND_DISCONNECTING);

poll();

assertState(DISCONNECTING);

verifyLogout(8, times(2), logoutTextBytes);
}

@Test
public void shouldSuppressTimeoutWhenMessageReceived()
{
Expand Down Expand Up @@ -1276,7 +1300,12 @@ protected void givenActive()

public void verifyLogout(final int msgSeqNo, final VerificationMode times)
{
verify(sessionProxy, times).sendLogout(msgSeqNo, SEQUENCE_INDEX, NO_LAST_MSG_SEQ_NUM_PROCESSED);
verifyLogout(msgSeqNo, times, null);
}

public void verifyLogout(final int msgSeqNo, final VerificationMode times, final byte[] text)
{
verify(sessionProxy, times).sendLogout(msgSeqNo, SEQUENCE_INDEX, NO_LAST_MSG_SEQ_NUM_PROCESSED, text);
}

public void assertState(final SessionState state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ public void shouldBeActivatedBySuccessfulLogin()
assertState(ACTIVE);
}

@Test
public void shouldRequestLogoutWithCustomText()
{
shouldBeActivatedBySuccessfulLogin();
final byte[] text = "custom text".getBytes();

session.logoutAndDisconnect(text);

verifyLogout(2, times(1), text);
}

@Test
public void shouldRequestResendIfHighSeqNoLogon()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,18 @@ public long sendLogon(
return acceptingSessionWriter.send(logon, adjustedMsgSeqNo);
}

public long sendLogout(final int msgSeqNo, final int sequenceIndex, final int lastMsgSeqNumProcessed)
public long sendLogout(
final int msgSeqNo, final int sequenceIndex, final int lastMsgSeqNumProcessed, final byte[] text)
{
sentLogouts++;

final int adjustedMsgSeqNo = msgSeqNo + sequenceNumberAdjustment;
final HeaderEncoder header = logout.header();
setupHeader(header, adjustedMsgSeqNo);
if (text != null)
{
logout.text(text);
}

if (send)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,21 @@ public void initiatorSessionCanBeDisconnected()
assertSequenceIndicesAre(0);
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void initiatorSessionCanBeDisconnectedCustomLogoutMessage()
{
acquireAcceptingSession();

final String customLogoutMessage = "custom logout message";
logoutSessionWithCustomTextAndDisconnect(initiatingSession, customLogoutMessage);

assertSessionsDisconnected();
testSystem.await("logout text does not match", () ->
acceptingOtfAcceptor.lastReceivedMessage().get(58).equals(customLogoutMessage));

assertSequenceIndicesAre(0);
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void acceptorSessionCanBeDisconnected()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ public static long logoutSession(final Session session)
return position;
}

public static long logoutSessionWithCustomTextAndDisconnect(final Session session, final String text)
{
final long position = session.logoutAndDisconnect(text == null ? null : text.getBytes());
assertThat(position, greaterThan(0L));
return position;
}

public static long logoutSession(final TestSystem testSystem, final Session session)
{
return testSystem.awaitSend(session::startLogout);
Expand Down

0 comments on commit 6b9e298

Please sign in to comment.