diff --git a/.chloggen/add-tracing-information-to-all-components.yaml b/.chloggen/add-tracing-information-to-all-components.yaml new file mode 100644 index 00000000000..42281b19b8e --- /dev/null +++ b/.chloggen/add-tracing-information-to-all-components.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: consumer + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add tracing information to all the components" + +# One or more tracking issues or pull requests related to the change +issues: [8804] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/consumer/consumer.go b/consumer/consumer.go index 64076655f20..79c92b69e49 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -24,3 +24,9 @@ func WithCapabilities(capabilities Capabilities) Option { o.Cap = capabilities } } + +func WithObsReport(report internal.ObsReport) Option { + return func(o *internal.BaseImpl) { + o.ObsReport = report + } +} diff --git a/consumer/internal/consumer.go b/consumer/internal/consumer.go index 1f2b5683b22..23bfdc4a094 100644 --- a/consumer/internal/consumer.go +++ b/consumer/internal/consumer.go @@ -18,7 +18,8 @@ type BaseConsumer interface { } type BaseImpl struct { - Cap Capabilities + Cap Capabilities + ObsReport ObsReport } // Option to construct new consumers. @@ -31,7 +32,8 @@ func (bs BaseImpl) Capabilities() Capabilities { func NewBaseImpl(options ...Option) *BaseImpl { bs := &BaseImpl{ - Cap: Capabilities{MutatesData: false}, + Cap: Capabilities{MutatesData: false}, + ObsReport: noopObsReport, } for _, op := range options { diff --git a/consumer/internal/obsreport.go b/consumer/internal/obsreport.go new file mode 100644 index 00000000000..ab7d9927dfb --- /dev/null +++ b/consumer/internal/obsreport.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/consumer/internal" + +import "context" + +// ObsReport contains information required to make an implementor +// of Consumer observable. +type ObsReport interface { + StartTracesOp(context.Context) context.Context + EndTracesOp(context.Context, int, error) +} + +type baseObsReport struct{} + +func (bor baseObsReport) StartTracesOp(ctx context.Context) context.Context { + return ctx +} + +func (bor baseObsReport) EndTracesOp(_ context.Context, _ int, _ error) {} + +var noopObsReport = baseObsReport{} diff --git a/consumer/logs.go b/consumer/logs.go index 15166ef1196..80e1c969432 100644 --- a/consumer/logs.go +++ b/consumer/logs.go @@ -36,8 +36,17 @@ func NewLogs(consume ConsumeLogsFunc, options ...Option) (Logs, error) { if consume == nil { return nil, errNilFunc } + baseImpl := internal.NewBaseImpl(options...) + fn := func(ctx context.Context, ld plog.Logs) error { + baseImpl.ObsReport.StartTracesOp(ctx) + logRecordCount := ld.LogRecordCount() + err := consume(ctx, ld) + baseImpl.ObsReport.EndTracesOp(ctx, logRecordCount, err) + return err + } + return &baseLogs{ - BaseImpl: internal.NewBaseImpl(options...), - ConsumeLogsFunc: consume, + BaseImpl: baseImpl, + ConsumeLogsFunc: fn, }, nil } diff --git a/consumer/metrics.go b/consumer/metrics.go index 47897f9363a..fbf28bc4615 100644 --- a/consumer/metrics.go +++ b/consumer/metrics.go @@ -36,8 +36,17 @@ func NewMetrics(consume ConsumeMetricsFunc, options ...Option) (Metrics, error) if consume == nil { return nil, errNilFunc } + baseImpl := internal.NewBaseImpl(options...) + fn := func(ctx context.Context, ld pmetric.Metrics) error { + ctx = baseImpl.ObsReport.StartTracesOp(ctx) + dataPointCount := ld.DataPointCount() + err := consume(ctx, ld) + baseImpl.ObsReport.EndTracesOp(ctx, dataPointCount, err) + return err + } + return &baseMetrics{ - BaseImpl: internal.NewBaseImpl(options...), - ConsumeMetricsFunc: consume, + BaseImpl: baseImpl, + ConsumeMetricsFunc: fn, }, nil } diff --git a/consumer/traces.go b/consumer/traces.go index 60df2d04536..27e5fc842dc 100644 --- a/consumer/traces.go +++ b/consumer/traces.go @@ -36,8 +36,18 @@ func NewTraces(consume ConsumeTracesFunc, options ...Option) (Traces, error) { if consume == nil { return nil, errNilFunc } + + baseImpl := internal.NewBaseImpl(options...) + fn := func(ctx context.Context, td ptrace.Traces) error { + ctx = baseImpl.ObsReport.StartTracesOp(ctx) + spanCount := td.SpanCount() + err := consume(ctx, td) + baseImpl.ObsReport.EndTracesOp(ctx, spanCount, err) + return err + } + return &baseTraces{ - BaseImpl: internal.NewBaseImpl(options...), - ConsumeTracesFunc: consume, + BaseImpl: baseImpl, + ConsumeTracesFunc: fn, }, nil }