From 37b14a18f413d762314dba468ad5eebb16c55951 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Mon, 31 Jul 2023 19:03:58 +0200 Subject: [PATCH] Fix race conditions in ManagerV2 tests This commit fixes a number of race conditions in the ManagerV2 unit tests. Most of them were due to the use of Testify's Eventually function to read some values while some callbacks from the manager would also modify those values. The simplest solution was to use atomic values in those cases. One test (TestInputReload) had a race condition between the test and the manager itself, so it was removed. There is an integration test that covers the same functionality. --- CHANGELOG-developer.next.asciidoc | 1 + .../libbeat/management/input_reload_test.go | 208 ------------------ x-pack/libbeat/management/managerV2_test.go | 31 +-- 3 files changed, 17 insertions(+), 223 deletions(-) delete mode 100644 x-pack/libbeat/management/input_reload_test.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index d7323a2081b2..5aea98240853 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -84,6 +84,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Skip HTTPJSON flakey test. {issue}34929[34929] {pull}35138[35138] - Fix ingest pipeline for panw module to parse url scheme correctly {pull}35757[35757] - Renamed an httpjson input metric to follow naming conventions. `httpjson_interval_pages_total` was renamed to `httpjson_interval_pages` because the `_total` suffix is reserved for counters. {issue}35933[35933] {pull}36169[36169] +- Fixed some race conditions in tests {pull}36185[36185] ==== Added diff --git a/x-pack/libbeat/management/input_reload_test.go b/x-pack/libbeat/management/input_reload_test.go deleted file mode 100644 index 61ed315dc7a4..000000000000 --- a/x-pack/libbeat/management/input_reload_test.go +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright Elasticsearch B.V. 
and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package management - -import ( - "fmt" - "sync" - "testing" - "time" - - "github.com/joeshaw/multierror" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/reload" - "github.com/elastic/beats/v7/libbeat/tests/integration" - "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" -) - -func TestInputReload(t *testing.T) { - // Uncomment the line below to see the debug logs for this test - // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*", "centralmgmt.V2-manager")) - r := reload.NewRegistry() - - output := &reloadable{} - r.MustRegisterOutput(output) - - reloadCallCount := 0 - inputs := &reloadableListMock{ - ReloadImpl: func(configs []*reload.ConfigWithMeta) error { - reloadCallCount++ - if reloadCallCount == 1 { - e1 := multierror.Errors{fmt.Errorf("%w", &common.ErrInputNotFinished{ - State: "", - File: "/tmp/foo.log", - })} - return e1.Err() - } - - return nil - }, - } - r.MustRegisterInput(inputs) - - configIdx := -1 - onObserved := func(observed *proto.CheckinObserved, currentIdx int) { - configIdx = currentIdx - } - - srv := integration.NewMockServer([][]*proto.UnitExpected{ - { - { - Id: "output-unit", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "default", - Type: "elasticsearch", - Name: "elasticsearch", - }, - }, - { - Id: "input-unit-1", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - 
Id: "log-input", - Type: "log", - Name: "log", - Streams: []*proto.Stream{ - { - Id: "log-input-1", - Source: requireNewStruct(t, map[string]interface{}{ - "paths": []interface{}{"/tmp/foo.log"}, - }), - }, - }, - }, - }, - }, - { - { - Id: "output-unit", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "default", - Type: "elasticsearch", - Name: "elasticsearch", - }, - }, - { - Id: "input-unit-1", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "log-input-2", - Type: "log", - Name: "log", - Streams: []*proto.Stream{ - { - Id: "log-input-2", - Source: requireNewStruct(t, map[string]interface{}{ - "paths": []interface{}{"/tmp/foo.log"}, - }), - }, - }, - }, - }, - }, - }, - []uint64{1, 1}, - []*proto.Features{ - nil, - nil, - }, - onObserved, - 500*time.Millisecond, - ) - require.NoError(t, srv.Start()) - defer srv.Stop() - - client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{ - Name: "program", - Version: "v1.0.0", - Meta: map[string]string{ - "key": "value", - }, - }, grpc.WithTransportCredentials(insecure.NewCredentials())) - - m, err := NewV2AgentManagerWithClient( - &Config{ - Enabled: true, - }, - r, - client, - WithChangeDebounce(300*time.Millisecond), - WithForceReloadDebounce(800*time.Millisecond), - ) - require.NoError(t, err) - - mm := m.(*BeatV2Manager) - - err = m.Start() - require.NoError(t, err) - defer m.Stop() - - forceReloadStates := []bool{false, true, false} - forceReloadStateIdx := 0 - forceReloadLastState := true // starts on true so the first iteration is already a change - - eventuallyCheck := func() bool { - forceReload := mm.forceReload - // That detects a state change, we only count/advance steps - // on state changes - if forceReload != forceReloadLastState { - forceReloadLastState = 
forceReload - if forceReload == forceReloadStates[forceReloadStateIdx] { - // Set to the next state - forceReloadStateIdx++ - } - - // If we went through all states, then succeed - if forceReloadStateIdx == len(forceReloadStates) { - // If we went through all states - if configIdx == 1 { - return true - } - } - } - - return false - } - - require.Eventually(t, eventuallyCheck, 20*time.Second, 300*time.Millisecond, - "the expected changes on forceReload did not happen") -} - -type reloadableListMock struct { - mx sync.Mutex - configs []*reload.ConfigWithMeta - ReloadImpl func(configs []*reload.ConfigWithMeta) error -} - -func (r *reloadableListMock) Reload(configs []*reload.ConfigWithMeta) error { - r.mx.Lock() - defer r.mx.Unlock() - return r.ReloadImpl(configs) -} - -func (r *reloadableListMock) Configs() []*reload.ConfigWithMeta { - r.mx.Lock() - defer r.mx.Unlock() - return r.configs -} diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 65e240ec21c6..2238df2ebaa5 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "testing" "time" @@ -34,17 +35,17 @@ func TestManagerV2(t *testing.T) { inputs := &reloadableList{} r.MustRegisterInput(inputs) - configsSet := false - configsCleared := false - logLevelSet := false - fqdnEnabled := false - allStopped := false + configsSet := atomic.Bool{} + configsCleared := atomic.Bool{} + logLevelSet := atomic.Bool{} + fqdnEnabled := atomic.Bool{} + allStopped := atomic.Bool{} onObserved := func(observed *proto.CheckinObserved, currentIdx int) { if currentIdx == 1 { oCfg := output.Config() iCfgs := inputs.Configs() if oCfg != nil && len(iCfgs) == 3 { - configsSet = true + configsSet.Store(true) t.Logf("output and inputs configuration set") } } else if currentIdx == 2 { @@ -52,26 +53,26 @@ func TestManagerV2(t *testing.T) { iCfgs := inputs.Configs() if oCfg == 
nil || len(iCfgs) != 3 { // should not happen (config no longer set) - configsSet = false + configsSet.Store(false) t.Logf("output and inputs configuration cleared (should not happen)") } } else { oCfg := output.Config() iCfgs := inputs.Configs() if oCfg == nil && len(iCfgs) == 0 { - configsCleared = true + configsCleared.Store(true) } if len(observed.Units) == 0 { - allStopped = true + allStopped.Store(true) t.Logf("output and inputs configuration cleared (stopping)") } } if logp.GetLevel() == zapcore.DebugLevel { - logLevelSet = true + logLevelSet.Store(true) t.Logf("debug log level set") } - fqdnEnabled = features.FQDN() + fqdnEnabled.Store(features.FQDN()) t.Logf("FQDN feature flag set to %v", fqdnEnabled) } @@ -218,7 +219,7 @@ func TestManagerV2(t *testing.T) { defer m.Stop() require.Eventually(t, func() bool { - return configsSet && configsCleared && logLevelSet && fqdnEnabled && allStopped + return configsSet.Load() && configsCleared.Load() && logLevelSet.Load() && fqdnEnabled.Load() && allStopped.Load() }, 15*time.Second, 300*time.Millisecond) } @@ -242,7 +243,7 @@ func TestOutputError(t *testing.T) { } r.MustRegisterInput(inputs) - stateReached := false + stateReached := atomic.Bool{} units := []*proto.UnitExpected{ { Id: "output-unit", @@ -300,7 +301,7 @@ func TestOutputError(t *testing.T) { server := &mock.StubServerV2{ CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { if DoesStateMatch(observed, desiredState, 0) { - stateReached = true + stateReached.Store(true) } return &proto.CheckinExpected{ Units: units, @@ -345,7 +346,7 @@ func TestOutputError(t *testing.T) { defer m.Stop() require.Eventually(t, func() bool { - return stateReached + return stateReached.Load() }, 10*time.Second, 100*time.Millisecond, "desired state, output failed, was not reached") }