backupccl: remove deprecated restore checkpointing method
The new checkpointing logic has been the default since 23.1, so it is safe to
delete the old checkpointing logic; we do not expect 25.1 to resume a restore
that began before 23.1.

Epic: none

Release note: none
msbutler committed Oct 21, 2024
1 parent b908a2f commit e3a1093
Showing 10 changed files with 80 additions and 219 deletions.
3 changes: 1 addition & 2 deletions pkg/ccl/backupccl/bench_covering_test.go
@@ -91,11 +91,10 @@ func BenchmarkRestoreEntryCover(b *testing.B) {
filter, err := makeSpanCoveringFilter(
backups[numBackups-1].Spans,
[]jobspb.RestoreProgress_FrontierEntry{},
nil,
introducedSpanFrontier,
0,
defaultMaxFileCount,
false)
)
require.NoError(b, err)
defer filter.close()

4 changes: 1 addition & 3 deletions pkg/ccl/backupccl/generative_split_and_scatter_processor.go
@@ -477,11 +477,9 @@ func runGenerativeSplitAndScatter(
filter, err := makeSpanCoveringFilter(
spec.Spans,
spec.CheckpointedSpans,
spec.HighWater,
introducedSpanFrontier,
spec.TargetSize,
spec.MaxFileCount,
spec.UseFrontierCheckpointing)
spec.MaxFileCount)
if err != nil {
return errors.Wrap(err, "failed to make span covering filter")
}
@@ -267,7 +267,6 @@ func makeTestingGenerativeSplitAndScatterSpec(
EndTime: hlc.Timestamp{},
Spans: requiredSpans,
BackupLocalityInfo: nil,
HighWater: nil,
UserProto: "",
ChunkSize: 1,
TargetSize: 1,
15 changes: 1 addition & 14 deletions pkg/ccl/backupccl/restore_job.go
@@ -314,15 +314,11 @@ func restore(
return emptyRowCount, err
}

ver := job.Payload().CreationClusterVersion
// TODO(radu,msbutler,stevendanna): we might be able to remove this now?
on231 := ver.Major > 23 || (ver.Major == 23 && ver.Minor >= 1)
restoreCheckpoint := job.Progress().Details.(*jobspb.Progress_Restore).Restore.Checkpoint
requiredSpans := dataToRestore.getSpans()
progressTracker, err := makeProgressTracker(
requiredSpans,
restoreCheckpoint,
on231,
restoreCheckpointMaxBytes.Get(&execCtx.ExecCfg().Settings.SV),
endTime)
if err != nil {
@@ -353,11 +349,9 @@ func restore(
return makeSpanCoveringFilter(
requiredSpans,
restoreCheckpoint,
job.Progress().Details.(*jobspb.Progress_Restore).Restore.HighWater,
introducedSpanFrontier,
targetSize,
maxFileCount,
progressTracker.useFrontier)
maxFileCount)
}(); err != nil {
return roachpb.RowCount{}, err
}
@@ -436,13 +430,6 @@ func restore(
}
tasks = append(tasks, jobProgressLoop)
}
if !progressTracker.useFrontier {
// This goroutine feeds the deprecated high water mark variant of the
// generativeCheckpointLoop.
tasks = append(tasks, func(ctx context.Context) error {
return genSpan(ctx, progressTracker.inFlightSpanFeeder)
})
}

progCh := make(chan *execinfrapb.RemoteProducerMetadata_BulkProcessorProgress)
if !details.ExperimentalOnline {
6 changes: 1 addition & 5 deletions pkg/ccl/backupccl/restore_processor_planning.go
@@ -173,21 +173,17 @@ func distRestore(
EndTime: md.restoreTime,
Spans: md.dataToRestore.getSpans(),
BackupLocalityInfo: md.backupLocalityInfo,
HighWater: md.spanFilter.highWaterMark,
UserProto: execCtx.User().EncodeProto(),
TargetSize: md.spanFilter.targetSize,
MaxFileCount: int64(md.spanFilter.maxFileCount),
ChunkSize: int64(chunkSize),
NumEntries: int64(md.numImportSpans),
UseFrontierCheckpointing: md.spanFilter.useFrontierCheckpointing,
NumNodes: int64(numNodes),
JobID: int64(md.jobID),
SQLInstanceIDs: instanceIDs,
ExclusiveFileSpanComparison: md.exclusiveEndKeys,
}
if md.spanFilter.useFrontierCheckpointing {
spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0)
}
spec.CheckpointedSpans = persistFrontier(md.spanFilter.checkpointFrontier, 0)

splitAndScatterProc := physicalplan.Processor{
SQLInstanceID: execCtx.ExecCfg().NodeInfo.NodeID.SQLInstanceID(),
180 changes: 53 additions & 127 deletions pkg/ccl/backupccl/restore_progress.go
@@ -17,7 +17,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
spanUtils "github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
pbtypes "github.com/gogo/protobuf/types"
)

@@ -50,26 +49,7 @@ type progressTracker struct {

// res tracks the amount of data that has been ingested.
res roachpb.RowCount

// Note that the fields below are used for the deprecated high watermark progress
// tracker.
// highWaterMark represents the index into the requestsCompleted map.
highWaterMark int64
ceiling int64

// As part of job progress tracking, inFlightImportSpans tracks all the
// spans that have been generated are being processed by the processors in
// distRestore. requestsCompleleted tracks the spans from
// inFlightImportSpans that have completed its processing. Once all spans up
// to index N have been processed (and appear in requestsCompleted), then
// any spans with index < N will be removed from both inFlightImportSpans
// and requestsCompleted maps.
inFlightImportSpans map[int64]roachpb.Span
requestsCompleted map[int64]bool
}
useFrontier bool
inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry

// endTime is the restore as of timestamp. This can be empty, and an empty timestamp
// indicates a restore of the latest revision.
endTime hlc.Timestamp
@@ -78,7 +58,6 @@ type progressTracker struct {
func makeProgressTracker(
requiredSpans roachpb.Spans,
persistedSpans []jobspb.RestoreProgress_FrontierEntry,
useFrontier bool,
maxBytes int64,
endTime hlc.Timestamp,
) (*progressTracker, error) {
@@ -87,32 +66,20 @@ func makeProgressTracker(
checkpointFrontier spanUtils.Frontier
err error
nextRequiredSpanKey map[string]roachpb.Key
inFlightSpanFeeder chan execinfrapb.RestoreSpanEntry
)
if useFrontier {
checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans)
if err != nil {
return nil, err
}
nextRequiredSpanKey = make(map[string]roachpb.Key)
for i := 0; i < len(requiredSpans)-1; i++ {
nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key
}

} else {
inFlightSpanFeeder = make(chan execinfrapb.RestoreSpanEntry, 1000)
checkpointFrontier, err = loadCheckpointFrontier(requiredSpans, persistedSpans)
if err != nil {
return nil, err
}
nextRequiredSpanKey = make(map[string]roachpb.Key)
for i := 0; i < len(requiredSpans)-1; i++ {
nextRequiredSpanKey[requiredSpans[i].EndKey.String()] = requiredSpans[i+1].Key
}

pt := &progressTracker{}
pt.mu.checkpointFrontier = checkpointFrontier
pt.mu.highWaterMark = -1
pt.mu.ceiling = 0
pt.mu.inFlightImportSpans = make(map[int64]roachpb.Span)
pt.mu.requestsCompleted = make(map[int64]bool)
pt.nextRequiredSpanKey = nextRequiredSpanKey
pt.maxBytes = maxBytes
pt.useFrontier = useFrontier
pt.inFlightSpanFeeder = inFlightSpanFeeder
pt.endTime = endTime
return pt, nil
}
@@ -182,16 +149,10 @@ func (pt *progressTracker) updateJobCallback(
func() {
pt.mu.Lock()
defer pt.mu.Unlock()
if pt.useFrontier {
// TODO (msbutler): this requires iterating over every span in the frontier,
// and rewriting every completed required span to disk.
// We may want to be more intelligent about this.
d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes)
} else {
if pt.mu.highWaterMark >= 0 {
d.Restore.HighWater = pt.mu.inFlightImportSpans[pt.mu.highWaterMark].Key
}
}
// TODO (msbutler): this requires iterating over every span in the frontier,
// and rewriting every completed required span to disk.
// We may want to be more intelligent about this.
d.Restore.Checkpoint = persistFrontier(pt.mu.checkpointFrontier, pt.maxBytes)
}()
default:
log.Errorf(progressedCtx, "job payload had unexpected type %T", d)
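
The TODO in the hunk above concerns persistFrontier, which, per the surrounding code, serializes completed frontier spans into the job's checkpoint bounded by maxBytes. Its real implementation is not shown in this diff; the sketch below is only an illustration of that idea using invented names (frontierEntry, persistSketch) and simplified types, not CockroachDB's actual code.

package main

import "fmt"

// frontierEntry is an invented, simplified stand-in for an entry in the
// checkpoint frontier; the real code works with roachpb.Spans and proto
// messages, not strings.
type frontierEntry struct {
	span      string // e.g. "[a,d)"
	completed bool   // whether the span is restored up to the restore time
}

// persistSketch keeps completed entries until a byte budget is exhausted.
// maxBytes <= 0 is treated as "no limit" in this sketch.
func persistSketch(entries []frontierEntry, maxBytes int64) []frontierEntry {
	var (
		out  []frontierEntry
		used int64
	)
	for _, e := range entries {
		if !e.completed {
			continue
		}
		size := int64(len(e.span)) // stand-in for the encoded entry size
		if maxBytes > 0 && used+size > maxBytes {
			break // budget exhausted; the rest is simply not checkpointed
		}
		used += size
		out = append(out, e)
	}
	return out
}

func main() {
	entries := []frontierEntry{{"[a,d)", true}, {"[d,e)", false}, {"[e,h)", true}}
	fmt.Println(persistSketch(entries, 0)) // [{[a,d) true} {[e,h) true}]
}
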
@@ -224,83 +185,48 @@ func (pt *progressTracker) ingestUpdate(
}

pt.mu.res.Add(progDetails.Summary)
if pt.useFrontier {
updateSpan := progDetails.DataSpan.Clone()
// If the completedSpan has the same end key as a requiredSpan_i, forward
// the frontier for the span [completedSpan_startKey,
// requiredSpan_i+1_startKey]. This trick ensures the span frontier will
// contain a single entry when the restore completes. Recall that requiredSpans are
// disjoint, and a spanFrontier never merges disjoint spans. So, without
// this trick, the spanFrontier will have O(requiredSpans) entries when the
// restore completes. This trick ensures all spans persisted to the frontier are adjacent,
// and consequently, will eventually merge.
//
// Here's a visual example:
// - this restore has two required spans: [a,d) and [e,h).
// - the restore span entry [c,d) just completed, implying the frontier logically looks like:
//
// tC| x---o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
//
// - since [c,d)'s endkey equals the required span (a,d]'s endkey,
// also update the gap between required span 1 and 2 in the frontier:
//
// tC| x-------o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
//
// - this will ensure that when all subspans in required spans 1 and 2 complete,
// the checkpoint frontier has one span:
//
// tC| x---------------------------o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok {
updateSpan.EndKey = newEndKey
}
if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil {
return false, err
}
} else {
idx := progDetails.ProgressIdx

if idx >= pt.mu.ceiling {
for i := pt.mu.ceiling; i <= idx; i++ {
importSpan, ok := <-pt.inFlightSpanFeeder
if !ok {
// The channel has been closed, there is nothing left to do.
log.Infof(ctx, "exiting restore checkpoint loop as the import span channel has been closed")
return true, nil
}
pt.mu.inFlightImportSpans[i] = importSpan.Span
}
pt.mu.ceiling = idx + 1
}

if sp, ok := pt.mu.inFlightImportSpans[idx]; ok {
// Assert that we're actually marking the correct span done. See #23977.
if !sp.Key.Equal(progDetails.DataSpan.Key) {
return false, errors.Newf("request %d for span %v does not match import span for same idx: %v",
idx, progDetails.DataSpan, sp,
)
}
pt.mu.requestsCompleted[idx] = true
prevHighWater := pt.mu.highWaterMark
for j := pt.mu.highWaterMark + 1; j < pt.mu.ceiling && pt.mu.requestsCompleted[j]; j++ {
pt.mu.highWaterMark = j
}
for j := prevHighWater; j < pt.mu.highWaterMark; j++ {
delete(pt.mu.requestsCompleted, j)
delete(pt.mu.inFlightImportSpans, j)
}
}
updateSpan := progDetails.DataSpan.Clone()
// If the completedSpan has the same end key as a requiredSpan_i, forward
// the frontier for the span [completedSpan_startKey,
// requiredSpan_i+1_startKey]. This trick ensures the span frontier will
// contain a single entry when the restore completes. Recall that requiredSpans are
// disjoint, and a spanFrontier never merges disjoint spans. So, without
// this trick, the spanFrontier will have O(requiredSpans) entries when the
// restore completes. This trick ensures all spans persisted to the frontier are adjacent,
// and consequently, will eventually merge.
//
// Here's a visual example:
// - this restore has two required spans: [a,d) and [e,h).
// - the restore span entry [c,d) just completed, implying the frontier logically looks like:
//
// tC| x---o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
//
// - since [c,d)'s endkey equals the required span (a,d]'s endkey,
// also update the gap between required span 1 and 2 in the frontier:
//
// tC| x-------o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
//
// - this will ensure that when all subspans in required spans 1 and 2 complete,
// the checkpoint frontier has one span:
//
// tC| x---------------------------o
// t0|
// keys--a---b---c---d---e---f---g---h->
//
// r-spans: |---span1---| |---span2---|
if newEndKey, ok := pt.nextRequiredSpanKey[updateSpan.EndKey.String()]; ok {
updateSpan.EndKey = newEndKey
}
if _, err := pt.mu.checkpointFrontier.Forward(updateSpan, completedSpanTime); err != nil {
return false, err
}
return true, nil
}
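
The comment block above describes the end-key forwarding trick in prose. As a rough, self-contained illustration of just that step, the sketch below uses plain string keys and a hypothetical span type instead of roachpb.Span and spanUtils.Frontier; everything in it is invented for illustration except the [a,d) / [e,h) example taken from the comment.

package main

import "fmt"

// span is an invented stand-in for roachpb.Span, using string keys.
type span struct{ key, endKey string }

func main() {
	// Two disjoint required spans, as in the comment above: [a,d) and [e,h).
	requiredSpans := []span{{"a", "d"}, {"e", "h"}}

	// Map each required span's end key to the next required span's start
	// key, mirroring nextRequiredSpanKey in makeProgressTracker.
	nextRequiredSpanKey := make(map[string]string)
	for i := 0; i < len(requiredSpans)-1; i++ {
		nextRequiredSpanKey[requiredSpans[i].endKey] = requiredSpans[i+1].key
	}

	// A restore span entry [c,d) just completed. Its end key matches the
	// end key of required span [a,d), so the update is widened to [c,e):
	// the gap between the two required spans is forwarded as well, which
	// keeps the persisted frontier entries adjacent so they can merge.
	completed := span{key: "c", endKey: "d"}
	if newEndKey, ok := nextRequiredSpanKey[completed.endKey]; ok {
		completed.endKey = newEndKey
	}
	fmt.Printf("forward frontier over [%s,%s)\n", completed.key, completed.endKey)
	// Prints: forward frontier over [c,e)
}
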
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/restore_progress_test.go
@@ -82,7 +82,7 @@ func TestProgressTracker(t *testing.T) {
},
} {
restoreTime := hlc.Timestamp{}
pt, err := makeProgressTracker(requiredSpans, persistedSpans, true, 0, restoreTime)
pt, err := makeProgressTracker(requiredSpans, persistedSpans, 0, restoreTime)
require.NoError(t, err, "step %d", i)

done, err := pt.ingestUpdate(ctx, mockUpdate(step.update, step.completeUpTo))