Skip to content

Commit

Permalink
fix intermittent test
Browse files Browse the repository at this point in the history
  • Loading branch information
wojciech-adaptive committed Sep 14, 2023
1 parent 6821f81 commit f7bcea7
Show file tree
Hide file tree
Showing 16 changed files with 418 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class EngineContext implements AutoCloseable
private ReplayQuery pruneInboundReplayQuery;
private ReplayQuery outboundReplayQuery;
private FramerContext framerContext;
private long inboundIndexRegistrationId;
private long outboundIndexRegistrationId;

EngineContext(
Expand Down Expand Up @@ -281,9 +282,12 @@ private void newIndexers()
}
inboundIndices.add(receivedSequenceNumberIndex);

final Subscription inboundIndexSubscription = inboundLibraryStreams.subscription("inboundIndexer");
inboundIndexRegistrationId = inboundIndexSubscription.registrationId();

inboundIndexer = new Indexer(
inboundIndices,
inboundLibraryStreams.subscription("inboundIndexer"),
inboundIndexSubscription,
configuration.agentNamePrefix(),
inboundCompletionPosition,
configuration.archiveReplayStream());
Expand Down Expand Up @@ -322,6 +326,11 @@ private void newIndexers()
}
}

public long inboundIndexRegistrationId()
{
return inboundIndexRegistrationId;
}

public long outboundIndexRegistrationId()
{
return outboundIndexRegistrationId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.status.ReadablePosition;
import uk.co.real_logic.artio.*;
import uk.co.real_logic.artio.engine.framer.EngineStreamInfo;
import uk.co.real_logic.artio.engine.framer.FramerContext;
import uk.co.real_logic.artio.engine.framer.LibraryInfo;
import uk.co.real_logic.artio.timing.EngineTimers;
Expand Down Expand Up @@ -525,4 +526,14 @@ public Reply<?> startReproduction()

return framerContext.startReproduction();
}

