Skip to content

Commit

Permalink
Fix ReplicaSet.initialRoot.
Browse files Browse the repository at this point in the history
Identify (fairly arbitrarily) one of the replicas as the "primary" and
use its state as the initialRoot for other bosks. The primary will be
the bosk whose driver is constructed first using BroadcastDriver.
This will usually correspond to the bosk whose state gets updated first,
making it a good choice to supply the initial root.
  • Loading branch information
prdoyle committed Aug 19, 2024
1 parent b546d00 commit d922b6c
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 43 deletions.
118 changes: 78 additions & 40 deletions bosk-core/src/main/java/works/bosk/drivers/ReplicaSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,23 @@

import java.io.IOException;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import works.bosk.Bosk;
import works.bosk.BoskDriver;
import works.bosk.BoskInfo;
import works.bosk.DriverFactory;
import works.bosk.DriverStack;
import works.bosk.Identifier;
import works.bosk.Reference;
import works.bosk.RootReference;
import works.bosk.StateTreeNode;
import works.bosk.exceptions.InvalidTypeException;

import static java.util.Objects.requireNonNull;

/**
* A pool of bosks arranged such that submitting an update to any of them submits to all of them.
* New bosks can be added any time by initializing them with {@link #driverFactory()} in their driver stack.
Expand All @@ -41,51 +45,61 @@
public class ReplicaSet<R extends StateTreeNode> {
final Queue<Replica<R>> replicas = new ConcurrentLinkedQueue<>();

/**
* The bosk whose state is returned by {@link BroadcastDriver#initialRoot}.
*/
final AtomicReference<Replica<R>> primary = new AtomicReference<>(null);

/**
* Whether {@link BoskDriver#initialRoot} has been called for the primary replica.
*/
final AtomicBoolean isInitialized = new AtomicBoolean(false);

/**
* We can actually use the same driver for every bosk in the replica set
* because they all do the same thing!
*/
final BroadcastShim shim = new BroadcastShim();
final BroadcastDriver broadcastDriver = new BroadcastDriver();

public DriverFactory<R> driverFactory() {
return (b,d) -> {
replicas.add(new Replica<>(b.rootReference(), d));
return shim;
Replica<R> replica = new Replica<>(b, d);
primary.compareAndSet(null, replica);
replicas.add(replica);
return broadcastDriver;
};
}

/**
* Causes updates to be applied to {@code mirrors} and to the downstream driver.
* Causes updates to be applied to {@code mirrors} before proceeding to the downstream driver.
* <p>
* This is an asymmetric setup where updates to {@code mirrors} do not update the primary.
* (There's really no other option if you want to mirror changes to bosks that have already
* been initialized, because it's impossible to alter those bosks' drivers.)
* <p>
* Assuming the returned factory is only used once, this has the effect of creating
* a fixed replica set to which new replicas can't be added dynamically;
* but the returned factory can be used multiple times.
* however, the returned factory could be used multiple times.
* <p>
* It may seem counterintuitive that mirrors receive updates before the "main" bosk,
* but experience shows that the alternative is even more confusing.
* This way, placing {@code mirroringTo} in a {@link DriverStack} causes the mirroring
* to occur at that location in the stack, which is easy to understand.
*/
@SafeVarargs
public static <RR extends StateTreeNode> DriverFactory<RR> mirroringTo(Bosk<RR>... mirrors) {
var replicaSet = new ReplicaSet<RR>();
for (var m: mirrors) {
BoskDriver<RR> downstream = m.driver();
replicaSet.replicas.add(new Replica<>(
m.rootReference(),
new ForwardingDriver<>(downstream) {
@Override
public RR initialRoot(Type rootType) {
throw new UnsupportedOperationException("Don't use initialRoot from " + m);
}

@Override
public String toString() {
return downstream.toString() + " (minus initial state)";
}
}
));
replicaSet.replicas.add(new Replica<>(m, downstream));
}
return replicaSet.driverFactory();
}

/**
* Causes updates to be applied only to <code>other</code>.
* The resulting driver can accept references to a different bosk
* with the same root type.
*/
public static <RR extends StateTreeNode> BoskDriver<RR> redirectingTo(Bosk<RR> other) {
// A ReplicaSet with only the one replica
Expand All @@ -96,29 +110,43 @@ public static <RR extends StateTreeNode> BoskDriver<RR> redirectingTo(Bosk<RR> o
);
}

