Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
124631: changefeedccl: pause schedules when restored onto a different cluster  r=dt a=navsetlur

…current clusterID. If that's not the case, pause the scheduled changefeed until manual intervention.

124830: norm: remove zero cardinality locks r=mw5h a=mw5h

Previously, locks over row expressions known to produce zero rows were left in place. This patch elides those locks during normalization.

Fixes: cockroachdb#114751

Release note (performance improvement): Lock operations are now removed from query plans when the optimizer can prove that no rows would be locked.

Co-authored-by: Naveen Setlur <[email protected]>
Co-authored-by: Matt White <[email protected]>
  • Loading branch information
3 people committed May 30, 2024
3 parents cf7ad52 + 86a4772 + 2e72130 commit e4d5829
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 4 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ go_test(
"//pkg/sql/catalog/catpb",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/desctestutils",
"//pkg/sql/catalog/schemaexpr",
"//pkg/sql/catalog/tabledesc",
Expand Down
25 changes: 21 additions & 4 deletions pkg/ccl/changefeedccl/scheduled_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,30 @@ func (s *scheduledChangefeedExecutor) executeChangefeed(
return errors.AssertionFailedf("scheduled unexpectedly paused")
}

log.Infof(ctx, "Starting scheduled changefeed %d: %s",
sj.ScheduleID(), tree.AsString(changefeedStmt))

// Invoke changefeed plan hook.
hook, cleanup := cfg.PlanHookMaker(ctx, "exec-changefeed", txn.KV(), sj.Owner())
defer cleanup()
changefeedFn, err := planCreateChangefeed(ctx, hook.(sql.PlanHookState), changefeedStmt)

planner := hook.(sql.PlanHookState)
currentClusterID := planner.ExtendedEvalContext().ClusterID
currentDetails := sj.ScheduleDetails()

// If the current cluster ID is different from the schedule's cluster ID,
// pause the schedule. To maintain backward compatibility with schedules
// without a clusterID, don't pause schedules without a clusterID.
if !currentDetails.ClusterID.Equal(uuid.Nil) && currentClusterID != currentDetails.ClusterID {
log.Infof(ctx, "scheduled changedfeed %d last run by different cluster %s, pausing until manually resumed",
sj.ScheduleID(),
currentDetails.ClusterID)
currentDetails.ClusterID = currentClusterID
sj.SetScheduleDetails(*currentDetails)
sj.Pause()
return nil
}

log.Infof(ctx, "Starting scheduled changefeed %d: %s",
sj.ScheduleID(), tree.AsString(changefeedStmt))
changefeedFn, err := planCreateChangefeed(ctx, planner, changefeedStmt)
if err != nil {
return err
}
Expand Down
119 changes: 119 additions & 0 deletions pkg/ccl/changefeedccl/scheduled_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/scheduledjobs/schedulebase"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
Expand Down Expand Up @@ -94,6 +95,15 @@ func withSchedulerHelper(sh *testHelper) func(opts *feedTestOptions) {
}
}

// loadSchedule reads the schedule identified by scheduleID back from schedule
// storage, using the helper's internal DB and job scheduler environment. Any
// load error fails the test immediately.
func (h *testHelper) loadSchedule(t *testing.T, scheduleID jobspb.ScheduleID) *jobs.ScheduledJob {
	t.Helper()

	storage := jobs.ScheduledJobDB(h.internalDB())
	sj, err := storage.Load(context.Background(), h.env, scheduleID)
	require.NoError(t, err)
	return sj
}

func (h *testHelper) clearSchedules(t *testing.T) {
t.Helper()

Expand Down Expand Up @@ -162,6 +172,10 @@ func getScheduledChangefeedStatement(t *testing.T, arg *jobspb.ExecutionArgument
return scheduledChangefeed.ChangefeedStatement
}

// internalDB returns the test server's internal DB as a descs.DB. The type
// assertion panics if the server's internal DB does not implement descs.DB.
func (h *testHelper) internalDB() descs.DB {
	db := h.server.InternalDB()
	return db.(descs.DB)
}

// This test examines serialized representation of changefeed schedule arguments
// when the scheduled changefeed statement executes. This test does not concern
// itself with the actual scheduling and the execution of those changefeeds.
Expand Down Expand Up @@ -504,6 +518,111 @@ INSERT INTO t2 VALUES (3, 'three'), (2, 'two'), (1, 'one');
}
}

