Skip to content

Commit

Permalink
[8.17](backport #41755) [AWS] [S3] fix: improve object size metric ca…
Browse files Browse the repository at this point in the history
…lculation (#42153)

* [AWS] [S3] fix: improve object size metric calculation (#41755)

* rely on monitored reader for content length

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* add tests to validate metrics

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* add changelog entry

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* fix lint - ignore ok values as we know the stored value type

Signed-off-by: Kavindu Dodanduwa <[email protected]>

* review change - rename and accessor

Signed-off-by: Kavindu Dodanduwa <[email protected]>

---------

Signed-off-by: Kavindu Dodanduwa <[email protected]>
(cherry picked from commit 92bb2c5)

* Remove extra changelog entries

---------

Co-authored-by: Kavindu Dodanduwa <[email protected]>
Co-authored-by: Maurizio Branca <[email protected]>
  • Loading branch information
3 people authored Dec 24, 2024
1 parent 8b9a76e commit df92801
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015]
- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036]
- Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019]
- Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755]

*Heartbeat*

Expand Down
20 changes: 14 additions & 6 deletions x-pack/filebeat/input/awss3/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func init() {
// currentTime returns the current time as reported by the clock held in
// clockValue. This exists to allow unit tests to simulate the passage of time.
func currentTime() time.Time {
clock := clockValue.Load().(clock)
// The ok result of the type assertion is discarded on purpose.
// NOTE(review): assumes clockValue only ever stores a clock value — confirm against init().
clock, _ := clockValue.Load().(clock)
return clock.Now()
}

Expand Down Expand Up @@ -206,18 +206,26 @@ func newInputMetrics(id string, optionalParent *monitoring.Registry, maxWorkers
return out
}

// monitoredReader implements io.Reader and counts the number of bytes read.
// monitoredReader implements io.Reader and wraps byte-read tracking fields for S3 bucket objects.
// The following metrics are tracked:
// - totalBytesReadMetric - a total metric tracking bytes read throughout the runtime from all processed objects
// - totalBytesReadCurrent - total bytes read from the currently tracked object
//
// See newMonitoredReader for initialization considerations.
type monitoredReader struct {
reader io.Reader
totalBytesRead *monitoring.Uint
reader io.Reader // underlying source that Read delegates to
totalBytesReadMetric *monitoring.Uint // metric handed to newMonitoredReader; incremented on every Read
totalBytesReadCurrent int64 // bytes read through this reader instance only
}

// newMonitoredReader initializes a monitoredReader that wraps r and reports the
// bytes read to the given shared metric, which tracks all bytes read.
// The per-reader counter (totalBytesReadCurrent) is left at its zero value, so
// each returned reader starts counting from zero.
// NOTE(review): Read calls Add on metric unconditionally, so metric is presumably
// expected to be non-nil — confirm against callers.
func newMonitoredReader(r io.Reader, metric *monitoring.Uint) *monitoredReader {
return &monitoredReader{reader: r, totalBytesRead: metric}
return &monitoredReader{reader: r, totalBytesReadMetric: metric}
}

// Read reads from the wrapped reader into p and returns its result unchanged.
// The number of bytes read is added to both the shared metric and the
// per-reader counter, including when the wrapped reader returns n > 0
// together with an error.
func (m *monitoredReader) Read(p []byte) (int, error) {
n, err := m.reader.Read(p)
m.totalBytesRead.Add(uint64(n))
m.totalBytesReadMetric.Add(uint64(n))
// Plain int64 increment, no atomic operation.
// NOTE(review): presumably each reader is used from a single goroutine — confirm.
m.totalBytesReadCurrent += int64(n)
return n, err
}
9 changes: 5 additions & 4 deletions x-pack/filebeat/input/awss3/s3_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ type s3ObjectProcessor struct {

// s3DownloadedObject holds what download() returns for an S3 object:
// the response body, its content type and the object metadata.
type s3DownloadedObject struct {
body io.ReadCloser // response body; ProcessS3Object closes it via defer
length int64
contentType string
metadata map[string]interface{}
}
Expand Down Expand Up @@ -142,9 +141,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
defer s3Obj.body.Close()

p.s3Metadata = s3Obj.metadata
p.metrics.s3ObjectSizeInBytes.Update(s3Obj.length)

reader, err := p.addGzipDecoderIfNeeded(newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal))
mReader := newMonitoredReader(s3Obj.body, p.metrics.s3BytesProcessedTotal)
reader, err := p.addGzipDecoderIfNeeded(mReader)
if err != nil {
return fmt.Errorf("failed checking for gzip content: %w", err)
}
Expand Down Expand Up @@ -213,6 +212,9 @@ func (p *s3ObjectProcessor) ProcessS3Object(log *logp.Logger, eventCallback func
time.Since(start).Nanoseconds(), err)
}

