From d922b6c8e4ce6a10f40cf43ae1ed40957803f2fe Mon Sep 17 00:00:00 2001 From: Patrick Doyle Date: Mon, 19 Aug 2024 09:39:52 -0400 Subject: [PATCH] Fix ReplicaSet.initialRoot. Identify (fairly arbitrarily) one of the replicas as the "primary" and use its state as the initialRoot for other bosks. The primary will be the bosk whose driver is constructed first using BroadcastDriver. This will usually correspond to the bosk whose state gets updated first, making it a good choice to supply the initial root. --- .../java/works/bosk/drivers/ReplicaSet.java | 118 ++++++++++++------ .../drivers/ReplicaSetConformanceTest.java | 2 +- .../works/bosk/drivers/ReplicaSetTest.java | 21 +++- 3 files changed, 98 insertions(+), 43 deletions(-) diff --git a/bosk-core/src/main/java/works/bosk/drivers/ReplicaSet.java b/bosk-core/src/main/java/works/bosk/drivers/ReplicaSet.java index 42b1f0b4..ea0d6046 100644 --- a/bosk-core/src/main/java/works/bosk/drivers/ReplicaSet.java +++ b/bosk-core/src/main/java/works/bosk/drivers/ReplicaSet.java @@ -2,19 +2,23 @@ import java.io.IOException; import java.lang.reflect.Type; -import java.util.ArrayList; -import java.util.List; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import works.bosk.Bosk; import works.bosk.BoskDriver; +import works.bosk.BoskInfo; import works.bosk.DriverFactory; +import works.bosk.DriverStack; import works.bosk.Identifier; import works.bosk.Reference; import works.bosk.RootReference; import works.bosk.StateTreeNode; import works.bosk.exceptions.InvalidTypeException; +import static java.util.Objects.requireNonNull; + /** * A pool of bosks arranged such that submitting an update to any of them submits to all of them. * New bosks can be added any time by initializing them with {@link #driverFactory()} in their driver stack. @@ -41,51 +45,61 @@ public class ReplicaSet { final Queue> replicas = new ConcurrentLinkedQueue<>(); + /** + * The bosk whose state is returned by {@link BroadcastDriver#initialRoot}. + */ + final AtomicReference> primary = new AtomicReference<>(null); + + /** + * Whether {@link BoskDriver#initialRoot} has been called for the primary replica. + */ + final AtomicBoolean isInitialized = new AtomicBoolean(false); + /** * We can actually use the same driver for every bosk in the replica set * because they all do the same thing! */ - final BroadcastShim shim = new BroadcastShim(); + final BroadcastDriver broadcastDriver = new BroadcastDriver(); public DriverFactory driverFactory() { return (b,d) -> { - replicas.add(new Replica<>(b.rootReference(), d)); - return shim; + Replica replica = new Replica<>(b, d); + primary.compareAndSet(null, replica); + replicas.add(replica); + return broadcastDriver; }; } /** - * Causes updates to be applied to {@code mirrors} and to the downstream driver. + * Causes updates to be applied to {@code mirrors} before proceeding to the downstream driver. + *

+ * This is an asymmetric setup where updates to {@code mirrors} do not update the primary. + * (There's really no other option if you want to mirror changes to bosks that have already + * been initialized, because it's impossible to alter those bosks' drivers.) *

* Assuming the returned factory is only used once, this has the effect of creating * a fixed replica set to which new replicas can't be added dynamically; - * but the returned factory can be used multiple times. + * however, the returned factory could be used multiple times. + *

+ * It may seem counterintuitive that mirrors receive updates before the "main" bosk, + * but experience shows that the alternative is even more confusing. + * This way, placing {@code mirroringTo} in a {@link DriverStack} causes the mirroring + * to occur at that location in the stack, which is easy to understand. */ @SafeVarargs public static DriverFactory mirroringTo(Bosk... mirrors) { var replicaSet = new ReplicaSet(); for (var m: mirrors) { BoskDriver downstream = m.driver(); - replicaSet.replicas.add(new Replica<>( - m.rootReference(), - new ForwardingDriver<>(downstream) { - @Override - public RR initialRoot(Type rootType) { - throw new UnsupportedOperationException("Don't use initialRoot from " + m); - } - - @Override - public String toString() { - return downstream.toString() + " (minus initial state)"; - } - } - )); + replicaSet.replicas.add(new Replica<>(m, downstream)); } return replicaSet.driverFactory(); } /** * Causes updates to be applied only to other. + * The resulting driver can accept references to a different bosk + * with the same root type. */ public static BoskDriver redirectingTo(Bosk other) { // A ReplicaSet with only the one replica @@ -96,29 +110,43 @@ public static BoskDriver redirectingTo(Bosk o ); } - final class BroadcastShim implements BoskDriver { + final class BroadcastDriver implements BoskDriver { /** - * TODO: should return the current state somehow. For now, I guess it's best to attach all the bosks before submitting any updates. - * - * @return The result of calling initialRoot on the first downstream driver - * that doesn't throw {@link UnsupportedOperationException}. Other exceptions are propagated as-is, - * and abort the initialization immediately. + * @return the current state of the replica set, which is the state of its primary + * as obtained by {@link Bosk#supersedingReadContext()}. */ @Override public R initialRoot(Type rootType) throws InvalidTypeException, IOException, InterruptedException { - List exceptions = new ArrayList<>(); - for (var r: replicas) { - try { - return r.driver().initialRoot(rootType); - } catch (UnsupportedOperationException e) { - exceptions.add(e); + assert !replicas.isEmpty(): "Replicas must be added during by the driver factory before the drivers are used"; + var primary = requireNonNull(ReplicaSet.this.primary.get()); + if (isInitialized.getAndSet(true)) { + // Secondary replicas should take their initial state from the primary. + // + // We assume the primary's constructor has finished by this point, + // which is true if the bosks are constructed in the same order as their drivers. + // This should be a safe assumption--some shenanigans would be required + // to violate this--but unfortunately we have no way to verify it here, + // because at this point in the code, we cannot tell which replica we're initializing. + try (var __ = primaryReadContext(primary)) { + return primary.boskInfo().rootReference().value(); } + } else { + // The first time this is called, we assume it's for the primary replica. + return primary.driver().initialRoot(rootType); } + } - // Oh dear. - UnsupportedOperationException exception = new UnsupportedOperationException("Unable to forward initialRoot request"); - exceptions.forEach(exception::addSuppressed); - throw exception; + private static Bosk.ReadContext primaryReadContext(Replica primary) { + try { + // We use supersedingReadContext here on the assumption that if the user, + // for some reason, created a secondary replica in the midst + // of a ReadContext on the primary, they would still want that secondary + // to see the "real" state. + return primary.boskInfo.bosk().supersedingReadContext(); + } catch (IllegalStateException e) { + // You have engaged in the aforementioned shenanigans. + throw new IllegalStateException("Unable to acquire primary read context; multiple replicas are being initialized simultaneously", e); + } } @Override @@ -167,18 +195,28 @@ public void flush() throws IOException, InterruptedException { } + /** + * @param driver the downstream driver to use for a given replica + * (not the driver that would be returned from {@link Bosk#driver()}). + */ record Replica( - RootReference root, + BoskInfo boskInfo, BoskDriver driver - ){ + ) { + public RootReference rootReference() { + return boskInfo.rootReference(); + } + @SuppressWarnings("unchecked") private Reference correspondingReference(Reference original) { try { - return (Reference) root.then(Object.class, original.path()); + return (Reference) rootReference().then(Object.class, original.path()); } catch (InvalidTypeException e) { throw new AssertionError("Every reference should support a target class of Object", e); } } + } + } diff --git a/bosk-core/src/test/java/works/bosk/drivers/ReplicaSetConformanceTest.java b/bosk-core/src/test/java/works/bosk/drivers/ReplicaSetConformanceTest.java index ba9da08a..7ded8e03 100644 --- a/bosk-core/src/test/java/works/bosk/drivers/ReplicaSetConformanceTest.java +++ b/bosk-core/src/test/java/works/bosk/drivers/ReplicaSetConformanceTest.java @@ -14,7 +14,7 @@ class ReplicaSetConformanceTest extends DriverConformanceTest { @BeforeEach void setupDriverFactory() { ReplicaSet replicaSet = new ReplicaSet<>(); - replicaBosk = new Bosk( + replicaBosk = new Bosk<>( boskName("Replica"), TestEntity.class, AbstractDriverTest::initialRoot, diff --git a/bosk-core/src/test/java/works/bosk/drivers/ReplicaSetTest.java b/bosk-core/src/test/java/works/bosk/drivers/ReplicaSetTest.java index 3e60ffca..d2bd68dc 100644 --- a/bosk-core/src/test/java/works/bosk/drivers/ReplicaSetTest.java +++ b/bosk-core/src/test/java/works/bosk/drivers/ReplicaSetTest.java @@ -18,11 +18,28 @@ public interface Refs { @Test void joinAfterUpdate_correctInitialState() throws InvalidTypeException { var replicaSet = new ReplicaSet(); - var bosk1 = new Bosk(boskName("bosk1"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory()); + var bosk1 = new Bosk<>(boskName("bosk1"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory()); var refs1 = bosk1.rootReference().buildReferences(Refs.class); bosk1.driver().submitReplacement(refs1.string(), "New value"); - var bosk2 = new Bosk(boskName("bosk2"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory()); + var bosk2 = new Bosk<>(boskName("bosk2"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory()); + var refs2 = bosk2.rootReference().buildReferences(Refs.class); + try (var _ = bosk2.readContext()) { + assertEquals("New value", refs2.string().value()); + } + } + + @Test + void secondaryConstructedInPrimaryReadContext_seesLatestState() throws InvalidTypeException { + var replicaSet = new ReplicaSet(); + var bosk1 = new Bosk<>(boskName("bosk1"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory()); + var refs1 = bosk1.rootReference().buildReferences(Refs.class); + + Bosk bosk2; + try (var _ = bosk1.readContext()) { + bosk1.driver().submitReplacement(refs1.string(), "New value"); + bosk2 = new Bosk<>(boskName("bosk2"), TestEntity.class, AbstractDriverTest::initialRoot, replicaSet.driverFactory()); + } var refs2 = bosk2.rootReference().buildReferences(Refs.class); try (var _ = bosk2.readContext()) { assertEquals("New value", refs2.string().value());