Skip to content

Commit

Permalink
Migrated datalake code in e2etest (#2363)
Browse files Browse the repository at this point in the history
  • Loading branch information
gapra-msft authored Sep 6, 2023
1 parent 3f7e3e6 commit 527ff0f
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 105 deletions.
22 changes: 10 additions & 12 deletions e2etest/declarativeResourceManagers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,10 @@ package e2etest
import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/datalakeerror"
datalakedirectory "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/directory"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"
"github.com/Azure/azure-storage-azcopy/v10/azbfs"
"net/url"
"os"
"path"
Expand Down Expand Up @@ -252,11 +253,10 @@ func (r *resourceBlobContainer) createFiles(a asserter, s *scenario, isSource bo
break
}

rootURL := TestResourceFactory{}.GetDatalakeServiceURL(r.accountType).NewFileSystemURL(containerURLParts.ContainerName).NewDirectoryURL("/")
rootURL := TestResourceFactory{}.GetDatalakeServiceURL(r.accountType).NewFileSystemClient(containerURLParts.ContainerName).NewDirectoryClient("/")

_, err := rootURL.SetAccessControl(ctx, azbfs.BlobFSAccessControl{
ACL: *v.creationProperties.adlsPermissionsACL,
})
_, err := rootURL.SetAccessControl(ctx,
&datalakedirectory.SetAccessControlOptions{ACL: v.creationProperties.adlsPermissionsACL})
a.AssertNoErr(err)

break
Expand Down Expand Up @@ -331,19 +331,17 @@ func (r *resourceBlobContainer) getAllProperties(a asserter) map[string]*objectP
if r.accountType == EAccountType.HierarchicalNamespaceEnabled() {
urlParts, err := blob.ParseURL(r.containerClient.URL())
a.AssertNoErr(err)
fsURL := TestResourceFactory{}.GetDatalakeServiceURL(r.accountType).NewFileSystemURL(urlParts.ContainerName).NewDirectoryURL("/")
fsURL := TestResourceFactory{}.GetDatalakeServiceURL(r.accountType).NewFileSystemClient(urlParts.ContainerName).NewDirectoryClient("/")

ACL, err := fsURL.GetAccessControl(ctx)
if stgErr, ok := err.(azbfs.StorageError); ok {
if stgErr.ServiceCode() == "FilesystemNotFound" { // skip grabbing ACLs
return objects
}
resp, err := fsURL.GetAccessControl(ctx, nil)
if datalakeerror.HasCode(err, "FilesystemNotFound") {
return objects
}
a.AssertNoErr(err)

objects[""] = &objectProperties{
entityType: common.EEntityType.Folder(),
adlsPermissionsACL: &ACL.ACL,
adlsPermissionsACL: resp.ACL,
}
}

Expand Down
23 changes: 14 additions & 9 deletions e2etest/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,21 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
blobsas "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
blobservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
datalakeservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
filesas "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas"
fileservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/ste"
"net/url"
"github.com/google/uuid"
"os"
"path"
"runtime"
"strings"
"testing"
"time"

"github.com/Azure/azure-storage-azcopy/v10/azbfs"
"github.com/google/uuid"
)

// provide convenient methods to get access to test resources such as accounts, containers/shares, directories
Expand Down Expand Up @@ -85,13 +84,19 @@ func (TestResourceFactory) GetFileServiceURL(accountType AccountType) *fileservi
return fsc
}

func (TestResourceFactory) GetDatalakeServiceURL(accountType AccountType) azbfs.ServiceURL {
func (TestResourceFactory) GetDatalakeServiceURL(accountType AccountType) *datalakeservice.Client {
accountName, accountKey := GlobalInputManager{}.GetAccountAndKey(accountType)
u, _ := url.Parse(fmt.Sprintf("https://%s.dfs.core.windows.net/", accountName))
resourceURL := fmt.Sprintf("https://%s.dfs.core.windows.net/", accountName)

cred := azbfs.NewSharedKeyCredential(accountName, accountKey)
pipeline := azbfs.NewPipeline(cred, azbfs.PipelineOptions{})
return azbfs.NewServiceURL(*u, pipeline)
credential, err := azdatalake.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
panic(err)
}
dsc, err := datalakeservice.NewClientWithSharedKeyCredential(resourceURL, credential, nil)
if err != nil {
panic(err)
}
return dsc
}

func (TestResourceFactory) GetBlobServiceURLWithSAS(c asserter, accountType AccountType) *blobservice.Client {
Expand Down
71 changes: 30 additions & 41 deletions e2etest/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,27 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"
blobsas "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
blobservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem"
datalakesas "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/sas"
datalakeservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
sharefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
filesas "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/sas"
fileservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"
"io"
"math/rand"
"mime"
"net/url"
"os"
"strings"
"time"

"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/minio/minio-go/pkg/credentials"
chk "gopkg.in/check.v1"

"github.com/Azure/azure-storage-azcopy/v10/azbfs"
"github.com/Azure/azure-storage-azcopy/v10/ste"
"github.com/minio/minio-go"
"github.com/minio/minio-go/pkg/credentials"
chk "gopkg.in/check.v1"
)

