Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[component] Make InstanceID immutable #10495

Merged
merged 8 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions .chloggen/immutable-instance-id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: componentstatus

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Make componentstatus.InstanceID immutable.

# One or more tracking issues or pull requests related to the change
issues: [10494]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
82 changes: 78 additions & 4 deletions component/componentstatus/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,88 @@

package componentstatus // import "go.opentelemetry.io/collector/component/componentstatus"

import "go.opentelemetry.io/collector/component"
import (
"slices"
"sort"
"strings"

"go.opentelemetry.io/collector/component"
)

// pipelineDelim is the delimiter for internal representation of pipeline
// component IDs.
const pipelineDelim = byte(0x20)

// InstanceID uniquely identifies a component instance
//
// TODO: consider moving this struct to a new package/module like `extension/statuswatcher`
// https://github.com/open-telemetry/opentelemetry-collector/issues/10764
type InstanceID struct {
ID component.ID
Kind component.Kind
PipelineIDs map[component.ID]struct{}
componentID component.ID
kind component.Kind
pipelineIDs string // IDs encoded as a string so InstanceID is Comparable.
}

// NewInstanceID returns an ID that uniquely identifies a component.
func NewInstanceID(componentID component.ID, kind component.Kind, pipelineIDs ...component.ID) *InstanceID {
instanceID := &InstanceID{
componentID: componentID,
kind: kind,
}
instanceID.addPipelines(pipelineIDs)
return instanceID
}

// ComponentID returns the ComponentID associated with this instance.
func (id *InstanceID) ComponentID() component.ID {
return id.componentID

Check warning on line 40 in component/componentstatus/instance.go

View check run for this annotation

Codecov / codecov/patch

component/componentstatus/instance.go#L39-L40

Added lines #L39 - L40 were not covered by tests
}

// Kind returns the component Kind associated with this instance.
func (id *InstanceID) Kind() component.Kind {
return id.kind

Check warning on line 45 in component/componentstatus/instance.go

View check run for this annotation

Codecov / codecov/patch

component/componentstatus/instance.go#L44-L45

Added lines #L44 - L45 were not covered by tests
}

// AllPipelineIDs calls f for each pipeline this instance is associated with. If
// f returns false it will stop iteration.
func (id *InstanceID) AllPipelineIDs(f func(component.ID) bool) {
var bs []byte
for _, b := range []byte(id.pipelineIDs) {
if b != pipelineDelim {
bs = append(bs, b)
continue
}
pipelineID := component.ID{}
err := pipelineID.UnmarshalText(bs)
bs = bs[:0]
if err != nil {
continue
}
if !f(pipelineID) {
break
}
}
}

// WithPipelines returns a new InstanceID updated to include the given
// pipelineIDs.
func (id *InstanceID) WithPipelines(pipelineIDs ...component.ID) *InstanceID {
instanceID := &InstanceID{
componentID: id.componentID,
kind: id.kind,
pipelineIDs: id.pipelineIDs,
}
instanceID.addPipelines(pipelineIDs)
return instanceID
}

func (id *InstanceID) addPipelines(pipelineIDs []component.ID) {
delim := string(pipelineDelim)
strIDs := strings.Split(id.pipelineIDs, delim)
for _, pID := range pipelineIDs {
strIDs = append(strIDs, pID.String())
}
sort.Strings(strIDs)
strIDs = slices.Compact(strIDs)
id.pipelineIDs = strings.Join(strIDs, delim) + delim
}
95 changes: 95 additions & 0 deletions component/componentstatus/instance_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package componentstatus

import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/component"
)

