Skip to content

Commit

Permalink
Allowing application to request logoutDisconnect in a session and spe…
Browse files Browse the repository at this point in the history
…cify a custom text
  • Loading branch information
lucianoviana committed Jul 17, 2023
1 parent c79a3eb commit 9e162d7
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,13 @@ private boolean notNullOrEmpty(final String string)
return string != null && string.length() > 0;
}

public long sendLogout(final int msgSeqNo, final int sequenceIndex, final int lastMsgSeqNumProcessed)
public long sendLogout(

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
SessionProxy.sendLogout
; it is advisable to add an Override annotation.
final int msgSeqNo,
final int sequenceIndex,
final int lastMsgSeqNumProcessed,
final byte[] text)
{
return sendLogout(msgSeqNo, null, 0, sequenceIndex, lastMsgSeqNumProcessed);
return sendLogout(msgSeqNo, text, text == null ? 0 : text.length, sequenceIndex, lastMsgSeqNumProcessed);
}

public long sendLogout(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,16 @@ public Reply<ThrottleConfigurationStatus> throttleMessagesAt(
id, throttleWindowInMs, throttleLimitOfMessages);
}

/**
* Default version of {@link Session#logoutAndDisconnect(byte[])}
*
* @return the position within the Aeron stream where the disconnect is encoded.
*/
public long logoutAndDisconnect()
{
return logoutAndDisconnect(APPLICATION_DISCONNECT);
}

/**
* Send a logout message and immediately disconnect the session. You should normally use
* the <code>startLogout</code> method and not this one.
Expand All @@ -596,20 +606,27 @@ public Reply<ThrottleConfigurationStatus> throttleMessagesAt(
* message. This should only be used when you want to rapidly disconnect the session and are willing
* to take the risk that the logout message is not received.
*
* @param text value to be assigned to tag 50 of logout message.
*
* @return the position within the Aeron stream where the disconnect is encoded.
* @see Session#startLogout()
*/
public long logoutAndDisconnect()
public long logoutAndDisconnect(final byte[] text)
{
return logoutAndDisconnect(APPLICATION_DISCONNECT);
return logoutAndDisconnect(APPLICATION_DISCONNECT, text);
}

long logoutAndDisconnect(final DisconnectReason reason)
{
return logoutAndDisconnect(reason, null);
}

