Skip to content

Commit

Permalink
clean up FIFO handling
Browse files Browse the repository at this point in the history
  • Loading branch information
faec committed May 30, 2024
1 parent 24c5564 commit 3da6d32
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 33 deletions.
10 changes: 6 additions & 4 deletions libbeat/common/fifo/fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ func (f *FIFO[T]) Empty() bool {
return f.first == nil
}

// Return the first value (if present) without removing it from the queue
func (f *FIFO[T]) First() (T, error) {
// Return the first value (if present) without removing it from the queue.
// Returns a default value if the queue is empty. To recognize this case,
// check (*FIFO).Empty().
func (f *FIFO[T]) First() T {
if f.first == nil {
var none T
return none, errFIFOEmpty
return none
}
return f.first.value, nil
return f.first.value
}

// Remove the first entry in the queue. Does nothing if the FIFO is empty.
Expand Down
24 changes: 0 additions & 24 deletions libbeat/publisher/queue/memqueue/circular_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,6 @@

package memqueue

// the queue's underlying array buffer needs to coordinate concurrent
// access by:
//
// runLoop
// - when a pushRequest is accepted, writes to the newly created entry index.
// - when a producer is cancelled, reads and writes to entry indices that
// have been created but not yet consumed, to discard events from that
// producer.
// - when entries are deleted (after consumed events have been
// acknowledged), reads from the deleted entry indices.
// - when a pushRequest requires resizing of the array, expands and/or
// replaces the buffer.
//
// the queue's consumer (in a live Beat this means queueReader in
// libbeat/publisher/pipeline/queue_reader.go) which reads from entry
// indices that have been consumed but not deleted via (*batch).Entry().
//
// ackLoop, which reads producer metadata from acknowledged entry
// indices before they are deleted so acknowledgment callbacks can be
// invoked.
//
// Most of these are not in conflict since they access disjoint array indices.
// The exception is growing the circular buffer, which conflicts with read
// access from batches of consumed events.
type circularBuffer struct {
// Do not access this array directly! use (circularBuffer).entry().
_entries []queueEntry
Expand Down
7 changes: 2 additions & 5 deletions libbeat/publisher/queue/memqueue/runloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,16 +306,13 @@ func (l *runLoop) canFitPushRequest(req pushRequest) bool {
}

func (l *runLoop) maybeUnblockPushRequests() {
req, err := l.pendingPushRequests.First()
for err == nil {
for !l.pendingPushRequests.Empty() {
req := l.pendingPushRequests.First()
if !l.canFitPushRequest(req) {
break
}
l.doInsert(req)
l.pendingPushRequests.Remove()

// Fetch the next request
req, err = l.pendingPushRequests.First()
}
}

Expand Down

0 comments on commit 3da6d32

Please sign in to comment.