Skip to content

Commit

Permalink
[libbeat] Remove "producer cancel" features from queue API (elastic#3…
Browse files Browse the repository at this point in the history
…9760)

"Producer cancel" is a feature that allows closing queue producers to also cancel any pending events created by that producer that have not yet been sent to a queue reader. It was introduced as a small part of a [very large refactor](elastic#4492) in 2017, but current code doesn't depend on it for anything. Since this feature adds considerable complexity to the queue API and implementation, this PR removes the feature and associated helpers.

This PR should cause no user-visible behavior change.
  • Loading branch information
faec authored May 31, 2024
1 parent d77596a commit 074c1dd
Show file tree
Hide file tree
Showing 13 changed files with 36 additions and 294 deletions.
15 changes: 3 additions & 12 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,8 @@ func (c *client) Close() error {
c.logger.Debug("client: done closing acker")

c.logger.Debug("client: close queue producer")
cancelledEventCount := c.producer.Cancel()
c.onClosed(cancelledEventCount)
c.producer.Close()
c.onClosed()
c.logger.Debug("client: done producer close")

if c.processors != nil {
Expand All @@ -168,16 +168,7 @@ func (c *client) onClosing() {
}
}

func (c *client) onClosed(cancelledEventCount int) {
c.logger.Debugf("client: cancelled %v events", cancelledEventCount)

if c.eventWaitGroup != nil {
c.logger.Debugf("client: remove client events")
if cancelledEventCount > 0 {
c.eventWaitGroup.Add(-cancelledEventCount)
}
}

func (c *client) onClosed() {
c.observer.clientClosed()
if c.clientListener != nil {
c.clientListener.Closed()
Expand Down
3 changes: 1 addition & 2 deletions libbeat/publisher/pipeline/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,5 @@ func (emptyProducer) TryPublish(_ queue.Entry) (queue.EntryID, bool) {
return 0, false
}

func (emptyProducer) Cancel() int {
return 0
func (emptyProducer) Close() {
}
16 changes: 0 additions & 16 deletions libbeat/publisher/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,6 @@ type Pipeline struct {
waitCloseTimeout time.Duration
eventWaitGroup *sync.WaitGroup

// closeRef signal propagation support
guardStartSigPropagation sync.Once
sigNewClient chan *client

processors processing.Supporter
}

Expand Down Expand Up @@ -250,18 +246,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {

producerCfg := queue.ProducerConfig{}

if client.eventWaitGroup != nil || cfg.ClientListener != nil {
producerCfg.OnDrop = func(event queue.Entry) {
publisherEvent, _ := event.(publisher.Event)
if cfg.ClientListener != nil {
cfg.ClientListener.DroppedOnPublish(publisherEvent.Content)
}
if client.eventWaitGroup != nil {
client.eventWaitGroup.Add(-1)
}
}
}

var waiter *clientCloseWaiter
if waitClose > 0 {
waiter = newClientCloseWaiter(waitClose)
Expand Down
23 changes: 7 additions & 16 deletions libbeat/publisher/pipeline/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func makeDiscardQueue() queue.Queue {

producer: func(cfg queue.ProducerConfig) queue.Producer {
producerID.Inc()
id := producerID.Load()

// count is a counter that increments on every published event
// it's also the returned Event ID
Expand All @@ -103,10 +102,8 @@ func makeDiscardQueue() queue.Queue {
count++
return queue.EntryID(count), true
},
cancel: func() int {

cancel: func() {
wg.Done()
return id
},
}

Expand All @@ -125,7 +122,7 @@ type testQueue struct {

type testProducer struct {
publish func(try bool, event queue.Entry) (queue.EntryID, bool)
cancel func() int
cancel func()
}

func (q *testQueue) Metrics() (queue.Metrics, error) {
Expand Down Expand Up @@ -178,11 +175,10 @@ func (p *testProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) {
return 0, false
}

func (p *testProducer) Cancel() int {
func (p *testProducer) Close() {
if p.cancel != nil {
return p.cancel()
p.cancel()
}
return 0
}

func makeTestQueue() queue.Queue {
Expand All @@ -194,7 +190,7 @@ func makeTestQueue() queue.Queue {
close: func() error {
mux.Lock()
for producer := range producers {
producer.Cancel()
producer.Close()
}
mux.Unlock()

Expand All @@ -216,15 +212,11 @@ func makeTestQueue() queue.Queue {
}
return p.Publish(event)
},
cancel: func() int {
i := p.Cancel()

cancel: func() {
mux.Lock()
defer mux.Unlock()
delete(producers, producer)
wg.Done()

return i
},
}

Expand All @@ -248,9 +240,8 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer {
return 0, false
},

cancel: func() int {
cancel: func() {
close(sig)
return waiting.Load()
},
}
}
9 changes: 2 additions & 7 deletions libbeat/publisher/queue/diskqueue/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,10 @@ func (producer *diskQueueProducer) publish(
}
}

func (producer *diskQueueProducer) Cancel() int {
func (producer *diskQueueProducer) Close() {
if producer.cancelled {
return 0
return
}
producer.cancelled = true
close(producer.done)

// TODO (possibly?): message the core loop to remove any pending events that
// were sent through this producer. If we do, return the number of cancelled
// events here instead of zero.
return 0
}
7 changes: 1 addition & 6 deletions libbeat/publisher/queue/memqueue/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,6 @@ type broker struct {
// Consumers send requests to getChan to read events from the queue.
getChan chan getRequest

// Producers send requests to cancelChan to cancel events they've
// sent so far that have not yet reached a consumer.
cancelChan chan producerCancelRequest

// Metrics() sends requests to metricChan to expose internal queue
// metrics to external callers.
metricChan chan metricsRequest
Expand Down Expand Up @@ -224,7 +220,6 @@ func newQueue(
// broker API channels
pushChan: make(chan pushRequest, chanSize),
getChan: make(chan getRequest),
cancelChan: make(chan producerCancelRequest, 5),
metricChan: make(chan metricsRequest),

// internal runLoop and ackLoop channels
Expand Down Expand Up @@ -264,7 +259,7 @@ func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer {
if b.encoderFactory != nil {
encoder = b.encoderFactory()
}
return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel, encoder)
return newProducer(b, cfg.ACK, encoder)
}

func (b *broker) Get(count int) (queue.Batch, error) {
Expand Down
9 changes: 0 additions & 9 deletions libbeat/publisher/queue/memqueue/internal_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,6 @@ type pushRequest struct {
resp chan queue.EntryID
}

type producerCancelRequest struct {
producer *ackProducer
resp chan producerCancelResponse
}

type producerCancelResponse struct {
removed int
}

// consumer -> broker API

type getRequest struct {
Expand Down
30 changes: 6 additions & 24 deletions libbeat/publisher/queue/memqueue/produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ type forgetfulProducer struct {

type ackProducer struct {
broker *broker
dropOnCancel bool
producedCount uint64
state produceState
openState openState
Expand All @@ -50,15 +49,13 @@ type openState struct {
type producerID uint64

type produceState struct {
cb ackHandler
dropCB func(queue.Entry)
cancelled bool
lastACK producerID
cb ackHandler
lastACK producerID
}

type ackHandler func(count int)

func newProducer(b *broker, cb ackHandler, dropCB func(queue.Entry), dropOnCancel bool, encoder queue.Encoder) queue.Producer {
func newProducer(b *broker, cb ackHandler, encoder queue.Encoder) queue.Producer {
openState := openState{
log: b.logger,
done: make(chan struct{}),
Expand All @@ -68,9 +65,8 @@ func newProducer(b *broker, cb ackHandler, dropCB func(queue.Entry), dropOnCance
}

if cb != nil {
p := &ackProducer{broker: b, dropOnCancel: dropOnCancel, openState: openState}
p := &ackProducer{broker: b, openState: openState}
p.state.cb = cb
p.state.dropCB = dropCB
return p
}
return &forgetfulProducer{broker: b, openState: openState}
Expand All @@ -91,9 +87,8 @@ func (p *forgetfulProducer) TryPublish(event queue.Entry) (queue.EntryID, bool)
return p.openState.tryPublish(p.makePushRequest(event))
}

func (p *forgetfulProducer) Cancel() int {
func (p *forgetfulProducer) Close() {
p.openState.Close()
return 0
}

func (p *ackProducer) makePushRequest(event queue.Entry) pushRequest {
Expand Down Expand Up @@ -123,21 +118,8 @@ func (p *ackProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) {
return id, published
}

func (p *ackProducer) Cancel() int {
func (p *ackProducer) Close() {
p.openState.Close()

if p.dropOnCancel {
ch := make(chan producerCancelResponse)
p.broker.cancelChan <- producerCancelRequest{
producer: p,
resp: ch,
}

// wait for cancel to being processed
resp := <-ch
return resp.removed
}
return 0
}

func (st *openState) Close() {
Expand Down
14 changes: 2 additions & 12 deletions libbeat/publisher/queue/memqueue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,7 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {

p := q.Producer(queue.ProducerConfig{
// We do not read from the queue, so the callbacks are never called
ACK: func(count int) {},
OnDrop: func(e queue.Entry) {},
DropOnCancel: false,
ACK: func(count int) {},
})

success := atomic.Bool{}
Expand Down Expand Up @@ -170,10 +168,6 @@ func TestProducerClosePreservesEventCount(t *testing.T) {
ACK: func(count int) {
activeEvents.Add(-int64(count))
},
OnDrop: func(e queue.Entry) {
//activeEvents.Add(-1)
},
DropOnCancel: false,
})

// Asynchronously, send 4 events to the queue.
Expand Down Expand Up @@ -209,7 +203,7 @@ func TestProducerClosePreservesEventCount(t *testing.T) {
// Cancel the producer, then read and acknowledge two batches. If the
// Publish calls and the queue code are working, activeEvents should
// _usually_ end up as 0, but _always_ end up non-negative.
p.Cancel()
p.Close()

// The queue reads also need to be done in a goroutine, in case the
// producer cancellation signal went through before the Publish
Expand Down Expand Up @@ -297,10 +291,6 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup
assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test)
}

func TestProducerCancelRemovesEvents(t *testing.T) {
queuetest.TestProducerCancelRemovesEvents(t, makeTestQueue(1024, 0, 0))
}

func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory {
return func(_ *testing.T) queue.Queue {
return NewQueue(nil, nil, Settings{
Expand Down
Loading

0 comments on commit 074c1dd

Please sign in to comment.