From e3a1093f137e3797cb4696baf29294a2fb9af859 Mon Sep 17 00:00:00 2001 From: Michael Butler Date: Mon, 21 Oct 2024 10:49:31 -0400 Subject: [PATCH 1/3] backupccl: remove deprecated restore checkpointing method The new checkpointing logic has been default since 23.1, so it is safe delete the old checkpointing logic-- we do not expect 25.1 to resume a restore that began pre 23.1. Epic: none Release note: non --- pkg/ccl/backupccl/bench_covering_test.go | 3 +- .../generative_split_and_scatter_processor.go | 4 +- ...rative_split_and_scatter_processor_test.go | 1 - pkg/ccl/backupccl/restore_job.go | 15 +- .../backupccl/restore_processor_planning.go | 6 +- pkg/ccl/backupccl/restore_progress.go | 180 ++++++------------ pkg/ccl/backupccl/restore_progress_test.go | 2 +- pkg/ccl/backupccl/restore_span_covering.go | 33 +--- .../backupccl/restore_span_covering_test.go | 50 ++--- pkg/sql/execinfrapb/processors_bulk_io.proto | 5 +- 10 files changed, 80 insertions(+), 219 deletions(-) diff --git a/pkg/ccl/backupccl/bench_covering_test.go b/pkg/ccl/backupccl/bench_covering_test.go index c99175b3c0d0..c9ca1a064aa5 100644 --- a/pkg/ccl/backupccl/bench_covering_test.go +++ b/pkg/ccl/backupccl/bench_covering_test.go @@ -91,11 +91,10 @@ func BenchmarkRestoreEntryCover(b *testing.B) { filter, err := makeSpanCoveringFilter( backups[numBackups-1].Spans, []jobspb.RestoreProgress_FrontierEntry{}, - nil, introducedSpanFrontier, 0, defaultMaxFileCount, - false) + ) require.NoError(b, err) defer filter.close() diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go index 22ecb66d4ce5..c393c050d757 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -477,11 +477,9 @@ func runGenerativeSplitAndScatter( filter, err := makeSpanCoveringFilter( spec.Spans, spec.CheckpointedSpans, - spec.HighWater, introducedSpanFrontier, spec.TargetSize, - spec.MaxFileCount, - spec.UseFrontierCheckpointing) + spec.MaxFileCount) if err != nil { return errors.Wrap(err, "failed to make span covering filter") } diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go index f751c52efdf2..21957e83d2e5 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go @@ -267,7 +267,6 @@ func makeTestingGenerativeSplitAndScatterSpec( EndTime: hlc.Timestamp{}, Spans: requiredSpans, BackupLocalityInfo: nil, - HighWater: nil, UserProto: "", ChunkSize: 1, TargetSize: 1, diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 4fc48a8fd807..c8efb577ee62 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -314,15 +314,11 @@ func restore( return emptyRowCount, err } - ver := job.Payload().CreationClusterVersion - // TODO(radu,msbutler,stevendanna): we might be able to remove this now? 
- on231 := ver.Major > 23 || (ver.Major == 23 && ver.Minor >= 1) restoreCheckpoint := job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint requiredSpans := dataToRestore.getSpans() progressTracker, err := makeProgressTracker( requiredSpans, restoreCheckpoint, - on231, restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV), endTime) if err != nil { @@ -353,11 +349,9 @@ func restore( return makeSpanCoveringFilter( requiredSpans, restoreCheckpoint, - job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater, introducedSpanFrontier, targetSize, - maxFileCount, - progressTracker.useFrontier) + maxFileCount) }(); err != nil { return roachpb.RowCount{}, err } @@ -436,13 +430,6 @@ func restore( } tasks = append(tasks, jobProgressLoop) } - if !progressTracker.useFrontier { - // This goroutine feeds the deprecated high water mark variant of the - // generativeCheckpointLoop. - tasks = append(tasks, func(ctx context.Context) error { - return genSpan(ctx, progressTracker.inFlightSpanFeeder) - }) - } progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) if !details.ExperimentalOnline { diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 0c84e70db7e0..69325f5d73bb 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -173,21 +173,17 @@ func distRestore( EndTime: md.restoreTime, Spans: md.dataToRestore.getSpans(), BackupLocalityInfo: md.backupLocalityInfo, - HighWater: md.spanFilter.highWaterMark, UserProto: execCtx.User().EncodeProto(), TargetSize: md.spanFilter.targetSize, MaxFileCount: int64(md.spanFilter.maxFileCount), ChunkSize: int64(chunkSize), NumEntries: int64(md.numImportSpans), - UseFrontierCheckpointing: md.spanFilter.useFrontierCheckpointing, NumNodes: int64(numNodes), JobID: int64(md.jobID), SQLInstanceIDs: instanceIDs, ExclusiveFileSpanComparison: md.exclusiveEndKeys, } - if md.spanFilter.useFrontierCheckpointing { - spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0) - } + spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0) splitAndScatterProc := physicalplan.Processor{ SQLInstanceID: execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID(), diff --git a/pkg/ccl/backupccl/restore_progress.go b/pkg/ccl/backupccl/restore_progress.go index df67d7c0819c..a9bb9d42bb8c 100644 --- a/pkg/ccl/backupccl/restore_progress.go +++ b/pkg/ccl/backupccl/restore_progress.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/errors" pbtypes "github.com/gogo/protobuf/types" ) @@ -50,26 +49,7 @@ type progressTracker struct { // res tracks the amount of data that has been ingested. res roachpb.RowCount - - // Note that the fields below are used for the deprecated high watermark progress - // tracker. - // highWaterMark represents the index into the requestsCompleted map. - highWaterMark int64 - ceiling int64 - - // As part of job progress tracking, inFlightImportSpans tracks all the - // spans that have been generated are being processed by the processors in - // distRestore. requestsCompleleted tracks the spans from - // inFlightImportSpans that have completed its processing. 
Once all spans up - // to index N have been processed (and appear in requestsCompleted), then - // any spans with index < N will be removed from both inFlightImportSpans - // and requestsCompleted maps. - inFlightImportSpans map[int64]roachpb.Span - requestsCompleted map[int64]bool } - useFrontier bool - inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry - // endTime is the restore as of timestamp. This can be empty, and an empty timestamp // indicates a restore of the latest revision. endTime hlc.Timestamp @@ -78,7 +58,6 @@ type progressTracker struct { func makeProgressTracker( requiredSpans roachpb.Spans, persistedSpans []jobspb.RestoreProgress_FrontierEntry, - useFrontier bool, maxBytes int64, endTime hlc.Timestamp, ) (*progressTracker, error) { @@ -87,32 +66,20 @@ func makeProgressTracker( checkpointFrontier spanUtils.Frontier err error nextRequiredSpanKey map[string]roachpb.Key - inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry ) - if useFrontier { - checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans) - if err != nil { - return nil, err - } - nextRequiredSpanKey = make(map[string]roachpb.Key) - for i := 0; i < len(requiredSpans)-1; i++ { - nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key - } - - } else { - inFlightSpanFeeder = make(chan execinfrapb.RestoreSpanEntry, 1000) + checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans) + if err != nil { + return nil, err + } + nextRequiredSpanKey = make(map[string]roachpb.Key) + for i := 0; i < len(requiredSpans)-1; i++ { + nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key } pt := &progressTracker{} pt.mu.checkpointFrontier = checkpointFrontier - pt.mu.highWaterMark = -1 - pt.mu.ceiling = 0 - pt.mu.inFlightImportSpans = make(map[int64]roachpb.Span) - pt.mu.requestsCompleted = make(map[int64]bool) pt.nextRequiredSpanKey = nextRequiredSpanKey pt.maxBytes = maxBytes - pt.useFrontier = useFrontier - pt.inFlightSpanFeeder = inFlightSpanFeeder pt.endTime = endTime return pt, nil } @@ -182,16 +149,10 @@ func (pt *progressTracker) updateJobCallback( func() { pt.mu.Lock() defer pt.mu.Unlock() - if pt.useFrontier { - // TODO (msbutler): this requires iterating over every span in the frontier, - // and rewriting every completed required span to disk. - // We may want to be more intelligent about this. - d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes) - } else { - if pt.mu.highWaterMark >= 0 { - d.Restore.HighWater = pt.mu.inFlightImportSpans[pt.mu.highWaterMark].Key - } - } + // TODO (msbutler): this requires iterating over every span in the frontier, + // and rewriting every completed required span to disk. + // We may want to be more intelligent about this. + d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes) }() default: log.Errorf(progressedCtx, "job payload had unexpected type %T", d) @@ -224,83 +185,48 @@ func (pt *progressTracker) ingestUpdate( } pt.mu.res.Add(progDetails.Summary) - if pt.useFrontier { - updateSpan := progDetails.DataSpan.Clone() - // If the completedSpan has the same end key as a requiredSpan_i, forward - // the frontier for the span [completedSpan_startKey, - // requiredSpan_i+1_startKey]. This trick ensures the span frontier will - // contain a single entry when the restore completes. Recall that requiredSpans are - // disjoint, and a spanFrontier never merges disjoint spans. 
So, without - // this trick, the spanFrontier will have O(requiredSpans) entries when the - // restore completes. This trick ensures all spans persisted to the frontier are adjacent, - // and consequently, will eventually merge. - // - // Here's a visual example: - // - this restore has two required spans: [a,d) and [e,h). - // - the restore span entry [c,d) just completed, implying the frontier logically looks like: - // - // tC| x---o - // t0| - // keys--a---b---c---d---e---f---g---h-> - // - // r-spans: |---span1---| |---span2---| - // - // - since [c,d)'s endkey equals the required span (a,d]'s endkey, - // also update the gap between required span 1 and 2 in the frontier: - // - // tC| x-------o - // t0| - // keys--a---b---c---d---e---f---g---h-> - // - // r-spans: |---span1---| |---span2---| - // - // - this will ensure that when all subspans in required spans 1 and 2 complete, - // the checkpoint frontier has one span: - // - // tC| x---------------------------o - // t0| - // keys--a---b---c---d---e---f---g---h-> - // - // r-spans: |---span1---| |---span2---| - if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok { - updateSpan.EndKey = newEndKey - } - if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil { - return false, err - } - } else { - idx := progDetails.ProgressIdx - - if idx >= pt.mu.ceiling { - for i := pt.mu.ceiling; i <= idx; i++ { - importSpan, ok := <-pt.inFlightSpanFeeder - if !ok { - // The channel has been closed, there is nothing left to do. - log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") - return true, nil - } - pt.mu.inFlightImportSpans[i] = importSpan.Span - } - pt.mu.ceiling = idx + 1 - } - - if sp, ok := pt.mu.inFlightImportSpans[idx]; ok { - // Assert that we're actually marking the correct span done. See #23977. - if !sp.Key.Equal(progDetails.DataSpan.Key) { - return false, errors.Newf("request %d for span %v does not match import span for same idx: %v", - idx, progDetails.DataSpan, sp, - ) - } - pt.mu.requestsCompleted[idx] = true - prevHighWater := pt.mu.highWaterMark - for j := pt.mu.highWaterMark + 1; j < pt.mu.ceiling && pt.mu.requestsCompleted[j]; j++ { - pt.mu.highWaterMark = j - } - for j := prevHighWater; j < pt.mu.highWaterMark; j++ { - delete(pt.mu.requestsCompleted, j) - delete(pt.mu.inFlightImportSpans, j) - } - } + updateSpan := progDetails.DataSpan.Clone() + // If the completedSpan has the same end key as a requiredSpan_i, forward + // the frontier for the span [completedSpan_startKey, + // requiredSpan_i+1_startKey]. This trick ensures the span frontier will + // contain a single entry when the restore completes. Recall that requiredSpans are + // disjoint, and a spanFrontier never merges disjoint spans. So, without + // this trick, the spanFrontier will have O(requiredSpans) entries when the + // restore completes. This trick ensures all spans persisted to the frontier are adjacent, + // and consequently, will eventually merge. + // + // Here's a visual example: + // - this restore has two required spans: [a,d) and [e,h). 
+ // - the restore span entry [c,d) just completed, implying the frontier logically looks like: + // + // tC| x---o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + // + // - since [c,d)'s endkey equals the required span (a,d]'s endkey, + // also update the gap between required span 1 and 2 in the frontier: + // + // tC| x-------o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + // + // - this will ensure that when all subspans in required spans 1 and 2 complete, + // the checkpoint frontier has one span: + // + // tC| x---------------------------o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok { + updateSpan.EndKey = newEndKey + } + if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil { + return false, err } return true, nil } diff --git a/pkg/ccl/backupccl/restore_progress_test.go b/pkg/ccl/backupccl/restore_progress_test.go index 7bb1fcacf731..2e8ebdd92dba 100644 --- a/pkg/ccl/backupccl/restore_progress_test.go +++ b/pkg/ccl/backupccl/restore_progress_test.go @@ -82,7 +82,7 @@ func TestProgressTracker(t *testing.T) { }, } { restoreTime := hlc.Timestamp{} - pt, err := makeProgressTracker(requiredSpans, persistedSpans, true, 0, restoreTime) + pt, err := makeProgressTracker(requiredSpans, persistedSpans, 0, restoreTime) require.NoError(t, err, "step %d", i) done, err := pt.ingestUpdate(ctx, mockUpdate(step.update, step.completeUpTo)) diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index a1a53689ebac..8bdde3b44754 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -140,22 +140,18 @@ func createIntroducedSpanFrontier( // spanCoveringFilter holds metadata that filters which backups and required spans are used to // populate a restoreSpanEntry type spanCoveringFilter struct { - checkpointFrontier spanUtils.Frontier - highWaterMark roachpb.Key - introducedSpanFrontier spanUtils.Frontier - useFrontierCheckpointing bool - targetSize int64 - maxFileCount int + checkpointFrontier spanUtils.Frontier + introducedSpanFrontier spanUtils.Frontier + targetSize int64 + maxFileCount int } func makeSpanCoveringFilter( requiredSpans roachpb.Spans, checkpointedSpans []jobspb.RestoreProgress_FrontierEntry, - highWater roachpb.Key, introducedSpanFrontier spanUtils.Frontier, targetSize int64, maxFileCount int64, - useFrontierCheckpointing bool, ) (spanCoveringFilter, error) { f, err := loadCheckpointFrontier(requiredSpans, checkpointedSpans) if err != nil { @@ -169,12 +165,10 @@ func makeSpanCoveringFilter( maxFileCount = defaultMaxFileCount } sh := spanCoveringFilter{ - introducedSpanFrontier: introducedSpanFrontier, - targetSize: targetSize, - maxFileCount: int(maxFileCount), - highWaterMark: highWater, - useFrontierCheckpointing: useFrontierCheckpointing, - checkpointFrontier: f, + introducedSpanFrontier: introducedSpanFrontier, + targetSize: targetSize, + maxFileCount: int(maxFileCount), + checkpointFrontier: f, } return sh, nil } @@ -182,16 +176,7 @@ func makeSpanCoveringFilter( // filterCompleted returns the subspans of the requiredSpan that still need to be // restored. 
func (f spanCoveringFilter) filterCompleted(requiredSpan roachpb.Span) roachpb.Spans { - if f.useFrontierCheckpointing { - return f.findToDoSpans(requiredSpan) - } - if requiredSpan.EndKey.Compare(f.highWaterMark) <= 0 { - return roachpb.Spans{} - } - if requiredSpan.Key.Compare(f.highWaterMark) < 0 { - requiredSpan.Key = f.highWaterMark - } - return []roachpb.Span{requiredSpan} + return f.findToDoSpans(requiredSpan) } // findToDoSpans returns the sub spans within the required span that have not completed. diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index ae80f63bea49..9f26892d3a94 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -268,7 +268,6 @@ func makeImportSpans( spans []roachpb.Span, backups []backuppb.BackupManifest, layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, - highWaterMark []byte, targetSize int64, introducedSpanFrontier spanUtils.Frontier, completedSpans []jobspb.RestoreProgress_FrontierEntry, @@ -286,11 +285,10 @@ func makeImportSpans( filter, err := makeSpanCoveringFilter( spans, completedSpans, - highWaterMark, introducedSpanFrontier, targetSize, defaultMaxFileCount, - highWaterMark == nil) + ) if err != nil { return nil, err } @@ -417,7 +415,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans) @@ -439,7 +436,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, 2<<20, emptySpanFrontier, emptyCompletedSpans) @@ -463,7 +459,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, noSpanTargetSize, introducedSpanFrontier, emptyCompletedSpans) @@ -492,7 +487,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, noSpanTargetSize, emptySpanFrontier, persistFrontier(frontier, 0)) @@ -520,7 +514,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans) @@ -689,10 +682,9 @@ func TestCheckpointFilter(t *testing.T) { []roachpb.Span{requiredSpan}, checkpointedSpans, nil, - nil, 0, defaultMaxFileCount, - true) + ) require.NoError(t, err) defer f.close() require.Equal(t, tc.expectedToDoSpans, f.filterCompleted(requiredSpan)) @@ -832,7 +824,6 @@ func runTestRestoreEntryCoverForSpanAndFileCounts( backups[numBackups-1].Spans, backups, layerToIterFactory, - nil, target<<20, introducedSpanFrontier, []jobspb.RestoreProgress_FrontierEntry{}) @@ -845,32 +836,23 @@ func runTestRestoreEntryCoverForSpanAndFileCounts( if len(cover) > 0 { for n := 1; n <= 5; n++ { var completedSpans []roachpb.Span - var highWater []byte var frontierEntries []jobspb.RestoreProgress_FrontierEntry // Randomly choose to use frontier checkpointing instead of // explicitly testing both forms to avoid creating an exponential // number of tests. 
- useFrontierCheckpointing := rand.Intn(2) == 0 - if useFrontierCheckpointing { - completedSpans = getRandomCompletedSpans(cover, n) - for _, sp := range completedSpans { - frontierEntries = append(frontierEntries, jobspb.RestoreProgress_FrontierEntry{ - Span: sp, - Timestamp: completedSpanTime, - }) - } - } else { - idx := r.Intn(len(cover)) - highWater = cover[idx].Span.EndKey + completedSpans = getRandomCompletedSpans(cover, n) + for _, sp := range completedSpans { + frontierEntries = append(frontierEntries, jobspb.RestoreProgress_FrontierEntry{ + Span: sp, + Timestamp: completedSpanTime, + }) } - resumeCover, err := makeImportSpans( ctx, backups[numBackups-1].Spans, backups, layerToIterFactory, - highWater, target<<20, introducedSpanFrontier, frontierEntries) @@ -880,21 +862,11 @@ func runTestRestoreEntryCoverForSpanAndFileCounts( // completed spans from the original required spans. var resumedRequiredSpans roachpb.Spans for _, origReq := range backups[numBackups-1].Spans { - var resumeReq []roachpb.Span - if useFrontierCheckpointing { - resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, completedSpans) - } else { - resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, []roachpb.Span{{Key: cover[0].Span.Key, EndKey: highWater}}) - } + resumeReq := roachpb.SubtractSpans([]roachpb.Span{origReq}, completedSpans) resumedRequiredSpans = append(resumedRequiredSpans, resumeReq...) } - var errorMsg string - if useFrontierCheckpointing { - errorMsg = fmt.Sprintf("completed spans in frontier: %v", completedSpans) - } else { - errorMsg = fmt.Sprintf("highwater: %v", highWater) - } + errorMsg := fmt.Sprintf("completed spans in frontier: %v", completedSpans) require.NoError(t, checkRestoreCovering(ctx, backups, resumedRequiredSpans, resumeCover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage), @@ -1033,7 +1005,7 @@ func TestRestoreEntryCoverZeroSizeFiles(t *testing.T) { expectedCover = tt.expectedCoverGenerated } - cover, err := makeImportSpans(ctx, tt.requiredSpans, backups, layerToIterFactory, nil, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans) + cover, err := makeImportSpans(ctx, tt.requiredSpans, backups, layerToIterFactory, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans) require.NoError(t, err) simpleCover := make([]simpleRestoreSpanEntry, len(cover)) diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 41c51217ca4c..4c2477639b2f 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -438,8 +438,7 @@ message GenerativeSplitAndScatterSpec { // Spans is the required spans in the restore. repeated roachpb.Span spans = 10 [(gogoproto.nullable) = false]; repeated jobs.jobspb.RestoreDetails.BackupLocalityInfo backup_locality_info = 11 [(gogoproto.nullable) = false]; - // HighWater is the high watermark of the previous run of restore. - optional bytes high_water = 12; + reserved 12; // User who initiated the restore. optional string user_proto = 13 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"]; // ChunkSize is the number of import spans per chunk. @@ -451,7 +450,7 @@ message GenerativeSplitAndScatterSpec { // NumNodes is the number of nodes available for dist restore. 
optional int64 num_nodes = 17[(gogoproto.nullable) = false]; optional int64 job_id = 18 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"]; - optional bool use_frontier_checkpointing = 20 [(gogoproto.nullable) = false]; + reserved 20 ; repeated jobs.jobspb.RestoreProgress.FrontierEntry checkpointed_spans = 21 [(gogoproto.nullable) = false]; // ExclusiveFileSpanComparison is true if the backup can safely use // exclusive file span comparison. From 121da383fe4bcc54837dc474a351658de69b2bc9 Mon Sep 17 00:00:00 2001 From: Michael Erickson Date: Thu, 17 Oct 2024 14:01:45 -0700 Subject: [PATCH 2/3] sql/row: fix multi-range reads from PCR standby Reads from tables with external row data (i.e. reads from a PCR standby cluster) need to use the fixed timestamp specified by the external row data. This timestamp might be different from the transaction timestamp, so we were explicitly setting BatchRequest.Timestamp in kv_batch_fetcher. The KV API only allows BatchRequest.Timestamp to be set for non-transactional requests (i.e. requests sent with a NonTransactionalSender, which is a CrossRangeTxnWrapperSender in this case). We were using a NonTransactionalSender, but this had two problems: 1. CrossRangeTxnWrapperSender in turn sends the BatchRequest with a transactional sender, which again does not allow BatchRequest.Timestamp to be set. 2. CrossRangeTxnWrapperSender uses `kv.(*Txn).CommitInBatch`, which does not provide the 1-to-1 request-response guarantee required by txnKVFetcher. It is `kv.(*Txn).Send` which provides this guarantee. Because of these two problems, whenever the txnKVFetcher would send a multi-range-spanning BatchRequest to CrossRangeTxnWrapperSender, it would either fail with a "transactional request must not set batch timestamp" error or would return an unexpected number of responses, violating the txnKVFetcher's assumed mapping from request to response. To fix both these problems, instead of using a NonTransactionalSender, change the txnKVFetcher to open a new root transaction with the correct fixed timestamp, and then use txn.Send. Fixes: #132608 Release note: None --- pkg/ccl/testccl/sqlccl/BUILD.bazel | 3 + pkg/ccl/testccl/sqlccl/standby_read_test.go | 110 ++++++++++++++++++++ pkg/sql/row/kv_batch_fetcher.go | 27 ++++- 3 files changed, 137 insertions(+), 3 deletions(-) create mode 100644 pkg/ccl/testccl/sqlccl/standby_read_test.go diff --git a/pkg/ccl/testccl/sqlccl/BUILD.bazel b/pkg/ccl/testccl/sqlccl/BUILD.bazel index edbfcab19cf9..bb12483f8086 100644 --- a/pkg/ccl/testccl/sqlccl/BUILD.bazel +++ b/pkg/ccl/testccl/sqlccl/BUILD.bazel @@ -11,6 +11,7 @@ go_test( "session_revival_test.go", "show_create_test.go", "show_transfer_state_test.go", + "standby_read_test.go", "temp_table_clean_test.go", "tenant_gc_test.go", ], @@ -39,6 +40,8 @@ go_test( "//pkg/settings/cluster", "//pkg/spanconfig", "//pkg/sql", + "//pkg/sql/catalog/lease", + "//pkg/sql/catalog/replication", "//pkg/sql/gcjob", "//pkg/sql/isql", "//pkg/sql/lexbase", diff --git a/pkg/ccl/testccl/sqlccl/standby_read_test.go b/pkg/ccl/testccl/sqlccl/standby_read_test.go new file mode 100644 index 000000000000..943621cd6ce0 --- /dev/null +++ b/pkg/ccl/testccl/sqlccl/standby_read_test.go @@ -0,0 +1,110 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. 
+ +package sqlccl + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/replication" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestStandbyRead(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testcases := []struct { + standby bool + stmt string + expected [][]string + }{ + {stmt: `CREATE TABLE abc (a INT PRIMARY KEY, b INT, c JSONB)`}, + {stmt: `INSERT INTO abc VALUES (1, 10, '[100]'), (3, 30, '[300]'), (5, 50, '[500]')`}, + {stmt: `ALTER TABLE abc SPLIT AT VALUES (2), (4)`}, + {stmt: `SELECT count(*) FROM [SHOW TABLES]`, expected: [][]string{{"1"}}}, + {stmt: `SELECT count(*) FROM abc`, expected: [][]string{{"3"}}}, + {standby: true, stmt: `SELECT count(*) FROM [SHOW TABLES]`, expected: [][]string{{"1"}}}, + {standby: true, stmt: `SELECT count(*) FROM abc`, expected: [][]string{{"3"}}}, + } + + ctx := context.Background() + tc := serverutils.StartCluster(t, 3, /* numNodes */ + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + }, + }) + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0) + + _, srcDB, err := ts.TenantController().StartSharedProcessTenant(ctx, + base.TestSharedProcessTenantArgs{ + TenantID: serverutils.TestTenantID(), + TenantName: "src", + UseDatabase: "defaultdb", + }, + ) + require.NoError(t, err) + dstTenant, dstDB, err := ts.TenantController().StartSharedProcessTenant(ctx, + base.TestSharedProcessTenantArgs{ + TenantID: serverutils.TestTenantID2(), + TenantName: "dst", + UseDatabase: "defaultdb", + }, + ) + require.NoError(t, err) + + srcRunner := sqlutils.MakeSQLRunner(srcDB) + dstRunner := sqlutils.MakeSQLRunner(dstDB) + dstInternal := dstTenant.InternalDB().(*sql.InternalDB) + + dstRunner.Exec(t, `SET CLUSTER SETTING sql.defaults.distsql = always`) + dstRunner.Exec(t, `SET distsql = always`) + + waitForReplication := func() { + now := ts.Clock().Now() + err := replication.SetupOrAdvanceStandbyReaderCatalog( + ctx, serverutils.TestTenantID(), now, dstInternal, dstTenant.ClusterSettings(), + ) + if err != nil { + t.Fatal(err) + } + now = ts.Clock().Now() + lm := dstTenant.LeaseManager().(*lease.Manager) + testutils.SucceedsSoon(t, func() error { + if lm.GetSafeReplicationTS().Less(now) { + return errors.AssertionFailedf("waiting for descriptor close timestamp to catch up") + } + return nil + }) + } + + for _, tc := range testcases { + var runner *sqlutils.SQLRunner + if tc.standby { + waitForReplication() + runner = dstRunner + } else { + runner = srcRunner + } + if tc.expected == nil { + runner.Exec(t, tc.stmt) + } else { + runner.CheckQueryResultsRetry(t, tc.stmt, tc.expected) + } + } +} diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 45298a1465a2..2532ea42c7c0 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -298,7 +298,6 @@ func makeExternalSpanSendFunc( ext *fetchpb.IndexFetchSpec_ExternalRowData, db *kv.DB, batchRequestsIssued *int64, ) sendFunc { return 
func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { - ba.Timestamp = ext.AsOf for _, req := range ba.Requests { // We only allow external row data for a few known types of request. switch r := req.GetInner().(type) { @@ -310,12 +309,34 @@ func makeExternalSpanSendFunc( } } log.VEventf(ctx, 2, "kv external fetcher: sending a batch with %d requests", len(ba.Requests)) - res, err := db.NonTransactionalSender().Send(ctx, ba) + + // Open a new transaction with fixed timestamp set to the external + // timestamp. We must do this with txn.Send rather than using + // db.NonTransactionalSender to get the 1-to-1 request-response guarantee + // required by txnKVFetcher. + // TODO(michae2): Explore whether we should keep this transaction open for + // the duration of the surrounding transaction. + var res *kvpb.BatchResponse + err := db.TxnWithAdmissionControl( + ctx, ba.AdmissionHeader.Source, admissionpb.WorkPriority(ba.AdmissionHeader.Priority), + kv.SteppingDisabled, + func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetFixedTimestamp(ctx, ext.AsOf); err != nil { + return err + } + var err *kvpb.Error + res, err = txn.Send(ctx, ba) + if err != nil { + return err.GoError() + } + return nil + }) + // Note that in some code paths there is no concurrency when using the // sendFunc, but we choose to unconditionally use atomics here since its // overhead should be negligible in the grand scheme of things anyway. atomic.AddInt64(batchRequestsIssued, 1) - return res, err.GoError() + return res, err } } From 09a0758bc13b8e67d9fd15338265c443aed83317 Mon Sep 17 00:00:00 2001 From: Nathan VanBenschoten Date: Mon, 21 Oct 2024 01:02:41 -0400 Subject: [PATCH 3/3] kv: deflake TestMergeQueue, control merge queue directly Fixes #132831. This commit deflakes TestMergeQueue by changing how it disables the merge queue. Previously, it attempted to disable the merge queue by disabling the replica scanner. This was insufficient to prevent the merge queue from running at unexpected times, because there are other reactive reasons for the merge queue to run (e.g. gossip updates). This commit switches the test to directly setting the merge queue as active or inactive when needed. This is more reliable because it prevents unexpected background activity causing the merge queue to run at inopportune times. 
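
Concretely, instead of relying on disabled background scanning, the
verifyMergedSoon and verifyUnmergedSoon helpers now bracket their own
merge queue activity. Roughly, each helper does the following (a
sketch; see the change to verifyMergedSoon below for the full
version):

    store.SetMergeQueueActive(true)
    defer func() {
        store.SetMergeQueueActive(false)
        // Drain any merges that might already be queued so later
        // subtests start from a known-inactive merge queue.
        store.MustForceMergeScanAndProcess()
    }()
    testutils.SucceedsSoon(t, func() error {
        store.MustForceMergeScanAndProcess()
        // ... assert that the ranges have (or have not) merged, as
        // before.
        return nil
    })

This keeps the merge queue inactive except while a helper is actively
waiting on it, so no other trigger can run a merge between subtests.
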
Release note: None --- pkg/kv/kvserver/client_merge_test.go | 40 +++++++++++----------------- 1 file changed, 15 insertions(+), 25 deletions(-) diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 2dd80ea16f67..ae0b218488d4 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -3171,7 +3171,6 @@ func TestMergeQueueWithExternalFiles(t *testing.T) { store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID()) require.NoError(t, err) - store.SetMergeQueueActive(true) if skipExternal { verifyUnmergedSoon(t, store, lhsDesc.StartKey, rhsDesc.StartKey) } else { @@ -4293,6 +4292,11 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) { func verifyMergedSoon(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey) { t.Helper() + store.SetMergeQueueActive(true) + defer func() { + store.SetMergeQueueActive(false) + store.MustForceMergeScanAndProcess() // drain any merges that might already be queued + }() testutils.SucceedsSoon(t, func() error { store.MustForceMergeScanAndProcess() repl := store.LookupReplica(rhsStartKey) @@ -4310,6 +4314,11 @@ func verifyUnmergedSoon( t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey, ) { t.Helper() + store.SetMergeQueueActive(true) + defer func() { + store.SetMergeQueueActive(false) + store.MustForceMergeScanAndProcess() // drain any merges that might already be queued + }() testutils.SucceedsSoon(t, func() error { store.MustForceMergeScanAndProcess() repl := store.LookupReplica(rhsStartKey) @@ -4344,9 +4353,6 @@ func TestMergeQueue(t *testing.T) { WallClock: manualClock, DefaultZoneConfigOverride: &zoneConfig, }, - Store: &kvserver.StoreTestingKnobs{ - DisableScanner: true, - }, }, }, }) @@ -4354,11 +4360,6 @@ func TestMergeQueue(t *testing.T) { conf := zoneConfig.AsSpanConfig() store := tc.GetFirstStoreFromServer(t, 0) - // The cluster with manual replication disables the merge queue, - // so we need to re-enable. - _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.range_merge.queue.enabled = true`) - require.NoError(t, err) - store.SetMergeQueueActive(true) split := func(t *testing.T, key roachpb.Key, expirationTime hlc.Timestamp) { t.Helper() @@ -4429,6 +4430,7 @@ func TestMergeQueue(t *testing.T) { kvserver.SplitByLoadEnabled.Override(ctx, &s.ClusterSettings().SV, false) } + store.SetMergeQueueActive(false) // reset merge queue to inactive store.MustForceMergeScanAndProcess() // drain any merges that might already be queued split(t, rhsStartKey.AsRawKey(), hlc.Timestamp{} /* expirationTime */) } @@ -4818,7 +4820,8 @@ func TestMergeQueueSeesNonVoters(t *testing.T) { } var clusterArgs = base.TestClusterArgs{ - // We dont want the replicate queue mucking with our test, so disable it. + // We don't want the replicate queue mucking with our test, so disable it. + // This also disables the merge queue, until it is manually enabled. ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -4841,10 +4844,6 @@ func TestMergeQueueSeesNonVoters(t *testing.T) { store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(1) require.Nil(t, err) - // We're going to split the dummy range created above with an empty - // expiration time. Disable the merge queue before splitting so that the - // split ranges aren't immediately merged. 
- store.SetMergeQueueActive(false) leftDesc, rightDesc := splitDummyRangeInTestCluster( t, tc, dbName, "kv" /* tableName */, hlc.Timestamp{} /* splitExpirationTime */) @@ -4887,7 +4886,6 @@ func TestMergeQueueSeesNonVoters(t *testing.T) { tc.RemoveVotersOrFatal(t, rightDesc.StartKey.AsRawKey(), tc.Target(0)) rightDesc = tc.LookupRangeOrFatal(t, rightDesc.StartKey.AsRawKey()) - store.SetMergeQueueActive(true) verifyMergedSoon(t, store, leftDesc.StartKey, rightDesc.StartKey) }) } @@ -4909,7 +4907,8 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) { ctx := context.Background() var delaySnapshotTrap atomic.Value var clusterArgs = base.TestClusterArgs{ - // We dont want the replicate queue mucking with our test, so disable it. + // We don't want the replicate queue mucking with our test, so disable it. + // This also disables the merge queue, until it is manually enabled. ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -4945,17 +4944,9 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) { numNodes := 3 tc, _ := setupTestClusterWithDummyRange(t, clusterArgs, dbName, tableName, numNodes) defer tc.Stopper().Stop(ctx) - // We're controlling merge queue operation via - // `store.SetMergeQueueActive`, so enable the cluster setting here. - _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.range_merge.queue.enabled=true`) - require.NoError(t, err) store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(1) require.Nil(t, err) - // We're going to split the dummy range created above with an empty - // expiration time. Disable the merge queue before splitting so that the - // split ranges aren't immediately merged. - store.SetMergeQueueActive(false) leftDesc, rightDesc := splitDummyRangeInTestCluster( t, tc, dbName, tableName, hlc.Timestamp{}, /* splitExpirationTime */ ) @@ -4972,7 +4963,6 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) { time.Sleep(5 * time.Second) return nil }) - store.SetMergeQueueActive(true) verifyMergedSoon(t, store, leftDesc.StartKey, rightDesc.StartKey) }