diff --git a/x-pack/filebeat/fbreceiver/receiver_test.go b/x-pack/filebeat/fbreceiver/receiver_test.go index 14365f854691..2da0fdaf81cc 100644 --- a/x-pack/filebeat/fbreceiver/receiver_test.go +++ b/x-pack/filebeat/fbreceiver/receiver_test.go @@ -40,6 +40,7 @@ func TestNewReceiver(t *testing.T) { "path.home": t.TempDir(), }, } + var zapLogs bytes.Buffer core := zapcore.NewCore( zapcore.NewJSONEncoder(zap.NewProductionEncoderConfig()), @@ -61,8 +62,27 @@ func TestNewReceiver(t *testing.T) { err = r.Start(context.Background(), nil) assert.NoError(t, err, "Error starting filebeatreceiver") - assert.Eventuallyf(t, func() bool { return countLogs > 0 }, 60*time.Second, 1*time.Second, "consumed logs didn't increase\nCount: %d\nLogs: %s\n", countLogs, zapLogs.String()) + ch := make(chan bool, 1) + timer := time.NewTimer(120 * time.Second) + defer timer.Stop() + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + for tick := ticker.C; ; { + select { + case <-timer.C: + t.Fatalf("consumed logs didn't increase\nCount: %d\nLogs: %s\n", countLogs, zapLogs.String()) + case <-tick: + tick = nil + go func() { ch <- countLogs > 0 }() + case v := <-ch: + if v { + goto found + } + tick = ticker.C + } + } +found: err = r.Shutdown(context.Background()) assert.NoError(t, err, "Error shutting down filebeatreceiver") }