From 57191e02f1d97ff676470216dd4a29d6c5fb43e5 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Fri, 15 Nov 2024 18:00:41 +0800 Subject: [PATCH] address comments --- br/pkg/restore/restorer.go | 22 +++++++++++----------- br/pkg/restore/restorer_test.go | 16 ++++++++-------- br/pkg/restore/snap_client/import.go | 2 +- br/pkg/restore/snap_client/import_test.go | 4 ++-- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/br/pkg/restore/restorer.go b/br/pkg/restore/restorer.go index 69a8be03b2e50..0bd8a4553955a 100644 --- a/br/pkg/restore/restorer.go +++ b/br/pkg/restore/restorer.go @@ -112,8 +112,8 @@ func NewFileSet(files []*backuppb.File, rules *utils.RewriteRules) BackupFileSet // 4. Log-compacted SST files // // It serves as a high-level interface for restoration, supporting implementations such as simpleRestorer -// and MultiRestorer. SstRestorer includes FileImporter for handling raw, transactional, and compacted SSTs, -// and MultiRestorer for TiDB-specific backups. +// and MultiTablesRestorer. SstRestorer includes FileImporter for handling raw, transactional, and compacted SSTs, +// and MultiTablesRestorer for TiDB-specific backups. type SstRestorer interface { // Restore imports the specified backup file sets into TiKV. // The onProgress function is called with progress updates as files are processed. @@ -137,15 +137,15 @@ type FileImporter interface { Close() error } -// ConcurrentFileImporter is a wrapper around FileImporter that adds concurrency controls. +// BalancedFileImporter is a wrapper around FileImporter that adds concurrency controls. // It ensures that file imports are balanced across storage nodes, which is particularly useful -// in MultiRestorer scenarios where concurrency management is critical for efficiency. -type ConcurrentFileImporter interface { +// in MultiTablesRestorer scenarios where concurrency management is critical for efficiency. +type BalancedFileImporter interface { FileImporter - // WaitUntilUnblock manages concurrency by controlling when imports can proceed, + // PauseForBackpressure manages concurrency by controlling when imports can proceed, // ensuring load is distributed evenly across storage nodes. - WaitUntilUnblock() + PauseForBackpressure() } type SimpleRestorer struct { @@ -211,13 +211,13 @@ type MultiTablesRestorer struct { eg *errgroup.Group ectx context.Context workerPool *util.WorkerPool - fileImporter ConcurrentFileImporter + fileImporter BalancedFileImporter checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType] } func NewMultiTablesRestorer( ctx context.Context, - fileImporter ConcurrentFileImporter, + fileImporter BalancedFileImporter, workerPool *util.WorkerPool, checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType], ) SstRestorer { @@ -273,7 +273,7 @@ func (m *MultiTablesRestorer) Restore(onProgress func(int64), batchFileSets ...B break } filesReplica := batchFileSet - m.fileImporter.WaitUntilUnblock() + m.fileImporter.PauseForBackpressure() m.workerPool.ApplyOnErrorGroup(m.eg, func() (restoreErr error) { fileStart := time.Now() defer func() { @@ -336,7 +336,7 @@ func (p *PipelineRestorerWrapper[T]) WithSplit(ctx context.Context, i iter.TryNe iter.FilterOut(i, func(item T) bool { // Skip items based on the strategy's criteria. // Non-skip iterms should be filter out. - return !strategy.ShouldSkip(item) + return strategy.ShouldSkip(item) }), func(item T) (T, error) { // Accumulate the item for potential splitting. strategy.Accumulate(item) diff --git a/br/pkg/restore/restorer_test.go b/br/pkg/restore/restorer_test.go index b4831f4a5a570..ccc39eb564992 100644 --- a/br/pkg/restore/restorer_test.go +++ b/br/pkg/restore/restorer_test.go @@ -113,30 +113,30 @@ func createSampleBatchFileSets() restore.BatchBackupFileSet { } } -// FakeConcurrentFileImporter is a minimal implementation for testing -type FakeConcurrentFileImporter struct { +// FakeBalancedFileImporteris a minimal implementation for testing +type FakeBalancedFileImporter struct { hasError bool unblockCount int } -func (f *FakeConcurrentFileImporter) Import(ctx context.Context, fileSets ...restore.BackupFileSet) error { +func (f *FakeBalancedFileImporter) Import(ctx context.Context, fileSets ...restore.BackupFileSet) error { if f.hasError { return errors.New("import error") } return nil } -func (f *FakeConcurrentFileImporter) WaitUntilUnblock() { +func (f *FakeBalancedFileImporter) PauseForBackpressure() { f.unblockCount++ } -func (f *FakeConcurrentFileImporter) Close() error { +func (f *FakeBalancedFileImporter) Close() error { return nil } func TestMultiTablesRestorerRestoreSuccess(t *testing.T) { ctx := context.Background() - importer := &FakeConcurrentFileImporter{} + importer := &FakeBalancedFileImporter{} workerPool := util.NewWorkerPool(2, "multi-tables-restorer") restorer := restore.NewMultiTablesRestorer(ctx, importer, workerPool, nil) @@ -154,7 +154,7 @@ func TestMultiTablesRestorerRestoreSuccess(t *testing.T) { func TestMultiTablesRestorerRestoreWithImportError(t *testing.T) { ctx := context.Background() - importer := &FakeConcurrentFileImporter{hasError: true} + importer := &FakeBalancedFileImporter{hasError: true} workerPool := util.NewWorkerPool(2, "multi-tables-restorer") restorer := restore.NewMultiTablesRestorer(ctx, importer, workerPool, nil) @@ -168,7 +168,7 @@ func TestMultiTablesRestorerRestoreWithImportError(t *testing.T) { func TestMultiTablesRestorerRestoreWithContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - importer := &FakeConcurrentFileImporter{} + importer := &FakeBalancedFileImporter{} workerPool := util.NewWorkerPool(2, "multi-tables-restorer") restorer := restore.NewMultiTablesRestorer(ctx, importer, workerPool, nil) diff --git a/br/pkg/restore/snap_client/import.go b/br/pkg/restore/snap_client/import.go index c98795bbc459c..4e71c6fbe0cc4 100644 --- a/br/pkg/restore/snap_client/import.go +++ b/br/pkg/restore/snap_client/import.go @@ -239,7 +239,7 @@ func NewSnapFileImporter( return fileImporter, nil } -func (importer *SnapFileImporter) WaitUntilUnblock() { +func (importer *SnapFileImporter) PauseForBackpressure() { importer.cond.L.Lock() for importer.ShouldBlock() { // wait for download worker notified diff --git a/br/pkg/restore/snap_client/import_test.go b/br/pkg/restore/snap_client/import_test.go index cfd2dc71b875d..9d9c79fe1a6f6 100644 --- a/br/pkg/restore/snap_client/import_test.go +++ b/br/pkg/restore/snap_client/import_test.go @@ -172,7 +172,7 @@ func TestSnapImporter(t *testing.T) { require.Error(t, err) files, rules := generateFiles() for _, file := range files { - importer.WaitUntilUnblock() + importer.PauseForBackpressure() err = importer.Import(ctx, restore.BackupFileSet{SSTFiles: []*backuppb.File{file}, RewriteRules: rules}) require.NoError(t, err) } @@ -194,7 +194,7 @@ func TestSnapImporterRaw(t *testing.T) { require.NoError(t, err) files, rules := generateFiles() for _, file := range files { - importer.WaitUntilUnblock() + importer.PauseForBackpressure() err = importer.Import(ctx, restore.BackupFileSet{SSTFiles: []*backuppb.File{file}, RewriteRules: rules}) require.NoError(t, err) }