From 17b98305956ee75a44bc8fb1d56eda95881cb822 Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Wed, 14 Jun 2023 15:46:34 -0400 Subject: [PATCH 1/3] Refactor: extract 'recover' method --- .../bosk/drivers/mongo/v2/MainDriver.java | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java index 5882a82d..5c3d28ca 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java @@ -148,23 +148,29 @@ private void recoverFrom(Exception exception) { } else { LOGGER.error("Recovering from unexpected {}; reinitializing", exception.getClass().getSimpleName(), exception); } - R result; - try { - result = initializeReplication(); - } catch (UninitializedCollectionException e) { - LOGGER.warn("Collection is uninitialized; driver is disconnected", e); - return; - } catch (IOException | ReceiverInitializationException e) { - LOGGER.warn("Unable to initialize event receiver", e); - return; - } - if (result != null) { - // Because we haven't called receiver.start() yet, this won't race with other events - downstream.submitReplacement(rootRef, result); - } - if (!isClosed) { - receiver.start(); + recover(); + } + } + } + + private void recover() { + R newRoot; + try { + newRoot = initializeReplication(); + } catch (UninitializedCollectionException e) { + LOGGER.warn("Collection is uninitialized; driver is disconnected", e); + return; + } catch (IOException | ReceiverInitializationException e) { + LOGGER.warn("Unable to initialize event receiver", e); + return; + } + if (newRoot != null) { + // Because we haven't called receiver.start() yet, this won't race with other events + downstream.submitReplacement(rootRef, newRoot); + } + if (!isClosed) { + receiver.start(); } } From 61230035a727adb2688e202cc406aa1d6bf63afc Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Wed, 14 Jun 2023 15:47:50 -0400 Subject: [PATCH 2/3] Add liveness tests to MongoDriverResiliencyTest. Every place we call flush(), also try just waiting. This is not technically required by the spec, but it is a desirable property for MongoDriver to have. --- .../mongo/MongoDriverResiliencyTest.java | 34 +++++++++++++++---- 1 file changed, 27 insertions(+), 7 deletions(-) diff --git a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java index f0e462fd..b02673ea 100644 --- a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java +++ b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java @@ -32,9 +32,12 @@ * A set of tests that only work with {@link io.vena.bosk.drivers.mongo.MongoDriverSettings.ImplementationKind#RESILIENT} */ public class MongoDriverResiliencyTest extends AbstractMongoDriverTest { + FlushOrWait flushOrWait; + @ParametersByName - public MongoDriverResiliencyTest(MongoDriverSettings.MongoDriverSettingsBuilder driverSettings) { + public MongoDriverResiliencyTest(MongoDriverSettings.MongoDriverSettingsBuilder driverSettings, FlushOrWait flushOrWait) { super(driverSettings); + this.flushOrWait = flushOrWait; } @SuppressWarnings("unused") @@ -61,6 +64,12 @@ static Stream driverSettings() { ); } + enum FlushOrWait { FLUSH, WAIT }; + + static Stream flushOrWait() { + return Stream.of(FlushOrWait.values()); + } + @ParametersByName @DisruptsMongoService void initialOutage_recovers() throws InvalidTypeException, InterruptedException, IOException { @@ -91,7 +100,7 @@ void initialOutage_recovers() throws InvalidTypeException, InterruptedException, mongoService.proxy().setConnectionCut(false); LOGGER.debug("Flush and check that the state updates"); - driver.flush(); + waitFor(driver); try (var __ = bosk.readContext()) { assertEquals(initialState, bosk.rootReference().value(), "Updates to database state once it reconnects"); @@ -104,12 +113,23 @@ void initialOutage_recovers() throws InvalidTypeException, InterruptedException, .withListing(Listing.of(refs.catalog(), entity123)); - driver.flush(); + waitFor(driver); try (@SuppressWarnings("unused") Bosk.ReadContext readContext = bosk.readContext()) { assertEquals(expected, bosk.rootReference().value()); } } + private void waitFor(BoskDriver driver) throws IOException, InterruptedException { + switch (flushOrWait) { + case FLUSH: + driver.flush(); + break; + case WAIT: + Thread.sleep(2 * driverSettings.recoveryPollingMS()); + break; + } + } + @ParametersByName @UsesMongoService void databaseDropped_recovers() throws InvalidTypeException, InterruptedException, IOException { @@ -189,7 +209,7 @@ void revisionDeleted_recovers() throws InvalidTypeException, InterruptedExceptio ); LOGGER.debug("Ensure flush works"); - bosk.driver().flush(); + waitFor(bosk.driver()); try (var __ = bosk.readContext()) { assertEquals(beforeState, bosk.rootReference().value()); } @@ -198,7 +218,7 @@ void revisionDeleted_recovers() throws InvalidTypeException, InterruptedExceptio setRevision(1000L); LOGGER.debug("Ensure flush works again"); - bosk.driver().flush(); + waitFor(bosk.driver()); try (var __ = bosk.readContext()) { assertEquals(beforeState, bosk.rootReference().value()); } @@ -222,7 +242,7 @@ private TestEntity initializeDatabase(String distinctiveString) { bosk -> initialRoot(bosk).withString(distinctiveString), driverFactory); MongoDriver driver = (MongoDriver) prepBosk.driver(); - driver.flush(); + waitFor(driver); driver.close(); return initialRoot(prepBosk).withString(distinctiveString); @@ -253,7 +273,7 @@ private void testRecovery(Runnable disruptiveAction, Function Date: Wed, 14 Jun 2023 15:59:15 -0400 Subject: [PATCH 3/3] Add recovery polling to MainDriver for liveness --- .../drivers/mongo/MongoDriverSettings.java | 1 + .../bosk/drivers/mongo/v2/MainDriver.java | 26 +++++++++++++++++++ .../mongo/MongoDriverResiliencyTest.java | 3 +++ 3 files changed, 30 insertions(+) diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java index 20a78239..a7590903 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/MongoDriverSettings.java @@ -12,6 +12,7 @@ public class MongoDriverSettings { String database; @Default long flushTimeoutMS = 30_000; + @Default long recoveryPollingMS = 30_000; @Default DatabaseFormat preferredDatabaseFormat = DatabaseFormat.SINGLE_DOC; @Default Experimental experimental = Experimental.builder().build(); diff --git a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java index 5c3d28ca..4f177456 100644 --- a/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java +++ b/bosk-mongo/src/main/java/io/vena/bosk/drivers/mongo/v2/MainDriver.java @@ -29,7 +29,9 @@ import java.io.IOException; import java.lang.reflect.Type; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.FutureTask; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicReference; import org.bson.BsonDocument; import org.bson.Document; @@ -39,6 +41,7 @@ import static io.vena.bosk.drivers.mongo.MongoDriverSettings.DatabaseFormat.SINGLE_DOC; import static io.vena.bosk.drivers.mongo.v2.Formatter.REVISION_ZERO; +import static java.util.concurrent.TimeUnit.MILLISECONDS; /** * Serves as a harness for driver implementations. @@ -61,6 +64,8 @@ public class MainDriver implements MongoDriver { private volatile FormatDriver formatDriver = new DisconnectedDriver<>("Driver not yet initialized"); private volatile boolean isClosed = false; + private final ScheduledExecutorService livenessPool = Executors.newScheduledThreadPool(1); + public MainDriver( Bosk bosk, MongoClientSettings clientSettings, @@ -81,6 +86,12 @@ public MainDriver( .getDatabase(driverSettings.database()) .getCollection(COLLECTION_NAME); this.receiver = new ChangeEventReceiver(bosk.name(), driverSettings, collection); + + livenessPool.scheduleWithFixedDelay( + this::backgroundReconnectTask, + driverSettings.recoveryPollingMS(), + driverSettings.recoveryPollingMS(), + MILLISECONDS); } private void validateMongoClientSettings(MongoClientSettings clientSettings) { @@ -152,7 +163,22 @@ private void recoverFrom(Exception exception) { } } + private void backgroundReconnectTask() { + boolean isDisconnected = formatDriver instanceof DisconnectedDriver; + if (isDisconnected) { + try (MDCScope __ = beginDriverOperation("backgroundReconnectTask()")) { + LOGGER.debug("Recovering from disconnection"); + try { + recover(); + } catch (RuntimeException e) { + LOGGER.debug("Error recovering from disconnection: {}", e.getClass().getSimpleName(), e); + // Ignore and try again next time + } } + } else { + LOGGER.trace("Nothing to do"); + } + } private void recover() { R newRoot; diff --git a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java index b02673ea..f694ee84 100644 --- a/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java +++ b/bosk-mongo/src/test/java/io/vena/bosk/drivers/mongo/MongoDriverResiliencyTest.java @@ -48,15 +48,18 @@ static Stream driverSettings() { return Stream.of( MongoDriverSettings.builder() .database("boskResiliencyTestDB_" + dbCounter.incrementAndGet()) + .recoveryPollingMS(500) .experimental(resilient), MongoDriverSettings.builder() .database("boskResiliencyTestDB_" + dbCounter.incrementAndGet() + "_late") + .recoveryPollingMS(500) .experimental(resilient) .testing(MongoDriverSettings.Testing.builder() .eventDelayMS(200) .build()), MongoDriverSettings.builder() .database("boskResiliencyTestDB_" + dbCounter.incrementAndGet() + "_early") + .recoveryPollingMS(500) .experimental(resilient) .testing(MongoDriverSettings.Testing.builder() .eventDelayMS(-200)