Skip to content

Commit

Permalink
MongoDriver liveness (#29)
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle authored Jun 14, 2023
2 parents 64b18bc + 78b3a84 commit d0c1ca8
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ public class MongoDriverSettings {
String database;

@Default long flushTimeoutMS = 30_000;
@Default long recoveryPollingMS = 30_000;
@Default DatabaseFormat preferredDatabaseFormat = DatabaseFormat.SINGLE_DOC;

@Default Experimental experimental = Experimental.builder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import org.bson.BsonDocument;
import org.bson.Document;
Expand All @@ -39,6 +41,7 @@

import static io.vena.bosk.drivers.mongo.MongoDriverSettings.DatabaseFormat.SINGLE_DOC;
import static io.vena.bosk.drivers.mongo.v2.Formatter.REVISION_ZERO;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* Serves as a harness for driver implementations.
Expand All @@ -61,6 +64,8 @@ public class MainDriver<R extends Entity> implements MongoDriver<R> {
private volatile FormatDriver<R> formatDriver = new DisconnectedDriver<>("Driver not yet initialized");
private volatile boolean isClosed = false;

private final ScheduledExecutorService livenessPool = Executors.newScheduledThreadPool(1);

public MainDriver(
Bosk<R> bosk,
MongoClientSettings clientSettings,
Expand All @@ -81,6 +86,12 @@ public MainDriver(
.getDatabase(driverSettings.database())
.getCollection(COLLECTION_NAME);
this.receiver = new ChangeEventReceiver(bosk.name(), driverSettings, collection);

livenessPool.scheduleWithFixedDelay(
this::backgroundReconnectTask,
driverSettings.recoveryPollingMS(),
driverSettings.recoveryPollingMS(),
MILLISECONDS);
}

private void validateMongoClientSettings(MongoClientSettings clientSettings) {
Expand Down Expand Up @@ -148,23 +159,44 @@ private void recoverFrom(Exception exception) {
} else {
LOGGER.error("Recovering from unexpected {}; reinitializing", exception.getClass().getSimpleName(), exception);
}
R result;
try {
result = initializeReplication();
} catch (UninitializedCollectionException e) {
LOGGER.warn("Collection is uninitialized; driver is disconnected", e);
return;
} catch (IOException | ReceiverInitializationException e) {
LOGGER.warn("Unable to initialize event receiver", e);
return;
}
if (result != null) {
// Because we haven't called receiver.start() yet, this won't race with other events
downstream.submitReplacement(rootRef, result);
}
if (!isClosed) {
receiver.start();
recover();
}
}

private void backgroundReconnectTask() {
boolean isDisconnected = formatDriver instanceof DisconnectedDriver;
if (isDisconnected) {
try (MDCScope __ = beginDriverOperation("backgroundReconnectTask()")) {
LOGGER.debug("Recovering from disconnection");
try {
recover();
} catch (RuntimeException e) {
LOGGER.debug("Error recovering from disconnection: {}", e.getClass().getSimpleName(), e);
// Ignore and try again next time
}
}
} else {
LOGGER.trace("Nothing to do");
}
}

private void recover() {
R newRoot;
try {
newRoot = initializeReplication();
} catch (UninitializedCollectionException e) {
LOGGER.warn("Collection is uninitialized; driver is disconnected", e);
return;
} catch (IOException | ReceiverInitializationException e) {
LOGGER.warn("Unable to initialize event receiver", e);
return;
}
if (newRoot != null) {
// Because we haven't called receiver.start() yet, this won't race with other events
downstream.submitReplacement(rootRef, newRoot);
}
if (!isClosed) {
receiver.start();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,12 @@
* A set of tests that only work with {@link io.vena.bosk.drivers.mongo.MongoDriverSettings.ImplementationKind#RESILIENT}
*/
public class MongoDriverResiliencyTest extends AbstractMongoDriverTest {
FlushOrWait flushOrWait;

@ParametersByName
public MongoDriverResiliencyTest(MongoDriverSettings.MongoDriverSettingsBuilder driverSettings) {
public MongoDriverResiliencyTest(MongoDriverSettings.MongoDriverSettingsBuilder driverSettings, FlushOrWait flushOrWait) {
super(driverSettings);
this.flushOrWait = flushOrWait;
}

@SuppressWarnings("unused")
Expand All @@ -45,22 +48,31 @@ static Stream<MongoDriverSettings.MongoDriverSettingsBuilder> driverSettings() {
return Stream.of(
MongoDriverSettings.builder()
.database("boskResiliencyTestDB_" + dbCounter.incrementAndGet())
.recoveryPollingMS(500)
.experimental(resilient),
MongoDriverSettings.builder()
.database("boskResiliencyTestDB_" + dbCounter.incrementAndGet() + "_late")
.recoveryPollingMS(500)
.experimental(resilient)
.testing(MongoDriverSettings.Testing.builder()
.eventDelayMS(200)
.build()),
MongoDriverSettings.builder()
.database("boskResiliencyTestDB_" + dbCounter.incrementAndGet() + "_early")
.recoveryPollingMS(500)
.experimental(resilient)
.testing(MongoDriverSettings.Testing.builder()
.eventDelayMS(-200)
.build())
);
}

enum FlushOrWait { FLUSH, WAIT };

static Stream<FlushOrWait> flushOrWait() {
return Stream.of(FlushOrWait.values());
}

@ParametersByName
@DisruptsMongoService
void initialOutage_recovers() throws InvalidTypeException, InterruptedException, IOException {
Expand Down Expand Up @@ -91,7 +103,7 @@ void initialOutage_recovers() throws InvalidTypeException, InterruptedException,
mongoService.proxy().setConnectionCut(false);

LOGGER.debug("Flush and check that the state updates");
driver.flush();
waitFor(driver);
try (var __ = bosk.readContext()) {
assertEquals(initialState, bosk.rootReference().value(),
"Updates to database state once it reconnects");
Expand All @@ -104,12 +116,23 @@ void initialOutage_recovers() throws InvalidTypeException, InterruptedException,
.withListing(Listing.of(refs.catalog(), entity123));


driver.flush();
waitFor(driver);
try (@SuppressWarnings("unused") Bosk<?>.ReadContext readContext = bosk.readContext()) {
assertEquals(expected, bosk.rootReference().value());
}
}

private void waitFor(BoskDriver<TestEntity> driver) throws IOException, InterruptedException {
switch (flushOrWait) {
case FLUSH:
driver.flush();
break;
case WAIT:
Thread.sleep(2 * driverSettings.recoveryPollingMS());
break;
}
}

@ParametersByName
@UsesMongoService
void databaseDropped_recovers() throws InvalidTypeException, InterruptedException, IOException {
Expand Down Expand Up @@ -189,7 +212,7 @@ void revisionDeleted_recovers() throws InvalidTypeException, InterruptedExceptio
);

LOGGER.debug("Ensure flush works");
bosk.driver().flush();
waitFor(bosk.driver());
try (var __ = bosk.readContext()) {
assertEquals(beforeState, bosk.rootReference().value());
}
Expand All @@ -198,7 +221,7 @@ void revisionDeleted_recovers() throws InvalidTypeException, InterruptedExceptio
setRevision(1000L);

LOGGER.debug("Ensure flush works again");
bosk.driver().flush();
waitFor(bosk.driver());
try (var __ = bosk.readContext()) {
assertEquals(beforeState, bosk.rootReference().value());
}
Expand All @@ -222,7 +245,7 @@ private TestEntity initializeDatabase(String distinctiveString) {
bosk -> initialRoot(bosk).withString(distinctiveString),
driverFactory);
MongoDriver<TestEntity> driver = (MongoDriver<TestEntity>) prepBosk.driver();
driver.flush();
waitFor(driver);
driver.close();

return initialRoot(prepBosk).withString(distinctiveString);
Expand Down Expand Up @@ -253,7 +276,7 @@ private void testRecovery(Runnable disruptiveAction, Function<TestEntity, TestEn
TestEntity afterState = recoveryAction.apply(beforeState);

LOGGER.debug("Ensure flush works");
bosk.driver().flush();
waitFor(bosk.driver());
try (var __ = bosk.readContext()) {
assertEquals(afterState, bosk.rootReference().value());
}
Expand Down

0 comments on commit d0c1ca8

Please sign in to comment.