From 8801607debb0af697ba74e902a4e68d648cbdc39 Mon Sep 17 00:00:00 2001
From: Bilal Akhtar
Date: Fri, 11 Aug 2023 17:22:23 -0400
Subject: [PATCH] db: Add TestConcurrentExcise

The cases around erroring out compactions with concurrent excises are
getting more complex. This change adds a unit test to be able to test
for compactions actually erroring out when a concurrent excise happens,
by blocking the compaction and then using a TestIngestShared-like
harness to do a two-Pebble shared ingestion.
---
 ingest_test.go             | 414 +++++++++++++++++++++++++++++++++++++
 testdata/concurrent_excise | 176 ++++++++++++++++
 2 files changed, 590 insertions(+)
 create mode 100644 testdata/concurrent_excise

diff --git a/ingest_test.go b/ingest_test.go
index 3a2d42c20e..ff59b62bd0 100644
--- a/ingest_test.go
+++ b/ingest_test.go
@@ -1166,6 +1166,420 @@ func TestSimpleIngestShared(t *testing.T) {
 	// of d and e have been updated.
 }
 
+type blockedCompaction struct {
+	startBlock, unblock chan struct{}
+}
+
+func TestConcurrentExcise(t *testing.T) {
+	var d, d1, d2 *DB
+	var efos map[string]*EventuallyFileOnlySnapshot
+	backgroundErrs := make(chan error, 5)
+	var compactions map[string]*blockedCompaction
+	defer func() {
+		for _, e := range efos {
+			require.NoError(t, e.Close())
+		}
+		if d1 != nil {
+			require.NoError(t, d1.Close())
+		}
+		if d2 != nil {
+			require.NoError(t, d2.Close())
+		}
+	}()
+	creatorIDCounter := uint64(1)
+	replicateCounter := 1
+
+	var wg sync.WaitGroup
+	defer wg.Wait()
+	var blockNextCompaction bool
+	var blockedJobID int
+	var blockedCompactionName string
+	var blockedCompactionsMu sync.Mutex // protects the above three variables.
+
+	reset := func() {
+		wg.Wait()
+		for _, e := range efos {
+			require.NoError(t, e.Close())
+		}
+		if d1 != nil {
+			require.NoError(t, d1.Close())
+		}
+		if d2 != nil {
+			require.NoError(t, d2.Close())
+		}
+		efos = make(map[string]*EventuallyFileOnlySnapshot)
+		compactions = make(map[string]*blockedCompaction)
+		backgroundErrs = make(chan error, 5)
+
+		var el EventListener
+		el.EnsureDefaults(testLogger{t: t})
+		el.FlushBegin = func(info FlushInfo) {
+			// Don't block flushes.
+		}
+		el.BackgroundError = func(err error) {
+			backgroundErrs <- err
+		}
+		el.CompactionBegin = func(info CompactionInfo) {
+			if info.Reason == "move" {
+				return
+			}
+			blockedCompactionsMu.Lock()
+			defer blockedCompactionsMu.Unlock()
+			if blockNextCompaction {
+				blockNextCompaction = false
+				blockedJobID = info.JobID
+			}
+		}
+		el.TableCreated = func(info TableCreateInfo) {
+			blockedCompactionsMu.Lock()
+			if info.JobID != blockedJobID {
+				blockedCompactionsMu.Unlock()
+				return
+			}
+			blockedJobID = 0
+			c := compactions[blockedCompactionName]
+			blockedCompactionName = ""
+			blockedCompactionsMu.Unlock()
+			c.startBlock <- struct{}{}
+			<-c.unblock
+		}
+
+		sstorage := remote.NewInMem()
+		mem1 := vfs.NewMem()
+		mem2 := vfs.NewMem()
+		require.NoError(t, mem1.MkdirAll("ext", 0755))
+		require.NoError(t, mem2.MkdirAll("ext", 0755))
+		opts1 := &Options{
+			Comparer:              testkeys.Comparer,
+			LBaseMaxBytes:         1,
+			FS:                    mem1,
+			L0CompactionThreshold: 100,
+			L0StopWritesThreshold: 100,
+			DebugCheck:            DebugCheckLevels,
+			FormatMajorVersion:    ExperimentalFormatVirtualSSTables,
+		}
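+		// Tee a logging event listener with the custom listener above so
+		// that flush/compaction activity shows up in the test logs, while
+		// the custom hooks still get the chance to block the targeted
+		// compaction.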
+		lel := MakeLoggingEventListener(DefaultLogger)
+		tel := TeeEventListener(lel, el)
+		opts1.EventListener = &tel
+		opts1.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
+			"": sstorage,
+		})
+		opts1.Experimental.CreateOnShared = true
+		opts1.Experimental.CreateOnSharedLocator = ""
+		// Disable automatic compactions because otherwise we'll race with
+		// delete-only compactions triggered by ingesting range tombstones.
+		opts1.DisableAutomaticCompactions = true
+
+		opts2 := &Options{}
+		*opts2 = *opts1
+		opts2.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{
+			"": sstorage,
+		})
+		opts2.Experimental.CreateOnShared = true
+		opts2.Experimental.CreateOnSharedLocator = ""
+		opts2.FS = mem2
+
+		var err error
+		d1, err = Open("", opts1)
+		require.NoError(t, err)
+		require.NoError(t, d1.SetCreatorID(creatorIDCounter))
+		creatorIDCounter++
+		d2, err = Open("", opts2)
+		require.NoError(t, err)
+		require.NoError(t, d2.SetCreatorID(creatorIDCounter))
+		creatorIDCounter++
+		d = d1
+	}
+	reset()
+
+	datadriven.RunTest(t, "testdata/concurrent_excise", func(t *testing.T, td *datadriven.TestData) string {
+		switch td.Cmd {
+		case "reset":
+			reset()
+			return ""
+		case "switch":
+			if len(td.CmdArgs) != 1 {
+				return "usage: switch <1 or 2>"
+			}
+			switch td.CmdArgs[0].Key {
+			case "1":
+				d = d1
+			case "2":
+				d = d2
+			default:
+				return "usage: switch <1 or 2>"
+			}
+			return "ok"
+		case "batch":
+			b := d.NewIndexedBatch()
+			if err := runBatchDefineCmd(td, b); err != nil {
+				return err.Error()
+			}
+			if err := b.Commit(nil); err != nil {
+				return err.Error()
+			}
+			return ""
+		case "build":
+			if err := runBuildCmd(td, d, d.opts.FS); err != nil {
+				return err.Error()
+			}
+			return ""
+
+		case "flush":
+			if err := d.Flush(); err != nil {
+				return err.Error()
+			}
+			return ""
+
+		case "ingest":
+			if err := runIngestCmd(td, d, d.opts.FS); err != nil {
+				return err.Error()
+			}
+			// Wait for a possible flush.
+			d.mu.Lock()
+			for d.mu.compact.flushing {
+				d.mu.compact.cond.Wait()
+			}
+			d.mu.Unlock()
+			return ""
+
+		case "ingest-and-excise":
+			if err := runIngestAndExciseCmd(td, d, d.opts.FS); err != nil {
+				return err.Error()
+			}
+			// Wait for a possible flush.
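+			// The ingest may have forced a flush (e.g. if it overlapped the
+			// memtable); wait for it so that later commands observe a
+			// quiesced LSM.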
+			d.mu.Lock()
+			for d.mu.compact.flushing {
+				d.mu.compact.cond.Wait()
+			}
+			d.mu.Unlock()
+			return ""
+
+		case "replicate":
+			if len(td.CmdArgs) != 4 {
+				return "usage: replicate <from-db-num> <to-db-num> <start-key> <end-key>"
+			}
+			var from, to *DB
+			switch td.CmdArgs[0].Key {
+			case "1":
+				from = d1
+			case "2":
+				from = d2
+			default:
+				return "usage: replicate <from-db-num> <to-db-num> <start-key> <end-key>"
+			}
+			switch td.CmdArgs[1].Key {
+			case "1":
+				to = d1
+			case "2":
+				to = d2
+			default:
+				return "usage: replicate <from-db-num> <to-db-num> <start-key> <end-key>"
+			}
+			startKey := []byte(td.CmdArgs[2].Key)
+			endKey := []byte(td.CmdArgs[3].Key)
+
+			writeOpts := d.opts.MakeWriterOptions(0 /* level */, to.opts.FormatMajorVersion.MaxTableFormat())
+			sstPath := fmt.Sprintf("ext/replicate%d.sst", replicateCounter)
+			f, err := to.opts.FS.Create(sstPath)
+			require.NoError(t, err)
+			replicateCounter++
+			w := sstable.NewWriter(objstorageprovider.NewFileWritable(f), writeOpts)
+
+			var sharedSSTs []SharedSSTMeta
+			err = from.ScanInternal(context.TODO(), startKey, endKey,
+				func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
+					val, _, err := value.Value(nil)
+					require.NoError(t, err)
+					require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val))
+					return nil
+				},
+				func(start, end []byte, seqNum uint64) error {
+					require.NoError(t, w.DeleteRange(start, end))
+					return nil
+				},
+				func(start, end []byte, keys []keyspan.Key) error {
+					s := keyspan.Span{
+						Start:     start,
+						End:       end,
+						Keys:      keys,
+						KeysOrder: 0,
+					}
+					require.NoError(t, rangekey.Encode(&s, func(k base.InternalKey, v []byte) error {
+						return w.AddRangeKey(base.MakeInternalKey(k.UserKey, 0, k.Kind()), v)
+					}))
+					return nil
+				},
+				func(sst *SharedSSTMeta) error {
+					sharedSSTs = append(sharedSSTs, *sst)
+					return nil
+				},
+			)
+			require.NoError(t, err)
+			require.NoError(t, w.Close())
+
+			_, err = to.IngestAndExcise([]string{sstPath}, sharedSSTs, KeyRange{Start: startKey, End: endKey})
+			require.NoError(t, err)
+			return fmt.Sprintf("replicated %d shared SSTs", len(sharedSSTs))
+
+		case "get":
+			return runGetCmd(t, td, d)
+
+		case "iter":
+			o := &IterOptions{KeyTypes: IterKeyTypePointsAndRanges}
+			var reader Reader
+			reader = d
+			for _, arg := range td.CmdArgs {
+				switch arg.Key {
+				case "mask-suffix":
+					o.RangeKeyMasking.Suffix = []byte(arg.Vals[0])
+				case "mask-filter":
+					o.RangeKeyMasking.Filter = func() BlockPropertyFilterMask {
+						return sstable.NewTestKeysMaskingFilter()
+					}
+				case "snapshot":
+					reader = efos[arg.Vals[0]]
+				}
+			}
+			iter, err := reader.NewIter(o)
+			if err != nil {
+				return err.Error()
+			}
+			return runIterCmd(td, iter, true)
+
+		case "lsm":
+			return runLSMCmd(td, d)
+
+		case "metrics":
+			// The asynchronous loading of table stats can change metrics, so
+			// wait for all the tables' stats to be loaded.
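+			// waitTableStats is called with d.mu held and blocks until stats
+			// for all current tables have been loaded.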
+			d.mu.Lock()
+			d.waitTableStats()
+			d.mu.Unlock()
+
+			return d.Metrics().String()
+
+		case "wait-pending-table-stats":
+			return runTableStatsCmd(td, d)
+
+		case "excise":
+			ve := &versionEdit{
+				DeletedFiles: map[deletedFileEntry]*fileMetadata{},
+			}
+			var exciseSpan KeyRange
+			if len(td.CmdArgs) != 2 {
+				panic("insufficient args for excise command")
+			}
+			exciseSpan.Start = []byte(td.CmdArgs[0].Key)
+			exciseSpan.End = []byte(td.CmdArgs[1].Key)
+
+			d.mu.Lock()
+			d.mu.versions.logLock()
+			d.mu.Unlock()
+			current := d.mu.versions.currentVersion()
+			for level := range current.Levels {
+				iter := current.Levels[level].Iter()
+				for m := iter.SeekGE(d.cmp, exciseSpan.Start); m != nil && d.cmp(m.Smallest.UserKey, exciseSpan.End) < 0; m = iter.Next() {
+					_, err := d.excise(exciseSpan, m, ve, level)
+					if err != nil {
+						d.mu.Lock()
+						d.mu.versions.logUnlock()
+						d.mu.Unlock()
+						return fmt.Sprintf("error when excising %s: %s", m.FileNum, err.Error())
+					}
+				}
+			}
+			d.mu.Lock()
+			d.mu.versions.logUnlock()
+			d.mu.Unlock()
+			return fmt.Sprintf("would excise %d files, use ingest-and-excise to excise.\n%s", len(ve.DeletedFiles), ve.String())
+
+		case "file-only-snapshot":
+			if len(td.CmdArgs) != 1 {
+				panic("insufficient args for file-only-snapshot command")
+			}
+			name := td.CmdArgs[0].Key
+			var keyRanges []KeyRange
+			for _, line := range strings.Split(td.Input, "\n") {
+				fields := strings.Fields(line)
+				if len(fields) != 2 {
+					return "expected two fields for file-only snapshot KeyRanges"
+				}
+				kr := KeyRange{Start: []byte(fields[0]), End: []byte(fields[1])}
+				keyRanges = append(keyRanges, kr)
+			}
+
+			s := d.NewEventuallyFileOnlySnapshot(keyRanges)
+			efos[name] = s
+			return "ok"
+
+		case "wait-for-file-only-snapshot":
+			if len(td.CmdArgs) != 1 {
+				panic("insufficient args for wait-for-file-only-snapshot command")
+			}
+			name := td.CmdArgs[0].Key
+			err := efos[name].WaitForFileOnlySnapshot(1 * time.Millisecond)
+			if err != nil {
+				return err.Error()
+			}
+			return "ok"
+
+		case "unblock":
+			name := td.CmdArgs[0].Key
+			blockedCompactionsMu.Lock()
+			c := compactions[name]
+			delete(compactions, name)
+			blockedCompactionsMu.Unlock()
+			c.unblock <- struct{}{}
+			return "ok"
+
+		case "compact":
+			async := false
+			var otherArgs []datadriven.CmdArg
+			var bc *blockedCompaction
+			for i := range td.CmdArgs {
+				switch td.CmdArgs[i].Key {
+				case "block":
+					name := td.CmdArgs[i].Vals[0]
+					bc = &blockedCompaction{startBlock: make(chan struct{}), unblock: make(chan struct{})}
+					blockedCompactionsMu.Lock()
+					compactions[name] = bc
+					blockNextCompaction = true
+					blockedCompactionName = name
+					blockedCompactionsMu.Unlock()
+					async = true
+				default:
+					otherArgs = append(otherArgs, td.CmdArgs[i])
+				}
+			}
+			var tdClone datadriven.TestData
+			tdClone = *td
+			tdClone.CmdArgs = otherArgs
+			if !async {
+				err := runCompactCmd(td, d)
+				if err != nil {
+					return err.Error()
+				}
+			} else {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					_ = runCompactCmd(&tdClone, d)
+				}()
+				<-bc.startBlock
+				return "spun off in separate goroutine"
+			}
+			return "ok"
+		case "wait-for-background-error":
+			err := <-backgroundErrs
+			return err.Error()
+		default:
+			return fmt.Sprintf("unknown command: %s", td.Cmd)
+		}
+	})
+}
+
 func TestIngestExternal(t *testing.T) {
 	var mem vfs.FS
 	var d *DB
diff --git a/testdata/concurrent_excise b/testdata/concurrent_excise
new file mode 100644
index 0000000000..135566f22d
--- /dev/null
+++ b/testdata/concurrent_excise
@@ -0,0 +1,176 @@
+
+reset
+----
+
+switch 1
+----
+ok
+
+batch
+set d foo
+set e bar
+----
+
+flush
+----
+
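+# Compact the flushed data down to the bottom of the LSM. With
+# CreateOnShared set, the bottommost output is created on shared storage,
+# which is what later lets it be replicated to the other store as a shared
+# sstable.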
+compact a-z
+----
+ok
+
+switch 2
+----
+ok
+
+batch
+set c fooz
+set f foobar
+----
+
+flush
+----
+
+compact a-z
+----
+ok
+
+batch
+set d foobar
+----
+
+flush
+----
+
+lsm
+----
+0.0:
+  000007:[d#12,SET-d#12,SET]
+6:
+  000005:[c#10,SET-f#11,SET]
+
+compact a-z block=c1
+----
+spun off in separate goroutine
+
+iter
+first
+next
+next
+next
+next
+----
+c: (fooz, .)
+d: (foobar, .)
+f: (foobar, .)
+.
+.
+
+# This excise should cancel the in-flight compaction, causing it to error out
+# below. The eventually file-only snapshot should go through because it's not
+# waiting on any keys in memtables.
+
+file-only-snapshot s1
+  c e
+----
+ok
+
+replicate 1 2 b e
+----
+replicated 1 shared SSTs
+
+unblock c1
+----
+ok
+
+wait-for-file-only-snapshot s1
+----
+ok
+
+lsm
+----
+6:
+  000010:[d#13,SET-d#13,SET]
+  000011:[f#11,SET-f#11,SET]
+
+compact a-z
+----
+ok
+
+wait-for-background-error
+----
+pebble: compaction cancelled by a concurrent operation, will retry compaction
+
+iter
+first
+next
+next
+next
+next
+----
+d: (foo, .)
+f: (foobar, .)
+.
+.
+.
+
+batch
+set d fo
+set ee foobar
+set f3 something
+----
+
+flush
+----
+
+compact a-z
+----
+ok
+
+switch 1
+----
+ok
+
+# The below file-only snapshot should be errored out by the concurrent excise.
+
+batch
+set d something
+----
+
+flush
+----
+
+batch
+set dd memory
+----
+
+file-only-snapshot s2
+  c e
+----
+ok
+
+iter snapshot=s2
+first
+next
+next
+next
+----
+d: (something, .)
+dd: (memory, .)
+e: (bar, .)
+.
+
+replicate 2 1 c dd
+----
+replicated 1 shared SSTs
+
+wait-for-file-only-snapshot s2
+----
+pebble: snapshot excised before conversion to file-only snapshot
+
+iter snapshot=s2
+first
+next
+next
+next
+----
+pebble: snapshot excised before conversion to file-only snapshot
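+
+# Reads through the excised snapshot surface the same error as
+# wait-for-file-only-snapshot above, instead of returning results.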