Skip to content

Commit

Permalink
Polish resilient driver (#24)
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle authored Jun 11, 2023
2 parents a0d2695 + 3b0d218 commit f83e32c
Show file tree
Hide file tree
Showing 10 changed files with 283 additions and 169 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ static <RR extends Entity> MongoDriverFactory<RR> factory(
MongoDriverSettings driverSettings,
BsonPlugin bsonPlugin
) {
if (driverSettings.implementationKind() == RESILIENT) {
if (driverSettings.experimental().implementationKind() == RESILIENT) {
return (b, d) -> new MainDriver<>(b, clientSettings, driverSettings, bsonPlugin, d);
} else {
return (b, d) -> new SingleDocumentMongoDriver<>(b, clientSettings, driverSettings, bsonPlugin, d);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,25 @@ public class MongoDriverSettings {
String database;

@Default long flushTimeoutMS = 30_000;
@Default FlushMode flushMode = FlushMode.ECHO;
@Default ImplementationKind implementationKind = ImplementationKind.STABLE;
@Default DatabaseFormat preferredDatabaseFormat = DatabaseFormat.SINGLE_DOC;

@Default Experimental experimental = Experimental.builder().build();
@Default Testing testing = Testing.builder().build();

/**
* Settings with no guarantee of long-term support.
*/
@Value
@Builder
public static class Experimental {
@Default ImplementationKind implementationKind = ImplementationKind.STABLE;
@Default FlushMode flushMode = FlushMode.ECHO;
@Default long changeStreamInitialWaitMS = 20;
}

/**
* Settings not meant to be used in production.
*/
@Value
@Builder
public static class Testing {
Expand Down Expand Up @@ -70,4 +85,8 @@ public enum ImplementationKind {
*/
RESILIENT,
}

public enum DatabaseFormat {
SINGLE_DOC
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,12 +161,12 @@ public <T> void submitDeletion(Reference<T> target) {
@Override
public void flush() throws IOException, InterruptedException {
LOGGER.debug("+ flush");
switch (settings.flushMode()) {
switch (settings.experimental().flushMode()) {
case REVISION_FIELD_ONLY:
receiver.awaitLatestRevision();
break;
default:
LOGGER.warn("Unrecognized flush mode {}; defaulting to ECHO", settings.flushMode());
LOGGER.warn("Unrecognized flush mode {}; defaulting to ECHO", settings.experimental().flushMode());
// fall through
case ECHO:
performEcho();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.vena.bosk.drivers.mongo.MongoDriverSettings;
import io.vena.bosk.drivers.mongo.v2.MainDriver.MDCScope;
import io.vena.bosk.exceptions.NotYetImplementedException;
import java.io.Closeable;
import java.util.NoSuchElementException;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
Expand All @@ -25,11 +27,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.vena.bosk.drivers.mongo.v2.MainDriver.setupMDC;
import static java.lang.Thread.currentThread;
import static java.lang.Thread.sleep;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;

/**
* Has three jobs:
Expand Down Expand Up @@ -134,7 +136,7 @@ public void stop() throws InterruptedException, TimeoutException {
false
);
try {
task.get(10, SECONDS); // TODO: Config
task.get(settings.flushTimeoutMS(), MILLISECONDS); // Not strictly a flush timeout, but it's related
LOGGER.debug("Cancellation succeeded; event loop exited normally");
this.eventProcessingTask = null;
} catch (CancellationException e) {
Expand Down Expand Up @@ -179,14 +181,13 @@ private boolean setupNewSession(ChangeEventListener newListener) {
try {
sleep(-settings.testing().eventDelayMS());
} catch (InterruptedException e) {
LOGGER.debug("Sleep aborted; continuing", e);
LOGGER.debug("Sleep interrupted; continuing", e);
Thread.interrupted();
}
}
LOGGER.debug("Acquire initial resume token");
// TODO: Config
// Note: on a quiescent collection, tryNext() will wait for the Await Time to elapse, so keep it short
try (var initialCursor = collection.watch().maxAwaitTime(20, MILLISECONDS).cursor()) {
try (var initialCursor = collection.watch().maxAwaitTime(settings.experimental().changeStreamInitialWaitMS(), MILLISECONDS).cursor()) {
initialEvent = initialCursor.tryNext();
if (initialEvent == null) {
// In this case, tryNext() has caused the cursor to point to
Expand Down Expand Up @@ -224,39 +225,46 @@ private boolean setupNewSession(ChangeEventListener newListener) {
private void eventProcessingLoop(Session session) {
String oldThreadName = currentThread().getName();
currentThread().setName(getClass().getSimpleName() + " [" + boskName + "]");
try {
if (session.initialEvent != null) {
processEvent(session, session.initialEvent);
session.initialEvent = null; // Allow GC
}
while (true) {
if (settings.testing().eventDelayMS() > 0) {
LOGGER.debug("- Sleeping");
Thread.sleep(settings.testing().eventDelayMS());
try (MDCScope __ = setupMDC(boskName)) {
try {
if (session.initialEvent != null) {
LOGGER.debug("Processing initial event");
processEvent(session, session.initialEvent);
session.initialEvent = null; // Allow GC
}
processEvent(session, session.cursor.next());
}
} catch (UnprocessableEventException e) {
LOGGER.warn("Unprocessable event; discarding resume token", e);
lastProcessedResumeToken = null;
session.listener.onException(e);
} catch (InterruptedException | MongoInterruptedException e) {
// This can happen if stop() cancels the task with an interrupt; it's part of normal operation
LOGGER.info("Event loop interrupted", e);
Thread.interrupted();
} catch (MongoException e) {
if (session.isClosed) {
// This happens when stop() cancels the task; this is part of normal operation
LOGGER.info("Session is closed; exiting event loop", e);
} else {
LOGGER.warn("Unexpected MongoException while processing events; event loop aborted", e);
LOGGER.debug("Starting event loop");
while (true) {
if (settings.testing().eventDelayMS() > 0) {
LOGGER.debug("- Sleeping");
Thread.sleep(settings.testing().eventDelayMS());
}
processEvent(session, session.cursor.next());
}
} catch (UnprocessableEventException e) {
LOGGER.warn("Unprocessable event; discarding resume token", e);
lastProcessedResumeToken = null;
session.listener.onException(e);
} catch (InterruptedException | MongoInterruptedException e) {
// This can happen if stop() cancels the task with an interrupt; it's part of normal operation
LOGGER.info("Event loop interrupted", e);
Thread.interrupted();
} catch (NoSuchElementException e) {
LOGGER.warn("Change stream has ended; event loop terminated", e);
session.listener.onException(e);
} catch (MongoException e) {
if (session.isClosed) {
// This happens when stop() cancels the task; this is part of normal operation
LOGGER.info("Session is closed; exiting event loop", e);
} else {
LOGGER.warn("Unexpected MongoException while processing events; event loop terminated", e);
session.listener.onException(e);
}
} catch (RuntimeException e) {
LOGGER.warn("Unexpected exception while processing events; event loop terminated", e);
session.listener.onException(e);
} finally {
currentThread().setName(oldThreadName);
}
} catch (RuntimeException e) {
LOGGER.warn("Unexpected exception while processing events; event loop aborted", e);
session.listener.onException(e);
} finally {
currentThread().setName(oldThreadName);
}
}

Expand Down
Loading

0 comments on commit f83e32c

Please sign in to comment.