From b40884c9660289b2c91c3a23dc862ed7b219329b Mon Sep 17 00:00:00 2001 From: pcdv Date: Fri, 5 Apr 2024 18:27:24 +0200 Subject: [PATCH] Stabilize test --- .../AbstractGatewayToGatewaySystemTest.java | 2 - .../system_tests/RaceResendResetTest.java | 82 +++++++------------ .../real_logic/artio/util/DebugFIXClient.java | 21 ++++- .../co/real_logic/artio/util/DebugServer.java | 6 +- 4 files changed, 52 insertions(+), 59 deletions(-) diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java index 55e0a81903..545e72f407 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/AbstractGatewayToGatewaySystemTest.java @@ -209,8 +209,6 @@ void awaitMessage(final int sequenceNumber, final Session session) void disconnectSessions() { - logoutAcceptingSession(); - assertSessionsDisconnected(); } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java index 158e0a1aa4..c6faac9826 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java @@ -5,7 +5,6 @@ import org.junit.Ignore; import org.junit.Test; import uk.co.real_logic.artio.decoder.AbstractResendRequestDecoder; -import uk.co.real_logic.artio.dictionary.generation.Exceptions; import uk.co.real_logic.artio.engine.EngineConfiguration; import uk.co.real_logic.artio.engine.FixEngine; import uk.co.real_logic.artio.fields.EpochFractionFormat; @@ -29,10 +28,6 @@ import static org.junit.Assert.assertTrue; import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver; import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; -import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_ID; -import static uk.co.real_logic.artio.system_tests.SystemTestUtil.INITIATOR_ID; -import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingConfig; -import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingConfig; import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingLibraryConfig; @@ -56,13 +51,14 @@ public class RaceResendResetTest extends AbstractGatewayToGatewaySystemTest private long sleepBeforeSendResendRequest; private final ArrayList autoClose = new ArrayList<>(); + private DebugServer initialAcceptor; - private void launch() + private void launch() throws IOException { mediaDriver = launchMediaDriver(); - launchAccepting(); + launchInitialAcceptor(); launchInitiating(); - testSystem = new TestSystem(acceptingLibrary, initiatingLibrary); + testSystem = new TestSystem(initiatingLibrary); } private void launchInitiating() @@ -103,15 +99,15 @@ public void execute() } } - private void launchAccepting() + private void launchInitialAcceptor() throws IOException { - final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock) - .deleteLogFileDirOnStart(true) - .initialAcceptedSessionOwner(SOLE_LIBRARY); - acceptingEngine = FixEngine.launch(acceptingConfig); - - final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock); - acceptingLibrary = connect(acceptingLibraryConfig); + initialAcceptor = new DebugServer(port); + initialAcceptor.setWaitForData(true); + initialAcceptor.addFIXResponse( + "8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|", + "8=FIX.4.4|9=94|35=1|49=acceptor|56=initiator|34=2|52=***|112=hello|98=0|108=10|141=N|10=024|" + ); + initialAcceptor.start(); } /** @@ -154,33 +150,19 @@ class Proxy extends DirectSessionProxy @Override public long onResend( final Session session, final AbstractResendRequestDecoder resendRequest, - final int correctedEndSeqNo, final ResendRequestResponse response, + final int endSeqNo, final ResendRequestResponse response, final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength ) { - onResendRequestReceived(session, resendRequest, correctedEndSeqNo, response, - messageBuffer, messageOffset, messageLength); - return 1; - } - - private void onResendRequestReceived( - final Session session, final AbstractResendRequestDecoder request, final int endSeqNo, - final ResendRequestResponse response, - final AsciiBuffer messageBuffer, final int messageOffset, final int messageLength - ) - { - System.err.println("onResendRequestReceived() called"); - if (!useProxy || sleepBeforeSendResendRequest == 0) - { - response.resend(); - } - else + System.err.println("onResend() called"); + if (useProxy && sleepBeforeSendResendRequest != 0) { response.delay(); final MutableAsciiBuffer buf = new MutableAsciiBuffer(new byte[messageLength]); buf.putBytes(0, messageBuffer, messageOffset, messageLength); - pendingResendRequest = new PendingResendRequest(session, request.beginSeqNo(), endSeqNo, buf); + pendingResendRequest = new PendingResendRequest(session, resendRequest.beginSeqNo(), endSeqNo, buf); } + return 1; } @Override @@ -224,10 +206,6 @@ public long sendResendRequest( { pendingResendRequest.execute(); } - else - { - System.err.println("onResend not called (direct)"); - } } return 1; } @@ -303,14 +281,14 @@ private void reconnectTest() throws Exception launch(); connectAndAcquire(); + final DebugFIXClient acc1 = new DebugFIXClient(initialAcceptor.popClient(5000)); + acc1.start(); - messagesCanBeExchanged(); - - disconnectSessions(); - Exceptions.closeAll(this::closeAcceptingEngine); - - assertEquals(3, acceptingSession.lastReceivedMsgSeqNum()); - assertEquals(3, initiatingSession.lastReceivedMsgSeqNum()); + acc1.popAndAssert("35=A 34=1"); + acc1.popAndAssert("35=0 34=2 112=hello"); + acc1.close(); + initialAcceptor.stop(); + assertEquals(2, initiatingSession.lastReceivedMsgSeqNum()); final DebugServer srv = new DebugServer(port); srv.setWaitForData(true); @@ -323,11 +301,12 @@ private void reconnectTest() throws Exception connectPersistentSessions(4, 4, false); - final DebugFIXClient exchange = new DebugFIXClient(srv.popClient(5000)); - autoClose.add(exchange::close); - exchange.popAndAssert("35=A 34=4"); - exchange.popAndAssert("35=2 34=5 7=4 16=0"); // ResendRequest now always received first - exchange.popAndAssert("35=4 34=4 36=6"); + final DebugFIXClient acc2 = new DebugFIXClient(srv.popClient(5000)); + acc2.start(); + autoClose.add(acc2::close); + acc2.popAndAssert("35=A 34=4"); + acc2.popAndAssert("35=2 34=5 7=4 16=0"); // ResendRequest now always received first + acc2.popAndAssert("35=4 34=4 36=6"); } @Override @@ -349,6 +328,5 @@ public void close() private void connectAndAcquire() { connectSessions(); - acceptingSession = acceptingHandler.lastSession(); } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java index 635a7df540..81bca804bf 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java @@ -18,7 +18,7 @@ public class DebugFIXClient { private final DebugServer.HasIOStream io; - private final Thread thread; + private Thread thread; private final BlockingQueue> messages = new LinkedBlockingQueue<>(); private volatile boolean disposed; @@ -27,6 +27,11 @@ public class DebugFIXClient public DebugFIXClient(final DebugServer.HasIOStream io) { this.io = Objects.requireNonNull(io); + } + + public void start() + { + assert thread == null; thread = new Thread(this::run, "DebugFIXClient"); thread.start(); } @@ -36,6 +41,7 @@ public void close() throws Exception disposed = true; io.in.close(); io.in.close(); + io.socket.close(); thread.interrupt(); thread.join(); } @@ -58,7 +64,7 @@ private void run() { messages.add(msg); msg = new HashMap<>(); - System.out.println(prefix + s); + System.err.println(prefix + s); s.setLength(0); } } @@ -78,6 +84,7 @@ public Map popMessage() throws InterruptedException public void popAndAssert(final String tagValues) throws InterruptedException { final Map map = popMessage(); + System.err.println(map); if (map == null) { throw new AssertionError("No message received"); @@ -87,7 +94,15 @@ public void popAndAssert(final String tagValues) throws InterruptedException { final String tag = rule.substring(0, rule.indexOf('=')); final String value = map.get(tag); - Assert.assertEquals(rule, tag + "=" + value); + try + { + Assert.assertEquals(rule, tag + "=" + value); + } + catch (final Throwable e) + { + e.printStackTrace(); + throw e; + } } } diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java index 76630513f8..78dd8e856f 100644 --- a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java @@ -96,7 +96,7 @@ public void run() in.reset(); } - final HasIOStream client = new HasIOStream(in, out); + final HasIOStream client = new HasIOStream(socket, in, out); sendResponses(client.out); clients.add(client); } @@ -154,11 +154,13 @@ public void setWaitForData(final boolean waitForData) public static class HasIOStream { + public final Socket socket; public final InputStream in; public final OutputStream out; - public HasIOStream(final InputStream in, final OutputStream out) + public HasIOStream(final Socket socket, final InputStream in, final OutputStream out) { + this.socket = socket; this.in = in; this.out = out; }