Skip to content

Commit

Permalink
[exporter/signalfx] Fix memory leak on shutdown (open-telemetry#30887)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
`goleak` was detecting leaking goroutines in tests, this attempts to
resolve. I found what appeared to be a couple races but can't reproduce
locally so I'll run CI a few times to ensure this works as expected.

Changes in PR:
1. Add correlation client Shutdown function that blocks on the
waitgroup. This is the main fix of this PR that should fix the leaking
goroutines.
2. Re-organize the shutdown process of the apm client correlation test
suite to properly synchronize the shutting down process.
3. Fix typo
4. Add goleak checks to exporter/signalfx/internal/correlation` and
`exporter/signalfx/internal/apm/correlations`

**Link to tracking Issue:** <Issue number if applicable>
Resolves open-telemetry#30864
open-telemetry#30438

**Testing:** <Describe what testing was performed and which tests were
added.>
All existing and added tests should be passing. Since this has only
failed in CI I'm going to try to run it a few times before marking as
ready for review.
  • Loading branch information
crobert-1 authored and XinRanZhAWS committed Mar 13, 2024
1 parent efc1735 commit fa3af08
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 11 deletions.
27 changes: 27 additions & 0 deletions .chloggen/goleak_signalfx_correlations.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: signalfxexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak in shutdown

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30864, 30438]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
7 changes: 7 additions & 0 deletions exporter/signalfxexporter/internal/apm/correlations/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type CorrelationClient interface {
Delete(*Correlation, SuccessfulDeleteCB)
Get(dimName string, dimValue string, cb SuccessfulGetCB)
Start()
Shutdown()
}

type request struct {
Expand Down Expand Up @@ -387,3 +388,9 @@ func (cc *Client) Start() {
go cc.processChan()
go cc.processRetryChan()
}

// Shutdown the client. This will block until the context's cancel
// function is complete.
func (cc *Client) Shutdown() {
cc.wg.Wait()
}
23 changes: 13 additions & 10 deletions exporter/signalfxexporter/internal/apm/correlations/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,14 @@ func makeHandler(t *testing.T, corCh chan<- *request, forcedRespCode *atomic.Val
})
}

func setup(t *testing.T) (CorrelationClient, chan *request, *atomic.Value, *atomic.Value, context.CancelFunc) {
func setup(t *testing.T) (CorrelationClient, *httptest.Server, chan *request, *atomic.Value, *atomic.Value, context.CancelFunc, context.Context) {
serverCh := make(chan *request, 100)

var forcedRespCode atomic.Value
var forcedRespPayload atomic.Value
server := httptest.NewServer(makeHandler(t, serverCh, &forcedRespCode, &forcedRespPayload))

ctx, cancel := context.WithCancel(context.Background())
go func() {
<-ctx.Done()
server.Close()
}()

serverURL, err := url.Parse(server.URL)
if err != nil {
Expand Down Expand Up @@ -176,13 +172,20 @@ func setup(t *testing.T) (CorrelationClient, chan *request, *atomic.Value, *atom
}
client.Start()

return client, serverCh, &forcedRespCode, &forcedRespPayload, cancel
return client, server, serverCh, &forcedRespCode, &forcedRespPayload, cancel, ctx
}

func teardown(ctx context.Context, client CorrelationClient, server *httptest.Server, serverCh chan *request, cancel context.CancelFunc) {
close(serverCh)
cancel()
<-ctx.Done()
client.Shutdown()
server.Close()
}

func TestCorrelationClient(t *testing.T) {
client, serverCh, forcedRespCode, forcedRespPayload, cancel := setup(t)
defer close(serverCh)
defer cancel()
client, server, serverCh, forcedRespCode, forcedRespPayload, cancel, ctx := setup(t)
defer teardown(ctx, client, server, serverCh, cancel)

for _, correlationType := range []Type{Service, Environment} {
for _, op := range []string{http.MethodPut, http.MethodDelete} {
Expand Down Expand Up @@ -242,7 +245,7 @@ func TestCorrelationClient(t *testing.T) {
client.Correlate(testData, CorrelateCB(func(_ *Correlation, _ error) {}))
// sending the testData twice tests deduplication, since the 500 status
// will trigger retries, and the requests should be deduped and the
// TotalRertriedUpdates should still only be 5
// TotalRetriedUpdates should still only be 5
client.Correlate(testData, CorrelateCB(func(_ *Correlation, _ error) {}))

cors := waitForCors(serverCh, 1, 4)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package correlations

import (
"testing"

"go.uber.org/goleak"
)

// The IgnoreTopFunction call prevents catching the leak generated by opencensus
// defaultWorker.Start which at this time is part of the package's init call.
// See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ type correlationTestClient struct {
correlateCounter int64
}

func (c *correlationTestClient) Start() { /*no-op*/ }
func (c *correlationTestClient) Start() { /*no-op*/ }
func (c *correlationTestClient) Shutdown() { /*no-op*/ }
func (c *correlationTestClient) Get(_ string, dimValue string, cb correlations.SuccessfulGetCB) {
atomic.AddInt64(&c.getCounter, 1)
go func() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (cor *Tracker) Shutdown(_ context.Context) error {
if cor != nil {
if cor.correlation != nil {
cor.correlation.cancel()
cor.correlation.CorrelationClient.Shutdown()
}

if cor.pTicker != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ func TestTrackerStart(t *testing.T) {
} else {
require.NoError(t, err)
}

assert.NoError(t, tracker.Shutdown(context.Background()))
})
}
}
17 changes: 17 additions & 0 deletions exporter/signalfxexporter/internal/correlation/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package correlation

import (
"testing"

"go.uber.org/goleak"
)

// The IgnoreTopFunction call prevents catching the leak generated by opencensus
// defaultWorker.Start which at this time is part of the package's init call.
// See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
}

0 comments on commit fa3af08

Please sign in to comment.