Skip to content

Commit

Permalink
Introduce database versioning and migrations
Browse files Browse the repository at this point in the history
The fix for #238 does not mean that everything is fine. Existing databases may
still have wrong value/priority sets written, and these would only fix
themselves on new writes or deletes to the same key.

So we are unfortunately forced to manually fix it on start. For this we
introduce a data migration.

During a fresh start, we will then find all the keys affected by tombstones
and loop over them, finding the best value for each (the correct one) and
fixing it. Once done, we record that we are on version=1 and don't run this again.

Future fuckups can be fixed with other migrations.
  • Loading branch information
hsanjuan committed Nov 8, 2024
1 parent 6d2f62f commit a3f6a0b
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 65 deletions.
50 changes: 37 additions & 13 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
setNs = "s" // set
processedBlocksNs = "b" // blocks
dirtyBitKey = "d" // dirty
versionKey = "crdt_version"
)

// Common errors.
Expand Down Expand Up @@ -86,9 +87,9 @@ type Options struct {
// element is successfully removed from the datastore (either by a
// local or remote update). Unordered and concurrent updates may
// result in the DeleteHook being triggered even though the element is
// still present in the datastore because it was re-added. If that is
// relevant, use Has() to check if the removed element is still part
// of the datastore.
// still present in the datastore because it was re-added or not fully
// tombstoned. If that is relevant, use Has() to check if the removed
// element is still part of the datastore.
DeleteHook func(k ds.Key)
// NumWorkers specifies the number of workers ready to walk DAGs
NumWorkers int
Expand Down Expand Up @@ -257,7 +258,7 @@ func New(
}

ctx, cancel := context.WithCancel(context.Background())
set, err := newCRDTSet(ctx, store, fullSetNs, opts.Logger, setPutHook, setDeleteHook)
set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error setting up crdt set")
Expand Down Expand Up @@ -285,8 +286,15 @@ func New(
queuedChildren: newCidSafeSet(),
}

err = dstore.applyMigrations(ctx)

Check failure on line 289 in crdt.go

View workflow job for this annotation

GitHub Actions / suite

dstore.applyMigrations undefined (type *Datastore has no field or method applyMigrations)

Check failure on line 289 in crdt.go

View workflow job for this annotation

GitHub Actions / build

dstore.applyMigrations undefined (type *Datastore has no field or method applyMigrations)

Check failure on line 289 in crdt.go

View workflow job for this annotation

GitHub Actions / suite

dstore.applyMigrations undefined (type *Datastore has no field or method applyMigrations)

Check failure on line 289 in crdt.go

View workflow job for this annotation

GitHub Actions / build

dstore.applyMigrations undefined (type *Datastore has no field or method applyMigrations)
if err != nil {
cancel()
return nil, err
}

headList, maxHeight, err := dstore.heads.List()
if err != nil {
cancel()
return nil, err
}
dstore.logger.Infof(
Expand Down Expand Up @@ -576,7 +584,7 @@ func (store *Datastore) handleBlock(c cid.Cid) error {
// Ignore already processed blocks.
// This includes the case when the block is a current
// head.
isProcessed, err := store.isProcessed(c)
isProcessed, err := store.isProcessed(store.ctx, c)
if err != nil {
return errors.Wrapf(err, "error checking for known block %s", c)
}
Expand Down Expand Up @@ -733,11 +741,11 @@ func (store *Datastore) processedBlockKey(c cid.Cid) ds.Key {
return store.namespace.ChildString(processedBlocksNs).ChildString(dshelp.MultihashToDsKey(c.Hash()).String())
}

func (store *Datastore) isProcessed(c cid.Cid) (bool, error) {
func (store *Datastore) isProcessed(ctx context.Context, c cid.Cid) (bool, error) {
return store.store.Has(store.ctx, store.processedBlockKey(c))
}

func (store *Datastore) markProcessed(c cid.Cid) error {
func (store *Datastore) markProcessed(ctx context.Context, c cid.Cid) error {
return store.store.Put(store.ctx, store.processedBlockKey(c), nil)
}

Expand Down Expand Up @@ -785,7 +793,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u

// Record that we have processed the node so that any other worker
// can skip it.
err = store.markProcessed(current)
err = store.markProcessed(store.ctx, current)
if err != nil {
return nil, errors.Wrapf(err, "error recording %s as processed", current)
}
Expand Down Expand Up @@ -827,7 +835,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u
return nil, errors.Wrapf(err, "error checking if %s is head", child)
}

isProcessed, err := store.isProcessed(child)
isProcessed, err := store.isProcessed(store.ctx, child)
if err != nil {
return nil, errors.Wrapf(err, "error checking for known block %s", child)
}
Expand Down Expand Up @@ -959,7 +967,7 @@ func (store *Datastore) repairDAG() error {
}
cancel()

isProcessed, err := store.isProcessed(cur)
isProcessed, err := store.isProcessed(store.ctx, cur)
if err != nil {
return errors.Wrapf(err, "error checking for reprocessed block %s", cur)
}
Expand Down Expand Up @@ -1364,7 +1372,9 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
return nil
}

nd, delta, err := ng.GetDelta(context.Background(), from)
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
defer cancel()
nd, delta, err := ng.GetDelta(ctx, from)
if err != nil {
return err
}
Expand All @@ -1385,7 +1395,19 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
cidStr = cidStr[len(cidStr)-4:]
line += fmt.Sprintf("%s,", cidStr)
}
line += "}:"
line += "}"

processed, err := store.isProcessed(store.ctx, nd.Cid())
if err != nil {
return err
}

if !processed {
line += " Unprocessed!"
}

line += ":"

fmt.Println(line)
for _, l := range nd.Links() {
store.printDAGRec(l.Cid, depth+1, ng, set)
Expand Down Expand Up @@ -1433,7 +1455,9 @@ func (store *Datastore) dotDAGRec(w io.Writer, from cid.Cid, depth uint64, ng *c
return nil
}

nd, delta, err := ng.GetDelta(context.Background(), from)
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
defer cancel()
nd, delta, err := ng.GetDelta(ctx, from)
if err != nil {
return err
}
Expand Down
196 changes: 144 additions & 52 deletions crdt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package crdt

import (
"context"
"errors"
"fmt"
"math/rand"
"os"
Expand All @@ -26,7 +27,7 @@ import (
)

var numReplicas = 15
var debug = false
var debug = true

const (
mapStore = iota
Expand Down Expand Up @@ -235,7 +236,7 @@ func makeNReplicas(t testing.TB, n int, opts *Options) ([]*Datastore, func()) {
name: fmt.Sprintf("r#%d: ", i),
l: DefaultOptions().Logger,
}
replicaOpts[i].RebroadcastInterval = time.Second * 10
replicaOpts[i].RebroadcastInterval = time.Second * 1
replicaOpts[i].NumWorkers = 5
replicaOpts[i].DAGSyncerTimeout = time.Second
}
Expand Down Expand Up @@ -817,56 +818,6 @@ func TestCRDTBroadcastBackwardsCompat(t *testing.T) {
}
}

// TestBloomingTombstones is a smoke test for the bloom filter code paths.
//
// Without a metric or a benchmark there is no easy way to observe whether
// the bloom filter is doing its job, so this test only exercises the three
// places relevant to it:
//   - adding a tomb
//   - checking if something is tombstoned
//   - priming the filter
//
// The point is to allow manual verification (via printlns) that the bloom
// filter is used with the expected key everywhere: i.e. "/mykey" and not
// "mykey", "/something/else" and not "/something". Per the original author,
// this has been verified.
func TestBloomingTombstones(t *testing.T) {
	ctx := context.Background()
	replicas, closeReplicas := makeNReplicas(t, 1, nil)
	defer closeReplicas()

	replica := replicas[0]
	key := ds.NewKey("hola/adios/")

	// Put, delete and re-put the same key.
	if err := replica.Put(ctx, key, []byte("bytes")); err != nil {
		t.Fatal(err)
	}
	if err := replica.Delete(ctx, key); err != nil {
		t.Fatal(err)
	}
	if err := replica.Put(ctx, key, []byte("bytes")); err != nil {
		t.Fatal(err)
	}

	// Run a full query and drain it, reporting any per-entry error.
	results, err := replica.Query(ctx, query.Query{KeysOnly: false})
	if err != nil {
		t.Fatal(err)
	}
	defer results.Close()

	for entry := range results.Next() {
		if entry.Error != nil {
			t.Error(entry.Error)
		}
	}

	// Finally, prime the filter.
	replica.set.primeBloomFilter(ctx)
}

func BenchmarkQueryElements(b *testing.B) {
ctx := context.Background()
replicas, closeReplicas := makeNReplicas(b, 1, nil)
Expand Down Expand Up @@ -914,3 +865,144 @@ func TestRandomizeInterval(t *testing.T) {
prevR = r
}
}

// TestCRDTPutPutDelete checks that two replicas end up with the same value
// for a key after diverging: replica 0 performs put, put, delete on the key
// while replica 1 performs a single put on it. Broadcasting is then
// re-enabled and, after a pause, both replicas are read and compared.
func TestCRDTPutPutDelete(t *testing.T) {
	// The replicas are closed exactly once, by this deferred call, so that
	// the DAG dumps on the failure path below run before they are closed.
	replicas, closeReplicas := makeNReplicas(t, 2, nil)
	defer closeReplicas()

	ctx := context.Background()

	// NOTE(review): a dropProb above 100 presumably makes the mock
	// broadcaster drop every message — confirm against mockBroadcaster.
	br0 := replicas[0].broadcaster.(*mockBroadcaster)
	br0.dropProb = 101

	br1 := replicas[1].broadcaster.(*mockBroadcaster)
	br1.dropProb = 101

	k := ds.NewKey("k1")

	// r0 - put put delete
	if err := replicas[0].Put(ctx, k, []byte("r0-1")); err != nil {
		t.Fatal(err)
	}
	if err := replicas[0].Put(ctx, k, []byte("r0-2")); err != nil {
		t.Fatal(err)
	}
	if err := replicas[0].Delete(ctx, k); err != nil {
		t.Fatal(err)
	}

	// r1 - put
	if err := replicas[1].Put(ctx, k, []byte("r1-1")); err != nil {
		t.Fatal(err)
	}

	br0.dropProb = 0
	br1.dropProb = 0

	// Leave time for the replicas to exchange their updates.
	time.Sleep(5 * time.Second)

	// A missing key on r0 is tolerated here; the comparison below decides
	// whether the final state is acceptable.
	r0Res, err := replicas[0].Get(ctx, k)
	if err != nil && !errors.Is(err, ds.ErrNotFound) {
		t.Fatal(err)
	}

	r1Res, err := replicas[1].Get(ctx, k)
	if err != nil {
		t.Fatal(err)
	}

	if string(r0Res) != string(r1Res) {
		t.Logf("r0Res: %s\nr1Res: %s", string(r0Res), string(r1Res))

		t.Log("r0 dag")
		replicas[0].PrintDAG()

		t.Log("r1 dag")
		replicas[1].PrintDAG()

		t.Fatal("r0 and r1 should have the same value")
	}
}

func TestMigration0to1(t *testing.T) {
replicas, closeReplicas := makeNReplicas(t, 1, nil)
defer closeReplicas()
replica := replicas[0]
ctx := context.Background()

nItems := 200
var keys []ds.Key
// Add nItems
for i := 0; i < nItems; i++ {
k := ds.RandomKey()
keys = append(keys, k)
v := []byte(fmt.Sprintf("%d", i))
err := replica.Put(ctx, k, v)
if err != nil {
t.Fatal(err)
}

}

// Overwrite n/2 items 5 times to have multiple tombstones per key
// later...
for j := 0; j < 5; j++ {
for i := 0; i < nItems/2; i++ {
v := []byte(fmt.Sprintf("%d", i))
err := replica.Put(ctx, keys[i], v)
if err != nil {
t.Fatal(err)
}
}
}

// delete keys
for i := 0; i < nItems/2; i++ {
err := replica.Delete(ctx, keys[i])
if err != nil {
t.Fatal(err)
}
}

// And write them again
for i := 0; i < nItems/2; i++ {
err := replica.Put(ctx, keys[i], []byte("final value"))
if err != nil {
t.Fatal(err)
}
}

// And now we manually put the wrong value
for i := 0; i < nItems/2; i++ {
valueK := replica.set.valueKey(keys[i].String())
err := replica.set.store.Put(ctx, valueK, []byte("wrong value"))
if err != nil {
t.Fatal(err)
}
err = replica.set.setPriority(ctx, replica.set.store, keys[i].String(), 1)
if err != nil {
t.Fatal(err)
}
}

err := replica.migrate0to1(ctx)

Check failure on line 994 in crdt_test.go

View workflow job for this annotation

GitHub Actions / suite

replica.migrate0to1 undefined (type *Datastore has no field or method migrate0to1)

Check failure on line 994 in crdt_test.go

View workflow job for this annotation

GitHub Actions / suite

replica.migrate0to1 undefined (type *Datastore has no field or method migrate0to1)
if err != nil {
t.Fatal(err)
}

for i := 0; i < nItems/2; i++ {
v, err := replica.Get(ctx, keys[i])
if err != nil {
t.Fatal(err)
}
if string(v) != "final value" {
t.Fatalf("value for elem %d should be final value: %s", i, string(v))
}
}
}

0 comments on commit a3f6a0b

Please sign in to comment.