Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: refactor azureeventhubrehydrationreceiver to stream blobs as to not lock up on larger environments (BPOP-831) #2098

Open
wants to merge 7 commits into
base: release/v1.69.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 14 additions & 79 deletions receiver/azureblobrehydrationreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,25 +13,26 @@ This is not a traditional receiver that continually produces data but rather reh
- Traces

## How it works
1. The receiver polls blob storage for all blobs in the specified container.
1. The receiver polls blob storage for pages of blobs in the specified container. There is no current way of specifying a time range to rehydrate so any blobs outside fo the time range still need to be retrieved from the API in order to filter via the `starting_time` and `ending_time` configuration.
2. The receiver will parse each blob's path to determine if it matches a path created by the [Azure Blob Exporter](../../exporter/azureblobexporter/README.md#blob-path).
3. If the blob path is from the exporter, the receiver will parse the timestamp represented by the path.
4. If the timestamp is within the configured range the receiver will download the blob and parse its contents into OTLP data.

a. The receiver will process both uncompressed JSON blobs and blobs compressed with gzip.

## Configuration
| Field | Type | Default | Required | Description |
|--------------------|-----------|------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. |
| container | string | | `true` | The name of the container to rehydrate from. |
| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. |
| starting_time | string | | `true ` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| ending_time | string | | `true ` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| delete_on_read | bool | `false` | `false ` | If `true` the blob will be deleted after being rehydrated. |
| poll_interval | string | `1m` | `false ` | How often to read a new set of blobs. This value is mostly to control how often the blob API is called to ensure once rehydration is done the receiver isn't making too many API calls. |
| poll_timeout | string | `30s` | `false ` | The timeout used when reading blobs from Azure. |
| storage | string | | `false ` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. |

| Field | Type | Default | Required | Description |
|-------|------|---------|----------|-------------|
| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. |
| container | string | | `true` | The name of the container to rehydrate from. |
| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. |
| starting_time | string | | `true` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| ending_time | string | | `true` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
| delete_on_read | bool | `false` | `false` | If `true` the blob will be deleted after being rehydrated. |
| storage | string | | `false` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. |
| batch_size | int | `100` | `false` | The number of blobs to continue processing in the pipeline before sending more data to the pipeline. |
| page_size | int | `1000` | `false` | The maximum number of blobs to request in a single API call. |

## Example Configuration

Expand All @@ -40,70 +41,4 @@ This is not a traditional receiver that continually produces data but rather reh
This configuration specifies a `connection_string`, `container`, `starting_time`, and `ending_time`.
This will rehydrate all blobs in the container `my-container` that have a path that represents they were created between `1:00pm` and `2:30pm` UTC time on `October 1, 2023`.

Such a path could look like the following:
```
year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json
year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json
year=2023/month=10/day=01/hour=13/minute=30/traces_12345.json
```

```yaml
azureblobrehydration:
connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
container: "my-container"
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
```

### Using Storage Extension Configuration

This configuration shows using a storage extension to track rehydration progress over agent restarts. The `storage` field is set to the component ID of the storage extension.


```yaml
extensions:
file_storage:
directory: $OIQ_OTEL_COLLECTOR_HOME/storage

receivers:
azureblobrehydration:
connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
container: "my-container"
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
storage: "file_storage"
```

### Root Folder Configuration

This configuration specifies an additional field `root_folder` to match the `root_folder` value of the Azure Blob Exporter.
The `root_folder` value in the exporter will prefix the blob path with the root folder and it needs to be accounted for in the rehydration receiver.

Such a path could look like the following:
```
root/year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json
root/year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json
root/year=2023/month=10/day=01/hour=13/minute=30/traces_12345.json
```

```yaml
azureblobrehydration:
connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
container: "my-container"
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
root_folder: "root"
```

### Delete on read Configuration

This configuration enables the `delete_on_read` functionality which will delete a blob from Azure after it has been successfully rehydrated into OTLP data and sent onto the next component in the pipeline.

