Skip to content

Commit

Permalink
[#360] Fix link uniqueness checks
Browse files Browse the repository at this point in the history
  • Loading branch information
xinchen10 committed May 17, 2024
1 parent da9139a commit f6bf1d2
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 91 deletions.
15 changes: 15 additions & 0 deletions src/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,18 @@ public IHandler Handler
get { return this.handler; }
}

internal string ContainerId
{
get;
private set;
}

internal string RemoteContainerId
{
get;
set;
}

object ThisLock
{
get { return this.lockObject; }
Expand Down Expand Up @@ -551,6 +563,8 @@ void SendHeader()

void SendOpen(Open open)
{
this.ContainerId = open.ContainerId;

IHandler handler = this.Handler;
if (handler != null && handler.CanHandle(EventId.ConnectionLocalOpen))
{
Expand Down Expand Up @@ -607,6 +621,7 @@ void OnOpen(Open open)
this.channelMax = open.ChannelMax;
}

this.RemoteContainerId = open.ContainerId;
this.remoteMaxFrameSize = open.MaxFrameSize;
uint idleTimeout = open.IdleTimeOut;
if (idleTimeout > 0 && this.heartBeat == null)
Expand Down
99 changes: 96 additions & 3 deletions src/Link.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,83 @@ public enum LinkState
End
}

struct LinkId : IEquatable<LinkId>
{
public string SourceContainer { get; private set; }

public string TargetContainer { get; private set; }

public bool Role { get; private set; }

public string Name { get; private set; }

public static LinkId Create(Connection connection, bool localRole, string name)
{
string sourceContainer;
string targetContainer;
if (localRole)
{
// Receiver
sourceContainer = connection.ContainerId;
targetContainer = connection.RemoteContainerId;
}
else
{
// Sender
sourceContainer = connection.RemoteContainerId;
targetContainer = connection.ContainerId;
}

return new LinkId()
{
SourceContainer = sourceContainer,
TargetContainer = targetContainer,
Role = localRole,
Name = name
};
}

public static bool Equals(LinkId a, LinkId b)
{
// Null values are not supported.
return string.Equals(a.SourceContainer, b.SourceContainer, StringComparison.Ordinal) &&
string.Equals(a.TargetContainer, b.TargetContainer, StringComparison.Ordinal) &&
a.Role == b.Role &&
string.Equals(a.Name, b.Name, StringComparison.Ordinal);
}

public override int GetHashCode()
{
int hash = this.SourceContainer.GetHashCode();
hash = hash * 31 + this.TargetContainer.GetHashCode();
hash = hash * 31 + this.Role.GetHashCode();
hash = hash * 31 + this.Name.GetHashCode();
return hash;
}

public override bool Equals(object obj)
{
if (obj is LinkId)
{
return Equals(this, (LinkId)obj);
}

return false;
}

bool IEquatable<LinkId>.Equals(LinkId other)
{
return Equals(this, other);
}
}

/// <summary>
/// The Link class represents an AMQP link.
/// </summary>
public abstract partial class Link : AmqpObject
{
readonly Session session;
readonly bool role;
readonly string name;
readonly uint handle;
readonly OnAttached onAttached;
Expand All @@ -86,7 +157,20 @@ public abstract partial class Link : AmqpObject
/// <param name="session">The session.</param>
/// <param name="name">The link name.</param>
/// <param name="onAttached">The callback to handle received attach.</param>
[Obsolete]
protected Link(Session session, string name, OnAttached onAttached)
: this(session, false, name, onAttached)
{
}

/// <summary>
/// Initializes the link.
/// </summary>
/// <param name="session">The session.</param>
/// <param name="role">The link's role (true for a receiver and false for a sender).</param>
/// <param name="name">The link name.</param>
/// <param name="onAttached">The callback to handle received attach.</param>
protected Link(Session session, bool role, string name, OnAttached onAttached)
{
if (session == null)
{
Expand All @@ -99,6 +183,7 @@ protected Link(Session session, string name, OnAttached onAttached)
}

this.session = session;
this.role = role;
this.name = name;
this.onAttached = onAttached;
this.handle = session.AddLink(this);
Expand All @@ -113,6 +198,14 @@ public string Name
get { return this.name; }
}

/// <summary>
/// Gets or sets the sender (false) or receiver (true) role of the link.
/// </summary>
public bool Role
{
get { return this.role; }
}

/// <summary>
/// Gets the link handle.
/// </summary>
Expand Down Expand Up @@ -316,14 +409,14 @@ internal void SendFlow(uint deliveryCount, uint credit, bool drain)
}
}

