Skip to content

Commit

Permalink
implementation of ssubscribe
Browse files Browse the repository at this point in the history
reactive commands implementation and additional tests
  • Loading branch information
atakavci committed Apr 2, 2024
1 parent 8848d15 commit 948d017
Show file tree
Hide file tree
Showing 25 changed files with 330 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.models.partitions.Partitions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.RedisClusterShardedPubSubListener;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.RedisPubSubListener;
Expand All @@ -28,7 +28,7 @@ class ClusterPubSubConnectionProvider<K, V> extends PooledClusterConnectionProvi

private final RedisCodec<K, V> redisCodec;

private final RedisClusterPubSubListener<K, V> notifications;
private final RedisClusterShardedPubSubListener<K, V> notifications;

/**
* Creates a new {@link ClusterPubSubConnectionProvider}.
Expand All @@ -40,7 +40,7 @@ class ClusterPubSubConnectionProvider<K, V> extends PooledClusterConnectionProvi
* @param clusterEventListener must not be {@code null}.
*/
ClusterPubSubConnectionProvider(RedisClusterClient redisClusterClient, RedisChannelWriter clusterWriter,
RedisCodec<K, V> redisCodec, RedisClusterPubSubListener<K, V> notificationTarget,
RedisCodec<K, V> redisCodec, RedisClusterShardedPubSubListener<K, V> notificationTarget,
ClusterEventListener clusterEventListener) {

super(redisClusterClient, clusterWriter, redisCodec, clusterEventListener);
Expand Down Expand Up @@ -167,6 +167,11 @@ public void punsubscribed(K pattern, long count) {
notifications.punsubscribed(getNode(), pattern, count);
}

@Override
public void ssubscribed(K channel, long count) {
notifications.ssubscribed(getNode(), channel, count);
}

private RedisClusterNode getNode() {
return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartitions().getPartition(host, port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static ReadOnlyCommands.ReadOnlyPredicate asPredicate() {
enum CommandName {

// Pub/Sub commands are no key-space commands so they are safe to execute on replica nodes
PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE
PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, SSUBSCRIBE
}

}
23 changes: 22 additions & 1 deletion src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,17 @@

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.RedisClusterShardedPubSubListener;
import io.lettuce.core.pubsub.PubSubEndpoint;
import io.lettuce.core.pubsub.PubSubMessage;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.RedisShardedPubSubListener;
import io.lettuce.core.resource.ClientResources;

/**
Expand Down Expand Up @@ -45,7 +49,7 @@ public void addListener(RedisClusterPubSubListener<K, V> listener) {
clusterListeners.add(listener);
}

public RedisClusterPubSubListener<K, V> getUpstreamListener() {
public RedisClusterShardedPubSubListener<K, V> getUpstreamListener() {
return upstream;
}

Expand Down Expand Up @@ -88,6 +92,9 @@ protected void notifyListeners(PubSubMessage<K, V> output) {
case unsubscribe:
multicast.unsubscribed(clusterNode, output.channel(), output.count());
break;
case ssubscribe:
multicast.ssubscribed(clusterNode, output.channel(), output.count());
break;

Check warning on line 97 in src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java#L96-L97

Added lines #L96 - L97 were not covered by tests
default:
throw new UnsupportedOperationException("Operation " + output.type() + " not supported");
}
Expand Down Expand Up @@ -189,6 +196,20 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) {
clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count));
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
getListeners().forEach(listener -> {
if (listener instanceof RedisShardedPubSubListener) {
((RedisShardedPubSubListener<K, V>) listener).ssubscribed(channel, count);

Check warning on line 203 in src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java#L203

Added line #L203 was not covered by tests
}
});

Check warning on line 205 in src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java#L205

Added line #L205 was not covered by tests
clusterListeners.forEach(listener -> {
if (listener instanceof RedisClusterShardedPubSubListener) {
((RedisClusterShardedPubSubListener<K, V>) listener).ssubscribed(node, channel, count);
}
});
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* @author Mark Paluch
* @since 4.4
*/
public class RedisClusterPubSubAdapter<K, V> implements RedisClusterPubSubListener<K, V> {
public class RedisClusterPubSubAdapter<K, V> implements RedisClusterShardedPubSubListener<K, V> {

@Override
public void message(RedisClusterNode node, K channel, V message) {
Expand Down Expand Up @@ -42,4 +42,9 @@ public void punsubscribed(RedisClusterNode node, K pattern, long count) {
// empty adapter method
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
// empty adapter method
}

Check warning on line 48 in src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java#L48

Added line #L48 was not covered by tests

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package io.lettuce.core.cluster.pubsub;

import io.lettuce.core.cluster.models.partitions.RedisClusterNode;

/**
* Interface for Redis Cluster Pub/Sub listeners.
*
* @param <K> Key type.
* @param <V> Value type.
* @author Ali Takavci
* @since 4.4
*/
public interface RedisClusterShardedPubSubListener<K, V> extends RedisClusterPubSubListener<K, V>{

/**
* Subscribed to a shard channel.
*
* @param node the {@link RedisClusterNode} from which the {@code message} originates.
* @param shardChannel Shard channel
* @param count Subscription count.
*/
void ssubscribed(RedisClusterNode node, K shardChannel, long count);

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,11 @@ public interface NodeSelectionPubSubAsyncCommands<K, V> {
*/
AsyncExecutions<Void> unsubscribe(K... channels);

/**
* Listen for messages published to the given shard channels.
*
* @param shardChannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
*/
AsyncExecutions<Void> ssubscribe(K... shardChannels);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ public interface NodeSelectionPubSubReactiveCommands<K, V> {
*/
ReactiveExecutions<Void> unsubscribe(K... channels);

/**
* Listen for messages published to the given shard channels.
*
* @param shardCchannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
*/
ReactiveExecutions<Void> ssubscribe(K... shardCchannels);

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ public interface NodeSelectionPubSubCommands<K, V> {
*/
Executions<Void> unsubscribe(K... channels);

/**
* Listen for messages published to the given shard channels.
*
* @param shardChannels the channels
* @return Executions Future to synchronize {@code subscribe} completion
*/
Executions<Void> ssubscribe(K... shardChannels);

}
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandType.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword {

// Pub/Sub

PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB,
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE,

// Sets

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ final Command<K, V, V> unsubscribe(K... channels) {
return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels);
}

@SafeVarargs
final Command<K, V, V> ssubscribe(K... shardChannels) {
LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKeys(shardChannels);
return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args);
}

@SafeVarargs
final <T> Command<K, V, T> pubSubCommand(CommandType type, CommandOutput<K, V, T> output, K... keys) {
return new Command<>(type, output, new PubSubCommandArgs<>(codec).addKeys(keys));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ private boolean shouldCompleteCommand(PubSubOutput.Type type, RedisCommand<?, ?,
case subscribe:
return commandType.equalsIgnoreCase("SUBSCRIBE");

case ssubscribe:
return commandType.equalsIgnoreCase("SSUBSCRIBE");

case psubscribe:
return commandType.equalsIgnoreCase("PSUBSCRIBE");

Expand Down
18 changes: 18 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionState;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class PubSubEndpoint<K, V> extends DefaultEndpoint {

private final Set<Wrapper<K>> channels;

private final Set<Wrapper<K>> shardChannels;

private final Set<Wrapper<K>> patterns;

private volatile boolean subscribeWritten = false;
Expand All @@ -70,13 +73,15 @@ public class PubSubEndpoint<K, V> extends DefaultEndpoint {
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.UNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PUNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.SSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.QUIT.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PING.name());

SUBSCRIBE_COMMANDS = new HashSet<>(2, 1);

SUBSCRIBE_COMMANDS.add(CommandType.SUBSCRIBE.name());
SUBSCRIBE_COMMANDS.add(CommandType.PSUBSCRIBE.name());
SUBSCRIBE_COMMANDS.add(CommandType.SSUBSCRIBE.name());
}

/**
Expand All @@ -91,6 +96,7 @@ public PubSubEndpoint(ClientOptions clientOptions, ClientResources clientResourc

this.channels = ConcurrentHashMap.newKeySet();
this.patterns = ConcurrentHashMap.newKeySet();
this.shardChannels = ConcurrentHashMap.newKeySet();
}

/**
Expand Down Expand Up @@ -257,12 +263,21 @@ protected void notifyListeners(PubSubMessage<K, V> message) {
case unsubscribe:
listener.unsubscribed(message.channel(), message.count());
break;
case ssubscribe:
shardNotify(listener, (l) -> l.ssubscribed(message.channel(), message.count()));
break;
default:
throw new UnsupportedOperationException("Operation " + message.type() + " not supported");
}
}
}

private void shardNotify(RedisPubSubListener<K, V> listener, Consumer<RedisShardedPubSubListener<K, V>> c) {
if (listener instanceof RedisShardedPubSubListener) {
c.accept((RedisShardedPubSubListener<K, V>) listener);
}
}

private void updateInternalState(PubSubMessage<K, V> message) {
// update internal state
switch (message.type()) {
Expand All @@ -278,6 +293,9 @@ private void updateInternalState(PubSubMessage<K, V> message) {
case unsubscribe:
channels.remove(new Wrapper<>(message.channel()));
break;
case ssubscribe:
shardChannels.add(new Wrapper<>(message.channel()));
break;
default:
break;
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/pubsub/PubSubOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> implements PubSub

public enum Type {

message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe;
message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe;

private final static Set<String> names = new HashSet<>();

Expand Down Expand Up @@ -122,6 +122,7 @@ private void handleOutput(ByteBuffer bytes) {
break;
case subscribe:
case unsubscribe:
case ssubscribe:
channel = codec.decodeKey(bytes);
break;
default:
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
*
* @author Will Glozer
*/
public class RedisPubSubAdapter<K, V> implements RedisPubSubListener<K, V> {
public class RedisPubSubAdapter<K, V> implements RedisShardedPubSubListener<K, V> {

@Override
public void message(K channel, V message) {
Expand Down Expand Up @@ -59,4 +59,9 @@ public void punsubscribed(K pattern, long count) {
// empty adapter method
}

@Override
public void ssubscribed(K shardChannel, long count) {
// empty adapter method
}

Check warning on line 65 in src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java#L65

Added line #L65 was not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public RedisFuture<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return dispatch(commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
@SuppressWarnings("unchecked")
public RedisFuture<Void> ssubscribe(K... channels) {
return (RedisFuture<Void>) dispatch(commandBuilder.ssubscribe(channels));
}

@Override
@SuppressWarnings("unchecked")
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import reactor.core.publisher.Mono;
import io.lettuce.core.RedisReactiveCommandsImpl;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.pubsub.api.reactive.ChannelMessage;
import io.lettuce.core.pubsub.api.reactive.PatternMessage;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
Expand Down Expand Up @@ -158,6 +159,15 @@ public Mono<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
public Mono<Void> ssubscribe(K... shardChannels) {
return createFlux(() -> {
Command<K, V, V> c = commandBuilder.ssubscribe(shardChannels);
System.out.println("");
return c;
}).then();
}

@Override
@SuppressWarnings("unchecked")
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.lettuce.core.pubsub;

/**
* Interface for Redis Pub/Sub listeners.
*
* @param <K> Key type.
* @param <V> Value type.
* @author Ali Takavci
*/
public interface RedisShardedPubSubListener<K, V> extends RedisPubSubListener<K, V>{

/**
* Subscribed to a Shard channel.
*
* @param shardChannel Shard channel
* @param count Subscription count.
*/
void ssubscribed(K chashardChannelnnel, long count);

}
Loading

0 comments on commit 948d017

Please sign in to comment.