final class BroadcastShim implements BoskDriver<R> {
final class BroadcastDriver implements BoskDriver<R> {
/**
* TODO: should return the current state somehow. For now, I guess it's best to attach all the bosks before submitting any updates.
*
* @return The result of calling <code>initialRoot</code> on the first downstream driver
* that doesn't throw {@link UnsupportedOperationException}. Other exceptions are propagated as-is,
* and abort the initialization immediately.
* @return the <em>current state</em> of the replica set, which is the state of its primary
* as obtained by {@link Bosk#supersedingReadContext()}.
*/
@Override
public R initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException {
List<UnsupportedOperationException> exceptions = new ArrayList<>();
for (var r: replicas) {
try {
return r.driver().initialRoot(rootType);
} catch (UnsupportedOperationException e) {
exceptions.add(e);
assert !replicas.isEmpty(): "Replicas must be added during by the driver factory before the drivers are used";
var primary = requireNonNull(ReplicaSet.this.primary.get());
if (isInitialized.getAndSet(true)) {
// Secondary replicas should take their initial state from the primary.
//
// We assume the primary's constructor has finished by this point,
// which is true if the bosks are constructed in the same order as their drivers.
// This should be a safe assumption--some shenanigans would be required
// to violate this--but unfortunately we have no way to verify it here,
// because at this point in the code, we cannot tell which replica we're initializing.
try (var __ = primaryReadContext(primary)) {
return primary.boskInfo().rootReference().value();
}
} else {
// The first time this is called, we assume it's for the primary replica.
return primary.driver().initialRoot(rootType);
}
}

// Oh dear.
UnsupportedOperationException exception = new UnsupportedOperationException("Unable to forward initialRoot request");
exceptions.forEach(exception::addSuppressed);
throw exception;
private static <R extends StateTreeNode> Bosk<R>.ReadContext primaryReadContext(Replica<R> primary) {
try {
// We use supersedingReadContext here on the assumption that if the user,
// for some reason, created a secondary replica in the midst
// of a ReadContext on the primary, they would still want that secondary
// to see the "real" state.
return primary.boskInfo.bosk().supersedingReadContext();
} catch (IllegalStateException e) {
// You have engaged in the aforementioned shenanigans.
throw new IllegalStateException("Unable to acquire primary read context; multiple replicas are being initialized simultaneously", e);
}
}

@Override
Expand Down Expand Up @@ -167,18 +195,28 @@ public void flush() throws IOException, InterruptedException {

}

/**
* @param driver the downstream driver to use for a given replica
* (not the driver that would be returned from {@link Bosk#driver()}).
*/
record Replica<R extends StateTreeNode>(
RootReference<R> root,
BoskInfo<R> boskInfo,
BoskDriver<R> driver
){
) {
public RootReference<R> rootReference() {
return boskInfo.rootReference();
}

@SuppressWarnings("unchecked")
private <T> Reference<T> correspondingReference(Reference<T> original) {
try {
return (Reference<T>) root.then(Object.class, original.path());
return (Reference<T>) rootReference().then(Object.class, original.path());
} catch (InvalidTypeException e) {
throw new AssertionError("Every reference should support a target class of Object", e);
}
}

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class ReplicaSetConformanceTest extends DriverConformanceTest {
@BeforeEach
void setupDriverFactory() {
ReplicaSet<TestEntity> replicaSet = new ReplicaSet<>();
replicaBosk = new Bosk<TestEntity>(
replicaBosk = new Bosk<>(
boskName("Replica"),
TestEntity.class,
AbstractDriverTest::initialRoot,
Expand Down
21 changes: 19 additions & 2 deletions bosk-core/src/test/java/works/bosk/drivers/ReplicaSetTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,28 @@ public interface Refs {
@Test
void joinAfterUpdate_correctInitialState() throws InvalidTypeException {
var replicaSet = new ReplicaSet<TestEntity>();
var bosk1 = new Bosk<TestEntity>(boskName("bosk1"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory());
var bosk1 = new Bosk<>(boskName("bosk1"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory());
var refs1 = bosk1.rootReference().buildReferences(Refs.class);
bosk1.driver().submitReplacement(refs1.string(), "New value");

var bosk2 = new Bosk<TestEntity>(boskName("bosk2"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory());
var bosk2 = new Bosk<>(boskName("bosk2"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory());
var refs2 = bosk2.rootReference().buildReferences(Refs.class);
try (var _ = bosk2.readContext()) {
assertEquals("New value", refs2.string().value());
}
}

@Test
void secondaryConstructedInPrimaryReadContext_seesLatestState() throws InvalidTypeException {
var replicaSet = new ReplicaSet<TestEntity>();
var bosk1 = new Bosk<>(boskName("bosk1"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory());
var refs1 = bosk1.rootReference().buildReferences(Refs.class);

Bosk<TestEntity> bosk2;
try (var _ = bosk1.readContext()) {
bosk1.driver().submitReplacement(refs1.string(), "New value");
bosk2 = new Bosk<>(boskName("bosk2"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory());
}
var refs2 = bosk2.rootReference().buildReferences(Refs.class);
try (var _ = bosk2.readContext()) {
assertEquals("New value", refs2.string().value());
Expand Down

0 comments on commit d922b6c

Please sign in to comment.