From 3e390a00db1c6cc5559a7ba79391dece86aa5941 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Fri, 8 Nov 2024 14:51:15 +0100 Subject: [PATCH] Fix #238: Non-converging operations on unordered deletes. The problem: a replica may tombstone a value and then receive a new write for that value that had happened BEFORE the tombstone from a different replica. The final value/priority pair should be set to the value/priority that was not tombstoned. However we did not do that. We knew the key was not tombstoned, but the value returned corresponded to an update that was tombstoned. The solution: every time a tombstone arrives, we need to look for the "best value/priority", that is, we need to make sure that among all the key values that we have, we set the best one that was not tombstoned according to the CRDT rules (highest priority or lexicographical sorting when equal). The consecuences: this makes tombstoning a more expensive operation but it also allows us to remove value/priority altogether when all the values have been tombstoned. As such, we don't need to check if a value has been tombstoned anymore when doing Gets/List, before returning the element. That saves lookups and that also means we no longer need to bloom filter, which was supposed to speed up this operation. In general, datastore which mostly add data will be better afterwards. --- set.go | 336 +++++++++++++++++++++++++++------------------------------ 1 file changed, 161 insertions(+), 175 deletions(-) diff --git a/set.go b/set.go index d0fe5d36..5970e9ea 100644 --- a/set.go +++ b/set.go @@ -7,10 +7,11 @@ import ( "errors" "strings" "sync" - "time" - bloom "github.com/ipfs/bbloom" + dshelp "github.com/ipfs/boxo/datastore/dshelp" + cid "github.com/ipfs/go-cid" pb "github.com/ipfs/go-ds-crdt/pb" + ipld "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" goprocess "github.com/jbenet/goprocess" multierr "go.uber.org/multierr" @@ -35,6 +36,7 @@ var ( // sorting their unique IDs alphabetically. type set struct { store ds.Datastore + dagService ipld.DAGService namespace ds.Key putHook func(key string, v []byte) deleteHook func(key string) @@ -43,84 +45,28 @@ type set struct { // Avoid merging two things at the same time since // we read-write value-priorities in a non-atomic way. putElemsMux sync.Mutex - - tombstonesBloom *bloom.Bloom } -// Tombstones bloom filter options. -// We have optimized the defaults for speed: -// - commit 30 MiB of memory to the filter -// - only hash twice -// - False positive probabily is 1 in 10000 when working with 1M items in the filter. -// See https://hur.st/bloomfilter/?n=&p=0.0001&m=30MiB&k=2 -var ( - TombstonesBloomFilterSize float64 = 30 * 1024 * 1024 * 8 // 30 MiB - TombstonesBloomFilterHashes float64 = 2 -) - func newCRDTSet( ctx context.Context, d ds.Datastore, namespace ds.Key, + dagService ipld.DAGService, logger logging.StandardLogger, putHook func(key string, v []byte), deleteHook func(key string), ) (*set, error) { - blm, err := bloom.New( - float64(TombstonesBloomFilterSize), - float64(TombstonesBloomFilterHashes), - ) - if err != nil { - return nil, err - } - set := &set{ - namespace: namespace, - store: d, - logger: logger, - putHook: putHook, - deleteHook: deleteHook, - tombstonesBloom: blm, + namespace: namespace, + store: d, + dagService: dagService, + logger: logger, + putHook: putHook, + deleteHook: deleteHook, } - return set, set.primeBloomFilter(ctx) -} - -// We need to add all keys in tombstones to the filter. -func (s *set) primeBloomFilter(ctx context.Context) error { - tombsPrefix := s.keyPrefix(tombsNs) // /ns/tombs - q := query.Query{ - Prefix: tombsPrefix.String(), - KeysOnly: true, - } - - t := time.Now() - nTombs := 0 - - results, err := s.store.Query(ctx, q) - if err != nil { - return err - } - defer results.Close() - - for r := range results.Next() { - if r.Error != nil { - return r.Error - } - - // Switch from /ns/tombs/key/block to /key/block - key := ds.NewKey( - strings.TrimPrefix(r.Key, tombsPrefix.String())) - // Switch from /key/block to key - key = key.Parent() - // fmt.Println("Bloom filter priming with:", key) - // put the key in the bloom cache - s.tombstonesBloom.Add(key.Bytes()) - nTombs++ - } - s.logger.Infof("Tombstones have bloomed: %d tombs. Took: %s", nTombs, time.Since(t)) - return nil + return set, nil } // Add returns a new delta-set adding the given key/value. @@ -188,34 +134,15 @@ func (s *set) Element(ctx context.Context, key string) ([]byte, error) { // We can only GET an element if it's part of the Set (in // "elements" and not in "tombstones"). - // As an optimization: - // * If the key has a value in the store it means: - // -> It occurs at least once in "elems" - // -> It may or not be tombstoned - // * If the key does not have a value in the store: - // -> It was either never added + // * If the key has a value in the store it means that it has been + // written and is alive. putTombs will delete the value if all elems + // are tombstoned, or leave the best one. + valueK := s.valueKey(key) value, err := s.store.Get(ctx, valueK) if err != nil { // not found is fine, we just return it return value, err } - - // We have an existing element. Check if tombstoned. - inSet, err := s.checkNotTombstoned(ctx, key) - if err != nil { - return nil, err - } - - if !inSet { - // attempt to remove so next time we do not have to do this - // lookup. - // In concurrency, this may delete a key that was just written - // and should not be deleted. - // s.store.Delete(valueK) - - return nil, ds.ErrNotFound - } - // otherwise return the value return value, nil } @@ -326,15 +253,11 @@ func (s *set) Elements(ctx context.Context, q query.Query) (query.Results, error entry.Value = r.Value entry.Size = r.Size entry.Expiration = r.Expiration - has, err := s.checkNotTombstoned(ctx, key) - if err != nil { - sendResult(b, p, query.Result{Error: err}) - return - } - if !has { - continue - } + // The fact that /v is set means it is not tombstoned, + // as tombstoning removes /v and /p or sets them to + // the best value. + if q.KeysOnly { entry.Size = -1 entry.Value = nil @@ -354,70 +277,7 @@ func (s *set) InSet(ctx context.Context, key string) (bool, error) { // Optimization: if we do not have a value // this key was never added. valueK := s.valueKey(key) - if ok, err := s.store.Has(ctx, valueK); !ok { - return false, err - } - - // Otherwise, do the long check. - return s.checkNotTombstoned(ctx, key) -} - -// Returns true when we have a key/block combination in the -// elements set that has not been tombstoned. -// -// Warning: In order to do a quick bloomfilter check, this assumes the key is -// in elems already. Any code calling this function already has verified -// that there is a value-key entry for the key, thus there must necessarily -// be a non-empty set of key/block in elems. -// -// Put otherwise: this code will misbehave when called directly to check if an -// element exists. See Element()/InSet() etc.. -func (s *set) checkNotTombstoned(ctx context.Context, key string) (bool, error) { - // Bloom filter check: has this key been potentially tombstoned? - - // fmt.Println("Bloom filter check:", key) - if !s.tombstonesBloom.HasTS([]byte(key)) { - return true, nil - } - - // /namespace/elems/ - prefix := s.elemsPrefix(key) - q := query.Query{ - Prefix: prefix.String(), - KeysOnly: true, - } - - results, err := s.store.Query(ctx, q) - if err != nil { - return false, err - } - defer results.Close() - - // range all the /namespace/elems//. - for r := range results.Next() { - if r.Error != nil { - return false, err - } - - id := strings.TrimPrefix(r.Key, prefix.String()) - if !ds.RawKey(id).IsTopLevel() { - // our prefix matches blocks from other keys i.e. our - // prefix is "hello" and we have a different key like - // "hello/bye" so we have a block id like - // "bye/". If we got the right key, then the id - // should be the block id only. - continue - } - // if not tombstoned, we have it - inTomb, err := s.inTombsKeyID(ctx, key, id) - if err != nil { - return false, err - } - if !inTomb { - return true, nil - } - } - return false, nil + return s.store.Has(ctx, valueK) } // /namespace/ @@ -476,8 +336,7 @@ func (s *set) setPriority(ctx context.Context, writeStore ds.Write, key string, // sets a value if priority is higher. When equal, it sets if the // value is lexicographically higher than the current value. func (s *set) setValue(ctx context.Context, writeStore ds.Write, key, id string, value []byte, prio uint64) error { - // If this key was tombstoned already, do not store/update the value - // at all. + // If this key was tombstoned already, do not store/update the value. deleted, err := s.inTombsKeyID(ctx, key, id) if err != nil || deleted { return err @@ -518,6 +377,113 @@ func (s *set) setValue(ctx context.Context, writeStore ds.Write, key, id string, return nil } +// findBestValue looks for all entries for the given key, figures out their +// priority from their delta (skipping the blocks by the given pendingTombIDs) +// and returns the value with the highest priority that is not tombstoned nor +// about to be tombstoned. +func (s *set) findBestValue(ctx context.Context, key string, pendingTombIDs []string) ([]byte, uint64, error) { + // /namespace/elems/ + prefix := s.elemsPrefix(key) + q := query.Query{ + Prefix: prefix.String(), + KeysOnly: true, + } + + results, err := s.store.Query(ctx, q) + if err != nil { + return nil, 0, err + } + defer results.Close() + + var bestValue []byte + var bestPriority uint64 + var deltaCid cid.Cid + ng := crdtNodeGetter{NodeGetter: s.dagService} + + // range all the /namespace/elems//. +NEXT: + for r := range results.Next() { + if r.Error != nil { + return nil, 0, err + } + + id := strings.TrimPrefix(r.Key, prefix.String()) + if !ds.RawKey(id).IsTopLevel() { + // our prefix matches blocks from other keys i.e. our + // prefix is "hello" and we have a different key like + // "hello/bye" so we have a block id like + // "bye/". If we got the right key, then the id + // should be the block id only. + continue + } + // if block is one of the pending tombIDs, continue + for _, tombID := range pendingTombIDs { + if tombID == id { + continue NEXT + } + } + + // if tombstoned, continue + inTomb, err := s.inTombsKeyID(ctx, key, id) + if err != nil { + return nil, 0, err + } + if inTomb { + continue + } + + // get the block + mhash, err := dshelp.DsKeyToMultihash(ds.NewKey(id)) + if err != nil { + return nil, 0, err + } + deltaCid = cid.NewCidV1(cid.DagProtobuf, mhash) + _, delta, err := ng.GetDelta(ctx, deltaCid) + if err != nil { + return nil, 0, err + } + + // discard this delta. + if delta.Priority < bestPriority { + continue + } + + // When equal priority, choose the greatest among values in + // the delta and current. When higher priority, choose the + // greatest only among those in the delta. + var greatestValueInDelta []byte + for _, elem := range delta.GetElements() { + if elem.GetKey() != key { + continue + } + v := elem.GetValue() + if bytes.Compare(greatestValueInDelta, v) < 0 { + greatestValueInDelta = v + } + } + + if delta.Priority > bestPriority { + bestValue = greatestValueInDelta + bestPriority = delta.Priority + continue + } + + // equal priority + if bytes.Compare(bestValue, greatestValueInDelta) < 0 { + bestValue = greatestValueInDelta + } + } + + // if deltaCid.Defined() { + // cidStr := deltaCid.String() + // cidStr = cidStr[len(cidStr)-4:] + // fmt.Printf("best value: (%s, %d) on block: %s\n", bestValue, bestPriority, cidStr) + // } else { + // fmt.Printf("best value: NONE\n") + // } + return bestValue, bestPriority, nil +} + // putElems adds items to the "elems" set. It will also set current // values and priorities for each element. This needs to run in a lock, // as otherwise races may occur when reading/writing the priorities, resulting @@ -588,23 +554,35 @@ func (s *set) putTombs(ctx context.Context, tombs []*pb.Element) error { } } - deletedElems := make(map[string]struct{}) + // key -> tombstonedBlockID. Carries the tombstoned blocks for each + // element in this delta. + deletedElems := make(map[string][]string) + for _, e := range tombs { // /namespace/tombs// - elemKey := e.GetKey() - k := s.tombsPrefix(elemKey).ChildString(e.GetId()) - err := store.Put(ctx, k, nil) + key := e.GetKey() + id := e.GetId() + valueK := s.valueKey(key) + deletedElems[key] = append(deletedElems[key], id) + + // Find best value for element that we are going to delete + v, p, err := s.findBestValue(ctx, key, deletedElems[key]) if err != nil { return err } - s.tombstonesBloom.AddTS([]byte(elemKey)) - //fmt.Println("Bloom filter add:", elemKey) - // run delete hook only once for all - // versions of the same element tombstoned - // in this delta - if _, ok := deletedElems[elemKey]; !ok { - deletedElems[elemKey] = struct{}{} - s.deleteHook(elemKey) + if v == nil { + store.Delete(ctx, valueK) + store.Delete(ctx, s.priorityKey(key)) + } else { + store.Put(ctx, valueK, v) + s.setPriority(ctx, store, key, p) + } + + // Write tomb into store. + k := s.tombsPrefix(key).ChildString(id) + err = store.Put(ctx, k, nil) + if err != nil { + return err } } @@ -614,6 +592,14 @@ func (s *set) putTombs(ctx context.Context, tombs []*pb.Element) error { return err } } + + // run delete hook only once for all versions of the same element + // tombstoned in this delta. Note it may be that the element was not + // fully deleted and only a different value took its place. + for del := range deletedElems { + s.deleteHook(del) + } + return nil }