From a3f6a0b26fb8b09df8301f0dc6ac0ad31d1f89b0 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Fri, 8 Nov 2024 15:27:55 +0100 Subject: [PATCH] Introduce database versioning and migrations The fix for #238 does not mean that everything is fine. We will have databases which have the wrong value/priority sets written and this would only fix itself on new writes or deletes to the same key. So we are unfortunately forced to manually fix it on start. For this we introduce a data migration. During a fresh start, we will then find all the keys affected by tombstones and loop them, finding the best value for them (the correct one) and fixing them. Once done we record that we are on version=1 and don't run this again. Future fuckups can be fixed with other migrations. --- crdt.go | 50 +++++++++---- crdt_test.go | 196 +++++++++++++++++++++++++++++++++++++-------------- 2 files changed, 181 insertions(+), 65 deletions(-) diff --git a/crdt.go b/crdt.go index 282583bc..e9a0cd55 100644 --- a/crdt.go +++ b/crdt.go @@ -48,6 +48,7 @@ const ( setNs = "s" // set processedBlocksNs = "b" // blocks dirtyBitKey = "d" // dirty + versionKey = "crdt_version" ) // Common errors. @@ -86,9 +87,9 @@ type Options struct { // element is successfully removed from the datastore (either by a // local or remote update). Unordered and concurrent updates may // result in the DeleteHook being triggered even though the element is - // still present in the datastore because it was re-added. If that is - // relevant, use Has() to check if the removed element is still part - // of the datastore. + // still present in the datastore because it was re-added or not fully + // tombstoned. If that is relevant, use Has() to check if the removed + // element is still part of the datastore. DeleteHook func(k ds.Key) // NumWorkers specifies the number of workers ready to walk DAGs NumWorkers int @@ -257,7 +258,7 @@ func New( } ctx, cancel := context.WithCancel(context.Background()) - set, err := newCRDTSet(ctx, store, fullSetNs, opts.Logger, setPutHook, setDeleteHook) + set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook) if err != nil { cancel() return nil, errors.Wrap(err, "error setting up crdt set") @@ -285,8 +286,15 @@ func New( queuedChildren: newCidSafeSet(), } + err = dstore.applyMigrations(ctx) + if err != nil { + cancel() + return nil, err + } + headList, maxHeight, err := dstore.heads.List() if err != nil { + cancel() return nil, err } dstore.logger.Infof( @@ -576,7 +584,7 @@ func (store *Datastore) handleBlock(c cid.Cid) error { // Ignore already processed blocks. // This includes the case when the block is a current // head. 
- isProcessed, err := store.isProcessed(c) + isProcessed, err := store.isProcessed(store.ctx, c) if err != nil { return errors.Wrapf(err, "error checking for known block %s", c) } @@ -733,11 +741,11 @@ func (store *Datastore) processedBlockKey(c cid.Cid) ds.Key { return store.namespace.ChildString(processedBlocksNs).ChildString(dshelp.MultihashToDsKey(c.Hash()).String()) } -func (store *Datastore) isProcessed(c cid.Cid) (bool, error) { +func (store *Datastore) isProcessed(ctx context.Context, c cid.Cid) (bool, error) { return store.store.Has(store.ctx, store.processedBlockKey(c)) } -func (store *Datastore) markProcessed(c cid.Cid) error { +func (store *Datastore) markProcessed(ctx context.Context, c cid.Cid) error { return store.store.Put(store.ctx, store.processedBlockKey(c), nil) } @@ -785,7 +793,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u // Record that we have processed the node so that any other worker // can skip it. - err = store.markProcessed(current) + err = store.markProcessed(store.ctx, current) if err != nil { return nil, errors.Wrapf(err, "error recording %s as processed", current) } @@ -827,7 +835,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u return nil, errors.Wrapf(err, "error checking if %s is head", child) } - isProcessed, err := store.isProcessed(child) + isProcessed, err := store.isProcessed(store.ctx, child) if err != nil { return nil, errors.Wrapf(err, "error checking for known block %s", child) } @@ -959,7 +967,7 @@ func (store *Datastore) repairDAG() error { } cancel() - isProcessed, err := store.isProcessed(cur) + isProcessed, err := store.isProcessed(store.ctx, cur) if err != nil { return errors.Wrapf(err, "error checking for reprocessed block %s", cur) } @@ -1364,7 +1372,9 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett return nil } - nd, delta, err := ng.GetDelta(context.Background(), from) + ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout) + defer cancel() + nd, delta, err := ng.GetDelta(ctx, from) if err != nil { return err } @@ -1385,7 +1395,19 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett cidStr = cidStr[len(cidStr)-4:] line += fmt.Sprintf("%s,", cidStr) } - line += "}:" + line += "}" + + processed, err := store.isProcessed(store.ctx, nd.Cid()) + if err != nil { + return err + } + + if !processed { + line += " Unprocessed!" 
+ } + + line += ":" + fmt.Println(line) for _, l := range nd.Links() { store.printDAGRec(l.Cid, depth+1, ng, set) @@ -1433,7 +1455,9 @@ func (store *Datastore) dotDAGRec(w io.Writer, from cid.Cid, depth uint64, ng *c return nil } - nd, delta, err := ng.GetDelta(context.Background(), from) + ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout) + defer cancel() + nd, delta, err := ng.GetDelta(ctx, from) if err != nil { return err } diff --git a/crdt_test.go b/crdt_test.go index 9fc2305b..aba9dcd1 100644 --- a/crdt_test.go +++ b/crdt_test.go @@ -2,6 +2,7 @@ package crdt import ( "context" + "errors" "fmt" "math/rand" "os" @@ -26,7 +27,7 @@ import ( ) var numReplicas = 15 -var debug = false +var debug = true const ( mapStore = iota @@ -235,7 +236,7 @@ func makeNReplicas(t testing.TB, n int, opts *Options) ([]*Datastore, func()) { name: fmt.Sprintf("r#%d: ", i), l: DefaultOptions().Logger, } - replicaOpts[i].RebroadcastInterval = time.Second * 10 + replicaOpts[i].RebroadcastInterval = time.Second * 1 replicaOpts[i].NumWorkers = 5 replicaOpts[i].DAGSyncerTimeout = time.Second } @@ -817,56 +818,6 @@ func TestCRDTBroadcastBackwardsCompat(t *testing.T) { } } -// There is no easy way to see if the bloom filter is doing its job without -// wiring some sort of metric or benchmarking. Instead, this just hits the -// 3 places relevant to bloom filter: -// * When adding a tomb -// * When checking if something is tombstoned -// * When priming the filter -// -// The main thing is to manually verify (via printlns) that the bloom filter -// is used with the expected key everywhere: i.e. "/mykey" and not -// "mykey". "/something/else" and not "/something". Protip: it has been -// verified and it does that. -func TestBloomingTombstones(t *testing.T) { - ctx := context.Background() - replicas, closeReplicas := makeNReplicas(t, 1, nil) - defer closeReplicas() - - k := ds.NewKey("hola/adios/") - err := replicas[0].Put(ctx, k, []byte("bytes")) - if err != nil { - t.Fatal(err) - } - - err = replicas[0].Delete(ctx, k) - if err != nil { - t.Fatal(err) - } - - err = replicas[0].Put(ctx, k, []byte("bytes")) - if err != nil { - t.Fatal(err) - } - - q := query.Query{ - KeysOnly: false, - } - results, err := replicas[0].Query(ctx, q) - if err != nil { - t.Fatal(err) - } - defer results.Close() - - for r := range results.Next() { - if r.Error != nil { - t.Error(r.Error) - } - } - - replicas[0].set.primeBloomFilter(ctx) -} - func BenchmarkQueryElements(b *testing.B) { ctx := context.Background() replicas, closeReplicas := makeNReplicas(b, 1, nil) @@ -914,3 +865,144 @@ func TestRandomizeInterval(t *testing.T) { prevR = r } } + +func TestCRDTPutPutDelete(t *testing.T) { + replicas, closeReplicas := makeNReplicas(t, 2, nil) + defer closeReplicas() + + ctx := context.Background() + + br0 := replicas[0].broadcaster.(*mockBroadcaster) + br0.dropProb = 101 + + br1 := replicas[1].broadcaster.(*mockBroadcaster) + br1.dropProb = 101 + + k := ds.NewKey("k1") + + // r0 - put put delete + err := replicas[0].Put(ctx, k, []byte("r0-1")) + if err != nil { + t.Fatal(err) + } + err = replicas[0].Put(ctx, k, []byte("r0-2")) + if err != nil { + t.Fatal(err) + } + err = replicas[0].Delete(ctx, k) + if err != nil { + t.Fatal(err) + } + + // r1 - put + err = replicas[1].Put(ctx, k, []byte("r1-1")) + if err != nil { + t.Fatal(err) + } + + br0.dropProb = 0 + br1.dropProb = 0 + + time.Sleep(5 * time.Second) + + r0Res, err := replicas[0].Get(ctx, ds.NewKey("k1")) + if err != nil { + if !errors.Is(err, ds.ErrNotFound) { + 
t.Fatal(err) + } + } + + r1Res, err := replicas[1].Get(ctx, ds.NewKey("k1")) + if err != nil { + t.Fatal(err) + } + closeReplicas() + + if string(r0Res) != string(r1Res) { + fmt.Printf("r0Res: %s\nr1Res: %s\n", string(r0Res), string(r1Res)) + t.Log("r0 dag") + replicas[0].PrintDAG() + + t.Log("r1 dag") + replicas[1].PrintDAG() + + t.Fatal("r0 and r1 should have the same value") + } +} + +func TestMigration0to1(t *testing.T) { + replicas, closeReplicas := makeNReplicas(t, 1, nil) + defer closeReplicas() + replica := replicas[0] + ctx := context.Background() + + nItems := 200 + var keys []ds.Key + // Add nItems + for i := 0; i < nItems; i++ { + k := ds.RandomKey() + keys = append(keys, k) + v := []byte(fmt.Sprintf("%d", i)) + err := replica.Put(ctx, k, v) + if err != nil { + t.Fatal(err) + } + + } + + // Overwrite n/2 items 5 times to have multiple tombstones per key + // later... + for j := 0; j < 5; j++ { + for i := 0; i < nItems/2; i++ { + v := []byte(fmt.Sprintf("%d", i)) + err := replica.Put(ctx, keys[i], v) + if err != nil { + t.Fatal(err) + } + } + } + + // delete keys + for i := 0; i < nItems/2; i++ { + err := replica.Delete(ctx, keys[i]) + if err != nil { + t.Fatal(err) + } + } + + // And write them again + for i := 0; i < nItems/2; i++ { + err := replica.Put(ctx, keys[i], []byte("final value")) + if err != nil { + t.Fatal(err) + } + } + + // And now we manually put the wrong value + for i := 0; i < nItems/2; i++ { + valueK := replica.set.valueKey(keys[i].String()) + err := replica.set.store.Put(ctx, valueK, []byte("wrong value")) + if err != nil { + t.Fatal(err) + } + err = replica.set.setPriority(ctx, replica.set.store, keys[i].String(), 1) + if err != nil { + t.Fatal(err) + } + } + + err := replica.migrate0to1(ctx) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < nItems/2; i++ { + v, err := replica.Get(ctx, keys[i]) + if err != nil { + t.Fatal(err) + } + if string(v) != "final value" { + t.Fatalf("value for elem %d should be final value: %s", i, string(v)) + } + } +}
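
Note for reviewers: the patch wires applyMigrations into New() and gates it on the new versionKey constant ("crdt_version"), and the tests exercise migrate0to1, but the migration implementation itself is not part of the hunks shown above. Below is a minimal, self-contained sketch of the version-gating pattern the commit message describes, written against the public go-datastore API. The helper names (readVersion, writeVersion, the migration type) and the single-byte version encoding are illustrative assumptions, not the library's actual code; the 0->1 migration body is a placeholder for the tombstone repair loop described in the message.

package main

import (
	"context"
	"errors"
	"fmt"

	ds "github.com/ipfs/go-datastore"
)

// versionKey mirrors the constant added in the patch ("crdt_version").
var versionKey = ds.NewKey("/crdt_version")

// A migration moves the datastore from version N to N+1. In the patch,
// migrate0to1 would be the first entry: it revisits tombstoned keys, picks
// the best surviving value and rewrites value/priority entries.
type migration func(ctx context.Context, store ds.Datastore) error

// readVersion returns 0 when no version has ever been written (a fresh or
// pre-versioning datastore). The single-byte encoding is an assumption.
func readVersion(ctx context.Context, store ds.Datastore) (uint8, error) {
	v, err := store.Get(ctx, versionKey)
	if errors.Is(err, ds.ErrNotFound) {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	if len(v) != 1 {
		return 0, fmt.Errorf("unexpected version value length %d", len(v))
	}
	return v[0], nil
}

func writeVersion(ctx context.Context, store ds.Datastore, v uint8) error {
	return store.Put(ctx, versionKey, []byte{v})
}

// applyMigrations runs every pending migration exactly once, recording the
// new version after each step so an interrupted run resumes where it left off.
func applyMigrations(ctx context.Context, store ds.Datastore, migrations []migration) error {
	cur, err := readVersion(ctx, store)
	if err != nil {
		return err
	}
	for i := int(cur); i < len(migrations); i++ {
		if err := migrations[i](ctx, store); err != nil {
			return fmt.Errorf("migration %d->%d: %w", i, i+1, err)
		}
		if err := writeVersion(ctx, store, uint8(i+1)); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	ctx := context.Background()
	store := ds.NewMapDatastore()

	// Placeholder 0->1 migration; the real one repairs value/priority
	// entries under tombstoned keys, as exercised by TestMigration0to1.
	fix := func(ctx context.Context, store ds.Datastore) error {
		fmt.Println("running migration 0->1")
		return nil
	}

	if err := applyMigrations(ctx, store, []migration{fix}); err != nil {
		panic(err)
	}
	// A second call is a no-op: version=1 is already recorded.
	if err := applyMigrations(ctx, store, []migration{fix}); err != nil {
		panic(err)
	}
}

Recording the version after each completed step, rather than once at the end, keeps the process resumable: a crash mid-migration simply re-runs the unfinished step on the next start, which matches the commit message's intent that future fixups can be added as further numbered migrations.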