From 00ff5b599094c167fd7bfa54c6a8c453baed0b63 Mon Sep 17 00:00:00 2001 From: haoqixu Date: Tue, 29 Oct 2024 22:38:56 +0800 Subject: [PATCH] use processorhelperprofiles --- processor/k8sattributesprocessor/factory.go | 33 +++++++------------ processor/k8sattributesprocessor/go.mod | 1 + processor/k8sattributesprocessor/go.sum | 2 ++ processor/k8sattributesprocessor/processor.go | 4 +-- 4 files changed, 17 insertions(+), 23 deletions(-) diff --git a/processor/k8sattributesprocessor/factory.go b/processor/k8sattributesprocessor/factory.go index 2edcaaa6d57b..f2f05e8bf4ea 100644 --- a/processor/k8sattributesprocessor/factory.go +++ b/processor/k8sattributesprocessor/factory.go @@ -9,9 +9,9 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumerprofiles" - "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/processor/processorhelper" + "go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles" "go.opentelemetry.io/collector/processor/processorprofiles" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig" @@ -141,14 +141,8 @@ func createLogsProcessorWithOptions( processorhelper.WithShutdown(kp.Shutdown)) } -type profiles struct { - component.StartFunc - component.ShutdownFunc - consumerprofiles.Profiles -} - func createProfilesProcessorWithOptions( - _ context.Context, + ctx context.Context, set processor.Settings, cfg component.Config, nextProfilesConsumer consumerprofiles.Profiles, @@ -156,19 +150,16 @@ func createProfilesProcessorWithOptions( ) (processorprofiles.Profiles, error) { kp := createKubernetesProcessor(set, cfg, options...) - profilesConsumer, err := consumerprofiles.NewProfiles(func(ctx context.Context, pd pprofile.Profiles) (err error) { - pd = kp.processProfiles(ctx, pd) - return nextProfilesConsumer.ConsumeProfiles(ctx, pd) - }, consumer.WithCapabilities(consumerCapabilities)) - if err != nil { - return nil, err - } - - return &profiles{ - StartFunc: kp.Start, - ShutdownFunc: kp.Shutdown, - Profiles: profilesConsumer, - }, nil + return processorhelperprofiles.NewProfiles( + ctx, + set, + cfg, + nextProfilesConsumer, + kp.processProfiles, + processorhelperprofiles.WithCapabilities(consumerCapabilities), + processorhelperprofiles.WithStart(kp.Start), + processorhelperprofiles.WithShutdown(kp.Shutdown), + ) } func createKubernetesProcessor( diff --git a/processor/k8sattributesprocessor/go.mod b/processor/k8sattributesprocessor/go.mod index 8306d2f68425..2fb7f1e884cf 100644 --- a/processor/k8sattributesprocessor/go.mod +++ b/processor/k8sattributesprocessor/go.mod @@ -23,6 +23,7 @@ require ( go.opentelemetry.io/collector/pdata/pprofile v0.112.0 go.opentelemetry.io/collector/pipeline v0.112.0 go.opentelemetry.io/collector/processor v0.112.0 + go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.0.0-20241029112935-002a74860455 go.opentelemetry.io/collector/processor/processorprofiles v0.112.0 go.opentelemetry.io/collector/processor/processortest v0.112.0 go.opentelemetry.io/collector/receiver v0.112.0 diff --git a/processor/k8sattributesprocessor/go.sum b/processor/k8sattributesprocessor/go.sum index 2ed5624b91bc..b69094ccc583 100644 --- a/processor/k8sattributesprocessor/go.sum +++ b/processor/k8sattributesprocessor/go.sum @@ -1279,6 +1279,8 @@ go.opentelemetry.io/collector/pipeline v0.112.0 h1:jqKDdb8k53OLPibvxzX6fmMec0ZHA go.opentelemetry.io/collector/pipeline v0.112.0/go.mod h1:4vOvjVsoYTHVGTbfFwqfnQOSV2K3RKUHofh3jNRc2Mg= go.opentelemetry.io/collector/processor v0.112.0 h1:nMv9DOBYR9MB78ddUgY3A3ytwAwk3t4HQMNIu+w8o0g= go.opentelemetry.io/collector/processor v0.112.0/go.mod h1:AJ8EHq8Z/ev90f4gU6G5ULUncdpWmBRATYk8ioR3pvw= +go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.0.0-20241029112935-002a74860455 h1:va4D0kykoFBAozz/gn/rxuqqGnVX8Fo0p/WiXuKn7W8= +go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.0.0-20241029112935-002a74860455/go.mod h1:L9jBHdFye2OepokuUfvWytp+xg1RAlaPP7aURJ+n4vM= go.opentelemetry.io/collector/processor/processorprofiles v0.112.0 h1:Aef68SAbmBbhbsZZPuZb0ECwkV05vIcHIizGOGbWsbM= go.opentelemetry.io/collector/processor/processorprofiles v0.112.0/go.mod h1:OUS7GcPCvFAIERSUFJLMtj6MSUOTCuS2pGKB7B+OHXs= go.opentelemetry.io/collector/processor/processortest v0.112.0 h1:kW7kZ6EC1YjBiOvdajxN/DxvVljr9MKMemHheoaYcFc= diff --git a/processor/k8sattributesprocessor/processor.go b/processor/k8sattributesprocessor/processor.go index fb578f1699b8..5d63cbbd100a 100644 --- a/processor/k8sattributesprocessor/processor.go +++ b/processor/k8sattributesprocessor/processor.go @@ -119,13 +119,13 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p } // processProfiles process profiles and add k8s metadata using resource IP, hostname or incoming IP as pod origin. -func (kp *kubernetesprocessor) processProfiles(ctx context.Context, pd pprofile.Profiles) pprofile.Profiles { +func (kp *kubernetesprocessor) processProfiles(ctx context.Context, pd pprofile.Profiles) (pprofile.Profiles, error) { rp := pd.ResourceProfiles() for i := 0; i < rp.Len(); i++ { kp.processResource(ctx, rp.At(i).Resource()) } - return pd + return pd, nil } // processResource adds Pod metadata tags to resource based on pod association configuration