diff --git a/.chloggen/component-status.yaml b/.chloggen/component-status.yaml new file mode 100755 index 00000000000..f920bb8c897 --- /dev/null +++ b/.chloggen/component-status.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: core + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adds the ability for components to report status and for extensions to subscribe to status events by implementing an optional StatusWatcher interface. + +# One or more tracking issues or pull requests related to the change +issues: [7682] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/component/component.go b/component/component.go index 9a6a95d798a..3b3fe3fc7cf 100644 --- a/component/component.go +++ b/component/component.go @@ -175,3 +175,10 @@ type CreateDefaultConfigFunc func() Config func (f CreateDefaultConfigFunc) CreateDefaultConfig() Config { return f() } + +// InstanceID uniquely identifies a component instance +type InstanceID struct { + ID ID + Kind Kind + PipelineIDs map[ID]struct{} +} diff --git a/component/componenttest/nop_telemetry.go b/component/componenttest/nop_telemetry.go index 438f9ec761a..a14abfb8978 100644 --- a/component/componenttest/nop_telemetry.go +++ b/component/componenttest/nop_telemetry.go @@ -21,5 +21,8 @@ func NewNopTelemetrySettings() component.TelemetrySettings { MeterProvider: noop.NewMeterProvider(), MetricsLevel: configtelemetry.LevelNone, Resource: pcommon.NewResource(), + ReportComponentStatus: func(*component.StatusEvent) error { + return nil + }, } } diff --git a/component/host.go b/component/host.go index ea3825d743f..732e37c8c44 100644 --- a/component/host.go +++ b/component/host.go @@ -12,6 +12,8 @@ type Host interface { // // ReportFatalError should be called by the component anytime after Component.Start() ends and // before Component.Shutdown() begins. + // Deprecated: [0.87.0] Use TelemetrySettings.ReportComponentStatus instead (with an event + // component.StatusFatalError) ReportFatalError(err error) // GetFactory of the specified kind. Returns the factory for a component type. diff --git a/component/status.go b/component/status.go new file mode 100644 index 00000000000..36a449edc12 --- /dev/null +++ b/component/status.go @@ -0,0 +1,193 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package component // import "go.opentelemetry.io/collector/component" + +import ( + "time" +) + +type Status int32 + +// Enumeration of possible component statuses +const ( + StatusNone Status = iota + StatusStarting + StatusOK + StatusRecoverableError + StatusPermanentError + StatusFatalError + StatusStopping + StatusStopped +) + +// String returns a string representation of a Status +func (s Status) String() string { + switch s { + case StatusStarting: + return "StatusStarting" + case StatusOK: + return "StatusOK" + case StatusRecoverableError: + return "StatusRecoverableError" + case StatusPermanentError: + return "StatusPermanentError" + case StatusFatalError: + return "StatusFatalError" + case StatusStopping: + return "StatusStopping" + case StatusStopped: + return "StatusStopped" + } + return "StatusNone" +} + +// StatusEvent contains a status and timestamp, and can contain an error +type StatusEvent struct { + status Status + err error + timestamp time.Time +} + +// Status returns the Status (enum) associated with the StatusEvent +func (ev *StatusEvent) Status() Status { + return ev.status +} + +// Err returns the error associated with the StatusEvent. +func (ev *StatusEvent) Err() error { + return ev.err +} + +// Timestamp returns the timestamp associated with the StatusEvent +func (ev *StatusEvent) Timestamp() time.Time { + return ev.timestamp +} + +// NewStatusEvent creates and returns a StatusEvent with the specified status and sets the timestamp +// time.Now(). To set an error on the event for an error status use one of the dedicated +// constructors (e.g. NewRecoverableErrorEvent, NewPermanentErrorEvent, NewFatalErrorEvent) +func NewStatusEvent(status Status) *StatusEvent { + return &StatusEvent{ + status: status, + timestamp: time.Now(), + } +} + +// NewRecoverableErrorEvent creates and returns a StatusEvent with StatusRecoverableError, the +// specified error, and a timestamp set to time.Now(). +func NewRecoverableErrorEvent(err error) *StatusEvent { + ev := NewStatusEvent(StatusRecoverableError) + ev.err = err + return ev +} + +// NewPermanentErrorEvent creates and returns a StatusEvent with StatusPermanentError, the +// specified error, and a timestamp set to time.Now(). +func NewPermanentErrorEvent(err error) *StatusEvent { + ev := NewStatusEvent(StatusPermanentError) + ev.err = err + return ev +} + +// NewFatalErrorEvent creates and returns a StatusEvent with StatusFatalError, the +// specified error, and a timestamp set to time.Now(). +func NewFatalErrorEvent(err error) *StatusEvent { + ev := NewStatusEvent(StatusFatalError) + ev.err = err + return ev +} + +// StatusFunc is the expected type of ReportComponentStatus for component.TelemetrySettings +type StatusFunc func(*StatusEvent) error + +// AggregateStatus will derive a status for the given input using the following rules in order: +// 1. If all instances have the same status, there is nothing to aggregate, return it. +// 2. If any instance encounters a fatal error, the component is in a Fatal Error state. +// 3. If any instance is in a Permanent Error state, the component status is Permanent Error. +// 4. If any instance is Stopping, the component is in a Stopping state. +// 5. An instance is Stopped, but not all instances are Stopped, we must be in the process of Stopping the component. +// 6. If any instance is in a Recoverable Error state, the component status is Recoverable Error. +// 7. By process of elimination, the only remaining state is starting. +func AggregateStatus[K comparable](eventMap map[K]*StatusEvent) Status { + seen := make(map[Status]struct{}) + for _, ev := range eventMap { + seen[ev.Status()] = struct{}{} + } + + // All statuses are the same. Note, this will handle StatusOK and StatusStopped as these two + // cases require all components be in the same state. + if len(seen) == 1 { + for st := range seen { + return st + } + } + + // Handle mixed status cases + if _, isFatal := seen[StatusFatalError]; isFatal { + return StatusFatalError + } + + if _, isPermanent := seen[StatusPermanentError]; isPermanent { + return StatusPermanentError + } + + if _, isStopping := seen[StatusStopping]; isStopping { + return StatusStopping + } + + if _, isStopped := seen[StatusStopped]; isStopped { + return StatusStopping + } + + if _, isRecoverable := seen[StatusRecoverableError]; isRecoverable { + return StatusRecoverableError + } + + // By process of elimination, this is the last possible status; no check necessary. + return StatusStarting +} + +// StatusIsError returns true for error statuses (e.g. StatusRecoverableError, +// StatusPermanentError, or StatusFatalError) +func StatusIsError(status Status) bool { + return status == StatusRecoverableError || + status == StatusPermanentError || + status == StatusFatalError +} + +// AggregateStatusEvent returns a status event where: +// - The status is set to the aggregate status of the events in the eventMap +// - The timestamp is set to the latest timestamp of the events in the eventMap +// - For an error status, the event will have same error as the most current event of the same +// error type from the eventMap +func AggregateStatusEvent[K comparable](eventMap map[K]*StatusEvent) *StatusEvent { + var lastEvent, lastMatchingEvent *StatusEvent + aggregateStatus := AggregateStatus[K](eventMap) + + for _, ev := range eventMap { + if lastEvent == nil || lastEvent.timestamp.Before(ev.timestamp) { + lastEvent = ev + } + if aggregateStatus == ev.Status() && + (lastMatchingEvent == nil || lastMatchingEvent.timestamp.Before(ev.timestamp)) { + lastMatchingEvent = ev + } + } + + // the effective status matches an existing event + if lastEvent.Status() == aggregateStatus { + return lastEvent + } + + // the effective status requires a synthetic event + aggregateEvent := &StatusEvent{ + status: aggregateStatus, + timestamp: lastEvent.timestamp, + } + if StatusIsError(aggregateStatus) { + aggregateEvent.err = lastMatchingEvent.err + } + + return aggregateEvent +} diff --git a/component/status_test.go b/component/status_test.go new file mode 100644 index 00000000000..13755d078a5 --- /dev/null +++ b/component/status_test.go @@ -0,0 +1,330 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 +package component + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewStatusEvent(t *testing.T) { + statuses := []Status{ + StatusStarting, + StatusOK, + StatusRecoverableError, + StatusPermanentError, + StatusFatalError, + StatusStopping, + StatusStopped, + } + + for _, status := range statuses { + t.Run(fmt.Sprintf("%s without error", status), func(t *testing.T) { + ev := NewStatusEvent(status) + require.Equal(t, status, ev.Status()) + require.Nil(t, ev.Err()) + require.False(t, ev.Timestamp().IsZero()) + }) + } +} + +func TestStatusEventsWithError(t *testing.T) { + statusConstructorMap := map[Status]func(error) *StatusEvent{ + StatusRecoverableError: NewRecoverableErrorEvent, + StatusPermanentError: NewPermanentErrorEvent, + StatusFatalError: NewFatalErrorEvent, + } + + for status, newEvent := range statusConstructorMap { + t.Run(fmt.Sprintf("error status constructor for: %s", status), func(t *testing.T) { + ev := newEvent(assert.AnError) + require.Equal(t, status, ev.Status()) + require.Equal(t, assert.AnError, ev.Err()) + require.False(t, ev.Timestamp().IsZero()) + }) + } +} + +func TestAggregateStatus(t *testing.T) { + for _, tc := range []struct { + name string + statusMap map[*InstanceID]*StatusEvent + expectedStatus Status + }{ + { + name: "aggregate status with fatal is FatalError", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: NewStatusEvent(StatusFatalError), + {}: NewStatusEvent(StatusRecoverableError), + }, + expectedStatus: StatusFatalError, + }, + { + name: "aggregate status with permanent is PermanentError", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: NewStatusEvent(StatusPermanentError), + {}: NewStatusEvent(StatusRecoverableError), + }, + expectedStatus: StatusPermanentError, + }, + { + name: "aggregate status with stopping is Stopping", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: NewStatusEvent(StatusRecoverableError), + {}: NewStatusEvent(StatusStopping), + }, + expectedStatus: StatusStopping, + }, + { + name: "aggregate status with stopped and non-stopped is Stopping", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: NewStatusEvent(StatusRecoverableError), + {}: NewStatusEvent(StatusStopped), + }, + expectedStatus: StatusStopping, + }, + { + name: "aggregate status with all stopped is Stopped", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStopped), + {}: NewStatusEvent(StatusStopped), + {}: NewStatusEvent(StatusStopped), + }, + expectedStatus: StatusStopped, + }, + { + name: "aggregate status with recoverable is RecoverableError", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: NewStatusEvent(StatusRecoverableError), + }, + expectedStatus: StatusRecoverableError, + }, + { + name: "aggregate status with starting is Starting", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + }, + expectedStatus: StatusStarting, + }, + { + name: "aggregate status with all ok is OK", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusOK), + {}: NewStatusEvent(StatusOK), + {}: NewStatusEvent(StatusOK), + }, + expectedStatus: StatusOK, + }, + } { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expectedStatus, AggregateStatus(tc.statusMap)) + }) + } +} + +func TestStatusIsError(t *testing.T) { + for _, tc := range []struct { + status Status + isError bool + }{ + { + status: StatusStarting, + isError: false, + }, + { + status: StatusOK, + isError: false, + }, + { + status: StatusRecoverableError, + isError: true, + }, + { + status: StatusPermanentError, + isError: true, + }, + { + status: StatusFatalError, + isError: true, + }, + { + status: StatusStopping, + isError: false, + }, + { + status: StatusStopped, + isError: false, + }, + } { + name := fmt.Sprintf("StatusIsError(%s) is %t", tc.status, tc.isError) + t.Run(name, func(t *testing.T) { + assert.Equal(t, tc.isError, StatusIsError(tc.status)) + }) + } +} + +func TestAggregateStatusEvent(t *testing.T) { + // maxTime is used to make sure we select the event with the latest timestamp + maxTime := time.Unix(1<<63-62135596801, 999999999) + // latest sets the timestamp for an event to maxTime + latest := func(ev *StatusEvent) *StatusEvent { + ev.timestamp = maxTime + return ev + } + + for _, tc := range []struct { + name string + statusMap map[*InstanceID]*StatusEvent + expectedStatus *StatusEvent + }{ + { + name: "FatalError - existing event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: latest(NewFatalErrorEvent(assert.AnError)), + {}: NewStatusEvent(StatusRecoverableError), + }, + expectedStatus: &StatusEvent{ + status: StatusFatalError, + timestamp: maxTime, + err: assert.AnError, + }, + }, + { + name: "FatalError - synthetic event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: NewFatalErrorEvent(assert.AnError), + {}: latest(NewStatusEvent(StatusRecoverableError)), + }, + expectedStatus: &StatusEvent{ + status: StatusFatalError, + timestamp: maxTime, + err: assert.AnError, + }, + }, + { + name: "PermanentError - existing event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: latest(NewPermanentErrorEvent(assert.AnError)), + {}: NewStatusEvent(StatusRecoverableError), + }, + expectedStatus: &StatusEvent{ + status: StatusPermanentError, + timestamp: maxTime, + err: assert.AnError, + }, + }, + { + name: "PermanentError - synthetic event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: NewPermanentErrorEvent(assert.AnError), + {}: latest(NewStatusEvent(StatusRecoverableError)), + }, + expectedStatus: &StatusEvent{ + status: StatusPermanentError, + timestamp: maxTime, + err: assert.AnError, + }, + }, + { + name: "Stopping - existing event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: NewStatusEvent(StatusRecoverableError), + {}: latest(NewStatusEvent(StatusStopping)), + }, + expectedStatus: &StatusEvent{ + status: StatusStopping, + timestamp: maxTime, + }, + }, + { + name: "Stopping - synthetic event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: NewStatusEvent(StatusRecoverableError), + {}: latest(NewStatusEvent(StatusStopped)), + }, + expectedStatus: &StatusEvent{ + status: StatusStopping, + timestamp: maxTime, + }, + }, + { + name: "Stopped - existing event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStopped), + {}: latest(NewStatusEvent(StatusStopped)), + {}: NewStatusEvent(StatusStopped), + }, + expectedStatus: &StatusEvent{ + status: StatusStopped, + timestamp: maxTime, + }, + }, + { + name: "RecoverableError - existing event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: NewStatusEvent(StatusOK), + {}: latest(NewRecoverableErrorEvent(assert.AnError)), + }, + expectedStatus: &StatusEvent{ + status: StatusRecoverableError, + timestamp: maxTime, + err: assert.AnError, + }, + }, + { + name: "Starting - synthetic event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusStarting), + {}: latest(NewStatusEvent(StatusOK)), + }, + expectedStatus: &StatusEvent{ + status: StatusStarting, + timestamp: maxTime, + }, + }, + { + name: "OK - existing event", + statusMap: map[*InstanceID]*StatusEvent{ + {}: NewStatusEvent(StatusOK), + {}: latest(NewStatusEvent(StatusOK)), + {}: NewStatusEvent(StatusOK), + }, + expectedStatus: &StatusEvent{ + status: StatusOK, + timestamp: maxTime, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + assert.Equal(t, tc.expectedStatus, AggregateStatusEvent(tc.statusMap)) + }) + } +} diff --git a/component/telemetry.go b/component/telemetry.go index 9617e456319..5eb6bcf457a 100644 --- a/component/telemetry.go +++ b/component/telemetry.go @@ -12,7 +12,7 @@ import ( "go.opentelemetry.io/collector/pdata/pcommon" ) -type TelemetrySettings struct { +type TelemetrySettingsBase[T any] struct { // Logger that the factory can use during creation and can pass to the created // component to be used later as well. Logger *zap.Logger @@ -29,4 +29,20 @@ type TelemetrySettings struct { // Resource contains the resource attributes for the collector's telemetry. Resource pcommon.Resource + + // ReportComponentStatus allows a component to report runtime changes in status. The service + // will automatically report status for a component during startup and shutdown. Components can + // use this method to report status after start and before shutdown. ReportComponentStatus + // will only return errors if the API used incorrectly. The two scenarios where an error will + // be returned are: + // + // - An illegal state transition + // - Calling this method before component startup + // + // If the API is being used properly, these errors are safe to ignore. + ReportComponentStatus T } + +// TelemetrySettings and servicetelemetry.Settings differ in the method signature for +// ReportComponentStatus +type TelemetrySettings TelemetrySettingsBase[StatusFunc] diff --git a/extension/extension.go b/extension/extension.go index 6b8df571b81..2521fc65a18 100644 --- a/extension/extension.go +++ b/extension/extension.go @@ -40,6 +40,17 @@ type ConfigWatcher interface { NotifyConfig(ctx context.Context, conf *confmap.Conf) error } +// StatusWatcher is an extra interface for Extension hosted by the OpenTelemetry +// Collector that is to be implemented by extensions interested in changes to component +// status. +type StatusWatcher interface { + // ComponentStatusChanged notifies about a change in the source component status. + // Extensions that implement this interface must be ready that the ComponentStatusChanged + // may be called before, after or concurrently with calls to Component.Start() and Component.Shutdown(). + // The function may be called concurrently with itself. + ComponentStatusChanged(source *component.InstanceID, event *component.StatusEvent) +} + // CreateSettings is passed to Factory.Create(...) function. type CreateSettings struct { // ID returns the ID of the component that will be created. diff --git a/extension/extensiontest/statuswatcher_extension.go b/extension/extensiontest/statuswatcher_extension.go new file mode 100644 index 00000000000..e501353dcc6 --- /dev/null +++ b/extension/extensiontest/statuswatcher_extension.go @@ -0,0 +1,47 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extensiontest // import "go.opentelemetry.io/collector/extension/extensiontest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/extension" +) + +// NewStatusWatcherExtensionCreateSettings returns a new nop settings for Create*Extension functions. +func NewStatusWatcherExtensionCreateSettings() extension.CreateSettings { + return extension.CreateSettings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +// NewStatusWatcherExtensionFactory returns a component.ExtensionFactory to construct a status watcher extension. +func NewStatusWatcherExtensionFactory( + onStatusChanged func(source *component.InstanceID, event *component.StatusEvent), +) extension.Factory { + return extension.NewFactory( + "statuswatcher", + func() component.Config { + return &struct{}{} + }, + func(context.Context, extension.CreateSettings, component.Config) (component.Component, error) { + return &statusWatcherExtension{onStatusChanged: onStatusChanged}, nil + }, + component.StabilityLevelStable) +} + +// statusWatcherExtension receives status events reported via component status reporting for testing +// purposes. +type statusWatcherExtension struct { + component.StartFunc + component.ShutdownFunc + onStatusChanged func(source *component.InstanceID, event *component.StatusEvent) +} + +func (e statusWatcherExtension) ComponentStatusChanged(source *component.InstanceID, event *component.StatusEvent) { + e.onStatusChanged(source, event) +} diff --git a/extension/extensiontest/statuswatcher_extension_test.go b/extension/extensiontest/statuswatcher_extension_test.go new file mode 100644 index 00000000000..0bfdf4f5eda --- /dev/null +++ b/extension/extensiontest/statuswatcher_extension_test.go @@ -0,0 +1,39 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package extensiontest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/extension" +) + +func TestStatusWatcherExtension(t *testing.T) { + statusChanged := false + factory := NewStatusWatcherExtensionFactory( + func(*component.InstanceID, *component.StatusEvent) { + statusChanged = true + }, + ) + require.NotNil(t, factory) + assert.Equal(t, component.Type("statuswatcher"), factory.Type()) + cfg := factory.CreateDefaultConfig() + assert.Equal(t, &struct{}{}, cfg) + + ext, err := factory.CreateExtension(context.Background(), NewStatusWatcherExtensionCreateSettings(), cfg) + require.NoError(t, err) + assert.NoError(t, ext.Start(context.Background(), componenttest.NewNopHost())) + assert.False(t, statusChanged) + + ext.(extension.StatusWatcher).ComponentStatusChanged(&component.InstanceID{}, &component.StatusEvent{}) + + assert.True(t, statusChanged) + assert.NoError(t, ext.Shutdown(context.Background())) +} diff --git a/internal/sharedcomponent/sharedcomponent.go b/internal/sharedcomponent/sharedcomponent.go index 2d1c74f355e..cddebb59902 100644 --- a/internal/sharedcomponent/sharedcomponent.go +++ b/internal/sharedcomponent/sharedcomponent.go @@ -27,19 +27,37 @@ func NewSharedComponents[K comparable, V component.Component]() *SharedComponent // GetOrAdd returns the already created instance if exists, otherwise creates a new instance // and adds it to the map of references. -func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error)) (*SharedComponent[V], error) { +func (scs *SharedComponents[K, V]) GetOrAdd(key K, create func() (V, error), telemetrySettings *component.TelemetrySettings) (*SharedComponent[V], error) { if c, ok := scs.comps[key]; ok { + // If we haven't already seen this telemetry settings, this shared component represents + // another instance. Wrap ReportComponentStatus to report for all instances this shared + // component represents. + if _, ok := c.seenSettings[telemetrySettings]; !ok { + c.seenSettings[telemetrySettings] = struct{}{} + prev := c.telemetry.ReportComponentStatus + c.telemetry.ReportComponentStatus = func(ev *component.StatusEvent) error { + if err := telemetrySettings.ReportComponentStatus(ev); err != nil { + return err + } + return prev(ev) + } + } return c, nil } comp, err := create() if err != nil { return nil, err } + newComp := &SharedComponent[V]{ component: comp, removeFunc: func() { delete(scs.comps, key) }, + telemetry: telemetrySettings, + seenSettings: map[*component.TelemetrySettings]struct{}{ + telemetrySettings: {}, + }, } scs.comps[key] = newComp return newComp, nil @@ -53,6 +71,9 @@ type SharedComponent[V component.Component] struct { startOnce sync.Once stopOnce sync.Once removeFunc func() + + telemetry *component.TelemetrySettings + seenSettings map[*component.TelemetrySettings]struct{} } // Unwrap returns the original component. @@ -64,7 +85,14 @@ func (r *SharedComponent[V]) Unwrap() V { func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) error { var err error r.startOnce.Do(func() { - err = r.component.Start(ctx, host) + // It's important that status for a sharedcomponent is reported through its + // telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates + // and takes priority over the automated status reporting that happens in graph, making the + // status reporting in graph a no-op. + _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)) + if err = r.component.Start(ctx, host); err != nil { + _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) + } }) return err } @@ -73,7 +101,17 @@ func (r *SharedComponent[V]) Start(ctx context.Context, host component.Host) err func (r *SharedComponent[V]) Shutdown(ctx context.Context) error { var err error r.stopOnce.Do(func() { + // It's important that status for a sharedcomponent is reported through its + // telemetrysettings to keep status in sync and avoid race conditions. This logic duplicates + // and takes priority over the automated status reporting that happens in graph, making the + // the status reporting in graph a no-op. + _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) err = r.component.Shutdown(ctx) + if err != nil { + _ = r.telemetry.ReportComponentStatus(component.NewPermanentErrorEvent(err)) + } else { + _ = r.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped)) + } r.removeFunc() }) return err diff --git a/internal/sharedcomponent/sharedcomponent_test.go b/internal/sharedcomponent/sharedcomponent_test.go index 112f1d79d07..5ce081fa752 100644 --- a/internal/sharedcomponent/sharedcomponent_test.go +++ b/internal/sharedcomponent/sharedcomponent_test.go @@ -20,6 +20,7 @@ var id = component.NewID("test") type baseComponent struct { component.StartFunc component.ShutdownFunc + telemetry *component.TelemetrySettings } func TestNewSharedComponents(t *testing.T) { @@ -31,7 +32,11 @@ func TestNewSharedComponentsCreateError(t *testing.T) { comps := NewSharedComponents[component.ID, *baseComponent]() assert.Len(t, comps.comps, 0) myErr := errors.New("my error") - _, err := comps.GetOrAdd(id, func() (*baseComponent, error) { return nil, myErr }) + _, err := comps.GetOrAdd( + id, + func() (*baseComponent, error) { return nil, myErr }, + newNopTelemetrySettings(), + ) assert.ErrorIs(t, err, myErr) assert.Len(t, comps.comps, 0) } @@ -40,18 +45,31 @@ func TestSharedComponentsGetOrAdd(t *testing.T) { nop := &baseComponent{} comps := NewSharedComponents[component.ID, *baseComponent]() - got, err := comps.GetOrAdd(id, func() (*baseComponent, error) { return nop, nil }) + got, err := comps.GetOrAdd( + id, + func() (*baseComponent, error) { return nop, nil }, + newNopTelemetrySettings(), + ) require.NoError(t, err) assert.Len(t, comps.comps, 1) assert.Same(t, nop, got.Unwrap()) - gotSecond, err := comps.GetOrAdd(id, func() (*baseComponent, error) { panic("should not be called") }) + gotSecond, err := comps.GetOrAdd( + id, + func() (*baseComponent, error) { panic("should not be called") }, + newNopTelemetrySettings(), + ) + require.NoError(t, err) assert.Same(t, got, gotSecond) // Shutdown nop will remove assert.NoError(t, got.Shutdown(context.Background())) assert.Len(t, comps.comps, 0) - gotThird, err := comps.GetOrAdd(id, func() (*baseComponent, error) { return nop, nil }) + gotThird, err := comps.GetOrAdd( + id, + func() (*baseComponent, error) { return nop, nil }, + newNopTelemetrySettings(), + ) require.NoError(t, err) assert.NotSame(t, got, gotThird) } @@ -71,7 +89,11 @@ func TestSharedComponent(t *testing.T) { }} comps := NewSharedComponents[component.ID, *baseComponent]() - got, err := comps.GetOrAdd(id, func() (*baseComponent, error) { return comp, nil }) + got, err := comps.GetOrAdd( + id, + func() (*baseComponent, error) { return comp, nil }, + newNopTelemetrySettings(), + ) require.NoError(t, err) assert.Equal(t, wantErr, got.Start(context.Background(), componenttest.NewNopHost())) assert.Equal(t, 1, calledStart) @@ -84,3 +106,184 @@ func TestSharedComponent(t *testing.T) { assert.NoError(t, got.Shutdown(context.Background())) assert.Equal(t, 1, calledStop) } + +func TestSharedComponentsReportStatus(t *testing.T) { + reportedStatuses := make(map[*component.InstanceID][]component.Status) + newStatusFunc := func() func(*component.StatusEvent) error { + instanceID := &component.InstanceID{} + return func(ev *component.StatusEvent) error { + // Use an event with component.StatusNone to simulate an error. + if ev.Status() == component.StatusNone { + return assert.AnError + } + reportedStatuses[instanceID] = append(reportedStatuses[instanceID], ev.Status()) + return nil + } + } + + comp := &baseComponent{} + comps := NewSharedComponents[component.ID, *baseComponent]() + var telemetrySettings *component.TelemetrySettings + + // make a shared component that represents three instances + for i := 0; i < 3; i++ { + telemetrySettings = newNopTelemetrySettings() + telemetrySettings.ReportComponentStatus = newStatusFunc() + // The initial settings for the shared component need to match the ones passed to the first + // invocation of GetOrAdd so that underlying telemetry settings reference can be used to + // wrap ReportComponentStatus for subsequently added "instances". + if i == 0 { + comp.telemetry = telemetrySettings + } + got, err := comps.GetOrAdd( + id, + func() (*baseComponent, error) { return comp, nil }, + telemetrySettings, + ) + require.NoError(t, err) + assert.Len(t, comps.comps, 1) + assert.Same(t, comp, got.Unwrap()) + } + + // make sure we don't try to represent a fourth instance if we reuse a telemetrySettings + _, _ = comps.GetOrAdd( + id, + func() (*baseComponent, error) { return comp, nil }, + telemetrySettings, + ) + + err := comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStarting)) + require.NoError(t, err) + + // ok + err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusOK)) + require.NoError(t, err) + + // simulate an error + err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusNone)) + require.ErrorIs(t, err, assert.AnError) + + // stopping + err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopping)) + require.NoError(t, err) + + // stopped + err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusStopped)) + require.NoError(t, err) + + // The shared component represents 3 component instances. Reporting status for the shared + // component should report status for each of the instances it represents. + expectedStatuses := []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + } + + require.Equal(t, 3, len(reportedStatuses)) + + for _, actualStatuses := range reportedStatuses { + require.Equal(t, expectedStatuses, actualStatuses) + } +} + +func TestReportStatusOnStartShutdown(t *testing.T) { + for _, tc := range []struct { + name string + startErr error + shutdownErr error + expectedStatuses []component.Status + }{ + { + name: "successful start/stop", + startErr: nil, + shutdownErr: nil, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + }, + { + name: "start error", + startErr: assert.AnError, + shutdownErr: nil, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusPermanentError, + }, + }, + { + name: "shutdown error", + shutdownErr: assert.AnError, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusPermanentError, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + reportedStatuses := make(map[*component.InstanceID][]component.Status) + newStatusFunc := func() func(*component.StatusEvent) error { + instanceID := &component.InstanceID{} + return func(ev *component.StatusEvent) error { + reportedStatuses[instanceID] = append(reportedStatuses[instanceID], ev.Status()) + return nil + } + } + base := &baseComponent{} + if tc.startErr != nil { + base.StartFunc = func(context.Context, component.Host) error { + return tc.startErr + } + } + if tc.shutdownErr != nil { + base.ShutdownFunc = func(context.Context) error { + return tc.shutdownErr + } + } + comps := NewSharedComponents[component.ID, *baseComponent]() + var comp *SharedComponent[*baseComponent] + var err error + for i := 0; i < 3; i++ { + telemetrySettings := newNopTelemetrySettings() + telemetrySettings.ReportComponentStatus = newStatusFunc() + if i == 0 { + base.telemetry = telemetrySettings + } + comp, err = comps.GetOrAdd( + id, + func() (*baseComponent, error) { return base, nil }, + telemetrySettings, + ) + require.NoError(t, err) + } + + err = comp.Start(context.Background(), componenttest.NewNopHost()) + require.Equal(t, tc.startErr, err) + + if tc.startErr == nil { + err = comp.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusOK)) + require.NoError(t, err) + + err = comp.Shutdown(context.Background()) + require.Equal(t, tc.shutdownErr, err) + } + + require.Equal(t, 3, len(reportedStatuses)) + + for _, actualStatuses := range reportedStatuses { + require.Equal(t, tc.expectedStatuses, actualStatuses) + } + }) + } +} + +// newNopTelemetrySettings streamlines getting a pointer to a NopTelemetrySettings +func newNopTelemetrySettings() *component.TelemetrySettings { + set := componenttest.NewNopTelemetrySettings() + return &set +} diff --git a/otelcol/collector_test.go b/otelcol/collector_test.go index 1a0ad8d0607..f1a59c54430 100644 --- a/otelcol/collector_test.go +++ b/otelcol/collector_test.go @@ -19,6 +19,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/converter/expandconverter" + "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/processor/processortest" ) func TestStateString(t *testing.T) { @@ -151,6 +153,85 @@ func TestCollectorReportError(t *testing.T) { assert.Equal(t, StateClosed, col.GetState()) } +func TestComponentStatusWatcher(t *testing.T) { + factories, err := nopFactories() + assert.NoError(t, err) + + // Use a processor factory that creates "unhealthy" processor: one that + // always reports StatusRecoverableError after successful Start. + unhealthyProcessorFactory := processortest.NewUnhealthyProcessorFactory() + factories.Processors[unhealthyProcessorFactory.Type()] = unhealthyProcessorFactory + + // Keep track of all status changes in a map. + changedComponents := map[*component.InstanceID][]component.Status{} + var mux sync.Mutex + onStatusChanged := func(source *component.InstanceID, event *component.StatusEvent) { + if source.ID.Type() != unhealthyProcessorFactory.Type() { + return + } + mux.Lock() + defer mux.Unlock() + changedComponents[source] = append(changedComponents[source], event.Status()) + } + + // Add a "statuswatcher" extension that will receive notifications when processor + // status changes. + factory := extensiontest.NewStatusWatcherExtensionFactory(onStatusChanged) + factories.Extensions[factory.Type()] = factory + + // Read config from file. This config uses 3 "unhealthy" processors. + validProvider, err := NewConfigProvider(newDefaultConfigProviderSettings([]string{filepath.Join("testdata", "otelcol-statuswatcher.yaml")})) + require.NoError(t, err) + + // Create a collector + col, err := NewCollector(CollectorSettings{ + BuildInfo: component.NewDefaultBuildInfo(), + Factories: factories, + ConfigProvider: validProvider, + }) + require.NoError(t, err) + + // Start the newly created collector. + wg := startCollector(context.Background(), t, col) + + // An unhealthy processor asynchronously reports a recoverable error. + expectedStatuses := []component.Status{ + component.StatusStarting, + component.StatusRecoverableError, + } + + // The "unhealthy" processors will now begin to asynchronously report StatusRecoverableError. + // We expect to see these reports. + assert.Eventually(t, func() bool { + mux.Lock() + defer mux.Unlock() + + for k, v := range changedComponents { + // All processors must report a status change with the same ID + assert.EqualValues(t, component.NewID(unhealthyProcessorFactory.Type()), k.ID) + // And all must have the expected statuses + assert.Equal(t, expectedStatuses, v) + } + // We have 3 processors with exactly the same ID in otelcol-statuswatcher.yaml + // We must have exactly 3 items in our map. This ensures that the "source" argument + // passed to status change func is unique per instance of source component despite + // components having the same IDs (having same ID for different component instances + // is a normal situation for processors). + return len(changedComponents) == 3 + }, 2*time.Second, time.Millisecond*100) + + col.Shutdown() + wg.Wait() + + // Check for additional statuses after Shutdown. + expectedStatuses = append(expectedStatuses, component.StatusStopping, component.StatusStopped) + for _, v := range changedComponents { + assert.Equal(t, expectedStatuses, v) + } + + assert.Equal(t, StateClosed, col.GetState()) +} + func TestCollectorSendSignal(t *testing.T) { factories, err := nopFactories() require.NoError(t, err) diff --git a/otelcol/testdata/otelcol-statuswatcher.yaml b/otelcol/testdata/otelcol-statuswatcher.yaml new file mode 100644 index 00000000000..2dcc322d341 --- /dev/null +++ b/otelcol/testdata/otelcol-statuswatcher.yaml @@ -0,0 +1,31 @@ +receivers: + nop: + +processors: + nop: + unhealthy: + +exporters: + nop: + +extensions: + statuswatcher: + +service: + telemetry: + metrics: + address: localhost:8888 + extensions: [statuswatcher] + pipelines: + traces: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop] + metrics: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop] + logs: + receivers: [nop] + processors: [nop,unhealthy] + exporters: [nop] diff --git a/processor/processortest/unhealthy_processor.go b/processor/processortest/unhealthy_processor.go new file mode 100644 index 00000000000..eeb2e1b8d87 --- /dev/null +++ b/processor/processortest/unhealthy_processor.go @@ -0,0 +1,70 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package processortest // import "go.opentelemetry.io/collector/processor/processortest" + +import ( + "context" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/processor" +) + +// NewUnhealthyProcessorCreateSettings returns a new nop settings for Create*Processor functions. +func NewUnhealthyProcessorCreateSettings() processor.CreateSettings { + return processor.CreateSettings{ + TelemetrySettings: componenttest.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + } +} + +// NewUnhealthyProcessorFactory returns a component.ProcessorFactory that constructs nop processors. +func NewUnhealthyProcessorFactory() processor.Factory { + return processor.NewFactory( + "unhealthy", + func() component.Config { + return &struct{}{} + }, + processor.WithTraces(createUnhealthyTracesProcessor, component.StabilityLevelStable), + processor.WithMetrics(createUnhealthyMetricsProcessor, component.StabilityLevelStable), + processor.WithLogs(createUnhealthyLogsProcessor, component.StabilityLevelStable), + ) +} + +func createUnhealthyTracesProcessor(_ context.Context, set processor.CreateSettings, _ component.Config, _ consumer.Traces) (processor.Traces, error) { + return &unhealthyProcessor{ + Consumer: consumertest.NewNop(), + telemetry: set.TelemetrySettings, + }, nil +} + +func createUnhealthyMetricsProcessor(_ context.Context, set processor.CreateSettings, _ component.Config, _ consumer.Metrics) (processor.Metrics, error) { + return &unhealthyProcessor{ + Consumer: consumertest.NewNop(), + telemetry: set.TelemetrySettings, + }, nil +} + +func createUnhealthyLogsProcessor(_ context.Context, set processor.CreateSettings, _ component.Config, _ consumer.Logs) (processor.Logs, error) { + return &unhealthyProcessor{ + Consumer: consumertest.NewNop(), + telemetry: set.TelemetrySettings, + }, nil +} + +type unhealthyProcessor struct { + component.StartFunc + component.ShutdownFunc + consumertest.Consumer + telemetry component.TelemetrySettings +} + +func (p unhealthyProcessor) Start(_ context.Context, _ component.Host) error { + go func() { + _ = p.telemetry.ReportComponentStatus(component.NewStatusEvent(component.StatusRecoverableError)) + }() + return nil +} diff --git a/processor/processortest/unhealthy_processor_test.go b/processor/processortest/unhealthy_processor_test.go new file mode 100644 index 00000000000..adc80322f22 --- /dev/null +++ b/processor/processortest/unhealthy_processor_test.go @@ -0,0 +1,49 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package processortest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestNewUnhealthyProcessorFactory(t *testing.T) { + factory := NewUnhealthyProcessorFactory() + require.NotNil(t, factory) + assert.Equal(t, component.Type("unhealthy"), factory.Type()) + cfg := factory.CreateDefaultConfig() + assert.Equal(t, &struct{}{}, cfg) + + traces, err := factory.CreateTracesProcessor(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.Equal(t, consumer.Capabilities{MutatesData: false}, traces.Capabilities()) + assert.NoError(t, traces.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, traces.ConsumeTraces(context.Background(), ptrace.NewTraces())) + assert.NoError(t, traces.Shutdown(context.Background())) + + metrics, err := factory.CreateMetricsProcessor(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.Equal(t, consumer.Capabilities{MutatesData: false}, metrics.Capabilities()) + assert.NoError(t, metrics.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, metrics.ConsumeMetrics(context.Background(), pmetric.NewMetrics())) + assert.NoError(t, metrics.Shutdown(context.Background())) + + logs, err := factory.CreateLogsProcessor(context.Background(), NewUnhealthyProcessorCreateSettings(), cfg, consumertest.NewNop()) + require.NoError(t, err) + assert.Equal(t, consumer.Capabilities{MutatesData: false}, logs.Capabilities()) + assert.NoError(t, logs.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, logs.ConsumeLogs(context.Background(), plog.NewLogs())) + assert.NoError(t, logs.Shutdown(context.Background())) +} diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 11cf3dc6668..cce8b363cd4 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -68,9 +68,13 @@ func createTraces( nextConsumer consumer.Traces, ) (receiver.Traces, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd(oCfg, func() (*otlpReceiver, error) { - return newOtlpReceiver(oCfg, set) - }) + r, err := receivers.GetOrAdd( + oCfg, + func() (*otlpReceiver, error) { + return newOtlpReceiver(oCfg, &set) + }, + &set.TelemetrySettings, + ) if err != nil { return nil, err } @@ -89,9 +93,13 @@ func createMetrics( consumer consumer.Metrics, ) (receiver.Metrics, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd(oCfg, func() (*otlpReceiver, error) { - return newOtlpReceiver(oCfg, set) - }) + r, err := receivers.GetOrAdd( + oCfg, + func() (*otlpReceiver, error) { + return newOtlpReceiver(oCfg, &set) + }, + &set.TelemetrySettings, + ) if err != nil { return nil, err } @@ -110,9 +118,13 @@ func createLog( consumer consumer.Logs, ) (receiver.Logs, error) { oCfg := cfg.(*Config) - r, err := receivers.GetOrAdd(oCfg, func() (*otlpReceiver, error) { - return newOtlpReceiver(oCfg, set) - }) + r, err := receivers.GetOrAdd( + oCfg, + func() (*otlpReceiver, error) { + return newOtlpReceiver(oCfg, &set) + }, + &set.TelemetrySettings, + ) if err != nil { return nil, err } diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index faf9a68fe04..c07fea243cb 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -43,13 +43,13 @@ type otlpReceiver struct { obsrepGRPC *receiverhelper.ObsReport obsrepHTTP *receiverhelper.ObsReport - settings receiver.CreateSettings + settings *receiver.CreateSettings } // newOtlpReceiver just creates the OpenTelemetry receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. -func newOtlpReceiver(cfg *Config, set receiver.CreateSettings) (*otlpReceiver, error) { +func newOtlpReceiver(cfg *Config, set *receiver.CreateSettings) (*otlpReceiver, error) { r := &otlpReceiver{ cfg: cfg, settings: set, @@ -62,7 +62,7 @@ func newOtlpReceiver(cfg *Config, set receiver.CreateSettings) (*otlpReceiver, e r.obsrepGRPC, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "grpc", - ReceiverCreateSettings: set, + ReceiverCreateSettings: *set, }) if err != nil { return nil, err @@ -70,7 +70,7 @@ func newOtlpReceiver(cfg *Config, set receiver.CreateSettings) (*otlpReceiver, e r.obsrepHTTP, err = receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ ReceiverID: set.ID, Transport: "http", - ReceiverCreateSettings: set, + ReceiverCreateSettings: *set, }) if err != nil { return nil, err diff --git a/service/extensions/extensions.go b/service/extensions/extensions.go index 54af9d6544c..bb073f74cc5 100644 --- a/service/extensions/extensions.go +++ b/service/extensions/extensions.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/service/internal/components" + "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/internal/zpages" ) @@ -22,8 +23,9 @@ const zExtensionName = "zextensionname" // Extensions is a map of extensions created from extension configs. type Extensions struct { - telemetry component.TelemetrySettings - extMap map[component.ID]extension.Extension + telemetry servicetelemetry.TelemetrySettings + extMap map[component.ID]extension.Extension + instanceIDs map[component.ID]*component.InstanceID } // Start starts all extensions. @@ -32,7 +34,10 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error { for extID, ext := range bes.extMap { extLogger := components.ExtensionLogger(bes.telemetry.Logger, extID) extLogger.Info("Extension is starting...") + instanceID := bes.instanceIDs[extID] + _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting)) if err := ext.Start(ctx, components.NewHostWrapper(host, extLogger)); err != nil { + _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err)) return err } extLogger.Info("Extension started.") @@ -44,8 +49,15 @@ func (bes *Extensions) Start(ctx context.Context, host component.Host) error { func (bes *Extensions) Shutdown(ctx context.Context) error { bes.telemetry.Logger.Info("Stopping extensions...") var errs error - for _, ext := range bes.extMap { - errs = multierr.Append(errs, ext.Shutdown(ctx)) + for extID, ext := range bes.extMap { + instanceID := bes.instanceIDs[extID] + _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping)) + if err := ext.Shutdown(ctx); err != nil { + _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(err)) + errs = multierr.Append(errs, err) + continue + } + _ = bes.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped)) } return errs @@ -84,6 +96,14 @@ func (bes *Extensions) NotifyConfig(ctx context.Context, conf *confmap.Conf) err return errs } +func (bes *Extensions) NotifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) { + for _, ext := range bes.extMap { + if sw, ok := ext.(extension.StatusWatcher); ok { + sw.ComponentStatusChanged(source, event) + } + } +} + func (bes *Extensions) GetExtensions() map[component.ID]component.Component { result := make(map[component.ID]component.Component, len(bes.extMap)) for extID, v := range bes.extMap { @@ -120,7 +140,7 @@ func (bes *Extensions) HandleZPages(w http.ResponseWriter, r *http.Request) { // Settings holds configuration for building Extensions. type Settings struct { - Telemetry component.TelemetrySettings + Telemetry servicetelemetry.TelemetrySettings BuildInfo component.BuildInfo // Extensions builder for extensions. @@ -130,13 +150,18 @@ type Settings struct { // New creates a new Extensions from Config. func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { exts := &Extensions{ - telemetry: set.Telemetry, - extMap: make(map[component.ID]extension.Extension), + telemetry: set.Telemetry, + extMap: make(map[component.ID]extension.Extension), + instanceIDs: make(map[component.ID]*component.InstanceID), } for _, extID := range cfg { + instanceID := &component.InstanceID{ + ID: extID, + Kind: component.KindExtension, + } extSet := extension.CreateSettings{ ID: extID, - TelemetrySettings: set.Telemetry, + TelemetrySettings: set.Telemetry.ToComponentTelemetrySettings(instanceID), BuildInfo: set.BuildInfo, } extSet.TelemetrySettings.Logger = components.ExtensionLogger(set.Telemetry.Logger, extID) @@ -152,6 +177,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Extensions, error) { } exts.extMap[extID] = ext + exts.instanceIDs[extID] = instanceID } return exts, nil diff --git a/service/extensions/extensions_test.go b/service/extensions/extensions_test.go index cbb3dd5238d..4b8a574f4f0 100644 --- a/service/extensions/extensions_test.go +++ b/service/extensions/extensions_test.go @@ -16,6 +16,8 @@ import ( "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/extension/extensiontest" + "go.opentelemetry.io/collector/service/internal/servicetelemetry" + "go.opentelemetry.io/collector/service/internal/status" ) func TestBuildExtensions(t *testing.T) { @@ -81,7 +83,7 @@ func TestBuildExtensions(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { _, err := New(context.Background(), Settings{ - Telemetry: componenttest.NewNopTelemetrySettings(), + Telemetry: servicetelemetry.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), Extensions: extension.NewBuilder(tt.extensionsConfigs, tt.factories), }, tt.config) @@ -167,7 +169,7 @@ func TestNotifyConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { extensions, err := New(context.Background(), Settings{ - Telemetry: componenttest.NewNopTelemetrySettings(), + Telemetry: servicetelemetry.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), Extensions: extension.NewBuilder(tt.extensionsConfigs, tt.factories), }, tt.serviceExtensions) @@ -241,3 +243,122 @@ func newCreateErrorExtensionFactory() extension.Factory { component.StabilityLevelDevelopment, ) } + +func TestStatusReportedOnStartupShutdown(t *testing.T) { + // compare two slices of status events ignoring timestamp + assertEqualStatuses := func(t *testing.T, evts1, evts2 []*component.StatusEvent) { + assert.Equal(t, len(evts1), len(evts2)) + for i := 0; i < len(evts1); i++ { + ev1 := evts1[i] + ev2 := evts2[i] + assert.Equal(t, ev1.Status(), ev2.Status()) + assert.Equal(t, ev1.Err(), ev2.Err()) + } + } + + for _, tc := range []struct { + name string + expectedStatuses []*component.StatusEvent + startErr error + shutdownErr error + }{ + { + name: "successful startup/shutdown", + expectedStatuses: []*component.StatusEvent{ + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusStopping), + component.NewStatusEvent(component.StatusStopped), + }, + startErr: nil, + shutdownErr: nil, + }, + { + name: "start error", + expectedStatuses: []*component.StatusEvent{ + component.NewStatusEvent(component.StatusStarting), + component.NewPermanentErrorEvent(assert.AnError), + }, + startErr: assert.AnError, + shutdownErr: nil, + }, + { + name: "shutdown error", + expectedStatuses: []*component.StatusEvent{ + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusStopping), + component.NewPermanentErrorEvent(assert.AnError), + }, + startErr: nil, + shutdownErr: assert.AnError, + }, + } { + t.Run(tc.name, func(t *testing.T) { + compID := component.NewID("statustest") + factory := newStatusTestExtensionFactory("statustest", tc.startErr, tc.shutdownErr) + config := factory.CreateDefaultConfig() + extensionsConfigs := map[component.ID]component.Config{ + compID: config, + } + factories := map[component.Type]extension.Factory{ + "statustest": factory, + } + extensions, err := New( + context.Background(), + Settings{ + Telemetry: servicetelemetry.NewNopTelemetrySettings(), + BuildInfo: component.NewDefaultBuildInfo(), + Extensions: extension.NewBuilder(extensionsConfigs, factories), + }, + []component.ID{compID}, + ) + + assert.NoError(t, err) + + var actualStatuses []*component.StatusEvent + init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) { + actualStatuses = append(actualStatuses, ev) + }) + extensions.telemetry.ReportComponentStatus = statusFunc + init() + + assert.Equal(t, tc.startErr, extensions.Start(context.Background(), componenttest.NewNopHost())) + if tc.startErr == nil { + assert.Equal(t, tc.shutdownErr, extensions.Shutdown(context.Background())) + } + assertEqualStatuses(t, tc.expectedStatuses, actualStatuses) + }) + } +} + +type statusTestExtension struct { + startErr error + shutdownErr error +} + +func (ext *statusTestExtension) Start(_ context.Context, _ component.Host) error { + return ext.startErr +} + +func (ext *statusTestExtension) Shutdown(_ context.Context) error { + return ext.shutdownErr +} + +func newStatusTestExtension(startErr, shutdownErr error) *statusTestExtension { + return &statusTestExtension{ + startErr: startErr, + shutdownErr: shutdownErr, + } +} + +func newStatusTestExtensionFactory(name component.Type, startErr, shutdownErr error) extension.Factory { + return extension.NewFactory( + name, + func() component.Config { + return &struct{}{} + }, + func(ctx context.Context, set extension.CreateSettings, extension component.Config) (extension.Extension, error) { + return newStatusTestExtension(startErr, shutdownErr), nil + }, + component.StabilityLevelDevelopment, + ) +} diff --git a/service/host.go b/service/host.go index d216ae94adb..b749564b0dd 100644 --- a/service/host.go +++ b/service/host.go @@ -33,10 +33,10 @@ type serviceHost struct { // ReportFatalError is used to report to the host that the receiver encountered // a fatal error (i.e.: an error that the instance can't recover from) after // its start function has already returned. +// Deprecated: [0.87.0] Replaced by servicetelemetry.Settings.ReportComponentStatus func (host *serviceHost) ReportFatalError(err error) { host.asyncErrorChannel <- err } - func (host *serviceHost) GetFactory(kind component.Kind, componentType component.Type) component.Factory { switch kind { case component.KindReceiver: @@ -66,3 +66,10 @@ func (host *serviceHost) GetExtensions() map[component.ID]component.Component { func (host *serviceHost) GetExporters() map[component.DataType]map[component.ID]component.Component { return host.pipelines.GetExporters() } + +func (host *serviceHost) notifyComponentStatusChange(source *component.InstanceID, event *component.StatusEvent) { + host.serviceExtensions.NotifyComponentStatusChange(source, event) + if event.Status() == component.StatusFatalError { + host.asyncErrorChannel <- event.Err() + } +} diff --git a/service/internal/graph/graph.go b/service/internal/graph/graph.go index 5b8a404d66f..902bc3a5afd 100644 --- a/service/internal/graph/graph.go +++ b/service/internal/graph/graph.go @@ -22,12 +22,13 @@ import ( "go.opentelemetry.io/collector/processor" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/service/internal/capabilityconsumer" + "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/pipelines" ) // Settings holds configuration for building builtPipelines. type Settings struct { - Telemetry component.TelemetrySettings + Telemetry servicetelemetry.TelemetrySettings BuildInfo component.BuildInfo ReceiverBuilder *receiver.Builder @@ -45,12 +46,19 @@ type Graph struct { // Keep track of how nodes relate to pipelines, so we can declare edges in the graph. pipelines map[component.ID]*pipelineNodes + + // Keep track of status source per node + instanceIDs map[int64]*component.InstanceID + + telemetry servicetelemetry.TelemetrySettings } func Build(ctx context.Context, set Settings) (*Graph, error) { pipelines := &Graph{ componentGraph: simple.NewDirectedGraph(), pipelines: make(map[component.ID]*pipelineNodes, len(set.PipelineConfigs)), + instanceIDs: make(map[int64]*component.InstanceID), + telemetry: set.Telemetry, } for pipelineID := range set.PipelineConfigs { pipelines.pipelines[pipelineID] = &pipelineNodes{ @@ -82,14 +90,15 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsReceiver[recvID] = append(connectorsAsReceiver[recvID], pipelineID) continue } - rcvrNode := g.createReceiver(pipelineID.Type(), recvID) + rcvrNode := g.createReceiver(pipelineID, recvID) pipe.receivers[rcvrNode.ID()] = rcvrNode } pipe.capabilitiesNode = newCapabilitiesNode(pipelineID) for _, procID := range pipelineCfg.Processors { - pipe.processors = append(pipe.processors, g.createProcessor(pipelineID, procID)) + procNode := g.createProcessor(pipelineID, procID) + pipe.processors = append(pipe.processors, procNode) } pipe.fanOutNode = newFanOutNode(pipelineID) @@ -100,7 +109,7 @@ func (g *Graph) createNodes(set Settings) error { connectorsAsExporter[exprID] = append(connectorsAsExporter[exprID], pipelineID) continue } - expNode := g.createExporter(pipelineID.Type(), exprID) + expNode := g.createExporter(pipelineID, exprID) pipe.exporters[expNode.ID()] = expNode } } @@ -156,6 +165,7 @@ func (g *Graph) createNodes(set Settings) error { continue } connNode := g.createConnector(eID, rID, connID) + g.pipelines[eID].exporters[connNode.ID()] = connNode g.pipelines[rID].receivers[connNode.ID()] = connNode } @@ -164,36 +174,70 @@ func (g *Graph) createNodes(set Settings) error { return nil } -func (g *Graph) createReceiver(pipelineType component.DataType, recvID component.ID) *receiverNode { - rcvrNode := newReceiverNode(pipelineType, recvID) +func (g *Graph) createReceiver(pipelineID, recvID component.ID) *receiverNode { + rcvrNode := newReceiverNode(pipelineID.Type(), recvID) if node := g.componentGraph.Node(rcvrNode.ID()); node != nil { + g.instanceIDs[node.ID()].PipelineIDs[pipelineID] = struct{}{} return node.(*receiverNode) } g.componentGraph.AddNode(rcvrNode) + g.instanceIDs[rcvrNode.ID()] = &component.InstanceID{ + ID: recvID, + Kind: component.KindReceiver, + PipelineIDs: map[component.ID]struct{}{ + pipelineID: {}, + }, + } return rcvrNode } func (g *Graph) createProcessor(pipelineID, procID component.ID) *processorNode { procNode := newProcessorNode(pipelineID, procID) g.componentGraph.AddNode(procNode) + g.instanceIDs[procNode.ID()] = &component.InstanceID{ + ID: procID, + Kind: component.KindProcessor, + PipelineIDs: map[component.ID]struct{}{ + pipelineID: {}, + }, + } return procNode } -func (g *Graph) createExporter(pipelineType component.DataType, exprID component.ID) *exporterNode { - expNode := newExporterNode(pipelineType, exprID) +func (g *Graph) createExporter(pipelineID, exprID component.ID) *exporterNode { + expNode := newExporterNode(pipelineID.Type(), exprID) if node := g.componentGraph.Node(expNode.ID()); node != nil { + g.instanceIDs[expNode.ID()].PipelineIDs[pipelineID] = struct{}{} return node.(*exporterNode) } g.componentGraph.AddNode(expNode) + g.instanceIDs[expNode.ID()] = &component.InstanceID{ + ID: expNode.componentID, + Kind: component.KindExporter, + PipelineIDs: map[component.ID]struct{}{ + pipelineID: {}, + }, + } return expNode } func (g *Graph) createConnector(exprPipelineID, rcvrPipelineID, connID component.ID) *connectorNode { connNode := newConnectorNode(exprPipelineID.Type(), rcvrPipelineID.Type(), connID) if node := g.componentGraph.Node(connNode.ID()); node != nil { + instanceID := g.instanceIDs[connNode.ID()] + instanceID.PipelineIDs[exprPipelineID] = struct{}{} + instanceID.PipelineIDs[rcvrPipelineID] = struct{}{} return node.(*connectorNode) } g.componentGraph.AddNode(connNode) + g.instanceIDs[connNode.ID()] = &component.InstanceID{ + ID: connNode.componentID, + Kind: component.KindConnector, + PipelineIDs: map[component.ID]struct{}{ + exprPipelineID: {}, + rcvrPipelineID: {}, + }, + } return connNode } @@ -227,15 +271,22 @@ func (g *Graph) buildComponents(ctx context.Context, set Settings) error { for i := len(nodes) - 1; i >= 0; i-- { node := nodes[i] + + // skipped for capabilitiesNodes and fanoutNodes as they are not assigned componentIDs. + var telemetrySettings component.TelemetrySettings + if instanceID, ok := g.instanceIDs[node.ID()]; ok { + telemetrySettings = set.Telemetry.ToComponentTelemetrySettings(instanceID) + } + switch n := node.(type) { case *receiverNode: - err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID())) + err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ReceiverBuilder, g.nextConsumers(n.ID())) case *processorNode: - err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) + err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ProcessorBuilder, g.nextConsumers(n.ID())[0]) case *exporterNode: - err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ExporterBuilder) + err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ExporterBuilder) case *connectorNode: - err = n.buildComponent(ctx, set.Telemetry, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID())) + err = n.buildComponent(ctx, telemetrySettings, set.BuildInfo, set.ConnectorBuilder, g.nextConsumers(n.ID())) case *capabilitiesNode: capability := consumer.Capabilities{MutatesData: false} for _, proc := range g.pipelines[n.pipelineID].processors { @@ -326,12 +377,19 @@ func (g *Graph) StartAll(ctx context.Context, host component.Host) error { // are started before upstream components. This ensures that each // component's consumer is ready to consume. for i := len(nodes) - 1; i >= 0; i-- { - comp, ok := nodes[i].(component.Component) + node := nodes[i] + comp, ok := node.(component.Component) + if !ok { // Skip capabilities/fanout nodes continue } + + instanceID := g.instanceIDs[node.ID()] + _ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStarting)) + if compErr := comp.Start(ctx, host); compErr != nil { + _ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr)) return compErr } } @@ -350,12 +408,24 @@ func (g *Graph) ShutdownAll(ctx context.Context) error { // before the consumer is stopped. var errs error for i := 0; i < len(nodes); i++ { - comp, ok := nodes[i].(component.Component) + node := nodes[i] + comp, ok := node.(component.Component) + if !ok { // Skip capabilities/fanout nodes continue } - errs = multierr.Append(errs, comp.Shutdown(ctx)) + + instanceID := g.instanceIDs[node.ID()] + _ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopping)) + + if compErr := comp.Shutdown(ctx); compErr != nil { + errs = multierr.Append(errs, compErr) + _ = g.telemetry.ReportComponentStatus(instanceID, component.NewPermanentErrorEvent(compErr)) + continue + } + + _ = g.telemetry.ReportComponentStatus(instanceID, component.NewStatusEvent(component.StatusStopped)) } return errs } diff --git a/service/internal/graph/graph_test.go b/service/internal/graph/graph_test.go index 1279ff145d5..4e6bc0256eb 100644 --- a/service/internal/graph/graph_test.go +++ b/service/internal/graph/graph_test.go @@ -27,6 +27,8 @@ import ( "go.opentelemetry.io/collector/processor/processortest" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/collector/service/internal/servicetelemetry" + "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/internal/testcomponents" "go.opentelemetry.io/collector/service/pipelines" ) @@ -141,8 +143,13 @@ func TestGraphStartStop(t *testing.T) { } pg := &Graph{componentGraph: simple.NewDirectedGraph()} + pg.telemetry = servicetelemetry.NewNopTelemetrySettings() + pg.instanceIDs = make(map[int64]*component.InstanceID) + for _, edge := range tt.edges { f, t := &testNode{id: edge[0]}, &testNode{id: edge[1]} + pg.instanceIDs[f.ID()] = &component.InstanceID{} + pg.instanceIDs[t.ID()] = &component.InstanceID{} pg.componentGraph.SetEdge(simple.Edge{F: f, T: t}) } @@ -168,6 +175,13 @@ func TestGraphStartStopCycle(t *testing.T) { c1 := &testNode{id: component.NewIDWithName("c", "1")} e1 := &testNode{id: component.NewIDWithName("e", "1")} + pg.instanceIDs = map[int64]*component.InstanceID{ + r1.ID(): {}, + p1.ID(): {}, + c1.ID(): {}, + e1.ID(): {}, + } + pg.componentGraph.SetEdge(simple.Edge{F: r1, T: p1}) pg.componentGraph.SetEdge(simple.Edge{F: p1, T: c1}) pg.componentGraph.SetEdge(simple.Edge{F: c1, T: e1}) @@ -184,15 +198,22 @@ func TestGraphStartStopCycle(t *testing.T) { func TestGraphStartStopComponentError(t *testing.T) { pg := &Graph{componentGraph: simple.NewDirectedGraph()} + pg.telemetry = servicetelemetry.NewNopTelemetrySettings() + r1 := &testNode{ + id: component.NewIDWithName("r", "1"), + startErr: errors.New("foo"), + } + e1 := &testNode{ + id: component.NewIDWithName("e", "1"), + shutdownErr: errors.New("bar"), + } + pg.instanceIDs = map[int64]*component.InstanceID{ + r1.ID(): {}, + e1.ID(): {}, + } pg.componentGraph.SetEdge(simple.Edge{ - F: &testNode{ - id: component.NewIDWithName("r", "1"), - startErr: errors.New("foo"), - }, - T: &testNode{ - id: component.NewIDWithName("e", "1"), - shutdownErr: errors.New("bar"), - }, + F: r1, + T: e1, }) assert.EqualError(t, pg.StartAll(context.Background(), componenttest.NewNopHost()), "foo") assert.EqualError(t, pg.ShutdownAll(context.Background()), "bar") @@ -618,7 +639,7 @@ func TestConnectorPipelinesGraph(t *testing.T) { t.Run(test.name, func(t *testing.T) { // Build the pipeline set := Settings{ - Telemetry: componenttest.NewNopTelemetrySettings(), + Telemetry: servicetelemetry.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ @@ -884,7 +905,7 @@ func TestConnectorRouter(t *testing.T) { ctx := context.Background() set := Settings{ - Telemetry: componenttest.NewNopTelemetrySettings(), + Telemetry: servicetelemetry.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ @@ -1928,7 +1949,7 @@ func TestGraphBuildErrors(t *testing.T) { t.Run(test.name, func(t *testing.T) { set := Settings{ BuildInfo: component.NewDefaultBuildInfo(), - Telemetry: componenttest.NewNopTelemetrySettings(), + Telemetry: servicetelemetry.NewNopTelemetrySettings(), ReceiverBuilder: receiver.NewBuilder( test.receiverCfgs, map[component.Type]receiver.Factory{ @@ -1975,7 +1996,7 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { nopConnectorFactory := connectortest.NewNopFactory() set := Settings{ - Telemetry: componenttest.NewNopTelemetrySettings(), + Telemetry: servicetelemetry.NewNopTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo(), ReceiverBuilder: receiver.NewBuilder( map[component.ID]component.Config{ @@ -2082,6 +2103,152 @@ func TestGraphFailToStartAndShutdown(t *testing.T) { } } +func TestStatusReportedOnStartupShutdown(t *testing.T) { + + rNoErr := &testNode{id: component.NewIDWithName("r-no-err", "1")} + rStErr := &testNode{id: component.NewIDWithName("r-st-err", "1"), startErr: assert.AnError} + rSdErr := &testNode{id: component.NewIDWithName("r-sd-err", "1"), shutdownErr: assert.AnError} + + eNoErr := &testNode{id: component.NewIDWithName("e-no-err", "1")} + eStErr := &testNode{id: component.NewIDWithName("e-st-err", "1"), startErr: assert.AnError} + eSdErr := &testNode{id: component.NewIDWithName("e-sd-err", "1"), shutdownErr: assert.AnError} + + instanceIDs := map[*testNode]*component.InstanceID{ + rNoErr: {ID: rNoErr.id}, + rStErr: {ID: rStErr.id}, + rSdErr: {ID: rSdErr.id}, + eNoErr: {ID: eNoErr.id}, + eStErr: {ID: eStErr.id}, + eSdErr: {ID: eSdErr.id}, + } + + // compare two maps of status events ignoring timestamp + assertEqualStatuses := func(t *testing.T, evMap1, evMap2 map[*component.InstanceID][]*component.StatusEvent) { + assert.Equal(t, len(evMap1), len(evMap2)) + for id, evts1 := range evMap1 { + evts2 := evMap2[id] + assert.Equal(t, len(evts1), len(evts2)) + for i := 0; i < len(evts1); i++ { + ev1 := evts1[i] + ev2 := evts2[i] + assert.Equal(t, ev1.Status(), ev2.Status()) + assert.Equal(t, ev1.Err(), ev2.Err()) + } + } + + } + + for _, tc := range []struct { + name string + edge [2]*testNode + expectedStatuses map[*component.InstanceID][]*component.StatusEvent + startupErr error + shutdownErr error + }{ + { + name: "successful startup/shutdown", + edge: [2]*testNode{rNoErr, eNoErr}, + expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ + instanceIDs[rNoErr]: { + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusStopping), + component.NewStatusEvent(component.StatusStopped), + }, + instanceIDs[eNoErr]: { + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusStopping), + component.NewStatusEvent(component.StatusStopped), + }, + }, + }, + { + name: "early startup error", + edge: [2]*testNode{rNoErr, eStErr}, + expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ + instanceIDs[eStErr]: { + component.NewStatusEvent(component.StatusStarting), + component.NewPermanentErrorEvent(assert.AnError), + }, + }, + startupErr: assert.AnError, + }, + { + name: "late startup error", + edge: [2]*testNode{rStErr, eNoErr}, + expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ + instanceIDs[rStErr]: { + component.NewStatusEvent(component.StatusStarting), + component.NewPermanentErrorEvent(assert.AnError), + }, + instanceIDs[eNoErr]: { + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusStopping), + component.NewStatusEvent(component.StatusStopped), + }, + }, + startupErr: assert.AnError, + }, + { + name: "early shutdown error", + edge: [2]*testNode{rSdErr, eNoErr}, + expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ + instanceIDs[rSdErr]: { + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusStopping), + component.NewPermanentErrorEvent(assert.AnError), + }, + instanceIDs[eNoErr]: { + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusStopping), + component.NewStatusEvent(component.StatusStopped), + }, + }, + shutdownErr: assert.AnError, + }, + { + name: "late shutdown error", + edge: [2]*testNode{rNoErr, eSdErr}, + expectedStatuses: map[*component.InstanceID][]*component.StatusEvent{ + instanceIDs[rNoErr]: { + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusStopping), + component.NewStatusEvent(component.StatusStopped), + }, + instanceIDs[eSdErr]: { + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusStopping), + component.NewPermanentErrorEvent(assert.AnError), + }, + }, + shutdownErr: assert.AnError, + }, + } { + t.Run(tc.name, func(t *testing.T) { + pg := &Graph{componentGraph: simple.NewDirectedGraph()} + pg.telemetry = servicetelemetry.NewNopTelemetrySettings() + + actualStatuses := make(map[*component.InstanceID][]*component.StatusEvent) + init, statusFunc := status.NewServiceStatusFunc(func(id *component.InstanceID, ev *component.StatusEvent) { + actualStatuses[id] = append(actualStatuses[id], ev) + }) + + pg.telemetry.ReportComponentStatus = statusFunc + init() + + e0, e1 := tc.edge[0], tc.edge[1] + pg.instanceIDs = map[int64]*component.InstanceID{ + e0.ID(): instanceIDs[e0], + e1.ID(): instanceIDs[e1], + } + pg.componentGraph.SetEdge(simple.Edge{F: e0, T: e1}) + + assert.Equal(t, tc.startupErr, pg.StartAll(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, tc.shutdownErr, pg.ShutdownAll(context.Background())) + assertEqualStatuses(t, tc.expectedStatuses, actualStatuses) + }) + } +} + func (g *Graph) getReceivers() map[component.DataType]map[component.ID]component.Component { receiversMap := make(map[component.DataType]map[component.ID]component.Component) receiversMap[component.DataTypeTraces] = make(map[component.ID]component.Component) diff --git a/service/internal/servicetelemetry/nop_telemetry_settings.go b/service/internal/servicetelemetry/nop_telemetry_settings.go new file mode 100644 index 00000000000..e0ee305346d --- /dev/null +++ b/service/internal/servicetelemetry/nop_telemetry_settings.go @@ -0,0 +1,28 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package servicetelemetry // import "go.opentelemetry.io/collector/service/internal/servicetelemetry" + +import ( + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +// NewNopTelemetrySettings returns a new nop settings for Create* functions. +func NewNopTelemetrySettings() TelemetrySettings { + return TelemetrySettings{ + Logger: zap.NewNop(), + TracerProvider: trace.NewNoopTracerProvider(), + MeterProvider: noop.NewMeterProvider(), + MetricsLevel: configtelemetry.LevelNone, + Resource: pcommon.NewResource(), + ReportComponentStatus: func(*component.InstanceID, *component.StatusEvent) error { + return nil + }, + } +} diff --git a/service/internal/servicetelemetry/nop_telemetry_settings_test.go b/service/internal/servicetelemetry/nop_telemetry_settings_test.go new file mode 100644 index 00000000000..dd5014c7e0f --- /dev/null +++ b/service/internal/servicetelemetry/nop_telemetry_settings_test.go @@ -0,0 +1,30 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package servicetelemetry + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestNewNopSettings(t *testing.T) { + set := NewNopTelemetrySettings() + + require.NotNil(t, set) + require.IsType(t, TelemetrySettings{}, set) + require.Equal(t, zap.NewNop(), set.Logger) + require.Equal(t, trace.NewNoopTracerProvider(), set.TracerProvider) + require.Equal(t, noop.NewMeterProvider(), set.MeterProvider) + require.Equal(t, configtelemetry.LevelNone, set.MetricsLevel) + require.Equal(t, pcommon.NewResource(), set.Resource) + require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.NewStatusEvent(component.StatusStarting))) +} diff --git a/service/internal/servicetelemetry/telemetry_settings.go b/service/internal/servicetelemetry/telemetry_settings.go new file mode 100644 index 00000000000..00062764d93 --- /dev/null +++ b/service/internal/servicetelemetry/telemetry_settings.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package servicetelemetry // import "go.opentelemetry.io/collector/service/internal/servicetelemetry" + +import ( + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/service/internal/status" +) + +// TelemetrySettings mirrors component.TelemetrySettings except for the method signature of +// ReportComponentStatus. The service level TelemetrySettings is not bound a specific component, and +// therefore takes a component.InstanceID as an argument. +type TelemetrySettings component.TelemetrySettingsBase[status.ServiceStatusFunc] + +// ToComponentTelemetrySettings returns a TelemetrySettings for a specific component derived from +// this service level Settings object. +func (s TelemetrySettings) ToComponentTelemetrySettings(id *component.InstanceID) component.TelemetrySettings { + return component.TelemetrySettings{ + Logger: s.Logger, + TracerProvider: s.TracerProvider, + MeterProvider: s.MeterProvider, + MetricsLevel: s.MetricsLevel, + Resource: s.Resource, + ReportComponentStatus: status.NewComponentStatusFunc(id, s.ReportComponentStatus), + } +} diff --git a/service/internal/servicetelemetry/telemetry_settings_test.go b/service/internal/servicetelemetry/telemetry_settings_test.go new file mode 100644 index 00000000000..17300404d2f --- /dev/null +++ b/service/internal/servicetelemetry/telemetry_settings_test.go @@ -0,0 +1,34 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package servicetelemetry + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configtelemetry" + "go.opentelemetry.io/collector/pdata/pcommon" +) + +func TestSettings(t *testing.T) { + set := TelemetrySettings{ + Logger: zap.NewNop(), + TracerProvider: trace.NewNoopTracerProvider(), + MeterProvider: noop.NewMeterProvider(), + MetricsLevel: configtelemetry.LevelNone, + Resource: pcommon.NewResource(), + ReportComponentStatus: func(*component.InstanceID, *component.StatusEvent) error { + return nil + }, + } + require.NoError(t, set.ReportComponentStatus(&component.InstanceID{}, component.NewStatusEvent(component.StatusOK))) + + compSet := set.ToComponentTelemetrySettings(&component.InstanceID{}) + require.NoError(t, compSet.ReportComponentStatus(component.NewStatusEvent(component.StatusOK))) +} diff --git a/service/internal/status/status.go b/service/internal/status/status.go new file mode 100644 index 00000000000..bbccc6939ae --- /dev/null +++ b/service/internal/status/status.go @@ -0,0 +1,156 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package status // import "go.opentelemetry.io/collector/service/internal/status" + +import ( + "errors" + "fmt" + "sync" + + "go.opentelemetry.io/collector/component" +) + +// onTransitionFunc receives a component.StatusEvent on a successful state transition +type onTransitionFunc func(*component.StatusEvent) + +// errInvalidStateTransition is returned for invalid state transitions +var errInvalidStateTransition = errors.New("invalid state transition") + +// fsm is a finite state machine that models transitions for component status +type fsm struct { + current *component.StatusEvent + transitions map[component.Status]map[component.Status]struct{} + onTransition onTransitionFunc +} + +// transition will attempt to execute a state transition. If it's successful, it calls the +// onTransitionFunc with a StatusEvent representing the new state. Returns an error if the arguments +// result in an invalid status, or if the state transition is not valid. +func (m *fsm) transition(ev *component.StatusEvent) error { + if _, ok := m.transitions[m.current.Status()][ev.Status()]; !ok { + return fmt.Errorf( + "cannot transition from %s to %s: %w", + m.current.Status(), + ev.Status(), + errInvalidStateTransition, + ) + } + m.current = ev + m.onTransition(ev) + return nil +} + +// newFSM creates a state machine with all valid transitions for component.Status. +// The initial state is set to component.StatusNone. +func newFSM(onTransition onTransitionFunc) *fsm { + return &fsm{ + current: component.NewStatusEvent(component.StatusNone), + onTransition: onTransition, + transitions: map[component.Status]map[component.Status]struct{}{ + component.StatusNone: { + component.StatusStarting: {}, + }, + component.StatusStarting: { + component.StatusOK: {}, + component.StatusRecoverableError: {}, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopping: {}, + }, + component.StatusOK: { + component.StatusRecoverableError: {}, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopping: {}, + }, + component.StatusRecoverableError: { + component.StatusOK: {}, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopping: {}, + }, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopping: { + component.StatusRecoverableError: {}, + component.StatusPermanentError: {}, + component.StatusFatalError: {}, + component.StatusStopped: {}, + }, + component.StatusStopped: {}, + }, + } +} + +// InitFunc can be used to toggle a ready flag to true +type InitFunc func() + +// readFunc can be used to check the value of a ready flag +type readyFunc func() bool + +// initAndReadyFuncs returns a pair of functions to set and check a boolean ready flag +func initAndReadyFuncs() (InitFunc, readyFunc) { + mu := sync.RWMutex{} + isReady := false + + init := func() { + mu.Lock() + defer mu.Unlock() + isReady = true + } + + ready := func() bool { + mu.RLock() + defer mu.RUnlock() + return isReady + } + + return init, ready +} + +// NotifyStatusFunc is the receiver of status events after successful state transitions +type NotifyStatusFunc func(*component.InstanceID, *component.StatusEvent) + +// ServiceStatusFunc is the expected type of ReportComponentStatus for servicetelemetry.Settings +type ServiceStatusFunc func(*component.InstanceID, *component.StatusEvent) error + +// errStatusNotReady is returned when trying to report status before service start +var errStatusNotReady = errors.New("report component status is not ready until service start") + +// NewServiceStatusFunc returns a function to be used as ReportComponentStatus for +// servicetelemetry.Settings, which differs from component.TelemetrySettings in that +// the service version does not correspond to a specific component, and thus needs +// the a component.InstanceID as a parameter. +func NewServiceStatusFunc(notifyStatusChange NotifyStatusFunc) (InitFunc, ServiceStatusFunc) { + init, isReady := initAndReadyFuncs() + // mu synchronizes access to the fsmMap and the underlying fsm during a state transition + mu := sync.Mutex{} + fsmMap := make(map[*component.InstanceID]*fsm) + return init, + func(id *component.InstanceID, ev *component.StatusEvent) error { + if !isReady() { + return errStatusNotReady + } + mu.Lock() + defer mu.Unlock() + fsm, ok := fsmMap[id] + if !ok { + fsm = newFSM(func(ev *component.StatusEvent) { + notifyStatusChange(id, ev) + }) + fsmMap[id] = fsm + } + return fsm.transition(ev) + } + +} + +// NewComponentStatusFunc returns a function to be used as ReportComponentStatus for +// component.TelemetrySettings, which differs from servicetelemetry.Settings in that +// the component version is tied to specific component instance. +func NewComponentStatusFunc(id *component.InstanceID, srvStatus ServiceStatusFunc) component.StatusFunc { + return func(ev *component.StatusEvent) error { + return srvStatus(id, ev) + } +} diff --git a/service/internal/status/status_test.go b/service/internal/status/status_test.go new file mode 100644 index 00000000000..c439cea39af --- /dev/null +++ b/service/internal/status/status_test.go @@ -0,0 +1,268 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package status + +import ( + "fmt" + "sync" + "testing" + + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component" +) + +func TestStatusFSM(t *testing.T) { + for _, tc := range []struct { + name string + reportedStatuses []component.Status + expectedStatuses []component.Status + expectedErrorCount int + }{ + { + name: "successful startup and shutdown", + reportedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + }, + { + name: "component recovered", + reportedStatuses: []component.Status{ + component.StatusStarting, + component.StatusRecoverableError, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusRecoverableError, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + }, + { + name: "repeated events are errors", + reportedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusRecoverableError, + component.StatusRecoverableError, + component.StatusRecoverableError, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusRecoverableError, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedErrorCount: 2, + }, + { + name: "PermanentError is terminal", + reportedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusPermanentError, + component.StatusOK, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusPermanentError, + }, + expectedErrorCount: 1, + }, + { + name: "FatalError is terminal", + reportedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusFatalError, + component.StatusOK, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusFatalError, + }, + expectedErrorCount: 1, + }, + { + name: "Stopped is terminal", + reportedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + component.StatusOK, + }, + expectedStatuses: []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + }, + expectedErrorCount: 1, + }, + } { + t.Run(tc.name, func(t *testing.T) { + var receivedStatuses []component.Status + fsm := newFSM( + func(ev *component.StatusEvent) { + receivedStatuses = append(receivedStatuses, ev.Status()) + }, + ) + + errorCount := 0 + for _, status := range tc.reportedStatuses { + if err := fsm.transition(component.NewStatusEvent(status)); err != nil { + errorCount++ + require.ErrorIs(t, err, errInvalidStateTransition) + } + } + + require.Equal(t, tc.expectedErrorCount, errorCount) + require.Equal(t, tc.expectedStatuses, receivedStatuses) + }) + } +} + +func TestValidSeqsToStopped(t *testing.T) { + events := []*component.StatusEvent{ + component.NewStatusEvent(component.StatusStarting), + component.NewStatusEvent(component.StatusOK), + component.NewStatusEvent(component.StatusRecoverableError), + component.NewStatusEvent(component.StatusPermanentError), + component.NewStatusEvent(component.StatusFatalError), + } + + for _, ev := range events { + name := fmt.Sprintf("transition from: %s to: %s invalid", ev.Status(), component.StatusStopped) + t.Run(name, func(t *testing.T) { + fsm := newFSM(func(*component.StatusEvent) {}) + if ev.Status() != component.StatusStarting { + require.NoError(t, fsm.transition(component.NewStatusEvent(component.StatusStarting))) + } + require.NoError(t, fsm.transition(ev)) + // skipping to stopped is not allowed + err := fsm.transition(component.NewStatusEvent(component.StatusStopped)) + require.ErrorIs(t, err, errInvalidStateTransition) + + // stopping -> stopped is allowed for non-fatal, non-permanent errors + err = fsm.transition(component.NewStatusEvent(component.StatusStopping)) + if ev.Status() == component.StatusPermanentError || ev.Status() == component.StatusFatalError { + require.ErrorIs(t, err, errInvalidStateTransition) + } else { + require.NoError(t, err) + require.NoError(t, fsm.transition(component.NewStatusEvent(component.StatusStopped))) + } + }) + } + +} + +func TestStatusFuncs(t *testing.T) { + id1 := &component.InstanceID{} + id2 := &component.InstanceID{} + + actualStatuses := make(map[*component.InstanceID][]component.Status) + statusFunc := func(id *component.InstanceID, ev *component.StatusEvent) { + actualStatuses[id] = append(actualStatuses[id], ev.Status()) + } + + statuses1 := []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + } + + statuses2 := []component.Status{ + component.StatusStarting, + component.StatusOK, + component.StatusRecoverableError, + component.StatusOK, + component.StatusStopping, + component.StatusStopped, + } + + expectedStatuses := map[*component.InstanceID][]component.Status{ + id1: statuses1, + id2: statuses2, + } + + init, serviceStatusFn := NewServiceStatusFunc(statusFunc) + comp1Func := NewComponentStatusFunc(id1, serviceStatusFn) + comp2Func := NewComponentStatusFunc(id2, serviceStatusFn) + init() + + for _, st := range statuses1 { + require.NoError(t, comp1Func(component.NewStatusEvent(st))) + } + + for _, st := range statuses2 { + require.NoError(t, comp2Func(component.NewStatusEvent(st))) + } + + require.Equal(t, expectedStatuses, actualStatuses) +} + +func TestStatusFuncsConcurrent(t *testing.T) { + ids := []*component.InstanceID{{}, {}, {}, {}} + count := 0 + statusFunc := func(id *component.InstanceID, ev *component.StatusEvent) { + count++ + } + init, serviceStatusFn := NewServiceStatusFunc(statusFunc) + init() + + wg := sync.WaitGroup{} + wg.Add(len(ids)) + + for _, id := range ids { + id := id + go func() { + compFn := NewComponentStatusFunc(id, serviceStatusFn) + _ = compFn(component.NewStatusEvent(component.StatusStarting)) + for i := 0; i < 1000; i++ { + _ = compFn(component.NewStatusEvent(component.StatusRecoverableError)) + _ = compFn(component.NewStatusEvent(component.StatusOK)) + } + wg.Done() + }() + } + + wg.Wait() + require.Equal(t, 8004, count) +} + +func TestStatusFuncReady(t *testing.T) { + statusFunc := func(*component.InstanceID, *component.StatusEvent) {} + init, serviceStatusFn := NewServiceStatusFunc(statusFunc) + id := &component.InstanceID{} + + err := serviceStatusFn(id, component.NewStatusEvent(component.StatusStarting)) + require.ErrorIs(t, err, errStatusNotReady) + + init() + + err = serviceStatusFn(id, component.NewStatusEvent(component.StatusStarting)) + require.NoError(t, err) +} diff --git a/service/service.go b/service/service.go index ed9540ec948..477ed2266ef 100644 --- a/service/service.go +++ b/service/service.go @@ -29,6 +29,8 @@ import ( "go.opentelemetry.io/collector/service/extensions" "go.opentelemetry.io/collector/service/internal/graph" "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/internal/servicetelemetry" + "go.opentelemetry.io/collector/service/internal/status" "go.opentelemetry.io/collector/service/telemetry" ) @@ -69,10 +71,11 @@ type Settings struct { type Service struct { buildInfo component.BuildInfo telemetry *telemetry.Telemetry - telemetrySettings component.TelemetrySettings + telemetrySettings servicetelemetry.TelemetrySettings host *serviceHost telemetryInitializer *telemetryInitializer collectorConf *confmap.Conf + statusInit status.InitFunc } func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { @@ -104,7 +107,7 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { res := buildResource(set.BuildInfo, cfg.Telemetry) pcommonRes := pdataFromSdk(res) - srv.telemetrySettings = component.TelemetrySettings{ + srv.telemetrySettings = servicetelemetry.TelemetrySettings{ Logger: srv.telemetry.Logger(), TracerProvider: srv.telemetry.TracerProvider(), MeterProvider: noop.NewMeterProvider(), @@ -119,6 +122,8 @@ func New(ctx context.Context, set Settings, cfg Config) (*Service, error) { } srv.telemetrySettings.MeterProvider = srv.telemetryInitializer.mp srv.telemetrySettings.TracerProvider = srv.telemetryInitializer.tp + srv.statusInit, srv.telemetrySettings.ReportComponentStatus = + status.NewServiceStatusFunc(srv.host.notifyComponentStatusChange) // process the configuration and initialize the pipeline if err = srv.initExtensionsAndPipeline(ctx, set, cfg); err != nil { @@ -140,6 +145,9 @@ func (srv *Service) Start(ctx context.Context) error { zap.Int("NumCPU", runtime.NumCPU()), ) + // enable status reporting + srv.statusInit() + if err := srv.host.serviceExtensions.Start(ctx, srv.host); err != nil { return fmt.Errorf("failed to start extensions: %w", err) } diff --git a/service/service_test.go b/service/service_test.go index 16c5fd6f82f..8b47218dc6b 100644 --- a/service/service_test.go +++ b/service/service_test.go @@ -414,6 +414,28 @@ func TestServiceTelemetryLogger(t *testing.T) { assert.NotNil(t, srv.telemetrySettings.Logger) } +func TestServiceFatalError(t *testing.T) { + set := newNopSettings() + set.AsyncErrorChannel = make(chan error) + + srv, err := New(context.Background(), set, newNopConfig()) + require.NoError(t, err) + + assert.NoError(t, srv.Start(context.Background())) + t.Cleanup(func() { + assert.NoError(t, srv.Shutdown(context.Background())) + }) + + go func() { + ev := component.NewFatalErrorEvent(assert.AnError) + srv.host.notifyComponentStatusChange(&component.InstanceID{}, ev) + }() + + err = <-srv.host.asyncErrorChannel + + require.ErrorIs(t, err, assert.AnError) +} + func assertResourceLabels(t *testing.T, res pcommon.Resource, expectedLabels map[string]labelValue) { for key, labelValue := range expectedLabels { lookupKey, ok := prometheusToOtelConv[key] diff --git a/service/telemetry.go b/service/telemetry.go index 70fe22bc637..bec39c8adf7 100644 --- a/service/telemetry.go +++ b/service/telemetry.go @@ -29,10 +29,10 @@ import ( "go.uber.org/multierr" "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configtelemetry" "go.opentelemetry.io/collector/internal/obsreportconfig" "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/telemetry" ) @@ -71,7 +71,7 @@ func newColTelemetry(useOtel bool, disableHighCardinality bool, extendedConfig b } } -func (tel *telemetryInitializer) init(res *resource.Resource, settings component.TelemetrySettings, cfg telemetry.Config, asyncErrorChannel chan error) error { +func (tel *telemetryInitializer) init(res *resource.Resource, settings servicetelemetry.TelemetrySettings, cfg telemetry.Config, asyncErrorChannel chan error) error { if cfg.Metrics.Level == configtelemetry.LevelNone || (cfg.Metrics.Address == "" && len(cfg.Metrics.Readers) == 0) { settings.Logger.Info( "Skipping telemetry setup.", diff --git a/service/telemetry_test.go b/service/telemetry_test.go index a4144e27890..e22c7c88fd4 100644 --- a/service/telemetry_test.go +++ b/service/telemetry_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/internal/testutil" semconv "go.opentelemetry.io/collector/semconv/v1.18.0" "go.opentelemetry.io/collector/service/internal/proctelemetry" + "go.opentelemetry.io/collector/service/internal/servicetelemetry" "go.opentelemetry.io/collector/service/telemetry" ) @@ -272,7 +273,7 @@ func TestTelemetryInit(t *testing.T) { } otelRes := buildResource(buildInfo, *tc.cfg) res := pdataFromSdk(otelRes) - settings := component.TelemetrySettings{ + settings := servicetelemetry.TelemetrySettings{ Logger: zap.NewNop(), Resource: res, }