diff --git a/src/main/java/io/lettuce/core/AutoBatchFlushOptions.java b/src/main/java/io/lettuce/core/AutoBatchFlushOptions.java new file mode 100644 index 000000000..7c61c1d04 --- /dev/null +++ b/src/main/java/io/lettuce/core/AutoBatchFlushOptions.java @@ -0,0 +1,135 @@ +package io.lettuce.core; + +import java.io.Serializable; + +import io.lettuce.core.internal.LettuceAssert; + +/** + * Options for command timeouts. These options configure how and whether commands time out once they were dispatched. Command + * timeout begins: + * + * + * The timeout is canceled upon command completion/cancellation. Timeouts are not tied to a specific API and expire commands + * regardless of the synchronization method provided by the API that was used to enqueue the command. + * + * @author Mark Paluch + * @since 5.1 + */ +public class AutoBatchFlushOptions implements Serializable { + + public static final boolean DEFAULT_ENABLE_AUTO_BATCH_FLUSH = false; + + public static final int DEFAULT_WRITE_SPIN_COUNT = 16; + + public static final int DEFAULT_BATCH_SIZE = 8; + + private final boolean enableAutoBatchFlush; + + private final int writeSpinCount; + + private final int batchSize; + + public AutoBatchFlushOptions(AutoBatchFlushOptions.Builder builder) { + this.enableAutoBatchFlush = builder.enableAutoBatchFlush; + this.writeSpinCount = builder.writeSpinCount; + this.batchSize = builder.batchSize; + } + + /** + * Returns a new {@link AutoBatchFlushOptions.Builder} to construct {@link AutoBatchFlushOptions}. + */ + public static Builder builder() { + return new Builder(); + } + + /** + * Create a new instance of {@link AutoBatchFlushOptions} with default settings. + */ + public static AutoBatchFlushOptions create() { + return builder().build(); + } + + /** + * Builder for {@link AutoBatchFlushOptions}. + */ + public static class Builder { + + private boolean enableAutoBatchFlush = DEFAULT_ENABLE_AUTO_BATCH_FLUSH; + + private int writeSpinCount = DEFAULT_WRITE_SPIN_COUNT; + + private int batchSize = DEFAULT_BATCH_SIZE; + + /** + * Enable auto batch flush. + * + * @param enableAutoBatchFlush {@code true} to enable auto batch flush. + * @return {@code this} + */ + public Builder enableAutoBatchFlush(boolean enableAutoBatchFlush) { + this.enableAutoBatchFlush = enableAutoBatchFlush; + return this; + } + + /** + * how many times to spin batchPoll() from the task queue + * + * @param writeSpinCount the write spin count + * @return {@code this} + */ + public Builder writeSpinCount(int writeSpinCount) { + LettuceAssert.isPositive(writeSpinCount, "Batch size must be greater than 0"); + + this.writeSpinCount = writeSpinCount; + return this; + } + + /** + * how many commands to batch in a single flush + * + * @param batchSize the batch size + * @return {@code this} + */ + public Builder batchSize(int batchSize) { + LettuceAssert.isPositive(batchSize, "Batch size must be greater than 0"); + + this.batchSize = batchSize; + return this; + } + + /** + * Create a new instance of {@link AutoBatchFlushOptions}. + * + * @return new instance of {@link AutoBatchFlushOptions} + */ + public AutoBatchFlushOptions build() { + return new AutoBatchFlushOptions(this); + } + + } + + /** + * @return {@code true} if auto batch flush is enabled. + */ + public boolean isAutoBatchFlushEnabled() { + return enableAutoBatchFlush; + } + + /** + * @return the write spin count + */ + public int getWriteSpinCount() { + return writeSpinCount; + } + + /** + * @return the batch size + */ + public int getBatchSize() { + return batchSize; + } + +} diff --git a/src/main/java/io/lettuce/core/ClientOptions.java b/src/main/java/io/lettuce/core/ClientOptions.java index aa3d2ba18..2b45c4f23 100644 --- a/src/main/java/io/lettuce/core/ClientOptions.java +++ b/src/main/java/io/lettuce/core/ClientOptions.java @@ -69,6 +69,8 @@ public class ClientOptions implements Serializable { public static final TimeoutOptions DEFAULT_TIMEOUT_OPTIONS = TimeoutOptions.create(); + public static final AutoBatchFlushOptions DEFAULT_AUTO_BATCH_FLUSH_OPTIONS = AutoBatchFlushOptions.create(); + private final boolean autoReconnect; private final boolean cancelCommandsOnReconnectFailure; @@ -97,6 +99,8 @@ public class ClientOptions implements Serializable { private final TimeoutOptions timeoutOptions; + private final AutoBatchFlushOptions autoBatchFlushOptions; + protected ClientOptions(Builder builder) { this.autoReconnect = builder.autoReconnect; this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure; @@ -112,6 +116,7 @@ protected ClientOptions(Builder builder) { this.sslOptions = builder.sslOptions; this.suspendReconnectOnProtocolFailure = builder.suspendReconnectOnProtocolFailure; this.timeoutOptions = builder.timeoutOptions; + this.autoBatchFlushOptions = builder.autoBatchFlushOptions; } protected ClientOptions(ClientOptions original) { @@ -129,6 +134,7 @@ protected ClientOptions(ClientOptions original) { this.sslOptions = original.getSslOptions(); this.suspendReconnectOnProtocolFailure = original.isSuspendReconnectOnProtocolFailure(); this.timeoutOptions = original.getTimeoutOptions(); + this.autoBatchFlushOptions = original.getAutoBatchFlushOptions(); } /** @@ -192,6 +198,8 @@ public static class Builder { private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS; + private AutoBatchFlushOptions autoBatchFlushOptions = DEFAULT_AUTO_BATCH_FLUSH_OPTIONS; + protected Builder() { } @@ -247,8 +255,8 @@ public Builder bufferUsageRatio(int bufferUsageRatio) { * * @param policy the policy to use in {@link io.lettuce.core.protocol.CommandHandler} * @return {@code this} - * @since 6.0 * @see DecodeBufferPolicies + * @since 6.0 */ public Builder decodeBufferPolicy(DecodeBufferPolicy policy) { @@ -295,8 +303,8 @@ public Builder pingBeforeActivateConnection(boolean pingBeforeActivateConnection * * @param protocolVersion version to use. * @return {@code this} - * @since 6.0 * @see ProtocolVersion#newestSupported() + * @since 6.0 */ public Builder protocolVersion(ProtocolVersion protocolVersion) { @@ -315,9 +323,9 @@ public Builder protocolVersion(ProtocolVersion protocolVersion) { * * @param publishOnScheduler true/false * @return {@code this} - * @since 5.2 * @see org.reactivestreams.Subscriber#onNext(Object) * @see ClientResources#eventExecutorGroup() + * @since 5.2 */ public Builder publishOnScheduler(boolean publishOnScheduler) { this.publishOnScheduler = publishOnScheduler; @@ -422,6 +430,17 @@ public Builder timeoutOptions(TimeoutOptions timeoutOptions) { return this; } + /** + * Sets the {@link AutoBatchFlushOptions} + * + * @param autoBatchFlushOptions must not be {@code null}. + */ + public Builder autoBatchFlushOptions(AutoBatchFlushOptions autoBatchFlushOptions) { + LettuceAssert.notNull(autoBatchFlushOptions, "AutoBatchFlushOptions must not be null"); + this.autoBatchFlushOptions = autoBatchFlushOptions; + return this; + } + /** * Create a new instance of {@link ClientOptions}. * @@ -439,7 +458,6 @@ public ClientOptions build() { * * @return a {@link ClientOptions.Builder} to create new {@link ClientOptions} whose settings are replicated from the * current {@link ClientOptions}. - * * @since 5.1 */ public ClientOptions.Builder mutate() { @@ -498,7 +516,6 @@ public DecodeBufferPolicy getDecodeBufferPolicy() { * * @return zero. * @since 5.2 - * * @deprecated since 6.0 in favor of {@link DecodeBufferPolicy}. */ @Deprecated @@ -637,6 +654,10 @@ public TimeoutOptions getTimeoutOptions() { return timeoutOptions; } + public AutoBatchFlushOptions getAutoBatchFlushOptions() { + return autoBatchFlushOptions; + } + /** * Behavior of connections in disconnected state. */ diff --git a/src/main/java/io/lettuce/core/ContextualChannel.java b/src/main/java/io/lettuce/core/ContextualChannel.java new file mode 100644 index 000000000..188698e0c --- /dev/null +++ b/src/main/java/io/lettuce/core/ContextualChannel.java @@ -0,0 +1,258 @@ +package io.lettuce.core; + +import java.net.SocketAddress; + +import javax.annotation.Nonnull; + +import io.lettuce.core.context.ConnectionContext; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelConfig; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelMetadata; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelProgressivePromise; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoop; +import io.netty.util.Attribute; +import io.netty.util.AttributeKey; + +/** + * @author chenxiaofan + */ +public class ContextualChannel implements Channel { + + private final Channel delegate; + + public final ConnectionContext context; + + public ConnectionContext getContext() { + return context; + } + + public Channel getDelegate() { + return delegate; + } + + public ContextualChannel(Channel delegate, ConnectionContext.State initialState) { + this.delegate = delegate; + context = new ConnectionContext(initialState); + } + + @Override + public ChannelId id() { + return delegate.id(); + } + + @Override + public EventLoop eventLoop() { + return delegate.eventLoop(); + } + + @Override + public Channel parent() { + return delegate.parent(); + } + + @Override + public ChannelConfig config() { + return delegate.config(); + } + + @Override + public boolean isOpen() { + return delegate.isOpen(); + } + + @Override + public boolean isRegistered() { + return delegate.isRegistered(); + } + + @Override + public boolean isActive() { + return delegate.isActive(); + } + + @Override + public ChannelMetadata metadata() { + return delegate.metadata(); + } + + @Override + public SocketAddress localAddress() { + return delegate.localAddress(); + } + + @Override + public SocketAddress remoteAddress() { + return delegate.remoteAddress(); + } + + @Override + public ChannelFuture closeFuture() { + return delegate.closeFuture(); + } + + @Override + public boolean isWritable() { + return delegate.isWritable(); + } + + @Override + public long bytesBeforeUnwritable() { + return delegate.bytesBeforeUnwritable(); + } + + @Override + public long bytesBeforeWritable() { + return delegate.bytesBeforeWritable(); + } + + @Override + public Unsafe unsafe() { + return delegate.unsafe(); + } + + @Override + public ChannelPipeline pipeline() { + return delegate.pipeline(); + } + + @Override + public ByteBufAllocator alloc() { + return delegate.alloc(); + } + + @Override + public Channel read() { + return delegate.read(); + } + + @Override + public Channel flush() { + return delegate.flush(); + } + + @Override + public ChannelFuture write(Object o) { + return delegate.write(o); + } + + @Override + public ChannelFuture write(Object o, ChannelPromise channelPromise) { + return delegate.write(o, channelPromise); + } + + @Override + public ChannelFuture writeAndFlush(Object o, ChannelPromise channelPromise) { + return delegate.writeAndFlush(o, channelPromise); + } + + @Override + public ChannelFuture writeAndFlush(Object o) { + return delegate.writeAndFlush(o); + } + + @Override + public ChannelPromise newPromise() { + return delegate.newPromise(); + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return delegate.newProgressivePromise(); + } + + @Override + public ChannelFuture newSucceededFuture() { + return delegate.newSucceededFuture(); + } + + @Override + public ChannelFuture newFailedFuture(Throwable throwable) { + return delegate.newFailedFuture(throwable); + } + + @Override + public ChannelPromise voidPromise() { + return delegate.voidPromise(); + } + + @Override + public ChannelFuture bind(SocketAddress socketAddress) { + return delegate.bind(socketAddress); + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress) { + return delegate.connect(socketAddress); + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress1) { + return delegate.connect(socketAddress, socketAddress1); + } + + @Override + public ChannelFuture disconnect() { + return delegate.disconnect(); + } + + @Override + public ChannelFuture close() { + return delegate.close(); + } + + @Override + public ChannelFuture deregister() { + return delegate.deregister(); + } + + @Override + public ChannelFuture bind(SocketAddress socketAddress, ChannelPromise channelPromise) { + return delegate.bind(socketAddress, channelPromise); + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, ChannelPromise channelPromise) { + return delegate.connect(socketAddress, channelPromise); + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress1, ChannelPromise channelPromise) { + return delegate.connect(socketAddress, socketAddress1, channelPromise); + } + + @Override + public ChannelFuture disconnect(ChannelPromise channelPromise) { + return delegate.disconnect(channelPromise); + } + + @Override + public ChannelFuture close(ChannelPromise channelPromise) { + return delegate.close(channelPromise); + } + + @Override + public ChannelFuture deregister(ChannelPromise channelPromise) { + return delegate.deregister(channelPromise); + } + + @Override + public Attribute attr(AttributeKey attributeKey) { + return delegate.attr(attributeKey); + } + + @Override + public boolean hasAttr(AttributeKey attributeKey) { + return delegate.hasAttr(attributeKey); + } + + @Override + public int compareTo(@Nonnull Channel o) { + return this == o ? 0 : this.id().compareTo(o.id()); + } + +} diff --git a/src/main/java/io/lettuce/core/RedisClient.java b/src/main/java/io/lettuce/core/RedisClient.java index 550c5bf10..917e2dce9 100644 --- a/src/main/java/io/lettuce/core/RedisClient.java +++ b/src/main/java/io/lettuce/core/RedisClient.java @@ -19,8 +19,6 @@ */ package io.lettuce.core; -import static io.lettuce.core.internal.LettuceStrings.*; - import java.net.InetSocketAddress; import java.net.SocketAddress; import java.time.Duration; @@ -41,6 +39,7 @@ import io.lettuce.core.masterreplica.MasterReplica; import io.lettuce.core.protocol.CommandExpiryWriter; import io.lettuce.core.protocol.CommandHandler; +import io.lettuce.core.protocol.DefaultBatchFlushEndpoint; import io.lettuce.core.protocol.DefaultEndpoint; import io.lettuce.core.protocol.Endpoint; import io.lettuce.core.protocol.PushHandler; @@ -55,6 +54,9 @@ import io.netty.util.internal.logging.InternalLoggerFactory; import reactor.core.publisher.Mono; +import static io.lettuce.core.internal.LettuceStrings.isEmpty; +import static io.lettuce.core.internal.LettuceStrings.isNotEmpty; + /** * A scalable and thread-safe Redis client supporting synchronous, asynchronous and reactive * execution models. Multiple threads may share one connection if they avoid blocking and transactional operations such as BLPOP @@ -171,7 +173,6 @@ public static RedisClient create(ClientResources clientResources) { * * @param clientResources the client resources, must not be {@code null} * @param uri the Redis URI, must not be {@code null} - * * @return a new instance of {@link RedisClient} */ public static RedisClient create(ClientResources clientResources, String uri) { @@ -275,8 +276,10 @@ private ConnectionFuture> connectStandalone logger.debug("Trying to get a Redis connection for: {}", redisURI); - DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources()); - RedisChannelWriter writer = endpoint; + Endpoint endpoint = getOptions().getAutoBatchFlushOptions().isAutoBatchFlushEnabled() + ? new DefaultBatchFlushEndpoint(getOptions(), getResources()) + : new DefaultEndpoint(getOptions(), getResources()); + RedisChannelWriter writer = (RedisChannelWriter) endpoint; if (CommandExpiryWriter.isSupported(getOptions())) { writer = new CommandExpiryWriter(writer, getOptions(), getResources()); diff --git a/src/main/java/io/lettuce/core/cluster/ClusterNodeBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/cluster/ClusterNodeBatchFlushEndpoint.java new file mode 100644 index 000000000..977f6f43f --- /dev/null +++ b/src/main/java/io/lettuce/core/cluster/ClusterNodeBatchFlushEndpoint.java @@ -0,0 +1,55 @@ +/* + * Copyright 2011-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.cluster; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.RedisChannelWriter; +import io.lettuce.core.RedisException; +import io.lettuce.core.protocol.DefaultBatchFlushEndpoint; +import io.lettuce.core.resource.ClientResources; + +/** + * Command handler for node connections within the Redis Cluster context. This handler can requeue commands if it is + * disconnected and closed but has commands in the queue. If the handler was connected it would retry commands using the + * {@literal MOVED} or {@literal ASK} redirection. + * + * @author Mark Paluch + */ +public class ClusterNodeBatchFlushEndpoint extends DefaultBatchFlushEndpoint { + + /** + * Initialize a new instance that handles commands from the supplied queue. + * + * @param clientOptions client options for this connection. + * @param clientResources client resources for this connection. + * @param clusterChannelWriter top-most channel writer. + */ + public ClusterNodeBatchFlushEndpoint(ClientOptions clientOptions, ClientResources clientResources, + RedisChannelWriter clusterChannelWriter) { + super(clientOptions, clientResources, clusterChannelWriter != null ? cmd -> { + if (cmd.isDone()) { + return; + } + + try { + clusterChannelWriter.write(cmd); + } catch (RedisException e) { + cmd.completeExceptionally(e); + } + } : DefaultBatchFlushEndpoint::cancelCommandOnEndpointClose); + } + +} diff --git a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java index a60dfd0d8..b8e244e68 100644 --- a/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java +++ b/src/main/java/io/lettuce/core/cluster/RedisClusterClient.java @@ -38,7 +38,20 @@ import java.util.function.Predicate; import java.util.function.Supplier; -import io.lettuce.core.*; +import io.lettuce.core.AbstractRedisClient; +import io.lettuce.core.ClientOptions; +import io.lettuce.core.CommandListenerWriter; +import io.lettuce.core.ConnectionBuilder; +import io.lettuce.core.ConnectionFuture; +import io.lettuce.core.ConnectionState; +import io.lettuce.core.ReadFrom; +import io.lettuce.core.RedisChannelHandler; +import io.lettuce.core.RedisChannelWriter; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisException; +import io.lettuce.core.RedisURI; +import io.lettuce.core.SslConnectionBuilder; +import io.lettuce.core.StatefulRedisConnectionImpl; import io.lettuce.core.api.StatefulRedisConnection; import io.lettuce.core.cluster.api.NodeSelectionSupport; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; @@ -63,6 +76,7 @@ import io.lettuce.core.protocol.CommandExpiryWriter; import io.lettuce.core.protocol.CommandHandler; import io.lettuce.core.protocol.DefaultEndpoint; +import io.lettuce.core.protocol.Endpoint; import io.lettuce.core.protocol.PushHandler; import io.lettuce.core.pubsub.PubSubCommandHandler; import io.lettuce.core.pubsub.PubSubEndpoint; @@ -145,12 +159,12 @@ * possible. * * @author Mark Paluch - * @since 3.0 * @see RedisURI * @see StatefulRedisClusterConnection * @see RedisCodec * @see ClusterClientOptions * @see ClientResources + * @since 3.0 */ public class RedisClusterClient extends AbstractRedisClient { @@ -540,9 +554,11 @@ ConnectionFuture> connectToNodeAsync(RedisC assertNotEmpty(initialUris); LettuceAssert.notNull(socketAddressSupplier, "SocketAddressSupplier must not be null"); - ClusterNodeEndpoint endpoint = new ClusterNodeEndpoint(getClusterClientOptions(), getResources(), clusterWriter); + Endpoint endpoint = getClusterClientOptions().getAutoBatchFlushOptions().isAutoBatchFlushEnabled() + ? new ClusterNodeBatchFlushEndpoint(getClusterClientOptions(), getResources(), clusterWriter) + : new ClusterNodeEndpoint(getClusterClientOptions(), getResources(), clusterWriter); - RedisChannelWriter writer = endpoint; + RedisChannelWriter writer = (RedisChannelWriter) endpoint; if (CommandExpiryWriter.isSupported(getClusterClientOptions())) { writer = new CommandExpiryWriter(writer, getClusterClientOptions(), getResources()); @@ -814,7 +830,7 @@ private , S> Connection */ @SuppressWarnings("unchecked") private , S> ConnectionFuture connectStatefulAsync(T connection, - DefaultEndpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, + Endpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, Supplier commandHandlerSupplier) { ConnectionBuilder connectionBuilder = createConnectionBuilder(connection, connection.getConnectionState(), endpoint, @@ -826,7 +842,7 @@ private , S> ConnectionFuture< } private ConnectionBuilder createConnectionBuilder(RedisChannelHandler connection, ConnectionState state, - DefaultEndpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, + Endpoint endpoint, RedisURI connectionSettings, Mono socketAddressSupplier, Supplier commandHandlerSupplier) { ConnectionBuilder connectionBuilder; diff --git a/src/main/java/io/lettuce/core/constant/DummyContextualChannelInstances.java b/src/main/java/io/lettuce/core/constant/DummyContextualChannelInstances.java new file mode 100644 index 000000000..c4d08bee6 --- /dev/null +++ b/src/main/java/io/lettuce/core/constant/DummyContextualChannelInstances.java @@ -0,0 +1,25 @@ +package io.lettuce.core.constant; + +import io.lettuce.core.ContextualChannel; +import io.lettuce.core.context.ConnectionContext; + +/** + * @author chenxiaofan + */ +public class DummyContextualChannelInstances { + + private DummyContextualChannelInstances() { + } + + public static final ContextualChannel CHANNEL_WILL_RECONNECT = new ContextualChannel(null, + ConnectionContext.State.WILL_RECONNECT); + + public static final ContextualChannel CHANNEL_CONNECTING = new ContextualChannel(null, ConnectionContext.State.CONNECTING); + + public static final ContextualChannel CHANNEL_RECONNECT_FAILED = new ContextualChannel(null, + ConnectionContext.State.RECONNECT_FAILED); + + public static final ContextualChannel CHANNEL_ENDPOINT_CLOSED = new ContextualChannel(null, + ConnectionContext.State.ENDPOINT_CLOSED); + +} diff --git a/src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java b/src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java new file mode 100644 index 000000000..1939c62a5 --- /dev/null +++ b/src/main/java/io/lettuce/core/context/BatchFlushEndPointContext.java @@ -0,0 +1,130 @@ +package io.lettuce.core.context; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.lettuce.core.datastructure.queue.unmodifiabledeque.UnmodifiableDeque; +import io.lettuce.core.protocol.RedisCommand; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * @author chenxiaofan + */ +public class BatchFlushEndPointContext { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(BatchFlushEndPointContext.class); + + public static class HasOngoingSendLoop { + + /** + * Used in multi-threaded environment, can be used to synchronize between threads. + */ + final AtomicInteger safe; + + /** + * Used in single thread. + */ + boolean unsafe; + + public HasOngoingSendLoop() { + safe = new AtomicInteger(); + unsafe = false; + } + + /** + * Try enter loop with the memory semantic getVolatile + * + * @return true if entered the loop, false if already have a running loop. + */ + public boolean tryEnter() { + return safe.get() == 0 && /* rare case if QPS is high */ safe.compareAndSet(0, 1); + } + + public void exit() { + safe.set(0); + } + + } + + BatchFlushEndPointContext() { + } + + /** + * Tasks that failed to send (probably due to connection errors) + */ + @Nullable + Deque> retryableFailedToSendTasks = null; + + Throwable firstDiscontinueReason = null; + + public Throwable getFirstDiscontinueReason() { + return firstDiscontinueReason; + } + + private int flyingTaskNum; + + @SuppressWarnings("unused") + public int getFlyingTaskNum() { + return flyingTaskNum; + } + + private int total = 0; + + public int getTotal() { + return total; + } + + public final HasOngoingSendLoop hasOngoingSendLoop = new HasOngoingSendLoop(); + + public void add(int n) { + this.total += n; + this.flyingTaskNum += n; + } + + public @Nullable Deque> getAndClearRetryableFailedToSendTasks() { + final Deque> old = this.retryableFailedToSendTasks; + // don't set to null so give us a chance to expose potential bugs if there is addRetryableFailedToSendTask() afterwards + this.retryableFailedToSendTasks = UnmodifiableDeque.emptyDeque(); + return old; + } + + public void done(int n) { + this.flyingTaskNum -= n; + } + + public boolean isDone() { + if (this.flyingTaskNum < 0) { + logger.error("[unexpected] flyingTaskNum < 0, flyingTaskNum: {}, total: {}", this.flyingTaskNum, this.total); + return true; + } + return this.flyingTaskNum == 0; + } + + public boolean hasRetryableFailedToSendTasks() { + return retryableFailedToSendTasks != null; + } + + /** + * @param retryableTask retryable task + * @param cause fail reason + * @return true if this is the first retryable failed task + */ + public boolean addRetryableFailedToSendTask(RedisCommand retryableTask, @Nonnull Throwable cause) { + if (retryableFailedToSendTasks == null) { + retryableFailedToSendTasks = new ArrayDeque<>(); + retryableFailedToSendTasks.add(retryableTask); + + firstDiscontinueReason = cause; + return true; + } + + retryableFailedToSendTasks.add(retryableTask); + return false; + } + +} diff --git a/src/main/java/io/lettuce/core/context/ConnectionContext.java b/src/main/java/io/lettuce/core/context/ConnectionContext.java new file mode 100644 index 000000000..796674eaf --- /dev/null +++ b/src/main/java/io/lettuce/core/context/ConnectionContext.java @@ -0,0 +1,107 @@ +package io.lettuce.core.context; + +import java.util.Deque; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.lettuce.core.RedisException; +import io.lettuce.core.protocol.RedisCommand; + +/** + * Should be accessed by the event loop thread only. + * + * @author chenxiaofan + */ +public class ConnectionContext { + + public static class CloseStatus { + + private final boolean willReconnect; + + private Deque> retryablePendingCommands; + + private final RedisException channelClosed; + + public CloseStatus(boolean willReconnect, Deque> retryablePendingCommands, + RedisException channelClosed) { + this.willReconnect = willReconnect; + this.retryablePendingCommands = retryablePendingCommands; + this.channelClosed = channelClosed; + } + + public boolean isWillReconnect() { + return willReconnect; + } + + public @Nullable Deque> getAndClearRetryablePendingCommands() { + final Deque> old = this.retryablePendingCommands; + this.retryablePendingCommands = null; + return old; + } + + public Exception getErr() { + return channelClosed; + } + + @Override + public String toString() { + return "CloseStatus{willReconnect=" + willReconnect + ", clientCloseReason=" + channelClosed + '}'; + } + + } + + public enum State { + + WILL_RECONNECT, CONNECTING, CONNECTED, + /** + * The client is closed. NOTE: this is different from connection closed. + */ + ENDPOINT_CLOSED, RECONNECT_FAILED; + + public boolean isConnected() { + return this == CONNECTED; + } + + } + + public final State initialState; + + public final BatchFlushEndPointContext batchFlushEndPointContext; + + public ConnectionContext(State initialState) { + this.initialState = initialState; + this.batchFlushEndPointContext = new BatchFlushEndPointContext(); + } + + /* below fields must be accessed by the event loop thread only */ + @Nullable + private CloseStatus closeStatus = null; + + public void setCloseStatus(@Nonnull CloseStatus closeStatus) { + this.closeStatus = closeStatus; + } + + public @Nullable CloseStatus getCloseStatus() { + return closeStatus; + } + + public boolean isChannelInactiveEventFired() { + return closeStatus != null; + } + + private boolean channelQuiescent = false; + + public boolean isChannelQuiescent() { + return channelQuiescent; + } + + public boolean setChannelQuiescentOnce() { + if (channelQuiescent) { + return false; + } + channelQuiescent = true; + return true; + } + +} diff --git a/src/main/java/io/lettuce/core/datastructure/queue/offerfirst/UnboundedMpscOfferFirstQueue.java b/src/main/java/io/lettuce/core/datastructure/queue/offerfirst/UnboundedMpscOfferFirstQueue.java new file mode 100644 index 000000000..42b3e7cee --- /dev/null +++ b/src/main/java/io/lettuce/core/datastructure/queue/offerfirst/UnboundedMpscOfferFirstQueue.java @@ -0,0 +1,36 @@ +package io.lettuce.core.datastructure.queue.offerfirst; + +import java.util.Deque; + +import javax.annotation.Nullable; + +/** + * @author chenxiaofan + */ +public interface UnboundedMpscOfferFirstQueue { + + /** + * add element to the tail of the queue. The method is concurrent safe. + */ + void offer(E e); + + /** + * add all elements to the head of the queue. + *

+ * Should only be called from the single consumer thread. + * + * @param q a queue to add + */ + void offerFirstAll(@Nullable Deque q); + + /** + * poll the first element from the head of the queue. + *

+ * Should only be called from the single consumer thread. + * + * @return null if the queue is empty else the first element of the queue + */ + @Nullable + E poll(); + +} diff --git a/src/main/java/io/lettuce/core/datastructure/queue/offerfirst/impl/ConcurrentLinkedMpscOfferFirstQueue.java b/src/main/java/io/lettuce/core/datastructure/queue/offerfirst/impl/ConcurrentLinkedMpscOfferFirstQueue.java new file mode 100644 index 000000000..5def1b546 --- /dev/null +++ b/src/main/java/io/lettuce/core/datastructure/queue/offerfirst/impl/ConcurrentLinkedMpscOfferFirstQueue.java @@ -0,0 +1,45 @@ +package io.lettuce.core.datastructure.queue.offerfirst.impl; + +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; + +import javax.annotation.Nullable; + +import io.lettuce.core.datastructure.queue.offerfirst.UnboundedMpscOfferFirstQueue; + +/** + * @author chenxiaofan + */ +public class ConcurrentLinkedMpscOfferFirstQueue implements UnboundedMpscOfferFirstQueue { + + private final ConcurrentLinkedDeque delegate; + + public ConcurrentLinkedMpscOfferFirstQueue() { + this.delegate = new ConcurrentLinkedDeque<>(); + } + + @Override + public void offer(E e) { + delegate.offer(e); + } + + @Override + public void offerFirstAll(@Nullable Deque q) { + if (q == null) { + return; + } + while (true) { + E e = q.pollLast(); + if (e == null) { + break; + } + delegate.offerFirst(e); + } + } + + @Override + public E poll() { + return delegate.poll(); + } + +} diff --git a/src/main/java/io/lettuce/core/datastructure/queue/offerfirst/impl/JcToolsUnboundedMpscOfferFirstQueue.java b/src/main/java/io/lettuce/core/datastructure/queue/offerfirst/impl/JcToolsUnboundedMpscOfferFirstQueue.java new file mode 100644 index 000000000..feaa8d2ee --- /dev/null +++ b/src/main/java/io/lettuce/core/datastructure/queue/offerfirst/impl/JcToolsUnboundedMpscOfferFirstQueue.java @@ -0,0 +1,64 @@ +package io.lettuce.core.datastructure.queue.offerfirst.impl; + +import java.util.Deque; +import java.util.LinkedList; +import java.util.Objects; +import java.util.Queue; + +import javax.annotation.Nullable; + +import io.lettuce.core.datastructure.queue.offerfirst.UnboundedMpscOfferFirstQueue; +import io.netty.util.internal.PlatformDependent; + +/** + * @author chenxiaofan + */ +public class JcToolsUnboundedMpscOfferFirstQueue implements UnboundedMpscOfferFirstQueue { + + /** + * The queues can only be manipulated in a single thread env. + */ + private final LinkedList> unsafeQueues = new LinkedList<>(); + + private final Queue mpscQueue = PlatformDependent.newMpscQueue(); + + @Override + public void offer(E e) { + mpscQueue.offer(e); + } + + /** + * must call from consumer thread. + * + * @param q an queue to add + */ + @Override + public void offerFirstAll(@Nullable Deque q) { + if (q != null && !q.isEmpty()) { + unsafeQueues.addFirst(q); + } + } + + /** + * Must call from the consumer thread. + * + * @return last element of the queue or null if the queue is empty + */ + @Override + public E poll() { + if (!unsafeQueues.isEmpty()) { + return pollFromUnsafeQueues(); + } + return mpscQueue.poll(); + } + + private E pollFromUnsafeQueues() { + Queue first = unsafeQueues.getFirst(); + E e = first.poll(); + if (first.isEmpty()) { + unsafeQueues.removeFirst(); + } + return Objects.requireNonNull(e); + } + +} diff --git a/src/main/java/io/lettuce/core/datastructure/queue/unmodifiabledeque/UnmodifiableDeque.java b/src/main/java/io/lettuce/core/datastructure/queue/unmodifiabledeque/UnmodifiableDeque.java new file mode 100644 index 000000000..31a0337ac --- /dev/null +++ b/src/main/java/io/lettuce/core/datastructure/queue/unmodifiabledeque/UnmodifiableDeque.java @@ -0,0 +1,211 @@ +package io.lettuce.core.datastructure.queue.unmodifiabledeque; + +import javax.annotation.Nonnull; +import java.util.ArrayDeque; +import java.util.Collection; +import java.util.Deque; +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * @param + * @author chenxiaofan + */ +public class UnmodifiableDeque implements Deque { + + private static final UnmodifiableDeque EMPTY_DEQUEUE = new UnmodifiableDeque<>(new ArrayDeque<>()); + + private final Deque delegate; + + public UnmodifiableDeque(Deque delegate) { + this.delegate = delegate; + } + + @SuppressWarnings("unchecked") + public static Deque emptyDeque() { + return (Deque) EMPTY_DEQUEUE; + } + + /* + * unmodifiable throw unsupported exception for all write methods and generate delegate methods for all read methods + */ + @Override + public void addFirst(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public void addLast(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offerFirst(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offerLast(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public E removeFirst() { + throw new UnsupportedOperationException(); + } + + @Override + public E removeLast() { + throw new UnsupportedOperationException(); + } + + @Override + public E pollFirst() { + throw new UnsupportedOperationException(); + } + + @Override + public E pollLast() { + throw new UnsupportedOperationException(); + } + + @Override + public E getFirst() { + return delegate.getFirst(); + } + + @Override + public E getLast() { + return delegate.getLast(); + } + + @Override + public E peekFirst() { + return delegate.peekFirst(); + } + + @Override + public E peekLast() { + return delegate.peekLast(); + } + + @Override + public boolean removeFirstOccurrence(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeLastOccurrence(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean add(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean offer(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public E remove() { + throw new UnsupportedOperationException(); + } + + @Override + public E poll() { + throw new UnsupportedOperationException(); + } + + @Override + public E element() { + final E e = peek(); + if (e == null) { + throw new NoSuchElementException(); + } + return e; + } + + @Override + public E peek() { + return delegate.peek(); + } + + @Override + public boolean addAll(Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean removeAll(@Nonnull Collection ignored) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean retainAll(@Nonnull Collection c) { + throw new UnsupportedOperationException(); + } + + @Override + public void clear() { + throw new UnsupportedOperationException(); + } + + @Override + public void push(E e) { + throw new UnsupportedOperationException(); + } + + @Override + public E pop() { + throw new UnsupportedOperationException(); + } + + @Override + public boolean remove(Object o) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean containsAll(@Nonnull Collection c) { + return delegate.containsAll(c); + } + + @Override + public boolean contains(Object o) { + return delegate.contains(o); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public Iterator iterator() { + return delegate.iterator(); + } + + @Override + public Object[] toArray() { + return delegate.toArray(); + } + + @Override + public T[] toArray(@Nonnull T[] a) { + return delegate.toArray(a); + } + + @Override + public Iterator descendingIterator() { + return delegate.descendingIterator(); + } + +} diff --git a/src/main/java/io/lettuce/core/internal/LettuceAssert.java b/src/main/java/io/lettuce/core/internal/LettuceAssert.java index e41a13df4..f468d3b0a 100644 --- a/src/main/java/io/lettuce/core/internal/LettuceAssert.java +++ b/src/main/java/io/lettuce/core/internal/LettuceAssert.java @@ -237,4 +237,10 @@ public static void assertState(boolean condition, Supplier messageSuppli } } + public static void isPositive(int writeSpinCount, String writeSpinCountIsNotPositive) { + if (writeSpinCount <= 0) { + throw new IllegalArgumentException(writeSpinCountIsNotPositive); + } + } + } diff --git a/src/main/java/io/lettuce/core/protocol/BatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/BatchFlushEndpoint.java new file mode 100644 index 000000000..1878bf853 --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/BatchFlushEndpoint.java @@ -0,0 +1,27 @@ +package io.lettuce.core.protocol; + +import java.util.Deque; + +import io.netty.channel.Channel; + +/** + * @author chenxiaofan + */ +public interface BatchFlushEndpoint extends Endpoint { + + @Override + default void notifyDrainQueuedCommands(HasQueuedCommands queuedCommands) { + throw new UnsupportedOperationException(); + } + + /** + * Merge Endpoint#notifyChannelInactive(Channel) and Endpoint#notifyDrainQueuedCommands(HasQueuedCommands) + * + * @param channel the channel + * @param retryableQueuedCommands retryable queued commands in command handler + */ + void notifyChannelInactiveAfterWatchdogDecision(Channel channel, Deque> retryableQueuedCommands); + + void notifyReconnectFailed(Throwable throwable); + +} diff --git a/src/main/java/io/lettuce/core/protocol/CommandHandler.java b/src/main/java/io/lettuce/core/protocol/CommandHandler.java index 83bf30402..340e77614 100644 --- a/src/main/java/io/lettuce/core/protocol/CommandHandler.java +++ b/src/main/java/io/lettuce/core/protocol/CommandHandler.java @@ -19,8 +19,6 @@ */ package io.lettuce.core.protocol; -import static io.lettuce.core.ConnectionEvents.*; - import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; @@ -28,6 +26,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; +import java.util.Deque; import java.util.LinkedHashSet; import java.util.List; import java.util.Queue; @@ -39,6 +38,7 @@ import io.lettuce.core.RedisException; import io.lettuce.core.api.push.PushListener; import io.lettuce.core.api.push.PushMessage; +import io.lettuce.core.datastructure.queue.unmodifiabledeque.UnmodifiableDeque; import io.lettuce.core.internal.LettuceAssert; import io.lettuce.core.internal.LettuceSets; import io.lettuce.core.metrics.CommandLatencyRecorder; @@ -63,6 +63,8 @@ import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import static io.lettuce.core.ConnectionEvents.Reset; + /** * A netty {@link ChannelHandler} responsible for writing redis commands and reading responses from the server. * @@ -94,6 +96,8 @@ public class CommandHandler extends ChannelDuplexHandler implements HasQueuedCom private final Endpoint endpoint; + private final boolean supportsBatchFlush; + private final ArrayDeque> stack = new ArrayDeque<>(); private final long commandHandlerId = COMMAND_HANDLER_COUNTER.incrementAndGet(); @@ -150,6 +154,7 @@ public CommandHandler(ClientOptions clientOptions, ClientResources clientResourc this.clientOptions = clientOptions; this.clientResources = clientResources; this.endpoint = endpoint; + this.supportsBatchFlush = endpoint instanceof BatchFlushEndpoint; this.commandLatencyRecorder = clientResources.commandLatencyRecorder(); this.latencyMetricsEnabled = commandLatencyRecorder.isEnabled(); this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE; @@ -185,6 +190,19 @@ void setBuffer(ByteBuf buffer) { return drainCommands(stack); } + private Deque> drainStack() { + final Deque> target = new ArrayDeque<>(stack.size()); + + RedisCommand cmd; + while ((cmd = stack.poll()) != null) { + if (!cmd.isDone() && !ActivationCommand.isActivationCommand(cmd)) { + target.add(cmd); + } + } + + return target; + } + protected LifecycleState getState() { return lifecycleState; } @@ -359,7 +377,12 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { setState(LifecycleState.DEACTIVATING); endpoint.notifyChannelInactive(ctx.channel()); - endpoint.notifyDrainQueuedCommands(this); + Deque> batchFlushRetryableDrainQueuedCommands = UnmodifiableDeque.emptyDeque(); + if (supportsBatchFlush) { + batchFlushRetryableDrainQueuedCommands = drainStack(); + } else { + endpoint.notifyDrainQueuedCommands(this); + } setState(LifecycleState.DEACTIVATED); @@ -373,6 +396,12 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { } super.channelInactive(ctx); + + if (supportsBatchFlush) { + // Needs decision of watchdog + ((BatchFlushEndpoint) endpoint).notifyChannelInactiveAfterWatchdogDecision(ctx.channel(), + batchFlushRetryableDrainQueuedCommands); + } } /** diff --git a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java index 84bcb41f1..d23c9adc7 100644 --- a/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java +++ b/src/main/java/io/lettuce/core/protocol/ConnectionWatchdog.java @@ -21,12 +21,11 @@ import java.net.SocketAddress; import java.time.Duration; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import reactor.core.publisher.Mono; -import reactor.util.function.Tuple2; import io.lettuce.core.ClientOptions; import io.lettuce.core.ConnectionBuilder; import io.lettuce.core.ConnectionEvents; @@ -50,6 +49,8 @@ import io.netty.util.internal.logging.InternalLogLevel; import io.netty.util.internal.logging.InternalLogger; import io.netty.util.internal.logging.InternalLoggerFactory; +import reactor.core.publisher.Mono; +import reactor.util.function.Tuple2; /** * A netty {@link ChannelHandler} responsible for monitoring the channel and reconnecting when the connection is lost. @@ -83,6 +84,10 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private final String epid; + private final boolean useBatchFlushEndpoint; + + private final Endpoint endpoint; + private Channel channel; private SocketAddress remoteAddress; @@ -101,6 +106,8 @@ public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { private volatile Timeout reconnectScheduleTimeout; + private Runnable doReconnectOnEndpointQuiescence; + /** * Create a new watchdog that adds to new connections to the supplied {@link ChannelGroup} and establishes a new * {@link Channel} when disconnected, while reconnect is true. The socketAddressSupplier can supply the reconnect address. @@ -141,6 +148,8 @@ public ConnectionWatchdog(Delay reconnectDelay, ClientOptions clientOptions, Boo this.eventBus = eventBus; this.redisUri = (String) bootstrap.config().attrs().get(ConnectionBuilder.REDIS_URI); this.epid = endpoint.getId(); + this.endpoint = endpoint; + this.useBatchFlushEndpoint = endpoint instanceof BatchFlushEndpoint; Mono wrappedSocketAddressSupplier = socketAddressSupplier.doOnNext(addr -> remoteAddress = addr) .onErrorResume(t -> { @@ -195,6 +204,7 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { + doReconnectOnEndpointQuiescence = null; logger.debug("{} channelInactive()", logPrefix()); if (!armed) { @@ -205,7 +215,21 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { channel = null; if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) { - scheduleReconnect(); + if (!isEventLoopGroupActive()) { + logger.debug("isEventLoopGroupActive() == false"); + return; + } + + if (!isListenOnChannelInactive()) { + logger.debug("Skip reconnect scheduling, listener disabled"); + return; + } + + doReconnectOnEndpointQuiescence = this::scheduleReconnect; + if (!useBatchFlushEndpoint) { + doReconnectOnEndpointQuiescence.run(); + } + // otherwise, will be called later by BatchFlushEndpoint#onEndpointQuiescence } else { logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx); } @@ -213,6 +237,10 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); } + void reconnectOnEndpointQuiescence() { + doReconnectOnEndpointQuiescence.run(); + } + /** * Enable {@link ConnectionWatchdog} to listen for disconnected events. */ @@ -230,11 +258,13 @@ public void scheduleReconnect() { if (!isEventLoopGroupActive()) { logger.debug("isEventLoopGroupActive() == false"); + notifyEndpointFailedToConnectIfNeeded(); return; } if (!isListenOnChannelInactive()) { logger.debug("Skip reconnect scheduling, listener disabled"); + notifyEndpointFailedToConnectIfNeeded(); return; } @@ -252,6 +282,7 @@ public void scheduleReconnect() { if (!isEventLoopGroupActive()) { logger.warn("Cannot execute scheduled reconnect timer, reconnect workers are terminated"); + notifyEndpointFailedToConnectIfNeeded(); return; } @@ -267,6 +298,17 @@ public void scheduleReconnect() { } } else { logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix()); + notifyEndpointFailedToConnectIfNeeded(); + } + } + + private void notifyEndpointFailedToConnectIfNeeded() { + notifyEndpointFailedToConnectIfNeeded(new CancellationException()); + } + + private void notifyEndpointFailedToConnectIfNeeded(Exception e) { + if (useBatchFlushEndpoint) { + ((BatchFlushEndpoint) endpoint).notifyReconnectFailed(e); } } @@ -275,7 +317,6 @@ public void scheduleReconnect() { * the same handler instances contained in the old channel's pipeline. * * @param attempt attempt counter - * * @throws Exception when reconnection fails. */ public void run(int attempt) throws Exception { @@ -288,7 +329,6 @@ public void run(int attempt) throws Exception { * * @param attempt attempt counter. * @param delay retry delay. - * * @throws Exception when reconnection fails. */ private void run(int attempt, Duration delay) throws Exception { @@ -298,16 +338,19 @@ private void run(int attempt, Duration delay) throws Exception { if (!isEventLoopGroupActive()) { logger.debug("isEventLoopGroupActive() == false"); + notifyEndpointFailedToConnectIfNeeded(); return; } if (!isListenOnChannelInactive()) { logger.debug("Skip reconnect scheduling, listener disabled"); + notifyEndpointFailedToConnectIfNeeded(); return; } if (isReconnectSuspended()) { logger.debug("Skip reconnect scheduling, reconnect is suspended"); + notifyEndpointFailedToConnectIfNeeded(); return; } @@ -363,11 +406,14 @@ private void run(int attempt, Duration delay) throws Exception { if (!isReconnectSuspended()) { scheduleReconnect(); + } else { + notifyEndpointFailedToConnectIfNeeded(); } }); } catch (Exception e) { logger.log(warnLevel, "Cannot reconnect: {}", e.toString()); eventBus.publish(new ReconnectFailedEvent(redisUri, epid, LocalAddress.ANY, remoteAddress, e, attempt)); + notifyEndpointFailedToConnectIfNeeded(e); } } @@ -436,4 +482,8 @@ private String logPrefix() { return logPrefix = buffer; } + public boolean willReconnect() { + return doReconnectOnEndpointQuiescence != null; + } + } diff --git a/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java b/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java new file mode 100644 index 000000000..54cbd6c51 --- /dev/null +++ b/src/main/java/io/lettuce/core/protocol/DefaultBatchFlushEndpoint.java @@ -0,0 +1,1097 @@ +/* + * Copyright 2011-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.lettuce.core.protocol; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Deque; +import java.util.HashSet; +import java.util.List; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.function.Consumer; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import io.lettuce.core.ClientOptions; +import io.lettuce.core.ConnectionEvents; +import io.lettuce.core.ContextualChannel; +import io.lettuce.core.RedisChannelWriter; +import io.lettuce.core.RedisConnectionException; +import io.lettuce.core.RedisException; +import io.lettuce.core.api.push.PushListener; +import io.lettuce.core.constant.DummyContextualChannelInstances; +import io.lettuce.core.context.BatchFlushEndPointContext; +import io.lettuce.core.context.ConnectionContext; +import io.lettuce.core.datastructure.queue.offerfirst.UnboundedMpscOfferFirstQueue; +import io.lettuce.core.datastructure.queue.offerfirst.impl.JcToolsUnboundedMpscOfferFirstQueue; +import io.lettuce.core.internal.Futures; +import io.lettuce.core.internal.LettuceAssert; +import io.lettuce.core.resource.ClientResources; +import io.lettuce.core.utils.ExceptionUtils; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.EventLoop; +import io.netty.handler.codec.EncoderException; +import io.netty.util.Recycler; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import io.netty.util.internal.logging.InternalLogger; +import io.netty.util.internal.logging.InternalLoggerFactory; + +/** + * Default {@link Endpoint} implementation. + * + * @author Mark Paluch + */ +public class DefaultBatchFlushEndpoint implements RedisChannelWriter, BatchFlushEndpoint, PushHandler { + + private static final InternalLogger logger = InternalLoggerFactory.getInstance(BatchFlushEndpoint.class); + + private static final AtomicLong ENDPOINT_COUNTER = new AtomicLong(); + + private static final AtomicReferenceFieldUpdater CHANNEL = AtomicReferenceFieldUpdater + .newUpdater(DefaultBatchFlushEndpoint.class, ContextualChannel.class, "channel"); + + private static final AtomicIntegerFieldUpdater QUEUE_SIZE = AtomicIntegerFieldUpdater + .newUpdater(DefaultBatchFlushEndpoint.class, "queueSize"); + + private static final AtomicIntegerFieldUpdater STATUS = AtomicIntegerFieldUpdater + .newUpdater(DefaultBatchFlushEndpoint.class, "status"); + + private static final int ST_OPEN = 0; + + private static final int ST_CLOSED = 1; + + private static final Set> SHOULD_NOT_RETRY_EXCEPTION_TYPES = new HashSet<>(); + + static { + SHOULD_NOT_RETRY_EXCEPTION_TYPES.add(EncoderException.class); + SHOULD_NOT_RETRY_EXCEPTION_TYPES.add(Error.class); + } + + private static boolean isRejectCommand(ClientOptions clientOptions) { + switch (clientOptions.getDisconnectedBehavior()) { + case REJECT_COMMANDS: + return true; + case ACCEPT_COMMANDS: + throw new UnsupportedOperationException("ACCEPT_COMMANDS is not supported"); + case DEFAULT: + return !clientOptions.isAutoReconnect(); + default: + throw new IllegalStateException("Unknown disconnected behavior: " + clientOptions.getDisconnectedBehavior()); + } + } + + protected static void cancelCommandOnEndpointClose(RedisCommand cmd) { + if (cmd.isDone()) { + return; + } + + if (cmd.getOutput() != null) { + cmd.getOutput().setError("endpoint closed"); + } + cmd.cancel(); + } + + private final Reliability reliability; + + private final ClientOptions clientOptions; + + private final ClientResources clientResources; + + private final boolean boundedQueues; + + // access via QUEUE_SIZE + private volatile int queueSize = 0; + + // access via STATUS + private volatile int status = ST_OPEN; + // access via CHANNEL + + protected volatile @Nonnull ContextualChannel channel = DummyContextualChannelInstances.CHANNEL_CONNECTING; + + private final Consumer> callbackOnClose; + + private final boolean rejectCommandsWhileDisconnected; + + private final List pushListeners = new CopyOnWriteArrayList<>(); + + private final boolean debugEnabled = logger.isDebugEnabled(); + + protected final CompletableFuture closeFuture = new CompletableFuture<>(); + + private String logPrefix; + + private boolean autoFlushCommands = true; + + private boolean inActivation = false; + + protected @Nullable ConnectionWatchdog connectionWatchdog; + + private ConnectionFacade connectionFacade; + + private volatile Throwable connectionError; + + private final String cachedEndpointId; + + protected final UnboundedMpscOfferFirstQueue taskQueue; + + private final boolean canFire; + + private volatile boolean inProtectMode; + + private volatile Throwable failedToReconnectReason; + + private volatile EventLoop lastEventLoop = null; + + private final int writeSpinCount; + + private final int batchSize; + + /** + * Create a new {@link BatchFlushEndpoint}. + * + * @param clientOptions client options for this connection, must not be {@code null}. + * @param clientResources client resources for this connection, must not be {@code null}. + */ + public DefaultBatchFlushEndpoint(ClientOptions clientOptions, ClientResources clientResources) { + this(clientOptions, clientResources, DefaultBatchFlushEndpoint::cancelCommandOnEndpointClose); + } + + protected DefaultBatchFlushEndpoint(ClientOptions clientOptions, ClientResources clientResources, + Consumer> callbackOnClose) { + + LettuceAssert.notNull(clientOptions, "ClientOptions must not be null"); + LettuceAssert.notNull(clientOptions, "ClientResources must not be null"); + + this.clientOptions = clientOptions; + this.clientResources = clientResources; + this.reliability = clientOptions.isAutoReconnect() ? Reliability.AT_LEAST_ONCE : Reliability.AT_MOST_ONCE; + this.boundedQueues = clientOptions.getRequestQueueSize() != Integer.MAX_VALUE; + this.rejectCommandsWhileDisconnected = isRejectCommand(clientOptions); + long endpointId = ENDPOINT_COUNTER.incrementAndGet(); + this.cachedEndpointId = "0x" + Long.toHexString(endpointId); + this.taskQueue = new JcToolsUnboundedMpscOfferFirstQueue<>(); + this.canFire = false; + this.callbackOnClose = callbackOnClose; + this.writeSpinCount = clientOptions.getAutoBatchFlushOptions().getWriteSpinCount(); + this.batchSize = clientOptions.getAutoBatchFlushOptions().getBatchSize(); + } + + @Override + public void setConnectionFacade(ConnectionFacade connectionFacade) { + this.connectionFacade = connectionFacade; + } + + @Override + public ClientResources getClientResources() { + return clientResources; + } + + @Override + public void setAutoFlushCommands(boolean autoFlush) { + this.autoFlushCommands = autoFlush; + } + + @Override + public void addListener(PushListener listener) { + pushListeners.add(listener); + } + + @Override + public void removeListener(PushListener listener) { + pushListeners.remove(listener); + } + + @Override + public List getPushListeners() { + return pushListeners; + } + + @Override + public RedisCommand write(RedisCommand command) { + + LettuceAssert.notNull(command, "Command must not be null"); + + final Throwable validation = validateWrite(1); + if (validation != null) { + command.completeExceptionally(validation); + return command; + } + + try { + if (inActivation) { + command = processActivationCommand(command); + } + + this.taskQueue.offer(command); + QUEUE_SIZE.incrementAndGet(this); + + if (autoFlushCommands) { + flushCommands(); + } + + } finally { + if (debugEnabled) { + logger.debug("{} write() done", logPrefix()); + } + } + + return command; + } + + @SuppressWarnings("unchecked") + @Override + public Collection> write(Collection> commands) { + + LettuceAssert.notNull(commands, "Commands must not be null"); + + final Throwable validation = validateWrite(commands.size()); + if (validation != null) { + commands.forEach(it -> it.completeExceptionally(validation)); + return (Collection>) commands; + } + + try { + if (inActivation) { + commands = processActivationCommands(commands); + } + + this.taskQueue.offer(commands); + QUEUE_SIZE.addAndGet(this, commands.size()); + + if (autoFlushCommands) { + flushCommands(); + } + } finally { + if (debugEnabled) { + logger.debug("{} write() done", logPrefix()); + } + } + + return (Collection>) commands; + } + + @Override + public void notifyChannelActive(Channel channel) { + lastEventLoop = channel.eventLoop(); + + final ContextualChannel contextualChannel = new ContextualChannel(channel, ConnectionContext.State.CONNECTED); + + this.logPrefix = null; + this.connectionError = null; + + if (!CHANNEL.compareAndSet(this, DummyContextualChannelInstances.CHANNEL_CONNECTING, contextualChannel)) { + channel.close(); + onUnexpectedState("notifyChannelActive", ConnectionContext.State.CONNECTING); + return; + } + + // Created a synchronize-before with set channel to CHANNEL_CONNECTING, + if (isClosed()) { + logger.info("{} Closing channel because endpoint is already closed", logPrefix()); + channel.close(); + + // Cleaning will be done later in notifyChannelInactiveAfterWatchdogDecision, we are happy so far. + return; + } + + if (connectionWatchdog != null) { + connectionWatchdog.arm(); + } + + try { + if (debugEnabled) { + logger.debug("{} activating endpoint", logPrefix()); + } + + try { + inActivation = true; + connectionFacade.activated(); + } finally { + inActivation = false; + } + + scheduleSendJobOnConnected(contextualChannel); + } catch (Exception e) { + + if (debugEnabled) { + logger.debug("{} channelActive() ran into an exception", logPrefix()); + } + + if (clientOptions.isCancelCommandsOnReconnectFailure()) { + resetInternal(); + } + + throw e; + } + } + + @Override + public void notifyReconnectFailed(Throwable t) { + this.failedToReconnectReason = t; + + if (!CHANNEL.compareAndSet(this, DummyContextualChannelInstances.CHANNEL_CONNECTING, + DummyContextualChannelInstances.CHANNEL_RECONNECT_FAILED)) { + onUnexpectedState("notifyReconnectFailed", ConnectionContext.State.CONNECTING); + return; + } + + syncAfterTerminated(() -> { + if (isClosed()) { + onEndpointClosed(); + } else { + cancelCommands("reconnect failed"); + } + }); + } + + @Override + public void notifyChannelInactive(Channel channel) { + if (debugEnabled) { + logger.debug("{} deactivating endpoint handler", logPrefix()); + } + + connectionFacade.deactivated(); + } + + @Override + public void notifyChannelInactiveAfterWatchdogDecision(Channel channel, + Deque> retryableQueuedCommands) { + final ContextualChannel inactiveChan = this.channel; + if (!inactiveChan.context.initialState.isConnected() || inactiveChan.getDelegate() != channel) { + logger.error("[unexpected][{}] notifyChannelInactive: channel not match", logPrefix()); + return; + } + + if (inactiveChan.context.isChannelInactiveEventFired()) { + logger.error("[unexpected][{}] notifyChannelInactive: already fired", logPrefix()); + return; + } + + boolean willReconnect = connectionWatchdog != null && connectionWatchdog.willReconnect(); + RedisException exception = null; + // Unlike DefaultEndpoint, here we don't check reliability since connectionWatchdog.willReconnect() already does it. + if (isClosed()) { + exception = new RedisException("endpoint closed"); + willReconnect = false; + } + + if (willReconnect) { + CHANNEL.set(this, DummyContextualChannelInstances.CHANNEL_WILL_RECONNECT); + // Create a synchronize-before with this.channel = CHANNEL_WILL_RECONNECT + if (isClosed()) { + exception = new RedisException("endpoint closed"); + willReconnect = false; + } else { + exception = new RedisException("channel inactive and will reconnect"); + } + } else if (exception == null) { + exception = new RedisException("channel inactive and connectionWatchdog won't reconnect"); + } + + if (!willReconnect) { + CHANNEL.set(this, DummyContextualChannelInstances.CHANNEL_ENDPOINT_CLOSED); + } + inactiveChan.context + .setCloseStatus(new ConnectionContext.CloseStatus(willReconnect, retryableQueuedCommands, exception)); + trySetEndpointQuiescence(inactiveChan); + } + + @Override + public void notifyException(Throwable t) { + if (t instanceof RedisConnectionException && RedisConnectionException.isProtectedMode(t.getMessage())) { + connectionError = t; + inProtectMode = true; + } + + final ContextualChannel curr = this.channel; + if (!curr.context.initialState.isConnected() || !curr.isActive()) { + connectionError = t; + } + } + + @Override + public void registerConnectionWatchdog(ConnectionWatchdog connectionWatchdog) { + this.connectionWatchdog = connectionWatchdog; + } + + @Override + public void flushCommands() { + final ContextualChannel chan = this.channel; + switch (chan.context.initialState) { + case ENDPOINT_CLOSED: + syncAfterTerminated(this::onEndpointClosed); + return; + case RECONNECT_FAILED: + syncAfterTerminated(() -> { + if (isClosed()) { + onEndpointClosed(); + } else { + fulfillCommands("Reconnect failed", + cmd -> cmd.completeExceptionally(new RedisException("Reconnect failed"))); + } + }); + return; + case WILL_RECONNECT: + case CONNECTING: + // command will be handled later either in notifyReconnectFailed or in notifyChannelActive + return; + case CONNECTED: + scheduleSendJobIfNeeded(chan); + return; + default: + throw new IllegalStateException("unexpected state: " + chan.context.initialState); + } + } + + /** + * Close the connection. + */ + @Override + public void close() { + + if (debugEnabled) { + logger.debug("{} close()", logPrefix()); + } + + closeAsync().join(); + } + + @Override + public CompletableFuture closeAsync() { + + if (debugEnabled) { + logger.debug("{} closeAsync()", logPrefix()); + } + + if (isClosed()) { + return closeFuture; + } + + if (STATUS.compareAndSet(this, ST_OPEN, ST_CLOSED)) { + if (connectionWatchdog != null) { + connectionWatchdog.prepareClose(); + } + + final ContextualChannel chan = channel; + if (chan.context.initialState.isConnected()) { + // 1. STATUS.compareAndSet(this, ST_OPEN, ST_CLOSED) synchronize-before channel == CONNECTED + // 2. channel == CONNECTED synchronize-before setting channel to WILL_RECONNECT/ENDPOINT_CLOSED + // 3. setting channel to WILL_RECONNECT synchronize-before `isClosed()`, which will cancel all the commands. + Futures.adapt(chan.close(), closeFuture); + } else { + // if is FAILED_TO_CONNECT/CLIENT_CLOSED, don't care, otherwise + // 1. STATUS.compareAndSet(this, ST_OPEN, ST_CLOSED) synchronize-before channel == WILL_RECONNECT/CONNECTING + // 2. channel == WILL_RECONNECT/CONNECTING synchronize-before setting channel to CONNECTED/RECONNECT_FAILED + // 3. setting channel to CONNECTED/RECONNECT_FAILED synchronize-before `isClosed()`, which will cancel the + // commands; + closeFuture.complete(null); + } + } + + return closeFuture; + } + + /** + * Disconnect the channel. + */ + public void disconnect() { + + ContextualChannel chan = this.channel; + + if (chan.context.initialState.isConnected() && chan.isOpen()) { + chan.disconnect(); + } + } + + /** + * Reset the writer state. Queued commands will be canceled and the internal state will be reset. This is useful when the + * internal state machine gets out of sync with the connection. + */ + @Override + public void reset() { + + if (debugEnabled) { + logger.debug("{} reset()", logPrefix()); + } + + final ContextualChannel chan = channel; + if (chan.context.initialState.isConnected()) { + chan.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset()); + } + // Unsafe to call cancelBufferedCommands() here. + // cancelBufferedCommands("Reset"); + } + + private void resetInternal() { + + if (debugEnabled) { + logger.debug("{} reset()", logPrefix()); + } + + ContextualChannel chan = channel; + if (chan.context.initialState.isConnected()) { + chan.pipeline().fireUserEventTriggered(new ConnectionEvents.Reset()); + } + // Unsafe to call cancelBufferedCommands() here. + cancelCommands("Reset"); + } + + /** + * Reset the command-handler to the initial not-connected state. + */ + @Override + public void initialState() { + + // Thread safe since we are not connected yet. + cancelCommands("initialState"); + + ContextualChannel currentChannel = this.channel; + if (currentChannel.context.initialState.isConnected()) { + ChannelFuture close = currentChannel.close(); + if (currentChannel.isOpen()) { + close.syncUninterruptibly(); + } + } + } + + private boolean isClosed() { + return status == ST_CLOSED; + } + + protected String logPrefix() { + + if (logPrefix != null) { + return logPrefix; + } + + String buffer = "[" + ChannelLogDescriptor.logDescriptor(channel.getDelegate()) + ", " + "epid=" + getId() + ']'; + logPrefix = buffer; + return buffer; + } + + @Override + public String getId() { + return cachedEndpointId; + } + + private void scheduleSendJobOnConnected(final ContextualChannel chan) { + LettuceAssert.assertState(chan.eventLoop().inEventLoop(), "must be called in event loop thread"); + + // Schedule directly + loopSend(chan, false); + } + + private void scheduleSendJobIfNeeded(final ContextualChannel chan) { + final EventLoop eventLoop = chan.eventLoop(); + if (eventLoop.inEventLoop()) { + // Possible in reactive() mode. + loopSend(chan, false); + return; + } + + if (chan.context.batchFlushEndPointContext.hasOngoingSendLoop.tryEnter()) { + // Benchmark result of using tryEnterSafeGetVolatile() or not (1 thread, async get): + // 1. uses tryEnterSafeGetVolatile() to avoid unnecessary eventLoop.execute() calls + // Avg latency: 3.2956217278663s + // Avg QPS: 495238.50056392356/s + // 2. uses eventLoop.execute() directly + // Avg latency: 3.2677197021496998s + // Avg QPS: 476925.0751855796/s + eventLoop.execute(() -> loopSend(chan, true)); + } + + // Otherwise: + // 1. offer() (volatile write) synchronizes-before hasOngoingSendLoop.safe.get() == 1 (volatile read) + // 2. hasOngoingSendLoop.safe.get() == 1 (volatile read) synchronizes-before + // hasOngoingSendLoop.safe.set(0) (volatile write) in first loopSend0() + // 3. hasOngoingSendLoop.safe.set(0) (volatile write) synchronizes-before + // second loopSend0(), which will call poll() + } + + private void loopSend(final ContextualChannel chan, boolean entered) { + final ConnectionContext connectionContext = chan.context; + final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext.batchFlushEndPointContext; + if (connectionContext.isChannelInactiveEventFired() || batchFlushEndPointContext.hasRetryableFailedToSendTasks()) { + return; + } + + LettuceAssert.assertState(channel == chan, "unexpected: channel not match but closeStatus == null"); + loopSend0(batchFlushEndPointContext, chan, writeSpinCount, entered); + } + + private void loopSend0(final BatchFlushEndPointContext batchFlushEndPointContext, final ContextualChannel chan, + int remainingSpinnCount, final boolean entered) { + do { + final int count = pollBatch(batchFlushEndPointContext, chan); + if (count < 0) { + return; + } + if (count < batchSize) { + // queue was empty + break; + } + } while (--remainingSpinnCount > 0); + + if (remainingSpinnCount <= 0) { + // Don't need to exitUnsafe since we still have an ongoing consume tasks in this thread. + chan.eventLoop().execute(() -> loopSend(chan, entered)); + return; + } + + if (entered) { + // The send loop will be triggered later when a new task is added, + // // Don't setUnsafe here because loopSend0() may result in a delayed loopSend() call. + batchFlushEndPointContext.hasOngoingSendLoop.exit(); + // // Guarantee thread-safety: no dangling tasks in the queue. + loopSend0(batchFlushEndPointContext, chan, remainingSpinnCount, false); + // chan.eventLoop().schedule(() -> loopSend0(batchFlushEndPointContext, chan, writeSpinCount, false), 100, + // TimeUnit.NANOSECONDS); + } + } + + private int pollBatch(final BatchFlushEndPointContext batchFlushEndPointContext, ContextualChannel chan) { + int count = 0; + while (count < batchSize) { + final Object o = this.taskQueue.poll(); + if (o == null) { + break; + } + + if (o instanceof RedisCommand) { + RedisCommand cmd = (RedisCommand) o; + channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd)); + count++; + } else { + @SuppressWarnings("unchecked") + Collection> commands = (Collection>) o; + for (RedisCommand cmd : commands) { + channelWrite(chan, cmd).addListener(WrittenToChannel.newInstance(this, chan, cmd)); + } + count += commands.size(); + } + } + + if (count > 0) { + batchFlushEndPointContext.add(count); + + channelFlush(chan); + if (batchFlushEndPointContext.hasRetryableFailedToSendTasks()) { + // Wait for onConnectionClose event() + return -1; + } + } + return count; + } + + private void trySetEndpointQuiescence(ContextualChannel chan) { + LettuceAssert.isTrue(chan.eventLoop().inEventLoop(), "unexpected: not in event loop"); + + final ConnectionContext connectionContext = chan.context; + final @Nullable ConnectionContext.CloseStatus closeStatus = connectionContext.getCloseStatus(); + final BatchFlushEndPointContext batchFlushEndPointContext = connectionContext.batchFlushEndPointContext; + if (batchFlushEndPointContext.isDone() && closeStatus != null) { + if (closeStatus.isWillReconnect()) { + onWillReconnect(closeStatus, batchFlushEndPointContext); + } else { + onWontReconnect(closeStatus, batchFlushEndPointContext); + } + + if (chan.context.setChannelQuiescentOnce()) { + onEndpointQuiescence(); + } else { + ExceptionUtils.maybeFire(logger, canFire, "unexpected: setEndpointQuiescenceOncePerConnection() failed"); + } + } + } + + private void onEndpointQuiescence() { + if (channel.context.initialState == ConnectionContext.State.ENDPOINT_CLOSED) { + return; + } + + // Create happens-before with channelActive() + if (!CHANNEL.compareAndSet(this, DummyContextualChannelInstances.CHANNEL_WILL_RECONNECT, + DummyContextualChannelInstances.CHANNEL_CONNECTING)) { + onUnexpectedState("onEndpointQuiescence", ConnectionContext.State.WILL_RECONNECT); + return; + } + + // neither connectionWatchdog nor doReconnectOnEndpointQuiescence could be null + connectionWatchdog.reconnectOnEndpointQuiescence(); + } + + private void onWillReconnect(@Nonnull final ConnectionContext.CloseStatus closeStatus, + final BatchFlushEndPointContext batchFlushEndPointContext) { + final @Nullable Deque> retryableFailedToSendTasks = batchFlushEndPointContext + .getAndClearRetryableFailedToSendTasks(); + if (retryableFailedToSendTasks != null) { + // Save retryable failed tasks + logger.info( + "[onWillReconnect][{}] compensate {} retryableFailedToSendTasks (write failure) for retrying on reconnecting, first write error: {}", + logPrefix(), retryableFailedToSendTasks.size(), + batchFlushEndPointContext.getFirstDiscontinueReason().getMessage()); + offerFirstAll(retryableFailedToSendTasks); + } + + LettuceAssert.assertState(reliability != Reliability.AT_MOST_ONCE, "unexpected: reliability is AT_MOST_ONCE"); + final Deque> retryablePendingCommands = closeStatus.getAndClearRetryablePendingCommands(); + if (retryablePendingCommands != null) { + // Save uncompletedTasks for later retry. + logger.info( + "[onWillReconnect][{}] compensate {} retryable pending commands (write success) for retrying on reconnecting", + logPrefix(), retryablePendingCommands.size()); + offerFirstAll(retryablePendingCommands); + } + + // follow the same logic as DefaultEndpoint + if (inProtectMode) { + cancelCommands("inProtectMode"); + } + } + + private void onWontReconnect(@Nonnull final ConnectionContext.CloseStatus closeStatus, + final BatchFlushEndPointContext batchFlushEndPointContext) { + // No need to use syncAfterTerminated() since we are already in the event loop. + if (isClosed()) { + onEndpointClosed(closeStatus.getAndClearRetryablePendingCommands(), + batchFlushEndPointContext.getAndClearRetryableFailedToSendTasks()); + } else { + fulfillCommands("onConnectionClose called and won't reconnect", + it -> it.completeExceptionally(closeStatus.getErr()), closeStatus.getAndClearRetryablePendingCommands(), + batchFlushEndPointContext.getAndClearRetryableFailedToSendTasks()); + } + } + + private void offerFirstAll(Deque> commands) { + commands.forEach(cmd -> { + if (cmd instanceof DemandAware.Sink) { + ((DemandAware.Sink) cmd).removeSource(); + } + }); + this.taskQueue.offerFirstAll(commands); + QUEUE_SIZE.addAndGet(this, commands.size()); + } + + private void cancelCommands(String message) { + fulfillCommands(message, RedisCommand::cancel); + } + + @SafeVarargs + private final void onEndpointClosed(Queue>... queues) { + fulfillCommands("endpoint closed", callbackOnClose, queues); + } + + @SafeVarargs + private final void fulfillCommands(String message, Consumer> commandConsumer, + Queue>... queues) { + int totalCancelledTaskNum = 0; + for (Queue> queue : queues) { + while (true) { + RedisCommand cmd = queue.poll(); + if (cmd == null) { + break; + } + if (cmd.getOutput() != null) { + cmd.getOutput().setError(message); + } + commandConsumer.accept(cmd); + + totalCancelledTaskNum++; + } + } + + int cancelledTaskNumInTaskQueue = 0; + while (true) { + Object o = this.taskQueue.poll(); + if (o == null) { + break; + } + + if (o instanceof RedisCommand) { + RedisCommand cmd = (RedisCommand) o; + if (cmd.getOutput() != null) { + cmd.getOutput().setError(message); + } + commandConsumer.accept(cmd); + cancelledTaskNumInTaskQueue++; + totalCancelledTaskNum++; + } else { + @SuppressWarnings("unchecked") + Collection> commands = (Collection>) o; + for (RedisCommand cmd : commands) { + if (cmd.getOutput() != null) { + cmd.getOutput().setError(message); + } + commandConsumer.accept(cmd); + } + cancelledTaskNumInTaskQueue += commands.size(); + totalCancelledTaskNum += commands.size(); + } + } + + QUEUE_SIZE.addAndGet(this, -cancelledTaskNumInTaskQueue); + if (totalCancelledTaskNum > 0) { + logger.error("{} cancel {} pending tasks, reason: '{}'", logPrefix(), totalCancelledTaskNum, message); + } + } + + private RedisCommand processActivationCommand(RedisCommand command) { + + if (!ActivationCommand.isActivationCommand(command)) { + return new ActivationCommand<>(command); + } + + return command; + } + + private Collection> processActivationCommands( + Collection> commands) { + + Collection> commandsToReturn = new ArrayList<>(commands.size()); + + for (RedisCommand command : commands) { + + if (!ActivationCommand.isActivationCommand(command)) { + command = new ActivationCommand<>(command); + } + + commandsToReturn.add(command); + } + + return commandsToReturn; + } + + private Throwable validateWrite(@SuppressWarnings("unused") int commands) { + if (isClosed()) { + return new RedisException("Connection is closed"); + } + + final Throwable localConnectionErr = connectionError; + if (localConnectionErr != null /* different logic of DefaultEndpoint */) { + return localConnectionErr; + } + + if (boundedQueues && queueSize + commands > clientOptions.getRequestQueueSize()) { + return new RedisException("Request queue size exceeded: " + clientOptions.getRequestQueueSize() + + ". Commands are not accepted until the queue size drops."); + } + + final ContextualChannel chan = this.channel; + switch (chan.context.initialState) { + case ENDPOINT_CLOSED: + return new RedisException("Connection is closed"); + case RECONNECT_FAILED: + return failedToReconnectReason; + case WILL_RECONNECT: + case CONNECTING: + return rejectCommandsWhileDisconnected ? new RedisException("Currently not connected. Commands are rejected.") + : null; + case CONNECTED: + return !chan.isActive() && rejectCommandsWhileDisconnected ? new RedisException("Connection is closed") : null; + default: + throw new IllegalStateException("unexpected state: " + chan.context.initialState); + } + } + + private void onUnexpectedState(String caller, ConnectionContext.State exp) { + final ConnectionContext.State actual = this.channel.context.initialState; + logger.error("{}[{}][unexpected] : unexpected state: exp '{}' got '{}'", logPrefix(), caller, exp, actual); + cancelCommands(String.format("%s: state not match: expect '%s', got '%s'", caller, exp, actual)); + } + + private void channelFlush(Channel channel) { + if (debugEnabled) { + logger.debug("{} write() channelFlush", logPrefix()); + } + + channel.flush(); + } + + private ChannelFuture channelWrite(Channel channel, RedisCommand command) { + + if (debugEnabled) { + logger.debug("{} write() channelWrite command {}", logPrefix(), command); + } + + return channel.write(command); + } + + /* + * Synchronize after the endpoint is terminated. This is to ensure only one thread can access the task queue after endpoint + * is terminated (state is RECONNECT_FAILED/ENDPOINT_CLOSED) + */ + private void syncAfterTerminated(Runnable runnable) { + final EventLoop localLastEventLoop = lastEventLoop; + LettuceAssert.notNull(localLastEventLoop, "lastEventLoop must not be null after terminated"); + if (localLastEventLoop.inEventLoop()) { + runnable.run(); + } else { + localLastEventLoop.execute(() -> { + runnable.run(); + LettuceAssert.isTrue(lastEventLoop == localLastEventLoop, "lastEventLoop must not be changed after terminated"); + }); + } + } + + private enum Reliability { + AT_MOST_ONCE, AT_LEAST_ONCE + } + + /** + * Add to stack listener. This listener is pooled and must be {@link #recycle() recycled after usage}. + */ + static class WrittenToChannel implements GenericFutureListener> { + + private static final Recycler RECYCLER = new Recycler() { + + @Override + protected WrittenToChannel newObject(Recycler.Handle handle) { + return new WrittenToChannel(handle); + } + + }; + + private final Recycler.Handle handle; + + private DefaultBatchFlushEndpoint endpoint; + + private RedisCommand command; + + private ContextualChannel chan; + + private WrittenToChannel(Recycler.Handle handle) { + this.handle = handle; + } + + /** + * Allocate a new instance. + * + * @return new instance + */ + static WrittenToChannel newInstance(DefaultBatchFlushEndpoint endpoint, ContextualChannel chan, + RedisCommand command) { + + WrittenToChannel entry = RECYCLER.get(); + + entry.endpoint = endpoint; + entry.chan = chan; + entry.command = command; + + return entry; + } + + @Override + public void operationComplete(Future future) { + final BatchFlushEndPointContext batchFlushEndPointContext = chan.context.batchFlushEndPointContext; + try { + QUEUE_SIZE.decrementAndGet(endpoint); + batchFlushEndPointContext.done(1); + + final Throwable retryableErr = checkSendResult(future, chan, command); + if (retryableErr != null && batchFlushEndPointContext.addRetryableFailedToSendTask(command, retryableErr)) { + // Close connection on first transient write failure + internalCloseConnectionIfNeeded(chan, retryableErr); + } + + endpoint.trySetEndpointQuiescence(chan); + } finally { + recycle(); + } + } + + /** + * Check write result. + * + * @param sendFuture The future to check. + * @param contextualChannel The channel instance associated with the future. + * @param cmd The task. + * @return The cause of the failure if is a retryable failed task, otherwise null. + */ + private Throwable checkSendResult(Future sendFuture, ContextualChannel contextualChannel, + RedisCommand cmd) { + if (cmd.isDone()) { + ExceptionUtils.logUnexpectedDone(logger, endpoint.logPrefix(), cmd); + return null; + } + + final ConnectionContext.CloseStatus closeStatus = contextualChannel.context.getCloseStatus(); + if (closeStatus != null) { + logger.warn("[checkSendResult][interesting][{}] callback called after onClose() event, close status: {}", + endpoint.logPrefix(), contextualChannel.context.getCloseStatus()); + final Throwable err = sendFuture.isSuccess() ? closeStatus.getErr() : sendFuture.cause(); + if (!closeStatus.isWillReconnect() || shouldNotRetry(err, cmd)) { + cmd.completeExceptionally(err); + return null; + } else { + return err; + } + } + + if (sendFuture.isSuccess()) { + return null; + } + + final Throwable cause = sendFuture.cause(); + ExceptionUtils.maybeLogSendError(logger, cause); + if (shouldNotRetry(cause, cmd)) { + cmd.completeExceptionally(cause); + return null; + } + + return cause; + } + + private boolean shouldNotRetry(Throwable cause, RedisCommand cmd) { + return endpoint.reliability == Reliability.AT_MOST_ONCE || ActivationCommand.isActivationCommand(cmd) + || ExceptionUtils.oneOf(cause, SHOULD_NOT_RETRY_EXCEPTION_TYPES); + } + + private void internalCloseConnectionIfNeeded(ContextualChannel toCloseChan, Throwable reason) { + if (toCloseChan.context.isChannelInactiveEventFired() || !toCloseChan.isActive()) { + return; + } + + logger.error( + "[internalCloseConnectionIfNeeded][interesting][{}] close the connection due to write error, reason: '{}'", + endpoint.logPrefix(), reason.getMessage(), reason); + toCloseChan.eventLoop().schedule(() -> { + if (toCloseChan.isActive()) { + toCloseChan.close(); + } + }, 1, TimeUnit.SECONDS); + } + + private void recycle() { + this.endpoint = null; + this.chan = null; + this.command = null; + + handle.recycle(this); + } + + } + +} diff --git a/src/main/java/io/lettuce/core/utils/ExceptionUtils.java b/src/main/java/io/lettuce/core/utils/ExceptionUtils.java new file mode 100644 index 000000000..4072d81b3 --- /dev/null +++ b/src/main/java/io/lettuce/core/utils/ExceptionUtils.java @@ -0,0 +1,95 @@ +package io.lettuce.core.utils; + +import io.lettuce.core.output.CommandOutput; +import io.lettuce.core.protocol.RedisCommand; +import io.netty.channel.socket.ChannelOutputShutdownException; +import io.netty.util.internal.logging.InternalLogger; + +import java.io.IOException; +import java.nio.channels.ClosedChannelException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Function; + +public class ExceptionUtils { + + private static final Set SUPPRESS_IO_EXCEPTION_MESSAGES = new HashSet<>( + Arrays.asList("Connection reset by peer", "Broken pipe", "Connection timed out")); + + private ExceptionUtils() { + } + + public static void maybeLogSendError(InternalLogger logger, Throwable cause) { + if (cause instanceof ClosedChannelException) { + return; + } + + if (cause instanceof IOException && (SUPPRESS_IO_EXCEPTION_MESSAGES.contains(cause.getMessage()) + || cause instanceof ChannelOutputShutdownException)) { + logger.debug("[maybeLogSendError] error during request: {}", cause.getMessage(), cause); + } else { + logger.error("[maybeLogSendError][attention] unexpected exception during request: {}", cause.getMessage(), cause); + } + } + + public static T castTo(Throwable throwable, Class clazz, Function supplier) { + if (clazz.isInstance(throwable)) { + return clazz.cast(throwable); + } + return supplier.apply(throwable); + } + + public static T clearStackTrace(T throwable) { + throwable.setStackTrace(new StackTraceElement[0]); + return throwable; + } + + /** + * Returns whether the throwable is one of the exception types or one of the cause in the cause chain is one of the + * exception types + * + * @param throwable exception to check + * @param exceptionTypes target exception types. + * @return whether the throwable is one of the exception types or one of the cause in the cause chain is one of the + * exception types + */ + public static boolean oneOf(final Throwable throwable, final Collection> exceptionTypes) { + Throwable cause = throwable; + do { + for (Class exceptionType : exceptionTypes) { + if (exceptionType.isInstance(cause)) { + return true; + } + } + cause = cause.getCause(); + } while (cause != null); + return false; + } + + public static void maybeFire(InternalLogger logger, boolean canFire, String msg) { + final IllegalStateException ex = new IllegalStateException(msg); + logger.error("[unexpected] {}", msg, ex); + if (canFire) { + throw ex; + } + } + + public static void logUnexpectedDone(InternalLogger logger, String logPrefix, RedisCommand cmd) { + if (cmd.isCancelled()) { + logger.warn("[logUnexpectedDone][{}] command is cancelled: {}", logPrefix, cmd); + return; + } + + final CommandOutput output = cmd.getOutput(); + final String err = output.getError(); + if (err != null) { + logger.warn("[logUnexpectedDone][{}] command completes with err, cmd: [{}], err: [{}]", logPrefix, cmd, err); + return; + } + + logger.warn("[logUnexpectedDone][{}] command completes normally, cmd: [{}], value: [{}]", logPrefix, cmd, output.get()); + } + +}