Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #238: Non-converging operations with unordered deletes. Migration to fix existing datastores. #241

Merged
merged 2 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 37 additions & 13 deletions crdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
setNs = "s" // set
processedBlocksNs = "b" // blocks
dirtyBitKey = "d" // dirty
versionKey = "crdt_version"
)

// Common errors.
Expand Down Expand Up @@ -86,9 +87,9 @@ type Options struct {
// element is successfully removed from the datastore (either by a
// local or remote update). Unordered and concurrent updates may
// result in the DeleteHook being triggered even though the element is
// still present in the datastore because it was re-added. If that is
// relevant, use Has() to check if the removed element is still part
// of the datastore.
// still present in the datastore because it was re-added or not fully
// tombstoned. If that is relevant, use Has() to check if the removed
// element is still part of the datastore.
DeleteHook func(k ds.Key)
// NumWorkers specifies the number of workers ready to walk DAGs
NumWorkers int
Expand Down Expand Up @@ -257,7 +258,7 @@ func New(
}

ctx, cancel := context.WithCancel(context.Background())
set, err := newCRDTSet(ctx, store, fullSetNs, opts.Logger, setPutHook, setDeleteHook)
set, err := newCRDTSet(ctx, store, fullSetNs, dagSyncer, opts.Logger, setPutHook, setDeleteHook)
if err != nil {
cancel()
return nil, errors.Wrap(err, "error setting up crdt set")
Expand Down Expand Up @@ -285,8 +286,15 @@ func New(
queuedChildren: newCidSafeSet(),
}

err = dstore.applyMigrations(ctx)
if err != nil {
cancel()
return nil, err
}

headList, maxHeight, err := dstore.heads.List()
if err != nil {
cancel()
return nil, err
}
dstore.logger.Infof(
Expand Down Expand Up @@ -576,7 +584,7 @@ func (store *Datastore) handleBlock(c cid.Cid) error {
// Ignore already processed blocks.
// This includes the case when the block is a current
// head.
isProcessed, err := store.isProcessed(c)
isProcessed, err := store.isProcessed(store.ctx, c)
if err != nil {
return errors.Wrapf(err, "error checking for known block %s", c)
}
Expand Down Expand Up @@ -733,11 +741,11 @@ func (store *Datastore) processedBlockKey(c cid.Cid) ds.Key {
return store.namespace.ChildString(processedBlocksNs).ChildString(dshelp.MultihashToDsKey(c.Hash()).String())
}

func (store *Datastore) isProcessed(c cid.Cid) (bool, error) {
func (store *Datastore) isProcessed(ctx context.Context, c cid.Cid) (bool, error) {
return store.store.Has(store.ctx, store.processedBlockKey(c))
}

func (store *Datastore) markProcessed(c cid.Cid) error {
func (store *Datastore) markProcessed(ctx context.Context, c cid.Cid) error {
return store.store.Put(store.ctx, store.processedBlockKey(c), nil)
}

Expand Down Expand Up @@ -785,7 +793,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u

// Record that we have processed the node so that any other worker
// can skip it.
err = store.markProcessed(current)
err = store.markProcessed(store.ctx, current)
if err != nil {
return nil, errors.Wrapf(err, "error recording %s as processed", current)
}
Expand Down Expand Up @@ -827,7 +835,7 @@ func (store *Datastore) processNode(ng *crdtNodeGetter, root cid.Cid, rootPrio u
return nil, errors.Wrapf(err, "error checking if %s is head", child)
}

isProcessed, err := store.isProcessed(child)
isProcessed, err := store.isProcessed(store.ctx, child)
if err != nil {
return nil, errors.Wrapf(err, "error checking for known block %s", child)
}
Expand Down Expand Up @@ -959,7 +967,7 @@ func (store *Datastore) repairDAG() error {
}
cancel()

isProcessed, err := store.isProcessed(cur)
isProcessed, err := store.isProcessed(store.ctx, cur)
if err != nil {
return errors.Wrapf(err, "error checking for reprocessed block %s", cur)
}
Expand Down Expand Up @@ -1364,7 +1372,9 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
return nil
}

nd, delta, err := ng.GetDelta(context.Background(), from)
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
defer cancel()
nd, delta, err := ng.GetDelta(ctx, from)
if err != nil {
return err
}
Expand All @@ -1385,7 +1395,19 @@ func (store *Datastore) printDAGRec(from cid.Cid, depth uint64, ng *crdtNodeGett
cidStr = cidStr[len(cidStr)-4:]
line += fmt.Sprintf("%s,", cidStr)
}
line += "}:"
line += "}"

processed, err := store.isProcessed(store.ctx, nd.Cid())
if err != nil {
return err
}

if !processed {
line += " Unprocessed!"
}

line += ":"

fmt.Println(line)
for _, l := range nd.Links() {
store.printDAGRec(l.Cid, depth+1, ng, set)
Expand Down Expand Up @@ -1433,7 +1455,9 @@ func (store *Datastore) dotDAGRec(w io.Writer, from cid.Cid, depth uint64, ng *c
return nil
}

nd, delta, err := ng.GetDelta(context.Background(), from)
ctx, cancel := context.WithTimeout(store.ctx, store.opts.DAGSyncerTimeout)
defer cancel()
nd, delta, err := ng.GetDelta(ctx, from)
if err != nil {
return err
}
Expand Down
Loading