
Commit: address comments

3pointer committed Nov 15, 2024
1 parent 57d15db commit 57191e0
Showing 4 changed files with 22 additions and 22 deletions.
22 changes: 11 additions & 11 deletions br/pkg/restore/restorer.go
@@ -112,8 +112,8 @@ func NewFileSet(files []*backuppb.File, rules *utils.RewriteRules) BackupFileSet
 // 4. Log-compacted SST files
 //
 // It serves as a high-level interface for restoration, supporting implementations such as simpleRestorer
-// and MultiRestorer. SstRestorer includes FileImporter for handling raw, transactional, and compacted SSTs,
-// and MultiRestorer for TiDB-specific backups.
+// and MultiTablesRestorer. SstRestorer includes FileImporter for handling raw, transactional, and compacted SSTs,
+// and MultiTablesRestorer for TiDB-specific backups.
 type SstRestorer interface {
     // Restore imports the specified backup file sets into TiKV.
     // The onProgress function is called with progress updates as files are processed.
@@ -137,15 +137,15 @@ type FileImporter interface {
     Close() error
 }

-// ConcurrentFileImporter is a wrapper around FileImporter that adds concurrency controls.
+// BalancedFileImporter is a wrapper around FileImporter that adds concurrency controls.
 // It ensures that file imports are balanced across storage nodes, which is particularly useful
-// in MultiRestorer scenarios where concurrency management is critical for efficiency.
-type ConcurrentFileImporter interface {
+// in MultiTablesRestorer scenarios where concurrency management is critical for efficiency.
+type BalancedFileImporter interface {
     FileImporter

-    // WaitUntilUnblock manages concurrency by controlling when imports can proceed,
+    // PauseForBackpressure manages concurrency by controlling when imports can proceed,
     // ensuring load is distributed evenly across storage nodes.
-    WaitUntilUnblock()
+    PauseForBackpressure()
 }

 type SimpleRestorer struct {
@@ -211,13 +211,13 @@ type MultiTablesRestorer struct {
     eg *errgroup.Group
     ectx context.Context
     workerPool *util.WorkerPool
-    fileImporter ConcurrentFileImporter
+    fileImporter BalancedFileImporter
     checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]
 }

 func NewMultiTablesRestorer(
     ctx context.Context,
-    fileImporter ConcurrentFileImporter,
+    fileImporter BalancedFileImporter,
     workerPool *util.WorkerPool,
     checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType],
 ) SstRestorer {
@@ -273,7 +273,7 @@ func (m *MultiTablesRestorer) Restore(onProgress func(int64), batchFileSets ...B
             break
         }
         filesReplica := batchFileSet
-        m.fileImporter.WaitUntilUnblock()
+        m.fileImporter.PauseForBackpressure()
         m.workerPool.ApplyOnErrorGroup(m.eg, func() (restoreErr error) {
             fileStart := time.Now()
             defer func() {
@@ -336,7 +336,7 @@ func (p *PipelineRestorerWrapper[T]) WithSplit(ctx context.Context, i iter.TryNe
         iter.FilterOut(i, func(item T) bool {
             // Skip items based on the strategy's criteria.
             // Items that the strategy marks as skippable are filtered out.
-            return !strategy.ShouldSkip(item)
+            return strategy.ShouldSkip(item)
         }), func(item T) (T, error) {
             // Accumulate the item for potential splitting.
             strategy.Accumulate(item)
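The Restore loop in the hunk above pairs the renamed BalancedFileImporter with a worker pool: it calls PauseForBackpressure before handing each batch to the pool, so dispatch stalls while the stores are loaded. A minimal sketch of that pause-then-dispatch pattern follows; backpressureImporter, restoreAll, and the string batches are illustrative stand-ins rather than the actual BR types, and the errgroup limit merely plays the role of the worker pool.

```go
package restorer

import (
	"context"

	"golang.org/x/sync/errgroup"
)

// backpressureImporter mirrors the shape of BalancedFileImporter: an importer
// that can block its caller until downstream storage nodes have drained.
type backpressureImporter interface {
	PauseForBackpressure()
	Import(ctx context.Context, batch []string) error
}

// restoreAll dispatches batches to a bounded error group, pausing for
// backpressure before each dispatch so that no new work is queued while
// the importer reports the stores as saturated.
func restoreAll(ctx context.Context, imp backpressureImporter, batches [][]string) error {
	eg, ectx := errgroup.WithContext(ctx)
	eg.SetLimit(4) // cap in-flight imports, analogous to the worker pool
	for _, batch := range batches {
		batch := batch
		imp.PauseForBackpressure() // block before dispatch, not inside the goroutine
		eg.Go(func() error {
			return imp.Import(ectx, batch)
		})
	}
	return eg.Wait()
}
```

Pausing before eg.Go, rather than inside the goroutine, keeps the queue itself bounded: the loop simply stops producing work while the importer is saturated.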
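The one-character fix in WithSplit flips the filter predicate: the items a split strategy marks as skippable are the ones that should be dropped, so the predicate handed to the filtering iterator must return strategy.ShouldSkip(item) rather than its negation. Below is a self-contained sketch of that semantics; dropIf and splitStrategy are hypothetical stand-ins for BR's iterator helper and strategy type.

```go
package main

import "fmt"

// dropIf is a hypothetical stand-in for a FilterOut-style helper:
// it removes the elements for which the predicate returns true.
func dropIf[T any](in []T, pred func(T) bool) []T {
	out := make([]T, 0, len(in))
	for _, v := range in {
		if !pred(v) {
			out = append(out, v)
		}
	}
	return out
}

type splitStrategy struct{ skip map[string]bool }

func (s splitStrategy) ShouldSkip(name string) bool { return s.skip[name] }

func main() {
	s := splitStrategy{skip: map[string]bool{"empty.sst": true}}
	files := []string{"t1.sst", "empty.sst", "t2.sst"}

	// Fixed predicate: drop exactly the items the strategy wants to skip.
	fmt.Println(dropIf(files, func(f string) bool { return s.ShouldSkip(f) }))
	// Output: [t1.sst t2.sst]

	// The pre-fix predicate (!ShouldSkip) would instead drop everything else.
	fmt.Println(dropIf(files, func(f string) bool { return !s.ShouldSkip(f) }))
	// Output: [empty.sst]
}
```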
16 changes: 8 additions & 8 deletions br/pkg/restore/restorer_test.go
@@ -113,30 +113,30 @@ func createSampleBatchFileSets() restore.BatchBackupFileSet {
     }
 }

-// FakeConcurrentFileImporter is a minimal implementation for testing
-type FakeConcurrentFileImporter struct {
+// FakeBalancedFileImporter is a minimal implementation for testing
+type FakeBalancedFileImporter struct {
     hasError bool
     unblockCount int
 }

-func (f *FakeConcurrentFileImporter) Import(ctx context.Context, fileSets ...restore.BackupFileSet) error {
+func (f *FakeBalancedFileImporter) Import(ctx context.Context, fileSets ...restore.BackupFileSet) error {
     if f.hasError {
         return errors.New("import error")
     }
     return nil
 }

-func (f *FakeConcurrentFileImporter) WaitUntilUnblock() {
+func (f *FakeBalancedFileImporter) PauseForBackpressure() {
     f.unblockCount++
 }

-func (f *FakeConcurrentFileImporter) Close() error {
+func (f *FakeBalancedFileImporter) Close() error {
     return nil
 }

 func TestMultiTablesRestorerRestoreSuccess(t *testing.T) {
     ctx := context.Background()
-    importer := &FakeConcurrentFileImporter{}
+    importer := &FakeBalancedFileImporter{}
     workerPool := util.NewWorkerPool(2, "multi-tables-restorer")

     restorer := restore.NewMultiTablesRestorer(ctx, importer, workerPool, nil)
@@ -154,7 +154,7 @@ func TestMultiTablesRestorerRestoreSuccess(t *testing.T) {

 func TestMultiTablesRestorerRestoreWithImportError(t *testing.T) {
     ctx := context.Background()
-    importer := &FakeConcurrentFileImporter{hasError: true}
+    importer := &FakeBalancedFileImporter{hasError: true}
     workerPool := util.NewWorkerPool(2, "multi-tables-restorer")

     restorer := restore.NewMultiTablesRestorer(ctx, importer, workerPool, nil)
@@ -168,7 +168,7 @@ func TestMultiTablesRestorerRestoreWithImportError(t *testing.T) {
 func TestMultiTablesRestorerRestoreWithContextCancel(t *testing.T) {
     ctx, cancel := context.WithCancel(context.Background())
     defer cancel()
-    importer := &FakeConcurrentFileImporter{}
+    importer := &FakeBalancedFileImporter{}
     workerPool := util.NewWorkerPool(2, "multi-tables-restorer")

     restorer := restore.NewMultiTablesRestorer(ctx, importer, workerPool, nil)
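A rename like WaitUntilUnblock to PauseForBackpressure is easy to miss in hand-written fakes, because a fake that silently stops implementing the interface only fails at the point where it is passed as that interface. A common safeguard, sketched here under the assumption that the fake stays beside these tests and the interface is exported as restore.BalancedFileImporter as in this diff, is a compile-time assertion next to the fake's definition:

```go
// Compile-time check that the test double still satisfies the interface.
// If a method such as PauseForBackpressure is renamed on the interface,
// this line fails to compile instead of the mismatch surfacing at a call site.
var _ restore.BalancedFileImporter = (*FakeBalancedFileImporter)(nil)
```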
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/import.go
@@ -239,7 +239,7 @@ func NewSnapFileImporter(
     return fileImporter, nil
 }

-func (importer *SnapFileImporter) WaitUntilUnblock() {
+func (importer *SnapFileImporter) PauseForBackpressure() {
     importer.cond.L.Lock()
     for importer.ShouldBlock() {
         // wait for download worker notified
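The hunk above shows only the start of SnapFileImporter.PauseForBackpressure: take the lock, then loop on ShouldBlock, which is the standard sync.Cond wait pattern (the rest of the body is cut off in this view). A generic sketch of that pattern follows; gate, pending, start, and done are hypothetical names, with a simple in-flight counter standing in for whatever ShouldBlock actually measures.

```go
package backpressure

import "sync"

// gate is a hypothetical stand-in for the importer's blocking state: pending
// counts in-flight downloads, and shouldBlock plays the role of ShouldBlock.
type gate struct {
	mu      sync.Mutex
	cond    *sync.Cond
	pending int
	limit   int
}

func newGate(limit int) *gate {
	g := &gate{limit: limit}
	g.cond = sync.NewCond(&g.mu)
	return g
}

func (g *gate) shouldBlock() bool { return g.pending >= g.limit }

// PauseForBackpressure blocks the caller while the gate is saturated.
// Cond.Wait releases the lock while sleeping and re-acquires it on wake-up,
// so the condition is re-checked in a loop, as in the diff above.
func (g *gate) PauseForBackpressure() {
	g.cond.L.Lock()
	for g.shouldBlock() {
		g.cond.Wait() // woken by done()
	}
	g.cond.L.Unlock()
}

// start registers a new in-flight download.
func (g *gate) start() {
	g.cond.L.Lock()
	g.pending++
	g.cond.L.Unlock()
}

// done marks a download finished and wakes any callers paused above.
func (g *gate) done() {
	g.cond.L.Lock()
	g.pending--
	g.cond.L.Unlock()
	g.cond.Broadcast()
}
```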
4 changes: 2 additions & 2 deletions br/pkg/restore/snap_client/import_test.go
@@ -172,7 +172,7 @@ func TestSnapImporter(t *testing.T) {
     require.Error(t, err)
     files, rules := generateFiles()
     for _, file := range files {
-        importer.WaitUntilUnblock()
+        importer.PauseForBackpressure()
         err = importer.Import(ctx, restore.BackupFileSet{SSTFiles: []*backuppb.File{file}, RewriteRules: rules})
         require.NoError(t, err)
     }
@@ -194,7 +194,7 @@ func TestSnapImporterRaw(t *testing.T) {
     require.NoError(t, err)
     files, rules := generateFiles()
     for _, file := range files {
-        importer.WaitUntilUnblock()
+        importer.PauseForBackpressure()
         err = importer.Import(ctx, restore.BackupFileSet{SSTFiles: []*backuppb.File{file}, RewriteRules: rules})
         require.NoError(t, err)
     }