func TestInstanceID(t *testing.T) {
traces := component.MustNewID("traces")
tracesA := component.MustNewIDWithName("traces", "a")
tracesB := component.MustNewIDWithName("traces", "b")
tracesC := component.MustNewIDWithName("traces", "c")

idTracesA := NewInstanceID(traces, component.KindReceiver, tracesA)
idTracesAll := NewInstanceID(traces, component.KindReceiver, tracesA, tracesB, tracesC)
assert.NotEqual(t, idTracesA, idTracesAll)

assertHasPipelines := func(t *testing.T, instanceID *InstanceID, expectedPipelineIDs []component.ID) {
var pipelineIDs []component.ID
instanceID.AllPipelineIDs(func(id component.ID) bool {
pipelineIDs = append(pipelineIDs, id)
return true
})
assert.Equal(t, expectedPipelineIDs, pipelineIDs)
}

for _, tc := range []struct {
name string
id1 *InstanceID
id2 *InstanceID
pipelineIDs []component.ID
}{
{
name: "equal instances",
id1: idTracesA,
id2: NewInstanceID(traces, component.KindReceiver, tracesA),
pipelineIDs: []component.ID{tracesA},
},
{
name: "equal instances - out of order",
id1: idTracesAll,
id2: NewInstanceID(traces, component.KindReceiver, tracesC, tracesB, tracesA),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesB, tracesC),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
},
{
name: "with pipelines - out of order",
id1: idTracesAll,
id2: idTracesA.WithPipelines(tracesC, tracesB),
pipelineIDs: []component.ID{tracesA, tracesB, tracesC},
},
} {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.id1, tc.id2)
assertHasPipelines(t, tc.id1, tc.pipelineIDs)
assertHasPipelines(t, tc.id2, tc.pipelineIDs)
})
}
}

