Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Limit badger gc concurrency to 1 to avoid panic (backport #14340) #14403

Merged
merged 1 commit into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelogs/8.16.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ https://github.com/elastic/apm-server/compare/v8.15.2\...v8.16.0[View commits]
- Track all bulk request response status codes {pull}13574[13574]
- Fix a concurrent map write panic in monitoring middleware {pull}14335[14335]
- Apply shutdown timeout to http server {pull}14339[14339]
- Tail-based sampling: Fix rare gc thread failure after EA hot reload causing storage not reclaimed and stuck with "storage limit reached" {pull}13574[13574]

[float]
==== Breaking Changes
Expand Down
28 changes: 27 additions & 1 deletion x-pack/apm-server/sampling/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
shutdownGracePeriod = 5 * time.Second
)

var (
// gcCh works like a global mutex to protect gc from running concurrently when 2 TBS processors are active during a hot reload
gcCh = make(chan struct{}, 1)
)

// Processor is a tail-sampling event processor.
type Processor struct {
config Config
Expand Down Expand Up @@ -386,6 +391,16 @@ func (p *Processor) Run() error {
}
})
g.Go(func() error {
// Protect this goroutine from running concurrently when 2 TBS processors are active
// as badger GC is not concurrent safe.
select {
case <-p.stopping:
return nil
case gcCh <- struct{}{}:
}
defer func() {
<-gcCh
}()
// This goroutine is responsible for periodically garbage
// collecting the Badger value log, using the recommended
// discard ratio of 0.5.
Expand All @@ -411,7 +426,9 @@ func (p *Processor) Run() error {
})
g.Go(func() error {
// Subscribe to remotely sampled trace IDs. This is cancelled immediately when
// Stop is called. The next subscriber will pick up from the previous position.
// Stop is called. But it is possible that both old and new subscriber goroutines
// run concurrently, before the old one eventually receives the Stop call.
// The next subscriber will pick up from the previous position.
defer close(remoteSampledTraceIDs)
defer close(subscriberPositions)
ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -558,7 +575,13 @@ func (p *Processor) Run() error {
return nil
}

// subscriberPositionFileMutex protects the subscriber file from concurrent RW, in case of hot reload.
var subscriberPositionFileMutex sync.Mutex

func readSubscriberPosition(logger *logp.Logger, storageDir string) (pubsub.SubscriberPosition, error) {
subscriberPositionFileMutex.Lock()
defer subscriberPositionFileMutex.Unlock()

var pos pubsub.SubscriberPosition
data, err := os.ReadFile(filepath.Join(storageDir, subscriberPositionFile))
if errors.Is(err, os.ErrNotExist) {
Expand All @@ -579,6 +602,9 @@ func writeSubscriberPosition(storageDir string, pos pubsub.SubscriberPosition) e
if err != nil {
return err
}

subscriberPositionFileMutex.Lock()
defer subscriberPositionFileMutex.Unlock()
return os.WriteFile(filepath.Join(storageDir, subscriberPositionFile), data, 0644)
}

Expand Down
26 changes: 26 additions & 0 deletions x-pack/apm-server/sampling/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/testing/protocmp"

"github.com/elastic/apm-data/model/modelpb"
Expand Down Expand Up @@ -668,6 +669,31 @@ func TestStorageGC(t *testing.T) {
t.Fatal("timed out waiting for value log garbage collection")
}

func TestStorageGCConcurrency(t *testing.T) {
// This test ensures that TBS processor does not return an error
// even when run concurrently e.g. in hot reload
if testing.Short() {
t.Skip("skipping slow test")
}

config := newTempdirConfig(t)
config.TTL = 10 * time.Millisecond
config.FlushInterval = 10 * time.Millisecond
config.StorageGCInterval = 10 * time.Millisecond

g := errgroup.Group{}
for i := 0; i < 2; i++ {
processor, err := sampling.NewProcessor(config)
require.NoError(t, err)
g.Go(processor.Run)
go func() {
time.Sleep(time.Second)
assert.NoError(t, processor.Stop(context.Background()))
}()
}
assert.NoError(t, g.Wait())
}

func TestStorageLimit(t *testing.T) {
// This test ensures that when tail sampling is configured with a hard
// storage limit, the limit is respected once the size is available.
Expand Down
Loading