diff --git a/.chloggen/fix-persistent-queue-deadlock.yaml b/.chloggen/fix-persistent-queue-deadlock.yaml new file mode 100644 index 00000000000..141a8ab9dbf --- /dev/null +++ b/.chloggen/fix-persistent-queue-deadlock.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'bug_fix' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: 'exporterqueue' + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fix a bug in the persistent queue where Offer can become deadlocked when the queue is almost full + +# One or more tracking issues or pull requests related to the change +issues: [11015] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/exporter/internal/queue/mock_storage.go b/exporter/internal/queue/mock_storage.go index 6ef9810529b..26e2ae994e0 100644 --- a/exporter/internal/queue/mock_storage.go +++ b/exporter/internal/queue/mock_storage.go @@ -10,6 +10,7 @@ import ( "sync" "sync/atomic" "syscall" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/extension/experimental/storage" @@ -20,22 +21,31 @@ type mockStorageExtension struct { component.ShutdownFunc st sync.Map getClientError error + executionDelay time.Duration } func (m *mockStorageExtension) GetClient(context.Context, component.Kind, component.ID, string) (storage.Client, error) { if m.getClientError != nil { return nil, m.getClientError } - return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}}, nil + return &mockStorageClient{st: &m.st, closed: &atomic.Bool{}, executionDelay: m.executionDelay}, nil } func NewMockStorageExtension(getClientError error) storage.Extension { - return &mockStorageExtension{getClientError: getClientError} + return NewMockStorageExtensionWithDelay(getClientError, 0) +} + +func NewMockStorageExtensionWithDelay(getClientError error, executionDelay time.Duration) storage.Extension { + return &mockStorageExtension{ + getClientError: getClientError, + executionDelay: executionDelay, + } } type mockStorageClient struct { - st *sync.Map - closed *atomic.Bool + st *sync.Map + closed *atomic.Bool + executionDelay time.Duration // simulate real storage client delay } func (m *mockStorageClient) Get(ctx context.Context, s string) ([]byte, error) { @@ -61,6 +71,9 @@ func (m *mockStorageClient) Batch(_ context.Context, ops ...storage.Operation) e if m.isClosed() { panic("client already closed") } + if m.executionDelay != 0 { + time.Sleep(m.executionDelay) + } for _, op := range ops { switch op.Type { case storage.Get: diff --git a/exporter/internal/queue/persistent_queue_test.go b/exporter/internal/queue/persistent_queue_test.go 
index f226e35c430..a6a7b8b974e 100644 --- a/exporter/internal/queue/persistent_queue_test.go +++ b/exporter/internal/queue/persistent_queue_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "strconv" + "sync" "sync/atomic" "syscall" "testing" @@ -531,6 +532,70 @@ func TestPersistentQueueStartWithNonDispatched(t *testing.T) { require.Equal(t, 6, newPs.Size()) } +func TestPersistentQueueStartWithNonDispatchedConcurrent(t *testing.T) { + req := newTracesRequest(1, 1) + + ext := NewMockStorageExtensionWithDelay(nil, 20*time.Nanosecond) + pq := createTestPersistentQueueWithItemsCapacity(t, ext, 25) + + proWg := sync.WaitGroup{} + // Sending small amount of data as windows test can't handle the test fast enough + for j := 0; j < 5; j++ { + proWg.Add(1) + go func() { + defer proWg.Done() + // Put in items up to capacity + for i := 0; i < 10; i++ { + for { + // retry infinitely so the exact amount of items are added to the queue eventually + if err := pq.Offer(context.Background(), req); err == nil { + break + } + time.Sleep(50 * time.Nanosecond) + } + } + }() + } + + conWg := sync.WaitGroup{} + for j := 0; j < 5; j++ { + conWg.Add(1) + go func() { + defer conWg.Done() + for i := 0; i < 10; i++ { + require.True(t, pq.Consume(func(context.Context, tracesRequest) error { return nil })) + } + }() + } + + conDone := make(chan struct{}) + go func() { + defer close(conDone) + conWg.Wait() + }() + + proDone := make(chan struct{}) + go func() { + defer close(proDone) + proWg.Wait() + }() + + doneCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + select { + case <-conDone: + case <-doneCtx.Done(): + assert.Fail(t, "timed out waiting for consumers to complete") + } + + select { + case <-proDone: + case <-doneCtx.Done(): + assert.Fail(t, "timed out waiting for producers to complete") + } + assert.Zero(t, pq.sizedChannel.Size()) +} + func TestPersistentQueue_PutCloseReadClose(t *testing.T) { req := newTracesRequest(5, 10) ext := 
NewMockStorageExtension(nil) diff --git a/exporter/internal/queue/sized_channel.go b/exporter/internal/queue/sized_channel.go index 1702a38ac2f..f322e58c01c 100644 --- a/exporter/internal/queue/sized_channel.go +++ b/exporter/internal/queue/sized_channel.go @@ -55,8 +55,16 @@ func (vcq *sizedChannel[T]) push(el T, size int64, callback func() error) error return err } } - vcq.ch <- el - return nil + + select { + // for persistent queue implementation, channel len can be out of sync with used size. Attempt to put it + // into the channel. If it is full, simply returns ErrQueueIsFull error. This prevents potential deadlock issues. + case vcq.ch <- el: + return nil + default: + vcq.used.Add(-size) + return ErrQueueIsFull + } } // pop removes the element from the queue and returns it. diff --git a/exporter/internal/queue/sized_channel_test.go b/exporter/internal/queue/sized_channel_test.go index 02cd4bf8e68..8d25510ff63 100644 --- a/exporter/internal/queue/sized_channel_test.go +++ b/exporter/internal/queue/sized_channel_test.go @@ -42,3 +42,13 @@ func TestSizedCapacityChannel(t *testing.T) { assert.False(t, ok) assert.Equal(t, 0, el) } + +func TestSizedCapacityChannel_Offer_sizedNotFullButChannelFull(t *testing.T) { + q := newSizedChannel[int](1, nil, 0) + assert.NoError(t, q.push(1, 1, nil)) + + q.used.Store(0) + err := q.push(1, 1, nil) + assert.Error(t, err) + assert.Equal(t, ErrQueueIsFull, err) +}