Pipeline to pipeline communication acked queue improvements. (#15173) (#15243)

* Pipeline to pipeline communication acked queue improvements.

* Handle InterruptedException in the pipeline input, improve the warning message when in-flight events are partially sent, and make other minor improvements such as more descriptive logs.

* Apply suggestions from code review

Check if queue is open after thread acquires the lock.

Co-authored-by: Ry Biesemeyer <[email protected]>
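
As a minimal sketch of that check-after-lock pattern (illustrative only, with assumed names; the real `Queue#write` throws `QueueRuntimeException` and uses the `QueueExceptionMessages` constants added further down):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

final class ClosableQueueSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    long write(byte[] data) {
        // cheap pre-check before incurring any expensive work (e.g. serialization)
        if (closed.get()) {
            throw new IllegalStateException("Tried to write to a closed queue.");
        }
        lock.lock();
        try {
            // re-check now that this thread holds the lock: another thread may
            // have closed the queue between the pre-check and lock acquisition
            if (closed.get()) {
                throw new IllegalStateException("Tried to write to a closed queue.");
            }
            // ... append `data` to the head page here ...
            return 0L; // placeholder sequence number
        } finally {
            lock.unlock();
        }
    }

    void close() {
        lock.lock();
        try {
            closed.set(true);
        } finally {
            lock.unlock();
        }
    }
}
```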

* Apply suggestions from code review

Unit test case improvement: use `assertThrows` when validating the exception.

Co-authored-by: Ry Biesemeyer <[email protected]>

* Back out of introducing  wrap with  operations.

* Introduce a functional interface to use broadly for catching the exception types.

* Address review comments: do not retry sending in-flight events in the  case. We still throw  if we get an error when opening the queue.

* Remove input retry logic that will not be reached.

* Move the queue-closed check to after the thread acquires the lock. Make the read-next-page interface private since it is for internal use only.

* Apply suggestions from code review

Leave a comment for the write lock and remove unnecessary warning with `ensure_delivery=>false`

Co-authored-by: Ry Biesemeyer <[email protected]>

* Remove unused  method; check whether the current thread has acquired the lock when accessing the next page.

* pq: getting possibly-shared access to next read page is illegal

The private `Queue#nextReadPage()` method requires that the caller has
exclusive ownership of the lock, and failing to hold the lock is an
illegal state that cannot be recovered from; it would leak the
effectively-private `Page` to a caller that cannot reliably use it
without corrupting other callers.

Both callers of this private method already call it with exclusive
access, so this safeguard is merely to prevent future development from
breaking the expectation unknowingly.

As such, we throw an `IllegalStateException`.
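
A compact sketch of that safeguard (not the actual `Queue` class; the page type and read method are stand-ins) built on `ReentrantLock#isHeldByCurrentThread`:

```java
import java.util.concurrent.locks.ReentrantLock;

final class PageAccessSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Object headPage = new Object(); // stands in for the real Page type

    // callers must already hold `lock`; calling without it is a programming error
    private Object nextReadPage() {
        if (!lock.isHeldByCurrentThread()) {
            throw new IllegalStateException(
                    "Cannot get next read page without first acquiring the lock.");
        }
        return headPage;
    }

    Object readBatchUnderLock() {
        lock.lock();
        try {
            return nextReadPage(); // legal: the lock is held on this thread
        } finally {
            lock.unlock();
        }
    }
}
```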

* pq: use shared queue-closed check for block and non-block reads

By moving the closed-check from the blocking `Queue#readBatch` to the
shared private `Queue#nextReadPage`, we ensure that both blocking reads
by `Queue#readBatch` and non-blocking reads by `Queue#nonBlockReadBatch`
behave the same when the queue has been closed.
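
A rough sketch of that shared funnel (again illustrative; the real code throws `QueueRuntimeException` and picks between unread tail pages and the head page), showing both read paths hitting the same closed-queue check:

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

final class SharedReadCheckSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final AtomicBoolean closed = new AtomicBoolean(false);

    Object readBatch(int limit, long timeoutMillis) { // blocking variant
        lock.lock();
        try {
            return buildBatch(nextReadPage(), limit); // real code would await notEmpty
        } finally {
            lock.unlock();
        }
    }

    Object nonBlockReadBatch(int limit) { // non-blocking variant
        lock.lock();
        try {
            return buildBatch(nextReadPage(), limit);
        } finally {
            lock.unlock();
        }
    }

    // the closed check lives in one place, shared by both read paths above
    private Object nextReadPage() {
        if (closed.get()) {
            throw new IllegalStateException("Attempted to read on a closed acked queue.");
        }
        return new Object(); // placeholder for the page-selection logic
    }

    private Object buildBatch(Object page, int limit) {
        return page; // placeholder for assembling a batch from the page
    }
}
```

Because the check lives in the single private accessor, any future read path added to the class cannot accidentally bypass it.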

* pq: make exception message constants descriptive

* p2p: clarify comment about cumulative retry behaviour

---------

Co-authored-by: Ry Biesemeyer <[email protected]>
Co-authored-by: Ry Biesemeyer <[email protected]>
(cherry picked from commit e890049)

Co-authored-by: Mashhur <[email protected]>
github-actions[bot] and mashhurs authored Aug 8, 2023
1 parent de4241a commit acaec27
Showing 15 changed files with 186 additions and 134 deletions.
3 changes: 1 addition & 2 deletions logstash-core/lib/logstash/plugins/builtin/pipeline/input.rb
@@ -68,8 +68,7 @@ def internalReceive(events)
stream_position = stream_position + 1
end)
ReceiveResponse.completed()
rescue java.lang.InterruptedException, IOError => e
# maybe an IOException in enqueueing
rescue java.lang.InterruptedException, org.logstash.ackedqueue.QueueRuntimeException, IOError => e
logger.debug? && logger.debug('queueing event failed', message: e.message, exception: e.class, backtrace: e.backtrace)
ReceiveResponse.failed_at(stream_position, e)
end
@@ -25,6 +25,7 @@
import org.logstash.execution.MemoryReadBatch;
import org.logstash.execution.QueueBatch;
import org.logstash.ext.JrubyEventExtLibrary.RubyEvent;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -33,18 +34,17 @@

/**
* Persistent queue collection of events implementation
* */
*/
public final class AckedReadBatch implements QueueBatch {

private AckedBatch ackedBatch;
private final AckedBatch ackedBatch;

private Collection<RubyEvent> events;
private final Collection<RubyEvent> events;

public static AckedReadBatch create(
final JRubyAckedQueueExt queue,
final int size,
final long timeout)
{
final JRubyAckedQueueExt queue,
final int size,
final long timeout) {
try {
final AckedBatch batch = queue.readBatch(size, timeout);
return (batch == null) ? new AckedReadBatch() : new AckedReadBatch(batch);
@@ -69,7 +69,7 @@ private AckedReadBatch(AckedBatch batch) {

@Override
public RubyArray<RubyEvent> to_a() {
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> result = RUBY.newArray(events.size());
@SuppressWarnings({"unchecked"}) final RubyArray<RubyEvent> result = RUBY.newArray(events.size());
for (final RubyEvent e : events) {
if (!MemoryReadBatch.isCancelled(e)) {
result.append(e);
50 changes: 34 additions & 16 deletions logstash-core/src/main/java/org/logstash/ackedqueue/Queue.java
@@ -34,7 +34,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -80,7 +79,7 @@ public final class Queue implements Closeable {
private final Method deserializeMethod;

// thread safety
private final Lock lock = new ReentrantLock();
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

@@ -100,8 +99,9 @@ public Queue(Settings settings) {
}
this.dirPath = queueDir.toRealPath();
} catch (final IOException ex) {
throw new IllegalStateException(ex);
throw new IllegalStateException(QueueExceptionMessages.CANNOT_CREATE_QUEUE_DIR, ex);
}

this.pageCapacity = settings.getCapacity();
this.maxBytes = settings.getQueueMaxBytes();
this.checkpointIO = new FileCheckpointIO(dirPath, settings.getCheckpointRetry());
@@ -120,7 +120,7 @@ public Queue(Settings settings) {
cArg[0] = byte[].class;
this.deserializeMethod = this.elementClass.getDeclaredMethod("deserialize", cArg);
} catch (NoSuchMethodException e) {
throw new QueueRuntimeException("cannot find deserialize method on class " + this.elementClass.getName(), e);
throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_DESERIALIZE.concat(this.elementClass.getName()), e);
}
}

@@ -402,6 +402,12 @@ private void newCheckpointedHeadpage(int pageNum) throws IOException {
* @throws IOException if an IO error occurs
*/
public long write(Queueable element) throws IOException {
// pre-check before incurring serialization overhead;
// we must check again after acquiring the lock.
if (this.closed.get()) {
throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE);
}

byte[] data = element.serialize();

// the write strategy with regard to the isFull() state is to assume there is space for this element
@@ -413,12 +419,17 @@ public long write(Queueable element) throws IOException {

lock.lock();
try {
if (! this.headPage.hasCapacity(data.length)) {
throw new IOException("data to be written is bigger than page capacity");
// ensure that the queue is still open now that this thread has acquired the lock.
if (this.closed.get()) {
throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_WRITE_TO_CLOSED_QUEUE);
}

if (!this.headPage.hasCapacity(data.length)) {
throw new QueueRuntimeException(QueueExceptionMessages.BIGGER_DATA_THAN_PAGE_SIZE);
}

// create a new head page if the current does not have sufficient space left for data to be written
if (! this.headPage.hasSpace(data.length)) {
if (!this.headPage.hasSpace(data.length)) {

// TODO: verify queue state integrity WRT Queue.open()/recover() at each step of this process

@@ -599,10 +610,12 @@ public synchronized Batch nonBlockReadBatch(int limit) throws IOException {
* @param limit size limit of the batch to read. returned {@link Batch} can be smaller.
* @param timeout the maximum time to wait in milliseconds on write operations
* @return the read {@link Batch} or null if no element upon timeout
* @throws QueueRuntimeException if queue is closed
* @throws IOException if an IO error occurs
*/
public synchronized Batch readBatch(int limit, long timeout) throws IOException {
lock.lock();

try {
return readPageBatch(nextReadPage(), limit, timeout);
} finally {
@@ -790,17 +803,22 @@ private void releaseLockAndSwallow() {
}

/**
* return the {@link Page} for the next read operation.
* Return the {@link Page} for the next read operation.
* Caller <em>MUST</em> have exclusive access to the lock.
* @return {@link Page} will be either a read-only tail page or the head page.
* @throws QueueRuntimeException if queue is closed
*/
public Page nextReadPage() {
lock.lock();
try {
// look at head page if no unreadTailPages
return (this.unreadTailPages.isEmpty()) ? this.headPage : this.unreadTailPages.get(0);
} finally {
lock.unlock();
private Page nextReadPage() {
if (!lock.isHeldByCurrentThread()) {
throw new IllegalStateException(QueueExceptionMessages.CANNOT_READ_PAGE_WITHOUT_LOCK);
}

if (isClosed()) {
throw new QueueRuntimeException(QueueExceptionMessages.CANNOT_READ_FROM_CLOSED_QUEUE);
}


return (this.unreadTailPages.isEmpty()) ? this.headPage : this.unreadTailPages.get(0);
}

private void removeUnreadPage(Page p) {
@@ -849,7 +867,7 @@ public long getUnackedCount() {
}
}

private boolean isClosed() {
public boolean isClosed() {
return this.closed.get();
}

@@ -0,0 +1,42 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/


package org.logstash.ackedqueue;

/**
* A public class that holds descriptive messages used during interaction with the acked queue.
*/
public class QueueExceptionMessages {

public final static String CANNOT_READ_FROM_CLOSED_QUEUE = "Attempted to read on a closed acked queue.";

public final static String CANNOT_WRITE_TO_CLOSED_QUEUE = "Tried to write to a closed queue.";

public final static String BIGGER_DATA_THAN_PAGE_SIZE = "data to be written is bigger than page capacity";

public final static String CANNOT_CREATE_QUEUE_DIR = "Error creating queue directories.";

public final static String CANNOT_DESERIALIZE = "cannot find deserialize method on class ";

public final static String UNHANDLED_ERROR_WRITING_TO_QUEUE = "Unhandleable error occurred while writing to queue.";

public final static String CANNOT_READ_PAGE_WITHOUT_LOCK = "Cannot get next read page without first acquiring the lock.";

}
@@ -31,4 +31,7 @@ public QueueRuntimeException(String message, Throwable cause) {
super(message, cause);
}

public QueueRuntimeException(String message) {
super(message);
}
}
@@ -21,6 +21,8 @@
package org.logstash.ackedqueue.ext;

import java.io.IOException;
import java.util.Objects;

import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
@@ -36,11 +38,12 @@
import org.logstash.ackedqueue.AckedBatch;
import org.logstash.ackedqueue.Batch;
import org.logstash.ackedqueue.Queue;
import org.logstash.ackedqueue.QueueExceptionMessages;
import org.logstash.ackedqueue.SettingsImpl;

/**
* JRuby extension to wrap a persistent queue instance.
* */
*/
@JRubyClass(name = "AckedQueue")
public final class JRubyAckedQueueExt extends RubyObject {

@@ -68,15 +71,15 @@ public static JRubyAckedQueueExt create(String path, int capacity, int maxEvents
private void initializeQueue(String path, int capacity, int maxEvents, int checkpointMaxWrites,
int checkpointMaxAcks, boolean checkpointRetry, long maxBytes) {
this.queue = new Queue(
SettingsImpl.fileSettingsBuilder(path)
.capacity(capacity)
.maxUnread(maxEvents)
.queueMaxBytes(maxBytes)
.checkpointMaxAcks(checkpointMaxAcks)
.checkpointMaxWrites(checkpointMaxWrites)
.checkpointRetry(checkpointRetry)
.elementClass(Event.class)
.build()
SettingsImpl.fileSettingsBuilder(path)
.capacity(capacity)
.maxUnread(maxEvents)
.queueMaxBytes(maxBytes)
.checkpointMaxAcks(checkpointMaxAcks)
.checkpointMaxWrites(checkpointMaxWrites)
.checkpointRetry(checkpointRetry)
.elementClass(Event.class)
.build()
);
}

@@ -132,24 +135,28 @@ public void rubyWrite(ThreadContext context, Event event) {
}
}

public void write(Event event) throws IOException {
this.queue.write(event);
public void write(Event event) {
try {
this.queue.write(event);
} catch (IOException e) {
throw new IllegalStateException(QueueExceptionMessages.UNHANDLED_ERROR_WRITING_TO_QUEUE, e);
}
}

@JRubyMethod(name = "read_batch", required = 2)
public IRubyObject ruby_read_batch(ThreadContext context, IRubyObject limit, IRubyObject timeout) {
AckedBatch b;
public IRubyObject rubyReadBatch(ThreadContext context, IRubyObject limit, IRubyObject timeout) {
AckedBatch batch;
try {
b = readBatch(RubyFixnum.num2int(limit), RubyFixnum.num2int(timeout));
batch = readBatch(RubyFixnum.num2int(limit), RubyFixnum.num2int(timeout));
} catch (IOException e) {
throw RubyUtil.newRubyIOError(context.runtime, e);
}
return RubyUtil.toRubyObject(b);
return RubyUtil.toRubyObject(batch);
}

public AckedBatch readBatch(int limit, long timeout) throws IOException {
Batch b = queue.readBatch(limit, timeout);
return (b == null) ? null : AckedBatch.create(b);
final Batch batch = queue.readBatch(limit, timeout);
return Objects.isNull(batch) ? null : AckedBatch.create(batch);
}

@JRubyMethod(name = "is_fully_acked?")
@@ -21,7 +21,7 @@
package org.logstash.ackedqueue.ext;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

import org.jruby.Ruby;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
@@ -41,14 +41,13 @@

/**
* JRuby extension
* */
*/
@JRubyClass(name = "WrappedAckedQueue")
public final class JRubyWrappedAckedQueueExt extends AbstractWrappedQueueExt {

private static final long serialVersionUID = 1L;

private JRubyAckedQueueExt queue;
private final AtomicBoolean isClosed = new AtomicBoolean();

@JRubyMethod(optional = 8)
public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[] args) throws IOException {
@@ -57,7 +56,7 @@ public JRubyWrappedAckedQueueExt initialize(ThreadContext context, IRubyObject[]
int maxEvents = RubyFixnum.num2int(args[2]);
int checkpointMaxWrites = RubyFixnum.num2int(args[3]);
int checkpointMaxAcks = RubyFixnum.num2int(args[4]);
boolean checkpointRetry = !((RubyBoolean)args[6]).isFalse();
boolean checkpointRetry = !((RubyBoolean) args[6]).isFalse();
long queueMaxBytes = RubyFixnum.num2long(args[7]);

this.queue = JRubyAckedQueueExt.create(args[0].asJavaString(), capacity, maxEvents,
@@ -78,19 +77,16 @@ public JRubyAckedQueueExt rubyGetQueue() {

public void close() throws IOException {
queue.close();
isClosed.set(true);
}

@JRubyMethod(name = {"push", "<<"})
public void rubyPush(ThreadContext context, IRubyObject event) {
checkIfClosed("write");
queue.rubyWrite(context, ((JrubyEventExtLibrary.RubyEvent) event).getEvent());
}

@JRubyMethod(name = "read_batch")
public IRubyObject rubyReadBatch(ThreadContext context, IRubyObject size, IRubyObject wait) {
checkIfClosed("read a batch");
return queue.ruby_read_batch(context, size, wait);
return queue.rubyReadBatch(context, size, wait);
}

@JRubyMethod(name = "is_empty?")
@@ -100,7 +96,7 @@ public IRubyObject rubyIsEmpty(ThreadContext context) {

@Override
protected JRubyAbstractQueueWriteClientExt getWriteClient(final ThreadContext context) {
return JrubyAckedWriteClientExt.create(queue, isClosed);
return JrubyAckedWriteClientExt.create(queue);
}

@Override
@@ -117,10 +113,4 @@ protected IRubyObject doClose(final ThreadContext context) {
}
return context.nil;
}

private void checkIfClosed(String action) {
if (isClosed.get()) {
throw new RuntimeException("Attempted to " + action + " on a closed AckedQueue");
}
}
}