move queue draining logic into the queue
faec committed May 30, 2024
1 parent e70c13b commit e6dbb2d
Showing 9 changed files with 108 additions and 70 deletions.
71 changes: 47 additions & 24 deletions libbeat/publisher/pipeline/controller.go
@@ -19,6 +19,7 @@ package pipeline

import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/reload"
@@ -53,10 +54,15 @@ type outputController struct {
// is called.
queueFactory queue.QueueFactory

// consumer is a helper goroutine that reads event batches from the queue
// and sends them to workerChan for an output worker to process.
consumer *eventConsumer

// Each worker is a goroutine that will read batches from workerChan and
// send them to the output.
workers []outputWorker
workerChan chan publisher.Batch

consumer *eventConsumer
workers []outputWorker
// The InputQueueSize can be set when the Beat is started, in
// libbeat/cmd/instance/Settings we need to preserve that
// value and pass it into the queue factory. The queue
@@ -96,35 +102,26 @@ func newOutputController(
return controller, nil
}

func (c *outputController) Close() error {
func (c *outputController) WaitClose(timeout time.Duration) error {
// First: signal the queue that we're shutting down, and wait up to the
// given duration for it to drain and process ACKs.
c.closeQueue(timeout)

// We've drained the queue as much as we can, signal eventConsumer to
// close, and wait for it to finish. After consumer.close returns,
// there will be no more writes to c.workerChan, so it is safe to close.
c.consumer.close()
close(c.workerChan)

// Signal the output workers to close. This step is a hint, and carries
// no guarantees. For example, on close the Elasticsearch output workers
// will close idle connections, but will not change any behavior for
// active connections, giving any remaining events a chance to ingest
// before we terminate.
for _, out := range c.workers {
out.Close()
}

// Closing the queue stops ACKs from propagating, so we close everything
// else first to give it a chance to wait for any outstanding events to be
// acknowledged.
c.queueLock.Lock()
if c.queue != nil {
c.queue.Close()
}
for _, req := range c.pendingRequests {
// We can only end up here if there was an attempt to connect to the
// pipeline but it was shut down before any output was set.
// In this case, return nil and Pipeline.ConnectWith will pass on a
// real error to the caller.
// NOTE: under the current shutdown process, Pipeline.Close (and hence
// outputController.Close) is ~never called. So even if we did have
// blocked callers here, in a real shutdown they will never be woken
// up. But in hopes of a day when the shutdown process is more robust,
// I've decided to do the right thing here anyway.
req.responseChan <- nil
}
c.queueLock.Unlock()

return nil
}

@@ -195,6 +192,32 @@ func (c *outputController) Reload(
return nil
}

// Close the queue, waiting up to the specified timeout for pending events
// to complete.
func (c *outputController) closeQueue(timeout time.Duration) {
c.queueLock.Lock()
defer c.queueLock.Unlock()
if c.queue != nil {
c.queue.Close()
select {
case <-c.queue.Done():
case <-time.After(timeout):
}
}
for _, req := range c.pendingRequests {
// We can only end up here if there was an attempt to connect to the
// pipeline but it was shut down before any output was set.
// In this case, return nil and Pipeline.ConnectWith will pass on a
// real error to the caller.
// NOTE: under the current shutdown process, Pipeline.Close (and hence
// outputController.Close) is ~never called. So even if we did have
// blocked callers here, in a real shutdown they will never be woken
// up. But in hopes of a day when the shutdown process is more robust,
// I've decided to do the right thing here anyway.
req.responseChan <- nil
}
}

// queueProducer creates a queue producer with the given config, blocking
// until the queue is created if it does not yet exist.
func (c *outputController) queueProducer(config queue.ProducerConfig) queue.Producer {
7 changes: 1 addition & 6 deletions libbeat/publisher/pipeline/pipeline.go
@@ -166,13 +166,8 @@ func (p *Pipeline) Close() error {

log.Debug("close pipeline")

if p.waitCloseTimeout > 0 {
// TODO (hi fae): delay up to the specified timeout waiting for the queue
// to empty.
}

// Note: active clients are not closed / disconnected.
p.outputController.Close()
p.outputController.WaitClose(p.waitCloseTimeout)

p.observer.cleanup()
return nil
4 changes: 4 additions & 0 deletions libbeat/publisher/pipeline/pipeline_test.go
@@ -132,6 +132,10 @@ func (q *testQueue) Close() error {
return nil
}

func (q *testQueue) Done() <-chan struct{} {
return nil
}

func (q *testQueue) QueueType() string {
return "test"
}
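A minimal sketch (an aside, not part of this commit) of the Go behavior the stub above relies on: a receive from a nil channel blocks forever, so a select that also has a timeout case, like the one in closeQueue in controller.go above, always falls through to the timeout when Done() returns nil.

package main

import (
	"fmt"
	"time"
)

func main() {
	var done <-chan struct{} // nil, like the channel returned by testQueue.Done()

	select {
	case <-done: // a nil channel never becomes ready
		fmt.Println("queue drained")
	case <-time.After(10 * time.Millisecond):
		fmt.Println("timed out waiting for the queue") // always taken here
	}
}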
2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/core_loop.go
@@ -47,7 +47,7 @@ func (dq *diskQueue) run() {
// After receiving new ACKs, a segment might be ready to delete.
dq.maybeDeleteACKed()

case <-dq.done:
case <-dq.close:
dq.handleShutdown()
return

2 changes: 1 addition & 1 deletion libbeat/publisher/queue/diskqueue/producer.go
@@ -87,7 +87,7 @@ func (producer *diskQueueProducer) publish(
// blocking the core loop.
response := <-request.responseChan
return response
case <-producer.queue.done:
case <-producer.queue.close:
return false
case <-producer.done:
return false
46 changes: 17 additions & 29 deletions libbeat/publisher/queue/diskqueue/queue.go
@@ -21,7 +21,6 @@ import (
"errors"
"fmt"
"os"
"sync"

"github.com/elastic/beats/v7/libbeat/publisher/queue"
"github.com/elastic/elastic-agent-libs/logp"
@@ -49,10 +48,6 @@ type diskQueue struct {
writerLoop *writerLoop
deleterLoop *deleterLoop

// Wait group for shutdown of the goroutines associated with this queue:
// reader loop, writer loop, deleter loop, and core loop (diskQueue.run()).
waitGroup sync.WaitGroup

// writing is true if the writer loop is processing a request, false
// otherwise.
writing bool
@@ -84,7 +79,12 @@ type diskQueue struct {
// waiting for free space in the queue.
blockedProducers []producerWriteRequest

// The channel to signal our goroutines to shut down.
// The channel to signal our goroutines to shut down, used by
// (*diskQueue).Close.
close chan struct{}

// The channel to report that shutdown is finished, used by
// (*diskQueue).Done.
done chan struct{}
}

@@ -228,30 +228,15 @@ func NewQueue(

producerWriteRequestChan: make(chan producerWriteRequest),

done: make(chan struct{}),
close: make(chan struct{}),
done: make(chan struct{}),
}

// We wait for four goroutines on shutdown: core loop, reader loop,
// writer loop, deleter loop.
queue.waitGroup.Add(4)

// Start the goroutines and return the queue!
go func() {
queue.readerLoop.run()
queue.waitGroup.Done()
}()
go func() {
queue.writerLoop.run()
queue.waitGroup.Done()
}()
go func() {
queue.deleterLoop.run()
queue.waitGroup.Done()
}()
go func() {
queue.run()
queue.waitGroup.Done()
}()
go queue.readerLoop.run()
go queue.writerLoop.run()
go queue.deleterLoop.run()
go queue.run()

return queue, nil
}
@@ -263,12 +248,15 @@ func NewQueue(
func (dq *diskQueue) Close() error {
// Closing the done channel signals to the core loop that it should
// shut down the other helper goroutines and wrap everything up.
close(dq.done)
dq.waitGroup.Wait()
close(dq.close)

return nil
}

func (dq *diskQueue) Done() <-chan struct{} {
return dq.done
}

func (dq *diskQueue) QueueType() string {
return QueueType
}
14 changes: 11 additions & 3 deletions libbeat/publisher/queue/memqueue/broker.go
@@ -65,6 +65,9 @@ type broker struct {
// Consumers send requests to getChan to read events from the queue.
getChan chan getRequest

// Close triggers a queue close by sending to closeChan.
closeChan chan struct{}

///////////////////////////
// internal channels

@@ -212,8 +215,9 @@ func newQueue(
encoderFactory: encoderFactory,

// broker API channels
pushChan: make(chan pushRequest, chanSize),
getChan: make(chan getRequest),
pushChan: make(chan pushRequest, chanSize),
getChan: make(chan getRequest),
closeChan: make(chan struct{}),

// internal runLoop and ackLoop channels
consumedChan: make(chan batchList),
@@ -232,10 +236,14 @@
}

func (b *broker) Close() error {
b.ctxCancel()
b.closeChan <- struct{}{}
return nil
}

func (b *broker) Done() <-chan struct{} {
return b.ctx.Done()
}

func (b *broker) QueueType() string {
return QueueType
}
26 changes: 20 additions & 6 deletions libbeat/publisher/queue/memqueue/runloop.go
@@ -57,6 +57,11 @@ type runLoop struct {
// It is active if and only if pendingGetRequest is non-nil.
getTimer *time.Timer

// closing is set when a close request is received. Once closing is true,
// the queue will not accept any new events, but will continue responding
// to Gets and Acks to allow pending events to complete on shutdown.
closing bool

// TODO (https://github.com/elastic/beats/issues/37893): entry IDs were a
// workaround for an external project that no longer exists. At this point
// they just complicate the API and should be removed.
@@ -90,8 +95,8 @@ func (l *runLoop) run() {
// standalone helper function to allow testing of loop invariants.
func (l *runLoop) runIteration() {
var pushChan chan pushRequest
// Push requests are enabled if the queue isn't yet full.
if l.eventCount < len(l.broker.buf) {
// Push requests are enabled if the queue isn't full or closing.
if l.eventCount < len(l.broker.buf) && !l.closing {
pushChan = l.broker.pushChan
}

@@ -116,7 +121,13 @@
}

select {
case <-l.broker.closeChan:
l.closing = true
// Get requests are handled immediately during shutdown
l.maybeUnblockGetRequest()

case <-l.broker.ctx.Done():
// The queue is fully shut down, do nothing
return

case req := <-pushChan: // producer pushing new event
@@ -154,8 +165,8 @@ func (l *runLoop) handleGetRequest(req *getRequest) {
}

func (l *runLoop) getRequestShouldBlock(req *getRequest) bool {
if l.broker.settings.FlushTimeout <= 0 {
// Never block if the flush timeout isn't positive
if l.broker.settings.FlushTimeout <= 0 || l.closing {
// Never block if the flush timeout isn't positive, or during shutdown
return false
}
eventsAvailable := l.eventCount - l.consumedCount
@@ -198,6 +209,10 @@ func (l *runLoop) handleDelete(count int) {
l.eventCount -= count
l.consumedCount -= count
l.broker.observer.RemoveEvents(count, byteCount)
if l.closing && l.eventCount == 0 {
// Our last events were acknowledged during shutdown, signal final shutdown
l.broker.ctxCancel()
}
}

func (l *runLoop) handleInsert(req *pushRequest) {
@@ -217,8 +232,7 @@ func (l *runLoop) maybeUnblockGetRequest() {
// If a get request is blocked waiting for more events, check if
// we should unblock it.
if getRequest := l.pendingGetRequest; getRequest != nil {
available := l.eventCount - l.consumedCount
if available >= getRequest.entryCount {
if !l.getRequestShouldBlock(getRequest) {
l.pendingGetRequest = nil
if !l.getTimer.Stop() {
<-l.getTimer.C
Expand Down
6 changes: 6 additions & 0 deletions libbeat/publisher/queue/queue.go
@@ -37,8 +37,14 @@ type Entry interface{}
// consumer or flush to some other intermediate storage), it will send an ACK signal
// with the number of ACKed events to the Producer (ACK happens in batches).
type Queue interface {
// Close signals the queue to shut down, but it may keep handling requests
// and acknowledgments for events that are already in progress.
Close() error

// Done returns a channel that unblocks when the queue is closed and all
// its events are persisted or acknowledged.
Done() <-chan struct{}

QueueType() string
BufferConfig() BufferConfig

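A minimal usage sketch of the new Close/Done contract, mirroring outputController.closeQueue above; drainWithTimeout is a hypothetical helper, not part of this commit.

package example

import (
	"time"

	"github.com/elastic/beats/v7/libbeat/publisher/queue"
)

// drainWithTimeout asks the queue to shut down, then waits up to timeout for
// its remaining events to be persisted or acknowledged.
func drainWithTimeout(q queue.Queue, timeout time.Duration) {
	_ = q.Close() // stop accepting new events, keep servicing pending ones

	select {
	case <-q.Done():
		// all pending events were persisted or acknowledged
	case <-time.After(timeout):
		// give up waiting; shutdown proceeds without a full drain
	}
}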
