diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 9afe4ff5857..baa6ca3ec95 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -84,6 +84,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Skip HTTPJSON flakey test. {issue}34929[34929] {pull}35138[35138] - Fix ingest pipeline for panw module to parse url scheme correctly {pull}35757[35757] - Renamed an httpjson input metric to follow naming conventions. `httpjson_interval_pages_total` was renamed to `httpjson_interval_pages` because the `_total` suffix is reserved for counters. {issue}35933[35933] {pull}36169[36169] +- Fixed some race conditions in tests {pull}36185[36185] ==== Added diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index 5e3111a0e09..3332d549fa2 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -10,6 +10,7 @@ import ( "fmt" "os" "path/filepath" + "sync/atomic" "testing" "time" @@ -265,7 +266,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { "../../filebeat.test", ) - finalStateReached := false + finalStateReached := atomic.Bool{} var units = []*proto.UnitExpected{ { Id: "output-unit-borken", @@ -319,7 +320,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { // So we wait until the state matches the desired state CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { if management.DoesStateMatch(observed, units, 0) { - finalStateReached = true + finalStateReached.Store(true) } return &proto.CheckinExpected{ @@ -337,7 +338,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { ) require.Eventually(t, func() bool { - return finalStateReached + return finalStateReached.Load() }, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy") t.Cleanup(server.Stop) diff --git a/x-pack/libbeat/management/input_reload_test.go b/x-pack/libbeat/management/input_reload_test.go deleted file mode 100644 index 8d79f685da7..00000000000 --- a/x-pack/libbeat/management/input_reload_test.go +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package management - -import ( - "fmt" - "sync" - "testing" - "time" - - "github.com/joeshaw/multierror" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/reload" - "github.com/elastic/beats/v7/libbeat/tests/integration" - "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" -) - -func TestInputReload(t *testing.T) { - // Uncomment the line below to see the debug logs for this test - // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*", "centralmgmt.V2-manager")) - r := reload.NewRegistry() - - output := &reloadable{} - r.MustRegisterOutput(output) - - reloadCallCount := 0 - inputs := &reloadableListMock{ - ReloadImpl: func(configs []*reload.ConfigWithMeta) error { - reloadCallCount++ - if reloadCallCount == 1 { - e1 := multierror.Errors{fmt.Errorf("%w", &common.ErrInputNotFinished{ - State: "", - File: "/tmp/foo.log", - })} - return e1.Err() - } - - return nil - }, - } - r.MustRegisterInput(inputs) - - configIdx := -1 - onObserved := func(observed *proto.CheckinObserved, currentIdx int) { - configIdx = currentIdx - } - - srv := integration.NewMockServer([][]*proto.UnitExpected{ - { - { - Id: "output-unit", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "default", - Type: "elasticsearch", - Name: "elasticsearch", - }, - }, - { - Id: "input-unit-1", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_STARTING, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "log-input", - Type: "log", - Name: "log", - Streams: []*proto.Stream{ - { - Id: "log-input-1", - Source: requireNewStruct(t, map[string]interface{}{ - "paths": []interface{}{"/tmp/foo.log"}, - }), - }, - }, - }, - }, - }, - { - { - Id: "output-unit", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "default", - Type: "elasticsearch", - Name: "elasticsearch", - }, - }, - { - Id: "input-unit-1", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "log-input-2", - Type: "log", - Name: "log", - Streams: []*proto.Stream{ - { - Id: "log-input-2", - Source: requireNewStruct(t, map[string]interface{}{ - "paths": []interface{}{"/tmp/foo.log"}, - }), - }, - }, - }, - }, - }, - }, - []uint64{1, 1}, - []*proto.Features{ - nil, - nil, - }, - onObserved, - 500*time.Millisecond, - ) - require.NoError(t, srv.Start()) - defer srv.Stop() - - client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{ - Name: "program", - Version: "v1.0.0", - Meta: map[string]string{ - "key": "value", - }, - }, grpc.WithTransportCredentials(insecure.NewCredentials())) - - m, err := NewV2AgentManagerWithClient( - &Config{ - Enabled: true, - }, - r, - client, - WithChangeDebounce(300*time.Millisecond), - WithForceReloadDebounce(800*time.Millisecond), - ) - require.NoError(t, err) - - mm := m.(*BeatV2Manager) - - err = m.Start() - require.NoError(t, err) - defer m.Stop() - - forceReloadStates := []bool{false, true, false} - forceReloadStateIdx := 0 - forceReloadLastState := true // starts on true so the first iteration is already a change - - eventuallyCheck := func() bool { - forceReload := mm.forceReload - // That detects a state change, we only count/advance steps - // on state changes - if forceReload != forceReloadLastState { - forceReloadLastState = forceReload - if forceReload == forceReloadStates[forceReloadStateIdx] { - // Set to the next state - forceReloadStateIdx++ - } - - // If we went through all states, then succeed - if forceReloadStateIdx == len(forceReloadStates) { - // If we went through all states - if configIdx == 1 { - return true - } - } - } - - return false - } - - require.Eventually(t, eventuallyCheck, 20*time.Second, 300*time.Millisecond, - "the expected changes on forceReload did not happen") -} - -type reloadableListMock struct { - mx sync.Mutex - configs []*reload.ConfigWithMeta - ReloadImpl func(configs []*reload.ConfigWithMeta) error -} - -func (r *reloadableListMock) Reload(configs []*reload.ConfigWithMeta) error { - r.mx.Lock() - defer r.mx.Unlock() - return r.ReloadImpl(configs) -} - -func (r *reloadableListMock) Configs() []*reload.ConfigWithMeta { - r.mx.Lock() - defer r.mx.Unlock() - return r.configs -} diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 3aec77bac91..9fe238605b4 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -37,45 +37,45 @@ func TestManagerV2(t *testing.T) { inputs := &reloadableList{} r.MustRegisterInput(inputs) - configsSet := false - configsCleared := false - logLevelSet := false - fqdnEnabled := false - allStopped := false + configsSet := atomic.Bool{} + configsCleared := atomic.Bool{} + logLevelSet := atomic.Bool{} + fqdnEnabled := atomic.Bool{} + allStopped := atomic.Bool{} onObserved := func(observed *proto.CheckinObserved, currentIdx int) { if currentIdx == 1 { oCfg := output.Config() iCfgs := inputs.Configs() if oCfg != nil && len(iCfgs) == 3 { - configsSet = true - t.Logf("output and inputs configuration set") + configsSet.Store(true) + t.Log("output and inputs configuration set") } } else if currentIdx == 2 { oCfg := output.Config() iCfgs := inputs.Configs() if oCfg == nil || len(iCfgs) != 3 { // should not happen (config no longer set) - configsSet = false - t.Logf("output and inputs configuration cleared (should not happen)") + configsSet.Store(false) + t.Log("output and inputs configuration cleared (should not happen)") } } else { oCfg := output.Config() iCfgs := inputs.Configs() if oCfg == nil && len(iCfgs) == 0 { - configsCleared = true + configsCleared.Store(true) } if len(observed.Units) == 0 { - allStopped = true - t.Logf("output and inputs configuration cleared (stopping)") + allStopped.Store(true) + t.Log("output and inputs configuration cleared (stopping)") } } if logp.GetLevel() == zapcore.DebugLevel { - logLevelSet = true - t.Logf("debug log level set") + logLevelSet.Store(true) + t.Log("debug log level set") } - fqdnEnabled = features.FQDN() - t.Logf("FQDN feature flag set to %v", fqdnEnabled) + fqdnEnabled.Store(features.FQDN()) + t.Logf("FQDN feature flag set to %v", fqdnEnabled.Load()) } srv := integration.NewMockServer([][]*proto.UnitExpected{ @@ -221,7 +221,7 @@ func TestManagerV2(t *testing.T) { defer m.Stop() require.Eventually(t, func() bool { - return configsSet && configsCleared && logLevelSet && fqdnEnabled && allStopped + return configsSet.Load() && configsCleared.Load() && logLevelSet.Load() && fqdnEnabled.Load() && allStopped.Load() }, 15*time.Second, 300*time.Millisecond) } @@ -245,7 +245,7 @@ func TestOutputError(t *testing.T) { } r.MustRegisterInput(inputs) - stateReached := false + stateReached := atomic.Bool{} units := []*proto.UnitExpected{ { Id: "output-unit", @@ -303,7 +303,7 @@ func TestOutputError(t *testing.T) { server := &mock.StubServerV2{ CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { if DoesStateMatch(observed, desiredState, 0) { - stateReached = true + stateReached.Store(true) } return &proto.CheckinExpected{ Units: units, @@ -348,7 +348,7 @@ func TestOutputError(t *testing.T) { defer m.Stop() require.Eventually(t, func() bool { - return stateReached + return stateReached.Load() }, 10*time.Second, 100*time.Millisecond, "desired state, output failed, was not reached") }