diff --git a/pkg/ccl/backupccl/bench_covering_test.go b/pkg/ccl/backupccl/bench_covering_test.go index c99175b3c0d0..c9ca1a064aa5 100644 --- a/pkg/ccl/backupccl/bench_covering_test.go +++ b/pkg/ccl/backupccl/bench_covering_test.go @@ -91,11 +91,10 @@ func BenchmarkRestoreEntryCover(b *testing.B) { filter, err := makeSpanCoveringFilter( backups[numBackups-1].Spans, []jobspb.RestoreProgress_FrontierEntry{}, - nil, introducedSpanFrontier, 0, defaultMaxFileCount, - false) + ) require.NoError(b, err) defer filter.close() diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go index 22ecb66d4ce5..c393c050d757 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor.go @@ -477,11 +477,9 @@ func runGenerativeSplitAndScatter( filter, err := makeSpanCoveringFilter( spec.Spans, spec.CheckpointedSpans, - spec.HighWater, introducedSpanFrontier, spec.TargetSize, - spec.MaxFileCount, - spec.UseFrontierCheckpointing) + spec.MaxFileCount) if err != nil { return errors.Wrap(err, "failed to make span covering filter") } diff --git a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go index f751c52efdf2..21957e83d2e5 100644 --- a/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go +++ b/pkg/ccl/backupccl/generative_split_and_scatter_processor_test.go @@ -267,7 +267,6 @@ func makeTestingGenerativeSplitAndScatterSpec( EndTime: hlc.Timestamp{}, Spans: requiredSpans, BackupLocalityInfo: nil, - HighWater: nil, UserProto: "", ChunkSize: 1, TargetSize: 1, diff --git a/pkg/ccl/backupccl/restore_job.go b/pkg/ccl/backupccl/restore_job.go index 4fc48a8fd807..c8efb577ee62 100644 --- a/pkg/ccl/backupccl/restore_job.go +++ b/pkg/ccl/backupccl/restore_job.go @@ -314,15 +314,11 @@ func restore( return emptyRowCount, err } - ver := job.Payload().CreationClusterVersion - // TODO(radu,msbutler,stevendanna): we might be able to remove this now? - on231 := ver.Major > 23 || (ver.Major == 23 && ver.Minor >= 1) restoreCheckpoint := job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint requiredSpans := dataToRestore.getSpans() progressTracker, err := makeProgressTracker( requiredSpans, restoreCheckpoint, - on231, restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV), endTime) if err != nil { @@ -353,11 +349,9 @@ func restore( return makeSpanCoveringFilter( requiredSpans, restoreCheckpoint, - job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater, introducedSpanFrontier, targetSize, - maxFileCount, - progressTracker.useFrontier) + maxFileCount) }(); err != nil { return roachpb.RowCount{}, err } @@ -436,13 +430,6 @@ func restore( } tasks = append(tasks, jobProgressLoop) } - if !progressTracker.useFrontier { - // This goroutine feeds the deprecated high water mark variant of the - // generativeCheckpointLoop. - tasks = append(tasks, func(ctx context.Context) error { - return genSpan(ctx, progressTracker.inFlightSpanFeeder) - }) - } progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress) if !details.ExperimentalOnline { diff --git a/pkg/ccl/backupccl/restore_processor_planning.go b/pkg/ccl/backupccl/restore_processor_planning.go index 0c84e70db7e0..69325f5d73bb 100644 --- a/pkg/ccl/backupccl/restore_processor_planning.go +++ b/pkg/ccl/backupccl/restore_processor_planning.go @@ -173,21 +173,17 @@ func distRestore( EndTime: md.restoreTime, Spans: md.dataToRestore.getSpans(), BackupLocalityInfo: md.backupLocalityInfo, - HighWater: md.spanFilter.highWaterMark, UserProto: execCtx.User().EncodeProto(), TargetSize: md.spanFilter.targetSize, MaxFileCount: int64(md.spanFilter.maxFileCount), ChunkSize: int64(chunkSize), NumEntries: int64(md.numImportSpans), - UseFrontierCheckpointing: md.spanFilter.useFrontierCheckpointing, NumNodes: int64(numNodes), JobID: int64(md.jobID), SQLInstanceIDs: instanceIDs, ExclusiveFileSpanComparison: md.exclusiveEndKeys, } - if md.spanFilter.useFrontierCheckpointing { - spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0) - } + spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0) splitAndScatterProc := physicalplan.Processor{ SQLInstanceID: execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID(), diff --git a/pkg/ccl/backupccl/restore_progress.go b/pkg/ccl/backupccl/restore_progress.go index df67d7c0819c..a9bb9d42bb8c 100644 --- a/pkg/ccl/backupccl/restore_progress.go +++ b/pkg/ccl/backupccl/restore_progress.go @@ -17,7 +17,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" spanUtils "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/cockroach/pkg/util/syncutil" - "github.com/cockroachdb/errors" pbtypes "github.com/gogo/protobuf/types" ) @@ -50,26 +49,7 @@ type progressTracker struct { // res tracks the amount of data that has been ingested. res roachpb.RowCount - - // Note that the fields below are used for the deprecated high watermark progress - // tracker. - // highWaterMark represents the index into the requestsCompleted map. - highWaterMark int64 - ceiling int64 - - // As part of job progress tracking, inFlightImportSpans tracks all the - // spans that have been generated are being processed by the processors in - // distRestore. requestsCompleleted tracks the spans from - // inFlightImportSpans that have completed its processing. Once all spans up - // to index N have been processed (and appear in requestsCompleted), then - // any spans with index < N will be removed from both inFlightImportSpans - // and requestsCompleted maps. - inFlightImportSpans map[int64]roachpb.Span - requestsCompleted map[int64]bool } - useFrontier bool - inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry - // endTime is the restore as of timestamp. This can be empty, and an empty timestamp // indicates a restore of the latest revision. endTime hlc.Timestamp @@ -78,7 +58,6 @@ type progressTracker struct { func makeProgressTracker( requiredSpans roachpb.Spans, persistedSpans []jobspb.RestoreProgress_FrontierEntry, - useFrontier bool, maxBytes int64, endTime hlc.Timestamp, ) (*progressTracker, error) { @@ -87,32 +66,20 @@ func makeProgressTracker( checkpointFrontier spanUtils.Frontier err error nextRequiredSpanKey map[string]roachpb.Key - inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry ) - if useFrontier { - checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans) - if err != nil { - return nil, err - } - nextRequiredSpanKey = make(map[string]roachpb.Key) - for i := 0; i < len(requiredSpans)-1; i++ { - nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key - } - - } else { - inFlightSpanFeeder = make(chan execinfrapb.RestoreSpanEntry, 1000) + checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans) + if err != nil { + return nil, err + } + nextRequiredSpanKey = make(map[string]roachpb.Key) + for i := 0; i < len(requiredSpans)-1; i++ { + nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key } pt := &progressTracker{} pt.mu.checkpointFrontier = checkpointFrontier - pt.mu.highWaterMark = -1 - pt.mu.ceiling = 0 - pt.mu.inFlightImportSpans = make(map[int64]roachpb.Span) - pt.mu.requestsCompleted = make(map[int64]bool) pt.nextRequiredSpanKey = nextRequiredSpanKey pt.maxBytes = maxBytes - pt.useFrontier = useFrontier - pt.inFlightSpanFeeder = inFlightSpanFeeder pt.endTime = endTime return pt, nil } @@ -182,16 +149,10 @@ func (pt *progressTracker) updateJobCallback( func() { pt.mu.Lock() defer pt.mu.Unlock() - if pt.useFrontier { - // TODO (msbutler): this requires iterating over every span in the frontier, - // and rewriting every completed required span to disk. - // We may want to be more intelligent about this. - d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes) - } else { - if pt.mu.highWaterMark >= 0 { - d.Restore.HighWater = pt.mu.inFlightImportSpans[pt.mu.highWaterMark].Key - } - } + // TODO (msbutler): this requires iterating over every span in the frontier, + // and rewriting every completed required span to disk. + // We may want to be more intelligent about this. + d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes) }() default: log.Errorf(progressedCtx, "job payload had unexpected type %T", d) @@ -224,83 +185,48 @@ func (pt *progressTracker) ingestUpdate( } pt.mu.res.Add(progDetails.Summary) - if pt.useFrontier { - updateSpan := progDetails.DataSpan.Clone() - // If the completedSpan has the same end key as a requiredSpan_i, forward - // the frontier for the span [completedSpan_startKey, - // requiredSpan_i+1_startKey]. This trick ensures the span frontier will - // contain a single entry when the restore completes. Recall that requiredSpans are - // disjoint, and a spanFrontier never merges disjoint spans. So, without - // this trick, the spanFrontier will have O(requiredSpans) entries when the - // restore completes. This trick ensures all spans persisted to the frontier are adjacent, - // and consequently, will eventually merge. - // - // Here's a visual example: - // - this restore has two required spans: [a,d) and [e,h). - // - the restore span entry [c,d) just completed, implying the frontier logically looks like: - // - // tC| x---o - // t0| - // keys--a---b---c---d---e---f---g---h-> - // - // r-spans: |---span1---| |---span2---| - // - // - since [c,d)'s endkey equals the required span (a,d]'s endkey, - // also update the gap between required span 1 and 2 in the frontier: - // - // tC| x-------o - // t0| - // keys--a---b---c---d---e---f---g---h-> - // - // r-spans: |---span1---| |---span2---| - // - // - this will ensure that when all subspans in required spans 1 and 2 complete, - // the checkpoint frontier has one span: - // - // tC| x---------------------------o - // t0| - // keys--a---b---c---d---e---f---g---h-> - // - // r-spans: |---span1---| |---span2---| - if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok { - updateSpan.EndKey = newEndKey - } - if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil { - return false, err - } - } else { - idx := progDetails.ProgressIdx - - if idx >= pt.mu.ceiling { - for i := pt.mu.ceiling; i <= idx; i++ { - importSpan, ok := <-pt.inFlightSpanFeeder - if !ok { - // The channel has been closed, there is nothing left to do. - log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed") - return true, nil - } - pt.mu.inFlightImportSpans[i] = importSpan.Span - } - pt.mu.ceiling = idx + 1 - } - - if sp, ok := pt.mu.inFlightImportSpans[idx]; ok { - // Assert that we're actually marking the correct span done. See #23977. - if !sp.Key.Equal(progDetails.DataSpan.Key) { - return false, errors.Newf("request %d for span %v does not match import span for same idx: %v", - idx, progDetails.DataSpan, sp, - ) - } - pt.mu.requestsCompleted[idx] = true - prevHighWater := pt.mu.highWaterMark - for j := pt.mu.highWaterMark + 1; j < pt.mu.ceiling && pt.mu.requestsCompleted[j]; j++ { - pt.mu.highWaterMark = j - } - for j := prevHighWater; j < pt.mu.highWaterMark; j++ { - delete(pt.mu.requestsCompleted, j) - delete(pt.mu.inFlightImportSpans, j) - } - } + updateSpan := progDetails.DataSpan.Clone() + // If the completedSpan has the same end key as a requiredSpan_i, forward + // the frontier for the span [completedSpan_startKey, + // requiredSpan_i+1_startKey]. This trick ensures the span frontier will + // contain a single entry when the restore completes. Recall that requiredSpans are + // disjoint, and a spanFrontier never merges disjoint spans. So, without + // this trick, the spanFrontier will have O(requiredSpans) entries when the + // restore completes. This trick ensures all spans persisted to the frontier are adjacent, + // and consequently, will eventually merge. + // + // Here's a visual example: + // - this restore has two required spans: [a,d) and [e,h). + // - the restore span entry [c,d) just completed, implying the frontier logically looks like: + // + // tC| x---o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + // + // - since [c,d)'s endkey equals the required span (a,d]'s endkey, + // also update the gap between required span 1 and 2 in the frontier: + // + // tC| x-------o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + // + // - this will ensure that when all subspans in required spans 1 and 2 complete, + // the checkpoint frontier has one span: + // + // tC| x---------------------------o + // t0| + // keys--a---b---c---d---e---f---g---h-> + // + // r-spans: |---span1---| |---span2---| + if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok { + updateSpan.EndKey = newEndKey + } + if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil { + return false, err } return true, nil } diff --git a/pkg/ccl/backupccl/restore_progress_test.go b/pkg/ccl/backupccl/restore_progress_test.go index 7bb1fcacf731..2e8ebdd92dba 100644 --- a/pkg/ccl/backupccl/restore_progress_test.go +++ b/pkg/ccl/backupccl/restore_progress_test.go @@ -82,7 +82,7 @@ func TestProgressTracker(t *testing.T) { }, } { restoreTime := hlc.Timestamp{} - pt, err := makeProgressTracker(requiredSpans, persistedSpans, true, 0, restoreTime) + pt, err := makeProgressTracker(requiredSpans, persistedSpans, 0, restoreTime) require.NoError(t, err, "step %d", i) done, err := pt.ingestUpdate(ctx, mockUpdate(step.update, step.completeUpTo)) diff --git a/pkg/ccl/backupccl/restore_span_covering.go b/pkg/ccl/backupccl/restore_span_covering.go index a1a53689ebac..8bdde3b44754 100644 --- a/pkg/ccl/backupccl/restore_span_covering.go +++ b/pkg/ccl/backupccl/restore_span_covering.go @@ -140,22 +140,18 @@ func createIntroducedSpanFrontier( // spanCoveringFilter holds metadata that filters which backups and required spans are used to // populate a restoreSpanEntry type spanCoveringFilter struct { - checkpointFrontier spanUtils.Frontier - highWaterMark roachpb.Key - introducedSpanFrontier spanUtils.Frontier - useFrontierCheckpointing bool - targetSize int64 - maxFileCount int + checkpointFrontier spanUtils.Frontier + introducedSpanFrontier spanUtils.Frontier + targetSize int64 + maxFileCount int } func makeSpanCoveringFilter( requiredSpans roachpb.Spans, checkpointedSpans []jobspb.RestoreProgress_FrontierEntry, - highWater roachpb.Key, introducedSpanFrontier spanUtils.Frontier, targetSize int64, maxFileCount int64, - useFrontierCheckpointing bool, ) (spanCoveringFilter, error) { f, err := loadCheckpointFrontier(requiredSpans, checkpointedSpans) if err != nil { @@ -169,12 +165,10 @@ func makeSpanCoveringFilter( maxFileCount = defaultMaxFileCount } sh := spanCoveringFilter{ - introducedSpanFrontier: introducedSpanFrontier, - targetSize: targetSize, - maxFileCount: int(maxFileCount), - highWaterMark: highWater, - useFrontierCheckpointing: useFrontierCheckpointing, - checkpointFrontier: f, + introducedSpanFrontier: introducedSpanFrontier, + targetSize: targetSize, + maxFileCount: int(maxFileCount), + checkpointFrontier: f, } return sh, nil } @@ -182,16 +176,7 @@ func makeSpanCoveringFilter( // filterCompleted returns the subspans of the requiredSpan that still need to be // restored. func (f spanCoveringFilter) filterCompleted(requiredSpan roachpb.Span) roachpb.Spans { - if f.useFrontierCheckpointing { - return f.findToDoSpans(requiredSpan) - } - if requiredSpan.EndKey.Compare(f.highWaterMark) <= 0 { - return roachpb.Spans{} - } - if requiredSpan.Key.Compare(f.highWaterMark) < 0 { - requiredSpan.Key = f.highWaterMark - } - return []roachpb.Span{requiredSpan} + return f.findToDoSpans(requiredSpan) } // findToDoSpans returns the sub spans within the required span that have not completed. diff --git a/pkg/ccl/backupccl/restore_span_covering_test.go b/pkg/ccl/backupccl/restore_span_covering_test.go index ae80f63bea49..9f26892d3a94 100644 --- a/pkg/ccl/backupccl/restore_span_covering_test.go +++ b/pkg/ccl/backupccl/restore_span_covering_test.go @@ -268,7 +268,6 @@ func makeImportSpans( spans []roachpb.Span, backups []backuppb.BackupManifest, layerToIterFactory backupinfo.LayerToBackupManifestFileIterFactory, - highWaterMark []byte, targetSize int64, introducedSpanFrontier spanUtils.Frontier, completedSpans []jobspb.RestoreProgress_FrontierEntry, @@ -286,11 +285,10 @@ func makeImportSpans( filter, err := makeSpanCoveringFilter( spans, completedSpans, - highWaterMark, introducedSpanFrontier, targetSize, defaultMaxFileCount, - highWaterMark == nil) + ) if err != nil { return nil, err } @@ -417,7 +415,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans) @@ -439,7 +436,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, 2<<20, emptySpanFrontier, emptyCompletedSpans) @@ -463,7 +459,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, noSpanTargetSize, introducedSpanFrontier, emptyCompletedSpans) @@ -492,7 +487,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, noSpanTargetSize, emptySpanFrontier, persistFrontier(frontier, 0)) @@ -520,7 +514,6 @@ func TestRestoreEntryCoverExample(t *testing.T) { spans, backups, layerToIterFactory, - nil, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans) @@ -689,10 +682,9 @@ func TestCheckpointFilter(t *testing.T) { []roachpb.Span{requiredSpan}, checkpointedSpans, nil, - nil, 0, defaultMaxFileCount, - true) + ) require.NoError(t, err) defer f.close() require.Equal(t, tc.expectedToDoSpans, f.filterCompleted(requiredSpan)) @@ -832,7 +824,6 @@ func runTestRestoreEntryCoverForSpanAndFileCounts( backups[numBackups-1].Spans, backups, layerToIterFactory, - nil, target<<20, introducedSpanFrontier, []jobspb.RestoreProgress_FrontierEntry{}) @@ -845,32 +836,23 @@ func runTestRestoreEntryCoverForSpanAndFileCounts( if len(cover) > 0 { for n := 1; n <= 5; n++ { var completedSpans []roachpb.Span - var highWater []byte var frontierEntries []jobspb.RestoreProgress_FrontierEntry // Randomly choose to use frontier checkpointing instead of // explicitly testing both forms to avoid creating an exponential // number of tests. - useFrontierCheckpointing := rand.Intn(2) == 0 - if useFrontierCheckpointing { - completedSpans = getRandomCompletedSpans(cover, n) - for _, sp := range completedSpans { - frontierEntries = append(frontierEntries, jobspb.RestoreProgress_FrontierEntry{ - Span: sp, - Timestamp: completedSpanTime, - }) - } - } else { - idx := r.Intn(len(cover)) - highWater = cover[idx].Span.EndKey + completedSpans = getRandomCompletedSpans(cover, n) + for _, sp := range completedSpans { + frontierEntries = append(frontierEntries, jobspb.RestoreProgress_FrontierEntry{ + Span: sp, + Timestamp: completedSpanTime, + }) } - resumeCover, err := makeImportSpans( ctx, backups[numBackups-1].Spans, backups, layerToIterFactory, - highWater, target<<20, introducedSpanFrontier, frontierEntries) @@ -880,21 +862,11 @@ func runTestRestoreEntryCoverForSpanAndFileCounts( // completed spans from the original required spans. var resumedRequiredSpans roachpb.Spans for _, origReq := range backups[numBackups-1].Spans { - var resumeReq []roachpb.Span - if useFrontierCheckpointing { - resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, completedSpans) - } else { - resumeReq = roachpb.SubtractSpans([]roachpb.Span{origReq}, []roachpb.Span{{Key: cover[0].Span.Key, EndKey: highWater}}) - } + resumeReq := roachpb.SubtractSpans([]roachpb.Span{origReq}, completedSpans) resumedRequiredSpans = append(resumedRequiredSpans, resumeReq...) } - var errorMsg string - if useFrontierCheckpointing { - errorMsg = fmt.Sprintf("completed spans in frontier: %v", completedSpans) - } else { - errorMsg = fmt.Sprintf("highwater: %v", highWater) - } + errorMsg := fmt.Sprintf("completed spans in frontier: %v", completedSpans) require.NoError(t, checkRestoreCovering(ctx, backups, resumedRequiredSpans, resumeCover, target != noSpanTargetSize, execCfg.DistSQLSrv.ExternalStorage), @@ -1033,7 +1005,7 @@ func TestRestoreEntryCoverZeroSizeFiles(t *testing.T) { expectedCover = tt.expectedCoverGenerated } - cover, err := makeImportSpans(ctx, tt.requiredSpans, backups, layerToIterFactory, nil, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans) + cover, err := makeImportSpans(ctx, tt.requiredSpans, backups, layerToIterFactory, noSpanTargetSize, emptySpanFrontier, emptyCompletedSpans) require.NoError(t, err) simpleCover := make([]simpleRestoreSpanEntry, len(cover)) diff --git a/pkg/ccl/testccl/sqlccl/BUILD.bazel b/pkg/ccl/testccl/sqlccl/BUILD.bazel index edbfcab19cf9..bb12483f8086 100644 --- a/pkg/ccl/testccl/sqlccl/BUILD.bazel +++ b/pkg/ccl/testccl/sqlccl/BUILD.bazel @@ -11,6 +11,7 @@ go_test( "session_revival_test.go", "show_create_test.go", "show_transfer_state_test.go", + "standby_read_test.go", "temp_table_clean_test.go", "tenant_gc_test.go", ], @@ -39,6 +40,8 @@ go_test( "//pkg/settings/cluster", "//pkg/spanconfig", "//pkg/sql", + "//pkg/sql/catalog/lease", + "//pkg/sql/catalog/replication", "//pkg/sql/gcjob", "//pkg/sql/isql", "//pkg/sql/lexbase", diff --git a/pkg/ccl/testccl/sqlccl/standby_read_test.go b/pkg/ccl/testccl/sqlccl/standby_read_test.go new file mode 100644 index 000000000000..943621cd6ce0 --- /dev/null +++ b/pkg/ccl/testccl/sqlccl/standby_read_test.go @@ -0,0 +1,110 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package sqlccl + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/replication" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +func TestStandbyRead(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + testcases := []struct { + standby bool + stmt string + expected [][]string + }{ + {stmt: `CREATE TABLE abc (a INT PRIMARY KEY, b INT, c JSONB)`}, + {stmt: `INSERT INTO abc VALUES (1, 10, '[100]'), (3, 30, '[300]'), (5, 50, '[500]')`}, + {stmt: `ALTER TABLE abc SPLIT AT VALUES (2), (4)`}, + {stmt: `SELECT count(*) FROM [SHOW TABLES]`, expected: [][]string{{"1"}}}, + {stmt: `SELECT count(*) FROM abc`, expected: [][]string{{"3"}}}, + {standby: true, stmt: `SELECT count(*) FROM [SHOW TABLES]`, expected: [][]string{{"1"}}}, + {standby: true, stmt: `SELECT count(*) FROM abc`, expected: [][]string{{"3"}}}, + } + + ctx := context.Background() + tc := serverutils.StartCluster(t, 3, /* numNodes */ + base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + ServerArgs: base.TestServerArgs{ + DefaultTestTenant: base.TestControlsTenantsExplicitly, + }, + }) + defer tc.Stopper().Stop(ctx) + ts := tc.Server(0) + + _, srcDB, err := ts.TenantController().StartSharedProcessTenant(ctx, + base.TestSharedProcessTenantArgs{ + TenantID: serverutils.TestTenantID(), + TenantName: "src", + UseDatabase: "defaultdb", + }, + ) + require.NoError(t, err) + dstTenant, dstDB, err := ts.TenantController().StartSharedProcessTenant(ctx, + base.TestSharedProcessTenantArgs{ + TenantID: serverutils.TestTenantID2(), + TenantName: "dst", + UseDatabase: "defaultdb", + }, + ) + require.NoError(t, err) + + srcRunner := sqlutils.MakeSQLRunner(srcDB) + dstRunner := sqlutils.MakeSQLRunner(dstDB) + dstInternal := dstTenant.InternalDB().(*sql.InternalDB) + + dstRunner.Exec(t, `SET CLUSTER SETTING sql.defaults.distsql = always`) + dstRunner.Exec(t, `SET distsql = always`) + + waitForReplication := func() { + now := ts.Clock().Now() + err := replication.SetupOrAdvanceStandbyReaderCatalog( + ctx, serverutils.TestTenantID(), now, dstInternal, dstTenant.ClusterSettings(), + ) + if err != nil { + t.Fatal(err) + } + now = ts.Clock().Now() + lm := dstTenant.LeaseManager().(*lease.Manager) + testutils.SucceedsSoon(t, func() error { + if lm.GetSafeReplicationTS().Less(now) { + return errors.AssertionFailedf("waiting for descriptor close timestamp to catch up") + } + return nil + }) + } + + for _, tc := range testcases { + var runner *sqlutils.SQLRunner + if tc.standby { + waitForReplication() + runner = dstRunner + } else { + runner = srcRunner + } + if tc.expected == nil { + runner.Exec(t, tc.stmt) + } else { + runner.CheckQueryResultsRetry(t, tc.stmt, tc.expected) + } + } +} diff --git a/pkg/kv/kvserver/client_merge_test.go b/pkg/kv/kvserver/client_merge_test.go index 2dd80ea16f67..ae0b218488d4 100644 --- a/pkg/kv/kvserver/client_merge_test.go +++ b/pkg/kv/kvserver/client_merge_test.go @@ -3171,7 +3171,6 @@ func TestMergeQueueWithExternalFiles(t *testing.T) { store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID()) require.NoError(t, err) - store.SetMergeQueueActive(true) if skipExternal { verifyUnmergedSoon(t, store, lhsDesc.StartKey, rhsDesc.StartKey) } else { @@ -4293,6 +4292,11 @@ func TestStoreRangeMergeDuringShutdown(t *testing.T) { func verifyMergedSoon(t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey) { t.Helper() + store.SetMergeQueueActive(true) + defer func() { + store.SetMergeQueueActive(false) + store.MustForceMergeScanAndProcess() // drain any merges that might already be queued + }() testutils.SucceedsSoon(t, func() error { store.MustForceMergeScanAndProcess() repl := store.LookupReplica(rhsStartKey) @@ -4310,6 +4314,11 @@ func verifyUnmergedSoon( t *testing.T, store *kvserver.Store, lhsStartKey, rhsStartKey roachpb.RKey, ) { t.Helper() + store.SetMergeQueueActive(true) + defer func() { + store.SetMergeQueueActive(false) + store.MustForceMergeScanAndProcess() // drain any merges that might already be queued + }() testutils.SucceedsSoon(t, func() error { store.MustForceMergeScanAndProcess() repl := store.LookupReplica(rhsStartKey) @@ -4344,9 +4353,6 @@ func TestMergeQueue(t *testing.T) { WallClock: manualClock, DefaultZoneConfigOverride: &zoneConfig, }, - Store: &kvserver.StoreTestingKnobs{ - DisableScanner: true, - }, }, }, }) @@ -4354,11 +4360,6 @@ func TestMergeQueue(t *testing.T) { conf := zoneConfig.AsSpanConfig() store := tc.GetFirstStoreFromServer(t, 0) - // The cluster with manual replication disables the merge queue, - // so we need to re-enable. - _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.range_merge.queue.enabled = true`) - require.NoError(t, err) - store.SetMergeQueueActive(true) split := func(t *testing.T, key roachpb.Key, expirationTime hlc.Timestamp) { t.Helper() @@ -4429,6 +4430,7 @@ func TestMergeQueue(t *testing.T) { kvserver.SplitByLoadEnabled.Override(ctx, &s.ClusterSettings().SV, false) } + store.SetMergeQueueActive(false) // reset merge queue to inactive store.MustForceMergeScanAndProcess() // drain any merges that might already be queued split(t, rhsStartKey.AsRawKey(), hlc.Timestamp{} /* expirationTime */) } @@ -4818,7 +4820,8 @@ func TestMergeQueueSeesNonVoters(t *testing.T) { } var clusterArgs = base.TestClusterArgs{ - // We dont want the replicate queue mucking with our test, so disable it. + // We don't want the replicate queue mucking with our test, so disable it. + // This also disables the merge queue, until it is manually enabled. ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -4841,10 +4844,6 @@ func TestMergeQueueSeesNonVoters(t *testing.T) { store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(1) require.Nil(t, err) - // We're going to split the dummy range created above with an empty - // expiration time. Disable the merge queue before splitting so that the - // split ranges aren't immediately merged. - store.SetMergeQueueActive(false) leftDesc, rightDesc := splitDummyRangeInTestCluster( t, tc, dbName, "kv" /* tableName */, hlc.Timestamp{} /* splitExpirationTime */) @@ -4887,7 +4886,6 @@ func TestMergeQueueSeesNonVoters(t *testing.T) { tc.RemoveVotersOrFatal(t, rightDesc.StartKey.AsRawKey(), tc.Target(0)) rightDesc = tc.LookupRangeOrFatal(t, rightDesc.StartKey.AsRawKey()) - store.SetMergeQueueActive(true) verifyMergedSoon(t, store, leftDesc.StartKey, rightDesc.StartKey) }) } @@ -4909,7 +4907,8 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) { ctx := context.Background() var delaySnapshotTrap atomic.Value var clusterArgs = base.TestClusterArgs{ - // We dont want the replicate queue mucking with our test, so disable it. + // We don't want the replicate queue mucking with our test, so disable it. + // This also disables the merge queue, until it is manually enabled. ReplicationMode: base.ReplicationManual, ServerArgs: base.TestServerArgs{ Knobs: base.TestingKnobs{ @@ -4945,17 +4944,9 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) { numNodes := 3 tc, _ := setupTestClusterWithDummyRange(t, clusterArgs, dbName, tableName, numNodes) defer tc.Stopper().Stop(ctx) - // We're controlling merge queue operation via - // `store.SetMergeQueueActive`, so enable the cluster setting here. - _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.range_merge.queue.enabled=true`) - require.NoError(t, err) store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(1) require.Nil(t, err) - // We're going to split the dummy range created above with an empty - // expiration time. Disable the merge queue before splitting so that the - // split ranges aren't immediately merged. - store.SetMergeQueueActive(false) leftDesc, rightDesc := splitDummyRangeInTestCluster( t, tc, dbName, tableName, hlc.Timestamp{}, /* splitExpirationTime */ ) @@ -4972,7 +4963,6 @@ func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) { time.Sleep(5 * time.Second) return nil }) - store.SetMergeQueueActive(true) verifyMergedSoon(t, store, leftDesc.StartKey, rightDesc.StartKey) } diff --git a/pkg/sql/execinfrapb/processors_bulk_io.proto b/pkg/sql/execinfrapb/processors_bulk_io.proto index 41c51217ca4c..4c2477639b2f 100644 --- a/pkg/sql/execinfrapb/processors_bulk_io.proto +++ b/pkg/sql/execinfrapb/processors_bulk_io.proto @@ -438,8 +438,7 @@ message GenerativeSplitAndScatterSpec { // Spans is the required spans in the restore. repeated roachpb.Span spans = 10 [(gogoproto.nullable) = false]; repeated jobs.jobspb.RestoreDetails.BackupLocalityInfo backup_locality_info = 11 [(gogoproto.nullable) = false]; - // HighWater is the high watermark of the previous run of restore. - optional bytes high_water = 12; + reserved 12; // User who initiated the restore. optional string user_proto = 13 [(gogoproto.nullable) = false, (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/security/username.SQLUsernameProto"]; // ChunkSize is the number of import spans per chunk. @@ -451,7 +450,7 @@ message GenerativeSplitAndScatterSpec { // NumNodes is the number of nodes available for dist restore. optional int64 num_nodes = 17[(gogoproto.nullable) = false]; optional int64 job_id = 18 [(gogoproto.nullable) = false, (gogoproto.customname) = "JobID"]; - optional bool use_frontier_checkpointing = 20 [(gogoproto.nullable) = false]; + reserved 20 ; repeated jobs.jobspb.RestoreProgress.FrontierEntry checkpointed_spans = 21 [(gogoproto.nullable) = false]; // ExclusiveFileSpanComparison is true if the backup can safely use // exclusive file span comparison. diff --git a/pkg/sql/row/kv_batch_fetcher.go b/pkg/sql/row/kv_batch_fetcher.go index 45298a1465a2..2532ea42c7c0 100644 --- a/pkg/sql/row/kv_batch_fetcher.go +++ b/pkg/sql/row/kv_batch_fetcher.go @@ -298,7 +298,6 @@ func makeExternalSpanSendFunc( ext *fetchpb.IndexFetchSpec_ExternalRowData, db *kv.DB, batchRequestsIssued *int64, ) sendFunc { return func(ctx context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { - ba.Timestamp = ext.AsOf for _, req := range ba.Requests { // We only allow external row data for a few known types of request. switch r := req.GetInner().(type) { @@ -310,12 +309,34 @@ func makeExternalSpanSendFunc( } } log.VEventf(ctx, 2, "kv external fetcher: sending a batch with %d requests", len(ba.Requests)) - res, err := db.NonTransactionalSender().Send(ctx, ba) + + // Open a new transaction with fixed timestamp set to the external + // timestamp. We must do this with txn.Send rather than using + // db.NonTransactionalSender to get the 1-to-1 request-response guarantee + // required by txnKVFetcher. + // TODO(michae2): Explore whether we should keep this transaction open for + // the duration of the surrounding transaction. + var res *kvpb.BatchResponse + err := db.TxnWithAdmissionControl( + ctx, ba.AdmissionHeader.Source, admissionpb.WorkPriority(ba.AdmissionHeader.Priority), + kv.SteppingDisabled, + func(ctx context.Context, txn *kv.Txn) error { + if err := txn.SetFixedTimestamp(ctx, ext.AsOf); err != nil { + return err + } + var err *kvpb.Error + res, err = txn.Send(ctx, ba) + if err != nil { + return err.GoError() + } + return nil + }) + // Note that in some code paths there is no concurrency when using the // sendFunc, but we choose to unconditionally use atomics here since its // overhead should be negligible in the grand scheme of things anyway. atomic.AddInt64(batchRequestsIssued, 1) - return res, err.GoError() + return res, err } }