Skip to content

Commit

Permalink
Introduce database versioning and migrations
Browse files Browse the repository at this point in the history
The fix for #238 does not mean that everything is fine. We will have databases
which have the wrong value/priority sets written and this would only fix
itself on new writes or deletes to the same key.

So we are unfortunately forced to manually fix it on start. For this we
introduce a data migration.

During a fresh start, we will then find all the keys affected by tombstones
and loop them, finding the best value for them (the correct one) and fixing
them. Once done we record that we are on version=1 and don't run this again.

Future fuckups can be fixed with other migrations.
  • Loading branch information
hsanjuan committed Nov 11, 2024
1 parent 10a7dca commit ca44680
Show file tree
Hide file tree
Showing 3 changed files with 345 additions and 70 deletions.
50 changes: 37 additions & 13 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
setNs = "s" // set
processedBlocksNs = "b" // blocks
dirtyBitKey = "d" // dirty
versionKey = "crdt_version"
)

// Common errors.
Expand Down Expand Up @@ -86,9 +87,9 @@ type Options struct {
// element is successfully removed from the datastore (either by a
// local or remote update). Unordered and concurrent updates may
// result in the DeleteHook being triggered even though the element is
// still present in the datastore because it was re-added. If that is
// relevant, use Has() to check if the removed element is still part
// of the datastore.
// still present in the datastore because it was re-added or not fully
// tombstoned. If that is relevant, use Has() to check if the removed
// element is still part of the datastore.
DeleteHook func(k ds.Key)
// NumWorkers specifies the number of workers ready to walk DAGs
NumWorkers int
Expand Down Expand Up @@ -257,7 +258,7 @@ func New(
}

ctx, cancel := context.WithCancel(context.Background())
set, err := newCRDTSet(ctx, store, fullSetNs, opts.Logger, setPutHook, setDeleteHook)
set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error setting up crdt set")
Expand Down Expand Up @@ -285,8 +286,15 @@ func New(
queuedChildren: newCidSafeSet(),
}

err = dstore.applyMigrations(ctx)
if err != nil {
cancel()
return nil, err
}

headList, maxHeight, err := dstore.heads.List()
if err != nil {
cancel()
return nil, err
}
dstore.logger.Infof(
Expand Down Expand Up @@ -576,7 +584,7 @@ func (store *Datastore) handleBlock(c cid.Cid) error {
// Ignore already processed blocks.
// This includes the case when the block is a current
// head.
isProcessed, err := store.isProcessed(c)
isProcessed, err := store.isProcessed(store.ctx, c)
if err != nil {
return errors.Wrapf(err, "error checking for known block %s", c)
}
Expand Down Expand Up @@ -733,11 +741,11 @@ func (store *Datastore) processedBlockKey(c cid.Cid) ds.Key {
return store.namespace.ChildString(processedBlocksNs).ChildString(dshelp.MultihashToDsKey(c.Hash()).String())
}

func (store *Datastore) isProcessed(c cid.Cid) (bool, error) {
func (store *Datastore) isProcessed(ctx context.Context, c cid.Cid) (bool, error) {
return store.store.Has(store.ctx, store.processedBlockKey(c))
}

func (store *Datastore) markProcessed(c cid.Cid) error {
func (store *Datastore) markProcessed(ctx context.Context, c cid.Cid) error {
return store.store.Put(store.ctx, store.processedBlockKey(c), nil)
}

Expand Down Expand Up @@ -785,7 +793,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u

// Record that we have processed the node so that any other worker
// can skip it.
err = store.markProcessed(current)
err = store.markProcessed(store.ctx, current)
if err != nil {
return nil, errors.Wrapf(err, "error recording %s as processed", current)
}
Expand Down Expand Up @@ -827,7 +835,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u
return nil, errors.Wrapf(err, "error checking if %s is head", child)
}

isProcessed, err := store.isProcessed(child)
isProcessed, err := store.isProcessed(store.ctx, child)
if err != nil {
return nil, errors.Wrapf(err, "error checking for known block %s", child)
}
Expand Down Expand Up @@ -959,7 +967,7 @@ func (store *Datastore) repairDAG() error {
}
cancel()

isProcessed, err := store.isProcessed(cur)
isProcessed, err := store.isProcessed(store.ctx, cur)
if err != nil {
return errors.Wrapf(err, "error checking for reprocessed block %s", cur)
}
Expand Down Expand Up @@ -1364,7 +1372,9 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
return nil
}

nd, delta, err := ng.GetDelta(context.Background(), from)
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
defer cancel()
nd, delta, err := ng.GetDelta(ctx, from)
if err != nil {
return err
}
Expand All @@ -1385,7 +1395,19 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
cidStr = cidStr[len(cidStr)-4:]
line += fmt.Sprintf("%s,", cidStr)
}
line += "}:"
line += "}"

processed, err := store.isProcessed(store.ctx, nd.Cid())
if err != nil {
return err
}

if !processed {
line += " Unprocessed!"
}

line += ":"

fmt.Println(line)
for _, l := range nd.Links() {
store.printDAGRec(l.Cid, depth+1, ng, set)
Expand Down Expand Up @@ -1433,7 +1455,9 @@ func (store *Datastore) dotDAGRec(w io.Writer, from cid.Cid, depth uint64, ng *c
return nil
}

nd, delta, err := ng.GetDelta(context.Background(), from)
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
defer cancel()
nd, delta, err := ng.GetDelta(ctx, from)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit ca44680

Please sign in to comment.