From 7cd521c229ac842ae8360b1f984b2545753795cb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=A1clav=20=C4=8Camra?= Date: Wed, 20 Jun 2018 00:34:54 +0200 Subject: [PATCH] Reworked the way local invokers register themselves to process messages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Václav Čamra --- .../ClientServer/Connection.cs | 45 ++++++++++++++++--- .../Invokers/LocalInvoker.cs | 18 ++++++++ Source/WebSocketRPC.Base/Invokers/Messages.cs | 4 +- .../RpcBinders/Base/IBinder.cs | 26 +++++++++-- .../RpcBinders/LocalBinder.cs | 19 ++++---- .../WebsocketRPC.Standalone.csproj | 2 +- 6 files changed, 93 insertions(+), 21 deletions(-) diff --git a/Source/WebSocketRPC.Base/ClientServer/Connection.cs b/Source/WebSocketRPC.Base/ClientServer/Connection.cs index a70e4d1..4fc2ed5 100644 --- a/Source/WebSocketRPC.Base/ClientServer/Connection.cs +++ b/Source/WebSocketRPC.Base/ClientServer/Connection.cs @@ -79,6 +79,7 @@ public static Encoding Encoding WebSocket socket; TaskQueue sendTaskQueue; + List localBinders = new List(); /// /// Creates a new connection. @@ -92,6 +93,11 @@ internal protected Connection(WebSocket socket, IReadOnlyDictionary /// Gets the cookie collection. /// @@ -168,13 +174,13 @@ private void invokeOnReceive(string msg) var members = OnReceive.GetInvocationList().Cast>(); var list = members.Select(x => x(msg)).ToList(); - var tsk = Task.WhenAll(list); - tsk.Wait(); + Task.WaitAll(list.ToArray()); - if (!list.Where(t => !t.IsFaulted).Any()) + bool allFaulted = !list.Where(t => !t.IsFaulted).Any(); + if (allFaulted) { - InvokeOnError(tsk.Exception); - } + InvokeOnError(list[0].Exception); + } } catch (Exception ex) when (ex.InnerException is TaskCanceledException) { } @@ -347,8 +353,7 @@ async Task listenReceiveAsync(CancellationToken token) default: var segment = new ArraySegment(receiveBuffer, 0, count); var msg = segment.ToString(encoding); - - invokeOnReceive(msg); + processMessage(msg); Debug.WriteLine("Received: " + msg); break; } @@ -359,6 +364,32 @@ async Task listenReceiveAsync(CancellationToken token) } } + private async void processMessage(string msg) + { + try + { + Request request = Request.FromJson(msg); + + var binder = localBinders.Where(b => b.CanProcessRequest(request)).FirstOrDefault(); + Response response = new Response(); + if (binder == null) + { + response.CallId = request.CallId; + response.FunctionName = request.FunctionName; + response.Error = $"Function not resolved: {response.FunctionName}"; + } + else + { + await binder.InvokeRequest(request); + } + invokeOnReceive(msg); + await SendAsync(response.ToJson()); + } + catch(Exception) //TODO: + { + } + } + #endregion } } diff --git a/Source/WebSocketRPC.Base/Invokers/LocalInvoker.cs b/Source/WebSocketRPC.Base/Invokers/LocalInvoker.cs index 1450368..ac22fd4 100644 --- a/Source/WebSocketRPC.Base/Invokers/LocalInvoker.cs +++ b/Source/WebSocketRPC.Base/Invokers/LocalInvoker.cs @@ -69,6 +69,24 @@ static void verifyType(MethodInfo[] methodList) throw new NotSupportedException("Properties are not permitted: " + String.Join(", ", propertyList.Select(x => x.Name)) + "."); } + public bool CanInvoke(Request clientMessage) + { + var functionName = clientMessage.FunctionName; + var (iface, name) = parseFunctionName(functionName); + functionName = name; + + bool ifacematch = iface == null || iface == typeof(TObj).FullName; + + if (!ifacematch || !methods.ContainsKey(functionName)) + return false; + + var methodParams = methods[functionName].GetParameters(); + if (methodParams.Length != clientMessage.Arguments.Length) + return false; + + return true; + } + public async Task InvokeAsync(TObj obj, Request clientMessage) { JToken result = null; diff --git a/Source/WebSocketRPC.Base/Invokers/Messages.cs b/Source/WebSocketRPC.Base/Invokers/Messages.cs index c861825..01759ee 100644 --- a/Source/WebSocketRPC.Base/Invokers/Messages.cs +++ b/Source/WebSocketRPC.Base/Invokers/Messages.cs @@ -30,7 +30,7 @@ namespace WebSocketRPC { - struct Request + public class Request { public string FunctionName; public string CallId; @@ -73,7 +73,7 @@ public string ToJson() public bool IsEmpty => FunctionName == null && Arguments == null; } - struct Response + public struct Response { public string FunctionName; public string CallId; diff --git a/Source/WebSocketRPC.Base/RpcBinders/Base/IBinder.cs b/Source/WebSocketRPC.Base/RpcBinders/Base/IBinder.cs index 4d56592..67b8aad 100644 --- a/Source/WebSocketRPC.Base/RpcBinders/Base/IBinder.cs +++ b/Source/WebSocketRPC.Base/RpcBinders/Base/IBinder.cs @@ -12,19 +12,39 @@ public interface IBinder /// /// Gets the associated connection. /// - Connection Connection { get; } + Connection Connection { get; } + } + + /// + /// The base local binder. + /// + public interface ILocalBinder : IBinder + { + /// + /// Checks whether the provided request can be processed by this IBinder. + /// + /// The request being checked. + /// True if this binder can invoke a method for this request; false otherwise. + bool CanProcessRequest(Request request); + + /// + /// Invoke a request. + /// + /// The invoked request. + /// The response for the given request. + Task InvokeRequest(Request request); } /// /// Interface for a local binder for the specified object type. /// /// Object type. - public interface ILocalBinder : IBinder + public interface ILocalBinder : ILocalBinder { /// /// Gets the associated object. /// - TObj Object { get; } + TObj Object { get; } } /// diff --git a/Source/WebSocketRPC.Base/RpcBinders/LocalBinder.cs b/Source/WebSocketRPC.Base/RpcBinders/LocalBinder.cs index c4d9ef5..c3f93ae 100644 --- a/Source/WebSocketRPC.Base/RpcBinders/LocalBinder.cs +++ b/Source/WebSocketRPC.Base/RpcBinders/LocalBinder.cs @@ -38,14 +38,7 @@ public LocalBinder(Connection connection, TObj obj) lInvoker = new LocalInvoker(); Object = obj; - Connection.OnReceive += async d => - { - var msg = Request.FromJson(d); - if (msg.IsEmpty) return; - - var result = await lInvoker.InvokeAsync(Object, msg); - await Connection.SendAsync(result.ToJson()); - }; + Connection.AddLocalBinder(this); Connection.OnClose += (s, d) => { @@ -57,5 +50,15 @@ public LocalBinder(Connection connection, TObj obj) } public TObj Object { get; private set; } + + public bool CanProcessRequest(Request request) + { + return lInvoker.CanInvoke(request); + } + + public Task InvokeRequest(Request request) + { + return lInvoker.InvokeAsync(Object, request); + } } } diff --git a/Source/WebsocketRPC.Standalone/WebsocketRPC.Standalone.csproj b/Source/WebsocketRPC.Standalone/WebsocketRPC.Standalone.csproj index e3b2fd7..edebeb2 100644 --- a/Source/WebsocketRPC.Standalone/WebsocketRPC.Standalone.csproj +++ b/Source/WebsocketRPC.Standalone/WebsocketRPC.Standalone.csproj @@ -30,7 +30,7 @@ websocket; websocket-client; websocket-server; RPC; C#; .NET true - 1.0.9 + 1.1.3 ../../Deploy/Nuget/bin/ True