diff --git a/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb b/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb index 595f4536b6d..19a28814031 100644 --- a/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb +++ b/logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb @@ -68,8 +68,7 @@ def internalReceive(events) stream_position = stream_position + 1 end) ReceiveResponse.completed() - rescue java.lang.InterruptedException, IOError => e - # maybe an IOException in enqueueing + rescue java.lang.InterruptedException, org.logstash.ackedqueue.QueueRuntimeException, IOError => e logger.debug? && logger.debug('queueing event failed', message: e.message, exception: e.class, backtrace: e.backtrace) ReceiveResponse.failed_at(stream_position, e) end diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java b/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java index fab490f2049..2acdd391b59 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/AckedReadBatch.java @@ -25,6 +25,7 @@ import org.logstash.execution.MemoryReadBatch; import org.logstash.execution.QueueBatch; import org.logstash.ext.JrubyEventExtLibrary.RubyEvent; + import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -33,18 +34,17 @@ /** * Persistent queue collection of events implementation - * */ + */ public final class AckedReadBatch implements QueueBatch { - private AckedBatch ackedBatch; + private final AckedBatch ackedBatch; - private Collection events; + private final Collection events; public static AckedReadBatch create( - final JRubyAckedQueueExt queue, - final int size, - final long timeout) - { + final JRubyAckedQueueExt queue, + final int size, + final long timeout) { try { final AckedBatch batch = queue.readBatch(size, timeout); return (batch == null) ? new AckedReadBatch() : new AckedReadBatch(batch); @@ -69,7 +69,7 @@ private AckedReadBatch(AckedBatch batch) { @Override public RubyArray to_a() { - @SuppressWarnings({"unchecked"}) final RubyArray result = RUBY.newArray(events.size()); + @SuppressWarnings({"unchecked"}) final RubyArray result = RUBY.newArray(events.size()); for (final RubyEvent e : events) { if (!MemoryReadBatch.isCancelled(e)) { result.append(e); diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java index c223474f2e0..0a7e4a04ccb 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java @@ -34,7 +34,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -80,7 +79,7 @@ public final class Queue implements Closeable { private final Method deserializeMethod; // thread safety - private final Lock lock = new ReentrantLock(); + private final ReentrantLock lock = new ReentrantLock(); private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); @@ -100,8 +99,9 @@ public Queue(Settings settings) { } this.dirPath = queueDir.toRealPath(); } catch (final IOException ex) { - throw new IllegalStateException(ex); + throw new IllegalStateException(QueueExceptionMessages.CANNOT_CREATE_QUEUE_DIR, ex); } + this.pageCapacity = settings.getCapacity(); this.maxBytes = settings.getQueueMaxBytes(); this.checkpointIO = new FileCheckpointIO(dirPath, settings.getCheckpointRetry()); @@ -120,7 +120,7 @@ public Queue(Settings settings) { cArg[0] = byte[].class; this.deserializeMethod = this.elementClass.getDeclaredMethod("deserialize", cArg); } catch (NoSuchMethodException e) { - throw new QueueRuntimeException("cannot find deserialize method on class " + this.elementClass.getName(), e); + throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_DESERIALIZE.concat(this.elementClass.getName()), e); } } @@ -402,6 +402,12 @@ private void newCheckpointedHeadpage(int pageNum) throws IOException { * @throws IOException if an IO error occurs */ public long write(Queueable element) throws IOException { + // pre-check before incurring serialization overhead; + // we must check again after acquiring the lock. + if (this.closed.get()) { + throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE); + } + byte[] data = element.serialize(); // the write strategy with regard to the isFull() state is to assume there is space for this element @@ -413,12 +419,17 @@ public long write(Queueable element) throws IOException { lock.lock(); try { - if (! this.headPage.hasCapacity(data.length)) { - throw new IOException("data to be written is bigger than page capacity"); + // ensure that the queue is still open now that this thread has acquired the lock. + if (this.closed.get()) { + throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE); + } + + if (!this.headPage.hasCapacity(data.length)) { + throw new QueueRuntimeException(QueueExceptionMessages.BIGGER_DATA_THAN_PAGE_SIZE); } // create a new head page if the current does not have sufficient space left for data to be written - if (! this.headPage.hasSpace(data.length)) { + if (!this.headPage.hasSpace(data.length)) { // TODO: verify queue state integrity WRT Queue.open()/recover() at each step of this process @@ -599,10 +610,12 @@ public synchronized Batch nonBlockReadBatch(int limit) throws IOException { * @param limit size limit of the batch to read. returned {@link Batch} can be smaller. * @param timeout the maximum time to wait in milliseconds on write operations * @return the read {@link Batch} or null if no element upon timeout + * @throws QueueRuntimeException if queue is closed * @throws IOException if an IO error occurs */ public synchronized Batch readBatch(int limit, long timeout) throws IOException { lock.lock(); + try { return readPageBatch(nextReadPage(), limit, timeout); } finally { @@ -790,17 +803,22 @@ private void releaseLockAndSwallow() { } /** - * return the {@link Page} for the next read operation. + * Return the {@link Page} for the next read operation. + * Caller MUST have exclusive access to the lock. * @return {@link Page} will be either a read-only tail page or the head page. + * @throws QueueRuntimeException if queue is closed */ - public Page nextReadPage() { - lock.lock(); - try { - // look at head page if no unreadTailPages - return (this.unreadTailPages.isEmpty()) ? this.headPage : this.unreadTailPages.get(0); - } finally { - lock.unlock(); + private Page nextReadPage() { + if (!lock.isHeldByCurrentThread()) { + throw new IllegalStateException(QueueExceptionMessages.CANNOT_READ_PAGE_WITHOUT_LOCK); } + + if (isClosed()) { + throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_READ_FROM_CLOSED_QUEUE); + } + + + return (this.unreadTailPages.isEmpty()) ? this.headPage : this.unreadTailPages.get(0); } private void removeUnreadPage(Page p) { @@ -849,7 +867,7 @@ public long getUnackedCount() { } } - private boolean isClosed() { + public boolean isClosed() { return this.closed.get(); } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueExceptionMessages.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueExceptionMessages.java new file mode 100644 index 00000000000..6b13a2b599c --- /dev/null +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueExceptionMessages.java @@ -0,0 +1,42 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +package org.logstash.ackedqueue; + +/** + * A public class holds number of descriptive messages are used during the interaction with acked queue. + */ +public class QueueExceptionMessages { + + public final static String CANNOT_READ_FROM_CLOSED_QUEUE = "Attempted to read on a closed acked queue."; + + public final static String CANNOT_WRITE_TO_CLOSED_QUEUE = "Tried to write to a closed queue."; + + public final static String BIGGER_DATA_THAN_PAGE_SIZE = "data to be written is bigger than page capacity"; + + public final static String CANNOT_CREATE_QUEUE_DIR = "Error creating queue directories."; + + public final static String CANNOT_DESERIALIZE = "cannot find deserialize method on class "; + + public final static String UNHANDLED_ERROR_WRITING_TO_QUEUE = "Unhandleable error occurred while writing to queue."; + + public final static String CANNOT_READ_PAGE_WITHOUT_LOCK = "Cannot get next read page without first acquiring the lock."; + +} diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueRuntimeException.java b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueRuntimeException.java index d40fb3d957c..e1fa4c002a5 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/QueueRuntimeException.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/QueueRuntimeException.java @@ -31,4 +31,7 @@ public QueueRuntimeException(String message, Throwable cause) { super(message, cause); } + public QueueRuntimeException(String message) { + super(message); + } } diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java index 6d39d95617f..c65cb7c2df3 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyAckedQueueExt.java @@ -21,6 +21,8 @@ package org.logstash.ackedqueue.ext; import java.io.IOException; +import java.util.Objects; + import org.jruby.Ruby; import org.jruby.RubyBoolean; import org.jruby.RubyClass; @@ -36,11 +38,12 @@ import org.logstash.ackedqueue.AckedBatch; import org.logstash.ackedqueue.Batch; import org.logstash.ackedqueue.Queue; +import org.logstash.ackedqueue.QueueExceptionMessages; import org.logstash.ackedqueue.SettingsImpl; /** * JRuby extension to wrap a persistent queue istance. - * */ + */ @JRubyClass(name = "AckedQueue") public final class JRubyAckedQueueExt extends RubyObject { @@ -68,15 +71,15 @@ public static JRubyAckedQueueExt create(String path, int capacity, int maxEvents private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites, int checkpointMaxAcks, boolean checkpointRetry, long maxBytes) { this.queue = new Queue( - SettingsImpl.fileSettingsBuilder(path) - .capacity(capacity) - .maxUnread(maxEvents) - .queueMaxBytes(maxBytes) - .checkpointMaxAcks(checkpointMaxAcks) - .checkpointMaxWrites(checkpointMaxWrites) - .checkpointRetry(checkpointRetry) - .elementClass(Event.class) - .build() + SettingsImpl.fileSettingsBuilder(path) + .capacity(capacity) + .maxUnread(maxEvents) + .queueMaxBytes(maxBytes) + .checkpointMaxAcks(checkpointMaxAcks) + .checkpointMaxWrites(checkpointMaxWrites) + .checkpointRetry(checkpointRetry) + .elementClass(Event.class) + .build() ); } @@ -132,24 +135,28 @@ public void rubyWrite(ThreadContext context, Event event) { } } - public void write(Event event) throws IOException { - this.queue.write(event); + public void write(Event event) { + try { + this.queue.write(event); + } catch (IOException e) { + throw new IllegalStateException(QueueExceptionMessages.UNHANDLED_ERROR_WRITING_TO_QUEUE, e); + } } @JRubyMethod(name = "read_batch", required = 2) - public IRubyObject ruby_read_batch(ThreadContext context, IRubyObject limit, IRubyObject timeout) { - AckedBatch b; + public IRubyObject rubyReadBatch(ThreadContext context, IRubyObject limit, IRubyObject timeout) { + AckedBatch batch; try { - b = readBatch(RubyFixnum.num2int(limit), RubyFixnum.num2int(timeout)); + batch = readBatch(RubyFixnum.num2int(limit), RubyFixnum.num2int(timeout)); } catch (IOException e) { throw RubyUtil.newRubyIOError(context.runtime, e); } - return RubyUtil.toRubyObject(b); + return RubyUtil.toRubyObject(batch); } public AckedBatch readBatch(int limit, long timeout) throws IOException { - Batch b = queue.readBatch(limit, timeout); - return (b == null) ? null : AckedBatch.create(b); + final Batch batch = queue.readBatch(limit, timeout); + return Objects.isNull(batch) ? null : AckedBatch.create(batch); } @JRubyMethod(name = "is_fully_acked?") diff --git a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java index bfa077b7031..b132afe54bc 100644 --- a/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java +++ b/logstash-core/src/main/java/org/logstash/ackedqueue/ext/JRubyWrappedAckedQueueExt.java @@ -21,7 +21,7 @@ package org.logstash.ackedqueue.ext; import java.io.IOException; -import java.util.concurrent.atomic.AtomicBoolean; + import org.jruby.Ruby; import org.jruby.RubyBoolean; import org.jruby.RubyClass; @@ -41,14 +41,13 @@ /** * JRuby extension - * */ + */ @JRubyClass(name = "WrappedAckedQueue") public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt { private static final long serialVersionUID = 1L; private JRubyAckedQueueExt queue; - private final AtomicBoolean isClosed = new AtomicBoolean(); @JRubyMethod(optional = 8) public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException { @@ -57,7 +56,7 @@ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] int maxEvents = RubyFixnum.num2int(args[2]); int checkpointMaxWrites = RubyFixnum.num2int(args[3]); int checkpointMaxAcks = RubyFixnum.num2int(args[4]); - boolean checkpointRetry = !((RubyBoolean)args[6]).isFalse(); + boolean checkpointRetry = !((RubyBoolean) args[6]).isFalse(); long queueMaxBytes = RubyFixnum.num2long(args[7]); this.queue = JRubyAckedQueueExt.create(args[0].asJavaString(), capacity, maxEvents, @@ -78,19 +77,16 @@ public JRubyAckedQueueExt rubyGetQueue() { public void close() throws IOException { queue.close(); - isClosed.set(true); } @JRubyMethod(name = {"push", "<<"}) public void rubyPush(ThreadContext context, IRubyObject event) { - checkIfClosed("write"); queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent()); } @JRubyMethod(name = "read_batch") public IRubyObject rubyReadBatch(ThreadContext context, IRubyObject size, IRubyObject wait) { - checkIfClosed("read a batch"); - return queue.ruby_read_batch(context, size, wait); + return queue.rubyReadBatch(context, size, wait); } @JRubyMethod(name = "is_empty?") @@ -100,7 +96,7 @@ public IRubyObject rubyIsEmpty(ThreadContext context) { @Override protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext context) { - return JrubyAckedWriteClientExt.create(queue, isClosed); + return JrubyAckedWriteClientExt.create(queue); } @Override @@ -117,10 +113,4 @@ protected IRubyObject doClose(final ThreadContext context) { } return context.nil; } - - private void checkIfClosed(String action) { - if (isClosed.get()) { - throw new RuntimeException("Attempted to " + action + " on a closed AckedQueue"); - } - } } diff --git a/logstash-core/src/main/java/org/logstash/ext/JRubyAbstractQueueWriteClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JRubyAbstractQueueWriteClientExt.java index b0f65d1ce59..0ddb159201b 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JRubyAbstractQueueWriteClientExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JRubyAbstractQueueWriteClientExt.java @@ -21,6 +21,7 @@ package org.logstash.ext; import java.util.Collection; + import org.jruby.Ruby; import org.jruby.RubyBasicObject; import org.jruby.RubyClass; @@ -41,7 +42,7 @@ protected JRubyAbstractQueueWriteClientExt(final Ruby runtime, final RubyClass m @JRubyMethod(name = {"push", "<<"}, required = 1) public final JRubyAbstractQueueWriteClientExt rubyPush(final ThreadContext context, - final IRubyObject event) throws InterruptedException { + final IRubyObject event) throws InterruptedException { doPush(context, (JrubyEventExtLibrary.RubyEvent) event); return this; } @@ -49,14 +50,14 @@ public final JRubyAbstractQueueWriteClientExt rubyPush(final ThreadContext conte @SuppressWarnings("unchecked") @JRubyMethod(name = "push_batch", required = 1) public final JRubyAbstractQueueWriteClientExt rubyPushBatch(final ThreadContext context, - final IRubyObject batch) throws InterruptedException { + final IRubyObject batch) throws InterruptedException { doPushBatch(context, (Collection) batch); return this; } protected abstract JRubyAbstractQueueWriteClientExt doPush(ThreadContext context, - JrubyEventExtLibrary.RubyEvent event) throws InterruptedException; + JrubyEventExtLibrary.RubyEvent event) throws InterruptedException; protected abstract JRubyAbstractQueueWriteClientExt doPushBatch(ThreadContext context, - Collection batch) throws InterruptedException; + Collection batch) throws InterruptedException; } diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java index 7103fc809f3..e58cc931412 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyAckedWriteClientExt.java @@ -20,10 +20,9 @@ package org.logstash.ext; -import java.io.IOException; import java.util.Collection; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; + import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.anno.JRubyClass; @@ -41,23 +40,8 @@ public final class JrubyAckedWriteClientExt extends JRubyAbstractQueueWriteClien private JRubyAckedQueueExt queue; - private AtomicBoolean closed = new AtomicBoolean(); - - @JRubyMethod(meta = true, required = 2) - public static JrubyAckedWriteClientExt create(final ThreadContext context, final IRubyObject recv, - final IRubyObject queue, final IRubyObject closed) { - return new JrubyAckedWriteClientExt( - context.runtime, RubyUtil.ACKED_WRITE_CLIENT_CLASS, - queue.toJava( - JRubyAckedQueueExt.class - ), - closed.toJava(AtomicBoolean.class) - ); - } - - public static JrubyAckedWriteClientExt create(final JRubyAckedQueueExt queue, final AtomicBoolean closed) { - return new JrubyAckedWriteClientExt( - RubyUtil.RUBY, RubyUtil.ACKED_WRITE_CLIENT_CLASS, queue, closed); + public static JrubyAckedWriteClientExt create(final JRubyAckedQueueExt queue) { + return new JrubyAckedWriteClientExt(RubyUtil.RUBY, RubyUtil.ACKED_WRITE_CLIENT_CLASS, queue); } public JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass) { @@ -65,43 +49,30 @@ public JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass) { } private JrubyAckedWriteClientExt(final Ruby runtime, final RubyClass metaClass, - final JRubyAckedQueueExt queue, final AtomicBoolean closed) { + final JRubyAckedQueueExt queue) { super(runtime, metaClass); this.queue = queue; - this.closed = closed; } @Override protected JRubyAbstractQueueWriteClientExt doPush(final ThreadContext context, - final JrubyEventExtLibrary.RubyEvent event) { - ensureOpen(); + final JrubyEventExtLibrary.RubyEvent event) { queue.rubyWrite(context, event.getEvent()); return this; } @Override protected JRubyAbstractQueueWriteClientExt doPushBatch(final ThreadContext context, - final Collection batch) { - ensureOpen(); + final Collection batch) { for (final IRubyObject event : batch) { queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent()); } return this; } - private void ensureOpen() { - if (closed.get()) { - throw new IllegalStateException("Tried to write to a closed queue."); - } - } - @Override public void push(Map event) { - try { - queue.write(new Event(event)); - } catch (IOException e) { - throw new IllegalStateException(e); - } + queue.write(new Event(event)); } } diff --git a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java index 8c02dbde75f..0d920790947 100644 --- a/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java +++ b/logstash-core/src/main/java/org/logstash/ext/JrubyMemoryWriteClientExt.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Map; import java.util.concurrent.BlockingQueue; + import org.jruby.Ruby; import org.jruby.RubyClass; import org.jruby.anno.JRubyClass; @@ -43,28 +44,27 @@ public JrubyMemoryWriteClientExt(final Ruby runtime, final RubyClass metaClass) } private JrubyMemoryWriteClientExt(final Ruby runtime, final RubyClass metaClass, - final BlockingQueue queue) { + final BlockingQueue queue) { super(runtime, metaClass); this.queue = queue; } public static JrubyMemoryWriteClientExt create( - final BlockingQueue queue) { + final BlockingQueue queue) { return new JrubyMemoryWriteClientExt(RubyUtil.RUBY, - RubyUtil.MEMORY_WRITE_CLIENT_CLASS, queue); + RubyUtil.MEMORY_WRITE_CLIENT_CLASS, queue); } @Override protected JRubyAbstractQueueWriteClientExt doPush(final ThreadContext context, - final JrubyEventExtLibrary.RubyEvent event) - throws InterruptedException { + final JrubyEventExtLibrary.RubyEvent event) throws InterruptedException { queue.put(event); return this; } @Override public JRubyAbstractQueueWriteClientExt doPushBatch(final ThreadContext context, - final Collection batch) throws InterruptedException { + final Collection batch) throws InterruptedException { LsQueueUtils.addAll(queue, batch); return this; } diff --git a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java index 088ac67f37b..c7bc587b182 100644 --- a/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java +++ b/logstash-core/src/main/java/org/logstash/plugins/pipeline/PipelineBus.java @@ -33,7 +33,7 @@ /** * This class is the communication bus for the `pipeline` inputs and outputs to talk to each other. - * + *

