diff --git a/metamorphic/generator.go b/metamorphic/generator.go index f00626d517..646ec70955 100644 --- a/metamorphic/generator.go +++ b/metamorphic/generator.go @@ -1009,14 +1009,14 @@ func (g *generator) iterSeekGEWithLimit(iterID objID) { }) } -func (g *generator) randKeyToReadWithinBounds(lower, upper []byte, readerID objID) []*keyMeta { +// inRangeKeys returns all keys in the range [lower, upper) associated with the +// given object. +func (g *generator) inRangeKeys(lower, upper []byte, o objID) []*keyMeta { var inRangeKeys []*keyMeta - for _, keyMeta := range g.keyManager.byObj[readerID] { - posKey := keyMeta.key - if g.cmp(posKey, lower) < 0 || g.cmp(posKey, upper) >= 0 { - continue + for _, keyMeta := range g.keyManager.sortedKeysForObj(o) { + if g.cmp(keyMeta.key, lower) >= 0 && g.cmp(keyMeta.key, upper) < 0 { + inRangeKeys = append(inRangeKeys, keyMeta) } - inRangeKeys = append(inRangeKeys, keyMeta) } return inRangeKeys } @@ -1036,7 +1036,7 @@ func (g *generator) iterSeekPrefixGE(iterID objID) { // random key. if g.rng.Intn(10) >= 1 { possibleKeys := make([][]byte, 0, 100) - inRangeKeys := g.randKeyToReadWithinBounds(lower, upper, g.objDB[iterID]) + inRangeKeys := g.inRangeKeys(lower, upper, g.objDB[iterID]) for _, keyMeta := range inRangeKeys { visibleHistory := keyMeta.history.before(iterCreationTimestamp) @@ -1049,7 +1049,7 @@ func (g *generator) iterSeekPrefixGE(iterID objID) { } if len(possibleKeys) > 0 { - key = []byte(possibleKeys[g.rng.Int31n(int32(len(possibleKeys)))]) + key = possibleKeys[g.rng.Int31n(int32(len(possibleKeys)))] } } diff --git a/metamorphic/key_manager.go b/metamorphic/key_manager.go index e383b7eee1..aff57a048f 100644 --- a/metamorphic/key_manager.go +++ b/metamorphic/key_manager.go @@ -1,7 +1,6 @@ package metamorphic import ( - "bytes" "cmp" "fmt" "slices" @@ -14,32 +13,11 @@ import ( "github.com/stretchr/testify/require" ) -// objKey is a tuple of (objID, key). This struct is used primarily as a map -// key for keyManager. 
Only writer objTags can occur here, i.e., dbTag and -// batchTag, since this is used for tracking the keys in a writer. -type objKey struct { - id objID - key []byte -} - -// makeObjKey returns a new objKey given and id and key. -func makeObjKey(id objID, key []byte) objKey { - if id.tag() != dbTag && id.tag() != batchTag { - panic(fmt.Sprintf("unexpected non-writer tag %v", id.tag())) - } - return objKey{id, key} -} - -// String implements fmt.Stringer, returning a stable string representation of -// the objKey. This string is used as map key. -func (o objKey) String() string { - return fmt.Sprintf("%s:%s", o.id, o.key) -} - // keyMeta is metadata associated with an (objID, key) pair, where objID is // a writer containing the key. type keyMeta struct { - objKey + objID objID + key []byte // history provides the history of writer operations applied against this // key on this object. history is always ordered by non-decreasing // metaTimestamp. @@ -67,7 +45,7 @@ func (m *keyMeta) mergeInto(dst *keyMeta, ts int) { // okay, because as long as we're properly generating single deletes // according to the W1 invariant described in keyManager's comment, a // single delete is equivalent to delete for the current history. - if dst.objKey.id.tag() == dbTag && op.opType.isDelete() { + if dst.objID.tag() == dbTag && op.opType.isDelete() { dst.clear() continue } @@ -84,15 +62,18 @@ type bounds struct { largestExcl bool // is largest exclusive? } -func (b *bounds) String() string { +func (b bounds) String() string { if b.largestExcl { return fmt.Sprintf("[%q,%q)", b.smallest, b.largest) } return fmt.Sprintf("[%q,%q]", b.smallest, b.largest) } -// overlaps returns true iff the bounds intersect. -func (b *bounds) overlaps(cmp base.Compare, other *bounds) bool { +// Overlaps returns true iff the bounds intersect. +func (b *bounds) Overlaps(cmp base.Compare, other bounds) bool { + if b.IsUnset() || other.IsUnset() { + return false + } // Is b strictly before other? 
if v := cmp(b.largest, other.smallest); v < 0 || (v == 0 && b.largestExcl) { return false @@ -104,14 +85,27 @@ func (b *bounds) overlaps(cmp base.Compare, other *bounds) bool { return true } -// mergeInto merges the receiver bounds into other, mutating other. -func (b bounds) mergeInto(cmp base.Compare, other *bounds) { - if cmp(other.smallest, b.smallest) > 0 { - other.smallest = b.smallest +// IsUnset returns true if the bounds haven't been set. +func (b bounds) IsUnset() bool { + return b.smallest == nil && b.largest == nil +} + +// Expand potentially expands the receiver bounds to include the other given +// bounds. If the receiver is unset, the other bounds are copied. +func (b *bounds) Expand(cmp base.Compare, other bounds) { + if other.IsUnset() { + return + } + if b.IsUnset() { + *b = other + return + } + if cmp(b.smallest, other.smallest) > 0 { + b.smallest = other.smallest } - if v := cmp(other.largest, b.largest); v < 0 || (v == 0 && other.largestExcl) { - other.largest = b.largest - other.largestExcl = b.largestExcl + if v := cmp(b.largest, other.largest); v < 0 || (v == 0 && b.largestExcl) { + b.largest = other.largest + b.largestExcl = other.largestExcl } } @@ -184,18 +178,7 @@ type keyManager struct { // iterator. metaTimestamp int - // byObjKey tracks the state for each (writer, key) pair. It refers to the - // same *keyMeta as in the byObj slices. Using a map allows for fast state - // lookups when changing the state based on a writer operation on the key. - byObjKey map[string]*keyMeta - // List of keys per writer, and what has happened to it in that writer. - // Will be transferred when needed. - byObj map[objID][]*keyMeta - // boundsByObj holds user key bounds encompassing all the keys set within an - // object. It's updated within `update` when a new op is generated. It's - // used when determining whether an ingestion should succeed or not. 
- boundsByObj map[objID]*bounds - + byObj map[objID]*objKeyMeta // globalKeys represents all the keys that have been generated so far. Not // all these keys have been written to. globalKeys is sorted. globalKeys [][]byte @@ -210,6 +193,68 @@ type keyManager struct { globalKeyPrefixesMap map[string]struct{} } +type objKeyMeta struct { + id objID + // List of keys, and what has happened to each in this object. + // Will be transferred when needed. + keys map[string]*keyMeta + // bounds holds user key bounds encompassing all the keys set within an + // object. It's updated within `update` when a new op is generated. + bounds bounds +} + +// MergeKey adds the given key at the given meta timestamp, merging the histories as needed. +func (okm *objKeyMeta) MergeKey(k *keyMeta, ts int) { + meta, ok := okm.keys[string(k.key)] + if !ok { + meta = &keyMeta{ + objID: okm.id, + key: k.key, + } + okm.keys[string(k.key)] = meta + } + k.mergeInto(meta, ts) +} + +// CollapseKeys collapses the history of all keys. Used with ingestion operations, +// which only use the last value of any given key. +func (okm *objKeyMeta) CollapseKeys() { + for _, keyMeta := range okm.keys { + keyMeta.history = keyMeta.history.collapsed() + } +} + +// objKeyMeta looks up the objKeyMeta for a given object, creating it if necessary. +func (k *keyManager) objKeyMeta(o objID) *objKeyMeta { + m, ok := k.byObj[o] + if !ok { + m = &objKeyMeta{ + id: o, + keys: make(map[string]*keyMeta), + } + k.byObj[o] = m + } + return m +} + +// sortedKeysForObj returns all the values in objKeyMeta(o).keys, in sorted +// order. 
+func (k *keyManager) sortedKeysForObj(o objID) []*keyMeta { + okm := k.objKeyMeta(o) + res := make([]*keyMeta, 0, len(okm.keys)) + for _, m := range okm.keys { + res = append(res, m) + } + slices.SortFunc(res, func(a, b *keyMeta) int { + cmp := k.comparer.Compare(a.key, b.key) + if cmp == 0 { + panic(fmt.Sprintf("distinct keys %q and %q compared as equal", a.key, b.key)) + } + return cmp + }) + return res +} + func (k *keyManager) nextMetaTimestamp() int { ret := k.metaTimestamp k.metaTimestamp++ @@ -221,14 +266,12 @@ func (k *keyManager) nextMetaTimestamp() int { func newKeyManager(numInstances int) *keyManager { m := &keyManager{ comparer: testkeys.Comparer, - byObjKey: make(map[string]*keyMeta), - byObj: make(map[objID][]*keyMeta), - boundsByObj: make(map[objID]*bounds), + byObj: make(map[objID]*objKeyMeta), globalKeysMap: make(map[string]bool), globalKeyPrefixesMap: make(map[string]struct{}), } for i := 1; i <= max(numInstances, 1); i++ { - m.byObj[makeObjID(dbTag, uint32(i))] = []*keyMeta{} + m.objKeyMeta(makeObjID(dbTag, uint32(i))) } return m } @@ -253,115 +296,62 @@ func (k *keyManager) addNewKey(key []byte) bool { // getOrInit returns the keyMeta for the (objID, key) pair, if it exists, else // allocates, initializes and returns a new value. func (k *keyManager) getOrInit(id objID, key []byte) *keyMeta { - o := makeObjKey(id, key) - m, ok := k.byObjKey[o.String()] + objKeys := k.objKeyMeta(id) + m, ok := objKeys.keys[string(key)] if ok { return m } - m = &keyMeta{objKey: makeObjKey(id, key)} + m = &keyMeta{ + objID: id, + key: key, + } // Initialize the key-to-meta index. - k.byObjKey[o.String()] = m - // Add to the id-to-metas slide. - k.byObj[o.id] = append(k.byObj[o.id], m) - + objKeys.keys[string(key)] = m // Expand the object's bounds to contain this key if they don't already. 
- k.expandBounds(id, bounds{ + objKeys.bounds.Expand(k.comparer.Compare, bounds{ smallest: key, largest: key, }) return m } -// mergeKeysInto merges all metadata for all keys associated with the "from" ID -// with the metadata for keys associated with the "to" ID. -func (k *keyManager) mergeKeysInto(from, to objID, mergeFunc func(src, dst *keyMeta, ts int)) { - msFrom, ok := k.byObj[from] - if !ok { - msFrom = []*keyMeta{} - k.byObj[from] = msFrom - } - msTo, ok := k.byObj[to] - if !ok { - msTo = []*keyMeta{} - k.byObj[to] = msTo - } - - // Sort to facilitate a merge. - slices.SortFunc(msFrom, func(a, b *keyMeta) int { - return bytes.Compare(a.key, b.key) - }) - slices.SortFunc(msTo, func(a, b *keyMeta) int { - return bytes.Compare(a.key, b.key) - }) - +// mergeKeysInto merges the keys and bounds from an object into another. Assumes +// the source object will not be used again. +func (k *keyManager) mergeKeysInto(from, to objID) { + fromMeta := k.objKeyMeta(from) + toMeta := k.objKeyMeta(to) ts := k.nextMetaTimestamp() - var msNew []*keyMeta - var iTo int - for _, m := range msFrom { - // Move cursor on mTo forward. - for iTo < len(msTo) && bytes.Compare(msTo[iTo].key, m.key) < 0 { - msNew = append(msNew, msTo[iTo]) - iTo++ - } - - var mTo *keyMeta - if iTo < len(msTo) && bytes.Equal(msTo[iTo].key, m.key) { - mTo = msTo[iTo] - iTo++ - } else { - mTo = &keyMeta{objKey: makeObjKey(to, m.key)} - k.byObjKey[mTo.String()] = mTo - } - - mergeFunc(m, mTo, ts) - msNew = append(msNew, mTo) - - delete(k.byObjKey, m.String()) // Unlink "from". - } - - // Add any remaining items from the "to" set. - for iTo < len(msTo) { - msNew = append(msNew, msTo[iTo]) - iTo++ + // The result should be the same, regardless of the ordering of the keys. + for _, keyMeta := range fromMeta.keys { + toMeta.MergeKey(keyMeta, ts) } - // All the keys in `from` have been merged into `to`. Expand `to`'s bounds // to be at least as wide as `from`'s. 
- if fromBounds := k.boundsByObj[from]; fromBounds != nil { - k.expandBounds(to, *fromBounds) - } - k.byObj[to] = msNew // Update "to" obj. - delete(k.byObj, from) // Unlink "from" obj. - delete(k.boundsByObj, from) // Unlink "from" bounds. + toMeta.bounds.Expand(k.comparer.Compare, fromMeta.bounds) + + delete(k.byObj, from) // Unlink "from" obj. } // expandBounds expands the incrementally maintained bounds of o to be at least // as wide as `b`. func (k *keyManager) expandBounds(o objID, b bounds) { - existing, ok := k.boundsByObj[o] - if !ok { - existing = new(bounds) - *existing = b - k.boundsByObj[o] = existing - return - } - b.mergeInto(k.comparer.Compare, existing) + k.objKeyMeta(o).bounds.Expand(k.comparer.Compare, b) } // doObjectBoundsOverlap returns true iff any of the named objects have key // bounds that overlap any other named object. func (k *keyManager) doObjectBoundsOverlap(objIDs []objID) bool { for i := range objIDs { - ib, iok := k.boundsByObj[objIDs[i]] + ib, iok := k.byObj[objIDs[i]] if !iok { continue } for j := i + 1; j < len(objIDs); j++ { - jb, jok := k.boundsByObj[objIDs[j]] + jb, jok := k.byObj[objIDs[j]] if !jok { continue } - if ib.overlaps(k.comparer.Compare, jb) { + if ib.bounds.Overlaps(k.comparer.Compare, jb.bounds) { return true } } @@ -382,8 +372,9 @@ func (k *keyManager) doObjectBoundsOverlap(objIDs []objID) bool { // ingestOps which are NOT equivalent to committing the batch, because they can // only commit 1 internal point key at each unique user key. func (k *keyManager) checkForSingleDelConflicts(srcObj, dstObj objID, srcCollapsed bool) [][]byte { + dstKeys := k.objKeyMeta(dstObj) var conflicts [][]byte - for _, src := range k.byObj[srcObj] { + for _, src := range k.sortedKeysForObj(srcObj) { // Single delete generation logic already ensures that both srcObj and // dstObj's single deletes are deterministic within the context of their // existing writes. 
However, applying srcObj on top of dstObj may @@ -450,7 +441,7 @@ func (k *keyManager) checkForSingleDelConflicts(srcObj, dstObj objID, srcCollaps continue } - dst, ok := k.byObjKey[makeObjKey(dstObj, src.key).String()] + dst, ok := dstKeys.keys[string(src.key)] // If the destination writer has no record of the key, the combined key // history is simply the src object's key history which is valid due to // per-object single deletion invariants. @@ -500,7 +491,7 @@ func (k *keyManager) update(o op) { }) case *deleteOp: meta := k.getOrInit(s.writerID, s.key) - if meta.objKey.id.tag() == dbTag { + if meta.objID.tag() == dbTag { meta.clear() } else { meta.history = append(meta.history, keyHistoryItem{ @@ -518,7 +509,7 @@ func (k *keyManager) update(o op) { keyRange := pebble.KeyRange{Start: s.start, End: s.end} for _, key := range k.knownKeysInRange(keyRange) { meta := k.getOrInit(s.writerID, key) - if meta.objKey.id.tag() == dbTag { + if meta.objID.tag() == dbTag { meta.clear() } else { meta.history = append(meta.history, keyHistoryItem{ @@ -579,21 +570,18 @@ func (k *keyManager) update(o op) { // single sequence number). Instead we compute the collapsed history and // merge that. for _, batchID := range s.batchIDs { - k.mergeKeysInto(batchID, s.dbID, func(src, dst *keyMeta, ts int) { - collapsedSrc := keyMeta{ - objKey: src.objKey, - history: src.history.collapsed(), - } - collapsedSrc.mergeInto(dst, ts) - }) + k.objKeyMeta(batchID).CollapseKeys() + k.mergeKeysInto(batchID, s.dbID) } - // TODO(bilal): Handle ingestAndExciseOp and replicateOp here. + // TODO(bilal): Handle ingestAndExciseOp and replicateOp here. We currently + // disable SingleDelete when these operations are enabled (see + // multiInstanceConfig). case *applyOp: // Merge the keys from this writer into the parent writer. 
- k.mergeKeysInto(s.batchID, s.writerID, (*keyMeta).mergeInto) + k.mergeKeysInto(s.batchID, s.writerID) case *batchCommitOp: // Merge the keys from the batch with the keys from the DB. - k.mergeKeysInto(s.batchID, s.dbID, (*keyMeta).mergeInto) + k.mergeKeysInto(s.batchID, s.dbID) } } @@ -631,9 +619,9 @@ func (k *keyManager) prefixExists(prefix []byte) bool { func (k *keyManager) eligibleSingleDeleteKeys(o objID) (keys [][]byte) { // Creating a slice of keys is wasteful given that the caller will pick one, // but makes it simpler for unit testing. + objKeys := k.objKeyMeta(o) for _, key := range k.globalKeys { - objKey := makeObjKey(o, key) - meta, ok := k.byObjKey[objKey.String()] + meta, ok := objKeys.keys[string(key)] if !ok { keys = append(keys, key) continue diff --git a/metamorphic/key_manager_test.go b/metamorphic/key_manager_test.go index 79ffc58b12..3e44eefba5 100644 --- a/metamorphic/key_manager_test.go +++ b/metamorphic/key_manager_test.go @@ -12,28 +12,6 @@ import ( "github.com/stretchr/testify/require" ) -func TestObjKey(t *testing.T) { - testCases := []struct { - key objKey - want string - }{ - { - key: makeObjKey(makeObjID(dbTag, 1), []byte("foo")), - want: "db1:foo", - }, - { - key: makeObjKey(makeObjID(batchTag, 1), []byte("bar")), - want: "batch1:bar", - }, - } - - for _, tc := range testCases { - t.Run("", func(t *testing.T) { - require.Equal(t, tc.want, tc.key.String()) - }) - } -} - func TestKeyManager_AddKey(t *testing.T) { m := newKeyManager(1 /* numInstances */) require.Empty(t, m.globalKeys) @@ -74,25 +52,6 @@ func TestKeyManager_AddKey(t *testing.T) { }, m.prefixes()) } -func TestKeyManager_GetOrInit(t *testing.T) { - id := makeObjID(batchTag, 1) - key := []byte("foo") - o := makeObjKey(id, key) - - m := newKeyManager(1 /* numInstances */) - require.NotContains(t, m.byObjKey, o.String()) - require.NotContains(t, m.byObj, id) - require.Contains(t, m.byObj, makeObjID(dbTag, 1)) // Always contains the DB key. 
- - meta1 := m.getOrInit(id, key) - require.Contains(t, m.byObjKey, o.String()) - require.Contains(t, m.byObj, id) - - // Idempotent. - meta2 := m.getOrInit(id, key) - require.Equal(t, meta1, meta2) -} - func mustParseObjID(s string) objID { id, err := parseObjID(s) if err != nil { @@ -137,7 +96,7 @@ func TestKeyManager(t *testing.T) { case "bounds": for i := 1; i < len(fields); i++ { objID := mustParseObjID(fields[1]) - fmt.Fprintf(&buf, "%s: %s\n", objID, km.boundsByObj[objID]) + fmt.Fprintf(&buf, "%s: %s\n", objID, km.objKeyMeta(objID).bounds) } case "keys": fmt.Fprintf(&buf, "keys: ")