diff --git a/.chloggen/auto-record-type.yaml b/.chloggen/auto-record-type.yaml new file mode 100644 index 000000000000..1a7f5f09c0a3 --- /dev/null +++ b/.chloggen/auto-record-type.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsfirehosereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add auto record type. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36708] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Add the auto record type, capable of distinguishing between all current possible record types. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/receiver/awsfirehosereceiver/factory.go b/receiver/awsfirehosereceiver/factory.go index 5058ad42064d..c78a27419df4 100644 --- a/receiver/awsfirehosereceiver/factory.go +++ b/receiver/awsfirehosereceiver/factory.go @@ -16,6 +16,7 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/internal/common/testutil" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/auto" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream" @@ -32,6 +33,7 @@ var ( cwmetricstream.TypeStr: true, cwlog.TypeStr: true, otlpmetricstream.TypeStr: true, + auto.TypeStr: true, } ) @@ -59,17 +61,21 @@ func validateRecordType(recordType string) error { func defaultMetricsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.MetricsUnmarshaler { cwmsu := cwmetricstream.NewUnmarshaler(logger) otlpv1msu := otlpmetricstream.NewUnmarshaler(logger) + autoUnmarshaler := auto.NewUnmarshaler(logger) return map[string]unmarshaler.MetricsUnmarshaler{ cwmsu.Type(): cwmsu, otlpv1msu.Type(): otlpv1msu, + auto.TypeStr: autoUnmarshaler, } } // defaultLogsUnmarshalers creates a map of the available logs unmarshalers. 
func defaultLogsUnmarshalers(logger *zap.Logger) map[string]unmarshaler.LogsUnmarshaler { u := cwlog.NewUnmarshaler(logger) + autoUnmarshaler := auto.NewUnmarshaler(logger) return map[string]unmarshaler.LogsUnmarshaler{ - u.Type(): u, + u.Type(): u, + auto.TypeStr: autoUnmarshaler, } } diff --git a/receiver/awsfirehosereceiver/factory_test.go b/receiver/awsfirehosereceiver/factory_test.go index c42417511f32..78e55560ec9f 100644 --- a/receiver/awsfirehosereceiver/factory_test.go +++ b/receiver/awsfirehosereceiver/factory_test.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver/receivertest" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/auto" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream" ) @@ -46,5 +47,6 @@ func TestValidateRecordType(t *testing.T) { require.NoError(t, validateRecordType(defaultMetricsRecordType)) require.NoError(t, validateRecordType(defaultLogsRecordType)) require.NoError(t, validateRecordType(otlpmetricstream.TypeStr)) + require.NoError(t, validateRecordType(auto.TypeStr)) require.Error(t, validateRecordType("nop")) } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go new file mode 100644 index 000000000000..44f76d06bdba --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller.go @@ -0,0 +1,241 @@ +package auto + +import ( + "bytes" + "encoding/json" + "errors" + "github.com/gogo/protobuf/proto" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" + 
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.uber.org/zap" +) + +const ( + TypeStr = "auto" + recordDelimiter = "\n" +) + +var ( + errInvalidRecords = errors.New("record format invalid") + errUnknownLength = errors.New("unable to decode data length from message") +) + +// Unmarshaler detects the format of each record instead of assuming one: +// JSON records are handled as CloudWatch logs (UnmarshalLogs) or CloudWatch +// metric stream entries (UnmarshalMetrics); any other metrics record is +// decoded as OTLP protobuf. Records it cannot place are logged and skipped. +type Unmarshaler struct { + logger *zap.Logger +} + +var _ unmarshaler.Unmarshaler = (*Unmarshaler)(nil) + +// NewUnmarshaler creates a new instance of the Unmarshaler that reports +// skipped records through logger. +func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { + return &Unmarshaler{logger} +} + +// isJSON reports whether record starts with '{' and, ignoring trailing newlines, +// ends with '}'. This is a shape check only; the content is not parsed. +func isJSON(record []byte) bool { + if len(record) < 2 { + return false + } + + // Skip over trailing newlines; the record itself is not modified. + lastIndex := len(record) - 1 + for lastIndex >= 0 && record[lastIndex] == '\n' { + lastIndex-- + } + + return lastIndex > 0 && record[0] == '{' && record[lastIndex] == '}' +} + +// isCloudWatchLog reports whether data contains the owner, logGroup and +// logStream keys of a cwlog.CWLog. The keys are matched as raw substrings, +// so a match does not guarantee that data is valid JSON. 
+func isCloudWatchLog(data []byte) bool { + if !bytes.Contains(data, []byte(`"owner":`)) { + return false + } + if !bytes.Contains(data, []byte(`"logGroup":`)) { + return false + } + if !bytes.Contains(data, []byte(`"logStream":`)) { + return false + } + return true +} + +// isCloudwatchMetrics reports whether data contains the metric_name, namespace, +// unit and value keys of a cwmetricstream.CWMetric. The keys are matched as raw +// substrings, so a match does not guarantee that data is valid JSON. +func isCloudwatchMetrics(data []byte) bool { + if !bytes.Contains(data, []byte(`"metric_name":`)) { + return false + } + if !bytes.Contains(data, []byte(`"namespace":`)) { + return false + } + if !bytes.Contains(data, 
[]byte(`"unit":`)) { + return false + } + if !bytes.Contains(data, []byte(`"value":`)) { + return false + } + return true +} + +// addCloudwatchLog unmarshals the record into a cwlog.CWLog and adds it to the +// builder for its owner, log group and log stream, creating that builder in ld +// on first use. It returns the JSON decoding error, if any. +func (u *Unmarshaler) addCloudwatchLog( + record []byte, + resourceLogs map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder, + ld plog.Logs, +) error { + var log cwlog.CWLog + if err := json.Unmarshal(record, &log); err != nil { + return err + } + attrs := cwlog.ResourceAttributes{ + Owner: log.Owner, + LogGroup: log.LogGroup, + LogStream: log.LogStream, + } + lb, exists := resourceLogs[attrs] + if !exists { + lb = cwlog.NewResourceLogsBuilder(ld, attrs) + resourceLogs[attrs] = lb + } + lb.AddLog(log) + return nil +} + +// errMissingMetricValue is returned for a CloudWatch metric whose value is +// absent or null. +var errMissingMetricValue = errors.New("cloudwatch metric has no value") + +// addCloudwatchMetric unmarshals the record into a cwmetricstream.CWMetric and +// adds it to the builder for its stream, namespace, account and region, creating +// that builder in md on first use. It returns an error when the JSON cannot be +// decoded or the metric carries no value. +func (u *Unmarshaler) addCloudwatchMetric( + record []byte, + resourceMetrics map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder, + md pmetric.Metrics, +) error { + var metric cwmetricstream.CWMetric + if err := json.Unmarshal(record, &metric); err != nil { + return err + } + // Value is a pointer that the metric builder dereferences (Value.Count, + // Value.Sum); reject a null or missing value here instead of panicking there. 
+ if metric.Value == nil { + return errMissingMetricValue + } + attrs := cwmetricstream.ResourceAttributes{ + MetricStreamName: metric.MetricStreamName, + Namespace: metric.Namespace, + AccountID: metric.AccountID, + Region: metric.Region, + } + mb, exists := resourceMetrics[attrs] + if !exists { + mb = cwmetricstream.NewResourceMetricsBuilder(md, attrs) + resourceMetrics[attrs] = mb + } + mb.AddMetric(metric) + return nil +} + +// UnmarshalLogs converts the records into a single plog.Logs. Records that look +// like JSON are split on newlines and each entry recognized as a CloudWatch log +// is added; anything else is logged and skipped. It returns errInvalidRecords +// when no log record was produced. +func (u *Unmarshaler) UnmarshalLogs(records [][]byte) (plog.Logs, error) { + ld := plog.NewLogs() + cloudwatchLogs := make(map[cwlog.ResourceAttributes]*cwlog.ResourceLogsBuilder) + for i, record := range records { + if isJSON(record) { + for j, datum := range bytes.Split(record, []byte(recordDelimiter)) { + if isCloudWatchLog(datum) { + if err := u.addCloudwatchLog(datum, cloudwatchLogs, ld); err != nil { + u.logger.Error( + "Unable to unmarshal record to cloudwatch log", + 
zap.Error(err), + zap.Int("datum_index", j), + zap.Int("record_index", i), + ) + } + } else { + u.logger.Error( + "Unsupported log type for JSON record", + zap.Int("datum_index", j), + zap.Int("record_index", i), + ) + } + } + } else { + u.logger.Error( + "Unsupported log type for protobuf record", + zap.Int("record_index", i), + ) + } + } + + if ld.LogRecordCount() == 0 { + return ld, errInvalidRecords + } + return ld, nil +} + +// errTruncatedRecord is returned when a length prefix announces more bytes than +// the record still holds. +var errTruncatedRecord = errors.New("message length exceeds remaining record size") + +// addOTLPMetric decodes record as a sequence of varint length-prefixed OTLP +// export requests and moves their resource metrics into md. It returns at the +// first length prefix or message that cannot be decoded; metrics from earlier +// messages in the same record remain in md. +func (u *Unmarshaler) addOTLPMetric(record []byte, md pmetric.Metrics) error { + dataLen, pos := len(record), 0 + for pos < dataLen { + // Decode the length prefix at the current offset rather than at the start + // of the record, so every message after the first is framed correctly. + n, nLen := proto.DecodeVarint(record[pos:]) + if nLen == 0 { + return errUnknownLength + } + pos += nLen + // Compare as uint64 before converting: an oversized length must not + // overflow int or slice past the end of the record. + if n > uint64(dataLen-pos) { + return errTruncatedRecord + } + end := pos + int(n) + req := pmetricotlp.NewExportRequest() + if err := req.UnmarshalProto(record[pos:end]); err != nil { + return err + } + pos = end + req.Metrics().ResourceMetrics().MoveAndAppendTo(md.ResourceMetrics()) + } + return nil +} + +// UnmarshalMetrics converts the records into a single pmetric.Metrics. 
Records +// that look like JSON are split on newlines and each entry recognized as a +// CloudWatch metric is added; every other record is decoded as length-prefixed +// OTLP protobuf. Failures are logged and skipped. It returns errInvalidRecords +// when no metric was produced. +func (u *Unmarshaler) UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error) { + md := pmetric.NewMetrics() + cloudwatchMetrics := make(map[cwmetricstream.ResourceAttributes]*cwmetricstream.ResourceMetricsBuilder) + for i, record := range records { + if isJSON(record) { + for j, datum := range bytes.Split(record, []byte(recordDelimiter)) { + if isCloudwatchMetrics(datum) { + if err := u.addCloudwatchMetric(datum, cloudwatchMetrics, md); err != nil { + u.logger.Error( + "Unable to unmarshal input", + zap.Error(err), + zap.Int("datum_index", j), + zap.Int("record_index", i), + ) + } + } else { + u.logger.Error( + "Unsupported metric type for JSON record", + zap.Int("datum_index", j), + zap.Int("record_index", i), + ) + } + } + } else { // is protobuf + // OTLP metric is the only option currently supported. A successful decode + // needs no log entry; only failures are reported. + if err := u.addOTLPMetric(record, md); err != nil { + u.logger.Error( + "failed to unmarshall ExportRequest from proto bytes", + zap.Int("record_index", i), + zap.Error(err), + ) + } + } + } + + if md.MetricCount() == 0 { + return md, errInvalidRecords + } + return md, nil +} + +// Type returns TypeStr, the record type handled by this unmarshaler. 
+func (u *Unmarshaler) Type() string { + return TypeStr +} 
diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go new file mode 100644 index 000000000000..5a47d4c97e1b --- /dev/null +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/auto/unmarshaller_test.go @@ -0,0 +1,219 @@ +package auto + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" + "go.uber.org/zap" +) + +func TestType(t *testing.T) { + unmarshaler := NewUnmarshaler(zap.NewExample()) + require.Equal(t, TypeStr, unmarshaler.Type()) +} + +func TestUnmarshalMetrics_JSON(t *testing.T) { + t.Parallel() + + unmarshaler := NewUnmarshaler(zap.NewNop()) + testCases := map[string]struct { + dir string + filename string + metricResourceCount int + metricCount int + metricDataPointCount int + err error + }{ + "cwmetric:WithMultipleRecords": { + dir: "cwmetricstream", + filename: "multiple_records", + metricResourceCount: 6, + metricCount: 33, + metricDataPointCount: 127, + }, + "cwmetric:WithSingleRecord": { + dir: "cwmetricstream", + filename: "single_record", + metricResourceCount: 1, + metricCount: 1, + metricDataPointCount: 1, + }, + "cwmetric:WithInvalidRecords": { + dir: "cwmetricstream", + filename: "invalid_records", + err: errInvalidRecords, + }, + "cwmetric:WithSomeInvalidRecords": { + dir: "cwmetricstream", + filename: "some_invalid_records", + metricResourceCount: 5, + metricCount: 35, + metricDataPointCount: 88, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + record, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", 
testCase.filename)) + require.NoError(t, err) + + records := [][]byte{record} + + metrics, err := unmarshaler.UnmarshalMetrics(records) + require.Equal(t, testCase.err, err) + + require.Equal(t, testCase.metricResourceCount, metrics.ResourceMetrics().Len()) + require.Equal(t, testCase.metricDataPointCount, metrics.DataPointCount()) + require.Equal(t, testCase.metricCount, metrics.MetricCount()) + }) + } +} + +// Unmarshall cloudwatch metrics and logs +func TestUnmarshalLogs_JSON(t *testing.T) { + t.Parallel() + + unmarshaler := NewUnmarshaler(zap.NewExample()) + testCases := map[string]struct { + dir string + filename string + logResourceCount int + logRecordCount int + err error + }{ + "cwlog:WithMultipleRecords": { + dir: "cwlog", + filename: "multiple_records", + logResourceCount: 1, + logRecordCount: 2, + }, + "cwlog:WithSingleRecord": { + dir: "cwlog", + filename: "single_record", + logResourceCount: 1, + logRecordCount: 1, + }, + "cwlog:WithInvalidRecords": { + dir: "cwlog", + filename: "invalid_records", + err: errInvalidRecords, + }, + "cwlog:WithSomeInvalidRecords": { + dir: "cwlog", + filename: "some_invalid_records", + logResourceCount: 1, + logRecordCount: 2, + }, + "cwlog:WithMultipleResources": { + dir: "cwlog", + filename: "multiple_resources", + logResourceCount: 3, + logRecordCount: 6, + }, + } + + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + record, err := os.ReadFile(filepath.Join("..", testCase.dir, "testdata", testCase.filename)) + require.NoError(t, err) + + records := [][]byte{record} + + logs, err := unmarshaler.UnmarshalLogs(records) + require.Equal(t, testCase.err, err) + + require.Equal(t, testCase.logResourceCount, logs.ResourceLogs().Len()) + require.Equal(t, testCase.logRecordCount, logs.LogRecordCount()) + }) + } +} + +func createMetricRecord() []byte { + er := pmetricotlp.NewExportRequest() + rsm := er.Metrics().ResourceMetrics().AppendEmpty() + sm := 
rsm.ScopeMetrics().AppendEmpty().Metrics().AppendEmpty() + sm.SetName("TestMetric") + dp := sm.SetEmptySummary().DataPoints().AppendEmpty() + dp.SetCount(1) + dp.SetSum(1) + qv := dp.QuantileValues() + min := qv.AppendEmpty() + min.SetQuantile(0) + min.SetValue(0) + max := qv.AppendEmpty() + max.SetQuantile(1) + max.SetValue(1) + dp.SetTimestamp(pcommon.NewTimestampFromTime(time.Now())) + + temp, _ := er.MarshalProto() + record := proto.EncodeVarint(uint64(len(temp))) + record = append(record, temp...) + return record +} + +func TestUnmarshal(t *testing.T) { + t.Parallel() + + unmarshaler := NewUnmarshaler(zap.NewNop()) + testCases := map[string]struct { + records [][]byte + wantResourceCount int + wantMetricCount int + wantDatapointCount int + wantErr error + }{ + "WithSingleRecord": { + records: [][]byte{ + createMetricRecord(), + }, + wantResourceCount: 1, + wantMetricCount: 1, + wantDatapointCount: 1, + }, + "WithMultipleRecords": { + records: [][]byte{ + createMetricRecord(), + createMetricRecord(), + createMetricRecord(), + createMetricRecord(), + createMetricRecord(), + createMetricRecord(), + }, + wantResourceCount: 6, + wantMetricCount: 6, + wantDatapointCount: 6, + }, + "WithInvalidRecords": { + records: [][]byte{{1, 2}}, + wantResourceCount: 0, + wantMetricCount: 0, + wantDatapointCount: 0, + wantErr: errInvalidRecords, + }, + "WithSomeInvalidRecords": { + records: [][]byte{ + createMetricRecord(), + {1, 2}, + createMetricRecord(), + }, + wantResourceCount: 2, + wantMetricCount: 2, + wantDatapointCount: 2, + }, + } + for name, testCase := range testCases { + t.Run(name, func(t *testing.T) { + got, err := unmarshaler.UnmarshalMetrics(testCase.records) + require.Equal(t, testCase.wantErr, err) + require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) + require.Equal(t, testCase.wantMetricCount, got.MetricCount()) + require.Equal(t, testCase.wantDatapointCount, got.DataPointCount()) + }) + } +} diff --git 
a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go index 2ebca77861dd..954896f7a5bc 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go @@ -18,10 +18,7 @@ func Zip(data []byte) ([]byte, error) { return nil, err } - if err = w.Flush(); err != nil { - return nil, err - } - + //Close handles flushing. if err = w.Close(); err != nil { return nil, err } @@ -37,6 +34,7 @@ func Unzip(data []byte) ([]byte, error) { if err != nil { return nil, err } + defer r.Close() var rv bytes.Buffer _, err = rv.ReadFrom(r) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go index 1ab85509873a..7d20947a3625 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/cwlog.go @@ -3,7 +3,7 @@ package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog" -type cWLog struct { +type CWLog struct { MessageType string `json:"messageType"` Owner string `json:"owner"` LogGroup string `json:"logGroup"` diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go index 5dc7a3db59f8..fc0e60177f73 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/logsbuilder.go @@ -16,34 +16,36 @@ const ( attributeAWSCloudWatchLogStreamName = "aws.cloudwatch.log_stream_name" ) -// resourceAttributes are the CloudWatch log attributes that define a unique resource. 
-type resourceAttributes struct { - owner, logGroup, logStream string +// ResourceAttributes are the CloudWatch log attributes that define a unique resource. +type ResourceAttributes struct { + Owner string + LogGroup string + LogStream string } -// resourceLogsBuilder provides convenient access to the a Resource's LogRecordSlice. -type resourceLogsBuilder struct { +// ResourceLogsBuilder provides convenient access to a Resource's LogRecordSlice. +type ResourceLogsBuilder struct { rls plog.LogRecordSlice } -// setAttributes applies the resourceAttributes to the provided Resource. -func (ra *resourceAttributes) setAttributes(resource pcommon.Resource) { +// setAttributes applies the ResourceAttributes to the provided Resource. +func (ra *ResourceAttributes) setAttributes(resource pcommon.Resource) { attrs := resource.Attributes() - attrs.PutStr(conventions.AttributeCloudAccountID, ra.owner) - attrs.PutStr(attributeAWSCloudWatchLogGroupName, ra.logGroup) - attrs.PutStr(attributeAWSCloudWatchLogStreamName, ra.logStream) + attrs.PutStr(conventions.AttributeCloudAccountID, ra.Owner) + attrs.PutStr(attributeAWSCloudWatchLogGroupName, ra.LogGroup) + attrs.PutStr(attributeAWSCloudWatchLogStreamName, ra.LogStream) } -// newResourceLogsBuilder to capture logs for the Resource defined by the provided attributes. -func newResourceLogsBuilder(logs plog.Logs, attrs resourceAttributes) *resourceLogsBuilder { +// NewResourceLogsBuilder to capture logs for the Resource defined by the provided attributes. +func NewResourceLogsBuilder(logs plog.Logs, attrs ResourceAttributes) *ResourceLogsBuilder { rls := logs.ResourceLogs().AppendEmpty() attrs.setAttributes(rls.Resource()) - return &resourceLogsBuilder{rls.ScopeLogs().AppendEmpty().LogRecords()} + return &ResourceLogsBuilder{rls.ScopeLogs().AppendEmpty().LogRecords()} } // AddLog events to the LogRecordSlice. 
Resource attributes are captured when creating -// the resourceLogsBuilder, so we only need to consider the LogEvents themselves. -func (rlb *resourceLogsBuilder) AddLog(log cWLog) { +// the ResourceLogsBuilder, so we only need to consider the LogEvents themselves. +func (rlb *ResourceLogsBuilder) AddLog(log CWLog) { for _, event := range log.LogEvents { logLine := rlb.rls.AppendEmpty() // pcommon.Timestamp is a time specified as UNIX Epoch time in nanoseconds diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index b3fa132166b5..60b1e24bac48 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -12,7 +12,6 @@ import ( "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" ) const ( @@ -34,55 +33,44 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger} } -// Unmarshal deserializes the records into cWLogs and uses the -// resourceLogsBuilder to group them into a single plog.Logs. -// Skips invalid cWLogs received in the record and -func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { +// UnmarshalLogs deserializes the records into CWLog and uses the +// ResourceLogsBuilder to group them into a single plog.Logs. +// Skips invalid CWLog received in the record. 
+func (u Unmarshaler) UnmarshalLogs(records [][]byte) (plog.Logs, error) { md := plog.NewLogs() - builders := make(map[resourceAttributes]*resourceLogsBuilder) - for recordIndex, compressedRecord := range records { - record, err := compression.Unzip(compressedRecord) - if err != nil { - u.logger.Error("Failed to unzip record", - zap.Error(err), - zap.Int("record_index", recordIndex), - ) - continue - } - // Multiple logs in each record separated by newline character + builders := make(map[ResourceAttributes]*ResourceLogsBuilder) + for recordIndex, record := range records { for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) { - if len(datum) > 0 { - var log cWLog - err := json.Unmarshal(datum, &log) - if err != nil { - u.logger.Error( - "Unable to unmarshal input", - zap.Error(err), - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue - } - if !u.isValid(log) { - u.logger.Error( - "Invalid log", - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue - } - attrs := resourceAttributes{ - owner: log.Owner, - logGroup: log.LogGroup, - logStream: log.LogStream, - } - lb, ok := builders[attrs] - if !ok { - lb = newResourceLogsBuilder(md, attrs) - builders[attrs] = lb - } - lb.AddLog(log) + var log CWLog + err := json.Unmarshal(datum, &log) + if err != nil { + u.logger.Error( + "Unable to unmarshal input", + zap.Error(err), + zap.Int("datum_index", datumIndex), + zap.Int("record_index", recordIndex), + ) + continue + } + if !u.isValid(log) { + u.logger.Error( + "Invalid log", + zap.Int("datum_index", datumIndex), + zap.Int("record_index", recordIndex), + ) + continue + } + attrs := ResourceAttributes{ + Owner: log.Owner, + LogGroup: log.LogGroup, + LogStream: log.LogStream, + } + lb, ok := builders[attrs] + if !ok { + lb = NewResourceLogsBuilder(md, attrs) + builders[attrs] = lb } + lb.AddLog(log) } } @@ -93,8 +81,8 @@ func (u Unmarshaler) Unmarshal(records [][]byte) 
(plog.Logs, error) { return md, nil } -// isValid validates that the cWLog has been unmarshalled correctly. -func (u Unmarshaler) isValid(log cWLog) bool { +// isValid validates that the CWLog has been unmarshalled correctly. +func (u Unmarshaler) isValid(log CWLog) bool { return log.Owner != "" && log.LogGroup != "" && log.LogStream != "" } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go index 71b49295df60..6b3a90244477 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go @@ -10,8 +10,6 @@ import ( "github.com/stretchr/testify/require" "go.uber.org/zap" - - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" ) func TestType(t *testing.T) { @@ -57,27 +55,13 @@ func TestUnmarshal(t *testing.T) { record, err := os.ReadFile(filepath.Join(".", "testdata", testCase.filename)) require.NoError(t, err) - compressedRecord, err := compression.Zip(record) - require.NoError(t, err) - records := [][]byte{compressedRecord} + records := [][]byte{record} - got, err := unmarshaler.Unmarshal(records) - if testCase.wantErr != nil { - require.Error(t, err) - require.Equal(t, testCase.wantErr, err) - } else { - require.NoError(t, err) - require.NotNil(t, got) - require.Equal(t, testCase.wantResourceCount, got.ResourceLogs().Len()) - gotLogCount := 0 - for i := 0; i < got.ResourceLogs().Len(); i++ { - rm := got.ResourceLogs().At(i) - require.Equal(t, 1, rm.ScopeLogs().Len()) - ilm := rm.ScopeLogs().At(0) - gotLogCount += ilm.LogRecords().Len() - } - require.Equal(t, testCase.wantLogCount, gotLogCount) - } + got, err := unmarshaler.UnmarshalLogs(records) + require.Equal(t, testCase.wantErr, err) + require.NotNil(t, got) + require.Equal(t, testCase.wantResourceCount, 
got.ResourceLogs().Len()) + require.Equal(t, testCase.wantLogCount, got.LogRecordCount()) }) } } @@ -87,11 +71,9 @@ func TestLogTimestamp(t *testing.T) { record, err := os.ReadFile(filepath.Join(".", "testdata", "single_record")) require.NoError(t, err) - compressedRecord, err := compression.Zip(record) - require.NoError(t, err) - records := [][]byte{compressedRecord} + records := [][]byte{record} - got, err := unmarshaler.Unmarshal(records) + got, err := unmarshaler.UnmarshalLogs(records) require.NoError(t, err) require.NotNil(t, got) require.Equal(t, 1, got.ResourceLogs().Len()) diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go index 1896fd0c869e..0426fc81ad8c 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/cwmetric.go @@ -3,11 +3,11 @@ package cwmetricstream // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream" -// The cWMetric is the format for the CloudWatch metric stream records. +// The CWMetric is the format for the CloudWatch metric stream records. // // More details can be found at: // https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch-metric-streams-formats-json.html -type cWMetric struct { +type CWMetric struct { // MetricStreamName is the name of the CloudWatch metric stream. MetricStreamName string `json:"metric_stream_name"` // AccountID is the AWS account ID associated with the metric. @@ -24,9 +24,9 @@ type cWMetric struct { // Timestamp is the milliseconds since epoch for // the metric. Timestamp int64 `json:"timestamp"` - // Value is the cWMetricValue, which has the min, max, + // Value is the CWMetricValue, which has the min, max, // sum, and count. 
- Value *cWMetricValue `json:"value"` + Value *CWMetricValue `json:"value"` // Unit is the unit for the metric. // // More details can be found at: @@ -34,8 +34,8 @@ type cWMetric struct { Unit string `json:"unit"` } -// The cWMetricValue is the actual values of the CloudWatch metric. -type cWMetricValue struct { +// The CWMetricValue is the actual values of the CloudWatch metric. +type CWMetricValue struct { // Max is the highest value observed. Max float64 `json:"max"` // Min is the lowest value observed. diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder.go index af47ba4b6509..a44951d15693 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder.go @@ -19,34 +19,34 @@ const ( namespaceDelimiter = "/" ) -// resourceAttributes are the CloudWatch metric stream attributes that define a +// ResourceAttributes are the CloudWatch metric stream attributes that define a // unique resource. -type resourceAttributes struct { - // metricStreamName is the metric stream name. - metricStreamName string - // accountID is the AWS account ID. - accountID string - // region is the AWS region. - region string - // namespace is the CloudWatch metric namespace. - namespace string +type ResourceAttributes struct { + // MetricStreamName is the metric stream name. + MetricStreamName string + // AccountID is the AWS account ID. + AccountID string + // Region is the AWS Region. + Region string + // Namespace is the CloudWatch metric namespace. + Namespace string } -// The resourceMetricsBuilder is used to aggregate metrics for the -// same resourceAttributes. -type resourceMetricsBuilder struct { +// The ResourceMetricsBuilder is used to aggregate metrics for the +// same ResourceAttributes. 
+type ResourceMetricsBuilder struct { rms pmetric.MetricSlice // metricBuilders is the map of metrics within the same // resource group. metricBuilders map[string]*metricBuilder } -// newResourceMetricsBuilder creates a resourceMetricsBuilder with the -// resourceAttributes. -func newResourceMetricsBuilder(md pmetric.Metrics, attrs resourceAttributes) *resourceMetricsBuilder { +// NewResourceMetricsBuilder creates a ResourceMetricsBuilder with the +// ResourceAttributes. +func NewResourceMetricsBuilder(md pmetric.Metrics, attrs ResourceAttributes) *ResourceMetricsBuilder { rms := md.ResourceMetrics().AppendEmpty() attrs.setAttributes(rms.Resource()) - return &resourceMetricsBuilder{ + return &ResourceMetricsBuilder{ rms: rms.ScopeMetrics().AppendEmpty().Metrics(), metricBuilders: make(map[string]*metricBuilder), } @@ -54,7 +54,7 @@ func newResourceMetricsBuilder(md pmetric.Metrics, attrs resourceAttributes) *re // AddMetric adds a metric to one of the metric builders based on // the key generated for each. -func (rmb *resourceMetricsBuilder) AddMetric(metric cWMetric) { +func (rmb *ResourceMetricsBuilder) AddMetric(metric CWMetric) { mb, ok := rmb.metricBuilders[metric.MetricName] if !ok { mb = newMetricBuilder(rmb.rms, metric.MetricName, metric.Unit) @@ -63,18 +63,18 @@ func (rmb *resourceMetricsBuilder) AddMetric(metric cWMetric) { mb.AddDataPoint(metric) } -// setAttributes creates a pcommon.Resource from the fields in the resourceMetricsBuilder. -func (rmb *resourceAttributes) setAttributes(resource pcommon.Resource) { +// setAttributes creates a pcommon.Resource from the fields in the ResourceMetricsBuilder. 
+func (rmb *ResourceAttributes) setAttributes(resource pcommon.Resource) { attributes := resource.Attributes() attributes.PutStr(conventions.AttributeCloudProvider, conventions.AttributeCloudProviderAWS) - attributes.PutStr(conventions.AttributeCloudAccountID, rmb.accountID) - attributes.PutStr(conventions.AttributeCloudRegion, rmb.region) - serviceNamespace, serviceName := toServiceAttributes(rmb.namespace) + attributes.PutStr(conventions.AttributeCloudAccountID, rmb.AccountID) + attributes.PutStr(conventions.AttributeCloudRegion, rmb.Region) + serviceNamespace, serviceName := toServiceAttributes(rmb.Namespace) if serviceNamespace != "" { attributes.PutStr(conventions.AttributeServiceNamespace, serviceNamespace) } attributes.PutStr(conventions.AttributeServiceName, serviceName) - attributes.PutStr(attributeAWSCloudWatchMetricStreamName, rmb.metricStreamName) + attributes.PutStr(attributeAWSCloudWatchMetricStreamName, rmb.MetricStreamName) } // toServiceAttributes splits the CloudWatch namespace into service namespace/name @@ -120,7 +120,7 @@ func newMetricBuilder(rms pmetric.MetricSlice, name, unit string) *metricBuilder // AddDataPoint adds the metric as a datapoint if a metric for that timestamp // hasn't already been added. -func (mb *metricBuilder) AddDataPoint(metric cWMetric) { +func (mb *metricBuilder) AddDataPoint(metric CWMetric) { key := dataPointKey{ timestamp: metric.Timestamp, dimensions: fmt.Sprint(metric.Dimensions), @@ -131,9 +131,9 @@ func (mb *metricBuilder) AddDataPoint(metric cWMetric) { } } -// toDataPoint converts a cWMetric into a pdata datapoint and attaches the +// toDataPoint converts a CWMetric into a pdata datapoint and attaches the // dimensions as attributes. 
-func (mb *metricBuilder) toDataPoint(dp pmetric.SummaryDataPoint, metric cWMetric) { +func (mb *metricBuilder) toDataPoint(dp pmetric.SummaryDataPoint, metric CWMetric) { dp.SetCount(uint64(metric.Value.Count)) dp.SetSum(metric.Value.Sum) qv := dp.QuantileValues() diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go index 114554648f7d..4a716df79a51 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/metricsbuilder_test.go @@ -45,7 +45,7 @@ func TestToSemConvAttributeKey(t *testing.T) { func TestMetricBuilder(t *testing.T) { t.Run("WithSingleMetric", func(t *testing.T) { - metric := cWMetric{ + metric := CWMetric{ MetricName: "name", Unit: "unit", Timestamp: time.Now().UnixMilli(), @@ -72,7 +72,7 @@ func TestMetricBuilder(t *testing.T) { }) t.Run("WithTimestampCollision", func(t *testing.T) { timestamp := time.Now().UnixMilli() - metrics := []cWMetric{ + metrics := []CWMetric{ { Timestamp: timestamp, Value: testCWMetricValue(), @@ -136,21 +136,21 @@ func TestResourceMetricsBuilder(t *testing.T) { } for name, testCase := range testCases { t.Run(name, func(t *testing.T) { - metric := cWMetric{ + metric := CWMetric{ MetricName: "name", Unit: "unit", Timestamp: time.Now().UnixMilli(), Value: testCWMetricValue(), Dimensions: map[string]string{}, } - attrs := resourceAttributes{ - metricStreamName: testStreamName, - accountID: testAccountID, - region: testRegion, - namespace: testCase.namespace, + attrs := ResourceAttributes{ + MetricStreamName: testStreamName, + AccountID: testAccountID, + Region: testRegion, + Namespace: testCase.namespace, } gots := pmetric.NewMetrics() - rmb := newResourceMetricsBuilder(gots, attrs) + rmb := NewResourceMetricsBuilder(gots, attrs) rmb.AddMetric(metric) require.Equal(t, 1, 
gots.ResourceMetrics().Len()) got := gots.ResourceMetrics().At(0) @@ -167,7 +167,7 @@ func TestResourceMetricsBuilder(t *testing.T) { }) } t.Run("WithSameMetricDifferentDimensions", func(t *testing.T) { - metrics := []cWMetric{ + metrics := []CWMetric{ { MetricName: "name", Unit: "unit", @@ -185,14 +185,14 @@ func TestResourceMetricsBuilder(t *testing.T) { }, }, } - attrs := resourceAttributes{ - metricStreamName: testStreamName, - accountID: testAccountID, - region: testRegion, - namespace: "AWS/EC2", + attrs := ResourceAttributes{ + MetricStreamName: testStreamName, + AccountID: testAccountID, + Region: testRegion, + Namespace: "AWS/EC2", } gots := pmetric.NewMetrics() - rmb := newResourceMetricsBuilder(gots, attrs) + rmb := NewResourceMetricsBuilder(gots, attrs) for _, metric := range metrics { rmb.AddMetric(metric) } @@ -206,7 +206,7 @@ func TestResourceMetricsBuilder(t *testing.T) { }) } -// testCWMetricValue is a convenience function for creating a test cWMetricValue -func testCWMetricValue() *cWMetricValue { - return &cWMetricValue{100, 0, float64(rand.Int63n(100)), float64(rand.Int63n(4))} +// testCWMetricValue is a convenience function for creating a test CWMetricValue +func testCWMetricValue() *CWMetricValue { + return &CWMetricValue{100, 0, float64(rand.Int63n(100)), float64(rand.Int63n(4))} } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go index 110ef4afc0aa..d404b97b41c0 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler.go @@ -7,7 +7,6 @@ import ( "bytes" "encoding/json" "errors" - "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/zap" @@ -36,48 +35,49 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger} } -// Unmarshal deserializes the records into cWMetrics and 
uses the -// resourceMetricsBuilder to group them into a single pmetric.Metrics. -// Skips invalid cWMetrics received in the record and -func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) { +// UnmarshalMetrics deserializes the records into CWMetric and uses the +// ResourceMetricsBuilder to group them into a single pmetric.Metrics. +// Skips invalid CWMetric received in the record. +func (u Unmarshaler) UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error) { md := pmetric.NewMetrics() - builders := make(map[resourceAttributes]*resourceMetricsBuilder) + builders := make(map[ResourceAttributes]*ResourceMetricsBuilder) for recordIndex, record := range records { - // Multiple metrics in each record separated by newline character + // In a CloudWatch metric stream that uses the JSON format, + // each Firehose record contains multiple JSON objects separated + // by a newline character (\n). Each object includes a single data + // point of a single metric. for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) { - if len(datum) > 0 { - var metric cWMetric - err := json.Unmarshal(datum, &metric) - if err != nil { - u.logger.Error( - "Unable to unmarshal input", - zap.Error(err), - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue - } - if !u.isValid(metric) { - u.logger.Error( - "Invalid metric", - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue - } - attrs := resourceAttributes{ - metricStreamName: metric.MetricStreamName, - namespace: metric.Namespace, - accountID: metric.AccountID, - region: metric.Region, - } - mb, ok := builders[attrs] - if !ok { - mb = newResourceMetricsBuilder(md, attrs) - builders[attrs] = mb - } - mb.AddMetric(metric) + var metric CWMetric + err := json.Unmarshal(datum, &metric) + if err != nil { + u.logger.Error( + "Unable to unmarshal input", + zap.Error(err), + zap.Int("datum_index", datumIndex), + 
zap.Int("record_index", recordIndex), + ) + continue + } + if !u.isValid(metric) { + u.logger.Error( + "Invalid metric", + zap.Int("datum_index", datumIndex), + zap.Int("record_index", recordIndex), + ) + continue + } + attrs := ResourceAttributes{ + MetricStreamName: metric.MetricStreamName, + Namespace: metric.Namespace, + AccountID: metric.AccountID, + Region: metric.Region, + } + mb, ok := builders[attrs] + if !ok { + mb = NewResourceMetricsBuilder(md, attrs) + builders[attrs] = mb } + mb.AddMetric(metric) } } @@ -88,8 +88,8 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) { return md, nil } -// isValid validates that the cWMetric has been unmarshalled correctly. -func (u Unmarshaler) isValid(metric cWMetric) bool { +// isValid validates that the CWMetric has been unmarshalled correctly. +func (u Unmarshaler) isValid(metric CWMetric) bool { return metric.MetricName != "" && metric.Namespace != "" && metric.Unit != "" && metric.Value != nil } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go index 16a517db1756..963a0ddb746a 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwmetricstream/unmarshaler_test.go @@ -56,29 +56,12 @@ func TestUnmarshal(t *testing.T) { records := [][]byte{record} - got, err := unmarshaler.Unmarshal(records) - if testCase.wantErr != nil { - require.Error(t, err) - require.Equal(t, testCase.wantErr, err) - } else { - require.NoError(t, err) - require.NotNil(t, got) - require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) - gotMetricCount := 0 - gotDatapointCount := 0 - for i := 0; i < got.ResourceMetrics().Len(); i++ { - rm := got.ResourceMetrics().At(i) - require.Equal(t, 1, rm.ScopeMetrics().Len()) - ilm := rm.ScopeMetrics().At(0) - gotMetricCount += 
ilm.Metrics().Len() - for j := 0; j < ilm.Metrics().Len(); j++ { - metric := ilm.Metrics().At(j) - gotDatapointCount += metric.Summary().DataPoints().Len() - } - } - require.Equal(t, testCase.wantMetricCount, gotMetricCount) - require.Equal(t, testCase.wantDatapointCount, gotDatapointCount) - } + got, err := unmarshaler.UnmarshalMetrics(records) + require.Equal(t, testCase.wantErr, err) + require.NotNil(t, got) + require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) + require.Equal(t, testCase.wantMetricCount, got.MetricCount()) + require.Equal(t, testCase.wantDatapointCount, got.DataPointCount()) }) } } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go index c3dde9699e90..3d7c4a273aab 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler.go @@ -36,8 +36,8 @@ func NewUnmarshaler(logger *zap.Logger) *Unmarshaler { return &Unmarshaler{logger} } -// Unmarshal deserializes the records into pmetric.Metrics -func (u Unmarshaler) Unmarshal(records [][]byte) (pmetric.Metrics, error) { +// UnmarshalMetrics deserializes the records into pmetric.Metrics +func (u Unmarshaler) UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error) { md := pmetric.NewMetrics() for recordIndex, record := range records { dataLen, pos := len(record), 0 diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go index 0f5a1ee3c905..401ad17c8e22 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/otlpmetricstream/unmarshaler_test.go @@ -97,29 +97,11 @@ func TestUnmarshal(t *testing.T) { } for 
name, testCase := range testCases { t.Run(name, func(t *testing.T) { - got, err := unmarshaler.Unmarshal(testCase.records) - if testCase.wantErr != nil { - require.Error(t, err) - require.Equal(t, testCase.wantErr, err) - } else { - require.NoError(t, err) - require.NotNil(t, got) - require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) - gotMetricCount := 0 - gotDatapointCount := 0 - for i := 0; i < got.ResourceMetrics().Len(); i++ { - rm := got.ResourceMetrics().At(i) - require.Equal(t, 1, rm.ScopeMetrics().Len()) - ilm := rm.ScopeMetrics().At(0) - gotMetricCount += ilm.Metrics().Len() - for j := 0; j < ilm.Metrics().Len(); j++ { - metric := ilm.Metrics().At(j) - gotDatapointCount += metric.Summary().DataPoints().Len() - } - } - require.Equal(t, testCase.wantMetricCount, gotMetricCount) - require.Equal(t, testCase.wantDatapointCount, gotDatapointCount) - } + got, err := unmarshaler.UnmarshalMetrics(testCase.records) + require.NoError(t, err) + require.Equal(t, testCase.wantResourceCount, got.ResourceMetrics().Len()) + require.Equal(t, testCase.wantMetricCount, got.MetricCount()) + require.Equal(t, testCase.wantDatapointCount, got.DataPointCount()) }) } } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go index 0ffb4b0a80e8..30498474f419 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshaler.go @@ -10,8 +10,8 @@ import ( // MetricsUnmarshaler deserializes the message body type MetricsUnmarshaler interface { - // Unmarshal deserializes the records into metrics. - Unmarshal(records [][]byte) (pmetric.Metrics, error) + // UnmarshalMetrics deserializes the records into metrics. + UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error) // Type of the serialized messages. 
Type() string @@ -19,8 +19,20 @@ type MetricsUnmarshaler interface { // LogsUnmarshaler deserializes the message body type LogsUnmarshaler interface { - // Unmarshal deserializes the records into logs. - Unmarshal(records [][]byte) (plog.Logs, error) + // UnmarshalLogs deserializes the records into logs. + UnmarshalLogs(records [][]byte) (plog.Logs, error) + + // Type of the serialized messages. + Type() string +} + +// Unmarshaler deserializes the message body +type Unmarshaler interface { + // UnmarshalMetrics deserializes the records into metrics. + UnmarshalMetrics(records [][]byte) (pmetric.Metrics, error) + + // UnmarshalLogs deserializes the records into logs. + UnmarshalLogs(records [][]byte) (plog.Logs, error) // Type of the serialized messages. Type() string diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go index 79f29caecfdb..7e44458dddf7 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler.go @@ -18,26 +18,14 @@ type NopLogsUnmarshaler struct { var _ unmarshaler.LogsUnmarshaler = (*NopLogsUnmarshaler)(nil) -// NewNopLogs provides a nop logs unmarshaler with the default -// plog.Logs and no error. -func NewNopLogs() *NopLogsUnmarshaler { - return &NopLogsUnmarshaler{} +// NewNopLogs provides a nop logs unmarshaler with the passed +// error and logs as the result of the unmarshal. +func NewNopLogs(logs plog.Logs, err error) *NopLogsUnmarshaler { + return &NopLogsUnmarshaler{logs: logs, err: err} } -// NewWithLogs provides a nop logs unmarshaler with the passed -// in logs as the result of the Unmarshal and no error. 
-func NewWithLogs(logs plog.Logs) *NopLogsUnmarshaler { - return &NopLogsUnmarshaler{logs: logs} -} - -// NewErrLogs provides a nop logs unmarshaler with the passed -// in error as the Unmarshal error. -func NewErrLogs(err error) *NopLogsUnmarshaler { - return &NopLogsUnmarshaler{err: err} -} - -// Unmarshal deserializes the records into logs. -func (u *NopLogsUnmarshaler) Unmarshal([][]byte) (plog.Logs, error) { +// UnmarshalLogs deserializes the records into logs. +func (u *NopLogsUnmarshaler) UnmarshalLogs([][]byte) (plog.Logs, error) { return u.logs, u.err } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go index ce90c351cfbd..eb1bf5aac3ec 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_logs_unmarshaler_test.go @@ -5,37 +5,38 @@ package unmarshalertest import ( "errors" + "go.opentelemetry.io/collector/pdata/plog" "testing" "github.com/stretchr/testify/require" - "go.opentelemetry.io/collector/pdata/plog" ) func TestNewNopLogs(t *testing.T) { - unmarshaler := NewNopLogs() - got, err := unmarshaler.Unmarshal(nil) - require.NoError(t, err) - require.NotNil(t, got) - require.Equal(t, typeStr, unmarshaler.Type()) -} - -func TestNewWithLogs(t *testing.T) { - logs := plog.NewLogs() - logs.ResourceLogs().AppendEmpty() - unmarshaler := NewWithLogs(logs) - got, err := unmarshaler.Unmarshal(nil) - require.NoError(t, err) - require.NotNil(t, got) - require.Equal(t, logs, got) - require.Equal(t, typeStr, unmarshaler.Type()) -} + t.Parallel() + tests := []struct { + name string + logs plog.Logs + err error + }{ + { + name: "no error", + logs: plog.NewLogs(), + err: nil, + }, + { + name: "with error", + logs: plog.NewLogs(), + err: errors.New("test error"), + }, + } -func TestNewErrLogs(t 
*testing.T) { - wantErr := errors.New("test error") - unmarshaler := NewErrLogs(wantErr) - got, err := unmarshaler.Unmarshal(nil) - require.Error(t, err) - require.Equal(t, wantErr, err) - require.NotNil(t, got) - require.Equal(t, typeStr, unmarshaler.Type()) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + unmarshaler := NewNopLogs(test.logs, test.err) + got, err := unmarshaler.UnmarshalLogs(nil) + require.Equal(t, test.err, err) + require.Equal(t, test.logs, got) + require.Equal(t, typeStr, unmarshaler.Type()) + }) + } } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go index a8f5c36319e3..488039499173 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler.go @@ -20,26 +20,14 @@ type NopMetricsUnmarshaler struct { var _ unmarshaler.MetricsUnmarshaler = (*NopMetricsUnmarshaler)(nil) -// NewNopMetrics provides a nop metrics unmarshaler with the default -// pmetric.Metrics and no error. -func NewNopMetrics() *NopMetricsUnmarshaler { - return &NopMetricsUnmarshaler{} +// NewNopMetrics provides a nop metrics unmarshaler with the passed +// error and metrics as the result of the unmarshal. +func NewNopMetrics(metrics pmetric.Metrics, err error) *NopMetricsUnmarshaler { + return &NopMetricsUnmarshaler{metrics: metrics, err: err} } -// NewWithMetrics provides a nop metrics unmarshaler with the passed -// in metrics as the result of the Unmarshal and no error. -func NewWithMetrics(metrics pmetric.Metrics) *NopMetricsUnmarshaler { - return &NopMetricsUnmarshaler{metrics: metrics} -} - -// NewErrMetrics provides a nop metrics unmarshaler with the passed -// in error as the Unmarshal error. 
-func NewErrMetrics(err error) *NopMetricsUnmarshaler { - return &NopMetricsUnmarshaler{err: err} -} - -// Unmarshal deserializes the records into metrics. -func (u *NopMetricsUnmarshaler) Unmarshal([][]byte) (pmetric.Metrics, error) { +// UnmarshalMetrics deserializes the records into metrics. +func (u *NopMetricsUnmarshaler) UnmarshalMetrics([][]byte) (pmetric.Metrics, error) { return u.metrics, u.err } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go index 572c39bc475c..dc1c6d449bd5 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/unmarshalertest/nop_metrics_unmarshaler_test.go @@ -12,30 +12,31 @@ import ( ) func TestNewNopMetrics(t *testing.T) { - unmarshaler := NewNopMetrics() - got, err := unmarshaler.Unmarshal(nil) - require.NoError(t, err) - require.NotNil(t, got) - require.Equal(t, typeStr, unmarshaler.Type()) -} - -func TestNewWithMetrics(t *testing.T) { - metrics := pmetric.NewMetrics() - metrics.ResourceMetrics().AppendEmpty() - unmarshaler := NewWithMetrics(metrics) - got, err := unmarshaler.Unmarshal(nil) - require.NoError(t, err) - require.NotNil(t, got) - require.Equal(t, metrics, got) - require.Equal(t, typeStr, unmarshaler.Type()) -} + t.Parallel() + tests := []struct { + name string + metrics pmetric.Metrics + err error + }{ + { + name: "no error", + metrics: pmetric.NewMetrics(), + err: nil, + }, + { + name: "with error", + metrics: pmetric.NewMetrics(), + err: errors.New("test error"), + }, + } -func TestNewErrMetrics(t *testing.T) { - wantErr := errors.New("test error") - unmarshaler := NewErrMetrics(wantErr) - got, err := unmarshaler.Unmarshal(nil) - require.Error(t, err) - require.Equal(t, wantErr, err) - require.NotNil(t, got) - require.Equal(t, typeStr, 
unmarshaler.Type()) + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + unmarshaler := NewNopMetrics(test.metrics, test.err) + got, err := unmarshaler.UnmarshalMetrics(nil) + require.Equal(t, test.err, err) + require.Equal(t, test.metrics, got) + require.Equal(t, typeStr, unmarshaler.Type()) + }) + } } diff --git a/receiver/awsfirehosereceiver/logs_receiver.go b/receiver/awsfirehosereceiver/logs_receiver.go index 570e6cf1e745..0b5153c69865 100644 --- a/receiver/awsfirehosereceiver/logs_receiver.go +++ b/receiver/awsfirehosereceiver/logs_receiver.go @@ -62,8 +62,13 @@ func newLogsReceiver( // Consume uses the configured unmarshaler to deserialize the records into a // single plog.Logs. It will send the final result // to the next consumer. -func (mc *logsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) { - md, err := mc.unmarshaler.Unmarshal(records) +func (mc *logsConsumer) Consume( + ctx context.Context, + records [][]byte, + commonAttributes map[string]string, +) (int, error) { + md, err := mc.unmarshaler.UnmarshalLogs(records) + if err != nil { return http.StatusBadRequest, err } diff --git a/receiver/awsfirehosereceiver/logs_receiver_test.go b/receiver/awsfirehosereceiver/logs_receiver_test.go index 6739f8137929..6df2ad1786a3 100644 --- a/receiver/awsfirehosereceiver/logs_receiver_test.go +++ b/receiver/awsfirehosereceiver/logs_receiver_test.go @@ -97,7 +97,7 @@ func TestLogsConsumer(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { mc := &logsConsumer{ - unmarshaler: unmarshalertest.NewErrLogs(testCase.unmarshalerErr), + unmarshaler: unmarshalertest.NewNopLogs(plog.NewLogs(), testCase.unmarshalerErr), consumer: consumertest.NewErr(testCase.consumerErr), } gotStatus, gotErr := mc.Consume(context.TODO(), nil, nil) @@ -111,7 +111,7 @@ func TestLogsConsumer(t *testing.T) { base.ResourceLogs().AppendEmpty() rc := logsRecordConsumer{} mc := 
&logsConsumer{ - unmarshaler: unmarshalertest.NewWithLogs(base), + unmarshaler: unmarshalertest.NewNopLogs(base, nil), consumer: &rc, } gotStatus, gotErr := mc.Consume(context.TODO(), nil, map[string]string{ diff --git a/receiver/awsfirehosereceiver/metrics_receiver.go b/receiver/awsfirehosereceiver/metrics_receiver.go index 4a5128583ac0..d9628f057234 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver.go +++ b/receiver/awsfirehosereceiver/metrics_receiver.go @@ -64,8 +64,12 @@ func newMetricsReceiver( // single pmetric.Metrics. If there are common attributes available, then it will // attach those to each of the pcommon.Resources. It will send the final result // to the next consumer. -func (mc *metricsConsumer) Consume(ctx context.Context, records [][]byte, commonAttributes map[string]string) (int, error) { - md, err := mc.unmarshaler.Unmarshal(records) +func (mc *metricsConsumer) Consume( + ctx context.Context, + records [][]byte, + commonAttributes map[string]string, +) (int, error) { + md, err := mc.unmarshaler.UnmarshalMetrics(records) if err != nil { return http.StatusBadRequest, err } diff --git a/receiver/awsfirehosereceiver/metrics_receiver_test.go b/receiver/awsfirehosereceiver/metrics_receiver_test.go index d32ec4efc8a5..fde7ecee243b 100644 --- a/receiver/awsfirehosereceiver/metrics_receiver_test.go +++ b/receiver/awsfirehosereceiver/metrics_receiver_test.go @@ -98,7 +98,7 @@ func TestMetricsConsumer(t *testing.T) { for name, testCase := range testCases { t.Run(name, func(t *testing.T) { mc := &metricsConsumer{ - unmarshaler: unmarshalertest.NewErrMetrics(testCase.unmarshalerErr), + unmarshaler: unmarshalertest.NewNopMetrics(pmetric.NewMetrics(), testCase.unmarshalerErr), consumer: consumertest.NewErr(testCase.consumerErr), } gotStatus, gotErr := mc.Consume(context.TODO(), nil, nil) @@ -112,7 +112,7 @@ func TestMetricsConsumer(t *testing.T) { base.ResourceMetrics().AppendEmpty() rc := recordConsumer{} mc := &metricsConsumer{ - unmarshaler: 
unmarshalertest.NewWithMetrics(base), + unmarshaler: unmarshalertest.NewNopMetrics(base, nil), consumer: &rc, } gotStatus, gotErr := mc.Consume(context.TODO(), nil, map[string]string{ diff --git a/receiver/awsfirehosereceiver/receiver.go b/receiver/awsfirehosereceiver/receiver.go index baa9750b6162..9d4f76b93f94 100644 --- a/receiver/awsfirehosereceiver/receiver.go +++ b/receiver/awsfirehosereceiver/receiver.go @@ -9,10 +9,10 @@ import ( "encoding/json" "errors" "fmt" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" "io" "net" "net/http" - "strconv" "sync" "time" @@ -28,6 +28,7 @@ const ( headerFirehoseCommonAttributes = "X-Amz-Firehose-Common-Attributes" headerContentType = "Content-Type" headerContentLength = "Content-Length" + headerContentEncoding = "Content-Encoding" ) var ( @@ -154,6 +155,7 @@ func (fmr *firehoseReceiver) Shutdown(context.Context) error { func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { ctx := r.Context() + // extract and validate request id requestID := r.Header.Get(headerFirehoseRequestID) if requestID == "" { fmr.settings.Logger.Error( @@ -165,6 +167,7 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { } fmr.settings.Logger.Debug("Processing Firehose request", zap.String("RequestID", requestID)) + // validate access key if statusCode, err := fmr.validate(r); err != nil { fmr.settings.Logger.Error( "Invalid Firehose request", @@ -174,44 +177,60 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { return } + // parse the body body, err := fmr.getBody(r) if err != nil { + fmr.settings.Logger.Error( + "Failed to parse the request body", + zap.Error(err), + ) fmr.sendResponse(w, requestID, http.StatusBadRequest, err) return } + // unmarshall request var fr firehoseRequest if err = json.Unmarshal(body, &fr); err != nil { + fmr.settings.Logger.Error( + "Failed to 
unmarshall firehose request", + zap.Error(err), + ) fmr.sendResponse(w, requestID, http.StatusBadRequest, err) return } + // validate request id if fr.RequestID == "" { + fmr.settings.Logger.Error( + "Missing firehose request ID", + zap.Error(err), + ) fmr.sendResponse(w, requestID, http.StatusBadRequest, errInBodyMissingRequestID) return } else if fr.RequestID != requestID { + fmr.settings.Logger.Error( + "Firehose request ID does not match the one in the header", + zap.Error(err), + zap.String(headerFirehoseRequestID, requestID), + zap.String("requestID", fr.RequestID), + ) fmr.sendResponse(w, requestID, http.StatusBadRequest, errInBodyDiffRequestID) return } - records := make([][]byte, 0, len(fr.Records)) - for index, record := range fr.Records { - if record.Data != "" { - var decoded []byte - decoded, err = base64.StdEncoding.DecodeString(record.Data) - if err != nil { - fmr.sendResponse( - w, - requestID, - http.StatusBadRequest, - fmt.Errorf("unable to base64 decode the record at index %d: %w", index, err), - ) - return - } - records = append(records, decoded) - } + // transform records + encoding := r.Header.Get(headerContentEncoding) + records, err := fmr.transformRecords(fr.Records, encoding == "gzip") + if err != nil { + fmr.settings.Logger.Error( + "Failed decoding the records", + zap.Error(err), + ) + fmr.sendResponse(w, requestID, http.StatusBadRequest, err) + return } + // extract common attributes commonAttributes, err := fmr.getCommonAttributes(r) if err != nil { fmr.settings.Logger.Error( @@ -220,6 +239,7 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { ) } + // consume telemetry statusCode, err := fmr.consumer.Consume(ctx, records, commonAttributes) if err != nil { fmr.settings.Logger.Error( @@ -233,6 +253,33 @@ func (fmr *firehoseReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request) { fmr.sendResponse(w, requestID, http.StatusOK, nil) } +// transformRecords transforms the received records data by decoding 
them and decompressing them, +// if gzip is the content encoding +func (fmr *firehoseReceiver) transformRecords(records []firehoseRecord, decompress bool) ([][]byte, error) { + transformed := make([][]byte, 0, len(records)) + for index, record := range records { + if record.Data == "" { + continue + } + + data, err := base64.StdEncoding.DecodeString(record.Data) + if err != nil { + return transformed, fmt.Errorf("unable to base64 decode the record at index %d: %w", index, err) + } + + if decompress { + data, err = compression.Unzip(data) + if err != nil { + return transformed, fmt.Errorf("unable to unzip decoded record at index %d: %w", index, err) + } + } + + transformed = append(transformed, data) + } + + return transformed, nil +} + // validate checks the Firehose access key in the header against // the one passed into the Config func (fmr *firehoseReceiver) validate(r *http.Request) (int, error) { @@ -252,11 +299,7 @@ func (fmr *firehoseReceiver) getBody(r *http.Request) ([]byte, error) { if err != nil { return nil, err } - err = r.Body.Close() - if err != nil { - return nil, err - } - return body, nil + return body, r.Body.Close() } // getCommonAttributes unmarshalls the common attributes from the request header @@ -285,7 +328,6 @@ func (fmr *firehoseReceiver) sendResponse(w http.ResponseWriter, requestID strin } payload, _ := json.Marshal(body) w.Header().Set(headerContentType, "application/json") - w.Header().Set(headerContentLength, strconv.Itoa(len(payload))) w.WriteHeader(statusCode) if _, err = w.Write(payload); err != nil { fmr.settings.Logger.Error("Failed to send response", zap.Error(err))