Skip to content

Commit

Permalink
Merge pull request #1275 from ably/fix/spec-RTL12
Browse files Browse the repository at this point in the history
Fix spec RTL12
  • Loading branch information
sacOO7 authored Apr 19, 2024
2 parents cb7fb4a + ca85929 commit e6e7537
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 81 deletions.
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ This is a .NET client library for Ably which targets the 2.0 client library spec
## Supported platforms

* [.NET Standard 2.0+](https://learn.microsoft.com/en-us/dotnet/standard/net-standard?tabs=net-standard-2-0)
* .NET 6.x, 7.x, MAUI, check [MAUI config](#maui-configuration).
* .NET 6.0+, MAUI, check [MAUI config](#maui-configuration).
* .NET Framework 4.6.2+
* .NET (Core) 2.0+
* Mono 5.4+
Expand Down Expand Up @@ -424,7 +424,6 @@ var options = new ClientOptions
{
AuthCallback = async tokenParams =>
{
// Return serialized jwttokenstring returned from server
string jwtToken = await getJwtTokenFromServer(tokenParams);
int expiresIn = 3600; // assuming jwtToken has 1 hr expiry
return new TokenDetails(jwtToken) {
Expand Down
26 changes: 13 additions & 13 deletions src/IO.Ably.Shared/Extensions/PresenceExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,43 +2,43 @@
{
internal static class PresenceExtensions
{
public static bool IsSynthesized(this PresenceMessage msg)
public static bool IsServerSynthesized(this PresenceMessage msg)
{
return msg.Id == null || !msg.Id.StartsWith(msg.ConnectionId);
}

// RTP2b, RTP2c
public static bool IsNewerThan(this PresenceMessage thisMessage, PresenceMessage thatMessage)
public static bool IsNewerThan(this PresenceMessage existingMsg, PresenceMessage incomingMsg)
{
// RTP2b1
if (thisMessage.IsSynthesized() || thatMessage.IsSynthesized())
if (existingMsg.IsServerSynthesized() || incomingMsg.IsServerSynthesized())
{
return thisMessage.Timestamp > thatMessage.Timestamp;
return existingMsg.Timestamp > incomingMsg.Timestamp;
}

// RTP2b2
var thisValues = thisMessage.Id.Split(':');
var thatValues = thatMessage.Id.Split(':');
var thisValues = existingMsg.Id.Split(':');
var thatValues = incomingMsg.Id.Split(':');

// if any part of the message serial fails to parse then throw an exception
if (thisValues.Length != 3 ||
!(int.TryParse(thisValues[1], out int msgSerialThis) | int.TryParse(thisValues[2], out int indexThis)))
!(int.TryParse(thisValues[1], out int existingMsgSerial) | int.TryParse(thisValues[2], out int existingMsgIndex)))
{
throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{thisMessage.Id}'.");
throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{existingMsg.Id}'.");
}

if (thatValues.Length != 3 ||
!(int.TryParse(thatValues[1], out int msgSerialThat) | int.TryParse(thatValues[2], out int indexThat)))
!(int.TryParse(thatValues[1], out int incomingMsgSerial) | int.TryParse(thatValues[2], out int incomingMsgIndex)))
{
throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{thatMessage.Id}'.");
throw new AblyException($"Parsing error. The Presence Message has an invalid Id '{incomingMsg.Id}'.");
}

if (msgSerialThis == msgSerialThat)
if (existingMsgSerial == incomingMsgSerial)
{
return indexThis > indexThat;
return existingMsgIndex > incomingMsgIndex;
}

return msgSerialThis > msgSerialThat;
return existingMsgSerial > incomingMsgSerial;
}
}
}
9 changes: 6 additions & 3 deletions src/IO.Ably.Shared/Realtime/ChannelMessageProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ public Task<bool> MessageReceived(ProtocolMessage protocolMessage, RealtimeState
}

// RTL12
if (channel.State == ChannelState.Attached && !protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed))
if (channel.State == ChannelState.Attached)
{
channel.Presence.ChannelAttached(protocolMessage, false);
channel.EmitErrorUpdate(protocolMessage.Error, false, protocolMessage);
if (!protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed))
{
channel.Presence.ChannelAttached(protocolMessage);
channel.EmitErrorUpdate(protocolMessage.Error, false, protocolMessage);
}
}
else
{
Expand Down
56 changes: 29 additions & 27 deletions src/IO.Ably.Shared/Realtime/Presence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,28 @@ internal Presence(IConnectionManager connection, RealtimeChannel channel, string
internal ILogger Logger { get; private set; }

/// <summary>
/// Has the sync completed.
/// Checks if presence sync has ended.
/// </summary>
public bool IsSyncComplete => MembersMap.SyncCompleted && !IsSyncInProgress;
///
[Obsolete("This property is deprecated, use SyncComplete instead")]
public bool IsSyncComplete => SyncComplete; // RTP13.

/// <summary>
/// Checks if presence sync has ended.
/// </summary>
///
public bool SyncComplete => MembersMap.SyncCompleted && !SyncInProgress; // RTP13

/// <summary>
/// Indicates whether there is currently a sync in progress.
/// </summary>
[Obsolete("This property is internal, will be removed in the future")]
public bool IsSyncInProgress => SyncInProgress;

/// <summary>
/// Indicates whether there is currently a sync in progress.
/// </summary>
public bool IsSyncInProgress => MembersMap.SyncInProgress;
internal bool SyncInProgress => MembersMap.SyncInProgress;

/// <summary>
/// Indicates all members present on the channel.
Expand Down Expand Up @@ -153,7 +167,7 @@ private async Task<bool> WaitForSyncAsync()
// The InternalSync should be completed and the channels Attached or Attaching
void CheckAndSet()
{
if (IsSyncComplete
if (SyncComplete
&& (_channel.State == ChannelState.Attached || _channel.State == ChannelState.Attaching))
{
tsc.TrySetResult(true);
Expand Down Expand Up @@ -532,7 +546,7 @@ internal void OnSyncMessage(ProtocolMessage protocolMessage)
/* If a new sequence identifier is sent from Ably, then the client library
* must consider that to be the start of a new sync sequence
* and any previous in-flight sync should be discarded. (part of RTP18)*/
if (IsSyncInProgress && _currentSyncChannelSerial.IsNotEmpty() && _currentSyncChannelSerial != syncSequenceId)
if (SyncInProgress && _currentSyncChannelSerial.IsNotEmpty() && _currentSyncChannelSerial != syncSequenceId)
{
EndSync();
}
Expand Down Expand Up @@ -582,7 +596,7 @@ internal void OnPresence(PresenceMessage[] messages)
// RTP2e
case PresenceAction.Leave:
broadcast &= MembersMap.Remove(message);
if (updateInternalPresence && !message.IsSynthesized())
if (updateInternalPresence && !message.IsServerSynthesized())
{
InternalMembersMap.Remove(message);
}
Expand Down Expand Up @@ -613,15 +627,15 @@ internal void OnPresence(PresenceMessage[] messages)

internal void StartSync()
{
if (!IsSyncInProgress)
if (!SyncInProgress)
{
MembersMap.StartSync();
}
}

private void EndSync()
{
if (!IsSyncInProgress)
if (!SyncInProgress)
{
return;
}
Expand All @@ -647,7 +661,7 @@ private void EnterMembersFromInternalPresenceMap()
{
try
{
var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data);
var itemToSend = new PresenceMessage(PresenceAction.Enter, item.ClientId, item.Data, item.Id);
UpdatePresence(itemToSend, (success, info) =>
{
if (!success)
Expand Down Expand Up @@ -721,24 +735,15 @@ internal void ChannelSuspended(ErrorInfo error)
FailQueuedMessages(error);
}

internal void ChannelAttached(ProtocolMessage attachedMessage, bool isAttachWithoutMessageLoss = true)
internal void ChannelAttached(ProtocolMessage attachedMessage)
{
// RTP19
StartSync();

// RTP1
var hasPresence = attachedMessage != null && attachedMessage.HasFlag(ProtocolMessage.Flag.HasPresence);
if (hasPresence)
{
if (Logger.IsDebug)
{
Logger.Debug(
$"Protocol message has presence flag. Starting Presence SYNC. Flag: {attachedMessage.Flags}");
}

StartSync();
}
else
// RTP19
StartSync();

if (!hasPresence)
{
EndSync(); // RTP19
}
Expand All @@ -747,10 +752,7 @@ internal void ChannelAttached(ProtocolMessage attachedMessage, bool isAttachWith
SendQueuedMessages();

// RTP17f
if (isAttachWithoutMessageLoss)
{
EnterMembersFromInternalPresenceMap();
}
EnterMembersFromInternalPresenceMap();
}

private void SendQueuedMessages()
Expand Down
5 changes: 4 additions & 1 deletion src/IO.Ably.Shared/Realtime/RealtimeChannels.cs
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,10 @@ internal IDictionary<string, string> GetChannelSerials()
var channelSerials = new Dictionary<string, string>();
foreach (var realtimeChannel in this)
{
channelSerials[realtimeChannel.Name] = realtimeChannel.Properties.ChannelSerial;
if (realtimeChannel.State == ChannelState.Attached)
{
channelSerials[realtimeChannel.Name] = realtimeChannel.Properties.ChannelSerial;
}
}

return channelSerials;
Expand Down
4 changes: 2 additions & 2 deletions src/IO.Ably.Shared/Realtime/RecoveryKeyContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ public static RecoveryKeyContext Decode(string recover, ILogger logger = null)
{
return JsonHelper.Deserialize<RecoveryKeyContext>(recover);
}
catch (Exception)
catch (Exception e)
{
logger?.Warning($"Error deserializing recover - {recover}, setting it as null");
logger?.Warning($"Error deserializing recover - {recover}, setting it as null", e);
return null;
}
}
Expand Down
54 changes: 22 additions & 32 deletions src/IO.Ably.Tests.Shared/Realtime/PresenceSandboxSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public async Task WhenAttachingToAChannelWithNoMembers_PresenceShouldBeConsidere
await channel.WaitForAttachedState();
channel.State.Should().Be(ChannelState.Attached);

channel.Presence.IsSyncComplete.Should().BeTrue();
channel.Presence.SyncComplete.Should().BeTrue();
}

[Theory]
Expand Down Expand Up @@ -109,8 +109,8 @@ public async Task WhenAttachingToAChannelWithMembers_PresenceShouldBeInProgress(
if (args.Current == ChannelState.Attached)
{
Logger.Debug("Test: Setting inSync to - " + channel2.Presence.MembersMap.SyncInProgress);
syncInProgress = channel2.Presence.IsSyncInProgress;
syncComplete = channel2.Presence.IsSyncComplete;
syncInProgress = channel2.Presence.SyncInProgress;
syncComplete = channel2.Presence.SyncComplete;
awaiter.SetCompleted();
}
};
Expand Down Expand Up @@ -406,7 +406,6 @@ async Task<bool> WaitForNoPresenceOnChannel(IRestChannel rChannel)

// let the library know the transport is really dead
testTransport.Listener?.OnTransportEvent(testTransport.Id, TransportState.Closed);

await realtimeClient.WaitForState(ConnectionState.Disconnected);
await realtimeClient.WaitForState(ConnectionState.Connected);
await realtimeChannel.WaitForAttachedState();
Expand Down Expand Up @@ -450,45 +449,38 @@ public async Task OnAttach_ShouldEnterMembersFromInternalMap(Protocol protocol)
await Task.Delay(250);
presence.MembersMap.Members.Should().HaveCount(4);
presence.InternalMembersMap.Members.Should().HaveCount(1);
var internalMemberId = presence.InternalMembersMap.Members.Values.First().Id;

List<PresenceMessage> leaveMessages = new List<PresenceMessage>();
PresenceMessage updateMessage = null;
PresenceMessage enterMessage = null;
PresenceMessage enteredMember = null;
await WaitForMultiple(2, partialDone =>
{
presence.Subscribe(PresenceAction.Leave, message =>
{
leaveMessages.Add(message);
});
presence.Subscribe(PresenceAction.Update, message =>
{
updateMessage = message;
partialDone(); // 1 call
});
presence.Subscribe(PresenceAction.Enter, message =>
client.GetTestTransport().BeforeMessageSend = message =>
{
enterMessage = message; // not expected to hit
});
enteredMember = message.Presence.First();
client.GetTestTransport().BeforeMessageSend = _ => { };
partialDone();
};
client.GetTestTransport().AfterDataReceived = message =>
{
if (message.Action == ProtocolMessage.MessageAction.Attached)
{
bool hasPresence = message.HasFlag(ProtocolMessage.Flag.HasPresence);
hasPresence.Should().BeFalse();
bool resumed = message.HasFlag(ProtocolMessage.Flag.Resumed);
resumed.Should().BeTrue();
client.GetTestTransport().AfterDataReceived = _ => { };
partialDone(); // 1 call
}
};
// inject duplicate attached message with resume flag ( no RTL12 message loss event)
// inject duplicate attached message without resume flag
var protocolMessage = new ProtocolMessage(ProtocolMessage.MessageAction.Attached)
{
Channel = channelName,
Flags = 0,
};
protocolMessage.SetFlag(ProtocolMessage.Flag.Resumed);
protocolMessage.HasFlag(ProtocolMessage.Flag.Resumed).Should().BeTrue();
client.GetTestTransport().FakeReceivedMessage(protocolMessage);
});

Expand All @@ -498,15 +490,13 @@ await WaitForMultiple(2, partialDone =>
msg.ClientId.Should().BeOneOf("member_0", "member_1", "member_2", "local");
}

updateMessage.Should().NotBeNull();
updateMessage.ClientId.Should().Be("local");
enterMessage.Should().BeNull();

enteredMember.Should().NotBeNull();
enteredMember.Id.Should().Be(internalMemberId);
enteredMember.Action.Should().Be(PresenceAction.Enter);
enteredMember.ClientId.Should().Be("local");
presence.Unsubscribe();
var remainingMembers = await presence.GetAsync();

remainingMembers.Should().HaveCount(1);
remainingMembers.First().ClientId.Should().Be("local");
client.Close();
}

[Theory]
Expand Down Expand Up @@ -646,7 +636,7 @@ await WaitForMultiple(2, partialDone =>
msgA = null;
msgB = null;
var synthesizedMsg = new PresenceMessage(PresenceAction.Leave, clientB.ClientId) { ConnectionId = null };
synthesizedMsg.IsSynthesized().Should().BeTrue();
synthesizedMsg.IsServerSynthesized().Should().BeTrue();
channelB.Presence.OnPresence(new[] { synthesizedMsg });

msgB.Should().BeNull();
Expand Down Expand Up @@ -750,8 +740,8 @@ public async Task PresenceMap_WhenNotSyncingAndLeaveActionArrivesMemberKeyShould
members.Should().HaveCount(20);

// sync should not be in progress and initial an sync should have completed
channel.Presence.IsSyncInProgress.Should().BeFalse("sync should have completed");
channel.Presence.IsSyncComplete.Should().BeTrue();
channel.Presence.SyncInProgress.Should().BeFalse("sync should have completed");
channel.Presence.SyncComplete.Should().BeTrue();

// pull a random member key from the presence map
var memberNumber = new Random().Next(0, 19);
Expand Down Expand Up @@ -1514,19 +1504,19 @@ await WaitForMultiple(2, partialDone =>
presence2.Subscribe(PresenceAction.Enter, msg =>
{
presence2.MembersMap.Members.Should().HaveCount(presence2.IsSyncComplete ? 2 : 1);
presence2.MembersMap.Members.Should().HaveCount(presence2.SyncComplete ? 2 : 1);
presence2.Unsubscribe();
partialDone();
});
presence2.PendingPresenceQueue.Should().HaveCount(1);
presence2.IsSyncComplete.Should().BeFalse();
presence2.SyncComplete.Should().BeFalse();
presence2.MembersMap.Members.Should().HaveCount(0);
taskCountWaiter.Tick();
});

var transport = client2.GetTestTransport();
await new ConditionalAwaiter(() => presence2.IsSyncComplete);
await new ConditionalAwaiter(() => presence2.SyncComplete);
transport.ProtocolMessagesReceived.Any(m => m.Action == ProtocolMessage.MessageAction.Sync).
Should().BeTrue("Should receive sync message");
presence2.MembersMap.Members.Should().HaveCount(2);
Expand Down
2 changes: 1 addition & 1 deletion src/IO.Ably.Tests.Shared/Rest/SandboxSpecExtension.cs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ internal static async Task WaitForAttachedState(this IRealtimeChannel channel, T
internal static async Task<bool> WaitSync(this Presence presence, TimeSpan? waitSpan = null)
{
TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>();
var inProgress = presence.IsSyncInProgress;
var inProgress = presence.SyncInProgress;
if (inProgress == false)
{
return true;
Expand Down

0 comments on commit e6e7537

Please sign in to comment.