internal void SendAttach(bool role, uint initialDeliveryCount, Attach attach)
internal void SendAttach(uint initialDeliveryCount, Attach attach)
{
Fx.Assert(this.state == LinkState.Start, "state must be Start");
this.state = LinkState.AttachSent;
attach.LinkName = this.name;
attach.Handle = this.handle;
attach.Role = role;
if (!role)
attach.Role = this.Role;
if (!this.Role)
{
attach.InitialDeliveryCount = initialDeliveryCount;
}
Expand Down
55 changes: 4 additions & 51 deletions src/Listener/LinkCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,26 @@

namespace Amqp.Listener
{
using System;
using System.Collections.Concurrent;

class LinkCollection
{
readonly string containerId;
readonly ConcurrentDictionary<Key, ListenerLink> links;
readonly ConcurrentDictionary<LinkId, ListenerLink> links;

public LinkCollection(string containerId)
{
this.containerId = containerId;
this.links = new ConcurrentDictionary<Key, ListenerLink>();
this.links = new ConcurrentDictionary<LinkId, ListenerLink>();
}

public bool TryAdd(ListenerLink link)
{
Key key = new Key(this.containerId, link);
var key = LinkId.Create(link.Session.Connection, link.Role, link.Name);
return this.links.TryAdd(key, link);
}

public bool Remove(ListenerLink link)
{
Key key = new Key(this.containerId, link);
var key = LinkId.Create(link.Session.Connection, link.Role, link.Name);
ListenerLink temp;
return this.links.TryRemove(key, out temp);
}
Expand All @@ -48,49 +45,5 @@ public void Clear()
{
this.links.Clear();
}

class Key : IEquatable<Key>
{
string fromContainer;
string toContainer;
string name;

public Key(string containerId, ListenerLink link)
{
this.name = link.Name;
string remoteId = ((ListenerConnection)link.Session.Connection).RemoteContainerId;
if (link.Role)
{
this.fromContainer = remoteId;
this.toContainer = containerId;
}
else
{
this.fromContainer = containerId;
this.toContainer = remoteId;
}
}

public bool Equals(Key other)
{
return string.Equals(this.fromContainer, other.fromContainer, StringComparison.Ordinal) &&
string.Equals(this.toContainer, other.toContainer, StringComparison.Ordinal) &&
string.Equals(this.name, other.name, StringComparison.Ordinal);
}

public override int GetHashCode()
{
int hash = this.fromContainer.GetHashCode();
hash = hash * 31 + this.toContainer.GetHashCode();
hash = hash * 31 + this.name.GetHashCode();
return hash;
}

public override bool Equals(object obj)
{
Key key = obj as Key;
return key == null ? false : this.Equals(key);
}
}
}
}
6 changes: 0 additions & 6 deletions src/Listener/ListenerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ internal ConnectionListener Listener
get { return this.listener; }
}

internal string RemoteContainerId
{
get;
private set;
}

