From 6007f08494a2e65dc42aa50b3a949127b45a399f Mon Sep 17 00:00:00 2001 From: Marco Amador Date: Tue, 4 Jun 2024 14:23:04 +0100 Subject: [PATCH] Add azure_file_share output Add azure_file_share output --- go.mod | 1 + go.sum | 2 + internal/impl/azure/auth.go | 76 ++++++- internal/impl/azure/input_blob_storage.go | 2 +- internal/impl/azure/input_queue_storage.go | 2 +- internal/impl/azure/input_table_storage.go | 2 +- internal/impl/azure/output_blob_storage.go | 2 +- internal/impl/azure/output_file_share.go | 211 ++++++++++++++++++++ internal/impl/azure/output_queue_storage.go | 2 +- internal/impl/azure/output_table_storage.go | 2 +- 10 files changed, 295 insertions(+), 7 deletions(-) create mode 100644 internal/impl/azure/output_file_share.go diff --git a/go.mod b/go.mod index e2246163e..3d6356070 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3 github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.2.0 github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 + github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.3.1 github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 github.com/Azure/go-amqp v1.0.5 github.com/ClickHouse/clickhouse-go/v2 v2.27.1 diff --git a/go.sum b/go.sum index cf40c4518..880e4d4eb 100644 --- a/go.sum +++ b/go.sum @@ -89,6 +89,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 h1:Be6KInmFEKV81c0pOAEbRYehLMwmmGI1exuFj248AMk= github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0/go.mod h1:WCPBHsOXfBVnivScjs2ypRfimjEW0qPVLGgJkZlrIOA= +github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.3.1 h1:a1U6j4GPI18JQCqgz7/DcqXA1vzvGBugm14AXZfU0gs= +github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.3.1/go.mod 
h1:tZyRNcHi2/yo+ugYHTUuOrHiboKilaizLnRL5aZTe6A= github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 h1:lJwNFV+xYjHREUTHJKx/ZF6CJSt9znxmLw9DqSTvyRU= github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0/go.mod h1:GfT0aGew8Qj5yiQVqOO5v7N8fanbJGyUoHqXg56qcVY= github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck= diff --git a/internal/impl/azure/auth.go b/internal/impl/azure/auth.go index a46557b63..2963673a8 100644 --- a/internal/impl/azure/auth.go +++ b/internal/impl/azure/auth.go @@ -20,6 +20,8 @@ import ( "os" "strings" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share" + "github.com/redpanda-data/benthos/v4/public/service" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" @@ -36,7 +38,7 @@ const ( bscFieldStorageConnectionString = "storage_connection_string" ) -func azureComponentSpec(forBlobStorage bool) *service.ConfigSpec { +func azureComponentSpec() *service.ConfigSpec { spec := service.NewConfigSpec(). Categories("Services", "Azure"). 
Fields(
@@ -80,6 +82,35 @@ func blobStorageClientFromParsed(pConf *service.ParsedConfig, container *service
 	return getBlobStorageClient(connectionString, storageAccount, storageAccessKey, storageSASToken, container)
 }
 
+// fileShareClientFromParsed reads the storage credential fields from pConf and
+// builds a client for the file share shareName via getFileShareClient, whose
+// results are returned unchanged.
+//
+// An error is returned when a field cannot be read, or when neither a storage
+// account nor a connection string is configured.
+func fileShareClientFromParsed(pConf *service.ParsedConfig, shareName string) (*share.Client, bool, bool, string, string, error) {
+	connectionString, err := pConf.FieldString(bscFieldStorageConnectionString)
+	if err != nil {
+		return nil, false, false, "", "", err
+	}
+	storageAccount, err := pConf.FieldString(bscFieldStorageAccount)
+	if err != nil {
+		return nil, false, false, "", "", err
+	}
+	storageAccessKey, err := pConf.FieldString(bscFieldStorageAccessKey)
+	if err != nil {
+		return nil, false, false, "", "", err
+	}
+	storageSASToken, err := pConf.FieldString(bscFieldStorageSASToken)
+	if err != nil {
+		return nil, false, false, "", "", err
+	}
+	if storageAccount == "" && connectionString == "" {
+		return nil, false, false, "", "", errors.New("invalid azure storage account credentials")
+	}
+	return getFileShareClient(connectionString, storageAccount, storageAccessKey, storageSASToken, shareName)
+}
+
 const (
 	blobEndpointExp = "https://%s.blob.core.windows.net"
 )
@@ -127,6 +158,70 @@ func getBlobStorageClient(storageConnectionString, storageAccount, storageAccess
 	return client, containerSASToken, err
 }
 
+const (
+	fileEndpointExp = "https://%s.file.core.windows.net"
+)
+
+// getFileShareClient builds a client for the file share shareName. The first
+// credential that is set wins, in this order: connection string, shared access
+// key, SAS token and finally the default Azure credential.
+//
+// Besides the client it returns, in order: whether a file share SAS token (one
+// starting with "sp=") is used, whether a storage account SAS token is used,
+// and the storage account and SAS token. The last two are only populated for a
+// storage account SAS token.
+func getFileShareClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken, shareName string) (*share.Client, bool, bool, string, string, error) {
+	var (
+		client                            *share.Client
+		err                               error
+		usesStorageSASToken, fileSASToken bool
+	)
+	accountURL := fmt.Sprintf(fileEndpointExp, storageAccount)
+	// share.Client addresses a single share, so credential based clients need
+	// the share name appended to the account endpoint.
+	shareURL := fmt.Sprintf("%s/%s", accountURL, shareName)
+	switch {
+	case len(storageConnectionString) > 0:
+		connStr := parseStorageConnectionString(storageConnectionString, storageAccount)
+		client, err = share.NewClientFromConnectionString(connStr, shareName, nil)
+	case len(storageAccessKey) > 0:
+		cred, credErr := share.NewSharedKeyCredential(storageAccount, storageAccessKey)
+		if credErr != nil {
+			return nil, false, false, "", "", fmt.Errorf("error creating shared key credential: %w", credErr)
+		}
+		client, err = share.NewClientWithSharedKeyCredential(shareURL, cred, nil)
+	case len(storageSASToken) > 0:
+		var serviceURL string
+		if strings.HasPrefix(storageSASToken, "sp=") {
+			// file share SAS token
+			fileSASToken = true
+			serviceURL = fmt.Sprintf("%s?%s", shareURL, storageSASToken)
+		} else {
+			// storage account SAS token
+			usesStorageSASToken = true
+			serviceURL = fmt.Sprintf("%s/%s", accountURL, storageSASToken)
+		}
+		client, err = share.NewClientWithNoCredential(serviceURL, nil)
+		if usesStorageSASToken {
+			// Hand back the account and token so that callers can build
+			// clients for share names resolved per message.
+			return client, fileSASToken, usesStorageSASToken, storageAccount, storageSASToken, err
+		}
+	default:
+		cred, credErr := azidentity.NewDefaultAzureCredential(nil)
+		if credErr != nil {
+			return nil, false, false, "", "", fmt.Errorf("error getting default Azure credentials: %w", credErr)
+		}
+		// NOTE(review): token credentials may additionally require
+		// share.ClientOptions.FileRequestIntent to be set - confirm.
+		client, err = share.NewClient(shareURL, cred, nil)
+	}
+	if err != nil {
+		return nil, false, false, "", "", fmt.Errorf("invalid azure storage account credentials: %w", err)
+	}
+	return client, fileSASToken, usesStorageSASToken, "", "", nil
+}
+
 // getEmulatorConnectionString returns the Azurite connection string for the provided service ports
 // Details here: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings
 func getEmulatorConnectionString(blobServicePort, queueServicePort, tableServicePort string) string {
diff --git a/internal/impl/azure/input_blob_storage.go b/internal/impl/azure/input_blob_storage.go
index 8910b503a..57cda4ee1 100644
--- a/internal/impl/azure/input_blob_storage.go
+++
b/internal/impl/azure/input_blob_storage.go @@ -84,7 +84,7 @@ func bsiConfigFromParsed(pConf *service.ParsedConfig) (conf bsiConfig, err error } func bsiSpec() *service.ConfigSpec { - return azureComponentSpec(true). + return azureComponentSpec(). Beta(). Version("3.36.0"). Summary(`Downloads objects within an Azure Blob Storage container, optionally filtered by a prefix.`). diff --git a/internal/impl/azure/input_queue_storage.go b/internal/impl/azure/input_queue_storage.go index a7552ff5d..09283ae78 100644 --- a/internal/impl/azure/input_queue_storage.go +++ b/internal/impl/azure/input_queue_storage.go @@ -61,7 +61,7 @@ func qsiConfigFromParsed(pConf *service.ParsedConfig) (conf qsiConfig, err error } func qsiSpec() *service.ConfigSpec { - return azureComponentSpec(false). + return azureComponentSpec(). Beta(). Version("3.42.0"). Summary(`Dequeue objects from an Azure Storage Queue.`). diff --git a/internal/impl/azure/input_table_storage.go b/internal/impl/azure/input_table_storage.go index cdfb48233..bcf9294dc 100644 --- a/internal/impl/azure/input_table_storage.go +++ b/internal/impl/azure/input_table_storage.go @@ -64,7 +64,7 @@ func tsiConfigFromParsed(pConf *service.ParsedConfig) (conf tsiConfig, err error } func tsiSpec() *service.ConfigSpec { - return azureComponentSpec(false). + return azureComponentSpec(). Beta(). Version("4.10.0"). Summary(`Queries an Azure Storage Account Table, optionally with multiple filters.`). diff --git a/internal/impl/azure/output_blob_storage.go b/internal/impl/azure/output_blob_storage.go index cbad245b8..0ddf73c94 100644 --- a/internal/impl/azure/output_blob_storage.go +++ b/internal/impl/azure/output_blob_storage.go @@ -69,7 +69,7 @@ func bsoConfigFromParsed(pConf *service.ParsedConfig) (conf bsoConfig, err error } func bsoSpec() *service.ConfigSpec { - return azureComponentSpec(true). + return azureComponentSpec(). Beta(). Version("3.36.0"). Summary(`Sends message parts as objects to an Azure Blob Storage Account container. 
Each object is uploaded with the filename specified with the `+"`container`"+` field.`).
diff --git a/internal/impl/azure/output_file_share.go b/internal/impl/azure/output_file_share.go
new file mode 100644
index 000000000..de34a584a
--- /dev/null
+++ b/internal/impl/azure/output_file_share.go
@@ -0,0 +1,264 @@
+package azure
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	stdpath "path"
+	"strings"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror"
+	"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
+	"github.com/redpanda-data/benthos/v4/public/service"
+)
+
+const (
+	// File Share output fields
+	fsFieldShareName = "share_name"
+	fsFieldPath      = "path"
+)
+
+// fsConfig holds the parsed configuration of the azure_file_share output.
+type fsConfig struct {
+	// client is the share client built from the configured credentials.
+	client *share.Client
+
+	// storageSASToken and storageAccount are only populated when a storage
+	// account SAS token is used; Write needs them to address other shares.
+	storageSASToken string
+	storageAccount  string
+
+	// usesStorageSASToken reports whether a storage account SAS token is used.
+	usesStorageSASToken bool
+
+	// ShareName and Path are interpolated for every written message.
+	ShareName *service.InterpolatedString
+	Path      *service.InterpolatedString
+}
+
+// fsConfigFromParsed builds an fsConfig from the parsed output configuration.
+//
+// The share client is created once, for the share name obtained by resolving
+// the share_name field against an empty message. With a file share SAS token
+// the share is already part of the client URL, so ShareName is replaced by an
+// empty interpolation.
+func fsConfigFromParsed(pConf *service.ParsedConfig) (conf fsConfig, err error) {
+	if conf.ShareName, err = pConf.FieldInterpolatedString(fsFieldShareName); err != nil {
+		return conf, err
+	}
+	initialShare, err := conf.ShareName.TryString(service.NewMessage([]byte("")))
+	if err != nil {
+		return conf, err
+	}
+	var usesShareSASToken bool
+	if conf.client, usesShareSASToken, conf.usesStorageSASToken, conf.storageAccount, conf.storageSASToken, err =
+		fileShareClientFromParsed(pConf, initialShare); err != nil {
+		return conf, err
+	}
+	if usesShareSASToken {
+		// if using a share SAS token, the share name is already implicit
+		if conf.ShareName, err = service.NewInterpolatedString(""); err != nil {
+			return conf, err
+		}
+	}
+	if conf.Path, err = pConf.FieldInterpolatedString(fsFieldPath); err != nil {
+		return conf, err
+	}
+	return conf, nil
+}
+
+// fsSpec returns the configuration spec of the azure_file_share output, which
+// extends the spec returned by azureComponentSpec with the share name, file
+// path and max in flight fields.
+func fsSpec() *service.ConfigSpec {
+	return azureComponentSpec().
+		Beta().
+		Version("4.24.0").
+		Summary(`Sends message parts as objects to an Azure File Share. Each object is uploaded with the filename specified with the `+"`path`"+` field.`).
+		Description(`
+In order to have a different path for each object you should use function
+interpolations described [here](/docs/configuration/interpolation#bloblang-queries), which are
+calculated per message of a batch.
+
+Supports multiple authentication methods but only one of the following is required:
+- `+"`storage_connection_string`"+`
+- `+"`storage_account` and `storage_access_key`"+`
+- `+"`storage_account` and `storage_sas_token`"+`
+- `+"`storage_account` to access via [DefaultAzureCredential](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#DefaultAzureCredential)"+`
+
+If multiple are set then the `+"`storage_connection_string`"+` is given priority.
+
+If the `+"`storage_connection_string`"+` does not contain the `+"`AccountName`"+` parameter, please specify it in the
+`+"`storage_account`"+` field.`).
+		Fields(
+			service.NewInterpolatedStringField(fsFieldShareName).
+				Description("The file share for uploading the files to. It will be created if it doesn't already exist and the credentials have the necessary permissions.").
+				Example(`foo-share-${!timestamp("2006")}`),
+			service.NewInterpolatedStringField(fsFieldPath).
+				Description("The path of each file to upload.").
+				Example(`foo-${!timestamp_unix_nano()}.json`).
+				Example(`${!meta("kafka_key")}.json`).
+				Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`).
+				Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`),
+			service.NewOutputMaxInFlightField(),
+		)
+}
+
+// init registers the azure_file_share output and panics when the
+// registration fails.
+func init() {
+	err := service.RegisterOutput("azure_file_share", fsSpec(),
+		func(conf *service.ParsedConfig, mgr *service.Resources) (out service.Output, mif int, err error) {
+			var pConf fsConfig
+			if pConf, err = fsConfigFromParsed(conf); err != nil {
+				return
+			}
+			if mif, err = conf.FieldMaxInFlight(); err != nil {
+				return
+			}
+			if out, err = newAzureFileShareWriter(pConf, mgr.Logger()); err != nil {
+				return
+			}
+			return
+		})
+	if err != nil {
+		panic(err)
+	}
+}
+
+// azureFileShareWriter is the output implementation that uploads each message
+// as a file to an Azure file share.
+type azureFileShareWriter struct {
+	conf fsConfig
+	log  *service.Logger
+}
+
+// newAzureFileShareWriter returns a writer holding conf and log. The returned
+// error is always nil.
+func newAzureFileShareWriter(conf fsConfig, log *service.Logger) (*azureFileShareWriter, error) {
+	return &azureFileShareWriter{conf: conf, log: log}, nil
+}
+
+// Connect is a no-op: the share client is created when the configuration is
+// parsed.
+func (a *azureFileShareWriter) Connect(ctx context.Context) error {
+	return nil
+}
+
+// uploadFile writes message to the file at path inside the share addressed by
+// shareClient. path uses forward slashes; everything before the last slash is
+// the directory the file is created in.
+//
+// When the service reports that the parent directory is missing, the directory
+// and its ancestors are created and the file creation is retried once. Any
+// other error is returned as is so that callers can inspect its error code.
+func (a *azureFileShareWriter) uploadFile(ctx context.Context, shareClient *share.Client, path string, message []byte) error {
+	// Share paths are always slash separated, hence path rather than the OS
+	// dependent path/filepath.
+	directory, filename := stdpath.Dir(path), stdpath.Base(path)
+	dirClient := shareClient.NewDirectoryClient(directory)
+	fClient := dirClient.NewFileClient(filename)
+	size := int64(len(message))
+	if _, err := fClient.Create(ctx, size, nil); err != nil {
+		if !isFileErrorCode(err, fileerror.ParentNotFound) {
+			return err
+		}
+		if err := a.createDirectories(ctx, shareClient, directory); err != nil {
+			return err
+		}
+		if _, err := fClient.Create(ctx, size, nil); err != nil {
+			return err
+		}
+	}
+	if size == 0 {
+		// Create already produced the empty file, there is nothing to upload.
+		return nil
+	}
+	return fClient.UploadBuffer(ctx, message, nil)
+}
+
+// createDirectories creates directory and all of its ancestors inside the
+// share addressed by shareClient, from the outermost one inwards. Directories
+// that already exist are left untouched.
+func (a *azureFileShareWriter) createDirectories(ctx context.Context, shareClient *share.Client, directory string) error {
+	var current string
+	for _, segment := range strings.Split(directory, "/") {
+		if segment == "" || segment == "." {
+			continue
+		}
+		current = stdpath.Join(current, segment)
+		_, err := shareClient.NewDirectoryClient(current).Create(ctx, nil)
+		if err != nil && !isFileErrorCode(err, fileerror.ResourceAlreadyExists) {
+			return err
+		}
+	}
+	return nil
+}
+
+// createFileShare creates the share addressed by client.
+func (a *azureFileShareWriter) createFileShare(ctx context.Context, client *share.Client) error {
+	_, err := client.Create(ctx, &share.CreateOptions{})
+	return err
+}
+
+// getStorageSASURL returns the URL of the share shareName within the
+// configured storage account, followed by the storage account SAS token. The
+// token is appended verbatim, without adding a "?" separator.
+func (a *azureFileShareWriter) getStorageSASURL(shareName string) string {
+	return fmt.Sprintf("%s/%s%s", fmt.Sprintf(fileEndpointExp, a.conf.storageAccount), shareName, a.conf.storageSASToken)
+}
+
+// Write uploads msg as a file. The share name and the file path are
+// interpolated from msg; a per message share client is only built when a
+// storage account SAS token is used, otherwise the client created from the
+// configuration is used. A missing share is created and the upload retried
+// once.
+func (a *azureFileShareWriter) Write(ctx context.Context, msg *service.Message) error {
+	shareName, err := a.conf.ShareName.TryString(msg)
+	if err != nil {
+		return fmt.Errorf("file share name interpolation error: %w", err)
+	}
+	shareClient := a.conf.client
+	if a.conf.usesStorageSASToken && shareName != "" {
+		if shareClient, err = share.NewClientWithNoCredential(a.getStorageSASURL(shareName), nil); err != nil {
+			return fmt.Errorf("error getting SAS url with storage SAS token: %w", err)
+		}
+	}
+	fileName, err := a.conf.Path.TryString(msg)
+	if err != nil {
+		return fmt.Errorf("path interpolation error: %w", err)
+	}
+	mBytes, err := msg.AsBytes()
+	if err != nil {
+		return err
+	}
+	err = a.uploadFile(ctx, shareClient, fileName, mBytes)
+	if err == nil {
+		return nil
+	}
+	if !isFileErrorCode(err, fileerror.ShareNotFound) {
+		return fmt.Errorf("failed to upload file: %w", err)
+	}
+	// ShareAlreadyExists is tolerated, e.g. when the share appeared meanwhile.
+	if err := a.createFileShare(ctx, shareClient); err != nil && !isFileErrorCode(err, fileerror.ShareAlreadyExists) {
+		return fmt.Errorf("failed to create file share: %w", err)
+	}
+	if err := a.uploadFile(ctx, shareClient, fileName, mBytes); err != nil {
+		return fmt.Errorf("error retrying to upload file: %w", err)
+	}
+	return nil
+}
+
+// Close is a no-op.
+func (a *azureFileShareWriter) Close(context.Context) error {
+	return nil
+}
+
+// isFileErrorCode reports whether err is, or wraps, an azcore.ResponseError
+// carrying the given file service error code.
+func isFileErrorCode(err error, code fileerror.Code) bool {
+	var rerr *azcore.ResponseError
+	return errors.As(err, &rerr) && rerr.ErrorCode == string(code)
+}
diff --git a/internal/impl/azure/output_queue_storage.go b/internal/impl/azure/output_queue_storage.go
index 8acfcc179..fcb21f94c 100644
---
a/internal/impl/azure/output_queue_storage.go +++ b/internal/impl/azure/output_queue_storage.go @@ -53,7 +53,7 @@ func qsoConfigFromParsed(pConf *service.ParsedConfig) (conf qsoConfig, err error } func qsoSpec() *service.ConfigSpec { - return azureComponentSpec(false). + return azureComponentSpec(). Beta(). Version("3.36.0"). Summary(`Sends messages to an Azure Storage Queue.`). diff --git a/internal/impl/azure/output_table_storage.go b/internal/impl/azure/output_table_storage.go index 3a5bc1c97..11b6c2283 100644 --- a/internal/impl/azure/output_table_storage.go +++ b/internal/impl/azure/output_table_storage.go @@ -80,7 +80,7 @@ func tsoConfigFromParsed(pConf *service.ParsedConfig) (conf tsoConfig, err error } func tsoSpec() *service.ConfigSpec { - return azureComponentSpec(false). + return azureComponentSpec(). Beta(). Version("3.36.0"). Summary(`Stores messages in an Azure Table Storage table.`).