Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Acceptor Dynamic Session Templates (2nd version) #650

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions QuickFIXn/AcceptorSocketDescriptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ public IPEndPoint Address

#endregion

public AcceptorSocketDescriptor(IPEndPoint socketEndPoint, SocketSettings socketSettings, QuickFix.Dictionary sessionDict)
public AcceptorSocketDescriptor(IPEndPoint socketEndPoint, SocketSettings socketSettings, QuickFix.Dictionary sessionDict, SessionProvider sessionProvider)
{
socketEndPoint_ = socketEndPoint;
socketReactor_ = new ThreadedSocketReactor(socketEndPoint_, socketSettings, sessionDict, this);
socketReactor_ = new ThreadedSocketReactor(socketEndPoint_, socketSettings, sessionDict, this, sessionProvider);
}

public void AcceptSession(Session session)
Expand Down
6 changes: 3 additions & 3 deletions QuickFIXn/ClientHandlerThread.cs
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ public ClientHandlerThread(TcpClient tcpClient, long clientId, QuickFix.Dictiona
/// <param name="settingsDict"></param>
/// <param name="socketSettings"></param>
public ClientHandlerThread(TcpClient tcpClient, long clientId, QuickFix.Dictionary settingsDict, SocketSettings socketSettings)
: this(tcpClient, clientId, settingsDict, socketSettings, null)
: this(tcpClient, clientId, settingsDict, socketSettings, null, null)
{

}

internal ClientHandlerThread(TcpClient tcpClient, long clientId, QuickFix.Dictionary settingsDict,
SocketSettings socketSettings, AcceptorSocketDescriptor acceptorDescriptor)
SocketSettings socketSettings, AcceptorSocketDescriptor acceptorDescriptor, SessionProvider sessionProvider)
{
string debugLogFilePath = "log";
if (settingsDict.Has(SessionSettings.DEBUG_FILE_LOG_PATH))
Expand All @@ -73,7 +73,7 @@ internal ClientHandlerThread(TcpClient tcpClient, long clientId, QuickFix.Dictio
"ClientHandlerThread", clientId.ToString(), "Debug-" + Guid.NewGuid().ToString()));

this.Id = clientId;
socketReader_ = new SocketReader(tcpClient, socketSettings, this, acceptorDescriptor);
socketReader_ = new SocketReader(tcpClient, socketSettings, this, acceptorDescriptor, sessionProvider);
}

public void Start()
Expand Down
101 changes: 101 additions & 0 deletions QuickFIXn/SessionProvider.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
using System;
using System.Collections.Generic;

