From acaec2777a6c94ab9580bf3c1d0f170ec52b9004 Mon Sep 17 00:00:00 2001
From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com>
Date: Tue, 8 Aug 2023 16:52:38 -0700
Subject: [PATCH] Pipeline to pipeline communication acked queue improvements.
 (#15173) (#15243)

* Pipeline to pipeline communication acked queue improvements.

* Handle InterruptedException in the input, improve the warning message when in-flight events are only partially sent, and other minor improvements such as more descriptive logs.

* Apply suggestions from code review

Check if the queue is open after the thread acquires the lock.

Co-authored-by: Ry Biesemeyer

* Apply suggestions from code review

Unit test case improvement: use `assertThrows` when validating the exception.

Co-authored-by: Ry Biesemeyer

* Backing off of introducing wrap-with operations.

* Introduce a functional interface to use broadly for catching the expected exception types.

* Addressing comments: do not retry sending in-flight events in this case. We still throw if we get an error when opening the queue.

* Remove input retry logic that will never be reached.

* Move the queue-closed check to after the thread acquires the lock. Make the next-read-page interface private since it is intended for internal use.

* Apply suggestions from code review

Leave a comment for the write lock and remove the unnecessary warning with `ensure_delivery=>false`

Co-authored-by: Ry Biesemeyer

* Remove an unused method; check that the current thread holds the lock when accessing the next page.

* pq: getting possibly-shared access to next read page is illegal

The private `Queue#nextReadPage()` method requires that the caller has exclusive ownership of the lock, and failing to hold the lock is an illegal state that cannot be recovered from; it would leak the effectively-private `Page` to a caller that cannot reliably use it without corrupting other callers. Both callers of this private method already invoke it with exclusive access, so this safeguard merely prevents future development from breaking that expectation unknowingly. As such, we throw an `IllegalStateException`.

* pq: use shared queue-closed check for blocking and non-blocking reads

By moving the closed-check from the blocking `Queue#readBatch` into the shared private `Queue#nextReadPage`, we ensure that blocking reads via `Queue#readBatch` and non-blocking reads via `Queue#nonBlockReadBatch` behave the same once the queue has been closed.
* pq: make exception message constants descriptive * p2p: clarify comment about cumulating retry behaviour --------- Co-authored-by: Ry Biesemeyer Co-authored-by: Ry Biesemeyer (cherry picked from commit e890049c1bf8127cd0b46cb3f3bc9a552511ddfe) Co-authored-by: Mashhur <99575341+mashhurs@users.noreply.github.com> --- .../plugins/builtin/pipeline/input.rb | 3 +- .../logstash/ackedqueue/AckedReadBatch.java | 16 +++--- .../java/org/logstash/ackedqueue/Queue.java | 50 +++++++++++++------ .../ackedqueue/QueueExceptionMessages.java | 42 ++++++++++++++++ .../ackedqueue/QueueRuntimeException.java | 3 ++ .../ackedqueue/ext/JRubyAckedQueueExt.java | 43 +++++++++------- .../ext/JRubyWrappedAckedQueueExt.java | 20 ++------ .../ext/JRubyAbstractQueueWriteClientExt.java | 9 ++-- .../ext/JrubyAckedWriteClientExt.java | 43 +++------------- .../ext/JrubyMemoryWriteClientExt.java | 12 ++--- .../plugins/pipeline/PipelineBus.java | 21 ++++---- .../org/logstash/ackedqueue/QueueTest.java | 22 ++++++-- .../plugins/pipeline/PipelineBusTest.java | 2 +- .../org/logstash/util/ExceptionMatcher.java | 20 ++++++++ .../logstash/util/SetOnceReferenceTest.java | 14 +----- 15 files changed, 186 insertions(+), 134 deletions(-) create mode 100644 logstash-core/src/main/java/org/logstash/ackedqueue/QueueExceptionMessages.java create mode 100644 logstash-core/src/test/java/org/logstash/util/ExceptionMatcher.java diff --git a/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb b/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb index 595f4536b6d..19a28814031 100644 --- a/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb +++ b/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb @@ -68,8 +68,7 @@ def internalReceive(events) stream_position = stream_position + 1 end) ReceiveResponse.completed() - rescue java.lang.InterruptedException, IOError => e - # maybe an IOException in enqueueing + rescue java.lang.InterruptedException, org.logstash.ackedqueue.QueueRuntimeException, IOError => e logger.debug? && logger.debug('queueing event failed', message: e.message, exception: e.class, backtrace: e.backtrace) ReceiveResponse.failed_at(stream_position, e) end diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java b/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java index fab490f2049..2acdd391b59 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java @@ -25,6 +25,7 @@ import org.logstash.execution.MemoryReadBatch; import org.logstash.execution.QueueBatch; import org.logstash.ext.JrubyEventExtLibrary.RubyEvent; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -33,18 +34,17 @@ /** * Persistent queue collection of events implementation - * */ + */ public final class AckedReadBatch implements QueueBatch { - private AckedBatch ackedBatch; + private final AckedBatch ackedBatch; - private Collection events; + private final Collection events; public static AckedReadBatch create( - final JRubyAckedQueueExt queue, - final int size, - final long timeout) - { + final JRubyAckedQueueExt queue, + final int size, + final long timeout) { try { final AckedBatch batch = queue.readBatch(size, timeout); return (batch == null) ? 
new AckedReadBatch() : new AckedReadBatch(batch); @@ -69,7 +69,7 @@ private AckedReadBatch(AckedBatch batch) { @Override public RubyArray to_a() { - @SuppressWarnings({"unchecked"}) final RubyArray result = RUBY.newArray(events.size()); + @SuppressWarnings({"unchecked"}) final RubyArray result = RUBY.newArray(events.size()); for (final RubyEvent e : events) { if (!MemoryReadBatch.isCancelled(e)) { result.append(e); diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index c223474f2e0..0a7e4a04ccb 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -80,7 +79,7 @@ public final class Queue implements Closeable { private final Method deserializeMethod; // thread safety - private final Lock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); @@ -100,8 +99,9 @@ public Queue(Settings settings) { } this.dirPath = queueDir.toRealPath(); } catch (final IOException ex) { - throw new IllegalStateException(ex); + throw new IllegalStateException(QueueExceptionMessages.CANNOT_CREATE_QUEUE_DIR, ex); } + this.pageCapacity = settings.getCapacity(); this.maxBytes = settings.getQueueMaxBytes(); this.checkpointIO = new FileCheckpointIO(dirPath, settings.getCheckpointRetry()); @@ -120,7 +120,7 @@ public Queue(Settings settings) { cArg[0] = byte[].class; this.deserializeMethod = this.elementClass.getDeclaredMethod("deserialize", cArg); } catch (NoSuchMethodException e) { - throw new QueueRuntimeException("cannot find deserialize method on class " + this.elementClass.getName(), e); + throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_DESERIALIZE.concat(this.elementClass.getName()), e); } } @@ -402,6 +402,12 @@ private void newCheckpointedHeadpage(int pageNum) throws IOException { * @throws IOException if an IO error occurs */ public long write(Queueable element) throws IOException { + // pre-check before incurring serialization overhead; + // we must check again after acquiring the lock. + if (this.closed.get()) { + throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE); + } + byte[] data = element.serialize(); // the write strategy with regard to the isFull() state is to assume there is space for this element @@ -413,12 +419,17 @@ public long write(Queueable element) throws IOException { lock.lock(); try { - if (! this.headPage.hasCapacity(data.length)) { - throw new IOException("data to be written is bigger than page capacity"); + // ensure that the queue is still open now that this thread has acquired the lock. + if (this.closed.get()) { + throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE); + } + + if (!this.headPage.hasCapacity(data.length)) { + throw new QueueRuntimeException(QueueExceptionMessages.BIGGER_DATA_THAN_PAGE_SIZE); } // create a new head page if the current does not have sufficient space left for data to be written - if (! 
this.headPage.hasSpace(data.length)) { + if (!this.headPage.hasSpace(data.length)) { // TODO: verify queue state integrity WRT Queue.open()/recover() at each step of this process @@ -599,10 +610,12 @@ public synchronized Batch nonBlockReadBatch(int limit) throws IOException { * @param limit size limit of the batch to read. returned {@link Batch} can be smaller. * @param timeout the maximum time to wait in milliseconds on write operations * @return the read {@link Batch} or null if no element upon timeout + * @throws QueueRuntimeException if queue is closed * @throws IOException if an IO error occurs */ public synchronized Batch readBatch(int limit, long timeout) throws IOException { lock.lock(); + try { return readPageBatch(nextReadPage(), limit, timeout); } finally { @@ -790,17 +803,22 @@ private void releaseLockAndSwallow() { } /** - * return the {@link Page} for the next read operation. + * Return the {@link Page} for the next read operation. + * Caller MUST have exclusive access to the lock. * @return {@link Page} will be either a read-only tail page or the head page. + * @throws QueueRuntimeException if queue is closed */ - public Page nextReadPage() { - lock.lock(); - try { - // look at head page if no unreadTailPages - return (this.unreadTailPages.isEmpty()) ? this.headPage : this.unreadTailPages.get(0); - } finally { - lock.unlock(); + private Page nextReadPage() { + if (!lock.isHeldByCurrentThread()) { + throw new IllegalStateException(QueueExceptionMessages.CANNOT_READ_PAGE_WITHOUT_LOCK); } + + if (isClosed()) { + throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_READ_FROM_CLOSED_QUEUE); + } + + + return (this.unreadTailPages.isEmpty()) ? this.headPage : this.unreadTailPages.get(0); } private void removeUnreadPage(Page p) { @@ -849,7 +867,7 @@ public long getUnackedCount() { } } - private boolean isClosed() { + public boolean isClosed() { return this.closed.get(); } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueExceptionMessages.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueExceptionMessages.java new file mode 100644 index 00000000000..6b13a2b599c --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueExceptionMessages.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.logstash.ackedqueue; + +/** + * A public class holds number of descriptive messages are used during the interaction with acked queue. 
+ */ +public class QueueExceptionMessages { + + public final static String CANNOT_READ_FROM_CLOSED_QUEUE = "Attempted to read on a closed acked queue."; + + public final static String CANNOT_WRITE_TO_CLOSED_QUEUE = "Tried to write to a closed queue."; + + public final static String BIGGER_DATA_THAN_PAGE_SIZE = "data to be written is bigger than page capacity"; + + public final static String CANNOT_CREATE_QUEUE_DIR = "Error creating queue directories."; + + public final static String CANNOT_DESERIALIZE = "cannot find deserialize method on class "; + + public final static String UNHANDLED_ERROR_WRITING_TO_QUEUE = "Unhandleable error occurred while writing to queue."; + + public final static String CANNOT_READ_PAGE_WITHOUT_LOCK = "Cannot get next read page without first acquiring the lock."; + +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueRuntimeException.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueRuntimeException.java index d40fb3d957c..e1fa4c002a5 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueRuntimeException.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueRuntimeException.java @@ -31,4 +31,7 @@ public QueueRuntimeException(String message, Throwable cause) { super(message, cause); } + public QueueRuntimeException(String message) { + super(message); + } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java index 6d39d95617f..c65cb7c2df3 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java @@ -21,6 +21,8 @@ package org.logstash.ackedqueue.ext; import java.io.IOException; +import java.util.Objects; + import org.jruby.Ruby; import org.jruby.RubyBoolean; import org.jruby.RubyClass; @@ -36,11 +38,12 @@ import org.logstash.ackedqueue.AckedBatch; import org.logstash.ackedqueue.Batch; import org.logstash.ackedqueue.Queue; +import org.logstash.ackedqueue.QueueExceptionMessages; import org.logstash.ackedqueue.SettingsImpl; /** * JRuby extension to wrap a persistent queue istance. 
- * */ + */ @JRubyClass(name = "AckedQueue") public final class JRubyAckedQueueExt extends RubyObject { @@ -68,15 +71,15 @@ public static JRubyAckedQueueExt create(String path, int capacity, int maxEvents private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites, int checkpointMaxAcks, boolean checkpointRetry, long maxBytes) { this.queue = new Queue( - SettingsImpl.fileSettingsBuilder(path) - .capacity(capacity) - .maxUnread(maxEvents) - .queueMaxBytes(maxBytes) - .checkpointMaxAcks(checkpointMaxAcks) - .checkpointMaxWrites(checkpointMaxWrites) - .checkpointRetry(checkpointRetry) - .elementClass(Event.class) - .build() + SettingsImpl.fileSettingsBuilder(path) + .capacity(capacity) + .maxUnread(maxEvents) + .queueMaxBytes(maxBytes) + .checkpointMaxAcks(checkpointMaxAcks) + .checkpointMaxWrites(checkpointMaxWrites) + .checkpointRetry(checkpointRetry) + .elementClass(Event.class) + .build() ); } @@ -132,24 +135,28 @@ public void rubyWrite(ThreadContext context, Event event) { } } - public void write(Event event) throws IOException { - this.queue.write(event); + public void write(Event event) { + try { + this.queue.write(event); + } catch (IOException e) { + throw new IllegalStateException(QueueExceptionMessages.UNHANDLED_ERROR_WRITING_TO_QUEUE, e); + } } @JRubyMethod(name = "read_batch", required = 2) - public IRubyObject ruby_read_batch(ThreadContext context, IRubyObject limit, IRubyObject timeout) { - AckedBatch b; + public IRubyObject rubyReadBatch(ThreadContext context, IRubyObject limit, IRubyObject timeout) { + AckedBatch batch; try { - b = readBatch(RubyFixnum.num2int(limit), RubyFixnum.num2int(timeout)); + batch = readBatch(RubyFixnum.num2int(limit), RubyFixnum.num2int(timeout)); } catch (IOException e) { throw RubyUtil.newRubyIOError(context.runtime, e); } - return RubyUtil.toRubyObject(b); + return RubyUtil.toRubyObject(batch); } public AckedBatch readBatch(int limit, long timeout) throws IOException { - Batch b = queue.readBatch(limit, timeout); - return (b == null) ? null : AckedBatch.create(b); + final Batch batch = queue.readBatch(limit, timeout); + return Objects.isNull(batch) ? 
null : AckedBatch.create(batch); } @JRubyMethod(name = "is_fully_acked?") diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java index bfa077b7031..b132afe54bc 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java @@ -21,7 +21,7 @@ package org.logstash.ackedqueue.ext; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; + import org.jruby.Ruby; import org.jruby.RubyBoolean; import org.jruby.RubyClass; @@ -41,14 +41,13 @@ /** * JRuby extension - * */ + */ @JRubyClass(name = "WrappedAckedQueue") public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt { private static final long serialVersionUID = 1L; private JRubyAckedQueueExt queue; - private final AtomicBoolean isClosed = new AtomicBoolean(); @JRubyMethod(optional = 8) public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException { @@ -57,7 +56,7 @@ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] int maxEvents = RubyFixnum.num2int(args[2]); int checkpointMaxWrites = RubyFixnum.num2int(args[3]); int checkpointMaxAcks = RubyFixnum.num2int(args[4]); - boolean checkpointRetry = !((RubyBoolean)args[6]).isFalse(); + boolean checkpointRetry = !((RubyBoolean) args[6]).isFalse(); long queueMaxBytes = RubyFixnum.num2long(args[7]); this.queue = JRubyAckedQueueExt.create(args[0].asJavaString(), capacity, maxEvents, @@ -78,19 +77,16 @@ public JRubyAckedQueueExt rubyGetQueue() { public void close() throws IOException { queue.close(); - isClosed.set(true); } @JRubyMethod(name = {"push", "<<"}) public void rubyPush(ThreadContext context, IRubyObject event) { - checkIfClosed("write"); queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent()); } @JRubyMethod(name = "read_batch") public IRubyObject rubyReadBatch(ThreadContext context, IRubyObject size, IRubyObject wait) { - checkIfClosed("read a batch"); - return queue.ruby_read_batch(context, size, wait); + return queue.rubyReadBatch(context, size, wait); } @JRubyMethod(name = "is_empty?") @@ -100,7 +96,7 @@ public IRubyObject rubyIsEmpty(ThreadContext context) { @Override protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext context) { - return JrubyAckedWriteClientExt.create(queue, isClosed); + return JrubyAckedWriteClientExt.create(queue); } @Override @@ -117,10 +113,4 @@ protected IRubyObject doClose(final ThreadContext context) { } return context.nil; } - - private void checkIfClosed(String action) { - if (isClosed.get()) { - throw new RuntimeException("Attempted to " + action + " on a closed AckedQueue"); - } - } } diff --git a/logstash-core/src/main/java/org/logstash/ext/JRubyAbstractQueueWriteClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JRubyAbstractQueueWriteClientExt.java index b0f65d1ce59..0ddb159201b 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JRubyAbstractQueueWriteClientExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JRubyAbstractQueueWriteClientExt.java @@ -21,6 +21,7 @@ package org.logstash.ext; import java.util.Collection; + import org.jruby.Ruby; import org.jruby.RubyBasicObject; import org.jruby.RubyClass; @@ -41,7 +42,7 @@ protected JRubyAbstractQueueWriteClientExt(final Ruby runtime, final RubyClass m 
@JRubyMethod(name = {"push", "<<"}, required = 1) public final JRubyAbstractQueueWriteClientExt rubyPush(final ThreadContext context, - final IRubyObject event) throws InterruptedException { + final IRubyObject event) throws InterruptedException { doPush(context, (JrubyEventExtLibrary.RubyEvent) event); return this; } @@ -49,14 +50,14 @@ public final JRubyAbstractQueueWriteClientExt rubyPush(final ThreadContext conte @SuppressWarnings("unchecked") @JRubyMethod(name = "push_batch", required = 1) public final JRubyAbstractQueueWriteClientExt rubyPushBatch(final ThreadContext context, - final IRubyObject batch) throws InterruptedException { + final IRubyObject batch) throws InterruptedException { doPushBatch(context, (Collection) batch); return this; } protected abstract JRubyAbstractQueueWriteClientExt doPush(ThreadContext context, - JrubyEventExtLibrary.RubyEvent event) throws InterruptedException; + JrubyEventExtLibrary.RubyEvent event) throws InterruptedException; protected abstract JRubyAbstractQueueWriteClientExt doPushBatch(ThreadContext context, - Collection batch) throws InterruptedException; + Collection batch) throws InterruptedException; } diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java index 7103fc809f3..e58cc931412 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java @@ -20,10 +20,9 @@ package org.logstash.ext; -import java.io.IOException; import java.util.Collection; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; + import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.anno.JRubyClass; @@ -41,23 +40,8 @@ public final class JrubyAckedWriteClientExt extends JRubyAbstractQueueWriteClien private JRubyAckedQueueExt queue; - private AtomicBoolean closed = new AtomicBoolean(); - - @JRubyMethod(meta = true, required = 2) - public static JrubyAckedWriteClientExt create(final ThreadContext context, final IRubyObject recv, - final IRubyObject queue, final IRubyObject closed) { - return new JrubyAckedWriteClientExt( - context.runtime, RubyUtil.ACKED_WRITE_CLIENT_CLASS, - queue.toJava( - JRubyAckedQueueExt.class - ), - closed.toJava(AtomicBoolean.class) - ); - } - - public static JrubyAckedWriteClientExt create(final JRubyAckedQueueExt queue, final AtomicBoolean closed) { - return new JrubyAckedWriteClientExt( - RubyUtil.RUBY, RubyUtil.ACKED_WRITE_CLIENT_CLASS, queue, closed); + public static JrubyAckedWriteClientExt create(final JRubyAckedQueueExt queue) { + return new JrubyAckedWriteClientExt(RubyUtil.RUBY, RubyUtil.ACKED_WRITE_CLIENT_CLASS, queue); } public JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass) { @@ -65,43 +49,30 @@ public JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass) { } private JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass, - final JRubyAckedQueueExt queue, final AtomicBoolean closed) { + final JRubyAckedQueueExt queue) { super(runtime, metaClass); this.queue = queue; - this.closed = closed; } @Override protected JRubyAbstractQueueWriteClientExt doPush(final ThreadContext context, - final JrubyEventExtLibrary.RubyEvent event) { - ensureOpen(); + final JrubyEventExtLibrary.RubyEvent event) { queue.rubyWrite(context, event.getEvent()); return this; } @Override protected JRubyAbstractQueueWriteClientExt doPushBatch(final ThreadContext 
context, - final Collection batch) { - ensureOpen(); + final Collection batch) { for (final IRubyObject event : batch) { queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent()); } return this; } - private void ensureOpen() { - if (closed.get()) { - throw new IllegalStateException("Tried to write to a closed queue."); - } - } - @Override public void push(Map event) { - try { - queue.write(new Event(event)); - } catch (IOException e) { - throw new IllegalStateException(e); - } + queue.write(new Event(event)); } } diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java index 8c02dbde75f..0d920790947 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.BlockingQueue; + import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.anno.JRubyClass; @@ -43,28 +44,27 @@ public JrubyMemoryWriteClientExt(final Ruby runtime, final RubyClass metaClass) } private JrubyMemoryWriteClientExt(final Ruby runtime, final RubyClass metaClass, - final BlockingQueue queue) { + final BlockingQueue queue) { super(runtime, metaClass); this.queue = queue; } public static JrubyMemoryWriteClientExt create( - final BlockingQueue queue) { + final BlockingQueue queue) { return new JrubyMemoryWriteClientExt(RubyUtil.RUBY, - RubyUtil.MEMORY_WRITE_CLIENT_CLASS, queue); + RubyUtil.MEMORY_WRITE_CLIENT_CLASS, queue); } @Override protected JRubyAbstractQueueWriteClientExt doPush(final ThreadContext context, - final JrubyEventExtLibrary.RubyEvent event) - throws InterruptedException { + final JrubyEventExtLibrary.RubyEvent event) throws InterruptedException { queue.put(event); return this; } @Override public JRubyAbstractQueueWriteClientExt doPushBatch(final ThreadContext context, - final Collection batch) throws InterruptedException { + final Collection batch) throws InterruptedException { LsQueueUtils.addAll(queue, batch); return this; } diff --git a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java index 088ac67f37b..c7bc587b182 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java +++ b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java @@ -33,7 +33,7 @@ /** * This class is the communication bus for the `pipeline` inputs and outputs to talk to each other. - * + *

* This class is threadsafe. */ public class PipelineBus { @@ -79,13 +79,12 @@ public void sendEvents(final PipelineOutput sender, partialProcessing = ensureDelivery && !sendWasSuccess; if (partialProcessing) { if (lastResponse != null && lastResponse.getStatus() == PipelineInput.ReceiveStatus.FAIL) { - // when last call to internalReceive generated a fail, restart from the - // fail position to avoid reprocessing of some events in the downstream. - lastFailedPosition = lastResponse.getSequencePosition(); - - logger.warn("Attempted to send event to '{}' but that address reached error condition. " + - "Will Retry. Root cause {}", address, lastResponse.getCauseMessage()); - + // when last call to internalReceive generated a fail for the subset of the orderedEvents + // it is handling, restart from the cumulative last-failed position of the batch so that + // the next attempt will operate on a subset that excludes those successfully received. + lastFailedPosition += lastResponse.getSequencePosition(); + logger.warn("Attempted to send events to '{}' but that address reached error condition with {} events remaining. " + + "Will Retry. Root cause {}", address, orderedEvents.length - lastFailedPosition, lastResponse.getCauseMessage()); } else { logger.warn("Attempted to send event to '{}' but that address was unavailable. " + "Maybe the destination pipeline is down or stopping? Will Retry.", address); @@ -98,7 +97,7 @@ public void sendEvents(final PipelineOutput sender, logger.error("Sleep unexpectedly interrupted in bus retry loop", e); } } - } while(partialProcessing); + } while (partialProcessing); }); } } @@ -216,7 +215,7 @@ public void unlisten(final PipelineInput input, final String address) throws Int * Stop listening on the given address with the given listener. Blocks until upstream outputs have * stopped. 
* - * @param input Input that should stop listening + * @param input Input that should stop listening * @param address Address on which to stop listening * @throws InterruptedException if interrupted while attempting to stop listening */ @@ -275,6 +274,4 @@ boolean isBlockOnUnlisten() { public void setBlockOnUnlisten(boolean blockOnUnlisten) { this.blockOnUnlisten = blockOnUnlisten; } - - } diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index e182cf160bf..25bca9fc7ae 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -40,8 +40,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import org.hamcrest.CoreMatchers; -import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -50,6 +48,7 @@ import org.junit.rules.TemporaryFolder; import org.logstash.ackedqueue.io.MmapPageIOV2; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; @@ -59,6 +58,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import static org.logstash.ackedqueue.QueueTestHelpers.computeCapacityForMmapPageIO; +import static org.logstash.util.ExceptionMatcher.assertThrows; public class QueueTest { @@ -1098,7 +1098,7 @@ public void lockIsReleasedUponOpenException() throws Exception { queue.open(); fail("expected queue.open() to throws when not enough disk free"); } catch (IOException e) { - assertThat(e.getMessage(), CoreMatchers.containsString("Unable to allocate")); + assertThat(e.getMessage(), containsString("Unable to allocate")); } // at this point the Queue lock should be released and Queue.open should not throw a LockException @@ -1145,4 +1145,20 @@ public void firstUnackedPagePointToFullyAckedPurgedPage() throws Exception { assertFalse("Dangling page's checkpoint file should be removed", cp0.exists()); } } + + @Test + public void writeToClosedQueueException() throws Exception { + Settings settings = TestSettings.persistedQueueSettings(100, dataPath); + Queue queue = new Queue(settings); + + queue.open(); + queue.write(new StringElement("First test string to be written in queue.")); + queue.write(new StringElement("Second test string to be written in queue.")); + queue.close(); + + final QueueRuntimeException qre = assertThrows(QueueRuntimeException.class, () -> { + queue.write(new StringElement("Third test string to be REJECTED to write in queue.")); + }); + assertThat(qre.getMessage(), containsString("Tried to write to a closed queue.")); + } } diff --git a/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java b/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java index 9446bf4d109..76a71adc691 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java @@ -136,7 +136,7 @@ public void listenUnlistenUpdatesOutputReceivers() throws InterruptedException { @Test public void sendingEmptyListToNowhereStillReturns() { - bus.registerSender(output, Arrays.asList("not_an_address")); + bus.registerSender(output, List.of("not_an_address")); bus.sendEvents(output, Collections.emptyList(), 
true); } diff --git a/logstash-core/src/test/java/org/logstash/util/ExceptionMatcher.java b/logstash-core/src/test/java/org/logstash/util/ExceptionMatcher.java new file mode 100644 index 00000000000..322d2fc206e --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/util/ExceptionMatcher.java @@ -0,0 +1,20 @@ +package org.logstash.util; + +import org.hamcrest.Matchers; +import org.junit.Assert; + +@FunctionalInterface +public interface ExceptionMatcher { + void execute() throws Throwable; + + static T assertThrows(Class expectedType, ExceptionMatcher executable) { + try { + executable.execute(); + } catch (Throwable actual) { + Assert.assertThat(actual, Matchers.instanceOf(expectedType)); + return expectedType.cast(actual); + } + + throw new AssertionError(String.format("Expected %s to be thrown, but nothing was thrown.", expectedType.getName())); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/util/SetOnceReferenceTest.java b/logstash-core/src/test/java/org/logstash/util/SetOnceReferenceTest.java index d19f45141bb..49b25ec3327 100644 --- a/logstash-core/src/test/java/org/logstash/util/SetOnceReferenceTest.java +++ b/logstash-core/src/test/java/org/logstash/util/SetOnceReferenceTest.java @@ -1,6 +1,5 @@ package org.logstash.util; -import org.hamcrest.Matchers; import org.junit.Test; import java.util.NoSuchElementException; @@ -14,7 +13,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; +import static org.logstash.util.ExceptionMatcher.assertThrows; public class SetOnceReferenceTest { @Test @@ -252,17 +251,6 @@ void checkImmutableIfSetOrElseSupply(final SetOnceReference immutable, checkExpectedValue(immutable, expectedValue); } - @SuppressWarnings("SameParameterValue") - void assertThrows(final Class expectedThrowable, final Runnable runnable) { - try { - runnable.run(); - } catch (Exception e) { - assertThat("wrong exception thrown", e, Matchers.instanceOf(expectedThrowable)); - return; - } - fail(String.format("expected exception %s but nothing was thrown", expectedThrowable.getSimpleName())); - } - private static class MutableReference { T value;
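
Editor's note: with this patch, `Queue#write` fails fast with an unchecked `QueueRuntimeException` — once before serialization and again after the lock is acquired — instead of writing to a closed queue. A minimal sketch of how a caller might handle that is below; the `ClosedQueueAwareWriter` class and `tryWrite` method are illustrative assumptions, not part of the patch.

    import java.io.IOException;

    import org.logstash.ackedqueue.Queue;
    import org.logstash.ackedqueue.QueueRuntimeException;
    import org.logstash.ackedqueue.Queueable;

    // Sketch only (names are hypothetical): one way a caller could react to the
    // unchecked QueueRuntimeException that Queue#write now throws once the queue
    // has been closed.
    final class ClosedQueueAwareWriter {

        private final Queue queue;

        ClosedQueueAwareWriter(final Queue queue) {
            this.queue = queue;
        }

        // Returns true if the element was enqueued, false if the queue rejected the
        // write; genuine IO failures still surface as checked IOExceptions.
        boolean tryWrite(final Queueable element) throws IOException {
            try {
                queue.write(element);
                return true;
            } catch (final QueueRuntimeException e) {
                // Thrown when the queue is closed (and, per this patch, also when the
                // element is bigger than the page capacity). Closed-queue writes are not
                // retried here; in the pipeline-to-pipeline input this kind of failure is
                // reported back so the upstream output can retry the remaining events.
                return false;
            }
        }
    }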
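
The `nextReadPage()` hardening described in the commit message relies on `ReentrantLock#isHeldByCurrentThread` to turn an unlocked call into an immediate `IllegalStateException` rather than leaking a shared `Page`. A self-contained sketch of that guard pattern, with hypothetical names standing in for the real `Queue` internals:

    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative sketch (not Logstash code) of the guard applied by the now-private
    // Queue#nextReadPage(): internal state is only handed out while the calling thread
    // owns the lock, and a missing lock is treated as a programming error.
    final class GuardedPageSource {

        private final ReentrantLock lock = new ReentrantLock();
        private String nextPage = "head-page";   // stand-in for the real Page object

        String readNext() {
            lock.lock();
            try {
                return nextPageUnderLock();       // safe: this thread holds the lock here
            } finally {
                lock.unlock();
            }
        }

        private String nextPageUnderLock() {
            if (!lock.isHeldByCurrentThread()) {
                throw new IllegalStateException("next page may only be read while holding the lock");
            }
            return nextPage;
        }
    }

Throwing `IllegalStateException` (rather than trying to recover) matches the reasoning in the commit message: an unlocked caller cannot use the page safely, so the only useful outcome is to fail loudly before other readers are corrupted.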