diff --git a/internal/filter/filterottl/filter.go b/internal/filter/filterottl/filter.go index 705b2acf5a4e..a5af8f87d017 100644 --- a/internal/filter/filterottl/filter.go +++ b/internal/filter/filterottl/filter.go @@ -20,7 +20,12 @@ import ( // The passed in functions should use the ottlspan.TransformContext. // If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected func NewBoolExprForSpan(conditions []string, functions map[string]ottl.Factory[ottlspan.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (*ottl.ConditionSequence[ottlspan.TransformContext], error) { - parser, err := ottlspan.NewParser(functions, set) + return NewBoolExprForSpanWithOptions(conditions, functions, errorMode, set, nil) +} + +// NewBoolExprForSpanWithOptions is like NewBoolExprForSpan, but with additional options. +func NewBoolExprForSpanWithOptions(conditions []string, functions map[string]ottl.Factory[ottlspan.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings, parserOptions []ottlspan.Option) (*ottl.ConditionSequence[ottlspan.TransformContext], error) { + parser, err := ottlspan.NewParser(functions, set, parserOptions...) if err != nil { return nil, err } @@ -36,7 +41,12 @@ func NewBoolExprForSpan(conditions []string, functions map[string]ottl.Factory[o // The passed in functions should use the ottlspanevent.TransformContext. // If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected func NewBoolExprForSpanEvent(conditions []string, functions map[string]ottl.Factory[ottlspanevent.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (*ottl.ConditionSequence[ottlspanevent.TransformContext], error) { - parser, err := ottlspanevent.NewParser(functions, set) + return NewBoolExprForSpanEventWithOptions(conditions, functions, errorMode, set, nil) +} + +// NewBoolExprForSpanEventWithOptions is like NewBoolExprForSpanEvent, but with additional options. +func NewBoolExprForSpanEventWithOptions(conditions []string, functions map[string]ottl.Factory[ottlspanevent.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings, parserOptions []ottlspanevent.Option) (*ottl.ConditionSequence[ottlspanevent.TransformContext], error) { + parser, err := ottlspanevent.NewParser(functions, set, parserOptions...) if err != nil { return nil, err } @@ -52,7 +62,12 @@ func NewBoolExprForSpanEvent(conditions []string, functions map[string]ottl.Fact // The passed in functions should use the ottlmetric.TransformContext. // If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected func NewBoolExprForMetric(conditions []string, functions map[string]ottl.Factory[ottlmetric.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (*ottl.ConditionSequence[ottlmetric.TransformContext], error) { - parser, err := ottlmetric.NewParser(functions, set) + return NewBoolExprForMetricWithOptions(conditions, functions, errorMode, set, nil) +} + +// NewBoolExprForMetricWithOptions is like NewBoolExprForMetric, but with additional options. +func NewBoolExprForMetricWithOptions(conditions []string, functions map[string]ottl.Factory[ottlmetric.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings, parserOptions []ottlmetric.Option) (*ottl.ConditionSequence[ottlmetric.TransformContext], error) { + parser, err := ottlmetric.NewParser(functions, set, parserOptions...) if err != nil { return nil, err } @@ -68,7 +83,12 @@ func NewBoolExprForMetric(conditions []string, functions map[string]ottl.Factory // The passed in functions should use the ottldatapoint.TransformContext. // If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected func NewBoolExprForDataPoint(conditions []string, functions map[string]ottl.Factory[ottldatapoint.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (*ottl.ConditionSequence[ottldatapoint.TransformContext], error) { - parser, err := ottldatapoint.NewParser(functions, set) + return NewBoolExprForDataPointWithOptions(conditions, functions, errorMode, set, nil) +} + +// NewBoolExprForDataPointWithOptions is like NewBoolExprForDataPoint, but with additional options. +func NewBoolExprForDataPointWithOptions(conditions []string, functions map[string]ottl.Factory[ottldatapoint.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings, parserOptions []ottldatapoint.Option) (*ottl.ConditionSequence[ottldatapoint.TransformContext], error) { + parser, err := ottldatapoint.NewParser(functions, set, parserOptions...) if err != nil { return nil, err } @@ -84,7 +104,12 @@ func NewBoolExprForDataPoint(conditions []string, functions map[string]ottl.Fact // The passed in functions should use the ottllog.TransformContext. // If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected func NewBoolExprForLog(conditions []string, functions map[string]ottl.Factory[ottllog.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (*ottl.ConditionSequence[ottllog.TransformContext], error) { - parser, err := ottllog.NewParser(functions, set) + return NewBoolExprForLogWithOptions(conditions, functions, errorMode, set, nil) +} + +// NewBoolExprForLogWithOptions is like NewBoolExprForLog, but with additional options. +func NewBoolExprForLogWithOptions(conditions []string, functions map[string]ottl.Factory[ottllog.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings, parserOptions []ottllog.Option) (*ottl.ConditionSequence[ottllog.TransformContext], error) { + parser, err := ottllog.NewParser(functions, set, parserOptions...) if err != nil { return nil, err } @@ -100,7 +125,12 @@ func NewBoolExprForLog(conditions []string, functions map[string]ottl.Factory[ot // The passed in functions should use the ottlresource.TransformContext. // If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected func NewBoolExprForResource(conditions []string, functions map[string]ottl.Factory[ottlresource.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (*ottl.ConditionSequence[ottlresource.TransformContext], error) { - parser, err := ottlresource.NewParser(functions, set) + return NewBoolExprForResourceWithOptions(conditions, functions, errorMode, set, nil) +} + +// NewBoolExprForResourceWithOptions is like NewBoolExprForResource, but with additional options. +func NewBoolExprForResourceWithOptions(conditions []string, functions map[string]ottl.Factory[ottlresource.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings, parserOptions []ottlresource.Option) (*ottl.ConditionSequence[ottlresource.TransformContext], error) { + parser, err := ottlresource.NewParser(functions, set, parserOptions...) if err != nil { return nil, err } @@ -116,7 +146,12 @@ func NewBoolExprForResource(conditions []string, functions map[string]ottl.Facto // The passed in functions should use the ottlresource.TransformContext. // If a function named `match` is not present in the function map it will be added automatically so that parsing works as expected func NewBoolExprForScope(conditions []string, functions map[string]ottl.Factory[ottlscope.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings) (*ottl.ConditionSequence[ottlscope.TransformContext], error) { - parser, err := ottlscope.NewParser(functions, set) + return NewBoolExprForScopeWithOptions(conditions, functions, errorMode, set, nil) +} + +// NewBoolExprForScopeWithOptions is like NewBoolExprForScope, but with additional options. +func NewBoolExprForScopeWithOptions(conditions []string, functions map[string]ottl.Factory[ottlscope.TransformContext], errorMode ottl.ErrorMode, set component.TelemetrySettings, parserOptions []ottlscope.Option) (*ottl.ConditionSequence[ottlscope.TransformContext], error) { + parser, err := ottlscope.NewParser(functions, set, parserOptions...) if err != nil { return nil, err } diff --git a/pkg/ottl/contexts/internal/errors.go b/pkg/ottl/contexts/internal/errors.go index 7ccc931e2ba4..1d13fc59d600 100644 --- a/pkg/ottl/contexts/internal/errors.go +++ b/pkg/ottl/contexts/internal/errors.go @@ -3,7 +3,10 @@ package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/internal" -import "fmt" +import ( + "fmt" + "strings" +) const ( DefaultErrorMessage = "segment %q from path %q is not a valid path nor a valid OTTL keyword for the %v context - review %v to see all valid paths" @@ -20,3 +23,8 @@ const ( func FormatDefaultErrorMessage(pathSegment, fullPath, context, ref string) error { return fmt.Errorf(DefaultErrorMessage, pathSegment, fullPath, context, ref) } + +func FormatCacheErrorMessage(lowerContext, pathContext, fullPath string) error { + pathSuggestion := strings.Replace(fullPath, pathContext+".", lowerContext+".", 1) + return fmt.Errorf(`access to cache must be performed using the same statement's context, please replace "%s" by "%s"`, fullPath, pathSuggestion) +} diff --git a/pkg/ottl/contexts/internal/path.go b/pkg/ottl/contexts/internal/path.go index 954d14329646..6dbcccebb835 100644 --- a/pkg/ottl/contexts/internal/path.go +++ b/pkg/ottl/contexts/internal/path.go @@ -16,6 +16,7 @@ type TestPath[K any] struct { N string KeySlice []ottl.Key[K] NextPath *TestPath[K] + FullPath string } func (p *TestPath[K]) Name() string { @@ -38,6 +39,9 @@ func (p *TestPath[K]) Keys() []ottl.Key[K] { } func (p *TestPath[K]) String() string { + if p.FullPath != "" { + return p.FullPath + } return p.N } diff --git a/pkg/ottl/contexts/internal/resource.go b/pkg/ottl/contexts/internal/resource.go index a3ae9d149f3f..7b454d50ae8c 100644 --- a/pkg/ottl/contexts/internal/resource.go +++ b/pkg/ottl/contexts/internal/resource.go @@ -20,7 +20,7 @@ type ResourceContext interface { GetResourceSchemaURLItem() SchemaURLItem } -func ResourcePathGetSetter[K ResourceContext](path ottl.Path[K]) (ottl.GetSetter[K], error) { +func ResourcePathGetSetter[K ResourceContext](lowerContext string, path ottl.Path[K]) (ottl.GetSetter[K], error) { if path == nil { return nil, FormatDefaultErrorMessage(ResourceContextName, ResourceContextName, "Resource", ResourceContextRef) } @@ -34,6 +34,8 @@ func ResourcePathGetSetter[K ResourceContext](path ottl.Path[K]) (ottl.GetSetter return accessResourceDroppedAttributesCount[K](), nil case "schema_url": return accessResourceSchemaURLItem[K](), nil + case "cache": + return nil, FormatCacheErrorMessage(lowerContext, path.Context(), path.String()) default: return nil, FormatDefaultErrorMessage(path.Name(), path.String(), "Resource", ResourceContextRef) } diff --git a/pkg/ottl/contexts/internal/resource_test.go b/pkg/ottl/contexts/internal/resource_test.go index f1b251a3e22e..c1fc35cd63cd 100644 --- a/pkg/ottl/contexts/internal/resource_test.go +++ b/pkg/ottl/contexts/internal/resource_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" @@ -311,7 +312,7 @@ func TestResourcePathGetSetter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - accessor, err := ResourcePathGetSetter[*resourceContext](tt.path) + accessor, err := ResourcePathGetSetter[*resourceContext](tt.path.Context(), tt.path) assert.NoError(t, err) resource := createResource() @@ -331,6 +332,23 @@ func TestResourcePathGetSetter(t *testing.T) { } } +func TestResourcePathGetSetterCacheAccessError(t *testing.T) { + path := &TestPath[*resourceContext]{ + N: "cache", + C: "resource", + KeySlice: []ottl.Key[*resourceContext]{ + &TestKey[*resourceContext]{ + S: ottltest.Strp("key"), + }, + }, + FullPath: "resource.cache[key]", + } + + _, err := ResourcePathGetSetter[*resourceContext]("log", path) + require.Error(t, err) + require.Contains(t, err.Error(), `replace "resource.cache[key]" by "log.cache[key]"`) +} + func createResource() pcommon.Resource { resource := pcommon.NewResource() resource.Attributes().PutStr("str", "val") diff --git a/pkg/ottl/contexts/internal/scope.go b/pkg/ottl/contexts/internal/scope.go index 4fabf9ee7d89..35a40a3869c3 100644 --- a/pkg/ottl/contexts/internal/scope.go +++ b/pkg/ottl/contexts/internal/scope.go @@ -21,7 +21,7 @@ type InstrumentationScopeContext interface { GetScopeSchemaURLItem() SchemaURLItem } -func ScopePathGetSetter[K InstrumentationScopeContext](path ottl.Path[K]) (ottl.GetSetter[K], error) { +func ScopePathGetSetter[K InstrumentationScopeContext](lowerContext string, path ottl.Path[K]) (ottl.GetSetter[K], error) { if path == nil { return nil, FormatDefaultErrorMessage(InstrumentationScopeContextName, InstrumentationScopeContextName, "Instrumentation Scope", InstrumentationScopeRef) } @@ -40,6 +40,8 @@ func ScopePathGetSetter[K InstrumentationScopeContext](path ottl.Path[K]) (ottl. return accessInstrumentationScopeDroppedAttributesCount[K](), nil case "schema_url": return accessInstrumentationScopeSchemaURLItem[K](), nil + case "cache": + return nil, FormatCacheErrorMessage(lowerContext, path.Context(), path.String()) default: return nil, FormatDefaultErrorMessage(path.Name(), path.String(), "Instrumentation Scope", InstrumentationScopeRef) } diff --git a/pkg/ottl/contexts/internal/scope_test.go b/pkg/ottl/contexts/internal/scope_test.go index 788924115bd5..61c550178473 100644 --- a/pkg/ottl/contexts/internal/scope_test.go +++ b/pkg/ottl/contexts/internal/scope_test.go @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl" @@ -349,7 +350,7 @@ func TestScopePathGetSetter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - accessor, err := ScopePathGetSetter[*instrumentationScopeContext](tt.path) + accessor, err := ScopePathGetSetter[*instrumentationScopeContext](tt.path.Context(), tt.path) assert.NoError(t, err) is := createInstrumentationScope() @@ -369,6 +370,23 @@ func TestScopePathGetSetter(t *testing.T) { } } +func TestScopePathGetSetterCacheAccessError(t *testing.T) { + path := &TestPath[*instrumentationScopeContext]{ + N: "cache", + C: "instrumentation_scope", + KeySlice: []ottl.Key[*instrumentationScopeContext]{ + &TestKey[*instrumentationScopeContext]{ + S: ottltest.Strp("key"), + }, + }, + FullPath: "instrumentation_scope.cache[key]", + } + + _, err := ScopePathGetSetter[*instrumentationScopeContext]("metric", path) + require.Error(t, err) + require.Contains(t, err.Error(), `replace "instrumentation_scope.cache[key]" by "metric.cache[key]"`) +} + func createInstrumentationScope() pcommon.InstrumentationScope { is := pcommon.NewInstrumentationScope() is.SetName("library") diff --git a/pkg/ottl/contexts/internal/span.go b/pkg/ottl/contexts/internal/span.go index 4669adb7b8fe..fc7ca27177ef 100644 --- a/pkg/ottl/contexts/internal/span.go +++ b/pkg/ottl/contexts/internal/span.go @@ -38,7 +38,7 @@ var SpanSymbolTable = map[ottl.EnumSymbol]ottl.Enum{ "STATUS_CODE_ERROR": ottl.Enum(ptrace.StatusCodeError), } -func SpanPathGetSetter[K SpanContext](path ottl.Path[K]) (ottl.GetSetter[K], error) { +func SpanPathGetSetter[K SpanContext](lowerContext string, path ottl.Path[K]) (ottl.GetSetter[K], error) { if path == nil { return nil, FormatDefaultErrorMessage(SpanContextName, SpanContextName, SpanContextNameDescription, SpanRef) } @@ -128,6 +128,8 @@ func SpanPathGetSetter[K SpanContext](path ottl.Path[K]) (ottl.GetSetter[K], err } } return accessStatus[K](), nil + case "cache": + return nil, FormatCacheErrorMessage(lowerContext, path.Context(), path.String()) default: return nil, FormatDefaultErrorMessage(path.Name(), path.String(), SpanContextNameDescription, SpanRef) } diff --git a/pkg/ottl/contexts/internal/span_test.go b/pkg/ottl/contexts/internal/span_test.go index 899d12493424..b9e5391765c7 100644 --- a/pkg/ottl/contexts/internal/span_test.go +++ b/pkg/ottl/contexts/internal/span_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" @@ -603,7 +604,7 @@ func TestSpanPathGetSetter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - accessor, err := SpanPathGetSetter[*spanContext](tt.path) + accessor, err := SpanPathGetSetter[*spanContext](tt.path.Context(), tt.path) assert.NoError(t, err) span := createSpan() @@ -623,6 +624,23 @@ func TestSpanPathGetSetter(t *testing.T) { } } +func TestSpanPathGetSetterCacheAccessError(t *testing.T) { + path := &TestPath[*spanContext]{ + N: "cache", + C: "span", + KeySlice: []ottl.Key[*spanContext]{ + &TestKey[*spanContext]{ + S: ottltest.Strp("key"), + }, + }, + FullPath: "span.cache[key]", + } + + _, err := SpanPathGetSetter[*spanContext]("spanevent", path) + require.Error(t, err) + require.Contains(t, err.Error(), `replace "span.cache[key]" by "spanevent.cache[key]"`) +} + func createSpan() ptrace.Span { span := ptrace.NewSpan() span.SetTraceID(traceID) diff --git a/pkg/ottl/contexts/ottldatapoint/datapoint.go b/pkg/ottl/contexts/ottldatapoint/datapoint.go index 06c806f126d5..4f491c82caf7 100644 --- a/pkg/ottl/contexts/ottldatapoint/datapoint.go +++ b/pkg/ottl/contexts/ottldatapoint/datapoint.go @@ -64,22 +64,32 @@ func (tCtx TransformContext) MarshalLogObject(encoder zapcore.ObjectEncoder) err type Option func(*ottl.Parser[TransformContext]) -func NewTransformContext(dataPoint any, metric pmetric.Metric, metrics pmetric.MetricSlice, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) TransformContext { - return NewTransformContextWithCache(dataPoint, metric, metrics, instrumentationScope, resource, scopeMetrics, resourceMetrics, pcommon.NewMap()) -} +type TransformContextOption func(*TransformContext) -// Experimental: *NOTE* this function is subject to change or removal in the future. -func NewTransformContextWithCache(dataPoint any, metric pmetric.Metric, metrics pmetric.MetricSlice, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, cache pcommon.Map) TransformContext { - return TransformContext{ +func NewTransformContext(dataPoint any, metric pmetric.Metric, metrics pmetric.MetricSlice, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options ...TransformContextOption) TransformContext { + tc := TransformContext{ dataPoint: dataPoint, metric: metric, metrics: metrics, instrumentationScope: instrumentationScope, resource: resource, - cache: cache, + cache: pcommon.NewMap(), scopeMetrics: scopeMetrics, resourceMetrics: resourceMetrics, } + for _, opt := range options { + opt(&tc) + } + return tc +} + +// Experimental: *NOTE* this option is subject to change or removal in the future. +func WithCache(cache *pcommon.Map) TransformContextOption { + return func(p *TransformContext) { + if cache != nil { + p.cache = *cache + } + } } func (tCtx TransformContext) GetDataPoint() any { @@ -294,9 +304,9 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot func (pep *pathExpressionParser) parseHigherContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { switch context { case internal.ResourceContextName: - return internal.ResourcePathGetSetter(path) + return internal.ResourcePathGetSetter(ContextName, path) case internal.InstrumentationScopeContextName: - return internal.ScopePathGetSetter(path) + return internal.ScopePathGetSetter(ContextName, path) case internal.MetricContextName: return internal.MetricPathGetSetter(path) default: diff --git a/pkg/ottl/contexts/ottllog/log.go b/pkg/ottl/contexts/ottllog/log.go index 98fb1a09e734..99970f4a093b 100644 --- a/pkg/ottl/contexts/ottllog/log.go +++ b/pkg/ottl/contexts/ottllog/log.go @@ -71,20 +71,30 @@ func (tCtx TransformContext) MarshalLogObject(encoder zapcore.ObjectEncoder) err type Option func(*ottl.Parser[TransformContext]) -func NewTransformContext(logRecord plog.LogRecord, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeLogs plog.ScopeLogs, resourceLogs plog.ResourceLogs) TransformContext { - return NewTransformContextWithCache(logRecord, instrumentationScope, resource, scopeLogs, resourceLogs, pcommon.NewMap()) -} +type TransformContextOption func(*TransformContext) -// Experimental: *NOTE* this function is subject to change or removal in the future. -func NewTransformContextWithCache(logRecord plog.LogRecord, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeLogs plog.ScopeLogs, resourceLogs plog.ResourceLogs, cache pcommon.Map) TransformContext { - return TransformContext{ +func NewTransformContext(logRecord plog.LogRecord, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeLogs plog.ScopeLogs, resourceLogs plog.ResourceLogs, options ...TransformContextOption) TransformContext { + tc := TransformContext{ logRecord: logRecord, instrumentationScope: instrumentationScope, resource: resource, - cache: cache, + cache: pcommon.NewMap(), scopeLogs: scopeLogs, resourceLogs: resourceLogs, } + for _, opt := range options { + opt(&tc) + } + return tc +} + +// Experimental: *NOTE* this option is subject to change or removal in the future. +func WithCache(cache *pcommon.Map) TransformContextOption { + return func(p *TransformContext) { + if cache != nil { + p.cache = *cache + } + } } func (tCtx TransformContext) GetLogRecord() plog.LogRecord { @@ -295,9 +305,9 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot func (pep *pathExpressionParser) parseHigherContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { switch context { case internal.ResourceContextName: - return internal.ResourcePathGetSetter(path) + return internal.ResourcePathGetSetter(ContextName, path) case internal.InstrumentationScopeContextName: - return internal.ScopePathGetSetter(path) + return internal.ScopePathGetSetter(ContextName, path) default: var fullPath string if path != nil { diff --git a/pkg/ottl/contexts/ottlmetric/metrics.go b/pkg/ottl/contexts/ottlmetric/metrics.go index eaf75ee7bb73..04b7b5218bf1 100644 --- a/pkg/ottl/contexts/ottlmetric/metrics.go +++ b/pkg/ottl/contexts/ottlmetric/metrics.go @@ -38,21 +38,31 @@ type TransformContext struct { type Option func(*ottl.Parser[TransformContext]) -func NewTransformContext(metric pmetric.Metric, metrics pmetric.MetricSlice, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) TransformContext { - return NewTransformContextWithCache(metric, metrics, instrumentationScope, resource, scopeMetrics, resourceMetrics, pcommon.NewMap()) -} +type TransformContextOption func(*TransformContext) -// Experimental: *NOTE* this function is subject to change or removal in the future. -func NewTransformContextWithCache(metric pmetric.Metric, metrics pmetric.MetricSlice, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, cache pcommon.Map) TransformContext { - return TransformContext{ +func NewTransformContext(metric pmetric.Metric, metrics pmetric.MetricSlice, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options ...TransformContextOption) TransformContext { + tc := TransformContext{ metric: metric, metrics: metrics, instrumentationScope: instrumentationScope, resource: resource, - cache: cache, + cache: pcommon.NewMap(), scopeMetrics: scopeMetrics, resourceMetrics: resourceMetrics, } + for _, opt := range options { + opt(&tc) + } + return tc +} + +// Experimental: *NOTE* this option is subject to change or removal in the future. +func WithCache(cache *pcommon.Map) TransformContextOption { + return func(p *TransformContext) { + if cache != nil { + p.cache = *cache + } + } } func (tCtx TransformContext) GetMetric() pmetric.Metric { @@ -190,9 +200,9 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot func (pep *pathExpressionParser) parseHigherContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { switch context { case internal.ResourceContextName: - return internal.ResourcePathGetSetter(path) + return internal.ResourcePathGetSetter(ContextName, path) case internal.InstrumentationScopeContextName: - return internal.ScopePathGetSetter(path) + return internal.ScopePathGetSetter(ContextName, path) default: var fullPath string if path != nil { diff --git a/pkg/ottl/contexts/ottlresource/resource.go b/pkg/ottl/contexts/ottlresource/resource.go index 3db08a9df6f6..6c44ec4d161f 100644 --- a/pkg/ottl/contexts/ottlresource/resource.go +++ b/pkg/ottl/contexts/ottlresource/resource.go @@ -41,17 +41,27 @@ func (tCtx TransformContext) MarshalLogObject(encoder zapcore.ObjectEncoder) err type Option func(*ottl.Parser[TransformContext]) -func NewTransformContext(resource pcommon.Resource, schemaURLItem internal.SchemaURLItem) TransformContext { - return NewTransformContextWithCache(resource, schemaURLItem, pcommon.NewMap()) -} +type TransformContextOption func(*TransformContext) -// Experimental: *NOTE* this function is subject to change or removal in the future. -func NewTransformContextWithCache(resource pcommon.Resource, schemaURLItem internal.SchemaURLItem, cache pcommon.Map) TransformContext { - return TransformContext{ +func NewTransformContext(resource pcommon.Resource, schemaURLItem internal.SchemaURLItem, options ...TransformContextOption) TransformContext { + tc := TransformContext{ resource: resource, - cache: cache, + cache: pcommon.NewMap(), schemaURLItem: schemaURLItem, } + for _, opt := range options { + opt(&tc) + } + return tc +} + +// Experimental: *NOTE* this option is subject to change or removal in the future. +func WithCache(cache *pcommon.Map) TransformContextOption { + return func(p *TransformContext) { + if cache != nil { + p.cache = *cache + } + } } func (tCtx TransformContext) GetResource() pcommon.Resource { @@ -149,7 +159,7 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot } return accessCacheKey(path.Keys()), nil default: - return internal.ResourcePathGetSetter[TransformContext](path) + return internal.ResourcePathGetSetter[TransformContext](ContextName, path) } } diff --git a/pkg/ottl/contexts/ottlscope/scope.go b/pkg/ottl/contexts/ottlscope/scope.go index 42611cfea729..4ccc8f472d53 100644 --- a/pkg/ottl/contexts/ottlscope/scope.go +++ b/pkg/ottl/contexts/ottlscope/scope.go @@ -44,18 +44,28 @@ func (tCtx TransformContext) MarshalLogObject(encoder zapcore.ObjectEncoder) err type Option func(*ottl.Parser[TransformContext]) -func NewTransformContext(instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, schemaURLItem internal.SchemaURLItem) TransformContext { - return NewTransformContextWithCache(instrumentationScope, resource, schemaURLItem, pcommon.NewMap()) -} +type TransformContextOption func(*TransformContext) -// Experimental: *NOTE* this function is subject to change or removal in the future. -func NewTransformContextWithCache(instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, schemaURLItem internal.SchemaURLItem, cache pcommon.Map) TransformContext { - return TransformContext{ +func NewTransformContext(instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, schemaURLItem internal.SchemaURLItem, options ...TransformContextOption) TransformContext { + tc := TransformContext{ instrumentationScope: instrumentationScope, resource: resource, - cache: cache, + cache: pcommon.NewMap(), schemaURLItem: schemaURLItem, } + for _, opt := range options { + opt(&tc) + } + return tc +} + +// Experimental: *NOTE* this option is subject to change or removal in the future. +func WithCache(cache *pcommon.Map) TransformContextOption { + return func(p *TransformContext) { + if cache != nil { + p.cache = *cache + } + } } func (tCtx TransformContext) GetInstrumentationScope() pcommon.InstrumentationScope { @@ -169,14 +179,14 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot } return accessCacheKey(path.Keys()), nil default: - return internal.ScopePathGetSetter[TransformContext](path) + return internal.ScopePathGetSetter[TransformContext](ContextName, path) } } func (pep *pathExpressionParser) parseHigherContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { switch context { case internal.ResourceContextName: - return internal.ResourcePathGetSetter[TransformContext](path) + return internal.ResourcePathGetSetter[TransformContext](ContextName, path) default: var fullPath string if path != nil { diff --git a/pkg/ottl/contexts/ottlspan/span.go b/pkg/ottl/contexts/ottlspan/span.go index 85b1efedb6e6..8f7b4c05f268 100644 --- a/pkg/ottl/contexts/ottlspan/span.go +++ b/pkg/ottl/contexts/ottlspan/span.go @@ -48,20 +48,30 @@ func (tCtx TransformContext) MarshalLogObject(encoder zapcore.ObjectEncoder) err type Option func(*ottl.Parser[TransformContext]) -func NewTransformContext(span ptrace.Span, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeSpans ptrace.ScopeSpans, resourceSpans ptrace.ResourceSpans) TransformContext { - return NewTransformContextWithCache(span, instrumentationScope, resource, scopeSpans, resourceSpans, pcommon.NewMap()) -} +type TransformContextOption func(*TransformContext) -// Experimental: *NOTE* this function is subject to change or removal in the future. -func NewTransformContextWithCache(span ptrace.Span, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeSpans ptrace.ScopeSpans, resourceSpans ptrace.ResourceSpans, cache pcommon.Map) TransformContext { - return TransformContext{ +func NewTransformContext(span ptrace.Span, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeSpans ptrace.ScopeSpans, resourceSpans ptrace.ResourceSpans, options ...TransformContextOption) TransformContext { + tc := TransformContext{ span: span, instrumentationScope: instrumentationScope, resource: resource, - cache: cache, + cache: pcommon.NewMap(), scopeSpans: scopeSpans, resourceSpans: resourceSpans, } + for _, opt := range options { + opt(&tc) + } + return tc +} + +// Experimental: *NOTE* this option is subject to change or removal in the future. +func WithCache(cache *pcommon.Map) TransformContextOption { + return func(p *TransformContext) { + if cache != nil { + p.cache = *cache + } + } } func (tCtx TransformContext) GetSpan() ptrace.Span { @@ -186,16 +196,16 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot } return accessCacheKey(path.Keys()), nil default: - return internal.SpanPathGetSetter[TransformContext](path) + return internal.SpanPathGetSetter[TransformContext](ContextName, path) } } func (pep *pathExpressionParser) parseHigherContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { switch context { case internal.ResourceContextName: - return internal.ResourcePathGetSetter[TransformContext](path) + return internal.ResourcePathGetSetter[TransformContext](ContextName, path) case internal.InstrumentationScopeContextName: - return internal.ScopePathGetSetter[TransformContext](path) + return internal.ScopePathGetSetter[TransformContext](ContextName, path) default: var fullPath string if path != nil { diff --git a/pkg/ottl/contexts/ottlspanevent/span_events.go b/pkg/ottl/contexts/ottlspanevent/span_events.go index ac2b7488da9e..839147639d6c 100644 --- a/pkg/ottl/contexts/ottlspanevent/span_events.go +++ b/pkg/ottl/contexts/ottlspanevent/span_events.go @@ -52,21 +52,31 @@ func (tCtx TransformContext) MarshalLogObject(encoder zapcore.ObjectEncoder) err type Option func(*ottl.Parser[TransformContext]) -func NewTransformContext(spanEvent ptrace.SpanEvent, span ptrace.Span, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeSpans ptrace.ScopeSpans, resourceSpans ptrace.ResourceSpans) TransformContext { - return NewTransformContextWithCache(spanEvent, span, instrumentationScope, resource, scopeSpans, resourceSpans, pcommon.NewMap()) -} +type TransformContextOption func(*TransformContext) -// Experimental: *NOTE* this function is subject to change or removal in the future. -func NewTransformContextWithCache(spanEvent ptrace.SpanEvent, span ptrace.Span, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeSpans ptrace.ScopeSpans, resourceSpans ptrace.ResourceSpans, cache pcommon.Map) TransformContext { - return TransformContext{ +func NewTransformContext(spanEvent ptrace.SpanEvent, span ptrace.Span, instrumentationScope pcommon.InstrumentationScope, resource pcommon.Resource, scopeSpans ptrace.ScopeSpans, resourceSpans ptrace.ResourceSpans, options ...TransformContextOption) TransformContext { + tc := TransformContext{ spanEvent: spanEvent, span: span, instrumentationScope: instrumentationScope, resource: resource, - cache: cache, + cache: pcommon.NewMap(), scopeSpans: scopeSpans, resouceSpans: resourceSpans, } + for _, opt := range options { + opt(&tc) + } + return tc +} + +// Experimental: *NOTE* this option is subject to change or removal in the future. +func WithCache(cache *pcommon.Map) TransformContextOption { + return func(p *TransformContext) { + if cache != nil { + p.cache = *cache + } + } } func (tCtx TransformContext) GetSpanEvent() ptrace.SpanEvent { @@ -219,11 +229,11 @@ func (pep *pathExpressionParser) parsePath(path ottl.Path[TransformContext]) (ot func (pep *pathExpressionParser) parseHigherContextPath(context string, path ottl.Path[TransformContext]) (ottl.GetSetter[TransformContext], error) { switch context { case internal.ResourceContextName: - return internal.ResourcePathGetSetter(path) + return internal.ResourcePathGetSetter(ContextName, path) case internal.InstrumentationScopeContextName: - return internal.ScopePathGetSetter(path) + return internal.ScopePathGetSetter(ContextName, path) case internal.SpanContextName: - return internal.SpanPathGetSetter(path) + return internal.SpanPathGetSetter(ContextName, path) default: var fullPath string if path != nil { diff --git a/processor/transformprocessor/config.go b/processor/transformprocessor/config.go index 4c6ea80ca39e..46f91d5bc10a 100644 --- a/processor/transformprocessor/config.go +++ b/processor/transformprocessor/config.go @@ -28,7 +28,7 @@ var ( featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/32080#issuecomment-2120764953"), ) errFlatLogsGateDisabled = errors.New("'flatten_data' requires the 'transform.flatten.logs' feature gate to be enabled") - configContextStatementsFields = []string{"trace_statements", "metric_statements", "log_statements"} + configContextStatementsFields = map[string]string{"trace_statements": "TraceStatements", "metric_statements": "MetricStatements", "log_statements": "LogStatements"} ) // Config defines the configuration for the processor. @@ -48,6 +48,16 @@ type Config struct { logger *zap.Logger } +// The Unmarshal function sets the [common.ContextStatements.SharedCache] field with reflection. +// These variables ensure that all required fields are still present, otherwise the Config +// unmarshalling would fail. +var ( + _ = common.ContextStatements{}.SharedCache + _ = Config{}.TraceStatements + _ = Config{}.MetricStatements + _ = Config{}.LogStatements +) + // Unmarshal is used internally by mapstructure to parse the transformprocessor configuration (Config), // adding support to structured and flat configuration styles (array of statements strings). // When the flat configuration style is used, each statement becomes a new common.ContextStatements @@ -59,33 +69,32 @@ func (c *Config) Unmarshal(component *confmap.Conf) error { return nil } + flatStatementsFieldsIndexes := map[string][]int{} contextStatementsPatch := map[string]any{} - for _, fieldName := range configContextStatementsFields { - if !component.IsSet(fieldName) { + for configName, structFieldName := range configContextStatementsFields { + if !component.IsSet(configName) { continue } - - rawVal := component.Get(fieldName) + rawVal := component.Get(configName) values, ok := rawVal.([]any) if !ok { - return fmt.Errorf("invalid %s type, expected: array, got: %t", fieldName, rawVal) + return fmt.Errorf("invalid %s type, expected: array, got: %t", configName, rawVal) } - if len(values) == 0 { continue } stmts := make([]any, 0, len(values)) - for _, value := range values { + for i, value := range values { // Array of strings means it's a flat configuration style if reflect.TypeOf(value).Kind() == reflect.String { stmts = append(stmts, map[string]any{"statements": []any{value}}) + flatStatementsFieldsIndexes[structFieldName] = append(flatStatementsFieldsIndexes[structFieldName], i) } else { stmts = append(stmts, value) } } - - contextStatementsPatch[fieldName] = stmts + contextStatementsPatch[configName] = stmts } if len(contextStatementsPatch) > 0 { @@ -95,7 +104,21 @@ func (c *Config) Unmarshal(component *confmap.Conf) error { } } - return component.Unmarshal(c) + err := component.Unmarshal(c) + if err != nil { + return err + } + + if len(flatStatementsFieldsIndexes) > 0 { + configValue := reflect.ValueOf(*c) + for fieldName, indexes := range flatStatementsFieldsIndexes { + for _, index := range indexes { + configValue.FieldByName(fieldName).Index(index).FieldByName("SharedCache").Set(reflect.ValueOf(true)) + } + } + } + + return nil } var _ component.Config = (*Config)(nil) diff --git a/processor/transformprocessor/config_test.go b/processor/transformprocessor/config_test.go index 6734927161da..e2e3684f5be1 100644 --- a/processor/transformprocessor/config_test.go +++ b/processor/transformprocessor/config_test.go @@ -153,26 +153,89 @@ func TestLoadConfig(t *testing.T) { ErrorMode: ottl.PropagateError, TraceStatements: []common.ContextStatements{ { - Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`}, + SharedCache: true, + Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`}, }, { - Statements: []string{`set(resource.attributes["name"], "bear")`}, + SharedCache: true, + Statements: []string{`set(resource.attributes["name"], "bear")`}, }, }, MetricStatements: []common.ContextStatements{ { - Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`}, + SharedCache: true, + Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`}, + }, + { + SharedCache: true, + Statements: []string{`set(resource.attributes["name"], "bear")`}, + }, + }, + LogStatements: []common.ContextStatements{ + { + SharedCache: true, + Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`}, + }, + { + SharedCache: true, + Statements: []string{`set(resource.attributes["name"], "bear")`}, + }, + }, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "structured_configuration_with_path_context"), + expected: &Config{ + ErrorMode: ottl.PropagateError, + TraceStatements: []common.ContextStatements{ + { + Context: "span", + Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`}, }, + }, + MetricStatements: []common.ContextStatements{ { - Statements: []string{`set(resource.attributes["name"], "bear")`}, + Context: "metric", + Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`}, }, }, LogStatements: []common.ContextStatements{ { + Context: "log", Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`}, }, + }, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "structured_configuration_with_inferred_context"), + expected: &Config{ + ErrorMode: ottl.PropagateError, + TraceStatements: []common.ContextStatements{ + { + SharedCache: false, + Statements: []string{ + `set(span.name, "bear") where span.attributes["http.path"] == "/animal"`, + `set(resource.attributes["name"], "bear")`, + }, + }, + }, + MetricStatements: []common.ContextStatements{ + { + SharedCache: false, + Statements: []string{ + `set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`, + `set(resource.attributes["name"], "bear")`, + }, + }, + }, + LogStatements: []common.ContextStatements{ { - Statements: []string{`set(resource.attributes["name"], "bear")`}, + SharedCache: false, + Statements: []string{ + `set(log.body, "bear") where log.attributes["http.path"] == "/animal"`, + `set(resource.attributes["name"], "bear")`, + }, }, }, }, @@ -183,7 +246,8 @@ func TestLoadConfig(t *testing.T) { ErrorMode: ottl.PropagateError, TraceStatements: []common.ContextStatements{ { - Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`}, + SharedCache: true, + Statements: []string{`set(span.name, "bear") where span.attributes["http.path"] == "/animal"`}, }, { Context: "span", @@ -192,10 +256,18 @@ func TestLoadConfig(t *testing.T) { `keep_keys(attributes, ["http.method", "http.path"])`, }, }, + { + Statements: []string{`set(span.attributes["name"], "lion")`}, + }, + { + SharedCache: true, + Statements: []string{`set(span.name, "lion") where span.attributes["http.path"] == "/animal"`}, + }, }, MetricStatements: []common.ContextStatements{ { - Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`}, + SharedCache: true, + Statements: []string{`set(metric.name, "bear") where resource.attributes["http.path"] == "/animal"`}, }, { Context: "resource", @@ -204,10 +276,18 @@ func TestLoadConfig(t *testing.T) { `keep_keys(attributes, ["http.method", "http.path"])`, }, }, + { + Statements: []string{`set(metric.name, "lion")`}, + }, + { + SharedCache: true, + Statements: []string{`set(metric.name, "lion") where resource.attributes["http.path"] == "/animal"`}, + }, }, LogStatements: []common.ContextStatements{ { - Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`}, + SharedCache: true, + Statements: []string{`set(log.body, "bear") where log.attributes["http.path"] == "/animal"`}, }, { Context: "resource", @@ -216,6 +296,49 @@ func TestLoadConfig(t *testing.T) { `keep_keys(attributes, ["http.method", "http.path"])`, }, }, + { + Statements: []string{`set(log.attributes["name"], "lion")`}, + }, + { + SharedCache: true, + Statements: []string{`set(log.body, "lion") where log.attributes["http.path"] == "/animal"`}, + }, + }, + }, + }, + { + id: component.NewIDWithName(metadata.Type, "context_statements_error_mode"), + expected: &Config{ + ErrorMode: ottl.IgnoreError, + TraceStatements: []common.ContextStatements{ + { + Statements: []string{`set(resource.attributes["name"], "propagate")`}, + ErrorMode: ottl.PropagateError, + }, + { + Statements: []string{`set(resource.attributes["name"], "ignore")`}, + ErrorMode: "", + }, + }, + MetricStatements: []common.ContextStatements{ + { + Statements: []string{`set(resource.attributes["name"], "silent")`}, + ErrorMode: ottl.SilentError, + }, + { + Statements: []string{`set(resource.attributes["name"], "ignore")`}, + ErrorMode: "", + }, + }, + LogStatements: []common.ContextStatements{ + { + Statements: []string{`set(resource.attributes["name"], "propagate")`}, + ErrorMode: ottl.PropagateError, + }, + { + Statements: []string{`set(resource.attributes["name"], "ignore")`}, + ErrorMode: "", + }, }, }, }, diff --git a/processor/transformprocessor/internal/common/cache.go b/processor/transformprocessor/internal/common/cache.go index eba36a18654b..ab1b3138c602 100644 --- a/processor/transformprocessor/internal/common/cache.go +++ b/processor/transformprocessor/internal/common/cache.go @@ -4,21 +4,19 @@ package common // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" import ( - "context" - "go.opentelemetry.io/collector/pdata/pcommon" ) -type cacheContextKey struct{} - -func WithCache(ctx context.Context, cache *pcommon.Map) context.Context { - return context.WithValue(ctx, cacheContextKey{}, cache) -} - -func newCacheFrom(ctx context.Context) pcommon.Map { - cache := ctx.Value(cacheContextKey{}).(*pcommon.Map) - if cache != nil { - return *cache +func NewContextCache(cache map[ContextID]*pcommon.Map, context ContextID, sharedCache bool) *pcommon.Map { + if !sharedCache { + m := pcommon.NewMap() + return &m + } + existing, ok := cache[context] + if ok { + return existing } - return pcommon.NewMap() + m := pcommon.NewMap() + cache[context] = &m + return &m } diff --git a/processor/transformprocessor/internal/common/config.go b/processor/transformprocessor/internal/common/config.go index 79087389d644..801943c31ff0 100644 --- a/processor/transformprocessor/internal/common/config.go +++ b/processor/transformprocessor/internal/common/config.go @@ -39,6 +39,14 @@ type ContextStatements struct { Context ContextID `mapstructure:"context"` Conditions []string `mapstructure:"conditions"` Statements []string `mapstructure:"statements"` + // ErrorMode determines how the processor reacts to errors that occur while processing + // this group of statements. When set, it overrides the default Config ErrorMode. + ErrorMode ottl.ErrorMode `mapstructure:"error_mode"` + // SharedCache is experimental and subject to change or removal in the future, + // it cannot be configured via `mapstructure`, which means users won't be able + // to set it on the configuration. Instead, it's set by the transformprocessor + // Config unmarshaller. + SharedCache bool `mapstructure:"-"` } func (c ContextStatements) GetStatements() []string { diff --git a/processor/transformprocessor/internal/common/logs.go b/processor/transformprocessor/internal/common/logs.go index 4bb95b917190..2a3b88352992 100644 --- a/processor/transformprocessor/internal/common/logs.go +++ b/processor/transformprocessor/internal/common/logs.go @@ -7,7 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr" @@ -16,27 +16,28 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottllog" ) -var _ consumer.Logs = &logStatements{} +type LogsConsumer interface { + Context() ContextID + ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error +} type logStatements struct { ottl.StatementSequence[ottllog.TransformContext] expr.BoolExpr[ottllog.TransformContext] } -func (l logStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (l logStatements) Context() ContextID { + return Log } -func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error { for i := 0; i < ld.ResourceLogs().Len(); i++ { rlogs := ld.ResourceLogs().At(i) for j := 0; j < rlogs.ScopeLogs().Len(); j++ { slogs := rlogs.ScopeLogs().At(j) logs := slogs.LogRecords() for k := 0; k < logs.Len(); k++ { - tCtx := ottllog.NewTransformContextWithCache(logs.At(k), slogs.Scope(), rlogs.Resource(), slogs, rlogs, newCacheFrom(ctx)) + tCtx := ottllog.NewTransformContext(logs.At(k), slogs.Scope(), rlogs.Resource(), slogs, rlogs, ottllog.WithCache(cache)) condition, err := l.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -53,12 +54,12 @@ func (l logStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { return nil } -type LogParserCollection ottl.ParserCollection[consumer.Logs] +type LogParserCollection ottl.ParserCollection[LogsConsumer] -type LogParserCollectionOption ottl.ParserCollectionOption[consumer.Logs] +type LogParserCollectionOption ottl.ParserCollectionOption[LogsConsumer] func WithLogParser(functions map[string]ottl.Factory[ottllog.TransformContext]) LogParserCollectionOption { - return func(pc *ottl.ParserCollection[consumer.Logs]) error { + return func(pc *ottl.ParserCollection[LogsConsumer]) error { logParser, err := ottllog.NewParser(functions, pc.Settings, ottllog.EnablePathContextNames()) if err != nil { return err @@ -68,17 +69,17 @@ func WithLogParser(functions map[string]ottl.Factory[ottllog.TransformContext]) } func WithLogErrorMode(errorMode ottl.ErrorMode) LogParserCollectionOption { - return LogParserCollectionOption(ottl.WithParserCollectionErrorMode[consumer.Logs](errorMode)) + return LogParserCollectionOption(ottl.WithParserCollectionErrorMode[LogsConsumer](errorMode)) } func NewLogParserCollection(settings component.TelemetrySettings, options ...LogParserCollectionOption) (*LogParserCollection, error) { - pcOptions := []ottl.ParserCollectionOption[consumer.Logs]{ - withCommonContextParsers[consumer.Logs](), - ottl.EnableParserCollectionModifiedStatementLogging[consumer.Logs](true), + pcOptions := []ottl.ParserCollectionOption[LogsConsumer]{ + withCommonContextParsers[LogsConsumer](), + ottl.EnableParserCollectionModifiedStatementLogging[LogsConsumer](true), } for _, option := range options { - pcOptions = append(pcOptions, ottl.ParserCollectionOption[consumer.Logs](option)) + pcOptions = append(pcOptions, ottl.ParserCollectionOption[LogsConsumer](option)) } pc, err := ottl.NewParserCollection(settings, pcOptions...) @@ -90,21 +91,29 @@ func NewLogParserCollection(settings component.TelemetrySettings, options ...Log return &lpc, nil } -func convertLogStatements(pc *ottl.ParserCollection[consumer.Logs], _ *ottl.Parser[ottllog.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottllog.TransformContext]) (consumer.Logs, error) { +func convertLogStatements(pc *ottl.ParserCollection[LogsConsumer], _ *ottl.Parser[ottllog.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottllog.TransformContext]) (LogsConsumer, error) { contextStatements, err := toContextStatements(statements) if err != nil { return nil, err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLog, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardLogFuncs()) + errorMode := pc.ErrorMode + if contextStatements.ErrorMode != "" { + errorMode = contextStatements.ErrorMode + } + var parserOptions []ottllog.Option + if contextStatements.Context == "" { + parserOptions = append(parserOptions, ottllog.EnablePathContextNames()) + } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForLogWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardLogFuncs(), parserOptions) if errGlobalBoolExpr != nil { return nil, errGlobalBoolExpr } - lStatements := ottllog.NewStatementSequence(parsedStatements, pc.Settings, ottllog.WithStatementSequenceErrorMode(pc.ErrorMode)) + lStatements := ottllog.NewStatementSequence(parsedStatements, pc.Settings, ottllog.WithStatementSequenceErrorMode(errorMode)) return logStatements{lStatements, globalExpr}, nil } -func (lpc *LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Logs, error) { - pc := ottl.ParserCollection[consumer.Logs](*lpc) +func (lpc *LogParserCollection) ParseContextStatements(contextStatements ContextStatements) (LogsConsumer, error) { + pc := ottl.ParserCollection[LogsConsumer](*lpc) if contextStatements.Context != "" { return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true) } diff --git a/processor/transformprocessor/internal/common/metrics.go b/processor/transformprocessor/internal/common/metrics.go index af3cfba220e8..8919df68d2a3 100644 --- a/processor/transformprocessor/internal/common/metrics.go +++ b/processor/transformprocessor/internal/common/metrics.go @@ -7,7 +7,6 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -18,27 +17,28 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlmetric" ) -var _ consumer.Metrics = &metricStatements{} +type MetricsConsumer interface { + Context() ContextID + ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error +} type metricStatements struct { ottl.StatementSequence[ottlmetric.TransformContext] expr.BoolExpr[ottlmetric.TransformContext] } -func (m metricStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (m metricStatements) Context() ContextID { + return Metric } -func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error { for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { smetrics := rmetrics.ScopeMetrics().At(j) metrics := smetrics.Metrics() for k := 0; k < metrics.Len(); k++ { - tCtx := ottlmetric.NewTransformContextWithCache(metrics.At(k), smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, newCacheFrom(ctx)) + tCtx := ottlmetric.NewTransformContext(metrics.At(k), smetrics.Metrics(), smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, ottlmetric.WithCache(cache)) condition, err := m.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -55,20 +55,16 @@ func (m metricStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics return nil } -var _ consumer.Metrics = &dataPointStatements{} - type dataPointStatements struct { ottl.StatementSequence[ottldatapoint.TransformContext] expr.BoolExpr[ottldatapoint.TransformContext] } -func (d dataPointStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (d dataPointStatements) Context() ContextID { + return DataPoint } -func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error { for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { @@ -76,19 +72,20 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr metrics := smetrics.Metrics() for k := 0; k < metrics.Len(); k++ { metric := metrics.At(k) + transformContextOptions := []ottldatapoint.TransformContextOption{ottldatapoint.WithCache(cache)} var err error //exhaustive:enforce switch metric.Type() { case pmetric.MetricTypeSum: - err = d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleNumberDataPoints(ctx, metric.Sum().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) case pmetric.MetricTypeGauge: - err = d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleNumberDataPoints(ctx, metric.Gauge().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) case pmetric.MetricTypeHistogram: - err = d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleHistogramDataPoints(ctx, metric.Histogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) case pmetric.MetricTypeExponentialHistogram: - err = d.handleExponetialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleExponentialHistogramDataPoints(ctx, metric.ExponentialHistogram().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) case pmetric.MetricTypeSummary: - err = d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics) + err = d.handleSummaryDataPoints(ctx, metric.Summary().DataPoints(), metrics.At(k), metrics, smetrics.Scope(), rmetrics.Resource(), smetrics, rmetrics, transformContextOptions) } if err != nil { return err @@ -99,9 +96,9 @@ func (d dataPointStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metr return nil } -func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error { +func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pmetric.NumberDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options []ottldatapoint.TransformContextOption) error { for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContextWithCache(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, newCacheFrom(ctx)) + tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, options...) condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -116,9 +113,9 @@ func (d dataPointStatements) handleNumberDataPoints(ctx context.Context, dps pme return nil } -func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error { +func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps pmetric.HistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options []ottldatapoint.TransformContextOption) error { for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContextWithCache(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, newCacheFrom(ctx)) + tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, options...) condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -133,9 +130,9 @@ func (d dataPointStatements) handleHistogramDataPoints(ctx context.Context, dps return nil } -func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error { +func (d dataPointStatements) handleExponentialHistogramDataPoints(ctx context.Context, dps pmetric.ExponentialHistogramDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options []ottldatapoint.TransformContextOption) error { for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContextWithCache(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, newCacheFrom(ctx)) + tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, options...) condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -150,9 +147,9 @@ func (d dataPointStatements) handleExponetialHistogramDataPoints(ctx context.Con return nil } -func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics) error { +func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pmetric.SummaryDataPointSlice, metric pmetric.Metric, metrics pmetric.MetricSlice, is pcommon.InstrumentationScope, resource pcommon.Resource, scopeMetrics pmetric.ScopeMetrics, resourceMetrics pmetric.ResourceMetrics, options []ottldatapoint.TransformContextOption) error { for i := 0; i < dps.Len(); i++ { - tCtx := ottldatapoint.NewTransformContextWithCache(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, newCacheFrom(ctx)) + tCtx := ottldatapoint.NewTransformContext(dps.At(i), metric, metrics, is, resource, scopeMetrics, resourceMetrics, options...) condition, err := d.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -167,12 +164,12 @@ func (d dataPointStatements) handleSummaryDataPoints(ctx context.Context, dps pm return nil } -type MetricParserCollection ottl.ParserCollection[consumer.Metrics] +type MetricParserCollection ottl.ParserCollection[MetricsConsumer] -type MetricParserCollectionOption ottl.ParserCollectionOption[consumer.Metrics] +type MetricParserCollectionOption ottl.ParserCollectionOption[MetricsConsumer] func WithMetricParser(functions map[string]ottl.Factory[ottlmetric.TransformContext]) MetricParserCollectionOption { - return func(pc *ottl.ParserCollection[consumer.Metrics]) error { + return func(pc *ottl.ParserCollection[MetricsConsumer]) error { metricParser, err := ottlmetric.NewParser(functions, pc.Settings, ottlmetric.EnablePathContextNames()) if err != nil { return err @@ -182,7 +179,7 @@ func WithMetricParser(functions map[string]ottl.Factory[ottlmetric.TransformCont } func WithDataPointParser(functions map[string]ottl.Factory[ottldatapoint.TransformContext]) MetricParserCollectionOption { - return func(pc *ottl.ParserCollection[consumer.Metrics]) error { + return func(pc *ottl.ParserCollection[MetricsConsumer]) error { dataPointParser, err := ottldatapoint.NewParser(functions, pc.Settings, ottldatapoint.EnablePathContextNames()) if err != nil { return err @@ -192,17 +189,17 @@ func WithDataPointParser(functions map[string]ottl.Factory[ottldatapoint.Transfo } func WithMetricErrorMode(errorMode ottl.ErrorMode) MetricParserCollectionOption { - return MetricParserCollectionOption(ottl.WithParserCollectionErrorMode[consumer.Metrics](errorMode)) + return MetricParserCollectionOption(ottl.WithParserCollectionErrorMode[MetricsConsumer](errorMode)) } func NewMetricParserCollection(settings component.TelemetrySettings, options ...MetricParserCollectionOption) (*MetricParserCollection, error) { - pcOptions := []ottl.ParserCollectionOption[consumer.Metrics]{ - withCommonContextParsers[consumer.Metrics](), - ottl.EnableParserCollectionModifiedStatementLogging[consumer.Metrics](true), + pcOptions := []ottl.ParserCollectionOption[MetricsConsumer]{ + withCommonContextParsers[MetricsConsumer](), + ottl.EnableParserCollectionModifiedStatementLogging[MetricsConsumer](true), } for _, option := range options { - pcOptions = append(pcOptions, ottl.ParserCollectionOption[consumer.Metrics](option)) + pcOptions = append(pcOptions, ottl.ParserCollectionOption[MetricsConsumer](option)) } pc, err := ottl.NewParserCollection(settings, pcOptions...) @@ -214,34 +211,50 @@ func NewMetricParserCollection(settings component.TelemetrySettings, options ... return &mpc, nil } -func convertMetricStatements(pc *ottl.ParserCollection[consumer.Metrics], _ *ottl.Parser[ottlmetric.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottlmetric.TransformContext]) (consumer.Metrics, error) { +func convertMetricStatements(pc *ottl.ParserCollection[MetricsConsumer], _ *ottl.Parser[ottlmetric.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottlmetric.TransformContext]) (MetricsConsumer, error) { contextStatements, err := toContextStatements(statements) if err != nil { return nil, err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetric, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardMetricFuncs()) + errorMode := pc.ErrorMode + if contextStatements.ErrorMode != "" { + errorMode = contextStatements.ErrorMode + } + var parserOptions []ottlmetric.Option + if contextStatements.Context == "" { + parserOptions = append(parserOptions, ottlmetric.EnablePathContextNames()) + } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForMetricWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardMetricFuncs(), parserOptions) if errGlobalBoolExpr != nil { return nil, errGlobalBoolExpr } - mStatements := ottlmetric.NewStatementSequence(parsedStatements, pc.Settings, ottlmetric.WithStatementSequenceErrorMode(pc.ErrorMode)) + mStatements := ottlmetric.NewStatementSequence(parsedStatements, pc.Settings, ottlmetric.WithStatementSequenceErrorMode(errorMode)) return metricStatements{mStatements, globalExpr}, nil } -func convertDataPointStatements(pc *ottl.ParserCollection[consumer.Metrics], _ *ottl.Parser[ottldatapoint.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottldatapoint.TransformContext]) (consumer.Metrics, error) { +func convertDataPointStatements(pc *ottl.ParserCollection[MetricsConsumer], _ *ottl.Parser[ottldatapoint.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottldatapoint.TransformContext]) (MetricsConsumer, error) { contextStatements, err := toContextStatements(statements) if err != nil { return nil, err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForDataPoint, contextStatements.Conditions, pc.ErrorMode, pc.Settings, filterottl.StandardDataPointFuncs()) + errorMode := pc.ErrorMode + if contextStatements.ErrorMode != "" { + errorMode = contextStatements.ErrorMode + } + var parserOptions []ottldatapoint.Option + if contextStatements.Context == "" { + parserOptions = append(parserOptions, ottldatapoint.EnablePathContextNames()) + } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForDataPointWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardDataPointFuncs(), parserOptions) if errGlobalBoolExpr != nil { return nil, errGlobalBoolExpr } - dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.Settings, ottldatapoint.WithStatementSequenceErrorMode(pc.ErrorMode)) + dpStatements := ottldatapoint.NewStatementSequence(parsedStatements, pc.Settings, ottldatapoint.WithStatementSequenceErrorMode(errorMode)) return dataPointStatements{dpStatements, globalExpr}, nil } -func (mpc *MetricParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Metrics, error) { - pc := ottl.ParserCollection[consumer.Metrics](*mpc) +func (mpc *MetricParserCollection) ParseContextStatements(contextStatements ContextStatements) (MetricsConsumer, error) { + pc := ottl.ParserCollection[MetricsConsumer](*mpc) if contextStatements.Context != "" { return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true) } diff --git a/processor/transformprocessor/internal/common/processor.go b/processor/transformprocessor/internal/common/processor.go index 625f5bf6aa91..10d45afb197e 100644 --- a/processor/transformprocessor/internal/common/processor.go +++ b/processor/transformprocessor/internal/common/processor.go @@ -7,7 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -19,28 +19,21 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlscope" ) -var ( - _ consumer.Traces = &resourceStatements{} - _ consumer.Metrics = &resourceStatements{} - _ consumer.Logs = &resourceStatements{} - _ baseContext = &resourceStatements{} -) +var _ baseContext = &resourceStatements{} type resourceStatements struct { ottl.StatementSequence[ottlresource.TransformContext] expr.BoolExpr[ottlresource.TransformContext] } -func (r resourceStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (r resourceStatements) Context() ContextID { + return Resource } -func (r resourceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { +func (r resourceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error { for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) - tCtx := ottlresource.NewTransformContextWithCache(rspans.Resource(), rspans, newCacheFrom(ctx)) + tCtx := ottlresource.NewTransformContext(rspans.Resource(), rspans, ottlresource.WithCache(cache)) condition, err := r.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -55,10 +48,10 @@ func (r resourceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) return nil } -func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error { for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) - tCtx := ottlresource.NewTransformContextWithCache(rmetrics.Resource(), rmetrics, newCacheFrom(ctx)) + tCtx := ottlresource.NewTransformContext(rmetrics.Resource(), rmetrics, ottlresource.WithCache(cache)) condition, err := r.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -73,10 +66,10 @@ func (r resourceStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metri return nil } -func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error { for i := 0; i < ld.ResourceLogs().Len(); i++ { rlogs := ld.ResourceLogs().At(i) - tCtx := ottlresource.NewTransformContextWithCache(rlogs.Resource(), rlogs, newCacheFrom(ctx)) + tCtx := ottlresource.NewTransformContext(rlogs.Resource(), rlogs, ottlresource.WithCache(cache)) condition, err := r.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -91,30 +84,23 @@ func (r resourceStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error return nil } -var ( - _ consumer.Traces = &scopeStatements{} - _ consumer.Metrics = &scopeStatements{} - _ consumer.Logs = &scopeStatements{} - _ baseContext = &scopeStatements{} -) +var _ baseContext = &scopeStatements{} type scopeStatements struct { ottl.StatementSequence[ottlscope.TransformContext] expr.BoolExpr[ottlscope.TransformContext] } -func (s scopeStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (s scopeStatements) Context() ContextID { + return Scope } -func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { +func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error { for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) for j := 0; j < rspans.ScopeSpans().Len(); j++ { sspans := rspans.ScopeSpans().At(j) - tCtx := ottlscope.NewTransformContextWithCache(sspans.Scope(), rspans.Resource(), sspans, newCacheFrom(ctx)) + tCtx := ottlscope.NewTransformContext(sspans.Scope(), rspans.Resource(), sspans, ottlscope.WithCache(cache)) condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -130,12 +116,12 @@ func (s scopeStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) er return nil } -func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) error { +func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics, cache *pcommon.Map) error { for i := 0; i < md.ResourceMetrics().Len(); i++ { rmetrics := md.ResourceMetrics().At(i) for j := 0; j < rmetrics.ScopeMetrics().Len(); j++ { smetrics := rmetrics.ScopeMetrics().At(j) - tCtx := ottlscope.NewTransformContextWithCache(smetrics.Scope(), rmetrics.Resource(), smetrics, newCacheFrom(ctx)) + tCtx := ottlscope.NewTransformContext(smetrics.Scope(), rmetrics.Resource(), smetrics, ottlscope.WithCache(cache)) condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -151,12 +137,12 @@ func (s scopeStatements) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) return nil } -func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { +func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs, cache *pcommon.Map) error { for i := 0; i < ld.ResourceLogs().Len(); i++ { rlogs := ld.ResourceLogs().At(i) for j := 0; j < rlogs.ScopeLogs().Len(); j++ { slogs := rlogs.ScopeLogs().At(j) - tCtx := ottlscope.NewTransformContextWithCache(slogs.Scope(), rlogs.Resource(), slogs, newCacheFrom(ctx)) + tCtx := ottlscope.NewTransformContext(slogs.Scope(), rlogs.Resource(), slogs, ottlscope.WithCache(cache)) condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -173,9 +159,9 @@ func (s scopeStatements) ConsumeLogs(ctx context.Context, ld plog.Logs) error { } type baseContext interface { - consumer.Traces - consumer.Metrics - consumer.Logs + TracesConsumer + MetricsConsumer + LogsConsumer } func withCommonContextParsers[R any]() ottl.ParserCollectionOption[R] { @@ -204,7 +190,7 @@ func withCommonContextParsers[R any]() ottl.ParserCollectionOption[R] { } func parseResourceContextStatements[R any]( - collection *ottl.ParserCollection[R], + pc *ottl.ParserCollection[R], _ *ottl.Parser[ottlresource.TransformContext], _ string, statements ottl.StatementsGetter, @@ -214,17 +200,25 @@ func parseResourceContextStatements[R any]( if err != nil { return *new(R), err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForResource, contextStatements.Conditions, collection.ErrorMode, collection.Settings, filterottl.StandardResourceFuncs()) + errorMode := pc.ErrorMode + if contextStatements.ErrorMode != "" { + errorMode = contextStatements.ErrorMode + } + var parserOptions []ottlresource.Option + if contextStatements.Context == "" { + parserOptions = append(parserOptions, ottlresource.EnablePathContextNames()) + } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForResourceWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardResourceFuncs(), parserOptions) if errGlobalBoolExpr != nil { return *new(R), errGlobalBoolExpr } - rStatements := ottlresource.NewStatementSequence(parsedStatements, collection.Settings, ottlresource.WithStatementSequenceErrorMode(collection.ErrorMode)) + rStatements := ottlresource.NewStatementSequence(parsedStatements, pc.Settings, ottlresource.WithStatementSequenceErrorMode(errorMode)) result := (baseContext)(resourceStatements{rStatements, globalExpr}) return result.(R), nil } func parseScopeContextStatements[R any]( - collection *ottl.ParserCollection[R], + pc *ottl.ParserCollection[R], _ *ottl.Parser[ottlscope.TransformContext], _ string, statements ottl.StatementsGetter, @@ -234,24 +228,33 @@ func parseScopeContextStatements[R any]( if err != nil { return *new(R), err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForScope, contextStatements.Conditions, collection.ErrorMode, collection.Settings, filterottl.StandardScopeFuncs()) + errorMode := pc.ErrorMode + if contextStatements.ErrorMode != "" { + errorMode = contextStatements.ErrorMode + } + var parserOptions []ottlscope.Option + if contextStatements.Context == "" { + parserOptions = append(parserOptions, ottlscope.EnablePathContextNames()) + } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForScopeWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardScopeFuncs(), parserOptions) if errGlobalBoolExpr != nil { return *new(R), errGlobalBoolExpr } - sStatements := ottlscope.NewStatementSequence(parsedStatements, collection.Settings, ottlscope.WithStatementSequenceErrorMode(collection.ErrorMode)) + sStatements := ottlscope.NewStatementSequence(parsedStatements, pc.Settings, ottlscope.WithStatementSequenceErrorMode(errorMode)) result := (baseContext)(scopeStatements{sStatements, globalExpr}) return result.(R), nil } -func parseGlobalExpr[K any]( - boolExprFunc func([]string, map[string]ottl.Factory[K], ottl.ErrorMode, component.TelemetrySettings) (*ottl.ConditionSequence[K], error), +func parseGlobalExpr[K any, O any]( + boolExprFunc func([]string, map[string]ottl.Factory[K], ottl.ErrorMode, component.TelemetrySettings, []O) (*ottl.ConditionSequence[K], error), conditions []string, errorMode ottl.ErrorMode, settings component.TelemetrySettings, standardFuncs map[string]ottl.Factory[K], + parserOptions []O, ) (expr.BoolExpr[K], error) { if len(conditions) > 0 { - return boolExprFunc(conditions, standardFuncs, errorMode, settings) + return boolExprFunc(conditions, standardFuncs, errorMode, settings, parserOptions) } // By default, set the global expression to always true unless conditions are specified. return expr.AlwaysTrue[K](), nil diff --git a/processor/transformprocessor/internal/common/traces.go b/processor/transformprocessor/internal/common/traces.go index 40f1f57c8678..7719e563e5f6 100644 --- a/processor/transformprocessor/internal/common/traces.go +++ b/processor/transformprocessor/internal/common/traces.go @@ -7,7 +7,7 @@ import ( "context" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/expr" @@ -17,27 +17,28 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl/contexts/ottlspanevent" ) -var _ consumer.Traces = &traceStatements{} +type TracesConsumer interface { + Context() ContextID + ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error +} type traceStatements struct { ottl.StatementSequence[ottlspan.TransformContext] expr.BoolExpr[ottlspan.TransformContext] } -func (t traceStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (t traceStatements) Context() ContextID { + return Span } -func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { +func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error { for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) for j := 0; j < rspans.ScopeSpans().Len(); j++ { sspans := rspans.ScopeSpans().At(j) spans := sspans.Spans() for k := 0; k < spans.Len(); k++ { - tCtx := ottlspan.NewTransformContextWithCache(spans.At(k), sspans.Scope(), rspans.Resource(), sspans, rspans, newCacheFrom(ctx)) + tCtx := ottlspan.NewTransformContext(spans.At(k), sspans.Scope(), rspans.Resource(), sspans, rspans, ottlspan.WithCache(cache)) condition, err := t.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -54,20 +55,16 @@ func (t traceStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) er return nil } -var _ consumer.Traces = &spanEventStatements{} - type spanEventStatements struct { ottl.StatementSequence[ottlspanevent.TransformContext] expr.BoolExpr[ottlspanevent.TransformContext] } -func (s spanEventStatements) Capabilities() consumer.Capabilities { - return consumer.Capabilities{ - MutatesData: true, - } +func (s spanEventStatements) Context() ContextID { + return SpanEvent } -func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { +func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces, cache *pcommon.Map) error { for i := 0; i < td.ResourceSpans().Len(); i++ { rspans := td.ResourceSpans().At(i) for j := 0; j < rspans.ScopeSpans().Len(); j++ { @@ -77,7 +74,7 @@ func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces span := spans.At(k) spanEvents := span.Events() for n := 0; n < spanEvents.Len(); n++ { - tCtx := ottlspanevent.NewTransformContextWithCache(spanEvents.At(n), span, sspans.Scope(), rspans.Resource(), sspans, rspans, newCacheFrom(ctx)) + tCtx := ottlspanevent.NewTransformContext(spanEvents.At(n), span, sspans.Scope(), rspans.Resource(), sspans, rspans, ottlspanevent.WithCache(cache)) condition, err := s.BoolExpr.Eval(ctx, tCtx) if err != nil { return err @@ -95,12 +92,12 @@ func (s spanEventStatements) ConsumeTraces(ctx context.Context, td ptrace.Traces return nil } -type TraceParserCollection ottl.ParserCollection[consumer.Traces] +type TraceParserCollection ottl.ParserCollection[TracesConsumer] -type TraceParserCollectionOption ottl.ParserCollectionOption[consumer.Traces] +type TraceParserCollectionOption ottl.ParserCollectionOption[TracesConsumer] func WithSpanParser(functions map[string]ottl.Factory[ottlspan.TransformContext]) TraceParserCollectionOption { - return func(pc *ottl.ParserCollection[consumer.Traces]) error { + return func(pc *ottl.ParserCollection[TracesConsumer]) error { parser, err := ottlspan.NewParser(functions, pc.Settings, ottlspan.EnablePathContextNames()) if err != nil { return err @@ -110,7 +107,7 @@ func WithSpanParser(functions map[string]ottl.Factory[ottlspan.TransformContext] } func WithSpanEventParser(functions map[string]ottl.Factory[ottlspanevent.TransformContext]) TraceParserCollectionOption { - return func(pc *ottl.ParserCollection[consumer.Traces]) error { + return func(pc *ottl.ParserCollection[TracesConsumer]) error { parser, err := ottlspanevent.NewParser(functions, pc.Settings, ottlspanevent.EnablePathContextNames()) if err != nil { return err @@ -120,17 +117,17 @@ func WithSpanEventParser(functions map[string]ottl.Factory[ottlspanevent.Transfo } func WithTraceErrorMode(errorMode ottl.ErrorMode) TraceParserCollectionOption { - return TraceParserCollectionOption(ottl.WithParserCollectionErrorMode[consumer.Traces](errorMode)) + return TraceParserCollectionOption(ottl.WithParserCollectionErrorMode[TracesConsumer](errorMode)) } func NewTraceParserCollection(settings component.TelemetrySettings, options ...TraceParserCollectionOption) (*TraceParserCollection, error) { - pcOptions := []ottl.ParserCollectionOption[consumer.Traces]{ - withCommonContextParsers[consumer.Traces](), - ottl.EnableParserCollectionModifiedStatementLogging[consumer.Traces](true), + pcOptions := []ottl.ParserCollectionOption[TracesConsumer]{ + withCommonContextParsers[TracesConsumer](), + ottl.EnableParserCollectionModifiedStatementLogging[TracesConsumer](true), } for _, option := range options { - pcOptions = append(pcOptions, ottl.ParserCollectionOption[consumer.Traces](option)) + pcOptions = append(pcOptions, ottl.ParserCollectionOption[TracesConsumer](option)) } pc, err := ottl.NewParserCollection(settings, pcOptions...) @@ -142,34 +139,50 @@ func NewTraceParserCollection(settings component.TelemetrySettings, options ...T return &tpc, nil } -func convertSpanStatements(collection *ottl.ParserCollection[consumer.Traces], _ *ottl.Parser[ottlspan.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottlspan.TransformContext]) (consumer.Traces, error) { +func convertSpanStatements(pc *ottl.ParserCollection[TracesConsumer], _ *ottl.Parser[ottlspan.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottlspan.TransformContext]) (TracesConsumer, error) { contextStatements, err := toContextStatements(statements) if err != nil { return nil, err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpan, contextStatements.Conditions, collection.ErrorMode, collection.Settings, filterottl.StandardSpanFuncs()) + errorMode := pc.ErrorMode + if contextStatements.ErrorMode != "" { + errorMode = contextStatements.ErrorMode + } + var parserOptions []ottlspan.Option + if contextStatements.Context == "" { + parserOptions = append(parserOptions, ottlspan.EnablePathContextNames()) + } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardSpanFuncs(), parserOptions) if errGlobalBoolExpr != nil { return nil, errGlobalBoolExpr } - sStatements := ottlspan.NewStatementSequence(parsedStatements, collection.Settings, ottlspan.WithStatementSequenceErrorMode(collection.ErrorMode)) + sStatements := ottlspan.NewStatementSequence(parsedStatements, pc.Settings, ottlspan.WithStatementSequenceErrorMode(errorMode)) return traceStatements{sStatements, globalExpr}, nil } -func convertSpanEventStatements(collection *ottl.ParserCollection[consumer.Traces], _ *ottl.Parser[ottlspanevent.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottlspanevent.TransformContext]) (consumer.Traces, error) { +func convertSpanEventStatements(pc *ottl.ParserCollection[TracesConsumer], _ *ottl.Parser[ottlspanevent.TransformContext], _ string, statements ottl.StatementsGetter, parsedStatements []*ottl.Statement[ottlspanevent.TransformContext]) (TracesConsumer, error) { contextStatements, err := toContextStatements(statements) if err != nil { return nil, err } - globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEvent, contextStatements.Conditions, collection.ErrorMode, collection.Settings, filterottl.StandardSpanEventFuncs()) + errorMode := pc.ErrorMode + if contextStatements.ErrorMode != "" { + errorMode = contextStatements.ErrorMode + } + var parserOptions []ottlspanevent.Option + if contextStatements.Context == "" { + parserOptions = append(parserOptions, ottlspanevent.EnablePathContextNames()) + } + globalExpr, errGlobalBoolExpr := parseGlobalExpr(filterottl.NewBoolExprForSpanEventWithOptions, contextStatements.Conditions, errorMode, pc.Settings, filterottl.StandardSpanEventFuncs(), parserOptions) if errGlobalBoolExpr != nil { return nil, errGlobalBoolExpr } - seStatements := ottlspanevent.NewStatementSequence(parsedStatements, collection.Settings, ottlspanevent.WithStatementSequenceErrorMode(collection.ErrorMode)) + seStatements := ottlspanevent.NewStatementSequence(parsedStatements, pc.Settings, ottlspanevent.WithStatementSequenceErrorMode(errorMode)) return spanEventStatements{seStatements, globalExpr}, nil } -func (tpc *TraceParserCollection) ParseContextStatements(contextStatements ContextStatements) (consumer.Traces, error) { - pc := ottl.ParserCollection[consumer.Traces](*tpc) +func (tpc *TraceParserCollection) ParseContextStatements(contextStatements ContextStatements) (TracesConsumer, error) { + pc := ottl.ParserCollection[TracesConsumer](*tpc) if contextStatements.Context != "" { return pc.ParseStatementsWithContext(string(contextStatements.Context), contextStatements, true) } diff --git a/processor/transformprocessor/internal/logs/processor.go b/processor/transformprocessor/internal/logs/processor.go index 9a4cf86fa7a3..883b276505cb 100644 --- a/processor/transformprocessor/internal/logs/processor.go +++ b/processor/transformprocessor/internal/logs/processor.go @@ -5,10 +5,8 @@ package logs // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "context" - "reflect" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/multierr" @@ -19,8 +17,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) +type parsedContextStatements struct { + common.LogsConsumer + sharedCache bool +} + type Processor struct { - contexts []consumer.Logs + contexts []parsedContextStatements logger *zap.Logger flatMode bool } @@ -31,14 +34,14 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return nil, err } - contexts := make([]consumer.Logs, len(contextStatements)) + contexts := make([]parsedContextStatements, len(contextStatements)) var errors error for i, cs := range contextStatements { context, err := pc.ParseContextStatements(cs) if err != nil { errors = multierr.Append(errors, err) } - contexts[i] = context + contexts[i] = parsedContextStatements{context, cs.SharedCache} } if errors != nil { @@ -58,16 +61,10 @@ func (p *Processor) ProcessLogs(ctx context.Context, ld plog.Logs) (plog.Logs, e defer pdatautil.GroupByResourceLogs(ld.ResourceLogs()) } - contextCache := make(map[string]*pcommon.Map, len(p.contexts)) + sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts)) for _, c := range p.contexts { - cacheKey := reflect.TypeOf(c).String() - cache, ok := contextCache[cacheKey] - if !ok { - m := pcommon.NewMap() - cache = &m - contextCache[cacheKey] = cache - } - err := c.ConsumeLogs(common.WithCache(ctx, cache), ld) + cache := common.NewContextCache(sharedContextCache, c.Context(), c.sharedCache) + err := c.ConsumeLogs(ctx, ld, cache) if err != nil { p.logger.Error("failed processing logs", zap.Error(err)) return ld, err diff --git a/processor/transformprocessor/internal/logs/processor_test.go b/processor/transformprocessor/internal/logs/processor_test.go index ddda184abccb..a6861428fb34 100644 --- a/processor/transformprocessor/internal/logs/processor_test.go +++ b/processor/transformprocessor/internal/logs/processor_test.go @@ -5,10 +5,12 @@ package logs import ( "context" + "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -907,7 +909,7 @@ func Test_ProcessLogs_InferredMixContext(t *testing.T) { } } -func Test_ProcessLogs_Error(t *testing.T) { +func Test_ProcessLogs_ErrorMode(t *testing.T) { tests := []struct { statement string context common.ContextID @@ -935,6 +937,97 @@ func Test_ProcessLogs_Error(t *testing.T) { } } +func Test_ProcessLogs_StatementsErrorMode(t *testing.T) { + tests := []struct { + name string + errorMode ottl.ErrorMode + statements []common.ContextStatements + want func(td plog.Logs) + wantErrorWith string + }{ + { + name: "log: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(log.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(log.attributes["test"], "pass") where log.body == "operationA"`}}, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + name: "log: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(log.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(log.attributes["pass"], ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + { + name: "resource: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(resource.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(resource.attributes["test"], "pass")`}}, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + name: "resource: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(resource.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(resource.attributes["pass"], ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + { + name: "scope: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(scope.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(scope.attributes["test"], "pass")`}}, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "scope: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(scope.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(scope.attributes["pass"], ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructLogs() + processor, err := NewProcessor(tt.statements, tt.errorMode, false, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + _, err = processor.ProcessLogs(context.Background(), td) + if tt.wantErrorWith != "" { + if err == nil { + t.Errorf("expected error containing '%s', got: ", tt.wantErrorWith) + } + assert.Contains(t, err.Error(), tt.wantErrorWith) + return + } + assert.NoError(t, err) + exTd := constructLogs() + tt.want(exTd) + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessLogs_CacheAccess(t *testing.T) { tests := []struct { name string @@ -944,8 +1037,8 @@ func Test_ProcessLogs_CacheAccess(t *testing.T) { { name: "resource:resource.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(resource.cache["test"], "pass")`}}, - {Statements: []string{`set(resource.attributes["test"], resource.cache["test"])`}}, + {Statements: []string{`set(resource.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(resource.attributes["test"], resource.cache["test"])`}, SharedCache: true}, }, want: func(td plog.Logs) { td.ResourceLogs().At(0).Resource().Attributes().PutStr("test", "pass") @@ -969,8 +1062,8 @@ func Test_ProcessLogs_CacheAccess(t *testing.T) { { name: "scope:scope.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(scope.cache["test"], "pass")`}}, - {Statements: []string{`set(scope.attributes["test"], scope.cache["test"])`}}, + {Statements: []string{`set(scope.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(scope.attributes["test"], scope.cache["test"])`}, SharedCache: true}, }, want: func(td plog.Logs) { td.ResourceLogs().At(0).ScopeLogs().At(0).Scope().Attributes().PutStr("test", "pass") @@ -992,8 +1085,8 @@ func Test_ProcessLogs_CacheAccess(t *testing.T) { { name: "log:log.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(log.cache["test"], "pass")`}}, - {Statements: []string{`set(log.attributes["test"], log.cache["test"])`}}, + {Statements: []string{`set(log.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(log.attributes["test"], log.cache["test"])`}, SharedCache: true}, }, want: func(td plog.Logs) { td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") @@ -1014,6 +1107,39 @@ func Test_ProcessLogs_CacheAccess(t *testing.T) { td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") }, }, + { + name: "cache isolation", + statements: []common.ContextStatements{ + { + Statements: []string{`set(log.cache["shared"], "fail")`}, + SharedCache: true, + }, + { + Statements: []string{ + `set(log.cache["test"], "pass")`, + `set(log.attributes["test"], log.cache["test"])`, + `set(log.attributes["test"], log.cache["shared"])`, + }, + }, + { + Context: common.Log, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + `set(attributes["test"], cache["shared"])`, + `set(attributes["test"], log.cache["shared"])`, + }, + }, + { + Statements: []string{`set(log.attributes["test"], "pass") where log.cache["shared"] == "fail"`}, + SharedCache: true, + }, + }, + want: func(td plog.Logs) { + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Attributes().PutStr("test", "pass") + td.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(1).Attributes().PutStr("test", "pass") + }, + }, } for _, tt := range tests { @@ -1033,6 +1159,78 @@ func Test_ProcessLogs_CacheAccess(t *testing.T) { } } +func Test_NewProcessor_ConditionsParse(t *testing.T) { + type testCase struct { + name string + statements []common.ContextStatements + wantErrorWith string + } + + contextsTests := map[string][]testCase{"log": nil, "resource": nil, "scope": nil} + for ctx := range contextsTests { + contextsTests[ctx] = []testCase{ + { + name: "inferred: condition with context", + statements: []common.ContextStatements{ + { + Statements: []string{fmt.Sprintf(`set(%s.cache["test"], "pass")`, ctx)}, + Conditions: []string{fmt.Sprintf(`%s.cache["test"] == ""`, ctx)}, + }, + }, + }, + { + name: "inferred: condition without context", + statements: []common.ContextStatements{ + { + Statements: []string{fmt.Sprintf(`set(%s.cache["test"], "pass")`, ctx)}, + Conditions: []string{`cache["test"] == ""`}, + }, + }, + wantErrorWith: `missing context name for path "cache[test]"`, + }, + { + name: "context defined: condition without context", + statements: []common.ContextStatements{ + { + Context: common.ContextID(ctx), + Statements: []string{`set(cache["test"], "pass")`}, + Conditions: []string{`cache["test"] == ""`}, + }, + }, + }, + { + name: "context defined: condition with context", + statements: []common.ContextStatements{ + { + Context: common.ContextID(ctx), + Statements: []string{`set(attributes["test"], "pass")`}, + Conditions: []string{fmt.Sprintf(`%s.cache["test"] == ""`, ctx)}, + }, + }, + wantErrorWith: fmt.Sprintf(`segment "%s" from path "%[1]s.cache[test]" is not a valid path`, ctx), + }, + } + } + + for ctx, tests := range contextsTests { + t.Run(ctx, func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewProcessor(tt.statements, ottl.PropagateError, false, componenttest.NewNopTelemetrySettings()) + if tt.wantErrorWith != "" { + if err == nil { + t.Errorf("expected error containing '%s', got: ", tt.wantErrorWith) + } + assert.Contains(t, err.Error(), tt.wantErrorWith) + return + } + require.NoError(t, err) + }) + } + }) + } +} + func constructLogs() plog.Logs { td := plog.NewLogs() rs0 := td.ResourceLogs().AppendEmpty() diff --git a/processor/transformprocessor/internal/metrics/processor.go b/processor/transformprocessor/internal/metrics/processor.go index e41682bca3bd..baa26c08a44b 100644 --- a/processor/transformprocessor/internal/metrics/processor.go +++ b/processor/transformprocessor/internal/metrics/processor.go @@ -5,10 +5,8 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con import ( "context" - "reflect" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" "go.uber.org/multierr" @@ -18,8 +16,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) +type parsedContextStatements struct { + common.MetricsConsumer + sharedCache bool +} + type Processor struct { - contexts []consumer.Metrics + contexts []parsedContextStatements logger *zap.Logger } @@ -29,14 +32,14 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return nil, err } - contexts := make([]consumer.Metrics, len(contextStatements)) + contexts := make([]parsedContextStatements, len(contextStatements)) var errors error for i, cs := range contextStatements { context, err := pc.ParseContextStatements(cs) if err != nil { errors = multierr.Append(errors, err) } - contexts[i] = context + contexts[i] = parsedContextStatements{context, cs.SharedCache} } if errors != nil { @@ -50,16 +53,10 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E } func (p *Processor) ProcessMetrics(ctx context.Context, md pmetric.Metrics) (pmetric.Metrics, error) { - contextCache := make(map[string]*pcommon.Map, len(p.contexts)) + sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts)) for _, c := range p.contexts { - cacheKey := reflect.TypeOf(c).String() - cache, ok := contextCache[cacheKey] - if !ok { - m := pcommon.NewMap() - cache = &m - contextCache[cacheKey] = cache - } - err := c.ConsumeMetrics(common.WithCache(ctx, cache), md) + cache := common.NewContextCache(sharedContextCache, c.Context(), c.sharedCache) + err := c.ConsumeMetrics(ctx, md, cache) if err != nil { p.logger.Error("failed processing metrics", zap.Error(err)) return md, err diff --git a/processor/transformprocessor/internal/metrics/processor_test.go b/processor/transformprocessor/internal/metrics/processor_test.go index 9c7223efeca6..50ab65a10ba7 100644 --- a/processor/transformprocessor/internal/metrics/processor_test.go +++ b/processor/transformprocessor/internal/metrics/processor_test.go @@ -5,10 +5,12 @@ package metrics import ( "context" + "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -1572,7 +1574,7 @@ func Test_ProcessMetrics_MixContext(t *testing.T) { } } -func Test_ProcessMetrics_Error(t *testing.T) { +func Test_ProcessMetrics_ErrorMode(t *testing.T) { tests := []struct { statement string context common.ContextID @@ -1607,6 +1609,118 @@ func Test_ProcessMetrics_Error(t *testing.T) { } } +func Test_ProcessMetrics_StatementsErrorMode(t *testing.T) { + tests := []struct { + name string + errorMode ottl.ErrorMode + statements []common.ContextStatements + want func(td pmetric.Metrics) + wantErrorWith string + }{ + { + name: "metric: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(metric.name, ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(metric.name, "pass") where metric.name == "operationA" `}}, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).SetName("pass") + }, + }, + { + name: "metric: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(metric.name, ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(metric.name, ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + { + name: "datapoint: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(datapoint.attributes["test"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(datapoint.attributes["test"], "pass") where metric.name == "operationA" `}}, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + }, + }, + { + name: "datapoint: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(datapoint.attributes["test"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(datapoint.attributes["test"], ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + { + name: "resource: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(resource.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(resource.attributes["test"], "pass")`}}, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + name: "resource: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(resource.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(resource.attributes["pass"], ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + { + name: "scope: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(scope.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(scope.attributes["test"], "pass")`}}, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "scope: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(scope.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(scope.attributes["pass"], ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructMetrics() + processor, err := NewProcessor(tt.statements, tt.errorMode, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + _, err = processor.ProcessMetrics(context.Background(), td) + if tt.wantErrorWith != "" { + if err == nil { + t.Errorf("expected error containing '%s', got: ", tt.wantErrorWith) + } + assert.Contains(t, err.Error(), tt.wantErrorWith) + return + } + assert.NoError(t, err) + exTd := constructMetrics() + tt.want(exTd) + assert.Equal(t, exTd, td) + }) + } +} + func Test_ProcessMetrics_CacheAccess(t *testing.T) { tests := []struct { name string @@ -1616,8 +1730,8 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { { name: "resource:resource.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(resource.cache["test"], "pass")`}}, - {Statements: []string{`set(resource.attributes["test"], resource.cache["test"])`}}, + {Statements: []string{`set(resource.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(resource.attributes["test"], resource.cache["test"])`}, SharedCache: true}, }, want: func(td pmetric.Metrics) { td.ResourceMetrics().At(0).Resource().Attributes().PutStr("test", "pass") @@ -1641,8 +1755,8 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { { name: "scope:scope.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(scope.cache["test"], "pass")`}}, - {Statements: []string{`set(scope.attributes["test"], scope.cache["test"])`}}, + {Statements: []string{`set(scope.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(scope.attributes["test"], scope.cache["test"])`}, SharedCache: true}, }, want: func(td pmetric.Metrics) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Scope().Attributes().PutStr("test", "pass") @@ -1664,8 +1778,8 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { { name: "metric:metric.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(metric.cache["test"], "pass")`}}, - {Statements: []string{`set(metric.name, metric.cache["test"]) where metric.name == "operationB"`}}, + {Statements: []string{`set(metric.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(metric.name, metric.cache["test"]) where metric.name == "operationB"`}, SharedCache: true}, }, want: func(td pmetric.Metrics) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(1).SetName("pass") @@ -1687,8 +1801,8 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { { name: "datapoint:datapoint.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(datapoint.cache["test"], "pass")`}}, - {Statements: []string{`set(datapoint.attributes["test"], datapoint.cache["test"]) where metric.name == "operationA"`}}, + {Statements: []string{`set(datapoint.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(datapoint.attributes["test"], datapoint.cache["test"]) where metric.name == "operationA"`}, SharedCache: true}, }, want: func(td pmetric.Metrics) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") @@ -1709,6 +1823,48 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") }, }, + { + name: "cache isolation", + statements: []common.ContextStatements{ + { + Statements: []string{`set(datapoint.cache["shared"], "fail")`}, + SharedCache: true, + }, + { + Statements: []string{ + `set(datapoint.cache["test"], "pass")`, + `set(datapoint.attributes["test"], datapoint.cache["test"])`, + `set(datapoint.attributes["test"], datapoint.cache["shared"])`, + }, + Conditions: []string{ + `metric.name == "operationA"`, + }, + }, + { + Context: common.DataPoint, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + `set(attributes["test"], cache["shared"])`, + `set(attributes["test"], datapoint.cache["shared"])`, + }, + Conditions: []string{ + `metric.name == "operationA"`, + }, + }, + { + Statements: []string{`set(datapoint.attributes["test"], "pass") where datapoint.cache["shared"] == "fail"`}, + SharedCache: true, + Conditions: []string{ + `metric.name == "operationA"`, + }, + }, + }, + want: func(td pmetric.Metrics) { + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Attributes().PutStr("test", "pass") + td.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(1).Attributes().PutStr("test", "pass") + }, + }, } for _, tt := range tests { @@ -1728,6 +1884,78 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { } } +func Test_NewProcessor_ConditionsParse(t *testing.T) { + type testCase struct { + name string + statements []common.ContextStatements + wantErrorWith string + } + + contextsTests := map[string][]testCase{"metric": nil, "datapoint": nil, "resource": nil, "scope": nil} + for ctx := range contextsTests { + contextsTests[ctx] = []testCase{ + { + name: "inferred: condition with context", + statements: []common.ContextStatements{ + { + Statements: []string{fmt.Sprintf(`set(%s.cache["test"], "pass")`, ctx)}, + Conditions: []string{fmt.Sprintf(`%s.cache["test"] == ""`, ctx)}, + }, + }, + }, + { + name: "inferred: condition without context", + statements: []common.ContextStatements{ + { + Statements: []string{fmt.Sprintf(`set(%s.cache["test"], "pass")`, ctx)}, + Conditions: []string{`cache["test"] == ""`}, + }, + }, + wantErrorWith: `missing context name for path "cache[test]"`, + }, + { + name: "context defined: condition without context", + statements: []common.ContextStatements{ + { + Context: common.ContextID(ctx), + Statements: []string{`set(cache["test"], "pass")`}, + Conditions: []string{`cache["test"] == ""`}, + }, + }, + }, + { + name: "context defined: condition with context", + statements: []common.ContextStatements{ + { + Context: common.ContextID(ctx), + Statements: []string{`set(cache["test"], "pass")`}, + Conditions: []string{fmt.Sprintf(`%s.cache["test"] == ""`, ctx)}, + }, + }, + wantErrorWith: fmt.Sprintf(`segment "%s" from path "%[1]s.cache[test]" is not a valid path`, ctx), + }, + } + } + + for ctx, tests := range contextsTests { + t.Run(ctx, func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewProcessor(tt.statements, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + if tt.wantErrorWith != "" { + if err == nil { + t.Errorf("expected error containing '%s', got: ", tt.wantErrorWith) + } + assert.Contains(t, err.Error(), tt.wantErrorWith) + return + } + require.NoError(t, err) + }) + } + }) + } +} + func constructMetrics() pmetric.Metrics { td := pmetric.NewMetrics() rm0 := td.ResourceMetrics().AppendEmpty() diff --git a/processor/transformprocessor/internal/traces/processor.go b/processor/transformprocessor/internal/traces/processor.go index 43b6375f1b1e..2f4d0055d513 100644 --- a/processor/transformprocessor/internal/traces/processor.go +++ b/processor/transformprocessor/internal/traces/processor.go @@ -5,10 +5,8 @@ package traces // import "github.com/open-telemetry/opentelemetry-collector-cont import ( "context" - "reflect" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/multierr" @@ -18,8 +16,13 @@ import ( "github.com/open-telemetry/opentelemetry-collector-contrib/processor/transformprocessor/internal/common" ) +type parsedContextStatements struct { + common.TracesConsumer + sharedCache bool +} + type Processor struct { - contexts []consumer.Traces + contexts []parsedContextStatements logger *zap.Logger } @@ -29,14 +32,14 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E return nil, err } - contexts := make([]consumer.Traces, len(contextStatements)) + contexts := make([]parsedContextStatements, len(contextStatements)) var errors error for i, cs := range contextStatements { context, err := pc.ParseContextStatements(cs) if err != nil { errors = multierr.Append(errors, err) } - contexts[i] = context + contexts[i] = parsedContextStatements{context, cs.SharedCache} } if errors != nil { @@ -50,16 +53,10 @@ func NewProcessor(contextStatements []common.ContextStatements, errorMode ottl.E } func (p *Processor) ProcessTraces(ctx context.Context, td ptrace.Traces) (ptrace.Traces, error) { - contextCache := make(map[string]*pcommon.Map, len(p.contexts)) + sharedContextCache := make(map[common.ContextID]*pcommon.Map, len(p.contexts)) for _, c := range p.contexts { - cacheKey := reflect.TypeOf(c).String() - cache, ok := contextCache[cacheKey] - if !ok { - m := pcommon.NewMap() - cache = &m - contextCache[cacheKey] = cache - } - err := c.ConsumeTraces(common.WithCache(ctx, cache), td) + cache := common.NewContextCache(sharedContextCache, c.Context(), c.sharedCache) + err := c.ConsumeTraces(ctx, td, cache) if err != nil { p.logger.Error("failed processing traces", zap.Error(err)) return td, err diff --git a/processor/transformprocessor/internal/traces/processor_test.go b/processor/transformprocessor/internal/traces/processor_test.go index 92e662962214..75d7b731fec5 100644 --- a/processor/transformprocessor/internal/traces/processor_test.go +++ b/processor/transformprocessor/internal/traces/processor_test.go @@ -5,10 +5,12 @@ package traces import ( "context" + "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/ptrace" @@ -970,7 +972,7 @@ func Test_ProcessTraces_MixContext(t *testing.T) { } } -func Test_ProcessTraces_Error(t *testing.T) { +func Test_ProcessTraces_ErrorMode(t *testing.T) { tests := []struct { statement string context common.ContextID @@ -1001,7 +1003,118 @@ func Test_ProcessTraces_Error(t *testing.T) { } } -func Test_ProcessMetrics_CacheAccess(t *testing.T) { +func Test_ProcessTraces_StatementsErrorMode(t *testing.T) { + tests := []struct { + name string + errorMode ottl.ErrorMode + statements []common.ContextStatements + want func(td ptrace.Traces) + wantErrorWith string + }{ + { + name: "span: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(span.attributes["test"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(span.attributes["test"], "pass") where span.name == "operationA" `}}, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + name: "span: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(span.attributes["test"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(span.attributes["test"], ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + { + name: "spanevent: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(spanevent.attributes["test"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(spanevent.attributes["test"], "pass") where spanevent.name == "eventA" `}}, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().At(0).Attributes().PutStr("test", "pass") + }, + }, + { + name: "spanevent: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(spanevent.attributes["test"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(spanevent.attributes["test"], ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + { + name: "resource: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(resource.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(resource.attributes["test"], "pass")`}}, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass") + }, + }, + { + name: "resource: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(resource.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(resource.attributes["pass"], ParseJSON(true))`}}, + }, + wantErrorWith: "expected string but got bool", + }, + { + name: "scope: statements group with error mode", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(scope.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(scope.attributes["test"], "pass")`}}, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass") + }, + }, + { + name: "scope: statements group error mode does not affect default", + errorMode: ottl.PropagateError, + statements: []common.ContextStatements{ + {Statements: []string{`set(scope.attributes["pass"], ParseJSON(1))`}, ErrorMode: ottl.IgnoreError}, + {Statements: []string{`set(scope.attributes["pass"], ParseJSON(true))`}}, + }, + wantErrorWith: `expected string but got bool`, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + td := constructTraces() + processor, err := NewProcessor(tt.statements, tt.errorMode, componenttest.NewNopTelemetrySettings()) + assert.NoError(t, err) + _, err = processor.ProcessTraces(context.Background(), td) + if tt.wantErrorWith != "" { + if err == nil { + t.Errorf("expected error containing '%s', got: ", tt.wantErrorWith) + } + assert.Contains(t, err.Error(), tt.wantErrorWith) + return + } + assert.NoError(t, err) + exTd := constructTraces() + tt.want(exTd) + assert.Equal(t, exTd, td) + }) + } +} + +func Test_ProcessTraces_CacheAccess(t *testing.T) { tests := []struct { name string statements []common.ContextStatements @@ -1010,8 +1123,8 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { { name: "resource:resource.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(resource.cache["test"], "pass")`}}, - {Statements: []string{`set(resource.attributes["test"], resource.cache["test"])`}}, + {Statements: []string{`set(resource.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(resource.attributes["test"], resource.cache["test"])`}, SharedCache: true}, }, want: func(td ptrace.Traces) { td.ResourceSpans().At(0).Resource().Attributes().PutStr("test", "pass") @@ -1035,8 +1148,8 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { { name: "scope:scope.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(scope.cache["test"], "pass")`}}, - {Statements: []string{`set(scope.attributes["test"], scope.cache["test"])`}}, + {Statements: []string{`set(scope.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(scope.attributes["test"], scope.cache["test"])`}, SharedCache: true}, }, want: func(td ptrace.Traces) { td.ResourceSpans().At(0).ScopeSpans().At(0).Scope().Attributes().PutStr("test", "pass") @@ -1058,8 +1171,8 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { { name: "span:span.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(span.cache["test"], "pass")`}}, - {Statements: []string{`set(span.attributes["test"], span.cache["test"]) where span.name == "operationA"`}}, + {Statements: []string{`set(span.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(span.attributes["test"], span.cache["test"]) where span.name == "operationA"`}, SharedCache: true}, }, want: func(td ptrace.Traces) { td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") @@ -1081,8 +1194,8 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { { name: "spanevent:spanevent.cache", statements: []common.ContextStatements{ - {Statements: []string{`set(spanevent.cache["test"], "pass")`}}, - {Statements: []string{`set(spanevent.attributes["test"], spanevent.cache["test"]) where spanevent.name == "eventA"`}}, + {Statements: []string{`set(spanevent.cache["test"], "pass")`}, SharedCache: true}, + {Statements: []string{`set(spanevent.attributes["test"], spanevent.cache["test"]) where spanevent.name == "eventA"`}, SharedCache: true}, }, want: func(td ptrace.Traces) { td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().At(0).Attributes().PutStr("test", "pass") @@ -1101,6 +1214,47 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Events().At(0).Attributes().PutStr("test", "pass") }, }, + { + name: "cache isolation", + statements: []common.ContextStatements{ + { + Statements: []string{`set(span.cache["shared"], "fail")`}, + SharedCache: true, + }, + { + Statements: []string{ + `set(span.cache["test"], "pass")`, + `set(span.attributes["test"], span.cache["test"])`, + `set(span.attributes["test"], span.cache["shared"])`, + }, + Conditions: []string{ + `span.name == "operationA"`, + }, + }, + { + Context: common.Span, + Statements: []string{ + `set(cache["test"], "pass")`, + `set(attributes["test"], cache["test"])`, + `set(attributes["test"], cache["shared"])`, + `set(attributes["test"], span.cache["shared"])`, + }, + Conditions: []string{ + `name == "operationA"`, + }, + }, + { + Statements: []string{`set(span.attributes["test"], "pass") where span.cache["shared"] == "fail"`}, + SharedCache: true, + Conditions: []string{ + `span.name == "operationA"`, + }, + }, + }, + want: func(td ptrace.Traces) { + td.ResourceSpans().At(0).ScopeSpans().At(0).Spans().At(0).Attributes().PutStr("test", "pass") + }, + }, } for _, tt := range tests { @@ -1120,6 +1274,78 @@ func Test_ProcessMetrics_CacheAccess(t *testing.T) { } } +func Test_NewProcessor_ConditionsParse(t *testing.T) { + type testCase struct { + name string + statements []common.ContextStatements + wantErrorWith string + } + + contextsTests := map[string][]testCase{"span": nil, "spanevent": nil, "resource": nil, "scope": nil} + for ctx := range contextsTests { + contextsTests[ctx] = []testCase{ + { + name: "inferred: condition with context", + statements: []common.ContextStatements{ + { + Statements: []string{fmt.Sprintf(`set(%s.cache["test"], "pass")`, ctx)}, + Conditions: []string{fmt.Sprintf(`%s.cache["test"] == ""`, ctx)}, + }, + }, + }, + { + name: "inferred: condition without context", + statements: []common.ContextStatements{ + { + Statements: []string{fmt.Sprintf(`set(%s.cache["test"], "pass")`, ctx)}, + Conditions: []string{`cache["test"] == ""`}, + }, + }, + wantErrorWith: `missing context name for path "cache[test]"`, + }, + { + name: "context defined: condition without context", + statements: []common.ContextStatements{ + { + Context: common.ContextID(ctx), + Statements: []string{`set(cache["test"], "pass")`}, + Conditions: []string{`cache["test"] == ""`}, + }, + }, + }, + { + name: "context defined: condition with context", + statements: []common.ContextStatements{ + { + Context: common.ContextID(ctx), + Statements: []string{`set(cache["test"], "pass")`}, + Conditions: []string{fmt.Sprintf(`%s.cache["test"] == ""`, ctx)}, + }, + }, + wantErrorWith: fmt.Sprintf(`segment "%s" from path "%[1]s.cache[test]" is not a valid path`, ctx), + }, + } + } + + for ctx, tests := range contextsTests { + t.Run(ctx, func(t *testing.T) { + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, err := NewProcessor(tt.statements, ottl.PropagateError, componenttest.NewNopTelemetrySettings()) + if tt.wantErrorWith != "" { + if err == nil { + t.Errorf("expected error containing '%s', got: ", tt.wantErrorWith) + } + assert.Contains(t, err.Error(), tt.wantErrorWith) + return + } + require.NoError(t, err) + }) + } + }) + } +} + func BenchmarkTwoSpans(b *testing.B) { tests := []struct { name string diff --git a/processor/transformprocessor/testdata/config.yaml b/processor/transformprocessor/testdata/config.yaml index cb6fd3e708bc..a0656f0ea4d6 100644 --- a/processor/transformprocessor/testdata/config.yaml +++ b/processor/transformprocessor/testdata/config.yaml @@ -130,6 +130,34 @@ transform/flat_configuration: - set(log.body, "bear") where log.attributes["http.path"] == "/animal" - set(resource.attributes["name"], "bear") +transform/structured_configuration_with_path_context: + trace_statements: + - context: span + statements: + - set(span.name, "bear") where span.attributes["http.path"] == "/animal" + metric_statements: + - context: metric + statements: + - set(metric.name, "bear") where resource.attributes["http.path"] == "/animal" + log_statements: + - context: log + statements: + - set(log.body, "bear") where log.attributes["http.path"] == "/animal" + +transform/structured_configuration_with_inferred_context: + trace_statements: + - statements: + - set(span.name, "bear") where span.attributes["http.path"] == "/animal" + - set(resource.attributes["name"], "bear") + metric_statements: + - statements: + - set(metric.name, "bear") where resource.attributes["http.path"] == "/animal" + - set(resource.attributes["name"], "bear") + log_statements: + - statements: + - set(log.body, "bear") where log.attributes["http.path"] == "/animal" + - set(resource.attributes["name"], "bear") + transform/mixed_configuration_styles: trace_statements: - set(span.name, "bear") where span.attributes["http.path"] == "/animal" @@ -137,15 +165,45 @@ transform/mixed_configuration_styles: statements: - set(attributes["name"], "bear") - keep_keys(attributes, ["http.method", "http.path"]) + - statements: + - set(span.attributes["name"], "lion") + - set(span.name, "lion") where span.attributes["http.path"] == "/animal" metric_statements: - set(metric.name, "bear") where resource.attributes["http.path"] == "/animal" - context: resource statements: - set(attributes["name"], "bear") - keep_keys(attributes, ["http.method", "http.path"]) + - statements: + - set(metric.name, "lion") + - set(metric.name, "lion") where resource.attributes["http.path"] == "/animal" log_statements: - set(log.body, "bear") where log.attributes["http.path"] == "/animal" - context: resource statements: - set(attributes["name"], "bear") - keep_keys(attributes, ["http.method", "http.path"]) + - statements: + - set(log.attributes["name"], "lion") + - set(log.body, "lion") where log.attributes["http.path"] == "/animal" + +transform/context_statements_error_mode: + error_mode: ignore + trace_statements: + - error_mode: propagate + statements: + - set(resource.attributes["name"], "propagate") + - statements: + - set(resource.attributes["name"], "ignore") + metric_statements: + - error_mode: silent + statements: + - set(resource.attributes["name"], "silent") + - statements: + - set(resource.attributes["name"], "ignore") + log_statements: + - error_mode: propagate + statements: + - set(resource.attributes["name"], "propagate") + - statements: + - set(resource.attributes["name"], "ignore")