Skip to content

Commit

Permalink
Add azure_file_share output
Browse files Browse the repository at this point in the history
  • Loading branch information
mfamador committed Oct 4, 2024
1 parent b1e38d9 commit 575828c
Show file tree
Hide file tree
Showing 8 changed files with 289 additions and 7 deletions.
76 changes: 75 additions & 1 deletion internal/impl/azure/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"os"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"

"github.com/redpanda-data/benthos/v4/public/service"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
Expand All @@ -36,7 +38,7 @@ const (
bscFieldStorageConnectionString = "storage_connection_string"
)

func azureComponentSpec(forBlobStorage bool) *service.ConfigSpec {
func azureComponentSpec() *service.ConfigSpec {
spec := service.NewConfigSpec().
Categories("Services", "Azure").
Fields(
Expand Down Expand Up @@ -80,6 +82,29 @@ func blobStorageClientFromParsed(pConf *service.ParsedConfig, container *service
return getBlobStorageClient(connectionString, storageAccount, storageAccessKey, storageSASToken, container)
}

// fileShareClientFromParsed reads the storage credential fields from pConf
// and hands them, together with shareName, to getFileShareClient.
//
// The fields are read in a fixed order (connection string, account, access
// key, SAS token) and the first read error is returned as-is. When both the
// connection string and the storage account are empty the configuration is
// rejected before any client is built. The non-error results are exactly
// those returned by getFileShareClient.
func fileShareClientFromParsed(pConf *service.ParsedConfig, shareName string) (*share.Client, bool, bool, string, string, error) {
	fieldNames := [...]string{
		bscFieldStorageConnectionString,
		bscFieldStorageAccount,
		bscFieldStorageAccessKey,
		bscFieldStorageSASToken,
	}

	var values [len(fieldNames)]string
	for i, name := range fieldNames {
		value, err := pConf.FieldString(name)
		if err != nil {
			return nil, false, false, "", "", err
		}
		values[i] = value
	}

	connStr, account, accessKey, sasToken := values[0], values[1], values[2], values[3]
	if connStr == "" && account == "" {
		return nil, false, false, "", "", errors.New("invalid azure storage account credentials")
	}
	return getFileShareClient(connStr, account, accessKey, sasToken, shareName)
}

const (
blobEndpointExp = "https://%s.blob.core.windows.net"
)
Expand Down Expand Up @@ -127,6 +152,55 @@ func getBlobStorageClient(storageConnectionString, storageAccount, storageAccess
return client, containerSASToken, err
}

// fileEndpointExp is the format string for an Azure Files service endpoint;
// its single %s verb is filled with the storage account name.
const fileEndpointExp = "https://%s.file.core.windows.net"

// getFileShareClient builds a share client from whichever credential is
// supplied, checked in this order of priority:
//
//  1. storageConnectionString
//  2. storageAccount + storageAccessKey (shared key)
//  3. storageAccount + storageSASToken
//  4. storageAccount with the default Azure credential chain
//
// A SAS token starting with "sp=" is treated as a share-scoped token and is
// appended to the share URL as its query string; any other SAS token is
// treated as an account-scoped token and appended directly to the service
// endpoint.
//
// Returns, in order: the client, whether a share-scoped SAS token is used,
// whether an account-scoped SAS token is used, and (only for account-scoped
// SAS tokens) the storage account and the token so that callers can build
// per-share URLs later. On error the client is nil and all other results are
// zero values.
func getFileShareClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken, shareName string) (*share.Client, bool, bool, string, string, error) {
	var (
		client                            *share.Client
		err                               error
		usesStorageSASToken, fileSASToken bool
	)

	endpoint := fmt.Sprintf(fileEndpointExp, storageAccount)
	// A share client must address the share itself, not the bare account
	// endpoint, otherwise directory and file URLs derived from it are wrong.
	shareURL := endpoint
	if shareName != "" {
		shareURL = fmt.Sprintf("%s/%s", endpoint, shareName)
	}

	switch {
	case len(storageConnectionString) > 0:
		connStr := parseStorageConnectionString(storageConnectionString, storageAccount)
		client, err = share.NewClientFromConnectionString(connStr, shareName, nil)
	case len(storageAccessKey) > 0:
		cred, credErr := share.NewSharedKeyCredential(storageAccount, storageAccessKey)
		if credErr != nil {
			return nil, false, false, "", "", fmt.Errorf("error creating shared key credential: %w", credErr)
		}
		client, err = share.NewClientWithSharedKeyCredential(shareURL, cred, nil)
	case len(storageSASToken) > 0:
		var serviceURL string
		if strings.HasPrefix(storageSASToken, "sp=") {
			// file share SAS token
			fileSASToken = true
			serviceURL = fmt.Sprintf("%s/%s?%s", endpoint, shareName, storageSASToken)
		} else {
			// storage account SAS token
			usesStorageSASToken = true
			serviceURL = fmt.Sprintf("%s/%s", endpoint, storageSASToken)
		}
		client, err = share.NewClientWithNoCredential(serviceURL, nil)
	default:
		cred, credErr := azidentity.NewDefaultAzureCredential(nil)
		if credErr != nil {
			return nil, false, false, "", "", fmt.Errorf("error getting default Azure credentials: %w", credErr)
		}
		// NOTE(review): token-based access to Azure Files may additionally
		// require ClientOptions.FileRequestIntent to be set - confirm.
		client, err = share.NewClient(shareURL, cred, nil)
	}

	// Check the construction error before handing anything back so that a
	// failed client is never returned alongside populated metadata.
	if err != nil {
		return nil, false, false, "", "", fmt.Errorf("invalid azure storage account credentials: %w", err)
	}
	if usesStorageSASToken {
		return client, fileSASToken, usesStorageSASToken, storageAccount, storageSASToken, nil
	}
	return client, fileSASToken, usesStorageSASToken, "", "", nil
}

// getEmulatorConnectionString returns the Azurite connection string for the provided service ports
// Details here: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings
func getEmulatorConnectionString(blobServicePort, queueServicePort, tableServicePort string) string {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/azure/input_blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func bsiConfigFromParsed(pConf *service.ParsedConfig) (conf bsiConfig, err error
}

func bsiSpec() *service.ConfigSpec {
return azureComponentSpec(true).
return azureComponentSpec().
Beta().
Version("3.36.0").
Summary(`Downloads objects within an Azure Blob Storage container, optionally filtered by a prefix.`).
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/azure/input_queue_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func qsiConfigFromParsed(pConf *service.ParsedConfig) (conf qsiConfig, err error
}

func qsiSpec() *service.ConfigSpec {
return azureComponentSpec(false).
return azureComponentSpec().
Beta().
Version("3.42.0").
Summary(`Dequeue objects from an Azure Storage Queue.`).
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/azure/input_table_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func tsiConfigFromParsed(pConf *service.ParsedConfig) (conf tsiConfig, err error
}

func tsiSpec() *service.ConfigSpec {
return azureComponentSpec(false).
return azureComponentSpec().
Beta().
Version("4.10.0").
Summary(`Queries an Azure Storage Account Table, optionally with multiple filters.`).
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/azure/output_blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func bsoConfigFromParsed(pConf *service.ParsedConfig) (conf bsoConfig, err error
}

func bsoSpec() *service.ConfigSpec {
return azureComponentSpec(true).
return azureComponentSpec().
Beta().
Version("3.36.0").
Summary(`Sends message parts as objects to an Azure Blob Storage Account container. Each object is uploaded with the filename specified with the `+"`container`"+` field.`).
Expand Down
208 changes: 208 additions & 0 deletions internal/impl/azure/output_file_share.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
package azure

import (
"bytes"
"context"
"errors"
"fmt"
"path/filepath"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/fileerror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/redpanda-data/benthos/v4/public/service"
)

// Names of the configuration fields specific to the file share output.
const (
	// fsFieldShareName is the config key holding the target share name.
	fsFieldShareName = "share_name"
	// fsFieldPath is the config key holding the path of each uploaded file.
	fsFieldPath = "path"
)

// fsConfig is the parsed configuration of the file share output.
type fsConfig struct {
	// client is the share client built when the configuration is parsed.
	client *share.Client

	// storageSASToken and storageAccount are used to build a per-share URL
	// at write time when usesStorageSASToken is set.
	storageSASToken string
	storageAccount  string

	// usesStorageSASToken reports that an account-scoped SAS token is in
	// use, in which case a share client is created for each written message.
	usesStorageSASToken bool

	// ShareName is the interpolated name of the target share.
	ShareName *service.InterpolatedString
	// Path is the interpolated path of the file to upload.
	Path *service.InterpolatedString
}

// fsConfigFromParsed builds an fsConfig from the parsed output configuration.
//
// The share name is resolved once against an empty message in order to
// construct the share client. When the credentials turn out to be a
// share-scoped SAS token, the share name is replaced with an empty
// interpolation because the token already identifies the share.
func fsConfigFromParsed(pConf *service.ParsedConfig) (conf fsConfig, err error) {
	conf.ShareName, err = pConf.FieldInterpolatedString(fsFieldShareName)
	if err != nil {
		return conf, err
	}

	staticShareName, err := conf.ShareName.TryString(service.NewMessage([]byte("")))
	if err != nil {
		return conf, err
	}

	var shareScopedSAS bool
	conf.client, shareScopedSAS, conf.usesStorageSASToken, conf.storageAccount, conf.storageSASToken, err =
		fileShareClientFromParsed(pConf, staticShareName)
	if err != nil {
		return conf, err
	}

	if shareScopedSAS {
		// if using a share SAS token, the share name is already implicit
		conf.ShareName, _ = service.NewInterpolatedString("")
	}

	conf.Path, err = pConf.FieldInterpolatedString(fsFieldPath)
	return conf, err
}

// fsSpec returns the configuration spec of the azure_file_share output: the
// shared Azure component fields plus the share name, the file path and the
// max-in-flight setting.
func fsSpec() *service.ConfigSpec {
	const summary = `Sends message parts as objects to an Azure File Share. Each object is uploaded with the filename specified with the ` + "`path`" + ` field.`

	const description = `
In order to have a different path for each object you should use function
interpolations described [here](/docs/configuration/interpolation#bloblang-queries), which are
calculated per message of a batch.
Supports multiple authentication methods but only one of the following is required:
- ` + "`storage_connection_string`" + `
- ` + "`storage_account` and `storage_access_key`" + `
- ` + "`storage_account` and `storage_sas_token`" + `
- ` + "`storage_account` to access via [DefaultAzureCredential](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#DefaultAzureCredential)" + `
If multiple are set then the ` + "`storage_connection_string`" + ` is given priority.
If the ` + "`storage_connection_string`" + ` does not contain the ` + "`AccountName`" + ` parameter, please specify it in the
` + "`storage_account`" + ` field.`

	spec := azureComponentSpec().
		Beta().
		Version("4.24.0").
		Summary(summary).
		Description(description)

	shareNameField := service.NewInterpolatedStringField(fsFieldShareName).
		Description("The file share for uploading the files to. It will be created if it doesn't already exist and the credentials have the necessary permissions.").
		Example(`foo-share-${!timestamp("2006")}`)

	pathField := service.NewInterpolatedStringField(fsFieldPath).
		Description("The path of each file to upload.").
		Example(`foo-${!timestamp_unix_nano()}.json`).
		Example(`${!meta("kafka_key")}.json`).
		Example(`${!json("doc.namespace")}/${!json("doc.id")}.json`).
		Default(`${!count("files")}-${!timestamp_unix_nano()}.txt`)

	return spec.Fields(
		shareNameField,
		pathField,
		service.NewOutputMaxInFlightField(),
	)
}

// init registers the azure_file_share output and panics if the registration
// is rejected.
func init() {
	// build parses the configuration and constructs the writer together with
	// its max-in-flight setting.
	build := func(conf *service.ParsedConfig, mgr *service.Resources) (service.Output, int, error) {
		writerConf, err := fsConfigFromParsed(conf)
		if err != nil {
			return nil, 0, err
		}
		maxInFlight, err := conf.FieldMaxInFlight()
		if err != nil {
			return nil, maxInFlight, err
		}
		writer, err := newAzureFileShareWriter(writerConf, mgr.Logger())
		return writer, maxInFlight, err
	}

	if err := service.RegisterOutput("azure_file_share", fsSpec(), build); err != nil {
		panic(err)
	}
}

// azureFileShareWriter is the output implementation that uploads each
// message as a file to an Azure file share.
type azureFileShareWriter struct {
	// conf is the parsed output configuration, including the share client.
	conf fsConfig
	// log is the logger handed over at construction time.
	log *service.Logger
}

// newAzureFileShareWriter wraps conf and log in a writer. It currently never
// fails; the error result is always nil.
func newAzureFileShareWriter(conf fsConfig, log *service.Logger) (*azureFileShareWriter, error) {
	return &azureFileShareWriter{conf: conf, log: log}, nil
}

// Connect is a no-op: it performs no work and always returns nil.
func (a *azureFileShareWriter) Connect(_ context.Context) error {
	return nil
}

// uploadFile creates the file at path inside the share addressed by
// shareClient and uploads message as its content.
//
// path is split into a directory and a file name. A path without a directory
// component is written to the root of the share. If the service reports that
// the parent directory is missing, every level of the directory is created
// and the file creation is retried once. An empty message results in an empty
// file and no upload call.
//
// Any other service error (including a missing share) is returned unchanged
// so that callers can inspect its error code.
func (a *azureFileShareWriter) uploadFile(ctx context.Context, shareClient *share.Client, path string, message []byte) error {
	// Share paths are slash separated regardless of the host OS.
	directory := filepath.ToSlash(filepath.Dir(path))
	filename := filepath.Base(path)

	dirClient := shareClient.NewRootDirectoryClient()
	if directory != "." && directory != "/" {
		dirClient = shareClient.NewDirectoryClient(directory)
	}
	fClient := dirClient.NewFileClient(filename)

	size := int64(len(message))
	if _, err := fClient.Create(ctx, size, nil); err != nil {
		if !isFileErrorCode(err, fileerror.ParentNotFound) {
			return err
		}
		if err := a.ensureDirectories(ctx, shareClient, directory); err != nil {
			return err
		}
		// The first attempt failed before the file existed, so it has to be
		// created again now that its parent directory is in place.
		if _, err := fClient.Create(ctx, size, nil); err != nil {
			return err
		}
	}

	// Create already produced a zero-length file; there is nothing to upload.
	if size == 0 {
		return nil
	}
	return fClient.UploadStream(ctx, bytes.NewReader(message), nil)
}

// ensureDirectories creates every level of the slash-separated directory
// inside the share, treating levels that already exist as success.
func (a *azureFileShareWriter) ensureDirectories(ctx context.Context, shareClient *share.Client, directory string) error {
	for end := 1; end <= len(directory); end++ {
		// Only act at the end of a path segment.
		if end < len(directory) && directory[end] != '/' {
			continue
		}
		prefix := directory[:end]
		if prefix[len(prefix)-1] == '/' {
			// Empty segment (e.g. a leading slash); nothing to create.
			continue
		}
		_, err := shareClient.NewDirectoryClient(prefix).Create(ctx, nil)
		if err != nil && !isFileErrorCode(err, fileerror.ResourceAlreadyExists) {
			return err
		}
	}
	return nil
}

// createFileShare issues a create request for the share addressed by client,
// using default options, and returns the request's error unchanged.
func (a *azureFileShareWriter) createFileShare(ctx context.Context, client *share.Client) error {
	opts := &share.CreateOptions{}
	if _, err := client.Create(ctx, opts); err != nil {
		return err
	}
	return nil
}

// getStorageSASURL returns the URL of shareName on the configured storage
// account with the configured SAS token appended directly after the share
// name (no separator is inserted between them).
func (a *azureFileShareWriter) getStorageSASURL(shareName string) string {
	endpoint := fmt.Sprintf(fileEndpointExp, a.conf.storageAccount)
	return endpoint + "/" + shareName + a.conf.storageSASToken
}

// Write uploads msg as a single file to the file share.
//
// The share name and the file path are interpolated per message. When an
// account-scoped SAS token is in use and the share name is non-empty, a share
// client is built for that share; otherwise the client created at
// configuration time is used. If the upload fails because the share does not
// exist, the share is created (an "already exists" response is tolerated) and
// the upload is retried once.
//
// Returned errors wrap the underlying cause so that callers can still match
// it with errors.Is / errors.As.
func (a *azureFileShareWriter) Write(ctx context.Context, msg *service.Message) error {
	shareName, err := a.conf.ShareName.TryString(msg)
	if err != nil {
		return fmt.Errorf("file share name interpolation error: %w", err)
	}

	shareClient := a.conf.client
	if a.conf.usesStorageSASToken && shareName != "" {
		if shareClient, err = share.NewClientWithNoCredential(a.getStorageSASURL(shareName), nil); err != nil {
			return fmt.Errorf("error getting SAS url with storage SAS token: %w", err)
		}
	}

	fileName, err := a.conf.Path.TryString(msg)
	if err != nil {
		return fmt.Errorf("path interpolation error: %w", err)
	}
	mBytes, err := msg.AsBytes()
	if err != nil {
		return err
	}

	err = a.uploadFile(ctx, shareClient, fileName, mBytes)
	if err == nil {
		return nil
	}
	if !isFileErrorCode(err, fileerror.ShareNotFound) {
		return fmt.Errorf("failed to upload file: %w", err)
	}

	// The share is missing: create it, tolerating a concurrent creation, and
	// try the upload once more.
	if err := a.createFileShare(ctx, shareClient); err != nil && !isFileErrorCode(err, fileerror.ShareAlreadyExists) {
		return fmt.Errorf("failed to create file share: %w", err)
	}
	if err := a.uploadFile(ctx, shareClient, fileName, mBytes); err != nil {
		return fmt.Errorf("error retrying to upload file: %w", err)
	}
	return nil
}

// Close is a no-op: the writer releases nothing and always returns nil.
func (a *azureFileShareWriter) Close(_ context.Context) error {
	return nil
}

// isFileErrorCode reports whether err is, or wraps, an *azcore.ResponseError
// whose ErrorCode equals code. It returns false for any other error.
func isFileErrorCode(err error, code fileerror.Code) bool {
	var respErr *azcore.ResponseError
	if !errors.As(err, &respErr) {
		return false
	}
	return respErr.ErrorCode == string(code)
}
2 changes: 1 addition & 1 deletion internal/impl/azure/output_queue_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func qsoConfigFromParsed(pConf *service.ParsedConfig) (conf qsoConfig, err error
}

func qsoSpec() *service.ConfigSpec {
return azureComponentSpec(false).
return azureComponentSpec().
Beta().
Version("3.36.0").
Summary(`Sends messages to an Azure Storage Queue.`).
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/azure/output_table_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func tsoConfigFromParsed(pConf *service.ParsedConfig) (conf tsoConfig, err error
}

func tsoSpec() *service.ConfigSpec {
return azureComponentSpec(false).
return azureComponentSpec().
Beta().
Version("3.36.0").
Summary(`Stores messages in an Azure Table Storage table.`).
Expand Down

0 comments on commit 575828c

Please sign in to comment.