Skip to content

Commit

Permalink
use processorhelperprofiles
Browse files Browse the repository at this point in the history
  • Loading branch information
haoqixu committed Oct 29, 2024
1 parent 70404bc commit 00ff5b5
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 23 deletions.
33 changes: 12 additions & 21 deletions processor/k8sattributesprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumerprofiles"
"go.opentelemetry.io/collector/pdata/pprofile"
"go.opentelemetry.io/collector/processor"
"go.opentelemetry.io/collector/processor/processorhelper"
"go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles"
"go.opentelemetry.io/collector/processor/processorprofiles"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/k8sconfig"
Expand Down Expand Up @@ -141,34 +141,25 @@ func createLogsProcessorWithOptions(
processorhelper.WithShutdown(kp.Shutdown))
}

type profiles struct {
component.StartFunc
component.ShutdownFunc
consumerprofiles.Profiles
}

func createProfilesProcessorWithOptions(
_ context.Context,
ctx context.Context,
set processor.Settings,
cfg component.Config,
nextProfilesConsumer consumerprofiles.Profiles,
options ...option,
) (processorprofiles.Profiles, error) {
kp := createKubernetesProcessor(set, cfg, options...)

profilesConsumer, err := consumerprofiles.NewProfiles(func(ctx context.Context, pd pprofile.Profiles) (err error) {
pd = kp.processProfiles(ctx, pd)
return nextProfilesConsumer.ConsumeProfiles(ctx, pd)
}, consumer.WithCapabilities(consumerCapabilities))
if err != nil {
return nil, err
}

return &profiles{
StartFunc: kp.Start,
ShutdownFunc: kp.Shutdown,
Profiles: profilesConsumer,
}, nil
return processorhelperprofiles.NewProfiles(
ctx,
set,
cfg,
nextProfilesConsumer,
kp.processProfiles,
processorhelperprofiles.WithCapabilities(consumerCapabilities),
processorhelperprofiles.WithStart(kp.Start),
processorhelperprofiles.WithShutdown(kp.Shutdown),
)
}

func createKubernetesProcessor(
Expand Down
1 change: 1 addition & 0 deletions processor/k8sattributesprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
go.opentelemetry.io/collector/pdata/pprofile v0.112.0
go.opentelemetry.io/collector/pipeline v0.112.0
go.opentelemetry.io/collector/processor v0.112.0
go.opentelemetry.io/collector/processor/processorhelper/processorhelperprofiles v0.0.0-20241029112935-002a74860455
go.opentelemetry.io/collector/processor/processorprofiles v0.112.0
go.opentelemetry.io/collector/processor/processortest v0.112.0
go.opentelemetry.io/collector/receiver v0.112.0
Expand Down
2 changes: 2 additions & 0 deletions processor/k8sattributesprocessor/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions processor/k8sattributesprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,13 @@ func (kp *kubernetesprocessor) processLogs(ctx context.Context, ld plog.Logs) (p
}

// processProfiles process profiles and add k8s metadata using resource IP, hostname or incoming IP as pod origin.
func (kp *kubernetesprocessor) processProfiles(ctx context.Context, pd pprofile.Profiles) pprofile.Profiles {
func (kp *kubernetesprocessor) processProfiles(ctx context.Context, pd pprofile.Profiles) (pprofile.Profiles, error) {
rp := pd.ResourceProfiles()
for i := 0; i < rp.Len(); i++ {
kp.processResource(ctx, rp.At(i).Resource())
}

return pd
return pd, nil
}

// processResource adds Pod metadata tags to resource based on pod association configuration
Expand Down

0 comments on commit 00ff5b5

Please sign in to comment.