long logoutAndDisconnect(final DisconnectReason reason, final byte[] text)
{
long position = NO_OPERATION;
if (state() != DISCONNECTED)
{
position = trySendLogout();
position = trySendLogout(text);
if (position < 0)
{
state(LOGGING_OUT_AND_DISCONNECTING);
Expand Down Expand Up @@ -2237,10 +2254,15 @@ private void incNextHeartbeatTime()
}

private long trySendLogout()
{
return trySendLogout(null);
}

private long trySendLogout(final byte[] text)
{
final int sentSeqNum = newSentSeqNum();
final long position = (logoutRejectReason == NO_LOGOUT_REJECT_REASON) ?
proxy.sendLogout(sentSeqNum, sequenceIndex(), lastMsgSeqNumProcessed) :
proxy.sendLogout(sentSeqNum, sequenceIndex(), lastMsgSeqNumProcessed, text) :
proxy.sendLogout(sentSeqNum, sequenceIndex(), logoutRejectReason, lastMsgSeqNumProcessed);
if (position >= 0)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ long sendLogon(
CancelOnDisconnectOption cancelOnDisconnectOption,
int cancelOnDisconnectTimeoutWindowInMs);

long sendLogout(int msgSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed);
long sendLogout(int msgSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed, byte[] text);

long sendLogout(
int msgSeqNo, int sequenceIndex, int rejectReason, int lastMsgSeqNumProcessed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,10 @@ public void shouldReplyToValidLogoutWhenBackPressured()

private void backPressureLogout()
{
when(sessionProxy.sendLogout(anyInt(), eq(SEQUENCE_INDEX), anyInt())).thenReturn(BACK_PRESSURED, POSITION);
when(sessionProxy.sendLogout(anyInt(),
eq(SEQUENCE_INDEX),
anyInt(),
any())).thenReturn(BACK_PRESSURED, POSITION);

backPressureDisconnect();
}
Expand Down Expand Up @@ -659,7 +662,7 @@ public void shouldLogoutOnLogonWithLowSequenceNumberAndWithAsyncProxy()
private void verifyLogoutOnlyOnce()
{
verify(sessionProxy, times(1)).sendLogout(
anyInt(), eq(SEQUENCE_INDEX), eq(NO_LAST_MSG_SEQ_NUM_PROCESSED));
anyInt(), eq(SEQUENCE_INDEX), eq(NO_LAST_MSG_SEQ_NUM_PROCESSED), any());
}

@Test
Expand Down Expand Up @@ -1276,7 +1279,12 @@ protected void givenActive()

public void verifyLogout(final int msgSeqNo, final VerificationMode times)
{
verify(sessionProxy, times).sendLogout(msgSeqNo, SEQUENCE_INDEX, NO_LAST_MSG_SEQ_NUM_PROCESSED);
verifyLogout(msgSeqNo, times, null);
}

public void verifyLogout(final int msgSeqNo, final VerificationMode times, final byte[] text)
{
verify(sessionProxy, times).sendLogout(msgSeqNo, SEQUENCE_INDEX, NO_LAST_MSG_SEQ_NUM_PROCESSED, text);
}

public void assertState(final SessionState state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ public void shouldBeActivatedBySuccessfulLogin()
assertState(ACTIVE);
}

@Test
public void shouldRequestLogoutWithCustomText()
{
shouldBeActivatedBySuccessfulLogin();
final byte[] text = "custom text".getBytes();

session.logoutAndDisconnect(text);

verifyLogout(2, times(1), text);
}

@Test
public void shouldRequestResendIfHighSeqNoLogon()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,18 @@ public long sendLogon(
return acceptingSessionWriter.send(logon, adjustedMsgSeqNo);
}

public long sendLogout(final int msgSeqNo, final int sequenceIndex, final int lastMsgSeqNumProcessed)
public long sendLogout(

Check notice

Code scanning / CodeQL

Missing Override annotation Note test

This method overrides
SessionProxy.sendLogout
; it is advisable to add an Override annotation.
final int msgSeqNo, final int sequenceIndex, final int lastMsgSeqNumProcessed, final byte[] text)
{
sentLogouts++;

final int adjustedMsgSeqNo = msgSeqNo + sequenceNumberAdjustment;
final HeaderEncoder header = logout.header();
setupHeader(header, adjustedMsgSeqNo);
if (text != null)
{
logout.text(text);
}

if (send)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,21 @@ public void initiatorSessionCanBeDisconnected()
assertSequenceIndicesAre(0);
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void initiatorSessionCanBeDisconnectedCustomLogoutMessage()
{
acquireAcceptingSession();

String customLogoutMessage = "custom logout message";
logoutSession(initiatingSession, customLogoutMessage);

assertSessionsDisconnected();
testSystem.await("logout text does not match", () ->
acceptingOtfAcceptor.lastReceivedMessage().get(58).equals(customLogoutMessage));

assertSequenceIndicesAre(0);
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void acceptorSessionCanBeDisconnected()
{
Expand Down Expand Up @@ -567,7 +582,7 @@ public void sequenceNumbersShouldResetOverDisconnects()
messagesCanBeExchanged();
assertSequenceFromInitToAcceptAt(2, 2);

logoutSession(initiatingSession);
logoutSession(initiatingSession, null);

assertSequenceIndicesAre(0);
assertSessionsDisconnected();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -576,13 +576,18 @@ public static long disconnectSession(final Session session)
return position;
}

public static long logoutSession(final Session session)
public static long logoutSession(final Session session, final String text)
{
final long position = session.startLogout();
final long position = session.logoutAndDisconnect(text == null ? null : text.getBytes());
assertThat(position, greaterThan(0L));
return position;
}

public static long logoutSession(final Session session)
{
return logoutSession(session, null);
}

public static long logoutSession(final TestSystem testSystem, final Session session)
{
return testSystem.awaitSend(session::startLogout);
Expand Down

0 comments on commit 9e162d7

Please sign in to comment.