Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allowing application to request logoutDisconnect in a session and spe… #488

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,13 @@
return string != null && string.length() > 0;
}

public long sendLogout(final int msgSeqNo, final int sequenceIndex, final int lastMsgSeqNumProcessed)
public long sendLogout(

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
SessionProxy.sendLogout
; it is advisable to add an Override annotation.
final int msgSeqNo,
final int sequenceIndex,
final int lastMsgSeqNumProcessed,
final byte[] text)
{
return sendLogout(msgSeqNo, null, 0, sequenceIndex, lastMsgSeqNumProcessed);
return sendLogout(msgSeqNo, text, text == null ? 0 : text.length, sequenceIndex, lastMsgSeqNumProcessed);
}

public long sendLogout(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ public class Session

private DisconnectReason pendingDisconnectReason;

private byte[] logoutText;

Session(
final int heartbeatIntervalInS,
final long connectionId,
Expand Down Expand Up @@ -587,6 +589,17 @@ public Reply<ThrottleConfigurationStatus> throttleMessagesAt(
id, throttleWindowInMs, throttleLimitOfMessages);
}

/**
* Default version of {@link Session#logoutAndDisconnect(byte[])}
wojciech-adaptive marked this conversation as resolved.
Show resolved Hide resolved
* The result logout message does not have any value in tag 58
*
* @return the position within the Aeron stream where the disconnect is encoded.
*/
public long logoutAndDisconnect()
{
return logoutAndDisconnect(APPLICATION_DISCONNECT);
}

/**
* Send a logout message and immediately disconnect the session. You should normally use
* the <code>startLogout</code> method and not this one.
Expand All @@ -596,20 +609,28 @@ public Reply<ThrottleConfigurationStatus> throttleMessagesAt(
* message. This should only be used when you want to rapidly disconnect the session and are willing
* to take the risk that the logout message is not received.
*
* @param text value to be assigned to tag 58 of logout message.
*
* @return the position within the Aeron stream where the disconnect is encoded.
* @see Session#startLogout()
*/
public long logoutAndDisconnect()
public long logoutAndDisconnect(final byte[] text)
{
return logoutAndDisconnect(APPLICATION_DISCONNECT);
return logoutAndDisconnect(APPLICATION_DISCONNECT, text);
}

long logoutAndDisconnect(final DisconnectReason reason)
{
return logoutAndDisconnect(reason, null);
}

long logoutAndDisconnect(final DisconnectReason reason, final byte[] text)
{
long position = NO_OPERATION;
if (state() != DISCONNECTED)
{
position = trySendLogout();
text(text);
position = trySendLogout(text);
wojciech-adaptive marked this conversation as resolved.
Show resolved Hide resolved
if (position < 0)
{
state(LOGGING_OUT_AND_DISCONNECTING);
Expand Down Expand Up @@ -2237,10 +2258,15 @@ private void incNextHeartbeatTime()
}

private long trySendLogout()
{
return trySendLogout(null);
}

private long trySendLogout(final byte[] text)
{
final int sentSeqNum = newSentSeqNum();
final long position = (logoutRejectReason == NO_LOGOUT_REJECT_REASON) ?
proxy.sendLogout(sentSeqNum, sequenceIndex(), lastMsgSeqNumProcessed) :
proxy.sendLogout(sentSeqNum, sequenceIndex(), lastMsgSeqNumProcessed, text) :
proxy.sendLogout(sentSeqNum, sequenceIndex(), logoutRejectReason, lastMsgSeqNumProcessed);
if (position >= 0)
{
Expand Down Expand Up @@ -2398,7 +2424,7 @@ int poll(final long timeInNs)

case LOGGING_OUT_AND_DISCONNECTING_VALUE:
{
logoutAndDisconnect(APPLICATION_DISCONNECT);
logoutAndDisconnect(APPLICATION_DISCONNECT, this.logoutText);

return actions + 1;
}
Expand Down Expand Up @@ -2725,4 +2751,9 @@ void disconnectOnFirstMessageNotLogon(final boolean disconnectOnFirstMessageNotL
{
this.disconnectOnFirstMessageNotLogon = disconnectOnFirstMessageNotLogon;
}

private void text(final byte[] logoutText)
{
this.logoutText = logoutText;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ long sendLogon(
CancelOnDisconnectOption cancelOnDisconnectOption,
int cancelOnDisconnectTimeoutWindowInMs);

long sendLogout(int msgSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed);
long sendLogout(int msgSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed, byte[] text);

long sendLogout(
int msgSeqNo, int sequenceIndex, int rejectReason, int lastMsgSeqNumProcessed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,10 @@ public void shouldReplyToValidLogoutWhenBackPressured()

private void backPressureLogout()
{
when(sessionProxy.sendLogout(anyInt(), eq(SEQUENCE_INDEX), anyInt())).thenReturn(BACK_PRESSURED, POSITION);
when(sessionProxy.sendLogout(anyInt(),
eq(SEQUENCE_INDEX),
anyInt(),
any())).thenReturn(BACK_PRESSURED, POSITION);

backPressureDisconnect();
}
Expand Down Expand Up @@ -659,7 +662,7 @@ public void shouldLogoutOnLogonWithLowSequenceNumberAndWithAsyncProxy()
private void verifyLogoutOnlyOnce()
{
verify(sessionProxy, times(1)).sendLogout(
anyInt(), eq(SEQUENCE_INDEX), eq(NO_LAST_MSG_SEQ_NUM_PROCESSED));
anyInt(), eq(SEQUENCE_INDEX), eq(NO_LAST_MSG_SEQ_NUM_PROCESSED), any());
}

@Test
Expand All @@ -680,6 +683,27 @@ public void shouldLogoutAndDisconnectUponTimeoutWhenBackPressured()
verifyLogout(9, times(2));
}

@Test
public void shouldKeepCustomLogoutTextWhenBackPressured()
{
shouldSendTestRequestUponTimeout();

fakeClock.advanceSeconds(1);

backPressureLogout();

final byte[] logoutTextBytes = "customText".getBytes();
session().logoutAndDisconnect(logoutTextBytes);

assertState(LOGGING_OUT_AND_DISCONNECTING);

poll();

assertState(DISCONNECTING);

verifyLogout(8, times(2), logoutTextBytes);
}

@Test
public void shouldSuppressTimeoutWhenMessageReceived()
{
Expand Down Expand Up @@ -1276,7 +1300,12 @@ protected void givenActive()

public void verifyLogout(final int msgSeqNo, final VerificationMode times)
{
verify(sessionProxy, times).sendLogout(msgSeqNo, SEQUENCE_INDEX, NO_LAST_MSG_SEQ_NUM_PROCESSED);
verifyLogout(msgSeqNo, times, null);
}

public void verifyLogout(final int msgSeqNo, final VerificationMode times, final byte[] text)
{
verify(sessionProxy, times).sendLogout(msgSeqNo, SEQUENCE_INDEX, NO_LAST_MSG_SEQ_NUM_PROCESSED, text);
}

public void assertState(final SessionState state)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,17 @@ public void shouldBeActivatedBySuccessfulLogin()
assertState(ACTIVE);
}

@Test
public void shouldRequestLogoutWithCustomText()
{
shouldBeActivatedBySuccessfulLogin();
final byte[] text = "custom text".getBytes();

session.logoutAndDisconnect(text);

verifyLogout(2, times(1), text);
}

@Test
public void shouldRequestResendIfHighSeqNoLogon()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,13 +408,18 @@
return acceptingSessionWriter.send(logon, adjustedMsgSeqNo);
}

public long sendLogout(final int msgSeqNo, final int sequenceIndex, final int lastMsgSeqNumProcessed)
public long sendLogout(

Check notice

Code scanning / CodeQL

Missing Override annotation Note test

This method overrides
SessionProxy.sendLogout
; it is advisable to add an Override annotation.
final int msgSeqNo, final int sequenceIndex, final int lastMsgSeqNumProcessed, final byte[] text)
{
sentLogouts++;

final int adjustedMsgSeqNo = msgSeqNo + sequenceNumberAdjustment;
final HeaderEncoder header = logout.header();
setupHeader(header, adjustedMsgSeqNo);
if (text != null)
{
logout.text(text);
}

if (send)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,21 @@ public void initiatorSessionCanBeDisconnected()
assertSequenceIndicesAre(0);
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void initiatorSessionCanBeDisconnectedCustomLogoutMessage()
{
acquireAcceptingSession();

final String customLogoutMessage = "custom logout message";
logoutSessionWithCustomTextAndDisconnect(initiatingSession, customLogoutMessage);

assertSessionsDisconnected();
testSystem.await("logout text does not match", () ->
acceptingOtfAcceptor.lastReceivedMessage().get(58).equals(customLogoutMessage));

assertSequenceIndicesAre(0);
}

@Test(timeout = TEST_TIMEOUT_IN_MS)
public void acceptorSessionCanBeDisconnected()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,13 @@ public static long logoutSession(final Session session)
return position;
}

public static long logoutSessionWithCustomTextAndDisconnect(final Session session, final String text)
{
final long position = session.logoutAndDisconnect(text == null ? null : text.getBytes());
assertThat(position, greaterThan(0L));
return position;
}

public static long logoutSession(final TestSystem testSystem, final Session session)
{
return testSystem.awaitSend(session::startLogout);
Expand Down
Loading