Skip to content

Commit

Permalink
V3 resilient MongoDriver implementation (#34)
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle authored Jun 24, 2023
2 parents e0cf202 + 3a14881 commit 2ecf31b
Show file tree
Hide file tree
Showing 25 changed files with 2,209 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,9 @@
import io.vena.bosk.BoskDriver;
import io.vena.bosk.DriverFactory;
import io.vena.bosk.Entity;
import io.vena.bosk.drivers.mongo.v2.MainDriver;
import io.vena.bosk.drivers.mongo.v3.MainDriver;
import java.io.IOException;

import static io.vena.bosk.drivers.mongo.MongoDriverSettings.ImplementationKind.RESILIENT;

public interface MongoDriver<R extends Entity> extends BoskDriver<R> {
/**
* Deserializes and re-serializes the entire bosk contents,
Expand Down Expand Up @@ -51,10 +49,13 @@ static <RR extends Entity> MongoDriverFactory<RR> factory(
MongoDriverSettings driverSettings,
BsonPlugin bsonPlugin
) {
if (driverSettings.experimental().implementationKind() == RESILIENT) {
return (b, d) -> new MainDriver<>(b, clientSettings, driverSettings, bsonPlugin, d);
} else {
return (b, d) -> new SingleDocumentMongoDriver<>(b, clientSettings, driverSettings, bsonPlugin, d);
switch (driverSettings.experimental().implementationKind()) {
case RESILIENT:
return (b, d) -> new io.vena.bosk.drivers.mongo.v2.MainDriver<>(b, clientSettings, driverSettings, bsonPlugin, d);
case RESILIENT3:
return (b, d) -> new MainDriver<>(b, clientSettings, driverSettings, bsonPlugin, d);
default:
return (b, d) -> new SingleDocumentMongoDriver<>(b, clientSettings, driverSettings, bsonPlugin, d);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public enum ImplementationKind {
* Ignores {@link FlushMode FlushMode}; only supports the equivalent of {@link FlushMode#REVISION_FIELD_ONLY REVISION_FIELD_ONLY}.
*/
RESILIENT,
RESILIENT3,
}

public enum DatabaseFormat {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ private <T> BsonDocument deletionDoc(Reference<T> target) {
}

private BsonDocument updateDoc() {
return new BsonDocument("$inc", new BsonDocument(revision.name(), REVISION_ONE));
return new BsonDocument("$inc", new BsonDocument(revision.name(), new BsonInt64(1)));
}

private void ensureDocumentExists(BsonValue initialState, String updateCommand) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.vena.bosk.drivers.mongo.v3;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
import org.bson.Document;

interface ChangeListener {
void onConnectionSucceeded() throws
UnrecognizedFormatException,
UninitializedCollectionException,
InterruptedException,
IOException,
InitialRootException,
TimeoutException;

void onEvent(ChangeStreamDocument<Document> event) throws UnprocessableEventException;

void onConnectionFailed(Exception e) throws InterruptedException, InitialRootException, TimeoutException;
void onDisconnect(Exception e);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
package io.vena.bosk.drivers.mongo.v3;

import com.mongodb.MongoInterruptedException;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.vena.bosk.drivers.mongo.MongoDriverSettings;
import io.vena.bosk.drivers.mongo.v3.MappedDiagnosticContext.MDCScope;
import java.io.Closeable;
import java.io.IOException;
import java.util.NoSuchElementException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.vena.bosk.drivers.mongo.v3.MappedDiagnosticContext.setupMDC;
import static java.lang.Thread.currentThread;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
* Houses a background thread that repeatedly initializes, processes, and closes a change stream cursor.
* Ideally, the opening and closing happen just once, but they're done in a loop for fault tolerance,
* so that the driver can reinitialize if certain unusual conditions arise.
*/
class ChangeReceiver implements Closeable {
private final String boskName;
private final ChangeListener listener;
private final MongoDriverSettings settings;
private final MongoCollection<Document> collection;
private final ScheduledExecutorService ex = Executors.newScheduledThreadPool(1);
private volatile boolean isClosed = false;

ChangeReceiver(String boskName, ChangeListener listener, MongoDriverSettings settings, MongoCollection<Document> collection) {
this.boskName = boskName;
this.listener = listener;
this.settings = settings;
this.collection = collection;
ex.scheduleWithFixedDelay(
this::connectionLoop,
0,
settings.recoveryPollingMS(),
MILLISECONDS
);
}

@Override
public void close() {
isClosed = true;
ex.shutdownNow();
}

/**
* This method has a loop to do immediate reconnections and skip the
* {@link MongoDriverSettings#recoveryPollingMS() recoveryPollingMS} delay,
* but besides that, exiting this method has the same effect as continuing
* around the loop.
*/
private void connectionLoop() {
String oldThreadName = currentThread().getName();
currentThread().setName(getClass().getSimpleName() + " [" + boskName + "]");
try (MDCScope __ = setupMDC(boskName)) {
LOGGER.debug("Starting connectionLoop task");
try {
while (!isClosed) {
// Design notes:
//
// For the following try-catch clause, a `continue` causes us to attempt an immediate reconnection,
// while a `return` causes us to wait for the "recovery polling" interval to elapse first.
// When in doubt, `return` is a bit safer because it's unlikely to cause a spin-loop of rapid reconnections.
//
// Log `warn` and `error` levels are likely to be logged by applications in production,
// and so they will be visible to whatever team is operating the application that uses Bosk.
// They should be written with a reader in mind who is not a Bosk expert, perhaps not even
// knowing what bosk is, and should contain enough information to guide their troubleshooting efforts.
//
// Logs at `info` levels can target knowledgeable Bosk users, and should aim to explain what
// the library is doing in a way that helps them learn to use it more effectively.
//
// Logs at the `debug` level target Bosk developers. They can use some Bosk jargon, though
// they should also be helping new Bosk developers climb the learning curve. They should
// allow developers to tell what code paths executed.
//
// Logs at the `trace` level target expert Bosk developers troubleshooting very tricky bugs,
// and can include information that would be too voluminous to emit under most circumstances.
// Examples include stack traces for routine situations, or dumps of entire data structures,
// neither of which should be done at the `debug` level. It can also include high-frequency messages
// emitted many times for a single user action (again, not recommended at the `debug` level),
// though this must be done cautiously, since even disabled log statements still have nonzero overhead.
//
LOGGER.debug("Opening cursor");
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = openCursor()) {
try {
try {
listener.onConnectionSucceeded();

// Note that eventLoop does not throw RuntimeException; therefore,
// any RuntimeException must have occurred before this point.
// TODO: Two try blocks?
eventLoop(cursor);
} finally {
if (isClosed) {
LOGGER.debug("Cursor is closed; skipping usual error handling");
return;
}
}
} catch (UnprocessableEventException|UnexpectedEventProcessingException e) {
LOGGER.warn("Unable to process MongoDB change event; reconnecting: {}", e.toString(), e);
listener.onDisconnect(e);
// Reconnection will skip this event, so it's safe to try it right away
continue;
} catch (InterruptedException e) {
LOGGER.warn("Interrupted while processing MongoDB change events; reconnecting", e);
listener.onDisconnect(e);
continue;
} catch (IOException e) {
LOGGER.warn("Unexpected exception while processing MongoDB change events; will wait and retry", e);
listener.onDisconnect(e);
return;
} catch (UnrecognizedFormatException e) {
LOGGER.warn("Unrecognized MongoDB database content format; will wait and retry", e);
listener.onDisconnect(e);
return;
} catch (UninitializedCollectionException e) {
LOGGER.warn("MongoDB collection is not initialized; will wait and retry", e);
listener.onDisconnect(e);
return;
} catch (InitialRootException e) {
LOGGER.warn("Unable to initialize bosk state; will wait and retry", e);
listener.onDisconnect(e);
return;
} catch (TimeoutException e) {
LOGGER.warn("Timed out waiting for bosk state to initialize; will wait and retry", e);
listener.onDisconnect(e);
return;
} catch (RuntimeException e) {
LOGGER.warn("Unexpected exception after connecting to MongoDB; will wait and retry", e);
listener.onDisconnect(e);
return;
}
} catch (RuntimeException e) {
LOGGER.warn("Unable to connect to MongoDB database; will wait and retry", e);
try {
listener.onConnectionFailed(e);
} catch (InterruptedException | InitialRootException | TimeoutException e2) {
LOGGER.error("Error while running MongoDB connection failure handler; will wait and reconnect", e2);
}
return;
}
LOGGER.trace("Change event processing returned normally");
}
} finally {
LOGGER.debug("Ending connectionLoop task; isClosed={}", isClosed);
currentThread().setName(oldThreadName);
}
} catch (RuntimeException e) {
LOGGER.warn("connectionLoop task ended with unexpected {}; discarding", e.getClass().getSimpleName(), e);
}
}

private MongoChangeStreamCursor<ChangeStreamDocument<Document>> openCursor() {
MongoChangeStreamCursor<ChangeStreamDocument<Document>> result = collection
.watch()
.maxAwaitTime(settings.recoveryPollingMS(), MILLISECONDS)
.cursor();
LOGGER.debug("Cursor is open");
return result;
}

/**
* Should not throw RuntimeException, or else {@link #connectionLoop()} is likely to overreact.
*/
private void eventLoop(MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor) throws UnprocessableEventException, UnexpectedEventProcessingException {
if (isClosed) {
LOGGER.debug("Receiver is closed");
return;
}
try {
LOGGER.debug("Starting event loop");
while (!isClosed) {
ChangeStreamDocument<Document> event;
try {
event = cursor.next();
} catch (NoSuchElementException e) {
LOGGER.debug("Cursor is finished");
break;
} catch (MongoInterruptedException e) {
LOGGER.debug("Interrupted while waiting for change event: {}", e.toString());
break;
}
processEvent(event);
}
} catch (RuntimeException e) {
LOGGER.debug("Unexpected {} while processing events", e.getClass().getSimpleName(), e);
throw new UnexpectedEventProcessingException(e);
} finally {
LOGGER.debug("Exited event loop");
}
}

private void processEvent(ChangeStreamDocument<Document> event) throws UnprocessableEventException {
switch (event.getOperationType()) {
case INSERT:
case UPDATE:
case REPLACE:
case DELETE:
case RENAME:
listener.onEvent(event);
break;
case DROP:
case DROP_DATABASE:
case INVALIDATE:
case OTHER:
throw new UnprocessableEventException("Disruptive event received", event.getOperationType());
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(ChangeReceiver.class);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.vena.bosk.drivers.mongo.v3;

import com.mongodb.client.model.changestream.ChangeStreamDocument;
import io.vena.bosk.Entity;
import io.vena.bosk.Identifier;
import io.vena.bosk.Reference;
import io.vena.bosk.drivers.mongo.v2.DisconnectedException;
import io.vena.bosk.exceptions.InitializationFailureException;
import java.io.IOException;
import lombok.RequiredArgsConstructor;
import org.bson.BsonInt64;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
public class DisconnectedDriver<R extends Entity> implements FormatDriver<R> {
private final String reason;
@Override
public <T> void submitReplacement(Reference<T> target, T newValue) {
throw disconnected();
}

@Override
public <T> void submitConditionalReplacement(Reference<T> target, T newValue, Reference<Identifier> precondition, Identifier requiredValue) {
throw disconnected();
}

@Override
public <T> void submitInitialization(Reference<T> target, T newValue) {
throw disconnected();
}

@Override
public <T> void submitDeletion(Reference<T> target) {
throw disconnected();
}

@Override
public <T> void submitConditionalDeletion(Reference<T> target, Reference<Identifier> precondition, Identifier requiredValue) {
throw disconnected();
}

@Override
public void flush() throws IOException, InterruptedException {
throw disconnected();
}

@Override
public void close() {
// Nothing to do
}

@Override
public void onEvent(ChangeStreamDocument<Document> event) {
LOGGER.debug("Already disconnected; ignoring event ({})", event.getOperationType().getValue());
}

@Override
public void onRevisionToSkip(BsonInt64 revision) {
throw new AssertionError("Resynchronization should not tell DisconnectedDriver to skip a revision");
}

@Override
public StateAndMetadata<R> loadAllState() throws IOException, UninitializedCollectionException {
throw disconnected();
}

@Override
public void initializeCollection(StateAndMetadata<R> priorContents) throws InitializationFailureException {
throw disconnected();
}

private DisconnectedException disconnected() {
return new DisconnectedException(reason);
}

private static final Logger LOGGER = LoggerFactory.getLogger(DisconnectedDriver.class);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.vena.bosk.drivers.mongo.v3;

import io.vena.bosk.drivers.mongo.MongoDriver;
import java.util.concurrent.FutureTask;

import static java.util.Objects.requireNonNull;

/**
* Unlike other exceptions we use internally, this one is a {@link RuntimeException}
* because it's thrown from a {@link FutureTask}, and those can't throw checked exceptions.
* Callers of {@link FutureTask#get()} implementing {@link MongoDriver#initialRoot}
* need to handle this appropriately without any help from the compiler.
*/
public class DownstreamInitialRootException extends IllegalStateException {
public DownstreamInitialRootException(String message, Throwable cause) {
super(message, requireNonNull(cause));
}

public DownstreamInitialRootException(Throwable cause) {
super(requireNonNull(cause));
}
}
Loading

0 comments on commit 2ecf31b

Please sign in to comment.