Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
umagnus committed May 11, 2023
1 parent 3219155 commit 0050503
Showing 1 changed file with 52 additions and 104 deletions.
156 changes: 52 additions & 104 deletions pkg/blob/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ const (
privateEndpoint = "privateendpoint"
waitForBlobContainerCopyInterval = 5 * time.Second
waitForBlobContainerCopyTimeout = 10 * time.Minute
blobUrl = "blob.core.windows.net"
)

// CreateVolume provisions a volume
Expand Down Expand Up @@ -389,7 +388,7 @@ func (d *Driver) CreateVolume(ctx context.Context, req *csi.CreateVolumeRequest)
}()

if req.GetVolumeContentSource() != nil {
if err := d.copyVolume(ctx, req, accountKey, validContainerName, secrets); err != nil {
if err := d.copyVolume(ctx, req, accountKey, validContainerName, storageEndpointSuffix, secrets); err != nil {
return nil, err
}
} else {
Expand Down Expand Up @@ -684,97 +683,77 @@ func (d *Driver) DeleteBlobContainer(ctx context.Context, subsID, resourceGroupN
}

// CopyBlobContainer copies a blob container in the same storage account
func (d *Driver) CopyBlobContainer(ctx context.Context, subsID, resourceGroupName, accountName, accountKey, srcContainerName, dstContainerName string, secrets map[string]string) error {
func (d *Driver) CopyBlobContainer(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string, secrets map[string]string) error {
resourceGroupName, accountName, srcContainerName, _, subsID, err := GetContainerInfo(req.GetVolumeContentSource().GetVolume().GetVolumeId())
if err != nil {
return status.Error(codes.NotFound, err.Error())
}
klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName)
if srcContainerName == "" {
return fmt.Errorf("srcContainerName is empty")
}
if dstContainerName == "" {
return fmt.Errorf("dstContainerName is empty")
}
if acquired := d.volumeLocks.TryAcquire("copy_" + dstContainerName); !acquired {
return status.Errorf(codes.Aborted, volumeOperationAlreadyExistsFmt, dstContainerName)
}
defer d.volumeLocks.Release("copy_" + dstContainerName)
return wait.ExponentialBackoff(d.cloud.RequestBackoff(), func() (bool, error) {
var err error
if len(secrets) > 0 {
accountSasToken, rerr := getAccountSasToken(secrets, accountName)
if rerr != nil || accountSasToken == "" {
klog.V(2).Infof("get account sas token from secrets err: %s, generate sas token for account(%s)", err.Error(), accountName)
accountSasToken, rerr = GenerateSASToken(accountName, accountKey, blobUrl)
if rerr != nil {
return true, rerr
}
}
srcContainer, getErr := getContainerReference(srcContainerName, secrets, d.cloud.Environment)
if getErr != nil {
return true, getErr
}
srcExist, _ := srcContainer.Exists()
if !srcExist {
return true, fmt.Errorf("srcContainer(%s) is not exist", srcContainerName)
}
srcPath := fmt.Sprintf("'https://%s.%s/%s%s'", accountName, blobUrl, srcContainerName, accountSasToken)
dstPath := fmt.Sprintf("'https://%s.%s/%s%s'", accountName, blobUrl, dstContainerName, accountSasToken)

klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
out, err := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
if err != nil {
return true, status.Errorf(codes.Internal, "failed to copy blob container %v: %v", err, string(out))
}
if waitErr := d.waitForBlobContainerCopy(ctx, subsID, resourceGroupName, accountName, dstContainerName, secrets, waitForBlobContainerCopyInterval, waitForBlobContainerCopyTimeout); waitErr != nil {
return true, status.Error(codes.Internal, fmt.Sprintf("failed to wait for blob container copy: %v", waitErr.Error()))
}
klog.V(2).Infof("copied blob container %s to %s", srcContainerName, dstContainerName)
} else {
klog.V(2).Infof("generate sas token for account(%s)", accountName)
accountSasToken, genErr := GenerateSASToken(accountName, accountKey, blobUrl)
if genErr != nil {
return true, genErr
}
_, rerr := d.cloud.BlobClient.GetContainer(ctx, subsID, resourceGroupName, accountName, srcContainerName)
if rerr != nil {
return true, fmt.Errorf("srcContainer(%s) get error: %s", srcContainerName, rerr.Error())
}
srcPath := fmt.Sprintf("'https://%s.%s/%s%s'", accountName, blobUrl, srcContainerName, accountSasToken)
dstPath := fmt.Sprintf("'https://%s.%s/%s%s'", accountName, blobUrl, dstContainerName, accountSasToken)

klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
out, err := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
if len(secrets) > 0 {
accountSasToken, err := getAccountSasToken(secrets, accountName)
if err != nil || accountSasToken == "" {
klog.V(2).Infof("get account sas token from secrets err: %s, generate sas token for account(%s)", err.Error(), accountName)
accountSasToken, err = GenerateSASToken(accountName, accountKey, storageEndpointSuffix)
if err != nil {
return true, status.Errorf(codes.Internal, "failed to copy blob container %v: %v", err, string(out))
}
if waitErr := d.waitForBlobContainerCopy(ctx, subsID, resourceGroupName, accountName, dstContainerName, secrets, waitForBlobContainerCopyInterval, waitForBlobContainerCopyTimeout); waitErr != nil {
return true, status.Error(codes.Internal, fmt.Sprintf("failed to wait for blob container copy: %v", waitErr.Error()))
return err
}
klog.V(2).Infof("copied blob container %s to %s", srcContainerName, dstContainerName)
}
srcContainer, getErr := getContainerReference(srcContainerName, secrets, d.cloud.Environment)
if getErr != nil {
return getErr
}
exists, err := srcContainer.Exists()
if err != nil {
klog.Warningf("CopyContainer(%s, %s, %s) from(%s, %s, %s) failed with error(%v), retry", resourceGroupName, accountName, dstContainerName, resourceGroupName, accountName, srcContainerName, err)
return false, nil
return err
}
return true, err
})
}
if !exists {
return fmt.Errorf("source container(%s) does not exist", srcContainerName)
}
srcPath := fmt.Sprintf("'https://%s.blob.%s/%s%s'", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
dstPath := fmt.Sprintf("'https://%s.blob.%s/%s%s'", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)

// copyFromVolume create a copied volume from a volume
func (d *Driver) copyFromVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName string, secrets map[string]string) error {
resourceGroupName, accountName, srcContainerName, _, subsID, err := GetContainerInfo(req.GetVolumeContentSource().GetVolume().GetVolumeId())
if err != nil {
return status.Error(codes.NotFound, err.Error())
klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
out, err := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "failed to copy blob container %v: %v", err, string(out))
}
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
} else {
klog.V(2).Infof("generate sas token for account(%s)", accountName)
accountSasToken, genErr := GenerateSASToken(accountName, accountKey, storageEndpointSuffix)
if genErr != nil {
return genErr
}
if _, rerr := d.cloud.BlobClient.GetContainer(ctx, subsID, resourceGroupName, accountName, srcContainerName); rerr != nil {
return fmt.Errorf("srcContainer(%s) get error: %s", srcContainerName, rerr.Error())
}
srcPath := fmt.Sprintf("'https://%s.blob.%s/%s%s'", accountName, storageEndpointSuffix, srcContainerName, accountSasToken)
dstPath := fmt.Sprintf("'https://%s.blob.%s/%s%s'", accountName, storageEndpointSuffix, dstContainerName, accountSasToken)

klog.V(2).Infof("copy blob container %s to %s", srcContainerName, dstContainerName)
out, err := exec.Command("azcopy", "copy", srcPath, dstPath, "--recursive", "--check-length=false").CombinedOutput()
if err != nil {
return status.Errorf(codes.Internal, "failed to copy blob container %v: %v", err, string(out))
}
klog.V(2).Infof("copied blob container %s to %s successfully", srcContainerName, dstContainerName)
}
klog.V(2).Infof("begin to copy blob container %s to %s", srcContainerName, dstContainerName)
d.CopyBlobContainer(ctx, subsID, resourceGroupName, accountName, accountKey, srcContainerName, dstContainerName, secrets)
return nil
}

func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName string, secrets map[string]string) error {
func (d *Driver) copyVolume(ctx context.Context, req *csi.CreateVolumeRequest, accountKey, dstContainerName, storageEndpointSuffix string, secrets map[string]string) error {
vs := req.VolumeContentSource
switch vs.Type.(type) {
case *csi.VolumeContentSource_Snapshot:
return status.Errorf(codes.InvalidArgument, "copy volume from volumeSnapshot is not supported")
case *csi.VolumeContentSource_Volume:
return d.copyFromVolume(ctx, req, accountKey, dstContainerName, secrets)
return d.CopyBlobContainer(ctx, req, accountKey, dstContainerName, storageEndpointSuffix, secrets)
default:
return status.Errorf(codes.InvalidArgument, "%v is not a proper volume source", vs)
}
Expand Down Expand Up @@ -805,12 +784,12 @@ func parseDays(dayStr string) (int32, error) {
return int32(days), nil
}

func GenerateSASToken(accountName, accountKey, cloudUrl string) (string, error) {
func GenerateSASToken(accountName, accountKey, storageEndpointSuffix string) (string, error) {
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new shared key credential, accountName: %s, accountKey: %s, err: %s", accountName, accountKey, err.Error()))
}
serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.%s/", accountName, cloudUrl), credential, nil)
serviceClient, err := service.NewClientWithSharedKeyCredential(fmt.Sprintf("https://%s.blob.%s/", accountName, storageEndpointSuffix), credential, nil)
if err != nil {
return "", status.Errorf(codes.Internal, fmt.Sprintf("failed to generate sas token in creating new client with shared key credential, accountName: %s, accountKey: %s, err: %s", accountName, accountKey, err.Error()))
}
Expand All @@ -828,34 +807,3 @@ func GenerateSASToken(accountName, accountKey, cloudUrl string) (string, error)
sasToken := "?" + u.RawQuery
return sasToken, nil
}

// waitForBlobContainerCopy wait for copy blob container
func (d *Driver) waitForBlobContainerCopy(ctx context.Context, subsID, resourceGroupName, accountName, dstContainerName string, secrets map[string]string, intervel, timeout time.Duration) error {
timeAfter := time.After(timeout)
timeTick := time.Tick(intervel)

for {
select {
case <-timeTick:
if len(secrets) > 0 {
dstContainer, getErr := getContainerReference(dstContainerName, secrets, d.cloud.Environment)
if getErr != nil {
return getErr
}
dstExist, _ := dstContainer.Exists()
if dstExist {
klog.V(2).Infof("dstContainer(%s) exists", dstContainerName)
return nil
}
} else {
_, rerr := d.cloud.BlobClient.GetContainer(ctx, subsID, resourceGroupName, accountName, dstContainerName)
if rerr == nil {
klog.V(2).Infof("dstContainer(%s) exists", dstContainerName)
return nil
}
}
case <-timeAfter:
return fmt.Errorf("timeout waiting for copy blob container(%s) under subsID(%s) rg(%s) account(%s)", dstContainerName, subsID, resourceGroupName, accountName)
}
}
}

0 comments on commit 0050503

Please sign in to comment.