diff --git a/.chloggen/resource-processor-add-profiles.yaml b/.chloggen/resource-processor-add-profiles.yaml new file mode 100644 index 000000000000..02fecc36f855 --- /dev/null +++ b/.chloggen/resource-processor-add-profiles.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: resourceprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for profile signal type + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [359979] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/processor/resourceprocessor/README.md b/processor/resourceprocessor/README.md index 19c93b68acdf..8d1ce0ff6df8 100644 --- a/processor/resourceprocessor/README.md +++ b/processor/resourceprocessor/README.md @@ -3,11 +3,13 @@ | Status | | | ------------- |-----------| -| Stability | [beta]: traces, metrics, logs | +| Stability | [development]: profiles | +| | [beta]: traces, metrics, logs | | Distributions | [core], [contrib], [k8s] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fresource%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fresource) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fresource%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fresource) | | [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@dmitryax](https://www.github.com/dmitryax) | +[development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development [beta]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#beta [core]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol [contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib diff --git a/processor/resourceprocessor/factory.go b/processor/resourceprocessor/factory.go index 69ec5220b17a..d0d6b57f74ee 100644 --- a/processor/resourceprocessor/factory.go +++ b/processor/resourceprocessor/factory.go @@ -8,8 +8,11 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper" + "go.opentelemetry.io/collector/processor/xprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/resourceprocessor/internal/metadata" @@ -19,12 +22,14 @@ var processorCapabilities = consumer.Capabilities{MutatesData: true} // NewFactory returns a new factory for the Resource processor. func NewFactory() processor.Factory { - return processor.NewFactory( + return xprocessor.NewFactory( metadata.Type, createDefaultConfig, - processor.WithTraces(createTracesProcessor, metadata.TracesStability), - processor.WithMetrics(createMetricsProcessor, metadata.MetricsStability), - processor.WithLogs(createLogsProcessor, metadata.LogsStability)) + xprocessor.WithTraces(createTracesProcessor, metadata.TracesStability), + xprocessor.WithMetrics(createMetricsProcessor, metadata.MetricsStability), + xprocessor.WithLogs(createLogsProcessor, metadata.LogsStability), + xprocessor.WithProfiles(createProfilesProcessor, metadata.ProfilesStability), + ) } // Note: This isn't a valid configuration because the processor would do no work. @@ -91,3 +96,23 @@ func createLogsProcessor( proc.processLogs, processorhelper.WithCapabilities(processorCapabilities)) } + +func createProfilesProcessor( + ctx context.Context, + set processor.Settings, + cfg component.Config, + nextConsumer xconsumer.Profiles, +) (xprocessor.Profiles, error) { + attrProc, err := attraction.NewAttrProc(&attraction.Settings{Actions: cfg.(*Config).AttributesActions}) + if err != nil { + return nil, err + } + proc := resourceProcessor{logger: set.Logger, attrProc: attrProc} + return xprocessorhelper.NewProfiles( + ctx, + set, + cfg, + nextConsumer, + proc.processProfiles, + xprocessorhelper.WithCapabilities(processorCapabilities)) +} diff --git a/processor/resourceprocessor/factory_test.go b/processor/resourceprocessor/factory_test.go index 8f09bb173e2a..a4d2f7ab04d2 100644 --- a/processor/resourceprocessor/factory_test.go +++ b/processor/resourceprocessor/factory_test.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/collector/processor/xprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction" ) @@ -37,6 +38,14 @@ func TestCreateProcessor(t *testing.T) { mp, err := factory.CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, mp) + + lp, err := factory.CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + assert.NoError(t, err) + assert.NotNil(t, lp) + + pp, err := factory.(xprocessor.Factory).CreateProfiles(context.Background(), processortest.NewNopSettings(), cfg, consumertest.NewNop()) + assert.NoError(t, err) + assert.NotNil(t, pp) } func TestInvalidAttributeActions(t *testing.T) { @@ -52,4 +61,10 @@ func TestInvalidAttributeActions(t *testing.T) { _, err = factory.CreateMetrics(context.Background(), processortest.NewNopSettings(), cfg, nil) assert.Error(t, err) + + _, err = factory.CreateLogs(context.Background(), processortest.NewNopSettings(), cfg, nil) + assert.Error(t, err) + + _, err = factory.(xprocessor.Factory).CreateProfiles(context.Background(), processortest.NewNopSettings(), cfg, nil) + assert.Error(t, err) } diff --git a/processor/resourceprocessor/go.mod b/processor/resourceprocessor/go.mod index 2625ce438589..bcec8cb184e1 100644 --- a/processor/resourceprocessor/go.mod +++ b/processor/resourceprocessor/go.mod @@ -11,9 +11,13 @@ require ( go.opentelemetry.io/collector/confmap v1.24.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/consumer v1.24.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/consumer/consumertest v0.118.1-0.20250121185328-fbefb22cc2b3 + go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/pdata v1.24.1-0.20250121185328-fbefb22cc2b3 + go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/processor v0.118.1-0.20250121185328-fbefb22cc2b3 + go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.118.1-0.20250121185328-fbefb22cc2b3 go.opentelemetry.io/collector/processor/processortest v0.118.1-0.20250121185328-fbefb22cc2b3 + go.opentelemetry.io/collector/processor/xprocessor v0.118.1-0.20250121185328-fbefb22cc2b3 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) @@ -40,11 +44,8 @@ require ( go.opentelemetry.io/collector/client v1.24.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/collector/component/componentstatus v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect - go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect - go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/collector/pdata/testdata v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/collector/pipeline v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect - go.opentelemetry.io/collector/processor/xprocessor v0.118.1-0.20250121185328-fbefb22cc2b3 // indirect go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect diff --git a/processor/resourceprocessor/go.sum b/processor/resourceprocessor/go.sum index 1f4efccfe7ae..1e8bb13aa0ca 100644 --- a/processor/resourceprocessor/go.sum +++ b/processor/resourceprocessor/go.sum @@ -82,6 +82,8 @@ go.opentelemetry.io/collector/pipeline v0.118.1-0.20250121185328-fbefb22cc2b3 h1 go.opentelemetry.io/collector/pipeline v0.118.1-0.20250121185328-fbefb22cc2b3/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74= go.opentelemetry.io/collector/processor v0.118.1-0.20250121185328-fbefb22cc2b3 h1:wnZcg7/EbMmnLXeY3cVZqMl2S2TI5tS0e9PXQlkvqgI= go.opentelemetry.io/collector/processor v0.118.1-0.20250121185328-fbefb22cc2b3/go.mod h1:8J99pUrKfp0oHair1PuAy5iXnzhbBFoXOB/KOOZCCX0= +go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.118.1-0.20250121185328-fbefb22cc2b3 h1:+tokCHMHqlH4ICXLEsynQnrf/9CR5tDPPpzWoICQEGg= +go.opentelemetry.io/collector/processor/processorhelper/xprocessorhelper v0.118.1-0.20250121185328-fbefb22cc2b3/go.mod h1:WbABuKEwhN+RVrTfPVwDt0mFGgHkMpnRN7hYn0EhQdw= go.opentelemetry.io/collector/processor/processortest v0.118.1-0.20250121185328-fbefb22cc2b3 h1:3rdVJHD7UFFFdYTTN410Vd5G65LzTZSGM44oHzZka+g= go.opentelemetry.io/collector/processor/processortest v0.118.1-0.20250121185328-fbefb22cc2b3/go.mod h1:NPidF4tGoxv3R2KizO89/Yetl43fiibQFyEUcBPICrQ= go.opentelemetry.io/collector/processor/xprocessor v0.118.1-0.20250121185328-fbefb22cc2b3 h1:BfFDyPLGOMuYBPCYUUp3I3/6bc5cFWZBkJdaxx3ftoc= diff --git a/processor/resourceprocessor/internal/metadata/generated_status.go b/processor/resourceprocessor/internal/metadata/generated_status.go index 9b6327066964..e609844c2a1d 100644 --- a/processor/resourceprocessor/internal/metadata/generated_status.go +++ b/processor/resourceprocessor/internal/metadata/generated_status.go @@ -12,7 +12,8 @@ var ( ) const ( - TracesStability = component.StabilityLevelBeta - MetricsStability = component.StabilityLevelBeta - LogsStability = component.StabilityLevelBeta + ProfilesStability = component.StabilityLevelDevelopment + TracesStability = component.StabilityLevelBeta + MetricsStability = component.StabilityLevelBeta + LogsStability = component.StabilityLevelBeta ) diff --git a/processor/resourceprocessor/metadata.yaml b/processor/resourceprocessor/metadata.yaml index 51277506c27f..c3240e99d5b8 100644 --- a/processor/resourceprocessor/metadata.yaml +++ b/processor/resourceprocessor/metadata.yaml @@ -3,6 +3,7 @@ type: resource status: class: processor stability: + development: [profiles] beta: [traces, metrics, logs] distributions: [core, contrib, k8s] codeowners: diff --git a/processor/resourceprocessor/resource_processor.go b/processor/resourceprocessor/resource_processor.go index a7588418415f..3fed3c6a5f2c 100644 --- a/processor/resourceprocessor/resource_processor.go +++ b/processor/resourceprocessor/resource_processor.go @@ -8,6 +8,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" @@ -42,3 +43,11 @@ func (rp *resourceProcessor) processLogs(ctx context.Context, ld plog.Logs) (plo } return ld, nil } + +func (rp *resourceProcessor) processProfiles(ctx context.Context, pd pprofile.Profiles) (pprofile.Profiles, error) { + rps := pd.ResourceProfiles() + for i := 0; i < rps.Len(); i++ { + rp.attrProc.Process(ctx, rp.logger, rps.At(i).Resource().Attributes()) + } + return pd, nil +} diff --git a/processor/resourceprocessor/resource_processor_test.go b/processor/resourceprocessor/resource_processor_test.go index eb81bbf4bf8e..1cd915e34ac7 100644 --- a/processor/resourceprocessor/resource_processor_test.go +++ b/processor/resourceprocessor/resource_processor_test.go @@ -10,10 +10,13 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/processor/processortest" + "go.opentelemetry.io/collector/processor/xprocessor" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/attraction" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal/testdata" @@ -129,6 +132,20 @@ func TestResourceProcessorAttributesUpsert(t *testing.T) { logs := tln.AllLogs() require.Len(t, logs, 1) assert.NoError(t, plogtest.CompareLogs(wantLogData, logs[0])) + + // Test profiles consumer + tpn := new(consumertest.ProfilesSink) + rpp, err := factory.(xprocessor.Factory).CreateProfiles(context.Background(), processortest.NewNopSettings(), tt.config, tpn) + require.NoError(t, err) + assert.True(t, rpp.Capabilities().MutatesData) + + sourceProfileData := generateProfileData(tt.sourceAttributes) + wantProfileData := generateProfileData(tt.wantAttributes) + err = rpp.ConsumeProfiles(context.Background(), sourceProfileData) + require.NoError(t, err) + profiles := tpn.AllProfiles() + require.Len(t, profiles, 1) + compareProfileAttributes(t, wantProfileData, sourceProfileData) }) } } @@ -168,3 +185,29 @@ func generateLogData(attributes map[string]string) plog.Logs { } return ld } + +func generateProfileData(attributes map[string]string) pprofile.Profiles { + p := pprofile.NewProfiles() + rp := p.ResourceProfiles().AppendEmpty() + + for k, v := range attributes { + rp.Resource().Attributes().PutStr(k, v) + } + return p +} + +func compareProfileAttributes(t *testing.T, expected pprofile.Profiles, got pprofile.Profiles) { + require.Equal(t, expected.ResourceProfiles().Len(), got.ResourceProfiles().Len()) + + for i := 0; i < expected.ResourceProfiles().Len(); i++ { + expectedResourceProfile := expected.ResourceProfiles().At(i) + gotResourceProfile := got.ResourceProfiles().At(i) + + expectedResourceProfile.Resource().Attributes().Range(func(k string, v pcommon.Value) bool { + get, ok := gotResourceProfile.Resource().Attributes().Get(k) + require.True(t, ok) + require.Equal(t, v, get) + return true + }) + } +}