Skip to content
This repository has been archived by the owner on Dec 20, 2019. It is now read-only.

Commit

Permalink
Reworked the way local invokers register themselves to process messages
Browse files Browse the repository at this point in the history
Signed-off-by: Václav Čamra <[email protected]>
  • Loading branch information
vacamra committed Jun 19, 2018
1 parent a8926c4 commit 7cd521c
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 21 deletions.
45 changes: 38 additions & 7 deletions Source/WebSocketRPC.Base/ClientServer/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static Encoding Encoding

WebSocket socket;
TaskQueue sendTaskQueue;
List<ILocalBinder> localBinders = new List<ILocalBinder>();

/// <summary>
/// Creates a new connection.
Expand All @@ -92,6 +93,11 @@ internal protected Connection(WebSocket socket, IReadOnlyDictionary<string, stri
this.Cookies = cookies;
}

internal void AddLocalBinder(ILocalBinder binder)
{
localBinders.Add(binder);
}

/// <summary>
/// Gets the cookie collection.
/// </summary>
Expand Down Expand Up @@ -168,13 +174,13 @@ private void invokeOnReceive(string msg)
var members = OnReceive.GetInvocationList().Cast<Func<string, Task>>();

var list = members.Select(x => x(msg)).ToList();
var tsk = Task.WhenAll(list);
tsk.Wait();
Task.WaitAll(list.ToArray());

if (!list.Where(t => !t.IsFaulted).Any())
bool allFaulted = !list.Where(t => !t.IsFaulted).Any();
if (allFaulted)
{
InvokeOnError(tsk.Exception);
}
InvokeOnError(list[0].Exception);
}
}
catch (Exception ex) when (ex.InnerException is TaskCanceledException)
{ }
Expand Down Expand Up @@ -347,8 +353,7 @@ async Task listenReceiveAsync(CancellationToken token)
default:
var segment = new ArraySegment<byte>(receiveBuffer, 0, count);
var msg = segment.ToString(encoding);

invokeOnReceive(msg);
processMessage(msg);
Debug.WriteLine("Received: " + msg);
break;
}
Expand All @@ -359,6 +364,32 @@ async Task listenReceiveAsync(CancellationToken token)
}
}

private async void processMessage(string msg)
{
try
{
Request request = Request.FromJson(msg);

var binder = localBinders.Where(b => b.CanProcessRequest(request)).FirstOrDefault();
Response response = new Response();
if (binder == null)
{
response.CallId = request.CallId;
response.FunctionName = request.FunctionName;
response.Error = $"Function not resolved: {response.FunctionName}";
}
else
{
await binder.InvokeRequest(request);
}
invokeOnReceive(msg);
await SendAsync(response.ToJson());
}
catch(Exception) //TODO:
{
}
}

#endregion
}
}
Expand Down
18 changes: 18 additions & 0 deletions Source/WebSocketRPC.Base/Invokers/LocalInvoker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ static void verifyType(MethodInfo[] methodList)
throw new NotSupportedException("Properties are not permitted: " + String.Join(", ", propertyList.Select(x => x.Name)) + ".");
}

public bool CanInvoke(Request clientMessage)
{
var functionName = clientMessage.FunctionName;
var (iface, name) = parseFunctionName(functionName);
functionName = name;

bool ifacematch = iface == null || iface == typeof(TObj).FullName;

if (!ifacematch || !methods.ContainsKey(functionName))
return false;

var methodParams = methods[functionName].GetParameters();
if (methodParams.Length != clientMessage.Arguments.Length)
return false;

return true;
}

public async Task<Response> InvokeAsync(TObj obj, Request clientMessage)
{
JToken result = null;
Expand Down
4 changes: 2 additions & 2 deletions Source/WebSocketRPC.Base/Invokers/Messages.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

namespace WebSocketRPC
{
struct Request
public class Request
{
public string FunctionName;
public string CallId;
Expand Down Expand Up @@ -73,7 +73,7 @@ public string ToJson()
public bool IsEmpty => FunctionName == null && Arguments == null;
}

struct Response
public struct Response
{
public string FunctionName;
public string CallId;
Expand Down
26 changes: 23 additions & 3 deletions Source/WebSocketRPC.Base/RpcBinders/Base/IBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,39 @@ public interface IBinder
/// <summary>
/// Gets the associated connection.
/// </summary>
Connection Connection { get; }
Connection Connection { get; }
}

/// <summary>
/// The base local binder.
/// </summary>
public interface ILocalBinder : IBinder
{
/// <summary>
/// Checks whether the provided request can be processed by this IBinder.
/// </summary>
/// <param name="request">The request being checked.</param>
/// <returns>True if this binder can invoke a method for this request; false otherwise.</returns>
bool CanProcessRequest(Request request);

/// <summary>
/// Invoke a request.
/// </summary>
/// <param name="request">The invoked request.</param>
/// <returns>The response for the given request.</returns>
Task<Response> InvokeRequest(Request request);
}

/// <summary>
/// Interface for a local binder for the specified object type.
/// </summary>
/// <typeparam name="TObj">Object type.</typeparam>
public interface ILocalBinder<TObj> : IBinder
public interface ILocalBinder<TObj> : ILocalBinder
{
/// <summary>
/// Gets the associated object.
/// </summary>
TObj Object { get; }
TObj Object { get; }
}

/// <summary>
Expand Down
19 changes: 11 additions & 8 deletions Source/WebSocketRPC.Base/RpcBinders/LocalBinder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,7 @@ public LocalBinder(Connection connection, TObj obj)
lInvoker = new LocalInvoker<TObj>();
Object = obj;

Connection.OnReceive += async d =>
{
var msg = Request.FromJson(d);
if (msg.IsEmpty) return;

var result = await lInvoker.InvokeAsync(Object, msg);
await Connection.SendAsync(result.ToJson());
};
Connection.AddLocalBinder(this);

Connection.OnClose += (s, d) =>
{
Expand All @@ -57,5 +50,15 @@ public LocalBinder(Connection connection, TObj obj)
}

public TObj Object { get; private set; }

public bool CanProcessRequest(Request request)
{
return lInvoker.CanInvoke(request);
}

public Task<Response> InvokeRequest(Request request)
{
return lInvoker.InvokeAsync(Object, request);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
<PackageTags>websocket; websocket-client; websocket-server; RPC; C#; .NET</PackageTags>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>

<Version>1.0.9</Version>
<Version>1.1.3</Version>
<PackageOutputPath>../../Deploy/Nuget/bin/</PackageOutputPath>

<IncludeSource>True</IncludeSource>
Expand Down

0 comments on commit 7cd521c

Please sign in to comment.