From c52c168761f7145ba277089525b7ee85e9239019 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Tue, 22 Oct 2024 08:57:34 -0400 Subject: [PATCH] Restore memory queue's internal event cleanup after a batch is vended (#41356) Fix https://github.com/elastic/beats/issues/41355, where event data in the memory queue was not being freed when event batches were acknowledged, but only gradually as the queue buffer was overwritten by later events. This gave the same effect as if all beat instances, even low-volume ones, were running with a full / saturated event queue. The root cause, found by @swiatekm, is [this PR](https://github.com/elastic/beats/pull/39584), an unrelated cleanup of old code that accidentally included one live call along with the deprecated ones. (There was an old `FreeEntries` hook in pipeline batches that was only used for deprecated shipper configs, but the cleanup also removed the `FreeEntries` call _inside_ the queue which was essential for releasing event memory.) (cherry picked from commit fdb912a8ace4245dd64076a47b174059dd656b49) --- CHANGELOG.next.asciidoc | 1 + libbeat/publisher/pipeline/ttl_batch.go | 1 + libbeat/publisher/pipeline/ttl_batch_test.go | 10 +++++ libbeat/publisher/queue/diskqueue/consumer.go | 3 ++ libbeat/publisher/queue/memqueue/broker.go | 9 +++++ .../publisher/queue/memqueue/queue_test.go | 38 +++++++++++++++++++ libbeat/publisher/queue/queue.go | 4 ++ 7 files changed, 66 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 654df60c69c..9cff19cc3f1 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -108,6 +108,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Support Elastic Agent control protocol chunking support {pull}37343[37343] - Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}[37816][37816] - Set timeout of 1 minute for FQDN requests {pull}37756[37756] +- Fix issue where old data could be saved in the memory queue after acknowledgment, increasing memory use {pull}41356[41356] *Auditbeat* diff --git a/libbeat/publisher/pipeline/ttl_batch.go b/libbeat/publisher/pipeline/ttl_batch.go index dab77fa5659..0ef4408b613 100644 --- a/libbeat/publisher/pipeline/ttl_batch.go +++ b/libbeat/publisher/pipeline/ttl_batch.go @@ -77,6 +77,7 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch { events = append(events, event) } } + original.FreeEntries() b := &ttlBatch{ done: original.Done, diff --git a/libbeat/publisher/pipeline/ttl_batch_test.go b/libbeat/publisher/pipeline/ttl_batch_test.go index 769ccc37c35..4c5207acbb0 100644 --- a/libbeat/publisher/pipeline/ttl_batch_test.go +++ b/libbeat/publisher/pipeline/ttl_batch_test.go @@ -112,6 +112,12 @@ func TestBatchCallsDoneAndFreesEvents(t *testing.T) { require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback") } +func TestNewBatchFreesEvents(t *testing.T) { + queueBatch := &mockQueueBatch{} + _ = newBatch(nil, queueBatch, 0) + assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch") +} + type mockQueueBatch struct { freeEntriesCalled int } @@ -127,6 +133,10 @@ func (b *mockQueueBatch) Entry(i int) queue.Entry { return fmt.Sprintf("event %v", i) } +func (b *mockQueueBatch) FreeEntries() { + b.freeEntriesCalled++ +} + type mockRetryer struct { batches []*ttlBatch } diff --git a/libbeat/publisher/queue/diskqueue/consumer.go b/libbeat/publisher/queue/diskqueue/consumer.go index 20e6648d927..a0e5e944df3 100644 --- a/libbeat/publisher/queue/diskqueue/consumer.go +++ b/libbeat/publisher/queue/diskqueue/consumer.go @@ -97,6 +97,9 @@ func (batch *diskQueueBatch) Entry(i int) queue.Entry { return batch.frames[i].event } +func (batch *diskQueueBatch) FreeEntries() { +} + func (batch *diskQueueBatch) Done() { batch.queue.acks.addFrames(batch.frames) } diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go index b617bae6110..3e3e47e502c 100644 --- a/libbeat/publisher/queue/memqueue/broker.go +++ b/libbeat/publisher/queue/memqueue/broker.go @@ -398,6 +398,15 @@ func (b *batch) Entry(i int) queue.Entry { return b.rawEntry(i).event } +func (b *batch) FreeEntries() { + // This signals that the event data has been copied out of the batch, and is + // safe to free from the queue buffer, so set all the event pointers to nil. + for i := 0; i < b.count; i++ { + index := (b.start + i) % len(b.queue.buf) + b.queue.buf[index].event = nil + } +} + func (b *batch) Done() { b.doneChan <- batchDoneMsg{} } diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go index 9cd209bbd51..168c923e598 100644 --- a/libbeat/publisher/queue/memqueue/queue_test.go +++ b/libbeat/publisher/queue/memqueue/queue_test.go @@ -262,3 +262,41 @@ func TestAdjustInputQueueSize(t *testing.T) { assert.Equal(t, int(float64(mainQueue)*maxInputQueueSizeRatio), AdjustInputQueueSize(mainQueue, mainQueue)) }) } + +func TestBatchFreeEntries(t *testing.T) { + const queueSize = 10 + const batchSize = 5 + // 1. Add 10 events to the queue, request two batches with 5 events each + // 2. Make sure the queue buffer has 10 non-nil events + // 3. Call FreeEntries on the second batch + // 4. Make sure only events 6-10 are nil + // 5. Call FreeEntries on the first batch + // 6. Make sure all events are nil + testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil) + producer := testQueue.Producer(queue.ProducerConfig{}) + for i := 0; i < queueSize; i++ { + _, ok := producer.Publish(i) + require.True(t, ok, "Queue publish must succeed") + } + batch1, err := testQueue.Get(batchSize) + require.NoError(t, err, "Queue read must succeed") + require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request") + batch2, err := testQueue.Get(batchSize) + require.NoError(t, err, "Queue read must succeed") + require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request") + // Slight concurrency subtlety: we check events are non-nil after the queue + // reads, since if we do it before we have no way to be sure the insert + // has been completed. + for i := 0; i < queueSize; i++ { + require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil") + } + batch2.FreeEntries() + for i := 0; i < batchSize; i++ { + require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i) + require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i) + } + batch1.FreeEntries() + for i := 0; i < queueSize; i++ { + require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches") + } +} diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go index 075d7ad66a4..983a835a069 100644 --- a/libbeat/publisher/queue/queue.go +++ b/libbeat/publisher/queue/queue.go @@ -112,6 +112,10 @@ type Batch interface { Count() int Entry(i int) Entry Done() + // Release internal references to the contained events if supported + // (the disk queue does not currently implement this). + // Entry() should not be used after this call. + FreeEntries() } // Outputs can provide an EncoderFactory to enable early encoding, in which