// TestPauseScheduledChangefeedOnNewClusterID schedules a changefeed and
// verifies that the schedule is paused when it executes on a cluster whose ID
// differs from the cluster ID stored in the schedule details. It then resumes
// the schedule and checks that the changefeed runs and emits the expected rows.
func TestPauseScheduledChangefeedOnNewClusterID(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

th, cleanup := newTestHelper(t)
defer cleanup()

// Seed the tables. Only t1 is targeted by the schedule statement below.
th.sqlDB.Exec(t, `
CREATE TABLE t1(a INT PRIMARY KEY, b STRING);
INSERT INTO t1 values (1, 'one'), (10, 'ten'), (100, 'hundred');
CREATE TABLE t2(b INT PRIMARY KEY, c STRING);
INSERT INTO t2 VALUES (3, 'three'), (2, 'two'), (1, 'one');
`)

// getFeed starts a mock webhook sink and returns the sink URI to pass to the
// schedule statement, a webhookFeed backed by that sink, and a function that
// closes the sink.
getFeed := func(isBare bool, db *gosql.DB) (string, *webhookFeed, func()) {
cert, _, err := cdctest.NewCACertBase64Encoded()
require.NoError(t, err)
sinkDest, err := cdctest.StartMockWebhookSink(cert)
require.NoError(t, err)

// The feed needs a sink wrapper; this one returns the sink unchanged.
dummyWrapper := func(s Sink) Sink {
return s
}
// NB: This is a partially initialized webhookFeed.
// You must call th.waitForSuccessfulScheduledJob prior to reading
// feed messages. If messages are read before they are present,
// this feed will panic because the sink synchronizer is not initialized.
feed := &webhookFeed{
seenTrackerMap: make(map[string]struct{}),
mockSink: sinkDest,
isBare: isBare,
jobFeed: newJobFeed(db, dummyWrapper),
}

sinkURI := fmt.Sprintf("webhook-%s?insecure_tls_skip_verify=true", sinkDest.URL())
return sinkURI, feed, sinkDest.Close
}

// testCase describes the single scenario exercised by this test: an hourly
// scheduled changefeed on t1 and the payloads it is expected to emit.
testCase := struct {
name string
scheduleStmt string
expectedPayload []string
isBare bool
}{
name: "one-table",
scheduleStmt: "CREATE SCHEDULE FOR changefeed TABLE t1 INTO $1 RECURRING '@hourly'",
expectedPayload: []string{
`t1: [1]->{"after": {"a": 1, "b": "one"}}`,
`t1: [10]->{"after": {"a": 10, "b": "ten"}}`,
`t1: [100]->{"after": {"a": 100, "b": "hundred"}}`,
},
isBare: false,
}

t.Run(testCase.name, func(t *testing.T) {
// NB: this cleanup shadows the outer test-helper cleanup; it closes the sink.
sinkURI, feed, cleanup := getFeed(testCase.isBare, th.db)
defer cleanup()

sj, err := th.createChangefeedSchedule(
t, testCase.scheduleStmt, sinkURI)
require.NoError(t, err)
defer th.clearSchedules(t)

// Get the schedule storage so the modified schedule can be persisted.
scheduleStorage := jobs.ScheduledJobDB(th.internalDB())

// Remember the cluster ID currently stored in the schedule details; the
// test requires it to be set.
details := sj.ScheduleDetails()
currentClusterID := details.ClusterID
require.NotZero(t, currentClusterID)

// Overwrite the stored cluster ID with a dummy value, advance the test
// clock just past the schedule's next run, persist the schedule, and then
// execute schedules.
details.ClusterID = jobstest.DummyClusterID
sj.SetScheduleDetails(*details)
th.env.SetTime(sj.NextRun().Add(time.Second))
require.NoError(t, scheduleStorage.Update(context.Background(), sj))
require.NoError(t, th.executeSchedules())

// The schedule is expected to be paused. Reload it until it reports paused.
testutils.SucceedsSoon(t, func() error {
expectPausedSchedule := th.loadSchedule(t, sj.ScheduleID())
if !expectPausedSchedule.IsPaused() {
return errors.New("schedule has not paused yet")
}
// The stored cluster ID should have been reset to the original value.
require.Equal(t, currentClusterID, expectPausedSchedule.ScheduleDetails().ClusterID)
return nil
})

// Manually resume the schedule, confirm it is no longer paused, and run it
// again; this time the changefeed job is expected to succeed.
th.sqlDB.Exec(t, "RESUME SCHEDULE $1", sj.ScheduleID())
resumedSchedule := th.loadSchedule(t, sj.ScheduleID())
require.False(t, resumedSchedule.IsPaused())
th.env.SetTime(resumedSchedule.NextRun().Add(time.Second))
require.NoError(t, th.executeSchedules())
jobID := th.waitForSuccessfulScheduledJob(t, sj.ScheduleID())
// We need to set the job ID here explicitly because the webhookFeed.Next
// function calls jobFeed.Details, which uses the job ID to get the
// changefeed details from the system.jobs table.
feed.jobFeed.jobID = jobspb.JobID(jobID)
assertPayloads(t, feed, testCase.expectedPayload)
})
}

