Skip to content

Commit

Permalink
Debug resilient driver (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
prdoyle authored Jun 6, 2023
2 parents 6b7edde + 8982c2c commit a0d2695
Show file tree
Hide file tree
Showing 14 changed files with 640 additions and 229 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ public interface MongoDriver<R extends Entity> extends BoskDriver<R> {
* database state, deserializes it, re-serializes it, and writes it back.
* This produces predictable results even if done concurrently with
* other database updates.
*
* <p>
* This requires the database state to be in good condition at the outset;
* it can't generally be used to repair corrupted databases
* unless the corruption is so mild that it doesn't
* interfere with proper functioning beforehand.
* It can be expected to evolve the database from that of a supported prior format,
* but for unsupported formats or other corruption, YMMV.
*/
void refurbish() throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
import org.bson.Document;

interface ChangeEventListener {
void onEvent(ChangeStreamDocument<Document> event);
void onEvent(ChangeStreamDocument<Document> event) throws UnprocessableEventException;
void onException(Exception e);
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.vena.bosk.drivers.mongo.v2;

import com.mongodb.MongoCommandException;
import com.mongodb.MongoException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.client.MongoChangeStreamCursor;
import com.mongodb.client.MongoCollection;
Expand All @@ -25,6 +26,7 @@
import org.slf4j.LoggerFactory;

import static java.lang.Thread.currentThread;
import static java.lang.Thread.sleep;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
Expand Down Expand Up @@ -61,6 +63,7 @@ private static final class Session {
final MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor;
final ChangeEventListener listener;
ChangeStreamDocument<Document> initialEvent;
volatile boolean isClosed;
}

/**
Expand All @@ -76,16 +79,16 @@ private static final class Session {
* thread itself, since a re-initialization could be triggered by an event or exception.
* For example, a {@link ChangeEventListener#onException} implementation can call this.
*
* @return true if we obtained a resume token.
* @return true if we succeeded in establishing a new session;
* false if we should enter the disconnected state
* @see #start()
*/
public boolean initialize(ChangeEventListener listener) throws ReceiverInitializationException {
LOGGER.debug("Initializing receiver");
try {
lock.lock();
stop();
setupNewSession(listener);
return lastProcessedResumeToken != null;
return setupNewSession(listener);
} catch (RuntimeException | InterruptedException | TimeoutException e) {
throw new ReceiverInitializationException(e);
} finally {
Expand Down Expand Up @@ -116,13 +119,24 @@ public void start() {
public void stop() throws InterruptedException, TimeoutException {
try {
lock.lock();
Session session = currentSession;
if (session != null) {
session.isClosed = true;
session.cursor.close();
}
Future<?> task = this.eventProcessingTask;
if (task != null) {
LOGGER.debug("Canceling event processing task");
task.cancel(true);
task.cancel(
// You'd think this should be true, but the Mongo client does not seem
// to deal with being interrupted very well. Closing the cursor seems
// to have the right effect though.
false
);
try {
task.get(10, SECONDS); // TODO: Config
LOGGER.warn("Normal completion of event processing task was not expected");
LOGGER.debug("Cancellation succeeded; event loop exited normally");
this.eventProcessingTask = null;
} catch (CancellationException e) {
LOGGER.debug("Cancellation succeeded; event loop interrupted");
this.eventProcessingTask = null;
Expand All @@ -145,16 +159,30 @@ public void close() {
ex.shutdown();
}

private void setupNewSession(ChangeEventListener newListener) {
/**
* @return true if we succeeded in establishing a new session;
* false if we should enter the disconnected state
*/
private boolean setupNewSession(ChangeEventListener newListener) {
assert this.eventProcessingTask == null;
LOGGER.debug("Setup new session");
this.currentSession = null; // In case any exceptions happen during this method

for (int attempt = 1; attempt <= 2; attempt++) {
int attempt;
for (attempt = 1; attempt <= 2; attempt++) {
LOGGER.debug("Attempt #{}", attempt);
ChangeStreamDocument<Document> initialEvent;
BsonDocument resumePoint = lastProcessedResumeToken;
BsonDocument resumePoint = null; //lastProcessedResumeToken;
if (resumePoint == null) {
if (settings.testing().eventDelayMS() < 0) {
LOGGER.debug("- Sleeping");
try {
sleep(-settings.testing().eventDelayMS());
} catch (InterruptedException e) {
LOGGER.debug("Sleep aborted; continuing", e);
Thread.interrupted();
}
}
LOGGER.debug("Acquire initial resume token");
// TODO: Config
// Note: on a quiescent collection, tryNext() will wait for the Await Time to elapse, so keep it short
Expand All @@ -178,14 +206,16 @@ private void setupNewSession(ChangeEventListener newListener) {
try {
MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor
= collection.watch().resumeAfter(resumePoint).cursor();
currentSession = new Session(cursor, newListener, initialEvent);
return;
currentSession = new Session(cursor, newListener, initialEvent, false);
return true;
} catch (MongoCommandException e) {
LOGGER.error("Change stream cursor command failed; discarding resume token", e);
lastProcessedResumeToken = null;
// If we haven't already retried, we'll continue around the loop
}
}
LOGGER.debug("Giving up initializing session after attempt #{}", attempt-1);
return false;
}

/**
Expand All @@ -202,18 +232,26 @@ private void eventProcessingLoop(Session session) {
while (true) {
if (settings.testing().eventDelayMS() > 0) {
LOGGER.debug("- Sleeping");
try {
Thread.sleep(settings.testing().eventDelayMS());
} catch (InterruptedException e) {
LOGGER.debug("| Interrupted");
}
Thread.sleep(settings.testing().eventDelayMS());
}
processEvent(session, session.cursor.next());
}
} catch (MongoInterruptedException e) {
// This happens when stop() cancels the task; this is part of normal operation
LOGGER.debug("Event loop interrupted", e);
} catch (UnprocessableEventException e) {
LOGGER.warn("Unprocessable event; discarding resume token", e);
lastProcessedResumeToken = null;
session.listener.onException(e);
} catch (InterruptedException | MongoInterruptedException e) {
// This can happen if stop() cancels the task with an interrupt; it's part of normal operation
LOGGER.info("Event loop interrupted", e);
Thread.interrupted();
} catch (MongoException e) {
if (session.isClosed) {
// This happens when stop() cancels the task; this is part of normal operation
LOGGER.info("Session is closed; exiting event loop", e);
} else {
LOGGER.warn("Unexpected MongoException while processing events; event loop aborted", e);
session.listener.onException(e);
}
} catch (RuntimeException e) {
LOGGER.warn("Unexpected exception while processing events; event loop aborted", e);
session.listener.onException(e);
Expand All @@ -222,7 +260,7 @@ private void eventProcessingLoop(Session session) {
}
}

private void processEvent(Session session, ChangeStreamDocument<Document> event) {
private void processEvent(Session session, ChangeStreamDocument<Document> event) throws UnprocessableEventException {
session.listener.onEvent(event);
lastProcessedResumeToken = event.getResumeToken();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@
import io.vena.bosk.Identifier;
import io.vena.bosk.Reference;
import java.io.IOException;
import lombok.RequiredArgsConstructor;
import org.bson.BsonInt64;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiredArgsConstructor
class DisconnectedDriver<R extends Entity> implements FormatDriver<R> {
@Override
public boolean isDisconnected() {
return true;
}
final String reason;

@Override
public <T> void submitReplacement(Reference<T> target, T newValue) {
Expand Down Expand Up @@ -65,12 +64,12 @@ public void onRevisionToSkip(BsonInt64 revision) {
}

private DisconnectedException disconnected(String name) {
return new DisconnectedException("Disconnected driver cannot execute " + name);
return new DisconnectedException("Cannot execute " + name + " while disconnected (due to: " + reason + ")");
}

@Override
public void onEvent(ChangeStreamDocument<Document> event) {
LOGGER.info("Event received in disconnected mode: {} {}", event.getOperationType(), event.getResumeToken());
LOGGER.info("Ignoring {} event while disconnected (due to: {})", event.getOperationType(), reason);
}

private static final Logger LOGGER = LoggerFactory.getLogger(DisconnectedDriver.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.vena.bosk.drivers.mongo.v2;

public class DisconnectedException extends RuntimeException {
public DisconnectedException() {
}

public DisconnectedException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.vena.bosk.drivers.mongo.MongoDriverSettings;
import io.vena.bosk.exceptions.FlushFailureException;
import java.io.Closeable;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
Expand All @@ -16,18 +17,20 @@
/**
* Implements waiting mechanism for revision numbers
*/
class FlushLock {
class FlushLock implements Closeable {
private final MongoDriverSettings settings;
private final Lock queueLock = new ReentrantLock();
private final PriorityBlockingQueue<Waiter> queue = new PriorityBlockingQueue<>();
private volatile long alreadySeen;
private boolean isClosed;

/**
* @param revisionAlreadySeen needs to be the exact revision from the database:
* too old, and we'll wait forever for intervening revisions that have already happened;
* too new, and we'll proceed immediately without waiting for revisions that haven't happened yet.
*/
public FlushLock(MongoDriverSettings settings, long revisionAlreadySeen) {
LOGGER.debug("New flush lock at revision {}", revisionAlreadySeen);
this.settings = settings;
this.alreadySeen = revisionAlreadySeen;
}
Expand Down Expand Up @@ -57,10 +60,13 @@ void awaitRevision(BsonInt64 revision) throws InterruptedException, FlushFailure
if (revisionValue > past) {
LOGGER.debug("Awaiting revision {} > {}", revisionValue, past);
if (!semaphore.tryAcquire(settings.flushTimeoutMS(), MILLISECONDS)) {
throw new FlushFailureException("Timed out waiting for revision " + revisionValue);
throw new FlushFailureException("Timed out waiting for revision " + revisionValue + " > " + alreadySeen);
}
if (isClosed) {
throw new FlushFailureException("Wait aborted");
}
} else {
LOGGER.debug("Revision {} is in the past; don't wait", revisionValue);
LOGGER.debug("Revision {} <= {} is in the past; don't wait", revisionValue, past);
return;
}
LOGGER.trace("Done awaiting revision {}", revisionValue);
Expand All @@ -74,10 +80,18 @@ void finishedRevision(BsonInt64 revision) {
if (revision == null) {
return;
}
long revisionValue = revision.longValue();

try {
queueLock.lock();
long revisionValue = revision.longValue();
if (isClosed) {
LOGGER.debug("Closed FlushLock ignoring revision {}", revisionValue);
return;
}
if (revisionValue <= alreadySeen) {
LOGGER.debug("Revision did not advance: {} <= {}", revisionValue, alreadySeen);
}

do {
Waiter w = queue.peek();
if (w == null || w.revision > revisionValue) {
Expand All @@ -89,13 +103,27 @@ void finishedRevision(BsonInt64 revision) {
}
} while (true);

assert alreadySeen <= revisionValue;
alreadySeen = revisionValue;
LOGGER.debug("Finished {}", revisionValue);
} finally {
queueLock.unlock();
}
}

@Override
public void close() {
try {
queueLock.lock();
LOGGER.debug("Closing");
isClosed = true;
Waiter w;
while ((w = queue.poll()) != null) {
w.semaphore.release();
}
} finally {
queueLock.unlock();
}
}

private static final Logger LOGGER = LoggerFactory.getLogger(FlushLock.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
* </li></ol>
*/
interface FormatDriver<R extends Entity> extends MongoDriver<R> {
void onEvent(ChangeStreamDocument<Document> event);
void onEvent(ChangeStreamDocument<Document> event) throws UnprocessableEventException;

/**
* Implementations should ignore subsequent calls to {@link #onEvent}
Expand All @@ -52,8 +52,6 @@ interface FormatDriver<R extends Entity> extends MongoDriver<R> {
*/
void initializeCollection(StateAndMetadata<R> contents) throws InitializationFailureException;

default boolean isDisconnected() { return false; }

@Override
default R initialRoot(Type rootType) {
throw new UnsupportedOperationException(
Expand Down
Loading

0 comments on commit a0d2695

Please sign in to comment.