From f48a90263e28755606cc5af441371b194bec8033 Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Sun, 23 Jan 2022 14:43:02 +0530 Subject: [PATCH 01/11] migration new flow --- cmd/migration_cmd.go | 2 +- dstorage/dstorage.go | 22 +++++- go.mod | 10 +-- go.sum | 63 ++++++++------- migration/migrate.go | 2 + s3/aws.go | 27 +++++++ util/migration_worker.go | 167 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 259 insertions(+), 34 deletions(-) create mode 100644 util/migration_worker.go diff --git a/cmd/migration_cmd.go b/cmd/migration_cmd.go index 5d2d711..785d046 100644 --- a/cmd/migration_cmd.go +++ b/cmd/migration_cmd.go @@ -206,7 +206,7 @@ var migrateCmd = &cobra.Command{ return err } - return migration.Migrate() + return migration.MigrationStart() }, } diff --git a/dstorage/dstorage.go b/dstorage/dstorage.go index 4c2762d..08f2c56 100644 --- a/dstorage/dstorage.go +++ b/dstorage/dstorage.go @@ -63,6 +63,9 @@ const ( TenMB = 10 * OneMB HundredMB = 10 * TenMB + MaxChunkSize = 2 * TenMB + MinChunkSize = DefaultChunkSize + GetRefRetryWaitTime = 500 * time.Millisecond GetRefRetryCount = 2 ) @@ -85,13 +88,30 @@ func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath string } } - if len(oResult.Refs) == 0 { + if oResult == nil || len(oResult.Refs) == 0 { return nil, zerror.ErrFileNoExist } return &oResult.Refs[0], nil } +func getChunkSizeNew(size int64, dataShards int) (chunkSize int64) { + + var chunkNum int64 = 1 + for { + chunkSize = (size + int64(dataShards)*chunkNum - 1) / (int64(dataShards) * chunkNum) //equivalent to math.ceil + if chunkSize <= MaxChunkSize { + break + } + chunkNum++ + } + + if chunkSize < MinChunkSize { + chunkSize = MinChunkSize + } + return +} + func getChunkSize(size int64) int64 { var chunkSize int64 switch { diff --git a/go.mod b/go.mod index 499363d..10d45f7 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,11 @@ go 1.16 require ( github.com/0chain/errors v1.0.3 - github.com/0chain/gosdk v1.4.0 - github.com/aws/aws-sdk-go-v2 v1.11.2 - github.com/aws/aws-sdk-go-v2/config v1.1.1 - github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.21.0 + github.com/0chain/gosdk v1.4.1-0.20220105140556-1e37d2ba4e8a + github.com/aws/aws-sdk-go-v2 v1.13.0 + github.com/aws/aws-sdk-go-v2/config v1.13.0 + github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.9.0 + github.com/aws/aws-sdk-go-v2/service/s3 v1.24.0 github.com/golang/mock v1.6.0 github.com/mitchellh/go-homedir v1.1.0 github.com/spf13/cobra v1.1.1 diff --git a/go.sum b/go.sum index 3baf9d2..52600c9 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/0chain/errors v1.0.3 h1:QQZPFxTfnMcRdt32DXbzRQIfGWmBsKoEdszKQDb0rRM= github.com/0chain/errors v1.0.3/go.mod h1:xymD6nVgrbgttWwkpSCfLLEJbFO6iHGQwk/yeSuYkIc= -github.com/0chain/gosdk v1.4.0 h1:H++xkigbb/PCRrbcVwLnd+xBISphSYt08PeNTcfLRQg= -github.com/0chain/gosdk v1.4.0/go.mod h1:2SyFWa4WQxpomgJxBJrMR4ydWLaLMcqkITY4Pduihi8= +github.com/0chain/gosdk v1.4.1-0.20220105140556-1e37d2ba4e8a h1:VYZNUfUTdEB3PLT81bkPWBkRNklUymjdUawAYxhWgyw= +github.com/0chain/gosdk v1.4.1-0.20220105140556-1e37d2ba4e8a/go.mod h1:FB2xXhQyIM1vwvQ1jC98wNclbDTBwqrG+Z/IQC0LaBs= github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4= github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4= @@ -101,39 +101,46 @@ github.com/aws/aws-lambda-go v1.13.3/go.mod h1:4UKl9IzQMoD+QF79YdCuzCwp8VbmG4VAQ github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo= github.com/aws/aws-sdk-go-v2 v0.18.0/go.mod h1:JWVYvqSMppoMJC0x5wdwiImzgXTI9FuZwxzkQq9wy+g= github.com/aws/aws-sdk-go-v2 v1.2.0/go.mod h1:zEQs02YRBw1DjK0PoJv3ygDYOFTre1ejlJWl8FwAuQo= -github.com/aws/aws-sdk-go-v2 v1.11.2 h1:SDiCYqxdIYi6HgQfAWRhgdZrdnOuGyLDJVRSWLeHWvs= -github.com/aws/aws-sdk-go-v2 v1.11.2/go.mod h1:SQfA+m2ltnu1cA0soUkj4dRSsmITiVQUJvBIZjzfPyQ= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0 h1:yVUAwvJC/0WNPbyl0nA3j1L6CW1CN8wBubCRqtG7JLI= -github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.0.0/go.mod h1:Xn6sxgRuIDflLRJFj5Ev7UxABIkNbccFPV/p8itDReM= -github.com/aws/aws-sdk-go-v2/config v1.1.1 h1:ZAoq32boMzcaTW9bcUacBswAmHTbvlvDJICgHFZuECo= +github.com/aws/aws-sdk-go-v2 v1.13.0 h1:1XIXAfxsEmbhbj5ry3D3vX+6ZcUYvIqSm4CWWEuGZCA= +github.com/aws/aws-sdk-go-v2 v1.13.0/go.mod h1:L6+ZpqHaLbAaxsqV0L4cvxZY7QupWJB4fhkf8LXvC7w= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.2.0 h1:scBthy70MB3m4LCMFaBcmYCyR2XWOz6MxSfdSu/+fQo= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.2.0/go.mod h1:oZHzg1OVbuCiRTY0oRPM+c2HQvwnFCGJwKeSqqAJ/yM= github.com/aws/aws-sdk-go-v2/config v1.1.1/go.mod h1:0XsVy9lBI/BCXm+2Tuvt39YmdHwS5unDQmxZOYe8F5Y= -github.com/aws/aws-sdk-go-v2/credentials v1.1.1 h1:NbvWIM1Mx6sNPTxowHgS2ewXCRp+NGTzUYb/96FZJbY= +github.com/aws/aws-sdk-go-v2/config v1.13.0 h1:1ij3YPk13RrIn1h+pH+dArh3lNPD5JSAP+ifOkNhnB0= +github.com/aws/aws-sdk-go-v2/config v1.13.0/go.mod h1:Pjv2OafecIn+4miw9VFDCr06YhKyf/oKOkIcpQOgWKk= github.com/aws/aws-sdk-go-v2/credentials v1.1.1/go.mod h1:mM2iIjwl7LULWtS6JCACyInboHirisUUdkBPoTHMOUo= -github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.2 h1:EtEU7WRaWliitZh2nmuxEXrN0Cb8EgPUFGIoTMeqbzI= +github.com/aws/aws-sdk-go-v2/credentials v1.8.0 h1:8Ow0WcyDesGNL0No11jcgb1JAtE+WtubqXjgxau+S0o= +github.com/aws/aws-sdk-go-v2/credentials v1.8.0/go.mod h1:gnMo58Vwx3Mu7hj1wpcG8DI0s57c9o42UQ6wgTQT5to= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.0.2/go.mod h1:3hGg3PpiEjHnrkrlasTfxFqUsZ2GCk/fMUn4CbKgSkM= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2 h1:XJLnluKuUxQG255zPNe+04izXl7GSyUVafIsgfv9aw4= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.2/go.mod h1:SgKKNBIoDC/E1ZCDhhMW3yalWjwuLjMcpLzsM/QQnWo= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2 h1:EauRoYZVNPlidZSZJDscjJBQ22JhVF2+tdteatax2Ak= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.0.2/go.mod h1:xT4XX6w5Sa3dhg50JrYyy3e4WPYo/+WjY/BXtqXVunU= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2 h1:IQup8Q6lorXeiA/rK72PeToWoWK8h7VAPgHNWdSrtgE= -github.com/aws/aws-sdk-go-v2/internal/ini v1.3.2/go.mod h1:VITe/MdW6EMXPb0o0txu/fsonXbMHUU2OC2Qp7ivU4o= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0 h1:lPLbw4Gn59uoKqvOfSnkJr54XWk5Ak1NK20ZEiSWb3U= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.5.0/go.mod h1:80NaCIH9YU3rzTTs/J/ECATjXuRqzo/wB6ukO6MZ0XY= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.10.0 h1:NITDuUZO34mqtOwFWZiXo7yAHj7kf+XPE+EiKuCBNUI= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.10.0/go.mod h1:I6/fHT/fH460v09eg2gVrd8B/IqskhNdpcLH0WNO3QI= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.9.0 h1:dQYWipBpXgvM+6jz/qxBdNuI+nnerQUazRk5PmTLHlA= +github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.9.0/go.mod h1:2Dy23n/UBFBS9MacM+C/Tgupmq7viabiaHlfdjeN3hk= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.4 h1:CRiQJ4E2RhfDdqbie1ZYDo8QtIo75Mk7oTdJSfwJTMQ= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.4/go.mod h1:XHgQ7Hz2WY2GAn//UXHofLfPXWh+s62MbMOijrg12Lw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.2.0 h1:3ADoioDMOtF4uiK59vCpplpCwugEU+v4ZFD29jDL3RQ= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.2.0/go.mod h1:BsCSJHx5DnDXIrOcqB8KN1/B+hXLG/bi4Y6Vjcx/x9E= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.4 h1:0NrDHIwS1LIR750ltj6ciiu4NZLpr9rgq8vHi/4QD4s= +github.com/aws/aws-sdk-go-v2/internal/ini v1.3.4/go.mod h1:R3sWUqPcfXSiF/LSFJhjyJmpg9uV6yP2yv3YZZjldVI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.7.0 h1:F1diQIOkNn8jcez4173r+PLPdkWK7chy74r3fKpDrLI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.7.0/go.mod h1:8ctElVINyp+SjhoZZceUAZw78glZH6R8ox5MVNu5j2s= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.0.2/go.mod h1:45MfaXZ0cNbeuT0KQ1XJylq8A6+OpVV2E5kvY/Kq+u8= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2 h1:CKdUNKmuilw/KNmO2Q53Av8u+ZyXMC2M9aX8Z+c/gzg= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.5.2/go.mod h1:FgR1tCsn8C6+Hf+N5qkfrE4IXvUL1RgW87sunJ+5J4I= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.2 h1:GnPGH1FGc4fkn0Jbm/8r2+nPOwSJjYPyHSqFSvY1ii8= -github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.2/go.mod h1:eDUYjOYt4Uio7xfHi5jOsO393ZG8TSfZB92a3ZNadWM= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0 h1:4QAOB3KrvI1ApJK14sliGr3Ie2pjyvNypn/lfzDHfUw= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.7.0/go.mod h1:K/qPe6AP2TGYv4l6n7c88zh9jWBDf6nHhvg1fx/EWfU= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.11.0 h1:XAe+PDnaBELHr25qaJKfB415V4CKFWE8H+prUreql8k= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.11.0/go.mod h1:RMlgnt1LbOT2BxJ3cdw+qVz7KL84714LFkWtF6sLI7A= github.com/aws/aws-sdk-go-v2/service/route53 v1.1.1/go.mod h1:rLiOUrPLW/Er5kRcQ7NkwbjlijluLsrIbu/iyl35RO4= -github.com/aws/aws-sdk-go-v2/service/s3 v1.21.0 h1:vUM2P60BI755i35Gyik4s/lXKcnpEbnvw2Vud+soqpI= -github.com/aws/aws-sdk-go-v2/service/s3 v1.21.0/go.mod h1:lQ5AeEW2XWzu8hwQ3dCqZFWORQ3RntO0Kq135Xd9VCo= -github.com/aws/aws-sdk-go-v2/service/sso v1.1.1 h1:37QubsarExl5ZuCBlnRP+7l1tNwZPBSTqpTBrPH98RU= +github.com/aws/aws-sdk-go-v2/service/s3 v1.24.0 h1:REKac2iT0HYxUSzqOSuncnmsZnE3m4MlGfo1dOUN3vg= +github.com/aws/aws-sdk-go-v2/service/s3 v1.24.0/go.mod h1:oIUXg/5F0x0gy6nkwEnlxZboueddwPEKO6Xl+U6/3a0= github.com/aws/aws-sdk-go-v2/service/sso v1.1.1/go.mod h1:SuZJxklHxLAXgLTc1iFXbEWkXs7QRTQpCLGaKIprQW0= -github.com/aws/aws-sdk-go-v2/service/sts v1.1.1 h1:TJoIfnIFubCX0ACVeJ0w46HEH5MwjwYN4iFhuYIhfIY= +github.com/aws/aws-sdk-go-v2/service/sso v1.9.0 h1:1qLJeQGBmNQW3mBNzK2CFmrQNmoXWrscPqsrAaU1aTA= +github.com/aws/aws-sdk-go-v2/service/sso v1.9.0/go.mod h1:vCV4glupK3tR7pw7ks7Y4jYRL86VvxS+g5qk04YeWrU= github.com/aws/aws-sdk-go-v2/service/sts v1.1.1/go.mod h1:Wi0EBZwiz/K44YliU0EKxqTCJGUfYTWXrrBwkq736bM= +github.com/aws/aws-sdk-go-v2/service/sts v1.14.0 h1:ksiDXhvNYg0D2/UFkLejsaz3LqpW5yjNQ8Nx9Sn2c0E= +github.com/aws/aws-sdk-go-v2/service/sts v1.14.0/go.mod h1:u0xMJKDvvfocRjiozsoZglVNXRG19043xzp3r2ivLIk= github.com/aws/smithy-go v1.1.0/go.mod h1:EzMw8dbp/YJL4A5/sbhGddag+NPT7q084agLbB9LgIw= -github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58= -github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= +github.com/aws/smithy-go v1.10.0 h1:gsoZQMNHnX+PaghNw4ynPsyGP7aUCqx5sY2dlPQsZ0w= +github.com/aws/smithy-go v1.10.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= @@ -483,7 +490,9 @@ github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1C github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k= +github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= +github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= diff --git a/migration/migrate.go b/migration/migrate.go index 4ff612a..e040df9 100644 --- a/migration/migrate.go +++ b/migration/migrate.go @@ -61,6 +61,7 @@ type Migration struct { migrateTo string workDir string deleteSource bool + bucket string } func InitMigration(mConfig *MigrationConfig) error { @@ -105,6 +106,7 @@ func InitMigration(mConfig *MigrationConfig) error { migrateTo: mConfig.MigrateToPath, deleteSource: mConfig.DeleteSource, workDir: mConfig.WorkDir, + bucket: mConfig.Bucket, } rootContext, rootContextCancel = context.WithCancel(context.Background()) diff --git a/s3/aws.go b/s3/aws.go index 742618e..f35b27f 100644 --- a/s3/aws.go +++ b/s3/aws.go @@ -6,11 +6,13 @@ import ( "io" "os" "path/filepath" + "strings" "time" zlogger "github.com/0chain/s3migration/logger" "github.com/aws/aws-sdk-go-v2/aws" awsConfig "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/feature/s3/manager" awsS3 "github.com/aws/aws-sdk-go-v2/service/s3" ) @@ -19,6 +21,7 @@ type AwsI interface { ListFilesInBucket(ctx context.Context) (<-chan *ObjectMeta, <-chan error) GetFileContent(ctx context.Context, objectKey string) (*Object, error) DeleteFile(ctx context.Context, objectKey string) error + DownloadToFile(ctx context.Context, objectKey string) (string, error) } type Object struct { @@ -43,6 +46,7 @@ type AwsClient struct { newerThan *time.Time olderThan *time.Time client *awsS3.Client + downloader *manager.Downloader } func GetAwsClient(bucket, prefix, region string, deleteSource bool, newerThan, olderThan *time.Time, startAfter, workDir string) (*AwsClient, error) { @@ -98,6 +102,11 @@ func GetAwsClient(bucket, prefix, region string, deleteSource bool, newerThan, o } } + awsClient.downloader = manager.NewDownloader(awsClient.client, func(u *manager.Downloader) { + u.PartSize = 5 * 1024 * 1024 + u.Concurrency = 100 + }) + zlogger.Logger.Info(fmt.Sprintf( "Aws client initialized with"+ "bucket: %v,"+ @@ -216,3 +225,21 @@ func (a *AwsClient) DeleteFile(ctx context.Context, objectKey string) error { }) return err } + +func (a *AwsClient) DownloadToFile(ctx context.Context, objectKey string) (string, error) { + params := &awsS3.GetObjectInput{ + Bucket: aws.String(a.bucket), + Key: aws.String(objectKey), + } + + fileName := strings.ReplaceAll(objectKey, "/", "") + downloadPath := filepath.Join(a.workDir, fileName) + f, err := os.Create(downloadPath) + if err != nil { + return downloadPath, err + } + + defer f.Close() + _, err = a.downloader.Download(ctx, f, params) + return downloadPath, err +} diff --git a/util/migration_worker.go b/util/migration_worker.go new file mode 100644 index 0000000..edc15e0 --- /dev/null +++ b/util/migration_worker.go @@ -0,0 +1,167 @@ +package util + +import ( + "sync" + "sync/atomic" + "time" +) + +const ( + downloadConcurrencyLimit = 30 + fileSizeLimit = int64(1024*1024) * int64(1024) * int64(5) + uploadConcurrencyLimit = 10 + uploadSizeLimit = int64(1024*1024) * int64(500) + downloadSizeLimit = int64(1024*1024) * int64(500) +) + +type MigrationQueue struct { + diskMutex *sync.RWMutex + errMutex *sync.RWMutex + currentFileSizeOnDisk int64 + downloadQueue chan *DownloadObjectMeta + uploadQueue chan *UploadObjectMeta + downloadConcurrency int32 + uploadConcurrency int32 + errInSystem error + currentUploadSize int64 + currentDownloadSize int64 +} + +type DownloadObjectMeta struct { + ObjectKey string + Size int64 + LocalPath string + DoneChan chan struct{} + ErrChan chan error + IsFileAlreadyExist bool +} + +type UploadObjectMeta struct { + ObjectKey string + Size int64 + DoneChan chan struct{} + ErrChan chan error +} + +func NewMigrationQueue() *MigrationQueue { + return &MigrationQueue{ + diskMutex: &sync.RWMutex{}, + errMutex: &sync.RWMutex{}, + downloadQueue: make(chan *DownloadObjectMeta, 10000), + uploadQueue: make(chan *UploadObjectMeta, 10000), + } +} + +func (m *MigrationQueue) updateFileSizeOnDisk(size int64) { + m.diskMutex.Lock() + m.currentFileSizeOnDisk += size + m.diskMutex.Unlock() +} + +func (m *MigrationQueue) GetDownloadQueue() <-chan *DownloadObjectMeta { + return m.downloadQueue +} + +func (m *MigrationQueue) GetUploadQueue() <-chan *UploadObjectMeta { + return m.uploadQueue +} + +func (m *MigrationQueue) incrUploadConcurrency() { + atomic.AddInt32(&m.uploadConcurrency, 1) +} + +func (m *MigrationQueue) decrUploadConcurrency() { + atomic.AddInt32(&m.uploadConcurrency, -1) +} + +func (m *MigrationQueue) checkUploadStatus() bool { + return atomic.LoadInt32(&m.uploadConcurrency) >= uploadConcurrencyLimit || atomic.LoadInt64(&m.currentUploadSize) >= uploadSizeLimit +} + +func (m *MigrationQueue) PauseUpload() { + for m.checkUploadStatus() { + time.Sleep(5 * time.Second) + } +} + +func (m *MigrationQueue) UploadStart(u *UploadObjectMeta) { + m.incrUploadConcurrency() + atomic.AddInt64(&m.currentUploadSize, u.Size) + m.uploadQueue <- u +} + +func (m *MigrationQueue) UploadDone(u *UploadObjectMeta, err error) { + m.updateFileSizeOnDisk(-u.Size) + m.decrUploadConcurrency() + atomic.AddInt64(&m.currentUploadSize, -u.Size) + if err != nil { + u.ErrChan <- err + } + u.DoneChan <- struct{}{} +} + +func (m *MigrationQueue) CloseUploadQueue() { + close(m.uploadQueue) +} + +func (m *MigrationQueue) incrDownloadConcurrency() { + atomic.AddInt32(&m.downloadConcurrency, 1) +} + +func (m *MigrationQueue) decrDownloadConcurrency() { + atomic.AddInt32(&m.downloadConcurrency, -1) +} + +func (m *MigrationQueue) checkDownloadStatus() bool { + m.diskMutex.RLock() + defer m.diskMutex.RUnlock() + return m.currentFileSizeOnDisk >= fileSizeLimit || + atomic.LoadInt32(&m.downloadConcurrency) >= downloadConcurrencyLimit || + atomic.LoadInt64(&m.currentDownloadSize) >= downloadSizeLimit +} + +func (m *MigrationQueue) PauseDownload() { + for m.checkDownloadStatus() { + time.Sleep(5 * time.Second) + } +} + +func (m *MigrationQueue) DownloadStart(d *DownloadObjectMeta) { + m.incrDownloadConcurrency() + m.downloadQueue <- d + m.updateFileSizeOnDisk(d.Size) + atomic.AddInt64(&m.currentDownloadSize, d.Size) +} + +func (m *MigrationQueue) DownloadDone(d *DownloadObjectMeta, localPath string, err error) { + m.decrDownloadConcurrency() + atomic.AddInt64(&m.currentDownloadSize, -d.Size) + if err != nil { + d.ErrChan <- err + } else { + d.LocalPath = localPath + d.DoneChan <- struct{}{} + } +} + +func (m *MigrationQueue) CloseDownloadQueue() { + close(m.downloadQueue) +} + +func (m *MigrationQueue) GetMigrationError() error { + m.errMutex.RLock() + defer m.errMutex.RUnlock() + return m.errInSystem +} + +func (m *MigrationQueue) IsMigrationError() bool { + return m.GetMigrationError() != nil +} + +func (m *MigrationQueue) SetMigrationError(err error) { + if err != nil { + m.errMutex.Lock() + defer m.errMutex.Unlock() + m.errInSystem = err + } +} From 5cffa2b7da8ae35a8a55f651a68bc7ce25f5b270 Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Sun, 23 Jan 2022 14:44:20 +0530 Subject: [PATCH 02/11] migration new filw --- migration/migrate_new.go | 215 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 215 insertions(+) create mode 100644 migration/migrate_new.go diff --git a/migration/migrate_new.go b/migration/migrate_new.go new file mode 100644 index 0000000..0724e7f --- /dev/null +++ b/migration/migrate_new.go @@ -0,0 +1,215 @@ +package migration + +import ( + "context" + "github.com/0chain/gosdk/zboxcore/zboxutil" + zlogger "github.com/0chain/s3migration/logger" + "github.com/0chain/s3migration/util" + "os" + "path/filepath" + "strconv" + "sync" + "time" +) + +func (m *Migration) DownloadWorker(ctx context.Context, migrator *util.MigrationQueue) { + defer migrator.CloseDownloadQueue() + objCh, errCh := migration.awsStore.ListFilesInBucket(rootContext) + wg := &sync.WaitGroup{} + for obj := range objCh { + migrator.PauseDownload() + if migrator.IsMigrationError() { + return + } + wg.Add(1) + + downloadObjMeta := &util.DownloadObjectMeta{ + ObjectKey: obj.Key, + Size: obj.Size, + DoneChan: make(chan struct{}, 1), + ErrChan: make(chan error, 1), + } + + go func() { + defer wg.Done() + err := checkIsFileExist(ctx, downloadObjMeta) + if err != nil { + migrator.SetMigrationError(err) + return + } + if downloadObjMeta.IsFileAlreadyExist && migration.skip == Skip { + zlogger.Logger.Info("Skipping migration of object" + downloadObjMeta.ObjectKey) + return + } + migrator.DownloadStart(downloadObjMeta) + zlogger.Logger.Info("download start", downloadObjMeta.ObjectKey, downloadObjMeta.Size) + downloadPath, err := m.awsStore.DownloadToFile(ctx, downloadObjMeta.ObjectKey) + migrator.DownloadDone(downloadObjMeta, downloadPath, err) + migrator.SetMigrationError(err) + zlogger.Logger.Info("download done", downloadObjMeta.ObjectKey, downloadObjMeta.Size, err) + }() + time.Sleep(1 * time.Second) + } + wg.Wait() + select { + case err := <-errCh: + if err != nil { + migrator.SetMigrationError(err) + } + } +} + +func (m *Migration) UploadWorker(ctx context.Context, migrator *util.MigrationQueue) { + defer migrator.CloseUploadQueue() + downloadQueue := migrator.GetDownloadQueue() + wg := &sync.WaitGroup{} + for d := range downloadQueue { + migrator.PauseUpload() + downloadObj := d + uploadObj := &util.UploadObjectMeta{ + ObjectKey: downloadObj.ObjectKey, + DoneChan: make(chan struct{}, 1), + ErrChan: make(chan error, 1), + Size: downloadObj.Size, + } + wg.Add(1) + go func() { + defer wg.Done() + err := checkDownloadStatus(downloadObj) + if err != nil { + migrator.SetMigrationError(err) + return + } + defer os.Remove(downloadObj.LocalPath) + migrator.UploadStart(uploadObj) + zlogger.Logger.Info("upload start", uploadObj.ObjectKey, uploadObj.Size) + err = util.Retry(3, time.Second*5, func() error { + var err error + err = processUpload(ctx, downloadObj) + return err + }) + migrator.UploadDone(uploadObj, err) + migrator.SetMigrationError(err) + zlogger.Logger.Info("upload done", uploadObj.ObjectKey, uploadObj.Size, err) + }() + time.Sleep(1 * time.Second) + } + wg.Wait() +} + +func getRemotePath(objectKey string) string { + return filepath.Join(migration.migrateTo, migration.bucket, objectKey) +} + +func checkIsFileExist(ctx context.Context, downloadObj *util.DownloadObjectMeta) error { + remotePath := getRemotePath(downloadObj.ObjectKey) + + var isFileExist bool + var err error + err = util.Retry(3, time.Second*5, func() error { + var err error + isFileExist, err = migration.zStore.IsFileExist(ctx, remotePath) + return err + }) + + if err != nil { + zlogger.Logger.Error(err) + return err + } + + downloadObj.IsFileAlreadyExist = isFileExist + return nil +} + +func checkDownloadStatus(downloadObj *util.DownloadObjectMeta) error { + select { + case <-downloadObj.DoneChan: + return nil + case err := <-downloadObj.ErrChan: + return err + } +} + +func processUpload(ctx context.Context, downloadObj *util.DownloadObjectMeta) error { + remotePath := getRemotePath(downloadObj.ObjectKey) + + fileObj, err := os.Open(downloadObj.LocalPath) + if err != nil { + zlogger.Logger.Error(err) + return err + } + + defer fileObj.Close() + + fileInfo, err := fileObj.Stat() + mimeType, err := zboxutil.GetFileContentType(fileObj) + if err != nil { + zlogger.Logger.Error(err) + return err + } + + if downloadObj.IsFileAlreadyExist { + switch migration.skip { + case Replace: + zlogger.Logger.Info("Replacing object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10)) + err = migration.zStore.Replace(ctx, remotePath, fileObj, fileInfo.Size(), mimeType) + case Duplicate: + zlogger.Logger.Info("Duplicating object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10)) + err = migration.zStore.Duplicate(ctx, remotePath, fileObj, fileInfo.Size(), mimeType) + } + } else { + zlogger.Logger.Info("Uploading object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10)) + err = migration.zStore.Upload(ctx, remotePath, fileObj, fileInfo.Size(), mimeType, false) + } + + if err != nil { + zlogger.Logger.Error(err) + return err + } else { + if migration.deleteSource { + migration.awsStore.DeleteFile(ctx, downloadObj.ObjectKey) + } + migration.szCtMu.Lock() + migration.migratedSize += uint64(downloadObj.Size) + migration.totalMigratedObjects++ + migration.szCtMu.Unlock() + return nil + } +} + +func (m *Migration) UpdateStateFile(migrateHandler *util.MigrationQueue) { + updateState, closeStateFile, err := updateStateKeyFunc(migration.stateFilePath) + if err != nil { + migrateHandler.SetMigrationError(err) + return + } + defer closeStateFile() + uploadQueue := migrateHandler.GetUploadQueue() + for u := range uploadQueue { + select { + case <-u.DoneChan: + updateState(u.ObjectKey) + case <-u.ErrChan: + return + } + } + return +} + +func MigrationStart() error { + defer func(start time.Time) { + zlogger.Logger.Info("time taken: ", time.Now().Sub(start)) + }(time.Now()) + + migrateHandler := util.NewMigrationQueue() + go migration.DownloadWorker(rootContext, migrateHandler) + go migration.UploadWorker(rootContext, migrateHandler) + migration.UpdateStateFile(migrateHandler) + err := migrateHandler.GetMigrationError() + if err != nil { + zlogger.Logger.Error("Error while migration, err", err) + } + zlogger.Logger.Info("Total migrated objects: ", migration.totalMigratedObjects) + zlogger.Logger.Info("Total migrated size: ", migration.migratedSize) + return err +} From 14e44ec5d258523c12347a3bad09de38111a316f Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Fri, 28 Jan 2022 19:03:46 +0530 Subject: [PATCH 03/11] migration changes --- cmd/migration_cmd.go | 2 +- migration/migrate.go | 355 ++++++++++++++++++------------------- migration/migrate_new.go | 215 ---------------------- s3/aws_integration_test.go | 13 ++ s3/mocks/mock_aws.go | 15 ++ util/migration_worker.go | 48 ++--- 6 files changed, 224 insertions(+), 424 deletions(-) delete mode 100644 migration/migrate_new.go diff --git a/cmd/migration_cmd.go b/cmd/migration_cmd.go index 785d046..627e127 100644 --- a/cmd/migration_cmd.go +++ b/cmd/migration_cmd.go @@ -206,7 +206,7 @@ var migrateCmd = &cobra.Command{ return err } - return migration.MigrationStart() + return migration.StartMigration() }, } diff --git a/migration/migrate.go b/migration/migrate.go index e040df9..a63c0da 100644 --- a/migration/migrate.go +++ b/migration/migrate.go @@ -10,7 +10,7 @@ import ( "syscall" "time" - "github.com/0chain/errors" + "github.com/0chain/gosdk/zboxcore/zboxutil" dStorage "github.com/0chain/s3migration/dstorage" zlogger "github.com/0chain/s3migration/logger" "github.com/0chain/s3migration/s3" @@ -124,175 +124,6 @@ func InitMigration(mConfig *MigrationConfig) error { return nil } -type migratingObjStatus struct { - objectKey string - successCh chan struct{} - errCh chan error //should be of type zerror -} - -func processMigrationBatch(objList []*s3.ObjectMeta, migrationStatuses []*migratingObjStatus, batchSize int64) (stateKey string, batchProcessSuccess bool) { - if err := migration.zStore.UpdateAllocationDetails(); err != nil { - zlogger.Logger.Error("Error while updating allocation details; ", err) - abandonAllOperations(err) - return - } - - availableStorage := migration.zStore.GetAvailableSpace() - - if availableStorage < batchSize { - zlogger.Logger.Error(fmt.Sprintf("Insufficient Space available space: %v, batchStorageSpace: %v", availableStorage, batchSize)) - abandonAllOperations(errors.New(zerror.InsufficientZStorageSpace, fmt.Sprintf("Available: %v, Batch Size: %v", availableStorage, batchSize))) - return - } - - wg := sync.WaitGroup{} - for i := 0; i < len(objList); i++ { - obj := objList[i] - zlogger.Logger.Info("Migrating ", obj.Key) - wg.Add(1) - status := migrationStatuses[i] - status.objectKey = obj.Key - status.successCh = make(chan struct{}, 1) - status.errCh = make(chan error, 1) - go func() { - defer wg.Done() - err := util.Retry(3, time.Second*5, func() error { - err := migrateObject(obj, rootContext) - return err - }) - if err != nil { - status.errCh <- err - } else { - status.successCh <- struct{}{} - migration.szCtMu.Lock() - migration.migratedSize += uint64(obj.Size) - migration.totalMigratedObjects++ - migration.szCtMu.Unlock() - } - }() - } - wg.Wait() - - stateKey, unresolvedError := checkStatuses(migrationStatuses[:len(objList)]) - - if unresolvedError != nil { - //break migration - abandonAllOperations(unresolvedError) - return - } - batchProcessSuccess = true - return -} - -func Migrate() error { - defer rootContextCancel() - - if !isMigrationInitialized { - return fmt.Errorf("migration is not initialized") - } - - updateState, closeStateFile, err := updateStateKeyFunc(migration.stateFilePath) - if err != nil { - return fmt.Errorf("could not create state file path. Error: %v", err) - } - defer closeStateFile() - - objCh, errCh := migration.awsStore.ListFilesInBucket(rootContext) - - var count, batchCount int - - objectList := make([]*s3.ObjectMeta, 10) - migrationStatuses := make([]*migratingObjStatus, 10) - makeMigrationStatuses := func() { - for i := 0; i < 10; i++ { - migrationStatuses[i] = new(migratingObjStatus) - } - } - makeMigrationStatuses() - batchConcurrency := 10 - var batchSize int64 - var migrationSuccess bool - var stateKey string - for obj := range objCh { - objectList[count] = obj - count++ - batchSize += obj.Size - if count == batchConcurrency { - batchCount++ - stateKey, migrationSuccess = processMigrationBatch(objectList[:count], migrationStatuses, batchSize) - if !migrationSuccess { - count = 0 - break - } - - count = 0 - batchSize = 0 - - zlogger.Logger.Info("New State Key: ", stateKey) - updateState(stateKey) - time.Sleep(100 * time.Millisecond) - } - } - - if count != 0 { //last batch that is not multiple of 10 - batchCount++ - stateKey, migrationSuccess = processMigrationBatch(objectList[:count], migrationStatuses, batchSize) - if migrationSuccess { - updateState(stateKey) - } - - } - - zlogger.Logger.Info("Total migrated objects: ", migration.totalMigratedObjects) - zlogger.Logger.Info("Total migrated size: ", migration.migratedSize) - - select { - case err = <-errCh: - if err != nil { - zlogger.Logger.Error("Could not fetch all objects. Error: ", err) - } else { - zlogger.Logger.Info("Got object from s3 without error") - } - case <-rootContext.Done(): - zlogger.Logger.Error("Error: context cancelled") - err = rootContext.Err() - } - - if !migrationSuccess && err == nil { - return context.Canceled - } - - return err -} - -func checkStatuses(statuses []*migratingObjStatus) (stateKey string, unresolvedError error) { - for _, mgrtStatus := range statuses { - select { - case <-mgrtStatus.successCh: - stateKey = mgrtStatus.objectKey - - case err := <-mgrtStatus.errCh: - unresolvedError = err - if resolveError(mgrtStatus.objectKey, err) { - stateKey = mgrtStatus.objectKey - unresolvedError = nil - } else { - return - } - } - } - - return -} - -func resolveError(objectKey string, err error) (isErrorResolved bool) { - switch err.(type) { - - } - - return -} - var updateStateKeyFunc = func(statePath string) (func(stateKey string), func(), error) { f, err := os.Create(statePath) if err != nil { @@ -334,39 +165,172 @@ var updateStateKeyFunc = func(statePath string) (func(stateKey string), func(), return stateKeyUpdater, fileCloser, nil } -func migrateObject(objMeta *s3.ObjectMeta, ctx context.Context) error { - remotePath := filepath.Join(migration.migrateTo, objMeta.Key) +func StartMigration() error { + defer func(start time.Time) { + zlogger.Logger.Info("time taken: ", time.Now().Sub(start)) + }(time.Now()) + + migrationWorker := util.NewMigrationWorker() + go migration.DownloadWorker(rootContext, migrationWorker) + go migration.UploadWorker(rootContext, migrationWorker) + migration.UpdateStateFile(migrationWorker) + err := migrationWorker.GetMigrationError() + if err != nil { + zlogger.Logger.Error("Error while migration, err", err) + } + zlogger.Logger.Info("Total migrated objects: ", migration.totalMigratedObjects) + zlogger.Logger.Info("Total migrated size: ", migration.migratedSize) + return err +} + +func (m *Migration) DownloadWorker(ctx context.Context, migrator *util.MigrationWorker) { + defer migrator.CloseDownloadQueue() + objCh, errCh := migration.awsStore.ListFilesInBucket(rootContext) + wg := &sync.WaitGroup{} + for obj := range objCh { + migrator.PauseDownload() + if migrator.IsMigrationError() { + return + } + wg.Add(1) + + downloadObjMeta := &util.DownloadObjectMeta{ + ObjectKey: obj.Key, + Size: obj.Size, + DoneChan: make(chan struct{}, 1), + ErrChan: make(chan error, 1), + } + + go func() { + defer wg.Done() + err := checkIsFileExist(ctx, downloadObjMeta) + if err != nil { + migrator.SetMigrationError(err) + return + } + if downloadObjMeta.IsFileAlreadyExist && migration.skip == Skip { + zlogger.Logger.Info("Skipping migration of object" + downloadObjMeta.ObjectKey) + return + } + migrator.DownloadStart(downloadObjMeta) + zlogger.Logger.Info("download start", downloadObjMeta.ObjectKey, downloadObjMeta.Size) + downloadPath, err := m.awsStore.DownloadToFile(ctx, downloadObjMeta.ObjectKey) + migrator.DownloadDone(downloadObjMeta, downloadPath, err) + migrator.SetMigrationError(err) + zlogger.Logger.Info("download done", downloadObjMeta.ObjectKey, downloadObjMeta.Size, err) + }() + time.Sleep(1 * time.Second) + } + wg.Wait() + select { + case err := <-errCh: + if err != nil { + migrator.SetMigrationError(err) + } + } +} + +func (m *Migration) UploadWorker(ctx context.Context, migrator *util.MigrationWorker) { + defer migrator.CloseUploadQueue() + downloadQueue := migrator.GetDownloadQueue() + wg := &sync.WaitGroup{} + for d := range downloadQueue { + migrator.PauseUpload() + downloadObj := d + uploadObj := &util.UploadObjectMeta{ + ObjectKey: downloadObj.ObjectKey, + DoneChan: make(chan struct{}, 1), + ErrChan: make(chan error, 1), + Size: downloadObj.Size, + } + wg.Add(1) + go func() { + defer wg.Done() + err := checkDownloadStatus(downloadObj) + if err != nil { + migrator.SetMigrationError(err) + return + } + defer os.Remove(downloadObj.LocalPath) + migrator.UploadStart(uploadObj) + zlogger.Logger.Info("upload start", uploadObj.ObjectKey, uploadObj.Size) + err = util.Retry(3, time.Second*5, func() error { + var err error + err = processUpload(ctx, downloadObj) + return err + }) + migrator.UploadDone(uploadObj, err) + migrator.SetMigrationError(err) + zlogger.Logger.Info("upload done", uploadObj.ObjectKey, uploadObj.Size, err) + }() + time.Sleep(1 * time.Second) + } + wg.Wait() +} + +func getRemotePath(objectKey string) string { + return filepath.Join(migration.migrateTo, migration.bucket, objectKey) +} + +func checkIsFileExist(ctx context.Context, downloadObj *util.DownloadObjectMeta) error { + remotePath := getRemotePath(downloadObj.ObjectKey) - isFileExist, err := migration.zStore.IsFileExist(ctx, remotePath) + var isFileExist bool + var err error + err = util.Retry(3, time.Second*5, func() error { + var err error + isFileExist, err = migration.zStore.IsFileExist(ctx, remotePath) + return err + }) if err != nil { zlogger.Logger.Error(err) return err } - if isFileExist && migration.skip == Skip { - zlogger.Logger.Info("Skipping migration of object" + objMeta.Key) + downloadObj.IsFileAlreadyExist = isFileExist + return nil +} + +func checkDownloadStatus(downloadObj *util.DownloadObjectMeta) error { + select { + case <-downloadObj.DoneChan: return nil + case err := <-downloadObj.ErrChan: + return err + } +} + +func processUpload(ctx context.Context, downloadObj *util.DownloadObjectMeta) error { + remotePath := getRemotePath(downloadObj.ObjectKey) + + fileObj, err := os.Open(downloadObj.LocalPath) + if err != nil { + zlogger.Logger.Error(err) + return err } - obj, err := migration.awsStore.GetFileContent(ctx, objMeta.Key) + defer fileObj.Close() + + fileInfo, err := fileObj.Stat() + mimeType, err := zboxutil.GetFileContentType(fileObj) if err != nil { zlogger.Logger.Error(err) return err } - if isFileExist { + if downloadObj.IsFileAlreadyExist { switch migration.skip { case Replace: - zlogger.Logger.Info("Replacing object" + objMeta.Key + " size " + strconv.FormatInt(objMeta.Size, 10)) - err = migration.zStore.Replace(ctx, remotePath, obj.Body, objMeta.Size, obj.ContentType) + zlogger.Logger.Info("Replacing object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10)) + err = migration.zStore.Replace(ctx, remotePath, fileObj, fileInfo.Size(), mimeType) case Duplicate: - zlogger.Logger.Info("Duplicating object" + objMeta.Key + " size " + strconv.FormatInt(objMeta.Size, 10)) - err = migration.zStore.Duplicate(ctx, remotePath, obj.Body, objMeta.Size, obj.ContentType) + zlogger.Logger.Info("Duplicating object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10)) + err = migration.zStore.Duplicate(ctx, remotePath, fileObj, fileInfo.Size(), mimeType) } } else { - zlogger.Logger.Info("Uploading object" + objMeta.Key + " size " + strconv.FormatInt(objMeta.Size, 10)) - err = migration.zStore.Upload(ctx, remotePath, obj.Body, objMeta.Size, obj.ContentType, false) + zlogger.Logger.Info("Uploading object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10)) + err = migration.zStore.Upload(ctx, remotePath, fileObj, fileInfo.Size(), mimeType, false) } if err != nil { @@ -374,8 +338,31 @@ func migrateObject(objMeta *s3.ObjectMeta, ctx context.Context) error { return err } else { if migration.deleteSource { - migration.awsStore.DeleteFile(ctx, objMeta.Key) + migration.awsStore.DeleteFile(ctx, downloadObj.ObjectKey) } + migration.szCtMu.Lock() + migration.migratedSize += uint64(downloadObj.Size) + migration.totalMigratedObjects++ + migration.szCtMu.Unlock() return nil } } + +func (m *Migration) UpdateStateFile(migrateHandler *util.MigrationWorker) { + updateState, closeStateFile, err := updateStateKeyFunc(migration.stateFilePath) + if err != nil { + migrateHandler.SetMigrationError(err) + return + } + defer closeStateFile() + uploadQueue := migrateHandler.GetUploadQueue() + for u := range uploadQueue { + select { + case <-u.DoneChan: + updateState(u.ObjectKey) + case <-u.ErrChan: + return + } + } + return +} diff --git a/migration/migrate_new.go b/migration/migrate_new.go deleted file mode 100644 index 0724e7f..0000000 --- a/migration/migrate_new.go +++ /dev/null @@ -1,215 +0,0 @@ -package migration - -import ( - "context" - "github.com/0chain/gosdk/zboxcore/zboxutil" - zlogger "github.com/0chain/s3migration/logger" - "github.com/0chain/s3migration/util" - "os" - "path/filepath" - "strconv" - "sync" - "time" -) - -func (m *Migration) DownloadWorker(ctx context.Context, migrator *util.MigrationQueue) { - defer migrator.CloseDownloadQueue() - objCh, errCh := migration.awsStore.ListFilesInBucket(rootContext) - wg := &sync.WaitGroup{} - for obj := range objCh { - migrator.PauseDownload() - if migrator.IsMigrationError() { - return - } - wg.Add(1) - - downloadObjMeta := &util.DownloadObjectMeta{ - ObjectKey: obj.Key, - Size: obj.Size, - DoneChan: make(chan struct{}, 1), - ErrChan: make(chan error, 1), - } - - go func() { - defer wg.Done() - err := checkIsFileExist(ctx, downloadObjMeta) - if err != nil { - migrator.SetMigrationError(err) - return - } - if downloadObjMeta.IsFileAlreadyExist && migration.skip == Skip { - zlogger.Logger.Info("Skipping migration of object" + downloadObjMeta.ObjectKey) - return - } - migrator.DownloadStart(downloadObjMeta) - zlogger.Logger.Info("download start", downloadObjMeta.ObjectKey, downloadObjMeta.Size) - downloadPath, err := m.awsStore.DownloadToFile(ctx, downloadObjMeta.ObjectKey) - migrator.DownloadDone(downloadObjMeta, downloadPath, err) - migrator.SetMigrationError(err) - zlogger.Logger.Info("download done", downloadObjMeta.ObjectKey, downloadObjMeta.Size, err) - }() - time.Sleep(1 * time.Second) - } - wg.Wait() - select { - case err := <-errCh: - if err != nil { - migrator.SetMigrationError(err) - } - } -} - -func (m *Migration) UploadWorker(ctx context.Context, migrator *util.MigrationQueue) { - defer migrator.CloseUploadQueue() - downloadQueue := migrator.GetDownloadQueue() - wg := &sync.WaitGroup{} - for d := range downloadQueue { - migrator.PauseUpload() - downloadObj := d - uploadObj := &util.UploadObjectMeta{ - ObjectKey: downloadObj.ObjectKey, - DoneChan: make(chan struct{}, 1), - ErrChan: make(chan error, 1), - Size: downloadObj.Size, - } - wg.Add(1) - go func() { - defer wg.Done() - err := checkDownloadStatus(downloadObj) - if err != nil { - migrator.SetMigrationError(err) - return - } - defer os.Remove(downloadObj.LocalPath) - migrator.UploadStart(uploadObj) - zlogger.Logger.Info("upload start", uploadObj.ObjectKey, uploadObj.Size) - err = util.Retry(3, time.Second*5, func() error { - var err error - err = processUpload(ctx, downloadObj) - return err - }) - migrator.UploadDone(uploadObj, err) - migrator.SetMigrationError(err) - zlogger.Logger.Info("upload done", uploadObj.ObjectKey, uploadObj.Size, err) - }() - time.Sleep(1 * time.Second) - } - wg.Wait() -} - -func getRemotePath(objectKey string) string { - return filepath.Join(migration.migrateTo, migration.bucket, objectKey) -} - -func checkIsFileExist(ctx context.Context, downloadObj *util.DownloadObjectMeta) error { - remotePath := getRemotePath(downloadObj.ObjectKey) - - var isFileExist bool - var err error - err = util.Retry(3, time.Second*5, func() error { - var err error - isFileExist, err = migration.zStore.IsFileExist(ctx, remotePath) - return err - }) - - if err != nil { - zlogger.Logger.Error(err) - return err - } - - downloadObj.IsFileAlreadyExist = isFileExist - return nil -} - -func checkDownloadStatus(downloadObj *util.DownloadObjectMeta) error { - select { - case <-downloadObj.DoneChan: - return nil - case err := <-downloadObj.ErrChan: - return err - } -} - -func processUpload(ctx context.Context, downloadObj *util.DownloadObjectMeta) error { - remotePath := getRemotePath(downloadObj.ObjectKey) - - fileObj, err := os.Open(downloadObj.LocalPath) - if err != nil { - zlogger.Logger.Error(err) - return err - } - - defer fileObj.Close() - - fileInfo, err := fileObj.Stat() - mimeType, err := zboxutil.GetFileContentType(fileObj) - if err != nil { - zlogger.Logger.Error(err) - return err - } - - if downloadObj.IsFileAlreadyExist { - switch migration.skip { - case Replace: - zlogger.Logger.Info("Replacing object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10)) - err = migration.zStore.Replace(ctx, remotePath, fileObj, fileInfo.Size(), mimeType) - case Duplicate: - zlogger.Logger.Info("Duplicating object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10)) - err = migration.zStore.Duplicate(ctx, remotePath, fileObj, fileInfo.Size(), mimeType) - } - } else { - zlogger.Logger.Info("Uploading object" + downloadObj.ObjectKey + " size " + strconv.FormatInt(downloadObj.Size, 10)) - err = migration.zStore.Upload(ctx, remotePath, fileObj, fileInfo.Size(), mimeType, false) - } - - if err != nil { - zlogger.Logger.Error(err) - return err - } else { - if migration.deleteSource { - migration.awsStore.DeleteFile(ctx, downloadObj.ObjectKey) - } - migration.szCtMu.Lock() - migration.migratedSize += uint64(downloadObj.Size) - migration.totalMigratedObjects++ - migration.szCtMu.Unlock() - return nil - } -} - -func (m *Migration) UpdateStateFile(migrateHandler *util.MigrationQueue) { - updateState, closeStateFile, err := updateStateKeyFunc(migration.stateFilePath) - if err != nil { - migrateHandler.SetMigrationError(err) - return - } - defer closeStateFile() - uploadQueue := migrateHandler.GetUploadQueue() - for u := range uploadQueue { - select { - case <-u.DoneChan: - updateState(u.ObjectKey) - case <-u.ErrChan: - return - } - } - return -} - -func MigrationStart() error { - defer func(start time.Time) { - zlogger.Logger.Info("time taken: ", time.Now().Sub(start)) - }(time.Now()) - - migrateHandler := util.NewMigrationQueue() - go migration.DownloadWorker(rootContext, migrateHandler) - go migration.UploadWorker(rootContext, migrateHandler) - migration.UpdateStateFile(migrateHandler) - err := migrateHandler.GetMigrationError() - if err != nil { - zlogger.Logger.Error("Error while migration, err", err) - } - zlogger.Logger.Info("Total migrated objects: ", migration.totalMigratedObjects) - zlogger.Logger.Info("Total migrated size: ", migration.migratedSize) - return err -} diff --git a/s3/aws_integration_test.go b/s3/aws_integration_test.go index 79296e4..9ea7209 100644 --- a/s3/aws_integration_test.go +++ b/s3/aws_integration_test.go @@ -84,3 +84,16 @@ func TestService_DeleteFile(t *testing.T) { log.Printf("object key deletion error,err = %+v", err) } } + +func TestAwsClient_DownloadManager(t *testing.T) { + awsAccessKey := "AKIA4MPQDEZ4FKNA3OPR" + awsSecretKey := "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" + + util.SetAwsEnvCredentials(awsAccessKey, awsSecretKey) + objectKey := "filesforbucket/65.txt" + x := time.Now() + s3Svc, _ := GetAwsClient("lpobkt1", "", "", false, nil, nil, "", "/Users/mdmiranahmedansari/aws_temp") + downloadPath, err := s3Svc.DownloadToFile(context.Background(), objectKey) + log.Println(downloadPath, err) + log.Println(time.Now().Sub(x).Seconds()) +} diff --git a/s3/mocks/mock_aws.go b/s3/mocks/mock_aws.go index d6f20d1..c2232db 100644 --- a/s3/mocks/mock_aws.go +++ b/s3/mocks/mock_aws.go @@ -49,6 +49,21 @@ func (mr *MockAwsIMockRecorder) DeleteFile(arg0, arg1 interface{}) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteFile", reflect.TypeOf((*MockAwsI)(nil).DeleteFile), arg0, arg1) } +// DownloadToFile mocks base method. +func (m *MockAwsI) DownloadToFile(arg0 context.Context, arg1 string) (string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DownloadToFile", arg0, arg1) + ret0, _ := ret[0].(string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DownloadToFile indicates an expected call of DownloadToFile. +func (mr *MockAwsIMockRecorder) DownloadToFile(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadToFile", reflect.TypeOf((*MockAwsI)(nil).DownloadToFile), arg0, arg1) +} + // GetFileContent mocks base method. func (m *MockAwsI) GetFileContent(arg0 context.Context, arg1 string) (*s3.Object, error) { m.ctrl.T.Helper() diff --git a/util/migration_worker.go b/util/migration_worker.go index edc15e0..0372515 100644 --- a/util/migration_worker.go +++ b/util/migration_worker.go @@ -10,11 +10,11 @@ const ( downloadConcurrencyLimit = 30 fileSizeLimit = int64(1024*1024) * int64(1024) * int64(5) uploadConcurrencyLimit = 10 - uploadSizeLimit = int64(1024*1024) * int64(500) + uploadSizeLimit = int64(1024*1024) * int64(1024) * int64(2) downloadSizeLimit = int64(1024*1024) * int64(500) ) -type MigrationQueue struct { +type MigrationWorker struct { diskMutex *sync.RWMutex errMutex *sync.RWMutex currentFileSizeOnDisk int64 @@ -43,8 +43,8 @@ type UploadObjectMeta struct { ErrChan chan error } -func NewMigrationQueue() *MigrationQueue { - return &MigrationQueue{ +func NewMigrationWorker() *MigrationWorker { + return &MigrationWorker{ diskMutex: &sync.RWMutex{}, errMutex: &sync.RWMutex{}, downloadQueue: make(chan *DownloadObjectMeta, 10000), @@ -52,45 +52,45 @@ func NewMigrationQueue() *MigrationQueue { } } -func (m *MigrationQueue) updateFileSizeOnDisk(size int64) { +func (m *MigrationWorker) updateFileSizeOnDisk(size int64) { m.diskMutex.Lock() m.currentFileSizeOnDisk += size m.diskMutex.Unlock() } -func (m *MigrationQueue) GetDownloadQueue() <-chan *DownloadObjectMeta { +func (m *MigrationWorker) GetDownloadQueue() <-chan *DownloadObjectMeta { return m.downloadQueue } -func (m *MigrationQueue) GetUploadQueue() <-chan *UploadObjectMeta { +func (m *MigrationWorker) GetUploadQueue() <-chan *UploadObjectMeta { return m.uploadQueue } -func (m *MigrationQueue) incrUploadConcurrency() { +func (m *MigrationWorker) incrUploadConcurrency() { atomic.AddInt32(&m.uploadConcurrency, 1) } -func (m *MigrationQueue) decrUploadConcurrency() { +func (m *MigrationWorker) decrUploadConcurrency() { atomic.AddInt32(&m.uploadConcurrency, -1) } -func (m *MigrationQueue) checkUploadStatus() bool { +func (m *MigrationWorker) checkUploadStatus() bool { return atomic.LoadInt32(&m.uploadConcurrency) >= uploadConcurrencyLimit || atomic.LoadInt64(&m.currentUploadSize) >= uploadSizeLimit } -func (m *MigrationQueue) PauseUpload() { +func (m *MigrationWorker) PauseUpload() { for m.checkUploadStatus() { time.Sleep(5 * time.Second) } } -func (m *MigrationQueue) UploadStart(u *UploadObjectMeta) { +func (m *MigrationWorker) UploadStart(u *UploadObjectMeta) { m.incrUploadConcurrency() atomic.AddInt64(&m.currentUploadSize, u.Size) m.uploadQueue <- u } -func (m *MigrationQueue) UploadDone(u *UploadObjectMeta, err error) { +func (m *MigrationWorker) UploadDone(u *UploadObjectMeta, err error) { m.updateFileSizeOnDisk(-u.Size) m.decrUploadConcurrency() atomic.AddInt64(&m.currentUploadSize, -u.Size) @@ -100,19 +100,19 @@ func (m *MigrationQueue) UploadDone(u *UploadObjectMeta, err error) { u.DoneChan <- struct{}{} } -func (m *MigrationQueue) CloseUploadQueue() { +func (m *MigrationWorker) CloseUploadQueue() { close(m.uploadQueue) } -func (m *MigrationQueue) incrDownloadConcurrency() { +func (m *MigrationWorker) incrDownloadConcurrency() { atomic.AddInt32(&m.downloadConcurrency, 1) } -func (m *MigrationQueue) decrDownloadConcurrency() { +func (m *MigrationWorker) decrDownloadConcurrency() { atomic.AddInt32(&m.downloadConcurrency, -1) } -func (m *MigrationQueue) checkDownloadStatus() bool { +func (m *MigrationWorker) checkDownloadStatus() bool { m.diskMutex.RLock() defer m.diskMutex.RUnlock() return m.currentFileSizeOnDisk >= fileSizeLimit || @@ -120,20 +120,20 @@ func (m *MigrationQueue) checkDownloadStatus() bool { atomic.LoadInt64(&m.currentDownloadSize) >= downloadSizeLimit } -func (m *MigrationQueue) PauseDownload() { +func (m *MigrationWorker) PauseDownload() { for m.checkDownloadStatus() { time.Sleep(5 * time.Second) } } -func (m *MigrationQueue) DownloadStart(d *DownloadObjectMeta) { +func (m *MigrationWorker) DownloadStart(d *DownloadObjectMeta) { m.incrDownloadConcurrency() m.downloadQueue <- d m.updateFileSizeOnDisk(d.Size) atomic.AddInt64(&m.currentDownloadSize, d.Size) } -func (m *MigrationQueue) DownloadDone(d *DownloadObjectMeta, localPath string, err error) { +func (m *MigrationWorker) DownloadDone(d *DownloadObjectMeta, localPath string, err error) { m.decrDownloadConcurrency() atomic.AddInt64(&m.currentDownloadSize, -d.Size) if err != nil { @@ -144,21 +144,21 @@ func (m *MigrationQueue) DownloadDone(d *DownloadObjectMeta, localPath string, e } } -func (m *MigrationQueue) CloseDownloadQueue() { +func (m *MigrationWorker) CloseDownloadQueue() { close(m.downloadQueue) } -func (m *MigrationQueue) GetMigrationError() error { +func (m *MigrationWorker) GetMigrationError() error { m.errMutex.RLock() defer m.errMutex.RUnlock() return m.errInSystem } -func (m *MigrationQueue) IsMigrationError() bool { +func (m *MigrationWorker) IsMigrationError() bool { return m.GetMigrationError() != nil } -func (m *MigrationQueue) SetMigrationError(err error) { +func (m *MigrationWorker) SetMigrationError(err error) { if err != nil { m.errMutex.Lock() defer m.errMutex.Unlock() From 40754de218a438ffb568fefd1c818dd6e66667f1 Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Fri, 28 Jan 2022 20:36:47 +0530 Subject: [PATCH 04/11] test case fixeds --- migration/migrate.go | 6 +- migration/migration_test.go | 142 +++++++++++++++------------------- util/file_system.go | 33 ++++++++ util/mocks/mock_file.go | 109 ++++++++++++++++++++++++++ util/mocks/mock_file_info.go | 120 ++++++++++++++++++++++++++++ util/mocks/mock_filesystem.go | 80 +++++++++++++++++++ util/util.go | 2 +- 7 files changed, 410 insertions(+), 82 deletions(-) create mode 100644 util/file_system.go create mode 100644 util/mocks/mock_file.go create mode 100644 util/mocks/mock_file_info.go create mode 100644 util/mocks/mock_filesystem.go diff --git a/migration/migrate.go b/migration/migrate.go index a63c0da..7f377da 100644 --- a/migration/migrate.go +++ b/migration/migrate.go @@ -46,6 +46,7 @@ func abandonAllOperations(err error) { type Migration struct { zStore dStorage.DStoreI awsStore s3.AwsI + fs util.FileSystem skip int retryCount int @@ -107,6 +108,7 @@ func InitMigration(mConfig *MigrationConfig) error { deleteSource: mConfig.DeleteSource, workDir: mConfig.WorkDir, bucket: mConfig.Bucket, + fs: util.Fs, } rootContext, rootContextCancel = context.WithCancel(context.Background()) @@ -251,7 +253,7 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *util.MigrationWo migrator.SetMigrationError(err) return } - defer os.Remove(downloadObj.LocalPath) + defer m.fs.Remove(downloadObj.LocalPath) migrator.UploadStart(uploadObj) zlogger.Logger.Info("upload start", uploadObj.ObjectKey, uploadObj.Size) err = util.Retry(3, time.Second*5, func() error { @@ -304,7 +306,7 @@ func checkDownloadStatus(downloadObj *util.DownloadObjectMeta) error { func processUpload(ctx context.Context, downloadObj *util.DownloadObjectMeta) error { remotePath := getRemotePath(downloadObj.ObjectKey) - fileObj, err := os.Open(downloadObj.LocalPath) + fileObj, err := migration.fs.Open(downloadObj.LocalPath) if err != nil { zlogger.Logger.Error(err) return err diff --git a/migration/migration_test.go b/migration/migration_test.go index 28415dd..02845d4 100644 --- a/migration/migration_test.go +++ b/migration/migration_test.go @@ -6,6 +6,7 @@ import ( mock_dstorage "github.com/0chain/s3migration/dstorage/mocks" "github.com/0chain/s3migration/s3" mock_s3 "github.com/0chain/s3migration/s3/mocks" + mock_util "github.com/0chain/s3migration/util/mocks" "github.com/golang/mock/gomock" "log" "testing" @@ -18,10 +19,12 @@ func TestMigrate(t *testing.T) { dStorageService := mock_dstorage.NewMockDStoreI(ctrl) awsStorageService := mock_s3.NewMockAwsI(ctrl) + fileSystem := mock_util.NewMockFileSystem(ctrl) migration = Migration{ zStore: dStorageService, awsStore: awsStorageService, skip: Skip, + fs: fileSystem, } isMigrationInitialized = true @@ -32,38 +35,6 @@ func TestMigrate(t *testing.T) { err error migrateFileCount int }{ - { - name: "insufficient allocation space", - setUpMock: func() { - rootContext, rootContextCancel = context.WithCancel(context.Background()) - fileListChan := make(chan *s3.ObjectMeta, 1000) - fileListChan <- &s3.ObjectMeta{ - Key: "file1", Size: 1200, - } - - fileListChan <- &s3.ObjectMeta{ - Key: "file2", Size: 1400, - } - - fileListChan <- &s3.ObjectMeta{ - Key: "file3", Size: 1500, - } - - close(fileListChan) - - errChan := make(chan error, 1) - awsStorageService.EXPECT().ListFilesInBucket(gomock.Any()).Return(fileListChan, errChan) - - updateStateKeyFunc = func(statePath string) (func(stateKey string), func(), error) { - return func(stateKey string) {}, func() {}, nil - } - - dStorageService.EXPECT().UpdateAllocationDetails().Return(nil) - dStorageService.EXPECT().GetAvailableSpace().Return(int64(1200)) - }, - wantErr: true, - err: context.Canceled, - }, { name: "success in uploading files", setUpMock: func() { @@ -87,24 +58,52 @@ func TestMigrate(t *testing.T) { close(errChan) awsStorageService.EXPECT().ListFilesInBucket(gomock.Any()).Return(fileListChan, errChan) + awsStorageService.EXPECT().DownloadToFile(gomock.Any(), "file1").Return("/aws/file1", nil) + awsStorageService.EXPECT().DownloadToFile(gomock.Any(), "file2").Return("/aws/file2", nil) + awsStorageService.EXPECT().DownloadToFile(gomock.Any(), "file3").Return("/aws/file3", nil) + updateStateKeyFunc = func(statePath string) (func(stateKey string), func(), error) { return func(stateKey string) {}, func() {}, nil } - dStorageService.EXPECT().UpdateAllocationDetails().Return(nil) - dStorageService.EXPECT().GetAvailableSpace().Return(int64(4200)) - dStorageService.EXPECT().IsFileExist(gomock.Any(), gomock.Any()).AnyTimes().Return(false, nil) - awsStorageService.EXPECT().GetFileContent(gomock.Any(), "file1").Return(&s3.Object{}, nil) - awsStorageService.EXPECT().GetFileContent(gomock.Any(), "file2").Return(&s3.Object{}, nil) - awsStorageService.EXPECT().GetFileContent(gomock.Any(), "file3").Return(&s3.Object{}, nil) - dStorageService.EXPECT().Upload(gomock.Any(), "file1", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - dStorageService.EXPECT().Upload(gomock.Any(), "file2", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - dStorageService.EXPECT().Upload(gomock.Any(), "file3", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + dStorageService.EXPECT().IsFileExist(gomock.Any(), getRemotePath("file1")).Return(false, nil) + dStorageService.EXPECT().IsFileExist(gomock.Any(), getRemotePath("file2")).Return(false, nil) + dStorageService.EXPECT().IsFileExist(gomock.Any(), getRemotePath("file3")).Return(false, nil) + + fileInfo := mock_util.NewMockFileInfo(ctrl) + file1Data := mock_util.NewMockFile(ctrl) + file1Data.EXPECT().Stat().Return(fileInfo, nil) + file1Data.EXPECT().Read(gomock.Any()).Return(1, nil) + file1Data.EXPECT().Seek(gomock.Any(), gomock.Any()).Return(int64(123), nil) + file1Data.EXPECT().Close().Return(nil) + file2Data := mock_util.NewMockFile(ctrl) + file2Data.EXPECT().Stat().Return(fileInfo, nil) + file2Data.EXPECT().Read(gomock.Any()).Return(1, nil) + file2Data.EXPECT().Seek(gomock.Any(), gomock.Any()).Return(int64(123), nil) + file2Data.EXPECT().Close().Return(nil) + file3Data := mock_util.NewMockFile(ctrl) + file3Data.EXPECT().Stat().Return(fileInfo, nil) + file3Data.EXPECT().Read(gomock.Any()).Return(1, nil) + file3Data.EXPECT().Seek(gomock.Any(), gomock.Any()).Return(int64(123), nil) + fileInfo.EXPECT().Size().AnyTimes().Return(int64(122)) + file3Data.EXPECT().Close().Return(nil) + + fileSystem.EXPECT().Open("/aws/file1").Return(file1Data, nil) + fileSystem.EXPECT().Open("/aws/file2").Return(file2Data, nil) + fileSystem.EXPECT().Open("/aws/file3").Return(file3Data, nil) + + fileSystem.EXPECT().Remove("/aws/file1").Return(nil) + fileSystem.EXPECT().Remove("/aws/file2").Return(nil) + fileSystem.EXPECT().Remove("/aws/file3").Return(nil) + + dStorageService.EXPECT().Upload(gomock.Any(), getRemotePath("file1"), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + dStorageService.EXPECT().Upload(gomock.Any(), getRemotePath("file2"), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + dStorageService.EXPECT().Upload(gomock.Any(), getRemotePath("file3"), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) }, wantErr: false, }, { - name: "aws get content error", + name: "download to file error", setUpMock: func() { rootContext, rootContextCancel = context.WithCancel(context.Background()) fileListChan := make(chan *s3.ObjectMeta, 1000) @@ -112,14 +111,6 @@ func TestMigrate(t *testing.T) { Key: "file11", Size: 1200, } - fileListChan <- &s3.ObjectMeta{ - Key: "file22", Size: 1400, - } - - fileListChan <- &s3.ObjectMeta{ - Key: "file33", Size: 1500, - } - close(fileListChan) errChan := make(chan error, 1) @@ -130,18 +121,12 @@ func TestMigrate(t *testing.T) { return func(stateKey string) {}, func() {}, nil } - dStorageService.EXPECT().UpdateAllocationDetails().Return(nil) - dStorageService.EXPECT().GetAvailableSpace().Return(int64(4200)) - dStorageService.EXPECT().IsFileExist(gomock.Any(), gomock.Any()).AnyTimes().Return(false, nil) - awsStorageService.EXPECT().GetFileContent(gomock.Any(), "file11").AnyTimes().Return(&s3.Object{}, nil) - awsStorageService.EXPECT().GetFileContent(gomock.Any(), "file33").AnyTimes().Return(&s3.Object{}, nil) - awsStorageService.EXPECT().GetFileContent(gomock.Any(), "file22").AnyTimes().Return(&s3.Object{}, errors.New("some error")) - dStorageService.EXPECT().Upload(gomock.Any(), "file11", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) - dStorageService.EXPECT().Upload(gomock.Any(), "file22", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) - dStorageService.EXPECT().Upload(gomock.Any(), "file33", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(nil) + dStorageService.EXPECT().IsFileExist(gomock.Any(), getRemotePath("file11")).Return(false, nil) + + awsStorageService.EXPECT().DownloadToFile(gomock.Any(), "file11").AnyTimes().Return("", errors.New("some error")) }, wantErr: true, - err: context.Canceled, + err: errors.New("some error"), }, { name: "dstorage upload error", @@ -152,14 +137,6 @@ func TestMigrate(t *testing.T) { Key: "file10", Size: 1200, } - fileListChan <- &s3.ObjectMeta{ - Key: "file20", Size: 1400, - } - - fileListChan <- &s3.ObjectMeta{ - Key: "file30", Size: 1500, - } - close(fileListChan) errChan := make(chan error, 1) @@ -170,18 +147,25 @@ func TestMigrate(t *testing.T) { return func(stateKey string) {}, func() {}, nil } - dStorageService.EXPECT().UpdateAllocationDetails().Return(nil) - dStorageService.EXPECT().GetAvailableSpace().Return(int64(4200)) - dStorageService.EXPECT().IsFileExist(gomock.Any(), gomock.Any()).AnyTimes().Return(false, nil) - awsStorageService.EXPECT().GetFileContent(gomock.Any(), "file10").Return(&s3.Object{}, nil) - awsStorageService.EXPECT().GetFileContent(gomock.Any(), "file20").AnyTimes().Return(&s3.Object{}, nil) - awsStorageService.EXPECT().GetFileContent(gomock.Any(), "file30").Return(&s3.Object{}, nil) - dStorageService.EXPECT().Upload(gomock.Any(), "file10", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) - dStorageService.EXPECT().Upload(gomock.Any(), "file20", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes().Return(errors.New("some error")) - dStorageService.EXPECT().Upload(gomock.Any(), "file30", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(nil) + dStorageService.EXPECT().IsFileExist(gomock.Any(), "file10").AnyTimes().Return(false, nil) + awsStorageService.EXPECT().DownloadToFile(gomock.Any(), "file10").Return("/aws/file10", nil) + + fileInfo := mock_util.NewMockFileInfo(ctrl) + file1Data := mock_util.NewMockFile(ctrl) + file1Data.EXPECT().Stat().Return(fileInfo, nil).Times(3) + file1Data.EXPECT().Read(gomock.Any()).Return(1, nil).Times(3) + file1Data.EXPECT().Seek(gomock.Any(), gomock.Any()).Return(int64(123), nil).Times(3) + file1Data.EXPECT().Close().Return(nil).Times(3) + + fileSystem.EXPECT().Open("/aws/file10").Return(file1Data, nil).Times(3) + fileSystem.EXPECT().Remove("/aws/file10").Return(nil) + + fileInfo.EXPECT().Size().AnyTimes().Return(int64(122)) + + dStorageService.EXPECT().Upload(gomock.Any(), "file10", gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(errors.New("some error")).Times(3) }, wantErr: true, - err: context.Canceled, + err: errors.New("after 3 attempts, last error: some error"), }, { name: "aws list object error", @@ -208,7 +192,7 @@ func TestMigrate(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tt.setUpMock() - err := Migrate() + err := StartMigration() log.Println(err) if tt.wantErr != (err != nil) { t.Errorf("s3-migration Migrate, wantErr: %v, got: %v", tt.wantErr, err) diff --git a/util/file_system.go b/util/file_system.go new file mode 100644 index 0000000..0d6745a --- /dev/null +++ b/util/file_system.go @@ -0,0 +1,33 @@ +package util + +import ( + "io" + "os" +) + +var Fs FileSystem = osFS{} + +//go:generate mockgen -destination mocks/mock_filesystem.go -package mock_util github.com/0chain/s3migration/util FileSystem +type FileSystem interface { + Open(name string) (File, error) + Stat(name string) (os.FileInfo, error) + Remove(name string) error +} + +//go:generate mockgen -destination mocks/mock_file.go -package mock_util github.com/0chain/s3migration/util File +type File interface { + io.Closer + io.Reader + io.ReaderAt + io.Seeker + Stat() (os.FileInfo, error) +} + +type osFS struct{} + +func (osFS) Open(name string) (File, error) { return os.Open(name) } +func (osFS) Stat(name string) (os.FileInfo, error) { return os.Stat(name) } +func (osFS) Remove(name string) error { return os.Remove(name) } + +//go:generate mockgen -destination mocks/mock_file_info.go -package mock_util github.com/0chain/s3migration/util FileInfo +type FileInfo os.FileInfo diff --git a/util/mocks/mock_file.go b/util/mocks/mock_file.go new file mode 100644 index 0000000..bdb9f90 --- /dev/null +++ b/util/mocks/mock_file.go @@ -0,0 +1,109 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/0chain/s3migration/util (interfaces: File) + +// Package mock_util is a generated GoMock package. +package mock_util + +import ( + fs "io/fs" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockFile is a mock of File interface. +type MockFile struct { + ctrl *gomock.Controller + recorder *MockFileMockRecorder +} + +// MockFileMockRecorder is the mock recorder for MockFile. +type MockFileMockRecorder struct { + mock *MockFile +} + +// NewMockFile creates a new mock instance. +func NewMockFile(ctrl *gomock.Controller) *MockFile { + mock := &MockFile{ctrl: ctrl} + mock.recorder = &MockFileMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFile) EXPECT() *MockFileMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockFile) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockFileMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockFile)(nil).Close)) +} + +// Read mocks base method. +func (m *MockFile) Read(arg0 []byte) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Read", arg0) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Read indicates an expected call of Read. +func (mr *MockFileMockRecorder) Read(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Read", reflect.TypeOf((*MockFile)(nil).Read), arg0) +} + +// ReadAt mocks base method. +func (m *MockFile) ReadAt(arg0 []byte, arg1 int64) (int, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ReadAt", arg0, arg1) + ret0, _ := ret[0].(int) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ReadAt indicates an expected call of ReadAt. +func (mr *MockFileMockRecorder) ReadAt(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ReadAt", reflect.TypeOf((*MockFile)(nil).ReadAt), arg0, arg1) +} + +// Seek mocks base method. +func (m *MockFile) Seek(arg0 int64, arg1 int) (int64, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Seek", arg0, arg1) + ret0, _ := ret[0].(int64) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Seek indicates an expected call of Seek. +func (mr *MockFileMockRecorder) Seek(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Seek", reflect.TypeOf((*MockFile)(nil).Seek), arg0, arg1) +} + +// Stat mocks base method. +func (m *MockFile) Stat() (fs.FileInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Stat") + ret0, _ := ret[0].(fs.FileInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Stat indicates an expected call of Stat. +func (mr *MockFileMockRecorder) Stat() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stat", reflect.TypeOf((*MockFile)(nil).Stat)) +} diff --git a/util/mocks/mock_file_info.go b/util/mocks/mock_file_info.go new file mode 100644 index 0000000..e20c39c --- /dev/null +++ b/util/mocks/mock_file_info.go @@ -0,0 +1,120 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/0chain/s3migration/util (interfaces: FileInfo) + +// Package mock_util is a generated GoMock package. +package mock_util + +import ( + fs "io/fs" + reflect "reflect" + time "time" + + gomock "github.com/golang/mock/gomock" +) + +// MockFileInfo is a mock of FileInfo interface. +type MockFileInfo struct { + ctrl *gomock.Controller + recorder *MockFileInfoMockRecorder +} + +// MockFileInfoMockRecorder is the mock recorder for MockFileInfo. +type MockFileInfoMockRecorder struct { + mock *MockFileInfo +} + +// NewMockFileInfo creates a new mock instance. +func NewMockFileInfo(ctrl *gomock.Controller) *MockFileInfo { + mock := &MockFileInfo{ctrl: ctrl} + mock.recorder = &MockFileInfoMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFileInfo) EXPECT() *MockFileInfoMockRecorder { + return m.recorder +} + +// IsDir mocks base method. +func (m *MockFileInfo) IsDir() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsDir") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsDir indicates an expected call of IsDir. +func (mr *MockFileInfoMockRecorder) IsDir() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsDir", reflect.TypeOf((*MockFileInfo)(nil).IsDir)) +} + +// ModTime mocks base method. +func (m *MockFileInfo) ModTime() time.Time { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ModTime") + ret0, _ := ret[0].(time.Time) + return ret0 +} + +// ModTime indicates an expected call of ModTime. +func (mr *MockFileInfoMockRecorder) ModTime() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ModTime", reflect.TypeOf((*MockFileInfo)(nil).ModTime)) +} + +// Mode mocks base method. +func (m *MockFileInfo) Mode() fs.FileMode { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Mode") + ret0, _ := ret[0].(fs.FileMode) + return ret0 +} + +// Mode indicates an expected call of Mode. +func (mr *MockFileInfoMockRecorder) Mode() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Mode", reflect.TypeOf((*MockFileInfo)(nil).Mode)) +} + +// Name mocks base method. +func (m *MockFileInfo) Name() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Name") + ret0, _ := ret[0].(string) + return ret0 +} + +// Name indicates an expected call of Name. +func (mr *MockFileInfoMockRecorder) Name() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Name", reflect.TypeOf((*MockFileInfo)(nil).Name)) +} + +// Size mocks base method. +func (m *MockFileInfo) Size() int64 { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Size") + ret0, _ := ret[0].(int64) + return ret0 +} + +// Size indicates an expected call of Size. +func (mr *MockFileInfoMockRecorder) Size() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockFileInfo)(nil).Size)) +} + +// Sys mocks base method. +func (m *MockFileInfo) Sys() interface{} { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Sys") + ret0, _ := ret[0].(interface{}) + return ret0 +} + +// Sys indicates an expected call of Sys. +func (mr *MockFileInfoMockRecorder) Sys() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Sys", reflect.TypeOf((*MockFileInfo)(nil).Sys)) +} diff --git a/util/mocks/mock_filesystem.go b/util/mocks/mock_filesystem.go new file mode 100644 index 0000000..e25082c --- /dev/null +++ b/util/mocks/mock_filesystem.go @@ -0,0 +1,80 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/0chain/s3migration/util (interfaces: FileSystem) + +// Package mock_util is a generated GoMock package. +package mock_util + +import ( + fs "io/fs" + reflect "reflect" + + util "github.com/0chain/s3migration/util" + gomock "github.com/golang/mock/gomock" +) + +// MockFileSystem is a mock of FileSystem interface. +type MockFileSystem struct { + ctrl *gomock.Controller + recorder *MockFileSystemMockRecorder +} + +// MockFileSystemMockRecorder is the mock recorder for MockFileSystem. +type MockFileSystemMockRecorder struct { + mock *MockFileSystem +} + +// NewMockFileSystem creates a new mock instance. +func NewMockFileSystem(ctrl *gomock.Controller) *MockFileSystem { + mock := &MockFileSystem{ctrl: ctrl} + mock.recorder = &MockFileSystemMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFileSystem) EXPECT() *MockFileSystemMockRecorder { + return m.recorder +} + +// Open mocks base method. +func (m *MockFileSystem) Open(arg0 string) (util.File, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Open", arg0) + ret0, _ := ret[0].(util.File) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Open indicates an expected call of Open. +func (mr *MockFileSystemMockRecorder) Open(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Open", reflect.TypeOf((*MockFileSystem)(nil).Open), arg0) +} + +// Remove mocks base method. +func (m *MockFileSystem) Remove(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Remove", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Remove indicates an expected call of Remove. +func (mr *MockFileSystemMockRecorder) Remove(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Remove", reflect.TypeOf((*MockFileSystem)(nil).Remove), arg0) +} + +// Stat mocks base method. +func (m *MockFileSystem) Stat(arg0 string) (fs.FileInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Stat", arg0) + ret0, _ := ret[0].(fs.FileInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Stat indicates an expected call of Stat. +func (mr *MockFileSystemMockRecorder) Stat(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stat", reflect.TypeOf((*MockFileSystem)(nil).Stat), arg0) +} diff --git a/util/util.go b/util/util.go index 150a7d8..73fcc73 100644 --- a/util/util.go +++ b/util/util.go @@ -99,7 +99,7 @@ func GetAwsCredentialsFromEnv() (string, string) { return os.Getenv("AWS_ACCESS_KEY"), os.Getenv("AWS_SECRET_KEY") } -// readLines reads a whole file into memory +// readLines reads a whole File into memory // and returns a slice of its lines. func readLines(path string) ([]string, error) { file, err := os.Open(path) From 595666bc9d57fa94e3ef01f25e9ad75fb73afa76 Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Fri, 28 Jan 2022 20:40:00 +0530 Subject: [PATCH 05/11] commit file --- util/util.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/util.go b/util/util.go index 73fcc73..150a7d8 100644 --- a/util/util.go +++ b/util/util.go @@ -99,7 +99,7 @@ func GetAwsCredentialsFromEnv() (string, string) { return os.Getenv("AWS_ACCESS_KEY"), os.Getenv("AWS_SECRET_KEY") } -// readLines reads a whole File into memory +// readLines reads a whole file into memory // and returns a slice of its lines. func readLines(path string) ([]string, error) { file, err := os.Open(path) From 1a761ef7b84c6fdc2bb083078998658a5f39b2ce Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Wed, 2 Feb 2022 00:28:11 +0530 Subject: [PATCH 06/11] modify code --- migration/migrate.go | 18 +++++++++--------- {util => migration}/migration_worker.go | 2 +- 2 files changed, 10 insertions(+), 10 deletions(-) rename {util => migration}/migration_worker.go (99%) diff --git a/migration/migrate.go b/migration/migrate.go index 7f377da..ae95051 100644 --- a/migration/migrate.go +++ b/migration/migrate.go @@ -172,7 +172,7 @@ func StartMigration() error { zlogger.Logger.Info("time taken: ", time.Now().Sub(start)) }(time.Now()) - migrationWorker := util.NewMigrationWorker() + migrationWorker := NewMigrationWorker() go migration.DownloadWorker(rootContext, migrationWorker) go migration.UploadWorker(rootContext, migrationWorker) migration.UpdateStateFile(migrationWorker) @@ -185,7 +185,7 @@ func StartMigration() error { return err } -func (m *Migration) DownloadWorker(ctx context.Context, migrator *util.MigrationWorker) { +func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorker) { defer migrator.CloseDownloadQueue() objCh, errCh := migration.awsStore.ListFilesInBucket(rootContext) wg := &sync.WaitGroup{} @@ -196,7 +196,7 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *util.Migration } wg.Add(1) - downloadObjMeta := &util.DownloadObjectMeta{ + downloadObjMeta := &DownloadObjectMeta{ ObjectKey: obj.Key, Size: obj.Size, DoneChan: make(chan struct{}, 1), @@ -232,14 +232,14 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *util.Migration } } -func (m *Migration) UploadWorker(ctx context.Context, migrator *util.MigrationWorker) { +func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker) { defer migrator.CloseUploadQueue() downloadQueue := migrator.GetDownloadQueue() wg := &sync.WaitGroup{} for d := range downloadQueue { migrator.PauseUpload() downloadObj := d - uploadObj := &util.UploadObjectMeta{ + uploadObj := &UploadObjectMeta{ ObjectKey: downloadObj.ObjectKey, DoneChan: make(chan struct{}, 1), ErrChan: make(chan error, 1), @@ -274,7 +274,7 @@ func getRemotePath(objectKey string) string { return filepath.Join(migration.migrateTo, migration.bucket, objectKey) } -func checkIsFileExist(ctx context.Context, downloadObj *util.DownloadObjectMeta) error { +func checkIsFileExist(ctx context.Context, downloadObj *DownloadObjectMeta) error { remotePath := getRemotePath(downloadObj.ObjectKey) var isFileExist bool @@ -294,7 +294,7 @@ func checkIsFileExist(ctx context.Context, downloadObj *util.DownloadObjectMeta) return nil } -func checkDownloadStatus(downloadObj *util.DownloadObjectMeta) error { +func checkDownloadStatus(downloadObj *DownloadObjectMeta) error { select { case <-downloadObj.DoneChan: return nil @@ -303,7 +303,7 @@ func checkDownloadStatus(downloadObj *util.DownloadObjectMeta) error { } } -func processUpload(ctx context.Context, downloadObj *util.DownloadObjectMeta) error { +func processUpload(ctx context.Context, downloadObj *DownloadObjectMeta) error { remotePath := getRemotePath(downloadObj.ObjectKey) fileObj, err := migration.fs.Open(downloadObj.LocalPath) @@ -350,7 +350,7 @@ func processUpload(ctx context.Context, downloadObj *util.DownloadObjectMeta) er } } -func (m *Migration) UpdateStateFile(migrateHandler *util.MigrationWorker) { +func (m *Migration) UpdateStateFile(migrateHandler *MigrationWorker) { updateState, closeStateFile, err := updateStateKeyFunc(migration.stateFilePath) if err != nil { migrateHandler.SetMigrationError(err) diff --git a/util/migration_worker.go b/migration/migration_worker.go similarity index 99% rename from util/migration_worker.go rename to migration/migration_worker.go index 0372515..7398759 100644 --- a/util/migration_worker.go +++ b/migration/migration_worker.go @@ -1,4 +1,4 @@ -package util +package migration import ( "sync" From 23cdc9ba07ae880fae922d712242bc8b8907d4a0 Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Wed, 2 Feb 2022 01:06:47 +0530 Subject: [PATCH 07/11] golint --- cmd/migration_cmd.go | 2 +- dstorage/dstorage.go | 17 ----------------- migration/migrate.go | 31 +++++++++++++++---------------- migration/migration_test.go | 1 - s3/aws.go | 14 -------------- util/util.go | 18 ------------------ 6 files changed, 16 insertions(+), 67 deletions(-) diff --git a/cmd/migration_cmd.go b/cmd/migration_cmd.go index 627e127..80ac647 100644 --- a/cmd/migration_cmd.go +++ b/cmd/migration_cmd.go @@ -79,7 +79,7 @@ var migrateCmd = &cobra.Command{ Note: Addition of new object or modification of existing file while migrating is not recommended, as it cannot track such changes and you might loose your data. `, RunE: func(cmd *cobra.Command, args []string) error { - cmd.Flags().Parse(args) + _ = cmd.Flags().Parse(args) zlogger.Logger.Info("S3 migration started") var err error if allocationId == "" { diff --git a/dstorage/dstorage.go b/dstorage/dstorage.go index 08f2c56..e051ba1 100644 --- a/dstorage/dstorage.go +++ b/dstorage/dstorage.go @@ -95,23 +95,6 @@ func (d *DStorageService) GetFileMetaData(ctx context.Context, remotePath string return &oResult.Refs[0], nil } -func getChunkSizeNew(size int64, dataShards int) (chunkSize int64) { - - var chunkNum int64 = 1 - for { - chunkSize = (size + int64(dataShards)*chunkNum - 1) / (int64(dataShards) * chunkNum) //equivalent to math.ceil - if chunkSize <= MaxChunkSize { - break - } - chunkNum++ - } - - if chunkSize < MinChunkSize { - chunkSize = MinChunkSize - } - return -} - func getChunkSize(size int64) int64 { var chunkSize int64 switch { diff --git a/migration/migrate.go b/migration/migrate.go index ae95051..c1a5edf 100644 --- a/migration/migrate.go +++ b/migration/migrate.go @@ -26,7 +26,6 @@ const ( ) var migration Migration -var isMigrationInitialized bool //Use context for all requests. var rootContext context.Context @@ -121,8 +120,6 @@ func InitMigration(mConfig *MigrationConfig) error { abandonAllOperations(zerror.ErrOperationCancelledByUser) }() - isMigrationInitialized = true - return nil } @@ -169,7 +166,7 @@ var updateStateKeyFunc = func(statePath string) (func(stateKey string), func(), func StartMigration() error { defer func(start time.Time) { - zlogger.Logger.Info("time taken: ", time.Now().Sub(start)) + zlogger.Logger.Info("time taken: ", time.Since(start)) }(time.Now()) migrationWorker := NewMigrationWorker() @@ -224,12 +221,11 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke time.Sleep(1 * time.Second) } wg.Wait() - select { - case err := <-errCh: - if err != nil { - migrator.SetMigrationError(err) - } + err := <-errCh + if err != nil { + migrator.SetMigrationError(err) } + } func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker) { @@ -253,12 +249,13 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker) migrator.SetMigrationError(err) return } - defer m.fs.Remove(downloadObj.LocalPath) + defer func() { + _ = m.fs.Remove(downloadObj.LocalPath) + }() migrator.UploadStart(uploadObj) zlogger.Logger.Info("upload start", uploadObj.ObjectKey, uploadObj.Size) err = util.Retry(3, time.Second*5, func() error { - var err error - err = processUpload(ctx, downloadObj) + err := processUpload(ctx, downloadObj) return err }) migrator.UploadDone(uploadObj, err) @@ -278,8 +275,7 @@ func checkIsFileExist(ctx context.Context, downloadObj *DownloadObjectMeta) erro remotePath := getRemotePath(downloadObj.ObjectKey) var isFileExist bool - var err error - err = util.Retry(3, time.Second*5, func() error { + err := util.Retry(3, time.Second*5, func() error { var err error isFileExist, err = migration.zStore.IsFileExist(ctx, remotePath) return err @@ -315,6 +311,10 @@ func processUpload(ctx context.Context, downloadObj *DownloadObjectMeta) error { defer fileObj.Close() fileInfo, err := fileObj.Stat() + if err != nil { + zlogger.Logger.Error(err) + return err + } mimeType, err := zboxutil.GetFileContentType(fileObj) if err != nil { zlogger.Logger.Error(err) @@ -340,7 +340,7 @@ func processUpload(ctx context.Context, downloadObj *DownloadObjectMeta) error { return err } else { if migration.deleteSource { - migration.awsStore.DeleteFile(ctx, downloadObj.ObjectKey) + _ = migration.awsStore.DeleteFile(ctx, downloadObj.ObjectKey) } migration.szCtMu.Lock() migration.migratedSize += uint64(downloadObj.Size) @@ -366,5 +366,4 @@ func (m *Migration) UpdateStateFile(migrateHandler *MigrationWorker) { return } } - return } diff --git a/migration/migration_test.go b/migration/migration_test.go index 02845d4..ea14e09 100644 --- a/migration/migration_test.go +++ b/migration/migration_test.go @@ -27,7 +27,6 @@ func TestMigrate(t *testing.T) { fs: fileSystem, } - isMigrationInitialized = true tests := []struct { name string setUpMock func() diff --git a/s3/aws.go b/s3/aws.go index f35b27f..dbf0aed 100644 --- a/s3/aws.go +++ b/s3/aws.go @@ -50,20 +50,6 @@ type AwsClient struct { } func GetAwsClient(bucket, prefix, region string, deleteSource bool, newerThan, olderThan *time.Time, startAfter, workDir string) (*AwsClient, error) { - //Get a client; if error return error else return aws client - //buckets comes as slice of array([bucketname, prefix]). Find location and put all of them - //in buckets field. If bucket is nil; then list all buckets from s3 and update the buckets field - // For example - // for _, bkt := range buckets{ - // bucketName := bkt[0] - // prefix := bkt[1] - // location := "abc" // get from client - // awsClient.buckets = append(awsClient.buckets, bucket{ - // Name: bucketName, - // Prefix: prefix, - // Location: location, - // }) - // } if region == "" { region = "us-east-1" diff --git a/util/util.go b/util/util.go index 150a7d8..8c6ac03 100644 --- a/util/util.go +++ b/util/util.go @@ -1,7 +1,6 @@ package util import ( - "bufio" "fmt" "io/ioutil" "log" @@ -99,23 +98,6 @@ func GetAwsCredentialsFromEnv() (string, string) { return os.Getenv("AWS_ACCESS_KEY"), os.Getenv("AWS_SECRET_KEY") } -// readLines reads a whole file into memory -// and returns a slice of its lines. -func readLines(path string) ([]string, error) { - file, err := os.Open(path) - if err != nil { - return nil, err - } - defer file.Close() - - var lines []string - scanner := bufio.NewScanner(file) - for scanner.Scan() { - lines = append(lines, scanner.Text()) - } - return lines, scanner.Err() -} - func ConvertGoSDKTimeToTime(in string) time.Time { t, err := time.Parse(ZGoSDKTimeFormat, in) if err != nil { From 0e62089af69e72d60bd56db6615556e4bb8beaee Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Wed, 2 Feb 2022 10:20:14 +0530 Subject: [PATCH 08/11] upgrade gosdk version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 10d45f7..977dba9 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.16 require ( github.com/0chain/errors v1.0.3 - github.com/0chain/gosdk v1.4.1-0.20220105140556-1e37d2ba4e8a + github.com/0chain/gosdk v1.5.0 github.com/aws/aws-sdk-go-v2 v1.13.0 github.com/aws/aws-sdk-go-v2/config v1.13.0 github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.9.0 diff --git a/go.sum b/go.sum index 52600c9..7329b82 100644 --- a/go.sum +++ b/go.sum @@ -49,8 +49,8 @@ collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/0chain/errors v1.0.3 h1:QQZPFxTfnMcRdt32DXbzRQIfGWmBsKoEdszKQDb0rRM= github.com/0chain/errors v1.0.3/go.mod h1:xymD6nVgrbgttWwkpSCfLLEJbFO6iHGQwk/yeSuYkIc= -github.com/0chain/gosdk v1.4.1-0.20220105140556-1e37d2ba4e8a h1:VYZNUfUTdEB3PLT81bkPWBkRNklUymjdUawAYxhWgyw= -github.com/0chain/gosdk v1.4.1-0.20220105140556-1e37d2ba4e8a/go.mod h1:FB2xXhQyIM1vwvQ1jC98wNclbDTBwqrG+Z/IQC0LaBs= +github.com/0chain/gosdk v1.5.0 h1:vC+eWehOBXaWyLlzVOZnmles4pc3K9kBFCXTyxThbu8= +github.com/0chain/gosdk v1.5.0/go.mod h1:FB2xXhQyIM1vwvQ1jC98wNclbDTBwqrG+Z/IQC0LaBs= github.com/Azure/azure-pipeline-go v0.2.1/go.mod h1:UGSo8XybXnIGZ3epmeBw7Jdz+HiUVpqIlpz/HKHylF4= github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-storage-blob-go v0.7.0/go.mod h1:f9YQKtsG1nMisotuTPpO0tjNuEjKRYAcJU8/ydDI++4= From 29c2bba107f74809dd31a409f48c6fac9350d293 Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Wed, 2 Feb 2022 23:39:25 +0530 Subject: [PATCH 09/11] remove concurrency check --- cmd/migration_cmd.go | 3 --- migration/migrate.go | 4 ---- migration/migrateConfig.go | 1 - 3 files changed, 8 deletions(-) diff --git a/cmd/migration_cmd.go b/cmd/migration_cmd.go index 80ac647..91a4577 100644 --- a/cmd/migration_cmd.go +++ b/cmd/migration_cmd.go @@ -26,7 +26,6 @@ var ( region string migrateToPath string duplicateSuffix string - concurrency int encrypt bool resume bool skip int // 0 --> Replace; 1 --> Skip; 2 --> Duplicate @@ -59,7 +58,6 @@ func init() { migrateCmd.Flags().BoolVar(&deleteSource, "delete-source", false, "Delete object in s3 that is migrated to dStorage") migrateCmd.Flags().StringVar(&awsCredPath, "aws-cred-path", "", "File Path to aws credentials") migrateCmd.Flags().StringVar(&workDir, "wd", filepath.Join(util.GetHomeDir(), ".s3migration"), "Working directory") - migrateCmd.Flags().IntVar(&concurrency, "concurrency", 10, "number of concurrent files to process concurrently during migration") migrateCmd.Flags().BoolVar(&resume, "resume", false, "pass this option to resume migration from previous state") migrateCmd.Flags().IntVar(&skip, "skip", 1, "0 --> Replace existing files; 1 --> Skip migration; 2 --> Duplicate") migrateCmd.Flags().IntVar(&retryCount, "retry", 3, "retry count for upload to dstorage") @@ -186,7 +184,6 @@ var migrateCmd = &cobra.Command{ AllocationID: allocationId, Region: region, Skip: skip, - Concurrency: concurrency, Bucket: bucket, Prefix: prefix, MigrateToPath: migrateToPath, diff --git a/migration/migrate.go b/migration/migrate.go index c1a5edf..b6003e4 100644 --- a/migration/migrate.go +++ b/migration/migrate.go @@ -50,9 +50,6 @@ type Migration struct { skip int retryCount int - //Number of goroutines to run. So at most concurrency * Batch goroutines will run. i.e. for bucket level and object level - concurrency int - szCtMu sync.Mutex //size and count mutex; used to update migratedSize and totalMigratedObjects migratedSize uint64 totalMigratedObjects uint64 @@ -100,7 +97,6 @@ func InitMigration(mConfig *MigrationConfig) error { zStore: dStorageService, awsStore: awsStorageService, skip: mConfig.Skip, - concurrency: mConfig.Concurrency, retryCount: mConfig.RetryCount, stateFilePath: mConfig.StateFilePath, migrateTo: mConfig.MigrateToPath, diff --git a/migration/migrateConfig.go b/migration/migrateConfig.go index a206e16..49b99ac 100644 --- a/migration/migrateConfig.go +++ b/migration/migrateConfig.go @@ -5,7 +5,6 @@ import "time" type MigrationConfig struct { AllocationID string Skip int - Concurrency int Bucket string Region string Prefix string From 0d5548c3d8354327ac9adde75783e91a268aa058 Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Thu, 3 Feb 2022 00:20:25 +0530 Subject: [PATCH 10/11] changes to make queue sync --- migration/migrate.go | 8 ++++++++ migration/migration_worker.go | 10 ++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/migration/migrate.go b/migration/migrate.go index b6003e4..a8d3ae8 100644 --- a/migration/migrate.go +++ b/migration/migrate.go @@ -195,6 +195,7 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke DoneChan: make(chan struct{}, 1), ErrChan: make(chan error, 1), } + migrator.PushToDownloadQueue(downloadObjMeta) go func() { defer wg.Done() @@ -205,6 +206,7 @@ func (m *Migration) DownloadWorker(ctx context.Context, migrator *MigrationWorke } if downloadObjMeta.IsFileAlreadyExist && migration.skip == Skip { zlogger.Logger.Info("Skipping migration of object" + downloadObjMeta.ObjectKey) + downloadObjMeta.DoneChan <- struct{}{} return } migrator.DownloadStart(downloadObjMeta) @@ -237,6 +239,8 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker) ErrChan: make(chan error, 1), Size: downloadObj.Size, } + migrator.PushToUploadQueue(uploadObj) + wg.Add(1) go func() { defer wg.Done() @@ -245,6 +249,10 @@ func (m *Migration) UploadWorker(ctx context.Context, migrator *MigrationWorker) migrator.SetMigrationError(err) return } + if downloadObj.LocalPath == "" { + uploadObj.DoneChan <- struct{}{} + return + } defer func() { _ = m.fs.Remove(downloadObj.LocalPath) }() diff --git a/migration/migration_worker.go b/migration/migration_worker.go index 7398759..4edd72e 100644 --- a/migration/migration_worker.go +++ b/migration/migration_worker.go @@ -84,10 +84,13 @@ func (m *MigrationWorker) PauseUpload() { } } +func (m *MigrationWorker) PushToUploadQueue(u *UploadObjectMeta) { + m.uploadQueue <- u +} + func (m *MigrationWorker) UploadStart(u *UploadObjectMeta) { m.incrUploadConcurrency() atomic.AddInt64(&m.currentUploadSize, u.Size) - m.uploadQueue <- u } func (m *MigrationWorker) UploadDone(u *UploadObjectMeta, err error) { @@ -126,9 +129,12 @@ func (m *MigrationWorker) PauseDownload() { } } +func (m *MigrationWorker) PushToDownloadQueue(d *DownloadObjectMeta) { + m.downloadQueue <- d +} + func (m *MigrationWorker) DownloadStart(d *DownloadObjectMeta) { m.incrDownloadConcurrency() - m.downloadQueue <- d m.updateFileSizeOnDisk(d.Size) atomic.AddInt64(&m.currentDownloadSize, d.Size) } From a51e6da39305f9bb0148741da4400d93cfe5f8ce Mon Sep 17 00:00:00 2001 From: Md Miran Ahmed Ansari Date: Thu, 3 Feb 2022 01:04:46 +0530 Subject: [PATCH 11/11] zlogger error --- migration/migration_worker.go | 3 ++- util/util.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/migration/migration_worker.go b/migration/migration_worker.go index 4edd72e..6078477 100644 --- a/migration/migration_worker.go +++ b/migration/migration_worker.go @@ -99,8 +99,9 @@ func (m *MigrationWorker) UploadDone(u *UploadObjectMeta, err error) { atomic.AddInt64(&m.currentUploadSize, -u.Size) if err != nil { u.ErrChan <- err + } else { + u.DoneChan <- struct{}{} } - u.DoneChan <- struct{}{} } func (m *MigrationWorker) CloseUploadQueue() { diff --git a/util/util.go b/util/util.go index 8c6ac03..7c3a5ee 100644 --- a/util/util.go +++ b/util/util.go @@ -10,6 +10,7 @@ import ( "strings" "time" + zlogger "github.com/0chain/s3migration/logger" "github.com/mitchellh/go-homedir" "github.com/spf13/viper" ) @@ -111,7 +112,7 @@ func ConvertGoSDKTimeToTime(in string) time.Time { func Retry(attempts int, sleep time.Duration, f func() error) (err error) { for i := 0; i < attempts; i++ { if err = f(); err != nil { - log.Println("retrying after error:", err) + zlogger.Logger.Error("retrying after error: ", err) time.Sleep(sleep) continue }