diff --git a/azure-pipelines.yml b/azure-pipelines.yml index ddd904622..6ba53ab3f 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -257,10 +257,6 @@ jobs: # acquire the mutex before running live tests to avoid conflicts python ./tool_distributed_mutex.py lock "$(MUTEX_URL)" name: 'Acquire_the_distributed_mutex' - - template: azurePipelineTemplates/run-ut.yml - parameters: - directory: 'azbfs' - coverage_name: 'azbfs' - template: azurePipelineTemplates/run-ut.yml parameters: directory: 'cmd' diff --git a/cmd/syncProcessor.go b/cmd/syncProcessor.go index c76575b0b..3ab751e00 100644 --- a/cmd/syncProcessor.go +++ b/cmd/syncProcessor.go @@ -25,6 +25,7 @@ import ( "encoding/json" "fmt" "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake" @@ -374,7 +375,8 @@ func (b *remoteResourceDeleter) delete(object StoredObject) error { var err error switch b.targetLocation { case common.ELocation.Blob(): - blobURLParts, err := blob.ParseURL(b.rootURL.String()) + var blobURLParts blob.URLParts + blobURLParts, err = blob.ParseURL(b.rootURL.String()) if err != nil { return err } @@ -383,7 +385,8 @@ func (b *remoteResourceDeleter) delete(object StoredObject) error { blobClient := common.CreateBlobClient(blobURLParts.String(), b.credInfo, nil, b.clientOptions) _, err = blobClient.Delete(b.ctx, nil) case common.ELocation.File(): - fileURLParts, err := sharefile.ParseURL(b.rootURL.String()) + var fileURLParts sharefile.URLParts + fileURLParts, err = sharefile.ParseURL(b.rootURL.String()) if err != nil { return err } @@ -412,7 +415,8 @@ func (b *remoteResourceDeleter) delete(object StoredObject) error { } } case common.ELocation.BlobFS(): - datalakeURLParts, err := azdatalake.ParseURL(b.rootURL.String()) + var datalakeURLParts azdatalake.URLParts + datalakeURLParts, err = azdatalake.ParseURL(b.rootURL.String()) if err != nil { return err } @@ -471,9 +475,11 @@ func (b *remoteResourceDeleter) delete(object StoredObject) error { } } case common.ELocation.BlobFS(): - directoryClient := common.CreateDatalakeDirectoryClient(objectURL.String(), b.credInfo, nil, b.clientOptions) - // TODO : Recursive delete - _, err = directoryClient.Delete(ctx, nil) + clientOptions := b.clientOptions + clientOptions.PerCallPolicies = append([]policy.Policy{common.NewRecursivePolicy()}, clientOptions.PerCallPolicies...) 
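+			// The prepended recursive policy rewrites the "recursive" query parameter
+			// only when a value has been stashed on the request context via common.WithRecursive.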
+			directoryClient := common.CreateDatalakeDirectoryClient(objectURL.String(), b.credInfo, nil, clientOptions)
+			recursiveContext := common.WithRecursive(ctx, false)
+			_, err = directoryClient.Delete(recursiveContext, nil)
 		default:
 			panic("not implemented, check your code")
 		}
diff --git a/common/blobFSRecursiveDeletePolicy.go b/common/blobFSRecursiveDeletePolicy.go
new file mode 100644
index 000000000..bded9a2ff
--- /dev/null
+++ b/common/blobFSRecursiveDeletePolicy.go
@@ -0,0 +1,55 @@
+// Copyright © Microsoft
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+package common
+
+import (
+	"context"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
+	"net/http"
+	"strconv"
+)
+
+// CtxRecursiveKey is used as a context key to apply the recursive query parameter.
+type CtxRecursiveKey struct{}
+
+// WithRecursive applies the recursive parameter to the request.
+func WithRecursive(parent context.Context, recursive bool) context.Context {
+	return context.WithValue(parent, CtxRecursiveKey{}, recursive)
+}
+
+type recursivePolicy struct {
+}
+
+// NewRecursivePolicy creates a policy that applies the recursive parameter to the request.
+func NewRecursivePolicy() policy.Policy {
+	return &recursivePolicy{}
+}
+
+func (p *recursivePolicy) Do(req *policy.Request) (*http.Response, error) {
+	if recursive := req.Raw().Context().Value(CtxRecursiveKey{}); recursive != nil {
+		if req.Raw().URL.Query().Has("recursive") {
+			query := req.Raw().URL.Query()
+			query.Set("recursive", strconv.FormatBool(recursive.(bool)))
+			req.Raw().URL.RawQuery = query.Encode()
+		}
+	}
+	return req.Next()
+}
\ No newline at end of file
diff --git a/common/blobFSRecursiveDeletePolicy_test.go b/common/blobFSRecursiveDeletePolicy_test.go
new file mode 100644
index 000000000..843740f94
--- /dev/null
+++ b/common/blobFSRecursiveDeletePolicy_test.go
@@ -0,0 +1,106 @@
+package common
+
+import (
+	"context"
+	"fmt"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime"
+	"github.com/stretchr/testify/assert"
+	"net/http"
+	"testing"
+)
+
+type testRecursive struct {
+	recursive string
+}
+
+func (t testRecursive) Do(req *policy.Request) (*http.Response, error) {
+	if req.Raw().URL.Query().Has("recursive") {
+		if req.Raw().URL.Query().Get("recursive") == t.recursive {
+			return &http.Response{}, nil
+		}
+	}
+	return &http.Response{}, fmt.Errorf("recursive query parameter not found or does not match expected value. expected: %s, actual: %s", t.recursive, req.Raw().URL.Query().Get("recursive"))
+}
+
+func TestRecursivePolicyExpectTrue(t *testing.T) {
+	a := assert.New(t)
+	ctx := WithRecursive(context.Background(), true)
+	policies := []policy.Policy{NewRecursivePolicy(), testRecursive{"true"}}
+	p := runtime.NewPipeline("testmodule", "v0.1.0", runtime.PipelineOptions{}, &policy.ClientOptions{Transport: nil, PerCallPolicies: policies})
+
+	endpoints := []string{"https://xxxx.dfs.core.windows.net/container/path?recursive=true",
+		"https://xxxx.dfs.core.windows.net/container/path?recursive=true&sig=xxxxxx&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&recursive=true&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&snapshot=xxxxx&timeout=xxxx&recursive=true",
+		"https://xxxx.dfs.core.windows.net/container/path?recursive=false",
+		"https://xxxx.dfs.core.windows.net/container/path?recursive=false&sig=xxxxxx&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&recursive=false&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&snapshot=xxxxx&timeout=xxxx&recursive=false",}
+
+	for _, e := range endpoints {
+		req, err := runtime.NewRequest(ctx, "HEAD", e)
+		a.Nil(err)
+		_, err = p.Do(req)
+		a.Nil(err)
+	}
+}
+
+func TestRecursivePolicyExpectFalse(t *testing.T) {
+	a := assert.New(t)
+	ctx := WithRecursive(context.Background(), false)
+	policies := []policy.Policy{NewRecursivePolicy(), testRecursive{"false"}}
+	p := runtime.NewPipeline("testmodule", "v0.1.0", runtime.PipelineOptions{}, &policy.ClientOptions{Transport: nil, PerCallPolicies: policies})
+
+	endpoints := []string{"https://xxxx.dfs.core.windows.net/container/path?recursive=true",
+		"https://xxxx.dfs.core.windows.net/container/path?recursive=true&sig=xxxxxx&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&recursive=true&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&snapshot=xxxxx&timeout=xxxx&recursive=true",
+		"https://xxxx.dfs.core.windows.net/container/path?recursive=false",
+		"https://xxxx.dfs.core.windows.net/container/path?recursive=false&sig=xxxxxx&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&recursive=false&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&snapshot=xxxxx&timeout=xxxx&recursive=false",}
+
+	for _, e := range endpoints {
+		req, err := runtime.NewRequest(ctx, "HEAD", e)
+		a.Nil(err)
+		_, err = p.Do(req)
+		a.Nil(err)
+	}
+}
+
+type testEndpoint struct {
+	endpoint string
+}
+
+func (t testEndpoint) Do(req *policy.Request) (*http.Response, error) {
+	if req.Raw().URL.String() == t.endpoint {
+		return &http.Response{}, nil
+	}
+	return &http.Response{}, fmt.Errorf("request URL does not match expected value. expected: %s, actual: %s", t.endpoint, req.Raw().URL.String())
+}
+
+func TestRecursivePolicyExpectNoChange(t *testing.T) {
+	a := assert.New(t)
+
+	endpoints := []string{"https://xxxx.dfs.core.windows.net/container/path?recursive=true",
+		"https://xxxx.dfs.core.windows.net/container/path?recursive=true&sig=xxxxxx&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&recursive=true&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&snapshot=xxxxx&timeout=xxxx&recursive=true",
+		"https://xxxx.dfs.core.windows.net/container/path?recursive=false",
+		"https://xxxx.dfs.core.windows.net/container/path?recursive=false&sig=xxxxxx&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&recursive=false&snapshot=xxxxx&timeout=xxxx",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&snapshot=xxxxx&timeout=xxxx&recursive=false",
+		"https://xxxx.dfs.core.windows.net/container/path",
+		"https://xxxx.dfs.core.windows.net/container/path?sig=xxxxxx&snapshot=xxxxx&timeout=xxxx",}
+
+	for _, e := range endpoints {
+		policies := []policy.Policy{NewRecursivePolicy(), testEndpoint{e}}
+		p := runtime.NewPipeline("testmodule", "v0.1.0", runtime.PipelineOptions{}, &policy.ClientOptions{Transport: nil, PerCallPolicies: policies})
+		req, err := runtime.NewRequest(context.Background(), "HEAD", e)
+		a.Nil(err)
+		_, err = p.Do(req)
+		a.Nil(err)
+	}
+
+}
diff --git a/common/credentialFactory.go b/common/credentialFactory.go
index 1381bfee8..b7b2ee4e6 100644
--- a/common/credentialFactory.go
+++ b/common/credentialFactory.go
@@ -21,18 +21,14 @@
 package common
 
 import (
+	gcpUtils "cloud.google.com/go/storage"
 	"context"
 	"errors"
 	"fmt"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
-	"math"
 	"sync"
-	"time"
-
-	gcpUtils "cloud.google.com/go/storage"
-	"github.com/Azure/go-autorest/autorest/adal"
 	"github.com/minio/minio-go"
 	"github.com/minio/minio-go/pkg/credentials"
 )
 
@@ -57,20 +53,6 @@ func (o CredentialOpOptions) callerMessage() string {
 	return Iff(o.CallerID == "", o.CallerID, o.CallerID+" ")
 }
 
-// logInfo logs info, if LogInfo is specified in CredentialOpOptions.
-func (o CredentialOpOptions) logInfo(str string) {
-	if o.LogInfo != nil {
-		o.LogInfo(o.callerMessage() + str)
-	}
-}
-
-// logError logs error, if LogError is specified in CredentialOpOptions.
-func (o CredentialOpOptions) logError(str string) {
-	if o.LogError != nil {
-		o.LogError(o.callerMessage() + str)
-	}
-}
-
 // panicError uses built-in panic if no Panic is specified in CredentialOpOptions.
 func (o CredentialOpOptions) panicError(err error) {
 	newErr := fmt.Errorf("%s%v", o.callerMessage(), err)
@@ -81,14 +63,6 @@
 	}
 }
 
-func (o CredentialOpOptions) cancel() {
-	if o.Cancel != nil {
-		o.Cancel()
-	} else {
-		o.panicError(errors.New("cancel the operations"))
-	}
-}
-
 // GetSourceBlobCredential gets the TokenCredential based on the cred info
 func GetSourceBlobCredential(credInfo CredentialInfo, options CredentialOpOptions) (azcore.TokenCredential, error) {
 	if credInfo.CredentialType.IsAzureOAuth() {
@@ -104,32 +78,6 @@
 	return nil, nil
 }
 
-// refreshPolicyHalfOfExpiryWithin is used for calculating next refresh time,
-// it checks how long it will be before the token get expired, and use half of the value as
-// duration to wait.
-func refreshPolicyHalfOfExpiryWithin(token *adal.Token, options CredentialOpOptions) time.Duration { - if token == nil { - // Invalid state, token should not be nil, cancel the operation and stop refresh - options.logError("invalid state, token is nil, cancel will be triggered") - options.cancel() - return time.Duration(math.MaxInt64) - } - - waitDuration := token.Expires().Sub(time.Now().UTC()) / 2 - // In case of refresh flooding - if waitDuration < time.Second { - waitDuration = time.Second - } - - if GlobalTestOAuthInjection.DoTokenRefreshInjection { - waitDuration = GlobalTestOAuthInjection.TokenRefreshDuration - } - - options.logInfo(fmt.Sprintf("next token refresh's wait duration: %v", waitDuration)) - - return waitDuration -} - // CreateS3Credential creates AWS S3 credential according to credential info. func CreateS3Credential(ctx context.Context, credInfo CredentialInfo, options CredentialOpOptions) (*credentials.Credentials, error) { glcm := GetLifecycleMgr() diff --git a/common/genericResourceURLParts.go b/common/genericResourceURLParts.go index cde1a9710..c1b4cab8c 100644 --- a/common/genericResourceURLParts.go +++ b/common/genericResourceURLParts.go @@ -110,8 +110,6 @@ func (g *GenericResourceURLParts) String() string { default: panic(fmt.Sprintf("%s is an invalid location for GenericResourceURLParts", g.location)) } - - return "" } func (g *GenericResourceURLParts) URL() url.URL { diff --git a/ste/mgr-JobPartMgr.go b/ste/mgr-JobPartMgr.go index 881bab3ff..154167f8b 100644 --- a/ste/mgr-JobPartMgr.go +++ b/ste/mgr-JobPartMgr.go @@ -130,19 +130,6 @@ func (d *dialRateLimiter) DialContext(ctx context.Context, network, address stri return d.dialer.DialContext(ctx, network, address) } -// newAzcopyHTTPClientFactory creates a HTTPClientPolicyFactory object that sends HTTP requests to a Go's default http.Client. -func newAzcopyHTTPClientFactory(pipelineHTTPClient *http.Client) pipeline.Factory { - return pipeline.FactoryFunc(func(next pipeline.Policy, po *pipeline.PolicyOptions) pipeline.PolicyFunc { - return func(ctx context.Context, request pipeline.Request) (pipeline.Response, error) { - r, err := pipelineHTTPClient.Do(request.WithContext(ctx)) - if err != nil { - err = pipeline.NewError(err, "HTTP request failed") - } - return pipeline.NewHTTPResponse(r), err - } - }) -} - func NewClientOptions(retry policy.RetryOptions, telemetry policy.TelemetryOptions, transport policy.Transporter, statsAcc *PipelineNetworkStats, log LogOptions, trailingDot *common.TrailingDotOption, from *common.Location) azcore.ClientOptions { // Pipeline will look like // [includeResponsePolicy, newAPIVersionPolicy (ignored), NewTelemetryPolicy, perCall, NewRetryPolicy, perRetry, NewLogPolicy, httpHeaderPolicy, bodyDownloadPolicy] @@ -231,15 +218,8 @@ type jobPartMgr struct { exclusiveDestinationMap *common.ExclusiveStringMap pipeline pipeline.Pipeline // ordered list of Factory objects and an object implementing the HTTPSender interface - // Currently, this only sees use in ADLSG2->ADLSG2 ACL transfers. TODO: Remove it when we can reliably get/set ACLs on blob. 
- secondaryPipeline pipeline.Pipeline sourceProviderPipeline pipeline.Pipeline - // TODO: Ditto - secondarySourceProviderPipeline pipeline.Pipeline - - // used defensively to protect double init - atomicPipelinesInitedIndicator uint32 // numberOfTransfersDone_doNotUse represents the number of transfer of JobPartOrder // which are either completed or failed diff --git a/ste/sender-pageBlobFromURL.go b/ste/sender-pageBlobFromURL.go index 0b846eca0..df11c9ca3 100644 --- a/ste/sender-pageBlobFromURL.go +++ b/ste/sender-pageBlobFromURL.go @@ -22,6 +22,8 @@ package ste import ( "context" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob" @@ -152,6 +154,12 @@ func newPageRangeOptimizer(srcPageBlobClient *pageblob.Client, ctx context.Conte return &pageRangeOptimizer{srcPageBlobClient: srcPageBlobClient, ctx: ctx} } +// withNoRetryForBlob returns a context that contains a marker to say we don't want any retries to happen +// Is only implemented for blob pipelines at present +func withNoRetryForBlob(ctx context.Context) context.Context { + return runtime.WithRetryOptions(ctx, policy.RetryOptions{MaxRetries: 1}) +} + func (p *pageRangeOptimizer) fetchPages() { // don't fetch page blob list if optimizations are not desired, // the lack of page list indicates that there's data everywhere diff --git a/ste/xfer-deleteBlobFS.go b/ste/xfer-deleteBlobFS.go index c57163fa6..835f5b64d 100644 --- a/ste/xfer-deleteBlobFS.go +++ b/ste/xfer-deleteBlobFS.go @@ -2,6 +2,7 @@ package ste import ( "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake" @@ -44,8 +45,7 @@ func doDeleteHNSResource(jptm IJobPartTransferMgr) { panic("sanity check: HNS source URI did not parse.") } - // TODO : Recursive delete - //recursive := info.BlobFSRecursiveDelete + recursive := info.BlobFSRecursiveDelete transferDone := func(err error) { status := common.ETransferStatus.Success() @@ -72,7 +72,9 @@ func doDeleteHNSResource(jptm IJobPartTransferMgr) { } // Check if the source is a file or directory - directoryClient := common.CreateDatalakeDirectoryClient(info.Source, jptm.CredentialInfo(), jptm.CredentialOpOptions(), jptm.ClientOptions()) + clientOptions := jptm.ClientOptions() + clientOptions.PerCallPolicies = append([]policy.Policy{common.NewRecursivePolicy()}, clientOptions.PerCallPolicies...) 
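+	// The prepended recursive policy lets common.WithRecursive (used below for the
+	// directory delete) control the "recursive" query parameter on the wire.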
+ directoryClient := common.CreateDatalakeDirectoryClient(info.Source, jptm.CredentialInfo(), jptm.CredentialOpOptions(), clientOptions) var respFromCtx *http.Response ctxWithResp := runtime.WithCaptureResponse(ctx, &respFromCtx) _, err = directoryClient.GetProperties(ctxWithResp, nil) @@ -88,8 +90,9 @@ func doDeleteHNSResource(jptm IJobPartTransferMgr) { _, err := fileClient.Delete(ctx, nil) transferDone(err) } else { - // TODO : Recursive delete - _, err := directoryClient.Delete(ctx, nil) + // Remove the directory + recursiveContext := common.WithRecursive(ctx, recursive) + _, err := directoryClient.Delete(recursiveContext, nil) transferDone(err) } } \ No newline at end of file diff --git a/ste/xferRetrypolicy.go b/ste/xferRetrypolicy.go deleted file mode 100644 index 5ee3c4391..000000000 --- a/ste/xferRetrypolicy.go +++ /dev/null @@ -1,137 +0,0 @@ -package ste - -import ( - "context" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" - "math/rand" - "time" -) - -// XferRetryPolicy tells the pipeline what kind of retry policy to use. See the XferRetryPolicy* constants. -// Added a new retry policy and not using the existing policy zc_retry_policy.go since there are some changes -// in the retry policy. -// Retry on all the type of network errors instead of retrying only in case of temporary or timeout errors. -type XferRetryPolicy int32 - -const ( - // RetryPolicyExponential tells the pipeline to use an exponential back-off retry policy - RetryPolicyExponential XferRetryPolicy = 0 - - // RetryPolicyFixed tells the pipeline to use a fixed back-off retry policy - RetryPolicyFixed XferRetryPolicy = 1 -) - -// XferRetryOptions configures the retry policy's behavior. -type XferRetryOptions struct { - // Policy tells the pipeline what kind of retry policy to use. See the XferRetryPolicy* constants.\ - // A value of zero means that you accept our default policy. - Policy XferRetryPolicy - - // MaxTries specifies the maximum number of attempts an operation will be tried before producing an error (0=default). - // A value of zero means that you accept our default policy. A value of 1 means 1 try and no retries. - MaxTries int32 - - // TryTimeout indicates the maximum time allowed for any single try of an HTTP request. - // A value of zero means that you accept our default timeout. NOTE: When transferring large amounts - // of data, the default TryTimeout will probably not be sufficient. You should override this value - // based on the bandwidth available to the host machine and proximity to the Storage service. A good - // starting point may be something like (60 seconds per MB of anticipated-payload-size). - TryTimeout time.Duration - - // RetryDelay specifies the amount of delay to use before retrying an operation (0=default). - // The delay increases (exponentially or linearly) with each retry up to a maximum specified by - // MaxRetryDelay. If you specify 0, then you must also specify 0 for MaxRetryDelay. - RetryDelay time.Duration - - // MaxRetryDelay specifies the maximum delay allowed before retrying an operation (0=default). - // If you specify 0, then you must also specify 0 for RetryDelay. - MaxRetryDelay time.Duration - - // RetryReadsFromSecondaryHost specifies whether the retry policy should retry a read operation against another host. - // If RetryReadsFromSecondaryHost is "" (the default) then operations are not retried against another host. 
-	// NOTE: Before setting this field, make sure you understand the issues around reading stale & potentially-inconsistent
-	// data at this webpage: https://docs.microsoft.com/en-us/azure/storage/common/storage-designing-ha-apps-with-ragrs
-	RetryReadsFromSecondaryHost string // Comment this our for non-Blob SDKs
-}
-
-func (o XferRetryOptions) retryReadsFromSecondaryHost() string {
-	return o.RetryReadsFromSecondaryHost // This is for the Blob SDK only
-	//return "" // This is for non-blob SDKs
-}
-
-func (o XferRetryOptions) defaults() XferRetryOptions {
-	if o.Policy != RetryPolicyExponential && o.Policy != RetryPolicyFixed {
-		panic("XferRetryPolicy must be RetryPolicyExponential or RetryPolicyFixed")
-	}
-	if o.MaxTries < 0 {
-		panic("MaxTries must be >= 0")
-	}
-	if o.TryTimeout < 0 || o.RetryDelay < 0 || o.MaxRetryDelay < 0 {
-		panic("TryTimeout, RetryDelay, and MaxRetryDelay must all be >= 0")
-	}
-	if o.RetryDelay > o.MaxRetryDelay {
-		panic("RetryDelay must be <= MaxRetryDelay")
-	}
-	if (o.RetryDelay == 0 && o.MaxRetryDelay != 0) || (o.RetryDelay != 0 && o.MaxRetryDelay == 0) {
-		panic("Both RetryDelay and MaxRetryDelay must be 0 or neither can be 0")
-	}
-
-	IfDefault := func(current *time.Duration, desired time.Duration) {
-		if *current == time.Duration(0) {
-			*current = desired
-		}
-	}
-
-	// Set defaults if unspecified
-	if o.MaxTries == 0 {
-		o.MaxTries = 4
-	}
-	switch o.Policy {
-	case RetryPolicyExponential:
-		IfDefault(&o.TryTimeout, 1*time.Minute)
-		IfDefault(&o.RetryDelay, 4*time.Second)
-		IfDefault(&o.MaxRetryDelay, 120*time.Second)
-
-	case RetryPolicyFixed:
-		IfDefault(&o.TryTimeout, 1*time.Minute)
-		IfDefault(&o.RetryDelay, 30*time.Second)
-		IfDefault(&o.MaxRetryDelay, 120*time.Second)
-	}
-	return o
-}
-
-func (o XferRetryOptions) calcDelay(try int32) time.Duration { // try is >=1; never 0
-	pow := func(number int64, exponent int32) int64 { // pow is nested helper function
-		var result int64 = 1
-		for n := int32(0); n < exponent; n++ {
-			result *= number
-		}
-		return result
-	}
-
-	delay := time.Duration(0)
-	switch o.Policy {
-	case RetryPolicyExponential:
-		delay = time.Duration(pow(2, try-1)-1) * o.RetryDelay
-
-	case RetryPolicyFixed:
-		if try > 1 { // Any try after the 1st uses the fixed delay
-			delay = o.RetryDelay
-		}
-	}
-
-	// Introduce some jitter: [0.0, 1.0) / 2 = [0.0, 0.5) + 0.8 = [0.8, 1.3)
-	// For casts and rounding - be careful, as per https://github.com/golang/go/issues/20757
-	delay = time.Duration(float32(delay) * (rand.Float32()/2 + 0.8)) // NOTE: We want math/rand; not crypto/rand
-	if delay > o.MaxRetryDelay {
-		delay = o.MaxRetryDelay
-	}
-	return delay
-}
-
-// withNoRetryForBlob returns a context that contains a marker to say we don't want any retries to happen
-// Is only implemented for blob pipelines at present
-func withNoRetryForBlob(ctx context.Context) context.Context {
-	return runtime.WithRetryOptions(ctx, policy.RetryOptions{MaxRetries: 1})
-}
diff --git a/testSuite/cmd/testblobFS.go b/testSuite/cmd/testblobFS.go
index 6b43634f1..8df70cd68 100644
--- a/testSuite/cmd/testblobFS.go
+++ b/testSuite/cmd/testblobFS.go
@@ -88,7 +88,8 @@ func (tbfsc TestBlobFSCommand) verifyRemoteFile() {
 	if datalakeURLParts.SAS.Encode() != "" {
 		fc, err = file.NewClientWithNoCredential(datalakeURLParts.String(), nil)
 	} else {
-		cred, err := azdatalake.NewSharedKeyCredential(name, key)
+		var cred *azdatalake.SharedKeyCredential
+		cred, err = azdatalake.NewSharedKeyCredential(name, key)
 		if err != nil {
 			fmt.Printf("error creating shared key cred. failed with error %s\n", err.Error())
 			os.Exit(1)
@@ -170,6 +171,10 @@ func (tbfsc TestBlobFSCommand) verifyRemoteDir() {
 		fmt.Println("error parsing the datalake sas ", tbfsc.Subject)
 		os.Exit(1)
 	}
+	// break the remote Url into parts
+	// and save the directory path
+	currentDirectoryPath := datalakeURLParts.PathName
+	datalakeURLParts.PathName = ""
 
 	// Get the Account Name and Key variables from environment
 	name := os.Getenv("ACCOUNT_NAME")
@@ -183,7 +188,8 @@
 	if datalakeURLParts.SAS.Encode() != "" {
 		fsc, err = filesystem.NewClientWithNoCredential(datalakeURLParts.String(), nil)
 	} else {
-		cred, err := azdatalake.NewSharedKeyCredential(name, key)
+		var cred *azdatalake.SharedKeyCredential
+		cred, err = azdatalake.NewSharedKeyCredential(name, key)
 		if err != nil {
 			fmt.Printf("error creating shared key cred. failed with error %s\n", err.Error())
 			os.Exit(1)
@@ -206,9 +212,6 @@
 		fmt.Printf("the source provided %s is not a directory path\n", tbfsc.Object)
 		os.Exit(1)
 	}
-	// break the remote Url into parts
-	// and save the directory path
-	currentDirectoryPath := datalakeURLParts.PathName
 	// List the directory
 	pager := fsc.NewListPathsPager(true, &filesystem.ListPathsOptions{Prefix: &currentDirectoryPath})