From 1a3cb4cfa241a3bcf23a97a013aa81286bff9338 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Thu, 12 Dec 2024 14:52:42 +0800
Subject: [PATCH] ddl: fix reorg handle not resumed after changing DDL owner
 (#56507) (#57926)

close pingcap/tidb#56506
---
 pkg/ddl/backfilling.go             |  2 +
 pkg/ddl/backfilling_scheduler.go   |  3 +-
 pkg/ddl/column_modify_test.go      | 36 ++++++++++++++++++
 pkg/ddl/ingest/checkpoint.go       |  8 ++--
 pkg/ddl/ingest/checkpoint_test.go  | 38 +++++++++----------
 .../addindextest3/operator_test.go |  2 +-
 6 files changed, 64 insertions(+), 25 deletions(-)

diff --git a/pkg/ddl/backfilling.go b/pkg/ddl/backfilling.go
index 9367ac8c7e0d7..d22d8c35c319d 100644
--- a/pkg/ddl/backfilling.go
+++ b/pkg/ddl/backfilling.go
@@ -463,6 +463,7 @@ func splitTableRanges(
 		errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", startKey.String(), endKey.String())
 		return nil, errors.Trace(dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg))
 	}
+	failpoint.InjectCall("afterLoadTableRanges", len(ranges))
 	return ranges, nil
 }
 
@@ -715,6 +716,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
 					zap.Int64("job ID", reorgInfo.ID),
 					zap.Error(err2))
 			}
+			failpoint.InjectCall("afterUpdateReorgMeta")
 		}
 	}
 }
diff --git a/pkg/ddl/backfilling_scheduler.go b/pkg/ddl/backfilling_scheduler.go
index bed5815106517..3be976c64f915 100644
--- a/pkg/ddl/backfilling_scheduler.go
+++ b/pkg/ddl/backfilling_scheduler.go
@@ -580,6 +580,7 @@ func newTaskIDAllocator() *taskIDAllocator {
 }
 
 func (a *taskIDAllocator) alloc() int {
+	ret := a.id
 	a.id++
-	return a.id
+	return ret
 }
diff --git a/pkg/ddl/column_modify_test.go b/pkg/ddl/column_modify_test.go
index d09b7f9715203..5e2b50495e64e 100644
--- a/pkg/ddl/column_modify_test.go
+++ b/pkg/ddl/column_modify_test.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
 	testddlutil "github.com/pingcap/tidb/pkg/ddl/testutil"
 	"github.com/pingcap/tidb/pkg/ddl/util/callback"
 	"github.com/pingcap/tidb/pkg/domain"
@@ -646,3 +647,38 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {
 
 	tk.MustExec("drop table if exists t")
 }
+
+func TestModifyColumnReorgCheckpoint(t *testing.T) {
+	store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)
+	tk := testkit.NewTestKit(t, store)
+	tk.MustExec("use test")
+	tk2 := testkit.NewTestKit(t, store)
+	tk2.MustExec("use test")
+	tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
+	tk.MustExec("create table t (a int primary key, b bigint);")
+	rowCnt := 10
+	for i := 0; i < rowCnt; i++ {
+		tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i*10000, i*10000))
+	}
+	splitTableSQL := fmt.Sprintf("split table t between (0) and (%d*10000) regions %d;", rowCnt, rowCnt)
+	tk.MustQuery(splitTableSQL).Check(testkit.Rows(fmt.Sprintf("%d 1", rowCnt-1)))
+
+	retireOwner := false
+	err := failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterUpdateReorgMeta", func() {
+		if !retireOwner {
+			retireOwner = true
+			dom.DDL().OwnerManager().ResignOwner(context.Background())
+		}
+	})
+	require.NoError(t, err)
+
+	rangeCnts := []int{}
+	err = failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterLoadTableRanges", func(rangeCnt int) {
+		rangeCnts = append(rangeCnts, rangeCnt)
+	})
+	require.NoError(t, err)
+
+	tk.MustExec("alter table t modify column b int;")
+	require.Len(t, rangeCnts, 2)                // Table ranges should be loaded in two rounds.
+	require.Less(t, rangeCnts[1], rangeCnts[0]) // Verify that the checkpoint made progress.
+}
diff --git a/pkg/ddl/ingest/checkpoint.go b/pkg/ddl/ingest/checkpoint.go
index 5c74c54628063..e1de2fe43d627 100644
--- a/pkg/ddl/ingest/checkpoint.go
+++ b/pkg/ddl/ingest/checkpoint.go
@@ -55,7 +55,7 @@ type CheckpointManager struct {
 	// Live in memory.
 	mu          sync.Mutex
 	checkpoints map[int]*taskCheckpoint // task ID -> checkpoint
-	// we require each task ID to be continuous and start from 1.
+	// we require each task ID to be continuous and start from 0.
 	minTaskIDFinished int
 	dirty             bool
 	// Local meta.
@@ -167,7 +167,7 @@ func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key) {
 }
 
 // Register registers a new task. taskID MUST be continuous ascending and start
-// from 1.
+// from 0.
 //
 // TODO(lance6716): remove this constraint, use endKey as taskID and use
 // ordered map type for checkpoints.
@@ -221,14 +221,14 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error {
 // afterFlush should be called after all engine is flushed.
 func (s *CheckpointManager) afterFlush() {
 	for {
-		cp := s.checkpoints[s.minTaskIDFinished+1]
+		cp := s.checkpoints[s.minTaskIDFinished]
 		if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys {
 			break
 		}
+		delete(s.checkpoints, s.minTaskIDFinished)
 		s.minTaskIDFinished++
 		s.flushedKeyLowWatermark = cp.endKey
 		s.flushedKeyCnt += cp.totalKeys
-		delete(s.checkpoints, s.minTaskIDFinished)
 		s.dirty = true
 	}
 }
diff --git a/pkg/ddl/ingest/checkpoint_test.go b/pkg/ddl/ingest/checkpoint_test.go
index 08a907255bcde..560f643549b52 100644
--- a/pkg/ddl/ingest/checkpoint_test.go
+++ b/pkg/ddl/ingest/checkpoint_test.go
@@ -52,39 +52,39 @@ func TestCheckpointManager(t *testing.T) {
 	require.NoError(t, err)
 	defer mgr.Close()
 
-	mgr.Register(1, []byte{'1', '9'})
-	mgr.Register(2, []byte{'2', '9'})
-	mgr.UpdateTotalKeys(1, 100, false)
+	mgr.Register(0, []byte{'1', '9'})
+	mgr.Register(1, []byte{'2', '9'})
+	mgr.UpdateTotalKeys(0, 100, false)
 	require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
-	require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
+	require.NoError(t, mgr.UpdateWrittenKeys(0, 100))
 	require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
-	mgr.UpdateTotalKeys(1, 100, true)
-	require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
+	mgr.UpdateTotalKeys(0, 100, true)
+	require.NoError(t, mgr.UpdateWrittenKeys(0, 100))
 	// The data is not imported to the storage yet.
 	require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
 	flushCtrl.imported = true // Mock the data is imported to the storage.
-	require.NoError(t, mgr.UpdateWrittenKeys(2, 0))
+	require.NoError(t, mgr.UpdateWrittenKeys(1, 0))
 	require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
 
 	// Only when the last batch is completed, the job can be completed.
-	mgr.UpdateTotalKeys(2, 50, false)
-	mgr.UpdateTotalKeys(2, 50, true)
-	require.NoError(t, mgr.UpdateWrittenKeys(2, 50))
+	mgr.UpdateTotalKeys(1, 50, false)
+	mgr.UpdateTotalKeys(1, 50, true)
+	require.NoError(t, mgr.UpdateWrittenKeys(1, 50))
 	require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
 	require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
-	require.NoError(t, mgr.UpdateWrittenKeys(2, 50))
+	require.NoError(t, mgr.UpdateWrittenKeys(1, 50))
 	require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
 	require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
 
 	// Only when the subsequent job is completed, the previous job can be completed.
-	mgr.Register(3, []byte{'3', '9'})
-	mgr.Register(4, []byte{'4', '9'})
-	mgr.Register(5, []byte{'5', '9'})
+	mgr.Register(2, []byte{'3', '9'})
+	mgr.Register(3, []byte{'4', '9'})
+	mgr.Register(4, []byte{'5', '9'})
+	mgr.UpdateTotalKeys(2, 100, true)
 	mgr.UpdateTotalKeys(3, 100, true)
 	mgr.UpdateTotalKeys(4, 100, true)
-	mgr.UpdateTotalKeys(5, 100, true)
-	require.NoError(t, mgr.UpdateWrittenKeys(5, 100))
 	require.NoError(t, mgr.UpdateWrittenKeys(4, 100))
+	require.NoError(t, mgr.UpdateWrittenKeys(3, 100))
 	require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'}))
 	require.False(t, mgr.IsKeyProcessed([]byte{'4', '9'}))
 }
@@ -107,9 +107,9 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) {
 	require.NoError(t, err)
 	defer mgr.Close()
 
-	mgr.Register(1, []byte{'1', '9'})
-	mgr.UpdateTotalKeys(1, 100, true)
-	require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
+	mgr.Register(0, []byte{'1', '9'})
+	mgr.UpdateTotalKeys(0, 100, true)
+	require.NoError(t, mgr.UpdateWrittenKeys(0, 100))
 	mgr.Flush() // Wait the global checkpoint to be updated to the reorg table.
 	r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;")
 	require.NoError(t, err)
diff --git a/tests/realtikvtest/addindextest3/operator_test.go b/tests/realtikvtest/addindextest3/operator_test.go
index c07386a7eb144..a8c2ac62e41ad 100644
--- a/tests/realtikvtest/addindextest3/operator_test.go
+++ b/tests/realtikvtest/addindextest3/operator_test.go
@@ -73,7 +73,7 @@ func TestBackfillOperators(t *testing.T) {
 		tasks := sink.collect()
 		require.Len(t, tasks, 10)
-		require.Equal(t, 1, tasks[0].ID)
+		require.Equal(t, 0, tasks[0].ID)
 		require.Equal(t, startKey, tasks[0].Start)
 		require.Equal(t, endKey, tasks[9].End)
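
Reviewer note (not part of the patch): the fix relies on two coordinated changes — taskIDAllocator becoming zero-based, and afterFlush looking up minTaskIDFinished directly while deleting the finished checkpoint before advancing the watermark. Below is a minimal, self-contained Go sketch of those two invariants; taskIDAllocator and the afterFlush loop follow the diff, while checkpointManager is trimmed down and main() with its fake checkpoint data is purely illustrative.

package main

import "fmt"

type taskIDAllocator struct{ id int }

// alloc returns the current ID and then advances it, so the first task
// gets ID 0. Before the fix it incremented first and handed out IDs from 1.
func (a *taskIDAllocator) alloc() int {
	ret := a.id
	a.id++
	return ret
}

type taskCheckpoint struct {
	totalKeys, writtenKeys int
	lastBatchRead          bool
}

type checkpointManager struct {
	checkpoints       map[int]*taskCheckpoint // task ID -> checkpoint
	minTaskIDFinished int                     // task IDs are consecutive from 0
}

// afterFlush mirrors the patched loop: look up minTaskIDFinished itself
// (not +1), and delete the finished entry before advancing the watermark.
func (m *checkpointManager) afterFlush() {
	for {
		cp := m.checkpoints[m.minTaskIDFinished]
		if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys {
			break
		}
		delete(m.checkpoints, m.minTaskIDFinished)
		m.minTaskIDFinished++
	}
}

func main() {
	alloc := &taskIDAllocator{}
	m := &checkpointManager{checkpoints: map[int]*taskCheckpoint{}}
	for i := 0; i < 3; i++ {
		id := alloc.alloc() // yields 0, 1, 2
		m.checkpoints[id] = &taskCheckpoint{totalKeys: 100, writtenKeys: 100, lastBatchRead: true}
	}
	m.afterFlush()
	fmt.Println(m.minTaskIDFinished, len(m.checkpoints)) // 3 0: all tasks acknowledged, none leaked
}

The ordering matters: if only the lookup had been switched to minTaskIDFinished while keeping the old increment-then-delete order, the loop would delete the next (unfinished) entry and leak the finished one, so the patch moves the delete ahead of the increment.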