func TestAllPipelineIDs(t *testing.T) {
instanceID := NewInstanceID(
component.MustNewID("traces"),
component.KindReceiver,
component.MustNewIDWithName("traces", "a"),
component.MustNewIDWithName("traces", "b"),
component.MustNewIDWithName("traces", "c"),
)

count := 0
instanceID.AllPipelineIDs(func(component.ID) bool {
count++
return true
})
assert.Equal(t, 3, count)

count = 0
instanceID.AllPipelineIDs(func(component.ID) bool {
count++
return false
})
assert.Equal(t, 1, count)

}
2 changes: 1 addition & 1 deletion internal/e2e/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func Test_ComponentStatusReporting_SharedInstance(t *testing.T) {
assert.Equal(t, 5, len(eventsReceived))

for instanceID, events := range eventsReceived {
if instanceID.ID == component.NewID(component.MustNewType("test")) {
if instanceID.ComponentID() == component.NewID(component.MustNewType("test")) {
for i, e := range events {
if i == 0 {
assert.Equal(t, componentstatus.StatusStarting, e.Status())
Expand Down
4 changes: 2 additions & 2 deletions otelcol/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestComponentStatusWatcher(t *testing.T) {
changedComponents := map[*componentstatus.InstanceID][]componentstatus.Status{}
var mux sync.Mutex
onStatusChanged := func(source *componentstatus.InstanceID, event *componentstatus.Event) {
if source.ID.Type() != unhealthyProcessorFactory.Type() {
if source.ComponentID().Type() != unhealthyProcessorFactory.Type() {
return
}
mux.Lock()
Expand Down Expand Up @@ -200,7 +200,7 @@ func TestComponentStatusWatcher(t *testing.T) {

for k, v := range changedComponents {
// All processors must report a status change with the same ID
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID)
assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ComponentID())
// And all must have a valid startup sequence
assert.Equal(t, startupStatuses(v), v)
}
Expand Down
5 changes: 1 addition & 4 deletions service/extensions/extensions.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,7 @@ func New(ctx context.Context, set Settings, cfg Config, options ...Option) (*Ext
}

for _, extID := range cfg {
instanceID := &componentstatus.InstanceID{
ID: extID,
Kind: component.KindExtension,
}
instanceID := componentstatus.NewInstanceID(extID, component.KindExtension)
extSet := extension.Settings{
ID: extID,
TelemetrySettings: set.Telemetry,
Expand Down
54 changes: 19 additions & 35 deletions service/internal/graph/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,67 +197,51 @@ func (g *Graph) createNodes(set Settings) error {
func (g *Graph) createReceiver(pipelineID, recvID component.ID) *receiverNode {
rcvrNode := newReceiverNode(pipelineID.Type(), recvID)
if node := g.componentGraph.Node(rcvrNode.ID()); node != nil {
g.instanceIDs[node.ID()].PipelineIDs[pipelineID] = struct{}{}
instanceID := g.instanceIDs[node.ID()]
g.instanceIDs[node.ID()] = instanceID.WithPipelines(pipelineID)
return node.(*receiverNode)
}
g.componentGraph.AddNode(rcvrNode)
g.instanceIDs[rcvrNode.ID()] = &componentstatus.InstanceID{
ID: recvID,
Kind: component.KindReceiver,
PipelineIDs: map[component.ID]struct{}{
pipelineID: {},
},
}
g.instanceIDs[rcvrNode.ID()] = componentstatus.NewInstanceID(
recvID, component.KindReceiver, pipelineID,
)
return rcvrNode
}

func (g *Graph) createProcessor(pipelineID, procID component.ID) *processorNode {
procNode := newProcessorNode(pipelineID, procID)
g.componentGraph.AddNode(procNode)
g.instanceIDs[procNode.ID()] = &componentstatus.InstanceID{
ID: procID,
Kind: component.KindProcessor,
PipelineIDs: map[component.ID]struct{}{
pipelineID: {},
},
}
g.instanceIDs[procNode.ID()] = componentstatus.NewInstanceID(
procID, component.KindProcessor, pipelineID,
)
return procNode
}

func (g *Graph) createExporter(pipelineID, exprID component.ID) *exporterNode {
expNode := newExporterNode(pipelineID.Type(), exprID)
if node := g.componentGraph.Node(expNode.ID()); node != nil {
g.instanceIDs[expNode.ID()].PipelineIDs[pipelineID] = struct{}{}
instanceID := g.instanceIDs[expNode.ID()]
g.instanceIDs[expNode.ID()] = instanceID.WithPipelines(pipelineID)
return node.(*exporterNode)
}
g.componentGraph.AddNode(expNode)
g.instanceIDs[expNode.ID()] = &componentstatus.InstanceID{
ID: expNode.componentID,
Kind: component.KindExporter,
PipelineIDs: map[component.ID]struct{}{
pipelineID: {},
},
}
g.instanceIDs[expNode.ID()] = componentstatus.NewInstanceID(
expNode.componentID, component.KindExporter, pipelineID,
)
return expNode
}

func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode {
connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID)
if node := g.componentGraph.Node(connNode.ID()); node != nil {
instanceID := g.instanceIDs[connNode.ID()]
instanceID.PipelineIDs[exprPipelineID] = struct{}{}
instanceID.PipelineIDs[rcvrPipelineID] = struct{}{}
g.instanceIDs[connNode.ID()] = instanceID.WithPipelines(exprPipelineID, rcvrPipelineID)
return node.(*connectorNode)
}
g.componentGraph.AddNode(connNode)
g.instanceIDs[connNode.ID()] = &componentstatus.InstanceID{
ID: connNode.componentID,
Kind: component.KindConnector,
PipelineIDs: map[component.ID]struct{}{
exprPipelineID: {},
rcvrPipelineID: {},
},
}
g.instanceIDs[connNode.ID()] = componentstatus.NewInstanceID(
connNode.componentID, component.KindConnector, exprPipelineID, rcvrPipelineID,
)
return connNode
}

Expand Down Expand Up @@ -429,8 +413,8 @@ func (g *Graph) StartAll(ctx context.Context, host *Host) error {
g.telemetry.Logger.WithOptions(zap.AddStacktrace(zap.DPanicLevel)).
Error("Failed to start component",
zap.Error(compErr),
zap.String("type", instanceID.Kind.String()),
zap.String("id", instanceID.ID.String()),
zap.String("type", instanceID.Kind().String()),
zap.String("id", instanceID.ComponentID().String()),
)
return compErr
}
Expand Down
12 changes: 6 additions & 6 deletions service/internal/graph/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2260,12 +2260,12 @@ func TestStatusReportedOnStartupShutdown(t *testing.T) {
eSdErr := &testNode{id: component.MustNewIDWithName("e_sd_err", "1"), shutdownErr: assert.AnError}

instanceIDs := map[*testNode]*componentstatus.InstanceID{
rNoErr: {ID: rNoErr.id},
rStErr: {ID: rStErr.id},
rSdErr: {ID: rSdErr.id},
eNoErr: {ID: eNoErr.id},
eStErr: {ID: eStErr.id},
eSdErr: {ID: eSdErr.id},
rNoErr: componentstatus.NewInstanceID(rNoErr.id, component.KindReceiver),
rStErr: componentstatus.NewInstanceID(rStErr.id, component.KindReceiver),
rSdErr: componentstatus.NewInstanceID(rSdErr.id, component.KindReceiver),
eNoErr: componentstatus.NewInstanceID(eNoErr.id, component.KindExporter),
eStErr: componentstatus.NewInstanceID(eStErr.id, component.KindExporter),
eSdErr: componentstatus.NewInstanceID(eSdErr.id, component.KindExporter),
}

// compare two maps of status events ignoring timestamp
Expand Down
Loading