Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intermittency #491

Merged
merged 2 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class EngineContext implements AutoCloseable
private ReplayQuery pruneInboundReplayQuery;
private ReplayQuery outboundReplayQuery;
private FramerContext framerContext;
private long inboundIndexRegistrationId;
private long outboundIndexRegistrationId;

EngineContext(
Expand Down Expand Up @@ -281,9 +282,12 @@ private void newIndexers()
}
inboundIndices.add(receivedSequenceNumberIndex);

final Subscription inboundIndexSubscription = inboundLibraryStreams.subscription("inboundIndexer");
inboundIndexRegistrationId = inboundIndexSubscription.registrationId();

inboundIndexer = new Indexer(
inboundIndices,
inboundLibraryStreams.subscription("inboundIndexer"),
inboundIndexSubscription,
configuration.agentNamePrefix(),
inboundCompletionPosition,
configuration.archiveReplayStream());
Expand Down Expand Up @@ -322,6 +326,11 @@ private void newIndexers()
}
}

public long inboundIndexRegistrationId()
{
return inboundIndexRegistrationId;
}

public long outboundIndexRegistrationId()
{
return outboundIndexRegistrationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.status.ReadablePosition;
import uk.co.real_logic.artio.*;
import uk.co.real_logic.artio.engine.framer.EngineStreamInfo;
import uk.co.real_logic.artio.engine.framer.FramerContext;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.timing.EngineTimers;
Expand Down Expand Up @@ -525,4 +526,14 @@ public Reply<?> startReproduction()

return framerContext.startReproduction();
}