/**
* Returns information about engine streams. Internal API for testing.
*
* @return a reply that should eventually contain information about engine streams or {@code null} if back-pressured
*/
Reply<EngineStreamInfo> engineStreamInfo()
{
return framerContext.engineStreamInfo();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package uk.co.real_logic.artio.engine.framer;

public final class EngineStreamInfo
{
private final long inboundIndexSubscriptionRegistrationId;
private final long outboundIndexSubscriptionRegistrationId;
private final int inboundPublicationSessionId;
private final long inboundPublicationPosition;
private final int outboundPublicationSessionId;
private final long outboundPublicationPosition;

EngineStreamInfo(
final long inboundIndexSubscriptionRegistrationId,
final long outboundIndexSubscriptionRegistrationId,
final int inboundPublicationSessionId,
final long inboundPublicationPosition,
final int outboundPublicationSessionId,
final long outboundPublicationPosition)
{
this.inboundIndexSubscriptionRegistrationId = inboundIndexSubscriptionRegistrationId;
this.outboundIndexSubscriptionRegistrationId = outboundIndexSubscriptionRegistrationId;
this.inboundPublicationSessionId = inboundPublicationSessionId;
this.inboundPublicationPosition = inboundPublicationPosition;
this.outboundPublicationSessionId = outboundPublicationSessionId;
this.outboundPublicationPosition = outboundPublicationPosition;
}

public long inboundIndexSubscriptionRegistrationId()
{
return inboundIndexSubscriptionRegistrationId;
}

public long outboundIndexSubscriptionRegistrationId()
{
return outboundIndexSubscriptionRegistrationId;
}

public int inboundPublicationSessionId()
{
return inboundPublicationSessionId;
}

public long inboundPublicationPosition()
{
return inboundPublicationPosition;
}

public int outboundPublicationSessionId()
{
return outboundPublicationSessionId;
}

public long outboundPublicationPosition()
{
return outboundPublicationPosition;
}

public String toString()
{
return "EngineStreamInfo{" +
"inboundIndexSubscriptionRegistrationId=" + inboundIndexSubscriptionRegistrationId +
", outboundIndexSubscriptionRegistrationId=" + outboundIndexSubscriptionRegistrationId +
", inboundPublicationSessionId=" + inboundPublicationSessionId +
", inboundPublicationPosition=" + inboundPublicationPosition +
", outboundPublicationSessionId=" + outboundPublicationSessionId +
", outboundPublicationPosition=" + outboundPublicationPosition +
'}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package uk.co.real_logic.artio.engine.framer;

import uk.co.real_logic.artio.Reply;

final class EngineStreamInfoRequestCommand implements AdminCommand, Reply<EngineStreamInfo>
{
private volatile State state = State.EXECUTING;

private EngineStreamInfo engineStreamInfo;

public Throwable error()
{
return null;
}

public EngineStreamInfo resultIfPresent()
{
return engineStreamInfo;
}

public State state()
{
return state;
}

public void execute(final Framer framer)
{
framer.onEngineStreamInfoRequest(this);
}

public void complete(final EngineStreamInfo engineStreamInfo)
{
this.engineStreamInfo = engineStreamInfo;
state = State.COMPLETED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
private final ReceiverEndPoints receiverEndPoints;
private final FixSenderEndPoints fixSenderEndPoints;
private final CountersReader countersReader;
private final long inboundIndexRegistrationId;
private final long outboundIndexRegistrationId;
private final SenderSequenceNumbers senderSequenceNumbers;
private final ReproductionLogWriter reproductionLogWriter;
Expand Down Expand Up @@ -228,6 +229,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
final RecordingCoordinator recordingCoordinator,
final FixPContexts fixPContexts,
final CountersReader countersReader,
final long inboundIndexRegistrationId,
final long outboundIndexRegistrationId,
final FixCounters fixCounters,
final SenderSequenceNumbers senderSequenceNumbers,
Expand All @@ -254,6 +256,7 @@ class Framer implements Agent, EngineEndPointHandler, ProtocolHandler
this.outboundLibraryCompletionPosition = outboundLibraryCompletionPosition;
this.fixSenderEndPoints = new FixSenderEndPoints(errorHandler);
this.countersReader = countersReader;
this.inboundIndexRegistrationId = inboundIndexRegistrationId;
this.outboundIndexRegistrationId = outboundIndexRegistrationId;
this.senderSequenceNumbers = senderSequenceNumbers;
this.reproductionLogWriter = reproductionLogWriter;
Expand Down Expand Up @@ -3490,6 +3493,17 @@ public void onPositionRequest(final PositionRequestCommand command)
command.position(new UnsafeBufferPosition((UnsafeBuffer)countersReader.valuesBuffer(), counterId));
}

public void onEngineStreamInfoRequest(final EngineStreamInfoRequestCommand command)
{
command.complete(new EngineStreamInfo(
inboundIndexRegistrationId,
outboundIndexRegistrationId,
inboundPublication.sessionId(),
inboundPublication.position(),
outboundPublication.sessionId(),
outboundPublication.position()));
}

public void onWriteMetaDataResponse(final WriteMetaDataResponse response)
{
schedule(() -> inboundPublication.saveWriteMetaDataReply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public FramerContext(
recordingCoordinator,
fixPContexts,
aeron.countersReader(),
engineContext.inboundIndexRegistrationId(),
engineContext.outboundIndexRegistrationId(),
fixCounters,
engineContext.senderSequenceNumbers(),
Expand Down Expand Up @@ -425,4 +426,16 @@ public Reply<?> startReproduction()
}
return command;
}

public Reply<EngineStreamInfo> engineStreamInfo()
{
final EngineStreamInfoRequestCommand command = new EngineStreamInfoRequestCommand();

if (adminCommands.offer(command))
{
return command;
}

return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -535,4 +535,13 @@ protected boolean shouldRethrowExceptionInErrorHandler()
return RETHROW_EXCEPTION.get();
}

/**
* Returns information about library streams. Internal API for testing.
*
* @return information about library streams
*/
LibraryStreamInfo libraryStreamInfo()
{
return poller.libraryStreamInfo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2427,6 +2427,15 @@ public List<FixPConnection> fixPConnections()
{
return (List<FixPConnection>)(List<?>)unmodifiableFixPConnections;
}

LibraryStreamInfo libraryStreamInfo()
{
return new LibraryStreamInfo(
inboundPublication.sessionId(),
inboundPublication.position(),
outboundPublication.sessionId(),
outboundPublication.position());
}
}

class UnmodifiableWrapper<T> extends AbstractList<T>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package uk.co.real_logic.artio.library;

public final class LibraryStreamInfo
{
private final int inboundPublicationSessionId;
private final long inboundPublicationPosition;
private final int outboundPublicationSessionId;
private final long outboundPublicationPosition;

LibraryStreamInfo(
final int inboundPublicationSessionId,
final long inboundPublicationPosition,
final int outboundPublicationSessionId,
final long outboundPublicationPosition)
{
this.inboundPublicationSessionId = inboundPublicationSessionId;
this.inboundPublicationPosition = inboundPublicationPosition;
this.outboundPublicationSessionId = outboundPublicationSessionId;
this.outboundPublicationPosition = outboundPublicationPosition;
}

public int inboundPublicationSessionId()
{
return inboundPublicationSessionId;
}

public long inboundPublicationPosition()
{
return inboundPublicationPosition;
}

public int outboundPublicationSessionId()
{
return outboundPublicationSessionId;
}

public long outboundPublicationPosition()
{
return outboundPublicationPosition;
}

public String toString()
{
return "LibraryStreamInfo{" +
"inboundPublicationSessionId=" + inboundPublicationSessionId +
", inboundPublicationPosition=" + inboundPublicationPosition +
", outboundPublicationSessionId=" + outboundPublicationSessionId +
", outboundPublicationPosition=" + outboundPublicationPosition +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ public void setUp() throws IOException
mock(RecordingCoordinator.class),
mock(FixPContexts.class),
mock(CountersReader.class),
2,
1,
mock(FixCounters.class),
mock(SenderSequenceNumbers.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package uk.co.real_logic.artio.engine;

import uk.co.real_logic.artio.Reply;
import uk.co.real_logic.artio.engine.framer.EngineStreamInfo;

public final class FixEngineInternals
{
public static Reply<EngineStreamInfo> engineStreamInfo(final FixEngine fixEngine)
{
return fixEngine.engineStreamInfo();
}

private FixEngineInternals()
{
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package uk.co.real_logic.artio.library;

public final class FixLibraryInternals
{
public static LibraryStreamInfo libraryStreamInfo(final FixLibrary library)
{
return library.libraryStreamInfo();
}

private FixLibraryInternals()
{
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public void shouldQueryMultipleSessions()
messagesCanBeExchanged(otherInitSession);
assertThat(acceptingLibrary.sessions(), hasSize(1));

// admin API queries the index for sequence numbers, so we need to wait
awaitIndexerCaughtUp(
testSystem,
mediaDriver.mediaDriver().aeronDirectoryName(),
acceptingEngine,
acceptingLibrary);

testSystem.awaitLongBlocking(() ->
{
launchArtioAdmin();
Expand Down
Loading

0 comments on commit f7bcea7

Please sign in to comment.