diff --git a/node/src/RedisClusterClient.ts b/node/src/RedisClusterClient.ts index 36ab7cea0e..0da75b6765 100644 --- a/node/src/RedisClusterClient.ts +++ b/node/src/RedisClusterClient.ts @@ -13,7 +13,7 @@ import { createPing, } from "./Commands"; import { connection_request, redis_request } from "./ProtobufMessage"; -import { ClusterTransaction } from "./Transaction"; +import { BaseTransaction, ClusterTransaction } from "./Transaction"; export type ClusterClientConfiguration = BaseClientConfiguration; @@ -48,16 +48,19 @@ export type SlotKeyTypes = { key: string; }; -export type Routes = +export type Routes = + | SingleNodeRoute /** * Route request to all primary nodes. */ - | "allPrimaries" /** * Route request to all nodes. */ - | "allNodes" + | "allNodes"; + + +export type SingleNodeRoute = /** * Route request to a random node. */ @@ -193,12 +196,15 @@ export class RedisClusterClient extends BaseClient { * See https://redis.io/topics/Transactions/ for details on Redis Transactions. * * @param transaction - A ClusterTransaction object containing a list of commands to be executed. + * @param route - If `route` is not provided, the transaction will be routed to the slot owner of the first key found in the transaction. + * If no key is found, the command will be sent to a random node. + * If `route` is provided, the client will route the command to the nodes defined by `route`. * @returns A list of results corresponding to the execution of each command in the transaction. * If a command returns a value, it will be included in the list. If a command doesn't return a value, * the list entry will be null. */ - public exec(transaction: ClusterTransaction): Promise { - return this.createWritePromise(transaction.commands); + public exec(transaction: ClusterTransaction | BaseTransaction , route?: SingleNodeRoute): Promise { + return this.createWritePromise(transaction.commands , toProtobufRoute(route)); } /** Ping the Redis server. diff --git a/node/tests/RedisClientInternals.test.ts b/node/tests/RedisClientInternals.test.ts index 130b1083ad..c561fd9277 100644 --- a/node/tests/RedisClientInternals.test.ts +++ b/node/tests/RedisClientInternals.test.ts @@ -8,11 +8,13 @@ import { Reader } from "protobufjs"; import { BaseClientConfiguration, ClosingError, + InfoOptions, Logger, RedisClient, RedisClusterClient, RequestError, TimeoutError, + Transaction } from "../build-ts"; import { RedisClientConfiguration } from "../build-ts/src/RedisClient"; import { @@ -20,7 +22,7 @@ import { redis_request, response, } from "../src/ProtobufMessage"; -import { ClusterClientConfiguration } from "../src/RedisClusterClient"; +import { ClusterClientConfiguration, SlotKeyTypes } from "../src/RedisClusterClient"; const { RequestType, RedisRequest } = redis_request; @@ -247,6 +249,57 @@ describe("SocketConnectionInternals", () => { }); }); + it("should pass transaction with SlotKeyType", async () => { + await testWithClusterResources(async (connection, socket) => { + socket.once("data", (data) => { + const reader = Reader.create(data); + const request = RedisRequest.decodeDelimited(reader); + + expect(request.transaction?.commands?.at(0)?.requestType).toEqual( + RequestType.SetString + ); + expect(request.transaction?.commands?.at(0)?.argsArray?.args?.length).toEqual( + 2 + ); + expect(request.route?.slotKeyRoute?.slotKey).toEqual("key"); + expect(request.route?.slotKeyRoute?.slotType).toEqual(0); // Primary = 0 + + sendResponse(socket, ResponseType.OK, request.callbackIdx); + }); + const transaction = new Transaction(); + transaction.set("key" , "value"); + const slotKey: SlotKeyTypes = { + type: "primarySlotKey", + key: "key" + }; + const result = await connection.exec(transaction, slotKey); + expect(result).toBe("OK"); + }); + }); + + it("should pass transaction with random node", async () => { + await testWithClusterResources(async (connection, socket) => { + socket.once("data", (data) => { + const reader = Reader.create(data); + const request = RedisRequest.decodeDelimited(reader); + + expect(request.transaction?.commands?.at(0)?.requestType).toEqual( + RequestType.Info + ); + expect(request.transaction?.commands?.at(0)?.argsArray?.args?.length).toEqual( + 1 + ); + expect(request.route?.simpleRoutes).toEqual(redis_request.SimpleRoutes.Random); + + sendResponse(socket, ResponseType.Value, request.callbackIdx , "# Server"); + }); + const transaction = new Transaction(); + transaction.info([InfoOptions.Server]); + const result = await connection.exec(transaction, "randomNode"); + expect(result).toEqual(expect.stringContaining("# Server")); + }); + }); + it("should pass OK returned from socket", async () => { await testWithResources(async (connection, socket) => { socket.once("data", (data) => {