var ctx = context.Background()
Expand Down Expand Up @@ -140,9 +142,9 @@ func getContainerURL(c asserter, bsc *blobservice.Client) (cc *container.Client,
return
}

func getFilesystemURL(c asserter, bfssu azbfs.ServiceURL) (filesystem azbfs.FileSystemURL, name string) {
func getFilesystemURL(c asserter, dsc *datalakeservice.Client) (fsc *filesystem.Client, name string) {
name = generateFilesystemName(c)
filesystem = bfssu.NewFileSystemURL(name)
fsc = dsc.NewFileSystemClient(name)

return
}
Expand All @@ -154,9 +156,9 @@ func getBlockBlobURL(c asserter, cc *container.Client, prefix string) (bc *block
return bc, name
}

func getBfsFileURL(c asserter, filesystemURL azbfs.FileSystemURL, prefix string) (file azbfs.FileURL, name string) {
func getBfsFileURL(c asserter, fsc *filesystem.Client, prefix string) (fc *datalakefile.Client, name string) {
name = prefix + generateBfsFileName(c)
file = filesystemURL.NewRootDirectoryURL().NewFileURL(name)
fc = fsc.NewFileClient(name)

return
}
Expand Down Expand Up @@ -196,30 +198,26 @@ func createNewContainer(c asserter, bsc *blobservice.Client) (cc *container.Clie
return
}

func createNewFilesystem(c asserter, bfssu azbfs.ServiceURL) (filesystem azbfs.FileSystemURL, name string) {
filesystem, name = getFilesystemURL(c, bfssu)
func createNewFilesystem(c asserter, dsc *datalakeservice.Client) (fsc *filesystem.Client, name string) {
fsc, name = getFilesystemURL(c, dsc)

cResp, err := filesystem.Create(ctx)
_, err := fsc.Create(ctx, nil)
c.AssertNoErr(err)
c.Assert(cResp.StatusCode(), equals(), 201)
return
}

func createNewBfsFile(c asserter, filesystem azbfs.FileSystemURL, prefix string) (file azbfs.FileURL, name string) {
file, name = getBfsFileURL(c, filesystem, prefix)
func createNewBfsFile(c asserter, fsc *filesystem.Client, prefix string) (fc *datalakefile.Client, name string) {
fc, name = getBfsFileURL(c, fsc, prefix)

// Create the file
cResp, err := file.Create(ctx, azbfs.BlobFSHTTPHeaders{}, azbfs.BlobFSAccessControl{})
_, err := fc.Create(ctx, nil)
c.AssertNoErr(err)
c.Assert(cResp.StatusCode(), equals(), 201)

aResp, err := file.AppendData(ctx, 0, strings.NewReader(string(make([]byte, defaultBlobFSFileSizeInBytes))))
_, err = fc.AppendData(ctx, 0, streaming.NopCloser(strings.NewReader(string(make([]byte, defaultBlobFSFileSizeInBytes)))), nil)
c.AssertNoErr(err)
c.Assert(aResp.StatusCode(), equals(), 202)

fResp, err := file.FlushData(ctx, defaultBlobFSFileSizeInBytes, nil, azbfs.BlobFSHTTPHeaders{}, false, true)
_, err = fc.FlushData(ctx, defaultBlobFSFileSizeInBytes, &datalakefile.FlushDataOptions{Close: to.Ptr(true)})
c.AssertNoErr(err)
c.Assert(fResp.StatusCode(), equals(), 200)
return
}

Expand Down Expand Up @@ -302,10 +300,9 @@ func deleteContainer(c asserter, cc *container.Client) {
c.AssertNoErr(err)
}

func deleteFilesystem(c asserter, filesystem azbfs.FileSystemURL) {
resp, err := filesystem.Delete(ctx)
func deleteFilesystem(c asserter, fsc *filesystem.Client) {
_, err := fsc.Delete(ctx, nil)
c.AssertNoErr(err)
c.Assert(resp.StatusCode(), equals(), 202)
}

type createS3ResOptions struct {
Expand Down Expand Up @@ -528,26 +525,18 @@ func getShareURLWithSAS(c asserter, credential *sharefile.SharedKeyCredential, s
return sc
}

func getAdlsServiceURLWithSAS(c asserter, credential azbfs.SharedKeyCredential) azbfs.ServiceURL {
sasQueryParams, err := azbfs.AccountSASSignatureValues{
Protocol: azbfs.SASProtocolHTTPS,
ExpiryTime: time.Now().Add(48 * time.Hour),
Permissions: "rwdlacup",
Services: "bqf",
ResourceTypes: "sco",
}.NewSASQueryParameters(&credential)
func getAdlsServiceURLWithSAS(c asserter, credential *azdatalake.SharedKeyCredential) *datalakeservice.Client {
rawURL := fmt.Sprintf("https://%s.dfs.core.windows.net/", credential.AccountName())
dsc, err := datalakeservice.NewClientWithSharedKeyCredential(rawURL, credential, nil)
c.AssertNoErr(err)

// construct the url from scratch
qp := sasQueryParams.Encode()
rawURL := fmt.Sprintf("https://%s.dfs.core.windows.net/?%s",
credential.AccountName(), qp)

// convert the raw url and validate it was parsed successfully
fullURL, err := url.Parse(rawURL)
sasURL, err := dsc.GetSASURL(datalakesas.AccountResourceTypes{Service: true, Container: true, Object: true},
datalakesas.AccountPermissions{Read: true, Write: true, Create: true, Delete: true, List: true, Add: true, Update: true, Process: true},
time.Now().UTC().Add(48 * time.Hour), nil)
c.AssertNoErr(err)

return azbfs.NewServiceURL(*fullURL, azbfs.NewPipeline(azbfs.NewAnonymousCredential(), azbfs.PipelineOptions{}))
dsc, err = datalakeservice.NewClientWithNoCredential(sasURL, nil)
c.AssertNoErr(err)
return dsc
}

// check.v1 style "StringContains" checker
Expand Down
71 changes: 36 additions & 35 deletions e2etest/scenario_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"
blobservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake"
datalakedirectory "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/directory"
datalakefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/file"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/filesystem"
datalakeservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azdatalake/service"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/directory"
sharefile "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/file"
fileservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/service"
Expand All @@ -52,7 +57,6 @@ import (
"strings"
"time"

"github.com/Azure/azure-storage-azcopy/v10/azbfs"
"github.com/Azure/azure-storage-azcopy/v10/sddl"
"github.com/minio/minio-go"

Expand Down Expand Up @@ -301,15 +305,15 @@ func (scenarioHelper) generateCommonRemoteScenarioForBlob(c asserter, containerC
return
}

func (scenarioHelper) generateCommonRemoteScenarioForBlobFS(c asserter, filesystemURL azbfs.FileSystemURL, prefix string) (pathList []string) {
func (scenarioHelper) generateCommonRemoteScenarioForBlobFS(c asserter, fsc *filesystem.Client, prefix string) (pathList []string) {
pathList = make([]string, 50)

for i := 0; i < 10; i++ {
_, pathName1 := createNewBfsFile(c, filesystemURL, prefix+"top")
_, pathName2 := createNewBfsFile(c, filesystemURL, prefix+"sub1/")
_, pathName3 := createNewBfsFile(c, filesystemURL, prefix+"sub2/")
_, pathName4 := createNewBfsFile(c, filesystemURL, prefix+"sub1/sub3/sub5")
_, pathName5 := createNewBfsFile(c, filesystemURL, prefix+specialNames[i])
_, pathName1 := createNewBfsFile(c, fsc, prefix+"top")
_, pathName2 := createNewBfsFile(c, fsc, prefix+"sub1/")
_, pathName3 := createNewBfsFile(c, fsc, prefix+"sub2/")
_, pathName4 := createNewBfsFile(c, fsc, prefix+"sub1/sub3/sub5")
_, pathName5 := createNewBfsFile(c, fsc, prefix+specialNames[i])

pathList[5*i] = pathName1
pathList[5*i+1] = pathName2
Expand Down Expand Up @@ -374,13 +378,13 @@ func (s scenarioHelper) generateFileSharesAndFilesFromLists(c asserter, serviceC
}
}

func (s scenarioHelper) generateFilesystemsAndFilesFromLists(c asserter, serviceURL azbfs.ServiceURL, fsList []string, fileList []string, data string) {
func (s scenarioHelper) generateFilesystemsAndFilesFromLists(c asserter, dsc *datalakeservice.Client, fsList []string, fileList []string, data string) {
for _, filesystemName := range fsList {
fsURL := serviceURL.NewFileSystemURL(filesystemName)
_, err := fsURL.Create(ctx)
fsc := dsc.NewFileSystemClient(filesystemName)
_, err := fsc.Create(ctx, nil)
c.AssertNoErr(err)

s.generateBFSPathsFromList(c, fsURL, fileList)
s.generateBFSPathsFromList(c, fsc, fileList)
}
}

Expand Down Expand Up @@ -568,30 +572,29 @@ func (scenarioHelper) generateBlobsFromList(c asserter, options *generateBlobFro
}

if b.creationProperties.adlsPermissionsACL != nil {
bfsURLParts := azbfs.NewBfsURLParts(options.rawSASURL)
bfsURLParts, err := azdatalake.ParseURL(options.rawSASURL.String())
c.AssertNoErr(err)
bfsURLParts.Host = strings.Replace(bfsURLParts.Host, ".blob", ".dfs", 1)

bfsContainer := azbfs.NewFileSystemURL(bfsURLParts.URL(), azbfs.NewPipeline(azbfs.NewAnonymousCredential(), azbfs.PipelineOptions{}))
fsc, err := filesystem.NewClientWithNoCredential(bfsURLParts.String(), nil)
c.AssertNoErr(err)

var updateResp *azbfs.PathUpdateResponse
if b.isFolder() {
dirURL := bfsContainer.NewDirectoryURL(b.name)
dc := fsc.NewDirectoryClient(b.name)

updateResp, err = dirURL.SetAccessControl(ctx, azbfs.BlobFSAccessControl{
ACL: *b.creationProperties.adlsPermissionsACL,
})
_, err = dc.SetAccessControl(ctx,
&datalakedirectory.SetAccessControlOptions{ACL: b.creationProperties.adlsPermissionsACL})
} else {
d, f := path.Split(b.name)
dirURL := bfsContainer.NewDirectoryURL(d)
fileURL := dirURL.NewFileURL(f)
dc := fsc.NewDirectoryClient(d)
fc, err := dc.NewFileClient(f)
c.AssertNoErr(err)

updateResp, err = fileURL.SetAccessControl(ctx, azbfs.BlobFSAccessControl{
ACL: *b.creationProperties.adlsPermissionsACL,
})
_, err = fc.SetAccessControl(ctx,
&datalakefile.SetAccessControlOptions{ACL: b.creationProperties.adlsPermissionsACL})
}

c.AssertNoErr(err)
c.Assert(updateResp.StatusCode(), equals(), 200)
}
}

Expand Down Expand Up @@ -1050,22 +1053,19 @@ func (s scenarioHelper) downloadFileContent(a asserter, options downloadContentO
return destData
}

func (scenarioHelper) generateBFSPathsFromList(c asserter, filesystemURL azbfs.FileSystemURL, fileList []string) {
func (scenarioHelper) generateBFSPathsFromList(c asserter, fsc *filesystem.Client, fileList []string) {
for _, bfsPath := range fileList {
file := filesystemURL.NewRootDirectoryURL().NewFileURL(bfsPath)
fc := fsc.NewFileClient(bfsPath)

// Create the file
cResp, err := file.Create(ctx, azbfs.BlobFSHTTPHeaders{}, azbfs.BlobFSAccessControl{})
_, err := fc.Create(ctx, nil)
c.AssertNoErr(err)
c.Assert(cResp.StatusCode(), equals(), 201)

aResp, err := file.AppendData(ctx, 0, strings.NewReader(string(make([]byte, defaultBlobFSFileSizeInBytes))))
_, err = fc.AppendData(ctx, 0, streaming.NopCloser(strings.NewReader(string(make([]byte, defaultBlobFSFileSizeInBytes)))), nil)
c.AssertNoErr(err)
c.Assert(aResp.StatusCode(), equals(), 202)

fResp, err := file.FlushData(ctx, defaultBlobFSFileSizeInBytes, nil, azbfs.BlobFSHTTPHeaders{}, false, true)
_, err = fc.FlushData(ctx, defaultBlobFSFileSizeInBytes, &datalakefile.FlushDataOptions{Close: to.Ptr(true)})
c.AssertNoErr(err)
c.Assert(fResp.StatusCode(), equals(), 200)

}
}
Expand Down Expand Up @@ -1130,11 +1130,12 @@ func (scenarioHelper) getRawFileServiceURLWithSAS(c asserter) string {
return getFileServiceURLWithSAS(c, credential).URL()
}

func (scenarioHelper) getRawAdlsServiceURLWithSAS(c asserter) azbfs.ServiceURL {
func (scenarioHelper) getRawAdlsServiceURLWithSAS(c asserter) *datalakeservice.Client {
accountName, accountKey := GlobalInputManager{}.GetAccountAndKey(EAccountType.Standard())
credential := azbfs.NewSharedKeyCredential(accountName, accountKey)
credential, err := azdatalake.NewSharedKeyCredential(accountName, accountKey)
c.AssertNoErr(err)

return getAdlsServiceURLWithSAS(c, *credential)
return getAdlsServiceURLWithSAS(c, credential)
}

func (scenarioHelper) getBlobServiceURL(c asserter) *blobservice.Client {
Expand Down
Loading

0 comments on commit 527ff0f

Please sign in to comment.