Skip to content

Commit

Permalink
Node: add binary support for PubSub commands, part 1 (valkey-io#2201)
Browse files Browse the repository at this point in the history
* Node: add binary support for PubSub commands, part 1

---------

Signed-off-by: Yi-Pin Chen <[email protected]>
  • Loading branch information
yipin-chen authored Aug 28, 2024
1 parent 8abbfb3 commit e48133f
Show file tree
Hide file tree
Showing 6 changed files with 48 additions and 28 deletions.
14 changes: 10 additions & 4 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6610,8 +6610,10 @@ export class BaseClient {
*
* @see {@link https://valkey.io/commands/pubsub-channels/|valkey.io} for more details.
*
* @param pattern - A glob-style pattern to match active channels.
* If not provided, all active channels are returned.
* @param options - (Optional) Additional parameters:
* - (Optional) `pattern`: A glob-style pattern to match active channels.
* If not provided, all active channels are returned.
* - (Optional) `decoder`: see {@link DecoderOption}.
* @returns A list of currently active channels matching the given pattern.
* If no pattern is specified, all active channels are returned.
*
Expand All @@ -6624,8 +6626,12 @@ export class BaseClient {
* console.log(newsChannels); // Output: ["news.sports", "news.weather"]
* ```
*/
public async pubsubChannels(pattern?: string): Promise<string[]> {
return this.createWritePromise(createPubSubChannels(pattern));
public async pubsubChannels(
options?: { pattern?: GlideString } & DecoderOption,
): Promise<GlideString[]> {
return this.createWritePromise(createPubSubChannels(options?.pattern), {
decoder: options?.decoder,
});
}

/**
Expand Down
8 changes: 4 additions & 4 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2207,8 +2207,8 @@ export function createTime(): command_request.Command {
* @internal
*/
export function createPublish(
message: string,
channel: string,
message: GlideString,
channel: GlideString,
sharded: boolean = false,
): command_request.Command {
const request = sharded ? RequestType.SPublish : RequestType.Publish;
Expand Down Expand Up @@ -3855,7 +3855,7 @@ export function createBLMPop(
* @internal
*/
export function createPubSubChannels(
pattern?: string,
pattern?: GlideString,
): command_request.Command {
return createCommand(RequestType.PubSubChannels, pattern ? [pattern] : []);
}
Expand All @@ -3880,7 +3880,7 @@ export function createPubSubNumSub(
* @internal
*/
export function createPubsubShardChannels(
pattern?: string,
pattern?: GlideString,
): command_request.Command {
return createCommand(RequestType.PubSubSChannels, pattern ? [pattern] : []);
}
Expand Down
5 changes: 4 additions & 1 deletion node/src/GlideClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,10 @@ export class GlideClient extends BaseClient {
* console.log(result); // Output: 1 - This message was posted to 1 subscription which is configured on primary node
* ```
*/
public async publish(message: string, channel: string): Promise<number> {
public async publish(
message: GlideString,
channel: GlideString,
): Promise<number> {
return this.createWritePromise(createPublish(message, channel));
}

Expand Down
21 changes: 15 additions & 6 deletions node/src/GlideClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1221,8 +1221,8 @@ export class GlideClusterClient extends BaseClient {
* ```
*/
public async publish(
message: string,
channel: string,
message: GlideString,
channel: GlideString,
sharded: boolean = false,
): Promise<number> {
return this.createWritePromise(
Expand All @@ -1236,8 +1236,10 @@ export class GlideClusterClient extends BaseClient {
*
* @see {@link https://valkey.io/commands/pubsub-shardchannels/|valkey.io} for details.
*
* @param pattern - A glob-style pattern to match active shard channels.
* If not provided, all active shard channels are returned.
* @param options - (Optional) Additional parameters:
* - (Optional) `pattern`: A glob-style pattern to match active shard channels.
* If not provided, all active shard channels are returned.
* - (Optional) `decoder`: see {@link DecoderOption}.
* @returns A list of currently active shard channels matching the given pattern.
* If no pattern is specified, all active shard channels are returned.
*
Expand All @@ -1250,8 +1252,15 @@ export class GlideClusterClient extends BaseClient {
* console.log(filteredChannels); // Output: ["channel1", "channel2"]
* ```
*/
public async pubsubShardChannels(pattern?: string): Promise<string[]> {
return this.createWritePromise(createPubsubShardChannels(pattern));
public async pubsubShardChannels(
options?: {
pattern?: GlideString;
} & DecoderOption,
): Promise<GlideString[]> {
return this.createWritePromise(
createPubsubShardChannels(options?.pattern),
{ decoder: options?.decoder },
);
}

/**
Expand Down
10 changes: 5 additions & 5 deletions node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3841,7 +3841,7 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
* Command Response - A list of currently active channels matching the given pattern.
* If no pattern is specified, all active channels are returned.
*/
public pubsubChannels(pattern?: string): T {
public pubsubChannels(pattern?: GlideString): T {
return this.addAndReturn(createPubSubChannels(pattern));
}

Expand Down Expand Up @@ -4027,7 +4027,7 @@ export class Transaction extends BaseTransaction<Transaction> {
* Command Response - Number of subscriptions in primary node that received the message.
* Note that this value does not include subscriptions that configured on replicas.
*/
public publish(message: string, channel: string): Transaction {
public publish(message: GlideString, channel: GlideString): Transaction {
return this.addAndReturn(createPublish(message, channel));
}
}
Expand Down Expand Up @@ -4154,8 +4154,8 @@ export class ClusterTransaction extends BaseTransaction<ClusterTransaction> {
* Command Response - Number of subscriptions in primary node that received the message.
*/
public publish(
message: string,
channel: string,
message: GlideString,
channel: GlideString,
sharded: boolean = false,
): ClusterTransaction {
return this.addAndReturn(createPublish(message, channel, sharded));
Expand All @@ -4172,7 +4172,7 @@ export class ClusterTransaction extends BaseTransaction<ClusterTransaction> {
* Command Response - A list of currently active shard channels matching the given pattern.
* If no pattern is specified, all active shard channels are returned.
*/
public pubsubShardChannels(pattern?: string): ClusterTransaction {
public pubsubShardChannels(pattern?: GlideString): ClusterTransaction {
return this.addAndReturn(createPubsubShardChannels(pattern));
}

Expand Down
18 changes: 10 additions & 8 deletions node/tests/PubSub.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2970,7 +2970,7 @@ describe("PubSub", () => {

expect(
await (publishingClient as GlideClusterClient).publish(
message,
Buffer.from(message),
channel,
true,
),
Expand All @@ -2979,7 +2979,7 @@ describe("PubSub", () => {
expect(
await (publishingClient as GlideClusterClient).publish(
message2,
channel,
Buffer.from(channel),
true,
),
).toEqual(1);
Expand Down Expand Up @@ -3352,15 +3352,17 @@ describe("PubSub", () => {
);

// Test pubsubChannels with pattern
const channelsWithPattern =
await client2.pubsubChannels(pattern);
const channelsWithPattern = await client2.pubsubChannels({
pattern,
});
expect(new Set(channelsWithPattern)).toEqual(
new Set([channel1, channel2]),
);

// Test with non-matching pattern
const nonMatchingChannels =
await client2.pubsubChannels("non_matching_*");
const nonMatchingChannels = await client2.pubsubChannels({
pattern: "non_matching_*",
});
expect(nonMatchingChannels.length).toBe(0);
} finally {
if (client1) {
Expand Down Expand Up @@ -3695,15 +3697,15 @@ describe("PubSub", () => {
// Test pubsubShardchannels with pattern
const channelsWithPattern = await (
client2 as GlideClusterClient
).pubsubShardChannels(pattern);
).pubsubShardChannels({ pattern });
expect(new Set(channelsWithPattern)).toEqual(
new Set([channel1, channel2]),
);

// Test with non-matching pattern
const nonMatchingChannels = await (
client2 as GlideClusterClient
).pubsubShardChannels("non_matching_*");
).pubsubShardChannels({ pattern: "non_matching_*" });
expect(nonMatchingChannels).toEqual([]);
} finally {
if (client1) {
Expand Down

0 comments on commit e48133f

Please sign in to comment.