Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in ManagerV2 tests #36185

Merged
merged 1 commit into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Skip HTTPJSON flakey test. {issue}34929[34929] {pull}35138[35138]
- Fix ingest pipeline for panw module to parse url scheme correctly {pull}35757[35757]
- Renamed an httpjson input metric to follow naming conventions. `httpjson_interval_pages_total` was renamed to `httpjson_interval_pages` because the `_total` suffix is reserved for counters. {issue}35933[35933] {pull}36169[36169]
- Fixed some race conditions in tests {pull}36185[36185]

==== Added

Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/tests/integration/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -265,7 +266,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {
"../../filebeat.test",
)

finalStateReached := false
finalStateReached := atomic.Bool{}
var units = []*proto.UnitExpected{
{
Id: "output-unit-borken",
Expand Down Expand Up @@ -319,7 +320,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {
// So we wait until the state matches the desired state
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
if management.DoesStateMatch(observed, units, 0) {
finalStateReached = true
finalStateReached.Store(true)
}

return &proto.CheckinExpected{
Expand All @@ -337,7 +338,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {
)

require.Eventually(t, func() bool {
return finalStateReached
return finalStateReached.Load()
rdner marked this conversation as resolved.
Show resolved Hide resolved
}, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy")

t.Cleanup(server.Stop)
Expand Down
208 changes: 0 additions & 208 deletions x-pack/libbeat/management/input_reload_test.go

This file was deleted.

40 changes: 20 additions & 20 deletions x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,45 +37,45 @@ func TestManagerV2(t *testing.T) {
inputs := &reloadableList{}
r.MustRegisterInput(inputs)

configsSet := false
configsCleared := false
logLevelSet := false
fqdnEnabled := false
allStopped := false
configsSet := atomic.Bool{}
configsCleared := atomic.Bool{}
logLevelSet := atomic.Bool{}
fqdnEnabled := atomic.Bool{}
allStopped := atomic.Bool{}
onObserved := func(observed *proto.CheckinObserved, currentIdx int) {
if currentIdx == 1 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg != nil && len(iCfgs) == 3 {
configsSet = true
t.Logf("output and inputs configuration set")
configsSet.Store(true)
t.Log("output and inputs configuration set")
}
} else if currentIdx == 2 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg == nil || len(iCfgs) != 3 {
// should not happen (config no longer set)
configsSet = false
t.Logf("output and inputs configuration cleared (should not happen)")
configsSet.Store(false)
t.Log("output and inputs configuration cleared (should not happen)")
}
} else {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg == nil && len(iCfgs) == 0 {
configsCleared = true
configsCleared.Store(true)
}
if len(observed.Units) == 0 {
allStopped = true
t.Logf("output and inputs configuration cleared (stopping)")
allStopped.Store(true)
t.Log("output and inputs configuration cleared (stopping)")
}
}
if logp.GetLevel() == zapcore.DebugLevel {
logLevelSet = true
t.Logf("debug log level set")
logLevelSet.Store(true)
t.Log("debug log level set")
}

fqdnEnabled = features.FQDN()
t.Logf("FQDN feature flag set to %v", fqdnEnabled)
fqdnEnabled.Store(features.FQDN())
t.Logf("FQDN feature flag set to %v", fqdnEnabled.Load())
}

srv := integration.NewMockServer([][]*proto.UnitExpected{
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestManagerV2(t *testing.T) {
defer m.Stop()

require.Eventually(t, func() bool {
return configsSet && configsCleared && logLevelSet && fqdnEnabled && allStopped
return configsSet.Load() && configsCleared.Load() && logLevelSet.Load() && fqdnEnabled.Load() && allStopped.Load()
}, 15*time.Second, 300*time.Millisecond)
}

Expand All @@ -245,7 +245,7 @@ func TestOutputError(t *testing.T) {
}
r.MustRegisterInput(inputs)

stateReached := false
stateReached := atomic.Bool{}
units := []*proto.UnitExpected{
{
Id: "output-unit",
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestOutputError(t *testing.T) {
server := &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
if DoesStateMatch(observed, desiredState, 0) {
stateReached = true
stateReached.Store(true)
}
return &proto.CheckinExpected{
Units: units,
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestOutputError(t *testing.T) {
defer m.Stop()

require.Eventually(t, func() bool {
return stateReached
return stateReached.Load()
rdner marked this conversation as resolved.
Show resolved Hide resolved
}, 10*time.Second, 100*time.Millisecond, "desired state, output failed, was not reached")
}

Expand Down
Loading