From 26d02fba7c7977500ceb481698d18e906f6942c1 Mon Sep 17 00:00:00 2001 From: Matthew Wear Date: Fri, 28 Jun 2024 16:06:38 -0700 Subject: [PATCH] Make InstanceID immutable --- .chloggen/immutable-instance-id.yaml | 25 +++++++++++++ component/component.go | 53 +++++++++++++++++++++++++--- component/component_test.go | 24 +++++++++++++ otelcol/collector_test.go | 4 +-- service/extensions/extensions.go | 5 +-- service/internal/graph/graph.go | 53 +++++++++++----------------- service/internal/graph/graph_test.go | 12 +++---- 7 files changed, 127 insertions(+), 49 deletions(-) create mode 100644 .chloggen/immutable-instance-id.yaml diff --git a/.chloggen/immutable-instance-id.yaml b/.chloggen/immutable-instance-id.yaml new file mode 100644 index 00000000000..d73277062ff --- /dev/null +++ b/.chloggen/immutable-instance-id.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'breaking' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Make InstanceID immutable. This hides previously exported fields, making it a breaking change. + +# One or more tracking issues or pull requests related to the change +issues: [10494] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/component/component.go b/component/component.go index f5f68b57290..8aba8658c2a 100644 --- a/component/component.go +++ b/component/component.go @@ -190,9 +190,54 @@ func (f CreateDefaultConfigFunc) CreateDefaultConfig() Config { return f() } -// InstanceID uniquely identifies a component instance +// InstanceID uniquely identifies a component instance. type InstanceID struct { - ID ID - Kind Kind - PipelineIDs map[ID]struct{} + componentID ID + kind Kind + pipelineIDs map[ID]struct{} +} + +// ComponentID returns the ComponentID associated with this instance. +func (id InstanceID) ComponentID() ID { + return id.componentID +} + +// Kind returns the component Kind associated with this instance. +func (id InstanceID) Kind() Kind { + return id.kind +} + +// PipelineIDs returns a set of PipelineIDs associated with this instance. +func (id InstanceID) PipelineIDs() map[ID]struct{} { + return id.pipelineIDs +} + +// NewInstanceID returns an ID that uniquely identifies a component. +func NewInstanceID(componentID ID, kind Kind, pipelineIDs ...ID) *InstanceID { + instanceID := InstanceID{ + componentID: componentID, + kind: kind, + pipelineIDs: make(map[ID]struct{}, len(pipelineIDs)), + } + for _, pid := range pipelineIDs { + instanceID.pipelineIDs[pid] = struct{}{} + } + return &instanceID +} + +// InstanceIDWithPipelines derives a new InstanceID from id with additional +// pipelineIDs added to it. +func InstanceIDWithPipelines(id *InstanceID, pipelineIDs ...ID) *InstanceID { + instanceID := InstanceID{ + componentID: id.ComponentID(), + kind: id.Kind(), + pipelineIDs: make(map[ID]struct{}, len(id.PipelineIDs())+len(pipelineIDs)), + } + for pid := range id.PipelineIDs() { + instanceID.pipelineIDs[pid] = struct{}{} + } + for _, pid := range pipelineIDs { + instanceID.pipelineIDs[pid] = struct{}{} + } + return &instanceID } diff --git a/component/component_test.go b/component/component_test.go index 8b694fb4303..828357958cb 100644 --- a/component/component_test.go +++ b/component/component_test.go @@ -29,3 +29,27 @@ func TestStabilityLevelString(t *testing.T) { assert.EqualValues(t, "Stable", StabilityLevelStable.String()) assert.EqualValues(t, "", StabilityLevel(100).String()) } + +func TestInstanceID(t *testing.T) { + traces := MustNewID("traces") + metrics := MustNewID("metrics") + logs := MustNewID("logs") + receiver := MustNewID("receiver") + + id1 := NewInstanceID(receiver, KindReceiver, traces) + id2 := InstanceIDWithPipelines(id1, metrics, logs) + + assert.Equal(t, receiver, id1.ComponentID()) + assert.Equal(t, KindReceiver, id1.Kind()) + assert.Equal(t, map[ID]struct{}{ + traces: {}, + }, id1.pipelineIDs) + + assert.Equal(t, receiver, id2.ComponentID()) + assert.Equal(t, KindReceiver, id2.Kind()) + assert.Equal(t, map[ID]struct{}{ + traces: {}, + metrics: {}, + logs: {}, + }, id2.pipelineIDs) +} diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index e1a2356a553..60b8de9854d 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -148,7 +148,7 @@ func TestComponentStatusWatcher(t *testing.T) { changedComponents := map[*component.InstanceID][]component.Status{} var mux sync.Mutex onStatusChanged := func(source *component.InstanceID, event *component.StatusEvent) { - if source.ID.Type() != unhealthyProcessorFactory.Type() { + if source.ComponentID().Type() != unhealthyProcessorFactory.Type() { return } mux.Lock() @@ -199,7 +199,7 @@ func TestComponentStatusWatcher(t *testing.T) { for k, v := range changedComponents { // All processors must report a status change with the same ID - assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID) + assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ComponentID()) // And all must have a valid startup sequence assert.Equal(t, startupStatuses(v), v) } diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index c8f632965ca..eb728cc2e22 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -179,10 +179,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { extensionIDs: make([]component.ID, 0, len(cfg)), } for _, extID := range cfg { - instanceID := &component.InstanceID{ - ID: extID, - Kind: component.KindExtension, - } + instanceID := component.NewInstanceID(extID, component.KindExtension) extSet := extension.Settings{ ID: extID, TelemetrySettings: set.Telemetry.ToComponentTelemetrySettings(instanceID), diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index cbf7e8538c5..5058486424c 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -193,47 +193,37 @@ func (g *Graph) createNodes(set Settings) error { func (g *Graph) createReceiver(pipelineID, recvID component.ID) *receiverNode { rcvrNode := newReceiverNode(pipelineID.Type(), recvID) if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { - g.instanceIDs[node.ID()].PipelineIDs[pipelineID] = struct{}{} + instanceID := g.instanceIDs[node.ID()] + g.instanceIDs[node.ID()] = component.InstanceIDWithPipelines(instanceID, pipelineID) return node.(*receiverNode) } g.componentGraph.AddNode(rcvrNode) - g.instanceIDs[rcvrNode.ID()] = &component.InstanceID{ - ID: recvID, - Kind: component.KindReceiver, - PipelineIDs: map[component.ID]struct{}{ - pipelineID: {}, - }, - } + g.instanceIDs[rcvrNode.ID()] = component.NewInstanceID( + recvID, component.KindReceiver, pipelineID, + ) return rcvrNode } func (g *Graph) createProcessor(pipelineID, procID component.ID) *processorNode { procNode := newProcessorNode(pipelineID, procID) g.componentGraph.AddNode(procNode) - g.instanceIDs[procNode.ID()] = &component.InstanceID{ - ID: procID, - Kind: component.KindProcessor, - PipelineIDs: map[component.ID]struct{}{ - pipelineID: {}, - }, - } + g.instanceIDs[procNode.ID()] = component.NewInstanceID( + procID, component.KindProcessor, pipelineID, + ) return procNode } func (g *Graph) createExporter(pipelineID, exprID component.ID) *exporterNode { expNode := newExporterNode(pipelineID.Type(), exprID) if node := g.componentGraph.Node(expNode.ID()); node != nil { - g.instanceIDs[expNode.ID()].PipelineIDs[pipelineID] = struct{}{} + instanceID := g.instanceIDs[expNode.ID()] + g.instanceIDs[expNode.ID()] = component.InstanceIDWithPipelines(instanceID, pipelineID) return node.(*exporterNode) } g.componentGraph.AddNode(expNode) - g.instanceIDs[expNode.ID()] = &component.InstanceID{ - ID: expNode.componentID, - Kind: component.KindExporter, - PipelineIDs: map[component.ID]struct{}{ - pipelineID: {}, - }, - } + g.instanceIDs[expNode.ID()] = component.NewInstanceID( + expNode.componentID, component.KindExporter, pipelineID, + ) return expNode } @@ -241,19 +231,16 @@ func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID) if node := g.componentGraph.Node(connNode.ID()); node != nil { instanceID := g.instanceIDs[connNode.ID()] - instanceID.PipelineIDs[exprPipelineID] = struct{}{} - instanceID.PipelineIDs[rcvrPipelineID] = struct{}{} + g.instanceIDs[connNode.ID()] = component.InstanceIDWithPipelines( + instanceID, exprPipelineID, rcvrPipelineID, + ) return node.(*connectorNode) } g.componentGraph.AddNode(connNode) - g.instanceIDs[connNode.ID()] = &component.InstanceID{ - ID: connNode.componentID, - Kind: component.KindConnector, - PipelineIDs: map[component.ID]struct{}{ - exprPipelineID: {}, - rcvrPipelineID: {}, - }, - } + + g.instanceIDs[connNode.ID()] = component.NewInstanceID( + connNode.componentID, component.KindConnector, exprPipelineID, rcvrPipelineID, + ) return connNode } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 68ba9f04bed..b639eca534c 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -2215,12 +2215,12 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) { eSdErr := &testNode{id: component.MustNewIDWithName("e_sd_err", "1"), shutdownErr: assert.AnError} instanceIDs := map[*testNode]*component.InstanceID{ - rNoErr: {ID: rNoErr.id}, - rStErr: {ID: rStErr.id}, - rSdErr: {ID: rSdErr.id}, - eNoErr: {ID: eNoErr.id}, - eStErr: {ID: eStErr.id}, - eSdErr: {ID: eSdErr.id}, + rNoErr: component.NewInstanceID(rNoErr.id, component.KindReceiver), + rStErr: component.NewInstanceID(rStErr.id, component.KindReceiver), + rSdErr: component.NewInstanceID(rSdErr.id, component.KindReceiver), + eNoErr: component.NewInstanceID(eNoErr.id, component.KindExporter), + eStErr: component.NewInstanceID(eStErr.id, component.KindExporter), + eSdErr: component.NewInstanceID(eSdErr.id, component.KindExporter), } // compare two maps of status events ignoring timestamp