Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Perf/auto batch flush #2938

Closed
wants to merge 16 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 135 additions & 0 deletions src/main/java/io/lettuce/core/AutoBatchFlushOptions.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package io.lettuce.core;

import java.io.Serializable;

import io.lettuce.core.internal.LettuceAssert;

/**
* Options for command timeouts. These options configure how and whether commands time out once they were dispatched. Command
* timeout begins:
* <ul>
* <li>When the command is sent successfully to the transport</li>
* <li>Queued while the connection was inactive</li>
* </ul>
*
* The timeout is canceled upon command completion/cancellation. Timeouts are not tied to a specific API and expire commands
* regardless of the synchronization method provided by the API that was used to enqueue the command.
*
* @author Mark Paluch
* @since 5.1
*/
public class AutoBatchFlushOptions implements Serializable {

public static final boolean DEFAULT_ENABLE_AUTO_BATCH_FLUSH = false;

public static final int DEFAULT_WRITE_SPIN_COUNT = 16;

public static final int DEFAULT_BATCH_SIZE = 8;

private final boolean enableAutoBatchFlush;

private final int writeSpinCount;

private final int batchSize;

public AutoBatchFlushOptions(AutoBatchFlushOptions.Builder builder) {
this.enableAutoBatchFlush = builder.enableAutoBatchFlush;
this.writeSpinCount = builder.writeSpinCount;
this.batchSize = builder.batchSize;
}

/**
* Returns a new {@link AutoBatchFlushOptions.Builder} to construct {@link AutoBatchFlushOptions}.
*/
public static Builder builder() {
return new Builder();
}

/**
* Create a new instance of {@link AutoBatchFlushOptions} with default settings.
*/
public static AutoBatchFlushOptions create() {
return builder().build();
}

/**
* Builder for {@link AutoBatchFlushOptions}.
*/
public static class Builder {

private boolean enableAutoBatchFlush = DEFAULT_ENABLE_AUTO_BATCH_FLUSH;

private int writeSpinCount = DEFAULT_WRITE_SPIN_COUNT;

private int batchSize = DEFAULT_BATCH_SIZE;

/**
* Enable auto batch flush.
*
* @param enableAutoBatchFlush {@code true} to enable auto batch flush.
* @return {@code this}
*/
public Builder enableAutoBatchFlush(boolean enableAutoBatchFlush) {
this.enableAutoBatchFlush = enableAutoBatchFlush;
return this;
}

/**
* how many times to spin batchPoll() from the task queue
*
* @param writeSpinCount the write spin count
* @return {@code this}
*/
public Builder writeSpinCount(int writeSpinCount) {
LettuceAssert.isPositive(writeSpinCount, "Batch size must be greater than 0");

this.writeSpinCount = writeSpinCount;
return this;
}

/**
* how many commands to batch in a single flush
*
* @param batchSize the batch size
* @return {@code this}
*/
public Builder batchSize(int batchSize) {
LettuceAssert.isPositive(batchSize, "Batch size must be greater than 0");

this.batchSize = batchSize;
return this;
}

/**
* Create a new instance of {@link AutoBatchFlushOptions}.
*
* @return new instance of {@link AutoBatchFlushOptions}
*/
public AutoBatchFlushOptions build() {
return new AutoBatchFlushOptions(this);
}

}

/**
* @return {@code true} if auto batch flush is enabled.
*/
public boolean isAutoBatchFlushEnabled() {
return enableAutoBatchFlush;
}

/**
* @return the write spin count
*/
public int getWriteSpinCount() {
return writeSpinCount;
}

/**
* @return the batch size
*/
public int getBatchSize() {
return batchSize;
}

}
31 changes: 26 additions & 5 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public class ClientOptions implements Serializable {

public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create();

public static final AutoBatchFlushOptions DEFAULT_AUTO_BATCH_FLUSH_OPTIONS = AutoBatchFlushOptions.create();

private final boolean autoReconnect;

private final boolean cancelCommandsOnReconnectFailure;
Expand Down Expand Up @@ -97,6 +99,8 @@ public class ClientOptions implements Serializable {

private final TimeoutOptions timeoutOptions;

private final AutoBatchFlushOptions autoBatchFlushOptions;

protected ClientOptions(Builder builder) {
this.autoReconnect = builder.autoReconnect;
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
Expand All @@ -112,6 +116,7 @@ protected ClientOptions(Builder builder) {
this.sslOptions = builder.sslOptions;
this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure;
this.timeoutOptions = builder.timeoutOptions;
this.autoBatchFlushOptions = builder.autoBatchFlushOptions;
}

protected ClientOptions(ClientOptions original) {
Expand All @@ -129,6 +134,7 @@ protected ClientOptions(ClientOptions original) {
this.sslOptions = original.getSslOptions();
this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure();
this.timeoutOptions = original.getTimeoutOptions();
this.autoBatchFlushOptions = original.getAutoBatchFlushOptions();
}

/**
Expand Down Expand Up @@ -192,6 +198,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private AutoBatchFlushOptions autoBatchFlushOptions = DEFAULT_AUTO_BATCH_FLUSH_OPTIONS;

protected Builder() {
}

Expand Down Expand Up @@ -247,8 +255,8 @@ public Builder bufferUsageRatio(int bufferUsageRatio) {
*
* @param policy the policy to use in {@link io.lettuce.core.protocol.CommandHandler}
* @return {@code this}
* @since 6.0
* @see DecodeBufferPolicies
* @since 6.0
*/
public Builder decodeBufferPolicy(DecodeBufferPolicy policy) {

Expand Down Expand Up @@ -295,8 +303,8 @@ public Builder pingBeforeActivateConnection(boolean pingBeforeActivateConnection
*
* @param protocolVersion version to use.
* @return {@code this}
* @since 6.0
* @see ProtocolVersion#newestSupported()
* @since 6.0
*/
public Builder protocolVersion(ProtocolVersion protocolVersion) {

Expand All @@ -315,9 +323,9 @@ public Builder protocolVersion(ProtocolVersion protocolVersion) {
*
* @param publishOnScheduler true/false
* @return {@code this}
* @since 5.2
* @see org.reactivestreams.Subscriber#onNext(Object)
* @see ClientResources#eventExecutorGroup()
* @since 5.2
*/
public Builder publishOnScheduler(boolean publishOnScheduler) {
this.publishOnScheduler = publishOnScheduler;
Expand Down Expand Up @@ -422,6 +430,17 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) {
return this;
}

/**
* Sets the {@link AutoBatchFlushOptions}
*
* @param autoBatchFlushOptions must not be {@code null}.
*/
public Builder autoBatchFlushOptions(AutoBatchFlushOptions autoBatchFlushOptions) {
LettuceAssert.notNull(autoBatchFlushOptions, "AutoBatchFlushOptions must not be null");
this.autoBatchFlushOptions = autoBatchFlushOptions;
return this;
}

/**
* Create a new instance of {@link ClientOptions}.
*
Expand All @@ -439,7 +458,6 @@ public ClientOptions build() {
*
* @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are replicated from the
* current {@link ClientOptions}.
*
* @since 5.1
*/
public ClientOptions.Builder mutate() {
Expand Down Expand Up @@ -498,7 +516,6 @@ public DecodeBufferPolicy getDecodeBufferPolicy() {
*
* @return zero.
* @since 5.2
*
* @deprecated since 6.0 in favor of {@link DecodeBufferPolicy}.
*/
@Deprecated
Expand Down Expand Up @@ -637,6 +654,10 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

public AutoBatchFlushOptions getAutoBatchFlushOptions() {
return autoBatchFlushOptions;
}

/**
* Behavior of connections in disconnected state.
*/
Expand Down
Loading