diff --git a/libbeat/publisher/pipeline/client.go b/libbeat/publisher/pipeline/client.go
index a5c02faace6..7ecce6fd8c7 100644
--- a/libbeat/publisher/pipeline/client.go
+++ b/libbeat/publisher/pipeline/client.go
@@ -146,8 +146,8 @@ func (c *client) Close() error {
 		c.logger.Debug("client: done closing acker")

 		c.logger.Debug("client: close queue producer")
-		cancelledEventCount := c.producer.Cancel()
-		c.onClosed(cancelledEventCount)
+		c.producer.Close()
+		c.onClosed()
 		c.logger.Debug("client: done producer close")

 		if c.processors != nil {
@@ -168,16 +168,7 @@ func (c *client) onClosing() {
 	}
 }

-func (c *client) onClosed(cancelledEventCount int) {
-	c.logger.Debugf("client: cancelled %v events", cancelledEventCount)
-
-	if c.eventWaitGroup != nil {
-		c.logger.Debugf("client: remove client events")
-		if cancelledEventCount > 0 {
-			c.eventWaitGroup.Add(-cancelledEventCount)
-		}
-	}
-
+func (c *client) onClosed() {
 	c.observer.clientClosed()
 	if c.clientListener != nil {
 		c.clientListener.Closed()
diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go
index bb75c9619c5..b34d6a64d2c 100644
--- a/libbeat/publisher/pipeline/controller.go
+++ b/libbeat/publisher/pipeline/controller.go
@@ -303,6 +303,5 @@ func (emptyProducer) TryPublish(_ queue.Entry) (queue.EntryID, bool) {
 	return 0, false
 }

-func (emptyProducer) Cancel() int {
-	return 0
+func (emptyProducer) Close() {
 }
diff --git a/libbeat/publisher/pipeline/pipeline.go b/libbeat/publisher/pipeline/pipeline.go
index 85eeb0e6497..dbe87681ea6 100644
--- a/libbeat/publisher/pipeline/pipeline.go
+++ b/libbeat/publisher/pipeline/pipeline.go
@@ -71,10 +71,6 @@ type Pipeline struct {
 	waitCloseTimeout time.Duration
 	eventWaitGroup   *sync.WaitGroup

-	// closeRef signal propagation support
-	guardStartSigPropagation sync.Once
-	sigNewClient             chan *client
-
 	processors processing.Supporter
 }

@@ -250,18 +246,6 @@ func (p *Pipeline) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {

 	producerCfg := queue.ProducerConfig{}

-	if client.eventWaitGroup != nil || cfg.ClientListener != nil {
-		producerCfg.OnDrop = func(event queue.Entry) {
-			publisherEvent, _ := event.(publisher.Event)
-			if cfg.ClientListener != nil {
-				cfg.ClientListener.DroppedOnPublish(publisherEvent.Content)
-			}
-			if client.eventWaitGroup != nil {
-				client.eventWaitGroup.Add(-1)
-			}
-		}
-	}
-
 	var waiter *clientCloseWaiter
 	if waitClose > 0 {
 		waiter = newClientCloseWaiter(waitClose)
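With the cancelled-event count gone from the close path, the pipeline's event bookkeeping (the eventWaitGroup seen above) can only be balanced through the queue's ACK callback. A minimal sketch of that accounting model; it is not part of this diff and the helper names are illustrative:

    package example

    import (
        "sync"

        "github.com/elastic/beats/v7/libbeat/publisher/queue"
    )

    // newCountedProducer opens a producer whose ACK callback releases units
    // from a shared wait group, mirroring how the pipeline's eventWaitGroup
    // is now balanced exclusively by ACKs.
    func newCountedProducer(q queue.Queue, wg *sync.WaitGroup) queue.Producer {
        return q.Producer(queue.ProducerConfig{
            ACK: func(count int) { wg.Add(-count) },
        })
    }

    // publishCounted adds one unit per event up front and gives it back if
    // the queue rejects the event, since a rejected event is never ACKed.
    func publishCounted(p queue.Producer, wg *sync.WaitGroup, e queue.Entry) bool {
        wg.Add(1)
        if _, ok := p.Publish(e); !ok {
            wg.Add(-1)
            return false
        }
        return true
    }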
diff --git a/libbeat/publisher/pipeline/pipeline_test.go b/libbeat/publisher/pipeline/pipeline_test.go
index feb01c4fa6e..78725b043f1 100644
--- a/libbeat/publisher/pipeline/pipeline_test.go
+++ b/libbeat/publisher/pipeline/pipeline_test.go
@@ -93,7 +93,6 @@ func makeDiscardQueue() queue.Queue {

 		producer: func(cfg queue.ProducerConfig) queue.Producer {
 			producerID.Inc()
-			id := producerID.Load()

 			// count is a counter that increments on every published event
 			// it's also the returned Event ID
@@ -103,10 +102,8 @@
 					count++
 					return queue.EntryID(count), true
 				},
-				cancel: func() int {
-
+				cancel: func() {
 					wg.Done()
-					return id
 				},
 			}

@@ -125,7 +122,7 @@ type testQueue struct {

 type testProducer struct {
 	publish func(try bool, event queue.Entry) (queue.EntryID, bool)
-	cancel  func() int
+	cancel  func()
 }

 func (q *testQueue) Metrics() (queue.Metrics, error) {
@@ -178,11 +175,10 @@ func (p *testProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) {
 	return 0, false
 }

-func (p *testProducer) Cancel() int {
+func (p *testProducer) Close() {
 	if p.cancel != nil {
-		return p.cancel()
+		p.cancel()
 	}
-	return 0
 }

 func makeTestQueue() queue.Queue {
@@ -194,7 +190,7 @@
 		close: func() error {
 			mux.Lock()
 			for producer := range producers {
-				producer.Cancel()
+				producer.Close()
 			}
 			mux.Unlock()

@@ -216,15 +212,11 @@
 			}
 			return p.Publish(event)
 		},
-		cancel: func() int {
-			i := p.Cancel()
-
+		cancel: func() {
 			mux.Lock()
 			defer mux.Unlock()
 			delete(producers, producer)
 			wg.Done()
-
-			return i
 		},
 	}

@@ -248,9 +240,8 @@ func blockingProducer(_ queue.ProducerConfig) queue.Producer {

 			return 0, false
 		},
-		cancel: func() int {
+		cancel: func() {
 			close(sig)
-			return waiting.Load()
 		},
 	}
 }
diff --git a/libbeat/publisher/queue/diskqueue/producer.go b/libbeat/publisher/queue/diskqueue/producer.go
index 69725c62ccc..7d084adf5ea 100644
--- a/libbeat/publisher/queue/diskqueue/producer.go
+++ b/libbeat/publisher/queue/diskqueue/producer.go
@@ -94,15 +94,10 @@ func (producer *diskQueueProducer) publish(
 	}
 }

-func (producer *diskQueueProducer) Cancel() int {
+func (producer *diskQueueProducer) Close() {
 	if producer.cancelled {
-		return 0
+		return
 	}
 	producer.cancelled = true
 	close(producer.done)
-
-	// TODO (possibly?): message the core loop to remove any pending events that
-	// were sent through this producer. If we do, return the number of cancelled
-	// events here instead of zero.
-	return 0
 }
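The disk queue producer above keeps Close idempotent: the first call flips the flag and closes the done channel, later calls return immediately. A hedged sketch of the same guard for a hypothetical producer type (not part of this change), with a mutex added on the assumption that callers may close concurrently:

    package example

    import "sync"

    // closeOnceProducer illustrates the idempotent-Close guard used by
    // diskQueueProducer.Close: signal once, then become a no-op.
    type closeOnceProducer struct {
        mu     sync.Mutex
        closed bool
        done   chan struct{}
    }

    func newCloseOnceProducer() *closeOnceProducer {
        return &closeOnceProducer{done: make(chan struct{})}
    }

    func (p *closeOnceProducer) Close() {
        p.mu.Lock()
        defer p.mu.Unlock()
        if p.closed {
            return
        }
        p.closed = true
        close(p.done)
    }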
diff --git a/libbeat/publisher/queue/memqueue/broker.go b/libbeat/publisher/queue/memqueue/broker.go
index a42215f48a6..d9aff10bd3a 100644
--- a/libbeat/publisher/queue/memqueue/broker.go
+++ b/libbeat/publisher/queue/memqueue/broker.go
@@ -66,10 +66,6 @@ type broker struct {
 	// Consumers send requests to getChan to read events from the queue.
 	getChan chan getRequest

-	// Producers send requests to cancelChan to cancel events they've
-	// sent so far that have not yet reached a consumer.
-	cancelChan chan producerCancelRequest
-
 	// Metrics() sends requests to metricChan to expose internal queue
 	// metrics to external callers.
 	metricChan chan metricsRequest
@@ -224,7 +220,6 @@ func newQueue(
 		// broker API channels
 		pushChan:   make(chan pushRequest, chanSize),
 		getChan:    make(chan getRequest),
-		cancelChan: make(chan producerCancelRequest, 5),
 		metricChan: make(chan metricsRequest),

 		// internal runLoop and ackLoop channels
@@ -264,7 +259,7 @@ func (b *broker) Producer(cfg queue.ProducerConfig) queue.Producer {
 	if b.encoderFactory != nil {
 		encoder = b.encoderFactory()
 	}
-	return newProducer(b, cfg.ACK, cfg.OnDrop, cfg.DropOnCancel, encoder)
+	return newProducer(b, cfg.ACK, encoder)
 }

 func (b *broker) Get(count int) (queue.Batch, error) {
diff --git a/libbeat/publisher/queue/memqueue/internal_api.go b/libbeat/publisher/queue/memqueue/internal_api.go
index 95b5e0eba90..6575472edbd 100644
--- a/libbeat/publisher/queue/memqueue/internal_api.go
+++ b/libbeat/publisher/queue/memqueue/internal_api.go
@@ -38,15 +38,6 @@ type pushRequest struct {
 	resp chan queue.EntryID
 }

-type producerCancelRequest struct {
-	producer *ackProducer
-	resp     chan producerCancelResponse
-}
-
-type producerCancelResponse struct {
-	removed int
-}
-
 // consumer -> broker API

 type getRequest struct {
diff --git a/libbeat/publisher/queue/memqueue/produce.go b/libbeat/publisher/queue/memqueue/produce.go
index 55f15a8cc86..a206e357aac 100644
--- a/libbeat/publisher/queue/memqueue/produce.go
+++ b/libbeat/publisher/queue/memqueue/produce.go
@@ -29,7 +29,6 @@ type forgetfulProducer struct {

 type ackProducer struct {
 	broker        *broker
-	dropOnCancel  bool
 	producedCount uint64
 	state         produceState
 	openState     openState
@@ -50,15 +49,13 @@ type openState struct {
 type producerID uint64

 type produceState struct {
-	cb        ackHandler
-	dropCB    func(queue.Entry)
-	cancelled bool
-	lastACK   producerID
+	cb      ackHandler
+	lastACK producerID
 }

 type ackHandler func(count int)

-func newProducer(b *broker, cb ackHandler, dropCB func(queue.Entry), dropOnCancel bool, encoder queue.Encoder) queue.Producer {
+func newProducer(b *broker, cb ackHandler, encoder queue.Encoder) queue.Producer {
 	openState := openState{
 		log:     b.logger,
 		done:    make(chan struct{}),
@@ -68,9 +65,8 @@
 	}

 	if cb != nil {
-		p := &ackProducer{broker: b, dropOnCancel: dropOnCancel, openState: openState}
+		p := &ackProducer{broker: b, openState: openState}
 		p.state.cb = cb
-		p.state.dropCB = dropCB
 		return p
 	}
 	return &forgetfulProducer{broker: b, openState: openState}
@@ -91,9 +87,8 @@ func (p *forgetfulProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) {
 	return p.openState.tryPublish(p.makePushRequest(event))
 }

-func (p *forgetfulProducer) Cancel() int {
+func (p *forgetfulProducer) Close() {
 	p.openState.Close()
-	return 0
 }

 func (p *ackProducer) makePushRequest(event queue.Entry) pushRequest {
@@ -123,21 +118,8 @@ func (p *ackProducer) TryPublish(event queue.Entry) (queue.EntryID, bool) {
 	return id, published
 }

-func (p *ackProducer) Cancel() int {
+func (p *ackProducer) Close() {
 	p.openState.Close()
-
-	if p.dropOnCancel {
-		ch := make(chan producerCancelResponse)
-		p.broker.cancelChan <- producerCancelRequest{
-			producer: p,
-			resp:     ch,
-		}
-
-		// wait for cancel to being processed
-		resp := <-ch
-		return resp.removed
-	}
-	return 0
 }

 func (st *openState) Close() {
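After this change the memory queue's producer carries no drop state: newProducer takes just the broker, the ACK callback, and the encoder, and ProducerConfig is reduced to ACK. A minimal sketch of opening a producer against the public interface (the helper is illustrative, not from this diff):

    package example

    import "github.com/elastic/beats/v7/libbeat/publisher/queue"

    // openProducer shows the only per-producer knob left in ProducerConfig:
    // the ACK callback.
    func openProducer(q queue.Queue, onACK func(count int)) queue.Producer {
        return q.Producer(queue.ProducerConfig{
            ACK: onACK, // may be nil when no ACK feedback is needed
        })
    }

Internally, a nil ACK takes the forgetfulProducer path and a non-nil ACK yields an ackProducer, exactly as in newProducer above.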
diff --git a/libbeat/publisher/queue/memqueue/queue_test.go b/libbeat/publisher/queue/memqueue/queue_test.go
index 41228046c53..5ebf6b6f6fb 100644
--- a/libbeat/publisher/queue/memqueue/queue_test.go
+++ b/libbeat/publisher/queue/memqueue/queue_test.go
@@ -96,9 +96,7 @@ func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {

 	p := q.Producer(queue.ProducerConfig{
 		// We do not read from the queue, so the callbacks are never called
-		ACK:          func(count int) {},
-		OnDrop:       func(e queue.Entry) {},
-		DropOnCancel: false,
+		ACK: func(count int) {},
 	})

 	success := atomic.Bool{}
@@ -170,10 +168,6 @@ func TestProducerClosePreservesEventCount(t *testing.T) {
 		ACK: func(count int) {
 			activeEvents.Add(-int64(count))
 		},
-		OnDrop: func(e queue.Entry) {
-			//activeEvents.Add(-1)
-		},
-		DropOnCancel: false,
 	})

 	// Asynchronously, send 4 events to the queue.
@@ -209,7 +203,7 @@ func TestProducerClosePreservesEventCount(t *testing.T) {
 	// Cancel the producer, then read and acknowledge two batches. If the
 	// Publish calls and the queue code are working, activeEvents should
 	// _usually_ end up as 0, but _always_ end up non-negative.
-	p.Cancel()
+	p.Close()

 	// The queue reads also need to be done in a goroutine, in case the
 	// producer cancellation signal went through before the Publish
@@ -297,10 +291,6 @@ func queueMetricsAreValid(t *testing.T, q queue.Queue, evtCount, evtLimit, occup
 	assert.Equal(t, testMetrics.UnackedConsumedEvents.ValueOr(0), uint64(occupied), "incorrect OccupiedRead for %s", test)
 }

-func TestProducerCancelRemovesEvents(t *testing.T) {
-	queuetest.TestProducerCancelRemovesEvents(t, makeTestQueue(1024, 0, 0))
-}
-
 func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory {
 	return func(_ *testing.T) queue.Queue {
 		return NewQueue(nil, nil, Settings{
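TestProducerClosePreservesEventCount above depends on the invariant that events accepted before Close are still delivered and ACKed. A sketch of the matching consumer side, assuming only the queue.Queue and Batch methods already used elsewhere in this diff:

    package example

    import "github.com/elastic/beats/v7/libbeat/publisher/queue"

    // drain keeps reading and acknowledging batches; acknowledging is what
    // ultimately fires the producer's ACK callback, even after that producer
    // has called Close.
    func drain(q queue.Queue, batchSize int) error {
        for {
            batch, err := q.Get(batchSize)
            if err != nil {
                // Get is expected to fail once the queue itself shuts down.
                return err
            }
            for i := 0; i < batch.Count(); i++ {
                _ = batch.Entry(i) // hand the entry to the output in real code
            }
            batch.Done()
        }
    }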
diff --git a/libbeat/publisher/queue/memqueue/runloop.go b/libbeat/publisher/queue/memqueue/runloop.go
index 45ae3c0a1a2..ed14106f20c 100644
--- a/libbeat/publisher/queue/memqueue/runloop.go
+++ b/libbeat/publisher/queue/memqueue/runloop.go
@@ -122,9 +122,6 @@ func (l *runLoop) runIteration() {
 	case req := <-pushChan: // producer pushing new event
 		l.handleInsert(&req)

-	case req := <-l.broker.cancelChan: // producer cancelling active events
-		l.handleCancel(&req)
-
 	case req := <-getChan: // consumer asking for next batch
 		l.handleGetRequest(&req)

@@ -195,16 +192,15 @@ func (l *runLoop) handleDelete(count int) {
 }

 func (l *runLoop) handleInsert(req *pushRequest) {
-	if l.insert(req, l.nextEntryID) {
-		// Send back the new event id.
-		req.resp <- l.nextEntryID
+	l.insert(req, l.nextEntryID)
+	// Send back the new event id.
+	req.resp <- l.nextEntryID

-		l.nextEntryID++
-		l.eventCount++
+	l.nextEntryID++
+	l.eventCount++

-		// See if this gave us enough for a new batch
-		l.maybeUnblockGetRequest()
-	}
+	// See if this gave us enough for a new batch
+	l.maybeUnblockGetRequest()
 }

 // Checks if we can handle pendingGetRequest yet, and handles it if so
@@ -223,13 +219,7 @@ func (l *runLoop) maybeUnblockGetRequest() {
 	}
 }

-// Returns true if the event was inserted, false if insertion was cancelled.
-func (l *runLoop) insert(req *pushRequest, id queue.EntryID) bool {
-	if req.producer != nil && req.producer.state.cancelled {
-		reportCancelledState(req)
-		return false
-	}
-
+func (l *runLoop) insert(req *pushRequest, id queue.EntryID) {
 	index := (l.bufPos + l.eventCount) % len(l.broker.buf)
 	l.broker.buf[index] = queueEntry{
 		event:      req.event,
@@ -237,7 +227,6 @@
 		producer:   req.producer,
 		producerID: req.producerID,
 	}
-	return true
 }

 func (l *runLoop) handleMetricsRequest(req *metricsRequest) {
@@ -253,50 +242,3 @@
 		oldestEntryID: oldestEntryID,
 	}
 }
-
-func (l *runLoop) handleCancel(req *producerCancelRequest) {
-	var removedCount int
-
-	// Traverse all unconsumed events in the buffer, removing any with
-	// the specified producer. As we go we condense all the remaining
-	// events to be sequential.
-	buf := l.broker.buf
-	startIndex := l.bufPos + l.consumedCount
-	unconsumedEventCount := l.eventCount - l.consumedCount
-	for i := 0; i < unconsumedEventCount; i++ {
-		readIndex := (startIndex + i) % len(buf)
-		if buf[readIndex].producer == req.producer {
-			// The producer matches, skip this event
-			removedCount++
-		} else {
-			// Move the event to its final position after accounting for any
-			// earlier indices that were removed.
-			// (Count backwards from (startIndex + i), not from readIndex, to avoid
-			// sign issues when the buffer wraps.)
-			writeIndex := (startIndex + i - removedCount) % len(buf)
-			buf[writeIndex] = buf[readIndex]
-		}
-	}
-
-	// Clear the event pointers at the end of the buffer so we don't keep
-	// old events in memory by accident.
-	for i := 0; i < removedCount; i++ {
-		index := (l.bufPos + l.eventCount - removedCount + i) % len(buf)
-		buf[index].event = nil
-	}
-
-	// Subtract removed events from the internal event count
-	l.eventCount -= removedCount
-
-	// signal cancel request being finished
-	if req.resp != nil {
-		req.resp <- producerCancelResponse{removed: removedCount}
-	}
-}
-
-func reportCancelledState(req *pushRequest) {
-	// do not add waiting events if producer did send cancel signal
-	if cb := req.producer.state.dropCB; cb != nil {
-		cb(req.event)
-	}
-}
diff --git a/libbeat/publisher/queue/memqueue/runloop_test.go b/libbeat/publisher/queue/memqueue/runloop_test.go
index d25537265ea..266704fc1fd 100644
--- a/libbeat/publisher/queue/memqueue/runloop_test.go
+++ b/libbeat/publisher/queue/memqueue/runloop_test.go
@@ -44,7 +44,7 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
 		},
 		10,
 		nil)
-	producer := newProducer(broker, nil, nil, false, nil)
+	producer := newProducer(broker, nil, nil)
 	rl := broker.runLoop
 	for i := 0; i < 100; i++ {
 		// Pair each publish call with an iteration of the run loop so we
@@ -83,7 +83,7 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) {
 		},
 		10,
 		nil)
-	producer := newProducer(broker, nil, nil, false, nil)
+	producer := newProducer(broker, nil, nil)
 	rl := broker.runLoop
 	for i := 0; i < 100; i++ {
 		// Pair each publish call with an iteration of the run loop so we
diff --git a/libbeat/publisher/queue/queue.go b/libbeat/publisher/queue/queue.go
index 8758c055945..9c186ad30d0 100644
--- a/libbeat/publisher/queue/queue.go
+++ b/libbeat/publisher/queue/queue.go
@@ -106,16 +106,6 @@ type ProducerConfig struct {
 	// if ACK is set, the callback will be called with number of events produced
 	// by the producer instance and being ACKed by the queue.
 	ACK func(count int)
-
-	// OnDrop is called to report events being silently dropped by
-	// the queue. Currently this can only happen when a Publish call is sent
-	// to the memory queue's request channel but the producer is cancelled
-	// before it reaches the queue buffer.
-	OnDrop func(Entry)
-
-	// DropOnCancel is a hint to the queue to drop events if the producer disconnects
-	// via Cancel.
-	DropOnCancel bool
 }

 type EntryID uint64
@@ -134,12 +124,10 @@ type Producer interface {
 	// the event's assigned ID, and false otherwise.
 	TryPublish(entry Entry) (EntryID, bool)

-	// Cancel closes this Producer endpoint. If the producer is configured to
-	// drop its entries on Cancel, the number of dropped entries is returned.
-	// Note: A queue may still send ACK signals even after Cancel is called on
-	// the originating Producer. The pipeline client must accept and
-	// discard these ACKs.
-	Cancel() int
+	// Close closes this Producer endpoint.
+	// Note: A queue may still send ACK signals even after Close is called on
+	// the originating Producer. The pipeline client must accept these ACKs.
+	Close()
 }

 // Batch of entries (usually publisher.Event) to be returned to Consumers.
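The revised Producer contract means a caller that wants a clean shutdown should close the producer and then let outstanding ACKs drain rather than discarding them. A hedged sketch of that pattern; the type, the polling loop, and the timeout handling are all illustrative, not part of this change:

    package example

    import (
        "sync/atomic"
        "time"

        "github.com/elastic/beats/v7/libbeat/publisher/queue"
    )

    // sender tracks events it has published but not yet seen ACKed.
    type sender struct {
        producer queue.Producer
        pending  atomic.Int64
    }

    func newSender(q queue.Queue) *sender {
        s := &sender{}
        s.producer = q.Producer(queue.ProducerConfig{
            // ACKs may still arrive after Close; keep counting them down
            // instead of discarding them.
            ACK: func(count int) { s.pending.Add(-int64(count)) },
        })
        return s
    }

    func (s *sender) publish(e queue.Entry) bool {
        s.pending.Add(1)
        if _, ok := s.producer.Publish(e); !ok {
            s.pending.Add(-1) // rejected events will never be ACKed
            return false
        }
        return true
    }

    func (s *sender) shutdown(timeout time.Duration) {
        s.producer.Close()
        deadline := time.Now().Add(timeout)
        for s.pending.Load() > 0 && time.Now().Before(deadline) {
            time.Sleep(10 * time.Millisecond)
        }
    }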
diff --git a/libbeat/publisher/queue/queuetest/producer_cancel.go b/libbeat/publisher/queue/queuetest/producer_cancel.go
deleted file mode 100644
index 6bb8a9bdd08..00000000000
--- a/libbeat/publisher/queue/queuetest/producer_cancel.go
+++ /dev/null
@@ -1,106 +0,0 @@
-// Licensed to Elasticsearch B.V. under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Elasticsearch B.V. licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package queuetest
-
-import (
-	"testing"
-
-	"github.com/stretchr/testify/assert"
-
-	"github.com/elastic/beats/v7/libbeat/publisher"
-	"github.com/elastic/beats/v7/libbeat/publisher/queue"
-	"github.com/elastic/elastic-agent-libs/mapstr"
-)
-
-// TestSingleProducerConsumer tests buffered events for a producer getting
-// cancelled will not be consumed anymore. Concurrent producer/consumer pairs
-// might still have active events not yet ACKed (not tested here).
-//
-// Note: queues not requiring consumers to ACK a events in order to
-// return ACKs to the producer are not supported by this test.
-func TestProducerCancelRemovesEvents(t *testing.T, factory QueueFactory) {
-	fn := withOptLogOutput(true, func(t *testing.T) {
-		var (
-			i  int
-			N1 = 3
-			N2 = 10
-		)
-
-		log := NewTestLogger(t)
-		b := factory(t)
-		defer b.Close()
-
-		log.Debug("create first producer")
-		producer := b.Producer(queue.ProducerConfig{
-			ACK:          func(int) {}, // install function pointer, so 'cancel' will remove events
-			DropOnCancel: true,
-		})
-
-		for ; i < N1; i++ {
-			log.Debugf("send event %v to first producer", i)
-			producer.Publish(MakeEvent(mapstr.M{
-				"value": i,
-			}))
-		}
-
-		// cancel producer
-		log.Debugf("cancel producer")
-		producer.Cancel()
-
-		// reconnect and send some more events
-		log.Debug("connect new producer")
-		producer = b.Producer(queue.ProducerConfig{})
-		for ; i < N2; i++ {
-			log.Debugf("send event %v to new producer", i)
-			producer.Publish(MakeEvent(mapstr.M{
-				"value": i,
-			}))
-		}
-
-		// consume all events
-		total := N2 - N1
-		events := make([]interface{}, 0, total)
-		for len(events) < total {
-			batch, err := b.Get(-1) // collect all events
-			if err != nil {
-				panic(err)
-			}
-
-			for i := 0; i < batch.Count(); i++ {
-				events = append(events, batch.Entry(i))
-			}
-			batch.Done()
-		}
-
-		// verify
-		if total != len(events) {
-			assert.Equal(t, total, len(events))
-			return
-		}
-
-		for i, event := range events {
-			pubEvent, ok := event.(publisher.Event)
-			assert.True(t, ok, "queue output should be the same type as its input")
-			value, ok := pubEvent.Content.Fields["value"].(int)
-			assert.True(t, ok, "event.value should be an int")
-			assert.Equal(t, i+N1, value)
-		}
-	})
-
-	fn(t)
-}
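The deleted queuetest case covered cancel-time dropping, which no longer exists; the closest Close-era property is that events accepted before Close remain consumable. A rough sketch of that check, using only queue methods exercised above (the helper name is illustrative and this is not a replacement test added by this diff):

    package example

    import "github.com/elastic/beats/v7/libbeat/publisher/queue"

    // publishThenClose publishes the given entries, closes the producer, and
    // then confirms they can all still be consumed; Close removes nothing.
    func publishThenClose(q queue.Queue, events []queue.Entry) (int, error) {
        p := q.Producer(queue.ProducerConfig{})
        for _, e := range events {
            p.Publish(e)
        }
        p.Close() // the events published above stay in the queue

        delivered := 0
        for delivered < len(events) {
            batch, getErr := q.Get(-1) // like the old test: collect whatever is ready
            if getErr != nil {
                return delivered, getErr
            }
            delivered += batch.Count()
            batch.Done()
        }
        return delivered, nil
    }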