diff --git a/receiver/azureblobrehydrationreceiver/README.md b/receiver/azureblobrehydrationreceiver/README.md
index 906b9ecb9..697d90946 100644
--- a/receiver/azureblobrehydrationreceiver/README.md
+++ b/receiver/azureblobrehydrationreceiver/README.md
@@ -13,7 +13,7 @@ This is not a traditional receiver that continually produces data but rather reh
 - Traces
 
 ## How it works
-1. The receiver polls blob storage for all blobs in the specified container.
+1. The receiver polls blob storage for pages of blobs in the specified container. There is currently no way to request only a specific time range from the API, so blobs outside of the range must still be retrieved and then filtered using the `starting_time` and `ending_time` configuration.
 2. The receiver will parse each blob's path to determine if it matches a path created by the [Azure Blob Exporter](../../exporter/azureblobexporter/README.md#blob-path).
 3. If the blob path is from the exporter, the receiver will parse the timestamp represented by the path.
 4. If the timestamp is within the configured range the receiver will download the blob and parse its contents into OTLP data.
@@ -21,17 +21,18 @@ This is not a traditional receiver that continually produces data but rather reh
     a. The receiver will process both uncompressed JSON blobs and blobs compressed with gzip.
 
 ## Configuration
-| Field | Type | Default | Required | Description |
-|--------------------|-----------|------------------|----------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. |
-| container | string | | `true` | The name of the container to rehydrate from. |
-| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. |
-| starting_time | string | | `true ` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
-| ending_time | string | | `true ` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
-| delete_on_read | bool | `false` | `false ` | If `true` the blob will be deleted after being rehydrated. |
-| poll_interval | string | `1m` | `false ` | How often to read a new set of blobs. This value is mostly to control how often the blob API is called to ensure once rehydration is done the receiver isn't making too many API calls. |
-| poll_timeout | string | `30s` | `false ` | The timeout used when reading blobs from Azure. |
-| storage | string | | `false ` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. |
+
+| Field | Type | Default | Required | Description |
+|-------|------|---------|----------|-------------|
+| connection_string | string | | `true` | The connection string to the Azure Blob Storage account. Can be found under the `Access keys` section of your storage account. |
+| container | string | | `true` | The name of the container to rehydrate from. |
+| root_folder | string | | `false` | The root folder that prefixes the blob path. Should match the `root_folder` value of the Azure Blob Exporter. |
+| starting_time | string | | `true` | The UTC start time that represents the start of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
+| ending_time | string | | `true` | The UTC end time that represents the end of the time range to rehydrate from. Must be in the form `YYYY-MM-DDTHH:MM`. |
+| delete_on_read | bool | `false` | `false` | If `true` the blob will be deleted after being rehydrated. |
+| storage | string | | `false` | The component ID of a storage extension. The storage extension prevents duplication of data after a collector restart by remembering which blobs were previously rehydrated. |
+| batch_size | int | `100` | `false` | The number of blobs to download and send to the pipeline in a single batch. |
+| page_size | int | `1000` | `false` | The maximum number of blobs to request in a single API call. |
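+
+For example, a configuration that tunes the new batching options might look like the following (illustrative values; the connection string is a placeholder):
+
+```yaml
+azureblobrehydration:
+  connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
+  container: "my-container"
+  starting_time: 2023-10-01T13:00
+  ending_time: 2023-10-01T14:30
+  batch_size: 100
+  page_size: 1000
+```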
 
 ## Example Configuration
 
@@ -40,70 +41,20 @@ This is not a traditional receiver that continually produces data but rather reh
 
 This configuration specifies a `connection_string`, `container`, `starting_time`, and `ending_time`.
 
 This will rehydrate all blobs in the container `my-container` that have a path that represents they were created between `1:00pm` and `2:30pm` UTC time on `October 1, 2023`.
 
-Such a path could look like the following:
-```
-year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json
-year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json
-year=2023/month=10/day=01/hour=13/minute=30/traces_12345.json
-```
-
-```yaml
-azureblobrehydration:
-  connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
-  container: "my-container"
-  starting_time: 2023-10-01T13:00
-  ending_time: 2023-10-01T14:30
-```
-
-### Using Storage Extension Configuration
-
-This configuration shows using a storage extension to track rehydration progress over agent restarts. The `storage` field is set to the component ID of the storage extension.
-
-
-```yaml
-extensions:
-  file_storage:
-    directory: $OIQ_OTEL_COLLECTOR_HOME/storage
-
-receivers:
-  azureblobrehydration:
-    connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
-    container: "my-container"
-    starting_time: 2023-10-01T13:00
-    ending_time: 2023-10-01T14:30
-    storage: "file_storage"
-```
-
-### Root Folder Configuration
-
-This configuration specifies an additional field `root_folder` to match the `root_folder` value of the Azure Blob Exporter.
-The `root_folder` value in the exporter will prefix the blob path with the root folder and it needs to be accounted for in the rehydration receiver.
-
-Such a path could look like the following:
-```
-root/year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json
-root/year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json
-root/year=2023/month=10/day=01/hour=13/minute=30/traces_12345.json
-```
-
-```yaml
-azureblobrehydration:
-  connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
-  container: "my-container"
-  starting_time: 2023-10-01T13:00
-  ending_time: 2023-10-01T14:30
-  root_folder: "root"
-```
-
-### Delete on read Configuration
-
-This configuration enables the `delete_on_read` functionality which will delete a blob from Azure after it has been successfully rehydrated into OTLP data and sent onto the next component in the pipeline.
-
-```yaml
-azureblobrehydration:
-  connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
-  container: "my-container"
-  starting_time: 2023-10-01T13:00
-  ending_time: 2023-10-01T14:30
-  delete_on_read: true
-```
+Such a path could look like the following:
+
+```
+year=2023/month=10/day=01/hour=13/minute=30/metrics_12345.json
+year=2023/month=10/day=01/hour=13/minute=30/logs_12345.json
+year=2023/month=10/day=01/hour=13/minute=30/traces_12345.json
+```
+
+```yaml
+azureblobrehydration:
+  connection_string: "DefaultEndpointsProtocol=https;AccountName=storage_account_name;AccountKey=storage_account_key;EndpointSuffix=core.windows.net"
+  container: "my-container"
+  starting_time: 2023-10-01T13:00
+  ending_time: 2023-10-01T14:30
+```
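As a rough usage sketch (not part of the patch), a caller could drive the new channel-based `StreamBlobs` API from `internal/azureblob` as shown below. The `consumeStream` function and its `handle` callback are hypothetical, and because the package is `internal` this only compiles inside the receiver's module; it mirrors the select loop the receiver itself uses:

```go
package azureblobrehydrationreceiver

import (
	"context"

	"github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob"
)

// consumeStream drives StreamBlobs until the stream errors, finishes, or the
// context is canceled. handle stands in for the receiver's blob processing.
func consumeStream(ctx context.Context, client azureblob.BlobClient, container string, handle func([]*azureblob.BlobInfo)) error {
	blobChan := make(chan *azureblob.BlobResults)
	errChan := make(chan error)
	doneChan := make(chan struct{})

	// The client closes doneChan once there are no more blobs to deliver.
	go client.StreamBlobs(ctx, container, nil, errChan, blobChan, doneChan)

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case err := <-errChan:
			// An item on errChan means the stream should be stopped.
			return err
		case <-doneChan:
			return nil
		case results := <-blobChan:
			handle(results.Blobs)
		}
	}
}
```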
diff --git a/receiver/azureblobrehydrationreceiver/config.go b/receiver/azureblobrehydrationreceiver/config.go
index 1d69e5a57..8192870b4 100644
--- a/receiver/azureblobrehydrationreceiver/config.go
+++ b/receiver/azureblobrehydrationreceiver/config.go
@@ -25,6 +25,9 @@ import (
 
 // Config is the configuration for the azure blob rehydration receiver
 type Config struct {
+	// BatchSize is the number of blobs to process entering the pipeline in a single batch. (default 100)
+	BatchSize int `mapstructure:"batch_size"`
+
 	// ConnectionString is the Azure Blob Storage connection key,
 	// which can be found in the Azure Blob Storage resource on the Azure Portal. (no default)
 	ConnectionString string `mapstructure:"connection_string"`
@@ -45,12 +48,8 @@ type Config struct {
 	// Default value of false
 	DeleteOnRead bool `mapstructure:"delete_on_read"`
 
-	// PollInterval is the interval at which the Azure API is scanned for blobs.
-	// Default value of 1m
-	PollInterval time.Duration `mapstructure:"poll_interval"`
-
-	// PollTimeout is the timeout for the Azure API to scan for blobs.
-	PollTimeout time.Duration `mapstructure:"poll_timeout"`
+	// PageSize is the number of blobs to request from the Azure API at a time. (default 1000)
+	PageSize int `mapstructure:"page_size"`
 
 	// ID of the storage extension to use for storing progress
 	StorageID *component.ID `mapstructure:"storage"`
@@ -58,6 +57,10 @@ type Config struct {
 
 // Validate validates the config
 func (c *Config) Validate() error {
+	if c.BatchSize < 1 {
+		return errors.New("batch_size must be greater than 0")
+	}
+
 	if c.ConnectionString == "" {
 		return errors.New("connection_string is required")
 	}
@@ -81,12 +84,8 @@ func (c *Config) Validate() error {
 		return errors.New("ending_time must be at least one minute after starting_time")
 	}
 
-	if c.PollInterval < time.Second {
-		return errors.New("poll_interval must be at least one second")
-	}
-
-	if c.PollTimeout < time.Second {
-		return errors.New("poll_timeout must be at least one second")
+	if c.PageSize < 1 {
+		return errors.New("page_size must be greater than 0")
 	}
 
 	return nil
diff --git a/receiver/azureblobrehydrationreceiver/config_test.go b/receiver/azureblobrehydrationreceiver/config_test.go
index 8ddb34ef9..33b50e0e7 100644
--- a/receiver/azureblobrehydrationreceiver/config_test.go
+++ b/receiver/azureblobrehydrationreceiver/config_test.go
@@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote
 import (
 	"errors"
 	"testing"
-	"time"
 
 	"github.com/stretchr/testify/require"
 )
@@ -37,8 +36,8 @@ func TestConfigValidate(t *testing.T) {
 				StartingTime: "2023-10-02T17:00",
 				EndingTime:   "2023-10-02T17:01",
 				DeleteOnRead: false,
-				PollInterval: time.Second,
-				PollTimeout:  time.Second * 10,
+				BatchSize:    100,
+				PageSize:     1000,
 			},
 			expectErr: errors.New("connection_string is required"),
 		},
@@ -51,8 +50,8 @@ func TestConfigValidate(t *testing.T) {
 				StartingTime: "2023-10-02T17:00",
 				EndingTime:   "2023-10-02T17:01",
 				DeleteOnRead: false,
-				PollInterval: time.Second,
-				PollTimeout:  time.Second * 10,
+				BatchSize:    100,
+				PageSize:     1000,
 			},
 			expectErr: errors.New("container is required"),
 		},
@@ -65,8 +64,8 @@ func 
TestConfigValidate(t *testing.T) { StartingTime: "", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("starting_time is invalid: missing value"), }, @@ -79,8 +78,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("ending_time is invalid: missing value"), }, @@ -93,8 +92,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "invalid_time", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("starting_time is invalid: invalid timestamp"), }, @@ -107,8 +106,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "invalid_time", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("ending_time is invalid: invalid timestamp"), }, @@ -121,13 +120,13 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T16:00", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: errors.New("ending_time must be at least one minute after starting_time"), }, { - desc: "Bad poll_interval", + desc: "Bad batch_size", cfg: &Config{ ConnectionString: "connection_string", Container: "container", @@ -135,13 +134,13 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Millisecond, - PollTimeout: time.Second * 10, + BatchSize: 0, + PageSize: 1000, }, - expectErr: errors.New("poll_interval must be at least one second"), + expectErr: errors.New("batch_size must be greater than 0"), }, { - desc: "Bad poll_timeout", + desc: "Bad page_size", cfg: &Config{ ConnectionString: "connection_string", Container: "container", @@ -149,10 +148,10 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second * 2, - PollTimeout: time.Millisecond, + BatchSize: 100, + PageSize: 0, }, - expectErr: errors.New("poll_timeout must be at least one second"), + expectErr: errors.New("page_size must be greater than 0"), }, { desc: "Valid config", @@ -163,8 +162,8 @@ func TestConfigValidate(t *testing.T) { StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T17:01", DeleteOnRead: false, - PollInterval: time.Second, - PollTimeout: time.Second * 10, + BatchSize: 100, + PageSize: 1000, }, expectErr: nil, }, diff --git a/receiver/azureblobrehydrationreceiver/factory.go b/receiver/azureblobrehydrationreceiver/factory.go index 6465eb47f..3309b59a2 100644 --- a/receiver/azureblobrehydrationreceiver/factory.go +++ b/receiver/azureblobrehydrationreceiver/factory.go @@ -17,7 +17,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote import ( "context" "errors" - "time" "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/metadata" "go.opentelemetry.io/collector/component" @@ -43,8 +42,8 @@ func NewFactory() receiver.Factory { func createDefaultConfig() component.Config { return &Config{ DeleteOnRead: false, - 
PollInterval: time.Minute, - PollTimeout: time.Second * 30, + BatchSize: 100, + PageSize: 1000, } } diff --git a/receiver/azureblobrehydrationreceiver/factory_test.go b/receiver/azureblobrehydrationreceiver/factory_test.go index ce8e95ff0..771089d8f 100644 --- a/receiver/azureblobrehydrationreceiver/factory_test.go +++ b/receiver/azureblobrehydrationreceiver/factory_test.go @@ -16,7 +16,6 @@ package azureblobrehydrationreceiver //import "github.com/observiq/bindplane-ote import ( "testing" - "time" "github.com/stretchr/testify/require" ) @@ -24,8 +23,8 @@ import ( func Test_createDefaultConfig(t *testing.T) { expectedCfg := &Config{ DeleteOnRead: false, - PollInterval: time.Minute, - PollTimeout: time.Second * 30, + BatchSize: 100, + PageSize: 1000, } componentCfg := createDefaultConfig() diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go index dd5373c75..f0ea14316 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client.go @@ -19,6 +19,7 @@ import ( "context" "fmt" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" ) @@ -32,75 +33,122 @@ type BlobInfo struct { // //go:generate mockery --name BlobClient --output ./mocks --with-expecter --filename mock_blob_client.go --structname MockBlobClient type BlobClient interface { - // ListBlobs returns a list of blobInfo objects present in the container with the given prefix - ListBlobs(ctx context.Context, container string, prefix, marker *string) ([]*BlobInfo, *string, error) - // DownloadBlob downloads the contents of the blob into the supplied buffer. // It will return the count of bytes used in the buffer. 
DownloadBlob(ctx context.Context, container, blobPath string, buf []byte) (int64, error)
 
 	// DeleteBlob deletes the blob in the specified container
 	DeleteBlob(ctx context.Context, container, blobPath string) error
+
+	// StreamBlobs streams BlobInfo values to blobChan and errors to errChan.
+	// If an item arrives on errChan, the stream should be stopped.
+	StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{})
+}
+
+type blobClient interface {
+	NewListBlobsFlatPager(containerName string, options *azblob.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse]
+	DownloadBuffer(ctx context.Context, containerName string, blobPath string, buffer []byte, options *azblob.DownloadBufferOptions) (int64, error)
+	DeleteBlob(ctx context.Context, containerName string, blobPath string, options *azblob.DeleteBlobOptions) (azblob.DeleteBlobResponse, error)
 }
 
+var _ blobClient = &azblob.Client{}
+
 // AzureClient is an implementation of the BlobClient for Azure
 type AzureClient struct {
-	azClient *azblob.Client
+	azClient  blobClient
+	batchSize int
+	pageSize  int32
 }
 
 // NewAzureBlobClient creates a new azureBlobClient with the given connection string
-func NewAzureBlobClient(connectionString string) (BlobClient, error) {
+func NewAzureBlobClient(connectionString string, batchSize, pageSize int) (BlobClient, error) {
 	azClient, err := azblob.NewClientFromConnectionString(connectionString, nil)
 	if err != nil {
 		return nil, err
 	}
 
 	return &AzureClient{
-		azClient: azClient,
+		azClient:  azClient,
+		batchSize: batchSize,
+		pageSize:  int32(pageSize),
 	}, nil
 }
 
-// contentLengthKey key for the content length metadata
-const contentLengthKey = "ContentLength"
-
-// ListBlobs returns a list of blobInfo objects present in the container with the given prefix
-func (a *AzureClient) ListBlobs(ctx context.Context, container string, prefix, marker *string) ([]*BlobInfo, *string, error) {
-	listOptions := &azblob.ListBlobsFlatOptions{
-		Marker: marker,
-		Prefix: prefix,
-	}
+const emptyPollLimit = 3
 
-	pager := a.azClient.NewListBlobsFlatPager(container, listOptions)
+// BlobResults contains the blobs for the receiver to process and the last marker
+type BlobResults struct {
+	Blobs      []*BlobInfo
+	LastMarker *string
+}
 
-	var nextMarker *string
-	blobs := make([]*BlobInfo, 0)
+// StreamBlobs streams blobs to the blobChan and errors to the errChan.
+// If an item arrives on errChan, the stream should be stopped.
+func (a *AzureClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *BlobResults, doneChan chan struct{}) {
+	var marker *string
+	pager := a.azClient.NewListBlobsFlatPager(container, &azblob.ListBlobsFlatOptions{
+		Marker:     marker,
+		Prefix:     prefix,
+		MaxResults: &a.pageSize,
+	})
+
+	emptyPollCount := 0
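+	// Each page returned by the pager is sliced into batches of at most
+	// batchSize before being sent to blobChan, so a single page of up to
+	// pageSize blobs may produce several BlobResults values.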
 	for pager.More() {
-		resp, err := pager.NextPage(ctx)
-		if err != nil {
-			return nil, nil, fmt.Errorf("listBlobs: %w", err)
-		}
-
-		for _, blob := range resp.Segment.BlobItems {
-			// Skip deleted blobs
-			if blob.Deleted != nil && *blob.Deleted {
-				continue
-			}
-			// All blob fields are pointers so check all pointers we need before we try to process it
-			if blob.Name == nil || blob.Properties == nil || blob.Properties.ContentLength == nil {
-				continue
-			}
-
-			info := &BlobInfo{
-				Name: *blob.Name,
-				Size: *blob.Properties.ContentLength,
-			}
-			blobs = append(blobs, info)
-		}
-		nextMarker = resp.NextMarker
-	}
-
-	return blobs, nextMarker, nil
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			// If the last emptyPollLimit pages produced no blobs, assume there is
+			// nothing left to process and stop the stream to avoid paying for
+			// further list requests.
+			if emptyPollCount == emptyPollLimit {
+				close(doneChan)
+				return
+			}
+
+			resp, err := pager.NextPage(ctx)
+			if err != nil {
+				errChan <- fmt.Errorf("error streaming blobs: %w", err)
+				return
+			}
+
+			sentBatch := false
+			batch := []*BlobInfo{}
+			for _, blob := range resp.Segment.BlobItems {
+				if blob.Deleted != nil && *blob.Deleted {
+					continue
+				}
+				if blob.Name == nil || blob.Properties == nil || blob.Properties.ContentLength == nil {
+					continue
+				}
+
+				info := &BlobInfo{
+					Name: *blob.Name,
+					Size: *blob.Properties.ContentLength,
+				}
+				batch = append(batch, info)
+				if len(batch) == a.batchSize {
+					blobChan <- &BlobResults{
+						Blobs:      batch,
+						LastMarker: marker,
+					}
+					batch = []*BlobInfo{}
+					sentBatch = true
+				}
+			}
+			if len(batch) > 0 {
+				blobChan <- &BlobResults{
+					Blobs:      batch,
+					LastMarker: marker,
+				}
+				sentBatch = true
+			}
+			// Only pages that yield no blobs at all count toward the empty poll
+			// limit; a page that flushed one or more full batches resets it.
+			if sentBatch {
+				emptyPollCount = 0
+			} else {
+				emptyPollCount++
+			}
+			marker = resp.NextMarker
+		}
+	}
+
+	close(doneChan)
 }
 
 // DownloadBlob downloads the contents of the blob into the supplied buffer.
diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go
new file mode 100644
index 000000000..8fa446bdc
--- /dev/null
+++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/blob_client_test.go
@@ -0,0 +1,151 @@
+// Copyright observIQ, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+ +package azureblob + +import ( + "context" + "errors" + "testing" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/runtime" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +func TestNewAzureBlobClient(t *testing.T) { + tests := []struct { + name string + connectionStr string + batchSize int + pageSize int + expectedError bool + }{ + { + name: "Invalid connection string", + connectionStr: "invalid", + batchSize: 100, + pageSize: 1000, + expectedError: true, + }, + { + name: "Valid connection string", + connectionStr: "DefaultEndpointsProtocol=https;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;", + batchSize: 100, + pageSize: 1000, + expectedError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client, err := NewAzureBlobClient(tt.connectionStr, tt.batchSize, tt.pageSize) + if tt.expectedError { + require.Error(t, err) + require.Nil(t, client) + } else { + require.NoError(t, err) + require.NotNil(t, client) + } + }) + } +} + +func TestDownloadBlob(t *testing.T) { + // Create a mock Azure client using testify/mock + mockClient := &mockAzureClient{} + + client := &AzureClient{ + azClient: mockClient, + batchSize: 100, + pageSize: 1000, + } + + ctx := context.Background() + container := "testcontainer" + blobPath := "test/blob.txt" + testData := []byte("test data content") + buf := make([]byte, 1024) + + mockClient.On("DownloadBuffer", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Run(func(args mock.Arguments) { + buf := args.Get(3).([]byte) + copy(buf, testData) + }).Return(int64(len(testData)), nil) + + t.Run("successful download", func(t *testing.T) { + bytesDownloaded, err := client.DownloadBlob(ctx, container, blobPath, buf) + require.NoError(t, err) + require.Equal(t, int64(len(testData)), bytesDownloaded) + require.Equal(t, string(testData), string(buf[:len(testData)])) + }) +} + +func TestDeleteBlobSuccess(t *testing.T) { + // Create a mock Azure client using testify/mock + mockClient := &mockAzureClient{} + client := &AzureClient{ + azClient: mockClient, + batchSize: 100, + pageSize: 1000, + } + + ctx := context.Background() + container := "testcontainer" + blobPath := "test/blob.txt" + + mockClient.On("DeleteBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(azblob.DeleteBlobResponse{}, nil) + err := client.DeleteBlob(ctx, container, blobPath) + require.NoError(t, err) + +} + +func TestDeleteBlobFailure(t *testing.T) { + // Create a mock Azure client using testify/mock + mockClient := &mockAzureClient{} + client := &AzureClient{ + azClient: mockClient, + batchSize: 100, + pageSize: 1000, + } + + ctx := context.Background() + container := "testcontainer" + blobPath := "test/blob.txt" + + mockClient.On("DeleteBlob", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(azblob.DeleteBlobResponse{}, errors.New("failed to delete")) + err := client.DeleteBlob(ctx, container, blobPath) + require.Error(t, err) + require.Equal(t, "failed to delete", err.Error()) +} + +// mockAzureClient is a mock implementation of the Azure blob client +type mockAzureClient struct { + mock.Mock +} + +func (m *mockAzureClient) NewListBlobsFlatPager(containerName string, options *azblob.ListBlobsFlatOptions) *runtime.Pager[azblob.ListBlobsFlatResponse] { + args := 
m.Called(containerName, options) + return args.Get(0).(*runtime.Pager[azblob.ListBlobsFlatResponse]) +} + +func (m *mockAzureClient) DownloadBuffer(ctx context.Context, containerName string, blobPath string, buffer []byte, options *azblob.DownloadBufferOptions) (int64, error) { + args := m.Called(ctx, containerName, blobPath, buffer, options) + return args.Get(0).(int64), args.Error(1) +} + +func (m *mockAzureClient) DeleteBlob(ctx context.Context, containerName string, blobPath string, options *azblob.DeleteBlobOptions) (azblob.DeleteBlobResponse, error) { + args := m.Called(ctx, containerName, blobPath, options) + return args.Get(0).(azblob.DeleteBlobResponse), args.Error(1) +} diff --git a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go index 8e24aa1c5..621e9211b 100644 --- a/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go +++ b/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks/mock_blob_client.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.50.0. DO NOT EDIT. +// Code generated by mockery v2.49.0. DO NOT EDIT. package mocks @@ -200,6 +200,44 @@ func (_c *MockBlobClient_ListBlobs_Call) RunAndReturn(run func(context.Context, return _c } +// StreamBlobs provides a mock function with given fields: ctx, container, prefix, errChan, blobChan, doneChan +func (_m *MockBlobClient) StreamBlobs(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{}) { + _m.Called(ctx, container, prefix, errChan, blobChan, doneChan) +} + +// MockBlobClient_StreamBlobs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'StreamBlobs' +type MockBlobClient_StreamBlobs_Call struct { + *mock.Call +} + +// StreamBlobs is a helper method to define mock.On call +// - ctx context.Context +// - container string +// - prefix *string +// - errChan chan error +// - blobChan chan *azureblob.BlobResults +// - doneChan chan struct{} +func (_e *MockBlobClient_Expecter) StreamBlobs(ctx interface{}, container interface{}, prefix interface{}, errChan interface{}, blobChan interface{}, doneChan interface{}) *MockBlobClient_StreamBlobs_Call { + return &MockBlobClient_StreamBlobs_Call{Call: _e.mock.On("StreamBlobs", ctx, container, prefix, errChan, blobChan, doneChan)} +} + +func (_c *MockBlobClient_StreamBlobs_Call) Run(run func(ctx context.Context, container string, prefix *string, errChan chan error, blobChan chan *azureblob.BlobResults, doneChan chan struct{})) *MockBlobClient_StreamBlobs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(*string), args[3].(chan error), args[4].(chan *azureblob.BlobResults), args[5].(chan struct{})) + }) + return _c +} + +func (_c *MockBlobClient_StreamBlobs_Call) Return() *MockBlobClient_StreamBlobs_Call { + _c.Call.Return() + return _c +} + +func (_c *MockBlobClient_StreamBlobs_Call) RunAndReturn(run func(context.Context, string, *string, chan error, chan *azureblob.BlobResults, chan struct{})) *MockBlobClient_StreamBlobs_Call { + _c.Call.Return(run) + return _c +} + // NewMockBlobClient creates a new instance of MockBlobClient. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. 
func NewMockBlobClient(t interface {
diff --git a/receiver/azureblobrehydrationreceiver/receiver.go b/receiver/azureblobrehydrationreceiver/receiver.go
index f55eefb09..07c887090 100644
--- a/receiver/azureblobrehydrationreceiver/receiver.go
+++ b/receiver/azureblobrehydrationreceiver/receiver.go
@@ -19,6 +19,7 @@ import (
 	"errors"
 	"fmt"
 	"path/filepath"
+	"sync"
 	"time"
 
 	"github.com/observiq/bindplane-otel-collector/internal/rehydration"
@@ -40,15 +41,22 @@ type rehydrationReceiver struct {
 	azureClient        azureblob.BlobClient
 	supportedTelemetry pipeline.Signal
 	consumer           rehydration.Consumer
+	checkpoint         *rehydration.CheckPoint
 	checkpointStore    rehydration.CheckpointStorer
 
+	blobChan chan *azureblob.BlobResults
+	errChan  chan error
+	doneChan chan struct{}
+
+	mut *sync.Mutex
+
+	lastBlob     *azureblob.BlobInfo
+	lastBlobTime *time.Time
+
 	startingTime time.Time
 	endingTime   time.Time
 
-	doneChan   chan struct{}
-	started    bool
-	ctx        context.Context
-	cancelFunc context.CancelCauseFunc
+	cancelFunc context.CancelFunc
 }
 
 // newMetricsReceiver creates a new metrics specific receiver.
@@ -92,7 +100,7 @@ func newTracesReceiver(id component.ID, logger *zap.Logger, cfg *Config, nextCon
 
 // newRehydrationReceiver creates a new rehydration receiver
 func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (*rehydrationReceiver, error) {
-	azureClient, err := newAzureBlobClient(cfg.ConnectionString)
+	azureClient, err := newAzureBlobClient(cfg.ConnectionString, cfg.BatchSize, cfg.PageSize)
 	if err != nil {
 		return nil, fmt.Errorf("new Azure client: %w", err)
 	}
@@ -109,170 +117,150 @@ func newRehydrationReceiver(id component.ID, logger *zap.Logger, cfg *Config) (*
 		return nil, fmt.Errorf("invalid ending_time timestamp: %w", err)
 	}
 
-	ctx, cancel := context.WithCancelCause(context.Background())
-
 	return &rehydrationReceiver{
 		logger:          logger,
 		id:              id,
 		cfg:             cfg,
 		azureClient:     azureClient,
-		doneChan:        make(chan struct{}),
 		checkpointStore: rehydration.NewNopStorage(),
 		startingTime:    startingTime,
 		endingTime:      endingTime,
-		ctx:             ctx,
-		cancelFunc:      cancel,
+		blobChan:        make(chan *azureblob.BlobResults),
+		errChan:         make(chan error),
+		doneChan:        make(chan struct{}),
+		mut:             &sync.Mutex{},
 	}, nil
 }
 
 // Start starts the rehydration receiver
 func (r *rehydrationReceiver) Start(ctx context.Context, host component.Host) error {
-
 	if r.cfg.StorageID != nil {
 		checkpointStore, err := rehydration.NewCheckpointStorage(ctx, host, *r.cfg.StorageID, r.id, r.supportedTelemetry)
 		if err != nil {
 			return fmt.Errorf("NewCheckpointStorage: %w", err)
 		}
-
 		r.checkpointStore = checkpointStore
 	}
-
-	r.started = true
-	go r.scrape()
+	go r.streamRehydrateBlobs(ctx)
 	return nil
 }
 
 // Shutdown shuts down the rehydration receiver
 func (r *rehydrationReceiver) Shutdown(ctx context.Context) error {
-	r.cancelFunc(errors.New("shutdown"))
 	var err error
-	// If we have called started then close and wait for goroutine to finish
-	if r.started {
-		select {
-		case <-ctx.Done():
-			err = ctx.Err()
-		case <-r.doneChan:
-		}
+	if r.cancelFunc != nil {
+		r.cancelFunc()
+	}
+	if cpErr := r.makeCheckpoint(ctx); cpErr != nil {
+		r.logger.Error("Error while saving checkpoint", zap.Error(cpErr))
+		err = errors.Join(err, cpErr)
 	}
-
 	err = errors.Join(err, r.checkpointStore.Close(ctx))
-
 	return err
 }
 
-// emptyPollLimit is the number of consecutive empty polling cycles that can
-// occur before we stop polling.
-const emptyPollLimit = 3 - -// scrape scrapes the Azure api on interval -func (r *rehydrationReceiver) scrape() { - defer close(r.doneChan) - ticker := time.NewTicker(r.cfg.PollInterval) - defer ticker.Stop() - - var marker *string - - // load the previous checkpoint. If not exist should return zero value for time - checkpoint, err := r.checkpointStore.LoadCheckPoint(r.ctx, r.checkpointKey()) +func (r *rehydrationReceiver) streamRehydrateBlobs(ctx context.Context) { + checkpoint, err := r.checkpointStore.LoadCheckPoint(ctx, r.checkpointKey()) if err != nil { r.logger.Warn("Error loading checkpoint, continuing without a previous checkpoint", zap.Error(err)) checkpoint = rehydration.NewCheckpoint() } + r.checkpoint = checkpoint - // Call once before the loop to ensure we do a collection before the first ticker - numBlobsRehydrated := r.rehydrateBlobs(checkpoint, marker) - emptyBlobCounter := checkBlobCount(numBlobsRehydrated, 0) - - for { - select { - case <-r.ctx.Done(): - return - case <-ticker.C: - // Polling for blobs has egress charges so we want to stop polling - // after we stop finding blobs. - if emptyBlobCounter == emptyPollLimit { - return - } - - numBlobsRehydrated := r.rehydrateBlobs(checkpoint, marker) - emptyBlobCounter = checkBlobCount(numBlobsRehydrated, emptyBlobCounter) - } - } -} - -// rehydrateBlobs pulls blob paths from the UI and if they are within the specified -// time range then the blobs will be downloaded and rehydrated. -// The passed in checkpoint and marker will be updated and should be used in the next iteration. -// The count of blobs processed will be returned -func (r *rehydrationReceiver) rehydrateBlobs(checkpoint *rehydration.CheckPoint, marker *string) (numBlobsRehydrated int) { var prefix *string if r.cfg.RootFolder != "" { prefix = &r.cfg.RootFolder } - ctxTimeout, cancel := context.WithTimeout(r.ctx, r.cfg.PollTimeout) - defer cancel() + cancelCtx, cancel := context.WithCancel(ctx) + r.cancelFunc = cancel - // get blobs from Azure - blobs, nextMarker, err := r.azureClient.ListBlobs(ctxTimeout, r.cfg.Container, prefix, marker) - if err != nil { - r.logger.Error("Failed to list blobs", zap.Error(err)) - return - } + startTime := time.Now() + r.logger.Info("Starting rehydration", zap.Time("startTime", startTime)) + + go r.azureClient.StreamBlobs(cancelCtx, r.cfg.Container, prefix, r.errChan, r.blobChan, r.doneChan) - marker = nextMarker + for { + select { + case <-ctx.Done(): + return + case <-r.doneChan: + r.logger.Info("Finished rehydrating blobs", zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) + return + case err := <-r.errChan: + r.logger.Error("Error streaming blobs, stopping rehydration", zap.Error(err), zap.Int("durationSeconds", int(time.Since(startTime).Seconds()))) + return + case br := <-r.blobChan: + r.rehydrateBlobs(ctx, br.Blobs) + } + } +} +func (r *rehydrationReceiver) rehydrateBlobs(ctx context.Context, blobs []*azureblob.BlobInfo) { // Go through each blob and parse it's path to determine if we should consume it or not + numProcessedBlobs := 0 for _, blob := range blobs { + select { + case <-ctx.Done(): + return + default: + } + blobTime, telemetryType, err := rehydration.ParseEntityPath(blob.Name) switch { case errors.Is(err, rehydration.ErrInvalidEntityPath): r.logger.Debug("Skipping Blob, non-matching blob path", zap.String("blob", blob.Name)) case err != nil: r.logger.Error("Error processing blob path", zap.String("blob", blob.Name), zap.Error(err)) - case checkpoint.ShouldParse(*blobTime, blob.Name): + case 
r.checkpoint.ShouldParse(*blobTime, blob.Name): // if the blob is not in the specified time range or not of the telemetry type supported by this receiver // then skip consuming it. if !rehydration.IsInTimeRange(*blobTime, r.startingTime, r.endingTime) || telemetryType != r.supportedTelemetry { continue } + r.lastBlob = blob + r.lastBlobTime = blobTime + // Process and consume the blob at the given path - if err := r.processBlob(blob); err != nil { - r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) + if err := r.processBlob(ctx, blob); err != nil { + // If the error is because the context was canceled, then we don't want to log it + if !errors.Is(err, context.Canceled) { + r.logger.Error("Error consuming blob", zap.String("blob", blob.Name), zap.Error(err)) + } + continue } - numBlobsRehydrated++ - - // Update and save the checkpoint with the most recently processed blob - checkpoint.UpdateCheckpoint(*blobTime, blob.Name) - if err := r.checkpointStore.SaveCheckpoint(r.ctx, r.checkpointKey(), checkpoint); err != nil { - r.logger.Error("Error while saving checkpoint", zap.Error(err)) - } + numProcessedBlobs++ + r.logger.Debug("Processed blob", zap.String("blob", blob.Name), zap.Int("num_processed_blobs", numProcessedBlobs)) // Delete blob if configured to do so if r.cfg.DeleteOnRead { - if err := r.azureClient.DeleteBlob(r.ctx, r.cfg.Container, blob.Name); err != nil { + if err := r.azureClient.DeleteBlob(ctx, r.cfg.Container, blob.Name); err != nil { r.logger.Error("Error while attempting to delete blob", zap.String("blob", blob.Name), zap.Error(err)) } } } } - return + if err := r.makeCheckpoint(ctx); err != nil { + r.logger.Error("Error while saving checkpoint", zap.Error(err)) + } } // processBlob does the following: // 1. Downloads the blob // 2. Decompresses the blob if applicable // 3. Pass the blob to the consumer -func (r *rehydrationReceiver) processBlob(blob *azureblob.BlobInfo) error { +func (r *rehydrationReceiver) processBlob(ctx context.Context, blob *azureblob.BlobInfo) error { // Allocate a buffer the size of the blob. If the buffer isn't big enough download errors. blobBuffer := make([]byte, blob.Size) - size, err := r.azureClient.DownloadBlob(r.ctx, r.cfg.Container, blob.Name, blobBuffer) + size, err := r.azureClient.DownloadBlob(ctx, r.cfg.Container, blob.Name, blobBuffer) if err != nil { return fmt.Errorf("download blob: %w", err) } @@ -291,7 +279,7 @@ func (r *rehydrationReceiver) processBlob(blob *azureblob.BlobInfo) error { return fmt.Errorf("unsupported file type: %s", ext) } - if err := r.consumer.Consume(r.ctx, blobBuffer); err != nil { + if err := r.consumer.Consume(ctx, blobBuffer); err != nil { return fmt.Errorf("consume: %w", err) } return nil @@ -305,16 +293,13 @@ func (r *rehydrationReceiver) checkpointKey() string { return fmt.Sprintf("%s_%s_%s", checkpointStorageKey, r.id, r.supportedTelemetry.String()) } -// checkBlobCount checks the number of blobs rehydrated and the current state of the -// empty counter. If zero blobs were rehydrated increment the counter. -// If there were blobs rehydrated reset the counter as we want to track consecutive zero sized polls. 
-func checkBlobCount(numBlobsRehydrated, emptyBlobsCounter int) int { - switch { - case emptyBlobsCounter == emptyPollLimit: // If we are at the limit return the limit - return emptyPollLimit - case numBlobsRehydrated == 0: // If no blobs were rehydrated then increment the empty blobs counter - return emptyBlobsCounter + 1 - default: // Default case is numBlobsRehydrated > 0 so reset emptyBlobsCounter to 0 - return 0 +func (r *rehydrationReceiver) makeCheckpoint(ctx context.Context) error { + if r.lastBlob == nil || r.lastBlobTime == nil { + return nil } + r.logger.Debug("Making checkpoint", zap.String("blob", r.lastBlob.Name), zap.Time("time", *r.lastBlobTime)) + r.mut.Lock() + defer r.mut.Unlock() + r.checkpoint.UpdateCheckpoint(*r.lastBlobTime, r.lastBlob.Name) + return r.checkpointStore.SaveCheckpoint(ctx, r.checkpointKey(), r.checkpoint) } diff --git a/receiver/azureblobrehydrationreceiver/receiver_test.go b/receiver/azureblobrehydrationreceiver/receiver_test.go index 3144d6347..88f5dc5c2 100644 --- a/receiver/azureblobrehydrationreceiver/receiver_test.go +++ b/receiver/azureblobrehydrationreceiver/receiver_test.go @@ -19,14 +19,9 @@ import ( "compress/gzip" "context" "errors" - "sync/atomic" "testing" "time" - "github.com/observiq/bindplane-otel-collector/internal/rehydration" - "github.com/observiq/bindplane-otel-collector/internal/testutils" - "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob" - blobmocks "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" @@ -34,6 +29,11 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pipeline" "go.uber.org/zap" + + "github.com/observiq/bindplane-otel-collector/internal/rehydration" + "github.com/observiq/bindplane-otel-collector/internal/testutils" + "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob" + blobmocks "github.com/observiq/bindplane-otel-collector/receiver/azureblobrehydrationreceiver/internal/azureblob/mocks" ) func Test_newMetricsReceiver(t *testing.T) { @@ -111,32 +111,10 @@ func Test_fullRehydration(t *testing.T) { cfg := &Config{ StartingTime: "2023-10-02T17:00", EndingTime: "2023-10-02T18:00", - PollInterval: 10 * time.Millisecond, Container: "container", DeleteOnRead: false, } - t.Run("empty blob polling", func(t *testing.T) { - var listCounter atomic.Int32 - - // Setup mocks - mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Times(3).Return([]*azureblob.BlobInfo{}, nil, nil). 
- Run(func(_ mock.Arguments) { - listCounter.Add(1) - }) - - // Create new receiver - testConsumer := &consumertest.MetricsSink{} - r, err := newMetricsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - - checkFunc := func() bool { - return listCounter.Load() == 3 - } - runRehydrationValidateTest(t, r, checkFunc) - }) - t.Run("metrics", func(t *testing.T) { // Test data metrics, jsonBytes := testutils.GenerateTestMetrics(t) @@ -153,24 +131,29 @@ func Test_fullRehydration(t *testing.T) { }, } + // Create new receiver + targetBlob := returnedBlobInfo[0] // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + testConsumer := &consumertest.MetricsSink{} + r, err := newMetricsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) + mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) - copy(buf, jsonBytes) - return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.MetricsSink{} - r, err := newMetricsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.DataPointCount() == metrics.DataPointCount() } @@ -195,23 +178,26 @@ func Test_fullRehydration(t *testing.T) { } targetBlob := returnedBlobInfo[0] + mockClient := setNewAzureBlobClient(t) + + // Create new receiver + testConsumer := &consumertest.TracesSink{} + r, err := newTracesReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) // Setup mocks - mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) - copy(buf, jsonBytes) - return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.TracesSink{} - r, err := newTracesReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.SpanCount() == traces.SpanCount() } @@ -239,7 +225,17 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, 
mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -248,11 +244,6 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() } @@ -281,7 +272,17 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -290,11 +291,6 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() } @@ -327,7 +323,17 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -335,12 +341,8 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - mockClient.EXPECT().DeleteBlob(mock.Anything, cfg.Container, targetBlob.Name).Return(nil) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) + mockClient.EXPECT().DeleteBlob(mock.Anything, cfg.Container, targetBlob.Name).Return(nil) checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() @@ -375,7 +377,18 @@ func Test_fullRehydration(t *testing.T) { // Setup mocks mockClient := setNewAzureBlobClient(t) - 
mockClient.EXPECT().ListBlobs(mock.Anything, cfg.Container, (*string)(nil), (*string)(nil)).Return(returnedBlobInfo, nil, nil) + + // Create new receiver + testConsumer := &consumertest.LogsSink{} + r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) + require.NoError(t, err) + + mockClient.EXPECT().StreamBlobs(mock.Anything, cfg.Container, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return().After(time.Millisecond).Run(func(_ mock.Arguments) { + r.blobChan <- &azureblob.BlobResults{ + Blobs: returnedBlobInfo, + } + close(r.doneChan) + }) mockClient.EXPECT().DownloadBlob(mock.Anything, cfg.Container, targetBlob.Name, mock.Anything).RunAndReturn(func(_ context.Context, _ string, _ string, buf []byte) (int64, error) { require.Len(t, buf, int(expectedBuffSize)) @@ -384,11 +397,6 @@ func Test_fullRehydration(t *testing.T) { return expectedBuffSize, nil }) - // Create new receiver - testConsumer := &consumertest.LogsSink{} - r, err := newLogsReceiver(id, testLogger, cfg, testConsumer) - require.NoError(t, err) - checkFunc := func() bool { return testConsumer.LogRecordCount() == logs.LogRecordCount() } @@ -496,10 +504,9 @@ func Test_processBlob(t *testing.T) { }, consumer: mockConsumer, azureClient: mockClient, - ctx: context.Background(), } - err := r.processBlob(tc.info) + err := r.processBlob(context.Background(), tc.info) if tc.expectedErr == nil { require.NoError(t, err) } else { @@ -517,7 +524,7 @@ func setNewAzureBlobClient(t *testing.T) *blobmocks.MockBlobClient { mockClient := blobmocks.NewMockBlobClient(t) - newAzureBlobClient = func(_ string) (azureblob.BlobClient, error) { + newAzureBlobClient = func(_ string, _ int, _ int) (azureblob.BlobClient, error) { return mockClient, nil }
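Finally, the relationship between `page_size` and `batch_size` in the new `StreamBlobs` loop reduces to slicing each listed page into pipeline-sized batches. A standalone sketch of that slicing (hypothetical `batches` helper, not part of the patch):

```go
package main

import "fmt"

// batches slices one page of blob names into batches of at most batchSize,
// mirroring how StreamBlobs flushes a batch to blobChan each time it fills.
func batches(page []string, batchSize int) [][]string {
	var out [][]string
	for len(page) > 0 {
		n := batchSize
		if len(page) < n {
			n = len(page)
		}
		out = append(out, page[:n])
		page = page[n:]
	}
	return out
}

func main() {
	// With page_size 5 and batch_size 2, one page yields batches of 2, 2, and 1.
	page := []string{"b1", "b2", "b3", "b4", "b5"}
	fmt.Println(batches(page, 2)) // [[b1 b2] [b3 b4] [b5]]
}
```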