```yaml
azureblobrehydration:
connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
container: "my-container"
starting_time: 2023-10-01T13:00
ending_time: 2023-10-01T14:30
delete_on_read: true
```
Such a path could look like the following:
23 changes: 11 additions & 12 deletions receiver/azureblobrehydrationreceiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (

// Config is the configuration for the azure blob rehydration receiver
type Config struct {
// BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 1000)
BatchSize int `mapstructure:"batch_size"`

// ConnectionString is the Azure Blob Storage connection key,
// which can be found in the Azure Blob Storage resource on the Azure Portal. (no default)
ConnectionString string `mapstructure:"connection_string"`
Expand All @@ -45,19 +48,19 @@ type Config struct {
// Default value of false
DeleteOnRead bool `mapstructure:"delete_on_read"`

// PollInterval is the interval at which the Azure API is scanned for blobs.
// Default value of 1m
PollInterval time.Duration `mapstructure:"poll_interval"`

// PollTimeout is the timeout for the Azure API to scan for blobs.
PollTimeout time.Duration `mapstructure:"poll_timeout"`
// PageSize is the number of blobs to request from the Azure API at a time. (default 1000)
PageSize int `mapstructure:"page_size"`

// ID of the storage extension to use for storing progress
StorageID *component.ID `mapstructure:"storage"`
}

// Validate validates the config
func (c *Config) Validate() error {
if c.BatchSize < 1 {
return errors.New("batch_size must be greater than 0")
}

if c.ConnectionString == "" {
return errors.New("connection_string is required")
}
Expand All @@ -81,12 +84,8 @@ func (c *Config) Validate() error {
return errors.New("ending_time must be at least one minute after starting_time")
}

if c.PollInterval < time.Second {
return errors.New("poll_interval must be at least one second")
}

if c.PollTimeout < time.Second {
return errors.New("poll_timeout must be at least one second")
if c.PageSize < 1 {
return errors.New("page_size must be greater than 0")
}

return nil
Expand Down
49 changes: 24 additions & 25 deletions receiver/azureblobrehydrationreceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote
import (
"errors"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand All @@ -37,8 +36,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("connection_string is required"),
},
Expand All @@ -51,8 +50,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("container is required"),
},
Expand All @@ -65,8 +64,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("starting_time is invalid: missing value"),
},
Expand All @@ -79,8 +78,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("ending_time is invalid: missing value"),
},
Expand All @@ -93,8 +92,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "invalid_time",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("starting_time is invalid: invalid timestamp"),
},
Expand All @@ -107,8 +106,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "invalid_time",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("ending_time is invalid: invalid timestamp"),
},
Expand All @@ -121,38 +120,38 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T16:00",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: errors.New("ending_time must be at least one minute after starting_time"),
},
{
desc: "Bad poll_interval",
desc: "Bad batch_size",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Millisecond,
PollTimeout: time.Second * 10,
BatchSize: 0,
PageSize: 1000,
},
expectErr: errors.New("poll_interval must be at least one second"),
expectErr: errors.New("batch_size must be greater than 0"),
},
{
desc: "Bad poll_timeout",
desc: "Bad page_size",
cfg: &Config{
ConnectionString: "connection_string",
Container: "container",
RootFolder: "root",
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second * 2,
PollTimeout: time.Millisecond,
BatchSize: 100,
PageSize: 0,
},
expectErr: errors.New("poll_timeout must be at least one second"),
expectErr: errors.New("page_size must be greater than 0"),
},
{
desc: "Valid config",
Expand All @@ -163,8 +162,8 @@ func TestConfigValidate(t *testing.T) {
StartingTime: "2023-10-02T17:00",
EndingTime: "2023-10-02T17:01",
DeleteOnRead: false,
PollInterval: time.Second,
PollTimeout: time.Second * 10,
BatchSize: 100,
PageSize: 1000,
},
expectErr: nil,
},
Expand Down
5 changes: 2 additions & 3 deletions receiver/azureblobrehydrationreceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote
import (
"context"
"errors"
"time"

"github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/metadata"
"go.opentelemetry.io/collector/component"
Expand All @@ -43,8 +42,8 @@ func NewFactory() receiver.Factory {
func createDefaultConfig() component.Config {
return &Config{
DeleteOnRead: false,
PollInterval: time.Minute,
PollTimeout: time.Second * 30,
BatchSize: 100,
PageSize: 1000,
}
}

Expand Down
5 changes: 2 additions & 3 deletions receiver/azureblobrehydrationreceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,15 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func Test_createDefaultConfig(t *testing.T) {
expectedCfg := &Config{
DeleteOnRead: false,
PollInterval: time.Minute,
PollTimeout: time.Second * 30,
BatchSize: 100,
PageSize: 1000,
}

componentCfg := createDefaultConfig()
Expand Down
Loading