Skip to content

Commit

Permalink
Fix race conditions in ManagerV2 tests
Browse files Browse the repository at this point in the history
This commit fixes a number of race conditions in the ManagerV2 unit
tests. Most of them were due to the use of Testify's Eventually
function to read some values while some callbacks from the manager
would also modify those values. The simplest solution was to use the
atomic values on those cases.

One test (TestInputReload) had a race condition between the test and
the manager itself, so it was removed. There is an integration tests
that covers the same functionality.
  • Loading branch information
belimawr committed Aug 2, 2023
1 parent c7d5920 commit 37b14a1
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 223 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Skip HTTPJSON flakey test. {issue}34929[34929] {pull}35138[35138]
- Fix ingest pipeline for panw module to parse url scheme correctly {pull}35757[35757]
- Renamed an httpjson input metric to follow naming conventions. `httpjson_interval_pages_total` was renamed to `httpjson_interval_pages` because the `_total` suffix is reserved for counters. {issue}35933[35933] {pull}36169[36169]
- Fixed some race conditions in tests {pull}36185]36185]

==== Added

Expand Down
208 changes: 0 additions & 208 deletions x-pack/libbeat/management/input_reload_test.go

This file was deleted.

31 changes: 16 additions & 15 deletions x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -34,44 +35,44 @@ func TestManagerV2(t *testing.T) {
inputs := &reloadableList{}
r.MustRegisterInput(inputs)

configsSet := false
configsCleared := false
logLevelSet := false
fqdnEnabled := false
allStopped := false
configsSet := atomic.Bool{}
configsCleared := atomic.Bool{}
logLevelSet := atomic.Bool{}
fqdnEnabled := atomic.Bool{}
allStopped := atomic.Bool{}
onObserved := func(observed *proto.CheckinObserved, currentIdx int) {
if currentIdx == 1 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg != nil && len(iCfgs) == 3 {
configsSet = true
configsSet.Store(true)
t.Logf("output and inputs configuration set")
}
} else if currentIdx == 2 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg == nil || len(iCfgs) != 3 {
// should not happen (config no longer set)
configsSet = false
configsSet.Store(false)
t.Logf("output and inputs configuration cleared (should not happen)")
}
} else {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg == nil && len(iCfgs) == 0 {
configsCleared = true
configsCleared.Store(true)
}
if len(observed.Units) == 0 {
allStopped = true
allStopped.Store(true)
t.Logf("output and inputs configuration cleared (stopping)")
}
}
if logp.GetLevel() == zapcore.DebugLevel {
logLevelSet = true
logLevelSet.Store(true)
t.Logf("debug log level set")
}

fqdnEnabled = features.FQDN()
fqdnEnabled.Store(features.FQDN())
t.Logf("FQDN feature flag set to %v", fqdnEnabled)

Check failure on line 76 in x-pack/libbeat/management/managerV2_test.go

View workflow job for this annotation

GitHub Actions / lint (windows)

copylocks: call of t.Logf copies lock value: sync/atomic.Bool contains sync/atomic.noCopy (govet)

Check failure on line 76 in x-pack/libbeat/management/managerV2_test.go

View workflow job for this annotation

GitHub Actions / lint (linux)

copylocks: call of t.Logf copies lock value: sync/atomic.Bool contains sync/atomic.noCopy (govet)
}

Expand Down Expand Up @@ -218,7 +219,7 @@ func TestManagerV2(t *testing.T) {
defer m.Stop()

require.Eventually(t, func() bool {
return configsSet && configsCleared && logLevelSet && fqdnEnabled && allStopped
return configsSet.Load() && configsCleared.Load() && logLevelSet.Load() && fqdnEnabled.Load() && allStopped.Load()
}, 15*time.Second, 300*time.Millisecond)
}

Expand All @@ -242,7 +243,7 @@ func TestOutputError(t *testing.T) {
}
r.MustRegisterInput(inputs)

stateReached := false
stateReached := atomic.Bool{}
units := []*proto.UnitExpected{
{
Id: "output-unit",
Expand Down Expand Up @@ -300,7 +301,7 @@ func TestOutputError(t *testing.T) {
server := &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
if DoesStateMatch(observed, desiredState, 0) {
stateReached = true
stateReached.Store(true)
}
return &proto.CheckinExpected{
Units: units,
Expand Down Expand Up @@ -345,7 +346,7 @@ func TestOutputError(t *testing.T) {
defer m.Stop()

require.Eventually(t, func() bool {
return stateReached
return stateReached.Load()
}, 10*time.Second, 100*time.Millisecond, "desired state, output failed, was not reached")
}

Expand Down

0 comments on commit 37b14a1

Please sign in to comment.