diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index f196d137b88e..697771061e87 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -46,13 +46,14 @@ type asyncClient struct { } type msgRef struct { - client *asyncClient - count atomic.Uint32 - batch publisher.Batch - slice []publisher.Event - err error - win *window - batchSize int + client *asyncClient + count atomic.Uint32 + batch publisher.Batch + slice []publisher.Event + err error + win *window + batchSize int + deadlockListener *deadlockListener } func newAsyncClient( @@ -146,13 +147,13 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { } ref := &msgRef{ - client: c, - count: atomic.MakeUint32(1), - batch: batch, - slice: events, - batchSize: len(events), - win: c.win, - err: nil, + client: c, + batch: batch, + slice: events, + batchSize: len(events), + win: c.win, + err: nil, + deadlockListener: newDeadlockListener(c.log, logstashDeadlockTimeout), } defer ref.dec() @@ -229,34 +230,21 @@ func (c *asyncClient) getClient() *v2.AsyncClient { return client } -func (r *msgRef) callback(seq uint32, err error) { - if err != nil { - r.fail(seq, err) - } else { - r.done(seq) - } -} - -func (r *msgRef) done(n uint32) { - r.client.observer.Acked(int(n)) +func (r *msgRef) callback(n uint32, err error) { + r.client.observer.AckedEvents(int(n)) r.slice = r.slice[n:] - if r.win != nil { - r.win.tryGrowWindow(r.batchSize) - } - r.dec() -} - -func (r *msgRef) fail(n uint32, err error) { + r.deadlockListener.ack(int(n)) if r.err == nil { r.err = err } - r.slice = r.slice[n:] + // If publishing is windowed, update the window size. if r.win != nil { - r.win.shrinkWindow() + if err != nil { + r.win.shrinkWindow() + } else { + r.win.tryGrowWindow(r.batchSize) + } } - - r.client.observer.Acked(int(n)) - r.dec() } @@ -266,6 +254,8 @@ func (r *msgRef) dec() { return } + r.deadlockListener.close() + if L := len(r.slice); L > 0 { r.client.observer.Failed(L) } diff --git a/libbeat/outputs/logstash/deadlock.go b/libbeat/outputs/logstash/deadlock.go new file mode 100644 index 000000000000..9a291baeda02 --- /dev/null +++ b/libbeat/outputs/logstash/deadlock.go @@ -0,0 +1,95 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logstash + +import ( + "time" + + "github.com/elastic/elastic-agent-libs/logp" +) + +type deadlockListener struct { + log *logp.Logger + timeout time.Duration + ticker *time.Ticker + + ackChan chan int + + doneChan chan struct{} +} + +const logstashDeadlockTimeout = 5 * time.Minute + +func newDeadlockListener(log *logp.Logger, timeout time.Duration) *deadlockListener { + if timeout <= 0 { + return nil + } + r := &deadlockListener{ + log: log, + timeout: timeout, + ticker: time.NewTicker(timeout), + + ackChan: make(chan int), + doneChan: make(chan struct{}), + } + go r.run() + return r +} + +func (r *deadlockListener) run() { + defer r.ticker.Stop() + defer close(r.doneChan) + for { + select { + case n, ok := <-r.ackChan: + if !ok { + // Listener has been closed + return + } + if n > 0 { + // If progress was made, reset the countdown. + r.ticker.Reset(r.timeout) + } + case <-r.ticker.C: + // No progress was made within the timeout, log error so users + // know there is likely a problem with the upstream host + r.log.Errorf("Logstash batch hasn't reported progress in the last %v, the Logstash host may be stalled. This problem can be prevented by configuring Logstash to use PipelineBusV1 or by upgrading Logstash to 8.17+, for details see https://github.com/elastic/logstash/issues/16657", r.timeout) + return + } + } +} + +func (r *deadlockListener) ack(n int) { + if r == nil { + return + } + // Send the new ack to the run loop, unless it has already shut down in + // which case it can be safely ignored. + select { + case r.ackChan <- n: + case <-r.doneChan: + } +} + +func (r *deadlockListener) close() { + if r == nil { + return + } + // Signal the run loop to shut down + close(r.ackChan) +} diff --git a/libbeat/outputs/logstash/deadlock_test.go b/libbeat/outputs/logstash/deadlock_test.go new file mode 100644 index 000000000000..15c3716b9971 --- /dev/null +++ b/libbeat/outputs/logstash/deadlock_test.go @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logstash + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/logp" +) + +func TestDeadlockListener(t *testing.T) { + const timeout = 5 * time.Millisecond + log := logp.NewLogger("test") + listener := newDeadlockListener(log, timeout) + + // Verify that the listener doesn't trigger when receiving regular acks + for i := 0; i < 5; i++ { + time.Sleep(timeout / 2) + listener.ack(1) + } + select { + case <-listener.doneChan: + require.Fail(t, "Deadlock listener should not trigger unless there is no progress for the configured time interval") + case <-time.After(timeout / 2): + } + + // Verify that the listener does trigger when the acks stop + select { + case <-time.After(timeout): + require.Fail(t, "Deadlock listener should trigger when there is no progress for the configured time interval") + case <-listener.doneChan: + } +} diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 22e133db906c..48549bfc1f09 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -113,6 +113,8 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { return nil } + deadlockListener := newDeadlockListener(c.log, logstashDeadlockTimeout) + defer deadlockListener.close() for len(events) > 0 { // check if we need to reconnect if c.ticker != nil { @@ -146,14 +148,12 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { n, len(events), c.Host()) events = events[n:] - st.Acked(n) + st.AckedEvents(n) + deadlockListener.ack(n) if err != nil { // return batch to pipeline before reporting/counting error batch.RetryEvents(events) - if c.win != nil { - c.win.shrinkWindow() - } _ = c.Close() c.log.Errorf("Failed to publish events caused by: %+v", err) @@ -182,6 +182,7 @@ func (c *syncClient) publishWindowed(events []publisher.Event) (int, error) { n, err := c.sendEvents(events) if err != nil { + c.win.shrinkWindow() return n, err }