From 630dec5156eed9f691dd94d087fd45d8fe3178af Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Sun, 11 Jun 2023 12:13:35 -0400 Subject: [PATCH 1/8] Move some MongoDriverSettings to Experimental --- .../io/vena/bosk/drivers/mongo/MongoDriver.java | 2 +- .../bosk/drivers/mongo/MongoDriverSettings.java | 17 +++++++++++++++-- .../mongo/SingleDocumentMongoDriver.java | 4 ++-- .../mongo/MongoDriverResiliencyTest.java | 9 ++++++--- .../vena/bosk/drivers/mongo/TestParameters.java | 12 ++++++++---- 5 files changed, 32 insertions(+), 12 deletions(-) diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriver.java index 1f696a06..5ca4ce07 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriver.java @@ -51,7 +51,7 @@ static MongoDriverFactory factory( MongoDriverSettings driverSettings, BsonPlugin bsonPlugin ) { - if (driverSettings.implementationKind() == RESILIENT) { + if (driverSettings.experimental().implementationKind() == RESILIENT) { return (b, d) -> new MainDriver<>(b, clientSettings, driverSettings, bsonPlugin, d); } else { return (b, d) -> new SingleDocumentMongoDriver<>(b, clientSettings, driverSettings, bsonPlugin, d); diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java index c3f04288..2ae6207e 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java @@ -12,10 +12,23 @@ public class MongoDriverSettings { String database; @Default long flushTimeoutMS = 30_000; - @Default FlushMode flushMode = FlushMode.ECHO; - @Default ImplementationKind implementationKind = ImplementationKind.STABLE; + + @Default Experimental experimental = Experimental.builder().build(); @Default Testing testing = Testing.builder().build(); + /** + * Settings with no guarantee of long-term support. + */ + @Value + @Builder + public static class Experimental { + @Default ImplementationKind implementationKind = ImplementationKind.STABLE; + @Default FlushMode flushMode = FlushMode.ECHO; + } + + /** + * Settings not meant to be used in production. + */ @Value @Builder public static class Testing { diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/SingleDocumentMongoDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/SingleDocumentMongoDriver.java index 6e0a7e9e..073edf4b 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/SingleDocumentMongoDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/SingleDocumentMongoDriver.java @@ -161,12 +161,12 @@ public void submitDeletion(Reference target) { @Override public void flush() throws IOException, InterruptedException { LOGGER.debug("+ flush"); - switch (settings.flushMode()) { + switch (settings.experimental().flushMode()) { case REVISION_FIELD_ONLY: receiver.awaitLatestRevision(); break; default: - LOGGER.warn("Unrecognized flush mode {}; defaulting to ECHO", settings.flushMode()); + LOGGER.warn("Unrecognized flush mode {}; defaulting to ECHO", settings.experimental().flushMode()); // fall through case ECHO: performEcho(); diff --git a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java index 28b14665..fc08099d 100644 --- a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java +++ b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java @@ -39,19 +39,22 @@ public MongoDriverResiliencyTest(MongoDriverSettings.MongoDriverSettingsBuilder @SuppressWarnings("unused") static Stream driverSettings() { + MongoDriverSettings.Experimental resilient = MongoDriverSettings.Experimental.builder() + .implementationKind(RESILIENT) + .build(); return Stream.of( MongoDriverSettings.builder() .database("boskResiliencyTestDB_" + dbCounter.incrementAndGet()) - .implementationKind(RESILIENT), + .experimental(resilient), MongoDriverSettings.builder() .database("boskResiliencyTestDB_" + dbCounter.incrementAndGet() + "_late") - .implementationKind(RESILIENT) + .experimental(resilient) .testing(MongoDriverSettings.Testing.builder() .eventDelayMS(200) .build()), MongoDriverSettings.builder() .database("boskResiliencyTestDB_" + dbCounter.incrementAndGet() + "_early") - .implementationKind(RESILIENT) + .experimental(resilient) .testing(MongoDriverSettings.Testing.builder() .eventDelayMS(-200) .build()) diff --git a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/TestParameters.java b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/TestParameters.java index 8ed17916..84fc72d8 100644 --- a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/TestParameters.java +++ b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/TestParameters.java @@ -1,5 +1,6 @@ package io.vena.bosk.drivers.mongo; +import io.vena.bosk.drivers.mongo.MongoDriverSettings.Experimental; import io.vena.bosk.drivers.mongo.MongoDriverSettings.MongoDriverSettingsBuilder; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -13,22 +14,25 @@ public interface TestParameters { @SuppressWarnings("unused") static Stream driverSettings() { String prefix = "boskTestDB_" + dbCounter.incrementAndGet(); + Experimental resilient = Experimental.builder() + .implementationKind(RESILIENT) + .build(); return Stream.of( MongoDriverSettings.builder() .database(prefix + "_stable") - .implementationKind(STABLE), + .experimental(Experimental.builder().implementationKind(STABLE).build()), MongoDriverSettings.builder() .database(prefix + "_resilient") - .implementationKind(RESILIENT) + .experimental(resilient) // MongoDriverSettings.builder() // .database(prefix + "_slow") -// .implementationKind(RESILIENT) +// .experimental(resilient) // .testing(MongoDriverSettings.Testing.builder() // .eventDelayMS(200) // .build()), // MongoDriverSettings.builder() // .database(prefix + "_fast") -// .implementationKind(RESILIENT) +// .experimental(resilient) // .testing(MongoDriverSettings.Testing.builder() // .eventDelayMS(-200) // .build()) From 7fa6fcce9ebafd468ca0ce17349094002fc52a42 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Sun, 11 Jun 2023 12:48:17 -0400 Subject: [PATCH 2/8] MongoDriverSpecialTest: drop unrelated database --- .../io/vena/bosk/drivers/mongo/MongoDriverSpecialTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverSpecialTest.java b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverSpecialTest.java index 9e578346..f826b368 100644 --- a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverSpecialTest.java +++ b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverSpecialTest.java @@ -49,7 +49,6 @@ * Tests for MongoDB-specific functionality */ class MongoDriverSpecialTest extends AbstractMongoDriverTest implements TestParameters { - @ParametersByName public MongoDriverSpecialTest(MongoDriverSettingsBuilder driverSettings) { super(driverSettings); @@ -321,7 +320,7 @@ void deleteNonexistentField_ignored() throws InvalidTypeException, IOException, @ParametersByName @UsesMongoService void unrelatedDatabase_ignored() throws InvalidTypeException, IOException, InterruptedException { - tearDownActions.add(mongoService.client().getDatabase("unrelated")::drop); + tearDownActions.addFirst(mongoService.client().getDatabase("unrelated")::drop); doUnrelatedChangeTest("unrelated", COLLECTION_NAME, "boskDocument"); } From d1bf29542782e6a2b3a0f95b4e1f6c2a20563772 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Sun, 11 Jun 2023 08:50:13 -0400 Subject: [PATCH 3/8] Logging tweaks --- .../drivers/mongo/v2/ChangeEventReceiver.java | 72 +++--- .../bosk/drivers/mongo/v2/MainDriver.java | 206 ++++++++++-------- .../mongo/v2/SingleDocFormatDriver.java | 7 +- .../mongo/AbstractMongoDriverTest.java | 36 ++- .../mongo/MongoDriverResiliencyTest.java | 17 +- .../drivers/mongo/MongoDriverSpecialTest.java | 26 +-- 6 files changed, 218 insertions(+), 146 deletions(-) diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java index d563d3df..7105a58c 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java @@ -7,8 +7,10 @@ import com.mongodb.client.MongoCollection; import com.mongodb.client.model.changestream.ChangeStreamDocument; import io.vena.bosk.drivers.mongo.MongoDriverSettings; +import io.vena.bosk.drivers.mongo.v2.MainDriver.MDCScope; import io.vena.bosk.exceptions.NotYetImplementedException; import java.io.Closeable; +import java.util.NoSuchElementException; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -25,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static io.vena.bosk.drivers.mongo.v2.MainDriver.setupMDC; import static java.lang.Thread.currentThread; import static java.lang.Thread.sleep; import static java.util.Objects.requireNonNull; @@ -179,7 +182,7 @@ private boolean setupNewSession(ChangeEventListener newListener) { try { sleep(-settings.testing().eventDelayMS()); } catch (InterruptedException e) { - LOGGER.debug("Sleep aborted; continuing", e); + LOGGER.debug("Sleep interrupted; continuing", e); Thread.interrupted(); } } @@ -224,39 +227,46 @@ private boolean setupNewSession(ChangeEventListener newListener) { private void eventProcessingLoop(Session session) { String oldThreadName = currentThread().getName(); currentThread().setName(getClass().getSimpleName() + " [" + boskName + "]"); - try { - if (session.initialEvent != null) { - processEvent(session, session.initialEvent); - session.initialEvent = null; // Allow GC - } - while (true) { - if (settings.testing().eventDelayMS() > 0) { - LOGGER.debug("- Sleeping"); - Thread.sleep(settings.testing().eventDelayMS()); + try (MDCScope __ = setupMDC(boskName)) { + try { + if (session.initialEvent != null) { + LOGGER.debug("Processing initial event"); + processEvent(session, session.initialEvent); + session.initialEvent = null; // Allow GC } - processEvent(session, session.cursor.next()); - } - } catch (UnprocessableEventException e) { - LOGGER.warn("Unprocessable event; discarding resume token", e); - lastProcessedResumeToken = null; - session.listener.onException(e); - } catch (InterruptedException | MongoInterruptedException e) { - // This can happen if stop() cancels the task with an interrupt; it's part of normal operation - LOGGER.info("Event loop interrupted", e); - Thread.interrupted(); - } catch (MongoException e) { - if (session.isClosed) { - // This happens when stop() cancels the task; this is part of normal operation - LOGGER.info("Session is closed; exiting event loop", e); - } else { - LOGGER.warn("Unexpected MongoException while processing events; event loop aborted", e); + LOGGER.debug("Starting event loop"); + while (true) { + if (settings.testing().eventDelayMS() > 0) { + LOGGER.debug("- Sleeping"); + Thread.sleep(settings.testing().eventDelayMS()); + } + processEvent(session, session.cursor.next()); + } + } catch (UnprocessableEventException e) { + LOGGER.warn("Unprocessable event; discarding resume token", e); + lastProcessedResumeToken = null; + session.listener.onException(e); + } catch (InterruptedException | MongoInterruptedException e) { + // This can happen if stop() cancels the task with an interrupt; it's part of normal operation + LOGGER.info("Event loop interrupted", e); + Thread.interrupted(); + } catch (NoSuchElementException e) { + LOGGER.warn("Change stream has ended; event loop terminated", e); session.listener.onException(e); + } catch (MongoException e) { + if (session.isClosed) { + // This happens when stop() cancels the task; this is part of normal operation + LOGGER.info("Session is closed; exiting event loop", e); + } else { + LOGGER.warn("Unexpected MongoException while processing events; event loop terminated", e); + session.listener.onException(e); + } + } catch (RuntimeException e) { + LOGGER.warn("Unexpected exception while processing events; event loop terminated", e); + session.listener.onException(e); + } finally { + currentThread().setName(oldThreadName); } - } catch (RuntimeException e) { - LOGGER.warn("Unexpected exception while processing events; event loop aborted", e); - session.listener.onException(e); - } finally { - currentThread().setName(oldThreadName); } } diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java index 1e4e8e13..f3fff1aa 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java @@ -34,6 +34,7 @@ import org.bson.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.MDC; import static io.vena.bosk.drivers.mongo.v2.Formatter.REVISION_ONE; @@ -89,133 +90,121 @@ private void validateMongoClientSettings(MongoClientSettings clientSettings) { @Override public R initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException { - beginDriverOperation("initialRoot"); - R result; - try { - result = initializeReplication(); - } catch (UninitializedCollectionException e) { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("[{}] Creating collection", bosk.name(), e); + try (MDCScope __ = beginDriverOperation("initialRoot")) { + R result; + try { + result = initializeReplication(); + } catch (UninitializedCollectionException e) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Creating collection", e); + } else { + LOGGER.info("Creating collection"); + } + FormatDriver newDriver = newSingleDocFormatDriver(REVISION_ONE.longValue()); // TODO: Pick based on config? + result = downstream.initialRoot(rootType); + newDriver.initializeCollection(new StateAndMetadata<>(result, REVISION_ONE)); + formatDriver = newDriver; + } catch (IOException | ReceiverInitializationException e) { + LOGGER.debug("Unable to initialize replication", e); + result = null; + } + if (receiver.isReady()) { + receiver.start(); } else { - LOGGER.info("[{}] Creating collection", bosk.name()); + LOGGER.debug("Receiver not started"); + } + if (result == null) { + return downstream.initialRoot(rootType); + } else { + return result; } - FormatDriver newDriver = newSingleDocFormatDriver(REVISION_ONE.longValue()); // TODO: Pick based on config? - result = downstream.initialRoot(rootType); - newDriver.initializeCollection(new StateAndMetadata<>(result, REVISION_ONE)); - formatDriver = newDriver; - } catch (IOException | ReceiverInitializationException e) { - LOGGER.debug("Unable to initialize replication", e); - result = null; - } - if (receiver.isReady()) { - receiver.start(); - } else { - LOGGER.debug("Receiver not started"); - } - if (result == null) { - return downstream.initialRoot(rootType); - } else { - return result; } } private void recoverFrom(Exception exception) { - if (isClosed) { - LOGGER.debug("Closed driver ignoring exception", exception); - return; - } - if (exception instanceof DisconnectedException) { - LOGGER.debug("Recovering from exception; reinitializing", exception); - } else { - LOGGER.error("Recovering from unexpected exception; reinitializing", exception); - } - R result; - try { - result = initializeReplication(); - } catch (UninitializedCollectionException e) { - LOGGER.warn("Collection is uninitialized; driver is disconnected", e); - return; - } catch (IOException | ReceiverInitializationException e) { - LOGGER.warn("Unable to initialize receiver", e); - return; - } - if (result != null) { - // Because we haven't called receiver.start() yet, this won't race with other events - downstream.submitReplacement(rootRef, result); - } - if (!isClosed) { - receiver.start(); + try (MDCScope __ = beginDriverOperation("recoverFrom({})", exception.getClass().getSimpleName())) { + if (isClosed) { + LOGGER.debug("Closed driver ignoring exception", exception); + return; + } + if (exception instanceof DisconnectedException) { + LOGGER.debug("Recovering from {}; reinitializing", exception.getClass().getSimpleName(), exception); + } else { + LOGGER.error("Recovering from unexpected {}; reinitializing", exception.getClass().getSimpleName(), exception); + } + R result; + try { + result = initializeReplication(); + } catch (UninitializedCollectionException e) { + LOGGER.warn("Collection is uninitialized; driver is disconnected", e); + return; + } catch (IOException | ReceiverInitializationException e) { + LOGGER.warn("Unable to initialize receiver", e); + return; + } + if (result != null) { + // Because we haven't called receiver.start() yet, this won't race with other events + downstream.submitReplacement(rootRef, result); + } + if (!isClosed) { + receiver.start(); + } } } @Override public void submitReplacement(Reference target, T newValue) { - beginDriverOperation("submitReplacement({})", target); runWithRetry(() -> - formatDriver.submitReplacement(target, newValue)); + formatDriver.submitReplacement(target, newValue), "submitReplacement({})", target); } @Override public void submitConditionalReplacement(Reference target, T newValue, Reference precondition, Identifier requiredValue) { - beginDriverOperation("submitConditionalReplacement({}, {} = {})", target, precondition, requiredValue); runWithRetry(() -> - formatDriver.submitConditionalReplacement(target, newValue, precondition, requiredValue)); + formatDriver.submitConditionalReplacement(target, newValue, precondition, requiredValue), "submitConditionalReplacement({}, {} = {})", target, precondition, requiredValue); } @Override public void submitInitialization(Reference target, T newValue) { - beginDriverOperation("submitInitialization({})", target); runWithRetry(() -> - formatDriver.submitInitialization(target, newValue)); + formatDriver.submitInitialization(target, newValue), "submitInitialization({})", target); } @Override public void submitDeletion(Reference target) { - beginDriverOperation("submitDeletion({}, {})", target); runWithRetry(() -> - formatDriver.submitDeletion(target)); + formatDriver.submitDeletion(target), "submitDeletion({}, {})", target); } @Override public void submitConditionalDeletion(Reference target, Reference precondition, Identifier requiredValue) { - beginDriverOperation("submitConditionalDeletion({}, {} = {})", target, precondition, requiredValue); runWithRetry(() -> - formatDriver.submitConditionalDeletion(target, precondition, requiredValue)); + formatDriver.submitConditionalDeletion(target, precondition, requiredValue), "submitConditionalDeletion({}, {} = {})", target, precondition, requiredValue); } @Override public void refurbish() throws IOException { - beginDriverOperation("refurbish"); - runWithRetry(this::doRefurbish); + runWithRetry(this::doRefurbish, "refurbish"); } @Override public void flush() throws IOException, InterruptedException { - beginDriverOperation("flush"); - try { - formatDriver.flush(); - } catch (FlushFailureException | RuntimeException e1) { - recoverFrom(e1); - LOGGER.debug("Retrying flush"); + try (MDCScope __ = beginDriverOperation("flush")) { try { formatDriver.flush(); - } catch (DisconnectedException e2) { // Other RuntimeExceptions are unexpected - // The message from DisconnectionException is suitable as-is - throw new FlushFailureException(e2.getMessage(), e2); + } catch (FlushFailureException | RuntimeException e1) { + recoverFrom(e1); + LOGGER.debug("Retrying flush"); + try { + formatDriver.flush(); + } catch (DisconnectedException e2) { // Other RuntimeExceptions are unexpected + // The message from DisconnectionException is suitable as-is + throw new FlushFailureException(e2.getMessage(), e2); + } } } } - private void runWithRetry(Action action) throws X, Y { - try { - action.run(); - } catch (RuntimeException e) { - recoverFrom(e); - LOGGER.debug("Retrying"); - action.run(); - } - } - private void doRefurbish() throws IOException { ClientSessionOptions sessionOptions = ClientSessionOptions.builder() .causallyConsistent(true) @@ -246,12 +235,26 @@ private void doRefurbish() throws IOException { } } + private void runWithRetry(Action action, String description, Object... args) throws X, Y { + try (MDCScope __ = beginDriverOperation(description, args)) { + try { + action.run(); + } catch (RuntimeException e) { + recoverFrom(e); + LOGGER.debug("Retrying"); + action.run(); + } + } + } + @Override public void close() { - logDriverOperation("close"); - isClosed = true; - receiver.close(); - formatDriver.close(); + try (MDCScope __ = setupMDC(bosk.name())) { + LOGGER.debug("close"); + isClosed = true; + receiver.close(); + formatDriver.close(); + } } private interface Action { @@ -330,7 +333,7 @@ private R initializeReplication() throws UninitializedCollectionException, Recei LOGGER.debug("Initialization interrupted", e); throw new NotYetImplementedException(e); } catch (ExecutionException e) { - LOGGER.debug("Initialization threw", e.getCause()); + LOGGER.debug("Initialization threw {}", e.getCause().getClass().getSimpleName(), e.getCause()); // Unpacking the exception is super annoying if (e.getCause() instanceof UninitializedCollectionException) { throw (UninitializedCollectionException) e.getCause(); @@ -402,20 +405,37 @@ private SingleDocFormatDriver newSingleDocFormatDriver(long revisionAlreadySe downstream); } - private void beginDriverOperation(String description, Object... args) { + private MDCScope beginDriverOperation(String description, Object... args) { if (isClosed) { throw new IllegalStateException("Driver is closed"); } - logDriverOperation(description, args); + MDCScope ex = setupMDC(bosk.name()); + LOGGER.debug(description, args); + return ex; } - private void logDriverOperation(String description, Object... args) { - if (LOGGER.isDebugEnabled()) { - String formatString = "+[" + bosk.name() + "] " + description; - LOGGER.debug(formatString, args); - } + static MDCScope setupMDC(String boskName) { + MDCScope result = new MDCScope(); + MDC.put(MDC_KEY, " [" + boskName + "]"); + return result; + } + + /** + * This is like {@link org.slf4j.MDC.MDCCloseable} except instead of + * deleting the MDC entry at the end, it restores it to its prior value, + * which allows us to nest these. + * + *

+ * Note that for a try block using one of these, the catch and finally + * blocks will run after {@link #close()} and won't have the context. + * You probably want to use this in a try block with no catch or finally clause. + */ + static final class MDCScope implements AutoCloseable { + final String oldValue = MDC.get(MDC_KEY); + @Override public void close() { MDC.put(MDC_KEY, oldValue); } } public static final String COLLECTION_NAME = "boskCollection"; private static final Logger LOGGER = LoggerFactory.getLogger(MainDriver.class); + private static final String MDC_KEY = "MongoDriver"; } diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java index a0c4a63d..26ffabad 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java @@ -163,7 +163,11 @@ public void initializeCollection(StateAndMetadata contents) { @Override public void onEvent(ChangeStreamDocument event) throws UnprocessableEventException { - LOGGER.debug("# EVENT: {}", event); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("# EVENT: {} {}", event.getOperationType().getValue(), event); + } else { + LOGGER.debug("# EVENT: {}", event.getOperationType().getValue()); + } if (!DOCUMENT_FILTER.equals(event.getDocumentKey())) { LOGGER.debug("Ignoring event for unrecognized document key: {}", event.getDocumentKey()); return; @@ -180,6 +184,7 @@ public void onEvent(ChangeStreamDocument event) throws UnprocessableEv throw new NotYetImplementedException("No state??"); } R newRoot = formatter.document2object(state, rootRef); + LOGGER.debug("| Replace {}", rootRef); downstream.submitReplacement(rootRef, newRoot); flushLock.finishedRevision(revision); } break; diff --git a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/AbstractMongoDriverTest.java b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/AbstractMongoDriverTest.java index 6429db60..cd232228 100644 --- a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/AbstractMongoDriverTest.java +++ b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/AbstractMongoDriverTest.java @@ -15,12 +15,16 @@ import io.vena.bosk.drivers.mongo.MongoDriverSettings.MongoDriverSettingsBuilder; import io.vena.bosk.drivers.state.TestEntity; import io.vena.bosk.exceptions.InvalidTypeException; +import java.lang.reflect.Method; import java.util.ArrayDeque; import java.util.Deque; import java.util.Optional; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static io.vena.bosk.drivers.mongo.SingleDocumentMongoDriver.COLLECTION_NAME; import static java.util.concurrent.TimeUnit.SECONDS; @@ -32,7 +36,7 @@ abstract class AbstractMongoDriverTest { protected static MongoService mongoService; protected DriverFactory driverFactory; - protected final Deque tearDownActions = new ArrayDeque<>(); + protected Deque tearDownActions; protected final MongoDriverSettings driverSettings; public AbstractMongoDriverTest(MongoDriverSettingsBuilder driverSettings) { @@ -56,9 +60,35 @@ void setupDriverFactory() { .drop(); } + @BeforeEach + void clearTearDown(TestInfo testInfo) { + logTest("/=== Start", testInfo); + tearDownActions = new ArrayDeque<>(); +// tearDownActions.addLast(() -> { +// try { +// LOGGER.debug("Sleeping after teardown"); +// Thread.sleep(10_000); +// } catch (InterruptedException e) { +// LOGGER.debug("Interrupted", e); +// Thread.interrupted(); +// } finally { +// LOGGER.debug("Done sleeping"); +// } +// }); + } + @AfterEach - void runTearDown() { + void runTearDown(TestInfo testInfo) { tearDownActions.forEach(Runnable::run); + logTest("\\=== Done", testInfo); + } + + private static void logTest(String verb, TestInfo testInfo) { + String method = + testInfo.getTestClass().map(Class::getSimpleName).orElse(null) + + "." + + testInfo.getTestMethod().map(Method::getName).orElse(null); + LOGGER.info("{} {} {}", verb, method, testInfo.getDisplayName()); } @@ -106,4 +136,6 @@ public interface Refs { @ReferencePath("/catalog/-child-/catalog") CatalogReference childCatalog(Identifier child); } + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMongoDriverTest.class); } diff --git a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java index fc08099d..84b31a40 100644 --- a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java +++ b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java @@ -64,12 +64,14 @@ static Stream driverSettings() { @ParametersByName @DisruptsMongoService void initialOutage_recovers() throws InvalidTypeException, InterruptedException, IOException { - // Set up the database contents to be different from initialRoot + LOGGER.debug("Set up the database contents to be different from initialRoot"); TestEntity initialState = initializeDatabase("distinctive string"); + LOGGER.debug("Cut mongo connection"); mongoService.proxy().setConnectionCut(true); - Bosk bosk = new Bosk("Test bosk", TestEntity.class, this::initialRoot, driverFactory); + LOGGER.debug("Create a new bosk that can't connect"); + Bosk bosk = new Bosk("Test " + boskCounter.incrementAndGet(), TestEntity.class, this::initialRoot, driverFactory); MongoDriverSpecialTest.Refs refs = bosk.buildReferences(MongoDriverSpecialTest.Refs.class); BoskDriver driver = bosk.driver(); TestEntity defaultState = initialRoot(bosk); @@ -79,20 +81,23 @@ void initialOutage_recovers() throws InvalidTypeException, InterruptedException, "Uses default state if database is unavailable"); } + LOGGER.debug("Verify that driver operations throw"); assertThrows(FlushFailureException.class, driver::flush, "Flush disallowed during outage"); assertThrows(Exception.class, () -> driver.submitReplacement(bosk.rootReference(), initialRoot(bosk)), "Updates disallowed during outage"); + LOGGER.debug("Restore mongo connection"); mongoService.proxy().setConnectionCut(false); + LOGGER.debug("Flush and check that the state updates"); driver.flush(); try (var __ = bosk.readContext()) { assertEquals(initialState, bosk.rootReference().value(), "Updates to database state once it reconnects"); } - // Make a change to the bosk and verify that it gets through + LOGGER.debug("Make a change to the bosk and verify that it gets through"); driver.submitReplacement(refs.listingEntry(entity123), LISTING_ENTRY); TestEntity expected = initialRoot(bosk) .withString("distinctive string") @@ -169,7 +174,7 @@ void revisionDeleted_recovers() throws InvalidTypeException, InterruptedExceptio LOGGER.debug("Setup database to beforeState"); TestEntity beforeState = initializeDatabase("before deletion"); - Bosk bosk = new Bosk("Test bosk " + boskCounter.incrementAndGet(), TestEntity.class, this::initialRoot, driverFactory); + Bosk bosk = new Bosk("Test " + boskCounter.incrementAndGet(), TestEntity.class, this::initialRoot, driverFactory); try (var __ = bosk.readContext()) { assertEquals(beforeState, bosk.rootReference().value()); } @@ -212,7 +217,7 @@ private void setRevision(long revisionNumber) { private TestEntity initializeDatabase(String distinctiveString) { try { Bosk prepBosk = new Bosk( - "Prep bosk " + boskCounter.incrementAndGet(), + "Prep " + boskCounter.incrementAndGet(), TestEntity.class, bosk -> initialRoot(bosk).withString(distinctiveString), driverFactory); @@ -230,7 +235,7 @@ private void testRecovery(Runnable disruptiveAction, Function bosk = new Bosk("Test bosk " + boskCounter.incrementAndGet(), TestEntity.class, this::initialRoot, driverFactory); + Bosk bosk = new Bosk("Test " + boskCounter.incrementAndGet(), TestEntity.class, this::initialRoot, driverFactory); try (var __ = bosk.readContext()) { assertEquals(beforeState, bosk.rootReference().value()); } diff --git a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverSpecialTest.java b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverSpecialTest.java index f826b368..920843f8 100644 --- a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverSpecialTest.java +++ b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverSpecialTest.java @@ -57,7 +57,7 @@ public MongoDriverSpecialTest(MongoDriverSettingsBuilder driverSettings) { @ParametersByName @UsesMongoService void warmStart_stateMatches() throws InvalidTypeException, InterruptedException, IOException { - Bosk setupBosk = new Bosk("Test bosk", TestEntity.class, this::initialRoot, driverFactory); + Bosk setupBosk = new Bosk("Setup", TestEntity.class, this::initialRoot, driverFactory); Refs refs = setupBosk.buildReferences(Refs.class); // Make a change to the bosk so it's not just the initial root @@ -66,7 +66,7 @@ void warmStart_stateMatches() throws InvalidTypeException, InterruptedException, TestEntity expected = initialRoot(setupBosk) .withListing(Listing.of(refs.catalog(), entity123)); - Bosk latecomerBosk = new Bosk("Latecomer bosk", TestEntity.class, b->{ + Bosk latecomerBosk = new Bosk("Latecomer", TestEntity.class, b->{ throw new AssertionError("Default root function should not be called"); }, driverFactory); @@ -83,7 +83,7 @@ void flush_localStateUpdated() throws InvalidTypeException, InterruptedException // Set up MongoDriver writing to a modified BufferingDriver that lets us // have tight control over all the comings and goings from MongoDriver. BlockingQueue> replacementsSeen = new LinkedBlockingDeque<>(); - Bosk bosk = new Bosk("Test bosk", TestEntity.class, this::initialRoot, + Bosk bosk = new Bosk("Test", TestEntity.class, this::initialRoot, (b,d) -> driverFactory.build(b, new BufferingDriver(d){ @Override public void submitReplacement(Reference target, T newValue) { @@ -135,7 +135,7 @@ public void submitReplacement(Reference target, T newValue) { @ParametersByName @UsesMongoService void listing_stateMatches() throws InvalidTypeException, InterruptedException, IOException { - Bosk bosk = new Bosk("Test bosk", TestEntity.class, this::initialRoot, driverFactory); + Bosk bosk = new Bosk("Test", TestEntity.class, this::initialRoot, driverFactory); BoskDriver driver = bosk.driver(); CatalogReference catalogRef = bosk.rootReference().thenCatalog(TestEntity.class, TestEntity.Fields.catalog); @@ -217,11 +217,11 @@ void networkOutage_boskRecovers() throws InvalidTypeException, InterruptedExcept @UsesMongoService void initialStateHasNonexistentFields_ignored() { // Upon creating bosk, the initial value will be saved to MongoDB - Bosk bosk = new Bosk("Newer bosk", TestEntity.class, this::initialRoot, driverFactory); + Bosk bosk = new Bosk("Newer", TestEntity.class, this::initialRoot, driverFactory); // Upon creating prevBosk, the state in the database will be loaded into the local. Bosk prevBosk = new Bosk( - "Older bosk", + "Prev", OldEntity.class, (b) -> { throw new AssertionError("prevBosk should use the state from MongoDB"); }, createDriverFactory()); @@ -238,9 +238,9 @@ void initialStateHasNonexistentFields_ignored() { @ParametersByName @UsesMongoService void updateHasNonexistentFields_ignored() throws InvalidTypeException, IOException, InterruptedException { - Bosk bosk = new Bosk("Newer bosk", TestEntity.class, this::initialRoot, driverFactory); + Bosk bosk = new Bosk("Newer", TestEntity.class, this::initialRoot, driverFactory); Bosk prevBosk = new Bosk( - "Older bosk", + "Prev", OldEntity.class, (b) -> { throw new AssertionError("prevBosk should use the state from MongoDB"); }, createDriverFactory()); @@ -266,9 +266,9 @@ void updateHasNonexistentFields_ignored() throws InvalidTypeException, IOExcepti @ParametersByName @UsesMongoService void updateNonexistentField_ignored() throws InvalidTypeException, IOException, InterruptedException { - Bosk bosk = new Bosk("Newer bosk", TestEntity.class, this::initialRoot, driverFactory); + Bosk bosk = new Bosk("Newer", TestEntity.class, this::initialRoot, driverFactory); Bosk prevBosk = new Bosk( - "Older bosk", + "Prev", OldEntity.class, (b) -> { throw new AssertionError("prevBosk should use the state from MongoDB"); }, createDriverFactory()); @@ -294,9 +294,9 @@ void updateNonexistentField_ignored() throws InvalidTypeException, IOException, @ParametersByName @UsesMongoService void deleteNonexistentField_ignored() throws InvalidTypeException, IOException, InterruptedException { - Bosk bosk = new Bosk("Newer bosk", TestEntity.class, this::initialRoot, driverFactory); + Bosk bosk = new Bosk("Newer", TestEntity.class, this::initialRoot, driverFactory); Bosk prevBosk = new Bosk( - "Older bosk", + "Prev", OldEntity.class, (b) -> { throw new AssertionError("prevBosk should use the state from MongoDB"); }, createDriverFactory()); @@ -337,7 +337,7 @@ void unrelatedDoc_ignored() throws InvalidTypeException, IOException, Interrupte } private void doUnrelatedChangeTest(String databaseName, String collectionName, String docID) throws IOException, InterruptedException, InvalidTypeException { - Bosk bosk = new Bosk("Test bosk", TestEntity.class, this::initialRoot, driverFactory); + Bosk bosk = new Bosk("Test", TestEntity.class, this::initialRoot, driverFactory); MongoCollection counterfeitCollection = mongoService.client() .getDatabase(databaseName) From 0181c36e949996f91829da4a31fec400af0e308e Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Sun, 11 Jun 2023 11:37:08 -0400 Subject: [PATCH 4/8] Javadocs --- .../main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java | 6 ++++++ .../vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java | 3 +++ 2 files changed, 9 insertions(+) diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java index f3fff1aa..5b940058 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java @@ -38,6 +38,12 @@ import static io.vena.bosk.drivers.mongo.v2.Formatter.REVISION_ONE; +/** + * Serves as a harness for driver implementations. + * Handles resiliency concerns like exception handling, disconnecting, and reinitializing. + * Handles {@link MDC} and some basic logging. + * Implements format detection and refurbish operations. + */ public class MainDriver implements MongoDriver { private final Bosk bosk; private final MongoDriverSettings driverSettings; diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java index 26ffabad..12d87a46 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java @@ -43,6 +43,9 @@ import static java.util.Collections.newSetFromMap; import static org.bson.BsonBoolean.FALSE; +/** + * A {@link FormatDriver} that stores the entire bosk state in a single document. + */ final class SingleDocFormatDriver implements FormatDriver { private final String description; private final MongoDriverSettings settings; From 5e0d3259fc26e155ae26250755cf42b4e5f895d9 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Sun, 11 Jun 2023 11:38:46 -0400 Subject: [PATCH 5/8] Move precodition checks and logging to MainDriver --- .../io/vena/bosk/drivers/mongo/v2/MainDriver.java | 14 ++++++++++++-- .../drivers/mongo/v2/SingleDocFormatDriver.java | 11 +---------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java index 5b940058..96756ce4 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java @@ -178,8 +178,13 @@ public void submitInitialization(Reference target, T newValue) { @Override public void submitDeletion(Reference target) { - runWithRetry(() -> - formatDriver.submitDeletion(target), "submitDeletion({}, {})", target); + if (target.path().isEmpty()) { + // TODO: Refactor this kind of error checking out of LocalDriver and make it available + throw new IllegalArgumentException("Can't delete the root of the bosk"); + } else { + runWithRetry(() -> + formatDriver.submitDeletion(target), "submitDeletion({}, {})", target); + } } @Override @@ -372,6 +377,11 @@ private final class Listener implements ChangeEventListener { @Override public void onEvent(ChangeStreamDocument event) throws UnprocessableEventException { if (isListening) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("# EVENT: {} {}", event.getOperationType().getValue(), event); + } else { + LOGGER.debug("# EVENT: {}", event.getOperationType().getValue()); + } try { formatDriver.onEvent(event); } catch (RuntimeException e) { diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java index 12d87a46..0653a2c3 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/SingleDocFormatDriver.java @@ -96,11 +96,7 @@ public void submitInitialization(Reference target, T newValue) { @Override public void submitDeletion(Reference target) { - if (target.path().isEmpty()) { // TODO: this seems out of place. MainDriver ought to do error checking like this - throw new IllegalArgumentException("Can't delete the root of the bosk"); - } else { - doUpdate(deletionDoc(target), standardPreconditions(target)); - } + doUpdate(deletionDoc(target), standardPreconditions(target)); } @Override @@ -166,11 +162,6 @@ public void initializeCollection(StateAndMetadata contents) { @Override public void onEvent(ChangeStreamDocument event) throws UnprocessableEventException { - if (LOGGER.isTraceEnabled()) { - LOGGER.trace("# EVENT: {} {}", event.getOperationType().getValue(), event); - } else { - LOGGER.debug("# EVENT: {}", event.getOperationType().getValue()); - } if (!DOCUMENT_FILTER.equals(event.getDocumentKey())) { LOGGER.debug("Ignoring event for unrecognized document key: {}", event.getDocumentKey()); return; From a0af3d13e5aaf77c40c200a4719517394affacf0 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Sun, 11 Jun 2023 12:14:39 -0400 Subject: [PATCH 6/8] Add preferredDatabaseFormat setting --- .../bosk/drivers/mongo/MongoDriverSettings.java | 5 +++++ .../io/vena/bosk/drivers/mongo/v2/MainDriver.java | 13 +++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java index 2ae6207e..a1a5e526 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java @@ -12,6 +12,7 @@ public class MongoDriverSettings { String database; @Default long flushTimeoutMS = 30_000; + @Default DatabaseFormat preferredDatabaseFormat = DatabaseFormat.SINGLE_DOC; @Default Experimental experimental = Experimental.builder().build(); @Default Testing testing = Testing.builder().build(); @@ -83,4 +84,8 @@ public enum ImplementationKind { */ RESILIENT, } + + public enum DatabaseFormat { + SINGLE_DOC + } } diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java index 96756ce4..39fa5ac6 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java @@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory; import org.slf4j.MDC; +import static io.vena.bosk.drivers.mongo.MongoDriverSettings.DatabaseFormat.SINGLE_DOC; import static io.vena.bosk.drivers.mongo.v2.Formatter.REVISION_ONE; /** @@ -106,7 +107,7 @@ public R initialRoot(Type rootType) throws InvalidTypeException, IOException, In } else { LOGGER.info("Creating collection"); } - FormatDriver newDriver = newSingleDocFormatDriver(REVISION_ONE.longValue()); // TODO: Pick based on config? + FormatDriver newDriver = newPreferredFormatDriver(); result = downstream.initialRoot(rootType); newDriver.initializeCollection(new StateAndMetadata<>(result, REVISION_ONE)); formatDriver = newDriver; @@ -127,6 +128,14 @@ public R initialRoot(Type rootType) throws InvalidTypeException, IOException, In } } + private FormatDriver newPreferredFormatDriver() { + if (driverSettings.preferredDatabaseFormat() == SINGLE_DOC) { + return newSingleDocFormatDriver(REVISION_ONE.longValue()); + } else { + throw new AssertionError("Unknown database format setting: " + driverSettings.preferredDatabaseFormat()); + } + } + private void recoverFrom(Exception exception) { try (MDCScope __ = beginDriverOperation("recoverFrom({})", exception.getClass().getSimpleName())) { if (isClosed) { @@ -231,7 +240,7 @@ private void doRefurbish() throws IOException { // That system needs to cope with a refurbish operations without any help. session.startTransaction(); StateAndMetadata result = formatDriver.loadAllState(); - FormatDriver newFormatDriver = detectFormat(); // TODO: use the configured driver, not the detected one + FormatDriver newFormatDriver = newPreferredFormatDriver(); collection.deleteMany(new BsonDocument()); newFormatDriver.initializeCollection(result); session.commitTransaction(); From a4ac73cbc97a5e14385682fe1e8620cbb00cc3cd Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Sun, 11 Jun 2023 12:17:11 -0400 Subject: [PATCH 7/8] Add changeStreamInitialWaitMS setting --- .../java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java | 1 + .../io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java | 3 +-- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java index a1a5e526..0f897a92 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java @@ -25,6 +25,7 @@ public class MongoDriverSettings { public static class Experimental { @Default ImplementationKind implementationKind = ImplementationKind.STABLE; @Default FlushMode flushMode = FlushMode.ECHO; + @Default long changeStreamInitialWaitMS = 20; } /** diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java index 7105a58c..ef2ec869 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java @@ -187,9 +187,8 @@ private boolean setupNewSession(ChangeEventListener newListener) { } } LOGGER.debug("Acquire initial resume token"); - // TODO: Config // Note: on a quiescent collection, tryNext() will wait for the Await Time to elapse, so keep it short - try (var initialCursor = collection.watch().maxAwaitTime(20, MILLISECONDS).cursor()) { + try (var initialCursor = collection.watch().maxAwaitTime(settings.experimental().changeStreamInitialWaitMS(), MILLISECONDS).cursor()) { initialEvent = initialCursor.tryNext(); if (initialEvent == null) { // In this case, tryNext() has caused the cursor to point to From 3b0d218b5dbad487494f946a012ed2255e6916a0 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Sun, 11 Jun 2023 12:17:53 -0400 Subject: [PATCH 8/8] Use flushTimeoutMS for ChangeEventReceiver.stop --- .../io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java index ef2ec869..f5a781bc 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/ChangeEventReceiver.java @@ -32,7 +32,6 @@ import static java.lang.Thread.sleep; import static java.util.Objects.requireNonNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; /** * Has three jobs: @@ -137,7 +136,7 @@ public void stop() throws InterruptedException, TimeoutException { false ); try { - task.get(10, SECONDS); // TODO: Config + task.get(settings.flushTimeoutMS(), MILLISECONDS); // Not strictly a flush timeout, but it's related LOGGER.debug("Cancellation succeeded; event loop exited normally"); this.eventProcessingTask = null; } catch (CancellationException e) {