Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

F miran seprate upload download worker #23

Open
wants to merge 12 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions cmd/migration_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ var (
region string
migrateToPath string
duplicateSuffix string
concurrency int
encrypt bool
resume bool
skip int // 0 --> Replace; 1 --> Skip; 2 --> Duplicate
Expand Down Expand Up @@ -59,7 +58,6 @@ func init() {
migrateCmd.Flags().BoolVar(&deleteSource, "delete-source", false, "Delete object in s3 that is migrated to dStorage")
migrateCmd.Flags().StringVar(&awsCredPath, "aws-cred-path", "", "File Path to aws credentials")
migrateCmd.Flags().StringVar(&workDir, "wd", filepath.Join(util.GetHomeDir(), ".s3migration"), "Working directory")
migrateCmd.Flags().IntVar(&concurrency, "concurrency", 10, "number of concurrent files to process concurrently during migration")
migrateCmd.Flags().BoolVar(&resume, "resume", false, "pass this option to resume migration from previous state")
migrateCmd.Flags().IntVar(&skip, "skip", 1, "0 --> Replace existing files; 1 --> Skip migration; 2 --> Duplicate")
migrateCmd.Flags().IntVar(&retryCount, "retry", 3, "retry count for upload to dstorage")
Expand Down Expand Up @@ -186,7 +184,6 @@ var migrateCmd = &cobra.Command{
AllocationID: allocationId,
Region: region,
Skip: skip,
Concurrency: concurrency,
Bucket: bucket,
Prefix: prefix,
MigrateToPath: migrateToPath,
Expand Down
12 changes: 8 additions & 4 deletions migration/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,6 @@ type Migration struct {
skip int
retryCount int

//Number of goroutines to run. So at most concurrency * Batch goroutines will run. i.e. for bucket level and object level
concurrency int

szCtMu sync.Mutex //size and count mutex; used to update migratedSize and totalMigratedObjects
migratedSize uint64
totalMigratedObjects uint64
Expand Down Expand Up @@ -100,7 +97,6 @@ func InitMigration(mConfig *MigrationConfig) error {
zStore: dStorageService,
awsStore: awsStorageService,
skip: mConfig.Skip,
concurrency: mConfig.Concurrency,
retryCount: mConfig.RetryCount,
stateFilePath: mConfig.StateFilePath,
migrateTo: mConfig.MigrateToPath,
Expand Down Expand Up @@ -199,6 +195,7 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke
DoneChan: make(chan struct{}, 1),
ErrChan: make(chan error, 1),
}
migrator.PushToDownloadQueue(downloadObjMeta)

go func() {
defer wg.Done()
Expand All @@ -209,6 +206,7 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke
}
if downloadObjMeta.IsFileAlreadyExist && migration.skip == Skip {
zlogger.Logger.Info("Skipping migration of object" + downloadObjMeta.ObjectKey)
downloadObjMeta.DoneChan <- struct{}{}
return
}
migrator.DownloadStart(downloadObjMeta)
Expand Down Expand Up @@ -241,6 +239,8 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)
ErrChan: make(chan error, 1),
Size: downloadObj.Size,
}
migrator.PushToUploadQueue(uploadObj)

wg.Add(1)
go func() {
defer wg.Done()
Expand All @@ -249,6 +249,10 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker)
migrator.SetMigrationError(err)
return
}
if downloadObj.LocalPath == "" {
uploadObj.DoneChan <- struct{}{}
return
}
defer func() {
_ = m.fs.Remove(downloadObj.LocalPath)
}()
Expand Down
1 change: 0 additions & 1 deletion migration/migrateConfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import "time"
type MigrationConfig struct {
AllocationID string
Skip int
Concurrency int
Bucket string
Region string
Prefix string
Expand Down
13 changes: 10 additions & 3 deletions migration/migration_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,13 @@ func (m *MigrationWorker) PauseUpload() {
}
}

// PushToUploadQueue enqueues an upload job on the worker's upload queue.
// The send blocks until a receiver (or buffer slot, if the channel is
// buffered — NOTE(review): buffer size not visible here, confirm) is
// available, so callers are naturally throttled by queue capacity.
func (m *MigrationWorker) PushToUploadQueue(u *UploadObjectMeta) {
m.uploadQueue <- u
}

func (m *MigrationWorker) UploadStart(u *UploadObjectMeta) {
m.incrUploadConcurrency()
atomic.AddInt64(&m.currentUploadSize, u.Size)
m.uploadQueue <- u
}

func (m *MigrationWorker) UploadDone(u *UploadObjectMeta, err error) {
Expand All @@ -96,8 +99,9 @@ func (m *MigrationWorker) UploadDone(u *UploadObjectMeta, err error) {
atomic.AddInt64(&m.currentUploadSize, -u.Size)
if err != nil {
u.ErrChan <- err
} else {
u.DoneChan <- struct{}{}
}
u.DoneChan <- struct{}{}
}

func (m *MigrationWorker) CloseUploadQueue() {
Expand Down Expand Up @@ -126,9 +130,12 @@ func (m *MigrationWorker) PauseDownload() {
}
}

// PushToDownloadQueue enqueues a download job on the worker's download queue.
// The send blocks until a receiver (or buffer slot, if the channel is
// buffered — NOTE(review): buffer size not visible here, confirm) is
// available, so callers are naturally throttled by queue capacity.
func (m *MigrationWorker) PushToDownloadQueue(d *DownloadObjectMeta) {
m.downloadQueue <- d
}

func (m *MigrationWorker) DownloadStart(d *DownloadObjectMeta) {
m.incrDownloadConcurrency()
m.downloadQueue <- d
m.updateFileSizeOnDisk(d.Size)
atomic.AddInt64(&m.currentDownloadSize, d.Size)
}
Expand Down
3 changes: 2 additions & 1 deletion util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"strings"
"time"

zlogger "github.com/0chain/s3migration/logger"
"github.com/mitchellh/go-homedir"
"github.com/spf13/viper"
)
Expand Down Expand Up @@ -111,7 +112,7 @@ func ConvertGoSDKTimeToTime(in string) time.Time {
func Retry(attempts int, sleep time.Duration, f func() error) (err error) {
for i := 0; i < attempts; i++ {
if err = f(); err != nil {
log.Println("retrying after error:", err)
zlogger.Logger.Error("retrying after error: ", err)
time.Sleep(sleep)
continue
}
Expand Down