Skip to content

Commit

Permalink
Node: add routing option to exec command
Browse files Browse the repository at this point in the history
  • Loading branch information
shohamazon authored Nov 14, 2023
1 parent 8056ce0 commit 90c43fd
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 7 deletions.
18 changes: 12 additions & 6 deletions node/src/RedisClusterClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
createPing,
} from "./Commands";
import { connection_request, redis_request } from "./ProtobufMessage";
import { ClusterTransaction } from "./Transaction";
import { BaseTransaction, ClusterTransaction } from "./Transaction";

export type ClusterClientConfiguration = BaseClientConfiguration;

Expand Down Expand Up @@ -48,16 +48,19 @@ export type SlotKeyTypes = {
key: string;
};

export type Routes =
export type Routes =
| SingleNodeRoute
/**
* Route request to all primary nodes.
*/

| "allPrimaries"
/**
* Route request to all nodes.
*/
| "allNodes"
| "allNodes";


export type SingleNodeRoute =
/**
* Route request to a random node.
*/
Expand Down Expand Up @@ -193,12 +196,15 @@ export class RedisClusterClient extends BaseClient {
* See https://redis.io/topics/Transactions/ for details on Redis Transactions.
*
* @param transaction - A ClusterTransaction object containing a list of commands to be executed.
* @param route - If `route` is not provided, the transaction will be routed to the slot owner of the first key found in the transaction.
* If no key is found, the command will be sent to a random node.
* If `route` is provided, the client will route the command to the nodes defined by `route`.
* @returns A list of results corresponding to the execution of each command in the transaction.
* If a command returns a value, it will be included in the list. If a command doesn't return a value,
* the list entry will be null.
*/
public exec(transaction: ClusterTransaction): Promise<ReturnType[]> {
return this.createWritePromise(transaction.commands);
public exec(transaction: ClusterTransaction | BaseTransaction , route?: SingleNodeRoute): Promise<ReturnType[]> {
return this.createWritePromise(transaction.commands , toProtobufRoute(route));
}

/** Ping the Redis server.
Expand Down
55 changes: 54 additions & 1 deletion node/tests/RedisClientInternals.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ import { Reader } from "protobufjs";
import {
BaseClientConfiguration,
ClosingError,
InfoOptions,
Logger,
RedisClient,
RedisClusterClient,
RequestError,
TimeoutError,
Transaction
} from "../build-ts";
import { RedisClientConfiguration } from "../build-ts/src/RedisClient";
import {
connection_request,
redis_request,
response,
} from "../src/ProtobufMessage";
import { ClusterClientConfiguration } from "../src/RedisClusterClient";
import { ClusterClientConfiguration, SlotKeyTypes } from "../src/RedisClusterClient";

const { RequestType, RedisRequest } = redis_request;

Expand Down Expand Up @@ -247,6 +249,57 @@ describe("SocketConnectionInternals", () => {
});
});

it("should pass transaction with SlotKeyType", async () => {
await testWithClusterResources(async (connection, socket) => {
socket.once("data", (data) => {
const reader = Reader.create(data);
const request = RedisRequest.decodeDelimited(reader);

expect(request.transaction?.commands?.at(0)?.requestType).toEqual(
RequestType.SetString
);
expect(request.transaction?.commands?.at(0)?.argsArray?.args?.length).toEqual(
2
);
expect(request.route?.slotKeyRoute?.slotKey).toEqual("key");
expect(request.route?.slotKeyRoute?.slotType).toEqual(0); // Primary = 0

sendResponse(socket, ResponseType.OK, request.callbackIdx);
});
const transaction = new Transaction();
transaction.set("key" , "value");
const slotKey: SlotKeyTypes = {
type: "primarySlotKey",
key: "key"
};
const result = await connection.exec(transaction, slotKey);
expect(result).toBe("OK");
});
});

it("should pass transaction with random node", async () => {
await testWithClusterResources(async (connection, socket) => {
socket.once("data", (data) => {
const reader = Reader.create(data);
const request = RedisRequest.decodeDelimited(reader);

expect(request.transaction?.commands?.at(0)?.requestType).toEqual(
RequestType.Info
);
expect(request.transaction?.commands?.at(0)?.argsArray?.args?.length).toEqual(
1
);
expect(request.route?.simpleRoutes).toEqual(redis_request.SimpleRoutes.Random);

sendResponse(socket, ResponseType.Value, request.callbackIdx , "# Server");
});
const transaction = new Transaction();
transaction.info([InfoOptions.Server]);
const result = await connection.exec(transaction, "randomNode");
expect(result).toEqual(expect.stringContaining("# Server"));
});
});

it("should pass OK returned from socket", async () => {
await testWithResources(async (connection, socket) => {
socket.once("data", (data) => {
Expand Down

0 comments on commit 90c43fd

Please sign in to comment.