Skip to content

Commit

Permalink
[receiver/fluentforward] Fix memory leak (#32363)
Browse files Browse the repository at this point in the history
**Description:** 
This fixes a memory leak happening in the Fluent Forward receiver.
`time.Sleep` was being called for a duration of 10 seconds, with no way
to interrupt during shutdown. This updates the functionality to use a
timer that can be stopped in the case of the context being cancelled.

This also enables `goleak` checks on the Fluent Forward receiver to help
ensure no goroutines are being leaked, and adds a missing `Shutdown`
call.

**Link to tracking Issue:** #30438

**Testing:** All existing tests are passing as well as added `goleak` checks.
  • Loading branch information
crobert-1 authored Apr 12, 2024
1 parent 6522a3c commit c07d1e6
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 2 deletions.
27 changes: 27 additions & 0 deletions .chloggen/goleak_fluentforwardrec.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: fluentforwardreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix memory leak

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [32363]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
17 changes: 17 additions & 0 deletions receiver/fluentforwardreceiver/package_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package fluentforwardreceiver

import (
"testing"

"go.uber.org/goleak"
)

// The IgnoreTopFunction call prevents catching the leak generated by opencensus
// defaultWorker.Start which at this time is part of the package's init call.
// See https://github.com/census-instrumentation/opencensus-go/issues/1191 for more information.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))
}
1 change: 1 addition & 0 deletions receiver/fluentforwardreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ func TestUnixEndpoint(t *testing.T) {
receiver, err := newFluentReceiver(receivertest.NewNopCreateSettings(), conf, next)
require.NoError(t, err)
require.NoError(t, receiver.Start(ctx, nil))
defer func() { require.NoError(t, receiver.Shutdown(ctx)) }()

conn, err := net.Dial("unix", receiver.(*fluentReceiver).listener.Addr().String())
require.NoError(t, err)
Expand Down
10 changes: 8 additions & 2 deletions receiver/fluentforwardreceiver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,14 @@ func (s *server) handleConnections(ctx context.Context, listener net.Listener) {
// keep trying to accept connections if at all possible. Put in a sleep
// to prevent hot loops in case the error persists.
if err != nil {
time.Sleep(10 * time.Second)
continue
timer := time.NewTimer(10 * time.Second)
select {
case <-ctx.Done():
timer.Stop()
return
case <-timer.C:
continue
}
}
stats.Record(ctx, observ.ConnectionsOpened.M(1))

Expand Down

0 comments on commit c07d1e6

Please sign in to comment.