namespace QuickFix
{
/// <summary>
/// Supporting wildcards in acceptor's session names (for session templates)
/// to create a new Session upon connect, if its SessionID matches a template
/// </summary>
internal class SessionProvider
{
const string WildcardValue = "*";
internal static bool IsSessionTemplate(SessionID sessionID)
{
return sessionID.SenderCompID.Equals(WildcardValue)
|| sessionID.SenderSubID.Equals(WildcardValue)
|| sessionID.SenderLocationID.Equals(WildcardValue)
|| sessionID.TargetCompID.Equals(WildcardValue)
|| sessionID.TargetSubID.Equals(WildcardValue)
|| sessionID.TargetLocationID.Equals(WildcardValue);
}

private Dictionary<SessionID, TemplateParams> _templates = new Dictionary<SessionID, TemplateParams>();
internal void AddTemplate(SessionID sessionID, Dictionary dict, ThreadedSocketAcceptor acceptor, AcceptorSocketDescriptor descriptor)
{
lock (_templates)
{
if (_templates.ContainsKey(sessionID)) throw new ConfigError($"Duplicate session template {sessionID}");
_templates.Add(sessionID, new TemplateParams(dict, acceptor, descriptor));
}
}
/// <summary>
/// Looks up an earlier created session, or creates a new one,
/// if the SessionID matches a session's template
/// </summary>
internal Session GetSession(SessionID sessionID)
{
Session session = Session.LookupSession(sessionID);
if (null == session) session = CreateFromTemplate(sessionID);
return session;
}
/// <summary>
/// If the given SessionID matches any of the templates,
/// creates and returns the new session from the first matching template
/// </summary>
/// <param name="sessionID">Session name without wildcards</param>
private Session CreateFromTemplate(SessionID sessionID)
{
lock (_templates)
{
foreach (KeyValuePair<SessionID, TemplateParams> kv in _templates)
{
if (IsMatching(sessionID, kv.Key))
{
Dictionary dict = ReplaceWildcardsInSettins(sessionID, kv.Key, kv.Value.Dict);
kv.Value.Acceptor.SetSessionSettings(sessionID, dict);
return kv.Value.Acceptor.CreateAcceptorSession(sessionID, dict, kv.Value.Descriptor);
}
}
return null;
}
}

private static bool IsMatching(SessionID sessionID, SessionID templateID)
{
// Matching if for all parts (templateID is "*" AND sessionID is set) OR (templateID == sessionID)
return templateID.BeginString.Equals(sessionID.BeginString) // do not allow a wildcard for BeginString, as would have to implement logic to switch data dictionary to match FIX version, which does not seem to worth it...
&& ((templateID.SenderCompID.Equals(WildcardValue) && !sessionID.SenderCompID.Equals(SessionID.NOT_SET)) || templateID.SenderCompID.Equals(sessionID.SenderCompID))
&& ((templateID.SenderSubID.Equals(WildcardValue) && !sessionID.SenderSubID.Equals(SessionID.NOT_SET)) || templateID.SenderSubID.Equals(sessionID.SenderSubID))
&& ((templateID.SenderLocationID.Equals(WildcardValue) && !sessionID.SenderLocationID.Equals(SessionID.NOT_SET)) || templateID.SenderLocationID.Equals(sessionID.SenderLocationID))
&& ((templateID.TargetCompID.Equals(WildcardValue) && !sessionID.TargetCompID.Equals(SessionID.NOT_SET)) || templateID.TargetCompID.Equals(sessionID.TargetCompID))
&& ((templateID.TargetSubID.Equals(WildcardValue) && !sessionID.TargetSubID.Equals(SessionID.NOT_SET)) || templateID.TargetSubID.Equals(sessionID.TargetSubID))
&& ((templateID.TargetLocationID.Equals(WildcardValue) && !sessionID.TargetLocationID.Equals(SessionID.NOT_SET)) || templateID.TargetLocationID.Equals(sessionID.TargetLocationID));
}

private static Dictionary ReplaceWildcardsInSettins(SessionID sessionID, SessionID templateID, Dictionary dict)
{
QuickFix.Dictionary actualSettings = new QuickFix.Dictionary(dict);
if (WildcardValue.Equals(templateID.SenderCompID)) actualSettings.SetString(SessionSettings.SENDERCOMPID, sessionID.SenderCompID);
if (WildcardValue.Equals(templateID.SenderSubID)) actualSettings.SetString(SessionSettings.SENDERSUBID, sessionID.SenderSubID);
if (WildcardValue.Equals(templateID.SenderLocationID)) actualSettings.SetString(SessionSettings.SENDERLOCID, sessionID.SenderLocationID);
if (WildcardValue.Equals(templateID.TargetCompID)) actualSettings.SetString(SessionSettings.TARGETCOMPID, sessionID.TargetCompID);
if (WildcardValue.Equals(templateID.TargetSubID)) actualSettings.SetString(SessionSettings.TARGETSUBID, sessionID.TargetSubID);
if (WildcardValue.Equals(templateID.TargetLocationID)) actualSettings.SetString(SessionSettings.TARGETLOCID, sessionID.TargetLocationID);
return actualSettings;
}
}

internal class TemplateParams
{
internal Dictionary Dict { get; }
internal ThreadedSocketAcceptor Acceptor { get; }
internal AcceptorSocketDescriptor Descriptor { get; }
internal TemplateParams(Dictionary dict, ThreadedSocketAcceptor acceptor, AcceptorSocketDescriptor descriptor)
{
Dict = dict;
Acceptor = acceptor;
Descriptor = descriptor;
}
}
}
9 changes: 6 additions & 3 deletions QuickFIXn/SocketReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public class SocketReader : IDisposable
private TcpClient tcpClient_;
private ClientHandlerThread responder_;
private readonly AcceptorSocketDescriptor acceptorDescriptor_;
private readonly SessionProvider _sessionProvider;

/// <summary>
/// Keep a handle to the current outstanding read request (if any)
Expand All @@ -32,7 +33,7 @@ public SocketReader(TcpClient tcpClient, ClientHandlerThread responder)
}

