Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for SSUBSCRIBE #2813

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,11 @@ public void punsubscribed(K pattern, long count) {
notifications.punsubscribed(getNode(), pattern, count);
}

@Override
public void ssubscribed(K channel, long count) {
notifications.ssubscribed(getNode(), channel, count);
}

private RedisClusterNode getNode() {
return nodeId != null ? getPartitions().getPartitionByNodeId(nodeId) : getPartitions().getPartition(host, port);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public static ReadOnlyCommands.ReadOnlyPredicate asPredicate() {
enum CommandName {

// Pub/Sub commands are no key-space commands so they are safe to execute on replica nodes
PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE
PUBLISH, PUBSUB, PSUBSCRIBE, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, SSUBSCRIBE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@
case unsubscribe:
multicast.unsubscribed(clusterNode, output.channel(), output.count());
break;
case ssubscribe:
multicast.ssubscribed(clusterNode, output.channel(), output.count());
break;

Check warning on line 93 in src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/PubSubClusterEndpoint.java#L92-L93

Added lines #L92 - L93 were not covered by tests
default:
throw new UnsupportedOperationException("Operation " + output.type() + " not supported");
}
Expand Down Expand Up @@ -189,6 +192,12 @@
clusterListeners.forEach(listener -> listener.punsubscribed(node, pattern, count));
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
getListeners().forEach(listener -> listener.ssubscribed(channel, count));
clusterListeners.forEach(listener -> listener.ssubscribed(node, channel, count));
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@
// empty adapter method
}

@Override
public void ssubscribed(RedisClusterNode node, K channel, long count) {
// empty adapter method
}

Check warning on line 48 in src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubAdapter.java#L48

Added line #L48 was not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,16 @@
*/
void punsubscribed(RedisClusterNode node, K pattern, long count);

/**
* Subscribed to a shard channel.
*
* @param node the {@link RedisClusterNode} from which the {@code message} originates.
* @param shardChannel Shard channel
* @param count Subscription count.
*/
default void ssubscribed(RedisClusterNode node, K shardChannel, long count){
atakavci marked this conversation as resolved.
Show resolved Hide resolved
throw new UnsupportedOperationException(

Check warning on line 78 in src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/cluster/pubsub/RedisClusterPubSubListener.java#L78

Added line #L78 was not covered by tests
"This operation is not available by default! Use one of available pub/sub adapters or override this method in your class.");
atakavci marked this conversation as resolved.
Show resolved Hide resolved
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,11 @@ public interface NodeSelectionPubSubAsyncCommands<K, V> {
*/
AsyncExecutions<Void> unsubscribe(K... channels);

/**
* Listen for messages published to the given shard channels.
*
* @param shardChannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
*/
AsyncExecutions<Void> ssubscribe(K... shardChannels);
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ public interface NodeSelectionPubSubReactiveCommands<K, V> {
*/
ReactiveExecutions<Void> unsubscribe(K... channels);

/**
atakavci marked this conversation as resolved.
Show resolved Hide resolved
* Listen for messages published to the given shard channels.
*
* @param shardCchannels the channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
*/
ReactiveExecutions<Void> ssubscribe(K... shardCchannels);

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ public interface NodeSelectionPubSubCommands<K, V> {
*/
Executions<Void> unsubscribe(K... channels);

/**
atakavci marked this conversation as resolved.
Show resolved Hide resolved
* Listen for messages published to the given shard channels.
*
* @param shardChannels the channels
* @return Executions Future to synchronize {@code subscribe} completion
*/
Executions<Void> ssubscribe(K... shardChannels);

}
2 changes: 1 addition & 1 deletion src/main/java/io/lettuce/core/protocol/CommandType.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public enum CommandType implements ProtocolKeyword {

// Pub/Sub

PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB,
PSUBSCRIBE, PUBLISH, PUNSUBSCRIBE, SUBSCRIBE, UNSUBSCRIBE, PUBSUB, SSUBSCRIBE,

// Sets

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ final Command<K, V, V> unsubscribe(K... channels) {
return pubSubCommand(UNSUBSCRIBE, new PubSubOutput<>(codec), channels);
}

@SafeVarargs
final Command<K, V, V> ssubscribe(K... shardChannels) {
LettuceAssert.notEmpty(shardChannels, "Shard channels " + MUST_NOT_BE_EMPTY);

CommandArgs<K, V> args = new CommandArgs<>(codec).addKeys(shardChannels);
return createCommand(SSUBSCRIBE, new PubSubOutput<>(codec), args);
}

@SafeVarargs
final <T> Command<K, V, T> pubSubCommand(CommandType type, CommandOutput<K, V, T> output, K... keys) {
return new Command<>(type, output, new PubSubCommandArgs<>(codec).addKeys(keys));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,9 @@ private boolean shouldCompleteCommand(PubSubOutput.Type type, RedisCommand<?, ?,
case subscribe:
return commandType.equalsIgnoreCase("SUBSCRIBE");

case ssubscribe:
return commandType.equalsIgnoreCase("SSUBSCRIBE");

case psubscribe:
return commandType.equalsIgnoreCase("PSUBSCRIBE");

Expand Down
12 changes: 12 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

import io.lettuce.core.ClientOptions;
import io.lettuce.core.ConnectionState;
Expand Down Expand Up @@ -56,6 +57,8 @@ public class PubSubEndpoint<K, V> extends DefaultEndpoint {

private final Set<Wrapper<K>> channels;

private final Set<Wrapper<K>> shardChannels;

private final Set<Wrapper<K>> patterns;

private volatile boolean subscribeWritten = false;
Expand All @@ -70,13 +73,15 @@ public class PubSubEndpoint<K, V> extends DefaultEndpoint {
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.UNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PUNSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.SSUBSCRIBE.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.QUIT.name());
ALLOWED_COMMANDS_SUBSCRIBED.add(CommandType.PING.name());

SUBSCRIBE_COMMANDS = new HashSet<>(2, 1);

SUBSCRIBE_COMMANDS.add(CommandType.SUBSCRIBE.name());
SUBSCRIBE_COMMANDS.add(CommandType.PSUBSCRIBE.name());
SUBSCRIBE_COMMANDS.add(CommandType.SSUBSCRIBE.name());
}

/**
Expand All @@ -91,6 +96,7 @@ public PubSubEndpoint(ClientOptions clientOptions, ClientResources clientResourc

this.channels = ConcurrentHashMap.newKeySet();
this.patterns = ConcurrentHashMap.newKeySet();
this.shardChannels = ConcurrentHashMap.newKeySet();
}

/**
Expand Down Expand Up @@ -257,6 +263,9 @@ protected void notifyListeners(PubSubMessage<K, V> message) {
case unsubscribe:
listener.unsubscribed(message.channel(), message.count());
break;
case ssubscribe:
listener.ssubscribed(message.channel(), message.count());
break;
default:
throw new UnsupportedOperationException("Operation " + message.type() + " not supported");
}
Expand All @@ -278,6 +287,9 @@ private void updateInternalState(PubSubMessage<K, V> message) {
case unsubscribe:
channels.remove(new Wrapper<>(message.channel()));
break;
case ssubscribe:
shardChannels.add(new Wrapper<>(message.channel()));
break;
default:
break;
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/pubsub/PubSubOutput.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class PubSubOutput<K, V> extends CommandOutput<K, V, V> implements PubSub

public enum Type {

message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe;
message, pmessage, psubscribe, punsubscribe, subscribe, unsubscribe, ssubscribe;

private final static Set<String> names = new HashSet<>();

Expand Down Expand Up @@ -122,6 +122,7 @@ private void handleOutput(ByteBuffer bytes) {
break;
case subscribe:
case unsubscribe:
case ssubscribe:
channel = codec.decodeKey(bytes);
break;
default:
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,9 @@
// empty adapter method
}

@Override
public void ssubscribed(K shardChannel, long count) {
// empty adapter method
}

Check warning on line 65 in src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/pubsub/RedisPubSubAdapter.java#L65

Added line #L65 was not covered by tests

}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,12 @@ public RedisFuture<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return dispatch(commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
@SuppressWarnings("unchecked")
public RedisFuture<Void> ssubscribe(K... channels) {
return (RedisFuture<Void>) dispatch(commandBuilder.ssubscribe(channels));
}

@Override
@SuppressWarnings("unchecked")
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
Expand Down
10 changes: 10 additions & 0 deletions src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,14 @@
*/
void punsubscribed(K pattern, long count);

/**
* Subscribed to a Shard channel.
*
* @param shardChannel Shard channel
* @param count Subscription count.
*/
default void ssubscribed(K chashardChannelnnel, long count) {
atakavci marked this conversation as resolved.
Show resolved Hide resolved
throw new UnsupportedOperationException(

Check warning on line 87 in src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java#L87

Added line #L87 was not covered by tests
"This operation is not available by default. Use one of available pub/sub adapters or override this method in your class!");
}
atakavci marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import reactor.core.publisher.Mono;
import io.lettuce.core.RedisReactiveCommandsImpl;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.pubsub.api.reactive.ChannelMessage;
import io.lettuce.core.pubsub.api.reactive.PatternMessage;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
Expand Down Expand Up @@ -158,6 +159,15 @@ public Mono<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels));
}

@Override
public Mono<Void> ssubscribe(K... shardChannels) {
return createFlux(() -> {
Command<K, V, V> c = commandBuilder.ssubscribe(shardChannels);
System.out.println("");
atakavci marked this conversation as resolved.
Show resolved Hide resolved
return c;
}).then();
}

@Override
@SuppressWarnings("unchecked")
public StatefulRedisPubSubConnection<K, V> getStatefulConnection() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,12 @@ public interface RedisPubSubAsyncCommands<K, V> extends RedisAsyncCommands<K, V>
*/
StatefulRedisPubSubConnection<K, V> getStatefulConnection();

/**
* Listen for messages published to the given shard channels.
*
* @param shardChannels the shard channels
* @return RedisFuture&lt;Void&gt; Future to synchronize {@code subscribe} completion
*/
RedisFuture<Void> ssubscribe(K... shardChannels);

}
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,15 @@ public interface RedisPubSubReactiveCommands<K, V> extends RedisReactiveCommands
*/
Mono<Void> unsubscribe(K... channels);

/**
* Listen for messages published to the given shard channels. The {@link Mono} completes without a result as soon as the *
* subscription is registered.
*
* @param shardChannels the channels.
* @return Mono&lt;Void&gt; Mono for {@code subscribe} command.
*/
Mono<Void> ssubscribe(K... shardChannels);

/**
* @return the underlying connection.
* @since 6.2, will be removed with Lettuce 7 to avoid exposing the underlying connection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,13 @@ public interface RedisPubSubCommands<K, V> extends RedisCommands<K, V> {
*/
void unsubscribe(K... channels);

/**
* Listen for messages published to the given shard channels.
*
* @param shardChannels the channels
*/
void ssubscribe(K... shardChannels);

/**
* @return the underlying connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class ClusterReadOnlyCommandsUnitTests {

@Test
void testCount() {
assertThat(ClusterReadOnlyCommands.getReadOnlyCommands()).hasSize(85);
assertThat(ClusterReadOnlyCommands.getReadOnlyCommands()).hasSize(86);
}

@Test
Expand All @@ -26,4 +26,5 @@ void testResolvableCommandNames() {
assertThat(readOnlyCommand.name()).isEqualTo(CommandType.valueOf(readOnlyCommand.name()).name());
}
}

}
Loading
Loading