Skip to content

Commit

Permalink
Node: add BZMPOP command (valkey-io#2018)
Browse files Browse the repository at this point in the history
* Added BZMPOP command.

Signed-off-by: Guian Gumpac <[email protected]>
Co-authored-by: Aaron <[email protected]>
  • Loading branch information
GumpacG and aaron-congo authored Jul 26, 2024
1 parent 596ebea commit 33016aa
Show file tree
Hide file tree
Showing 7 changed files with 204 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
* Node: Added FCALL and FCALL_RO commands ([#2011](https://github.com/valkey-io/valkey-glide/pull/2011))
* Node: Added ZMPOP command ([#1994](https://github.com/valkey-io/valkey-glide/pull/1994))
* Node: Added ZINCRBY command ([#2009](https://github.com/valkey-io/valkey-glide/pull/2009))
* Node: Added BZMPOP command ([#2018](https://github.com/valkey-io/valkey-glide/pull/2018))

#### Breaking Changes
* Node: (Refactor) Convert classes to types ([#2005](https://github.com/valkey-io/valkey-glide/pull/2005))
Expand Down
51 changes: 47 additions & 4 deletions node/src/BaseClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import {
createBRPop,
createBitCount,
createBitOp,
createBZMPop,
createBitPos,
createDecr,
createDecrBy,
Expand Down Expand Up @@ -3677,7 +3678,7 @@ export class BaseClient {
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param count - The number of elements to pop.
* @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped.
* @returns A two-element `array` containing the key name of the set from which the element
* was popped, and a member-score `Record` of the popped element.
* If no member could be popped, returns `null`.
Expand All @@ -3692,12 +3693,54 @@ export class BaseClient {
* // Output: [ "zSet1", { three: 3, two: 2 } ] - "three" with score 3 and "two" with score 2 were popped from "zSet1".
* ```
*/
public zmpop(
key: string[],
public async zmpop(
keys: string[],
modifier: ScoreFilter,
count?: number,
): Promise<[string, [Record<string, number>]] | null> {
return this.createWritePromise(createZMPop(keys, modifier, count));
}

/**
* Pops a member-score pair from the first non-empty sorted set, with the given `keys` being
* checked in the order they are provided. Blocks the connection when there are no members
* to pop from any of the given sorted sets. `BZMPOP` is the blocking variant of {@link zmpop}.
*
* See https://valkey.io/commands/bzmpop/ for more details.
*
* @remarks
* 1. When in cluster mode, all `keys` must map to the same hash slot.
* 2. `BZMPOP` is a client blocking command, see {@link https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#blocking-commands | the wiki}
* for more details and best practices.
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param timeout - The number of seconds to wait for a blocking operation to complete.
* A value of 0 will block indefinitely.
* @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped.
* @returns A two-element `array` containing the key name of the set from which the element
* was popped, and a member-score `Record` of the popped element.
* If no member could be popped, returns `null`.
*
* since Valkey version 7.0.0.
*
* @example
* ```typescript
* await client.zadd("zSet1", { one: 1.0, two: 2.0, three: 3.0 });
* await client.zadd("zSet2", { four: 4.0 });
* console.log(await client.bzmpop(["zSet1", "zSet2"], ScoreFilter.MAX, 0.1, 2));
* // Output: [ "zSet1", { three: 3, two: 2 } ] - "three" with score 3 and "two" with score 2 were popped from "zSet1".
* ```
*/
public async bzmpop(
keys: string[],
modifier: ScoreFilter,
timeout: number,
count?: number,
): Promise<[string, [Record<string, number>]] | null> {
return this.createWritePromise(createZMPop(key, modifier, count));
return this.createWritePromise(
createBZMPop(keys, modifier, timeout, count),
);
}

/**
Expand Down
24 changes: 24 additions & 0 deletions node/src/Commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2211,6 +2211,30 @@ export function createZMPop(
return createCommand(RequestType.ZMPop, args);
}

/**
* @internal
*/
export function createBZMPop(
keys: string[],
modifier: ScoreFilter,
timeout: number,
count?: number,
): command_request.Command {
const args: string[] = [
timeout.toString(),
keys.length.toString(),
...keys,
modifier,
];

if (count !== undefined) {
args.push("COUNT");
args.push(count.toString());
}

return createCommand(RequestType.BZMPop, args);
}

/**
* @internal
*/
Expand Down
34 changes: 33 additions & 1 deletion node/src/Transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
ZAddOptions,
createBLPop,
createBRPop,
createBZMPop,
createBitCount,
createBitOp,
createBitPos,
Expand Down Expand Up @@ -2170,7 +2171,7 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param count - The number of elements to pop.
* @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped.
*
* Command Response - A two-element `array` containing the key name of the set from which the
* element was popped, and a member-score `Record` of the popped element.
Expand All @@ -2182,6 +2183,37 @@ export class BaseTransaction<T extends BaseTransaction<T>> {
return this.addAndReturn(createZMPop(keys, modifier, count));
}

/**
* Pops a member-score pair from the first non-empty sorted set, with the given `keys` being
* checked in the order they are provided. Blocks the connection when there are no members
* to pop from any of the given sorted sets. `BZMPOP` is the blocking variant of {@link zmpop}.
*
* See https://valkey.io/commands/bzmpop/ for more details.
*
* @remarks `BZMPOP` is a client blocking command, see {@link https://github.com/valkey-io/valkey-glide/wiki/General-Concepts#blocking-commands | the wiki}
* for more details and best practices.
* @param keys - The keys of the sorted sets.
* @param modifier - The element pop criteria - either {@link ScoreFilter.MIN} or
* {@link ScoreFilter.MAX} to pop the member with the lowest/highest score accordingly.
* @param timeout - The number of seconds to wait for a blocking operation to complete.
* A value of 0 will block indefinitely.
* @param count - (Optional) The number of elements to pop. If not supplied, only one element will be popped.
*
* Command Response - A two-element `array` containing the key name of the set from which the element
* was popped, and a member-score `Record` of the popped element.
* If no member could be popped, returns `null`.
*
* since Valkey version 7.0.0.
*/
public bzmpop(
keys: string[],
modifier: ScoreFilter,
timeout: number,
count?: number,
): T {
return this.addAndReturn(createBZMPop(keys, modifier, timeout, count));
}

/**
* Increments the score of `member` in the sorted set stored at `key` by `increment`.
* If `member` does not exist in the sorted set, it is added with `increment` as its score.
Expand Down
1 change: 1 addition & 0 deletions node/tests/RedisClusterClient.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ describe("GlideClusterClient", () => {
client.sintercard(["abc", "zxy", "lkn"]),
client.zintercard(["abc", "zxy", "lkn"]),
client.zmpop(["abc", "zxy", "lkn"], ScoreFilter.MAX),
client.bzmpop(["abc", "zxy", "lkn"], ScoreFilter.MAX, 0.1),
);
}

Expand Down
86 changes: 86 additions & 0 deletions node/tests/SharedTests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4964,6 +4964,92 @@ export function runBaseTests<Context>(config: {
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`bzmpop test_%p`,
async (protocol) => {
await runTest(async (client: BaseClient, cluster: RedisCluster) => {
if (cluster.checkIfServerVersionLessThan("7.0.0")) return;
const key1 = "{key}-1" + uuidv4();
const key2 = "{key}-2" + uuidv4();
const nonExistingKey = "{key}-0" + uuidv4();
const stringKey = "{key}-string" + uuidv4();

expect(await client.zadd(key1, { a1: 1, b1: 2 })).toEqual(2);
expect(await client.zadd(key2, { a2: 0.1, b2: 0.2 })).toEqual(
2,
);

checkSimple(
await client.bzmpop([key1, key2], ScoreFilter.MAX, 0.1),
).toEqual([key1, { b1: 2 }]);
checkSimple(
await client.bzmpop([key2, key1], ScoreFilter.MAX, 0.1, 10),
).toEqual([key2, { a2: 0.1, b2: 0.2 }]);

// ensure that command doesn't time out even if timeout > request timeout (250ms by default)
expect(
await client.bzmpop([nonExistingKey], ScoreFilter.MAX, 0.5),
).toBeNull;
expect(
await client.bzmpop(
[nonExistingKey],
ScoreFilter.MAX,
0.55,
1,
),
).toBeNull;

// key exists, but it is not a sorted set
expect(await client.set(stringKey, "value")).toEqual("OK");
await expect(
client.bzmpop([stringKey], ScoreFilter.MAX, 0.1),
).rejects.toThrow(RequestError);
await expect(
client.bzmpop([stringKey], ScoreFilter.MAX, 0.1, 1),
).rejects.toThrow(RequestError);

// incorrect argument: key list should not be empty
await expect(
client.bzmpop([], ScoreFilter.MAX, 0.1, 1),
).rejects.toThrow(RequestError);

// incorrect argument: count should be greater than 0
await expect(
client.bzmpop([key1], ScoreFilter.MAX, 0.1, 0),
).rejects.toThrow(RequestError);

// incorrect argument: timeout can not be a negative number
await expect(
client.bzmpop([key1], ScoreFilter.MAX, -1, 10),
).rejects.toThrow(RequestError);

// check that order of entries in the response is preserved
const entries: Record<string, number> = {};

for (let i = 0; i < 10; i++) {
// a0 => 0, a1 => 1 etc
entries["a" + i] = i;
}

expect(await client.zadd(key2, entries)).toEqual(10);
const result = await client.bzmpop(
[key2],
ScoreFilter.MIN,
0.1,
10,
);

if (result) {
expect(result[1]).toEqual(entries);
}

// TODO: add test case with 0 timeout (no timeout) should never time out,
// but we wrap the test with timeout to avoid test failing or stuck forever
}, protocol);
},
config.timeout,
);

it.each([ProtocolVersion.RESP2, ProtocolVersion.RESP3])(
`geodist test_%p`,
async (protocol) => {
Expand Down
12 changes: 12 additions & 0 deletions node/tests/TestUtilities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,18 @@ export async function transactionTest(
responseData.push(["zmpop([key14], MAX)", [key14, { two: 2.0 }]]);
baseTransaction.zmpop([key14], ScoreFilter.MAX, 1);
responseData.push(["zmpop([key14], MAX, 1)", [key14, { one: 1.0 }]]);
baseTransaction.zadd(key14, { one: 1.0, two: 2.0 });
responseData.push(["zadd(key14, { one: 1.0, two: 2.0 })", 2]);
baseTransaction.bzmpop([key14], ScoreFilter.MAX, 0.1);
responseData.push([
"bzmpop([key14], ScoreFilter.MAX, 0.1)",
[key14, { two: 2.0 }],
]);
baseTransaction.bzmpop([key14], ScoreFilter.MAX, 0.1, 1);
responseData.push([
"bzmpop([key14], ScoreFilter.MAX, 0.1, 1)",
[key14, { one: 1.0 }],
]);
}

baseTransaction.xadd(key9, [["field", "value1"]], { id: "0-1" });
Expand Down

0 comments on commit 33016aa

Please sign in to comment.