diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 53af31cb50a8..03fd0c17c80a 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -9,6 +9,7 @@ ALL_TESTS = [ "//pkg/backup/backuppb:backuppb_test", "//pkg/backup/backuprand:backuprand_test", "//pkg/backup/backupresolver:backupresolver_test", + "//pkg/backup/backupsink:backupsink_test", "//pkg/backup/backuputils:backuputils_test", "//pkg/backup:backup_test", "//pkg/base:base_test", @@ -812,6 +813,8 @@ GO_TARGETS = [ "//pkg/backup/backuprand:backuprand_test", "//pkg/backup/backupresolver:backupresolver", "//pkg/backup/backupresolver:backupresolver_test", + "//pkg/backup/backupsink:backupsink", + "//pkg/backup/backupsink:backupsink_test", "//pkg/backup/backuptestutils:backuptestutils", "//pkg/backup/backuputils:backuputils", "//pkg/backup/backuputils:backuputils_test", diff --git a/pkg/backup/BUILD.bazel b/pkg/backup/BUILD.bazel index c00688db1c27..dfeb6f73839a 100644 --- a/pkg/backup/BUILD.bazel +++ b/pkg/backup/BUILD.bazel @@ -15,7 +15,6 @@ go_library( "backup_span_coverage.go", "backup_telemetry.go", "create_scheduled_backup.go", - "file_sst_sink.go", "generative_split_and_scatter_processor.go", "key_rewriter.go", "restoration_data.go", @@ -44,6 +43,7 @@ go_library( "//pkg/backup/backupinfo", "//pkg/backup/backuppb", "//pkg/backup/backupresolver", + "//pkg/backup/backupsink", "//pkg/backup/backuputils", "//pkg/base", "//pkg/build", @@ -113,7 +113,6 @@ go_library( "//pkg/sql/rowenc", "//pkg/sql/rowexec", "//pkg/sql/schemachanger/scbackup", - "//pkg/sql/sem/builtins", "//pkg/sql/sem/catconstants", "//pkg/sql/sem/catid", "//pkg/sql/sem/eval", @@ -158,7 +157,6 @@ go_library( "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", "@com_github_gogo_protobuf//types", - "@com_github_kr_pretty//:pretty", "@com_github_robfig_cron_v3//:cron", "@org_golang_x_exp//maps", ], @@ -180,7 +178,6 @@ go_test( "create_scheduled_backup_test.go", "data_driven_generated_test.go", # keep "datadriven_test.go", - "file_sst_sink_test.go", "full_cluster_backup_restore_test.go", "generative_split_and_scatter_processor_test.go", "key_rewriter_test.go", diff --git a/pkg/backup/backup_processor.go b/pkg/backup/backup_processor.go index 9ada6ea082a3..50fee9303943 100644 --- a/pkg/backup/backup_processor.go +++ b/pkg/backup/backup_processor.go @@ -11,6 +11,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/backup/backuppb" + "github.com/cockroachdb/cockroach/pkg/backup/backupsink" "github.com/cockroachdb/cockroach/pkg/cloud" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" @@ -68,12 +69,6 @@ var ( time.Minute*5, settings.NonNegativeDuration, settings.WithPublic) - targetFileSize = settings.RegisterByteSizeSetting( - settings.ApplicationLevel, - "bulkio.backup.file_size", - "target size for individual data files produced during BACKUP", - 128<<20, - settings.WithPublic) preSplitExports = settings.RegisterBoolSetting( settings.ApplicationLevel, @@ -301,14 +296,6 @@ type spanAndTime struct { finishesSpec bool } -type exportedSpan struct { - metadata backuppb.BackupManifest_File - dataSST []byte - revStart hlc.Timestamp - completedSpans int32 - resumeKey roachpb.Key -} - func runBackupProcessor( ctx context.Context, flowCtx *execinfra.FlowCtx, @@ -403,11 +390,12 @@ func runBackupProcessor( return err } - sinkConf := sstSinkConf{ - id: flowCtx.NodeID.SQLInstanceID(), - enc: spec.Encryption, - progCh: progCh, - settings: &flowCtx.Cfg.Settings.SV, + sinkConf := backupsink.SSTSinkConf{ + ID: 
flowCtx.NodeID.SQLInstanceID(), + Enc: spec.Encryption, + ProgCh: progCh, + Settings: &flowCtx.Cfg.Settings.SV, + ElideMode: spec.ElidePrefix, } storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest, cloud.WithClientName("backup")) if err != nil { @@ -473,16 +461,14 @@ func runBackupProcessor( // It is safe to close a nil pacer. defer pacer.Close() - sink := makeFileSSTSink(sinkConf, storage, pacer) + sink := backupsink.MakeFileSSTSink(sinkConf, storage, pacer) defer func() { - if err := sink.flush(ctx); err != nil { + if err := sink.Flush(ctx); err != nil { log.Warningf(ctx, "failed to flush SST sink: %s", err) } logClose(ctx, sink, "SST sink") }() - sink.elideMode = spec.ElidePrefix - // priority becomes true when we're sending re-attempts of reads far enough // in the past that we want to run them with priority. var priority bool @@ -671,40 +657,40 @@ func runBackupProcessor( // Even if the ExportRequest did not export any data we want to report // the span as completed for accurate progress tracking. if len(resp.Files) == 0 { - sink.writeWithNoData(exportedSpan{completedSpans: completedSpans}) + sink.WriteWithNoData(backupsink.ExportedSpan{CompletedSpans: completedSpans}) } for i, file := range resp.Files { entryCounts := countRows(file.Exported, spec.PKIDs) - ret := exportedSpan{ + ret := backupsink.ExportedSpan{ // BackupManifest_File just happens to contain the exact fields // to store the metadata we need, but there's no actual File // on-disk anywhere yet. - metadata: backuppb.BackupManifest_File{ + Metadata: backuppb.BackupManifest_File{ Span: file.Span, EntryCounts: entryCounts, LocalityKV: destLocalityKV, ApproximatePhysicalSize: uint64(len(file.SST)), }, - dataSST: file.SST, - revStart: resp.StartTime, + DataSST: file.SST, + RevStart: resp.StartTime, } if resp.ResumeSpan != nil { - ret.resumeKey = resumeSpan.span.Key + ret.ResumeKey = resumeSpan.span.Key } if span.start != spec.BackupStartTime { - ret.metadata.StartTime = span.start - ret.metadata.EndTime = span.end + ret.Metadata.StartTime = span.start + ret.Metadata.EndTime = span.end } // If multiple files were returned for this span, only one -- the // last -- should count as completing the requested span. if i == len(resp.Files)-1 { - ret.completedSpans = completedSpans + ret.CompletedSpans = completedSpans } // Cannot set the error to err, which is shared across workers. var writeErr error - resumeSpan.span.Key, writeErr = sink.write(ctx, ret) + resumeSpan.span.Key, writeErr = sink.Write(ctx, ret) if writeErr != nil { return err } @@ -719,7 +705,7 @@ func runBackupProcessor( // still be running and may still push new work (a retry) on to todo but // that is OK, since that also means it is still running and thus can // pick up that work on its next iteration. 
- return sink.flush(ctx) + return sink.Flush(ctx) } } }) diff --git a/pkg/backup/backupsink/BUILD.bazel b/pkg/backup/backupsink/BUILD.bazel new file mode 100644 index 000000000000..af804ba9f115 --- /dev/null +++ b/pkg/backup/backupsink/BUILD.bazel @@ -0,0 +1,55 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "backupsink", + srcs = [ + "file_sst_sink.go", + "sink_utils.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/backup/backupsink", + visibility = ["//visibility:public"], + deps = [ + "//pkg/backup/backuppb", + "//pkg/base", + "//pkg/ccl/storageccl", + "//pkg/cloud", + "//pkg/keys", + "//pkg/kv/kvpb", + "//pkg/roachpb", + "//pkg/settings", + "//pkg/sql/execinfrapb", + "//pkg/sql/sem/builtins", + "//pkg/storage", + "//pkg/util/admission", + "//pkg/util/hlc", + "//pkg/util/log", + "@com_github_cockroachdb_errors//:errors", + "@com_github_gogo_protobuf//types", + "@com_github_kr_pretty//:pretty", + ], +) + +go_test( + name = "backupsink_test", + srcs = ["file_sst_sink_test.go"], + embed = [":backupsink"], + deps = [ + "//pkg/backup/backuppb", + "//pkg/ccl/storageccl", + "//pkg/cloud", + "//pkg/cloud/cloudpb", + "//pkg/cloud/nodelocal", + "//pkg/keys", + "//pkg/roachpb", + "//pkg/settings/cluster", + "//pkg/sql/execinfrapb", + "//pkg/storage", + "//pkg/util/encoding", + "//pkg/util/hlc", + "//pkg/util/leaktest", + "//pkg/util/log", + "@com_github_cockroachdb_errors//:errors", + "@com_github_gogo_protobuf//types", + "@com_github_stretchr_testify//require", + ], +) diff --git a/pkg/backup/file_sst_sink.go b/pkg/backup/backupsink/file_sst_sink.go similarity index 70% rename from pkg/backup/file_sst_sink.go rename to pkg/backup/backupsink/file_sst_sink.go index fb6e53646044..d9db62e6a133 100644 --- a/pkg/backup/file_sst_sink.go +++ b/pkg/backup/backupsink/file_sst_sink.go @@ -3,12 +3,11 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. 
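// ----------------------------------------------------------------------------
// Editor's sketch (not part of the patch): how a caller drives the newly
// exported backupsink API, mirroring the runBackupProcessor changes above.
// All identifiers below come from this diff except the hypothetical package
// and function names (backupsinkexample, driveSink) and the assumption that
// conf, store, pacer, and the spans slice have already been constructed.

package backupsinkexample

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/backup/backupsink"
	"github.com/cockroachdb/cockroach/pkg/cloud"
	"github.com/cockroachdb/cockroach/pkg/util/admission"
)

// driveSink feeds a sequence of ExportedSpans into a FileSSTSink and flushes
// the remainder, the same lifecycle the backup processor uses: Write buffers
// data and may flush on size or out-of-order spans, WriteWithNoData only
// records completed-span progress, and Flush/Close finish the last file.
func driveSink(
	ctx context.Context,
	conf backupsink.SSTSinkConf, // carries ProgCh, Enc, ID, Settings, ElideMode
	store cloud.ExternalStorage,
	pacer *admission.Pacer, // a nil pacer is allowed, as in the tests
	spans []backupsink.ExportedSpan,
) error {
	sink := backupsink.MakeFileSSTSink(conf, store, pacer)
	defer sink.Close()

	for _, es := range spans {
		if len(es.DataSST) == 0 {
			// An ExportedSpan with no SST data (illustrative check) still
			// counts toward progress tracking.
			sink.WriteWithNoData(es)
			continue
		}
		// Write returns the (possibly adjusted) resume key for this span.
		if _, err := sink.Write(ctx, es); err != nil {
			return err
		}
	}
	// Flush the final, partially filled SST and report progress on conf.ProgCh.
	return sink.Flush(ctx)
}
// ----------------------------------------------------------------------------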
-package backup +package backupsink import ( "bytes" "context" - "fmt" io "io" "github.com/cockroachdb/cockroach/pkg/backup/backuppb" @@ -20,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" - "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util/admission" hlc "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -30,16 +28,39 @@ import ( "github.com/kr/pretty" ) -type sstSinkConf struct { - progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress - enc *kvpb.FileEncryptionOptions - id base.SQLInstanceID - settings *settings.Values +var ( + targetFileSize = settings.RegisterByteSizeSetting( + settings.ApplicationLevel, + "bulkio.backup.file_size", + "target size for individual data files produced during BACKUP", + 128<<20, + settings.WithPublic) +) + +type FileSSTSinkWriter interface { + io.Closer + Flush(context.Context) error +} + +type ExportedSpan struct { + Metadata backuppb.BackupManifest_File + DataSST []byte + RevStart hlc.Timestamp + CompletedSpans int32 + ResumeKey roachpb.Key } -type fileSSTSink struct { +type SSTSinkConf struct { + ProgCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress + Enc *kvpb.FileEncryptionOptions + ID base.SQLInstanceID + Settings *settings.Values + ElideMode execinfrapb.ElidePrefix +} + +type FileSSTSink struct { dest cloud.ExternalStorage - conf sstSinkConf + conf SSTSinkConf pacer *admission.Pacer sst storage.SSTWriter @@ -63,10 +84,9 @@ type fileSSTSink struct { // flush. This counter resets on each flush. completedSpans int32 - elideMode execinfrapb.ElidePrefix elidePrefix roachpb.Key - // stats contain statistics about the actions of the fileSSTSink over its + // stats contain statistics about the actions of the FileSSTSink over its // entire lifespan. stats struct { files int // number of files created. @@ -80,13 +100,129 @@ type fileSSTSink struct { // fileSpanByteLimit is the maximum size of a file span that can be extended. const fileSpanByteLimit = 64 << 20 -func makeFileSSTSink( - conf sstSinkConf, dest cloud.ExternalStorage, pacer *admission.Pacer, -) *fileSSTSink { - return &fileSSTSink{conf: conf, dest: dest, pacer: pacer} +func MakeFileSSTSink( + conf SSTSinkConf, dest cloud.ExternalStorage, pacer *admission.Pacer, +) *FileSSTSink { + return &FileSSTSink{conf: conf, dest: dest, pacer: pacer} +} + +func (s *FileSSTSink) Write(ctx context.Context, resp ExportedSpan) (roachpb.Key, error) { + s.stats.files++ + + span := resp.Metadata.Span + + spanPrefix, err := ElidedPrefix(span.Key, s.conf.ElideMode) + if err != nil { + return nil, err + } + + // If this span starts before the last buffered span ended, we need to flush + // since it overlaps but SSTWriter demands writes in-order. + if len(s.flushedFiles) > 0 { + last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey + if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidePrefix) { + log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s", + s.outName, s.flushedSize, span, last, + ) + s.stats.oooFlushes++ + if err := s.flushFile(ctx); err != nil { + return nil, err + } + } + } + + // Initialize the writer if needed. + if s.out == nil { + if err := s.open(ctx); err != nil { + return nil, err + } + } + s.elidePrefix = append(s.elidePrefix[:0], spanPrefix...) 
+ + log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName) + + // To speed up SST reading, surface all the point keys first, flush, + // then surface all the range keys and flush. + // + // TODO(msbutler): investigate using single a single iterator that surfaces + // all point keys first and then all range keys + maxKey, err := s.copyPointKeys(ctx, resp.DataSST) + if err != nil { + return nil, err + } + + maxRange, err := s.copyRangeKeys(resp.DataSST) + if err != nil { + return nil, err + } + hasRangeKeys := maxRange != nil + + // extend determines if the new span should be added to the last span. This + // will occur if the previous span ended mid row, or if the new span is a + // continuation of the previous span (i.e. the new span picks up where the + // previous one ended and has the same time bounds). + var extend bool + if s.midRow { + extend = true + } else if len(s.flushedFiles) > 0 { + last := s.flushedFiles[len(s.flushedFiles)-1] + extend = last.Span.EndKey.Equal(span.Key) && + last.EndTime == resp.Metadata.EndTime && + last.StartTime == resp.Metadata.StartTime && + last.EntryCounts.DataSize < fileSpanByteLimit + } + + if len(resp.ResumeKey) > 0 { + span.EndKey, s.midRow = adjustFileEndKey(span.EndKey, maxKey, maxRange) + // Update the resume key to be the adjusted end key so that start key of the + // next file is also clean. + resp.ResumeKey = span.EndKey + } else { + s.midRow = false + } + + if extend { + if len(s.flushedFiles) == 0 { + return nil, errors.AssertionFailedf("cannot extend an empty file sink") + } + l := len(s.flushedFiles) - 1 + s.flushedFiles[l].Span.EndKey = span.EndKey + s.flushedFiles[l].EntryCounts.Add(resp.Metadata.EntryCounts) + s.flushedFiles[l].ApproximatePhysicalSize += resp.Metadata.ApproximatePhysicalSize + s.flushedFiles[l].HasRangeKeys = s.flushedFiles[l].HasRangeKeys || hasRangeKeys + s.stats.spanGrows++ + } else { + f := resp.Metadata + f.Path = s.outName + f.Span.EndKey = span.EndKey + f.HasRangeKeys = hasRangeKeys + s.flushedFiles = append(s.flushedFiles, f) + } + + s.flushedRevStart.Forward(resp.RevStart) + s.completedSpans += resp.CompletedSpans + s.flushedSize += int64(len(resp.DataSST)) + + // If our accumulated SST is now big enough, and we are positioned at the end + // of a range flush it. 
+ if s.flushedSize > targetFileSize.Get(s.conf.Settings) && !s.midRow { + s.stats.sizeFlushes++ + log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize) + if err := s.flushFile(ctx); err != nil { + return nil, err + } + } else { + log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize) + } + return resp.ResumeKey, err } -func (s *fileSSTSink) Close() error { +func (s *FileSSTSink) WriteWithNoData(resp ExportedSpan) { + s.completedSpans += resp.CompletedSpans + s.midRow = false +} + +func (s *FileSSTSink) Close() error { if log.V(1) && s.ctx != nil { log.Infof(s.ctx, "backup sst sink recv'd %d files, wrote %d (%d due to size, %d due to re-ordering), %d recv files extended prior span", s.stats.files, s.stats.flushes, s.stats.sizeFlushes, s.stats.oooFlushes, s.stats.spanGrows) @@ -101,11 +237,11 @@ func (s *fileSSTSink) Close() error { return nil } -func (s *fileSSTSink) flush(ctx context.Context) error { +func (s *FileSSTSink) Flush(ctx context.Context) error { return s.flushFile(ctx) } -func (s *fileSSTSink) flushFile(ctx context.Context) error { +func (s *FileSSTSink) flushFile(ctx context.Context) error { if s.out == nil { // If the writer was not initialized but the sink has reported completed // spans then there were empty ExportRequests that were processed by the @@ -124,7 +260,7 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() - case s.conf.progCh <- prog: + case s.conf.ProgCh <- prog: } s.completedSpans = 0 } @@ -145,7 +281,7 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error { return err } if err := s.out.Close(); err != nil { - log.Warningf(ctx, "failed to close write in fileSSTSink: % #v", pretty.Formatter(err)) + log.Warningf(ctx, "failed to close write in FileSSTSink: % #v", pretty.Formatter(err)) return errors.Wrap(err, "writing SST") } wroteSize := s.sst.Meta.Size @@ -170,7 +306,7 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() - case s.conf.progCh <- prog: + case s.conf.ProgCh <- prog: } s.flushedFiles = nil @@ -182,8 +318,8 @@ func (s *fileSSTSink) flushFile(ctx context.Context) error { return nil } -func (s *fileSSTSink) open(ctx context.Context) error { - s.outName = generateUniqueSSTName(s.conf.id) +func (s *FileSSTSink) open(ctx context.Context) error { + s.outName = generateUniqueSSTName(s.conf.ID) if s.ctx == nil { s.ctx, s.cancel = context.WithCancel(ctx) } @@ -192,8 +328,8 @@ func (s *fileSSTSink) open(ctx context.Context) error { return err } s.out = w - if s.conf.enc != nil { - e, err := storageccl.EncryptingWriter(w, s.conf.enc.Key) + if s.conf.Enc != nil { + e, err := storageccl.EncryptingWriter(w, s.conf.Enc.Key) if err != nil { return err } @@ -214,165 +350,7 @@ func (s *fileSSTSink) open(ctx context.Context) error { return nil } - -func (s *fileSSTSink) writeWithNoData(resp exportedSpan) { - s.completedSpans += resp.completedSpans - s.midRow = false -} - -func (s *fileSSTSink) write(ctx context.Context, resp exportedSpan) (roachpb.Key, error) { - s.stats.files++ - - span := resp.metadata.Span - - spanPrefix, err := elidedPrefix(span.Key, s.elideMode) - if err != nil { - return nil, err - } - - // If this span starts before the last buffered span ended, we need to flush - // since it overlaps but SSTWriter demands writes in-order. 
- if len(s.flushedFiles) > 0 { - last := s.flushedFiles[len(s.flushedFiles)-1].Span.EndKey - if span.Key.Compare(last) < 0 || !bytes.Equal(spanPrefix, s.elidePrefix) { - log.VEventf(ctx, 1, "flushing backup file %s of size %d because span %s cannot append before %s", - s.outName, s.flushedSize, span, last, - ) - s.stats.oooFlushes++ - if err := s.flushFile(ctx); err != nil { - return nil, err - } - } - } - - // Initialize the writer if needed. - if s.out == nil { - if err := s.open(ctx); err != nil { - return nil, err - } - } - s.elidePrefix = append(s.elidePrefix[:0], spanPrefix...) - - log.VEventf(ctx, 2, "writing %s to backup file %s", span, s.outName) - - // To speed up SST reading, surface all the point keys first, flush, - // then surface all the range keys and flush. - // - // TODO(msbutler): investigate using single a single iterator that surfaces - // all point keys first and then all range keys - maxKey, err := s.copyPointKeys(ctx, resp.dataSST) - if err != nil { - return nil, err - } - - maxRange, err := s.copyRangeKeys(resp.dataSST) - if err != nil { - return nil, err - } - hasRangeKeys := maxRange != nil - - // extend determines if the new span should be added to the last span. This - // will occur if the previous span ended mid row, or if the new span is a - // continuation of the previous span (i.e. the new span picks up where the - // previous one ended and has the same time bounds). - var extend bool - if s.midRow { - extend = true - } else if len(s.flushedFiles) > 0 { - last := s.flushedFiles[len(s.flushedFiles)-1] - extend = last.Span.EndKey.Equal(span.Key) && - last.EndTime == resp.metadata.EndTime && - last.StartTime == resp.metadata.StartTime && - last.EntryCounts.DataSize < fileSpanByteLimit - } - - if len(resp.resumeKey) > 0 { - span.EndKey, s.midRow = adjustFileEndKey(span.EndKey, maxKey, maxRange) - // Update the resume key to be the adjusted end key so that start key of the - // next file is also clean. - resp.resumeKey = span.EndKey - } else { - s.midRow = false - } - - if extend { - if len(s.flushedFiles) == 0 { - return nil, errors.AssertionFailedf("cannot extend an empty file sink") - } - l := len(s.flushedFiles) - 1 - s.flushedFiles[l].Span.EndKey = span.EndKey - s.flushedFiles[l].EntryCounts.Add(resp.metadata.EntryCounts) - s.flushedFiles[l].ApproximatePhysicalSize += resp.metadata.ApproximatePhysicalSize - s.flushedFiles[l].HasRangeKeys = s.flushedFiles[l].HasRangeKeys || hasRangeKeys - s.stats.spanGrows++ - } else { - f := resp.metadata - f.Path = s.outName - f.Span.EndKey = span.EndKey - f.HasRangeKeys = hasRangeKeys - s.flushedFiles = append(s.flushedFiles, f) - } - - s.flushedRevStart.Forward(resp.revStart) - s.completedSpans += resp.completedSpans - s.flushedSize += int64(len(resp.dataSST)) - - // If our accumulated SST is now big enough, and we are positioned at the end - // of a range flush it. - if s.flushedSize > targetFileSize.Get(s.conf.settings) && !s.midRow { - s.stats.sizeFlushes++ - log.VEventf(ctx, 2, "flushing backup file %s with size %d", s.outName, s.flushedSize) - if err := s.flushFile(ctx); err != nil { - return nil, err - } - } else { - log.VEventf(ctx, 3, "continuing to write to backup file %s of size %d", s.outName, s.flushedSize) - } - return resp.resumeKey, err -} - -// adjustFileEndKey checks if the export respsonse end key can be used as a -// split point during restore. If the end key is not splitable (i.e. 
it splits -// two column families in the same row), the function will attempt to adjust the -// endkey to become splitable. The function returns the potentially adjusted -// end key and whether this end key is mid row/unsplitable (i.e. splits a 2 -// column families or mvcc versions). -func adjustFileEndKey(endKey, maxPointKey, maxRangeEnd roachpb.Key) (roachpb.Key, bool) { - maxKey := maxPointKey - if maxKey.Compare(maxRangeEnd) < 0 { - maxKey = maxRangeEnd - } - - endRowKey, err := keys.EnsureSafeSplitKey(endKey) - if err != nil { - // If the key does not parse a family key, it must be from reaching the end - // of a range and be a range boundary. - return endKey, false - } - - // If the end key parses as a family key but truncating to the row key does - // _not_ produce a row key greater than every key in the file, then one of two - // things has happened: we *did* stop at family key mid-row, so we copied some - // families after the row key but have more to get in the next file -- so we - // must *not* flush now -- or the file ended at a range boundary that _looks_ - // like a family key due to a numeric suffix, so the (nonsense) truncated key - // is now some prefix less than the last copied key. The latter is unfortunate - // but should be rare given range-sized export requests. - if endRowKey.Compare(maxKey) <= 0 { - return endKey, true - } - - // If the file end does parse as a family key but the truncated 'row' key is - // still above any key in the file, the end key likely came from export's - // iteration stopping early and setting the end to the resume key, i.e. the - // next real family key. In this case, we are not mid-row, but want to adjust - // our span end -- and where we resume the next file -- to be this row key. - // Thus return the truncated row key and false. - return endRowKey, false - -} - -func (s *fileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachpb.Key, error) { +func (s *FileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachpb.Key, error) { iterOpts := storage.IterOptions{ KeyTypes: storage.IterKeyTypePointsOnly, LowerBound: keys.LocalMax, @@ -454,7 +432,7 @@ func (s *fileSSTSink) copyPointKeys(ctx context.Context, dataSST []byte) (roachp // copyRangeKeys copies all range keys from the dataSST into the buffer and // returns the max range key observed. -func (s *fileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) { +func (s *FileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) { iterOpts := storage.IterOptions{ KeyTypes: storage.IterKeyTypeRangesOnly, LowerBound: keys.LocalMax, @@ -492,29 +470,3 @@ func (s *fileSSTSink) copyRangeKeys(dataSST []byte) (roachpb.Key, error) { } return maxKey, nil } - -func generateUniqueSSTName(nodeID base.SQLInstanceID) string { - // The data/ prefix, including a /, is intended to group SSTs in most of the - // common file/bucket browse UIs. 
- return fmt.Sprintf("data/%d.sst", - builtins.GenerateUniqueInt(builtins.ProcessUniqueID(nodeID))) -} - -func elidedPrefix(key roachpb.Key, mode execinfrapb.ElidePrefix) ([]byte, error) { - switch mode { - case execinfrapb.ElidePrefix_TenantAndTable: - rest, err := keys.StripTablePrefix(key) - if err != nil { - return nil, err - } - return key[: len(key)-len(rest) : len(key)-len(rest)], nil - - case execinfrapb.ElidePrefix_Tenant: - rest, err := keys.StripTenantPrefix(key) - if err != nil { - return nil, err - } - return key[: len(key)-len(rest) : len(key)-len(rest)], nil - } - return nil, nil -} diff --git a/pkg/backup/file_sst_sink_test.go b/pkg/backup/backupsink/file_sst_sink_test.go similarity index 90% rename from pkg/backup/file_sst_sink_test.go rename to pkg/backup/backupsink/file_sst_sink_test.go index 5b705433d7c4..042f690229c9 100644 --- a/pkg/backup/file_sst_sink_test.go +++ b/pkg/backup/backupsink/file_sst_sink_test.go @@ -3,7 +3,7 @@ // Use of this software is governed by the CockroachDB Software License // included in the /LICENSE file. -package backup +package backupsink import ( "bytes" @@ -33,7 +33,7 @@ import ( "github.com/stretchr/testify/require" ) -// TestFileSSTSinkExtendOneFile is a regression test for a bug in fileSSTSink in +// TestFileSSTSinkExtendOneFile is a regression test for a bug in FileSSTSink in // which the sink fails to extend its last span added if there's only one file // in the sink so far. func TestFileSSTSinkExtendOneFile(t *testing.T) { @@ -52,8 +52,8 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) { return b.Bytes() } - exportResponse1 := exportedSpan{ - metadata: backuppb.BackupManifest_File{ + exportResponse1 := ExportedSpan{ + Metadata: backuppb.BackupManifest_File{ Span: roachpb.Span{ Key: []byte("b"), EndKey: []byte("b"), @@ -66,14 +66,14 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) { EndTime: hlc.Timestamp{}, LocalityKV: "", }, - dataSST: getKeys("b", 100), - revStart: hlc.Timestamp{}, - completedSpans: 1, - resumeKey: []byte("b"), + DataSST: getKeys("b", 100), + RevStart: hlc.Timestamp{}, + CompletedSpans: 1, + ResumeKey: []byte("b"), } - exportResponse2 := exportedSpan{ - metadata: backuppb.BackupManifest_File{ + exportResponse2 := ExportedSpan{ + Metadata: backuppb.BackupManifest_File{ Span: roachpb.Span{ Key: []byte("b"), EndKey: []byte("z"), @@ -86,28 +86,28 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) { EndTime: hlc.Timestamp{}, LocalityKV: "", }, - dataSST: getKeys("c", 100), - revStart: hlc.Timestamp{}, - completedSpans: 1, + DataSST: getKeys("c", 100), + RevStart: hlc.Timestamp{}, + CompletedSpans: 1, } st := cluster.MakeTestingClusterSettings() targetFileSize.Override(ctx, &st.SV, 20) - sink, _ := fileSSTSinkTestSetUp(ctx, t, st) + sink, _ := fileSSTSinkTestSetup(t, st) - resumeKey, err := sink.write(ctx, exportResponse1) + resumeKey, err := sink.Write(ctx, exportResponse1) require.NoError(t, err) - require.Equal(t, exportResponse1.resumeKey, resumeKey) - resumeKey, err = sink.write(ctx, exportResponse2) + require.Equal(t, exportResponse1.ResumeKey, resumeKey) + resumeKey, err = sink.Write(ctx, exportResponse2) require.NoError(t, err) - require.Equal(t, exportResponse2.resumeKey, resumeKey) + require.Equal(t, exportResponse2.ResumeKey, resumeKey) // Close the sink. 
require.NoError(t, err) - close(sink.conf.progCh) + close(sink.conf.ProgCh) var progs []execinfrapb.RemoteProducerMetadata_BulkProcessorProgress - for p := range sink.conf.progCh { + for p := range sink.conf.ProgCh { progs = append(progs, p) } @@ -124,7 +124,7 @@ func TestFileSSTSinkExtendOneFile(t *testing.T) { // TestFileSSTSinkWrite tests the contents of flushed files and the internal // unflushed files of the FileSSTSink under different write scenarios. Each test -// writes a sequence of exportedSpans into a fileSSTSink. The test then verifies +// writes a sequence of exportedSpans into a FileSSTSink. The test then verifies // the spans of the flushed files and the unflushed files still left in the // sink, as well as makes sure all keys in each file fall within the span // boundaries. @@ -136,7 +136,7 @@ func TestFileSSTSinkWrite(t *testing.T) { type testCase struct { name string - exportSpans []exportedSpan + exportSpans []ExportedSpan flushedSpans []roachpb.Spans elideFlushedSpans []roachpb.Spans unflushedSpans []roachpb.Spans @@ -150,7 +150,7 @@ func TestFileSSTSinkWrite(t *testing.T) { } for _, tt := range []testCase{{name: "out-of-order-key-boundary", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newExportedSpanBuilder("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "c", timestamp: 10}}).build(), newExportedSpanBuilder("b", "d").withKVs([]kvAndTS{{key: "b", timestamp: 10}, {key: "d", timestamp: 10}}).build(), }, @@ -164,7 +164,7 @@ func TestFileSSTSinkWrite(t *testing.T) { // // TODO (msbutler): this test is currently skipped as it has a non nil errorExplanation. Unskip it. name: "out-of-order-not-key-boundary", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newRawExportedSpanBuilder(s2k0("a"), s2k0("c"), s2k0("c")).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "c", timestamp: 10}}).build(), newExportedSpanBuilder("b", "d").withKVs([]kvAndTS{{key: "b", timestamp: 10}, {key: "d", timestamp: 10}}).build(), newExportedSpanBuilder("c", "e").withKVs([]kvAndTS{{key: "c", timestamp: 9}, {key: "e", timestamp: 10}}).build(), @@ -178,7 +178,7 @@ func TestFileSSTSinkWrite(t *testing.T) { }, { name: "prefix-differ", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newRawExportedSpanBuilder(s2k0("2/a"), s2k0("2/c"), s2k0("2/c")).withKVs([]kvAndTS{{key: "2/a", timestamp: 10}, {key: "2/c", timestamp: 10}}).build(), newExportedSpanBuilder("2/c", "2/d").withKVs([]kvAndTS{{key: "2/c", timestamp: 9}, {key: "2/d", timestamp: 10}}).build(), newExportedSpanBuilder("3/c", "3/e").withKVs([]kvAndTS{{key: "3/c", timestamp: 9}, {key: "3/d", timestamp: 10}}).build(), @@ -195,7 +195,7 @@ func TestFileSSTSinkWrite(t *testing.T) { }, { name: "extend-key-boundary-1-file", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newExportedSpanBuilder("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}).build(), newExportedSpanBuilder("c", "e").withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "d", timestamp: 10}}).build(), }, @@ -204,7 +204,7 @@ func TestFileSSTSinkWrite(t *testing.T) { }, { name: "extend-key-boundary-2-files", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newExportedSpanBuilder("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "b", timestamp: 10}}).build(), newExportedSpanBuilder("c", "e").withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "d", timestamp: 10}}).build(), newExportedSpanBuilder("e", "g").withKVs([]kvAndTS{{key: "e", timestamp: 10}, {key: "f", timestamp: 
10}}).build(), @@ -214,7 +214,7 @@ func TestFileSSTSinkWrite(t *testing.T) { }, { name: "extend-not-key-boundary", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newRawExportedSpanBuilder(s2k0("a"), s2k0("c"), s2k0("c")).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "c", timestamp: 10}}).build(), newExportedSpanBuilder("c", "e").withKVs([]kvAndTS{{key: "c", timestamp: 9}, {key: "d", timestamp: 10}}).build(), }, @@ -230,7 +230,7 @@ func TestFileSSTSinkWrite(t *testing.T) { // TODO(msbutler): this test is skipped, as it has a non nil errorExplanation. Unskip this. name: "extend-same-key", errorExplanation: "incorrectly fails with pebble: keys must be added in strictly increasing order", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newRawExportedSpanBuilder(s2k0("a"), s2k0("a"), s2k("a")).withKVs([]kvAndTS{{key: "a", timestamp: 10}, {key: "a", timestamp: 9}}).build(), newRawExportedSpanBuilder(s2k0("a"), s2k0("a"), s2k("a")).withKVs([]kvAndTS{{key: "a", timestamp: 5}, {key: "a", timestamp: 4}}).build(), newRawExportedSpanBuilder(s2k0("a"), s2k0("a"), s2k("a")).withKVs([]kvAndTS{{key: "a", timestamp: 8}, {key: "a", timestamp: 7}}).build(), @@ -239,8 +239,8 @@ func TestFileSSTSinkWrite(t *testing.T) { unflushedSpans: []roachpb.Spans{{{Key: []byte("a"), EndKey: []byte("e")}}}, }, { - name: "extend-metadata-same-timestamp", - exportSpans: []exportedSpan{ + name: "extend-Metadata-same-timestamp", + exportSpans: []ExportedSpan{ newExportedSpanBuilder("a", "c"). withKVs([]kvAndTS{{key: "a", timestamp: 5}, {key: "b", timestamp: 5}}). withStartTime(5). @@ -258,8 +258,8 @@ func TestFileSSTSinkWrite(t *testing.T) { }, }, { - name: "no-extend-metadata-timestamp-mismatch", - exportSpans: []exportedSpan{ + name: "no-extend-Metadata-timestamp-mismatch", + exportSpans: []ExportedSpan{ newExportedSpanBuilder("a", "c"). withKVs([]kvAndTS{{key: "a", timestamp: 5}, {key: "b", timestamp: 5}}). withEndTime(5). @@ -277,7 +277,7 @@ func TestFileSSTSinkWrite(t *testing.T) { }, { name: "size-flush", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newExportedSpanBuilder("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10, value: make([]byte, 20<<20)}, {key: "b", timestamp: 10}}).build(), newExportedSpanBuilder("d", "f").withKVs([]kvAndTS{{key: "d", timestamp: 10}, {key: "e", timestamp: 10}}).build(), }, @@ -291,7 +291,7 @@ func TestFileSSTSinkWrite(t *testing.T) { { // No flush can occur between two versions of the same key. Further, we must combine flushes which split a row. name: "no-size-flush-if-mid-mvcc", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newRawExportedSpanBuilder(s2k0("a"), s2k0("c"), s2k0("c")).withKVs([]kvAndTS{{key: "a", timestamp: 10, value: make([]byte, 20<<20)}, {key: "c", timestamp: 10}}).build(), newRawExportedSpanBuilder(s2k0("c"), s2k0("f"), s2k0("f")).withKVs([]kvAndTS{{key: "c", timestamp: 8}, {key: "f", timestamp: 10}}).build(), }, @@ -303,7 +303,7 @@ func TestFileSSTSinkWrite(t *testing.T) { { // No flush can occur between the two column families of the same row. Further, we must combine flushes which split a row. 
name: "no-size-flush-mid-col-family", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newRawExportedSpanBuilder(s2kWithColFamily("c", 0), s2kWithColFamily("c", 1), s2kWithColFamily("c", 1)).withKVs([]kvAndTS{ {key: "c", timestamp: 10, value: make([]byte, 20<<20)}}).build(), newRawExportedSpanBuilder(s2kWithColFamily("c", 1), s2kWithColFamily("c", 2), s2kWithColFamily("c", 2)).withKVs([]kvAndTS{ @@ -317,7 +317,7 @@ func TestFileSSTSinkWrite(t *testing.T) { { // It's safe to flush at the range boundary. name: "size-flush-at-range-boundary", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newRawExportedSpanBuilder(s2k("a"), s2k("d"), s2k("d")).withKVs([]kvAndTS{{key: "a", timestamp: 10, value: make([]byte, 20<<20)}, {key: "c", timestamp: 10}}).build(), }, flushedSpans: []roachpb.Spans{ @@ -331,7 +331,7 @@ func TestFileSSTSinkWrite(t *testing.T) { // key to this trimmed key. The trimmed key ensures we never split in a // row between two column families. name: "trim-resume-key", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newRawExportedSpanBuilder(s2k0("a"), s2k0("c"), s2k("c")).withKVs([]kvAndTS{{key: "a", timestamp: 10, value: make([]byte, 20<<20)}}).build(), }, flushedSpans: []roachpb.Spans{ @@ -343,7 +343,7 @@ func TestFileSSTSinkWrite(t *testing.T) { // If a logical backup file is sufficiently large, the file may be cut // even if the next span's start key matches the file's end key. name: "file-size-cut", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newExportedSpanBuilder("a", "c").withKVs([]kvAndTS{{key: "a", timestamp: 10, value: make([]byte, 64<<20)}, {key: "b", timestamp: 10}}).build(), newExportedSpanBuilder("c", "f").withKVs([]kvAndTS{{key: "c", timestamp: 10}, {key: "e", timestamp: 10}}).build(), }, @@ -357,7 +357,7 @@ func TestFileSSTSinkWrite(t *testing.T) { // No file cut can occur between the two column families of the same row, // even if the file is sufficiently large to get cut. 
name: "no-file-cut-mid-col-family", - exportSpans: []exportedSpan{ + exportSpans: []ExportedSpan{ newRawExportedSpanBuilder(s2kWithColFamily("c", 0), s2kWithColFamily("c", 1), s2kWithColFamily("c", 1)).withKVs([]kvAndTS{ {key: "c", timestamp: 10, value: make([]byte, 65<<20)}}).build(), newRawExportedSpanBuilder(s2kWithColFamily("c", 1), s2kWithColFamily("c", 2), s2kWithColFamily("c", 2)).withKVs([]kvAndTS{ @@ -381,24 +381,24 @@ func TestFileSSTSinkWrite(t *testing.T) { targetFileSize.Override(ctx, &st.SV, 10<<10) } - sink, store := fileSSTSinkTestSetUp(ctx, t, st) + sink, store := fileSSTSinkTestSetup(t, st) defer func() { require.NoError(t, sink.Close()) }() - sink.elideMode = elide + sink.conf.ElideMode = elide var resumeKey roachpb.Key var err error for _, es := range tt.exportSpans { if !resumeKey.Equal(roachpb.Key{}) { - require.Equal(t, resumeKey, es.metadata.Span.Key, "invalid test case: if the previous span emits a resume key, the next span must start with this key") + require.Equal(t, resumeKey, es.Metadata.Span.Key, "invalid test case: if the previous span emits a resume key, the next span must start with this key") } - resumeKey, err = sink.write(ctx, es) + resumeKey, err = sink.Write(ctx, es) require.NoError(t, err) - if !es.resumeKey.Equal(resumeKey) { + if !es.ResumeKey.Equal(resumeKey) { require.NoError(t, err) } - require.Equal(t, es.resumeKey, resumeKey, "unexpected resume key") + require.Equal(t, es.ResumeKey, resumeKey, "unexpected resume key") } progress := make([]backuppb.BackupManifest_File, 0) @@ -406,7 +406,7 @@ func TestFileSSTSinkWrite(t *testing.T) { Loop: for { select { - case p := <-sink.conf.progCh: + case p := <-sink.conf.ProgCh: var progDetails backuppb.BackupManifest_Progress if err := types.UnmarshalAny(&p.ProgressDetails, &progDetails); err != nil { t.Fatal(err) @@ -418,11 +418,11 @@ func TestFileSSTSinkWrite(t *testing.T) { } } expectedSpans := tt.flushedSpans - eliding := sink.elideMode != execinfrapb.ElidePrefix_None + eliding := sink.conf.ElideMode != execinfrapb.ElidePrefix_None if eliding && len(tt.elideFlushedSpans) > 0 { expectedSpans = tt.elideFlushedSpans } - // progCh contains the files that have already been created with + // ProgCh contains the files that have already been created with // flushes. Verify the contents. require.NoError(t, checkFiles(ctx, store, progress, expectedSpans, eliding)) @@ -431,10 +431,10 @@ func TestFileSSTSinkWrite(t *testing.T) { var actualUnflushedFiles []backuppb.BackupManifest_File actualUnflushedFiles = append(actualUnflushedFiles, sink.flushedFiles...) // We cannot end the test -- by calling flush -- if the sink is mid-key. 
- if len(tt.exportSpans) > 0 && !tt.exportSpans[len(tt.exportSpans)-1].resumeKey.Equal(roachpb.Key{}) { - sink.writeWithNoData(newExportedSpanBuilder("z", "zz").build()) + if len(tt.exportSpans) > 0 && !tt.exportSpans[len(tt.exportSpans)-1].ResumeKey.Equal(roachpb.Key{}) { + sink.WriteWithNoData(newExportedSpanBuilder("z", "zz").build()) } - require.NoError(t, sink.flush(ctx)) + require.NoError(t, sink.Flush(ctx)) require.NoError(t, checkFiles(ctx, store, actualUnflushedFiles, tt.unflushedSpans, eliding)) require.Empty(t, sink.flushedFiles) }) @@ -472,7 +472,7 @@ func TestFileSSTSinkStats(t *testing.T) { st := cluster.MakeTestingClusterSettings() targetFileSize.Override(ctx, &st.SV, 10<<10) - sink, _ := fileSSTSinkTestSetUp(ctx, t, st) + sink, _ := fileSSTSinkTestSetup(t, st) defer func() { require.NoError(t, sink.Close()) @@ -489,7 +489,7 @@ func TestFileSSTSinkStats(t *testing.T) { } type inputAndExpectedStats struct { - input exportedSpan + input ExportedSpan expectedStats sinkStats } @@ -525,9 +525,9 @@ func TestFileSSTSinkStats(t *testing.T) { } for _, input := range inputs { - resumeKey, err := sink.write(ctx, input.input) + resumeKey, err := sink.Write(ctx, input.input) require.NoError(t, err) - require.Nil(t, input.input.resumeKey, resumeKey) + require.Nil(t, input.input.ResumeKey, resumeKey) actualStats := sinkStats{ flushedRevStart: sink.flushedRevStart, @@ -539,10 +539,10 @@ func TestFileSSTSinkStats(t *testing.T) { spanGrows: sink.stats.spanGrows, } - require.Equal(t, input.expectedStats, actualStats, "stats after write for span %v", input.input.metadata.Span) + require.Equal(t, input.expectedStats, actualStats, "stats after write for span %v", input.input.Metadata.Span) } - require.NoError(t, sink.flush(ctx)) + require.NoError(t, sink.Flush(ctx)) } func TestFileSSTSinkCopyPointKeys(t *testing.T) { @@ -641,7 +641,7 @@ func TestFileSSTSinkCopyPointKeys(t *testing.T) { t.Run(tt.name, func(t *testing.T) { buf := &bytes.Buffer{} sst := storage.MakeTransportSSTWriter(ctx, settings, buf) - sink := fileSSTSink{sst: sst} + sink := FileSSTSink{sst: sst} compareSST := true for _, input := range tt.inputs { @@ -651,7 +651,7 @@ func TestFileSSTSinkCopyPointKeys(t *testing.T) { withKVs(kvs). withRangeKeys([]rangeKeyAndTS{{"a", "z", 10}}). build() - maxKey, err := sink.copyPointKeys(ctx, es.dataSST) + maxKey, err := sink.copyPointKeys(ctx, es.DataSST) if input.expectErr != "" { // Do not compare resulting SSTs if we expect errors. require.ErrorContains(t, err, input.expectErr) @@ -662,7 +662,7 @@ func TestFileSSTSinkCopyPointKeys(t *testing.T) { // NB: the assertion below will not be true for all exported spans, // but it is for the exported spans constrcted in this test, where we // set the end key of the exported span to the last key in the input. - require.Equal(t, es.resumeKey, maxKey) + require.Equal(t, es.ResumeKey, maxKey) } } @@ -820,7 +820,7 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) { t.Run(tt.name, func(t *testing.T) { buf := &bytes.Buffer{} sst := storage.MakeTransportSSTWriter(ctx, settings, buf) - sink := fileSSTSink{sst: sst} + sink := FileSSTSink{sst: sst} compareSST := true for _, input := range tt.inputs { @@ -830,7 +830,7 @@ func TestFileSSTSinkCopyRangeKeys(t *testing.T) { withRangeKeys(rangeKeys). withKVs([]kvAndTS{{key: rangeKeys[0].key, timestamp: rangeKeys[0].timestamp}}). build() - resumeKey, err := sink.copyRangeKeys(es.dataSST) + resumeKey, err := sink.copyRangeKeys(es.DataSST) if input.expectErr != "" { // Do not compare resulting SSTs if we expect errors. 
require.ErrorContains(t, err, input.expectErr) @@ -929,7 +929,7 @@ func (rk rangeKeyAndTS) span() roachpb.Span { } type exportedSpanBuilder struct { - es *exportedSpan + es *ExportedSpan keyValues []kvAndTS rangeKeys []rangeKeyAndTS } @@ -940,8 +940,8 @@ func newExportedSpanBuilder(spanStart, spanEnd string) *exportedSpanBuilder { func newRawExportedSpanBuilder(spanStart, spanEnd, resumeKey roachpb.Key) *exportedSpanBuilder { return &exportedSpanBuilder{ - es: &exportedSpan{ - metadata: backuppb.BackupManifest_File{ + es: &ExportedSpan{ + Metadata: backuppb.BackupManifest_File{ Span: roachpb.Span{ Key: spanStart, EndKey: spanEnd, @@ -952,8 +952,8 @@ func newRawExportedSpanBuilder(spanStart, spanEnd, resumeKey roachpb.Key) *expor IndexEntries: 0, }, }, - completedSpans: 1, - resumeKey: resumeKey, + CompletedSpans: 1, + ResumeKey: resumeKey, }, } } @@ -961,8 +961,8 @@ func newRawExportedSpanBuilder(spanStart, spanEnd, resumeKey roachpb.Key) *expor func (b *exportedSpanBuilder) withKVs(keyValues []kvAndTS) *exportedSpanBuilder { b.keyValues = keyValues for _, kv := range keyValues { - b.es.metadata.EntryCounts.DataSize += int64(len(kv.key)) - b.es.metadata.EntryCounts.DataSize += int64(len(kv.value)) + b.es.Metadata.EntryCounts.DataSize += int64(len(kv.key)) + b.es.Metadata.EntryCounts.DataSize += int64(len(kv.value)) } return b } @@ -973,25 +973,25 @@ func (b *exportedSpanBuilder) withRangeKeys(rangeKeys []rangeKeyAndTS) *exported } func (b *exportedSpanBuilder) withStartTime(time int64) *exportedSpanBuilder { - b.es.metadata.StartTime = hlc.Timestamp{WallTime: time} + b.es.Metadata.StartTime = hlc.Timestamp{WallTime: time} return b } func (b *exportedSpanBuilder) withEndTime(time int64) *exportedSpanBuilder { - b.es.metadata.EndTime = hlc.Timestamp{WallTime: time} + b.es.Metadata.EndTime = hlc.Timestamp{WallTime: time} return b } func (b *exportedSpanBuilder) withRevStartTime(time int64) *exportedSpanBuilder { - b.es.revStart = hlc.Timestamp{WallTime: time} + b.es.RevStart = hlc.Timestamp{WallTime: time} return b } -func (b *exportedSpanBuilder) build() exportedSpan { +func (b *exportedSpanBuilder) build() ExportedSpan { return b.buildWithEncoding(s2k0) } -func (b *exportedSpanBuilder) buildWithEncoding(stringToKey func(string) roachpb.Key) exportedSpan { +func (b *exportedSpanBuilder) buildWithEncoding(stringToKey func(string) roachpb.Key) ExportedSpan { ctx := context.Background() settings := cluster.MakeTestingClusterSettings() buf := &bytes.Buffer{} @@ -1023,28 +1023,28 @@ func (b *exportedSpanBuilder) buildWithEncoding(stringToKey func(string) roachpb sst.Close() - b.es.dataSST = buf.Bytes() + b.es.DataSST = buf.Bytes() return *b.es } -func fileSSTSinkTestSetUp( - ctx context.Context, t *testing.T, settings *cluster.Settings, -) (*fileSSTSink, cloud.ExternalStorage) { +func fileSSTSinkTestSetup( + t *testing.T, settings *cluster.Settings, +) (*FileSSTSink, cloud.ExternalStorage) { store := nodelocal.TestingMakeNodelocalStorage(t.TempDir(), settings, cloudpb.ExternalStorage{}) // Never block. 
progCh := make(chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress, 100) - sinkConf := sstSinkConf{ - id: 1, - enc: nil, - progCh: progCh, - settings: &settings.SV, + sinkConf := SSTSinkConf{ + ID: 1, + Enc: nil, + ProgCh: progCh, + Settings: &settings.SV, } - sink := makeFileSSTSink(sinkConf, store, nil /* pacer */) + sink := MakeFileSSTSink(sinkConf, store, nil /* pacer */) return sink, store } diff --git a/pkg/backup/backupsink/sink_utils.go b/pkg/backup/backupsink/sink_utils.go new file mode 100644 index 000000000000..fb483486fc6b --- /dev/null +++ b/pkg/backup/backupsink/sink_utils.go @@ -0,0 +1,84 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package backupsink + +import ( + "fmt" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" + "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" +) + +// ElidedPrefix returns the prefix of the key that is elided by the given mode. +func ElidedPrefix(key roachpb.Key, mode execinfrapb.ElidePrefix) ([]byte, error) { + switch mode { + case execinfrapb.ElidePrefix_TenantAndTable: + rest, err := keys.StripTablePrefix(key) + if err != nil { + return nil, err + } + return key[: len(key)-len(rest) : len(key)-len(rest)], nil + + case execinfrapb.ElidePrefix_Tenant: + rest, err := keys.StripTenantPrefix(key) + if err != nil { + return nil, err + } + return key[: len(key)-len(rest) : len(key)-len(rest)], nil + } + return nil, nil +} + +// adjustFileEndKey checks if the export respsonse end key can be used as a +// split point during restore. If the end key is not splitable (i.e. it splits +// two column families in the same row), the function will attempt to adjust the +// endkey to become splitable. The function returns the potentially adjusted +// end key and whether this end key is mid row/unsplitable (i.e. splits a 2 +// column families or mvcc versions). +func adjustFileEndKey(endKey, maxPointKey, maxRangeEnd roachpb.Key) (roachpb.Key, bool) { + maxKey := maxPointKey + if maxKey.Compare(maxRangeEnd) < 0 { + maxKey = maxRangeEnd + } + + endRowKey, err := keys.EnsureSafeSplitKey(endKey) + if err != nil { + // If the key does not parse a family key, it must be from reaching the end + // of a range and be a range boundary. + return endKey, false + } + + // If the end key parses as a family key but truncating to the row key does + // _not_ produce a row key greater than every key in the file, then one of two + // things has happened: we *did* stop at family key mid-row, so we copied some + // families after the row key but have more to get in the next file -- so we + // must *not* flush now -- or the file ended at a range boundary that _looks_ + // like a family key due to a numeric suffix, so the (nonsense) truncated key + // is now some prefix less than the last copied key. The latter is unfortunate + // but should be rare given range-sized export requests. + if endRowKey.Compare(maxKey) <= 0 { + return endKey, true + } + + // If the file end does parse as a family key but the truncated 'row' key is + // still above any key in the file, the end key likely came from export's + // iteration stopping early and setting the end to the resume key, i.e. the + // next real family key. 
In this case, we are not mid-row, but want to adjust + // our span end -- and where we resume the next file -- to be this row key. + // Thus return the truncated row key and false. + return endRowKey, false + +} + +func generateUniqueSSTName(nodeID base.SQLInstanceID) string { + // The data/ prefix, including a /, is intended to group SSTs in most of the + // common file/bucket browse UIs. + return fmt.Sprintf("data/%d.sst", + builtins.GenerateUniqueInt(builtins.ProcessUniqueID(nodeID))) +} diff --git a/pkg/backup/restore_data_processor.go b/pkg/backup/restore_data_processor.go index 1dd43a76df23..48472ea34429 100644 --- a/pkg/backup/restore_data_processor.go +++ b/pkg/backup/restore_data_processor.go @@ -13,6 +13,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/backup/backuppb" + "github.com/cockroachdb/cockroach/pkg/backup/backupsink" "github.com/cockroachdb/cockroach/pkg/backup/backuputils" "github.com/cockroachdb/cockroach/pkg/ccl/storageccl" "github.com/cockroachdb/cockroach/pkg/cloud" @@ -457,7 +458,7 @@ func (rd *restoreDataProcessor) processRestoreSpanEntry( iter := sst.iter defer sst.cleanup() - elidedPrefix, err := elidedPrefix(entry.Span.Key, sst.entry.ElidedPrefix) + elidedPrefix, err := backupsink.ElidedPrefix(entry.Span.Key, sst.entry.ElidedPrefix) if err != nil { return summary, err } diff --git a/pkg/backup/restore_online.go b/pkg/backup/restore_online.go index a73fd9da5331..85093685bc87 100644 --- a/pkg/backup/restore_online.go +++ b/pkg/backup/restore_online.go @@ -13,6 +13,7 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/backup/backuppb" + "github.com/cockroachdb/cockroach/pkg/backup/backupsink" "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" @@ -202,12 +203,12 @@ func sendAddRemoteSSTs( } func assertCommonPrefix(span roachpb.Span, elidedPrefixType execinfrapb.ElidePrefix) error { - syntheticPrefix, err := elidedPrefix(span.Key, elidedPrefixType) + syntheticPrefix, err := backupsink.ElidedPrefix(span.Key, elidedPrefixType) if err != nil { return err } - endKeyPrefix, err := elidedPrefix(span.EndKey, elidedPrefixType) + endKeyPrefix, err := backupsink.ElidedPrefix(span.EndKey, elidedPrefixType) if err != nil { return err } @@ -372,7 +373,7 @@ func sendRemoteAddSSTable( if fileSize == 0 { fileSize = 16 << 20 } - syntheticPrefix, err := elidedPrefix(file.BackupFileEntrySpan.Key, elidedPrefixType) + syntheticPrefix, err := backupsink.ElidedPrefix(file.BackupFileEntrySpan.Key, elidedPrefixType) if err != nil { return err }
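Editor's note, not part of the patch: a minimal sketch of what the newly exported backupsink.ElidedPrefix returns under each ElidePrefix mode, the same helper the restore paths above now call. Only ElidedPrefix and the ElidePrefix enum values come from this diff; the package name, the use of keys.SystemSQLCodec, and the example table ID are illustrative assumptions.

package backupsinkexample

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/backup/backupsink"
	"github.com/cockroachdb/cockroach/pkg/keys"
	"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
)

// printElidedPrefixes prints the prefix each elide mode strips from a key:
// ElidePrefix_Tenant returns the portion removed by keys.StripTenantPrefix,
// ElidePrefix_TenantAndTable the portion removed by keys.StripTablePrefix,
// and any other mode (e.g. ElidePrefix_None) yields nil, i.e. nothing elided.
func printElidedPrefixes() error {
	key := keys.SystemSQLCodec.TablePrefix(104) // hypothetical table ID

	for _, mode := range []execinfrapb.ElidePrefix{
		execinfrapb.ElidePrefix_None,
		execinfrapb.ElidePrefix_Tenant,
		execinfrapb.ElidePrefix_TenantAndTable,
	} {
		prefix, err := backupsink.ElidedPrefix(key, mode)
		if err != nil {
			return err
		}
		fmt.Printf("%v -> elided prefix %q\n", mode, prefix)
	}
	return nil
}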