Skip to content

Commit

Permalink
ddl: fix reorg handle not resumed after changing DDL owner (#56507) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Dec 12, 2024
1 parent 4df9aa8 commit 1a3cb4c
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 25 deletions.
2 changes: 2 additions & 0 deletions pkg/ddl/backfilling.go
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ func splitTableRanges(
errMsg := fmt.Sprintf("cannot find region in range [%s, %s]", startKey.String(), endKey.String())
return nil, errors.Trace(dbterror.ErrInvalidSplitRegionRanges.GenWithStackByArgs(errMsg))
}
failpoint.InjectCall("afterLoadTableRanges", len(ranges))
return ranges, nil
}

Expand Down Expand Up @@ -715,6 +716,7 @@ func (dc *ddlCtx) writePhysicalTableRecord(
zap.Int64("job ID", reorgInfo.ID),
zap.Error(err2))
}
failpoint.InjectCall("afterUpdateReorgMeta")
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ddl/backfilling_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,7 @@ func newTaskIDAllocator() *taskIDAllocator {
}

func (a *taskIDAllocator) alloc() int {
ret := a.id
a.id++
return a.id
return ret
}
36 changes: 36 additions & 0 deletions pkg/ddl/column_modify_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
testddlutil "github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/ddl/util/callback"
"github.com/pingcap/tidb/pkg/domain"
Expand Down Expand Up @@ -646,3 +647,38 @@ func TestColumnTypeChangeGenUniqueChangingName(t *testing.T) {

tk.MustExec("drop table if exists t")
}

func TestModifyColumnReorgCheckpoint(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, columnModifyLease)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk.MustExec("set global tidb_ddl_reorg_worker_cnt = 1;")
tk.MustExec("create table t (a int primary key, b bigint);")
rowCnt := 10
for i := 0; i < rowCnt; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%d, %d)", i*10000, i*10000))
}
splitTableSQL := fmt.Sprintf("split table t between (0) and (%d*10000) regions %d;", rowCnt, rowCnt)
tk.MustQuery(splitTableSQL).Check(testkit.Rows(fmt.Sprintf("%d 1", rowCnt-1)))

retireOwner := false
err := failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterUpdateReorgMeta", func() {
if !retireOwner {
retireOwner = true
dom.DDL().OwnerManager().ResignOwner(context.Background())
}
})
require.NoError(t, err)

rangeCnts := []int{}
err = failpoint.EnableCall("github.com/pingcap/tidb/pkg/ddl/afterLoadTableRanges", func(rangeCnt int) {
rangeCnts = append(rangeCnts, rangeCnt)
})
require.NoError(t, err)