/**
* Returns information about engine streams. Internal API for testing.
*
* @return a reply that should eventually contain information about engine streams or {@code null} if back-pressured
*/
Reply<EngineStreamInfo> engineStreamInfo()
{
return framerContext.engineStreamInfo();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package uk.co.real_logic.artio.engine.framer;

public final class EngineStreamInfo
{
private final long inboundIndexSubscriptionRegistrationId;
private final long outboundIndexSubscriptionRegistrationId;
private final int inboundPublicationSessionId;
private final long inboundPublicationPosition;
private final int outboundPublicationSessionId;
private final long outboundPublicationPosition;

EngineStreamInfo(
final long inboundIndexSubscriptionRegistrationId,
final long outboundIndexSubscriptionRegistrationId,
final int inboundPublicationSessionId,
final long inboundPublicationPosition,
final int outboundPublicationSessionId,
final long outboundPublicationPosition)
{
this.inboundIndexSubscriptionRegistrationId = inboundIndexSubscriptionRegistrationId;
this.outboundIndexSubscriptionRegistrationId = outboundIndexSubscriptionRegistrationId;
this.inboundPublicationSessionId = inboundPublicationSessionId;
this.inboundPublicationPosition = inboundPublicationPosition;
this.outboundPublicationSessionId = outboundPublicationSessionId;
this.outboundPublicationPosition = outboundPublicationPosition;
}

public long inboundIndexSubscriptionRegistrationId()
{
return inboundIndexSubscriptionRegistrationId;
}

public long outboundIndexSubscriptionRegistrationId()
{
return outboundIndexSubscriptionRegistrationId;
}

public int inboundPublicationSessionId()
{
return inboundPublicationSessionId;
}

public long inboundPublicationPosition()
{
return inboundPublicationPosition;
}

public int outboundPublicationSessionId()
{
return outboundPublicationSessionId;
}

public long outboundPublicationPosition()
{
return outboundPublicationPosition;
}

public String toString()

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
Object.toString
; it is advisable to add an Override annotation.
{
return "EngineStreamInfo{" +
"inboundIndexSubscriptionRegistrationId=" + inboundIndexSubscriptionRegistrationId +
", outboundIndexSubscriptionRegistrationId=" + outboundIndexSubscriptionRegistrationId +
", inboundPublicationSessionId=" + inboundPublicationSessionId +
", inboundPublicationPosition=" + inboundPublicationPosition +
", outboundPublicationSessionId=" + outboundPublicationSessionId +
", outboundPublicationPosition=" + outboundPublicationPosition +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package uk.co.real_logic.artio.engine.framer;

import uk.co.real_logic.artio.Reply;

final class EngineStreamInfoRequestCommand implements AdminCommand, Reply<EngineStreamInfo>
{
private volatile State state = State.EXECUTING;

private EngineStreamInfo engineStreamInfo;

public Throwable error()

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
Reply.error
; it is advisable to add an Override annotation.
{
return null;
}

public EngineStreamInfo resultIfPresent()

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
Reply.resultIfPresent
; it is advisable to add an Override annotation.
{
return engineStreamInfo;
}

public State state()

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
Reply.state
; it is advisable to add an Override annotation.
{
return state;
}

public void execute(final Framer framer)

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
AdminCommand.execute
; it is advisable to add an Override annotation.
{
framer.onEngineStreamInfoRequest(this);
}

public void complete(final EngineStreamInfo engineStreamInfo)
{
this.engineStreamInfo = engineStreamInfo;
state = State.COMPLETED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
private final ReceiverEndPoints receiverEndPoints;
private final FixSenderEndPoints fixSenderEndPoints;
private final CountersReader countersReader;
private final long inboundIndexRegistrationId;
private final long outboundIndexRegistrationId;
private final SenderSequenceNumbers senderSequenceNumbers;
private final ReproductionLogWriter reproductionLogWriter;
Expand Down Expand Up @@ -228,6 +229,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
final RecordingCoordinator recordingCoordinator,
final FixPContexts fixPContexts,
final CountersReader countersReader,
final long inboundIndexRegistrationId,
final long outboundIndexRegistrationId,
final FixCounters fixCounters,
final SenderSequenceNumbers senderSequenceNumbers,
Expand All @@ -254,6 +256,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
this.outboundLibraryCompletionPosition = outboundLibraryCompletionPosition;
this.fixSenderEndPoints = new FixSenderEndPoints(errorHandler);
this.countersReader = countersReader;
this.inboundIndexRegistrationId = inboundIndexRegistrationId;
this.outboundIndexRegistrationId = outboundIndexRegistrationId;
this.senderSequenceNumbers = senderSequenceNumbers;
this.reproductionLogWriter = reproductionLogWriter;
Expand Down Expand Up @@ -3490,6 +3493,17 @@ public void onPositionRequest(final PositionRequestCommand command)
command.position(new UnsafeBufferPosition((UnsafeBuffer)countersReader.valuesBuffer(), counterId));
}

public void onEngineStreamInfoRequest(final EngineStreamInfoRequestCommand command)
{
command.complete(new EngineStreamInfo(
inboundIndexRegistrationId,
outboundIndexRegistrationId,
inboundPublication.sessionId(),
inboundPublication.position(),
outboundPublication.sessionId(),
outboundPublication.position()));
}

public void onWriteMetaDataResponse(final WriteMetaDataResponse response)
{
schedule(() -> inboundPublication.saveWriteMetaDataReply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public FramerContext(
recordingCoordinator,
fixPContexts,
aeron.countersReader(),
engineContext.inboundIndexRegistrationId(),
engineContext.outboundIndexRegistrationId(),
fixCounters,
engineContext.senderSequenceNumbers(),
Expand Down Expand Up @@ -425,4 +426,16 @@ public Reply<?> startReproduction()
}
return command;
}

public Reply<EngineStreamInfo> engineStreamInfo()
{
final EngineStreamInfoRequestCommand command = new EngineStreamInfoRequestCommand();

if (adminCommands.offer(command))
{
return command;
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -535,4 +535,13 @@ protected boolean shouldRethrowExceptionInErrorHandler()
return RETHROW_EXCEPTION.get();
}

/**
* Returns information about library streams. Internal API for testing.
*
* @return information about library streams
*/
LibraryStreamInfo libraryStreamInfo()
{
return poller.libraryStreamInfo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2427,6 +2427,15 @@ public List<FixPConnection> fixPConnections()
{
return (List<FixPConnection>)(List<?>)unmodifiableFixPConnections;
}

LibraryStreamInfo libraryStreamInfo()
{
return new LibraryStreamInfo(
inboundPublication.sessionId(),
inboundPublication.position(),
outboundPublication.sessionId(),
outboundPublication.position());
}
}

class UnmodifiableWrapper<T> extends AbstractList<T>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package uk.co.real_logic.artio.library;

public final class LibraryStreamInfo
{
private final int inboundPublicationSessionId;
private final long inboundPublicationPosition;
private final int outboundPublicationSessionId;
private final long outboundPublicationPosition;

LibraryStreamInfo(
final int inboundPublicationSessionId,
final long inboundPublicationPosition,
final int outboundPublicationSessionId,
final long outboundPublicationPosition)
{
this.inboundPublicationSessionId = inboundPublicationSessionId;
this.inboundPublicationPosition = inboundPublicationPosition;
this.outboundPublicationSessionId = outboundPublicationSessionId;
this.outboundPublicationPosition = outboundPublicationPosition;
}

public int inboundPublicationSessionId()
{
return inboundPublicationSessionId;
}

public long inboundPublicationPosition()
{
return inboundPublicationPosition;
}

public int outboundPublicationSessionId()
{
return outboundPublicationSessionId;
}

public long outboundPublicationPosition()
{
return outboundPublicationPosition;
}

public String toString()

Check notice

Code scanning / CodeQL

Missing Override annotation Note

This method overrides
Object.toString
; it is advisable to add an Override annotation.
{
return "LibraryStreamInfo{" +
"inboundPublicationSessionId=" + inboundPublicationSessionId +
", inboundPublicationPosition=" + inboundPublicationPosition +
", outboundPublicationSessionId=" + outboundPublicationSessionId +
", outboundPublicationPosition=" + outboundPublicationPosition +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public void setUp() throws IOException
mock(RecordingCoordinator.class),
mock(FixPContexts.class),
mock(CountersReader.class),
2,
1,
mock(FixCounters.class),
mock(SenderSequenceNumbers.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package uk.co.real_logic.artio.engine;

import uk.co.real_logic.artio.Reply;
import uk.co.real_logic.artio.engine.framer.EngineStreamInfo;

public final class FixEngineInternals
{
public static Reply<EngineStreamInfo> engineStreamInfo(final FixEngine fixEngine)
{
return fixEngine.engineStreamInfo();
}

private FixEngineInternals()
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package uk.co.real_logic.artio.library;

public final class FixLibraryInternals
{
public static LibraryStreamInfo libraryStreamInfo(final FixLibrary library)
{
return library.libraryStreamInfo();
}

private FixLibraryInternals()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public void shouldQueryMultipleSessions()
messagesCanBeExchanged(otherInitSession);
assertThat(acceptingLibrary.sessions(), hasSize(1));

// admin API queries the index for sequence numbers, so we need to wait
awaitIndexerCaughtUp(
testSystem,
mediaDriver.mediaDriver().aeronDirectoryName(),
acceptingEngine,
acceptingLibrary);

testSystem.awaitLongBlocking(() ->
{
launchArtioAdmin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void testConnectingAfterConnectionTimeouts()
assertEquals(Reply.State.TIMED_OUT, secondConnectReply.state());

// Make connections work again
debugTcpChannelSupplier.unpauseConnects();
debugTcpChannelSupplier.unpauseAndDiscardConnects();

// Now it should connect
connectSessions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ public synchronized void unpauseConnects()
connectsPaused = false;
}

public synchronized void unpauseAndDiscardConnects()
{
connectsPaused = false;
pausedOperations.clear();
}

protected void onFinishConnect(
final InitiatedChannelHandler channelHandler, final SocketChannel channel) throws IOException
{
Expand Down
Loading
Loading