* This class is threadsafe. */ public class PipelineBus { @@ -79,13 +79,12 @@ public void sendEvents(final PipelineOutput sender, partialProcessing = ensureDelivery && !sendWasSuccess; if (partialProcessing) { if (lastResponse != null && lastResponse.getStatus() == PipelineInput.ReceiveStatus.FAIL) { - // when last call to internalReceive generated a fail, restart from the - // fail position to avoid reprocessing of some events in the downstream. - lastFailedPosition = lastResponse.getSequencePosition(); - - logger.warn("Attempted to send event to '{}' but that address reached error condition. " + - "Will Retry. Root cause {}", address, lastResponse.getCauseMessage()); - + // when last call to internalReceive generated a fail for the subset of the orderedEvents + // it is handling, restart from the cumulative last-failed position of the batch so that + // the next attempt will operate on a subset that excludes those successfully received. + lastFailedPosition += lastResponse.getSequencePosition(); + logger.warn("Attempted to send events to '{}' but that address reached error condition with {} events remaining. " + + "Will Retry. Root cause {}", address, orderedEvents.length - lastFailedPosition, lastResponse.getCauseMessage()); } else { logger.warn("Attempted to send event to '{}' but that address was unavailable. " + "Maybe the destination pipeline is down or stopping? Will Retry.", address); @@ -98,7 +97,7 @@ public void sendEvents(final PipelineOutput sender, logger.error("Sleep unexpectedly interrupted in bus retry loop", e); } } - } while(partialProcessing); + } while (partialProcessing); }); } } @@ -216,7 +215,7 @@ public void unlisten(final PipelineInput input, final String address) throws Int * Stop listening on the given address with the given listener. Blocks until upstream outputs have * stopped. * - * @param input Input that should stop listening + * @param input Input that should stop listening * @param address Address on which to stop listening * @throws InterruptedException if interrupted while attempting to stop listening */ @@ -275,6 +274,4 @@ boolean isBlockOnUnlisten() { public void setBlockOnUnlisten(boolean blockOnUnlisten) { this.blockOnUnlisten = blockOnUnlisten; } - - } diff --git a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java index e182cf160bf..25bca9fc7ae 100644 --- a/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java +++ b/logstash-core/src/test/java/org/logstash/ackedqueue/QueueTest.java @@ -40,8 +40,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; -import org.hamcrest.CoreMatchers; -import org.hamcrest.Matchers; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -50,6 +48,7 @@ import org.junit.rules.TemporaryFolder; import org.logstash.ackedqueue.io.MmapPageIOV2; +import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.notNullValue; @@ -59,6 +58,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import static org.logstash.ackedqueue.QueueTestHelpers.computeCapacityForMmapPageIO; +import static org.logstash.util.ExceptionMatcher.assertThrows; public class QueueTest { @@ -1098,7 +1098,7 @@ public void lockIsReleasedUponOpenException() throws Exception { queue.open(); fail("expected queue.open() to throws when not enough disk free"); } catch (IOException e) { - assertThat(e.getMessage(), CoreMatchers.containsString("Unable to allocate")); + assertThat(e.getMessage(), containsString("Unable to allocate")); } // at this point the Queue lock should be released and Queue.open should not throw a LockException @@ -1145,4 +1145,20 @@ public void firstUnackedPagePointToFullyAckedPurgedPage() throws Exception { assertFalse("Dangling page's checkpoint file should be removed", cp0.exists()); } } + + @Test + public void writeToClosedQueueException() throws Exception { + Settings settings = TestSettings.persistedQueueSettings(100, dataPath); + Queue queue = new Queue(settings); + + queue.open(); + queue.write(new StringElement("First test string to be written in queue.")); + queue.write(new StringElement("Second test string to be written in queue.")); + queue.close(); + + final QueueRuntimeException qre = assertThrows(QueueRuntimeException.class, () -> { + queue.write(new StringElement("Third test string to be REJECTED to write in queue.")); + }); + assertThat(qre.getMessage(), containsString("Tried to write to a closed queue.")); + } } diff --git a/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java b/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java index 9446bf4d109..76a71adc691 100644 --- a/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java +++ b/logstash-core/src/test/java/org/logstash/plugins/pipeline/PipelineBusTest.java @@ -136,7 +136,7 @@ public void listenUnlistenUpdatesOutputReceivers() throws InterruptedException { @Test public void sendingEmptyListToNowhereStillReturns() { - bus.registerSender(output, Arrays.asList("not_an_address")); + bus.registerSender(output, List.of("not_an_address")); bus.sendEvents(output, Collections.emptyList(), true); } diff --git a/logstash-core/src/test/java/org/logstash/util/ExceptionMatcher.java b/logstash-core/src/test/java/org/logstash/util/ExceptionMatcher.java new file mode 100644 index 00000000000..322d2fc206e --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/util/ExceptionMatcher.java @@ -0,0 +1,20 @@ +package org.logstash.util; + +import org.hamcrest.Matchers; +import org.junit.Assert; + +@FunctionalInterface +public interface ExceptionMatcher { + void execute() throws Throwable; + + static T assertThrows(Class expectedType, ExceptionMatcher executable) { + try { + executable.execute(); + } catch (Throwable actual) { + Assert.assertThat(actual, Matchers.instanceOf(expectedType)); + return expectedType.cast(actual); + } + + throw new AssertionError(String.format("Expected %s to be thrown, but nothing was thrown.", expectedType.getName())); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/util/SetOnceReferenceTest.java b/logstash-core/src/test/java/org/logstash/util/SetOnceReferenceTest.java index d19f45141bb..49b25ec3327 100644 --- a/logstash-core/src/test/java/org/logstash/util/SetOnceReferenceTest.java +++ b/logstash-core/src/test/java/org/logstash/util/SetOnceReferenceTest.java @@ -1,6 +1,5 @@ package org.logstash.util; -import org.hamcrest.Matchers; import org.junit.Test; import java.util.NoSuchElementException; @@ -14,7 +13,7 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; +import static org.logstash.util.ExceptionMatcher.assertThrows; public class SetOnceReferenceTest { @Test @@ -252,17 +251,6 @@ void checkImmutableIfSetOrElseSupply(final SetOnceReference immutable, checkExpectedValue(immutable, expectedValue); } - @SuppressWarnings("SameParameterValue") - void assertThrows(final Class expectedThrowable, final Runnable runnable) { - try { - runnable.run(); - } catch (Exception e) { - assertThat("wrong exception thrown", e, Matchers.instanceOf(expectedThrowable)); - return; - } - fail(String.format("expected exception %s but nothing was thrown", expectedThrowable.getSimpleName())); - } - private static class MutableReference { T value;