tk.MustExec("alter table t modify column b int;")
require.Len(t, rangeCnts, 2) // It should have two rounds for loading table ranges.
require.Less(t, rangeCnts[1], rangeCnts[0]) // Verify if the checkpoint is progressing.
}
8 changes: 4 additions & 4 deletions pkg/ddl/ingest/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type CheckpointManager struct {
// Live in memory.
mu sync.Mutex
checkpoints map[int]*taskCheckpoint // task ID -> checkpoint
// we require each task ID to be continuous and start from 1.
// we require each task ID to be continuous and start from 0.
minTaskIDFinished int
dirty bool
// Local meta.
Expand Down Expand Up @@ -167,7 +167,7 @@ func (s *CheckpointManager) Status() (keyCnt int, minKeyImported kv.Key) {
}

// Register registers a new task. taskID MUST be continuous ascending and start
// from 1.
// from 0.
//
// TODO(lance6716): remove this constraint, use endKey as taskID and use
// ordered map type for checkpoints.
Expand Down Expand Up @@ -221,14 +221,14 @@ func (s *CheckpointManager) UpdateWrittenKeys(taskID int, delta int) error {
// afterFlush should be called after all engine is flushed.
func (s *CheckpointManager) afterFlush() {
for {
cp := s.checkpoints[s.minTaskIDFinished+1]
cp := s.checkpoints[s.minTaskIDFinished]
if cp == nil || !cp.lastBatchRead || cp.writtenKeys < cp.totalKeys {
break
}
delete(s.checkpoints, s.minTaskIDFinished)
s.minTaskIDFinished++
s.flushedKeyLowWatermark = cp.endKey
s.flushedKeyCnt += cp.totalKeys
delete(s.checkpoints, s.minTaskIDFinished)
s.dirty = true
}
}
Expand Down
38 changes: 19 additions & 19 deletions pkg/ddl/ingest/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,39 +52,39 @@ func TestCheckpointManager(t *testing.T) {
require.NoError(t, err)
defer mgr.Close()

mgr.Register(1, []byte{'1', '9'})
mgr.Register(2, []byte{'2', '9'})
mgr.UpdateTotalKeys(1, 100, false)
mgr.Register(0, []byte{'1', '9'})
mgr.Register(1, []byte{'2', '9'})
mgr.UpdateTotalKeys(0, 100, false)
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
require.NoError(t, mgr.UpdateWrittenKeys(0, 100))
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
mgr.UpdateTotalKeys(1, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
mgr.UpdateTotalKeys(0, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(0, 100))
// The data is not imported to the storage yet.
require.False(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
flushCtrl.imported = true // Mock the data is imported to the storage.
require.NoError(t, mgr.UpdateWrittenKeys(2, 0))
require.NoError(t, mgr.UpdateWrittenKeys(1, 0))
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))

// Only when the last batch is completed, the job can be completed.
mgr.UpdateTotalKeys(2, 50, false)
mgr.UpdateTotalKeys(2, 50, true)
require.NoError(t, mgr.UpdateWrittenKeys(2, 50))
mgr.UpdateTotalKeys(1, 50, false)
mgr.UpdateTotalKeys(1, 50, true)
require.NoError(t, mgr.UpdateWrittenKeys(1, 50))
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'2', '9'}))
require.NoError(t, mgr.UpdateWrittenKeys(2, 50))
require.NoError(t, mgr.UpdateWrittenKeys(1, 50))
require.True(t, mgr.IsKeyProcessed([]byte{'1', '9'}))
require.True(t, mgr.IsKeyProcessed([]byte{'2', '9'}))

// Only when the subsequent job is completed, the previous job can be completed.
mgr.Register(3, []byte{'3', '9'})
mgr.Register(4, []byte{'4', '9'})
mgr.Register(5, []byte{'5', '9'})
mgr.Register(2, []byte{'3', '9'})
mgr.Register(3, []byte{'4', '9'})
mgr.Register(4, []byte{'5', '9'})
mgr.UpdateTotalKeys(2, 100, true)
mgr.UpdateTotalKeys(3, 100, true)
mgr.UpdateTotalKeys(4, 100, true)
mgr.UpdateTotalKeys(5, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(5, 100))
require.NoError(t, mgr.UpdateWrittenKeys(4, 100))
require.NoError(t, mgr.UpdateWrittenKeys(3, 100))
require.False(t, mgr.IsKeyProcessed([]byte{'3', '9'}))
require.False(t, mgr.IsKeyProcessed([]byte{'4', '9'}))
}
Expand All @@ -107,9 +107,9 @@ func TestCheckpointManagerUpdateReorg(t *testing.T) {
require.NoError(t, err)
defer mgr.Close()

mgr.Register(1, []byte{'1', '9'})
mgr.UpdateTotalKeys(1, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(1, 100))
mgr.Register(0, []byte{'1', '9'})
mgr.UpdateTotalKeys(0, 100, true)
require.NoError(t, mgr.UpdateWrittenKeys(0, 100))
mgr.Flush() // Wait the global checkpoint to be updated to the reorg table.
r, err := tk.Exec("select reorg_meta from mysql.tidb_ddl_reorg where job_id = 1 and ele_id = 1;")
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion tests/realtikvtest/addindextest3/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func TestBackfillOperators(t *testing.T) {

tasks := sink.collect()
require.Len(t, tasks, 10)
require.Equal(t, 1, tasks[0].ID)
require.Equal(t, 0, tasks[0].ID)
require.Equal(t, startKey, tasks[0].Start)
require.Equal(t, endKey, tasks[9].End)

Expand Down

0 comments on commit 1a3cb4c

Please sign in to comment.