From 61f1e56a039060ffe5df398343b565e1691a945e Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Wed, 4 Dec 2024 15:37:39 -0600 Subject: [PATCH 1/4] CURATOR-725: Allow for global compression --- .../framework/CuratorFrameworkFactory.java | 17 ++++++ .../framework/imps/CreateBuilderImpl.java | 4 +- .../framework/imps/CuratorFrameworkImpl.java | 7 +++ .../framework/imps/GetDataBuilderImpl.java | 4 +- .../framework/imps/SetDataBuilderImpl.java | 4 +- .../imps/TempGetDataBuilderImpl.java | 2 +- .../framework/imps/TestCompression.java | 26 +++++++++ .../imps/TestCompressionInTransactionNew.java | 29 ++++++++++ .../imps/TestCompressionInTransactionOld.java | 55 +++++++++++++++++++ 9 files changed, 141 insertions(+), 7 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index d24b56d64..a5196a378 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -163,6 +163,7 @@ public static class Builder { private List authInfos = null; private byte[] defaultData = LOCAL_ADDRESS; private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER; + private boolean globalCompressionEnabled = false; private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY; private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly = false; @@ -367,6 +368,18 @@ public Builder compressionProvider(CompressionProvider compressionProvider) { return this; } + /** + * By default, each write or read call must explicitly use compression. + * Call this method to enable compression on all read and write calls. + *

+ * In order to implement filtered compression, use this option and a custom {@link CompressionProvider} that only compresses and decompresses the zNodes that match the desired filter. + * @return this + */ + public Builder enableGlobalCompression() { + this.globalCompressionEnabled = true; + return this; + } + /** * @param zookeeperFactory the zookeeper factory to use * @return this @@ -542,6 +555,10 @@ public CompressionProvider getCompressionProvider() { return compressionProvider; } + public boolean globalCompressionEnabled() { + return globalCompressionEnabled; + } + public ThreadFactory getThreadFactory() { return threadFactory; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index 5487f8560..97e2bfc0e 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -100,7 +100,7 @@ public class CreateBuilderImpl acling = new ACLing(client.getAclProvider()); createParentsIfNeeded = false; createParentsAsContainers = false; - compress = false; + compress = client.globalCompressionEnabled(); setDataIfExists = false; storingStat = null; ttl = -1; @@ -123,7 +123,7 @@ public CreateBuilderImpl( this.backgrounding = backgrounding; this.createParentsIfNeeded = createParentsIfNeeded; this.createParentsAsContainers = createParentsAsContainers; - this.compress = compress; + this.compress = client.globalCompressionEnabled() || compress; this.setDataIfExists = setDataIfExists; this.acling = new ACLing(client.getAclProvider(), aclList); this.storingStat = storingStat; diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index 816d0bda0..ca26b94f8 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -108,6 +108,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { private final FailedDeleteManager failedDeleteManager; private final FailedRemoveWatchManager failedRemoveWatcherManager; private final CompressionProvider compressionProvider; + private final boolean globalCompressionEnabled; private final ACLProvider aclProvider; private final NamespaceFacadeCache namespaceFacadeCache; private final boolean useContainerParentsIfAvailable; @@ -184,6 +185,7 @@ public void process(WatchedEvent watchedEvent) { builder.getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory()); compressionProvider = builder.getCompressionProvider(); + globalCompressionEnabled = builder.globalCompressionEnabled(); aclProvider = builder.getAclProvider(); state = new AtomicReference(CuratorFrameworkState.LATENT); useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable(); @@ -283,6 +285,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) { failedDeleteManager = parent.failedDeleteManager; failedRemoveWatcherManager = parent.failedRemoveWatcherManager; compressionProvider = parent.compressionProvider; + globalCompressionEnabled = parent.globalCompressionEnabled; aclProvider = parent.aclProvider; namespaceFacadeCache = parent.namespaceFacadeCache; namespace = parent.namespace; @@ -642,6 +645,10 @@ CompressionProvider getCompressionProvider() { return compressionProvider; } + boolean globalCompressionEnabled() { + return globalCompressionEnabled; + } + boolean useContainerParentsIfAvailable() { return useContainerParentsIfAvailable; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java index 7219cb518..a447397fa 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java @@ -46,7 +46,7 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation TransactionSetDataBuilder asTransactionSetDataBuilder( diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java index 63c9706c9..276b67332 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java @@ -35,7 +35,7 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder { TempGetDataBuilderImpl(CuratorFrameworkImpl client) { this.client = client; responseStat = null; - decompress = false; + decompress = client.globalCompressionEnabled(); } @Override diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java index 691b74374..b889b5ed9 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java @@ -97,6 +97,32 @@ public void testSetData() throws Exception { } } + @Test + public void testSetDataGlobalCompression() throws Exception { + final byte[] data = "here's a string".getBytes(); + + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .enableGlobalCompression() + .build(); + try { + client.start(); + + // Write without explicit compression, read with explicit compression + client.create().creatingParentsIfNeeded().forPath("/a/b/c", data); + assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c")); + assertNotEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength()); + + // Write with explicit compression, read without explicit compression + client.setData().compressed().forPath("/a/b/c", data); + assertEquals(data.length, client.getData().forPath("/a/b/c").length); + assertNotEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength()); + } finally { + CloseableUtils.closeQuietly(client); + } + } + @Test public void testSimple() throws Exception { final byte[] data = "here's a string".getBytes(); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java index 7d389e351..8b4b3764c 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java @@ -145,4 +145,33 @@ public void testCreateCompressedAndUncompressed() throws Exception { CloseableUtils.closeQuietly(client); } } + + @Test + public void testGlobalCompression() throws Exception { + final String path = "/a"; + final byte[] data = "here's a string".getBytes(); + + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .enableGlobalCompression() + .build(); + try { + client.start(); + + // Create compressed data in a transaction + CuratorOp op = client.transactionOp().create().forPath(path, data); + client.transaction().forOperations(op); + assertArrayEquals(data, client.getData().decompressed().forPath(path)); + assertNotEquals(data.length, client.checkExists().forPath(path).getDataLength()); + + // Set compressed data in transaction + op = client.transactionOp().setData().forPath(path, data); + client.transaction().forOperations(op); + assertArrayEquals(data, client.getData().decompressed().forPath(path)); + assertNotEquals(data.length, client.checkExists().forPath(path).getDataLength()); + } finally { + CloseableUtils.closeQuietly(client); + } + } } diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java index d347e20fe..77447c30d 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java @@ -172,4 +172,59 @@ public void testCreateCompressedAndUncompressed() throws Exception { CloseableUtils.closeQuietly(client); } } + + @Test + public void testGlobalCompression() throws Exception { + final String path1 = "/a"; + final String path2 = "/b"; + + final byte[] data1 = "here's a string".getBytes(); + final byte[] data2 = "here's another string".getBytes(); + + CuratorFramework client = CuratorFrameworkFactory.builder() + .connectString(server.getConnectString()) + .retryPolicy(new RetryOneTime(1)) + .enableGlobalCompression() + .build(); + try { + client.start(); + + // Create the nodes + client.inTransaction() + .create() + .forPath(path1, data1) + .and() + .create() + .forPath(path2, data2) + .and() + .commit(); + + // Check they exist + assertNotNull(client.checkExists().forPath(path1)); + assertNotEquals(data1.length, client.checkExists().forPath(path1).getDataLength()); + assertNotNull(client.checkExists().forPath(path2)); + assertNotEquals(data2.length, client.checkExists().forPath(path2).getDataLength()); + assertArrayEquals(data1, client.getData().decompressed().forPath(path1)); + assertArrayEquals(data2, client.getData().decompressed().forPath(path2)); + + // Set the nodes, path1 compressed, path2 uncompressed. + client.inTransaction() + .setData() + .forPath(path1, data2) + .and() + .setData() + .forPath(path2, data1) + .and() + .commit(); + + assertNotNull(client.checkExists().forPath(path1)); + assertNotEquals(data2.length, client.checkExists().forPath(path1).getDataLength()); + assertNotNull(client.checkExists().forPath(path2)); + assertNotEquals(data1.length, client.checkExists().forPath(path2).getDataLength()); + assertArrayEquals(data2, client.getData().decompressed().forPath(path1)); + assertArrayEquals(data1, client.getData().decompressed().forPath(path2)); + } finally { + CloseableUtils.closeQuietly(client); + } + } } From 88e0bb25ffe77cdc8c3201715bccee0c9b7e5378 Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 7 Jan 2025 11:48:36 -0600 Subject: [PATCH 2/4] Add uncompressed and undecompressed options. Fill in other APIs --- .../curator/framework/CuratorFramework.java | 7 +++ .../framework/CuratorFrameworkFactory.java | 12 ++--- .../curator/framework/api/Compressible.java | 8 +++ .../curator/framework/api/Decompressible.java | 8 +++ .../framework/imps/CreateBuilderImpl.java | 21 ++++++-- .../framework/imps/CuratorFrameworkImpl.java | 15 +++--- .../framework/imps/GetDataBuilderImpl.java | 21 +++++--- .../framework/imps/SetDataBuilderImpl.java | 23 +++++++-- .../imps/TempGetDataBuilderImpl.java | 9 +++- .../framework/imps/TestCompression.java | 2 +- .../imps/TestCompressionInTransactionNew.java | 2 +- .../imps/TestCompressionInTransactionOld.java | 2 +- .../x/async/api/AsyncGetDataBuilder.java | 18 +++++++ .../x/async/api/AsyncSetDataBuilder.java | 18 +++++++ .../api/AsyncTransactionCreateBuilder.java | 12 ++++- .../api/AsyncTransactionSetDataBuilder.java | 17 +++++++ .../curator/x/async/api/CreateOption.java | 5 ++ .../async/details/AsyncCreateBuilderImpl.java | 2 +- .../details/AsyncGetDataBuilderImpl.java | 16 +++++- .../details/AsyncSetDataBuilderImpl.java | 16 +++++- .../async/details/AsyncTransactionOpImpl.java | 35 ++++++++++--- .../modeled/details/ModeledFrameworkImpl.java | 49 ++++++++++++++----- 22 files changed, 264 insertions(+), 54 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java index 50f6e56ba..56b883388 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFramework.java @@ -345,6 +345,13 @@ public interface CuratorFramework extends Closeable { */ SchemaSet getSchemaSet(); + /** + * Return whether compression is enabled by default for all create, setData and getData operations. + * + * @return if compression is enabled + */ + boolean compressionEnabled(); + /** * Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is * done from the {@link #runSafe(Runnable)} thread. diff --git a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java index a5196a378..87f0e9f17 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/CuratorFrameworkFactory.java @@ -163,7 +163,7 @@ public static class Builder { private List authInfos = null; private byte[] defaultData = LOCAL_ADDRESS; private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER; - private boolean globalCompressionEnabled = false; + private boolean compressionEnabled = false; private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY; private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER; private boolean canBeReadOnly = false; @@ -370,13 +370,13 @@ public Builder compressionProvider(CompressionProvider compressionProvider) { /** * By default, each write or read call must explicitly use compression. - * Call this method to enable compression on all read and write calls. + * Call this method to enable compression by default on all read and write calls. *

* In order to implement filtered compression, use this option and a custom {@link CompressionProvider} that only compresses and decompresses the zNodes that match the desired filter. * @return this */ - public Builder enableGlobalCompression() { - this.globalCompressionEnabled = true; + public Builder enableCompression() { + this.compressionEnabled = true; return this; } @@ -555,8 +555,8 @@ public CompressionProvider getCompressionProvider() { return compressionProvider; } - public boolean globalCompressionEnabled() { - return globalCompressionEnabled; + public boolean compressionEnabled() { + return compressionEnabled; } public ThreadFactory getThreadFactory() { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java index 1aaca83c7..4d0feef32 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Compressible.java @@ -26,4 +26,12 @@ public interface Compressible { * @return this */ public T compressed(); + + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + public T uncompressed(); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java b/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java index b6e6bbba9..a60abb55c 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/api/Decompressible.java @@ -26,4 +26,12 @@ public interface Decompressible { * @return this */ public T decompressed(); + + /** + * Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + public T undecompressed(); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java index 97e2bfc0e..b76ce98f4 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CreateBuilderImpl.java @@ -100,7 +100,7 @@ public class CreateBuilderImpl acling = new ACLing(client.getAclProvider()); createParentsIfNeeded = false; createParentsAsContainers = false; - compress = client.globalCompressionEnabled(); + compress = client.compressionEnabled(); setDataIfExists = false; storingStat = null; ttl = -1; @@ -123,7 +123,7 @@ public CreateBuilderImpl( this.backgrounding = backgrounding; this.createParentsIfNeeded = createParentsIfNeeded; this.createParentsAsContainers = createParentsAsContainers; - this.compress = client.globalCompressionEnabled() || compress; + this.compress = compress; this.setDataIfExists = setDataIfExists; this.acling = new ACLing(client.getAclProvider(), aclList); this.storingStat = storingStat; @@ -193,6 +193,12 @@ public ACLCreateModePathAndBytesable compressed() { return this; } + @Override + public ACLCreateModePathAndBytesable uncompressed() { + CreateBuilderImpl.this.uncompressed(); + return this; + } + @Override public T forPath(String path) throws Exception { return forPath(path, client.getDefaultData()); @@ -216,7 +222,16 @@ public T forPath(String path, byte[] data) throws Exception { @Override public CreateBackgroundModeStatACLable compressed() { - compress = true; + return withCompression(true); + } + + @Override + public CreateBackgroundModeStatACLable uncompressed() { + return withCompression(false); + } + + private CreateBackgroundModeStatACLable withCompression(boolean compress) { + this.compress = compress; return new CreateBackgroundModeStatACLable() { @Override public CreateBackgroundModeACLable storingStatIn(Stat stat) { diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java index ca26b94f8..c36b120af 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorFrameworkImpl.java @@ -108,7 +108,7 @@ public class CuratorFrameworkImpl implements CuratorFramework { private final FailedDeleteManager failedDeleteManager; private final FailedRemoveWatchManager failedRemoveWatcherManager; private final CompressionProvider compressionProvider; - private final boolean globalCompressionEnabled; + private final boolean compressionEnabled; private final ACLProvider aclProvider; private final NamespaceFacadeCache namespaceFacadeCache; private final boolean useContainerParentsIfAvailable; @@ -185,7 +185,7 @@ public void process(WatchedEvent watchedEvent) { builder.getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory()); compressionProvider = builder.getCompressionProvider(); - globalCompressionEnabled = builder.globalCompressionEnabled(); + compressionEnabled = builder.compressionEnabled(); aclProvider = builder.getAclProvider(); state = new AtomicReference(CuratorFrameworkState.LATENT); useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable(); @@ -285,7 +285,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) { failedDeleteManager = parent.failedDeleteManager; failedRemoveWatcherManager = parent.failedRemoveWatcherManager; compressionProvider = parent.compressionProvider; - globalCompressionEnabled = parent.globalCompressionEnabled; + compressionEnabled = parent.compressionEnabled; aclProvider = parent.aclProvider; namespaceFacadeCache = parent.namespaceFacadeCache; namespace = parent.namespace; @@ -621,6 +621,11 @@ public SchemaSet getSchemaSet() { return schemaSet; } + @Override + public boolean compressionEnabled() { + return compressionEnabled; + } + ACLProvider getAclProvider() { return aclProvider; } @@ -645,10 +650,6 @@ CompressionProvider getCompressionProvider() { return compressionProvider; } - boolean globalCompressionEnabled() { - return globalCompressionEnabled; - } - boolean useContainerParentsIfAvailable() { return useContainerParentsIfAvailable; } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java index a447397fa..6f4c8fbbe 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java @@ -39,14 +39,14 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation inBackground() { @@ -78,7 +87,7 @@ public ErrorListenerPathable inBackground(BackgroundCallback callback, O @Override public ErrorListenerPathable inBackground( - BackgroundCallback callback, Object context, Executor executor) { + BackgroundCallback callback, Object context, Executor executor) { return GetDataBuilderImpl.this.inBackground(callback, context, executor); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java index 6629f7914..8534981ce 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java @@ -54,14 +54,14 @@ public class SetDataBuilderImpl this.client = client; backgrounding = new Backgrounding(); version = -1; - compress = client.globalCompressionEnabled(); + compress = client.compressionEnabled(); } public SetDataBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, int version, boolean compress) { this.client = client; this.backgrounding = backgrounding; this.version = version; - this.compress = client.globalCompressionEnabled() || compress; + this.compress = compress; } TransactionSetDataBuilder asTransactionSetDataBuilder( @@ -94,12 +94,27 @@ public VersionPathAndBytesable compressed() { compress = true; return this; } + + @Override + public VersionPathAndBytesable uncompressed() { + compress = false; + return this; + } }; } @Override public SetDataBackgroundVersionable compressed() { - compress = true; + return withCompression(true); + } + + @Override + public SetDataBackgroundVersionable uncompressed() { + return withCompression(false); + } + + public SetDataBackgroundVersionable withCompression(boolean compress) { + this.compress = compress; return new SetDataBackgroundVersionable() { @Override public ErrorListenerPathAndBytesable inBackground() { @@ -113,7 +128,7 @@ public ErrorListenerPathAndBytesable inBackground(BackgroundCallback callb @Override public ErrorListenerPathAndBytesable inBackground( - BackgroundCallback callback, Object context, Executor executor) { + BackgroundCallback callback, Object context, Executor executor) { return SetDataBuilderImpl.this.inBackground(callback, context, executor); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java index 276b67332..650d416f7 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java @@ -35,7 +35,7 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder { TempGetDataBuilderImpl(CuratorFrameworkImpl client) { this.client = client; responseStat = null; - decompress = client.globalCompressionEnabled(); + decompress = client.compressionEnabled(); } @Override @@ -44,6 +44,13 @@ public StatPathable decompressed() { return this; } + + @Override + public StatPathable undecompressed() { + decompress = false; + return this; + } + @Override public Pathable storingStatIn(Stat stat) { responseStat = stat; diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java index b889b5ed9..2af286ae4 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompression.java @@ -104,7 +104,7 @@ public void testSetDataGlobalCompression() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) - .enableGlobalCompression() + .enableCompression() .build(); try { client.start(); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java index 8b4b3764c..d8b2d0f42 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionNew.java @@ -154,7 +154,7 @@ public void testGlobalCompression() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) - .enableGlobalCompression() + .enableCompression() .build(); try { client.start(); diff --git a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java index 77447c30d..c8486f10d 100644 --- a/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java +++ b/curator-framework/src/test/java/org/apache/curator/framework/imps/TestCompressionInTransactionOld.java @@ -184,7 +184,7 @@ public void testGlobalCompression() throws Exception { CuratorFramework client = CuratorFrameworkFactory.builder() .connectString(server.getConnectString()) .retryPolicy(new RetryOneTime(1)) - .enableGlobalCompression() + .enableCompression() .build(); try { client.start(); diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java index 1840e39b3..9c4c98d26 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncGetDataBuilder.java @@ -33,6 +33,14 @@ public interface AsyncGetDataBuilder extends AsyncPathable> { */ AsyncPathable> decompressed(); + /** + * Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + AsyncPathable> undecompressed(); + /** * Have the operation fill the provided stat object * @@ -50,4 +58,14 @@ public interface AsyncGetDataBuilder extends AsyncPathable> { * @return this */ AsyncPathable> decompressedStoringStatIn(Stat stat); + + /** + * Have the operation fill the provided stat object without the data being de-compressed + * + * @param stat the stat to have filled in + * @see #undecompressed() + * @see #storingStatIn(org.apache.zookeeper.data.Stat) + * @return this + */ + AsyncPathable> undecompressedStoringStatIn(Stat stat); } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java index 972ddcfaa..8208ff551 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncSetDataBuilder.java @@ -33,6 +33,14 @@ public interface AsyncSetDataBuilder extends AsyncPathAndBytesable> compressed(); + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + AsyncPathAndBytesable> uncompressed(); + /** * Cause the data to be compressed using the configured compression provider. * Only sets if the version matches. By default -1 is used @@ -43,6 +51,16 @@ public interface AsyncSetDataBuilder extends AsyncPathAndBytesable> compressedWithVersion(int version); + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled. Only sets if the version matches. By default -1 is used + * which matches all versions. + * + * @param version version + * @return this + */ + AsyncPathAndBytesable> uncompressedWithVersion(int version); + /** * Only sets if the version matches. By default -1 is used * which matches all versions. diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java index 1bb7de523..674da31cb 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionCreateBuilder.java @@ -51,6 +51,14 @@ public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable compressed(); + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + AsyncPathAndBytesable uncompressed(); + /** * Specify a TTL when mode is {@link org.apache.zookeeper.CreateMode#PERSISTENT_WITH_TTL} or * {@link org.apache.zookeeper.CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL}. If @@ -74,7 +82,7 @@ public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable withOptions(CreateMode createMode, List aclList, boolean compressed); + AsyncPathAndBytesable withOptions(CreateMode createMode, List aclList, Boolean compressed); /** * Specify mode, acl list, compression and ttl @@ -89,5 +97,5 @@ public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable withOptions( - CreateMode createMode, List aclList, boolean compressed, long ttl); + CreateMode createMode, List aclList, Boolean compressed, long ttl); } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java index 37f45a1d2..82a754f79 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/AsyncTransactionSetDataBuilder.java @@ -40,6 +40,14 @@ public interface AsyncTransactionSetDataBuilder extends AsyncPathAndBytesable compressed(); + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled + * + * @return this + */ + AsyncPathAndBytesable uncompressed(); + /** * Cause the data to be compressed using the configured compression provider. * Also changes the version number used. By default, -1 is used @@ -48,4 +56,13 @@ public interface AsyncTransactionSetDataBuilder extends AsyncPathAndBytesable withVersionCompressed(int version); + + /** + * Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework} + * has compressionEnabled. Also changes the version number used. By default, -1 is used + * + * @param version version to use + * @return this + */ + AsyncPathAndBytesable withVersionUncompressed(int version); } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java b/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java index 6214f9c1d..6b6a005fe 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/api/CreateOption.java @@ -69,6 +69,11 @@ public enum CreateOption { */ compress, + /** + * Cause the data to be uncompressed + */ + uncompress, + /** * If the ZNode already exists, Curator will instead call setData() */ diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java index 5ab579934..1017e11f4 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java @@ -167,7 +167,7 @@ private AsyncStage internalForPath(String path, byte[] data, boolean use || options.contains(CreateOption.createParentsAsContainers), options.contains(CreateOption.createParentsAsContainers), options.contains(CreateOption.doProtected), - options.contains(CreateOption.compress), + options.contains(CreateOption.compress) ? true : options.contains(CreateOption.uncompress) ? false : client.compressionEnabled(), options.contains(CreateOption.setDataIfExists), aclList, stat, diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java index a94d1052b..1498968a1 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncGetDataBuilderImpl.java @@ -33,13 +33,14 @@ class AsyncGetDataBuilderImpl implements AsyncGetDataBuilder { private final CuratorFrameworkImpl client; private final Filters filters; private final WatchMode watchMode; - private boolean decompressed = false; + private boolean decompressed; private Stat stat = null; AsyncGetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters, WatchMode watchMode) { this.client = client; this.filters = filters; this.watchMode = watchMode; + this.decompressed = client.compressionEnabled(); } @Override @@ -48,6 +49,12 @@ public AsyncPathable> decompressed() { return this; } + @Override + public AsyncPathable> undecompressed() { + decompressed = false; + return this; + } + @Override public AsyncPathable> storingStatIn(Stat stat) { this.stat = stat; @@ -61,6 +68,13 @@ public AsyncPathable> decompressedStoringStatIn(Stat stat) { return this; } + @Override + public AsyncPathable> undecompressedStoringStatIn(Stat stat) { + decompressed = false; + this.stat = stat; + return this; + } + @Override public AsyncStage forPath(String path) { BuilderCommon common = new BuilderCommon<>(filters, watchMode, dataProc); diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java index af1452457..0de17249d 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncSetDataBuilderImpl.java @@ -31,12 +31,13 @@ class AsyncSetDataBuilderImpl implements AsyncSetDataBuilder { private final CuratorFrameworkImpl client; private final Filters filters; - private boolean compressed = false; + private boolean compressed; private int version = -1; AsyncSetDataBuilderImpl(CuratorFrameworkImpl client, Filters filters) { this.client = client; this.filters = filters; + this.compressed = client.compressionEnabled(); } @Override @@ -55,6 +56,12 @@ public AsyncPathAndBytesable> compressed() { return this; } + @Override + public AsyncPathAndBytesable> uncompressed() { + compressed = false; + return this; + } + @Override public AsyncPathAndBytesable> compressedWithVersion(int version) { compressed = true; @@ -62,6 +69,13 @@ public AsyncPathAndBytesable> compressedWithVersion(int version return this; } + @Override + public AsyncPathAndBytesable> uncompressedWithVersion(int version) { + compressed = false; + this.version = version; + return this; + } + @Override public AsyncPathAndBytesable> withVersion(int version) { this.version = version; diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java index e9ccbf16d..97e8c3187 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java @@ -50,7 +50,7 @@ public AsyncTransactionCreateBuilder create() { return new AsyncTransactionCreateBuilder() { private List aclList = null; private CreateMode createMode = CreateMode.PERSISTENT; - private boolean compressed = false; + private boolean compressed = client.compressionEnabled(); private long ttl = -1; @Override @@ -71,6 +71,12 @@ public AsyncPathAndBytesable compressed() { return this; } + @Override + public AsyncPathAndBytesable uncompressed() { + compressed = false; + return this; + } + @Override public AsyncPathAndBytesable withTtl(long ttl) { this.ttl = ttl; @@ -79,16 +85,20 @@ public AsyncPathAndBytesable withTtl(long ttl) { @Override public AsyncPathAndBytesable withOptions( - CreateMode createMode, List aclList, boolean compressed) { + CreateMode createMode, List aclList, Boolean compressed) { return withOptions(createMode, aclList, compressed, ttl); } @Override public AsyncPathAndBytesable withOptions( - CreateMode createMode, List aclList, boolean compressed, long ttl) { + CreateMode createMode, List aclList, Boolean compressed, long ttl) { this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); this.aclList = aclList; - this.compressed = compressed; + if (compressed == Boolean.TRUE) { + this.compressed = true; + } else if (compressed == Boolean.FALSE) { + this.compressed = false; + } this.ttl = ttl; return this; } @@ -108,7 +118,7 @@ private CuratorOp internalForPath(String path, byte[] data, boolean useData) { ? client.transactionOp().create().withTtl(ttl) : client.transactionOp().create(); ACLPathAndBytesable builder2 = - compressed ? builder1.compressed().withMode(createMode) : builder1.withMode(createMode); + compressed ? builder1.compressed().withMode(createMode) : builder1.uncompressed().withMode(createMode); PathAndBytesable builder3 = builder2.withACL(aclList); try { return useData ? builder3.forPath(path, data) : builder3.forPath(path); @@ -145,7 +155,7 @@ public CuratorOp forPath(String path) { public AsyncTransactionSetDataBuilder setData() { return new AsyncTransactionSetDataBuilder() { private int version = -1; - private boolean compressed = false; + private boolean compressed = client.compressionEnabled(); @Override public AsyncPathAndBytesable withVersion(int version) { @@ -159,6 +169,12 @@ public AsyncPathAndBytesable compressed() { return this; } + @Override + public AsyncPathAndBytesable uncompressed() { + compressed = false; + return this; + } + @Override public AsyncPathAndBytesable withVersionCompressed(int version) { this.version = version; @@ -166,6 +182,13 @@ public AsyncPathAndBytesable withVersionCompressed(int version) { return this; } + @Override + public AsyncPathAndBytesable withVersionUncompressed(int version) { + this.version = version; + compressed = false; + return this; + } + @Override public CuratorOp forPath(String path, byte[] data) { return internalForPath(path, data, true); diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index 01e95decc..d0a315eac 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -213,8 +213,15 @@ public AsyncStage update(T item, int version) { try { byte[] bytes = modelSpec.serializer().serialize(item); AsyncSetDataBuilder dataBuilder = dslClient.setData(); - AsyncPathAndBytesable> next = - isCompressed() ? dataBuilder.compressedWithVersion(version) : dataBuilder.withVersion(version); + AsyncPathAndBytesable> next; + Boolean isCompressed = isCompressed(); + if (isCompressed == Boolean.TRUE) { + next = dataBuilder.compressedWithVersion(version); + } else if (isCompressed == Boolean.FALSE) { + next = dataBuilder.uncompressedWithVersion(version); + } else { + next = dataBuilder.withVersion(version); + } return next.forPath(resolveForSet(item), bytes); } catch (Exception e) { return ModelStage.exceptionally(e); @@ -355,7 +362,7 @@ public CuratorOp createOp(T model) { .withOptions( modelSpec.createMode(), fixAclList(modelSpec.aclList()), - modelSpec.createOptions().contains(CreateOption.compress), + isCompressed(), modelSpec.ttl()) .forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); } @@ -368,12 +375,16 @@ public CuratorOp updateOp(T model) { @Override public CuratorOp updateOp(T model, int version) { AsyncTransactionSetDataBuilder builder = client.transactionOp().setData(); - if (isCompressed()) { - return builder.withVersionCompressed(version) - .forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); + Boolean isCompressed = isCompressed(); + AsyncPathAndBytesable builder2; + if (isCompressed == Boolean.TRUE) { + builder2 = builder.withVersionCompressed(version); + } else if (isCompressed == Boolean.FALSE) { + builder2 = builder.withVersionUncompressed(version); + } else { + builder2 = builder.withVersion(version); } - return builder.withVersion(version) - .forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); + return builder2.forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); } @Override @@ -407,15 +418,27 @@ public AsyncStage> inTransaction(List return client.transaction().forOperations(operations); } - private boolean isCompressed() { - return modelSpec.createOptions().contains(CreateOption.compress); + private Boolean isCompressed() { + if (modelSpec.createOptions().contains(CreateOption.compress)) { + return Boolean.TRUE; + } else if (modelSpec.createOptions().contains(CreateOption.uncompress)) { + return Boolean.FALSE; + } else { + return null; + } } private ModelStage internalRead(Function, U> resolver, Stat storingStatIn) { Stat stat = (storingStatIn != null) ? storingStatIn : new Stat(); - AsyncPathable> next = isCompressed() - ? watchableClient.getData().decompressedStoringStatIn(stat) - : watchableClient.getData().storingStatIn(stat); + AsyncPathable> next; + Boolean isCompressed = isCompressed(); + if (isCompressed == Boolean.TRUE) { + next = watchableClient.getData().decompressedStoringStatIn(stat); + } else if (isCompressed == Boolean.FALSE) { + next = watchableClient.getData().undecompressedStoringStatIn(stat); + } else { + next = watchableClient.getData().storingStatIn(stat); + } AsyncStage asyncStage = next.forPath(modelSpec.path().fullPath()); ModelStage modelStage = ModelStage.make(asyncStage.event()); asyncStage.whenComplete((value, e) -> { From 33be92527e41a3716e2893a779606d6c4de5738f Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Tue, 7 Jan 2025 12:40:02 -0600 Subject: [PATCH 3/4] Spotless --- .../apache/curator/framework/imps/GetDataBuilderImpl.java | 2 +- .../apache/curator/framework/imps/SetDataBuilderImpl.java | 2 +- .../curator/framework/imps/TempGetDataBuilderImpl.java | 1 - .../curator/x/async/details/AsyncCreateBuilderImpl.java | 4 +++- .../curator/x/async/details/AsyncTransactionOpImpl.java | 5 +++-- .../x/async/modeled/details/ModeledFrameworkImpl.java | 6 +----- 6 files changed, 9 insertions(+), 11 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java index 6f4c8fbbe..3468ce6be 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java @@ -87,7 +87,7 @@ public ErrorListenerPathable inBackground(BackgroundCallback callback, O @Override public ErrorListenerPathable inBackground( - BackgroundCallback callback, Object context, Executor executor) { + BackgroundCallback callback, Object context, Executor executor) { return GetDataBuilderImpl.this.inBackground(callback, context, executor); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java index 8534981ce..98b21e33a 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/SetDataBuilderImpl.java @@ -128,7 +128,7 @@ public ErrorListenerPathAndBytesable inBackground(BackgroundCallback callb @Override public ErrorListenerPathAndBytesable inBackground( - BackgroundCallback callback, Object context, Executor executor) { + BackgroundCallback callback, Object context, Executor executor) { return SetDataBuilderImpl.this.inBackground(callback, context, executor); } diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java index 650d416f7..fabf255dd 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/TempGetDataBuilderImpl.java @@ -44,7 +44,6 @@ public StatPathable decompressed() { return this; } - @Override public StatPathable undecompressed() { decompress = false; diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java index 1017e11f4..6f9a6c045 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncCreateBuilderImpl.java @@ -167,7 +167,9 @@ private AsyncStage internalForPath(String path, byte[] data, boolean use || options.contains(CreateOption.createParentsAsContainers), options.contains(CreateOption.createParentsAsContainers), options.contains(CreateOption.doProtected), - options.contains(CreateOption.compress) ? true : options.contains(CreateOption.uncompress) ? false : client.compressionEnabled(), + options.contains(CreateOption.compress) + ? true + : options.contains(CreateOption.uncompress) ? false : client.compressionEnabled(), options.contains(CreateOption.setDataIfExists), aclList, stat, diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java index 97e8c3187..7de1ff4e6 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java @@ -117,8 +117,9 @@ private CuratorOp internalForPath(String path, byte[] data, boolean useData) { TransactionCreateBuilder2 builder1 = (ttl > 0) ? client.transactionOp().create().withTtl(ttl) : client.transactionOp().create(); - ACLPathAndBytesable builder2 = - compressed ? builder1.compressed().withMode(createMode) : builder1.uncompressed().withMode(createMode); + ACLPathAndBytesable builder2 = compressed + ? builder1.compressed().withMode(createMode) + : builder1.uncompressed().withMode(createMode); PathAndBytesable builder3 = builder2.withACL(aclList); try { return useData ? builder3.forPath(path, data) : builder3.forPath(path); diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index d0a315eac..02a2bb39f 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -359,11 +359,7 @@ public static boolean isCompressed(Set createOptions) { public CuratorOp createOp(T model) { return client.transactionOp() .create() - .withOptions( - modelSpec.createMode(), - fixAclList(modelSpec.aclList()), - isCompressed(), - modelSpec.ttl()) + .withOptions(modelSpec.createMode(), fixAclList(modelSpec.aclList()), isCompressed(), modelSpec.ttl()) .forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); } From 7586f77e98e25187f0a29116c45d2d913cb5f0da Mon Sep 17 00:00:00 2001 From: Houston Putman Date: Wed, 8 Jan 2025 12:22:27 -0600 Subject: [PATCH 4/4] Remove Boolean classes. use client.unwrap() --- .../framework/imps/GetDataBuilderImpl.java | 6 ++-- .../api/AsyncTransactionCreateBuilder.java | 4 +-- .../async/details/AsyncTransactionOpImpl.java | 10 ++----- .../modeled/details/ModeledFrameworkImpl.java | 29 +++++++------------ 4 files changed, 18 insertions(+), 31 deletions(-) diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java index 3468ce6be..df6d15cbd 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/GetDataBuilderImpl.java @@ -39,7 +39,7 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation withOptions(CreateMode createMode, List aclList, Boolean compressed); + AsyncPathAndBytesable withOptions(CreateMode createMode, List aclList, boolean compressed); /** * Specify mode, acl list, compression and ttl @@ -97,5 +97,5 @@ public interface AsyncTransactionCreateBuilder extends AsyncPathAndBytesable withOptions( - CreateMode createMode, List aclList, Boolean compressed, long ttl); + CreateMode createMode, List aclList, boolean compressed, long ttl); } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java index 7de1ff4e6..6d1d44f43 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/details/AsyncTransactionOpImpl.java @@ -85,20 +85,16 @@ public AsyncPathAndBytesable withTtl(long ttl) { @Override public AsyncPathAndBytesable withOptions( - CreateMode createMode, List aclList, Boolean compressed) { + CreateMode createMode, List aclList, boolean compressed) { return withOptions(createMode, aclList, compressed, ttl); } @Override public AsyncPathAndBytesable withOptions( - CreateMode createMode, List aclList, Boolean compressed, long ttl) { + CreateMode createMode, List aclList, boolean compressed, long ttl) { this.createMode = Objects.requireNonNull(createMode, "createMode cannot be null"); this.aclList = aclList; - if (compressed == Boolean.TRUE) { - this.compressed = true; - } else if (compressed == Boolean.FALSE) { - this.compressed = false; - } + this.compressed = compressed; this.ttl = ttl; return this; } diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java index 02a2bb39f..c02c93127 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/details/ModeledFrameworkImpl.java @@ -214,13 +214,10 @@ public AsyncStage update(T item, int version) { byte[] bytes = modelSpec.serializer().serialize(item); AsyncSetDataBuilder dataBuilder = dslClient.setData(); AsyncPathAndBytesable> next; - Boolean isCompressed = isCompressed(); - if (isCompressed == Boolean.TRUE) { + if (isCompressed()) { next = dataBuilder.compressedWithVersion(version); - } else if (isCompressed == Boolean.FALSE) { - next = dataBuilder.uncompressedWithVersion(version); } else { - next = dataBuilder.withVersion(version); + next = dataBuilder.uncompressedWithVersion(version); } return next.forPath(resolveForSet(item), bytes); } catch (Exception e) { @@ -371,14 +368,11 @@ public CuratorOp updateOp(T model) { @Override public CuratorOp updateOp(T model, int version) { AsyncTransactionSetDataBuilder builder = client.transactionOp().setData(); - Boolean isCompressed = isCompressed(); AsyncPathAndBytesable builder2; - if (isCompressed == Boolean.TRUE) { + if (isCompressed()) { builder2 = builder.withVersionCompressed(version); - } else if (isCompressed == Boolean.FALSE) { - builder2 = builder.withVersionUncompressed(version); } else { - builder2 = builder.withVersion(version); + builder2 = builder.withVersionUncompressed(version); } return builder2.forPath(resolveForSet(model), modelSpec.serializer().serialize(model)); } @@ -414,26 +408,23 @@ public AsyncStage> inTransaction(List return client.transaction().forOperations(operations); } - private Boolean isCompressed() { + private boolean isCompressed() { if (modelSpec.createOptions().contains(CreateOption.compress)) { - return Boolean.TRUE; + return true; } else if (modelSpec.createOptions().contains(CreateOption.uncompress)) { - return Boolean.FALSE; + return false; } else { - return null; + return client.unwrap().compressionEnabled(); } } private ModelStage internalRead(Function, U> resolver, Stat storingStatIn) { Stat stat = (storingStatIn != null) ? storingStatIn : new Stat(); AsyncPathable> next; - Boolean isCompressed = isCompressed(); - if (isCompressed == Boolean.TRUE) { + if (isCompressed()) { next = watchableClient.getData().decompressedStoringStatIn(stat); - } else if (isCompressed == Boolean.FALSE) { - next = watchableClient.getData().undecompressedStoringStatIn(stat); } else { - next = watchableClient.getData().storingStatIn(stat); + next = watchableClient.getData().undecompressedStoringStatIn(stat); } AsyncStage asyncStage = next.forPath(modelSpec.path().fullPath()); ModelStage modelStage = ModelStage.make(asyncStage.event());