diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java index 7e759909d5..ea16e375cd 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOption.java @@ -14,7 +14,8 @@ public enum CompressionOption { NONE("none", NoneCompressionEngine::new), - GZIP("gzip", GZipCompressionEngine::new); + GZIP("gzip", GZipCompressionEngine::new), + SNAPPY("snappy", SnappyCompressionEngine::new); private static final Map OPTIONS_MAP = Arrays.stream(CompressionOption.values()) .collect(Collectors.toMap( @@ -23,8 +24,8 @@ public enum CompressionOption { )); private final String option; - private final Supplier compressionEngineSupplier; + private final Supplier compressionEngineSupplier; CompressionOption(final String option, final Supplier compressionEngineSupplier) { this.option = option.toLowerCase(); this.compressionEngineSupplier = compressionEngineSupplier; @@ -34,8 +35,12 @@ public CompressionEngine getCompressionEngine() { return compressionEngineSupplier.get(); } + String getOption() { + return option; + } + @JsonCreator public static CompressionOption fromOptionValue(final String option) { - return OPTIONS_MAP.get(option.toLowerCase()); + return OPTIONS_MAP.get(option); } } diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java index f59956a8ed..3fd045b714 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/GZipCompressionEngine.java @@ -10,7 +10,7 @@ import java.io.IOException; import java.io.OutputStream; -public class GZipCompressionEngine implements CompressionEngine { +class GZipCompressionEngine implements CompressionEngine { @Override public OutputStream createOutputStream(final OutputStream outputStream) throws IOException { return new GzipCompressorOutputStream(outputStream); diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java index 9c852b4f85..e7eed68da2 100644 --- a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/NoneCompressionEngine.java @@ -7,7 +7,7 @@ import java.io.OutputStream; -public class NoneCompressionEngine implements CompressionEngine { +class NoneCompressionEngine implements CompressionEngine { @Override public OutputStream createOutputStream(final OutputStream outputStream) { return outputStream; diff --git a/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/SnappyCompressionEngine.java b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/SnappyCompressionEngine.java new file mode 100644 index 0000000000..03bf0eec1b --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/main/java/org/opensearch/dataprepper/plugins/sink/s3/compression/SnappyCompressionEngine.java @@ -0,0 +1,18 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import org.xerial.snappy.SnappyOutputStream; + +import java.io.IOException; +import java.io.OutputStream; + +class SnappyCompressionEngine implements CompressionEngine { + @Override + public OutputStream createOutputStream(final OutputStream outputStream) throws IOException { + return new SnappyOutputStream(outputStream); + } +} diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOptionTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOptionTest.java new file mode 100644 index 0000000000..15a13b31db --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/CompressionOptionTest.java @@ -0,0 +1,51 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; +import org.junit.jupiter.params.provider.EnumSource; + +import java.util.stream.Stream; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.params.provider.Arguments.arguments; + +class CompressionOptionTest { + @ParameterizedTest + @EnumSource(CompressionOption.class) + void fromOptionValue_returns_expected_value(final CompressionOption option) { + assertThat(CompressionOption.fromOptionValue(option.getOption()), equalTo(option)); + } + + @ParameterizedTest + @EnumSource(CompressionOption.class) + void getCompressionEngine_returns_a_CompressionEngine(final CompressionOption option) { + assertThat(option.getCompressionEngine(), instanceOf(CompressionEngine.class)); + } + + @ParameterizedTest + @ArgumentsSource(OptionToExpectedEngine.class) + void getCompressionEngine_returns_expected_engine_type(final CompressionOption option, final Class expectedEngineType) { + assertThat(option.getCompressionEngine(), instanceOf(expectedEngineType)); + } + + static class OptionToExpectedEngine implements ArgumentsProvider { + @Override + public Stream provideArguments(final ExtensionContext context) { + return Stream.of( + arguments(CompressionOption.NONE, NoneCompressionEngine.class), + arguments(CompressionOption.GZIP, GZipCompressionEngine.class), + arguments(CompressionOption.SNAPPY, SnappyCompressionEngine.class) + ); + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/SnappyCompressionEngineTest.java b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/SnappyCompressionEngineTest.java new file mode 100644 index 0000000000..709d445d05 --- /dev/null +++ b/data-prepper-plugins/s3-sink/src/test/java/org/opensearch/dataprepper/plugins/sink/s3/compression/SnappyCompressionEngineTest.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.sink.s3.compression; + +import org.junit.jupiter.api.Test; +import org.xerial.snappy.SnappyCodec; +import org.xerial.snappy.SnappyInputStream; +import org.xerial.snappy.SnappyOutputStream; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.UUID; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.mock; + +class SnappyCompressionEngineTest { + + private SnappyCompressionEngine createObjectUnderTest() { + return new SnappyCompressionEngine(); + } + + @Test + void createOutputStream_should_return_SnappyOutputStream() throws IOException { + final OutputStream innerOutputStream = mock(OutputStream.class); + final OutputStream outputStream = createObjectUnderTest().createOutputStream(innerOutputStream); + + assertThat(outputStream, instanceOf(SnappyOutputStream.class)); + } + + @Test + void createOutputStream_should_write_compressed_data() throws IOException { + final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + + final OutputStream outputStream = createObjectUnderTest().createOutputStream(byteArrayOutputStream); + + final byte[] inputBytes = UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8); + + outputStream.write(inputBytes); + outputStream.close(); + + final byte[] writtenBytes = byteArrayOutputStream.toByteArray(); + + assertTrue(SnappyCodec.hasMagicHeaderPrefix(writtenBytes)); + + final ByteArrayInputStream verificationInputStream = new ByteArrayInputStream(writtenBytes); + + final SnappyInputStream uncompressingInputStream = new SnappyInputStream(verificationInputStream); + final byte[] uncompressedBytes = uncompressingInputStream.readAllBytes(); + assertThat(uncompressedBytes, equalTo(inputBytes)); + } +} \ No newline at end of file