Skip to content

Commit

Permalink
changefeedccl: move checkpoint creation code into separate package
Browse files Browse the repository at this point in the history
This patch moves the changefeed checkpoint creation code into a
separate package to prevent a dependency cycle when the resolved
span frontiers are moved into its own package in a later commit.
Given the upcoming checkpoint improvement work, this is also a
good opportunity to clean up the checkpoint creation API.
Finally, a unit test covering checkpoint creation has been added.

Release note: None
  • Loading branch information
andyyang890 committed Dec 24, 2024
1 parent 8057da9 commit c0e886c
Show file tree
Hide file tree
Showing 9 changed files with 308 additions and 154 deletions.
3 changes: 3 additions & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ ALL_TESTS = [
"//pkg/ccl/changefeedccl/cdctest:cdctest_test",
"//pkg/ccl/changefeedccl/cdcutils:cdcutils_test",
"//pkg/ccl/changefeedccl/changefeedbase:changefeedbase_test",
"//pkg/ccl/changefeedccl/checkpoint:checkpoint_test",
"//pkg/ccl/changefeedccl/kvevent:kvevent_test",
"//pkg/ccl/changefeedccl/kvfeed:kvfeed_test",
"//pkg/ccl/changefeedccl/schemafeed:schemafeed_test",
Expand Down Expand Up @@ -869,6 +870,8 @@ GO_TARGETS = [
"//pkg/ccl/changefeedccl/changefeedbase:changefeedbase_test",
"//pkg/ccl/changefeedccl/changefeedpb:changefeedpb",
"//pkg/ccl/changefeedccl/changefeedvalidators:changefeedvalidators",
"//pkg/ccl/changefeedccl/checkpoint:checkpoint",
"//pkg/ccl/changefeedccl/checkpoint:checkpoint_test",
"//pkg/ccl/changefeedccl/kvevent:kvevent",
"//pkg/ccl/changefeedccl/kvevent:kvevent_test",
"//pkg/ccl/changefeedccl/kvfeed:kvfeed",
Expand Down
3 changes: 1 addition & 2 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ go_library(
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/ccl/changefeedccl/changefeedpb",
"//pkg/ccl/changefeedccl/changefeedvalidators",
"//pkg/ccl/changefeedccl/checkpoint",
"//pkg/ccl/changefeedccl/kvevent",
"//pkg/ccl/changefeedccl/kvfeed",
"//pkg/ccl/changefeedccl/schemafeed",
Expand Down Expand Up @@ -277,7 +278,6 @@ go_test(
"//pkg/kv/kvserver/kvserverbase",
"//pkg/kv/kvserver/protectedts",
"//pkg/kv/kvserver/protectedts/ptpb",
"//pkg/kv/kvserver/rangefeed",
"//pkg/roachpb",
"//pkg/scheduledjobs",
"//pkg/scheduledjobs/schedulebase",
Expand Down Expand Up @@ -347,7 +347,6 @@ go_test(
"//pkg/util/randident",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/shuffle",
"//pkg/util/span",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand Down
58 changes: 9 additions & 49 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvfeed"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
Expand Down Expand Up @@ -1083,12 +1084,8 @@ func (cs *cachedState) SetHighwater(frontier hlc.Timestamp) {
}

// SetCheckpoint implements the eval.ChangefeedState interface.
func (cs *cachedState) SetCheckpoint(spans []roachpb.Span, timestamp hlc.Timestamp) {
changefeedProgress := cs.progress.Details.(*jobspb.Progress_Changefeed).Changefeed
changefeedProgress.Checkpoint = &jobspb.ChangefeedProgress_Checkpoint{
Spans: spans,
Timestamp: timestamp,
}
func (cs *cachedState) SetCheckpoint(checkpoint jobspb.ChangefeedProgress_Checkpoint) {
cs.progress.Details.(*jobspb.Progress_Changefeed).Changefeed.Checkpoint = &checkpoint
}

func newJobState(
Expand Down Expand Up @@ -1684,7 +1681,7 @@ func (cf *changeFrontier) maybeCheckpointJob(
var checkpoint jobspb.ChangefeedProgress_Checkpoint
if updateCheckpoint {
maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&cf.FlowCtx.Cfg.Settings.SV)
checkpoint.Spans, checkpoint.Timestamp = cf.frontier.getCheckpointSpans(maxBytes)
checkpoint = cf.frontier.MakeCheckpoint(maxBytes)
}

if updateCheckpoint || updateHighWater {
Expand Down Expand Up @@ -1765,7 +1762,7 @@ func (cf *changeFrontier) checkpointJobProgress(
}

cf.localState.SetHighwater(frontier)
cf.localState.SetCheckpoint(checkpoint.Spans, checkpoint.Timestamp)
cf.localState.SetCheckpoint(checkpoint)

return true, nil
}
Expand Down Expand Up @@ -2020,47 +2017,10 @@ func (f *schemaChangeFrontier) ForwardLatestKV(ts time.Time) {
}
}

type spanIter func(forEachSpan span.Operation)

// getCheckpointSpans returns as many spans that should be checkpointed (are
// above the highwater mark) as can fit in maxBytes, along with the earliest
// timestamp of the checkpointed spans. A SpanGroup is used to merge adjacent
// spans above the high-water mark.
func getCheckpointSpans(
frontier hlc.Timestamp, forEachSpan spanIter, maxBytes int64,
) (spans []roachpb.Span, timestamp hlc.Timestamp) {
// Collect leading spans into a SpanGroup to merge adjacent spans and store
// the lowest timestamp found
var checkpointSpanGroup roachpb.SpanGroup
checkpointFrontier := hlc.Timestamp{}
forEachSpan(func(s roachpb.Span, ts hlc.Timestamp) span.OpResult {
if frontier.Less(ts) {
checkpointSpanGroup.Add(s)
if checkpointFrontier.IsEmpty() || ts.Less(checkpointFrontier) {
checkpointFrontier = ts
}
}
return span.ContinueMatch
})

// Ensure we only return up to maxBytes spans
var checkpointSpans []roachpb.Span
var used int64
for _, span := range checkpointSpanGroup.Slice() {
used += int64(len(span.Key)) + int64(len(span.EndKey))
if used > maxBytes {
break
}
checkpointSpans = append(checkpointSpans, span)
}

return checkpointSpans, checkpointFrontier
}

func (f *schemaChangeFrontier) getCheckpointSpans(
maxBytes int64,
) (spans []roachpb.Span, timestamp hlc.Timestamp) {
return getCheckpointSpans(f.Frontier(), f.Entries, maxBytes)
// MakeCheckpoint returns the spans that should be checkpointed based on the
// current state of the frontier.
func (f *schemaChangeFrontier) MakeCheckpoint(maxBytes int64) jobspb.ChangefeedProgress_Checkpoint {
return checkpoint.Make(f.Frontier(), f.Entries, maxBytes)
}

// InBackfill returns whether a resolved span is part of an ongoing backfill
Expand Down
99 changes: 0 additions & 99 deletions pkg/ccl/changefeedccl/changefeed_processors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,13 @@
package changefeedccl

import (
"sort"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/shuffle"
"github.com/cockroachdb/cockroach/pkg/util/span"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -137,95 +130,3 @@ func TestSetupSpansAndFrontier(t *testing.T) {
})
}
}

type rspans []roachpb.Span

func (rs rspans) Len() int {
return len(rs)
}

func (rs rspans) Less(i int, j int) bool {
return rs[i].Key.Compare(rs[j].Key) < 0
}

func (rs rspans) Swap(i int, j int) {
rs[i], rs[j] = rs[j], rs[i]
}

type checkpointSpan struct {
span roachpb.Span
ts hlc.Timestamp
}

type checkpointSpans []checkpointSpan

func (rs checkpointSpans) Len() int {
return len(rs)
}

func (rs checkpointSpans) Less(i int, j int) bool {
return rs[i].span.Key.Compare(rs[j].span.Key) < 0
}

func (rs checkpointSpans) Swap(i int, j int) {
rs[i], rs[j] = rs[j], rs[i]
}

// TestGetCheckpointSpans generates 100 random non-overlapping spans with random
// timestamps within a minute of each other and turns them into checkpoint
// spans. It then does some sanity checks. It also compares the total
// catchup time between the checkpoint timestamp and the high watermark.
// Although the test relies on internal implementation details, it is a
// good base to explore other fine-grained checkpointing algorithms.
func TestGetCheckpointSpans(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const numSpans = 100
maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Default()
hwm := hlc.Timestamp{}
rng, _ := randutil.NewTestRand()

spans := make(checkpointSpans, numSpans)

// Generate spans. They should not be overlapping.
// Randomize the order in which spans are processed.
for i, s := range rangefeed.GenerateRandomizedSpans(rng, numSpans) {
ts := rangefeed.GenerateRandomizedTs(rng, time.Minute.Nanoseconds())
if hwm.IsEmpty() || ts.Less(hwm) {
hwm = ts
}
spans[i] = checkpointSpan{s.AsRawSpanWithNoLocals(), ts}
}
shuffle.Shuffle(spans)

forEachSpan := func(fn span.Operation) {
for _, s := range spans {
fn(s.span, s.ts)
}
}

// Compute the checkpoint.
cpSpans, cpTS := getCheckpointSpans(hwm, forEachSpan, maxBytes)
require.Less(t, len(cpSpans), numSpans)
require.True(t, hwm.Less(cpTS))

// Calculate the total amount of time these spans would have to "catch up"
// using the checkpoint spans compared to starting at the frontier.
catchup := cpTS.GoTime().Sub(hwm.GoTime())
sort.Sort(rspans(cpSpans))
sort.Sort(spans)
var catchupFromCheckpoint, catchupFromHWM time.Duration
j := 0
for _, s := range spans {
catchupFromHWM += s.ts.GoTime().Sub(hwm.GoTime())
if j < len(cpSpans) && cpSpans[j].Equal(s.span) {
catchupFromCheckpoint += s.ts.GoTime().Sub(cpTS.GoTime())
j++
}
}
t.Logf("Checkpoint time improved by %v for %d/%d spans\ntotal catchup from checkpoint: %v\ntotal catchup from high watermark: %v\nPercent improvement %f",
catchup, len(cpSpans), numSpans, catchupFromCheckpoint, catchupFromHWM,
100*(1-float64(catchupFromCheckpoint.Nanoseconds())/float64(catchupFromHWM.Nanoseconds())))
require.Less(t, catchupFromCheckpoint, catchupFromHWM)
}
7 changes: 4 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedvalidators"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/cloud"
"github.com/cockroachdb/cockroach/pkg/docs"
Expand Down Expand Up @@ -1407,21 +1408,21 @@ func reconcileJobStateWithLocalState(
}

maxBytes := changefeedbase.FrontierCheckpointMaxBytes.Get(&execCfg.Settings.SV)
checkpointSpans, checkpointTS := getCheckpointSpans(sf.Frontier(), func(forEachSpan span.Operation) {
checkpoint := checkpoint.Make(sf.Frontier(), func(forEachSpan span.Operation) {
for _, fs := range localState.aggregatorFrontier {
forEachSpan(fs.Span, fs.Timestamp)
}
}, maxBytes)

// Update checkpoint.
updateHW := highWater.Less(sf.Frontier())
updateSpanCheckpoint := len(checkpointSpans) > 0
updateSpanCheckpoint := len(checkpoint.Spans) > 0

if updateHW || updateSpanCheckpoint {
if updateHW {
localState.SetHighwater(sf.Frontier())
}
localState.SetCheckpoint(checkpointSpans, checkpointTS)
localState.SetCheckpoint(checkpoint)
if log.V(1) {
log.Infof(ctx, "Applying checkpoint to job record: hw=%v, cf=%v",
localState.progress.GetHighWater(), localState.progress.GetChangefeed())
Expand Down
33 changes: 33 additions & 0 deletions pkg/ccl/changefeedccl/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "checkpoint",
srcs = ["checkpoint.go"],
importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/checkpoint",
visibility = ["//visibility:public"],
deps = [
"//pkg/jobs/jobspb",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/span",
],
)

go_test(
name = "checkpoint_test",
srcs = ["checkpoint_test.go"],
deps = [
":checkpoint",
"//pkg/ccl/changefeedccl/changefeedbase",
"//pkg/jobs/jobspb",
"//pkg/kv/kvserver/rangefeed",
"//pkg/roachpb",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/shuffle",
"//pkg/util/span",
"@com_github_stretchr_testify//require",
],
)
56 changes: 56 additions & 0 deletions pkg/ccl/changefeedccl/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

// Package checkpoint contains code responsible for handling changefeed
// checkpoints.
package checkpoint

import (
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/span"
)

// SpanIter is an iterator over a collection of spans.
type SpanIter func(forEachSpan span.Operation)

// Make creates a checkpoint with as many spans that should be checkpointed (are
// above the highwater mark) as can fit in maxBytes, along with the earliest
// timestamp of the checkpointed spans. A SpanGroup is used to merge adjacent
// spans above the high-water mark.
func Make(
frontier hlc.Timestamp, forEachSpan SpanIter, maxBytes int64,
) jobspb.ChangefeedProgress_Checkpoint {
// Collect leading spans into a SpanGroup to merge adjacent spans and store
// the lowest timestamp found
var checkpointSpanGroup roachpb.SpanGroup
checkpointFrontier := hlc.Timestamp{}
forEachSpan(func(s roachpb.Span, ts hlc.Timestamp) span.OpResult {
if frontier.Less(ts) {
checkpointSpanGroup.Add(s)
if checkpointFrontier.IsEmpty() || ts.Less(checkpointFrontier) {
checkpointFrontier = ts
}
}
return span.ContinueMatch
})

// Ensure we only return up to maxBytes spans
var checkpointSpans []roachpb.Span
var used int64
for _, span := range checkpointSpanGroup.Slice() {
used += int64(len(span.Key)) + int64(len(span.EndKey))
if used > maxBytes {
break
}
checkpointSpans = append(checkpointSpans, span)
}

return jobspb.ChangefeedProgress_Checkpoint{
Spans: checkpointSpans,
Timestamp: checkpointFrontier,
}
}
Loading

0 comments on commit c0e886c

Please sign in to comment.