From cccdb248904e772fb12af462a1c05cd666bf93fe Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 20 Nov 2024 12:17:54 +0800 Subject: [PATCH] resolve conflicts --- br/pkg/checkpoint/BUILD.bazel | 2 +- br/pkg/checkpoint/checkpoint.go | 1 - br/pkg/checkpoint/checkpoint_test.go | 94 ++++++++++++++++++++------ br/pkg/checkpoint/external_storage.go | 9 --- br/pkg/checkpoint/log_restore.go | 2 +- br/pkg/checkpoint/restore.go | 35 ++++++---- br/pkg/checkpoint/storage.go | 12 ++-- br/pkg/glue/progressing.go | 4 ++ br/pkg/restore/import_mode_switcher.go | 65 ++++++++++++------ br/pkg/restore/log_client/client.go | 4 +- br/pkg/restore/restorer.go | 2 +- br/pkg/restore/snap_client/client.go | 6 +- br/pkg/storage/BUILD.bazel | 1 + br/pkg/storage/helper.go | 40 +++++++---- br/pkg/task/operator/migrate_to.go | 6 +- br/pkg/task/restore.go | 18 +++-- 16 files changed, 204 insertions(+), 97 deletions(-) diff --git a/br/pkg/checkpoint/BUILD.bazel b/br/pkg/checkpoint/BUILD.bazel index c8679787db4a8..dc3471726625b 100644 --- a/br/pkg/checkpoint/BUILD.bazel +++ b/br/pkg/checkpoint/BUILD.bazel @@ -45,7 +45,7 @@ go_test( srcs = ["checkpoint_test.go"], flaky = True, race = "on", - shard_count = 8, + shard_count = 9, deps = [ ":checkpoint", "//br/pkg/gluetidb", diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go index 765ede725fb98..861a60835a392 100644 --- a/br/pkg/checkpoint/checkpoint.go +++ b/br/pkg/checkpoint/checkpoint.go @@ -261,7 +261,6 @@ func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool) // wait the range flusher exit r.wg.Wait() // remove the checkpoint lock - r.checkpointStorage.deleteLock(ctx) r.checkpointStorage.close() } diff --git a/br/pkg/checkpoint/checkpoint_test.go b/br/pkg/checkpoint/checkpoint_test.go index c6756f8058c5c..1205774dac232 100644 --- a/br/pkg/checkpoint/checkpoint_test.go +++ b/br/pkg/checkpoint/checkpoint_test.go @@ -75,7 +75,7 @@ func TestCheckpointMetaForRestore(t *testing.T) { }, }, } - err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, checkpointMetaForSnapshotRestore) + err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, checkpointMetaForSnapshotRestore) require.NoError(t, err) checkpointMetaForSnapshotRestore2, err := checkpoint.LoadCheckpointMetadataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor()) require.NoError(t, err) @@ -278,9 +278,9 @@ func TestCheckpointRestoreRunner(t *testing.T) { se, err := g.CreateSession(s.Mock.Storage) require.NoError(t, err) - err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{}) + err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{}) require.NoError(t, err) - checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 5*time.Second, 3*time.Second) + checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 5*time.Second, 3*time.Second) require.NoError(t, err) data := map[string]struct { @@ -310,7 +310,7 @@ func TestCheckpointRestoreRunner(t *testing.T) { } for _, d := range data { - err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, d.RangeKey) + err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, d.RangeKey, "") require.NoError(t, err) } @@ -320,7 +320,7 @@ func TestCheckpointRestoreRunner(t *testing.T) { checkpointRunner.FlushChecksum(ctx, 4, 4, 4, 4) for _, d := range data2 { - err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, d.RangeKey) + err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, d.RangeKey, "") require.NoError(t, err) } @@ -343,7 +343,7 @@ func TestCheckpointRestoreRunner(t *testing.T) { respCount += 1 } - _, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checker) + _, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.SnapshotRestoreCheckpointDatabaseName, checker) require.NoError(t, err) require.Equal(t, 4, respCount) @@ -355,10 +355,10 @@ func TestCheckpointRestoreRunner(t *testing.T) { require.Equal(t, checksum[i].Crc64xor, uint64(i)) } - err = checkpoint.RemoveCheckpointDataForSnapshotRestore(ctx, s.Mock.Domain, se) + err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.SnapshotRestoreCheckpointDatabaseName) require.NoError(t, err) - exists := checkpoint.ExistsSnapshotRestoreCheckpoint(ctx, s.Mock.Domain) + exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName) require.False(t, exists) exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.SnapshotRestoreCheckpointDatabaseName)) require.False(t, exists) @@ -371,9 +371,9 @@ func TestCheckpointRunnerRetry(t *testing.T) { se, err := g.CreateSession(s.Mock.Storage) require.NoError(t, err) - err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{}) + err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{}) require.NoError(t, err) - checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond) + checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond) require.NoError(t, err) err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes", "return(true)") @@ -382,9 +382,9 @@ func TestCheckpointRunnerRetry(t *testing.T) { err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes") require.NoError(t, err) }() - err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123") + err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123", "") require.NoError(t, err) - err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456") + err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456", "") require.NoError(t, err) err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1) require.NoError(t, err) @@ -392,7 +392,7 @@ func TestCheckpointRunnerRetry(t *testing.T) { time.Sleep(time.Second) err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes") require.NoError(t, err) - err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 3, "789") + err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 3, "789", "") require.NoError(t, err) err = checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3) require.NoError(t, err) @@ -400,7 +400,8 @@ func TestCheckpointRunnerRetry(t *testing.T) { se, err = g.CreateSession(s.Mock.Storage) require.NoError(t, err) recordSet := make(map[string]int) - _, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), + _, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), + checkpoint.SnapshotRestoreCheckpointDatabaseName, func(tableID int64, rangeKey checkpoint.RestoreValueType) { recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1 }) @@ -422,14 +423,14 @@ func TestCheckpointRunnerNoRetry(t *testing.T) { se, err := g.CreateSession(s.Mock.Storage) require.NoError(t, err) - err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{}) + err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{}) require.NoError(t, err) - checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond) + checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond) require.NoError(t, err) - err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123") + err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123", "") require.NoError(t, err) - err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456") + err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456", "") require.NoError(t, err) err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1) require.NoError(t, err) @@ -440,7 +441,8 @@ func TestCheckpointRunnerNoRetry(t *testing.T) { se, err = g.CreateSession(s.Mock.Storage) require.NoError(t, err) recordSet := make(map[string]int) - _, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), + _, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), + checkpoint.SnapshotRestoreCheckpointDatabaseName, func(tableID int64, rangeKey checkpoint.RestoreValueType) { recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1 }) @@ -584,3 +586,57 @@ func TestCheckpointRunnerLock(t *testing.T) { runner.WaitForFinish(ctx, true) } + +func TestCheckpointCompactedRestoreRunner(t *testing.T) { + ctx := context.Background() + s := utiltest.CreateRestoreSchemaSuite(t) + g := gluetidb.New() + se, err := g.CreateSession(s.Mock.Storage) + require.NoError(t, err) + + err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.CompactedRestoreCheckpointDatabaseName, nil) + require.NoError(t, err) + checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.CompactedRestoreCheckpointDatabaseName, 500*time.Millisecond, time.Second) + require.NoError(t, err) + + data := map[string]struct { + Name string + }{ + "a": {Name: "a"}, + "A": {Name: "A"}, + "1": {Name: "1"}, + } + + for _, d := range data { + err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "", d.Name) + require.NoError(t, err) + } + + checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1) + checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2) + + checkpointRunner.WaitForFinish(ctx, true) + + se, err = g.CreateSession(s.Mock.Storage) + require.NoError(t, err) + respCount := 0 + checker := func(tableID int64, resp checkpoint.RestoreValueType) { + require.NotNil(t, resp) + d, ok := data[resp.Name] + require.True(t, ok) + require.Equal(t, d.Name, resp.Name) + respCount++ + } + + _, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.CompactedRestoreCheckpointDatabaseName, checker) + require.NoError(t, err) + require.Equal(t, 3, respCount) + + err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.CompactedRestoreCheckpointDatabaseName) + require.NoError(t, err) + + exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CompactedRestoreCheckpointDatabaseName) + require.False(t, exists) + exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.CompactedRestoreCheckpointDatabaseName)) + require.False(t, exists) +} diff --git a/br/pkg/checkpoint/external_storage.go b/br/pkg/checkpoint/external_storage.go index 078f2f1294e91..1d7c999aa8722 100644 --- a/br/pkg/checkpoint/external_storage.go +++ b/br/pkg/checkpoint/external_storage.go @@ -187,12 +187,3 @@ func (s *externalCheckpointStorage) updateLock(ctx context.Context) error { return nil } - -func (s *externalCheckpointStorage) deleteLock(ctx context.Context) { - if s.lockId > 0 { - err := s.storage.DeleteFile(ctx, s.CheckpointLockPath) - if err != nil { - log.Warn("failed to remove the checkpoint lock", zap.Error(err)) - } - } -} diff --git a/br/pkg/checkpoint/log_restore.go b/br/pkg/checkpoint/log_restore.go index b2ae3c398a3c8..a75eeffac603e 100644 --- a/br/pkg/checkpoint/log_restore.go +++ b/br/pkg/checkpoint/log_restore.go @@ -299,7 +299,7 @@ func TryToGetCheckpointTaskInfo( return nil, errors.Trace(err) } } - hasSnapshotMetadata := ExistsSnapshotRestoreCheckpoint(ctx, dom) + hasSnapshotMetadata := ExistsSstRestoreCheckpoint(ctx, dom, SnapshotRestoreCheckpointDatabaseName) return &CheckpointTaskInfoForLogRestore{ Metadata: metadata, diff --git a/br/pkg/checkpoint/restore.go b/br/pkg/checkpoint/restore.go index 88ff6f8f204de..5e8f54f1f464a 100644 --- a/br/pkg/checkpoint/restore.go +++ b/br/pkg/checkpoint/restore.go @@ -31,6 +31,8 @@ type RestoreKeyType = int64 type RestoreValueType struct { // the file key of a range RangeKey string + // the file name, used for compacted restore + Name string } func (rv RestoreValueType) IdentKey() []byte { @@ -45,11 +47,12 @@ func valueMarshalerForRestore(group *RangeGroup[RestoreKeyType, RestoreValueType func StartCheckpointRestoreRunnerForTest( ctx context.Context, se glue.Session, + dbName string, tick time.Duration, retryDuration time.Duration, ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) { runner := newCheckpointRunner[RestoreKeyType, RestoreValueType]( - newTableCheckpointStorage(se, SnapshotRestoreCheckpointDatabaseName), + newTableCheckpointStorage(se, dbName), nil, valueMarshalerForRestore) runner.startCheckpointMainLoop(ctx, tick, tick, 0, retryDuration) @@ -60,9 +63,10 @@ func StartCheckpointRestoreRunnerForTest( func StartCheckpointRunnerForRestore( ctx context.Context, se glue.Session, + dbName string, ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) { runner := newCheckpointRunner[RestoreKeyType, RestoreValueType]( - newTableCheckpointStorage(se, SnapshotRestoreCheckpointDatabaseName), + newTableCheckpointStorage(se, dbName), nil, valueMarshalerForRestore) // for restore, no need to set lock @@ -77,23 +81,25 @@ func AppendRangesForRestore( r *CheckpointRunner[RestoreKeyType, RestoreValueType], tableID RestoreKeyType, rangeKey string, + name string, ) error { return r.Append(ctx, &CheckpointMessage[RestoreKeyType, RestoreValueType]{ GroupKey: tableID, Group: []RestoreValueType{ - {RangeKey: rangeKey}, + {RangeKey: rangeKey, Name: name}, }, }) } // load the whole checkpoint range data and retrieve the metadata of restored ranges // and return the total time cost in the past executions -func LoadCheckpointDataForSnapshotRestore[K KeyType, V ValueType]( +func LoadCheckpointDataForSstRestore[K KeyType, V ValueType]( ctx context.Context, execCtx sqlexec.RestrictedSQLExecutor, + dbName string, fn func(K, V), ) (time.Duration, error) { - return selectCheckpointData(ctx, execCtx, SnapshotRestoreCheckpointDatabaseName, fn) + return selectCheckpointData(ctx, execCtx, dbName, fn) } func LoadCheckpointChecksumForRestore( @@ -118,28 +124,33 @@ func LoadCheckpointMetadataForSnapshotRestore( return m, err } -func SaveCheckpointMetadataForSnapshotRestore( +func SaveCheckpointMetadataForSstRestore( ctx context.Context, se glue.Session, + dbName string, meta *CheckpointMetadataForSnapshotRestore, ) error { - err := initCheckpointTable(ctx, se, SnapshotRestoreCheckpointDatabaseName, + err := initCheckpointTable(ctx, se, dbName, []string{checkpointDataTableName, checkpointChecksumTableName}) if err != nil { return errors.Trace(err) } - return insertCheckpointMeta(ctx, se, SnapshotRestoreCheckpointDatabaseName, checkpointMetaTableName, meta) + if meta != nil { + return insertCheckpointMeta(ctx, se, dbName, checkpointMetaTableName, meta) + } + return nil } -func ExistsSnapshotRestoreCheckpoint( +func ExistsSstRestoreCheckpoint( ctx context.Context, dom *domain.Domain, + dbName string, ) bool { return dom.InfoSchema(). - TableExists(pmodel.NewCIStr(SnapshotRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName)) + TableExists(pmodel.NewCIStr(dbName), pmodel.NewCIStr(checkpointMetaTableName)) } -func RemoveCheckpointDataForSnapshotRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error { - return dropCheckpointTables(ctx, dom, se, SnapshotRestoreCheckpointDatabaseName, +func RemoveCheckpointDataForSstRestore(ctx context.Context, dom *domain.Domain, se glue.Session, dbName string) error { + return dropCheckpointTables(ctx, dom, se, dbName, []string{checkpointDataTableName, checkpointChecksumTableName, checkpointMetaTableName}) } diff --git a/br/pkg/checkpoint/storage.go b/br/pkg/checkpoint/storage.go index 465924f8300f4..116cdc1ff942d 100644 --- a/br/pkg/checkpoint/storage.go +++ b/br/pkg/checkpoint/storage.go @@ -38,7 +38,6 @@ type checkpointStorage interface { initialLock(ctx context.Context) error updateLock(ctx context.Context) error - deleteLock(ctx context.Context) close() } @@ -48,8 +47,9 @@ type checkpointStorage interface { // 2. BR regards the metadata table as a file so that it is not empty if the table exists. // 3. BR regards the checkpoint table as a directory which is managed by metadata table. const ( - LogRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint" - SnapshotRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint" + LogRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint" + SnapshotRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint" + CompactedRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Compacted_Restore_Checkpoint" // directory level table checkpointDataTableName string = "cpt_data" @@ -90,7 +90,9 @@ const ( // IsCheckpointDB checks whether the dbname is checkpoint database. func IsCheckpointDB(dbname pmodel.CIStr) bool { - return dbname.O == LogRestoreCheckpointDatabaseName || dbname.O == SnapshotRestoreCheckpointDatabaseName + return dbname.O == LogRestoreCheckpointDatabaseName || + dbname.O == SnapshotRestoreCheckpointDatabaseName || + dbname.O == CompactedRestoreCheckpointDatabaseName } const CheckpointIdMapBlockSize int = 524288 @@ -142,8 +144,6 @@ func (s *tableCheckpointStorage) updateLock(ctx context.Context) error { return nil } -func (s *tableCheckpointStorage) deleteLock(ctx context.Context) {} - func (s *tableCheckpointStorage) flushCheckpointData(ctx context.Context, data []byte) error { sqls, argss := chunkInsertCheckpointSQLs(s.checkpointDBName, checkpointDataTableName, data) for i, sql := range sqls { diff --git a/br/pkg/glue/progressing.go b/br/pkg/glue/progressing.go index 3182e46ba53df..664bbd0e23508 100644 --- a/br/pkg/glue/progressing.go +++ b/br/pkg/glue/progressing.go @@ -44,6 +44,10 @@ func (p pbProgress) GetCurrent() int64 { // Close marks the progress as 100% complete and that Inc() can no longer be // called. func (p pbProgress) Close() { + // This wait shouldn't block. + // We are just waiting the progress bar refresh to the finished state. + defer p.bar.Wait() + if p.bar.Completed() || p.bar.Aborted() { return } diff --git a/br/pkg/restore/import_mode_switcher.go b/br/pkg/restore/import_mode_switcher.go index be01389c19e5f..255bd3a35094a 100644 --- a/br/pkg/restore/import_mode_switcher.go +++ b/br/pkg/restore/import_mode_switcher.go @@ -31,7 +31,9 @@ type ImportModeSwitcher struct { switchModeInterval time.Duration tlsConf *tls.Config - switchCh chan struct{} + mu sync.Mutex + cancel context.CancelFunc // Manages goroutine lifecycle + wg sync.WaitGroup } func NewImportModeSwitcher( @@ -43,15 +45,23 @@ func NewImportModeSwitcher( pdClient: pdClient, switchModeInterval: switchModeInterval, tlsConf: tlsConf, - switchCh: make(chan struct{}), } } -var closeOnce sync.Once - // switchToNormalMode switch tikv cluster to normal mode. func (switcher *ImportModeSwitcher) SwitchToNormalMode(ctx context.Context) error { - closeOnce.Do(func() { close(switcher.switchCh) }) + switcher.mu.Lock() + defer switcher.mu.Unlock() + + if switcher.cancel == nil { + log.Info("TiKV is already in normal mode") + return nil + } + log.Info("Stopping the import mode goroutine") + switcher.cancel() + switcher.cancel = nil + // wait for switch goroutine exits + switcher.wg.Wait() return switcher.switchTiKVMode(ctx, import_sstpb.SwitchMode_Normal) } @@ -116,26 +126,43 @@ func (switcher *ImportModeSwitcher) switchTiKVMode( return nil } -// SwitchToImportMode switch tikv cluster to import mode. -func (switcher *ImportModeSwitcher) SwitchToImportMode( +// GoSwitchToImportMode switch tikv cluster to import mode. +func (switcher *ImportModeSwitcher) GoSwitchToImportMode( ctx context.Context, -) { +) error { + switcher.mu.Lock() + defer switcher.mu.Unlock() + + if switcher.cancel != nil { + log.Info("TiKV is already in import mode") + return nil + } + + // Create a new context for the goroutine + ctx, cancel := context.WithCancel(context.Background()) + switcher.cancel = cancel + + // [important!] switch tikv mode into import at the beginning + log.Info("switch to import mode at beginning") + err := switcher.switchTiKVMode(ctx, import_sstpb.SwitchMode_Import) + if err != nil { + log.Warn("switch to import mode failed", zap.Error(err)) + return errors.Trace(err) + } + switcher.wg.Add(1) // tikv automatically switch to normal mode in every 10 minutes // so we need ping tikv in less than 10 minute go func() { tick := time.NewTicker(switcher.switchModeInterval) - defer tick.Stop() - - // [important!] switch tikv mode into import at the beginning - log.Info("switch to import mode at beginning") - err := switcher.switchTiKVMode(ctx, import_sstpb.SwitchMode_Import) - if err != nil { - log.Warn("switch to import mode failed", zap.Error(err)) - } + defer func() { + switcher.wg.Done() + tick.Stop() + }() for { select { case <-ctx.Done(): + log.Info("stop automatic switch to import mode when context done") return case <-tick.C: log.Info("switch to import mode") @@ -143,12 +170,10 @@ func (switcher *ImportModeSwitcher) SwitchToImportMode( if err != nil { log.Warn("switch to import mode failed", zap.Error(err)) } - case <-switcher.switchCh: - log.Info("stop automatic switch to import mode") - return } } }() + return nil } // RestorePreWork executes some prepare work before restore. @@ -166,7 +191,7 @@ func RestorePreWork( if switchToImport { // Switch TiKV cluster to import mode (adjust rocksdb configuration). - switcher.SwitchToImportMode(ctx) + switcher.GoSwitchToImportMode(ctx) } return mgr.RemoveSchedulersWithConfig(ctx) diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index 4faf59a316657..1ed4e64271ebb 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -166,7 +166,7 @@ func NewSstRestoreManager( return nil, errors.Trace(err) } if se != nil { - checkpointRunner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se) + checkpointRunner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.CompactedRestoreCheckpointDatabaseName) if err != nil { return nil, errors.Trace(err) } @@ -284,7 +284,7 @@ func (rc *LogClient) RestoreCompactedSstFiles( log.Info("[Compacted SST Restore] No SST files found for restoration.") return nil } - importModeSwitcher.SwitchToImportMode(ctx) + importModeSwitcher.GoSwitchToImportMode(ctx) defer func() { switchErr := importModeSwitcher.SwitchToNormalMode(ctx) if switchErr != nil { diff --git a/br/pkg/restore/restorer.go b/br/pkg/restore/restorer.go index 75a21b583eb1f..1201d1b6d463b 100644 --- a/br/pkg/restore/restorer.go +++ b/br/pkg/restore/restorer.go @@ -302,7 +302,7 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets .. for rangeKey := range rangeKeySet { // The checkpoint range shows this ranges of kvs has been restored into // the table corresponding to the table-id. - if err := checkpoint.AppendRangesForRestore(m.ectx, m.checkpointRunner, filesGroup.TableID, rangeKey); err != nil { + if err := checkpoint.AppendRangesForRestore(m.ectx, m.checkpointRunner, filesGroup.TableID, rangeKey, ""); err != nil { return errors.Trace(err) } } diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index a7e0ecab3d230..7dcd37189a246 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -346,7 +346,7 @@ func (rc *SnapClient) InitCheckpoint( } // t1 is the latest time the checkpoint ranges persisted to the external storage. - t1, err := checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, execCtx, func(tableID int64, v checkpoint.RestoreValueType) { + t1, err := checkpoint.LoadCheckpointDataForSstRestore(ctx, execCtx, checkpoint.SnapshotRestoreCheckpointDatabaseName, func(tableID int64, v checkpoint.RestoreValueType) { checkpointSet, exists := checkpointSetWithTableID[tableID] if !exists { checkpointSet = make(map[string]struct{}) @@ -379,7 +379,7 @@ func (rc *SnapClient) InitCheckpoint( if config != nil { meta.SchedulersConfig = &pdutil.ClusterConfig{Schedulers: config.Schedulers, ScheduleCfg: config.ScheduleCfg} } - if err := checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, rc.db.Session(), meta); err != nil { + if err := checkpoint.SaveCheckpointMetadataForSstRestore(ctx, rc.db.Session(), checkpoint.SnapshotRestoreCheckpointDatabaseName, meta); err != nil { return checkpointSetWithTableID, nil, errors.Trace(err) } } @@ -388,7 +388,7 @@ func (rc *SnapClient) InitCheckpoint( if err != nil { return checkpointSetWithTableID, nil, errors.Trace(err) } - rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se) + rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName) return checkpointSetWithTableID, checkpointClusterConfig, errors.Trace(err) } diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel index 9084fc41db3cd..60c587893af9f 100644 --- a/br/pkg/storage/BUILD.bazel +++ b/br/pkg/storage/BUILD.bazel @@ -77,6 +77,7 @@ go_library( "@org_golang_google_api//transport/http", "@org_golang_x_net//http2", "@org_golang_x_oauth2//google", + "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", "@org_uber_go_multierr//:multierr", "@org_uber_go_zap//:zap", diff --git a/br/pkg/storage/helper.go b/br/pkg/storage/helper.go index c0c5c63ba0747..3f3eca5b3cfb5 100644 --- a/br/pkg/storage/helper.go +++ b/br/pkg/storage/helper.go @@ -7,8 +7,12 @@ import ( "sync/atomic" "github.com/pingcap/errors" + "github.com/pingcap/log" "github.com/pingcap/tidb/br/pkg/utils/iter" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/util" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) func init() { @@ -48,22 +52,32 @@ func UnmarshalDir[T any](ctx context.Context, walkOpt *WalkOption, s ExternalSto errCh := make(chan error, 1) reader := func() { defer close(ch) + pool := util.NewWorkerPool(128, "metadata") + eg, ectx := errgroup.WithContext(ctx) err := s.WalkDir(ctx, walkOpt, func(path string, size int64) error { - metaBytes, err := s.ReadFile(ctx, path) - if err != nil { - return errors.Annotatef(err, "failed during reading file %s", path) - } - var meta T - if err := unmarshal(&meta, path, metaBytes); err != nil { - return errors.Annotatef(err, "failed to parse subcompaction meta of file %s", path) - } - select { - case ch <- &meta: - case <-ctx.Done(): - return ctx.Err() - } + pool.ApplyOnErrorGroup(eg, func() error { + metaBytes, err := s.ReadFile(ectx, path) + if err != nil { + log.Error("failed to read file", zap.String("file", path)) + return errors.Annotatef(err, "during reading meta file %s from storage", path) + } + + var meta T + if err := unmarshal(&meta, path, metaBytes); err != nil { + return errors.Annotatef(err, "failed to unmarshal file %s", path) + } + select { + case ch <- &meta: + case <-ctx.Done(): + return ctx.Err() + } + return nil + }) return nil }) + if err == nil { + err = eg.Wait() + } if err != nil { select { case errCh <- err: diff --git a/br/pkg/task/operator/migrate_to.go b/br/pkg/task/operator/migrate_to.go index 20f76b0f86967..282e82784ecb9 100644 --- a/br/pkg/task/operator/migrate_to.go +++ b/br/pkg/task/operator/migrate_to.go @@ -5,7 +5,7 @@ import ( "github.com/fatih/color" "github.com/pingcap/errors" - backup "github.com/pingcap/kvproto/pkg/brpb" + backuppb "github.com/pingcap/kvproto/pkg/brpb" "github.com/pingcap/tidb/br/pkg/glue" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/stream" @@ -39,7 +39,7 @@ func (cx migrateToCtx) printErr(errs []error, msg string) { } } -func (cx migrateToCtx) askForContinue(targetMig *backup.Migration) bool { +func (cx migrateToCtx) askForContinue(targetMig *backuppb.Migration) bool { tbl := cx.console.CreateTable() stream.AddMigrationToTable(targetMig, tbl) cx.console.Println("The migration going to be executed will be like: ") @@ -124,7 +124,7 @@ func RunMigrateTo(ctx context.Context, cfg MigrateToConfig) error { } return run(func(est stream.MigrationExt) stream.MergeAndMigratedTo { - return est.MergeAndMigrateTo(ctx, targetVersion, stream.MMOptInteractiveCheck(func(ctx context.Context, m *backup.Migration) bool { + return est.MergeAndMigrateTo(ctx, targetVersion, stream.MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool { return cfg.Yes || cx.askForContinue(m) })) }) diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index 90f85cf15a7c7..20b95d1042663 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -359,7 +359,6 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig if err != nil { return errors.Trace(err) } - // parse common config if needed if !skipCommonConfig { err = cfg.Config.ParseFromFlags(flags) @@ -367,7 +366,10 @@ func (cfg *RestoreConfig) ParseFromFlags(flags *pflag.FlagSet, skipCommonConfig return errors.Trace(err) } } - + err = cfg.Config.ParseFromFlags(flags) + if err != nil { + return errors.Trace(err) + } err = cfg.RestoreCommonConfig.ParseFromFlags(flags) if err != nil { return errors.Trace(err) @@ -654,13 +656,13 @@ func registerTaskToPD(ctx context.Context, etcdCLI *clientv3.Client) (closeF fun func DefaultRestoreConfig(commonConfig Config) RestoreConfig { fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError) + DefineCommonFlags(fs) DefineRestoreFlags(fs) cfg := RestoreConfig{} err := cfg.ParseFromFlags(fs, true) if err != nil { log.Panic("failed to parse restore flags to config", zap.Error(err)) } - cfg.Config = commonConfig return cfg } @@ -725,12 +727,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf if err != nil { log.Warn("failed to remove checkpoint data for log restore", zap.Error(err)) } - err = checkpoint.RemoveCheckpointDataForSnapshotRestore(c, mgr.GetDomain(), se) + err = checkpoint.RemoveCheckpointDataForSstRestore(c, mgr.GetDomain(), se, checkpoint.CompactedRestoreCheckpointDatabaseName) + if err != nil { + log.Warn("failed to remove checkpoint data for compacted restore", zap.Error(err)) + } + err = checkpoint.RemoveCheckpointDataForSstRestore(c, mgr.GetDomain(), se, checkpoint.SnapshotRestoreCheckpointDatabaseName) if err != nil { log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err)) } } else { - err = checkpoint.RemoveCheckpointDataForSnapshotRestore(c, mgr.GetDomain(), se) + err = checkpoint.RemoveCheckpointDataForSstRestore(c, mgr.GetDomain(), se, checkpoint.SnapshotRestoreCheckpointDatabaseName) if err != nil { log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err)) } @@ -875,7 +881,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s if cfg.UseCheckpoint { // if the checkpoint metadata exists in the checkpoint storage, the restore is not // for the first time. - existsCheckpointMetadata := checkpoint.ExistsSnapshotRestoreCheckpoint(ctx, mgr.GetDomain()) + existsCheckpointMetadata := checkpoint.ExistsSstRestoreCheckpoint(ctx, mgr.GetDomain(), checkpoint.SnapshotRestoreCheckpointDatabaseName) checkpointFirstRun = !existsCheckpointMetadata }