Skip to content

Commit

Permalink
Add the ability to reply asynchronously to a ResendRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
pcdv committed Apr 4, 2024
1 parent f16b04c commit 601f665
Show file tree
Hide file tree
Showing 6 changed files with 326 additions and 65 deletions.
2 changes: 2 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ indent_size = 4
max_line_length = 120
ij_java_method_brace_style = next_line
ij_java_block_brace_style = next_line
ij_java_else_on_new_line = true
ij_java_class_brace_style = next_line
ij_java_space_after_type_cast = false
ij_any_catch_on_new_line = true
ij_any_spaces_around_equality_operators = true
ij_java_continuation_indent_size = 4
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package uk.co.real_logic.artio.session;

import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder;
import uk.co.real_logic.artio.util.AsciiBuffer;

/**
* Customer interface to control whether resend requests are responded to or not.
Expand Down Expand Up @@ -45,6 +46,34 @@ void onResend(
int correctedEndSeqNo,
ResendRequestResponse response);

/**
* Alternative version of the callback. It receives the buffer containing the ResendRequest message, so that it
* can be provided when delaying the execution of the request. Don't forget to make a copy of the message, as
* the buffer is mutable.
*
* @param session the session that has received the resend request.
* @param resendRequest the decoded resend request in question.
* @param correctedEndSeqNo the end sequence number that Artio will reply with. This is useful if, for example, the
* resend request uses 0 for its endSeqNo parameter.
* @param response respond to the resend request by calling methods on this object.
* @param messageBuffer buffer containing the ResendRequest message
* @param messageOffset offset of message in buffer
* @param messageLength length of message in buffer
* @see Session#executeResendRequest(int, AsciiBuffer, int, int, int)
*/
default void onResend(
Session session,
AbstractResendRequestDecoder resendRequest,
int correctedEndSeqNo,
ResendRequestResponse response,
AsciiBuffer messageBuffer,
int messageOffset,
int messageLength
)
{
onResend(session, resendRequest, correctedEndSeqNo, response);
}

/**
* This method is invoked when a Session identifies that a resend is complete. It is invoked on the thread
* that the Library is polled on that owns the Session in question.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
package uk.co.real_logic.artio.session;

import uk.co.real_logic.artio.builder.AbstractRejectEncoder;
import uk.co.real_logic.artio.util.AsciiBuffer;

public class ResendRequestResponse
{
private boolean result;
private boolean delay;

private int refTagId;
private AbstractRejectEncoder rejectEncoder;
Expand All @@ -30,6 +32,7 @@ public class ResendRequestResponse
public void resend()
{
result = true;
delay = false;
}

/**
Expand All @@ -42,13 +45,15 @@ public void reject(final int refTagId)
this.refTagId = refTagId;

result = false;
delay = false;
}

public void reject(final AbstractRejectEncoder rejectEncoder)
{
this.rejectEncoder = rejectEncoder;

result = false;
delay = false;
}

AbstractRejectEncoder rejectEncoder()
Expand All @@ -65,4 +70,28 @@ int refTagId()
{
return refTagId;
}

/**
* Since version 0.148(?) it is possible to postpone the execution of a ResendRequest. This method indicates
* that the request must not be processed nor rejected. It is the responsibility of the caller to call
* Session.executeResendRequest() when ready.
*
* @see Session#executeResendRequest(int, AsciiBuffer, int, int, int)
* @return true if response to the request must not be done immediately
*/
public boolean shouldDelay()
{
return delay;
}

/**
* This method indicates that the request must not be processed nor rejected. It is the responsibility of
* the caller to call Session.executeResendRequest() when ready.
*
* @see Session#executeResendRequest(int, AsciiBuffer, int, int, int)
*/
public void delay()
{
delay = true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2097,50 +2097,92 @@ Action onResendRequest(
final ResendRequestResponse resendRequestResponse = this.resendRequestResponse;
if (!backpressuredResendRequestResponse)
{
resendRequestController.onResend(this, resendRequest, correctedEndSeqNo, resendRequestResponse);
resendRequestController.onResend(this, resendRequest, correctedEndSeqNo, resendRequestResponse,
messageBuffer, messageOffset, messageLength);
}

if (resendRequestResponse.result())
{
final long correlationId = generateReplayCorrelationId();

// Notify the sender end point that a replay is going to happen.
if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
return executeResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
oldLastReceivedMsgSeqNum);
}
else if (!resendRequestResponse.shouldDelay())
{
final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
if (rejectEncoder != null)
{
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
correlationId, outboundPublication))
{
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
backpressuredResendRequestResponse = true;
backpressuredOutboundValidResendRequest = true;
return ABORT;
}

backpressuredOutboundValidResendRequest = false;
return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
}

return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
}
else
{
return CONTINUE;
}
}

private Action executeResendRequest(
final int beginSeqNum, final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength,
final int correctedEndSeqNo, final int oldLastReceivedMsgSeqNum
)
{
final long correlationId = generateReplayCorrelationId();

// Notify the sender end point that a replay is going to happen.
if (!backpressuredResendRequestResponse || backpressuredOutboundValidResendRequest)
{
if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
correlationId, inboundPublication))
correlationId, outboundPublication))
{
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
if (lastReceivedMsgSeqNum >= 0)
{
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
}
backpressuredResendRequestResponse = true;
backpressuredOutboundValidResendRequest = true;
return ABORT;
}

backpressuredResendRequestResponse = false;
replaysInFlight++;
return CONTINUE;
backpressuredOutboundValidResendRequest = false;
}
else

if (saveValidResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo,
correlationId, inboundPublication))
{
final AbstractRejectEncoder rejectEncoder = resendRequestResponse.rejectEncoder();
if (rejectEncoder != null)
if (lastReceivedMsgSeqNum >= 0)
{
return sendCustomReject(oldLastReceivedMsgSeqNum, rejectEncoder);
lastReceivedMsgSeqNum(oldLastReceivedMsgSeqNum);
}

return sendReject(msgSeqNum, resendRequestResponse.refTagId(), OTHER, oldLastReceivedMsgSeqNum);
backpressuredResendRequestResponse = true;
return ABORT;
}

backpressuredResendRequestResponse = false;
replaysInFlight++;
return CONTINUE;
}


/**
* Executes a resend request. Used to be done immediately when receiving such a request, but
* it is now possible to delay the execution, so this method must be called when ready.
*
* @param beginSeqNum begin sequence number found in received ResendRequest
* @param messageBuffer buffer containing the ResendRequest message
* @param messageOffset offset of message in buffer
* @param messageLength length of message in buffer
* @param correctedEndSeqNo corrected end sequence number
* @return an Action: be sure to handle back pressure!
* @see ResendRequestController#onResend(Session, AbstractResendRequestDecoder, int, ResendRequestResponse, AsciiBuffer, int, int)
*/
public Action executeResendRequest(
final int beginSeqNum,
final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength,
final int correctedEndSeqNo
)
{
return executeResendRequest(beginSeqNum, messageBuffer, messageOffset, messageLength, correctedEndSeqNo, -1);
}

private Action sendCustomReject(final int oldLastReceivedMsgSeqNum, final AbstractRejectEncoder rejectEncoder)
Expand Down
Loading

0 comments on commit 601f665

Please sign in to comment.