diff --git a/build.gradle b/build.gradle index f61f6ee..fb46d91 100644 --- a/build.gradle +++ b/build.gradle @@ -74,7 +74,8 @@ dependencies { compileOnly 'com.fasterxml.jackson.core:jackson-databind:2.7.4' compileOnly 'com.fasterxml.jackson.module:jackson-module-afterburner:2.7.4' compileOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.7.4' - compileOnly group: 'org.jruby', name: 'jruby-complete', version: "1.7.25" + compileOnly 'org.jruby:jruby-complete:9.1.10.0' + compileOnly fileTree(dir: logstashCoreGemPath, include: '**/*.jar') testCompile group: 'junit', name: 'junit', version: '4.12' @@ -85,6 +86,6 @@ dependencies { testCompile 'com.fasterxml.jackson.core:jackson-databind:2.7.4' testCompile 'com.fasterxml.jackson.module:jackson-module-afterburner:2.7.4' testCompile 'com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:2.7.4' - testCompile group: 'org.jruby', name: 'jruby-complete', version: "1.7.25" + testCompile 'org.jruby:jruby-complete:9.1.10.0'; testCompile fileTree(dir: logstashCoreGemPath, include: '**/*.jar') } diff --git a/src/main/java/org/logstash/input/DeadLetterQueueInputPlugin.java b/src/main/java/org/logstash/input/DeadLetterQueueInputPlugin.java index fb79aca..570d4eb 100644 --- a/src/main/java/org/logstash/input/DeadLetterQueueInputPlugin.java +++ b/src/main/java/org/logstash/input/DeadLetterQueueInputPlugin.java @@ -25,6 +25,7 @@ import org.logstash.ackedqueue.Queueable; import org.logstash.common.io.DeadLetterQueueReader; +import java.io.Closeable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Files; @@ -34,7 +35,7 @@ import java.util.function.Consumer; -public class DeadLetterQueueInputPlugin { +public class DeadLetterQueueInputPlugin implements Closeable { private static final Logger logger = LogManager.getLogger(DeadLetterQueueInputPlugin.class); private final static char VERSION = '1'; @@ -57,9 +58,13 @@ public DeadLetterQueueReader getQueueReader() { } public void register() throws IOException { - if (sinceDbPath != null && Files.exists(sinceDbPath) && targetTimestamp == null) { + if (sinceDbPath != null && Files.exists(sinceDbPath)) { byte[] bytes = Files.readAllBytes(sinceDbPath); + if (bytes.length == 0) { + if (targetTimestamp != null) { + queueReader.seekToNextEvent(targetTimestamp); + } return; } ByteBuffer buffer = ByteBuffer.wrap(bytes); @@ -87,7 +92,6 @@ public void run(Consumer queueConsumer) throws Exception { } private void writeOffsets(Path segment, long offset) throws IOException { - logger.info("writing offsets"); String path = segment.toAbsolutePath().toString(); ByteBuffer buffer = ByteBuffer.allocate(path.length() + 1 + 64); buffer.putChar(VERSION); @@ -97,12 +101,14 @@ private void writeOffsets(Path segment, long offset) throws IOException { Files.write(sinceDbPath, buffer.array()); } + @Override public void close() throws IOException { - logger.warn("closing dead letter queue input plugin"); - if (commitOffsets) { - writeOffsets(queueReader.getCurrentSegment(), queueReader.getCurrentPosition()); + if (open.get()) { + if (commitOffsets) { + writeOffsets(queueReader.getCurrentSegment(), queueReader.getCurrentPosition()); + } + queueReader.close(); + open.set(false); } - queueReader.close(); - open.set(false); } } diff --git a/src/test/java/org/logstash/input/DeadLetterQueueInputPluginTests.java b/src/test/java/org/logstash/input/DeadLetterQueueInputPluginTests.java index 1eabadf..428328e 100644 --- a/src/test/java/org/logstash/input/DeadLetterQueueInputPluginTests.java +++ b/src/test/java/org/logstash/input/DeadLetterQueueInputPluginTests.java @@ -18,7 +18,6 @@ */ package org.logstash.input; -import org.joda.time.DateTime; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -28,8 +27,9 @@ import org.logstash.Timestamp; import org.logstash.common.io.DeadLetterQueueWriter; +import java.io.IOException; import java.nio.file.Path; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import static junit.framework.TestCase.assertEquals; @@ -46,70 +46,153 @@ public void setUp() throws Exception { } @Test - public void test() throws Exception { - DeadLetterQueueWriter queueWriter = new DeadLetterQueueWriter(dir, 100000000, 10000000); - DLQEntry entry = new DLQEntry(new Event(), "test", "test", "test"); - for (int i = 0; i < 10000; i++) { - queueWriter.writeEntry(entry); + public void testConsumeTwiceNoOffsetsWithDate() throws Exception { + DeadLetterQueueWriter queueWriter = null; + try { + queueWriter = new DeadLetterQueueWriter(dir, 100000000, 10000000); + Timestamp cutoffTimestamp = writeEventsWithCutoff(queueWriter, 1000, 800); + + try(DeadLetterQueueInputPlugin plugin = new DeadLetterQueueInputPlugin(dir, false, null, cutoffTimestamp)) { + assertMessagesReceived(plugin, 200); + } + writeEvents(queueWriter, 5); + try(DeadLetterQueueInputPlugin secondPlugin = new DeadLetterQueueInputPlugin(dir, false, null, cutoffTimestamp)) { + assertMessagesReceived(secondPlugin, 205); + } + } finally { + queueWriter.close(); } + } - Path since = temporaryFolder.newFile(".sincdb").toPath(); - DeadLetterQueueInputPlugin plugin = new DeadLetterQueueInputPlugin(dir, true, since, null); + @Test + public void testConsumeTwiceOffsetsNoDate() throws Exception { + DeadLetterQueueWriter queueWriter = null; + try { + queueWriter = new DeadLetterQueueWriter(dir, 100000000, 10000000); + Path since = getSinceDbPathName(); + writeEventsWithCutoff(queueWriter, 1000, 800); - final AtomicInteger count = new AtomicInteger(); - Thread pluginThread = new Thread(() -> { - try { - plugin.register(); - plugin.run((e) -> {count.incrementAndGet();}); - } catch (Exception e) { - // do nothing + try(DeadLetterQueueInputPlugin plugin = new DeadLetterQueueInputPlugin(dir, true, since, null)) { + assertMessagesReceived(plugin, 1000); + } + writeEvents(queueWriter, 5); + try(DeadLetterQueueInputPlugin secondPlugin = new DeadLetterQueueInputPlugin(dir, true, since, null)) { + assertMessagesReceived(secondPlugin, 5); + } + }finally{ + queueWriter.close(); + } + } + + @Test + public void testConsumeTwiceOffsetsWithDate() throws Exception { + DeadLetterQueueWriter queueWriter = null; + try { + queueWriter = new DeadLetterQueueWriter(dir, 100000000, 10000000); + Path since = getSinceDbPathName(); + Timestamp cutoffTimestamp = writeEventsWithCutoff(queueWriter, 1000, 800); + + try(DeadLetterQueueInputPlugin plugin = new DeadLetterQueueInputPlugin(dir, true, since, cutoffTimestamp)){ + assertMessagesReceived(plugin, 200); } - }); - pluginThread.start(); - Thread.sleep(2000); - assertEquals(10000, count.get()); - queueWriter.writeEntry(entry); - Thread.sleep(200); - assertEquals(10001, count.get()); - pluginThread.interrupt(); - pluginThread.join(); - plugin.close(); - queueWriter.writeEntry(entry); - queueWriter.writeEntry(entry); + writeEvents(queueWriter, 5); + try(DeadLetterQueueInputPlugin secondPlugin = new DeadLetterQueueInputPlugin(dir, true, since, cutoffTimestamp)){ + assertMessagesReceived(secondPlugin, 5); + } + }finally{ + queueWriter.close(); + } + } - DeadLetterQueueInputPlugin secondPlugin = new DeadLetterQueueInputPlugin(dir, true, since, null); + @Test + public void testConsumeTwiceNoOffsetsNoDate() throws Exception { + DeadLetterQueueWriter queueWriter = null; + try { + queueWriter = new DeadLetterQueueWriter(dir, 100000000, 10000000); + writeEventsWithCutoff(queueWriter, 1000, 800); + + try(DeadLetterQueueInputPlugin plugin = new DeadLetterQueueInputPlugin(dir, false, null, null)){ + assertMessagesReceived(plugin, 1000); + } + + writeEvents(queueWriter, 5); + + try(DeadLetterQueueInputPlugin secondPlugin = new DeadLetterQueueInputPlugin(dir, false, null, null)) { + assertMessagesReceived(secondPlugin, 1005); + } + }finally{ + queueWriter.close(); + } + } - pluginThread = new Thread(() -> { + /** + * Assert that the number of messages received by the plugin matches the expected count. + * @param plugin + * @param expectedCount + * @throws InterruptedException + * @throws IOException + */ + private static void assertMessagesReceived(DeadLetterQueueInputPlugin plugin, int expectedCount) throws InterruptedException, IOException { + LongAdder count = new LongAdder(); + Thread pluginThread = new Thread(() -> { try { - secondPlugin.register(); - secondPlugin.run((e) -> {count.incrementAndGet();}); + plugin.register(); + plugin.run((e) -> {count.increment();}); } catch (Exception e) { // do nothing } }); + pluginThread.start(); - Thread.sleep(200); + // Use a sleep to make sure we do not receive too many messages. + Thread.sleep(500); pluginThread.interrupt(); pluginThread.join(); - secondPlugin.close(); - assertEquals(10003, count.get()); + assertEquals(expectedCount, count.intValue()); } - @Test - public void testTimestamp() throws Exception { - DeadLetterQueueWriter queueWriter = new DeadLetterQueueWriter(dir, 100000, 10000000); + /** + * Write events to the queue, adding a boundary + * @param queueWriter instance of {@link DeadLetterQueueWriter} to write entry to queue + * @param eventsToWrite How many events to write in total + * @param cutOffPoint After how many events should the 'cutoff' timestamp be written + * @return CutOff {@link Timestamp} + * @throws IOException + */ + private static Timestamp writeEventsWithCutoff(DeadLetterQueueWriter queueWriter, int eventsToWrite, int cutOffPoint) throws IOException { long epoch = 1490659200000L; - String targetDateString = ""; - for (int i = 0; i < 10000; i++) { + Timestamp cutoffTimestamp = null; + for (int i = 0; i < eventsToWrite; i++) { DLQEntry entry = new DLQEntry(new Event(), "test", "test", "test", new Timestamp(epoch)); queueWriter.writeEntry(entry); epoch += 1000; - if (i == 800) { - targetDateString = entry.getEntryTime().toIso8601(); + if (i == cutOffPoint) { + cutoffTimestamp = entry.getEntryTime(); } } - DeadLetterQueueInputPlugin plugin = new DeadLetterQueueInputPlugin(dir, false, null, new Timestamp(targetDateString)); - plugin.register(); + return cutoffTimestamp; } + + /** + * Write events to the queue + * @param queueWriter instance of {@link DeadLetterQueueWriter} to write entry to queue + * @param eventsToWrite How many events to write in total + * @throws IOException + */ + private static void writeEvents(DeadLetterQueueWriter queueWriter, int eventsToWrite) throws IOException { + for (int i = 0; i < eventsToWrite; i++) { + DLQEntry entry = new DLQEntry(new Event(), "test", "test", "test"); + queueWriter.writeEntry(entry); + } + } + + /** + * Return the path of the since db, but do not create + * @return {@link Path} Location of the since db. + */ + private Path getSinceDbPathName() { + return temporaryFolder.getRoot().toPath().resolve(".sincdb"); + } + }