// finally obtain the total size in bytes of the object from the monitored reader
p.metrics.s3ObjectSizeInBytes.Update(mReader.totalBytesReadCurrent)

return nil
}

Expand Down Expand Up @@ -241,7 +243,6 @@ func (p *s3ObjectProcessor) download() (obj *s3DownloadedObject, err error) {

s := &s3DownloadedObject{
body: getObjectOutput.Body,
length: *getObjectOutput.ContentLength,
contentType: ctType,
metadata: meta,
}
Expand Down
70 changes: 70 additions & 0 deletions x-pack/filebeat/input/awss3/s3_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,76 @@ func TestS3ObjectProcessor(t *testing.T) {
})
}

// TestProcessObjectMetricCollection processes a single S3 object per case
// through a mocked S3 API and checks the metrics recorded for it: one object
// requested, none left in flight, and the expected object size reported both
// as the single object-size sample and as the total bytes processed.
func TestProcessObjectMetricCollection(t *testing.T) {
	log := logp.NewLogger("testing-s3-processor-metrics")

	type metricCase struct {
		name        string
		filename    string
		contentType string
		objectSize  int64
	}

	cases := []metricCase{
		{name: "simple text - octet-stream", filename: "testdata/log.txt", contentType: "application/octet-stream", objectSize: 18},
		{name: "json text", filename: "testdata/log.json", contentType: "application/json", objectSize: 199},
		{name: "gzip with json text", filename: "testdata/multiline.json.gz", contentType: "application/x-gzip", objectSize: 175},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			// Arrange: bounded context, mock controller and a canned GetObject response.
			ctx, cancel := context.WithTimeout(context.Background(), testTimeout)
			defer cancel()

			ctrl, ctx := gomock.WithContext(ctx, t)
			defer ctrl.Finish()

			event, response := newS3Object(t, tc.filename, tc.contentType)
			api := NewMockS3API(ctrl)

			getObject := api.EXPECT().
				GetObject(gomock.Any(), gomock.Eq("us-east-1"), gomock.Eq(event.S3.Bucket.Name), gomock.Eq(event.S3.Object.Key)).
				Return(response, nil)
			gomock.InOrder(getObject)

			// The metrics recorder is created with zero workers.
			recorder := newInputMetrics(tc.name, nil, 0)
			factory := newS3ObjectProcessorFactory(recorder, api, nil, backupConfig{})
			handler := factory.Create(ctx, event)

			// Act: process the object, dropping every published event.
			err := handler.ProcessS3Object(log, func(_ beat.Event) {})

			// Assert.
			require.NoError(t, err)

			require.Equal(t, uint64(1), recorder.s3ObjectsRequestedTotal.Get())
			require.Equal(t, uint64(0), recorder.s3ObjectsInflight.Get())

			sizes := recorder.s3ObjectSizeInBytes.Values()
			require.Equal(t, 1, len(sizes))

			// Only one object was processed, so the per-object size and the
			// running total must be the same value.
			require.Equal(t, tc.objectSize, sizes[0])
			require.Equal(t, uint64(tc.objectSize), recorder.s3BytesProcessedTotal.Get())
		})
	}
}

func testProcessS3Object(t testing.TB, file, contentType string, numEvents int, selectors ...fileSelectorConfig) []beat.Event {
return _testProcessS3Object(t, file, contentType, numEvents, false, selectors)
}
Expand Down

0 comments on commit df92801

Please sign in to comment.