diff --git a/src/StackExchange.Redis/Exceptions.cs b/src/StackExchange.Redis/Exceptions.cs
index fe7da1424..c3cfd1c5b 100644
--- a/src/StackExchange.Redis/Exceptions.cs
+++ b/src/StackExchange.Redis/Exceptions.cs
@@ -151,6 +151,51 @@ public RedisException(string message, Exception innerException) : base(message,
protected RedisException(SerializationInfo info, StreamingContext ctx) : base(info, ctx) { }
}
+ ///
+ /// Indicates an exception raised by a redis server due to a hashslot being migrated, but command had a flag set to not
+ /// allow redirects
+ ///
+ [Serializable]
+ public sealed partial class RedisHashslotMigratedAndNoRedirectException : RedisException
+ {
+ ///
+ /// The hashslot that was migrated
+ ///
+ public int HashSlot
+ {
+ get;
+ }
+
+ ///
+ /// The new endpoint that hashslot is located on
+ ///
+ public string Endpoint
+ {
+ get;
+ }
+
+ ///
+ /// Creates a new .
+ ///
+ /// The message for the exception.
+ /// The hashslot that was migrated.
+ /// The endpoint where the hashslot is now located
+ public RedisHashslotMigratedAndNoRedirectException(string message, int hashSlot, string endpoint) : base(message)
+ {
+ HashSlot = hashSlot;
+ Endpoint = endpoint;
+ }
+
+ ///
+ /// Returns the error message in the format MOVED [HASH_SLOT] [NEW ENDPOINT]
+ ///
+ ///
+ public string GetMovedErrorMessage()
+ {
+ return CommonReplies.MOVED.ToString() + HashSlot.ToString() + " " + Endpoint;
+ }
+ }
+
///
/// Indicates an exception raised by a redis server.
///
diff --git a/src/StackExchange.Redis/Message.cs b/src/StackExchange.Redis/Message.cs
index 4a366449f..4b2c24a1a 100644
--- a/src/StackExchange.Redis/Message.cs
+++ b/src/StackExchange.Redis/Message.cs
@@ -447,7 +447,7 @@ public string ToStringCommandOnly() =>
bool ICompletable.TryComplete(bool isAsync) { Complete(); return true; }
- public void Complete()
+ public virtual void Complete()
{
//Ensure we can never call Complete on the same resultBox from two threads by grabbing it now
var currBox = Interlocked.Exchange(ref resultBox, null);
diff --git a/src/StackExchange.Redis/PublicAPI.Unshipped.txt b/src/StackExchange.Redis/PublicAPI.Unshipped.txt
index 5f282702b..dd330ffa1 100644
--- a/src/StackExchange.Redis/PublicAPI.Unshipped.txt
+++ b/src/StackExchange.Redis/PublicAPI.Unshipped.txt
@@ -1 +1,5 @@
-
\ No newline at end of file
+StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException
+StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.Endpoint.get -> string
+StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.GetMovedErrorMessage() -> string
+StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.HashSlot.get -> int
+StackExchange.Redis.RedisHashslotMigratedAndNoRedirectException.RedisHashslotMigratedAndNoRedirectException(string message, int hashSlot, string endpoint) -> void
\ No newline at end of file
diff --git a/src/StackExchange.Redis/RedisTransaction.cs b/src/StackExchange.Redis/RedisTransaction.cs
index 35943c80f..54347cd13 100644
--- a/src/StackExchange.Redis/RedisTransaction.cs
+++ b/src/StackExchange.Redis/RedisTransaction.cs
@@ -1,5 +1,7 @@
using System;
+using System.Buffers;
using System.Collections.Generic;
+using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -56,6 +58,12 @@ public Task ExecuteAsync(CommandFlags flags)
return base.ExecuteAsync(msg, proc); // need base to avoid our local wrapping override
}
+ internal bool ExecuteInternal(CommandFlags flags, ServerEndPoint endpoint = null)
+ {
+ var msg = CreateMessage(flags, out ResultProcessor proc);
+ return base.ExecuteSync(msg, proc, endpoint); // need base to avoid our local "not supported" override
+ }
+
internal override Task ExecuteAsync(Message message, ResultProcessor processor, ServerEndPoint server = null)
{
if (message == null) return CompletedTask.Default(asyncState);
@@ -146,6 +154,33 @@ public QueuedMessage(Message message) : base(message.Db, message.Flags | Command
Wrapped = message;
}
+ // for transactions, the inner operations should only be marked completed when the the final EXEC has been processed
+ ///
+ /// For queued messages (InnerOperations) in a the transaction, we cannot actually mark it complete until the exec
+ /// function has returned with the state of the transaction.
+ ///
+ /// Calling the base complete resets the ResultBox,
+ ///
+ public override void Complete()
+ {
+ // still need to activate continuations for GetMessages(),
+ // which might be waiting for the last innerOperation to
+ // complete.
+ ResultBox?.ActivateContinuations();
+ }
+
+ ///
+ /// This is called when the transaction has been complete (Exec), and marks the operations as complete, updates performance,
+ /// and clears the ResultBox.
+ ///
+ /// It also triggers the ActivateContinuations() for a second time.
+ ///
+ ///
+ public void TransactionComplete()
+ {
+ base.Complete();
+ }
+
public bool WasQueued
{
get => wasQueued;
@@ -428,6 +463,21 @@ private bool AreAllConditionsSatisfied(ConnectionMultiplexer multiplexer)
}
return result;
}
+
+ ///
+ /// When the transaction completed, the innerOperations needs to be informed that the transaction has been completed.
+ ///
+ public override void Complete()
+ {
+ // let all the inneroperations know that the transaction is complete
+ foreach (var msg in InnerOperations)
+ {
+ msg.TransactionComplete();
+ }
+
+ // continue marking this message as complete.
+ base.Complete();
+ }
}
private class TransactionProcessor : ResultProcessor
@@ -439,13 +489,90 @@ public override bool SetResult(PhysicalConnection connection, Message message, i
if (result.IsError && message is TransactionMessage tran)
{
string error = result.GetString();
+ bool isMoved = true;
+ var migratedErrorMessage = new HashSet();
+ RedisHashslotMigratedAndNoRedirectException migratedException;
+
foreach (var op in tran.InnerOperations)
{
- var inner = op.Wrapped;
- ServerFail(inner, error);
- inner.Complete();
+ var opResultBox = op.ResultBox;
+ // check if this internal operation errored out
+ if (opResultBox.IsFaulted)
+ {
+ // if this resultbox is one that allows us access to the error
+ if (opResultBox is IResultBox)
+ {
+ // get the error of the inner operation
+ var simpleOpResultBox = opResultBox as IResultBox;
+ Exception exception;
+ simpleOpResultBox.GetResult(out exception);
+
+ // append the inneroperation error to the transaction error
+ error += "\n\n" + op.Command.ToString() + ": " + exception?.Message;
+
+ // if the error is related to a hashslot being migrated, then add the error to a set.
+ // if ALL the errors are related to this hashslot being moved, then it's possibly to retry
+ // the transaction on the new endpoint
+ if (exception is RedisHashslotMigratedAndNoRedirectException)
+ {
+ migratedException = exception as RedisHashslotMigratedAndNoRedirectException;
+ migratedErrorMessage.Add(migratedException.GetMovedErrorMessage());
+ } else
+ {
+ isMoved = false;
+ }
+
+ } else
+ {
+ error += "\n\n" + op.Command.ToString() + ": Undeterminted Error";
+ // have to assume it's false
+ isMoved = false;
+ }
+ }
}
+
+ // all failed due to a hashslot move
+ if (isMoved && migratedErrorMessage.Count > 0)
+ {
+ // there should be a SINGLE MOVED error in the set (same endpoint and hashslot)
+ if (migratedErrorMessage.Count == 1)
+ {
+ // prepend the "MOVED" error to the start of the error, so the ResultProcessor
+ // is able to detect it, and retry the transaction
+ error = migratedErrorMessage.First() + " " + error;
+ foreach (var op in tran.InnerOperations)
+ {
+ // reset the state of the internal operations
+ var wasQueued = SimpleResultBox.Create();
+ op.SetSource(wasQueued, QueuedProcessor.Default);
+ }
+ }
+ // the transaction must have utilized multiple hashslots, with multiple ones that moved
+ else
+ {
+ isMoved = false;
+ error = "Multiple hashslots and/or endpoints detected as MOVED in a single transaction \n\n" + error;
+ }
+ }
+
+ // if this is not a recoverable MOVED error,
+ if(!isMoved)
+ {
+ // then mark all the inneroperation's wrapped operations as failed, and complete
+ foreach (var op in tran.InnerOperations)
+ {
+ var inner = op.Wrapped;
+ ServerFail(inner, error);
+ inner.Complete();
+ }
+ }
+
+ // take our updated error message, and pass it to the base ResultProcessor.
+ var newResult = new ReadOnlySequence(Encoding.UTF8.GetBytes(error));
+ return base.SetResult(connection, message, new RawResult(ResultType.Error, newResult, false));
}
+
+ // allow the base processor to process to result of the transaction
return base.SetResult(connection, message, result);
}
diff --git a/src/StackExchange.Redis/ResultProcessor.cs b/src/StackExchange.Redis/ResultProcessor.cs
index 89e630813..372046cca 100644
--- a/src/StackExchange.Redis/ResultProcessor.cs
+++ b/src/StackExchange.Redis/ResultProcessor.cs
@@ -175,6 +175,11 @@ public static void ConnectionFail(Message message, ConnectionFailureType fail, s
SetException(message, new RedisConnectionException(fail, errorMessage));
}
+ public static void HashSlotMigratedAndNoRedirectAllowedFail(Message message, int hashSlot, string endpoint, string errorMessage)
+ {
+ SetException(message, new RedisHashslotMigratedAndNoRedirectException(errorMessage, hashSlot, endpoint));
+ }
+
public static void ServerFail(Message message, string errorMessage)
{
SetException(message, new RedisServerException(errorMessage));
@@ -207,14 +212,17 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
bool wasNoRedirect = (message.Flags & CommandFlags.NoRedirect) != 0;
string err = string.Empty;
bool unableToConnectError = false;
+ bool hashslotMigratedAndNoRedirectError = false;
+ int hashSlot = -1;
+ EndPoint endpoint = null;
if (isMoved || result.StartsWith(CommonReplies.ASK))
{
message.SetResponseReceived();
log = false;
- string[] parts = result.GetString().Split(StringSplits.Space, 3);
- EndPoint endpoint;
- if (Format.TryParseInt32(parts[1], out int hashSlot)
+ string[] parts = result.GetString().Split(StringSplits.Space, 4);
+
+ if (Format.TryParseInt32(parts[1], out hashSlot)
&& (endpoint = Format.TryParseEndPoint(parts[2])) != null)
{
// no point sending back to same server, and no point sending to a dead server
@@ -231,6 +239,7 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
{
if (isMoved && wasNoRedirect)
{
+ hashslotMigratedAndNoRedirectError = true;
err = $"Key has MOVED to Endpoint {endpoint} and hashslot {hashSlot} but CommandFlags.NoRedirect was specified - redirect not followed for {message.CommandAndKey}. ";
}
else
@@ -258,6 +267,11 @@ public virtual bool SetResult(PhysicalConnection connection, Message message, in
{
ConnectionFail(message, ConnectionFailureType.UnableToConnect, err);
}
+ // if this message has an error due to the hashslot being moved, and the command has been marked as NO REDIRECT.
+ else if(hashslotMigratedAndNoRedirectError && endpoint != null)
+ {
+ HashSlotMigratedAndNoRedirectAllowedFail(message, hashSlot, Format.ToString(endpoint), err);
+ }
else
{
ServerFail(message, err);
diff --git a/tests/StackExchange.Redis.Tests/Cluster.cs b/tests/StackExchange.Redis.Tests/Cluster.cs
index d76570482..61cfea55c 100644
--- a/tests/StackExchange.Redis.Tests/Cluster.cs
+++ b/tests/StackExchange.Redis.Tests/Cluster.cs
@@ -171,7 +171,7 @@ static string StringGet(IServer server, RedisKey key, CommandFlags flags = Comma
string b = StringGet(conn.GetServer(node.EndPoint), key);
Assert.Equal(value, b); // wrong primary, allow redirect
- var ex = Assert.Throws(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect));
+ var ex = Assert.Throws(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect));
Assert.StartsWith($"Key has MOVED to Endpoint {rightPrimaryNode.EndPoint} and hashslot {slot}", ex.Message);
}
@@ -188,7 +188,69 @@ static string StringGet(IServer server, RedisKey key, CommandFlags flags = Comma
string e = StringGet(conn.GetServer(node.EndPoint), key);
Assert.Equal(value, e); // wrong replica, allow redirect
- var ex = Assert.Throws(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect));
+ var ex = Assert.Throws(() => StringGet(conn.GetServer(node.EndPoint), key, CommandFlags.NoRedirect));
+ Assert.StartsWith($"Key has MOVED to Endpoint {rightPrimaryNode.EndPoint} and hashslot {slot}", ex.Message);
+ }
+ }
+ }
+
+ [Fact]
+ public void IntentionalWrongServerForTransaction()
+ {
+ static string[] TransactionalReplace(IServer server, RedisKey key, RedisValue newRedisValue, CommandFlags flags = CommandFlags.None)
+ {
+ var database = server.Multiplexer.GetDatabase();
+ var transaction = database.CreateTransaction();
+ var serverEndpoint = new ServerEndPoint((ConnectionMultiplexer) server.Multiplexer, server.EndPoint);
+
+ Task originalVal = transaction.StringGetAsync(key, flags);
+ Task writeVal = transaction.StringSetAsync(key, newRedisValue, null, false, When.Always, flags);
+ Task newVal = transaction.StringGetAsync(key, flags);
+
+ var result = ((RedisTransaction)transaction).ExecuteInternal(flags, serverEndpoint);
+ Assert.True(result);
+ Assert.True(writeVal.Result);
+
+ return new string[] {
+ originalVal.Result, newVal.Result
+ };
+ }
+
+ using (var conn = Create())
+ {
+ var endpoints = conn.GetEndPoints();
+ var servers = endpoints.Select(e => conn.GetServer(e)).ToList();
+
+ var key = Me();
+ const string value = "abc";
+ const string newValue = "def";
+ var db = conn.GetDatabase();
+ db.KeyDelete(key, CommandFlags.FireAndForget);
+ db.StringSet(key, value, flags: CommandFlags.FireAndForget);
+ servers[0].Ping();
+ var config = servers[0].ClusterConfiguration;
+ Assert.NotNull(config);
+ int slot = conn.HashSlot(key);
+ var rightPrimaryNode = config.GetBySlot(key);
+ Assert.NotNull(rightPrimaryNode);
+ Log("Right Primary: {0} {1}", rightPrimaryNode.EndPoint, rightPrimaryNode.NodeId);
+
+ string[] responses = TransactionalReplace(conn.GetServer(rightPrimaryNode.EndPoint), key, newValue);
+ Assert.Equal(value, responses[0]); // right primary
+ Assert.Equal(newValue, responses[1]);
+
+ db.KeyDelete(key, CommandFlags.FireAndForget);
+ db.StringSet(key, value, flags: CommandFlags.FireAndForget);
+
+ var node = config.Nodes.FirstOrDefault(x => !x.IsReplica && x.NodeId != rightPrimaryNode.NodeId);
+ Assert.NotNull(node);
+ Log("Using Primary: {0}", node.EndPoint, node.NodeId);
+ {
+ string[] otherResponses = TransactionalReplace(conn.GetServer(node.EndPoint), key, newValue);
+ Assert.Equal(value, otherResponses[0]); // right primary
+ Assert.Equal(newValue, otherResponses[1]);
+
+ var ex = Assert.Throws(() => TransactionalReplace(conn.GetServer(node.EndPoint), key, newValue, CommandFlags.NoRedirect));
Assert.StartsWith($"Key has MOVED to Endpoint {rightPrimaryNode.EndPoint} and hashslot {slot}", ex.Message);
}
}