Skip to content

Commit

Permalink
Add azure_file_share output
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Amador <[email protected]>
  • Loading branch information
mfamador committed Oct 4, 2024
1 parent b1e38d9 commit 0c054dd
Show file tree
Hide file tree
Showing 12 changed files with 436 additions and 7 deletions.
141 changes: 141 additions & 0 deletions docs/modules/components/pages/outputs/azure_file_share.adoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
= azure_file_share
:type: output
:status: beta
:categories: ["Services","Azure"]



////
THIS FILE IS AUTOGENERATED!

To make changes, edit the corresponding source file under:

https://github.com/redpanda-data/connect/tree/main/internal/impl/<provider>.

And:

https://github.com/redpanda-data/connect/tree/main/cmd/tools/docs_gen/templates/plugin.adoc.tmpl
////
// © 2024 Redpanda Data Inc.
component_type_dropdown::[]
Sends message parts as objects to an Azure File Share. Each object is uploaded with the filename specified with the `path` field.
Introduced in version 4.38.0.
```yml
# Config fields, showing default values
output:
label: ""
azure_file_share:
storage_account: ""
storage_access_key: ""
storage_connection_string: ""
storage_sas_token: ""
share_name: foo-share-${!timestamp("2006")} # No default (required)
path: ${!count("files")}-${!timestamp_unix_nano()}.txt
max_in_flight: 64
```
In order to have a different path for each object you should use function
interpolations described [here](/docs/configuration/interpolation#bloblang-queries), which are
calculated per message of a batch.
Supports multiple authentication methods but only one of the following is required:
- `storage_connection_string`
- `storage_account` and `storage_access_key`
- `storage_account` and `storage_sas_token`
- `storage_account` to access via [DefaultAzureCredential](https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#DefaultAzureCredential)
If multiple are set then the `storage_connection_string` is given priority.
If the `storage_connection_string` does not contain the `AccountName` parameter, please specify it in the
`storage_account` field.
== Fields
=== `storage_account`
The storage account to access. This field is ignored if `storage_connection_string` is set.
*Type*: `string`
*Default*: `""`
=== `storage_access_key`
The storage account access key. This field is ignored if `storage_connection_string` is set.
*Type*: `string`
*Default*: `""`
=== `storage_connection_string`
A storage account connection string. This field is required if `storage_account` and `storage_access_key` / `storage_sas_token` are not set.
*Type*: `string`
*Default*: `""`
=== `storage_sas_token`
The storage account SAS token. This field is ignored if `storage_connection_string` or `storage_access_key` are set.
*Type*: `string`
*Default*: `""`
=== `share_name`
The file share for uploading the files to. It will be created if it doesn't already exist and the credentials have the necessary permissions.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
```yml
# Examples
share_name: foo-share-${!timestamp("2006")}
```
=== `path`
The path of each file to upload.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
*Default*: `"${!count(\"files\")}-${!timestamp_unix_nano()}.txt"`
```yml
# Examples
path: foo-${!timestamp_unix_nano()}.json
path: ${!meta("kafka_key")}.json
path: ${!json("doc.namespace")}/${!json("doc.id")}.json
```
=== `max_in_flight`
The maximum number of messages to have in flight at a given time. Increase this to improve throughput.
*Type*: `int`
*Default*: `64`
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v1.0.3
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.3.1
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0
github.com/Azure/go-amqp v1.0.5
github.com/ClickHouse/clickhouse-go/v2 v2.27.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.6.0/go.mod h1:oDrbWx4ewMylP7xHivfgixbfGBT6APAwsSoHRKotnIc=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0 h1:Be6KInmFEKV81c0pOAEbRYehLMwmmGI1exuFj248AMk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.4.0/go.mod h1:WCPBHsOXfBVnivScjs2ypRfimjEW0qPVLGgJkZlrIOA=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.3.1 h1:a1U6j4GPI18JQCqgz7/DcqXA1vzvGBugm14AXZfU0gs=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.3.1/go.mod h1:tZyRNcHi2/yo+ugYHTUuOrHiboKilaizLnRL5aZTe6A=
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0 h1:lJwNFV+xYjHREUTHJKx/ZF6CJSt9znxmLw9DqSTvyRU=
github.com/Azure/azure-sdk-for-go/sdk/storage/azqueue v1.0.0/go.mod h1:GfT0aGew8Qj5yiQVqOO5v7N8fanbJGyUoHqXg56qcVY=
github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck=
Expand Down
76 changes: 75 additions & 1 deletion internal/impl/azure/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"os"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azfile/share"

"github.com/redpanda-data/benthos/v4/public/service"

"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
Expand All @@ -36,7 +38,7 @@ const (
bscFieldStorageConnectionString = "storage_connection_string"
)

func azureComponentSpec(forBlobStorage bool) *service.ConfigSpec {
func azureComponentSpec() *service.ConfigSpec {
spec := service.NewConfigSpec().
Categories("Services", "Azure").
Fields(
Expand Down Expand Up @@ -80,6 +82,29 @@ func blobStorageClientFromParsed(pConf *service.ParsedConfig, container *service
return getBlobStorageClient(connectionString, storageAccount, storageAccessKey, storageSASToken, container)
}

func fileShareClientFromParsed(pConf *service.ParsedConfig, shareName string) (*share.Client, bool, bool, string, string, error) {
connectionString, err := pConf.FieldString(bscFieldStorageConnectionString)
if err != nil {
return nil, false, false, "", "", err
}
storageAccount, err := pConf.FieldString(bscFieldStorageAccount)
if err != nil {
return nil, false, false, "", "", err
}
storageAccessKey, err := pConf.FieldString(bscFieldStorageAccessKey)
if err != nil {
return nil, false, false, "", "", err
}
storageSASToken, err := pConf.FieldString(bscFieldStorageSASToken)
if err != nil {
return nil, false, false, "", "", err
}
if storageAccount == "" && connectionString == "" {
return nil, false, false, "", "", errors.New("invalid azure storage account credentials")
}
return getFileShareClient(connectionString, storageAccount, storageAccessKey, storageSASToken, shareName)
}

const (
blobEndpointExp = "https://%s.blob.core.windows.net"
)
Expand Down Expand Up @@ -127,6 +152,55 @@ func getBlobStorageClient(storageConnectionString, storageAccount, storageAccess
return client, containerSASToken, err
}

const (
fileEndpointExp = "https://%s.file.core.windows.net"
)

func getFileShareClient(storageConnectionString, storageAccount, storageAccessKey, storageSASToken, shareName string) (*share.Client, bool, bool, string, string, error) {
var (
client *share.Client
err error
usesStorageSASToken, fileSASToken bool
)
if len(storageConnectionString) > 0 {
storageConnectionString := parseStorageConnectionString(storageConnectionString, storageAccount)
client, err = share.NewClientFromConnectionString(storageConnectionString, shareName, nil)
} else if len(storageAccessKey) > 0 {
cred, credErr := share.NewSharedKeyCredential(storageAccount, storageAccessKey)
if credErr != nil {
return nil, false, false, "", "", fmt.Errorf("error creating shared key credential: %w", credErr)
}
serviceURL := fmt.Sprintf(fileEndpointExp, storageAccount)
client, err = share.NewClientWithSharedKeyCredential(serviceURL, cred, nil)
} else if len(storageSASToken) > 0 {
var serviceURL string
if strings.HasPrefix(storageSASToken, "sp=") {
// file share SAS token
fileSASToken = true
serviceURL = fmt.Sprintf("%s/%s?%s", fmt.Sprintf(fileEndpointExp, storageAccount), shareName, storageSASToken)
} else {
// storage account SAS token
usesStorageSASToken = true
serviceURL = fmt.Sprintf("%s/%s", fmt.Sprintf(fileEndpointExp, storageAccount), storageSASToken)
}
client, err = share.NewClientWithNoCredential(serviceURL, nil)
if usesStorageSASToken {
return client, fileSASToken, usesStorageSASToken, storageAccount, storageSASToken, err
}
} else {
cred, credErr := azidentity.NewDefaultAzureCredential(nil)
if credErr != nil {
return nil, false, false, "", "", fmt.Errorf("error getting default Azure credentials: %v", credErr)
}
serviceURL := fmt.Sprintf(fileEndpointExp, storageAccount)
client, err = share.NewClient(serviceURL, cred, nil)
}
if err != nil {
return nil, false, false, "", "", fmt.Errorf("invalid azure storage account credentials: %v", err)
}
return client, fileSASToken, usesStorageSASToken, "", "", err
}

// getEmulatorConnectionString returns the Azurite connection string for the provided service ports
// Details here: https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite?tabs=visual-studio#http-connection-strings
func getEmulatorConnectionString(blobServicePort, queueServicePort, tableServicePort string) string {
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/azure/input_blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func bsiConfigFromParsed(pConf *service.ParsedConfig) (conf bsiConfig, err error
}

func bsiSpec() *service.ConfigSpec {
return azureComponentSpec(true).
return azureComponentSpec().
Beta().
Version("3.36.0").
Summary(`Downloads objects within an Azure Blob Storage container, optionally filtered by a prefix.`).
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/azure/input_queue_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func qsiConfigFromParsed(pConf *service.ParsedConfig) (conf qsiConfig, err error
}

func qsiSpec() *service.ConfigSpec {
return azureComponentSpec(false).
return azureComponentSpec().
Beta().
Version("3.42.0").
Summary(`Dequeue objects from an Azure Storage Queue.`).
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/azure/input_table_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func tsiConfigFromParsed(pConf *service.ParsedConfig) (conf tsiConfig, err error
}

func tsiSpec() *service.ConfigSpec {
return azureComponentSpec(false).
return azureComponentSpec().
Beta().
Version("4.10.0").
Summary(`Queries an Azure Storage Account Table, optionally with multiple filters.`).
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/azure/output_blob_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func bsoConfigFromParsed(pConf *service.ParsedConfig) (conf bsoConfig, err error
}

func bsoSpec() *service.ConfigSpec {
return azureComponentSpec(true).
return azureComponentSpec().
Beta().
Version("3.36.0").
Summary(`Sends message parts as objects to an Azure Blob Storage Account container. Each object is uploaded with the filename specified with the `+"`container`"+` field.`).
Expand Down
Loading

0 comments on commit 0c054dd

Please sign in to comment.