diff --git a/StackExchange.Redis/StackExchange/Redis/RedisChannel.cs b/StackExchange.Redis/StackExchange/Redis/RedisChannel.cs index 65cc7b014..7673ae9f2 100644 --- a/StackExchange.Redis/StackExchange/Redis/RedisChannel.cs +++ b/StackExchange.Redis/StackExchange/Redis/RedisChannel.cs @@ -12,6 +12,7 @@ public struct RedisChannel : IEquatable internal readonly byte[] Value; internal readonly bool IsPatternBased; + internal readonly bool IsKeyspaceChannel; /// /// Indicates whether the channel-name is either null or a zero-length value @@ -38,6 +39,7 @@ private RedisChannel(byte[] value, bool isPatternBased) { Value = value; IsPatternBased = isPatternBased; + IsKeyspaceChannel = value != null && Encoding.UTF8.GetString(value).ToLower().StartsWith("__key"); } private static bool DeterminePatternBased(byte[] value, PatternMode mode) diff --git a/StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs b/StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs index 2f9eca6c1..8d194b6a8 100644 --- a/StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs +++ b/StackExchange.Redis/StackExchange/Redis/RedisSubscriber.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Net; using System.Threading; using System.Threading.Tasks; @@ -144,7 +145,7 @@ internal long ValidateSubscriptions() private sealed class Subscription { private Action handler; - private ServerEndPoint owner; + private List owners = new List(); public Subscription(Action value) => handler = value; @@ -170,33 +171,81 @@ public bool Remove(Action value) } public Task SubscribeToServer(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall) + { + // subscribe to all masters in cluster for keyspace/keyevent notifications + if (channel.IsKeyspaceChannel) { + return SubscribeToMasters(multiplexer, channel, flags, asyncState, internalCall); + } + return SubscribeToSingleServer(multiplexer, channel, flags, asyncState, internalCall); + } + + private Task SubscribeToSingleServer(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall) { var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE; var selected = multiplexer.SelectServer(-1, cmd, flags, default(RedisKey)); - if (selected == null || Interlocked.CompareExchange(ref owner, selected, null) != null) return null; + lock (owners) + { + if (selected == null || owners.Contains(selected)) return null; + owners.Add(selected); + } var msg = Message.Create(-1, flags, cmd, channel); - + if (internalCall) msg.SetInternalCall(); return selected.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState); } + private Task SubscribeToMasters(ConnectionMultiplexer multiplexer, RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall) + { + List subscribeTasks = new List(); + var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE; + var masters = multiplexer.GetServerSnapshot().Where(s => !s.IsSlave && s.EndPoint.Equals(s.ClusterConfiguration.Origin)); + + lock (owners) + { + foreach (var master in masters) + { + if (owners.Contains(master)) continue; + owners.Add(master); + var msg = Message.Create(-1, flags, cmd, channel); + if (internalCall) msg.SetInternalCall(); + subscribeTasks.Add(master.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState)); + } + } + + return Task.WhenAll(subscribeTasks); + } + public Task UnsubscribeFromServer(RedisChannel channel, CommandFlags flags, object asyncState, bool internalCall) { - var oldOwner = Interlocked.Exchange(ref owner, null); - if (oldOwner == null) return null; + if (owners.Count == 0) return null; + List queuedTasks = new List(); var cmd = channel.IsPatternBased ? RedisCommand.PUNSUBSCRIBE : RedisCommand.UNSUBSCRIBE; var msg = Message.Create(-1, flags, cmd, channel); if (internalCall) msg.SetInternalCall(); - return oldOwner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState); + foreach (var owner in owners) + queuedTasks.Add(owner.QueueDirectAsync(msg, ResultProcessor.TrackSubscriptions, asyncState)); + owners.Clear(); + return Task.WhenAll(queuedTasks.ToArray()); } - internal ServerEndPoint GetOwner() => Interlocked.CompareExchange(ref owner, null, null); + internal ServerEndPoint GetOwner() + { + var owner = owners?[0]; // we subscribe to arbitrary server, so why not return one + return Interlocked.CompareExchange(ref owner, null, null); + } internal void Resubscribe(RedisChannel channel, ServerEndPoint server) { - if (server != null && Interlocked.CompareExchange(ref owner, server, server) == server) + bool hasOwner; + + lock (owners) + { + hasOwner = owners.Contains(server); + } + + if (server != null && hasOwner) { var cmd = channel.IsPatternBased ? RedisCommand.PSUBSCRIBE : RedisCommand.SUBSCRIBE; var msg = Message.Create(-1, CommandFlags.FireAndForget, cmd, channel); @@ -208,16 +257,15 @@ internal void Resubscribe(RedisChannel channel, ServerEndPoint server) internal bool Validate(ConnectionMultiplexer multiplexer, RedisChannel channel) { bool changed = false; - var oldOwner = Interlocked.CompareExchange(ref owner, null, null); - if (oldOwner != null && !oldOwner.IsSelectable(RedisCommand.PSUBSCRIBE)) + if (owners.Count != 0 && !owners.All(o => o.IsSelectable(RedisCommand.PSUBSCRIBE))) { if (UnsubscribeFromServer(channel, CommandFlags.FireAndForget, null, true) != null) { changed = true; } - oldOwner = null; + owners.Clear(); } - if (oldOwner == null && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null) + if (owners.Count == 0 && SubscribeToServer(multiplexer, channel, CommandFlags.FireAndForget, null, true) != null) { changed = true; }