From 9c382d94db7b469dddd8bcada2ea95b386800bcb Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Fri, 8 Nov 2024 08:40:31 -0800 Subject: [PATCH] Input operator stop must not crash even if start was not called (#36262) --- pkg/stanza/operator/input/windows/input.go | 16 +++++++++++----- pkg/stanza/operator/input/windows/input_test.go | 6 ++++++ 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/stanza/operator/input/windows/input.go b/pkg/stanza/operator/input/windows/input.go index 76d79bbfa12e..5ad0db5c4fb9 100644 --- a/pkg/stanza/operator/input/windows/input.go +++ b/pkg/stanza/operator/input/windows/input.go @@ -154,22 +154,28 @@ func (i *Input) Start(persister operator.Persister) error { // Stop will stop reading events from a subscription. func (i *Input) Stop() error { - i.cancel() + // Warning: all calls made below must be safe to be done even if Start() was not called or failed. + + if i.cancel != nil { + i.cancel() + } + i.wg.Wait() + var errs error if err := i.subscription.Close(); err != nil { - return fmt.Errorf("failed to close subscription: %w", err) + errs = errors.Join(errs, fmt.Errorf("failed to close subscription: %w", err)) } if err := i.bookmark.Close(); err != nil { - return fmt.Errorf("failed to close bookmark: %w", err) + errs = errors.Join(errs, fmt.Errorf("failed to close bookmark: %w", err)) } if err := i.publisherCache.evictAll(); err != nil { - return fmt.Errorf("failed to close publishers: %w", err) + errs = errors.Join(errs, fmt.Errorf("failed to close publishers: %w", err)) } - return i.stopRemoteSession() + return errors.Join(errs, i.stopRemoteSession()) } // readOnInterval will read events with respect to the polling interval until it reaches the end of the channel. diff --git a/pkg/stanza/operator/input/windows/input_test.go b/pkg/stanza/operator/input/windows/input_test.go index 6d530f89bca1..462dad9a499a 100644 --- a/pkg/stanza/operator/input/windows/input_test.go +++ b/pkg/stanza/operator/input/windows/input_test.go @@ -24,6 +24,12 @@ func newTestInput() *Input { }) } +// TestInputCreate_Stop ensures the input correctly shuts down even if it was never started. +func TestInputCreate_Stop(t *testing.T) { + input := newTestInput() + assert.NoError(t, input.Stop()) +} + // TestInputStart_LocalSubscriptionError ensures the input correctly handles local subscription errors. func TestInputStart_LocalSubscriptionError(t *testing.T) { persister := testutil.NewMockPersister("")