Skip to content

Commit

Permalink
implementation of PUBSUB SHARDCHANNELS (#2793)
Browse files Browse the repository at this point in the history
* implementation of PUBSUB SHARDCHANNELS / issue #2756

* Polishing #2756
  • Loading branch information
atakavci authored Mar 26, 2024
1 parent 8c1529a commit fa182b7
Show file tree
Hide file tree
Showing 18 changed files with 248 additions and 1 deletion.
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* @author Tugdual Grall
* @author dengliming
* @author Andrey Shlykov
* @author Ali Takavci
*/
@SuppressWarnings("unchecked")
public abstract class AbstractRedisAsyncCommands<K, V> implements RedisAclAsyncCommands<K, V>, RedisHashAsyncCommands<K, V>,
Expand Down Expand Up @@ -1556,6 +1557,16 @@ public RedisFuture<Map<K, Long>> pubsubNumsub(K... channels) {
return dispatch(commandBuilder.pubsubNumsub(channels));
}

@Override
public RedisFuture<List<K>> pubsubShardChannels() {
return dispatch(commandBuilder.pubsubShardChannels());
}

@Override
public RedisFuture<List<K>> pubsubShardChannels(K pattern) {
return dispatch(commandBuilder.pubsubShardChannels(pattern));
}

@Override
public RedisFuture<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return dispatch(commandBuilder.pubsubShardNumsub(shardChannels));
Expand Down
11 changes: 11 additions & 0 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
* @author Tugdual Grall
* @author dengliming
* @author Andrey Shlykov
* @author Ali Takavci
* @since 4.0
*/
public abstract class AbstractRedisReactiveCommands<K, V>
Expand Down Expand Up @@ -1635,6 +1636,16 @@ public Mono<Map<K, Long>> pubsubNumsub(K... channels) {
return createMono(() -> commandBuilder.pubsubNumsub(channels));
}

@Override
public Flux<K> pubsubShardChannels() {
return createDissolvingFlux(commandBuilder::pubsubShardChannels);
}

@Override
public Flux<K> pubsubShardChannels(K pattern) {
return createDissolvingFlux(() -> commandBuilder.pubsubShardChannels(pattern));
}

@Override
public Mono<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels));
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/lettuce/core/RedisCommandBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* @author dengliming
* @author Mikhael Sokolov
* @author Tihomir Mateev
* @author Ali Takavci
*/
@SuppressWarnings({ "unchecked", "varargs" })
class RedisCommandBuilder<K, V> extends BaseRedisCommandBuilder<K, V> {
Expand Down Expand Up @@ -2108,6 +2109,18 @@ Command<K, V, Map<K, Long>> pubsubNumsub(K... channels) {
return createCommand(PUBSUB, (MapOutput) new MapOutput<K, Long>((RedisCodec) codec), args);
}

Command<K, V, List<K>> pubsubShardChannels() {
CommandArgs<K, V> args = new CommandArgs<>(codec).add(SHARDCHANNELS);
return createCommand(PUBSUB, new KeyListOutput<>(codec), args);
}

Command<K, V, List<K>> pubsubShardChannels(K pattern) {
LettuceAssert.notNull(pattern, "Pattern " + MUST_NOT_BE_NULL);

CommandArgs<K, V> args = new CommandArgs<>(codec).add(SHARDCHANNELS).addKey(pattern);
return createCommand(PUBSUB, new KeyListOutput<>(codec), args);
}

Command<K, V, Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
LettuceAssert.notNull(shardChannels, "ShardChannels " + MUST_NOT_BE_NULL);
LettuceAssert.notEmpty(shardChannels, "ShardChannels " + MUST_NOT_BE_EMPTY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Ali Takavci
* @since 4.0
* @generated by io.lettuce.apigenerator.CreateAsyncApi
*/
Expand Down Expand Up @@ -66,6 +67,21 @@ public interface BaseRedisAsyncCommands<K, V> {
*/
RedisFuture<Map<K, Long>> pubsubNumsub(K... channels);

/**
* Lists the currently *active shard channels*.
*
* @return List&lt;K&gt; array-reply a list of active channels.
*/
RedisFuture<List<K>> pubsubShardChannels();

/**
* Lists the currently *active shard channels*.
*
* @param pattern the pattern type: patternkey (pattern).
* @return List&lt;K&gt; array-reply a list of active channels, optionally matching the specified pattern.
*/
RedisFuture<List<K>> pubsubShardChannels(K pattern);

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Ali Takavci
* @since 4.0
* @generated by io.lettuce.apigenerator.CreateReactiveApi
*/
Expand Down Expand Up @@ -66,6 +67,21 @@ public interface BaseRedisReactiveCommands<K, V> {
*/
Mono<Map<K, Long>> pubsubNumsub(K... channels);

/**
* Lists the currently *active shard channels*.
*
* @return K array-reply a list of active channels.
*/
Flux<K> pubsubShardChannels();

/**
* Lists the currently *active shard channels*.
*
* @param pattern the pattern type: patternkey (pattern).
* @return K array-reply a list of active channels, optionally matching the specified pattern.
*/
Flux<K> pubsubShardChannels(K pattern);

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels.
*
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/io/lettuce/core/api/sync/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Ali Takavci
* @since 4.0
* @generated by io.lettuce.apigenerator.CreateSyncApi
*/
Expand Down Expand Up @@ -65,6 +66,21 @@ public interface BaseRedisCommands<K, V> {
*/
Map<K, Long> pubsubNumsub(K... channels);

/**
* Lists the currently *active shard channels*.
*
* @return List&lt;K&gt; array-reply a list of active channels.
*/
List<K> pubsubShardChannels();

/**
* Lists the currently *active shard channels*.
*
* @param pattern the pattern type: patternkey (pattern).
* @return List&lt;K&gt; array-reply a list of active channels, optionally matching the specified pattern.
*/
List<K> pubsubShardChannels(K pattern);

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Ali Takavci
* @since 4.0
* @generated by io.lettuce.apigenerator.CreateAsyncNodeSelectionClusterApi
*/
Expand Down Expand Up @@ -66,6 +67,21 @@ public interface BaseNodeSelectionAsyncCommands<K, V> {
*/
AsyncExecutions<Map<K, Long>> pubsubNumsub(K... channels);

/**
* Lists the currently *active shard channels*.
*
* @return List&lt;K&gt; array-reply a list of active channels.
*/
AsyncExecutions<List<K>> pubsubShardChannels();

/**
* Lists the currently *active shard channels*.
*
* @param pattern the pattern type: patternkey (pattern).
* @return List&lt;K&gt; array-reply a list of active channels, optionally matching the specified pattern.
*/
AsyncExecutions<List<K>> pubsubShardChannels(K pattern);

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Ali Takavci
* @since 4.0
* @generated by io.lettuce.apigenerator.CreateSyncNodeSelectionClusterApi
*/
Expand Down Expand Up @@ -61,6 +62,21 @@ public interface BaseNodeSelectionCommands<K, V> {
*/
Executions<Map<K, Long>> pubsubNumsub(K... channels);

/**
* Lists the currently *active shard channels*.
*
* @return List&lt;K&gt; array-reply a list of active channels.
*/
Executions<List<K>> pubsubShardChannels();

/**
* Lists the currently *active shard channels*.
*
* @param pattern the pattern type: patternkey (pattern).
* @return List&lt;K&gt; array-reply a list of active channels, optionally matching the specified pattern.
*/
Executions<List<K>> pubsubShardChannels(K pattern);

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels.
*
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/lettuce/core/protocol/CommandKeyword.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
* @author Zhang Jessey
* @author dengliming
* @author Tihomir Mateev
* @author Ali Takavci
*/
public enum CommandKeyword implements ProtocolKeyword {

Expand All @@ -36,7 +37,7 @@ public enum CommandKeyword implements ProtocolKeyword {

IDLETIME, JUSTID, KILL, KEYSLOT, LEFT, LEN, LIMIT, LIST, LOAD, LOG, MATCH,

MAX, MAXLEN, MEET, MIN, MINID, MOVED, NO, NOACK, NOCOMMANDS, NODE, NODES, NOMKSTREAM, NOPASS, NOSAVE, NOT, NUMSUB, SHARDNUMSUB, NUMPAT, NX, OFF, ON, ONE, OR, PAUSE,
MAX, MAXLEN, MEET, MIN, MINID, MOVED, NO, NOACK, NOCOMMANDS, NODE, NODES, NOMKSTREAM, NOPASS, NOSAVE, NOT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB, NUMPAT, NX, OFF, ON, ONE, OR, PAUSE,

REFCOUNT, REMOVE, RELOAD, REPLACE, REPLICATE, REPLICAS, REV, RESET, RESETCHANNELS, RESETKEYS, RESETPASS,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
*
* @author Mark Paluch
* @author Tihomir Mateev
* @author Ali Takavci
* @since 4.2
*/
@SuppressWarnings("varargs")
Expand Down Expand Up @@ -67,6 +68,11 @@ final Command<K, V, Map<K, Long>> pubsubNumsub(K... channels) {
return createCommand(PUBSUB, new MapOutput<>((RedisCodec) codec), args);
}

Command<K, V, List<K>> pubsubShardChannels(K pattern) {
CommandArgs<K, V> args = new PubSubCommandArgs<>(codec).add(SHARDCHANNELS).addKey(pattern);
return createCommand(PUBSUB, new KeyListOutput<>(codec), args);
}

@SafeVarargs
@SuppressWarnings({ "unchecked", "rawtypes" })
final Command<K, V, Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* @param <V> Value type.
* @author Will Glozer
* @author Mark Paluch
* @author Ali Takavci
*/
public class RedisPubSubAsyncCommandsImpl<K, V> extends RedisAsyncCommandsImpl<K, V> implements RedisPubSubAsyncCommands<K, V> {

Expand Down Expand Up @@ -85,6 +86,11 @@ public RedisFuture<Map<K, Long>> pubsubNumsub(K... channels) {
return dispatch(commandBuilder.pubsubNumsub(channels));
}

@Override
public RedisFuture<List<K>> pubsubShardChannels(K pattern) {
return dispatch(commandBuilder.pubsubShardChannels(pattern));
}

@Override
public RedisFuture<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return dispatch(commandBuilder.pubsubShardNumsub(shardChannels));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* @param <K> Key type.
* @param <V> Value type.
* @author Mark Paluch
* @author Ali Takavci
* @since 5.0
*/
public class RedisPubSubReactiveCommandsImpl<K, V> extends RedisReactiveCommandsImpl<K, V>
Expand Down Expand Up @@ -143,6 +144,11 @@ public Mono<Map<K, Long>> pubsubNumsub(K... channels) {
return createMono(() -> commandBuilder.pubsubNumsub(channels));
}

@Override
public Flux<K> pubsubShardChannels(K channel) {
return createDissolvingFlux(() -> commandBuilder.pubsubShardChannels(channel));
}

@Override
public Mono<Map<K, Long>> pubsubShardNumsub(K... shardChannels) {
return createMono(() -> commandBuilder.pubsubShardNumsub(shardChannels));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import kotlinx.coroutines.flow.Flow
* @param <K> Key type.
* @param <V> Value type.
* @author Mikhael Sokolov
* @author Ali Takavci
* @since 6.0
* @generated by io.lettuce.apigenerator.CreateKotlinCoroutinesApi
*/
Expand Down Expand Up @@ -65,6 +66,21 @@ interface BaseRedisCoroutinesCommands<K : Any, V : Any> {
* @return array-reply a list of channels and number of subscribers for every channel.
*/
suspend fun pubsubNumsub(vararg channels: K): Map<K, Long>

/**
* Lists the currently *active shard channels*.
*
* @return List<K> array-reply a list of active channels.
*/
suspend fun pubsubShardChannels(): List<K>

/**
* Lists the currently *active shard channels*.
*
* @param pattern the pattern type: patternkey (pattern).
* @return List<K> array-reply a list of active channels, optionally matching the specified pattern.
*/
suspend fun pubsubShardChannels(pattern: K): List<K>

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import kotlinx.coroutines.reactive.awaitSingle
* @param <K> Key type.
* @param <V> Value type.
* @author Mikhael Sokolov
* @author Ali Takavci
* @since 6.0
*/
@ExperimentalLettuceCoroutinesApi
Expand All @@ -47,6 +48,10 @@ internal class BaseRedisCoroutinesCommandsImpl<K : Any, V : Any>(internal val op

override suspend fun pubsubNumsub(vararg channels: K): Map<K, Long> = ops.pubsubNumsub(*channels).awaitSingle()

override suspend fun pubsubShardChannels(): List<K> = ops.pubsubShardChannels().asFlow().toList()

override suspend fun pubsubShardChannels(pattern: K): List<K> = ops.pubsubShardChannels(pattern).asFlow().toList()

override suspend fun pubsubShardNumsub(vararg shardChannels: K): Map<K, Long> = ops.pubsubShardNumsub(*shardChannels).awaitSingle()

override suspend fun pubsubNumpat(): Long = ops.pubsubNumpat().awaitSingle()
Expand Down
16 changes: 16 additions & 0 deletions src/main/templates/io/lettuce/core/api/BaseRedisCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* @param <V> Value type.
* @author Mark Paluch
* @author Tihomir Mateev
* @author Ali Takavci
* @since 4.0
*/
public interface BaseRedisCommands<K, V> {
Expand Down Expand Up @@ -66,6 +67,21 @@ public interface BaseRedisCommands<K, V> {
*/
Map<K, Long> pubsubNumsub(K... channels);

/**
* Lists the currently *active shard channels*.
*
* @return List&lt;K&gt; array-reply a list of active channels.
*/
List<K> pubsubShardChannels();

/**
* Lists the currently *active shard channels*.
*
* @param pattern the pattern type: patternkey (pattern).
* @return List&lt;K&gt; array-reply a list of active channels, optionally matching the specified pattern.
*/
List<K> pubsubShardChannels(K pattern);

/**
* Returns the number of subscribers (not counting clients subscribed to patterns) for the specified shard channels.
*
Expand Down
Loading

0 comments on commit fa182b7

Please sign in to comment.