[extension/opampagent] use status subscription for fine granular health reporting #35892
Conversation
Thanks for working on this @bacherfl. I have some bigger-picture observations / suggestions before getting into the details of the current implementation.

The `status.Aggregator` needs to consume status events from the component status reporting system. This is done by implementing the optional `componentstatus.Watcher` interface. I would expect this to be very similar to the healthcheckv2extension. See:
opentelemetry-collector-contrib/extension/healthcheckv2extension/extension.go
Lines 125 to 144 in eb48ed7
```go
// ComponentStatusChanged implements the extension.StatusWatcher interface.
func (hc *healthCheckExtension) ComponentStatusChanged(
	source *componentstatus.InstanceID,
	event *componentstatus.Event,
) {
	// There can be late arriving events after shutdown. We need to close
	// the event channel so that this function doesn't block and we release all
	// goroutines, but attempting to write to a closed channel will panic; log
	// and recover.
	defer func() {
		if r := recover(); r != nil {
			hc.telemetry.Logger.Info(
				"discarding event received after shutdown",
				zap.Any("source", source),
				zap.Any("event", event),
			)
		}
	}()
	hc.eventCh <- &eventSourcePair{source: source, event: event}
}
```
and
opentelemetry-collector-contrib/extension/healthcheckv2extension/extension.go
Lines 168 to 209 in eb48ed7
```go
func (hc *healthCheckExtension) eventLoop(ctx context.Context) {
	// Record events with component.StatusStarting, but queue other events until
	// PipelineWatcher.Ready is called. This prevents aggregate statuses from
	// flapping between StatusStarting and StatusOK as components are started
	// individually by the service.
	var eventQueue []*eventSourcePair
	for loop := true; loop; {
		select {
		case esp, ok := <-hc.eventCh:
			if !ok {
				return
			}
			if esp.event.Status() != componentstatus.StatusStarting {
				eventQueue = append(eventQueue, esp)
				continue
			}
			hc.aggregator.RecordStatus(esp.source, esp.event)
		case <-hc.readyCh:
			for _, esp := range eventQueue {
				hc.aggregator.RecordStatus(esp.source, esp.event)
			}
			eventQueue = nil
			loop = false
		case <-ctx.Done():
			return
		}
	}
	// After PipelineWatcher.Ready, record statuses as they are received.
	for {
		select {
		case esp, ok := <-hc.eventCh:
			if !ok {
				return
			}
			hc.aggregator.RecordStatus(esp.source, esp.event)
		case <-ctx.Done():
			return
		}
	}
}
```
Note the comment below `eventLoop` for a description of why the code is structured as it is. Those details may or may not be relevant for the OpAMP extension.
Components can update their status at any time while the collector is running. You'll need to decide if you want to send an updated component health message for each status update, or if you want to periodically retrieve the overall status and send an updated component health message if it's changed. If you want to do the latter, you can use the timestamp on the overall status to see if anything has changed.
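For illustration, a rough sketch of the periodic approach is below. The `AggregateStatus(scope, verbosity)` call and the `Timestamp()` accessor are assumptions based on how `pkg/status` is used elsewhere, and `convertAggregateStatus` is a hypothetical conversion helper (sketched further down), so treat this as a shape, not the exact API:

```go
// Hypothetical polling loop: periodically read the overall status and only
// send a new ComponentHealth message when the aggregate status has changed,
// using the timestamp on the overall status.
func (o *opampAgent) pollHealth(ctx context.Context) {
	ticker := time.NewTicker(10 * time.Second) // interval chosen arbitrarily
	defer ticker.Stop()

	var lastReported time.Time
	for {
		select {
		case <-ticker.C:
			agg, ok := o.statusAggregator.AggregateStatus(status.ScopeAll, status.Verbose)
			if !ok {
				continue
			}
			if ts := agg.Timestamp(); ts.After(lastReported) {
				// Only report when something changed since the last report.
				_ = o.opampClient.SetHealth(convertAggregateStatus(agg))
				lastReported = ts
			}
		case <-ctx.Done():
			return
		}
	}
}
```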
If you haven't seen it, there is a document that describes the status reporting system.
Both the `status.AggregateStatus` and the `ComponentHealth` message are recursively defined. The existing conversion code assumes they will be two levels deep. It could be future-proofed if it accounted for arbitrary levels of nesting.
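A recursive conversion handles arbitrary nesting naturally. The sketch below is illustrative only: the `AggregateStatus` accessors (`Status()`, `Err()`, `Timestamp()`) and the `ComponentHealth` field names are assumptions to be checked against `pkg/status` and opamp-go:

```go
// Sketch of a recursive conversion: the same function handles every level of
// nesting in both structures.
func convertAggregateStatus(st *status.AggregateStatus) *protobufs.ComponentHealth {
	ch := &protobufs.ComponentHealth{
		Healthy:            st.Status() == componentstatus.StatusOK,
		Status:             st.Status().String(),
		StatusTimeUnixNano: uint64(st.Timestamp().UnixNano()),
		ComponentHealthMap: map[string]*protobufs.ComponentHealth{},
	}
	if err := st.Err(); err != nil {
		ch.LastError = err.Error()
	}
	// Recurse into sub-components at every level of nesting.
	for name, sub := range st.ComponentStatusMap {
		ch.ComponentHealthMap[name] = convertAggregateStatus(sub)
	}
	return ch
}
```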
The last thing I wanted to mention is how the healthcheckv2extension handles the `Healthy` flag. I'm not sure if any of this applies to the OpAMP extension, but I figured I should mention it while we're on this topic. The relationship between component status and whether or not a component is healthy is somewhat subjective. The healthcheckv2extension has some configuration options to control how this works: `include_permanent_errors`, `include_recoverable_errors`, and `recovery_duration`. If `include_permanent_errors` is set, a permanent error will be considered unhealthy. It's similar for `include_recoverable_errors`, but there's an additional `recovery_duration` setting: a recoverable error will be considered healthy until the recovery duration has elapsed. I think it's probably safe to ignore this for now, but I wanted to bring it up.
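This is not the actual healthcheckv2extension implementation, just a hedged sketch of how options like these could map a status event to a healthy/unhealthy decision; the `hcCfg` struct and `isHealthy` function are made-up names for illustration:

```go
// Illustrative only: the real healthcheckv2extension logic may differ in detail.
type hcCfg struct {
	IncludePermanentErrors   bool
	IncludeRecoverableErrors bool
	RecoveryDuration         time.Duration
}

func isHealthy(ev *componentstatus.Event, cfg hcCfg, now time.Time) bool {
	switch ev.Status() {
	case componentstatus.StatusPermanentError:
		// Permanent errors only count as unhealthy when explicitly included.
		return !cfg.IncludePermanentErrors
	case componentstatus.StatusRecoverableError:
		if !cfg.IncludeRecoverableErrors {
			return true
		}
		// Recoverable errors stay "healthy" until the recovery duration elapses.
		return now.Sub(ev.Timestamp()) < cfg.RecoveryDuration
	default:
		return true
	}
}
```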
Thanks for the details @mwear! I was assuming that the … Also, good point about the nested ComponentHealth messages; I'll adapt the conversion to handle arbitrary levels of nesting. I will change this PR to draft for now and keep you updated once it's ready for another review.
@mwear I have adapted the implementation now - when you have time, feel free to have a look and see whether this is going in the right direction.
LGTM. Thanks @bacherfl!
Sorry for the delay. This mostly looks good to me, just a few minor suggestions.
```go
	statusAggregator     statusAggregator
	statusSubscriptionWg *sync.WaitGroup
	componentHealthWg    *sync.WaitGroup
```
From what I can tell, we never call `Wait` on this, so I think we can remove it.
Good catch - seems like the `Wait` was missing in the `Shutdown` method. I added it now, to make sure the component health routine exits properly when we receive the shutdown signal.
FYI I had to increase some of the timeouts in the e2e tests of the supervisor, as the agent now needs a bit more time to shut down. I will re-trigger the tests a couple of times to make sure they pass consistently now.
Alright, it seems like the tests are now passing consistently (the previous run failed due to a temporary issue with the govuln check, but that is unrelated to the changes made in this PR).
```diff
@@ -198,6 +215,27 @@ func (o *opampAgent) NotReady() error {
 	return nil
 }
 
+// ComponentStatusChanged implements the componentstatus.Watcher interface.
```
Can you add an interface implementation assertion for `componentstatus.Watcher` above?
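For reference, such an assertion is the usual compile-time check in Go:

```go
// Compile-time check that *opampAgent implements componentstatus.Watcher.
var _ componentstatus.Watcher = (*opampAgent)(nil)
```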
```diff
@@ -35,6 +44,7 @@ func TestNewOpampAgent(t *testing.T) {
 	assert.True(t, o.capabilities.ReportsHealth)
 	assert.Empty(t, o.effectiveConfig)
 	assert.Nil(t, o.agentDescription)
+	assert.NoError(t, o.Shutdown(context.TODO()))
```
Should we use `context.Background` in here instead, or are we planning to pass an actual context object?
```go
)

var _ extensioncapabilities.PipelineWatcher = (*opampAgent)(nil)
var (
```
Could you move these to the definitions on line 88?
…36754)

Description: The changes introduced in #35892 seem to have introduced some flakiness in the opampsupervisor e2e tests, as the shutdown of the OpAMP agent waits for the component health loop to end. Due to an unclosed channel within the OpAMP agent, however, the agent does not shut down properly, and the supervisor runs into a timeout before ultimately sending a SIGKILL to the agent process. Closing the channel in the Shutdown method of the opampextension fixes that, and the agent shuts down properly upon receiving the SIGINT signal.

Link to tracking issue: Fixes #36764

Testing: This fixes the failing test mentioned in the issue (#36764).
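Putting the two shutdown-related fixes together, the shape of the change is roughly the following. This is a hedged reconstruction, not the literal extension code: the `eventCh` name is an assumption, and other shutdown steps are omitted.

```go
// Sketch: Shutdown closes the status event channel so the component health
// goroutine can drain and exit, then waits for it before stopping the client.
func (o *opampAgent) Shutdown(ctx context.Context) error {
	if o.eventCh != nil {
		close(o.eventCh) // without this, the health loop never returns
	}
	o.componentHealthWg.Wait() // ensure the health reporting goroutine has exited

	if o.opampClient == nil {
		return nil
	}
	return o.opampClient.Stop(ctx)
}
```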
Description
This PR extends the opamp agent extension to make use of the recently introduced `pkg/status` package to subscribe to health updates, convert them, and send them to the OpAMP server.

Link to tracking issue
Fixes #35856
Testing
Added unit tests
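For readers unfamiliar with `pkg/status`, here is a rough sketch of the subscription-based flow described in the description above. The `Subscribe` signature, the `convertAggregateStatus` helper (sketched earlier in this thread), and the field names are assumptions rather than the exact code in this PR:

```go
// Rough shape of the subscription-based flow: subscribe to aggregate status
// updates, convert each one, and report it via the OpAMP client.
func (o *opampAgent) startHealthReporting(ctx context.Context) {
	statusCh := o.statusAggregator.Subscribe(status.ScopeAll, status.Verbose)
	o.componentHealthWg.Add(1)
	go func() {
		defer o.componentHealthWg.Done()
		for {
			select {
			case st, ok := <-statusCh:
				if !ok {
					return
				}
				// Convert the aggregate status to a ComponentHealth message
				// and send it to the OpAMP server.
				if err := o.opampClient.SetHealth(convertAggregateStatus(st)); err != nil {
					o.logger.Error("failed to report component health", zap.Error(err))
				}
			case <-ctx.Done():
				return
			}
		}
	}()
}
```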