diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 50f6e56ba..56b883388 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -345,6 +345,13 @@ public interface CuratorFramework extends Closeable { */ SchemaSet getSchemaSet(); + /** + * Return whether compression is enabled by default for all create, setData and getData operations. + * + * @return if compression is enabled + */ + boolean compressionEnabled(); + /** * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is * done from the {@link #runSafe(Runnable)} thread. diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index d24b56d64..87f0e9f17 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -163,6 +163,7 @@ public static class Builder { private List authInfos = null; private byte[] defaultData = LOCAL_ADDRESS; private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER; + private boolean compressionEnabled = false; private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY; private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly = false; @@ -367,6 +368,18 @@ public Builder compressionProvider(CompressionProvider compressionProvider) { return this; } + /** + * By default, each write or read call must explicitly use compression. + * Call this method to enable compression by default on all read and write calls. + *

+ * In order to implement filtered compression, use this option and a custom {@link CompressionProvider} that only compresses and decompresses the zNodes that match the desired filter. + * @return this + */ + public Builder enableCompression() { + this.compressionEnabled = true; + return this; + } + /** * @param zookeeperFactory the zookeeper factory to use * @return this @@ -542,6 +555,10 @@ public CompressionProvider getCompressionProvider() { return compressionProvider; } + public boolean compressionEnabled() { + return compressionEnabled; + } + public ThreadFactory getThreadFactory() { return threadFactory; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java index 1aaca83c7..4d0feef32 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java @@ -26,4 +26,12 @@ public interface Compressible { * @return this */ public T compressed(); + + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + public T uncompressed(); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java index b6e6bbba9..a60abb55c 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java @@ -26,4 +26,12 @@ public interface Decompressible { * @return this */ public T decompressed(); + + /** + * Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + public T undecompressed(); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index 5487f8560..b76ce98f4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -100,7 +100,7 @@ public class CreateBuilderImpl acling = new ACLing(client.getAclProvider()); createParentsIfNeeded = false; createParentsAsContainers = false; - compress = false; + compress = client.compressionEnabled(); setDataIfExists = false; storingStat = null; ttl = -1; @@ -193,6 +193,12 @@ public ACLCreateModePathAndBytesable compressed() { return this; } + @Override + public ACLCreateModePathAndBytesable uncompressed() { + CreateBuilderImpl.this.uncompressed(); + return this; + } + @Override public T forPath(String path) throws Exception { return forPath(path, client.getDefaultData()); @@ -216,7 +222,16 @@ public T forPath(String path, byte[] data) throws Exception { @Override public CreateBackgroundModeStatACLable compressed() { - compress = true; + return withCompression(true); + } + + @Override + public CreateBackgroundModeStatACLable uncompressed() { + return withCompression(false); + } + + private CreateBackgroundModeStatACLable withCompression(boolean compress) { + this.compress = compress; return new CreateBackgroundModeStatACLable() { @Override public CreateBackgroundModeACLable storingStatIn(Stat stat) { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 816d0bda0..c36b120af 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -108,6 +108,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { private final FailedDeleteManager failedDeleteManager; private final FailedRemoveWatchManager failedRemoveWatcherManager; private final CompressionProvider compressionProvider; + private final boolean compressionEnabled; private final ACLProvider aclProvider; private final NamespaceFacadeCache namespaceFacadeCache; private final boolean useContainerParentsIfAvailable; @@ -184,6 +185,7 @@ public void process(WatchedEvent watchedEvent) { builder.getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory()); compressionProvider = builder.getCompressionProvider(); + compressionEnabled = builder.compressionEnabled(); aclProvider = builder.getAclProvider(); state = new AtomicReference(CuratorFrameworkState.LATENT); useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable(); @@ -283,6 +285,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) { failedDeleteManager = parent.failedDeleteManager; failedRemoveWatcherManager = parent.failedRemoveWatcherManager; compressionProvider = parent.compressionProvider; + compressionEnabled = parent.compressionEnabled; aclProvider = parent.aclProvider; namespaceFacadeCache = parent.namespaceFacadeCache; namespace = parent.namespace; @@ -618,6 +621,11 @@ public SchemaSet getSchemaSet() { return schemaSet; } + @Override + public boolean compressionEnabled() { + return compressionEnabled; + } + ACLProvider getAclProvider() { return aclProvider; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java index 7219cb518..df6d15cbd 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java @@ -46,7 +46,7 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation inBackground() { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java index 73ba8a540..98b21e33a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java @@ -54,7 +54,7 @@ public class SetDataBuilderImpl this.client = client; backgrounding = new Backgrounding(); version = -1; - compress = false; + compress = client.compressionEnabled(); } public SetDataBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, int version, boolean compress) { @@ -94,12 +94,27 @@ public VersionPathAndBytesable compressed() { compress = true; return this; } + + @Override + public VersionPathAndBytesable uncompressed() { + compress = false; + return this; + } }; } @Override public SetDataBackgroundVersionable compressed() { - compress = true; + return withCompression(true); + } + + @Override + public SetDataBackgroundVersionable uncompressed() { + return withCompression(false); + } + + public SetDataBackgroundVersionable withCompression(boolean compress) { + this.compress = compress; return new SetDataBackgroundVersionable() { @Override public ErrorListenerPathAndBytesable inBackground() { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java index 63c9706c9..fabf255dd 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java @@ -35,7 +35,7 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder { TempGetDataBuilderImpl(CuratorFrameworkImpl client) { this.client = client; responseStat = null; - decompress = false; + decompress = client.compressionEnabled(); } @Override @@ -44,6 +44,12 @@ public StatPathable decompressed() { return this; } + @Override + public StatPathable undecompressed() { + decompress = false; + return this; + } + @Override public Pathable storingStatIn(Stat stat) { responseStat = stat; diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java index 691b74374..9288939ee 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java @@ -22,7 +22,9 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; import java.util.concurrent.atomic.AtomicInteger; +import java.util.zip.ZipException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.CompressionProvider; @@ -97,6 +99,73 @@ public void testSetData() throws Exception { } } + @Test + public void testSetDataGlobalCompression() throws Exception { + final byte[] data = "here's a string".getBytes(); + final byte[] gzipedData = GzipCompressionProvider.doCompress(data); + + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .enableCompression() + .build(); + try { + client.start(); + + // Create with explicit compression + client.create().compressed().creatingParentsIfNeeded().forPath("/a/b/c", data); + assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c")); + assertArrayEquals(data, client.getData().forPath("/a/b/c")); + assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c")); + assertEquals( + gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength()); + + // Create explicitly without compression + client.delete().forPath("/a/b/c"); + client.create().uncompressed().creatingParentsIfNeeded().forPath("/a/b/c", data); + assertArrayEquals(data, client.getData().undecompressed().forPath("/a/b/c")); + assertThrows( + ZipException.class, () -> client.getData().decompressed().forPath("/a/b/c")); + assertThrows(ZipException.class, () -> client.getData().forPath("/a/b/c")); + assertEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength()); + + // Create with implicit (global) compression + client.delete().forPath("/a/b/c"); + client.create().creatingParentsIfNeeded().forPath("/a/b/c", data); + assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c")); + assertArrayEquals(data, client.getData().forPath("/a/b/c")); + assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c")); + assertEquals( + gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength()); + + // SetData with explicit compression + client.setData().compressed().forPath("/a/b/c", data); + assertArrayEquals(data, client.getData().forPath("/a/b/c")); + assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c")); + assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c")); + assertEquals( + gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength()); + + // SetData explicitly without compression + client.setData().uncompressed().forPath("/a/b/c", data); + assertArrayEquals(data, client.getData().undecompressed().forPath("/a/b/c")); + assertThrows( + ZipException.class, () -> client.getData().decompressed().forPath("/a/b/c")); + assertThrows(ZipException.class, () -> client.getData().forPath("/a/b/c")); + assertEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength()); + + // SetData with implicit (global) compression + client.setData().forPath("/a/b/c", data); + assertArrayEquals(data, client.getData().forPath("/a/b/c")); + assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c")); + assertArrayEquals(gzipedData, client.getData().undecompressed().forPath("/a/b/c")); + assertEquals( + gzipedData.length, client.checkExists().forPath("/a/b/c").getDataLength()); + } finally { + CloseableUtils.closeQuietly(client); + } + } + @Test public void testSimple() throws Exception { final byte[] data = "here's a string".getBytes(); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java index 7d389e351..f7389ce07 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java @@ -20,8 +20,11 @@ package org.apache.curator.framework.imps; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.zip.ZipException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.transaction.CuratorOp; @@ -145,4 +148,65 @@ public void testCreateCompressedAndUncompressed() throws Exception { CloseableUtils.closeQuietly(client); } } + + @Test + public void testGlobalCompression() throws Exception { + final String path1 = "/a"; + final String path2 = "/b"; + + final byte[] data1 = "here's a string".getBytes(); + final byte[] data2 = "here's another string".getBytes(); + final byte[] gzipedData1 = GzipCompressionProvider.doCompress(data1); + final byte[] gzipedData2 = GzipCompressionProvider.doCompress(data2); + + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .enableCompression() + .build(); + try { + client.start(); + + // Create the nodes in a transaction + // path1 is compressed (globally) + // path2 is uncompressed (override) + CuratorOp op1 = client.transactionOp().create().forPath(path1, data1); + CuratorOp op2 = client.transactionOp().create().uncompressed().forPath(path2, data2); + client.transaction().forOperations(op1, op2); + + // Check they exist + assertNotNull(client.checkExists().forPath(path1)); + assertEquals(gzipedData1.length, client.checkExists().forPath(path1).getDataLength()); + assertNotNull(client.checkExists().forPath(path2)); + assertEquals(data2.length, client.checkExists().forPath(path2).getDataLength()); + assertArrayEquals(data1, client.getData().forPath(path1)); + assertArrayEquals(data1, client.getData().decompressed().forPath(path1)); + assertArrayEquals(gzipedData1, client.getData().undecompressed().forPath(path1)); + assertArrayEquals(data2, client.getData().undecompressed().forPath(path2)); + assertThrows( + ZipException.class, () -> client.getData().decompressed().forPath(path2)); + assertThrows(ZipException.class, () -> client.getData().forPath(path2)); + + // Set data in transaction + // path1 is uncompressed (override) + // path2 is compressed (globally) + op1 = client.transactionOp().setData().uncompressed().forPath(path1, data1); + op2 = client.transactionOp().setData().forPath(path2, data2); + client.transaction().forOperations(op1, op2); + + assertNotNull(client.checkExists().forPath(path1)); + assertEquals(data1.length, client.checkExists().forPath(path1).getDataLength()); + assertNotNull(client.checkExists().forPath(path2)); + assertEquals(gzipedData2.length, client.checkExists().forPath(path2).getDataLength()); + assertArrayEquals(data1, client.getData().undecompressed().forPath(path1)); + assertThrows( + ZipException.class, () -> client.getData().decompressed().forPath(path1)); + assertThrows(ZipException.class, () -> client.getData().forPath(path1)); + assertArrayEquals(data2, client.getData().decompressed().forPath(path2)); + assertArrayEquals(data2, client.getData().forPath(path2)); + assertArrayEquals(gzipedData2, client.getData().undecompressed().forPath(path2)); + } finally { + CloseableUtils.closeQuietly(client); + } + } } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java index d347e20fe..9e8958f2e 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java @@ -20,8 +20,11 @@ package org.apache.curator.framework.imps; import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import java.util.zip.ZipException; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; @@ -172,4 +175,77 @@ public void testCreateCompressedAndUncompressed() throws Exception { CloseableUtils.closeQuietly(client); } } + + @Test + public void testGlobalCompression() throws Exception { + final String path1 = "/a"; + final String path2 = "/b"; + + final byte[] data1 = "here's a string".getBytes(); + final byte[] data2 = "here's another string".getBytes(); + final byte[] gzipedData1 = GzipCompressionProvider.doCompress(data1); + final byte[] gzipedData2 = GzipCompressionProvider.doCompress(data2); + + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .enableCompression() + .build(); + try { + client.start(); + + // Create the nodes in a transaction + // path1 is compressed (globally) + // path2 is uncompressed (override) + client.inTransaction() + .create() + .forPath(path1, data1) + .and() + .create() + .uncompressed() + .forPath(path2, data2) + .and() + .commit(); + + // Check they exist + assertNotNull(client.checkExists().forPath(path1)); + assertEquals(gzipedData1.length, client.checkExists().forPath(path1).getDataLength()); + assertNotNull(client.checkExists().forPath(path2)); + assertEquals(data2.length, client.checkExists().forPath(path2).getDataLength()); + assertArrayEquals(data1, client.getData().forPath(path1)); + assertArrayEquals(data1, client.getData().decompressed().forPath(path1)); + assertArrayEquals(gzipedData1, client.getData().undecompressed().forPath(path1)); + assertArrayEquals(data2, client.getData().undecompressed().forPath(path2)); + assertThrows( + ZipException.class, () -> client.getData().decompressed().forPath(path2)); + assertThrows(ZipException.class, () -> client.getData().forPath(path2)); + + // Set data in transaction + // path1 is uncompressed (override) + // path2 is compressed (globally) + client.inTransaction() + .setData() + .uncompressed() + .forPath(path1, data1) + .and() + .setData() + .forPath(path2, data2) + .and() + .commit(); + + assertNotNull(client.checkExists().forPath(path1)); + assertEquals(data1.length, client.checkExists().forPath(path1).getDataLength()); + assertNotNull(client.checkExists().forPath(path2)); + assertEquals(gzipedData2.length, client.checkExists().forPath(path2).getDataLength()); + assertArrayEquals(data1, client.getData().undecompressed().forPath(path1)); + assertThrows( + ZipException.class, () -> client.getData().decompressed().forPath(path1)); + assertThrows(ZipException.class, () -> client.getData().forPath(path1)); + assertArrayEquals(data2, client.getData().decompressed().forPath(path2)); + assertArrayEquals(data2, client.getData().forPath(path2)); + assertArrayEquals(gzipedData2, client.getData().undecompressed().forPath(path2)); + } finally { + CloseableUtils.closeQuietly(client); + } + } } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java index 1840e39b3..9c4c98d26 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java @@ -33,6 +33,14 @@ public interface AsyncGetDataBuilder extends AsyncPathable> { */ AsyncPathable> decompressed(); + /** + * Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + AsyncPathable> undecompressed(); + /** * Have the operation fill the provided stat object * @@ -50,4 +58,14 @@ public interface AsyncGetDataBuilder extends AsyncPathable> { * @return this */ AsyncPathable> decompressedStoringStatIn(Stat stat); + + /** + * Have the operation fill the provided stat object without the data being de-compressed + * + * @param stat the stat to have filled in + * @see #undecompressed() + * @see #storingStatIn(org.apache.zookeeper.data.Stat) + * @return this + */ + AsyncPathable> undecompressedStoringStatIn(Stat stat); } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java index 972ddcfaa..8208ff551 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java @@ -33,6 +33,14 @@ public interface AsyncSetDataBuilder extends AsyncPathAndBytesable> compressed(); + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + AsyncPathAndBytesable> uncompressed(); + /** * Cause the data to be compressed using the configured compression provider. * Only sets if the version matches. By default -1 is used @@ -43,6 +51,16 @@ public interface AsyncSetDataBuilder extends AsyncPathAndBytesable> compressedWithVersion(int version); + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled. Only sets if the version matches. By default -1 is used + * which matches all versions. + * + * @param version version + * @return this + */ + AsyncPathAndBytesable> uncompressedWithVersion(int version); + /** * Only sets if the version matches. By default -1 is used * which matches all versions. diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java index 1bb7de523..fc9bd77d8 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java @@ -51,6 +51,14 @@ public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable compressed(); + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + AsyncPathAndBytesable uncompressed(); + /** * Specify a TTL when mode is {@link org.apache.zookeeper.CreateMode#PERSISTENT_WITH_TTL} or * {@link org.apache.zookeeper.CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL}. If diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java index 37f45a1d2..82a754f79 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java @@ -40,6 +40,14 @@ public interface AsyncTransactionSetDataBuilder extends AsyncPathAndBytesable compressed(); + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + AsyncPathAndBytesable uncompressed(); + /** * Cause the data to be compressed using the configured compression provider. * Also changes the version number used. By default, -1 is used @@ -48,4 +56,13 @@ public interface AsyncTransactionSetDataBuilder extends AsyncPathAndBytesable withVersionCompressed(int version); + + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled. Also changes the version number used. By default, -1 is used + * + * @param version version to use + * @return this + */ + AsyncPathAndBytesable withVersionUncompressed(int version); } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java index 6214f9c1d..6b6a005fe 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java @@ -69,6 +69,11 @@ public enum CreateOption { */ compress, + /** + * Cause the data to be uncompressed + */ + uncompress, + /** * If the ZNode already exists, Curator will instead call setData() */ diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java index 5ab579934..6f9a6c045 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java @@ -167,7 +167,9 @@ private AsyncStage internalForPath(String path, byte[] data, boolean use || options.contains(CreateOption.createParentsAsContainers), options.contains(CreateOption.createParentsAsContainers), options.contains(CreateOption.doProtected), - options.contains(CreateOption.compress), + options.contains(CreateOption.compress) + ? true + : options.contains(CreateOption.uncompress) ? false : client.compressionEnabled(), options.contains(CreateOption.setDataIfExists), aclList, stat, diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java index a94d1052b..1498968a1 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java @@ -33,13 +33,14 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder { private final CuratorFrameworkImpl client; private final Filters filters; private final WatchMode watchMode; - private boolean decompressed = false; + private boolean decompressed; private Stat stat = null; AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode) { this.client = client; this.filters = filters; this.watchMode = watchMode; + this.decompressed = client.compressionEnabled(); } @Override @@ -48,6 +49,12 @@ public AsyncPathable> decompressed() { return this; } + @Override + public AsyncPathable> undecompressed() { + decompressed = false; + return this; + } + @Override public AsyncPathable> storingStatIn(Stat stat) { this.stat = stat; @@ -61,6 +68,13 @@ public AsyncPathable> decompressedStoringStatIn(Stat stat) { return this; } + @Override + public AsyncPathable> undecompressedStoringStatIn(Stat stat) { + decompressed = false; + this.stat = stat; + return this; + } + @Override public AsyncStage forPath(String path) { BuilderCommon common = new BuilderCommon<>(filters, watchMode, dataProc); diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java index af1452457..0de17249d 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java @@ -31,12 +31,13 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder { private final CuratorFrameworkImpl client; private final Filters filters; - private boolean compressed = false; + private boolean compressed; private int version = -1; AsyncSetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters) { this.client = client; this.filters = filters; + this.compressed = client.compressionEnabled(); } @Override @@ -55,6 +56,12 @@ public AsyncPathAndBytesable> compressed() { return this; } + @Override + public AsyncPathAndBytesable> uncompressed() { + compressed = false; + return this; + } + @Override public AsyncPathAndBytesable> compressedWithVersion(int version) { compressed = true; @@ -62,6 +69,13 @@ public AsyncPathAndBytesable> compressedWithVersion(int version return this; } + @Override + public AsyncPathAndBytesable> uncompressedWithVersion(int version) { + compressed = false; + this.version = version; + return this; + } + @Override public AsyncPathAndBytesable> withVersion(int version) { this.version = version; diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java index e9ccbf16d..1ef3d23d4 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java @@ -50,7 +50,7 @@ public AsyncTransactionCreateBuilder create() { return new AsyncTransactionCreateBuilder() { private List aclList = null; private CreateMode createMode = CreateMode.PERSISTENT; - private boolean compressed = false; + private boolean compressed = client.compressionEnabled(); private long ttl = -1; @Override @@ -71,6 +71,12 @@ public AsyncPathAndBytesable compressed() { return this; } + @Override + public AsyncPathAndBytesable uncompressed() { + compressed = false; + return this; + } + @Override public AsyncPathAndBytesable withTtl(long ttl) { this.ttl = ttl; @@ -107,8 +113,9 @@ private CuratorOp internalForPath(String path, byte[] data, boolean useData) { TransactionCreateBuilder2 builder1 = (ttl > 0) ? client.transactionOp().create().withTtl(ttl) : client.transactionOp().create(); - ACLPathAndBytesable builder2 = - compressed ? builder1.compressed().withMode(createMode) : builder1.withMode(createMode); + ACLPathAndBytesable builder2 = compressed + ? builder1.compressed().withMode(createMode) + : builder1.uncompressed().withMode(createMode); PathAndBytesable builder3 = builder2.withACL(aclList); try { return useData ? builder3.forPath(path, data) : builder3.forPath(path); @@ -145,7 +152,7 @@ public CuratorOp forPath(String path) { public AsyncTransactionSetDataBuilder setData() { return new AsyncTransactionSetDataBuilder() { private int version = -1; - private boolean compressed = false; + private boolean compressed = client.compressionEnabled(); @Override public AsyncPathAndBytesable withVersion(int version) { @@ -159,6 +166,12 @@ public AsyncPathAndBytesable compressed() { return this; } + @Override + public AsyncPathAndBytesable uncompressed() { + compressed = false; + return this; + } + @Override public AsyncPathAndBytesable withVersionCompressed(int version) { this.version = version; @@ -166,6 +179,13 @@ public AsyncPathAndBytesable withVersionCompressed(int version) { return this; } + @Override + public AsyncPathAndBytesable withVersionUncompressed(int version) { + this.version = version; + compressed = false; + return this; + } + @Override public CuratorOp forPath(String path, byte[] data) { return internalForPath(path, data, true); @@ -179,7 +199,8 @@ public CuratorOp forPath(String path) { private CuratorOp internalForPath(String path, byte[] data, boolean useData) { TransactionSetDataBuilder builder1 = client.transactionOp().setData(); - VersionPathAndBytesable builder2 = compressed ? builder1.compressed() : builder1; + VersionPathAndBytesable builder2 = + compressed ? builder1.compressed() : builder1.uncompressed(); PathAndBytesable builder3 = builder2.withVersion(version); try { return useData ? builder3.forPath(path, data) : builder3.forPath(path); diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index 01e95decc..c02c93127 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -213,8 +213,12 @@ public AsyncStage update(T item, int version) { try { byte[] bytes = modelSpec.serializer().serialize(item); AsyncSetDataBuilder dataBuilder = dslClient.setData(); - AsyncPathAndBytesable> next = - isCompressed() ? dataBuilder.compressedWithVersion(version) : dataBuilder.withVersion(version); + AsyncPathAndBytesable> next; + if (isCompressed()) { + next = dataBuilder.compressedWithVersion(version); + } else { + next = dataBuilder.uncompressedWithVersion(version); + } return next.forPath(resolveForSet(item), bytes); } catch (Exception e) { return ModelStage.exceptionally(e); @@ -352,11 +356,7 @@ public static boolean isCompressed(Set createOptions) { public CuratorOp createOp(T model) { return client.transactionOp() .create() - .withOptions( - modelSpec.createMode(), - fixAclList(modelSpec.aclList()), - modelSpec.createOptions().contains(CreateOption.compress), - modelSpec.ttl()) + .withOptions(modelSpec.createMode(), fixAclList(modelSpec.aclList()), isCompressed(), modelSpec.ttl()) .forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); } @@ -368,12 +368,13 @@ public CuratorOp updateOp(T model) { @Override public CuratorOp updateOp(T model, int version) { AsyncTransactionSetDataBuilder builder = client.transactionOp().setData(); + AsyncPathAndBytesable builder2; if (isCompressed()) { - return builder.withVersionCompressed(version) - .forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); + builder2 = builder.withVersionCompressed(version); + } else { + builder2 = builder.withVersionUncompressed(version); } - return builder.withVersion(version) - .forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); + return builder2.forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); } @Override @@ -408,14 +409,23 @@ public AsyncStage> inTransaction(List } private boolean isCompressed() { - return modelSpec.createOptions().contains(CreateOption.compress); + if (modelSpec.createOptions().contains(CreateOption.compress)) { + return true; + } else if (modelSpec.createOptions().contains(CreateOption.uncompress)) { + return false; + } else { + return client.unwrap().compressionEnabled(); + } } private ModelStage internalRead(Function, U> resolver, Stat storingStatIn) { Stat stat = (storingStatIn != null) ? storingStatIn : new Stat(); - AsyncPathable> next = isCompressed() - ? watchableClient.getData().decompressedStoringStatIn(stat) - : watchableClient.getData().storingStatIn(stat); + AsyncPathable> next; + if (isCompressed()) { + next = watchableClient.getData().decompressedStoringStatIn(stat); + } else { + next = watchableClient.getData().undecompressedStoringStatIn(stat); + } AsyncStage asyncStage = next.forPath(modelSpec.path().fullPath()); ModelStage modelStage = ModelStage.make(asyncStage.event()); asyncStage.whenComplete((value, e) -> { diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java index 1ee7d7334..78f26fa84 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/TestBasicOperations.java @@ -26,11 +26,13 @@ import static org.apache.zookeeper.CreateMode.PERSISTENT_SEQUENTIAL; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.IOException; import java.util.Arrays; +import java.util.Collections; import java.util.concurrent.CompletionStage; import java.util.concurrent.CountDownLatch; import org.apache.curator.framework.CuratorFramework; @@ -38,6 +40,7 @@ import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.async.api.CreateOption; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Watcher; import org.apache.zookeeper.data.Stat; @@ -200,4 +203,185 @@ public void testGetDataWithStat() { complete(client.getData().storingStatIn(stat).forPath("/test")); assertEquals(stat.getDataLength(), "hey".length()); } + + @Test + public void testCompression() { + // Test create + byte[] data = "hey".getBytes(); + complete(client.create().withOptions(Collections.singleton(compress)).forPath("/test", data)); + + Stat stat = new Stat(); + complete(client.getData().decompressedStoringStatIn(stat).forPath("/test"), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().storingStatIn(stat).forPath("/test"), (v, e) -> { + assertNull(e); + assertNotEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().undecompressedStoringStatIn(stat).forPath("/test"), (v, e) -> { + assertNull(e); + assertNotEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + + // Test setData + byte[] data2 = "hey2".getBytes(); + complete(client.setData().compressed().forPath("/test", data2)); + + stat = new Stat(); + complete(client.getData().decompressedStoringStatIn(stat).forPath("/test"), (v, e) -> { + assertNull(e); + assertEquals(data2.length, v.length); + }); + assertNotEquals(data2.length, stat.getDataLength()); + complete(client.getData().storingStatIn(stat).forPath("/test"), (v, e) -> { + assertNull(e); + assertNotEquals(data2.length, v.length); + }); + assertNotEquals(data2.length, stat.getDataLength()); + complete(client.getData().undecompressedStoringStatIn(stat).forPath("/test"), (v, e) -> { + assertNull(e); + assertNotEquals(data2.length, v.length); + }); + assertNotEquals(data2.length, stat.getDataLength()); + } + + @Test + public void testGlobalCompression() throws Exception { + try (CuratorFramework syncClient = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(timing.forSleepingABit().milliseconds())) + .enableCompression() + .build()) { + syncClient.start(); + AsyncCuratorFramework client = AsyncCuratorFramework.wrap(syncClient); + + Stat stat = new Stat(); + byte[] data = "hey".getBytes(); + byte[] data2 = "hey2".getBytes(); + String path = "/test"; + + // Test create with explicit compression + complete( + client.create().withOptions(Collections.singleton(compress)).forPath(path, data)); + + complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().storingStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertNotEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + + // Test create explicitly without compression + syncClient.delete().forPath(path); + complete(client.create() + .withOptions(Collections.singleton(CreateOption.uncompress)) + .forPath(path, data)); + + complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + assertEquals(data.length, stat.getDataLength()); + complete(client.getData().storingStatIn(stat).forPath(path), (v, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + assertEquals(data.length, stat.getDataLength()); + complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertEquals(data.length, stat.getDataLength()); + + // Test create with implicit (global) compression + syncClient.delete().forPath(path); + complete(client.create().forPath(path, data)); + + complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().storingStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertNotEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + + // Test setData with explicit compression + complete(client.setData().compressed().forPath(path, data)); + + complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().storingStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertNotEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + + // Test setData explicitly without compression + complete(client.setData().uncompressed().forPath(path, data)); + + complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + assertEquals(data.length, stat.getDataLength()); + complete(client.getData().storingStatIn(stat).forPath(path), (v, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + assertEquals(data.length, stat.getDataLength()); + complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertEquals(data.length, stat.getDataLength()); + + // Test setData with implicit (global) compression + complete(client.setData().forPath(path, data)); + + complete(client.getData().decompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().storingStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + complete(client.getData().undecompressedStoringStatIn(stat).forPath(path), (v, e) -> { + assertNull(e); + assertNotEquals(data.length, v.length); + }); + assertNotEquals(data.length, stat.getDataLength()); + } + } } diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java index 768c90128..967b04744 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFramework.java @@ -300,4 +300,442 @@ public List getAclForPath(String path) { assertTrue(e.getCause() instanceof KeeperException.NoAuthException); } } + + @Test + public void testCompressedCreateAndRead() throws Exception { + try (CuratorFramework compressedRawClient = + createRawClientBuilder().enableCompression().build()) { + compressedRawClient.start(); + AsyncCuratorFramework compressedAsync = AsyncCuratorFramework.wrap(compressedRawClient); + TestModel rawModel = new TestModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1)); + + // These should be compressed + ModeledFramework clientWithCompressedFramework = + ModeledFramework.wrap(compressedAsync, modelSpec); + ModeledFramework clientWithCompressedModel = ModeledFramework.wrap(async, compressedModelSpec); + + // These should be uncompressed + ModeledFramework client = ModeledFramework.wrap(async, modelSpec); + ModeledFramework clientWithUncompressedModel = + ModeledFramework.wrap(async, uncompressedModelSpec); + ModeledFramework clientWithCompressedFrameworkAndUncompressedModel = + ModeledFramework.wrap(compressedAsync, uncompressedModelSpec); + + // Create with compressedFramework, read with all other clients + complete(clientWithCompressedFramework.set(rawModel), (path, e) -> assertNull(e)); + complete(clientWithCompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete(client.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithCompressedFrameworkAndUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + + // Create with compressedModel, read with all other clients + clientWithCompressedModel.delete(); + complete(clientWithCompressedModel.set(rawModel), (path, e) -> assertNull(e)); + complete(clientWithCompressedFramework.read(), (model, e) -> assertEquals(model, rawModel)); + complete(client.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithCompressedFrameworkAndUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + + // Create with regular (implicitly uncompressed) client, read with all other clients + client.delete(); + complete(client.set(rawModel), (path, e) -> assertNull(e)); + complete(clientWithUncompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete( + clientWithCompressedFrameworkAndUncompressedModel.read(), + (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + + // Create with uncompressedModel, read with all other clients + clientWithUncompressedModel.delete(); + complete(clientWithUncompressedModel.set(rawModel), (path, e) -> assertNull(e)); + complete(client.read(), (model, e) -> assertEquals(model, rawModel)); + complete( + clientWithCompressedFrameworkAndUncompressedModel.read(), + (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + + // Create with compressedFramework overriden by an uncompressedModel, read with all other clients + clientWithCompressedFrameworkAndUncompressedModel.delete(); + complete(clientWithCompressedFrameworkAndUncompressedModel.set(rawModel), (path, e) -> assertNull(e)); + complete(client.read(), (model, e) -> assertEquals(model, rawModel)); + complete(clientWithUncompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + clientWithCompressedFrameworkAndUncompressedModel.delete(); + } + } + + @Test + public void testCompressedUpdateAndRead() throws Exception { + try (CuratorFramework compressedRawClient = + createRawClientBuilder().enableCompression().build()) { + compressedRawClient.start(); + AsyncCuratorFramework compressedAsync = AsyncCuratorFramework.wrap(compressedRawClient); + TestModel rawModel = new TestModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1)); + + // These should be compressed + ModeledFramework clientWithCompressedFramework = + ModeledFramework.wrap(compressedAsync, modelSpec); + ModeledFramework clientWithCompressedModel = ModeledFramework.wrap(async, compressedModelSpec); + + // These should be uncompressed + ModeledFramework client = ModeledFramework.wrap(async, modelSpec); + ModeledFramework clientWithUncompressedModel = + ModeledFramework.wrap(async, uncompressedModelSpec); + ModeledFramework clientWithCompressedFrameworkAndUncompressedModel = + ModeledFramework.wrap(compressedAsync, uncompressedModelSpec); + + // Create the node - so we can update in each command + complete(client.set(rawModel), (model, e) -> assertNull(e)); + + // Update with compressedFramework, read with all other clients + complete(clientWithCompressedFramework.update(rawModel), (stat, e) -> assertNull(e)); + complete(clientWithCompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete(client.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithCompressedFrameworkAndUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + + // Update with compressedModel, read with all other clients + complete(clientWithCompressedModel.update(rawModel), (stat, e) -> assertNull(e)); + complete(clientWithCompressedFramework.read(), (model, e) -> assertEquals(model, rawModel)); + complete(client.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithCompressedFrameworkAndUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + + // Update with regular (implicitly uncompressed) client, read with all other clients + complete(client.update(rawModel), (stat, e) -> assertNull(e)); + complete(clientWithUncompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete( + clientWithCompressedFrameworkAndUncompressedModel.read(), + (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + + // Update with uncompressedModel, read with all other clients + complete(clientWithUncompressedModel.update(rawModel), (stat, e) -> assertNull(e)); + complete(client.read(), (model, e) -> assertEquals(model, rawModel)); + complete( + clientWithCompressedFrameworkAndUncompressedModel.read(), + (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + + // Update with compressedFramework overriden by an uncompressedModel, read with all other clients + complete(clientWithCompressedFrameworkAndUncompressedModel.update(rawModel), (stat, e) -> assertNull(e)); + complete(client.read(), (model, e) -> assertEquals(model, rawModel)); + complete(clientWithUncompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + } + } + + @Test + public void testCompressedCreateOp() throws Exception { + try (CuratorFramework compressedRawClient = + createRawClientBuilder().enableCompression().build()) { + compressedRawClient.start(); + AsyncCuratorFramework compressedAsync = AsyncCuratorFramework.wrap(compressedRawClient); + TestModel rawModel = new TestModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1)); + + // These should be compressed + ModeledFramework clientWithCompressedFramework = + ModeledFramework.wrap(compressedAsync, modelSpec); + ModeledFramework clientWithCompressedModel = ModeledFramework.wrap(async, compressedModelSpec); + + // These should be uncompressed + ModeledFramework client = ModeledFramework.wrap(async, modelSpec); + ModeledFramework clientWithUncompressedModel = + ModeledFramework.wrap(async, uncompressedModelSpec); + ModeledFramework clientWithCompressedFrameworkAndUncompressedModel = + ModeledFramework.wrap(compressedAsync, uncompressedModelSpec); + + // Make sure the parent node(s) exist + rawClient + .create() + .creatingParentsIfNeeded() + .forPath(modelSpec.path().parent().fullPath()); + + // Create with compressedFramework, read with all other clients + complete( + clientWithCompressedFramework.inTransaction( + Collections.singletonList(clientWithCompressedFramework.createOp(rawModel))), + (results, e) -> assertNull(e)); + complete(clientWithCompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete(client.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithCompressedFrameworkAndUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + + // Create with compressedModel, read with all other clients + clientWithCompressedModel.delete(); + complete( + clientWithCompressedModel.inTransaction( + Collections.singletonList(clientWithCompressedModel.createOp(rawModel))), + (results, e) -> assertNull(e)); + complete(clientWithCompressedFramework.read(), (model, e) -> assertEquals(model, rawModel)); + complete(client.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithCompressedFrameworkAndUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + + // Create with regular (implicitly uncompressed) client, read with all other clients + client.delete(); + complete( + client.inTransaction(Collections.singletonList(client.createOp(rawModel))), + (results, e) -> assertNull(e)); + complete(clientWithUncompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete( + clientWithCompressedFrameworkAndUncompressedModel.read(), + (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + + // Create with uncompressedModel, read with all other clients + clientWithUncompressedModel.delete(); + complete( + clientWithUncompressedModel.inTransaction( + Collections.singletonList(clientWithUncompressedModel.createOp(rawModel))), + (results, e) -> assertNull(e)); + complete(client.read(), (model, e) -> assertEquals(model, rawModel)); + complete( + clientWithCompressedFrameworkAndUncompressedModel.read(), + (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + + // Create with compressedFramework overriden by an uncompressedModel, read with all other clients + clientWithCompressedFrameworkAndUncompressedModel.delete(); + complete( + clientWithCompressedFrameworkAndUncompressedModel.inTransaction(Collections.singletonList( + clientWithCompressedFrameworkAndUncompressedModel.createOp(rawModel))), + (results, e) -> assertNull(e)); + complete(client.read(), (model, e) -> assertEquals(model, rawModel)); + complete(clientWithUncompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + clientWithCompressedFrameworkAndUncompressedModel.delete(); + } + } + + @Test + public void testCompressedUpdateOp() throws Exception { + try (CuratorFramework compressedRawClient = + createRawClientBuilder().enableCompression().build()) { + compressedRawClient.start(); + AsyncCuratorFramework compressedAsync = AsyncCuratorFramework.wrap(compressedRawClient); + TestModel rawModel = new TestModel("John", "Galt", "1 Galt's Gulch", 42, BigInteger.valueOf(1)); + + // These should be compressed + ModeledFramework clientWithCompressedFramework = + ModeledFramework.wrap(compressedAsync, modelSpec); + ModeledFramework clientWithCompressedModel = ModeledFramework.wrap(async, compressedModelSpec); + + // These should be uncompressed + ModeledFramework client = ModeledFramework.wrap(async, modelSpec); + ModeledFramework clientWithUncompressedModel = + ModeledFramework.wrap(async, uncompressedModelSpec); + ModeledFramework clientWithCompressedFrameworkAndUncompressedModel = + ModeledFramework.wrap(compressedAsync, uncompressedModelSpec); + + // Create the node - so we can update in each command + complete(client.set(rawModel), (model, e) -> assertNull(e)); + + // Update with compressedFramework, read with all other clients + complete( + clientWithCompressedFramework.inTransaction( + Collections.singletonList(clientWithCompressedFramework.updateOp(rawModel))), + (results, e) -> assertNull(e)); + complete(clientWithCompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete(client.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithCompressedFrameworkAndUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + + // Update with compressedModel, read with all other clients + complete( + clientWithCompressedModel.inTransaction( + Collections.singletonList(clientWithCompressedModel.updateOp(rawModel))), + (results, e) -> assertNull(e)); + complete(clientWithCompressedFramework.read(), (model, e) -> assertEquals(model, rawModel)); + complete(client.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + complete(clientWithCompressedFrameworkAndUncompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(RuntimeException.class, e.getClass()); + }); + + // Update with regular (implicitly uncompressed) client, read with all other clients + complete( + client.inTransaction(Collections.singletonList(client.updateOp(rawModel))), + (results, e) -> assertNull(e)); + complete(clientWithUncompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete( + clientWithCompressedFrameworkAndUncompressedModel.read(), + (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + + // Update with uncompressedModel, read with all other clients + complete( + clientWithUncompressedModel.inTransaction( + Collections.singletonList(clientWithUncompressedModel.updateOp(rawModel))), + (results, e) -> assertNull(e)); + complete(client.read(), (model, e) -> assertEquals(model, rawModel)); + complete( + clientWithCompressedFrameworkAndUncompressedModel.read(), + (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + + // Update with compressedFramework overriden by an uncompressedModel, read with all other clients + complete( + clientWithCompressedFrameworkAndUncompressedModel.inTransaction(Collections.singletonList( + clientWithCompressedFrameworkAndUncompressedModel.updateOp(rawModel))), + (results, e) -> assertNull(e)); + complete(client.read(), (model, e) -> assertEquals(model, rawModel)); + complete(clientWithUncompressedModel.read(), (model, e) -> assertEquals(model, rawModel)); + complete(clientWithCompressedModel.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + complete(clientWithCompressedFramework.read(), (model, e) -> { + assertNotNull(e); + assertEquals(KeeperException.DataInconsistencyException.class, e.getClass()); + }); + } + } } diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java index 2941ef33a..a33a1d5ec 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/TestModeledFrameworkBase.java @@ -19,12 +19,14 @@ package org.apache.curator.x.async.modeled; +import java.util.Collections; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.CompletableBaseClassForTests; +import org.apache.curator.x.async.api.CreateOption; import org.apache.curator.x.async.modeled.models.TestModel; import org.apache.curator.x.async.modeled.models.TestNewerModel; import org.junit.jupiter.api.AfterEach; @@ -35,15 +37,24 @@ public class TestModeledFrameworkBase extends CompletableBaseClassForTests { protected CuratorFramework rawClient; protected ModelSpec modelSpec; protected ModelSpec newModelSpec; + protected ModelSpec compressedModelSpec; + protected ModelSpec uncompressedModelSpec; protected AsyncCuratorFramework async; + public CuratorFrameworkFactory.Builder createRawClientBuilder() { + return CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .sessionTimeoutMs(timing.session()) + .connectionTimeoutMs(timing.connection()); + } + @BeforeEach @Override public void setup() throws Exception { super.setup(); - rawClient = CuratorFrameworkFactory.newClient( - server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1)); + rawClient = createRawClientBuilder().build(); rawClient.start(); async = AsyncCuratorFramework.wrap(rawClient); @@ -52,6 +63,12 @@ public void setup() throws Exception { modelSpec = ModelSpec.builder(path, serializer).build(); newModelSpec = ModelSpec.builder(path, newSerializer).build(); + compressedModelSpec = ModelSpec.builder(path, serializer) + .withCreateOptions(Collections.singleton(CreateOption.compress)) + .build(); + uncompressedModelSpec = ModelSpec.builder(path, serializer) + .withCreateOptions(Collections.singleton(CreateOption.uncompress)) + .build(); } @AfterEach