Migrated code in ste (#2357)

gapra-msft authored Sep 5, 2023
1 parent 78856cc commit 3f7e3e6
Showing 23 changed files with 194 additions and 841 deletions.
1 change: 1 addition & 0 deletions cmd/syncProcessor.go
@@ -473,6 +473,7 @@ func (b *remoteResourceDeleter) delete(object StoredObject) error {
 		}
 	case common.ELocation.BlobFS():
 		directoryClient := common.CreateDatalakeDirectoryClient(objectURL.String(), b.credInfo, nil, b.clientOptions)
+		// TODO : Recursive delete
 		_, err = directoryClient.Delete(ctx, nil)
 	default:
 		panic("not implemented, check your code")
18 changes: 9 additions & 9 deletions common/fe-ste-models.go
@@ -27,8 +27,8 @@ import (
 	"fmt"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
+	datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
 	sharefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
-	"github.com/Azure/azure-storage-azcopy/v10/azbfs"
 	"math"
 	"os"
 	"reflect"
@@ -1336,14 +1336,14 @@ func (h ResourceHTTPHeaders) ToFileHTTPHeaders() sharefile.HTTPHeaders {
 }
 
 // ToBlobFSHTTPHeaders converts ResourceHTTPHeaders to BlobFS Headers.
-func (h ResourceHTTPHeaders) ToBlobFSHTTPHeaders() azbfs.BlobFSHTTPHeaders {
-	return azbfs.BlobFSHTTPHeaders{
-		ContentType: h.ContentType,
-		// ContentMD5 isn't in these headers. ContentMD5 is handled separately for BlobFS
-		ContentEncoding:    h.ContentEncoding,
-		ContentLanguage:    h.ContentLanguage,
-		ContentDisposition: h.ContentDisposition,
-		CacheControl:       h.CacheControl,
+func (h ResourceHTTPHeaders) ToBlobFSHTTPHeaders() datalakefile.HTTPHeaders {
+	return datalakefile.HTTPHeaders{
+		ContentType:        &h.ContentType,
+		ContentMD5:         h.ContentMD5,
+		ContentEncoding:    &h.ContentEncoding,
+		ContentLanguage:    &h.ContentLanguage,
+		ContentDisposition: &h.ContentDisposition,
+		CacheControl:       &h.CacheControl,
 	}
 }
 
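A note on the shape change in this hunk: the track-2 `datalakefile.HTTPHeaders` models each header as a `*string` (so "unset" and "empty" stay distinguishable), which is why the migrated converter takes the address of each field, and `ContentMD5` (a byte slice) is now carried in the headers directly instead of being handled separately. A minimal standalone sketch of the same pattern, assuming only the import shown in this diff:

```go
package main

import (
	"fmt"

	datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
)

func main() {
	contentType := "application/octet-stream"
	cacheControl := "no-cache"
	// A nil pointer means "don't send this header"; a pointer to "" sends it empty.
	h := datalakefile.HTTPHeaders{
		ContentType:  &contentType,
		CacheControl: &cacheControl,
	}
	fmt.Println(*h.ContentType, *h.CacheControl)
}
```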
20 changes: 10 additions & 10 deletions common/logger.go
@@ -23,9 +23,9 @@ package common
 import (
 	"fmt"
 	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
+	datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
 	sharefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
 	"log"
-	"net/url"
 	"os"
 	"path"
 	"runtime"
@@ -145,10 +145,10 @@ func (jl jobLogger) Panic(err error) {
 
 const TryEquals string = "Try=" // TODO: refactor so that this can be used by the retry policies too? So that when you search the logs for Try= you are guaranteed to find both types of retry (i.e. request send retries, and body read retries)
 
-func NewV1ReadLogFunc(logger ILogger, fullUrl *url.URL) func(int, error, int64, int64, bool) {
-	redactedUrl := URLStringExtension(fullUrl.String()).RedactSecretQueryParamForLogging()
+func NewBlobReadLogFunc(logger ILogger, fullUrl string) func(int32, error, blob.HTTPRange, bool) {
+	redactedUrl := URLStringExtension(fullUrl).RedactSecretQueryParamForLogging()
 
-	return func(failureCount int, err error, offset int64, count int64, willRetry bool) {
+	return func(failureCount int32, err error, r blob.HTTPRange, willRetry bool) {
 		retryMessage := "Will retry"
 		if !willRetry {
 			retryMessage = "Will NOT retry"
@@ -165,16 +165,16 @@ func NewV1ReadLogFunc(logger ILogger, fullUrl *url.URL) func(int, error, int64,
 
 			retryMessage,
 			err,
-			offset,
-			count,
+			r.Offset,
+			r.Count,
 			redactedUrl))
 	}
 }
 
-func NewBlobReadLogFunc(logger ILogger, fullUrl string) func(int32, error, blob.HTTPRange, bool) {
+func NewFileReadLogFunc(logger ILogger, fullUrl string) func(int32, error, sharefile.HTTPRange, bool) {
 	redactedUrl := URLStringExtension(fullUrl).RedactSecretQueryParamForLogging()
 
-	return func(failureCount int32, err error, r blob.HTTPRange, willRetry bool) {
+	return func(failureCount int32, err error, r sharefile.HTTPRange, willRetry bool) {
 		retryMessage := "Will retry"
 		if !willRetry {
 			retryMessage = "Will NOT retry"
@@ -197,10 +197,10 @@ func NewBlobReadLogFunc(logger ILogger, fullUrl string) func(int32, error, blob.
 	}
 }
 
-func NewFileReadLogFunc(logger ILogger, fullUrl string) func(int32, error, sharefile.HTTPRange, bool) {
+func NewDatalakeReadLogFunc(logger ILogger, fullUrl string) func(int32, error, datalakefile.HTTPRange, bool) {
 	redactedUrl := URLStringExtension(fullUrl).RedactSecretQueryParamForLogging()
 
-	return func(failureCount int32, err error, r sharefile.HTTPRange, willRetry bool) {
+	return func(failureCount int32, err error, r datalakefile.HTTPRange, willRetry bool) {
 		retryMessage := "Will retry"
 		if !willRetry {
 			retryMessage = "Will NOT retry"
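These three factories differ only in the `HTTPRange` type each SDK flavor expects in its retry-reader callback. A self-contained sketch of the shared shape, using a hypothetical `newReadLogFunc` that stands in for the real functions (which log through `ILogger` and redact SAS tokens via `URLStringExtension`):

```go
package main

import (
	"errors"
	"log"

	datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
)

// newReadLogFunc mirrors common.NewDatalakeReadLogFunc: it returns a callback
// matching the SDK's OnFailedRead hook, logging through the standard library
// and taking an already-redacted URL.
func newReadLogFunc(redactedUrl string) func(int32, error, datalakefile.HTTPRange, bool) {
	return func(failureCount int32, err error, r datalakefile.HTTPRange, willRetry bool) {
		retryMessage := "Will retry"
		if !willRetry {
			retryMessage = "Will NOT retry"
		}
		log.Printf("Error reading body (Try=%d, offset=%d, count=%d): %v. %s. URL: %s",
			failureCount, r.Offset, r.Count, err, retryMessage, redactedUrl)
	}
}

func main() {
	onFailedRead := newReadLogFunc("https://account.dfs.core.windows.net/fs/file?sig=REDACTED")
	onFailedRead(1, errors.New("connection reset"), datalakefile.HTTPRange{Offset: 0, Count: 4096}, true)
}
```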
2 changes: 2 additions & 0 deletions go.mod
@@ -28,6 +28,8 @@ require (
 	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c
 )
 
+replace github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake => github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v0.1.0-beta.1.0.20230823184147-c9d65f00ca17
+
 require github.com/stretchr/testify v1.8.1
 
 require (
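The pinned version here is a Go pseudo-version: `v0.1.0-beta.1.0.20230823184147-c9d65f00ca17` encodes the base tag (`v0.1.0-beta.1`), a UTC commit timestamp, and a 12-character commit-hash prefix, so the `replace` directive points the build at an unreleased azdatalake commit rather than the published beta tag. The go.sum entries below change to match.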
4 changes: 2 additions & 2 deletions go.sum
@@ -21,8 +21,8 @@ github.com/Azure/azure-sdk-for-go/sdk/internal v1.3.0/go.mod h1:okt5dMMTOFjX/aov
 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0 h1:Ma67P/GGprNwsslzEH6+Kb8nybI8jpDTm4Wmzu2ReK8=
 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0 h1:nVocQV40OQne5613EeLayJiRAJuKlBGy+m22qWG+WRg=
 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.1.0/go.mod h1:7QJP7dr2wznCMeqIrhMgWGf7XpAQnVrJqDm9nvV3Cu4=
-github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v0.1.0-beta.1 h1:JfniP1EfZ5zV3gweEs5qSXTIA7m8xzAyoVUZ/5ehHk0=
-github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v0.1.0-beta.1/go.mod h1:LOiiRCZKY9OlgPDmDrdM8uiL63lwSe01M0hklP3/4xc=
+github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v0.1.0-beta.1.0.20230823184147-c9d65f00ca17 h1:qbkTKL4uPkbcjHRez9TcQJDPVdPK+dRmmdOMBYYfOvQ=
+github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake v0.1.0-beta.1.0.20230823184147-c9d65f00ca17/go.mod h1:LOiiRCZKY9OlgPDmDrdM8uiL63lwSe01M0hklP3/4xc=
 github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.0.0 h1:iqXx16jKhIkx1FLPA4tsaXLc6zIrj/kMesoutWDv6MI=
 github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.0.0/go.mod h1:AjDdvSU6d92BGS2JfdsKi+H/c2vQY3OFp4qhxzsUH8g=
 github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
10 changes: 2 additions & 8 deletions ste/ErrorExt.go
@@ -4,8 +4,6 @@ import (
 	"errors"
 	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
 	"net/http"
-
-	"github.com/Azure/azure-storage-azcopy/v10/azbfs"
 )
 
 type ErrorEx struct {
Expand All @@ -18,12 +16,8 @@ func (errex ErrorEx) ErrorCodeAndString() (string, int, string) {
if errors.As(errex.error, &respErr) {
return respErr.ErrorCode, respErr.StatusCode, respErr.RawResponse.Status
}
switch e := interface{}(errex.error).(type) {
case azbfs.StorageError:
return string(e.ServiceCode()), e.Response().StatusCode, e.Response().Status
default:
return "", 0, errex.Error()
}
return "", 0, errex.Error()

}

type hasResponse interface {
Expand Down
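With azbfs gone, every service AzCopy talks to surfaces failures as `*azcore.ResponseError`, so the old type switch collapses into the single `errors.As` already at the top of the method. A minimal sketch of the resulting pattern, runnable on its own:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
)

// errorCodeAndString mirrors the simplified method above: one errors.As on
// *azcore.ResponseError now covers blob, file, and datalake errors alike.
func errorCodeAndString(err error) (string, int, string) {
	var respErr *azcore.ResponseError
	if errors.As(err, &respErr) {
		return respErr.ErrorCode, respErr.StatusCode, respErr.RawResponse.Status
	}
	return "", 0, err.Error()
}

func main() {
	code, status, msg := errorCodeAndString(errors.New("not a service error"))
	fmt.Println(code, status, msg) // prints: "" 0 "not a service error"
}
```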
28 changes: 12 additions & 16 deletions ste/downloader-blobFS.go
@@ -22,12 +22,11 @@ package ste
 
 import (
 	"errors"
-	"net/url"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
 	"os"
 	"time"
 
 	"github.com/Azure/azure-pipeline-go/pipeline"
-	"github.com/Azure/azure-storage-azcopy/v10/azbfs"
 	"github.com/Azure/azure-storage-azcopy/v10/common"
 )

Expand All @@ -40,7 +39,7 @@ func newBlobFSDownloader() downloader {
return &blobFSDownloader{}
}

func (bd *blobFSDownloader) Prologue(jptm IJobPartTransferMgr, srcPipeline pipeline.Pipeline) {
func (bd *blobFSDownloader) Prologue(jptm IJobPartTransferMgr, _ pipeline.Pipeline) {
bd.jptm = jptm
bd.txInfo = jptm.Info() // Inform the downloader
}
@@ -70,40 +69,37 @@ func (bd *blobFSDownloader) Epilogue() {
 
 // Returns a chunk-func for ADLS gen2 downloads
 
-func (bd *blobFSDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, srcPipeline pipeline.Pipeline, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer) chunkFunc {
+func (bd *blobFSDownloader) GenerateDownloadFunc(jptm IJobPartTransferMgr, _ pipeline.Pipeline, destWriter common.ChunkedFileWriter, id common.ChunkID, length int64, pacer pacer) chunkFunc {
 	return createDownloadChunkFunc(jptm, id, func() {
 
 		// step 1: Downloading the file from range startIndex till (startIndex + adjustedChunkSize)
 		info := jptm.Info()
-		u, _ := url.Parse(info.Source)
-		srcFileURL := azbfs.NewDirectoryURL(*u, srcPipeline).NewFileUrl()
+		source := info.Source
+		srcFileClient := common.CreateDatalakeFileClient(source, jptm.CredentialInfo(), jptm.CredentialOpOptions(), jptm.ClientOptions())
 
 		// At this point we create an HTTP(S) request for the desired portion of the file, and
 		// wait until we get the headers back... but we have not yet read its whole body.
 		// The Download method encapsulates any retries that may be necessary to get to the point of receiving response headers.
 		jptm.LogChunkStatus(id, common.EWaitReason.HeaderResponse())
-		get, err := srcFileURL.Download(jptm.Context(), id.OffsetInFile(), length)
+		get, err := srcFileClient.DownloadStream(jptm.Context(), &file.DownloadStreamOptions{Range: &file.HTTPRange{Offset: id.OffsetInFile(), Count: length}})
 		if err != nil {
 			jptm.FailActiveDownload("Downloading response body", err) // cancel entire transfer because this chunk has failed
 			return
 		}
 
-		// parse the remote lmt, there shouldn't be any error, unless the service returned a new format
-		remoteLastModified, err := time.Parse(time.RFC1123, get.LastModified())
-		common.PanicIfErr(err)
-		remoteLmtLocation := remoteLastModified.Location()
-
 		// Verify that the file has not been changed via a client side LMT check
-		if !remoteLastModified.Equal(jptm.LastModifiedTime().In(remoteLmtLocation)) {
+		getLMT := get.LastModified.In(time.FixedZone("GMT", 0))
+		if !getLMT.Equal(jptm.LastModifiedTime().In(time.FixedZone("GMT", 0))) {
 			jptm.FailActiveDownload("BFS File modified during transfer",
 				errors.New("BFS File modified during transfer"))
 		}
 
 		// step 2: Enqueue the response body to be written out to disk
 		// The retryReader encapsulates any retries that may be necessary while downloading the body
 		jptm.LogChunkStatus(id, common.EWaitReason.Body())
-		retryReader := get.Body(azbfs.RetryReaderOptions{
-			MaxRetryRequests: MaxRetryPerDownloadBody,
-			NotifyFailedRead: common.NewV1ReadLogFunc(jptm, u),
+		retryReader := get.NewRetryReader(jptm.Context(), &file.RetryReaderOptions{
+			MaxRetries:   MaxRetryPerDownloadBody,
+			OnFailedRead: common.NewDatalakeReadLogFunc(jptm, source),
 		})
 		defer retryReader.Close()
 		err = destWriter.EnqueueChunk(jptm.Context(), id, length, newPacedResponseBody(jptm.Context(), retryReader, pacer), true)
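This chunk function is the core of the migration: `srcFileURL.Download` plus `get.Body(...)` becomes a ranged `DownloadStream`, and body-level retries move into the response's `NewRetryReader`. A stripped-down sketch of just that request/retry pattern, using only the calls that appear in this diff and assuming an already-constructed `*file.Client` (client construction, pacing, and chunk bookkeeping omitted):

```go
package example

import (
	"context"
	"io"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
)

// downloadRange is a hypothetical helper isolating the two steps above:
// a ranged DownloadStream, then a retrying read of the response body.
func downloadRange(ctx context.Context, client *file.Client, offset, count int64, dst io.Writer) error {
	resp, err := client.DownloadStream(ctx, &file.DownloadStreamOptions{
		Range: &file.HTTPRange{Offset: offset, Count: count},
	})
	if err != nil {
		return err // request-level retries already ran inside the client's pipeline
	}
	// NewRetryReader transparently re-issues the ranged read if the body stream breaks.
	retryReader := resp.NewRetryReader(ctx, &file.RetryReaderOptions{MaxRetries: 5})
	defer retryReader.Close()
	_, err = io.Copy(dst, retryReader)
	return err
}
```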
132 changes: 0 additions & 132 deletions ste/mgr-JobPartMgr.go
@@ -19,7 +19,6 @@ import (
 	"time"
 
 	"github.com/Azure/azure-pipeline-go/pipeline"
-	"github.com/Azure/azure-storage-azcopy/v10/azbfs"
 	"github.com/Azure/azure-storage-azcopy/v10/common"
 	"golang.org/x/sync/semaphore"
 )
@@ -165,33 +164,6 @@ func NewClientOptions(retry policy.RetryOptions, telemetry policy.TelemetryOptio
 	}
 }
 
-// NewBlobFSPipeline creates a pipeline for transfers to and from BlobFS Service
-// The blobFS operations currently in azcopy are supported by SharedKey Credentials
-func NewBlobFSPipeline(c azbfs.Credential, o azbfs.PipelineOptions, r XferRetryOptions, p pacer, client *http.Client, statsAcc *PipelineNetworkStats) pipeline.Pipeline {
-	if c == nil {
-		panic("c can't be nil")
-	}
-	// Closest to API goes first; closest to the wire goes last
-	f := []pipeline.Factory{
-		azbfs.NewTelemetryPolicyFactory(o.Telemetry),
-		azbfs.NewUniqueRequestIDPolicyFactory(),
-		NewBFSXferRetryPolicyFactory(r),       // actually retry the operation
-		newV1RetryNotificationPolicyFactory(), // record that a retry status was returned
-	}
-
-	f = append(f, c)
-
-	f = append(f,
-		pipeline.MethodFactoryMarker(), // indicates at what stage in the pipeline the method factory is invoked
-		NewRequestLogPolicyFactory(RequestLogOptions{
-			LogWarningIfTryOverThreshold: o.RequestLog.LogWarningIfTryOverThreshold,
-			SyslogDisabled:               common.IsForceLoggingDisabled(),
-		}),
-		newXferStatsPolicyFactory(statsAcc))
-
-	return pipeline.NewPipeline(f, pipeline.Options{HTTPSender: newAzcopyHTTPClientFactory(client), Log: o.Log})
-}
-
 // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
 
 // Holds the status of transfers in this jptm
@@ -373,7 +345,6 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) {
 	jpm.priority = plan.Priority
 
 	jpm.clientInfo()
-	jpm.createPipelines(jobCtx) // pipeline is created per job part manager
 
 	// *** Schedule this job part's transfers ***
 	for t := uint32(0); t < plan.NumTransfers; t++ {
@@ -548,109 +519,6 @@ func (jpm *jobPartMgr) clientInfo() {
 	jpm.s2sSourceClientOptions = NewClientOptions(retryOptions, telemetryOptions, httpClient, nil, logOptions, sourceTrailingDot, nil)
 	jpm.clientOptions = NewClientOptions(retryOptions, telemetryOptions, httpClient, networkStats, logOptions, trailingDot, from)
 }
 
-func (jpm *jobPartMgr) createPipelines(ctx context.Context) {
-	if atomic.SwapUint32(&jpm.atomicPipelinesInitedIndicator, 1) != 0 {
-		panic("init client and pipelines for same jobPartMgr twice")
-	}
-	fromTo := jpm.planMMF.Plan().FromTo
-	credInfo := jpm.credInfo
-	if jpm.credInfo.CredentialType == common.ECredentialType.Unknown() {
-		credInfo = jpm.jobMgr.getInMemoryTransitJobState().CredentialInfo
-	}
-	var userAgent string
-	if fromTo.From() == common.ELocation.S3() {
-		userAgent = common.S3ImportUserAgent
-	} else if fromTo.From() == common.ELocation.GCP() {
-		userAgent = common.GCPImportUserAgent
-	} else if fromTo.From() == common.ELocation.Benchmark() || fromTo.To() == common.ELocation.Benchmark() {
-		userAgent = common.BenchmarkUserAgent
-	} else {
-		userAgent = common.GetLifecycleMgr().AddUserAgentPrefix(common.UserAgent)
-	}
-
-	credOption := common.CredentialOpOptions{
-		LogInfo:  func(str string) { jpm.Log(pipeline.LogInfo, str) },
-		LogError: func(str string) { jpm.Log(pipeline.LogError, str) },
-		Panic:    jpm.Panic,
-		CallerID: fmt.Sprintf("JobID=%v, Part#=%d", jpm.Plan().JobID, jpm.Plan().PartNum),
-		Cancel:   jpm.jobMgr.Cancel,
-	}
-	// TODO: Consider to remove XferRetryPolicy and Options?
-	xferRetryOption := XferRetryOptions{
-		Policy:        0,
-		MaxTries:      UploadMaxTries, // TODO: Consider to unify options.
-		TryTimeout:    UploadTryTimeout,
-		RetryDelay:    UploadRetryDelay,
-		MaxRetryDelay: UploadMaxRetryDelay}
-
-	var statsAccForSip *PipelineNetworkStats = nil // we don't accumulate stats on the source info provider
-
-	// Create source info provider's pipeline for S2S copy or download (in some cases).
-	// BlobFS and Blob will utilize the Blob source info provider, as they are the "same" resource, but provide different details on both endpoints
-	if (fromTo.IsS2S() || fromTo.IsDownload()) && (fromTo.From() == common.ELocation.Blob() || fromTo.From() == common.ELocation.BlobFS()) {
-		// Prepare to pull dfs properties if we're working with BlobFS
-		if fromTo.From() == common.ELocation.BlobFS() || jpm.Plan().PreservePermissions.IsTruthy() || jpm.Plan().PreservePOSIXProperties {
-			credential := common.CreateBlobFSCredential(ctx, credInfo, credOption)
-			jpm.secondarySourceProviderPipeline = NewBlobFSPipeline(
-				credential,
-				azbfs.PipelineOptions{
-					Log: jpm.jobMgr.PipelineLogInfo(),
-					Telemetry: azbfs.TelemetryOptions{
-						Value: userAgent,
-					},
-				},
-				xferRetryOption,
-				jpm.pacer,
-				jpm.jobMgr.HttpClient(),
-				statsAccForSip)
-		}
-	}
-
-	switch {
-	case fromTo.IsS2S() && (fromTo.To() == common.ELocation.Blob() || fromTo.To() == common.ELocation.BlobFS()), // destination determines pipeline for S2S, blobfs uses blob for S2S
-		fromTo.IsUpload() && fromTo.To() == common.ELocation.Blob(), // destination determines pipeline for upload
-		fromTo.IsDownload() && fromTo.From() == common.ELocation.Blob(), // source determines pipeline for download
-		fromTo.IsSetProperties() && (fromTo.From() == common.ELocation.Blob() || fromTo.From() == common.ELocation.BlobFS()), // source determines pipeline for set properties, blobfs uses blob for set properties
-		fromTo.IsDelete() && fromTo.From() == common.ELocation.Blob(): // ditto for delete
-		jpm.Log(pipeline.LogInfo, fmt.Sprintf("JobID=%v, credential type: %v", jpm.Plan().JobID, credInfo.CredentialType))
-
-		// If we need to write specifically to the gen2 endpoint, we should have this available.
-		if fromTo.To() == common.ELocation.BlobFS() || jpm.Plan().PreservePermissions.IsTruthy() || jpm.Plan().PreservePOSIXProperties {
-			credential := common.CreateBlobFSCredential(ctx, credInfo, credOption)
-			jpm.secondaryPipeline = NewBlobFSPipeline(
-				credential,
-				azbfs.PipelineOptions{
-					Log: jpm.jobMgr.PipelineLogInfo(),
-					Telemetry: azbfs.TelemetryOptions{
-						Value: userAgent,
-					},
-				},
-				xferRetryOption,
-				jpm.pacer,
-				jpm.jobMgr.HttpClient(),
-				statsAccForSip)
-		}
-	case fromTo.IsUpload() && fromTo.To() == common.ELocation.BlobFS(), // Blobfs up/down/delete use the dfs endpoint
-		fromTo.IsDownload() && fromTo.From() == common.ELocation.BlobFS(),
-		fromTo.IsDelete() && fromTo.From() == common.ELocation.BlobFS():
-		credential := common.CreateBlobFSCredential(ctx, credInfo, credOption)
-		jpm.Log(pipeline.LogInfo, fmt.Sprintf("JobID=%v, credential type: %v", jpm.Plan().JobID, credInfo.CredentialType))
-
-		jpm.pipeline = NewBlobFSPipeline(
-			credential,
-			azbfs.PipelineOptions{
-				Log: jpm.jobMgr.PipelineLogInfo(),
-				Telemetry: azbfs.TelemetryOptions{
-					Value: userAgent,
-				},
-			},
-			xferRetryOption,
-			jpm.pacer,
-			jpm.jobMgr.HttpClient(),
-			jpm.jobMgr.PipelineNetworkStats())
-	}
-}
-
 func (jpm *jobPartMgr) SlicePool() common.ByteSlicePooler {
 	return jpm.slicePool
 }
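This deletion is the heart of the change: the hand-assembled azbfs policy chain (telemetry, request ID, retry, logging, stats) goes away because track-2 clients build an equivalent pipeline internally from `azcore.ClientOptions`, which `NewClientOptions` above now populates once per job part. A rough sketch of the replacement style, assuming the beta azdatalake `file` package follows the usual track-2 constructor conventions (AzCopy's real wrapper is `common.CreateDatalakeFileClient`, seen in ste/downloader-blobFS.go above):

```go
package example

import (
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
)

// newDatalakeFileClient is hypothetical: retry and telemetry, previously
// separate azbfs pipeline factories, ride along in the client options.
func newDatalakeFileClient(fileURL string) (*file.Client, error) {
	opts := &file.ClientOptions{
		ClientOptions: azcore.ClientOptions{
			Retry:     policy.RetryOptions{MaxRetries: 20, TryTimeout: 15 * time.Minute},
			Telemetry: policy.TelemetryOptions{ApplicationID: "AzCopy"},
		},
	}
	return file.NewClientWithNoCredential(fileURL, opts)
}
```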