From 9a5585eb2c2ed32a46449b2651110755dad22c90 Mon Sep 17 00:00:00 2001 From: Ankit Patel <8731662+ankitpatel96@users.noreply.github.com> Date: Tue, 3 Dec 2024 05:39:36 -0500 Subject: [PATCH] [chore] Schema Processor Revamp [Part 2] - ChangeList and Revision (#35267) **Description:** This is a slice of changes from https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35248 This PR details how operators are used to build the execution pipeline for a given schemafile. Changed files from the [previous PR](https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35214) are: processor/schemaprocessor/internal/changelist/changelist.go processor/schemaprocessor/internal/translation/revision_v1.go processor/schemaprocessor/internal/translation/revision_v1_test.go processor/schemaprocessor/go.mod I'm asking a maintainer if they would be willing to push a copy of the previous PR's branch to the core repo so I can switch the base of this PR to the previous PR - thus only the stacked changes would be shown. Edit: this is apparently not easily supported - so asking reviewers to just focus on the changed files listed above. Sorry about that! **Testing:** Unit tests --------- Co-authored-by: Pablo Baeyens --- processor/schemaprocessor/DESIGN.md | 28 +++ processor/schemaprocessor/README.md | 4 +- processor/schemaprocessor/go.mod | 1 + .../schemaprocessor/internal/alias/alias.go | 6 +- .../internal/changelist/changelist.go | 84 ++++++++ .../internal/migrate/attributes.go | 58 +----- .../internal/migrate/attributes_test.go | 104 +--------- .../internal/migrate/conditional.go | 56 ++---- .../internal/migrate/conditional_test.go | 122 +----------- .../internal/migrate/migrator.go | 17 ++ .../internal/migrate/multi_conditional.go | 77 ++++++++ .../migrate/multi_conditional_test.go | 149 ++++++++++++++ .../internal/migrate/signal.go | 47 +---- .../internal/migrate/signal_test.go | 8 +- .../transformer/attributes_operators.go | 157 +++++++++++++++ .../internal/transformer/attributes_test.go | 121 ++++++++++++ .../transformer/conditional_attributes.go | 80 ++++++++ .../conditional_attributes_test.go | 110 +++++++++++ .../internal/transformer/interface.go | 16 ++ .../multi_conditional_attributes.go | 32 +++ .../multi_conditional_attributes_test.go | 83 ++++++++ .../internal/transformer/signal_name.go | 40 ++++ .../internal/transformer/signal_name_test.go | 34 ++++ .../internal/translation/revision_v1.go | 164 ++++++++------- .../internal/translation/revision_v1_test.go | 186 ++++++++++++------ processor/schemaprocessor/metadata.yaml | 2 +- 26 files changed, 1299 insertions(+), 487 deletions(-) create mode 100644 processor/schemaprocessor/DESIGN.md create mode 100644 processor/schemaprocessor/internal/changelist/changelist.go create mode 100644 processor/schemaprocessor/internal/migrate/migrator.go create mode 100644 processor/schemaprocessor/internal/migrate/multi_conditional.go create mode 100644 processor/schemaprocessor/internal/migrate/multi_conditional_test.go create mode 100644 processor/schemaprocessor/internal/transformer/attributes_operators.go create mode 100644 processor/schemaprocessor/internal/transformer/attributes_test.go create mode 100644 processor/schemaprocessor/internal/transformer/conditional_attributes.go create mode 100644 processor/schemaprocessor/internal/transformer/conditional_attributes_test.go create mode 100644 processor/schemaprocessor/internal/transformer/interface.go create mode 100644 processor/schemaprocessor/internal/transformer/multi_conditional_attributes.go create mode 100644 processor/schemaprocessor/internal/transformer/multi_conditional_attributes_test.go create mode 100644 processor/schemaprocessor/internal/transformer/signal_name.go create mode 100644 processor/schemaprocessor/internal/transformer/signal_name_test.go diff --git a/processor/schemaprocessor/DESIGN.md b/processor/schemaprocessor/DESIGN.md new file mode 100644 index 000000000000..ae70dce24c60 --- /dev/null +++ b/processor/schemaprocessor/DESIGN.md @@ -0,0 +1,28 @@ +# Design + +The Schema Processor is split into several different components. + +Here's a general structure diagram: + +```mermaid +graph LR; + A[Previous Collector Component] --> B[Transformer] + B -- Schema URL --> C[Translation Manager] + C -- Translation --> B + B --> H[Translator] + H --> E[Revision] + E --> I[ChangeList] + subgraph Interpreter + direction RL + I --> F[Transformer] + F --> G[Migrator] + end + +``` +The [Transformer](transformer.go) is registered as a Processor in the Collector by the factory. +Data flows into the Transformer, which uses the Schema URL to fetch the translation from the Translation Manager. +The Translation Manager (at internal/translation/manager.go in a future PR) is responsible for fetching and caching the translations. It takes in a schema URL and returns a Translator struct. + +The Translator struct contains the target schema URL, the target schema version, and a list of Revisions. The Translator figures out what the version of the incoming data is and what Revisions to apply to the incoming data to get it to the target schema version. The Translator is also responsible for applying the Revisions to the incoming data - it iterates through these Revisions and applies them to the incoming data. + +Each Revision represents all the changes within a specific version. It consists of several ChangeLists (at internal/changelist/changelist.go in a future PR) - one for each type of change block (at the time of writing - `all`, `resources`, `spans`, `spanEvents`, `metrics`, `logs`). Each ChangeList is similar to a program in an interpreter - in this case the programming language is the schema file! They iterate through whatever changes they are constructed with, and call a [Transformer](internal/transformer) for each type of change. The Transformer accepts a typed value - a log, a metric, etc. It then, under the hood, calls one of a few Migrators. The Migrators do the fundamental work of changing attributes, changing names, etc. The Migrators generally operate on lower levels than the Transformers - they operate on `Attributes`, or an `alias.NamedSignal` (a signal that implements `Name()` and `SetName()`). diff --git a/processor/schemaprocessor/README.md b/processor/schemaprocessor/README.md index 7d9eea8ab42c..93af6889ffed 100644 --- a/processor/schemaprocessor/README.md +++ b/processor/schemaprocessor/README.md @@ -6,7 +6,7 @@ | Stability | [development]: traces, metrics, logs | | Distributions | [] | | Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Fschema%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Fschema) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Fschema%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Fschema) | -| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy) | +| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MovieStoreGuy](https://www.github.com/MovieStoreGuy), [@ankitpatel96](https://www.github.com/ankitpatel96) | [development]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#development @@ -59,3 +59,5 @@ processors: ``` For more complete examples, please refer to [config.yml](./testdata/config.yml). + +There's a rough design/overview of the processor in the [DESIGN.md](./DESIGN.md) file. \ No newline at end of file diff --git a/processor/schemaprocessor/go.mod b/processor/schemaprocessor/go.mod index 3f33d9e9c5d2..8cf8659d8280 100644 --- a/processor/schemaprocessor/go.mod +++ b/processor/schemaprocessor/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/processor/schem go 1.22.0 require ( + github.com/google/go-cmp v0.6.0 github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v0.114.1-0.20241202231142-b9ff1bc54c99 go.opentelemetry.io/collector/component/componenttest v0.114.1-0.20241202231142-b9ff1bc54c99 diff --git a/processor/schemaprocessor/internal/alias/alias.go b/processor/schemaprocessor/internal/alias/alias.go index 918a08d44d78..7e5e01436350 100644 --- a/processor/schemaprocessor/internal/alias/alias.go +++ b/processor/schemaprocessor/internal/alias/alias.go @@ -1,7 +1,7 @@ // Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 -// Package Alias is a subset of the interfaces defined by pdata and family +// Package alias is a subset of the interfaces defined by pdata and family // package to allow for higher code reuse without using generics. package alias // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" @@ -30,6 +30,10 @@ type NamedSignal interface { SetName(name string) } +type Attributed interface { + Attributes() pcommon.Map +} + var ( _ Resource = (*plog.ResourceLogs)(nil) _ Resource = (*pmetric.ResourceMetrics)(nil) diff --git a/processor/schemaprocessor/internal/changelist/changelist.go b/processor/schemaprocessor/internal/changelist/changelist.go new file mode 100644 index 000000000000..2ed253989340 --- /dev/null +++ b/processor/schemaprocessor/internal/changelist/changelist.go @@ -0,0 +1,84 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package changelist // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/changelist" + +import ( + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer" +) + +// ChangeList represents a list of changes within a section of the schema processor. It can take in a list of different migrators for a specific section and will apply them in order, based on whether Apply or Rollback is called +type ChangeList struct { + Migrators []migrate.Migrator +} + +func (c ChangeList) Do(ss migrate.StateSelector, signal any) error { + for i := 0; i < len(c.Migrators); i++ { + var migrator migrate.Migrator + // todo(ankit) in go1.23 switch to reversed iterators for this + if ss == migrate.StateSelectorApply { + migrator = c.Migrators[i] + } else { + migrator = c.Migrators[len(c.Migrators)-1-i] + } + // switch between transformer types - what do the transformers act on? + switch thisMigrator := migrator.(type) { + // this one acts on both spans and span events! + case transformer.Transformer[ptrace.Span]: + if span, ok := signal.(ptrace.Span); ok { + if err := thisMigrator.Do(ss, span); err != nil { + return err + } + } else { + return fmt.Errorf("span Transformer %T can't act on %T", thisMigrator, signal) + } + case transformer.Transformer[pmetric.Metric]: + if metric, ok := signal.(pmetric.Metric); ok { + if err := thisMigrator.Do(ss, metric); err != nil { + return err + } + } else { + return fmt.Errorf("metric Transformer %T can't act on %T", thisMigrator, signal) + } + case transformer.Transformer[plog.LogRecord]: + if log, ok := signal.(plog.LogRecord); ok { + if err := thisMigrator.Do(ss, log); err != nil { + return err + } + } else { + return fmt.Errorf("log Transformer %T can't act on %T", thisMigrator, signal) + } + case transformer.Transformer[pcommon.Resource]: + if resource, ok := signal.(pcommon.Resource); ok { + if err := thisMigrator.Do(ss, resource); err != nil { + return err + } + } else { + return fmt.Errorf("resource Transformer %T can't act on %T", thisMigrator, signal) + } + case transformer.AllAttributes: + if err := thisMigrator.Do(ss, signal); err != nil { + return err + } + default: + return fmt.Errorf("unsupported migrator type %T", thisMigrator) + } + } + return nil +} + +func (c ChangeList) Apply(signal any) error { + return c.Do(migrate.StateSelectorApply, signal) +} + +func (c ChangeList) Rollback(signal any) error { + return c.Do(migrate.StateSelectorRollback, signal) +} diff --git a/processor/schemaprocessor/internal/migrate/attributes.go b/processor/schemaprocessor/internal/migrate/attributes.go index 37a020cf6123..ced10436dda8 100644 --- a/processor/schemaprocessor/internal/migrate/attributes.go +++ b/processor/schemaprocessor/internal/migrate/attributes.go @@ -11,24 +11,22 @@ import ( "go.uber.org/multierr" ) -// AttributeChangeSet represents an unscoped entry that can be applied. -// +// AttributeChangeSet represents a rename_attributes type operation. // The listed changes are duplicated twice // to allow for simplified means of transition to or from a revision. type AttributeChangeSet struct { - updates ast.AttributeMap + // The keys are the old attribute name used in the previous version, the values are the + // new attribute name starting from this version (comment from ast.AttributeMap) + updates ast.AttributeMap + // the inverse of the updates map rollback ast.AttributeMap } -// AttributeChangeSetSlice allows for `AttributeChangeSet` -// to be chained together as they are defined within the schema -// and be applied sequentially to ensure deterministic behavior. -type AttributeChangeSetSlice []*AttributeChangeSet - // NewAttributeChangeSet allows for typed strings to be used as part // of the invocation that will be converted into the default string type. -func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet { - attr := &AttributeChangeSet{ +func NewAttributeChangeSet(mappings ast.AttributeMap) AttributeChangeSet { + // for ambiguous rollbacks (if updates contains entries with multiple keys that have the same value), rollback contains the last key iterated over in mappings + attr := AttributeChangeSet{ updates: make(map[string]string, len(mappings)), rollback: make(map[string]string, len(mappings)), } @@ -39,15 +37,9 @@ func NewAttributeChangeSet(mappings ast.AttributeMap) *AttributeChangeSet { return attr } -func (a *AttributeChangeSet) Apply(attrs pcommon.Map) error { - return a.do(StateSelectorApply, attrs) -} +func (a AttributeChangeSet) IsMigrator() {} -func (a *AttributeChangeSet) Rollback(attrs pcommon.Map) error { - return a.do(StateSelectorRollback, attrs) -} - -func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error) { +func (a *AttributeChangeSet) Do(ss StateSelector, attrs pcommon.Map) (errs error) { var ( updated = make(map[string]struct{}) results = pcommon.NewMap() @@ -81,33 +73,3 @@ func (a *AttributeChangeSet) do(ss StateSelector, attrs pcommon.Map) (errs error results.CopyTo(attrs) return errs } - -// NewAttributeChangeSetSlice combines all the provided `AttributeChangeSets` -// and allows them to be executed in the provided order. -func NewAttributeChangeSetSlice(changes ...*AttributeChangeSet) *AttributeChangeSetSlice { - values := new(AttributeChangeSetSlice) - for _, c := range changes { - (*values) = append((*values), c) - } - return values -} - -func (slice *AttributeChangeSetSlice) Apply(attrs pcommon.Map) error { - return slice.do(StateSelectorApply, attrs) -} - -func (slice *AttributeChangeSetSlice) Rollback(attrs pcommon.Map) error { - return slice.do(StateSelectorRollback, attrs) -} - -func (slice *AttributeChangeSetSlice) do(ss StateSelector, attrs pcommon.Map) (errs error) { - for i := 0; i < len(*slice); i++ { - switch ss { - case StateSelectorApply: - errs = multierr.Append(errs, (*slice)[i].Apply(attrs)) - case StateSelectorRollback: - errs = multierr.Append(errs, (*slice)[len(*slice)-1-i].Rollback(attrs)) - } - } - return errs -} diff --git a/processor/schemaprocessor/internal/migrate/attributes_test.go b/processor/schemaprocessor/internal/migrate/attributes_test.go index 3d1e708f3b92..051693a74acc 100644 --- a/processor/schemaprocessor/internal/migrate/attributes_test.go +++ b/processor/schemaprocessor/internal/migrate/attributes_test.go @@ -27,7 +27,7 @@ func TestNewAttributeChangeSet(t *testing.T) { "hello": "world", }) - expect := &AttributeChangeSet{ + expect := AttributeChangeSet{ updates: map[string]string{ "hello": "world", }, @@ -45,7 +45,7 @@ func TestAttributeChangeSetApply(t *testing.T) { for _, tc := range []struct { name string - acs *AttributeChangeSet + acs AttributeChangeSet attrs pcommon.Map expect pcommon.Map errVal string @@ -104,7 +104,7 @@ func TestAttributeChangeSetApply(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - err := tc.acs.Apply(tc.attrs) + err := tc.acs.Do(StateSelectorApply, tc.attrs) if tc.errVal == "" { assert.NoError(t, err, "Must not return an error") } else { @@ -120,7 +120,7 @@ func TestAttributeChangeSetRollback(t *testing.T) { for _, tc := range []struct { name string - acs *AttributeChangeSet + acs AttributeChangeSet attrs pcommon.Map expect pcommon.Map errVal string @@ -179,7 +179,7 @@ func TestAttributeChangeSetRollback(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - err := tc.acs.Rollback(tc.attrs) + err := tc.acs.Do(StateSelectorRollback, tc.attrs) if tc.errVal == "" { assert.NoError(t, err, "Must not return an error") } else { @@ -189,97 +189,3 @@ func TestAttributeChangeSetRollback(t *testing.T) { }) } } - -func TestNewAttributeChangeSetSliceApply(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - changes *AttributeChangeSetSlice - attr pcommon.Map - expect pcommon.Map - }{ - { - name: "no changes listed", - changes: NewAttributeChangeSetSlice(), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - }, - { - name: "changes defined", - changes: NewAttributeChangeSetSlice( - NewAttributeChangeSet(map[string]string{ - "service_version": "service.version", - }), - NewAttributeChangeSet(map[string]string{ - "service.version": "application.service.version", - }), - ), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("application.service.version", "v0.0.1") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.changes.Apply(tc.attr)) - assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes") - }) - } -} - -func TestNewAttributeChangeSetSliceApplyRollback(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - changes *AttributeChangeSetSlice - attr pcommon.Map - expect pcommon.Map - }{ - { - name: "no changes listed", - changes: NewAttributeChangeSetSlice(), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.1") - }), - }, - { - name: "changes defined", - changes: NewAttributeChangeSetSlice( - NewAttributeChangeSet(map[string]string{ - "service_version": "service.version", - }), - NewAttributeChangeSet(map[string]string{ - "service.version": "application.service.version", - }), - ), - attr: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("application.service.version", "v0.0.1") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.1") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.changes.Rollback(tc.attr)) - assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected attributes") - }) - } -} diff --git a/processor/schemaprocessor/internal/migrate/conditional.go b/processor/schemaprocessor/internal/migrate/conditional.go index b3eea651f9fc..1f76387ea16b 100644 --- a/processor/schemaprocessor/internal/migrate/conditional.go +++ b/processor/schemaprocessor/internal/migrate/conditional.go @@ -6,7 +6,6 @@ package migrate // import "github.com/open-telemetry/opentelemetry-collector-con import ( "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/otel/schema/v1.0/ast" - "go.uber.org/multierr" ) // ValueMatch defines the expected match type @@ -15,74 +14,43 @@ type ValueMatch interface { ~string } +// ConditionalAttributeSet represents a rename_attribute that will happen only if the passed in value matches the `on` set. type ConditionalAttributeSet struct { - on *map[string]struct{} - attrs *AttributeChangeSet + on map[string]struct{} + attrs AttributeChangeSet } type ConditionalAttributeSetSlice []*ConditionalAttributeSet -func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) *ConditionalAttributeSet { +func NewConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches ...Match) ConditionalAttributeSet { on := make(map[string]struct{}) for _, m := range matches { on[string(m)] = struct{}{} } - return &ConditionalAttributeSet{ - on: &on, + return ConditionalAttributeSet{ + on: on, attrs: NewAttributeChangeSet(mappings), } } -func (ca *ConditionalAttributeSet) Apply(attrs pcommon.Map, values ...string) (errs error) { - if ca.check(values...) { - errs = ca.attrs.Apply(attrs) - } - return errs -} +func (ca ConditionalAttributeSet) IsMigrator() {} -func (ca *ConditionalAttributeSet) Rollback(attrs pcommon.Map, values ...string) (errs error) { +// Do applies the attribute changes specified in the constructor if any of the values in values matches the matches specified in the constructor. +func (ca *ConditionalAttributeSet) Do(ss StateSelector, attrs pcommon.Map, values ...string) (errs error) { if ca.check(values...) { - errs = ca.attrs.Rollback(attrs) + errs = ca.attrs.Do(ss, attrs) } return errs } func (ca *ConditionalAttributeSet) check(values ...string) bool { - if len(*ca.on) == 0 { + if len(ca.on) == 0 { return true } for _, v := range values { - if _, ok := (*ca.on)[v]; !ok { + if _, ok := (ca.on)[v]; !ok { return false } } return true } - -func NewConditionalAttributeSetSlice(conditions ...*ConditionalAttributeSet) *ConditionalAttributeSetSlice { - values := new(ConditionalAttributeSetSlice) - for _, c := range conditions { - (*values) = append((*values), c) - } - return values -} - -func (slice *ConditionalAttributeSetSlice) Apply(attrs pcommon.Map, values ...string) error { - return slice.do(StateSelectorApply, attrs, values) -} - -func (slice *ConditionalAttributeSetSlice) Rollback(attrs pcommon.Map, values ...string) error { - return slice.do(StateSelectorRollback, attrs, values) -} - -func (slice *ConditionalAttributeSetSlice) do(ss StateSelector, attrs pcommon.Map, values []string) (errs error) { - for i := 0; i < len((*slice)); i++ { - switch ss { - case StateSelectorApply: - errs = multierr.Append(errs, (*slice)[i].Apply(attrs, values...)) - case StateSelectorRollback: - errs = multierr.Append(errs, (*slice)[len((*slice))-i-1].Rollback(attrs, values...)) - } - } - return errs -} diff --git a/processor/schemaprocessor/internal/migrate/conditional_test.go b/processor/schemaprocessor/internal/migrate/conditional_test.go index 0c199c98d050..183944100d98 100644 --- a/processor/schemaprocessor/internal/migrate/conditional_test.go +++ b/processor/schemaprocessor/internal/migrate/conditional_test.go @@ -15,7 +15,7 @@ func TestConditionalAttributeSetApply(t *testing.T) { for _, tc := range []struct { name string - cond *ConditionalAttributeSet + cond ConditionalAttributeSet check string attr pcommon.Map expect pcommon.Map @@ -84,7 +84,7 @@ func TestConditionalAttributeSetApply(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - assert.NoError(t, tc.cond.Apply(tc.attr, tc.check)) + assert.NoError(t, tc.cond.Do(StateSelectorApply, tc.attr, tc.check)) assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected value") }) } @@ -95,7 +95,7 @@ func TestConditionalAttributeSetRollback(t *testing.T) { for _, tc := range []struct { name string - cond *ConditionalAttributeSet + cond ConditionalAttributeSet check string attr pcommon.Map expect pcommon.Map @@ -164,122 +164,8 @@ func TestConditionalAttributeSetRollback(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - assert.NoError(t, tc.cond.Rollback(tc.attr, tc.check)) + assert.NoError(t, tc.cond.Do(StateSelectorRollback, tc.attr, tc.check)) assert.Equal(t, tc.expect.AsRaw(), tc.attr.AsRaw(), "Must match the expected value") }) } } - -func TestConditionalAttribueSetSliceApply(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - slice *ConditionalAttributeSetSlice - check string - attrs pcommon.Map - expect pcommon.Map - }{ - { - name: "No changes", - slice: NewConditionalAttributeSetSlice(), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - }, - { - name: "Not matched check value", - slice: NewConditionalAttributeSetSlice( - NewConditionalAttributeSet[string]( - map[string]string{ - "service_version": "service.version", - }, - ), - // intentially silly to be make it clear - // that this should not be applied - NewConditionalAttributeSet( - map[string]string{ - "service.version": "shark.attack", - }, - "shark spotted", - ), - ), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.slice.Apply(tc.attrs, tc.check)) - assert.Equal(t, tc.expect.AsRaw(), tc.attrs.AsRaw(), "Must match the expected values") - }) - } -} - -func TestConditionalAttribueSetSliceRollback(t *testing.T) { - t.Parallel() - - for _, tc := range []struct { - name string - slice *ConditionalAttributeSetSlice - check string - attrs pcommon.Map - expect pcommon.Map - }{ - { - name: "No changes", - slice: NewConditionalAttributeSetSlice(), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - }, - { - name: "Not matched check value", - slice: NewConditionalAttributeSetSlice( - NewConditionalAttributeSet[string]( - map[string]string{ - "service_version": "service.version", - }, - ), - // intentially silly to be make it clear - // that this should not be applied - NewConditionalAttributeSet( - map[string]string{ - "service.version": "shark.attack", - }, - "shark spotted", - ), - ), - check: "application start", - attrs: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service.version", "v0.0.0") - }), - expect: testHelperBuildMap(func(m pcommon.Map) { - m.PutStr("service_version", "v0.0.0") - }), - }, - } { - tc := tc - t.Run(tc.name, func(t *testing.T) { - t.Parallel() - - assert.NoError(t, tc.slice.Rollback(tc.attrs, tc.check)) - assert.Equal(t, tc.expect.AsRaw(), tc.attrs.AsRaw(), "Must match the expected values") - }) - } -} diff --git a/processor/schemaprocessor/internal/migrate/migrator.go b/processor/schemaprocessor/internal/migrate/migrator.go new file mode 100644 index 000000000000..7215fae26c88 --- /dev/null +++ b/processor/schemaprocessor/internal/migrate/migrator.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package migrate // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" + +// Migrator is an interface that all migration types must implement. It is basically a marker interface. All Transformers are also Migrators +type Migrator interface { + IsMigrator() +} + +var ( + _ Migrator = (*AttributeChangeSet)(nil) + _ Migrator = (*MultiConditionalAttributeSet)(nil) + _ Migrator = (*SignalNameChange)(nil) + _ Migrator = (*ConditionalAttributeSet)(nil) + _ Migrator = (*SignalNameChange)(nil) +) diff --git a/processor/schemaprocessor/internal/migrate/multi_conditional.go b/processor/schemaprocessor/internal/migrate/multi_conditional.go new file mode 100644 index 000000000000..30b29c76bc1b --- /dev/null +++ b/processor/schemaprocessor/internal/migrate/multi_conditional.go @@ -0,0 +1,77 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package migrate // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/otel/schema/v1.0/ast" +) + +type set = map[string]struct{} + +// MultiConditionalAttributeSet maps from string keys to possible values for each of those keys. The Do function then checks passed in values for each key against the list provided here in the constructor. If there is a matching value for each key, the attribute changes are applied. +type MultiConditionalAttributeSet struct { + // map from string keys (in the intended case "event.name" and "span.name") to a set of acceptable values. + keysToPossibleValues map[string]set + attrs AttributeChangeSet +} + +type MultiConditionalAttributeSetSlice []*MultiConditionalAttributeSet + +func NewMultiConditionalAttributeSet[Match ValueMatch](mappings ast.AttributeMap, matches map[string][]Match) MultiConditionalAttributeSet { + keysToPossibleValues := make(map[string]set) + for k, values := range matches { + on := make(map[string]struct{}) + for _, val := range values { + on[string(val)] = struct{}{} + } + keysToPossibleValues[k] = on + } + return MultiConditionalAttributeSet{ + keysToPossibleValues: keysToPossibleValues, + attrs: NewAttributeChangeSet(mappings), + } +} + +func (ca MultiConditionalAttributeSet) IsMigrator() {} + +// Do function applies the attribute changes if the passed in values match the expected values provided in the constructor. Uses the Do method of the embedded AttributeChangeSet +func (ca *MultiConditionalAttributeSet) Do(ss StateSelector, attrs pcommon.Map, keyToCheckVals map[string]string) (errs error) { + match, err := ca.check(keyToCheckVals) + if err != nil { + return err + } + if match { + errs = ca.attrs.Do(ss, attrs) + } + return errs +} + +func (ca *MultiConditionalAttributeSet) check(keyToCheckVals map[string]string) (bool, error) { + if len(ca.keysToPossibleValues) == 0 { + return true, nil + } + if len(ca.keysToPossibleValues) != len(keyToCheckVals) { + return false, errors.New("passed in wrong number of matchers to MultiConditionalAttributeSet") + } + for k, inVal := range keyToCheckVals { + // We must already have a key matching the input key! If not, return an error + // indicates a programming error, should be impossible if using the class correctly + valToMatch, ok := (ca.keysToPossibleValues)[k] + if !ok { + return false, errors.New("passed in a key that doesn't exist in MultiConditionalAttributeSet") + } + // if there's nothing in here, match all values + if len(valToMatch) == 0 { + continue + } + if _, ok := valToMatch[inVal]; !ok { + return false, nil + } + } + // if we've gone through every one of the keys, and they've all generated matches, return true + return true, nil +} diff --git a/processor/schemaprocessor/internal/migrate/multi_conditional_test.go b/processor/schemaprocessor/internal/migrate/multi_conditional_test.go new file mode 100644 index 000000000000..0b8ae8e8197f --- /dev/null +++ b/processor/schemaprocessor/internal/migrate/multi_conditional_test.go @@ -0,0 +1,149 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package migrate + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestMultiConditionalAttributeSetApply(t *testing.T) { + t.Parallel() + + for _, tc := range []struct { + name string + cond MultiConditionalAttributeSet + inCondData map[string]string + inAttr pcommon.Map + expect pcommon.Map + }{ + { + name: "No changes defined", + cond: NewMultiConditionalAttributeSet[string](map[string]string{}, map[string][]string{}), + inCondData: map[string]string{"span.name": "database operation"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + }, + { + name: "Not matched in value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{"span.name": {"application start"}}, + ), + inCondData: map[string]string{"span.name": "datatbase operation"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + }, + { + name: "No condition set, applys to all", + cond: NewMultiConditionalAttributeSet[string]( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{}, + ), + inCondData: map[string]string{"span.name": "datatbase operation"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + { + name: "Matched one condition, setting value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "span.name": {"application start", "application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application start"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + { + name: "Matched one condition, other value, setting value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "span.name": {"application start", "application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application end"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + { + name: "Matched one out of two conditions, don't set value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "trace.name": {"application start"}, + "span.name": {"application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application start", "trace.name": "application end"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + }, + { + name: "Matched both conditions, set value", + cond: NewMultiConditionalAttributeSet( + map[string]string{ + "service.version": "application.version", + }, + map[string][]string{ + "span.name": {"application start"}, + "trace.name": {"application end"}, + }, + ), + inCondData: map[string]string{"span.name": "application start", "trace.name": "application end"}, + inAttr: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("service.version", "v0.0.0") + }), + expect: testHelperBuildMap(func(m pcommon.Map) { + m.PutStr("application.version", "v0.0.0") + }), + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + require.NoError(t, tc.cond.Do(StateSelectorApply, tc.inAttr, tc.inCondData)) + assert.Equal(t, tc.expect.AsRaw(), tc.inAttr.AsRaw(), "Must match the expected value") + }) + } +} diff --git a/processor/schemaprocessor/internal/migrate/signal.go b/processor/schemaprocessor/internal/migrate/signal.go index 0570819c4a97..bdd2894ac15c 100644 --- a/processor/schemaprocessor/internal/migrate/signal.go +++ b/processor/schemaprocessor/internal/migrate/signal.go @@ -12,18 +12,16 @@ type SignalType interface { } // SignalNameChange allows for migrating types that -// implement the `alias.Signal` interface. +// implement the `alias.NamedSignal` interface. type SignalNameChange struct { updates map[string]string rollback map[string]string } -type SignalNameChangeSlice []*SignalNameChange - -// NewSignalNameChange will create a `Signal` that will check the provided mappings if it can update a `alias.Signal` +// NewSignalNameChange will create a `Signal` that will check the provided mappings if it can update a `alias.NamedSignal` // and if no values are provided for `matches`, then all values will be updated. -func NewSignalNameChange[Key SignalType, Value SignalType](mappings map[Key]Value) *SignalNameChange { - sig := &SignalNameChange{ +func NewSignalNameChange[Key SignalType, Value SignalType](mappings map[Key]Value) SignalNameChange { + sig := SignalNameChange{ updates: make(map[string]string, len(mappings)), rollback: make(map[string]string, len(mappings)), } @@ -34,15 +32,9 @@ func NewSignalNameChange[Key SignalType, Value SignalType](mappings map[Key]Valu return sig } -func (s *SignalNameChange) Apply(signal alias.NamedSignal) { - s.do(StateSelectorApply, signal) -} - -func (s *SignalNameChange) Rollback(signal alias.NamedSignal) { - s.do(StateSelectorRollback, signal) -} +func (s SignalNameChange) IsMigrator() {} -func (s *SignalNameChange) do(ss StateSelector, signal alias.NamedSignal) { +func (s *SignalNameChange) Do(ss StateSelector, signal alias.NamedSignal) { var ( name string matched bool @@ -57,30 +49,3 @@ func (s *SignalNameChange) do(ss StateSelector, signal alias.NamedSignal) { signal.SetName(name) } } - -func NewSignalNameChangeSlice(changes ...*SignalNameChange) *SignalNameChangeSlice { - values := new(SignalNameChangeSlice) - for _, c := range changes { - (*values) = append((*values), c) - } - return values -} - -func (slice *SignalNameChangeSlice) Apply(signal alias.NamedSignal) { - slice.do(StateSelectorApply, signal) -} - -func (slice *SignalNameChangeSlice) Rollback(signal alias.NamedSignal) { - slice.do(StateSelectorRollback, signal) -} - -func (slice *SignalNameChangeSlice) do(ss StateSelector, signal alias.NamedSignal) { - for i := 0; i < len((*slice)); i++ { - switch ss { - case StateSelectorApply: - (*slice)[i].Apply(signal) - case StateSelectorRollback: - (*slice)[len((*slice))-i-1].Rollback(signal) - } - } -} diff --git a/processor/schemaprocessor/internal/migrate/signal_test.go b/processor/schemaprocessor/internal/migrate/signal_test.go index 14f90951f46c..3c82a59f15d4 100644 --- a/processor/schemaprocessor/internal/migrate/signal_test.go +++ b/processor/schemaprocessor/internal/migrate/signal_test.go @@ -17,7 +17,7 @@ func TestSignalApply(t *testing.T) { for _, tc := range []struct { name string - sig *SignalNameChange + sig SignalNameChange val alias.NamedSignal expect string }{ @@ -64,7 +64,7 @@ func TestSignalApply(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - tc.sig.Apply(tc.val) + tc.sig.Do(StateSelectorApply, tc.val) assert.Equal(t, tc.expect, tc.val.Name(), "Must match expected name") }) } @@ -75,7 +75,7 @@ func TestSignalRollback(t *testing.T) { for _, tc := range []struct { name string - sig *SignalNameChange + sig SignalNameChange val alias.NamedSignal expect string }{ @@ -122,7 +122,7 @@ func TestSignalRollback(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - tc.sig.Rollback(tc.val) + tc.sig.Do(StateSelectorRollback, tc.val) assert.Equal(t, tc.expect, tc.val.Name(), "Must match expected name") }) } diff --git a/processor/schemaprocessor/internal/transformer/attributes_operators.go b/processor/schemaprocessor/internal/transformer/attributes_operators.go new file mode 100644 index 000000000000..3cce6ad877ff --- /dev/null +++ b/processor/schemaprocessor/internal/transformer/attributes_operators.go @@ -0,0 +1,157 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +// Package transformer contains various Transformers that represent a high level operation - typically a single "change" block from the schema change file. They rely on Migrators to do the actual work of applying the change to the data. Transformers accept and operate on a specific type of pdata (logs, metrics, etc) +package transformer // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer" + +import ( + "errors" + "fmt" + + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +// MetricAttributes is a Transformer that acts on [pmetric.Metric]'s DataPoint's attributes. It is part of the [AllAttributes]. +type MetricAttributes struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o MetricAttributes) IsMigrator() {} + +func (o MetricAttributes) Do(ss migrate.StateSelector, metric pmetric.Metric) error { + // todo(ankit) handle MetricTypeEmpty + var datam alias.Attributed + switch metric.Type() { + case pmetric.MetricTypeExponentialHistogram: + for dp := 0; dp < metric.ExponentialHistogram().DataPoints().Len(); dp++ { + datam = metric.ExponentialHistogram().DataPoints().At(dp) + if err := o.AttributeChange.Do(ss, datam.Attributes()); err != nil { + return err + } + } + case pmetric.MetricTypeHistogram: + for dp := 0; dp < metric.Histogram().DataPoints().Len(); dp++ { + datam = metric.Histogram().DataPoints().At(dp) + if err := o.AttributeChange.Do(ss, datam.Attributes()); err != nil { + return err + } + } + case pmetric.MetricTypeGauge: + for dp := 0; dp < metric.Gauge().DataPoints().Len(); dp++ { + datam = metric.Gauge().DataPoints().At(dp) + if err := o.AttributeChange.Do(ss, datam.Attributes()); err != nil { + return err + } + } + case pmetric.MetricTypeSum: + for dp := 0; dp < metric.Sum().DataPoints().Len(); dp++ { + datam = metric.Sum().DataPoints().At(dp) + if err := o.AttributeChange.Do(ss, datam.Attributes()); err != nil { + return err + } + } + case pmetric.MetricTypeSummary: + for dp := 0; dp < metric.Summary().DataPoints().Len(); dp++ { + datam = metric.Summary().DataPoints().At(dp) + if err := o.AttributeChange.Do(ss, datam.Attributes()); err != nil { + return err + } + } + default: + return errors.New("unsupported metric type") + } + + return nil +} + +// LogAttributes is a Transformer that acts on [plog.LogRecord] attributes. It powers the [Log's rename_attributes] transformation. It also powers the [AllAttributes]. +// [Log's rename_attributes]: https://opentelemetry.io/docs/specs/otel/schemas/file_format_v1.1.0/#rename_attributes-transformation-3 +type LogAttributes struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o LogAttributes) IsMigrator() {} + +func (o LogAttributes) Do(ss migrate.StateSelector, log plog.LogRecord) error { + return o.AttributeChange.Do(ss, log.Attributes()) +} + +// SpanAttributes is a Transformer that acts on [ptrace.Span] attributes. It powers the [Span's rename_attributes] transformation. It also powers the [AllAttributes]. +// [Span's rename_attributes]: https://opentelemetry.io/docs/specs/otel/schemas/file_format_v1.1.0/#rename_attributes-transformation +type SpanAttributes struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o SpanAttributes) IsMigrator() {} + +func (o SpanAttributes) Do(ss migrate.StateSelector, span ptrace.Span) error { + return o.AttributeChange.Do(ss, span.Attributes()) +} + +// SpanEventAttributes is a Transformer that acts on [ptrace.SpanEvent] attributes. It is part of the [AllAttributes]. +type SpanEventAttributes struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o SpanEventAttributes) IsMigrator() {} + +func (o SpanEventAttributes) Do(ss migrate.StateSelector, spanEvent ptrace.SpanEvent) error { + return o.AttributeChange.Do(ss, spanEvent.Attributes()) +} + +// ResourceAttributes is a Transformer that acts on [pcommon.Resource] attributes. It powers the [Resource's rename_attributes] transformation. It also powers the [AllAttributes]. +// [Resource's rename_attributes]: https://opentelemetry.io/docs/specs/otel/schemas/file_format_v1.1.0/#resources-section +type ResourceAttributes struct { + AttributeChange migrate.AttributeChangeSet +} + +func (o ResourceAttributes) IsMigrator() {} + +func (o ResourceAttributes) Do(ss migrate.StateSelector, resource pcommon.Resource) error { + return o.AttributeChange.Do(ss, resource.Attributes()) +} + +// AllAttributes is a Transformer that acts on . It is a wrapper around the other attribute transformers. It powers the [All rename_attributes] transformation. +// [All rename_attributes]: https://opentelemetry.io/docs/specs/otel/schemas/file_format_v1.1.0/#all-section +type AllAttributes struct { + MetricAttributes MetricAttributes + LogAttributes LogAttributes + SpanAttributes SpanAttributes + SpanEventAttributes SpanEventAttributes + ResourceAttributes ResourceAttributes +} + +func NewAllAttributesTransformer(set migrate.AttributeChangeSet) AllAttributes { + return AllAttributes{ + MetricAttributes: MetricAttributes{AttributeChange: set}, + LogAttributes: LogAttributes{AttributeChange: set}, + SpanAttributes: SpanAttributes{AttributeChange: set}, + SpanEventAttributes: SpanEventAttributes{AttributeChange: set}, + ResourceAttributes: ResourceAttributes{AttributeChange: set}, + } +} + +func (o AllAttributes) IsMigrator() {} + +func (o AllAttributes) Do(ss migrate.StateSelector, data any) error { + switch typedData := data.(type) { + case pmetric.Metric: + return o.MetricAttributes.Do(ss, typedData) + case plog.LogRecord: + return o.LogAttributes.Do(ss, typedData) + case ptrace.Span: + return o.SpanAttributes.Do(ss, typedData) + case ptrace.SpanEvent: + return o.SpanEventAttributes.Do(ss, typedData) + case pcommon.Resource: + return o.ResourceAttributes.Do(ss, typedData) + default: + return fmt.Errorf("AllAttributes can't act on %T", typedData) + } +} diff --git a/processor/schemaprocessor/internal/transformer/attributes_test.go b/processor/schemaprocessor/internal/transformer/attributes_test.go new file mode 100644 index 000000000000..474d2fce9545 --- /dev/null +++ b/processor/schemaprocessor/internal/transformer/attributes_test.go @@ -0,0 +1,121 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transformer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +func TestAttributeTransformers(t *testing.T) { + attrChange := migrate.NewAttributeChangeSet(map[string]string{ + "service_version": "service.version", + }) + allTransformer := AllAttributes{ + MetricAttributes: MetricAttributes{attrChange}, + LogAttributes: LogAttributes{attrChange}, + SpanAttributes: SpanAttributes{attrChange}, + SpanEventAttributes: SpanEventAttributes{attrChange}, + ResourceAttributes: ResourceAttributes{attrChange}, + } + + tests := []struct { + name string + transformer func() (alias.Attributed, error) + }{ + { + name: "MetricTransformerExponentialHistogram", + transformer: func() (alias.Attributed, error) { + metric := pmetric.NewMetric() + metric.SetEmptyExponentialHistogram().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.ExponentialHistogram().DataPoints().At(0), allTransformer.MetricAttributes.Do(migrate.StateSelectorApply, metric) + }, + }, + { + name: "MetricTransformerGauge", + transformer: func() (alias.Attributed, error) { + metric := pmetric.NewMetric() + metric.SetEmptyGauge().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.Gauge().DataPoints().At(0), allTransformer.MetricAttributes.Do(migrate.StateSelectorApply, metric) + }, + }, + { + name: "MetricTransformerHistogram", + transformer: func() (alias.Attributed, error) { + metric := pmetric.NewMetric() + metric.SetEmptyHistogram().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.Histogram().DataPoints().At(0), allTransformer.MetricAttributes.Do(migrate.StateSelectorApply, metric) + }, + }, + { + name: "MetricTransformerSum", + transformer: func() (alias.Attributed, error) { + metric := pmetric.NewMetric() + metric.SetEmptySum().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.Sum().DataPoints().At(0), allTransformer.MetricAttributes.Do(migrate.StateSelectorApply, metric) + }, + }, + { + name: "MetricTransformerSummary", + transformer: func() (alias.Attributed, error) { + metric := pmetric.NewMetric() + metric.SetEmptySummary().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.Summary().DataPoints().At(0), allTransformer.MetricAttributes.Do(migrate.StateSelectorApply, metric) + }, + }, + { + name: "LogAttributes", + transformer: func() (alias.Attributed, error) { + log := plog.NewLogRecord() + log.Attributes().PutStr("service_version", "1.0.0") + return log, allTransformer.LogAttributes.Do(migrate.StateSelectorApply, log) + }, + }, + { + name: "SpanAttributes", + transformer: func() (alias.Attributed, error) { + span := ptrace.NewSpan() + span.Attributes().PutStr("service_version", "1.0.0") + return span, allTransformer.SpanAttributes.Do(migrate.StateSelectorApply, span) + }, + }, + { + name: "SpanEventAttributes", + transformer: func() (alias.Attributed, error) { + spanEvent := ptrace.NewSpanEvent() + spanEvent.Attributes().PutStr("service_version", "1.0.0") + return spanEvent, allTransformer.SpanEventAttributes.Do(migrate.StateSelectorApply, spanEvent) + }, + }, + { + name: "ResourceAttributes", + transformer: func() (alias.Attributed, error) { + resource := pcommon.NewResource() + resource.Attributes().PutStr("service_version", "1.0.0") + return resource, allTransformer.ResourceAttributes.Do(migrate.StateSelectorApply, resource) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + item, err := tt.transformer() + if err != nil { + t.Errorf("unexpected error: %v", err) + return + } + attrs := item.Attributes() + val, ok := attrs.Get("service.version") + assert.True(t, ok) + assert.Equal(t, "1.0.0", val.Str()) + }) + } +} diff --git a/processor/schemaprocessor/internal/transformer/conditional_attributes.go b/processor/schemaprocessor/internal/transformer/conditional_attributes.go new file mode 100644 index 000000000000..f7f0e874d528 --- /dev/null +++ b/processor/schemaprocessor/internal/transformer/conditional_attributes.go @@ -0,0 +1,80 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transformer // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer" + +import ( + "errors" + + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +// MetricDataPointAttributes is a conditional Transformer that acts on [pmetric.Metric]'s DataPoint's attributes. It powers the [Metric's rename_attributes] transformation. +// [Metric's rename_attributes]: https://opentelemetry.io/docs/specs/otel/schemas/file_format_v1.1.0/#rename_attributes-transformation-2 +type MetricDataPointAttributes struct { + ConditionalAttributeChange migrate.ConditionalAttributeSet +} + +func (o MetricDataPointAttributes) IsMigrator() {} + +func (o MetricDataPointAttributes) Do(ss migrate.StateSelector, metric pmetric.Metric) error { + // todo(ankit) handle MetricTypeEmpty + var datam alias.Attributed + switch metric.Type() { + case pmetric.MetricTypeExponentialHistogram: + for dp := 0; dp < metric.ExponentialHistogram().DataPoints().Len(); dp++ { + datam = metric.ExponentialHistogram().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeHistogram: + for dp := 0; dp < metric.Histogram().DataPoints().Len(); dp++ { + datam = metric.Histogram().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeGauge: + for dp := 0; dp < metric.Gauge().DataPoints().Len(); dp++ { + datam = metric.Gauge().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeSum: + for dp := 0; dp < metric.Sum().DataPoints().Len(); dp++ { + datam = metric.Sum().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + case pmetric.MetricTypeSummary: + for dp := 0; dp < metric.Summary().DataPoints().Len(); dp++ { + datam = metric.Summary().DataPoints().At(dp) + if err := o.ConditionalAttributeChange.Do(ss, datam.Attributes(), metric.Name()); err != nil { + return err + } + } + default: + return errors.New("unsupported metric type") + } + + return nil +} + +// SpanConditionalAttributes is a conditional Transformer that acts on [ptrace.Span]'s name. It powers the [Span's rename_attributes] transformation. +// [Span's rename_attributes]: https://opentelemetry.io/docs/specs/otel/schemas/file_format_v1.1.0/#rename_attributes-transformation +type SpanConditionalAttributes struct { + Migrator migrate.ConditionalAttributeSet +} + +func (o SpanConditionalAttributes) IsMigrator() {} + +func (o SpanConditionalAttributes) Do(ss migrate.StateSelector, span ptrace.Span) error { + return o.Migrator.Do(ss, span.Attributes(), span.Name()) +} diff --git a/processor/schemaprocessor/internal/transformer/conditional_attributes_test.go b/processor/schemaprocessor/internal/transformer/conditional_attributes_test.go new file mode 100644 index 000000000000..2f3fb7cf2461 --- /dev/null +++ b/processor/schemaprocessor/internal/transformer/conditional_attributes_test.go @@ -0,0 +1,110 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transformer + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/alias" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +func assertAttributeEquals(t *testing.T, attributes pcommon.Map, key string, value string) { + t.Helper() + val, ok := attributes.Get(key) + require.True(t, ok) + require.Equal(t, value, val.Str()) +} + +func TestMetricDataPointAttributesTransformer(t *testing.T) { + attrChange := migrate.NewConditionalAttributeSet(map[string]string{ + "service_version": "service.version", + }, "http_request") + metricDataPointAttributeTransformer := MetricDataPointAttributes{attrChange} + + tests := []struct { + name string + generator func(metric pmetric.Metric) alias.Attributed + }{ + { + name: "MetricTransformerExponentialHistogram", + generator: func(metric pmetric.Metric) alias.Attributed { + metric.SetEmptyExponentialHistogram().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.ExponentialHistogram().DataPoints().At(0) + }, + }, + { + name: "MetricTransformerGauge", + generator: func(metric pmetric.Metric) alias.Attributed { + metric.SetEmptyGauge().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.Gauge().DataPoints().At(0) + }, + }, + { + name: "MetricTransformerHistogram", + generator: func(metric pmetric.Metric) alias.Attributed { + metric.SetEmptyHistogram().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.Histogram().DataPoints().At(0) + }, + }, + { + name: "MetricTransformerSum", + generator: func(metric pmetric.Metric) alias.Attributed { + metric.SetEmptySum().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.Sum().DataPoints().At(0) + }, + }, + { + name: "MetricTransformerSummary", + generator: func(metric pmetric.Metric) alias.Attributed { + metric.SetEmptySummary().DataPoints().AppendEmpty().Attributes().PutStr("service_version", "1.0.0") + return metric.Summary().DataPoints().At(0) + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // generate new metric + metric := pmetric.NewMetric() + item := tt.generator(metric) + // assert it was constructed correctly + assertAttributeEquals(t, item.Attributes(), "service_version", "1.0.0") + + // name is blank - migrator shouldn't do anything + err := metricDataPointAttributeTransformer.Do(migrate.StateSelectorApply, metric) + require.NoError(t, err) + assertAttributeEquals(t, item.Attributes(), "service_version", "1.0.0") + + // name is http_request - migrator should change the attribute + metric.SetName("http_request") + err = metricDataPointAttributeTransformer.Do(migrate.StateSelectorApply, metric) + require.NoError(t, err) + assertAttributeEquals(t, item.Attributes(), "service.version", "1.0.0") + }) + } +} + +func TestSpanConditionalAttributeTransformer(t *testing.T) { + attrChange := migrate.NewConditionalAttributeSet(map[string]string{ + "service_version": "service.version", + }, "http_request") + spanConditionalAttributeTransformer := SpanConditionalAttributes{attrChange} + + span := ptrace.NewSpan() + span.Attributes().PutStr("service_version", "1.0.0") + // name is blank, migrator shouldn't do anything + err := spanConditionalAttributeTransformer.Do(migrate.StateSelectorApply, span) + require.NoError(t, err) + assertAttributeEquals(t, span.Attributes(), "service_version", "1.0.0") + + span.SetName("http_request") + err = spanConditionalAttributeTransformer.Do(migrate.StateSelectorApply, span) + require.NoError(t, err) + assertAttributeEquals(t, span.Attributes(), "service.version", "1.0.0") +} diff --git a/processor/schemaprocessor/internal/transformer/interface.go b/processor/schemaprocessor/internal/transformer/interface.go new file mode 100644 index 000000000000..d7a36ae96dba --- /dev/null +++ b/processor/schemaprocessor/internal/transformer/interface.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transformer // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer" +import ( + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +type Transformer[T pmetric.Metric | plog.LogRecord | ptrace.Span | pcommon.Resource] interface { + Do(ss migrate.StateSelector, data T) error +} diff --git a/processor/schemaprocessor/internal/transformer/multi_conditional_attributes.go b/processor/schemaprocessor/internal/transformer/multi_conditional_attributes.go new file mode 100644 index 000000000000..8ebbb429739b --- /dev/null +++ b/processor/schemaprocessor/internal/transformer/multi_conditional_attributes.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transformer // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer" + +import ( + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +// SpanEventConditionalAttributes is an transformer that powers the [Span Event's rename_attributes] change. +// [Span Event's rename_attributes]: https://opentelemetry.io/docs/specs/otel/schemas/file_format_v1.1.0/#rename_attributes-transformation-1 +type SpanEventConditionalAttributes struct { + MultiConditionalAttributeSet migrate.MultiConditionalAttributeSet +} + +func (o SpanEventConditionalAttributes) IsMigrator() {} + +func (o SpanEventConditionalAttributes) Do(ss migrate.StateSelector, span ptrace.Span) error { + for e := 0; e < span.Events().Len(); e++ { + event := span.Events().At(e) + if err := o.MultiConditionalAttributeSet.Do(ss, event.Attributes(), + map[string]string{ + "event.name": event.Name(), + "span.name": span.Name(), + }); err != nil { + return err + } + } + return nil +} diff --git a/processor/schemaprocessor/internal/transformer/multi_conditional_attributes_test.go b/processor/schemaprocessor/internal/transformer/multi_conditional_attributes_test.go new file mode 100644 index 000000000000..fa91adddb20d --- /dev/null +++ b/processor/schemaprocessor/internal/transformer/multi_conditional_attributes_test.go @@ -0,0 +1,83 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transformer + +import ( + "testing" + + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +func TestSpanEventConditionalAttributeTransformer(t *testing.T) { + migrator := migrate.NewMultiConditionalAttributeSet(map[string]string{ + "event.name": "event_name", + }, map[string][]string{ + "span.name": {"sqlquery.start", "sqlquery.end"}, + "event.name": {"sqlquery"}, + }) + c := SpanEventConditionalAttributes{migrator} + tests := []struct { + name string + span func() ptrace.Span + changed bool + }{ + { + name: "nomatch", + span: func() ptrace.Span { + span := ptrace.NewSpan() + span.SetName("nosqlquery.start") + span.Events().AppendEmpty().SetName("nosqlquery") + return span + }, + changed: false, + }, + { + name: "spannamematches", + span: func() ptrace.Span { + span := ptrace.NewSpan() + span.SetName("sqlquery.start") + span.Events().AppendEmpty().SetName("nosqlquery") + return span + }, + changed: false, + }, + { + name: "eventnamematches", + span: func() ptrace.Span { + span := ptrace.NewSpan() + span.SetName("nosqlquery.start") + span.Events().AppendEmpty().SetName("sqlquery") + return span + }, + changed: false, + }, + { + name: "bothnamematches", + span: func() ptrace.Span { + span := ptrace.NewSpan() + span.SetName("sqlquery.start") + span.Events().AppendEmpty().SetName("sqlquery") + return span + }, + changed: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + span := tt.span() + span.Events().At(0).Attributes().PutStr("event.name", "blah") + err := c.Do(migrate.StateSelectorApply, span) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if tt.changed { + assertAttributeEquals(t, span.Events().At(0).Attributes(), "event_name", "blah") + } else { + assertAttributeEquals(t, span.Events().At(0).Attributes(), "event.name", "blah") + } + }) + } +} diff --git a/processor/schemaprocessor/internal/transformer/signal_name.go b/processor/schemaprocessor/internal/transformer/signal_name.go new file mode 100644 index 000000000000..7d11b8cac60e --- /dev/null +++ b/processor/schemaprocessor/internal/transformer/signal_name.go @@ -0,0 +1,40 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transformer // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer" + +import ( + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +// SpanEventSignalNameChange is an transformer that powers the [Span Event's rename_events] change. +// [Span Event's rename_events]: https://opentelemetry.io/docs/specs/otel/schemas/file_format_v1.1.0/#rename_events-transformation +type SpanEventSignalNameChange struct { + SignalNameChange migrate.SignalNameChange +} + +func (c SpanEventSignalNameChange) IsMigrator() {} + +func (c SpanEventSignalNameChange) Do(ss migrate.StateSelector, span ptrace.Span) error { + for e := 0; e < span.Events().Len(); e++ { + event := span.Events().At(e) + c.SignalNameChange.Do(ss, event) + } + return nil +} + +// MetricSignalNameChange is an transformer that powers the [Metric's rename_metrics] change. +// [Metric's rename_metrics]: https://opentelemetry.io/docs/specs/otel/schemas/file_format_v1.1.0/#rename_metrics-transformation +type MetricSignalNameChange struct { + SignalNameChange migrate.SignalNameChange +} + +func (c MetricSignalNameChange) IsMigrator() {} + +func (c MetricSignalNameChange) Do(ss migrate.StateSelector, metric pmetric.Metric) error { + c.SignalNameChange.Do(ss, metric) + return nil +} diff --git a/processor/schemaprocessor/internal/transformer/signal_name_test.go b/processor/schemaprocessor/internal/transformer/signal_name_test.go new file mode 100644 index 000000000000..ea384b43cab7 --- /dev/null +++ b/processor/schemaprocessor/internal/transformer/signal_name_test.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package transformer + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" +) + +func TestSpanEventSignalNameChangeTransformer(t *testing.T) { + s := ptrace.NewSpan() + s.Events().AppendEmpty().SetName("event.name") + c := SpanEventSignalNameChange{SignalNameChange: migrate.NewSignalNameChange(map[string]string{ + "event.name": "event_name", + })} + require.NoError(t, c.Do(migrate.StateSelectorApply, s)) + require.Equal(t, "event_name", s.Events().At(0).Name()) +} + +func TestMetricSignalNameChangeTransformer(t *testing.T) { + s := pmetric.NewMetric() + s.SetName("event.name") + c := MetricSignalNameChange{SignalNameChange: migrate.NewSignalNameChange(map[string]string{ + "event.name": "event_name", + })} + require.NoError(t, c.Do(migrate.StateSelectorApply, s)) + require.Equal(t, "event_name", s.Name()) +} diff --git a/processor/schemaprocessor/internal/translation/revision_v1.go b/processor/schemaprocessor/internal/translation/revision_v1.go index de42d0e83a1d..217ecb9daac3 100644 --- a/processor/schemaprocessor/internal/translation/revision_v1.go +++ b/processor/schemaprocessor/internal/translation/revision_v1.go @@ -6,21 +6,22 @@ package translation // import "github.com/open-telemetry/opentelemetry-collector import ( "go.opentelemetry.io/otel/schema/v1.0/ast" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/changelist" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer" ) -// RevisionV1 represents all changes that are to be -// applied to a signal at a given version. +// RevisionV1 represents all changes that are to be applied to a signal at a given version. V1 represents the fact +// that this struct only support the Schema Files version 1.0 - not 1.1 which contains split. +// todo(ankit) implement split and rest of otel schema type RevisionV1 struct { - ver *Version - all *migrate.AttributeChangeSetSlice - resource *migrate.AttributeChangeSetSlice - spans *migrate.ConditionalAttributeSetSlice - eventNames *migrate.SignalNameChangeSlice - eventAttrsOnSpan *migrate.ConditionalAttributeSetSlice - eventAttrsOnName *migrate.ConditionalAttributeSetSlice - metricsAttrs *migrate.ConditionalAttributeSetSlice - metricNames *migrate.SignalNameChangeSlice + ver *Version + all *changelist.ChangeList + resources *changelist.ChangeList + spans *changelist.ChangeList + spanEvents *changelist.ChangeList + metrics *changelist.ChangeList + logs *changelist.ChangeList } // NewRevision processes the VersionDef and assigns the version to this revision @@ -28,87 +29,118 @@ type RevisionV1 struct { // Since VersionDef uses custom types for various definitions, it isn't possible // to cast those values into the primitives so each has to be processed together. // Generics would be handy here. +// todo(ankit) investigate using generics func NewRevision(ver *Version, def ast.VersionDef) *RevisionV1 { + // todo(ankit) change logs to be an ast.Attributes type so I dont have to change this + var logChanges ast.Attributes + for _, change := range def.Logs.Changes { + //nolint:gosimple + logChanges.Changes = append(logChanges.Changes, ast.AttributeChange{RenameAttributes: change.RenameAttributes}) + } return &RevisionV1{ - ver: ver, - all: newAttributeChangeSetSliceFromChanges(def.All), - resource: newAttributeChangeSetSliceFromChanges(def.Resources), - spans: newSpanConditionalAttributeSlice(def.Spans), - eventNames: newSpanEventSignalSlice(def.SpanEvents), - eventAttrsOnSpan: newSpanEventConditionalSpans(def.SpanEvents), - eventAttrsOnName: newSpanEventConditionalNames(def.SpanEvents), - metricsAttrs: newMetricConditionalSlice(def.Metrics), - metricNames: newMetricNameSignalSlice(def.Metrics), + ver: ver, + all: newAllChangeList(def.All), + resources: newResourceChangeList(def.Resources), + spans: newSpanChangeList(def.Spans), + spanEvents: newSpanEventChangeList(def.SpanEvents), + metrics: newMetricChangeList(def.Metrics), + logs: newLogsChangelist(def.Logs), } } -func newAttributeChangeSetSliceFromChanges(attrs ast.Attributes) *migrate.AttributeChangeSetSlice { - values := make([]*migrate.AttributeChangeSet, 0, 10) - for _, at := range attrs.Changes { - if renamed := at.RenameAttributes; renamed != nil { - values = append(values, migrate.NewAttributeChangeSet(renamed.AttributeMap)) - } - } - return migrate.NewAttributeChangeSetSlice(values...) +func (r RevisionV1) Version() *Version { + return r.ver } -func newSpanConditionalAttributeSlice(spans ast.Spans) *migrate.ConditionalAttributeSetSlice { - values := make([]*migrate.ConditionalAttributeSet, 0, 10) - for _, ch := range spans.Changes { - if renamed := ch.RenameAttributes; renamed != nil { - values = append(values, migrate.NewConditionalAttributeSet( - renamed.AttributeMap, - renamed.ApplyToSpans..., - )) +func newAllChangeList(all ast.Attributes) *changelist.ChangeList { + values := make([]migrate.Migrator, 0) + for _, at := range all.Changes { + if renamed := at.RenameAttributes; renamed != nil { + attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap) + allTransformer := transformer.NewAllAttributesTransformer(attributeChangeSet) + values = append(values, allTransformer) } } - return migrate.NewConditionalAttributeSetSlice(values...) + return &changelist.ChangeList{Migrators: values} } -func newSpanEventSignalSlice(events ast.SpanEvents) *migrate.SignalNameChangeSlice { - values := make([]*migrate.SignalNameChange, 0, 10) - for _, ch := range events.Changes { - if renamed := ch.RenameEvents; renamed != nil { - values = append(values, migrate.NewSignalNameChange(renamed.EventNameMap)) +func newResourceChangeList(resource ast.Attributes) *changelist.ChangeList { + values := make([]migrate.Migrator, 0) + for _, at := range resource.Changes { + if renamed := at.RenameAttributes; renamed != nil { + attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap) + resourceTransformer := transformer.ResourceAttributes{AttributeChange: attributeChangeSet} + values = append(values, resourceTransformer) } } - return migrate.NewSignalNameChangeSlice(values...) + return &changelist.ChangeList{Migrators: values} } -func newSpanEventConditionalSpans(events ast.SpanEvents) *migrate.ConditionalAttributeSetSlice { - values := make([]*migrate.ConditionalAttributeSet, 0, 10) - for _, ch := range events.Changes { - if rename := ch.RenameAttributes; rename != nil { - values = append(values, migrate.NewConditionalAttributeSet(rename.AttributeMap, rename.ApplyToSpans...)) +func newSpanChangeList(spans ast.Spans) *changelist.ChangeList { + values := make([]migrate.Migrator, 0) + for _, at := range spans.Changes { + if renamed := at.RenameAttributes; renamed != nil { + conditionalAttributesChangeSet := transformer.SpanConditionalAttributes{Migrator: migrate.NewConditionalAttributeSet(renamed.AttributeMap, renamed.ApplyToSpans...)} + values = append(values, conditionalAttributesChangeSet) } } - return migrate.NewConditionalAttributeSetSlice(values...) + return &changelist.ChangeList{Migrators: values} } -func newSpanEventConditionalNames(events ast.SpanEvents) *migrate.ConditionalAttributeSetSlice { - values := make([]*migrate.ConditionalAttributeSet, 0, 10) - for _, ch := range events.Changes { - if rename := ch.RenameAttributes; rename != nil { - values = append(values, migrate.NewConditionalAttributeSet(rename.AttributeMap, rename.ApplyToEvents...)) +func newMetricChangeList(metrics ast.Metrics) *changelist.ChangeList { + values := make([]migrate.Migrator, 0) + for _, at := range metrics.Changes { + if renameAttributes := at.RenameAttributes; renameAttributes != nil { + attributeChangeSet := transformer.MetricDataPointAttributes{ + ConditionalAttributeChange: migrate.NewConditionalAttributeSet(renameAttributes.AttributeMap, renameAttributes.ApplyToMetrics...), + } + values = append(values, attributeChangeSet) + } else if renamedMetrics := at.RenameMetrics; renamedMetrics != nil { + signalNameChange := transformer.MetricSignalNameChange{SignalNameChange: migrate.NewSignalNameChange(renamedMetrics)} + values = append(values, signalNameChange) } } - return migrate.NewConditionalAttributeSetSlice(values...) + return &changelist.ChangeList{Migrators: values} } -func newMetricConditionalSlice(metrics ast.Metrics) *migrate.ConditionalAttributeSetSlice { - values := make([]*migrate.ConditionalAttributeSet, 0, 10) - for _, ch := range metrics.Changes { - if rename := ch.RenameAttributes; rename != nil { - values = append(values, migrate.NewConditionalAttributeSet(rename.AttributeMap, rename.ApplyToMetrics...)) +func newSpanEventChangeList(spanEvents ast.SpanEvents) *changelist.ChangeList { + values := make([]migrate.Migrator, 0) + for _, at := range spanEvents.Changes { + if renamedEvent := at.RenameEvents; renamedEvent != nil { + signalNameChange := migrate.NewSignalNameChange(renamedEvent.EventNameMap) + spanEventSignalNameChange := transformer.SpanEventSignalNameChange{SignalNameChange: signalNameChange} + values = append(values, spanEventSignalNameChange) + } else if renamedAttribute := at.RenameAttributes; renamedAttribute != nil { + acceptableSpanNames := make([]string, 0) + for _, spanName := range renamedAttribute.ApplyToSpans { + acceptableSpanNames = append(acceptableSpanNames, string(spanName)) + } + acceptableEventNames := make([]string, 0) + for _, eventName := range renamedAttribute.ApplyToEvents { + acceptableEventNames = append(acceptableEventNames, string(eventName)) + } + + multiConditionalAttributeSet := migrate.NewMultiConditionalAttributeSet(renamedAttribute.AttributeMap, map[string][]string{ + "span.name": acceptableSpanNames, + "event.name": acceptableEventNames, + }) + spanEventAttributeChangeSet := transformer.SpanEventConditionalAttributes{MultiConditionalAttributeSet: multiConditionalAttributeSet} + values = append(values, spanEventAttributeChangeSet) + } else { + panic("spanEvents change must have either RenameEvents or RenameAttributes") } } - return migrate.NewConditionalAttributeSetSlice(values...) + return &changelist.ChangeList{Migrators: values} } -func newMetricNameSignalSlice(metrics ast.Metrics) *migrate.SignalNameChangeSlice { - values := make([]*migrate.SignalNameChange, 0, 10) - for _, ch := range metrics.Changes { - values = append(values, migrate.NewSignalNameChange(ch.RenameMetrics)) +func newLogsChangelist(logs ast.Logs) *changelist.ChangeList { + values := make([]migrate.Migrator, 0) + for _, at := range logs.Changes { + if renamed := at.RenameAttributes; renamed != nil { + attributeChangeSet := migrate.NewAttributeChangeSet(renamed.AttributeMap) + logTransformer := transformer.LogAttributes{AttributeChange: attributeChangeSet} + values = append(values, logTransformer) + } } - return migrate.NewSignalNameChangeSlice(values...) + return &changelist.ChangeList{Migrators: values} } diff --git a/processor/schemaprocessor/internal/translation/revision_v1_test.go b/processor/schemaprocessor/internal/translation/revision_v1_test.go index cf71bc3c3c41..ff0a453e77af 100644 --- a/processor/schemaprocessor/internal/translation/revision_v1_test.go +++ b/processor/schemaprocessor/internal/translation/revision_v1_test.go @@ -5,14 +5,16 @@ package translation import ( "testing" - "github.com/stretchr/testify/assert" + "github.com/google/go-cmp/cmp" "go.opentelemetry.io/otel/schema/v1.0/ast" "go.opentelemetry.io/otel/schema/v1.0/types" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/changelist" "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/migrate" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/schemaprocessor/internal/transformer" ) -func TestNewRevision(t *testing.T) { +func TestNewRevisionV1(t *testing.T) { t.Parallel() for _, tc := range []struct { @@ -26,15 +28,13 @@ func TestNewRevision(t *testing.T) { inVersion: &Version{1, 1, 1}, inDefinition: ast.VersionDef{}, expect: &RevisionV1{ - ver: &Version{1, 1, 1}, - all: migrate.NewAttributeChangeSetSlice(), - resource: migrate.NewAttributeChangeSetSlice(), - spans: migrate.NewConditionalAttributeSetSlice(), - eventNames: migrate.NewSignalNameChangeSlice(), - eventAttrsOnSpan: migrate.NewConditionalAttributeSetSlice(), - eventAttrsOnName: migrate.NewConditionalAttributeSetSlice(), - metricsAttrs: migrate.NewConditionalAttributeSetSlice(), - metricNames: migrate.NewSignalNameChangeSlice(), + ver: &Version{1, 1, 1}, + all: &changelist.ChangeList{Migrators: make([]migrate.Migrator, 0)}, + resources: &changelist.ChangeList{Migrators: make([]migrate.Migrator, 0)}, + spans: &changelist.ChangeList{Migrators: make([]migrate.Migrator, 0)}, + spanEvents: &changelist.ChangeList{Migrators: make([]migrate.Migrator, 0)}, + metrics: &changelist.ChangeList{Migrators: make([]migrate.Migrator, 0)}, + logs: &changelist.ChangeList{Migrators: make([]migrate.Migrator, 0)}, }, }, { @@ -99,6 +99,8 @@ func TestNewRevision(t *testing.T) { "started": "application started", }, }, + }, + { RenameAttributes: &ast.RenameSpanEventAttributes{ ApplyToSpans: []types.SpanName{ "service running", @@ -130,6 +132,8 @@ func TestNewRevision(t *testing.T) { RenameMetrics: map[types.MetricName]types.MetricName{ "service.computed.uptime": "service.uptime", }, + }, + { RenameAttributes: &ast.AttributeMapForMetrics{ ApplyToMetrics: []types.MetricName{ "service.runtime", @@ -144,62 +148,112 @@ func TestNewRevision(t *testing.T) { }, expect: &RevisionV1{ ver: &Version{1, 0, 0}, - all: migrate.NewAttributeChangeSetSlice( - migrate.NewAttributeChangeSet(map[string]string{ - "state": "status", - }), - migrate.NewAttributeChangeSet(map[string]string{ - "status": "state", - }), - ), - resource: migrate.NewAttributeChangeSetSlice( - migrate.NewAttributeChangeSet(map[string]string{ - "service_name": "service.name", - }), - ), - spans: migrate.NewConditionalAttributeSetSlice( - migrate.NewConditionalAttributeSet( + all: &changelist.ChangeList{ + Migrators: []migrate.Migrator{ + transformer.AllAttributes{ + // initialize one of each transformer with the attribute set + MetricAttributes: transformer.MetricAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "state": "status", + }), + }, + LogAttributes: transformer.LogAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "state": "status", + }), + }, + SpanAttributes: transformer.SpanAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "state": "status", + }), + }, + SpanEventAttributes: transformer.SpanEventAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "state": "status", + }), + }, + ResourceAttributes: transformer.ResourceAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "state": "status", + }), + }, + }, + transformer.AllAttributes{ + // initialize one of each transformer with the attribute set + MetricAttributes: transformer.MetricAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "status": "state", + }), + }, + LogAttributes: transformer.LogAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "status": "state", + }), + }, + SpanAttributes: transformer.SpanAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "status": "state", + }), + }, + SpanEventAttributes: transformer.SpanEventAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "status": "state", + }), + }, + ResourceAttributes: transformer.ResourceAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "status": "state", + }), + }, + }, + }, + }, + resources: &changelist.ChangeList{Migrators: []migrate.Migrator{ + transformer.ResourceAttributes{AttributeChange: migrate.NewAttributeChangeSet( + map[string]string{"service_name": "service.name"}, + )}, + }}, + spans: &changelist.ChangeList{Migrators: []migrate.Migrator{ + transformer.SpanConditionalAttributes{Migrator: migrate.NewConditionalAttributeSet( map[string]string{"service_version": "service.version"}, "application start", - ), - migrate.NewConditionalAttributeSet[string]( + )}, + transformer.SpanConditionalAttributes{Migrator: migrate.NewConditionalAttributeSet[string]( map[string]string{"deployment.environment": "service.deployment.environment"}, - ), - ), - eventNames: migrate.NewSignalNameChangeSlice( - migrate.NewSignalNameChange(map[string]string{ - "started": "application started", - }), - ), - eventAttrsOnSpan: migrate.NewConditionalAttributeSetSlice( - migrate.NewConditionalAttributeSet( - map[string]string{ - "service.app.name": "service.name", - }, - "service running", - ), - ), - eventAttrsOnName: migrate.NewConditionalAttributeSetSlice( - migrate.NewConditionalAttributeSet( - map[string]string{ - "service.app.name": "service.name", - }, - "service errored", - ), - ), - metricsAttrs: migrate.NewConditionalAttributeSetSlice( - migrate.NewConditionalAttributeSet( - map[string]string{ - "runtime": "service.language", - }, - "service.runtime", - ), - ), - metricNames: migrate.NewSignalNameChangeSlice( - migrate.NewSignalNameChange(map[string]string{ + )}, + }}, + spanEvents: &changelist.ChangeList{Migrators: []migrate.Migrator{ + transformer.SpanEventSignalNameChange{ + SignalNameChange: migrate.NewSignalNameChange(map[string]string{ + "started": "application started", + }), + }, + transformer.SpanEventConditionalAttributes{ + MultiConditionalAttributeSet: migrate.NewMultiConditionalAttributeSet( + map[string]string{"service.app.name": "service.name"}, + map[string][]string{ + "span.name": {"service running"}, + "event.name": {"service errored"}, + }, + ), + }, + }}, + metrics: &changelist.ChangeList{Migrators: []migrate.Migrator{ + transformer.MetricSignalNameChange{SignalNameChange: migrate.NewSignalNameChange(map[string]string{ "service.computed.uptime": "service.uptime", - }), - ), + })}, + transformer.MetricDataPointAttributes{ConditionalAttributeChange: migrate.NewConditionalAttributeSet( + map[string]string{"runtime": "service.language"}, + "service.runtime", + )}, + }}, + logs: &changelist.ChangeList{Migrators: []migrate.Migrator{ + transformer.LogAttributes{ + AttributeChange: migrate.NewAttributeChangeSet(map[string]string{ + "ERROR": "error", + }), + }, + }}, }, }, } { @@ -209,7 +263,11 @@ func TestNewRevision(t *testing.T) { t.Parallel() rev := NewRevision(tc.inVersion, tc.inDefinition) - assert.EqualValues(t, tc.expect, rev, "Must match the expected values") + + // use go-cmp to compare tc.expect and rev and fail the test if there's a difference + if diff := cmp.Diff(tc.expect, rev, cmp.AllowUnexported(RevisionV1{}, migrate.AttributeChangeSet{}, migrate.ConditionalAttributeSet{}, migrate.SignalNameChange{}, transformer.SpanEventConditionalAttributes{}, migrate.MultiConditionalAttributeSet{})); diff != "" { + t.Errorf("NewRevisionV1() mismatch (-want +got):\n%s", diff) + } }) } } diff --git a/processor/schemaprocessor/metadata.yaml b/processor/schemaprocessor/metadata.yaml index 4914d3da2028..731ef9a26e34 100644 --- a/processor/schemaprocessor/metadata.yaml +++ b/processor/schemaprocessor/metadata.yaml @@ -6,7 +6,7 @@ status: development: [traces, metrics, logs] distributions: [] codeowners: - active: [MovieStoreGuy] + active: [MovieStoreGuy, ankitpatel96] tests: config: