Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CURATOR-725: Allow for global compression #512

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -345,6 +345,13 @@ public interface CuratorFramework extends Closeable {
*/
SchemaSet getSchemaSet();

/**
* Return whether compression is enabled by default for all create, setData and getData operations.
*
* @return if compression is enabled
*/
boolean compressionEnabled();

/**
* Calls {@link #notifyAll()} on the given object after first synchronizing on it. This is
* done from the {@link #runSafe(Runnable)} thread.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ public static class Builder {
private List<AuthInfo> authInfos = null;
private byte[] defaultData = LOCAL_ADDRESS;
private CompressionProvider compressionProvider = DEFAULT_COMPRESSION_PROVIDER;
private boolean compressionEnabled = false;
private ZookeeperFactory zookeeperFactory = DEFAULT_ZOOKEEPER_FACTORY;
private ACLProvider aclProvider = DEFAULT_ACL_PROVIDER;
private boolean canBeReadOnly = false;
Expand Down Expand Up @@ -367,6 +368,18 @@ public Builder compressionProvider(CompressionProvider compressionProvider) {
return this;
}

/**
* By default, each write or read call must explicitly use compression.
* Call this method to enable compression by default on all read and write calls.
* <p>
* In order to implement filtered compression, use this option and a custom {@link CompressionProvider} that only compresses and decompresses the zNodes that match the desired filter.
* @return this
*/
public Builder enableCompression() {
this.compressionEnabled = true;
return this;
}

/**
* @param zookeeperFactory the zookeeper factory to use
* @return this
Expand Down Expand Up @@ -542,6 +555,10 @@ public CompressionProvider getCompressionProvider() {
return compressionProvider;
}

public boolean compressionEnabled() {
return compressionEnabled;
}

public ThreadFactory getThreadFactory() {
return threadFactory;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface Compressible<T> {
* @return this
*/
public T compressed();

/**
* Cause the data to be uncompressed, even if the {@link org.apache.curator.framework.CuratorFramework}
* has compressionEnabled
*
* @return this
*/
public T uncompressed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,12 @@ public interface Decompressible<T> {
* @return this
*/
public T decompressed();

/**
* Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework}
* has compressionEnabled
*
* @return this
*/
public T undecompressed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public class CreateBuilderImpl
acling = new ACLing(client.getAclProvider());
createParentsIfNeeded = false;
createParentsAsContainers = false;
compress = false;
compress = client.compressionEnabled();
setDataIfExists = false;
storingStat = null;
ttl = -1;
Expand Down Expand Up @@ -193,6 +193,12 @@ public ACLCreateModePathAndBytesable<T> compressed() {
return this;
}

@Override
public ACLCreateModePathAndBytesable<T> uncompressed() {
CreateBuilderImpl.this.uncompressed();
return this;
}

@Override
public T forPath(String path) throws Exception {
return forPath(path, client.getDefaultData());
Expand All @@ -216,7 +222,16 @@ public T forPath(String path, byte[] data) throws Exception {

@Override
public CreateBackgroundModeStatACLable compressed() {
compress = true;
return withCompression(true);
}

@Override
public CreateBackgroundModeStatACLable uncompressed() {
return withCompression(false);
}

private CreateBackgroundModeStatACLable withCompression(boolean compress) {
this.compress = compress;
return new CreateBackgroundModeStatACLable() {
@Override
public CreateBackgroundModeACLable storingStatIn(Stat stat) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ public class CuratorFrameworkImpl implements CuratorFramework {
private final FailedDeleteManager failedDeleteManager;
private final FailedRemoveWatchManager failedRemoveWatcherManager;
private final CompressionProvider compressionProvider;
private final boolean compressionEnabled;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final boolean useContainerParentsIfAvailable;
Expand Down Expand Up @@ -184,6 +185,7 @@ public void process(WatchedEvent watchedEvent) {
builder.getSimulatedSessionExpirationPercent(),
builder.getConnectionStateListenerManagerFactory());
compressionProvider = builder.getCompressionProvider();
compressionEnabled = builder.compressionEnabled();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
Expand Down Expand Up @@ -283,6 +285,7 @@ protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) {
failedDeleteManager = parent.failedDeleteManager;
failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
compressionProvider = parent.compressionProvider;
compressionEnabled = parent.compressionEnabled;
aclProvider = parent.aclProvider;
namespaceFacadeCache = parent.namespaceFacadeCache;
namespace = parent.namespace;
Expand Down Expand Up @@ -618,6 +621,11 @@ public SchemaSet getSchemaSet() {
return schemaSet;
}

@Override
public boolean compressionEnabled() {
return compressionEnabled;
}

ACLProvider getAclProvider() {
return aclProvider;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class GetDataBuilderImpl implements GetDataBuilder, BackgroundOperation<S
responseStat = null;
watching = new Watching(client);
backgrounding = new Backgrounding();
decompress = false;
decompress = client.compressionEnabled();
}

public GetDataBuilderImpl(
Expand All @@ -64,7 +64,16 @@ public GetDataBuilderImpl(

@Override
public GetDataWatchBackgroundStatable decompressed() {
decompress = true;
return withDecompression(true);
}

@Override
public GetDataWatchBackgroundStatable undecompressed() {
return withDecompression(false);
}

private GetDataWatchBackgroundStatable withDecompression(boolean decompress) {
this.decompress = decompress;
return new GetDataWatchBackgroundStatable() {
@Override
public ErrorListenerPathable<byte[]> inBackground() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class SetDataBuilderImpl
this.client = client;
backgrounding = new Backgrounding();
version = -1;
compress = false;
compress = client.compressionEnabled();
}

public SetDataBuilderImpl(CuratorFrameworkImpl client, Backgrounding backgrounding, int version, boolean compress) {
Expand Down Expand Up @@ -94,12 +94,27 @@ public VersionPathAndBytesable<T> compressed() {
compress = true;
return this;
}

@Override
public VersionPathAndBytesable<T> uncompressed() {
compress = false;
return this;
}
};
}

@Override
public SetDataBackgroundVersionable compressed() {
compress = true;
return withCompression(true);
}

@Override
public SetDataBackgroundVersionable uncompressed() {
return withCompression(false);
}

public SetDataBackgroundVersionable withCompression(boolean compress) {
this.compress = compress;
return new SetDataBackgroundVersionable() {
@Override
public ErrorListenerPathAndBytesable<Stat> inBackground() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class TempGetDataBuilderImpl implements TempGetDataBuilder {
TempGetDataBuilderImpl(CuratorFrameworkImpl client) {
this.client = client;
responseStat = null;
decompress = false;
decompress = client.compressionEnabled();
}

@Override
Expand All @@ -44,6 +44,12 @@ public StatPathable<byte[]> decompressed() {
return this;
}

@Override
public StatPathable<byte[]> undecompressed() {
decompress = false;
return this;
}

@Override
public Pathable<byte[]> storingStatIn(Stat stat) {
responseStat = stat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,32 @@ public void testSetData() throws Exception {
}
}

@Test
public void testSetDataGlobalCompression() throws Exception {
final byte[] data = "here's a string".getBytes();

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.enableCompression()
.build();
try {
client.start();

// Write without explicit compression, read with explicit compression
client.create().creatingParentsIfNeeded().forPath("/a/b/c", data);
assertArrayEquals(data, client.getData().decompressed().forPath("/a/b/c"));
assertNotEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());

// Write with explicit compression, read without explicit compression
client.setData().compressed().forPath("/a/b/c", data);
assertEquals(data.length, client.getData().forPath("/a/b/c").length);
assertNotEquals(data.length, client.checkExists().forPath("/a/b/c").getDataLength());
} finally {
CloseableUtils.closeQuietly(client);
}
}

@Test
public void testSimple() throws Exception {
final byte[] data = "here's a string".getBytes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,4 +145,33 @@ public void testCreateCompressedAndUncompressed() throws Exception {
CloseableUtils.closeQuietly(client);
}
}

@Test
public void testGlobalCompression() throws Exception {
final String path = "/a";
final byte[] data = "here's a string".getBytes();

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.enableCompression()
.build();
try {
client.start();

// Create compressed data in a transaction
CuratorOp op = client.transactionOp().create().forPath(path, data);
client.transaction().forOperations(op);
assertArrayEquals(data, client.getData().decompressed().forPath(path));
assertNotEquals(data.length, client.checkExists().forPath(path).getDataLength());

// Set compressed data in transaction
op = client.transactionOp().setData().forPath(path, data);
client.transaction().forOperations(op);
assertArrayEquals(data, client.getData().decompressed().forPath(path));
assertNotEquals(data.length, client.checkExists().forPath(path).getDataLength());
} finally {
CloseableUtils.closeQuietly(client);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -172,4 +172,59 @@ public void testCreateCompressedAndUncompressed() throws Exception {
CloseableUtils.closeQuietly(client);
}
}

@Test
public void testGlobalCompression() throws Exception {
final String path1 = "/a";
final String path2 = "/b";

final byte[] data1 = "here's a string".getBytes();
final byte[] data2 = "here's another string".getBytes();

CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(server.getConnectString())
.retryPolicy(new RetryOneTime(1))
.enableCompression()
.build();
try {
client.start();

// Create the nodes
client.inTransaction()
.create()
.forPath(path1, data1)
.and()
.create()
.forPath(path2, data2)
.and()
.commit();

// Check they exist
assertNotNull(client.checkExists().forPath(path1));
assertNotEquals(data1.length, client.checkExists().forPath(path1).getDataLength());
assertNotNull(client.checkExists().forPath(path2));
assertNotEquals(data2.length, client.checkExists().forPath(path2).getDataLength());
assertArrayEquals(data1, client.getData().decompressed().forPath(path1));
assertArrayEquals(data2, client.getData().decompressed().forPath(path2));

// Set the nodes, path1 compressed, path2 uncompressed.
client.inTransaction()
.setData()
.forPath(path1, data2)
.and()
.setData()
.forPath(path2, data1)
Copy link
Member

@kezhuw kezhuw Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
.forPath(path2, data1)
.uncompressed()
.forPath(path2, data1)

Also, I suggest to introduce new variables newData1/newData2 or reassign data1/data2. The following code is not friendly to eyeballs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A separate test method for this case is also good from my side just like testSimple and testCreateCompressedAndUncompressed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code and comment diverge here.

.and()
.commit();

assertNotNull(client.checkExists().forPath(path1));
assertNotEquals(data2.length, client.checkExists().forPath(path1).getDataLength());
assertNotNull(client.checkExists().forPath(path2));
assertNotEquals(data1.length, client.checkExists().forPath(path2).getDataLength());
assertArrayEquals(data2, client.getData().decompressed().forPath(path1));
assertArrayEquals(data1, client.getData().decompressed().forPath(path2));
} finally {
CloseableUtils.closeQuietly(client);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ public interface AsyncGetDataBuilder extends AsyncPathable<AsyncStage<byte[]>> {
*/
AsyncPathable<AsyncStage<byte[]>> decompressed();

/**
* Cause the data to not be de-compressed, even if the {@link org.apache.curator.framework.CuratorFramework}
* has compressionEnabled
*
* @return this
*/
AsyncPathable<AsyncStage<byte[]>> undecompressed();

/**
* Have the operation fill the provided stat object
*
Expand All @@ -50,4 +58,14 @@ public interface AsyncGetDataBuilder extends AsyncPathable<AsyncStage<byte[]>> {
* @return this
*/
AsyncPathable<AsyncStage<byte[]>> decompressedStoringStatIn(Stat stat);

/**
* Have the operation fill the provided stat object without the data being de-compressed
*
* @param stat the stat to have filled in
* @see #undecompressed()
* @see #storingStatIn(org.apache.zookeeper.data.Stat)
* @return this
*/
AsyncPathable<AsyncStage<byte[]>> undecompressedStoringStatIn(Stat stat);
}
Loading
Loading