internal override void OnBegin(ushort remoteChannel, Begin begin)
{
this.ValidateChannel(remoteChannel);
Expand Down
30 changes: 10 additions & 20 deletions src/Listener/ListenerLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ namespace Amqp.Listener
/// </summary>
public class ListenerLink : Link
{
bool role;
object state;
SequenceNumber deliveryCount;
uint credit;
Expand Down Expand Up @@ -56,20 +55,11 @@ public class ListenerLink : Link
/// <param name="session">The session.</param>
/// <param name="attach">The received attach frame.</param>
public ListenerLink(ListenerSession session, Attach attach)
: base(session, attach.LinkName, null)
: base(session, !attach.Role, attach.LinkName, null)
{
this.role = !attach.Role;
this.SettleOnSend = attach.SndSettleMode == SenderSettleMode.Settled;
}

/// <summary>
/// Gets the sender (false) or receiver (true) role of the link.
/// </summary>
public bool Role
{
get { return this.role; }
}

/// <summary>
/// Gets the settled flag. If it is true, messages are sent settled.
/// </summary>
Expand Down Expand Up @@ -180,7 +170,7 @@ public void DisposeMessage(Message message, DeliveryState deliveryState, bool se
return;
}

this.Session.DisposeDelivery(this.role, delivery, deliveryState, settled);
this.Session.DisposeDelivery(this.Role, delivery, deliveryState, settled);
}

/// <summary>
Expand All @@ -193,16 +183,16 @@ public void CompleteAttach(Attach attach, Error error)
{
if (error != null)
{
this.SendAttach(this.role, attach.InitialDeliveryCount, new Attach() { Target = null, Source = null });
this.SendAttach(attach.InitialDeliveryCount, new Attach() { Target = null, Source = null });
}
else
{
if (!this.role)
if (!this.Role)
{
this.deliveryCount = attach.InitialDeliveryCount;
}

this.SendAttach(this.role, attach.InitialDeliveryCount, attach);
this.SendAttach(attach.InitialDeliveryCount, attach);
}

base.OnAttach(attach.Handle, attach);
Expand All @@ -213,7 +203,7 @@ public void CompleteAttach(Attach attach, Error error)
}
else
{
if (this.role)
if (this.Role)
{
this.SendFlow(this.deliveryCount, this.credit, false);
}
Expand Down Expand Up @@ -291,7 +281,7 @@ internal void InitializeLinkEndpoint(LinkEndpoint linkEndpoint, uint credit)
ThrowIfNotNull(this.onCredit, "sender");
ThrowIfNotNull(this.onDispose, "sender");
this.linkEndpoint = linkEndpoint;
if (this.role)
if (this.Role)
{
this.credit = credit;
this.autoRestore = true;
Expand All @@ -300,7 +290,7 @@ internal void InitializeLinkEndpoint(LinkEndpoint linkEndpoint, uint credit)

internal uint SendMessageInternal(Message message, ByteBuffer buffer, object userToken)
{
if (this.role)
if (this.Role)
{
throw new AmqpException(ErrorCode.NotAllowed, "Cannot send a message over a receiving link.");
}
Expand Down Expand Up @@ -356,7 +346,7 @@ internal uint SendMessageInternal(Message message, ByteBuffer buffer, object use

internal override void OnAttach(uint remoteHandle, Attach attach)
{
if (role)
if (this.Role)
{
this.deliveryCount = attach.InitialDeliveryCount;
}
Expand Down Expand Up @@ -391,7 +381,7 @@ internal override void OnFlow(Flow flow)
int delta = 0;
lock (this.ThisLock)
{
if (!this.role)
if (!this.Role)
{
this.drain = flow.Drain;
var theirLimit = (SequenceNumber)(flow.DeliveryCount + flow.LinkCredit);
Expand Down
4 changes: 2 additions & 2 deletions src/ReceiverLink.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,12 @@ public ReceiverLink(Session session, string name, Source source, OnAttached onAt
/// <param name="attach">The attach frame to send for this link.</param>
/// <param name="onAttached">The callback to invoke when an attach is received from peer.</param>
public ReceiverLink(Session session, string name, Attach attach, OnAttached onAttached)
: base(session, name, onAttached)
: base(session, true, name, onAttached)
{
this.totalCredit = -1;
this.receivedMessages = new LinkedList();
this.waiterList = new LinkedList();
this.SendAttach(true, 0, attach);
this.SendAttach(0, attach);
}

/// <summary>
Expand Down
Loading

0 comments on commit f6bf1d2

Please sign in to comment.