replace batchList implementation with FIFO helper
faec committed May 30, 2024
1 parent 3da6d32 commit ac94a2b
Showing 5 changed files with 68 additions and 141 deletions.
21 changes: 21 additions & 0 deletions libbeat/common/fifo/fifo.go
@@ -56,6 +56,27 @@ func (f *FIFO[T]) First() T {
return f.first.value
}

// Remove the first entry in this FIFO and return it.
func (f *FIFO[T]) ConsumeFirst() T {
result := f.First()
f.Remove()
return result
}

// Append another FIFO queue to an existing one. Takes ownership of
// the given FIFO's contents.
func (f *FIFO[T]) Concat(list FIFO[T]) {
if list.Empty() {
return
}
if f.Empty() {
*f = list
return
}
f.last.next = list.first
f.last = list.last
}

// Remove the first entry in the queue. Does nothing if the FIFO is empty.
func (f *FIFO[T]) Remove() {
if f.first != nil {
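
The memqueue changes below build on these helpers, so here is a minimal, self-contained sketch of how the FIFO behaves. Only First, ConsumeFirst, Concat, and part of Remove are visible in this hunk, so the node layout and the Add/Empty methods below are an approximation of the real libbeat/common/fifo package, not a copy of it.

package main

import "fmt"

// Approximate stand-in for libbeat/common/fifo, for illustration only.
type node[T any] struct {
	value T
	next  *node[T]
}

type FIFO[T any] struct {
	first *node[T]
	last  *node[T]
}

func (f *FIFO[T]) Add(value T) {
	n := &node[T]{value: value}
	if f.first == nil {
		f.first = n
	} else {
		f.last.next = n
	}
	f.last = n
}

func (f *FIFO[T]) Empty() bool { return f.first == nil }

// First returns the oldest entry without removing it.
func (f *FIFO[T]) First() T { return f.first.value }

// Remove drops the oldest entry; it does nothing if the FIFO is empty.
func (f *FIFO[T]) Remove() {
	if f.first != nil {
		f.first = f.first.next
		if f.first == nil {
			f.last = nil
		}
	}
}

// ConsumeFirst removes the oldest entry and returns it.
func (f *FIFO[T]) ConsumeFirst() T {
	result := f.First()
	f.Remove()
	return result
}

// Concat appends list's contents to f, taking ownership of them.
func (f *FIFO[T]) Concat(list FIFO[T]) {
	if list.Empty() {
		return
	}
	if f.Empty() {
		*f = list
		return
	}
	f.last.next = list.first
	f.last = list.last
}

func main() {
	var a, b FIFO[int]
	a.Add(1)
	a.Add(2)
	b.Add(3)
	a.Concat(b) // a now owns b's contents: 1, 2, 3

	for !a.Empty() {
		fmt.Println(a.ConsumeFirst()) // prints 1, 2, 3 in FIFO order
	}
}

Concat's take-ownership rule is the one the ackLoop relies on when it absorbs chanList from consumedChan.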
65 changes: 31 additions & 34 deletions libbeat/publisher/queue/memqueue/ackloop.go
@@ -37,7 +37,10 @@ func newACKLoop(broker *broker) *ackLoop {
func (l *ackLoop) run() {
b := l.broker
for {
nextBatchChan := l.pendingBatches.nextBatchChannel()
var nextBatchChan chan batchDoneMsg
if !l.pendingBatches.Empty() {
nextBatchChan = l.pendingBatches.First().doneChan
}

select {
case <-b.ctx.Done():
@@ -46,7 +49,7 @@ func (l *ackLoop) run() {

case chanList := <-b.consumedChan:
// New batches have been generated, add them to the pending list
l.pendingBatches.concat(&chanList)
l.pendingBatches.Concat(chanList)

case <-nextBatchChan:
// The oldest outstanding batch has been acknowledged, advance our
@@ -58,43 +61,30 @@ func (l *ackLoop) run() {

// handleBatchSig collects and handles a batch ACK/Cancel signal. handleBatchSig
// is run by the ackLoop.
func (l *ackLoop) handleBatchSig() int {
func (l *ackLoop) handleBatchSig() {
ackedBatches := l.collectAcked()

count := 0
for batch := ackedBatches.front(); batch != nil; batch = batch.next {
count += batch.count
}

if count > 0 {
if !ackedBatches.Empty() {
// report acks to waiting clients
l.processACK(ackedBatches, count)
}

for !ackedBatches.empty() {
// Release finished batch structs into the shared memory pool
releaseBatch(ackedBatches.pop())
l.processACK(ackedBatches)
}

// return final ACK to EventLoop, in order to clean up internal buffer
l.broker.logger.Debug("ackloop: return ack to broker loop:", count)

l.broker.logger.Debug("ackloop: done send ack")
return count
}

func (l *ackLoop) collectAcked() batchList {
ackedBatches := batchList{}

acks := l.pendingBatches.pop()
ackedBatches.append(acks)
// The first batch is always included, since that's what triggered the call
// to collectAcked.
nextBatch := l.pendingBatches.ConsumeFirst()
ackedBatches.Add(nextBatch)

done := false
for !l.pendingBatches.empty() && !done {
acks := l.pendingBatches.front()
for !l.pendingBatches.Empty() && !done {
nextBatch = l.pendingBatches.First()
select {
case <-acks.doneChan:
ackedBatches.append(l.pendingBatches.pop())
case <-nextBatch.doneChan:
ackedBatches.Add(nextBatch)
l.pendingBatches.Remove()

default:
done = true
@@ -107,16 +97,22 @@ func (l *ackLoop) collectAcked() batchList {
// Called by ackLoop. This function exists to decouple the work of collecting
// and running producer callbacks from logical deletion of the events, so
// input callbacks can't block the queue by occupying the runLoop goroutine.
func (l *ackLoop) processACK(lst batchList, N int) {
func (l *ackLoop) processACK(lst batchList) {
ackCallbacks := []func(){}
batches := []batch{}
for !lst.Empty() {
batches = append(batches, lst.First())
lst.Remove()
}
// First we traverse the entries we're about to remove, collecting any callbacks
// we need to run.
lst.reverse()
for !lst.empty() {
batch := lst.pop()
// Traverse entries from last to first, so we can acknowledge the most recent
// ones first and skip repeated producer callbacks.
eventCount := 0
for batchIndex := len(batches) - 1; batchIndex >= 0; batchIndex-- {
batch := batches[batchIndex]
eventCount += batch.count

// Traverse entries from last to first, so we can acknowledge the most recent
// ones first and skip subsequent producer callbacks.
for i := batch.count - 1; i >= 0; i-- {
entry := batch.entry(i)
if entry.producer == nil {
@@ -136,7 +132,8 @@ func (l *ackLoop) processACK(lst batchList, N int) {
}
}
// Signal runLoop to delete the events
l.broker.deleteChan <- N
l.broker.deleteChan <- eventCount
l.broker.logger.Debug("ackloop: return ack to broker loop:", eventCount)

// The events have been removed; notify their listeners.
for _, f := range ackCallbacks {
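
Two stock Go patterns carry the rewritten run() and collectAcked(): a receive from a nil channel blocks forever, so leaving nextBatchChan nil when pendingBatches is empty disables that select case, and a select with a default clause lets collectAcked drain only the batches whose doneChan has already fired. A standalone sketch of both follows, using illustrative types rather than the real memqueue ones; note the real run() select has no default and also waits on the broker context and consumedChan.

package main

import "fmt"

type batchDoneMsg struct{}

type fakeBatch struct {
	doneChan chan batchDoneMsg
}

func main() {
	var pending []fakeBatch

	// Pattern 1: a receive from a nil channel blocks forever, so leaving the
	// channel nil when nothing is pending disables the case without extra branching.
	var nextBatchChan chan batchDoneMsg
	if len(pending) > 0 {
		nextBatchChan = pending[0].doneChan
	}
	select {
	case <-nextBatchChan:
		fmt.Println("oldest batch acknowledged")
	default:
		fmt.Println("no pending batches, case is disabled") // this branch runs
	}

	// Pattern 2: non-blocking drain of every batch that is already done,
	// stopping at the first one still outstanding (the shape of collectAcked,
	// not its exact code).
	pending = []fakeBatch{
		{doneChan: make(chan batchDoneMsg, 1)},
		{doneChan: make(chan batchDoneMsg, 1)},
		{doneChan: make(chan batchDoneMsg, 1)},
	}
	pending[0].doneChan <- batchDoneMsg{} // only the first two have been acknowledged
	pending[1].doneChan <- batchDoneMsg{}

	acked := 0
	done := false
	for len(pending) > 0 && !done {
		select {
		case <-pending[0].doneChan:
			acked++
			pending = pending[1:]
		default:
			done = true // oldest remaining batch is not acknowledged yet
		}
	}
	fmt.Println("collected acked batches:", acked) // prints 2
}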
117 changes: 13 additions & 104 deletions libbeat/publisher/queue/memqueue/broker.go
@@ -20,9 +20,9 @@ package memqueue
import (
"context"
"io"
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common/fifo"
"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/logp"
)
@@ -117,9 +117,6 @@ type batch struct {
// to be valid).
queueBuf circularBuffer

// Next batch in the containing batchList
next *batch

// Position of the batch's events within the queue. This is an absolute
// index over the lifetime of the queue, to get the position within the
// queue's current circular buffer, use (start % len(queue.buf)).
@@ -133,10 +130,7 @@ type batch struct {
doneChan chan batchDoneMsg
}

type batchList struct {
head *batch
tail *batch
}
type batchList = fifo.FIFO[batch]

// FactoryForSettings is a simple wrapper around NewQueue so a concrete
// Settings object can be wrapped in a queue-agnostic interface for
@@ -260,7 +254,7 @@ func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer {
}

func (b *broker) Get(count int, bytes int) (queue.Batch, error) {
responseChan := make(chan *batch, 1)
responseChan := make(chan batch, 1)
select {
case <-b.ctx.Done():
return nil, io.EOF
@@ -277,93 +271,12 @@ func (b *broker) useByteLimits() bool {
return b.settings.Bytes > 0
}

var batchPool = sync.Pool{
New: func() interface{} {
return &batch{
doneChan: make(chan batchDoneMsg, 1),
}
},
}

func newBatch(queueBuf circularBuffer, start entryIndex, count int) *batch {
batch := batchPool.Get().(*batch)
batch.next = nil
batch.queueBuf = queueBuf
batch.start = start
batch.count = count
return batch
}

func releaseBatch(b *batch) {
b.next = nil
batchPool.Put(b)
}

func (l *batchList) prepend(b *batch) {
b.next = l.head
l.head = b
if l.tail == nil {
l.tail = b
}
}

func (l *batchList) concat(other *batchList) {
if other.head == nil {
return
}

if l.head == nil {
*l = *other
return
}

l.tail.next = other.head
l.tail = other.tail
}

func (l *batchList) append(b *batch) {
if l.head == nil {
l.head = b
} else {
l.tail.next = b
}
l.tail = b
}

func (l *batchList) empty() bool {
return l.head == nil
}

func (l *batchList) front() *batch {
return l.head
}

func (l *batchList) nextBatchChannel() chan batchDoneMsg {
if l.head == nil {
return nil
}
return l.head.doneChan
}

func (l *batchList) pop() *batch {
ch := l.head
if ch != nil {
l.head = ch.next
if l.head == nil {
l.tail = nil
}
}

ch.next = nil
return ch
}

func (l *batchList) reverse() {
tmp := *l
*l = batchList{}

for !tmp.empty() {
l.prepend(tmp.pop())
func newBatch(queueBuf circularBuffer, start entryIndex, count int) batch {
return batch{
doneChan: make(chan batchDoneMsg, 1),
queueBuf: queueBuf,
start: start,
count: count,
}
}

@@ -379,25 +292,21 @@ func AdjustInputQueueSize(requested, mainQueueSize int) (actual int) {
return actual
}

func (b *batch) Count() int {
func (b batch) Count() int {
return b.count
}

func (ei entryIndex) inBuffer(buf []queueEntry) *queueEntry {
return &buf[int(ei)%len(buf)]
}

// Return a pointer to the queueEntry for the i-th element of this batch
func (b *batch) entry(i int) *queueEntry {
func (b batch) entry(i int) *queueEntry {
entryIndex := b.start.plus(i)
return b.queueBuf.entry(entryIndex)
}

// Return the event referenced by the i-th element of this batch
func (b *batch) Entry(i int) queue.Entry {
func (b batch) Entry(i int) queue.Entry {
return b.entry(i).event
}

func (b *batch) Done() {
func (b batch) Done() {
b.doneChan <- batchDoneMsg{}
}
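
With batchList now an alias for fifo.FIFO[batch], the hand-rolled linked list, the sync.Pool, and the per-batch next pointer all go away, and batches move through channels by value. That works because the only state a batch shares with other holders is its doneChan, and channels are reference types, so a value receiver still signals the original channel. A rough sketch with illustrative names, not the real memqueue types:

package main

import "fmt"

type doneMsg struct{}

// Illustrative stand-in for the memqueue batch: a small value type whose
// only shared state is a channel, which is itself a reference type.
type batch struct {
	start, count int
	doneChan     chan doneMsg
}

// Value receiver: the method operates on a copy of the struct, but the
// copy's doneChan still refers to the same underlying channel.
func (b batch) Done() {
	b.doneChan <- doneMsg{}
}

func main() {
	b := batch{start: 0, count: 3, doneChan: make(chan doneMsg, 1)}

	responseChan := make(chan batch, 1) // batches travel by value, no pool needed
	responseChan <- b

	received := <-responseChan // a copy of b
	received.Done()            // signals the channel b was created with

	<-b.doneChan // the original sees the acknowledgement
	fmt.Println("batch of", b.count, "events acknowledged")
}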
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/memqueue/internal_api.go
@@ -53,7 +53,7 @@ type getRequest struct {
byteCount int

// The channel to send the new batch to.
responseChan chan *batch
responseChan chan batch
}

type batchDoneMsg struct{}
4 changes: 2 additions & 2 deletions libbeat/publisher/queue/memqueue/runloop.go
@@ -134,7 +134,7 @@ func (l *runLoop) runIteration() {
var consumedChan chan batchList
// Enable sending to the scheduled ACKs channel if we have
// something to send.
if !l.consumedBatches.empty() {
if !l.consumedBatches.Empty() {
consumedChan = l.broker.consumedChan
}

@@ -244,7 +244,7 @@ func (l *runLoop) handleGetReply(req *getRequest) {

// Send the batch to the caller and update internal state
req.responseChan <- batch
l.consumedBatches.append(batch)
l.consumedBatches.Add(batch)
l.consumedEventCount += batchEntryCount
l.consumedByteCount += batchByteCount
l.broker.observer.ConsumeEvents(batchEntryCount, batchByteCount)