public SocketReader(TcpClient tcpClient, SocketSettings settings, ClientHandlerThread responder)
: this(tcpClient, settings, responder, null)
: this(tcpClient, settings, responder, null, null)
{

}
Expand All @@ -41,12 +42,14 @@ internal SocketReader(
TcpClient tcpClient,
SocketSettings settings,
ClientHandlerThread responder,
AcceptorSocketDescriptor acceptorDescriptor)
AcceptorSocketDescriptor acceptorDescriptor,
SessionProvider sessionProvider)
{
tcpClient_ = tcpClient;
responder_ = responder;
acceptorDescriptor_ = acceptorDescriptor;
stream_ = Transport.StreamFactory.CreateServerStream(tcpClient, settings, responder.GetLog());
_sessionProvider = sessionProvider;
}

/// <summary> FIXME </summary>
Expand Down Expand Up @@ -142,7 +145,7 @@ protected void OnMessageFoundInternal(string msg)
{
if (null == qfSession_)
{
qfSession_ = Session.LookupSession(Message.GetReverseSessionID(msg));
qfSession_ = _sessionProvider.GetSession(Message.GetReverseSessionID(msg));
if (null == qfSession_)
{
this.Log("ERROR: Disconnecting; received message for unknown session: " + msg);
Expand Down
39 changes: 32 additions & 7 deletions QuickFIXn/ThreadedSocketAcceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,13 @@ namespace QuickFix
/// </summary>
public class ThreadedSocketAcceptor : IAcceptor
{


private Dictionary<SessionID, Session> sessions_ = new Dictionary<SessionID, Session>();
private SessionSettings settings_;
private Dictionary<IPEndPoint, AcceptorSocketDescriptor> socketDescriptorForAddress_ = new Dictionary<IPEndPoint, AcceptorSocketDescriptor>();
private SessionFactory sessionFactory_;
private SessionProvider _sessionProvider = new SessionProvider();
private bool isStarted_ = false;
private bool _disposed = false;
private object sync_ = new object();
Expand Down Expand Up @@ -112,7 +113,7 @@ private AcceptorSocketDescriptor GetAcceptorSocketDescriptor(Dictionary dict)
IPEndPoint socketEndPoint;
if (dict.Has(SessionSettings.SOCKET_ACCEPT_HOST))
{
string host = dict.GetString(SessionSettings.SOCKET_ACCEPT_HOST);
string host = dict.GetString(SessionSettings.SOCKET_ACCEPT_HOST);
IPAddress[] addrs = Dns.GetHostAddresses(host);
socketEndPoint = new IPEndPoint(addrs[0], port);
// Set hostname (if it is not already configured)
Expand All @@ -124,12 +125,12 @@ private AcceptorSocketDescriptor GetAcceptorSocketDescriptor(Dictionary dict)
}

socketSettings.Configure(dict);


AcceptorSocketDescriptor descriptor;
if (!socketDescriptorForAddress_.TryGetValue(socketEndPoint, out descriptor))
{
descriptor = new AcceptorSocketDescriptor(socketEndPoint, socketSettings, dict);
descriptor = new AcceptorSocketDescriptor(socketEndPoint, socketSettings, dict, _sessionProvider);
socketDescriptorForAddress_[socketEndPoint] = descriptor;
}

Expand All @@ -150,14 +151,38 @@ private bool CreateSession(SessionID sessionID, Dictionary dict)
if ("acceptor" == connectionType)
{
AcceptorSocketDescriptor descriptor = GetAcceptorSocketDescriptor(dict);
Session session = sessionFactory_.Create(sessionID, dict);
descriptor.AcceptSession(session);
sessions_[sessionID] = session;
if (SessionProvider.IsSessionTemplate(sessionID))
{
_sessionProvider.AddTemplate(sessionID, dict, this, descriptor);
}
else
{
_ = CreateAcceptorSession(sessionID, dict, descriptor);
}
return true;
}
}
return false;
}
/// <summary>
/// Can be called at initial sessions creation
/// or to create from a template a new dynamic session right at its first connect
/// </summary>
internal Session CreateAcceptorSession(SessionID sessionID, Dictionary dict, AcceptorSocketDescriptor descriptor)
{
Session session = sessionFactory_.Create(sessionID, dict);
descriptor.AcceptSession(session);
sessions_[sessionID] = session;
return session;
}
/// <summary>
/// Must be called before creating a dynamic session at its first connect
/// to add its settings into the global dictionary
/// </summary>
internal void SetSessionSettings(SessionID sessionID, Dictionary dict)
{
settings_.Set(sessionID, dict);
}

private void StartAcceptingConnections()
{
Expand Down
9 changes: 5 additions & 4 deletions QuickFIXn/ThreadedSocketReactor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public State ReactorState
private QuickFix.Dictionary sessionDict_;
private IPEndPoint serverSocketEndPoint_;
private readonly AcceptorSocketDescriptor acceptorDescriptor_;

private readonly SessionProvider _sessionProvider;
#endregion

[Obsolete("Use the other constructor")]
Expand All @@ -46,17 +46,18 @@ public ThreadedSocketReactor(IPEndPoint serverSocketEndPoint, SocketSettings soc
{ }

public ThreadedSocketReactor(IPEndPoint serverSocketEndPoint, SocketSettings socketSettings,
QuickFix.Dictionary sessionDict) : this(serverSocketEndPoint, socketSettings, sessionDict, null)
QuickFix.Dictionary sessionDict) : this(serverSocketEndPoint, socketSettings, sessionDict, null, null)
{

}
internal ThreadedSocketReactor(IPEndPoint serverSocketEndPoint, SocketSettings socketSettings, QuickFix.Dictionary sessionDict, AcceptorSocketDescriptor acceptorDescriptor)
internal ThreadedSocketReactor(IPEndPoint serverSocketEndPoint, SocketSettings socketSettings, QuickFix.Dictionary sessionDict, AcceptorSocketDescriptor acceptorDescriptor, SessionProvider sessionProvider)
{
socketSettings_ = socketSettings;
serverSocketEndPoint_ = serverSocketEndPoint;
tcpListener_ = new TcpListener(serverSocketEndPoint_);
sessionDict_ = sessionDict;
acceptorDescriptor_ = acceptorDescriptor;
_sessionProvider = sessionProvider;
}

public void Start()
Expand Down Expand Up @@ -115,7 +116,7 @@ public void Run()
{
ApplySocketOptions(client, socketSettings_);
ClientHandlerThread t =
new ClientHandlerThread(client, nextClientId_++, sessionDict_, socketSettings_, acceptorDescriptor_);
new ClientHandlerThread(client, nextClientId_++, sessionDict_, socketSettings_, acceptorDescriptor_, _sessionProvider);
t.Exited += OnClientHandlerThreadExited;
lock (sync_)
{
Expand Down
31 changes: 31 additions & 0 deletions UnitTests/SessionSettingsTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,37 @@ public void LoadSettingsWithDefaultSectionLast()
Assert.That(settings.Get(session6).GetLong("VALUE"), Is.EqualTo(6));
}

[Test]
public void LoadSettingsWithWildcards()
{
// May set wildcards to any field in the [DEFAULT] section
string configuration = new System.Text.StringBuilder()
.AppendLine("[DEFAULT]")
.AppendLine("ConnectionType=acceptor")
.AppendLine("BeginString=*")
.AppendLine("SenderCompID=*")
.AppendLine("TargetCompID=*")
.AppendLine("SessionQualifier=*")
.AppendLine("SomeValue=whatever")
.AppendLine("Empty=")
.ToString();
SessionSettings settings = new SessionSettings(new System.IO.StringReader(configuration));
Assert.That(settings.Get().GetString("BeginString"), Is.EqualTo("*"));
Assert.That(settings.Get().GetString("SenderCompID"), Is.EqualTo("*"));
Assert.That(settings.Get().GetString("TargetCompID"), Is.EqualTo("*"));
Assert.That(settings.Get().GetString("SessionQualifier"), Is.EqualTo("*"));
// BeginString cannot be a wildcard in any [SESSION] section
configuration = new System.Text.StringBuilder()
.AppendLine("[SESSION]")
.AppendLine("ConnectionType=acceptor")
.AppendLine("BeginString=*")
.AppendLine("SenderCompID=*")
.AppendLine("TargetCompID=*")
.ToString();
ConfigError ex = Assert.Throws<ConfigError>(delegate { _ = new SessionSettings(new System.IO.StringReader(configuration)); });
Assert.That(ex.Message, Is.EqualTo("Configuration failed: BeginString (*) must be FIX.4.0 to FIX.4.4 or FIXT.1.1"));
}

[Test]
public void DuplicateSession()
{
Expand Down
Loading