// TestScheduledChangefeedErrors tests cases where a schedule changefeed statement will return an error.
func TestScheduledChangefeedErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
Expand Down
6 changes: 6 additions & 0 deletions pkg/sql/opt/norm/rules/mutation.opt
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,9 @@
$fkChecks
$simplifiedPrivate
)

# RemoveZeroCardLock removes a Lock operator whose input is known to produce
# zero rows, since in that case no rows would be locked. The Lock expression is
# replaced by its input.
[RemoveZeroCardLock, Normalize]
(Lock $rows:* & (HasZeroRows $rows))
=>
$rows
46 changes: 46 additions & 0 deletions pkg/sql/opt/norm/testdata/rules/mutation
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,49 @@ update t106663
├── 1 [as=a_new:13]
├── false [as=partial_index_put2:14]
└── false [as=partial_index_del2:15]

norm set=optimizer_use_lock_op_for_serializable=true expect=RemoveZeroCardLock
SELECT a FROM t WHERE a = 1 AND a = 2 FOR UPDATE;
----
values
├── columns: a:2!null
├── cardinality: [0 - 0]
├── key: ()
└── fd: ()-->(2)

norm set=optimizer_use_lock_op_for_serializable=true expect=RemoveZeroCardLock
SELECT a FROM t WHERE a = 1 AND a = 2 FOR SHARE;
----
values
├── columns: a:2!null
├── cardinality: [0 - 0]
├── key: ()
└── fd: ()-->(2)

norm set=optimizer_use_lock_op_for_serializable=true expect-not=RemoveZeroCardLock
SELECT a FROM t WHERE a = 1 FOR SHARE;
----
lock t
├── columns: a:2!null [hidden: k:1!null]
├── locking: for-share
├── volatile, mutations
├── key: (1)
├── fd: ()-->(2)
└── select
├── columns: k:1!null a:2!null
├── key: (1)
├── fd: ()-->(2)
├── scan t
│ ├── columns: k:1!null a:2
│ ├── partial index predicates
│ │ ├── t_c_idx: filters
│ │ │ └── d:5 > 1 [outer=(5), constraints=(/5: [/2 - ]; tight)]
│ │ ├── t_e_idx: filters
│ │ │ ├── f:7 > 1 [outer=(7), constraints=(/7: [/2 - ]; tight)]
│ │ │ └── g:8 > 1 [outer=(8), constraints=(/8: [/2 - ]; tight)]
│ │ └── t_d_idx: filters
│ │ └── c:4 > 1 [outer=(4), constraints=(/4: [/2 - ]; tight)]
│ ├── key: (1)
│ └── fd: (1)-->(2)
└── filters
└── a:2 = 1 [outer=(2), constraints=(/2: [/1 - /1]; tight), fd=()-->(2)]

0 comments on commit e4d5829

Please sign in to comment.