Skip to content

Commit

Permalink
Remove references to Batch.FreeEntries
Browse files Browse the repository at this point in the history
Remove all references, usages and tests to `Batch.FreeEntries` that
was used only by the shipper.
  • Loading branch information
belimawr committed May 17, 2024
1 parent 9cabaf0 commit ef19518
Show file tree
Hide file tree
Showing 9 changed files with 2 additions and 85 deletions.
5 changes: 2 additions & 3 deletions libbeat/outputs/elasticsearch/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ func (bm *batchMock) ACK() {
func (bm *batchMock) Drop() {
bm.drop = true
}
func (bm *batchMock) Retry() { panic("unimplemented") }
func (bm *batchMock) Cancelled() { panic("unimplemented") }
func (bm *batchMock) FreeEntries() {}
func (bm *batchMock) Retry() { panic("unimplemented") }
func (bm *batchMock) Cancelled() { panic("unimplemented") }
func (bm *batchMock) SplitRetry() bool {
if bm.canSplit {
bm.didSplit = true
Expand Down
2 changes: 0 additions & 2 deletions libbeat/outputs/outest/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ func (b *Batch) SplitRetry() bool {
return len(b.events) > 1
}

func (b *Batch) FreeEntries() {}

func (b *Batch) Cancelled() {
b.doSignal(BatchSignal{Tag: BatchCancelled})
}
Expand Down
11 changes: 0 additions & 11 deletions libbeat/publisher/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,6 @@ type Batch interface {
// batch.Drop() if necessary).
SplitRetry() bool

// Release the internal pointer to this batch's events but do not yet
// acknowledge this batch. This exists specifically for the shipper output,
// where there is potentially a long gap between when events are handed off
// to the shipper and when they are acknowledged upstream; during that time,
// we need to preserve batch metadata for producer end-to-end acknowledgments,
// but we do not need the events themselves since they are already queued by
// the shipper. It is only guaranteed to release event pointers when using the
// proxy queue.
// Never call this on a batch that might be retried.
FreeEntries()

// Send was aborted, try again but don't decrease the batch's TTL counter.
Cancelled()
}
Expand Down
5 changes: 0 additions & 5 deletions libbeat/publisher/pipeline/ttl_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ func newBatch(retryer retryer, original queue.Batch, ttl int) *ttlBatch {
events = append(events, event)
}
}
original.FreeEntries()

b := &ttlBatch{
done: original.Done,
Expand Down Expand Up @@ -166,10 +165,6 @@ func (b *ttlBatch) RetryEvents(events []publisher.Event) {
b.Retry()
}

func (b *ttlBatch) FreeEntries() {
b.events = nil
}

// reduceTTL reduces the time to live for all events that have no 'guaranteed'
// sending requirements. reduceTTL returns true if the batch is still alive.
func (b *ttlBatch) reduceTTL() bool {
Expand Down
10 changes: 0 additions & 10 deletions libbeat/publisher/pipeline/ttl_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,6 @@ func TestBatchCallsDoneAndFreesEvents(t *testing.T) {
require.True(t, doneCalled, "Calling batch.Drop should invoke the done callback")
}

func TestNewBatchFreesEvents(t *testing.T) {
queueBatch := &mockQueueBatch{}
_ = newBatch(nil, queueBatch, 0)
assert.Equal(t, 1, queueBatch.freeEntriesCalled, "Creating a new ttlBatch should call FreeEntries on the underlying queue.Batch")
}

type mockQueueBatch struct {
freeEntriesCalled int
}
Expand All @@ -133,10 +127,6 @@ func (b *mockQueueBatch) Entry(i int) queue.Entry {
return fmt.Sprintf("event %v", i)
}

func (b *mockQueueBatch) FreeEntries() {
b.freeEntriesCalled++
}

type mockRetryer struct {
batches []*ttlBatch
}
Expand Down
3 changes: 0 additions & 3 deletions libbeat/publisher/queue/diskqueue/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,6 @@ func (batch *diskQueueBatch) Entry(i int) queue.Entry {
return batch.frames[i].event
}

func (batch *diskQueueBatch) FreeEntries() {
}

func (batch *diskQueueBatch) Done() {
batch.queue.acks.addFrames(batch.frames)
}
9 changes: 0 additions & 9 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,15 +417,6 @@ func (b *batch) Entry(i int) queue.Entry {
return b.rawEntry(i).event
}

func (b *batch) FreeEntries() {
// This signals that the event data has been copied out of the batch, and is
// safe to free from the queue buffer, so set all the event pointers to nil.
for i := 0; i < b.count; i++ {
index := (b.start + i) % len(b.queue.buf)
b.queue.buf[index].event = nil
}
}

func (b *batch) Done() {
b.doneChan <- batchDoneMsg{}
}
38 changes: 0 additions & 38 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,44 +438,6 @@ func TestEntryIDs(t *testing.T) {
})
}

func TestBatchFreeEntries(t *testing.T) {
const queueSize = 10
const batchSize = 5
// 1. Add 10 events to the queue, request two batches with 5 events each
// 2. Make sure the queue buffer has 10 non-nil events
// 3. Call FreeEntries on the second batch
// 4. Make sure only events 6-10 are nil
// 5. Call FreeEntries on the first batch
// 6. Make sure all events are nil
testQueue := NewQueue(nil, nil, Settings{Events: queueSize, MaxGetRequest: batchSize, FlushTimeout: time.Second}, 0, nil)
producer := testQueue.Producer(queue.ProducerConfig{})
for i := 0; i < queueSize; i++ {
_, ok := producer.Publish(i)
require.True(t, ok, "Queue publish must succeed")
}
batch1, err := testQueue.Get(batchSize)
require.NoError(t, err, "Queue read must succeed")
require.Equal(t, batchSize, batch1.Count(), "Returned batch size must match request")
batch2, err := testQueue.Get(batchSize)
require.NoError(t, err, "Queue read must succeed")
require.Equal(t, batchSize, batch2.Count(), "Returned batch size must match request")
// Slight concurrency subtlety: we check events are non-nil after the queue
// reads, since if we do it before we have no way to be sure the insert
// has been completed.
for i := 0; i < queueSize; i++ {
require.NotNil(t, testQueue.buf[i].event, "All queue events must be non-nil")
}
batch2.FreeEntries()
for i := 0; i < batchSize; i++ {
require.NotNilf(t, testQueue.buf[i].event, "Queue index %v: batch 1's events should be unaffected by calling FreeEntries on Batch 2", i)
require.Nilf(t, testQueue.buf[batchSize+i].event, "Queue index %v: batch 2's events should be nil after FreeEntries", batchSize+i)
}
batch1.FreeEntries()
for i := 0; i < queueSize; i++ {
require.Nilf(t, testQueue.buf[i].event, "Queue index %v: all events should be nil after calling FreeEntries on both batches")
}
}

// producerACKWaiter is a helper that can listen to queue producer callbacks
// and wait on them from the test thread, so we can test the queue's asynchronous
// behavior without relying on time.Sleep.
Expand Down
4 changes: 0 additions & 4 deletions libbeat/publisher/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,6 @@ type Producer interface {
type Batch interface {
Count() int
Entry(i int) Entry
// Release the internal references to the contained events, if
// supported (the disk queue does not yet implement it).
// Count() and Entry() cannot be used after this call.
FreeEntries()
Done()
}

Expand Down

0 comments on commit ef19518

Please sign in to comment.